Best Python code snippet using localstack_python
i94immigration.py
Source:i94immigration.py
#-----------------------------------------------------------------------------------#
# import packages                                                                    #
#-----------------------------------------------------------------------------------#
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
import lib.emr as libemr
import lib.python_function as pyfun
import lib.redshift_sql_syntax as red_sql
import lib.redshift_function as red_fun

#-----------------------------------------------------------------------------------#
# define connection parameters for AWS Redshift, AWS EMR, and AWS S3                 #
#-----------------------------------------------------------------------------------#
# Redshift parameters
redshift_host_dns = 'YOUR REDSHIFT HOST DNS'
redshift_database = 'YOUR REDSHIFT DATABASE NAME'
redshift_username = 'REDSHIFT USERNAME'
redshift_password_for_red = 'REDSHIFT PASSWORD'
redshift_port_num = 'REDSHIFT PORT NUMBER'
# S3 parameters
key_id = 'YOUR AWS ACCESS KEY ID'
key_secret = 'YOUR AWS SECRET KEY'
# EMR parameters
emr_host_dns = 'YOUR EMR CLUSTER DNS'

#-----------------------------------------------------------------------------------#
# define DAG arguments and create a DAG                                              #
#-----------------------------------------------------------------------------------#
default_args = {
    'owner': 'ChunYen-Chang',
    'start_date': datetime(2019, 9, 1),
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_retry': False,
}

dag = DAG('i94form_analysis_dag',
          default_args=default_args,
          description='Extract i94form data from S3, transform it by python and spark, load it back to S3, and transfer it to Redshift',
          schedule_interval='0 0 1 * *',
          catchup=False,
          max_active_runs=1
          )

#-----------------------------------------------------------------------------------#
# define tasks in this DAG                                                           #
#-----------------------------------------------------------------------------------#
# DAG structure:
# This DAG can be separated into two parts. The first part extracts data from S3,
# transforms it, and saves it back to S3. The second part moves the processed data
# from S3 to Redshift.
#
# In the first part, because of the differences in file size, we transform the data
# in two different ways: with the python pandas package (for small files) and with a
# Spark cluster (for large files). Thus, the section below defines two data
# pipelines. One extracts data from S3 to an EC2 instance, transforms it, and saves
# the result back to S3. The other extracts data from S3 to a Spark cluster,
# transforms it, and saves the result back to S3.
#
# In the second part, since this project uses a star schema in the data warehouse,
# we define I94immigration_form as the fact table and the other tables as dimension
# tables. At the beginning of the second part, we drop tables, create tables, and
# copy data from S3 to Redshift.
#-----------------------------------------------------------------------------------#
# define start_operator task
start_operator = DummyOperator(
    task_id='Begin_execution',
    dag=dag
)

# part 1: upload data to S3 (from local to S3)
upload_data_to_s3_task = PythonOperator(
    task_id='upload_data_to_s3',
    python_callable=pyfun.upload_to_aws,
    op_kwargs={
        'bucket': 'udacityfikrusnk',
        'AWS_ACCESS_KEY_ID': key_id,
        'AWS_SECRET_ACCESS_KEY': key_secret
    },
    dag=dag)

# part 1: extract data process (from S3 to EC2 instance)
read_airport_csv_from_s3_task = PythonOperator(
    task_id='read_airport_csv_from_s3',
    python_callable=pyfun.read_csv_from_s3,
    op_kwargs={
        'bucket': 'udacityfikrusnk',
        's3_file': 'airport-codes_csv.csv',
        'AWS_ACCESS_KEY_ID': key_id,
        'AWS_SECRET_ACCESS_KEY': key_secret
    },
    dag=dag)

read_demography_csv_from_s3_task = PythonOperator(
    task_id='read_demography_csv_from_s3',
    python_callable=pyfun.read_csv_from_s3,
    op_kwargs={
        'bucket': 'udacityfikrusnk',
        's3_file': 'us-cities-demographics.csv',
        'AWS_ACCESS_KEY_ID': key_id,
        'AWS_SECRET_ACCESS_KEY': key_secret,
        'delimiterValue': ';'
    },
    dag=dag)

read_I94SASLabels_from_s3_task = PythonOperator(
    task_id='read_I94SASLabels_from_s3',
    python_callable=pyfun.read_txt_from_s3,
    op_kwargs={
        'bucket': 'udacityfikrusnk',
        's3_file': 'I94_SAS_Labels_Descriptions.SAS',
        'AWS_ACCESS_KEY_ID': key_id,
        'AWS_SECRET_ACCESS_KEY': key_secret
    },
    dag=dag)

# part 1: data transformation process (by python package)
process_airport_data_task = PythonOperator(
    task_id='process_airport_data',
    python_callable=pyfun.process_airport_data,
    provide_context=True,
    dag=dag)

process_demography_data_bycity_task = PythonOperator(
    task_id='process_demography_data_bycity',
    python_callable=pyfun.process_demography_data_bycity,
    provide_context=True,
    dag=dag)

process_demography_data_bystate_task = PythonOperator(
    task_id='process_demography_data_bystate',
    python_callable=pyfun.process_demography_data_bystate,
    provide_context=True,
    dag=dag)

process_I94SASLabels_task = PythonOperator(
    task_id='process_I94SASLabels',
    python_callable=pyfun.process_I94SASLabels,
    provide_context=True,
    dag=dag)

# part 1: data loading process (from EC2 instance to S3)
write_airport_data_to_s3_task = PythonOperator(
    task_id='write_airport_data_to_s3',
    python_callable=pyfun.write_airport_data_to_s3,
    op_kwargs={
        'bucketname': 'udacityfikrusnk',
        'filename': 'airport-codes-after-process.csv',
        'AWS_ACCESS_KEY_ID': key_id,
        'AWS_SECRET_ACCESS_KEY': key_secret
    },
    provide_context=True,
    dag=dag)

write_demographybycity_data_to_s3_task = PythonOperator(
    task_id='write_demographybycity_data_to_s3',
    python_callable=pyfun.write_demographybycity_data_to_s3,
    op_kwargs={
        'bucketname': 'udacityfikrusnk',
        'filename': 'us-demographics-by-city.csv',
        'AWS_ACCESS_KEY_ID': key_id,
        'AWS_SECRET_ACCESS_KEY': key_secret
    },
    provide_context=True,
    dag=dag)

write_demographybystate_data_to_s3_task = PythonOperator(
    task_id='write_demographybystate_data_to_s3',
    python_callable=pyfun.write_demographybystate_data_to_s3,
    op_kwargs={
        'bucketname': 'udacityfikrusnk',
        'filename': 'us-demographics-by-state.csv',
        'AWS_ACCESS_KEY_ID': key_id,
        'AWS_SECRET_ACCESS_KEY': key_secret
    },
    provide_context=True,
    dag=dag)

write_I94CITandI94RES_to_s3_task = PythonOperator(
    task_id='write_I94CITandI94RES_to_s3',
    python_callable=pyfun.write_I94CITandI94RES_to_s3,
    op_kwargs={
        'bucketname': 'udacityfikrusnk',
        'filename': 'I94CITandI94RES.csv',
        'AWS_ACCESS_KEY_ID': key_id,
        'AWS_SECRET_ACCESS_KEY': key_secret
    },
    provide_context=True,
    dag=dag)

write_I94PORT_to_s3_task = PythonOperator(
    task_id='write_I94PORT_to_s3',
    python_callable=pyfun.write_I94PORT_to_s3,
    op_kwargs={
        'bucketname': 'udacityfikrusnk',
        'filename': 'I94PORT.csv',
        'AWS_ACCESS_KEY_ID': key_id,
        'AWS_SECRET_ACCESS_KEY': key_secret
    },
    provide_context=True,
    dag=dag)

write_I94MODE_to_s3_task = PythonOperator(
    task_id='write_I94MODE_to_s3',
    python_callable=pyfun.write_I94MODE_to_s3,
    op_kwargs={
        'bucketname': 'udacityfikrusnk',
        'filename': 'I94MODE.csv',
        'AWS_ACCESS_KEY_ID': key_id,
        'AWS_SECRET_ACCESS_KEY': key_secret
    },
    provide_context=True,
    dag=dag)

write_I94ADDR_to_s3_task = PythonOperator(
    task_id='write_I94ADDR_to_s3',
    python_callable=pyfun.write_I94ADDR_to_s3,
    op_kwargs={
        'bucketname': 'udacityfikrusnk',
        'filename': 'I94ADDR.csv',
        'AWS_ACCESS_KEY_ID': key_id,
        'AWS_SECRET_ACCESS_KEY': key_secret
    },
    provide_context=True,
    dag=dag)

write_I94VISA_to_s3_task = PythonOperator(
    task_id='write_I94VISA_to_s3',
    python_callable=pyfun.write_I94VISA_to_s3,
    op_kwargs={
        'bucketname': 'udacityfikrusnk',
        'filename': 'I94VISA.csv',
        'AWS_ACCESS_KEY_ID': key_id,
        'AWS_SECRET_ACCESS_KEY': key_secret
    },
    provide_context=True,
    dag=dag)

# part 1: extract data from S3, transform it with Spark, and load it back to S3
submit_command_to_emr_task = PythonOperator(
    task_id='submit_command_to_emr',
    python_callable=libemr.submit_command_to_emr,
    op_kwargs={
        'cluster_dns': emr_host_dns
    },
    params={'file': '/usr/local/airflow/dags/lib/immigration_pyspark.py'},
    provide_context=True,
    dag=dag)

# the middle execution phase
middle_operator = DummyOperator(task_id='middle_execution', dag=dag)

# part 2: drop tables
drop_table_I94CIT_I94RES_Code_task = PythonOperator(
    task_id='drop_table_I94CIT_I94RES_Code',
    python_callable=red_fun.postgres_dropandcreate,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.I94CIT_I94RES_Code_drop
    },
    dag=dag)

drop_table_I94Addr_Code_task = PythonOperator(
    task_id='drop_table_I94Addr_Code',
    python_callable=red_fun.postgres_dropandcreate,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.I94Addr_Code_drop
    },
    dag=dag)

drop_table_US_Demography_by_State_task = PythonOperator(
    task_id='drop_table_US_Demography_by_State',
    python_callable=red_fun.postgres_dropandcreate,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.US_Demography_by_State_drop
    },
    dag=dag)

drop_table_I94Port_Code_task = PythonOperator(
    task_id='drop_table_I94Port_Code',
    python_callable=red_fun.postgres_dropandcreate,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.I94Port_Code_drop
    },
    dag=dag)

drop_table_I94Mode_Code_task = PythonOperator(
    task_id='drop_table_I94Mode_Code',
    python_callable=red_fun.postgres_dropandcreate,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.I94Mode_Code_drop
    },
    dag=dag)

drop_table_I94Visa_Code_task = PythonOperator(
    task_id='drop_table_I94Visa_Code',
    python_callable=red_fun.postgres_dropandcreate,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.I94Visa_Code_drop
    },
    dag=dag)

drop_table_US_Demography_by_City_task = PythonOperator(
    task_id='drop_table_US_Demography_by_City',
    python_callable=red_fun.postgres_dropandcreate,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.US_Demography_by_City_drop
    },
    dag=dag)

drop_table_Airport_Information_task = PythonOperator(
    task_id='drop_table_Airport_Information',
    python_callable=red_fun.postgres_dropandcreate,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.Airport_Information_drop
    },
    dag=dag)

# part 2: create tables
create_table_I94Immigration_form_task = PythonOperator(
    task_id='create_table_I94Immigration_form',
    python_callable=red_fun.postgres_dropandcreate,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.I94Immigration_form_table_create
    },
    dag=dag)

create_table_I94CIT_I94RES_Code_task = PythonOperator(
    task_id='create_table_I94CIT_I94RES_Code',
    python_callable=red_fun.postgres_dropandcreate,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.I94CIT_I94RES_table_create
    },
    dag=dag)

create_table_I94Addr_Code_task = PythonOperator(
    task_id='create_table_I94Addr_Code',
    python_callable=red_fun.postgres_dropandcreate,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.I94Addr_Code_table_create
    },
    dag=dag)

create_table_US_Demography_by_State_task = PythonOperator(
    task_id='create_table_US_Demography_by_State',
    python_callable=red_fun.postgres_dropandcreate,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.US_Demography_by_State_table_create
    },
    dag=dag)

create_table_I94Port_Code_task = PythonOperator(
    task_id='create_table_I94Port_Code',
    python_callable=red_fun.postgres_dropandcreate,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.I94Port_Code_table_create
    },
    dag=dag)

create_table_I94Mode_Code_task = PythonOperator(
    task_id='create_table_I94Mode_Code',
    python_callable=red_fun.postgres_dropandcreate,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.I94Mode_Code_table_create
    },
    dag=dag)

create_table_I94Visa_Code_task = PythonOperator(
    task_id='create_table_I94Visa_Code',
    python_callable=red_fun.postgres_dropandcreate,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.I94Visa_Code_table_create
    },
    dag=dag)

create_table_US_Demography_by_City_task = PythonOperator(
    task_id='create_table_US_Demography_by_City',
    python_callable=red_fun.postgres_dropandcreate,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.US_Demography_by_City_table_create
    },
    dag=dag)

create_table_Airport_Information_task = PythonOperator(
    task_id='create_table_Airport_Information',
    python_callable=red_fun.postgres_dropandcreate,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.Airport_Information_table_create
    },
    dag=dag)

# part 2: copy data from S3 to each table
insert_table_I94Addr_Code_task = PythonOperator(
    task_id='insert_table_I94Addr_Code',
    python_callable=red_fun.postgres_insert,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.I94Addr_table_insert,
        'aws_key_id': key_id,
        'aws_secret_key': key_secret
    },
    dag=dag)

insert_table_I94CIT_I94RES_Code_task = PythonOperator(
    task_id='insert_table_I94CIT_I94RES_Code',
    python_callable=red_fun.postgres_insert,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.I94CITandI94RES_table_insert,
        'aws_key_id': key_id,
        'aws_secret_key': key_secret
    },
    dag=dag)

insert_table_I94Mode_Code_task = PythonOperator(
    task_id='insert_table_I94Mode_Code',
    python_callable=red_fun.postgres_insert,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.I94Mode_table_insert,
        'aws_key_id': key_id,
        'aws_secret_key': key_secret
    },
    dag=dag)

insert_table_I94Port_Code_task = PythonOperator(
    task_id='insert_table_I94Port_Code',
    python_callable=red_fun.postgres_insert,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.I94Port_table_insert,
        'aws_key_id': key_id,
        'aws_secret_key': key_secret
    },
    dag=dag)

insert_table_I94Visa_Code_task = PythonOperator(
    task_id='insert_table_I94Visa_Code',
    python_callable=red_fun.postgres_insert,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.I94Visa_table_insert,
        'aws_key_id': key_id,
        'aws_secret_key': key_secret
    },
    dag=dag)

insert_table_Airport_Information_task = PythonOperator(
    task_id='insert_table_Airport_Information',
    python_callable=red_fun.postgres_insert,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.Airport_Information_table_insert,
        'aws_key_id': key_id,
        'aws_secret_key': key_secret
    },
    dag=dag)

insert_table_US_Demography_by_City_task = PythonOperator(
    task_id='insert_table_US_Demography_by_City',
    python_callable=red_fun.postgres_insert,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.US_Demography_by_City_table_insert,
        'aws_key_id': key_id,
        'aws_secret_key': key_secret
    },
    dag=dag)

insert_table_US_Demography_by_State_task = PythonOperator(
    task_id='insert_table_US_Demography_by_State',
    python_callable=red_fun.postgres_insert,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.US_Demography_by_State_table_insert,
        'aws_key_id': key_id,
        'aws_secret_key': key_secret
    },
    dag=dag)

insert_table_I94Immigration_form_task = PythonOperator(
    task_id='insert_table_I94Immigration_form',
    python_callable=red_fun.postgres_insert_immigration,
    op_kwargs={
        'host_dns': redshift_host_dns,
        'database': redshift_database,
        'username': redshift_username,
        'password_redshift': redshift_password_for_red,
        'port_num': redshift_port_num,
        'syntax': red_sql.I94Immigration_form_table_insert,
        'aws_key_id': key_id,
        'aws_secret_key': key_secret
    },
    provide_context=True,
    dag=dag)

# define end_operator task
end_operator = DummyOperator(task_id='Stop_execution', dag=dag)

#-----------------------------------------------------------------------------------#
# define the data flow                                                               #
#-----------------------------------------------------------------------------------#
start_operator >> upload_data_to_s3_task
upload_data_to_s3_task >> submit_command_to_emr_task >> middle_operator
upload_data_to_s3_task >> [read_airport_csv_from_s3_task, read_demography_csv_from_s3_task, read_I94SASLabels_from_s3_task]
read_airport_csv_from_s3_task >> process_airport_data_task >> write_airport_data_to_s3_task
read_demography_csv_from_s3_task >> process_demography_data_bycity_task >> [write_demographybycity_data_to_s3_task, process_demography_data_bystate_task]
process_demography_data_bystate_task >> write_demographybystate_data_to_s3_task
read_I94SASLabels_from_s3_task >> process_I94SASLabels_task >> [write_I94CITandI94RES_to_s3_task, write_I94PORT_to_s3_task, write_I94MODE_to_s3_task, write_I94ADDR_to_s3_task, write_I94VISA_to_s3_task]
[write_airport_data_to_s3_task, write_demographybycity_data_to_s3_task, write_demographybystate_data_to_s3_task] >> middle_operator
[write_I94CITandI94RES_to_s3_task, write_I94PORT_to_s3_task, write_I94MODE_to_s3_task, write_I94ADDR_to_s3_task, write_I94VISA_to_s3_task] >> middle_operator
middle_operator >> drop_table_I94CIT_I94RES_Code_task >> create_table_I94CIT_I94RES_Code_task >> create_table_I94Immigration_form_task
middle_operator >> drop_table_I94Addr_Code_task >> create_table_I94Addr_Code_task >> create_table_I94Immigration_form_task
middle_operator >> drop_table_US_Demography_by_State_task >> create_table_US_Demography_by_State_task >> create_table_I94Immigration_form_task
middle_operator >> drop_table_I94Port_Code_task >> create_table_I94Port_Code_task >> create_table_I94Immigration_form_task
middle_operator >> drop_table_I94Mode_Code_task >> create_table_I94Mode_Code_task >> create_table_I94Immigration_form_task
middle_operator >> drop_table_I94Visa_Code_task >> create_table_I94Visa_Code_task >> create_table_I94Immigration_form_task
middle_operator >> drop_table_US_Demography_by_City_task >> create_table_US_Demography_by_City_task >> create_table_I94Immigration_form_task
middle_operator >> drop_table_Airport_Information_task >> create_table_Airport_Information_task >> create_table_I94Immigration_form_task
create_table_I94Immigration_form_task >> insert_table_I94CIT_I94RES_Code_task >> end_operator
create_table_I94Immigration_form_task >> insert_table_I94Addr_Code_task >> end_operator
create_table_I94Immigration_form_task >> insert_table_US_Demography_by_State_task >> end_operator
create_table_I94Immigration_form_task >> insert_table_I94Port_Code_task >> end_operator
create_table_I94Immigration_form_task >> insert_table_I94Mode_Code_task >> end_operator
create_table_I94Immigration_form_task >> insert_table_I94Visa_Code_task >> end_operator
create_table_I94Immigration_form_task >> insert_table_US_Demography_by_City_task >> end_operator
create_table_I94Immigration_form_task >> insert_table_Airport_Information_task >> end_operator
...
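One detail worth noting in this DAG: submit_command_to_emr_task and insert_table_I94Immigration_form_task communicate through XCom. With provide_context=True, whatever libemr.submit_command_to_emr returns is pushed to XCom under its task id, and red_fun.postgres_insert_immigration (shown further down this page) pulls it with ti.xcom_pull(task_ids='submit_command_to_emr') to fill the month/year slot of the fact-table COPY statement. Since lib/emr.py is not shown on this page, the sketch below is only a hypothetical illustration of that contract; reaching the EMR master through the Livy REST API on port 8998, the payload shape, and the 'apr16'-style tag are all assumptions, not the project's actual code.

# hypothetical sketch of lib/emr.py (not the project's actual module)
import requests

def submit_command_to_emr(cluster_dns, **kwargs):
    # assumption: the EMR master runs Apache Livy on its default port 8998
    batches_url = 'http://{}:8998/batches'.format(cluster_dns)
    # submit the PySpark script that the DAG passes in via params={'file': ...}
    payload = {'file': kwargs['params']['file']}
    response = requests.post(batches_url, json=payload)
    response.raise_for_status()
    # derive a month/year tag from the run's execution date, e.g. 'apr16'
    # (assumed format; the real project may encode this differently)
    monthyear = kwargs['execution_date'].strftime('%b%y').lower()
    # the return value becomes this task's XCom, which
    # postgres_insert_immigration later pulls by task id
    return monthyear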
test_redshift_hook.py
Source:test_redshift_hook.py
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
import unittest

import boto3

from airflow.contrib.hooks.redshift_hook import RedshiftHook
from airflow.contrib.hooks.aws_hook import AwsHook

try:
    from moto import mock_redshift
except ImportError:
    mock_redshift = None


class TestRedshiftHook(unittest.TestCase):
    @staticmethod
    def _create_clusters():
        client = boto3.client('redshift', region_name='us-east-1')
        client.create_cluster(
            ClusterIdentifier='test_cluster',
            NodeType='dc1.large',
            MasterUsername='admin',
            MasterUserPassword='mock_password'
        )
        client.create_cluster(
            ClusterIdentifier='test_cluster_2',
            NodeType='dc1.large',
            MasterUsername='admin',
            MasterUserPassword='mock_password'
        )
        if len(client.describe_clusters()['Clusters']) == 0:
            raise ValueError('AWS not properly mocked')

    @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present')
    @mock_redshift
    def test_get_client_type_returns_a_boto3_client_of_the_requested_type(self):
        self._create_clusters()
        hook = AwsHook(aws_conn_id='aws_default')
        client_from_hook = hook.get_client_type('redshift')
        clusters = client_from_hook.describe_clusters()['Clusters']
        self.assertEqual(len(clusters), 2)

    @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present')
    @mock_redshift
    def test_restore_from_cluster_snapshot_returns_dict_with_cluster_data(self):
        self._create_clusters()
        hook = RedshiftHook(aws_conn_id='aws_default')
        hook.create_cluster_snapshot('test_snapshot', 'test_cluster')
        self.assertEqual(
            hook.restore_from_cluster_snapshot(
                'test_cluster_3', 'test_snapshot'
            )['ClusterIdentifier'],
            'test_cluster_3')

    @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present')
    @mock_redshift
    def test_delete_cluster_returns_a_dict_with_cluster_data(self):
        self._create_clusters()
        hook = RedshiftHook(aws_conn_id='aws_default')
        cluster = hook.delete_cluster('test_cluster_2')
        self.assertNotEqual(cluster, None)

    @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present')
    @mock_redshift
    def test_create_cluster_snapshot_returns_snapshot_data(self):
        self._create_clusters()
        hook = RedshiftHook(aws_conn_id='aws_default')
        snapshot = hook.create_cluster_snapshot('test_snapshot_2', 'test_cluster')
        self.assertNotEqual(snapshot, None)

    @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present')
    @mock_redshift
    def test_cluster_status_returns_cluster_not_found(self):
        self._create_clusters()
        hook = RedshiftHook(aws_conn_id='aws_default')
        status = hook.cluster_status('test_cluster_not_here')
        self.assertEqual(status, 'cluster_not_found')

    @unittest.skipIf(mock_redshift is None, 'mock_redshift package not present')
    @mock_redshift
    def test_cluster_status_returns_available_cluster(self):
        self._create_clusters()
        hook = RedshiftHook(aws_conn_id='aws_default')
        status = hook.cluster_status('test_cluster')
        self.assertEqual(status, 'available')


if __name__ == '__main__':
    unittest.main()
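The tests above rely on moto's @mock_redshift decorator, which intercepts boto3 calls in-process. An alternative, in keeping with this page's LocalStack theme, is to run the same calls against a LocalStack container, which serves mock AWS APIs over HTTP. The snippet below is a minimal sketch of that setup, assuming LocalStack is already running locally on its default edge port 4566; the dummy credentials are placeholder values LocalStack accepts.

# minimal sketch: the same Redshift calls against a local LocalStack endpoint
import boto3

client = boto3.client(
    'redshift',
    region_name='us-east-1',
    endpoint_url='http://localhost:4566',   # LocalStack's default edge endpoint
    aws_access_key_id='test',               # dummy credentials for LocalStack
    aws_secret_access_key='test',
)
client.create_cluster(
    ClusterIdentifier='test_cluster',
    NodeType='dc1.large',
    MasterUsername='admin',
    MasterUserPassword='mock_password',
)
# verify the mocked cluster exists, as _create_clusters does with moto
print(client.describe_clusters()['Clusters'][0]['ClusterIdentifier'])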
redshift_function.py
Source:redshift_function.py
# import packages
import psycopg2


# define functions
def postgres_dropandcreate(host_dns, database, username, password_redshift, port_num, syntax):
    """
    Description: This function helps users drop or create Redshift tables.
    Parameters: -host_dns: Redshift DNS
                -database: Redshift database name
                -username: Redshift username
                -password_redshift: Redshift password
                -port_num: Redshift port
                -syntax: the SQL statement for dropping or creating Redshift tables
    Returns: None
    """
    conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(host_dns, database, username, password_redshift, port_num))
    cur = conn.cursor()
    cur.execute(syntax)
    conn.commit()


def postgres_insert(host_dns, database, username, password_redshift, port_num, syntax, aws_key_id, aws_secret_key):
    """
    Description: This function helps users copy data from S3 to Redshift dimension tables.
    Parameters: -host_dns: Redshift DNS
                -database: Redshift database name
                -username: Redshift username
                -password_redshift: Redshift password
                -port_num: Redshift port
                -syntax: the SQL statement for loading data into Redshift dimension tables
                -aws_key_id: the AWS access key id for connecting to S3
                -aws_secret_key: the AWS secret key for connecting to S3
    Returns: None
    """
    conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(host_dns, database, username, password_redshift, port_num))
    cur = conn.cursor()
    # log the statement before and after the AWS credentials are filled in
    # (note: this prints the credentials to the task log)
    print(syntax)
    syntax = syntax.format(aws_key_id, aws_secret_key)
    print(syntax)
    cur.execute(syntax)
    conn.commit()


def postgres_insert_immigration(host_dns, database, username, password_redshift, port_num, syntax, aws_key_id, aws_secret_key, **kwargs):
    """
    Description: This function helps users copy data from S3 to the Redshift fact table.
    Parameters: -host_dns: Redshift DNS
                -database: Redshift database name
                -username: Redshift username
                -password_redshift: Redshift password
                -port_num: Redshift port
                -syntax: the SQL statement for loading data into the Redshift fact table
                -aws_key_id: the AWS access key id for connecting to S3
                -aws_secret_key: the AWS secret key for connecting to S3
    Returns: None
    """
    conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(host_dns, database, username, password_redshift, port_num))
    cur = conn.cursor()

    # receive the airflow context variable
    ti = kwargs['ti']
    # extract the monthyear variable from XCom
    monthyear = ti.xcom_pull(task_ids='submit_command_to_emr')

    syntax = syntax.format(monthyear, aws_key_id, aws_secret_key)

    cur.execute(syntax)
    conn.commit()
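postgres_insert expects syntax to be a COPY-statement template with two positional {} placeholders for the AWS credentials, since it calls syntax.format(aws_key_id, aws_secret_key) before executing. lib/redshift_sql_syntax.py itself is not shown on this page, so the template below is only a hypothetical illustration of that shape: the table name and COPY options are assumptions, while the bucket and file names are taken from the DAG above.

# hypothetical template in the shape lib/redshift_sql_syntax.py would need
# (the real module is not shown on this page)
I94Addr_table_insert = """
    COPY i94addr_code
    FROM 's3://udacityfikrusnk/I94ADDR.csv'
    ACCESS_KEY_ID '{}'
    SECRET_ACCESS_KEY '{}'
    CSV
    IGNOREHEADER 1;
"""

The fact-table template used by postgres_insert_immigration would need a third, leading placeholder, since that function formats in the monthyear value pulled from XCom before the credentials: syntax.format(monthyear, aws_key_id, aws_secret_key).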