Best Python code snippet using localstack_python
dynamodb_listener.py
Source:dynamodb_listener.py
...422 if "TableId" not in table_definitions:423 table_definitions["TableId"] = long_uid()424 if "SSESpecification" in table_definitions:425 sse_specification = table_definitions.pop("SSESpecification")426 table_definitions["SSEDescription"] = get_sse_description(sse_specification)427 content = json.loads(to_str(response.content))428 if table_definitions:429 table_content = content.get("Table", {})430 table_content.update(table_definitions)431 content["TableDescription"].update(table_content)432 update_response_content(response, content)433 if "StreamSpecification" in data:434 create_dynamodb_stream(435 data, content["TableDescription"].get("LatestStreamLabel")436 )437 if data.get("Tags"):438 table_arn = content["TableDescription"]["TableArn"]439 DynamoDBRegion.TABLE_TAGS[table_arn] = {440 tag["Key"]: tag["Value"] for tag in data["Tags"]441 }442 event_publisher.fire_event(443 event_publisher.EVENT_DYNAMODB_CREATE_TABLE,444 payload={"n": event_publisher.get_hash(table_name)},445 )446 return447 elif action == "DeleteTable":448 if response.status_code == 200:449 table_arn = (450 json.loads(response._content).get("TableDescription", {}).get("TableArn")451 )452 event_publisher.fire_event(453 event_publisher.EVENT_DYNAMODB_DELETE_TABLE,454 payload={"n": event_publisher.get_hash(table_name)},455 )456 self.delete_all_event_source_mappings(table_arn)457 dynamodbstreams_api.delete_streams(table_arn)458 DynamoDBRegion.TABLE_TAGS.pop(table_arn, None)459 return460 elif action == "UpdateTable":461 content_str = to_str(response._content or "")462 if response.status_code == 200 and "StreamSpecification" in data:463 content = json.loads(content_str)464 create_dynamodb_stream(data, content["TableDescription"].get("LatestStreamLabel"))465 if (466 response.status_code >= 400467 and data.get("ReplicaUpdates")468 and "Nothing to update" in content_str469 ):470 table_name = data.get("TableName")471 # update local table props (replicas)472 table_properties = 
DynamoDBRegion.get().table_properties473 table_properties[table_name] = table_props = table_properties.get(table_name) or {}474 table_props["Replicas"] = replicas = table_props.get("Replicas") or []475 for repl_update in data["ReplicaUpdates"]:476 for key, details in repl_update.items():477 region = details.get("RegionName")478 if key == "Create":479 details["ReplicaStatus"] = details.get("ReplicaStatus") or "ACTIVE"480 replicas.append(details)481 if key == "Update":482 replica = [r for r in replicas if r.get("RegionName") == region]483 if replica:484 replica[0].update(details)485 if key == "Delete":486 table_props["Replicas"] = [487 r for r in replicas if r.get("RegionName") != region488 ]489 # update response content490 schema = get_table_schema(table_name)491 result = {"TableDescription": schema["Table"]}492 update_response_content(response, json_safe(result), 200)493 return494 elif action == "DescribeTable":495 table_name = data.get("TableName")496 table_props = DynamoDBRegion.get().table_properties.get(table_name)497 if table_props:498 content = json.loads(to_str(response.content))499 content.get("Table", {}).update(table_props)500 update_response_content(response, content)501 # Update only TableId and SSEDescription if present502 table_definitions = DynamoDBRegion.get().table_definitions.get(table_name)503 if table_definitions:504 for key in ["TableId", "SSEDescription"]:505 if table_definitions.get(key):506 content = json.loads(to_str(response.content))507 content.get("Table", {})[key] = table_definitions[key]508 update_response_content(response, content)509 elif action == "TagResource":510 table_arn = data["ResourceArn"]511 table_tags = DynamoDBRegion.TABLE_TAGS512 if table_arn not in table_tags:513 table_tags[table_arn] = {}514 table_tags[table_arn].update({tag["Key"]: tag["Value"] for tag in data.get("Tags", [])})515 return516 elif action == "UntagResource":517 table_arn = data["ResourceArn"]518 for tag_key in data.get("TagKeys", []):519 
DynamoDBRegion.TABLE_TAGS.get(table_arn, {}).pop(tag_key, None)520 return521 else:522 # nothing to do523 return524 if event_sources_or_streams_enabled and records and "eventName" in records[0]:525 if "TableName" in data:526 records[0]["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)527 # forward to kinesis stream528 records_to_kinesis = copy.deepcopy(records)529 forward_to_kinesis_stream(records_to_kinesis)530 # forward to lambda and ddb_streams531 forward_to_lambda(records)532 records = self.prepare_records_to_forward_to_ddb_stream(records)533 forward_to_ddb_stream(records)534 # -------------535 # UTIL METHODS536 # -------------537 def prepare_request_headers(self, headers):538 # Note: We need to ensure that the same access key is used here for all requests,539 # otherwise DynamoDBLocal stores tables/items in separate namespaces540 headers["Authorization"] = re.sub(541 r"Credential=[^/]+/",542 r"Credential=%s/" % constants.TEST_AWS_ACCESS_KEY_ID,543 headers.get("Authorization") or "",544 )545 def prepare_batch_write_item_records(self, record, data):546 records = []547 unprocessed_items = {"PutRequest": {}, "DeleteRequest": {}}548 i = 0549 for table_name in sorted(data["RequestItems"].keys()):550 # Add stream view type to record if ddb stream is enabled551 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)552 if stream_spec:553 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]554 for request in data["RequestItems"][table_name]:555 put_request = request.get("PutRequest")556 existing_items = self._thread_local("existing_items")557 if put_request:558 if existing_items and len(existing_items) > i:559 existing_item = existing_items[i]560 keys = dynamodb_extract_keys(561 item=put_request["Item"], table_name=table_name562 )563 if isinstance(keys, Response):564 return keys565 new_record = clone(record)566 new_record["eventID"] = short_uid()567 new_record["dynamodb"]["SizeBytes"] = 
len(json.dumps(put_request["Item"]))568 new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"569 new_record["dynamodb"]["Keys"] = keys570 new_record["dynamodb"]["NewImage"] = put_request["Item"]571 if existing_item:572 new_record["dynamodb"]["OldImage"] = existing_item573 new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)574 records.append(new_record)575 unprocessed_put_items = self._thread_local("unprocessed_put_items")576 if unprocessed_put_items and len(unprocessed_put_items) > i:577 unprocessed_item = unprocessed_put_items[i]578 if unprocessed_item:579 unprocessed_items["PutRequest"].update(580 json.loads(json.dumps(unprocessed_item))581 )582 delete_request = request.get("DeleteRequest")583 if delete_request:584 if existing_items and len(existing_items) > i:585 keys = delete_request["Key"]586 if isinstance(keys, Response):587 return keys588 new_record = clone(record)589 new_record["eventID"] = short_uid()590 new_record["eventName"] = "REMOVE"591 new_record["dynamodb"]["Keys"] = keys592 new_record["dynamodb"]["OldImage"] = existing_items[i]593 new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_items[i]))594 new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)595 records.append(new_record)596 unprocessed_delete_items = self._thread_local("unprocessed_delete_items")597 if unprocessed_delete_items and len(unprocessed_delete_items) > i:598 unprocessed_item = unprocessed_delete_items[i]599 if unprocessed_item:600 unprocessed_items["DeleteRequest"].update(601 json.loads(json.dumps(unprocessed_item))602 )603 i += 1604 return records, unprocessed_items605 def prepare_transact_write_item_records(self, record, data):606 records = []607 # Fix issue #2745: existing_items only contain the Put/Update/Delete records,608 # so we will increase the index based on these events609 i = 0610 for request in data["TransactItems"]:611 put_request = request.get("Put")612 if put_request:613 existing_item = 
self._thread_local("existing_items")[i]614 table_name = put_request["TableName"]615 keys = dynamodb_extract_keys(item=put_request["Item"], table_name=table_name)616 if isinstance(keys, Response):617 return keys618 # Add stream view type to record if ddb stream is enabled619 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)620 if stream_spec:621 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]622 new_record = clone(record)623 new_record["eventID"] = short_uid()624 new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"625 new_record["dynamodb"]["Keys"] = keys626 new_record["dynamodb"]["NewImage"] = put_request["Item"]627 if existing_item:628 new_record["dynamodb"]["OldImage"] = existing_item629 new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)630 new_record["dynamodb"]["SizeBytes"] = len(json.dumps(put_request["Item"]))631 records.append(new_record)632 i += 1633 update_request = request.get("Update")634 if update_request:635 table_name = update_request["TableName"]636 keys = update_request["Key"]637 if isinstance(keys, Response):638 return keys639 updated_item = find_existing_item(update_request, table_name)640 if not updated_item:641 return []642 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)643 if stream_spec:644 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]645 new_record = clone(record)646 new_record["eventID"] = short_uid()647 new_record["eventName"] = "MODIFY"648 new_record["dynamodb"]["Keys"] = keys649 new_record["dynamodb"]["OldImage"] = self._thread_local("existing_items")[i]650 new_record["dynamodb"]["NewImage"] = updated_item651 new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)652 new_record["dynamodb"]["SizeBytes"] = len(json.dumps(updated_item))653 records.append(new_record)654 i += 1655 delete_request = request.get("Delete")656 if delete_request:657 table_name = delete_request["TableName"]658 keys 
= delete_request["Key"]659 existing_item = self._thread_local("existing_items")[i]660 if isinstance(keys, Response):661 return keys662 stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)663 if stream_spec:664 record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]665 new_record = clone(record)666 new_record["eventID"] = short_uid()667 new_record["eventName"] = "REMOVE"668 new_record["dynamodb"]["Keys"] = keys669 new_record["dynamodb"]["OldImage"] = existing_item670 new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_item))671 new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)672 records.append(new_record)673 i += 1674 return records675 def prepare_records_to_forward_to_ddb_stream(self, records):676 # StreamViewType determines what information is written to the stream for the table677 # When an item in the table is inserted, updated or deleted678 for record in records:679 if record["dynamodb"].get("StreamViewType"):680 # KEYS_ONLY - Only the key attributes of the modified item are written to the stream681 if record["dynamodb"]["StreamViewType"] == "KEYS_ONLY":682 record["dynamodb"].pop("OldImage", None)683 record["dynamodb"].pop("NewImage", None)684 # NEW_IMAGE - The entire item, as it appears after it was modified, is written to the stream685 elif record["dynamodb"]["StreamViewType"] == "NEW_IMAGE":686 record["dynamodb"].pop("OldImage", None)687 # OLD_IMAGE - The entire item, as it appeared before it was modified, is written to the stream688 elif record["dynamodb"]["StreamViewType"] == "OLD_IMAGE":689 record["dynamodb"].pop("NewImage", None)690 return records691 def delete_all_event_source_mappings(self, table_arn):692 if table_arn:693 # fix start dynamodb service without lambda694 if not is_api_enabled("lambda"):695 return696 lambda_client = aws_stack.connect_to_service("lambda")697 result = lambda_client.list_event_source_mappings(EventSourceArn=table_arn)698 for event in 
result["EventSourceMappings"]:699 event_source_mapping_id = event["UUID"]700 lambda_client.delete_event_source_mapping(UUID=event_source_mapping_id)701 @staticmethod702 def _thread_local(name, default=None):703 try:704 return getattr(ProxyListenerDynamoDB.thread_local, name)705 except AttributeError:706 return default707def get_sse_kms_managed_key():708 if MANAGED_KMS_KEYS.get(aws_stack.get_region()):709 return MANAGED_KMS_KEYS[aws_stack.get_region()]710 kms_client = aws_stack.connect_to_service("kms")711 key_data = kms_client.create_key(Description="Default key that protects DynamoDB data")712 key_id = key_data["KeyMetadata"]["KeyId"]713 # not really happy with this import here714 from localstack.services.kms import kms_listener715 kms_listener.set_key_managed(key_id)716 MANAGED_KMS_KEYS[aws_stack.get_region()] = key_id717 return key_id718def get_sse_description(data):719 if data.get("Enabled"):720 kms_master_key_id = data.get("KMSMasterKeyId")721 if not kms_master_key_id:722 # this is of course not the actual key for dynamodb, just a better, since existing, mock723 kms_master_key_id = get_sse_kms_managed_key()724 kms_master_key_id = aws_stack.kms_key_arn(kms_master_key_id)725 return {726 "Status": "ENABLED",727 "SSEType": "KMS", # no other value is allowed here728 "KMSMasterKeyArn": kms_master_key_id,729 }730 return {}731def handle_special_request(method, path, data, headers):732 if path.startswith("/shell") or method == "GET":...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!