Best Python code snippet using tavern
sessions.py
Source:sessions.py
# -*- coding: utf-8 -*-

"""
requests.session
~~~~~~~~~~~~~~~~

This module provides a Session object to manage and persist settings across
requests (cookies, auth, proxies).

"""
import os
from datetime import datetime

try:
    # The ABC aliases were removed from ``collections`` in Python 3.10.
    from collections.abc import Mapping
except ImportError:  # older interpreters without ``collections.abc``
    from collections import Mapping

from .auth import _basic_auth_str
from .compat import cookielib, OrderedDict, urljoin, urlparse, builtin_str
from .cookies import (
    cookiejar_from_dict, extract_cookies_to_jar, RequestsCookieJar, merge_cookies)
from .models import Request, PreparedRequest, DEFAULT_REDIRECT_LIMIT
from .hooks import default_hooks, dispatch_hook
from .utils import to_key_val_list, default_headers, to_native_string
from .exceptions import (
    TooManyRedirects, InvalidSchema, ChunkedEncodingError, ContentDecodingError)
from .structures import CaseInsensitiveDict

from .adapters import HTTPAdapter

from .utils import (
    requote_uri, get_environ_proxies, get_netrc_auth, should_bypass_proxies,
    get_auth_from_url
)

from .status_codes import codes

# formerly defined here, reexposed here for backward compatibility
from .models import REDIRECT_STATI


def merge_setting(request_setting, session_setting, dict_class=OrderedDict):
    """
    Determines appropriate setting for a given request, taking into account the
    explicit setting on that request, and the setting in the session. If a
    setting is a dictionary, they will be merged together using `dict_class`

    :param request_setting: value given explicitly for this request.
    :param session_setting: value configured on the session.
    :param dict_class: mapping type used to build the merged result.
    :return: whichever setting is not ``None``; the request setting when the
        two are not both mappings; otherwise a `dict_class` instance holding
        the session entries overridden by the request entries, with every
        key whose value is ``None`` removed.
    """

    if session_setting is None:
        return request_setting

    if request_setting is None:
        return session_setting

    # Bypass if not a dictionary (e.g. verify)
    if not (
            isinstance(session_setting, Mapping) and
            isinstance(request_setting, Mapping)
    ):
        return request_setting

    merged_setting = dict_class(to_key_val_list(session_setting))
    merged_setting.update(to_key_val_list(request_setting))

    # Remove keys that are set to None. The keys are collected first so the
    # mapping is not mutated while it is being iterated, and they are deleted
    # in place so the result keeps `dict_class` instead of degrading to a
    # plain dict.
    none_keys = [k for (k, v) in merged_setting.items() if v is None]
    for key in none_keys:
        del merged_setting[key]

    return merged_setting


def merge_hooks(request_hooks, session_hooks, dict_class=OrderedDict):
    """
    Properly merges both requests and session hooks.

    This is necessary because when request_hooks == {'response': []}, the
    merge breaks Session hooks entirely.

    :param request_hooks: hooks given for this request.
    :param session_hooks: hooks configured on the session.
    :param dict_class: mapping type handed on to :func:`merge_setting`.
    """
    if session_hooks is None or session_hooks.get('response') == []:
        return request_hooks

    if request_hooks is None or request_hooks.get('response') == []:
        return session_hooks

    return merge_setting(request_hooks, session_hooks, dict_class)


