Best Python code snippet using avocado_python
qbit_downloader.py
Source:qbit_downloader.py
...32 self.client = get_client()33 try:34 if ospath.exists(link):35 is_file = True36 self.ext_hash = _get_hash_file(link)37 else:38 is_file = False39 self.ext_hash = _get_hash_magnet(link)40 if is_file:41 op = self.client.torrents_add(torrent_files=[link], save_path=path)42 else:43 op = self.client.torrents_add(link, save_path=path)44 sleep(0.3)45 if op.lower() == "ok.":46 tor_info = self.client.torrents_info(torrent_hashes=self.ext_hash)47 if len(tor_info) == 0:48 if is_file:49 self.ext_hash = _get_hash_file(link, True)50 while True:51 tor_info = self.client.torrents_info(torrent_hashes=self.ext_hash)52 if len(tor_info) > 0:53 break54 elif time() - self.__stalled_time >= 30:55 ermsg = "The Torrent was not added. Report when you see this error"56 sendMessage(ermsg, self.__listener.bot, self.__listener.message)57 self.client.torrents_delete(torrent_hashes=self.ext_hash, delete_files=True)58 self.client.auth_log_out()59 return60 else:61 sendMessage("This is an unsupported/invalid link.", self.__listener.bot, self.__listener.message)62 self.client.torrents_delete(torrent_hashes=self.ext_hash, delete_files=True)63 self.client.auth_log_out()64 if is_file:65 osremove(link)66 return67 if is_file:68 osremove(link)69 tor_info = tor_info[0]70 self.ext_hash = tor_info.hash71 gid = self.ext_hash[:12]72 if getDownloadByGid(gid) is not None:73 sendMessage("This Torrent is already in list.", self.__listener.bot, self.__listener.message)74 self.client.auth_log_out()75 return76 with download_dict_lock:77 download_dict[self.__listener.uid] = QbDownloadStatus(self.__listener, self)78 LOGGER.info(f"QbitDownload started: {tor_info.name} - Hash: {self.ext_hash}")79 self.periodic = setInterval(self.POLLING_INTERVAL, self.__qb_listener)80 if BASE_URL is not None and select:81 if not is_file:82 metamsg = "Downloading Metadata, wait then you can select files or mirror torrent file"83 meta = sendMessage(metamsg, self.__listener.bot, self.__listener.message)84 while True:85 tor_info = 
self.client.torrents_info(torrent_hashes=self.ext_hash)86 if len(tor_info) == 0:87 return deleteMessage(self.__listener.bot, meta)88 try:89 tor_info = tor_info[0]90 if tor_info.state not in ["metaDL", "checkingResumeData", "pausedDL"]:91 deleteMessage(self.__listener.bot, meta)92 break93 except:94 return deleteMessage(self.__listener.bot, meta)95 self.client.torrents_pause(torrent_hashes=self.ext_hash)96 pincode = ""97 for n in str(self.ext_hash):98 if n.isdigit():99 pincode += str(n)100 if len(pincode) == 4:101 break102 buttons = button_build.ButtonMaker()103 if WEB_PINCODE:104 buttons.buildbutton("Select Files", f"{BASE_URL}/app/files/{self.ext_hash}")105 buttons.sbutton("Pincode", f"qbs pin {gid} {pincode}")106 else:107 buttons.buildbutton("Select Files", f"{BASE_URL}/app/files/{self.ext_hash}?pin_code={pincode}")108 buttons.sbutton("Done Selecting", f"qbs done {gid} {self.ext_hash}")109 QBBUTTONS = InlineKeyboardMarkup(buttons.build_menu(2))110 msg = "Your download paused. Choose files then press Done Selecting button to start downloading."111 sendMarkup(msg, self.__listener.bot, self.__listener.message, QBBUTTONS)112 else:113 sendStatusMessage(self.__listener.message, self.__listener.bot)114 except Exception as e:115 sendMessage(str(e), self.__listener.bot, self.__listener.message)116 self.client.auth_log_out()117 def __qb_listener(self):118 try:119 tor_info = self.client.torrents_info(torrent_hashes=self.ext_hash)120 if len(tor_info) == 0:121 return122 tor_info = tor_info[0]123 if tor_info.state == "metaDL":124 self.__stalled_time = time()125 if TORRENT_TIMEOUT is not None and time() - tor_info.added_on >= TORRENT_TIMEOUT:126 self.__onDownloadError("Dead Torrent!")127 elif tor_info.state == "downloading":128 self.__stalled_time = time()129 if not self.__dupChecked and STOP_DUPLICATE and ospath.isdir(f'{self.__path}') and not self.__listener.isLeech:130 LOGGER.info('Checking File/Folder if already in Drive')131 qbname = str(listdir(f'{self.__path}')[-1])132 if 
qbname.endswith('.!qB'):133 qbname = ospath.splitext(qbname)[0]134 if self.__listener.isZip:135 qbname = qbname + ".zip"136 elif self.__listener.extract:137 try:138 qbname = get_base_name(qbname)139 except:140 qbname = None141 if qbname is not None:142 qbmsg, button = GoogleDriveHelper().drive_list(qbname, True)143 if qbmsg:144 self.__onDownloadError("File/Folder is already available in Drive.")145 sendMarkup("Here are the search results:", self.__listener.bot, self.__listener.message, button)146 self.__dupChecked = True147 if not self.__sizeChecked:148 size = tor_info.size149 arch = any([self.__listener.isZip, self.__listener.extract])150 if STORAGE_THRESHOLD is not None:151 acpt = check_storage_threshold(size, arch)152 if not acpt:153 msg = f'You must leave {STORAGE_THRESHOLD}GB free storage.'154 msg += f'\nYour File/Folder size is {get_readable_file_size(size)}'155 self.__onDownloadError(msg)156 return157 limit = None158 if ZIP_UNZIP_LIMIT is not None and arch:159 mssg = f'Zip/Unzip limit is {ZIP_UNZIP_LIMIT}GB'160 limit = ZIP_UNZIP_LIMIT161 elif TORRENT_DIRECT_LIMIT is not None:162 mssg = f'Torrent limit is {TORRENT_DIRECT_LIMIT}GB'163 limit = TORRENT_DIRECT_LIMIT164 if limit is not None:165 LOGGER.info('Checking File/Folder Size...')166 if size > limit * 1024**3:167 fmsg = f"{mssg}.\nYour File/Folder size is {get_readable_file_size(size)}"168 self.__onDownloadError(fmsg)169 self.__sizeChecked = True170 elif tor_info.state == "stalledDL":171 if not self.__rechecked and 0.99989999999999999 < tor_info.progress < 1:172 msg = f"Force recheck - Name: {tor_info.name} Hash: "173 msg += f"{self.ext_hash} Downloaded Bytes: {tor_info.downloaded} "174 msg += f"Size: {tor_info.size} Total Size: {tor_info.total_size}"175 LOGGER.info(msg)176 self.client.torrents_recheck(torrent_hashes=self.ext_hash)177 self.__rechecked = True178 elif TORRENT_TIMEOUT is not None and time() - self.__stalled_time >= TORRENT_TIMEOUT:179 self.__onDownloadError("Dead Torrent!")180 elif 
tor_info.state == "missingFiles":181 self.client.torrents_recheck(torrent_hashes=self.ext_hash)182 elif tor_info.state == "error":183 self.__onDownloadError("No enough space for this torrent on device")184 elif (tor_info.state.lower().endswith("up") or tor_info.state == "uploading") and \185 not self.__uploaded and len(listdir(self.__path)) != 0:186 self.__uploaded = True187 if not QB_SEED:188 self.client.torrents_pause(torrent_hashes=self.ext_hash)189 if self.select:190 clean_unwanted(self.__path)191 self.__listener.onDownloadComplete()192 if QB_SEED and not self.__listener.isLeech and not self.__listener.extract:193 with download_dict_lock:194 if self.__listener.uid not in list(download_dict.keys()):195 self.client.torrents_delete(torrent_hashes=self.ext_hash, delete_files=True)196 self.client.auth_log_out()197 self.periodic.cancel()198 return199 download_dict[self.__listener.uid] = QbDownloadStatus(self.__listener, self)200 update_all_messages()201 LOGGER.info(f"Seeding started: {tor_info.name}")202 else:203 self.client.torrents_delete(torrent_hashes=self.ext_hash, delete_files=True)204 self.client.auth_log_out()205 self.periodic.cancel()206 elif tor_info.state == 'pausedUP' and QB_SEED:207 self.__listener.onUploadError(f"Seeding stopped with Ratio: {round(tor_info.ratio, 3)} and Time: {get_readable_time(tor_info.seeding_time)}")208 self.client.torrents_delete(torrent_hashes=self.ext_hash, delete_files=True)209 self.client.auth_log_out()210 self.periodic.cancel()211 except Exception as e:212 LOGGER.error(str(e))213 def __onDownloadError(self, err):214 self.client.torrents_pause(torrent_hashes=self.ext_hash)215 sleep(0.3)216 self.__listener.onDownloadError(err)217 self.client.torrents_delete(torrent_hashes=self.ext_hash, delete_files=True)218 self.client.auth_log_out()219 self.periodic.cancel()220def get_confirm(update, context):221 query = update.callback_query222 user_id = query.from_user.id223 data = query.data224 data = data.split(" ")225 qbdl = 
getDownloadByGid(data[2])226 if not qbdl:227 query.answer(text="This task has been cancelled!", show_alert=True)228 query.message.delete()229 elif user_id != qbdl.listener().message.from_user.id:230 query.answer(text="This task is not for you!", show_alert=True)231 elif data[1] == "pin":232 query.answer(text=data[3], show_alert=True)233 elif data[1] == "done":234 query.answer()235 qbdl.client().torrents_resume(torrent_hashes=data[3])236 sendStatusMessage(qbdl.listener().message, qbdl.listener().bot)237 query.message.delete()238def _get_hash_magnet(mgt: str):239 if 'xt=urn:btmh:' in mgt:240 return re_search(r'(?<=xt=urn:btmh:)[a-zA-Z0-9]+', mgt).group(0)241 else:242 return re_search(r'(?<=xt=urn:btih:)[a-zA-Z0-9]+', mgt).group(0)243def _get_hash_file(path, v2=False):244 with open(path, "rb") as f:245 decodedDict = bdecode(f.read())246 if v2:247 hash_ = sha256(bencode(decodedDict[b'info'])).hexdigest()248 else:249 hash_ = sha1(bencode(decodedDict[b'info'])).hexdigest()250 return str(hash_)251qbs_handler = CallbackQueryHandler(get_confirm, pattern="qbs", run_async=True)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!