Best Python code snippet using yandex-tank
ZumoTest.py
Source:ZumoTest.py
...56 if(topic == "/team20/debug"):57 on_msg_debug(decoded)58 elif(topic == "/team20/state"):59 on_msg_state(decoded)60def send_config(msgType, msgValue):61 global ID62 topic = "/team20/config"63 data = {"ID": ID[topic],64 "Type": msgType,65 "Value": msgValue}66 package = json.dumps(data)67 client.publish(topic,package)68 ID[topic] += 169 print("Sent",package,"to",topic,"@",round(time.time() - starttime,2))70def resetConfig():71 send_config(3, 0) #set speed72 time.sleep(1)73 send_config(8, 0) #disable movement74 send_config(7, 0) #disable pixy75 send_config(6, 0) #disable sensor76 send_config(2, 0) #disable PID77 78 79def test_pause():80 global tests81 print("Running test: pause @",round(time.time() - starttime,2))82 check1 = False83 check2 = False84 send_config(8, 1) #enable movement85 send_config(7, 0) #disable pixy86 send_config(6, 0) #disable sensor87 send_config(2, 0) #disable PID88 send_config(3, 100)#set speed89 time.sleep(2)90 send_config(1, 2) #rover loading91 time.sleep(2)92 if statePaused == 1:93 check1 = True94 send_config(1, 1) #rover moving95 time.sleep(2)96 if statePaused == 0:97 check2 = True98 if check1 and check2:99 tests["pause"] = True100def test_PID_straight():101 print("Running test: PID straight @",round(time.time() - starttime,2))102 send_config(8, 1) #enable movement103 send_config(7, 0) #disable pixy104 send_config(6, 0) #disable sensor105 send_config(2, 1) #enable PID106 send_config(3, 50)#set speed107 time.sleep(1)108 PIDLeftVals.clear()109 PIDRightVals.clear()110 time.sleep(5)111 send_config(3, 100)#set speed112 time.sleep(5)113 send_config(3, 150)#set speed114 time.sleep(5)115 send_config(3, 200)#set speed116 time.sleep(5)117 send_config(3, 255)#set speed118 time.sleep(5)119 send_config(3, 0)#set speed120 plt.plot(PIDLeftVals)121 plt.plot(PIDRightVals)122 plt.ylabel('PID adjustment')123 plt.xlabel('trial')124 plt.show()125def test_PID_turning():126 print("Running test: PID turning @",round(time.time() - starttime,2))127 send_config(8, 
1) #enable movement128 send_config(7, 0) #disable pixy129 send_config(6, 0) #disable sensor130 send_config(2, 1) #enable PID131 send_config(5, 50)#turn left132 time.sleep(1)133 PIDLeftVals.clear()134 PIDRightVals.clear()135 time.sleep(5)136 send_config(5, 100)#set speed137 time.sleep(5)138 send_config(5, 150)#set speed139 time.sleep(5)140 send_config(5, 200)#set speed141 time.sleep(5)142 send_config(5, 255)#set speed143 time.sleep(5)144 send_config(5, 0)#set speed145 plt.plot(PIDLeftVals)146 plt.plot(PIDRightVals)147 plt.ylabel('PID adjustment')148 plt.xlabel('trial')149 plt.show()150def test_distance():151 global tests152 print("Running test: distance @",round(time.time() - starttime,2))153 check1 = False154 check2 = False155 check3 = False156 send_config(3, 0) #set speed157 send_config(7, 0) #disable pixy158 send_config(6, 0) #disable sensor159 send_config(2, 0) #disable PID160 send_config(8, 1) #disable movement161 input("distance: press enter to check 20 inches")162 sensorVals.clear()163 time.sleep(2)164 if 18 <= statistics.mean(sensorVals) <= 22:165 check1 = True166 print("Average for 20 inches was:",statistics.mean(sensorVals))167 input("distance: press enter to check 12 inches") 168 sensorVals.clear()169 time.sleep(2)170 if 10 <= statistics.mean(sensorVals) <= 14:171 check2 = True172 print("Average for 12 inches was:",statistics.mean(sensorVals))173 input("distance: press enter to check 6 inches")174 sensorVals.clear()175 time.sleep(2)176 if 4 <= statistics.mean(sensorVals) <= 8:177 check3 = True178 print("Average for 6 inches was:",statistics.mean(sensorVals)) 179 if check1 and check2 and check3:180 tests["distance"] = True181def test_capture():182 global tests183 print("Running test: capture @",round(time.time() - starttime,2))184 check1 = False185 check2 = False186 check3 = False187 send_config(8, 1) #enable movement188 send_config(7, 0) #disable pixy189 send_config(6, 0) #disable sensor190 send_config(2, 0) #disable PID191 send_config(3, 100) #set 
speed192 time.sleep(1)193 capLeftVals.clear()194 capRightVals.clear()195 time.sleep(5.5)196 print(capLeftVals)197 print(capRightVals)198 if len(capLeftVals) >= 12 and len(capRightVals) >= 12:199 check1 = True200 if 4400 < statistics.mean(capLeftVals) < 7000:201 check2 = True202 if 4400 < statistics.mean(capRightVals) < 7000:203 check3 = True204 if check1 and check2 and check3:205 tests["capture"] = True206def test_pantilt():207 print("Running test: pan/tilt @",round(time.time() - starttime,2))208 send_config(8, 1) #enable movement209 send_config(7, 0) #disable pixy210 send_config(6, 0) #disable sensor211 send_config(2, 0) #disable PID212 time.sleep(10)213 if stateTracking == 1: #found it214 tests["pantilt"] = True215def test_turning():216 print("Running test: turning @",round(time.time() - starttime,2))217 send_config(8, 1) #enable movement218 send_config(7, 1) #enable pixy219 send_config(6, 0) #disable sensor220 send_config(2, 0) #disable PID221 time.sleep(20)222def test_movement():223 global tests224 print("Running test: movement @",round(time.time() - starttime,2))225 send_config(8, 1) #enable movement226 send_config(7, 0) #disable pixy227 send_config(6, 1) #enable sensor228 send_config(2, 0) #disable PID229 send_config(3, 255) #set speed230 time.sleep(20)231 sensorVals.clear()232 time.sleep(1)233 if 6 <= statistics.mean(sensorVals) <= 12:234 tests["movement"] = True235def test_sync():236 global tests237 print("Running test: track @",round(time.time() - starttime,2))238 send_config(8, 0) #disable movement239 send_config(7, 0) #disable pixy240 send_config(6, 0) #disable sensor241 send_config(2, 0) #disable PID242 while(stateTracking != 0):243 pass #wait for pixy to lose target244 print("sync: target lost")245 while(stateTracking != 1):246 pass #wait for pixy to find target247 if stateTracking == 1: #found it248 tests["sync"] = True249def test_approach():250 global tests251 print("Running test: approach @",round(time.time() - starttime,2))252 send_config(8, 1) 
#enable movement253 send_config(7, 1) #enable pixy254 send_config(6, 1) #enable sensor255 send_config(2, 1) #enable PID256 time.sleep(15)257 sensorVals.clear()258 time.sleep(2)259 tests["approach"] = True260def run_tests():261 print("Thread started: run_tests")262 waiting = 0263 while(not connected):264 time.sleep(1)265 print("Waiting for connection:", waiting)266 waiting+=1267 run_tests = ["pantilt", "distance", "movement", "sync", "turning", "pause", "capture", "PID_straight", "PID_turning", "approach"]268 for func in run_tests:269 input("Press Enter to continue to test: " + func)...
sender.py
Source:sender.py
import threading
import pickle
import socket
from Utils import PDU, Config, CRC
import os
import signal
import random

