Best Python code snippet using playwright-python
main.py
Source:main.py
# ... (the first lines of main.py, including its imports, are elided in this excerpt)

# File extensions accepted by the upload endpoint.
ALLOWED_EXTENSIONS = {'csv', 'xlsx'}
GEOCODE_URL = "https://maps.googleapis.com/maps/api/geocode/json"


def serialize_response(data):
    """Return *data* wrapped as ``{"rows": data}`` in pretty-printed JSON."""
    return json.dumps({'rows': data}, indent=2, ensure_ascii=False)


def serialize_error(message):
    """Return *message* wrapped as ``{"error": message}`` in JSON."""
    return json.dumps({'error': message})


def read_functions(extension):
    """Return the pandas reader for *extension* ('csv' or 'xlsx').

    Raises KeyError for any other extension.
    """
    dic = {
        'csv': pd.read_csv,
        'xlsx': pd.read_excel,
    }
    return dic[extension]


def _empty_geocode_result():
    """Return the placeholder fields used when there is no geocode match."""
    return {
        "matched_address": None,
        "lat": None,
        "lon": None,
        "match": False,
    }


def get_google_results(address):
    """
    Get geocode results from the Google Maps Geocoding API.

    Note that in the case of multiple Google geocode results, this function
    returns details of the FIRST result only.

    The API key is read from ``GEOPY['AQUEDUCT_GOOGLE_PLACES_PRIVATE_KEY']``.

    @param address: String address as accurate as possible.
        For example "18 Grafton Street, Dublin, Ireland".
    @return: dict with the keys ``matched_address``, ``lat``, ``lon``,
        ``match``, ``address``, ``number_of_results`` and ``status``.
        When the request does not return HTTP 200, or returns no results,
        ``match`` is False and the location fields are None.
    """
    logging.info("[GOOGLE URL]: init")
    params = {
        "address": address,
        "key": GEOPY.get('AQUEDUCT_GOOGLE_PLACES_PRIVATE_KEY'),
    }

    # Ping Google for the results (up to 2 retries, 5 s timeout).
    with requests.Session() as s:
        s.mount('https://', HTTPAdapter(max_retries=Retry(2, backoff_factor=0.001)))
        r = s.get(url=GEOCODE_URL, params=params, timeout=5)

    # Bind a default first: the summary fields below read `results` on every
    # path, and previously a non-200 response left it undefined (NameError).
    results = {}
    if r.status_code == requests.codes.ok:
        # Results are in JSON format - convert to dict.
        results = r.json()
    else:
        logging.error(f"[GEOCODER: Get google place]: {r.text}")
        logging.error(f"[GEOCODER- GOOGLE URL]: {r.status_code}")

    matches = results.get('results') or []
    if matches:
        answer = matches[0]
        location = answer.get('geometry').get('location')
        output = {
            "matched_address": answer.get('formatted_address'),
            "lat": location.get('lat'),
            "lon": location.get('lng'),
            "match": True,
        }
    else:
        # No results, or an error response: report an empty match.
        output = _empty_geocode_result()

    # Append some other details.
    output['address'] = address
    output['number_of_results'] = len(matches)
    output['status'] = results.get('status')

    return output


def get_latlonrow(x):
    """Geocode one ``(index, row)`` pair as produced by ``DataFrame.iterrows()``.

    Rows whose ``address`` is missing (NA) or blank are not sent to the
    geocoder; they are returned with empty match fields and an explanatory
    ``status``. For geocoded rows, the row's own values take precedence over
    the geocoder's on key clashes.
    """
    index, row = x
    logging.info(f"{index}")
    address = row['address']
    # The address must be present AND non-blank. The previous `or` test let
    # '' and ' ' through to the geocoder.
    if pd.notna(address) and str(address).strip():
        geocoded = get_google_results(address)
        return {**geocoded, **row}
    return {
        **row,
        **_empty_geocode_result(),
        "number_of_results": None,
        "status": "Address value not available",
    }


def geocoding(data):
    """Geocode every row of *data* in a 16-process pool.

    Returns a list with one result dict per row, in row order.
    """
    logging.info(f'[GeoCode Service] geocoding init:')

    with Pool(processes=16) as p:
        # Pool.map blocks until every row is processed and keeps input order.
        rows = p.map(get_latlonrow, data.iterrows())

    logging.info('[GeoCode Service] geocoding end')
    return rows


