Best Python code snippet using fMBT_python
tello_scratch_if.py
Source:tello_scratch_if.py
# -*- coding: utf-8 -*-
#
# Use freely, at your own risk.
# April 2020
# uma3san
#
"""Bridge between the Scratch editor (HTTP extension) and a Tello drone (UDP).

Threads started by this script:

* ``SendCmd``           -- pops commands from ``cmdQue`` and sends them to the drone.
* ``ReceiveTelloState`` -- receives state datagrams and pushes them on ``stateQue``.
* ``StartHttpServer``   -- serves Scratch's ``/poll`` and command requests.
* ``MyInput``           -- reads commands typed on the console.

The main thread shows the video stream in an OpenCV window.
"""
### to scratch
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse
### to tello
import threading
import socket
import sys
import queue
import cv2
from time import sleep
from collections import deque

scratchAdrs = ('localhost', 8001)            # PC <-> Scratch on PC
telloCmdAdrs = ('192.168.10.1', 8889)        # Tello <-> PC ( 50602 )
pcAdrs2SendCmd2Tello = ('0.0.0.0', 50602)    # local port for commands; any fixed port works
                                             # (the Tello sees the PC as 192.168.10.2)
rcvTelloStateAdrs = ('0.0.0.0', 8890)        # Tello -> PC
rcvTelloVideoURL = ('udp://@0.0.0.0:11111')  # Tello -> PC
INTERVAL = 0.2                               # pause between two state reads [s]
IDLE_WAIT = 0.01                             # pause while the command queue is empty [s]

# Shared queues.  They live at module level so that the classes below can use
# them even when this file is imported instead of being run as a script.
stateQue = deque()   # tello state -> this program -> scratch
cmdQue = deque()     # tello cmd   <- this program <- scratch


class SendCmd(threading.Thread):
    """Thread that forwards queued commands from the PC to the Tello.

    After construction ``result`` is True when the drone acknowledged the
    initial ``command`` request (the thread is then already running) and
    False otherwise (the socket is closed and no thread is started).
    """

    def __init__(self):
        super().__init__(daemon=True)
        self.sendSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # Bind to a fixed local port so that restarting this program does not
        # require restarting the Tello.
        self.sendSock.bind(pcAdrs2SendCmd2Tello)
        self.sendSock.settimeout(10.0)
        self.finishSignal = False
        self.result = False
        if self.connect():
            self.start()
            self.result = True
        else:
            # Do not leak the socket when the drone cannot be reached.
            self.sendSock.close()

    def connect(self):
        """Put the Tello into command mode.

        Sends ``command`` up to five times and returns True as soon as the
        drone answers with ``ok``; returns False on exhaustion, on Ctrl-C or
        when the datagram cannot be sent at all.
        """
        print("-> try to connect the tello", end="")
        # Every attempt consumes one try, also when the drone answers with
        # something other than "ok"; otherwise the loop could never end.
        for remaining in reversed(range(5)):
            try:
                self.sendSock.sendto('command'.encode('utf-8'), telloCmdAdrs)
                # recvfrom returns as soon as one datagram has arrived.
                response = self.sendSock.recvfrom(1024)
                print(response)
                # response is (payload, (host, port)); inspect the payload.
                if b'ok' in response[0]:
                    print("-> connected : set tello to command mode")
                    return True
            except socket.timeout:
                print('\n???? socket timeout : ', remaining, end="")
            except KeyboardInterrupt:
                print('~~CTRL-C')
                return False
            except OSError as ex:
                # e.g. "network is unreachable" when not on the Tello Wi-Fi
                print('\n???? socket error : ', ex)
                return False
        return False

    #
    # Thread of sending command from PC to Tello
    #
    def run(self):
        """Send queued commands until ``kill_thread`` has been called."""
        while True:
            if self.finishSignal:
                self.sendSock.close()
                print(".... SendCmd : socket closed")
                return

            if not cmdQue:
                sleep(IDLE_WAIT)   # avoid spinning at 100% CPU while idle
                continue

            if 'emergency' in cmdQue:
                # emergency stop: overrides everything that is still queued
                cmd = 'emergency'
                cmdQue.clear()
            else:
                # ordinary command
                cmd = cmdQue.popleft()

            print('{} {} {}'.format("->", cmd, " ... "), end="")
            self.sendSock.sendto(cmd.encode('utf-8'), telloCmdAdrs)
            try:
                # e.g. (b'ok', ('192.168.10.1', 8889))
                payload, (host, port) = self.sendSock.recvfrom(1024)
            except socket.timeout:
                print("???? send command error : recvfrom time out")
                continue
            # Show every reply ("ok", "error ...", or the value of a query).
            print(payload.decode('utf-8', errors='replace'), host, port)

    def kill_thread(self):
        """Ask the thread to close its socket and terminate."""
        self.finishSignal = True


