Best Python code snippet using fMBT_python
dag_controller.py
Source: dag_controller.py
'''
This function can be used by any DAG that needs a python operator to run a sql file
'''
import datetime as dt
import json
import os
from random import randint

import pandas as pd
from dattasa import data_pipeline as dp


def run_script(**kwargs):
    '''
    The function takes in
        file name, db_host, db_config_file, audit_table_name,
        sql path, output path, log path,
        delimited out indicator and db host as input.
    Default db_config_file = $HOME/database.yaml
    Default folders if not specified are $SQL_DIR, $TEMP_DIR, $LOG_DIR
    The function then searches for the db_host name in the database.yaml file
    and gets the login credentials. The sql is run and the output and log files are redirected to the
    paths specified. In case of any sql failures, the dattasa package will throw an exception and
    this function will not complete successfully.

    Naming convention for the files:
        sql_file name should be under <sql_path>/<file_name>.sql
        out_file name will be under <out_path>/<file_name>.out
        log_file name will be under <log_path>/<file_name>_'YYYY-MM-DD_HH:MM:SS'.log

    ADDITIONAL FUNCTIONALITY -
        pre_check_tables  : Provide list of tables that have to be checked before running sql
        pre_check_scripts : Provide list of sql scripts that have to be checked before running sql
        load_tables       : Provide list of tables that have to be audited as part of this run
        run_sql_file      : Default value is True. If this is set to False the sql file will not be run.
                            However you can still use this python function for testing pre-checks
                            and/or auditing load tables.
    :param kwargs:
    :return:
    '''
    # Read the arguments passed to the function
    file_name = kwargs['file_name']
    audit_table = kwargs['audit_table']
    db_host = kwargs['db_host']
    db_config_file = kwargs.get('db_config_file', os.environ['HOME'] + '/database.yaml')
    sql_path = kwargs.get('sql_path', os.environ['SQL_DIR'])
    out_path = kwargs.get('out_path', os.environ['TEMP_DIR'])
    log_path = kwargs.get('log_path', os.environ['LOG_DIR'])
    pre_check_tables = kwargs.get('pre_check_tables', [])
    pre_check_scripts = kwargs.get('pre_check_scripts', [])
    load_tables = kwargs.get('load_tables', [])
    run_sql_file = kwargs.get('run_sql_file', True)
    delimited_file = kwargs.get('delimited_file', 'F')

    # Convert the 'T'/'F' flag to a boolean (anything other than 'T' is treated as False)
    if delimited_file == 'T':
        delimited = True
    else:
        delimited = False

    '''
    Set all the files needed before calling the run_psql_file function from greenplum_client.
    You can either use os environment variables eg:- os.environ['HOME'] or airflow variables.
    '''
    sql_file = sql_path + '/' + file_name + '.sql'
    out_file = out_path + '/' + file_name + ".out"
    log_file = log_path + '/' + file_name + "_" + \
        dt.datetime.now().strftime("%Y-%m-%d_%H:%M:%S") + '.log'

    '''
    Perform pre-checks before running sql
    '''
    print("Running pre-checks before running sql file " + file_name + "\n")
    run_pre_check_scripts(log_file, db_config_file, db_host, pre_check_tables, pre_check_scripts)

    audit_id = str(int(dt.datetime.now().strftime("%Y%m%d%H%M%S%f")) * 100 +
                   randint(0, 9) * 10 + randint(0, 9))

    '''
    Create an entry in the audit table before running the sql for each load_name
    '''
    kwargs = {"load_type": "pre", "audit_id": audit_id, "audit_table": audit_table,
              "load_tables": load_tables, "load_description": file_name}
    load_audit_table(**kwargs)

    '''
    If the sql file has to be run (default=True) then run the sql file here.
    Note the sql will run only after all pre-checks succeed and the entry in the audit
    table is made successfully.
    '''
    if run_sql_file:
        db = dp.DataComponent().set_credentials(db_host, db_config_file)
        db.run_psql_file(sql_file, log_file, out_file, delimited)

    '''
    Update the audit table after the sql run has completed and update final row counts for each load name
    '''
    kwargs = {"load_type": "post", "audit_id": audit_id, "audit_table": audit_table,
              "load_tables": load_tables, "load_description": file_name}
    load_audit_table(**kwargs)
    return


def run_pre_check_scripts(log_file, yaml_file, db_host, pre_check_tables, pre_check_scripts):
    '''
    :param log_file:
    :param yaml_file:
    :param db_host:
    :param pre_check_tables:
    :param pre_check_scripts:
    :return:
    '''
    db = dp.DataComponent().set_credentials(db_host, yaml_file)
    conn = db.get_db_conn(True, True, 20)
    out_log = open(log_file, "w")

    '''
    Ensure none of the pre check tables are empty.
    We will ask the DB for five rows and log whether we got them.
    If there were zero rows, fail the job.
    '''
    for table in pre_check_tables:
        pre_check_sql = "SELECT * FROM " + table + " LIMIT 5;"
        pre_check_df = pd.read_sql(pre_check_sql, conn)
        pre_check_row_count = pre_check_df.shape[0]
        out_log.write(str(pre_check_row_count) + " rows returned from " + table + "\n")
        if pre_check_row_count == 0:
            print("No rows found in pre check table : " + table + "\n")
            out_log.close()
            db.close_connection()
            raise Exception('pre-check found an empty table ' + table)

    '''
    Ensure none of the pre check sql scripts return an empty result.
    We will ask the DB for five rows and log whether we got them.
    If there were zero rows, fail the job.
    '''
    for sql_script in pre_check_scripts:
        pre_check_df = pd.read_sql(sql_script, conn)
        pre_check_row_count = pre_check_df.shape[0]
        out_log.write(str(pre_check_row_count) + " rows returned from " + sql_script + "\n")
        if pre_check_row_count == 0:
            print("No rows found for pre check sql : " + sql_script + "\n")
            print("The sql file will not be run. Aborting ..." + "\n")
            out_log.close()
            db.close_connection()
            raise Exception('pre-check did not return any rows for sql ' + sql_script)

    out_log.close()
    db.close_connection()
    return


'''
Example on how to call load_audit_table independently in another python program

import sys, os
import datetime as dt
from random import randint
from data_jedi import dag_controller

python_file = os.environ['AIRFLOW_HOME'] + '/dags'
sys.path.append(python_file)
import dag_controller

load_tables = ['adm.adm.fact_employer_feature', 'adm.fact_account_feature']
load_desc = "reg_rate_tracking"
audit_id = str(int(dt.datetime.now().strftime("%Y%m%d%H%M%S%f")) * 100 + randint(0, 9) * 10 + randint(0, 9))
kwargs = {"load_type": "pre", "audit_id": audit_id, "audit_table": "adm.airflow_load_audit",
          "load_tables": load_tables, "load_description": load_desc}
dag_controller.load_audit_table(**kwargs)
kwargs = {"load_type": "post", "audit_id": audit_id, "audit_table": "adm.airflow_load_audit",
          "load_tables": load_tables, "load_description": load_desc}
dag_controller.load_audit_table(**kwargs)
'''


def load_audit_table(**kwargs):
    '''
    :param load_type:
    :param load_tables:
    :param load_description:
    :return:
    '''
    audit_id = kwargs['audit_id']            # Required parameter
    load_type = kwargs['load_type']          # Required parameter
    audit_table = kwargs['audit_table']      # Required parameter
    queue_name = kwargs['queue_name']        # Required parameter
    rabbitmq_host = kwargs['rabbitmq_host']  # Required parameter
    load_tables = kwargs.get('load_tables', [])
    load_description = kwargs.get('load_description', '')
    rabbitmq_config_yaml = kwargs.get('config_yaml', os.environ['HOME'] + '/database.yaml')

    producer = dp.DataComponent().set_credentials(rabbitmq_host, rabbitmq_config_yaml)

    ''' Publish a message for each table '''
    for table_name in load_tables:
        data = {
            "audit_id": audit_id,
            "load_type": load_type,
            "load_tables": table_name,
            "load_description": load_description,
            "audit_table": audit_table,
            "load_time": dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
        }
        message = json.dumps(data)
        status = producer.send(queue_name, message, auto_delete=False)
        print("published message for creating " + load_type + "-load audit record for " + table_name)
    producer.stop()
    return


def get_copy_mysql_table_to_gp_command(source_table, target_table, source_conn='',
                                       target_conn='', error_limit='2', null_string='',
                                       description=''):
    if null_string != '':
        load_command = 'python ' + os.environ['PROJECT_HOME'] + '/python_bash_scripts/copy_mysql_table_to_gp.py ' + \
            ' -s ' + source_conn + ' -t ' + target_conn + ' -x ' + source_table + \
            ' -y ' + target_table + ' -d ' + description + ' -e ' + error_limit + \
            ' -n ' + null_string
    else:
        load_command = 'python ' + os.environ['PROJECT_HOME'] + '/python_bash_scripts/copy_mysql_table_to_gp.py ' + \
            ' -s ' + source_conn + ' -t ' + target_conn + ' -x ' + source_table + \
            ' -y ' + target_table + ' -d ' + description + ' -e ' + error_limit
    return load_command


def get_copy_postgres_table_to_gp_command(source_table, target_table, source_conn='',
                                          target_conn='', error_limit='2', null_string='',
                                          description=''):
    if null_string != '':
        load_command = 'python ' + os.environ['PROJECT_HOME'] + '/python_bash_scripts/copy_postgres_table_to_gp.py ' + \
            ' -s ' + source_conn + ' -t ' + target_conn + ' -x ' + source_table + \
            ' -y ' + target_table + ' -d ' + description + ' -e ' + error_limit + \
            ' -n ' + null_string
    else:
        load_command = 'python ' + os.environ['PROJECT_HOME'] + '/python_bash_scripts/copy_postgres_table_to_gp.py ' + \
            ' -s ' + source_conn + ' -t ' + target_conn + ' -x ' + source_table + \
            ' -y ' + target_table + ' -d ' + description + ' -e ' + error_limit
...
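As the module docstring notes, run_script is meant to be called from a python operator inside a DAG. Below is a minimal sketch of that wiring, assuming Airflow 2.x imports; the DAG id, task id, file name, audit table, and connection name are hypothetical examples, not values from the module above.

# Minimal sketch: wiring dag_controller.run_script into an Airflow DAG (assumes Airflow 2.x).
# All ids and names below are illustrative placeholders.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

import dag_controller  # assumes dag_controller.py is on the DAGs path

with DAG(dag_id="example_sql_load",
         start_date=datetime(2023, 1, 1),
         schedule_interval="@daily",
         catchup=False) as dag:

    run_daily_sql = PythonOperator(
        task_id="run_daily_sql",
        python_callable=dag_controller.run_script,
        op_kwargs={
            "file_name": "daily_fact_load",            # expects $SQL_DIR/daily_fact_load.sql
            "audit_table": "adm.airflow_load_audit",
            "db_host": "greenplum_prod",               # must match an entry in database.yaml
            "pre_check_tables": ["adm.dim_date"],      # fail fast if this table is empty
            "load_tables": ["adm.fact_daily_load"],    # tables to audit for this run
        },
    )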
load.py
Source: load.py
...
from utils.database_connection import *
from utils.file_content_toString import *

con = databaseConnect()
cur = con.cursor()


def load_tables(load_procedure):
    """
    This function loads std_tables into dimension and fact tables.
    :param load_procedure: Path to the sql procedure that transforms data into a fact or dimension table.
    """
    try:
        table_name = load_procedure.split('/')[3].replace('.sql', '').replace('load_', '')
        transformation_sql = file_content_toString(load_procedure)
        print(f'\n[{SUCCESS}Info{END}] {BOLD}Loading into {table_name} table...{END}')
        start_time = time.time()
        cur.execute(transformation_sql)
        con.commit()
        print(f'[{SUCCESS}INFO{END}] {BOLD}Load Successful!{END}')
        print(f'{SUCCESS}[+] Elapsed Time: {(time.time() - start_time):.4f} seconds.{END}')
    except Exception as e:
        print(f'{FAILURE}[-] Exception Occurred:{END}', e)


if __name__ == '__main__':
    load_tables('../sql/procedures/load_dim_location.sql')
    load_tables('../sql/procedures/load_dim_categories.sql')
    load_tables('../sql/procedures/load_dim_business_categories.sql')
    load_tables('../sql/procedures/load_fact_business.sql')
    load_tables('../sql/procedures/load_dim_photos.sql')
    load_tables('../sql/procedures/load_dim_elite_years.sql')
    load_tables('../sql/procedures/load_fact_user.sql')
    load_tables('../sql/procedures/load_fact_checkin.sql')
    load_tables('../sql/procedures/load_fact_tip.sql')
    load_tables('../sql/procedures/load_fact_review.sql')
...
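Note that the table name is derived purely from the shape of the procedure path: split('/')[3] picks out the file name only because the paths passed in __main__ have exactly four segments. A small illustration of that derivation, plus a path-independent variant using os.path (the variant is a suggestion, not part of the original module):

import os

path = '../sql/procedures/load_dim_location.sql'

# Original approach: depends on the path having exactly this shape
# ['..', 'sql', 'procedures', 'load_dim_location.sql'] -> index 3
name_by_index = path.split('/')[3].replace('.sql', '').replace('load_', '')
print(name_by_index)      # dim_location

# Path-independent variant (assumes the same 'load_<table>.sql' naming convention)
name_by_basename = os.path.splitext(os.path.basename(path))[0].replace('load_', '')
print(name_by_basename)   # dim_location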
app.py
Source: app.py
...
##Transforms order string
transform.transform_order_string()

#Loads clean data into database
load_tables = load_db.load_tables()
load_tables.load_products()
load_tables.load_locations()
load_tables.load_payment_type()
load_tables.load_transactions()

#Pulls transaction ID from transactions table in database
load_tables.get_transaction_ids()

##Split orders per transaction ID
transform.transaction_split()

#Loads split orders
load_tables.load_basket()
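For readability, this is the loader interface the flow above assumes from load_db. It is a sketch inferred only from the calls in the snippet; the real method bodies and connection handling are not shown here.

# Sketch of the load_db.load_tables interface implied by app.py (inferred, not the actual implementation).
class load_tables:
    """Loads cleaned order data into the database."""

    def load_products(self): ...
    def load_locations(self): ...
    def load_payment_type(self): ...
    def load_transactions(self): ...

    def get_transaction_ids(self):
        """Pull transaction IDs back from the transactions table for the basket split."""
        ...

    def load_basket(self):
        """Load per-transaction order lines after transform.transaction_split()."""
        ...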