Best Python code snippet using localstack_python
test_session.py
Source:test_session.py
"""Unit tests for ``bloop.session.SessionWrapper`` driven by mocked clients."""
import logging
from unittest.mock import Mock

import botocore.exceptions
import pytest

from bloop.exceptions import (
    BloopException,
    ConstraintViolation,
    InvalidSearch,
    InvalidShardIterator,
    InvalidStream,
    RecordsExpired,
    ShardIteratorExpired,
    TableMismatch,
    TransactionCanceled,
)
from bloop.models import (
    BaseModel,
    Column,
    GlobalSecondaryIndex,
    LocalSecondaryIndex,
)
from bloop.session import (
    BATCH_GET_ITEM_CHUNK_SIZE,
    SessionWrapper,
    compare_tables,
    create_table_request,
    ready,
    sanitize_table_description,
    simple_table_status,
)
from bloop.types import String, Timestamp
from bloop.util import Sentinel, ordered

from ..helpers.models import ComplexModel, SimpleModel, User


# Marker for "argument not provided", distinct from None (None is a real page token value below).
missing = Sentinel("missing")


@pytest.fixture
def dynamodb():
    """Return an unspecced Mock standing in for the DynamoDB client."""
    # No spec since clients are generated dynamically.
    # We could use botocore.client.BaseClient but it's so generic
    # that we don't gain any useful inspections
    return Mock()


@pytest.fixture
def dynamodbstreams():
    """Return an unspecced Mock standing in for the DynamoDBStreams client."""
    # No spec since clients are generated dynamically.
    return Mock()


@pytest.fixture
def session(dynamodb, dynamodbstreams):
    """Return a SessionWrapper wired to the two mocked clients."""
    return SessionWrapper(dynamodb=dynamodb, dynamodbstreams=dynamodbstreams)


@pytest.fixture
def model():
    """Return a clean model so each test can mutate the model's Meta"""
    class MyModel(BaseModel):
        class Meta:
            backups = {"enabled": True}
            billing = {"mode": "provisioned"}
            encryption = {"enabled": True}
            stream = {"include": {"old", "new"}}
            ttl = {"column": "expiry"}
            read_units = 3
            write_units = 7

        id = Column(String, hash_key=True)
        range = Column(String, range_key=True)
        expiry = Column(Timestamp)
        email = Column(String)

        gsi_email_keys = GlobalSecondaryIndex(
            projection="keys", hash_key=email,
            read_units=13, write_units=17)
        gsi_email_specific = GlobalSecondaryIndex(
            projection=["expiry"], hash_key=email,
            read_units=23, write_units=27)
        gsi_email_all = GlobalSecondaryIndex(
            projection="all", hash_key=email,
            read_units=23, write_units=27)

        lsi_email_keys = LocalSecondaryIndex(projection="keys", range_key=email)
        lsi_email_specific = LocalSecondaryIndex(projection=["expiry"], range_key=email)
        lsi_email_all = LocalSecondaryIndex(projection="all", range_key=email)

    return MyModel


@pytest.fixture
def basic_model():
    """Return a fresh model that declares only a string hash key."""
    class BasicModel(BaseModel):
        id = Column(String, hash_key=True)

    return BasicModel


@pytest.fixture
def logger(caplog):
    """Return a small wrapper around ``caplog`` for asserting on "bloop.session" log records."""
    class CaplogWrapper:
        def __init__(self):
            self.caplog = caplog

        def assert_logged(self, msg, level=logging.DEBUG):
            """Assert that ``msg`` was logged by "bloop.session" at ``level``."""
            assert ("bloop.session", level, msg) in self.caplog.record_tuples

        def assert_only_logged(self, msg, level=logging.DEBUG):
            """Assert that ``msg`` was logged and that it is the only captured record."""
            self.assert_logged(msg, level=level)
            assert len(self.caplog.record_tuples) == 1

    return CaplogWrapper()


def description_for(cls, active=None):
    """Returns an exact description for the model

    Starts from the CreateTable request for ``cls`` and rewrites the pieces that
    the tests expect in Describe* form (SSE, TTL, backups, stream arn, billing).

    :param cls: model class whose ``Meta`` drives the description.
    :param active: when not None, table and GSI statuses are added after sanitizing;
        True gives "ACTIVE", False gives "TEST-NOT-ACTIVE".
    :return: the sanitized description dict.
    """
    description = create_table_request(cls.Meta.table_name, cls)
    if cls.Meta.encryption:
        description.pop("SSESpecification")
        description["SSEDescription"] = {"Status": "ENABLED"}
    if cls.Meta.ttl:
        description["TimeToLiveDescription"] = {
            "AttributeName": cls.Meta.ttl["column"].dynamo_name,
            "TimeToLiveStatus": "ENABLED",
        }
    if cls.Meta.backups:
        description["ContinuousBackupsDescription"] = {
            "ContinuousBackupsStatus": "ENABLED"
        }
    description["LatestStreamArn"] = "not-a-real-arn"
    # CreateTable::BillingMode -> DescribeTable::BillingModeSummary.BillingMode
    description["BillingModeSummary"] = {"BillingMode": description.pop("BillingMode")}

    description = sanitize_table_description(description)
    # post-sanitize because it strips TableStatus
    if active is not None:
        status = "ACTIVE" if active else "TEST-NOT-ACTIVE"
        description["TableStatus"] = status
        for gsi in description["GlobalSecondaryIndexes"]:
            gsi["IndexStatus"] = status
    return description


def build_describe_stream_response(shards=missing, next_id=missing):
    """Build a DescribeStream-shaped response for the mocked streams client.

    :param shards: value for "Shards"; the key is omitted when left as ``missing``.
    :param next_id: value for "LastEvaluatedShardId"; the key is omitted when left as ``missing``.
    :return: dict with a single "StreamDescription" entry.
    """
    description = {
        "StreamDescription": {
            "CreationRequestDateTime": "now",
            "KeySchema": [{"AttributeName": "string", "KeyType": "string"}],
            "LastEvaluatedShardId": next_id,
            "Shards": shards,
            "StreamArn": "string",
            "StreamLabel": "string",
            "StreamStatus": "string",
            "StreamViewType": "string",
            "TableName": "string"
        }
    }
    if shards is missing:
        description["StreamDescription"].pop("Shards")
    if next_id is missing:
        description["StreamDescription"].pop("LastEvaluatedShardId")
    return description


def client_error(code):
    """Return a botocore ClientError whose error code is ``code``."""
    error_response = {
        "Error": {
            "Code": code,
            "Message": "FooMessage"}}
    operation_name = "OperationName"
    return botocore.exceptions.ClientError(error_response, operation_name)


def minimal_description(active=True):
    """Return a sanitized empty table description with a TableStatus added.

    :param active: True gives "ACTIVE", anything falsy gives "TEST-NOT-ACTIVE".
    """
    description = sanitize_table_description({})
    description["TableStatus"] = "ACTIVE" if active else "TEST-NOT-ACTIVE"
    return description


# SAVE ITEM ================================================================================================ SAVE ITEM


def test_save_item(session, dynamodb):
    """save_item passes the request to update_item as keyword arguments."""
    request = {"foo": "bar"}
    session.save_item(request)
    dynamodb.update_item.assert_called_once_with(**request)


def test_save_item_unknown_error(session, dynamodb):
    """An unrecognized ClientError is re-raised as BloopException with the original as its cause."""
    request = {"foo": "bar"}
    cause = dynamodb.update_item.side_effect = client_error("FooError")

    with pytest.raises(BloopException) as excinfo:
        session.save_item(request)
    assert excinfo.value.__cause__ is cause
    dynamodb.update_item.assert_called_once_with(**request)