class SessionRedirectMixin(object):
    """Redirect-following helpers mixed into :class:`Session`.

    The methods read ``max_redirects``, ``redirect_cache``, ``cookies`` and
    ``trust_env`` from ``self`` and call ``self.send``.
    """

    def resolve_redirects(self, resp, req, stream=False, timeout=None,
                          verify=True, cert=None, proxies=None):
        """Receives a Response. Returns a generator of Responses."""

        i = 0
        hist = []  # keep track of history

        while resp.is_redirect:
            prepared_request = req.copy()

            if i > 0:
                # Update history and keep track of redirects.
                hist.append(resp)
                new_hist = list(hist)
                resp.history = new_hist

            try:
                resp.content  # Consume socket so it can be released
            except (ChunkedEncodingError, ContentDecodingError, RuntimeError):
                resp.raw.read(decode_content=False)

            if i >= self.max_redirects:
                raise TooManyRedirects('Exceeded %s redirects.' % self.max_redirects)

            # Release the connection back into the pool.
            resp.close()

            url = resp.headers['location']
            method = req.method

            # Handle redirection without scheme (see: RFC 1808 Section 4)
            if url.startswith('//'):
                parsed_rurl = urlparse(resp.url)
                url = '%s:%s' % (parsed_rurl.scheme, url)

            # The scheme should be lower case...
            parsed = urlparse(url)
            url = parsed.geturl()

            # Facilitate relative 'location' headers, as allowed by RFC 7231.
            # (e.g. '/path/to/resource' instead of 'http://domain.tld/path/to/resource')
            # Compliant with RFC3986, we percent encode the url.
            if not urlparse(url).netloc:
                url = urljoin(resp.url, requote_uri(url))
            else:
                url = requote_uri(url)

            prepared_request.url = to_native_string(url)
            # Cache the url, unless it redirects to itself.
            if resp.is_permanent_redirect and req.url != prepared_request.url:
                self.redirect_cache[req.url] = prepared_request.url

            # http://tools.ietf.org/html/rfc7231#section-6.4.4
            if (resp.status_code == codes.see_other and
                    method != 'HEAD'):
                method = 'GET'

            # Do what the browsers do, despite standards...
            # First, turn 302s into GETs.
            if resp.status_code == codes.found and method != 'HEAD':
                method = 'GET'

            # Second, if a POST is responded to with a 301, turn it into a GET.
            # This bizarre behaviour is explained in Issue 1704.
            if resp.status_code == codes.moved and method == 'POST':
                method = 'GET'

            prepared_request.method = method

            # https://github.com/kennethreitz/requests/issues/1084
            if resp.status_code not in (codes.temporary_redirect, codes.permanent_redirect):
                if 'Content-Length' in prepared_request.headers:
                    del prepared_request.headers['Content-Length']

                prepared_request.body = None

            headers = prepared_request.headers
            try:
                del headers['Cookie']
            except KeyError:
                pass

            extract_cookies_to_jar(prepared_request._cookies, prepared_request, resp.raw)
            prepared_request._cookies.update(self.cookies)
            prepared_request.prepare_cookies(prepared_request._cookies)

            # Rebuild auth and proxy information.
            proxies = self.rebuild_proxies(prepared_request, proxies)
            self.rebuild_auth(prepared_request, resp)

            # Override the original request.
            req = prepared_request

            resp = self.send(
                req,
                stream=stream,
                timeout=timeout,
                verify=verify,
                cert=cert,
                proxies=proxies,
                allow_redirects=False,
            )

            extract_cookies_to_jar(self.cookies, prepared_request, resp.raw)

            i += 1
            yield resp

    def rebuild_auth(self, prepared_request, response):
        """
        When being redirected we may want to strip authentication from the
        request to avoid leaking credentials. This method intelligently removes
        and reapplies authentication where possible to avoid credential loss.
        """
        headers = prepared_request.headers
        url = prepared_request.url

        if 'Authorization' in headers:
            # If we get redirected to a new host, we should strip out any
            # authentication headers.
            original_parsed = urlparse(response.request.url)
            redirect_parsed = urlparse(url)

            if (original_parsed.hostname != redirect_parsed.hostname):
                del headers['Authorization']

        # .netrc might have more auth for us on our new host.
        new_auth = get_netrc_auth(url) if self.trust_env else None
        if new_auth is not None:
            prepared_request.prepare_auth(new_auth)

        return

    def rebuild_proxies(self, prepared_request, proxies):
        """
        This method re-evaluates the proxy configuration by considering the
        environment variables. If we are redirected to a URL covered by
        NO_PROXY, we strip the proxy configuration. Otherwise, we set missing
        proxy keys for this URL (in case they were stripped by a previous
        redirect).

        This method also replaces the Proxy-Authorization header where
        necessary.

        :return: a new proxies dict; the ``proxies`` argument is copied, not
            modified.
        """
        headers = prepared_request.headers
        url = prepared_request.url
        scheme = urlparse(url).scheme
        new_proxies = proxies.copy() if proxies is not None else {}

        if self.trust_env and not should_bypass_proxies(url):
            environ_proxies = get_environ_proxies(url)

            proxy = environ_proxies.get(scheme)

            if proxy:
                new_proxies.setdefault(scheme, proxy)

        if 'Proxy-Authorization' in headers:
            del headers['Proxy-Authorization']

        try:
            username, password = get_auth_from_url(new_proxies[scheme])
        except KeyError:
            # No proxy configured for this scheme.
            username, password = None, None

        if username and password:
            headers['Proxy-Authorization'] = _basic_auth_str(username, password)

        return new_proxies


