How to use _request_end method in Molotov

Best Python code snippet using molotov_python

request.py

Source: request.py Github

copy

Full Screen

# -*- coding: utf-8 -*-
"""HTTP helper used by the scrapers: cached HEAD checks plus GET/POST via cloudscraper."""

import threading
import time
import traceback
import re

from resources.lib.modules.globals import g
from resources.lib.modules.scrapers import source_utils
from resources.lib.modules.scrapers.common_types import UrlParts
from resources.lib.modules.scrapers.third_party.cloudscraper import cloudscraper
from requests.compat import urlparse, urlunparse

# Module-level cache of HEAD results, keyed by "<scheme>://<netloc>".
# A value is either a bool (True when the HEAD check ended with status 200)
# or a string holding the URL the key redirected to.
_head_checks = {}


class _PlaceholderResponse(object):
    """Minimal stand-in for a response object when no usable response exists.

    Only ``status_code`` is always present; ``url`` is set only when given.
    """

    def __init__(self, status_code, url=None):
        self.status_code = status_code
        if url is not None:
            self.url = url


def _get(cfscrape, url, headers, timeout, allow_redirects, update_options_fn):
    """Issue a GET through ``cfscrape.request``.

    :param cfscrape: object exposing ``request(**options)``.
    :param url: URL to fetch.
    :param headers: headers passed with the request.
    :param timeout: timeout passed with the request.
    :param allow_redirects: forwarded as the ``allow_redirects`` option.
    :param update_options_fn: optional callable that receives the options dict
        and may modify it in place before the request is sent; may be None.
    :return: whatever ``cfscrape.request`` returns.
    """
    request_options = {
        'method': 'GET',
        'url': url,
        'headers': headers,
        'timeout': timeout,
        'allow_redirects': allow_redirects,
    }
    if update_options_fn is not None:
        update_options_fn(request_options)
    return cfscrape.request(**request_options)


def _is_cloudflare_iuam_challenge(resp, allow_empty_body=False):
    """Tell whether ``resp`` looks like a Cloudflare IUAM challenge page.

    The response must have a ``Server`` header starting with ``cloudflare``,
    a 429 or 503 status and, unless ``allow_empty_body`` is set, a body that
    matches the challenge form pattern.

    :param resp: response object; objects lacking the expected attributes
        are treated as "not a challenge".
    :param allow_empty_body: skip the body check when True.
    :return: a truthy value for a challenge, a falsy value otherwise.
    """
    try:
        return (
            resp.headers.get('Server', '').startswith('cloudflare')
            and resp.status_code in [429, 503]
            and (allow_empty_body or re.search(
                r'action="/.*?__cf_chl_jschl_tk__=\S+".*?name="jschl_vc"\svalue=.*?',
                resp.text,
                re.M | re.DOTALL
            ))
        )
    except AttributeError:
        # Placeholder responses have no headers/text.
        pass
    return False


def _get_domain(url):
    """Return ``<scheme>://<netloc>`` for ``url``, defaulting the scheme to https."""
    parsed_url = urlparse(url)
    scheme = parsed_url.scheme if parsed_url.scheme != '' else 'https'
    return "%s://%s" % (scheme, parsed_url.netloc)


def _get_head_check(url):
    """Look up the cached HEAD result for ``url``, following cached redirects.

    :return: ``(url, result)`` where ``result`` is the cached bool, or None
        when nothing is cached for the (possibly redirected) URL.
    """
    result = _head_checks.get(url, None)
    if isinstance(result, bool):
        return (url, result)
    elif result is not None:
        # The cached value is a redirect target; resolve it in turn.
        return _get_head_check(result)
    return (url, None)