def test_save_item_condition_failed(session, dynamodb):
    """ConditionalCheckFailedException surfaces as ConstraintViolation."""
    request = {"foo": "bar"}
    dynamodb.update_item.side_effect = client_error("ConditionalCheckFailedException")

    with pytest.raises(ConstraintViolation):
        session.save_item(request)
    dynamodb.update_item.assert_called_once_with(**request)


# END SAVE ITEM ======================================================================================== END SAVE ITEM
# DELETE ITEM ============================================================================================ DELETE ITEM


def test_delete_item(session, dynamodb):
    """delete_item passes the request to the client's delete_item as keyword arguments."""
    request = {"foo": "bar"}
    session.delete_item(request)
    dynamodb.delete_item.assert_called_once_with(**request)


def test_delete_item_unknown_error(session, dynamodb):
    """An unrecognized ClientError is re-raised as BloopException with the original as its cause."""
    request = {"foo": "bar"}
    cause = dynamodb.delete_item.side_effect = client_error("FooError")

    with pytest.raises(BloopException) as excinfo:
        session.delete_item(request)
    assert excinfo.value.__cause__ is cause
    dynamodb.delete_item.assert_called_once_with(**request)


def test_delete_item_condition_failed(session, dynamodb):
    """ConditionalCheckFailedException surfaces as ConstraintViolation."""
    request = {"foo": "bar"}
    dynamodb.delete_item.side_effect = client_error("ConditionalCheckFailedException")

    with pytest.raises(ConstraintViolation):
        session.delete_item(request)
    dynamodb.delete_item.assert_called_once_with(**request)


# END DELETE ITEM ==================================================================================== END DELETE ITEM


# LOAD ITEMS ============================================================================================== LOAD ITEMS


def test_batch_get_raises(session, dynamodb):
    """A ClientError from batch_get_item is wrapped in BloopException."""
    cause = dynamodb.batch_get_item.side_effect = client_error("FooError")
    request = {"TableName": {"Keys": ["key"], "ConsistentRead": False}}

    with pytest.raises(BloopException) as excinfo:
        session.load_items(request)
    assert excinfo.value.__cause__ is cause


def test_batch_get_one_item(session, dynamodb):
    """A single call for a single item"""
    user = User(id="user_id")
    request = {"User": {"Keys": [{"id": {"S": user.id}}],
                        "ConsistentRead": False}}
    # When batching input with less keys than the batch size, the request
    # will look identical
    expected_request = request
    response = {
        "Responses": {
            "User": [{"id": {"S": user.id}, "age": {"N": "4"}}]
        },
        "UnprocessedKeys": {}
    }
    # Expected response is a single list of users
    expected_response = {"User": [{"id": {"S": user.id}, "age": {"N": "4"}}]}

    def handle(RequestItems):
        assert RequestItems == expected_request
        return response
    dynamodb.batch_get_item.side_effect = handle

    # handle() runs inside load_items, i.e. before ``response`` is rebound here
    response = session.load_items(request)
    assert response == expected_response
    dynamodb.batch_get_item.assert_called_once_with(RequestItems=expected_request)


def test_batch_get_one_batch(session, dynamodb):
    """A single call when the number of requested items is <= batch size"""
    users = [User(id=str(i)) for i in range(BATCH_GET_ITEM_CHUNK_SIZE)]

    # Request to the bloop client
    client_request = {
        "User": {
            "Keys": [
                {"id": {"S": user.id}}
                for user in users
            ],
            "ConsistentRead": False
        }
    }

    boto3_client_response = {
        "Responses": {
            "User": [
                {"id": {"S": user.id}, "age": {"N": "4"}}
                for user in users
            ]
        },
        "UnprocessedKeys": {}
    }

    # The response that the bloop client should return
    expected_client_response = boto3_client_response["Responses"]

    dynamodb.batch_get_item.return_value = boto3_client_response
    response = session.load_items(client_request)

    dynamodb.batch_get_item.assert_called_once_with(RequestItems=client_request)
    assert response == expected_client_response


def test_batch_get_paginated(session, dynamodb):
    """Paginate requests to fit within the max batch size"""
    users = [User(id=str(i)) for i in range(BATCH_GET_ITEM_CHUNK_SIZE + 1)]
    keys = [
        {"id": {"S": user.id}}
        for user in users
    ]

    # Request with BATCH_GET_ITEM_CHUNK_SIZE + 1 items sent to the bloop client
    client_request = {"User": {"Keys": keys, "ConsistentRead": False}}

    # The two responses that boto3 would return to the bloop client
    batched_responses = [
        # [0, BATCH_GET_ITEM_CHUNK_SIZE] items
        {
            "Responses": {
                "User": [
                    {"id": {"S": user.id}, "age": {"N": "4"}}
                    for user in users[:BATCH_GET_ITEM_CHUNK_SIZE]
                ]
            },
            "UnprocessedKeys": {}
        },
        # [BATCH_GET_ITEM_CHUNK_SIZE+1, ] items
        {
            "Responses": {
                "User": [
                    {"id": {"S": user.id}, "age": {"N": "4"}}
                    for user in users[BATCH_GET_ITEM_CHUNK_SIZE:]
                ]
            },
            "UnprocessedKeys": {}
        }
    ]

    # The response that the bloop client should return (all items)
    expected_client_response = {
        "User": [
            {"id": {"S": user.id}, "age": {"N": "4"}}
            for user in users
        ]
    }

    dynamodb.batch_get_item.side_effect = batched_responses
    response = session.load_items(client_request)

    assert dynamodb.batch_get_item.call_count == 2
    assert response == expected_client_response


def test_batch_get_unprocessed(session, dynamodb):
    """ Re-request unprocessed keys """
    user = User(id="user_id")
    request = {
        "User": {
            "Keys": [{"id": {"S": user.id}}],
            "ConsistentRead": False
        }
    }
    expected_requests = [{
        "User": {
            "Keys": [{"id": {"S": user.id}}],
            "ConsistentRead": False}
    }, {
        "User": {
            "Keys": [{"id": {"S": user.id}}],
            "ConsistentRead": False}
    }]

    responses = [{
        "UnprocessedKeys": {
            "User": {
                "Keys": [{"id": {"S": user.id}}],
                "ConsistentRead": False}}
    }, {
        "Responses": {
            "User": [{"id": {"S": user.id}, "age": {"N": "4"}}]
        },
        "UnprocessedKeys": {}
    }]

    expected_response = {"User": [{"id": {"S": user.id}, "age": {"N": "4"}}]}
    calls = 0

    def handle(RequestItems):
        # Replay the canned responses in order, checking each incoming request
        nonlocal calls
        expected = expected_requests[calls]
        response = responses[calls]
        calls += 1
        assert RequestItems == expected
        return response
    dynamodb.batch_get_item = handle

    response = session.load_items(request)

    assert calls == 2
    assert response == expected_response


# END LOAD ITEMS ====================================================================================== END LOAD ITEMS


# QUERY SCAN SEARCH ================================================================================ QUERY SCAN SEARCH


@pytest.mark.parametrize("response, expected", [
    ({}, (0, 0)),
    ({"Count": -1}, (-1, -1)),
    ({"ScannedCount": -1}, (0, -1)),
    ({"Count": 1, "ScannedCount": 2}, (1, 2))
], ids=str)
def test_query_scan(session, dynamodb, response, expected):
    """query_items and scan_items both return Count and ScannedCount, filled in when the client omits them."""
    dynamodb.query.return_value = response
    dynamodb.scan.return_value = response

    expected = {"Count": expected[0], "ScannedCount": expected[1]}
    assert session.query_items({}) == expected
    assert session.scan_items({}) == expected


