Best Python code snippet using playwright-python
main.py
Source:main.py
1# --- Revised 3-Clause BSD License ---2# Copyright Semtech Corporation 2020. All rights reserved.3#4# Redistribution and use in source and binary forms, with or without modification,5# are permitted provided that the following conditions are met:6#7# * Redistributions of source code must retain the above copyright notice,8# this list of conditions and the following disclaimer.9# * Redistributions in binary form must reproduce the above copyright notice,10# this list of conditions and the following disclaimer in the documentation11# and/or other materials provided with the distribution.12# * Neither the name of the Semtech corporation nor the names of its13# contributors may be used to endorse or promote products derived from this14# software without specific prior written permission.15#16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND17# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED18# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE19# DISCLAIMED. IN NO EVENT SHALL SEMTECH CORPORATION. BE LIABLE FOR ANY DIRECT,20# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,21# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,22# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF23# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE24# OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF25# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.26from typing import Any,Awaitable,Callable,Dict,List,Mapping,Optional27import sys28import os29import traceback30import asyncio31import websockets32import logging33import json34import argparse35from urllib.parse import urlparse36from websockets.server import WebSocketServerProtocol as WSSP37import router_config38from router import Router39from id6 import Id640logger = logging.getLogger('ts2pktfwd')41# All track stations managed by this process. Every web socket connection42# on muxs is forwarded to the router registered here. This map is populated43# at startup time.44routerid2router = {} # type:Mapping[Id6,Router]45async def add_router(routerid:'Id6', rconfig:Mapping[str,Any], pkfwduri:str) -> Router:46 assert routerid not in routerid2router47 r = Router(routerid, rconfig, pkfwduri)48 await r.start()49 routerid2router[routerid] = r50 return r51async def websocket_send_error(websocket: WSSP, router:Optional[str], message:str) -> None:52 await websocket.send(json.dumps({ 'router': router if router else '0', 'error': message }))53class Infos():54 ''' Simple info server to handle router info requests. '''55 def __init__(self, host:str, port:int, muxs_uri:str) -> None:56 self.host = host57 self.port = port58 self.muxs_uri = muxs_uri59 self.ws_server = None # type: Optional[websockets.server.WebSocketServer]60 async def start(self):61 #self.ws_server = await websockets.serve(self.accept, host=self.host, port=self.port)62 self.ws_server = await websockets.serve(self.accept, port=self.port)63 def __str__(self):64 return 'Infos'65 async def accept(self, websocket:WSSP, path:str) -> None:66 logger.info('%s: accept: path %s' % (self, path))67 router = None # type:Optional[str]68 errmsg = None # type:Optional[str]69 try:70 s = json.loads(await websocket.recv())71 logger.info('%s: read: %s' % (self, s))72 if 'router' not in s:73 errmsg = 'Invalid request data'74 else:75 router = s['router']76 routerid = Id6(router, 'router')77 if routerid not in routerid2router:78 errmsg = 'Router not provisioned'79 else:80 logger.info('%s: respond: %s' % (self, self.muxs_uri+'/'+str(routerid)))81 resp = { 'router': router, 'muxs': 'muxs-::0', 'uri': self.muxs_uri+'/'+str(routerid) }82 await websocket.send(json.dumps(resp))83 return84 except asyncio.CancelledError:85 raise86 except Exception as exc:87 errmsg = 'Could not handle request'88 logger.error('%s: server socket failed: %s', self, exc, exc_info=True)89 await websocket_send_error(websocket, router, errmsg)90 resp = { 'error': errmsg }91 await websocket.send(json.dumps(resp))92 async def shutdown(self) -> None:93 ws_server = self.ws_server94 if ws_server:95 self.ws_server = None96 ws_server.close()97 await ws_server.wait_closed()98class Muxs():99 ''' Simple muxs server to accept router connects. '''100 def __init__(self, host:str, port:int) -> None:101 self.host = host102 self.port = port103 self.ws_server = None # type: Optional[websockets.server.WebSocketServer]104 async def start(self):105 #self.ws_server = await websockets.serve(self.accept, host=self.host, port=self.port)106 self.ws_server = await websockets.serve(self.accept, port=self.port)107 def __str__(self):108 return 'Muxs'109 async def accept(self, websocket:WSSP, path:str) -> None:110 logger.info('%s: accept: %s' % (self, path))111 try:112 s = path[1:]113 routerid = Id6(s, 'router')114 if routerid not in routerid2router:115 await websocket_send_error(websocket, s, 'Router not provisioned')116 return117 r = routerid2router[routerid]118 await r.on_ws_connect(websocket)119 except Exception as exc:120 logger.error('%s: server socket failed: %s', self, exc, exc_info=True)121 return122 async def shutdown(self) -> None:123 ws_server = self.ws_server124 if ws_server:125 self.ws_server = None126 ws_server.close()127 await ws_server.wait_closed()128infos = None # type:Optional[Infos]129muxs = None # type:Optional[Muxs]130LOG_LEVELS = [ 'ERROR', 'WARNING', 'INFO', 'DEBUG' ]131LOG_STR2LEVEL = {132 'ERROR': logging.ERROR,133 'WARNING': logging.WARNING,134 'INFO': logging.INFO,135 'DEBUG': logging.DEBUG136}137def ap_routerid(s:str) -> Id6:138 return Id6(s,'router')139def handle_exc(exc, exit_code=2):140 osenv = os.environ141 if 'stacktrace' in osenv and osenv['stacktrace']:142 print('Execution failed:')143 print(traceback.format_exc())144 else:145 print('Execution failed: %s' % (exc))146 sys.exit(exit_code)147async def main(args):148 global infos, muxs149 root = logging.getLogger()150 ll = logging.INFO151 if args.loglevel:152 ll = LOG_STR2LEVEL[args.loglevel]153 root.setLevel(ll)154 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')155 if args.logfile:156 fh = logging.FileHandler(args.logfile)157 fh.setLevel(logging.DEBUG)158 root.addHandler(fh)159 else:160 ch = logging.StreamHandler(sys.stdout)161 ch.setLevel(logging.DEBUG)162 ch.setFormatter(formatter)163 root.addHandler(ch)164 infosuri = urlparse(args.infosuri)165 infosport = infosuri.port166 infoshost = infosuri.hostname167 muxsuri = ''168 if args.muxsuri:169 muxsuri = urlparse(args.muxsuri)170 else:171 muxsuri = infosuri._replace(netloc="{}:{}".format(infoshost, infosport+2))172 muxsport = muxsuri.port173 muxshost = muxsuri.hostname174 pkfwduri = urlparse(args.pkfwduri)175 logger.info('Connection details: infosuri %s, muxsuri %s, pkfwduri %s' % (infosuri.geturl(), muxsuri.geturl(), pkfwduri.geturl()))176 infos = Infos(infoshost, infosport, muxsuri.geturl())177 muxs = Muxs(muxshost, muxsport)178 await infos.start()179 logger.info('Infos started.')180 await muxs.start()181 logger.info('Muxs started.')182 router_config.ini([ args.confdir ])183 routers = args.routerids if args.routerids else router_config.routerid2config.keys()184 for s in routers:185 routerid = Id6(s, 'router')186 logger.info("Instantiating %s" % (routerid))187 rc = router_config.get_router_config(routerid)188 await add_router(routerid, rc, pkfwduri)189if __name__ == '__main__': # pragma:nocover190 parser = argparse.ArgumentParser(description='''ts2pkdfwd.''')191 parser.add_argument("--infosuri", type=str, default="ws://localhost:6090", help="Info server base URI.")192 parser.add_argument("--muxsuri", type=str, default=None, help="Mux server base URI, by default Info server port plus 2.")193 parser.add_argument("--pkfwduri", type=str, help="Packet forwarder destination URI.", default="udp://localhost:1680")194 parser.add_argument("--confdir", type=str, help="Directory where to load region and router configuration.", default=".")195 parser.add_argument("--logfile", type=str, help="Log file, by default logged to stdout.", default=None)196 parser.add_argument("--loglevel", type=str, choices=LOG_LEVELS, help="Log level: %s" % LOG_LEVELS, default='INFO')197 parser.add_argument("routerids", type=ap_routerid, nargs='*', help='Router ids', default= None)198 try:199 args = parser.parse_args()200 except Exception as exc:201 handle_exc(exc, 1)202 loop = asyncio.get_event_loop()203 try:204 loop.run_until_complete(main(args))205 asyncio.get_event_loop().run_forever()206 except Exception as ex:207 handle_exc(ex, 2)208 finally:...
server.py
Source:server.py
1import asyncio2import base643import json4import ssl5from os import environ as env6import aiohttp7import aiohttp_jinja28import jinja29from aiohttp import ClientSession, WSMsgType, client, web10from aiohttp_session import get_session, setup11from aiohttp_session.cookie_storage import EncryptedCookieStorage12from dotenv import find_dotenv, load_dotenv13from oauthlib.oauth2 import WebApplicationClient14import if_debug15from code_server_manager import CodeServerManager16load_dotenv(find_dotenv())17if_debug.attach_debugger_if_dev()18oath_client = WebApplicationClient(env["GITHUB_CLIENT_ID"])19async def login(req: web.Request) -> web.Response:20 request_uri = oath_client.prepare_request_uri(21 env["GITHUB_URL"],22 redirect_uri=str(req.url.parent) + "/callback",23 scope=["openid", "email", "profile"],24 )25 raise web.HTTPFound(request_uri)26@aiohttp_jinja2.template("home.html")27async def callback(req: web.Request) -> web.Response:28 code = req.query["code"]29 token_url, headers, body = oath_client.prepare_token_request(30 env["GITHUB_ACCESS_TOKEN"],31 # authorization_response=str(req.url),32 # redirect_url=str(req.url.parent),33 code=code,34 )35 headers["Accept"] = "application/json"36 async with aiohttp.ClientSession() as session:37 async with session.post(38 token_url,39 headers=headers,40 data=body,41 params=[42 ("client_id", env["GITHUB_CLIENT_ID"]),43 ("client_secret", env["GITHUB_CLIENT_SECRET"]),44 ("code", code),45 ]46 # auth=(env["GITHUB_CLIENT_ID"], env["GITHUB_CLIENT_SECRET"])47 ) as token_response:48 data = await token_response.read()49 print(data)50 tokens = oath_client.parse_request_body_response(data)51 print(tokens)52 new_headers = {"Authorization": "token " + tokens.get("access_token")}53 async with session.get(54 "https://api.github.com/user", headers=new_headers55 ) as user_response:56 user_data_str = await user_response.read()57 user_data = json.loads(user_data_str)58 print(user_data)59 session = await get_session(req)60 session["container_name"] = (61 user_data["login"] + "_" + str(user_data["id"])62 )63 return {"is_logged_in": True}64@aiohttp_jinja2.template("home.html")65async def logout(req: web.Request) -> web.Response:66 session = await get_session(req)67 session.invalidate()68 return {"is_logged_in": False}69@aiohttp_jinja2.template("home.html")70async def does_work(req: web.Request) -> web.Response:71 sess = await get_session(req)72 return {"is_logged_in": "container_name" in sess.keys()}73async def proxy_handler(req: web.Request) -> web.Response:74 sess = await get_session(req)75 if "container_name" not in sess.keys():76 raise web.HTTPFound("/login")77 else:78 container_name = sess["container_name"]79 code_server_manager = CodeServerManager(container_name)80 await code_server_manager.find_or_create_container()81 reqH = req.headers.copy()82 base_url = f"http://{container_name}:8080"83 # Do web socket Stuff84 if (85 reqH["connection"] == "Upgrade"86 and reqH["upgrade"] == "websocket"87 and req.method == "GET"88 ):89 ws_server = web.WebSocketResponse()90 await ws_server.prepare(req)91 print(f"##### WS_SERVER {ws_server}")92 client_session = ClientSession(cookies=req.cookies)93 path_qs_cleaned = req.path_qs.removeprefix("/devenv")94 async with client_session.ws_connect(base_url + path_qs_cleaned) as ws_client:95 print(f"##### WS_CLIENT {ws_client}")96 async def wsforward(ws_from, ws_to):97 async for msg in ws_from:98 print(f">>> msg: {msg}")99 mt = msg.type100 md = msg.data101 if mt == WSMsgType.TEXT:102 await ws_to.send_str(md)103 elif mt == WSMsgType.BINARY:104 await ws_to.send_bytes(md)105 elif mt == WSMsgType.PING:106 await ws_to.ping()107 elif mt == WSMsgType.PONG:108 await ws_to.pong()109 elif ws_to.closed:110 await ws_to.close(code=ws_to.close_code, message=msg.extra)111 else:112 raise ValueError(f"unexpected message type: {msg}")113 await asyncio.wait(114 [wsforward(ws_server, ws_client), wsforward(ws_client, ws_server)],115 return_when=asyncio.FIRST_COMPLETED,116 )117 return ws_server118 else: # Do http proxy119 proxyPath = req.path_qs120 if proxyPath != "":121 proxyPath = (122 proxyPath.removeprefix("/devenv")123 .removeprefix("devenv")124 .removeprefix("/")125 )126 proxyPath = "/" + proxyPath127 async with client.request(128 req.method,129 base_url + proxyPath,130 allow_redirects=False,131 data=await req.read(),132 ) as res:133 headers = res.headers.copy()134 headers["service-worker-allowed"] = "/"135 body = await res.read()136 return web.Response(headers=headers, status=res.status, body=body)137app = web.Application()138# Set up sessions139secret_key = base64.urlsafe_b64decode(env["FERNET_KEY"])140setup(app, EncryptedCookieStorage(secret_key))141# Set up routes142app.add_routes([web.get("/", does_work)])143app.add_routes([web.get("/login", login)])144app.add_routes([web.get("/callback", callback)])145app.add_routes([web.get("/logout", logout)])146app.add_routes([web.get(r"/devenv", proxy_handler)])147app.add_routes([web.get(r"/{proxyPath:.*}", proxy_handler)])148# app.add_routes([web.get(r'/{proxyPath:.*}', proxy_handler)])149aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader("./templates"))150if "SSL_CRT_FILE" in env.keys() and "SSL_KEY_FILE" in env.keys():151 ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)152 ssl_context.load_cert_chain(env["SSL_CRT_FILE"], env["SSL_KEY_FILE"])153 web.run_app(app, port=5000, ssl_context=ssl_context)...
humming_ws_server.py
Source:humming_ws_server.py
...12 # url_host_only is used for creating one HummingWSServer to handle all websockets requests and responses for13 # a given url host.14 url_host_only = False15 @staticmethod16 def get_ws_server(url):17 if HummingWsServerFactory.url_host_only:18 url = urlparse(url).netloc19 return HummingWsServerFactory._ws_servers.get(url)20 @staticmethod21 def start_new_server(url):22 port = get_open_port()23 ws_server = HummingWsServer(HummingWsServerFactory.host, port)24 if HummingWsServerFactory.url_host_only:25 url = urlparse(url).netloc26 HummingWsServerFactory._ws_servers[url] = ws_server27 ws_server.start()28 return ws_server29 @staticmethod30 def reroute_ws_connect(url, **kwargs):31 ws_server = HummingWsServerFactory.get_ws_server(url)32 if ws_server is None:33 return HummingWsServerFactory._orig_ws_connect(url, **kwargs)34 kwargs.clear()35 return HummingWsServerFactory._orig_ws_connect(f"ws://{ws_server.host}:{ws_server.port}", **kwargs)36 @staticmethod37 async def send_str(url, message, delay=0):38 if delay > 0:39 await asyncio.sleep(delay)40 ws_server = HummingWsServerFactory.get_ws_server(url)41 await ws_server.websocket.send(message)42 @staticmethod43 def send_str_threadsafe(url, msg, delay=0):44 ws_server = HummingWsServerFactory.get_ws_server(url)45 asyncio.run_coroutine_threadsafe(HummingWsServerFactory.send_str(url, msg, delay), ws_server.ev_loop)46 @staticmethod47 async def send_json(url, data, delay=0):48 if delay > 0:49 await asyncio.sleep(delay)50 ws_server = HummingWsServerFactory.get_ws_server(url)51 await ws_server.websocket.send(json.dumps(data))52 @staticmethod53 def send_json_threadsafe(url, data, delay=0):54 ws_server = HummingWsServerFactory.get_ws_server(url)55 asyncio.run_coroutine_threadsafe(HummingWsServerFactory.send_json(url, data, delay), ws_server.ev_loop)56class HummingWsServer:57 def __init__(self, host, port):58 self.ev_loop: None59 self._started: bool = False60 self.host = host61 self.port = port62 self.websocket = None63 self.stock_responses = {}64 def add_stock_response(self, request, json_response):65 self.stock_responses[request] = json_response66 async def _handler(self, websocket, path):67 self.websocket = websocket68 async for msg in self.websocket:...
chat.py
Source:chat.py
1import json, threading2class Destination:3 def __init__( self, nick, con=None ):4 self.nick = nick5 self.con = con6class ChatServer:7 def __init__( self ):8 self.dst_queues = {}9 self.dst_by_id = {}10 def register_destination( self, dst ):11 if dst.nick not in self.dst_queues:12 self.dst_queues[dst.nick] = {}13 if dst.con not in self.dst_queues[dst.nick]:14 if dst.con!=None and None in self.dst_queues[dst.nick]:15 # aufgelaufene nicht verbindungsorientierte Nachrichten übernehmen16 self.dst_queues[dst.nick][dst.con] = [] + self.dst_queues[dst.nick][None]17 # anonyme Warteschlange wird nach der Ãbernahme bereinigt:18 self.dst_queues[dst.nick][None] = []19 else:20 self.dst_queues[dst.nick][dst.con] = []21 def remove_destination( self, dst ):22 if dst.nick in self.dst_queues:23 if dst.con in self.dst_queues[dst.nick] and dst.con!=None:24 del self.dst_queues[dst.nick][dst.con]25 if len(self.dst_queues[dst.nick])==0:26 del self.dst_queues[dst.nick]27 def get_connected_nicks( self ):28 nicks = set()29 for nick in self.dst_queues:30 for con in self.dst_queues[nick]:31 if con!=None:32 nicks.add( nick )33 return list(nicks)34 def send_message( self, dst, msg, msg_type="text", src=None ):35 self.register_destination( dst )36 if dst.con:37 # Direktzustellung an spezifische Verbindung38 self.dst_queues[dst.nick][dst.con].insert( 0, {"src" : src, "msg_type" : msg_type, "msg" : msg} )39 else:40 # Nachrichtenzustellung erfolgt an alle für den Nick registrierten Verbindungen41 for con in self.dst_queues[dst.nick]:42 self.dst_queues[dst.nick][con].insert( 0, {"src" : src, "msg_type" : msg_type, "msg" : msg} )43 def broadcast_message( self, msg, msg_type="text", src=None ):44 for nick in self.dst_queues:45 self.send_message( src=src, dst=Destination(nick), msg=msg, msg_type=msg_type )46 def recv_message( self, dst ):47 self.register_destination( dst )48 while self.dst_queues[dst.nick][dst.con]:49 msg = self.dst_queues[dst.nick][dst.con].pop()50 #msg["msg"] += " -> [%s,%s]" % (dst.nick,str(dst.con))51 yield msg52 53global_chat_server = ChatServer()54global_chat_server_semaphore = threading.Semaphore()55def initialize( ws_server ):56 print( str(threading.current_thread().ident)+" initialize" )57 global global_chat_server58 ws_server.user_status = ws_server.app.user.status()59 nick = ws_server.user_status["login"]["nick"]60 global_chat_server_semaphore.acquire()61 global_chat_server.register_destination( dst=Destination(nick,ws_server.con) )62 global_chat_server.broadcast_message( msg={"nick":nick, "nick_list":global_chat_server.get_connected_nicks()}, msg_type="join" )63 print( str(global_chat_server.dst_queues) )64 global_chat_server_semaphore.release()65def process_message( ws_server, msg ):66 print( str(threading.current_thread().ident)+" process_message" )67 cmd = json.loads(msg)68 global global_chat_server69 if "msg" in cmd:70 global_chat_server_semaphore.acquire()71 if "dst" in cmd:72 global_chat_server.send_message( src=ws_server.user_status["login"]["nick"], dst=Destination(cmd["dst"]), msg=cmd["msg"] )73 else:74 global_chat_server.broadcast_message( src=ws_server.user_status["login"]["nick"], msg=cmd["msg"] )75 global_chat_server_semaphore.release()76def run( ws_server ):77 #print( str(threading.current_thread().ident)+" run" )78 global global_chat_server79 global_chat_server_semaphore.acquire()80 for msg in global_chat_server.recv_message( dst=Destination(ws_server.user_status["login"]["nick"],ws_server.con) ):81 ws_server.ws.send( json.dumps(msg) )82 global_chat_server_semaphore.release()83def sleep( ws_server ):84 #print( str(threading.current_thread().ident)+" sleep" )85 ws_server.ws.client_message_event.wait( timeout=0.2 )86 87def cleanup( ws_server ):88 print( str(threading.current_thread().ident)+" cleanup" )89 global global_chat_server90 nick = ws_server.user_status["login"]["nick"]91 global_chat_server_semaphore.acquire()92 global_chat_server.remove_destination( dst=Destination(nick,ws_server.con) )93 global_chat_server.broadcast_message( msg={"nick":nick, "nick_list":global_chat_server.get_connected_nicks()}, msg_type="leave" )94 print( str(global_chat_server.dst_queues) )...
LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!