class Request(object):
    """Performs HEAD/GET/POST requests and records timing and error details.

    After a request, ``exc_msg`` holds a short failure description (empty on
    success) and ``request_time`` the rounded duration in seconds.
    """

    def __init__(self, sequental=False, timeout=None, wait=1):
        """
        :param sequental: default mode used by ``_request_core`` when the
            caller does not pass one explicitly.
        :param timeout: request timeout; 10 is used when None.
        :param wait: stored as ``_wait``; only referenced by the disabled
            sequential code path in ``_request_core``.
        """
        self._request = source_utils.randomUserAgentRequests()
        self._cfscrape = cloudscraper.create_scraper(interpreter='native')
        self._sequental = sequental
        self._wait = wait
        self._should_wait = False
        self._lock = threading.Lock()
        self._timeout = 10
        if timeout is not None:
            self._timeout = timeout
        self.exc_msg = ''
        self.skip_head = False
        self.request_time = 99

    def _verify_response(self, response):
        """Raise if ``response`` has a status code of 400 or above.

        ``exc_msg`` is filled in before raising so that the handler in
        ``_request_core`` keeps it instead of deriving one from the traceback.
        """
        if response.status_code >= 400:
            self.exc_msg = 'response status code %s' % response.status_code
            if response.status_code in [429, 503]:
                self.exc_msg = '%s (probably Cloudflare)' % self.exc_msg
            raise Exception()

    def _request_core(self, request, sequental=None, cf_retries=3):
        """Run ``request`` and translate failures into ``exc_msg``.

        :param request: callable taking one argument (None is passed) and
            returning a response; it must also carry a ``url`` attribute.
        :param sequental: overrides the instance default when not None.
        :param cf_retries: remaining retries for the "new Cloudflare
            challenge" failure.
        :return: the response on success; on failure the failing response
            if one was received, otherwise a placeholder with status 501.
            NOTE: when ``sequental`` is not False nothing is executed and
            None is returned, because the sequential path is commented out.
        """
        self.exc_msg = ''
        if sequental is None:
            sequental = self._sequental
        response_err = _PlaceholderResponse(501)
        try:
            response = None
            if sequental is False:
                self._request_start = time.time()
                response = request(None)
                self._request_end = time.time()
                self.request_time = round(self._request_end - self._request_start)
                # From here on a failure should hand back the real response.
                response_err = response
                self._verify_response(response)
                return response
            # with self._lock:
            #     if self._should_wait:
            #         time.sleep(self._wait)
            #     self._should_wait = True
            #     self._request_start = time.time()
            #     response = request(_update_request_options)
            #     self._request_end = time.time()
            #     self.request_time = round(self._request_end - self._request_start)
            #
            # response_err = response
            # self._verify_response(response)
            #
            # try:
            #     if self.exc_msg == '' and response.request.headers.get('X-Domain', None) is not None:
            #         _save_cf_cookies(self._cfscrape, response)
            # except: pass
            #
            # return response
        except Exception:
            self._request_end = time.time()
            self.request_time = round(self._request_end - self._request_start)
            if self.exc_msg == '':
                exc = traceback.format_exc(limit=1)
                # One chain, so a timeout message is not overwritten by the
                # generic fallback below.
                if 'ConnectTimeout' in exc or 'ReadTimeout' in exc:
                    self.exc_msg = 'request timed out'
                elif 'Detected the new Cloudflare challenge.' in exc and cf_retries > 0 and self.request_time < 2:
                    cf_retries -= 1
                    g.log('cf_new_challenge_retry: %s' % request.url, 'notice')
                    return self._request_core(request, sequental, cf_retries)
                elif 'Cloudflare' in exc or '!!Loop Protection!!' in exc:
                    self.exc_msg = 'failed Cloudflare protection'
                elif 'Max retries exceeded with url' in exc:
                    self.exc_msg = 'Max retries exceeded'
                else:
                    self.exc_msg = 'failed - %s' % exc
            # g.log('%s %s' % (request.url, self.exc_msg), 'notice')
            return response_err

    def _check_redirect(self, src, response):
        """Return the redirect target of ``response`` if it should be followed.

        :param src: ``<scheme>://<netloc>`` the HEAD request was made for.
        :param response: response to inspect.
        :return: the ``Location`` URL for a 301/302 that points away from
            localhost and to a different domain or to https; False otherwise.
        """
        if response.status_code in [301, 302]:
            redirect_url = response.headers['Location']
            points_to_local = redirect_url.endswith('127.0.0.1') or redirect_url.endswith('localhost')
            if not points_to_local and response.url != redirect_url:
                dest = redirect_url
                src_clean = re.sub(r'https?://', '', src)
                dest_clean = re.sub(r'https?://', '', _get_domain(dest))
                if src_clean != dest_clean or 'https://' in dest:
                    return dest
        return False

    def _head(self, url):
        """HEAD-check the domain of ``url``, using and updating the cache.

        :return: ``(url, status_code)``; 200 for a usable domain, 500 for a
            domain cached as failing.
        """
        if self.skip_head:
            return (url, 200)
        (url, head_check) = _get_head_check(url)
        if head_check:
            return (url, 200)
        elif head_check is False:
            return (url, 500)
        url = _get_domain(url)
        # g.log('HEAD: %s' % url, 'debug')
        request = lambda _: self._request.head(url, timeout=2)
        request.url = url
        try:
            response = self._request_core(request, sequental=False)
            # Challenges, error statuses and request failures are all
            # reported as 200 here.
            if _is_cloudflare_iuam_challenge(response, allow_empty_body=True):
                response = _PlaceholderResponse(200, url)
            if response.status_code >= 400:
                response = _PlaceholderResponse(200, url)
        except Exception:
            response = _PlaceholderResponse(200, url)
        try:
            head_check_key = _get_domain(response.url)
        except Exception:
            response.url = url
            head_check_key = _get_domain(url)
        redirect_url = self._check_redirect(head_check_key, response)
        if redirect_url:
            # Remember the redirect so later lookups resolve straight to it.
            _head_checks[head_check_key] = redirect_url
            return self._head(redirect_url)
        _head_checks[head_check_key] = response.status_code == 200
        return (response.url, response.status_code)

    def head(self, url):
        """Public wrapper around ``_head``; returns ``(url, status_code)``."""
        return self._head(url)
        # return database.get(self._head, 12, url)

    def find_url(self, urls):
        """Return a ``UrlParts`` for the first entry whose base URL passes the HEAD check.

        :param urls: iterable of objects with ``base``, ``search`` and
            ``default_search`` attributes.
        :return: ``UrlParts`` with the trailing slash stripped from the base,
            or None when no entry yields status 200.
        """
        for url in urls:
            (response_url, status_code) = self.head(url.base)
            if status_code != 200:
                continue
            if response_url.endswith("/"):
                response_url = response_url[:-1]
            return UrlParts(base=response_url, search=url.search, default_search=url.default_search)
        return None

    def get(self, url, headers=None, allow_redirects=True):
        """HEAD-check the domain of ``url`` and then GET it through cloudscraper.

        :param url: URL to fetch.
        :param headers: optional request headers; an empty dict is used when None.
        :param allow_redirects: forwarded to the underlying request.
        :return: the result of ``_request_core``, or None when the HEAD check
            returns None.
        """
        if headers is None:
            # Fresh dict per call instead of a shared mutable default.
            headers = {}
        parsed_url = urlparse(url)
        response = self.head(_get_domain(url))
        if response is None:
            return None
        # (url, status_code) = response
        # if status_code != 200:
        #     return None
        resolved_url = urlparse(url)
        url = urlunparse(
            (
                resolved_url.scheme,
                resolved_url.netloc,
                parsed_url.path,
                parsed_url.params,
                parsed_url.query,
                parsed_url.fragment,
            )
        )
        # g.log('GET: %s' % url)#, 'debug')
        request = lambda x: _get(self._cfscrape, url, headers, self._timeout, allow_redirects, x)
        request.url = url
        return self._request_core(request)

    def post(self, url, data, headers={}):
        # g.log('POST: %s' % url, 'debug')
        request = lambda _: self._cfscrape.post(url, data, headers=headers, timeout=self._timeout)
        request.url = url
        # NOTE: the snippet is truncated at this point in the source; the
        # remainder of post() is not visible and has not been reconstructed.

