Best Python code snippet using lisa_python
device_controller.py
Source:device_controller.py
...98 '''99 Cannot write to device registry device status table100 '''101 raise SegfaultError('Segmentation fault when trying to write word at {}'.format(utils.word_to_str(pos)))102 def _set_device_status(self, ioport_number, status):103 if status < 0:104 status = 0105 if status > 255:106 status = 255107 changed = status != self._device_status_table[ioport_number]108 self._device_status_table[ioport_number] = status109 if changed:110 self.interrupt_controller.send(self.system_interrupts['device_status_changed'][ioport_number])111 def _output_thread_run(self):112 while True:113 try:114 ioport_number, device_host, device_port, command, data = self.output_queue.get(timeout=0.1)115 except queue.Empty:116 if self._stop_event.wait(0):117 break118 continue119 logger.debug('Command "%s" from IOPort %s', command, ioport_number)120 if command == 'data':121 response = self._send_request(device_host, device_port, 'data', data)122 if response.status_code != 200:123 raise DeviceError('Could not send data: {}'.format(response.text))124 logger.debug('[Device] %s', response.json()['message'])125 else:126 logger.error('Unknown command')127 def _ping_thread_run(self):128 ping_period = 1 # sec129 while True:130 for ioport in self.ioports:131 if ioport.registered:132 self._check_and_update_device_status(ioport)133 if self._stop_event.wait(ping_period):134 break135 def _check_and_update_device_status(self, ioport):136 ponged = self._ping_device(ioport)137 if ponged:138 ping_message = 'ponged'139 else:140 ping_message = 'did not pong'141 logger.debug(142 'Device[%s:%s] @ IOPort[%s] %s.',143 ioport.device_host, ioport.device_port,144 ioport.ioport_number,145 ping_message,146 )147 if ponged:148 new_status = 0149 else:150 new_status = self._device_status_table[ioport.ioport_number] + 1151 self._set_device_status(ioport.ioport_number, new_status)152 def _ping_device(self, ioport):153 try:154 response = self._send_request(ioport.device_host, ioport.device_port, 'ping')155 if response.status_code != 200:156 raise DeviceError('Device did not pong.')157 except DeviceControllerError:158 return False159 return True160 def _send_request(self, device_host, device_port, command, data=None, content_type='application/octet-stream'):161 if data is None:162 data = b''163 try:164 response = requests.post(165 'http://{}:{}/{}'.format(166 device_host,167 device_port,168 command,169 ),170 data=data,171 headers={'Content-Type': content_type},172 )173 except requests.exceptions.ConnectionError:174 raise DeviceControllerConnectionError('Could not connect to Aldebaran.')175 logger.debug('Request sent.')176 return response177 def _handle_incoming_request(self, path, headers, rfile):178 '''179 Handle incoming request from devices, called by GenericRequestHandler180 '''181 max_ioport_number = len(self.ioports) - 1182 path = path.lstrip('/')183 if '/' not in path:184 return (185 HTTPStatus.BAD_REQUEST,186 {187 'error': 'Path must be /ioport/command',188 }189 )190 ioport_number, command = path.split('/', 1)191 try:192 ioport_number = int(ioport_number)193 if ioport_number < 0:194 raise ValueError()195 if ioport_number > max_ioport_number:196 raise ValueError()197 except ValueError:198 return (199 HTTPStatus.BAD_REQUEST,200 {201 'error': 'IOPort number must be an integer between 0 and {}.'.format(max_ioport_number),202 }203 )204 try:205 request_body_length = int(headers.get('Content-Length'))206 except TypeError:207 return (HTTPStatus.LENGTH_REQUIRED, None)208 data = rfile.read(request_body_length)209 return self._handle_input(ioport_number, command, data)210 def _handle_input(self, ioport_number, command, data):211 '''212 Handle command from device213 '''214 logger.debug('Incoming command "%s" to IOPort %s', command, ioport_number)215 if command == 'register':216 return self._register_device(ioport_number, data)217 if command == 'unregister':218 return self._unregister_device(ioport_number)219 if command == 'ping':220 return (221 HTTPStatus.OK,222 {223 'message': 'pong',224 }225 )226 if command == 'data':227 return self._send_data_to_ioport(ioport_number, data)228 return (229 HTTPStatus.BAD_REQUEST,230 {231 'error': 'Unknown command: {}'.format(command),232 }233 )234 def _send_data_to_ioport(self, ioport_number, data):235 if not self.ioports[ioport_number].registered:236 logger.info('No device is registered to IOPort %s.', ioport_number)237 return (238 HTTPStatus.FORBIDDEN,239 {240 'error': 'No device is registered to this IOPort.',241 }242 )243 if len(data) > self.ioports[ioport_number].input_buffer_size:244 logger.info('Too much data sent to IOPort %s.', ioport_number)245 return (246 HTTPStatus.FORBIDDEN,247 {248 'error': 'Too much data sent.',249 }250 )251 self.ioports[ioport_number].input_queue.put(data)252 logger.info('Delivered data to IOPort %s.', ioport_number)253 self.interrupt_controller.send(self.system_interrupts['ioport_in'][ioport_number])254 return (255 HTTPStatus.OK,256 {257 'message': 'Received data: {}'.format(utils.binary_to_str(data)),258 }259 )260 def _register_device(self, ioport_number, data):261 try:262 data = json.loads(data)263 except json.decoder.JSONDecodeError:264 return (265 HTTPStatus.BAD_REQUEST,266 {267 'error': 'Could not parse data.',268 }269 )270 for field_name in {'type', 'id', 'host', 'port'}:271 if field_name not in data:272 return (273 HTTPStatus.BAD_REQUEST,274 {275 'error': 'Field "{}" missing.'.format(field_name),276 }277 )278 try:279 device_type = int(data['type'], 16)280 if device_type < 0x00 or device_type > 0xFF:281 raise ValueError()282 except ValueError:283 return (284 HTTPStatus.BAD_REQUEST,285 {286 'error': 'Device type must be a 1-byte hex number.',287 }288 )289 try:290 device_id = int(data['id'], 16)291 if device_id < 0x00 or device_id > 0xFFFFFF:292 raise ValueError()293 device_id = list(device_id.to_bytes(3, 'big'))294 except ValueError:295 return (296 HTTPStatus.BAD_REQUEST,297 {298 'error': 'Device ID must be a 1-byte hex number.',299 }300 )301 device_host, device_port = data['host'], data['port']302 if self.ioports[ioport_number].registered:303 logger.info('A device is already registered to IOPort %s.', ioport_number)304 return (305 HTTPStatus.FORBIDDEN,306 {307 'error': 'A device is already registered to this IOPort.',308 }309 )310 if self.ioports[ioport_number].input_queue.qsize() > 0:311 logger.info('IOPort %s input queue not empty.', ioport_number)312 return (313 HTTPStatus.FORBIDDEN,314 {315 'error': 'IOPort input queue not empty.',316 }317 )318 logger.info('Registering device to IOPort %s...', ioport_number)319 logger.info(320 'Device type and ID: %s %s',321 utils.byte_to_str(device_type),322 ' '.join(utils.byte_to_str(device_id[i]) for i in range(3)),323 )324 logger.info('Device host and port: %s:%s', device_host, device_port)325 self._device_registry[4 * ioport_number] = device_type326 for idx in range(3):327 self._device_registry[4 * ioport_number + 1 + idx] = device_id[idx]328 self._set_device_status(ioport_number, 0)329 self.ioports[ioport_number].register_device(device_host, device_port)330 self.interrupt_controller.send(self.system_interrupts['device_registered'])331 logger.info('Device registered to IOPort %s.', ioport_number)332 return (333 HTTPStatus.OK,334 {335 'message': 'Device registered.',336 }337 )338 def _unregister_device(self, ioport_number):339 if not self.ioports[ioport_number].registered:340 logger.info('No device is registered to IOPort %s.', ioport_number)341 return (342 HTTPStatus.FORBIDDEN,343 {344 'error': 'No device is registered to this IOPort.',345 }346 )347 logger.info('Unregistering device from IOPort %s...', ioport_number)348 for idx in range(4):349 self._device_registry[4 * ioport_number + idx] = 0350 self._set_device_status(ioport_number, 0)351 self.ioports[ioport_number].unregister_device()352 self.interrupt_controller.send(self.system_interrupts['device_unregistered'])353 logger.info('Device unregistered from IOPort %s.', ioport_number)354 return (355 HTTPStatus.OK,356 {357 'message': 'Device unregistered.',358 }359 )360# pylint: disable=missing-docstring361class DeviceControllerError(AldebaranError):362 pass363class DeviceControllerConnectionError(DeviceControllerError):364 pass...
polling_status.py
Source:polling_status.py
...131 if self._metric_statuses[k] != DEVICE_METRICS_STATES.SUCCESS:132 self._metric_statuses[k] = DEVICE_METRICS_STATES.PARTIAL_METRIC_FAILURE133 else:134 self._metric_statuses[k] = DEVICE_METRICS_STATES.SUCCESS135 self._set_device_status()136 def handle_exception(self, k, e):137 """138 Mutate the state of the _device_metrics_status dictionary.139 Args:140 k(str): name of the metric to apply the exception to141 e(Exception): exception which has occurred for the given k142 """143 self.logger.warn(u'Error while trying to poll "%s" (%s) for "%s": %s - %s' %144 (str(self._device_name), str(self._device_type), k, repr(e), traceback.format_exc()))145 if k in self._metric_statuses:146 if self._metric_statuses[k] in \147 [DEVICE_METRICS_STATES.SUCCESS, DEVICE_METRICS_STATES.PARTIAL_METRIC_FAILURE]:148 self._metric_statuses[k] = DEVICE_METRICS_STATES.PARTIAL_METRIC_FAILURE149 self._set_device_status()150 return151 if type(e) in EXCEPTIONS_KEYS:152 self._metric_statuses[k] = exceptions_dict[type(e)]153 else:154 found_exception = False155 for exception in inspect.getmro(type(e)):156 if exception in EXCEPTIONS_KEYS:157 self._metric_statuses[k] = exceptions_dict[exception]158 found_exception = True159 break160 if not found_exception:161 self._metric_statuses[k] = DEVICE_METRICS_STATES.INTERNAL_FAILURE162 self._set_device_status()163 def _set_device_status(self):164 """165 Call when done with operations on polling status object to set overall166 device status based upon component metric statuses.167 """168 if len(self._metric_statuses) > 0:169 if all(status == DEVICE_METRICS_STATES.SUCCESS for status in list(self._metric_statuses.values())):170 self._device_status = DEVICE_METRICS_STATES.SUCCESS171 elif DEVICE_METRICS_STATES.SUCCESS in list(self._metric_statuses.values()):172 self._device_status = DEVICE_METRICS_STATES.PARTIAL_METRIC_FAILURE173 else:174 count = Counter(list(self._metric_statuses.values()))175 if len(count.most_common()) > 0:176 # get the most common (1) status and it's count as a tuple, and grab just its name177 self._device_status = count.most_common(1)[0][0]...
EdgeServer.py
Source:EdgeServer.py
...66 def set_device_values__by_device_id(self, device_id, update_values: Device_Update_Model):67 # device = tuple(filter(lambda dev: dev.device_id == device_id, self.get_registered_device_list()))68 for device in self.get_registered_device_list():69 if device.device_id == device_id:70 self._set_device_status(device_id, update_values)71 break72 def set_device_values__by_device_type(self, device_type: ActiveDeviceTypes, update_values: Device_Update_Model):73 for device in self.get_registered_device_list():74 if device.device_type == device_type.name:75 self._set_device_status(device.device_id, update_values)76 def set_device_values__by_room(self, room: ActiveRooms, update_values: Device_Update_Model):77 for device in self.get_registered_device_list():78 if device.room_type == room.value:79 self._set_device_status(device.device_id, update_values)80 # Controlling and performing the operations on the devices81 # based on the request received82 def set_status(self, update_values: Device_Update_Model):83 for device in self.get_registered_device_list():84 self._set_device_status(device.device_id, update_values)85 def _set_device_status(self, device_id, update_values: Device_Update_Model):86 self.send_message_to_device(ActiveTopics.DEVICE_UPDATE_REQUEST_TOPIC_NAME.value.format(device_id),87 update_values.to_json(), qos=0)88 ################ SET call(s) to set device status/values :: START ################89 # handle different subscribed topics90 def handle_topic(self, topic_name, payload):91 if topic_name == ActiveTopics.DEVICE_REGISTER_REQUEST_TOPIC_NAME.value:92 self._device_registration_handler(Registration_Request.from_json(payload))93 elif topic_name == ActiveTopics.SEND_DEVICE_STATUS_TO_EDGE_TOPIC_NAME.value:94 self._process_device_status(payload)95 elif topic_name == ActiveTopics.DEVICE_UPDATE_COMMAND_RESULT_TOPIC_NAME.value:96 self._handle_device_update_ack(Update_Command_Result.from_json(payload))97 else:98 print(" {} topic is not supported! ".format(topic_name))99 # handle device registration request...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!