Best Python code snippet using localstack_python
test_sqs.py
Source:test_sqs.py
...591 response = sqs.get_queue_attributes(QueueUrl=queue_url2, AttributeNames=["All"])592 redrive_policy = json.loads(response["Attributes"]["RedrivePolicy"])593 self.assertEqual(redrive_policy["maxReceiveCount"], 1)594 self.assertIn(redrive_policy["deadLetterTargetArn"], queue_arn1)595 def test_send_message_batch_with_empty_list(self):596 client = self.client597 response = client.create_queue(QueueName="test-queue")598 queue_url = response["QueueUrl"]599 try:600 client.send_message_batch(QueueUrl=queue_url, Entries=[])601 except ClientError as e:602 self.assertEqual(e.response["Error"]["Code"], "EmptyBatchRequest")603 self.assertIn(e.response["ResponseMetadata"]["HTTPStatusCode"], [400, 404])604 entries = [605 {606 "Id": "message{:02d}".format(0),607 "MessageBody": "msgBody{:02d}".format(0),608 "MessageAttributes": {609 "CustomAttribute": {610 "DataType": "String",611 "StringValue": "CustomAttributeValue{:02d}".format(0),612 }613 },614 }615 ]616 result = client.send_message_batch(QueueUrl=queue_url, Entries=entries)617 self.assertEqual(result["ResponseMetadata"]["HTTPStatusCode"], 200)618 # clean up619 client.delete_queue(QueueUrl=queue_url)620 def _run_test_lambda_invoked_by_sqs_message_with_delay_seconds_dotnet(621 self, zip_file, handler, runtime622 ):623 if not use_docker():624 return625 func_name = "dotnet-sqs-{}".format(short_uid())626 queue_name = "queue-%s" % short_uid()627 queue_url = self.client.create_queue(QueueName=queue_name)["QueueUrl"]628 queue_arn = aws_stack.sqs_queue_arn(queue_name)629 delay_time = 1630 testutil.create_lambda_function(631 func_name=func_name, zip_file=zip_file, handler=handler, runtime=runtime632 )633 lambda_client = aws_stack.connect_to_service("lambda")634 lambda_client.create_event_source_mapping(EventSourceArn=queue_arn, FunctionName=func_name)635 self.client.send_message(636 QueueUrl=queue_url, MessageBody="hello world.", DelaySeconds=delay_time637 )638 # assert that the Lambda has been invoked639 def get_logs():640 logs = get_lambda_log_events(func_name)641 self.assertGreater(len(logs), 0)642 retry(get_logs, retries=5, sleep=3)643 # assert that the message has been deleted from the queue644 resp = self.client.receive_message(QueueUrl=queue_url, MessageAttributeNames=["All"])645 self.assertEqual(200, resp["ResponseMetadata"]["HTTPStatusCode"])646 self.assertEqual(None, resp.get("Messages", None))647 # clean up648 self.client.delete_queue(QueueUrl=queue_url)649 testutil.delete_lambda_function(func_name)650 def test_lambda_invoked_by_sqs_message_with_delay_seconds_dotnetcore2(self):651 zip_file = load_file(TEST_LAMBDA_DOTNETCORE2, mode="rb")652 handler = "DotNetCore2::DotNetCore2.Lambda.Function::SimpleFunctionHandler"653 self._run_test_lambda_invoked_by_sqs_message_with_delay_seconds_dotnet(654 zip_file, handler, LAMBDA_RUNTIME_DOTNETCORE2655 )656 def test_lambda_invoked_by_sqs_message_with_delay_seconds_dotnetcore31(self):657 zip_file = load_file(TEST_LAMBDA_DOTNETCORE31, mode="rb")658 handler = "dotnetcore31::dotnetcore31.Function::FunctionHandler"659 self._run_test_lambda_invoked_by_sqs_message_with_delay_seconds_dotnet(660 zip_file, handler, LAMBDA_RUNTIME_DOTNETCORE31661 )662 def _run_test_fifo_queue_send_multiple_messages(self):663 fifo_queue = "queue-{}.fifo".format(short_uid())664 message_group = "group-%s" % short_uid()665 results = []666 number_of_messages = 5667 queue_url = self.client.create_queue(668 QueueName=fifo_queue, Attributes={"FifoQueue": "true"}669 )["QueueUrl"]670 # it should preserve .fifo in the queue name671 self.assertIn(fifo_queue, queue_url)672 # try sending multiple message with message group ID and deduplication ID673 for i in range(number_of_messages):674 rs = self.client.send_message(675 QueueUrl=queue_url,676 MessageBody="message-{}".format(i),677 MessageDeduplicationId="deduplication-{}".format(i),678 MessageGroupId=message_group,679 )680 results.append(rs)681 return queue_url, number_of_messages, results682 def test_fifo_queue_send_multiple_messages_single_receive(self):683 (684 queue_url,685 number_of_messages,686 results,687 ) = self._run_test_fifo_queue_send_multiple_messages()688 # receive multiple message in the same time689 messages = self.client.receive_message(690 QueueUrl=queue_url,691 MessageAttributeNames=["All"],692 MaxNumberOfMessages=number_of_messages,693 )694 # asset the received messages data695 self.assertEqual(number_of_messages, len(messages["Messages"]))696 for i in range(number_of_messages):697 self.assertEqual("message-{}".format(i), messages["Messages"][i]["Body"])698 self.assertEqual(results[i]["MD5OfMessageBody"], messages["Messages"][i]["MD5OfBody"])699 self.assertEqual(results[i]["MessageId"], messages["Messages"][i]["MessageId"])700 # clean up701 self.client.delete_queue(QueueUrl=queue_url)702 def test_fifo_queue_send_multiple_messages_multiple_receives(self):703 (704 queue_url,705 number_of_messages,706 results,707 ) = self._run_test_fifo_queue_send_multiple_messages()708 first_receives = number_of_messages // 2709 # receive multiple message first time710 messages = self.client.receive_message(711 QueueUrl=queue_url,712 MessageAttributeNames=["All"],713 MaxNumberOfMessages=first_receives,714 )715 # asset the received messages data first time716 self.assertEqual(first_receives, len(messages["Messages"]))717 for i in range(first_receives):718 self.assertEqual("message-{}".format(i), messages["Messages"][i]["Body"])719 self.assertEqual(results[i]["MD5OfMessageBody"], messages["Messages"][i]["MD5OfBody"])720 self.assertEqual(results[i]["MessageId"], messages["Messages"][i]["MessageId"])721 # delete message to receive next message in queue722 self.client.delete_message(723 QueueUrl=queue_url,724 ReceiptHandle=messages["Messages"][i]["ReceiptHandle"],725 )726 second_receives = number_of_messages - first_receives727 # try to get one by one message in the second time728 for i in range(second_receives):729 message = self.client.receive_message(QueueUrl=queue_url)730 self.assertEqual(731 "message-{}".format(first_receives + i), message["Messages"][0]["Body"]732 )733 self.assertEqual(734 results[first_receives + i]["MD5OfMessageBody"],735 message["Messages"][0]["MD5OfBody"],736 )737 self.assertEqual(738 results[first_receives + i]["MessageId"],739 message["Messages"][0]["MessageId"],740 )741 # delete message to receive next message in queue742 self.client.delete_message(743 QueueUrl=queue_url,744 ReceiptHandle=message["Messages"][0]["ReceiptHandle"],745 )746 # clean up747 self.client.delete_queue(QueueUrl=queue_url)748 def test_fifo_queue_send_multiple_messages_multiple_single_receives(self):749 (750 queue_url,751 number_of_messages,752 results,753 ) = self._run_test_fifo_queue_send_multiple_messages()754 for i in range(number_of_messages):755 resp = self.client.receive_message(QueueUrl=queue_url)756 self.assertEqual("message-{}".format(i), resp["Messages"][0]["Body"])757 self.assertEqual(results[i]["MD5OfMessageBody"], resp["Messages"][0]["MD5OfBody"])758 self.assertEqual(results[i]["MessageId"], resp["Messages"][0]["MessageId"])759 # delete message to receive next message in queue760 self.client.delete_message(761 QueueUrl=queue_url, ReceiptHandle=resp["Messages"][0]["ReceiptHandle"]762 )763 # clean up764 self.client.delete_queue(QueueUrl=queue_url)765 # Not the same to create a queue with tags than tagging an existing queue766 def test_create_queue_with_tags(self):767 queue_name = "queue-{}".format(short_uid())768 response = self.client.create_queue(769 QueueName=queue_name,770 tags=TEST_LAMBDA_TAGS,771 )772 self.assertEqual(200, response["ResponseMetadata"]["HTTPStatusCode"])773 self.check_tagged_queue(response["QueueUrl"])774 def test_tag_untag_queue(self):775 queue_name = "queue-{}".format(short_uid())776 queue_url = self.client.create_queue(QueueName=queue_name)["QueueUrl"]777 response = self.client.tag_queue(778 QueueUrl=queue_url,779 Tags=TEST_LAMBDA_TAGS,780 )781 self.assertEqual(200, response["ResponseMetadata"]["HTTPStatusCode"])782 self.check_tagged_queue(queue_url)783 def test_posting_to_queue_with_trailing_slash(self):784 queue_name = "queue-{}".format(short_uid())785 queue_url = self.client.create_queue(QueueName=queue_name)["QueueUrl"]786 base_url = "{}://{}:{}".format(787 get_service_protocol(), config.LOCALSTACK_HOSTNAME, config.PORT_SQS788 )789 encoded_url = urlencode(790 {791 "Action": "SendMessage",792 "Version": "2012-11-05",793 "QueueUrl": "{}/{}/{}/".format(base_url, TEST_AWS_ACCOUNT_ID, queue_name),794 "MessageBody": "test body",795 }796 )797 r = requests.post(url=base_url, data=encoded_url)798 self.assertEqual(r.status_code, 200)799 # We can get the message back800 resp = self.client.receive_message(QueueUrl=queue_url)801 self.assertEqual(resp["Messages"][0]["Body"], "test body")802 # clean up803 self.client.delete_queue(QueueUrl=queue_url)804 def test_create_queue_with_slashes(self):805 queue_name = "queue/%s" % short_uid()806 queue_url = self.client.create_queue(QueueName=queue_name)807 result = self.client.list_queues()808 self.assertIn(queue_url.get("QueueUrl"), result.get("QueueUrls"))809 # clean up810 self.client.delete_queue(QueueUrl=queue_url.get("QueueUrl"))811 result = self.client.list_queues()812 self.assertNotIn(queue_url.get("QueueUrl"), result.get("QueueUrls", []))813 def list_queues_with_auth_in_presigned_url(self, method):814 base_url = "{}://{}:{}".format(815 get_service_protocol(), config.LOCALSTACK_HOSTNAME, config.PORT_SQS816 )817 req = AWSRequest(818 method=method,819 url=base_url,820 data={"Action": "ListQueues", "Version": "2012-11-05"},821 )822 # boto doesn't support querystring-style auth, so we have to do some823 # weird logic to use boto's signing functions, to understand what's824 # going on here look at the internals of the SigV4Auth.add_auth825 # method.826 datetime_now = datetime.datetime.utcnow()827 req.context["timestamp"] = datetime_now.strftime(SIGV4_TIMESTAMP)828 signer = SigV4Auth(829 Credentials(TEST_AWS_ACCESS_KEY_ID, TEST_AWS_SECRET_ACCESS_KEY),830 "sqs",831 aws_stack.get_region(),832 )833 canonical_request = signer.canonical_request(req)834 string_to_sign = signer.string_to_sign(req, canonical_request)835 payload = {836 "Action": "ListQueues",837 "Version": "2012-11-05",838 "X-Amz-Algorithm": "AWS4-HMAC-SHA256",839 "X-Amz-Credential": signer.scope(req),840 "X-Amz-SignedHeaders": ";".join(signer.headers_to_sign(req).keys()),841 "X-Amz-Signature": signer.signature(string_to_sign, req),842 }843 if method == "GET":844 return requests.get(url=base_url, params=payload)845 else:846 return requests.post(url=base_url, data=urlencode(payload))847 def test_post_list_queues_with_auth_in_presigned_url(self):848 res = self.list_queues_with_auth_in_presigned_url(method="POST")849 self.assertEqual(res.status_code, 200)850 self.assertTrue(b"<ListQueuesResponse>" in res.content)851 def test_get_list_queues_with_auth_in_presigned_url(self):852 res = self.list_queues_with_auth_in_presigned_url(method="GET")853 self.assertEqual(res.status_code, 200)854 self.assertTrue(b"<ListQueuesResponse>" in res.content)855 # ---------------856 # HELPER METHODS857 # ---------------858 def receive_dlq(self, queue_url, assert_error_details=False, assert_receive_count=None):859 """Assert that a message has been received on the given DLQ"""860 result = self.client.receive_message(QueueUrl=queue_url, MessageAttributeNames=["All"])861 self.assertGreater(len(result["Messages"]), 0)862 msg = result["Messages"][0]863 msg_attrs = msg.get("MessageAttributes") or msg.get("Attributes")864 if assert_error_details:865 self.assertIn("RequestID", msg_attrs)866 self.assertIn("ErrorCode", msg_attrs)867 self.assertIn("ErrorMessage", msg_attrs)868 else:869 if assert_receive_count is not None:870 pass871 # TODO: this started failing with latest moto upgrade,872 # probably in or around this commit:873 # https://github.com/spulec/moto/commit/6da4905da940e25e317db60b7657ea632f58ef1d874 # self.assertEqual(str(assert_receive_count), msg_attrs.get('ApproximateReceiveCount'))875 def check_tagged_queue(self, queue_url):876 response = self.client.list_queue_tags(QueueUrl=queue_url)877 self.assertEqual(200, response["ResponseMetadata"]["HTTPStatusCode"])878 tags_keys_as_list = list(TEST_LAMBDA_TAGS.keys())879 for key in tags_keys_as_list:880 self.assertIn(key, response["Tags"])881 random_tag_key = random.choice(tags_keys_as_list)882 # Pop random chosen tag key from list883 tags_keys_as_list.pop(tags_keys_as_list.index(random_tag_key))884 response = self.client.untag_queue(QueueUrl=queue_url, TagKeys=[random_tag_key])885 self.assertEqual(200, response["ResponseMetadata"]["HTTPStatusCode"])886 response = self.client.list_queue_tags(QueueUrl=queue_url)887 self.assertEqual(200, response["ResponseMetadata"]["HTTPStatusCode"])888 self.assertNotIn(random_tag_key, response["Tags"])889 for key in tags_keys_as_list:890 self.assertIn(key, response["Tags"])891 # Clean up892 self.client.untag_queue(893 QueueUrl=queue_url,894 TagKeys=tags_keys_as_list,895 )896 self.client.delete_queue(QueueUrl=queue_url)897 def check_msg_attributes(self, queue_url, attrs):898 messages_all_attributes = self.client.receive_message(899 QueueUrl=queue_url, MessageAttributeNames=["All"]900 )901 self.assertEqual(messages_all_attributes.get("Messages")[0]["MessageAttributes"], attrs)902class TestSqsProvider:903 def test_list_queues(self, sqs_client, sqs_create_queue):904 queue_names = [905 "a-test-queue-" + short_uid(),906 "a-test-queue-" + short_uid(),907 "b-test-queue-" + short_uid(),908 ]909 # create three queues with prefixes and collect their urls910 queue_urls = []911 for name in queue_names:912 sqs_create_queue(QueueName=name)913 queue_url = sqs_client.get_queue_url(QueueName=name)["QueueUrl"]914 assert queue_url.endswith(name)915 queue_urls.append(queue_url)916 # list queues with first prefix917 result = sqs_client.list_queues(QueueNamePrefix="a-test-queue-")918 assert "QueueUrls" in result919 assert len(result["QueueUrls"]) == 2920 assert queue_urls[0] in result["QueueUrls"]921 assert queue_urls[1] in result["QueueUrls"]922 assert queue_urls[2] not in result["QueueUrls"]923 # list queues with second prefix924 result = sqs_client.list_queues(QueueNamePrefix="b-test-queue-")925 assert "QueueUrls" in result926 assert len(result["QueueUrls"]) == 1927 assert queue_urls[0] not in result["QueueUrls"]928 assert queue_urls[1] not in result["QueueUrls"]929 assert queue_urls[2] in result["QueueUrls"]930 def test_create_queue_and_get_attributes(self, sqs_client, sqs_queue):931 result = sqs_client.get_queue_attributes(932 QueueUrl=sqs_queue, AttributeNames=["QueueArn", "CreatedTimestamp", "VisibilityTimeout"]933 )934 assert "Attributes" in result935 attrs = result["Attributes"]936 assert len(attrs) == 3937 assert "test-queue-" in attrs["QueueArn"]938 assert int(float(attrs["CreatedTimestamp"])) == pytest.approx(int(time.time()), 30)939 assert int(attrs["VisibilityTimeout"]) == 30, "visibility timeout is not the default value"940 def test_send_receive_message(self, sqs_client, sqs_queue):941 send_result = sqs_client.send_message(QueueUrl=sqs_queue, MessageBody="message")942 assert send_result["MessageId"]943 assert send_result["MD5OfMessageBody"] == "78e731027d8fd50ed642340b7c9a63b3"944 # TODO: other attributes945 receive_result = sqs_client.receive_message(QueueUrl=sqs_queue)946 assert len(receive_result["Messages"]) == 1947 message = receive_result["Messages"][0]948 assert message["ReceiptHandle"]949 assert message["Body"] == "message"950 assert message["MessageId"] == send_result["MessageId"]951 assert message["MD5OfBody"] == send_result["MD5OfMessageBody"]952 def test_send_receive_message_multiple_queues(self, sqs_client, sqs_create_queue):953 queue0 = sqs_create_queue()954 queue1 = sqs_create_queue()955 sqs_client.send_message(QueueUrl=queue0, MessageBody="message")956 result = sqs_client.receive_message(QueueUrl=queue1)957 assert "Messages" not in result958 result = sqs_client.receive_message(QueueUrl=queue0)959 assert len(result["Messages"]) == 1960 assert result["Messages"][0]["Body"] == "message"961 def test_send_message_batch(self, sqs_client, sqs_queue):962 sqs_client.send_message_batch(963 QueueUrl=sqs_queue,964 Entries=[965 {"Id": "1", "MessageBody": "message-0"},966 {"Id": "2", "MessageBody": "message-1"},967 ],968 )969 response0 = sqs_client.receive_message(QueueUrl=sqs_queue)970 response1 = sqs_client.receive_message(QueueUrl=sqs_queue)971 response2 = sqs_client.receive_message(QueueUrl=sqs_queue)972 assert len(response0.get("Messages", [])) == 1973 assert len(response1.get("Messages", [])) == 1974 assert len(response2.get("Messages", [])) == 0975 message0 = response0["Messages"][0]976 message1 = response1["Messages"][0]977 assert message0["Body"] == "message-0"978 assert message1["Body"] == "message-1"979 def test_send_batch_receive_multiple(self, sqs_client, sqs_queue):980 # send a batch, then a single message, receive them a981 sqs_client.send_message_batch(982 QueueUrl=sqs_queue,983 Entries=[984 {"Id": "1", "MessageBody": "message-0"},985 {"Id": "2", "MessageBody": "message-1"},986 ],987 )988 sqs_client.send_message(QueueUrl=sqs_queue, MessageBody="message-2")989 response = sqs_client.receive_message(QueueUrl=sqs_queue, MaxNumberOfMessages=3)990 assert len(response["Messages"]) == 3991 assert response["Messages"][0]["Body"] == "message-0"992 assert response["Messages"][1]["Body"] == "message-1"993 assert response["Messages"][2]["Body"] == "message-2"994 def test_send_message_batch_with_empty_list(self, sqs_client, sqs_create_queue):995 queue_url = sqs_create_queue()996 try:997 sqs_client.send_message_batch(QueueUrl=queue_url, Entries=[])998 except ClientError as e:999 assert "EmptyBatchRequest" in e.response["Error"]["Code"]1000 assert e.response["ResponseMetadata"]["HTTPStatusCode"] in [400, 404]1001 def test_tag_untag_queue(self, sqs_client, sqs_create_queue):1002 queue_url = sqs_create_queue()1003 # tag queue1004 tags = {"tag1": "value1", "tag2": "value2", "tag3": ""}1005 sqs_client.tag_queue(QueueUrl=queue_url, Tags=tags)1006 # check queue tags1007 response = sqs_client.list_queue_tags(QueueUrl=queue_url)1008 assert response["Tags"] == tags...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!