How to use the dynamodbstreams_client method in LocalStack

Example Python code snippet using the localstack_python package

test_dynamodb.py

Source: test_dynamodb.py (GitHub)

copy

Full Screen

# -*- coding: utf-8 -*-
"""Integration tests for the LocalStack DynamoDB / DynamoDB Streams providers.

NOTE(review): reconstructed from a scraped code listing in which the original
line numbers were fused into the source text (making it syntactically invalid);
formatting has been restored, code content left unchanged. The final test in
this chunk is truncated in the captured listing — see the note at the bottom.
"""
import json
import re
from datetime import datetime
from time import sleep
from typing import Dict

import pytest
from boto3.dynamodb.conditions import Key
from boto3.dynamodb.types import STRING

from localstack.services.awslambda.lambda_utils import LAMBDA_RUNTIME_PYTHON36
from localstack.services.dynamodbstreams.dynamodbstreams_api import get_kinesis_stream_name
from localstack.utils import testutil
from localstack.utils.aws import aws_stack
from localstack.utils.common import json_safe, long_uid, retry, short_uid
from localstack.utils.testutil import check_expected_lambda_log_events_length

from .awslambda.test_lambda import TEST_LAMBDA_PYTHON_ECHO
from .test_kinesis import get_shard_iterator

# name of the hash (partition) key used by all test tables
PARTITION_KEY = "id"

TEST_DDB_TABLE_NAME = "test-ddb-table-1"
TEST_DDB_TABLE_NAME_2 = "test-ddb-table-2"
TEST_DDB_TABLE_NAME_3 = "test-ddb-table-3"

TEST_DDB_TAGS = [
    {"Key": "Name", "Value": "test-table"},
    {"Key": "TestKey", "Value": "true"},
]


@pytest.fixture()
def dynamodb(dynamodb_resource):
    """Alias fixture exposing the shared boto3 DynamoDB service resource."""
    return dynamodb_resource


