Best Python code snippet using toolium_python
MTADownloader.py
Source:MTADownloader.py
from bs4 import BeautifulSoup
import io
import os
import random
import sys
import time
import zipfile

import requests
import urllib3


class MTADownloader():
    """Scrape malware-traffic-analysis.net and download its pcap archives.

    The site is walked in three steps: main page -> annual archive pages ->
    per-day pages, each of which may link a password-protected zip holding
    one or more pcaps.  Archives are stored in ``folder_name`` (relative to
    the current working directory) and can optionally be extracted and
    removed afterwards.
    """

    # Root of the site that is scraped.
    url = 'https://www.malware-traffic-analysis.net/'
    # Directory (relative to the cwd) that receives zips and extracted files.
    folder_name = 'Malware_Pcaps'
    # History file listing every zip URL that has been downloaded.
    downloaded_zip = "pcaps_zip_mta_downloaded.txt"
    # Passwords tried, in order, when extracting an archive.
    mta_pwd = ["infected"]

    def __init__(self, number_of_pcaps_download=10, randomly=True, save_links_to_pcap=True, decompress=True, file_name="pcaps_mta_link.txt", save_downloaded=True, delete_zip=True):
        """Configure the downloader and prepare the output folder.

        Args:
            number_of_pcaps_download: How many archives to collect; 0 means
                "everything available".
            randomly: Pick the per-day pages in random order when True,
                in site order when False.
            save_links_to_pcap: Write the collected zip URLs to ``file_name``.
            decompress: Extract every archive after downloading it.
            file_name: Path of the file that receives the collected URLs.
            save_downloaded: Keep a history of downloaded URLs in
                ``downloaded_zip``.
            delete_zip: Remove an archive once extraction has been attempted.

        Raises:
            AssertionError: If ``randomly`` is not a bool or
                ``number_of_pcaps_download`` is negative.
        """
        self.random = randomly
        self.pcaps_to_download = number_of_pcaps_download
        self.save_links = save_links_to_pcap
        self.decompress = decompress
        self.links_to_pcaps = file_name
        self.save_downloaded = save_downloaded
        self.pcap_name = []
        self.delete_zip = delete_zip
        self.save_downloaded_file = None
        # Kept as asserts so callers that catch AssertionError keep working.
        assert self.random is False or self.random is True, "Valore randomly non valido. Inserire un Booleano."
        assert self.pcaps_to_download >= 0, "Valore per number_of_pcaps_download non valido. Inserire un valore maggiore o uguale a zero."

        os.makedirs(self.folder_name, exist_ok=True)
        if self.save_downloaded:
            if os.path.isfile(self.downloaded_zip):
                print("\nHistory file with pcaps downloaded already exists.\n")
            # Append mode creates the file when missing and never truncates it.
            self.save_downloaded_file = open(self.downloaded_zip, "a")

    @staticmethod
    def _get_html(url):
        """Return the parsed page at ``url``, or None if the request failed."""
        try:
            result = requests.get(url)
        except requests.exceptions.RequestException as e:
            print(e)
            return None
        # An explicit parser keeps results independent of what is installed.
        return BeautifulSoup(result.content, "html.parser")

    def get_link_to_pcaps_file_name(self):
        """Return the path of the file that stores the collected zip URLs."""
        return self.links_to_pcaps

    def get_folder_name(self):
        """Return the name of the download folder."""
        return self.folder_name

    def get_zips_downloaded_file(self):
        """Return the path of the download-history file."""
        return self.downloaded_zip

    def get_links_in_main_page(self):
        """Return the ``href`` of every anchor inside an ``<li>`` of the main page.

        Returns an empty list when the main page cannot be fetched.
        """
        page = self._get_html(self.url)
        if page is None:
            return []
        return [
            a["href"]
            for li in page.find_all("li")
            for a in li.find_all("a", href=True)
        ]

    def get_links_to_annual_archives(self):
        """Return absolute URLs of the main-page links that start with a year."""
        links_annual_archives = []
        for link in self.get_links_in_main_page():
            try:
                int(link[:4])
            except ValueError:
                # Not an annual archive link; expected for most anchors.
                continue
            links_annual_archives.append(self.url + link)
        return links_annual_archives

    def get_links_to_pcap(self):
        """Return the URLs of all per-day pages listed in the annual archives."""
        links_to_pcap = []
        for link in self.get_links_to_annual_archives():
            annual_page = self._get_html(link)
            if annual_page is None:
                continue
            head, _, _ = link.partition("/index")
            link_head = head + "/"
            for ul in annual_page.find_all("ul"):
                for li in ul.find_all("li"):
                    for a in li.find_all("a", {"class": "list_header"}, href=True):
                        links_to_pcap.append(link_head + a["href"])
        return links_to_pcap

    def _find_pcap_link(self, page_url):
        """Return the first pcap archive URL linked from ``page_url``, or None."""
        html = self._get_html(page_url)
        if html is None:
            return None
        head, _, _ = page_url.partition("/index")
        for a in html.find_all("a", {"class": "menu_link"}, href=True):
            href = a["href"]
            if "pcap" in href:
                # str.replace returns a new string; the result must be used.
                return (head + "/" + href).replace(" ", "")
        return None

    def get_links_for_download_pcaps(self):
        """Collect the zip URLs to download according to the configuration.

        Every per-day page is visited at most once, so the search always
        terminates even when fewer pages than requested contain a pcap; in
        that case the returned list is simply shorter.  Duplicate URLs are
        dropped.  When ``save_links`` is set the result is also written to
        ``links_to_pcaps`` unless that file already exists.

        Returns:
            list[str]: The collected archive URLs.
        """
        wanted = self.pcaps_to_download
        if wanted == 0:
            print("\n" + "Attenzione! Scaricherai tutto. L' operazione potrebbe richiedere molto tempo" + "\n")

        links_to_pcap = self.get_links_to_pcap()
        number_of_pcap_in_site = len(links_to_pcap)

        if wanted < 0:
            print("Errore generico. Programma terminato" + "\n")
            sys.exit()

        download_all = wanted == 0 or wanted > number_of_pcap_in_site
        if not download_all and self.random:
            # A shuffled copy visits each page once: no out-of-range index
            # and no endless retry of pages that hold no pcap.
            candidates = random.sample(links_to_pcap, number_of_pcap_in_site)
        else:
            candidates = links_to_pcap

        array_of_link_for_download = []
        for page_url in candidates:
            if not download_all and len(array_of_link_for_download) >= wanted:
                break
            link = self._find_pcap_link(page_url)
            if link is None:
                print("La pagina " + page_url + " non contiene .pcap. Saltata." + "\n")
                continue
            if link not in array_of_link_for_download:
                array_of_link_for_download.append(link)

        if self.save_links is True:
            if self.check_if_file_with_links_exists() is True:
                print("Il file gia' esiste. Cancellarlo e rieseguire.")
            else:
                with open(self.links_to_pcaps, "a") as writer:
                    writer.writelines(link + "\n" for link in array_of_link_for_download)
        return array_of_link_for_download

    def _fetch_with_retry(self, url):
        """GET ``url``, retrying up to five times; return the response or None."""
        attempts = 5
        for attempt in range(1, attempts + 1):
            try:
                return requests.get(url)
            except requests.exceptions.RequestException as e:
                print(e)
                if attempt == attempts:
                    break
                print("\nConnection refused by the server...\n")
                print("\nLet me sleep for 5 seconds\n")
                time.sleep(5)
                print("\nRetrying...\n")
        return None

    def _download_zip(self, link, zip_path):
        """Download ``link`` into ``zip_path`` and record it in the history."""
        response = self._fetch_with_retry(link)
        if response is None:
            print("\n" + link + ": URL non valido" + "\n")
            return
        if not response.ok:
            # Do not store an HTTP error page as if it were the archive.
            print("\nErrore durante il download" + "\n")
            return
        print("\nSto scaricando " + os.path.basename(zip_path))
        with open(zip_path, "wb") as local_file:
            local_file.write(response.content)
        print("\nDownload Completato" + "\n")
        if self.check_if_link_is_in_downloaded_file(link):
            print("\nLink gia' presente alla lista degli scaricati. Non aggiunto.\n")
        elif self.save_downloaded is True and self.save_downloaded_file is not None:
            self.save_downloaded_file.write(link + "\n")
            # Flush so the history is readable right away and survives a crash.
            self.save_downloaded_file.flush()
            print("\nLink aggiunto alla lista degli scaricati.\n")

    def _extract_zip(self, zip_path):
        """Extract ``zip_path`` into the download folder, trying each password."""
        if not os.path.isfile(zip_path):
            # The download failed, so there is nothing to extract or delete.
            return
        zip_name = os.path.basename(zip_path)
        for pwd in self.mta_pwd:
            estratto = False
            try:
                with zipfile.ZipFile(zip_path) as archive:
                    for name in archive.namelist():
                        if os.path.isfile(os.path.join(self.folder_name, name)):
                            # Placeholder keeps one entry per archive member.
                            self.pcap_name.append(None)
                            print("\nFile gia' esistente. Non sara' estratto.\n")
                        else:
                            print("\nEstraggo " + name + " da " + zip_name + "\n")
                            archive.extract(name, path=self.folder_name, pwd=pwd.encode("utf-8"))
                            self.pcap_name.append(name)
                            print("\nEstrazione " + name + " Completa\n")
                        estratto = True
            except Exception as e:
                # Best effort across passwords: a wrong password or a corrupt
                # archive can surface as RuntimeError, BadZipFile or zlib.error.
                print(e)
                estratto = False
            if estratto:
                break
        if self.delete_zip:
            os.remove(zip_path)

    def download_pcap(self, array_of_links_to_download):
        """Download (and optionally extract) every archive in the given list.

        Paths are built explicitly instead of changing the working directory,
        so a failure can never leave the process in the wrong directory.

        Args:
            array_of_links_to_download: Iterable of zip URLs; trailing
                newlines are ignored.

        Returns:
            list: Names of the extracted files, with ``None`` for members
            that already existed on disk.
        """
        self.pcap_name.clear()
        os.makedirs(self.folder_name, exist_ok=True)
        for link in array_of_links_to_download:
            link = link.rstrip("\r\n")
            name_of_pcap_zip = os.path.basename(link)
            zip_path = os.path.join(self.folder_name, name_of_pcap_zip)
            if os.path.isfile(zip_path):
                print("\n" + name_of_pcap_zip + " gia presente! Non sara' scaricato" + "\n")
            else:
                self._download_zip(link, zip_path)
            if self.decompress is True:
                self._extract_zip(zip_path)
        return self.get_list_of_pcap_downloaded_and_extracted()

    def get_list_of_pcap_downloaded_and_extracted(self):
        """Return the list filled by the last ``download_pcap`` call."""
        return self.pcap_name

    def check_if_link_is_in_downloaded_file(self, link):
        """Return True if ``link`` is already listed in the history file."""
        link = link.rstrip("\r\n")
        if not os.path.isfile(self.downloaded_zip):
            return False
        with open(self.downloaded_zip, "r") as reader:
            return any(line.rstrip("\r\n") == link for line in reader)

    def get_pcap_names_by_url(self, array_of_url):
        """Return the last path component of every URL, without line endings."""
        return [url.split("/")[-1].rstrip("\r\n") for url in array_of_url]

    def check_if_file_with_links_exists(self):
        # NOTE(review): the body of this method is elided ("...") in the
        # excerpt this class was taken from; it is left exactly as it appears.
        # Restore the original implementation before relying on save_links.
        ...
