Best Python code snippet using autotest_python
job.py
Source:job.py
1"""The main job wrapper2This is the core infrastructure.3Copyright Andy Whitcroft, Martin J. Bligh 20064"""5# pylint: disable=missing-docstring6import copy7from datetime import datetime8import getpass9import glob10import logging11import os12import re13import shutil14import sys15import time16import traceback17import types18import weakref19import common20from autotest_lib.client.bin import client_logging_config21from autotest_lib.client.bin import harness22from autotest_lib.client.bin import local_host23from autotest_lib.client.bin import parallel24from autotest_lib.client.bin import partition as partition_lib25from autotest_lib.client.bin import profilers26from autotest_lib.client.bin import sysinfo27from autotest_lib.client.bin import test28from autotest_lib.client.bin import utils29from autotest_lib.client.common_lib import barrier30from autotest_lib.client.common_lib import base_job31from autotest_lib.client.common_lib import control_data32from autotest_lib.client.common_lib import error33from autotest_lib.client.common_lib import global_config34from autotest_lib.client.common_lib import logging_manager35from autotest_lib.client.common_lib import packages36from autotest_lib.client.cros import cros_logging37from autotest_lib.client.tools import html_report38GLOBAL_CONFIG = global_config.global_config39LAST_BOOT_TAG = object()40JOB_PREAMBLE = """41from autotest_lib.client.common_lib.error import *42from autotest_lib.client.bin.utils import *43"""44class StepError(error.AutotestError):45 pass46class NotAvailableError(error.AutotestError):47 pass48def _run_test_complete_on_exit(f):49 """Decorator for job methods that automatically calls50 self.harness.run_test_complete when the method exits, if appropriate."""51 def wrapped(self, *args, **dargs):52 try:53 return f(self, *args, **dargs)54 finally:55 if self._logger.global_filename == 'status':56 self.harness.run_test_complete()57 if self.drop_caches:58 utils.drop_caches()59 wrapped.__name__ = f.__name__60 
wrapped.__doc__ = f.__doc__61 wrapped.__dict__.update(f.__dict__)62 return wrapped63class status_indenter(base_job.status_indenter):64 """Provide a status indenter that is backed by job._record_prefix."""65 def __init__(self, job_):66 self._job = weakref.proxy(job_) # avoid a circular reference67 @property68 def indent(self):69 return self._job._record_indent70 def increment(self):71 self._job._record_indent += 172 def decrement(self):73 self._job._record_indent -= 174class base_client_job(base_job.base_job):75 """The client-side concrete implementation of base_job.76 Optional properties provided by this implementation:77 control78 harness79 """80 _WARNING_DISABLE_DELAY = 581 # _record_indent is a persistent property, but only on the client82 _job_state = base_job.base_job._job_state83 _record_indent = _job_state.property_factory(84 '_state', '_record_indent', 0, namespace='client')85 _max_disk_usage_rate = _job_state.property_factory(86 '_state', '_max_disk_usage_rate', 0.0, namespace='client')87 def __init__(self, control, options, drop_caches=True):88 """89 Prepare a client side job object.90 @param control: The control file (pathname of).91 @param options: an object which includes:92 jobtag: The job tag string (eg "default").93 cont: If this is the continuation of this job.94 harness_type: An alternative server harness. [None]95 use_external_logging: If true, the enable_external_logging96 method will be called during construction. [False]97 @param drop_caches: If true, utils.drop_caches() is called before and98 between all tests. 
[True]99 """100 super(base_client_job, self).__init__(options=options)101 self._pre_record_init(control, options)102 try:103 self._post_record_init(control, options, drop_caches)104 except Exception, err:105 self.record(106 'ABORT', None, None,'client.bin.job.__init__ failed: %s' %107 str(err))108 raise109 @classmethod110 def _get_environ_autodir(cls):111 return os.environ['AUTODIR']112 @classmethod113 def _find_base_directories(cls):114 """115 Determine locations of autodir and clientdir (which are the same)116 using os.environ. Serverdir does not exist in this context.117 """118 autodir = clientdir = cls._get_environ_autodir()119 return autodir, clientdir, None120 @classmethod121 def _parse_args(cls, args):122 return re.findall("[^\s]*?['|\"].*?['|\"]|[^\s]+", args)123 def _find_resultdir(self, options):124 """125 Determine the directory for storing results. On a client this is126 always <autodir>/results/<tag>, where tag is passed in on the command127 line as an option.128 """129 output_dir_config = GLOBAL_CONFIG.get_config_value('CLIENT',130 'output_dir',131 default="")132 if options.output_dir:133 basedir = options.output_dir134 elif output_dir_config:135 basedir = output_dir_config136 else:137 basedir = self.autodir138 return os.path.join(basedir, 'results', options.tag)139 def _get_status_logger(self):140 """Return a reference to the status logger."""141 return self._logger142 def _pre_record_init(self, control, options):143 """144 Initialization function that should peform ONLY the required145 setup so that the self.record() method works.146 As of now self.record() needs self.resultdir, self._group_level,147 self.harness and of course self._logger.148 """149 if not options.cont:150 self._cleanup_debugdir_files()151 self._cleanup_results_dir()152 logging_manager.configure_logging(153 client_logging_config.ClientLoggingConfig(),154 results_dir=self.resultdir,155 verbose=options.verbose)156 logging.info('Writing results to %s', self.resultdir)157 # 
init_group_level needs the state158 self.control = os.path.realpath(control)159 self._is_continuation = options.cont160 self._current_step_ancestry = []161 self._next_step_index = 0162 self._load_state()163 _harness = self.handle_persistent_option(options, 'harness')164 _harness_args = self.handle_persistent_option(options, 'harness_args')165 self.harness = harness.select(_harness, self, _harness_args)166 if self.control:167 parsed_control = control_data.parse_control(168 self.control, raise_warnings=False)169 self.fast = parsed_control.fast170 # set up the status logger171 def client_job_record_hook(entry):172 msg_tag = ''173 if '.' in self._logger.global_filename:174 msg_tag = self._logger.global_filename.split('.', 1)[1]175 # send the entry to the job harness176 message = '\n'.join([entry.message] + entry.extra_message_lines)177 rendered_entry = self._logger.render_entry(entry)178 self.harness.test_status_detail(entry.status_code, entry.subdir,179 entry.operation, message, msg_tag,180 entry.fields)181 self.harness.test_status(rendered_entry, msg_tag)182 # send the entry to stdout, if it's enabled183 logging.info(rendered_entry)184 self._logger = base_job.status_logger(185 self, status_indenter(self), record_hook=client_job_record_hook)186 def _post_record_init(self, control, options, drop_caches):187 """188 Perform job initialization not required by self.record().189 """190 self._init_drop_caches(drop_caches)191 self._init_packages()192 self.sysinfo = sysinfo.sysinfo(self.resultdir)193 self._load_sysinfo_state()194 if not options.cont:195 download = os.path.join(self.testdir, 'download')196 if not os.path.exists(download):197 os.mkdir(download)198 shutil.copyfile(self.control,199 os.path.join(self.resultdir, 'control'))200 self.control = control201 self.logging = logging_manager.get_logging_manager(202 manage_stdout_and_stderr=True, redirect_fds=True)203 self.logging.start_logging()204 self.profilers = profilers.profilers(self)205 self.machines = 
[options.hostname]206 self.machine_dict_list = [{'hostname' : options.hostname}]207 # Client side tests should always run the same whether or not they are208 # running in the lab.209 self.in_lab = False210 self.hosts = set([local_host.LocalHost(hostname=options.hostname)])211 self.args = []212 if options.args:213 self.args = self._parse_args(options.args)214 if options.user:215 self.user = options.user216 else:217 self.user = getpass.getuser()218 self.sysinfo.log_per_reboot_data()219 if not options.cont:220 self.record('START', None, None)221 self.harness.run_start()222 if options.log:223 self.enable_external_logging()224 self.num_tests_run = None225 self.num_tests_failed = None226 self.warning_loggers = None227 self.warning_manager = None228 def _init_drop_caches(self, drop_caches):229 """230 Perform the drop caches initialization.231 """232 self.drop_caches_between_iterations = (233 GLOBAL_CONFIG.get_config_value('CLIENT',234 'drop_caches_between_iterations',235 type=bool, default=True))236 self.drop_caches = drop_caches237 if self.drop_caches:238 utils.drop_caches()239 def _init_packages(self):240 """241 Perform the packages support initialization.242 """243 self.pkgmgr = packages.PackageManager(244 self.autodir, run_function_dargs={'timeout':3600})245 def _cleanup_results_dir(self):246 """Delete everything in resultsdir"""247 assert os.path.exists(self.resultdir)248 list_files = glob.glob('%s/*' % self.resultdir)249 for f in list_files:250 if os.path.isdir(f):251 shutil.rmtree(f)252 elif os.path.isfile(f):253 os.remove(f)254 def _cleanup_debugdir_files(self):255 """256 Delete any leftover debugdir files257 """258 list_files = glob.glob("/tmp/autotest_results_dir.*")259 for f in list_files:260 os.remove(f)261 def disable_warnings(self, warning_type):262 self.record("INFO", None, None,263 "disabling %s warnings" % warning_type,264 {"warnings.disable": warning_type})265 time.sleep(self._WARNING_DISABLE_DELAY)266 def enable_warnings(self, warning_type):267 
time.sleep(self._WARNING_DISABLE_DELAY)268 self.record("INFO", None, None,269 "enabling %s warnings" % warning_type,270 {"warnings.enable": warning_type})271 def monitor_disk_usage(self, max_rate):272 """\273 Signal that the job should monitor disk space usage on /274 and generate a warning if a test uses up disk space at a275 rate exceeding 'max_rate'.276 Parameters:277 max_rate - the maximium allowed rate of disk consumption278 during a test, in MB/hour, or 0 to indicate279 no limit.280 """281 self._max_disk_usage_rate = max_rate282 def control_get(self):283 return self.control284 def control_set(self, control):285 self.control = os.path.abspath(control)286 def harness_select(self, which, harness_args):287 self.harness = harness.select(which, self, harness_args)288 def setup_dirs(self, results_dir, tmp_dir):289 if not tmp_dir:290 tmp_dir = os.path.join(self.tmpdir, 'build')291 if not os.path.exists(tmp_dir):292 os.mkdir(tmp_dir)293 if not os.path.isdir(tmp_dir):294 e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir295 raise ValueError(e_msg)296 # We label the first build "build" and then subsequent ones297 # as "build.2", "build.3", etc. Whilst this is a little bit298 # inconsistent, 99.9% of jobs will only have one build299 # (that's not done as kernbench, sparse, or buildtest),300 # so it works out much cleaner. One of life's compromises.301 if not results_dir:302 results_dir = os.path.join(self.resultdir, 'build')303 i = 2304 while os.path.exists(results_dir):305 results_dir = os.path.join(self.resultdir, 'build.%d' % i)306 i += 1307 if not os.path.exists(results_dir):308 os.mkdir(results_dir)309 return (results_dir, tmp_dir)310 def barrier(self, *args, **kwds):311 """Create a barrier object"""312 return barrier.barrier(*args, **kwds)313 def install_pkg(self, name, pkg_type, install_dir):314 '''315 This method is a simple wrapper around the actual package316 installation method in the Packager class. 
This is used317 internally by the profilers, deps and tests code.318 name : name of the package (ex: sleeptest, dbench etc.)319 pkg_type : Type of the package (ex: test, dep etc.)320 install_dir : The directory in which the source is actually321 untarred into. (ex: client/profilers/<name> for profilers)322 '''323 if self.pkgmgr.repositories:324 self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)325 def add_repository(self, repo_urls):326 '''327 Adds the repository locations to the job so that packages328 can be fetched from them when needed. The repository list329 needs to be a string list330 Ex: job.add_repository(['http://blah1','http://blah2'])331 '''332 for repo_url in repo_urls:333 self.pkgmgr.add_repository(repo_url)334 # Fetch the packages' checksum file that contains the checksums335 # of all the packages if it is not already fetched. The checksum336 # is always fetched whenever a job is first started. This337 # is not done in the job's constructor as we don't have the list of338 # the repositories there (and obviously don't care about this file339 # if we are not using the repos)340 try:341 checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,342 packages.CHECKSUM_FILE)343 self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE,344 checksum_file_path, use_checksum=False)345 except error.PackageFetchError:346 # packaging system might not be working in this case347 # Silently fall back to the normal case348 pass349 def require_gcc(self):350 """351 Test whether gcc is installed on the machine.352 """353 # check if gcc is installed on the system.354 try:355 utils.system('which gcc')356 except error.CmdError:357 raise NotAvailableError('gcc is required by this job and is '358 'not available on the system')359 def setup_dep(self, deps):360 """Set up the dependencies for this test.361 deps is a list of libraries required for this test.362 """363 # Fetch the deps from the repositories and set them up.364 for dep in deps:365 dep_dir = 
os.path.join(self.autodir, 'deps', dep)366 # Search for the dependency in the repositories if specified,367 # else check locally.368 try:369 self.install_pkg(dep, 'dep', dep_dir)370 except error.PackageInstallError:371 # see if the dep is there locally372 pass373 # dep_dir might not exist if it is not fetched from the repos374 if not os.path.exists(dep_dir):375 raise error.TestError("Dependency %s does not exist" % dep)376 os.chdir(dep_dir)377 if execfile('%s.py' % dep, {}) is None:378 logging.info('Dependency %s successfuly built', dep)379 def _runtest(self, url, tag, timeout, args, dargs):380 try:381 l = lambda : test.runtest(self, url, tag, args, dargs)382 pid = parallel.fork_start(self.resultdir, l)383 if timeout:384 logging.debug('Waiting for pid %d for %d seconds', pid, timeout)385 parallel.fork_waitfor_timed(self.resultdir, pid, timeout)386 else:387 parallel.fork_waitfor(self.resultdir, pid)388 except error.TestBaseException:389 # These are already classified with an error type (exit_status)390 raise391 except error.JobError:392 raise # Caught further up and turned into an ABORT.393 except Exception, e:394 # Converts all other exceptions thrown by the test regardless395 # of phase into a TestError(TestBaseException) subclass that396 # reports them with their full stack trace.397 raise error.UnhandledTestError(e)398 def _run_test_base(self, url, *args, **dargs):399 """400 Prepares arguments and run functions to run_test and run_test_detail.401 @param url A url that identifies the test to run.402 @param tag An optional keyword argument that will be added to the403 test and subdir name.404 @param subdir_tag An optional keyword argument that will be added405 to the subdir name.406 @returns:407 subdir: Test subdirectory408 testname: Test name409 group_func: Actual test run function410 timeout: Test timeout411 """412 _group, testname = self.pkgmgr.get_package_name(url, 'test')413 testname, subdir, tag = self._build_tagged_test_name(testname, dargs)414 
self._make_test_outputdir(subdir)415 timeout = dargs.pop('timeout', None)416 if timeout:417 logging.debug('Test has timeout: %d sec.', timeout)418 def log_warning(reason):419 self.record("WARN", subdir, testname, reason)420 @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)421 def group_func():422 try:423 self._runtest(url, tag, timeout, args, dargs)424 except error.TestBaseException, detail:425 # The error is already classified, record it properly.426 self.record(detail.exit_status, subdir, testname, str(detail))427 raise428 else:429 self.record('GOOD', subdir, testname, 'completed successfully')430 return (subdir, testname, group_func, timeout)431 @_run_test_complete_on_exit432 def run_test(self, url, *args, **dargs):433 """434 Summon a test object and run it.435 @param url A url that identifies the test to run.436 @param tag An optional keyword argument that will be added to the437 test and subdir name.438 @param subdir_tag An optional keyword argument that will be added439 to the subdir name.440 @returns True if the test passes, False otherwise.441 """442 (subdir, testname, group_func, timeout) = self._run_test_base(url,443 *args,444 **dargs)445 try:446 self._rungroup(subdir, testname, group_func, timeout)447 return True448 except error.TestBaseException:449 return False450 # Any other exception here will be given to the caller451 #452 # NOTE: The only exception possible from the control file here453 # is error.JobError as _runtest() turns all others into an454 # UnhandledTestError that is caught above.455 @_run_test_complete_on_exit456 def run_test_detail(self, url, *args, **dargs):457 """458 Summon a test object and run it, returning test status.459 @param url A url that identifies the test to run.460 @param tag An optional keyword argument that will be added to the461 test and subdir name.462 @param subdir_tag An optional keyword argument that will be added463 to the subdir name.464 @returns Test status465 @see: 
client/common_lib/error.py, exit_status466 """467 (subdir, testname, group_func, timeout) = self._run_test_base(url,468 *args,469 **dargs)470 try:471 self._rungroup(subdir, testname, group_func, timeout)472 return 'GOOD'473 except error.TestBaseException, detail:474 return detail.exit_status475 def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):476 """\477 subdir:478 name of the group479 testname:480 name of the test to run, or support step481 function:482 subroutine to run483 *args:484 arguments for the function485 Returns the result of the passed in function486 """487 try:488 optional_fields = None489 if timeout:490 optional_fields = {}491 optional_fields['timeout'] = timeout492 self.record('START', subdir, testname,493 optional_fields=optional_fields)494 self._state.set('client', 'unexpected_reboot', (subdir, testname))495 try:496 result = function(*args, **dargs)497 self.record('END GOOD', subdir, testname)498 return result499 except error.TestBaseException, e:500 self.record('END %s' % e.exit_status, subdir, testname)501 raise502 except error.JobError, e:503 self.record('END ABORT', subdir, testname)504 raise505 except Exception, e:506 # This should only ever happen due to a bug in the given507 # function's code. The common case of being called by508 # run_test() will never reach this. If a control file called509 # run_group() itself, bugs in its function will be caught510 # here.511 err_msg = str(e) + '\n' + traceback.format_exc()512 self.record('END ERROR', subdir, testname, err_msg)513 raise514 finally:515 self._state.discard('client', 'unexpected_reboot')516 def run_group(self, function, tag=None, **dargs):517 """518 Run a function nested within a group level.519 function:520 Callable to run.521 tag:522 An optional tag name for the group. 
If None (default)523 function.__name__ will be used.524 **dargs:525 Named arguments for the function.526 """527 if tag:528 name = tag529 else:530 name = function.__name__531 try:532 return self._rungroup(subdir=None, testname=name,533 function=function, timeout=None, **dargs)534 except (SystemExit, error.TestBaseException):535 raise536 # If there was a different exception, turn it into a TestError.537 # It will be caught by step_engine or _run_step_fn.538 except Exception, e:539 raise error.UnhandledTestError(e)540 def cpu_count(self):541 return utils.count_cpus() # use total system count542 def start_reboot(self):543 self.record('START', None, 'reboot')544 self.record('GOOD', None, 'reboot.start')545 def _record_reboot_failure(self, subdir, operation, status,546 running_id=None):547 self.record("ABORT", subdir, operation, status)548 if not running_id:549 running_id = utils.running_os_ident()550 kernel = {"kernel": running_id.split("::")[0]}551 self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)552 def _check_post_reboot(self, subdir, running_id=None):553 """554 Function to perform post boot checks such as if the system configuration555 has changed across reboots (specifically, CPUs and partitions).556 @param subdir: The subdir to use in the job.record call.557 @param running_id: An optional running_id to include in the reboot558 failure log message559 @raise JobError: Raised if the current configuration does not match the560 pre-reboot configuration.561 """562 # check to see if any partitions have changed563 partition_list = partition_lib.get_partition_list(self,564 exclude_swap=False)565 mount_info = partition_lib.get_mount_info(partition_list)566 old_mount_info = self._state.get('client', 'mount_info')567 if mount_info != old_mount_info:568 new_entries = mount_info - old_mount_info569 old_entries = old_mount_info - mount_info570 description = ("mounted partitions are different after reboot "571 "(old entries: %s, new entries: %s)" %572 
(old_entries, new_entries))573 self._record_reboot_failure(subdir, "reboot.verify_config",574 description, running_id=running_id)575 raise error.JobError("Reboot failed: %s" % description)576 # check to see if any CPUs have changed577 cpu_count = utils.count_cpus()578 old_count = self._state.get('client', 'cpu_count')579 if cpu_count != old_count:580 description = ('Number of CPUs changed after reboot '581 '(old count: %d, new count: %d)' %582 (old_count, cpu_count))583 self._record_reboot_failure(subdir, 'reboot.verify_config',584 description, running_id=running_id)585 raise error.JobError('Reboot failed: %s' % description)586 def partition(self, device, loop_size=0, mountpoint=None):587 """588 Work with a machine partition589 @param device: e.g. /dev/sda2, /dev/sdb1 etc...590 @param mountpoint: Specify a directory to mount to. If not specified591 autotest tmp directory will be used.592 @param loop_size: Size of loopback device (in MB). Defaults to 0.593 @return: A L{client.bin.partition.partition} object594 """595 if not mountpoint:596 mountpoint = self.tmpdir597 return partition_lib.partition(self, device, loop_size, mountpoint)598 @utils.deprecated599 def filesystem(self, device, mountpoint=None, loop_size=0):600 """ Same as partition601 @deprecated: Use partition method instead602 """603 return self.partition(device, loop_size, mountpoint)604 def enable_external_logging(self):605 pass606 def disable_external_logging(self):607 pass608 def reboot_setup(self):609 # save the partition list and mount points, as well as the cpu count610 partition_list = partition_lib.get_partition_list(self,611 exclude_swap=False)612 mount_info = partition_lib.get_mount_info(partition_list)613 self._state.set('client', 'mount_info', mount_info)614 self._state.set('client', 'cpu_count', utils.count_cpus())615 def reboot(self):616 self.reboot_setup()617 self.harness.run_reboot()618 # HACK: using this as a module sometimes hangs shutdown, so if it's619 # installed unload it first620 
utils.system("modprobe -r netconsole", ignore_status=True)621 # sync first, so that a sync during shutdown doesn't time out622 utils.system("sync; sync", ignore_status=True)623 utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")624 self.quit()625 def noop(self, text):626 logging.info("job: noop: " + text)627 @_run_test_complete_on_exit628 def parallel(self, *tasklist):629 """Run tasks in parallel"""630 pids = []631 old_log_filename = self._logger.global_filename632 for i, task in enumerate(tasklist):633 assert isinstance(task, (tuple, list))634 self._logger.global_filename = old_log_filename + (".%d" % i)635 def task_func():636 # stub out _record_indent with a process-local one637 base_record_indent = self._record_indent638 proc_local = self._job_state.property_factory(639 '_state', '_record_indent.%d' % os.getpid(),640 base_record_indent, namespace='client')641 self.__class__._record_indent = proc_local642 task[0](*task[1:])643 pids.append(parallel.fork_start(self.resultdir, task_func))644 old_log_path = os.path.join(self.resultdir, old_log_filename)645 old_log = open(old_log_path, "a")646 exceptions = []647 for i, pid in enumerate(pids):648 # wait for the task to finish649 try:650 parallel.fork_waitfor(self.resultdir, pid)651 except Exception, e:652 exceptions.append(e)653 # copy the logs from the subtask into the main log654 new_log_path = old_log_path + (".%d" % i)655 if os.path.exists(new_log_path):656 new_log = open(new_log_path)657 old_log.write(new_log.read())658 new_log.close()659 old_log.flush()660 os.remove(new_log_path)661 old_log.close()662 self._logger.global_filename = old_log_filename663 # handle any exceptions raised by the parallel tasks664 if exceptions:665 msg = "%d task(s) failed in job.parallel" % len(exceptions)666 raise error.JobError(msg)667 def quit(self):668 # XXX: should have a better name.669 self.harness.run_pause()670 raise error.JobContinue("more to come")671 def complete(self, status):672 """Write pending reports, clean 
up, and exit"""673 # write out a job HTML report674 try:675 html_report.create_report(self.resultdir)676 except Exception, e:677 logging.error("Error writing job HTML report: %s", e)678 # We are about to exit 'complete' so clean up the control file.679 dest = os.path.join(self.resultdir, os.path.basename(self._state_file))680 shutil.move(self._state_file, dest)681 self.harness.run_complete()682 self.disable_external_logging()683 sys.exit(status)684 def _load_state(self):685 # grab any initial state and set up $CONTROL.state as the backing file686 init_state_file = self.control + '.init.state'687 self._state_file = self.control + '.state'688 if os.path.exists(init_state_file):689 shutil.move(init_state_file, self._state_file)690 self._state.set_backing_file(self._state_file)691 # initialize the state engine, if necessary692 has_steps = self._state.has('client', 'steps')693 if not self._is_continuation and has_steps:694 raise RuntimeError('Loaded state can only contain client.steps if '695 'this is a continuation')696 if not has_steps:697 logging.debug('Initializing the state engine')698 self._state.set('client', 'steps', [])699 def handle_persistent_option(self, options, option_name):700 """701 Select option from command line or persistent state.702 Store selected option to allow standalone client to continue703 after reboot with previously selected options.704 Priority:705 1. explicitly specified via command line706 2. stored in state file (if continuing job '-c')707 3. 
default == None708 """709 option = None710 cmd_line_option = getattr(options, option_name)711 if cmd_line_option:712 option = cmd_line_option713 self._state.set('client', option_name, option)714 else:715 stored_option = self._state.get('client', option_name, None)716 if stored_option:717 option = stored_option718 logging.debug('Persistent option %s now set to %s', option_name, option)719 return option720 def __create_step_tuple(self, fn, args, dargs):721 # Legacy code passes in an array where the first arg is722 # the function or its name.723 if isinstance(fn, list):724 assert(len(args) == 0)725 assert(len(dargs) == 0)726 args = fn[1:]727 fn = fn[0]728 # Pickling actual functions is hairy, thus we have to call729 # them by name. Unfortunately, this means only functions730 # defined globally can be used as a next step.731 if callable(fn):732 fn = fn.__name__733 if not isinstance(fn, types.StringTypes):734 raise StepError("Next steps must be functions or "735 "strings containing the function name")736 ancestry = copy.copy(self._current_step_ancestry)737 return (ancestry, fn, args, dargs)738 def next_step_append(self, fn, *args, **dargs):739 """Define the next step and place it at the end"""740 steps = self._state.get('client', 'steps')741 steps.append(self.__create_step_tuple(fn, args, dargs))742 self._state.set('client', 'steps', steps)743 def next_step(self, fn, *args, **dargs):744 """Create a new step and place it after any steps added745 while running the current step but before any steps added in746 previous steps"""747 steps = self._state.get('client', 'steps')748 steps.insert(self._next_step_index,749 self.__create_step_tuple(fn, args, dargs))750 self._next_step_index += 1751 self._state.set('client', 'steps', steps)752 def next_step_prepend(self, fn, *args, **dargs):753 """Insert a new step, executing first"""754 steps = self._state.get('client', 'steps')755 steps.insert(0, self.__create_step_tuple(fn, args, dargs))756 self._next_step_index += 1757 
self._state.set('client', 'steps', steps)758 def _run_step_fn(self, local_vars, fn, args, dargs):759 """Run a (step) function within the given context"""760 local_vars['__args'] = args761 local_vars['__dargs'] = dargs762 try:763 exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)764 return local_vars['__ret']765 except SystemExit:766 raise # Send error.JobContinue and JobComplete on up to runjob.767 except error.TestNAError, detail:768 self.record(detail.exit_status, None, fn, str(detail))769 except Exception, detail:770 raise error.UnhandledJobError(detail)771 def _create_frame(self, global_vars, ancestry, fn_name):772 """Set up the environment like it would have been when this773 function was first defined.774 Child step engine 'implementations' must have 'return locals()'775 at end end of their steps. Because of this, we can call the776 parent function and get back all child functions (i.e. those777 defined within it).778 Unfortunately, the call stack of the function calling779 job.next_step might have been deeper than the function it780 added. In order to make sure that the environment is what it781 should be, we need to then pop off the frames we built until782 we find the frame where the function was first defined."""783 # The copies ensure that the parent frames are not modified784 # while building child frames. 
This matters if we then785 # pop some frames in the next part of this function.786 current_frame = copy.copy(global_vars)787 frames = [current_frame]788 for steps_fn_name in ancestry:789 ret = self._run_step_fn(current_frame, steps_fn_name, [], {})790 current_frame = copy.copy(ret)791 frames.append(current_frame)792 # Walk up the stack frames until we find the place fn_name was defined.793 while len(frames) > 2:794 if fn_name not in frames[-2]:795 break796 if frames[-2][fn_name] != frames[-1][fn_name]:797 break798 frames.pop()799 ancestry.pop()800 return (frames[-1], ancestry)801 def _add_step_init(self, local_vars, current_function):802 """If the function returned a dictionary that includes a803 function named 'step_init', prepend it to our list of steps.804 This will only get run the first time a function with a nested805 use of the step engine is run."""806 if (isinstance(local_vars, dict) and807 'step_init' in local_vars and808 callable(local_vars['step_init'])):809 # The init step is a child of the function810 # we were just running.811 self._current_step_ancestry.append(current_function)812 self.next_step_prepend('step_init')813 def step_engine(self):814 """The multi-run engine used when the control file defines step_init.815 Does the next step.816 """817 # Set up the environment and then interpret the control file.818 # Some control files will have code outside of functions,819 # which means we need to have our state engine initialized820 # before reading in the file.821 global_control_vars = {'job': self,822 'args': self.args}823 exec(JOB_PREAMBLE, global_control_vars, global_control_vars)824 try:825 execfile(self.control, global_control_vars, global_control_vars)826 except error.TestNAError, detail:827 self.record(detail.exit_status, None, self.control, str(detail))828 except SystemExit:829 raise # Send error.JobContinue and JobComplete on up to runjob.830 except Exception, detail:831 # Syntax errors or other general Python exceptions coming out of832 # 
the top level of the control file itself go through here.833 raise error.UnhandledJobError(detail)834 # If we loaded in a mid-job state file, then we presumably835 # know what steps we have yet to run.836 if not self._is_continuation:837 if 'step_init' in global_control_vars:838 self.next_step(global_control_vars['step_init'])839 else:840 # if last job failed due to unexpected reboot, record it as fail841 # so harness gets called842 last_job = self._state.get('client', 'unexpected_reboot', None)843 if last_job:844 subdir, testname = last_job845 self.record('FAIL', subdir, testname, 'unexpected reboot')846 self.record('END FAIL', subdir, testname)847 # Iterate through the steps. If we reboot, we'll simply848 # continue iterating on the next step.849 while len(self._state.get('client', 'steps')) > 0:850 steps = self._state.get('client', 'steps')851 (ancestry, fn_name, args, dargs) = steps.pop(0)852 self._state.set('client', 'steps', steps)853 self._next_step_index = 0854 ret = self._create_frame(global_control_vars, ancestry, fn_name)855 local_vars, self._current_step_ancestry = ret856 local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)857 self._add_step_init(local_vars, fn_name)858 def add_sysinfo_command(self, command, logfile=None, on_every_test=False):859 self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),860 on_every_test)861 def add_sysinfo_logfile(self, file, on_every_test=False):862 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)863 def _add_sysinfo_loggable(self, loggable, on_every_test):864 if on_every_test:865 self.sysinfo.test_loggables.add(loggable)866 else:867 self.sysinfo.boot_loggables.add(loggable)868 self._save_sysinfo_state()869 def _load_sysinfo_state(self):870 state = self._state.get('client', 'sysinfo', None)871 if state:872 self.sysinfo.deserialize(state)873 def _save_sysinfo_state(self):874 state = self.sysinfo.serialize()875 self._state.set('client', 'sysinfo', state)876class disk_usage_monitor:877 
def __init__(self, logging_func, device, max_mb_per_hour):878 self.func = logging_func879 self.device = device880 self.max_mb_per_hour = max_mb_per_hour881 def start(self):882 self.initial_space = utils.freespace(self.device)883 self.start_time = time.time()884 def stop(self):885 # if no maximum usage rate was set, we don't need to886 # generate any warnings887 if not self.max_mb_per_hour:888 return889 final_space = utils.freespace(self.device)890 used_space = self.initial_space - final_space891 stop_time = time.time()892 total_time = stop_time - self.start_time893 # round up the time to one minute, to keep extremely short894 # tests from generating false positives due to short, badly895 # timed bursts of activity896 total_time = max(total_time, 60.0)897 # determine the usage rate898 bytes_per_sec = used_space / total_time899 mb_per_sec = bytes_per_sec / 1024**2900 mb_per_hour = mb_per_sec * 60 * 60901 if mb_per_hour > self.max_mb_per_hour:902 msg = ("disk space on %s was consumed at a rate of %.2f MB/hour")903 msg %= (self.device, mb_per_hour)904 self.func(msg)905 @classmethod906 def watch(cls, *monitor_args, **monitor_dargs):907 """ Generic decorator to wrap a function call with the908 standard create-monitor -> start -> call -> stop idiom."""909 def decorator(func):910 def watched_func(*args, **dargs):911 monitor = cls(*monitor_args, **monitor_dargs)912 monitor.start()913 try:914 func(*args, **dargs)915 finally:916 monitor.stop()917 return watched_func918 return decorator919def runjob(control, drop_caches, options):920 """921 Run a job using the given control file.922 This is the main interface to this module.923 @see base_job.__init__ for parameter info.924 """925 control = os.path.abspath(control)926 state = control + '.state'927 # Ensure state file is cleaned up before the job starts to run if autotest928 # is not running with the --continue flag929 if not options.cont and os.path.isfile(state):930 logging.debug('Cleaning up previously found state file')931 
os.remove(state)932 # instantiate the job object ready for the control file.933 myjob = None934 try:935 # Check that the control file is valid936 if not os.path.exists(control):937 raise error.JobError(control + ": control file not found")938 # When continuing, the job is complete when there is no939 # state file, ensure we don't try and continue.940 if options.cont and not os.path.exists(state):941 raise error.JobComplete("all done")942 myjob = job(control=control, drop_caches=drop_caches, options=options)943 # Load in the users control file, may do any one of:944 # 1) execute in toto945 # 2) define steps, and select the first via next_step()946 myjob.step_engine()947 except error.JobContinue:948 sys.exit(5)949 except error.JobComplete:950 sys.exit(1)951 except error.JobError, instance:952 logging.error("JOB ERROR: " + str(instance))953 if myjob:954 command = None955 if len(instance.args) > 1:956 command = instance.args[1]957 myjob.record('ABORT', None, command, str(instance))958 myjob.record('END ABORT', None, None, str(instance))959 assert myjob._record_indent == 0960 myjob.complete(1)961 else:962 sys.exit(1)963 except Exception, e:964 # NOTE: job._run_step_fn and job.step_engine will turn things into965 # a JobError for us. 
If we get here, its likely an autotest bug.966 msg = str(e) + '\n' + traceback.format_exc()967 logging.critical("JOB ERROR (autotest bug?): " + msg)968 if myjob:969 myjob.record('END ABORT', None, None, msg)970 assert myjob._record_indent == 0971 myjob.complete(1)972 else:973 sys.exit(1)974 # If we get here, then we assume the job is complete and good.975 myjob.record('END GOOD', None, None)976 assert myjob._record_indent == 0977 myjob.complete(0)978class job(base_client_job):979 def __init__(self, *args, **kwargs):980 base_client_job.__init__(self, *args, **kwargs)981 def run_test(self, url, *args, **dargs):982 log_pauser = cros_logging.LogRotationPauser()983 passed = False984 try:985 log_pauser.begin()986 passed = base_client_job.run_test(self, url, *args, **dargs)987 if not passed:988 # Save the VM state immediately after the test failure.989 # This is a NOOP if the the test isn't running in a VM or990 # if the VM is not properly configured to save state.991 _group, testname = self.pkgmgr.get_package_name(url, 'test')992 now = datetime.now().strftime('%I:%M:%S.%f')993 checkpoint_name = '%s-%s' % (testname, now)994 utils.save_vm_state(checkpoint_name)995 finally:996 log_pauser.end()997 return passed998 def reboot(self):999 self.reboot_setup()1000 self.harness.run_reboot()1001 # sync first, so that a sync during shutdown doesn't time out1002 utils.system('sync; sync', ignore_status=True)1003 utils.system('reboot </dev/null >/dev/null 2>&1 &')1004 self.quit()1005 def require_gcc(self):...
autotest.py
Source:autotest.py
...
        # A non-zero section adds the '-c' flag.
        if section > 0:
            args.append('-c')
        if self.tag:
            args.append('-t %s' % self.tag)
        if self.host.job.use_external_logging():
            args.append('-l')
        if self.host.hostname:
            args.append('--hostname=%s' % self.host.hostname)
        args.append('--user=%s' % self.host.job.user)

        # The control file path goes last, after all the option flags.
        args.append(self.remote_control_file)
        return args

    def get_background_cmd(self, section):
        """Build the shell command line that starts the client in background.

        The command runs bin/autotest_client (under self.autodir) via nohup
        with the arguments from get_base_cmd_args(section), discards stdout
        and stderr, and ends with '&'.

        @param section: forwarded to get_base_cmd_args().
        @return: the command as a single space-joined string.
        """
        cmd = ['nohup', os.path.join(self.autodir, 'bin/autotest_client')]
        cmd += self.get_base_cmd_args(section)
        cmd += ['>/dev/null', '2>/dev/null', '&']
        return ' '.join(cmd)

    def get_daemon_cmd(self, section, monitor_dir):
        cmd = ['nohup', os.path.join(self.autodir, 'bin/autotestd'),
               monitor_dir, '-H autoserv']...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!