class Session(SessionRedirectMixin):
    """A Requests session.

    Provides cookie persistence, connection-pooling, and configuration.

    Basic Usage::

      >>> import requests
      >>> s = requests.Session()
      >>> s.get('http://httpbin.org/get')
      200
    """

    # Attribute names captured by __getstate__.
    __attrs__ = [
        'headers', 'cookies', 'auth', 'proxies', 'hooks', 'params', 'verify',
        'cert', 'prefetch', 'adapters', 'stream', 'trust_env',
        'max_redirects', 'redirect_cache'
    ]

    def __init__(self):

        #: A case-insensitive dictionary of headers to be sent on each
        #: :class:`Request <Request>` sent from this
        #: :class:`Session <Session>`.
        self.headers = default_headers()

        #: Default Authentication tuple or object to attach to
        #: :class:`Request <Request>`.
        self.auth = None

        #: Dictionary mapping protocol to the URL of the proxy (e.g.
        #: {'http': 'foo.bar:3128'}) to be used on each
        #: :class:`Request <Request>`.
        self.proxies = {}

        #: Event-handling hooks.
        self.hooks = default_hooks()

        #: Dictionary of querystring data to attach to each
        #: :class:`Request <Request>`. The dictionary values may be lists for
        #: representing multivalued query parameters.
        self.params = {}

        #: Stream response content default.
        self.stream = False

        #: SSL Verification default.
        self.verify = True

        #: SSL certificate default.
        self.cert = None

        #: Maximum number of redirects allowed. If the request exceeds this
        #: limit, a :class:`TooManyRedirects` exception is raised.
        self.max_redirects = DEFAULT_REDIRECT_LIMIT

        #: Should we trust the environment?
        self.trust_env = True

        #: A CookieJar containing all currently outstanding cookies set on this
        #: session. By default it is a
        #: :class:`RequestsCookieJar <requests.cookies.RequestsCookieJar>`, but
        #: may be any other ``cookielib.CookieJar`` compatible object.
        self.cookies = cookiejar_from_dict({})

        # Default connection adapters.
        self.adapters = OrderedDict()
        self.mount('https://', HTTPAdapter())
        self.mount('http://', HTTPAdapter())

        # Maps a request URL to the URL it permanently redirected to; filled
        # by resolve_redirects() and consulted by send().
        self.redirect_cache = {}

    def __enter__(self):
        """Return the session itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, *args):
        """Close the session when the ``with`` block exits."""
        self.close()

    def prepare_request(self, request):
        """Constructs a :class:`PreparedRequest <PreparedRequest>` for
        transmission and returns it. The :class:`PreparedRequest` has settings
        merged from the :class:`Request <Request>` instance and those of the
        :class:`Session`.

        :param request: :class:`Request` instance to prepare with this
            session's settings.
        """
        cookies = request.cookies or {}

        # Bootstrap CookieJar.
        if not isinstance(cookies, cookielib.CookieJar):
            cookies = cookiejar_from_dict(cookies)

        # Merge with session cookies
        merged_cookies = merge_cookies(
            merge_cookies(RequestsCookieJar(), self.cookies), cookies)

        # Set environment's basic authentication if not explicitly set.
        auth = request.auth
        if self.trust_env and not auth and not self.auth:
            auth = get_netrc_auth(request.url)

        p = PreparedRequest()
        p.prepare(
            method=request.method.upper(),
            url=request.url,
            files=request.files,
            data=request.data,
            json=request.json,
            headers=merge_setting(request.headers, self.headers, dict_class=CaseInsensitiveDict),
            params=merge_setting(request.params, self.params),
            auth=merge_setting(auth, self.auth),
            cookies=merged_cookies,
            hooks=merge_hooks(request.hooks, self.hooks),
        )
        return p

    def request(self, method, url,
                params=None,
                data=None,
                headers=None,
                cookies=None,
                files=None,
                auth=None,
                timeout=None,
                allow_redirects=True,
                proxies=None,
                hooks=None,
                stream=None,
                verify=None,
                cert=None,
                json=None):
        """Constructs a :class:`Request <Request>`, prepares it and sends it.
        Returns :class:`Response <Response>` object.

        :param method: method for the new :class:`Request` object.
        :param url: URL for the new :class:`Request` object.
        :param params: (optional) Dictionary or bytes to be sent in the query
            string for the :class:`Request`.
        :param data: (optional) Dictionary or bytes to send in the body of the
            :class:`Request`.
        :param json: (optional) json to send in the body of the
            :class:`Request`.
        :param headers: (optional) Dictionary of HTTP Headers to send with the
            :class:`Request`.
        :param cookies: (optional) Dict or CookieJar object to send with the
            :class:`Request`.
        :param files: (optional) Dictionary of ``'filename': file-like-objects``
            for multipart encoding upload.
        :param auth: (optional) Auth tuple or callable to enable
            Basic/Digest/Custom HTTP Auth.
        :param timeout: (optional) How long to wait for the server to send
            data before giving up, as a float, or a (`connect timeout, read
            timeout <user/advanced.html#timeouts>`_) tuple.
        :type timeout: float or tuple
        :param allow_redirects: (optional) Set to True by default.
        :type allow_redirects: bool
        :param proxies: (optional) Dictionary mapping protocol to the URL of
            the proxy.
        :param stream: (optional) whether to immediately download the response
            content. Defaults to ``False``.
        :param verify: (optional) if ``True``, the SSL cert will be verified.
            A CA_BUNDLE path can also be provided.
        :param cert: (optional) if String, path to ssl client cert file (.pem).
            If Tuple, ('cert', 'key') pair.
        """

        method = builtin_str(method)

        # Create the Request.
        req = Request(
            method=method.upper(),
            url=url,
            headers=headers,
            files=files,
            data=data or {},
            json=json,
            params=params or {},
            auth=auth,
            cookies=cookies,
            hooks=hooks,
        )
        prep = self.prepare_request(req)

        proxies = proxies or {}

        settings = self.merge_environment_settings(
            prep.url, proxies, stream, verify, cert
        )

        # Send the request.
        send_kwargs = {
            'timeout': timeout,
            'allow_redirects': allow_redirects,
        }
        send_kwargs.update(settings)
        resp = self.send(prep, **send_kwargs)

        return resp

    # The docstrings below are raw strings so that ``\*\*kwargs`` is not
    # parsed as an (invalid) escape sequence; the resulting text is unchanged.

    def get(self, url, **kwargs):
        r"""Sends a GET request. Returns :class:`Response` object.

        :param url: URL for the new :class:`Request` object.
        :param \*\*kwargs: Optional arguments that ``request`` takes.
        """

        kwargs.setdefault('allow_redirects', True)
        return self.request('GET', url, **kwargs)

    def options(self, url, **kwargs):
        r"""Sends a OPTIONS request. Returns :class:`Response` object.

        :param url: URL for the new :class:`Request` object.
        :param \*\*kwargs: Optional arguments that ``request`` takes.
        """

        kwargs.setdefault('allow_redirects', True)
        return self.request('OPTIONS', url, **kwargs)

    def head(self, url, **kwargs):
        r"""Sends a HEAD request. Returns :class:`Response` object.

        :param url: URL for the new :class:`Request` object.
        :param \*\*kwargs: Optional arguments that ``request`` takes.
        """

        kwargs.setdefault('allow_redirects', False)
        return self.request('HEAD', url, **kwargs)

    def post(self, url, data=None, json=None, **kwargs):
        r"""Sends a POST request. Returns :class:`Response` object.

        :param url: URL for the new :class:`Request` object.
        :param data: (optional) Dictionary, bytes, or file-like object to send in the body of the :class:`Request`.
        :param json: (optional) json to send in the body of the :class:`Request`.
        :param \*\*kwargs: Optional arguments that ``request`` takes.
        """

        return self.request('POST', url, data=data, json=json, **kwargs)

    def put(self, url, data=None, **kwargs):
        r"""Sends a PUT request. Returns :class:`Response` object.

        :param url: URL for the new :class:`Request` object.
        :param data: (optional) Dictionary, bytes, or file-like object to send in the body of the :class:`Request`.
        :param \*\*kwargs: Optional arguments that ``request`` takes.
        """

        return self.request('PUT', url, data=data, **kwargs)

    def patch(self, url, data=None, **kwargs):
        r"""Sends a PATCH request. Returns :class:`Response` object.

        :param url: URL for the new :class:`Request` object.
        :param data: (optional) Dictionary, bytes, or file-like object to send in the body of the :class:`Request`.
        :param \*\*kwargs: Optional arguments that ``request`` takes.
        """

        return self.request('PATCH', url, data=data, **kwargs)

    def delete(self, url, **kwargs):
        r"""Sends a DELETE request. Returns :class:`Response` object.

        :param url: URL for the new :class:`Request` object.
        :param \*\*kwargs: Optional arguments that ``request`` takes.
        """

        return self.request('DELETE', url, **kwargs)

    def send(self, request, **kwargs):
        """Send a given PreparedRequest.

        :param request: the :class:`PreparedRequest` to send.
        :raises ValueError: if ``request`` is not a :class:`PreparedRequest`.
        """
        # Set defaults that the hooks can utilize to ensure they always have
        # the correct parameters to reproduce the previous request.
        kwargs.setdefault('stream', self.stream)
        kwargs.setdefault('verify', self.verify)
        kwargs.setdefault('cert', self.cert)
        kwargs.setdefault('proxies', self.proxies)

        # It's possible that users might accidentally send a Request object.
        # Guard against that specific failure case.
        if not isinstance(request, PreparedRequest):
            raise ValueError('You can only send PreparedRequests.')

        # Follow cached permanent redirects; `checked_urls` stops the walk if
        # the cache contains a cycle.
        checked_urls = set()
        while request.url in self.redirect_cache:
            checked_urls.add(request.url)
            new_url = self.redirect_cache.get(request.url)
            if new_url in checked_urls:
                break
            request.url = new_url

        # Set up variables needed for resolve_redirects and dispatching of hooks
        allow_redirects = kwargs.pop('allow_redirects', True)
        stream = kwargs.get('stream')
        timeout = kwargs.get('timeout')
        verify = kwargs.get('verify')
        cert = kwargs.get('cert')
        proxies = kwargs.get('proxies')
        hooks = request.hooks

        # Get the appropriate adapter to use
        adapter = self.get_adapter(url=request.url)

        # Start time (approximately) of the request
        start = datetime.utcnow()

        # Send the request
        r = adapter.send(request, **kwargs)

        # Total elapsed time of the request (approximately)
        r.elapsed = datetime.utcnow() - start

        # Response manipulation hooks
        r = dispatch_hook('response', hooks, r, **kwargs)

        # Persist cookies
        if r.history:

            # If the hooks create history then we want those cookies too
            for resp in r.history:
                extract_cookies_to_jar(self.cookies, resp.request, resp.raw)

        extract_cookies_to_jar(self.cookies, request, r.raw)

        # Redirect resolving generator.
        gen = self.resolve_redirects(r, request,
                                     stream=stream,
                                     timeout=timeout,
                                     verify=verify,
                                     cert=cert,
                                     proxies=proxies)

        # Resolve redirects if allowed.
        history = list(gen) if allow_redirects else []

        # Shuffle things around if there's history.
        if history:
            # Insert the first (original) request at the start
            history.insert(0, r)
            # Get the last request made
            r = history.pop()
            r.history = history

        if not stream:
            # Accessing `.content` reads the body now.
            r.content

        return r

    def merge_environment_settings(self, url, proxies, stream, verify, cert):
        """Check the environment and merge it with some settings.

        Note that ``proxies`` is updated in place with environment proxies
        when ``trust_env`` is set.

        :return: dict with the keys ``verify``, ``proxies``, ``stream`` and
            ``cert``.
        """
        # Gather clues from the surrounding environment.
        if self.trust_env:
            # Set environment's proxies.
            env_proxies = get_environ_proxies(url) or {}
            for (k, v) in env_proxies.items():
                proxies.setdefault(k, v)

            # Look for requests environment configuration and be compatible
            # with cURL.
            if verify is True or verify is None:
                verify = (os.environ.get('REQUESTS_CA_BUNDLE') or
                          os.environ.get('CURL_CA_BUNDLE'))

        # Merge all the kwargs.
        proxies = merge_setting(proxies, self.proxies)
        stream = merge_setting(stream, self.stream)
        verify = merge_setting(verify, self.verify)
        cert = merge_setting(cert, self.cert)

        return {'verify': verify, 'proxies': proxies, 'stream': stream,
                'cert': cert}

    def get_adapter(self, url):
        """Returns the appropriate connection adapter for the given URL.

        :raises InvalidSchema: if no mounted prefix matches ``url``.
        """
        for (prefix, adapter) in self.adapters.items():

            if url.lower().startswith(prefix):
                return adapter

        # Nothing matches :-/
        raise InvalidSchema("No connection adapters were found for '%s'" % url)

    def close(self):
        """Closes all adapters and as such the session"""
        for v in self.adapters.values():
            v.close()

    def mount(self, prefix, adapter):
        """Registers a connection adapter to a prefix.

        Adapters are sorted in descending order by key length."""

        self.adapters[prefix] = adapter
        # Re-insert every shorter prefix so it ends up after the new one in
        # the ordered mapping.
        keys_to_move = [k for k in self.adapters if len(k) < len(prefix)]

        for key in keys_to_move:
            self.adapters[key] = self.adapters.pop(key)

    def __getstate__(self):
        """Return a dict of the attributes named in ``__attrs__``; attributes
        that are not set are stored as ``None``."""
        return dict((attr, getattr(self, attr, None)) for attr in self.__attrs__)

    def __setstate__(self, state):
        """Restore every attribute stored in ``state``."""
        for attr, value in state.items():
            setattr(self, attr, value)


def session():
    """Returns a :class:`Session` for context-management."""
    # NOTE(review): the snippet is cut off at this point in SOURCE ("...");
    # the remainder of this function is not visible here. Restore it from the
    # complete file before relying on this function.
    ...
signer.py
Source:signer.py
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.

import sys

import requests

OPENSEARCH_SERVICE = "es"

PY3 = sys.version_info[0] == 3

if PY3:
    from urllib.parse import parse_qs, urlencode, urlparse


def fetch_url(prepared_request):  # type: ignore
    """
    Rebuild the URL of a request from its parsed parts and its headers.

    The path falls back to "/" when empty, the query string is re-encoded
    (blank values are kept), and the host comes from the ``Host`` header
    (matched case-insensitively) when present, otherwise from the URL.

    :param prepared_request: unsigned request
    :return: reconstructed url
    """
    parts = urlparse(prepared_request.url)
    resource_path = parts.path or "/"

    # Re-encode the query string only when the request has one.
    if parts.query:
        query_args = parse_qs(parts.query, keep_blank_values=True)
        query_suffix = "?" + urlencode(query_args, doseq=True)
    else:
        query_suffix = ""

    # Header names are lower-cased so "Host" is found whatever its casing.
    lowered_headers = {}
    for name, value in prepared_request.headers.items():
        lowered_headers[name.lower()] = value
    authority = lowered_headers.get("host") or parts.netloc

    return parts.scheme + "://" + authority + resource_path + query_suffix


class AWSV4SignerAuth(requests.auth.AuthBase):
    """
    AWS V4 Request Signer for Requests.
    """

    def __init__(self, credentials, region):  # type: ignore
        """
        Store the credentials and region used for signing.

        :raises ValueError: if ``credentials`` or ``region`` is empty.
        """
        if not credentials:
            raise ValueError("Credentials cannot be empty")
        self.credentials = credentials

        if not region:
            raise ValueError("Region cannot be empty")
        self.region = region

    def __call__(self, request):  # type: ignore
        """Sign ``request`` and return the result of ``_sign_request``."""
        signed = self._sign_request(request)  # type: ignore
        return signed

    def _sign_request(self, prepared_request):  # type: ignore
        """
        Sign the request by injecting the headers produced by SigV4 signing.

        :param prepared_request: unsigned request
        :return: signed request
        """
        # botocore is imported here, at call time, not at module import.
        from botocore.auth import SigV4Auth
        from botocore.awsrequest import AWSRequest

        target_url = fetch_url(prepared_request)  # type: ignore

        # Build an AWS request mirroring the outgoing one and sign it.
        signable = AWSRequest(
            method=prepared_request.method.upper(),
            url=target_url,
            data=prepared_request.body,
        )
        signer = SigV4Auth(self.credentials, OPENSEARCH_SERVICE, self.region)
        signer.add_auth(signable)

        # Copy the headers of the signed AWS request onto the outgoing request.
        signed_headers = dict(signable.headers.items())
        prepared_request.headers.update(signed_headers)

        # NOTE(review): the snippet is cut off at this point in SOURCE ("...").
        # The docstring promises the signed request is returned; restore the
        # remainder of this method from the complete file.
        ...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!