Best Python code snippet using localstack_python
test_sqs.py
Source:test_sqs.py
...2170 },2171 )2172 assert response.ok2173 assert "<DeleteQueueResponse " in response.text2174 assert poll_condition(lambda: not sqs_queue_exists(queue_url), timeout=5)2175 @pytest.mark.aws_validated2176 def test_get_send_and_receive_messages(self, sqs_create_queue, sqs_http_client):2177 queue1_url = sqs_create_queue()2178 queue2_url = sqs_create_queue()2179 # items in queue 12180 response = sqs_http_client.get(2181 queue1_url,2182 params={2183 "Action": "SendMessage",2184 "MessageBody": "foobar",2185 },2186 )2187 assert response.ok2188 # no items in queue 22189 response = sqs_http_client.get(2190 queue2_url,2191 params={2192 "Action": "ReceiveMessage",2193 },2194 )2195 assert response.ok2196 assert "foobar" not in response.text2197 assert "<ReceiveMessageResult/>" in response.text.replace(2198 " />", "/>"2199 ) # expect response to be empty2200 # get items from queue 12201 response = sqs_http_client.get(2202 queue1_url,2203 params={2204 "Action": "ReceiveMessage",2205 },2206 )2207 assert response.ok2208 assert "<Body>foobar</Body>" in response.text2209 assert "<MD5OfBody>" in response.text2210 @pytest.mark.aws_validated2211 def test_get_on_deleted_queue_fails(2212 self, sqs_client, sqs_create_queue, sqs_http_client, sqs_queue_exists2213 ):2214 queue_url = sqs_create_queue()2215 sqs_client.delete_queue(QueueUrl=queue_url)2216 assert poll_condition(lambda: not sqs_queue_exists(queue_url), timeout=5)2217 response = sqs_http_client.get(2218 queue_url,2219 params={2220 "Action": "GetQueueAttributes",2221 "AttributeName.1": "QueueArn",2222 },2223 )2224 assert "<Code>AWS.SimpleQueueService.NonExistentQueue</Code>" in response.text2225 assert "<Message>The specified queue does not exist for this wsdl version" in response.text2226 assert response.status_code == 4002227 @pytest.mark.aws_validated2228 def test_get_without_query_returns_unknown_operation(self, sqs_create_queue, sqs_http_client):2229 queue_name = f"test-queue-{short_uid()}"2230 queue_url = 
sqs_create_queue(QueueName=queue_name)...
test_sns.py
Source:test_sns.py
...1561 sns_topic_arn=topic_arn,1562 )1563 sqs_client.delete_queue(QueueUrl=queue_url)1564 # AWS takes some time to delete the queue, which make the test fails as it delivers the message correctly1565 assert poll_condition(lambda: not sqs_queue_exists(queue_url), timeout=5)1566 message = "test_dlq_after_sqs_endpoint_deleted"1567 message_attr = {1568 "attr1": {1569 "DataType": "Number",1570 "StringValue": "111",1571 },1572 "attr2": {1573 "DataType": "Binary",1574 "BinaryValue": b"\x02\x03\x04",1575 },1576 }1577 sns_client.publish(TopicArn=topic_arn, Message=message, MessageAttributes=message_attr)1578 response = sqs_client.receive_message(1579 QueueUrl=dlq_url,1580 WaitTimeSeconds=10,1581 AttributeNames=["All"],1582 MessageAttributeNames=["All"],1583 )1584 snapshot.match("messages", response)1585 @pytest.mark.aws_validated1586 def test_message_attributes_not_missing(1587 self,1588 sns_client,1589 sqs_client,1590 sns_create_sqs_subscription,1591 sns_create_topic,1592 sqs_create_queue,1593 snapshot,1594 ):1595 # the hash isn't the same because of the Binary attributes (maybe decoding order?)1596 snapshot.add_transformer(1597 snapshot.transform.key_value(1598 "MD5OfMessageAttributes",1599 value_replacement="<md5-hash>",1600 reference_replacement=False,1601 )1602 )1603 topic_arn = sns_create_topic()["TopicArn"]1604 queue_url = sqs_create_queue()1605 subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1606 sns_client.set_subscription_attributes(1607 SubscriptionArn=subscription["SubscriptionArn"],1608 AttributeName="RawMessageDelivery",1609 AttributeValue="true",1610 )1611 attributes = {1612 "an-attribute-key": {"DataType": "String", "StringValue": "an-attribute-value"},1613 "binary-attribute": {"DataType": "Binary", "BinaryValue": b"\x02\x03\x04"},1614 }1615 publish_response = sns_client.publish(1616 TopicArn=topic_arn,1617 Message="text",1618 MessageAttributes=attributes,1619 )1620 snapshot.match("publish-msg-raw", publish_response)1621 
msg = sqs_client.receive_message(1622 QueueUrl=queue_url,1623 AttributeNames=["All"],1624 MessageAttributeNames=["All"],1625 WaitTimeSeconds=3,1626 )1627 # as SNS piggybacks on SQS MessageAttributes when RawDelivery is true1628 # BinaryValue depends on SQS implementation, and is decoded automatically1629 snapshot.match("raw-delivery-msg-attrs", msg)1630 sqs_client.delete_message(1631 QueueUrl=queue_url, ReceiptHandle=msg["Messages"][0]["ReceiptHandle"]1632 )1633 sns_client.set_subscription_attributes(1634 SubscriptionArn=subscription["SubscriptionArn"],1635 AttributeName="RawMessageDelivery",1636 AttributeValue="false",1637 )1638 publish_response = sns_client.publish(1639 TopicArn=topic_arn,1640 Message="text",1641 MessageAttributes=attributes,1642 )1643 snapshot.match("publish-msg-json", publish_response)1644 msg = sqs_client.receive_message(1645 QueueUrl=queue_url,1646 AttributeNames=["All"],1647 MessageAttributeNames=["All"],1648 WaitTimeSeconds=3,1649 )1650 snapshot.match("json-delivery-msg-attrs", msg)1651 # binary payload in base64 encoded by AWS, UTF-8 for JSON1652 # https://docs.aws.amazon.com/sns/latest/api/API_MessageAttributeValue.html1653 @pytest.mark.only_localstack1654 @pytest.mark.aws_validated1655 @pytest.mark.parametrize("raw_message_delivery", [True, False])1656 def test_subscribe_external_http_endpoint(1657 self,1658 sns_client,1659 sns_create_http_endpoint,1660 raw_message_delivery,1661 ):1662 # Necessitate manual set up to allow external access to endpoint, only in local testing1663 topic_arn, subscription_arn, endpoint_url, server = sns_create_http_endpoint(1664 raw_message_delivery1665 )1666 assert poll_condition(1667 lambda: len(server.log) >= 1,1668 timeout=5,1669 )1670 sub_request, _ = server.log[0]1671 payload = sub_request.get_json(force=True)1672 assert payload["Type"] == "SubscriptionConfirmation"1673 assert sub_request.headers["x-amz-sns-message-type"] == "SubscriptionConfirmation"1674 assert "Signature" in payload1675 assert 
"SigningCertURL" in payload1676 token = payload["Token"]1677 subscribe_url = payload["SubscribeURL"]1678 service_url, subscribe_url_path = payload["SubscribeURL"].rsplit("/", maxsplit=1)1679 assert subscribe_url == (1680 f"{service_url}/?Action=ConfirmSubscription" f"&TopicArn={topic_arn}&Token={token}"1681 )1682 confirm_subscribe_request = requests.get(subscribe_url)1683 confirm_subscribe = xmltodict.parse(confirm_subscribe_request.content)1684 assert (1685 confirm_subscribe["ConfirmSubscriptionResponse"]["ConfirmSubscriptionResult"][1686 "SubscriptionArn"1687 ]1688 == subscription_arn1689 )1690 subscription_attributes = sns_client.get_subscription_attributes(1691 SubscriptionArn=subscription_arn1692 )1693 assert subscription_attributes["Attributes"]["PendingConfirmation"] == "false"1694 message = "test_external_http_endpoint"1695 sns_client.publish(TopicArn=topic_arn, Message=message)1696 assert poll_condition(1697 lambda: len(server.log) >= 2,1698 timeout=5,1699 )1700 notification_request, _ = server.log[1]1701 assert notification_request.headers["x-amz-sns-message-type"] == "Notification"1702 expected_unsubscribe_url = (1703 f"{service_url}/?Action=Unsubscribe&SubscriptionArn={subscription_arn}"1704 )1705 if raw_message_delivery:1706 payload = notification_request.data.decode()1707 assert payload == message1708 else:1709 payload = notification_request.get_json(force=True)1710 assert payload["Type"] == "Notification"1711 assert "Signature" in payload1712 assert "SigningCertURL" in payload1713 assert payload["Message"] == message1714 assert payload["UnsubscribeURL"] == expected_unsubscribe_url1715 unsub_request = requests.get(expected_unsubscribe_url)1716 unsubscribe_confirmation = xmltodict.parse(unsub_request.content)1717 assert "UnsubscribeResponse" in unsubscribe_confirmation1718 assert poll_condition(1719 lambda: len(server.log) >= 3,1720 timeout=5,1721 )1722 unsub_request, _ = server.log[2]1723 payload = unsub_request.get_json(force=True)1724 assert 
payload["Type"] == "UnsubscribeConfirmation"1725 assert unsub_request.headers["x-amz-sns-message-type"] == "UnsubscribeConfirmation"1726 assert "Signature" in payload1727 assert "SigningCertURL" in payload1728 token = payload["Token"]1729 assert payload["SubscribeURL"] == (1730 f"{service_url}/?" f"Action=ConfirmSubscription&TopicArn={topic_arn}&Token={token}"1731 )1732 @pytest.mark.only_localstack1733 @pytest.mark.parametrize("raw_message_delivery", [True, False])1734 def test_dlq_external_http_endpoint(1735 self,1736 sns_client,1737 sqs_client,1738 sns_create_topic,1739 sqs_create_queue,1740 sqs_queue_arn,1741 sns_subscription,1742 sns_create_http_endpoint,1743 sns_create_sqs_subscription,1744 sns_allow_topic_sqs_queue,1745 raw_message_delivery,1746 ):1747 # Necessitate manual set up to allow external access to endpoint, only in local testing1748 topic_arn, http_subscription_arn, endpoint_url, server = sns_create_http_endpoint(1749 raw_message_delivery1750 )1751 dlq_url = sqs_create_queue()1752 dlq_arn = sqs_queue_arn(dlq_url)1753 sns_allow_topic_sqs_queue(1754 sqs_queue_url=dlq_url, sqs_queue_arn=dlq_arn, sns_topic_arn=topic_arn1755 )1756 sns_client.set_subscription_attributes(1757 SubscriptionArn=http_subscription_arn,1758 AttributeName="RedrivePolicy",1759 AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),1760 )1761 assert poll_condition(1762 lambda: len(server.log) >= 1,1763 timeout=5,1764 )1765 sub_request, _ = server.log[0]1766 payload = sub_request.get_json(force=True)1767 assert payload["Type"] == "SubscriptionConfirmation"1768 assert sub_request.headers["x-amz-sns-message-type"] == "SubscriptionConfirmation"1769 subscribe_url = payload["SubscribeURL"]1770 service_url, subscribe_url_path = payload["SubscribeURL"].rsplit("/", maxsplit=1)1771 confirm_subscribe_request = requests.get(subscribe_url)1772 confirm_subscribe = xmltodict.parse(confirm_subscribe_request.content)1773 assert (1774 
confirm_subscribe["ConfirmSubscriptionResponse"]["ConfirmSubscriptionResult"][1775 "SubscriptionArn"1776 ]1777 == http_subscription_arn1778 )1779 subscription_attributes = sns_client.get_subscription_attributes(1780 SubscriptionArn=http_subscription_arn1781 )1782 assert subscription_attributes["Attributes"]["PendingConfirmation"] == "false"1783 server.stop()1784 wait_for_port_closed(server.port)1785 message = "test_dlq_external_http_endpoint"1786 sns_client.publish(TopicArn=topic_arn, Message=message)1787 response = sqs_client.receive_message(QueueUrl=dlq_url, WaitTimeSeconds=3)1788 assert (1789 len(response["Messages"]) == 11790 ), f"invalid number of messages in DLQ response {response}"1791 if raw_message_delivery:1792 assert response["Messages"][0]["Body"] == message1793 else:1794 received_message = json.loads(response["Messages"][0]["Body"])1795 assert received_message["Type"] == "Notification"1796 assert received_message["Message"] == message1797 receipt_handle = response["Messages"][0]["ReceiptHandle"]1798 sqs_client.delete_message(QueueUrl=dlq_url, ReceiptHandle=receipt_handle)1799 expected_unsubscribe_url = (1800 f"{service_url}/?Action=Unsubscribe&SubscriptionArn={http_subscription_arn}"1801 )1802 unsub_request = requests.get(expected_unsubscribe_url)1803 unsubscribe_confirmation = xmltodict.parse(unsub_request.content)1804 assert "UnsubscribeResponse" in unsubscribe_confirmation1805 response = sqs_client.receive_message(QueueUrl=dlq_url, WaitTimeSeconds=2)1806 # AWS doesn't send to the DLQ if the UnsubscribeConfirmation fails to be delivered1807 assert "Messages" not in response1808 @pytest.mark.aws_validated1809 def test_publish_too_long_message(self, sns_client, sns_create_topic, snapshot):1810 topic_arn = sns_create_topic()["TopicArn"]1811 # simulate payload over 256kb1812 message = "This is a test message" * 120001813 with pytest.raises(ClientError) as e:1814 sns_client.publish(TopicArn=topic_arn, Message=message)1815 snapshot.match("error", 
e.value.response)1816 assert e.value.response["Error"]["Code"] == "InvalidParameter"1817 assert e.value.response["Error"]["Message"] == "Invalid parameter: Message too long"1818 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4001819 @pytest.mark.only_localstack # needs real credentials for GCM/FCM1820 def test_publish_to_gcm(self, sns_client):1821 key = "mock_server_key"1822 token = "mock_token"1823 platform_app_arn = sns_client.create_platform_application(1824 Name="firebase", Platform="GCM", Attributes={"PlatformCredential": key}1825 )["PlatformApplicationArn"]1826 endpoint_arn = sns_client.create_platform_endpoint(1827 PlatformApplicationArn=platform_app_arn,1828 Token=token,1829 )["EndpointArn"]1830 message = {1831 "GCM": '{ "notification": {"title": "Title of notification", "body": "It works" } }'1832 }1833 with pytest.raises(ClientError) as ex:1834 sns_client.publish(1835 TargetArn=endpoint_arn, MessageStructure="json", Message=json.dumps(message)1836 )1837 assert ex.value.response["Error"]["Code"] == "InvalidParameter"1838 sns_client.delete_endpoint(EndpointArn=endpoint_arn)1839 sns_client.delete_platform_application(PlatformApplicationArn=platform_app_arn)1840 @pytest.mark.aws_validated1841 @pytest.mark.skip_snapshot_verify(1842 paths=[1843 "$..Attributes.Owner",1844 "$..Attributes.ConfirmationWasAuthenticated",1845 "$..Attributes.RawMessageDelivery",1846 "$..Attributes.sqs_queue_url",1847 "$..Subscriptions..Owner",1848 ]1849 )1850 def test_subscription_after_failure_to_deliver(1851 self,1852 sns_client,1853 sqs_client,1854 sns_create_topic,1855 sqs_create_queue,1856 sqs_queue_arn,1857 sqs_queue_exists,1858 sns_create_sqs_subscription,1859 sns_allow_topic_sqs_queue,1860 snapshot,1861 ):1862 topic_arn = sns_create_topic()["TopicArn"]1863 queue_name = f"test-queue-{short_uid()}"1864 queue_url = sqs_create_queue(QueueName=queue_name)1865 subscription = sns_create_sqs_subscription(topic_arn=topic_arn, queue_url=queue_url)1866 subscription_arn 
= subscription["SubscriptionArn"]1867 dlq_url = sqs_create_queue()1868 dlq_arn = sqs_queue_arn(dlq_url)1869 sns_allow_topic_sqs_queue(1870 sqs_queue_url=dlq_url,1871 sqs_queue_arn=dlq_arn,1872 sns_topic_arn=topic_arn,1873 )1874 sub_attrs = sns_client.get_subscription_attributes(SubscriptionArn=subscription_arn)1875 snapshot.match("subscriptions-attrs", sub_attrs)1876 message = "test_dlq_before_sqs_endpoint_deleted"1877 sns_client.publish(TopicArn=topic_arn, Message=message)1878 response = sqs_client.receive_message(1879 QueueUrl=queue_url, WaitTimeSeconds=10, MaxNumberOfMessages=41880 )1881 snapshot.match("messages-before-delete", response)1882 sqs_client.delete_message(1883 QueueUrl=queue_url, ReceiptHandle=response["Messages"][0]["ReceiptHandle"]1884 )1885 sqs_client.delete_queue(QueueUrl=queue_url)1886 # try to send a message before setting a DLQ1887 message = "test_dlq_after_sqs_endpoint_deleted"1888 sns_client.publish(TopicArn=topic_arn, Message=message)1889 # to avoid race condition, publish is async and the redrive policy can be in effect before the actual publish1890 time.sleep(1)1891 # check the subscription is still there after we deleted the queue1892 subscriptions = sns_client.list_subscriptions_by_topic(TopicArn=topic_arn)1893 snapshot.match("subscriptions", subscriptions)1894 sns_client.set_subscription_attributes(1895 SubscriptionArn=subscription_arn,1896 AttributeName="RedrivePolicy",1897 AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),1898 )1899 sub_attrs = sns_client.get_subscription_attributes(SubscriptionArn=subscription_arn)1900 snapshot.match("subscriptions-attrs-with-redrive", sub_attrs)1901 # AWS takes some time to delete the queue, which make the test fails as it delivers the message correctly1902 assert poll_condition(lambda: not sqs_queue_exists(queue_url), timeout=5)1903 # test sending and receiving multiple messages1904 for i in range(2):1905 message = f"test_dlq_after_sqs_endpoint_deleted_{i}"1906 
sns_client.publish(TopicArn=topic_arn, Message=message)1907 response = sqs_client.receive_message(1908 QueueUrl=dlq_url, WaitTimeSeconds=10, MaxNumberOfMessages=41909 )1910 sqs_client.delete_message(1911 QueueUrl=dlq_url, ReceiptHandle=response["Messages"][0]["ReceiptHandle"]1912 )1913 snapshot.match(f"message-{i}-after-delete", response)1914 @pytest.mark.aws_validated1915 def test_publish_to_firehose_with_s3(1916 self,...
find_dangling_cloudwatch_alarms.py
Source:find_dangling_cloudwatch_alarms.py
...39 return False40 else:41 raise42@functools.cache43def sqs_queue_exists(sess, *, queue_name):44 """45 Returns True if this SQS queue exists, False otherwise.46 """47 client = sess.client("sqs")48 try:49 client.get_queue_url(QueueName=queue_name)50 return True51 except ClientError as err: # pragma: no cover52 if err.response["Error"]["Code"] == "AWS.SimpleQueueService.NonExistentQueue":53 return False54 else:55 raise56@functools.cache57def lambda_function_exists(sess, *, function_name):58 """59 Returns True if this Lambda function exists, False otherwise.60 """61 client = sess.client("lambda")62 try:63 client.get_function(FunctionName=function_name)64 return True65 except ClientError as err: # pragma: no cover66 if err.response["Error"]["Code"] == "ResourceNotFoundException":67 return False68 else:69 raise70if __name__ == "__main__":71 sess = boto3.Session()72 for alarm in get_metric_alarm_descriptions(sess):73 dimensions = {dim["Name"]: dim["Value"] for dim in alarm["Dimensions"]}74 # Is this alarm based on a non-existent table?75 if alarm.get("Namespace") == "AWS/DynamoDB":76 table_name = dimensions["TableName"]77 if not dynamodb_table_exists(sess, table_name=table_name):78 print(79 f"!!! Alarm {alarm['AlarmArn']} is based on non-existent table {table_name}"80 )81 continue82 # Is this alarm based on a non-existent queue?83 if alarm.get("Namespace") == "AWS/SQS":84 queue_name = dimensions["QueueName"]85 if not sqs_queue_exists(sess, queue_name=queue_name):86 print(87 f"!!! Alarm {alarm['AlarmArn']} is based on non-existent SQS queue {queue_name}"88 )89 continue90 # Is this alarm based on a non-existent Lambda function?91 if alarm.get("Namespace") == "AWS/Lambda":92 function_name = dimensions["FunctionName"]93 if not lambda_function_exists(sess, function_name=function_name):94 print(95 f"!!! Alarm {alarm['AlarmArn']} is based on non-existent Lambda function {function_name}"96 )...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!