Best Python code snippet using localstack_python
example_dataproc.py
Source:example_dataproc.py
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that shows how to use various Dataproc
operators to manage a cluster and submit jobs.
"""
import os

import airflow
from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocClusterCreateOperator, DataprocClusterDeleteOperator, DataprocSubmitJobOperator,
    DataprocUpdateClusterOperator,
)

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
CLUSTER_NAME = os.environ.get("GCP_DATAPROC_CLUSTER_NAME", "example-project")
REGION = os.environ.get("GCP_LOCATION", "europe-west1")
ZONE = os.environ.get("GCP_REGION", "europe-west-1b")
BUCKET = os.environ.get("GCP_DATAPROC_BUCKET", "dataproc-system-tests")
OUTPUT_FOLDER = "wordcount"
OUTPUT_PATH = "gs://{}/{}/".format(BUCKET, OUTPUT_FOLDER)
PYSPARK_MAIN = os.environ.get("PYSPARK_MAIN", "hello_world.py")
PYSPARK_URI = "gs://{}/{}".format(BUCKET, PYSPARK_MAIN)

# Cluster definition
CLUSTER = {
    "project_id": PROJECT_ID,
    "cluster_name": CLUSTER_NAME,
    "config": {
        "master_config": {
            "num_instances": 1,
            "machine_type_uri": "n1-standard-4",
            "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
        },
        "worker_config": {
            "num_instances": 2,
            "machine_type_uri": "n1-standard-4",
            "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
        },
    },
}

# Update options
CLUSTER_UPDATE = {
    "config": {
        "worker_config": {"num_instances": 3},
        "secondary_worker_config": {"num_instances": 3},
    }
}
UPDATE_MASK = {
    "paths": [
        "config.worker_config.num_instances",
        "config.secondary_worker_config.num_instances",
    ]
}
TIMEOUT = {"seconds": 1 * 24 * 60 * 60}

# Jobs definitions
PIG_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
}
SPARK_SQL_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_sql_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}
SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}
PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": PYSPARK_URI},
}
HIVE_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}
HADOOP_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hadoop_job": {
        "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
        "args": ["wordcount", "gs://pub/shakespeare/rose.txt", OUTPUT_PATH],
    },
}

with models.DAG(
    "example_gcp_dataproc",
    default_args={"start_date": airflow.utils.dates.days_ago(1)},
    schedule_interval=None,
) as dag:
    create_cluster = DataprocClusterCreateOperator(
        task_id="create_cluster", project_id=PROJECT_ID, cluster=CLUSTER, region=REGION
    )
    scale_cluster = DataprocUpdateClusterOperator(
        task_id="scale_cluster",
        cluster_name=CLUSTER_NAME,
        cluster=CLUSTER_UPDATE,
        update_mask=UPDATE_MASK,
        graceful_decommission_timeout=TIMEOUT,
        project_id=PROJECT_ID,
        location=REGION,
    )
    pig_task = DataprocSubmitJobOperator(
        task_id="pig_task", job=PIG_JOB, location=REGION, project_id=PROJECT_ID
    )
    spark_sql_task = DataprocSubmitJobOperator(
        task_id="spark_sql_task",
        job=SPARK_SQL_JOB,
        location=REGION,
        project_id=PROJECT_ID,
    )
    spark_task = DataprocSubmitJobOperator(
        task_id="spark_task", job=SPARK_JOB, location=REGION, project_id=PROJECT_ID
    )
    pyspark_task = DataprocSubmitJobOperator(
        task_id="pyspark_task", job=PYSPARK_JOB, location=REGION, project_id=PROJECT_ID
    )
    hive_task = DataprocSubmitJobOperator(
        task_id="hive_task", job=HIVE_JOB, location=REGION, project_id=PROJECT_ID
    )
    hadoop_task = DataprocSubmitJobOperator(
        task_id="hadoop_task", job=HADOOP_JOB, location=REGION, project_id=PROJECT_ID
    )
    delete_cluster = DataprocClusterDeleteOperator(
        task_id="delete_cluster",
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_NAME,
        region=REGION,
    )
    create_cluster >> scale_cluster
    scale_cluster >> hive_task >> delete_cluster
    scale_cluster >> pig_task >> delete_cluster
    scale_cluster >> spark_sql_task >> delete_cluster
    scale_cluster >> spark_task >> delete_cluster
    scale_cluster >> pyspark_task >> delete_cluster
...
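If you want to check the DAG above without touching GCP, one option is a small import-time test that asserts the task graph. The sketch below is an assumption about your layout: it presumes Apache Airflow and the Google provider are installed and that the listing is saved as example_dataproc.py on the test's import path (the module name and test name are placeholders).

# Sketch of a structural test for the DAG above (no GCP calls are made).
# Assumes the listing is importable as `example_dataproc`; adjust to your project layout.
import example_dataproc  # hypothetical module name for the file shown above

def test_dataproc_dag_structure():
    dag = example_dataproc.dag
    assert "create_cluster" in dag.task_ids
    scale = dag.get_task("scale_cluster")
    delete = dag.get_task("delete_cluster")
    # Each job task should run after scaling and before cluster deletion.
    for task_id in ("pig_task", "spark_sql_task", "spark_task", "pyspark_task", "hive_task"):
        task = dag.get_task(task_id)
        assert scale.task_id in task.upstream_task_ids
        assert delete.task_id in task.downstream_task_ids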
airflow_data_proc.py
Source:airflow_data_proc.py
import os

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = os.environ.get('GCP_PROJECT')
REGION = 'us-central1'
CLUSTER_NAME = 'ephemeral-spark-cluster-{{ ds_nodash }}'
PYSPARK_URI = 'gs://packt-data-eng-on-gcp-data-bucket/chapter-5/code/pyspark_gcs_to_bq.py'

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "jar_file_uris": ["gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"]
    }
}

cluster_config_json = {
    "worker_config": {
        "num_instances": 2
    }
}

args = {
    'owner': 'packt-developer',
}

with DAG(
    dag_id='dataproc_ephemeral_cluster_job',
    schedule_interval='0 5 * * *',
    start_date=days_ago(1),
    default_args=args
) as dag:
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        cluster_config=cluster_config_json,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        idle_delete_ttl=600
    )
    pyspark_task = DataprocSubmitJobOperator(
        task_id="pyspark_task", job=PYSPARK_JOB, location=REGION, project_id=PROJECT_ID
    )
    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster", project_id=PROJECT_ID, cluster_name=CLUSTER_NAME, region=REGION
    )
...
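A note on the templated cluster name in this DAG: {{ ds_nodash }} is an Airflow macro that renders to the run's logical date in YYYYMMDD form, so each scheduled run creates (and later deletes) its own ephemeral cluster. A rough, standalone illustration of what the rendered name looks like (the date value is just an example):

# Illustration only: how Airflow's {{ ds_nodash }} macro shapes the cluster name.
# Airflow does this rendering itself at runtime; the date below is a made-up example.
logical_date = "2023-01-15"
ds_nodash = logical_date.replace("-", "")            # -> "20230115"
cluster_name = "ephemeral-spark-cluster-" + ds_nodash
print(cluster_name)                                  # ephemeral-spark-cluster-20230115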
delete_cluster.py
Source:delete_cluster.py
...
import logging

# CONFIG
config = configparser.ConfigParser()
config.read('dwh.cfg')

def delete_cluster():
    """
    Deletes Redshift cluster.
    """
    try:
        redshift = boto3.client('redshift',
                                config['REGION']['AWS_REGION']
                                )
        redshift.delete_cluster(
            ClusterIdentifier=config['CLUSTER']['CLUSTER_ID'],
            SkipFinalClusterSnapshot=True
        )
    except Exception as e:
        logging.error(e)

if __name__ == '__main__':
...
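Since this page groups snippets around localstack_python, it is worth noting that the same Redshift calls can be exercised locally by pointing the boto3 client at a LocalStack endpoint instead of AWS. The sketch below is an assumption, not part of the script above: it presumes LocalStack is running on its default edge port 4566 with Redshift support enabled, and uses dummy credentials and a made-up cluster identifier.

# Sketch: exercising delete_cluster against LocalStack instead of a real Redshift cluster.
# Assumes LocalStack is listening on http://localhost:4566 and supports the Redshift API.
import boto3

redshift = boto3.client(
    'redshift',
    region_name='us-east-1',                 # LocalStack accepts any region
    endpoint_url='http://localhost:4566',    # LocalStack edge endpoint (default port)
    aws_access_key_id='test',                # dummy credentials for LocalStack
    aws_secret_access_key='test',
)
redshift.create_cluster(                     # create a throwaway cluster to delete
    ClusterIdentifier='dwh-test',
    NodeType='dc2.large',
    MasterUsername='admin',
    MasterUserPassword='Passw0rd',
)
redshift.delete_cluster(ClusterIdentifier='dwh-test', SkipFinalClusterSnapshot=True)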