Best Python code snippet using Airtest
DataBase.py
Source:DataBase.py
...30 :param kind: ç§ç±»31 :param time: æ¶é´32 :return: å¸å°åæ°æ®33 '''34 if(self.test_connect()):35 conn,cursor = self.connect()36 sql="INSERT INTO `word` (`uuid`, `word`, `kind`, `time`, `status`) VALUES (%s,%s,%s,%s,%s)"37 try:38 time=datetime.datetime.now()39 cursor.execute(sql,(uuid,word,kind,str(time),'1'))40 conn.commit()41 return True42 except:43 conn.rollback()44 return False45 cursor.close()46 conn.close()47 else:48 print('æ°æ®åºè¿æ¥å¤±è´¥')49 return False50 def test_connect(self):51 '''52 æµè¯æ°æ®åºè¿æ¥53 :return: æåè¿åTrue 失败è¿åFalse54 '''55 try:56 conn,cursor = self.connect()57 conn.close()58 cursor.close()59 return True60 except:61 return False62 def get_word(self):63 '''64 è·åè¯æç¨65 :return: è¿åè¯æææ¬ä¸uuid66 '''67 if(self.test_connect()):68 conn,cursor = self.connect()69 sql = "select `uuid`,`word` from `word` where `status` = '1' limit 1"70 try:71 cursor.execute(sql)72 result = cursor.fetchall()73 for row in result:74 uuid = row[0]75 text = row[1]76 if(self.set_status(uuid)):77 return str(uuid),str(text)78 else:79 print('æ æ³æ´æ°è¯æç¶æ')80 except:81 conn.rollback()82 print('sqlè¯å¥æ§è¡å¤±è´¥')83 conn.close()84 else:85 print('æ°æ®åºè¿æ¥å¤±è´¥')86 def set_status(self,uuid):87 '''88 讲读åçè¯æç¶æ设置为已读å89 :param uuid: è¯æid90 :return: bool91 '''92 if(self.test_connect()):93 conn,cursor = self.connect()94 sql = "update `word` set `status` = 1 where uuid = %s"95 try:96 cursor.execute(sql,(str(uuid)))97 conn.commit()98 return True99 except:100 conn.rollback()101 return False102 conn.close()103 else:104 print('æ°æ®åºè¿æ¥å¤±è´¥')105 return False106if __name__ == '__main__':107 #print(DataBase().insert_word('1','3æ9æ¥ææ对ææä½åºæ示','æ°é»'))...
sub0.py
Source:sub0.py
...37 if delay!=0:38 time.sleep(delay)39########4041def test_connect(cleansession, will, duration, wait1=5, wait2=2):4243 mystring = f"""44 %%%%%%%%%%%%%%%%%%%%%%%%45 test_connect46 1. Connect with: 47 a. cleansession: {cleansession}48 b. will: {will}49 c. duration: {duration}50 2. Wait for ConnAck51 3. Run loop_start()52 4. Wait {wait1} sec53 5. Disconnect()54 6. Wait {wait2} sec55 %%%%%%%%%%%%%%%%%%%%%%%%"""56 print(mystring)5758 parameters = {59 "duration": duration,60 "wait1": wait1,61 "wait2": wait2,6263 }64 print(parameters)65 client = Client("linh")#change so only needs name66 client.message_arrived_flag=False67 client.registerCallback(MyCallback())68 client.connected_flag=False69 client.set_will("sub0", "disconnect now", 0, False)7071 client.connect(host,port,duration, cleansession, will)72 client.lookfor(MQTTSN.CONNACK)73 try:74 if client.waitfor(MQTTSN.CONNACK)==None:75 print("connection failed")76 raise SystemExit("no Connection quitting")77 except:78 print("connection failed")79 raise SystemExit("no Connection quitting")8081 client.loop_start() #start loop82 time.sleep(wait1)8384 client.disconnect()8586 time.sleep(wait2)87 client.loop_stop() #stop loop88 assert client.connected_flag == False89 time.sleep(disconnect_time)90 9192#if __name__ == "__main__":93host="127.0.0.1"94port=6000095#m_port=1885 #port gateways advertises on96#m_group="225.0.18.83" #IP gateways advertises on9798conn_time = 399disconnect_time = 5100101test_connect(cleansession=True, will=False, duration=10)102test_connect(cleansession=True, will=True, duration=10)103test_connect(cleansession=False, will=True, duration=10)104test_connect(cleansession=False, will=False, duration=10)105106# Connection timeout because wait1=15 while duration=10
...
Multistream_opencv.py
Source:Multistream_opencv.py
'''
The purpose of this code is to show how to connect to a JAI multi channel camera and show the images from the different channels through opencv

