Best Python code snippet using playwright-python
test_network.py
Source:test_network.py
# Copyright (c) Microsoft Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import json
from asyncio.futures import Future
from typing import Dict, List, cast

import pytest

from playwright.async_api import Error, Page, Request, Response


async def test_request_fulfill(page, server):
    """A routed navigation request exposes its metadata and can be fulfilled."""

    async def handle_request(route, request):
        assert route.request == request
        assert "empty.html" in request.url
        assert request.headers["user-agent"]
        assert request.method == "GET"
        assert request.post_data is None
        assert request.is_navigation_request()
        assert request.resource_type == "document"
        assert request.frame == page.main_frame
        assert request.frame.url == "about:blank"
        await route.fulfill(body="Text")

    await page.route(
        "**/empty.html",
        lambda route, request: asyncio.create_task(handle_request(route, request)),
    )
    response = await page.goto(server.EMPTY_PAGE)
    assert response.ok
    assert await response.text() == "Text"


async def test_request_continue(page, server):
    """A routed request that is continued still loads the page normally."""

    async def handle_request(route, request, intercepted):
        intercepted.append(True)
        await route.continue_()

    intercepted = []
    await page.route(
        "**/*",
        lambda route, request: asyncio.create_task(
            handle_request(route, request, intercepted)
        ),
    )
    response = await page.goto(server.EMPTY_PAGE)
    assert response.ok
    assert intercepted == [True]
    assert await page.title() == ""


async def test_page_events_request_should_fire_for_navigation_requests(
    page: Page, server
):
    """Navigating to the empty page emits exactly one "request" event."""
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.goto(server.EMPTY_PAGE)
    assert len(requests) == 1


async def test_page_events_request_should_fire_for_iframes(page, server, utils):
    """Attaching an iframe emits a second "request" event."""
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.goto(server.EMPTY_PAGE)
    await utils.attach_frame(page, "frame1", server.EMPTY_PAGE)
    assert len(requests) == 2


async def test_page_events_request_should_fire_for_fetches(page, server):
    """A fetch() issued from the page emits a second "request" event."""
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.goto(server.EMPTY_PAGE)
    await page.evaluate('() => fetch("/empty.html")')
    assert len(requests) == 2


async def test_page_events_request_should_report_requests_and_responses_handled_by_service_worker(
    page: Page, server
):
    """Requests answered by the fetchdummy service worker are still reported."""
    await page.goto(server.PREFIX + "/serviceworkers/fetchdummy/sw.html")
    await page.evaluate("() => window.activationPromise")
    sw_response = None
    async with page.expect_request("**/*") as request_info:
        sw_response = await page.evaluate('() => fetchDummy("foo")')
    request = await request_info.value
    assert sw_response == "responseFromServiceWorker:foo"
    assert request.url == server.PREFIX + "/serviceworkers/fetchdummy/foo"
    response = await request.response()
    assert response.url == server.PREFIX + "/serviceworkers/fetchdummy/foo"
    assert await response.text() == "responseFromServiceWorker:foo"


async def test_request_frame_should_work_for_main_frame_navigation_request(
    page, server
):
    """The navigation request's frame is the page's main frame."""
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.goto(server.EMPTY_PAGE)
    assert len(requests) == 1
    assert requests[0].frame == page.main_frame


async def test_request_frame_should_work_for_subframe_navigation_request(
    page, server, utils
):
    """A subframe navigation request reports the attached child frame."""
    await page.goto(server.EMPTY_PAGE)
    requests = []
    page.on("request", lambda r: requests.append(r))
    await utils.attach_frame(page, "frame1", server.EMPTY_PAGE)
    assert len(requests) == 1
    assert requests[0].frame == page.frames[1]


async def test_request_frame_should_work_for_fetch_requests(page, server):
    """A fetch() request reports the main frame as its frame."""
    await page.goto(server.EMPTY_PAGE)
    requests: List[Request] = []
    page.on("request", lambda r: requests.append(r))
    await page.evaluate('() => fetch("/digits/1.png")')
    # Favicon requests are filtered out so they do not affect the count.
    requests = [r for r in requests if "favicon" not in r.url]
    assert len(requests) == 1
    assert requests[0].frame == page.main_frame


async def test_request_headers_should_work(
    page, server, is_chromium, is_firefox, is_webkit
):
    """The user-agent request header names the browser engine in use."""
    response = await page.goto(server.EMPTY_PAGE)
    if is_chromium:
        assert "Chrome" in response.request.headers["user-agent"]
    elif is_firefox:
        assert "Firefox" in response.request.headers["user-agent"]
    elif is_webkit:
        assert "WebKit" in response.request.headers["user-agent"]


async def test_request_headers_should_get_the_same_headers_as_the_server(
    page: Page, server, is_webkit, is_win
):
    """Headers reported on the request equal the headers the server received."""
    server_request_headers_future: Future[Dict[str, str]] = asyncio.Future()

    def handle(request):
        # Lower-case the names and keep the first value of each raw header.
        normalized_headers = {
            key.decode().lower(): value[0].decode()
            for key, value in request.requestHeaders.getAllRawHeaders()
        }
        server_request_headers_future.set_result(normalized_headers)
        request.write(b"done")
        request.finish()

    server.set_route("/empty.html", handle)
    response = await page.goto(server.EMPTY_PAGE)
    server_headers = await server_request_headers_future
    if is_webkit and is_win:
        # Curl does not show accept-encoding and accept-language
        server_headers.pop("accept-encoding")
        server_headers.pop("accept-language")
    assert cast(Response, response).request.headers == server_headers


