Best Python code snippet using robotframework
__init__.py
Source:__init__.py
...276 def read_lazy(self):277 """Lazy read from sshout. Waits a little, but does not block."""278 return self.sshout.read_lazy()279 280 def read_some(self):281 """Always read at least one block, unless the connection is closed.282 May block."""283 if self.isopen:284 return self.sshout.read_some()285 else:286 return self.sshout.read_very_lazy() 287 288 def read_all(self):289 """Reads until end of file hit. May block."""290 if self.isopen:291 return self.sshout.read_all()292 else:293 return self.sshout.read_very_lazy()294 295 def read_lw_all(self):296 """Reads until prompt returns. May block."""297 if self.isopen:298 timeout = 0299 banner = self.sshout.read_some()300 print "In read_lw_all Expecting:" + self.__prompt301 print "In read_lw_all Received:" + banner302 while banner.find(self.__prompt) == -1 and timeout < MAX_TIMEOUT:303 time.sleep(1)304 timeout += 1305 banner = banner + self.sshout.read_very_lazy()306 if self.debuglevel:307 print "================================================="308 print "Prompt: " + self.__prompt309 print banner310 print "================================================="311 return banner312 else:313 if self.debuglevel:314 print "Ssh::read_lw_all() reading very lazy when should be all"315 time.sleep(10)316 return self.sshout.read_very_lazy()317 time.sleep(5)318319 def login(self, password):320 """Logs in to the ssh host. 
Checks for standard prompts, and calls321 the function passed as promptcb to process them.322 Returns the login banner, or 'None' if login process aborted.323 """324 self.connected = False325 logintext='Last login:'326 text = ''327 trycnt = 0328329 self.open()330 banner = self.read_some()331332 if self.debuglevel:333 print ">> 1st banner read is: %s" % banner334 print "Logging in to:" + self.username335 print "Password:" + password336 print "Hostname:" + self.host337338 while banner.find(logintext) == -1 and banner.find("$") == -1 and banner.find("#") == -1 and banner.find(">") == -1:339 if banner.lower().find("permission denied") > -1 or banner.lower().find("try again") > -1 or \340 banner.lower().find("connection refused") > -1 or banner.lower().find("host key verification failed") > -1:341 self.close()342 return banner343344 if trycnt == 6:345 self.close()346 return banner347348 response, abort = _prompt(banner, "password", password)349 trycnt += 1350351 if abort:352 self.close()353 return banner354355 if response != "":356 if self.debuglevel:357 print "pySsh sending: " + response358 self.write(response + '\n')359360 banner += self.read_some()361362 if banner.find(logintext) != -1 or banner.find("Have") != -1 or \363 banner.find("$") != -1 or banner.find("#") != -1 or banner.find(">") != -1:364 self.password = password365 self.connected = True366 else:367 self.connected = False368 return banner369370 self.write("pwd\n")371 output = self.read_some()372 result = output.split("\n")373 if len(result) > 2:374 self.__prompt = result[len(result)-1]375 return banner376 377 def ssologin(self, logintext='Last login:'):378 retVal = True379 """Logs in to the ssh host. 
Checks for standard prompts, and calls380 the function passed as promptcb to process them.381 Returns the login banner, or 'None' if login process aborted.382 """383 text = ''384 self.open()385 banner = self.read_some()386 if self.debuglevel:387 print ">> 1st banner read is: %s" % banner388 389 promptcount = 0390 #print "=======",banner, "======="391 #print "logintext",logintext392 #if banner.find(logintext) > -1:393 # return banner394 abort = 0395 while banner.find(logintext) == -1:396 print banner397 if banner.lower().find("password") > -1:398 retVal = False399 abort = 1 400 print "Found password prompt on", self.host401 elif banner.lower().find("connection refused"):402 retVal = False403 abort = 1404 print "Connection refused from host", self.host405 elif (banner.lower().find('rsa key fingerprint') >= 0):406 """Found RSA key fingerprint request"""407 response = "yes"408 self.write(response + '\n')409 banner = self.read_some()410 if abort:411 self.connected = False412 return self.close()413 self.connected = True414 return banner415 416 def multipasslogin(self, password1, password2, password3, logintext='Last login:', prompt_callback=_prompt):417 """Logs in to the ssh host. 
Checks for standard prompts, and calls418 the function passed as promptcb to process them.419 Returns the login banner, or 'None' if login process aborted.420 """421 text = ''422 self.open()423 banner = self.read_some()424 if self.debuglevel:425 print ">> 1st banner read is: %s" % banner426 427 password = password1428 promptcount = 0429 while banner.find(logintext) == -1:430 print banner431 if banner.find("password") > -1:432 promptcount = promptcount + 1433 if promptcount == 1:434 password = password2435 elif promptcount == 2:436 password = password3437 response, abort = prompt_callback(banner, password=password)438 if abort:439 return self.close()440 print "pySsh sending: " + response441 self.write(response + '\n')442 banner = self.read_some()443 #end while444 if promptcount > 1:445 return "Fail"446 else:447 return banner448 449 def logout(self):450 """Logs out the session."""451 self.close()452 453 def sendcmd(self, cmd, timeout=0, readtype=READ_SOME):454 """Sends the command 'cmd' over the ssh connection, and returns the455 result. 
By default it uses read_some, which may block.456 """457 banner = self.__prompt.strip()458 cmd = cmd.strip() #+ ";cd ~"459 try:460 if cmd[-1] != '\n':461 cmd += '\n'462 463 self.write(cmd)464 if readtype == READ_ALL:465 return banner + self.read_all()466 elif readtype == READ_LAZY:467 return self.read_lazy()468 else:469 if timeout > 0:470 time.sleep(timeout)471 banner = banner + self.read_some()472 return banner473 except mysshError:474 self.close()475 476 477 def sudocmd(self, command):478 #sys.stdout.write(command + "\n")479 #sys.stdout.write("Password: " + self.password + "\n")480 sudooutput = self.interact("sudo " + command + "; sudo -k", "password", self.password)481 return sudooutput482 483 def interact(self, cmd, prompt, answer, readtype=READ_LW_ALL):484 try:485 cmd = cmd.strip() + "; cd ~"486 if cmd[-1] != '\n':487 cmd += '\n'488 self.write(cmd)489 if readtype == READ_ALL:490 if self.debuglevel:491 print "Ssh::interact() reading all"492 return self.read_all()493 elif readtype == READ_LAZY:494 if self.debuglevel:495 print "Ssh::interact() reading lazy"496 return self.read_lazy()497 elif readtype == READ_SOME:498 if self.debuglevel:499 print "Ssh::interact() reading lw_all"500 banner = self.read_some()501 response, abort = _lwiprompt(banner, prompt, answer)502 if response:503 if self.debuglevel:504 print "Prompt: ", prompt505 print "Response:", response506 self.write(response + '\n')507 banner = banner + self.read_some()508 else:509 if self.debuglevel:510 print "No response sent"511 print "Prompt: ", prompt512 print "Response:", response513 return banner514 else:515 if self.debuglevel:516 print "Ssh::interact() reading lw_all"517 banner = self.read_some()518 response, abort = _lwiprompt(banner, prompt, answer)519 if response:520 if self.debuglevel:521 print "Prompt: ", prompt522 print "Response:", response523 self.write(response + '\n')524 banner = banner + self.read_lw_all()525 #banner = banner + self.read_some()526 else:527 if self.debuglevel:528 print "No 
response sent"529 print "Prompt: ", prompt530 print "Response:", response531 #time.sleep(10)532 return banner533 except mysshError:534 if self.debuglevel:535 print "Ssh::interact encountered mysshError"536 self.close()537 return banner538539540 def runcmd_readall(self, cmd):541 try:542 cmd = cmd.strip() + "; cd ~"543 if cmd[-1] != '\n':544 cmd += '\n'545 self.write(cmd)546 if self.debuglevel:547 print "Ssh::interact() reading lw_all"548 banner = self.read_lw_all()549 return banner550 except mysshError:551 if self.debuglevel:552 print "Ssh::interact encountered mysshError"553 self.close()554 return banner555556 def setpass(self, cmd, prompt, answer):557 try:558 import commands559 readtype=READ_SOME560 if cmd[-1] != '\n':561 cmd += '\n'562 self.write(cmd)563 if readtype == READ_ALL:564 return self.read_all()565 elif readtype == READ_LAZY:566 return self.read_lazy()567 else:568 banner = self.read_some()569 response, abort = _lwiprompt(banner, prompt, answer)570 if response:571 #print "Banner: ", banner572 #print "Prompt: ", prompt573 #print "Response:", response574 self.write(response + '\n')575 banner = banner + self.read_some()576 response, abort = _lwiprompt(banner, prompt, answer)577 if response:578 #print "Banner: ", banner579 #print "Prompt: ", prompt580 #print "Response:", response581 self.write(response + '\n')582 banner = banner + self.read_some()583 #print "Wrote response"584 else:585 print "No response sent"586 print "Prompt: ", prompt587 print "Response:", response588 #self.sendcmd("sleep 0")589 #commands.getoutput("sleep 1")590 return banner591 except mysshError:592 self.close()593594 def changepass(self, oldpass, newpass):595 try:596 self.sendcmd("sleep 0")597 retVal = False598 cmd = "/opt/likewise/bin/passwd"599 readtype=READ_SOME600 if cmd[-1] != '\n':601 cmd += '\n'602 self.write(cmd)603 if readtype == READ_ALL:604 return self.read_all()605 elif readtype == READ_LAZY:606 return self.read_lazy()607 else:608 banner = self.read_some()609 response, abort 
= _lwiprompt(banner, "current", oldpass)610 if response:611 self.write(response + '\n')612 banner = banner + self.read_some()613 response, abort = _lwiprompt(banner, "new", newpass)614 if response:615 self.write(response + '\n')616 banner = banner + self.read_some()617 response, abort = _lwiprompt(banner, "password", newpass)618 if response:619 self.write(response + '\n')620 banner = banner + self.read_some()621 if banner.lower().find("success") > -1:622 retVal = True623 else:624 print "No password changed"625 #print "Prompt: ", prompt626 #print "Response:", response627 #self.write("sleep 3\n")628 return banner629 except mysshError:630 self.close()631 632def test():633 """Test routine for myssh.634
...
api.py
Source:api.py
...49 # # FUNCTION CODE: 0350 # # Register:0-0x0000 ===== on/off (2b)51 # send_mess = b"\x02\x03\x00\x00\x00\x01\x84\x39"52 # tn.write(send_mess)53 # raw_data['on_off'] = (tn.read_some().hex())54 #55 # # Register:26-0x0058 ===== Mode selection (2b)56 # send_mess = b"\x02\x03\x00\x1A\x00\x01\xA5\xFE"57 # tn.write(send_mess)58 # raw_data['mode_select'] = (tn.read_some().hex())59 #60 # # Register:50-0x0032 ===== Maximum DC voltageï¼PV) (2b)61 # send_mess = b"\x02\x03\x00\x32\x00\x01\x25\xF6"62 # tn.write(send_mess)63 # raw_data['max_DC_volt_PV'] = (tn.read_some().hex())64 #65 # # Register:58-0x003A ===== Maximum Output power (2b)66 # send_mess = b"\x02\x03\x00\x3A\x00\x01\xA4\x34"67 # tn.write(send_mess)68 # raw_data['max_output_P'] = (tn.read_some().hex())69 #70 # # Register:74-0x004A ===== Voltage reference (2b)71 # send_mess = b"\x02\x03\x00\x4A\x00\x01\xA5\xEF"72 # tn.write(send_mess)73 # raw_data['volt_ref'] = (tn.read_some().hex())74 # -----------------------------------------------75 # FUNCTION CODE: 0476 # Register:0-0x0000 ===== PV1 Voltage (2b+-)77 # send_mess = b"\x02\x04\x00\x00\x00\x01\x31\xF9"78 # tn.write(send_mess)79 # raw_data['PV1_volt'] = (tn.read_some().hex())80 #81 # # Register:3-0x0003 ===== PV1 DC current (2b+-)82 # send_mess = b"\x02\x04\x00\x03\x00\x01\xC1\xF9"83 # tn.write(send_mess)84 # raw_data['PV1_DC_cur'] = (tn.read_some().hex())85 #86 # # Register:4-0x0004 ===== Output voltage UV (2b+-)87 # send_mess = b"\x02\x04\x00\x04\x00\x01\x70\x38"88 # tn.write(send_mess)89 # raw_data['output_volt_UV'] = (tn.read_some().hex())90 #91 # # Register:5-0x0005 ===== Output voltage VW (2b+-)92 # send_mess = b"\x02\x04\x00\x05\x00\x01\x21\xF8"93 # tn.write(send_mess)94 # raw_data['output_volt_VW'] = (tn.read_some().hex())95 #96 # # Register:6-0x0006 ===== Output voltage WU (2b+-)97 # send_mess = b"\x02\x04\x00\x06\x00\x01\xD1\xF8"98 # tn.write(send_mess)99 # raw_data['output_volt_WU'] = (tn.read_some().hex())100 #101 # # Register:16-0x0010 ===== Output 
frequency (2b+-)102 # send_mess = b"\x02\x04\x00\x10\x00\x01\x30\x3C"103 # tn.write(send_mess)104 # raw_data['output_freq'] = (tn.read_some().hex())105 # Register:17-0x0011 ===== Battery power (2b+-)106 send_mess = b"\x02\x04\x00\x11\x00\x01\x61\xFC"107 tn.write(send_mess)108 raw_data['batt_P'] = (tn.read_some().hex())109 # Register:21-0x0015 ===== Power grid frequency (2b+-)110 # send_mess = b"\x02\x04\x00\x15\x00\x01\x20\x3D"111 # tn.write(send_mess)112 # raw_data['P_grid_freq'] = (tn.read_some().hex())113 #114 # # Register:22-0x0016 ===== Power factor symbol (screen none) (2b+-)115 # send_mess = b"\x02\x04\x00\x16\x00\x01\xD0\x3D"116 # tn.write(send_mess)117 # raw_data['PF_symbol'] = (tn.read_some().hex())118 #119 # # Register:23-0x0017 ===== Power factor (2b+-)120 # send_mess = b"\x02\x04\x00\x17\x00\x01\x81\xFD"121 # tn.write(send_mess)122 # raw_data['PF'] = (tn.read_some().hex())123 #124 # # Register:47-0x002F ===== Battery percentage (2b+)125 send_mess = b"\x02\x04\x00\x2F\x00\x01\x00\x30"126 tn.write(send_mess)127 raw_data['batt_percen'] = (tn.read_some().hex())128 #129 # # Register:49-0x0031 ===== Load active power (2b+)130 send_mess = b"\x02\x04\x00\x31\x00\x01\x60\x36"131 tn.write(send_mess)132 raw_data['load_act_P'] = (tn.read_some().hex())133 #134 # # Register:51-0x0033 ===== PV1 power (2b+-)135 # send_mess = b"\x02\x04\x00\x33\x00\x01\xC1\xF6"136 # tn.write(send_mess)137 # raw_data['PV1_P'] = (tn.read_some().hex())138 #139 # # Register:53-0x0035 ===== Load current A (2b+-)140 # send_mess = b"\x02\x04\x00\x35\x00\x01\x21\xF7"141 # tn.write(send_mess)142 # raw_data['load_cur_A'] = (tn.read_some().hex())143 #144 # # Register:54-0x0036 ===== Load current B (2b+-)145 # send_mess = b"\x02\x04\x00\x36\x00\x01\xD1\xF7"146 # tn.write(send_mess)147 # raw_data['load_cur_B'] = (tn.read_some().hex())148 #149 # # Register:55-0x0037 ===== Load current C (2b+-)150 # send_mess = b"\x02\x04\x00\x37\x00\x01\x80\x37"151 # tn.write(send_mess)152 # raw_data['load_cur_C'] = 
(tn.read_some().hex())153 #154 # # Register:62-0x003E ===== PV daily power generation (2b+)155 # send_mess = b"\x02\x04\x00\x3E\x00\x01\x50\x35"156 # tn.write(send_mess)157 # raw_data['PV_daily_P_gen'] = (tn.read_some().hex())158 #159 # # Register:64-0x0040 ===== PV total power generation High (2b+)160 # # Register:65-0x0041 ===== PV total power generation Low (2b+)161 # send_mess = b"\x02\x04\x00\x40\x00\x02\x70\x2C"162 # tn.write(send_mess)163 # raw_data['PV_total_P_gen'] = (tn.read_some().hex())164 #165 # # Register:80-0x0050 ===== Output reactive power (2b+-)166 # send_mess = b"\x02\x04\x00\x50\x00\x01\x31\xE8"167 # tn.write(send_mess)168 # raw_data['output_react_P'] = (tn.read_some().hex())169 #170 # # Register:82-0x0052 ===== Daily load consumption (2b+)171 # send_mess = b"\x02\x04\x00\x52\x00\x01\x90\x28"172 # tn.write(send_mess)173 # raw_data['daily_load_con'] = (tn.read_some().hex())174 #175 # # Register:88-0x0058 ===== Daily power intake from grid (2b+)176 # send_mess = b"\x02\x04\x00\x58\x00\x01\xB0\x2A"177 # tn.write(send_mess)178 # raw_data['daily_P_intake_grid'] = (tn.read_some().hex())179 #180 # # Register:90-0x005A ===== Total power intake from grid High (2b+)181 # # Register:91-0x005B ===== Total power intake from grid Low (2b+)182 # send_mess = b"\x02\x04\x00\x5A\x00\x02\x51\xEB"183 # tn.write(send_mess)184 # raw_data['total_P_intake_grid'] = (tn.read_some().hex())185 #186 # # Register:94-0x005E ===== Daily power fed to grid187 # send_mess = b"\x02\x04\x00\x5E\x00\x01\x50\x2B"188 # tn.write(send_mess)189 # raw_data['daily_P_fed_grid'] = (tn.read_some().hex())190 #191 # # Register:96-0x0060 ===== Total power fed to grid High (2b+)192 # # Register:97-0x0061 ===== Total power fed to grid Low (2b+)193 # send_mess = b"\x02\x04\x00\x60\x00\x02\x71\xE6"194 # tn.write(send_mess)195 # raw_data['total_P_fed_grid'] = (tn.read_some().hex())196 # Register:108-0x006C ===== PV total power (2b+-)197 send_mess = b"\x02\x04\x00\x6C\x00\x01\xF1\xE4"198 
tn.write(send_mess)199 raw_data['PV_total_P'] = (tn.read_some().hex())200 # Register:113-0x0071 ===== Output power (2b+-)201 send_mess = b"\x02\x04\x00\x71\x00\x01\x61\xE2"202 tn.write(send_mess)203 raw_data['output_P'] = (tn.read_some().hex())204 #205 # # Register:166-0x00A6 ===== SOH (2b+)206 send_mess = b"\x02\x04\x00\xA6\x00\x01\xD1\xDA"207 tn.write(send_mess)208 raw_data['SOH'] = (tn.read_some().hex())209 #210 # # Register:176-0x00B0 ===== BMS battery status (1b)211 # send_mess = b"\x02\x04\x00\xB0\x00\x01\x30\x1E"212 # tn.write(send_mess)213 # raw_data['BMS_batt_status'] = (tn.read_some().hex())214 for k,v in raw_data.items():215 raw_data[k] = ('0'*((int(v[4:6])*2) - len(v[6:-4]))) + v[6:-4]216 # closed connection217 tn.close()218 self.getDeviceStatusJson(raw_data)219 self.printDeviceStatus()220 if getDeviceStatusResult==True:221 self.set_variable('offline_count', 0)222 else:223 self.set_variable('offline_count', self.get_variable('offline_count')+1)224 except Exception as er:225 print (er)226 # self.set_variable('batt_P', (0))227 # self.set_variable('batt_percen', (0))...
install.py
Source:install.py
...74 smoothie.write(command)75 LOG[command] = "OK"76 if LOG_PRINT:77 print(LOG)78 return command + ' ' +str(smoothie.read_some())79def testes(smoothie, es):80 if es == 'x':81 index = 682 elif es == 'y':83 index = 1484 elif es =='z':85 index = 2286 while True:87 smoothie.write(ES)88 s = smoothie.read_some()89 if s=='':90 s = smoothie.read_some()91 elif s[0]=="o":92 s = smoothie.read_some()93 if s[index] == "1":94 while True:95 smoothie.write(ES)96 s = smoothie.read_some()97 if s=='':98 s = smoothie.read_some()99 elif s[0]=="o":100 s = smoothie.read_some()101 if s[index] == "0":102 return "End Stop "+es +" OK"103 time.sleep(0.1)104# --------------------------------------------------------105# -------------------- route function --------------------106# --------------------------------------------------------107@app.route('/', methods=['POST', 'GET'])108def init():109 if request.method == 'POST':110 LOG['PIC']= request.form['Tech']111 global PIC112 PIC=request.form['Tech']113 SERIAL_NUMBER = request.form['SN']114 changeConfigValue("ROBOT_SN", SERIAL_NUMBER)115 changeConfigValue("UI_LANGUAGE", request.form['language'])116 changeConfigValue("NTRIP_USER", request.form['ntripuser'])117 changeConfigValue("NTRIP_PASSWORD", request.form['ntrippswd'])118 changeConfigValue("NTRIP_CASTER", request.form['ntripcaster'])119 configVPN(SERIAL_NUMBER)120 return redirect(url_for('smoothie_iframe'))121 else:122 return render_template('init.html')123@app.route('/smoothie_iframe', methods=['POST', 'GET'])124def smoothie_iframe():125 if request.method == 'POST':126 return redirect(url_for('cam'))127 else:128 return render_template('smoothie_iframe.html')129@app.route('/cam', methods=['POST', 'GET'])130def cam():131 global PIC132 if PIC is None:133 return redirect("/")134 global camSP135 if request.method == 'GET':136 os.system("sudo systemctl restart nvargus-daemon")137 camSP=startLiveCam()138 return render_template('cam.html')139 else:140 LOG['CAMERA']= 'OK'141 if LOG_PRINT:142 
print(LOG)143 os.killpg(os.getpgid(camSP.pid), signal.SIGINT)144 camSP.wait()145 return redirect(url_for('xyd'))146@app.route('/xyd', methods=['POST', 'GET'])147def xyd():148 global PIC149 if PIC is None:150 return redirect("/")151 with connectors.SmoothieV11SerialConnector(utility.get_smoothie_vesc_addresses()["smoothie"], config.SMOOTHIE_BAUDRATE) as smoothie:152 smoothie.write("G91")153 smoothie.read_some()154 smoothie.write("G91")155 LOG['G91'] = 'OK'156 if LOG_PRINT:157 print(LOG)158 if request.method == 'GET':159 return render_template('xyd.html', answer=str(utility.get_smoothie_vesc_addresses()["smoothie"]))160 else:161 if request.form['x'] == 'next':162 return redirect(url_for('vesc_pr'))163@app.route('/vesc_pr', methods=['POST', 'GET'])164def vesc_pr():165 global PIC166 if PIC is None:167 return redirect("/")168 if request.method == 'GET':169 return render_template('vesc_pr.html')170 else:171 if request.form['x'] == 'test_motor':172 with adapters.VescAdapter(config.VESC_RPM_SLOW, config.VESC_MOVING_TIME, config.VESC_ALIVE_FREQ,173 config.VESC_CHECK_FREQ, config.VESC_PORT, config.VESC_BAUDRATE) as vesc_engine:174 vesc_engine.set_rpm(RPM)175 vesc_engine.set_moving_time(MOVING_TIME)176 vesc_engine.start_moving()177 vesc_engine.wait_for_stop()178 return render_template('vesc_pr.html', answer='test_motor command sent')179 elif request.form['x'] == 'next':180 return redirect(url_for('vesc_z'))181@app.route('/vesc_z', methods=['POST', 'GET'])182def vesc_z():183 global PIC184 if PIC is None:185 return redirect("/")186 with connectors.SmoothieV11SerialConnector(utility.get_smoothie_vesc_addresses()["smoothie"], config.SMOOTHIE_BAUDRATE) as smoothie:187 smoothie.write("G91")188 smoothie.read_some()189 smoothie.write("G91")190 smoothie.read_some()191 if request.method == 'GET':192 return render_template('vesc_z.html')193 else:194 if request.form['x'] == 'test_motor':195 return test_smoothie(smoothie, TEST_Z, VESC_Z_URL) 196 return render_template('vesc_pr.html', 
answer='test_motor command sent')197 elif request.form['x'] == 'next':198 return redirect(url_for('gps'))199@app.route('/gps', methods=['POST', 'GET'])200def gps():201 global PIC202 if PIC is None:203 return redirect("/")204 if request.method == 'GET':205 return render_template('gps.html')206 else:207 if request.form['x'] == 'test_gps':208 with adapters.GPSUbloxAdapter(config.GPS_PORT, config.GPS_BAUDRATE, config.GPS_POSITIONS_TO_KEEP) as gps:209 return render_template('gps.html', answer = gps.get_fresh_position())210 return redirect(url_for('final'))211@app.route('/final', methods=['POST', 'GET'])212def final():213 global PIC214 if PIC is None:215 return redirect("/")216 if request.method == 'GET':217 if LOG_PRINT:218 print(LOG)219 return render_template('final.html', answer = LOG)220 else:221 return redirect(url_for('final'))222# -----------------------------------------------------------223# -------------------- socketio function --------------------224# -----------------------------------------------------------225@socketio.on('data', namespace='/server')226def on_socket_data(data):227 print(data)228 with connectors.SmoothieV11SerialConnector(utility.get_smoothie_vesc_addresses()["smoothie"], config.SMOOTHIE_BAUDRATE) as smoothie:229 smoothie.write("G91")230 smoothie.read_some()231 smoothie.write("G91")232 LOG['G91'] = 'OK'233 if LOG_PRINT:234 print(LOG)235 if data == "x+":236 to_emit = test_smoothie(smoothie, XP, XYD_URL)237 elif data == "x-": 238 to_emit = test_smoothie(smoothie, XN, XYD_URL),239 elif data == "y+": 240 to_emit = test_smoothie(smoothie, YP, XYD_URL),241 elif data == "y-": 242 to_emit = test_smoothie(smoothie, YN, XYD_URL),243 elif data == "d+": 244 to_emit = test_smoothie(smoothie, DP, XYD_URL),...
sequential.py
Source:sequential.py
...54 55 def testWelcomeMessage(self):56 """On connecting the server sends a 220 response with a welcome message."""57 client = Telnet('localhost', 1025)58 self.assertEqual(client.read_some(),59 '220 test node.js smtpevent server 0.0.2\r\n'60 )61 client.close()62 63 def testUnknownCommand(self):64 """Unknown commands are ignored and the client informed."""65 66 client = Telnet('localhost', 1025)67 client.read_some()68 client.write('EHLO')69 self.assertEqual(client.read_some(), 70 '502 Error: command "EHLO" not implemented\r\n')71 client.close()72 73 def testIllegalHelo(self):74 """HELO takes a single argument."""75 76 client = Telnet('localhost', 1025)77 client.read_some()78 client.write('HELO')79 self.assertEqual(client.read_some(), '501 Syntax: HELO hostname\r\n')80 client.close()81 82 def testLegalHelo(self):83 """The server responds to a valid HELO command."""84 85 client = Telnet('localhost', 1025)86 client.read_some()87 client.write('HELO localhost')88 self.assertEqual(client.read_some(), '250 test Hello 127.0.0.1\r\n')89 client.close()90 91 def testMultipleHelo(self):92 """Only a single HELO command is allowed per connection."""93 94 client = Telnet('localhost', 1025)95 client.read_some()96 client.write('HELO localhost')97 self.assertEqual(client.read_some(), '250 test Hello 127.0.0.1\r\n')98 client.write('HELO localhost')99 self.assertEqual(client.read_some(), '503 Duplicate HELO/EHLO\r\n')100 client.close()101 102 def testIllegalNoop(self):103 """The NOOP command fails if any argument is passed."""104 105 client = Telnet('localhost', 1025)106 client.read_some()107 client.write('NOOP something else here')108 self.assertEqual(client.read_some(), '501 Syntax: NOOP\r\n')109 client.close()110 111 def testLegalNoop(self):112 """The NOOP command takes no arguments."""113 114 client = Telnet('localhost', 1025)115 client.read_some()116 client.write('NOOP')117 self.assertEqual(client.read_some(), '250 Ok\r\n')118 client.close()119 120 def testQuit(self):121 """The 
QUIT command doesn't care about arguments - the connection is122 closed regardless."""123 124 client = Telnet('localhost', 1025)125 client.read_some()126 client.write('QUIT')127 self.assertEqual(client.read_some(), '221 test closing connection\r\n')128 129 client = Telnet('localhost', 1025)130 client.read_some()131 client.write('QUIT See you later')132 self.assertEqual(client.read_some(), '221 test closing connection\r\n')133 client.close()134 135 def testIllegalRset(self):136 """The RSET command fails if any argument is passed."""137 138 client = Telnet('localhost', 1025)139 client.read_some()140 client.write('RSET now')141 self.assertEqual(client.read_some(), '501 Syntax: RSET\r\n')142 client.close()143 144 def testLegalRset(self):145 """The RSET command takes no arguments."""146 147 client = Telnet('localhost', 1025)148 client.read_some()149 client.write('RSET')150 self.assertEqual(client.read_some(), '250 Ok\r\n')151 client.close()152 153 def testMailNoFrom(self):154 """The MAIL command requires FROM: to follow it."""155 156 client = Telnet('localhost', 1025)157 client.read_some()158 client.write('MAIL')159 self.assertEqual(client.read_some(),160 '501 Syntax: MAIL FROM:<address>\r\n')161 client.close()162 163 def testMailInvalidFrom(self):164 """The MAIL command requires FROM: to contain an email address."""165 166 client = Telnet('localhost', 1025)167 client.read_some()168 client.write('MAIL FROM:')169 self.assertEqual(client.read_some(),170 '501 Syntax: MAIL FROM:<address>\r\n')171 client.close()172 173 def testMailFromParse(self):174 """The MAIL command will extract the email address from the FROM:."""175 176 client = Telnet('localhost', 1025)177 client.read_some()178 client.write('MAIL FROM:<person@example.com>')179 self.assertEqual(client.read_some(), '250 Ok\r\n')180 client.close()181 182 def testMailFromParse(self):183 """The MAIL command handles empty addresses"""184 185 client = Telnet('localhost', 1025)186 client.read_some()187 client.write('MAIL 
FROM:<>')188 self.assertEqual(client.read_some(), '250 Ok\r\n')189 client.close()190 191 def testDuplicateMailCommand(self):192 """Nested MAIL commands are not allowed."""193 194 client = Telnet('localhost', 1025)195 client.read_some()196 client.write('MAIL FROM:<me@example.com>')197 self.assertEqual(client.read_some(), '250 Ok\r\n')198 client.write('MAIL FROM:<me@example.com>')199 self.assertEqual(client.read_some(), '503 Error: nested MAIL command\r\n')200 client.close()201 202 def testRcptWithoutMail(self):203 """The RCPT command must be preceded by the MAIL command."""204 205 client = Telnet('localhost', 1025)206 client.read_some()207 client.write('RCPT TO:<me@example.com>')208 self.assertEqual(client.read_some(), '503 Error: need MAIL command\r\n')209 client.close()210 211 def testRcptWithoutTo(self):212 """The RCPT command must contain TO:<address> as the argument."""213 214 client = Telnet('localhost', 1025)215 client.read_some()216 client.write('MAIL FROM:<you@example.com>')217 self.assertEqual(client.read_some(), '250 Ok\r\n')218 client.write('RCPT')219 self.assertEqual(client.read_some(), '501 Syntax: RCPT TO: <address>\r\n')220 client.close()221 222 def testRcptEmptyTo(self):223 """The RCPT command cannot have an empty TO:."""224 225 client = Telnet('localhost', 1025)226 client.read_some()227 client.write('MAIL FROM:<you@example.com>')228 self.assertEqual(client.read_some(), '250 Ok\r\n')229 client.write('RCPT TO:')230 self.assertEqual(client.read_some(), '501 Syntax: RCPT TO: <address>\r\n')231 client.close()232 233 def testMultipleRcpts(self):234 """Multiple RCPT commands can be issued to add recipients."""235 236 client = Telnet('localhost', 1025)237 client.read_some()238 client.write('MAIL FROM:<you@example.com>')239 self.assertEqual(client.read_some(), '250 Ok\r\n')240 241 for rcpt in self.addrs:242 client.write('RCPT TO:<%s>' % rcpt)243 self.assertEqual(client.read_some(), '250 Ok\r\n')244 client.close()245 246 def testDataWithoutRcpt(self):247 
"""The DATA command must be preceded by the RCPT TO: command."""248 249 client = Telnet('localhost', 1025)250 client.read_some()251 client.write('DATA')252 self.assertEqual(client.read_some(), '503 Error: need RCPT command\r\n')253 client.close()254 255 def testDataResponse(self):256 """The DATA instructs the client to end the message with <CR><LF>.<CR><LF>."""257 258 client = Telnet('localhost', 1025)259 client.read_some()260 client.write('MAIL FROM:<you@example.com>')261 self.assertEqual(client.read_some(), '250 Ok\r\n')262 client.write('RCPT TO:<me@example.com>')263 self.assertEqual(client.read_some(), '250 Ok\r\n')264 client.write('DATA')265 self.assertEqual(client.read_some(),266 '354 End data with <CR><LF>.<CR><LF>\r\n')267 client.close()268 269 def testDataArgument(self):270 """The DATA command does not take any arguments."""271 272 client = Telnet('localhost', 1025)273 client.read_some()274 client.write('MAIL FROM:<you@example.com>')275 self.assertEqual(client.read_some(), '250 Ok\r\n')276 client.write('RCPT TO:<me@example.com>')277 self.assertEqual(client.read_some(), '250 Ok\r\n')278 client.write('DATA some data here')279 self.assertEqual(client.read_some(), '501 Syntax: DATA\r\n')280 client.close()281 282if __name__ == "__main__":...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!