Best Python code snippet using localstack_python
provider.py
Source:provider.py
...853 if (854 queue.attributes855 and queue.attributes.get(QueueAttributeName.RedrivePolicy) is not None856 ):857 moved_to_dlq = self._dead_letter_check(queue, standard_message, context)858 if moved_to_dlq:859 continue860 # filter attributes861 if message_attribute_names:862 if "All" not in message_attribute_names:863 msg["MessageAttributes"] = {864 k: v865 for k, v in msg["MessageAttributes"].items()866 if k in message_attribute_names867 }868 # TODO: why is this called even if we receive "All" attributes?869 msg["MD5OfMessageAttributes"] = _create_message_attribute_hash(870 msg["MessageAttributes"]871 )872 else:873 del msg["MessageAttributes"]874 # add message to result875 messages.append(msg)876 num -= 1877 # TODO: how does receiving behave if the queue was deleted in the meantime?878 return ReceiveMessageResult(Messages=messages)879 def _dead_letter_check(880 self, queue: SqsQueue, std_m: SqsMessage, context: RequestContext881 ) -> bool:882 redrive_policy = json.loads(queue.attributes.get(QueueAttributeName.RedrivePolicy))883 # TODO: include the names of the dictionary sub - attributes in the autogenerated code?884 max_receive_count = redrive_policy["maxReceiveCount"]885 if std_m.receive_times > max_receive_count:886 dead_letter_target_arn = redrive_policy["deadLetterTargetArn"]887 dl_queue = self._require_queue_by_arn(dead_letter_target_arn)888 # TODO: this needs to be atomic?889 dead_message = std_m.message890 dl_queue.put(message=dead_message)891 queue.remove(std_m.message["ReceiptHandle"])892 return True893 else:...
test_sqs.py
Source:test_sqs.py
...28 md5_listener = get_message_attributes_md5(msg_attrs_listener)29 msg_attrs_provider = {"timestamp": {"StringValue": "1493147359900", "DataType": "Number"}}30 md5_provider = provider._create_message_attribute_hash(msg_attrs_provider)31 assert md5_provider == md5_listener32def test_handle_string_max_receive_count_in_dead_letter_check():33 # fmt: off34 policy = {"RedrivePolicy": "{\"deadLetterTargetArn\":\"arn:aws:sqs:us-east-1:000000000000:DeadLetterQueue\",\"maxReceiveCount\": \"5\" }"}35 # fmt: on36 queue = provider.SqsQueue("TestQueue", "us-east-1", "123456789", policy)37 sqs_message = provider.SqsMessage(Message(), {})38 result = provider.SqsProvider()._dead_letter_check(queue, sqs_message, None)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!