Best Python code snippet using localstack_python
test_lambda_integration.py
Source:test_lambda_integration.py
...197 runtime=LAMBDA_RUNTIME_PYTHON37,198 role=role_arn,199 )200 dynamodb_create_table(table_name=table_name, partition_key=partition_key)201 _await_dynamodb_table_active(dynamodb_client, table_name)202 stream_arn = dynamodb_client.update_table(203 TableName=table_name,204 StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_IMAGE"},205 )["TableDescription"]["LatestStreamArn"]206 event_source_uuid = lambda_client.create_event_source_mapping(207 FunctionName=function_name,208 BatchSize=1,209 StartingPosition="LATEST",210 EventSourceArn=stream_arn,211 MaximumBatchingWindowInSeconds=1,212 MaximumRetryAttempts=1,213 )["UUID"]214 _await_event_source_mapping_enabled(lambda_client, event_source_uuid)215 dynamodb_client.put_item(TableName=table_name, Item=db_item)216 retry(check_logs, retries=50, sleep=2)217 finally:218 lambda_client.delete_event_source_mapping(UUID=event_source_uuid)219 def test_disabled_dynamodb_event_source_mapping(220 self,221 create_lambda_function,222 lambda_client,223 dynamodb_resource,224 dynamodb_client,225 dynamodb_create_table,226 logs_client,227 dynamodbstreams_client,228 lambda_su_role,229 ):230 def is_stream_enabled():231 return (232 dynamodbstreams_client.describe_stream(StreamArn=latest_stream_arn)[233 "StreamDescription"234 ]["StreamStatus"]235 == "ENABLED"236 )237 function_name = f"lambda_func-{short_uid()}"238 ddb_table = f"ddb_table-{short_uid()}"239 items = [240 {"id": short_uid(), "data": "data1"},241 {"id": short_uid(), "data": "data2"},242 ]243 try:244 create_lambda_function(245 func_name=function_name,246 handler_file=TEST_LAMBDA_PYTHON_ECHO,247 runtime=LAMBDA_RUNTIME_PYTHON36,248 role=lambda_su_role,249 )250 latest_stream_arn = dynamodb_create_table(251 table_name=ddb_table, partition_key="id", stream_view_type="NEW_IMAGE"252 )["TableDescription"]["LatestStreamArn"]253 rs = lambda_client.create_event_source_mapping(254 FunctionName=function_name,255 EventSourceArn=latest_stream_arn,256 StartingPosition="TRIM_HORIZON",257 MaximumBatchingWindowInSeconds=1,258 )259 uuid = rs["UUID"]260 _await_event_source_mapping_enabled(lambda_client, uuid)261 assert poll_condition(is_stream_enabled, timeout=30)262 table = dynamodb_resource.Table(ddb_table)263 table.put_item(Item=items[0])264 # Lambda should be invoked 1 time265 retry(266 check_expected_lambda_log_events_length,267 retries=10,268 sleep=3,269 function_name=function_name,270 expected_length=1,271 logs_client=logs_client,272 )273 # disable event source mapping274 lambda_client.update_event_source_mapping(UUID=uuid, Enabled=False)275 time.sleep(2)276 table.put_item(Item=items[1])277 # lambda no longer invoked, still have 1 event278 check_expected_lambda_log_events_length(279 expected_length=1, function_name=function_name, logs_client=logs_client280 )281 finally:282 lambda_client.delete_event_source_mapping(UUID=uuid)283 # TODO invalid test against AWS, this behavior just is not correct284 def test_deletion_event_source_mapping_with_dynamodb(285 self, create_lambda_function, lambda_client, dynamodb_client, lambda_su_role286 ):287 function_name = f"lambda_func-{short_uid()}"288 ddb_table = f"ddb_table-{short_uid()}"289 create_lambda_function(290 func_name=function_name,291 handler_file=TEST_LAMBDA_PYTHON_ECHO,292 runtime=LAMBDA_RUNTIME_PYTHON36,293 role=lambda_su_role,294 )295 latest_stream_arn = aws_stack.create_dynamodb_table(296 table_name=ddb_table,297 partition_key="id",298 client=dynamodb_client,299 stream_view_type="NEW_IMAGE",300 )["TableDescription"]["LatestStreamArn"]301 lambda_client.create_event_source_mapping(302 FunctionName=function_name,303 EventSourceArn=latest_stream_arn,304 StartingPosition="TRIM_HORIZON",305 )306 _await_dynamodb_table_active(dynamodb_client, ddb_table)307 dynamodb_client.delete_table(TableName=ddb_table)308 result = lambda_client.list_event_source_mappings(EventSourceArn=latest_stream_arn)309 assert 1 == len(result["EventSourceMappings"])310 def test_dynamodb_event_source_mapping_with_on_failure_destination_config(311 self,312 lambda_client,313 create_lambda_function,314 sqs_client,315 sqs_queue_arn,316 sqs_create_queue,317 create_iam_role_with_policy,318 dynamodb_client,319 dynamodb_create_table,320 ):321 function_name = f"lambda_func-{short_uid()}"322 role = f"test-lambda-role-{short_uid()}"323 policy_name = f"test-lambda-policy-{short_uid()}"324 table_name = f"test-table-{short_uid()}"325 partition_key = "my_partition_key"326 item = {partition_key: {"S": "hello world"}}327 try:328 role_arn = create_iam_role_with_policy(329 RoleName=role,330 PolicyName=policy_name,331 RoleDefinition=lambda_role,332 PolicyDefinition=s3_lambda_permission,333 )334 create_lambda_function(335 handler_file=TEST_LAMBDA_PYTHON_UNHANDLED_ERROR,336 func_name=function_name,337 runtime=LAMBDA_RUNTIME_PYTHON37,338 role=role_arn,339 )340 dynamodb_create_table(table_name=table_name, partition_key=partition_key)341 _await_dynamodb_table_active(dynamodb_client, table_name)342 result = dynamodb_client.update_table(343 TableName=table_name,344 StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_IMAGE"},345 )346 stream_arn = result["TableDescription"]["LatestStreamArn"]347 destination_queue = sqs_create_queue()348 queue_failure_event_source_mapping_arn = sqs_queue_arn(destination_queue)349 destination_config = {350 "OnFailure": {"Destination": queue_failure_event_source_mapping_arn}351 }352 result = lambda_client.create_event_source_mapping(353 FunctionName=function_name,354 BatchSize=1,355 StartingPosition="LATEST",356 EventSourceArn=stream_arn,357 MaximumBatchingWindowInSeconds=1,358 MaximumRetryAttempts=1,359 DestinationConfig=destination_config,360 )361 event_source_mapping_uuid = result["UUID"]362 _await_event_source_mapping_enabled(lambda_client, event_source_mapping_uuid)363 dynamodb_client.put_item(TableName=table_name, Item=item)364 def verify_failure_received():365 res = sqs_client.receive_message(QueueUrl=destination_queue)366 msg = res["Messages"][0]367 body = json.loads(msg["Body"])368 assert body["requestContext"]["condition"] == "RetryAttemptsExhausted"369 assert body["DDBStreamBatchInfo"]["batchSize"] == 1370 assert body["DDBStreamBatchInfo"]["streamArn"] in stream_arn371 retry(verify_failure_received, retries=5, sleep=5, sleep_before=5)372 finally:373 lambda_client.delete_event_source_mapping(UUID=event_source_mapping_uuid)374class TestLambdaHttpInvocation:375 def test_http_invocation_with_apigw_proxy(self, create_lambda_function):376 lambda_name = f"test_lambda_{short_uid()}"377 lambda_resource = "/api/v1/{proxy+}"378 lambda_path = "/api/v1/hello/world"379 lambda_request_context_path = "/" + TEST_STAGE_NAME + lambda_path380 lambda_request_context_resource_path = lambda_resource381 create_lambda_function(382 func_name=lambda_name,383 handler_file=TEST_LAMBDA_PYTHON,384 libs=TEST_LAMBDA_LIBS,385 )386 # create API Gateway and connect it to the Lambda proxy backend387 lambda_uri = aws_stack.lambda_function_arn(lambda_name)388 target_uri = f"arn:aws:apigateway:{aws_stack.get_region()}:lambda:path/2015-03-31/functions/{lambda_uri}/invocations"389 result = testutil.connect_api_gateway_to_http_with_lambda_proxy(390 "test_gateway2",391 target_uri,392 path=lambda_resource,393 stage_name=TEST_STAGE_NAME,394 )395 api_id = result["id"]396 url = path_based_url(api_id=api_id, stage_name=TEST_STAGE_NAME, path=lambda_path)397 result = safe_requests.post(398 url, data=b"{}", headers={"User-Agent": "python-requests/testing"}399 )400 content = json.loads(result.content)401 assert lambda_path == content["path"]402 assert lambda_resource == content["resource"]403 assert lambda_request_context_path == content["requestContext"]["path"]404 assert lambda_request_context_resource_path == content["requestContext"]["resourcePath"]405class TestKinesisSource:406 def test_create_kinesis_event_source_mapping(407 self,408 create_lambda_function,409 lambda_client,410 kinesis_client,411 kinesis_create_stream,412 lambda_su_role,413 wait_for_stream_ready,414 logs_client,415 ):416 function_name = f"lambda_func-{short_uid()}"417 stream_name = f"test-foobar-{short_uid()}"418 record_data = "hello"419 num_events_kinesis = 10420 try:421 create_lambda_function(422 func_name=function_name,423 handler_file=TEST_LAMBDA_PYTHON_ECHO,424 runtime=LAMBDA_RUNTIME_PYTHON36,425 role=lambda_su_role,426 )427 kinesis_create_stream(StreamName=stream_name, ShardCount=1)428 wait_for_stream_ready(stream_name=stream_name)429 stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)430 assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1431 stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[432 "StreamDescription"433 ]["StreamARN"]434 uuid = lambda_client.create_event_source_mapping(435 EventSourceArn=stream_arn, FunctionName=function_name, StartingPosition="LATEST"436 )["UUID"]437 _await_event_source_mapping_enabled(lambda_client, uuid)438 kinesis_client.put_records(439 Records=[440 {"Data": record_data, "PartitionKey": f"test_{i}"}441 for i in range(0, num_events_kinesis)442 ],443 StreamName=stream_name,444 )445 events = _get_lambda_invocation_events(446 logs_client, function_name, expected_num_events=1447 )448 records = events[0]["Records"]449 assert len(records) == num_events_kinesis450 for record in records:451 assert "eventID" in record452 assert "eventSourceARN" in record453 assert "eventSource" in record454 assert "eventVersion" in record455 assert "eventName" in record456 assert "invokeIdentityArn" in record457 assert "awsRegion" in record458 assert "kinesis" in record459 actual_record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")460 assert actual_record_data == record_data461 finally:462 lambda_client.delete_event_source_mapping(UUID=uuid)463 @patch.object(config, "SYNCHRONOUS_KINESIS_EVENTS", False)464 def test_kinesis_event_source_mapping_with_async_invocation(465 self,466 lambda_client,467 kinesis_client,468 create_lambda_function,469 kinesis_create_stream,470 wait_for_stream_ready,471 logs_client,472 lambda_su_role,473 ):474 # TODO: this test will fail if `log_cli=true` is set and `LAMBDA_EXECUTOR=local`!475 # apparently this particular configuration prevents lambda logs from being extracted properly, giving the476 # appearance that the function was never invoked.477 try:478 function_name = f"lambda_func-{short_uid()}"479 stream_name = f"test-foobar-{short_uid()}"480 num_records_per_batch = 10481 num_batches = 2482 create_lambda_function(483 handler_file=TEST_LAMBDA_PARALLEL_FILE,484 func_name=function_name,485 runtime=LAMBDA_RUNTIME_PYTHON36,486 role=lambda_su_role,487 )488 kinesis_create_stream(StreamName=stream_name, ShardCount=1)489 stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[490 "StreamDescription"491 ]["StreamARN"]492 wait_for_stream_ready(stream_name=stream_name)493 stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)494 assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1495 uuid = lambda_client.create_event_source_mapping(496 EventSourceArn=stream_arn,497 FunctionName=function_name,498 StartingPosition="LATEST",499 BatchSize=num_records_per_batch,500 )["UUID"]501 _await_event_source_mapping_enabled(lambda_client, uuid)502 for i in range(num_batches):503 start = time.perf_counter()504 kinesis_client.put_records(505 Records=[506 {"Data": json.dumps({"record_id": j}), "PartitionKey": f"test_{i}"}507 for j in range(0, num_records_per_batch)508 ],509 StreamName=stream_name,510 )511 assert (time.perf_counter() - start) < 1 # this should not take more than a second512 invocation_events = _get_lambda_invocation_events(513 logs_client, function_name, expected_num_events=num_batches514 )515 for i in range(num_batches):516 event = invocation_events[i]517 assert len(event["event"]["Records"]) == num_records_per_batch518 actual_record_ids = []519 for record in event["event"]["Records"]:520 assert "eventID" in record521 assert "eventSourceARN" in record522 assert "eventSource" in record523 assert "eventVersion" in record524 assert "eventName" in record525 assert "invokeIdentityArn" in record526 assert "awsRegion" in record527 assert "kinesis" in record528 record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")529 actual_record_id = json.loads(record_data)["record_id"]530 actual_record_ids.append(actual_record_id)531 actual_record_ids.sort()532 assert actual_record_ids == [i for i in range(num_records_per_batch)]533 assert (534 invocation_events[1]["executionStart"] - invocation_events[0]["executionStart"]535 ) > 5536 finally:537 lambda_client.delete_event_source_mapping(UUID=uuid)538 def test_kinesis_event_source_trim_horizon(539 self,540 lambda_client,541 kinesis_client,542 create_lambda_function,543 kinesis_create_stream,544 wait_for_stream_ready,545 logs_client,546 lambda_su_role,547 ):548 function_name = f"lambda_func-{short_uid()}"549 stream_name = f"test-foobar-{short_uid()}"550 num_records_per_batch = 10551 num_batches = 3552 try:553 create_lambda_function(554 handler_file=TEST_LAMBDA_PARALLEL_FILE,555 func_name=function_name,556 runtime=LAMBDA_RUNTIME_PYTHON36,557 role=lambda_su_role,558 )559 kinesis_create_stream(StreamName=stream_name, ShardCount=1)560 stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[561 "StreamDescription"562 ]["StreamARN"]563 wait_for_stream_ready(stream_name=stream_name)564 stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)565 assert stream_summary["StreamDescriptionSummary"]["OpenShardCount"] == 1566 # insert some records before event source mapping created567 for i in range(num_batches - 1):568 kinesis_client.put_records(569 Records=[570 {"Data": json.dumps({"record_id": j}), "PartitionKey": f"test_{i}"}571 for j in range(0, num_records_per_batch)572 ],573 StreamName=stream_name,574 )575 uuid = lambda_client.create_event_source_mapping(576 EventSourceArn=stream_arn,577 FunctionName=function_name,578 StartingPosition="TRIM_HORIZON",579 BatchSize=num_records_per_batch,580 )["UUID"]581 # insert some more records582 kinesis_client.put_records(583 Records=[584 {"Data": json.dumps({"record_id": i}), "PartitionKey": f"test_{num_batches}"}585 for i in range(0, num_records_per_batch)586 ],587 StreamName=stream_name,588 )589 invocation_events = _get_lambda_invocation_events(590 logs_client, function_name, expected_num_events=num_batches591 )592 for i in range(num_batches):593 event = invocation_events[i]594 assert len(event["event"]["Records"]) == num_records_per_batch595 actual_record_ids = []596 for record in event["event"]["Records"]:597 assert "eventID" in record598 assert "eventSourceARN" in record599 assert "eventSource" in record600 assert "eventVersion" in record601 assert "eventName" in record602 assert "invokeIdentityArn" in record603 assert "awsRegion" in record604 assert "kinesis" in record605 record_data = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")606 actual_record_id = json.loads(record_data)["record_id"]607 actual_record_ids.append(actual_record_id)608 actual_record_ids.sort()609 assert actual_record_ids == [i for i in range(num_records_per_batch)]610 finally:611 lambda_client.delete_event_source_mapping(UUID=uuid)612 def test_disable_kinesis_event_source_mapping(613 self,614 lambda_client,615 kinesis_client,616 create_lambda_function,617 kinesis_create_stream,618 wait_for_stream_ready,619 logs_client,620 lambda_su_role,621 ):622 function_name = f"lambda_func-{short_uid()}"623 stream_name = f"test-foobar-{short_uid()}"624 num_records_per_batch = 10625 try:626 create_lambda_function(627 handler_file=TEST_LAMBDA_PYTHON_ECHO,628 func_name=function_name,629 runtime=LAMBDA_RUNTIME_PYTHON36,630 role=lambda_su_role,631 )632 kinesis_create_stream(StreamName=stream_name, ShardCount=1)633 stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[634 "StreamDescription"635 ]["StreamARN"]636 wait_for_stream_ready(stream_name=stream_name)637 event_source_uuid = lambda_client.create_event_source_mapping(638 EventSourceArn=stream_arn,639 FunctionName=function_name,640 StartingPosition="LATEST",641 BatchSize=num_records_per_batch,642 )["UUID"]643 _await_event_source_mapping_enabled(lambda_client, event_source_uuid)644 kinesis_client.put_records(645 Records=[646 {"Data": json.dumps({"record_id": i}), "PartitionKey": "test"}647 for i in range(0, num_records_per_batch)648 ],649 StreamName=stream_name,650 )651 events = _get_lambda_invocation_events(652 logs_client, function_name, expected_num_events=1653 )654 assert len(events) == 1655 lambda_client.update_event_source_mapping(UUID=event_source_uuid, Enabled=False)656 time.sleep(2)657 kinesis_client.put_records(658 Records=[659 {"Data": json.dumps({"record_id": i}), "PartitionKey": "test"}660 for i in range(0, num_records_per_batch)661 ],662 StreamName=stream_name,663 )664 time.sleep(7) # wait for records to pass through stream665 # should still only get the first batch from before mapping was disabled666 events = _get_lambda_invocation_events(667 logs_client, function_name, expected_num_events=1668 )669 assert len(events) == 1670 finally:671 lambda_client.delete_event_source_mapping(UUID=event_source_uuid)672 def test_kinesis_event_source_mapping_with_on_failure_destination_config(673 self,674 lambda_client,675 create_lambda_function,676 sqs_client,677 sqs_queue_arn,678 sqs_create_queue,679 create_iam_role_with_policy,680 kinesis_client,681 wait_for_stream_ready,682 ):683 try:684 function_name = f"lambda_func-{short_uid()}"685 role = f"test-lambda-role-{short_uid()}"686 policy_name = f"test-lambda-policy-{short_uid()}"687 kinesis_name = f"test-kinesis-{short_uid()}"688 role_arn = create_iam_role_with_policy(689 RoleName=role,690 PolicyName=policy_name,691 RoleDefinition=lambda_role,692 PolicyDefinition=s3_lambda_permission,693 )694 create_lambda_function(695 handler_file=TEST_LAMBDA_PYTHON,696 func_name=function_name,697 runtime=LAMBDA_RUNTIME_PYTHON37,698 role=role_arn,699 )700 kinesis_client.create_stream(StreamName=kinesis_name, ShardCount=1)701 result = kinesis_client.describe_stream(StreamName=kinesis_name)["StreamDescription"]702 kinesis_arn = result["StreamARN"]703 wait_for_stream_ready(stream_name=kinesis_name)704 queue_event_source_mapping = sqs_create_queue()705 destination_queue = sqs_queue_arn(queue_event_source_mapping)706 destination_config = {"OnFailure": {"Destination": destination_queue}}707 message = {708 "input": "hello",709 "value": "world",710 lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1,711 }712 result = lambda_client.create_event_source_mapping(713 FunctionName=function_name,714 BatchSize=1,715 StartingPosition="LATEST",716 EventSourceArn=kinesis_arn,717 MaximumBatchingWindowInSeconds=1,718 MaximumRetryAttempts=1,719 DestinationConfig=destination_config,720 )721 event_source_mapping_uuid = result["UUID"]722 _await_event_source_mapping_enabled(lambda_client, event_source_mapping_uuid)723 kinesis_client.put_record(724 StreamName=kinesis_name, Data=to_bytes(json.dumps(message)), PartitionKey="custom"725 )726 def verify_failure_received():727 result = sqs_client.receive_message(QueueUrl=queue_event_source_mapping)728 msg = result["Messages"][0]729 body = json.loads(msg["Body"])730 assert body["requestContext"]["condition"] == "RetryAttemptsExhausted"731 assert body["KinesisBatchInfo"]["batchSize"] == 1732 assert body["KinesisBatchInfo"]["streamArn"] == kinesis_arn733 retry(verify_failure_received, retries=50, sleep=5, sleep_before=5)734 finally:735 kinesis_client.delete_stream(StreamName=kinesis_name, EnforceConsumerDeletion=True)736 lambda_client.delete_event_source_mapping(UUID=event_source_mapping_uuid)737def _await_event_source_mapping_enabled(lambda_client, uuid, retries=30):738 def assert_mapping_enabled():739 assert lambda_client.get_event_source_mapping(UUID=uuid)["State"] == "Enabled"740 retry(assert_mapping_enabled, sleep_before=2, retries=retries)741def _await_dynamodb_table_active(dynamodb_client, table_name, retries=6):742 def assert_table_active():743 assert (744 dynamodb_client.describe_table(TableName=table_name)["Table"]["TableStatus"] == "ACTIVE"745 )746 retry(assert_table_active, retries=retries, sleep_before=2)747def _get_lambda_invocation_events(logs_client, function_name, expected_num_events, retries=30):748 def get_events():749 events = get_lambda_log_events(function_name, logs_client=logs_client)750 assert len(events) == expected_num_events751 return events...
test_lambda_integration_dynamodbstreams.py
Source:test_lambda_integration_dynamodbstreams.py
...116 table_name=table_name, partition_key=partition_key117 )118 # snapshot create table to get the table name registered as resource119 snapshot.match("create-table-result", create_table_result)120 _await_dynamodb_table_active(dynamodb_client, table_name)121 stream_arn = dynamodb_client.update_table(122 TableName=table_name,123 StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_IMAGE"},124 )["TableDescription"]["LatestStreamArn"]125 assert wait_for_dynamodb_stream_enabled(stream_arn)126 create_event_source_mapping_response = lambda_client.create_event_source_mapping(127 FunctionName=function_name,128 BatchSize=1,129 StartingPosition="TRIM_HORIZON", # TODO investigate how to get it back to LATEST130 EventSourceArn=stream_arn,131 MaximumBatchingWindowInSeconds=1,132 MaximumRetryAttempts=1,133 )134 snapshot.match("create_event_source_mapping_response", create_event_source_mapping_response)135 event_source_uuid = create_event_source_mapping_response["UUID"]136 cleanups.append(lambda: lambda_client.delete_event_source_mapping(UUID=event_source_uuid))137 _await_event_source_mapping_enabled(lambda_client, event_source_uuid)138 def _send_and_receive_events():139 dynamodb_client.put_item(TableName=table_name, Item=db_item)140 return get_lambda_logs_event(141 function_name=function_name, expected_num_events=1, retries=20142 )143 event_logs = retry(_send_and_receive_events, retries=3)144 snapshot.match("event_logs", event_logs)145 @pytest.mark.aws_validated146 def test_disabled_dynamodb_event_source_mapping(147 self,148 create_lambda_function,149 lambda_client,150 dynamodb_resource,151 dynamodb_create_table,152 logs_client,153 dynamodbstreams_client,154 lambda_su_role,155 cleanups,156 wait_for_dynamodb_stream_enabled,157 snapshot,158 ):159 function_name = f"lambda_func-{short_uid()}"160 ddb_table = f"ddb_table-{short_uid()}"161 items = [162 {"id": short_uid(), "data": "data1"},163 {"id": short_uid(), "data": "data2"},164 ]165 create_lambda_function(166 func_name=function_name,167 handler_file=TEST_LAMBDA_PYTHON_ECHO,168 runtime=LAMBDA_RUNTIME_PYTHON37,169 role=lambda_su_role,170 )171 dynamodb_create_table_result = dynamodb_create_table(172 table_name=ddb_table, partition_key="id", stream_view_type="NEW_IMAGE"173 )174 latest_stream_arn = dynamodb_create_table_result["TableDescription"]["LatestStreamArn"]175 snapshot.match("dynamodb_create_table_result", dynamodb_create_table_result)176 rs = lambda_client.create_event_source_mapping(177 FunctionName=function_name,178 EventSourceArn=latest_stream_arn,179 StartingPosition="TRIM_HORIZON",180 MaximumBatchingWindowInSeconds=1,181 )182 snapshot.match("create_event_source_mapping_result", rs)183 uuid = rs["UUID"]184 cleanups.append(lambda: lambda_client.delete_event_source_mapping(UUID=uuid))185 _await_event_source_mapping_enabled(lambda_client, uuid)186 assert wait_for_dynamodb_stream_enabled(latest_stream_arn)187 table = dynamodb_resource.Table(ddb_table)188 table.put_item(Item=items[0])189 # Lambda should be invoked 1 time190 retry(191 check_expected_lambda_log_events_length,192 retries=10,193 sleep=3,194 function_name=function_name,195 expected_length=1,196 logs_client=logs_client,197 )198 # disable event source mapping199 update_event_source_mapping_result = lambda_client.update_event_source_mapping(200 UUID=uuid, Enabled=False201 )202 snapshot.match("update_event_source_mapping_result", update_event_source_mapping_result)203 time.sleep(2)204 table.put_item(Item=items[1])205 # lambda no longer invoked, still have 1 event206 check_expected_lambda_log_events_length(207 expected_length=1, function_name=function_name, logs_client=logs_client208 )209 @pytest.mark.aws_validated210 def test_deletion_event_source_mapping_with_dynamodb(211 self,212 create_lambda_function,213 lambda_client,214 dynamodb_client,215 lambda_su_role,216 snapshot,217 cleanups,218 dynamodb_create_table,219 ):220 function_name = f"lambda_func-{short_uid()}"221 ddb_table = f"ddb_table-{short_uid()}"222 create_lambda_function(223 func_name=function_name,224 handler_file=TEST_LAMBDA_PYTHON_ECHO,225 runtime=LAMBDA_RUNTIME_PYTHON39,226 role=lambda_su_role,227 )228 create_dynamodb_table_response = dynamodb_create_table(229 table_name=ddb_table,230 partition_key="id",231 client=dynamodb_client,232 stream_view_type="NEW_IMAGE",233 )234 snapshot.match("create_dynamodb_table_response", create_dynamodb_table_response)235 latest_stream_arn = create_dynamodb_table_response["TableDescription"]["LatestStreamArn"]236 result = lambda_client.create_event_source_mapping(237 FunctionName=function_name,238 EventSourceArn=latest_stream_arn,239 StartingPosition="TRIM_HORIZON",240 )241 snapshot.match("create_event_source_mapping_result", result)242 event_source_mapping_uuid = result["UUID"]243 cleanups.append(244 lambda: lambda_client.delete_event_source_mapping(UUID=event_source_mapping_uuid)245 )246 _await_dynamodb_table_active(dynamodb_client, ddb_table)247 dynamodb_client.delete_table(TableName=ddb_table)248 result = lambda_client.list_event_source_mappings(EventSourceArn=latest_stream_arn)249 snapshot.match("list_event_source_mapping_result", result)250 @pytest.mark.aws_validated251 # FIXME last three skip verification entries are purely due to numbering mismatches252 @pytest.mark.skip_snapshot_verify(253 condition=is_old_provider,254 paths=[255 "$..Messages..Body.requestContext.approximateInvokeCount",256 "$..Messages..Body.requestContext.functionArn",257 "$..Messages..Body.requestContext.requestId",258 "$..Messages..Body.responseContext.statusCode",259 "$..Messages..MessageId",260 "$..TableDescription.TableId",261 "$..FunctionArn",262 "$..UUID",263 ],264 )265 def test_dynamodb_event_source_mapping_with_on_failure_destination_config(266 self,267 lambda_client,268 create_lambda_function,269 sqs_client,270 sqs_queue_arn,271 sqs_create_queue,272 create_iam_role_with_policy,273 dynamodb_client,274 dynamodb_create_table,275 snapshot,276 cleanups,277 ):278 snapshot.add_transformer(snapshot.transform.key_value("MD5OfBody"))279 snapshot.add_transformer(snapshot.transform.key_value("ReceiptHandle"))280 snapshot.add_transformer(snapshot.transform.key_value("startSequenceNumber"))281 function_name = f"lambda_func-{short_uid()}"282 role = f"test-lambda-role-{short_uid()}"283 policy_name = f"test-lambda-policy-{short_uid()}"284 table_name = f"test-table-{short_uid()}"285 partition_key = "my_partition_key"286 item = {partition_key: {"S": "hello world"}}287 role_arn = create_iam_role_with_policy(288 RoleName=role,289 PolicyName=policy_name,290 RoleDefinition=lambda_role,291 PolicyDefinition=s3_lambda_permission,292 )293 create_lambda_function(294 handler_file=TEST_LAMBDA_PYTHON_UNHANDLED_ERROR,295 func_name=function_name,296 runtime=LAMBDA_RUNTIME_PYTHON39,297 role=role_arn,298 )299 dynamodb_create_table(table_name=table_name, partition_key=partition_key)300 _await_dynamodb_table_active(dynamodb_client, table_name)301 update_table_response = dynamodb_client.update_table(302 TableName=table_name,303 StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_IMAGE"},304 )305 snapshot.match("update_table_response", update_table_response)306 stream_arn = update_table_response["TableDescription"]["LatestStreamArn"]307 destination_queue = sqs_create_queue()308 queue_failure_event_source_mapping_arn = sqs_queue_arn(destination_queue)309 destination_config = {"OnFailure": {"Destination": queue_failure_event_source_mapping_arn}}310 create_event_source_mapping_response = lambda_client.create_event_source_mapping(311 FunctionName=function_name,312 BatchSize=1,313 StartingPosition="TRIM_HORIZON",314 EventSourceArn=stream_arn,315 MaximumBatchingWindowInSeconds=1,316 MaximumRetryAttempts=1,317 DestinationConfig=destination_config,318 )319 snapshot.match("create_event_source_mapping_response", create_event_source_mapping_response)320 event_source_mapping_uuid = create_event_source_mapping_response["UUID"]321 cleanups.append(322 lambda: lambda_client.delete_event_source_mapping(UUID=event_source_mapping_uuid)323 )324 _await_event_source_mapping_enabled(lambda_client, event_source_mapping_uuid)325 dynamodb_client.put_item(TableName=table_name, Item=item)326 def verify_failure_received():327 res = sqs_client.receive_message(QueueUrl=destination_queue)328 assert res.get("Messages")329 return res330 messages = retry(verify_failure_received, retries=15, sleep=5, sleep_before=5)331 snapshot.match("destination_queue_messages", messages)332 @pytest.mark.aws_validated333 @pytest.mark.parametrize(334 "item_to_put1, item_to_put2, filter, calls",335 [336 # Test with filter, and two times same entry337 (338 {"id": {"S": "test123"}, "id2": {"S": "test42"}},339 None,340 {"eventName": ["INSERT"]},341 1,342 ),343 # Test with OR filter344 (345 {"id": {"S": "test123"}},346 {"id": {"S": "test123"}, "id2": {"S": "42test"}},347 {"eventName": ["INSERT", "MODIFY"]},348 2,349 ),350 # Test with 2 filters (AND), and two times same entry (second time modified aka MODIFY eventName)351 (352 {"id": {"S": "test123"}},353 {"id": {"S": "test123"}, "id2": {"S": "42test"}},354 {"eventName": ["INSERT"], "eventSource": ["aws:dynamodb"]},355 1,356 ),357 # Test exists filter358 (359 {"id": {"S": "test123"}},360 {"id": {"S": "test1234"}, "presentKey": {"S": "test123"}},361 {"dynamodb": {"NewImage": {"presentKey": [{"exists": False}]}}},362 1,363 ),364 # numeric filters365 # NOTE: numeric filters seem not to work with DynamoDB as the values are represented as string366 # and it looks like that there is no conversion happening367 # I leave the test here in case this changes in future.368 (369 {"id": {"S": "test123"}, "numericFilter": {"N": "123"}},370 {"id": {"S": "test1234"}, "numericFilter": {"N": "12"}},371 {"dynamodb": {"NewImage": {"numericFilter": {"N": [{"numeric": [">", 100]}]}}}},372 0,373 ),374 (375 {"id": {"S": "test123"}, "numericFilter": {"N": "100"}},376 {"id": {"S": "test1234"}, "numericFilter": {"N": "12"}},377 {378 "dynamodb": {379 "NewImage": {"numericFilter": {"N": [{"numeric": [">=", 100, "<", 200]}]}}380 }381 },382 0,383 ),384 # Prefix385 (386 {"id": {"S": "test123"}, "prefix": {"S": "us-1-testtest"}},387 {"id": {"S": "test1234"}, "prefix": {"S": "testtest"}},388 {"dynamodb": {"NewImage": {"prefix": {"S": [{"prefix": "us-1"}]}}}},389 1,390 ),391 ],392 )393 def test_dynamodb_event_filter(394 self,395 create_lambda_function,396 lambda_client,397 dynamodb_client,398 dynamodb_create_table,399 lambda_su_role,400 logs_client,401 wait_for_dynamodb_stream_ready,402 filter,403 calls,404 item_to_put1,405 item_to_put2,406 cleanups,407 snapshot,408 ):409 function_name = f"lambda_func-{short_uid()}"410 table_name = f"test-table-{short_uid()}"411 max_retries = 50412 create_lambda_function(413 handler_file=TEST_LAMBDA_PYTHON_ECHO,414 func_name=function_name,415 runtime=LAMBDA_RUNTIME_PYTHON37,416 role=lambda_su_role,417 )418 table_creation_response = dynamodb_create_table(table_name=table_name, partition_key="id")419 snapshot.match("table_creation_response", table_creation_response)420 _await_dynamodb_table_active(dynamodb_client, table_name)421 stream_arn = dynamodb_client.update_table(422 TableName=table_name,423 StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_AND_OLD_IMAGES"},424 )["TableDescription"]["LatestStreamArn"]425 wait_for_dynamodb_stream_ready(stream_arn)426 event_source_mapping_kwargs = {427 "FunctionName": function_name,428 "BatchSize": 1,429 "StartingPosition": "TRIM_HORIZON",430 "EventSourceArn": stream_arn,431 "MaximumBatchingWindowInSeconds": 1,432 "MaximumRetryAttempts": 1,433 }434 event_source_mapping_kwargs.update(435 FilterCriteria={436 "Filters": [437 {"Pattern": json.dumps(filter)},438 ]439 }440 )441 create_event_source_mapping_response = lambda_client.create_event_source_mapping(442 **event_source_mapping_kwargs443 )444 event_source_uuid = create_event_source_mapping_response["UUID"]445 cleanups.append(lambda: lambda_client.delete_event_source_mapping(UUID=event_source_uuid))446 snapshot.match("create_event_source_mapping_response", create_event_source_mapping_response)447 _await_event_source_mapping_enabled(lambda_client, event_source_uuid)448 dynamodb_client.put_item(TableName=table_name, Item=item_to_put1)449 def assert_lambda_called():450 events = get_lambda_log_events(function_name, logs_client=logs_client)451 if calls > 0:452 assert len(events) == 1453 else:454 # negative test for 'numeric' filter455 assert len(events) == 0456 return events457 events = retry(assert_lambda_called, retries=max_retries)458 snapshot.match("lambda-log-events", events)459 # Following lines are relevant if variables are set via parametrize460 if item_to_put2:461 # putting a new item (item_to_put2) a second time is a 'INSERT' request462 dynamodb_client.put_item(TableName=table_name, Item=item_to_put2)463 else:464 # putting the same item (item_to_put1) a second time is a 'MODIFY' request (at least in Localstack)465 dynamodb_client.put_item(TableName=table_name, Item=item_to_put1)466 # depending on the parametrize values the filter (and the items to put) the lambda might be called multiple times467 if calls > 1:468 def assert_events_called_multiple():469 events = get_lambda_log_events(function_name, logs_client=logs_client)470 assert len(events) == calls471 return events472 # lambda was called a second time, so new records should be found473 events = retry(assert_events_called_multiple, retries=max_retries)474 else:475 # lambda wasn't called a second time, so no new records should be found476 events = retry(assert_lambda_called, retries=max_retries)477 snapshot.match("lambda-multiple-log-events", events)478 @pytest.mark.aws_validated479 @pytest.mark.parametrize(480 "filter",481 [482 "single-string",483 '[{"eventName": ["INSERT"=123}]',484 ],485 )486 def test_dynamodb_invalid_event_filter(487 self,488 create_lambda_function,489 lambda_client,490 dynamodb_client,491 dynamodb_create_table,492 lambda_su_role,493 wait_for_dynamodb_stream_ready,494 filter,495 snapshot,496 ):497 function_name = f"lambda_func-{short_uid()}"498 table_name = f"test-table-{short_uid()}"499 create_lambda_function(500 handler_file=TEST_LAMBDA_PYTHON_ECHO,501 func_name=function_name,502 runtime=LAMBDA_RUNTIME_PYTHON37,503 role=lambda_su_role,504 )505 dynamodb_create_table(table_name=table_name, partition_key="id")506 _await_dynamodb_table_active(dynamodb_client, table_name)507 stream_arn = dynamodb_client.update_table(508 TableName=table_name,509 StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_AND_OLD_IMAGES"},510 )["TableDescription"]["LatestStreamArn"]511 wait_for_dynamodb_stream_ready(stream_arn)512 event_source_mapping_kwargs = {513 "FunctionName": function_name,514 "BatchSize": 1,515 "StartingPosition": "TRIM_HORIZON",516 "EventSourceArn": stream_arn,517 "MaximumBatchingWindowInSeconds": 1,518 "MaximumRetryAttempts": 1,519 "FilterCriteria": {520 "Filters": [...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!