Best Python code snippet using playwright-python
main.py
Source: main.py
1"""."""2import os3import ast4import atexit5from functools import wraps6from datetime import datetime, timedelta7#from zoneinfo import ZoneInfo # alternative to thirdparty pytz8# from apscheduler.schedulers.background import BackgroundScheduler9import flask10from flask import Flask, render_template, request11import firebase_admin12from firebase_admin import db13from firebase_admin import auth14from firebase_admin import credentials15import google.oauth2.id_token16from google.auth.transport import requests17from werkzeug.utils import redirect18from logger import GCPLogging19app = Flask(__name__)20CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID", "").strip()21CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET", "").strip()22certs = os.getenv('GOOGLE_APPLICATION_CREDENTIALS', '').strip()23#certs = certs1.strip()24parsed_json = ast.literal_eval(str(certs))25cred = credentials.Certificate(parsed_json)26firebase_admin.initialize_app(cred, {27 'databaseURL':28 'https://dailyquest-9d678-default-rtdb.firebaseio.com/'29})30app.secret_key = os.getenv("secret_key")31logme = GCPLogging(parsed_json)32# Disable them in production33if os.getenv("dev") == "local":34 os.environ["OAUTHLIB_RELAX_TOKEN_SCOPE"] = "1"35 os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"36firebase_request_adapter = requests.Request()37@app.before_request38def before_request():39 if 'DYNO' in os.environ: # Only runs when on heroku40 logme.info(f"Headers: {request.headers=}")41 if request.url.startswith('http://') and request.headers.get(42 'X-Forwarded-Proto', "") == "http":43 url = request.url.replace('http://', 'https://', 1)44 code = 30145def tomorrow():46 next_date = datetime.today() + timedelta(days=1)47 return next_date.strftime("%Y/%m/%d")48def get_date():49 """Returns date time for url format in firebase"""50 return (datetime.now(ZoneInfo('Asia/Kolkata')) +51 timedelta(hours=6, minutes=30)).strftime("%Y/%m/%d")52def get_user_details():53 id_token = request.cookies.get("token")54 user_data = {}55 flg = False56 if id_token:57 try:58 user_data = google.oauth2.id_token.verify_firebase_token(59 id_token, firebase_request_adapter)60 flg = True61 except Exception as error:62 logme.error(f"Sorry, Error in getting user_data: {error}")63 return flg, user_data64def create_topics(topic):65 if not get_topic(topic):66 ref = db.reference("/Topic_List")67 ref.push({68 'name': topic69 })70def get_topics():71 ref = db.reference("/Topic_List")72 return {key: value['name'] for key, value in ref.get().items()}73def get_topic(topic):74 ref = db.reference("/Topic_List")75 topic_id = ""76 for key in ref.order_by_child('name').equal_to(topic).get():77 topic_id = key78 return topic_id79def create_subtopics(topic_id, subtopic):80 ref = db.reference(f"/Topic_List/{topic_id}/sub_topics")81 ref.push(subtopic)82def is_role(role):83 logme.info(f"{role} in is_role")84 return True if is_allowed(role) else False85def is_allowed(role):86 """."""87 flg, user_data = get_user_details()88 logme.info(f"in is_allowed: {flg=}\n{user_data=}")89 if not flg and not user_data:90 return False91 user_email = user_data.get('email', "")92 user_id = user_data.get('user_id', "")93 ref_url = f"/access_control_list/{role}"94 return (user_id, user_email) in db.reference(ref_url).get().items()95def need_role(role):96 def decorator(fun):97 @wraps(fun)98 def wrapper(*args, **kwargs):99 if is_allowed(role):100 return fun(*args, **kwargs)101 return flask.redirect('/')102 return wrapper103 return decorator104def auth_required(func):105 """."""106 def _inner_(*args):107 """."""108 user_data = ""109 
try:110 id_token = request.cookies.get("token")111 logme.info(f"{id_token=}")112 except ValueError as ve:113 logme.error(f"{ve=}")114 id_token = None115 if id_token:116 try:117 while not user_data:118 user_data = google.oauth2.id_token.verify_firebase_token(119 id_token, firebase_request_adapter)120 result = func(user_data=user_data, *args)121 logme.info(f"{user_data=}")122 return result123 except Exception as error:124 logme.error(f"Failed in auth_required: {error=}")125 user_data = None126 logme.error(f"redirecting to logout as {id_token=} and {user_data=}")127 return redirect("/logout")128 # Renaming the function name:129 _inner_.__name__ = func.__name__130 return _inner_131# def get_topics():132# logme.info("inside get_topics")133# ref = db.reference("/QuestionBank")134# logme.info("Lets get the topics from QuestionBank")135# topics = tuple(ref.get('QuestionBank')[0].keys())136# logme.info(f"Topics {topics=}")137# return topics138def get_todays_quest(topic):139 logme.info("inside get_todays_quest")140 name = quest = None141 today = get_date()142 logme.info(f"Todays date: {today}")143 ref = db.reference(f"/Topics/{topic}/{today}")144 logme.info(f"2. Todays date: {today} - {ref=}")145 qid = ""146 if ref:147 logme.info("Lets start to find the qid from topics")148 logme.info(f"Lets get from Topics/{topic}/{today} with {ref=}")149 qid = ref.get('qid')150 logme.info(f"{qid=} and {None not in qid}")151 if qid and None not in qid:152 qid = qid[0].get('qid') if ref else None153 logme.info(f"{qid=}..")154 url = f"/QuestionBank/{topic}/{qid}"155 logme.info(f"{url=}")156 ref = db.reference(url)157 if ref:158 logme.info(f"{ref=}")159 data = ref.get()160 logme.info(f"{data=}")161 if data and None not in data:162 name = data.get("name", "")163 quest = data.get("quest", "")164 logme.info(f">{name=}")165 logme.info(f"{name=},\n{quest=}\n{qid=}")166 logme.info(f"{qid=}, {name=}, {quest=}")167 return qid, name, quest168def get_latest_quest(topic_id):169 logme.info("inside get_todays_quest")170 name = quest = None171 # date = get_date()172 # logme.info(f"Todays date: {today}")173 past = 0174 while not (qid := get_value(f"/Topics/{topic_id}/{get_date(past)}", "qid", False)):175 past -= 1176 # Now lets find the quest details to display177 logme.info("Now lets find the quest details to display")178 url = f"/QuestionBank/{topic_id}/{qid}"179 name = get_value(url, "name", "")180 quest = get_value(url, "quest", "")181 return qid, name, quest, get_date(past)182 # ref = db.reference(f"/Topics/{topic}/{today}")183 # logme.info(f"2. 
Todays date: {today} - {ref=}")184 # qid = ""185 # if ref:186 # logme.info("Lets start to find the qid from topics")187 # logme.info(f"Lets get from Topics/{topic}/{today} with {ref=}")188 # qid = ref.get('qid')189 # logme.info(f"{qid=} and {None not in qid}")190 # if qid and None not in qid:191 # qid = qid[0].get('qid') if ref else None192 # logme.info(f"{qid=}..")193 # url = f"/QuestionBank/{topic}/{qid}"194 # logme.info(f"{url=}")195 # ref = db.reference(url)196 # if ref:197 # logme.info(f"{ref=}")198 # data = ref.get()199 # logme.info(f"{data=}")200 # if data and None not in data:201 # name = data.get("name", "")202 # quest = data.get("quest", "")203 # logme.info(f">{name=}")204 # logme.info(f"{name=},\n{quest=}\n{qid=}")205 # logme.info(f"{qid=}, {name=}, {quest=}")206 # return qid, name, quest207def get_solution(topic: str, qid: str) -> str:208 ref = db.reference(f"/QuestionBank/{topic}/{qid}")209 return ref.get() if ref else None210@app.route("/submit_solution", methods=['POST'])211@auth_required212def submit_solution(user_data=None):213 """."""214 logme.info(f"{user_data=}")215 proposed_solution = request.form['proposed_solution']216 topic, qid, quest_date = request.form['quest_date'].split("::")217 user_id = user_data.get("user_id", "")218 month, date = quest_date.rsplit("/", 1)219 url = f"/history/{topic}/{month}"220 ref = db.reference(url)221 ref.push({222 "date": date,223 "uid": user_id,224 "q_id": qid,225 "proposed": proposed_solution,226 "result": "Not Evaluated" # Not Evaluated, Fail, Pass227 })228 return """<html><body>Thanks for Submitting the solution,229 click <a href='/'> me </a> to return back to home</body></html>"""230@app.route("/my_history", methods=["GET", "POST"])231@auth_required232def my_history(user_data=None):233 topics = get_topics()234 admin = is_role("admins")235 if request.method == 'POST':236 month = request.form.get("month")237 topic = request.form.get("topic")238 user_id = user_data.get("user_id", "")239 url = f"/history/{topic}/{month}"240 logme.info(f"{url=} -- {user_id=}")241 ref = db.reference(url)242 monthly_data = {}243 if ref.get():244 for key, value in ref.order_by_child(245 'uid').equal_to(user_id).get().items():246 logme.info(f"{key=}\n{value=}")247 solution_data = get_solution(topic, value['q_id'])248 logme.info(f"{solution_data=}")249 value.update(solution_data)250 logme.info(f"updated {value=}")251 monthly_data[key] = value252 return render_template("history.html", admin=admin,253 user_data=user_data,254 data=monthly_data,255 topics=topics)256 else:257 return render_template("history.html", user_data=user_data,258 admin=admin, topics=topics, data={})259@app.route("/add_quest", methods=["GET", "POST"])260@need_role("admins")261@auth_required262def add_quest(user_data=None):263 admin = is_role("admins")264 if request.method == 'POST':265 topic = request.form.get("topic")266 name = request.form.get("name", "")267 quest = request.form.get("quest", "")268 solution = request.form.get("solution", "")269 logme.info(f"{topic=}\n{name=}\n{quest=}")270 ref = db.reference(f"/QuestionBank/{topic}")271 ref.push({'name': name.strip(), 'quest': quest.strip(), 'solution':272 solution.strip(), "used": 0})273 topics = get_topics()274 return flask.render_template("add_quest.html", user_data=user_data,275 admin=admin, title="Enter New Quest",276 topics=topics)277@app.route("/todays_quest", methods=["GET", "POST"])278@auth_required279def todays_quest(user_data=None):280 admin = is_role("admins")281 logme.info(f"Reached todays_quest with {request.method=}")282 
if request.method == 'POST':283 topic = request.form.get("topic")284 logme.info(f"today's {topic=}")285 # qid, name, quest = get_todays_quest(topic)286 qid, name, quest, date = get_latest_quest(topic)287 if all((qid, name, quest)):288 logme.info(f"{qid=} -- {name=} -- {quest=}")289 return flask.render_template("todays_quest.html",290 title="Enter Daily Quest",291 quest_date=date, topic=topic,292 admin=admin, user_data=user_data,293 qid=qid, name=name, quest=quest)294 else:295 logme.info("Sorry no quest available")296 return "<html><body>Sorry, no new quest for today</body></html>"297 else:298 logme.info("user tried GET method for /todays_quest")299 return flask.redirect('/')300def get_value(url, key, default_value):301 ref = db.reference(f"/{url}")302 return data.get(key) if (data := ref.get()) else default_value303def get_cutoftime(topic_id):304 return get_value(f"/QuestDuration/{topic_id}", "cutoftime", 1750)305def get_repeat(topic_id):306 return get_value(f"/QuestDuration/{topic_id}", "repeat", 1)307# def log_me(msg, log_type):308# # Emits the data using the standard logging module309# logging.warning(msg)310def get_date(future_in=0):311 next_date = datetime.today() + timedelta(days=future_in)312 return next_date.strftime("%Y/%m/%d")313# def get_date():314# """Returns date time for url format in firebase"""315# return (datetime.now(ZoneInfo('Asia/Kolkata')) +316# timedelta(hours=6, minutes=30)).strftime("%Y/%m/%d")317def to_update(topic_id):318 """319 """320 latest = get_value(f"/QuestDuration/{topic_id}", "latest", 1)321 repeat = get_value(f"/QuestDuration/{topic_id}", "repeat", 1)322 today = get_date()323 next_date = get_date(repeat)324 logme.info(f"{today=} {next_date=}")325 # if next_date is already present then dont do anywhing326 val = get_value(f"/Topics/{topic_id}/{next_date}", "qid", False)327 logme.info(f"to do or not to do: {val=}")328 return next_date if not val else False329def get_fresh_quest(topic_id):330 """331 """332 key = ""333 ref = db.reference(f"/QuestionBank/{topic_id}")334 data = ref.order_by_child("used").equal_to(0).limit_to_first(1).get()335 logme.info(f"got the next free quest,{data=}")336 if data:337 key = list(data.keys())[0]338 # logme.info(f"Adding {key=}")339 # next_date = tomorrow()340 # logme.info(f"{next_date=}")341 # quest_ref = db.reference(f"/Topics/{topic}/{next_date}")342 # quest_ref.set({343 # "qid": key344 # })345 # Now lets update the questionbank # Code working346 child = ref.child(key)347 child.update({348 "used": 1349 })350 return key351def use_quest(quest_id, topic_id, next_date):352 try:353 quest_ref = db.reference(f"/Topics/{topic_id}/{next_date}")354 quest_ref.set({355 "qid": quest_id356 })357 # lets update the latest358 ref = db.reference(f"/QuestDuration/{topic_id}")359 ref.update({360 "latest": next_date361 })362 return True363 except Exception as e:364 logme.error(e)365 return False366def populate_quests():367 """368 Step 1: Get the next date for the quest based on `latest` and `repeat`369 Step 2. Do we need to populate next quest and skip if not370 3. if yes, then obtain the first unused quest for topic371 4. populate the quest with previous questdetails372 5. 
Update the quest as used.373 """374 logme.info("Lets start populating the quests for next valid date")375 # Lets find all the quests we have376 for topic_id in get_topics():377 logme.info(f"\nChecking {topic_id=} if it need updating")378 if(next_date := to_update(topic_id)):379 """380 Lets update the quest381 """382 logme.info(383 f"lets updat the topic with quest for date: {next_date}")384 if quest_id := get_fresh_quest(topic_id):385 logme.info(f"Found the quest: {quest_id=}")386 use_quest(quest_id, topic_id, next_date)387# def get_next_free_quest():388# """Use Push to add the questions else this will fail :("""389# topics = get_topics()390# logme.info(f"In get_next_free_quest: {topics}")391# for topic in topics:392# ref = db.reference(f"/QuestionBank/{topic}")393# data = ref.order_by_child("used").equal_to(0).limit_to_first(1).get()394# logme.info(f"{data=}")395# if data:396# key = list(data.keys())[0]397# logme.info(f"Adding {key=}")398# next_date = tomorrow()399# logme.info(f"{next_date=}")400# quest_ref = db.reference(f"/Topics/{topic}/{next_date}")401# quest_ref.set({402# "qid": key403# })404# # Now lets update the questionbank405# child = ref.child(key)406# child.update({407# "used": 1408# })409def schedule_quest():410 logme.info("Starting the logging")411 scheduler = BackgroundScheduler(timezone='Asia/Kolkata', daemon=True)412 scheduler.add_job(populate_quests, 'cron',413 hour='09', minute='30', second="00")414 scheduler.start()415 # Shut down the scheduler when exiting the app416 atexit.register(lambda: scheduler.shutdown())417@app.route("/evaluate", methods=['POST'])418@need_role("admins")419@auth_required420def evaluation(user_data=None):421 # topic = request.form.get("topic")422 month = request.form.get("selected_month")423 topic = request.form.get("selected_topic")424 logme.info(f"{month=} {topic=}")425 logme.info(f"{request.form=}")426 url = f"/history/{topic}/{month}"427 for key in request.form:428 if key.startswith("-"):429 _url = f"{url}/{key}"430 child = db.reference(_url)431 logme.info(f"Updating child: {key} -> {request.form.get(key)=}")432 child.update({433 "result": request.form.get(key)434 })435 return flask.redirect('/eval')436@app.route("/eval", methods=['GET', 'POST'])437@need_role("admins")438@auth_required439def eval_quest(user_data=None):440 topics = get_topics()441 admin = is_role("admins")442 if request.method == 'POST':443 month = request.form.get("month")444 topic = request.form.get("topic")445 # user_id = user_data.get("user_id", "")446 url = f"/history/{topic}/{month}"447 # logme.info(f"{url=} -- {user_id=}")448 ref = db.reference(url)449 monthly_data = {}450 if ref.get():451 for key, value in ref.order_by_child('date').get().items():452 logme.info(f"{key=}\n{value=}")453 solution_data = get_solution(topic, value['q_id'])454 logme.info(f"{solution_data=}")455 value.update(solution_data)456 logme.info(f"updated {value=}")457 monthly_data[key] = value458 return render_template("evaluate.html",459 user_data=user_data, month=month, topic=topic,460 data=monthly_data, admin=admin,461 topics=topics)462 else:463 return render_template("evaluate.html", user_data=user_data,464 admin=admin, topics=topics, data={})465@app.route("/logout")466def logout():467 """."""468 logme.info(f"{flask.request.cookies=}")469 id_token = flask.request.cookies.get('token')470 try:471 if id_token:472 logme.info(f"{id_token=}")473 decoded_claims = google.oauth2.id_token.verify_firebase_token(474 id_token, firebase_request_adapter)475 # decoded_claims = 
auth.verify_session_cookie(session_cookie)476 auth.revoke_refresh_tokens(decoded_claims['sub'])477 response = flask.make_response(flask.redirect('/login'))478 response.set_cookie('token', "", expires=-1)479 return response480 except auth.InvalidSessionCookieError as error:481 logme.error(f"Error: {error}")482 except ValueError as ve:483 logme.error(f"Value Error {ve=}")484 response = flask.make_response(flask.redirect('/login'))485 response.set_cookie('token', "", expires=-1)486 return response487 return flask.redirect('/login')488@app.route("/", methods=["GET"])489@auth_required490def index(user_data=None):491 admin = is_role("admins")492 logme.info(f"1. {admin=}")493 if user_data:494 logme.info(">> Getting topics")495 topics = get_topics()496 logme.info(f"1. user: {user_data=}\n{topics=}")497 return flask.render_template("index.html", title="Daily Quest",498 admin=admin, user_data=user_data,499 topics=topics)500 return redirect("/logout")501@app.route("/login")502def login():503 # if flask.request.cookies.get('token'):504 # return redirect("/logout")505 return flask.render_template("login.html", title="Please Login")506def get_past_quests(topic, month):507 url = f"/history/{topic}/{month}"508 # Lets get details of every user for month of509 topic_ref = db.reference(url)510 solution = topic_ref.order_by_child("date").get()511 return solution512def main():513 logme.info("Inside main")514 # schedule_quest()515 if __name__ == '__main__':516 port = int(os.getenv('PORT', "5000"))517 app.run(host='0.0.0.0', port=port, debug=False)...
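The app above reads all of its configuration from environment variables, and it parses GOOGLE_APPLICATION_CREDENTIALS with ast.literal_eval, so it expects an inline dict literal rather than a path to a key file. The following is a minimal local-launch sketch under that assumption; the credential fields, secret value, and port are placeholders, not part of the original project.

# Hypothetical local launch for main.py (values below are placeholders)
import os
import subprocess

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = str({
    "type": "service_account",          # minimal illustrative fields only;
    "project_id": "dailyquest-9d678",   # a real key has more entries
})
os.environ["secret_key"] = "change-me"  # Flask session secret read by the app
os.environ["dev"] = "local"             # relaxes OAuth transport checks locally
os.environ["PORT"] = "5000"

subprocess.run(["python", "main.py"])   # starts the Flask server on 0.0.0.0:5000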
run.py
Source: run.py
import sys, os, time
from asyncio import get_event_loop, TimeoutError, ensure_future, new_event_loop, set_event_loop
from . import datelock, feed, get, output, verbose, storage
from .storage import db
import logging as logme


class Twint:
    def __init__(self, config):
        logme.debug(__name__+':Twint:__init__')
        if config.Resume is not None and (config.TwitterSearch or config.Followers or config.Following):
            logme.debug(__name__+':Twint:__init__:Resume')
            self.init = self.get_resume(config.Resume)
        else:
            self.init = '-1'
        self.feed = [-1]
        self.count = 0
        self.user_agent = ""
        self.config = config
        self.conn = db.Conn(config.Database)
        self.d = datelock.Set(self.config.Until, self.config.Since)
        verbose.Elastic(config.Elasticsearch)
        if self.config.Store_object:
            logme.debug(__name__+':Twint:__init__:clean_follow_list')
            output._clean_follow_list()
        if self.config.Pandas_clean:
            logme.debug(__name__+':Twint:__init__:pandas_clean')
            storage.panda.clean()

    def get_resume(self, resumeFile):
        if not os.path.exists(resumeFile):
            return '-1'
        with open(resumeFile, 'r') as rFile:
            _init = rFile.readlines()[-1].strip('\n')
            return _init

    async def Feed(self):
        logme.debug(__name__+':Twint:Feed')
        consecutive_errors_count = 0
        while True:
            response = await get.RequestUrl(self.config, self.init, headers=[("User-Agent", self.user_agent)])
            if self.config.Debug:
                print(response, file=open("twint-last-request.log", "w", encoding="utf-8"))
            self.feed = []
            try:
                if self.config.Favorites:
                    self.feed, self.init = feed.Mobile(response)
                    if not self.count % 40:
                        time.sleep(5)
                elif self.config.Followers or self.config.Following:
                    self.feed, self.init = feed.Follow(response)
                    if not self.count % 40:
                        time.sleep(5)
                elif self.config.Profile:
                    if self.config.Profile_full:
                        self.feed, self.init = feed.Mobile(response)
                    else:
                        self.feed, self.init = feed.profile(response)
                elif self.config.TwitterSearch:
                    self.feed, self.init = feed.Json(response)
                break
            except TimeoutError as e:
                if self.config.Proxy_host.lower() == "tor":
                    print("[?] Timed out, changing Tor identity...")
                    if self.config.Tor_control_password is None:
                        logme.critical(__name__+':Twint:Feed:tor-password')
                        sys.stderr.write("Error: config.Tor_control_password must be set for proxy autorotation!\r\n")
                        sys.stderr.write("Info: What is it? See https://stem.torproject.org/faq.html#can-i-interact-with-tors-controller-interface-directly\r\n")
                        break
                    else:
                        get.ForceNewTorIdentity(self.config)
                        continue
                else:
                    logme.critical(__name__+':Twint:Feed:' + str(e))
                    print(str(e))
                    break
            except Exception as e:
                if self.config.Profile or self.config.Favorites:
                    print("[!] Twitter does not return more data, scrape stops here.")
                    break
                logme.critical(__name__+':Twint:Feed:noData' + str(e))
                # Sometimes Twitter says there is no data. But it's a lie.
                consecutive_errors_count += 1
                if consecutive_errors_count < self.config.Retries_count:
                    # skip to the next iteration if wait time does not satisfy limit constraints
                    delay = round(consecutive_errors_count ** self.config.Backoff_exponent, 1)
                    # if the delay is less than users set min wait time then replace delay
                    if self.config.Min_wait_time > delay:
                        delay = self.config.Min_wait_time
                    sys.stderr.write('sleeping for {} secs\n'.format(delay))
                    time.sleep(delay)
                    self.user_agent = await get.RandomUserAgent(wa=True)
                    continue
                logme.critical(__name__+':Twint:Feed:Tweets_known_error:' + str(e))
                sys.stderr.write(str(e) + " [x] run.Feed")
                sys.stderr.write("[!] if you get this error but you know for sure that more tweets exist, please open an issue and we will investigate it!")
                break
        if self.config.Resume:
            print(self.init, file=open(self.config.Resume, "a", encoding="utf-8"))

    async def follow(self):
        await self.Feed()
        if self.config.User_full:
            logme.debug(__name__+':Twint:follow:userFull')
            self.count += await get.Multi(self.feed, self.config, self.conn)
        else:
            logme.debug(__name__+':Twint:follow:notUserFull')
            for user in self.feed:
                self.count += 1
                username = user.find("a")["name"]
                await output.Username(username, self.config, self.conn)

    async def favorite(self):
        logme.debug(__name__+':Twint:favorite')
        await self.Feed()
        self.count += await get.Multi(self.feed, self.config, self.conn)

    async def profile(self):
        await self.Feed()
        if self.config.Profile_full:
            logme.debug(__name__+':Twint:profileFull')
            self.count += await get.Multi(self.feed, self.config, self.conn)
        else:
            logme.debug(__name__+':Twint:notProfileFull')
            for tweet in self.feed:
                self.count += 1
                await output.Tweets(tweet, self.config, self.conn)

    async def tweets(self):
        await self.Feed()
        if self.config.Location:
            logme.debug(__name__+':Twint:tweets:location')
            self.count += await get.Multi(self.feed, self.config, self.conn)
        else:
            logme.debug(__name__+':Twint:tweets:notLocation')
            for tweet in self.feed:
                self.count += 1
                await output.Tweets(tweet, self.config, self.conn)

    async def main(self, callback=None):
        task = ensure_future(self.run())  # Might be changed to create_task in 3.7+.
        if callback:
            task.add_done_callback(callback)
        await task

    async def run(self):
        if self.config.TwitterSearch:
            self.user_agent = await get.RandomUserAgent(wa=True)
        else:
            self.user_agent = await get.RandomUserAgent()
        if self.config.User_id is not None:
            logme.debug(__name__+':Twint:main:user_id')
            self.config.Username = await get.Username(self.config.User_id)
        if self.config.Username is not None:
            logme.debug(__name__+':Twint:main:username')
            url = f"https://twitter.com/{self.config.Username}?lang=en"
            self.config.User_id = await get.User(url, self.config, self.conn, True)
        if self.config.TwitterSearch and self.config.Since and self.config.Until:
            logme.debug(__name__+':Twint:main:search+since+until')
            while self.d._since < self.d._until:
                self.config.Since = str(self.d._since)
                self.config.Until = str(self.d._until)
                if len(self.feed) > 0:
                    await self.tweets()
                else:
                    logme.debug(__name__+':Twint:main:gettingNewTweets')
                    break
                if get.Limit(self.config.Limit, self.count):
                    break
        else:
            logme.debug(__name__+':Twint:main:not-search+since+until')
            while True:
                if len(self.feed) > 0:
                    if self.config.Followers or self.config.Following:
                        logme.debug(__name__+':Twint:main:follow')
                        await self.follow()
                    elif self.config.Favorites:
                        logme.debug(__name__+':Twint:main:favorites')
                        await self.favorite()
                    elif self.config.Profile:
                        logme.debug(__name__+':Twint:main:profile')
                        await self.profile()
                    elif self.config.TwitterSearch:
                        logme.debug(__name__+':Twint:main:twitter-search')
                        await self.tweets()
                else:
                    logme.debug(__name__+':Twint:main:no-more-tweets')
                    break
                # logging.info("[<] " + str(datetime.now()) + ':: run+Twint+main+CallingGetLimit2')
                if get.Limit(self.config.Limit, self.count):
                    logme.debug(__name__+':Twint:main:reachedLimit')
                    break
        if self.config.Count:
            verbose.Count(self.count, self.config)


def run(config, callback=None):
    logme.debug(__name__+':run')
    try:
        get_event_loop()
    except RuntimeError as e:
        if "no current event loop" in str(e):
            set_event_loop(new_event_loop())
        else:
            logme.exception(__name__+':Lookup:Unexpected exception while handling an expected RuntimeError.')
            raise
    except Exception as e:
        logme.exception(__name__+':Lookup:Unexpected exception occurred while attempting to get or create a new event loop.')
        raise
    get_event_loop().run_until_complete(Twint(config).main(callback))


def Favorites(config):
    logme.debug(__name__+':Favorites')
    config.Favorites = True
    config.Following = False
    config.Followers = False
    config.Profile = False
    config.Profile_full = False
    config.TwitterSearch = False
    run(config)
    if config.Pandas_au:
        storage.panda._autoget("tweet")


def Followers(config):
    logme.debug(__name__+':Followers')
    config.Followers = True
    config.Following = False
    config.Profile = False
    config.Profile_full = False
    config.Favorites = False
    config.TwitterSearch = False
    run(config)
    if config.Pandas_au:
        storage.panda._autoget("followers")
        if config.User_full:
            storage.panda._autoget("user")
    if config.Pandas_clean and not config.Store_object:
        # storage.panda.clean()
        output._clean_follow_list()


def Following(config):
    logme.debug(__name__+':Following')
    config.Following = True
    config.Followers = False
    config.Profile = False
    config.Profile_full = False
    config.Favorites = False
    config.TwitterSearch = False
    run(config)
    if config.Pandas_au:
        storage.panda._autoget("following")
        if config.User_full:
            storage.panda._autoget("user")
    if config.Pandas_clean and not config.Store_object:
        # storage.panda.clean()
        output._clean_follow_list()


def Lookup(config):
    logme.debug(__name__+':Lookup')
    try:
        get_event_loop()
    except RuntimeError as e:
        if "no current event loop" in str(e):
            set_event_loop(new_event_loop())
        else:
            logme.exception(__name__+':Lookup:Unexpected exception while handling an expected RuntimeError.')
            raise
    except Exception as e:
        logme.exception(__name__+':Lookup:Unexpected exception occurred while attempting to get or create a new event loop.')
        raise
    try:
        if config.User_id is not None:
            logme.debug(__name__+':Twint:Lookup:user_id')
            config.Username = get_event_loop().run_until_complete(get.Username(config.User_id))
        url = f"https://twitter.com/{config.Username}?lang=en"
        get_event_loop().run_until_complete(get.User(url, config, db.Conn(config.Database)))
        if config.Pandas_au:
            storage.panda._autoget("user")
    except RuntimeError as e:
        if "no current event loop" in str(e):
            logme.exception(__name__+':Lookup:Previous attempt to create an event loop failed.')
            raise
    except Exception as e:
        logme.exception(__name__+':Lookup:Unexpected exception occurred.')
        raise


def Profile(config):
    logme.debug(__name__+':Profile')
    config.Profile = True
    config.Favorites = False
    config.Following = False
    config.Followers = False
    config.TwitterSearch = False
    run(config)
    if config.Pandas_au:
        storage.panda._autoget("tweet")


def Search(config, callback=None):
    logme.debug(__name__+':Search')
    config.TwitterSearch = True
    config.Favorites = False
    config.Following = False
    config.Followers = False
    config.Profile = False
    config.Profile_full = False
    run(config, callback)
    if config.Pandas_au:
...
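The entry points above (Search, Profile, Followers, and so on) just set flags on a config object and hand it to run(). A minimal usage sketch, assuming the standard twint package layout where twint.Config exposes the capitalised attributes referenced in this file:

# Hypothetical driver for run.Search; the query and limit are placeholders
import twint

c = twint.Config()
c.Search = "playwright"             # handled by the TwitterSearch branch in Twint.run()
c.Limit = 20                        # checked via get.Limit(config.Limit, count)
c.Store_object = True               # keep results in memory through the output module
c.Hide_output = True

twint.run.Search(c)                 # sets the config flags above and calls run(config)
tweets = twint.output.tweets_list   # populated when Store_object is True
print(len(tweets))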
create_snapshots_by_instance_and_purge_old_backups.py
Source: create_snapshots_by_instance_and_purge_old_backups.py
...
    os.makedirs(output_dir + '/csv/')
if not os.path.exists(home + '/log/'):
    os.makedirs(home + '/log/')
logfile_name = home + '/log/' + 'scheduled_backup_run.' + datestamp + '.' + timestamp + '.log'


def logme(string_to_log, logfile_name):
    if logToConsole == True:
        print(string_to_log)
    logfile_object = open(logfile_name, 'a', 0)  # note: unbuffered mode (0) only works for Python 2 / binary mode
    logfile_object.write(string_to_log + "\n")
    logfile_object.close()


def get_snapshots_to_remove(myEC2Class, default_retention_value, specified_instances=[]):
    snaps_to_remove = []
    specified_instance_ids = []
    if len(specified_instances) == 0:
        specified_instances = myEC2Class.instance_details
    for record in myEC2Class.volume_details:
        in_backup_policy = False
        volid, volobj, volblkd, insid, insobj, num_of_total_snaps = record
        number_of_backup_snaps = 0
        for sid, sno, vid, vo, iid, io, sd, st, ubami, amiid, svex, ioex, amex, sndesc in myEC2Class.snapshot_details:
            if vid == volid:
                if 'Backup_Type' in sno.__dict__['tags']:
                    if sno.__dict__['tags']['Backup_Type'] == 'Scheduled':
                        number_of_backup_snaps += 1
        for specified_instance in specified_instances:
            i_id = specified_instance[0]
            i_obj = specified_instance[2]
            i_name = specified_instance[1]
            retention_value = default_retention_value
            if i_id == insid:
                logme("\n" + volid + " on " + i_name + " (" + i_id + ") : Checking if Scheduled Backup Policy is to be applied..", logfile_name)
                if 'Apply_Scheduled_Backup_Policy' in i_obj.__dict__['tags']:
                    logme("\tApply_Scheduled_Backup_Policy tag found", logfile_name)
                    if i_obj.__dict__['tags']['Apply_Scheduled_Backup_Policy'].lower() == "yes":
                        logme("\tScheduled Backup Policy is applied to " + i_name + " (" + i_id + ")", logfile_name)
                        if 'Backup_Retention' in i_obj.__dict__['tags']:
                            logme("\tBackup retention value found", logfile_name)
                            retention_value = int(i_obj.__dict__['tags']['Backup_Retention'])
                            logme("\tRetention value is now :" + str(retention_value), logfile_name)
                        logme("\tTotal snapshots of volume : " + str(num_of_total_snaps), logfile_name)
                        logme("\tTotal snapshots created by scheduled backup policy : " + str(number_of_backup_snaps), logfile_name)
                        if number_of_backup_snaps > retention_value:
                            num_of_snaps_to_delete = number_of_backup_snaps - retention_value
                            logme("\tNumber of backup snapshots to delete : " + str(num_of_snaps_to_delete), logfile_name)
                            oldest_snapshots = myEC2Class.return_oldest_snapshots_to_delete_for_vol(volid, num_of_snaps_to_delete)
                            for vid, snapid, snapdate, snaptime in oldest_snapshots:
                                snaps_to_remove.append([insid, vid, snapid, snapdate, snaptime, number_of_backup_snaps, retention_value])
                        else:
                            logme("\tNumber of snapshots to delete : 0", logfile_name)
    return snaps_to_remove


def get_backup_volumes(myEC2Class, specified_instances=[]):
    snaps_created = []
    specified_instance_ids = []
    if len(specified_instances) == 0:
        specified_instances = myEC2Class.instance_details
    for record in myEC2Class.volume_details:
        volid, volobj, volblkd, insid, insobj, num_of_snaps = record
        for specified_instance in specified_instances:
            i_id = specified_instance[0]
            i_name = specified_instance[1]
            i_obj = specified_instance[2]
            if i_id == insid:
                if 'Backup_Retention' in i_obj.__dict__['tags']:
                    retention_value = int(i_obj.__dict__['tags']['Backup_Retention'])
                if 'Apply_Scheduled_Backup_Policy' in i_obj.__dict__['tags']:
                    if i_obj.__dict__['tags']['Apply_Scheduled_Backup_Policy'].lower() == "yes":
                        timestamp = strftime("%d-%m-%Y %H:%M:%S", gmtime())
                        created_snapshot = volobj.create_snapshot()
                        created_snapshot.add_tags({'Name': 'Scheduled Backup of ' + i_name + ' (' + i_id + ') ' + timestamp, 'Instance_ID': i_id, 'Volume_ID': volid, 'Backup_Type': 'Scheduled'})
                        snapid = created_snapshot.id
                        snapdate, snaptime = str(created_snapshot.start_time)[:19].split('T')
                        snaps_created.append([i_name, insid, volid, snapid, snapdate, snaptime])
    return snaps_created


def delete_snapshots(myEC2Class, snaps_to_remove):
    deleted_snapshots = []
    for snap_to_remove in snaps_to_remove:
        for snapshots in myEC2Class.snapshot_details:
            # check ids match up - we do this to find out the object for the snapshot being deleted
            if snap_to_remove[2] == snapshots[0]:
                logme("Deleting " + snapshots[0] + " of volume " + snapshots[2] + " (" + str(snapshots[5].__dict__['tags']['Name']) + ")...", logfile_name)
                snapshots[1].delete()
                deleted_snapshots.append(snapshots)
    return deleted_snapshots


def get_EC2_totals_for_backup_policy_usage(myEC2Class):
    total_snapshots_created_other = 0
    total_snapshots_created_by_backup_policy = 0
    total_instances_backup_policy_off = 0
    total_instances_backup_policy_on = 0
    total_instances_backup_policy_unset = 0
    for snaps_detail in myEC2Class.snapshot_details:
        sna_obj = snaps_detail[1]
        if 'Backup_Type' in sna_obj.__dict__['tags']:
            if sna_obj.__dict__['tags']['Backup_Type'] == "Scheduled":
                total_snapshots_created_by_backup_policy += 1
            else:
                total_snapshots_created_other += 1
        else:
            total_snapshots_created_other += 1
    for instance_detail in myEC2Class.instance_details:
        i_obj = instance_detail[2]
        if 'Apply_Scheduled_Backup_Policy' in i_obj.__dict__['tags']:
            if i_obj.__dict__['tags']['Apply_Scheduled_Backup_Policy'].lower() == "yes":
                total_instances_backup_policy_on += 1
            else:
                total_instances_backup_policy_off += 1
        else:
            total_instances_backup_policy_unset += 1
    return [total_snapshots_created_by_backup_policy, total_snapshots_created_other, total_instances_backup_policy_on, total_instances_backup_policy_off, total_instances_backup_policy_unset]


def main():
    global myEC2Utils
    region, awskeyid, awsseckey, awsaccountid = get_config_info(home + '/.aws_config/awsconfig.txt.ctrust_acc')
    myEC2Class = EC2Utils(region, awskeyid, awsseckey, awsaccountid)
    retention_value = 2
    totals_array = []
    if not os.path.exists(output_dir + "Snapshots_Created_By_Scheduled_Backup/csv"):
        os.makedirs(output_dir + "Snapshots_Created_By_Scheduled_Backup/csv")
    if not os.path.exists(output_dir + "Snapshots_Created_By_Scheduled_Backup/xlsx"):
        os.makedirs(output_dir + "Snapshots_Created_By_Scheduled_Backup/xlsx")
    if not os.path.exists(output_dir + "Totals_for_scheduled_backup_run/csv"):
        os.makedirs(output_dir + "Totals_for_scheduled_backup_run/csv")
    if not os.path.exists(output_dir + "Totals_for_scheduled_backup_run/xlsx"):
        os.makedirs(output_dir + "Totals_for_scheduled_backup_run/xlsx")
    logme("-------------------------------------------------------------", logfile_name)
    logme("Scheduled Backup Run : " + datetime.datetime.now().strftime("%A %D @ %T"), logfile_name)
    logme("-------------------------------------------------------------", logfile_name)
    logme("Total snapshots existing pre backup process : " + str(len(myEC2Class.snapshot_details)), logfile_name)
    totals_array.append(["Total Snapshots Pre Backup Process : ", len(myEC2Class.snapshot_details)])
    logme("Starting Scheduled Backup Process.....", logfile_name)
    created_snaps = get_backup_volumes(myEC2Class)
    logme("Total new snapshots created by backup process : " + str(len(created_snaps)), logfile_name)
    totals_array.append(["Total new snaps created by backup process : ", len(created_snaps)])
    header = ['Instance Id', 'Instance Name', 'Volume ID', 'Snapshot ID', 'Snapshot Date', 'Snapshot Time']
    outputfile = output_dir + 'Snapshots_Created_By_Scheduled_Backup/' + 'xlsx/' + datestamp + '_' + timestamp + '_Snapshots_Created_By_Scheduled_Backup.xlsx'
    logme("\nCreating Excel report containing details about created snapshots", logfile_name)
    myEC2Class.generate_report_from_array(outputfile, header, created_snaps, "Snapshots_Created")
    outputfile = output_dir + 'Snapshots_Created_By_Scheduled_Backup/' + 'csv/' + datestamp + '_' + timestamp + '_Snapshots_Created_By_Scheduled_Backup.csv'
    myEC2Class.generate_report_from_array(outputfile, header, created_snaps, "Snapshots_Created")
    logme("\nRefreshing Snapshot Information to get latest backup information", logfile_name)
    myEC2Class.update()
    logme("\nStarting Process to Delete old backups....", logfile_name)
    total_pre_deletion = len(myEC2Class.snapshot_details)
    totals_array.append(["Total Pre Deletion", total_pre_deletion])
    logme("Total of all snapshots (pre deletion): " + str(total_pre_deletion), logfile_name)
    snaps_to_remove = get_snapshots_to_remove(myEC2Class, retention_value)
    logme("\nTotal number of snapshots to be deleted : " + str(len(snaps_to_remove)), logfile_name)
    totals_array.append(["Total Snapshots to be Deleted", len(snaps_to_remove)])
    logme("\nDeleting old backups....", logfile_name)
    deleted_snapshots = delete_snapshots(myEC2Class, snaps_to_remove)
    logme("\nRefreshing Snapshot Information after deletion..", logfile_name)
    myEC2Class.update()
    total_post_deletion = len(myEC2Class.snapshot_details)
    total_deleted = total_pre_deletion - total_post_deletion
    logme("\nTotal Snapshots successfully deleted : " + str(total_deleted), logfile_name)
    totals_array.append(["Total snapshot Successfully Deleted", total_deleted])
    logme("Total of all snapshots (post deletion): " + str(total_post_deletion), logfile_name)
    totals_array.append(["Total of all snapshots (post deletion)", total_post_deletion])
    tscb, tsco, tibe, tibo, tibu = get_EC2_totals_for_backup_policy_usage(myEC2Class)
    logme("Total Snapshots in EC2 created by backup policy : " + str(tscb), logfile_name)
    totals_array.append(["Total Snapshots in EC2 created using backup policy", tscb])
    logme("Total Snapshots in EC2 not created by backup policy : " + str(tsco), logfile_name)
    totals_array.append(["Total Snapshots in EC2 not created using backup policy", tsco])
    logme("Total Instances in EC2 with backup policy applied : " + str(tibe), logfile_name)
    totals_array.append(["Total Instances in EC2 with backup policy applied", tibe])
    logme("Total Instances in EC2 with backup policy turned off : " + str(tibo), logfile_name)
    totals_array.append(["Total Instances in EC2 with backup policy turned off", tibo])
    logme("Total Instances in EC2 with backup policy unset : " + str(tibu), logfile_name)
    totals_array.append(["Total Instances in EC2 with backup policy unset", tibu])
    header = ['Total Type', 'Total']
    logme("\nCreating Totals report file", logfile_name)
    outputfile = output_dir + 'Totals_for_scheduled_backup_run/' + 'xlsx/' + datestamp + '_' + timestamp + '_Totals_for_scheduled_backup_run.xlsx'
    myEC2Class.generate_report_from_array(outputfile, header, totals_array, "Backup Policy Totals")
    outputfile = output_dir + 'Totals_for_scheduled_backup_run/' + 'csv/' + datestamp + '_' + timestamp + '_Totals_for_scheduled_backup_run.csv'
    myEC2Class.generate_report_from_array(outputfile, header, totals_array, "Backup Policy Totals")
    logme("", logfile_name)
    logme("-------------------------------------------------------------", logfile_name)
    logme("Finished Backup Run : " + datetime.datetime.now().strftime("%A %D @ %T"), logfile_name)
    logme("-------------------------------------------------------------", logfile_name)


if __name__ == "__main__":
...
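The purge logic in get_snapshots_to_remove boils down to a simple retention rule: only snapshots tagged Backup_Type=Scheduled count, and anything beyond the per-instance Backup_Retention tag (or the default passed in) is queued for deletion, oldest first. A standalone sketch of just that calculation:

# Illustrative helper, not part of the original script
def snapshots_to_delete(scheduled_snapshot_count, retention_value):
    """Return how many of the oldest scheduled snapshots should be removed."""
    return max(scheduled_snapshot_count - retention_value, 0)

# Example: 7 scheduled snapshots with a retention of 2 -> delete the 5 oldest.
assert snapshots_to_delete(7, 2) == 5
assert snapshots_to_delete(1, 2) == 0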
get.py
Source: get.py
from async_timeout import timeout
from datetime import datetime
from bs4 import BeautifulSoup
import sys
import socket
import aiohttp
from fake_useragent import UserAgent
import asyncio
import concurrent.futures
import random
from json import loads
from aiohttp_socks import SocksConnector, SocksVer
from . import url
from .output import Tweets, Users
from .user import inf
import logging as logme

httpproxy = None

user_agent_list = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36',
    'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36',
    'Mozilla/5.0 (Windows NT 5.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36',
    'Mozilla/5.0 (Windows NT 6.2; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.157 Safari/537.36',
    'Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36',
    'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36',
    'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36',
    'Mozilla/4.0 (compatible; MSIE 9.0; Windows NT 6.1)',
    'Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko',
    'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)',
    'Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko',
    'Mozilla/5.0 (Windows NT 6.2; WOW64; Trident/7.0; rv:11.0) like Gecko',
    'Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko',
    'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.0; Trident/5.0)',
    'Mozilla/5.0 (Windows NT 6.3; WOW64; Trident/7.0; rv:11.0) like Gecko',
    'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)',
    'Mozilla/5.0 (Windows NT 6.1; Win64; x64; Trident/7.0; rv:11.0) like Gecko',
    'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0)',
    'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; Trident/6.0)',
    'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729)'
]


def get_connector(config):
    logme.debug(__name__+':get_connector')
    _connector = None
    if config.Proxy_host:
        if config.Proxy_host.lower() == "tor":
            _connector = SocksConnector(
                socks_ver=SocksVer.SOCKS5,
                host='127.0.0.1',
                port=9050,
                rdns=True)
        elif config.Proxy_port and config.Proxy_type:
            if config.Proxy_type.lower() == "socks5":
                _type = SocksVer.SOCKS5
            elif config.Proxy_type.lower() == "socks4":
                _type = SocksVer.SOCKS4
            elif config.Proxy_type.lower() == "http":
                global httpproxy
                httpproxy = "http://" + config.Proxy_host + ":" + str(config.Proxy_port)
                return _connector
            else:
                logme.critical("get_connector:proxy-type-error")
                print("Error: Proxy types allowed are: http, socks5 and socks4. No https.")
                sys.exit(1)
            _connector = SocksConnector(
                socks_ver=_type,
                host=config.Proxy_host,
                port=config.Proxy_port,
                rdns=True)
        else:
            logme.critical(__name__+':get_connector:proxy-port-type-error')
            print("Error: Please specify --proxy-host, --proxy-port, and --proxy-type")
            sys.exit(1)
    else:
        if config.Proxy_port or config.Proxy_type:
            logme.critical(__name__+':get_connector:proxy-host-arg-error')
            print("Error: Please specify --proxy-host, --proxy-port, and --proxy-type")
            sys.exit(1)
    return _connector


async def RequestUrl(config, init, headers=[]):
    logme.debug(__name__+':RequestUrl')
    _connector = get_connector(config)
    _serialQuery = ""
    params = []
    _url = ""
    if config.Profile:
        if config.Profile_full:
            logme.debug(__name__+':RequestUrl:Profile_full')
            _url = await url.MobileProfile(config.Username, init)
        else:
            logme.debug(__name__+':RequestUrl:notProfile_full')
            _url = await url.Profile(config.Username, init)
        _serialQuery = _url
    elif config.TwitterSearch:
        logme.debug(__name__+':RequestUrl:TwitterSearch')
        _url, params, _serialQuery = await url.Search(config, init)
    else:
        if config.Following:
            logme.debug(__name__+':RequestUrl:Following')
            _url = await url.Following(config.Username, init)
        elif config.Followers:
            logme.debug(__name__+':RequestUrl:Followers')
            _url = await url.Followers(config.Username, init)
        else:
            logme.debug(__name__+':RequestUrl:Favorites')
            _url = await url.Favorites(config.Username, init)
        _serialQuery = _url
    response = await Request(_url, params=params, connector=_connector, headers=headers)
    if config.Debug:
        print(_serialQuery, file=open("twint-request_urls.log", "a", encoding="utf-8"))
    return response


def ForceNewTorIdentity(config):
    logme.debug(__name__+':ForceNewTorIdentity')
    try:
        tor_c = socket.create_connection(('127.0.0.1', config.Tor_control_port))
        tor_c.send('AUTHENTICATE "{}"\r\nSIGNAL NEWNYM\r\n'.format(config.Tor_control_password).encode())
        response = tor_c.recv(1024)
        if response != b'250 OK\r\n250 OK\r\n':
            sys.stderr.write('Unexpected response from Tor control port: {}\n'.format(response))
            logme.critical(__name__+':ForceNewTorIdentity:unexpectedResponse')
    except Exception as e:
        logme.debug(__name__+':ForceNewTorIdentity:errorConnectingTor')
        sys.stderr.write('Error connecting to Tor control port: {}\n'.format(repr(e)))
        sys.stderr.write('If you want to rotate Tor ports automatically - enable Tor control port\n')


async def Request(url, connector=None, params=[], headers=[]):
    logme.debug(__name__+':Request:Connector')
    async with aiohttp.ClientSession(connector=connector, headers=headers) as session:
        return await Response(session, url, params)


async def Response(session, url, params=[]):
    logme.debug(__name__+':Response')
    with timeout(120):
        async with session.get(url, ssl=True, params=params, proxy=httpproxy) as response:
            return await response.text()


async def RandomUserAgent(wa=None):
    logme.debug(__name__+':RandomUserAgent')
    try:
        if wa:
            return "Mozilla/5.0 (Windows NT 6.4; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2225.0 Safari/537.36"
        return UserAgent(verify_ssl=False, use_cache_server=False).random
    except:
        return random.choice(user_agent_list)


async def Username(_id):
    logme.debug(__name__+':Username')
    url = f"https://twitter.com/intent/user?user_id={_id}&lang=en"
    r = await Request(url)
    soup = BeautifulSoup(r, "html.parser")
    return soup.find("a", "fn url alternate-context")["href"].replace("/", "")


async def Tweet(url, config, conn):
    logme.debug(__name__+':Tweet')
    try:
        response = await Request(url)
        soup = BeautifulSoup(response, "html.parser")
        tweets = soup.find_all("div", "tweet")
        await Tweets(tweets, config, conn, url)
    except Exception as e:
        logme.critical(__name__+':Tweet:' + str(e))


async def User(url, config, conn, user_id=False):
    logme.debug(__name__+':User')
    _connector = get_connector(config)
    try:
        response = await Request(url, connector=_connector)
        soup = BeautifulSoup(response, "html.parser")
        if user_id:
            return int(inf(soup, "id"))
        await Users(soup, config, conn)
    except Exception as e:
        logme.critical(__name__+':User:' + str(e))


def Limit(Limit, count):
    logme.debug(__name__+':Limit')
    if Limit is not None and count >= int(Limit):
        return True


async def Multi(feed, config, conn):
    logme.debug(__name__+':Multi')
    count = 0
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
            loop = asyncio.get_event_loop()
            futures = []
            for tweet in feed:
                count += 1
                if config.Favorites or config.Profile_full:
                    logme.debug(__name__+':Multi:Favorites-profileFull')
                    link = tweet.find("a")["href"]
                    url = f"https://twitter.com{link}&lang=en"
                elif config.User_full:
                    logme.debug(__name__+':Multi:userFull')
                    username = tweet.find("a")["name"]
                    url = f"http://twitter.com/{username}?lang=en"
                else:
                    logme.debug(__name__+':Multi:else-url')
                    link = tweet.find("a", "tweet-timestamp js-permalink js-nav js-tooltip")["href"]
                    url = f"https://twitter.com{link}?lang=en"
                if config.User_full:
                    logme.debug(__name__+':Multi:user-full-Run')
                    futures.append(loop.run_in_executor(executor, await User(url, config, conn)))
                else:
                    logme.debug(__name__+':Multi:notUser-full-Run')
                    futures.append(loop.run_in_executor(executor, await Tweet(url, config, conn)))
            logme.debug(__name__+':Multi:asyncioGather')
            await asyncio.gather(*futures)
    except Exception as e:
        # TODO: fix error not error
        # print(str(e) + " [x] get.Multi")
        # will return "'NoneType' object is not callable"
        # but still works
        # logme.critical(__name__+':Multi:' + str(e))
        pass
...
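Request and RandomUserAgent above are small, self-contained coroutines, so they can be exercised on their own. A minimal asyncio driver sketch, assuming this module is importable as twint.get; the URL is only an illustrative placeholder:

# Hypothetical driver for get.Request / get.RandomUserAgent
import asyncio
from twint import get

async def fetch_example():
    ua = await get.RandomUserAgent(wa=True)        # fixed desktop UA string
    html = await get.Request(
        "https://twitter.com/intent/user?user_id=783214&lang=en",  # placeholder URL
        headers=[("User-Agent", ua)],
    )
    return html[:200]                              # first 200 chars of the response body

print(asyncio.get_event_loop().run_until_complete(fetch_example()))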
LambdaTest’s Playwright tutorial gives you a broader view of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial provides A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 automation test minutes FREE!!