'''

import pyjds
import numpy as np
import cv2 as cv

# multi channel
# Find camera
camera = pyjds.DeviceFinder().find()
print("number of cameras : ", len(camera))
print(camera[0].connection_id)

# Connect to camera
test_connect = pyjds.DeviceFactory.create(camera[0])
test_connect.connect()


# Look up the auto white balance feature and print its properties
feature_white = test_connect.get_feature("BalanceWhiteAuto")
print("white visibility = ", feature_white.visibility)
print("white = ", feature_white.description)
print("white = ", feature_white.entries)
print("white available= ", feature_white.is_available())
print("white = ", feature_white.value_as_str)

# Open stream
streams = test_connect.create_and_open_streams()
print("number of streams : ", len(streams))
stream_RGB_fps = streams[0].get_parameter("AcquisitionRate")
stream_NIR_fps = streams[1].get_parameter("AcquisitionRate")

# Start acquisition
test_connect.acquisition_start()

# Bound up front so the cleanup below can always `del` it, even if the very
# first buffer fetch raises.
RGB = None
try:
    while True:
        # Get image data from the two channels
        RGB = streams[0].get_buffer().get_image().get_data()
        cv.imshow("RGB", RGB)
        #NIR = streams[1].get_buffer().get_image().get_data()
        #cv.imshow("NIR", NIR)

        # Print frames for the two channels
        print("RGB fps : ", stream_RGB_fps.value)
        #print("NIR fps : ", stream_NIR_fps.value)

        # Wait for the user to press 'q' key to stop the recording.
        # Poll exactly once per frame: a second waitKey() call would consume
        # the key event before it could be compared.
        if cv.waitKey(1) == ord('q'):
            break
finally:
    # cleanup and stop acquisition, also when the loop exits with an error
    cv.destroyAllWindows()
    del RGB
    # NIR is only bound when the NIR lines above are enabled; deleting it
    # unconditionally raised NameError.
    #del NIR

    test_connect.acquisition_stop()
    test_connect.dis_connect()
...
Find_and_connect.py
Source:Find_and_connect.py
'''
The purpose of this sample is to show how to find a USB or GigE device, connect and change variables.

'''


import pyjds
import numpy as np
import cv2 as cv


# Auto find cameras
camera = pyjds.DeviceFinder().find()
print("number of cameras : ", len(camera))

# Create device
test_connect = pyjds.DeviceFactory.create(camera[0])

# Connect
test_connect.connect()
print("Is connected : ", test_connect.is_connected())

# Get Pixel value
feature_pixel = test_connect.get_feature("PixelFormat")
print(feature_pixel.value_as_str)

# Get and set width of image
feature_width = test_connect.get_feature("Width")
print("original width = ", feature_width.value)
feature_width.value = 800
print("new width = ", feature_width.value)

# 0=single mode, 1=multi, 2=continuous
feature_mode = test_connect.get_feature("AcquisitionMode")
feature_mode.value = 2
print("mode = ", feature_mode.value)

# create stream and start acquisition
streams = test_connect.create_and_open_streams()
print("number of stream: ", len(streams))
test_connect.acquisition_start()

# Bound up front so the cleanup below can always `del` it, even if the very
# first buffer fetch raises.
buffer = None
try:
    # Get parameter node
    stream_param = streams[0].get_parameter("Bandwidth")
    stream_fps = streams[0].get_parameter("AcquisitionRate")

    # Get buffer and parameter values while running.
    # 11 iterations, the same count as the original `loop <= 10` counter.
    for _ in range(11):
        buffer = streams[0].get_buffer()
        print("Block ID ", buffer.block_id)
        print("Bandwidth : ", stream_param.value)
        print("Fps : ", stream_fps.value)
finally:
    # Cleanup and stop acquisition, also when a buffer fetch fails
    del buffer
    test_connect.acquisition_stop()
    test_connect.dis_connect()
...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!