How to use the is_kinesis_stream_exists method in localstack

Best Python code snippet using localstack_python

dynamodb_listener.py

Source: dynamodb_listener.py (GitHub)



...
        elif action == "EnableKinesisStreamingDestination":
            # Check if table exists, to avoid error log output from DynamoDBLocal
            if not self.table_exists(ddb_client, data["TableName"]):
                return get_table_not_found_error()
            stream = is_kinesis_stream_exists(stream_arn=data["StreamArn"])
            if not stream:
                return error_response(
                    error_type="ValidationException",
                    message="User does not have a permission to use kinesis stream",
                )
            return dynamodb_enable_kinesis_streaming_destination(data, table_def)
        elif action == "DisableKinesisStreamingDestination":
            # Check if table exists, to avoid error log output from DynamoDBLocal
            if not self.table_exists(ddb_client, data["TableName"]):
                return get_table_not_found_error()
            stream = is_kinesis_stream_exists(stream_arn=data["StreamArn"])
            if not stream:
                return error_response(
                    error_type="ValidationException",
                    message="User does not have a permission to use kinesis stream",
                )
            return dynamodb_disable_kinesis_streaming_destination(data, table_def)
        elif action == "DescribeKinesisStreamingDestination":
            # Check if table exists, to avoid error log output from DynamoDBLocal
            if not self.table_exists(ddb_client, data["TableName"]):
                return get_table_not_found_error()
            response = aws_responses.requests_response(
                {
                    "KinesisDataStreamDestinations": table_def.get("KinesisDataStreamDestinations")
                    or [],
                    "TableName": data["TableName"],
                }
            )
            return response

        return Request(data=data_orig, method=method, headers=headers)

    def return_response(self, method, path, data, headers, response):
        if path.startswith("/shell") or method == "GET":
            return
        data = json.loads(to_str(data))

        # update table definitions
        if data and "TableName" in data and "KeySchema" in data:
            table_definitions = DynamoDBRegion.get().table_definitions
            table_definitions[data["TableName"]] = data

        if response._content:
            # fix the table and latest stream ARNs (DynamoDBLocal hardcodes "ddblocal" as the region)
            content_replaced = re.sub(
                r'("TableArn"|"LatestStreamArn"|"StreamArn")\s*:\s*"arn:aws:dynamodb:ddblocal:([^"]+)"',
                r'\1: "arn:aws:dynamodb:%s:\2"' % aws_stack.get_region(),
                to_str(response._content),
            )
            if content_replaced != response._content:
                response._content = content_replaced
                fix_headers_for_updated_response(response)

        action = headers.get("X-Amz-Target", "")
        action = action.replace(ACTION_PREFIX, "")
        if not action:
            return

        # upgrade event version to 1.1
        record = {
            "eventID": "1",
            "eventVersion": "1.1",
            "dynamodb": {
                "ApproximateCreationDateTime": time.time(),
                # 'StreamViewType': 'NEW_AND_OLD_IMAGES',
                "SizeBytes": -1,
            },
            "awsRegion": aws_stack.get_region(),
            "eventSource": "aws:dynamodb",
        }
        records = [record]
        streams_enabled_cache = {}
        table_name = data.get("TableName")
        event_sources_or_streams_enabled = has_event_sources_or_streams_enabled(
            table_name, streams_enabled_cache
        )

        if action == "UpdateItem":
            if response.status_code == 200 and event_sources_or_streams_enabled:
                existing_item = self._thread_local("existing_item")
                record["eventName"] = "INSERT" if not existing_item else "MODIFY"
                record["eventID"] = short_uid()
                updated_item = find_existing_item(data)
                if not updated_item:
                    return
                record["dynamodb"]["Keys"] = data["Key"]
                if existing_item:
                    record["dynamodb"]["OldImage"] = existing_item
                record["dynamodb"]["NewImage"] = updated_item
                record["dynamodb"]["SizeBytes"] = len(json.dumps(updated_item))
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
        elif action == "BatchWriteItem":
            records, unprocessed_items = self.prepare_batch_write_item_records(record, data)
            for record in records:
                event_sources_or_streams_enabled = (
                    event_sources_or_streams_enabled
                    or has_event_sources_or_streams_enabled(
                        record["eventSourceARN"], streams_enabled_cache
                    )
                )
            if response.status_code == 200 and any(unprocessed_items):
                content = json.loads(to_str(response.content))
                table_name = list(data["RequestItems"].keys())[0]
                if table_name not in content["UnprocessedItems"]:
                    content["UnprocessedItems"][table_name] = []
                for key in ["PutRequest", "DeleteRequest"]:
                    if any(unprocessed_items[key]):
                        content["UnprocessedItems"][table_name].append(
                            {key: unprocessed_items[key]}
                        )
                unprocessed = content["UnprocessedItems"]
                for key in list(unprocessed.keys()):
                    if not unprocessed.get(key):
                        del unprocessed[key]
                response._content = json.dumps(content)
                fix_headers_for_updated_response(response)
        elif action == "TransactWriteItems":
            records = self.prepare_transact_write_item_records(record, data)
            for record in records:
                event_sources_or_streams_enabled = (
                    event_sources_or_streams_enabled
                    or has_event_sources_or_streams_enabled(
                        record["eventSourceARN"], streams_enabled_cache
                    )
                )
        elif action == "PutItem":
            if response.status_code == 200:
                keys = dynamodb_extract_keys(item=data["Item"], table_name=table_name)
                if isinstance(keys, Response):
                    return keys
                # fix response
                if response._content == "{}":
                    response._content = update_put_item_response_content(data, response._content)
                    fix_headers_for_updated_response(response)
                if event_sources_or_streams_enabled:
                    existing_item = self._thread_local("existing_item")
                    # Get stream specifications details for the table
                    stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                    record["eventName"] = "INSERT" if not existing_item else "MODIFY"
                    # prepare record keys
                    record["dynamodb"]["Keys"] = keys
                    record["dynamodb"]["NewImage"] = data["Item"]
                    record["dynamodb"]["SizeBytes"] = len(json.dumps(data["Item"]))
                    record["eventID"] = short_uid()
                    if stream_spec:
                        record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                    if existing_item:
                        record["dynamodb"]["OldImage"] = existing_item
        elif action in ("GetItem", "Query"):
            if response.status_code == 200:
                content = json.loads(to_str(response.content))
                # make sure we append 'ConsumedCapacity', which is properly
                # returned by dynalite, but not by AWS's DynamoDBLocal
                if "ConsumedCapacity" not in content and data.get("ReturnConsumedCapacity") in [
                    "TOTAL",
                    "INDEXES",
                ]:
                    content["ConsumedCapacity"] = {
                        "TableName": table_name,
                        "CapacityUnits": 5,  # TODO hardcoded
                        "ReadCapacityUnits": 2,
                        "WriteCapacityUnits": 3,
                    }
                    response._content = json.dumps(content)
                    fix_headers_for_updated_response(response)
        elif action == "DeleteItem":
            if response.status_code == 200 and event_sources_or_streams_enabled:
                old_item = self._thread_local("existing_item")
                record["eventID"] = short_uid()
                record["eventName"] = "REMOVE"
                record["dynamodb"]["Keys"] = data["Key"]
                record["dynamodb"]["OldImage"] = old_item
                record["dynamodb"]["SizeBytes"] = len(json.dumps(old_item))
                # Get stream specifications details for the table
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
        elif action == "CreateTable":
            if response.status_code == 200:
                table_definitions = (
                    DynamoDBRegion.get().table_definitions.get(data["TableName"]) or {}
                )
                if "TableId" not in table_definitions:
                    table_definitions["TableId"] = long_uid()
                if "SSESpecification" in table_definitions:
                    sse_specification = table_definitions.pop("SSESpecification")
                    table_definitions["SSEDescription"] = get_sse_description(sse_specification)
                content = json.loads(to_str(response.content))
                if table_definitions:
                    table_content = content.get("Table", {})
                    table_content.update(table_definitions)
                    content["TableDescription"].update(table_content)
                update_response_content(response, content)
                if "StreamSpecification" in data:
                    create_dynamodb_stream(
                        data, content["TableDescription"].get("LatestStreamLabel")
                    )
                if data.get("Tags"):
                    table_arn = content["TableDescription"]["TableArn"]
                    DynamoDBRegion.TABLE_TAGS[table_arn] = {
                        tag["Key"]: tag["Value"] for tag in data["Tags"]
                    }
                event_publisher.fire_event(
                    event_publisher.EVENT_DYNAMODB_CREATE_TABLE,
                    payload={"n": event_publisher.get_hash(table_name)},
                )
            return
        elif action == "DeleteTable":
            if response.status_code == 200:
                table_arn = (
                    json.loads(response._content).get("TableDescription", {}).get("TableArn")
                )
                event_publisher.fire_event(
                    event_publisher.EVENT_DYNAMODB_DELETE_TABLE,
                    payload={"n": event_publisher.get_hash(table_name)},
                )
                self.delete_all_event_source_mappings(table_arn)
                dynamodbstreams_api.delete_streams(table_arn)
                DynamoDBRegion.TABLE_TAGS.pop(table_arn, None)
            return
        elif action == "UpdateTable":
            content_str = to_str(response._content or "")
            if response.status_code == 200 and "StreamSpecification" in data:
                content = json.loads(content_str)
                create_dynamodb_stream(data, content["TableDescription"].get("LatestStreamLabel"))
            if (
                response.status_code >= 400
                and data.get("ReplicaUpdates")
                and "Nothing to update" in content_str
            ):
                table_name = data.get("TableName")
                # update local table props (replicas)
                table_properties = DynamoDBRegion.get().table_properties
                table_properties[table_name] = table_props = table_properties.get(table_name) or {}
                table_props["Replicas"] = replicas = table_props.get("Replicas") or []
                for repl_update in data["ReplicaUpdates"]:
                    for key, details in repl_update.items():
                        region = details.get("RegionName")
                        if key == "Create":
                            details["ReplicaStatus"] = details.get("ReplicaStatus") or "ACTIVE"
                            replicas.append(details)
                        if key == "Update":
                            replica = [r for r in replicas if r.get("RegionName") == region]
                            if replica:
                                replica[0].update(details)
                        if key == "Delete":
                            table_props["Replicas"] = [
                                r for r in replicas if r.get("RegionName") != region
                            ]
                # update response content
                schema = get_table_schema(table_name)
                result = {"TableDescription": schema["Table"]}
                update_response_content(response, json_safe(result), 200)
            return
        elif action == "DescribeTable":
            table_name = data.get("TableName")
            table_props = DynamoDBRegion.get().table_properties.get(table_name)
            if table_props:
                content = json.loads(to_str(response.content))
                content.get("Table", {}).update(table_props)
                update_response_content(response, content)
            # Update only TableId and SSEDescription if present
            table_definitions = DynamoDBRegion.get().table_definitions.get(table_name)
            if table_definitions:
                for key in ["TableId", "SSEDescription"]:
                    if table_definitions.get(key):
                        content = json.loads(to_str(response.content))
                        content.get("Table", {})[key] = table_definitions[key]
                        update_response_content(response, content)
        elif action == "TagResource":
            table_arn = data["ResourceArn"]
            table_tags = DynamoDBRegion.TABLE_TAGS
            if table_arn not in table_tags:
                table_tags[table_arn] = {}
            table_tags[table_arn].update({tag["Key"]: tag["Value"] for tag in data.get("Tags", [])})
            return
        elif action == "UntagResource":
            table_arn = data["ResourceArn"]
            for tag_key in data.get("TagKeys", []):
                DynamoDBRegion.TABLE_TAGS.get(table_arn, {}).pop(tag_key, None)
            return
        else:
            # nothing to do
            return

        if event_sources_or_streams_enabled and records and "eventName" in records[0]:
            if "TableName" in data:
                records[0]["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
            # forward to kinesis stream
            records_to_kinesis = copy.deepcopy(records)
            forward_to_kinesis_stream(records_to_kinesis)
            # forward to lambda and ddb_streams
            forward_to_lambda(records)
            records = self.prepare_records_to_forward_to_ddb_stream(records)
            forward_to_ddb_stream(records)

    # -------------
    # UTIL METHODS
    # -------------

    def prepare_request_headers(self, headers):
        # Note: We need to ensure that the same access key is used here for all requests,
        # otherwise DynamoDBLocal stores tables/items in separate namespaces
        headers["Authorization"] = re.sub(
            r"Credential=[^/]+/",
            r"Credential=%s/" % constants.TEST_AWS_ACCESS_KEY_ID,
            headers.get("Authorization") or "",
        )

    def prepare_batch_write_item_records(self, record, data):
        records = []
        unprocessed_items = {"PutRequest": {}, "DeleteRequest": {}}
        i = 0
        for table_name in sorted(data["RequestItems"].keys()):
            # Add stream view type to record if ddb stream is enabled
            stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
            if stream_spec:
                record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
            for request in data["RequestItems"][table_name]:
                put_request = request.get("PutRequest")
                existing_items = self._thread_local("existing_items")
                if put_request:
                    if existing_items and len(existing_items) > i:
                        existing_item = existing_items[i]
                        keys = dynamodb_extract_keys(
                            item=put_request["Item"], table_name=table_name
                        )
                        if isinstance(keys, Response):
                            return keys
                        new_record = clone(record)
                        new_record["eventID"] = short_uid()
                        new_record["dynamodb"]["SizeBytes"] = len(json.dumps(put_request["Item"]))
                        new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"
                        new_record["dynamodb"]["Keys"] = keys
                        new_record["dynamodb"]["NewImage"] = put_request["Item"]
                        if existing_item:
                            new_record["dynamodb"]["OldImage"] = existing_item
                        new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                        records.append(new_record)
                    unprocessed_put_items = self._thread_local("unprocessed_put_items")
                    if unprocessed_put_items and len(unprocessed_put_items) > i:
                        unprocessed_item = unprocessed_put_items[i]
                        if unprocessed_item:
                            unprocessed_items["PutRequest"].update(
                                json.loads(json.dumps(unprocessed_item))
                            )
                delete_request = request.get("DeleteRequest")
                if delete_request:
                    if existing_items and len(existing_items) > i:
                        keys = delete_request["Key"]
                        if isinstance(keys, Response):
                            return keys
                        new_record = clone(record)
                        new_record["eventID"] = short_uid()
                        new_record["eventName"] = "REMOVE"
                        new_record["dynamodb"]["Keys"] = keys
                        new_record["dynamodb"]["OldImage"] = existing_items[i]
                        new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_items[i]))
                        new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                        records.append(new_record)
                    unprocessed_delete_items = self._thread_local("unprocessed_delete_items")
                    if unprocessed_delete_items and len(unprocessed_delete_items) > i:
                        unprocessed_item = unprocessed_delete_items[i]
                        if unprocessed_item:
                            unprocessed_items["DeleteRequest"].update(
                                json.loads(json.dumps(unprocessed_item))
                            )
                i += 1
        return records, unprocessed_items

    def prepare_transact_write_item_records(self, record, data):
        records = []
        # Fix issue #2745: existing_items only contain the Put/Update/Delete records,
        # so we will increase the index based on these events
        i = 0
        for request in data["TransactItems"]:
            put_request = request.get("Put")
            if put_request:
                existing_item = self._thread_local("existing_items")[i]
                table_name = put_request["TableName"]
                keys = dynamodb_extract_keys(item=put_request["Item"], table_name=table_name)
                if isinstance(keys, Response):
                    return keys
                # Add stream view type to record if ddb stream is enabled
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                new_record = clone(record)
                new_record["eventID"] = short_uid()
                new_record["eventName"] = "INSERT" if not existing_item else "MODIFY"
                new_record["dynamodb"]["Keys"] = keys
                new_record["dynamodb"]["NewImage"] = put_request["Item"]
                if existing_item:
                    new_record["dynamodb"]["OldImage"] = existing_item
                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(put_request["Item"]))
                records.append(new_record)
                i += 1
            update_request = request.get("Update")
            if update_request:
                table_name = update_request["TableName"]
                keys = update_request["Key"]
                if isinstance(keys, Response):
                    return keys
                updated_item = find_existing_item(update_request, table_name)
                if not updated_item:
                    return []
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                new_record = clone(record)
                new_record["eventID"] = short_uid()
                new_record["eventName"] = "MODIFY"
                new_record["dynamodb"]["Keys"] = keys
                new_record["dynamodb"]["OldImage"] = self._thread_local("existing_items")[i]
                new_record["dynamodb"]["NewImage"] = updated_item
                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(updated_item))
                records.append(new_record)
                i += 1
            delete_request = request.get("Delete")
            if delete_request:
                table_name = delete_request["TableName"]
                keys = delete_request["Key"]
                existing_item = self._thread_local("existing_items")[i]
                if isinstance(keys, Response):
                    return keys
                stream_spec = dynamodb_get_table_stream_specification(table_name=table_name)
                if stream_spec:
                    record["dynamodb"]["StreamViewType"] = stream_spec["StreamViewType"]
                new_record = clone(record)
                new_record["eventID"] = short_uid()
                new_record["eventName"] = "REMOVE"
                new_record["dynamodb"]["Keys"] = keys
                new_record["dynamodb"]["OldImage"] = existing_item
                new_record["dynamodb"]["SizeBytes"] = len(json.dumps(existing_item))
                new_record["eventSourceARN"] = aws_stack.dynamodb_table_arn(table_name)
                records.append(new_record)
                i += 1
        return records

    def prepare_records_to_forward_to_ddb_stream(self, records):
        # StreamViewType determines what information is written to the stream for the table
        # When an item in the table is inserted, updated or deleted
        for record in records:
            if record["dynamodb"].get("StreamViewType"):
                # KEYS_ONLY - Only the key attributes of the modified item are written to the stream
                if record["dynamodb"]["StreamViewType"] == "KEYS_ONLY":
                    record["dynamodb"].pop("OldImage", None)
                    record["dynamodb"].pop("NewImage", None)
                # NEW_IMAGE - The entire item, as it appears after it was modified, is written to the stream
                elif record["dynamodb"]["StreamViewType"] == "NEW_IMAGE":
                    record["dynamodb"].pop("OldImage", None)
                # OLD_IMAGE - The entire item, as it appeared before it was modified, is written to the stream
                elif record["dynamodb"]["StreamViewType"] == "OLD_IMAGE":
                    record["dynamodb"].pop("NewImage", None)
        return records

    def delete_all_event_source_mappings(self, table_arn):
        if table_arn:
            # fix start dynamodb service without lambda
            if not is_api_enabled("lambda"):
                return
            lambda_client = aws_stack.connect_to_service("lambda")
            result = lambda_client.list_event_source_mappings(EventSourceArn=table_arn)
            for event in result["EventSourceMappings"]:
                event_source_mapping_id = event["UUID"]
                lambda_client.delete_event_source_mapping(UUID=event_source_mapping_id)

    @staticmethod
    def _thread_local(name, default=None):
        try:
            return getattr(ProxyListenerDynamoDB.thread_local, name)
        except AttributeError:
            return default


def get_sse_kms_managed_key():
    if MANAGED_KMS_KEYS.get(aws_stack.get_region()):
        return MANAGED_KMS_KEYS[aws_stack.get_region()]
    kms_client = aws_stack.connect_to_service("kms")
    key_data = kms_client.create_key(Description="Default key that protects DynamoDB data")
    key_id = key_data["KeyMetadata"]["KeyId"]
    # not really happy with this import here
    from localstack.services.kms import kms_listener

    kms_listener.set_key_managed(key_id)
    MANAGED_KMS_KEYS[aws_stack.get_region()] = key_id
    return key_id


def get_sse_description(data):
    if data.get("Enabled"):
        kms_master_key_id = data.get("KMSMasterKeyId")
        if not kms_master_key_id:
            # this is of course not the actual key for dynamodb, just a better, since existing, mock
            kms_master_key_id = get_sse_kms_managed_key()
        kms_master_key_id = aws_stack.kms_key_arn(kms_master_key_id)
        return {
            "Status": "ENABLED",
            "SSEType": "KMS",  # no other value is allowed here
            "KMSMasterKeyArn": kms_master_key_id,
        }
    return {}


def handle_special_request(method, path, data, headers):
    if path.startswith("/shell") or method == "GET":
        if path == "/shell":
            headers = {"Refresh": "0; url=%s/shell/" % config.TEST_DYNAMODB_URL}
            return aws_responses.requests_response("", headers=headers)
        return True
    if method == "OPTIONS":
        return 200


def create_global_table(data):
    table_name = data["GlobalTableName"]
    if table_name in DynamoDBRegion.GLOBAL_TABLES:
        return get_error_message(
            "Global Table with this name already exists",
            "GlobalTableAlreadyExistsException",
        )
    DynamoDBRegion.GLOBAL_TABLES[table_name] = data
    for group in data.get("ReplicationGroup", []):
        group["ReplicaStatus"] = "ACTIVE"
        group["ReplicaStatusDescription"] = "Replica active"
    result = {"GlobalTableDescription": data}
    return result


def describe_global_table(data):
    table_name = data["GlobalTableName"]
    details = DynamoDBRegion.GLOBAL_TABLES.get(table_name)
    if not details:
        return get_error_message(
            "Global Table with this name does not exist", "GlobalTableNotFoundException"
        )
    result = {"GlobalTableDescription": details}
    return result


def list_global_tables(data):
    result = [
        select_attributes(tab, ["GlobalTableName", "ReplicationGroup"])
        for tab in DynamoDBRegion.GLOBAL_TABLES.values()
    ]
    result = {"GlobalTables": result}
    return result


def update_global_table(data):
    table_name = data["GlobalTableName"]
    details = DynamoDBRegion.GLOBAL_TABLES.get(table_name)
    if not details:
        return get_error_message(
            "Global Table with this name does not exist", "GlobalTableNotFoundException"
        )
    for update in data.get("ReplicaUpdates", []):
        repl_group = details["ReplicationGroup"]
        # delete existing
        delete = update.get("Delete")
        if delete:
            details["ReplicationGroup"] = [
                g for g in repl_group if g["RegionName"] != delete["RegionName"]
            ]
        # create new
        create = update.get("Create")
        if create:
            exists = [g for g in repl_group if g["RegionName"] == create["RegionName"]]
            if exists:
                continue
            new_group = {
                "RegionName": create["RegionName"],
                "ReplicaStatus": "ACTIVE",
                "ReplicaStatusDescription": "Replica active",
            }
            details["ReplicationGroup"].append(new_group)
    result = {"GlobalTableDescription": details}
    return result


def is_index_query_valid(table_name, index_query_type):
    schema = get_table_schema(table_name)
    for index in schema["Table"].get("GlobalSecondaryIndexes", []):
        index_projection_type = index.get("Projection").get("ProjectionType")
        if index_query_type == "ALL_ATTRIBUTES" and index_projection_type != "ALL":
            return False
    return True


def has_event_sources_or_streams_enabled(table_name, cache={}):
    if not table_name:
        return
    table_arn = aws_stack.dynamodb_table_arn(table_name)
    cached = cache.get(table_arn)
    if isinstance(cached, bool):
        return cached
    sources = lambda_api.get_event_sources(source_arn=table_arn)
    result = False
    if sources:
        result = True
    if not result and dynamodbstreams_api.get_stream_for_table(table_arn):
        result = True
    cache[table_arn] = result
    # if kinesis streaming destination is enabled
    # get table name from table_arn
    # since batch_wrtie and transact write operations passing table_arn instead of table_name
    table_name = table_arn.split("/", 1)[-1]
    table_definitions = DynamoDBRegion.get().table_definitions
    if not result and table_definitions.get(table_name):
        if table_definitions[table_name].get("KinesisDataStreamDestinationStatus") == "ACTIVE":
            result = True
    return result


def get_table_schema(table_name):
    key = "%s/%s" % (aws_stack.get_region(), table_name)
    schema = SCHEMA_CACHE.get(key)
    if not schema:
        ddb_client = aws_stack.connect_to_service("dynamodb")
        schema = ddb_client.describe_table(TableName=table_name)
        SCHEMA_CACHE[key] = schema
    return schema


def find_existing_item(put_item, table_name=None):
    table_name = table_name or put_item["TableName"]
    ddb_client = aws_stack.connect_to_service("dynamodb")
    search_key = {}
    if "Key" in put_item:
        search_key = put_item["Key"]
    else:
        schema = get_table_schema(table_name)
        schemas = [schema["Table"]["KeySchema"]]
        for index in schema["Table"].get("GlobalSecondaryIndexes", []):
            # TODO
            # schemas.append(index['KeySchema'])
            pass
        for schema in schemas:
            for key in schema:
                key_name = key["AttributeName"]
                search_key[key_name] = put_item["Item"][key_name]
        if not search_key:
            return
    req = {"TableName": table_name, "Key": search_key}
    existing_item = aws_stack.dynamodb_get_item_raw(req)
    if not existing_item:
        return existing_item
    if "Item" not in existing_item:
        if "message" in existing_item:
            table_names = ddb_client.list_tables()["TableNames"]
            msg = "Unable to get item from DynamoDB (existing tables: %s ...truncated if >100 tables): %s" % (
                table_names,
                existing_item["message"],
            )
            LOGGER.warning(msg)
        return
    return existing_item.get("Item")


def get_error_message(message, error_type):
    response = error_response(message=message, error_type=error_type)
    fix_headers_for_updated_response(response)
    return response


def get_table_not_found_error():
    return get_error_message(
        message="Cannot do operations on a non-existent table",
        error_type="ResourceNotFoundException",
    )


def fix_headers_for_updated_response(response):
    response.headers["Content-Length"] = len(to_bytes(response.content))
    response.headers["x-amz-crc32"] = calculate_crc32(response)


def update_response_content(response, content, status_code=None):
    aws_responses.set_response_content(response, content)
    if status_code:
        response.status_code = status_code
    fix_headers_for_updated_response(response)


def update_put_item_response_content(data, response_content):
    # when return-values variable is set only then attribute data should be returned
    # in the response otherwise by default is should not return any data.
    # https://github.com/localstack/localstack/issues/2121
    if data.get("ReturnValues"):
        response_content = json.dumps({"Attributes": data["Item"]})
    return response_content


def calculate_crc32(response):
    return crc32(to_bytes(response.content)) & 0xFFFFFFFF


def create_dynamodb_stream(data, latest_stream_label):
    stream = data["StreamSpecification"]
    enabled = stream.get("StreamEnabled")
    if enabled not in [False, "False"]:
        table_name = data["TableName"]
        view_type = stream["StreamViewType"]
        dynamodbstreams_api.add_dynamodb_stream(
            table_name=table_name,
            latest_stream_label=latest_stream_label,
            view_type=view_type,
            enabled=enabled,
        )


def forward_to_lambda(records):
    for record in records:
        sources = lambda_api.get_event_sources(source_arn=record["eventSourceARN"])
        event = {"Records": [record]}
        for src in sources:
            if src.get("State") != "Enabled":
                continue
            lambda_api.run_lambda(
                func_arn=src["FunctionArn"],
                event=event,
                context={},
                asynchronous=not config.SYNCHRONOUS_DYNAMODB_EVENTS,
            )


def forward_to_ddb_stream(records):
    dynamodbstreams_api.forward_events(records)


def forward_to_kinesis_stream(records):
    kinesis = aws_stack.connect_to_service("kinesis")
    table_definitions = DynamoDBRegion.get().table_definitions
    for record in records:
        if record.get("eventSourceARN"):
            table_name = record["eventSourceARN"].split("/", 1)[-1]
            table_def = table_definitions.get(table_name) or {}
            if table_def.get("KinesisDataStreamDestinationStatus") == "ACTIVE":
                stream_name = table_def["KinesisDataStreamDestinations"][-1]["StreamArn"].split(
                    "/", 1
                )[-1]
                record["tableName"] = table_name
                record.pop("eventSourceARN", None)
                record["dynamodb"].pop("StreamViewType", None)
                partition_key = list(
                    filter(lambda key: key["KeyType"] == "HASH", table_def["KeySchema"])
                )[0]["AttributeName"]
                kinesis.put_record(
                    StreamName=stream_name,
                    Data=json.dumps(record),
                    PartitionKey=partition_key,
                )


def dynamodb_extract_keys(item, table_name):
    result = {}
    table_definitions = DynamoDBRegion.get().table_definitions
    if table_name not in table_definitions:
        LOGGER.warning("Unknown table: %s not found in %s" % (table_name, table_definitions))
        return None
    for key in table_definitions[table_name]["KeySchema"]:
        attr_name = key["AttributeName"]
        if attr_name not in item:
            return error_response(
                error_type="ValidationException",
                message="One of the required keys was not given a value",
            )
        result[attr_name] = item[attr_name]
    return result


def dynamodb_get_table_stream_specification(table_name):
    try:
        return get_table_schema(table_name)["Table"].get("StreamSpecification")
    except Exception as e:
        LOGGER.info(
            "Unable to get stream specification for table %s : %s %s"
            % (table_name, e, traceback.format_exc())
        )
        raise e


def is_kinesis_stream_exists(stream_arn):
    # connect to kinesis
    kinesis = aws_stack.connect_to_service("kinesis")
    stream_name_from_arn = stream_arn.split("/", 1)[1]
    # check if the stream exists in kinesis for the user
    filtered = list(
        filter(
            lambda stream_name: stream_name == stream_name_from_arn,
            kinesis.list_streams()["StreamNames"],
        )
    )
    return bool(filtered)


def dynamodb_enable_kinesis_streaming_destination(data, table_def):
    if table_def.get("KinesisDataStreamDestinationStatus") in [
        "DISABLED",
...
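The helper this page is about, is_kinesis_stream_exists, appears near the end of the listing: it extracts the stream name from the given StreamArn and checks it against the names returned by kinesis.list_streams(). In the listener it guards the EnableKinesisStreamingDestination and DisableKinesisStreamingDestination branches, which reject the request with a ValidationException when the referenced stream does not exist. Below is a minimal sketch of the same check done with plain boto3 against a running LocalStack instance; the endpoint URL, region, credentials, and the table/stream names are assumptions for illustration and are not part of the snippet above.

import boto3

# assumed LocalStack edge endpoint and dummy credentials (illustrative only)
LOCALSTACK_URL = "http://localhost:4566"

kinesis = boto3.client(
    "kinesis",
    endpoint_url=LOCALSTACK_URL,
    region_name="us-east-1",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)
dynamodb = boto3.client(
    "dynamodb",
    endpoint_url=LOCALSTACK_URL,
    region_name="us-east-1",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)


def kinesis_stream_exists(stream_arn):
    # same idea as is_kinesis_stream_exists: take the stream name from the ARN
    # (e.g. arn:aws:kinesis:region:account-id:stream/stream-name) and compare it
    # against the names returned by ListStreams (pagination ignored, as in the original)
    stream_name = stream_arn.split("/", 1)[1]
    return stream_name in kinesis.list_streams()["StreamNames"]


# hypothetical table and stream names, used only for this example
stream_arn = "arn:aws:kinesis:us-east-1:000000000000:stream/my-ddb-destination"
if kinesis_stream_exists(stream_arn):
    # this API call is what triggers the EnableKinesisStreamingDestination branch above
    dynamodb.enable_kinesis_streaming_destination(
        TableName="my-table", StreamArn=stream_arn
    )
else:
    print("stream does not exist - the listener would reject the request")

If the stream is missing, the listener responds with the ValidationException shown at the top of the listing ("User does not have a permission to use kinesis stream") rather than enabling the streaming destination.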


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest now!

Get 100 minutes of automation testing for free!