async def test_request_headers_should_get_the_same_headers_as_the_server_cors(
    page: Page, server, is_webkit, is_win
):
    """Same header comparison as above, for a cross-origin fetch()."""
    await page.goto(server.PREFIX + "/empty.html")
    server_request_headers_future: Future[Dict[str, str]] = asyncio.Future()

    def handle_something(request):
        # Lower-case the names and keep the first value of each raw header.
        normalized_headers = {
            key.decode().lower(): value[0].decode()
            for key, value in request.requestHeaders.getAllRawHeaders()
        }
        server_request_headers_future.set_result(normalized_headers)
        request.setHeader("Access-Control-Allow-Origin", "*")
        request.write(b"done")
        request.finish()

    server.set_route("/something", handle_something)
    text = None
    async with page.expect_request("**/*") as request_info:
        text = await page.evaluate(
            """async url => {
                const data = await fetch(url);
                return data.text();
            }""",
            server.CROSS_PROCESS_PREFIX + "/something",
        )
    request = await request_info.value
    assert text == "done"
    server_headers = await server_request_headers_future
    if is_webkit and is_win:
        # Curl does not show accept-encoding and accept-language
        server_headers.pop("accept-encoding")
        server_headers.pop("accept-language")
    assert request.headers == server_headers


async def test_response_headers_should_work(page, server):
    """A header set by the server is visible on the response."""
    server.set_route("/empty.html", lambda r: (r.setHeader("foo", "bar"), r.finish()))
    response = await page.goto(server.EMPTY_PAGE)
    assert response.headers["foo"] == "bar"


async def test_request_post_data_should_work(page, server):
    """post_data returns the raw body of a POST request."""
    await page.goto(server.EMPTY_PAGE)
    server.set_route("/post", lambda r: r.finish())
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.evaluate(
        '() => fetch("./post", { method: "POST", body: JSON.stringify({foo: "bar"})})'
    )
    assert len(requests) == 1
    assert requests[0].post_data == '{"foo":"bar"}'


async def test_request_post_data__should_be_undefined_when_there_is_no_post_data(
    page, server
):
    """post_data is None for a request without a body."""
    response = await page.goto(server.EMPTY_PAGE)
    assert response.request.post_data is None


async def test_should_parse_the_json_post_data(page, server):
    """post_data_json parses a JSON request body."""
    await page.goto(server.EMPTY_PAGE)
    server.set_route("/post", lambda req: req.finish())
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.evaluate(
        """() => fetch('./post', { method: 'POST', body: JSON.stringify({ foo: 'bar' }) })"""
    )
    assert len(requests) == 1
    assert requests[0].post_data_json == {"foo": "bar"}


async def test_should_parse_the_data_if_content_type_is_form_urlencoded(page, server):
    """post_data_json parses the body of a submitted HTML form."""
    await page.goto(server.EMPTY_PAGE)
    server.set_route("/post", lambda req: req.finish())
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.set_content(
        """<form method='POST' action='/post'><input type='text' name='foo' value='bar'><input type='number' name='baz' value='123'><input type='submit'></form>"""
    )
    await page.click("input[type=submit]")
    assert len(requests) == 1
    assert requests[0].post_data_json == {"foo": "bar", "baz": "123"}


async def test_should_be_undefined_when_there_is_no_post_data(page, server):
    """post_data_json is None for a request without a body."""
    response = await page.goto(server.EMPTY_PAGE)
    assert response.request.post_data_json is None


async def test_should_work_with_binary_post_data(page, server):
    """post_data_buffer returns the 256 posted bytes 0..255 unchanged."""
    await page.goto(server.EMPTY_PAGE)
    server.set_route("/post", lambda req: req.finish())
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.evaluate(
        """async () => {
            await fetch('./post', { method: 'POST', body: new Uint8Array(Array.from(Array(256).keys())) })
        }"""
    )
    assert len(requests) == 1
    buffer = requests[0].post_data_buffer
    assert len(buffer) == 256
    # One comparison checks every byte and gives a readable diff on failure.
    assert bytes(buffer) == bytes(range(256))


async def test_should_work_with_binary_post_data_and_interception(page, server):
    """Same as the binary post data test, with a route continuing the request."""
    await page.goto(server.EMPTY_PAGE)
    server.set_route("/post", lambda req: req.finish())
    requests = []
    await page.route("/post", lambda route: asyncio.ensure_future(route.continue_()))
    page.on("request", lambda r: requests.append(r))
    await page.evaluate(
        """async () => {
            await fetch('./post', { method: 'POST', body: new Uint8Array(Array.from(Array(256).keys())) })
        }"""
    )
    assert len(requests) == 1
    buffer = requests[0].post_data_buffer
    assert len(buffer) == 256
    # One comparison checks every byte and gives a readable diff on failure.
    assert bytes(buffer) == bytes(range(256))


async def test_response_text_should_work(page, server):
    """response.text() returns the body of simple.json."""
    response = await page.goto(server.PREFIX + "/simple.json")
    assert await response.text() == '{"foo": "bar"}\n'


async def test_response_text_should_return_uncompressed_text(page, server):
    """response.text() returns decoded text for a gzip-encoded response."""
    server.enable_gzip("/simple.json")
    response = await page.goto(server.PREFIX + "/simple.json")
    assert response.headers["content-encoding"] == "gzip"
    assert await response.text() == '{"foo": "bar"}\n'


