Best Python code snippet using localstack_python
dynamodb.py
Source:dynamodb.py
...220 for repgrp in gtable['ReplicationGroup'] :221 replicationgrp.append(repgrp)222 if 'LastEvaluatedGlobalTableName' in gtables :223 lastgtablename = gtables['LastEvaluatedGlobalTableName']224 tableinfo = describe_global_table(gtablename)225 # klogger.debug(tableinfo)226 gtablearn = ' '; createdtm = ' '; gtablesgtatus = ' '; 227 if tableinfo != None :228 gtablearn = tableinfo['GlobalTableArn'] if 'GlobalTableArn' in tableinfo else ' '229 createdtm = tableinfo['CreationDateTime'].strftime('%Y-%m-%d %H:%M') if 'CreationDateTime' in tableinfo else ' '230 gtablesgtatus = tableinfo['GlobalTableStatus'] if 'GlobalTableStatus' in tableinfo else ' '231 # list count sync with space232 utils.ListSyncCountWithSpace(replicationgrp233 )234 235 results.append({ "GlobalTableName" : gtablename,236 "ReplicationGroup" : replicationgrp,237 "LastEvaluatedGlobalTableName" : lastgtablename,238 "GlobalTableArn" : gtablearn,239 "CreationDateTime" : createdtm, 240 "GlobalTableStatus" : gtablesgtatus,241 })242 else: # column list243 lastgtablename = ' '244 if 'LastEvaluatedGlobalTableName' in gtables :245 lastgtablename = gtables['LastEvaluatedGlobalTableName']246 results.append({ "GlobalTableName" : ' ',247 "ReplicationGroup" : list(' '),248 "LastEvaluatedGlobalTableName" : lastgtablename,249 "GlobalTableArn" : ' ',250 "CreationDateTime" : ' ',251 "GlobalTableStatus" : ' ',252 })253 else:254 klogger.error("call error : %d", gtables["ResponseMetadata"]["HTTPStatusCode"])255 results.append({ "GlobalTableName" : 'ERROR CHECK',256 "ReplicationGroup" : 'ERROR CHECK',257 "LastEvaluatedGlobalTableName" : 'ERROR CHECK',258 "GlobalTableArn" : 'ERROR CHECK',259 "CreationDateTime" : 'ERROR CHECK',260 "GlobalTableStatus" : list(' '),261 })262 # klogger.debug(results)263 except Exception as othererr:264 klogger.error("dynamodb.list_global_tables(),%s", othererr)265 results.append({ "GlobalTableName" : 'ERROR CHECK',266 "ReplicationGroup" : 'ERROR CHECK',267 "LastEvaluatedGlobalTableName" : 'ERROR CHECK',268 "GlobalTableArn" : 'ERROR CHECK',269 "CreationDateTime" : 'ERROR CHECK',270 "GlobalTableStatus" : list(' '),271 })272 finally:273 return results274def describe_global_table(GlobalTableName):275 '''276 search dynamodb global table277 '''278# klogger_dat.debug('dynamodb global table')279 try:280 result = None281 dynamodb=boto3.client('dynamodb')282 # klogger.debug(f'{GlobalTableName}')283 gtable = dynamodb.describe_global_table(GlobalTableName=GlobalTableName)284 # klogger.debug("%s", gtable)285 if 200 == gtable["ResponseMetadata"]["HTTPStatusCode"]:286 # klogger_dat.debug("%s",gtable["GlobalTableDescription"])287 if 'GlobalTableDescription' in gtable :288 result = gtable['GlobalTableDescription']289 else:290 klogger.error("call error : %d", gtable["ResponseMetadata"]["HTTPStatusCode"])291 # klogger.debug(result)292 except Exception as othererr:293 klogger.error("dynamodb.describe_global_table(),%s", othererr)294 finally:295 return result296def main(argv):297 list_global_tables()298 list_tables() 299 sys.exit(0)300if __name__ == "__main__":...
aws_dynamodb_info.py
Source:aws_dynamodb_info.py
...180 return client.describe_table(181 TableName=module.params['name'],182 ), False183 elif module.params['describe_global_table']:184 return client.describe_global_table(185 GlobalTableName=module.params['name'],186 ), False187 else:188 if client.can_paginate('list_tables'):189 paginator = client.get_paginator('list_tables')190 return paginator.paginate(), True191 else:192 return client.list_tables(), False193 except (BotoCoreError, ClientError) as e:194 module.fail_json_aws(e, msg='Failed to fetch AWS Dynamo DB details')195def main():196 argument_spec = dict(197 arn=dict(required=False, aliases=['table_arn']),198 name=dict(required=False, aliases=['table_name']),...
dynamoDBGlobalTableCreate.py
Source:dynamoDBGlobalTableCreate.py
...35 except client.exceptions.GlobalTableAlreadyExistsException as e:36 logger.info('Global Table already exists, falling back to update logic: ' + e.message )37 # Describe the already existing global table, so we can attempt to update instead38 response = {}39 response = client.describe_global_table(40 GlobalTableName=global_table_name41 )42 logger.debug(response)43 existing_replication_group_list = []44 for region in response['GlobalTableDescription']['ReplicationGroup']:45 logger.debug('Existing Replication Group List')46 existing_replication_group_list.append(region['RegionName'])47 event['OldResourceProperties']= { 'ReplicationGroupList': existing_replication_group_list }48 response = global_table_update(event,context)49 return response50 except Exception as e:51 logger.error('Failed to create DynamoDB Global Table: ' + global_table_name)52 resp['Reason'] = 'Failed to create DynamoDB Global Table: ' + e.message 53 resp['Status'] = 'FAILED'54 55 return resp56def global_table_update(event, context):57 logger.debug(json.dumps(event))58 global_table_name = event['ResourceProperties']['GlobalTableName']59 try:60 resp = {'Status': 'FAILED', 'Data': {'GlobalTableName': global_table_name, 'LogGroup': context.log_group_name }}61 client = boto3.client('dynamodb')62 # Describe the existing global table, as we cannot trust the provided63 # event['OldResourceProperties']['ReplicationGroupList']) to show the64 # current actual replication group list for a stack update failure / rollback.65 response = {}66 response = client.describe_global_table(67 GlobalTableName=global_table_name68 )69 logger.debug(response)70 existing_replication_groups = set() 71 for region in response['GlobalTableDescription']['ReplicationGroup']:72 existing_replication_groups.add(region['RegionName'])73 new_rep_groups = set(event['ResourceProperties']['ReplicationGroupList'])74 old_rep_groups = set(event['OldResourceProperties']['ReplicationGroupList'])75 changeset_rep_group_discrepancies = old_rep_groups - existing_replication_groups76 if len(changeset_rep_group_discrepancies):77 logger.info("Using the results of describe_global_table rather than the value from OldResourceProperties in the event")78 # Use reality, not the value from OldResourceProperties in the event79 old_rep_groups = existing_replication_groups80 if new_rep_groups == old_rep_groups:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!