Best Python code snippet using localstack_python
test_sns.py
Source:test_sns.py
...69 Action="lambda:InvokeFunction",70 Principal="sns.amazonaws.com",71 SourceArn=topic_arn,72 )73 sns_subscription(74 TopicArn=topic_arn,75 Protocol="lambda",76 Endpoint=lambda_arn,77 )78 sns_client.publish(TopicArn=topic_arn, Subject=subject, Message=message)79 events = retry(80 check_expected_lambda_log_events_length,81 retries=10,82 sleep=1,83 function_name=function_name,84 expected_length=1,85 regex_filter="Records.*Sns",86 logs_client=logs_client,87 )88 notification = events[0]["Records"][0]["Sns"]89 assert "Subject" in notification90 assert subject == notification["Subject"]91class TestSNSProvider:92 def test_publish_unicode_chars(93 self,94 sns_client,95 sns_create_topic,96 sqs_create_queue,97 sqs_client,98 sqs_queue_arn,99 sns_subscription,100 ):101 topic_arn = sns_create_topic()["TopicArn"]102 queue_url = sqs_create_queue()103 queue_arn = sqs_queue_arn(queue_url)104 sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)105 # publish message to SNS, receive it from SQS, assert that messages are equal106 message = 'ö§a1"_!?,. £$-'107 sns_client.publish(TopicArn=topic_arn, Message=message)108 def check_message():109 msgs = sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0)110 msg_received = msgs["Messages"][0]111 msg_received = json.loads(to_str(msg_received["Body"]))112 msg_received = msg_received["Message"]113 assert message == msg_received114 retry(check_message, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)115 def test_subscribe_http_endpoint(self, sns_client, sns_create_topic, sns_subscription):116 topic_arn = sns_create_topic()["TopicArn"]117 # create HTTP endpoint and connect it to SNS topic118 class MyUpdateListener(ProxyListener):119 def forward_request(self, method, path, data, headers):120 records.append((json.loads(to_str(data)), headers))121 return 200122 records = []123 local_port = get_free_tcp_port()124 proxy = start_proxy(local_port, backend_url=None, update_listener=MyUpdateListener())125 wait_for_port_open(local_port)126 queue_arn = "%s://localhost:%s" % (get_service_protocol(), local_port)127 sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=queue_arn)128 def received():129 assert records[0][0]["Type"] == "SubscriptionConfirmation"130 assert records[0][1]["x-amz-sns-message-type"] == "SubscriptionConfirmation"131 token = records[0][0]["Token"]132 subscribe_url = records[0][0]["SubscribeURL"]133 assert subscribe_url == (134 f"{external_service_url('sns')}/?Action=ConfirmSubscription&TopicArn={topic_arn}&Token={token}"135 )136 assert "Signature" in records[0][0]137 assert "SigningCertURL" in records[0][0]138 retry(received, retries=5, sleep=1)139 proxy.stop()140 def test_subscribe_with_invalid_protocol(self, sns_client, sns_create_topic, sns_subscription):141 topic_arn = sns_create_topic(Name=TEST_TOPIC_NAME_2)["TopicArn"]142 with pytest.raises(ClientError) as e:143 sns_subscription(144 TopicArn=topic_arn, Protocol="test-protocol", Endpoint="localstack@yopmail.com"145 )146 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400147 assert e.value.response["Error"]["Code"] == "InvalidParameter"148 def test_attribute_raw_subscribe(149 self, sqs_client, sns_client, sns_create_topic, sqs_queue, sqs_queue_arn, sns_subscription150 ):151 topic_arn = sns_create_topic()["TopicArn"]152 # create SNS topic and connect it to an SQS queue153 queue_url = sqs_queue154 queue_arn = sqs_queue_arn(queue_url)155 attributes = {"RawMessageDelivery": "True"}156 sns_subscription(157 TopicArn=topic_arn,158 Protocol="sqs",159 Endpoint=queue_arn,160 Attributes=attributes,161 )162 # fetch subscription information163 subscription_list = sns_client.list_subscriptions()164 subscription_arn = ""165 for subscription in subscription_list["Subscriptions"]:166 if subscription["TopicArn"] == topic_arn:167 subscription_arn = subscription["SubscriptionArn"]168 actual_attributes = sns_client.get_subscription_attributes(169 SubscriptionArn=subscription_arn170 )["Attributes"]171 # assert the attributes are well set172 assert actual_attributes["RawMessageDelivery"]173 # publish message to SNS, receive it from SQS, assert that messages are equal and that they are Raw174 message = "This is a test message"175 binary_attribute = b"\x02\x03\x04"176 # extending this test case to test support for binary message attribute data177 # https://github.com/localstack/localstack/issues/2432178 sns_client.publish(179 TopicArn=topic_arn,180 Message=message,181 MessageAttributes={"store": {"DataType": "Binary", "BinaryValue": binary_attribute}},182 )183 def check_message():184 msgs = sqs_client.receive_message(185 QueueUrl=queue_url, MessageAttributeNames=["All"], VisibilityTimeout=0186 )187 msg_received = msgs["Messages"][0]188 assert message == msg_received["Body"]189 assert binary_attribute == msg_received["MessageAttributes"]["store"]["BinaryValue"]190 retry(check_message, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)191 sns_client.unsubscribe(SubscriptionArn=subscription_arn)192 def test_filter_policy(193 self,194 sqs_create_queue,195 sqs_queue_arn,196 sns_client,197 sns_create_topic,198 sqs_client,199 sns_subscription,200 ):201 # connect SNS topic to an SQS queue202 queue_name = f"queue-{short_uid()}"203 queue_url = sqs_create_queue(QueueName=queue_name)204 queue_arn = sqs_queue_arn(queue_url)205 topic_arn = sns_create_topic()["TopicArn"]206 filter_policy = {"attr1": [{"numeric": [">", 0, "<=", 100]}]}207 sns_subscription(208 TopicArn=topic_arn,209 Protocol="sqs",210 Endpoint=queue_arn,211 Attributes={"FilterPolicy": json.dumps(filter_policy)},212 )213 # get number of messages214 num_msgs_0 = len(215 sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0).get("Messages", [])216 )217 # publish message that satisfies the filter policy, assert that message is received218 message = "This is a test message"219 message_attributes = {"attr1": {"DataType": "Number", "StringValue": "99"}}220 sns_client.publish(221 TopicArn=topic_arn,222 Message=message,223 MessageAttributes=message_attributes,224 )225 def check_message():226 msgs_1 = sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0)["Messages"]227 num_msgs_1 = len(msgs_1)228 assert num_msgs_1 == (num_msgs_0 + 1)229 return num_msgs_1230 num_msgs_1 = retry(check_message, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)231 # publish message that does not satisfy the filter policy, assert that message is not received232 message = "This is another test message"233 sns_client.publish(234 TopicArn=topic_arn,235 Message=message,236 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "111"}},237 )238 def check_message2():239 num_msgs_2 = len(240 sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0)["Messages"]241 )242 assert num_msgs_2 == num_msgs_1243 return num_msgs_2244 retry(check_message2, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)245 def test_exists_filter_policy(246 self,247 sqs_create_queue,248 sqs_queue_arn,249 sns_create_topic,250 sns_client,251 sqs_client,252 sns_subscription,253 ):254 # connect SNS topic to an SQS queue255 queue_name = f"queue-{short_uid()}"256 queue_url = sqs_create_queue(QueueName=queue_name)257 queue_arn = sqs_queue_arn(queue_url)258 topic_arn = sns_create_topic()["TopicArn"]259 filter_policy = {"store": [{"exists": True}]}260 def do_subscribe(filter_policy, queue_arn):261 sns_subscription(262 TopicArn=topic_arn,263 Protocol="sqs",264 Endpoint=queue_arn,265 Attributes={"FilterPolicy": json.dumps(filter_policy)},266 )267 do_subscribe(filter_policy, queue_arn)268 # get number of messages269 num_msgs_0 = len(270 sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0).get("Messages", [])271 )272 # publish message that satisfies the filter policy, assert that message is received273 message = f"message-{short_uid()}"274 sns_client.publish(275 TopicArn=topic_arn,276 Message=message,277 MessageAttributes={278 "store": {"DataType": "Number", "StringValue": "99"},279 "def": {"DataType": "Number", "StringValue": "99"},280 },281 )282 def check_message1():283 num_msgs_1 = len(284 sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0)["Messages"]285 )286 assert num_msgs_1 == (num_msgs_0 + 1)287 return num_msgs_1288 num_msgs_1 = retry(check_message1, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)289 # publish message that does not satisfy the filter policy, assert that message is not received290 message = f"message-{short_uid()}"291 sns_client.publish(292 TopicArn=topic_arn,293 Message=message,294 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "111"}},295 )296 def check_message2():297 num_msgs_2 = len(298 sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0)["Messages"]299 )300 assert num_msgs_2 == num_msgs_1301 return num_msgs_2302 retry(check_message2, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)303 # test with exist operator set to false.304 queue_arn = aws_stack.sqs_queue_arn(TEST_QUEUE_NAME)305 filter_policy = {"store": [{"exists": False}]}306 do_subscribe(filter_policy, queue_arn)307 # get number of messages308 num_msgs_0 = len(sqs_client.receive_message(QueueUrl=queue_url).get("Messages", []))309 # publish message with the attribute and see if its getting filtered.310 message = f"message-{short_uid()}"311 sns_client.publish(312 TopicArn=topic_arn,313 Message=message,314 MessageAttributes={315 "store": {"DataType": "Number", "StringValue": "99"},316 "def": {"DataType": "Number", "StringValue": "99"},317 },318 )319 def check_message():320 num_msgs_1 = len(321 sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0).get(322 "Messages", []323 )324 )325 assert num_msgs_1 == num_msgs_0326 return num_msgs_1327 num_msgs_1 = retry(check_message, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)328 # publish message that without the attribute and see if its getting filtered.329 message = f"message-{short_uid()}"330 sns_client.publish(331 TopicArn=topic_arn,332 Message=message,333 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "111"}},334 )335 def check_message3():336 num_msgs_2 = len(337 sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0).get(338 "Messages", []339 )340 )341 assert num_msgs_2 == num_msgs_1342 return num_msgs_2343 retry(check_message3, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)344 def test_subscribe_sqs_queue(345 self,346 sqs_create_queue,347 sqs_queue_arn,348 sns_create_topic,349 sns_client,350 sqs_client,351 sns_subscription,352 ):353 # TODO: check with non default external port354 # connect SNS topic to an SQS queue355 queue_name = f"queue-{short_uid()}"356 queue_url = sqs_create_queue(QueueName=queue_name)357 queue_arn = sqs_queue_arn(queue_url)358 topic_arn = sns_create_topic()["TopicArn"]359 # create subscription with filter policy360 filter_policy = {"attr1": [{"numeric": [">", 0, "<=", 100]}]}361 subscription = sns_subscription(362 TopicArn=topic_arn,363 Protocol="sqs",364 Endpoint=queue_arn,365 Attributes={"FilterPolicy": json.dumps(filter_policy)},366 )367 # publish message that satisfies the filter policy368 message = "This is a test message"369 sns_client.publish(370 TopicArn=topic_arn,371 Message=message,372 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "99.12"}},373 )374 # assert that message is received375 def check_message():376 messages = sqs_client.receive_message(377 QueueUrl=queue_url, VisibilityTimeout=0, MessageAttributeNames=["All"]378 )["Messages"]379 message = messages[0]380 assert message["MessageAttributes"]["attr1"]["StringValue"] == "99.12"381 retry(check_message, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)382 # clean up383 sns_client.unsubscribe(SubscriptionArn=subscription["SubscriptionArn"])384 def test_subscribe_platform_endpoint(385 self, sns_client, sqs_create_queue, sns_create_topic, sns_subscription386 ):387 sns_backend = SNSBackend.get()388 topic_arn = sns_create_topic()["TopicArn"]389 app_arn = sns_client.create_platform_application(Name="app1", Platform="p1", Attributes={})[390 "PlatformApplicationArn"391 ]392 platform_arn = sns_client.create_platform_endpoint(393 PlatformApplicationArn=app_arn, Token="token_1"394 )["EndpointArn"]395 # create subscription with filter policy396 filter_policy = {"attr1": [{"numeric": [">", 0, "<=", 100]}]}397 subscription = sns_subscription(398 TopicArn=topic_arn,399 Protocol="application",400 Endpoint=platform_arn,401 Attributes={"FilterPolicy": json.dumps(filter_policy)},402 )403 # publish message that satisfies the filter policy404 message = "This is a test message"405 sns_client.publish(406 TopicArn=topic_arn,407 Message=message,408 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": "99.12"}},409 )410 # assert that message has been received411 def check_message():412 assert len(sns_backend.platform_endpoint_messages[platform_arn]) > 0413 retry(check_message, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)414 # clean up415 sns_client.unsubscribe(SubscriptionArn=subscription["SubscriptionArn"])416 sns_client.delete_endpoint(EndpointArn=platform_arn)417 sns_client.delete_platform_application(PlatformApplicationArn=app_arn)418 def test_unknown_topic_publish(self, sns_client):419 fake_arn = "arn:aws:sns:us-east-1:123456789012:i_dont_exist"420 message = "This is a test message"421 with pytest.raises(ClientError) as e:422 sns_client.publish(TopicArn=fake_arn, Message=message)423 assert e.value.response["Error"]["Code"] == "NotFound"424 assert e.value.response["Error"]["Message"] == "Topic does not exist"425 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 404426 def test_publish_sms(self, sns_client):427 response = sns_client.publish(PhoneNumber="+33000000000", Message="This is a SMS")428 assert "MessageId" in response429 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200430 def test_publish_target(self, sns_client):431 response = sns_client.publish(432 TargetArn="arn:aws:sns:us-east-1:000000000000:endpoint/APNS/abcdef/0f7d5971-aa8b-4bd5-b585-0826e9f93a66",433 Message="This is a push notification",434 )435 assert "MessageId" in response436 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200437 def test_tags(self, sns_client, sns_create_topic):438 topic_arn = sns_create_topic()["TopicArn"]439 sns_client.tag_resource(440 ResourceArn=topic_arn,441 Tags=[442 {"Key": "123", "Value": "abc"},443 {"Key": "456", "Value": "def"},444 {"Key": "456", "Value": "def"},445 ],446 )447 tags = sns_client.list_tags_for_resource(ResourceArn=topic_arn)448 distinct_tags = [449 tag for idx, tag in enumerate(tags["Tags"]) if tag not in tags["Tags"][:idx]450 ]451 # test for duplicate tags452 assert len(tags["Tags"]) == len(distinct_tags)453 assert len(tags["Tags"]) == 2454 assert tags["Tags"][0]["Key"] == "123"455 assert tags["Tags"][0]["Value"] == "abc"456 assert tags["Tags"][1]["Key"] == "456"457 assert tags["Tags"][1]["Value"] == "def"458 sns_client.untag_resource(ResourceArn=topic_arn, TagKeys=["123"])459 tags = sns_client.list_tags_for_resource(ResourceArn=topic_arn)460 assert len(tags["Tags"]) == 1461 assert tags["Tags"][0]["Key"] == "456"462 assert tags["Tags"][0]["Value"] == "def"463 sns_client.tag_resource(ResourceArn=topic_arn, Tags=[{"Key": "456", "Value": "pqr"}])464 tags = sns_client.list_tags_for_resource(ResourceArn=topic_arn)465 assert len(tags["Tags"]) == 1466 assert tags["Tags"][0]["Key"] == "456"467 assert tags["Tags"][0]["Value"] == "pqr"468 def test_topic_subscription(self, sns_client, sns_create_topic, sns_subscription):469 topic_arn = sns_create_topic()["TopicArn"]470 subscription = sns_subscription(471 TopicArn=topic_arn,472 Protocol="email",473 Endpoint="localstack@yopmail.com",474 )475 sns_backend = SNSBackend.get()476 def check_subscription():477 subscription_arn = subscription["SubscriptionArn"]478 subscription_obj = sns_backend.subscription_status[subscription_arn]479 assert subscription_obj["Status"] == "Not Subscribed"480 _token = subscription_obj["Token"]481 sns_client.confirm_subscription(TopicArn=topic_arn, Token=_token)482 assert subscription_obj["Status"] == "Subscribed"483 retry(check_subscription, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)484 def test_sqs_topic_subscription_confirmation(485 self, sns_client, sns_create_topic, sqs_create_queue, sqs_queue_arn, sns_subscription486 ):487 topic_arn = sns_create_topic()["TopicArn"]488 queue_arn = sqs_queue_arn(sqs_create_queue())489 subscription = sns_subscription(490 TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn, ReturnSubscriptionArn=True491 )492 def check_subscription():493 subscription_arn = subscription["SubscriptionArn"]494 subscription_attrs = sns_client.get_subscription_attributes(495 SubscriptionArn=subscription_arn496 )497 assert subscription_attrs["Attributes"]["PendingConfirmation"] == "false"498 retry(check_subscription, retries=PUBLICATION_RETRIES, sleep=PUBLICATION_TIMEOUT)499 def test_dead_letter_queue(500 self,501 sns_client,502 sqs_client,503 sns_create_topic,504 sqs_create_queue,505 sqs_queue_arn,506 create_lambda_function,507 sns_subscription,508 ):509 lambda_name = f"test-{short_uid()}"510 lambda_arn = aws_stack.lambda_function_arn(lambda_name)511 topic_arn = sns_create_topic()["TopicArn"]512 queue_name = f"test-{short_uid()}"513 queue_url = sqs_create_queue(QueueName=queue_name)514 queue_arn = sqs_queue_arn(queue_url)515 create_lambda_function(516 func_name=lambda_name,517 handler_file=TEST_LAMBDA_PYTHON,518 libs=TEST_LAMBDA_LIBS,519 runtime=LAMBDA_RUNTIME_PYTHON36,520 DeadLetterConfig={"TargetArn": queue_arn},521 )522 sns_subscription(TopicArn=topic_arn, Protocol="lambda", Endpoint=lambda_arn)523 payload = {524 lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1,525 }526 sns_client.publish(TopicArn=topic_arn, Message=json.dumps(payload))527 def receive_dlq():528 result = sqs_client.receive_message(529 QueueUrl=queue_url, MessageAttributeNames=["All"], VisibilityTimeout=0530 )531 msg_attrs = result["Messages"][0]["MessageAttributes"]532 assert len(result["Messages"]) > 0533 assert "RequestID" in msg_attrs534 assert "ErrorCode" in msg_attrs535 assert "ErrorMessage" in msg_attrs536 retry(receive_dlq, retries=8, sleep=2)537 def test_redrive_policy_http_subscription(538 self,539 sns_client,540 sns_create_topic,541 sqs_client,542 sqs_create_queue,543 sqs_queue_arn,544 sns_subscription,545 ):546 # self.unsubscribe_all_from_sns()547 dlq_name = f"dlq-{short_uid()}"548 dlq_url = sqs_create_queue(QueueName=dlq_name)549 dlq_arn = sqs_queue_arn(dlq_url)550 topic_arn = sns_create_topic()["TopicArn"]551 # create HTTP endpoint and connect it to SNS topic552 class MyUpdateListener(ProxyListener):553 def forward_request(self, method, path, data, headers):554 records.append((json.loads(to_str(data)), headers))555 return 200556 records = []557 local_port = get_free_tcp_port()558 proxy = start_proxy(local_port, backend_url=None, update_listener=MyUpdateListener())559 wait_for_port_open(local_port)560 http_endpoint = f"{get_service_protocol()}://localhost:{local_port}"561 subscription = sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=http_endpoint)562 sns_client.set_subscription_attributes(563 SubscriptionArn=subscription["SubscriptionArn"],564 AttributeName="RedrivePolicy",565 AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),566 )567 proxy.stop()568 # for some reason, it takes a long time to stop the proxy thread -> TODO investigate569 time.sleep(5)570 sns_client.publish(571 TopicArn=topic_arn,572 Message=json.dumps({"message": "test_redrive_policy"}),573 )574 def receive_dlq():575 result = sqs_client.receive_message(QueueUrl=dlq_url, MessageAttributeNames=["All"])576 assert len(result["Messages"]) > 0577 assert (578 json.loads(json.loads(result["Messages"][0]["Body"])["Message"][0])["message"]579 == "test_redrive_policy"580 )581 retry(receive_dlq, retries=7, sleep=2.5)582 def test_redrive_policy_lambda_subscription(583 self,584 sns_client,585 sns_create_topic,586 sqs_create_queue,587 sqs_queue_arn,588 create_lambda_function,589 sqs_client,590 sns_subscription,591 ):592 # self.unsubscribe_all_from_sns()593 dlq_name = f"dlq-{short_uid()}"594 dlq_url = sqs_create_queue(QueueName=dlq_name)595 dlq_arn = sqs_queue_arn(dlq_url)596 topic_arn = sns_create_topic()["TopicArn"]597 lambda_name = f"test-{short_uid()}"598 lambda_arn = create_lambda_function(599 func_name=lambda_name,600 libs=TEST_LAMBDA_LIBS,601 handler_file=TEST_LAMBDA_PYTHON,602 runtime=LAMBDA_RUNTIME_PYTHON36,603 )["CreateFunctionResponse"]["FunctionArn"]604 subscription = sns_subscription(TopicArn=topic_arn, Protocol="lambda", Endpoint=lambda_arn)605 sns_client.set_subscription_attributes(606 SubscriptionArn=subscription["SubscriptionArn"],607 AttributeName="RedrivePolicy",608 AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),609 )610 testutil.delete_lambda_function(lambda_name)611 sns_client.publish(612 TopicArn=topic_arn,613 Message=json.dumps({"message": "test_redrive_policy"}),614 )615 def receive_dlq():616 result = sqs_client.receive_message(QueueUrl=dlq_url, MessageAttributeNames=["All"])617 assert len(result["Messages"]) > 0618 assert (619 json.loads(json.loads(result["Messages"][0]["Body"])["Message"][0])["message"]620 == "test_redrive_policy"621 )622 retry(receive_dlq, retries=10, sleep=2)623 def test_redrive_policy_queue_subscription(624 self,625 sns_client,626 sns_create_topic,627 sqs_create_queue,628 sqs_queue_arn,629 sqs_client,630 sns_subscription,631 ):632 # self.unsubscribe_all_from_sns()633 dlq_name = f"dlq-{short_uid()}"634 dlq_url = sqs_create_queue(QueueName=dlq_name)635 dlq_arn = sqs_queue_arn(dlq_url)636 topic_arn = sns_create_topic()["TopicArn"]637 invalid_queue_arn = aws_stack.sqs_queue_arn("invalid_queue")638 # subscribe with an invalid queue ARN, to trigger event on DLQ below639 subscription = sns_subscription(640 TopicArn=topic_arn, Protocol="sqs", Endpoint=invalid_queue_arn641 )642 sns_client.set_subscription_attributes(643 SubscriptionArn=subscription["SubscriptionArn"],644 AttributeName="RedrivePolicy",645 AttributeValue=json.dumps({"deadLetterTargetArn": dlq_arn}),646 )647 sns_client.publish(648 TopicArn=topic_arn, Message=json.dumps({"message": "test_redrive_policy"})649 )650 def receive_dlq():651 result = sqs_client.receive_message(QueueUrl=dlq_url, MessageAttributeNames=["All"])652 assert len(result["Messages"]) > 0653 assert (654 json.loads(json.loads(result["Messages"][0]["Body"])["Message"][0])["message"]655 == "test_redrive_policy"656 )657 retry(receive_dlq, retries=10, sleep=2)658 def test_publish_with_empty_subject(self, sns_client, sns_create_topic):659 topic_arn = sns_create_topic()["TopicArn"]660 # Publish without subject661 rs = sns_client.publish(TopicArn=topic_arn, Message=json.dumps({"message": "test_publish"}))662 assert rs["ResponseMetadata"]["HTTPStatusCode"] == 200663 with pytest.raises(ClientError) as e:664 sns_client.publish(665 TopicArn=topic_arn,666 Subject="",667 Message=json.dumps({"message": "test_publish"}),668 )669 assert e.value.response["Error"]["Code"] == "InvalidParameter"670 def test_create_topic_test_arn(self, sns_create_topic, sns_client):671 topic_name = f"topic-{short_uid()}"672 response = sns_create_topic(Name=topic_name)673 topic_arn_params = response["TopicArn"].split(":")674 testutil.response_arn_matches_partition(sns_client, response["TopicArn"])675 assert topic_arn_params[4] == TEST_AWS_ACCOUNT_ID676 assert topic_arn_params[5] == topic_name677 def test_publish_message_by_target_arn(678 self, sns_client, sns_create_topic, create_lambda_function, sns_subscription679 ):680 # self.unsubscribe_all_from_sns()681 func_name = f"lambda-{short_uid()}"682 topic_arn = sns_create_topic()["TopicArn"]683 lambda_arn = create_lambda_function(684 handler_file=TEST_LAMBDA_PYTHON_ECHO,685 func_name=func_name,686 runtime=LAMBDA_RUNTIME_PYTHON36,687 )["CreateFunctionResponse"]["FunctionArn"]688 subscription_arn = sns_subscription(689 TopicArn=topic_arn, Protocol="lambda", Endpoint=lambda_arn690 )["SubscriptionArn"]691 sns_client.publish(TopicArn=topic_arn, Message="test_message_1", Subject="test subject")692 # Lambda invoked 1 time693 events = retry(694 check_expected_lambda_log_events_length,695 retries=3,696 sleep=1,697 function_name=func_name,698 expected_length=1,699 )700 message = events[0]["Records"][0]701 assert message["EventSubscriptionArn"] == subscription_arn702 sns_client.publish(TargetArn=topic_arn, Message="test_message_2", Subject="test subject")703 events = retry(704 check_expected_lambda_log_events_length,705 retries=3,706 sleep=1,707 function_name=func_name,708 expected_length=2,709 )710 # Lambda invoked 1 more time711 assert len(events) == 2712 for event in events:713 message = event["Records"][0]714 assert message["EventSubscriptionArn"] == subscription_arn715 def test_publish_message_after_subscribe_topic(716 self,717 sns_client,718 sns_create_topic,719 sqs_client,720 sqs_create_queue,721 sqs_queue_arn,722 sns_subscription,723 ):724 # self.unsubscribe_all_from_sns()725 topic_arn = sns_create_topic()["TopicArn"]726 queue_url = sqs_create_queue()727 queue_arn = sqs_queue_arn(queue_url)728 rs = sns_client.publish(729 TopicArn=topic_arn, Subject="test subject", Message="test_message_1"730 )731 assert rs["ResponseMetadata"]["HTTPStatusCode"] == 200732 sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)733 message_subject = "sqs subject"734 message_body = "test_message_2"735 rs = sns_client.publish(TopicArn=topic_arn, Subject=message_subject, Message=message_body)736 # time.sleep(100)737 assert rs["ResponseMetadata"]["HTTPStatusCode"] == 200738 message_id = rs["MessageId"]739 def get_message(q_url):740 resp = sqs_client.receive_message(QueueUrl=q_url, VisibilityTimeout=0)741 return json.loads(resp["Messages"][0]["Body"])742 message = retry(get_message, retries=3, sleep=2, q_url=queue_url)743 assert message["MessageId"] == message_id744 assert message["Subject"] == message_subject745 assert message["Message"] == message_body746 def test_create_duplicate_topic_with_more_tags(self, sns_client, sns_create_topic):747 topic_name = f"test-{short_uid()}"748 sns_create_topic(Name=topic_name)749 with pytest.raises(ClientError) as e:750 sns_client.create_topic(Name=topic_name, Tags=[{"Key": "456", "Value": "pqr"}])751 assert e.value.response["Error"]["Code"] == "InvalidParameter"752 assert e.value.response["Error"]["Message"] == "Topic already exists with different tags"753 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400754 def test_create_duplicate_topic_check_idempotentness(self, sns_create_topic):755 topic_name = f"test-{short_uid()}"756 tags = [{"Key": "a", "Value": "1"}, {"Key": "b", "Value": "2"}]757 kwargs = [758 {"Tags": tags}, # to create topic with two tags759 {"Tags": tags}, # to create the same topic again with same tags760 {"Tags": [tags[0]]}, # to create the same topic again with one of the tags from above761 {"Tags": []}, # to create the same topic again with no tags762 ]763 responses = []764 for arg in kwargs:765 responses.append(sns_create_topic(Name=topic_name, **arg))766 # assert TopicArn is returned by all the above create_topic calls767 for i in range(len(responses)):768 assert "TopicArn" in responses[i]769 def test_create_platform_endpoint_check_idempotentness(self, sns_client):770 response = sns_client.create_platform_application(771 Name=f"test-{short_uid()}",772 Platform="GCM",773 Attributes={"PlatformCredential": "123"},774 )775 kwargs_list = [776 {"Token": "test1", "CustomUserData": "test-data"},777 {"Token": "test1", "CustomUserData": "test-data"},778 {"Token": "test1"},779 {"Token": "test1"},780 ]781 platform_arn = response["PlatformApplicationArn"]782 responses = []783 for kwargs in kwargs_list:784 responses.append(785 sns_client.create_platform_endpoint(PlatformApplicationArn=platform_arn, **kwargs)786 )787 # Assert endpointarn is returned in every call create platform call788 for i in range(len(responses)):789 assert "EndpointArn" in responses[i]790 endpoint_arn = responses[0]["EndpointArn"]791 # clean up792 sns_client.delete_endpoint(EndpointArn=endpoint_arn)793 sns_client.delete_platform_application(PlatformApplicationArn=platform_arn)794 def test_publish_by_path_parameters(795 self,796 sns_create_topic,797 sns_client,798 sqs_client,799 sqs_create_queue,800 sqs_queue_arn,801 sns_subscription,802 ):803 topic_name = f"topic-{short_uid()}"804 queue_name = f"queue-{short_uid()}"805 message = f"test message {short_uid()}"806 topic_arn = sns_create_topic(Name=topic_name)["TopicArn"]807 base_url = (808 f"{get_service_protocol()}://{config.LOCALSTACK_HOSTNAME}:{config.service_port('sns')}"809 )810 path = "Action=Publish&Version=2010-03-31&TopicArn={}&Message={}".format(topic_arn, message)811 queue_url = sqs_create_queue(QueueName=queue_name)812 queue_arn = sqs_queue_arn(queue_url)813 subscription_arn = sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)[814 "SubscriptionArn"815 ]816 r = requests.post(817 url="{}/?{}".format(base_url, path),818 headers=aws_stack.mock_aws_request_headers("sns"),819 )820 assert r.status_code == 200821 def get_notification(q_url):822 resp = sqs_client.receive_message(QueueUrl=q_url)823 return json.loads(resp["Messages"][0]["Body"])824 notification = retry(get_notification, retries=3, sleep=2, q_url=queue_url)825 assert notification["TopicArn"] == topic_arn826 assert notification["Message"] == message827 sns_client.unsubscribe(SubscriptionArn=subscription_arn)828 def test_multiple_subscriptions_http_endpoint(829 self, sns_client, sns_create_topic, sns_subscription830 ):831 # create a topic832 topic_arn = sns_create_topic()["TopicArn"]833 # build fake http server endpoints834 _requests = queue.Queue()835 # create HTTP endpoint and connect it to SNS topic836 class MyUpdateListener(ProxyListener):837 def forward_request(self, method, path, data, headers):838 _requests.put(Request(method, path, headers=headers, body=data))839 return 429840 number_of_endpoints = 4841 proxies = []842 for _ in range(number_of_endpoints):843 local_port = get_free_tcp_port()844 proxies.append(845 start_proxy(local_port, backend_url=None, update_listener=MyUpdateListener())846 )847 wait_for_port_open(local_port)848 http_endpoint = f"{get_service_protocol()}://localhost:{local_port}"849 sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=http_endpoint)850 # fetch subscription information851 subscription_list = sns_client.list_subscriptions_by_topic(TopicArn=topic_arn)852 assert subscription_list["ResponseMetadata"]["HTTPStatusCode"] == 200853 assert (854 len(subscription_list["Subscriptions"]) == number_of_endpoints855 ), f"unexpected number of subscriptions {subscription_list}"856 for _ in range(number_of_endpoints):857 request = _requests.get(timeout=2)858 assert request.get_json(True)["TopicArn"] == topic_arn859 with pytest.raises(queue.Empty):860 # make sure only four requests are received861 _requests.get(timeout=1)862 for proxy in proxies:863 proxy.stop()864 def test_publish_sms_endpoint(self, sns_client, sns_create_topic, sns_subscription):865 list_of_contacts = [866 f"+{random.randint(100000000, 9999999999)}",867 f"+{random.randint(100000000, 9999999999)}",868 f"+{random.randint(100000000, 9999999999)}",869 ]870 message = "Good news everyone!"871 topic_arn = sns_create_topic()["TopicArn"]872 for number in list_of_contacts:873 sns_subscription(TopicArn=topic_arn, Protocol="sms", Endpoint=number)874 sns_client.publish(Message=message, TopicArn=topic_arn)875 sns_backend = SNSBackend.get()876 def check_messages():877 sms_messages = sns_backend.sms_messages878 for contact in list_of_contacts:879 sms_was_found = False880 for message in sms_messages:881 if message["endpoint"] == contact:882 sms_was_found = True883 break884 assert sms_was_found885 retry(check_messages, sleep=0.5)886 def test_publish_sqs_from_sns(887 self,888 sns_client,889 sns_create_topic,890 sqs_client,891 sqs_create_queue,892 sqs_queue_arn,893 sns_subscription,894 ):895 topic_arn = sns_create_topic()["TopicArn"]896 queue_url = sqs_create_queue()897 queue_arn = sqs_queue_arn(queue_url)898 subscription_arn = sns_subscription(899 TopicArn=topic_arn,900 Protocol="sqs",901 Endpoint=queue_arn,902 Attributes={"RawMessageDelivery": "true"},903 )["SubscriptionArn"]904 string_value = "99.12"905 sns_client.publish(906 TopicArn=topic_arn,907 Message="Test msg",908 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": string_value}},909 )910 def get_message_with_attributes(queue_url):911 response = sqs_client.receive_message(912 QueueUrl=queue_url, MessageAttributeNames=["All"], VisibilityTimeout=0913 )914 assert response["Messages"][0]["MessageAttributes"] == {915 "attr1": {"DataType": "Number", "StringValue": string_value}916 }917 sqs_client.delete_message(918 QueueUrl=queue_url, ReceiptHandle=response["Messages"][0]["ReceiptHandle"]919 )920 retry(get_message_with_attributes, retries=3, sleep=3, queue_url=queue_url)921 sns_client.set_subscription_attributes(922 SubscriptionArn=subscription_arn,923 AttributeName="RawMessageDelivery",924 AttributeValue="false",925 )926 string_value = "100.12"927 sns_client.publish(928 TargetArn=topic_arn,929 Message="Test msg",930 MessageAttributes={"attr1": {"DataType": "Number", "StringValue": string_value}},931 )932 retry(get_message_with_attributes, retries=3, sleep=3, queue_url=queue_url)933 def test_publish_batch_messages_from_sns_to_sqs(934 self,935 sns_client,936 sns_create_topic,937 sqs_create_queue,938 sqs_queue_arn,939 sqs_client,940 sns_subscription,941 ):942 topic_arn = sns_create_topic()["TopicArn"]943 queue_url = sqs_create_queue()944 queue_arn = sqs_queue_arn(queue_url)945 sns_subscription(946 TopicArn=topic_arn,947 Protocol="sqs",948 Endpoint=queue_arn,949 Attributes={"RawMessageDelivery": "true"},950 )951 publish_batch_response = sns_client.publish_batch(952 TopicArn=topic_arn,953 PublishBatchRequestEntries=[954 {955 "Id": "1",956 "Message": "Test Message with two attributes",957 "Subject": "Subject",958 "MessageAttributes": {959 "attr1": {"DataType": "Number", "StringValue": "99.12"},960 "attr2": {"DataType": "Number", "StringValue": "109.12"},961 },962 },963 {964 "Id": "2",965 "Message": "Test Message with one attribute",966 "Subject": "Subject",967 "MessageAttributes": {"attr1": {"DataType": "Number", "StringValue": "19.12"}},968 },969 {970 "Id": "3",971 "Message": "Test Message without attribute",972 "Subject": "Subject",973 },974 {975 "Id": "4",976 "Message": "Test Message without subject",977 },978 ],979 )980 assert "Successful" in publish_batch_response981 assert "Failed" in publish_batch_response982 for successful_resp in publish_batch_response["Successful"]:983 assert "Id" in successful_resp984 assert "MessageId" in successful_resp985 def get_messages(queue_url):986 response = sqs_client.receive_message(987 QueueUrl=queue_url,988 MessageAttributeNames=["All"],989 MaxNumberOfMessages=10,990 VisibilityTimeout=0,991 )992 assert len(response["Messages"]) == 4993 for message in response["Messages"]:994 assert "Body" in message995 if message["Body"] == "Test Message with two attributes":996 assert len(message["MessageAttributes"]) == 2997 assert message["MessageAttributes"]["attr1"] == {998 "StringValue": "99.12",999 "DataType": "Number",1000 }1001 assert message["MessageAttributes"]["attr2"] == {1002 "StringValue": "109.12",1003 "DataType": "Number",1004 }1005 elif message["Body"] == "Test Message with one attribute":1006 assert len(message["MessageAttributes"]) == 11007 assert message["MessageAttributes"]["attr1"] == {1008 "StringValue": "19.12",1009 "DataType": "Number",1010 }1011 elif message["Body"] == "Test Message without attribute":1012 assert message.get("MessageAttributes") is None1013 retry(get_messages, retries=5, sleep=1, queue_url=queue_url)1014 def test_publish_batch_messages_from_fifo_topic_to_fifo_queue(1015 self, sns_client, sns_create_topic, sqs_client, sqs_create_queue, sns_subscription1016 ):1017 topic_name = f"topic-{short_uid()}.fifo"1018 queue_name = f"queue-{short_uid()}.fifo"1019 topic_arn = sns_create_topic(Name=topic_name, Attributes={"FifoTopic": "true"})["TopicArn"]1020 queue_url = sqs_create_queue(1021 QueueName=queue_name,1022 Attributes={"FifoQueue": "true"},1023 )1024 sns_subscription(1025 TopicArn=topic_arn,1026 Protocol="sqs",1027 Endpoint=queue_url,1028 Attributes={"RawMessageDelivery": "true"},1029 )1030 message_group_id = "complexMessageGroupId"1031 publish_batch_response = sns_client.publish_batch(1032 TopicArn=topic_arn,1033 PublishBatchRequestEntries=[1034 {1035 "Id": "1",1036 "MessageGroupId": message_group_id,1037 "Message": "Test Message with two attributes",1038 "Subject": "Subject",1039 "MessageAttributes": {1040 "attr1": {"DataType": "Number", "StringValue": "99.12"},1041 "attr2": {"DataType": "Number", "StringValue": "109.12"},1042 },1043 },1044 {1045 "Id": "2",1046 "MessageGroupId": message_group_id,1047 "Message": "Test Message with one attribute",1048 "Subject": "Subject",1049 "MessageAttributes": {"attr1": {"DataType": "Number", "StringValue": "19.12"}},1050 },1051 {1052 "Id": "3",1053 "MessageGroupId": message_group_id,1054 "Message": "Test Message without attribute",1055 "Subject": "Subject",1056 },1057 ],1058 )1059 assert "Successful" in publish_batch_response1060 assert "Failed" in publish_batch_response1061 for successful_resp in publish_batch_response["Successful"]:1062 assert "Id" in successful_resp1063 assert "MessageId" in successful_resp1064 def get_messages(queue_url):1065 response = sqs_client.receive_message(1066 QueueUrl=queue_url,1067 MessageAttributeNames=["All"],1068 AttributeNames=["All"],1069 MaxNumberOfMessages=10,1070 VisibilityTimeout=0,1071 )1072 assert len(response["Messages"]) == 31073 for message in response["Messages"]:1074 assert "Body" in message1075 assert message["Attributes"]["MessageGroupId"] == message_group_id1076 if message["Body"] == "Test Message with two attributes":1077 assert len(message["MessageAttributes"]) == 21078 assert message["MessageAttributes"]["attr1"] == {1079 "StringValue": "99.12",1080 "DataType": "Number",1081 }1082 assert message["MessageAttributes"]["attr2"] == {1083 "StringValue": "109.12",1084 "DataType": "Number",1085 }1086 elif message["Body"] == "Test Message with one attribute":1087 assert len(message["MessageAttributes"]) == 11088 assert message["MessageAttributes"]["attr1"] == {1089 "StringValue": "19.12",1090 "DataType": "Number",1091 }1092 elif message["Body"] == "Test Message without attribute":1093 assert message.get("MessageAttributes") is None1094 retry(get_messages, retries=5, sleep=1, queue_url=queue_url)1095 def test_publish_batch_exceptions(1096 self, sns_client, sqs_client, sns_create_topic, sqs_create_queue, sns_subscription1097 ):1098 topic_name = f"topic-{short_uid()}.fifo"1099 queue_name = f"queue-{short_uid()}.fifo"1100 topic_arn = sns_create_topic(Name=topic_name, Attributes={"FifoTopic": "true"})["TopicArn"]1101 queue_url = sqs_create_queue(1102 QueueName=queue_name,1103 Attributes={"FifoQueue": "true"},1104 )1105 queue_arn = aws_stack.sqs_queue_arn(queue_url)1106 sns_subscription(1107 TopicArn=topic_arn,1108 Protocol="sqs",1109 Endpoint=queue_arn,1110 Attributes={"RawMessageDelivery": "true"},1111 )1112 with pytest.raises(ClientError) as e:1113 sns_client.publish_batch(1114 TopicArn=topic_arn,1115 PublishBatchRequestEntries=[1116 {1117 "Id": "1",1118 "Message": "Test Message with two attributes",1119 }1120 ],1121 )1122 assert e.value.response["Error"]["Code"] == "InvalidParameter"1123 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4001124 with pytest.raises(ClientError) as e:1125 sns_client.publish_batch(1126 TopicArn=topic_arn,1127 PublishBatchRequestEntries=[1128 {"Id": f"Id_{i}", "Message": f"message_{i}"} for i in range(11)1129 ],1130 )1131 assert e.value.response["Error"]["Code"] == "TooManyEntriesInBatchRequest"1132 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4001133 with pytest.raises(ClientError) as e:1134 sns_client.publish_batch(1135 TopicArn=topic_arn,1136 PublishBatchRequestEntries=[1137 {"Id": "1", "Message": f"message_{i}"} for i in range(2)1138 ],1139 )1140 assert e.value.response["Error"]["Code"] == "BatchEntryIdsNotDistinct"1141 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4001142 def add_xray_header(self, request, **kwargs):1143 request.headers[1144 "X-Amzn-Trace-Id"1145 ] = "Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1"1146 def test_publish_sqs_from_sns_with_xray_propagation(1147 self, sns_client, sns_create_topic, sqs_client, sqs_create_queue, sns_subscription1148 ):1149 # TODO: remove or adapt for asf1150 if SQS_BACKEND_IMPL != "elasticmq":1151 pytest.skip("not using elasticmq as SQS backend")1152 sns_client.meta.events.register("before-send.sns.Publish", self.add_xray_header)1153 topic = sns_create_topic()1154 topic_arn = topic["TopicArn"]1155 queue_url = sqs_create_queue()1156 sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_url)1157 sns_client.publish(TargetArn=topic_arn, Message="X-Ray propagation test msg")1158 response = sqs_client.receive_message(1159 QueueUrl=queue_url,1160 AttributeNames=["SentTimestamp", "AWSTraceHeader"],1161 MaxNumberOfMessages=1,1162 MessageAttributeNames=["All"],1163 VisibilityTimeout=2,1164 WaitTimeSeconds=2,1165 )1166 assert len(response["Messages"]) == 11167 message = response["Messages"][0]1168 assert "Attributes" in message1169 assert "AWSTraceHeader" in message["Attributes"]1170 assert (1171 message["Attributes"]["AWSTraceHeader"]1172 == "Root=1-3152b799-8954dae64eda91bc9a23a7e8;Parent=7fa8c0f79203be72;Sampled=1"1173 )1174 def test_create_topic_after_delete_with_new_tags(self, sns_create_topic, sns_client):1175 topic_name = f"test-{short_uid()}"1176 topic = sns_create_topic(Name=topic_name, Tags=[{"Key": "Name", "Value": "pqr"}])1177 sns_client.delete_topic(TopicArn=topic["TopicArn"])1178 topic1 = sns_create_topic(Name=topic_name, Tags=[{"Key": "Name", "Value": "abc"}])1179 assert topic["TopicArn"] == topic1["TopicArn"]1180 def test_not_found_error_on_get_subscription_attributes(1181 self, sns_client, sns_create_topic, sqs_create_queue, sqs_queue_arn, sns_subscription1182 ):1183 topic_arn = sns_create_topic()["TopicArn"]1184 queue_url = sqs_create_queue()1185 queue_arn = sqs_queue_arn(queue_url)1186 subscription = sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)1187 subscription_attributes = sns_client.get_subscription_attributes(1188 SubscriptionArn=subscription["SubscriptionArn"]1189 )1190 assert (1191 subscription_attributes.get("Attributes").get("SubscriptionArn")1192 == subscription["SubscriptionArn"]1193 )1194 sns_client.unsubscribe(SubscriptionArn=subscription["SubscriptionArn"])1195 with pytest.raises(ClientError) as e:1196 sns_client.get_subscription_attributes(SubscriptionArn=subscription["SubscriptionArn"])1197 assert e.value.response["Error"]["Code"] == "NotFound"1198 assert e.value.response["ResponseMetadata"]["HTTPStatusCode"] == 4041199 def test_message_to_fifo_sqs(1200 self,1201 sns_client,1202 sqs_client,1203 sns_create_topic,1204 sqs_create_queue,1205 sqs_queue_arn,1206 sns_subscription,1207 ):1208 topic_name = f"topic-{short_uid()}.fifo"1209 queue_name = f"queue-{short_uid()}.fifo"1210 topic_arn = sns_create_topic(Name=topic_name, Attributes={"FifoTopic": "true"})["TopicArn"]1211 queue_url = sqs_create_queue(1212 QueueName=queue_name,1213 Attributes={"FifoQueue": "true"},1214 )1215 queue_arn = sqs_queue_arn(queue_url)1216 sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)1217 message = "Test"1218 sns_client.publish(TopicArn=topic_arn, Message=message, MessageGroupId=short_uid())1219 def get_message():1220 received = sqs_client.receive_message(QueueUrl=queue_url, VisibilityTimeout=0)[1221 "Messages"1222 ][0]["Body"]1223 assert json.loads(received)["Message"] == message1224 retry(get_message, retries=10, sleep_before=0.15, sleep=1)1225 def test_validations_for_fifo(1226 self,1227 sns_client,1228 sqs_client,1229 sns_create_topic,1230 sqs_create_queue,1231 sqs_queue_arn,1232 sns_subscription,1233 ):1234 topic_name = f"topic-{short_uid()}"1235 fifo_topic_name = f"topic-{short_uid()}.fifo"1236 fifo_queue_name = f"queue-{short_uid()}.fifo"1237 topic_arn = sns_create_topic(Name=topic_name)["TopicArn"]1238 fifo_topic_arn = sns_create_topic(Name=fifo_topic_name, Attributes={"FifoTopic": "true"})[1239 "TopicArn"1240 ]1241 fifo_queue_url = sqs_create_queue(1242 QueueName=fifo_queue_name, Attributes={"FifoQueue": "true"}1243 )1244 fifo_queue_arn = sqs_queue_arn(fifo_queue_url)1245 with pytest.raises(ClientError) as e:1246 sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=fifo_queue_arn)1247 assert e.match("standard SNS topic")1248 with pytest.raises(ClientError) as e:1249 sns_client.publish(TopicArn=fifo_topic_arn, Message="test")1250 assert e.match("MessageGroupId")1251 def test_empty_sns_message(1252 self, sns_client, sqs_client, sns_topic, sqs_queue, sqs_queue_arn, sns_subscription1253 ):1254 topic_arn = sns_topic["Attributes"]["TopicArn"]1255 queue_arn = sqs_queue_arn(sqs_queue)1256 sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)1257 with pytest.raises(ClientError) as e:1258 sns_client.publish(Message="", TopicArn=topic_arn)1259 assert e.match("Empty message")1260 assert (1261 sqs_client.get_queue_attributes(1262 QueueUrl=sqs_queue, AttributeNames=["ApproximateNumberOfMessages"]1263 )["Attributes"]["ApproximateNumberOfMessages"]1264 == "0"...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!