send_config = Config()  # read the configuration
event = threading.Event()  # event flag used to wake the sending thread
lock = threading.RLock()  # re-entrant lock guarding send_config
send_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # client-side socket
send_socket.bind(send_config.send_addr)
send_file = open(send_config.send_file, 'rb')  # file to send, opened in binary mode (tested with a PDF)
send_log = open(send_config.send_log, 'w')  # create the log file


def should_error():
    """Decide whether to simulate a transmission error.

    Draws a random integer in 1..error_rate and returns True only when it
    equals error_rate.
    """
    return random.randint(1, send_config.error_rate) == send_config.error_rate


def fill_pdu_q():
    """Keep the PDU queue filled (runs forever as a thread target)."""
    while True:
        # Maintain the sliding window: build new PDUs only while the gap
        # between the next PDU to send and the last ack is within the window.
        while send_config.pdu_to_send - send_config.acked_num <= send_config.sw_size:
            # after the whole file has been sent, one extra empty PDU is sent
            if send_config.pdu_to_send > send_config.pdu_sum + 1:
                break
            with lock:  # released even if building the PDU raises
                # point the file at the chunk that belongs to this PDU
                send_file.seek((send_config.pdu_to_send - 1) * send_config.data_size)
                data = send_file.read(send_config.data_size)  # read one chunk
                checksum = CRC().calculate(data)  # compute the checksum
                if should_error():  # simulate a corrupted frame
                    checksum += 1
                pdu = PDU(num_to_send=send_config.num_to_send,
                          pdu_to_send=send_config.pdu_to_send,
                          status=('NEW' if send_config.pdu_to_send > send_config.pdu_to_resend else 'RT'),
                          acked_num=send_config.acked_num,
                          data=data,
                          checksum='%d' % checksum)
                send_config.pdu_q.put(pdu)  # enqueue
                send_config.num_to_send += 1  # transmissions so far + 1
                send_config.pdu_to_send += 1  # next PDU number + 1
        event.set()  # wake the send_pdu thread


def send_pdu():
    """Send every queued PDU to the receiver and log it (thread target)."""
    print('%d pdus to be send' % send_config.pdu_sum)
    event.wait()  # block until fill_pdu_q has produced something
    while True:
        with lock:
            while not send_config.pdu_q.empty():  # drain the queue
                pdu = send_config.pdu_q.get()
                # send the frame to the receiving side
                send_socket.sendto(pickle.dumps(pdu.get_pdu()), send_config.recv_addr)
                log = pdu.get_log()  # log line for this PDU
                send_log.write(log + '\n')
                print(log)
        event.clear()  # reset the flag ...
        event.wait()  # ... and block until fill_pdu_q sets it again


def receive_ack():
    """Receive acks; trigger a resend on timeout; terminate when all are acked."""
    while True:
        send_socket.settimeout(send_config.timeout)  # bound the blocking receive
        try:
            ack = send_socket.recvfrom(1024)[0]
        except socket.timeout:  # timed out: go back and resend
            threading.Thread(name='resend', target=resend).start()
            continue
        # SECURITY NOTE: pickle.loads on data received from the network can
        # execute arbitrary code; only acceptable with a trusted peer.
        ack = pickle.loads(ack)
        with lock:
            send_config.acked_num = ack  # record the latest ack
            print('receive ack: ', ack)
            if send_config.acked_num == send_config.pdu_sum + 1:
                # every ack has arrived: close up and end the process
                print('send complete')
                send_log.write('send complete\n')
                send_file.close()
                send_log.close()
                os.kill(os.getpid(), signal.SIGTERM)


def resend():
    """Rewind the send position to just after the last acknowledged PDU."""
    with lock:
        send_config.pdu_to_resend = send_config.pdu_to_send - 1  # last PDU that must be resent
        send_config.pdu_to_send = send_config.acked_num + 1  # restart after the last ack
        print('resend pdu from %d to %d' % (send_config.pdu_to_send, send_config.pdu_to_resend))


if __name__ == '__main__':
    # NOTE: these assignments rebind the function names to Thread objects;
    # the names are kept because the rest of the script is not visible here.
    fill_pdu_q = threading.Thread(name='fill_pud_q', target=fill_pdu_q)
    send_pdu = threading.Thread(name='send_pdu', target=send_pdu)
    receiver_ack = threading.Thread(name='receive_ack', target=receive_ack)
    fill_pdu_q.start()
    send_pdu.start()
    # ... (the remainder of the script is truncated in this snippet)
send_email.py
Source:send_email.py
import smtplib
from email import encoders
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase


def send(send_config):
    """Build an HTML e-mail, optionally with one attachment, from *send_config*.

    *send_config* is a mapping with the keys "from_address", "password",
    "from_name", "subject", "dest_address", "html_content",
    "opened_attachment" (path of the file to attach; falsy for none) and
    "attachment_name" (file name shown to the recipient).

    Raises KeyError if any of those keys is missing.
    """
    # extract from send_config
    from_address = send_config["from_address"]
    password = send_config["password"]
    from_name = send_config["from_name"]
    subject = send_config["subject"]
    dest_address = send_config["dest_address"]
    html_content = send_config["html_content"]
    opened_attachment = send_config["opened_attachment"]
    attachment_name = send_config["attachment_name"]

    # connect to server
    server = smtplib.SMTP("smtp.gmail.com", 587)
    server.starttls()
    server.login(from_address, password)

    # message
    msg = MIMEMultipart()
    msg['Subject'] = subject
    msg['From'] = from_name
    msg['To'] = dest_address
    # plain text can also be sent with "text" instead of "html"
    msg.attach(MIMEText(html_content, 'html'))

    # email attachment
    # The original looked up the misspelled key "opened_attachement" here,
    # which raises KeyError; use the value already extracted above.
    if opened_attachment:
        p = MIMEBase("application", "octet-stream")  # payload object
        with open(opened_attachment, "rb") as attachment:  # closed on exit
            p.set_payload(attachment.read())
        encoders.encode_base64(p)
        # passing filename as a parameter lets the library quote it correctly
        p.add_header("Content-Disposition", "attachment", filename=attachment_name)
        msg.attach(p)

    # send mail
    text = msg.as_string()
    # ... (the remainder of the function is truncated in this snippet)
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!