async def test_response_text_should_throw_when_requesting_body_of_redirected_response(
    page, server
):
    """Reading the body of a 302 redirect response raises an Error."""
    server.set_redirect("/foo.html", "/empty.html")
    response = await page.goto(server.PREFIX + "/foo.html")
    redirected_from = response.request.redirected_from
    assert redirected_from
    redirected = await redirected_from.response()
    assert redirected.status == 302
    # pytest.raises fails with a clear message when no Error is raised,
    # instead of an AttributeError on a None placeholder.
    with pytest.raises(Error) as exc_info:
        await redirected.text()
    assert (
        "Response body is unavailable for redirect responses"
        in exc_info.value.message
    )


async def test_response_json_should_work(page, server):
    """response.json() parses the body of simple.json."""
    response = await page.goto(server.PREFIX + "/simple.json")
    assert await response.json() == {"foo": "bar"}


async def test_response_body_should_work(page, server, assetdir):
    """response.body() equals the bytes of the asset file on disk."""
    response = await page.goto(server.PREFIX + "/pptr.png")
    with open(
        assetdir / "pptr.png",
        "rb",
    ) as fd:
        assert fd.read() == await response.body()


async def test_response_body_should_work_with_compression(page, server, assetdir):
    """response.body() equals the asset bytes when gzip is enabled."""
    server.enable_gzip("/pptr.png")
    response = await page.goto(server.PREFIX + "/pptr.png")
    with open(
        assetdir / "pptr.png",
        "rb",
    ) as fd:
        assert fd.read() == await response.body()


async def test_response_status_text_should_work(page, server):
    """status_text reflects the reason phrase set by the server."""
    server.set_route("/cool", lambda r: (r.setResponseCode(200, b"cool!"), r.finish()))
    response = await page.goto(server.PREFIX + "/cool")
    assert response.status_text == "cool!"


async def test_request_resource_type_should_return_event_source(page, server):
    """An EventSource connection is reported with resource type "eventsource"."""
    SSE_MESSAGE = {"foo": "bar"}
    # 1. Setup server-sent events on server that immediately sends a message to the client.
    server.set_route(
        "/sse",
        lambda r: (
            r.setHeader("Content-Type", "text/event-stream"),
            r.setHeader("Connection", "keep-alive"),
            r.setHeader("Cache-Control", "no-cache"),
            r.setResponseCode(200),
            r.write(f"data: {json.dumps(SSE_MESSAGE)}\n\n".encode()),
            r.finish(),
        ),
    )
    # 2. Subscribe to page request events.
    await page.goto(server.EMPTY_PAGE)
    requests = []
    page.on("request", lambda r: requests.append(r))
    # 3. Connect to EventSource in browser and return first message.
    assert (
        await page.evaluate(
            """() => {
                const eventSource = new EventSource('/sse');
                return new Promise(resolve => {
                    eventSource.onmessage = e => resolve(JSON.parse(e.data));
                });
            }"""
        )
        == SSE_MESSAGE
    )
    assert requests[0].resource_type == "eventsource"


async def test_network_events_request(page, server):
    """The "request" event carries url, type, method, response and frame."""
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.goto(server.EMPTY_PAGE)
    assert len(requests) == 1
    assert requests[0].url == server.EMPTY_PAGE
    assert requests[0].resource_type == "document"
    assert requests[0].method == "GET"
    assert await requests[0].response()
    assert requests[0].frame == page.main_frame
    assert requests[0].frame.url == server.EMPTY_PAGE


async def test_network_events_response(page, server):
    """The "response" event carries url, status, ok and the request."""
    responses = []
    page.on("response", lambda r: responses.append(r))
    await page.goto(server.EMPTY_PAGE)
    assert len(responses) == 1
    assert responses[0].url == server.EMPTY_PAGE
    assert responses[0].status == 200
    assert responses[0].ok
    assert responses[0].request


async def test_network_events_request_failed(
    page, server, is_chromium, is_webkit, is_mac, is_win
):
    """A dropped connection emits "requestfailed" with a per-browser failure text."""

    def handle_request(request):
        request.setHeader("Content-Type", "text/css")
        request.transport.loseConnection()

    server.set_route("/one-style.css", handle_request)
    failed_requests = []
    page.on("requestfailed", lambda request: failed_requests.append(request))
    await page.goto(server.PREFIX + "/one-style.html")
    assert len(failed_requests) == 1
    assert "one-style.css" in failed_requests[0].url
    assert await failed_requests[0].response() is None
    assert failed_requests[0].resource_type == "stylesheet"
    if is_chromium:
        assert failed_requests[0].failure == "net::ERR_EMPTY_RESPONSE"
    elif is_webkit:
        if is_mac:
            assert failed_requests[0].failure == "The network connection was lost."
        elif is_win:
            assert (
                failed_requests[0].failure
                == "Server returned nothing (no headers, no data)"
            )
        else:
            assert failed_requests[0].failure == "Message Corrupt"
    else:
        assert failed_requests[0].failure == "NS_ERROR_NET_RESET"
    assert failed_requests[0].frame


async def test_network_events_request_finished(page, server):
    """The "requestfinished" event carries the finished navigation request."""
    async with page.expect_event("requestfinished") as event_info:
        await page.goto(server.EMPTY_PAGE)
    request = await event_info.value
    assert request.url == server.EMPTY_PAGE
    assert await request.response()
    assert request.frame == page.main_frame
    assert request.frame.url == server.EMPTY_PAGE


async def test_network_events_should_fire_events_in_proper_order(page, server):
    """"request" fires before "response", both before the response finishes."""
    events = []
    page.on("request", lambda request: events.append("request"))
    page.on("response", lambda response: events.append("response"))
    response = await page.goto(server.EMPTY_PAGE)
    await response.finished()
    events.append("requestfinished")
    assert events == ["request", "response", "requestfinished"]


