Best Python code snippet using localstack_python
aws_events_info.py
Source:aws_events_info.py
...155 return paginator.paginate(156 NamePrefix=module.params['name_prefix'],157 ), True158 else:159 return client.list_event_sources(160 NamePrefix=module.params['name_prefix'],161 ), False162 elif module.params['list_partner_event_source_accounts']:163 if client.can_paginate('list_partner_event_source_accounts'):164 paginator = client.get_paginator('list_partner_event_source_accounts')165 return paginator.paginate(166 EventSourceName=module.params['event_source_name'],167 ), True168 else:169 return client.list_partner_event_source_accounts(170 EventSourceName=module.params['event_source_name'],171 ), False172 elif module.params['list_partner_event_sources']:173 if client.can_paginate('list_partner_event_sources'):...
event_stream_trigger.py
Source:event_stream_trigger.py
...83 print("Creating event source mapping")84 return True85 except ClientError as error:86 print(error.response)87def list_event_sources(source_arn, lambda_arn):88 '''89 Gets the UUID of the current event source so we can update it90 91 args:92 source_arn: the arn of the kinesis or dynamodb stream, retrieved from get arn helper functions93 lambda_name: name of the lambda, retried from config file94 '''95 try:96 current_sources = lambdaClient.list_event_source_mappings(FunctionName=lambda_arn, EventSourceArn=source_arn)97 if current_sources['ResponseMetadata']['HTTPStatusCode'] == 200:98 if len(current_sources['EventSourceMappings']) > 0:99 uuid = current_sources['EventSourceMappings'][0]['UUID']100 batch_size = current_sources['EventSourceMappings'][0]['BatchSize']101 state = current_sources['EventSourceMappings'][0]['State']102 source = current_sources['EventSourceMappings'][0]['EventSourceArn']103 if state == 'Enabled':104 state = True105 else:106 state = False107 108 return uuid, batch_size, state, source109 else:110 return False111 except ClientError as error:112 print(error.response)113def update_event_source_trigger(uuid=None, lambda_arn=None, enabled=False, batch_size=None):114 '''115 updates current event source116 args:117 uuid: the uuid of the current event source118 lambda_name: name of the lambda we're working with119 enabled: Boolean, false by default120 batch_size: size of records being processed121 '''122 print('Attempting to update current event source')123 try:124 update_event = lambdaClient.update_event_source_mapping(125 UUID=uuid,126 FunctionName=lambda_arn,127 Enabled=enabled,128 BatchSize=batch_size129 )130 if update_event['ResponseMetadata']['HTTPStatusCode'] == 202:131 print("Event source updated!")132 return True133 except ClientError as error:134 print(error.response)135############## Main Entry Point ##############136def create_event_source(source_type=None, source_name=None, lambda_name=None, batch_size=None, enabled=False, starting_position=False, alias=False):137 '''138 Entrypoint for adding an event source as the lambda invocator 139 args:140 source_type: kinesis or dynamodb141 source_name: the name of the resource 142 lambda_name: name of the lambda we're going to be triggering143 enabled: boolean, False by default144 starting_position: Choice of 'TRIM_HORIZON', 'LATEST', 'AT_TIMESTAMP'145 '''146 #Get the source type and assign the value of stream accordingly 147 if source_type == 'kinesis':148 stream = get_kinesis_stream(source_name)149 elif source_type == 'dynamodb':150 stream = get_dynamo_stream(source_name)151 else:152 print("No valid source type found, please use 'kinesis', or 'dynamodb'")153 sys.exit(1)154 #Get lambda arn, this is for aliases155 arn = get_lambda_arn(lambda_name=lambda_name, alias=alias)156 #Create the mapping157 create = create_event_source_trigger(lambda_arn=arn, event_source=stream, enabled=enabled, batch_size=batch_size, starting_position=starting_position)158 if create:159 print("Event source created successfully")160 return True161def update_event_source(source_type=None, source_name=None, lambda_name=None, batch_size=None, enabled=False, alias=False):162 '''163 Main entry point for updating the event source. The first thing it does is get the arn for the desired resource164 then it checks to see if we actually need an update by validating our current state against our desired state.165 If the match it exits gracefully, else it will perform an update166 args:167 source_type: kinesis or dynamodb168 source_name: name of the resource (table name or stream name)169 lambda_name: name of the lambda function170 batch_size: integer, size of batch per lambda invocation171 enabled: Boolean, false by default172 '''173 if source_type == 'kinesis':174 stream = get_kinesis_stream(source_name)175 elif source_type == 'dynamodb':176 stream = get_dynamo_stream(source_name)177 else:178 print("No valid source type found, please use 'kinesis', or 'dynamodb'")179 sys.exit(1)180 #Get lambda arn, this is for aliases181 arn = get_lambda_arn(lambda_name=lambda_name, alias=alias)182 183 #Get all of our current values184 current_uuid, current_batch_size, current_state, current_source = list_event_sources(source_arn=stream, lambda_arn=arn)185 #Validates what we want vs what we have186 if all(item in [stream, batch_size, enabled] for item in [current_source, current_batch_size, current_state]):187 print('No update needed')188 return True189 else:190 print('Updating event source')191 update = update_event_source_trigger(uuid=current_uuid, lambda_arn=arn, enabled=enabled, batch_size=batch_size)192def check_source_mapping(source_type=None, source_name=None, lambda_name=None, alias=False):193 '''194 Gets current event source config and reports back to update-config action. This is used to see if we need to either update195 or create the event source mapping196 args:197 source_type: kinesis or dynamodb198 source_name: name of the resource (table name or stream name)199 lambda_name: name of the lambda function200 '''201 if source_type == 'kinesis':202 stream = get_kinesis_stream(source_name)203 elif source_type == 'dynamodb':204 stream = get_dynamo_stream(source_name)205 else:206 print("No valid source type found, please use 'kinesis', or 'dynamodb'")207 sys.exit(1)208 #Get arn209 arn = get_lambda_arn(lambda_name=lambda_name, alias=alias)210 211 #Get our current sources212 get_sources = list_event_sources(source_arn=stream, lambda_arn=arn)213 #If get sources returns true it means the mapping is there and we only need to update, else we'll need to create214 if get_sources:215 return True216 else:...
generate_bruteforce_tests.py
Source:generate_bruteforce_tests.py
1import re2import os3import json4OUTPUT_FMT = 'BRUTEFORCE_TESTS = %s'5OUTPUT_FILE = 'bruteforce_tests.py'6API_DEFINITIONS = 'aws-sdk-js/apis/'7OPERATION_CONTAINS = {8 'list_',9 'describe_',10 'get_',11}12BLACKLIST_OPERATIONS = {13 'get_apis',14 'get_bucket_notification',15 'get_bucket_notification_configuration',16 'list_web_ac_ls',17 'get_hls_streaming_session_url',18 'describe_scaling_plans',19 'list_certificate_authorities',20 'list_event_sources',21 'get_geo_location',22 'get_checker_ip_ranges',23 'list_geo_locations',24 'list_public_keys',25 # https://twitter.com/AndresRiancho/status/110668043444280935026 'describe_stacks',27 'describe_service_errors',28 'describe_application_versions',29 'describe_applications',30 'describe_environments',31 'describe_events',32 'list_available_solution_stacks',33 'list_platform_versions',34}35def extract_service_name(filename, api_json):36 try:37 endpoint = api_json['metadata']['endpointPrefix']38 except:39 return None40 endpoint = endpoint.replace('api.', '')41 endpoint = endpoint.replace('opsworks-cm', 'opworks')42 endpoint = endpoint.replace('acm-pca', 'acm')43 return endpoint44def is_dangerous(operation_name):45 for safe in OPERATION_CONTAINS:46 if safe in operation_name:47 return False48 return True49def extract_operations(api_json):50 operations = []51 items = api_json['operations'].items()52 for operation_name, operation_data in items:53 operation_name = to_underscore(operation_name)54 if is_dangerous(operation_name):55 continue56 if operation_name in BLACKLIST_OPERATIONS:57 continue58 inputs = operation_data.get('input', None)59 if inputs is None:60 operations.append(operation_name)61 continue62 inputs = str(inputs)63 if "required" not in inputs:64 operations.append(operation_name)65 continue66 operations = list(set(operations))67 operations.sort()68 return operations69def to_underscore(name):70 s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)71 return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()72def main():73 bruteforce_tests = dict()74 for filename in os.listdir(API_DEFINITIONS):75 if not filename.endswith('.min.json'):76 continue77 api_json_data = open(os.path.join(API_DEFINITIONS, filename)).read()78 api_json = json.loads(api_json_data)79 service_name = extract_service_name(filename, api_json)80 if service_name is None:81 print('%s does not define a service name' % filename)82 continue83 operations = extract_operations(api_json)84 if not operations:85 continue86 if service_name in bruteforce_tests:87 bruteforce_tests[service_name].extend(operations)88 else:89 bruteforce_tests[service_name] = operations90 output = OUTPUT_FMT % json.dumps(bruteforce_tests,91 indent=4,92 sort_keys=True)93 open(OUTPUT_FILE, 'w').write(output)94if __name__ == '__main__':...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!