Best Python code snippet using localstack_python
test_sns.py
Source:test_sns.py
...472 for test in test_data:473 filter_policy = test[1]474 attributes = test[2]475 expected = test[3]476 assert expected == check_filter_policy(filter_policy, attributes)477 def test_is_raw_message_delivery(self, subscriber):478 valid_true_values = ["true", "True", True]479 for true_value in valid_true_values:480 subscriber["RawMessageDelivery"] = true_value481 assert is_raw_message_delivery(subscriber)482 def test_is_not_raw_message_delivery(self, subscriber):483 invalid_values = ["false", "False", False, "somevalue", ""]484 for invalid_values in invalid_values:485 subscriber["RawMessageDelivery"] = invalid_values486 assert not is_raw_message_delivery(subscriber)487 del subscriber["RawMessageDelivery"]...
sns_listener.py
Source:sns_listener.py
...57 sqs_client = aws_stack.connect_to_service('sqs')58 for subscriber in SNS_SUBSCRIPTIONS[topic_arn]:59 filter_policy = json.loads(subscriber.get('FilterPolicy', '{}'))60 message_attributes = get_message_attributes(req_data)61 if check_filter_policy(filter_policy, message_attributes):62 if subscriber['Protocol'] == 'sqs':63 endpoint = subscriber['Endpoint']64 if 'sqs_queue_url' in subscriber:65 queue_url = subscriber.get('sqs_queue_url')66 elif '://' in endpoint:67 queue_url = endpoint68 else:69 queue_name = endpoint.split(':')[5]70 queue_url = aws_stack.get_sqs_queue_url(queue_name)71 subscriber['sqs_queue_url'] = queue_url72 try:73 sqs_client.send_message(74 QueueUrl=queue_url,75 MessageBody=create_sns_message_body(subscriber, req_data)76 )77 except Exception as exc:78 return make_error(message=str(exc), code=400)79 elif subscriber['Protocol'] == 'lambda':80 lambda_api.process_sns_notification(81 subscriber['Endpoint'],82 topic_arn, message, subject=req_data.get('Subject', [None])[0]83 )84 elif subscriber['Protocol'] in ['http', 'https']:85 try:86 message_body = create_sns_message_body(subscriber, req_data)87 except Exception as exc:88 return make_error(message=str(exc), code=400)89 requests.post(90 subscriber['Endpoint'],91 headers={92 'Content-Type': 'text/plain',93 'x-amz-sns-message-type': 'Notification'94 },95 data=message_body96 )97 else:98 LOGGER.warning('Unexpected protocol "%s" for SNS subscription' % subscriber['Protocol'])99 # return response here because we do not want the request to be forwarded to SNS100 return make_response(req_action)101 return True102 def return_response(self, method, path, data, headers, response):103 # This method is executed by the proxy after we've already received a104 # response from the backend, hence we can utilize the "response" variable here105 if method == 'POST' and path == '/':106 req_data = urlparse.parse_qs(to_str(data))107 req_action = req_data['Action'][0]108 if req_action == 'Subscribe' and 
response.status_code < 400:109 response_data = xmltodict.parse(response.content)110 topic_arn = (req_data.get('TargetArn') or req_data.get('TopicArn'))[0]111 sub_arn = response_data['SubscribeResponse']['SubscribeResult']['SubscriptionArn']112 do_subscribe(topic_arn, req_data['Endpoint'][0], req_data['Protocol'][0], sub_arn)113# instantiate listener114UPDATE_SNS = ProxyListenerSNS()115def do_create_topic(topic_arn):116 if topic_arn not in SNS_SUBSCRIPTIONS:117 SNS_SUBSCRIPTIONS[topic_arn] = []118def do_subscribe(topic_arn, endpoint, protocol, subscription_arn):119 subscription = {120 # http://docs.aws.amazon.com/cli/latest/reference/sns/get-subscription-attributes.html121 'TopicArn': topic_arn,122 'Endpoint': endpoint,123 'Protocol': protocol,124 'SubscriptionArn': subscription_arn,125 'RawMessageDelivery': 'false'126 }127 SNS_SUBSCRIPTIONS[topic_arn].append(subscription)128def do_unsubscribe(subscription_arn):129 for topic_arn in SNS_SUBSCRIPTIONS:130 SNS_SUBSCRIPTIONS[topic_arn] = [131 sub for sub in SNS_SUBSCRIPTIONS[topic_arn]132 if sub['SubscriptionArn'] != subscription_arn133 ]134# ---------------135# HELPER METHODS136# ---------------137def get_topic_by_arn(topic_arn):138 if topic_arn in SNS_SUBSCRIPTIONS:139 return SNS_SUBSCRIPTIONS[topic_arn]140 else:141 return None142def get_subscription_by_arn(sub_arn):143 # TODO maintain separate map instead of traversing all items144 for key, subscriptions in SNS_SUBSCRIPTIONS.items():145 for sub in subscriptions:146 if sub['SubscriptionArn'] == sub_arn:147 return sub148def make_response(op_name, content=''):149 response = Response()150 if not content:151 content = '<MessageId>%s</MessageId>' % short_uid()152 response._content = """<{op_name}Response xmlns="http://sns.amazonaws.com/doc/2010-03-31/">153 <{op_name}Result>154 {content}155 </{op_name}Result>156 <ResponseMetadata><RequestId>{req_id}</RequestId></ResponseMetadata>157 </{op_name}Response>""".format(op_name=op_name, content=content, req_id=short_uid())158 
response.status_code = 200159 return response160def make_error(message, code=400, code_string='InvalidParameter'):161 response = Response()162 response._content = """<ErrorResponse xmlns="http://sns.amazonaws.com/doc/2010-03-31/"><Error>163 <Type>Sender</Type>164 <Code>{code_string}</Code>165 <Message>{message}</Message>166 </Error><RequestId>{req_id}</RequestId>167 </ErrorResponse>""".format(message=message, code_string=code_string, req_id=short_uid())168 response.status_code = code169 return response170def create_sns_message_body(subscriber, req_data):171 message = req_data['Message'][0]172 subject = req_data.get('Subject', [None])[0]173 protocol = subscriber['Protocol']174 if subscriber['RawMessageDelivery'] == 'true':175 return message176 if req_data.get('MessageStructure') == ['json']:177 message = json.loads(message)178 try:179 message = message.get(protocol, message['default'])180 except KeyError:181 raise Exception("Unable to find 'default' key in message payload")182 data = {}183 data['MessageId'] = str(uuid.uuid4())184 data['Type'] = 'Notification'185 data['Message'] = message186 data['TopicArn'] = subscriber['TopicArn']187 if subject is not None:188 data['Subject'] = subject189 attributes = get_message_attributes(req_data)190 if attributes:191 data['MessageAttributes'] = attributes192 return json.dumps(data)193def get_message_attributes(req_data):194 attributes = {}195 x = 1196 while True:197 name = req_data.get('MessageAttributes.entry.' + str(x) + '.Name', [None])[0]198 if name is not None:199 attribute = {}200 attribute['Type'] = req_data.get('MessageAttributes.entry.' + str(x) + '.Value.DataType', [None])[0]201 string_value = req_data.get('MessageAttributes.entry.' + str(x) + '.Value.StringValue', [None])[0]202 binary_value = req_data.get('MessageAttributes.entry.' 
+ str(x) + '.Value.BinaryValue', [None])[0]203 if string_value is not None:204 attribute['Value'] = string_value205 elif binary_value is not None:206 attribute['Value'] = binary_value207 attributes[name] = attribute208 x += 1209 else:210 break211 return attributes212def is_number(x):213 try:214 float(x)215 return True216 except ValueError:217 return False218def evaluate_numeric_condition(conditions, value):219 if not is_number(value):220 return False221 for i in range(0, len(conditions), 2):222 operator = conditions[i]223 operand = conditions[i + 1]224 if operator == '=':225 if value != operand:226 return False227 elif operator == '>':228 if value <= operand:229 return False230 elif operator == '<':231 if value >= operand:232 return False233 elif operator == '>=':234 if value < operand:235 return False236 elif operator == '<=':237 if value > operand:238 return False239 return True240def evaluate_condition(value, condition):241 if type(condition) is not dict:242 return value == condition243 elif condition.get('anything-but'):244 return value not in condition.get('anything-but')245 elif condition.get('prefix'):246 prefix = condition.get('prefix')247 return value.startswith(prefix)248 elif condition.get('numeric'):249 return evaluate_numeric_condition(condition.get('numeric'), value)250 return False251def evaluate_filter_policy_conditions(conditions, attribute):252 if type(conditions) is not list:253 conditions = [conditions]254 if attribute['Type'] == 'String.Array':255 values = ast.literal_eval(attribute['Value'])256 for value in values:257 for condition in conditions:258 if evaluate_condition(value, condition):259 return True260 else:261 for condition in conditions:262 if evaluate_condition(attribute['Value'], condition):263 return True264 return False265def check_filter_policy(filter_policy, message_attributes):266 if not filter_policy:267 return True268 for criteria in filter_policy:269 conditions = filter_policy.get(criteria)270 attribute = 
message_attributes.get(criteria)271 if attribute is None:272 return False273 if evaluate_filter_policy_conditions(conditions, attribute) is False:274 return False...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!