def test_query_scan_raise(session, dynamodb):
    """ClientErrors from query and scan are wrapped in BloopException."""
    cause = dynamodb.query.side_effect = client_error("FooError")
    with pytest.raises(BloopException) as excinfo:
        session.query_items({})
    assert excinfo.value.__cause__ is cause

    cause = dynamodb.scan.side_effect = client_error("FooError")
    with pytest.raises(BloopException) as excinfo:
        session.scan_items({})
    assert excinfo.value.__cause__ is cause


def test_search_unknown(session):
    """An unknown search mode raises InvalidSearch and the message names the mode."""
    with pytest.raises(InvalidSearch) as excinfo:
        session.search_items(mode="foo", request={})
    assert "foo" in str(excinfo.value)


# END QUERY SCAN SEARCH ======================================================================== END QUERY SCAN SEARCH


# CREATE TABLE ========================================================================================== CREATE TABLE


def test_create_table(session, dynamodb):
    """create_table sends the expected CreateTable request for ComplexModel."""
    expected = {
        "LocalSecondaryIndexes": [
            {"Projection": {"NonKeyAttributes": ["date", "name",
                                                 "email", "joined"],
                            "ProjectionType": "INCLUDE"},
             "IndexName": "by_joined",
             "KeySchema": [
                {"KeyType": "HASH", "AttributeName": "name"},
                {"KeyType": "RANGE", "AttributeName": "joined"}]}],
        "ProvisionedThroughput": {"ReadCapacityUnits": 3,
                                  "WriteCapacityUnits": 2},
        "GlobalSecondaryIndexes": [
            {"Projection": {"ProjectionType": "ALL"},
             "IndexName": "by_email",
             "ProvisionedThroughput": {"ReadCapacityUnits": 4,
                                       "WriteCapacityUnits": 5},
             "KeySchema": [{"KeyType": "HASH", "AttributeName": "email"}]}],
        "TableName": "LocalTableName",
        "KeySchema": [
            {"KeyType": "HASH", "AttributeName": "name"},
            {"KeyType": "RANGE", "AttributeName": "date"}],
        "AttributeDefinitions": [
            {"AttributeType": "S", "AttributeName": "date"},
            {"AttributeType": "S", "AttributeName": "name"},
            {"AttributeType": "S", "AttributeName": "joined"},
            {"AttributeType": "S", "AttributeName": "email"}],
        'BillingMode': 'PROVISIONED',
    }

    def handle(**table):
        # Both sides go through ordered() before comparing
        assert ordered(table) == ordered(expected)
    dynamodb.create_table.side_effect = handle

    session.create_table("LocalTableName", ComplexModel)
    assert dynamodb.create_table.call_count == 1


def test_create_subclass(session, dynamodb):
    """Creating a subclass should include all parent models' columns in the request"""
    class SubModel(User):
        my_id = Column(String, hash_key=True)

    expected = {
        'AttributeDefinitions': [
            {'AttributeName': 'my_id', 'AttributeType': 'S'},
            {'AttributeName': 'email', 'AttributeType': 'S'},
        ],
        'BillingMode': 'PROVISIONED',
        'KeySchema': [{'AttributeName': 'my_id', 'KeyType': 'HASH'}],
        'ProvisionedThroughput': {
            'ReadCapacityUnits': 1, 'WriteCapacityUnits': 1},
        'TableName': 'LocalTableName',
        'GlobalSecondaryIndexes': [
            {'IndexName': 'by_email', 'KeySchema': [{'AttributeName': 'email', 'KeyType': 'HASH'}],
             'Projection': {'ProjectionType': 'ALL'},
             'ProvisionedThroughput': {'WriteCapacityUnits': 1, 'ReadCapacityUnits': 1}}]
    }

    def handle(**table):
        assert ordered(table) == ordered(expected)

    dynamodb.create_table.side_effect = handle
    session.create_table("LocalTableName", SubModel)
    assert dynamodb.create_table.call_count == 1


def test_create_raises_unknown(session, dynamodb):
    """An unrecognized ClientError from create_table is wrapped in BloopException."""
    cause = dynamodb.create_table.side_effect = client_error("FooError")

    with pytest.raises(BloopException) as excinfo:
        session.create_table("User", User)
    assert excinfo.value.__cause__ is cause
    assert dynamodb.create_table.call_count == 1


def test_create_already_exists(session, dynamodb):
    """ResourceInUseException from create_table does not propagate to the caller."""
    dynamodb.create_table.side_effect = client_error("ResourceInUseException")

    session.create_table("User", User)
    assert dynamodb.create_table.call_count == 1


# END CREATE TABLE ================================================================================== END CREATE TABLE


# DESCRIBE TABLE ====================================================================================== DESCRIBE TABLE


def test_describe_table_raises_unknown(session, dynamodb):
    """A failing DescribeTable is wrapped and no TTL lookup is attempted afterwards."""
    cause = dynamodb.describe_table.side_effect = client_error("FooError")

    with pytest.raises(BloopException) as excinfo:
        session.describe_table("User")
    assert excinfo.value.__cause__ is cause
    assert dynamodb.describe_table.call_count == 1
    assert dynamodb.describe_time_to_live.call_count == 0


def test_describe_ttl_raises_unknown(session, dynamodb):
    """A failing DescribeTimeToLive is wrapped in BloopException."""
    dynamodb.describe_table.return_value = {"Table": minimal_description(True)}
    cause = dynamodb.describe_time_to_live.side_effect = client_error("FooError")

    with pytest.raises(BloopException) as excinfo:
        session.describe_table("User")
    assert excinfo.value.__cause__ is cause
    assert dynamodb.describe_table.call_count == 1
    assert dynamodb.describe_time_to_live.call_count == 1


def test_describe_backups_raises_unknown(session, dynamodb):
    """A failing DescribeContinuousBackups is wrapped in BloopException."""
    dynamodb.describe_table.return_value = {"Table": minimal_description(True)}
    dynamodb.describe_time_to_live.return_value = {}
    cause = dynamodb.describe_continuous_backups.side_effect = client_error("FooError")

    with pytest.raises(BloopException) as excinfo:
        session.describe_table("User")
    assert excinfo.value.__cause__ is cause
    assert dynamodb.describe_table.call_count == 1
    assert dynamodb.describe_time_to_live.call_count == 1
    assert dynamodb.describe_continuous_backups.call_count == 1


def test_describe_table_polls_status(session, dynamodb):
    """DescribeTable is called a second time when the first response is not ACTIVE."""
    dynamodb.describe_table.side_effect = [
        {"Table": minimal_description(False)},
        {"Table": minimal_description(True)}
    ]
    dynamodb.describe_time_to_live.return_value = {"TimeToLiveDescription": {}}
    dynamodb.describe_continuous_backups.return_value = {"ContinuousBackupsDescription": {}}

    description = session.describe_table("User")
    # table status is filtered out
    assert "TableStatus" not in description
    assert dynamodb.describe_table.call_count == 2
    assert dynamodb.describe_time_to_live.call_count == 1
    assert dynamodb.describe_continuous_backups.call_count == 1


