Best Python code snippet using localstack_python
sns_listener.py
Source:sns_listener.py
...32 req_action = req_data['Action'][0]33 topic_arn = req_data.get('TargetArn') or req_data.get('TopicArn')34 if topic_arn:35 topic_arn = topic_arn[0]36 topic_arn = aws_stack.fix_account_id_in_arns(topic_arn)37 if req_action == 'SetSubscriptionAttributes':38 sub = get_subscription_by_arn(req_data['SubscriptionArn'][0])39 if not sub:40 return make_error(message='Unable to find subscription for given ARN', code=400)41 attr_name = req_data['AttributeName'][0]42 attr_value = req_data['AttributeValue'][0]43 sub[attr_name] = attr_value44 return make_response(req_action)45 elif req_action == 'GetSubscriptionAttributes':46 sub = get_subscription_by_arn(req_data['SubscriptionArn'][0])47 if not sub:48 return make_error(message='Unable to find subscription for given ARN', code=400)49 content = '<Attributes>'50 for key, value in sub.items():51 content += '<entry><key>%s</key><value>%s</value></entry>\n' % (key, value)52 content += '</Attributes>'53 return make_response(req_action, content=content)54 elif req_action == 'Subscribe':55 if 'Endpoint' not in req_data:56 return make_error(message='Endpoint not specified in subscription', code=400)57 elif req_action == 'Unsubscribe':58 if 'SubscriptionArn' not in req_data:59 return make_error(message='SubscriptionArn not specified in unsubscribe request', code=400)60 do_unsubscribe(req_data.get('SubscriptionArn')[0])61 elif req_action == 'DeleteTopic':62 do_delete_topic(topic_arn)63 elif req_action == 'Publish':64 # No need to create a topic to send SMS with SNS65 # but we can't mock a sending so we only return that it went well66 if 'PhoneNumber' not in req_data:67 if topic_arn not in SNS_SUBSCRIPTIONS.keys():68 return make_error(code=404, code_string='NotFound', message='Topic does not exist')69 publish_message(topic_arn, req_data)70 # return response here because we do not want the request to be forwarded to SNS backend71 return make_response(req_action)72 elif req_action == 'ListTagsForResource':73 tags = 
do_list_tags_for_resource(topic_arn)74 content = '<Tags/>'75 if len(tags) > 0:76 content = '<Tags>'77 for tag in tags:78 content += '<member>'79 content += '<Key>%s</Key>' % tag['Key']80 content += '<Value>%s</Value>' % tag['Value']81 content += '</member>'82 content += '</Tags>'83 return make_response(req_action, content=content)84 elif req_action == 'TagResource':85 tags = []86 req_tags = {k: v for k, v in req_data.items() if k.startswith('Tags.member.')}87 for i in range(int(len(req_tags.keys()) / 2)):88 key = req_tags['Tags.member.' + str(i + 1) + '.Key'][0]89 value = req_tags['Tags.member.' + str(i + 1) + '.Value'][0]90 tags.append({'Key': key, 'Value': value})91 do_tag_resource(topic_arn, tags)92 return make_response(req_action)93 elif req_action == 'UntagResource':94 tags_to_remove = []95 req_tags = {k: v for k, v in req_data.items() if k.startswith('TagKeys.member.')}96 req_tags = req_tags.values()97 for tag in req_tags:98 tags_to_remove.append(tag[0])99 do_untag_resource(topic_arn, tags_to_remove)100 return make_response(req_action)101 data = self._reset_account_id(data)102 return Request(data=data, headers=headers, method=method)103 return True104 def _reset_account_id(self, data):105 """ Fix account ID in request payload. All external-facing responses contain our106 predefined account ID (defaults to 000000000000), whereas the backend endpoint107 from moto expects a different hardcoded account ID (123456789012). 
"""108 return aws_stack.fix_account_id_in_arns(109 data, colon_delimiter='%3A', existing=TEST_AWS_ACCOUNT_ID, replace=MOTO_ACCOUNT_ID)110 def return_response(self, method, path, data, headers, response):111 if method == 'POST' and path == '/':112 # convert account IDs in ARNs113 data = aws_stack.fix_account_id_in_arns(data, colon_delimiter='%3A')114 aws_stack.fix_account_id_in_arns(response)115 # parse request and extract data116 req_data = urlparse.parse_qs(to_str(data))117 req_action = req_data['Action'][0]118 if req_action == 'Subscribe' and response.status_code < 400:119 response_data = xmltodict.parse(response.content)120 topic_arn = (req_data.get('TargetArn') or req_data.get('TopicArn'))[0]121 attributes = get_subscribe_attributes(req_data)122 sub_arn = response_data['SubscribeResponse']['SubscribeResult']['SubscriptionArn']123 do_subscribe(124 topic_arn,125 req_data['Endpoint'][0],126 req_data['Protocol'][0],127 sub_arn,128 attributes...
cloudformation_listener.py
Source:cloudformation_listener.py
...157 if method == 'OPTIONS':158 return 200159 data = data or ''160 data_orig = data161 data = aws_stack.fix_account_id_in_arns(data, existing='%3A{}%3Astack/'.format(TEST_AWS_ACCOUNT_ID),162 replace='%3A{}%3Astack/'.format(MOTO_CLOUDFORMATION_ACCOUNT_ID), colon_delimiter='')163 data = aws_stack.fix_account_id_in_arns(data, existing='%3A{}%3AchangeSet/'.format(TEST_AWS_ACCOUNT_ID),164 replace='%3A{}%3AchangeSet/'.format(MOTO_CLOUDFORMATION_ACCOUNT_ID), colon_delimiter='')165 data = aws_stack.fix_account_id_in_arns(data, existing=TEST_AWS_ACCOUNT_ID,166 replace=MOTO_ACCOUNT_ID, colon_delimiter='%3A')167 req_data = None168 if method == 'POST' and path == '/':169 req_data = urlparse.parse_qs(to_str(data))170 req_data = dict([(k, v[0]) for k, v in req_data.items()])171 action = req_data.get('Action')172 stack_name = req_data.get('StackName')173 if action == 'CreateStack':174 event_publisher.fire_event(175 event_publisher.EVENT_CLOUDFORMATION_CREATE_STACK,176 payload={'n': event_publisher.get_hash(stack_name)}177 )178 if action == 'DeleteStack':179 client = aws_stack.connect_to_service(180 'cloudformation',181 region_name=aws_stack.extract_region_from_auth_header(headers)182 )183 stack_resources = client.list_stack_resources(StackName=stack_name)['StackResourceSummaries']184 template_deployer.delete_stack(stack_name, stack_resources)185 if action == 'DescribeStackEvents':186 # fix an issue where moto cannot handle ARNs as stack names (or missing names)187 run_fix = not stack_name188 if stack_name:189 if stack_name.startswith('arn:aws:cloudformation'):190 run_fix = True191 pattern = r'arn:aws:cloudformation:[^:]+:[^:]+:stack/([^/]+)(/.+)?'192 stack_name = re.sub(pattern, r'\1', stack_name)193 if run_fix:194 stack_names = [stack_name] if stack_name else self._list_stack_names()195 client = aws_stack.connect_to_service('cloudformation')196 events = []197 for stack_name in stack_names:198 tmp = client.describe_stack_events(StackName=stack_name)['StackEvents'][:1]199 
events.extend(tmp)200 events = [{'member': e} for e in events]201 response_content = '<StackEvents>%s</StackEvents>' % obj_to_xml(events)202 return make_response('DescribeStackEvents', response_content)203 if req_data:204 if action == 'ValidateTemplate':205 return validate_template(req_data)206 if action in ['CreateStack', 'UpdateStack', 'CreateChangeSet']:207 do_replace_url = is_real_s3_url(req_data.get('TemplateURL'))208 if do_replace_url:209 req_data['TemplateURL'] = convert_s3_to_local_url(req_data['TemplateURL'])210 url = req_data.get('TemplateURL', '')211 is_custom_local_endpoint = is_local_service_url(url) and '://localhost:' not in url212 modified_template_body = transform_template(req_data)213 if not modified_template_body and is_custom_local_endpoint:214 modified_template_body = get_template_body(req_data)215 if modified_template_body:216 req_data.pop('TemplateURL', None)217 req_data['TemplateBody'] = modified_template_body218 if modified_template_body or do_replace_url:219 data = urlparse.urlencode(req_data, doseq=True)220 return Request(data=data, headers=headers, method=method)221 if data != data_orig or action in ['DescribeChangeSet', 'ExecuteChangeSet']:222 return Request(data=urlparse.urlencode(req_data, doseq=True), headers=headers, method=method)223 return True224 def return_response(self, method, path, data, headers, response):225 req_data = urlparse.parse_qs(to_str(data))226 req_data = dict([(k, v[0]) for k, v in req_data.items()])227 action = req_data.get('Action')228 if response.status_code >= 400:229 LOG.debug('Error response for CloudFormation action "%s" (%s) %s %s: %s' %230 (action, response.status_code, method, path, response.content))231 if response._content:232 aws_stack.fix_account_id_in_arns(response)233 fix_hardcoded_creation_date(response)234 fix_region_in_arns(response)235 @staticmethod236 def _list_stack_names():237 client = aws_stack.connect_to_service('cloudformation')238 stacks = client.list_stacks()['StackSummaries']239 
stack_names = []240 for stack in stacks:241 status = stack['StackStatus']242 if 'FAILED' in status or 'DELETE' in status:243 continue244 stack_names.append(stack['StackName'])245 return stack_names246# instantiate listener...
iam_listener.py
Source:iam_listener.py
# ... excerpt of iam_listener.py (original file lines 30-39). The source line
# numbers that were fused into the statements have been removed and the block
# layout restored; the definitions take `self` and are presumably methods of
# the listener class, whose header is not part of this excerpt.
def _reset_account_id(self, data):
    """ Fix account ID in request payload. All external-facing responses contain our
        predefined account ID (defaults to 000000000000), whereas the backend endpoint
        from moto expects a different hardcoded account ID (123456789012).

        Returns the result of aws_stack.fix_account_id_in_arns, called with
        TEST_AWS_ACCOUNT_ID as the existing ID, MOTO_ACCOUNT_ID as the replacement
        and '%3A' (a URL-encoded colon) as the delimiter.
    """
    return aws_stack.fix_account_id_in_arns(
        data, colon_delimiter='%3A', existing=TEST_AWS_ACCOUNT_ID, replace=MOTO_ACCOUNT_ID)


def _replace(self, response, pattern, replacement):
    """ Apply a regex substitution to the response content and store the result
        back on the response object.
    """
    content = to_str(response.content)
    # NOTE(review): the substituted text is written to the private `_content`
    # attribute as returned by re.sub (a str) - confirm consumers accept that.
    response._content = re.sub(pattern, replacement, content)


# instantiate listener
# ...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!