Best Python code snippet using autotest_python
fh0a.py
Source:fh0a.py
...21 :param wait_time: çå¾
æ¶é´ï¼åä½ä¸ºç§22 """23 jsSleepWithCallbackEvery(24 wait_time * 1000, 50,25 lambda: self._receive_msg()26 )27 # startTime = time.time() * 100028 # endTime = time.time() * 100029 # last = 030 # while endTime - startTime < wait_time * 1000:31 # if (endTime - startTime) > (last * 50):32 # self._receive_msg()33 # last = last + 134 # endTime = time.time() * 100035 # pass36 def _split_state(self, acc: Dict, x: str) -> Dict:37 if ':' in x:38 p = x.split(':')39 acc[p[0]] = p[1]40 return acc41 def _receive_msg(self) -> None:42 """43 解æè¿åæ°æ®44 :return: None45 """46 msgs: List[str] = getBufMsgList().split('\n')47 for msg in msgs:48 m: List[str] = msg.split(' ')49 if len(m) >= 3:50 if m[1] == '0' and m[2] == 'status':51 states: str = m[3]52 st: Dict[str, Any] = reduce(53 self._split_state,54 states.split(';'),55 {}56 )57 if m[0] in self.uav_statement:58 self.uav_statement[m[0]].update(st)59 else:60 self.uav_statement[m[0]] = st61 # update `is_flying` state from h/lock_flag info62 if 'lock_flag' in self.uav_statement[m[0]]:63 self.uav_statement[m[0]]['is_flying'] = self.uav_statement[m[0]]['lock_flag']64 if 'loc_x' in self.uav_statement[m[0]]:65 self.uav_statement[m[0]]['x'] = self.uav_statement[m[0]]['loc_x']66 if 'loc_y' in self.uav_statement[m[0]]:67 self.uav_statement[m[0]]['y'] = self.uav_statement[m[0]]['loc_y']68 if 'high' in self.uav_statement[m[0]]:69 self.uav_statement[m[0]]['h'] = self.uav_statement[m[0]]['high']70 elif m[1] != '0':71 # cmd table72 cId = int(m[1]) - 173 if cId in self.cmd_table:74 tt = self.cmd_table[cId]75 self.cmd_table[cId] = (tt[0], msg)76 pass77 pass78 def _sendCmd(self, command: str, cmdId: int) -> bool:79 self.tag = self.tag + 180 # cmd table81 self.cmd_table[cmdId] = (command, None)82 self._receive_msg()83 return sendCmd(command)84 def _open(self, port: str) -> bool:85 """86 commandå½æ°ç¨äºè¿æ¥æ 人æº87 :param port:é£é¸¿0Aæ 人æºä¸ºç«¯å£å· COM3 ï¼TTæ 人æºä¸ºipå°å88 :return:è¿æ¥ç¶ææ¯å¦æåï¼æ¯åè¿åTrue,å¦åè¿åFalse89 """90 # if port in 
self.uav_statement:91 # return True92 # command = port + ' ' + str(self.tag * 2 + 1) + ' command'93 command = f"{port} {self.tag * 2 + 1} open"94 back = self._sendCmd(command, self.tag * 2 + 1)95 # return back96 return True97 # é£é¸¿çipå符串为端å£å·ï¼TTçipå符串为ipå°å98 def add_uav(self, port: str) -> None:99 """100 input_uavå½æ°ç¨äºæ·»å æ 人æº101 :param port:é£é¸¿0Aæ 人æºçipå符串为端å£å·ï¼TTæ 人æºçipå符串为ipå°å102 """103 if self._open(port):104 y = {'x': '', 'y': '', 'h': '', 'is_flying': False}105 self.uav_statement[port] = y106 self._receive_msg()107 def get_position(self, port: str) -> Tuple[str, str, str]:108 """109 get_positionå½æ°ç¨äºè·åæ 人æºå½åä½ç½®110 :param port: æ 人æºç«¯å£å·111 :return:h,x,y112 """113 self._receive_msg()114 if port in self.uav_statement:115 st: Dict[str, Any] = self.uav_statement[port]116 return st['x'], st['y'], st['h']117 h = x = y = ''118 return h, x, y119 def get_state(self, port: str) -> Union[Dict, None]:120 """121 get_positionå½æ°ç¨äºè·åæ 人æºå½åä½ç½®122 :param port: æ 人æºç«¯å£å·123 :return:h,x,y124 """125 self._receive_msg()126 if port in self.uav_statement:127 st: Dict[str, Any] = self.uav_statement[port]128 return st129 return None130 def is_tag_ok(self, port: str):131 """132 is_dot_ok å½æ°ç¨äºæ£æ¥æ 人æºå½åæ¯å¦å·²ç»æ£æµå°æå®äºç»´ç 133 :param port: æ 人æºç«¯å£å·134 :return: "0" or "1"135 """136 return self.uav_statement[port].get('is_tag_ok', '0')137 def is_dot_ok(self, port: str):138 """139 is_dot_ok å½æ°ç¨äºæ£æ¥æ 人æºå½åæ¯å¦å·²ç»æ£æµå°æå®é¢è²è²å æ å·²ç»æ£æµå°äº¤ç¹140 :param port: æ 人æºç«¯å£å·141 :return: "0" or "1"142 """143 return self.uav_statement[port].get('is_dot_ok', '0')144 def show_uav_list(self) -> None:145 """146 show_uav_listå½æ°ç¨äºæ¥çæææ 人æº147 :return:string148 """149 self._receive_msg()150 for (port, state) in self.uav_statement.items():151 print("port: {0} state: {1}".format(port, state))152 def _send_commond_without_return(self, command: str, cmdId: int) -> bool:153 return self._sendCmd(str(command), cmdId)154 def 
_send_commond_with_return(self, command: str, cmdId: int, timeout: int = RESPONSE_TIMEOUT) -> Union[str, None]:155 self._sendCmd(str(command), cmdId)156 timestamp = time.time()157 while 1:158 jsSleepWithCallbackEvery(159 200, 20,160 lambda: self._receive_msg()161 )162 if time.time() - timestamp > timeout:163 return None164 if self.cmd_table[cmdId][1] != None:165 continue166 else:167 return self.cmd_table[cmdId][1]168 def land(self, port: str) -> Literal[True]:169 """170 landå½æ°ç¨äºæ§å¶æ 人æºéè½171 :param port: æ 人æºç«¯å£å·172 """173 self._receive_msg()174 # if not self.uav_statement[port]['is_flying']:175 # return True176 command = f"{port} {self.tag * 2 + 1} land"177 back = self._send_commond_without_return(command, self.tag * 2 + 1)178 # back = self._send_commond_with_return(command, self.tag * 2 + 1, timeout = 20)179 self.uav_statement[port]['is_flying'] = False180 return True181 def emergency(self, port: str) -> Literal[True]:182 """183 emergency å½æ°ç¨äºæ§å¶æ 人æºç´§æ¥éè½184 :param port: æ 人æºç«¯å£å·185 """186 self._receive_msg()187 # if not self.uav_statement[port]['is_flying']:188 # return True189 command = f"{port} {self.tag * 2 + 1} emergency"190 back = self._send_commond_without_return(command, self.tag * 2 + 1)191 # back = self._send_commond_with_return(command, self.tag * 2 + 1, timeout = 20)192 self.uav_statement[port]['is_flying'] = False193 return True194 def takeoff(self, port: str, high: int) -> Literal[True]:195 """196 takeoffå½æ°ç¨äºæ§å¶æ 人æºèµ·é£197 :param port: æ 人æºç«¯å£å·198 :param high:èµ·é£é«åº¦ï¼åç±³ï¼199 """200 self._receive_msg()201 # if self.uav_statement[port]['is_flying']:202 # return True203 command = f"{port} {self.tag * 2 + 1} takeoff {high}"204 back = self._send_commond_without_return(command, self.tag * 2 + 1)205 # back = self._send_commond_with_return(command, self.tag * 2 + 1, timeout = 20)206 self.uav_statement[port]['is_flying'] = True207 return True208 def up(self, port: str, distance: int) -> bool:209 """210 up åä¸ç§»å¨211 
:param port: æ 人æºç«¯å£å·212 :param distance:移å¨è·ç¦»ï¼åç±³ï¼213 """214 self._receive_msg()215 if not self.uav_statement[port]['is_flying']:216 return False217 command = f"{port} {self.tag * 2 + 1} up {distance}"218 self._send_commond_without_return(command, self.tag * 2 + 1)219 return True220 def down(self, port: str, distance: int) -> bool:221 """222 down åä¸ç§»å¨223 :param port: æ 人æºç«¯å£å·224 :param distance:移å¨è·ç¦»ï¼åç±³ï¼225 """226 self._receive_msg()227 if not self.uav_statement[port]['is_flying']:228 return False229 command = f"{port} {self.tag * 2 + 1} down {distance}"230 self._send_commond_without_return(command, self.tag * 2 + 1)231 return True232 def forward(self, port: str, distance: int) -> bool:233 """234 forward åå移å¨235 :param port: æ 人æºç«¯å£å·236 :param distance:移å¨è·ç¦»ï¼åç±³ï¼237 """238 self._receive_msg()239 if not self.uav_statement[port]['is_flying']:240 return False241 command = f"{port} {self.tag * 2 + 1} forward {distance}"242 self._send_commond_without_return(command, self.tag * 2 + 1)243 return True244 def back(self, port: str, distance: int) -> bool:245 """246 back åå移å¨247 :param port: æ 人æºç«¯å£å·248 :param distance:移å¨è·ç¦»ï¼åç±³ï¼249 """250 self._receive_msg()251 if not self.uav_statement[port]['is_flying']:252 return False253 command = f"{port} {self.tag * 2 + 1} back {distance}"254 self._send_commond_without_return(command, self.tag * 2 + 1)255 return True256 def left(self, port: str, distance: int) -> bool:257 """258 left å左移å¨259 :param port: æ 人æºç«¯å£å·260 :param distance:移å¨è·ç¦»ï¼åç±³ï¼261 """262 self._receive_msg()263 if not self.uav_statement[port]['is_flying']:264 return False265 command = f"{port} {self.tag * 2 + 1} left {distance}"266 self._send_commond_without_return(command, self.tag * 2 + 1)267 return True268 def right(self, port: str, distance: int) -> bool:269 """270 right åå³ç§»å¨271 :param port: æ 人æºç«¯å£å·272 :param distance:移å¨è·ç¦»ï¼åç±³ï¼273 """274 self._receive_msg()275 if not 
self.uav_statement[port]['is_flying']:276 return False277 command = f"{port} {self.tag * 2 + 1} right {distance}"278 self._send_commond_without_return(command, self.tag * 2 + 1)279 return True280 def _move(self, port: str, direct: int, distance: int) -> bool:281 """282 move å½æ°ç¨äºæ§å¶æ 人æºç§»å¨283 :param port: æ 人æºç«¯å£å·284 :param direct:移å¨æ¹åï¼1ä¸2ä¸3å4å5å·¦6å³ï¼285 :param distance:移å¨è·ç¦»ï¼åç±³ï¼286 """287 self._receive_msg()288 if not self.uav_statement[port]['is_flying']:289 return False290 command = f"{port} {self.tag * 2 + 1} move {direct} {distance}"291 self._send_commond_without_return(command, self.tag * 2 + 1)292 return True293 def goto(self, port: str, x: int, y: int, h: int) -> bool:294 """295 goto å½æ°ç¨äºæ§å¶æ 人æºå°è¾¾æå®ä½ç½®296 :param port: æ 人æºç«¯å£å·297 :param x: xè½´æ¹åä½ç½®ï¼åç±³ï¼298 :param y: yè½´æ¹åä½ç½®ï¼åç±³ï¼299 :param h: é«åº¦ï¼åç±³ï¼300 """301 self._receive_msg()302 if not self.uav_statement[port]['is_flying']:303 return False304 command = f"{port} {self.tag * 2 + 1} goto {x} {y} {h}"305 self._send_commond_without_return(command, self.tag * 2 + 1)306 return True307 def flip(self, port: str, direction: str) -> bool:308 """309 flipå½æ°ç¨äºæ§å¶æ 人æºç¿»æ»310 :param port: æ 人æºç«¯å£å·311 :param direction: ç¿»æ»æ¹åï¼få bå lå·¦ rå³ï¼312 """313 self._receive_msg()314 if not self.uav_statement[port]['is_flying']:315 return False316 command = f"{port} {self.tag * 2 + 1} flip {direction} 1"317 self._send_commond_without_return(command, self.tag * 2 + 1)318 return True319 def rotate(self, port: str, degree: int) -> bool:320 """321 rotateå½æ°ç¨äºæ§å¶æ 人æºèªè½¬322 :param port: æ 人æºç«¯å£å·323 :param degree: èªè½¬æ¹åå大å°ï¼æ£æ°é¡ºæ¶éï¼è´æ°éæ¶éï¼åä½ä¸ºåº¦æ°ï¼324 """325 self._receive_msg()326 if not self.uav_statement[port]['is_flying']:327 return False328 command = f"{port} {self.tag * 2 + 1} rotate {degree}"329 self._send_commond_without_return(command, self.tag * 2 + 1)330 return True331 def cw(self, port: str, degree: int) -> bool:332 """333 cw 
æ§å¶æ 人æºé¡ºæ¶éèªè½¬334 :param port: æ 人æºç«¯å£å·335 :param degree: èªè½¬è§åº¦åº¦æ°336 """337 self._receive_msg()338 if not self.uav_statement[port]['is_flying']:339 return False340 command = f"{port} {self.tag * 2 + 1} cw {degree}"341 self._send_commond_without_return(command, self.tag * 2 + 1)342 return True343 def ccw(self, port: str, degree: int) -> bool:344 """345 ccw å½æ°ç¨äºæ§å¶æ 人æºéæ¶éèªè½¬346 :param port: æ 人æºç«¯å£å·347 :param degree: èªè½¬è§åº¦åº¦æ°348 """349 self._receive_msg()350 if not self.uav_statement[port]['is_flying']:351 return False352 command = f"{port} {self.tag * 2 + 1} ccw {degree}"353 self._send_commond_without_return(command, self.tag * 2 + 1)354 return True355 def speed(self, port: str, speed: int) -> bool:356 """357 speedå½æ°ç¨äºæ§å¶æ 人æºé£è¡é度358 :param port: æ 人æºç«¯å£å·359 :param speed: é£è¡é度ï¼0-200åç±³/ç§ï¼360 """361 self._receive_msg()362 # if not self.uav_statement[port]['is_flying']:363 # return False364 command = f"{port} {self.tag * 2 + 1} setSpeed {speed}"365 self._send_commond_without_return(command, self.tag * 2 + 1)366 return True367 def high(self, port: str, high: int) -> bool:368 """369 highç¨äºæ§å¶æ 人æºé£è¡é«åº¦370 :param port: æ 人æºç«¯å£å·371 :param high: é£è¡é«åº¦ï¼åç±³ï¼372 """373 self._receive_msg()374 if not self.uav_statement[port]['is_flying']:375 return False376 command = f"{port} {self.tag * 2 + 1} setHeight {high}"377 self._send_commond_without_return(command, self.tag * 2 + 1)378 return True379 def led(self, port: str, r: int, g: int, b: int) -> bool:380 """381 ledå½æ°æ§å¶æ 人æºç¯å
382 :param port: æ 人æºç«¯å£å·383 :param r: ç¯å
é¢è²Réé384 :param g: ç¯å
é¢è²Géé385 :param b: ç¯å
é¢è²Béé386 """387 self._receive_msg()388 # if not self.uav_statement[port]['is_flying']:389 # return False390 command = f"{port} {self.tag * 2 + 1} light {r} {g} {b}"391 self._send_commond_without_return(command, self.tag * 2 + 1)392 return True393 def bln(self, port: str, r: int, g: int, b: int) -> bool:394 """395 ledå½æ°æ§å¶æ 人æºç¯å
ï¼å¼å¸ç¯396 :param port: æ 人æºç«¯å£å·397 :param r: ç¯å
é¢è²Réé398 :param g: ç¯å
é¢è²Géé399 :param b: ç¯å
é¢è²Béé400 """401 self._receive_msg()402 # if not self.uav_statement[port]['is_flying']:403 # return False404 command = f"{port} {self.tag * 2 + 1} bln {r} {g} {b}"405 self._send_commond_without_return(command, self.tag * 2 + 1)406 return True407 def rainbow(self, port: str, r: int, g: int, b: int) -> bool:408 """409 ledå½æ°æ§å¶æ 人æºç¯å
ï¼ä¸å½©åæ¢410 :param port: æ 人æºç«¯å£å·411 :param r: ç¯å
é¢è²Réé412 :param g: ç¯å
é¢è²Géé413 :param b: ç¯å
é¢è²Béé414 """415 self._receive_msg()416 # if not self.uav_statement[port]['is_flying']:417 # return False418 command = f"{port} {self.tag * 2 + 1} rainbow {r} {g} {b}"419 self._send_commond_without_return(command, self.tag * 2 + 1)420 return True421 def mode(self, port: str, mode: int) -> bool:422 """423 modeå½æ°ç¨äºåæ¢é£è¡æ¨¡å¼424 :param port: æ 人æºç«¯å£å·425 :param mode:é£è¡æ¨¡å¼ï¼1常è§2巡线3è·é4åæºç¼éï¼426 """427 self._receive_msg()428 # if not self.uav_statement[port]['is_flying']:429 # return False430 command = f"{port} {self.tag * 2 + 1} airplane_mode {mode}"431 self._send_commond_without_return(command, self.tag * 2 + 1)432 return True433 # def _visionMode(self, port: str, mode: int) -> bool:434 # """435 # visionModeå½æ°ç¨äºè®¾ç½®è§è§å·¥ä½æ¨¡å¼436 # :param port: æ 人æºç«¯å£å·437 # :param mode:è§è§å·¥ä½æ¨¡å¼ï¼1ç¹æ£æµ2线æ£æµ3æ ç¾æ£æµ4äºç»´ç æ«æ5æ¡å½¢ç æ«æï¼438 # """439 # self._receive_msg()440 # if not self.uav_statement[port]['is_flying']:441 # return False442 # command = f"{port} {self.tag * 2 + 1} visionMode {mode}"443 # self._send_commond_without_return(command, self.tag * 2 + 1)444 # return True445 def color_detect(self, port: str, L_L: int, L_H: int, A_L: int, A_H: int, B_L: int, B_H: int) -> bool:446 """447 visionColorå½æ°ç¨äºè®¾ç½®è§è§å·¥ä½æ¨¡å¼ä¸ºè²åæ£æµ448 :param port: æ 人æºç«¯å£å·449 :param L_L: è²åLééçæä½æ£æµå¼450 :param L_H: è²åLééçæé«æ£æµæ¤451 :param A_L: è²åAééçæä½æ£æµæ¤452 :param A_H: è²åAééçæé«æ£æµå¼453 :param B_L: è²åBééçæä½æ£æµæ¤454 :param B_H: è²åBééçæé«æ£æµæ¤z455 """456 self._receive_msg()457 # if not self.uav_statement[port]['is_flying']:458 # return False459 command = f"{port} {self.tag * 2 + 1} colorDetect {L_L} {L_H} {A_L} {A_H} {B_L} {B_H}"460 self._send_commond_without_return(command, self.tag * 2 + 1)461 return True462 def color_detect_label(self, port: str, label: str) -> bool:463 """464 visionColorå½æ°ç¨äºè®¾ç½®è§è§å·¥ä½æ¨¡å¼ä¸ºè²åæ£æµ465 :param port: æ 人æºç«¯å£å·466 :param label: é¢å®ä¹è²å½©æ ç¾å467 """468 self._receive_msg()469 # if 
not self.uav_statement[port]['is_flying']:470 # return False471 command = f"{port} {self.tag * 2 + 1} colorDetectLabel {label}"472 self._send_commond_without_return(command, self.tag * 2 + 1)473 return True474 # def patrol_line_direction(self, port: str, direction: int) -> bool:475 # """476 # patrol_line_directionå½æ°ç¨äºåæ¢æ 人æºå·¡çº¿æ¹å477 # :param port: æ 人æºç«¯å£å·478 # :param direction:巡线æ¹åï¼1å2å3å·¦4å³ï¼479 # """480 # self._receive_msg()481 # if not self.uav_statement[port]['is_flying']:482 # return False483 # command = f"{port} {self.tag * 2 + 1} patrol_line_direction {direction}"484 # self._send_commond_without_return(command, self.tag * 2 + 1)485 # return True486 # def identify_label(self, port: str, id: int) -> bool:487 # """488 # identify_labelå½æ°ç¨äºæå®è¯å«æ个æ ç¾å·489 # :param port: æ 人æºç«¯å£å·490 # :param id:ç®æ æ ç¾å·ï¼è®¾ç½®ååªè¯å«è¯¥å·æ ç¾491 # """492 # self._receive_msg()493 # if not self.uav_statement[port]['is_flying']:494 # return False495 # command = f"{port} {self.tag * 2 + 1} identify_label {id}"496 # self._send_commond_without_return(command, self.tag * 2 + 1)497 # return True498 #499 # def toward_move_label(self, port: str, direction: int, distance: int, id: int) -> bool:500 # """501 # toward_move_labelå½æ°æå®æ 人æºç§»å¨æè·ç¦»å¯»æ¾æå·æ ç¾502 # :param port: æ 人æºç«¯å£å·503 # :param direction:移å¨æ¹åï¼1ä¸2ä¸3å4å5å·¦6å³ï¼504 # :param distance:移å¨è·ç¦»ï¼åç±³ï¼505 # :param id:ç®æ æ ç¾å·ï¼ç§»å¨è¿ç¨ä¸çå°è¯¥æ ç¾ä¼èªå¨æ¬åå¨ä¸æ¹506 # """507 # self._receive_msg()508 # if not self.uav_statement[port]['is_flying']:509 # return False510 # command = f"{port} {self.tag * 2 + 1} toward_move_label {direction} {distance} {id}"511 # self._send_commond_without_return(command, self.tag * 2 + 1)512 # return True513 #514 # def move_up_label(self, port: str, distance: int, id: int) -> bool:515 # """516 # move_up_label å½æ°æå®æ 人æºåä¸ç§»å¨æè·ç¦»å¯»æ¾æå·æ ç¾517 # :param port: æ 人æºç«¯å£å·518 # :param distance:移å¨è·ç¦»ï¼åç±³ï¼519 # :param id:ç®æ æ 
ç¾å·ï¼ç§»å¨è¿ç¨ä¸çå°è¯¥æ ç¾ä¼èªå¨æ¬åå¨ä¸æ¹520 # """521 # self._receive_msg()522 # if not self.uav_statement[port]['is_flying']:523 # return False524 # command = f"{port} {self.tag * 2 + 1} move_up_label {distance} {id}"525 # self._send_commond_without_return(command, self.tag * 2 + 1)526 # return True527 #528 # def move_down_label(self, port: str, distance: int, id: int) -> bool:529 # """530 # move_down_label å½æ°æå®æ 人æºåä¸ç§»å¨æè·ç¦»å¯»æ¾æå·æ ç¾531 # :param port: æ 人æºç«¯å£å·532 # :param distance:移å¨è·ç¦»ï¼åç±³ï¼533 # :param id:ç®æ æ ç¾å·ï¼ç§»å¨è¿ç¨ä¸çå°è¯¥æ ç¾ä¼èªå¨æ¬åå¨ä¸æ¹534 # """535 # self._receive_msg()536 # if not self.uav_statement[port]['is_flying']:537 # return False538 # command = f"{port} {self.tag * 2 + 1} move_down_label {distance} {id}"539 # self._send_commond_without_return(command, self.tag * 2 + 1)540 # return True541 #542 # def move_forward_label(self, port: str, distance: int, id: int) -> bool:543 # """544 # move_forward_label å½æ°æå®æ 人æºåå移å¨æè·ç¦»å¯»æ¾æå·æ ç¾545 # :param port: æ 人æºç«¯å£å·546 # :param distance:移å¨è·ç¦»ï¼åç±³ï¼547 # :param id:ç®æ æ ç¾å·ï¼ç§»å¨è¿ç¨ä¸çå°è¯¥æ ç¾ä¼èªå¨æ¬åå¨ä¸æ¹548 # """549 # self._receive_msg()550 # if not self.uav_statement[port]['is_flying']:551 # return False552 # command = f"{port} {self.tag * 2 + 1} move_forward_label {distance} {id}"553 # self._send_commond_without_return(command, self.tag * 2 + 1)554 # return True555 #556 # def move_back_label(self, port: str, distance: int, id: int) -> bool:557 # """558 # move_back_label å½æ°æå®æ 人æºåå移å¨æè·ç¦»å¯»æ¾æå·æ ç¾559 # :param port: æ 人æºç«¯å£å·560 # :param distance:移å¨è·ç¦»ï¼åç±³ï¼561 # :param id:ç®æ æ ç¾å·ï¼ç§»å¨è¿ç¨ä¸çå°è¯¥æ ç¾ä¼èªå¨æ¬åå¨ä¸æ¹562 # """563 # self._receive_msg()564 # if not self.uav_statement[port]['is_flying']:565 # return False566 # command = f"{port} {self.tag * 2 + 1} move_back_label {distance} {id}"567 # self._send_commond_without_return(command, self.tag * 2 + 1)568 # return True569 #570 # def move_left_label(self, port: str, distance: int, 
id: int) -> bool:571 # """572 # move_left_label å½æ°æå®æ 人æºå左移å¨æè·ç¦»å¯»æ¾æå·æ ç¾573 # :param port: æ 人æºç«¯å£å·574 # :param distance:移å¨è·ç¦»ï¼åç±³ï¼575 # :param id:ç®æ æ ç¾å·ï¼ç§»å¨è¿ç¨ä¸çå°è¯¥æ ç¾ä¼èªå¨æ¬åå¨ä¸æ¹576 # """577 # self._receive_msg()578 # if not self.uav_statement[port]['is_flying']:579 # return False580 # command = f"{port} {self.tag * 2 + 1} move_left_label {distance} {id}"581 # self._send_commond_without_return(command, self.tag * 2 + 1)582 # return True583 #584 # def move_right_label(self, port: str, distance: int, id: int) -> bool:585 # """586 # move_right_label å½æ°æå®æ 人æºåå³ç§»å¨æè·ç¦»å¯»æ¾æå·æ ç¾587 # :param port: æ 人æºç«¯å£å·588 # :param distance:移å¨è·ç¦»ï¼åç±³ï¼589 # :param id:ç®æ æ ç¾å·ï¼ç§»å¨è¿ç¨ä¸çå°è¯¥æ ç¾ä¼èªå¨æ¬åå¨ä¸æ¹590 # """591 # self._receive_msg()592 # if not self.uav_statement[port]['is_flying']:593 # return False594 # command = f"{port} {self.tag * 2 + 1} move_right_label {distance} {id}"595 # self._send_commond_without_return(command, self.tag * 2 + 1)596 # return True597 # def obstacle_range(self, port: str, distance: int) -> bool:598 # """599 # obstacle_rangeå½æ°ç¨äºè®¾ç½®éç¢ç©æ£æµèå´600 # :param port: æ 人æºç«¯å£å·601 # :param distance:æ£æµèå´ï¼åç±³ï¼602 # """603 # self._receive_msg()604 # if not self.uav_statement[port]['is_flying']:605 # return False606 # command = f"{port} {self.tag * 2 + 1} obstacle_range {distance}"607 # self._send_commond_without_return(command, self.tag * 2 + 1)608 # return True609 # def solenoid(self, port: str, switch: int) -> bool:610 # """611 # solenoidå½æ°ç¨äºæ 人æºçµç£éæ§å¶612 # :param port: æ 人æºç«¯å£å·613 # :param switch:çµç£éæ§å¶ï¼0å
³é1æå¼ï¼614 # """615 # self._receive_msg()616 # if not self.uav_statement[port]['is_flying']:617 # return False618 # command = f"{port} {self.tag * 2 + 1} solenoid {switch}"619 # self._send_commond_without_return(command, self.tag * 2 + 1)620 # return True621 # def steering(self, port: str, angle: int) -> bool:622 # """623 # steeringå½æ°ç¨äºæ 人æºèµæºæ§å¶624 # :param port: æ 人æºç«¯å£å·625 # :param angle:èµæºè§åº¦ï¼+/-90度ï¼626 # """627 # self._receive_msg()628 # if not self.uav_statement[port]['is_flying']:629 # return False630 # command = f"{port} {self.tag * 2 + 1} steering {angle}"631 # self._send_commond_without_return(command, self.tag * 2 + 1)632 # return True633 def stop(self, port: str) -> bool:634 return self.hover(port)635 def hover(self, port: str) -> bool:636 """637 hover å½æ°ç¨äºæ§å¶æ 人æºæ¬å638 :param port: æ 人æºç«¯å£å·639 """640 self._receive_msg()641 if not self.uav_statement[port]['is_flying']:642 return False643 command = f"{port} {self.tag * 2 + 1} hover"644 self._send_commond_without_return(command, self.tag * 2 + 1)645 return True646 # def end(self) -> None:647 # """648 # endå½æ°ç¨äºéè½ææç¼éæ 人æº649 # """650 # for (port, uav) in self.uav_statement.items():651 # if uav['is_flying']:652 # self.land(port)653 def set_single_setting(self, port: str, mode: int, channel: int, address: int):654 """655 设置åæºè®¾ç½®656 :param port: æ 人æºç«¯å£å·657 :param mode: 0ä½é1ä¸é2é«é658 :param channel: éä¿¡ä¿¡é0~125659 :param address: éä¿¡å°å660 :return:661 """662 self._receive_msg()663 command = f"{port} {self.tag * 2 + 1} single_setting {mode} {channel} {address}"664 self._send_commond_without_return(command, self.tag * 2 + 1)665 pass666 def set_multiply_setting(self, port: str, mode: int, airplaneNumber: int, channel: int, address: int):667 """668 设置å¤æºæ¨¡å¼è®¾ç½®669 :param port: æ 人æºç«¯å£å·670 :param mode: 0åæºæ¨¡å¼1ç¼é模å¼671 :param airplaneNumber: é£æºç¼å·672 :param channel: éä¿¡ä¿¡é0~125673 :param address: éä¿¡å°å674 :return:675 """676 self._receive_msg()677 
command = f"{port} {self.tag * 2 + 1} multiply_setting {mode} {airplaneNumber} {channel} {address}"678 self._send_commond_without_return(command, self.tag * 2 + 1)679 return True680 def read_multi_setting(self, port: str):681 """682 读åå¤æºæ¨¡å¼è®¾ç½®683 :param port: æ 人æºç«¯å£å·684 :return:685 """686 self._receive_msg()687 command = f"{port} {self.tag * 2 + 1} read_multi_setting"688 self._send_commond_without_return(command, self.tag * 2 + 1)689 return True690 def read_single_setting(self, port: str):691 """692 读ååæºæ¨¡å¼è®¾ç½®693 :param port: æ 人æºç«¯å£å·694 :return:695 """696 self._receive_msg()697 command = f"{port} {self.tag * 2 + 1} read_single_setting"698 self._send_commond_without_return(command, self.tag * 2 + 1)699 return True700 def read_hardware_setting(self, port: str):701 """702 读å硬件信æ¯703 :param port: æ 人æºç«¯å£å·704 :return:705 """706 self._receive_msg()707 command = f"{port} {self.tag * 2 + 1} read_hardware_setting"708 self._send_commond_without_return(command, self.tag * 2 + 1)709 return True710 def cleanup(self):711 """712 cleanupå½æ°ç¨äºæ¸
çå
é¨ç¶æ表713 :return:714 """715 self.uav_statement = {}716 self.cmd_table = {}717 # def __del__(self):...
sock_chat.py
Source:sock_chat.py
...27 def _send_msg(self, message, remote_socket):28 remote_socket.send(bytes(message,"utf-8"))29 logging.info(f"{self} sent message: {message}")30 31 def _receive_msg(self, remote_socket):32 message = remote_socket.recv(RECV_SIZE).decode("utf-8")33 logging.info(f"{self} received message: {message}")34 return message35class Client(Socket):36 def __init__(self, id) -> None:37 self.id = id38 self.socket = self._client_socket()39 greeting = self._receive_msg(self.socket)40 print(greeting)41 def Send_msg(self,receiver_id, message_body):42 message_header = f"<{self.id}#{receiver_id}>"43 message = message_header + message_body44 request = f'Send_msg("{message}")'45 46 self._send_msg(request, self.socket)47 48 def Check_msg(self): #-> str:49 messages = self._server_fetch_msg()50 logging.info(f"The messages are: {messages}")51 return messages52 def _server_fetch_msg(self):53 request = f'Check_msg({self.id})'54 self._send_msg(request, self.socket)55 messages = []56 msg = " "57 # while msg != "":58 msg = self._receive_msg(self.socket)59 messages.append(msg)60 return messages61class Server(Socket):62 def __init__(self) -> None:63 self.pending_msg = {}64 # self.clients = []65 # self.conection_ids = []66 # conversation = ()67 self.socket = self._server_socket()68 listening = threading.Thread(target=self._accept_conections)69 listening.start()70 def _accept_conections(self):71 while True:72 client_socket, address = self.socket.accept()73 print(f"Connection from {address} has been established!")74 self._send_msg("Welcome to the server!", client_socket)75 # msg = self._receive_msg(clientsocket)76 # self.clients.append(client_socket)77 handler_thread = threading.Thread(target=self._handle_connection, args=(client_socket,))78 handler_thread.start()79 def _handle_connection(self, client_sock):80 while True:81 try:82 message = self._receive_msg(client_sock)83 self._handle_msg(message, client_sock)84 except:85 logging.error("some error!")86 client_sock.close()87 break88 89 def 
_handle_msg(self, message, client_sock):90 # handle = {91 # "Check_msg" : self._fetch_msg,92 # "Send_msg" : self._receive_chat93 # }94 logging.info(f"message to handle: {message}")95 request, body = message[:-1].split("(")96 # handle[request](body)...
client.py
Source:client.py
...30 super().__init__()31 self._socket_host = socket_host32 self._socket_port = socket_port33 self.server_url = urllib.parse.urlsplit(f"http://{host}:{port}/")34 async def _receive_msg(self):35 reader, writer = await asyncio.open_connection(self._socket_host, self._socket_port)36 data = await reader.read(100)37 msg = NMEASentence(data.decode())38 return msg 39 async def send_message(self):40 """Send messages from queue"""41 url = self.server_url42 msg = await self._receive_msg()43# if url.scheme == 'https':44# reader, writer = await asyncio.open_connection(45# url.hostname, 443, ssl=True)46# else:47 reader, writer = await asyncio.open_connection(48 url.hostname, url.port)49 data = json.dumps(msg.as_dict())50 query = (51 f"POST {url.path or '/'} HTTP/1.0\r\n"52 f"Host: {url.hostname}\r\n"53 f"User-Agent: python/requests2.0.1\r\n"54 f"Content-Length: {len(data)}\r\n"55 f"\r\n{data}\r\n"56 )...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!