def test_describe_table_sanitizes(session, dynamodb, caplog):
    """Unknown fields are dropped, missing fields are added, and the poll count is logged."""
    responses = dynamodb.describe_table.side_effect = [
        {"Table": minimal_description(False)},
        {"Table": minimal_description(True)}
    ]
    # New/unknown fields are filtered out
    responses[-1]["Table"]["UnknownField"] = "Something"
    # Missing fields are added
    responses[-1]["Table"].pop("GlobalSecondaryIndexes")
    dynamodb.describe_time_to_live.return_value = {"TimeToLiveDescription": {}}
    dynamodb.describe_continuous_backups.return_value = {"ContinuousBackupsDescription": {}}

    description = session.describe_table("User")
    assert "UnknownField" not in description
    assert description["GlobalSecondaryIndexes"] == []

    assert caplog.record_tuples == [
        ("bloop.session", logging.DEBUG,
         "describe_table: table \"User\" was in ACTIVE state after 2 calls"),
    ]


def test_describe_table_caches_responses(session, dynamodb):
    """A repeated describe_table reuses the first result until clear_cache is called."""
    dynamodb.describe_table.side_effect = [
        {"Table": minimal_description(True)},
        {"Table": minimal_description(True)}
    ]
    dynamodb.describe_time_to_live.return_value = {"TimeToLiveDescription": {}}
    dynamodb.describe_continuous_backups.return_value = {"ContinuousBackupsDescription": {}}

    first_description = session.describe_table("User")
    second_description = session.describe_table("User")

    assert first_description is second_description
    assert dynamodb.describe_table.call_count == 1
    assert dynamodb.describe_time_to_live.call_count == 1
    assert dynamodb.describe_continuous_backups.call_count == 1

    session.clear_cache()
    session.describe_table("User")

    assert dynamodb.describe_table.call_count == 2
    assert dynamodb.describe_time_to_live.call_count == 2
    assert dynamodb.describe_continuous_backups.call_count == 2


# END DESCRIBE TABLE ============================================================================== END DESCRIBE TABLE


def test_enable_ttl(session, dynamodb):
    """enable_ttl sends the ttl column's dynamo_name in the TimeToLiveSpecification."""
    class Model(BaseModel):
        class Meta:
            ttl = {"column": "expiry"}

        id = Column(String, hash_key=True)
        expiry = Column(Timestamp, dynamo_name="e!!")

    session.enable_ttl("LocalTableName", Model)

    expected = {
        "TableName": "LocalTableName",
        "TimeToLiveSpecification": {
            "AttributeName": "e!!",
            "Enabled": True
        }
    }
    dynamodb.update_time_to_live.assert_called_once_with(**expected)


def test_enable_ttl_wraps_exception(session, dynamodb):
    """A ClientError from update_time_to_live is wrapped in BloopException."""
    class Model(BaseModel):
        class Meta:
            ttl = {"column": "expiry"}

        id = Column(String, hash_key=True)
        expiry = Column(Timestamp, dynamo_name="e!!")

    dynamodb.update_time_to_live.side_effect = expected = client_error("FooError")

    with pytest.raises(BloopException) as excinfo:
        session.enable_ttl("LocalTableName", Model)
    assert excinfo.value.__cause__ is expected


def test_enable_backups(session, dynamodb):
    """enable_backups requests point-in-time recovery for the table."""
    class Model(BaseModel):
        class Meta:
            backups = {"enabled": True}

        id = Column(String, hash_key=True)

    session.enable_backups("LocalTableName", Model)

    expected = {
        "TableName": "LocalTableName",
        "PointInTimeRecoverySpecification": {"PointInTimeRecoveryEnabled": True}
    }
    dynamodb.update_continuous_backups.assert_called_once_with(**expected)


def test_enable_backups_wraps_exception(session, dynamodb):
    """A ClientError from update_continuous_backups is wrapped in BloopException."""
    class Model(BaseModel):
        class Meta:
            backups = {"enabled": True}

        id = Column(String, hash_key=True)

    dynamodb.update_continuous_backups.side_effect = expected = client_error("FooError")

    with pytest.raises(BloopException) as excinfo:
        session.enable_backups("LocalTableName", Model)
    assert excinfo.value.__cause__ is expected


# VALIDATE TABLE ====================================================================================== VALIDATE TABLE


def test_validate_table_all_meta(model, session, dynamodb, logger):
    """validate_table completes without raising when every Meta setting matches the description."""
    description = description_for(model, active=True)
    dynamodb.describe_table.return_value = {"Table": description}
    dynamodb.describe_time_to_live.return_value = {
        "TimeToLiveDescription": {
            "AttributeName": model.Meta.ttl["column"].dynamo_name,
            "TimeToLiveStatus": "ENABLED"
        }
    }
    dynamodb.describe_continuous_backups.return_value = {
        "ContinuousBackupsDescription": {
            "ContinuousBackupsStatus": "ENABLED"
        }
    }
    session.validate_table(model.Meta.table_name, model)


def test_validate_table_mismatch(basic_model, session, dynamodb, logger):
    """A description without the model's attribute raises TableMismatch and logs the reason."""
    description = description_for(basic_model, active=True)
    description["AttributeDefinitions"] = []
    dynamodb.describe_table.return_value = {"Table": description}
    dynamodb.describe_time_to_live.return_value = {}
    dynamodb.describe_continuous_backups.return_value = {}

    with pytest.raises(TableMismatch) as excinfo:
        session.validate_table(basic_model.Meta.table_name, basic_model)
    assert str(excinfo.value) == "The expected and actual tables for 'BasicModel' do not match."
    logger.assert_logged("Table is missing expected attribute 'id'")


def test_validate_table_sets_stream_arn(model, session, dynamodb, logger):
    """validate_table copies the stream arn from the description onto Meta.stream."""
    # isolate the Meta component we're trying to observe
    model.Meta.billing = None
    # model.Meta.stream = None
    model.Meta.ttl = None
    model.Meta.encryption = None
    model.Meta.backups = None

    description = description_for(model, active=True)
    dynamodb.describe_table.return_value = {"Table": description}
    dynamodb.describe_time_to_live.return_value = {}
    dynamodb.describe_continuous_backups.return_value = {}

    session.validate_table(model.Meta.table_name, model)
    assert model.Meta.stream["arn"] == "not-a-real-arn"
    logger.assert_logged("Set MyModel.Meta.stream['arn'] to 'not-a-real-arn' from DescribeTable response")


def test_validate_table_sets_ttl(model, session, dynamodb, logger):
    """validate_table sets Meta.ttl["enabled"] from the DescribeTimeToLive response."""
    # isolate the Meta component we're trying to observe
    model.Meta.billing = None
    model.Meta.stream = None
    # model.Meta.ttl = None
    model.Meta.encryption = None
    model.Meta.backups = None

    description = description_for(model, active=True)
    dynamodb.describe_table.return_value = {"Table": description}
    dynamodb.describe_time_to_live.return_value = {
        "TimeToLiveDescription": {
            "AttributeName": model.Meta.ttl["column"].dynamo_name,
            "TimeToLiveStatus": "ENABLED"
        }
    }
    dynamodb.describe_continuous_backups.return_value = {}

    session.validate_table(model.Meta.table_name, model)
    assert model.Meta.ttl["enabled"] is True
    logger.assert_logged("Set MyModel.Meta.ttl['enabled'] to 'True' from DescribeTable response")