wikimedia-commons-bulk-downloader-by-category.py
Source:wikimedia-commons-bulk-downloader-by-category.py
...91 print("An exception occurred " + file_name)92 print(e)93 94# Download file from commons download link 95async def save_downloaded_file(session, dl, index, total_links_count):96 try:97 async with session.get(dl, allow_redirects=True) as audio_request:98 if audio_request.status == 200:99 d_filename = urllib.parse.unquote(dl).split("/")[-1]100 # print(str(index+1) + "/" + str(total_links_count) + " => " + d_filename)101 dest_folder = category.replace(" ", "_")102 file_path = os.path.join(dest_folder, d_filename)103 f = await aiofiles.open(file_path, mode='wb')104 await f.write(await audio_request.read())105 await f.close()106 except BaseException as e:107 print_line()108 print("An exception occurred while downloading " + dl)109 print(e)110# Task to Start getting download links then download files from download links111download_links = []112async def fetch_download_links_and_download_file():113 print_line()114 async with aiohttp.ClientSession() as session:115 tasks = []116 print("Fetching download links....")117 for index, f in enumerate(final_files_list):118 task = asyncio.ensure_future(get_file_download_link(session, f))119 tasks.append(task)120 121 temp_download_links = await asyncio.gather(*tasks)122 download_links = list(filter(None, temp_download_links)) # Filter valid links123 124 print("Total time for Scraping download links: %.2f secs" % (time.time() - s_time))125 total_links_count = len(download_links)126 print("Total download links: " + str(total_links_count))127 128 129 # Start Download Task130 # Download file from commons download link 131 132 # create folder if it does not exist133 dest_folder = category.replace(" ", "_")134 if not os.path.exists(dest_folder):135 os.makedirs(dest_folder)136 137 print_line()138 print("Please wait %s files are Downloading...." 
% (str(total_links_count))) 139 download_tasks = []140 for index, dl in enumerate(download_links):141 download_task = asyncio.ensure_future(save_downloaded_file(session, dl, index, total_links_count))142 download_tasks.append(download_task)143 download_files = await asyncio.gather(*download_tasks)144 print("Total time for Download files : %.2f secs" % (time.time() - s_time))145 print("Total downloaded files: " + str(len(download_files)))146 print_line()147 148 print("All files are downloaded under: " + dest_folder)149 print_line()...
test.py
Source:test.py
...15 is_download_succesfull = download_zip_file(self.url)16 self.assertTrue(is_download_succesfull)17 def test_downloaded_exists(self):18 #check file exists19 download_file_exists = save_downloaded_file(self.dest_path, self.binary_str)20 self.assertTrue(os.path.exists(self.dest_path))21 22 def test_downloaded_content_equals_expected(self):23 save_downloaded_file(self.dest_path, self.binary_str)24 with open(self.dest_path, 'rb') as file:25 content = file.read()...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!