Best Python code snippet using lisa_python
lisa_runner.py
Source: lisa_runner.py
        # ... (earlier lines of this file are truncated in the excerpt)
        return is_all_results_completed and is_all_environment_completed

    def fetch_task(self) -> Optional[Task[None]]:
        self._prepare_environments()
        self._cleanup_deleted_environments()
        self._cleanup_done_results()

        # sort environments by status
        available_environments = self._sort_environments(self.environments)
        available_results = self._sort_test_results(
            [x for x in self.test_results if x.can_run]
        )

        # check deletable environments
        delete_task = self._delete_unused_environments()
        if delete_task:
            return delete_task

        if available_results and available_environments:
            for priority in range(6):
                can_run_results = self._get_results_by_priority(
                    available_results, priority
                )
                if not can_run_results:
                    continue
                # there are test cases and environments, so a task needs to
                # be scheduled.
                for environment in available_environments:
                    if environment.is_in_use:
                        # skip environments that are in use
                        continue
                    # try to pick the designated test result.
                    environment_results = [
                        x
                        for x in can_run_results
                        if environment.source_test_result
                        and x.id_ == environment.source_test_result.id_
                    ]
                    if not environment_results:
                        environment_results = self._get_runnable_test_results(
                            test_results=can_run_results, environment=environment
                        )
                    if not environment_results:
                        continue
                    task = self._dispatch_test_result(
                        environment=environment, test_results=environment_results
                    )
                    # more conditions are checked when dispatching. If any of
                    # them is not met, the task is None. In that case, don't
                    # return; try the next environment or skip this test case.
                    if task:
                        return task
                if not any(
                    x.is_in_use or x.status == EnvironmentStatus.New
                    for x in available_environments
                ):
                    # no environment is in use or new, and the results don't
                    # fit any environment, so those results cannot be run.
                    self._skip_test_results(can_run_results)
        elif available_results:
            # no available environments, so mark all test results skipped.
            self._skip_test_results(available_results)

        self.status = ActionStatus.SUCCESS
        return None

    def close(self) -> None:
        if hasattr(self, "environments") and self.environments:
            for environment in self.environments:
                self._delete_environment_task(environment, [])
        super().close()

    def _dispatch_test_result(
        self, environment: Environment, test_results: List[TestResult]
    ) -> Optional[Task[None]]:
        check_cancelled()
        assert test_results
        can_run_results = test_results

        # deploy
        if environment.status == EnvironmentStatus.Prepared and can_run_results:
            return self._generate_task(
                task_method=self._deploy_environment_task,
                environment=environment,
                test_results=can_run_results[:1],
            )

        # run on deployed environment
        can_run_results = [x for x in can_run_results if x.can_run]
        if environment.status == EnvironmentStatus.Deployed and can_run_results:
            selected_test_results = self._get_test_results_to_run(
                test_results=test_results, environment=environment
            )
            if selected_test_results:
                return self._generate_task(
                    task_method=self._run_test_task,
                    environment=environment,
                    test_results=selected_test_results,
                    case_variables=self._case_variables,
                )
            # Check if there is a case to run in a connected environment.
            # If so, initialize the environment.
            initialization_results = self._get_runnable_test_results(
                test_results=test_results,
                environment_status=EnvironmentStatus.Connected,
                environment=environment,
            )
            if initialization_results:
                return self._generate_task(
                    task_method=self._initialize_environment_task,
                    environment=environment,
                    test_results=initialization_results,
                )

        # run on connected environment
        can_run_results = [x for x in can_run_results if x.can_run]
        if environment.status == EnvironmentStatus.Connected and can_run_results:
            selected_test_results = self._get_test_results_to_run(
                test_results=test_results, environment=environment
            )
            if selected_test_results:
                return self._generate_task(
                    task_method=self._run_test_task,
                    environment=environment,
                    test_results=selected_test_results,
                    case_variables=self._case_variables,
                )
        return None

    def _delete_unused_environments(self) -> Optional[Task[None]]:
        available_environments = self._sort_environments(self.environments)
        # check deletable environments
        for environment in available_environments:
            # an environment that is in use, or not yet deployed, won't be
            # deleted until the end of the runner.
            if environment.is_in_use or environment.status in [
                EnvironmentStatus.New,
                EnvironmentStatus.Prepared,
            ]:
                continue
            can_run_results = self._get_runnable_test_results(
                self.test_results, environment=environment
            )
            if not can_run_results:
                # no more tests need this environment; delete it.
                self._log.debug(
                    f"generating delete environment task on '{environment.name}'"
                )
                return self._generate_task(
                    task_method=self._delete_environment_task,
                    environment=environment,
                    test_results=[],
                )
        return None

    def _prepare_environments(self) -> None:
        if all(x.status != EnvironmentStatus.New for x in self.environments):
            return

        proceeded_environments: List[Environment] = []
        for candidate_environment in self.environments:
            success = True
            if candidate_environment.status == EnvironmentStatus.New:
                success = self._prepare_environment(candidate_environment)
            if success:
                proceeded_environments.append(candidate_environment)

        # sort by environment source and cost.
        # user-defined environments should have higher priority than those
        # generated from test case requirements.
        proceeded_environments.sort(key=lambda x: (not x.is_predefined, x.cost))
        self.environments = proceeded_environments

    def _deploy_environment_task(
        self, environment: Environment, test_results: List[TestResult]
    ) -> None:
        try:
            try:
                self.platform.deploy_environment(environment)
                assert (
                    environment.status == EnvironmentStatus.Deployed
                ), f"actual: {environment.status}"
                self._reset_awaitable_timer("deploy")
            except ResourceAwaitableException as identifier:
                if self._is_awaitable_timeout("deploy"):
                    self._log.info(
                        f"[{environment.name}] timeout on waiting for more resource: "
                        f"{identifier}, skip assigning case."
                    )
                    raise SkippedException(identifier)
                else:
                    # rerun prepare to recalculate resources.
                    environment.status = EnvironmentStatus.New
        except Exception as identifier:
            self._attach_failed_environment_to_result(
                environment=environment,
                result=test_results[0],
                exception=identifier,
            )
            self._delete_environment_task(environment=environment, test_results=[])

    def _initialize_environment_task(
        self, environment: Environment, test_results: List[TestResult]
    ) -> None:
        self._log.debug(f"start initializing task on '{environment.name}'")
        assert test_results
        try:
            environment.initialize()
            assert (
                environment.status == EnvironmentStatus.Connected
            ), f"actual: {environment.status}"
        except Exception as identifier:
            self._attach_failed_environment_to_result(
                environment=environment,
                result=test_results[0],
                exception=identifier,
            )
            self._delete_environment_task(environment=environment, test_results=[])

    def _run_test_task(
        self,
        environment: Environment,
        test_results: List[TestResult],
        case_variables: Dict[str, VariableEntry],
    ) -> None:
        self._log.debug(
            f"start running cases on '{environment.name}', "
            f"case count: {len(test_results)}, "
            f"status {environment.status.name}"
        )
        assert test_results
        assert len(test_results) == 1, (
            f"single test result to run, but {len(test_results)} found."
        )
        test_result = test_results[0]
        suite_metadata = test_result.runtime_data.metadata.suite
        test_suite: TestSuite = suite_metadata.test_class(
            suite_metadata,
        )
        test_suite.start(
            environment=environment,
            case_results=test_results,
            case_variables=case_variables,
        )
        # release the environment reference to optimize memory.
        test_result.environment = None

        # Some test cases may break the ssh connections. To reduce side
        # effects on subsequent test cases, close the connection after each
        # test run. It will be reconnected automatically on the next command.
        environment.nodes.close()

        # Try to connect to the node(s). If the node(s) of this environment
        # cannot be accessed, set the environment as Bad, so it won't be
        # reused.
        if not is_unittest() and not environment.nodes.test_connections():
            environment.status = EnvironmentStatus.Bad
            self._log.debug(
                f"set environment '{environment.name}' as bad, "
                f"because after test case '{test_result.name}', "
                f"node(s) cannot be accessed."
            )
            environment.nodes.close()

        # keep the failed environment instead of deleting it
        if (
            test_result.status == TestStatus.FAILED
            and self.platform.runbook.keep_environment
            == constants.ENVIRONMENT_KEEP_FAILED
        ):
            self._log.debug(
                f"keep environment '{environment.name}', "
                f"because keep_environment is 'failed', "
                f"and test case '{test_result.name}' failed on it."
            )
            environment.status = EnvironmentStatus.Deleted

        # an environment in Bad status will be deleted and won't run more
        # test cases. But if the setting keeps failed environments, it may
        # have been kept by the logic above.
        if environment.status == EnvironmentStatus.Bad or environment.is_dirty:
            self._log.debug(
                f"delete environment '{environment.name}', "
                f"because it's in Bad status or marked as dirty."
            )
            self._delete_environment_task(
                environment=environment, test_results=test_results
            )

    def _delete_environment_task(
        self, environment: Environment, test_results: List[TestResult]
    ) -> None:
        """
        May be called async
        """
        # the predefined environment shouldn't be deleted, because it
        # serves all test cases.
        if environment.status == EnvironmentStatus.Deleted or (
            environment.status == EnvironmentStatus.Prepared
            and not environment.is_in_use
        ):
            # A prepared-only environment doesn't need to be deleted;
            # asking the platform to delete a non-existing environment
            # may fail.
            environment.status = EnvironmentStatus.Deleted
        else:
            try:
                self.platform.delete_environment(environment)
            except Exception as identifier:
                self._log.debug(
                    f"error on deleting environment '{environment.name}': {identifier}"
                )

    def _prepare_environment(self, environment: Environment) -> bool:
        success = True
        try:
            try:
                self.platform.prepare_environment(environment)
                self._reset_awaitable_timer("prepare")
            except ResourceAwaitableException as identifier:
                # if timed out, raise the exception to skip the test case.
                # Otherwise, do nothing, so the env stays New and is tried
                # again next time.
                if self._is_awaitable_timeout("prepare"):
                    raise SkippedException(identifier)
        except Exception as identifier:
            success = False
            matched_result = self._match_failed_environment_with_result(
                environment=environment,
                candidate_results=self.test_results,
                exception=identifier,
            )
            self._attach_failed_environment_to_result(
                environment=environment,
                result=matched_result,
                exception=identifier,
            )
        return success

    def _cleanup_deleted_environments(self) -> None:
        # remove references to deleted environments to save memory on big runs.
        new_environments: List[Environment] = []
        for environment in self.environments[:]:
            if environment.status != EnvironmentStatus.Deleted:
                new_environments.append(environment)
        self.environments = new_environments

    def _cleanup_done_results(self) -> None:
        # remove references to completed test results to save memory on big runs.
        remaining_results: List[TestResult] = []
        for test_result in self.test_results[:]:
            if not test_result.is_completed:
                remaining_results.append(test_result)
        self.test_results = remaining_results

    def _get_results_by_priority(
        self, test_results: List[TestResult], priority: int
    ) -> List[TestResult]:
        if not test_results:
            return []
        test_results = [
            x for x in test_results if x.runtime_data.metadata.priority == priority
        ]
        # ... (remainder of the file truncated in the excerpt)
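The scheduler above is pull-based: something has to keep calling fetch_task and executing whatever it returns until no work is left. Here is a minimal sketch of that polling contract; the drain_runner helper and the zero-argument-callable task shape are assumptions for illustration, not LISA's actual Task API or runner framework.

from typing import Callable, Optional

def drain_runner(fetch_task: Callable[[], Optional[Callable[[], None]]]) -> int:
    """Poll fetch_task() and execute returned tasks until it yields None.

    Hypothetical driver; LISA's real Task[None] objects are scheduled by
    its own framework rather than run synchronously like this.
    """
    executed = 0
    while True:
        task = fetch_task()
        if task is None:
            # None means no task can be generated right now: everything is
            # completed or skipped, or all environments are busy.
            break
        task()  # deploy, initialize, run, or delete work happens here
        executed += 1
    return executed

Note that a None return alone doesn't prove the run is finished; the excerpt's opening fragment (return is_all_results_completed and is_all_environment_completed) suggests a separate doneness check that considers both test results and environment states.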
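Both _prepare_environment and _deploy_environment_task lean on a pair of helpers, _reset_awaitable_timer and _is_awaitable_timeout, whose bodies are not shown in this excerpt. Below is a minimal sketch of the pattern they imply; the class name, field names, and default timeout are assumptions for illustration, not LISA's actual implementation.

import time
from typing import Dict

class AwaitableTimers:
    """Tracks how long a named operation has been waiting for resources."""

    def __init__(self, timeout_seconds: float = 3600.0) -> None:
        # timeout_seconds is an assumed default for this sketch.
        self._timeout = timeout_seconds
        self._started: Dict[str, float] = {}

    def reset(self, name: str) -> None:
        # a successful prepare/deploy clears the timer, so the next
        # resource wait starts with a fresh budget.
        self._started.pop(name, None)

    def is_timeout(self, name: str) -> bool:
        # start counting on the first failed attempt; report True once the
        # budget is exhausted so the caller can raise SkippedException.
        started = self._started.setdefault(name, time.monotonic())
        return (time.monotonic() - started) >= self._timeout

The design choice this supports: a ResourceAwaitableException is not immediately fatal. The environment is left in (or reset to) New so the scheduler retries it on a later fetch_task call, and only after the wait budget runs out does a SkippedException surface on the test result.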