Best Python code snippet using playwright-python
download_apks.py
Source:download_apks.py
from bs4 import BeautifulSoup as soup
from selenium import webdriver
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
import webbrowser
from time import sleep
from os import system, path
from requests import get
from warnings import filterwarnings
from traceback import format_exc

filterwarnings("ignore", category=DeprecationWarning)

'''
IMPORTANT:
- THE DOWNLOAD LINK WORKS WITH REQUESTS
- if a file has the format apkm, it can be renamed to .apks
- Also maybe call an os.rename on the downloaded file to make its version more clear (or not to preserve original formatting)
- Do the below url instead where there are 4 pages but there is every bit of vanced software:
    - https://www.apkmirror.com/uploads/?devcategory=team-vanced
- The sleeps are because cloudflare is rate limiting and preventing the program from getting the request data
- Download rate limiting began around getting download #48
- I become unable to get any pages after download #59
'''

'''
TODO:
- Instead of using the browser to automatically download the apks, click the "here" link in the downloads page to get the
'''

RATE_LIMIT_PAUSE = .5


def pause():
    """Block until the user presses a key (runs the Windows ``pause`` command)."""
    system("pause")


def _read_nonblank_lines(file_name):
    """Return the lines of *file_name*, dropping blank ones."""
    with open(file_name) as handle:
        return [line for line in handle.read().split("\n") if line.strip()]


def _count_lines(file_name):
    """Return the number of lines in *file_name*, or 0 if it cannot be opened."""
    try:
        with open(file_name) as handle:
            return sum(1 for _ in handle)
    except OSError:
        return 0


def _first_by_class(driver, class_name):
    """Return the first element with *class_name*, or None when there is none."""
    matches = driver.find_elements(By.CLASS_NAME, class_name)
    return matches[0] if matches else None


def _collect_release_links(driver, url):
    """Scrape the four upload listing pages and return the release page urls."""
    links = []
    for page in range(4):
        driver.get(url if page == 0 else url.replace("/uploads", f"/uploads/page/{page+1}"))
        sleep(RATE_LIMIT_PAUSE)
        listing = soup(driver.page_source, "html.parser")

        for box in listing.find_all("div", {"class": "iconsBox"}):
            anchors = box.find_all("a")
            # The release link is the second anchor of the box; boxes
            # without one are skipped.
            href = anchors[1].get("href") if len(anchors) > 1 else None
            if href:
                links.append("https://apkmirror.com" + href)
    return links


def _navigate_to_download_page(driver, dl_name):
    """Move *driver* from a release page to its download page.

    Returns the href that was followed, or None when no download page
    could be reached.
    """
    # Some release pages expose the download button directly.
    button = _first_by_class(driver, "downloadButton")
    if button is not None and button.tag_name == "a":
        href = button.get_attribute("href")
        if href:
            # Navigate by href: driver.get() needs a url, not an element.
            driver.get(href)
            sleep(RATE_LIMIT_PAUSE)
            print("Found immediate dl button")
            return href
    print("No immediate dl button found")

    print("Navigating to dl page")
    candidates = driver.find_elements(By.CLASS_NAME, "accent_color")
    for candidate in candidates:
        candidate_href = candidate.get_attribute("href")
        if candidate_href is not None and dl_name in candidate_href and "apk-download" in candidate_href:
            candidate.click()
            sleep(RATE_LIMIT_PAUSE)
            break
    else:
        print("Could not navigate to download page!", candidates)
        return None

    print("Clicking dl button")
    button = _first_by_class(driver, "downloadButton")
    href = button.get_attribute("href") if button is not None else None
    if not href:
        # Skip this release instead of aborting the whole run.
        print("Could not find the download button!")
        return None
    driver.get(href)
    sleep(RATE_LIMIT_PAUSE)
    return href


def _find_static_download_url(driver, dl_url):
    """Return the download url that works with requests for the current page.

    Anchors are scanned from last to first for a ``download.php`` link.
    *dl_url* is the fallback used when no anchor carries an href at all.
    """
    last_href = ""
    for anchor in reversed(driver.find_elements(By.TAG_NAME, "a")):
        last_href = anchor.get_attribute("href")
        if "download.php" in str(last_href):
            sleep(RATE_LIMIT_PAUSE)
            use_href = "wp-content" in last_href.lower()
            # Parenthesised so the prefix is printed in both cases.
            print("Static download url found! Using " + ("cur_href" if use_href else "driver.current_url"))
            return last_href if use_href else driver.current_url
    return driver.current_url if last_href else dl_url


