Best Python code snippet using localstack_python
sns_listener.py
Source:sns_listener.py
# NOTE(review): this excerpt starts part-way through
# ProxyListenerSNS.forward_request and the class header is outside the visible
# text.  The class-level code is therefore preserved below as comments
# (statement-for-statement, indentation reconstructed) instead of being guessed
# at; restore it under the real class definition.
#
# ...
#                 try:
#                     sqs_client.send_message(
#                         QueueUrl=queue_url,
#                         MessageBody=create_sns_message_body(subscriber, req_data),
#                         MessageAttributes=create_sqs_message_attributes(subscriber, message_attributes)
#                     )
#                 except Exception as exc:
#                     return make_error(message=str(exc), code=400)
#             elif subscriber['Protocol'] == 'lambda':
#                 lambda_api.process_sns_notification(
#                     subscriber['Endpoint'],
#                     topic_arn, message, subject=req_data.get('Subject', [None])[0]
#                 )
#             elif subscriber['Protocol'] in ['http', 'https']:
#                 try:
#                     message_body = create_sns_message_body(subscriber, req_data)
#                 except Exception as exc:
#                     return make_error(message=str(exc), code=400)
#                 requests.post(
#                     subscriber['Endpoint'],
#                     headers={
#                         'Content-Type': 'text/plain',
#                         'x-amz-sns-message-type': 'Notification'
#                     },
#                     data=message_body
#                 )
#             else:
#                 LOGGER.warning('Unexpected protocol "%s" for SNS subscription' % subscriber['Protocol'])
#         # return response here because we do not want the request to be forwarded to SNS
#         return make_response(req_action)
#     return True
#
#     def return_response(self, method, path, data, headers, response):
#         # This method is executed by the proxy after we've already received a
#         # response from the backend, hence we can utilize the "response" variable here
#         if method == 'POST' and path == '/':
#             req_data = urlparse.parse_qs(to_str(data))
#             req_action = req_data['Action'][0]
#             if req_action == 'Subscribe' and response.status_code < 400:
#                 response_data = xmltodict.parse(response.content)
#                 topic_arn = (req_data.get('TargetArn') or req_data.get('TopicArn'))[0]
#                 sub_arn = response_data['SubscribeResponse']['SubscribeResult']['SubscriptionArn']
#                 do_subscribe(topic_arn, req_data['Endpoint'][0], req_data['Protocol'][0], sub_arn)
#
# # instantiate listener
# UPDATE_SNS = ProxyListenerSNS()


def do_create_topic(topic_arn):
    """Register ``topic_arn`` with an empty subscription list if it is unknown."""
    SNS_SUBSCRIPTIONS.setdefault(topic_arn, [])


def do_delete_topic(topic_arn):
    """Forget ``topic_arn`` and its subscriptions; unknown topics are ignored."""
    SNS_SUBSCRIPTIONS.pop(topic_arn, None)


def do_subscribe(topic_arn, endpoint, protocol, subscription_arn):
    """Record a new subscription for ``topic_arn``.

    The stored dict mirrors the attribute names used by
    http://docs.aws.amazon.com/cli/latest/reference/sns/get-subscription-attributes.html
    Raw message delivery starts out disabled (``'false'``).
    """
    subscription = {
        'TopicArn': topic_arn,
        'Endpoint': endpoint,
        'Protocol': protocol,
        'SubscriptionArn': subscription_arn,
        'RawMessageDelivery': 'false',
    }
    # setdefault: a Subscribe may be observed for a topic that was never
    # registered through do_create_topic; previously that raised KeyError.
    SNS_SUBSCRIPTIONS.setdefault(topic_arn, []).append(subscription)


def do_unsubscribe(subscription_arn):
    """Drop the subscription with ``subscription_arn`` from every topic."""
    for topic_arn in SNS_SUBSCRIPTIONS:
        # Rebinding an existing key does not resize the dict, so this is safe
        # while iterating over it.
        SNS_SUBSCRIPTIONS[topic_arn] = [
            sub for sub in SNS_SUBSCRIPTIONS[topic_arn]
            if sub['SubscriptionArn'] != subscription_arn
        ]


# ---------------
# HELPER METHODS
# ---------------

def get_topic_by_arn(topic_arn):
    """Return the subscription list stored for ``topic_arn``, or ``None``."""
    return SNS_SUBSCRIPTIONS.get(topic_arn)


