Best Python code snippet using playwright-python
robot.py
Source:robot.py
...28 try:29 return self.get_robot_mode() != RobotState.DISCONNECTED30 except grpc.RpcError:31 return False32 def _sync(self):33 if self._sync_flag:34 self.sync()35 def sync(self) -> None:36 """37 åæ¥ï¼çå¾
æå®å½ä»¤æ§è¡å®æ38 """39 try:40 self.rcs.Sync(Empty())41 except grpc.RpcError:42 pass43 def sleep(self, time: int) -> None:44 """45 çå¾
46 :param time: çå¾
æ¶é´,åä½æ¯«ç§47 """48 try:49 self.rcs.Sleep(rc.SleepRequest(time=time))50 except grpc.RpcError:51 pass52 def start_sys(self) -> None:53 """54 å¯å¨55 """56 # self._sync()57 self.rcs.StartSys(Empty())58 def stop_sys(self) -> None:59 """60 å
³é61 """62 # self._sync()63 self.rcs.StopSys(Empty())64 def powerdown(self) -> None:65 """66 å
³éçµæº67 """68 # self._sync()69 self.rcs.PowerDown(Empty())70 def stop(self) -> None:71 """72 åæ¢ç¨åº73 """74 # self._sync()75 self.rcs.Stop(Empty())76 def estop(self) -> None:77 """78 æ¥å79 """80 # self._sync()81 self.rcs.EStop(Empty())82 def teach_mode(self) -> None:83 """84 å¼å¯ç¤ºæ模å¼85 """86 self._sync()87 self.rcs.TeachMode(Empty())88 def end_teach_mode(self) -> None:89 """90 å
³é示æ模å¼91 """92 self._sync()93 self.rcs.EndTeachMode(Empty())94 def resume(self) -> None:95 """96 æ¢å¤æºå¨äºº97 """98 # self._sync()99 self.rcs.Resume(Empty())100 def pause(self) -> None:101 """102 æåæºå¨äºº103 """104 # self._sync()105 self.rcs.Pause(Empty())106 def get_robot_mode(self) -> RobotState:107 """108 è·åæºå¨äººç¶æ109 :return: æºå¨äººç¶æ110 """111 self._sync()112 res = self.rcs.GetRobotMode(Empty())113 return RobotState(res.mode)114 def get_velocity_factor(self) -> float:115 """116 è·åé度å å117 :return: é度å å118 """119 self._sync()120 res = self.rcs.GetVelocityFactor(Empty())121 return res.value122 def set_velocity_factor(self, factor: float) -> None:123 """124 设置é度å å125 :param factor: é度å å126 """127 self._sync()128 self.rcs.SetVelocityFactor(rc.Factor(value=factor))129 self._sync()130 def set_gravity(self, x: float = 0, y: float = 0, z: float = -9.8) -> None:131 """132 设置éå133 :param x: éåX134 :param y: éåY135 :param z: éåZ136 """137 if type(x) is tuple or type(x) is list:138 y = x[1]139 z = x[2]140 x = x[0]141 self._sync()142 self.rcs.SetGravity(msg.Coordinate(x=x, y=y, z=z))143 def get_gravity(self) -> (float, float, float):144 """145 è·åéå146 :return: éåï¼ç±»åï¼tupleï¼(x,y,z)147 """148 self._sync()149 res = self.rcs.GetGravity(Empty())150 return (res.x, res.y, res.z)151 def set_payload(self, x: float = 0, y: float = 0, z: float = -9.8, mass: float = 0) -> None:152 """153 设置è´è·154 :param x: è´è·X155 :param y: è´è·Y156 :param z: è´è·Z157 :param mass: è´è·çè´¨é158 """159 if type(x) is tuple:160 if type(y) is not tuple and type(x[0]) is tuple:161 y = x[1]162 x = x[0]163 mass = y164 z = x[2]165 y = x[1]166 x = x[0]167 self._sync()168 self.rcs.SetPayload(msg.Payload(169 mass=mass, cog=msg.Coordinate(x=x, y=y, z=z)))170 def get_payload(self) -> ((float, float, float), float):171 """172 è·åè´è·173 :return: è´è·ä¿¡æ¯ï¼ç±»åï¼tupleï¼((x, y, z), è´è·çè´¨é)174 """175 self._sync()176 res = self.rcs.GetPayload(Empty())177 return ((res.cog.x, res.cog.y, res.cog.z), 
res.mass)178 def set_payload_mass(self, mass: float) -> None:179 """180 设置è´è·çè´¨é181 :param mass: è´è·çè´¨é182 """183 self._sync()184 self.rcs.SetPayloadMass(msg.PayloadMass(mass=mass))185 def get_payload_mass(self) -> float:186 """187 è·åè´è·çè´¨é188 :return: è´è·çè´¨é189 """190 self._sync()191 res = self.rcs.GetPayloadMass(Empty())192 return res.mass193 def set_payload_cog(self, x: float = 0, y: float = 0, z: float = 0) -> None:194 """195 设置è´è·çè´¨å¿196 :param x: è´¨å¿X197 :param y: è´¨å¿Y198 :param z: è´¨å¿Z199 """200 if type(x) is tuple or type(x) is list:201 y = x[1]202 z = x[2]203 x = x[0]204 self._sync()205 self.rcs.SetPayloadCog(msg.PayloadCog(206 cog=msg.Coordinate(x=x, y=y, z=z)))207 def get_payload_cog(self) -> (float, float, float):208 """209 è·åè´è·çè´¨å¿210 :return: è´¨å¿ï¼ç±»åï¼tupleï¼(x,y,z)211 """212 self._sync()213 res = self.rcs.GetPayloadCog(Empty())214 return res.cog.x, res.cog.y, res.cog.z215 def set_tcp(self, x: float = 0, y: float = 0, z: float = 0, rz: float = 0, ry: float = 0, rx: float = 0) -> None:216 """217 设置工å
·ä¸å¿ç¹ï¼TCPï¼åæ ï¼åæ å¼ç¸å¯¹äºå·¥å
·åæ ç³»ã218 :param x: TCP åæ x219 :param y: TCP åæ y220 :param z: TCP åæ z221 :param rz: æ转è§åº¦ rz222 :param ry: æ转è§åº¦ ry223 :param rx: æ转è§åº¦ rx224 """225 self._sync()226 tcp = CartesianPose(x, y, z, rz, ry, rx)227 self.rcs.SetTcp(tcp._to_PR())228 def get_tcp(self) -> CartesianPose:229 """230 è·åTCP231 :return: TCPåæ 232 """233 self._sync()234 res = self.rcs.GetTcp(Empty())235 return CartesianPose(res)236 def get_claw_aio(self, pin: str) -> float:237 """238 è·åæçªåæ°239 :param pin: 'force'ï¼æçªåï¼'weight'ï¼æçªéé240 :return: æ ¹æ®åæ°ï¼è·åæçªåæè
éé241 示ä¾ï¼242 >>> claw_force = self.get_claw_aio("force")243 >>> claw_weight = self.get_claw_aio("weight")244 """245 self._sync()246 pin = pin.lower()247 if pin == 'force':248 res = self.rcs.GetClawHoldOn(Empty())249 return res.hold_on250 elif pin == 'weight':251 res = self.rcs.GetClawWeight(Empty())252 return res.weight253 else: # pin == 'amplitude':254 res = self.rcs.GetClawAmplitude(Empty())255 return res.amplitude256 def set_claw_aio(self, pin: str, value: float = 0) -> None:257 """258 设置æçªåæ°259 :param pin: 'force'ï¼æçªåï¼'weight'ï¼æçªéé260 :param value: å¼261 示ä¾ï¼262 >>> self.set_claw_aio("force",0)263 >>> self.set_claw_aio("weight",1)264 """265 self._sync()266 pin = pin.lower()267 if pin == 'force':268 self.rcs.SetClawForce(rc.Force(force=value))269 else: # pin == 'amplitude':270 self.rcs.SetClawAmplitude(rc.Amplitude(amplitude=value))271 def set_claw(self, force: float = 0, amplitude: float = 0) -> None:272 """273 设置æçª274 :param force: æçªå275 :param amplitude: æçªéé276 """277 self._sync()278 self.rcs.SetClawForce(rc.Force(force=force))279 self.rcs.SetClawAmplitude(rc.Amplitude(amplitude=amplitude))280 def movej(self, p: object, a: int = 0, v: int = 0, t: int = 0, r: int = 0, is_joint=None) -> None:281 """282 线æ§ç§»å¨ï¼å
³è空é´ï¼283 :p: ä¼ å
¥JointPoseæCartesianPoseï¼JointPose` å
³èä½ç½®ï¼`CartesianPose` 空é´ä½ç½®ï¼å°éè¿è¿å¨å¦å解转为å
³èä½ç½®ï¼284 :a: 主轴çå
³èå é度 (rad/s)285 :v: 主轴çå
³èé度 (rad/s)286 :t: è¿å¨æ¶é´ (s)287 :r: 交èåå¾ (m)288 :is_joint: å·²å¼ç¨289 示ä¾ï¼290 >>> self.movej(JointPose(0, -0.7853981633974483, 1.5707963267948966, -0.7853981633974483, 1.5707963267948966, 0),1.23,1.23)291 """292 if type(p) is not CartesianPose and type(p) is not JointPose:293 raise Exception("è¯·ä¼ å
¥ CartesianPose æ JointPose")294 is_joint = getattr(p, 'is_joint', True)295 if not hasattr(p, 'pos'):296 p = JointPose(*p)297 req = rc.MoveJRequest(298 joint_pose_to=list(p.pos),299 pose_is_joint_angle=is_joint,300 acceleration=a,301 velocity=v,302 time=t,303 blend_radius=r304 )305 if type(p) is CartesianPose:306 p._base_set_PR(req.pose_base)307 self.rcs.MoveJ(req)308 def movel(self, p: object, a: int = 0, v: int = 0, t: int = 0, r: int = 0, is_joint: bool = None) -> None:309 """310 线æ§ç§»å¨ï¼å·¥å
·ç©ºé´ï¼311 :p: ä¼ å
¥JointPoseæCartesianPoseï¼JointPose` å
³èä½ç½®ï¼`CartesianPose` 空é´ä½ç½®ï¼å°éè¿è¿å¨å¦å解转为å
³èä½ç½®ï¼312 :a: è½´çå
³èå é度 (rad/s)313 :v: 主轴çå
³èé度 (rad/s)314 :t: è¿å¨æ¶é´ (s)315 :r: 交èåå¾ (m)316 :is_joint: å·²å¼ç¨317 示ä¾ï¼318 >>> self.movel(JointPose(0, -0.7853981633974483, 1.5707963267948966, -0.7853981633974483, 1.5707963267948966, 0),1.23,1.23)319 """320 if type(p) is not CartesianPose and type(p) is not JointPose:321 raise Exception("è¯·ä¼ å
¥ CartesianPose æ JointPose")322 is_joint = getattr(p, 'is_joint', False)323 if not hasattr(p, 'pos'):324 p = CartesianPose(*p)325 req = rc.MoveLRequest(326 pose_to=list(p.pos),327 pose_is_joint_angle=is_joint,328 acceleration=a,329 velocity=v,330 time=t,331 blend_radius=r332 )333 if type(p) is CartesianPose:334 p._base_set_PR(req.pose_base)335 self.rcs.MoveL(req)336 def movec(self, via: object, p: object, rad: int = 0, a: int = 0, v: int = 0, t: int = 0, r: int = 0,337 is_joint: bool = None) -> None:338 """339 å弧移å¨ï¼å·¥å
·ç©ºé´ï¼340 :param via: éç»ä½ç½®ãä¼ å
¥JointPoseæCartesianPoseï¼JointPose` å
³èä½ç½®ï¼`CartesianPose` 空é´ä½ç½®ï¼å°éè¿è¿å¨å¦å解转为å
³èä½ç½®ï¼341 :param p: ç®æ ä½ç½®ãå
¥JointPoseæCartesianPoseï¼JointPose` å
³èä½ç½®ï¼`CartesianPose` 空é´ä½ç½®ï¼å°éè¿è¿å¨å¦å解转为å
³èä½ç½®ï¼342 :param rad: è·¯å¾å弧ç弧度 (rad)343 :param a: å·¥å
·ç©ºé´å é度 (m/s2)344 :param v: å·¥å
·ç©ºé´é度 (m/s)345 :param t: è¿å¨æ¶é´ (s)346 :param r: 交èåå¾ (m)347 :param is_joint: å·²å¼ç¨348 示ä¾ï¼349 >>> self.movec(JointPose(0.2, 0.5, 0.4, 0, 0, 1.57),CartesianPose(0.1,0.2,0.2,0.3,0.1,0.2),0,1,0.2,0)350 """351 if type(p) is not CartesianPose and type(p) is not JointPose:352 raise Exception("påæ°å¿
é¡»æ¯ CartesianPose æ JointPose ç±»å")353 if type(via) is not CartesianPose and type(via) is not JointPose:354 raise Exception("via åæ° å¿
é¡»æ¯ CartesianPose æ JointPoseç±»å")355 if not hasattr(p, 'pos'):356 p = CartesianPose(*p)357 if not hasattr(via, 'pos'):358 via = CartesianPose(*via)359 req = rc.MoveCRequest(360 pose_via=list(via.pos),361 pose_via_is_joint=getattr(via, 'is_joint', True),362 pose_to=list(p.pos),363 pose_to_is_joint=getattr(p, 'is_joint', True),364 acceleration=a,365 velocity=v,366 time=t,367 blend_radius=r,368 rad=rad369 )370 if type(p) is CartesianPose:371 p._base_set_PR(req.pose_base)372 self.rcs.MoveC(req)373 def stop_move(self) -> None:374 """375 åæ¢å½å移å¨376 """377 self.rcs.StopMove(Empty())378 def move_pvat(self, p: list, v: list, a: list, t: float) -> None:379 """380 æå®ä½ç½®ãé度ãå é度ãæ¶é´ç伺æ移å¨381 :param p: å
³èä½ç½®å表 (rad)ï¼ç±»åï¼list[float]382 :param v: å
³èé度å表 (rad/s)ï¼ç±»åï¼list[float]383 :param a: å
³èå é度å表 (rad/s^2)ï¼ç±»åï¼list[float]384 :param t: æ»è¿å¨æ¶é´ (s)385 """386 self.rcs.MovePVAT(rc.PVATRequest(duration=t, q=p, v=v, acc=a))387 def move_pvats(self, pvt_iter: list) -> None:388 """389 move_pvatçæµçæ¬390 @param pvt_iter: ç±»åï¼list[PVAT]391 @type pvt_iter: list[PVAT]392 """393 self.rcs.MovePVATStream(394 (rc.PVATRequest(duration=s.duration, q=s.q, v=s.v, acc=s.acc) for s in pvt_iter))395 def move_pvt(self, p: list, v: list, t: float) -> None:396 """397 æå®ä½ç½®ãé度ãæ¶é´ç伺æ移å¨, å é度å°èªå¨è®¡ç®ã398 :param p: å
³èä½ç½®å表 (rad)ï¼ç±»åï¼list[float],399 :param v: å
³èé度å表 (rad/s)ï¼ç±»åï¼list[float],400 :param t: æ»è¿å¨æ¶é´ (s)401 """402 self.rcs.MovePVT(rc.PVATRequest(duration=t, q=p, v=v))403 def move_pvts(self, pvt_iter: list):404 """405 æå®ä½ç½®ãé度ãæ¶é´ç伺æ移å¨, å é度å°èªå¨è®¡ç®ã406 :param pvt_iter: ç±»åï¼list[PVAT]407 :return:408 """409 self.rcs.MovePVTStream(410 (rc.PVATRequest(duration=s.duration, q=s.q, v=s.v) for s in pvt_iter))411 def move_pt(self, p: list, t: float) -> None:412 """413 æå®ä½ç½®åæ¶é´ç伺æ移å¨,é度åå é度å°èªå¨è®¡ç®ã414 :param p: å
³èä½ç½®å表 (rad)ï¼ç±»åï¼list[float]415 :param t: æ»è¿å¨æ¶é´ (s)416 """417 self.rcs.MovePT(rc.PVATRequest(duration=t, q=p))418 def move_pts(self, pt_iter: list) -> None:419 """420 æå®ä½ç½®åæ¶é´ç伺æ移å¨,é度åå é度å°èªå¨è®¡ç®ã421 :param pt_iter: ç±»åï¼list[PVAT]422 :return:423 """424 self.rcs.MovePTStream(425 (rc.PVATRequest(duration=s.duration, q=s.q) for s in pt_iter))426 def movej_until(self, p, a=0, v=0, t=0, cb=None) -> None:427 """428 todo: å¾
å®ç°429 """430 pass431 def movej_until_rt(self, p, a=0, v=0, t=0, logic='AND', io={}, cb=None) -> None:432 """433 todo: å¾
å®ç°434 """435 pass436 def movel_until(self, p, a=0, v=0, t=0, cb=None) -> None:437 """438 todo: å¾
å®ç°439 """440 pass441 def movel_until_rt(self, p, a=0, v=0, t=0, logic='AND', io={}, cb=None) -> None:442 """443 todo: å¾
å®ç°444 """445 pass446 def movec_until(self, via, p, rad=0, a=0, v=0, t=0, cb=None) -> None:447 """448 todo: å¾
å®ç°449 """450 pass451 def movec_until_rt(self, via, p, rad=0, a=0, v=0, t=0, logic='AND', io={}, cb=None) -> None:452 """453 todo: å¾
å®ç°454 """455 pass456 def kinematics_forward(self, *p: list) -> CartesianPose:457 """458 æºå¨äººæ£è§£459 :param p: å
³èä½ç½®ï¼ç±»åï¼list[float]460 :return: 空é´ä½ç½®461 示ä¾ï¼462 >>> certesianPose = self.kinematics_forward(JointPose(0, -0.5, math.pi / 6, 0, 0, 0))463 """464 j = JointPose(*p)465 res = self.rcs.KinematicsForward(j._to_Joint())466 return CartesianPose(*res.vector)467 def kinematics_inverse(self, *p: list):468 """469 æºå¨äººå解470 :param p: 空é´ä½ç½®ï¼ç±»åï¼list[float]471 :return: å
³èä½ç½®472 示ä¾ï¼473 >>> certesianPose = self.kinematics_forward(JointPose(0, -0.5, math.pi / 6, 0, 0, 0))474 >>> jointPose = self.kinematics_inverse(certesianPose)475 """476 j = CartesianPose(*p)477 res = self.rcs.KinematicsInverse(j._to_Vector())478 return JointPose(*res.joints)479 def pose_times(self) -> None:480 """481 todo: å¾
å®ç°482 """483 pass484 def pose_inverse(self) -> None:485 """486 todo: å¾
å®ç°487 """488 pass489 def get_actual_joint_positions(self) -> JointPose:490 """491 è·åå®é
å
³èä½ç½®492 :returns: å
³èä½ç½®493 """494 self._sync()495 res = self.rcs.GetActualJointPositions(Empty())496 return JointPose(*res.joints)497 def get_target_joint_positions(self) -> JointPose:498 """499 è·å¾ææå
³èçææè§åº¦ä½ç½®500 :return: ææå
³èçææè§åº¦ä½ç½®501 """502 self._sync()503 res = self.rcs.GetTargetJointPositions(Empty())504 return JointPose(*res.joints)505 def get_actual_joint_speeds(self) -> list:506 """507 è·å¾ææå
³èçå®é
è§é度508 :return: ææå
³èçå®é
è§é度509 """510 self._sync()511 res = self.rcs.GetActualJointSpeeds(Empty())512 return list(res.joints)513 def get_target_joint_speeds(self) -> list:514 """515 è·å¾ææå
³èçææè§é度516 :return: ææå
³èçææè§é度517 """518 self._sync()519 res = self.rcs.GetTargetJointSpeeds(Empty())520 return list(res.joints)521 def get_joint_torques(self) -> list:522 """523 è·å¾æ¯ä¸ªå
³èçæç©å¼524 :return: æ¯ä¸ªå
³èçæç©å¼525 """526 self._sync()527 res = self.rcs.GetJointTorques(Empty())528 return list(res.joints)529 def get_actual_joint_torques(self) -> list:530 """531 è·åå®é
åç©532 :return: å®é
åç©533 """534 self._sync()535 res = self.rcs.GetRobotData(Empty())536 return list(res.actualTorque.joints)537 def get_target_joint_torques(self) -> list:538 """539 è·åç论åç©540 :return: ç论åç©541 """542 self._sync()543 res = self.rcs.GetRobotData(Empty())544 return list(res.targetTorque.joints)545 def get_joint_temperatures(self) -> list:546 """547 è·åå
³è温度548 :return: å
³è温度549 """550 self._sync()551 res = self.rcs.GetRobotData(Empty())552 return list(res.jointTemps.joints)553 def get_joint_temp(self, joint: int) -> float:554 """555 è·åå
³è温度556 :param joint: å
³èåºå·557 :returns: å
³èå½å温度 (â)558 """559 self._sync()560 res = self.rcs.GetJointTemp(rc.IntRequest(index=joint))561 return res.degree562 def get_actual_tcp_pose(self) -> CartesianPose:563 """564 è·åå®é
空é´ä½ç½®565 :returns: 空é´ä½ç½®566 """567 self._sync()568 res = self.rcs.GetActualTcpPose(Empty())569 return CartesianPose(*res.vector)570 def get_target_tcp_pose(self) -> CartesianPose:571 """572 è·å¾TCPçææç姿æ/ä½ç½®573 :return: TCPçææç姿æ/ä½ç½®574 """575 self._sync()576 res = self.rcs.GetTargetTcpPose(Empty())577 return CartesianPose(*res.vector)578 def get_robot_poses(self) -> RobotPoseData:579 """580 è·åæºå¨äººå§¿æä¿¡æ¯581 :return: æºå¨äººå§¿æä¿¡æ¯582 """583 self._sync()584 res = self.rcs.GetRobotData(Empty())585 return RobotPoseData(res)586 def get_robot_data(self) -> RobotData:587 """588 è·åæºå¨äººæ°æ®589 :return: æºå¨äººæ°æ®590 """591 self._sync()592 res = self.rcs.GetRobotData(Empty())593 return RobotData(res)594 def _generate_robot_data_cmd(self, n=1):595 for i in range(0, n):596 yield rc.RobotDataCmd()597 def get_robot_io_data(self) -> Iterator[RobotIOData]:598 """599 è·åæºå¨äººIOæ°æ®600 :return: æºå¨äººIOæ°æ®601 """602 self._sync()603 res = self.rcs.GetRobotIOData(self._generate_robot_data_cmd())604 for io in res:605 yield RobotIOData(io)606 def set_do(self, pin: int, value: int) -> None:607 """608 设置æ°åè¾åº609 :param pin: éè610 :param value: å¼611 """612 self._sync()613 self.rcs.SetDIO(msg.DIO(pin=pin, value=value))614 def get_di(self, pin: int) -> int:615 """616 è·åæ°åè¾å
¥617 :param pin: éè618 :return: æ°åè¾å
¥å¼619 """620 self._sync()621 res = self.rcs.GetDIO(msg.IOPin(pin=pin))622 return res.value623 def set_ao(self, pin: int, value: float) -> None:624 """625 设置模æè¾åº626 :param pin: éè627 :param value: å¼628 """629 self._sync()630 self.rcs.SetAIO(msg.AIO(pin=pin, value=value))631 def get_ai(self, pin: int) -> float:632 """633 è·å模æè¾å
¥634 :param pin: éè635 :return: 模æè¾å
¥å¼636 """637 self._sync()638 res = self.rcs.GetAIO(msg.IOPin(pin=pin))639 return res.value640 def set_extra_do(self, pin: int, value: int) -> None:641 """642 设置æ©å±æ°åè¾åº643 :param pin: éè644 :param value: å¼645 """646 self._sync()647 self.rcs.SetExtraDIO(msg.DIO(pin=pin, value=value))648 def get_extra_di(self, pin: int) -> int:649 """650 è·åæ©å±æ°åè¾å
¥651 :param pin: éè652 :return: æ°åè¾å
¥å¼653 """654 self._sync()655 res = self.rcs.GetExtraDIO(msg.IOPin(pin=pin))656 return res.value657 def set_extra_ao(self, pin: int, value: float) -> None:658 """659 设置æ©å±æ¨¡æè¾åº660 :param pin: éè661 :param value: å¼662 """663 self._sync()664 self.rcs.SetExtraAIO(msg.AIO(pin=pin, value=value))665 def get_extra_ai(self, pin: int) -> float:666 """667 è·åæ©å±æ¨¡æè¾å
¥668 :param pin: éè669 :return: 模æè¾å
¥å¼670 """671 self._sync()672 res = self.rcs.GetExtraAIO(msg.IOPin(pin=pin))673 return res.value674 def set_ai_mode(self, pin: int, mode: int) -> None:675 """676 设置模æè¾å
¥ç«¯å£å·¥ä½æ¨¡å¼677 :param pin: éè678 :param mode: 0:çµåï¼1:çµæµ679 """680 self._sync()681 self.rcs.SetAInMode(msg.AIO(pin=pin, mode=mode))682 def get_ai_mode(self, pin: int) -> int:683 """684 è·å模æè¾å
¥ç«¯å£å·¥ä½æ¨¡å¼685 :param pin: éè686 """687 self._sync()688 res = self.rcs.GetAInMode(msg.IOPin(pin=pin))689 return res.mode690 def set_ao_mode(self, pin: int, mode: int) -> None:691 """692 设置模æè¾åºç«¯å£å·¥ä½æ¨¡å¼693 :param pin: éè694 :param mode: 0:çµåï¼1:çµæµ695 """696 self._sync()697 self.rcs.SetAOutMode(msg.AIO(pin=pin, mode=mode))698 def get_ao_mode(self, pin: int) -> int:699 """700 è·å模æè¾åºç«¯å£å·¥ä½æ¨¡å¼701 :param pin: éè702 :return: 0:çµåï¼1:çµæµ703 """704 self._sync()705 res = self.rcs.GetAOutMode(msg.IOPin(pin=pin))706 return res.mode707 def set_flange_do(self, pin: int, value: int) -> None:708 """709 设置æ³å
°æ°åè¾åº710 :param pin: éè711 :param value: å¼712 """713 self._sync()714 self.rcs.SetTcpDIO(msg.DIO(pin=pin, value=value))715 def get_flange_di(self, pin: int) -> int:716 """717 è·åæ³å
°æ°åè¾åº718 :param pin: éè719 :return: å¼720 """721 self._sync()722 res = self.rcs.GetTcpDIO(msg.IOPin(pin=pin))723 return res.value724 def set_led(self, mode: int, speed: float, color: list) -> None:725 """726 设置LEDç¯ç¶æ727 :param mode: 1ï¼å
³éã2ï¼å¸¸äº®ã3ï¼å¼å¸ã4ï¼ååæ转ã5ï¼åè²æ转ã6ï¼éªç728 :param speed: é度 åä¸ä¸ªç级ï¼1ï¼å¿«éã2ï¼æ£å¸¸ã3ï¼æ
¢é729 :param color: é¢è²ï¼æå¤å
å«4个 0 ~ 15 ä¹é´çæ´æ°ï¼ç±»åï¼list[int]730 """731 self._sync()732 self.rcs.SetLED(rc.LEDStatus(mode=mode, speed=speed, color=color))733 def set_voice(self, voice: int, volume: int) -> None:734 """735 设置声é³736 :voice: 声é³å表 0~10737 :volume: é³é åå个ç级ï¼0ï¼éé³ã1ï¼ä½ã2ï¼æ£å¸¸ã3ï¼é«738 """739 self._sync()740 self.rcs.SetVoice(rc.VoiceStatus(voice=voice, volume=volume))741 def set_fan(self, fan: int) -> None:742 """743 设置é£æ744 :param fan: 1ï¼å
³éã 2ï¼å¼å¯745 """746 self._sync()747 self.rcs.SetFan(rc.FanStatus(fan=fan))748 def set_signal(self, pin: int, value: int) -> None:749 """750 设置信å·751 :param pin: éè752 :param value: å¼753 """754 self._sync()755 self.rcs.SetSignal(rc.SignalValue(index=pin, value=value))756 def get_signal(self, pin: int) -> int:757 """758 è·åä¿¡å·759 :param pin: éè760 """761 self._sync()762 res = self.rcs.GetSignal(rc.SignalValue(index=pin))763 return res.value764 def add_signal(self, pin: int, value: int) -> None:765 """766 æ·»å ä¿¡å·767 :pin: éè768 :value: å¼769 """770 self._sync()771 self.rcs.AddSignal(rc.SignalValue(index=pin, value=value))772 def enable_joint_limits(self) -> None:773 """774 å¯ç¨å
³èéä½æ£æµ775 """776 self._sync()777 self.pcs.EnableJointLimit(pc.TrueOrFalse(val=True))778 def disable_joint_limits(self) -> None:779 """780 å
³éå
³èéä½æ£æµ781 """782 self._sync()783 self.pcs.EnableJointLimit(pc.TrueOrFalse(val=False))784 def run_scene(self, scene_id: int, execute_count: int = 1, clear: bool = True) -> int:785 """786 è¿è¡åºæ¯787 :param scene_id: åºæ¯Id788 :param execute_count: æ§è¡æ°éï¼0 表示ä¸ç´å¾ªç¯è¿è¡789 :param clear: æ¯å¦å¼ºå¶å
³éæ£å¨è¿è¡çä»»å¡790 :return: ä»»å¡Id791 """792 return self.http_service.run_scene(scene_id, execute_count, clear)['id']793 def rerun_task(self, task_id: int, execute_count: int = 1, clear: bool = True) -> int:794 """795 è¿è¡ä»»å¡796 :param task_id: ä»»å¡Id797 :param execute_count: æ§è¡æ°éï¼0 表示ä¸ç´å¾ªç¯è¿è¡798 :param clear: æ¯å¦å¼ºå¶å
³éæ£å¨è¿è¡çä»»å¡799 :return: ä»»å¡Id800 """801 return self.http_service.run_task(task_id, execute_count, clear)['id']802 def execute_lua_code(self, task_name: str, code: str, execute_count: int = 1, clear: bool = True) -> int:803 """804 :param task_name: ä»»å¡å称805 :param code: lua 代ç 806 :param execute_count: æ§è¡æ°éï¼0 表示ä¸ç´å¾ªç¯è¿è¡807 :param clear: æ¯å¦å¼ºå¶å
³éæ£å¨è¿è¡çä»»å¡808 :return: ä»»å¡Id809 """810 return self.http_service.execute_lua_code(task_name, execute_count, clear, code)['id']811 def get_task(self, id: int) -> Optional[TaskInfo]:812 """813 è·åä»»å¡ä¿¡æ¯814 :param id: ä»»å¡Id815 :return: ä»»å¡ä¿¡æ¯816 """817 return self.http_service.get_task(id)818 def get_tasks(self, pi: int, ps: int) -> Optional[TasksResult]:819 """820 è·åä»»å¡å表821 :param pi: 页ç 822 :param ps: 页大å°823 :return: ä»»å¡å表824 """825 return self.http_service.get_tasks(pi, ps)826 def record_pvat(self) -> Iterator[rc.PVATRequest]:827 """828 è®°å½PVATæ°æ®ç¹829 """830 # self._sync()831 req = pc.RecordPVATRequest()832 req.type = pc.RecordPVATRequest.PVATType.PVAT833 req.duration = 200834 req.use_duration_ts = True835 req.save_file = False836 req.vZeroGap.remove_gap = False837 req.vZeroGap.threshold = 0.001838 res = self.pcs.RecordPVAT(req)839 for r in res:840 if r.end:841 return842 s = rc.PVATRequest(duration=r.t, q=r.p, v=r.v, acc=r.v)843 yield s844 def stop_record_pvat(self):...
variable.py
Source:variable.py
...51 self._names = names or ['x']52 self._kwargs = kwargs or {}53 self._not = not_5455 def _sync(self, f, name, **kwargs):56 kw = {f"{name}.{k}":v for k, v in kwargs.items()}57 return Variable(lambda x: f(self._id(x)), self._names+[name], dict(self._kwargs, **kw), self._not)5859 @property60 def len(self):61 return self._sync(lambda x: len(x), 'len')6263 def in_(self, *v):64 return self._sync(lambda x: x in v, 'in', value=v)6566 def has(self, v):67 return self._sync(lambda x: v in x, 'has', value=v)6869 def inv(self):70 return self._sync(lambda x: not x, 'inv')7172 @property73 def not_(self):74 return Variable(self._id, self._names, self._kwargs, True)7576 @property77 def _verifier(self):78 func = lambda x: self(x)79 names = [n for n in self._names if n]80 if self._not:81 names[1:1] = ['not']82 return Verifier('.'.join(names), func, False, **self._kwargs)8384 def __call__(self, x, context:ValidationContext=None):85 b = bool(self._id(x)) 86 return not b if self._not else b8788 def __getattr__(self, key):89 if key == '__name__':90 return '.'.join(self._names)91 elif key == '__annotations__':92 return {}93 else:94 return self._sync(lambda x: getattr(x, key), f'@{key}')9596 def __getitem__(self, key):97 return self._sync(lambda x: x[key], f'[{key}]')9899 def __eq__(self, v):100 return self._sync(lambda x: x == v, 'eq', value=v)101102 def __ne__(self, v):103 return self._sync(lambda x: x != v, 'ne', value=v)104105 def __lt__(self, th):106 return self._sync(lambda x: x < th, 'lt', value=th)107108 def __le__(self, th):109 return self._sync(lambda x: x <= th, 'le', value=th)110111 def __gt__(self, th):112 return self._sync(lambda x: x > th, 'gt', value=th)113114 def __ge__(self, th):115 return self._sync(lambda x: x >= th, 'ge', value=th)116117 def __add__(self, v):118 return self._sync(lambda x: x + v, 'add', value=v)119120 def __sub__(self, v):121 return self._sync(lambda x: x - v, 'sub', value=v)122123 def __mul__(self, v):124 return self._sync(lambda x: x * v, 'mul', 
value=v)125126 def __matmul__(self, v):127 return self._sync(lambda x: x @ v, 'matmul', value=v)128129 def __truediv__(self, v):130 return self._sync(lambda x: x / v, 'truediv', value=v)131132 def __floordiv__(self, v):133 return self._sync(lambda x: x // v, 'floordiv', value=v)134135 def __mod__(self, v):136 return self._sync(lambda x: x % v, 'mod', value=v)137138 def __divmod__(self, v):139 return self._sync(lambda x: divmod(x, v), 'divmod', value=v)140141 def __pow__(self, v):142 return self._sync(lambda x: pow(x, v), 'pow', value=v)143144 def __lshift__(self, v):145 return self._sync(lambda x: x << v, 'lshift', value=v)146147 def __rshift__(self, v):148 return self._sync(lambda x: x >> v, 'rshift', value=v)149150 def __and__(self, v):151 return self._sync(lambda x: x & v, 'and', value=v)152153 def __xor__(self, v):154 return self._sync(lambda x: x ^ v, 'xor', value=v)155156 def __or__(self, v):157 return self._sync(lambda x: x | v, 'or', value=v)158159 def __neg__(self):160 return self._sync(lambda x: -x, 'neg')161162 def __pos__(self):163 return self._sync(lambda x: +x, 'pos')164165 def __abs__(self):166 return self._sync(lambda x: abs(x), 'abs')167168 def __invert__(self):169 return self._sync(lambda x: ~x, 'invert')170171 def __round__(self):172 return self._sync(lambda x: round(x), 'round')173174 def __trunc__(self):175 return self._sync(lambda x: math.trunc(x), 'trunc')176177 def __floor__(self):178 return self._sync(lambda x: math.floor(x), 'floor')179180 def __ceil__(self):181 return self._sync(lambda x: math.ceil(x), 'ceil')182183
...
basic.py
Source:basic.py
1from stormed.util import WithFields2properties = [3 ('content_type' , 'shortstr'),4 ('content_encoding' , 'shortstr'),5 ('headers' , 'table'),6 ('delivery_mode' , 'octet'),7 ('priority' , 'octet'),8 ('correlation_id' , 'shortstr'),9 ('reply_to' , 'shortstr'),10 ('expiration' , 'shortstr'),11 ('message_id' , 'shortstr'),12 ('timestamp' , 'timestamp'),13 ('type' , 'shortstr'),14 ('user_id' , 'shortstr'),15 ('app_id' , 'shortstr'),16 ('cluster_id' , 'shortstr'),17]18class Qos(WithFields):19 _name = "basic.qos"20 _class_id = 6021 _method_id = 1022 _sync = True23 _content = False24 _fields = [25 ('prefetch_size' , 'long'),26 ('prefetch_count' , 'short'),27 ('_global' , 'bit'),28 ]29class QosOk(WithFields):30 _name = "basic.qos-ok"31 _class_id = 6032 _method_id = 1133 _sync = False34 _content = False35 _fields = [36 ]37class Consume(WithFields):38 _name = "basic.consume"39 _class_id = 6040 _method_id = 2041 _sync = True42 _content = False43 _fields = [44 ('ticket' , 'short'),45 ('queue' , 'shortstr'),46 ('consumer_tag' , 'shortstr'),47 ('no_local' , 'bit'),48 ('no_ack' , 'bit'),49 ('exclusive' , 'bit'),50 ('nowait' , 'bit'),51 ('arguments' , 'table'),52 ]53class ConsumeOk(WithFields):54 _name = "basic.consume-ok"55 _class_id = 6056 _method_id = 2157 _sync = False58 _content = False59 _fields = [60 ('consumer_tag' , 'shortstr'),61 ]62class Cancel(WithFields):63 _name = "basic.cancel"64 _class_id = 6065 _method_id = 3066 _sync = True67 _content = False68 _fields = [69 ('consumer_tag' , 'shortstr'),70 ('nowait' , 'bit'),71 ]72class CancelOk(WithFields):73 _name = "basic.cancel-ok"74 _class_id = 6075 _method_id = 3176 _sync = False77 _content = False78 _fields = [79 ('consumer_tag' , 'shortstr'),80 ]81class Publish(WithFields):82 _name = "basic.publish"83 _class_id = 6084 _method_id = 4085 _sync = False86 _content = True87 _fields = [88 ('ticket' , 'short'),89 ('exchange' , 'shortstr'),90 ('routing_key' , 'shortstr'),91 ('mandatory' , 'bit'),92 ('immediate' , 'bit'),93 
]94class Return(WithFields):95 _name = "basic.return"96 _class_id = 6097 _method_id = 5098 _sync = False99 _content = True100 _fields = [101 ('reply_code' , 'short'),102 ('reply_text' , 'shortstr'),103 ('exchange' , 'shortstr'),104 ('routing_key' , 'shortstr'),105 ]106class Deliver(WithFields):107 _name = "basic.deliver"108 _class_id = 60109 _method_id = 60110 _sync = False111 _content = True112 _fields = [113 ('consumer_tag' , 'shortstr'),114 ('delivery_tag' , 'longlong'),115 ('redelivered' , 'bit'),116 ('exchange' , 'shortstr'),117 ('routing_key' , 'shortstr'),118 ]119class Get(WithFields):120 _name = "basic.get"121 _class_id = 60122 _method_id = 70123 _sync = True124 _content = False125 _fields = [126 ('ticket' , 'short'),127 ('queue' , 'shortstr'),128 ('no_ack' , 'bit'),129 ]130class GetOk(WithFields):131 _name = "basic.get-ok"132 _class_id = 60133 _method_id = 71134 _sync = False135 _content = True136 _fields = [137 ('delivery_tag' , 'longlong'),138 ('redelivered' , 'bit'),139 ('exchange' , 'shortstr'),140 ('routing_key' , 'shortstr'),141 ('message_count' , 'long'),142 ]143class GetEmpty(WithFields):144 _name = "basic.get-empty"145 _class_id = 60146 _method_id = 72147 _sync = False148 _content = False149 _fields = [150 ('cluster_id' , 'shortstr'),151 ]152class Ack(WithFields):153 _name = "basic.ack"154 _class_id = 60155 _method_id = 80156 _sync = False157 _content = False158 _fields = [159 ('delivery_tag' , 'longlong'),160 ('multiple' , 'bit'),161 ]162class Reject(WithFields):163 _name = "basic.reject"164 _class_id = 60165 _method_id = 90166 _sync = False167 _content = False168 _fields = [169 ('delivery_tag' , 'longlong'),170 ('requeue' , 'bit'),171 ]172class RecoverAsync(WithFields):173 _name = "basic.recover-async"174 _class_id = 60175 _method_id = 100176 _sync = False177 _content = False178 _fields = [179 ('requeue' , 'bit'),180 ]181class Recover(WithFields):182 _name = "basic.recover"183 _class_id = 60184 _method_id = 110185 _sync = True186 _content = 
False187 _fields = [188 ('requeue' , 'bit'),189 ]190class RecoverOk(WithFields):191 _name = "basic.recover-ok"192 _class_id = 60193 _method_id = 111194 _sync = False195 _content = False196 _fields = [197 ]198class Nack(WithFields):199 _name = "basic.nack"200 _class_id = 60201 _method_id = 120202 _sync = False203 _content = False204 _fields = [205 ('delivery_tag' , 'longlong'),206 ('multiple' , 'bit'),207 ('requeue' , 'bit'),208 ]209id2method = {210 10: Qos,211 11: QosOk,212 20: Consume,213 21: ConsumeOk,214 30: Cancel,215 31: CancelOk,216 40: Publish,217 50: Return,218 60: Deliver,219 70: Get,220 71: GetOk,221 72: GetEmpty,222 80: Ack,223 90: Reject,224 100: RecoverAsync,225 110: Recover,226 111: RecoverOk,227 120: Nack,...
memory.py
Source:memory.py
...29 is a thin asynchronous wrapper around that version.30 """31 def __init__(self) -> None:32 self._sync = SyncMemoryBackend()33 def to_sync(self) -> SyncMemoryBackend:34 """Get a synchronous backend with the same underlying data."""35 return self._sync36 @staticmethod37 def from_sync(sync: SyncMemoryBackend) -> 'MemoryBackend':38 """Create an asynchronous backend that shares data with a synchronous one."""39 me = MemoryBackend()40 me._sync = sync41 return me42 async def exists(self, key: bytes) -> bool:43 return key in self._sync44 async def keys(self, filter: bytes) -> List[bytes]:45 return self._sync.keys(filter)46 async def delete(self, key: bytes) -> None:47 self._sync.delete(key)48 async def clear(self) -> None:49 self._sync.clear()50 async def key_type(self, key: bytes) -> Optional[utils.KeyType]:51 return self._sync.key_type(key)...
LambdaTest’s Playwright tutorial gives you a broad overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial provides A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!