Best Python code snippet using autotest_python
20200730133500_additional_raw_fields.py
Source:20200730133500_additional_raw_fields.py
"""Add 'start_date', 'end_date' and 'cost' fields to all raw expenses.

Needed for calculating costs per day.
"""
import logging
from datetime import datetime, timezone

from pymongo import UpdateOne

from diworker.migrations.base import BaseMigration

LOG = logging.getLogger(__name__)
# Maximum number of update operations sent in one bulk_write call.
CHUNK_SIZE = 200
UNIQUE_INDEX_NAME = 'AWSReportImporter'
# Layout of the 'lineItem/Usage*Date' strings parsed in add_new_fields().
USAGE_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

# Index definition without 'start_date'; the NEW_* variants below add it.
OLD_UNIQUE_FIELD_LIST = [
    'lineItem/LineItemType',
    'lineItem/UsageStartDate',
    'lineItem/UsageType',
    'lineItem/Operation',
    'lineItem/ResourceId',
    'cloud_credentials_id',
]
OLD_PARTIAL_FILTER_EXPRESSION = {
    'lineItem/LineItemType': {'$exists': True},
    'lineItem/UsageStartDate': {'$exists': True},
    'lineItem/UsageType': {'$exists': True},
    'lineItem/Operation': {'$exists': True},
}
NEW_PARTIAL_FILTER_EXPRESSION = {
    'lineItem/LineItemType': {'$exists': True},
    'lineItem/UsageStartDate': {'$exists': True},
    'lineItem/UsageType': {'$exists': True},
    'lineItem/Operation': {'$exists': True},
    'start_date': {'$exists': True},
}
NEW_UNIQUE_FIELD_LIST = [
    'lineItem/LineItemType',
    'lineItem/UsageStartDate',
    'lineItem/UsageType',
    'lineItem/Operation',
    'lineItem/ResourceId',
    'cloud_credentials_id',
    'start_date'
]


def _parse_usage_date(value):
    """Parse a USAGE_DATE_FORMAT string into a UTC-aware datetime."""
    return datetime.strptime(value, USAGE_DATE_FORMAT).replace(
        tzinfo=timezone.utc)


class Migration(BaseMigration):
    """Backfill cost/date fields on raw expenses and rebuild the index."""

    def _create_raw_unique_index(self, field_list, partial_filter_expression):
        """Create the UNIQUE_INDEX_NAME index unless one already exists.

        :param field_list: field names; each is indexed ascending.
        :param partial_filter_expression: passed as the index's
            partialFilterExpression.
        """
        existing_indexes = [
            x['name'] for x in self.db.raw_expenses.list_indexes()
        ]
        if UNIQUE_INDEX_NAME not in existing_indexes:
            LOG.info('Creating unique index %s in raw collection',
                     UNIQUE_INDEX_NAME)
            self.db.raw_expenses.create_index(
                [(f, 1) for f in field_list],
                name=UNIQUE_INDEX_NAME,
                unique=True,
                partialFilterExpression=partial_filter_expression
            )

    def _drop_raw_unique_index(self):
        """Drop the UNIQUE_INDEX_NAME index if the collection has it."""
        existing_indexes = [
            x['name'] for x in self.db.raw_expenses.list_indexes()
        ]
        if UNIQUE_INDEX_NAME in existing_indexes:
            self.db.raw_expenses.drop_index(UNIQUE_INDEX_NAME)

    def _bulk_write_in_chunks(self, requests):
        """Write ``requests`` to raw_expenses, CHUNK_SIZE operations at a time.

        :param requests: any iterable of write operations. It is consumed
            lazily, so a generator over a cursor is written out while the
            cursor is still being read.
        """
        chunk = []
        for request in requests:
            chunk.append(request)
            if len(chunk) == CHUNK_SIZE:
                self.db.raw_expenses.bulk_write(chunk)
                chunk = []
        # Flush the final, partially filled chunk (bulk_write is never
        # called with an empty list).
        if chunk:
            self.db.raw_expenses.bulk_write(chunk)

    def _rewrite_resource_ids(self, extract):
        """Set 'resource_id' to ``extract(lineItem/ResourceId)``.

        Only documents that have both fields are touched.

        :param extract: callable mapping the 'lineItem/ResourceId' string
            to the new 'resource_id' value.
        """
        filters = {
            'resource_id': {'$exists': True},
            'lineItem/ResourceId': {'$exists': True},
        }
        self._bulk_write_in_chunks(
            UpdateOne(
                filter={'_id': item['_id']},
                update={'$set': {
                    'resource_id': extract(item['lineItem/ResourceId'])
                }},
            )
            for item in self.db.raw_expenses.find(filters)
        )

    def fix_resource_ids(self):
        """Keep the part of 'lineItem/ResourceId' after its first '/'.

        An id without a '/' is copied unchanged: ``find`` returns -1, so
        the slice starts at index 0.
        """
        self._rewrite_resource_ids(
            lambda r_id: r_id[r_id.find('/') + 1:])

    def add_new_fields(self):
        """Add 'cost', 'end_date' and 'start_date' to raw expenses.

        Processes documents that have 'lineItem/UsageStartDate' but no
        'cost' yet. 'cost' is 'lineItem/BlendedCost' as a float (0 when
        absent); the dates are parsed as UTC.
        """
        filters = {
            'cost': {'$exists': False},
            'lineItem/UsageStartDate': {'$exists': True}
        }
        # NOTE(review): the filter does not require 'lineItem/UsageEndDate',
        # so a document lacking it raises KeyError below - confirm such
        # documents cannot exist.
        self._bulk_write_in_chunks(
            UpdateOne(
                filter={'_id': item['_id']},
                update={'$set': {
                    'cost': float(item.get('lineItem/BlendedCost', 0)),
                    'end_date': _parse_usage_date(
                        item['lineItem/UsageEndDate']),
                    'start_date': _parse_usage_date(
                        item['lineItem/UsageStartDate']),
                }},
            )
            for item in self.db.raw_expenses.find(filters)
        )

    def upgrade(self):
        """Backfill the data, then rebuild the index with 'start_date'."""
        self.fix_resource_ids()
        self.add_new_fields()
        # The index is replaced only after the backfill, so documents
        # already carry 'start_date' when the new index is built.
        self._drop_raw_unique_index()
        self._create_raw_unique_index(
            NEW_UNIQUE_FIELD_LIST, NEW_PARTIAL_FILTER_EXPRESSION)

    def downgrade(self):
        """Rewrite resource ids, remove the added fields, rebuild the index.

        'resource_id' becomes the part of 'lineItem/ResourceId' after its
        last '/'.
        """
        self._rewrite_resource_ids(lambda r_id: r_id.split('/')[-1])
        self.db.raw_expenses.update_many(
            {}, {'$unset': {'cost': '', 'end_date': '', 'start_date': ''}})
        self._drop_raw_unique_index()
        # NOTE(review): the listing this was restored from is cut off inside
        # this call. The arguments are inferred from the OLD_* constants,
        # which nothing else uses - confirm against the full file.
        self._create_raw_unique_index(
            OLD_UNIQUE_FIELD_LIST, OLD_PARTIAL_FILTER_EXPRESSION)
