Best Python code snippet using Airtest
udp_interface.py
Source:udp_interface.py
...68 self.__sockI.settimeout(None)69 except socket.timeout:70 return []71 res = [data]72 nb = self.recv_nonblocking()73 res = res + nb74 return res75 def recv_nonblocking(self):76 """77 This returns a list of bytestrings (all messages in the buffer)78 Returns an empty list if no message is present in the buffer79 Reads everything in the buffer80 """81 assert self.__sockI is not None, "receiver not initialized"82 res = []83 while True:84 try:85 if not WINDOWS:86 data, _ = self.__sockI.recvfrom(self.__buffersize, socket.MSG_DONTWAIT)87 else:88 self.__sockI.setblocking(False)89 data, _ = self.__sockI.recvfrom(self.__buffersize)90 except IOError:91 return res92 finally:93 if WINDOWS:94 self.__sockI.setblocking(True)95 res += [data]96def test_send(interface, msg):97 print(f"sending: {msg}")98 interface.send(msg)99def test_recv(interface, timeout=None):100 print(f"receiving (blocking)...")101 res = interface.recv(timeout)102 print(f"received: {res}")103def test_recv_nonblocking(interface):104 print(f"receiving (non blocking)...")105 res = interface.recv_nonblocking()106 print(f"received: {res}")107def main(args):108 # standard library imports109 import time110 ip_send = args.ipsend111 ip_recv = args.iprecv112 port_send = args.portsend113 port_recv = args.portrecv114 conn = UDPInterface()115 conn.init_sender(ip_send, port_send)116 conn.init_receiver(ip_recv, port_recv)117 test_send(conn, b"hello 0")118 test_send(conn, b"hello 1")119 test_recv(conn)120 test_send(conn, b"hello 2")121 test_send(conn, b"hello 3")122 test_recv_nonblocking(conn)123 test_recv_nonblocking(conn)124 test_send(conn, b"hello 4")125 test_recv(conn, timeout=1.0)126 test_recv(conn, timeout=1.0)127if __name__ == "__main__":128 # standard library imports129 import argparse130 parser = argparse.ArgumentParser()131 parser.add_argument('--ipsend', type=str, default="127.0.0.1", help='IP address of the drone if any.')132 parser.add_argument('--iprecv', type=str, default="127.0.0.1", help='local IP address if 
any.')133 parser.add_argument('--portsend', type=int, default=8989, help='Port to send udp messages to.')134 parser.add_argument('--portrecv', type=int, default=8989, help='Port to reveive udp messages from.')135 args = parser.parse_args()...
safesocket.py
Source:safesocket.py
...37 ret = None38 finally:39 self.sock.settimeout(None)40 return ret41 def recv_nonblocking(self, size):42 self.sock.settimeout(0)43 try:44 ret = self.recv(size)45 except(socket.error) as e:46 #10035 no data when nonblocking47 if e.args[0] == 10035: #errno.EWOULDBLOCK: å°¼çerrnoä¼¼ä¹ä¸ä¸è´48 ret = None49 #10053 connection abort by client50 #10054 connection reset by peer51 elif e.args[0] in [10053, 10054]: #errno.ECONNABORTED:52 raise53 else:54 raise55 return ret...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!