def _resolve_download_urls(driver, release_links, start):
    """Visit every release link from index *start* on and collect its download url.

    Returns a list of ``"<number>) <name> | <url>"`` entries.
    """
    resolved = []
    total = len(release_links)
    for number, link in enumerate(release_links[start:], start + 1):
        driver.get(link + "#download")
        dl_name = link.split("/")[-2]
        print(f"{number}/{total}) Downloading {dl_name}")

        # Building the url by string manipulation returned 404 on some
        # downloads, so selenium navigates to the download button instead.
        dl_url = _navigate_to_download_page(driver, dl_name)
        if dl_url is None:
            print("-" * 20)
            continue

        # Getting the apk download url that works with requests
        print("Getting static download url")
        dl_url = _find_static_download_url(driver, dl_url)
        resolved.append(f"{number}) {dl_name} | {dl_url}")
        print(f"Successfully finished on url '{dl_name}'\nDL URL: '{dl_url}'\nCur URL: '{driver.current_url}'")
        sleep(RATE_LIMIT_PAUSE)
        print("-" * 20)
    return resolved


def _load_download_statuses():
    """Read ``download_statuses.txt`` into a ``{number: status}`` dict."""
    statuses = {}
    if path.exists("download_statuses.txt"):
        for entry in _read_nonblank_lines("download_statuses.txt"):
            parts = entry.split(": ")
            statuses[parts[0]] = parts[-1]
    return statuses


def _save_download_statuses(download_statuses):
    """Write the ``{number: status}`` dict to ``download_statuses.txt``."""
    with open("download_statuses.txt", "w+") as status_file:
        for number, status in download_statuses.items():
            status_file.write(f"{number}: {status}\n")


def _download_apks(download_driver, download_statuses):
    """Open every url of ``download_urls.txt`` and record the outcome.

    *download_statuses* is updated in place with "Success" or "Cloudflare".
    """
    for i, cur_line in enumerate(_read_nonblank_lines("download_urls.txt")):
        if i == 50:
            print("!!!WARNING, CLOUDFLARE WILL PROBABLY BEGIN RATE LIMITING NOW!!!")
        # Getting the string values needed
        cur_url = cur_line.split("| ")[-1]
        cur_num = cur_line.split(")")[0]
        cur_name = cur_line.split(" ")[1]
        print(f"{cur_num}) DOWNLOADING '{cur_name}'...")

        # Rerunning the download if its previous attempt was blocked by cloudflare
        previous = download_statuses.get(cur_num)
        if previous is None:
            print("No entry for this download's status exists, continuing download...")
        elif previous == "Cloudflare":
            print("Download was previously blocked by Cloudflare, retrying...")
        else:
            # The apks folder cannot be checked instead: the downloaded
            # file names differ from the url and the 'name'.
            print("Download was previously successful, continuing to next download...")
            print("-" * 20)
            continue

        # Getting the download page of the apk
        download_driver.get(cur_url)

        # Detecting if cloudflare has rate limited the program
        # NOTE(review): these markers are broad and may match ordinary pages too.
        page = download_driver.page_source.lower()
        if any(marker in page for marker in ("cloudflare", "rate", "limit", "request")):
            print("ERROR: CLOUDFLARE IS RATE LIMITING!")
            download_statuses[cur_num] = "Cloudflare"
        else:
            print("DOWNLOAD SUCCESSFUL!")
            download_statuses[cur_num] = "Success"
        print("-" * 20)


