Best Python code snippet using localstack_python
sqs_listener.py
Source:sqs_listener.py
...233 return _get_attributes_forward_request(234 method, path, headers, req_data, forward_attrs235 )236 elif action == "TagQueue":237 req_data = self.fix_missing_tag_values(req_data)238 elif action == "CreateQueue":239 req_data = self.fix_missing_tag_values(req_data)240 def _is_fifo():241 for k, v in req_data.items():242 if v == "FifoQueue":243 return req_data[k.replace("Name", "Value")].lower() == "true"244 return False245 if req_data.get("QueueName").endswith(".fifo") and not _is_fifo():246 LOG.warn(247 'You are trying to create a queue ending in ".fifo". Please use the --attributes parameter to set FifoQueue appropriately.'248 )249 msg = "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"250 return make_requests_error(251 code=400, code_string="InvalidParameterValue", message=msg252 )253 changed_attrs = _fix_dlq_arn_in_attributes(req_data)254 if changed_attrs:255 return _get_attributes_forward_request(256 method, path, headers, req_data, changed_attrs257 )258 elif action == "DeleteQueue":259 queue_url = _queue_url(path, req_data, headers)260 QUEUE_ATTRIBUTES.pop(queue_url, None)261 sns_listener.unsubscribe_sqs_queue(queue_url)262 elif action == "ListDeadLetterSourceQueues":263 # TODO remove this function if we stop using ElasticMQ entirely264 queue_url = _queue_url(path, req_data, headers)265 if SQS_BACKEND_IMPL == "elasticmq":266 headers = {"content-type": "application/xhtml+xml"}267 content_str = _list_dead_letter_source_queues(QUEUE_ATTRIBUTES, queue_url)268 return requests_response(content_str, headers=headers)269 if "QueueName" in req_data:270 encoded_data = urlencode(req_data, doseq=True) if method == "POST" else ""271 modified_url = None272 if method == "GET":273 base_path = path.partition("?")[0]274 modified_url = "%s?%s" % (275 base_path,276 urlencode(req_data, doseq=True),277 )278 return Request(data=encoded_data, url=modified_url, headers=headers, method=method)279 return True280 def return_response(self, method, path, data, headers, response):281 # persist requests to disk282 super(ProxyListenerSQS, self).return_response(method, path, data, headers, response)283 if method == "OPTIONS" and path == "/":284 # Allow CORS preflight requests to succeed.285 return 200286 if method != "POST":287 return288 region_name = aws_stack.get_region()289 req_data = parse_request_data(method, path, data)290 action = req_data.get("Action")291 content_str = content_str_original = to_str(response.content)292 if response.status_code >= 400:293 return response294 _fire_event(req_data, response)295 # patch the response and add missing attributes296 if action == "GetQueueAttributes":297 content_str = _add_queue_attributes(path, req_data, content_str, headers)298 name = r"<Name>\s*RedrivePolicy\s*<\/Name>"299 value = r"<Value>\s*{(.*)}\s*<\/Value>"300 for p1, p2 in ((name, value), (value, name)):301 content_str = re.sub(302 r"<Attribute>\s*%s\s*%s\s*<\/Attribute>" % (p1, p2),303 _fix_redrive_policy,304 content_str,305 )306 # patch the response and return the correct endpoint URLs / ARNs307 if action in (308 "CreateQueue",309 "GetQueueUrl",310 "ListQueues",311 "GetQueueAttributes",312 "ListDeadLetterSourceQueues",313 ):314 if config.USE_SSL and "<QueueUrl>http://" in content_str:315 # return https://... if we're supposed to use SSL316 content_str = re.sub(r"<QueueUrl>\s*http://", r"<QueueUrl>https://", content_str)317 # expose external hostname:port318 external_port = SQS_PORT_EXTERNAL or get_external_port(headers)319 content_str = re.sub(320 r"<QueueUrl>\s*([a-z]+)://[^<]*:([0-9]+)/([^<]*)\s*</QueueUrl>",321 r"<QueueUrl>\1://%s:%s/\3</QueueUrl>" % (config.HOSTNAME_EXTERNAL, external_port),322 content_str,323 )324 # encode account ID in queue URL325 content_str = re.sub(326 r"<QueueUrl>\s*([a-z]+)://([^/]+)/queue/([^<]*)\s*</QueueUrl>",327 r"<QueueUrl>\1://\2/%s/\3</QueueUrl>" % constants.TEST_AWS_ACCOUNT_ID,328 content_str,329 )330 # fix queue ARN331 content_str = re.sub(332 r"<([a-zA-Z0-9]+)>\s*arn:aws:sqs:elasticmq:([^<]+)</([a-zA-Z0-9]+)>",333 r"<\1>arn:aws:sqs:%s:\2</\3>" % region_name,334 content_str,335 )336 if action == "CreateQueue":337 regex = r".*<QueueUrl>(.*)</QueueUrl>"338 queue_url = re.match(regex, content_str, re.DOTALL).group(1)339 if SQS_BACKEND_IMPL == "elasticmq":340 _set_queue_attributes(queue_url, req_data)341 elif action == "SendMessageBatch":342 if validate_empty_message_batch(data, req_data):343 msg = "There should be at least one SendMessageBatchRequestEntry in the request."344 return make_requests_error(code=404, code_string="EmptyBatchRequest", message=msg)345 if content_str_original != content_str:346 # if changes have been made, return patched response347 response.headers["Content-Length"] = len(content_str)348 response.headers["x-amz-crc32"] = calculate_crc32(content_str)349 return requests_response(350 content_str, headers=response.headers, status_code=response.status_code351 )352 @classmethod353 # TODO still needed? (can probably be removed)354 def get_message_attributes_md5(cls, req_data):355 req_data = clone(req_data)356 orig_types = {}357 for key, entry in dict(req_data).items():358 # Fix an issue in moto where data types like 'Number.java.lang.Integer' are359 # not supported: Keep track of the original data type, and temporarily change360 # it to the short form (e.g., 'Number'), before changing it back again.361 if key.endswith("DataType"):362 parts = entry.split(".")363 if len(parts) > 2:364 short_type_name = parts[0]365 full_type_name = entry366 attr_num = key.split(".")[1]367 attr_name = req_data["MessageAttribute.%s.Name" % attr_num]368 orig_types[attr_name] = full_type_name369 req_data[key] = [short_type_name]370 if full_type_name not in TRANSPORT_TYPE_ENCODINGS:371 TRANSPORT_TYPE_ENCODINGS[full_type_name] = TRANSPORT_TYPE_ENCODINGS[372 short_type_name373 ]374 # moto parse_message_attributes(..) expects params to be passed as dict of lists375 req_data_lists = {k: [v] for k, v in req_data.items()}376 moto_message = Message("dummy_msg_id", "dummy_body")377 moto_message.message_attributes = parse_message_attributes(req_data_lists)378 for key, data_type in orig_types.items():379 moto_message.message_attributes[key]["data_type"] = data_type380 message_attr_hash = moto_message.attribute_md5381 return message_attr_hash382 # Fixes tags with empty strings as value383 def fix_missing_tag_values(self, req_data):384 keys_matched = []385 for k, v in req_data.items():386 match = re.match(r"^Tag\.(\d+)\.Key", k)387 if match:388 index = match.group(1)389 tag_val = "Tag.{}.Value".format(index)390 if tag_val not in req_data.keys():391 keys_matched.append(tag_val)392 if keys_matched:393 for tag_val in keys_matched:394 req_data[tag_val] = ""395 return req_data396# instantiate listener397UPDATE_SQS = ProxyListenerSQS()
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!