Best Python code snippet using localstack_python
dynamodb_listener.py
Source:dynamodb_listener.py
...123 existing_items.append(find_existing_item(inner_request, table_name))124 ProxyListenerDynamoDB.thread_local.existing_items = existing_items125 elif action == '%s.Query' % ACTION_PREFIX:126 if data.get('IndexName'):127 if not is_index_query_valid(to_str(data['TableName']), data.get('Select')):128 return error_response(message='One or more parameter values were invalid: Select type '129 'ALL_ATTRIBUTES is not supported for global secondary index id-index '130 'because its projection type is not ALL',131 error_type='ValidationException', code=400)132 elif action == '%s.TransactWriteItems' % ACTION_PREFIX:133 existing_items = []134 for item in data['TransactItems']:135 for key in ['Put', 'Update', 'Delete']:136 inner_item = item.get(key)137 if inner_item:138 existing_items.append(find_existing_item(inner_item))139 ProxyListenerDynamoDB.thread_local.existing_items = existing_items140 elif action == '%s.UpdateTimeToLive' % ACTION_PREFIX:141 # TODO: TTL status is maintained/mocked but no real expiry is happening for items142 response = Response()143 response.status_code = 200144 self._table_ttl_map[data['TableName']] = {145 'AttributeName': data['TimeToLiveSpecification']['AttributeName'],146 'Status': data['TimeToLiveSpecification']['Enabled']147 }148 response._content = json.dumps({'TimeToLiveSpecification': data['TimeToLiveSpecification']})149 fix_headers_for_updated_response(response)150 return response151 elif action == '%s.DescribeTimeToLive' % ACTION_PREFIX:152 response = Response()153 response.status_code = 200154 if data['TableName'] in self._table_ttl_map:155 if self._table_ttl_map[data['TableName']]['Status']:156 ttl_status = 'ENABLED'157 else:158 ttl_status = 'DISABLED'159 response._content = json.dumps({160 'TimeToLiveDescription': {161 'AttributeName': self._table_ttl_map[data['TableName']]['AttributeName'],162 'TimeToLiveStatus': ttl_status163 }164 })165 else: # TTL for dynamodb table not set166 response._content = json.dumps({'TimeToLiveDescription': {'TimeToLiveStatus': 'DISABLED'}})167 fix_headers_for_updated_response(response)168 return response169 elif action == '%s.TagResource' % ACTION_PREFIX or action == '%s.UntagResource' % ACTION_PREFIX:170 response = Response()171 response.status_code = 200172 response._content = '' # returns an empty body on success.173 fix_headers_for_updated_response(response)174 return response175 elif action == '%s.ListTagsOfResource' % ACTION_PREFIX:176 response = Response()177 response.status_code = 200178 response._content = json.dumps({179 'Tags': [180 {'Key': k, 'Value': v}181 for k, v in TABLE_TAGS.get(data['ResourceArn'], {}).items()182 ]183 })184 fix_headers_for_updated_response(response)185 return response186 return True187 def return_response(self, method, path, data, headers, response):188 if path.startswith('/shell') or method == 'GET':189 return190 data = json.loads(to_str(data))191 # update table definitions192 if data and 'TableName' in data and 'KeySchema' in data:193 TABLE_DEFINITIONS[data['TableName']] = data194 if response._content:195 # fix the table and latest stream ARNs (DynamoDBLocal hardcodes "ddblocal" as the region)196 content_replaced = re.sub(197 r'("TableArn"|"LatestStreamArn"|"StreamArn")\s*:\s*"arn:aws:dynamodb:ddblocal:([^"]+)"',198 r'\1: "arn:aws:dynamodb:%s:\2"' % aws_stack.get_region(),199 to_str(response._content)200 )201 if content_replaced != response._content:202 response._content = content_replaced203 fix_headers_for_updated_response(response)204 action = headers.get('X-Amz-Target')205 if not action:206 return207 # upgrade event version to 1.1208 record = {209 'eventID': '1',210 'eventVersion': '1.1',211 'dynamodb': {212 'ApproximateCreationDateTime': time.time(),213 'StreamViewType': 'NEW_AND_OLD_IMAGES',214 'SizeBytes': -1215 },216 'awsRegion': aws_stack.get_region(),217 'eventSource': 'aws:dynamodb'218 }219 records = [record]220 if action == '%s.UpdateItem' % ACTION_PREFIX:221 if response.status_code == 200:222 existing_item = self._thread_local('existing_item')223 record['eventName'] = 'INSERT' if not existing_item else 'MODIFY'224 updated_item = find_existing_item(data)225 if not updated_item:226 return227 record['dynamodb']['Keys'] = data['Key']228 if existing_item:229 record['dynamodb']['OldImage'] = existing_item230 record['dynamodb']['NewImage'] = updated_item231 record['dynamodb']['SizeBytes'] = len(json.dumps(updated_item))232 elif action == '%s.BatchWriteItem' % ACTION_PREFIX:233 records = self.prepare_batch_write_item_records(record, data)234 elif action == '%s.TransactWriteItems' % ACTION_PREFIX:235 records = self.prepare_transact_write_item_records(record, data)236 elif action == '%s.PutItem' % ACTION_PREFIX:237 if response.status_code == 200:238 existing_item = self._thread_local('existing_item')239 record['eventName'] = 'INSERT' if not existing_item else 'MODIFY'240 keys = dynamodb_extract_keys(item=data['Item'], table_name=data['TableName'])241 if isinstance(keys, Response):242 return keys243 # fix response244 if response._content == '{}':245 response._content = update_put_item_response_content(data, response._content)246 fix_headers_for_updated_response(response)247 # prepare record keys248 record['dynamodb']['Keys'] = keys249 record['dynamodb']['NewImage'] = data['Item']250 record['dynamodb']['SizeBytes'] = len(json.dumps(data['Item']))251 if existing_item:252 record['dynamodb']['OldImage'] = existing_item253 elif action in ['%s.GetItem' % ACTION_PREFIX, '%s.Query' % ACTION_PREFIX]:254 if response.status_code == 200:255 content = json.loads(to_str(response.content))256 # make sure we append 'ConsumedCapacity', which is properly257 # returned by dynalite, but not by AWS's DynamoDBLocal258 if 'ConsumedCapacity' not in content and data.get('ReturnConsumedCapacity') in ['TOTAL', 'INDEXES']:259 content['ConsumedCapacity'] = {260 'TableName': data['TableName'],261 'CapacityUnits': 5, # TODO hardcoded262 'ReadCapacityUnits': 2,263 'WriteCapacityUnits': 3264 }265 response._content = json.dumps(content)266 fix_headers_for_updated_response(response)267 elif action == '%s.DeleteItem' % ACTION_PREFIX:268 if response.status_code == 200:269 old_item = self._thread_local('existing_item')270 record['eventName'] = 'REMOVE'271 record['dynamodb']['Keys'] = data['Key']272 record['dynamodb']['OldImage'] = old_item273 elif action == '%s.CreateTable' % ACTION_PREFIX:274 if 'StreamSpecification' in data:275 if response.status_code == 200:276 content = json.loads(to_str(response._content))277 create_dynamodb_stream(data, content['TableDescription'].get('LatestStreamLabel'))278 event_publisher.fire_event(event_publisher.EVENT_DYNAMODB_CREATE_TABLE,279 payload={'n': event_publisher.get_hash(data['TableName'])})280 if data.get('Tags') and response.status_code == 200:281 table_arn = json.loads(response._content)['TableDescription']['TableArn']282 TABLE_TAGS[table_arn] = {tag['Key']: tag['Value'] for tag in data['Tags']}283 return284 elif action == '%s.DeleteTable' % ACTION_PREFIX:285 table_arn = json.loads(response._content).get('TableDescription', {}).get('TableArn')286 event_publisher.fire_event(287 event_publisher.EVENT_DYNAMODB_DELETE_TABLE,288 payload={'n': event_publisher.get_hash(data['TableName'])}289 )290 self.delete_all_event_source_mappings(table_arn)291 TABLE_TAGS.pop(table_arn, None)292 return293 elif action == '%s.UpdateTable' % ACTION_PREFIX:294 if 'StreamSpecification' in data:295 if response.status_code == 200:296 content = json.loads(to_str(response._content))297 create_dynamodb_stream(data, content['TableDescription'].get('LatestStreamLabel'))298 return299 elif action == '%s.TagResource' % ACTION_PREFIX:300 table_arn = data['ResourceArn']301 if table_arn not in TABLE_TAGS:302 TABLE_TAGS[table_arn] = {}303 TABLE_TAGS[table_arn].update({tag['Key']: tag['Value'] for tag in data.get('Tags', [])})304 return305 elif action == '%s.UntagResource' % ACTION_PREFIX:306 table_arn = data['ResourceArn']307 for tag_key in data.get('TagKeys', []):308 TABLE_TAGS.get(table_arn, {}).pop(tag_key, None)309 return310 else:311 # nothing to do312 return313 if len(records) > 0 and 'eventName' in records[0]:314 if 'TableName' in data:315 records[0]['eventSourceARN'] = aws_stack.dynamodb_table_arn(data['TableName'])316 forward_to_lambda(records)317 forward_to_ddb_stream(records)318 # -------------319 # UTIL METHODS320 # -------------321 def prepare_batch_write_item_records(self, record, data):322 records = []323 i = 0324 for table_name in sorted(data['RequestItems'].keys()):325 for request in data['RequestItems'][table_name]:326 put_request = request.get('PutRequest')327 if put_request:328 existing_item = self._thread_local('existing_items')[i]329 keys = dynamodb_extract_keys(item=put_request['Item'], table_name=table_name)330 if isinstance(keys, Response):331 return keys332 new_record = clone(record)333 new_record['eventName'] = 'INSERT' if not existing_item else 'MODIFY'334 new_record['dynamodb']['Keys'] = keys335 new_record['dynamodb']['NewImage'] = put_request['Item']336 if existing_item:337 new_record['dynamodb']['OldImage'] = existing_item338 new_record['eventSourceARN'] = aws_stack.dynamodb_table_arn(table_name)339 records.append(new_record)340 delete_request = request.get('DeleteRequest')341 if delete_request:342 keys = delete_request['Key']343 if isinstance(keys, Response):344 return keys345 new_record = clone(record)346 new_record['eventName'] = 'REMOVE'347 new_record['dynamodb']['Keys'] = keys348 new_record['dynamodb']['OldImage'] = self._thread_local('existing_items')[i]349 new_record['eventSourceARN'] = aws_stack.dynamodb_table_arn(table_name)350 records.append(new_record)351 i += 1352 return records353 def prepare_transact_write_item_records(self, record, data):354 records = []355 # Fix issue #2745: existing_items only contain the Put/Update/Delete records,356 # so we will increase the index based on these events357 i = 0358 for request in data['TransactItems']:359 put_request = request.get('Put')360 if put_request:361 existing_item = self._thread_local('existing_items')[i]362 table_name = put_request['TableName']363 keys = dynamodb_extract_keys(item=put_request['Item'], table_name=table_name)364 if isinstance(keys, Response):365 return keys366 new_record = clone(record)367 new_record['eventName'] = 'INSERT' if not existing_item else 'MODIFY'368 new_record['dynamodb']['Keys'] = keys369 new_record['dynamodb']['NewImage'] = put_request['Item']370 if existing_item:371 new_record['dynamodb']['OldImage'] = existing_item372 new_record['eventSourceARN'] = aws_stack.dynamodb_table_arn(table_name)373 records.append(new_record)374 i += 1375 update_request = request.get('Update')376 if update_request:377 table_name = update_request['TableName']378 keys = update_request['Key']379 if isinstance(keys, Response):380 return keys381 updated_item = find_existing_item(update_request, table_name)382 if not updated_item:383 return384 new_record = clone(record)385 new_record['eventName'] = 'MODIFY'386 new_record['dynamodb']['Keys'] = keys387 new_record['dynamodb']['OldImage'] = self._thread_local('existing_items')[i]388 new_record['dynamodb']['NewImage'] = updated_item389 new_record['eventSourceARN'] = aws_stack.dynamodb_table_arn(table_name)390 records.append(new_record)391 i += 1392 delete_request = request.get('Delete')393 if delete_request:394 table_name = delete_request['TableName']395 keys = delete_request['Key']396 if isinstance(keys, Response):397 return keys398 new_record = clone(record)399 new_record['eventName'] = 'REMOVE'400 new_record['dynamodb']['Keys'] = keys401 new_record['dynamodb']['OldImage'] = self._thread_local('existing_items')[i]402 new_record['eventSourceARN'] = aws_stack.dynamodb_table_arn(table_name)403 records.append(new_record)404 i += 1405 return records406 def delete_all_event_source_mappings(self, table_arn):407 if table_arn:408 # fix start dynamodb service without lambda409 if not is_api_enabled('lambda'):410 return411 lambda_client = aws_stack.connect_to_service('lambda')412 result = lambda_client.list_event_source_mappings(EventSourceArn=table_arn)413 for event in result['EventSourceMappings']:414 event_source_mapping_id = event['UUID']415 lambda_client.delete_event_source_mapping(UUID=event_source_mapping_id)416 @staticmethod417 def _thread_local(name, default=None):418 try:419 return getattr(ProxyListenerDynamoDB.thread_local, name)420 except AttributeError:421 return default422def handle_special_request(method, path, data, headers):423 if path.startswith('/shell') or method == 'GET':424 if path == '/shell':425 headers = {'Refresh': '0; url=%s/shell/' % config.TEST_DYNAMODB_URL}426 return aws_responses.requests_response('', headers=headers)427 return True428 if method == 'OPTIONS':429 return 200430def create_global_table(data):431 table_name = data['GlobalTableName']432 if table_name in GLOBAL_TABLES:433 return get_error_message('Global Table with this name already exists', 'GlobalTableAlreadyExistsException')434 GLOBAL_TABLES[table_name] = data435 for group in data.get('ReplicationGroup', []):436 group['ReplicaStatus'] = 'ACTIVE'437 group['ReplicaStatusDescription'] = 'Replica active'438 result = {'GlobalTableDescription': data}439 return result440def describe_global_table(data):441 table_name = data['GlobalTableName']442 details = GLOBAL_TABLES.get(table_name)443 if not details:444 return get_error_message('Global Table with this name does not exist', 'GlobalTableNotFoundException')445 result = {'GlobalTableDescription': details}446 return result447def list_global_tables(data):448 result = [select_attributes(tab, ['GlobalTableName', 'ReplicationGroup']) for tab in GLOBAL_TABLES.values()]449 result = {'GlobalTables': result}450 return result451def update_global_table(data):452 table_name = data['GlobalTableName']453 details = GLOBAL_TABLES.get(table_name)454 if not details:455 return get_error_message('Global Table with this name does not exist', 'GlobalTableNotFoundException')456 for update in data.get('ReplicaUpdates', []):457 repl_group = details['ReplicationGroup']458 # delete existing459 delete = update.get('Delete')460 if delete:461 details['ReplicationGroup'] = [g for g in repl_group if g['RegionName'] != delete['RegionName']]462 # create new463 create = update.get('Create')464 if create:465 exists = [g for g in repl_group if g['RegionName'] == create['RegionName']]466 if exists:467 continue468 new_group = {469 'RegionName': create['RegionName'], 'ReplicaStatus': 'ACTIVE',470 'ReplicaStatusDescription': 'Replica active'471 }472 details['ReplicationGroup'].append(new_group)473 result = {'GlobalTableDescription': details}474 return result475def is_index_query_valid(table_name, index_query_type):476 ddb_client = aws_stack.connect_to_service('dynamodb')477 schema = ddb_client.describe_table(TableName=table_name)478 for index in schema['Table'].get('GlobalSecondaryIndexes', []):479 index_projection_type = index.get('Projection').get('ProjectionType')480 if index_query_type == 'ALL_ATTRIBUTES' and index_projection_type != 'ALL':481 return False482 return True483def find_existing_item(put_item, table_name=None):484 table_name = table_name or put_item['TableName']485 ddb_client = aws_stack.connect_to_service('dynamodb')486 search_key = {}487 if 'Key' in put_item:488 search_key = put_item['Key']489 else:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!