How to use _determine_status_server method in avocado

Best Python code snippet using avocado_python

runner_nrunner.py

Source: runner_nrunner.py (GitHub)

copy

Full Screen

...259 :type config: dict260 """261 for runnable in runnables:262 runnable.config = Runnable.filter_runnable_config(runnable.kind, config)263 def _determine_status_server(self, test_suite, config_key):264 if test_suite.config.get("nrunner.status_server_auto"):265 # no UNIX domain sockets on Windows266 if platform.system() != "Windows":267 if self.status_server_dir is None:268 self.status_server_dir = tempfile.TemporaryDirectory(269 prefix="avocado_"270 )271 return os.path.join(self.status_server_dir.name, ".status_server.sock")272 return test_suite.config.get(config_key)273 def _create_status_server(self, test_suite, job):274 listen = self._determine_status_server(275 test_suite, "nrunner.status_server_listen"276 )277 # pylint: disable=W0201278 self.status_repo = StatusRepo(job.unique_id)279 # pylint: disable=W0201280 self.status_server = StatusServer(listen, self.status_repo)281 async def _update_status(self, job):282 tasks_by_id = {283 str(runtime_task.task.identifier): runtime_task.task284 for runtime_task in self.runtime_tasks285 }286 message_handler = MessageHandler()287 while True:288 try:289 (_, task_id, _, index) = self.status_repo.status_journal_summary_pop()290 except IndexError:291 await asyncio.sleep(0.05)292 continue293 message = self.status_repo.get_task_data(task_id, index)294 task = tasks_by_id.get(task_id)295 message_handler.process_message(message, task, job)296 @staticmethod297 def _abort_if_missing_runners(runnables):298 if runnables:299 missing_kinds = set([runnable.kind for runnable in runnables])300 msg = (301 f"Could not find runners for runnable(s) of kind(s): "302 f"{', '.join(missing_kinds)}"303 )304 raise JobError(msg)305 def run_suite(self, job, test_suite):306 summary = set()307 if not test_suite.enabled:308 job.interrupted_reason = f"Suite {test_suite.name} is disabled."309 return summary310 test_suite.tests, missing_requirements = check_runnables_runner_requirements(311 test_suite.tests312 )313 
self._update_avocado_configuration_used_on_runnables(314 test_suite.tests, test_suite.config315 )316 self._abort_if_missing_runners(missing_requirements)317 job.result.tests_total = test_suite.variants.get_number_of_tests(318 test_suite.tests319 )320 self._create_status_server(test_suite, job)321 graph = RuntimeTaskGraph(322 test_suite.get_test_variants(),323 test_suite.name,324 self._determine_status_server(test_suite, "nrunner.status_server_uri"),325 job.unique_id,326 )327 # pylint: disable=W0201328 self.runtime_tasks = graph.get_tasks_in_topological_order()329 # Start the status server330 asyncio.ensure_future(self.status_server.serve_forever())331 if test_suite.config.get("nrunner.shuffle"):332 random.shuffle(self.runtime_tasks)333 test_ids = [334 rt.task.identifier335 for rt in self.runtime_tasks336 if rt.task.category == "test"337 ]338 tsm = TaskStateMachine(self.runtime_tasks, self.status_repo)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful