Best Python code snippets using molotov_python
request.py
Source: request.py
# -*- coding: utf-8 -*-
import threading
import time
import traceback
import re

from resources.lib.modules.globals import g
from resources.lib.modules.scrapers import source_utils
from resources.lib.modules.scrapers.common_types import UrlParts
from resources.lib.modules.scrapers.third_party.cloudscraper import cloudscraper
from requests.compat import urlparse, urlunparse

# Module-wide cache of HEAD-check results. Each value is either a bool
# (True when the last check produced status 200) or another URL string to
# follow (a recorded redirect) - see _get_head_check and Request._head.
_head_checks = {}


def _get(cfscrape, url, headers, timeout, allow_redirects, update_options_fn):
    """Issue a GET request through the given cloudscraper session.

    :param cfscrape: session object exposing ``request(**options)``.
    :param url: target URL.
    :param headers: request headers dict.
    :param timeout: request timeout passed straight to the session.
    :param allow_redirects: whether the session may follow redirects.
    :param update_options_fn: optional callable that receives the options
        dict and may modify it in place before the request is sent.
    :return: whatever ``cfscrape.request`` returns.
    """
    request_options = {
        'method': 'GET',
        'url': url,
        'headers': headers,
        'timeout': timeout,
        'allow_redirects': allow_redirects,
    }
    if update_options_fn is not None:
        update_options_fn(request_options)
    return cfscrape.request(**request_options)


def _is_cloudflare_iuam_challenge(resp, allow_empty_body=False):
    """Return a truthy value when ``resp`` looks like a Cloudflare challenge page.

    The response must carry a ``Server: cloudflare...`` header and a 429/503
    status. Unless ``allow_empty_body`` is set, the body must also match the
    challenge form regex. Objects lacking ``headers``/``status_code``/``text``
    (e.g. the stub responses built in Request) yield ``False``.
    """
    try:
        return (
            resp.headers.get('Server', '').startswith('cloudflare')
            and resp.status_code in [429, 503]
            and (allow_empty_body or re.search(
                r'action="/.*?__cf_chl_jschl_tk__=\S+".*?name="jschl_vc"\svalue=.*?',
                resp.text,
                re.M | re.DOTALL
            ))
        )
    except AttributeError:
        pass
    return False


def _get_domain(url):
    """Return ``scheme://netloc`` of ``url``, defaulting the scheme to https."""
    parsed_url = urlparse(url)
    scheme = parsed_url.scheme if parsed_url.scheme != '' else 'https'
    return "%s://%s" % (scheme, parsed_url.netloc)


def _get_head_check(url):
    """Look up ``url`` in the HEAD cache, following recorded redirects.

    :return: ``(url, result)`` where ``url`` is the last URL reached in the
        redirect chain and ``result`` is the cached bool, or ``None`` when
        nothing is cached for it.
    """
    result = _head_checks.get(url, None)
    if isinstance(result, bool):
        return (url, result)
    elif result is not None:
        return _get_head_check(result)
    return (url, None)


