Best Python code snippet using autotest_python
tcp.py
Source:tcp.py
1# from socket import *2# tcp_socket=socket(AF_INET,SOCK_STREAM)3# addr=('192.168.1.6',8080)4# tcp_socket.connect(addr)5# data=input("请è¾å
¥ä½ è¦åéçå
容ï¼")6# tcp_socket.send(data.encode('gbk'))7# recv_from=tcp_socket.recv(1024)8# print(recv_from.decode('gbk'))9# tcp_socket.close()10# from socket import *11# #12# # å建socket13# tcp_client_socket = socket(AF_INET, SOCK_STREAM)14#15# # ç®çä¿¡æ¯16# server_addr = ('192.168.1.6', 8080)17# # é¾æ¥æå¡å¨18# tcp_client_socket.connect(server_addr)19#20# # æ示ç¨æ·è¾å
¥æ°æ®21# send_data = input("请è¾å
¥è¦åéçæ°æ®ï¼")22#23# tcp_client_socket.send(send_data.encode("gbk"))24#25# # æ¥æ¶å¯¹æ¹åéè¿æ¥çæ°æ®ï¼æ大æ¥æ¶1024个åè26# recvData = tcp_client_socket.recv(1024)27# print('æ¥æ¶å°çæ°æ®ä¸º:', recvData.decode('gbk'))28#29# # å
³éå¥æ¥å30# tcp_client_socket.close()31#32# from socket import *33#34# # å建socket35# tcp_server_socket = socket(AF_INET, SOCK_STREAM)36# # æ¬å°ä¿¡æ¯37# address = ('', 7788)38# # ç»å®39# tcp_server_socket.bind(address)40# # 使ç¨socketå建çå¥æ¥åé»è®¤çå±æ§æ¯ä¸»å¨çï¼ä½¿ç¨listenå°å
¶å为被å¨çï¼è¿æ ·å°±å¯ä»¥æ¥æ¶å«äººçé¾æ¥äº41# tcp_server_socket.listen(128)42#43# # å¦æææ°ç客æ·ç«¯æ¥é¾æ¥æå¡å¨ï¼é£ä¹å°±äº§çä¸ä¸ªæ°çå¥æ¥åä¸é¨ä¸ºè¿ä¸ªå®¢æ·ç«¯æå¡44# # client_socketç¨æ¥ä¸ºè¿ä¸ªå®¢æ·ç«¯æå¡45# # tcp_server_socketå°±å¯ä»¥çä¸æ¥ä¸é¨çå¾
å
¶ä»æ°å®¢æ·ç«¯çé¾æ¥46# client_socket, clientAddr = tcp_server_socket.accept()47# # æ¥æ¶å¯¹æ¹åéè¿æ¥çæ°æ®48# recv_data = client_socket.recv(1024) # æ¥æ¶1024个åè49# print('æ¥æ¶å°çæ°æ®ä¸º:', recv_data.decode('gbk'))50# # åéä¸äºæ°æ®å°å®¢æ·ç«¯51# client_socket.send("thank you !".encode('gbk'))52# # å
³é为è¿ä¸ªå®¢æ·ç«¯æå¡çå¥æ¥åï¼åªè¦å
³éäºï¼å°±æå³ç为ä¸è½å为è¿ä¸ªå®¢æ·ç«¯æå¡äºï¼å¦æè¿éè¦æå¡ï¼åªè½å次éæ°è¿æ¥53# client_socket.close()54# from socket import *55# tcp_socket=socket(AF_INET,SOCK_STREAM)56# addr=('192.168.1.6',8080)57# tcp_socket.connect(addr)58# data=input('请è¾å
¥ä½ è¦åéçå
容ï¼')59# tcp_socket.send(data.encode('gbk'))60# recv_from=tcp_socket.recv(1024)61# print(recv_from.decode('gbk'))62# tcp_socket.close()63# while True:64# from socket import *65# tcp = socket(AF_INET, SOCK_STREAM)66# addr = ('', 8080)67# tcp.bind(addr)68# tcp.listen(100)69# tcp2, tcp_ip = tcp.accept()70# data = input('请è¾å
¥ä½ è¦åæå¡ç«¯åéçä¿¡æ¯ï¼')71# tcp2.send(data.encode('gbk'))72# recv_from = tcp2.recv(2014)73# print('æå¡å¨ä¸ºä½ çæ¶æ¯ä¸ºï¼%s' % recv_from.decode('gbk'))74# tcp2.close()75# tcp.close()76# while True:77# from socket import *78# tcp = socket(AF_INET, SOCK_STREAM)79# addr = ('192.168.1.6', 8080)80# tcp.connect(addr)81# data = input('请è¾å
¥ä½ è¦åéçå
容ï¼')82# tcp.send(data.encode('gbk'))83# recv_from = tcp.recv(1024)84# print('æ¶å°çå
容为ï¼%s' % recv_from.decode('gbk'))85# tcp.close()86# from socket import *87# tcp_1=socket(AF_INET,SOCK_STREAM)88# addr=('',8080)89# tcp_1.bind(addr)90# tcp_1.listen(100)91# q,w=tcp_1.accept()92# recv=q.recv(1024)93# print(recv.decode('gbk'))94# q.send('thankyou'.encode('gbk'))95# print(w)96# q.close()97# from socket import *`98# tcp_socket=socket(AF_INET,SOCK_STREAM)99# addr=('192.168.1.1',8080)100# tcp_socket.connect(addr)101# data=input('请è¾å
¥ä½ è¦åéçå
容ï¼')102# tcp_socket.send(data.encode('gbk'))103# recv_from=tcp_socket.recv(1024)104# print(recv_from.decode('gbk'))105# tcp_socket.close()106# a=[1,2,3,4,5]107#108#109# def cha(a,i):110# first=0111# last=len(a)-1112# while first<=last:113# x=(last+first)//2114# if a[x]==i:115# return True116# elif i>a[x]:117# first=x+1118# else:119# last=x-1120# else:121# return False122# def cha(a,i):123# if len(a)==0:124# return False125# else:126# x=len(a)//2127# if a[x]==i:128# return True129# elif i>a[x]:130# return cha(a[x+1:],i)131# else:132# return cha(a[:x],i)133# print(cha(a,5))134# from socket import *135# tcp=socket(AF_INET,SOCK_STREAM)136# addr=('',8080)137# tcp.bind(addr)138# tcp.listen(100)139# kehu,kehu_ip=tcp.accept()140# recv_data=kehu.recv(1024)141# print('æ¶å°å®¢æ·ç«¯çä¿¡æ¯ä¸ºï¼%s'%recv_data.decode('gbk'))142# kehu.send('thankyou'.encode('gbk'))143# recv_data2=kehu.recv(1024)144# print(recv_data2.decode('gbk'))145# print(kehu_ip)146# kehu.close()147# tcp.close()148# from socket import *149# tcp_server_socket=socket(AF_INET,SOCK_STREAM)150# addr=('',7788)151# tcp_server_socket.bind(addr)152# tcp_server_socket.listen(100)153# tcp_client_socket,client_addr=tcp_server_socket.accept()154# data=input("请è¾å
¥ä½ è¦æ³æå¡ç«¯åéçä¿¡æ¯ï¼")155# tcp_client_socket.send(data.encode('gbk'))156# # data2=input('请è¾å
¥ä½ è¦åå¤å®¢æ·ç«¯çä¿¡æ¯ï¼')157# rec=tcp_client_socket.recv(2014)158# print(rec.decode('gbk'))159# tcp_client_socket.close()160# tcp_server_socket.close()161# from socket import *162# tcp_server_socket=socket(AF_INET,SOCK_STREAM)163# addr=('',8080)164# tcp_server_socket.bind(addr)165# tcp_server_socket.listen(100)166# tcp_client_socket,tcp_client_ip=tcp_server_socket.accept()167# recv_from=tcp_client_socket.recv(1024)168# print(recv_from.decode('gbk'))169# print(tcp_client_ip)170# tcp_client_socket.close()171# tcp_server_socket.close()172# from socket import *173# tcp_socket=socket(AF_INET,SOCK_STREAM)174# addr=('192.168.1.6',8080)175# tcp_socket.connect(addr)176# tcp_socket.send('123'.encode('gbk'))177# rev=tcp_socket.recv(1024)178# print(rev.decode('gbk'))179# tcp_socket.close()180import urllib.request as a181response=a.urlopen('https://www.baidu.com')182headers=response.getheaders()183for x,y in headers:184 print(x,y)185html=response.read()...
cankao2.py
Source:cankao2.py
1import socket,sys, select, time, heapq23def encode(node_name,node_graph):4 message = node_name + "\n"5 for i in node_graph:6 message += i + " " + str(node_graph[i]) + "\n"7 return message.encode("UTF-8")89def decode(message):10 msg_list = (message.decode("UTF-8")).split("\n")11 graph = {}12 node_name = msg_list[0]13 for lines in msg_list[1:]:14 data = lines.split(" ")15 if len(data) == 2:16 graph[data[0]] = float(data[1])17 return node_name, graph1819'''20the recv_from is a dictionary.21 key is name of the node whose neighbour graph is send as message22 value is the name of the node which send that message.23 { neighbour graph of A : receive from {B, C, D}}24'''25def update_graph():26 global recv_from27 global graph28 inf, outf, errf = select.select([sock, ], [], [], 0)29 global keep_alive30 if inf:31 for sockets in inf:32 message, ADDR = sockets.recvfrom(1024)33 node, recv_graph = decode(message)34 keep_alive[ADDR[1]] = time.time()35 graph[node] = recv_graph36 if node not in recv_from:37 recv_from[node] = [ADDR[1]]38 elif ADDR[1] not in recv_from[node]:39 recv_from[node].append(ADDR[1])404142def broadcast(node):43 for neighbour_node in node_port: #对äºææçç¸é»èç¹ï¼44 port = node_port[neighbour_node] # 对æ¤ç¸é»çèç¹åé4546 if node not in recv_from or port not in recv_from[node]:#å¦ææ¥åçå¾å¯¹åºçèç¹Bä¸å¨ recv_fromä¸ å¯¹åºçèç¹B çè®°å½é47 sock.sendto(encode(node, graph[node]), ("localhost", port))48def Dijkstra_Algrm(graph):49 result_set = {}50 set_N = []51 from_to_list = {}5253 self_graph = graph[node_name]54 failure_node = []55 failure_flag = True56 #find the router which is disconnected.57 for node in graph:58 for i in graph[node]:59 if graph[node][i] != float("inf"):60 failure_flag = False61 break62 if failure_flag and node not in failure_node:63 failure_node.append(node)64 failure_flag = True6566 for node in graph:67 if node_name != node and node not in failure_node:68 if node in self_graph:69 from_to_list[node] = node_name70 result_set[node] = graph[node_name][node]71 else:72 result_set[node] = float("inf")7374 sorted_result_set = sorted(result_set.items(), key=lambda d: d[1])7576 while len(set_N) < len(sorted_result_set):77 break_flag = False78 for node in sorted_result_set:79 if node[0] not in set_N:80 set_N.append(node[0])81 node_distance = node[1]82 for new_node in graph[node[0]]:83 if new_node == node_name or new_node in failure_node:84 continue85 if graph[node[0]][new_node] + node_distance < result_set[new_node]:86 from_to_list[new_node] = node[0]87 result_set[new_node] = graph[node[0]][new_node] + node_distance88 break_flag = True89 if break_flag:90 break91 sorted_result_set = sorted(result_set.items(), key=lambda d: d[1])92 return result_set, from_to_list, failure_node9394def find_shortest_path(result_set, from_to_list, failure_node):95 #result set A {'F': 4, 'C': 3, 'D': 1, 'E': 2, 'B': 2}96 #from_to_list {'F': 'E', 'C': 'E', 'D': 'A', 'E': 'D', 'B': 'A'}97 least_cost = {}98 for node in result_set:99 if node in failure_node:100 continue101 least_cost[node] = [node]102 next_node = from_to_list[node]103 while next_node != node_name:104 least_cost[node].append(next_node)105 next_node = from_to_list[next_node]106 least_cost[node].append(next_node)107 least_cost[node].reverse()108 for node in sorted(least_cost.keys()):109 path = ""110 for i in least_cost[node]:111 path += str(i)112 print("least-cost path to node %s: %s and the cost is %.1f"%(node, path, result_set[node]))113 return least_cost114115if __name__ == "__main__":116 argv = sys.argv[1:]117 node_name = argv[0]118 port_number = int(argv[1])119 config_file = open(argv[2])120121 address = ('localhost', port_number)122 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)123 sock.bind(address)124 graph = {}125 node_port = {}126 keep_alive ={}127128 # data: node_name port_number distance129 for lines in config_file:130 data = lines.split(" ")131 if not node_name in graph:132 graph[node_name] = {}133 if len(data) == 3:134 graph[node_name][data[0]] = float(data[1])135 node_port[data[0]] = int(data[2])136 keep_alive[int(data[2])] = time.time() + 3137 recv_from = {node_name:[port_number]}138 start_time = time.time()139 now_time = time.time() - 1 #make sure it will start immediately at beginning140 delete_list = []141 while 1:142 while(now_time <= start_time + 28):143 if node_name == 'E' and time.time() > start_time + 15:144 exit("terminal router E")145 if node_name == 'B' and time.time() > start_time + 27:146 exit("terminal router B")147 if time.time() >= now_time + 1:148 for node in graph:149 broadcast(node)150 now_time = time.time()151 update_graph()152 #dealing with node failures153 for port in node_port:154 try:155 if now_time - keep_alive[node_port[port]] > 3:156 graph[node_name][port] = float('inf')157 for i in graph[port]:158 graph[port][i] = float('inf')159 del keep_alive[node_port[port]]160 except KeyError:161 pass162 result_set, from_to_list, failure_node = Dijkstra_Algrm(graph)163 least_cost_path = find_shortest_path(result_set, from_to_list, failure_node)164 now_time = time.time()
...
BluetoothMgmt.py
Source:BluetoothMgmt.py
1import socket2import threading3import time4import multiprocessing56from env import *789class BluetoothMgmt(multiprocessing.Process):10 handle_q = multiprocessing.Manager().Queue()1112 def __init__(self, job_q, header):13 multiprocessing.Process.__init__(self)1415 self.header = header16 self.client = None17 self.job_q = job_q18 self.daemon = True1920 self.start()2122 def recv(self):23 while True:24 try:25 data = self.client.recv(ANDROID_BUFFER_SIZE)26 if not len(data):27 continue28 packet = data.decode('utf-8').rstrip()2930 print("[LOG][AND]", f"Received from Android: {packet}")31 self.job_q.put(f'{self.header}:{packet}\n')3233 except Exception as e:34 print("[ERR][AND]:",35 "Failed to receive data. Bluetooth Disconnected")36 break3738 time.sleep(0.00001)3940 self.close_connection()4142 def send(self, message):43 if(not self.client):44 print("[ERR][AND]:",45 f"Trying to send packet {message} but no clients connected!")46 else:47 try:48 self.client.send(message)49 time.sleep(0.6)5051 # will remove the try except, this one is needed because Nexus app and Bluetooth Terminal send in different format52 except:53 self.client.send((message).encode('utf-8'))54 time.sleep(0.6)5556 print("[LOG][AND]:",57 f'Sending "{message}" to Android successful')5859 def close_connection(self):60 self.client.close()61 self.client = None62 print("[LOG][AND]", "Android Bluetooth Connection closed")6364 def getPacketHeader(self):65 return self.header6667 def handleProcessor(self):68 while True:69 if(self.handle_q.qsize() != 0):70 packet, recv_from = self.handle_q.get()71 packet = packet.rstrip()72 self.send(f'{recv_from}:{packet}')73 self.handle_q.task_done()74 print("[LOG][AND]",75 f'Done Handling Packet from "{recv_from}" ,content: "{packet}"')76 time.sleep(0.000001)7778 def handle(self, packet, recv_from):79 self.handle_q.put((packet, recv_from))8081 def run(self):82 q_thread = threading.Thread(target=self.handleProcessor, args=())83 q_thread.start()8485 server_sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM,86 socket.BTPROTO_RFCOMM)87 server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)88 server_sock.bind((ANDROID_HOST_MAC, ANDROID_BLUETOOTH_PORT))89 server_sock.listen(ANDROID_BLUETOOTH_BACKLOG)9091 while True:92 print("[LOG][AND]", "Listening for connection")93 self.client, address = server_sock.accept()94 print("[LOG][AND]", "Connection from: "+str(address))9596 receive_thread = threading.Thread(target=self.recv, args=())97 receive_thread.start()98 receive_thread.join()99100 self.close_connection()101 server_sock.close()
...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!