Best Python code snippet using autotest_python
test_p2p_daemon.py
Source:test_p2p_daemon.py
...9from multiaddr import Multiaddr10from hivemind.p2p import P2P, P2PHandlerError11from hivemind.proto import dht_pb212from hivemind.utils.serializer import MSGPackSerializer13def is_process_running(pid: int) -> bool:14 return subprocess.run(["ps", "-p", str(pid)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 015async def replicate_if_needed(p2p: P2P, replicate: bool) -> P2P:16 return await P2P.replicate(p2p.daemon_listen_maddr) if replicate else p2p17@pytest.mark.asyncio18async def test_daemon_killed_on_del():19 p2p_daemon = await P2P.create()20 child_pid = p2p_daemon._child.pid21 assert is_process_running(child_pid)22 await p2p_daemon.shutdown()23 assert not is_process_running(child_pid)24@pytest.mark.parametrize(25 "host_maddrs",26 [27 [Multiaddr("/ip4/127.0.0.1/tcp/0")],28 [Multiaddr("/ip4/127.0.0.1/udp/0/quic")],29 [Multiaddr("/ip4/127.0.0.1/tcp/0"), Multiaddr("/ip4/127.0.0.1/udp/0/quic")],30 ],31)32@pytest.mark.asyncio33async def test_transports(host_maddrs: List[Multiaddr]):34 server = await P2P.create(quic=True, host_maddrs=host_maddrs)35 peers = await server.list_peers()36 assert len(peers) == 037 client = await P2P.create(quic=True, host_maddrs=host_maddrs, initial_peers=await server.get_visible_maddrs())38 await client.wait_for_at_least_n_peers(1)39 peers = await client.list_peers()40 assert len(peers) == 141 peers = await server.list_peers()42 assert len(peers) == 143@pytest.mark.asyncio44async def test_daemon_replica_does_not_affect_primary():45 p2p_daemon = await P2P.create()46 p2p_replica = await P2P.replicate(p2p_daemon.daemon_listen_maddr)47 child_pid = p2p_daemon._child.pid48 assert is_process_running(child_pid)49 await p2p_replica.shutdown()50 assert is_process_running(child_pid)51 await p2p_daemon.shutdown()52 assert not is_process_running(child_pid)53@pytest.mark.parametrize(54 "should_cancel,replicate",55 [56 (True, False),57 (True, True),58 (False, False),59 (False, True),60 ],61)62@pytest.mark.asyncio63async def 
test_call_protobuf_handler(should_cancel, replicate, handle_name="handle"):64 handler_cancelled = False65 server_primary = await P2P.create()66 server = await replicate_if_needed(server_primary, replicate)67 async def ping_handler(request, context):68 try:69 await asyncio.sleep(2)70 except asyncio.CancelledError:71 nonlocal handler_cancelled72 handler_cancelled = True73 return dht_pb2.PingResponse(peer=dht_pb2.NodeInfo(node_id=server.id.to_bytes()), available=True)74 server_pid = server_primary._child.pid75 await server.add_protobuf_handler(handle_name, ping_handler, dht_pb2.PingRequest)76 assert is_process_running(server_pid)77 client_primary = await P2P.create(initial_peers=await server.get_visible_maddrs())78 client = await replicate_if_needed(client_primary, replicate)79 client_pid = client_primary._child.pid80 assert is_process_running(client_pid)81 await client.wait_for_at_least_n_peers(1)82 ping_request = dht_pb2.PingRequest(peer=dht_pb2.NodeInfo(node_id=client.id.to_bytes()), validate=True)83 expected_response = dht_pb2.PingResponse(peer=dht_pb2.NodeInfo(node_id=server.id.to_bytes()), available=True)84 if should_cancel:85 call_task = asyncio.create_task(86 client.call_protobuf_handler(server.id, handle_name, ping_request, dht_pb2.PingResponse)87 )88 await asyncio.sleep(0.25)89 call_task.cancel()90 await asyncio.sleep(0.25)91 assert handler_cancelled92 else:93 actual_response = await client.call_protobuf_handler(94 server.id, handle_name, ping_request, dht_pb2.PingResponse95 )96 assert actual_response == expected_response97 assert not handler_cancelled98 await server.shutdown()99 await server_primary.shutdown()100 assert not is_process_running(server_pid)101 await client_primary.shutdown()102 assert not is_process_running(client_pid)103@pytest.mark.asyncio104async def test_call_protobuf_handler_error(handle_name="handle"):105 async def error_handler(request, context):106 raise ValueError("boom")107 server = await P2P.create()108 server_pid = 
server._child.pid109 await server.add_protobuf_handler(handle_name, error_handler, dht_pb2.PingRequest)110 assert is_process_running(server_pid)111 client = await P2P.create(initial_peers=await server.get_visible_maddrs())112 client_pid = client._child.pid113 assert is_process_running(client_pid)114 await client.wait_for_at_least_n_peers(1)115 ping_request = dht_pb2.PingRequest(peer=dht_pb2.NodeInfo(node_id=client.id.to_bytes()), validate=True)116 with pytest.raises(P2PHandlerError) as excinfo:117 await client.call_protobuf_handler(server.id, handle_name, ping_request, dht_pb2.PingResponse)118 assert "boom" in str(excinfo.value)119 await server.shutdown()120 await client.shutdown()121async def handle_square_stream(_, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:122 with closing(writer):123 while True:124 try:125 x = MSGPackSerializer.loads(await P2P.receive_raw_data(reader))126 except asyncio.IncompleteReadError:127 break128 result = x ** 2129 await P2P.send_raw_data(MSGPackSerializer.dumps(result), writer)130async def validate_square_stream(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:131 with closing(writer):132 for _ in range(10):133 x = np.random.randint(100)134 await P2P.send_raw_data(MSGPackSerializer.dumps(x), writer)135 result = MSGPackSerializer.loads(await P2P.receive_raw_data(reader))136 assert result == x ** 2137@pytest.mark.asyncio138async def test_call_peer_single_process():139 server = await P2P.create()140 server_pid = server._child.pid141 assert is_process_running(server_pid)142 handler_name = "square"143 await server.add_binary_stream_handler(handler_name, handle_square_stream)144 client = await P2P.create(initial_peers=await server.get_visible_maddrs())145 client_pid = client._child.pid146 assert is_process_running(client_pid)147 await client.wait_for_at_least_n_peers(1)148 _, reader, writer = await client.call_binary_stream_handler(server.id, handler_name)149 await validate_square_stream(reader, 
writer)150 await server.shutdown()151 assert not is_process_running(server_pid)152 await client.shutdown()153 assert not is_process_running(client_pid)154async def run_server(handler_name, server_side, response_received):155 server = await P2P.create()156 server_pid = server._child.pid157 assert is_process_running(server_pid)158 await server.add_binary_stream_handler(handler_name, handle_square_stream)159 server_side.send(server.id)160 server_side.send(await server.get_visible_maddrs())161 while response_received.value == 0:162 await asyncio.sleep(0.5)163 await server.shutdown()164 assert not is_process_running(server_pid)165def server_target(handler_name, server_side, response_received):166 asyncio.run(run_server(handler_name, server_side, response_received))167@pytest.mark.asyncio168async def test_call_peer_different_processes():169 handler_name = "square"170 server_side, client_side = mp.Pipe()171 response_received = mp.Value(np.ctypeslib.as_ctypes_type(np.int32))172 response_received.value = 0173 proc = mp.Process(target=server_target, args=(handler_name, server_side, response_received))174 proc.start()175 peer_id = client_side.recv()176 peer_maddrs = client_side.recv()177 client = await P2P.create(initial_peers=peer_maddrs)178 client_pid = client._child.pid179 assert is_process_running(client_pid)180 await client.wait_for_at_least_n_peers(1)181 _, reader, writer = await client.call_binary_stream_handler(peer_id, handler_name)182 await validate_square_stream(reader, writer)183 response_received.value = 1184 await client.shutdown()185 assert not is_process_running(client_pid)186 proc.join()187 assert proc.exitcode == 0188@pytest.mark.asyncio189async def test_error_closes_connection():190 async def handle_raising_error(_, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:191 with closing(writer):192 command = await P2P.receive_raw_data(reader)193 if command == b"raise_error":194 raise Exception("The handler has failed")195 else:196 await 
P2P.send_raw_data(b"okay", writer)197 server = await P2P.create()198 server_pid = server._child.pid199 assert is_process_running(server_pid)200 handler_name = "handler"201 await server.add_binary_stream_handler(handler_name, handle_raising_error)202 client = await P2P.create(initial_peers=await server.get_visible_maddrs())203 client_pid = client._child.pid204 assert is_process_running(client_pid)205 await client.wait_for_at_least_n_peers(1)206 _, reader, writer = await client.call_binary_stream_handler(server.id, handler_name)207 with closing(writer):208 await P2P.send_raw_data(b"raise_error", writer)209 with pytest.raises(asyncio.IncompleteReadError): # Means that the connection is closed210 await P2P.receive_raw_data(reader)211 # Despite the handler raised an exception, the server did not crash and ready for next requests212 assert is_process_running(server_pid)213 _, reader, writer = await client.call_binary_stream_handler(server.id, handler_name)214 with closing(writer):215 await P2P.send_raw_data(b"behave_normally", writer)216 assert await P2P.receive_raw_data(reader) == b"okay"217 await server.shutdown()218 assert not is_process_running(server_pid)219 await client.shutdown()220 assert not is_process_running(client_pid)221@pytest.mark.asyncio222async def test_handlers_on_different_replicas():223 async def handler(_, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, key: str) -> None:224 with closing(writer):225 await P2P.send_raw_data(key, writer)226 server_primary = await P2P.create()227 server_id = server_primary.id228 await server_primary.add_binary_stream_handler("handle_primary", partial(handler, key=b"primary"))229 server_replica1 = await replicate_if_needed(server_primary, True)230 await server_replica1.add_binary_stream_handler("handle1", partial(handler, key=b"replica1"))231 server_replica2 = await replicate_if_needed(server_primary, True)232 await server_replica2.add_binary_stream_handler("handle2", partial(handler, key=b"replica2"))233 
client = await P2P.create(initial_peers=await server_primary.get_visible_maddrs())234 await client.wait_for_at_least_n_peers(1)...
yeshup_tests.py
Source:yeshup_tests.py
...72 #p.stdin.close()73 p.communicate()74 code = p.wait()75 return code76def is_process_running(pid):77 try:78 os.kill(pid,0)79 return True80 except ProcessLookupError:81 return False82 83def test_simple_yeshup(trp):84 #print("run basic")85 #x = run_it([sys.argv[0], "launch", "basic"])86 #print(f"exit code: {x}")87 # start the main process in a thread, this will block88 # then can exit it asynchronously89 get_pid_queue = queue.Queue()90 91 def run_parent():92 # __file__ refers to this file, yeshup_test.py93 p = run_it_stdin([__file__, "launch", "basic"])94 #p.wait()95 #print("launched")96 # parse the two pids97 for line in p.stdout:98 #print(line)99 line = str(line, 'ascii')100 #print("here")101 #print(type(line))102 if line.startswith("parent pid:"):103 parent_pid = int(line[11:])104 break105 else:106 print(line)107 for line in p.stdout:108 line = str(line, 'ascii')109 #print("here")110 #print(type(line))111 if line.startswith("child pid:"):112 child_pid = int(line[10:])113 break114 else:115 print(line)116 #print((parent_pid, child_pid))117 get_pid_queue.put((parent_pid, child_pid))118 for line in p.stdout:119 print(line)120 p.wait()121 # need to get the stdout back from the run parent122 parent_runner_thread = threading.Thread(target=run_parent)123 parent_runner_thread.start()124 #print("queue get")125 (parent_pid,child_pid) = get_pid_queue.get()126 if trp is None:127 # display if they are running128 print(f"parent: {parent_pid} running {is_process_running(parent_pid)}")129 print(f"child: {child_pid} running {is_process_running(child_pid)}")130 else:131 trp.assert_true(f"parent running ", is_process_running(parent_pid))132 trp.assert_true(f"child running ", is_process_running(child_pid))133 # exit the process134 #c = close_stdin(p)135 #print("kill")136 os.kill(parent_pid, signal.SIGTERM)137 parent_runner_thread.join()138 139 # p.wait()140 # display again if the two processes are running141 # the child doesn't always exit fast enough without this142 # it's only 
needed for reliable automated testing143 time.sleep(0.01)144 if trp is None:145 print(f"parent: {parent_pid} running {is_process_running(parent_pid)}")146 print(f"child: {child_pid} running {is_process_running(child_pid)}")147 else:148 trp.assert_false(f"parent not running ", is_process_running(parent_pid))149 trp.assert_false(f"child not running ", is_process_running(child_pid))150 #return c151 152def launch_basic():153 #print("launch basic")154 print(f"parent pid: {os.getpid()}", flush=True)155 def child_process():156 # this is the line which makes sure the child exits157 # when the parent does. it has to be run in the child process158 # to make this more general, can call this then call exec*159 # it's reset on fork, but not on exec*160 yeshup.yeshup_me()161 print(f"child pid: {os.getpid()}", flush=True)162 #sys.stdout.flush()163 #for line in sys.stdin:...
oc.py
Source:oc.py
...24 @is_connected.setter25 def is_connected(self, value):26 self._is_connected = value27 @property28 def is_process_running(self):29 return self._is_process_running30 @is_process_running.setter31 def is_process_running(self, value):32 self._is_process_running = value33 @property34 def check_process_enabled(self):35 return self._check_process_enabled36 @check_process_enabled.setter37 def check_process_enabled(self, value):38 self._check_process_enabled = value39 @staticmethod40 def get_server_cert():41 result = subprocess.getoutput(f"echo {settings.current_profile['password']} | openconnect "42 f"--no-dtls --authgroup MGT {settings.current_profile['server']}"43 f" --passwd-on-stdin")44 cert = re.findall('--servercert (.+)\n', result)45 if not cert:...
menu.py
Source:menu.py
...55 ready_data = True56 elif key == '3':57 ns_loop(data, answers, zet_path)58 elif key == '4':59 if not app.is_process_running():60 app.start("OscGraph.exe")61 if not gen.is_process_running():62 gen.start("DAC_OCX.exe")63 elif key == '0':64 if app.is_process_running():65 app.kill()66 if gen.is_process_running():67 gen.kill()68 sys.exit()69if __name__ == "__main__":70 zet_path = "C:/Users/Public/Documents/ZETLab/artem/result" # PATH TO THE DATA71 try:72 rmtree(zet_path)73 except FileNotFoundError:74 pass75 print("Find path to oscgraph:", zet_path)76 app = Application(backend="uia")77 gen = Application(backend="uia")...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!