Source: job.py (excerpt from the autotest client job implementation; usage sketches follow the listing)
...
        self._is_continuation = options.cont
        self._current_step_ancestry = []
        self._next_step_index = 0
        self._load_state()

        _harness = self.handle_persistent_option(options, 'harness')
        _harness_args = self.handle_persistent_option(options, 'harness_args')
        self.harness = harness.select(_harness, self, _harness_args)

        if self.control:
            parsed_control = control_data.parse_control(
                    self.control, raise_warnings=False)
            self.fast = parsed_control.fast

        # set up the status logger
        def client_job_record_hook(entry):
            msg_tag = ''
            if '.' in self._logger.global_filename:
                msg_tag = self._logger.global_filename.split('.', 1)[1]
            # send the entry to the job harness
            message = '\n'.join([entry.message] + entry.extra_message_lines)
            rendered_entry = self._logger.render_entry(entry)
            self.harness.test_status_detail(entry.status_code, entry.subdir,
                                            entry.operation, message, msg_tag,
                                            entry.fields)
            self.harness.test_status(rendered_entry, msg_tag)
            # send the entry to stdout, if it's enabled
            logging.info(rendered_entry)

        self._logger = base_job.status_logger(
            self, status_indenter(self), record_hook=client_job_record_hook)

    def _post_record_init(self, control, options, drop_caches):
        """
        Perform job initialization not required by self.record().
        """
        self._init_drop_caches(drop_caches)
        self._init_packages()

        self.sysinfo = sysinfo.sysinfo(self.resultdir)
        self._load_sysinfo_state()

        if not options.cont:
            download = os.path.join(self.testdir, 'download')
            if not os.path.exists(download):
                os.mkdir(download)
            shutil.copyfile(self.control,
                            os.path.join(self.resultdir, 'control'))

        self.control = control

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        self.logging.start_logging()

        self.profilers = profilers.profilers(self)

        self.machines = [options.hostname]
        self.machine_dict_list = [{'hostname': options.hostname}]
        # Client side tests should always run the same whether or not they are
        # running in the lab.
        self.in_lab = False
        self.hosts = set([local_host.LocalHost(hostname=options.hostname)])

        self.args = []
        if options.args:
            self.args = self._parse_args(options.args)

        if options.user:
            self.user = options.user
        else:
            self.user = getpass.getuser()

        self.sysinfo.log_per_reboot_data()

        if not options.cont:
            self.record('START', None, None)

        self.harness.run_start()

        if options.log:
            self.enable_external_logging()

        self.num_tests_run = None
        self.num_tests_failed = None

        self.warning_loggers = None
        self.warning_manager = None

    def _init_drop_caches(self, drop_caches):
        """
        Perform the drop caches initialization.
        """
        self.drop_caches_between_iterations = (
            GLOBAL_CONFIG.get_config_value('CLIENT',
                                           'drop_caches_between_iterations',
                                           type=bool, default=True))
        self.drop_caches = drop_caches
        if self.drop_caches:
            utils.drop_caches()

    def _init_packages(self):
        """
        Perform the packages support initialization.
        """
        self.pkgmgr = packages.PackageManager(
            self.autodir, run_function_dargs={'timeout': 3600})

    def _cleanup_results_dir(self):
        """Delete everything in resultsdir"""
        assert os.path.exists(self.resultdir)
        list_files = glob.glob('%s/*' % self.resultdir)
        for f in list_files:
            if os.path.isdir(f):
                shutil.rmtree(f)
            elif os.path.isfile(f):
                os.remove(f)

    def _cleanup_debugdir_files(self):
        """
        Delete any leftover debugdir files
"""258 list_files = glob.glob("/tmp/autotest_results_dir.*")259 for f in list_files:260 os.remove(f)261 def disable_warnings(self, warning_type):262 self.record("INFO", None, None,263 "disabling %s warnings" % warning_type,264 {"warnings.disable": warning_type})265 time.sleep(self._WARNING_DISABLE_DELAY)266 def enable_warnings(self, warning_type):267 time.sleep(self._WARNING_DISABLE_DELAY)268 self.record("INFO", None, None,269 "enabling %s warnings" % warning_type,270 {"warnings.enable": warning_type})271 def monitor_disk_usage(self, max_rate):272 """\273 Signal that the job should monitor disk space usage on /274 and generate a warning if a test uses up disk space at a275 rate exceeding 'max_rate'.276 Parameters:277 max_rate - the maximium allowed rate of disk consumption278 during a test, in MB/hour, or 0 to indicate279 no limit.280 """281 self._max_disk_usage_rate = max_rate282 def control_get(self):283 return self.control284 def control_set(self, control):285 self.control = os.path.abspath(control)286 def harness_select(self, which, harness_args):287 self.harness = harness.select(which, self, harness_args)288 def setup_dirs(self, results_dir, tmp_dir):289 if not tmp_dir:290 tmp_dir = os.path.join(self.tmpdir, 'build')291 if not os.path.exists(tmp_dir):292 os.mkdir(tmp_dir)293 if not os.path.isdir(tmp_dir):294 e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir295 raise ValueError(e_msg)296 # We label the first build "build" and then subsequent ones297 # as "build.2", "build.3", etc. Whilst this is a little bit298 # inconsistent, 99.9% of jobs will only have one build299 # (that's not done as kernbench, sparse, or buildtest),300 # so it works out much cleaner. One of life's compromises.301 if not results_dir:302 results_dir = os.path.join(self.resultdir, 'build')303 i = 2304 while os.path.exists(results_dir):305 results_dir = os.path.join(self.resultdir, 'build.%d' % i)306 i += 1307 if not os.path.exists(results_dir):308 os.mkdir(results_dir)309 return (results_dir, tmp_dir)310 def barrier(self, *args, **kwds):311 """Create a barrier object"""312 return barrier.barrier(*args, **kwds)313 def install_pkg(self, name, pkg_type, install_dir):314 '''315 This method is a simple wrapper around the actual package316 installation method in the Packager class. This is used317 internally by the profilers, deps and tests code.318 name : name of the package (ex: sleeptest, dbench etc.)319 pkg_type : Type of the package (ex: test, dep etc.)320 install_dir : The directory in which the source is actually321 untarred into. (ex: client/profilers/<name> for profilers)322 '''323 if self.pkgmgr.repositories:324 self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)325 def add_repository(self, repo_urls):326 '''327 Adds the repository locations to the job so that packages328 can be fetched from them when needed. The repository list329 needs to be a string list330 Ex: job.add_repository(['http://blah1','http://blah2'])331 '''332 for repo_url in repo_urls:333 self.pkgmgr.add_repository(repo_url)334 # Fetch the packages' checksum file that contains the checksums335 # of all the packages if it is not already fetched. The checksum336 # is always fetched whenever a job is first started. 
        # is not done in the job's constructor as we don't have the list of
        # the repositories there (and obviously don't care about this file
        # if we are not using the repos)
        try:
            checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,
                                              packages.CHECKSUM_FILE)
            self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE,
                                  checksum_file_path, use_checksum=False)
        except error.PackageFetchError:
            # packaging system might not be working in this case
            # Silently fall back to the normal case
            pass

    def require_gcc(self):
        """
        Test whether gcc is installed on the machine.
        """
        # check if gcc is installed on the system.
        try:
            utils.system('which gcc')
        except error.CmdError:
            raise NotAvailableError('gcc is required by this job and is '
                                    'not available on the system')

    def setup_dep(self, deps):
        """Set up the dependencies for this test.

        deps is a list of libraries required for this test.
        """
        # Fetch the deps from the repositories and set them up.
        for dep in deps:
            dep_dir = os.path.join(self.autodir, 'deps', dep)
            # Search for the dependency in the repositories if specified,
            # else check locally.
            try:
                self.install_pkg(dep, 'dep', dep_dir)
            except error.PackageInstallError:
                # see if the dep is there locally
                pass

            # dep_dir might not exist if it is not fetched from the repos
            if not os.path.exists(dep_dir):
                raise error.TestError("Dependency %s does not exist" % dep)

            os.chdir(dep_dir)
            if execfile('%s.py' % dep, {}) is None:
                logging.info('Dependency %s successfully built', dep)

    def _runtest(self, url, tag, timeout, args, dargs):
        try:
            l = lambda: test.runtest(self, url, tag, args, dargs)
            pid = parallel.fork_start(self.resultdir, l)

            if timeout:
                logging.debug('Waiting for pid %d for %d seconds', pid, timeout)
                parallel.fork_waitfor_timed(self.resultdir, pid, timeout)
            else:
                parallel.fork_waitfor(self.resultdir, pid)

        except error.TestBaseException:
            # These are already classified with an error type (exit_status)
            raise
        except error.JobError:
            raise  # Caught further up and turned into an ABORT.
        except Exception, e:
            # Converts all other exceptions thrown by the test regardless
            # of phase into a TestError(TestBaseException) subclass that
            # reports them with their full stack trace.
            raise error.UnhandledTestError(e)

    def _run_test_base(self, url, *args, **dargs):
        """
        Prepares arguments and run functions to run_test and run_test_detail.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns:
                subdir: Test subdirectory
                testname: Test name
                group_func: Actual test run function
                timeout: Test timeout
        """
        _group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        self._make_test_outputdir(subdir)

        timeout = dargs.pop('timeout', None)
        if timeout:
            logging.debug('Test has timeout: %d sec.', timeout)

        def log_warning(reason):
            self.record("WARN", subdir, testname, reason)

        @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)
        def group_func():
            try:
                self._runtest(url, tag, timeout, args, dargs)
            except error.TestBaseException, detail:
                # The error is already classified, record it properly.
                self.record(detail.exit_status, subdir, testname,
                            str(detail))
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        return (subdir, testname, group_func, timeout)

    @_run_test_complete_on_exit
    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns True if the test passes, False otherwise.
        """
        (subdir, testname, group_func, timeout) = self._run_test_base(url,
                                                                      *args,
                                                                      **dargs)
        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return True
        except error.TestBaseException:
            return False
        # Any other exception here will be given to the caller
        #
        # NOTE: The only exception possible from the control file here
        # is error.JobError as _runtest() turns all others into an
        # UnhandledTestError that is caught above.

    @_run_test_complete_on_exit
    def run_test_detail(self, url, *args, **dargs):
        """
        Summon a test object and run it, returning test status.

        @param url A url that identifies the test to run.
        @param tag An optional keyword argument that will be added to the
            test and subdir name.
        @param subdir_tag An optional keyword argument that will be added
            to the subdir name.

        @returns Test status
        @see: client/common_lib/error.py, exit_status
        """
        (subdir, testname, group_func, timeout) = self._run_test_base(url,
                                                                      *args,
                                                                      **dargs)
        try:
            self._rungroup(subdir, testname, group_func, timeout)
            return 'GOOD'
        except error.TestBaseException, detail:
            return detail.exit_status

    def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):
        """\
        subdir:
                name of the group
        testname:
                name of the test to run, or support step
        function:
                subroutine to run
        *args:
                arguments for the function

        Returns the result of the passed in function
        """
        try:
            optional_fields = None
            if timeout:
                optional_fields = {}
                optional_fields['timeout'] = timeout
            self.record('START', subdir, testname,
                        optional_fields=optional_fields)

            self._state.set('client', 'unexpected_reboot', (subdir, testname))
            try:
                result = function(*args, **dargs)
                self.record('END GOOD', subdir, testname)
                return result
            except error.TestBaseException, e:
                self.record('END %s' % e.exit_status, subdir, testname)
                raise
            except error.JobError, e:
                self.record('END ABORT', subdir, testname)
                raise
            except Exception, e:
                # This should only ever happen due to a bug in the given
                # function's code. The common case of being called by
                # run_test() will never reach this. If a control file called
                # run_group() itself, bugs in its function will be caught
                # here.
                err_msg = str(e) + '\n' + traceback.format_exc()
                self.record('END ERROR', subdir, testname, err_msg)
                raise
        finally:
            self._state.discard('client', 'unexpected_reboot')

    def run_group(self, function, tag=None, **dargs):
        """
        Run a function nested within a group level.

        function:
                Callable to run.
        tag:
                An optional tag name for the group. If None (default)
                function.__name__ will be used.
        **dargs:
                Named arguments for the function.
        """
        if tag:
            name = tag
        else:
            name = function.__name__

        try:
            return self._rungroup(subdir=None, testname=name,
                                  function=function, timeout=None, **dargs)
        except (SystemExit, error.TestBaseException):
            raise
        # If there was a different exception, turn it into a TestError.
        # It will be caught by step_engine or _run_step_fn.
        except Exception, e:
            raise error.UnhandledTestError(e)

    def cpu_count(self):
        return utils.count_cpus()  # use total system count

    def start_reboot(self):
        self.record('START', None, 'reboot')
        self.record('GOOD', None, 'reboot.start')

    def _record_reboot_failure(self, subdir, operation, status,
                               running_id=None):
        self.record("ABORT", subdir, operation, status)
        if not running_id:
            running_id = utils.running_os_ident()
        kernel = {"kernel": running_id.split("::")[0]}
        self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)

    def _check_post_reboot(self, subdir, running_id=None):
        """
        Function to perform post boot checks such as if the system configuration
        has changed across reboots (specifically, CPUs and partitions).

        @param subdir: The subdir to use in the job.record call.
        @param running_id: An optional running_id to include in the reboot
            failure log message

        @raise JobError: Raised if the current configuration does not match the
            pre-reboot configuration.
        """
        # check to see if any partitions have changed
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = partition_lib.get_mount_info(partition_list)
        old_mount_info = self._state.get('client', 'mount_info')
        if mount_info != old_mount_info:
            new_entries = mount_info - old_mount_info
            old_entries = old_mount_info - mount_info
            description = ("mounted partitions are different after reboot "
                           "(old entries: %s, new entries: %s)" %
                           (old_entries, new_entries))
            self._record_reboot_failure(subdir, "reboot.verify_config",
                                        description, running_id=running_id)
            raise error.JobError("Reboot failed: %s" % description)

        # check to see if any CPUs have changed
        cpu_count = utils.count_cpus()
        old_count = self._state.get('client', 'cpu_count')
        if cpu_count != old_count:
            description = ('Number of CPUs changed after reboot '
                           '(old count: %d, new count: %d)' %
                           (old_count, cpu_count))
            self._record_reboot_failure(subdir, 'reboot.verify_config',
                                        description, running_id=running_id)
            raise error.JobError('Reboot failed: %s' % description)

    def partition(self, device, loop_size=0, mountpoint=None):
        """
        Work with a machine partition

        @param device: e.g. /dev/sda2, /dev/sdb1 etc...
        @param mountpoint: Specify a directory to mount to. If not specified
                           autotest tmp directory will be used.
        @param loop_size: Size of loopback device (in MB). Defaults to 0.
        @return: A L{client.bin.partition.partition} object
        """
        if not mountpoint:
            mountpoint = self.tmpdir
        return partition_lib.partition(self, device, loop_size, mountpoint)

    @utils.deprecated
    def filesystem(self, device, mountpoint=None, loop_size=0):
        """ Same as partition

        @deprecated: Use partition method instead
        """
        return self.partition(device, loop_size, mountpoint)

    def enable_external_logging(self):
        pass

    def disable_external_logging(self):
        pass

    def reboot_setup(self):
        # save the partition list and mount points, as well as the cpu count
        partition_list = partition_lib.get_partition_list(self,
                                                          exclude_swap=False)
        mount_info = partition_lib.get_mount_info(partition_list)
        self._state.set('client', 'mount_info', mount_info)
        self._state.set('client', 'cpu_count', utils.count_cpus())

    def reboot(self):
        self.reboot_setup()
        self.harness.run_reboot()

        # HACK: using this as a module sometimes hangs shutdown, so if it's
        # installed unload it first
        utils.system("modprobe -r netconsole", ignore_status=True)

        # sync first, so that a sync during shutdown doesn't time out
        utils.system("sync; sync", ignore_status=True)

        utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")
        self.quit()

    def noop(self, text):
        logging.info("job: noop: " + text)

    @_run_test_complete_on_exit
    def parallel(self, *tasklist):
        """Run tasks in parallel"""

        pids = []
        old_log_filename = self._logger.global_filename
        for i, task in enumerate(tasklist):
            assert isinstance(task, (tuple, list))
            self._logger.global_filename = old_log_filename + (".%d" % i)
            def task_func():
                # stub out _record_indent with a process-local one
                base_record_indent = self._record_indent
                proc_local = self._job_state.property_factory(
                    '_state', '_record_indent.%d' % os.getpid(),
                    base_record_indent, namespace='client')
                self.__class__._record_indent = proc_local
                task[0](*task[1:])
            pids.append(parallel.fork_start(self.resultdir, task_func))

        old_log_path = os.path.join(self.resultdir, old_log_filename)
        old_log = open(old_log_path, "a")
        exceptions = []
        for i, pid in enumerate(pids):
            # wait for the task to finish
            try:
                parallel.fork_waitfor(self.resultdir, pid)
            except Exception, e:
                exceptions.append(e)
            # copy the logs from the subtask into the main log
            new_log_path = old_log_path + (".%d" % i)
            if os.path.exists(new_log_path):
                new_log = open(new_log_path)
                old_log.write(new_log.read())
                new_log.close()
                old_log.flush()
                os.remove(new_log_path)
        old_log.close()

        self._logger.global_filename = old_log_filename

        # handle any exceptions raised by the parallel tasks
        if exceptions:
            msg = "%d task(s) failed in job.parallel" % len(exceptions)
            raise error.JobError(msg)

    def quit(self):
        # XXX: should have a better name.
        self.harness.run_pause()
        raise error.JobContinue("more to come")

    def complete(self, status):
        """Write pending reports, clean up, and exit"""
        # write out a job HTML report
        try:
            html_report.create_report(self.resultdir)
        except Exception, e:
            logging.error("Error writing job HTML report: %s", e)

        # We are about to exit 'complete' so clean up the control file.
        dest = os.path.join(self.resultdir, os.path.basename(self._state_file))
        shutil.move(self._state_file, dest)

        self.harness.run_complete()
        self.disable_external_logging()
        sys.exit(status)

    def _load_state(self):
        # grab any initial state and set up $CONTROL.state as the backing file
        init_state_file = self.control + '.init.state'
        self._state_file = self.control + '.state'
        if os.path.exists(init_state_file):
            shutil.move(init_state_file, self._state_file)
        self._state.set_backing_file(self._state_file)

        # initialize the state engine, if necessary
        has_steps = self._state.has('client', 'steps')
        if not self._is_continuation and has_steps:
            raise RuntimeError('Loaded state can only contain client.steps if '
                               'this is a continuation')

        if not has_steps:
            logging.debug('Initializing the state engine')
            self._state.set('client', 'steps', [])

    def handle_persistent_option(self, options, option_name):
        """
        Select option from command line or persistent state.
        Store selected option to allow standalone client to continue
        after reboot with previously selected options.

        Priority:
        1. explicitly specified via command line
        2. stored in state file (if continuing job '-c')
        3. default == None
        """
        option = None
        cmd_line_option = getattr(options, option_name)
        if cmd_line_option:
            option = cmd_line_option
            self._state.set('client', option_name, option)
...
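Most of the public methods above make up the client control-file API. As a quick orientation, here is a minimal sketch, assuming the standard autotest client environment in which a `job` instance is already bound in the control file's namespace; the test name and arguments are illustrative.

# Hedged sketch: assumes `job` is provided by the autotest client runner.
# run_test() returns True on success and False on any TestBaseException.
if not job.run_test('sleeptest', seconds=1):
    job.record('INFO', None, None, 'sleeptest did not pass')

# run_test_detail() returns the exit_status string instead ('GOOD', 'FAIL', ...).
status = job.run_test_detail('sleeptest', seconds=1, tag='second_run')
job.record('INFO', None, None, 'sleeptest finished with status %s' % status)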
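run_group() wraps a callable in its own START/END block in the status log, which is useful for bundling related steps. A small sketch under the same control-file assumption:

# Hedged sketch: group two runs under one status-log group.
def stress_pair():
    job.run_test('sleeptest', seconds=1, tag='a')
    job.run_test('sleeptest', seconds=1, tag='b')

# With tag=None the group takes the function's __name__, i.e. 'stress_pair'.
job.run_group(stress_pair)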
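parallel() forks one subprocess per task, gives each task its own status-log suffix (.0, .1, ...), folds those logs back into the main log, and raises JobError if any task failed. Each task is a list whose first element is a callable and whose remaining elements are its positional arguments. A sketch, again assuming a control-file environment with illustrative test tags:

# Hedged sketch: run two tests concurrently; the tags keep their result
# directories apart.
def first_half():
    job.run_test('sleeptest', seconds=2, tag='par0')

def second_half():
    job.run_test('sleeptest', seconds=2, tag='par1')

job.parallel([first_half], [second_half])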
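handle_persistent_option() exists because a standalone client survives reboots by re-running the control file with '-c': an option given on the command line wins, otherwise a value remembered in the state file is reused, otherwise the result is None. A standalone sketch of that priority (the helper and the harness names below are illustrative, not autotest APIs):

# Hedged sketch of the documented priority; `stored` stands in for whatever
# an earlier run persisted in the job's state file.
def pick_option(cmd_line_value, stored):
    if cmd_line_value:        # 1. explicit command-line value wins
        return cmd_line_value
    return stored             # 2. value persisted by an earlier run, else
                              # 3. None (the default)

print(pick_option(None, 'standalone'))        # -> 'standalone' (reused after reboot)
print(pick_option('autoserv', 'standalone'))  # -> 'autoserv' (command line wins)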
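monitor_disk_usage() only records the threshold; enforcement happens in _run_test_base(), where group_func is wrapped by disk_usage_monitor.watch() so a WARN entry is recorded if / fills faster than the configured MB/hour during a test. partition() simply builds a partition_lib.partition helper, defaulting the mount point to the job's tmpdir. A hedged control-file sketch (device path illustrative):

# Hedged sketch: warn if a test consumes disk on / faster than 200 MB/hour,
# then obtain a partition helper object for later filesystem operations.
job.monitor_disk_usage(200)
scratch = job.partition('/dev/sdb1')   # mountpoint defaults to job.tmpdir
job.run_test('sleeptest', seconds=1, tag='with_disk_watch')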
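The packaging helpers cooperate: add_repository() registers package sources and pre-fetches the checksum file, install_pkg() delegates to the PackageManager only when repositories are configured, and setup_dep() stages a dependency under <autodir>/deps/<name> and builds it by executing its <name>.py. A hedged sketch (repository URL and dependency name are illustrative):

# Hedged sketch: register a package repository and stage one dependency
# before the tests that need it.
job.add_repository(['http://packages.example.com/autotest'])
job.setup_dep(['libaio'])   # raises TestError if the dep cannot be staged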
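barrier() is a thin pass-through to barrier.barrier for synchronizing several machines at a named point. The sketch below assumes the usual hostid/tag/timeout arguments and a rendezvous() call listing all participants; the host names are illustrative.

# Hedged sketch: block until both clients reach the 'start-io' point or the
# timeout expires.
b = job.barrier('client-a.example.com', 'start-io', timeout=120)
b.rendezvous('client-a.example.com', 'client-b.example.com')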