def get_subscription_by_arn(sub_arn):
    """Return the subscription dict whose ARN is ``sub_arn``, or ``None``."""
    # TODO maintain separate map instead of traversing all items
    for subscriptions in SNS_SUBSCRIPTIONS.values():
        for sub in subscriptions:
            if sub['SubscriptionArn'] == sub_arn:
                return sub
    return None


def make_response(op_name, content=''):
    """Build a 200 response wrapping ``content`` in ``<op_name>Response`` XML.

    ``content`` is inserted verbatim (it is expected to be an XML fragment);
    when empty, a ``<MessageId>`` element with a fresh short id is used.
    """
    response = Response()
    if not content:
        content = '<MessageId>%s</MessageId>' % short_uid()
    response._content = """<{op_name}Response xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
        <{op_name}Result>
            {content}
        </{op_name}Result>
        <ResponseMetadata><RequestId>{req_id}</RequestId></ResponseMetadata>
    </{op_name}Response>""".format(op_name=op_name, content=content, req_id=short_uid())
    response.status_code = 200
    return response


def make_error(message, code=400, code_string='InvalidParameter'):
    """Build an SNS ``ErrorResponse`` with HTTP status ``code``.

    ``message`` is XML-escaped before being embedded: callers pass
    ``str(exc)``, which may contain ``<``, ``>`` or ``&`` and would otherwise
    produce a malformed document.
    """
    # Local import keeps this block self-contained (file-level imports are
    # outside the visible excerpt).
    from xml.sax.saxutils import escape

    response = Response()
    response._content = """<ErrorResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/"><Error>
        <Type>Sender</Type>
        <Code>{code_string}</Code>
        <Message>{message}</Message>
        </Error><RequestId>{req_id}</RequestId>
        </ErrorResponse>""".format(message=escape(str(message)), code_string=code_string, req_id=short_uid())
    response.status_code = code
    return response


def _is_raw_delivery(subscriber):
    """Return True when the subscription has raw message delivery enabled.

    Accepts both the string ``'true'`` and the boolean ``True``; a missing
    attribute counts as disabled.
    """
    return subscriber.get('RawMessageDelivery') in ('true', True)


def create_sns_message_body(subscriber, req_data):
    """Return the body delivered to ``subscriber`` for a Publish request.

    :param subscriber: subscription dict (reads ``Protocol``, ``TopicArn`` and
        ``RawMessageDelivery``).
    :param req_data: parsed request form; each value is a list of strings.
    :returns: the raw message when raw delivery is enabled, otherwise a JSON
        string describing the notification.
    :raises Exception: when ``MessageStructure`` is ``json`` and the payload is
        not a JSON object containing a ``default`` key.
    """
    message = req_data['Message'][0]

    # Same raw-delivery test as create_sqs_message_attributes, so body and
    # attributes can never disagree about the delivery mode.
    if _is_raw_delivery(subscriber):
        return message

    if req_data.get('MessageStructure') == ['json']:
        payload = json.loads(message)
        # 'default' is mandatory even when a protocol-specific entry exists;
        # non-object payloads (lists, strings, numbers) get the same error
        # instead of leaking an AttributeError.
        if not isinstance(payload, dict) or 'default' not in payload:
            raise Exception("Unable to find 'default' key in message payload")
        message = payload.get(subscriber['Protocol'], payload['default'])

    data = {
        'MessageId': str(uuid.uuid4()),
        'Type': 'Notification',
        'Message': message,
        'TopicArn': subscriber['TopicArn'],
    }
    subject = req_data.get('Subject', [None])[0]
    if subject is not None:
        data['Subject'] = subject
    attributes = get_message_attributes(req_data)
    if attributes:
        data['MessageAttributes'] = attributes
    return json.dumps(data)


def create_sqs_message_attributes(subscriber, attributes):
    """Translate SNS message attributes into SQS ``MessageAttributes``.

    Attributes are only forwarded for raw-delivery subscriptions; otherwise
    (or when ``attributes`` is empty/``None``) an empty dict is returned.
    Each input value is a dict with ``Type`` and ``Value`` keys.
    """
    if not _is_raw_delivery(subscriber):
        return {}

    message_attributes = {}
    for key, value in (attributes or {}).items():
        # SQS carries binary payloads under BinaryValue, everything else
        # under StringValue.
        value_key = 'BinaryValue' if value['Type'] == 'Binary' else 'StringValue'
        message_attributes[key] = {'DataType': value['Type'], value_key: value['Value']}
    return message_attributes


# The excerpt is truncated inside the next definition; it is preserved as-is:
#
# def get_message_attributes(req_data):
#     attributes = {}
# ...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!