async def test_network_events_should_support_redirects(page, server):
    """A redirect produces a full event sequence for each hop, linked together."""
    events = []
    page.on("request", lambda request: events.append(f"{request.method} {request.url}"))
    page.on(
        "response", lambda response: events.append(f"{response.status} {response.url}")
    )
    page.on("requestfinished", lambda request: events.append(f"DONE {request.url}"))
    page.on("requestfailed", lambda request: events.append(f"FAIL {request.url}"))
    server.set_redirect("/foo.html", "/empty.html")
    FOO_URL = server.PREFIX + "/foo.html"
    response = await page.goto(FOO_URL)
    await response.finished()
    assert events == [
        f"GET {FOO_URL}",
        f"302 {FOO_URL}",
        f"DONE {FOO_URL}",
        f"GET {server.EMPTY_PAGE}",
        f"200 {server.EMPTY_PAGE}",
        f"DONE {server.EMPTY_PAGE}",
    ]
    redirected_from = response.request.redirected_from
    assert "/foo.html" in redirected_from.url
    assert redirected_from.redirected_from is None
    assert redirected_from.redirected_to == response.request


async def test_request_is_navigation_request_should_work(page, server):
    """is_navigation_request() is true for documents/frames, false for assets.

    Currently skipped unconditionally; the body is kept valid so the test can
    be re-enabled by removing the skip.
    """
    # Positional reason works on every pytest version; the `msg=` keyword was
    # deprecated in pytest 7.
    pytest.skip("test")
    requests = {}

    def handle_request(request):
        # `url` is a property in the Python API; key by the last path segment.
        requests[request.url.split("/")[-1]] = request

    page.on("request", handle_request)
    server.set_redirect("/rrredirect", "/frames/one-frame.html")
    await page.goto(server.PREFIX + "/rrredirect")
    assert requests.get("rrredirect").is_navigation_request()
    assert requests.get("one-frame.html").is_navigation_request()
    assert requests.get("frame.html").is_navigation_request()
    assert requests.get("script.js").is_navigation_request() is False
    assert requests.get("style.css").is_navigation_request() is False


async def test_request_is_navigation_request_should_work_when_navigating_to_image(
    page, server
):
    """Navigating directly to an image is still a navigation request."""
    requests = []
    page.on("request", lambda r: requests.append(r))
    await page.goto(server.PREFIX + "/pptr.png")
    assert requests[0].is_navigation_request()


async def test_set_extra_http_headers_should_work(page, server):
    """Extra headers set on the page reach the server."""
    await page.set_extra_http_headers({"foo": "bar"})
    request = (
        await asyncio.gather(
            server.wait_for_request("/empty.html"),
            page.goto(server.EMPTY_PAGE),
        )
    )[0]
    assert request.getHeader("foo") == "bar"


async def test_set_extra_http_headers_should_work_with_redirects(page, server):
    """Extra headers are also sent on the request that follows a redirect."""
    server.set_redirect("/foo.html", "/empty.html")
    await page.set_extra_http_headers({"foo": "bar"})
    request = (
        await asyncio.gather(
            server.wait_for_request("/empty.html"),
            page.goto(server.PREFIX + "/foo.html"),
        )
    )[0]
    assert request.getHeader("foo") == "bar"


async def test_set_extra_http_headers_should_work_with_extra_headers_from_browser_context(
    browser, server
):
    """Extra headers set on the context apply to its pages."""
    context = await browser.new_context()
    try:
        await context.set_extra_http_headers({"foo": "bar"})
        page = await context.new_page()
        request = (
            await asyncio.gather(
                server.wait_for_request("/empty.html"),
                page.goto(server.EMPTY_PAGE),
            )
        )[0]
    finally:
        # Close the context even when the navigation fails, so it is not leaked.
        await context.close()
    assert request.getHeader("foo") == "bar"


async def test_set_extra_http_headers_should_override_extra_headers_from_browser_context(
    browser, server
):
    """Page-level extra headers override context-level ones with the same name."""
    context = await browser.new_context(extra_http_headers={"fOo": "bAr", "baR": "foO"})
    try:
        page = await context.new_page()
        await page.set_extra_http_headers({"Foo": "Bar"})
        request = (
            await asyncio.gather(
                server.wait_for_request("/empty.html"),
                page.goto(server.EMPTY_PAGE),
            )
        )[0]
    finally:
        # Close the context even when the navigation fails, so it is not leaked.
        await context.close()
    assert request.getHeader("foo") == "Bar"
    assert request.getHeader("bar") == "foO"


async def test_set_extra_http_headers_should_throw_for_non_string_header_values(
    page, server
):
    error = None
    try:
        await page.set_extra_http_headers({"foo": 1})
    except Error as exc:
        error = exc
    # NOTE(review): the source listing is truncated at this point ("...").
    # Whatever assertion on `error` followed is not visible here and has
    # deliberately not been reconstructed.
_fetch.py
Source:_fetch.py
# Copyright (c) Microsoft Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import json
import pathlib
import typing
from pathlib import Path
from typing import Any, Dict, List, Optional, Union, cast

import playwright._impl._network as network
from playwright._impl._api_structures import (
    FilePayload,
    FormField,
    Headers,
    HttpCredentials,
    ProxySettings,
    ServerFilePayload,
    StorageState,
)
from playwright._impl._connection import ChannelOwner, from_channel
from playwright._impl._helper import (
    Error,
    NameValue,
    async_readfile,
    async_writefile,
    is_file_payload,
    is_safe_close_error,
    locals_to_params,
    object_to_array,
)
from playwright._impl._network import serialize_headers
from playwright._impl._tracing import Tracing