class ReceiveTelloState(threading.Thread):
    """Thread that receives Tello state datagrams and queues them for Scratch."""

    def __init__(self):
        super().__init__(daemon=True)
        self.rcvSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.rcvSock.bind(rcvTelloStateAdrs)
        self.rcvSock.settimeout(1.0)
        self.finishSignal = False
        self.start()

    #
    # The thread of receiving tello state
    #
    def run(self):
        """Receive state strings until ``kill_thread`` has been called."""
        while True:
            if self.finishSignal:
                self.rcvSock.close()
                print(".... ReceiveTelloState : socket closed")
                return
            try:
                response, ip = self.rcvSock.recvfrom(1024)
            except socket.timeout as ex:
                # happens e.g. after the Tello has been switched off
                print("???? ReceiveTelloState : ", ex)
                continue
            if b'ok' in response:
                print("**ok")      # a state datagram is not expected to contain "ok"
                continue
            # "key:value;key:value;...\r\n"  ->  one "key value" pair per line
            out = response.rstrip(b'\r\n')
            out = out.replace(b';', b'\n')
            out = out.replace(b':', b' ')
            stateQue.append(out)   # handed to Scratch on its next poll

            sleep(INTERVAL)

    def kill_thread(self):
        """Ask the thread to close its socket and terminate."""
        self.finishSignal = True


