Best Python code snippet using playwright-python
playwrightmanager.py
Source:playwrightmanager.py
...202 strict_selectors=strict_selectors,203 user_agent=user_agent,204 viewport=viewport205 )206 self._context.set_default_navigation_timeout(self.default_navigation_timeout)207 self._context.set_default_timeout(self.default_timeout)208 def close_context(self):209 """å
³éæµè§å¨ä¸ä¸æã210 å±äºæµè§å¨ä¸ä¸æçææ页é¢é½å°å
³éã211 æ æ³å
³éé»è®¤æµè§å¨ä¸ä¸æã212 """213 if self._context is not None:214 self._context.close()215 if self._browser.contexts:216 self._context = self._browser.contexts[-1]217 self._page = None218 self._interaction = None219 def new_page(220 self,221 accept_downloads: bool = None,222 base_url: str = None,223 bypass_csp: bool = None,224 extra_http_headers: typing.Optional[typing.Dict[str, str]] = None,225 http_credentials: HttpCredentials = None,226 ignore_https_errors: bool = None,227 java_script_enabled: bool = None,228 no_viewport: bool = None,229 proxy: ProxySettings = None,230 storage_state: typing.Union[StorageState, str, pathlib.Path] = None,231 strict_selectors: bool = None,232 user_agent: str = None,233 viewport: ViewportSize = None234 ):235 """å¨æ°çæµè§å¨ä¸ä¸æä¸å建ä¸ä¸ªæ°é¡µé¢ã å
³éæ¤é¡µé¢ä¹å°å
³éä¸ä¸æãè¿æ¯ä¸ä¸ªæ¹ä¾¿çAPIï¼åºè¯¥åªç¨äºå页åºæ¯åçç段ã236 ç产代ç åºæ¾å¼å建æµè§å¨ä¸ä¸æï¼ç¶åå建页é¢ï¼ä»¥æ§å¶å
¶ç¡®åççå½å¨æã237 :param accept_downloads: æ¯å¦èªå¨ä¸è½½ææé件ã é»è®¤ä¸º false ï¼å
¶ä¸ææä¸è½½é½è¢«åæ¶ã238 :param base_url: å½ä½¿ç¨ goto(url, **kwargs), route(url, handler), wait_for_url(url, **kwargs),239 expect_request(url_or_predicate, **kwargs), æ expect_response(url_or_predicate) , **kwargs) æ¶ï¼240 å®éè¿ä½¿ç¨ URL() æé å½æ°æ¥æ建ç¸åºç URL ãä¾åï¼241 baseURL: http://localhost:3000 并导èªå° /bar.html => http://localhost:3000/bar.html242 baseURL: http://localhost:3000/foo/ 并导èªå° ./bar.html => http://localhost:3000/foo/bar.html243 :param bypass_csp: åæ¢ç»è¿é¡µé¢çå
容å®å
¨çç¥ã244 :param extra_http_headers: å
å«æ¯ä¸ªè¯·æ±é½è¦åéçéå HTTP头ç对象ãæææ é¢å¼å¿
é¡»æ¯å符串ã245 :param http_credentials: HTTP 身份éªè¯çåæ®ã246 :param ignore_https_errors: æ¯å¦å¨å¯¼èªè¿ç¨ä¸å¿½ç¥ HTTPS é误ã é»è®¤ä¸º falseã247 :param java_script_enabled: æ¯å¦å¨ä¸ä¸æä¸å¯ç¨ JavaScriptã é»è®¤ä¸º trueã248 :param no_viewport: ä¸å¼ºå¶åºå®è§å£ï¼å
许å¨æ头模å¼ä¸è°æ´çªå£å¤§å°ã249 :param proxy: ä¸æ¤ä¸ä¸æä¸èµ·ä½¿ç¨çç½ç»ä»£ç设置ã250 :param storage_state: 使ç¨ç»å®çåå¨ç¶æå¡«å
ä¸ä¸æã251 æ¤é项å¯ç¨äºä½¿ç¨éè¿ storage_state(**kwargs) è·åçç»å½ä¿¡æ¯åå§åä¸ä¸æã252 å
·æä¿ååå¨çæ件çè·¯å¾ï¼æå
·æ以ä¸å段ç对象:253 cookies <List[Dict]> 为ä¸ä¸æ设置çå¯é cookie254 name <str>255 value <str>256 url <str> urlãdomainåpathä¸éä¸ã257 domain <str> urlãdomainåpathä¸éä¸ã258 path <str> urlãdomainåpathä¸éä¸ã259 expires <float> å¯éç Unix æ¶é´ï¼ä»¥ç§ä¸ºåä½ï¼ã260 httpOnly <bool> å¯éç httpOnly æ å¿261 secure <bool> å¯éçsecureæ å¿262 sameSite <"Strict"|"Lax"|"None"> å¯éç sameSite æ å¿263 origins <List[Dict]> 为ä¸ä¸æ设置çå¯é localStorage264 origin <str>265 localStorage <List[Dict]>266 name <str>267 value <str>268 :param strict_selectors: å®æå®ï¼ä¸ºæ¤ä¸ä¸æå¯ç¨ä¸¥æ ¼éæ©å¨æ¨¡å¼ã269 å¨ä¸¥æ ¼éæ©å¨æ¨¡å¼æ¨¡å¼ä¸ï¼å½å¤ä¸ªå
ç´ ä¸éæ©å¨å¹é
æ¶ï¼å°æåºå¯¹éæ©å¨çæææä½ï¼è¿äºæä½æå³çå个ç®æ DOMå
ç´ ã270 :param user_agent: å¨æ¤ä¸ä¸æä¸ä½¿ç¨çç¹å®ç¨æ·ä»£çã271 :param viewport: 为æ¯ä¸ªé¡µé¢è®¾ç½®ä¸è´çè§çªãé»è®¤ä¸º 1280x720 è§çªã272 width <int> 以åç´ ä¸ºåä½ç页é¢å®½åº¦ã273 height <int> 以åç´ ä¸ºåä½ç页é¢é«åº¦ã274 """275 if self._context is not None:276 self._page = self._context.new_page()277 else:278 if no_viewport is None:279 no_viewport = True280 self._page = self._browser.new_page(281 accept_downloads=accept_downloads,282 base_url=base_url,283 bypass_csp=bypass_csp,284 extra_http_headers=extra_http_headers,285 http_credentials=http_credentials,286 ignore_https_errors=ignore_https_errors,287 java_script_enabled=java_script_enabled,288 no_viewport=no_viewport,289 proxy=proxy,290 storage_state=storage_state,291 strict_selectors=strict_selectors,292 user_agent=user_agent,293 viewport=viewport294 )295 self._page.context.set_default_timeout(self.default_timeout)296 self._page.context.set_default_navigation_timeout(self.default_navigation_timeout)297 self._interaction = self._page298 def close_page(self):299 """å
³éå½åç页é¢ãå¦æè¿æå
¶ä»æå¼ç页é¢ï¼å°åæ¢å°æè¿æå¼ç页é¢ã"""300 if self._page is not None:301 self._page.close()302 if self._context.pages: # å¦æè¿ææå¼ç页é¢303 self._page = self._context.pages[-1]304 self._interaction = self._page305 else:306 self._interaction = None307 def switch_context(self, index: int):308 """æ `index` å°æ´»å¨æµè§å¨ä¸ä¸æåæ¢å°å¦ä¸ä¸ªæå¼çä¸ä¸æã309 :param index: è¦æ´æ¹ä¸ºçä¸ä¸æçç´¢å¼ãä»0å¼å§ã310 """...
_frame.py
Source:_frame.py
...117 Error("Navigating frame was detached!"),118 lambda frame: frame == self,119 )120 if timeout is None:121 timeout = self._page._timeout_settings.navigation_timeout()122 wait_helper.reject_on_timeout(timeout, f"Timeout {timeout}ms exceeded.")123 return wait_helper124 def expect_navigation(125 self,126 url: URLMatch = None,127 wait_until: DocumentLoadState = None,128 timeout: float = None,129 ) -> EventContextManagerImpl[Response]:130 if not wait_until:131 wait_until = "load"132 if timeout is None:133 timeout = self._page._timeout_settings.navigation_timeout()134 deadline = monotonic_time() + timeout135 wait_helper = self._setup_navigation_wait_helper(timeout)136 matcher = URLMatcher(url) if url else None137 def predicate(event: Any) -> bool:138 # Any failed navigation results in a rejection.139 if event.get("error"):140 return True141 return not matcher or matcher.matches(event["url"])142 wait_helper.wait_for_event(143 self._event_emitter,144 "navigated",145 predicate=predicate,146 )147 async def continuation() -> Optional[Response]:...
asv_pilot.py
Source:asv_pilot.py
1#!/usr/bin/env python2# -*- coding: utf-8 -*-3# Software License Agreement (BSD License)4#5# Copyright (c) 2014, Ocean Systems Laboratory, Heriot-Watt University, UK.6# All rights reserved.7#8# Redistribution and use in source and binary forms, with or without9# modification, are permitted provided that the following conditions10# are met:11#12# * Redistributions of source code must retain the above copyright13# notice, this list of conditions and the following disclaimer.14# * Redistributions in binary form must reproduce the above15# copyright notice, this list of conditions and the following16# disclaimer in the documentation and/or other materials provided17# with the distribution.18# * Neither the name of the Heriot-Watt University nor the names of19# its contributors may be used to endorse or promote products20# derived from this software without specific prior written21# permission.22#23# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS24# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT25# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS26# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE27# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,28# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,29# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;30# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER31# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT32# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN33# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE34# POSSIBILITY OF SUCH DAMAGE.35#36# Original authors:37# Valerio De Carolis, Marian Andrecki, Corina Barbalata, Gordon Frost38from __future__ import division39import roslib40roslib.load_manifest('asv_pilot')41import rospy42import numpy as np43np.set_printoptions(precision=2, suppress=True)44# import sys45import asv_controllers as ctrl46import frame_maths as fm47# Messages48from vehicle_interface.msg import PilotRequest, PilotStatus, ThrusterCommand49from auv_msgs.msg import NavSts50from vehicle_interface.srv import BooleanService, BooleanServiceResponse51# Constants52TOPIC_THROTTLE = '/motors/throttle'53TOPIC_POSITION_REQUEST = '/pilot/position_req'54TOPIC_BODY_REQUEST = '/pilot/body_req'55TOPIC_GEO_REQUEST = '/pilot/geo_req'56TOPIC_VELOCITY_REQUEST = '/pilot/velocity_req'57TOPIC_STATUS = '/pilot/status'58TOPIC_NAV = '/nav/nav_sts'59SRV_SWITCH = '/pilot/switch'60SRV_PID_CONFIG = '/pilot/pid_config'61NAVIGATION_TIMEOUT = 5 # seconds62LOOP_RATE = 5 # Hz63SIMULATION = False64VERBOSE = False65# Status66CTRL_DISABLED = 067CTRL_ENABLED = 168STATUS_CTRL = {69 CTRL_DISABLED: PilotStatus.PILOT_DISABLED,70 CTRL_ENABLED: PilotStatus.PILOT_ENABLED71}72STATUS_MODE = {73 ctrl.MODE_POSITION: PilotStatus.MODE_POSITION,74 ctrl.MODE_VELOCITY: PilotStatus.MODE_VELOCITY,75 ctrl.MODE_POINT_SHOOT: 'point_shoot'76}77SCALE_THROTTLE = 1.078class Pilot(object):79 """Node provides an interface between control logic and ROS. This node outputs throttle commands that can be80 consumed either by pololu_driver or thruster_sim. The controller will not run if fresh navigation81 information is not available. The controller can be enabled or disabled via service.82 Generally, the asv has two degrees of freedom: surge and yaw. They are coupled - that is, the boat cannot yaw83 without thrusting forward (note that this means limited steering when slowing down).84 Different control policies:85 - point and shoot - simple P controller on position (distance to goal and orientation towards the goal).86 - cascaded pid - PID position controller outputs a desired velocity, then PID velocity controller attempts to87 achieve this velocity88 - velocity control - in progress89 """90 def __init__(self, name, topic_throttle, topic_position_request, topic_body_request, topic_geo_request,91 topic_velocity_request, topic_nav, topic_pilot_status, srv_switch, verbose, controller_config):92 self.name = name93 # latest throttle received94 self.pose = np.zeros(6) # [x, y, z, roll, pitch, yaw]95 self.vel = np.zeros(6)96 self.origin = np.zeros(2)97 self.geo_radius = fm.compute_geocentric_radius(self.origin[0])98 self.last_nav_t = 099 self.nav_switch = False100 self.pilot_enable = CTRL_DISABLED101 self.verbose = verbose102 self.controller = ctrl.Controller(1/LOOP_RATE)103 self.controller.set_mode(ctrl.MODE_POSITION)104 self.controller.update_gains(controller_config)105 # Subscribers106 self.position_sub = rospy.Subscriber(topic_position_request, PilotRequest, self.handle_pose_req, tcp_nodelay=True, queue_size=1)107 self.body_sub = rospy.Subscriber(topic_body_request, PilotRequest, self.handle_body_req, tcp_nodelay=True, queue_size=1)108 self.geo_sub = rospy.Subscriber(topic_geo_request, PilotRequest, self.handle_geo_req, tcp_nodelay=True, queue_size=1)109 self.velocity_sub = rospy.Subscriber(topic_velocity_request, PilotRequest, self.handle_vel_req, tcp_nodelay=True, queue_size=1)110 self.nav_sub = rospy.Subscriber(topic_nav, NavSts, self.handle_nav, tcp_nodelay=True, queue_size=1)111 # Publishers112 self.throttle_pub = rospy.Publisher(topic_throttle, ThrusterCommand, tcp_nodelay=True, queue_size=1)113 self.status_pub = rospy.Publisher(topic_pilot_status, PilotStatus, tcp_nodelay=True, queue_size=1)114 # Services115 self.srv_switch = rospy.Service(srv_switch, BooleanService, self.handle_switch)116 self.srv_pid_config = rospy.Service(SRV_PID_CONFIG, BooleanService, self.handle_pid_config)117 def loop(self):118 # if nav message is old stop the controller119 if (rospy.Time.now().to_sec() - self.last_nav_t) > NAVIGATION_TIMEOUT and self.nav_switch:120 self.nav_switch = False121 rospy.logerr('Navigation outdated')122 if self.nav_switch and self.pilot_enable:123 throttle = self.controller.evaluate_control()124 if self.verbose:125 rospy.loginfo(str(self.controller))126 thr_msg = ThrusterCommand()127 thr_msg.header.stamp = rospy.Time.now()128 thr_msg.throttle = throttle129 self.throttle_pub.publish(thr_msg)130 self.send_status()131 def handle_nav(self, msg):132 try:133 pos = msg.position134 orient = msg.orientation135 vel = msg.body_velocity136 rot = msg.orientation_rate137 self.pose[0:3] = np.array([pos.north, pos.east, pos.depth])138 self.pose[3:6] = np.array([orient.roll, orient.pitch, orient.yaw])139 self.vel[0:3] = np.array([vel.x, vel.y, vel.z])140 self.vel[3:6] = np.array([rot.roll, rot.pitch, rot.yaw])141 tmp_origin = np.array([msg.origin.latitude, msg.origin.longitude])142 if np.all(self.origin != tmp_origin):143 self.origin = tmp_origin144 self.geo_radius = fm.compute_geocentric_radius(self.origin[0])145 # dt = msg.header.stamp.to_sec() - self.last_nav_t146 self.last_nav_t = msg.header.stamp.to_sec()147 self.nav_switch = True148 self.controller.update_nav(self.pose, velocity=self.vel)149 except Exception as e:150 rospy.logerr('%s', e)151 rospy.logerr('Bad navigation message format, skipping!')152 def handle_pose_req(self, msg):153 req_pose = np.array(msg.position)154 # ignore depth, pitch and roll155 if any(req_pose[2:5]):156 rospy.logwarn('Non-zero depth, pitch or roll requested. Setting those to zero.')157 req_pose[2:6] = 0158 self.controller.request_pose(req_pose)159 def handle_body_req(self, msg):160 req_pose = np.array(msg.position)161 # ignore depth, pitch and roll162 if any(req_pose[2:5]):163 rospy.logwarn('Non-zero depth, pitch or roll requested. Setting those to zero.')164 req_pose[2:6] = 0165 self.controller.request_body(req_pose)166 def handle_geo_req(self, msg):167 req_pose = np.array(msg.position)168 if any(req_pose[2:5]):169 rospy.logwarn('Non-zero depth, pitch or roll requested. Setting those to zero.')170 # ignore depth, pitch and roll171 req_pose[2:6] = 0172 req_pose[0:2] = fm.geo2ne(req_pose[0:2], self.origin, self.geo_radius)173 self.controller.request_pose(req_pose)174 def handle_vel_req(self, msg):175 req_vel = np.array(msg.velocity)176 if any(req_vel[1:5]):177 rospy.logwarn('Non-zero sway, heave, pitch or roll requested. Setting those to zero.')178 req_vel[1:5] = 0179 self.controller.request_vel(req_vel)180 def handle_switch(self, srv):181 self.pilot_enable = srv.request182 if not self.pilot_enable:183 thr_msg = ThrusterCommand()184 thr_msg.header.stamp = rospy.Time.now()185 thr_msg.throttle = np.zeros(6)186 self.throttle_pub.publish(thr_msg)187 return BooleanServiceResponse(self.pilot_enable)188 def handle_pid_config(self, srv):189 if srv.request is True:190 config = rospy.get_param('/controller', dict())191 self.controller.update_gains(config)192 rospy.logwarn("PID config reloaded")193 return BooleanServiceResponse(True)194 else:195 return BooleanServiceResponse(False)196 def send_status(self, event=None):197 ps = PilotStatus()198 ps.header.stamp = rospy.Time.now()199 ps.status = STATUS_CTRL[self.pilot_enable]200 ps.mode = STATUS_MODE[self.controller.mode]201 if self.controller.req_pose is not None:202 ps.des_pos = self.controller.req_pose.tolist()203 vel = np.zeros(6)204 vel[0:2] = self.controller.des_vel205 ps.des_vel = vel.tolist()206 self.status_pub.publish(ps)207if __name__ == '__main__':208 rospy.init_node('asv_pilot')209 name = rospy.get_name()210 topic_throttle = rospy.get_param('~topic_throttle', TOPIC_THROTTLE)211 topic_position_request = rospy.get_param('~topic_position_request', TOPIC_POSITION_REQUEST)212 topic_body_request = rospy.get_param('~topic_body_request', TOPIC_BODY_REQUEST)213 topic_geo_request = rospy.get_param('~topic_geo_request', TOPIC_GEO_REQUEST)214 topic_velocity_request = rospy.get_param('~topic_velocity_request', TOPIC_VELOCITY_REQUEST)215 topic_nav = rospy.get_param('~topic_nav', TOPIC_NAV)216 topic_pilot_status = rospy.get_param('~topic_pilot_status', TOPIC_STATUS)217 srv_switch = rospy.get_param('~srv_switch', SRV_SWITCH)218 # simulation = bool(int(rospy.get_param('~simulation', SIMULATION)))219 verbose = bool(int(rospy.get_param('~verbose', VERBOSE)))220 controller_config = rospy.get_param('~controller', dict())221 rospy.loginfo('%s: throttle topic: %s', name, topic_throttle)222 rospy.loginfo('%s: topic_position_request: %s', name, topic_position_request)223 rospy.loginfo('%s: topic_body_request: %s', name, topic_body_request)224 rospy.loginfo('%s: topic_geo_request: %s', name, topic_geo_request)225 rospy.loginfo('%s: topic_velocity_request: %s', name, topic_velocity_request)226 rospy.loginfo('%s: topic_nav: %s', name, topic_nav)227 rospy.loginfo('%s: topic_pilot_status: %s', name, topic_pilot_status)228 rospy.loginfo('%s: srv_switch: %s', name, srv_switch)229 rospy.loginfo('%s: verbose: %s', name, verbose)230 pilot = Pilot(name, topic_throttle, topic_position_request, topic_body_request, topic_geo_request,231 topic_velocity_request, topic_nav, topic_pilot_status, srv_switch, verbose,232 controller_config)233 loop_rate = rospy.Rate(LOOP_RATE)234 while not rospy.is_shutdown():235 try:236 pilot.loop()237 loop_rate.sleep()238 except rospy.ROSInterruptException:239 rospy.loginfo('%s caught ros interrupt!', name)240 # except Exception as e:241 # rospy.logfatal('%s', e)242 # rospy.logfatal('Caught exception and dying!')...
neyboy.py
Source:neyboy.py
1import base642import datetime as dt3import io4try:5 from gym import logger6except:7 import logging as logger8import random9import re10import uuid11from collections import namedtuple12import pathlib13import numpy as np14from PIL import Image15from pyppeteer import launch, connect16from syncer import sync17ACTION_NAMES = ["NOOP", "LEFT", "RIGHT"]18ACTION_NONE = 019ACTION_LEFT = 120ACTION_RIGHT = 221START_SCREEN = 022GAME_SCREEN = 123GAME_OVER_SCREEN = 224TOAST_APPEARANCE_FREQUENCY = 1025GameState = namedtuple('GameState',26 ['game_id', 'id', 'score', 'status', 'hiscore', 'snapshot', 'timestamp', 'dimensions',27 'position'])28DEFAULT_NAVIGATION_TIMEOUT = 60 * 100029DEFAULT_GAME_URL = 'http://fabito.github.io/neyboy/'30DEFAULT_BROWSER_WS_ENDPOINT = 'ws://localhost:3000'31DEFAULT_CHROMIUM_LAUNCH_ARGS = ['--no-sandbox', '--window-size=80,315', '--disable-infobars']32class Game:33 def __init__(self, headless=True, user_data_dir=None, navigation_timeout=DEFAULT_NAVIGATION_TIMEOUT,34 game_url=DEFAULT_GAME_URL, browser_ws_endpoint=None, initial_width=180, initial_height=320):35 self.initial_width = initial_width36 self.initial_height = initial_height37 self.headless = headless38 self.user_data_dir = user_data_dir39 self.navigation_timeout = navigation_timeout40 self.is_running = False41 self.browser = None42 self.page = None43 self.state = None44 self._dims = None45 self.game_id = str(uuid.uuid4())46 self.state_id = 047 self.game_url = game_url48 self.browser_ws_endpoint = browser_ws_endpoint49 async def initialize(self):50 if self.browser_ws_endpoint is not None:51 logger.info('Connecting to running instance at: %s', self.browser_ws_endpoint)52 self.browser = await connect(browserWSEndpoint=self.browser_ws_endpoint)53 self.page = await self.browser.newPage()54 else:55 logger.info('Launching new browser instance')56 if self.user_data_dir is not None:57 self.browser = await launch(headless=self.headless, userDataDir=self.user_data_dir,58 args=DEFAULT_CHROMIUM_LAUNCH_ARGS)59 else:60 self.browser = await launch(headless=self.headless)61 pages = await self.browser.pages()62 if len(pages) > 0:63 self.page = pages[0]64 else:65 self.page = await self.browser.newPage()66 self.page.setDefaultNavigationTimeout(self.navigation_timeout)67 await self.page.setViewport(dict(width=117, height=156))68 await self.page.goto('{}?w={}&h={}'.format(self.game_url, self.initial_width, self.initial_height),69 {'waitUntil': 'networkidle2'})70 envjs_path = pathlib.Path(__file__).resolve().parent.joinpath('env.js')71 await self.page.addScriptTag(dict(path=str(envjs_path)))72 await self.is_ready()73 @staticmethod74 async def create(headless=True, user_data_dir=None, navigation_timeout=DEFAULT_NAVIGATION_TIMEOUT,75 game_url=DEFAULT_GAME_URL, browser_ws_endpoint=None) -> 'Game':76 o = Game(headless, user_data_dir, navigation_timeout, game_url, browser_ws_endpoint)77 await o.initialize()78 return o79 @property80 def x(self):81 return int(self._dims['x'])82 @property83 def y(self):84 return int(self._dims['y'])85 @property86 def width(self):87 return int(self._dims['width'])88 @property89 def height(self):90 return int(self._dims['height'])91 async def dimensions(self):92 dimensions = await self.page.evaluate('''() => {93 return neyboyChallenge.dimensions();94 }''')95 return dimensions96 async def is_ready(self):97 await self.page.waitForFunction('''()=>{98 return neyboyChallenge && neyboyChallenge.isReady();99 }''')100 async def start(self):101 if random.randint(0, 1):102 await self.tap_right()103 else:104 await self.tap_left()105 await self._shuffle_toasts()106 return self107 async def _shuffle_toasts(self):108 await self.page.evaluate('''() => {109 neyboyChallenge.shuffleToasts();110 }''')111 return self112 def is_over(self):113 return self.state.status == GAME_OVER_SCREEN114 async def _wait_until_replay_button_is_active(self):115 await self.resume()116 await self.page.waitForFunction('''() => {117 return neyboyChallenge.isOver();118 }''')119 async def is_loaded(self):120 return await self.is_ready()121 async def pause(self):122 await self.page.evaluate('''() => {123 neyboyChallenge.pause();124 }''')125 async def resume(self):126 await self.page.evaluate('''() => {127 neyboyChallenge.resume();128 }''')129 async def get_score(self):130 score = await self.page.evaluate('''() => {131 return neyboyChallenge.getScore();132 }''')133 return int(score) if score else 1134 async def get_high_score(self):135 hiscore = await self.page.evaluate('''() => {136 return neyboyChallenge.runtime !== undefined &&137 neyboyChallenge.runtime.getEventVariableByName('hiscore').data;138 }''')139 return int(hiscore) if hiscore else 1140 async def get_scores(self):141 scores = await self.page.evaluate('''() => {142 const score = neyboyChallenge.getScore();143 const hiscore = neyboyChallenge.runtime.getEventVariableByName('hiscore').data || 0;144 return {score, hiscore};145 }''')146 scores['score'] = int(scores['score'])147 scores['hiscore'] = int(scores['hiscore'])148 return scores149 async def tap_left(self, delay=0):150 x = self.x + self.width // 4151 y = self.y + self.height // 3152 await self.page.mouse.click(x, y, {'delay': delay})153 async def tap_right(self, delay=0):154 x = (self.x + self.width) - self.width // 4155 y = self.y + self.height // 3156 await self.page.mouse.click(x, y, {'delay': delay})157 async def stop(self):158 await self.browser.close()159 async def _hard_restart(self):160 await self.page.reload({'waitUntil': 'networkidle2'})161 await self.is_loaded()162 async def restart(self):163 self.game_id = str(uuid.uuid4())164 self.state_id = 0165 if self.state.status == GAME_SCREEN:166 # commit suicide167 while not self.is_over():168 logger.debug('suiciding')169 await self.tap_left()170 await self.tap_left()171 await self.tap_left()172 await self.get_state()173 if self.is_over():174 await self._wait_until_replay_button_is_active()175 x = self.x + self.width // 2176 y = self.y + self.height - self.height // 7177 await self.page.mouse.click(x, y)178 elif self.state.status == START_SCREEN:179 logger.debug('start screen')180 else:181 raise ValueError('Unknown state: {}'.format(self.state.status))182 await self.start()183 async def screenshot(self, format="jpeg", quality=30, encoding='binary'):184 dims = await self.dimensions()185 dims['y'] = dims['height'] / 2186 dims['height'] = dims['height'] - dims['y'] - 30187 snapshot = await self.page.screenshot({188 'type': format,189 'quality': quality,190 'clip': dims191 })192 if encoding == 'binary':193 return snapshot194 else:195 encoded_snapshot = base64.b64encode(snapshot)196 return encoded_snapshot.decode('ascii')197 async def get_state(self, include_snapshot='numpy'):198 """199 :param include_snapshot: numpy, pil, ascii, bytes, None200 :param fmt:201 :param quality:202 :return: a GameState instance203 """204 state = await self.page.evaluate('''(includeSnapshot, format, quality) => {205 return neyboyChallenge.state(includeSnapshot, format, quality);206 }''', include_snapshot, 'image/jpeg', 30)207 self.state_id += 1208 self._dims = state['dimensions']209 state['hiscore'] = int(state['hiscore'])210 state['score'] = int(state['score'])211 state['status'] = int(state['status'])212 state['id'] = self.state_id213 state['game_id'] = self.game_id214 state['timestamp'] = dt.datetime.today().timestamp()215 if include_snapshot is not None:216 base64_string = state['snapshot']217 base64_string = re.sub('^data:image/.+;base64,', '', base64_string)218 imgdata = base64.b64decode(base64_string)219 bytes_io = io.BytesIO(imgdata)220 if include_snapshot == 'numpy':221 image = Image.open(bytes_io)222 state['snapshot'] = np.array(image)223 elif include_snapshot == 'pil':224 image = Image.open(bytes_io)225 state['snapshot'] = image226 elif include_snapshot == 'ascii':227 image = Image.open(bytes_io)228 state['snapshot'] = self.screenshot_to_ascii(image, 0.1, 3)229 elif include_snapshot == 'bytes':230 state['snapshot'] = bytes_io231 else:232 raise ValueError('Supported snapshot formats are: numpy, pil, ascii, bytes')233 self.state = GameState(**state)234 return self.state235 async def save_screenshot(self, path, format="jpeg", quality=30):236 dims = await self.dimensions()237 dims['y'] = dims['height'] / 2238 dims['height'] = dims['height'] - dims['y'] - 30239 await self.page.screenshot({240 'path': path,241 'type': format,242 'quality': quality,243 'clip': dims244 })245 async def is_in_start_screen(self):246 playing_status = await self._get_is_playing_status()247 return playing_status == START_SCREEN248 @staticmethod249 def screenshot_to_ascii(img, scale, intensity_correction_factor, width_correction_factor=7 / 4):250 """251 https://gist.github.com/cdiener/10491632252 :return:253 """254 chars = np.asarray(list(' .,:;irsXA253hMHGS#9B&@'))255 SC, GCF, WCF = scale, intensity_correction_factor, width_correction_factor256 S = (round(img.size[0] * SC * WCF), round(img.size[1] * SC))257 img = np.sum(np.asarray(img.resize(S)), axis=2)258 img -= img.min()259 img = (1.0 - img / img.max()) ** GCF * (chars.size - 1)260 return "\n".join(("".join(r) for r in chars[img.astype(int)]))261class SyncGame:262 def __init__(self, game: Game):263 self.game = game264 def __getattr__(self, attr):265 return sync(getattr(self.game, attr))266 @staticmethod267 def create(headless=True, user_data_dir=None, navigation_timeout=DEFAULT_NAVIGATION_TIMEOUT,268 game_url=DEFAULT_GAME_URL, browser_ws_endpoint=None) -> 'SyncGame':269 o = sync(Game.create)(headless, user_data_dir, navigation_timeout, game_url, browser_ws_endpoint)...
LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!