Best Python code snippet using localstack_python
tablestore_streamreader_console.py
Source:tablestore_streamreader_console.py
...15 self.accesskey = str(config['accessKey'])16 self.instance_name = str(config['instanceName'])17 self.status_table = str(config['statusTable'])18 self.ots = OTSClient(self.endpoint, self.accessid, self.accesskey, self.instance_name)19def describe_job(config, options):20 '''21 1. get job's description22 2. get all job's checkpoints and check if it is done23 '''24 if not options.stream_id:25 print "Error: Should set the stream id using '-s' or '--streamid'."26 sys.exit(-1)27 if not options.timestamp:28 print "Error: Should set the timestamp using '-t' or '--timestamp'."29 sys.exit(-1)30 pk = [('StreamId', options.stream_id), ('StatusType', 'DataxJobDesc'), ('StatusValue', '%16d' % int(options.timestamp))]31 consumed, pk, attrs, next_token = config.ots.get_row(config.status_table, pk, [], None, 1)32 if not attrs:33 print 'Stream job is not found.'34 sys.exit(-1)35 job_detail = parse_job_detail(attrs)36 print '----------JobDescriptions----------'37 print json.dumps(job_detail, indent=2)38 print '-----------------------------------'39 stream_checkpoints = _list_checkpoints(config, options.stream_id, int(options.timestamp))40 cps_headers = ['ShardId', 'SendRecordCount', 'Checkpoint', 'SkipCount', 'Version']41 table_content = []42 for cp in stream_checkpoints:43 table_content.append([cp['ShardId'], cp['SendRecordCount'], cp['Checkpoint'], cp['SkipCount'], cp['Version']])44 print tabulate.tabulate(table_content, headers=cps_headers)45 # check if stream job has finished46 finished = True47 if len(job_detail['ShardIds']) != len(stream_checkpoints):48 finished = False49 for cp in stream_checkpoints:50 if cp['Version'] != job_detail['Version']:51 finished = False52 print '----------JobSummary----------'53 print 'ShardsCount:', len(job_detail['ShardIds'])54 print 'CheckPointsCount:', len(stream_checkpoints)55 print 'JobStatus:', 'Finished' if finished else 'NotFinished'56 print '------------------------------'57def _list_checkpoints(config, stream_id, timestamp):58 start_pk = [('StreamId', stream_id), ('StatusType', 'CheckpointForDataxReader'), ('StatusValue', '%16d' % timestamp)]59 end_pk = [('StreamId', stream_id), ('StatusType', 'CheckpointForDataxReader'), ('StatusValue', '%16d' % (timestamp + 1))]60 consumed_counter = CapacityUnit(0, 0)61 columns_to_get = []62 checkpoints = []63 range_iter = config.ots.xget_range(64 config.status_table, Direction.FORWARD,65 start_pk, end_pk,66 consumed_counter, columns_to_get, 100,67 column_filter=None, max_version=168 )69 rows = []70 for (primary_key, attrs) in range_iter:71 checkpoint = {}72 for attr in attrs:73 checkpoint[attr[0]] = attr[1]74 if not checkpoint.has_key('SendRecordCount'):75 checkpoint['SendRecordCount'] = 076 checkpoint['ShardId'] = primary_key[2][1].split('\t')[1]77 checkpoints.append(checkpoint)78 return checkpoints79def list_job(config, options):80 '''81 Two options:82 1. list all jobs of stream83 2. list all jobs and all streams84 '''85 consumed_counter = CapacityUnit(0, 0)86 if options.stream_id:87 start_pk = [('StreamId', options.stream_id), ('StatusType', INF_MIN), ('StatusValue', INF_MIN)]88 end_pk = [('StreamId', options.stream_id), ('StatusType', INF_MAX), ('StatusValue', INF_MAX)]89 else:90 start_pk = [('StreamId', INF_MIN), ('StatusType', INF_MIN), ('StatusValue', INF_MIN)]91 end_pk = [('StreamId', INF_MAX), ('StatusType', INF_MAX), ('StatusValue', INF_MAX)]92 columns_to_get = []93 range_iter = config.ots.xget_range(94 config.status_table, Direction.FORWARD,95 start_pk, end_pk,96 consumed_counter, columns_to_get, None,97 column_filter=None, max_version=198 )99 rows = []100 for (primary_key, attrs) in range_iter:101 if primary_key[1][1] == 'DataxJobDesc':102 job_detail = parse_job_detail(attrs)103 rows.append([job_detail['TableName'], job_detail['JobStreamId'], job_detail['EndTime'], job_detail['StartTime'], job_detail['EndTime'], job_detail['Version']])104 headers = ['TableName', 'JobStreamId', 'Timestamp', 'StartTime', 'EndTime', 'Version']105 print tabulate.tabulate(rows, headers=headers)106def parse_job_detail(attrs):107 job_details = {}108 shard_ids_content = ''109 for attr in attrs:110 if attr[0].startswith('ShardIds_'):111 shard_ids_content += attr[1]112 else:113 job_details[attr[0]] = attr[1]114 shard_ids = json.loads(zlib.decompress(shard_ids_content))115 if not job_details.has_key('Version'):116 job_details['Version'] = ''117 if not job_details.has_key('SkipCount'):118 job_details['SkipCount'] = 0119 job_details['ShardIds'] = shard_ids120 return job_details121def parse_time(value):122 try:123 return int(value)124 except Exception,e:125 return int(time.mktime(time.strptime(value, '%Y-%m-%d %H:%M:%S')))126if __name__ == '__main__':127 parser = OptionParser()128 parser.add_option('-c', '--config', dest='config_file', help='path of config file', metavar='tablestore_streamreader_config.json')129 parser.add_option('-a', '--action', dest='action', help='the action to do', choices = ['describe_job', 'list_job'], metavar='')130 parser.add_option('-t', '--timestamp', dest='timestamp', help='the timestamp', metavar='')131 parser.add_option('-s', '--streamid', dest='stream_id', help='the id of stream', metavar='')132 parser.add_option('-d', '--shardid', dest='shard_id', help='the id of shard', metavar='')133 options, args = parser.parse_args()134 if not options.config_file:135 print "Error: Should set the path of config file using '-c' or '--config'."136 sys.exit(-1)137 if not options.action:138 print "Error: Should set the action using '-a' or '--action'."139 sys.exit(-1)140 console_config = ConsoleConfig(options.config_file)141 if options.action == 'list_job':142 list_job(console_config, options)143 elif options.action == 'describe_job':...
describe_job.py
Source:describe_job.py
1import logging2import boto33from botocore.exceptions import ClientError4def describe_job(vault_name, job_id):5 """Retrieve the status of an Amazon S3 Glacier job, such as an6 inventory-retrieval job7 To retrieve the output of the finished job, call Glacier.Client.get_job_output()8 :param vault_name: string9 :param job_id: string. The job ID was returned by Glacier.Client.initiate_job().10 :return: Dictionary of information related to the job. If error, return None.11 """12 # Retrieve the status of the job13 glacier = boto3.client('glacier')14 try:15 response = glacier.describe_job(vaultName=vault_name, jobId=job_id)16 except ClientError as e:17 logging.error(e)18 return None19 return response20def main():21 """Exercise describe_job()"""22 # Assign the following values before running the program23 test_vault_name = 'VAULT_NAME'24 test_job_id = 'JOB_ID'25 # Set up logging26 logging.basicConfig(level=logging.DEBUG,27 format='%(levelname)s: %(asctime)s: %(message)s')28 # Retrieve the job's status29 response = describe_job(test_vault_name, test_job_id)30 if response is not None:31 logging.info(f'Job Type: {response["Action"]}, '32 f'Status: {response["StatusCode"]}')33if __name__ == '__main__':...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!