Best Python code snippet using playwright-python
test_debug_server.py
Source:test_debug_server.py
...18 killed = True19 break20 return killed21def setup_server(start_server, find_free_port, no_eval=False, stdout=False, env=None):22 port = find_free_port()23 args = ["+DEBUG_LOG", "+NO_EVAL"] if no_eval else ["+DEBUG_LOG"]24 s = start_server(port, "test_debug_server", args, stdout=stdout, env=env)25 assert s.poll() is None26 uri = "ws://localhost:{0}".format(port)27 return s, uri28def test_continue_stop(start_server, find_free_port):29 s, uri = setup_server(start_server, find_free_port, True)30 async def test_logic():31 client = hgdb.HGDBClient(uri, None)32 await client.connect()33 await client.continue_()34 await client.stop()35 asyncio.get_event_loop().run_until_complete(test_logic())36 # check if process exit...
test_jupyter_kernel_raw.py
Source:test_jupyter_kernel_raw.py
# ... (the first lines of this snippet, including further imports, are not shown)
import logging
import zmq
from aiozmq import create_zmq_stream
import asyncio


def find_free_port() -> int:
    """Return a TCP port number that the OS reported as free.

    A socket is bound to port 0 so the OS picks the port; the socket is
    closed again before returning, so the port is only known to have been
    free at the time of the call.
    """
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
        sock.bind(('', 0))
        # NOTE(review): the option is set after bind(), so it cannot affect
        # the bind above -- confirm whether it was meant to come first.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        return sock.getsockname()[1]


async def test_kernel() -> None:
    """Build a kernel connection configuration and write it to a temp file.

    Only the beginning of this test is visible in the snippet.
    """
    transport = 'tcp'
    ip = '127.0.0.1'
    # Each channel gets its own port from find_free_port().
    shell_port = find_free_port()
    control_port = find_free_port()
    stdin_port = find_free_port()
    iopub_port = find_free_port()
    heartbeat_port = find_free_port()
    configuration = {
        'transport': transport,
        'ip': ip,
        'shell_port': shell_port,
        'control_port': control_port,
        'stdin_port': stdin_port,
        'iopub_port': iopub_port,
        'hb_port': heartbeat_port,
        'key': 'a7d9f4f3-acad37f08d4fe05069a03422',
        'signature_scheme': 'hmac-sha256'
    }
    configuration_json = json.dumps(configuration)
    with NamedTemporaryFile() as configuration_file:
        configuration_file.write(str.encode(configuration_json))
        # ... (the remainder of this test is not shown in the snippet)
