Best Python code snippet using SeleniumBase
lambda_function.py
Source: lambda_function.py
...
            timestamp = event[source_prefix]["SubmitTime"]
            logger.debug(f"Retrieving topic dict for {TMP_DIR + TOPICS_FILE_NAME}")
            publish_topics(source_prefix, job_id, timestamp, doc_topics_file_name=TMP_DIR + TOPICS_FILE_NAME)
            logger.debug("Complete topic publishing")
            delete_downloaded_file(TMP_DIR + TOPICS_FILE_NAME)  # delete the topics csv file


def topic_mapping_handler(event, context):
    records = event["Records"]
    logger.debug("Starting to process the records")
    try:
        for record in records:
            logger.debug(f"Record is {record}")
            message = json.loads(record["body"])
            publish_topic_id_mapping(message["Platform"], message["JobId"], message["SubmitTime"], message["Topic"])
    except Exception as e:
        logger.error(f"Error occurred when processing topics: {e}")
        raise e
    logger.debug("Publishing topic mappings complete")


def topic_terms_handler(event, context):
    source_prefix_list = os.environ["SOURCE_PREFIX"].lower().split(",")
    for source_prefix in source_prefix_list:
        file_available = False
        try:
            file_available = is_file_available(event, source_prefix, TERMS_FILE_NAME)
        except IngestionSourcePrefixMissingError as prefix_error:
            logger.error(f"Received prefix missing error - {prefix_error} for prefix: {source_prefix}")
            continue
        if file_available:
            # get job ID and timestamp details
            job_id = event[source_prefix]["JobId"]
            timestamp = event[source_prefix]["SubmitTime"]
            publish_topics(job_id, timestamp, topic_terms_file_name=TMP_DIR + TERMS_FILE_NAME)
            logger.debug("Publishing topic terms complete")
            delete_downloaded_file(TMP_DIR + TERMS_FILE_NAME)  # delete the terms csv file


def is_file_available(event, source_prefix, file_to_extract):
    if event.get(source_prefix, None):
        s3_uri_parse = urlparse(event[source_prefix]["OutputDataConfig"]["S3Uri"])
        bucket = s3_uri_parse.netloc
        key = s3_uri_parse.path.lstrip("/")
        logger.debug("Bucket is " + bucket + " and key is " + key)
        file_name = os.path.basename(key)
        logger.debug("File name is " + file_name)
        try:
            """
            Lambda functions provide a /tmp directory to store temporary files.
            This is not the same /tmp as on a conventional Unix or Linux
            system, hence the rule is suppressed.
            """
            s3.download_file(bucket, key, TMP_DIR + file_name)
            logger.debug(file_name + " downloaded from S3 bucket")
            if tarfile.is_tarfile(TMP_DIR + file_name):
                # This archive is generated by the AWS Comprehend topic modeling job and stored in an S3 bucket.
                # The bucket permissions only allow the Comprehend job and the Lambda function to read/write from it.
                archive_file = tarfile.open(TMP_DIR + file_name)
                file_list = archive_file.getnames()
                logger.debug(f"File list length is {len(file_list)} and files in the archive {file_list}")
                if len(file_list) != 2 and not (TOPICS_FILE_NAME in file_list and TERMS_FILE_NAME in file_list):
                    raise IncorrectTarFileException(
                        "Either the number of files in the archive is not 2 or the file names are not as expected. May not be a valid archive"
                    )
                archive_file.extractall(TMP_DIR, member_file_to_extract(archive_file, file_to_extract))
                archive_file.close()
                delete_downloaded_file(TMP_DIR + file_name)  # delete the downloaded archive
                logger.debug(f"Extraction complete. Files in the directory are {os.listdir(TMP_DIR)}")
                return True
        except Exception as e:
            logger.error(f"Error occurred when processing topics: {str(e)}")
            raise e
    else:
        logger.error("Ingestion source prefix information not available in event to process data")
        raise IngestionSourcePrefixMissingError(
            "Ingestion source prefix information not available in event to process data"
        )


def member_file_to_extract(archive_file, file_to_extract):
    for tarinfo in archive_file:
        logger.debug(f"File name in archive: {tarinfo.name}")
        if os.path.basename(tarinfo.name) == file_to_extract:
            logger.debug(f"Matched member to extract: {tarinfo}")
            yield tarinfo


def delete_downloaded_file(file_path):
    """
    This method deletes the file that is passed to it from the /tmp/ directory.
    """
    os.remove(file_path)
...
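The member_file_to_extract generator above is passed to TarFile.extractall so that only the member whose base name matches the requested CSV is written out. Below is a minimal standalone sketch of the same selective-extraction pattern using only the standard-library tarfile module; the archive path output.tar.gz and the member name doc-topics.csv are placeholder values for illustration, not taken from the snippet.

import os
import tarfile


def members_named(archive, wanted_basename):
    # Yield only the members whose base name matches, so extractall skips everything else.
    for member in archive:
        if os.path.basename(member.name) == wanted_basename:
            yield member


# Placeholder archive and member names.
with tarfile.open("output.tar.gz") as archive:
    archive.extractall("/tmp", members=members_named(archive, "doc-topics.csv"))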
fetch_article.py
Source: fetch_article.py
...
            if chunk:
                f.write(chunk)
                f.flush()
    return local_filename


def delete_downloaded_file(file_name):
    os.unlink(file_name)


def get_favicon(url):
    parts = urlparse(url)
    return "{}://{}/favicon.ico".format(parts.scheme, parts.netloc)


@app.task(name='fetch-article')
def fetch_article_action(input_payload):
    with time_print(logging, "Fetching article"):
        result_payloads = do_fetch_article(input_payload)
    return result_payloads


def extract_metadata(source):
    logging.info("-" * 70)
    logging.info("-" * 70)
    logging.info("-" * 70)
    logging.info(source)
    logging.info("-" * 70)
    logging.info("-" * 70)
    logging.info("-" * 70)
    article = Schemato(source)
    metadata = {}
    d1 = ParselyDistiller(article)
    try:
        metadata = d1.distill()
    except AttributeError:
        logging.exception("ParselyDistiller failed to extract metadata from article")
        try:
            d2 = NewsDistiller(article)
            metadata = d2.distill()
        except AttributeError:
            logging.exception("NewsDistiller failed to extract metadata from article")
    if metadata:
        logging.info("Yay, extracted metadata:")
        logging.info(metadata)
    return metadata


def read_file(file_name):
    with open(file_name) as f:
        result = f.read()
    return result


def do_fetch_article(input_payload):
    logging.info("Fetching article from social post")
    result_payloads = []
    for link in input_payload["urls"]:
        url = link.get("expanded_url")
        display_url = link.get("display_url")
        shortened_url = link.get("url")
        file_name = download_file(url)
        text = textract.process(file_name)
        logging.info("Extracted article text ({} characters)".format(len(text)))
        metadata = {}
        try:
            metadata = extract_metadata(file_name)
        except Exception:
            logging.exception("Failed to extract metadata from {}".format(url))
        delete_downloaded_file(file_name)
        logging.info("Deleted temp file: {}".format(file_name))
        result_payloads.append(
            {
                "contentType": "article-text",
                "key": url,
                "picture": get_favicon(url),
                "summary": {
                    "url": url,
                    "display_url": display_url,
                    "shortened_url": shortened_url,
                    "metadata": metadata
                },
                "raw": {
                    "text": text
...
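The excerpt opens midway through a download_file helper: only the chunked writes and the final return local_filename are visible. A typical streaming download with the requests library looks roughly like the sketch below; the chunk size and the way the local file name is derived are assumptions, since that part of the function is not shown.

import requests


def download_file(url):
    # Derive a local name from the URL (assumption; the original helper may do this differently).
    local_filename = url.split("/")[-1] or "downloaded_file"
    response = requests.get(url, stream=True)
    response.raise_for_status()
    with open(local_filename, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:  # skip keep-alive chunks
                f.write(chunk)
                f.flush()
    return local_filename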
data_load.py
Source: data_load.py
import datetime
import logging
import pandas as pd
import psycopg2
import os

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator

import sql_queries

# FOLDER_ID = '1nj2AXJG10DTjSjL0J17WlDdZeunJQ9r6'  # '1rgBurNjLUqErTcaLx0TB53FNWEsausiJhP9Oa3goJSzlv3_xaAjBD8tPz4ROmSIq96KSHggH'
# FILE_NAME = 'user_purchase.csv'


def delete_file():
    os.remove("user_purchase.csv")


def load_csv(*args, **kwargs):
    table = 'user_purchase'
    cloudsql_hook = PostgresHook(postgres_conn_id="cloudsql")
    conn = cloudsql_hook.get_conn()
    cursor = conn.cursor()
    fpath = 'user_purchase.csv'
    with open('user_purchase.csv', "r") as f:
        next(f)
        cursor.copy_expert("COPY user_purchase FROM STDIN WITH CSV HEADER", f)
    conn.commit()
    # with open(fpath, 'r') as f:
    #     next(f)
    #     cursor.copy_expert("COPY users.user_purchase FROM STDIN WITH CSV HEADER", f)
    #     conn.commit()
    #
    # # CSV loading to table
    # with open(file_path("cities_clean.csv"), "r") as f:
    #     next(f)
    #     curr.copy_from(f, 'cities', sep=",")
    #     get_postgres_conn.commit()
    #     print("loading csv done")
    # cursor.close()


dag = DAG(
    'load_users_table',
    start_date=datetime.datetime.now()
)

download_file = GCSToLocalFilesystemOperator(
    dag=dag,
    task_id="download_file",
    object_name='user_purchase.csv',
    bucket='ir-raw-data',
    filename='user_purchase.csv'
)

create_table = PostgresOperator(
    task_id="create_table",
    dag=dag,
    postgres_conn_id="cloudsql",
    sql=sql_queries.CREATE_USER_PURCHASE_TABLE
)

copy_from_gcs = PythonOperator(
    task_id='copy_csv_to_cloudsql',
    dag=dag,
    python_callable=load_csv,
)

delete_downloaded_file = PythonOperator(
    task_id='delete_csv_file',
    dag=dag,
    python_callable=delete_file,
)
...
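The excerpt ends before any task dependencies are declared, so the four operators are defined but not yet ordered. If the intent is to run them sequentially (download the CSV, create the table, copy the rows into Cloud SQL, then delete the local file), the usual Airflow wiring with the >> operator would be a single line like the sketch below; the ordering itself is an assumption, not something shown in the snippet.

# Assumed sequential ordering of the tasks defined above.
download_file >> create_table >> copy_from_gcs >> delete_downloaded_file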