Best Python code snippet using playwright-python
_network.py
Source:_network.py
1# Copyright (c) Microsoft Corporation.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14import asyncio15import base6416import json17import mimetypes18from collections import defaultdict19from pathlib import Path20from types import SimpleNamespace21from typing import (22 TYPE_CHECKING,23 Any,24 Callable,25 Coroutine,26 Dict,27 List,28 Optional,29 Union,30 cast,31)32from urllib import parse33from playwright._impl._api_structures import (34 Headers,35 HeadersArray,36 RemoteAddr,37 RequestSizes,38 ResourceTiming,39 SecurityDetails,40)41from playwright._impl._api_types import Error42from playwright._impl._connection import (43 ChannelOwner,44 from_channel,45 from_nullable_channel,46)47from playwright._impl._event_context_manager import EventContextManagerImpl48from playwright._impl._helper import ContinueParameters, locals_to_params49from playwright._impl._wait_helper import WaitHelper50if TYPE_CHECKING: # pragma: no cover51 from playwright._impl._fetch import APIResponse52 from playwright._impl._frame import Frame53class Request(ChannelOwner):54 def __init__(55 self, parent: ChannelOwner, type: str, guid: str, initializer: Dict56 ) -> None:57 super().__init__(parent, type, guid, initializer)58 self._redirected_from: Optional["Request"] = from_nullable_channel(59 initializer.get("redirectedFrom")60 )61 self._redirected_to: Optional["Request"] = None62 if self._redirected_from:63 self._redirected_from._redirected_to = self64 self._failure_text: Optional[str] = None65 self._timing: ResourceTiming = {66 "startTime": 0,67 "domainLookupStart": -1,68 "domainLookupEnd": -1,69 "connectStart": -1,70 "secureConnectionStart": -1,71 "connectEnd": -1,72 "requestStart": -1,73 "responseStart": -1,74 "responseEnd": -1,75 }76 self._provisional_headers = RawHeaders(self._initializer["headers"])77 self._all_headers_future: Optional[asyncio.Future[RawHeaders]] = None78 def __repr__(self) -> str:79 return f"<Request url={self.url!r} method={self.method!r}>"80 @property81 def url(self) -> str:82 return self._initializer["url"]83 @property84 def resource_type(self) -> str:85 return self._initializer["resourceType"]86 @property87 def method(self) -> str:88 return self._initializer["method"]89 async def sizes(self) -> RequestSizes:90 response = await self.response()91 if not response:92 raise Error("Unable to fetch sizes for failed request")93 return await response._channel.send("sizes")94 @property95 def post_data(self) -> Optional[str]:96 data = self.post_data_buffer97 if not data:98 return None99 return data.decode()100 @property101 def post_data_json(self) -> Optional[Any]:102 post_data = self.post_data103 if not post_data:104 return None105 content_type = self.headers["content-type"]106 if content_type == "application/x-www-form-urlencoded":107 return dict(parse.parse_qsl(post_data))108 try:109 return json.loads(post_data)110 except Exception:111 raise Error(f"POST data is not a valid JSON object: {post_data}")112 @property113 def post_data_buffer(self) -> Optional[bytes]:114 b64_content = self._initializer.get("postData")115 if b64_content is None:116 return None117 return base64.b64decode(b64_content)118 async def response(self) -> Optional["Response"]:119 return from_nullable_channel(await self._channel.send("response"))120 @property121 def frame(self) -> "Frame":122 return from_channel(self._initializer["frame"])123 def is_navigation_request(self) -> bool:124 return self._initializer["isNavigationRequest"]125 @property126 def redirected_from(self) -> Optional["Request"]:127 return self._redirected_from128 @property129 def redirected_to(self) -> Optional["Request"]:130 return self._redirected_to131 @property132 def failure(self) -> Optional[str]:133 return self._failure_text134 @property135 def timing(self) -> ResourceTiming:136 return self._timing137 @property138 def headers(self) -> Headers:139 return self._provisional_headers.headers()140 async def all_headers(self) -> Headers:141 return (await self._actual_headers()).headers()142 async def headers_array(self) -> HeadersArray:143 return (await self._actual_headers()).headers_array()144 async def header_value(self, name: str) -> Optional[str]:145 return (await self._actual_headers()).get(name)146 async def _actual_headers(self) -> "RawHeaders":147 if not self._all_headers_future:148 self._all_headers_future = asyncio.Future()149 headers = await self._channel.send("rawRequestHeaders")150 self._all_headers_future.set_result(RawHeaders(headers))151 return await self._all_headers_future152class Route(ChannelOwner):153 def __init__(154 self, parent: ChannelOwner, type: str, guid: str, initializer: Dict155 ) -> None:156 super().__init__(parent, type, guid, initializer)157 def __repr__(self) -> str:158 return f"<Route request={self.request}>"159 @property160 def request(self) -> Request:161 return from_channel(self._initializer["request"])162 async def abort(self, errorCode: str = None) -> None:163 await self._race_with_page_close(164 self._channel.send("abort", locals_to_params(locals()))165 )166 async def fulfill(167 self,168 status: int = None,169 headers: Dict[str, str] = None,170 body: Union[str, bytes] = None,171 path: Union[str, Path] = None,172 contentType: str = None,173 response: "APIResponse" = None,174 ) -> None:175 params = locals_to_params(locals())176 if response:177 del params["response"]178 params["status"] = (179 params["status"] if params.get("status") else response.status180 )181 params["headers"] = (182 params["headers"] if params.get("headers") else response.headers183 )184 from playwright._impl._fetch import APIResponse185 if body is None and path is None and isinstance(response, APIResponse):186 if response._request._connection is self._connection:187 params["fetchResponseUid"] = response._fetch_uid188 else:189 body = await response.body()190 length = 0191 if isinstance(body, str):192 params["body"] = body193 params["isBase64"] = False194 length = len(body.encode())195 elif isinstance(body, bytes):196 params["body"] = base64.b64encode(body).decode()197 params["isBase64"] = True198 length = len(body)199 elif path:200 del params["path"]201 file_content = Path(path).read_bytes()202 params["body"] = base64.b64encode(file_content).decode()203 params["isBase64"] = True204 length = len(file_content)205 headers = {k.lower(): str(v) for k, v in params.get("headers", {}).items()}206 if params.get("contentType"):207 headers["content-type"] = params["contentType"]208 elif path:209 headers["content-type"] = (210 mimetypes.guess_type(str(Path(path)))[0] or "application/octet-stream"211 )212 if length and "content-length" not in headers:213 headers["content-length"] = str(length)214 params["headers"] = serialize_headers(headers)215 await self._race_with_page_close(self._channel.send("fulfill", params))216 async def continue_(217 self,218 url: str = None,219 method: str = None,220 headers: Dict[str, str] = None,221 postData: Union[str, bytes] = None,222 ) -> None:223 overrides: ContinueParameters = {}224 if url:225 overrides["url"] = url226 if method:227 overrides["method"] = method228 if headers:229 overrides["headers"] = serialize_headers(headers)230 if isinstance(postData, str):231 overrides["postData"] = base64.b64encode(postData.encode()).decode()232 elif isinstance(postData, bytes):233 overrides["postData"] = base64.b64encode(postData).decode()234 await self._race_with_page_close(235 self._channel.send("continue", cast(Any, overrides))236 )237 def _internal_continue(self) -> None:238 async def continue_route() -> None:239 try:240 await self.continue_()241 except Exception:242 pass243 asyncio.create_task(continue_route())244 async def _race_with_page_close(self, future: Coroutine) -> None:245 if hasattr(self.request.frame, "_page"):246 page = self.request.frame._page247 # When page closes or crashes, we catch any potential rejects from this Route.248 # Note that page could be missing when routing popup's initial request that249 # does not have a Page initialized just yet.250 fut = asyncio.create_task(future)251 await asyncio.wait(252 [fut, page._closed_or_crashed_future],253 return_when=asyncio.FIRST_COMPLETED,254 )255 if page._closed_or_crashed_future.done():256 await asyncio.gather(fut, return_exceptions=True)257 else:258 await future259class Response(ChannelOwner):260 def __init__(261 self, parent: ChannelOwner, type: str, guid: str, initializer: Dict262 ) -> None:263 super().__init__(parent, type, guid, initializer)264 self._request: Request = from_channel(self._initializer["request"])265 timing = self._initializer["timing"]266 self._request._timing["startTime"] = timing["startTime"]267 self._request._timing["domainLookupStart"] = timing["domainLookupStart"]268 self._request._timing["domainLookupEnd"] = timing["domainLookupEnd"]269 self._request._timing["connectStart"] = timing["connectStart"]270 self._request._timing["secureConnectionStart"] = timing["secureConnectionStart"]271 self._request._timing["connectEnd"] = timing["connectEnd"]272 self._request._timing["requestStart"] = timing["requestStart"]273 self._request._timing["responseStart"] = timing["responseStart"]274 self._provisional_headers = RawHeaders(275 cast(HeadersArray, self._initializer["headers"])276 )277 self._raw_headers_future: Optional[asyncio.Future[RawHeaders]] = None278 self._finished_future: asyncio.Future[bool] = asyncio.Future()279 def __repr__(self) -> str:280 return f"<Response url={self.url!r} request={self.request}>"281 @property282 def url(self) -> str:283 return self._initializer["url"]284 @property285 def ok(self) -> bool:286 # Status 0 is for file:// URLs287 return self._initializer["status"] == 0 or (288 self._initializer["status"] >= 200 and self._initializer["status"] <= 299289 )290 @property291 def status(self) -> int:292 return self._initializer["status"]293 @property294 def status_text(self) -> str:295 return self._initializer["statusText"]296 @property297 def headers(self) -> Headers:298 return self._provisional_headers.headers()299 async def all_headers(self) -> Headers:300 return (await self._actual_headers()).headers()301 async def headers_array(self) -> HeadersArray:302 return (await self._actual_headers()).headers_array()303 async def header_value(self, name: str) -> Optional[str]:304 return (await self._actual_headers()).get(name)305 async def header_values(self, name: str) -> List[str]:306 return (await self._actual_headers()).get_all(name)307 async def _actual_headers(self) -> "RawHeaders":308 if not self._raw_headers_future:309 self._raw_headers_future = asyncio.Future()310 headers = cast(HeadersArray, await self._channel.send("rawResponseHeaders"))311 self._raw_headers_future.set_result(RawHeaders(headers))312 return await self._raw_headers_future313 async def server_addr(self) -> Optional[RemoteAddr]:314 return await self._channel.send("serverAddr")315 async def security_details(self) -> Optional[SecurityDetails]:316 return await self._channel.send("securityDetails")317 async def finished(self) -> None:318 await self._finished_future319 async def body(self) -> bytes:320 binary = await self._channel.send("body")321 return base64.b64decode(binary)322 async def text(self) -> str:323 content = await self.body()324 return content.decode()325 async def json(self) -> Any:326 return json.loads(await self.text())327 @property328 def request(self) -> Request:329 return self._request330 @property331 def frame(self) -> "Frame":332 return self._request.frame333class WebSocket(ChannelOwner):334 Events = SimpleNamespace(335 Close="close",336 FrameReceived="framereceived",337 FrameSent="framesent",338 Error="socketerror",339 )340 def __init__(341 self, parent: ChannelOwner, type: str, guid: str, initializer: Dict342 ) -> None:343 super().__init__(parent, type, guid, initializer)344 self._is_closed = False345 self._channel.on(346 "frameSent",347 lambda params: self._on_frame_sent(params["opcode"], params["data"]),348 )349 self._channel.on(350 "frameReceived",351 lambda params: self._on_frame_received(params["opcode"], params["data"]),352 )353 self._channel.on(354 "socketError",355 lambda params: self.emit(WebSocket.Events.Error, params["error"]),356 )357 self._channel.on("close", lambda params: self._on_close())358 def __repr__(self) -> str:359 return f"<WebSocket url={self.url!r}>"360 @property361 def url(self) -> str:362 return self._initializer["url"]363 def expect_event(364 self,365 event: str,366 predicate: Callable = None,367 timeout: float = None,368 ) -> EventContextManagerImpl:369 if timeout is None:370 timeout = cast(Any, self._parent)._timeout_settings.timeout()371 wait_helper = WaitHelper(self, f"web_socket.expect_event({event})")372 wait_helper.reject_on_timeout(373 cast(float, timeout),374 f'Timeout {timeout}ms exceeded while waiting for event "{event}"',375 )376 if event != WebSocket.Events.Close:377 wait_helper.reject_on_event(378 self, WebSocket.Events.Close, Error("Socket closed")379 )380 if event != WebSocket.Events.Error:381 wait_helper.reject_on_event(382 self, WebSocket.Events.Error, Error("Socket error")383 )384 wait_helper.reject_on_event(self._parent, "close", Error("Page closed"))385 wait_helper.wait_for_event(self, event, predicate)386 return EventContextManagerImpl(wait_helper.result())387 async def wait_for_event(388 self, event: str, predicate: Callable = None, timeout: float = None389 ) -> Any:390 async with self.expect_event(event, predicate, timeout) as event_info:391 pass392 return await event_info393 def _on_frame_sent(self, opcode: int, data: str) -> None:394 if opcode == 2:395 self.emit(WebSocket.Events.FrameSent, base64.b64decode(data))396 elif opcode == 1:397 self.emit(WebSocket.Events.FrameSent, data)398 def _on_frame_received(self, opcode: int, data: str) -> None:399 if opcode == 2:400 self.emit(WebSocket.Events.FrameReceived, base64.b64decode(data))401 elif opcode == 1:402 self.emit(WebSocket.Events.FrameReceived, data)403 def is_closed(self) -> bool:404 return self._is_closed405 def _on_close(self) -> None:406 self._is_closed = True407 self.emit(WebSocket.Events.Close, self)408def serialize_headers(headers: Dict[str, str]) -> HeadersArray:409 return [{"name": name, "value": value} for name, value in headers.items()]410class RawHeaders:411 def __init__(self, headers: HeadersArray) -> None:412 self._headers_array = headers413 self._headers_map: Dict[str, Dict[str, bool]] = defaultdict(dict)414 for header in headers:415 self._headers_map[header["name"].lower()][header["value"]] = True416 def get(self, name: str) -> Optional[str]:417 values = self.get_all(name)418 if not values:419 return None420 separator = "\n" if name.lower() == "set-cookie" else ", "421 return separator.join(values)422 def get_all(self, name: str) -> List[str]:423 return list(self._headers_map[name.lower()].keys())424 def headers(self) -> Dict[str, str]:425 result = {}426 for name in self._headers_map.keys():427 result[name] = cast(str, self.get(name))428 return result429 def headers_array(self) -> HeadersArray:...
_fetch.py
Source:_fetch.py
...369 "fetchUid": self._fetch_uid,370 },371 )372 @property373 def _fetch_uid(self) -> str:374 return self._initializer["fetchUid"]375 async def _fetch_log(self) -> List[str]:376 return await self._request._channel.send(377 "fetchLog",378 {379 "fetchUid": self._fetch_uid,380 },381 )382def is_json_content_type(headers: network.HeadersArray = None) -> bool:383 if not headers:384 return False385 for header in headers:386 if header["name"] == "Content-Type":387 return header["value"].startswith("application/json")...
LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!