Best Python code snippet using molotov_python
EHThread.py
Source:EHThread.py
1# -*- coding: UTF-82__author__ = 'kz'3FETCH_SLEEP_INTERVAL = 14FETCH_SLEEP_INTERVAL_LONG = 55FETCH_RETRY_COUNT = 36IMAGE_WORKER_THREADS_COUNT = 57import time8import threading9import os10from EHentaiDownloader import Html, Http11class CommonException(Exception):12 """13 Common EHThread exception14 """15 pass16class ThreadTerminatedException(CommonException):17 """18 Tread is terminated simultaneously19 """20 pass21class AbstractEHThread(threading.Thread):22 """23 Abstract class with some common crap24 """25 def __init__(self, queue, name):26 """27 Initialize thread28 :param queue: queue.Queue - images queue29 :param name: string - thread unique name30 """31 from EHentaiDownloader import Environment32 self._app = Environment.Application()33 super().__init__(name=name)34 self._queue = queue35 self._slept = True36 self._stopped = False37 self._retried = 038class PageNavigator(AbstractEHThread):39 """40 Class which represents collecting direct links to images41 """42 def __init__(self, queue):43 """44 Initialize thread45 :param queue: queue.Queue - images queue46 """47 super().__init__(queue, 'PageNavigator')48 self._destination = self._app['destination']49 self._subpages = [self._app['uri']]50 self._parser = Html.EHTMLParser(self._app['uri'])51 self._httpClient = Http.Client()52 self._httpClient.changeUserAgent(self._app['user_agent'])53 self._pages = []54 def run(self):55 """56 Run thread57 """58 from EHentaiDownloader import Environment59 firstIteration = True60 pageNumber = 161 try:62 while self._subpages:63 subpage = self._subpages.pop(0)64 Environment.Log('Fetching subpage %s' % subpage, Environment.LOG_LEVEL_DEBUG)65 fetchFields = ('pages', 'subpages', 'meta') if firstIteration else ('pages',)66 result = self._fetchContent(subpage, fetchFields)67 if firstIteration:68 meta = result['meta']69 if not meta['title']['main']:70 raise CommonException('Gallery has empty title or server response was incorrect. Aborted.')71 Environment.Log('Title: {0[main]} / {0[jap]}'.format(meta['title']), Environment.LOG_LEVEL_INFO)72 for k in meta['additional'].keys():73 Environment.Log('{0:.<20}...{1}'.format(k, meta['additional'][k]), Environment.LOG_LEVEL_INFO)74 self._subpages = result['subpages']75 self._destination = '/'.join((self._destination.rstrip('/'), meta['title']['main']76 .replace('/', '_')))77 os.mkdir(self._destination)78 self._app['destination'] = self._destination79 self._app['total_images'] = meta['count']80 Environment.Log('Found subpages: %s' % repr(self._subpages), Environment.LOG_LEVEL_DEBUG)81 Environment.Log('Found pages: %s' % repr(result['pages']), Environment.LOG_LEVEL_DEBUG)82 self._pages += result['pages']83 while self._pages:84 page = self._pages.pop(0)85 Environment.Log('Fetching page # %d; URI %s' % (pageNumber, page), Environment.LOG_LEVEL_DEBUG)86 image = self._fetchContent(page, ('image',))['image']87 Environment.Log('Found image %s, adding to queue' % image, Environment.LOG_LEVEL_DEBUG)88 image['number'] = pageNumber89 image['destination'] = self._destination90 self._queue.put(image)91 pageNumber += 192 firstIteration = False93 self._queue.join()94 except ThreadTerminatedException:95 pass96 except BaseException as e:97 error = Environment.ErrorInfo(self._name, e)98 self._app.setError(error)99 finally:100 self._app.complete()101 def stop(self):102 """103 Set flag to stop thread as soon as possible104 """105 self._stopped = True106 def _fetchContent(self, uri, fields):107 """108 Download a page and parse out all needed content109 :param uri: string - uri to fetch content from110 :param fields: tuple - tuple of fields to fetch labels111 :return: dict - dictionary with parsed values112 """113 from EHentaiDownloader import Environment114 if self._stopped:115 raise ThreadTerminatedException116 try:117 if not self._slept:118 Environment.Log('Sleeping %f seconds before the next request' % FETCH_SLEEP_INTERVAL,119 Environment.LOG_LEVEL_DEBUG)120 time.sleep(FETCH_SLEEP_INTERVAL)121 result = {}122 self._parser.content = self._httpClient.get(uri)123 for field in fields:124 result[field] = self._parser[field]125 self._slept = False126 self._retried = 0127 return result128 except Html.TemporaryBanException:129 Environment.Log(130 'Temporary ban ocured, sleeping %f seconds before the next request' % FETCH_SLEEP_INTERVAL_LONG,131 Environment.LOG_LEVEL_WARN132 )133 time.sleep(FETCH_SLEEP_INTERVAL_LONG)134 self._slept = True135 except Html.EHTMLParserException as e:136 self._retried += 1137 self._slept = False138 Environment.Log('%s, retry # %d' % (str(e), self._retried), Environment.LOG_LEVEL_WARN)139 if self._retried == self._app['retries']:140 raise e141 return self._fetchContent(uri, fields)142class ImageDownloader(AbstractEHThread):143 """144 Image downloader thread145 """146 def __init__(self, queue, number=0):147 """148 Initialize image download worker thread149 :param queue: queue.Queue - images queue150 :param number: int - worker thread unique id151 """152 super().__init__(queue, 'ImageDownloader-%s' % number)153 def run(self):154 """155 Download images from queue156 """157 from EHentaiDownloader.Environment import ErrorInfo158 while True:159 try:160 self._performTask()161 except BaseException as e:162 self._retried += 1163 if self._retried == self._app['retries']:164 error = ErrorInfo(self._name, e)165 self._app.setError(error)166 break167 self._performTask()168 def _performTask(self):169 """170 Get and perform a single task171 """172 from EHentaiDownloader import Environment173 imageInfo = self._queue.get()174 totalImages = self._app['total_images']175 percents = int(imageInfo['number'] / totalImages * 100)176 Environment.Log('Downloading image #{0}/{1} ({2}%)'.format(imageInfo['number'], totalImages, percents),177 Environment.LOG_LEVEL_INFO, True)178 Environment.Log('Thread: %s; from: %s; to: %s' % (self._name, imageInfo['full_uri'], imageInfo['destination']),179 Environment.LOG_LEVEL_DEBUG)180 httpClient = Http.Client(imageInfo['host'], imageInfo['port'])181 httpClient.changeUserAgent(self._app['user_agent'])182 httpClient.sendRequest(imageInfo['uri'])183 contentType = httpClient.getHeader('Content-type')184 extension = self._getExtensionFromContentType(contentType)185 fileName = '{image[destination]}/{image[number]:0={padding}}.{ext}'.format(186 image=imageInfo, ext=extension, padding=str(totalImages).__len__())187 file = open(fileName, 'wb')188 try:189 while True:190 file.write(httpClient.getChunk())191 except Http.ReadResponseException:192 pass193 file.close()194 httpClient.close()195 self._retried = 0196 self._queue.task_done()197 def _getExtensionFromContentType(self, contentType):198 """199 Try to get file extension according to content-type200 This crap is some kind of suitable for images only,201 but we don't need anything more202 :return: string - corresponding file extension203 """...
index.py
Source:index.py
1from flask import Flask2from flask import render_template, redirect, request3from pymongo import MongoClient4from datetime import datetime, date5from flask import jsonify6from multiprocessing import Pool7import os8import json9from wtforms.fields.core import SelectField10from libs.save_info import *11from libs.excel_parse import *12from libs.user_dao import *13from libs.sms_receive import *14# from libs.detect import *15import pandas as pd16from flask import send_file17from flask_bootstrap import Bootstrap18from flask_wtf import FlaskForm19from wtforms import StringField, SubmitField20from wtforms.validators import DataRequired21app = Flask(__name__)22# Flask-WTF requires an encryption key - the string can be anything23# MONGO_URI = "mongodb://127.0.0.1"24app.config['SECRET_KEY'] = 'C2HWGVoMGfNTBsrYQg8EcMrdTimkZfAb'25app.config['TEMPLATES_AUTO_RELOAD'] = True26Bootstrap(app)27SUBMITTED = False28SUBMITTED_DATE = None29class NameForm(FlaskForm):30 domain = StringField('åå', validators=[DataRequired()])31 country = SelectField('å½å®¶', validators=[DataRequired()], choices=[32 ("china", "China"), ("usa", "USA")])33 submit = SubmitField('Submit')34@app.route("/", methods=['GET', 'POST'])35def hello():36 global SUBMITTED_DATE, SUBMITTED, TOTAL_COUNT37 form = NameForm()38 message = ""39 if SUBMITTED_DATE is not None and date.today() > SUBMITTED_DATE:40 SUBMITTED = False41 TOTAL_COUNT = 042 if form.validate_on_submit():43 if not SUBMITTED:44 domain = form.domain.data45 country = form.country.data46 # empty the form field47 form.domain.data = ""48 form.country.data = ""49 xl = []50 try:51 for i in range(4):52 x = threading.Thread(53 target=save_info, args=(domain, country))54 x.start()55 xl.append(x)56 SUBMITTED = True57 SUBMITTED_DATE = date.today()58 except:59 print("Error: unable to start thread")60 TOTAL_COUNT = 061 return "running..."62 else:63 return "already submitted.."64 return render_template("index.html", form=form)65@app.route("/run/<domain>/<country>/<number>/")66def run(domain, country, number):67 xl = []68 try:69 for i in range(4):70 x = threading.Thread(71 target=save_info, args=(domain, country, number))72 x.start()73 xl.append(x)74 for x in xl:75 x.join()76 TOTAL_COUNT = 077 except:78 print("Error: unable to start thread")79 TOTAL_COUNT = 080 return "runing..."81@app.route("/count")82def count():83 start = datetime.combine(date.today(), datetime.min.time())84 end = datetime.combine(date.today(), datetime.max.time())85 all_count = rdv_page.find({"date": {"$lt": end, "$gte": start}}).count()86 failed_count = len(get_all_failed(rdv_page))87 code_sent = len(get_all_successed(rdv_page))88 return str(date.today()) + f" æ»æ°ï¼{all_count}, 失败: {failed_count}, æå: {code_sent}"89@app.route("/send/<sim_line>/")90def send(sim_line):91 all_user = get_all_user_by_line(person_page, sim_line)92 # _div = [all_user[i:i+4] for i in range(0, len(all_user), 4)]93 count = 094 for p in all_user:95 count += send_request(p)96 fialed = len(all_user) - count97 # for sub_list in _div:98 # _slept = randrange(50)99 # sleep(_slept)100 # # print(len(sub_list))101 # with Pool(4) as p:102 # p.map(send_request, sub_list)103 # with Pool(16) as p:104 # p.map(send_request, all_user)105 # x.start()106 # x.join()107 # send_request(person_db, all_user)108 return f"sim line {sim_line} sent, {fialed} failed."109@app.route("/resend/<sim_line>/")110def resend(sim_line):111 all_user = get_all_failed_by_line(person_page, sim_line)112 # _div = [all_user[i:i+4] for i in range(0, len(all_user), 4)]113 count = 0114 for p in all_user:115 count += send_request(p)116 fialed = len(all_user) - count117 return f"sim line {sim_line} sent, {fialed} failed."118@app.route("/update")119def update():120 all_user = get_all_users(person_page)121 _div = [all_user[i:i+16] for i in range(0, len(all_user), 16)]122 for i, sub_list in enumerate(_div):123 # _slept = randrange(50)124 # sleep(_slept)125 # print(len(sub_list))126 print(i + 1)127 for p in sub_list:128 p["sim_line"] = i + 1129 person_page.replace_one({"_id":p["_id"]}, p,upsert=True)130 # with Pool(16) as p:131 # p.map(send_request, all_user)132 # x.start()133 # x.join()134 # send_request(person_db, all_user)135 return "running..."136# @app.route("/resend")137# def resend():138# all_user = get_all_failed(rdv_page)139 140# _div = [all_user[i:i+16] for i in range(0, len(all_user), 16)]141# # print(len(_div))142# for sub_list in _div:143# _slept = randrange(50)144# sleep(_slept)145# # print(len(sub_list))146# with Pool(16) as p:147# p.map(resend_request, sub_list)148# # print(len(all_user))149# # resend_request(all_user[0], rdv_page)150# # with Pool(1) as p:151# # p.map(resend_request, all_user)152# # x.start()153# # x.join()154# # send_request(person_db, all_user)155# return "resending..."156@app.route("/detect")157def detect():158 all_rdv = get_all_successed(rdv_page)159 # rdv_object = rdv_page.find_one({"rdv_list": {"$elemMatch": {"phone_number": "33769142022"}}})160 # send_sms(rdv_page, "33758145465")161 for p in all_rdv:162 url = p["url"].replace("/register/", "/")163 print(url)164 detect_ok(url, p)165 # sleep(100)166 # print(rdv_object)167 return "dict(rdv_object)"168@app.route("/test")169def test():170 # rdv_object = rdv_page.find_one({"rdv_list": {"$elemMatch": {"phone_number": "33769142022"}}})171 send_sms(rdv_page, "33758145465")172 # print(rdv_object)173 return "dict(rdv_object)"174@app.route("/sms_input/<telephone_num>/<sms_text>/<date>/<sms_port>/", methods=['GET'])175def sms_input(telephone_num, sms_text, date, sms_port):176 print(telephone_num, sms_text, date, sms_port)177 save_sms_code(rdv_page, telephone_num, sms_text, sms_port, date)178 send_sms(rdv_page, telephone_num)179 resp = jsonify(success=True)180 return resp181@app.route("/upload", methods=['GET', 'POST'])182def upload_file():183 if request.method == 'POST':184 # print(request.files['file'])185 f = request.files['file']186 f.save(os.path.join("./assects/uploads", f.filename))187 people_list, data_file = parse_excel(f)188 insert_user(db_page=person_page, person_list=people_list)189 return data_file.to_html()190 return '''191 <!doctype html>192 <title>Upload an excel file</title>193 <h1>Excel file upload (csv, tsv, csvz, tsvz only)</h1>194 <form action="" method=post enctype=multipart/form-data>195 <p><input type=file name=file><input type=submit value=Upload>196 </form>197 '''198@app.route("/csv")199def csv():200 start = datetime.combine(date.today(), datetime.min.time())201 end = datetime.combine(date.today(), datetime.max.time())202 # Make a query to the specific DB and Collection203 cursor = rdv_page.find({"date": {"$lt": end, "$gte": start}, "status":True, "code_sent":True})204 # Expand the cursor and construct the DataFrame205 df = pd.DataFrame(list(cursor))206 # Delete the _id207 if '_id' in df:208 del df['_id']209 df.to_csv('person.csv', index=False)...
state.py
Source:state.py
1from game.states.manager import GameStateManager2from game.ui.component import Component3class RegisterGameState(type):4 """5 Meta class to register child classes of GameState with the6 GameStateManager.7 """8 def __init__(cls, name, bases, clsdict):9 super().__init__(name, bases, clsdict)10 if cls.ID != 'game_state':11 GameStateManager.register(cls)12class GameState(metaclass=RegisterGameState):13 """14 A base class for defining different game states. The flow of the game is15 managed by the `GameStateManager` which transitions the game between16 various `GameState`s.17 """18 # The ID of the state19 ID = 'game_state'20 # Possible statuses for a game state21 READY = 022 ACTIVE = 123 PAUSED = 224 # UI enabled and visible on pause25 PAUSED_UI_ENABLED = False26 PAUSED_UI_VISIBLE = False27 def __init__(self, game_state_manager):28 self._game_state_manager = game_state_manager29 self._status = self.READY30 self._ui_root = None31 self._slept = None32 @property33 def game(self):34 return self.game_state_manager.game35 @property36 def game_state_manager(self):37 return self._game_state_manager38 @property39 def paused(self):40 return self._status == self.PAUSED41 @property42 def status(self):43 return self._status44 @property45 def ui_root(self):46 return self._ui_root47 def enter(self, **kw):48 """49 Called when the game enter the state which then becomes the active50 state.51 """52 assert self.status == self.READY, \53 'State can only be entered if status is READY.'54 # Set the state to active55 self._status = self.ACTIVE56 # Add root UI component57 self._ui_root = Component()58 self._ui_root.add_tag('state_root')59 self._ui_root.bottom = 060 self._ui_root.right = 061 self.game.ui_root.add_child(self._ui_root)62 # Reset slept63 self._slept = None64 def leave(self):65 """66 Called when the game leaves this state.67 """68 assert self.status == self.ACTIVE or self.status == self.PAUSED, \69 'State can only be left if status is ACTIVE or PAUSED.'70 # Remove root UI component71 self.game.ui_root.remove_child(self._ui_root)72 del self._ui_root73 # Set state to ready74 self._status = self.READY75 def pause(self, new_state_id):76 """77 Called when the game pauses this state before transitioning to78 another.79 """80 assert self.status == self.ACTIVE, \81 'State can only be paused if status is ACTIVE.'82 self.ui_root.enabled = self.__class__.PAUSED_UI_ENABLED83 self.ui_root.visible = self.__class__.PAUSED_UI_VISIBLE84 self._status = self.PAUSED85 def resume(self, **kw):86 """Called when the game transitiong back to the paused state"""87 assert self.status == self.PAUSED, \88 'State can only be resumed if status is PAUSED.'89 self.ui_root.enabled = True90 self.ui_root.visible = True91 self._status = self.ACTIVE92 def input(self, char):93 """Handle the given user input for the state"""94 if chr(char) == 'q':95 # @@ This should switch us to a confirmation screen96 self.game.quit = True97 if self.ui_root:98 self.ui_root.input(char)99 def update(self, dt):100 """101 Update the state based on the time ellapsed between now and the102 previous call to update.103 """104 if self._slept is not None:105 self._slept -= dt106 if self._slept <= 0:107 self._slept = None108 if self.ui_root:109 self.ui_root.update(dt)110 def render(self):111 """Render the state's visual representation to the screen"""112 if self.ui_root:113 self.ui_root.render(self.game.main_window)114 # Helpers115 def slept(self, seconds):116 """117 Slept provides a simple mechanism to execute a block of code at118 regular intervals (the given number of seconds) while not blocking119 the state. The first call to slept will always return True, subsequent120 calls will only return True once the the given period of time has121 past.122 NOTE: Slept isn't designed to be accurate, it relies on the delta (dt)123 set in update and therefore is susceptible to other blocking code.124 """125 if self._slept is None:126 self._slept = seconds127 return True...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!