class TestDynamoDB:
    def test_non_ascii_chars(self, dynamodb):
        """Items containing non-ASCII characters survive a put/get round trip."""
        aws_stack.create_dynamodb_table(TEST_DDB_TABLE_NAME, partition_key=PARTITION_KEY)
        table = dynamodb.Table(TEST_DDB_TABLE_NAME)
        # write some items containing non-ASCII characters
        items = {
            "id1": {PARTITION_KEY: "id1", "data": "foobar123 ✓"},
            "id2": {PARTITION_KEY: "id2", "data": "foobar123 £"},
            "id3": {PARTITION_KEY: "id3", "data": "foobar123 ¢"},
        }
        for k, item in items.items():
            table.put_item(Item=item)
        for item_id in items.keys():
            item = table.get_item(Key={PARTITION_KEY: item_id})["Item"]
            # need to fix up the JSON and convert str to unicode for Python 2
            item1 = json_safe(item)
            item2 = json_safe(items[item_id])
            assert item1 == item2
        # clean up
        delete_table(TEST_DDB_TABLE_NAME)

    def test_large_data_download(self, dynamodb):
        """A scan over many large items is returned completely to the client."""
        aws_stack.create_dynamodb_table(TEST_DDB_TABLE_NAME_2, partition_key=PARTITION_KEY)
        table = dynamodb.Table(TEST_DDB_TABLE_NAME_2)
        # Create a large amount of items
        num_items = 20
        for i in range(0, num_items):
            item = {PARTITION_KEY: "id%s" % i, "data1": "foobar123 " * 1000}
            table.put_item(Item=item)
        # Retrieve the items. The data will be transmitted to the client with chunked transfer encoding
        result = table.scan(TableName=TEST_DDB_TABLE_NAME_2)
        assert len(result["Items"]) == num_items
        # clean up
        delete_table(TEST_DDB_TABLE_NAME_2)

    def test_time_to_live(self, dynamodb):
        """TTL can be described, enabled, disabled, and re-enabled on a table."""
        aws_stack.create_dynamodb_table(TEST_DDB_TABLE_NAME_3, partition_key=PARTITION_KEY)
        table = dynamodb.Table(TEST_DDB_TABLE_NAME_3)
        # Insert some items to the table
        items = {
            "id1": {PARTITION_KEY: "id1", "data": "IT IS"},
            "id2": {PARTITION_KEY: "id2", "data": "TIME"},
            "id3": {PARTITION_KEY: "id3", "data": "TO LIVE!"},
        }
        for k, item in items.items():
            table.put_item(Item=item)
        # Describe TTL when still unset
        response = testutil.send_describe_dynamodb_ttl_request(TEST_DDB_TABLE_NAME_3)
        assert response.status_code == 200
        assert (
            json.loads(response._content)["TimeToLiveDescription"]["TimeToLiveStatus"] == "DISABLED"
        )
        # Enable TTL for given table
        response = testutil.send_update_dynamodb_ttl_request(TEST_DDB_TABLE_NAME_3, True)
        assert response.status_code == 200
        assert json.loads(response._content)["TimeToLiveSpecification"]["Enabled"]
        # Describe TTL status after being enabled.
        response = testutil.send_describe_dynamodb_ttl_request(TEST_DDB_TABLE_NAME_3)
        assert response.status_code == 200
        assert (
            json.loads(response._content)["TimeToLiveDescription"]["TimeToLiveStatus"] == "ENABLED"
        )
        # Disable TTL for given table
        response = testutil.send_update_dynamodb_ttl_request(TEST_DDB_TABLE_NAME_3, False)
        assert response.status_code == 200
        assert not json.loads(response._content)["TimeToLiveSpecification"]["Enabled"]
        # Describe TTL status after being disabled.
        response = testutil.send_describe_dynamodb_ttl_request(TEST_DDB_TABLE_NAME_3)
        assert response.status_code == 200
        assert (
            json.loads(response._content)["TimeToLiveDescription"]["TimeToLiveStatus"] == "DISABLED"
        )
        # Enable TTL for given table again
        response = testutil.send_update_dynamodb_ttl_request(TEST_DDB_TABLE_NAME_3, True)
        assert response.status_code == 200
        assert json.loads(response._content)["TimeToLiveSpecification"]["Enabled"]
        # Describe TTL status after being enabled again.
        response = testutil.send_describe_dynamodb_ttl_request(TEST_DDB_TABLE_NAME_3)
        assert response.status_code == 200
        assert (
            json.loads(response._content)["TimeToLiveDescription"]["TimeToLiveStatus"] == "ENABLED"
        )
        # clean up
        delete_table(TEST_DDB_TABLE_NAME_3)

    def test_list_tags_of_resource(self, dynamodb):
        """Tags can be listed, added, and removed via the resource-tagging APIs."""
        table_name = "ddb-table-%s" % short_uid()
        dynamodb = aws_stack.create_external_boto_client("dynamodb")
        rs = dynamodb.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
            Tags=TEST_DDB_TAGS,
        )
        table_arn = rs["TableDescription"]["TableArn"]
        rs = dynamodb.list_tags_of_resource(ResourceArn=table_arn)
        assert rs["Tags"] == TEST_DDB_TAGS
        dynamodb.tag_resource(ResourceArn=table_arn, Tags=[{"Key": "NewKey", "Value": "TestValue"}])
        rs = dynamodb.list_tags_of_resource(ResourceArn=table_arn)
        assert len(rs["Tags"]) == len(TEST_DDB_TAGS) + 1
        tags = {tag["Key"]: tag["Value"] for tag in rs["Tags"]}
        assert "NewKey" in tags.keys()
        assert tags["NewKey"] == "TestValue"
        dynamodb.untag_resource(ResourceArn=table_arn, TagKeys=["Name", "NewKey"])
        rs = dynamodb.list_tags_of_resource(ResourceArn=table_arn)
        tags = {tag["Key"]: tag["Value"] for tag in rs["Tags"]}
        assert "Name" not in tags.keys()
        assert "NewKey" not in tags.keys()
        delete_table(table_name)

    def test_stream_spec_and_region_replacement(self, dynamodb):
        """Stream/table ARNs use the local region; backing Kinesis stream is created/removed."""
        ddbstreams = aws_stack.create_external_boto_client("dynamodbstreams")
        kinesis = aws_stack.create_external_boto_client("kinesis")
        table_name = f"ddb-{short_uid()}"
        aws_stack.create_dynamodb_table(
            table_name,
            partition_key=PARTITION_KEY,
            stream_view_type="NEW_AND_OLD_IMAGES",
        )
        table = dynamodb.Table(table_name)
        # assert ARN formats
        expected_arn_prefix = "arn:aws:dynamodb:" + aws_stack.get_local_region()
        assert table.table_arn.startswith(expected_arn_prefix)
        assert table.latest_stream_arn.startswith(expected_arn_prefix)
        # assert stream has been created
        stream_tables = [s["TableName"] for s in ddbstreams.list_streams()["Streams"]]
        assert table_name in stream_tables
        stream_name = get_kinesis_stream_name(table_name)
        assert stream_name in kinesis.list_streams()["StreamNames"]
        # assert shard ID formats
        result = ddbstreams.describe_stream(StreamArn=table.latest_stream_arn)["StreamDescription"]
        assert "Shards" in result
        for shard in result["Shards"]:
            assert re.match(r"^shardId-[0-9]{20}-[a-zA-Z0-9]{1,36}$", shard["ShardId"])
        # clean up
        delete_table(table_name)

        def _assert_stream_deleted():
            stream_tables = [s["TableName"] for s in ddbstreams.list_streams()["Streams"]]
            assert table_name not in stream_tables
            assert stream_name not in kinesis.list_streams()["StreamNames"]

        # assert stream has been deleted
        retry(_assert_stream_deleted, sleep=0.4, retries=5)

    def test_multiple_update_expressions(self, dynamodb):
        """SET with multiple clauses updates all attributes; querying a GSI with
        an INCLUDE projection using ALL_ATTRIBUTES raises a ValidationException."""
        dynamodb_client = aws_stack.create_external_boto_client("dynamodb")
        aws_stack.create_dynamodb_table(TEST_DDB_TABLE_NAME, partition_key=PARTITION_KEY)
        table = dynamodb.Table(TEST_DDB_TABLE_NAME)
        item_id = short_uid()
        table.put_item(Item={PARTITION_KEY: item_id, "data": "foobar123 ✓"})
        response = dynamodb_client.update_item(
            TableName=TEST_DDB_TABLE_NAME,
            Key={PARTITION_KEY: {"S": item_id}},
            UpdateExpression="SET attr1 = :v1, attr2 = :v2",
            ExpressionAttributeValues={":v1": {"S": "value1"}, ":v2": {"S": "value2"}},
        )
        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
        item = table.get_item(Key={PARTITION_KEY: item_id})["Item"]
        assert item["attr1"] == "value1"
        assert item["attr2"] == "value2"
        attributes = [{"AttributeName": "id", "AttributeType": STRING}]
        user_id_idx = [
            {
                "Create": {
                    "IndexName": "id-index",
                    "KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
                    "Projection": {
                        "ProjectionType": "INCLUDE",
                        "NonKeyAttributes": ["data"],
                    },
                    "ProvisionedThroughput": {
                        "ReadCapacityUnits": 5,
                        "WriteCapacityUnits": 5,
                    },
                }
            },
        ]
        # for each index
        table.update(AttributeDefinitions=attributes, GlobalSecondaryIndexUpdates=user_id_idx)
        with pytest.raises(Exception) as ctx:
            table.query(
                TableName=TEST_DDB_TABLE_NAME,
                IndexName="id-index",
                KeyConditionExpression=Key(PARTITION_KEY).eq(item_id),
                Select="ALL_ATTRIBUTES",
            )
        assert ctx.match("ValidationException")

    def test_invalid_query_index(self, dynamodb):
        """Raises an exception when a query requests ALL_ATTRIBUTES,
        but the index does not have a ProjectionType of ALL"""
        table_name = f"test-table-{short_uid()}"
        table = dynamodb.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
            AttributeDefinitions=[
                {"AttributeName": "id", "AttributeType": "S"},
                {"AttributeName": "field_a", "AttributeType": "S"},
            ],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
            Tags=TEST_DDB_TAGS,
            GlobalSecondaryIndexes=[
                {
                    "IndexName": "field_a_index",
                    "KeySchema": [{"AttributeName": "field_a", "KeyType": "HASH"}],
                    "Projection": {"ProjectionType": "KEYS_ONLY"},
                    "ProvisionedThroughput": {
                        "ReadCapacityUnits": 1,
                        "WriteCapacityUnits": 1,
                    },
                },
            ],
        )
        with pytest.raises(Exception) as ctx:
            table.query(
                TableName=table_name,
                IndexName="field_a_index",
                KeyConditionExpression=Key("field_a").eq("xyz"),
                Select="ALL_ATTRIBUTES",
            )
        assert ctx.match("ValidationException")
        # clean up
        delete_table(table_name)

    def test_valid_query_index(self, dynamodb):
        """Query requests ALL_ATTRIBUTES and the named index has a ProjectionType of ALL,
        no exception should be raised."""
        table_name = f"test-table-{short_uid()}"
        table = dynamodb.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
            AttributeDefinitions=[
                {"AttributeName": "id", "AttributeType": "S"},
                {"AttributeName": "field_a", "AttributeType": "S"},
                {"AttributeName": "field_b", "AttributeType": "S"},
            ],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
            Tags=TEST_DDB_TAGS,
            GlobalSecondaryIndexes=[
                {
                    "IndexName": "field_a_index",
                    "KeySchema": [{"AttributeName": "field_a", "KeyType": "HASH"}],
                    "Projection": {"ProjectionType": "KEYS_ONLY"},
                    "ProvisionedThroughput": {
                        "ReadCapacityUnits": 1,
                        "WriteCapacityUnits": 1,
                    },
                },
                {
                    "IndexName": "field_b_index",
                    "KeySchema": [{"AttributeName": "field_b", "KeyType": "HASH"}],
                    "Projection": {"ProjectionType": "ALL"},
                    "ProvisionedThroughput": {
                        "ReadCapacityUnits": 1,
                        "WriteCapacityUnits": 1,
                    },
                },
            ],
        )
        table.query(
            TableName=table_name,
            IndexName="field_b_index",
            KeyConditionExpression=Key("field_b").eq("xyz"),
            Select="ALL_ATTRIBUTES",
        )
        # clean up
        delete_table(table_name)

    def test_valid_local_secondary_index(
        self, dynamodb_client, dynamodb_create_table_with_parameters, dynamodb_wait_for_table_active
    ):
        """A query against an LSI with an ALL projection returns the full item."""
        try:
            table_name = f"test-table-{short_uid()}"
            dynamodb_create_table_with_parameters(
                TableName=table_name,
                KeySchema=[
                    {"AttributeName": "PK", "KeyType": "HASH"},
                    {"AttributeName": "SK", "KeyType": "RANGE"},
                ],
                AttributeDefinitions=[
                    {"AttributeName": "PK", "AttributeType": "S"},
                    {"AttributeName": "SK", "AttributeType": "S"},
                    {"AttributeName": "LSI1SK", "AttributeType": "N"},
                ],
                LocalSecondaryIndexes=[
                    {
                        "IndexName": "LSI1",
                        "KeySchema": [
                            {"AttributeName": "PK", "KeyType": "HASH"},
                            {"AttributeName": "LSI1SK", "KeyType": "RANGE"},
                        ],
                        "Projection": {"ProjectionType": "ALL"},
                    }
                ],
                ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
                Tags=TEST_DDB_TAGS,
            )
            dynamodb_wait_for_table_active(table_name)
            item = {"SK": {"S": "hello"}, "LSI1SK": {"N": "123"}, "PK": {"S": "test one"}}
            dynamodb_client.put_item(TableName=table_name, Item=item)
            result = dynamodb_client.query(
                TableName=table_name,
                IndexName="LSI1",
                KeyConditionExpression="PK = :v1",
                ExpressionAttributeValues={":v1": {"S": "test one"}},
                Select="ALL_ATTRIBUTES",
            )
            assert result["Items"] == [item]
        finally:
            dynamodb_client.delete_table(TableName=table_name)

    def test_more_than_20_global_secondary_indexes(self, dynamodb, dynamodb_client):
        """Tables may be created with more than 20 GSIs (no artificial limit)."""
        table_name = f"test-table-{short_uid()}"
        num_gsis = 25
        attrs = [{"AttributeName": f"a{i}", "AttributeType": "S"} for i in range(num_gsis)]
        gsis = [
            {
                "IndexName": f"gsi_{i}",
                "KeySchema": [{"AttributeName": f"a{i}", "KeyType": "HASH"}],
                "Projection": {"ProjectionType": "ALL"},
            }
            for i in range(num_gsis)
        ]
        dynamodb.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}, *attrs],
            GlobalSecondaryIndexes=gsis,
            BillingMode="PAY_PER_REQUEST",
        )
        table = dynamodb_client.describe_table(TableName=table_name)
        assert len(table["Table"]["GlobalSecondaryIndexes"]) == num_gsis
        # clean up
        delete_table(table_name)

    @pytest.mark.aws_validated
    def test_return_values_in_put_item(self, dynamodb, dynamodb_client):
        """PutItem returns Attributes only when ReturnValues=ALL_OLD and an old item exists."""
        aws_stack.create_dynamodb_table(
            TEST_DDB_TABLE_NAME, partition_key=PARTITION_KEY, client=dynamodb_client
        )
        table = dynamodb.Table(TEST_DDB_TABLE_NAME)

        def _validate_response(response, expected: dict = {}):
            """
            Validates the response against the optionally expected one.
            It checks that the response doesn't contain `Attributes`,
            `ConsumedCapacity` and `ItemCollectionMetrics` unless they are expected.
            """
            should_not_contain = {
                "Attributes",
                "ConsumedCapacity",
                "ItemCollectionMetrics",
            } - expected.keys()
            assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
            assert expected.items() <= response.items()
            assert response.keys().isdisjoint(should_not_contain)

        # items which are being used to put in the table
        item1 = {PARTITION_KEY: "id1", "data": "foobar"}
        item1b = {PARTITION_KEY: "id1", "data": "barfoo"}
        item2 = {PARTITION_KEY: "id2", "data": "foobar"}
        response = table.put_item(Item=item1, ReturnValues="ALL_OLD")
        # there is no data present in the table already so even if return values
        # is set to 'ALL_OLD' as there is no data it will not return any data.
        _validate_response(response)
        # now the same data is present so when we pass return values as 'ALL_OLD'
        # it should give us attributes
        response = table.put_item(Item=item1, ReturnValues="ALL_OLD")
        _validate_response(response, expected={"Attributes": item1})
        # now a previous version of data is present, so when we pass return
        # values as 'ALL_OLD' it should give us the old attributes
        response = table.put_item(Item=item1b, ReturnValues="ALL_OLD")
        _validate_response(response, expected={"Attributes": item1})
        response = table.put_item(Item=item2)
        # we do not have any same item as item2 already so when we add this by default
        # return values is set to None so no Attribute values should be returned
        _validate_response(response)
        response = table.put_item(Item=item2)
        # in this case we already have item2 in the table so on this request
        # it should not return any data as return values is set to None so no
        # Attribute values should be returned
        _validate_response(response)
        # cleanup
        table.delete()

    @pytest.mark.aws_validated
    def test_empty_and_binary_values(self, dynamodb, dynamodb_client):
        """Empty strings and raw binary values are accepted by PutItem."""
        aws_stack.create_dynamodb_table(
            TEST_DDB_TABLE_NAME, partition_key=PARTITION_KEY, client=dynamodb_client
        )
        table = dynamodb.Table(TEST_DDB_TABLE_NAME)
        # items which are being used to put in the table
        item1 = {PARTITION_KEY: "id1", "data": ""}
        item2 = {PARTITION_KEY: "id2", "data": b"\x90"}
        response = table.put_item(Item=item1)
        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
        response = table.put_item(Item=item2)
        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
        # clean up
        table.delete()

    def test_batch_write_binary(self, dynamodb_client):
        """BatchWriteItem accepts binary attribute values, including non-UTF8 bytes."""
        table_name = "table_batch_binary_%s" % short_uid()
        dynamodb_client.create_table(
            TableName=table_name,
            AttributeDefinitions=[
                {"AttributeName": "PK", "AttributeType": "S"},
                {"AttributeName": "SK", "AttributeType": "S"},
            ],
            KeySchema=[
                {"AttributeName": "PK", "KeyType": "HASH"},
                {"AttributeName": "SK", "KeyType": "RANGE"},
            ],
            BillingMode="PAY_PER_REQUEST",
        )
        dynamodb_client.put_item(
            TableName=table_name,
            Item={"PK": {"S": "hello"}, "SK": {"S": "user"}, "data": {"B": b"test"}},
        )
        item = {
            "Item": {
                "PK": {"S": "hello-1"},
                "SK": {"S": "user-1"},
                "data": {"B": b"test-1"},
            }
        }
        item_non_decodable = {
            "Item": {
                "PK": {"S": "hello-2"},
                "SK": {"S": "user-2"},
                "data": {"B": b"test \xc0 \xed"},
            }
        }
        response = dynamodb_client.batch_write_item(
            RequestItems={table_name: [{"PutRequest": item}, {"PutRequest": item_non_decodable}]}
        )
        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
        dynamodb_client.delete_table(TableName=table_name)

    def test_binary_data_with_stream(
        self,
        wait_for_stream_ready,
        dynamodb_create_table_with_parameters,
        dynamodb_client,
        kinesis_client,
    ):
        """A binary put on a stream-enabled table yields one Kinesis record."""
        table_name = f"table-{short_uid()}"
        dynamodb_create_table_with_parameters(
            TableName=table_name,
            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
            StreamSpecification={
                "StreamEnabled": True,
                "StreamViewType": "NEW_AND_OLD_IMAGES",
            },
        )
        stream_name = get_kinesis_stream_name(table_name)
        wait_for_stream_ready(stream_name)
        response = dynamodb_client.put_item(
            TableName=table_name, Item={"id": {"S": "id1"}, "data": {"B": b"\x90"}}
        )
        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
        iterator = get_shard_iterator(stream_name, kinesis_client)
        response = kinesis_client.get_records(ShardIterator=iterator)
        json_records = response.get("Records")
        assert 1 == len(json_records)
        assert "Data" in json_records[0]

    def test_dynamodb_stream_shard_iterator(self, wait_for_stream_ready):
        """Shard iterators can be obtained with LATEST and AT_SEQUENCE_NUMBER."""
        dynamodb = aws_stack.create_external_boto_client("dynamodb")
        ddbstreams = aws_stack.create_external_boto_client("dynamodbstreams")
        table_name = "table_with_stream-%s" % short_uid()
        table = dynamodb.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
            StreamSpecification={
                "StreamEnabled": True,
                "StreamViewType": "NEW_IMAGE",
            },
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
        )
        stream_name = get_kinesis_stream_name(table_name)
        wait_for_stream_ready(stream_name)
        stream_arn = table["TableDescription"]["LatestStreamArn"]
        result = ddbstreams.describe_stream(StreamArn=stream_arn)
        response = ddbstreams.get_shard_iterator(
            StreamArn=stream_arn,
            ShardId=result["StreamDescription"]["Shards"][0]["ShardId"],
            ShardIteratorType="LATEST",
        )
        assert "ShardIterator" in response
        response = ddbstreams.get_shard_iterator(
            StreamArn=stream_arn,
            ShardId=result["StreamDescription"]["Shards"][0]["ShardId"],
            ShardIteratorType="AT_SEQUENCE_NUMBER",
            SequenceNumber=result["StreamDescription"]["Shards"][0]
            .get("SequenceNumberRange")
            .get("StartingSequenceNumber"),
        )
        assert "ShardIterator" in response

    def test_dynamodb_create_table_with_class(self, dynamodb_client):
        """TableClass is persisted on create and can be switched via UpdateTable."""
        table_name = "table_with_class_%s" % short_uid()
        # create table
        result = dynamodb_client.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
            TableClass="STANDARD",
        )
        assert result["TableDescription"]["TableClassSummary"]["TableClass"] == "STANDARD"
        result = dynamodb_client.describe_table(TableName=table_name)
        assert result["Table"]["TableClassSummary"]["TableClass"] == "STANDARD"
        result = dynamodb_client.update_table(
            TableName=table_name, TableClass="STANDARD_INFREQUENT_ACCESS"
        )
        assert (
            result["TableDescription"]["TableClassSummary"]["TableClass"]
            == "STANDARD_INFREQUENT_ACCESS"
        )
        result = dynamodb_client.describe_table(TableName=table_name)
        assert result["Table"]["TableClassSummary"]["TableClass"] == "STANDARD_INFREQUENT_ACCESS"
        # clean resources
        dynamodb_client.delete_table(TableName=table_name)

    def test_dynamodb_execute_transaction(self, dynamodb_client):
        """ExecuteTransaction runs multiple PartiQL INSERT statements atomically."""
        table_name = "table_%s" % short_uid()
        dynamodb_client.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
        )
        statements = [
            {"Statement": f"INSERT INTO {table_name} VALUE {{'Username': 'user01'}}"},
            {"Statement": f"INSERT INTO {table_name} VALUE {{'Username': 'user02'}}"},
        ]
        result = dynamodb_client.execute_transaction(TransactStatements=statements)
        assert result["ResponseMetadata"]["HTTPStatusCode"] == 200
        result = dynamodb_client.scan(TableName=table_name)
        assert result["ScannedCount"] == 2
        dynamodb_client.delete_table(TableName=table_name)

    def test_dynamodb_batch_execute_statement(self, dynamodb_client):
        """BatchExecuteStatement applies PartiQL INSERT and UPDATE statements."""
        table_name = "table_%s" % short_uid()
        dynamodb_client.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
        )
        dynamodb_client.put_item(TableName=table_name, Item={"Username": {"S": "user02"}})
        statements = [
            {"Statement": f"INSERT INTO {table_name} VALUE {{'Username': 'user01'}}"},
            {"Statement": f"UPDATE {table_name} SET Age=20 WHERE Username='user02'"},
        ]
        result = dynamodb_client.batch_execute_statement(Statements=statements)
        # actions always succeeds
        assert not any("Error" in r for r in result["Responses"])
        item = dynamodb_client.get_item(TableName=table_name, Key={"Username": {"S": "user02"}})[
            "Item"
        ]
        assert item["Age"]["N"] == "20"
        item = dynamodb_client.get_item(TableName=table_name, Key={"Username": {"S": "user01"}})[
            "Item"
        ]
        assert item
        dynamodb_client.delete_table(TableName=table_name)

    def test_dynamodb_partiql_missing(self, dynamodb_client):
        """PartiQL `IS [NOT] MISSING` filters on attribute presence."""
        table_name = "table_with_stream_%s" % short_uid()
        # create table
        dynamodb_client.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
        )
        # create items with FirstName attribute
        dynamodb_client.execute_statement(
            Statement=f"INSERT INTO {table_name} VALUE {{'Username': 'Alice123', 'FirstName':'Alice'}}"
        )
        items = dynamodb_client.execute_statement(
            Statement=f"SELECT * FROM {table_name} WHERE FirstName IS NOT MISSING"
        )["Items"]
        assert len(items) == 1
        items = dynamodb_client.execute_statement(
            Statement=f"SELECT * FROM {table_name} WHERE FirstName IS MISSING"
        )["Items"]
        assert len(items) == 0
        dynamodb_client.delete_table(TableName=table_name)

    def test_dynamodb_stream_stream_view_type(self):
        """KEYS_ONLY streams carry keys but no images, for both regular and PartiQL ops."""
        dynamodb = aws_stack.create_external_boto_client("dynamodb")
        ddbstreams = aws_stack.create_external_boto_client("dynamodbstreams")
        table_name = "table_with_stream_%s" % short_uid()
        # create table
        table = dynamodb.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],
            StreamSpecification={
                "StreamEnabled": True,
                "StreamViewType": "KEYS_ONLY",
            },
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
        )
        stream_arn = table["TableDescription"]["LatestStreamArn"]
        # wait for stream to be created
        sleep(1)
        # put item in table - INSERT event
        dynamodb.put_item(TableName=table_name, Item={"Username": {"S": "Fred"}})
        # update item in table - MODIFY event
        dynamodb.update_item(
            TableName=table_name,
            Key={"Username": {"S": "Fred"}},
            UpdateExpression="set S=:r",
            ExpressionAttributeValues={":r": {"S": "Fred_Modified"}},
            ReturnValues="UPDATED_NEW",
        )
        # delete item in table - REMOVE event
        dynamodb.delete_item(TableName=table_name, Key={"Username": {"S": "Fred"}})
        result = ddbstreams.describe_stream(StreamArn=stream_arn)
        # assert stream_view_type of the table
        assert result["StreamDescription"]["StreamViewType"] == "KEYS_ONLY"
        # add item via PartiQL query - INSERT event
        dynamodb.execute_statement(
            Statement=f"INSERT INTO {table_name} VALUE {{'Username': 'Alice'}}"
        )
        # run update via PartiQL query - MODIFY event
        dynamodb.execute_statement(
            Statement=f"UPDATE {table_name} SET partiql=1 WHERE Username='Alice'"
        )
        # run update via PartiQL query - REMOVE event
        dynamodb.execute_statement(Statement=f"DELETE FROM {table_name} WHERE Username='Alice'")
        # get shard iterator
        response = ddbstreams.get_shard_iterator(
            StreamArn=stream_arn,
            ShardId=result["StreamDescription"]["Shards"][0]["ShardId"],
            ShardIteratorType="AT_SEQUENCE_NUMBER",
            SequenceNumber=result["StreamDescription"]["Shards"][0]
            .get("SequenceNumberRange")
            .get("StartingSequenceNumber"),
        )
        # get stream records
        records = ddbstreams.get_records(ShardIterator=response["ShardIterator"])["Records"]
        assert len(records) == 6
        events = [rec["eventName"] for rec in records]
        assert events == ["INSERT", "MODIFY", "REMOVE"] * 2
        # assert that all records contain proper event IDs
        event_ids = [rec.get("eventID") for rec in records]
        assert all(event_ids)
        # assert that updates have been received from regular table operations and PartiQL query operations
        for idx, record in enumerate(records):
            assert "SequenceNumber" in record["dynamodb"]
            assert record["dynamodb"]["StreamViewType"] == "KEYS_ONLY"
            assert record["dynamodb"]["Keys"] == {"Username": {"S": "Fred" if idx < 3 else "Alice"}}
            assert "OldImage" not in record["dynamodb"]
            assert "NewImage" not in record["dynamodb"]
        # clean up
        delete_table(table_name)

    def test_dynamodb_with_kinesis_stream(self):
        """Kinesis streaming destinations forward INSERT/MODIFY/REMOVE change records."""
        dynamodb = aws_stack.create_external_boto_client("dynamodb")
        kinesis = aws_stack.create_external_boto_client("kinesis")
        # create kinesis datastream
        stream_name = "kinesis_dest_stream"
        kinesis.create_stream(StreamName=stream_name, ShardCount=1)
        # wait for the stream to be created
        sleep(1)
        # Get stream description
        stream_description = kinesis.describe_stream(StreamName=stream_name)["StreamDescription"]
        table_name = "table_with_kinesis_stream-%s" % short_uid()
        # create table
        dynamodb.create_table(
            TableName=table_name,
            KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
        )
        # Enable kinesis destination for the table
        dynamodb.enable_kinesis_streaming_destination(
            TableName=table_name, StreamArn=stream_description["StreamARN"]
        )
        # put item into table
        dynamodb.put_item(TableName=table_name, Item={"Username": {"S": "Fred"}})
        # update item in table
        dynamodb.update_item(
            TableName=table_name,
            Key={"Username": {"S": "Fred"}},
            UpdateExpression="set S=:r",
            ExpressionAttributeValues={":r": {"S": "Fred_Modified"}},
            ReturnValues="UPDATED_NEW",
        )
        # delete item in table
        dynamodb.delete_item(TableName=table_name, Key={"Username": {"S": "Fred"}})

        def _fetch_records():
            records = aws_stack.kinesis_get_latest_records(
                stream_name, shard_id=stream_description["Shards"][0]["ShardId"]
            )
            assert len(records) == 3
            return records

        # get records from the stream
        records = retry(_fetch_records)
        for record in records:
            record = json.loads(record["Data"])
            assert record["tableName"] == table_name
            # check eventSourceARN not exists in the stream record
            assert "eventSourceARN" not in record
            if record["eventName"] == "INSERT":
                assert "OldImage" not in record["dynamodb"]
                assert "NewImage" in record["dynamodb"]
            elif record["eventName"] == "MODIFY":
                assert "NewImage" in record["dynamodb"]
                assert "OldImage" in record["dynamodb"]
            elif record["eventName"] == "REMOVE":
                assert "NewImage" not in record["dynamodb"]
                assert "OldImage" in record["dynamodb"]
        # describe kinesis streaming destination of the table
        destinations = dynamodb.describe_kinesis_streaming_destination(TableName=table_name)
        destination = destinations["KinesisDataStreamDestinations"][0]
        # assert kinesis streaming destination status
        assert stream_description["StreamARN"] == destination["StreamArn"]
        assert destination["DestinationStatus"] == "ACTIVE"
        # Disable kinesis destination
        dynamodb.disable_kinesis_streaming_destination(
            TableName=table_name, StreamArn=stream_description["StreamARN"]
        )
        # describe kinesis streaming destination of the table
        result = dynamodb.describe_kinesis_streaming_destination(TableName=table_name)
        destination = result["KinesisDataStreamDestinations"][0]
        # assert kinesis streaming destination status
        assert stream_description["StreamARN"] == destination["StreamArn"]
        assert destination["DestinationStatus"] == "DISABLED"
        # clean up
        delete_table(table_name)
        kinesis.delete_stream(StreamName="kinesis_dest_stream")

    def test_global_tables(self):
        """Global tables can be created, described, updated, and reject duplicates."""
        aws_stack.create_dynamodb_table(TEST_DDB_TABLE_NAME, partition_key=PARTITION_KEY)
        dynamodb = aws_stack.create_external_boto_client("dynamodb")
        # create global table
        regions = [
            {"RegionName": "us-east-1"},
            {"RegionName": "us-west-1"},
            {"RegionName": "eu-central-1"},
        ]
        response = dynamodb.create_global_table(
            GlobalTableName=TEST_DDB_TABLE_NAME, ReplicationGroup=regions
        )["GlobalTableDescription"]
        assert "ReplicationGroup" in response
        assert len(response["ReplicationGroup"]) == len(regions)
        # describe global table
        response = dynamodb.describe_global_table(GlobalTableName=TEST_DDB_TABLE_NAME)[
            "GlobalTableDescription"
        ]
        assert "ReplicationGroup" in response
        assert len(regions) == len(response["ReplicationGroup"])
        # update global table
        updates = [
            {"Create": {"RegionName": "us-east-2"}},
            {"Create": {"RegionName": "us-west-2"}},
            {"Delete": {"RegionName": "us-west-1"}},
        ]
        response = dynamodb.update_global_table(
            GlobalTableName=TEST_DDB_TABLE_NAME, ReplicaUpdates=updates
        )["GlobalTableDescription"]
        assert "ReplicationGroup" in response
        assert len(response["ReplicationGroup"]) == len(regions) + 1
        # assert exceptions for invalid requests
        with pytest.raises(Exception) as ctx:
            dynamodb.create_global_table(
                GlobalTableName=TEST_DDB_TABLE_NAME, ReplicationGroup=regions
            )
        assert ctx.match("GlobalTableAlreadyExistsException")
        with pytest.raises(Exception) as ctx:
            dynamodb.describe_global_table(GlobalTableName="invalid-table-name")
        assert ctx.match("GlobalTableNotFoundException")

    def test_create_duplicate_table(self, dynamodb_create_table_with_parameters):
        """Creating a table that already exists raises ResourceInUseException."""
        table_name = "duplicateTable"
        dynamodb_create_table_with_parameters(
            TableName=table_name,
            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
            Tags=TEST_DDB_TAGS,
        )
        with pytest.raises(Exception) as ctx:
            dynamodb_create_table_with_parameters(
                TableName=table_name,
                KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
                AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
                ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
                Tags=TEST_DDB_TAGS,
            )
        ctx.match("ResourceInUseException")

    def test_delete_table(self, dynamodb_client, dynamodb_create_table):
        """DeleteTable removes the table; a second delete raises ResourceNotFoundException."""
        table_name = "test-ddb-table-%s" % short_uid()
        tables_before = len(dynamodb_client.list_tables()["TableNames"])
        dynamodb_create_table(
            table_name=table_name,
            partition_key=PARTITION_KEY,
        )
        table_list = dynamodb_client.list_tables()
        # TODO: fix assertion, to enable parallel test execution!
        assert tables_before + 1 == len(table_list["TableNames"])
        assert table_name in table_list["TableNames"]
        dynamodb_client.delete_table(TableName=table_name)
        table_list = dynamodb_client.list_tables()
        assert tables_before == len(table_list["TableNames"])
        with pytest.raises(Exception) as ctx:
            dynamodb_client.delete_table(TableName=table_name)
        assert ctx.match("ResourceNotFoundException")

    def test_transaction_write_items(self, dynamodb_client, dynamodb_create_table_with_parameters):
        """TransactWriteItems accepts ConditionCheck, Put, Update, and Delete actions."""
        table_name = "test-ddb-table-%s" % short_uid()
        dynamodb_create_table_with_parameters(
            TableName=table_name,
            KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
            Tags=TEST_DDB_TAGS,
        )
        response = dynamodb_client.transact_write_items(
            TransactItems=[
                {
                    "ConditionCheck": {
                        "TableName": table_name,
                        "ConditionExpression": "attribute_not_exists(id)",
                        "Key": {"id": {"S": "test1"}},
                    }
                },
                {"Put": {"TableName": table_name, "Item": {"id": {"S": "test2"}}}},
                {
                    "Update": {
                        "TableName": table_name,
                        "Key": {"id": {"S": "test3"}},
                        "UpdateExpression": "SET attr1 = :v1, attr2 = :v2",
                        "ExpressionAttributeValues": {
                            ":v1": {"S": "value1"},
                            ":v2": {"S": "value2"},
                        },
                    }
                },
                {"Delete": {"TableName": table_name, "Key": {"id": {"S": "test4"}}}},
            ]
        )
        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200

    @pytest.mark.aws_validated
    def test_transaction_write_canceled(
        self, dynamodb_create_table_with_parameters, dynamodb_wait_for_table_active, dynamodb_client
    ):
        """Exercise a canceled transaction (body truncated in the captured listing)."""
        table_name = "table_%s" % short_uid()
        # create table
        dynamodb_create_table_with_parameters(
            TableName=table_name,
            KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],
            AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],
            ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
        )
        dynamodb_wait_for_table_active(table_name)
        # put item in table - INSERT event
        # NOTE(review): the source listing is truncated at this point; the
        # remainder of this test method was not captured and is not guessed at.
