Best Python code snippet using Airtest
test_run.py
Source: test_run.py
1import asyncio2from datetime import datetime3from time import time4import pytest5from further_link.util.message import create_message, parse_message6from .helpers import receive_data, wait_for_data7@pytest.mark.asyncio8async def test_bad_message(run_ws_client):9 start_cmd = create_message("start", {"runner": "python3", "code": ""})10 await run_ws_client.send_str(start_cmd)11 await wait_for_data(run_ws_client, "error", "message", "Bad message")12@pytest.mark.asyncio13async def test_run_code_script(run_ws_client):14 code = """\15from datetime import datetime16print(datetime.now().strftime("%A"))17"""18 start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")19 await run_ws_client.send_str(start_cmd)20 await receive_data(run_ws_client, "started", process="1")21 day = datetime.now().strftime("%A")22 await wait_for_data(run_ws_client, "stdout", "output", day + "\n", 0, "1")23 await wait_for_data(run_ws_client, "stopped", "exitCode", 0, 0, "1")24@pytest.mark.asyncio25async def test_run_shell(run_ws_client):26 code = """\27date +%s # unix time in seconds28"""29 start_cmd = create_message("start", {"runner": "shell"}, "1")30 await run_ws_client.send_str(start_cmd)31 await receive_data(run_ws_client, "started", process="1")32 commands = create_message("stdin", {"input": code}, "1")33 await run_ws_client.send_str(commands)34 seconds = str(int(time()))35 await wait_for_data(run_ws_client, "stdout", "output", seconds + "\n", 0, "1")36 stop_cmd = create_message("stop", None, "1")37 await run_ws_client.send_str(stop_cmd)38 await wait_for_data(run_ws_client, "stopped", "exitCode", -15, 0, "1")39@pytest.mark.asyncio40async def test_run_executable(run_ws_client):41 code = """\42#!/bin/bash43date +%s # unix time in seconds44"""45 start_cmd = create_message("start", {"runner": "exec", "code": code}, "1")46 await run_ws_client.send_str(start_cmd)47 await receive_data(run_ws_client, "started", process="1")48 seconds = str(int(time()))49 await wait_for_data(run_ws_client, "stdout", "output", seconds + "\n", 0, "1")50 await wait_for_data(run_ws_client, "stopped", "exitCode", 0, 0, "1")51@pytest.mark.asyncio52async def test_run_two_scripts(run_ws_client):53 code1 = """\54from time import sleep55sleep(1)56print(1)57"""58 code2 = """\59print(2)60"""61 start_cmd = create_message("start", {"runner": "python3", "code": code1}, "1")62 await run_ws_client.send_str(start_cmd)63 await receive_data(run_ws_client, "started", process="1")64 start_cmd = create_message("start", {"runner": "python3", "code": code2}, "2")65 await run_ws_client.send_str(start_cmd)66 await receive_data(run_ws_client, "started", process="2")67 await wait_for_data(run_ws_client, "stdout", "output", "2\n", 0, "2")68 await wait_for_data(run_ws_client, "stopped", "exitCode", 0, 0, "2")69 await wait_for_data(run_ws_client, "stdout", "output", "1\n", 0, "1")70 await wait_for_data(run_ws_client, "stopped", "exitCode", 0, 0, "1")71@pytest.mark.asyncio72async def test_stop_early(run_ws_client):73 code = "while True: pass"74 start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")75 await run_ws_client.send_str(start_cmd)76 await receive_data(run_ws_client, "started", process="1")77 stop_cmd = create_message("stop", None, "1")78 await run_ws_client.send_str(stop_cmd)79 await wait_for_data(run_ws_client, "stopped", "exitCode", -15, 0, "1")80@pytest.mark.asyncio81async def test_bad_code(run_ws_client):82 code = "i'm not valid python"83 start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")84 await run_ws_client.send_str(start_cmd)85 await receive_data(run_ws_client, "started", process="1")86 await asyncio.sleep(0.1) # wait for data87 message = await run_ws_client.receive()88 m_type, m_data, m_process = parse_message(message.data)89 assert m_type == "stderr"90 lines = m_data["output"].split("\n")91 assert lines[0].startswith(" File")92 assert lines[1] == " i'm not valid python"93 assert lines[2][-1] == "^"94 assert lines[3] == "SyntaxError: EOL while scanning string literal"95 await wait_for_data(run_ws_client, "stopped", "exitCode", 1, 0, "1")96@pytest.mark.asyncio97async def test_input(run_ws_client):98 code = """s = input()99while "BYE" != s:100 print(["HUH?! SPEAK UP, SONNY!","NO, NOT SINCE 1930"][s.isupper()])101 s = input()"""102 start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")103 await run_ws_client.send_str(start_cmd)104 await receive_data(run_ws_client, "started", process="1")105 user_input = create_message("stdin", {"input": "hello\n"}, "1")106 await run_ws_client.send_str(user_input)107 await wait_for_data(108 run_ws_client, "stdout", "output", "HUH?! SPEAK UP, SONNY!\n", 0, "1"109 )110 user_input = create_message("stdin", {"input": "HEY GRANDMA\n"}, "1")111 await run_ws_client.send_str(user_input)112 await wait_for_data(113 run_ws_client, "stdout", "output", "NO, NOT SINCE 1930\n", 0, "1"114 )115 user_input = create_message("stdin", {"input": "BYE\n"}, "1")116 await run_ws_client.send_str(user_input)117 await wait_for_data(run_ws_client, "stopped", "exitCode", 0, 0, "1")118@pytest.mark.asyncio119async def test_two_clients(run_ws_client, run_ws_client2):120 code = "while True: pass"121 start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")122 await run_ws_client.send_str(start_cmd)123 await receive_data(run_ws_client, "started", process="1")124 await run_ws_client2.send_str(start_cmd)125 await receive_data(run_ws_client2, "started", process="1")126 stop_cmd = create_message("stop", None, "1")127 await run_ws_client.send_str(stop_cmd)128 await wait_for_data(run_ws_client, "stopped", "exitCode", -15, 100, "1")129 stop_cmd = create_message("stop", None, "1")130 await run_ws_client2.send_str(stop_cmd)131 await wait_for_data(run_ws_client2, "stopped", "exitCode", -15, 100, "1")132@pytest.mark.asyncio133async def test_out_of_order_commands(run_ws_client):134 # send input135 user_input = create_message("stdin", {"input": "hello\n"}, "1")136 await run_ws_client.send_str(user_input)137 # bad message138 await receive_data(run_ws_client, "error", "message", "Bad message")139 # send stop140 stop_cmd = create_message("stop", None, "1")141 await run_ws_client.send_str(stop_cmd)142 # bad message143 await receive_data(run_ws_client, "error", "message", "Bad message")144 # send start145 code = "while True: pass"146 start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")147 await run_ws_client.send_str(start_cmd)148 # started149 await receive_data(run_ws_client, "started", process="1")150 # send start151 start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")152 await run_ws_client.send_str(start_cmd)153 # bad message154 await receive_data(run_ws_client, "error", "message", "Bad message")155 # send stop156 stop_cmd = create_message("stop", None, "1")157 await run_ws_client.send_str(stop_cmd)158 # stopped159 await wait_for_data(run_ws_client, "stopped", "exitCode", -15, 0, "1")160 # send stop161 stop_cmd = create_message("stop", None, "1")162 await run_ws_client.send_str(stop_cmd)163 # bad message164 await receive_data(run_ws_client, "error", "message", "Bad message")165@pytest.mark.asyncio166async def test_discard_old_input(run_ws_client):167 code = 'print("hello world")'168 start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")169 await run_ws_client.send_str(start_cmd)170 await receive_data(run_ws_client, "started", process="1")171 unterminated_input = create_message("stdin", {"input": "unterminated input"}, "1")172 await run_ws_client.send_str(unterminated_input)173 await wait_for_data(run_ws_client, "stdout", "output", "hello world\n", 100, "1")174 await wait_for_data(run_ws_client, "stopped", "exitCode", 0, 0, "1")175 code = "print(input())"176 start_cmd = create_message("start", {"runner": "python3", "code": code}, "1")177 await run_ws_client.send_str(start_cmd)178 await receive_data(run_ws_client, "started", process="1")179 user_input = create_message("stdin", {"input": "hello\n"}, "1")180 await run_ws_client.send_str(user_input)181 await wait_for_data(run_ws_client, "stdout", "output", "hello\n", 0, "1")...
ConmmunicationInterface.py
Source: ConmmunicationInterface.py
1from serial import Serial2import _thread3import serial4import time5class CommunicationInterface:6 def __init__(self,com):7 self.ser = None8 self.com_name = ''9 self.id_mapping = {10 # æ ¼å¼ä¸º[Screen id, component id]11 "minute":[0x00,0x01],12 'second':[0x00,0x02],13 'Tw':[0x00,0x07],14 'Tc':[0x00,0x15],15 "voltage_max":[0x00,0x26],16 'width1_us_0':[0x01,0x4],17 'width1_us_1':[0x01,0x7],18 'width1_us_2':[0x01,0x3B],19 'width2_us_0':[0x01,0x05],20 'width2_us_1':[0x01,0x08],21 'width2_us_2':[0x01,0x0B],22 'phase_us_0':[0x01,0x06],23 'phase_us_1':[0x01,0x09],24 'phase_us_2':[0x01,0x0C],25 'D1':[0x01,0x16],26 'D2':[0x01,0x17],27 'phase2':[0x01,0x18],28 'M':[0x01,0x1E],29 'index':[0x01,0x0D],30 'pulse_source_index':[0x00,0x12],31 'frequency_0':[0x01,0x1A],32 'frequency_1':[0x01,0x1B],33 'frequency_2':[0x01,0x1C],34 'internal':[0x00,0x7A],35 'external':[0x00,0x7B],36 'preset0':[0x01,0x7C],37 'preset1':[0x01,0x7D],38 'preset2':[0x01,0x7E]39 }40 def set_com(self,com_name):41 self.com_name = com_name42 print(f"[INFO] Change to {com_name}")43 def start_com(self):44 try:45 self.ser.close()46 except:47 pass48 if(self.ser is None):49 self.ser=Serial(self.com_name,115200,timeout=0.5)50 else:51 self.ser.open()52 # self.ser.open()53 print("[INFO] serial connected")54 def com_valid(self):55 return self.ser is not None56 def stop_com(self):57 start_cmd = bytes()58 start_cmd += bytes([0xEE,0xB1,0x11])59 start_cmd += bytes([0x00,0x00])60 start_cmd += bytes([0x00,0x69])61 start_cmd += bytes([0x00,0xFF,0xFC,0xFF,0xFF])62 start_cmd += '\n'.encode()63 self.ser.write(start_cmd)64 time.sleep(1)65 self.ser.close()66 print("[INFO] serial disconnected")67 68 def format_data(self,name,value):69 # 转æ¢æ°æ®çæ ¼å¼ï¼çææ°æ®å¸§70 format_result = bytes()71 format_result += bytes([0xEE,0xB1,0x11])72 try:73 format_result += bytes([0x00,self.id_mapping[name][0]])74 format_result += bytes([0x00,self.id_mapping[name][1],0x11])75 format_result += str(value).encode()76 except KeyError:77 print('[ERROR] Variable does not exist, please check it!')78 format_result += bytes([0x00,0xFF,0xFC,0xFF,0xFF])79 format_result += '\n'.encode()80 return format_result81 def start_pwm(self):82 start_cmd = bytes()83 start_cmd += bytes([0xEE,0xB1,0x11])84 start_cmd += bytes([0x00,0x00])85 start_cmd += bytes([0x00,0x08])86 start_cmd += bytes([0x00,0xFF,0xFC,0xFF,0xFF])87 start_cmd += '\n'.encode()88 try:89 self.ser.write(start_cmd)90 print("[INFO] PWM has started")91 except serial.serialutil.PortNotOpenError:92 print("[ERROR] Port has been closed")93 raise AttributeError94 def stop_pwm(self):95 start_cmd = bytes()96 start_cmd += bytes([0xEE,0xB1,0x11])97 start_cmd += bytes([0x00,0x00])98 start_cmd += bytes([0x00,0x09])99 start_cmd += bytes([0x00,0xFF,0xFC,0xFF,0xFF])100 start_cmd += '\n'.encode()101 try:102 self.ser.write(start_cmd)103 print("[INFO] PWM has stopped")104 except serial.serialutil.PortNotOpenError:105 print("[ERROR] Port has been closed")106 raise AttributeError107 def send_data(self,name,value):108 # å°æ°æ®æ ¼å¼è½¬å109 try:110 formatted_data = self.format_data(name, value)111 # # 转æ¢ä¸ºå¯ä»¥äººè½ççåå
è¿å¶çæ°æ®112 # str_hex = ":".join("{:02x}".format(ord(c)) for c in formatted_data)113 # æå°114 # print(f"Send data {str_hex}")115 # è¾åºå°ä¸²å£116 self.ser.write(formatted_data)117 print("[INFO] %s has been changed to %s"%(name.capitalize(),value))118 except serial.serialutil.PortNotOpenError:119 print("[ERROR] Port has been closed")120 # raise AttributeError121 def send_number(self,name,value):122 format_result = bytes()123 format_result += bytes([0xEE,0xB1,0x11])124 try:125 format_result += bytes([0x00,self.id_mapping[name][0]])126 format_result += bytes([0x00,self.id_mapping[name][1],0x11])127 format_result += bytes([value])128 except KeyError:129 print('[ERROR] Variable does not exist, please check it!')130 format_result += bytes([0x00,0xFF,0xFC,0xFF,0xFF])131 format_result += '\n'.encode()132 print(format_result)133 self.ser.write(format_result)134 # change trigger mode index135 def change_pulse_index(self,trigger_mode):136 """ä¿®æ¹PWMç触å模å¼137 Args:138 trigger_mode (str): internalæè
external139 """140 if(trigger_mode == 'internal'):141 self.send_number("pulse_source_index",0x00)142 print("[INFO] Index change to internal")143 else:144 self.send_number("pulse_source_index",0x01)145 print("[INFO] Index change to external")146 # change preset index147 def change_index(self,index):148 self.send_number("index",0x00+index)149 print(f"[INFO] Pulse Index: {index}")150 @DeprecationWarning151 def change_port(self,value):152 com_name = value.split(' ')[0]153 print(com_name)154 self.set_com(com_name)155 # _thread.start_new_thread(self.get_data_from_serial, (com,))...
test_pihole_scripts.py
Source: test_pihole_scripts.py
1import pytest2@pytest.fixture3def start_cmd():4 ''' broken by default, required override '''5 return None6@pytest.fixture7def RunningPiHole(DockerPersist, Slow, persist_webserver, persist_tag, start_cmd):8 ''' Override the RunningPiHole to run and check for success of a9 dnsmasq start based `pihole` script command '''10 #print DockerPersist.run('ps -ef').stdout11 Slow(lambda: DockerPersist.run('pgrep dnsmasq').rc == 0)12 Slow(lambda: DockerPersist.run('pgrep {}'.format(persist_webserver)).rc == 0)13 oldpid = DockerPersist.run('pidof dnsmasq')14 for service in [ 'dnsmasq', 'nginx', ]:15 print DockerPersist.run('service {} status'.format(service))16 cmd = DockerPersist.run('pihole {}'.format(start_cmd))17 Slow(lambda: DockerPersist.run('pgrep dnsmasq').rc == 0)18 newpid = DockerPersist.run('pidof dnsmasq')19 for pid in [oldpid, newpid]:20 assert pid != ''21 # ensure a new pid for dnsmasq appeared due to service restart22 assert oldpid != newpid23 assert cmd.rc == 024 # Save out cmd result to check different stdout of start/enable/disable25 DockerPersist.cmd = cmd26 return DockerPersist27@pytest.mark.parametrize('start_cmd', ['start_cmd'])28def test_pihole_start_cmd(RunningPiHole, start_cmd, persist_tag):29 ''' the start_cmd tests are all built into the RunningPiHole fixture in this file '''30 assert RunningPiHole.cmd.stdout == START_DNS_STDOUT[persist_tag]31@pytest.mark.parametrize('start_cmd,hostname,expected_ip', [32 ('enable', 'pi.hole', '127.0.0.1'),33 ('disable', 'pi.hole', '127.0.0.1'),34])35def test_pihole_start_cmd(RunningPiHole, Dig, persist_tag, start_cmd, hostname, expected_ip):36 ''' the start_cmd tests are all built into the RunningPiHole fixture in this file '''37 dig_cmd = "dig +time=1 +noall +answer {} @test_pihole | awk '{{ print $5 }}'".format(hostname)38 lookup = RunningPiHole.dig.run(dig_cmd).stdout.rstrip('\n')39 assert lookup == expected_ip40 stdout = "::: Blocking has been {}d!\n".format(start_cmd)...
Check out the latest blogs from LambdaTest on this topic:
Unit testing is typically software testing within the developer domain. As the QA role expands in DevOps, QAOps, DesignOps, or within an Agile team, QA testers often find themselves creating unit tests. QA testers may create unit tests within the code using a specified unit testing tool, or independently using a variety of methods.
People love to watch, read and interact with quality content — especially video content. Whether it is sports, news, TV shows, or videos captured on smartphones, people crave digital content. The emergence of OTT platforms has already shaped the way people consume content. Viewers can now enjoy their favorite shows whenever they want rather than at pre-set times. Thus, the OTT platform’s concept of viewing anything, anytime, anywhere has hit the right chord.
Agile project management is a great alternative to traditional methods, to address the customer’s needs and the delivery of business value from the beginning of the project. This blog describes the main benefits of Agile for both the customer and the business.
The purpose of developing test cases is to ensure the application functions as expected for the customer. Test cases provide basic application documentation for every function, feature, and integrated connection. Test case development often detects defects in the design or missing requirements early in the development process. Additionally, well-written test cases provide internal documentation for all application processing. Test case development is an important part of determining software quality and keeping defects away from customers.
I routinely come across test strategy documents when working with customers. They are lengthy—100 pages or more—and packed with monotonous text that is routinely reused from one project to another. Yawn once more— the test halt and resume circumstances, the defect management procedure, entrance and exit criteria, unnecessary generic risks, and in fact, one often-used model replicates the requirements of textbook testing, from stress to systems integration.
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!