def geocoder(request):
    """HTTP entry point: geocode the ``address`` column of an uploaded file.

    Returns a ``(body, status, headers)`` tuple. The end of this function is
    cut off in the excerpt, so its statements are reproduced unchanged.
    """
    # For more information about CORS and CORS preflight requests, see
    # https://developer.mozilla.org/en-US/docs/Glossary/Preflight_request
    # Set CORS headers for the main request
    # NOTE(review): the result of get_json() is discarded - confirm this call
    # is needed, given that the payload is read from request.files.
    request.get_json()

    headers = {
        'Access-Control-Allow-Origin': '*',
        'Content-Type': 'application/json'
    }
    # Set CORS headers for the preflight request
    if request.method == 'OPTIONS':
        # Allows GET, POST and OPTIONS requests from any origin with the
        # Content-Type header and caches the preflight response for 3600s
        headers.update({
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
            'Access-Control-Allow-Headers': 'Content-Type',
            'Access-Control-Max-Age': '3600'
        })
        return ('', 204, headers)
    try:
        logging.info(f'[GeoCode Service] init')
        if request.method == 'POST':
            if 'file' not in request.files:
                return (serialize_error(f'No file provided'), 500, headers)

            file = request.files['file']
            extension = file.filename.rsplit('.')[-1].lower()
            if extension in ALLOWED_EXTENSIONS:
                data = read_functions(extension)(request.files.get('file'))
                data.columns = data.columns.str.lower()

                logging.info(f'[GeoCode Service] Data loaded: {data.columns}')
                # NOTE(review): the columns were lower-cased just above, so
                # 'Unnamed: 0' can no longer match here - probably meant
                # 'unnamed: 0'. Confirm before changing.
                data.rename(columns={'Unnamed: 0': 'row'}, inplace=True)

                if 'row' not in data.columns:
                    data.insert(loc=0, column='row', value=range(1, 1 + len(data)))

                data.dropna(axis=1, how='all', inplace=True)

                #data.fillna(value=None, method=None, inplace=True)
                logging.info(f'[GeoCode Service] Data loaded; columns cleaned: {data.columns}')
                if 'location_name' not in data.columns:
                    data.insert(loc=0, column='location_name', value=range(1, 1 + len(data)))
                logging.info(f'[GeoCode Service] Data loaded; columns cleaned: {data.columns}')
                if 'address' not in data.columns:
                    return (serialize_error(f'Address column missing'), 500, headers)
                if len(data) == 0:
                    return (serialize_error(f'The file is empty'), 500, headers)
                if len(data) > 500:
                    dataSize = len(data)
                    return (serialize_error(f'Row number should be less or equal to 500, provided {dataSize} rows'), 500, headers)

                return (serialize_response(geocoding(data)), 200, headers)
            else:
                return (serialize_error(f'{extension} is not an allowed file extension'), 500, headers)
        else:
            return (serialize_error(f'request method ({request.method}) not allowed'), 500, headers)
    except Exception as e:
        ...  # the handler body is cut off in this excerpt