if typing.TYPE_CHECKING:
    from playwright._impl._playwright import Playwright


class APIRequest:
    """Factory for `APIRequestContext` objects, bound to a Playwright instance."""

    def __init__(self, playwright: "Playwright") -> None:
        self.playwright = playwright
        self._loop = playwright._loop
        self._dispatcher_fiber = playwright._connection._dispatcher_fiber

    async def new_context(
        self,
        baseURL: str = None,
        extraHTTPHeaders: Dict[str, str] = None,
        httpCredentials: HttpCredentials = None,
        ignoreHTTPSErrors: bool = None,
        proxy: ProxySettings = None,
        userAgent: str = None,
        timeout: float = None,
        storageState: Union[StorageState, str, Path] = None,
    ) -> "APIRequestContext":
        """Create a new request context via the "newRequest" channel call.

        The arguments are collected with `locals_to_params(locals())` and
        forwarded as the call parameters. Two of them are converted first:
        a truthy, non-dict `storageState` is treated as a file to read and
        parse as JSON, and `extraHTTPHeaders` goes through
        `serialize_headers`.
        """
        # Must stay the first statement: locals() has to see only the arguments.
        params = locals_to_params(locals())
        if "storageState" in params:
            storage_state = params["storageState"]
            if not isinstance(storage_state, dict) and storage_state:
                params["storageState"] = json.loads(
                    (await async_readfile(storage_state)).decode()
                )
        if "extraHTTPHeaders" in params:
            params["extraHTTPHeaders"] = serialize_headers(params["extraHTTPHeaders"])
        context = cast(
            APIRequestContext,
            from_channel(await self.playwright._channel.send("newRequest", params)),
        )
        context._tracing._local_utils = self.playwright._utils
        return context


