Best Python code snippet using lemoncheesecake
viewerclient.py
Source:viewerclient.py
...184 def _request_channel(self):185 return "DIRECTOR_TREE_VIEWER_REQUEST_<{:s}>".format(self.client_id)186 def _response_channel(self):187 return "DIRECTOR_TREE_VIEWER_RESPONSE_<{:s}>".format(self.client_id)188 def _handler_loop(self):189 while True:190 self.lcm.handle()191 def start_handler(self):192 if self.handler_thread is not None:193 return194 self.handler_thread = threading.Thread(195 target=self._handler_loop)196 self.handler_thread.daemon = True197 self.handler_thread.start()198 def _handle_response(self, channel, msgdata):199 msg = viewer2_comms_t.decode(msgdata)200 data = json.loads(msg.data)201 if data["status"] == 0:202 pass...
events.py
Source:events.py
...85 print("Fire event %s" % event)86 self._queue.put(event)87 def get_pending_failure(self):88 return self._pending_failure89 def _handler_loop(self):90 while True:91 event = self._queue.get()92 if event is None:93 break94 try:95 self.handle_event(event)96 except Exception as excp:97 self._pending_failure = excp, serialize_current_exception()98 break99 finally:100 self._queue.task_done()101 @contextmanager102 def handle_events(self):103 self._queue = Queue()...
sanitarize.py
Source:sanitarize.py
1""" Trying to control flow """2import asyncio3from apd_types import ApHass4from create_helpers import CreateHelpers5from entity_oper import EntityObject67import globals as g8from ws_comm import WsHA9from entity_register import EntityRegister10from app_system import AppSystem11from bootstart import boot_logger, boot_logger_off, boot_module121314class Sanitarize(ApHass):15 @boot_logger_off16 @boot_module17 def initialize(self):18 self.debug("Initialize")1920 def init(self, end_callback):21 """[summary]2223 Args:24 end_callback (function): Will be called when all processes are done25 """26 self.info("Starting init")27 self.end_callback = end_callback # for the boot system2829 # Going throw all global definitions and adding them30 # result g.helper_register3132 ########################################33 # reducing for only one entity for debug34 """35 new_register: RegisterType = []36 for p in g.helper_register:37 entity = p[INDEX_KEY]38 if entity == "input_text.mower_map":39 new_register.append(p)40 g.helper_register.clear()41 g.helper_register = new_register 42 self.logger.debug(g.helper_register)43 """44 ########################################45 self.debug(46 "Entity register execute and waiting for finishing with entity_register_done"47 )48 self.ws: WsHA = self.sync_get_app("ws_comm")49 self.appSystem: AppSystem = self.sync_get_app("app_system") # type: ignore50 self.entityRegister: EntityRegister = self.sync_get_app("entity_register") # type: ignore51 self.createHelpers: CreateHelpers = self.sync_get_app("create_helpers") # type: ignore5253 self._handler_loop = self.sync_create_task(self.main_loop())5455 async def main_loop(self):56 self.info("Starting sanitarize")57 control_buffer = asyncio.Queue()58 tasks = {59 "appSystem": self.appSystem.execute(control_buffer),60 "entityRegister": self.entityRegister.execute(control_buffer),61 "createHelpers": self.createHelpers.execute(control_buffer),62 "addToAD": self._add_to_ad(control_buffer),63 "done": self._task_done(control_buffer),64 }65 # without creating helpers66 """67 tasks = {68 "appSystem": self.appSystem.execute(control_buffer),69 "entityRegister": self.entityRegister.execute(control_buffer),70 "done": self._task_done(control_buffer),71 }72 """7374 for p in tasks.keys():75 await control_buffer.put(p)76 try:77 while True:78 process = await control_buffer.get()79 self.debug(f"Process: {process}")80 to_do = tasks.get(process)81 if to_do is None:82 self.error("Wrong definition in tasks")83 raise ValueError("Wrong definition in tasks")84 await to_do85 except Exception as ex:86 template = "An exception of type {0} occurred. Arguments:\n{1!r}"87 message = template.format(type(ex).__name__, ex.args)88 self.error(message)89 raise ValueError("Fatal error in core")9091 async def _task_done(self, control_buffer: asyncio.Queue):92 # info bootstart that it is done93 self._handler_loop.cancel() # type: ignore94 self.end_callback(self)9596 async def _add_to_ad(self, control_buffer: asyncio.Queue):97 self.info("Waiting for system")98 eo: EntityObject99 for eo in g.created_helpers_obj:100 if eo.cmd_finished:101 if not await self.entity_exists(eo.entity_id, replace=False):102 await self.set_state(eo.entity_id, state=eo.initial)103 control_buffer.task_done()104105 def _done(self, *kwargs):106 self.debug("Fired end created entity")
...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!