def test_validate_table_sets_encryption(model, session, dynamodb, logger):
    """validate_table fills in Meta.encryption when the model did not declare it."""
    # isolate the Meta component we're trying to observe
    model.Meta.billing = None
    model.Meta.stream = None
    model.Meta.ttl = None
    # model.Meta.encryption = None
    model.Meta.backups = None

    description = description_for(model, active=True)
    dynamodb.describe_table.return_value = {"Table": description}
    dynamodb.describe_time_to_live.return_value = {}
    dynamodb.describe_continuous_backups.return_value = {}

    # clear the Meta value so validate_table can set it
    model.Meta.encryption = None

    session.validate_table(model.Meta.table_name, model)
    assert model.Meta.encryption["enabled"] is True
    logger.assert_logged("Set MyModel.Meta.encryption['enabled'] to 'True' from DescribeTable response")


def test_validate_table_sets_backups(model, session, dynamodb, logger):
    """validate_table fills in Meta.backups when the model did not declare it."""
    # isolate the Meta component we're trying to observe
    model.Meta.billing = None
    model.Meta.stream = None
    model.Meta.ttl = None
    model.Meta.encryption = None
    # model.Meta.backups = None

    description = description_for(model, active=True)
    dynamodb.describe_table.return_value = {"Table": description}
    dynamodb.describe_time_to_live.return_value = {}
    dynamodb.describe_continuous_backups.return_value = {
        "ContinuousBackupsDescription": {
            "ContinuousBackupsStatus": "ENABLED"
        }
    }

    # clear the Meta value so validate_table can set it
    model.Meta.backups = None

    session.validate_table(model.Meta.table_name, model)
    assert model.Meta.backups == {"enabled": True}
    logger.assert_logged("Set MyModel.Meta.backups['enabled'] to 'True' from DescribeTable response")


@pytest.mark.parametrize("billing_mode", ["provisioned", "on_demand"])
def test_validate_table_sets_billing_mode(billing_mode, model, session, dynamodb, logger):
    """validate_table fills in Meta.billing["mode"] when the model did not declare it."""
    # isolate the Meta components we're trying to observe
    # model.Meta.billing = None
    model.Meta.stream = None
    model.Meta.ttl = None
    model.Meta.encryption = None
    model.Meta.backups = None

    # the description is built while the mode is still set on the model
    model.Meta.billing["mode"] = billing_mode
    description = description_for(model, active=True)
    dynamodb.describe_table.return_value = {"Table": description}
    dynamodb.describe_time_to_live.return_value = {}
    dynamodb.describe_continuous_backups.return_value = {}

    # clear the Meta value so validate_table can set it
    model.Meta.billing = None

    session.validate_table(model.Meta.table_name, model)
    assert model.Meta.billing["mode"] == billing_mode
    logger.assert_logged(f"Set MyModel.Meta.billing['mode'] to '{billing_mode}' from DescribeTable response")


def test_validate_table_sets_table_throughput(model, session, dynamodb, logger):
    """validate_table restores Meta.read_units/write_units when they are None."""
    # isolate the Meta components we're trying to observe
    model.Meta.billing = None
    model.Meta.stream = None
    model.Meta.ttl = None
    model.Meta.encryption = None
    model.Meta.backups = None

    description = description_for(model, active=True)
    dynamodb.describe_table.return_value = {"Table": description}
    dynamodb.describe_time_to_live.return_value = {}
    dynamodb.describe_continuous_backups.return_value = {}

    # tell the model to stop tracking read/write units so that we can see it's added back
    expected_read_units = model.Meta.read_units
    expected_write_units = model.Meta.write_units
    model.Meta.read_units = model.Meta.write_units = None

    session.validate_table(model.Meta.table_name, model)

    assert model.Meta.read_units == expected_read_units
    assert model.Meta.write_units == expected_write_units
    logger.assert_logged("Set MyModel.Meta.read_units to 3 from DescribeTable response")
    logger.assert_logged("Set MyModel.Meta.write_units to 7 from DescribeTable response")


def test_validate_table_sets_gsi_throughput(model, session, dynamodb, logger):
    """validate_table restores a GSI's read_units/write_units when they are None."""
    # isolate the Meta components we're trying to observe
    model.Meta.billing = None
    model.Meta.stream = None
    model.Meta.ttl = None
    model.Meta.encryption = None
    model.Meta.backups = None

    description = description_for(model, active=True)
    dynamodb.describe_table.return_value = {"Table": description}
    dynamodb.describe_time_to_live.return_value = {}
    dynamodb.describe_continuous_backups.return_value = {}

    # tell the model to stop tracking read/write units so that we can see it's added back
    # NOTE(review): any_index is not defined in the visible part of this file -
    # presumably a helper defined elsewhere; confirm it is in scope.
    index = any_index(model, "gsis")
    expected_read_units = index.read_units
    expected_write_units = index.write_units
    index.read_units = index.write_units = None

    session.validate_table(model.Meta.table_name, model)

    assert index.read_units == expected_read_units
    assert index.write_units == expected_write_units
    logger.assert_logged(f"Set MyModel.{index.name}.read_units to {expected_read_units} from DescribeTable response")
    logger.assert_logged(f"Set MyModel.{index.name}.write_units to {expected_write_units} from DescribeTable response")


# END VALIDATE TABLE ============================================================================== END VALIDATE TABLE
# DESCRIBE STREAM ==================================================================================== DESCRIBE STREAM


def test_describe_stream_unknown_error(session, dynamodbstreams):
    """An unrecognized ClientError from describe_stream is wrapped in BloopException."""
    request = {"StreamArn": "arn", "ExclusiveStartShardId": "shard id"}
    cause = dynamodbstreams.describe_stream.side_effect = client_error("FooError")

    with pytest.raises(BloopException) as excinfo:
        session.describe_stream("arn", "shard id")
    assert excinfo.value.__cause__ is cause
    dynamodbstreams.describe_stream.assert_called_once_with(**request)


def test_describe_stream_not_found(session, dynamodbstreams):
    """ResourceNotFoundException from describe_stream surfaces as InvalidStream."""
    request = {"StreamArn": "arn", "ExclusiveStartShardId": "shard id"}
    cause = dynamodbstreams.describe_stream.side_effect = client_error("ResourceNotFoundException")

    with pytest.raises(InvalidStream) as excinfo:
        session.describe_stream("arn", "shard id")
    assert excinfo.value.__cause__ is cause
    dynamodbstreams.describe_stream.assert_called_once_with(**request)


@pytest.mark.parametrize("no_shards", [missing, list()])
@pytest.mark.parametrize("next_ids", [(missing, ), (None, ), ("two pages", None)])
def test_describe_stream_no_results(no_shards, next_ids, session, dynamodbstreams):
    """Pages with missing or empty "Shards" produce a description whose Shards list is empty."""
    stream_arn = "arn"
    # One canned response per page token in next_ids
    responses = [build_describe_stream_response(shards=no_shards, next_id=next_id) for next_id in next_ids]
    dynamodbstreams.describe_stream.side_effect = responses

    description = session.describe_stream(stream_arn=stream_arn, first_shard="first-token")
    assert description["Shards"] == []

    empty_response = build_describe_stream_response(shards=[])["StreamDescription"]
    assert ordered(description) == ordered(empty_response)

    dynamodbstreams.describe_stream.assert_any_call(StreamArn=stream_arn, ExclusiveStartShardId="first-token")
    assert dynamodbstreams.describe_stream.call_count == len(next_ids)


@pytest.mark.parametrize("shard_list", [
    # results followed by empty
    (["first", "second"], []),
    # empty followed by results
    ([], ["first", "second"])
])
def test_describe_stream_combines_results(shard_list, session, dynamodbstreams):
    """Shards from successive pages are combined into one list."""
    stream_arn = "arn"
    responses = [build_describe_stream_response(shards=shard_list[0], next_id="second-token"),
                 build_describe_stream_response(shards=shard_list[1], next_id=missing)]
    dynamodbstreams.describe_stream.side_effect = responses

    description = session.describe_stream(stream_arn)
    assert description["Shards"] == ["first", "second"]

    assert dynamodbstreams.describe_stream.call_count == 2
    dynamodbstreams.describe_stream.assert_any_call(StreamArn=stream_arn)
    dynamodbstreams.describe_stream.assert_any_call(StreamArn=stream_arn, ExclusiveStartShardId="second-token")


