How to use table_name_from_stream_arn method in localstack

Best Python code snippet using localstack_python

dynamodbstreams_api.py

Source:dynamodbstreams_api.py Github

copy

Full Screen

...44 payload={"n": event_publisher.get_hash(table_name)},45 )46def get_stream_for_table(table_arn):47 region = DynamoDBStreamsBackend.get()48 table_name = table_name_from_stream_arn(table_arn)49 return region.ddb_streams.get(table_name)50def forward_events(records):51 kinesis = aws_stack.connect_to_service("kinesis")52 for record in records:53 if "SequenceNumber" not in record["dynamodb"]:54 record["dynamodb"]["SequenceNumber"] = str(55 DynamoDBStreamsBackend.SEQUENCE_NUMBER_COUNTER56 )57 DynamoDBStreamsBackend.SEQUENCE_NUMBER_COUNTER += 158 table_arn = record["eventSourceARN"]59 stream = get_stream_for_table(table_arn)60 if stream:61 table_name = table_name_from_stream_arn(stream["StreamArn"])62 stream_name = get_kinesis_stream_name(table_name)63 kinesis.put_record(StreamName=stream_name, Data=json.dumps(record), PartitionKey="TODO")64def delete_streams(table_arn):65 region = DynamoDBStreamsBackend.get()66 table_name = table_name_from_table_arn(table_arn)67 stream = region.ddb_streams.pop(table_name, None)68 if stream:69 stream_name = get_kinesis_stream_name(table_name)70 try:71 aws_stack.connect_to_service("kinesis").delete_stream(StreamName=stream_name)72 # sleep a bit, as stream deletion can take some time ...73 time.sleep(1)74 except Exception:75 pass # ignore "stream not found" errors76@app.route("/", methods=["POST"])77def post_request():78 region = DynamoDBStreamsBackend.get()79 action = request.headers.get("x-amz-target", "")80 action = action.split(".")[-1]81 data = json.loads(to_str(request.data))82 result = {}83 kinesis = aws_stack.connect_to_service("kinesis")84 if action == "ListStreams":85 result = {"Streams": list(region.ddb_streams.values())}86 elif action == "DescribeStream":87 for stream in region.ddb_streams.values():88 if stream["StreamArn"] == data["StreamArn"]:89 result = {"StreamDescription": stream}90 # get stream details91 dynamodb = aws_stack.connect_to_service("dynamodb")92 table_name = table_name_from_stream_arn(stream["StreamArn"])93 stream_name = get_kinesis_stream_name(table_name)94 stream_details = kinesis.describe_stream(StreamName=stream_name)95 table_details = dynamodb.describe_table(TableName=table_name)96 stream["KeySchema"] = table_details["Table"]["KeySchema"]97 # Replace Kinesis ShardIDs with ones that mimic actual98 # DynamoDBStream ShardIDs.99 stream_shards = stream_details["StreamDescription"]["Shards"]100 for shard in stream_shards:101 shard["ShardId"] = shard_id(stream_name, shard["ShardId"])102 stream["Shards"] = stream_shards103 break104 if not result:105 return error_response(106 "Requested resource not found", error_type="ResourceNotFoundException"107 )108 elif action == "GetShardIterator":109 # forward request to Kinesis API110 stream_name = stream_name_from_stream_arn(data["StreamArn"])111 stream_shard_id = kinesis_shard_id(data["ShardId"])112 kwargs = (113 {"StartingSequenceNumber": data["SequenceNumber"]} if data.get("SequenceNumber") else {}114 )115 result = kinesis.get_shard_iterator(116 StreamName=stream_name,117 ShardId=stream_shard_id,118 ShardIteratorType=data["ShardIteratorType"],119 **kwargs,120 )121 elif action == "GetRecords":122 kinesis_records = kinesis.get_records(**data)123 result = {124 "Records": [],125 "NextShardIterator": kinesis_records.get("NextShardIterator"),126 }127 for record in kinesis_records["Records"]:128 record_data = json.loads(to_str(record["Data"]))129 record_data["dynamodb"]["SequenceNumber"] = record["SequenceNumber"]130 result["Records"].append(record_data)131 else:132 print('WARNING: Unknown operation "%s"' % action)133 return jsonify(result)134# -----------------135# HELPER FUNCTIONS136# -----------------137def error_response(message=None, error_type=None, code=400):138 if not message:139 message = "Unknown error"140 if not error_type:141 error_type = "UnknownError"142 if "com.amazonaws.dynamodb" not in error_type:143 error_type = "com.amazonaws.dynamodb.v20120810#%s" % error_type144 content = {"message": message, "__type": error_type}145 return make_response(jsonify(content), code)146def get_kinesis_stream_name(table_name):147 return DDB_KINESIS_STREAM_NAME_PREFIX + table_name148def table_name_from_stream_arn(stream_arn):149 return stream_arn.split(":table/", 1)[-1].split("/")[0]150def table_name_from_table_arn(table_arn):151 return table_name_from_stream_arn(table_arn)152def stream_name_from_stream_arn(stream_arn):153 table_name = table_name_from_stream_arn(stream_arn)154 return get_kinesis_stream_name(table_name)155def shard_id(stream_arn, kinesis_shard_id):156 timestamp = str(int(now_utc()))157 timestamp = "%s00000000" % timestamp[:-5]158 timestamp = "%s%s" % ("0" * (20 - len(timestamp)), timestamp)159 suffix = kinesis_shard_id.replace("shardId-", "")[:32]160 return "shardId-%s-%s" % (timestamp, suffix)161def kinesis_shard_id(dynamodbstream_shard_id):162 shard_params = dynamodbstream_shard_id.rsplit("-")163 return "{0}-{1}".format(shard_params[0], shard_params[-1])164def serve(port, quiet=True):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful