Best Python code snippet using localstack_python
Source: sns_listener.py
...
                        try:
                            sqs_client.send_message(
                                QueueUrl=queue_url,
                                MessageBody=create_sns_message_body(subscriber, req_data),
                                MessageAttributes=create_sqs_message_attributes(subscriber, message_attributes)
                            )
                        except Exception as exc:
                            return make_error(message=str(exc), code=400)
                    elif subscriber['Protocol'] == 'lambda':
                        lambda_api.process_sns_notification(
                            subscriber['Endpoint'],
                            topic_arn, message, subject=req_data.get('Subject', [None])[0]
                        )
                    elif subscriber['Protocol'] in ['http', 'https']:
                        try:
                            message_body = create_sns_message_body(subscriber, req_data)
                        except Exception as exc:
                            return make_error(message=str(exc), code=400)
                        requests.post(
                            subscriber['Endpoint'],
                            headers={
                                'Content-Type': 'text/plain',
                                'x-amz-sns-message-type': 'Notification'
                            },
                            data=message_body
                        )
                    else:
                        LOGGER.warning('Unexpected protocol "%s" for SNS subscription' % subscriber['Protocol'])
                # return response here because we do not want the request to be forwarded to SNS
                return make_response(req_action)
        return True

    def return_response(self, method, path, data, headers, response):
        # This method is executed by the proxy after we've already received a
        # response from the backend, hence we can utilize the "response" variable here
        if method == 'POST' and path == '/':
            req_data = urlparse.parse_qs(to_str(data))
            req_action = req_data['Action'][0]
            if req_action == 'Subscribe' and response.status_code < 400:
                response_data = xmltodict.parse(response.content)
                topic_arn = (req_data.get('TargetArn') or req_data.get('TopicArn'))[0]
                sub_arn = response_data['SubscribeResponse']['SubscribeResult']['SubscriptionArn']
                do_subscribe(topic_arn, req_data['Endpoint'][0], req_data['Protocol'][0], sub_arn)


# instantiate listener
UPDATE_SNS = ProxyListenerSNS()


def do_create_topic(topic_arn):
    if topic_arn not in SNS_SUBSCRIPTIONS:
        SNS_SUBSCRIPTIONS[topic_arn] = []


def do_delete_topic(topic_arn):
    if topic_arn in SNS_SUBSCRIPTIONS:
        del SNS_SUBSCRIPTIONS[topic_arn]


def do_subscribe(topic_arn, endpoint, protocol, subscription_arn):
    subscription = {
        # http://docs.aws.amazon.com/cli/latest/reference/sns/get-subscription-attributes.html
        'TopicArn': topic_arn,
        'Endpoint': endpoint,
        'Protocol': protocol,
        'SubscriptionArn': subscription_arn,
        'RawMessageDelivery': 'false'
    }
    SNS_SUBSCRIPTIONS[topic_arn].append(subscription)


def do_unsubscribe(subscription_arn):
    for topic_arn in SNS_SUBSCRIPTIONS:
        SNS_SUBSCRIPTIONS[topic_arn] = [
            sub for sub in SNS_SUBSCRIPTIONS[topic_arn]
            if sub['SubscriptionArn'] != subscription_arn
        ]


# ---------------
# HELPER METHODS
# ---------------

def get_topic_by_arn(topic_arn):
    if topic_arn in SNS_SUBSCRIPTIONS:
        return SNS_SUBSCRIPTIONS[topic_arn]
    else:
        return None


def get_subscription_by_arn(sub_arn):
    # TODO maintain separate map instead of traversing all items
    for key, subscriptions in SNS_SUBSCRIPTIONS.items():
        for sub in subscriptions:
            if sub['SubscriptionArn'] == sub_arn:
                return sub


def make_response(op_name, content=''):
    response = Response()
    if not content:
        content = '<MessageId>%s</MessageId>' % short_uid()
    response._content = """<{op_name}Response xmlns="http://sns.amazonaws.com/doc/2010-03-31/">
        <{op_name}Result>
            {content}
        </{op_name}Result>
        <ResponseMetadata><RequestId>{req_id}</RequestId></ResponseMetadata>
        </{op_name}Response>""".format(op_name=op_name, content=content, req_id=short_uid())
    response.status_code = 200
    return response


def make_error(message, code=400, code_string='InvalidParameter'):
    response = Response()
    response._content = """<ErrorResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/"><Error>
        <Type>Sender</Type>
        <Code>{code_string}</Code>
        <Message>{message}</Message>
        </Error><RequestId>{req_id}</RequestId>
        </ErrorResponse>""".format(message=message, code_string=code_string, req_id=short_uid())
    response.status_code = code
    return response


def create_sns_message_body(subscriber, req_data):
    message = req_data['Message'][0]
    subject = req_data.get('Subject', [None])[0]
    protocol = subscriber['Protocol']
    if subscriber['RawMessageDelivery'] == 'true':
        return message
    if req_data.get('MessageStructure') == ['json']:
        message = json.loads(message)
        try:
            message = message.get(protocol, message['default'])
        except KeyError:
            raise Exception("Unable to find 'default' key in message payload")
    data = {}
    data['MessageId'] = str(uuid.uuid4())
    data['Type'] = 'Notification'
    data['Message'] = message
    data['TopicArn'] = subscriber['TopicArn']
    if subject is not None:
        data['Subject'] = subject
    attributes = get_message_attributes(req_data)
    if attributes:
        data['MessageAttributes'] = attributes
    return json.dumps(data)


def create_sqs_message_attributes(subscriber, attributes):
    if subscriber.get('RawMessageDelivery') not in ('true', True):
        return {}
    message_attributes = {}
    for key, value in attributes.items():
        attribute = {}
        attribute['DataType'] = value['Type']
        if value['Type'] == 'Binary':
            attribute['BinaryValue'] = value['Value']
        else:
            attribute['StringValue'] = value['Value']
        message_attributes[key] = attribute
    return message_attributes


def get_message_attributes(req_data):
    attributes = {}
...
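The listing depends on module-level state and imports that are elided, so it cannot be run on its own. The sketch below re-implements two pieces of its logic as standalone functions so they can be tried in isolation: the per-protocol message selection from create_sns_message_body and the attribute mapping from create_sqs_message_attributes. The names select_message_for_protocol and to_sqs_attributes are hypothetical and are not part of localstack. The sketch also raises ValueError where the listing raises a bare Exception.

```python
import json


def select_message_for_protocol(message, protocol, message_structure=None):
    """Pick the message text for one subscriber protocol.

    Hypothetical helper mirroring the MessageStructure == 'json' branch
    of create_sns_message_body in the listing above.
    """
    if message_structure != 'json':
        # Plain messages are delivered unchanged to every protocol.
        return message
    payload = json.loads(message)
    try:
        # payload['default'] is evaluated before .get() runs, so a missing
        # 'default' key fails even when a protocol-specific entry exists.
        return payload.get(protocol, payload['default'])
    except KeyError:
        raise ValueError("Unable to find 'default' key in message payload")


def to_sqs_attributes(attributes, raw_delivery):
    """Map SNS-style attributes to the SQS MessageAttributes shape.

    Hypothetical helper mirroring create_sqs_message_attributes: attributes
    are only forwarded when raw message delivery is enabled.
    """
    if raw_delivery not in ('true', True):
        return {}
    result = {}
    for key, value in attributes.items():
        field = 'BinaryValue' if value['Type'] == 'Binary' else 'StringValue'
        result[key] = {'DataType': value['Type'], field: value['Value']}
    return result


if __name__ == '__main__':
    body = json.dumps({'default': 'hello', 'sqs': 'hello queue'})
    print(select_message_for_protocol(body, 'sqs', 'json'))
    print(select_message_for_protocol(body, 'http', 'json'))
    print(to_sqs_attributes({'k': {'Type': 'String', 'Value': 'v'}}, 'true'))
```

Because the fallback is passed as an argument to .get(), a JSON-structured message without a 'default' key is rejected even when it contains an entry for the subscriber's protocol. With raw delivery disabled, the attribute mapping returns an empty dict; in the listing the attributes are embedded in the JSON body under 'MessageAttributes' in that case.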