Best Python code snippet using avocado_python
job.py
Source:job.py
...123 if unique_id is None:124 self.config["run.unique_job_id"] = "0" * 40125 self.config["sysinfo.collect.enabled"] = "off"126 self.test_suites = test_suites or []127 self._check_test_suite_name_uniqueness()128 #: The log directory for this job, also known as the job results129 #: directory. If it's set to None, it means that the job results130 #: directory has not yet been created.131 self.logdir = None132 self.logfile = None133 self.tmpdir = None134 self.__keep_tmpdir = True135 self._base_tmpdir = None136 self.status = "RUNNING"137 self.result = None138 self.interrupted_reason = None139 self._timeout = None140 self._unique_id = None141 #: The time at which the job has started or `-1` if it has not been142 #: started by means of the `run()` method.143 self.time_start = -1144 #: The time at which the job has finished or `-1` if it has not been145 #: started by means of the `run()` method.146 self.time_end = -1147 #: The total amount of time the job took from start to finish,148 #: or `-1` if it has not been started by means of the `run()` method149 self.time_elapsed = -1150 self.funcatexit = CallbackRegister(f"JobExit {self.unique_id}", LOG_JOB)151 self._stdout_stderr = None152 self.replay_sourcejob = self.config.get("replay_sourcejob")153 self.exitcode = exit_codes.AVOCADO_ALL_OK154 self._result_events_dispatcher = None155 @property156 def result_events_dispatcher(self):157 # The result events dispatcher is shared with the test runner.158 # Because of our goal to support using the phases of a job159 # freely, let's get the result events dispatcher on first usage.160 # A future optimization may load it on demand.161 if self._result_events_dispatcher is None:162 self._result_events_dispatcher = dispatcher.ResultEventsDispatcher(163 self.config164 )165 output.log_plugin_failures(self._result_events_dispatcher.load_failures)166 return self._result_events_dispatcher167 @property168 def test_results_path(self):169 return os.path.join(self.logdir, "test-results")170 def __enter__(self):171 self.setup()172 return self173 def __exit__(self, _exc_type, _exc_value, _traceback):174 self.cleanup()175 def __start_job_logging(self):176 # Enable test logger177 fmt = "%(asctime)s %(name)s %(levelname)-5.5s| %(message)s"178 test_handler = output.add_log_handler(179 LOG_JOB, logging.FileHandler, self.logfile, self.loglevel, fmt180 )181 main_logger = logging.getLogger("avocado")182 main_logger.addHandler(test_handler)183 main_logger.setLevel(self.loglevel)184 self.__logging_handlers[test_handler] = [LOG_JOB.name, ""]185 # Enable console loggers186 enabled_logs = self.config.get("core.show")187 if "test" in enabled_logs and "early" not in enabled_logs:188 self._stdout_stderr = sys.stdout, sys.stderr189 # Enable std{out,err} but redirect both to stdout190 sys.stdout = STD_OUTPUT.stdout191 sys.stderr = STD_OUTPUT.stdout192 test_handler = output.add_log_handler(193 LOG_JOB,194 logging.StreamHandler,195 STD_OUTPUT.stdout,196 logging.DEBUG,197 fmt="%(message)s",198 )199 main_logger.addHandler(test_handler)200 self.__logging_handlers[test_handler] = [LOG_JOB.name, ""]201 def __stop_job_logging(self):202 if self._stdout_stderr:203 sys.stdout, sys.stderr = self._stdout_stderr204 for handler, loggers in self.__logging_handlers.items():205 for logger in loggers:206 logging.getLogger(logger).removeHandler(handler)207 def _log_avocado_config(self):208 LOG_JOB.info("Avocado config:")209 LOG_JOB.info("")210 for line in pprint.pformat(self.config).splitlines():211 LOG_JOB.info(line)212 LOG_JOB.info("")213 def _log_avocado_datadir(self):214 LOG_JOB.info("Avocado Data Directories:")215 LOG_JOB.info("")216 LOG_JOB.info("base %s", self.config.get("datadir.paths.base_dir"))217 LOG_JOB.info("tests %s", data_dir.get_test_dir())218 LOG_JOB.info("data %s", self.config.get("datadir.paths.data_dir"))219 LOG_JOB.info("logs %s", self.logdir)220 LOG_JOB.info("")221 @staticmethod222 def _log_avocado_version():223 version_log = version.VERSION224 git_version = get_avocado_git_version()225 if git_version is not None:226 version_log += git_version227 LOG_JOB.info("Avocado version: %s", version_log)228 LOG_JOB.info("")229 @staticmethod230 def _log_cmdline():231 cmdline = " ".join(sys.argv)232 LOG_JOB.info("Command line: %s", cmdline)233 LOG_JOB.info("")234 def _log_job_debug_info(self):235 """236 Log relevant debug information to the job log.237 """238 self._log_cmdline()239 self._log_avocado_version()240 self._log_avocado_config()241 self._log_avocado_datadir()242 for suite in self.test_suites:243 self._log_variants(suite.variants)244 self._log_tmp_dir()245 self._log_job_id()246 def _log_job_id(self):247 LOG_JOB.info("Job ID: %s", self.unique_id)248 if self.replay_sourcejob is not None:249 LOG_JOB.info("Replay of Job ID: %s", self.replay_sourcejob)250 LOG_JOB.info("")251 def _log_tmp_dir(self):252 LOG_JOB.info("Temporary dir: %s", self.tmpdir)253 LOG_JOB.info("")254 @staticmethod255 def _log_variants(variants):256 lines = variants.to_str(summary=1, variants=1, use_utf8=False)257 for line in lines.splitlines():258 LOG_JOB.info(line)259 def _setup_job_category(self):260 """261 This has to be called after self.logdir has been defined262 It attempts to create a directory one level up from the job results,263 with the given category name. Then, a symbolic link is created to264 this job results directory.265 This should allow a user to look at a single directory for all266 jobs of a given category.267 """268 category = self.config.get("run.job_category")269 if category is None:270 return271 if category != astring.string_to_safe_path(category):272 msg = (273 f"Unable to set category in job results: name is not "274 f"filesystem safe: {category}"275 )276 LOG_UI.warning(msg)277 LOG_JOB.warning(msg)278 return279 # we could also get "base_logdir" from config, but I believe this is280 # the best choice because it reduces the dependency surface (depends281 # only on self.logdir)282 category_path = os.path.join(os.path.dirname(self.logdir), category)283 try:284 os.mkdir(category_path)285 except FileExistsError:286 pass287 try:288 os.symlink(289 os.path.relpath(self.logdir, category_path),290 os.path.join(category_path, os.path.basename(self.logdir)),291 )292 except NotImplementedError:293 msg = f"Unable to link this job to category {category}"294 LOG_UI.warning(msg)295 LOG_JOB.warning(msg)296 except OSError:297 msg = f"Permission denied to link this job to category {category}"298 LOG_UI.warning(msg)299 LOG_JOB.warning(msg)300 def _setup_job_results(self):301 """302 Prepares a job result directory, also known as logdir, for this job303 """304 base_logdir = self.config.get("run.results_dir")305 if base_logdir is None:306 self.logdir = data_dir.create_job_logs_dir(unique_id=self.unique_id)307 else:308 base_logdir = os.path.abspath(base_logdir)309 self.logdir = data_dir.create_job_logs_dir(310 base_dir=base_logdir, unique_id=self.unique_id311 )312 if not self.config.get("run.dry_run.enabled"):313 self._update_latest_link()314 self.logfile = os.path.join(self.logdir, "job.log")315 idfile = os.path.join(self.logdir, "id")316 with open(idfile, "w", encoding="utf-8") as id_file_obj:317 id_file_obj.write(f"{self.unique_id}\n")318 id_file_obj.flush()319 os.fsync(id_file_obj)320 def _update_latest_link(self):321 """322 Update the latest job result symbolic link [avocado-logs-dir]/latest.323 """324 def soft_abort(msg):325 """Only log the problem"""326 LOG_JOB.warning("Unable to update the latest link: %s", msg)327 basedir = os.path.dirname(self.logdir)328 basename = os.path.basename(self.logdir)329 proc_latest = os.path.join(basedir, f"latest.{os.getpid()}")330 latest = os.path.join(basedir, "latest")331 if os.path.exists(latest) and not os.path.islink(latest):332 soft_abort(f'"{latest}" already exists and is not a symlink')333 return334 if os.path.exists(proc_latest):335 try:336 os.unlink(proc_latest)337 except OSError as details:338 soft_abort(f"Unable to remove {proc_latest}: {details}")339 return340 try:341 os.symlink(basename, proc_latest)342 os.rename(proc_latest, latest)343 except OSError as details:344 soft_abort(f"Unable to create create latest symlink: {details}")345 return346 finally:347 if os.path.exists(proc_latest):348 os.unlink(proc_latest)349 @classmethod350 def from_config(cls, job_config, suites_configs=None):351 """Helper method to create a job from config dicts.352 This is different from the Job() initialization because here we are353 assuming that you need some help to build the test suites. Avocado will354 try to resolve tests based on the configuration information instead of355 assuming pre populated test suites.356 Keep in mind that here we are going to replace the suite.name with a357 counter.358 If you need create a custom Job with your own TestSuites, please use359 the Job() constructor instead of this method.360 :param job_config: A config dict to be used on this job and also as a361 'global' config for each test suite.362 :type job_config: dict363 :param suites_configs: A list of specific config dict to be used on364 each test suite. Each suite config will be365 merged with the job_config dict. If None is366 passed then this job will have only one367 test_suite with the same config as job_config.368 :type suites_configs: list369 """370 suites_configs = suites_configs or [deepcopy(job_config)]371 suites = []372 for index, config in enumerate(suites_configs, start=1):373 suites.append(374 TestSuite.from_config(config, name=index, job_config=job_config)375 )376 return cls(job_config, suites)377 @property378 def size(self):379 """Job size is the sum of all test suites sizes."""380 return sum([suite.size for suite in self.test_suites])381 @property382 def test_suite(self):383 """This is the first test suite of this job (deprecated).384 Please, use test_suites instead.385 """386 if self.test_suites:387 return self.test_suites[0]388 @test_suite.setter389 def test_suite(self, var):390 """Temporary setter. Suites should be set from test_suites."""391 if self.test_suites:392 self.test_suites[0] = var393 else:394 self.test_suites = [var]395 @property396 def timeout(self):397 if self._timeout is None:398 self._timeout = self.config.get("job.run.timeout")399 return self._timeout400 @property401 def unique_id(self):402 if self._unique_id is None:403 self._unique_id = (404 self.config.get("run.unique_job_id") or create_unique_job_id()405 )406 return self._unique_id407 def cleanup(self):408 """409 Cleanup the temporary job handlers (dirs, global setting, ...)410 """411 output.del_last_configuration()412 self.__stop_job_logging()413 if not self.__keep_tmpdir and os.path.exists(self.tmpdir):414 shutil.rmtree(self.tmpdir)415 shutil.rmtree(self._base_tmpdir)416 cleanup_conditionals = (417 self.config.get("run.dry_run.enabled"),418 not self.config.get("run.dry_run.no_cleanup"),419 )420 if all(cleanup_conditionals):421 # Also clean up temp base directory created because of the dry-run422 base_logdir = self.config.get("run.results_dir")423 if base_logdir is not None:424 try:425 shutil.rmtree(base_logdir)426 except FileNotFoundError:427 pass428 def create_test_suite(self):429 msg = (430 "create_test_suite() is deprecated. You can also create your "431 "own suites with TestSuite() or TestSuite.from_config()."432 )433 warnings.warn(msg, DeprecationWarning)434 try:435 self.test_suite = TestSuite.from_config(self.config)436 if self.test_suite and self.test_suite.size == 0:437 refs = self.test_suite.references438 msg = (439 f"No tests found for given test references, try "440 f"'avocado -V list {' '.join(refs)}' for details"441 )442 raise exceptions.JobTestSuiteEmptyError(msg)443 except TestSuiteError as details:444 raise exceptions.JobBaseException(details)445 if self.test_suite:446 self.result.tests_total = self.test_suite.size447 def post_tests(self):448 """449 Run the post tests execution hooks450 By default this runs the plugins that implement the451 :class:`avocado.core.plugin_interfaces.JobPostTests` interface.452 """453 self.result_events_dispatcher.map_method("post_tests", self)454 def pre_tests(self):455 """456 Run the pre tests execution hooks457 By default this runs the plugins that implement the458 :class:`avocado.core.plugin_interfaces.JobPreTests` interface.459 """460 self.result_events_dispatcher.map_method("pre_tests", self)461 def render_results(self):462 """Render test results that depend on all tests having finished.463 By default this runs the plugins that implement the464 :class:`avocado.core.plugin_interfaces.Result` interface.465 """466 result_dispatcher = dispatcher.ResultDispatcher()467 if result_dispatcher.extensions:468 result_dispatcher.map_method("render", self.result, self)469 def get_failed_tests(self):470 """Gets the tests with status 'FAIL' and 'ERROR' after the Job ended.471 :return: List of failed tests472 """473 tests = []474 if self.result:475 for test in self.result.tests:476 if test.get("status") in ["FAIL", "ERROR"]:477 tests.append(test)478 return tests479 def run(self):480 """481 Runs all job phases, returning the test execution results.482 This method is supposed to be the simplified interface for483 jobs, that is, they run all phases of a job.484 :return: Integer with overall job status. See485 :mod:`avocado.core.exit_codes` for more information.486 """487 assert self.tmpdir is not None, "Job.setup() not called"488 if self.time_start == -1:489 self.time_start = time.monotonic()490 try:491 self.result.tests_total = self.size492 pre_post_dispatcher = dispatcher.JobPrePostDispatcher()493 output.log_plugin_failures(pre_post_dispatcher.load_failures)494 pre_post_dispatcher.map_method("pre", self)495 self.pre_tests()496 return self.run_tests()497 except exceptions.JobBaseException as details:498 self.status = details.status499 fail_class = details.__class__.__name__500 self.log.error("\nAvocado job failed: %s: %s", fail_class, details)501 self.exitcode |= exit_codes.AVOCADO_JOB_FAIL502 return self.exitcode503 except exceptions.OptionValidationError as details:504 self.log.error("\n%s", str(details))505 self.exitcode |= exit_codes.AVOCADO_JOB_FAIL506 return self.exitcode507 except Exception as details: # pylint: disable=W0703508 self.status = "ERROR"509 exc_type, exc_value, exc_traceback = sys.exc_info()510 tb_info = traceback.format_exception(511 exc_type, exc_value, exc_traceback.tb_next512 )513 fail_class = details.__class__.__name__514 self.log.error("\nAvocado crashed: %s: %s", fail_class, details)515 for line in tb_info:516 self.log.debug(line)517 self.log.error(518 "Please include the traceback info and command line"519 " used on your bug report"520 )521 self.log.error("Report bugs visiting %s", _NEW_ISSUE_LINK)522 self.exitcode |= exit_codes.AVOCADO_FAIL523 return self.exitcode524 finally:525 self.post_tests()526 if self.time_end == -1:527 self.time_end = time.monotonic()528 self.time_elapsed = self.time_end - self.time_start529 self.render_results()530 pre_post_dispatcher.map_method("post", self)531 def run_tests(self):532 """533 The actual test execution phase534 """535 self._log_job_debug_info()536 jobdata.record(self, sys.argv)537 if self.size == 0:538 msg = 'Unable to resolve any reference or "resolver.references" is empty.'539 LOG_UI.error(msg)540 if not self.test_suites:541 self.exitcode |= exit_codes.AVOCADO_JOB_FAIL542 return self.exitcode543 summary = set()544 for suite in self.test_suites:545 summary |= suite.run(self)546 # If it's all good so far, set job status to 'PASS'547 if self.status == "RUNNING":548 self.status = "PASS"549 LOG_JOB.info("Test results available in %s", self.logdir)550 if "INTERRUPTED" in summary:551 self.exitcode |= exit_codes.AVOCADO_JOB_INTERRUPTED552 if "FAIL" in summary or "ERROR" in summary:553 self.exitcode |= exit_codes.AVOCADO_TESTS_FAIL554 return self.exitcode555 def _check_test_suite_name_uniqueness(self):556 all_names = [suite.name for suite in self.test_suites]557 duplicate_names = set([name for name in all_names if all_names.count(name) > 1])558 if duplicate_names:559 duplicate_names = ", ".join(duplicate_names)560 msg = (561 f"Job contains suites with the following duplicate "562 f"name(s) {duplicate_names}. Test suite names must be "563 f"unique to guarantee that results will not be overwritten"564 )565 raise exceptions.JobTestSuiteDuplicateNameError(msg)566 def setup(self):567 """568 Setup the temporary job handlers (dirs, global setting, ...)569 """...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!