# END DESCRIBE STREAM ============================================================================ END DESCRIBE STREAM


# GET SHARD ITERATOR ============================================================================= GET SHARD ITERATOR


def test_get_unknown_shard_iterator(dynamodbstreams, session):
    """An unknown iterator type raises InvalidShardIterator without calling the client."""
    unknown_type = "foo123"
    with pytest.raises(InvalidShardIterator) as excinfo:
        session.get_shard_iterator(
            stream_arn="arn",
            shard_id="shard_id",
            iterator_type=unknown_type,
            sequence_number=None
        )
    assert unknown_type in str(excinfo.value)
    dynamodbstreams.get_shard_iterator.assert_not_called()


def test_get_trimmed_shard_iterator(dynamodbstreams, session):
    """TrimmedDataAccessException from get_shard_iterator surfaces as RecordsExpired."""
    dynamodbstreams.get_shard_iterator.side_effect = client_error("TrimmedDataAccessException")
    with pytest.raises(RecordsExpired):
        session.get_shard_iterator(
            stream_arn="arn",
            shard_id="shard_id",
            iterator_type="at_sequence",
            sequence_number="sequence-123"
        )
    dynamodbstreams.get_shard_iterator.assert_called_once_with(
        StreamArn="arn",
        ShardId="shard_id",
        ShardIteratorType="AT_SEQUENCE_NUMBER",
        SequenceNumber="sequence-123"
    )


def test_get_shard_iterator_unknown_error(dynamodbstreams, session):
    """An unrecognized ClientError from get_shard_iterator is wrapped in BloopException."""
    cause = dynamodbstreams.get_shard_iterator.side_effect = client_error("FooError")
    with pytest.raises(BloopException) as excinfo:
        session.get_shard_iterator(stream_arn="arn", shard_id="shard_id", iterator_type="at_sequence")
    assert excinfo.value.__cause__ is cause


def test_get_shard_iterator_after_sequence(dynamodbstreams, session):
    """"after_sequence" is sent as AFTER_SEQUENCE_NUMBER and the ShardIterator value is returned."""
    dynamodbstreams.get_shard_iterator.return_value = {"ShardIterator": "return value"}

    shard_iterator = session.get_shard_iterator(
        stream_arn="arn",
        shard_id="shard_id",
        iterator_type="after_sequence",
        sequence_number="sequence-123"
    )
    assert shard_iterator == "return value"
    dynamodbstreams.get_shard_iterator.assert_called_once_with(
        StreamArn="arn",
        ShardId="shard_id",
        ShardIteratorType="AFTER_SEQUENCE_NUMBER",
        SequenceNumber="sequence-123"
    )


def test_get_shard_iterator_latest(dynamodbstreams, session):
    """"latest" is sent as LATEST with no SequenceNumber in the request."""
    dynamodbstreams.get_shard_iterator.return_value = {"ShardIterator": "return value"}

    shard_iterator = session.get_shard_iterator(
        stream_arn="arn",
        shard_id="shard_id",
        iterator_type="latest"
    )
    assert shard_iterator == "return value"
    dynamodbstreams.get_shard_iterator.assert_called_once_with(
        StreamArn="arn",
        ShardId="shard_id",
        ShardIteratorType="LATEST"
    )


# END GET SHARD ITERATOR ====================================================================== END GET SHARD ITERATOR


# GET STREAM RECORDS ============================================================================== GET STREAM RECORDS


def test_get_trimmed_records(dynamodbstreams, session):
    """TrimmedDataAccessException from get_records surfaces as RecordsExpired."""
    dynamodbstreams.get_records.side_effect = client_error("TrimmedDataAccessException")
    with pytest.raises(RecordsExpired):
        session.get_stream_records(iterator_id="iterator-123")


# NOTE(review): the visible source ends partway through the next test; only the
# statement that is actually present is reproduced here.
def test_get_records_expired_iterator(dynamodbstreams, session):
    dynamodbstreams.get_records.side_effect = client_error("ExpiredIteratorException")
