Best Python code snippet using localstack_python
test_dynamodb.py
Source:test_dynamodb.py
...290 self, dynamodb_client, dynamodb_create_table_with_parameters, dynamodb_wait_for_table_active291 ):292 try:293 table_name = f"test-table-{short_uid()}"294 dynamodb_create_table_with_parameters(295 TableName=table_name,296 KeySchema=[297 {"AttributeName": "PK", "KeyType": "HASH"},298 {"AttributeName": "SK", "KeyType": "RANGE"},299 ],300 AttributeDefinitions=[301 {"AttributeName": "PK", "AttributeType": "S"},302 {"AttributeName": "SK", "AttributeType": "S"},303 {"AttributeName": "LSI1SK", "AttributeType": "N"},304 ],305 LocalSecondaryIndexes=[306 {307 "IndexName": "LSI1",308 "KeySchema": [309 {"AttributeName": "PK", "KeyType": "HASH"},310 {"AttributeName": "LSI1SK", "KeyType": "RANGE"},311 ],312 "Projection": {"ProjectionType": "ALL"},313 }314 ],315 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},316 Tags=TEST_DDB_TAGS,317 )318 dynamodb_wait_for_table_active(table_name)319 item = {"SK": {"S": "hello"}, "LSI1SK": {"N": "123"}, "PK": {"S": "test one"}}320 dynamodb_client.put_item(TableName=table_name, Item=item)321 result = dynamodb_client.query(322 TableName=table_name,323 IndexName="LSI1",324 KeyConditionExpression="PK = :v1",325 ExpressionAttributeValues={":v1": {"S": "test one"}},326 Select="ALL_ATTRIBUTES",327 )328 assert result["Items"] == [item]329 finally:330 dynamodb_client.delete_table(TableName=table_name)331 def test_more_than_20_global_secondary_indexes(self, dynamodb, dynamodb_client):332 table_name = f"test-table-{short_uid()}"333 num_gsis = 25334 attrs = [{"AttributeName": f"a{i}", "AttributeType": "S"} for i in range(num_gsis)]335 gsis = [336 {337 "IndexName": f"gsi_{i}",338 "KeySchema": [{"AttributeName": f"a{i}", "KeyType": "HASH"}],339 "Projection": {"ProjectionType": "ALL"},340 }341 for i in range(num_gsis)342 ]343 dynamodb.create_table(344 TableName=table_name,345 KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],346 AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}, *attrs],347 GlobalSecondaryIndexes=gsis,348 BillingMode="PAY_PER_REQUEST",349 )350 table = dynamodb_client.describe_table(TableName=table_name)351 assert len(table["Table"]["GlobalSecondaryIndexes"]) == num_gsis352 # clean up353 delete_table(table_name)354 @pytest.mark.aws_validated355 def test_return_values_in_put_item(self, dynamodb, dynamodb_client):356 aws_stack.create_dynamodb_table(357 TEST_DDB_TABLE_NAME, partition_key=PARTITION_KEY, client=dynamodb_client358 )359 table = dynamodb.Table(TEST_DDB_TABLE_NAME)360 def _validate_response(response, expected: dict = {}):361 """362 Validates the response against the optionally expected one.363 It checks that the response doesn't contain `Attributes`,364 `ConsumedCapacity` and `ItemCollectionMetrics` unless they are expected.365 """366 should_not_contain = {367 "Attributes",368 "ConsumedCapacity",369 "ItemCollectionMetrics",370 } - expected.keys()371 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200372 assert expected.items() <= response.items()373 assert response.keys().isdisjoint(should_not_contain)374 # items which are being used to put in the table375 item1 = {PARTITION_KEY: "id1", "data": "foobar"}376 item1b = {PARTITION_KEY: "id1", "data": "barfoo"}377 item2 = {PARTITION_KEY: "id2", "data": "foobar"}378 response = table.put_item(Item=item1, ReturnValues="ALL_OLD")379 # there is no data present in the table already so even if return values380 # is set to 'ALL_OLD' as there is no data it will not return any data.381 _validate_response(response)382 # now the same data is present so when we pass return values as 'ALL_OLD'383 # it should give us attributes384 response = table.put_item(Item=item1, ReturnValues="ALL_OLD")385 _validate_response(response, expected={"Attributes": item1})386 # now a previous version of data is present, so when we pass return387 # values as 'ALL_OLD' it should give us the old attributes388 response = table.put_item(Item=item1b, ReturnValues="ALL_OLD")389 _validate_response(response, expected={"Attributes": item1})390 response = table.put_item(Item=item2)391 # we do not have any same item as item2 already so when we add this by default392 # return values is set to None so no Attribute values should be returned393 _validate_response(response)394 response = table.put_item(Item=item2)395 # in this case we already have item2 in the table so on this request396 # it should not return any data as return values is set to None so no397 # Attribute values should be returned398 _validate_response(response)399 # cleanup400 table.delete()401 @pytest.mark.aws_validated402 def test_empty_and_binary_values(self, dynamodb, dynamodb_client):403 aws_stack.create_dynamodb_table(404 TEST_DDB_TABLE_NAME, partition_key=PARTITION_KEY, client=dynamodb_client405 )406 table = dynamodb.Table(TEST_DDB_TABLE_NAME)407 # items which are being used to put in the table408 item1 = {PARTITION_KEY: "id1", "data": ""}409 item2 = {PARTITION_KEY: "id2", "data": b"\x90"}410 response = table.put_item(Item=item1)411 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200412 response = table.put_item(Item=item2)413 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200414 # clean up415 table.delete()416 def test_batch_write_binary(self, dynamodb_client):417 table_name = "table_batch_binary_%s" % short_uid()418 dynamodb_client.create_table(419 TableName=table_name,420 AttributeDefinitions=[421 {"AttributeName": "PK", "AttributeType": "S"},422 {"AttributeName": "SK", "AttributeType": "S"},423 ],424 KeySchema=[425 {"AttributeName": "PK", "KeyType": "HASH"},426 {"AttributeName": "SK", "KeyType": "RANGE"},427 ],428 BillingMode="PAY_PER_REQUEST",429 )430 dynamodb_client.put_item(431 TableName=table_name,432 Item={"PK": {"S": "hello"}, "SK": {"S": "user"}, "data": {"B": b"test"}},433 )434 item = {435 "Item": {436 "PK": {"S": "hello-1"},437 "SK": {"S": "user-1"},438 "data": {"B": b"test-1"},439 }440 }441 item_non_decodable = {442 "Item": {443 "PK": {"S": "hello-2"},444 "SK": {"S": "user-2"},445 "data": {"B": b"test \xc0 \xed"},446 }447 }448 response = dynamodb_client.batch_write_item(449 RequestItems={table_name: [{"PutRequest": item}, {"PutRequest": item_non_decodable}]}450 )451 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200452 dynamodb_client.delete_table(TableName=table_name)453 def test_binary_data_with_stream(454 self,455 wait_for_stream_ready,456 dynamodb_create_table_with_parameters,457 dynamodb_client,458 kinesis_client,459 ):460 table_name = f"table-{short_uid()}"461 dynamodb_create_table_with_parameters(462 TableName=table_name,463 KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],464 AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],465 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},466 StreamSpecification={467 "StreamEnabled": True,468 "StreamViewType": "NEW_AND_OLD_IMAGES",469 },470 )471 stream_name = get_kinesis_stream_name(table_name)472 wait_for_stream_ready(stream_name)473 response = dynamodb_client.put_item(474 TableName=table_name, Item={"id": {"S": "id1"}, "data": {"B": b"\x90"}}475 )476 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200477 iterator = get_shard_iterator(stream_name, kinesis_client)478 response = kinesis_client.get_records(ShardIterator=iterator)479 json_records = response.get("Records")480 assert 1 == len(json_records)481 assert "Data" in json_records[0]482 def test_dynamodb_stream_shard_iterator(self, wait_for_stream_ready):483 dynamodb = aws_stack.create_external_boto_client("dynamodb")484 ddbstreams = aws_stack.create_external_boto_client("dynamodbstreams")485 table_name = "table_with_stream-%s" % short_uid()486 table = dynamodb.create_table(487 TableName=table_name,488 KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],489 AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],490 StreamSpecification={491 "StreamEnabled": True,492 "StreamViewType": "NEW_IMAGE",493 },494 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},495 )496 stream_name = get_kinesis_stream_name(table_name)497 wait_for_stream_ready(stream_name)498 stream_arn = table["TableDescription"]["LatestStreamArn"]499 result = ddbstreams.describe_stream(StreamArn=stream_arn)500 response = ddbstreams.get_shard_iterator(501 StreamArn=stream_arn,502 ShardId=result["StreamDescription"]["Shards"][0]["ShardId"],503 ShardIteratorType="LATEST",504 )505 assert "ShardIterator" in response506 response = ddbstreams.get_shard_iterator(507 StreamArn=stream_arn,508 ShardId=result["StreamDescription"]["Shards"][0]["ShardId"],509 ShardIteratorType="AT_SEQUENCE_NUMBER",510 SequenceNumber=result["StreamDescription"]["Shards"][0]511 .get("SequenceNumberRange")512 .get("StartingSequenceNumber"),513 )514 assert "ShardIterator" in response515 def test_dynamodb_create_table_with_class(self, dynamodb_client):516 table_name = "table_with_class_%s" % short_uid()517 # create table518 result = dynamodb_client.create_table(519 TableName=table_name,520 KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],521 AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],522 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},523 TableClass="STANDARD",524 )525 assert result["TableDescription"]["TableClassSummary"]["TableClass"] == "STANDARD"526 result = dynamodb_client.describe_table(TableName=table_name)527 assert result["Table"]["TableClassSummary"]["TableClass"] == "STANDARD"528 result = dynamodb_client.update_table(529 TableName=table_name, TableClass="STANDARD_INFREQUENT_ACCESS"530 )531 assert (532 result["TableDescription"]["TableClassSummary"]["TableClass"]533 == "STANDARD_INFREQUENT_ACCESS"534 )535 result = dynamodb_client.describe_table(TableName=table_name)536 assert result["Table"]["TableClassSummary"]["TableClass"] == "STANDARD_INFREQUENT_ACCESS"537 # clean resources538 dynamodb_client.delete_table(TableName=table_name)539 def test_dynamodb_execute_transaction(self, dynamodb_client):540 table_name = "table_%s" % short_uid()541 dynamodb_client.create_table(542 TableName=table_name,543 KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],544 AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],545 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},546 )547 statements = [548 {"Statement": f"INSERT INTO {table_name} VALUE {{'Username': 'user01'}}"},549 {"Statement": f"INSERT INTO {table_name} VALUE {{'Username': 'user02'}}"},550 ]551 result = dynamodb_client.execute_transaction(TransactStatements=statements)552 assert result["ResponseMetadata"]["HTTPStatusCode"] == 200553 result = dynamodb_client.scan(TableName=table_name)554 assert result["ScannedCount"] == 2555 dynamodb_client.delete_table(TableName=table_name)556 def test_dynamodb_batch_execute_statement(self, dynamodb_client):557 table_name = "table_%s" % short_uid()558 dynamodb_client.create_table(559 TableName=table_name,560 KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],561 AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],562 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},563 )564 dynamodb_client.put_item(TableName=table_name, Item={"Username": {"S": "user02"}})565 statements = [566 {"Statement": f"INSERT INTO {table_name} VALUE {{'Username': 'user01'}}"},567 {"Statement": f"UPDATE {table_name} SET Age=20 WHERE Username='user02'"},568 ]569 result = dynamodb_client.batch_execute_statement(Statements=statements)570 # actions always succeeds571 assert not any("Error" in r for r in result["Responses"])572 item = dynamodb_client.get_item(TableName=table_name, Key={"Username": {"S": "user02"}})[573 "Item"574 ]575 assert item["Age"]["N"] == "20"576 item = dynamodb_client.get_item(TableName=table_name, Key={"Username": {"S": "user01"}})[577 "Item"578 ]579 assert item580 dynamodb_client.delete_table(TableName=table_name)581 def test_dynamodb_partiql_missing(self, dynamodb_client):582 table_name = "table_with_stream_%s" % short_uid()583 # create table584 dynamodb_client.create_table(585 TableName=table_name,586 KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],587 AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],588 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},589 )590 # create items with FirstName attribute591 dynamodb_client.execute_statement(592 Statement=f"INSERT INTO {table_name} VALUE {{'Username': 'Alice123', 'FirstName':'Alice'}}"593 )594 items = dynamodb_client.execute_statement(595 Statement=f"SELECT * FROM {table_name} WHERE FirstName IS NOT MISSING"596 )["Items"]597 assert len(items) == 1598 items = dynamodb_client.execute_statement(599 Statement=f"SELECT * FROM {table_name} WHERE FirstName IS MISSING"600 )["Items"]601 assert len(items) == 0602 dynamodb_client.delete_table(TableName=table_name)603 def test_dynamodb_stream_stream_view_type(self):604 dynamodb = aws_stack.create_external_boto_client("dynamodb")605 ddbstreams = aws_stack.create_external_boto_client("dynamodbstreams")606 table_name = "table_with_stream_%s" % short_uid()607 # create table608 table = dynamodb.create_table(609 TableName=table_name,610 KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],611 AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],612 StreamSpecification={613 "StreamEnabled": True,614 "StreamViewType": "KEYS_ONLY",615 },616 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},617 )618 stream_arn = table["TableDescription"]["LatestStreamArn"]619 # wait for stream to be created620 sleep(1)621 # put item in table - INSERT event622 dynamodb.put_item(TableName=table_name, Item={"Username": {"S": "Fred"}})623 # update item in table - MODIFY event624 dynamodb.update_item(625 TableName=table_name,626 Key={"Username": {"S": "Fred"}},627 UpdateExpression="set S=:r",628 ExpressionAttributeValues={":r": {"S": "Fred_Modified"}},629 ReturnValues="UPDATED_NEW",630 )631 # delete item in table - REMOVE event632 dynamodb.delete_item(TableName=table_name, Key={"Username": {"S": "Fred"}})633 result = ddbstreams.describe_stream(StreamArn=stream_arn)634 # assert stream_view_type of the table635 assert result["StreamDescription"]["StreamViewType"] == "KEYS_ONLY"636 # add item via PartiQL query - INSERT event637 dynamodb.execute_statement(638 Statement=f"INSERT INTO {table_name} VALUE {{'Username': 'Alice'}}"639 )640 # run update via PartiQL query - MODIFY event641 dynamodb.execute_statement(642 Statement=f"UPDATE {table_name} SET partiql=1 WHERE Username='Alice'"643 )644 # run update via PartiQL query - REMOVE event645 dynamodb.execute_statement(Statement=f"DELETE FROM {table_name} WHERE Username='Alice'")646 # get shard iterator647 response = ddbstreams.get_shard_iterator(648 StreamArn=stream_arn,649 ShardId=result["StreamDescription"]["Shards"][0]["ShardId"],650 ShardIteratorType="AT_SEQUENCE_NUMBER",651 SequenceNumber=result["StreamDescription"]["Shards"][0]652 .get("SequenceNumberRange")653 .get("StartingSequenceNumber"),654 )655 # get stream records656 records = ddbstreams.get_records(ShardIterator=response["ShardIterator"])["Records"]657 assert len(records) == 6658 events = [rec["eventName"] for rec in records]659 assert events == ["INSERT", "MODIFY", "REMOVE"] * 2660 # assert that all records contain proper event IDs661 event_ids = [rec.get("eventID") for rec in records]662 assert all(event_ids)663 # assert that updates have been received from regular table operations and PartiQL query operations664 for idx, record in enumerate(records):665 assert "SequenceNumber" in record["dynamodb"]666 assert record["dynamodb"]["StreamViewType"] == "KEYS_ONLY"667 assert record["dynamodb"]["Keys"] == {"Username": {"S": "Fred" if idx < 3 else "Alice"}}668 assert "OldImage" not in record["dynamodb"]669 assert "NewImage" not in record["dynamodb"]670 # clean up671 delete_table(table_name)672 def test_dynamodb_with_kinesis_stream(self):673 dynamodb = aws_stack.create_external_boto_client("dynamodb")674 kinesis = aws_stack.create_external_boto_client("kinesis")675 # create kinesis datastream676 stream_name = "kinesis_dest_stream"677 kinesis.create_stream(StreamName=stream_name, ShardCount=1)678 # wait for the stream to be created679 sleep(1)680 # Get stream description681 stream_description = kinesis.describe_stream(StreamName=stream_name)["StreamDescription"]682 table_name = "table_with_kinesis_stream-%s" % short_uid()683 # create table684 dynamodb.create_table(685 TableName=table_name,686 KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],687 AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],688 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},689 )690 # Enable kinesis destination for the table691 dynamodb.enable_kinesis_streaming_destination(692 TableName=table_name, StreamArn=stream_description["StreamARN"]693 )694 # put item into table695 dynamodb.put_item(TableName=table_name, Item={"Username": {"S": "Fred"}})696 # update item in table697 dynamodb.update_item(698 TableName=table_name,699 Key={"Username": {"S": "Fred"}},700 UpdateExpression="set S=:r",701 ExpressionAttributeValues={":r": {"S": "Fred_Modified"}},702 ReturnValues="UPDATED_NEW",703 )704 # delete item in table705 dynamodb.delete_item(TableName=table_name, Key={"Username": {"S": "Fred"}})706 def _fetch_records():707 records = aws_stack.kinesis_get_latest_records(708 stream_name, shard_id=stream_description["Shards"][0]["ShardId"]709 )710 assert len(records) == 3711 return records712 # get records from the stream713 records = retry(_fetch_records)714 for record in records:715 record = json.loads(record["Data"])716 assert record["tableName"] == table_name717 # check eventSourceARN not exists in the stream record718 assert "eventSourceARN" not in record719 if record["eventName"] == "INSERT":720 assert "OldImage" not in record["dynamodb"]721 assert "NewImage" in record["dynamodb"]722 elif record["eventName"] == "MODIFY":723 assert "NewImage" in record["dynamodb"]724 assert "OldImage" in record["dynamodb"]725 elif record["eventName"] == "REMOVE":726 assert "NewImage" not in record["dynamodb"]727 assert "OldImage" in record["dynamodb"]728 # describe kinesis streaming destination of the table729 destinations = dynamodb.describe_kinesis_streaming_destination(TableName=table_name)730 destination = destinations["KinesisDataStreamDestinations"][0]731 # assert kinesis streaming destination status732 assert stream_description["StreamARN"] == destination["StreamArn"]733 assert destination["DestinationStatus"] == "ACTIVE"734 # Disable kinesis destination735 dynamodb.disable_kinesis_streaming_destination(736 TableName=table_name, StreamArn=stream_description["StreamARN"]737 )738 # describe kinesis streaming destination of the table739 result = dynamodb.describe_kinesis_streaming_destination(TableName=table_name)740 destination = result["KinesisDataStreamDestinations"][0]741 # assert kinesis streaming destination status742 assert stream_description["StreamARN"] == destination["StreamArn"]743 assert destination["DestinationStatus"] == "DISABLED"744 # clean up745 delete_table(table_name)746 kinesis.delete_stream(StreamName="kinesis_dest_stream")747 def test_global_tables(self):748 aws_stack.create_dynamodb_table(TEST_DDB_TABLE_NAME, partition_key=PARTITION_KEY)749 dynamodb = aws_stack.create_external_boto_client("dynamodb")750 # create global table751 regions = [752 {"RegionName": "us-east-1"},753 {"RegionName": "us-west-1"},754 {"RegionName": "eu-central-1"},755 ]756 response = dynamodb.create_global_table(757 GlobalTableName=TEST_DDB_TABLE_NAME, ReplicationGroup=regions758 )["GlobalTableDescription"]759 assert "ReplicationGroup" in response760 assert len(response["ReplicationGroup"]) == len(regions)761 # describe global table762 response = dynamodb.describe_global_table(GlobalTableName=TEST_DDB_TABLE_NAME)[763 "GlobalTableDescription"764 ]765 assert "ReplicationGroup" in response766 assert len(regions) == len(response["ReplicationGroup"])767 # update global table768 updates = [769 {"Create": {"RegionName": "us-east-2"}},770 {"Create": {"RegionName": "us-west-2"}},771 {"Delete": {"RegionName": "us-west-1"}},772 ]773 response = dynamodb.update_global_table(774 GlobalTableName=TEST_DDB_TABLE_NAME, ReplicaUpdates=updates775 )["GlobalTableDescription"]776 assert "ReplicationGroup" in response777 assert len(response["ReplicationGroup"]) == len(regions) + 1778 # assert exceptions for invalid requests779 with pytest.raises(Exception) as ctx:780 dynamodb.create_global_table(781 GlobalTableName=TEST_DDB_TABLE_NAME, ReplicationGroup=regions782 )783 assert ctx.match("GlobalTableAlreadyExistsException")784 with pytest.raises(Exception) as ctx:785 dynamodb.describe_global_table(GlobalTableName="invalid-table-name")786 assert ctx.match("GlobalTableNotFoundException")787 def test_create_duplicate_table(self, dynamodb_create_table_with_parameters):788 table_name = "duplicateTable"789 dynamodb_create_table_with_parameters(790 TableName=table_name,791 KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],792 AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],793 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},794 Tags=TEST_DDB_TAGS,795 )796 with pytest.raises(Exception) as ctx:797 dynamodb_create_table_with_parameters(798 TableName=table_name,799 KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],800 AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],801 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},802 Tags=TEST_DDB_TAGS,803 )804 ctx.match("ResourceInUseException")805 def test_delete_table(self, dynamodb_client, dynamodb_create_table):806 table_name = "test-ddb-table-%s" % short_uid()807 tables_before = len(dynamodb_client.list_tables()["TableNames"])808 dynamodb_create_table(809 table_name=table_name,810 partition_key=PARTITION_KEY,811 )812 table_list = dynamodb_client.list_tables()813 # TODO: fix assertion, to enable parallel test execution!814 assert tables_before + 1 == len(table_list["TableNames"])815 assert table_name in table_list["TableNames"]816 dynamodb_client.delete_table(TableName=table_name)817 table_list = dynamodb_client.list_tables()818 assert tables_before == len(table_list["TableNames"])819 with pytest.raises(Exception) as ctx:820 dynamodb_client.delete_table(TableName=table_name)821 assert ctx.match("ResourceNotFoundException")822 def test_transaction_write_items(self, dynamodb_client, dynamodb_create_table_with_parameters):823 table_name = "test-ddb-table-%s" % short_uid()824 dynamodb_create_table_with_parameters(825 TableName=table_name,826 KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],827 AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],828 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},829 Tags=TEST_DDB_TAGS,830 )831 response = dynamodb_client.transact_write_items(832 TransactItems=[833 {834 "ConditionCheck": {835 "TableName": table_name,836 "ConditionExpression": "attribute_not_exists(id)",837 "Key": {"id": {"S": "test1"}},838 }839 },840 {"Put": {"TableName": table_name, "Item": {"id": {"S": "test2"}}}},841 {842 "Update": {843 "TableName": table_name,844 "Key": {"id": {"S": "test3"}},845 "UpdateExpression": "SET attr1 = :v1, attr2 = :v2",846 "ExpressionAttributeValues": {847 ":v1": {"S": "value1"},848 ":v2": {"S": "value2"},849 },850 }851 },852 {"Delete": {"TableName": table_name, "Key": {"id": {"S": "test4"}}}},853 ]854 )855 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200856 @pytest.mark.aws_validated857 def test_transaction_write_canceled(858 self, dynamodb_create_table_with_parameters, dynamodb_wait_for_table_active, dynamodb_client859 ):860 table_name = "table_%s" % short_uid()861 # create table862 dynamodb_create_table_with_parameters(863 TableName=table_name,864 KeySchema=[{"AttributeName": "Username", "KeyType": "HASH"}],865 AttributeDefinitions=[{"AttributeName": "Username", "AttributeType": "S"}],866 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},867 )868 dynamodb_wait_for_table_active(table_name)869 # put item in table - INSERT event870 dynamodb_client.put_item(TableName=table_name, Item={"Username": {"S": "Fred"}})871 # provoke a TransactionCanceledException by adding a condition which is not met872 with pytest.raises(Exception) as ctx:873 dynamodb_client.transact_write_items(874 TransactItems=[875 {876 "ConditionCheck": {877 "TableName": table_name,878 "ConditionExpression": "attribute_not_exists(Username)",879 "Key": {"Username": {"S": "Fred"}},880 }881 },882 {"Delete": {"TableName": table_name, "Key": {"Username": {"S": "Bert"}}}},883 ]884 )885 # Make sure the exception contains the cancellation reasons886 assert ctx.match("TransactionCanceledException")887 assert (888 str(ctx.value)889 == "An error occurred (TransactionCanceledException) when calling the TransactWriteItems operation: "890 "Transaction cancelled, please refer cancellation reasons for specific reasons "891 "[ConditionalCheckFailed, None]"892 )893 assert hasattr(ctx.value, "response")894 assert "CancellationReasons" in ctx.value.response895 conditional_check_failed = [896 reason897 for reason in ctx.value.response["CancellationReasons"]898 if reason.get("Code") == "ConditionalCheckFailed"899 ]900 assert len(conditional_check_failed) == 1901 assert "Message" in conditional_check_failed[0]902 # dynamodb-local adds a trailing "." to the message, AWS does not903 assert re.match(904 r"^The conditional request failed\.?$", conditional_check_failed[0]["Message"]905 )906 def test_transaction_write_binary_data(907 self, dynamodb_client, dynamodb_create_table_with_parameters908 ):909 table_name = "test-ddb-table-%s" % short_uid()910 dynamodb_create_table_with_parameters(911 TableName=table_name,912 KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],913 AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],914 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},915 Tags=TEST_DDB_TAGS,916 )917 binary_item = {"B": b"foobar"}918 response = dynamodb_client.transact_write_items(919 TransactItems=[920 {921 "Put": {922 "TableName": table_name,923 "Item": {924 "id": {"S": "someUser"},925 "binaryData": binary_item,926 },927 }928 }929 ]930 )931 item = dynamodb_client.get_item(TableName=table_name, Key={"id": {"S": "someUser"}})["Item"]932 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200933 assert item["binaryData"]934 assert item["binaryData"] == binary_item935 def test_transact_get_items(self, dynamodb_client, dynamodb_create_table):936 table_name = "test-ddb-table-%s" % short_uid()937 dynamodb_create_table(938 table_name=table_name,939 partition_key=PARTITION_KEY,940 )941 dynamodb_client.put_item(TableName=table_name, Item={"id": {"S": "John"}})942 result = dynamodb_client.transact_get_items(943 TransactItems=[{"Get": {"Key": {"id": {"S": "John"}}, "TableName": table_name}}]944 )945 assert result["ResponseMetadata"]["HTTPStatusCode"] == 200946 def test_batch_write_items(self, dynamodb_client, dynamodb_create_table_with_parameters):947 table_name = "test-ddb-table-%s" % short_uid()948 dynamodb_create_table_with_parameters(949 TableName=table_name,950 KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],951 AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],952 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},953 Tags=TEST_DDB_TAGS,954 )955 dynamodb_client.put_item(TableName=table_name, Item={"id": {"S": "Fred"}})956 response = dynamodb_client.batch_write_item(957 RequestItems={958 table_name: [959 {"DeleteRequest": {"Key": {"id": {"S": "Fred"}}}},960 {"PutRequest": {"Item": {"id": {"S": "Bob"}}}},961 ]962 }963 )964 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200965 @pytest.mark.xfail(reason="this test flakes regularly in CI")966 def test_dynamodb_stream_records_with_update_item(967 self,968 dynamodb_client,969 dynamodbstreams_client,970 dynamodb_resource,971 dynamodb_create_table,972 wait_for_stream_ready,973 ):974 table_name = f"test-ddb-table-{short_uid()}"975 dynamodb_create_table(976 table_name=table_name,977 partition_key=PARTITION_KEY,978 stream_view_type="NEW_AND_OLD_IMAGES",979 )980 table = dynamodb_resource.Table(table_name)981 stream_name = get_kinesis_stream_name(table_name)982 wait_for_stream_ready(stream_name)983 response = dynamodbstreams_client.describe_stream(StreamArn=table.latest_stream_arn)984 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200985 assert len(response["StreamDescription"]["Shards"]) == 1986 shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]987 starting_sequence_number = int(988 response["StreamDescription"]["Shards"][0]989 .get("SequenceNumberRange")990 .get("StartingSequenceNumber")991 )992 response = dynamodbstreams_client.get_shard_iterator(993 StreamArn=table.latest_stream_arn,994 ShardId=shard_id,995 ShardIteratorType="LATEST",996 )997 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200998 assert "ShardIterator" in response999 iterator_id = response["ShardIterator"]1000 item_id = short_uid()1001 for _ in range(2):1002 dynamodb_client.update_item(1003 TableName=table_name,1004 Key={PARTITION_KEY: {"S": item_id}},1005 UpdateExpression="SET attr1 = :v1, attr2 = :v2",1006 ExpressionAttributeValues={1007 ":v1": {"S": "value1"},1008 ":v2": {"S": "value2"},1009 },1010 ReturnValues="ALL_NEW",1011 ReturnConsumedCapacity="INDEXES",1012 )1013 def check_expected_records():1014 records = dynamodbstreams_client.get_records(ShardIterator=iterator_id)1015 assert records["ResponseMetadata"]["HTTPStatusCode"] == 2001016 assert len(records["Records"]) == 21017 assert isinstance(1018 records["Records"][0]["dynamodb"]["ApproximateCreationDateTime"],1019 datetime,1020 )1021 assert records["Records"][0]["dynamodb"]["ApproximateCreationDateTime"].microsecond == 01022 assert records["Records"][0]["eventVersion"] == "1.1"1023 assert records["Records"][0]["eventName"] == "INSERT"1024 assert "OldImage" not in records["Records"][0]["dynamodb"]1025 assert (1026 int(records["Records"][0]["dynamodb"]["SequenceNumber"]) > starting_sequence_number1027 )1028 assert isinstance(1029 records["Records"][1]["dynamodb"]["ApproximateCreationDateTime"],1030 datetime,1031 )1032 assert records["Records"][1]["dynamodb"]["ApproximateCreationDateTime"].microsecond == 01033 assert records["Records"][1]["eventVersion"] == "1.1"1034 assert records["Records"][1]["eventName"] == "MODIFY"1035 assert "OldImage" in records["Records"][1]["dynamodb"]1036 assert (1037 int(records["Records"][1]["dynamodb"]["SequenceNumber"]) > starting_sequence_number1038 )1039 retry(check_expected_records, retries=5, sleep=1, sleep_before=2)1040 def test_query_on_deleted_resource(self, dynamodb_client, dynamodb_create_table):1041 table_name = "ddb-table-%s" % short_uid()1042 partition_key = "username"1043 dynamodb_create_table(table_name=table_name, partition_key=partition_key)1044 rs = dynamodb_client.query(1045 TableName=table_name,1046 KeyConditionExpression="{} = :username".format(partition_key),1047 ExpressionAttributeValues={":username": {"S": "test"}},1048 )1049 assert rs["ResponseMetadata"]["HTTPStatusCode"] == 2001050 dynamodb_client.delete_table(TableName=table_name)1051 with pytest.raises(Exception) as ctx:1052 dynamodb_client.query(1053 TableName=table_name,1054 KeyConditionExpression="{} = :username".format(partition_key),1055 ExpressionAttributeValues={":username": {"S": "test"}},1056 )1057 assert ctx.match("ResourceNotFoundException")1058 def test_dynamodb_stream_to_lambda(1059 self, lambda_client, dynamodb_resource, dynamodb_create_table, wait_for_stream_ready1060 ):1061 table_name = "ddb-table-%s" % short_uid()1062 function_name = "func-%s" % short_uid()1063 partition_key = "SK"1064 dynamodb_create_table(1065 table_name=table_name,1066 partition_key=partition_key,1067 stream_view_type="NEW_AND_OLD_IMAGES",1068 )1069 table = dynamodb_resource.Table(table_name)1070 latest_stream_arn = table.latest_stream_arn1071 stream_name = get_kinesis_stream_name(table_name)1072 wait_for_stream_ready(stream_name)1073 testutil.create_lambda_function(1074 handler_file=TEST_LAMBDA_PYTHON_ECHO,1075 func_name=function_name,1076 runtime=LAMBDA_RUNTIME_PYTHON36,1077 )1078 mapping_uuid = lambda_client.create_event_source_mapping(1079 EventSourceArn=latest_stream_arn,1080 FunctionName=function_name,1081 StartingPosition="TRIM_HORIZON",1082 )["UUID"]1083 item = {"SK": short_uid(), "Name": "name-{}".format(short_uid())}1084 table.put_item(Item=item)1085 events = retry(1086 check_expected_lambda_log_events_length,1087 retries=10,1088 sleep=1,1089 function_name=function_name,1090 expected_length=1,1091 regex_filter=r"Records",1092 )1093 assert len(events) == 11094 assert len(events[0]["Records"]) == 11095 dynamodb_event = events[0]["Records"][0]["dynamodb"]1096 assert dynamodb_event["StreamViewType"] == "NEW_AND_OLD_IMAGES"1097 assert dynamodb_event["Keys"] == {"SK": {"S": item["SK"]}}1098 assert dynamodb_event["NewImage"]["Name"] == {"S": item["Name"]}1099 assert "SequenceNumber" in dynamodb_event1100 lambda_client.delete_event_source_mapping(UUID=mapping_uuid)1101 def test_dynamodb_batch_write_item(1102 self, dynamodb_client, dynamodb_create_table_with_parameters1103 ):1104 table_name = "ddb-table-%s" % short_uid()1105 dynamodb_create_table_with_parameters(1106 TableName=table_name,1107 KeySchema=[{"AttributeName": PARTITION_KEY, "KeyType": "HASH"}],1108 AttributeDefinitions=[{"AttributeName": PARTITION_KEY, "AttributeType": "S"}],1109 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},1110 Tags=TEST_DDB_TAGS,1111 )1112 result = dynamodb_client.batch_write_item(1113 RequestItems={1114 table_name: [1115 {"PutRequest": {"Item": {PARTITION_KEY: {"S": "Test1"}}}},1116 {"PutRequest": {"Item": {PARTITION_KEY: {"S": "Test2"}}}},1117 {"PutRequest": {"Item": {PARTITION_KEY: {"S": "Test3"}}}},1118 ]1119 }1120 )1121 assert result.get("UnprocessedItems") == {}1122 def test_dynamodb_pay_per_request(self, dynamodb_create_table_with_parameters):1123 table_name = "ddb-table-%s" % short_uid()1124 with pytest.raises(Exception) as e:1125 dynamodb_create_table_with_parameters(1126 TableName=table_name,1127 KeySchema=[{"AttributeName": PARTITION_KEY, "KeyType": "HASH"}],1128 AttributeDefinitions=[{"AttributeName": PARTITION_KEY, "AttributeType": "S"}],1129 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},1130 BillingMode="PAY_PER_REQUEST",1131 )1132 assert e.match("ValidationException")1133 def test_dynamodb_create_table_with_sse_specification(1134 self, dynamodb_create_table_with_parameters1135 ):1136 table_name = "ddb-table-%s" % short_uid()1137 kms_master_key_id = long_uid()1138 sse_specification = {"Enabled": True, "SSEType": "KMS", "KMSMasterKeyId": kms_master_key_id}1139 kms_master_key_arn = aws_stack.kms_key_arn(kms_master_key_id)1140 result = dynamodb_create_table_with_parameters(1141 TableName=table_name,1142 KeySchema=[{"AttributeName": PARTITION_KEY, "KeyType": "HASH"}],1143 AttributeDefinitions=[{"AttributeName": PARTITION_KEY, "AttributeType": "S"}],1144 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},1145 SSESpecification=sse_specification,1146 Tags=TEST_DDB_TAGS,1147 )1148 assert result["TableDescription"]["SSEDescription"]1149 assert result["TableDescription"]["SSEDescription"]["Status"] == "ENABLED"1150 assert result["TableDescription"]["SSEDescription"]["KMSMasterKeyArn"] == kms_master_key_arn1151 def test_dynamodb_create_table_with_partial_sse_specification(1152 self, dynamodb_create_table_with_parameters, kms_client1153 ):1154 table_name = "ddb-table-%s" % short_uid()1155 sse_specification = {"Enabled": True}1156 result = dynamodb_create_table_with_parameters(1157 TableName=table_name,1158 KeySchema=[{"AttributeName": PARTITION_KEY, "KeyType": "HASH"}],1159 AttributeDefinitions=[{"AttributeName": PARTITION_KEY, "AttributeType": "S"}],1160 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},1161 SSESpecification=sse_specification,1162 Tags=TEST_DDB_TAGS,1163 )1164 assert result["TableDescription"]["SSEDescription"]1165 assert result["TableDescription"]["SSEDescription"]["Status"] == "ENABLED"1166 assert result["TableDescription"]["SSEDescription"]["SSEType"] == "KMS"1167 assert "KMSMasterKeyArn" in result["TableDescription"]["SSEDescription"]1168 kms_master_key_arn = result["TableDescription"]["SSEDescription"]["KMSMasterKeyArn"]1169 result = kms_client.describe_key(KeyId=kms_master_key_arn)1170 assert result["KeyMetadata"]["KeyManager"] == "AWS"1171 def test_dynamodb_get_batch_items(self, dynamodb_client, dynamodb_create_table_with_parameters):1172 table_name = "ddb-table-%s" % short_uid()1173 dynamodb_create_table_with_parameters(1174 TableName=table_name,1175 KeySchema=[{"AttributeName": "PK", "KeyType": "HASH"}],1176 AttributeDefinitions=[{"AttributeName": "PK", "AttributeType": "S"}],1177 ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},1178 )1179 result = dynamodb_client.batch_get_item(1180 RequestItems={table_name: {"Keys": [{"PK": {"S": "test-key"}}]}}1181 )1182 assert list(result["Responses"])[0] == table_name1183 def test_dynamodb_streams_describe_with_exclusive_start_shard_id(1184 self, dynamodb_resource, dynamodb_create_table1185 ):1186 table_name = f"test-ddb-table-{short_uid()}"1187 ddbstreams = aws_stack.create_external_boto_client("dynamodbstreams")1188 dynamodb_create_table(1189 table_name=table_name,1190 partition_key=PARTITION_KEY,1191 stream_view_type="NEW_AND_OLD_IMAGES",1192 )1193 table = dynamodb_resource.Table(table_name)1194 response = ddbstreams.describe_stream(StreamArn=table.latest_stream_arn)1195 assert response["ResponseMetadata"]["HTTPStatusCode"] == 2001196 assert len(response["StreamDescription"]["Shards"]) == 11197 shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]1198 response = ddbstreams.describe_stream(1199 StreamArn=table.latest_stream_arn, ExclusiveStartShardId=shard_id1200 )1201 assert response["ResponseMetadata"]["HTTPStatusCode"] == 2001202 assert len(response["StreamDescription"]["Shards"]) == 01203 @pytest.mark.aws_validated1204 def test_dynamodb_idempotent_writing(1205 self, dynamodb_create_table_with_parameters, dynamodb_client, dynamodb_wait_for_table_active1206 ):1207 table_name = f"ddb-table-{short_uid()}"1208 dynamodb_create_table_with_parameters(1209 TableName=table_name,1210 KeySchema=[1211 {"AttributeName": "id", "KeyType": "HASH"},1212 {"AttributeName": "name", "KeyType": "RANGE"},1213 ],1214 AttributeDefinitions=[1215 {"AttributeName": "id", "AttributeType": "S"},1216 {"AttributeName": "name", "AttributeType": "S"},1217 ],1218 ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},1219 )1220 dynamodb_wait_for_table_active(table_name)1221 def _transact_write(_d: Dict):1222 response = dynamodb_client.transact_write_items(...
fixtures.py
Source:fixtures.py
...243 )244 poll_condition(wait, timeout=30)245 return wait_for_table_active246@pytest.fixture247def dynamodb_create_table_with_parameters(dynamodb_client, dynamodb_wait_for_table_active):248 tables = []249 def factory(**kwargs):250 if "TableName" not in kwargs:251 kwargs["TableName"] = "test-table-%s" % short_uid()252 tables.append(kwargs["TableName"])253 return dynamodb_client.create_table(**kwargs)254 yield factory255 # cleanup256 for table in tables:257 try:258 # table has to be in ACTIVE state before deletion259 dynamodb_wait_for_table_active(table)260 dynamodb_client.delete_table(TableName=table)261 except Exception as e:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!