class APIRequestContext(ChannelOwner):
    """Channel-backed context that sends HTTP requests through `fetch`."""

    def __init__(
        self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
    ) -> None:
        super().__init__(parent, type, guid, initializer)
        self._tracing: Tracing = from_channel(initializer["tracing"])

    async def dispose(self) -> None:
        """Send the "dispose" message for this context."""
        await self._channel.send("dispose")

    async def delete(
        self,
        url: str,
        params: Dict[str, Union[bool, float, str]] = None,
        headers: Headers = None,
        data: Union[Any, bytes, str] = None,
        form: Dict[str, Union[bool, float, str]] = None,
        multipart: Dict[str, Union[bytes, bool, float, str, FilePayload]] = None,
        timeout: float = None,
        failOnStatusCode: bool = None,
        ignoreHTTPSErrors: bool = None,
    ) -> "APIResponse":
        """Call `fetch` with method "DELETE", forwarding every argument."""
        return await self.fetch(
            url,
            method="DELETE",
            params=params,
            headers=headers,
            data=data,
            form=form,
            multipart=multipart,
            timeout=timeout,
            failOnStatusCode=failOnStatusCode,
            ignoreHTTPSErrors=ignoreHTTPSErrors,
        )

    async def head(
        self,
        url: str,
        params: Dict[str, Union[bool, float, str]] = None,
        headers: Headers = None,
        timeout: float = None,
        failOnStatusCode: bool = None,
        ignoreHTTPSErrors: bool = None,
    ) -> "APIResponse":
        """Call `fetch` with method "HEAD", forwarding every argument."""
        return await self.fetch(
            url,
            method="HEAD",
            params=params,
            headers=headers,
            timeout=timeout,
            failOnStatusCode=failOnStatusCode,
            ignoreHTTPSErrors=ignoreHTTPSErrors,
        )

    async def get(
        self,
        url: str,
        params: Dict[str, Union[bool, float, str]] = None,
        headers: Headers = None,
        timeout: float = None,
        failOnStatusCode: bool = None,
        ignoreHTTPSErrors: bool = None,
    ) -> "APIResponse":
        """Call `fetch` with method "GET", forwarding every argument."""
        return await self.fetch(
            url,
            method="GET",
            params=params,
            headers=headers,
            timeout=timeout,
            failOnStatusCode=failOnStatusCode,
            ignoreHTTPSErrors=ignoreHTTPSErrors,
        )

    async def patch(
        self,
        url: str,
        params: Dict[str, Union[bool, float, str]] = None,
        headers: Headers = None,
        data: Union[Any, bytes, str] = None,
        form: Dict[str, Union[bool, float, str]] = None,
        multipart: Dict[str, Union[bytes, bool, float, str, FilePayload]] = None,
        timeout: float = None,
        failOnStatusCode: bool = None,
        ignoreHTTPSErrors: bool = None,
    ) -> "APIResponse":
        """Call `fetch` with method "PATCH", forwarding every argument."""
        return await self.fetch(
            url,
            method="PATCH",
            params=params,
            headers=headers,
            data=data,
            form=form,
            multipart=multipart,
            timeout=timeout,
            failOnStatusCode=failOnStatusCode,
            ignoreHTTPSErrors=ignoreHTTPSErrors,
        )

    async def put(
        self,
        url: str,
        params: Dict[str, Union[bool, float, str]] = None,
        headers: Headers = None,
        data: Union[Any, bytes, str] = None,
        form: Dict[str, Union[bool, float, str]] = None,
        multipart: Dict[str, Union[bytes, bool, float, str, FilePayload]] = None,
        timeout: float = None,
        failOnStatusCode: bool = None,
        ignoreHTTPSErrors: bool = None,
    ) -> "APIResponse":
        """Call `fetch` with method "PUT", forwarding every argument."""
        return await self.fetch(
            url,
            method="PUT",
            params=params,
            headers=headers,
            data=data,
            form=form,
            multipart=multipart,
            timeout=timeout,
            failOnStatusCode=failOnStatusCode,
            ignoreHTTPSErrors=ignoreHTTPSErrors,
        )

    async def post(
        self,
        url: str,
        params: Dict[str, Union[bool, float, str]] = None,
        headers: Headers = None,
        data: Union[Any, bytes, str] = None,
        form: Dict[str, Union[bool, float, str]] = None,
        multipart: Dict[str, Union[bytes, bool, float, str, FilePayload]] = None,
        timeout: float = None,
        failOnStatusCode: bool = None,
        ignoreHTTPSErrors: bool = None,
    ) -> "APIResponse":
        """Call `fetch` with method "POST", forwarding every argument."""
        return await self.fetch(
            url,
            method="POST",
            params=params,
            headers=headers,
            data=data,
            form=form,
            multipart=multipart,
            timeout=timeout,
            failOnStatusCode=failOnStatusCode,
            ignoreHTTPSErrors=ignoreHTTPSErrors,
        )

    async def fetch(
        self,
        urlOrRequest: Union[str, network.Request],
        params: Dict[str, Union[bool, float, str]] = None,
        method: str = None,
        headers: Headers = None,
        data: Union[Any, bytes, str] = None,
        form: Dict[str, Union[bool, float, str]] = None,
        multipart: Dict[str, Union[bytes, bool, float, str, FilePayload]] = None,
        timeout: float = None,
        failOnStatusCode: bool = None,
        ignoreHTTPSErrors: bool = None,
    ) -> "APIResponse":
        """Send a request through the "fetch" channel call.

        `urlOrRequest` is either a URL string or a `network.Request`; for a
        request, its url, method, headers and post data are used as defaults
        that the explicit arguments override.

        The body comes from `data`, `form` or `multipart`:

        * `data` as `str` is sent as JSON when the headers declare a JSON
          content type, otherwise as encoded bytes; `bytes` are sent as-is;
          `dict`, `list`, `int` and `bool` are sent as JSON.
        * `form` is converted with `object_to_array`.
        * `multipart` file payloads are converted with
          `file_payload_to_json`; `str` values are sent as plain fields.

        Raises:
            AssertionError: if the first argument is neither a URL string nor
                a Request, or more than one of `data`/`form`/`multipart` is
                given.
            Error: if `data` has an unsupported type.
        """
        request = urlOrRequest if isinstance(urlOrRequest, network.Request) else None
        assert request or isinstance(
            urlOrRequest, str
        ), "First argument must be either URL string or Request"
        assert (
            (1 if data else 0) + (1 if form else 0) + (1 if multipart else 0)
        ) <= 1, "Only one of 'data', 'form' or 'multipart' can be specified"
        url = request.url if request else urlOrRequest
        method = method or (request.method if request else "GET")
        # Cannot call allHeaders() here as the request may be paused inside route handler.
        headers_obj = headers or (request.headers if request else None)
        serialized_headers = serialize_headers(headers_obj) if headers_obj else None
        json_data: Any = None
        form_data: Optional[List[NameValue]] = None
        multipart_data: Optional[List[FormField]] = None
        post_data_buffer: Optional[bytes] = None
        # Compare against None rather than truthiness so that falsy but valid
        # payloads ({}, [], 0, False, "") are not silently discarded.
        if data is not None:
            if isinstance(data, str):
                if is_json_content_type(serialized_headers):
                    json_data = data
                else:
                    post_data_buffer = data.encode()
            elif isinstance(data, bytes):
                post_data_buffer = data
            elif isinstance(data, (dict, list, int, bool)):
                json_data = data
            else:
                raise Error(f"Unsupported 'data' type: {type(data)}")
        elif form:
            form_data = object_to_array(form)
        elif multipart:
            multipart_data = []
            # Convert file-like values to ServerFilePayload structs.
            for name, value in multipart.items():
                if is_file_payload(value):
                    payload = cast(FilePayload, value)
                    assert isinstance(
                        payload["buffer"], bytes
                    ), f"Unexpected buffer type of 'data.{name}'"
                    multipart_data.append(
                        FormField(name=name, file=file_payload_to_json(payload))
                    )
                elif isinstance(value, str):
                    multipart_data.append(FormField(name=name, value=value))
        if (
            post_data_buffer is None
            and json_data is None
            and form_data is None
            and multipart_data is None
        ):
            # No explicit body: fall back to the body of the original request.
            post_data_buffer = request.post_data_buffer if request else None
        post_data = (
            base64.b64encode(post_data_buffer).decode() if post_data_buffer else None
        )

        def filter_none(mapping: Dict) -> Dict:
            # Drop unset options so they are not sent over the channel.
            return {k: v for k, v in mapping.items() if v is not None}

        response = await self._channel.send(
            "fetch",
            filter_none(
                {
                    "url": url,
                    "params": object_to_array(params),
                    "method": method,
                    "headers": serialized_headers,
                    "postData": post_data,
                    "jsonData": json_data,
                    "formData": form_data,
                    "multipartData": multipart_data,
                    "timeout": timeout,
                    "failOnStatusCode": failOnStatusCode,
                    "ignoreHTTPSErrors": ignoreHTTPSErrors,
                }
            ),
        )
        return APIResponse(self, response)

    async def storage_state(
        self, path: Union[pathlib.Path, str] = None
    ) -> StorageState:
        """Return the "storageState" result, also writing it as JSON to `path` if given."""
        result = await self._channel.send_return_as_dict("storageState")
        if path:
            await async_writefile(path, json.dumps(result))
        return result


