Best Python code snippet using playwright-python
device.py
Source:device.py
1from __future__ import print_function2import threading3import logging4from collections import defaultdict5from binascii import hexlify6from uuid import UUID7from . import exceptions8try:9 string_type = basestring10except NameError:11 string_type = str12log = logging.getLogger(__name__)13class BLEDevice(object):14 """15 An BLE device connection instance, returned by one of the BLEBackend16 implementations. This class is not meant to be instantiated directly - use17 BLEBackend.connect() to create one.18 """19 def __init__(self, address):20 """21 Initialize.22 address -- the BLE address (aka MAC address) of the device as a string.23 """24 self._address = address25 self._characteristics = {}26 self._callbacks = defaultdict(set)27 self._subscribed_handlers = {}28 self._subscribed_uuids = {}29 self._lock = threading.Lock()30 def bond(self, permanent=False):31 """32 Create a new bond or use an existing bond with the device and make the33 current connection bonded and encrypted.34 """35 raise NotImplementedError()36 def get_rssi(self):37 """38 Get the receiver signal strength indicator (RSSI) value from the BLE39 device.40 Returns the RSSI value in dBm on success.41 Returns None on failure.42 """43 raise NotImplementedError()44 def char_read(self, uuid):45 """46 Reads a Characteristic by UUID.47 uuid -- UUID of Characteristic to read as a string.48 Returns a bytearray containing the characteristic value on success.49 Example:50 my_ble_device.char_read('a1e8f5b1-696b-4e4c-87c6-69dfe0b0093b')51 """52 raise NotImplementedError()53 def char_read_handle(self, uuid):54 """55 Reads a Characteristic by handle.56 handle -- handle of Characteristic to read.57 Returns a bytearray containing the characteristic value on success.58 Example:59 my_ble_device.char_read_handle(5)60 """61 raise NotImplementedError()62 def char_read_long(self, uuid):63 """64 Reads a Characteristic by UUID.65 uuid -- UUID of Characteristic to read as a string.66 Returns a bytearray containing the 
characteristic value on success.67 Example:68 my_ble_device.char_read('a1e8f5b1-696b-4e4c-87c6-69dfe0b0093b')69 """70 raise NotImplementedError()71 def char_read_long_handle(self, handle):72 """73 Reads a Characteristic longer than one read by handle.74 handle -- handle of Characteristic to read.75 Returns a bytearray containing the characteristic value on success.76 Example:77 my_ble_device.char_read_long_handle(5)78 """79 raise NotImplementedError()80 def char_write(self, uuid, value, wait_for_response=True):81 """82 Writes a value to a given characteristic UUID.83 uuid -- the UUID of the characteristic to write to.84 value -- a bytearray to write to the characteristic.85 wait_for_response -- wait for response after writing. A GATT "command"86 is used when not waiting for a response. The remote host will not87 acknowledge the write.88 Example:89 my_ble_device.char_write('a1e8f5b1-696b-4e4c-87c6-69dfe0b0093b',90 bytearray([0x00, 0xFF]))91 """92 return self.char_write_handle(self.get_handle(uuid), value,93 wait_for_response=wait_for_response)94 def char_write_handle(self, handle, value, wait_for_response=True):95 """96 Writes a value to a given characteristic handle. 
This can be used to97 write to the characteristic config handle for a primary characteristic.98 handle -- the handle to write to.99 value -- a bytearray to write to the characteristic.100 wait_for_response -- wait for response after writing.101 Example:102 my_ble_device.char_write_handle(42, bytearray([0x00, 0xFF]))103 """104 raise NotImplementedError()105 def char_write_long(self, uuid, value, wait_for_response=False):106 """107 Writes a value to a given characteristic UUID.108 uuid -- the UUID of the characteristic to write to.109 value -- a bytearray to write to the characteristic.110 wait_for_response -- wait for response after writing.111 Example:112 my_ble_device.char_write('a1e8f5b1-696b-4e4c-87c6-69dfe0b0093b',113 bytearray([0x00, 0xFF]))114 """115 return self.char_write_long_handle(self.get_handle(uuid), value,116 wait_for_response=wait_for_response)117 def char_write_long_handle(self, handle, value, wait_for_response=False):118 """119 Writes a value to a given characteristic handle. This can be used to120 write to the characteristic config handle for a primary characteristic.121 handle -- the handle to write to.122 value -- a bytearray to write to the characteristic.123 wait_for_response -- wait for response after writing.124 Example:125 my_ble_device.char_write(42,126 bytearray([0x00, 0xFF]))127 """128 raise NotImplementedError()129 def disconnect(self):130 """131 Disconnect from the device. 
This instance of BLEDevice cannot be used132 after calling this method, you must call BLEBackend.connect() again to133 get a fresh connection.134 """135 raise NotImplementedError()136 def _notification_handles(self, uuid):137 # Expect notifications on the value handle...138 value_handle = self.get_handle(uuid)139 # but write to the characteristic config to enable notifications140 # TODO with the BGAPI backend we can be smarter and fetch the actual141 # characteristic config handle - we can also do that with gattool if we142 # use the 'desc' command, so we'll need to change the "get_handle" API143 # to be able to get the value or characteristic config handle.144 characteristic_config_handle = value_handle + 1145 return value_handle, characteristic_config_handle146 def subscribe(self, uuid, callback=None, indication=False,147 wait_for_response=True):148 """149 Enable notifications or indications for a characteristic and register a150 callback function to be called whenever a new value arrives.151 uuid -- UUID as a string of the characteristic to subscribe.152 callback -- function to be called when a notification/indication is153 received on this characteristic.154 indication -- use indications (where each notificaiton is ACKd). 
This is155 more reliable, but slower.156 wait_for_response -- wait for response after subscription.157 """158 value_handle, characteristic_config_handle = (159 self._notification_handles(uuid)160 )161 properties = bytearray([162 0x2 if indication else 0x1,163 0x0164 ])165 with self._lock:166 if callback is not None:167 self._callbacks[value_handle].add(callback)168 if self._subscribed_handlers.get(value_handle, None) != properties:169 self.char_write_handle(170 characteristic_config_handle,171 properties,172 wait_for_response=wait_for_response173 )174 log.info("Subscribed to uuid=%s", uuid)175 self._subscribed_handlers[value_handle] = properties176 self._subscribed_uuids[uuid] = indication177 else:178 log.debug("Already subscribed to uuid=%s", uuid)179 def unsubscribe(self, uuid, wait_for_response=True):180 """181 Disable notification for a characteristic and de-register the callback.182 """183 value_handle, characteristic_config_handle = (184 self._notification_handles(uuid)185 )186 properties = bytearray([0x0, 0x0])187 with self._lock:188 if uuid in self._subscribed_uuids:189 del(self._subscribed_uuids[uuid])190 if value_handle in self._callbacks:191 del(self._callbacks[value_handle])192 if value_handle in self._subscribed_handlers:193 del(self._subscribed_handlers[value_handle])194 self.char_write_handle(195 characteristic_config_handle,196 properties,197 wait_for_response=wait_for_response198 )199 log.info("Unsubscribed from uuid=%s", uuid)200 else:201 log.debug("Already unsubscribed from uuid=%s", uuid)202 def subscribe_handle(self, handle, callback=None, indication=False,203 wait_for_response=True):204 """205 Like subscribe() but using handle instead of uuid.206 handle -- handle as a integer of the characteristic to subscribe.207 """208 value_handle = handle209 characteristic_config_handle = value_handle + 1210 properties = bytearray([211 0x2 if indication else 0x1,212 0x0213 ])214 with self._lock:215 if callback is not None:216 
self._callbacks[value_handle].add(callback)217 if self._subscribed_handlers.get(value_handle, None) != properties:218 self.char_write_handle(219 characteristic_config_handle,220 properties,221 wait_for_response=wait_for_response222 )223 log.info("Subscribed to handle=0x%04x", value_handle)224 self._subscribed_handlers[value_handle] = properties225 else:226 log.debug("Already subscribed to handle=0x%04x", value_handle)227 def unsubscribe_handle(self, handle, wait_for_response=True):228 """229 Like unsubscribe() but using handle instead of uuid.230 handle -- handle as a integer of the characteristic to unsubscribe.231 """232 value_handle = handle233 characteristic_config_handle = value_handle + 1234 properties = bytearray([0x0, 0x0])235 with self._lock:236 if value_handle in self._callbacks:237 del(self._callbacks[value_handle])238 if value_handle in self._subscribed_handlers:239 del(self._subscribed_handlers[value_handle])240 self.char_write_handle(241 characteristic_config_handle,242 properties,243 wait_for_response=wait_for_response244 )245 log.info("Unsubscribed from handle=0x%04x", value_handle)246 else:247 log.debug(248 "Already unsubscribed from handle=0x%04x",249 value_handle250 )251 def get_handle(self, char_uuid):252 """253 Look up and return the handle for an attribute by its UUID.254 :param char_uuid: The UUID of the characteristic.255 :type uuid: str256 :return: None if the UUID was not found.257 """258 if isinstance(char_uuid, string_type):259 char_uuid = UUID(char_uuid)260 log.debug("Looking up handle for characteristic %s", char_uuid)261 if char_uuid not in self._characteristics:262 self._characteristics = self.discover_characteristics()263 characteristic = self._characteristics.get(char_uuid)264 if characteristic is None:265 message = "No characteristic found matching %s" % char_uuid266 log.warn(message)267 raise exceptions.BLEError(message)268 # TODO support filtering by descriptor UUID, or maybe return the whole269 # Characteristic object270 
log.debug("Found %s" % characteristic)271 return characteristic.handle272 def receive_notification(self, handle, value):273 """274 Receive a notification from the connected device and propagate the value275 to all registered callbacks.276 """277 log.info('Received notification on handle=0x%x, value=0x%s',278 handle, hexlify(value))279 with self._lock:280 if handle in self._callbacks:281 for callback in self._callbacks[handle]:282 callback(handle, value)283 def exchange_mtu(self, mtu):284 """285 ATT exchange Maximum Transmission Unit.286 :param mtu: New MTU-value287 :return: New MTU, as recognized by server.288 """289 raise NotImplementedError()290 def resubscribe_all(self):291 """292 Reenable all notifications and indications for uuids that were293 previously subscribed to.294 This has to be called after a connection loss and subsequent reconnect.295 """296 for uuid in self._subscribed_uuids:297 value_handle, characteristic_config_handle = (298 self._notification_handles(uuid)299 )300 properties = bytearray([301 0x2 if self._subscribed_uuids[uuid] else 0x1,302 0x0303 ])304 with self._lock:305 self.char_write_handle(306 characteristic_config_handle,307 properties,308 wait_for_response=True309 )...
listener.py
Source:listener.py
...37 print("show_messages : View the messages received so far")38 print("show_route : Display the routing table for the AODV node")39 40 # Wait for the response from protocol handler thread41 def wait_for_response(self):42 (msg, _) = self.sock.recvfrom(10)43 status = msg.decode('utf-8')44 # Simulate a link-up event for this node45 def activate(self):46 message_type = "NODE_ACTIVATE"47 message = message_type + ":" + ""48 message_bytes = bytes(message, 'utf-8')49 self.send(message_bytes)50 self.wait_for_response()51 52 # Take the neighbor set for the current node from the user. This will be handled by the AODV library 53 def add_neighbors(self):54 message_type = "ADD_NEIGHBOR"55 message = message_type + ":" + ""56 message_bytes = bytes(message, 'utf-8')57 self.send(message_bytes)58 self.wait_for_response()59 60 # Simulate a link-down event for this node61 def deactivate(self):62 message_type = "NODE_DEACTIVATE"63 message = message_type + ":" + ""64 message_bytes = bytes(message, 'utf-8')65 self.send(message_bytes)66 self.wait_for_response()67 # Delete all the messages received so far68 def delete_messages(self):69 message_type = "DELETE_MESSAGES"70 message = message_type + ":" + ""71 message_bytes = bytes(message, 'utf-8')72 self.send(message_bytes)73 self.wait_for_response()74 75 # Send a message to a peer76 def send_message(self):77 78 # Get the message details from the user79 message = input("Enter the message to send: ")80 node = input("Enter the destination: ")81 82 message_type = "SEND_MESSAGE"83 message = message_type + ":" + self.node_id + ":" + node + ":" + message84 message_bytes = bytes(message, 'utf-8')85 self.send(message_bytes)86 self.wait_for_response()87 88 # Display the contents of the log file. 
Just invoke the library routine.89 def show_log(self):90 message_type = "VIEW_LOG"91 message = message_type + ":" + ""92 message_bytes = bytes(message, 'utf-8')93 self.send(message_bytes)94 self.wait_for_response()95 96 # Display the messages sent to this node by other nodes97 def show_messages(self):98 message_type = "VIEW_MESSAGES"99 message = message_type + ":" + ""100 message_bytes = bytes(message, 'utf-8')101 self.send(message_bytes)102 self.wait_for_response()103 # Display the routing table 104 def show_route(self):105 message_type = "SHOW_ROUTE"106 message = message_type + ":" + ""107 message_bytes = bytes(message, 'utf-8')108 self.send(message_bytes)109 self.wait_for_response()110 111 def command_stop(self):112 message_type = "COMMAND_STOP"113 message = message_type + ":" + ""114 message_bytes = bytes(message, 'utf-8')115 self.send(message_bytes)116 self.wait_for_response()117 # Default routine called when invalid command is issued. Do nothing.118 def default(self):119 if (len(self.command) == 0):120 pass121 else:122 print("Invalid Command")123 # Thread start routine124 def run(self):125 126 # Get the listener ports127 self.port = self.get_listener_port(self.node_id)128 self.aodv_listener_port = self.get_aodv_listener_port(self.node_id)129 130 # Setup socket to communicate with the AODV protocol handler thread...
clay_extruder_control.py
Source:clay_extruder_control.py
# -*- coding: UTF-8 -*-
import socket
import struct
import time

from message_types import msg_types_dict
from message_types import MSG_RECEIVED, MSG_EXECUTED, MSG_STOP, MSG_INFO, MSG_DODATA, MSG_MOTORDATA, MSG_MOTORSTATE

__all__ = [
    "ExtruderClient"
]


class ExtruderClient():
    """TCP client that exchanges binary messages with the extruder server.

    Every message starts with a header of three big-endian 32-bit integers
    (``">lll"``): message type, payload length in bytes, and the
    wait-for-response flag. ``msg_types_dict[msg_type]`` is indexed as
    ``[0]`` display name, ``[1]`` struct format appended to the header when
    packing, ``[2]`` struct format used to unpack a received payload.
    """

    def __init__(self, host = "192.168.10.50", port = 50004):
        self.host = host
        self.port = port
        self.connected = False
        # Created by connect(); None until then so close() is always safe.
        self.sock = None
        # Accumulates header bytes between calls to __read().
        self.msg_rcv = bytearray([])
        self.info_msg = ""
        self.header_byteorder = ">lll"

    def clear(self):
        """Discard any partially received header bytes."""
        self.msg_rcv = bytearray([])

    def connect(self):
        """Open the TCP connection unless it is already open."""
        if not self.connected:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect((self.host, self.port))
            print("Connected to server {} on port {}".format(self.host, self.port))
            self.connected = True

    def close(self):
        """Close the connection; a no-op if connect() was never called."""
        if self.sock is not None:
            self.sock.close()
        self.connected = False

    # Send commands
    def send_stop(self, wait_for_response=True):
        return self.__send(MSG_STOP, wait_for_response=wait_for_response)

    def send_set_do(self, pin=0, state=0, wait_for_response=True):
        msg = [pin, state]
        return self.__send(MSG_DODATA, 8, msg, wait_for_response)

    def send_motordata(self, motor_id, state, max_speed, speed, acceleration, wait_for_response=True):
        msg = [motor_id, state, max_speed, speed, acceleration]
        return self.__send(MSG_MOTORDATA, 20, msg, wait_for_response)

    def send_motorstate(self, motor_id, state, wait_for_response=True):
        msg = [motor_id, state]
        return self.__send(MSG_MOTORSTATE, 8, msg, wait_for_response)

    def send_get_arduino_info(self, wait_for_response=True):
        return self.__send(MSG_INFO, wait_for_response=wait_for_response)

    # Get commands
    def get_msg_stop(self, wait_for_response=True):
        return self.__get_msg(MSG_STOP, wait_for_response=wait_for_response)

    def get_msg_set_do(self, pin=0, state=0, wait_for_response=True):
        msg = [pin, state]
        return self.__get_msg(MSG_DODATA, 8, msg, wait_for_response)

    def get_msg_motordata(self, motor_id, state, max_speed, speed, acceleration, wait_for_response=True):
        msg = [motor_id, state, max_speed, speed, acceleration]
        return self.__get_msg(MSG_MOTORDATA, 20, msg, wait_for_response)

    def get_msg_motorstate(self, motor_id, state, wait_for_response=True):
        msg = [motor_id, state]
        return self.__get_msg(MSG_MOTORSTATE, 8, msg, wait_for_response)

    def get_msg_arduino_info(self, wait_for_response=True):
        return self.__get_msg(MSG_INFO, wait_for_response=wait_for_response)

    # Internal commands
    def __get_msg(self, msg_type, msg_len = 0, msg = None, wait_for_response = True, packed=False):
        """Build a message as a list of ints, or as packed bytes if *packed*."""
        msg_list = [msg_type, msg_len, int(wait_for_response)]
        if msg is not None:
            msg_list.extend(msg)
        if packed:
            return struct.pack(self.header_byteorder + msg_types_dict[msg_type][1], *msg_list)
        return msg_list

    def __send(self, msg_type, msg_len = 0, msg = None, wait_for_response = True):
        """Send one message and optionally collect replies.

        When *wait_for_response* is true, replies are read until one of type
        MSG_EXECUTED arrives. Returns ``(headers, msgs)`` with every reply
        received (both empty when not waiting).

        Raises socket.error if the connection fails or is closed by the peer.
        """
        # sendall(): plain send() may transmit only part of the buffer.
        self.sock.sendall(self.__get_msg(msg_type, msg_len, msg, wait_for_response, True))
        headers, msgs = [], []
        while wait_for_response:
            header, msg = self.__read()
            if header is not None:
                print("{} received from Arduino.".format(msg_types_dict[header[0]][0]))
                print("Message: {}".format(msg))
                headers.append(header)
                msgs.append(msg)
                wait_for_response = header[0] != MSG_EXECUTED
                print("Still waiting? -> ", wait_for_response)
                # NOTE(review): the source listing lost its indentation; this
                # pause is assumed to follow each decoded message - confirm.
                time.sleep(0.1)
        return (headers, msgs)

    def __recv_exact(self, size):
        """Read exactly *size* bytes; recv() may return fewer per call."""
        payload = bytearray()
        while len(payload) < size:
            chunk = self.sock.recv(size - len(payload))
            if not chunk:
                raise socket.error("Connection closed by server")
            payload.extend(chunk)
        return payload

    def __read(self):
        """Read one more header byte and decode a message once complete.

        Returns ``(None, None)`` while the header is still incomplete (or the
        read timed out), otherwise ``(header_tuple, payload_tuple)``; the
        payload is ``""`` when the header announces a zero-length payload.

        Raises socket.error if the peer closed the connection, instead of
        silently returning ``(None, None)`` forever.
        """
        try:
            chunk = self.sock.recv(1)
        except socket.timeout:
            # Nothing available yet; the caller polls again.
            return None, None
        if not chunk:
            raise socket.error("Connection closed by server")

        self.msg_rcv.extend(chunk)
        print(self.msg_rcv)
        try:
            hdr = struct.unpack_from(self.header_byteorder, self.msg_rcv)
        except struct.error:
            # Fewer bytes than a full header have been accumulated so far.
            return None, None
        print(hdr)

        if hdr[1] > 0:
            msg = struct.unpack_from(msg_types_dict[hdr[0]][2], self.__recv_exact(hdr[1]))
        else:
            msg = ""
        print(msg)
        self.clear()
        return hdr, msg


if __name__ == "__main__":
    ec = ExtruderClient()
    ec.connect()
    try:
        time.sleep(0.5)
        print("Connection: ", ec.connected)
        #ec.send_get_arduino_info()
        pin = 32
        state = 1
        wait = False
        for i in range(5):
            ec.send_set_do(pin-2,1,wait)
            ec.send_set_do(pin,1,wait)
            ec.send_set_do(pin+2,1,wait)
            ec.send_set_do(pin-2,0,wait)
            ec.send_set_do(pin,0,wait)
            ec.send_set_do(pin+2,0,wait)
            # NOTE(review): indentation was lost in the source listing; this
            # sleep is assumed to be inside the loop - confirm.
            time.sleep(1)
        # ec.send_set_do(30,1,wait)
        # time.sleep(0.5)
        # ec.send_motordata(0, 0, 1600, 800, 200, wait)
        # ec.send_motorstate(0, 1, wait)
        #executed = False
        # while not executed:
        #     raw_hdr = bytearray([])
        #     [raw_hdr.extend(ec.sock.recv(1)) for i in range(12)]
        #     #print(raw_hdr)
        #     hdr = struct.unpack(ec.header_byteorder, raw_hdr)
        #     #print(hdr)
        #     if hdr[0] == MSG_EXECUTED:
        #         executed = True
        #     elif hdr[0] == MSG_INFO:
        #         raw_msg = bytearray()
        #         raw_msg.extend(ec.sock.recv(hdr[1]))
        #         #[raw_msg.extend(ec.sock.recv(1)) for i in range(hdr[1])]
        #         msg = struct.unpack(msg_types_dict[MSG_INFO][2], raw_msg)
        #         for m in msg[:3]:
        #             clean_msg = m[:m.index(0)]
        #             #print(clean_msg.decode())
        #     else:
        #         pass
        #     time.sleep(0.1)
        # time.sleep(5)
        # ec.send_motorstate(0, 0, wait)
        # time.sleep(0.5)
        # ec.send_set_do(30,0,wait)
    finally:
        # Release the socket even if one of the commands above fails.
        ec.close()

# ... (the source listing is truncated at this point)
trionesControl.py
Source:trionesControl.py
import pygatt
import logging
import pygatt.exceptions

MAIN_CHARACTERISTIC_UUID = "0000ffd9-0000-1000-8000-00805f9b34fb"

log = logging.getLogger(__name__)


def _clamp(value, low, high):
    """Return *value* limited to the inclusive range [low, high]."""
    return max(low, min(high, value))


def _write_command(device, payload, wait_for_response):
    """Write *payload* to the main characteristic of *device*.

    Re-raises NotConnectedError with a uniform message.
    """
    try:
        device.char_write(MAIN_CHARACTERISTIC_UUID, payload,
                          wait_for_response=wait_for_response)
    except pygatt.exceptions.NotConnectedError:
        raise pygatt.exceptions.NotConnectedError("Device not connected!")


def connect(MAC, reset_on_start=True):
    """
    Create and start a new backend adapter and connect it to a device.

    When connecting to multiple devices at the same time make sure to set
    reset_on_start to False after the first connection is made, otherwise all
    connections made before are invalidated.

    :param string MAC: MAC address of the device to connect to.
    :param bool reset_on_start: Perhaps due to a bug in gatttool or pygatt,
        but if the bluez backend isn't restarted, it can sometimes lock up
        the computer when trying to make a connection to HCI device.
    :return: the connected device object returned by the adapter.
    :raises pygatt.exceptions.NotConnectedError: if the connection fails.
    """
    try:
        adapter = pygatt.GATTToolBackend()
        adapter.start(reset_on_start=reset_on_start)
        device = adapter.connect(MAC)
    except pygatt.exceptions.NotConnectedError:
        raise pygatt.exceptions.NotConnectedError("Device not connected!")
    log.info("Device connected")
    return device


def disconnect(device):
    """Disconnect *device*.

    :raises pygatt.exceptions.NotConnectedError: if it is not connected.
    """
    try:
        device.disconnect()
    except pygatt.exceptions.NotConnectedError:
        raise pygatt.exceptions.NotConnectedError("Device not connected!")
    log.info("Device disconnected")


def powerOn(device, wait_for_response=False):
    """
    Send the power-on command.

    :param bool wait_for_response: wait for response after writing. A GATT
        "command" is used when not waiting for a response. The remote host
        will not acknowledge the write.
    """
    _write_command(device, b'\xcc\x23\x33', wait_for_response)
    log.info("Device powered on")


def powerOff(device, wait_for_response=False):
    """
    Send the power-off command.

    :param bool wait_for_response: wait for response after writing. A GATT
        "command" is used when not waiting for a response. The remote host
        will not acknowledge the write.
    """
    _write_command(device, b'\xcc\x24\x33', wait_for_response)
    log.info("Device powered off")


def setRGB(r: int, g: int, b: int, device, wait_for_response=False):
    """
    Set the RGB colour; each component is clamped to 0..255.

    :param bool wait_for_response: wait for response after writing. A GATT
        "command" is used when not waiting for a response. The remote host
        will not acknowledge the write.
    """
    # Values for color should be between 0 and 255
    r = _clamp(r, 0, 255)
    g = _clamp(g, 0, 255)
    b = _clamp(b, 0, 255)
    payload = bytearray([0x56, r, g, b, 0x00, 0xF0, 0xAA])
    _write_command(device, payload, wait_for_response)
    log.info("RGB set -- R: %d, G: %d, B: %d", r, g, b)


def setWhite(intensity: int, device, wait_for_response=False):
    """
    Set the white intensity, clamped to 0..255.

    :param bool wait_for_response: wait for response after writing. A GATT
        "command" is used when not waiting for a response. The remote host
        will not acknowledge the write.
    """
    # Intensity value should be between 0 and 255
    intensity = _clamp(intensity, 0, 255)
    payload = bytearray([0x56, 0x0, 0x0, 0x0, intensity, 0x0F, 0xAA])
    _write_command(device, payload, wait_for_response)
    log.info("White color set -- Intensity: %d", intensity)


def setBuiltIn(mode: int, speed: int, device, wait_for_response=False):
    """
    Select a built-in mode (37..56 inclusive); speed is clamped to 1..255.

    :param bool wait_for_response: wait for response after writing. A GATT
        "command" is used when not waiting for a response. The remote host
        will not acknowledge the write.
    :raises pygatt.exceptions.BLEError: if *mode* is outside 37..56.
    """
    # The original `mode<37 | mode > 56` parsed as the chained comparison
    # `mode < (37 | mode) > 56`, which rejected mode 56 and accepted e.g. 0.
    if mode < 37 or mode > 56:
        raise pygatt.exceptions.BLEError("Invalid Mode")
    speed = _clamp(speed, 1, 255)
    payload = bytearray([0xBB, mode, speed, 0x44])
    _write_command(device, payload, wait_for_response)
    # ... (the source listing is truncated at this point)
LambdaTest’s Playwright tutorial gives you a broad overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial provides A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!