Best Python code snippet using localstack_python
sqs_listener.py
Source:sqs_listener.py
# NOTE: this excerpt starts part-way through a function whose `def` line is not
# shown. Its visible tail is reproduced below as reference text only:
#
#     QUEUE_ATTRIBUTES[queue_url] = QUEUE_ATTRIBUTES.get(queue_url) or {}
#     QUEUE_ATTRIBUTES[queue_url].update(local_attrs)
#     forward_attrs = {k: v for k, v in attrs.items() if k not in UNSUPPORTED_ATTRIBUTE_NAMES}
#     return forward_attrs


def _fix_dlq_arn_in_attributes(req_data):
    """Convert queue URL to ARN for DLQ in redrive policy config.

    The attributes are obtained from ``_format_attributes(req_data)``. When the
    ``RedrivePolicy`` attribute names its dead-letter target by URL (the value
    contains ``://``), the target is replaced by the result of
    ``aws_stack.sqs_queue_arn`` and the updated attributes are returned.

    NOTE(review): indentation was lost in this excerpt; the ``return`` is placed
    inside the ``if`` because the caller tests ``if changed_attrs:`` - confirm
    against the upstream file. With this placement the function returns
    ``None`` when nothing had to be converted.
    """
    attrs = _format_attributes(req_data)
    policy = json.loads(attrs.get("RedrivePolicy") or "{}")
    # `or ""` guards against an explicit JSON null target, which would
    # otherwise make the `in` test below raise TypeError.
    dlq_arn = policy.get("deadLetterTargetArn") or ""
    if "://" in dlq_arn:
        # convert queue URL to queue ARN
        policy["deadLetterTargetArn"] = aws_stack.sqs_queue_arn(dlq_arn)
        attrs["RedrivePolicy"] = json.dumps(policy)
        return attrs
    return None


def _fix_redrive_policy(match):
    """Build a ``RedrivePolicy`` XML attribute from a regex match.

    Group 1 of ``match`` is stripped of all spaces and wrapped in braces.
    """
    compact_policy = match.group(1).replace(" ", "")
    return (
        "<Attribute><Name>RedrivePolicy</Name><Value>{%s}</Value></Attribute>" % compact_policy
    )


def _add_queue_attributes(path, req_data, content_str, headers):
    """Append locally stored queue attributes to a GetQueueAttributes response.

    Only active when ``SQS_BACKEND_IMPL`` is ``"elasticmq"``; otherwise (or when
    ``content_str`` contains no ``<GetQueueAttributesResult>`` element) the
    content is returned unchanged.

    For every attribute stored in ``QUEUE_ATTRIBUTES`` for the queue, an
    ``<Attribute>`` element is appended if the attribute was requested (no
    explicit attribute names, ``All``, or the attribute's own name) and is not
    already present in the response.
    """
    # TODO remove this function if we stop using ElasticMQ entirely
    if SQS_BACKEND_IMPL != "elasticmq":
        return content_str
    flags = re.MULTILINE | re.DOTALL
    queue_url = _queue_url(path, req_data, headers)
    requested_attributes = _format_attributes_names(req_data)
    regex = r"(.*<GetQueueAttributesResult>)(.*)(</GetQueueAttributesResult>.*)"
    parts = re.match(regex, content_str, flags=flags)
    if not parts:
        # Nothing to extend (e.g. an error response); substituting on a
        # non-matching string would otherwise repeat the whole content.
        return content_str
    prefix, attrs, suffix = parts.groups()
    for key, value in QUEUE_ATTRIBUTES.get(queue_url, {}).items():
        is_requested = not requested_attributes or requested_attributes.intersection(
            {"All", key}
        )
        # re.search (not re.match): the <Name> element may appear anywhere in
        # the result body, not only at its very beginning.
        is_present = re.search(r"<Name>\s*%s\s*</Name>" % re.escape(key), attrs, flags=flags)
        if is_requested and not is_present:
            attrs += "<Attribute><Name>%s</Name><Value>%s</Value></Attribute>" % (
                key,
                value,
            )
    return prefix + attrs + suffix


