Best Python code snippet using lisa_python
test_testsuite.py
Source:test_testsuite.py
...130 ut: TestCase, case_runbook: List[Any], expected_descriptions: List[str]131) -> List[TestCaseRuntimeData]:132 runbook = RunbookBuilder._validate_and_load({constants.TESTCASE: case_runbook})133 case_metadata = generate_cases_metadata()134 runbook.testcase = parse_testcase_filters(runbook.testcase_raw)135 filters = cast(List[schema.TestCase], runbook.testcase)136 selected = select_testcases(filters, case_metadata)137 ut.assertListEqual(expected_descriptions, [case.description for case in selected])138 return selected139class TestSuiteTestCase(TestCase):140 def generate_suite_instance(self) -> MockTestSuite:141 case_results = generate_cases_result()142 self.case_results = case_results[:2]143 suite_metadata = case_results[0].runtime_data.metadata.suite144 runbook = generate_runbook(is_single_env=True, local=True, remote=True)145 envs = load_environments(runbook)146 self.default_env = list(envs.values())[0]147 assert self.default_env148 test_suite = MockTestSuite(...
runner.py
Source:runner.py
...16from lisa.util.parallel import Task, TaskManager, cancel, set_global_task_manager17from lisa.util.perf_timer import Timer, create_timer18from lisa.util.subclasses import Factory19from lisa.variable import VariableEntry, get_case_variables, replace_variables20def parse_testcase_filters(raw_filters: List[Any]) -> List[schema.BaseTestCaseFilter]:21 if raw_filters:22 filters: List[schema.BaseTestCaseFilter] = []23 factory = Factory[schema.BaseTestCaseFilter](schema.BaseTestCaseFilter)24 for raw_filter in raw_filters:25 if constants.TYPE not in raw_filter:26 raw_filter[constants.TYPE] = constants.TESTCASE_TYPE_LISA27 filter = factory.load_typed_runbook(raw_filter)28 filters.append(filter)29 else:30 filters = [schema.TestCase(name="test", criteria=schema.Criteria(area="demo"))]31 return filters32def print_results(33 test_results: List[TestResultMessage],34 output_method: Callable[[str], Any],35) -> None:36 output_method("________________________________________")37 result_count_dict: Dict[TestStatus, int] = {}38 for test_result in test_results:39 result_name = test_result.full_name40 result_status = test_result.status41 output_method(42 f"{result_name:>50}: {result_status.name:<8} {test_result.message}"43 )44 result_count = result_count_dict.get(result_status, 0)45 result_count += 146 result_count_dict[result_status] = result_count47 output_method("test result summary")48 output_method(f" TOTAL : {len(test_results)}")49 for key in TestStatus:50 count = result_count_dict.get(key, 0)51 if key == TestStatus.ATTEMPTED and count == 0:52 # attempted is confusing if user don't know it.53 # so hide it if there is no attempted cases.54 continue55 output_method(f" {key.name:<9}: {count}")56class RunnerResult(notifier.Notifier):57 """58 This is an internal notifier. 
It uses to collect test results for runner.59 """60 @classmethod61 def type_name(cls) -> str:62 # no type_name, not able to import from yaml book.63 return ""64 @classmethod65 def type_schema(cls) -> Type[schema.TypedSchema]:66 return schema.Notifier67 def _received_message(self, message: messages.MessageBase) -> None:68 assert isinstance(message, TestResultMessage), f"actual: {type(message)}"69 self.results[message.id_] = message70 def _subscribed_message_type(self) -> List[Type[messages.MessageBase]]:71 return [TestResultMessage]72 def _initialize(self, *args: Any, **kwargs: Any) -> None:73 self.results: Dict[str, TestResultMessage] = {}74class BaseRunner(BaseClassMixin, InitializableMixin):75 """76 Base runner of other runners. And other runners derived from this one.77 """78 def __init__(79 self,80 runbook: schema.Runbook,81 index: int,82 case_variables: Dict[str, Any],83 ) -> None:84 super().__init__()85 self._runbook = runbook86 self.id = f"{self.type_name()}_{index}"87 self._task_id = -188 self._log = get_logger("runner", str(index))89 self._log_handler: Optional[FileHandler] = None90 self._case_variables = case_variables91 self._timer = create_timer()92 self._wait_resource_timeout = runbook.wait_resource_timeout93 self._wait_resource_timers: Dict[str, Timer] = dict()94 self._wait_resource_logged: bool = False95 self.canceled = False96 def __repr__(self) -> str:97 return self.id98 @property99 def is_done(self) -> bool:100 raise NotImplementedError()101 def fetch_task(self) -> Optional[Task[None]]:102 """103 return:104 The runnable task, which can return test results105 """106 raise NotImplementedError()107 def close(self) -> None:108 self._log.debug(f"Runner finished in {self._timer.elapsed_text()}.")109 if self._log_handler:110 remove_handler(self._log_handler)111 self._log_handler.close()112 def generate_task_id(self) -> int:113 self._task_id += 1114 return self._task_id115 def _initialize(self, *args: Any, **kwargs: Any) -> None:116 # do not put this 
logic to __init__, since the mkdir takes time.117 # default lisa runner doesn't need separated handler.118 self._log_folder = constants.RUN_LOCAL_LOG_PATH119 self._working_folder = constants.RUN_LOCAL_WORKING_PATH120 if self.type_name() != constants.TESTCASE_TYPE_LISA:121 # create separated folder and log for each non-default runner.122 runner_path_name = f"{self.type_name()}_runner"123 self._log_folder = self._log_folder / runner_path_name124 self._log_file_name = str(self._log_folder / f"{runner_path_name}.log")125 self._log_folder.mkdir(parents=True, exist_ok=True)126 self._log_handler = create_file_handler(127 Path(self._log_file_name), self._log128 )129 self._working_folder = self._working_folder / runner_path_name130 def _is_awaitable_timeout(self, name: str) -> bool:131 _wait_resource_timer = self._wait_resource_timers.get(name, create_timer())132 self._wait_resource_timers[name] = _wait_resource_timer133 if _wait_resource_timer.elapsed(False) < self._wait_resource_timeout * 60:134 # wait a while to prevent called too fast135 if not self._wait_resource_logged:136 self._log.info(f"{name} waiting for more resource...")137 self._wait_resource_logged = True138 time.sleep(5)139 return False140 else:141 self._log.info(f"{name} timeout on waiting for more resource...")142 return True143 def _reset_awaitable_timer(self, name: str) -> None:144 _wait_resource_timer = self._wait_resource_timers.get(name, create_timer())145 _wait_resource_timer.reset()146 self._wait_resource_logged = False147 self._wait_resource_timers[name] = _wait_resource_timer148class RootRunner(Action):149 """150 The entry root runner, which starts other runners.151 """152 def __init__(self, runbook_builder: RunbookBuilder) -> None:153 super().__init__()154 self.exit_code: int = 0155 self._runbook_builder = runbook_builder156 self._log = get_logger("RootRunner")157 # this is to hold active runners, and will close them, if there is any158 # global error.159 self._runners: List[BaseRunner] = []160 
self._runner_count: int = 0161 self._idle_logged: bool = False162 async def start(self) -> None:163 await super().start()164 self._results_collector: Optional[RunnerResult] = None165 try:166 transformer.run(167 self._runbook_builder, phase=constants.TRANSFORMER_PHASE_INIT168 )169 # update runbook for notifiers170 raw_data = copy.deepcopy(self._runbook_builder.raw_data)171 constants.RUNBOOK = replace_variables(172 raw_data, self._runbook_builder._variables173 )174 runbook = self._runbook_builder.resolve()175 self._runbook_builder.dump_variables()176 self._max_concurrency = runbook.concurrency177 self._log.debug(f"max concurrency is {self._max_concurrency}")178 self._results_collector = RunnerResult(schema.Notifier())179 register_notifier(self._results_collector)180 self._start_loop()181 except Exception as identifier:182 self._log.exception(183 "canceling runner due to exception", exc_info=identifier184 )185 cancel()186 finally:187 self._cleanup()188 if self._results_collector:189 results = [x for x in self._results_collector.results.values()]190 print_results(results, self._log.info)191 if runbook.exit_with_failed_count:192 # pass failed count to exit code193 self.exit_code = sum(194 1 for x in results if x.status == TestStatus.FAILED195 )196 else:197 # unknown error happened, see exception log for details.198 self.exit_code = -1199 async def stop(self) -> None:200 await super().stop()201 # TODO: to be implemented202 async def close(self) -> None:203 await super().close()204 def _fetch_runners(self) -> Iterator[BaseRunner]:205 root_runbook = self._runbook_builder.resolve(self._runbook_builder.variables)206 if root_runbook.combinator:207 combinator_factory = Factory[Combinator](Combinator)208 combinator = combinator_factory.create_by_runbook(root_runbook.combinator)209 del self._runbook_builder.raw_data[constants.COMBINATOR]210 self._log.debug(211 f"found combinator '{combinator.type_name()}', to expand runbook."212 )213 combinator.initialize()214 while True:215 
variables = combinator.fetch(self._runbook_builder.variables)216 if variables is None:217 break218 sub_runbook_builder = self._runbook_builder.derive(variables=variables)219 transformer.run(220 sub_runbook_builder, phase=constants.TRANSFORMER_PHASE_EXPANDED221 )222 runners = self._generate_runners(223 sub_runbook_builder.resolve(), variables224 )225 for runner in runners:226 yield runner227 else:228 # no combinator, use the root runbook229 transformer.run(230 self._runbook_builder, phase=constants.TRANSFORMER_PHASE_EXPANDED231 )232 for runner in self._generate_runners(233 root_runbook, self._runbook_builder.variables234 ):235 yield runner236 def _generate_runners(237 self, runbook: schema.Runbook, variables: Dict[str, VariableEntry]238 ) -> Iterator[BaseRunner]:239 # group filters by runner type240 case_variables = get_case_variables(variables)241 runner_filters: Dict[str, List[schema.BaseTestCaseFilter]] = {}242 for raw_filter in runbook.testcase_raw:243 # by default run all filtered cases unless 'enable' is specified as false244 filter = schema.load_by_type(schema.BaseTestCaseFilter, raw_filter)245 if filter.enabled:246 raw_filters: List[schema.BaseTestCaseFilter] = runner_filters.get(247 filter.type, []248 )249 if not raw_filters:250 runner_filters[filter.type] = raw_filters251 raw_filters.append(raw_filter)252 else:253 self._log.debug(f"Skip disabled filter: {raw_filter}.")254 # initialize runners255 factory = Factory[BaseRunner](BaseRunner)256 for runner_name, raw_filters in runner_filters.items():257 self._log.debug(258 f"create runner {runner_name} with {len(raw_filters)} filter(s)."259 )260 # keep filters to current runner's only.261 runbook.testcase = parse_testcase_filters(raw_filters)262 runner = factory.create_by_type_name(263 type_name=runner_name,264 runbook=runbook,265 index=self._runner_count,266 case_variables=case_variables,267 )268 runner.initialize()269 self._runners.append(runner)270 self._runner_count += 1271 yield runner272 def 
_submit_runner_tasks(273 self,274 runner: BaseRunner,275 task_manager: TaskManager[None],...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!