class Request(object):
    """HTTP helper that runs HEAD checks and GET/POST requests for scrapers.

    HEAD checks go through ``source_utils.randomUserAgentRequests()``; GET and
    POST go through a cloudscraper session. After each request:

    * ``exc_msg`` holds a short error description ('' on success);
    * ``request_time`` holds the rounded duration in seconds (99 initially).

    Setting ``skip_head`` makes every HEAD check report ``(url, 200)`` without
    touching the network.
    """

    def __init__(self, sequental=False, timeout=None, wait=1):
        self._request = source_utils.randomUserAgentRequests()
        self._cfscrape = cloudscraper.create_scraper(interpreter='native')
        self._sequental = sequental
        # _wait/_should_wait/_lock were used only by the sequential
        # (lock + delay) request path, which is commented out in this code.
        self._wait = wait
        self._should_wait = False
        self._lock = threading.Lock()
        self._timeout = 10
        if timeout is not None:
            self._timeout = timeout
        self.exc_msg = ''
        self.skip_head = False
        self.request_time = 99

    def _verify_response(self, response):
        """Raise if ``response`` has an HTTP error status (>= 400).

        ``exc_msg`` is filled in before raising, so that _request_core keeps
        this message instead of classifying the traceback.
        """
        if response.status_code >= 400:
            self.exc_msg = 'response status code %s' % response.status_code
            if response.status_code in [429, 503]:
                self.exc_msg = '%s (probably Cloudflare)' % self.exc_msg
            raise Exception()

    def _request_core(self, request, sequental=None, cf_retries=3):
        """Run ``request(None)``, time it, and turn failures into a stub response.

        :param request: callable taking an options-update function (or None)
            and returning a response. It must also carry a ``url`` attribute.
        :param sequental: overrides the instance default when not None.
        :param cf_retries: remaining retries for the "new Cloudflare
            challenge" error, which is only retried when the failed attempt
            took under 2 seconds.
        :return: the response on success. On failure, the failing response if
            one was received, else a stub whose ``status_code`` is 501.
            NOTE: the sequential path is commented out in this code, so with
            ``sequental`` true the method returns None.
        """
        self.exc_msg = ''
        if sequental is None:
            sequental = self._sequental

        # Attribute bag standing in for a response when the request itself fails.
        response_err = lambda: None
        response_err.status_code = 501

        try:
            response = None
            if sequental is False:
                self._request_start = time.time()
                response = request(None)
                self._request_end = time.time()
                self.request_time = round(self._request_end - self._request_start)
                # From here on a failure returns the real (error) response.
                response_err = response
                self._verify_response(response)
                return response
            # The sequential (locked, rate-limited) request path is commented
            # out in this code; see the class note on _wait/_lock.
        except Exception:
            self._request_end = time.time()
            self.request_time = round(self._request_end - self._request_start)
            # A non-empty exc_msg was already set by _verify_response.
            if self.exc_msg == '':
                exc = traceback.format_exc(limit=1)
                # One chain: each failure gets exactly one label. (With a
                # plain `if` here, a timeout label was always overwritten by
                # the branches below.)
                if 'ConnectTimeout' in exc or 'ReadTimeout' in exc:
                    self.exc_msg = 'request timed out'
                elif ('Detected the new Cloudflare challenge.' in exc
                      and cf_retries > 0 and self.request_time < 2):
                    # `g` is this module's logger; `tools` was never imported.
                    g.log('cf_new_challenge_retry: %s' % request.url, 'notice')
                    return self._request_core(request, sequental, cf_retries - 1)
                elif 'Cloudflare' in exc or '!!Loop Protection!!' in exc:
                    self.exc_msg = 'failed Cloudflare protection'
                elif 'Max retries exceeded with url' in exc:
                    self.exc_msg = 'Max retries exceeded'
                else:
                    self.exc_msg = 'failed - %s' % exc
            # g.log('%s %s' % (request.url, self.exc_msg), 'notice')
            return response_err

    def _check_redirect(self, src, response):
        """Decide whether a 301/302 ``response`` should be followed.

        NOTE(review): as written this always returns False. The innermost
        branch of the original code was a bare ``dest`` expression (a no-op),
        so _head never follows redirects. Returning ``dest`` instead would
        enable that branch in _head. Review it before doing so: _head re-checks
        the redirect target's *domain*, so a same-domain https redirect could
        recurse without end.
        """
        if response.status_code in [301, 302]:
            redirect_url = response.headers['Location']
            if (not redirect_url.endswith('127.0.0.1')
                    and not redirect_url.endswith('localhost')
                    and response.url != redirect_url):
                dest = redirect_url
                src_clean = re.sub(r'https?://', '', src)
                dest_clean = re.sub(r'https?://', '', _get_domain(dest))
                if src_clean != dest_clean or 'https://' in dest:
                    pass  # originally a no-op `dest` expression; see docstring
        return False

    def _head(self, url):
        """HEAD-check the domain of ``url``, caching the result in _head_checks.

        Error statuses, Cloudflare challenges and exceptions are all reported
        as 200, so only a cached ``False`` yields 500.

        :return: ``(url, status_code)``.
        """
        if self.skip_head:
            return (url, 200)

        (url, head_check) = _get_head_check(url)
        if head_check:
            return (url, 200)
        elif head_check is False:
            return (url, 500)

        url = _get_domain(url)
        # g.log('HEAD: %s' % url, 'debug')
        request = lambda _: self._request.head(url, timeout=2)
        request.url = url

        try:
            response = self._request_core(request, sequental=False)
            if _is_cloudflare_iuam_challenge(response, allow_empty_body=True):
                response = lambda: None
                response.url = url
                response.status_code = 200
            if response.status_code >= 400:
                response = lambda: None
                response.url = url
                response.status_code = 200
        except Exception:
            response = lambda: None
            response.url = url
            response.status_code = 200

        try:
            head_check_key = _get_domain(response.url)
        except Exception:
            # e.g. the 501 stub from _request_core has no `url` attribute.
            response.url = url
            head_check_key = _get_domain(url)

        redirect_url = self._check_redirect(head_check_key, response)
        if redirect_url:
            _head_checks[head_check_key] = redirect_url
            return self._head(redirect_url)

        _head_checks[head_check_key] = response.status_code == 200
        return (response.url, response.status_code)

    def head(self, url):
        """Public wrapper around _head; returns ``(url, status_code)``."""
        return self._head(url)
        # return database.get(self._head, 12, url)

    def find_url(self, urls):
        """Return the first entry of ``urls`` whose base passes the HEAD check.

        :param urls: iterable of UrlParts-like objects (``base``, ``search``,
            ``default_search``).
        :return: a new UrlParts whose base is the checked URL with a trailing
            '/' removed, or None if none passed.
        """
        for url in urls:
            (response_url, status_code) = self.head(url.base)
            if status_code != 200:
                continue
            if response_url.endswith("/"):
                response_url = response_url[:-1]
            return UrlParts(base=response_url, search=url.search, default_search=url.default_search)
        return None

    def get(self, url, headers=None, allow_redirects=True):
        """GET ``url`` through cloudscraper after a HEAD check of its domain.

        :param headers: optional request headers (a fresh dict per call when
            omitted, instead of a shared mutable default).
        :return: see _request_core.
        """
        if headers is None:
            headers = {}
        response = self.head(_get_domain(url))
        if response is None:
            return None
        # (url, status_code) = response
        # if status_code != 200:
        #     return None

        # Parse/unparse round trip normalizes the URL. It is equivalent to the
        # previous field-by-field rebuild from two parses of the same URL.
        # NOTE(review): the naming suggested an intent to rebase onto the
        # HEAD-resolved URL, but that was never wired up.
        url = urlunparse(urlparse(url))
        # g.log('GET: %s' % url)#, 'debug')
        request = lambda x: _get(self._cfscrape, url, headers, self._timeout, allow_redirects, x)
        request.url = url
        return self._request_core(request)

    def post(self, url, data, headers=None):
        """POST ``data`` to ``url`` through cloudscraper.

        NOTE: the rest of this method is cut off in the source listing; only
        the visible part is reproduced here.
        """
        if headers is None:
            headers = {}
        # g.log('POST: %s' % url, 'debug')
        request = lambda _: self._cfscrape.post(url, data, headers=headers, timeout=self._timeout)
        request.url = url
        # ... (listing truncated here)
