Best Python code snippet using localstack_python
provider.py
Source:provider.py
...452 @handler("PutItem", expand=False)453 def put_item(self, context: RequestContext, put_item_input: PutItemInput) -> PutItemOutput:454 existing_item = None455 table_name = put_item_input["TableName"]456 event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(table_name)457 if event_sources_or_streams_enabled:458 existing_item = ItemFinder.find_existing_item(put_item_input)459 # forward request to backend460 self.fix_return_consumed_capacity(put_item_input)461 result = self.forward_request(context, put_item_input)462 # Get stream specifications details for the table463 if event_sources_or_streams_enabled:464 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)465 item = put_item_input["Item"]466 # prepare record keys467 keys = SchemaExtractor.extract_keys(item=item, table_name=table_name)468 # create record469 record = self.get_record_template()470 record["eventName"] = "INSERT" if not existing_item else "MODIFY"471 record["dynamodb"].update(472 {473 "Keys": keys,474 "NewImage": item,475 "SizeBytes": len(json.dumps(item)),476 }477 )478 if stream_spec:479 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]480 if existing_item:481 record["dynamodb"]["OldImage"] = existing_item482 self.forward_stream_records([record], table_name=table_name)483 return result484 @handler("DeleteItem", expand=False)485 def delete_item(486 self,487 context: RequestContext,488 delete_item_input: DeleteItemInput,489 ) -> DeleteItemOutput:490 existing_item = None491 table_name = delete_item_input["TableName"]492 if has_event_sources_or_streams_enabled(table_name):493 existing_item = ItemFinder.find_existing_item(delete_item_input)494 # forward request to backend495 self.fix_return_consumed_capacity(delete_item_input)496 result = self.forward_request(context, delete_item_input)497 # determine and forward stream record498 if existing_item:499 event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(table_name)500 if 
event_sources_or_streams_enabled:501 # create record502 record = self.get_record_template()503 record["eventName"] = "REMOVE"504 record["dynamodb"].update(505 {506 "Keys": delete_item_input["Key"],507 "OldImage": existing_item,508 "SizeBytes": len(json.dumps(existing_item)),509 }510 )511 # Get stream specifications details for the table512 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)513 if stream_spec:514 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]515 self.forward_stream_records([record], table_name=table_name)516 return result517 @handler("UpdateItem", expand=False)518 def update_item(519 self,520 context: RequestContext,521 update_item_input: UpdateItemInput,522 ) -> UpdateItemOutput:523 existing_item = None524 table_name = update_item_input["TableName"]525 event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(table_name)526 if event_sources_or_streams_enabled:527 existing_item = ItemFinder.find_existing_item(update_item_input)528 # forward request to backend529 self.fix_return_consumed_capacity(update_item_input)530 result = self.forward_request(context, update_item_input)531 # construct and forward stream record532 if event_sources_or_streams_enabled:533 updated_item = ItemFinder.find_existing_item(update_item_input)534 if updated_item:535 record = self.get_record_template()536 record["eventName"] = "INSERT" if not existing_item else "MODIFY"537 record["dynamodb"].update(538 {539 "Keys": update_item_input["Key"],540 "NewImage": updated_item,541 "SizeBytes": len(json.dumps(updated_item)),542 }543 )544 if existing_item:545 record["dynamodb"]["OldImage"] = existing_item546 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)547 if stream_spec:548 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]549 self.forward_stream_records([record], table_name=table_name)550 return result551 @handler("GetItem", expand=False)552 def get_item(self, context: 
RequestContext, get_item_input: GetItemInput) -> GetItemOutput:553 result = self.forward_request(context)554 self.fix_consumed_capacity(get_item_input, result)555 return result556 @handler("Query", expand=False)557 def query(self, context: RequestContext, query_input: QueryInput) -> QueryOutput:558 index_name = query_input.get("IndexName")559 if index_name:560 if not is_index_query_valid(query_input):561 raise ValidationException(562 "One or more parameter values were invalid: Select type ALL_ATTRIBUTES "563 "is not supported for global secondary index id-index because its projection "564 "type is not ALL",565 )566 result = self.forward_request(context)567 self.fix_consumed_capacity(query_input, result)568 return result569 @handler("Scan", expand=False)570 def scan(self, context: RequestContext, scan_input: ScanInput) -> ScanOutput:571 return self.forward_request(context)572 @handler("BatchWriteItem", expand=False)573 def batch_write_item(574 self,575 context: RequestContext,576 batch_write_item_input: BatchWriteItemInput,577 ) -> BatchWriteItemOutput:578 existing_items = []579 unprocessed_put_items = []580 unprocessed_delete_items = []581 request_items = batch_write_item_input["RequestItems"]582 for table_name in sorted(request_items.keys()):583 for request in request_items[table_name]:584 for key in ["PutRequest", "DeleteRequest"]:585 inner_request = request.get(key)586 if inner_request:587 if self.should_throttle("BatchWriteItem"):588 if key == "PutRequest":589 unprocessed_put_items.append(inner_request)590 elif key == "DeleteRequest":591 unprocessed_delete_items.append(inner_request)592 else:593 item = ItemFinder.find_existing_item(inner_request, table_name)594 existing_items.append(item)595 # forward request to backend596 result = self.forward_request(context)597 # determine and forward stream records598 request_items = batch_write_item_input["RequestItems"]599 records, unprocessed_items = self.prepare_batch_write_item_records(600 
request_items=request_items,601 unprocessed_put_items=unprocessed_put_items,602 unprocessed_delete_items=unprocessed_delete_items,603 existing_items=existing_items,604 )605 streams_enabled_cache = {}606 event_sources_or_streams_enabled = False607 for record in records:608 event_sources_or_streams_enabled = (609 event_sources_or_streams_enabled610 or has_event_sources_or_streams_enabled(611 record["eventSourceARN"], streams_enabled_cache612 )613 )614 if event_sources_or_streams_enabled:615 self.forward_stream_records(records)616 # update response617 if any(unprocessed_items):618 table_name = list(request_items.keys())[0]619 unprocessed = result["UnprocessedItems"]620 if table_name not in unprocessed:621 unprocessed[table_name] = []622 for key in ["PutRequest", "DeleteRequest"]:623 if any(unprocessed_items[key]):624 unprocessed_items[table_name].append({key: unprocessed_items[key]})625 for key in list(unprocessed.keys()):626 if not unprocessed.get(key):627 del unprocessed[key]628 return result629 @handler("TransactWriteItems", expand=False)630 def transact_write_items(631 self,632 context: RequestContext,633 transact_write_items_input: TransactWriteItemsInput,634 ) -> TransactWriteItemsOutput:635 existing_items = []636 for item in transact_write_items_input["TransactItems"]:637 for key in ["Put", "Update", "Delete"]:638 inner_item = item.get(key)639 if inner_item:640 existing_items.append(ItemFinder.find_existing_item(inner_item))641 # forward request to backend642 result = self.forward_request(context)643 # determine and forward stream records644 streams_enabled_cache = {}645 records = self.prepare_transact_write_item_records(646 transact_items=transact_write_items_input["TransactItems"],647 existing_items=existing_items,648 )649 event_sources_or_streams_enabled = False650 for record in records:651 event_sources_or_streams_enabled = (652 event_sources_or_streams_enabled653 or has_event_sources_or_streams_enabled(654 record["eventSourceARN"], streams_enabled_cache655 
)656 )657 if event_sources_or_streams_enabled:658 self.forward_stream_records(records)659 return result660 @handler("ExecuteStatement", expand=False)661 def execute_statement(662 self,663 context: RequestContext,664 execute_statement_input: ExecuteStatementInput,665 ) -> ExecuteStatementOutput:666 statement = execute_statement_input["Statement"]667 table_name = extract_table_name_from_partiql_update(statement)668 existing_items = None669 if table_name and has_event_sources_or_streams_enabled(table_name):670 # Note: fetching the entire list of items is hugely inefficient, especially for larger tables671 # TODO: find a mechanism to hook into the PartiQL update mechanism of DynamoDB Local directly!672 existing_items = ItemFinder.list_existing_items_for_statement(statement)673 # forward request to backend674 result = self.forward_request(context)675 # construct and forward stream record676 event_sources_or_streams_enabled = table_name and has_event_sources_or_streams_enabled(677 table_name678 )679 if event_sources_or_streams_enabled:680 records = get_updated_records(table_name, existing_items)681 self.forward_stream_records(records, table_name=table_name)682 return result683 def tag_resource(684 self, context: RequestContext, resource_arn: ResourceArnString, tags: TagList685 ) -> None:686 table_tags = DynamoDBRegion.TABLE_TAGS687 if resource_arn not in table_tags:688 table_tags[resource_arn] = {}689 table_tags[resource_arn].update({tag["Key"]: tag["Value"] for tag in tags})690 def untag_resource(691 self, context: RequestContext, resource_arn: ResourceArnString, tag_keys: TagKeyList692 ) -> None:693 for tag_key in tag_keys or []:694 DynamoDBRegion.TABLE_TAGS.get(resource_arn, {}).pop(tag_key, None)695 def list_tags_of_resource(696 self,697 context: RequestContext,698 resource_arn: ResourceArnString,699 next_token: NextTokenString = None,700 ) -> ListTagsOfResourceOutput:701 result = [702 {"Key": k, "Value": v}703 for k, v in DynamoDBRegion.TABLE_TAGS.get(resource_arn, 
{}).items()704 ]705 return ListTagsOfResourceOutput(Tags=result)706 def describe_time_to_live(707 self, context: RequestContext, table_name: TableName708 ) -> DescribeTimeToLiveOutput:709 backend = DynamoDBRegion.get()710 ttl_spec = backend.ttl_specifications.get(table_name)711 result = {"TimeToLiveStatus": "DISABLED"}712 if ttl_spec:713 if ttl_spec.get("Enabled"):714 ttl_status = "ENABLED"715 else:716 ttl_status = "DISABLED"717 result = {718 "AttributeName": ttl_spec.get("AttributeName"),719 "TimeToLiveStatus": ttl_status,720 }721 return DescribeTimeToLiveOutput(TimeToLiveDescription=result)722 def update_time_to_live(723 self,724 context: RequestContext,725 table_name: TableName,726 time_to_live_specification: TimeToLiveSpecification,727 ) -> UpdateTimeToLiveOutput:728 # TODO: TTL status is maintained/mocked but no real expiry is happening for items729 backend = DynamoDBRegion.get()730 backend.ttl_specifications[table_name] = time_to_live_specification731 return UpdateTimeToLiveOutput(TimeToLiveSpecification=time_to_live_specification)732 def create_global_table(733 self, context: RequestContext, global_table_name: TableName, replication_group: ReplicaList734 ) -> CreateGlobalTableOutput:735 if global_table_name in DynamoDBRegion.GLOBAL_TABLES:736 raise GlobalTableAlreadyExistsException("Global table with this name already exists")737 replication_group = [grp.copy() for grp in replication_group or []]738 data = {"GlobalTableName": global_table_name, "ReplicationGroup": replication_group}739 DynamoDBRegion.GLOBAL_TABLES[global_table_name] = data740 for group in replication_group:741 group["ReplicaStatus"] = "ACTIVE"742 group["ReplicaStatusDescription"] = "Replica active"743 return CreateGlobalTableOutput(GlobalTableDescription=data)744 def describe_global_table(745 self, context: RequestContext, global_table_name: TableName746 ) -> DescribeGlobalTableOutput:747 details = DynamoDBRegion.GLOBAL_TABLES.get(global_table_name)748 if not details:749 raise 
GlobalTableNotFoundException("Global table with this name does not exist")750 return DescribeGlobalTableOutput(GlobalTableDescription=details)751 def list_global_tables(752 self,753 context: RequestContext,754 exclusive_start_global_table_name: TableName = None,755 limit: PositiveIntegerObject = None,756 region_name: RegionName = None,757 ) -> ListGlobalTablesOutput:758 # TODO: add paging support759 result = [760 select_attributes(tab, ["GlobalTableName", "ReplicationGroup"])761 for tab in DynamoDBRegion.GLOBAL_TABLES.values()762 ]763 return ListGlobalTablesOutput(GlobalTables=result)764 def update_global_table(765 self,766 context: RequestContext,767 global_table_name: TableName,768 replica_updates: ReplicaUpdateList,769 ) -> UpdateGlobalTableOutput:770 details = DynamoDBRegion.GLOBAL_TABLES.get(global_table_name)771 if not details:772 raise GlobalTableNotFoundException("Global table with this name does not exist")773 for update in replica_updates or []:774 repl_group = details["ReplicationGroup"]775 # delete existing776 delete = update.get("Delete")777 if delete:778 details["ReplicationGroup"] = [779 g for g in repl_group if g["RegionName"] != delete["RegionName"]780 ]781 # create new782 create = update.get("Create")783 if create:784 exists = [g for g in repl_group if g["RegionName"] == create["RegionName"]]785 if exists:786 continue787 new_group = {788 "RegionName": create["RegionName"],789 "ReplicaStatus": "ACTIVE",790 "ReplicaStatusDescription": "Replica active",791 }792 details["ReplicationGroup"].append(new_group)793 return UpdateGlobalTableOutput(GlobalTableDescription=details)794 def enable_kinesis_streaming_destination(795 self, context: RequestContext, table_name: TableName, stream_arn: StreamArn796 ) -> KinesisStreamingDestinationOutput:797 # Check if table exists, to avoid error log output from DynamoDBLocal798 if not self.table_exists(table_name):799 raise ResourceNotFoundException("Cannot do operations on a non-existent table")800 stream = 
EventForwarder.is_kinesis_stream_exists(stream_arn=stream_arn)801 if not stream:802 raise ValidationException("User does not have a permission to use kinesis stream")803 table_def = DynamoDBRegion.get().table_definitions.setdefault(table_name, {})804 dest_status = table_def.get("KinesisDataStreamDestinationStatus")805 if dest_status not in ["DISABLED", "ENABLE_FAILED", None]:806 raise ValidationException(807 "Table is not in a valid state to enable Kinesis Streaming "808 "Destination:EnableKinesisStreamingDestination must be DISABLED or ENABLE_FAILED "809 "to perform ENABLE operation."810 )811 table_def["KinesisDataStreamDestinations"] = (812 table_def.get("KinesisDataStreamDestinations") or []813 )814 # remove the stream destination if already present815 table_def["KinesisDataStreamDestinations"] = [816 t for t in table_def["KinesisDataStreamDestinations"] if t["StreamArn"] != stream_arn817 ]818 # append the active stream destination at the end of the list819 table_def["KinesisDataStreamDestinations"].append(820 {821 "DestinationStatus": "ACTIVE",822 "DestinationStatusDescription": "Stream is active",823 "StreamArn": stream_arn,824 }825 )826 table_def["KinesisDataStreamDestinationStatus"] = "ACTIVE"827 return KinesisStreamingDestinationOutput(828 DestinationStatus="ACTIVE", StreamArn=stream_arn, TableName=table_name829 )830 def disable_kinesis_streaming_destination(831 self, context: RequestContext, table_name: TableName, stream_arn: StreamArn832 ) -> KinesisStreamingDestinationOutput:833 # Check if table exists, to avoid error log output from DynamoDBLocal834 if not self.table_exists(table_name):835 raise ResourceNotFoundException("Cannot do operations on a non-existent table")836 stream = EventForwarder.is_kinesis_stream_exists(stream_arn=stream_arn)837 if not stream:838 raise ValidationException(839 "User does not have a permission to use kinesis stream",840 )841 table_def = DynamoDBRegion.get().table_definitions.setdefault(table_name, {})842 
stream_destinations = table_def.get("KinesisDataStreamDestinations")843 if stream_destinations:844 if table_def["KinesisDataStreamDestinationStatus"] == "ACTIVE":845 for dest in stream_destinations:846 if dest["StreamArn"] == stream_arn and dest["DestinationStatus"] == "ACTIVE":847 dest["DestinationStatus"] = "DISABLED"848 dest["DestinationStatusDescription"] = ("Stream is disabled",)849 table_def["KinesisDataStreamDestinationStatus"] = "DISABLED"850 return KinesisStreamingDestinationOutput(851 DestinationStatus="DISABLED",852 StreamArn=stream_arn,853 TableName=table_name,854 )855 raise ValidationException(856 "Table is not in a valid state to disable Kinesis Streaming Destination:"857 "DisableKinesisStreamingDestination must be ACTIVE to perform DISABLE operation."858 )859 def describe_kinesis_streaming_destination(860 self, context: RequestContext, table_name: TableName861 ) -> DescribeKinesisStreamingDestinationOutput:862 # Check if table exists, to avoid error log output from DynamoDBLocal863 if not self.table_exists(table_name):864 raise ResourceNotFoundException("Cannot do operations on a non-existent table")865 table_def = DynamoDBRegion.get().table_definitions.get(table_name)866 stream_destinations = table_def.get("KinesisDataStreamDestinations") or []867 return DescribeKinesisStreamingDestinationOutput(868 KinesisDataStreamDestinations=stream_destinations, TableName=table_name869 )870 @staticmethod871 def table_exists(table_name):872 return aws_stack.dynamodb_table_exists(table_name)873 @staticmethod874 def prepare_request_headers(headers):875 def _replace(regex, replace):876 headers["Authorization"] = re.sub(877 regex, replace, headers.get("Authorization") or "", flags=re.IGNORECASE878 )879 # Note: We need to ensure that the same access key is used here for all requests,880 # otherwise DynamoDBLocal stores tables/items in separate namespaces881 _replace(r"Credential=[^/]+/", rf"Credential={constants.INTERNAL_AWS_ACCESS_KEY_ID}/")882 # Note: The NoSQL 
Workbench sends "localhost" or "local" as the region name, which we need to fix here883 _replace(884 r"Credential=([^/]+/[^/]+)/local(host)?/",885 rf"Credential=\1/{aws_stack.get_local_region()}/",886 )887 def fix_return_consumed_capacity(self, request_dict):888 # Fix incorrect values if ReturnValues==ALL_OLD and ReturnConsumedCapacity is889 # empty, see https://github.com/localstack/localstack/issues/2049890 return_values_all = (request_dict.get("ReturnValues") == "ALL_OLD") or (891 not request_dict.get("ReturnValues")892 )893 if return_values_all and not request_dict.get("ReturnConsumedCapacity"):894 request_dict["ReturnConsumedCapacity"] = "TOTAL"895 def fix_consumed_capacity(self, request: Dict, result: Dict):896 # make sure we append 'ConsumedCapacity', which is properly897 # returned by dynalite, but not by AWS's DynamoDBLocal898 table_name = request.get("TableName")899 return_cap = request.get("ReturnConsumedCapacity")900 if "ConsumedCapacity" not in result and return_cap in ["TOTAL", "INDEXES"]:901 request["ConsumedCapacity"] = {902 "TableName": table_name,903 "CapacityUnits": 5, # TODO hardcoded904 "ReadCapacityUnits": 2,905 "WriteCapacityUnits": 3,906 }907 def fix_table_arn(self, table_arn: str) -> str:908 return re.sub(909 "arn:aws:dynamodb:ddblocal:",910 f"arn:aws:dynamodb:{aws_stack.get_region()}:",911 table_arn,912 )913 def prepare_transact_write_item_records(self, transact_items, existing_items):914 records = []915 record = self.get_record_template()916 # Fix issue #2745: existing_items only contain the Put/Update/Delete records,917 # so we will increase the index based on these events918 i = 0919 for request in transact_items:920 put_request = request.get("Put")921 if put_request:922 existing_item = existing_items[i]923 table_name = put_request["TableName"]924 keys = SchemaExtractor.extract_keys(item=put_request["Item"], table_name=table_name)925 # Add stream view type to record if ddb stream is enabled926 stream_spec = 
dynamodb_get_table_stream_specification(table_name=table_name)927 if stream_spec:928 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]929 new_record = copy.deepcopy(record)930 new_record["eventID"] = short_uid()931 new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"932 new_record["dynamodb"]["Keys"] = keys933 new_record["dynamodb"]["NewImage"] = put_request["Item"]934 if existing_item:935 new_record["dynamodb"]["OldImage"] = existing_item936 new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)937 new_record["dynamodb"]["SizeBytes"] = len(json.dumps(put_request["Item"]))938 records.append(new_record)939 i += 1940 update_request = request.get("Update")941 if update_request:942 table_name = update_request["TableName"]943 keys = update_request["Key"]944 updated_item = ItemFinder.find_existing_item(update_request, table_name)945 if not updated_item:946 return []947 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)948 if stream_spec:949 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]950 new_record = copy.deepcopy(record)951 new_record["eventID"] = short_uid()952 new_record["eventName"] = "MODIFY"953 new_record["dynamodb"]["Keys"] = keys954 new_record["dynamodb"]["OldImage"] = existing_items[i]955 new_record["dynamodb"]["NewImage"] = updated_item956 new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)957 new_record["dynamodb"]["SizeBytes"] = len(json.dumps(updated_item))958 records.append(new_record)959 i += 1960 delete_request = request.get("Delete")961 if delete_request:962 table_name = delete_request["TableName"]963 keys = delete_request["Key"]964 existing_item = existing_items[i]965 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)966 if stream_spec:967 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]968 new_record = copy.deepcopy(record)969 new_record["eventID"] = short_uid()970 
new_record["eventName"] = "REMOVE"971 new_record["dynamodb"]["Keys"] = keys972 new_record["dynamodb"]["OldImage"] = existing_item973 new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_item))974 new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)975 records.append(new_record)976 i += 1977 return records978 def prepare_batch_write_item_records(979 self,980 request_items,981 existing_items,982 unprocessed_put_items: List,983 unprocessed_delete_items: List,984 ):985 records = []986 record = self.get_record_template()987 unprocessed_items = {"PutRequest": {}, "DeleteRequest": {}}988 i = 0989 for table_name in sorted(request_items.keys()):990 # Add stream view type to record if ddb stream is enabled991 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)992 if stream_spec:993 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]994 for request in request_items[table_name]:995 put_request = request.get("PutRequest")996 if put_request:997 if existing_items and len(existing_items) > i:998 existing_item = existing_items[i]999 keys = SchemaExtractor.extract_keys(1000 item=put_request["Item"], table_name=table_name1001 )1002 new_record = copy.deepcopy(record)1003 new_record["eventID"] = short_uid()1004 new_record["dynamodb"]["SizeBytes"] = len(json.dumps(put_request["Item"]))1005 new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"1006 new_record["dynamodb"]["Keys"] = keys1007 new_record["dynamodb"]["NewImage"] = put_request["Item"]1008 if existing_item:1009 new_record["dynamodb"]["OldImage"] = existing_item1010 new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)1011 records.append(new_record)1012 if unprocessed_put_items and len(unprocessed_put_items) > i:1013 unprocessed_item = unprocessed_put_items[i]1014 if unprocessed_item:1015 unprocessed_items["PutRequest"].update(1016 json.loads(json.dumps(unprocessed_item))1017 )1018 delete_request = 
request.get("DeleteRequest")1019 if delete_request:1020 if existing_items and len(existing_items) > i:1021 keys = delete_request["Key"]1022 new_record = copy.deepcopy(record)1023 new_record["eventID"] = short_uid()1024 new_record["eventName"] = "REMOVE"1025 new_record["dynamodb"]["Keys"] = keys1026 new_record["dynamodb"]["OldImage"] = existing_items[i]1027 new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_items[i]))1028 new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)1029 records.append(new_record)1030 if unprocessed_delete_items and len(unprocessed_delete_items) > i:1031 unprocessed_item = unprocessed_delete_items[i]1032 if unprocessed_item:1033 unprocessed_items["DeleteRequest"].update(1034 json.loads(json.dumps(unprocessed_item))1035 )1036 i += 11037 return records, unprocessed_items1038 def forward_stream_records(self, records: List[Dict], table_name: str = None):1039 if records and "eventName" in records[0]:1040 if table_name:1041 for record in records:1042 record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)1043 EventForwarder.forward_to_targets(records, background=True)1044 def delete_all_event_source_mappings(self, table_arn):1045 if table_arn:1046 # fix start dynamodb service without lambda1047 if not is_api_enabled("lambda"):1048 return1049 lambda_client = aws_stack.connect_to_service("lambda")1050 result = lambda_client.list_event_source_mappings(EventSourceArn=table_arn)1051 for event in result["EventSourceMappings"]:1052 event_source_mapping_id = event["UUID"]1053 lambda_client.delete_event_source_mapping(UUID=event_source_mapping_id)1054 def get_record_template(self) -> Dict:1055 return {1056 "eventID": short_uid(),1057 "eventVersion": "1.1",1058 "dynamodb": {1059 # expects nearest second rounded down1060 "ApproximateCreationDateTime": int(time.time()),1061 # 'StreamViewType': 'NEW_AND_OLD_IMAGES',1062 "SizeBytes": -1,1063 },1064 "awsRegion": aws_stack.get_region(),1065 "eventSource": 
"aws:dynamodb",1066 }1067 def action_should_throttle(self, action, actions):1068 throttled = [f"{ACTION_PREFIX}{a}" for a in actions]1069 return (action in throttled) or (action in actions)1070 def should_throttle(self, action):1071 rand = random.random()1072 if rand < config.DYNAMODB_READ_ERROR_PROBABILITY and self.action_should_throttle(1073 action, READ_THROTTLED_ACTIONS1074 ):1075 return True1076 elif rand < config.DYNAMODB_WRITE_ERROR_PROBABILITY and self.action_should_throttle(1077 action, WRITE_THROTTLED_ACTIONS1078 ):1079 return True1080 elif rand < config.DYNAMODB_ERROR_PROBABILITY and self.action_should_throttle(1081 action, THROTTLED_ACTIONS1082 ):1083 return True1084 else:1085 return False1086# ---1087# Misc. util functions1088# ---1089def get_global_secondary_index(table_name, index_name):1090 schema = SchemaExtractor.get_table_schema(table_name)1091 for index in schema["Table"].get("GlobalSecondaryIndexes", []):1092 if index["IndexName"] == index_name:1093 return index1094 raise ResourceNotFoundException("Index not found")1095def is_index_query_valid(query_data: dict) -> bool:1096 table_name = to_str(query_data["TableName"])1097 index_name = to_str(query_data["IndexName"])1098 index_query_type = query_data.get("Select")1099 index = get_global_secondary_index(table_name, index_name)1100 index_projection_type = index.get("Projection").get("ProjectionType")1101 if index_query_type == "ALL_ATTRIBUTES" and index_projection_type != "ALL":1102 return False1103 return True1104def has_event_sources_or_streams_enabled(table_name: str, cache: Dict = None):1105 if cache is None:1106 cache = {}1107 if not table_name:1108 return1109 table_arn = aws_stack.dynamodb_table_arn(table_name)1110 cached = cache.get(table_arn)1111 if isinstance(cached, bool):1112 return cached1113 sources = lambda_api.get_event_sources(source_arn=table_arn)1114 result = False1115 if sources:1116 result = True1117 if not result and dynamodbstreams_api.get_stream_for_table(table_arn):1118 
result = True...
Learn automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!