with pytest.raises(ShardIteratorExpired):816 session.get_stream_records("some-iterator")817def test_get_shard_records_unknown_error(dynamodbstreams, session):818 cause = dynamodbstreams.get_records.side_effect = client_error("FooError")819 with pytest.raises(BloopException) as excinfo:820 session.get_stream_records("iterator-123")821 assert excinfo.value.__cause__ is cause822def test_get_records(dynamodbstreams, session):823 # Return structure isn't important, since it's just a passthrough824 response = dynamodbstreams.get_records.return_value = {"return": "value"}825 records = session.get_stream_records(iterator_id="some-iterator")826 assert records is response827 dynamodbstreams.get_records.assert_called_once_with(ShardIterator="some-iterator")828# END GET STREAM RECORDS ====================================================================== END GET STREAM RECORDS829# TRANSACTION READ ================================================================================== TRANSACTION READ830def test_transaction_read(dynamodb, session):831 response = dynamodb.transact_get_items.return_value = {"Responses": ["placeholder"]}832 result = session.transaction_read("some-items")833 assert result is response834 dynamodb.transact_get_items.assert_called_once_with(TransactItems="some-items")835def test_transaction_read_canceled(dynamodb, session):836 cause = dynamodb.transact_get_items.side_effect = client_error("TransactionCanceledException")837 with pytest.raises(TransactionCanceled) as excinfo:838 session.transaction_read("some-items")839 assert excinfo.value.__cause__ is cause840def test_transaction_read_unknown_error(dynamodb, session):841 cause = dynamodb.transact_get_items.side_effect = client_error("FooError")842 with pytest.raises(BloopException) as excinfo:843 session.transaction_read("some-items")844 assert excinfo.value.__cause__ is cause845# END TRANSACTION READ ========================================================================== END TRANSACTION READ846# 
TRANSACTION WRITE ================================================================================ TRANSACTION WRITE847def test_transaction_write(dynamodb, session):848 session.transaction_write("some-items", "some-token")849 dynamodb.transact_write_items.assert_called_once_with(TransactItems="some-items", ClientRequestToken="some-token")850def test_transaction_write_canceled(dynamodb, session):851 cause = dynamodb.transact_write_items.side_effect = client_error("TransactionCanceledException")852 with pytest.raises(TransactionCanceled) as excinfo:853 session.transaction_write("some-items", "some-token")854 assert excinfo.value.__cause__ is cause855def test_transaction_write_unknown_error(dynamodb, session):856 cause = dynamodb.transact_write_items.side_effect = client_error("FooError")857 with pytest.raises(BloopException) as excinfo:858 session.transaction_write("some-items", "some-token")859 assert excinfo.value.__cause__ is cause860# END TRANSACTION WRITE ======================================================================== END TRANSACTION WRITE861# COMPARE TABLES ====================================================================================== COMPARE TABLES862def remove_index_by_name(description, to_remove):863 for index_type in ["GlobalSecondaryIndexes", "LocalSecondaryIndexes"]:864 description[index_type] = [865 index866 for index in description[index_type]867 if index["IndexName"] != to_remove868 ]869def find_index(description, index_name):870 for index_type in ["GlobalSecondaryIndexes", "LocalSecondaryIndexes"]:871 for index in description[index_type]:872 if index["IndexName"] == index_name:873 return index874 raise RuntimeError("test setup failed to find expected index by name")875def any_index(model, index_type, require_attributes=False):876 indexes = getattr(model.Meta, index_type)877 for index in indexes:878 if index.projection["mode"] != "keys" or not require_attributes:879 return index880 raise RuntimeError("test setup failed to find a usable 
index")881def test_compare_table_sanity_check(model, logger):882 """By default the test setup should provide a fully-valid description of the table.883 Without this sanity check, any test that makes the description slightly invalid wouldn't actually884 verify the compare_table method is failing as expected.885 """886 description = description_for(model)887 assert compare_tables(model, description)888 assert not logger.caplog.record_tuples889def test_compare_table_simple(basic_model):890 """A minimal model that doesn't care about streaming, ttl, or encryption and has no indexes"""891 description = description_for(basic_model)892 assert compare_tables(basic_model, description)893def test_compare_table_wrong_encryption_enabled(model, logger):894 description = description_for(model)895 description["SSEDescription"]["Status"] = "DISABLED"896 assert not compare_tables(model, description)897 logger.assert_only_logged("Model expects SSE to be 'ENABLED' but was 'DISABLED'")898def test_compare_table_wrong_backups_enabled(model, logger):899 description = description_for(model)900 description["ContinuousBackupsDescription"]["ContinuousBackupsStatus"] = "DISABLED"901 assert not compare_tables(model, description)902 logger.assert_only_logged("Model expects backups to be 'ENABLED' but was 'DISABLED'")903def test_compare_table_wrong_stream_enabled(model, logger):904 description = description_for(model)905 description["StreamSpecification"]["StreamEnabled"] = False906 assert not compare_tables(model, description)907 logger.assert_only_logged("Model expects streaming but streaming is not enabled")908def test_compare_table_wrong_stream_type(model, logger):909 description = description_for(model)910 description["StreamSpecification"]["StreamViewType"] = "UNKNOWN"911 assert not compare_tables(model, description)912 logger.assert_only_logged("Model expects StreamViewType 'NEW_AND_OLD_IMAGES' but was 'UNKNOWN'")913def test_compare_table_wrong_ttl_enabled(model, logger):914 description = 
description_for(model)915 description["TimeToLiveDescription"]["TimeToLiveStatus"] = "DISABLED"916 assert not compare_tables(model, description)917 logger.assert_only_logged("Model expects ttl but ttl is not enabled")918def test_compare_table_wrong_ttl_column(model, logger):919 description = description_for(model)920 description["TimeToLiveDescription"]["AttributeName"] = "wrong_column"921 assert not compare_tables(model, description)922 logger.assert_only_logged("Model expects ttl column to be 'expiry' but was 'wrong_column'")923@pytest.mark.parametrize("expected, wire", [924 ("on_demand", "provisioned"),925 ("provisioned", "on_demand")926])927def test_compare_table_wrong_billing_mode(expected, wire, model, logger):928 description = description_for(model)929 description["BillingModeSummary"]["BillingMode"] = {930 "on_demand": "PAY_PER_REQUEST",931 "provisioned": "PROVISIONED"932 }[wire]933 model.Meta.billing["mode"] = expected934 assert not compare_tables(model, description)935 logger.assert_only_logged(f"Model expects billing mode to be '{expected}' but was '{wire}'")936def test_compare_table_wrong_provisioned_throughput(model, logger):937 description = description_for(model)938 description["ProvisionedThroughput"]["ReadCapacityUnits"] = 200939 description["ProvisionedThroughput"]["WriteCapacityUnits"] = -100940 assert not compare_tables(model, description)941 logger.assert_logged("Model expects 3 read units but was 200")942 logger.assert_logged("Model expects 7 write units but was -100")943@pytest.mark.parametrize("index_type", ["gsis", "lsis"])944def test_compare_table_missing_index(index_type, model, logger):945 index_name = any_index(model, index_type).dynamo_name946 description = description_for(model)947 remove_index_by_name(description, index_name)948 assert not compare_tables(model, description)949 logger.assert_only_logged(f"Table is missing expected index '{index_name}'")950@pytest.mark.parametrize("index_type", ["gsis", "lsis"])951def 
test_compare_table_wrong_index_key_schema(index_type, model, logger):952 index = any_index(model, index_type)953 description = description_for(model)954 # drop the last entry in the key schema to ensure it's invalid955 index_description = find_index(description, index.dynamo_name)956 index_description["KeySchema"] = index_description["KeySchema"][:-1]957 assert not compare_tables(model, description)958 logger.assert_only_logged(f"KeySchema mismatch for index '{index.dynamo_name}'")959@pytest.mark.parametrize("index_type", ["gsis", "lsis"])960def test_compare_table_wrong_index_projection_type(index_type, model, logger):961 index = any_index(model, index_type)962 description = description_for(model)963 index_description = find_index(description, index.dynamo_name)964 index_description["Projection"]["ProjectionType"] = "UnknownProjectionType"965 assert not compare_tables(model, description)966 logger.assert_logged(f"Projection mismatch for index '{index.dynamo_name}'")967 logger.assert_logged("unexpected index ProjectionType 'UnknownProjectionType'", level=logging.INFO)968@pytest.mark.parametrize("index_type", ["gsis", "lsis"])969def test_compare_table_missing_index_projection_attributes(index_type, model, logger):970 index = any_index(model, index_type, require_attributes=True)971 description = description_for(model)972 index_description = find_index(description, index.dynamo_name)973 index_description["Projection"]["NonKeyAttributes"] = []974 # Since an index projecting "ALL" short-circuits the superset check, we need to advertise975 # a different valid but insufficient projection type976 if index_description["Projection"]["ProjectionType"] == "ALL":977 index_description["Projection"]["ProjectionType"] = "INCLUDE"978 assert not compare_tables(model, description)979 logger.assert_only_logged(f"Projection mismatch for index '{index.dynamo_name}'")980@pytest.mark.parametrize("unit_type", ["ReadCapacityUnits", "WriteCapacityUnits"])981def 
test_compare_table_wrong_gsi_throughput(unit_type, model, logger):982 index = any_index(model, "gsis")983 description = description_for(model)984 # set the capacity units to an impossible value985 index_description = find_index(description, index.dynamo_name)986 index_description["ProvisionedThroughput"][unit_type] = -1987 assert not compare_tables(model, description)988 logger.assert_only_logged(f"ProvisionedThroughput.{unit_type} mismatch for index '{index.dynamo_name}'")989def test_compare_table_missing_attribute(model, logger):990 description = description_for(model)991 attribute = description["AttributeDefinitions"].pop(-1)992 name = attribute["AttributeName"]993 assert not compare_tables(model, description)994 logger.assert_only_logged(f"Table is missing expected attribute '{name}'")995def test_compare_table_wrong_attribute_type(model, logger):996 description = description_for(model)997 attribute = description["AttributeDefinitions"][-1]998 attribute["AttributeType"] = "B"999 name = attribute["AttributeName"]1000 assert not compare_tables(model, description)1001 logger.assert_only_logged(f"AttributeDefinition mismatch for attribute '{name}'")1002def test_compare_table_extra_indexes(basic_model, model):1003 description = description_for(basic_model)1004 extended = description_for(model)1005 description["GlobalSecondaryIndexes"] = extended["GlobalSecondaryIndexes"]1006 description["LocalSecondaryIndexes"] = extended["LocalSecondaryIndexes"]1007 assert compare_tables(basic_model, description)1008@pytest.mark.parametrize("index_type", ["gsis", "lsis"])1009def test_compare_table_index_superset(index_type, model, capsys):1010 index = any_index(model, index_type)1011 description = description_for(model)1012 index_description = find_index(description, index.dynamo_name)1013 index_description["Projection"]["NonKeyAttributes"].append("AdditionalAttribute")1014 with capsys.disabled():1015 assert compare_tables(model, description)1016def 
test_compare_table_extra_attribute(basic_model, model):1017 description = description_for(basic_model)1018 extended = description_for(model)1019 description["AttributeDefinitions"].extend(extended["AttributeDefinitions"])1020 assert compare_tables(basic_model, description)1021# END COMPARE TABLES ============================================================================== END COMPARE TABLES1022# OTHER TABLE HELPERS ============================================================================ OTHER TABLE HELPERS1023def assert_unordered(obj, other):1024 assert ordered(obj) == ordered(other)1025def test_create_simple():1026 expected = {1027 'AttributeDefinitions': [1028 {'AttributeName': 'id', 'AttributeType': 'S'}],1029 'BillingMode': 'PROVISIONED',1030 'KeySchema': [{'AttributeName': 'id', 'KeyType': 'HASH'}],1031 'ProvisionedThroughput': {1032 'ReadCapacityUnits': 1,1033 'WriteCapacityUnits': 1},1034 'TableName': 'Simple'}1035 assert_unordered(create_table_request("Simple", SimpleModel), expected)1036def test_create_complex():1037 expected = {1038 'AttributeDefinitions': [1039 {'AttributeType': 'S', 'AttributeName': 'date'},1040 {'AttributeType': 'S', 'AttributeName': 'email'},1041 {'AttributeType': 'S', 'AttributeName': 'joined'},1042 {'AttributeType': 'S', 'AttributeName': 'name'}],1043 'BillingMode': 'PROVISIONED',1044 'GlobalSecondaryIndexes': [{1045 'IndexName': 'by_email',1046 'KeySchema': [{'KeyType': 'HASH', 'AttributeName': 'email'}],1047 'Projection': {'ProjectionType': 'ALL'},1048 'ProvisionedThroughput': {1049 'ReadCapacityUnits': 4, 'WriteCapacityUnits': 5}}],1050 'KeySchema': [{'KeyType': 'HASH', 'AttributeName': 'name'},1051 {'KeyType': 'RANGE', 'AttributeName': 'date'}],1052 'LocalSecondaryIndexes': [{1053 'IndexName': 'by_joined',1054 'KeySchema': [1055 {'KeyType': 'HASH', 'AttributeName': 'name'},1056 {'KeyType': 'RANGE', 'AttributeName': 'joined'}],1057 'Projection': {1058 'NonKeyAttributes': ['joined', 'email', 'date', 'name'],1059 
'ProjectionType': 'INCLUDE'}}],1060 'ProvisionedThroughput': {1061 'ReadCapacityUnits': 3, 'WriteCapacityUnits': 2},1062 'TableName': 'CustomTableName'}1063 assert_unordered(create_table_request("CustomTableName", ComplexModel), expected)1064def test_create_table_no_stream():1065 """No StreamSpecification if Model.Meta.stream is None"""1066 class Model(BaseModel):1067 class Meta:1068 stream = None1069 id = Column(String, hash_key=True)1070 table = create_table_request("Model", Model)1071 assert "StreamSpecification" not in table1072@pytest.mark.parametrize("include, view_type", [1073 (["keys"], "KEYS_ONLY"),1074 (["new"], "NEW_IMAGE"),1075 (["old"], "OLD_IMAGE"),1076 (["new", "old"], "NEW_AND_OLD_IMAGES"),1077])1078def test_create_table_with_stream(include, view_type):1079 """A table that streams only new images"""1080 class Model(BaseModel):1081 class Meta:1082 stream = {"include": include}1083 id = Column(String, hash_key=True)1084 table = create_table_request("Model", Model)1085 assert table["StreamSpecification"] == {"StreamEnabled": True, "StreamViewType": view_type}1086@pytest.mark.parametrize("sse_encryption", [True, False])1087def test_create_table_with_encryption(sse_encryption):1088 """A table that specifies encryption settings"""1089 class Model(BaseModel):1090 class Meta:1091 encryption = {"enabled": sse_encryption}1092 id = Column(String, hash_key=True)1093 table = create_table_request("Model", Model)1094 assert table["SSESpecification"] == {"Enabled": bool(sse_encryption)}1095@pytest.mark.parametrize("table_status, gsi_status, expected_status", [1096 ("ACTIVE", "ACTIVE", ready),1097 ("ACTIVE", None, ready),1098 ("ACTIVE", "BUSY", None),1099 ("BUSY", "ACTIVE", None),1100 ("BUSY", "BUSY", None)1101])1102def test_simple_status(table_status, gsi_status, expected_status):1103 """Status is busy because table isn't ACTIVE, no GSIs"""1104 description = {"TableStatus": table_status}1105 if gsi_status is not None:1106 description["GlobalSecondaryIndexes"] = 
[{"IndexStatus": gsi_status}]1107 assert simple_table_status(description) == expected_status...
test_backup.py
Source:test_backup.py
# ... table is created, it has continuous backups disabled - but a few seconds
# later, it suddenly reports that they are enabled! I don't know how to
# explain this, but I believe this is not a behavior we should reproduce.
def test_describe_continuous_backups_without_continuous_backups(test_table):
    client = test_table.meta.client
    response = client.describe_continuous_backups(TableName=test_table.name)
    print(response)
    # Walk the response one level at a time so a missing key fails on its own assert.
    assert 'ContinuousBackupsDescription' in response
    backups = response['ContinuousBackupsDescription']
    assert 'ContinuousBackupsStatus' in backups
    assert backups['ContinuousBackupsStatus'] == 'DISABLED'
    assert 'PointInTimeRecoveryDescription' in backups
    assert backups['PointInTimeRecoveryDescription'] == {'PointInTimeRecoveryStatus': 'DISABLED'}


