Best Python code snippet using playwright-python
browser_tab.py
Source:browser_tab.py
...983 reply = self.network_manager.get(request)984 reply.finished.connect(callback)985 self._replies.add(reply)986 return reply987 def _on_request_finished(self, callback, method, body, headers,988 follow_redirects, redirects_remaining):989 """ Handle redirects and call the callback. """990 reply = self.sender()991 try:992 if not follow_redirects:993 callback()994 return995 if not redirects_remaining:996 callback() # XXX: should it be an error?997 return998 redirect_url = reply.attribute(QNetworkRequest.RedirectionTargetAttribute)999 if redirect_url is None: # no redirect1000 callback()1001 return...
_page.py
Source:_page.py
...181 ),182 )183 self._channel.on(184 "requestFinished",185 lambda params: self._on_request_finished(186 from_channel(params["request"]), params["responseEndTiming"]187 ),188 )189 self._channel.on(190 "response",191 lambda params: self.emit(192 Page.Events.Response, from_channel(params["response"])193 ),194 )195 self._channel.on(196 "route",197 lambda params: self._on_route(198 from_channel(params["route"]), from_channel(params["request"])199 ),200 )201 self._channel.on(202 "video",203 lambda params: cast(Video, self.video)._set_relative_path(204 params["relativePath"]205 ),206 )207 self._channel.on(208 "webSocket",209 lambda params: self.emit(210 Page.Events.WebSocket, from_channel(params["webSocket"])211 ),212 )213 self._channel.on(214 "worker", lambda params: self._on_worker(from_channel(params["worker"]))215 )216 def _set_browser_context(self, context: "BrowserContext") -> None:217 self._browser_context = context218 self._timeout_settings = TimeoutSettings(context._timeout_settings)219 def _on_request_failed(220 self,221 request: Request,222 response_end_timing: float,223 failure_text: str = None,224 ) -> None:225 request._failure_text = failure_text226 if request._timing:227 request._timing["responseEnd"] = response_end_timing228 self.emit(Page.Events.RequestFailed, request)229 def _on_request_finished(230 self, request: Request, response_end_timing: float231 ) -> None:232 if request._timing:233 request._timing["responseEnd"] = response_end_timing234 self.emit(Page.Events.RequestFinished, request)235 def _on_frame_attached(self, frame: Frame) -> None:236 frame._page = self237 self._frames.append(frame)238 self.emit(Page.Events.FrameAttached, frame)239 def _on_frame_detached(self, frame: Frame) -> None:240 self._frames.remove(frame)241 frame._detached = True242 self.emit(Page.Events.FrameDetached, frame)243 def _on_route(self, route: Route, request: Request) -> None:...
switch.py
Source:switch.py
...198 self.logger.debug(f"Firing request event.")199 self._current_profile = None200 self._reset_turn_off_timer()201 self.fire_event(EVENT_TYPE_AUTOMATIC_LIGHTING, entity_id=self.entity_id, type=EVENT_DATA_TYPE_REQUEST)202 def _on_request_finished(*args: Any) -> None:203 """ Triggered when the request event has finished. """204 self._reset_request_timer()205 if self.is_blocked:206 return207 if self._current_profile:208 self.logger.debug(f"Turning on profile {self._current_profile.id} with the following values: { {CONF_ENTITY_ID: self._current_profile.lights, **self._current_profile.attributes} }")209 self._current_status = self._current_profile.status210 self._turn_off_unused_entities(self._tracked_lights, self._current_profile.lights)211 self.call_service(LIGHT_DOMAIN, SERVICE_TURN_ON, entity_id=self._current_profile.lights, **self._current_profile.attributes)212 else:213 self.logger.debug(f"No profile was provided.")214 self._current_status = STATUS_IDLE215 self._turn_off_unused_entities(self._tracked_lights, [])216 self.async_schedule_update_ha_state(True)...
_browser_context.py
Source:_browser_context.py
...125 ),126 )127 self._channel.on(128 "requestFinished",129 lambda params: self._on_request_finished(130 from_channel(params["request"]),131 from_nullable_channel(params.get("response")),132 params["responseEndTiming"],133 from_nullable_channel(params.get("page")),134 ),135 )136 self._closed_future: asyncio.Future = asyncio.Future()137 self.once(138 self.Events.Close, lambda context: self._closed_future.set_result(True)139 )140 def __repr__(self) -> str:141 return f"<BrowserContext browser={self.browser}>"142 def _on_page(self, page: Page) -> None:143 self._pages.append(page)144 self.emit(BrowserContext.Events.Page, page)145 if page._opener and not page._opener.is_closed():146 page._opener.emit(Page.Events.Popup, page)147 def _on_route(self, route: Route, request: Request) -> None:148 for handler_entry in self._routes:149 if handler_entry.matches(request.url):150 try:151 handler_entry.handle(route, request)152 finally:153 if not handler_entry.is_active:154 self._routes.remove(handler_entry)155 if not len(self._routes) == 0:156 asyncio.create_task(self._disable_interception())157 break158 route._internal_continue()159 def _on_binding(self, binding_call: BindingCall) -> None:160 func = self._bindings.get(binding_call._initializer["name"])161 if func is None:162 return163 asyncio.create_task(binding_call.call(func))164 def set_default_navigation_timeout(self, timeout: float) -> None:165 self._timeout_settings.set_navigation_timeout(timeout)166 self._channel.send_no_reply(167 "setDefaultNavigationTimeoutNoReply", dict(timeout=timeout)168 )169 def set_default_timeout(self, timeout: float) -> None:170 self._timeout_settings.set_timeout(timeout)171 self._channel.send_no_reply("setDefaultTimeoutNoReply", dict(timeout=timeout))172 @property173 def pages(self) -> List[Page]:174 return self._pages.copy()175 @property176 def browser(self) -> Optional["Browser"]:177 return self._browser178 async def new_page(self) -> Page:179 if self._owner_page:180 raise Error("Please use browser.new_context()")181 return from_channel(await self._channel.send("newPage"))182 async def cookies(self, urls: Union[str, List[str]] = None) -> List[Cookie]:183 if urls is None:184 urls = []185 if not isinstance(urls, list):186 urls = [urls]187 return await self._channel.send("cookies", dict(urls=urls))188 async def add_cookies(self, cookies: List[SetCookieParam]) -> None:189 await self._channel.send("addCookies", dict(cookies=cookies))190 async def clear_cookies(self) -> None:191 await self._channel.send("clearCookies")192 async def grant_permissions(193 self, permissions: List[str], origin: str = None194 ) -> None:195 await self._channel.send("grantPermissions", locals_to_params(locals()))196 async def clear_permissions(self) -> None:197 await self._channel.send("clearPermissions")198 async def set_geolocation(self, geolocation: Geolocation = None) -> None:199 await self._channel.send("setGeolocation", locals_to_params(locals()))200 async def set_extra_http_headers(self, headers: Dict[str, str]) -> None:201 await self._channel.send(202 "setExtraHTTPHeaders", dict(headers=serialize_headers(headers))203 )204 async def set_offline(self, offline: bool) -> None:205 await self._channel.send("setOffline", dict(offline=offline))206 async def add_init_script(207 self, script: str = None, path: Union[str, Path] = None208 ) -> None:209 if path:210 script = (await async_readfile(path)).decode()211 if not isinstance(script, str):212 raise Error("Either path or script parameter must be specified")213 await self._channel.send("addInitScript", dict(source=script))214 async def expose_binding(215 self, name: str, callback: Callable, handle: bool = None216 ) -> None:217 for page in self._pages:218 if name in page._bindings:219 raise Error(220 f'Function "{name}" has been already registered in one of the pages'221 )222 if name in self._bindings:223 raise Error(f'Function "{name}" has been already registered')224 self._bindings[name] = callback225 await self._channel.send(226 "exposeBinding", dict(name=name, needsHandle=handle or False)227 )228 async def expose_function(self, name: str, callback: Callable) -> None:229 await self.expose_binding(name, lambda source, *args: callback(*args))230 async def route(231 self, url: URLMatch, handler: RouteHandlerCallback, times: int = None232 ) -> None:233 self._routes.insert(234 0,235 RouteHandler(URLMatcher(self._options.get("baseURL"), url), handler, times),236 )237 if len(self._routes) == 1:238 await self._channel.send(239 "setNetworkInterceptionEnabled", dict(enabled=True)240 )241 async def unroute(242 self, url: URLMatch, handler: Optional[RouteHandlerCallback] = None243 ) -> None:244 self._routes = list(245 filter(246 lambda r: r.matcher.match != url or (handler and r.handler != handler),247 self._routes,248 )249 )250 if len(self._routes) == 0:251 await self._disable_interception()252 async def _disable_interception(self) -> None:253 await self._channel.send("setNetworkInterceptionEnabled", dict(enabled=False))254 def expect_event(255 self,256 event: str,257 predicate: Callable = None,258 timeout: float = None,259 ) -> EventContextManagerImpl:260 if timeout is None:261 timeout = self._timeout_settings.timeout()262 wait_helper = WaitHelper(self, f"browser_context.expect_event({event})")263 wait_helper.reject_on_timeout(264 timeout, f'Timeout {timeout}ms exceeded while waiting for event "{event}"'265 )266 if event != BrowserContext.Events.Close:267 wait_helper.reject_on_event(268 self, BrowserContext.Events.Close, Error("Context closed")269 )270 wait_helper.wait_for_event(self, event, predicate)271 return EventContextManagerImpl(wait_helper.result())272 def _on_close(self) -> None:273 if self._browser:274 self._browser._contexts.remove(self)275 self.emit(BrowserContext.Events.Close, self)276 async def close(self) -> None:277 try:278 if self._options.get("recordHar"):279 har = cast(280 Artifact, from_channel(await self._channel.send("harExport"))281 )282 await har.save_as(283 cast(Dict[str, str], self._options["recordHar"])["path"]284 )285 await har.delete()286 await self._channel.send("close")287 await self._closed_future288 except Exception as e:289 if not is_safe_close_error(e):290 raise e291 async def _pause(self) -> None:292 await self._channel.send("pause")293 async def storage_state(self, path: Union[str, Path] = None) -> StorageState:294 result = await self._channel.send_return_as_dict("storageState")295 if path:296 await async_writefile(path, json.dumps(result))297 return result298 async def wait_for_event(299 self, event: str, predicate: Callable = None, timeout: float = None300 ) -> Any:301 async with self.expect_event(event, predicate, timeout) as event_info:302 pass303 return await event_info304 def expect_page(305 self,306 predicate: Callable[[Page], bool] = None,307 timeout: float = None,308 ) -> EventContextManagerImpl[Page]:309 return self.expect_event(BrowserContext.Events.Page, predicate, timeout)310 def _on_background_page(self, page: Page) -> None:311 self._background_pages.add(page)312 self.emit(BrowserContext.Events.BackgroundPage, page)313 def _on_service_worker(self, worker: Worker) -> None:314 worker._context = self315 self._service_workers.add(worker)316 self.emit(BrowserContext.Events.ServiceWorker, worker)317 def _on_request_failed(318 self,319 request: Request,320 response_end_timing: float,321 failure_text: Optional[str],322 page: Optional[Page],323 ) -> None:324 request._failure_text = failure_text325 if request._timing:326 request._timing["responseEnd"] = response_end_timing327 self.emit(BrowserContext.Events.RequestFailed, request)328 if page:329 page.emit(Page.Events.RequestFailed, request)330 def _on_request_finished(331 self,332 request: Request,333 response: Optional[Response],334 response_end_timing: float,335 page: Optional[Page],336 ) -> None:337 if request._timing:338 request._timing["responseEnd"] = response_end_timing339 self.emit(BrowserContext.Events.RequestFinished, request)340 if page:341 page.emit(Page.Events.RequestFinished, request)342 if response:343 response._finished_future.set_result(True)344 def _on_request(self, request: Request, page: Optional[Page]) -> None:...
LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!