dynamodb_client.put_item(TableName=table_name, Item={"Username": {"S": "Fred"}})871 # provoke a TransactionCanceledException by adding a condition which is not met872 with pytest.raises(Exception) as ctx:873 dynamodb_client.transact_write_items(874 TransactItems=[875 {876 "ConditionCheck": {877 "TableName": table_name,878 "ConditionExpression": "attribute_not_exists(Username)",879 "Key": {"Username": {"S": "Fred"}},880 }881 },882 {"Delete": {"TableName": table_name, "Key": {"Username": {"S": "Bert"}}}},883 ]884 )885 # Make sure the exception contains the cancellation reasons886 assert ctx.match("TransactionCanceledException")887 assert (888 str(ctx.value)889 == "An error occurred (TransactionCanceledException) when calling the TransactWriteItems operation: "890 "Transaction cancelled, please refer cancellation reasons for specific reasons "891 "[ConditionalCheckFailed, None]"892 )893 assert hasattr(ctx.value, "response")894 assert "CancellationReasons" in ctx.value.response895 conditional_check_failed = [896 reason897 for reason in ctx.value.response["CancellationReasons"]898 if reason.get("Code") == "ConditionalCheckFailed"899 ]900 assert len(conditional_check_failed) == 1901 assert "Message" in conditional_check_failed[0]902 # dynamodb-local adds a trailing "." 
to the message, AWS does not903 assert re.match(904 r"^The conditional request failed\.?$", conditional_check_failed[0]["Message"]905 )906 def test_transaction_write_binary_data(907 self, dynamodb_client, dynamodb_create_table_with_parameters908 ):909 table_name = "test-ddb-table-%s" % short_uid()910 dynamodb_create_table_with_parameters(911 TableName=table_name,912 KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],913 AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],914 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},915 Tags=TEST_DDB_TAGS,916 )917 binary_item = {"B": b"foobar"}918 response = dynamodb_client.transact_write_items(919 TransactItems=[920 {921 "Put": {922 "TableName": table_name,923 "Item": {924 "id": {"S": "someUser"},925 "binaryData": binary_item,926 },927 }928 }929 ]930 )931 item = dynamodb_client.get_item(TableName=table_name, Key={"id": {"S": "someUser"}})["Item"]932 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200933 assert item["binaryData"]934 assert item["binaryData"] == binary_item935 def test_transact_get_items(self, dynamodb_client, dynamodb_create_table):936 table_name = "test-ddb-table-%s" % short_uid()937 dynamodb_create_table(938 table_name=table_name,939 partition_key=PARTITION_KEY,940 )941 dynamodb_client.put_item(TableName=table_name, Item={"id": {"S": "John"}})942 result = dynamodb_client.transact_get_items(943 TransactItems=[{"Get": {"Key": {"id": {"S": "John"}}, "TableName": table_name}}]944 )945 assert result["ResponseMetadata"]["HTTPStatusCode"] == 200946 def test_batch_write_items(self, dynamodb_client, dynamodb_create_table_with_parameters):947 table_name = "test-ddb-table-%s" % short_uid()948 dynamodb_create_table_with_parameters(949 TableName=table_name,950 KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],951 AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],952 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},953 
Tags=TEST_DDB_TAGS,954 )955 dynamodb_client.put_item(TableName=table_name, Item={"id": {"S": "Fred"}})956 response = dynamodb_client.batch_write_item(957 RequestItems={958 table_name: [959 {"DeleteRequest": {"Key": {"id": {"S": "Fred"}}}},960 {"PutRequest": {"Item": {"id": {"S": "Bob"}}}},961 ]962 }963 )964 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200965 @pytest.mark.xfail(reason="this test flakes regularly in CI")966 def test_dynamodb_stream_records_with_update_item(967 self,968 dynamodb_client,969 dynamodbstreams_client,970 dynamodb_resource,971 dynamodb_create_table,972 wait_for_stream_ready,973 ):974 table_name = f"test-ddb-table-{short_uid()}"975 dynamodb_create_table(976 table_name=table_name,977 partition_key=PARTITION_KEY,978 stream_view_type="NEW_AND_OLD_IMAGES",979 )980 table = dynamodb_resource.Table(table_name)981 stream_name = get_kinesis_stream_name(table_name)982 wait_for_stream_ready(stream_name)983 response = dynamodbstreams_client.describe_stream(StreamArn=table.latest_stream_arn)984 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200985 assert len(response["StreamDescription"]["Shards"]) == 1986 shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]987 starting_sequence_number = int(988 response["StreamDescription"]["Shards"][0]989 .get("SequenceNumberRange")990 .get("StartingSequenceNumber")991 )992 response = dynamodbstreams_client.get_shard_iterator(993 StreamArn=table.latest_stream_arn,994 ShardId=shard_id,995 ShardIteratorType="LATEST",996 )997 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200998 assert "ShardIterator" in response999 iterator_id = response["ShardIterator"]1000 item_id = short_uid()1001 for _ in range(2):1002 dynamodb_client.update_item(1003 TableName=table_name,1004 Key={PARTITION_KEY: {"S": item_id}},1005 UpdateExpression="SET attr1 = :v1, attr2 = :v2",1006 ExpressionAttributeValues={1007 ":v1": {"S": "value1"},1008 ":v2": {"S": "value2"},1009 },1010 ReturnValues="ALL_NEW",1011 
ReturnConsumedCapacity="INDEXES",1012 )1013 def check_expected_records():1014 records = dynamodbstreams_client.get_records(ShardIterator=iterator_id)1015 assert records["ResponseMetadata"]["HTTPStatusCode"] == 2001016 assert len(records["Records"]) == 21017 assert isinstance(1018 records["Records"][0]["dynamodb"]["ApproximateCreationDateTime"],1019 datetime,1020 )1021 assert records["Records"][0]["dynamodb"]["ApproximateCreationDateTime"].microsecond == 01022 assert records["Records"][0]["eventVersion"] == "1.1"1023 assert records["Records"][0]["eventName"] == "INSERT"1024 assert "OldImage" not in records["Records"][0]["dynamodb"]1025 assert (1026 int(records["Records"][0]["dynamodb"]["SequenceNumber"]) > starting_sequence_number1027 )1028 assert isinstance(1029 records["Records"][1]["dynamodb"]["ApproximateCreationDateTime"],1030 datetime,1031 )1032 assert records["Records"][1]["dynamodb"]["ApproximateCreationDateTime"].microsecond == 01033 assert records["Records"][1]["eventVersion"] == "1.1"1034 assert records["Records"][1]["eventName"] == "MODIFY"1035 assert "OldImage" in records["Records"][1]["dynamodb"]1036 assert (1037 int(records["Records"][1]["dynamodb"]["SequenceNumber"]) > starting_sequence_number1038 )1039 retry(check_expected_records, retries=5, sleep=1, sleep_before=2)1040 def test_query_on_deleted_resource(self, dynamodb_client, dynamodb_create_table):1041 table_name = "ddb-table-%s" % short_uid()1042 partition_key = "username"1043 dynamodb_create_table(table_name=table_name, partition_key=partition_key)1044 rs = dynamodb_client.query(1045 TableName=table_name,1046 KeyConditionExpression="{} = :username".format(partition_key),1047 ExpressionAttributeValues={":username": {"S": "test"}},1048 )1049 assert rs["ResponseMetadata"]["HTTPStatusCode"] == 2001050 dynamodb_client.delete_table(TableName=table_name)1051 with pytest.raises(Exception) as ctx:1052 dynamodb_client.query(1053 TableName=table_name,1054 KeyConditionExpression="{} = 
:username".format(partition_key),1055 ExpressionAttributeValues={":username": {"S": "test"}},1056 )1057 assert ctx.match("ResourceNotFoundException")1058 def test_dynamodb_stream_to_lambda(1059 self, lambda_client, dynamodb_resource, dynamodb_create_table, wait_for_stream_ready1060 ):1061 table_name = "ddb-table-%s" % short_uid()1062 function_name = "func-%s" % short_uid()1063 partition_key = "SK"1064 dynamodb_create_table(1065 table_name=table_name,1066 partition_key=partition_key,1067 stream_view_type="NEW_AND_OLD_IMAGES",1068 )1069 table = dynamodb_resource.Table(table_name)1070 latest_stream_arn = table.latest_stream_arn1071 stream_name = get_kinesis_stream_name(table_name)1072 wait_for_stream_ready(stream_name)1073 testutil.create_lambda_function(1074 handler_file=TEST_LAMBDA_PYTHON_ECHO,1075 func_name=function_name,1076 runtime=LAMBDA_RUNTIME_PYTHON36,1077 )1078 mapping_uuid = lambda_client.create_event_source_mapping(1079 EventSourceArn=latest_stream_arn,1080 FunctionName=function_name,1081 StartingPosition="TRIM_HORIZON",1082 )["UUID"]1083 item = {"SK": short_uid(), "Name": "name-{}".format(short_uid())}1084 table.put_item(Item=item)1085 events = retry(1086 check_expected_lambda_log_events_length,1087 retries=10,1088 sleep=1,1089 function_name=function_name,1090 expected_length=1,1091 regex_filter=r"Records",1092 )1093 assert len(events) == 11094 assert len(events[0]["Records"]) == 11095 dynamodb_event = events[0]["Records"][0]["dynamodb"]1096 assert dynamodb_event["StreamViewType"] == "NEW_AND_OLD_IMAGES"1097 assert dynamodb_event["Keys"] == {"SK": {"S": item["SK"]}}1098 assert dynamodb_event["NewImage"]["Name"] == {"S": item["Name"]}1099 assert "SequenceNumber" in dynamodb_event1100 lambda_client.delete_event_source_mapping(UUID=mapping_uuid)1101 def test_dynamodb_batch_write_item(1102 self, dynamodb_client, dynamodb_create_table_with_parameters1103 ):1104 table_name = "ddb-table-%s" % short_uid()1105 dynamodb_create_table_with_parameters(1106 
TableName=table_name,1107 KeySchema=[{"AttributeName": PARTITION_KEY, "KeyType": "HASH"}],1108 AttributeDefinitions=[{"AttributeName": PARTITION_KEY, "AttributeType": "S"}],1109 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},1110 Tags=TEST_DDB_TAGS,1111 )1112 result = dynamodb_client.batch_write_item(1113 RequestItems={1114 table_name: [1115 {"PutRequest": {"Item": {PARTITION_KEY: {"S": "Test1"}}}},1116 {"PutRequest": {"Item": {PARTITION_KEY: {"S": "Test2"}}}},1117 {"PutRequest": {"Item": {PARTITION_KEY: {"S": "Test3"}}}},1118 ]1119 }1120 )1121 assert result.get("UnprocessedItems") == {}1122 def test_dynamodb_pay_per_request(self, dynamodb_create_table_with_parameters):1123 table_name = "ddb-table-%s" % short_uid()1124 with pytest.raises(Exception) as e:1125 dynamodb_create_table_with_parameters(1126 TableName=table_name,1127 KeySchema=[{"AttributeName": PARTITION_KEY, "KeyType": "HASH"}],1128 AttributeDefinitions=[{"AttributeName": PARTITION_KEY, "AttributeType": "S"}],1129 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},1130 BillingMode="PAY_PER_REQUEST",1131 )1132 assert e.match("ValidationException")1133 def test_dynamodb_create_table_with_sse_specification(1134 self, dynamodb_create_table_with_parameters1135 ):1136 table_name = "ddb-table-%s" % short_uid()1137 kms_master_key_id = long_uid()1138 sse_specification = {"Enabled": True, "SSEType": "KMS", "KMSMasterKeyId": kms_master_key_id}1139 kms_master_key_arn = aws_stack.kms_key_arn(kms_master_key_id)1140 result = dynamodb_create_table_with_parameters(1141 TableName=table_name,1142 KeySchema=[{"AttributeName": PARTITION_KEY, "KeyType": "HASH"}],1143 AttributeDefinitions=[{"AttributeName": PARTITION_KEY, "AttributeType": "S"}],1144 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},1145 SSESpecification=sse_specification,1146 Tags=TEST_DDB_TAGS,1147 )1148 assert result["TableDescription"]["SSEDescription"]1149 assert 
result["TableDescription"]["SSEDescription"]["Status"] == "ENABLED"1150 assert result["TableDescription"]["SSEDescription"]["KMSMasterKeyArn"] == kms_master_key_arn1151 def test_dynamodb_create_table_with_partial_sse_specification(1152 self, dynamodb_create_table_with_parameters, kms_client1153 ):1154 table_name = "ddb-table-%s" % short_uid()1155 sse_specification = {"Enabled": True}1156 result = dynamodb_create_table_with_parameters(1157 TableName=table_name,1158 KeySchema=[{"AttributeName": PARTITION_KEY, "KeyType": "HASH"}],1159 AttributeDefinitions=[{"AttributeName": PARTITION_KEY, "AttributeType": "S"}],1160 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},1161 SSESpecification=sse_specification,1162 Tags=TEST_DDB_TAGS,1163 )1164 assert result["TableDescription"]["SSEDescription"]1165 assert result["TableDescription"]["SSEDescription"]["Status"] == "ENABLED"1166 assert result["TableDescription"]["SSEDescription"]["SSEType"] == "KMS"1167 assert "KMSMasterKeyArn" in result["TableDescription"]["SSEDescription"]1168 kms_master_key_arn = result["TableDescription"]["SSEDescription"]["KMSMasterKeyArn"]1169 result = kms_client.describe_key(KeyId=kms_master_key_arn)1170 assert result["KeyMetadata"]["KeyManager"] == "AWS"1171 def test_dynamodb_get_batch_items(self, dynamodb_client, dynamodb_create_table_with_parameters):1172 table_name = "ddb-table-%s" % short_uid()1173 dynamodb_create_table_with_parameters(1174 TableName=table_name,1175 KeySchema=[{"AttributeName": "PK", "KeyType": "HASH"}],1176 AttributeDefinitions=[{"AttributeName": "PK", "AttributeType": "S"}],1177 ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},1178 )1179 result = dynamodb_client.batch_get_item(1180 RequestItems={table_name: {"Keys": [{"PK": {"S": "test-key"}}]}}1181 )1182 assert list(result["Responses"])[0] == table_name1183 def test_dynamodb_streams_describe_with_exclusive_start_shard_id(1184 self, dynamodb_resource, dynamodb_create_table1185 ):1186 
table_name = f"test-ddb-table-{short_uid()}"1187 ddbstreams = aws_stack.create_external_boto_client("dynamodbstreams")1188 dynamodb_create_table(1189 table_name=table_name,1190 partition_key=PARTITION_KEY,1191 stream_view_type="NEW_AND_OLD_IMAGES",1192 )1193 table = dynamodb_resource.Table(table_name)1194 response = ddbstreams.describe_stream(StreamArn=table.latest_stream_arn)1195 assert response["ResponseMetadata"]["HTTPStatusCode"] == 2001196 assert len(response["StreamDescription"]["Shards"]) == 11197 shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]1198 response = ddbstreams.describe_stream(1199 StreamArn=table.latest_stream_arn, ExclusiveStartShardId=shard_id1200 )1201 assert response["ResponseMetadata"]["HTTPStatusCode"] == 2001202 assert len(response["StreamDescription"]["Shards"]) == 01203 @pytest.mark.aws_validated1204 def test_dynamodb_idempotent_writing(1205 self, dynamodb_create_table_with_parameters, dynamodb_client, dynamodb_wait_for_table_active1206 ):1207 table_name = f"ddb-table-{short_uid()}"1208 dynamodb_create_table_with_parameters(1209 TableName=table_name,1210 KeySchema=[1211 {"AttributeName": "id", "KeyType": "HASH"},1212 {"AttributeName": "name", "KeyType": "RANGE"},1213 ],1214 AttributeDefinitions=[1215 {"AttributeName": "id", "AttributeType": "S"},1216 {"AttributeName": "name", "AttributeType": "S"},1217 ],1218 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},1219 )1220 dynamodb_wait_for_table_active(table_name)1221 def _transact_write(_d: Dict):1222 response = dynamodb_client.transact_write_items(1223 ClientRequestToken="dedupe_token",1224 TransactItems=[1225 {1226 "Put": {1227 "TableName": table_name,1228 "Item": _d,1229 }1230 },1231 ],1232 )1233 assert response["ResponseMetadata"]["HTTPStatusCode"] == 2001234 _transact_write({"id": {"S": "id1"}, "name": {"S": "name1"}})1235 _transact_write({"name": {"S": "name1"}, "id": {"S": "id1"}})1236def delete_table(name):1237 dynamodb_client = 
aws_stack.create_external_boto_client("dynamodb")...