Full Screen

Full Screen

observer_thread_wrapper.py

Source: observer_thread_wrapper.py Github

copy

Full Screen

# -*- coding: utf-8 -*-
"""Wrapper for observer registered in ThreadedMolerConnection (old name: ObservableConnection)."""

__author__ = 'Marcin Usielski'
__copyright__ = 'Copyright (C) 2020-2021, Nokia'
__email__ = 'marcin.usielski@nokia.com'

from threading import Thread
from moler.config.loggers import TRACE
from moler.exceptions import CommandFailure, MolerException
import logging
from moler.util import tracked_thread
import threading

try:
    import queue
except ImportError:
    import Queue as queue  # For python 2


class ObserverThreadWrapper(object):
    """Wrapper for observer registered in ThreadedMolerConnection (old name: ObservableConnection)."""

    # Counter used only to give each wrapper thread a distinct name.
    _th_nr = 1

    def __init__(self, observer, observer_self, logger):
        """
        Construct wrapper for observer and start its worker thread.

        :param observer: observer to wrap.
        :param observer_self: self for observer if observer is method from object or None if observer is a function.
        :param logger: logger to log.
        """
        self._observer = observer
        self._observer_self = observer_self
        self._queue = queue.Queue()
        self._request_end = threading.Event()
        self._timeout_for_get_from_queue = 1
        self.logger = logger
        self._t = Thread(target=self._loop_for_observer, name="ObserverThreadWrapper-{}-{}".format(
            ObserverThreadWrapper._th_nr, observer_self))
        ObserverThreadWrapper._th_nr += 1
        # The attribute replaces the deprecated Thread.setDaemon().
        self._t.daemon = True
        self._t.start()

    def feed(self, data, recv_time):
        """
        Put data here.

        :param data: data to put.
        :param recv_time: time when data is received/read from connection.
        :return: None
        """
        self._queue.put((data, recv_time))

    def request_stop(self):
        """
        Call if you want to stop feed observer.

        Sets the stop event and drops the reference to the worker thread; the
        thread is not joined.

        :return: None
        """
        self._request_end.set()
        # self._t.join()  # only for debugging to have less active threads.
        if self._t:
            self._t = None

    @tracked_thread.log_exit_exception
    def _loop_for_observer(self):
        """
        Loop to pass data (put by method feed) to observer.

        Runs until the stop event is set, then clears the observer references.

        :return: None
        """
        logging.getLogger("moler_threads").debug("ENTER {}".format(self._observer))
        heartbeat = tracked_thread.report_alive()
        while not self._request_end.is_set():
            if next(heartbeat):
                logging.getLogger("moler_threads").debug("ALIVE")
            try:
                # Blocking get with a timeout so the stop event is re-checked regularly.
                data, timestamp = self._queue.get(True, self._timeout_for_get_from_queue)
                try:
                    self.logger.log(level=TRACE, msg=r'notifying {}({!r})'.format(self._observer, repr(data)))
                except ReferenceError:
                    self._request_end.set()  # self._observer is no more valid.
                try:
                    if self._observer_self:
                        self._observer(self._observer_self, data, timestamp)
                    else:
                        self._observer(data, timestamp)
                except ReferenceError:
                    self._request_end.set()  # self._observer is no more valid.
                except Exception as ex:
                    self._handle_unexpected_error_from_observer(exception=ex, data=data, timestamp=timestamp)
            except queue.Empty:
                pass  # No incoming data within self._timeout_for_get_from_queue
        self._observer = None
        self._observer_self = None
        logging.getLogger("moler_threads").debug("EXIT")

    def _handle_unexpected_error_from_observer(self, exception, data, timestamp):
        """
        Log an exception raised by the observer, including the traceback.

        :param exception: exception raised by the observer.
        :param data: data that was being passed to the observer.
        :param timestamp: receive time of that data.
        :return: None
        """
        self.logger.exception(msg=r'Exception inside: {}({!r}) at {}'.format(self._observer, repr(data), timestamp))


