Best Python code snippet using avocado_python
job.py
Source:job.py
...255 def _log_variants(variants):256 lines = variants.to_str(summary=1, variants=1, use_utf8=False)257 for line in lines.splitlines():258 LOG_JOB.info(line)259 def _setup_job_category(self):260 """261 This has to be called after self.logdir has been defined262 It attempts to create a directory one level up from the job results,263 with the given category name. Then, a symbolic link is created to264 this job results directory.265 This should allow a user to look at a single directory for all266 jobs of a given category.267 """268 category = self.config.get('run.job_category')269 if category is None:270 return271 if category != astring.string_to_safe_path(category):272 msg = ("Unable to set category in job results: name is not "273 "filesystem safe: %s" % category)274 LOG_UI.warning(msg)275 LOG_JOB.warning(msg)276 return277 # we could also get "base_logdir" from config, but I believe this is278 # the best choice because it reduces the dependency surface (depends279 # only on self.logdir)280 category_path = os.path.join(os.path.dirname(self.logdir),281 category)282 try:283 os.mkdir(category_path)284 except FileExistsError:285 pass286 try:287 os.symlink(os.path.relpath(self.logdir, category_path),288 os.path.join(category_path, os.path.basename(self.logdir)))289 except NotImplementedError:290 msg = "Unable to link this job to category %s" % category291 LOG_UI.warning(msg)292 LOG_JOB.warning(msg)293 except OSError:294 msg = "Permission denied to link this job to category %s" % category295 LOG_UI.warning(msg)296 LOG_JOB.warning(msg)297 def _setup_job_results(self):298 """299 Prepares a job result directory, also known as logdir, for this job300 """301 base_logdir = self.config.get('run.results_dir')302 if base_logdir is None:303 self.logdir = data_dir.create_job_logs_dir(unique_id=self.unique_id)304 else:305 base_logdir = os.path.abspath(base_logdir)306 self.logdir = data_dir.create_job_logs_dir(base_dir=base_logdir,307 unique_id=self.unique_id)308 if not 
self.config.get('run.dry_run.enabled'):309 self._update_latest_link()310 self.logfile = os.path.join(self.logdir, "job.log")311 idfile = os.path.join(self.logdir, "id")312 with open(idfile, 'w') as id_file_obj:313 id_file_obj.write("%s\n" % self.unique_id)314 id_file_obj.flush()315 os.fsync(id_file_obj)316 def _update_latest_link(self):317 """318 Update the latest job result symbolic link [avocado-logs-dir]/latest.319 """320 def soft_abort(msg):321 """ Only log the problem """322 LOG_JOB.warning("Unable to update the latest link: %s", msg)323 basedir = os.path.dirname(self.logdir)324 basename = os.path.basename(self.logdir)325 proc_latest = os.path.join(basedir, "latest.%s" % os.getpid())326 latest = os.path.join(basedir, "latest")327 if os.path.exists(latest) and not os.path.islink(latest):328 soft_abort('"%s" already exists and is not a symlink' % latest)329 return330 if os.path.exists(proc_latest):331 try:332 os.unlink(proc_latest)333 except OSError as details:334 soft_abort("Unable to remove %s: %s" % (proc_latest, details))335 return336 try:337 os.symlink(basename, proc_latest)338 os.rename(proc_latest, latest)339 except OSError as details:340 soft_abort("Unable to create create latest symlink: %s" % details)341 return342 finally:343 if os.path.exists(proc_latest):344 os.unlink(proc_latest)345 @classmethod346 def from_config(cls, job_config, suites_configs=None):347 """Helper method to create a job from config dicts.348 This is different from the Job() initialization because here we are349 assuming that you need some help to build the test suites. 
Avocado will350 try to resolve tests based on the configuration information instead of351 assuming pre populated test suites.352 Keep in mind that here we are going to replace the suite.name with a353 counter.354 If you need create a custom Job with your own TestSuites, please use355 the Job() constructor instead of this method.356 :param job_config: A config dict to be used on this job and also as a357 'global' config for each test suite.358 :type job_config: dict359 :param suites_configs: A list of specific config dict to be used on360 each test suite. Each suite config will be361 merged with the job_config dict. If None is362 passed then this job will have only one363 test_suite with the same config as job_config.364 :type suites_configs: list365 """366 suites_configs = suites_configs or [deepcopy(job_config)]367 suites = []368 for index, config in enumerate(suites_configs, start=1):369 suites.append(TestSuite.from_config(config,370 name=index,371 job_config=job_config))372 return cls(job_config, suites)373 @property374 def size(self):375 """Job size is the sum of all test suites sizes."""376 return sum([suite.size for suite in self.test_suites])377 @property378 def test_suite(self):379 """This is the first test suite of this job (deprecated).380 Please, use test_suites instead.381 """382 if self.test_suites:383 return self.test_suites[0]384 @test_suite.setter385 def test_suite(self, var):386 """Temporary setter. 
Suites should be set from test_suites."""387 if self.test_suites:388 self.test_suites[0] = var389 else:390 self.test_suites = [var]391 @property392 def timeout(self):393 if self._timeout is None:394 self._timeout = self.config.get('job.run.timeout')395 return self._timeout396 @property397 def unique_id(self):398 if self._unique_id is None:399 self._unique_id = self.config.get('run.unique_job_id') \400 or create_unique_job_id()401 return self._unique_id402 def cleanup(self):403 """404 Cleanup the temporary job handlers (dirs, global setting, ...)405 """406 output.del_last_configuration()407 self.__stop_job_logging()408 if not self.__keep_tmpdir and os.path.exists(self.tmpdir):409 shutil.rmtree(self.tmpdir)410 cleanup_conditionals = (411 self.config.get('run.dry_run.enabled'),412 not self.config.get('run.dry_run.no_cleanup')413 )414 if all(cleanup_conditionals):415 # Also clean up temp base directory created because of the dry-run416 base_logdir = self.config.get('run.results_dir')417 if base_logdir is not None:418 try:419 FileNotFoundError420 except NameError:421 FileNotFoundError = OSError # pylint: disable=W0622422 try:423 shutil.rmtree(base_logdir)424 except FileNotFoundError:425 pass426 def create_test_suite(self):427 msg = ("create_test_suite() is deprecated. 
You can also create your "428 "own suites with TestSuite() or TestSuite.from_config().")429 warnings.warn(msg, DeprecationWarning)430 try:431 self.test_suite = TestSuite.from_config(self.config)432 if self.test_suite and self.test_suite.size == 0:433 refs = self.test_suite.references434 msg = ("No tests found for given test references, try "435 "'avocado -V list %s' for details") % " ".join(refs)436 raise exceptions.JobTestSuiteEmptyError(msg)437 except TestSuiteError as details:438 raise exceptions.JobBaseException(details)439 if self.test_suite:440 self.result.tests_total = self.test_suite.size441 def post_tests(self):442 """443 Run the post tests execution hooks444 By default this runs the plugins that implement the445 :class:`avocado.core.plugin_interfaces.JobPostTests` interface.446 """447 self.result_events_dispatcher.map_method('post_tests', self)448 def pre_tests(self):449 """450 Run the pre tests execution hooks451 By default this runs the plugins that implement the452 :class:`avocado.core.plugin_interfaces.JobPreTests` interface.453 """454 self.result_events_dispatcher.map_method('pre_tests', self)455 def render_results(self):456 """Render test results that depend on all tests having finished.457 By default this runs the plugins that implement the458 :class:`avocado.core.plugin_interfaces.Result` interface.459 """460 result_dispatcher = dispatcher.ResultDispatcher()461 if result_dispatcher.extensions:462 result_dispatcher.map_method('render', self.result, self)463 def run(self):464 """465 Runs all job phases, returning the test execution results.466 This method is supposed to be the simplified interface for467 jobs, that is, they run all phases of a job.468 :return: Integer with overall job status. 
See469 :mod:`avocado.core.exit_codes` for more information.470 """471 assert self.tmpdir is not None, "Job.setup() not called"472 if self.time_start == -1:473 self.time_start = time.time()474 try:475 self.result.tests_total = self.size476 self.pre_tests()477 return self.run_tests()478 except exceptions.JobBaseException as details:479 self.status = details.status480 fail_class = details.__class__.__name__481 self.log.error('\nAvocado job failed: %s: %s', fail_class, details)482 self.exitcode |= exit_codes.AVOCADO_JOB_FAIL483 return self.exitcode484 except exceptions.OptionValidationError as details:485 self.log.error('\n%s', str(details))486 self.exitcode |= exit_codes.AVOCADO_JOB_FAIL487 return self.exitcode488 except Exception as details: # pylint: disable=W0703489 self.status = "ERROR"490 exc_type, exc_value, exc_traceback = sys.exc_info()491 tb_info = traceback.format_exception(exc_type, exc_value,492 exc_traceback.tb_next)493 fail_class = details.__class__.__name__494 self.log.error('\nAvocado crashed: %s: %s', fail_class, details)495 for line in tb_info:496 self.log.debug(line)497 self.log.error("Please include the traceback info and command line"498 " used on your bug report")499 self.log.error('Report bugs visiting %s', _NEW_ISSUE_LINK)500 self.exitcode |= exit_codes.AVOCADO_FAIL501 return self.exitcode502 finally:503 self.post_tests()504 if self.time_end == -1:505 self.time_end = time.time()506 self.time_elapsed = self.time_end - self.time_start507 self.render_results()508 def run_tests(self):509 """510 The actual test execution phase511 """512 self._log_job_debug_info()513 jobdata.record(self, sys.argv)514 if self.size == 0:515 msg = ("Unable to resolve any reference or 'run.references' is "516 "empty.")517 LOG_UI.error(msg)518 if not self.test_suites:519 self.exitcode |= exit_codes.AVOCADO_JOB_FAIL520 return self.exitcode521 summary = set()522 for suite in self.test_suites:523 summary |= suite.run(self)524 # If it's all good so far, set job status to 
'PASS'525 if self.status == 'RUNNING':526 self.status = 'PASS'527 LOG_JOB.info('Test results available in %s', self.logdir)528 if 'INTERRUPTED' in summary:529 self.exitcode |= exit_codes.AVOCADO_JOB_INTERRUPTED530 if 'FAIL' in summary:531 self.exitcode |= exit_codes.AVOCADO_TESTS_FAIL532 return self.exitcode533 def setup(self):534 """535 Setup the temporary job handlers (dirs, global setting, ...)536 """537 output.reconfigure(self.config)538 assert self.tmpdir is None, "Job.setup() already called"539 if self.config.get('run.dry_run.enabled'): # Create the dry-run dirs540 if self.config.get('run.results_dir') is None:541 tmp_dir = tempfile.mkdtemp(prefix="avocado-dry-run-")542 self.config['run.results_dir'] = tmp_dir543 self._setup_job_results()544 self.result = result.Result(self.unique_id, self.logfile)545 self.__start_job_logging()546 self._setup_job_category()547 # Use "logdir" in case "keep_tmp" is enabled548 if self.config.get('run.keep_tmp'):549 base_tmpdir = self.logdir550 else:551 base_tmpdir = data_dir.get_tmp_dir()552 self.__keep_tmpdir = False553 self.tmpdir = tempfile.mkdtemp(prefix="avocado_job_",...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You could also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!