Full Screen

Full Screen

test_lambda_integration.py

Source: test_lambda_integration.py (GitHub)

copy

Full Screen

1import base642import json3import os4import time5from unittest.mock import patch6import pytest7from botocore.exceptions import ClientError8from localstack import config9from localstack.services.apigateway.helpers import path_based_url10from localstack.services.awslambda.lambda_api import (11 BATCH_SIZE_RANGES,12 INVALID_PARAMETER_VALUE_EXCEPTION,13)14from localstack.services.awslambda.lambda_utils import LAMBDA_RUNTIME_PYTHON3615from localstack.utils import testutil16from localstack.utils.aws import aws_stack17from localstack.utils.common import retry, safe_requests, short_uid18from localstack.utils.sync import poll_condition19from localstack.utils.testutil import check_expected_lambda_log_events_length, get_lambda_log_events20from .test_lambda import (21 TEST_LAMBDA_FUNCTION_PREFIX,22 TEST_LAMBDA_LIBS,23 TEST_LAMBDA_PYTHON,24 TEST_LAMBDA_PYTHON_ECHO,25 is_old_provider,26)27TEST_STAGE_NAME = "testing"28TEST_SNS_TOPIC_NAME = "sns-topic-1"29THIS_FOLDER = os.path.dirname(os.path.realpath(__file__))30TEST_LAMBDA_PARALLEL_FILE = os.path.join(THIS_FOLDER, "functions", "lambda_parallel.py")31class TestLambdaEventSourceMappings:32 def test_event_source_mapping_default_batch_size(33 self,34 create_lambda_function,35 lambda_client,36 sqs_client,37 sqs_create_queue,38 sqs_queue_arn,39 dynamodb_client,40 dynamodb_create_table,41 lambda_su_role,42 ):43 function_name = f"lambda_func-{short_uid()}"44 queue_name_1 = f"queue-{short_uid()}-1"45 queue_name_2 = f"queue-{short_uid()}-2"46 ddb_table = f"ddb_table-{short_uid()}"47 create_lambda_function(48 func_name=function_name,49 handler_file=TEST_LAMBDA_PYTHON_ECHO,50 runtime=LAMBDA_RUNTIME_PYTHON36,51 role=lambda_su_role,52 )53 queue_url_1 = sqs_create_queue(QueueName=queue_name_1)54 queue_arn_1 = sqs_queue_arn(queue_url_1)55 rs = lambda_client.create_event_source_mapping(56 EventSourceArn=queue_arn_1, FunctionName=function_name57 )58 assert BATCH_SIZE_RANGES["sqs"][0] == rs["BatchSize"]59 uuid = rs["UUID"]60 def 
wait_for_event_source_mapping():61 return lambda_client.get_event_source_mapping(UUID=uuid)["State"] == "Enabled"62 assert poll_condition(wait_for_event_source_mapping, timeout=30)63 with pytest.raises(ClientError) as e:64 # Update batch size with invalid value65 lambda_client.update_event_source_mapping(66 UUID=uuid,67 FunctionName=function_name,68 BatchSize=BATCH_SIZE_RANGES["sqs"][1] + 1,69 )70 e.match(INVALID_PARAMETER_VALUE_EXCEPTION)71 queue_url_2 = sqs_create_queue(QueueName=queue_name_2)72 queue_arn_2 = sqs_queue_arn(queue_url_2)73 with pytest.raises(ClientError) as e:74 # Create event source mapping with invalid batch size value75 lambda_client.create_event_source_mapping(76 EventSourceArn=queue_arn_2,77 FunctionName=function_name,78 BatchSize=BATCH_SIZE_RANGES["sqs"][1] + 1,79 )80 e.match(INVALID_PARAMETER_VALUE_EXCEPTION)81 table_description = dynamodb_create_table(82 table_name=ddb_table,83 partition_key="id",84 stream_view_type="NEW_IMAGE",85 )["TableDescription"]86 # table ARNs are not sufficient as event source, needs to be a dynamodb stream arn87 if not is_old_provider():88 with pytest.raises(ClientError) as e:89 lambda_client.create_event_source_mapping(90 EventSourceArn=table_description["TableArn"],91 FunctionName=function_name,92 StartingPosition="LATEST",93 )94 e.match(INVALID_PARAMETER_VALUE_EXCEPTION)95 # check if event source mapping can be created with latest stream ARN96 rs = lambda_client.create_event_source_mapping(97 EventSourceArn=table_description["LatestStreamArn"],98 FunctionName=function_name,99 StartingPosition="LATEST",100 )101 assert BATCH_SIZE_RANGES["dynamodb"][0] == rs["BatchSize"]102 def test_disabled_event_source_mapping_with_dynamodb(103 self,104 create_lambda_function,105 lambda_client,106 dynamodb_resource,107 dynamodb_client,108 dynamodb_create_table,109 logs_client,110 dynamodbstreams_client,111 lambda_su_role,112 ):113 function_name = f"lambda_func-{short_uid()}"114 ddb_table = f"ddb_table-{short_uid()}"115 
create_lambda_function(116 func_name=function_name,117 handler_file=TEST_LAMBDA_PYTHON_ECHO,118 runtime=LAMBDA_RUNTIME_PYTHON36,119 role=lambda_su_role,120 )121 latest_stream_arn = dynamodb_create_table(122 table_name=ddb_table, partition_key="id", stream_view_type="NEW_IMAGE"123 )["TableDescription"]["LatestStreamArn"]124 rs = lambda_client.create_event_source_mapping(125 FunctionName=function_name,126 EventSourceArn=latest_stream_arn,127 StartingPosition="TRIM_HORIZON",128 MaximumBatchingWindowInSeconds=1,129 )130 uuid = rs["UUID"]131 def wait_for_table_created():132 return (133 dynamodb_client.describe_table(TableName=ddb_table)["Table"]["TableStatus"]134 == "ACTIVE"135 )136 assert poll_condition(wait_for_table_created, timeout=30)137 def wait_for_stream_created():138 return (139 dynamodbstreams_client.describe_stream(StreamArn=latest_stream_arn)[140 "StreamDescription"141 ]["StreamStatus"]142 == "ENABLED"143 )144 assert poll_condition(wait_for_stream_created, timeout=30)145 table = dynamodb_resource.Table(ddb_table)146 items = [147 {"id": short_uid(), "data": "data1"},148 {"id": short_uid(), "data": "data2"},149 ]150 table.put_item(Item=items[0])151 def assert_events():152 events = get_lambda_log_events(function_name, logs_client=logs_client)153 # lambda was invoked 1 time154 assert 1 == len(events[0]["Records"])155 # might take some time against AWS156 retry(assert_events, sleep=3, retries=10)157 # disable event source mapping158 lambda_client.update_event_source_mapping(UUID=uuid, Enabled=False)159 table.put_item(Item=items[1])160 events = get_lambda_log_events(function_name, logs_client=logs_client)161 # lambda no longer invoked, still have 1 event162 assert 1 == len(events[0]["Records"])163 # TODO invalid test against AWS, this behavior just is not correct164 def test_deletion_event_source_mapping_with_dynamodb(165 self, create_lambda_function, lambda_client, dynamodb_client, lambda_su_role166 ):167 function_name = f"lambda_func-{short_uid()}"168 ddb_table 
= f"ddb_table-{short_uid()}"169 create_lambda_function(170 func_name=function_name,171 handler_file=TEST_LAMBDA_PYTHON_ECHO,172 runtime=LAMBDA_RUNTIME_PYTHON36,173 role=lambda_su_role,174 )175 latest_stream_arn = aws_stack.create_dynamodb_table(176 table_name=ddb_table,177 partition_key="id",178 client=dynamodb_client,179 stream_view_type="NEW_IMAGE",180 )["TableDescription"]["LatestStreamArn"]181 lambda_client.create_event_source_mapping(182 FunctionName=function_name,183 EventSourceArn=latest_stream_arn,184 StartingPosition="TRIM_HORIZON",185 )186 def wait_for_table_created():187 return (188 dynamodb_client.describe_table(TableName=ddb_table)["Table"]["TableStatus"]189 == "ACTIVE"190 )191 assert poll_condition(wait_for_table_created, timeout=30)192 dynamodb_client.delete_table(TableName=ddb_table)193 result = lambda_client.list_event_source_mappings(EventSourceArn=latest_stream_arn)194 assert 1 == len(result["EventSourceMappings"])195 def test_event_source_mapping_with_sqs(196 self,197 create_lambda_function,198 lambda_client,199 sqs_client,200 sqs_create_queue,201 sqs_queue_arn,202 logs_client,203 lambda_su_role,204 ):205 function_name = f"lambda_func-{short_uid()}"206 queue_name_1 = f"queue-{short_uid()}-1"207 create_lambda_function(208 func_name=function_name,209 handler_file=TEST_LAMBDA_PYTHON_ECHO,210 runtime=LAMBDA_RUNTIME_PYTHON36,211 role=lambda_su_role,212 )213 queue_url_1 = sqs_create_queue(QueueName=queue_name_1)214 queue_arn_1 = sqs_queue_arn(queue_url_1)215 lambda_client.create_event_source_mapping(216 EventSourceArn=queue_arn_1, FunctionName=function_name, MaximumBatchingWindowInSeconds=1217 )218 sqs_client.send_message(QueueUrl=queue_url_1, MessageBody=json.dumps({"foo": "bar"}))219 def assert_lambda_log_events():220 events = get_lambda_log_events(function_name=function_name, logs_client=logs_client)221 # lambda was invoked 1 time222 assert 1 == len(events[0]["Records"])223 retry(assert_lambda_log_events, sleep_before=3, retries=30)224 rs = 
sqs_client.receive_message(QueueUrl=queue_url_1)225 assert rs.get("Messages") is None226 def test_create_kinesis_event_source_mapping(227 self,228 create_lambda_function,229 lambda_client,230 kinesis_client,231 kinesis_create_stream,232 lambda_su_role,233 wait_for_stream_ready,234 logs_client,235 ):236 function_name = f"lambda_func-{short_uid()}"237 stream_name = f"test-foobar-{short_uid()}"238 create_lambda_function(239 func_name=function_name,240 handler_file=TEST_LAMBDA_PYTHON_ECHO,241 runtime=LAMBDA_RUNTIME_PYTHON36,242 role=lambda_su_role,243 )244 kinesis_create_stream(StreamName=stream_name, ShardCount=1)245 stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][246 "StreamARN"247 ]248 # only valid against AWS / new provider (once implemented)249 if not is_old_provider():250 with pytest.raises(ClientError) as e:251 lambda_client.create_event_source_mapping(252 EventSourceArn=stream_arn, FunctionName=function_name253 )254 e.match(INVALID_PARAMETER_VALUE_EXCEPTION)255 wait_for_stream_ready(stream_name=stream_name)256 lambda_client.create_event_source_mapping(257 EventSourceArn=stream_arn, FunctionName=function_name, StartingPosition="TRIM_HORIZON"258 )259 stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)260 assert 1 == stream_summary["StreamDescriptionSummary"]["OpenShardCount"]261 num_events_kinesis = 10262 kinesis_client.put_records(263 Records=[264 {"Data": "{}", "PartitionKey": f"test_{i}"} for i in range(0, num_events_kinesis)265 ],266 StreamName=stream_name,267 )268 def get_lambda_events():269 events = get_lambda_log_events(function_name, logs_client=logs_client)270 assert events271 return events272 events = retry(get_lambda_events, retries=30)273 assert 10 == len(events[0]["Records"])274 assert "eventID" in events[0]["Records"][0]275 assert "eventSourceARN" in events[0]["Records"][0]276 assert "eventSource" in events[0]["Records"][0]277 assert "eventVersion" in events[0]["Records"][0]278 
assert "eventName" in events[0]["Records"][0]279 assert "invokeIdentityArn" in events[0]["Records"][0]280 assert "awsRegion" in events[0]["Records"][0]281 assert "kinesis" in events[0]["Records"][0]282 def test_python_lambda_subscribe_sns_topic(283 self,284 create_lambda_function,285 sns_client,286 lambda_su_role,287 sns_topic,288 logs_client,289 lambda_client,290 ):291 function_name = f"{TEST_LAMBDA_FUNCTION_PREFIX}-{short_uid()}"292 permission_id = f"test-statement-{short_uid()}"293 lambda_creation_response = create_lambda_function(294 func_name=function_name,295 handler_file=TEST_LAMBDA_PYTHON_ECHO,296 runtime=LAMBDA_RUNTIME_PYTHON36,297 role=lambda_su_role,298 )299 lambda_arn = lambda_creation_response["CreateFunctionResponse"]["FunctionArn"]300 topic_arn = sns_topic["Attributes"]["TopicArn"]301 lambda_client.add_permission(302 FunctionName=function_name,303 StatementId=permission_id,304 Action="lambda:InvokeFunction",305 Principal="sns.amazonaws.com",306 SourceArn=topic_arn,307 )308 sns_client.subscribe(309 TopicArn=topic_arn,310 Protocol="lambda",311 Endpoint=lambda_arn,312 )313 subject = "[Subject] Test subject"314 message = "Hello world."315 sns_client.publish(TopicArn=topic_arn, Subject=subject, Message=message)316 events = retry(317 check_expected_lambda_log_events_length,318 retries=10,319 sleep=1,320 function_name=function_name,321 expected_length=1,322 regex_filter="Records.*Sns",323 logs_client=logs_client,324 )325 notification = events[0]["Records"][0]["Sns"]326 assert "Subject" in notification327 assert subject == notification["Subject"]328class TestLambdaHttpInvocation:329 def test_http_invocation_with_apigw_proxy(self, create_lambda_function):330 lambda_name = f"test_lambda_{short_uid()}"331 lambda_resource = "/api/v1/{proxy+}"332 lambda_path = "/api/v1/hello/world"333 lambda_request_context_path = "/" + TEST_STAGE_NAME + lambda_path334 lambda_request_context_resource_path = lambda_resource335 # create lambda function336 create_lambda_function(337 
func_name=lambda_name,338 handler_file=TEST_LAMBDA_PYTHON,339 libs=TEST_LAMBDA_LIBS,340 )341 # create API Gateway and connect it to the Lambda proxy backend342 lambda_uri = aws_stack.lambda_function_arn(lambda_name)343 target_uri = f"arn:aws:apigateway:{aws_stack.get_region()}:lambda:path/2015-03-31/functions/{lambda_uri}/invocations"344 result = testutil.connect_api_gateway_to_http_with_lambda_proxy(345 "test_gateway2",346 target_uri,347 path=lambda_resource,348 stage_name=TEST_STAGE_NAME,349 )350 api_id = result["id"]351 url = path_based_url(api_id=api_id, stage_name=TEST_STAGE_NAME, path=lambda_path)352 result = safe_requests.post(353 url, data=b"{}", headers={"User-Agent": "python-requests/testing"}354 )355 content = json.loads(result.content)356 assert lambda_path == content["path"]357 assert lambda_resource == content["resource"]358 assert lambda_request_context_path == content["requestContext"]["path"]359 assert lambda_request_context_resource_path == content["requestContext"]["resourcePath"]360class TestKinesisSource:361 @patch.object(config, "SYNCHRONOUS_KINESIS_EVENTS", False)362 def test_kinesis_lambda_parallelism(363 self,364 lambda_client,365 kinesis_client,366 create_lambda_function,367 kinesis_create_stream,368 wait_for_stream_ready,369 logs_client,370 lambda_su_role,371 ):372 function_name = f"lambda_func-{short_uid()}"373 stream_name = f"test-foobar-{short_uid()}"374 create_lambda_function(375 handler_file=TEST_LAMBDA_PARALLEL_FILE,376 func_name=function_name,377 runtime=LAMBDA_RUNTIME_PYTHON36,378 role=lambda_su_role,379 )380 kinesis_create_stream(StreamName=stream_name, ShardCount=1)381 stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][382 "StreamARN"383 ]384 wait_for_stream_ready(stream_name=stream_name)385 lambda_client.create_event_source_mapping(386 EventSourceArn=stream_arn,387 FunctionName=function_name,388 StartingPosition="TRIM_HORIZON",389 BatchSize=10,390 )391 stream_summary = 
kinesis_client.describe_stream_summary(StreamName=stream_name)392 assert 1 == stream_summary["StreamDescriptionSummary"]["OpenShardCount"]393 num_events_kinesis = 10394 # assure async call395 start = time.perf_counter()396 kinesis_client.put_records(397 Records=[398 {"Data": '{"batch": 0}', "PartitionKey": f"test_{i}"}399 for i in range(0, num_events_kinesis)400 ],401 StreamName=stream_name,402 )403 assert (time.perf_counter() - start) < 1 # this should not take more than a second404 kinesis_client.put_records(405 Records=[406 {"Data": '{"batch": 1}', "PartitionKey": f"test_{i}"}407 for i in range(0, num_events_kinesis)408 ],409 StreamName=stream_name,410 )411 def get_events():412 events = get_lambda_log_events(413 function_name, regex_filter=r"event.*Records", logs_client=logs_client414 )415 assert len(events) == 2416 return events417 events = retry(get_events, retries=30)418 def assertEvent(event, batch_no):419 assert 10 == len(event["event"]["Records"])420 assert "eventID" in event["event"]["Records"][0]421 assert "eventSourceARN" in event["event"]["Records"][0]422 assert "eventSource" in event["event"]["Records"][0]423 assert "eventVersion" in event["event"]["Records"][0]424 assert "eventName" in event["event"]["Records"][0]425 assert "invokeIdentityArn" in event["event"]["Records"][0]426 assert "awsRegion" in event["event"]["Records"][0]427 assert "kinesis" in event["event"]["Records"][0]428 assert {"batch": batch_no} == json.loads(429 base64.b64decode(event["event"]["Records"][0]["kinesis"]["data"]).decode(430 config.DEFAULT_ENCODING431 )432 )433 assertEvent(events[0], 0)434 assertEvent(events[1], 1)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test, through following best practices, to diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful