Best Python code snippet using localstack_python
test_events.py
Source:test_events.py
...62 },63 },64]65class TestEvents:66 def assert_valid_event(self, event):67 expected_fields = (68 "version",69 "id",70 "detail-type",71 "source",72 "account",73 "time",74 "region",75 "resources",76 "detail",77 )78 for field in expected_fields:79 assert field in event80 def test_put_rule(self, events_client):81 rule_name = f"rule-{short_uid()}"82 events_client.put_rule(Name=rule_name, EventPattern=json.dumps(TEST_EVENT_PATTERN))83 rules = events_client.list_rules(NamePrefix=rule_name)["Rules"]84 assert len(rules) == 185 assert json.loads(rules[0]["EventPattern"]) == TEST_EVENT_PATTERN86 # clean up87 self.cleanup(rule_name=rule_name)88 def test_events_written_to_disk_are_timestamp_prefixed_for_chronological_ordering(89 self, events_client90 ):91 event_type = str(uuid.uuid4())92 event_details_to_publish = list(map(lambda n: f"event {n}", range(10)))93 for detail in event_details_to_publish:94 events_client.put_events(95 Entries=[96 {97 "Source": "unittest",98 "Resources": [],99 "DetailType": event_type,100 "Detail": json.dumps(detail),101 }102 ]103 )104 events_tmp_dir = _get_events_tmp_dir()105 sorted_events_written_to_disk = map(106 lambda filename: json.loads(str(load_file(os.path.join(events_tmp_dir, filename)))),107 sorted(os.listdir(events_tmp_dir)),108 )109 sorted_events = list(110 filter(111 lambda event: event.get("DetailType") == event_type,112 sorted_events_written_to_disk,113 )114 )115 assert (116 list(map(lambda event: json.loads(event["Detail"]), sorted_events))117 == event_details_to_publish118 )119 def test_list_tags_for_resource(self, events_client):120 rule_name = "rule-{}".format(short_uid())121 rule = events_client.put_rule(Name=rule_name, EventPattern=json.dumps(TEST_EVENT_PATTERN))122 rule_arn = rule["RuleArn"]123 expected = [124 {"Key": "key1", "Value": "value1"},125 {"Key": "key2", "Value": "value2"},126 ]127 # insert two tags, verify both are visible128 events_client.tag_resource(ResourceARN=rule_arn, Tags=expected)129 actual = 
events_client.list_tags_for_resource(ResourceARN=rule_arn)["Tags"]130 assert actual == expected131 # remove 'key2', verify only 'key1' remains132 expected = [{"Key": "key1", "Value": "value1"}]133 events_client.untag_resource(ResourceARN=rule_arn, TagKeys=["key2"])134 actual = events_client.list_tags_for_resource(ResourceARN=rule_arn)["Tags"]135 assert actual == expected136 # clean up137 self.cleanup(rule_name=rule_name)138 @pytest.mark.aws_validated139 def test_put_events_with_target_sqs(self, events_client, sqs_client):140 entries = [141 {142 "Source": TEST_EVENT_PATTERN["source"][0],143 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],144 "Detail": json.dumps(EVENT_DETAIL),145 }146 ]147 self._put_events_with_filter_to_sqs(148 events_client, sqs_client, pattern=TEST_EVENT_PATTERN, entries_asserts=[(entries, True)]149 )150 @pytest.mark.aws_validated151 def test_put_events_with_nested_event_pattern(self, events_client, sqs_client):152 pattern = {"detail": {"event": {"data": {"type": ["1"]}}}}153 entries1 = [154 {155 "Source": "test",156 "DetailType": "test",157 "Detail": json.dumps({"event": {"data": {"type": "1"}}}),158 }159 ]160 entries2 = [161 {162 "Source": "test",163 "DetailType": "test",164 "Detail": json.dumps({"event": {"data": {"type": "2"}}}),165 }166 ]167 entries3 = [168 {169 "Source": "test",170 "DetailType": "test",171 "Detail": json.dumps({"hello": "world"}),172 }173 ]174 entries_asserts = [(entries1, True), (entries2, False), (entries3, False)]175 self._put_events_with_filter_to_sqs(176 events_client,177 sqs_client,178 pattern=pattern,179 entries_asserts=entries_asserts,180 input_path="$.detail",181 )182 def test_put_events_with_target_sqs_event_detail_match(self, events_client, sqs_client):183 entries1 = [184 {185 "Source": TEST_EVENT_PATTERN["source"][0],186 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],187 "Detail": json.dumps({"EventType": "1"}),188 }189 ]190 entries2 = [191 {192 "Source": TEST_EVENT_PATTERN["source"][0],193 "DetailType": 
TEST_EVENT_PATTERN["detail-type"][0],194 "Detail": json.dumps({"EventType": "2"}),195 }196 ]197 entries_asserts = [(entries1, True), (entries2, False)]198 self._put_events_with_filter_to_sqs(199 events_client,200 sqs_client,201 pattern={"detail": {"EventType": ["0", "1"]}},202 entries_asserts=entries_asserts,203 input_path="$.detail",204 )205 def _put_events_with_filter_to_sqs(206 self,207 events_client,208 sqs_client,209 pattern: Dict,210 entries_asserts: List[Tuple[List[Dict], bool]],211 input_path: str = None,212 ):213 queue_name = f"queue-{short_uid()}"214 rule_name = f"rule-{short_uid()}"215 target_id = f"target-{short_uid()}"216 bus_name = f"bus-{short_uid()}"217 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]218 queue_arn = self._get_queue_arn(queue_url, sqs_client)219 policy = {220 "Version": "2012-10-17",221 "Id": f"sqs-eventbridge-{short_uid()}",222 "Statement": [223 {224 "Sid": f"SendMessage-{short_uid()}",225 "Effect": "Allow",226 "Principal": {"Service": "events.amazonaws.com"},227 "Action": "sqs:SendMessage",228 "Resource": queue_arn,229 }230 ],231 }232 sqs_client.set_queue_attributes(233 QueueUrl=queue_url, Attributes={"Policy": json.dumps(policy)}234 )235 events_client.create_event_bus(Name=bus_name)236 events_client.put_rule(237 Name=rule_name,238 EventBusName=bus_name,239 EventPattern=json.dumps(pattern),240 )241 kwargs = {"InputPath": input_path} if input_path else {}242 rs = events_client.put_targets(243 Rule=rule_name,244 EventBusName=bus_name,245 Targets=[{"Id": target_id, "Arn": queue_arn, **kwargs}],246 )247 assert rs["FailedEntryCount"] == 0248 assert rs["FailedEntries"] == []249 try:250 for entry_asserts in entries_asserts:251 entries = entry_asserts[0]252 for entry in entries:253 entry.setdefault("EventBusName", bus_name)254 self._put_entries_assert_results_sqs(255 events_client,256 sqs_client,257 queue_url,258 entries=entries,259 should_match=entry_asserts[1],260 )261 finally:262 self.cleanup(263 bus_name,264 
rule_name,265 target_id,266 queue_url=queue_url,267 events_client=events_client,268 sqs_client=sqs_client,269 )270 def _put_entries_assert_results_sqs(271 self, events_client, sqs_client, queue_url: str, entries: List[Dict], should_match: bool272 ):273 response = events_client.put_events(Entries=entries)274 assert not response.get("FailedEntryCount")275 def get_message(queue_url):276 resp = sqs_client.receive_message(QueueUrl=queue_url)277 messages = resp.get("Messages")278 if should_match:279 assert len(messages) == 1280 return messages281 messages = retry(get_message, retries=5, sleep=1, queue_url=queue_url)282 if should_match:283 actual_event = json.loads(messages[0]["Body"])284 if "detail" in actual_event:285 self.assert_valid_event(actual_event)286 else:287 assert not messages288 return messages289 # TODO: further unify/parameterize the tests for the different target types below290 def test_put_events_with_target_sns(291 self, events_client, sns_client, sqs_client, sns_subscription292 ):293 queue_name = "test-%s" % short_uid()294 rule_name = "rule-{}".format(short_uid())295 target_id = "target-{}".format(short_uid())296 bus_name = "bus-{}".format(short_uid())297 topic_name = "topic-{}".format(short_uid())298 topic_arn = sns_client.create_topic(Name=topic_name)["TopicArn"]299 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]300 queue_arn = aws_stack.sqs_queue_arn(queue_name)301 sns_subscription(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)302 events_client.create_event_bus(Name=bus_name)303 events_client.put_rule(304 Name=rule_name,305 EventBusName=bus_name,306 EventPattern=json.dumps(TEST_EVENT_PATTERN),307 )308 rs = events_client.put_targets(309 Rule=rule_name,310 EventBusName=bus_name,311 Targets=[{"Id": target_id, "Arn": topic_arn}],312 )313 assert "FailedEntryCount" in rs314 assert "FailedEntries" in rs315 assert rs["FailedEntryCount"] == 0316 assert rs["FailedEntries"] == []317 events_client.put_events(318 Entries=[319 {320 
"EventBusName": bus_name,321 "Source": TEST_EVENT_PATTERN["source"][0],322 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],323 "Detail": json.dumps(EVENT_DETAIL),324 }325 ]326 )327 def get_message(queue_url):328 resp = sqs_client.receive_message(QueueUrl=queue_url)329 return resp["Messages"]330 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)331 assert len(messages) == 1332 actual_event = json.loads(messages[0]["Body"]).get("Message")333 self.assert_valid_event(actual_event)334 assert json.loads(actual_event).get("detail") == EVENT_DETAIL335 # clean up336 sns_client.delete_topic(TopicArn=topic_arn)337 self.cleanup(bus_name, rule_name, target_id, queue_url=queue_url)338 def test_put_events_into_event_bus(self, events_client, sqs_client):339 queue_name = "queue-{}".format(short_uid())340 rule_name = "rule-{}".format(short_uid())341 target_id = "target-{}".format(short_uid())342 bus_name_1 = "bus1-{}".format(short_uid())343 bus_name_2 = "bus2-{}".format(short_uid())344 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]345 queue_arn = self._get_queue_arn(queue_url, sqs_client)346 events_client.create_event_bus(Name=bus_name_1)347 resp = events_client.create_event_bus(Name=bus_name_2)348 events_client.put_rule(349 Name=rule_name,350 EventBusName=bus_name_1,351 EventPattern=json.dumps(TEST_EVENT_PATTERN),352 )353 events_client.put_targets(354 Rule=rule_name,355 EventBusName=bus_name_1,356 Targets=[{"Id": target_id, "Arn": resp.get("EventBusArn")}],357 )358 events_client.put_targets(359 Rule=rule_name,360 EventBusName=bus_name_2,361 Targets=[{"Id": target_id, "Arn": queue_arn}],362 )363 events_client.put_events(364 Entries=[365 {366 "EventBusName": bus_name_1,367 "Source": TEST_EVENT_PATTERN["source"][0],368 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],369 "Detail": json.dumps(EVENT_DETAIL),370 }371 ]372 )373 def get_message(queue_url):374 resp = sqs_client.receive_message(QueueUrl=queue_url)375 return resp["Messages"]376 
messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)377 assert len(messages) == 1378 actual_event = json.loads(messages[0]["Body"])379 self.assert_valid_event(actual_event)380 assert actual_event["detail"] == EVENT_DETAIL381 # clean up382 self.cleanup(bus_name_1, rule_name, target_id)383 self.cleanup(bus_name_2)384 sqs_client.delete_queue(QueueUrl=queue_url)385 def test_put_events_with_target_lambda(self, events_client):386 rule_name = "rule-{}".format(short_uid())387 function_name = "lambda-func-{}".format(short_uid())388 target_id = "target-{}".format(short_uid())389 bus_name = "bus-{}".format(short_uid())390 rs = testutil.create_lambda_function(391 handler_file=TEST_LAMBDA_PYTHON_ECHO,392 func_name=function_name,393 runtime=LAMBDA_RUNTIME_PYTHON36,394 )395 func_arn = rs["CreateFunctionResponse"]["FunctionArn"]396 events_client.create_event_bus(Name=bus_name)397 events_client.put_rule(398 Name=rule_name,399 EventBusName=bus_name,400 EventPattern=json.dumps(TEST_EVENT_PATTERN),401 )402 rs = events_client.put_targets(403 Rule=rule_name,404 EventBusName=bus_name,405 Targets=[{"Id": target_id, "Arn": func_arn}],406 )407 assert "FailedEntryCount" in rs408 assert "FailedEntries" in rs409 assert rs["FailedEntryCount"] == 0410 assert rs["FailedEntries"] == []411 events_client.put_events(412 Entries=[413 {414 "EventBusName": bus_name,415 "Source": TEST_EVENT_PATTERN["source"][0],416 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],417 "Detail": json.dumps(EVENT_DETAIL),418 }419 ]420 )421 # Get lambda's log events422 events = retry(423 check_expected_lambda_log_events_length,424 retries=3,425 sleep=1,426 function_name=function_name,427 expected_length=1,428 )429 actual_event = events[0]430 self.assert_valid_event(actual_event)431 assert actual_event["detail"] == EVENT_DETAIL432 # clean up433 testutil.delete_lambda_function(function_name)434 self.cleanup(bus_name, rule_name, target_id)435 def test_rule_disable(self, events_client):436 rule_name = 
"rule-{}".format(short_uid())437 events_client.put_rule(Name=rule_name, ScheduleExpression="rate(1 minutes)")438 response = events_client.list_rules()439 assert response["Rules"][0]["State"] == "ENABLED"440 events_client.disable_rule(Name=rule_name)441 response = events_client.list_rules(NamePrefix=rule_name)442 assert response["Rules"][0]["State"] == "DISABLED"443 # clean up444 self.cleanup(rule_name=rule_name)445 def test_scheduled_expression_events(446 self, stepfunctions_client, sns_client, sqs_client, events_client, sns_subscription447 ):448 class HttpEndpointListener(ProxyListener):449 def forward_request(self, method, path, data, headers):450 event = json.loads(to_str(data))451 events.append(event)452 return 200453 local_port = get_free_tcp_port()454 proxy = start_proxy(local_port, update_listener=HttpEndpointListener())455 wait_for_port_open(local_port)456 topic_name = "topic-{}".format(short_uid())457 queue_name = "queue-{}".format(short_uid())458 fifo_queue_name = "queue-{}.fifo".format(short_uid())459 rule_name = "rule-{}".format(short_uid())460 endpoint = "{}://{}:{}".format(461 get_service_protocol(), config.LOCALSTACK_HOSTNAME, local_port462 )463 sm_role_arn = aws_stack.role_arn("sfn_role")464 sm_name = "state-machine-{}".format(short_uid())465 topic_target_id = "target-{}".format(short_uid())466 sm_target_id = "target-{}".format(short_uid())467 queue_target_id = "target-{}".format(short_uid())468 fifo_queue_target_id = "target-{}".format(short_uid())469 events = []470 state_machine_definition = """471 {472 "StartAt": "Hello",473 "States": {474 "Hello": {475 "Type": "Pass",476 "Result": "World",477 "End": true478 }479 }480 }481 """482 state_machine_arn = stepfunctions_client.create_state_machine(483 name=sm_name, definition=state_machine_definition, roleArn=sm_role_arn484 )["stateMachineArn"]485 topic_arn = sns_client.create_topic(Name=topic_name)["TopicArn"]486 sns_subscription(TopicArn=topic_arn, Protocol="http", Endpoint=endpoint)487 queue_url = 
sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]488 fifo_queue_url = sqs_client.create_queue(489 QueueName=fifo_queue_name,490 Attributes={"FifoQueue": "true", "ContentBasedDeduplication": "true"},491 )["QueueUrl"]492 queue_arn = aws_stack.sqs_queue_arn(queue_name)493 fifo_queue_arn = aws_stack.sqs_queue_arn(fifo_queue_name)494 event = {"env": "testing"}495 events_client.put_rule(Name=rule_name, ScheduleExpression="rate(1 minutes)")496 events_client.put_targets(497 Rule=rule_name,498 Targets=[499 {"Id": topic_target_id, "Arn": topic_arn, "Input": json.dumps(event)},500 {501 "Id": sm_target_id,502 "Arn": state_machine_arn,503 "Input": json.dumps(event),504 },505 {"Id": queue_target_id, "Arn": queue_arn, "Input": json.dumps(event)},506 {507 "Id": fifo_queue_target_id,508 "Arn": fifo_queue_arn,509 "Input": json.dumps(event),510 "SqsParameters": {"MessageGroupId": "123"},511 },512 ],513 )514 def received(q_urls):515 # state machine got executed516 executions = stepfunctions_client.list_executions(stateMachineArn=state_machine_arn)[517 "executions"518 ]519 assert len(executions) >= 1520 # http endpoint got events521 assert len(events) >= 2522 notifications = [523 event["Message"] for event in events if event["Type"] == "Notification"524 ]525 assert len(notifications) >= 1526 # get state machine execution detail527 execution_arn = executions[0]["executionArn"]528 execution_input = stepfunctions_client.describe_execution(executionArn=execution_arn)[529 "input"530 ]531 all_msgs = []532 # get message from queue533 for url in q_urls:534 msgs = sqs_client.receive_message(QueueUrl=url).get("Messages", [])535 assert len(msgs) >= 1536 all_msgs.append(msgs[0])537 return execution_input, notifications[0], all_msgs538 execution_input, notification, msgs_received = retry(539 received, retries=5, sleep=15, q_urls=[queue_url, fifo_queue_url]540 )541 assert json.loads(notification) == event542 assert json.loads(execution_input) == event543 for msg_received in msgs_received:544 
assert json.loads(msg_received["Body"]) == event545 # clean up546 proxy.stop()547 target_ids = [topic_target_id, sm_target_id, queue_target_id, fifo_queue_target_id]548 self.cleanup(None, rule_name, target_ids=target_ids, queue_url=queue_url)549 sns_client.delete_topic(TopicArn=topic_arn)550 stepfunctions_client.delete_state_machine(stateMachineArn=state_machine_arn)551 @pytest.mark.parametrize("auth", API_DESTINATION_AUTHS)552 def test_api_destinations(self, events_client, auth):553 token = short_uid()554 bearer = f"Bearer {token}"555 class HttpEndpointListener(ProxyListener):556 def forward_request(self, method, path, data, headers):557 event = json.loads(to_str(data))558 data_received.update(event)559 request_split = extract_query_string_params(path)560 paths_list.append(request_split[0])561 query_params_received.update(request_split[1])562 headers_received.update(headers)563 if "client_id" in event:564 oauth_data.update(565 {566 "client_id": event.get("client_id"),567 "client_secret": event.get("client_secret"),568 "header_value": headers.get("oauthheader"),569 "body_value": event.get("oauthbody"),570 "path": path,571 }572 )573 return requests_response(574 {575 "access_token": token,576 "token_type": "Bearer",577 "expires_in": 86400,578 }579 )580 data_received = {}581 query_params_received = {}582 paths_list = []583 headers_received = {}584 oauth_data = {}585 local_port = get_free_tcp_port()586 proxy = start_proxy(local_port, update_listener=HttpEndpointListener())587 wait_for_port_open(local_port)588 url = f"http://localhost:{local_port}"589 if auth.get("type") == "OAUTH_CLIENT_CREDENTIALS":590 auth["parameters"]["AuthorizationEndpoint"] = url591 connection_name = f"c-{short_uid()}"592 connection_arn = events_client.create_connection(593 Name=connection_name,594 AuthorizationType=auth.get("type"),595 AuthParameters={596 auth.get("key"): auth.get("parameters"),597 "InvocationHttpParameters": {598 "BodyParameters": [599 {600 "Key": "connection_body_param",601 
"Value": "value",602 "IsValueSecret": False,603 },604 ],605 "HeaderParameters": [606 {607 "Key": "connection_header_param",608 "Value": "value",609 "IsValueSecret": False,610 },611 {612 "Key": "overwritten_header",613 "Value": "original",614 "IsValueSecret": False,615 },616 ],617 "QueryStringParameters": [618 {619 "Key": "connection_query_param",620 "Value": "value",621 "IsValueSecret": False,622 },623 {624 "Key": "overwritten_query",625 "Value": "original",626 "IsValueSecret": False,627 },628 ],629 },630 },631 )["ConnectionArn"]632 # create api destination633 dest_name = f"d-{short_uid()}"634 result = events_client.create_api_destination(635 Name=dest_name,636 ConnectionArn=connection_arn,637 InvocationEndpoint=url,638 HttpMethod="POST",639 )640 # create rule and target641 rule_name = f"r-{short_uid()}"642 target_id = f"target-{short_uid}"643 pattern = json.dumps({"source": ["source-123"], "detail-type": ["type-123"]})644 events_client.put_rule(Name=rule_name, EventPattern=pattern)645 events_client.put_targets(646 Rule=rule_name,647 Targets=[648 {649 "Id": target_id,650 "Arn": result["ApiDestinationArn"],651 "Input": '{"target_value":"value"}',652 "HttpParameters": {653 "PathParameterValues": ["target_path"],654 "HeaderParameters": {655 "target_header": "target_header_value",656 "overwritten_header": "changed",657 },658 "QueryStringParameters": {659 "target_query": "t_query",660 "overwritten_query": "changed",661 },662 },663 }664 ],665 )666 entries = [667 {668 "Source": "source-123",669 "DetailType": "type-123",670 "Detail": '{"i": 0}',671 }672 ]673 events_client.put_events(Entries=entries)674 # clean up675 events_client.delete_connection(Name=connection_name)676 events_client.delete_api_destination(Name=dest_name)677 self.cleanup(rule_name=rule_name, target_ids=target_id)678 # assert that all events have been received in the HTTP server listener679 user_pass = to_str(base64.b64encode(b"user:pass"))680 def check():681 # Connection data validation682 assert 
data_received.get("connection_body_param") == "value"683 assert headers_received.get("Connection_Header_Param") == "value"684 assert query_params_received.get("connection_query_param") == "value"685 # Target parameters validation686 assert "/target_path" in paths_list687 assert data_received.get("target_value") == "value"688 assert headers_received.get("Target_Header") == "target_header_value"689 assert query_params_received.get("target_query") == "t_query"690 # connection/target overwrite test691 assert headers_received.get("Overwritten_Header") == "original"692 assert query_params_received.get("overwritten_query") == "original"693 # Auth validation694 if auth.get("type") == "BASIC":695 assert headers_received.get("Authorization") == f"Basic {user_pass}"696 if auth.get("type") == "API_KEY":697 assert headers_received.get("Api") == "apikey_secret"698 if auth.get("type") == "OAUTH_CLIENT_CREDENTIALS":699 assert headers_received.get("Authorization") == bearer700 # Oauth login validation701 assert oauth_data.get("client_id") == "id"702 assert oauth_data.get("client_secret") == "password"703 assert oauth_data.get("header_value") == "value2"704 assert oauth_data.get("body_value") == "value1"705 assert "oauthquery=value3" in oauth_data.get("path")706 retry(check, sleep=0.5, retries=5)707 # clean up708 proxy.stop()709 def test_create_connection_validations(self, events_client):710 connection_name = "This should fail with two errors 123467890123412341234123412341234"711 with pytest.raises(ClientError) as ctx:712 events_client.create_connection(713 Name=connection_name,714 AuthorizationType="INVALID",715 AuthParameters={"BasicAuthParameters": {"Username": "user", "Password": "pass"}},716 ),717 assert ctx.value.response["ResponseMetadata"]["HTTPStatusCode"] == 400718 assert ctx.value.response["Error"]["Code"] == "ValidationException"719 message = ctx.value.response["Error"]["Message"]720 assert "3 validation errors" in message721 assert "must satisfy regular expression 
pattern" in message722 assert "must have length less than or equal to 64" in message723 assert "must satisfy enum value set: [BASIC, OAUTH_CLIENT_CREDENTIALS, API_KEY]" in message724 def test_put_events_with_target_firehose(self, events_client, s3_client, firehose_client):725 s3_bucket = "s3-{}".format(short_uid())726 s3_prefix = "testeventdata"727 stream_name = "firehose-{}".format(short_uid())728 rule_name = "rule-{}".format(short_uid())729 target_id = "target-{}".format(short_uid())730 bus_name = "bus-{}".format(short_uid())731 # create firehose target bucket732 aws_stack.get_or_create_bucket(s3_bucket)733 # create firehose delivery stream to s3734 stream = firehose_client.create_delivery_stream(735 DeliveryStreamName=stream_name,736 S3DestinationConfiguration={737 "RoleARN": aws_stack.iam_resource_arn("firehose"),738 "BucketARN": aws_stack.s3_bucket_arn(s3_bucket),739 "Prefix": s3_prefix,740 },741 )742 stream_arn = stream["DeliveryStreamARN"]743 events_client.create_event_bus(Name=bus_name)744 events_client.put_rule(745 Name=rule_name,746 EventBusName=bus_name,747 EventPattern=json.dumps(TEST_EVENT_PATTERN),748 )749 rs = events_client.put_targets(750 Rule=rule_name,751 EventBusName=bus_name,752 Targets=[{"Id": target_id, "Arn": stream_arn}],753 )754 assert "FailedEntryCount" in rs755 assert "FailedEntries" in rs756 assert rs["FailedEntryCount"] == 0757 assert rs["FailedEntries"] == []758 events_client.put_events(759 Entries=[760 {761 "EventBusName": bus_name,762 "Source": TEST_EVENT_PATTERN["source"][0],763 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],764 "Detail": json.dumps(EVENT_DETAIL),765 }766 ]767 )768 # run tests769 bucket_contents = s3_client.list_objects(Bucket=s3_bucket)["Contents"]770 assert len(bucket_contents) == 1771 key = bucket_contents[0]["Key"]772 s3_object = s3_client.get_object(Bucket=s3_bucket, Key=key)773 actual_event = json.loads(s3_object["Body"].read().decode())774 self.assert_valid_event(actual_event)775 assert 
actual_event["detail"] == EVENT_DETAIL776 # clean up777 firehose_client.delete_delivery_stream(DeliveryStreamName=stream_name)778 # empty and delete bucket779 s3_client.delete_object(Bucket=s3_bucket, Key=key)780 s3_client.delete_bucket(Bucket=s3_bucket)781 self.cleanup(bus_name, rule_name, target_id)782 def test_put_events_with_target_sqs_new_region(self):783 events_client = aws_stack.create_external_boto_client("events", region_name="eu-west-1")784 queue_name = "queue-{}".format(short_uid())785 rule_name = "rule-{}".format(short_uid())786 target_id = "target-{}".format(short_uid())787 bus_name = "bus-{}".format(short_uid())788 sqs_client = aws_stack.create_external_boto_client("sqs", region_name="eu-west-1")789 sqs_client.create_queue(QueueName=queue_name)790 queue_arn = aws_stack.sqs_queue_arn(queue_name)791 events_client.create_event_bus(Name=bus_name)792 events_client.put_rule(793 Name=rule_name,794 EventBusName=bus_name,795 EventPattern=json.dumps(TEST_EVENT_PATTERN),796 )797 events_client.put_targets(798 Rule=rule_name,799 EventBusName=bus_name,800 Targets=[{"Id": target_id, "Arn": queue_arn}],801 )802 response = events_client.put_events(803 Entries=[804 {805 "Source": "com.mycompany.myapp",806 "Detail": '{ "key1": "value1", "key": "value2" }',807 "Resources": [],808 "DetailType": "myDetailType",809 }810 ]811 )812 assert "Entries" in response813 assert len(response.get("Entries")) == 1814 assert "EventId" in response.get("Entries")[0]815 def test_put_events_with_target_kinesis(self, events_client, kinesis_client):816 rule_name = "rule-{}".format(short_uid())817 target_id = "target-{}".format(short_uid())818 bus_name = "bus-{}".format(short_uid())819 stream_name = "stream-{}".format(short_uid())820 stream_arn = aws_stack.kinesis_stream_arn(stream_name)821 kinesis_client.create_stream(StreamName=stream_name, ShardCount=1)822 events_client.create_event_bus(Name=bus_name)823 events_client.put_rule(824 Name=rule_name,825 EventBusName=bus_name,826 
EventPattern=json.dumps(TEST_EVENT_PATTERN),827 )828 put_response = events_client.put_targets(829 Rule=rule_name,830 EventBusName=bus_name,831 Targets=[832 {833 "Id": target_id,834 "Arn": stream_arn,835 "KinesisParameters": {"PartitionKeyPath": "$.detail-type"},836 }837 ],838 )839 assert "FailedEntryCount" in put_response840 assert "FailedEntries" in put_response841 assert put_response["FailedEntryCount"] == 0842 assert put_response["FailedEntries"] == []843 def check_stream_status():844 _stream = kinesis_client.describe_stream(StreamName=stream_name)845 assert _stream["StreamDescription"]["StreamStatus"] == "ACTIVE"846 # wait until stream becomes available847 retry(check_stream_status, retries=7, sleep=0.8)848 events_client.put_events(849 Entries=[850 {851 "EventBusName": bus_name,852 "Source": TEST_EVENT_PATTERN["source"][0],853 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],854 "Detail": json.dumps(EVENT_DETAIL),855 }856 ]857 )858 stream = kinesis_client.describe_stream(StreamName=stream_name)859 shard_id = stream["StreamDescription"]["Shards"][0]["ShardId"]860 shard_iterator = kinesis_client.get_shard_iterator(861 StreamName=stream_name,862 ShardId=shard_id,863 ShardIteratorType="AT_TIMESTAMP",864 Timestamp=datetime(2020, 1, 1),865 )["ShardIterator"]866 record = kinesis_client.get_records(ShardIterator=shard_iterator)["Records"][0]867 partition_key = record["PartitionKey"]868 data = json.loads(record["Data"].decode())869 assert partition_key == TEST_EVENT_PATTERN["detail-type"][0]870 assert data["detail"] == EVENT_DETAIL871 self.assert_valid_event(data)872 def test_put_events_with_input_path(self, events_client, sqs_client):873 queue_name = f"queue-{short_uid()}"874 rule_name = f"rule-{short_uid()}"875 target_id = f"target-{short_uid()}"876 bus_name = f"bus-{short_uid()}"877 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]878 queue_arn = aws_stack.sqs_queue_arn(queue_name)879 events_client.create_event_bus(Name=bus_name)880 
events_client.put_rule(881 Name=rule_name,882 EventBusName=bus_name,883 EventPattern=json.dumps(TEST_EVENT_PATTERN),884 )885 events_client.put_targets(886 Rule=rule_name,887 EventBusName=bus_name,888 Targets=[{"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"}],889 )890 events_client.put_events(891 Entries=[892 {893 "EventBusName": bus_name,894 "Source": TEST_EVENT_PATTERN["source"][0],895 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],896 "Detail": json.dumps(EVENT_DETAIL),897 }898 ]899 )900 def get_message(queue_url):901 resp = sqs_client.receive_message(QueueUrl=queue_url)902 return resp.get("Messages")903 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)904 assert len(messages) == 1905 assert json.loads(messages[0].get("Body")) == EVENT_DETAIL906 events_client.put_events(907 Entries=[908 {909 "EventBusName": bus_name,910 "Source": "dummySource",911 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],912 "Detail": json.dumps(EVENT_DETAIL),913 }914 ]915 )916 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)917 assert messages is None918 # clean up919 self.cleanup(bus_name, rule_name, target_id, queue_url=queue_url)920 def test_put_events_with_input_path_multiple(self, events_client, sqs_client):921 queue_name = "queue-{}".format(short_uid())922 queue_name_1 = "queue-{}".format(short_uid())923 rule_name = "rule-{}".format(short_uid())924 target_id = "target-{}".format(short_uid())925 target_id_1 = "target-{}".format(short_uid())926 bus_name = "bus-{}".format(short_uid())927 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]928 queue_arn = aws_stack.sqs_queue_arn(queue_name)929 queue_url_1 = sqs_client.create_queue(QueueName=queue_name_1)["QueueUrl"]930 queue_arn_1 = aws_stack.sqs_queue_arn(queue_name_1)931 events_client.create_event_bus(Name=bus_name)932 events_client.put_rule(933 Name=rule_name,934 EventBusName=bus_name,935 EventPattern=json.dumps(TEST_EVENT_PATTERN),936 )937 
events_client.put_targets(938 Rule=rule_name,939 EventBusName=bus_name,940 Targets=[941 {"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"},942 {943 "Id": target_id_1,944 "Arn": queue_arn_1,945 },946 ],947 )948 events_client.put_events(949 Entries=[950 {951 "EventBusName": bus_name,952 "Source": TEST_EVENT_PATTERN["source"][0],953 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],954 "Detail": json.dumps(EVENT_DETAIL),955 }956 ]957 )958 def get_message(queue_url):959 resp = sqs_client.receive_message(QueueUrl=queue_url)960 return resp.get("Messages")961 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)962 assert len(messages) == 1963 assert json.loads(messages[0].get("Body")) == EVENT_DETAIL964 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url_1)965 assert len(messages) == 1966 assert json.loads(messages[0].get("Body")).get("detail") == EVENT_DETAIL967 events_client.put_events(968 Entries=[969 {970 "EventBusName": bus_name,971 "Source": "dummySource",972 "DetailType": TEST_EVENT_PATTERN["detail-type"][0],973 "Detail": json.dumps(EVENT_DETAIL),974 }975 ]976 )977 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)978 assert messages is None979 # clean up980 self.cleanup(bus_name, rule_name, [target_id, target_id_1], queue_url=queue_url)981 def test_put_event_without_source(self):982 events_client = aws_stack.create_external_boto_client("events", region_name="eu-west-1")983 response = events_client.put_events(Entries=[{"DetailType": "Test", "Detail": "{}"}])984 assert response.get("Entries")985 def test_put_event_without_detail(self):986 events_client = aws_stack.create_external_boto_client("events", region_name="eu-west-1")987 response = events_client.put_events(988 Entries=[989 {990 "DetailType": "Test",991 }992 ]993 )994 assert response.get("Entries")995 def test_trigger_event_on_ssm_change(self, events_client, sqs_client, ssm_client):996 rule_name = "rule-{}".format(short_uid())997 target_id = 
"target-{}".format(short_uid())998 # create queue999 queue_name = "queue-{}".format(short_uid())1000 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]1001 queue_arn = aws_stack.sqs_queue_arn(queue_name)1002 # put rule listening on SSM changes1003 ssm_prefix = "/test/local/"1004 events_client.put_rule(1005 Name=rule_name,1006 EventPattern=json.dumps(1007 {1008 "detail": {1009 "name": [{"prefix": ssm_prefix}],1010 "operation": ["Create", "Update", "Delete", "LabelParameterVersion"],1011 },1012 "detail-type": ["Parameter Store Change"],1013 "source": ["aws.ssm"],1014 }1015 ),1016 State="ENABLED",1017 Description="Trigger on SSM parameter changes",1018 )1019 # put target1020 events_client.put_targets(1021 Rule=rule_name,1022 EventBusName=TEST_EVENT_BUS_NAME,1023 Targets=[{"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"}],1024 )1025 # change SSM param to trigger event1026 ssm_client.put_parameter(Name=f"{ssm_prefix}/test123", Value="value1", Type="String")1027 def assert_message():1028 resp = sqs_client.receive_message(QueueUrl=queue_url)1029 result = resp.get("Messages")1030 body = json.loads(result[0]["Body"])1031 assert body == {"name": "/test/local/test123", "operation": "Create"}1032 # assert that message has been received1033 retry(assert_message, retries=7, sleep=0.3)1034 # clean up1035 self.cleanup(rule_name=rule_name, target_ids=target_id)1036 def test_put_event_with_content_base_rule_in_pattern(self, events_client, sqs_client):1037 queue_name = f"queue-{short_uid()}"1038 rule_name = f"rule-{short_uid()}"1039 target_id = f"target-{short_uid()}"1040 queue_url = sqs_client.create_queue(QueueName=queue_name)["QueueUrl"]1041 queue_arn = aws_stack.sqs_queue_arn(queue_name)1042 pattern = {1043 "Source": [{"exists": True}],1044 "detail-type": [{"prefix": "core.app"}],1045 "Detail": {1046 "decription": ["this-is-event-details"],1047 "amount": [200],1048 "salary": [2000, 4000],1049 "env": ["dev", "prod"],1050 "user": ["user1", "user2", 
"user3"],1051 "admins": ["skyli", {"prefix": "hey"}, {"prefix": "ad"}],1052 "test1": [{"anything-but": 200}],1053 "test2": [{"anything-but": "test2"}],1054 "test3": [{"anything-but": ["test3", "test33"]}],1055 "test4": [{"anything-but": {"prefix": "test4"}}],1056 "ip": [{"cidr": "10.102.1.0/24"}],1057 "num-test1": [{"numeric": ["<", 200]}],1058 "num-test2": [{"numeric": ["<=", 200]}],1059 "num-test3": [{"numeric": [">", 200]}],1060 "num-test4": [{"numeric": [">=", 200]}],1061 "num-test5": [{"numeric": [">=", 200, "<=", 500]}],1062 "num-test6": [{"numeric": [">", 200, "<", 500]}],1063 "num-test7": [{"numeric": [">=", 200, "<", 500]}],1064 },1065 }1066 event = {1067 "EventBusName": TEST_EVENT_BUS_NAME,1068 "Source": "core.update-account-command",1069 "DetailType": "core.app.backend",1070 "Detail": json.dumps(1071 {1072 "decription": "this-is-event-details",1073 "amount": 200,1074 "salary": 2000,1075 "env": "prod",1076 "user": "user3",1077 "admins": "admin",1078 "test1": 300,1079 "test2": "test22",1080 "test3": "test333",1081 "test4": "this test4",1082 "ip": "10.102.1.100",1083 "num-test1": 100,1084 "num-test2": 200,1085 "num-test3": 300,1086 "num-test4": 200,1087 "num-test5": 500,1088 "num-test6": 300,1089 "num-test7": 300,1090 }1091 ),1092 }1093 events_client.create_event_bus(Name=TEST_EVENT_BUS_NAME)1094 events_client.put_rule(1095 Name=rule_name,1096 EventBusName=TEST_EVENT_BUS_NAME,1097 EventPattern=json.dumps(pattern),1098 )1099 events_client.put_targets(1100 Rule=rule_name,1101 EventBusName=TEST_EVENT_BUS_NAME,1102 Targets=[{"Id": target_id, "Arn": queue_arn, "InputPath": "$.detail"}],1103 )1104 events_client.put_events(Entries=[event])1105 def get_message(queue_url):1106 resp = sqs_client.receive_message(QueueUrl=queue_url)1107 return resp.get("Messages")1108 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)1109 assert len(messages) == 11110 assert json.loads(messages[0].get("Body")) == json.loads(event["Detail"])1111 event_details = 
json.loads(event["Detail"])1112 event_details["admins"] = "no"1113 event["Detail"] = json.dumps(event_details)1114 events_client.put_events(Entries=[event])1115 messages = retry(get_message, retries=3, sleep=1, queue_url=queue_url)1116 assert messages is None1117 # clean up1118 self.cleanup(TEST_EVENT_BUS_NAME, rule_name, target_id, queue_url=queue_url)1119 @pytest.mark.parametrize(1120 "schedule_expression", ["rate(1 minute)", "rate(1 day)", "rate(1 hour)"]1121 )1122 @pytest.mark.aws_validated1123 def test_create_rule_with_one_unit_in_singular_should_succeed(1124 self, events_client, schedule_expression1125 ):1126 rule_name = f"rule-{short_uid()}"1127 # rule should be creatable with given expression1128 try:1129 events_client.put_rule(Name=rule_name, ScheduleExpression=schedule_expression)1130 finally:1131 self.cleanup(rule_name=rule_name, events_client=events_client)1132 @pytest.mark.parametrize(1133 "schedule_expression", ["rate(1 minutes)", "rate(1 days)", "rate(1 hours)"]1134 )1135 @pytest.mark.aws_validated1136 @pytest.mark.xfail1137 def test_create_rule_with_one_unit_in_plural_should_fail(1138 self, events_client, schedule_expression1139 ):1140 rule_name = f"rule-{short_uid()}"1141 # rule should not be creatable with given expression1142 with pytest.raises(ClientError):1143 events_client.put_rule(Name=rule_name, ScheduleExpression=schedule_expression)1144 @pytest.mark.aws_validated1145 @pytest.mark.xfail1146 def test_verify_rule_event_content(self, events_client, logs_client):1147 log_group_name = f"/aws/events/testLogGroup-{short_uid()}"1148 rule_name = f"rule-{short_uid()}"1149 target_id = f"testRuleId-{short_uid()}"1150 logs_client.create_log_group(logGroupName=log_group_name)1151 log_groups = logs_client.describe_log_groups(logGroupNamePrefix=log_group_name)1152 assert len(log_groups["logGroups"]) == 11153 log_group = log_groups["logGroups"][0]1154 log_group_arn = log_group["arn"]1155 events_client.put_rule(Name=rule_name, ScheduleExpression="rate(1 
minute)")1156 events_client.put_targets(Rule=rule_name, Targets=[{"Id": target_id, "Arn": log_group_arn}])1157 def ensure_log_stream_exists():1158 streams = logs_client.describe_log_streams(logGroupName=log_group_name)1159 return len(streams["logStreams"]) == 11160 poll_condition(condition=ensure_log_stream_exists, timeout=65, interval=5)1161 log_streams = logs_client.describe_log_streams(logGroupName=log_group_name)1162 log_stream_name = log_streams["logStreams"][0]["logStreamName"]1163 log_content = logs_client.get_log_events(1164 logGroupName=log_group_name, logStreamName=log_stream_name1165 )1166 events = log_content["events"]1167 assert len(events) == 11168 event = events[0]1169 self.assert_valid_event(event["message"])1170 self.cleanup(1171 rule_name=rule_name,1172 target_ids=target_id,1173 events_client=events_client,1174 logs_client=logs_client,1175 log_group_name=log_group_name,1176 )1177 def _get_queue_arn(self, queue_url, sqs_client):1178 queue_attrs = sqs_client.get_queue_attributes(1179 QueueUrl=queue_url, AttributeNames=["QueueArn"]1180 )1181 return queue_attrs["Attributes"]["QueueArn"]1182 def cleanup(1183 self,...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!