class ObserverThreadWrapperForConnectionObserver(ObserverThreadWrapper):
    def _handle_unexpected_error_from_observer(self, exception, data, timestamp):
        self.logger.warning("Unhandled exception from '{}' caught by ObserverThreadWrapperForConnectionObserver"
                            " (Runner normally). '{}' : '{}'.".format(self._observer_self, exception, repr(exception)))
        ex_msg = "Unexpected exception from {} caught by runner when processing data >>{}<< at '{}':" \
                 " >>>{}<<< -> repr: >>>{}<<<".format(self._observer_self, data, timestamp, exception, repr(exception))
        if self._observer_self.is_command():
            ex = CommandFailure(command=self._observer_self, message=ex_msg)
        else:
            ex = MolerException(ex_msg)
        # NOTE: the snippet is truncated at this point in the source; what is
        # done with ``ex`` is not visible and has not been reconstructed.

Full Screen

Full Screen

Blogs

Check out the latest blogs from LambdaTest on this topic:

Testing in Production: A Detailed Guide

When most firms employed a waterfall development model, it was widely joked about in the industry that Google kept its products in beta forever. Google has been a pioneer in making the case for in-production testing. Traditionally, before a build could go live, a tester was responsible for testing all scenarios, both defined and extempore, in a testing environment. However, this concept is evolving on multiple fronts today. For example, the tester is no longer testing alone. Developers, designers, build engineers, other stakeholders, and end users, both inside and outside the product team, are testing the product and providing feedback.

Dec’22 Updates: The All-New LT Browser 2.0, XCUI App Automation with HyperExecute, And More!

Greetings folks! With the new year finally upon us, we’re excited to announce a collection of brand-new product updates. At LambdaTest, we strive to provide you with a comprehensive test orchestration and execution platform to ensure the ultimate web and mobile experience.

An Interactive Guide To CSS Hover Effects

Building a website is all about keeping the user experience in mind. Ultimately, it’s about providing visitors with a mind-blowing experience so they’ll keep coming back. One way to ensure visitors have a great time on your site is to add some eye-catching text or image animations.

Fault-Based Testing and the Pesticide Paradox

In some sense, testing can be more difficult than coding, as validating the efficiency of the test cases (i.e., the ‘goodness’ of your tests) can be much harder than validating code correctness. In practice, the tests are just executed without any validation beyond the pass/fail verdict. By contrast, the code is (hopefully) always validated by testing. After designing and executing the test cases, the only result is that some tests have passed and others have failed. Testers do not know much about how many bugs remain in the code, nor about the bug-revealing efficiency of their tests.

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Molotov automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful