Best Python code snippet using localstack_python
sqs_listener.py
Source:sqs_listener.py
...217 if action in ("SendMessage", "SendMessageBatch") and SQS_BACKEND_IMPL == "moto":218 # check message contents219 for key, value in req_data.items():220 if not re.match(MSG_CONTENT_REGEX, str(value)):221 return make_requests_error(222 code=400,223 code_string="InvalidMessageContents",224 message="Message contains invalid characters",225 )226 elif action == "SetQueueAttributes":227 # TODO remove this function if we stop using ElasticMQ228 queue_url = _queue_url(path, req_data, headers)229 if SQS_BACKEND_IMPL == "elasticmq":230 forward_attrs = _set_queue_attributes(queue_url, req_data)231 if len(req_data) != len(forward_attrs):232 # make sure we only forward the supported attributes to the backend233 return _get_attributes_forward_request(234 method, path, headers, req_data, forward_attrs235 )236 elif action == "TagQueue":237 req_data = self.fix_missing_tag_values(req_data)238 elif action == "CreateQueue":239 req_data = self.fix_missing_tag_values(req_data)240 def _is_fifo():241 for k, v in req_data.items():242 if v == "FifoQueue":243 return req_data[k.replace("Name", "Value")].lower() == "true"244 return False245 if req_data.get("QueueName").endswith(".fifo") and not _is_fifo():246 LOG.warn(247 'You are trying to create a queue ending in ".fifo". Please use the --attributes parameter to set FifoQueue appropriately.'248 )249 msg = "Can only include alphanumeric characters, hyphens, or underscores. 
1 to 80 in length"250 return make_requests_error(251 code=400, code_string="InvalidParameterValue", message=msg252 )253 changed_attrs = _fix_dlq_arn_in_attributes(req_data)254 if changed_attrs:255 return _get_attributes_forward_request(256 method, path, headers, req_data, changed_attrs257 )258 elif action == "DeleteQueue":259 queue_url = _queue_url(path, req_data, headers)260 QUEUE_ATTRIBUTES.pop(queue_url, None)261 sns_listener.unsubscribe_sqs_queue(queue_url)262 elif action == "ListDeadLetterSourceQueues":263 # TODO remove this function if we stop using ElasticMQ entirely264 queue_url = _queue_url(path, req_data, headers)265 if SQS_BACKEND_IMPL == "elasticmq":266 headers = {"content-type": "application/xhtml+xml"}267 content_str = _list_dead_letter_source_queues(QUEUE_ATTRIBUTES, queue_url)268 return requests_response(content_str, headers=headers)269 if "QueueName" in req_data:270 encoded_data = urlencode(req_data, doseq=True) if method == "POST" else ""271 modified_url = None272 if method == "GET":273 base_path = path.partition("?")[0]274 modified_url = "%s?%s" % (275 base_path,276 urlencode(req_data, doseq=True),277 )278 return Request(data=encoded_data, url=modified_url, headers=headers, method=method)279 return True280 def return_response(self, method, path, data, headers, response):281 # persist requests to disk282 super(ProxyListenerSQS, self).return_response(method, path, data, headers, response)283 if method == "OPTIONS" and path == "/":284 # Allow CORS preflight requests to succeed.285 return 200286 if method != "POST":287 return288 region_name = aws_stack.get_region()289 req_data = parse_request_data(method, path, data)290 action = req_data.get("Action")291 content_str = content_str_original = to_str(response.content)292 if response.status_code >= 400:293 return response294 _fire_event(req_data, response)295 # patch the response and add missing attributes296 if action == "GetQueueAttributes":297 content_str = _add_queue_attributes(path, req_data, 
content_str, headers)298 name = r"<Name>\s*RedrivePolicy\s*<\/Name>"299 value = r"<Value>\s*{(.*)}\s*<\/Value>"300 for p1, p2 in ((name, value), (value, name)):301 content_str = re.sub(302 r"<Attribute>\s*%s\s*%s\s*<\/Attribute>" % (p1, p2),303 _fix_redrive_policy,304 content_str,305 )306 # patch the response and return the correct endpoint URLs / ARNs307 if action in (308 "CreateQueue",309 "GetQueueUrl",310 "ListQueues",311 "GetQueueAttributes",312 "ListDeadLetterSourceQueues",313 ):314 if config.USE_SSL and "<QueueUrl>http://" in content_str:315 # return https://... if we're supposed to use SSL316 content_str = re.sub(r"<QueueUrl>\s*http://", r"<QueueUrl>https://", content_str)317 # expose external hostname:port318 external_port = SQS_PORT_EXTERNAL or get_external_port(headers)319 content_str = re.sub(320 r"<QueueUrl>\s*([a-z]+)://[^<]*:([0-9]+)/([^<]*)\s*</QueueUrl>",321 r"<QueueUrl>\1://%s:%s/\3</QueueUrl>" % (config.HOSTNAME_EXTERNAL, external_port),322 content_str,323 )324 # encode account ID in queue URL325 content_str = re.sub(326 r"<QueueUrl>\s*([a-z]+)://([^/]+)/queue/([^<]*)\s*</QueueUrl>",327 r"<QueueUrl>\1://\2/%s/\3</QueueUrl>" % constants.TEST_AWS_ACCOUNT_ID,328 content_str,329 )330 # fix queue ARN331 content_str = re.sub(332 r"<([a-zA-Z0-9]+)>\s*arn:aws:sqs:elasticmq:([^<]+)</([a-zA-Z0-9]+)>",333 r"<\1>arn:aws:sqs:%s:\2</\3>" % region_name,334 content_str,335 )336 if action == "CreateQueue":337 regex = r".*<QueueUrl>(.*)</QueueUrl>"338 queue_url = re.match(regex, content_str, re.DOTALL).group(1)339 if SQS_BACKEND_IMPL == "elasticmq":340 _set_queue_attributes(queue_url, req_data)341 elif action == "SendMessageBatch":342 if validate_empty_message_batch(data, req_data):343 msg = "There should be at least one SendMessageBatchRequestEntry in the request."344 return make_requests_error(code=404, code_string="EmptyBatchRequest", message=msg)345 if content_str_original != content_str:346 # if changes have been made, return patched response347 
response.headers["Content-Length"] = len(content_str)348 response.headers["x-amz-crc32"] = calculate_crc32(content_str)349 return requests_response(350 content_str, headers=response.headers, status_code=response.status_code351 )352 @classmethod353 # TODO still needed? (can probably be removed)354 def get_message_attributes_md5(cls, req_data):355 req_data = clone(req_data)356 orig_types = {}357 for key, entry in dict(req_data).items():358 # Fix an issue in moto where data types like 'Number.java.lang.Integer' are...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!