test_hgdb.py
Source:test_hgdb.py
...24 p = subprocess.Popen(args, stdout=subprocess.PIPE if not log else sys.stdout,25 stderr=subprocess.PIPE if not log else sys.stdout)26 time.sleep(wait)27 return p28def find_free_port():29 with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:30 s.bind(('', 0))31 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)32 return s.getsockname()[1]33def start_program(port, **kwargs):34 dirname = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))35 hgdb = os.path.join(dirname, "hgdb")36 # use a fake db for arguments37 args = [hgdb, ":" + str(port), "no-db", "--no-db-connection"]38 extra_args = []39 for n, v in kwargs.items():40 extra_args.append("--" + n)41 extra_args.append(v)42 args = args + extra_args43 p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)44 return p45def test_set_breakpoint_continue():46 port = find_free_port()47 # start the server48 s = start_server(port, "test_debug_server")49 # run the debugger50 p = start_program(port)51 # continue52 out = p.communicate(input=b"b test.py:1\nc\n")[0]53 out = out.decode("ascii")54 assert "Breakpoint 2 at test.py:1" in out55 s.kill()56 p.kill()57def test_rewind_time():58 port = find_free_port()59 # start the server60 s = start_server(port, "test_debug_server", supports_rewind=True)61 # run the debugger62 p = start_program(port)63 # continue64 out = p.communicate(input=b"b test.py:1\nc\ngo 200\ninfo time\n")[0]65 out = out.decode("ascii")66 assert "201" in out67 s.kill()68 p.kill()69def test_repl():70 port = find_free_port()71 # start the server72 s = start_server(port, "test_debug_server")73 # run the debugger74 p = start_program(port)75 # continue76 out = p.communicate(input=b"p 41 + mod.a\n")[0]77 out = out.decode("ascii")78 assert "42" in out79 s.kill()80 p.kill()81def test_step_over():82 port = find_free_port()83 # start the server84 s = start_server(port, "test_debug_server")85 # run the debugger86 p = start_program(port)87 # continue88 out = 
p.communicate(input=b"n\nn\nn\n")[0]89 out = out.decode("ascii")90 assert "Breakpoint 2 at test.py:1" in out91 assert "Breakpoint 8 at test.py:1" in out92 assert "Breakpoint 0 at test.py:2" in out93 s.kill()94 p.kill()95def test_set_value():96 port = find_free_port()97 # start the server98 s = start_server(port, "test_debug_server")99 p = start_program(port)100 # continue101 out = p.communicate(input=b"n\nset a=100\np a\nn\nn\np a + 1\n")[0]102 out = out.decode("ascii")103 assert "100" in out104 assert "101" in out105 s.kill()106 p.kill()107def test_data_breakpoint():108 port = find_free_port()109 s = start_server(port, "test_debug_server")110 p = start_program(port)111 # continue112 out = p.communicate(input=b"w c\nc\nc\ninfo watchpoint\n")[0]113 out = out.decode("ascii")114 assert "Watchpoint 0 at test.py:2" in out115 assert "Watchpoint 3 at test.py:5" in out116 assert "3\ttest.py:5\tc" in out117 s.kill()118 p.kill()119def test_src_mapping():120 port = find_free_port()121 # start the server122 s = start_server(port, "test_debug_server")123 # run the debugger124 p = start_program(port, map=":/tmp")125 # continue126 out = p.communicate(input=b"b /tmp/test.py:1\nc\n")[0]127 out = out.decode("ascii")128 assert "Breakpoint 2 at test.py:1" in out129 s.kill()130 p.kill()131if __name__ == "__main__":...
test_ws_server.py
Source:test_ws_server.py
# ... (the first lines of this snippet are not shown)
import asyncio
import time
import websockets


def test_echo(start_server, find_free_port):
    """Send one message to the server and expect it to be echoed back."""
    port = find_free_port()
    s = start_server(port, "test_ws_server", wait=0.05)

    async def send_msg():
        uri = "ws://localhost:{0}".format(port)
        payload = "hello world"
        async with websockets.connect(uri) as ws:
            await ws.send(payload)
            echo = await ws.recv()
            assert echo == payload

    try:
        asyncio.get_event_loop().run_until_complete(send_msg())
    finally:
        # kill the server even when the client side fails, and block until it
        # has exited instead of spinning on poll()
        s.terminate()
        s.wait()


def test_shutdown(start_server, find_free_port):
    """Send ``"stop"`` and expect the server process to exit within a second."""
    port = find_free_port()
    s = start_server(port, "test_ws_server", wait=0.05)

    async def send_msg():
        uri = "ws://localhost:{0}".format(port)
        payload = "stop"
        async with websockets.connect(uri) as ws:
            await ws.send(payload)

    asyncio.get_event_loop().run_until_complete(send_msg())
    # check if process exit
    deadline = time.monotonic() + 1
    while s.poll() is None and time.monotonic() < deadline:
        # short sleep so the wait does not burn a CPU core
        time.sleep(0.01)
    killed = s.poll() is not None
    if not killed:
        s.terminate()
    assert killed


def test_topic_pub(start_server, find_free_port):
    """Exchange messages over two connections to the same server.

    Only the beginning of this test is visible in the snippet.
    """
    port = find_free_port()
    s = start_server(port, "test_ws_server", wait=0.05)

    async def send_msg():
        uri = "ws://localhost:{0}".format(port)
        payload1 = "42"
        payload2 = "hello world"
        async with websockets.connect(uri) as ws1:
            async with websockets.connect(uri) as ws2:
                await ws1.send(payload1)
                echo1 = await ws1.recv()
                assert echo1 == payload1
                await ws2.send(payload2)
                await ws1.recv()
                echo2 = await ws1.recv()
                assert echo2 == payload2 * 2
                # ... (the remainder of this test is not shown in the snippet)
LambdaTest’s Playwright tutorial gives you a broader picture of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial offers A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!