def file_payload_to_json(payload: FilePayload) -> ServerFilePayload:
    """Convert a file payload to its wire form, base64-encoding the buffer."""
    return ServerFilePayload(
        name=payload["name"],
        mimeType=payload["mimeType"],
        buffer=base64.b64encode(payload["buffer"]).decode(),
    )


class APIResponse:
    """Response returned by `APIRequestContext.fetch`, backed by its initializer dict."""

    def __init__(self, context: APIRequestContext, initializer: Dict) -> None:
        self._loop = context._loop
        self._dispatcher_fiber = context._connection._dispatcher_fiber
        self._request = context
        self._initializer = initializer
        self._headers = network.RawHeaders(initializer["headers"])

    def __repr__(self) -> str:
        return f"<APIResponse url={self.url!r} status={self.status!r} status_text={self.status_text!r}>"

    @property
    def ok(self) -> bool:
        """True when the status code is in the 200..299 range."""
        return 200 <= self.status <= 299

    @property
    def url(self) -> str:
        return self._initializer["url"]

    @property
    def status(self) -> int:
        return self._initializer["status"]

    @property
    def status_text(self) -> str:
        return self._initializer["statusText"]

    @property
    def headers(self) -> Headers:
        return self._headers.headers()

    @property
    def headers_array(self) -> network.HeadersArray:
        return self._headers.headers_array()

    async def body(self) -> bytes:
        """Fetch and base64-decode the response body.

        Raises:
            Error: "Response has been disposed" when no body is returned or
                the call fails with a safe-close error; any other `Error` is
                re-raised unchanged.
        """
        try:
            result = await self._request._channel.send_return_as_dict(
                "fetchResponseBody",
                {
                    "fetchUid": self._fetch_uid,
                },
            )
            if result is None:
                raise Error("Response has been disposed")
            return base64.b64decode(result["binary"])
        except Error as exc:
            if is_safe_close_error(exc):
                raise Error("Response has been disposed")
            # Bare raise keeps the original traceback intact.
            raise

    async def text(self) -> str:
        """Return the body decoded with the default `bytes.decode()` encoding."""
        content = await self.body()
        return content.decode()

    async def json(self) -> Any:
        """Return the body text parsed as JSON."""
        content = await self.text()
        return json.loads(content)

    async def dispose(self) -> None:
        """Send "disposeAPIResponse" for this response's fetch uid."""
        await self._request._channel.send(
            "disposeAPIResponse",
            {
                "fetchUid": self._fetch_uid,
            },
        )

    @property
    def _fetch_uid(self) -> str:
        return self._initializer["fetchUid"]

    async def _fetch_log(self) -> List[str]:
        return await self._request._channel.send(
            "fetchLog",
            {
                "fetchUid": self._fetch_uid,
            },
        )


def is_json_content_type(headers: network.HeadersArray = None) -> bool:
    """Return True when the first Content-Type header declares JSON.

    Header names are matched case-insensitively: HTTP field names are not
    case-sensitive, so a lower-cased "content-type" name must match too.
    Only the first Content-Type entry is considered.
    """
    if not headers:
        return False
    for header in headers:
        if header["name"].lower() == "content-type":
            return header["value"].strip().lower().startswith("application/json")
    return False
_network.py
Source:_network.py
...80 if content_type == "application/x-www-form-urlencoded":81 return dict(parse.parse_qsl(post_data))82 return json.loads(post_data)83 @property84 def post_data_buffer(self) -> Optional[bytes]:85 b64_content = self._initializer.get("postData")86 if not b64_content:87 return None88 return base64.b64decode(b64_content)89 @property90 def headers(self) -> Dict[str, str]:91 return self._headers92 async def response(self) -> Optional["Response"]:93 return from_nullable_channel(await self._channel.send("response"))94 @property95 def frame(self) -> "Frame":96 return from_channel(self._initializer["frame"])97 def is_navigation_request(self) -> bool:98 return self._initializer["isNavigationRequest"]...
main.py
Source:main.py
"""Collect an unlabeled reCAPTCHA image dataset by driving a headless browser.

Every request the page makes is intercepted: the landing URL is served from a
local ``document.html``; everything else is replayed through an aiohttp session
(optionally via a proxy) and the result is handed back to the browser.
Responses whose URL contains ``/api2/payload`` are written to ``parts/`` and
the challenge label shown in the ``bframe`` frame is recorded in
``labels.json``.
"""
from os import device_encoding
import sys
import asyncio
import requests
import time
import aiohttp
import json
import random
import argparse
from threading import Thread
from playwright.async_api import async_playwright

parser = argparse.ArgumentParser(description='Script to generate an unlabeled dataset for Poseidon')
parser.add_argument('--threads', dest='threads', action='store', type=int, default=1, help='How many browsers to run in parallel for collection')
parser.add_argument('--proxy-url', dest='proxy_url', action='store', type=str, required=False, help='The url of the proxy')
parser.add_argument('--proxy-port-min', dest='proxy_port_min', action='store', type=int, required=False, help='The minimum port (if one port then set --proxy-port-max to --proxy-port-min)')
parser.add_argument('--proxy-port-max', dest='proxy_port_max', action='store', type=int, required=False, help='The maximum port')
parser.add_argument('--proxy-user', dest='proxy_user', action='store', type=str, required=False, help='Username of proxy auth (if auth exists)')
parser.add_argument('--proxy-pass', dest='proxy_pass', action='store', type=str, required=False, help='Password of proxy auth (if auth exists)')
args = parser.parse_args()

