Best Python code snippet using playwright-python
test_wsgi_apps.py
Source:test_wsgi_apps.py
...32 with self.assertRaisesRegex(AssertionError, r'expect true-value'):33 self.response.status = 40434 self.assertIs(self.response.status, consts.Statuses.BAD_REQUEST)35 def test_headers(self):36 def assert_headers(headers):37 self.assertEqual(self.response.headers, headers)38 # Test methods of _Response.Headers class.39 self.assertEqual(sorted(self.response.headers), sorted(headers))40 self.assertEqual(bool(self.response.headers), bool(headers))41 self.assertEqual(len(self.response.headers), len(headers))42 for key in headers:43 with self.subTest(key):44 self.assertIn(key, self.response.headers)45 self.assertEqual(self.response.headers[key], headers[key])46 self.assertNotIn('no-such-key', self.response.headers)47 assert_headers({})48 self.response.headers['p'] = 'q'49 self.response.headers['r'] = 's'50 assert_headers({'p': 'q', 'r': 's'})51 del self.response.headers['r']52 assert_headers({'p': 'q'})53 self.response.commit()54 assert_headers({'p': 'q'})55 with self.assertRaisesRegex(AssertionError, r'expect true-value'):56 self.response.headers['x'] = 'y'57 with self.assertRaisesRegex(AssertionError, r'expect true-value'):58 del self.response.headers['x']59 assert_headers({'p': 'q'})60 self.response.close()61 assert_headers({'p': 'q'})62 with self.assertRaisesRegex(AssertionError, r'expect true-value'):63 self.response.headers['x'] = 'y'64 with self.assertRaisesRegex(AssertionError, r'expect true-value'):65 del self.response.headers['x']66 assert_headers({'p': 'q'})67 @kernels.with_kernel68 def test_reset(self):69 self.response.status = 40070 self.response.headers['p'] = 'q'71 kernels.run(self.response.write(b'hello world'))72 self.response.reset()73 self.assertIs(self.response.status, consts.Statuses.OK)74 self.assertEqual(self.response.headers, {})75 self.assertIsNone(self.response._precommit.read_nonblocking())76 self.response.commit()77 with self.assertRaisesRegex(AssertionError, r'expect true-value'):78 self.response.reset()79 self.response.close()80 with self.assertRaisesRegex(AssertionError, r'expect true-value'):...
utils.py
Source:utils.py
1import datetime2import hashlib3import hmac4from copy import deepcopy5from functools import wraps6from urllib.parse import urlparse, parse_qsl, quote7SCOPE = 'dl1_request'8class DLSigner(object):9 __slots__ = ['service', 'access_key', 'secret_key', 'algorithm', 'solution', 'hash_method']10 def __init__(self, service, access_key, secret_key, algorithm='DL-HMAC-SHA256', solution='RING'):11 """12 Signer initialization, accepts arguments that are constant in13 signing process and not related to specific request14 :param service:15 :param access_key: Key that allows you to access to API16 :param secret_key: Key17 :param algorithm: One of following hashing algorithms:18 * DL-HMAC-SHASHA224,19 * DL-HMAC-SHASHA256, - if algorithm param is missing, used as default value20 * DL-HMAC-SHASHA384,21 * DL-HMAC-SHASHA51222 :param solution: Solution which aggregates a several services.23 """24 assert service, 'Missing service parameter.'25 self.service = service26 assert access_key, 'Missing access_key parameter.'27 self.access_key = access_key28 assert secret_key, 'Missing secret_key parameter'29 self.secret_key = secret_key30 self.solution = solution31 assert algorithm.startswith('DL-HMAC-SHA'), 'Invalid hashing method.'32 self.algorithm = algorithm33 self.hash_method = algorithm.split('-')[-1].lower()34 assert self.hash_method in (35 'sha224', 'sha256', 'sha384', 'sha512'), 'Invalid hashing algorithm.'36 self.hash_method = getattr(hashlib, self.hash_method)37 @staticmethod38 def _check_sign_params(request):39 """Checks params of request dictionary."""40 assert 'headers' in request, 'Missing headers.'41 assert_headers = set(k.lower() for k in request['headers'])42 assert 'host' in assert_headers, 'Missing Host parameter.'43 if 'body' in request:44 assert isinstance(request['body'], bytearray), \45 f'Body must be instance of bytes. not {type(request["body"])}'46 assert 'content-type' in assert_headers47 copied_request = deepcopy(request)48 if 'x-dl-date' not in assert_headers:49 copied_request['headers']['X-DL-Date'] = datetime.datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')50 del assert_headers51 return copied_request52 def _sign(self, key, msg, hex_output=False):53 """Performs hashing, returns digest or hexdigest depending on 'hex_output' argument."""54 key = key if isinstance(key, bytes) else key.encode('utf-8')55 msg = msg if isinstance(msg, bytes) else msg.encode('utf-8')56 sign = hmac.new(key, msg, self.hash_method)57 return sign.digest() if not hex_output else sign.hexdigest()58 def _get_canonical_request(self, request):59 """Return formatted string of canonical request data."""60 method = request['method']61 uri = urlparse(request.get('uri', '/'))62 payload = request.get('body', b'')63 headers = self._get_headers(request)64 params = parse_qsl(uri.query, keep_blank_values=True)65 params = '&'.join('{}={}'.format(quote(k, safe='-_.~'), quote(v, safe='-_.~')) for k, v in sorted(params))66 return "{method}\n{uri}\n{params}\n{canonical_headers}\n{signed_headers}\n{payload_hash}".format(67 method=method,68 uri=quote(uri.path, safe='/-_.~'),69 params=params,70 canonical_headers=headers['canonical_headers'],71 signed_headers=headers['signed_headers'],72 payload_hash=self.hash_method(payload).hexdigest()73 )74 @staticmethod75 def _get_headers(request):76 """Method returning dictionary with formatted strings of canonical_headers and signed_headers."""77 canonical_headers = []78 signed_headers = []79 for header_key, header_value in sorted(request['headers'].items(), key=lambda s: s[0].lower()):80 canonical_headers.append('{}:{}'.format(header_key.lower(), header_value.strip()))81 signed_headers.append(header_key.lower())82 canonical_headers = '\n'.join(canonical_headers)83 signed_headers = ';'.join(signed_headers)84 return {'canonical_headers': canonical_headers,85 'signed_headers': signed_headers}86 def _get_string_to_sign(self, canonical_request, date):87 return "{algorithm}\n{date}\n{scope}\n{canonical_request_hash}".format(88 algorithm=self.algorithm,89 date=date,90 scope=date[:8] + '/' + self.solution + '/' + self.service + '/' + SCOPE,91 canonical_request_hash=self.hash_method(canonical_request.encode('utf-8')).hexdigest()92 )93 def _get_signing_key(self, date):94 key = self._sign('DL' + self.secret_key, date[:8])95 key = self._sign(key, self.solution)96 key = self._sign(key, self.service)97 key = self._sign(key, SCOPE)98 return key99 def _get_signature(self, request):100 """Get_signature is calling other methods to process data to finally101 return a signature. """102 canonical_request = self._get_canonical_request(request)103 string_to_sign = self._get_string_to_sign(canonical_request, request['headers']['X-DL-Date'])104 signing_key = self._get_signing_key(request['headers']['X-DL-Date'])105 signature = self._sign(signing_key, string_to_sign, True)106 return signature107 def sign(self, original_request):108 """109 Signs request and returns dictionary with parameters required for authorization process.110 :param original_request: has to be an instance of dict with keys:111 * method: - with values POST/GET/PUT/DELETE112 * uri: URI of the request. If there is no URI given in request dict,113 program will insert default value of URI.114 * headers: - headers of your requests. This key has to be a dictionary.115 Into headers you have to put 'host' key.116 * payload: - optional.117 :returns: dict:118 """119 request = self._check_sign_params(original_request)120 return {121 'Authorization':122 '{algorithm} Credential={credentials},SignedHeaders={signed_headers},Signature={signature}'.format(123 algorithm=self.algorithm.upper(),124 credentials=self.access_key + '/' + request['headers']['X-DL-Date'][:8] +125 '/' + self.solution + '/'126 + self.service + '/' + SCOPE,127 signed_headers=self._get_headers(request)['signed_headers'],128 signature=self._get_signature(request)129 ),130 'X-DL-Date': request['headers']['X-DL-Date']}131class ClientError(Exception):132 def __init__(self, code, message, data=''):133 self.code = code134 self.message = message135 self.data = data136 def __str__(self):137 return f'code={self.code} message={self.message}' \138 f'{f" data={self.data}" if self.data else ""}'139def attr_generator(cls):140 def getattr(self, item):141 if item.isidentifier() and not item.startswith('_'):142 return cls(item, self)143 raise AttributeError(f'{cls.__name__} must be public method')144 def add_attr(klass):145 klass.__getattr__ = getattr146 return klass147 return add_attr148def convert(func, iterable_type):149 @wraps(func)150 def converted(*args, **kwargs):151 return iterable_type(func(*args, **kwargs))...
test_confluence.py
Source:test_confluence.py
...115 self.api._session = client116 got = self.api.get_author('foo')117 self.assertEqual(got['userKey'], userKey)118class TestConfluenceHeaders(unittest.TestCase):119 def assert_headers(self, got, want):120 for name, value in want.items():121 self.assertEqual(got[name], value)122 def testHeaders(self):123 headers = ['Cookie: NAME=VALUE', 'X-CUSTOM-HEADER: VALUE']124 want = {'Cookie': 'NAME=VALUE', 'X-CUSTOM-HEADER': 'VALUE'}125 api = Confluence(api_url='https://wiki.example.com/rest/api',126 headers=headers)127 self.assert_headers(api._session.headers, want)128 def testHeadersDuplicates(self):129 # HTTP headers are case insensitive. If multiple headers with the same130 # name are passed, the last one should win.131 headers = ['X-CUSTOM-HEADER: foo', 'X-Custom-Header: bar', 'x-custom-header: baz']132 want = {'x-custom-header': 'baz'}133 api = Confluence(api_url='https://wiki.example.com/rest/api',134 headers=headers)135 self.assert_headers(api._session.headers, want)136 def testHeadersNoValue(self):137 # If no value is set, an empty string should be used.138 headers = ['X-CUSTOM-HEADER-1:', 'X-CUSTOM-HEADER-2']139 want = {'X-CUSTOM-HEADER-1': '', 'X-CUSTOM-HEADER-2': ''}140 api = Confluence(api_url='https://wiki.example.com/rest/api',141 headers=headers)142 self.assert_headers(api._session.headers, want)143if __name__ == '__main__':...
test_download.py
Source:test_download.py
...18 # our google test urls dont support HEAD19 req = requests.get(url)20 # we test against binary response: Content-Length not accurate as gzip-encoded21 assert file.stat().st_size == len(req.content)22def assert_headers(returned_headers):23 assert isinstance(returned_headers, requests.structures.CaseInsensitiveDict)24 assert returned_headers["Content-Type"] == "image/x-icon"25def get_dest_file(tmp_path):26 return tmp_path.joinpath("favicon.ico")27def test_missing_dest(tmp_path):28 with pytest.raises(requests.exceptions.ConnectionError):29 stream_file(url="http://some_url", byte_stream=io.BytesIO())30def test_invalid_url(tmp_path, invalid_url):31 dest_file = tmp_path / "favicon.ico"32 with pytest.raises(requests.exceptions.ConnectionError):33 stream_file(url=invalid_url, fpath=dest_file)34def test_no_output_supplied(valid_http_url):35 with pytest.raises(36 ValueError, match="Either file path or a bytesIO object is needed"37 ):38 stream_file(url=valid_http_url)39def test_first_block_download(valid_http_url):40 byte_stream = io.BytesIO()41 size, ret = stream_file(42 url=valid_http_url, byte_stream=byte_stream, only_first_block=True43 )44 assert_headers(ret)45 # valid_http_url randomly returns gzip-encoded content.46 # otherwise, expected size is default block size47 expected = 3062 if ret.get("Content-Encoding") == "gzip" else 102448 assert len(byte_stream.read()) <= expected49@pytest.mark.slow50def test_save_http(tmp_path, valid_http_url):51 dest_file = tmp_path / "favicon.ico"52 size, ret = stream_file(url=valid_http_url, fpath=dest_file)53 assert_headers(ret)54 assert_downloaded_file(valid_http_url, dest_file)55@pytest.mark.slow56def test_save_https(tmp_path, valid_https_url):57 dest_file = tmp_path / "favicon.ico"58 size, ret = stream_file(url=valid_https_url, fpath=dest_file)59 assert_headers(ret)60 assert_downloaded_file(valid_https_url, dest_file)61@pytest.mark.slow62def test_stream_to_bytes(valid_https_url):63 byte_stream = io.BytesIO()64 size, ret = stream_file(url=valid_https_url, byte_stream=byte_stream)65 assert_headers(ret)66 assert byte_stream.read() == requests.get(valid_https_url).content67@pytest.mark.slow68def test_save_parent_folder_missing(tmp_path, valid_http_url):69 dest_file = tmp_path / "some-folder" / "favicon.ico"70 with pytest.raises(IOError):71 stream_file(url=valid_http_url, fpath=dest_file)72@pytest.mark.slow73def test_save_http_error(tmp_path, http_error_url):74 dest_file = tmp_path / "favicon.ico"75 with pytest.raises(requests.exceptions.HTTPError):76 stream_file(url=http_error_url, fpath=dest_file)77@pytest.mark.slow78def test_large_download_http(tmp_path, valid_http_url):79 dest_file = tmp_path / "favicon.ico"...
LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!