def _fire_event(req_data, response):
    """Publish an analytics event for CreateQueue / DeleteQueue requests.

    The queue URL is read from the parsed XML response for ``CreateQueue`` and
    from the request's ``QueueUrl`` for ``DeleteQueue``. The event payload
    carries ``event_publisher.get_hash(queue_url)``. Nothing is fired for other
    actions or when no queue URL could be determined.
    """
    action = req_data.get("Action")
    event_type = None
    queue_url = None
    if action == "CreateQueue":
        event_type = event_publisher.EVENT_SQS_CREATE_QUEUE
        response_data = xmltodict.parse(response.content)
        if "CreateQueueResponse" in response_data:
            queue_url = response_data["CreateQueueResponse"]["CreateQueueResult"]["QueueUrl"]
    elif action == "DeleteQueue":
        event_type = event_publisher.EVENT_SQS_DELETE_QUEUE
        queue_url = req_data.get("QueueUrl")
    if event_type and queue_url:
        event_publisher.fire_event(event_type, payload={"u": event_publisher.get_hash(queue_url)})


def _queue_url(path, req_data, headers):
    """Return the queue URL addressed by a request.

    Uses the request's ``QueueUrl`` when present. Otherwise the URL is built
    from the ``Host`` header (or ``config.service_url("sqs")`` when there is no
    ``Host`` header) plus the request path without its query string.
    """
    queue_url = req_data.get("QueueUrl")
    if queue_url:
        return queue_url
    url = config.service_url("sqs")
    if headers.get("Host"):
        url = "%s://%s" % (get_service_protocol(), headers["Host"])
    queue_url = "%s%s" % (url, path.partition("?")[0])
    return queue_url


def _list_dead_letter_source_queues(queues, queue_url):
    """Return the XML response listing queues that use ``queue_url`` as their DLQ.

    ``queues`` maps a queue URL to its attribute dict. A queue is a source
    queue when the ``deadLetterTargetArn`` of its JSON ``RedrivePolicy`` ends
    in the name of the given queue (the last path segment of ``queue_url``).
    """
    queue_name = queue_url.split("/")[-1]
    source_queues = []
    for source_url, attributes in queues.items():
        redrive_policy = attributes.get("RedrivePolicy")
        if not redrive_policy:
            continue
        target = json.loads(redrive_policy).get("deadLetterTargetArn") or ""
        # Compare the final segment exactly (ARNs are ':'-separated, URLs
        # '/'-separated); a substring test would also match e.g. "foo-bar"
        # when looking for the queue "foo".
        if queue_name and re.split(r"[:/]", target)[-1] == queue_name:
            source_queues.append(source_url)
    return format_list_dl_source_queues_response(source_queues)


def format_list_dl_source_queues_response(queues):
    """Render a ``ListDeadLetterSourceQueuesResponse`` XML document.

    ``queues`` is an iterable of queue URLs; each becomes one ``<QueueUrl>``
    element. The namespace is taken from ``XMLNS_SQS``.
    """
    content_str = """<ListDeadLetterSourceQueuesResponse xmlns="{}">
                        <ListDeadLetterSourceQueuesResult>
                        {}
                        </ListDeadLetterSourceQueuesResult>
                    </ListDeadLetterSourceQueuesResponse>"""
    queue_urls = "".join("<QueueUrl>{}</QueueUrl>".format(q) for q in queues)
    return content_str.format(XMLNS_SQS, queue_urls)


# extract the external port used by the client to make the request
def get_external_port(headers):
    """Return the port the client used, derived from the request headers.

    The ``Host`` header is preferred; without it, an entry of
    ``X-Forwarded-For`` is used. If no numeric port can be extracted, the
    configured SQS service port is returned.
    """
    host = headers.get("Host", "")
    if not host:
        forwarded = headers.get("X-Forwarded-For", "").split(",")
        host = forwarded[-2] if len(forwarded) > 2 else forwarded[-1]
    host = host.strip()
    if host.startswith("["):
        # bracketed IPv6 literal, e.g. "[::1]:4566"
        port = host.partition("]:")[2]
    elif host.count(":") == 1:
        port = host.partition(":")[2]
    else:
        # no port at all, or a bare IPv6 address whose colons are not port separators
        port = ""
    if port.isdecimal():
        return int(port)
    # If we cannot find the Host header, then fall back to the port of SQS itself (i.e., edge proxy).
    # (Note that this could be incorrect, e.g., if running in Docker with a host port that
    # is different from the internal container port, but there is not much else we can do.)
    return config.service_port("sqs")