if args.proxy_url and (args.proxy_port_min is None or args.proxy_port_max is None):
    parser.error("--proxy-url requires --proxy-port-min and --proxy-port-max.")
# Exactly one of user/pass supplied is an error; both or neither is fine.
if args.proxy_url and (args.proxy_user is None) != (args.proxy_pass is None):
    parser.error("--proxy-user and --proxy-pass must be given together.")

LANDING = "https://recaptcha-demo.appspot.com/recaptcha-v2-invisible.php"


def _pick_proxy():
    """Return a proxy URL on a random port in the configured range, or None.

    Uses the same truthiness test as the argument validation above, so an
    empty ``--proxy-url`` means "no proxy" instead of reaching ``randint``
    with unvalidated (possibly ``None``) ports.
    """
    if not args.proxy_url:
        return None
    port = random.randint(args.proxy_port_min, args.proxy_port_max)
    if args.proxy_user is not None:
        return 'http://{}:{}@{}:{}'.format(args.proxy_user, args.proxy_pass, args.proxy_url, port)
    return 'http://{}:{}'.format(args.proxy_url, port)


async def download_image(response):
    """Write the body of *response* to a uniquely named file under ``parts/``."""
    # Fetch the body first so a failed fetch does not leave an empty file behind.
    body = await response.body()
    path = 'parts/' + str(time.time_ns()) + str(random.randint(0, 10000)) + '.jpeg'
    with open(path, 'wb') as f:
        f.write(body)


async def capture_route(session, route):
    """Fulfil an intercepted *route* locally or by replaying it via *session*.

    Requests whose URL contains ``LANDING`` are answered with the contents of
    the local ``document.html``. All other requests are re-issued through the
    aiohttp *session* (with a proxy when one is configured) and the browser
    receives that response's status, headers and body.
    """
    request = route.request
    url = request.url

    if LANDING in url:
        with open('document.html') as doc:
            return await route.fulfill(
                body=doc.read(),
                content_type='text/html',
                status=200,
                headers={'Access-Control-Allow-Origin': '*'},
            )

    async with session.request(
        method=request.method,
        url=url,
        data=request.post_data_buffer,
        headers=request.headers,
        proxy=_pick_proxy(),
    ) as response:
        content_type = response.headers.get('content-type')

        print(response.status, response.url)
        return await route.fulfill(
            body=await response.read(),
            content_type=content_type,
            status=response.status,
            headers=response.headers,
        )


async def main_async():
    """Run one browser: install the hooks, start the flow, then wait on stdin."""
    async with aiohttp.ClientSession() as session:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            # To emulate a phone, spread p.devices["iPhone 11 Pro"] into new_context().
            context = await browser.new_context(
                locale="en-US",
                geolocation={"longitude": 12.492507, "latitude": 41.889938},
                permissions=["geolocation"],
            )
            page = await context.new_page()

            async def load_page():
                """Request a new challenge, (re)loading the landing page if needed.

                If a child frame whose URL contains ``bframe`` exists, its
                reload button is clicked. Otherwise the landing page is opened
                and submitted; any failure there is retried from the top.
                """
                # A loop rather than recursion: repeated failures must not grow the stack.
                while True:
                    bframes = [f for f in page.main_frame.child_frames if 'bframe' in f.url]
                    if bframes:
                        bframe = bframes[0]
                        print(bframe)
                        el = await bframe.wait_for_selector('.rc-button-reload')
                        await el.click()
                        return
                    try:
                        await page.goto(LANDING)
                        el = await page.wait_for_selector('#submit')
                        await el.click()
                        return
                    except Exception as exc:
                        # Best-effort retry, but make the failure visible.
                        print(f'load_page failed, retrying: {exc!r}', file=sys.stderr)

            async def ensure_image(response):
                """Save payload responses and request the next challenge."""
                if '/api2/payload' in response.url:
                    asyncio.ensure_future(download_image(response))
                elif '/api2/userverify' not in response.url:
                    return
                await load_page()

            async def check_labels(frame):
                """Record the label shown in a ``bframe`` frame in ``labels.json``."""
                if 'bframe' not in frame.url:
                    return
                el = await frame.wait_for_selector('strong')
                label = await el.text_content()

                # A missing or unparsable file starts from an empty list
                # (opening with "r+" would raise on the very first run).
                try:
                    with open('labels.json') as f:
                        data = json.loads(f.read())
                except (FileNotFoundError, ValueError):
                    data = []

                if label not in data:
                    data.append(label)
                    with open('labels.json', 'w') as f:
                        f.write(json.dumps(data))

                # NOTE(review): the reload is only triggered for bframe navigations.
                return await load_page()

            await page.route('**', lambda route: asyncio.ensure_future(capture_route(session, route)))
            page.on('response', lambda response: asyncio.ensure_future(ensure_image(response)))
            page.on('framenavigated', lambda frame: asyncio.ensure_future(check_labels(frame)))
            await load_page()

            print('Press CTRL-D to stop')
            # Keep the browser alive until stdin reaches EOF.
            reader = asyncio.StreamReader()
            loop = asyncio.get_running_loop()
            await loop.connect_read_pipe(lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
            async for line in reader:
                print(f'Got: {line.decode()!r}')


def main():
    """Thread entry point: run one collector on its own event loop."""
    asyncio.run(main_async())


if __name__ == '__main__':
    for _ in range(args.threads):
        Thread(target=main).start()
LambdaTest’s Playwright tutorial gives you a broad overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. It offers A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!