Best Python code snippet using localstack_python
provider.py
Source:provider.py
...288 def forward_request(289 self, context: RequestContext, service_request: ServiceRequest = None290 ) -> ServiceResponse:291 # check rate limiting for this request and raise an error, if provisioned throughput is exceeded292 self.check_provisioned_throughput(context.operation.name)293 # note: modifying headers in-place here before forwarding the request294 self.prepare_request_headers(context.request.headers)295 return self.request_forwarder(context, service_request)296 def get_forward_url(self) -> str:297 """Return the URL of the backend DynamoDBLocal server to forward requests to"""298 return f"http://{LOCALHOST}:{server.get_server().port}"299 def on_before_start(self):300 start_dynamodb()301 wait_for_dynamodb()302 def handle_shell_ui_redirect(self, request: werkzeug.Request) -> Response:303 headers = {"Refresh": f"0; url={config.service_url('dynamodb')}/shell/index.html"}304 return Response("", headers=headers)305 def handle_shell_ui_request(self, request: werkzeug.Request, req_path: str) -> Response:306 # TODO: "DynamoDB Local Web Shell was deprecated with version 1.16.X and is not available any307 # longer from 1.17.X to latest. There are no immediate plans for a new Web Shell to be introduced."308 # -> keeping this for now, to allow configuring custom installs; should consider removing it in the future309 # https://repost.aws/questions/QUHyIzoEDqQ3iOKlUEp1LPWQ#ANdBm9Nz9TRf6VqR3jZtcA1g310 req_path = f"/{req_path}" if not req_path.startswith("/") else req_path311 url = f"{self.get_forward_url()}/shell{req_path}"312 result = requests.request(313 method=request.method, url=url, headers=request.headers, data=request.data314 )315 return Response(result.content, headers=dict(result.headers), status=result.status_code)316 @handler("CreateTable", expand=False)317 def create_table(318 self,319 context: RequestContext,320 create_table_input: CreateTableInput,321 ) -> CreateTableOutput:322 # Check if table exists, to avoid error log output from DynamoDBLocal323 table_name = create_table_input["TableName"]324 if self.table_exists(table_name):325 raise ResourceInUseException("Cannot create preexisting table")326 billing_mode = create_table_input.get("BillingMode")327 provisioned_throughput = create_table_input.get("ProvisionedThroughput")328 if billing_mode == BillingMode.PAY_PER_REQUEST and provisioned_throughput is not None:329 raise ValidationException(330 "One or more parameter values were invalid: Neither ReadCapacityUnits nor WriteCapacityUnits can be "331 "specified when BillingMode is PAY_PER_REQUEST"332 )333 # forward request to backend334 result = self.forward_request(context)335 table_description = result["TableDescription"]336 backend = DynamoDBRegion.get()337 backend.table_definitions[table_name] = table_definitions = dict(create_table_input)338 if "TableId" not in table_definitions:339 table_definitions["TableId"] = long_uid()340 if "SSESpecification" in table_definitions:341 sse_specification = table_definitions.pop("SSESpecification")342 table_definitions["SSEDescription"] = SSEUtils.get_sse_description(sse_specification)343 if table_definitions:344 table_content = result.get("Table", {})345 table_content.update(table_definitions)346 table_description.update(table_content)347 if "StreamSpecification" in table_definitions:348 create_dynamodb_stream(table_definitions, table_description.get("LatestStreamLabel"))349 if "TableClass" in table_definitions:350 table_class = table_description.pop("TableClass", None) or table_definitions.pop(351 "TableClass"352 )353 table_description["TableClassSummary"] = {"TableClass": table_class}354 tags = table_definitions.pop("Tags", [])355 if tags:356 table_arn = table_description["TableArn"]357 table_arn = self.fix_table_arn(table_arn)358 DynamoDBRegion.TABLE_TAGS[table_arn] = {tag["Key"]: tag["Value"] for tag in tags}359 # remove invalid attributes from result360 table_description.pop("Tags", None)361 table_description.pop("BillingMode", None)362 event_publisher.fire_event(363 event_publisher.EVENT_DYNAMODB_CREATE_TABLE,364 payload={"n": event_publisher.get_hash(table_name)},365 )366 return result367 def delete_table(self, context: RequestContext, table_name: TableName) -> DeleteTableOutput:368 # Check if table exists, to avoid error log output from DynamoDBLocal369 if not self.table_exists(table_name):370 raise ResourceNotFoundException("Cannot do operations on a non-existent table")371 # forward request to backend372 result = self.forward_request(context)373 event_publisher.fire_event(374 event_publisher.EVENT_DYNAMODB_DELETE_TABLE,375 payload={"n": event_publisher.get_hash(table_name)},376 )377 table_arn = result.get("TableDescription", {}).get("TableArn")378 table_arn = self.fix_table_arn(table_arn)379 self.delete_all_event_source_mappings(table_arn)380 dynamodbstreams_api.delete_streams(table_arn)381 DynamoDBRegion.TABLE_TAGS.pop(table_arn, None)382 return result383 def describe_table(self, context: RequestContext, table_name: TableName) -> DescribeTableOutput:384 # Check if table exists, to avoid error log output from DynamoDBLocal385 if not self.table_exists(table_name):386 raise ResourceNotFoundException("Cannot do operations on a non-existent table")387 # forward request to backend388 result = self.forward_request(context)389 # update response with additional props390 table_props = DynamoDBRegion.get().table_properties.get(table_name)391 if table_props:392 result.get("Table", {}).update(table_props)393 # update only TableId and SSEDescription if present394 table_definitions = DynamoDBRegion.get().table_definitions.get(table_name)395 if table_definitions:396 for key in ["TableId", "SSEDescription"]:397 if table_definitions.get(key):398 result.get("Table", {})[key] = table_definitions[key]399 if "TableClass" in table_definitions:400 result.get("Table", {})["TableClassSummary"] = {401 "TableClass": table_definitions["TableClass"]402 }403 return result404 @handler("UpdateTable", expand=False)405 def update_table(406 self, context: RequestContext, update_table_input: UpdateTableInput407 ) -> UpdateTableOutput:408 try:409 # forward request to backend410 result = self.forward_request(context)411 except CommonServiceException as e:412 is_no_update_error = (413 e.code == "ValidationException" and "Nothing to update" in e.message414 )415 if not is_no_update_error or not list(416 {"TableClass", "ReplicaUpdates"} & set(update_table_input.keys())417 ):418 raise419 table_name = update_table_input.get("TableName")420 if update_table_input.get("TableClass"):421 table_definitions = DynamoDBRegion.get().table_definitions.setdefault(422 table_name, {}423 )424 table_definitions["TableClass"] = update_table_input.get("TableClass")425 if update_table_input.get("ReplicaUpdates"):426 # update local table props (replicas)427 table_properties = DynamoDBRegion.get().table_properties428 table_properties[table_name] = table_props = table_properties.get(table_name) or {}429 table_props["Replicas"] = replicas = table_props.get("Replicas") or []430 for repl_update in update_table_input["ReplicaUpdates"]:431 for key, details in repl_update.items():432 region = details.get("RegionName")433 if key == "Create":434 details["ReplicaStatus"] = details.get("ReplicaStatus") or "ACTIVE"435 replicas.append(details)436 if key == "Update":437 replica = [r for r in replicas if r.get("RegionName") == region]438 if replica:439 replica[0].update(details)440 if key == "Delete":441 table_props["Replicas"] = [442 r for r in replicas if r.get("RegionName") != region443 ]444 # update response content445 schema = SchemaExtractor.get_table_schema(table_name)446 return UpdateTableOutput(TableDescription=schema["Table"])447 if "StreamSpecification" in update_table_input:448 create_dynamodb_stream(449 update_table_input, result["TableDescription"].get("LatestStreamLabel")450 )451 return result452 def list_tables(453 self,454 context: RequestContext,455 exclusive_start_table_name: TableName = None,456 limit: ListTablesInputLimit = None,457 ) -> ListTablesOutput:458 return self.forward_request(context)459 @handler("PutItem", expand=False)460 def put_item(self, context: RequestContext, put_item_input: PutItemInput) -> PutItemOutput:461 existing_item = None462 table_name = put_item_input["TableName"]463 event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(table_name)464 if event_sources_or_streams_enabled:465 existing_item = ItemFinder.find_existing_item(put_item_input)466 # forward request to backend467 self.fix_return_consumed_capacity(put_item_input)468 result = self.forward_request(context, put_item_input)469 # Get stream specifications details for the table470 if event_sources_or_streams_enabled:471 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)472 item = put_item_input["Item"]473 # prepare record keys474 keys = SchemaExtractor.extract_keys(item=item, table_name=table_name)475 # create record476 record = self.get_record_template()477 record["eventName"] = "INSERT" if not existing_item else "MODIFY"478 record["dynamodb"].update(479 {480 "Keys": keys,481 "NewImage": item,482 "SizeBytes": len(json.dumps(item)),483 }484 )485 if stream_spec:486 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]487 if existing_item:488 record["dynamodb"]["OldImage"] = existing_item489 self.forward_stream_records([record], table_name=table_name)490 return result491 @handler("DeleteItem", expand=False)492 def delete_item(493 self,494 context: RequestContext,495 delete_item_input: DeleteItemInput,496 ) -> DeleteItemOutput:497 existing_item = None498 table_name = delete_item_input["TableName"]499 if has_event_sources_or_streams_enabled(table_name):500 existing_item = ItemFinder.find_existing_item(delete_item_input)501 # forward request to backend502 self.fix_return_consumed_capacity(delete_item_input)503 result = self.forward_request(context, delete_item_input)504 # determine and forward stream record505 if existing_item:506 event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(table_name)507 if event_sources_or_streams_enabled:508 # create record509 record = self.get_record_template()510 record["eventName"] = "REMOVE"511 record["dynamodb"].update(512 {513 "Keys": delete_item_input["Key"],514 "OldImage": existing_item,515 "SizeBytes": len(json.dumps(existing_item)),516 }517 )518 # Get stream specifications details for the table519 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)520 if stream_spec:521 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]522 self.forward_stream_records([record], table_name=table_name)523 return result524 @handler("UpdateItem", expand=False)525 def update_item(526 self,527 context: RequestContext,528 update_item_input: UpdateItemInput,529 ) -> UpdateItemOutput:530 existing_item = None531 table_name = update_item_input["TableName"]532 event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(table_name)533 if event_sources_or_streams_enabled:534 existing_item = ItemFinder.find_existing_item(update_item_input)535 # forward request to backend536 self.fix_return_consumed_capacity(update_item_input)537 result = self.forward_request(context, update_item_input)538 # construct and forward stream record539 if event_sources_or_streams_enabled:540 updated_item = ItemFinder.find_existing_item(update_item_input)541 if updated_item:542 record = self.get_record_template()543 record["eventName"] = "INSERT" if not existing_item else "MODIFY"544 record["dynamodb"].update(545 {546 "Keys": update_item_input["Key"],547 "NewImage": updated_item,548 "SizeBytes": len(json.dumps(updated_item)),549 }550 )551 if existing_item:552 record["dynamodb"]["OldImage"] = existing_item553 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)554 if stream_spec:555 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]556 self.forward_stream_records([record], table_name=table_name)557 return result558 @handler("GetItem", expand=False)559 def get_item(self, context: RequestContext, get_item_input: GetItemInput) -> GetItemOutput:560 result = self.forward_request(context)561 self.fix_consumed_capacity(get_item_input, result)562 return result563 @handler("Query", expand=False)564 def query(self, context: RequestContext, query_input: QueryInput) -> QueryOutput:565 index_name = query_input.get("IndexName")566 if index_name:567 if not is_index_query_valid(query_input):568 raise ValidationException(569 "One or more parameter values were invalid: Select type ALL_ATTRIBUTES "570 "is not supported for global secondary index id-index because its projection "571 "type is not ALL",572 )573 result = self.forward_request(context)574 self.fix_consumed_capacity(query_input, result)575 return result576 @handler("Scan", expand=False)577 def scan(self, context: RequestContext, scan_input: ScanInput) -> ScanOutput:578 return self.forward_request(context)579 @handler("BatchWriteItem", expand=False)580 def batch_write_item(581 self,582 context: RequestContext,583 batch_write_item_input: BatchWriteItemInput,584 ) -> BatchWriteItemOutput:585 existing_items = []586 unprocessed_put_items = []587 unprocessed_delete_items = []588 request_items = batch_write_item_input["RequestItems"]589 for table_name in sorted(request_items.keys()):590 for request in request_items[table_name]:591 for key in ["PutRequest", "DeleteRequest"]:592 inner_request = request.get(key)593 if inner_request:594 if self.should_throttle("BatchWriteItem"):595 if key == "PutRequest":596 unprocessed_put_items.append(inner_request)597 elif key == "DeleteRequest":598 unprocessed_delete_items.append(inner_request)599 else:600 item = ItemFinder.find_existing_item(inner_request, table_name)601 existing_items.append(item)602 # forward request to backend603 result = self.forward_request(context)604 # determine and forward stream records605 request_items = batch_write_item_input["RequestItems"]606 records, unprocessed_items = self.prepare_batch_write_item_records(607 request_items=request_items,608 unprocessed_put_items=unprocessed_put_items,609 unprocessed_delete_items=unprocessed_delete_items,610 existing_items=existing_items,611 )612 streams_enabled_cache = {}613 event_sources_or_streams_enabled = False614 for record in records:615 event_sources_or_streams_enabled = (616 event_sources_or_streams_enabled617 or has_event_sources_or_streams_enabled(618 record["eventSourceARN"], streams_enabled_cache619 )620 )621 if event_sources_or_streams_enabled:622 self.forward_stream_records(records)623 # update response624 if any(unprocessed_items):625 table_name = list(request_items.keys())[0]626 unprocessed = result["UnprocessedItems"]627 if table_name not in unprocessed:628 unprocessed[table_name] = []629 for key in ["PutRequest", "DeleteRequest"]:630 if any(unprocessed_items[key]):631 unprocessed_items[table_name].append({key: unprocessed_items[key]})632 for key in list(unprocessed.keys()):633 if not unprocessed.get(key):634 del unprocessed[key]635 return result636 @handler("BatchGetItem")637 def batch_get_item(638 self,639 context: RequestContext,640 request_items: BatchGetRequestMap,641 return_consumed_capacity: ReturnConsumedCapacity = None,642 ) -> BatchGetItemOutput:643 return self.forward_request(context)644 @handler("TransactWriteItems", expand=False)645 def transact_write_items(646 self,647 context: RequestContext,648 transact_write_items_input: TransactWriteItemsInput,649 ) -> TransactWriteItemsOutput:650 existing_items = []651 for item in transact_write_items_input["TransactItems"]:652 for key in ["Put", "Update", "Delete"]:653 inner_item = item.get(key)654 if inner_item:655 existing_items.append(ItemFinder.find_existing_item(inner_item))656 # forward request to backend657 result = self.forward_request(context)658 # determine and forward stream records659 streams_enabled_cache = {}660 records = self.prepare_transact_write_item_records(661 transact_items=transact_write_items_input["TransactItems"],662 existing_items=existing_items,663 )664 event_sources_or_streams_enabled = False665 for record in records:666 event_sources_or_streams_enabled = (667 event_sources_or_streams_enabled668 or has_event_sources_or_streams_enabled(669 record["eventSourceARN"], streams_enabled_cache670 )671 )672 if event_sources_or_streams_enabled:673 self.forward_stream_records(records)674 return result675 @handler("TransactGetItems", expand=False)676 def transact_get_items(677 self,678 context: RequestContext,679 transact_items: TransactGetItemList,680 return_consumed_capacity: ReturnConsumedCapacity = None,681 ) -> TransactGetItemsOutput:682 return self.forward_request(context)683 @handler("ExecuteTransaction", expand=False)684 def execute_transaction(685 self, context: RequestContext, execute_transaction_input: ExecuteTransactionInput686 ) -> ExecuteTransactionOutput:687 result = self.forward_request(context)688 return result689 @handler("ExecuteStatement", expand=False)690 def execute_statement(691 self,692 context: RequestContext,693 execute_statement_input: ExecuteStatementInput,694 ) -> ExecuteStatementOutput:695 statement = execute_statement_input["Statement"]696 table_name = extract_table_name_from_partiql_update(statement)697 existing_items = None698 if table_name and has_event_sources_or_streams_enabled(table_name):699 # Note: fetching the entire list of items is hugely inefficient, especially for larger tables700 # TODO: find a mechanism to hook into the PartiQL update mechanism of DynamoDB Local directly!701 existing_items = ItemFinder.list_existing_items_for_statement(statement)702 # forward request to backend703 result = self.forward_request(context)704 # construct and forward stream record705 event_sources_or_streams_enabled = table_name and has_event_sources_or_streams_enabled(706 table_name707 )708 if event_sources_or_streams_enabled:709 records = get_updated_records(table_name, existing_items)710 self.forward_stream_records(records, table_name=table_name)711 return result712 def tag_resource(713 self, context: RequestContext, resource_arn: ResourceArnString, tags: TagList714 ) -> None:715 table_tags = DynamoDBRegion.TABLE_TAGS716 if resource_arn not in table_tags:717 table_tags[resource_arn] = {}718 table_tags[resource_arn].update({tag["Key"]: tag["Value"] for tag in tags})719 def untag_resource(720 self, context: RequestContext, resource_arn: ResourceArnString, tag_keys: TagKeyList721 ) -> None:722 for tag_key in tag_keys or []:723 DynamoDBRegion.TABLE_TAGS.get(resource_arn, {}).pop(tag_key, None)724 def list_tags_of_resource(725 self,726 context: RequestContext,727 resource_arn: ResourceArnString,728 next_token: NextTokenString = None,729 ) -> ListTagsOfResourceOutput:730 result = [731 {"Key": k, "Value": v}732 for k, v in DynamoDBRegion.TABLE_TAGS.get(resource_arn, {}).items()733 ]734 return ListTagsOfResourceOutput(Tags=result)735 def describe_time_to_live(736 self, context: RequestContext, table_name: TableName737 ) -> DescribeTimeToLiveOutput:738 backend = DynamoDBRegion.get()739 ttl_spec = backend.ttl_specifications.get(table_name)740 result = {"TimeToLiveStatus": "DISABLED"}741 if ttl_spec:742 if ttl_spec.get("Enabled"):743 ttl_status = "ENABLED"744 else:745 ttl_status = "DISABLED"746 result = {747 "AttributeName": ttl_spec.get("AttributeName"),748 "TimeToLiveStatus": ttl_status,749 }750 return DescribeTimeToLiveOutput(TimeToLiveDescription=result)751 def update_time_to_live(752 self,753 context: RequestContext,754 table_name: TableName,755 time_to_live_specification: TimeToLiveSpecification,756 ) -> UpdateTimeToLiveOutput:757 # TODO: TTL status is maintained/mocked but no real expiry is happening for items758 backend = DynamoDBRegion.get()759 backend.ttl_specifications[table_name] = time_to_live_specification760 return UpdateTimeToLiveOutput(TimeToLiveSpecification=time_to_live_specification)761 def create_global_table(762 self, context: RequestContext, global_table_name: TableName, replication_group: ReplicaList763 ) -> CreateGlobalTableOutput:764 if global_table_name in DynamoDBRegion.GLOBAL_TABLES:765 raise GlobalTableAlreadyExistsException("Global table with this name already exists")766 replication_group = [grp.copy() for grp in replication_group or []]767 data = {"GlobalTableName": global_table_name, "ReplicationGroup": replication_group}768 DynamoDBRegion.GLOBAL_TABLES[global_table_name] = data769 for group in replication_group:770 group["ReplicaStatus"] = "ACTIVE"771 group["ReplicaStatusDescription"] = "Replica active"772 return CreateGlobalTableOutput(GlobalTableDescription=data)773 def describe_global_table(774 self, context: RequestContext, global_table_name: TableName775 ) -> DescribeGlobalTableOutput:776 details = DynamoDBRegion.GLOBAL_TABLES.get(global_table_name)777 if not details:778 raise GlobalTableNotFoundException("Global table with this name does not exist")779 return DescribeGlobalTableOutput(GlobalTableDescription=details)780 def list_global_tables(781 self,782 context: RequestContext,783 exclusive_start_global_table_name: TableName = None,784 limit: PositiveIntegerObject = None,785 region_name: RegionName = None,786 ) -> ListGlobalTablesOutput:787 # TODO: add paging support788 result = [789 select_attributes(tab, ["GlobalTableName", "ReplicationGroup"])790 for tab in DynamoDBRegion.GLOBAL_TABLES.values()791 ]792 return ListGlobalTablesOutput(GlobalTables=result)793 def update_global_table(794 self,795 context: RequestContext,796 global_table_name: TableName,797 replica_updates: ReplicaUpdateList,798 ) -> UpdateGlobalTableOutput:799 details = DynamoDBRegion.GLOBAL_TABLES.get(global_table_name)800 if not details:801 raise GlobalTableNotFoundException("Global table with this name does not exist")802 for update in replica_updates or []:803 repl_group = details["ReplicationGroup"]804 # delete existing805 delete = update.get("Delete")806 if delete:807 details["ReplicationGroup"] = [808 g for g in repl_group if g["RegionName"] != delete["RegionName"]809 ]810 # create new811 create = update.get("Create")812 if create:813 exists = [g for g in repl_group if g["RegionName"] == create["RegionName"]]814 if exists:815 continue816 new_group = {817 "RegionName": create["RegionName"],818 "ReplicaStatus": "ACTIVE",819 "ReplicaStatusDescription": "Replica active",820 }821 details["ReplicationGroup"].append(new_group)822 return UpdateGlobalTableOutput(GlobalTableDescription=details)823 def enable_kinesis_streaming_destination(824 self, context: RequestContext, table_name: TableName, stream_arn: StreamArn825 ) -> KinesisStreamingDestinationOutput:826 # Check if table exists, to avoid error log output from DynamoDBLocal827 if not self.table_exists(table_name):828 raise ResourceNotFoundException("Cannot do operations on a non-existent table")829 stream = EventForwarder.is_kinesis_stream_exists(stream_arn=stream_arn)830 if not stream:831 raise ValidationException("User does not have a permission to use kinesis stream")832 table_def = DynamoDBRegion.get().table_definitions.setdefault(table_name, {})833 dest_status = table_def.get("KinesisDataStreamDestinationStatus")834 if dest_status not in ["DISABLED", "ENABLE_FAILED", None]:835 raise ValidationException(836 "Table is not in a valid state to enable Kinesis Streaming "837 "Destination:EnableKinesisStreamingDestination must be DISABLED or ENABLE_FAILED "838 "to perform ENABLE operation."839 )840 table_def["KinesisDataStreamDestinations"] = (841 table_def.get("KinesisDataStreamDestinations") or []842 )843 # remove the stream destination if already present844 table_def["KinesisDataStreamDestinations"] = [845 t for t in table_def["KinesisDataStreamDestinations"] if t["StreamArn"] != stream_arn846 ]847 # append the active stream destination at the end of the list848 table_def["KinesisDataStreamDestinations"].append(849 {850 "DestinationStatus": "ACTIVE",851 "DestinationStatusDescription": "Stream is active",852 "StreamArn": stream_arn,853 }854 )855 table_def["KinesisDataStreamDestinationStatus"] = "ACTIVE"856 return KinesisStreamingDestinationOutput(857 DestinationStatus="ACTIVE", StreamArn=stream_arn, TableName=table_name858 )859 def disable_kinesis_streaming_destination(860 self, context: RequestContext, table_name: TableName, stream_arn: StreamArn861 ) -> KinesisStreamingDestinationOutput:862 # Check if table exists, to avoid error log output from DynamoDBLocal863 if not self.table_exists(table_name):864 raise ResourceNotFoundException("Cannot do operations on a non-existent table")865 stream = EventForwarder.is_kinesis_stream_exists(stream_arn=stream_arn)866 if not stream:867 raise ValidationException(868 "User does not have a permission to use kinesis stream",869 )870 table_def = DynamoDBRegion.get().table_definitions.setdefault(table_name, {})871 stream_destinations = table_def.get("KinesisDataStreamDestinations")872 if stream_destinations:873 if table_def["KinesisDataStreamDestinationStatus"] == "ACTIVE":874 for dest in stream_destinations:875 if dest["StreamArn"] == stream_arn and dest["DestinationStatus"] == "ACTIVE":876 dest["DestinationStatus"] = "DISABLED"877 dest["DestinationStatusDescription"] = ("Stream is disabled",)878 table_def["KinesisDataStreamDestinationStatus"] = "DISABLED"879 return KinesisStreamingDestinationOutput(880 DestinationStatus="DISABLED",881 StreamArn=stream_arn,882 TableName=table_name,883 )884 raise ValidationException(885 "Table is not in a valid state to disable Kinesis Streaming Destination:"886 "DisableKinesisStreamingDestination must be ACTIVE to perform DISABLE operation."887 )888 def describe_kinesis_streaming_destination(889 self, context: RequestContext, table_name: TableName890 ) -> DescribeKinesisStreamingDestinationOutput:891 # Check if table exists, to avoid error log output from DynamoDBLocal892 if not self.table_exists(table_name):893 raise ResourceNotFoundException("Cannot do operations on a non-existent table")894 table_def = DynamoDBRegion.get().table_definitions.get(table_name) or {}895 stream_destinations = table_def.get("KinesisDataStreamDestinations") or []896 return DescribeKinesisStreamingDestinationOutput(897 KinesisDataStreamDestinations=stream_destinations, TableName=table_name898 )899 @staticmethod900 def table_exists(table_name):901 return aws_stack.dynamodb_table_exists(table_name)902 @staticmethod903 def prepare_request_headers(headers):904 def _replace(regex, replace):905 headers["Authorization"] = re.sub(906 regex, replace, headers.get("Authorization") or "", flags=re.IGNORECASE907 )908 # Note: We need to ensure that the same access key is used here for all requests,909 # otherwise DynamoDBLocal stores tables/items in separate namespaces910 _replace(r"Credential=[^/]+/", rf"Credential={constants.INTERNAL_AWS_ACCESS_KEY_ID}/")911 # Note: The NoSQL Workbench sends "localhost" or "local" as the region name, which we need to fix here912 _replace(913 r"Credential=([^/]+/[^/]+)/local(host)?/",914 rf"Credential=\1/{aws_stack.get_local_region()}/",915 )916 def fix_return_consumed_capacity(self, request_dict):917 # Fix incorrect values if ReturnValues==ALL_OLD and ReturnConsumedCapacity is918 # empty, see https://github.com/localstack/localstack/issues/2049919 return_values_all = (request_dict.get("ReturnValues") == "ALL_OLD") or (920 not request_dict.get("ReturnValues")921 )922 if return_values_all and not request_dict.get("ReturnConsumedCapacity"):923 request_dict["ReturnConsumedCapacity"] = "TOTAL"924 def fix_consumed_capacity(self, request: Dict, result: Dict):925 # make sure we append 'ConsumedCapacity', which is properly926 # returned by dynalite, but not by AWS's DynamoDBLocal927 table_name = request.get("TableName")928 return_cap = request.get("ReturnConsumedCapacity")929 if "ConsumedCapacity" not in result and return_cap in ["TOTAL", "INDEXES"]:930 request["ConsumedCapacity"] = {931 "TableName": table_name,932 "CapacityUnits": 5, # TODO hardcoded933 "ReadCapacityUnits": 2,934 "WriteCapacityUnits": 3,935 }936 def fix_table_arn(self, table_arn: str) -> str:937 return re.sub(938 "arn:aws:dynamodb:ddblocal:",939 f"arn:aws:dynamodb:{aws_stack.get_region()}:",940 table_arn,941 )942 def prepare_transact_write_item_records(self, transact_items, existing_items):943 records = []944 record = self.get_record_template()945 # Fix issue #2745: existing_items only contain the Put/Update/Delete records,946 # so we will increase the index based on these events947 i = 0948 for request in transact_items:949 put_request = request.get("Put")950 if put_request:951 existing_item = existing_items[i]952 table_name = put_request["TableName"]953 keys = SchemaExtractor.extract_keys(item=put_request["Item"], table_name=table_name)954 # Add stream view type to record if ddb stream is enabled955 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)956 if stream_spec:957 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]958 new_record = copy.deepcopy(record)959 new_record["eventID"] = short_uid()960 new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"961 new_record["dynamodb"]["Keys"] = keys962 new_record["dynamodb"]["NewImage"] = put_request["Item"]963 if existing_item:964 new_record["dynamodb"]["OldImage"] = existing_item965 new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)966 new_record["dynamodb"]["SizeBytes"] = len(json.dumps(put_request["Item"]))967 records.append(new_record)968 i += 1969 update_request = request.get("Update")970 if update_request:971 table_name = update_request["TableName"]972 keys = update_request["Key"]973 updated_item = ItemFinder.find_existing_item(update_request, table_name)974 if not updated_item:975 return []976 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)977 if stream_spec:978 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]979 new_record = copy.deepcopy(record)980 new_record["eventID"] = short_uid()981 new_record["eventName"] = "MODIFY"982 new_record["dynamodb"]["Keys"] = keys983 new_record["dynamodb"]["OldImage"] = existing_items[i]984 new_record["dynamodb"]["NewImage"] = updated_item985 new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)986 new_record["dynamodb"]["SizeBytes"] = len(json.dumps(updated_item))987 records.append(new_record)988 i += 1989 delete_request = request.get("Delete")990 if delete_request:991 table_name = delete_request["TableName"]992 keys = delete_request["Key"]993 existing_item = existing_items[i]994 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)995 if stream_spec:996 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]997 new_record = copy.deepcopy(record)998 new_record["eventID"] = short_uid()999 new_record["eventName"] = "REMOVE"1000 new_record["dynamodb"]["Keys"] = keys1001 new_record["dynamodb"]["OldImage"] = existing_item1002 new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_item))1003 new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)1004 records.append(new_record)1005 i += 11006 return records1007 def batch_execute_statement(1008 self,1009 context: RequestContext,1010 statements: PartiQLBatchRequest,1011 return_consumed_capacity: ReturnConsumedCapacity = None,1012 ) -> BatchExecuteStatementOutput:1013 result = self.forward_request(context)1014 return result1015 def prepare_batch_write_item_records(1016 self,1017 request_items,1018 existing_items,1019 unprocessed_put_items: List,1020 unprocessed_delete_items: List,1021 ):1022 records = []1023 record = self.get_record_template()1024 unprocessed_items = {"PutRequest": {}, "DeleteRequest": {}}1025 i = 01026 for table_name in sorted(request_items.keys()):1027 # Add stream view type to record if ddb stream is enabled1028 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)1029 if stream_spec:1030 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]1031 for request in request_items[table_name]:1032 put_request = request.get("PutRequest")1033 if put_request:1034 if existing_items and len(existing_items) > i:1035 existing_item = existing_items[i]1036 keys = SchemaExtractor.extract_keys(1037 item=put_request["Item"], table_name=table_name1038 )1039 new_record = copy.deepcopy(record)1040 new_record["eventID"] = short_uid()1041 new_record["dynamodb"]["SizeBytes"] = _get_size_bytes(put_request["Item"])1042 new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"1043 new_record["dynamodb"]["Keys"] = keys1044 new_record["dynamodb"]["NewImage"] = put_request["Item"]1045 if existing_item:1046 new_record["dynamodb"]["OldImage"] = existing_item1047 new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)1048 records.append(new_record)1049 if unprocessed_put_items and len(unprocessed_put_items) > i:1050 unprocessed_item = unprocessed_put_items[i]1051 if unprocessed_item:1052 unprocessed_items["PutRequest"].update(1053 json.loads(json.dumps(unprocessed_item))1054 )1055 delete_request = request.get("DeleteRequest")1056 if delete_request:1057 if existing_items and len(existing_items) > i:1058 keys = delete_request["Key"]1059 new_record = copy.deepcopy(record)1060 new_record["eventID"] = short_uid()1061 new_record["eventName"] = "REMOVE"1062 new_record["dynamodb"]["Keys"] = keys1063 new_record["dynamodb"]["OldImage"] = existing_items[i]1064 new_record["dynamodb"]["SizeBytes"] = _get_size_bytes(existing_items[i])1065 new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)1066 records.append(new_record)1067 if unprocessed_delete_items and len(unprocessed_delete_items) > i:1068 unprocessed_item = unprocessed_delete_items[i]1069 if unprocessed_item:1070 unprocessed_items["DeleteRequest"].update(1071 json.loads(json.dumps(unprocessed_item))1072 )1073 i += 11074 return records, unprocessed_items1075 def forward_stream_records(self, records: List[Dict], table_name: str = None):1076 if records and "eventName" in records[0]:1077 if table_name:1078 for record in records:1079 record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)1080 EventForwarder.forward_to_targets(records, background=True)1081 def delete_all_event_source_mappings(self, table_arn):1082 if table_arn:1083 # fix start dynamodb service without lambda1084 if not is_api_enabled("lambda"):1085 return1086 lambda_client = aws_stack.connect_to_service("lambda")1087 result = lambda_client.list_event_source_mappings(EventSourceArn=table_arn)1088 for event in result["EventSourceMappings"]:1089 event_source_mapping_id = event["UUID"]1090 lambda_client.delete_event_source_mapping(UUID=event_source_mapping_id)1091 def get_record_template(self) -> Dict:1092 return {1093 "eventID": short_uid(),1094 "eventVersion": "1.1",1095 "dynamodb": {1096 # expects nearest second rounded down1097 "ApproximateCreationDateTime": int(time.time()),1098 # 'StreamViewType': 'NEW_AND_OLD_IMAGES',1099 "SizeBytes": -1,1100 },1101 "awsRegion": aws_stack.get_region(),1102 "eventSource": "aws:dynamodb",1103 }1104 def check_provisioned_throughput(self, action):1105 if self.should_throttle(action):1106 message = (1107 "The level of configured provisioned throughput for the table was exceeded. "1108 + "Consider increasing your provisioning level with the UpdateTable API"1109 )1110 raise ProvisionedThroughputExceededException(message)1111 def action_should_throttle(self, action, actions):1112 throttled = [f"{ACTION_PREFIX}{a}" for a in actions]1113 return (action in throttled) or (action in actions)1114 def should_throttle(self, action):1115 rand = random.random()1116 if rand < config.DYNAMODB_READ_ERROR_PROBABILITY and self.action_should_throttle(1117 action, READ_THROTTLED_ACTIONS1118 ):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!