def validate_empty_message_batch(data, req_data):
    """Return True if the raw payload mentions ``Entries=`` but no entries were parsed.

    ``data`` is the raw request body; ``req_data`` is the parsed request.
    """
    has_entries_marker = "Entries=" in to_str(data)
    return has_entries_marker and not req_data.get("Entries")


class ProxyListenerSQS(PersistingProxyListener):
    """Proxy listener for the SQS API.

    NOTE: only the beginning of this class is visible in the excerpt.
    """

    def api_name(self):
        """Return the API name handled by this listener."""
        return "sqs"

    def forward_request(self, method, path, data, headers):
        """Inspect (and possibly rewrite or answer) an incoming SQS request.

        Visible behavior: ``OPTIONS`` requests are answered with 200; a ``GET``
        on a queue URL is turned into a ``GetQueueUrl`` POST; several actions
        get backend-specific pre-processing (see the inline comments).

        NOTE: the remainder of this method is cut off in the excerpt.
        """
        if method == "OPTIONS":
            return 200
        req_data = parse_request_data(method, path, data)
        if is_sqs_queue_url(path) and method == "GET":
            if not headers.get("Authorization"):
                headers["Authorization"] = aws_stack.mock_aws_request_headers(service="sqs")[
                    "Authorization"
                ]
            method = "POST"
            req_data = {
                "Action": "GetQueueUrl",
                "Version": API_VERSION,
                "QueueName": path.split("/")[-1],
            }
        if req_data:
            action = req_data.get("Action")
            if action in ("SendMessage", "SendMessageBatch") and SQS_BACKEND_IMPL == "moto":
                # check message contents
                for value in req_data.values():
                    if not re.match(MSG_CONTENT_REGEX, str(value)):
                        return make_requests_error(
                            code=400,
                            code_string="InvalidMessageContents",
                            message="Message contains invalid characters",
                        )
            elif action == "SetQueueAttributes":
                # TODO remove this function if we stop using ElasticMQ
                queue_url = _queue_url(path, req_data, headers)
                if SQS_BACKEND_IMPL == "elasticmq":
                    forward_attrs = _set_queue_attributes(queue_url, req_data)
                    if len(req_data) != len(forward_attrs):
                        # make sure we only forward the supported attributes to the backend
                        return _get_attributes_forward_request(
                            method, path, headers, req_data, forward_attrs
                        )
            elif action == "TagQueue":
                req_data = self.fix_missing_tag_values(req_data)
            elif action == "CreateQueue":
                req_data = self.fix_missing_tag_values(req_data)

                def _is_fifo():
                    # Find the attribute entry named "FifoQueue" and read the
                    # matching "...Value" entry.
                    for k, v in req_data.items():
                        if v == "FifoQueue":
                            return req_data[k.replace("Name", "Value")].lower() == "true"
                    return False

                # `or ""` keeps a request without QueueName from raising AttributeError here.
                if (req_data.get("QueueName") or "").endswith(".fifo") and not _is_fifo():
                    # Logger.warn is a deprecated alias (removed in Python 3.13).
                    LOG.warning(
                        'You are trying to create a queue ending in ".fifo". '
                        "Please use the --attributes parameter to set FifoQueue appropriately."
                    )
                    msg = "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"
                    return make_requests_error(
                        code=400, code_string="InvalidParameterValue", message=msg
                    )
                changed_attrs = _fix_dlq_arn_in_attributes(req_data)
                if changed_attrs:
                    return _get_attributes_forward_request(
                        method, path, headers, req_data, changed_attrs
                    )
            elif action == "DeleteQueue":
                queue_url = _queue_url(path, req_data, headers)
                QUEUE_ATTRIBUTES.pop(queue_url, None)
                sns_listener.unsubscribe_sqs_queue(queue_url)
            elif action == "ListDeadLetterSourceQueues":
                # TODO remove this function if we stop using ElasticMQ entirely
                queue_url = _queue_url(path, req_data, headers)
                if SQS_BACKEND_IMPL == "elasticmq":
                    headers = {"content-type": "application/xhtml+xml"}
                    content_str = _list_dead_letter_source_queues(QUEUE_ATTRIBUTES, queue_url)
                    # ... (the remainder of this method is truncated in the excerpt)
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!