# Test the DescribeContinuousBackups operation on a table that doesn't
# exist. It should report a TableNotFoundException - not that continuous
# backups are disabled.
def test_describe_continuous_backups_nonexistent(test_table):
    missing_name = test_table.name + 'nonexistent'
    with pytest.raises(ClientError, match='TableNotFoundException'):
        test_table.meta.client.describe_continuous_backups(TableName=missing_name)


# Test the DescribeContinuousBackups operation without a table name.
# It should fail with ValidationException.
def test_describe_continuous_backups_missing(test_table):
    with pytest.raises(ClientError, match='ValidationException'):
        ...  # body not shown: the excerpt is cut off at this point
export_table_s3.py
Source:export_table_s3.py
...12# Create low level client13session = boto3.session.Session()14client = session.client('dynamodb')15pitr_status = False16backup_response = client.describe_continuous_backups(TableName=table_name)17if backup_response.get("ContinuousBackupsDescription").get("PointInTimeRecoveryDescription").get("PointInTimeRecoveryStatus") == "ENABLED":18 pitr_status = True19print(f"PITR is enabled: {pitr_status}")20if pitr_status:21 try:22 response = client.export_table_to_point_in_time(23 TableArn=table_arn,24 ExportTime=datetime(2021, 12, 16, 16, 45, 0, tzinfo=tzlocal()),25 S3Bucket=s3_bucket_name,26 S3Prefix='DB_BACKUP',27 S3SseAlgorithm='AES256',28 ExportFormat='DYNAMODB_JSON'29 )30 print(response)...
Learn to run automation tests from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!