Best Python code snippet using playwright-python
test_http_headers.py
Source:test_http_headers.py
...199 """200 Tests for the backwards compatible C{dict} interface for L{Headers}201 provided by L{_DictHeaders}.202 """203 def headers(self, **kw):204 """205 Create a L{Headers} instance populated with the header name/values206 specified by C{kw} and a L{_DictHeaders} wrapped around it and return207 them both.208 """209 h = Headers()210 for k, v in kw.items():211 h.setRawHeaders(k.encode('ascii'), v)212 return h, _DictHeaders(h)213 def test_getItem(self):214 """215 L{_DictHeaders.__getitem__} returns a single header for the given name.216 """217 headers, wrapper = self.headers(test=[b"lemur"])218 self.assertEqual(wrapper[b"test"], b"lemur")219 def test_getItemMultiple(self):220 """221 L{_DictHeaders.__getitem__} returns only the last header value for a222 given name.223 """224 headers, wrapper = self.headers(test=[b"lemur", b"panda"])225 self.assertEqual(wrapper[b"test"], b"panda")226 def test_getItemMissing(self):227 """228 L{_DictHeaders.__getitem__} raises L{KeyError} if called with a header229 which is not present.230 """231 headers, wrapper = self.headers()232 exc = self.assertRaises(KeyError, wrapper.__getitem__, b"test")233 self.assertEqual(exc.args, (b"test",))234 def test_iteration(self):235 """236 L{_DictHeaders.__iter__} returns an iterator the elements of which237 are the lowercase name of each header present.238 """239 headers, wrapper = self.headers(foo=[b"lemur", b"panda"], bar=[b"baz"])240 self.assertEqual(set(list(wrapper)), set([b"foo", b"bar"]))241 def test_length(self):242 """243 L{_DictHeaders.__len__} returns the number of headers present.244 """245 headers, wrapper = self.headers()246 self.assertEqual(len(wrapper), 0)247 headers.setRawHeaders(b"foo", [b"bar"])248 self.assertEqual(len(wrapper), 1)249 headers.setRawHeaders(b"test", [b"lemur", b"panda"])250 self.assertEqual(len(wrapper), 2)251 def test_setItem(self):252 """253 L{_DictHeaders.__setitem__} sets a single header value for the given254 name.255 """256 headers, wrapper = self.headers()257 wrapper[b"test"] = b"lemur"258 self.assertEqual(headers.getRawHeaders(b"test"), [b"lemur"])259 def test_setItemOverwrites(self):260 """261 L{_DictHeaders.__setitem__} will replace any previous header values for262 the given name.263 """264 headers, wrapper = self.headers(test=[b"lemur", b"panda"])265 wrapper[b"test"] = b"lemur"266 self.assertEqual(headers.getRawHeaders(b"test"), [b"lemur"])267 def test_delItem(self):268 """269 L{_DictHeaders.__delitem__} will remove the header values for the given270 name.271 """272 headers, wrapper = self.headers(test=[b"lemur"])273 del wrapper[b"test"]274 self.assertFalse(headers.hasHeader(b"test"))275 def test_delItemMissing(self):276 """277 L{_DictHeaders.__delitem__} will raise L{KeyError} if the given name is278 not present.279 """280 headers, wrapper = self.headers()281 exc = self.assertRaises(KeyError, wrapper.__delitem__, b"test")282 self.assertEqual(exc.args, (b"test",))283 def test_keys(self, _method='keys', _requireList=not _PY3):284 """285 L{_DictHeaders.keys} will return a list of all present header names.286 """287 headers, wrapper = self.headers(test=[b"lemur"], foo=[b"bar"])288 keys = getattr(wrapper, _method)()289 if _requireList:290 self.assertIsInstance(keys, list)291 self.assertEqual(set(keys), set([b"foo", b"test"]))292 def test_iterkeys(self):293 """294 L{_DictHeaders.iterkeys} will return all present header names.295 """296 self.test_keys('iterkeys', False)297 def test_values(self, _method='values', _requireList=not _PY3):298 """299 L{_DictHeaders.values} will return a list of all present header values,300 returning only the last value for headers with more than one.301 """302 headers, wrapper = self.headers(303 foo=[b"lemur"], bar=[b"marmot", b"panda"])304 values = getattr(wrapper, _method)()305 if _requireList:306 self.assertIsInstance(values, list)307 self.assertEqual(set(values), set([b"lemur", b"panda"]))308 def test_itervalues(self):309 """310 L{_DictHeaders.itervalues} will return all present header values,311 returning only the last value for headers with more than one.312 """313 self.test_values('itervalues', False)314 def test_items(self, _method='items', _requireList=not _PY3):315 """316 L{_DictHeaders.items} will return a list of all present header names317 and values as tuples, returning only the last value for headers with318 more than one.319 """320 headers, wrapper = self.headers(321 foo=[b"lemur"], bar=[b"marmot", b"panda"])322 items = getattr(wrapper, _method)()323 if _requireList:324 self.assertIsInstance(items, list)325 self.assertEqual(326 set(items), set([(b"foo", b"lemur"), (b"bar", b"panda")]))327 def test_iteritems(self):328 """329 L{_DictHeaders.iteritems} will return all present header names and330 values as tuples, returning only the last value for headers with more331 than one.332 """333 self.test_items('iteritems', False)334 def test_clear(self):335 """336 L{_DictHeaders.clear} will remove all headers.337 """338 headers, wrapper = self.headers(foo=[b"lemur"], bar=[b"panda"])339 wrapper.clear()340 self.assertEqual(list(headers.getAllRawHeaders()), [])341 def test_copy(self):342 """343 L{_DictHeaders.copy} will return a C{dict} with all the same headers344 and the last value for each.345 """346 headers, wrapper = self.headers(347 foo=[b"lemur", b"panda"], bar=[b"marmot"])348 duplicate = wrapper.copy()349 self.assertEqual(duplicate, {b"foo": b"panda", b"bar": b"marmot"})350 def test_get(self):351 """352 L{_DictHeaders.get} returns the last value for the given header name.353 """354 headers, wrapper = self.headers(foo=[b"lemur", b"panda"])355 self.assertEqual(wrapper.get(b"foo"), b"panda")356 def test_getMissing(self):357 """358 L{_DictHeaders.get} returns C{None} for a header which is not present.359 """360 headers, wrapper = self.headers()361 self.assertIdentical(wrapper.get(b"foo"), None)362 def test_getDefault(self):363 """364 L{_DictHeaders.get} returns the last value for the given header name365 even when it is invoked with a default value.366 """367 headers, wrapper = self.headers(foo=[b"lemur"])368 self.assertEqual(wrapper.get(b"foo", b"bar"), b"lemur")369 def test_getDefaultMissing(self):370 """371 L{_DictHeaders.get} returns the default value specified if asked for a372 header which is not present.373 """374 headers, wrapper = self.headers()375 self.assertEqual(wrapper.get(b"foo", b"bar"), b"bar")376 def test_has_key(self):377 """378 L{_DictHeaders.has_key} returns C{True} if the given header is present,379 C{False} otherwise.380 """381 headers, wrapper = self.headers(foo=[b"lemur"])382 self.assertTrue(wrapper.has_key(b"foo"))383 self.assertFalse(wrapper.has_key(b"bar"))384 def test_contains(self):385 """386 L{_DictHeaders.__contains__} returns C{True} if the given header is387 present, C{False} otherwise.388 """389 headers, wrapper = self.headers(foo=[b"lemur"])390 self.assertIn(b"foo", wrapper)391 self.assertNotIn(b"bar", wrapper)392 def test_pop(self):393 """394 L{_DictHeaders.pop} returns the last header value associated with the395 given header name and removes the header.396 """397 headers, wrapper = self.headers(foo=[b"lemur", b"panda"])398 self.assertEqual(wrapper.pop(b"foo"), b"panda")399 self.assertIdentical(headers.getRawHeaders(b"foo"), None)400 def test_popMissing(self):401 """402 L{_DictHeaders.pop} raises L{KeyError} if passed a header name which is403 not present.404 """405 headers, wrapper = self.headers()406 self.assertRaises(KeyError, wrapper.pop, b"foo")407 def test_popDefault(self):408 """409 L{_DictHeaders.pop} returns the last header value associated with the410 given header name and removes the header, even if it is supplied with a411 default value.412 """413 headers, wrapper = self.headers(foo=[b"lemur"])414 self.assertEqual(wrapper.pop(b"foo", b"bar"), b"lemur")415 self.assertIdentical(headers.getRawHeaders(b"foo"), None)416 def test_popDefaultMissing(self):417 """418 L{_DictHeaders.pop} returns the default value is asked for a header419 name which is not present.420 """421 headers, wrapper = self.headers(foo=[b"lemur"])422 self.assertEqual(wrapper.pop(b"bar", b"baz"), b"baz")423 self.assertEqual(headers.getRawHeaders(b"foo"), [b"lemur"])424 def test_popitem(self):425 """426 L{_DictHeaders.popitem} returns some header name/value pair.427 """428 headers, wrapper = self.headers(foo=[b"lemur", b"panda"])429 self.assertEqual(wrapper.popitem(), (b"foo", b"panda"))430 self.assertIdentical(headers.getRawHeaders(b"foo"), None)431 def test_popitemEmpty(self):432 """433 L{_DictHeaders.popitem} raises L{KeyError} if there are no headers434 present.435 """436 headers, wrapper = self.headers()437 self.assertRaises(KeyError, wrapper.popitem)438 def test_update(self):439 """440 L{_DictHeaders.update} adds the header/value pairs in the C{dict} it is441 passed, overriding any existing values for those headers.442 """443 headers, wrapper = self.headers(foo=[b"lemur"])444 wrapper.update({b"foo": b"panda", b"bar": b"marmot"})445 self.assertEqual(headers.getRawHeaders(b"foo"), [b"panda"])446 self.assertEqual(headers.getRawHeaders(b"bar"), [b"marmot"])447 def test_updateWithKeywords(self):448 """449 L{_DictHeaders.update} adds header names given as keyword arguments450 with the keyword values as the header value.451 """452 headers, wrapper = self.headers(foo=[b"lemur"])453 wrapper.update(foo=b"panda", bar=b"marmot")454 self.assertEqual(headers.getRawHeaders(b"foo"), [b"panda"])455 self.assertEqual(headers.getRawHeaders(b"bar"), [b"marmot"])456 if _PY3:457 test_updateWithKeywords.skip = "Not yet supported on Python 3; see #6082."458 def test_setdefaultMissing(self):459 """460 If passed the name of a header which is not present,461 L{_DictHeaders.setdefault} sets the value of the given header to the462 specified default value and returns it.463 """464 headers, wrapper = self.headers(foo=[b"bar"])465 self.assertEqual(wrapper.setdefault(b"baz", b"quux"), b"quux")466 self.assertEqual(headers.getRawHeaders(b"foo"), [b"bar"])467 self.assertEqual(headers.getRawHeaders(b"baz"), [b"quux"])468 def test_setdefaultPresent(self):469 """470 If passed the name of a header which is present,471 L{_DictHeaders.setdefault} makes no changes to the headers and472 returns the last value already associated with that header.473 """474 headers, wrapper = self.headers(foo=[b"bar", b"baz"])475 self.assertEqual(wrapper.setdefault(b"foo", b"quux"), b"baz")476 self.assertEqual(headers.getRawHeaders(b"foo"), [b"bar", b"baz"])477 def test_setdefaultDefault(self):478 """479 If a value is not passed to L{_DictHeaders.setdefault}, C{None} is480 used.481 """482 # This results in an invalid state for the headers, but maybe some483 # application is doing this an intermediate step towards some other484 # state. Anyway, it was broken with the old implementation so it's485 # broken with the new implementation. Compatibility, for the win.486 # -exarkun487 headers, wrapper = self.headers()488 self.assertIdentical(wrapper.setdefault(b"foo"), None)489 self.assertEqual(headers.getRawHeaders(b"foo"), [None])490 def test_dictComparison(self):491 """492 An instance of L{_DictHeaders} compares equal to a C{dict} which493 contains the same header/value pairs. For header names with multiple494 values, the last value only is considered.495 """496 headers, wrapper = self.headers(foo=[b"lemur"], bar=[b"panda", b"marmot"])497 self.assertNotEqual(wrapper, {b"foo": b"lemur", b"bar": b"panda"})498 self.assertEqual(wrapper, {b"foo": b"lemur", b"bar": b"marmot"})499 def test_otherComparison(self):500 """501 An instance of L{_DictHeaders} does not compare equal to other502 unrelated objects.503 """504 headers, wrapper = self.headers()505 self.assertNotEqual(wrapper, ())506 self.assertNotEqual(wrapper, object())507 self.assertNotEqual(wrapper, b"foo")508 if _PY3:509 # Python 3 lacks these APIs...
test_base.py
Source:test_base.py
1# encoding: utf-82from nose import tools as nose_tools3import ckan.tests.helpers as helpers4import ckan.plugins as p5import ckan.tests.factories as factories6class TestRenderSnippet(helpers.FunctionalTestBase):7 """8 Test ``ckan.lib.base.render_snippet``.9 """10 @helpers.change_config('debug', True)11 def test_comment_present_if_debug_true(self):12 response = self._get_test_app().get('/')13 assert '<!-- Snippet ' in response14 @helpers.change_config('debug', False)15 def test_comment_absent_if_debug_false(self):16 response = self._get_test_app().get('/')17 assert '<!-- Snippet ' not in response18class TestGetUserForApikey(helpers.FunctionalTestBase):19 def test_apikey_missing(self):20 app = self._get_test_app()21 request_headers = {}22 app.get('/dataset/new', headers=request_headers, status=403)23 def test_apikey_in_authorization_header(self):24 user = factories.Sysadmin()25 app = self._get_test_app()26 request_headers = {'Authorization': str(user['apikey'])}27 app.get('/dataset/new', headers=request_headers)28 def test_apikey_in_x_ckan_header(self):29 user = factories.Sysadmin()30 app = self._get_test_app()31 # non-standard header name is defined in test-core.ini32 request_headers = {'X-Non-Standard-CKAN-API-Key': str(user['apikey'])}33 app.get('/dataset/new', headers=request_headers)34 def test_apikey_contains_unicode(self):35 # there is no valid apikey containing unicode, but we should fail36 # nicely if unicode is supplied37 app = self._get_test_app()38 request_headers = {'Authorization': '\xc2\xb7'}39 app.get('/dataset/new', headers=request_headers, status=403)40class TestCORS(helpers.FunctionalTestBase):41 def test_options(self):42 app = self._get_test_app()43 response = app.options(url='/', status=200)44 assert len(str(response.body)) == 0, 'OPTIONS must return no content'45 def test_cors_config_no_cors(self):46 '''47 No ckan.cors settings in config, so no Access-Control-Allow headers in48 response.49 '''50 app = self._get_test_app()51 response = app.get('/')52 response_headers = dict(response.headers)53 assert 'Access-Control-Allow-Origin' not in response_headers54 assert 'Access-Control-Allow-Methods' not in response_headers55 assert 'Access-Control-Allow-Headers' not in response_headers56 def test_cors_config_no_cors_with_origin(self):57 '''58 No ckan.cors settings in config, so no Access-Control-Allow headers in59 response, even with origin header in request.60 '''61 app = self._get_test_app()62 request_headers = {'Origin': 'http://thirdpartyrequests.org'}63 response = app.get('/', headers=request_headers)64 response_headers = dict(response.headers)65 assert 'Access-Control-Allow-Origin' not in response_headers66 assert 'Access-Control-Allow-Methods' not in response_headers67 assert 'Access-Control-Allow-Headers' not in response_headers68 @helpers.change_config('ckan.cors.origin_allow_all', 'true')69 def test_cors_config_origin_allow_all_true_no_origin(self):70 '''71 With origin_allow_all set to true, but no origin in the request72 header, no Access-Control-Allow headers should be in the response.73 '''74 app = self._get_test_app()75 response = app.get('/')76 response_headers = dict(response.headers)77 assert 'Access-Control-Allow-Origin' not in response_headers78 assert 'Access-Control-Allow-Methods' not in response_headers79 assert 'Access-Control-Allow-Headers' not in response_headers80 @helpers.change_config('ckan.cors.origin_allow_all', 'true')81 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')82 def test_cors_config_origin_allow_all_true_with_origin(self):83 '''84 With origin_allow_all set to true, and an origin in the request85 header, the appropriate Access-Control-Allow headers should be in the86 response.87 '''88 app = self._get_test_app()89 request_headers = {'Origin': 'http://thirdpartyrequests.org'}90 response = app.get('/', headers=request_headers)91 response_headers = dict(response.headers)92 assert 'Access-Control-Allow-Origin' in response_headers93 nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], '*')94 nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")95 nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")96 @helpers.change_config('ckan.cors.origin_allow_all', 'false')97 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')98 def test_cors_config_origin_allow_all_false_with_origin_without_whitelist(self):99 '''100 With origin_allow_all set to false, with an origin in the request101 header, but no whitelist defined, there should be no Access-Control-102 Allow headers in the response.103 '''104 app = self._get_test_app()105 request_headers = {'Origin': 'http://thirdpartyrequests.org'}106 response = app.get('/', headers=request_headers)107 response_headers = dict(response.headers)108 assert 'Access-Control-Allow-Origin' not in response_headers109 assert 'Access-Control-Allow-Methods' not in response_headers110 assert 'Access-Control-Allow-Headers' not in response_headers111 @helpers.change_config('ckan.cors.origin_allow_all', 'false')112 @helpers.change_config('ckan.cors.origin_whitelist', 'http://thirdpartyrequests.org')113 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')114 def test_cors_config_origin_allow_all_false_with_whitelisted_origin(self):115 '''116 With origin_allow_all set to false, with an origin in the request117 header, and a whitelist defined (containing the origin), the118 appropriate Access-Control-Allow headers should be in the response.119 '''120 app = self._get_test_app()121 request_headers = {'Origin': 'http://thirdpartyrequests.org'}122 response = app.get('/', headers=request_headers)123 response_headers = dict(response.headers)124 assert 'Access-Control-Allow-Origin' in response_headers125 nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], 'http://thirdpartyrequests.org')126 nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")127 nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")128 @helpers.change_config('ckan.cors.origin_allow_all', 'false')129 @helpers.change_config('ckan.cors.origin_whitelist', 'http://google.com http://thirdpartyrequests.org http://yahoo.co.uk')130 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')131 def test_cors_config_origin_allow_all_false_with_multiple_whitelisted_origins(self):132 '''133 With origin_allow_all set to false, with an origin in the request134 header, and a whitelist defining multiple allowed origins (containing135 the origin), the appropriate Access-Control-Allow headers should be in136 the response.137 '''138 app = self._get_test_app()139 request_headers = {'Origin': 'http://thirdpartyrequests.org'}140 response = app.get('/', headers=request_headers)141 response_headers = dict(response.headers)142 assert 'Access-Control-Allow-Origin' in response_headers143 nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], 'http://thirdpartyrequests.org')144 nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")145 nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")146 @helpers.change_config('ckan.cors.origin_allow_all', 'false')147 @helpers.change_config('ckan.cors.origin_whitelist', 'http://google.com http://yahoo.co.uk')148 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')149 def test_cors_config_origin_allow_all_false_with_whitelist_not_containing_origin(self):150 '''151 With origin_allow_all set to false, with an origin in the request152 header, and a whitelist defining multiple allowed origins (but not153 containing the requesting origin), there should be no Access-Control-154 Allow headers in the response.155 '''156 app = self._get_test_app()157 request_headers = {'Origin': 'http://thirdpartyrequests.org'}158 response = app.get('/', headers=request_headers)159 response_headers = dict(response.headers)160 assert 'Access-Control-Allow-Origin' not in response_headers161 assert 'Access-Control-Allow-Methods' not in response_headers162 assert 'Access-Control-Allow-Headers' not in response_headers163class TestCORSFlask(helpers.FunctionalTestBase):164 @classmethod165 def setup_class(cls):166 super(TestCORSFlask, cls).setup_class()167 cls.app = cls._get_test_app()168 flask_app = cls.app.flask_app169 if not p.plugin_loaded('test_routing_plugin'):170 p.load('test_routing_plugin')171 plugin = p.get_plugin('test_routing_plugin')172 flask_app.register_blueprint(plugin.get_blueprint(),173 prioritise_rules=True)174 @classmethod175 def teardown_class(cls):176 super(TestCORSFlask, cls).teardown_class()177 p.unload('test_routing_plugin')178 def test_options(self):179 response = self.app.options(url='/simple_flask', status=200)180 assert len(str(response.body)) == 0, 'OPTIONS must return no content'181 def test_cors_config_no_cors(self):182 '''183 No ckan.cors settings in config, so no Access-Control-Allow headers in184 response.185 '''186 response = self.app.get('/simple_flask')187 response_headers = dict(response.headers)188 assert 'Access-Control-Allow-Origin' not in response_headers189 assert 'Access-Control-Allow-Methods' not in response_headers190 assert 'Access-Control-Allow-Headers' not in response_headers191 def test_cors_config_no_cors_with_origin(self):192 '''193 No ckan.cors settings in config, so no Access-Control-Allow headers in194 response, even with origin header in request.195 '''196 request_headers = {'Origin': 'http://thirdpartyrequests.org'}197 response = self.app.get('/simple_flask', headers=request_headers)198 response_headers = dict(response.headers)199 assert 'Access-Control-Allow-Origin' not in response_headers200 assert 'Access-Control-Allow-Methods' not in response_headers201 assert 'Access-Control-Allow-Headers' not in response_headers202 @helpers.change_config('ckan.cors.origin_allow_all', 'true')203 def test_cors_config_origin_allow_all_true_no_origin(self):204 '''205 With origin_allow_all set to true, but no origin in the request206 header, no Access-Control-Allow headers should be in the response.207 '''208 response = self.app.get('/simple_flask')209 response_headers = dict(response.headers)210 assert 'Access-Control-Allow-Origin' not in response_headers211 assert 'Access-Control-Allow-Methods' not in response_headers212 assert 'Access-Control-Allow-Headers' not in response_headers213 @helpers.change_config('ckan.cors.origin_allow_all', 'true')214 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')215 def test_cors_config_origin_allow_all_true_with_origin(self):216 '''217 With origin_allow_all set to true, and an origin in the request218 header, the appropriate Access-Control-Allow headers should be in the219 response.220 '''221 request_headers = {'Origin': 'http://thirdpartyrequests.org'}222 response = self.app.get('/simple_flask', headers=request_headers)223 response_headers = dict(response.headers)224 assert 'Access-Control-Allow-Origin' in response_headers225 nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], '*')226 nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")227 nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")228 @helpers.change_config('ckan.cors.origin_allow_all', 'false')229 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')230 def test_cors_config_origin_allow_all_false_with_origin_without_whitelist(self):231 '''232 With origin_allow_all set to false, with an origin in the request233 header, but no whitelist defined, there should be no Access-Control-234 Allow headers in the response.235 '''236 request_headers = {'Origin': 'http://thirdpartyrequests.org'}237 response = self.app.get('/simple_flask', headers=request_headers)238 response_headers = dict(response.headers)239 assert 'Access-Control-Allow-Origin' not in response_headers240 assert 'Access-Control-Allow-Methods' not in response_headers241 assert 'Access-Control-Allow-Headers' not in response_headers242 @helpers.change_config('ckan.cors.origin_allow_all', 'false')243 @helpers.change_config('ckan.cors.origin_whitelist', 'http://thirdpartyrequests.org')244 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')245 def test_cors_config_origin_allow_all_false_with_whitelisted_origin(self):246 '''247 With origin_allow_all set to false, with an origin in the request248 header, and a whitelist defined (containing the origin), the249 appropriate Access-Control-Allow headers should be in the response.250 '''251 request_headers = {'Origin': 'http://thirdpartyrequests.org'}252 response = self.app.get('/simple_flask', headers=request_headers)253 response_headers = dict(response.headers)254 assert 'Access-Control-Allow-Origin' in response_headers255 nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], 'http://thirdpartyrequests.org')256 nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")257 nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")258 @helpers.change_config('ckan.cors.origin_allow_all', 'false')259 @helpers.change_config('ckan.cors.origin_whitelist', 'http://google.com http://thirdpartyrequests.org http://yahoo.co.uk')260 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')261 def test_cors_config_origin_allow_all_false_with_multiple_whitelisted_origins(self):262 '''263 With origin_allow_all set to false, with an origin in the request264 header, and a whitelist defining multiple allowed origins (containing265 the origin), the appropriate Access-Control-Allow headers should be in266 the response.267 '''268 request_headers = {'Origin': 'http://thirdpartyrequests.org'}269 response = self.app.get('/simple_flask', headers=request_headers)270 response_headers = dict(response.headers)271 assert 'Access-Control-Allow-Origin' in response_headers272 nose_tools.assert_equal(response_headers['Access-Control-Allow-Origin'], 'http://thirdpartyrequests.org')273 nose_tools.assert_equal(response_headers['Access-Control-Allow-Methods'], "POST, PUT, GET, DELETE, OPTIONS")274 nose_tools.assert_equal(response_headers['Access-Control-Allow-Headers'], "X-CKAN-API-KEY, Authorization, Content-Type")275 @helpers.change_config('ckan.cors.origin_allow_all', 'false')276 @helpers.change_config('ckan.cors.origin_whitelist', 'http://google.com http://yahoo.co.uk')277 @helpers.change_config('ckan.site_url', 'http://test.ckan.org')278 def test_cors_config_origin_allow_all_false_with_whitelist_not_containing_origin(self):279 '''280 With origin_allow_all set to false, with an origin in the request281 header, and a whitelist defining multiple allowed origins (but not282 containing the requesting origin), there should be no Access-Control-283 Allow headers in the response.284 '''285 request_headers = {'Origin': 'http://thirdpartyrequests.org'}286 response = self.app.get('/simple_flask', headers=request_headers)287 response_headers = dict(response.headers)288 assert 'Access-Control-Allow-Origin' not in response_headers289 assert 'Access-Control-Allow-Methods' not in response_headers...
test_warc.py
Source:test_warc.py
1# Copyright (c) 2018 crocoite contributors2# 3# Permission is hereby granted, free of charge, to any person obtaining a copy4# of this software and associated documentation files (the "Software"), to deal5# in the Software without restriction, including without limitation the rights6# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell7# copies of the Software, and to permit persons to whom the Software is8# furnished to do so, subject to the following conditions:9# 10# The above copyright notice and this permission notice shall be included in11# all copies or substantial portions of the Software.12# 13# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR14# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,15# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE16# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER17# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,18# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN19# THE SOFTWARE.20from tempfile import NamedTemporaryFile21import json, urllib22from operator import itemgetter23from warcio.archiveiterator import ArchiveIterator24from yarl import URL25from multidict import CIMultiDict26from hypothesis import given, reproduce_failure27import hypothesis.strategies as st28import pytest29from .warc import WarcHandler30from .logger import Logger, WarcHandlerConsumer31from .controller import ControllerStart32from .behavior import Script, ScreenshotEvent, DomSnapshotEvent33from .browser import RequestResponsePair, Base64Body, UnicodeBody34from .test_browser import requestResponsePair, urls35def test_log ():36 logger = Logger ()37 with NamedTemporaryFile() as fd:38 with WarcHandler (fd, logger) as handler:39 warclogger = WarcHandlerConsumer (handler)40 logger.connect (warclogger)41 golden = []42 assert handler.log.tell () == 043 golden.append (logger.info (foo=1, bar='baz', encoding='äöüâÎΨ'))44 assert handler.log.tell () != 045 handler.maxLogSize = 046 golden.append (logger.info (bar=1, baz='baz'))47 # should flush the log48 assert handler.log.tell () == 049 fd.seek (0)50 for it in ArchiveIterator (fd):51 headers = it.rec_headers52 assert headers['warc-type'] == 'metadata'53 assert 'warc-target-uri' not in headers54 assert headers['x-crocoite-type'] == 'log'55 assert headers['content-type'] == f'application/json; charset={handler.logEncoding}'56 while True:57 l = it.raw_stream.readline ()58 if not l:59 break60 data = json.loads (l.strip ())61 assert data == golden.pop (0)62def jsonObject ():63 """ JSON-encodable objects """64 return st.dictionaries (st.text (), st.one_of (st.integers (), st.text ()))65def viewport ():66 return st.builds (lambda x, y: f'{x}x{y}', st.integers (), st.integers ())67def event ():68 return st.one_of (69 st.builds (ControllerStart, jsonObject ()),70 st.builds (Script.fromStr, st.text (), st.one_of(st.none (), st.text ())),71 st.builds (ScreenshotEvent, urls (), st.integers (), st.binary ()),72 st.builds (DomSnapshotEvent, urls (), st.builds (lambda x: x.encode ('utf-8'), st.text ()), viewport()),73 requestResponsePair (),74 )75@pytest.mark.asyncio76@given (st.lists (event ()))77async def test_push (golden):78 def checkWarcinfoId (headers):79 if lastWarcinfoRecordid is not None:80 assert headers['WARC-Warcinfo-ID'] == lastWarcinfoRecordid81 lastWarcinfoRecordid = None82 # null logger83 logger = Logger ()84 with open('/tmp/test.warc.gz', 'w+b') as fd:85 with WarcHandler (fd, logger) as handler:86 for g in golden:87 await handler.push (g)88 fd.seek (0)89 it = iter (ArchiveIterator (fd))90 for g in golden:91 if isinstance (g, ControllerStart):92 rec = next (it)93 headers = rec.rec_headers94 assert headers['warc-type'] == 'warcinfo'95 assert 'warc-target-uri' not in headers96 assert 'x-crocoite-type' not in headers97 data = json.load (rec.raw_stream)98 assert data == g.payload99 lastWarcinfoRecordid = headers['warc-record-id']100 assert lastWarcinfoRecordid101 elif isinstance (g, Script):102 rec = next (it)103 headers = rec.rec_headers104 assert headers['warc-type'] == 'resource'105 assert headers['content-type'] == 'application/javascript; charset=utf-8'106 assert headers['x-crocoite-type'] == 'script'107 checkWarcinfoId (headers)108 if g.path:109 assert URL (headers['warc-target-uri']) == URL ('file://' + g.abspath)110 else:111 assert 'warc-target-uri' not in headers112 data = rec.raw_stream.read ().decode ('utf-8')113 assert data == g.data114 elif isinstance (g, ScreenshotEvent):115 # XXX: check refers-to header116 rec = next (it)117 headers = rec.rec_headers118 assert headers['warc-type'] == 'conversion'119 assert headers['x-crocoite-type'] == 'screenshot'120 checkWarcinfoId (headers)121 assert URL (headers['warc-target-uri']) == g.url, (headers['warc-target-uri'], g.url)122 assert headers['warc-refers-to'] is None123 assert int (headers['X-Crocoite-Screenshot-Y-Offset']) == g.yoff124 assert rec.raw_stream.read () == g.data125 elif isinstance (g, DomSnapshotEvent):126 rec = next (it)127 headers = rec.rec_headers128 assert headers['warc-type'] == 'conversion'129 assert headers['x-crocoite-type'] == 'dom-snapshot'130 checkWarcinfoId (headers)131 assert URL (headers['warc-target-uri']) == g.url132 assert headers['warc-refers-to'] is None133 assert rec.raw_stream.read () == g.document134 elif isinstance (g, RequestResponsePair):135 rec = next (it)136 # request137 headers = rec.rec_headers138 assert headers['warc-type'] == 'request'139 assert 'x-crocoite-type' not in headers140 checkWarcinfoId (headers)141 assert URL (headers['warc-target-uri']) == g.url142 assert headers['x-chrome-request-id'] == g.id143 144 assert CIMultiDict (rec.http_headers.headers) == g.request.headers145 if g.request.hasPostData:146 if g.request.body is not None:147 assert rec.raw_stream.read () == g.request.body148 else:149 # body fetch failed150 assert headers['warc-truncated'] == 'unspecified'151 assert not rec.raw_stream.read ()152 else:153 assert not rec.raw_stream.read ()154 # response155 if g.response:156 rec = next (it)157 headers = rec.rec_headers158 httpheaders = rec.http_headers159 assert headers['warc-type'] == 'response'160 checkWarcinfoId (headers)161 assert URL (headers['warc-target-uri']) == g.url162 assert headers['x-chrome-request-id'] == g.id163 assert 'x-crocoite-type' not in headers164 # these are checked separately165 filteredHeaders = CIMultiDict (httpheaders.headers)166 for b in {'content-type', 'content-length'}:167 if b in g.response.headers:168 g.response.headers.popall (b)169 if b in filteredHeaders:170 filteredHeaders.popall (b)171 assert filteredHeaders == g.response.headers172 expectedContentType = g.response.mimeType173 if expectedContentType is not None:174 assert httpheaders['content-type'].startswith (expectedContentType)175 if g.response.body is not None:176 assert rec.raw_stream.read () == g.response.body177 assert httpheaders['content-length'] == str (len (g.response.body))178 # body is never truncated if it exists179 assert headers['warc-truncated'] is None180 # unencoded strings are converted to utf8181 if isinstance (g.response.body, UnicodeBody) and httpheaders['content-type'] is not None:182 assert httpheaders['content-type'].endswith ('; charset=utf-8')183 else:184 # body fetch failed185 assert headers['warc-truncated'] == 'unspecified'186 assert not rec.raw_stream.read ()187 # content-length header should be kept intact188 else:189 assert False, f"invalid golden type {type(g)}" # pragma: no cover190 # no further records191 with pytest.raises (StopIteration):...
test_tools.py
Source:test_tools.py
1# Copyright (c) 2018 crocoite contributors2# 3# Permission is hereby granted, free of charge, to any person obtaining a copy4# of this software and associated documentation files (the "Software"), to deal5# in the Software without restriction, including without limitation the rights6# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell7# copies of the Software, and to permit persons to whom the Software is8# furnished to do so, subject to the following conditions:9# 10# The above copyright notice and this permission notice shall be included in11# all copies or substantial portions of the Software.12# 13# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR14# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,15# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE16# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER17# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,18# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN19# THE SOFTWARE.20from tempfile import NamedTemporaryFile21from operator import itemgetter22from io import BytesIO23import pytest24from warcio.archiveiterator import ArchiveIterator25from warcio.warcwriter import WARCWriter26from warcio.statusandheaders import StatusAndHeaders27from pkg_resources import parse_version28from .tools import mergeWarc, Errata, FixableErrata29@pytest.fixture30def writer():31 return WARCWriter (NamedTemporaryFile(), gzip=True)32def recordsEqual(golden, underTest):33 for a, b in zip (golden, underTest):34 # record ids are not predictable, so we cannot compare them. Dito for35 # dates. Content-* seems to be added when writing to file.36 for x in {'WARC-Record-Id', 'WARC-Block-Digest', 'WARC-Date',37 'Content-Length', 'Content-Type'}:38 a.rec_headers.remove_header(x)39 b.rec_headers.remove_header(x)40 aheader = sorted(a.rec_headers.headers, key=itemgetter(0))41 bheader = sorted(b.rec_headers.headers, key=itemgetter(0))42 assert aheader == bheader43 assert a.http_headers == b.http_headers44def makeGolden(writer, records):45 # additional warcinfo is written. Content does not matter.46 record = writer.create_warc_record (47 '',48 'warcinfo',49 payload=b'',50 warc_headers_dict={'Content-Type': 'application/json; charset=utf-8'})51 records.insert (0, record)52 return records53def test_unmodified(writer):54 """55 Single request/response pair, no revisits56 """57 records = []58 httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True)59 warcHeaders = {}60 record = writer.create_warc_record ('http://example.com/', 'request', payload=BytesIO(b'foobar'),61 warc_headers_dict=warcHeaders, http_headers=httpHeaders)62 records.append (record)63 httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1')64 record = writer.create_warc_record ('http://example.com/', 'response', payload=BytesIO(b'data'),65 warc_headers_dict=warcHeaders, http_headers=httpHeaders)66 records.append (record)67 for r in records:68 writer.write_record (r)69 output = NamedTemporaryFile()70 mergeWarc ([writer.out.name], output)71 output.seek(0)72 recordsEqual (makeGolden (writer, records), ArchiveIterator (output))73def test_different_payload(writer):74 """75 Duplicate URL, but different payload76 """77 records = []78 for i in range (2):79 httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True)80 warcHeaders = {}81 record = writer.create_warc_record ('http://example.com/', 'request', payload=BytesIO(b'foobar'),82 warc_headers_dict=warcHeaders, http_headers=httpHeaders)83 records.append (record)84 httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1')85 record = writer.create_warc_record ('http://example.com/', 'response',86 payload=BytesIO(f'data{i}'.encode ('utf8')),87 warc_headers_dict=warcHeaders, http_headers=httpHeaders)88 records.append (record)89 for r in records:90 writer.write_record (r)91 output = NamedTemporaryFile()92 mergeWarc ([writer.out.name], output)93 output.seek(0)94 recordsEqual (makeGolden (writer, records), ArchiveIterator (output))95def makeRevisit(writer, ref, dup):96 """ Make revisit record for reference """97 dupHeaders = dup.rec_headers98 refHeaders = ref.rec_headers99 record = writer.create_revisit_record (dupHeaders.get_header('WARC-Target-URI'),100 digest=refHeaders.get_header('WARC-Payload-Digest'),101 refers_to_uri=refHeaders.get_header('WARC-Target-URI'),102 refers_to_date=refHeaders.get_header('WARC-Date'),103 http_headers=dup.http_headers)104 record.rec_headers.add_header ('WARC-Refers-To', refHeaders.get_header('WARC-Record-ID'))105 record.rec_headers.add_header ('WARC-Truncated', 'length')106 return record107def test_resp_revisit_same_url(writer):108 """109 Duplicate record for the same URL, creates a revisit110 """111 records = []112 for i in range (2):113 httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True)114 warcHeaders = {}115 record = writer.create_warc_record ('http://example.com/', 'request', payload=BytesIO(b'foobar'),116 warc_headers_dict=warcHeaders, http_headers=httpHeaders)117 records.append (record)118 httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1')119 record = writer.create_warc_record ('http://example.com/', 'response', payload=BytesIO(b'data'),120 warc_headers_dict=warcHeaders, http_headers=httpHeaders)121 records.append (record)122 for r in records:123 writer.write_record (r)124 dup = records.pop ()125 ref = records[1]126 records.append (makeRevisit (writer, ref, dup))127 output = NamedTemporaryFile()128 mergeWarc ([writer.out.name], output)129 output.seek(0)130 recordsEqual (makeGolden (writer, records), ArchiveIterator (output))131def test_resp_revisit_other_url(writer):132 """133 Duplicate record for different URL, creates a revisit134 """135 records = []136 httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True)137 warcHeaders = {}138 record = writer.create_warc_record ('http://example.com/', 'request', payload=BytesIO(b'foobar'),139 warc_headers_dict=warcHeaders, http_headers=httpHeaders)140 records.append (record)141 httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1')142 record = writer.create_warc_record ('http://example.com/', 'response', payload=BytesIO(b'data'),143 warc_headers_dict=warcHeaders, http_headers=httpHeaders)144 records.append (record)145 httpHeaders = StatusAndHeaders('GET / HTTP/1.1', {}, is_http_request=True)146 warcHeaders = {}147 record = writer.create_warc_record ('http://example.com/one', 'request', payload=BytesIO(b'foobar'),148 warc_headers_dict=warcHeaders, http_headers=httpHeaders)149 records.append (record)150 httpHeaders = StatusAndHeaders('200 OK', {}, protocol='HTTP/1.1')151 record = writer.create_warc_record ('http://example.com/one', 'response', payload=BytesIO(b'data'),152 warc_headers_dict=warcHeaders, http_headers=httpHeaders)153 records.append (record)154 for r in records:155 writer.write_record (r)156 dup = records.pop ()157 ref = records[1]158 records.append (makeRevisit (writer, ref, dup))159 output = NamedTemporaryFile()160 mergeWarc ([writer.out.name], output)161 output.seek(0)162 recordsEqual (makeGolden (writer, records), ArchiveIterator (output))163def test_errata_contains():164 """ Test version matching """165 e = Errata('some-uuid', 'description', ['a<1.0'])166 assert {'a': parse_version('0.1')} in e167 assert {'a': parse_version('1.0')} not in e168 assert {'b': parse_version('1.0')} not in e169 e = Errata('some-uuid', 'description', ['a<1.0,>0.1'])170 assert {'a': parse_version('0.1')} not in e171 assert {'a': parse_version('0.2')} in e172 assert {'a': parse_version('1.0')} not in e173 # a AND b174 e = Errata('some-uuid', 'description', ['a<1.0', 'b>1.0'])175 assert {'a': parse_version('0.1')} not in e176 assert {'b': parse_version('1.1')} not in e177 assert {'a': parse_version('0.1'), 'b': parse_version('1.1')} in e178def test_errata_fixable ():179 e = Errata('some-uuid', 'description', ['a<1.0', 'b>1.0'])180 assert not e.fixable181 e = FixableErrata('some-uuid', 'description', ['a<1.0', 'b>1.0'])...
LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!