observer_thread_wrapper.py
Source: observer_thread_wrapper.py
# -*- coding: utf-8 -*-
"""Wrapper for observer registered in ThreadedMolerConnection (old name: ObservableConnection)."""

__author__ = 'Marcin Usielski'
__copyright__ = 'Copyright (C) 2020-2021, Nokia'
__email__ = 'marcin.usielski@nokia.com'

from threading import Thread
from moler.config.loggers import TRACE
from moler.exceptions import CommandFailure, MolerException
import logging
from moler.util import tracked_thread
import threading

try:
    import queue
except ImportError:
    import Queue as queue  # For python 2


class ObserverThreadWrapper(object):
    """Wrapper for observer registered in ThreadedMolerConnection (old name: ObservableConnection).

    Data passed to feed() is queued and delivered to the observer from a
    dedicated daemon thread, until request_stop() is called.
    """

    # Counter used only to give each wrapper thread a distinct name.
    _th_nr = 1

    def __init__(self, observer, observer_self, logger):
        """
        Construct wrapper for observer and start its delivery thread.

        :param observer: observer to wrap.
        :param observer_self: self for observer if observer is method from object or None if observer is a function.
        :param logger: logger to log.
        """
        self._observer = observer
        self._observer_self = observer_self
        self._queue = queue.Queue()
        self._request_end = threading.Event()
        # Seconds to block on the queue before re-checking the stop flag.
        self._timeout_for_get_from_queue = 1
        self.logger = logger
        self._t = Thread(target=self._loop_for_observer, name="ObserverThreadWrapper-{}-{}".format(
            ObserverThreadWrapper._th_nr, observer_self))
        ObserverThreadWrapper._th_nr += 1
        # Thread.setDaemon() is deprecated since Python 3.10; assigning the
        # `daemon` attribute is equivalent and also works on Python 2.6+.
        self._t.daemon = True
        self._t.start()

    def feed(self, data, recv_time):
        """
        Put data here.

        :param data: data to put.
        :param recv_time: time when data is received/read from connection.
        :return: None
        """
        self._queue.put((data, recv_time))

    def request_stop(self):
        """
        Call if you want to stop feed observer.

        Only sets the stop flag; the thread notices it within
        ``_timeout_for_get_from_queue`` seconds and is not joined here.

        :return: None
        """
        self._request_end.set()
        # self._t.join()  # only for debugging to have less active threads.
        if self._t:
            self._t = None

    @tracked_thread.log_exit_exception
    def _loop_for_observer(self):
        """
        Loop to pass data (put by method feed) to observer.

        :return: None
        """
        logging.getLogger("moler_threads").debug("ENTER {}".format(self._observer))
        heartbeat = tracked_thread.report_alive()
        while not self._request_end.is_set():
            if next(heartbeat):
                logging.getLogger("moler_threads").debug("ALIVE")
            try:
                data, timestamp = self._queue.get(True, self._timeout_for_get_from_queue)
                try:
                    self.logger.log(level=TRACE, msg=r'notifying {}({!r})'.format(self._observer, repr(data)))
                except ReferenceError:
                    self._request_end.set()  # self._observer is no more valid.
                try:
                    if self._observer_self:
                        self._observer(self._observer_self, data, timestamp)
                    else:
                        self._observer(data, timestamp)
                except ReferenceError:
                    self._request_end.set()  # self._observer is no more valid.
                except Exception as ex:
                    self._handle_unexpected_error_from_observer(exception=ex, data=data, timestamp=timestamp)
            except queue.Empty:
                pass  # No incoming data within self._timeout_for_get_from_queue
        # Drop references so the observer can be garbage-collected.
        self._observer = None
        self._observer_self = None
        logging.getLogger("moler_threads").debug("EXIT")

    def _handle_unexpected_error_from_observer(self, exception, data, timestamp):
        """Log an exception raised by the observer (hook overridden by subclasses)."""
        self.logger.exception(msg=r'Exception inside: {}({!r}) at {}'.format(self._observer, repr(data), timestamp))


class ObserverThreadWrapperForConnectionObserver(ObserverThreadWrapper):
    """Wrapper variant that converts observer exceptions into Moler exceptions."""

    def _handle_unexpected_error_from_observer(self, exception, data, timestamp):
        """Log the observer's exception and wrap it as CommandFailure/MolerException.

        NOTE: the rest of this method is cut off in the source listing; only
        the visible part is reproduced here.
        """
        self.logger.warning("Unhandled exception from '{} 'caught by ObserverThreadWrapperForConnectionObserver"
                            " (Runner normally). '{}' : '{}'.".format(self._observer_self, exception, repr(exception)))
        ex_msg = "Unexpected exception from {} caught by runner when processing data >>{}<< at '{}':" \
                 " >>>{}<<< -> repr: >>>{}<<<".format(self._observer_self, data, timestamp, exception, repr(exception))
        if self._observer_self.is_command():
            ex = CommandFailure(command=self._observer_self, message=ex_msg)
        else:
            ex = MolerException(ex_msg)
        # ... (listing truncated here)
Learn to run automation tests from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!