def main(url):
    """Collect the apk download urls listed at *url*, then download them.

    Progress is cached in ``apks_list.txt``, ``download_urls.txt`` and
    ``download_statuses.txt`` so that a later run can resume.
    """
    # Setting up the webdriver (downloads are blocked while collecting urls)
    options = webdriver.ChromeOptions()
    options.add_experimental_option("prefs", {"download.default_directory": "apks", "download_restrictions": 3})
    options.add_extension("ublock-origin.crx")
    driver = webdriver.Chrome(options=options)

    try:
        # Getting a list of all release urls
        if not path.exists("apks_list.txt"):
            apks_list_final = _collect_release_links(driver, url)
        else:
            apks_list_final = _read_nonblank_lines("apks_list.txt")

        system("cls")

        with open("apks_list.txt", "w+") as list_file:
            list_file.write("\n".join(apks_list_final))

        # Resume after the entries already listed in the incomplete file
        apks_list_begin = _count_lines("download_urls_incomplete.txt")

        if not path.exists("download_urls.txt"):
            dl_urls_list = _resolve_download_urls(driver, apks_list_final, apks_list_begin)
            print("\n".join(dl_urls_list))
            with open("download_urls.txt", "w+") as urls_file:
                urls_file.write("\n".join(dl_urls_list))
    finally:
        # Always release the browser, even when scraping fails.
        driver.quit()

    ''' Downloading all the apks '''
    # Setting up the webdriver (this one is allowed to download)
    download_options = webdriver.ChromeOptions()
    download_options.add_experimental_option("prefs", {"download.default_directory": "apks"})
    download_driver = webdriver.Chrome(options=download_options)
    # Dictionary containing the download status of each download
    download_statuses = _load_download_statuses()
    print("Downloading apks...")
    try:
        _download_apks(download_driver, download_statuses)
        print("Apks downloaded!")
        pause()
        #download_driver.quit()
    finally:
        # Saving the dictionary to a file, also when the run is interrupted
        _save_download_statuses(download_statuses)