importer.py
Source:importer.py
"""Short script to import vessel information into Elasticsearch."""
import datetime as dt
import itertools as it
import json
import re
import sys

from pipe_vessels.elasticsearch.server import ElasticSearchServer


def batch(iterable, size):
    """Group ``iterable`` into tuples of ``size`` consecutive items.

    The last tuple is padded with ``None`` when the input length is not a
    multiple of ``size``, so callers have to filter the padding out.
    """
    # One iterator repeated ``size`` times: zip_longest draws from it in
    # turn, so each output tuple holds ``size`` consecutive items.
    args = [iter(iterable)] * size
    return it.zip_longest(*args)


def line_to_elasticsearch_bulk_command(line):
    """Convert one JSON line into an ``[index command, record]`` pair.

    The record's "vesselId" becomes the document id. The target index is
    the module-level ``unique_index_name``, which is assigned further
    down, so this may only be called after that assignment has run.
    """
    record = json.loads(line)
    command = {"index": {"_index": unique_index_name,
                         "_id": record["vesselId"]}}
    return [command, record]


# Configuration options
server_url = sys.argv[1]
server_auth = sys.argv[2]
index_name = sys.argv[3]
index_schema = sys.argv[4]

# Derived configuration options
timestamp = dt.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
unique_index_name = '{}-{}'.format(index_name, timestamp)

# Open a base http connection to elasticsearch server
server = ElasticSearchServer(server_url, server_auth)

# Get where the current alias is pointing to later remove old indices
print("Obtaining alias information for the current index")
alias_info = server.alias_information(index_name)
old_indices = list(alias_info.keys())
print("The alias is currently pointing to {}".format(old_indices))

# Precreate the index so that we can setup proper mappings
print("Creating index {}".format(unique_index_name))
server.create_index(unique_index_name, index_schema)

try:
    # Process the records in batches
    bulk_commands = list(map(
        line_to_elasticsearch_bulk_command, iter(sys.stdin)))
    batched_commands = batch(bulk_commands, 5000)

    # For each batch, push it as a bulk payload. The loop variable is not
    # called ``batch`` so that it does not rebind the helper above.
    for command_batch in batched_commands:
        print("Indexing batch")
        # Drop the None padding that batch() adds to the last tuple.
        server.bulk(filter(lambda x: x is not None, command_batch))

    # Update the alias to point to the new index
    print(
        "Updating index alias which was pointing to {} "
        "to point to the new index {}".format(
            old_indices, unique_index_name))
    server.alias({
        "actions": [
            {"add": {"index": unique_index_name, "alias": index_name}},
        ]
    })
except Exception as e:
    # Clean up the half-filled index, then let the error propagate.
    print("Exception while importing records to elastic search. {}".format(e))
    print("Removing new index {} as the import process failed".format(
        unique_index_name))
    server.drop_index(unique_index_name)
    raise

# Remove the old indices
for old_index in old_indices:
    # NOTE(review): the listing this was restored from is cut off here; the
    # loop body is not visible, so the original "..." placeholder is kept.
    ...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!