Best Python code snippet using avocado_python
service.py
Source:service.py
# ... (excerpt from service.py; it starts in the middle of a call whose opening
# lines are not part of this snippet -- the visible tail is kept verbatim below)
#         headers=headers,
#         serializer='json',
#         retry=True
#     )
#
# NOTE(review): the functions below are methods of a class whose header lies
# outside this excerpt. Indentation was reconstructed from a flattened
# snippet -- confirm nesting against the upstream file.


def _handle_task_finished(self):
    """Reply with the outcome of the finished task and acknowledge its message.

    Reads the result of ``self.task.future``. If the future raised, the
    exception is logged and an error description is sent instead of the
    result. In both cases the reply is sent via ``self._reply`` and the
    incoming message is acked.
    """
    correlation_id = self.task.message.properties['correlation_id']
    try:
        status = "success"
        result = self.task.future.result()
    except Exception as e:
        # Broad catch is deliberate: any task failure is turned into an
        # "error" reply rather than propagated to the caller of this method.
        logger.exception(
            f"[{correlation_id}]Exception occurred during handling"
        )
        status = "error"
        result = f"Request {self.task.message.decode()} failed\n{e!r}"
    self._reply(self.task.message, result, status)
    self.task.message.ack()
    logger.info(f"[{correlation_id}] done")


def wait_ready(self):
    """Block the calling thread until ``self.ready`` becomes truthy.

    Polls every 0.1 seconds; there is no timeout.
    """
    while not self.ready:
        time.sleep(0.1)


def stop(self):
    """Shut down the executor, then set ``self.should_stop``."""
    self.executor.shutdown()
    self.should_stop = True


def get_consumers(self, Consumer, channel):
    """Set up the exchanges/queues on ``channel`` and build the consumer list.

    Args:
        Consumer: consumer factory, called with keyword arguments.
        channel: channel passed to the three ``_setup_*`` helpers.

    Returns:
        A one-element list with a consumer bound to ``self.worker_queue``
        that accepts JSON and dispatches to ``self._handle_incoming_message``
        with a prefetch count of 1.
    """
    self._setup_dlx_exchange(channel)
    self._setup_invalid_queue(channel)
    self._setup_worker_queue(channel)
    return [Consumer(queues=self.worker_queue,
                     callbacks=[self._handle_incoming_message],
                     accept=['json'],
                     prefetch_count=1)]


def on_iteration(self):
    """Finalize the current task once its future is done.

    Does nothing when there is no current task or it is still running.
    """
    if self.task is not None and self.task.future.done():
        logger.debug("task done")
        self._handle_task_finished()
        self.task = None


def on_connection_error(self, exc, interval):
    """Drop the currently running task after a connection error.

    Delegates to the parent implementation first, then waits for the running
    task (if any) to finish and discards it without sending a reply.

    Args:
        exc: the connection error, forwarded to the parent implementation.
        interval: forwarded to the parent implementation.
    """
    super().on_connection_error(exc, interval)
    # Same "is there a task" test as on_iteration, for consistency.
    if self.task is not None:
        logger.info("Dropping currently running task")
        # there is no way to kill the running thread, so we wait for it
        # future.exception() does not reraise compared to future.result()
        self.task.future.exception()
        self.task = None
        logger.info("Dropping currently running task...Done")


def on_consume_ready(self, connection, channel, consumers):
    """Callback executed once we are ready to consume"""
    self.ready = True
    logger.info("{} is ready for consuming", self)
# ... (excerpt ends here; the rest of service.py is not shown)
repo.py
Source:repo.py
# ... (excerpt from repo.py; it starts inside an earlier method, presumably
# __init__, whose opening lines are not part of this snippet -- the visible
# tail is kept verbatim below)
#         #: status that have been superseded by newer status.
#         self._status_journal_summary = []
#         #: Contains the task IDs keyed by the result received
#         self._by_result = {}
#
# NOTE(review): the functions below are methods of a class whose header lies
# outside this excerpt. Indentation was reconstructed from a flattened
# snippet -- confirm nesting against the upstream file.


def _handle_task_finished(self, message):
    """Process a "finished" status message.

    Indexes the message by its result and then records its data through
    ``_set_task_data`` (defined outside this excerpt).
    """
    self._set_by_result(message)
    self._set_task_data(message)


def _handle_task_started(self, message):
    """Process a "started" status message.

    Raises:
        StatusMsgMissingDataError: if ``message`` has no ``'output_dir'`` key.
    """
    if 'output_dir' not in message:
        raise StatusMsgMissingDataError('output_dir')
    self._set_task_data(message)


def _set_by_result(self, message):
    """Sets an entry in the aggregate by result.

    For messages that include a "result" key, expected for example,
    from a "finished" status message, this will allow users to query
    for tasks with a given result."""
    result = message.get('result')
    if result not in self._by_result:
        self._by_result[result] = []
    # ... (the remainder of this method is truncated in the excerpt)
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!