Update.py
Source:Update.py
...11 from boto3.session import Session12 from botocore.exceptions import ClientError13except ImportError as e:14 module = str(e).split("'")15 response = utility.serialize_error(False, module[1], module[0])16 sys.exit(response)17class Update(object):18 def __init__(self):19 # database init20 self.db = cfg.database["file"]21 self.path = cfg.database["path"]22 self.local_db = os.path.join(self.path, self.db)23 self.access_key = cfg.subscription["access_key"]24 self.secret_key = cfg.subscription["secret_key"]25 self.plan_license = cfg.subscription["plan"]26 def update(self):27 """ callable method - initiate the database update in accordance with plan validity """28 # init authorization29 files = self.authorization()30 try:31 for self.file in files:32 if "update" in self.file:33 self.update_file = self.file34 else:35 self.remote_db = self.file36 if not os.path.isfile(self.local_db):37 print("[+] Deploying new database ...")38 self.download(self.remote_db)39 self.unpack_database()40 else:41 print("[+] Checking update status ...")42 self.download(self.update_file)43 self.check_status(self.update_file)44 except Exception as e:45 response = utility.serialize_error(False, str(e), str(e))46 sys.exit(response)47 def check_status(self, file):48 """ check if new db is available"""49 # set the target file50 file = os.path.join(self.path, file)51 try:52 with open(file, 'r') as f:53 checksum_remote = f.read().strip()54 print("\t[-] Checksum verification", checksum_remote)55 if checksum_remote == utility.checksum(self.local_db):56 print("\t[-] Already updated")57 self.clean()58 else:59 print("\t[-] Database update available")60 self.download(self.remote_db)61 self.unpack_database()62 except Exception as e:63 response = utility.serialize_error(False, str(e), str(e))64 sys.exit(response)65 def download(self, file):66 """ download files"""67 print("\t[-] Downloading", file)68 # set the target file69 self.target = os.path.join(self.path, file)70 try:71 self.bucket.download_file(file, 
self.target)72 except Exception as e:73 response = utility.serialize_error(False, str(e), str(e))74 sys.exit(response)75 def unpack_database(self):76 """ extract database """77 print("\t[-] Unpacking", self.target)78 try:79 tar = tarfile.open(self.target, 'r:gz')80 tar.extractall('.')81 except Exception as e:82 response = utility.serialize_error(False, str(e), str(e))83 sys.exit(response)84 shutil.move(self.db, self.local_db)85 self.clean()86 def authorization(self):87 """ check authorization """88 # init files dict89 files = []90 try:91 session = Session(aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_key)92 s3 = session.resource('s3')93 self.bucket = s3.Bucket(self.plan_license)94 for file in self.bucket.objects.all():95 files.append(file.key)96 except Exception as e:97 if "Could not connect" in str(e):98 reason = "Connectivity error"99 else:100 code_error = e.response["Error"]["Code"]101 if code_error == "403":102 reason = "Access suspended to: %s" % self.plan_license103 if code_error == "AccessDenied":104 reason = "Access denied to plan: %s" % self.plan_license105 if code_error == "InvalidAccessKeyId":106 reason = "Error on access key: %s" % self.access_key107 if code_error == "SignatureDoesNotMatch":108 reason = "Error on secret key: %s" % self.secret_key109 if code_error == "AuthorizationHeaderMalformed":110 reason = "Empty access key"111 if code_error == "NoSuchBucket":112 reason = "Licensed plan not specified."113 response = utility.serialize_error(False, reason, str(e))114 sys.exit(response)115 return files116 def clean(self):117 """ clean directory"""118 print("[+] Cleaning tmp downloads ...")119 try:120 for file in os.listdir(self.path):121 if "tgz" in file or "update" in file:122 os.remove(os.path.join(self.path, file))123 else:124 pass125 except Exception as e:...
newsletter.py
Source:newsletter.py
...15 """ returns MD5 hash of the lowercase version of the email address. """16 email_hash = hashlib.md5(email.encode('utf-8').lower()).hexdigest()17 return email_hash18 @staticmethod19 def serialize_error(error:ApiClientError) -> dict:20 """ 21 convert apiclient error to dict 22 type(error.text) -> str so it has to be converted to dict23 to access the key and values24 """25 return json.loads(error.text)26 def __init__(self, list_id:str):27 """ initialize connection to mailchimp """28 self.mailchimp = MailchimpMarketing.Client()29 self.mailchimp.set_config({30 "api_key": config('MAILCHIMP_KEY'),31 "server": config('MAILCHIMP_SERVER')32 })33 self.list_id = list_id34 def __str__(self):35 """ string representation of the newsletter object """36 return f"Newsletter object for {self.list_id}"37 38 def __eq__(self, other_object):39 """ compares two newsletter objects """40 if isinstance(other_object, Newsletter):41 return self.list_id == other_object.list_id42 return False43 def add_member(self, is_exist_subscribe: bool=True, **member_info):44 """ 45 adds member to the audience list with the member_info.46 is_exist_subscribe determines whether user whose email_address47 already exist in the list should be updated to change their status to48 subscribed. This is useful as users who have unsubscribed might want49 to subscribe back. Moreso, when a user unsubsribed their email_address50 isn't removed from the list only their status is changed. 
51 More info on mailchimp docs as regards status and removing members.52 """53 try:54 response = self.mailchimp.lists.add_list_member(self.list_id, member_info)55 return 'added'56 except ApiClientError as error:57 response = self.get_member_info(self.get_email_hash(member_info['email_address']))58 error = Newsletter.serialize_error(error)59 if error['title'] == 'Member Exists' and response['status'] == 'unsubscribed':60 if is_exist_subscribe:61 return self.subscribe(response['contact_id'])62 return error['title']63 def get_all_members(self):64 """ gets all the members in a list """65 try:66 response = self.mailchimp.lists.get_list_members_info(self.list_id)67 return response68 except ApiClientError as error:69 error = Newsletter.serialize_error(error)70 return error71 def get_member_info(self, member_email_hash):72 """ gets info of a specific member """73 try:74 response = self.mailchimp.lists.get_list_member(self.list_id, member_email_hash)75 return response76 except ApiClientError as error:77 error = Newsletter.serialize_error(error)78 return error79 def update_contact(self, member_email_hash, **member_info):80 """ update the member info of the member that owns the email_hash """81 try:82 response = self.mailchimp.lists.update_list_member(self.list_id, member_email_hash, member_info)83 return 'updated'84 except ApiClientError as error:85 error = Newsletter.serialize_error(error)86 return error87 88 def subscribe(self, member_email_hash):89 """ change a specific member status to subscribed """90 return self.update_contact(member_email_hash, status='subscribed')91 def unsubscribe(self, member_email_hash):92 """ change a specific member status to unsubscribed """93 return self.update_contact(member_email_hash, status='unsubscribed')94list_id = 'a8eb3204d1'...
Search.py
Source:Search.py
...15 # set the CVE as uppercase16 self.id = self.id.upper()17 # check if valid CVE18 if not self.id.startswith('CVE-'):19 response = utility.serialize_error(False, self.id, "Not a valid CVE identifier")20 return response21 # load CVE data22 response = json.loads(Information(self.id).get_info())23 exploits = json.loads(Exploitation(self.id).get_exploits())24 if response['description'] != []:25 # new tag added to search whenever an exploits is available26 if exploits['exploitation'] != []:27 response.update(exploits)28 return utility.serialize_data(response)29 def search_cwe(self):30 """ basic search CWE identifiers """31 # set the CWE as uppercase32 self.cwe = self.id.upper()33 # check if valid CWE34 if not self.cwe.startswith('CWE-'):35 response = utility.serialize_error(False, self.id, "Not a valid CWE identifier")36 return response37 # query the database38 self.cur.execute("SELECT title,class,link FROM cwe_db WHERE cwe_id=? ", (self.cwe,))39 cwe_data = self.cur.fetchone()40 if cwe_data:41 # set the CWE data42 title = cwe_data[0]43 cwe_class = cwe_data[1]44 url = cwe_data[2]45 # query the database46 self.cur.execute("SELECT cve_id from map_cwe_cve where cwe_id=? ORDER BY cve_id DESC", (self.cwe,))47 data = self.cur.fetchall()48 if data:49 # init dict50 cve = []51 for cve_id in data:52 cve.append(cve_id[0])53 # set the response54 response = {"id": self.cwe, "parameters": {"title": title, "class": cwe_class, "url": url},55 "vulnerability": cve}56 return utility.serialize_data(response)57 def search_cpe(self):58 """ basic search for CPE identifiers """59 if not self.id.startswith("cpe:/") and not self.id.startswith("cpe:2.3:"):60 response = utility.serialize_error(False, self.id, "Not a valid CPE identifier")61 return response62 # check whether is CPE 2.2 or CPE 2.363 if "cpe:2.3" in self.id:64 col = "cpe23_id"65 if "cpe:/" in self.id:66 col = "cpe_id"67 # query the database68 self.cur.execute("SELECT cve_id FROM map_cpe_cve WHERE {tn} = ? 
ORDER BY cve_id DESC".format(tn=col),69 self.query)70 data = self.cur.fetchall()71 if data:72 # init dict73 cve = []74 for cve_id in data:...
LambdaTest’s Playwright tutorial gives you a broad overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. It offers A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!