if __name__ == "__main__":
    try:
        main("https://www.apkmirror.com/uploads/?devcategory=team-vanced")
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(e, "\n", format_exc())
...
driver_handler.py
Source:driver_handler.py
...53 if driver_path and path_exists(driver_path):54 add_to_path(driver_path)55 else:56 from .downloader import download_driver57 download_driver(CONSTANTS.CHROME_DRIVER, overwrite_existing=False)58 driver = webdriver.Chrome(options=options)59 return driver60 @staticmethod61 def _create_chrome_options(api_obj):62 options = ChromeOptions()63 args = set()64 if api_obj.headless:65 args.add("--headless")66 if api_obj.incognito:67 args.add("--incognito")68 if api_obj.browser_options_args:69 args.update(api_obj.browser_options_args)70 for arg in args:71 if arg:72 options.add_argument(arg)73 if api_obj.http_proxy:74 options.add_argument('--proxy-server=%s' %api_obj.http_proxy)75 if api_obj.browser_options_dict:76 for k, v in api_obj.browser_options_dict.items():77 options.set_capability(k, v)78 return options79class IEHandler(BaseDriverHandler):80 @staticmethod81 def create_driver(driver_path, api_obj):82 options = IEHandler._create_IE_options(api_obj)83 if hasattr(api_obj, "remote_url") and is_valid_url(api_obj.remote_url):84 caps = options.to_capabilities()85 driver = IEHandler.create_remote_driver(api_obj.remote_url, caps)86 return driver87 if driver_path and path_exists(driver_path):88 add_to_path(driver_path)89 else:90 from .downloader import download_driver91 download_driver(CONSTANTS.IE_DRIVER, overwrite_existing=False)92 IEHandler._handle_IE_registry_entry()93 driver = webdriver.Ie(options=options)94 return driver95 #takes care of adding necessary registry entries needed in windows (needs admin privileges)96 @staticmethod97 def _handle_IE_registry_entry():98 if "window" not in os_name().lower():99 return100 try:101 path32 = r"SOFTWARE\Microsoft\Internet Explorer\Main\FeatureControl\FEATURE_BFCACHE" 102 path64 = r"SOFTWARE\Wow6432Node\Microsoft\Internet Explorer\Main\FeatureControl\FEATURE_BFCACHE"103 path = path32 if os_bits() == 32 else path64104 set_winreg_fromlocalmachine(path, "iexplore.exe", 0, overwrite=True)105 except Exception as e:106 
get_logger().warning(str(e))107 @staticmethod108 def _create_IE_options(api_obj):109 options = IEOptions()110 args = set()111 #not recommended, ensure correct security settings in IE112 args.add(IEOptions.IGNORE_PROTECTED_MODE_SETTINGS)113 args.add(IEOptions.ENSURE_CLEAN_SESSION)114 args.add(IEOptions.IGNORE_ZOOM_LEVEL)115 args.add(IEOptions.REQUIRE_WINDOW_FOCUS)116 args.add(IEOptions.PERSISTENT_HOVER)117 if api_obj.browser_options_args:118 args.update(api_obj.browser_options_args)119 for arg in args:120 if arg:121 options.add_argument(arg)122 if api_obj.http_proxy:123 proxy = { 'proxyType': "manual", 'httpProxy': str(api_obj.http_proxy)}124 options.set_capability("proxy", proxy)125 options.set_capability(IEOptions.NATIVE_EVENTS, False)126 if api_obj.browser_options_dict:127 for k, v in api_obj.browser_options_dict.items():128 options.set_capability(k, v)129 return options130class FirefoxHandler(BaseDriverHandler):131 @staticmethod132 def create_driver(driver_path, api_obj):133 options = FirefoxHandler._create_firefox_options(api_obj)134 if hasattr(api_obj, "remote_url") and is_valid_url(api_obj.remote_url):135 caps = DesiredCapabilities.FIREFOX.copy()136 if options:137 caps.update(options.to_capabilities())138 driver = FirefoxHandler.create_remote_driver(api_obj.remote_url, caps)139 return driver140 if driver_path and path_exists(driver_path):141 add_to_path(driver_path)142 else:143 from .downloader import download_driver144 download_driver(CONSTANTS.FIREFOX_DRIVER, overwrite_existing=False)145 log_p = os.path.join(log_path(), "geckodriver.log")146 driver = webdriver.Firefox(options=options, service_log_path=log_p)147 return driver148 @staticmethod149 def _create_firefox_options(api_obj):150 options = FirefoxOptions()151 args = set()152 if api_obj.browser_options_args:153 args.update(api_obj.browser_options_args)154 for arg in args:155 if arg:156 options.add_argument(arg)157 options.headless = api_obj.headless158 if api_obj.firefox_binary_path:...
download_client.py
Source:download_client.py
1#!/usr/bin/env python2import sys3import os4import datetime5import argparse6import blessings7import simplejson8import biokbase.Transform.Client9import biokbase.Transform.script_utils10import biokbase.Transform.drivers11if __name__ == "__main__":12 logger = biokbase.Transform.script_utils.stdoutlogger(__file__)13 parser = argparse.ArgumentParser(description='KBase Download demo and client')14 parser.add_argument('--demo', action="store_true")15 parser.add_argument('--ujs_service_url', nargs='?', help='UserandJobState service for monitoring progress', const="", default="https://kbase.us/services/userandjobstate/")16 parser.add_argument('--workspace_service_url', nargs='?', help='Workspace service for KBase objects', const="", default="https://kbase.us/services/ws/")17 parser.add_argument('--awe_service_url', nargs='?', help='AWE service for additional job monitoring', const="", default="http://140.221.67.3:7080")18 parser.add_argument('--transform_service_url', nargs='?', help='Transform service that handles the data conversion to KBase', const="", default="http://140.221.67.3:7778/")19 parser.add_argument('--external_type', nargs='?', help='the external type of the data', const="", default="")20 parser.add_argument('--kbase_type', nargs='?', help='the kbase object type to create', const="", default="")21 parser.add_argument('--workspace_name', nargs='?', help='name of the workspace where your objects should be created', const="", default="upload_testing")22 parser.add_argument('--object_name', nargs='?', help='name of the workspace object to create', const="", default="")23 parser.add_argument('--download_path', nargs='?', help='path to place downloaded files for validation', const=".", default=".")24 parser.add_argument('--config_file', nargs='?', help='path to config file with parameters', const="", default="")25 parser.add_argument('--create_log', help='create pass/fail log file', action='store_true')26 args = parser.parse_args()27 token = 
biokbase.Transform.script_utils.get_token()28 inputs = list()29 services = dict()30 if not args.demo:31 if args.config_file:32 f = open(args.config_file, 'r')33 config = simplejson.loads(f.read())34 f.close()35 36 services = config["services"]37 inputs = config["download"]38 else:39 inputs = {"user": 40 {"external_type": args.external_type,41 "kbase_type": args.kbase_type,42 "object_name": args.object_name,43 "workspace_name": args.workspace_name,44 "downloadPath": args.download_path45 }46 }47 services = {"shock_service_url": args.shock_service_url,48 "ujs_service_url": args.ujs_service_url,49 "workspace_service_url": args.workspace_service_url,50 "awe_service_url": args.awe_service_url,51 "transform_service_url": args.transform_service_url,52 "handle_service_url": args.handle_service_url}53 workspace = args.workspace_name 54 else:55 if "kbasetest" not in token and len(args.workspace_name.strip()) == 0:56 print "If you are running the demo as a different user than kbasetest, you need to provide the name of your workspace with --workspace."57 sys.exit(0)58 else:59 if args.workspace_name is not None:60 workspace = args.workspace_name61 else :62 workspace = "upload_testing"63 64 f = open("conf/download/download_demo.cfg")65 config = simplejson.loads(f.read())66 f.close()67 services = config["services"]68 inputs = config["download"]69 70 stamp = datetime.datetime.now().isoformat()71 os.mkdir(stamp)72 download_driver = biokbase.Transform.drivers.TransformClientTerminalDriver(service_urls=services)73 74 term = blessings.Terminal()75 for x in inputs:76 external_type = inputs[x]["external_type"]77 kbase_type = inputs[x]["kbase_type"]78 object_name = inputs[x]["object_name"]79 80 if inputs[x].has_key("workspace_name") and inputs[x]["workspace_name"]:81 ws_name = inputs[x]["workspace_name"]82 else:83 ws_name = workspace84 optional_arguments = None85 if inputs[x].has_key("optional_arguments"):86 optional_arguments = inputs[x]["optional_arguments"]87 print "\n\n"88 print "\t", 
term.bold_underline_bright_magenta("{0}").format(x.upper())89 print "\t", term.bold("#"*80)90 print "\t", term.bold_white_on_black("Converting {0} => {1}".format(kbase_type,external_type))91 print "\t", term.bold("#"*80)92 conversionDownloadPath = os.path.join(stamp, kbase_type + "_to_" + external_type)93 try:94 os.mkdir(conversionDownloadPath)95 except:96 pass97 98 downloadPath = os.path.join(conversionDownloadPath)99 try:100 print term.bold("\tStep 1: Make KBase download request")101 input_object = dict()102 input_object["external_type"] = external_type103 input_object["kbase_type"] = kbase_type104 input_object["workspace_name"] = ws_name105 input_object["object_name"] = object_name106 if optional_arguments is not None:107 input_object["optional_arguments"] = optional_arguments108 else:109 input_object["optional_arguments"] = {"transform": {}}110 transform_client = biokbase.Transform.Client.Transform(url=services["transform_service_url"], token=token) 111 awe_job_id, ujs_job_id = transform_client.download(input_object)112 113 print term.blue("\tTransform service download requested:")114 print "\t\tConverting from {0} => {1}\n\t\tUsing workspace {2} with object name {3}".format(kbase_type,external_type,workspace,object_name)115 print term.blue("\tTransform service responded with job ids:")116 print "\t\tAWE job id {0}\n\t\tUJS job id {1}".format(awe_job_id, ujs_job_id)117 118 job_exit_status = download_driver.monitor_job(awe_job_id, ujs_job_id)119 if not job_exit_status[0]: 120 download_driver.show_job_debug(awe_job_id, ujs_job_id)121 raise Exception("KBase Download exited with an error")122 123 results = download_driver.get_job_results(ujs_job_id)124 125 print "\t", term.bold("Step 2: Grab data from SHOCK\n")126 download_driver.download_from_shock(results["shockurl"], results["results"][0]["id"], downloadPath)127 128 print term.green("\tShock download of {0} successful.\n\n".format(downloadPath))129 except Exception, e:...
chromedriver_update.py
Source:chromedriver_update.py
...48 if zf == 'chromedriver.exe':49 zfile.extract(zf, os.path.dirname(self._chrome_driver_path))50 zfile.close()51 os.remove('download_driver.zip')52 def _download_driver(self, chrome_ver):53 self._shut_down_current_driver()54 target_chromedriver_ver = requests.get(self._chromedriver_url + 'LATEST_RELEASE_' + str(chrome_ver.split('.')[0])).text55 chromedriver_path = self._chromedriver_url + target_chromedriver_ver + '/'56 if self._platform == 'win32':57 self._download_extract_driver(chromedriver_path + 'chromedriver_win32.zip')58 # python2 returns 'linux2'59 elif self._platform == 'linux':60 self._download_extract_driver(chromedriver_path + 'chromedriver_linux64.zip')61 else:62 self._download_extract_driver(chromedriver_path + 'chromedriver_mac64.zip')63 64 def compare_download(self):65 chrome_ver = self._get_current_chrome_ver66 chromedriver_ver = self._get_current_chromedriver_ver67 if chrome_ver.split('.')[0] != chromedriver_ver.split('.')[0]:68 self._download_driver(chrome_ver) 69 return f'Chrome: {chrome_ver}\nChromeDriver: {self._get_current_chromedriver_ver}'70if __name__ == "__main__":...
LambdaTest’s Playwright tutorial gives you a broad overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial offers A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!