#
# http server for scratch editor
#
class MyHTTPRequestHandler(BaseHTTPRequestHandler):
    """Handles the GET requests issued by the Scratch editor."""

    def log_message(self, format, *args):
        """Suppress the per-request log line (Scratch polls many times a second)."""

    def _reply(self, body):
        """Send a complete ``200 OK`` plain-text response carrying ``body`` (bytes)."""
        self.send_response(200)
        self.send_header('Content-Type', 'text/plain')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Answer ``/poll`` with the oldest queued state; queue any other path as a command."""
        parsed_path = urlparse(self.path)
        if parsed_path.path == '/poll':
            #
            # the scratch editor is polling
            #
            try:
                state = stateQue.popleft()
            except IndexError:
                state = b''        # nothing new to report
            self._reply(state)
            return

        #
        # the scratch editor sent a tello command: "/cmd/arg" -> "cmd arg"
        #
        _com = parsed_path.path.replace('/', ' ')
        _com = _com[1:]
        if _com:                   # a bare "/" carries no command
            cmdQue.append(_com)
        self._reply(b'')


class StartHttpServer(threading.Thread):
    """Thread that runs the HTTP server the Scratch editor talks to."""

    def __init__(self):
        super().__init__(daemon=True)
        self.scratchSvr = None     # created in run(); None until then
        self.start()

    def run(self):
        """Create the server and serve requests until ``kill_thread`` is called."""
        self.scratchSvr = HTTPServer(scratchAdrs, MyHTTPRequestHandler)
        try:
            self.scratchSvr.serve_forever()
        except KeyboardInterrupt:
            print(".... startHttpServer : Exit startHttpServer by KeyboardInterrupt")
        finally:
            self.scratchSvr.server_close()

    def kill_thread(self):
        """Stop ``serve_forever``; a no-op when the server never came up."""
        print(".... StartHttpServer : shutdown")
        if self.scratchSvr is not None:
            self.scratchSvr.shutdown()


#
# A thread that is blocked in input() cannot be stopped from outside;
# it is a daemon thread and simply dies with the process.
#
class MyInput(threading.Thread):
    """Thread that reads console lines and hands them over through a queue."""

    def __init__(self):
        super().__init__(daemon=True)
        # Must be set before start(): run() reads these attributes immediately.
        self.finishSignal = False
        self.queue = deque()
        self.start()

    def run(self):
        """Read lines from stdin; on a read error queue a ``!!!!`` marker and stop."""
        while True:
            if self.finishSignal:
                print("....MyInput : close")
                return
            try:
                t = input()
            except Exception:
                # stdin was closed / interrupted: tell the main loop to exit
                print(".... MyInput : ctl-c")
                self.queue.append("!!!! ctl-c")
                break

            if t is not None:
                self.queue.append(t)

    def input(self, block=True, timeout=None):
        """Return the oldest line typed so far, or None when there is none.

        ``block`` and ``timeout`` are accepted for compatibility and ignored;
        the call never blocks.
        """
        if len(self.queue) == 0:
            return None
        else:
            return self.queue.popleft()

    def kill_thread(self):
        """Ask the thread to stop; takes effect only once input() has returned."""
        self.finishSignal = True


def kill_all_thread():
    """Signal the two socket threads (command sender, state receiver) to stop."""
    sendCmd.kill_thread()
    receiveTelloState.kill_thread()
    #cin.kill_thread()
    #startHttpServer.kill_thread()


if __name__ == "__main__":
    #
    # all thread start
    #
    sendCmd = SendCmd()
    if not sendCmd.result:
        print("\n???? error : can't connect to the tello")
        sys.exit()

    receiveTelloState = ReceiveTelloState()
    startHttpServer = StartHttpServer()
    cin = MyInput()
    cmdQue.append('command')
    cmdQue.append('streamon')
    VS_UDP_IP = '0.0.0.0'
    VS_UDP_PORT = 11111
    # The constructor already opens the stream; no extra cap.open() is needed.
    cap = cv2.VideoCapture(rcvTelloVideoURL)
    try:
        while True:
            msg = cin.input()
            if msg is None:
                pass
            elif '!!!!' in msg:
                break
            elif 'quit' in msg:
                print('.... main : Exit by <quit>')
                break
            else:
                cmdQue.append(msg)

            ret, frame = cap.read()
            if ret:
                # frame is None when no picture could be read; imshow would raise
                cv2.imshow('frame', frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                print('.... main : Exit by <q> on the video window')
                break
    except KeyboardInterrupt:
        print("....main : Exit KeyboardInterrupt")
    finally:
        # Every exit path must signal the workers, otherwise the joins below
        # would wait forever.
        kill_all_thread()
        cap.release()
        cv2.destroyAllWindows()

    sendCmd.join()
    print("<<<< join sendCmd")
    receiveTelloState.join()
    print("<<<< join receiveTelloState")
    startHttpServer.kill_thread()
    startHttpServer.join()
    print("<<<< join startHttpServer")
    # MyInput is normally blocked in input(); do not wait for it indefinitely.
    cin.kill_thread()
    cin.join(timeout=1.0)
    # NOTE(review): the scraped listing is cut off ("...") after cin.join();
    # anything that followed in the original file is not visible here.
httpserver_test.py
Source:httpserver_test.py
# -*- coding: utf-8 -*-
"""
Created on Thu Jun 20 17:27:48 2019
@author: THP6
"""
# coding:utf-8
import threading
import time
import socket
import sys
from wsclient_test import startClient
#import turn_server
from turn_server import turnServer
from multiprocessing import Process


def handle_client(client_socket):
    """Handle one client request: read it, answer with a fixed page, close.

    The socket is closed even when receiving or sending fails.
    """
    try:
        request_data = client_socket.recv(1024)
        print("request data:", request_data)
        # build the response
        response_start_line = "HTTP/1.1 200 OK\r\n"
        response_headers = "Server: My server\r\n"
        response_body = "<h1>Python HTTP Test</h1>"
        response = response_start_line + response_headers + "\r\n" + response_body
        # return the response to the client; sendall() retries partial writes
        client_socket.sendall(bytes(response, "utf-8"))
    finally:
        # close the client connection
        client_socket.close()


def startHttpServer(name):
    """Accept connections on port 8000 forever, one child process per client.

    ``name`` is not used by the body; it is kept so existing callers still work.
    """
    print("startHttpServer")
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server_socket.bind(("", 8000))
        server_socket.listen(128)
        while True:
            client_socket, client_address = server_socket.accept()
            print("[%s, %s]用户连接上了" % client_address)
            handle_client_process = Process(target=handle_client, args=(client_socket,))
            handle_client_process.start()
            # The child owns its own handle now; release the parent's copy.
            client_socket.close()
    finally:
        server_socket.close()


def startWebsocket(ts):
    """Run ``ts.startWebsocket()`` and log when it starts and returns."""
    print("startWebsocket")
    #ts=turnServer();
    ts.startWebsocket()
    print("startWebsocket end")


def startConnetServer(ws):
    """Connect ``ws`` and run it until interrupted; close it on Ctrl-C."""
    try:
        ws.connect()
        ws.run_forever()
    except KeyboardInterrupt:
        ws.close()


if __name__ == "__main__":
    ts = None
    try:
        print("start")
        ts = turnServer()
        # Disabled in the original:
        # t1 = threading.Thread(target=startHttpServer,args=('ws1', ))
        # t1.start()
        # print ("start1ed")

        # daemon=True so that the "e" command below really ends the process
        # instead of waiting for the websocket thread.
        t2 = threading.Thread(target=startWebsocket, args=(ts,), daemon=True)
        t2.start()
        print("start2ed")

        #startClient()

        while True:
            msg = input(">> ").strip()
            if "e" == msg:
                print("exit")
                sys.exit(0)
            elif "2" == msg:
                print("live")
            else:
                print("cmd:", msg)

        # NOTE(review): unreachable after the endless loop above; its original
        # indentation is not recoverable from the listing -- confirm and remove.
        time.sleep(10000)

    except (EOFError, KeyboardInterrupt):
        # stdin closed or Ctrl-C: a normal way to leave the prompt
        print("exit")
    except Exception as ex:
        # SystemExit from sys.exit(0) is deliberately NOT caught here.
        print("Error: unable to start thread")
        print(ex)
main.py
Source:main.py
1#!/usr/bin/python32import time3import sys4import os5import random6import numpy as np7from execute.sequentialEvaluation import sequentialEvaluation #function8from execute.generateBatches import generateBatches #function9from execute.generateBehaviour import generateBehaviour #function10from execute.startHttpServer import startHttpServer #function11from execute.resultsVerification import resultsVerification #function12from batch.batch import Batch #class13from batchDefinition.aBatchDefinition import ABatchDefinition #class14def main2():15 start = time.time()16 sequentialEvaluation()17 end = time.time()18 print()19 print("Time: " + format(end - start, '.5f') + " s")20if __name__ == "__main__":21 #generateBatches()22 if len(sys.argv) == 2 and sys.argv[1] == "-generateBatches":23 generateBatches()24 if len(sys.argv) == 2 and sys.argv[1] == "-generateBehaviours":25 generateBehaviour()26 if len(sys.argv) == 2 and sys.argv[1] == "-startHttpServer":27 startHttpServer()28 if len(sys.argv) == 2 and sys.argv[1] == "-resultVerification":29 resultsVerification()30 if len(sys.argv) == 1:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!