Best Python code snippet using playwright-python
test_interception.py
Source:test_interception.py
...22 assert "empty.html" in request.url23 assert request.headers["user-agent"]24 assert request.method == "GET"25 assert request.post_data is None26 assert request.is_navigation_request()27 assert request.resource_type == "document"28 assert request.frame == page.main_frame29 assert request.frame.url == "about:blank"30 await route.continue_()31 intercepted.append(True)32 await page.route("**/empty.html", handle_request)33 response = await page.goto(server.EMPTY_PAGE)34 assert response.ok35 assert len(intercepted) == 136async def test_page_route_should_unroute(page: Page, server):37 intercepted = []38 def handler1(route):39 intercepted.append(1)40 asyncio.create_task(route.continue_())41 await page.route("**/empty.html", handler1)42 await page.route(43 "**/empty.html",44 lambda route: (45 intercepted.append(2), # type: ignore46 asyncio.create_task(route.continue_()),47 ),48 )49 await page.route(50 "**/empty.html",51 lambda route: (52 intercepted.append(3), # type: ignore53 asyncio.create_task(route.continue_()),54 ),55 )56 await page.route(57 "**/*",58 lambda route: (59 intercepted.append(4), # type: ignore60 asyncio.create_task(route.continue_()),61 ),62 )63 await page.goto(server.EMPTY_PAGE)64 assert intercepted == [1]65 intercepted = []66 await page.unroute("**/empty.html", handler1)67 await page.goto(server.EMPTY_PAGE)68 assert intercepted == [2]69 intercepted = []70 await page.unroute("**/empty.html")71 await page.goto(server.EMPTY_PAGE)72 assert intercepted == [4]73async def test_page_route_should_work_when_POST_is_redirected_with_302(page, server):74 server.set_redirect("/rredirect", "/empty.html")75 await page.goto(server.EMPTY_PAGE)76 await page.route("**/*", lambda route: route.continue_())77 await page.set_content(78 """79 <form action='/rredirect' method='post'>80 <input type="hidden" id="foo" name="foo" value="FOOBAR">81 </form>82 """83 )84 async with page.expect_navigation():85 await page.eval_on_selector("form", "form => form.submit()"),86# @see 
https://github.com/GoogleChrome/puppeteer/issues/397387async def test_page_route_should_work_when_header_manipulation_headers_with_redirect(88 page, server89):90 server.set_redirect("/rrredirect", "/empty.html")91 await page.route(92 "**/*",93 lambda route: route.continue_(headers={**route.request.headers, "foo": "bar"}),94 )95 await page.goto(server.PREFIX + "/rrredirect")96# @see https://github.com/GoogleChrome/puppeteer/issues/474397async def test_page_route_should_be_able_to_remove_headers(page, server):98 async def handle_request(route):99 headers = route.request.headers100 if "origin" in headers:101 del headers["origin"]102 await route.continue_(headers=headers)103 await page.route(104 "**/*", # remove "origin" header105 handle_request,106 )107 [serverRequest, _] = await asyncio.gather(108 server.wait_for_request("/empty.html"), page.goto(server.PREFIX + "/empty.html")109 )110 assert serverRequest.getHeader("origin") is None111async def test_page_route_should_contain_referer_header(page, server):112 requests = []113 await page.route(114 "**/*",115 lambda route: (116 requests.append(route.request),117 asyncio.create_task(route.continue_()),118 ),119 )120 await page.goto(server.PREFIX + "/one-style.html")121 assert "/one-style.css" in requests[1].url122 assert "/one-style.html" in requests[1].headers["referer"]123async def test_page_route_should_properly_return_navigation_response_when_URL_has_cookies(124 context, page, server125):126 # Setup cookie.127 await page.goto(server.EMPTY_PAGE)128 await context.add_cookies(129 [{"url": server.EMPTY_PAGE, "name": "foo", "value": "bar"}]130 )131 # Setup request interception.132 await page.route("**/*", lambda route: route.continue_())133 response = await page.reload()134 assert response.status == 200135async def test_page_route_should_show_custom_HTTP_headers(page, server):136 await page.set_extra_http_headers({"foo": "bar"})137 def assert_headers(request):138 assert request.headers["foo"] == "bar"139 await 
page.route(140 "**/*",141 lambda route: (142 assert_headers(route.request),143 asyncio.create_task(route.continue_()),144 ),145 )146 response = await page.goto(server.EMPTY_PAGE)147 assert response.ok148# @see https://github.com/GoogleChrome/puppeteer/issues/4337149async def test_page_route_should_work_with_redirect_inside_sync_XHR(page, server):150 await page.goto(server.EMPTY_PAGE)151 server.set_redirect("/logo.png", "/pptr.png")152 await page.route("**/*", lambda route: route.continue_())153 status = await page.evaluate(154 """async() => {155 const request = new XMLHttpRequest();156 request.open('GET', '/logo.png', false); // `false` makes the request synchronous157 request.send(null);158 return request.status;159 }"""160 )161 assert status == 200162async def test_page_route_should_work_with_custom_referer_headers(page, server):163 await page.set_extra_http_headers({"referer": server.EMPTY_PAGE})164 def assert_headers(route):165 assert route.request.headers["referer"] == server.EMPTY_PAGE166 await page.route(167 "**/*",168 lambda route: (169 assert_headers(route),170 asyncio.create_task(route.continue_()),171 ),172 )173 response = await page.goto(server.EMPTY_PAGE)174 assert response.ok175async def test_page_route_should_be_abortable(page, server):176 await page.route(r"/\.css$/", lambda route: asyncio.create_task(route.abort()))177 failed = []178 def handle_request(request):179 if request.url.includes(".css"):180 failed.append(True)181 page.on("requestfailed", handle_request)182 response = await page.goto(server.PREFIX + "/one-style.html")183 assert response.ok184 assert response.request.failure is None185 assert len(failed) == 0186async def test_page_route_should_be_abortable_with_custom_error_codes(187 page: Page, server, is_webkit, is_firefox188):189 await page.route(190 "**/*",191 lambda route: route.abort("internetdisconnected"),192 )193 failed_requests = []194 page.on("requestfailed", lambda request: failed_requests.append(request))195 with 
pytest.raises(Error):196 await page.goto(server.EMPTY_PAGE)197 assert len(failed_requests) == 1198 failed_request = failed_requests[0]199 if is_webkit:200 assert failed_request.failure == "Request intercepted"201 elif is_firefox:202 assert failed_request.failure == "NS_ERROR_OFFLINE"203 else:204 assert failed_request.failure == "net::ERR_INTERNET_DISCONNECTED"205async def test_page_route_should_send_referer(page, server):206 await page.set_extra_http_headers({"referer": "http://google.com/"})207 await page.route("**/*", lambda route: route.continue_())208 [request, _] = await asyncio.gather(209 server.wait_for_request("/grid.html"),210 page.goto(server.PREFIX + "/grid.html"),211 )212 assert request.getHeader("referer") == "http://google.com/"213async def test_page_route_should_fail_navigation_when_aborting_main_resource(214 page, server, is_webkit, is_firefox215):216 await page.route("**/*", lambda route: route.abort())217 with pytest.raises(Error) as exc:218 await page.goto(server.EMPTY_PAGE)219 assert exc220 if is_webkit:221 assert "Request intercepted" in exc.value.message222 elif is_firefox:223 assert "NS_ERROR_FAILURE" in exc.value.message224 else:225 assert "net::ERR_FAILED" in exc.value.message226async def test_page_route_should_not_work_with_redirects(page, server):227 intercepted = []228 await page.route(229 "**/*",230 lambda route: (231 asyncio.create_task(route.continue_()),232 intercepted.append(route.request),233 ),234 )235 server.set_redirect("/non-existing-page.html", "/non-existing-page-2.html")236 server.set_redirect("/non-existing-page-2.html", "/non-existing-page-3.html")237 server.set_redirect("/non-existing-page-3.html", "/non-existing-page-4.html")238 server.set_redirect("/non-existing-page-4.html", "/empty.html")239 response = await page.goto(server.PREFIX + "/non-existing-page.html")240 assert response.status == 200241 assert "empty.html" in response.url242 assert len(intercepted) == 1243 assert intercepted[0].resource_type == "document"244 
assert intercepted[0].is_navigation_request()245 assert "/non-existing-page.html" in intercepted[0].url246 chain = []247 r = response.request248 while r:249 chain.append(r)250 assert r.is_navigation_request()251 r = r.redirected_from252 assert len(chain) == 5253 assert "/empty.html" in chain[0].url254 assert "/non-existing-page-4.html" in chain[1].url255 assert "/non-existing-page-3.html" in chain[2].url256 assert "/non-existing-page-2.html" in chain[3].url257 assert "/non-existing-page.html" in chain[4].url258 for idx, _ in enumerate(chain):259 assert chain[idx].redirected_to == (chain[idx - 1] if idx > 0 else None)260async def test_page_route_should_work_with_redirects_for_subresources(page, server):261 intercepted = []262 await page.route(263 "**/*",264 lambda route: (265 asyncio.create_task(route.continue_()),266 intercepted.append(route.request),267 ),268 )269 server.set_redirect("/one-style.css", "/two-style.css")270 server.set_redirect("/two-style.css", "/three-style.css")271 server.set_redirect("/three-style.css", "/four-style.css")272 server.set_route(273 "/four-style.css",274 lambda req: (req.write(b"body {box-sizing: border-box; }"), req.finish()),275 )276 response = await page.goto(server.PREFIX + "/one-style.html")277 assert response.status == 200278 assert "one-style.html" in response.url279 assert len(intercepted) == 2280 assert intercepted[0].resource_type == "document"281 assert "one-style.html" in intercepted[0].url282 r = intercepted[1]283 for url in [284 "/one-style.css",285 "/two-style.css",286 "/three-style.css",287 "/four-style.css",288 ]:289 assert r.resource_type == "stylesheet"290 assert url in r.url291 r = r.redirected_to292 assert r is None293async def test_page_route_should_work_with_equal_requests(page, server):294 await page.goto(server.EMPTY_PAGE)295 hits = [True]296 def handle_request(request, hits):297 request.write(str(len(hits) * 11).encode())298 request.finish()299 hits.append(True)300 server.set_route("/zzz", lambda r: 
handle_request(r, hits))301 spinner = []302 async def handle_route(route):303 if len(spinner) == 1:304 await route.abort()305 spinner.pop(0)306 else:307 await route.continue_()308 spinner.append(True)309 # Cancel 2nd request.310 await page.route("**/*", handle_route)311 results = []312 for idx in range(3):313 results.append(314 await page.evaluate(315 """() => fetch('/zzz').then(response => response.text()).catch(e => 'FAILED')"""316 )317 )318 assert results == ["11", "FAILED", "22"]319async def test_page_route_should_navigate_to_dataURL_and_not_fire_dataURL_requests(320 page, server321):322 requests = []323 await page.route(324 "**/*",325 lambda route: (326 requests.append(route.request),327 asyncio.create_task(route.continue_()),328 ),329 )330 data_URL = "data:text/html,<div>yo</div>"331 response = await page.goto(data_URL)332 assert response is None333 assert len(requests) == 0334async def test_page_route_should_be_able_to_fetch_dataURL_and_not_fire_dataURL_requests(335 page, server336):337 await page.goto(server.EMPTY_PAGE)338 requests = []339 await page.route(340 "**/*",341 lambda route: (342 requests.append(route.request),343 asyncio.create_task(route.continue_()),344 ),345 )346 data_URL = "data:text/html,<div>yo</div>"347 text = await page.evaluate("url => fetch(url).then(r => r.text())", data_URL)348 assert text == "<div>yo</div>"349 assert len(requests) == 0350async def test_page_route_should_navigate_to_URL_with_hash_and_and_fire_requests_without_hash(351 page, server352):353 requests = []354 await page.route(355 "**/*",356 lambda route: (357 requests.append(route.request),358 asyncio.create_task(route.continue_()),359 ),360 )361 response = await page.goto(server.EMPTY_PAGE + "#hash")362 assert response.status == 200363 assert response.url == server.EMPTY_PAGE364 assert len(requests) == 1365 assert requests[0].url == server.EMPTY_PAGE366async def test_page_route_should_work_with_encoded_server(page, server):367 # The requestWillBeSent will report encoded 
URL, whereas interception will368 # report URL as-is. @see crbug.com/759388369 await page.route("**/*", lambda route: route.continue_())370 response = await page.goto(server.PREFIX + "/some nonexisting page")371 assert response.status == 404372async def test_page_route_should_work_with_encoded_server___2(page, server):373 # The requestWillBeSent will report URL as-is, whereas interception will374 # report encoded URL for stylesheet. @see crbug.com/759388375 requests = []376 await page.route(377 "**/*",378 lambda route: (379 asyncio.create_task(route.continue_()),380 requests.append(route.request),381 ),382 )383 response = await page.goto(384 f"""data:text/html,<link rel="stylesheet" href="{server.PREFIX}/fonts?helvetica|arial"/>"""385 )386 assert response is None387 assert len(requests) == 1388 assert (await requests[0].response()).status == 404389async def test_page_route_should_not_throw_Invalid_Interception_Id_if_the_request_was_cancelled(390 page, server391):392 await page.set_content("<iframe></iframe>")393 route_future = asyncio.Future()394 await page.route("**/*", lambda r, _: route_future.set_result(r))395 async with page.expect_request("**/*"):396 await page.eval_on_selector(397 "iframe", """(frame, url) => frame.src = url""", server.EMPTY_PAGE398 )399 # Delete frame to cause request to be canceled.400 await page.eval_on_selector("iframe", "frame => frame.remove()")401 route = await route_future402 await route.continue_()403async def test_page_route_should_intercept_main_resource_during_cross_process_navigation(404 page, server405):406 await page.goto(server.EMPTY_PAGE)407 intercepted = []408 await page.route(409 server.CROSS_PROCESS_PREFIX + "/empty.html",410 lambda route: (411 intercepted.append(True),412 asyncio.create_task(route.continue_()),413 ),414 )415 response = await page.goto(server.CROSS_PROCESS_PREFIX + "/empty.html")416 assert response.ok417 assert len(intercepted) == 1418@pytest.mark.skip_browser("webkit")419async def 
test_page_route_should_create_a_redirect(page, server):420 await page.goto(server.PREFIX + "/empty.html")421 async def handle_route(route, request):422 if request.url != (server.PREFIX + "/redirect_this"):423 return await route.continue_()424 await route.fulfill(status=301, headers={"location": "/empty.html"})425 await page.route(426 "**/*",427 handle_route,428 )429 text = await page.evaluate(430 """async url => {431 const data = await fetch(url);432 return data.text();433 }""",434 server.PREFIX + "/redirect_this",435 )436 assert text == ""437async def test_page_route_should_support_cors_with_GET(page, server):438 await page.goto(server.EMPTY_PAGE)439 async def handle_route(route, request):440 headers = (441 {"access-control-allow-origin": "*"}442 if request.url.endswith("allow")443 else {}444 )445 await route.fulfill(446 content_type="application/json",447 headers=headers,448 status=200,449 body=json.dumps(["electric", "gas"]),450 )451 await page.route(452 "**/cars*",453 handle_route,454 )455 # Should succeed456 resp = await page.evaluate(457 """async () => {458 const response = await fetch('https://example.com/cars?allow', { mode: 'cors' });459 return response.json();460 }"""461 )462 assert resp == ["electric", "gas"]463 # Should be rejected464 with pytest.raises(Error) as exc:465 await page.evaluate(466 """async () => {467 const response = await fetch('https://example.com/cars?reject', { mode: 'cors' });468 return response.json();469 }"""470 )471 assert "failed" in exc.value.message472async def test_page_route_should_support_cors_with_POST(page, server):473 await page.goto(server.EMPTY_PAGE)474 await page.route(475 "**/cars",476 lambda route: route.fulfill(477 content_type="application/json",478 headers={"Access-Control-Allow-Origin": "*"},479 status=200,480 body=json.dumps(["electric", "gas"]),481 ),482 )483 resp = await page.evaluate(484 """async () => {485 const response = await fetch('https://example.com/cars', {486 method: 'POST',487 headers: { 
'Content-Type': 'application/json' },488 mode: 'cors',489 body: JSON.stringify({ 'number': 1 })490 });491 return response.json();492 }"""493 )494 assert resp == ["electric", "gas"]495async def test_page_route_should_support_cors_for_different_methods(page, server):496 await page.goto(server.EMPTY_PAGE)497 await page.route(498 "**/cars",499 lambda route, request: route.fulfill(500 content_type="application/json",501 headers={"Access-Control-Allow-Origin": "*"},502 status=200,503 body=json.dumps([request.method, "electric", "gas"]),504 ),505 )506 # First POST507 resp = await page.evaluate(508 """async () => {509 const response = await fetch('https://example.com/cars', {510 method: 'POST',511 headers: { 'Content-Type': 'application/json' },512 mode: 'cors',513 body: JSON.stringify({ 'number': 1 })514 });515 return response.json();516 }"""517 )518 assert resp == ["POST", "electric", "gas"]519 # Then DELETE520 resp = await page.evaluate(521 """async () => {522 const response = await fetch('https://example.com/cars', {523 method: 'DELETE',524 headers: {},525 mode: 'cors',526 body: ''527 });528 return response.json();529 }"""530 )531 assert resp == ["DELETE", "electric", "gas"]532async def test_request_fulfill_should_work_a(page, server):533 await page.route(534 "**/*",535 lambda route: route.fulfill(536 status=201,537 headers={"foo": "bar"},538 content_type="text/html",539 body="Yo, page!",540 ),541 )542 response = await page.goto(server.EMPTY_PAGE)543 assert response.status == 201544 assert response.headers["foo"] == "bar"545 assert await page.evaluate("() => document.body.textContent") == "Yo, page!"546async def test_request_fulfill_should_work_with_status_code_422(page, server):547 await page.route(548 "**/*",549 lambda route: route.fulfill(status=422, body="Yo, page!"),550 )551 response = await page.goto(server.EMPTY_PAGE)552 assert response.status == 422553 assert response.status_text == "Unprocessable Entity"554 assert await page.evaluate("() => 
document.body.textContent") == "Yo, page!"555async def test_request_fulfill_should_allow_mocking_binary_responses(556 page: Page, server, assert_to_be_golden, assetdir557):558 await page.route(559 "**/*",560 lambda route: route.fulfill(561 content_type="image/png",562 body=(assetdir / "pptr.png").read_bytes(),563 ),564 )565 await page.evaluate(566 """PREFIX => {567 const img = document.createElement('img');568 img.src = PREFIX + '/does-not-exist.png';569 document.body.appendChild(img);570 return new Promise(fulfill => img.onload = fulfill);571 }""",572 server.PREFIX,573 )574 img = await page.query_selector("img")575 assert img576 assert_to_be_golden(await img.screenshot(), "mock-binary-response.png")577async def test_request_fulfill_should_allow_mocking_svg_with_charset(578 page, server, assert_to_be_golden579):580 await page.route(581 "**/*",582 lambda route: route.fulfill(583 content_type="image/svg+xml ; charset=utf-8",584 body='<svg width="50" height="50" version="1.1" xmlns="http://www.w3.org/2000/svg"><rect x="10" y="10" width="30" height="30" stroke="black" fill="transparent" stroke-width="5"/></svg>',585 ),586 )587 await page.evaluate(588 """PREFIX => {589 const img = document.createElement('img');590 img.src = PREFIX + '/does-not-exist.svg';591 document.body.appendChild(img);592 return new Promise((f, r) => { img.onload = f; img.onerror = r; });593 }""",594 server.PREFIX,595 )596 img = await page.query_selector("img")597 assert_to_be_golden(await img.screenshot(), "mock-svg.png")598async def test_request_fulfill_should_work_with_file_path(599 page: Page, server, assert_to_be_golden, assetdir600):601 await page.route(602 "**/*",603 lambda route: route.fulfill(604 content_type="shouldBeIgnored", path=assetdir / "pptr.png"605 ),606 )607 await page.evaluate(608 """PREFIX => {609 const img = document.createElement('img');610 img.src = PREFIX + '/does-not-exist.png';611 document.body.appendChild(img);612 return new Promise(fulfill => img.onload = fulfill);613 
}""",614 server.PREFIX,615 )616 img = await page.query_selector("img")617 assert img618 assert_to_be_golden(await img.screenshot(), "mock-binary-response.png")619async def test_request_fulfill_should_stringify_intercepted_request_response_headers(620 page, server621):622 await page.route(623 "**/*",624 lambda route: route.fulfill(625 status=200, headers={"foo": True}, body="Yo, page!"626 ),627 )628 response = await page.goto(server.EMPTY_PAGE)629 assert response.status == 200630 headers = response.headers631 assert headers["foo"] == "True"632 assert await page.evaluate("() => document.body.textContent") == "Yo, page!"633async def test_request_fulfill_should_not_modify_the_headers_sent_to_the_server(634 page, server635):636 await page.goto(server.PREFIX + "/empty.html")637 interceptedRequests = []638 # this is just to enable request interception, which disables caching in chromium639 await page.route(server.PREFIX + "/unused", lambda route, req: None)640 server.set_route(641 "/something",642 lambda response: (643 interceptedRequests.append(response),644 response.setHeader("Access-Control-Allow-Origin", "*"),645 response.write(b"done"),646 response.finish(),647 ),648 )649 text = await page.evaluate(650 """async url => {651 const data = await fetch(url);652 return data.text();653 }""",654 server.CROSS_PROCESS_PREFIX + "/something",655 )656 assert text == "done"657 playwrightRequest = asyncio.Future()658 await page.route(659 server.CROSS_PROCESS_PREFIX + "/something",660 lambda route, request: (661 playwrightRequest.set_result(request),662 asyncio.create_task(route.continue_(headers={**request.headers})),663 ),664 )665 textAfterRoute = await page.evaluate(666 """async url => {667 const data = await fetch(url);668 return data.text();669 }""",670 server.CROSS_PROCESS_PREFIX + "/something",671 )672 assert textAfterRoute == "done"673 assert len(interceptedRequests) == 2674 assert (675 interceptedRequests[0].requestHeaders == interceptedRequests[1].requestHeaders676 
)677async def test_request_fulfill_should_include_the_origin_header(page, server):678 await page.goto(server.PREFIX + "/empty.html")679 interceptedRequest = []680 await page.route(681 server.CROSS_PROCESS_PREFIX + "/something",682 lambda route, request: (683 interceptedRequest.append(request),684 asyncio.create_task(685 route.fulfill(686 headers={"Access-Control-Allow-Origin": "*"},687 content_type="text/plain",688 body="done",689 )690 ),691 ),692 )693 text = await page.evaluate(694 """async url => {695 const data = await fetch(url);696 return data.text();697 }""",698 server.CROSS_PROCESS_PREFIX + "/something",699 )700 assert text == "done"701 assert len(interceptedRequest) == 1702 assert interceptedRequest[0].headers["origin"] == server.PREFIX703async def test_request_fulfill_should_work_with_request_interception(page, server):704 requests = {}705 async def _handle_route(route: Route):706 requests[route.request.url.split("/").pop()] = route.request707 await route.continue_()708 await page.route("**/*", _handle_route)709 server.set_redirect("/rrredirect", "/frames/one-frame.html")710 await page.goto(server.PREFIX + "/rrredirect")711 assert requests["rrredirect"].is_navigation_request()712 assert requests["frame.html"].is_navigation_request()713 assert requests["script.js"].is_navigation_request() is False714 assert requests["style.css"].is_navigation_request() is False715async def test_Interception_should_work_with_request_interception(716 browser: Browser, https_server717):718 context = await browser.new_context(ignore_https_errors=True)719 page = await context.new_page()720 await page.route("**/*", lambda route: asyncio.ensure_future(route.continue_()))721 response = await page.goto(https_server.EMPTY_PAGE)722 assert response723 assert response.status == 200724 await context.close()725async def test_ignore_http_errors_service_worker_should_intercept_after_a_service_worker(726 page, server727):728 await page.goto(server.PREFIX + 
"/serviceworkers/fetchdummy/sw.html")...
test_network.py
Source:test_network.py
...23 assert "empty.html" in request.url24 assert request.headers["user-agent"]25 assert request.method == "GET"26 assert request.post_data is None27 assert request.is_navigation_request()28 assert request.resource_type == "document"29 assert request.frame == page.main_frame30 assert request.frame.url == "about:blank"31 await route.fulfill(body="Text")32 await page.route(33 "**/empty.html",34 lambda route, request: asyncio.create_task(handle_request(route, request)),35 )36 response = await page.goto(server.EMPTY_PAGE)37 assert response.ok38 assert await response.text() == "Text"39async def test_request_continue(page, server):40 async def handle_request(route, request, intercepted):41 intercepted.append(True)42 await route.continue_()43 intercepted = list()44 await page.route(45 "**/*",46 lambda route, request: asyncio.create_task(47 handle_request(route, request, intercepted)48 ),49 )50 response = await page.goto(server.EMPTY_PAGE)51 assert response.ok52 assert intercepted == [True]53 assert await page.title() == ""54async def test_page_events_request_should_fire_for_navigation_requests(55 page: Page, server56):57 requests = []58 page.on("request", lambda r: requests.append(r))59 await page.goto(server.EMPTY_PAGE)60 assert len(requests) == 161async def test_page_events_request_should_fire_for_iframes(page, server, utils):62 requests = []63 page.on("request", lambda r: requests.append(r))64 await page.goto(server.EMPTY_PAGE)65 await utils.attach_frame(page, "frame1", server.EMPTY_PAGE)66 assert len(requests) == 267async def test_page_events_request_should_fire_for_fetches(page, server):68 requests = []69 page.on("request", lambda r: requests.append(r))70 await page.goto(server.EMPTY_PAGE)71 await page.evaluate('() => fetch("/empty.html")')72 assert len(requests) == 273async def test_page_events_request_should_report_requests_and_responses_handled_by_service_worker(74 page: Page, server75):76 await page.goto(server.PREFIX + "/serviceworkers/fetchdummy/sw.html")77 
await page.evaluate("() => window.activationPromise")78 sw_response = None79 async with page.expect_request("**/*") as request_info:80 sw_response = await page.evaluate('() => fetchDummy("foo")')81 request = await request_info.value82 assert sw_response == "responseFromServiceWorker:foo"83 assert request.url == server.PREFIX + "/serviceworkers/fetchdummy/foo"84 response = await request.response()85 assert response.url == server.PREFIX + "/serviceworkers/fetchdummy/foo"86 assert await response.text() == "responseFromServiceWorker:foo"87async def test_request_frame_should_work_for_main_frame_navigation_request(88 page, server89):90 requests = []91 page.on("request", lambda r: requests.append(r))92 await page.goto(server.EMPTY_PAGE)93 assert len(requests) == 194 assert requests[0].frame == page.main_frame95async def test_request_frame_should_work_for_subframe_navigation_request(96 page, server, utils97):98 await page.goto(server.EMPTY_PAGE)99 requests = []100 page.on("request", lambda r: requests.append(r))101 await utils.attach_frame(page, "frame1", server.EMPTY_PAGE)102 assert len(requests) == 1103 assert requests[0].frame == page.frames[1]104async def test_request_frame_should_work_for_fetch_requests(page, server):105 await page.goto(server.EMPTY_PAGE)106 requests: List[Request] = []107 page.on("request", lambda r: requests.append(r))108 await page.evaluate('() => fetch("/digits/1.png")')109 requests = [r for r in requests if "favicon" not in r.url]110 assert len(requests) == 1111 assert requests[0].frame == page.main_frame112async def test_request_headers_should_work(113 page, server, is_chromium, is_firefox, is_webkit114):115 response = await page.goto(server.EMPTY_PAGE)116 if is_chromium:117 assert "Chrome" in response.request.headers["user-agent"]118 elif is_firefox:119 assert "Firefox" in response.request.headers["user-agent"]120 elif is_webkit:121 assert "WebKit" in response.request.headers["user-agent"]122async def 
test_request_headers_should_get_the_same_headers_as_the_server(123 page: Page, server, is_webkit, is_win124):125 server_request_headers_future: Future[Dict[str, str]] = asyncio.Future()126 def handle(request):127 normalized_headers = {128 key.decode().lower(): value[0].decode()129 for key, value in request.requestHeaders.getAllRawHeaders()130 }131 server_request_headers_future.set_result(normalized_headers)132 request.write(b"done")133 request.finish()134 server.set_route("/empty.html", handle)135 response = await page.goto(server.EMPTY_PAGE)136 server_headers = await server_request_headers_future137 if is_webkit and is_win:138 # Curl does not show accept-encoding and accept-language139 server_headers.pop("accept-encoding")140 server_headers.pop("accept-language")141 assert cast(Response, response).request.headers == server_headers142async def test_request_headers_should_get_the_same_headers_as_the_server_cors(143 page: Page, server, is_webkit, is_win144):145 await page.goto(server.PREFIX + "/empty.html")146 server_request_headers_future: Future[Dict[str, str]] = asyncio.Future()147 def handle_something(request):148 normalized_headers = {149 key.decode().lower(): value[0].decode()150 for key, value in request.requestHeaders.getAllRawHeaders()151 }152 server_request_headers_future.set_result(normalized_headers)153 request.setHeader("Access-Control-Allow-Origin", "*")154 request.write(b"done")155 request.finish()156 server.set_route("/something", handle_something)157 text = None158 async with page.expect_request("**/*") as request_info:159 text = await page.evaluate(160 """async url => {161 const data = await fetch(url);162 return data.text();163 }""",164 server.CROSS_PROCESS_PREFIX + "/something",165 )166 request = await request_info.value167 assert text == "done"168 server_headers = await server_request_headers_future169 if is_webkit and is_win:170 # Curl does not show accept-encoding and accept-language171 server_headers.pop("accept-encoding")172 
server_headers.pop("accept-language")173 assert request.headers == server_headers174async def test_response_headers_should_work(page, server):175 server.set_route("/empty.html", lambda r: (r.setHeader("foo", "bar"), r.finish()))176 response = await page.goto(server.EMPTY_PAGE)177 assert response.headers["foo"] == "bar"178async def test_request_post_data_should_work(page, server):179 await page.goto(server.EMPTY_PAGE)180 server.set_route("/post", lambda r: r.finish())181 requests = []182 page.on("request", lambda r: requests.append(r))183 await page.evaluate(184 '() => fetch("./post", { method: "POST", body: JSON.stringify({foo: "bar"})})'185 )186 assert len(requests) == 1187 assert requests[0].post_data == '{"foo":"bar"}'188async def test_request_post_data__should_be_undefined_when_there_is_no_post_data(189 page, server190):191 response = await page.goto(server.EMPTY_PAGE)192 assert response.request.post_data is None193async def test_should_parse_the_json_post_data(page, server):194 await page.goto(server.EMPTY_PAGE)195 server.set_route("/post", lambda req: req.finish())196 requests = []197 page.on("request", lambda r: requests.append(r))198 await page.evaluate(199 """() => fetch('./post', { method: 'POST', body: JSON.stringify({ foo: 'bar' }) })"""200 )201 assert len(requests) == 1202 assert requests[0].post_data_json == {"foo": "bar"}203async def test_should_parse_the_data_if_content_type_is_form_urlencoded(page, server):204 await page.goto(server.EMPTY_PAGE)205 server.set_route("/post", lambda req: req.finish())206 requests = []207 page.on("request", lambda r: requests.append(r))208 await page.set_content(209 """<form method='POST' action='/post'><input type='text' name='foo' value='bar'><input type='number' name='baz' value='123'><input type='submit'></form>"""210 )211 await page.click("input[type=submit]")212 assert len(requests) == 1213 assert requests[0].post_data_json == {"foo": "bar", "baz": "123"}214async def 
test_should_be_undefined_when_there_is_no_post_data(page, server):215 response = await page.goto(server.EMPTY_PAGE)216 assert response.request.post_data_json is None217async def test_should_work_with_binary_post_data(page, server):218 await page.goto(server.EMPTY_PAGE)219 server.set_route("/post", lambda req: req.finish())220 requests = []221 page.on("request", lambda r: requests.append(r))222 await page.evaluate(223 """async () => {224 await fetch('./post', { method: 'POST', body: new Uint8Array(Array.from(Array(256).keys())) })225 }"""226 )227 assert len(requests) == 1228 buffer = requests[0].post_data_buffer229 assert len(buffer) == 256230 for i in range(256):231 assert buffer[i] == i232async def test_should_work_with_binary_post_data_and_interception(page, server):233 await page.goto(server.EMPTY_PAGE)234 server.set_route("/post", lambda req: req.finish())235 requests = []236 await page.route("/post", lambda route: asyncio.ensure_future(route.continue_()))237 page.on("request", lambda r: requests.append(r))238 await page.evaluate(239 """async () => {240 await fetch('./post', { method: 'POST', body: new Uint8Array(Array.from(Array(256).keys())) })241 }"""242 )243 assert len(requests) == 1244 buffer = requests[0].post_data_buffer245 assert len(buffer) == 256246 for i in range(256):247 assert buffer[i] == i248async def test_response_text_should_work(page, server):249 response = await page.goto(server.PREFIX + "/simple.json")250 assert await response.text() == '{"foo": "bar"}\n'251async def test_response_text_should_return_uncompressed_text(page, server):252 server.enable_gzip("/simple.json")253 response = await page.goto(server.PREFIX + "/simple.json")254 assert response.headers["content-encoding"] == "gzip"255 assert await response.text() == '{"foo": "bar"}\n'256async def test_response_text_should_throw_when_requesting_body_of_redirected_response(257 page, server258):259 server.set_redirect("/foo.html", "/empty.html")260 response = await page.goto(server.PREFIX + 
"/foo.html")261 redirected_from = response.request.redirected_from262 assert redirected_from263 redirected = await redirected_from.response()264 assert redirected.status == 302265 error = None266 try:267 await redirected.text()268 except Error as exc:269 error = exc270 assert "Response body is unavailable for redirect responses" in error.message271async def test_response_json_should_work(page, server):272 response = await page.goto(server.PREFIX + "/simple.json")273 assert await response.json() == {"foo": "bar"}274async def test_response_body_should_work(page, server, assetdir):275 response = await page.goto(server.PREFIX + "/pptr.png")276 with open(277 assetdir / "pptr.png",278 "rb",279 ) as fd:280 assert fd.read() == await response.body()281async def test_response_body_should_work_with_compression(page, server, assetdir):282 server.enable_gzip("/pptr.png")283 response = await page.goto(server.PREFIX + "/pptr.png")284 with open(285 assetdir / "pptr.png",286 "rb",287 ) as fd:288 assert fd.read() == await response.body()289async def test_response_status_text_should_work(page, server):290 server.set_route("/cool", lambda r: (r.setResponseCode(200, b"cool!"), r.finish()))291 response = await page.goto(server.PREFIX + "/cool")292 assert response.status_text == "cool!"293async def test_request_resource_type_should_return_event_source(page, server):294 SSE_MESSAGE = {"foo": "bar"}295 # 1. Setup server-sent events on server that immediately sends a message to the client.296 server.set_route(297 "/sse",298 lambda r: (299 r.setHeader("Content-Type", "text/event-stream"),300 r.setHeader("Connection", "keep-alive"),301 r.setHeader("Cache-Control", "no-cache"),302 r.setResponseCode(200),303 r.write(f"data: {json.dumps(SSE_MESSAGE)}\n\n".encode()),304 r.finish(),305 ),306 )307 # 2. Subscribe to page request events.308 await page.goto(server.EMPTY_PAGE)309 requests = []310 page.on("request", lambda r: requests.append(r))311 # 3. 
Connect to EventSource in browser and return first message.312 assert (313 await page.evaluate(314 """() => {315 const eventSource = new EventSource('/sse');316 return new Promise(resolve => {317 eventSource.onmessage = e => resolve(JSON.parse(e.data));318 });319 }"""320 )321 == SSE_MESSAGE322 )323 assert requests[0].resource_type == "eventsource"324async def test_network_events_request(page, server):325 requests = []326 page.on("request", lambda r: requests.append(r))327 await page.goto(server.EMPTY_PAGE)328 assert len(requests) == 1329 assert requests[0].url == server.EMPTY_PAGE330 assert requests[0].resource_type == "document"331 assert requests[0].method == "GET"332 assert await requests[0].response()333 assert requests[0].frame == page.main_frame334 assert requests[0].frame.url == server.EMPTY_PAGE335async def test_network_events_response(page, server):336 responses = []337 page.on("response", lambda r: responses.append(r))338 await page.goto(server.EMPTY_PAGE)339 assert len(responses) == 1340 assert responses[0].url == server.EMPTY_PAGE341 assert responses[0].status == 200342 assert responses[0].ok343 assert responses[0].request344async def test_network_events_request_failed(345 page, server, is_chromium, is_webkit, is_mac, is_win346):347 def handle_request(request):348 request.setHeader("Content-Type", "text/css")349 request.transport.loseConnection()350 server.set_route("/one-style.css", handle_request)351 failed_requests = []352 page.on("requestfailed", lambda request: failed_requests.append(request))353 await page.goto(server.PREFIX + "/one-style.html")354 assert len(failed_requests) == 1355 assert "one-style.css" in failed_requests[0].url356 assert await failed_requests[0].response() is None357 assert failed_requests[0].resource_type == "stylesheet"358 if is_chromium:359 assert failed_requests[0].failure == "net::ERR_EMPTY_RESPONSE"360 elif is_webkit:361 if is_mac:362 assert failed_requests[0].failure == "The network connection was lost."363 elif 
is_win:364 assert (365 failed_requests[0].failure366 == "Server returned nothing (no headers, no data)"367 )368 else:369 assert failed_requests[0].failure == "Message Corrupt"370 else:371 assert failed_requests[0].failure == "NS_ERROR_NET_RESET"372 assert failed_requests[0].frame373async def test_network_events_request_finished(page, server):374 async with page.expect_event("requestfinished") as event_info:375 await page.goto(server.EMPTY_PAGE)376 request = await event_info.value377 assert request.url == server.EMPTY_PAGE378 assert await request.response()379 assert request.frame == page.main_frame380 assert request.frame.url == server.EMPTY_PAGE381async def test_network_events_should_fire_events_in_proper_order(page, server):382 events = []383 page.on("request", lambda request: events.append("request"))384 page.on("response", lambda response: events.append("response"))385 response = await page.goto(server.EMPTY_PAGE)386 await response.finished()387 events.append("requestfinished")388 assert events == ["request", "response", "requestfinished"]389async def test_network_events_should_support_redirects(page, server):390 events = []391 page.on("request", lambda request: events.append(f"{request.method} {request.url}"))392 page.on(393 "response", lambda response: events.append(f"{response.status} {response.url}")394 )395 page.on("requestfinished", lambda request: events.append(f"DONE {request.url}"))396 page.on("requestfailed", lambda request: events.append(f"FAIL {request.url}"))397 server.set_redirect("/foo.html", "/empty.html")398 FOO_URL = server.PREFIX + "/foo.html"399 response = await page.goto(FOO_URL)400 await response.finished()401 assert events == [402 f"GET {FOO_URL}",403 f"302 {FOO_URL}",404 f"DONE {FOO_URL}",405 f"GET {server.EMPTY_PAGE}",406 f"200 {server.EMPTY_PAGE}",407 f"DONE {server.EMPTY_PAGE}",408 ]409 redirected_from = response.request.redirected_from410 assert "/foo.html" in redirected_from.url411 assert redirected_from.redirected_from is None412 
assert redirected_from.redirected_to == response.request413async def test_request_is_navigation_request_should_work(page, server):414 pytest.skip(msg="test")415 requests = {}416 def handle_request(request):417 requests[request.url().split("/").pop()] = request418 page.on("request", handle_request)419 server.set_redirect("/rrredirect", "/frames/one-frame.html")420 await page.goto(server.PREFIX + "/rrredirect")421 print("kek")422 assert requests.get("rrredirect").is_navigation_request()423 assert requests.get("one-frame.html").is_navigation_request()424 assert requests.get("frame.html").is_navigation_request()425 assert requests.get("script.js").is_navigation_request() is False426 assert requests.get("style.css").is_navigation_request() is False427async def test_request_is_navigation_request_should_work_when_navigating_to_image(428 page, server429):430 requests = []431 page.on("request", lambda r: requests.append(r))432 await page.goto(server.PREFIX + "/pptr.png")433 assert requests[0].is_navigation_request()434async def test_set_extra_http_headers_should_work(page, server):435 await page.set_extra_http_headers({"foo": "bar"})436 request = (437 await asyncio.gather(438 server.wait_for_request("/empty.html"),439 page.goto(server.EMPTY_PAGE),440 )441 )[0]442 assert request.getHeader("foo") == "bar"443async def test_set_extra_http_headers_should_work_with_redirects(page, server):444 server.set_redirect("/foo.html", "/empty.html")445 await page.set_extra_http_headers({"foo": "bar"})446 request = (447 await asyncio.gather(...
handler.py
Source:handler.py
...252 stats_prefix = "playwright/request_count"253 self.stats.inc_value(stats_prefix)254 self.stats.inc_value(f"{stats_prefix}/resource_type/{request.resource_type}")255 self.stats.inc_value(f"{stats_prefix}/method/{request.method}")256 if request.is_navigation_request():257 self.stats.inc_value(f"{stats_prefix}/navigation")258 def _increment_response_stats(self, response: PlaywrightResponse) -> None:259 stats_prefix = "playwright/response_count"260 self.stats.inc_value(stats_prefix)261 self.stats.inc_value(f"{stats_prefix}/resource_type/{response.request.resource_type}")262 self.stats.inc_value(f"{stats_prefix}/method/{response.request.method}")263 def _make_close_page_callback(self, context_name: str) -> Callable:264 def close_page_callback() -> None:265 if context_name in self.context_semaphores:266 self.context_semaphores[context_name].release()267 return close_page_callback268 def _make_close_browser_context_callback(self, name: str) -> Callable:269 def close_browser_context_callback() -> None:270 logger.debug("Browser context closed: '%s'", name)271 if name in self.contexts:272 self.contexts.pop(name)273 if name in self.context_semaphores:274 self.context_semaphores.pop(name)275 return close_browser_context_callback276 def _make_request_handler(277 self, method: str, scrapy_headers: Headers, body: Optional[bytes], encoding: str = "utf8"278 ) -> Callable:279 async def _request_handler(route: Route, playwright_request: PlaywrightRequest) -> None:280 """Override request headers, method and body."""281 if self.abort_request and self.abort_request(playwright_request):282 await route.abort()283 self.stats.inc_value("playwright/request_count/aborted")284 return None285 processed_headers = await self.process_request_headers(286 self.browser_type, playwright_request, scrapy_headers287 )288 # the request that reaches the callback should contain the headers that were sent289 scrapy_headers.clear()290 scrapy_headers.update(processed_headers)291 overrides: dict = 
{"headers": processed_headers}292 if playwright_request.is_navigation_request():293 overrides["method"] = method294 if body is not None:295 overrides["post_data"] = body.decode(encoding)296 try:297 await route.continue_(**overrides)298 except Exception as ex: # pylint: disable=broad-except299 if _is_safe_close_error(ex):300 logger.warning(301 f"{playwright_request}: failed processing Playwright request ({ex})"302 )303 else:304 raise305 return _request_handler306def _possible_encodings(headers: Headers, text: str) -> Generator[str, None, None]:...
_network.py
Source:_network.py
...93 return from_nullable_channel(await self._channel.send("response"))94 @property95 def frame(self) -> "Frame":96 return from_channel(self._initializer["frame"])97 def is_navigation_request(self) -> bool:98 return self._initializer["isNavigationRequest"]99 @property100 def redirected_from(self) -> Optional["Request"]:101 return self._redirected_from102 @property103 def redirected_to(self) -> Optional["Request"]:104 return self._redirected_to105 @property106 def failure(self) -> Optional[str]:107 return self._failure_text108 @property109 def timing(self) -> ResourceTiming:110 return self._timing111class Route(ChannelOwner):...
LambdaTest’s Playwright tutorial gives you a broad overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial offers A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!