Best Python code snippet using localstack_python
test_sqs.py
Source:test_sqs.py
...770 tags=TEST_LAMBDA_TAGS,771 )772 self.assertEqual(200, response["ResponseMetadata"]["HTTPStatusCode"])773 self.check_tagged_queue(response["QueueUrl"])774 def test_tag_untag_queue(self):775 queue_name = "queue-{}".format(short_uid())776 queue_url = self.client.create_queue(QueueName=queue_name)["QueueUrl"]777 response = self.client.tag_queue(778 QueueUrl=queue_url,779 Tags=TEST_LAMBDA_TAGS,780 )781 self.assertEqual(200, response["ResponseMetadata"]["HTTPStatusCode"])782 self.check_tagged_queue(queue_url)783 def test_posting_to_queue_with_trailing_slash(self):784 queue_name = "queue-{}".format(short_uid())785 queue_url = self.client.create_queue(QueueName=queue_name)["QueueUrl"]786 base_url = "{}://{}:{}".format(787 get_service_protocol(), config.LOCALSTACK_HOSTNAME, config.PORT_SQS788 )789 encoded_url = urlencode(790 {791 "Action": "SendMessage",792 "Version": "2012-11-05",793 "QueueUrl": "{}/{}/{}/".format(base_url, TEST_AWS_ACCOUNT_ID, queue_name),794 "MessageBody": "test body",795 }796 )797 r = requests.post(url=base_url, data=encoded_url)798 self.assertEqual(r.status_code, 200)799 # We can get the message back800 resp = self.client.receive_message(QueueUrl=queue_url)801 self.assertEqual(resp["Messages"][0]["Body"], "test body")802 # clean up803 self.client.delete_queue(QueueUrl=queue_url)804 def test_create_queue_with_slashes(self):805 queue_name = "queue/%s" % short_uid()806 queue_url = self.client.create_queue(QueueName=queue_name)807 result = self.client.list_queues()808 self.assertIn(queue_url.get("QueueUrl"), result.get("QueueUrls"))809 # clean up810 self.client.delete_queue(QueueUrl=queue_url.get("QueueUrl"))811 result = self.client.list_queues()812 self.assertNotIn(queue_url.get("QueueUrl"), result.get("QueueUrls", []))813 def list_queues_with_auth_in_presigned_url(self, method):814 base_url = "{}://{}:{}".format(815 get_service_protocol(), config.LOCALSTACK_HOSTNAME, config.PORT_SQS816 )817 req = AWSRequest(818 method=method,819 url=base_url,820 
data={"Action": "ListQueues", "Version": "2012-11-05"},821 )822 # boto doesn't support querystring-style auth, so we have to do some823 # weird logic to use boto's signing functions, to understand what's824 # going on here look at the internals of the SigV4Auth.add_auth825 # method.826 datetime_now = datetime.datetime.utcnow()827 req.context["timestamp"] = datetime_now.strftime(SIGV4_TIMESTAMP)828 signer = SigV4Auth(829 Credentials(TEST_AWS_ACCESS_KEY_ID, TEST_AWS_SECRET_ACCESS_KEY),830 "sqs",831 aws_stack.get_region(),832 )833 canonical_request = signer.canonical_request(req)834 string_to_sign = signer.string_to_sign(req, canonical_request)835 payload = {836 "Action": "ListQueues",837 "Version": "2012-11-05",838 "X-Amz-Algorithm": "AWS4-HMAC-SHA256",839 "X-Amz-Credential": signer.scope(req),840 "X-Amz-SignedHeaders": ";".join(signer.headers_to_sign(req).keys()),841 "X-Amz-Signature": signer.signature(string_to_sign, req),842 }843 if method == "GET":844 return requests.get(url=base_url, params=payload)845 else:846 return requests.post(url=base_url, data=urlencode(payload))847 def test_post_list_queues_with_auth_in_presigned_url(self):848 res = self.list_queues_with_auth_in_presigned_url(method="POST")849 self.assertEqual(res.status_code, 200)850 self.assertTrue(b"<ListQueuesResponse>" in res.content)851 def test_get_list_queues_with_auth_in_presigned_url(self):852 res = self.list_queues_with_auth_in_presigned_url(method="GET")853 self.assertEqual(res.status_code, 200)854 self.assertTrue(b"<ListQueuesResponse>" in res.content)855 # ---------------856 # HELPER METHODS857 # ---------------858 def receive_dlq(self, queue_url, assert_error_details=False, assert_receive_count=None):859 """Assert that a message has been received on the given DLQ"""860 result = self.client.receive_message(QueueUrl=queue_url, MessageAttributeNames=["All"])861 self.assertGreater(len(result["Messages"]), 0)862 msg = result["Messages"][0]863 msg_attrs = msg.get("MessageAttributes") or 
msg.get("Attributes")864 if assert_error_details:865 self.assertIn("RequestID", msg_attrs)866 self.assertIn("ErrorCode", msg_attrs)867 self.assertIn("ErrorMessage", msg_attrs)868 else:869 if assert_receive_count is not None:870 pass871 # TODO: this started failing with latest moto upgrade,872 # probably in or around this commit:873 # https://github.com/spulec/moto/commit/6da4905da940e25e317db60b7657ea632f58ef1d874 # self.assertEqual(str(assert_receive_count), msg_attrs.get('ApproximateReceiveCount'))875 def check_tagged_queue(self, queue_url):876 response = self.client.list_queue_tags(QueueUrl=queue_url)877 self.assertEqual(200, response["ResponseMetadata"]["HTTPStatusCode"])878 tags_keys_as_list = list(TEST_LAMBDA_TAGS.keys())879 for key in tags_keys_as_list:880 self.assertIn(key, response["Tags"])881 random_tag_key = random.choice(tags_keys_as_list)882 # Pop random chosen tag key from list883 tags_keys_as_list.pop(tags_keys_as_list.index(random_tag_key))884 response = self.client.untag_queue(QueueUrl=queue_url, TagKeys=[random_tag_key])885 self.assertEqual(200, response["ResponseMetadata"]["HTTPStatusCode"])886 response = self.client.list_queue_tags(QueueUrl=queue_url)887 self.assertEqual(200, response["ResponseMetadata"]["HTTPStatusCode"])888 self.assertNotIn(random_tag_key, response["Tags"])889 for key in tags_keys_as_list:890 self.assertIn(key, response["Tags"])891 # Clean up892 self.client.untag_queue(893 QueueUrl=queue_url,894 TagKeys=tags_keys_as_list,895 )896 self.client.delete_queue(QueueUrl=queue_url)897 def check_msg_attributes(self, queue_url, attrs):898 messages_all_attributes = self.client.receive_message(899 QueueUrl=queue_url, MessageAttributeNames=["All"]900 )901 self.assertEqual(messages_all_attributes.get("Messages")[0]["MessageAttributes"], attrs)902class TestSqsProvider:903 def test_list_queues(self, sqs_client, sqs_create_queue):904 queue_names = [905 "a-test-queue-" + short_uid(),906 "a-test-queue-" + short_uid(),907 "b-test-queue-" + 
short_uid(),908 ]909 # create three queues with prefixes and collect their urls910 queue_urls = []911 for name in queue_names:912 sqs_create_queue(QueueName=name)913 queue_url = sqs_client.get_queue_url(QueueName=name)["QueueUrl"]914 assert queue_url.endswith(name)915 queue_urls.append(queue_url)916 # list queues with first prefix917 result = sqs_client.list_queues(QueueNamePrefix="a-test-queue-")918 assert "QueueUrls" in result919 assert len(result["QueueUrls"]) == 2920 assert queue_urls[0] in result["QueueUrls"]921 assert queue_urls[1] in result["QueueUrls"]922 assert queue_urls[2] not in result["QueueUrls"]923 # list queues with second prefix924 result = sqs_client.list_queues(QueueNamePrefix="b-test-queue-")925 assert "QueueUrls" in result926 assert len(result["QueueUrls"]) == 1927 assert queue_urls[0] not in result["QueueUrls"]928 assert queue_urls[1] not in result["QueueUrls"]929 assert queue_urls[2] in result["QueueUrls"]930 def test_create_queue_and_get_attributes(self, sqs_client, sqs_queue):931 result = sqs_client.get_queue_attributes(932 QueueUrl=sqs_queue, AttributeNames=["QueueArn", "CreatedTimestamp", "VisibilityTimeout"]933 )934 assert "Attributes" in result935 attrs = result["Attributes"]936 assert len(attrs) == 3937 assert "test-queue-" in attrs["QueueArn"]938 assert int(float(attrs["CreatedTimestamp"])) == pytest.approx(int(time.time()), 30)939 assert int(attrs["VisibilityTimeout"]) == 30, "visibility timeout is not the default value"940 def test_send_receive_message(self, sqs_client, sqs_queue):941 send_result = sqs_client.send_message(QueueUrl=sqs_queue, MessageBody="message")942 assert send_result["MessageId"]943 assert send_result["MD5OfMessageBody"] == "78e731027d8fd50ed642340b7c9a63b3"944 # TODO: other attributes945 receive_result = sqs_client.receive_message(QueueUrl=sqs_queue)946 assert len(receive_result["Messages"]) == 1947 message = receive_result["Messages"][0]948 assert message["ReceiptHandle"]949 assert message["Body"] == 
"message"950 assert message["MessageId"] == send_result["MessageId"]951 assert message["MD5OfBody"] == send_result["MD5OfMessageBody"]952 def test_send_receive_message_multiple_queues(self, sqs_client, sqs_create_queue):953 queue0 = sqs_create_queue()954 queue1 = sqs_create_queue()955 sqs_client.send_message(QueueUrl=queue0, MessageBody="message")956 result = sqs_client.receive_message(QueueUrl=queue1)957 assert "Messages" not in result958 result = sqs_client.receive_message(QueueUrl=queue0)959 assert len(result["Messages"]) == 1960 assert result["Messages"][0]["Body"] == "message"961 def test_send_message_batch(self, sqs_client, sqs_queue):962 sqs_client.send_message_batch(963 QueueUrl=sqs_queue,964 Entries=[965 {"Id": "1", "MessageBody": "message-0"},966 {"Id": "2", "MessageBody": "message-1"},967 ],968 )969 response0 = sqs_client.receive_message(QueueUrl=sqs_queue)970 response1 = sqs_client.receive_message(QueueUrl=sqs_queue)971 response2 = sqs_client.receive_message(QueueUrl=sqs_queue)972 assert len(response0.get("Messages", [])) == 1973 assert len(response1.get("Messages", [])) == 1974 assert len(response2.get("Messages", [])) == 0975 message0 = response0["Messages"][0]976 message1 = response1["Messages"][0]977 assert message0["Body"] == "message-0"978 assert message1["Body"] == "message-1"979 def test_send_batch_receive_multiple(self, sqs_client, sqs_queue):980 # send a batch, then a single message, receive them a981 sqs_client.send_message_batch(982 QueueUrl=sqs_queue,983 Entries=[984 {"Id": "1", "MessageBody": "message-0"},985 {"Id": "2", "MessageBody": "message-1"},986 ],987 )988 sqs_client.send_message(QueueUrl=sqs_queue, MessageBody="message-2")989 response = sqs_client.receive_message(QueueUrl=sqs_queue, MaxNumberOfMessages=3)990 assert len(response["Messages"]) == 3991 assert response["Messages"][0]["Body"] == "message-0"992 assert response["Messages"][1]["Body"] == "message-1"993 assert response["Messages"][2]["Body"] == "message-2"994 def 
test_send_message_batch_with_empty_list(self, sqs_client, sqs_create_queue):995 queue_url = sqs_create_queue()996 try:997 sqs_client.send_message_batch(QueueUrl=queue_url, Entries=[])998 except ClientError as e:999 assert "EmptyBatchRequest" in e.response["Error"]["Code"]1000 assert e.response["ResponseMetadata"]["HTTPStatusCode"] in [400, 404]1001 def test_tag_untag_queue(self, sqs_client, sqs_create_queue):1002 queue_url = sqs_create_queue()1003 # tag queue1004 tags = {"tag1": "value1", "tag2": "value2", "tag3": ""}1005 sqs_client.tag_queue(QueueUrl=queue_url, Tags=tags)1006 # check queue tags1007 response = sqs_client.list_queue_tags(QueueUrl=queue_url)1008 assert response["Tags"] == tags1009 # remove tag1 and tag31010 sqs_client.untag_queue(QueueUrl=queue_url, TagKeys=["tag1", "tag3"])1011 response = sqs_client.list_queue_tags(QueueUrl=queue_url)1012 assert response["Tags"] == {"tag2": "value2"}1013 # remove tag21014 sqs_client.untag_queue(QueueUrl=queue_url, TagKeys=["tag2"])1015 response = sqs_client.list_queue_tags(QueueUrl=queue_url)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!