Best Python code snippet using autotest_python
job.py
Source:job.py
...189 """190 self._init_drop_caches(drop_caches)191 self._init_packages()192 self.sysinfo = sysinfo.sysinfo(self.resultdir)193 self._load_sysinfo_state()194 if not options.cont:195 download = os.path.join(self.testdir, 'download')196 if not os.path.exists(download):197 os.mkdir(download)198 shutil.copyfile(self.control,199 os.path.join(self.resultdir, 'control'))200 self.control = control201 self.logging = logging_manager.get_logging_manager(202 manage_stdout_and_stderr=True, redirect_fds=True)203 self.logging.start_logging()204 self.profilers = profilers.profilers(self)205 self.machines = [options.hostname]206 self.machine_dict_list = [{'hostname' : options.hostname}]207 # Client side tests should always run the same whether or not they are208 # running in the lab.209 self.in_lab = False210 self.hosts = set([local_host.LocalHost(hostname=options.hostname)])211 self.args = []212 if options.args:213 self.args = self._parse_args(options.args)214 if options.user:215 self.user = options.user216 else:217 self.user = getpass.getuser()218 self.sysinfo.log_per_reboot_data()219 if not options.cont:220 self.record('START', None, None)221 self.harness.run_start()222 if options.log:223 self.enable_external_logging()224 self.num_tests_run = None225 self.num_tests_failed = None226 self.warning_loggers = None227 self.warning_manager = None228 def _init_drop_caches(self, drop_caches):229 """230 Perform the drop caches initialization.231 """232 self.drop_caches_between_iterations = (233 GLOBAL_CONFIG.get_config_value('CLIENT',234 'drop_caches_between_iterations',235 type=bool, default=True))236 self.drop_caches = drop_caches237 if self.drop_caches:238 utils.drop_caches()239 def _init_packages(self):240 """241 Perform the packages support initialization.242 """243 self.pkgmgr = packages.PackageManager(244 self.autodir, run_function_dargs={'timeout':3600})245 def _cleanup_results_dir(self):246 """Delete everything in resultsdir"""247 assert os.path.exists(self.resultdir)248 
list_files = glob.glob('%s/*' % self.resultdir)249 for f in list_files:250 if os.path.isdir(f):251 shutil.rmtree(f)252 elif os.path.isfile(f):253 os.remove(f)254 def _cleanup_debugdir_files(self):255 """256 Delete any leftover debugdir files257 """258 list_files = glob.glob("/tmp/autotest_results_dir.*")259 for f in list_files:260 os.remove(f)261 def disable_warnings(self, warning_type):262 self.record("INFO", None, None,263 "disabling %s warnings" % warning_type,264 {"warnings.disable": warning_type})265 time.sleep(self._WARNING_DISABLE_DELAY)266 def enable_warnings(self, warning_type):267 time.sleep(self._WARNING_DISABLE_DELAY)268 self.record("INFO", None, None,269 "enabling %s warnings" % warning_type,270 {"warnings.enable": warning_type})271 def monitor_disk_usage(self, max_rate):272 """\273 Signal that the job should monitor disk space usage on /274 and generate a warning if a test uses up disk space at a275 rate exceeding 'max_rate'.276 Parameters:277 max_rate - the maximium allowed rate of disk consumption278 during a test, in MB/hour, or 0 to indicate279 no limit.280 """281 self._max_disk_usage_rate = max_rate282 def control_get(self):283 return self.control284 def control_set(self, control):285 self.control = os.path.abspath(control)286 def harness_select(self, which, harness_args):287 self.harness = harness.select(which, self, harness_args)288 def setup_dirs(self, results_dir, tmp_dir):289 if not tmp_dir:290 tmp_dir = os.path.join(self.tmpdir, 'build')291 if not os.path.exists(tmp_dir):292 os.mkdir(tmp_dir)293 if not os.path.isdir(tmp_dir):294 e_msg = "Temp dir (%s) is not a dir - args backwards?" % self.tmpdir295 raise ValueError(e_msg)296 # We label the first build "build" and then subsequent ones297 # as "build.2", "build.3", etc. Whilst this is a little bit298 # inconsistent, 99.9% of jobs will only have one build299 # (that's not done as kernbench, sparse, or buildtest),300 # so it works out much cleaner. 
One of life's compromises.301 if not results_dir:302 results_dir = os.path.join(self.resultdir, 'build')303 i = 2304 while os.path.exists(results_dir):305 results_dir = os.path.join(self.resultdir, 'build.%d' % i)306 i += 1307 if not os.path.exists(results_dir):308 os.mkdir(results_dir)309 return (results_dir, tmp_dir)310 def barrier(self, *args, **kwds):311 """Create a barrier object"""312 return barrier.barrier(*args, **kwds)313 def install_pkg(self, name, pkg_type, install_dir):314 '''315 This method is a simple wrapper around the actual package316 installation method in the Packager class. This is used317 internally by the profilers, deps and tests code.318 name : name of the package (ex: sleeptest, dbench etc.)319 pkg_type : Type of the package (ex: test, dep etc.)320 install_dir : The directory in which the source is actually321 untarred into. (ex: client/profilers/<name> for profilers)322 '''323 if self.pkgmgr.repositories:324 self.pkgmgr.install_pkg(name, pkg_type, self.pkgdir, install_dir)325 def add_repository(self, repo_urls):326 '''327 Adds the repository locations to the job so that packages328 can be fetched from them when needed. The repository list329 needs to be a string list330 Ex: job.add_repository(['http://blah1','http://blah2'])331 '''332 for repo_url in repo_urls:333 self.pkgmgr.add_repository(repo_url)334 # Fetch the packages' checksum file that contains the checksums335 # of all the packages if it is not already fetched. The checksum336 # is always fetched whenever a job is first started. 
This337 # is not done in the job's constructor as we don't have the list of338 # the repositories there (and obviously don't care about this file339 # if we are not using the repos)340 try:341 checksum_file_path = os.path.join(self.pkgmgr.pkgmgr_dir,342 packages.CHECKSUM_FILE)343 self.pkgmgr.fetch_pkg(packages.CHECKSUM_FILE,344 checksum_file_path, use_checksum=False)345 except error.PackageFetchError:346 # packaging system might not be working in this case347 # Silently fall back to the normal case348 pass349 def require_gcc(self):350 """351 Test whether gcc is installed on the machine.352 """353 # check if gcc is installed on the system.354 try:355 utils.system('which gcc')356 except error.CmdError:357 raise NotAvailableError('gcc is required by this job and is '358 'not available on the system')359 def setup_dep(self, deps):360 """Set up the dependencies for this test.361 deps is a list of libraries required for this test.362 """363 # Fetch the deps from the repositories and set them up.364 for dep in deps:365 dep_dir = os.path.join(self.autodir, 'deps', dep)366 # Search for the dependency in the repositories if specified,367 # else check locally.368 try:369 self.install_pkg(dep, 'dep', dep_dir)370 except error.PackageInstallError:371 # see if the dep is there locally372 pass373 # dep_dir might not exist if it is not fetched from the repos374 if not os.path.exists(dep_dir):375 raise error.TestError("Dependency %s does not exist" % dep)376 os.chdir(dep_dir)377 if execfile('%s.py' % dep, {}) is None:378 logging.info('Dependency %s successfuly built', dep)379 def _runtest(self, url, tag, timeout, args, dargs):380 try:381 l = lambda : test.runtest(self, url, tag, args, dargs)382 pid = parallel.fork_start(self.resultdir, l)383 if timeout:384 logging.debug('Waiting for pid %d for %d seconds', pid, timeout)385 parallel.fork_waitfor_timed(self.resultdir, pid, timeout)386 else:387 parallel.fork_waitfor(self.resultdir, pid)388 except error.TestBaseException:389 # These are 
already classified with an error type (exit_status)390 raise391 except error.JobError:392 raise # Caught further up and turned into an ABORT.393 except Exception, e:394 # Converts all other exceptions thrown by the test regardless395 # of phase into a TestError(TestBaseException) subclass that396 # reports them with their full stack trace.397 raise error.UnhandledTestError(e)398 def _run_test_base(self, url, *args, **dargs):399 """400 Prepares arguments and run functions to run_test and run_test_detail.401 @param url A url that identifies the test to run.402 @param tag An optional keyword argument that will be added to the403 test and subdir name.404 @param subdir_tag An optional keyword argument that will be added405 to the subdir name.406 @returns:407 subdir: Test subdirectory408 testname: Test name409 group_func: Actual test run function410 timeout: Test timeout411 """412 _group, testname = self.pkgmgr.get_package_name(url, 'test')413 testname, subdir, tag = self._build_tagged_test_name(testname, dargs)414 self._make_test_outputdir(subdir)415 timeout = dargs.pop('timeout', None)416 if timeout:417 logging.debug('Test has timeout: %d sec.', timeout)418 def log_warning(reason):419 self.record("WARN", subdir, testname, reason)420 @disk_usage_monitor.watch(log_warning, "/", self._max_disk_usage_rate)421 def group_func():422 try:423 self._runtest(url, tag, timeout, args, dargs)424 except error.TestBaseException, detail:425 # The error is already classified, record it properly.426 self.record(detail.exit_status, subdir, testname, str(detail))427 raise428 else:429 self.record('GOOD', subdir, testname, 'completed successfully')430 return (subdir, testname, group_func, timeout)431 @_run_test_complete_on_exit432 def run_test(self, url, *args, **dargs):433 """434 Summon a test object and run it.435 @param url A url that identifies the test to run.436 @param tag An optional keyword argument that will be added to the437 test and subdir name.438 @param subdir_tag An optional 
keyword argument that will be added439 to the subdir name.440 @returns True if the test passes, False otherwise.441 """442 (subdir, testname, group_func, timeout) = self._run_test_base(url,443 *args,444 **dargs)445 try:446 self._rungroup(subdir, testname, group_func, timeout)447 return True448 except error.TestBaseException:449 return False450 # Any other exception here will be given to the caller451 #452 # NOTE: The only exception possible from the control file here453 # is error.JobError as _runtest() turns all others into an454 # UnhandledTestError that is caught above.455 @_run_test_complete_on_exit456 def run_test_detail(self, url, *args, **dargs):457 """458 Summon a test object and run it, returning test status.459 @param url A url that identifies the test to run.460 @param tag An optional keyword argument that will be added to the461 test and subdir name.462 @param subdir_tag An optional keyword argument that will be added463 to the subdir name.464 @returns Test status465 @see: client/common_lib/error.py, exit_status466 """467 (subdir, testname, group_func, timeout) = self._run_test_base(url,468 *args,469 **dargs)470 try:471 self._rungroup(subdir, testname, group_func, timeout)472 return 'GOOD'473 except error.TestBaseException, detail:474 return detail.exit_status475 def _rungroup(self, subdir, testname, function, timeout, *args, **dargs):476 """\477 subdir:478 name of the group479 testname:480 name of the test to run, or support step481 function:482 subroutine to run483 *args:484 arguments for the function485 Returns the result of the passed in function486 """487 try:488 optional_fields = None489 if timeout:490 optional_fields = {}491 optional_fields['timeout'] = timeout492 self.record('START', subdir, testname,493 optional_fields=optional_fields)494 self._state.set('client', 'unexpected_reboot', (subdir, testname))495 try:496 result = function(*args, **dargs)497 self.record('END GOOD', subdir, testname)498 return result499 except error.TestBaseException, 
e:500 self.record('END %s' % e.exit_status, subdir, testname)501 raise502 except error.JobError, e:503 self.record('END ABORT', subdir, testname)504 raise505 except Exception, e:506 # This should only ever happen due to a bug in the given507 # function's code. The common case of being called by508 # run_test() will never reach this. If a control file called509 # run_group() itself, bugs in its function will be caught510 # here.511 err_msg = str(e) + '\n' + traceback.format_exc()512 self.record('END ERROR', subdir, testname, err_msg)513 raise514 finally:515 self._state.discard('client', 'unexpected_reboot')516 def run_group(self, function, tag=None, **dargs):517 """518 Run a function nested within a group level.519 function:520 Callable to run.521 tag:522 An optional tag name for the group. If None (default)523 function.__name__ will be used.524 **dargs:525 Named arguments for the function.526 """527 if tag:528 name = tag529 else:530 name = function.__name__531 try:532 return self._rungroup(subdir=None, testname=name,533 function=function, timeout=None, **dargs)534 except (SystemExit, error.TestBaseException):535 raise536 # If there was a different exception, turn it into a TestError.537 # It will be caught by step_engine or _run_step_fn.538 except Exception, e:539 raise error.UnhandledTestError(e)540 def cpu_count(self):541 return utils.count_cpus() # use total system count542 def start_reboot(self):543 self.record('START', None, 'reboot')544 self.record('GOOD', None, 'reboot.start')545 def _record_reboot_failure(self, subdir, operation, status,546 running_id=None):547 self.record("ABORT", subdir, operation, status)548 if not running_id:549 running_id = utils.running_os_ident()550 kernel = {"kernel": running_id.split("::")[0]}551 self.record("END ABORT", subdir, 'reboot', optional_fields=kernel)552 def _check_post_reboot(self, subdir, running_id=None):553 """554 Function to perform post boot checks such as if the system configuration555 has changed across reboots 
(specifically, CPUs and partitions).556 @param subdir: The subdir to use in the job.record call.557 @param running_id: An optional running_id to include in the reboot558 failure log message559 @raise JobError: Raised if the current configuration does not match the560 pre-reboot configuration.561 """562 # check to see if any partitions have changed563 partition_list = partition_lib.get_partition_list(self,564 exclude_swap=False)565 mount_info = partition_lib.get_mount_info(partition_list)566 old_mount_info = self._state.get('client', 'mount_info')567 if mount_info != old_mount_info:568 new_entries = mount_info - old_mount_info569 old_entries = old_mount_info - mount_info570 description = ("mounted partitions are different after reboot "571 "(old entries: %s, new entries: %s)" %572 (old_entries, new_entries))573 self._record_reboot_failure(subdir, "reboot.verify_config",574 description, running_id=running_id)575 raise error.JobError("Reboot failed: %s" % description)576 # check to see if any CPUs have changed577 cpu_count = utils.count_cpus()578 old_count = self._state.get('client', 'cpu_count')579 if cpu_count != old_count:580 description = ('Number of CPUs changed after reboot '581 '(old count: %d, new count: %d)' %582 (old_count, cpu_count))583 self._record_reboot_failure(subdir, 'reboot.verify_config',584 description, running_id=running_id)585 raise error.JobError('Reboot failed: %s' % description)586 def partition(self, device, loop_size=0, mountpoint=None):587 """588 Work with a machine partition589 @param device: e.g. /dev/sda2, /dev/sdb1 etc...590 @param mountpoint: Specify a directory to mount to. If not specified591 autotest tmp directory will be used.592 @param loop_size: Size of loopback device (in MB). 
Defaults to 0.593 @return: A L{client.bin.partition.partition} object594 """595 if not mountpoint:596 mountpoint = self.tmpdir597 return partition_lib.partition(self, device, loop_size, mountpoint)598 @utils.deprecated599 def filesystem(self, device, mountpoint=None, loop_size=0):600 """ Same as partition601 @deprecated: Use partition method instead602 """603 return self.partition(device, loop_size, mountpoint)604 def enable_external_logging(self):605 pass606 def disable_external_logging(self):607 pass608 def reboot_setup(self):609 # save the partition list and mount points, as well as the cpu count610 partition_list = partition_lib.get_partition_list(self,611 exclude_swap=False)612 mount_info = partition_lib.get_mount_info(partition_list)613 self._state.set('client', 'mount_info', mount_info)614 self._state.set('client', 'cpu_count', utils.count_cpus())615 def reboot(self):616 self.reboot_setup()617 self.harness.run_reboot()618 # HACK: using this as a module sometimes hangs shutdown, so if it's619 # installed unload it first620 utils.system("modprobe -r netconsole", ignore_status=True)621 # sync first, so that a sync during shutdown doesn't time out622 utils.system("sync; sync", ignore_status=True)623 utils.system("(sleep 5; reboot) </dev/null >/dev/null 2>&1 &")624 self.quit()625 def noop(self, text):626 logging.info("job: noop: " + text)627 @_run_test_complete_on_exit628 def parallel(self, *tasklist):629 """Run tasks in parallel"""630 pids = []631 old_log_filename = self._logger.global_filename632 for i, task in enumerate(tasklist):633 assert isinstance(task, (tuple, list))634 self._logger.global_filename = old_log_filename + (".%d" % i)635 def task_func():636 # stub out _record_indent with a process-local one637 base_record_indent = self._record_indent638 proc_local = self._job_state.property_factory(639 '_state', '_record_indent.%d' % os.getpid(),640 base_record_indent, namespace='client')641 self.__class__._record_indent = proc_local642 task[0](*task[1:])643 
pids.append(parallel.fork_start(self.resultdir, task_func))644 old_log_path = os.path.join(self.resultdir, old_log_filename)645 old_log = open(old_log_path, "a")646 exceptions = []647 for i, pid in enumerate(pids):648 # wait for the task to finish649 try:650 parallel.fork_waitfor(self.resultdir, pid)651 except Exception, e:652 exceptions.append(e)653 # copy the logs from the subtask into the main log654 new_log_path = old_log_path + (".%d" % i)655 if os.path.exists(new_log_path):656 new_log = open(new_log_path)657 old_log.write(new_log.read())658 new_log.close()659 old_log.flush()660 os.remove(new_log_path)661 old_log.close()662 self._logger.global_filename = old_log_filename663 # handle any exceptions raised by the parallel tasks664 if exceptions:665 msg = "%d task(s) failed in job.parallel" % len(exceptions)666 raise error.JobError(msg)667 def quit(self):668 # XXX: should have a better name.669 self.harness.run_pause()670 raise error.JobContinue("more to come")671 def complete(self, status):672 """Write pending reports, clean up, and exit"""673 # write out a job HTML report674 try:675 html_report.create_report(self.resultdir)676 except Exception, e:677 logging.error("Error writing job HTML report: %s", e)678 # We are about to exit 'complete' so clean up the control file.679 dest = os.path.join(self.resultdir, os.path.basename(self._state_file))680 shutil.move(self._state_file, dest)681 self.harness.run_complete()682 self.disable_external_logging()683 sys.exit(status)684 def _load_state(self):685 # grab any initial state and set up $CONTROL.state as the backing file686 init_state_file = self.control + '.init.state'687 self._state_file = self.control + '.state'688 if os.path.exists(init_state_file):689 shutil.move(init_state_file, self._state_file)690 self._state.set_backing_file(self._state_file)691 # initialize the state engine, if necessary692 has_steps = self._state.has('client', 'steps')693 if not self._is_continuation and has_steps:694 raise 
RuntimeError('Loaded state can only contain client.steps if '695 'this is a continuation')696 if not has_steps:697 logging.debug('Initializing the state engine')698 self._state.set('client', 'steps', [])699 def handle_persistent_option(self, options, option_name):700 """701 Select option from command line or persistent state.702 Store selected option to allow standalone client to continue703 after reboot with previously selected options.704 Priority:705 1. explicitly specified via command line706 2. stored in state file (if continuing job '-c')707 3. default == None708 """709 option = None710 cmd_line_option = getattr(options, option_name)711 if cmd_line_option:712 option = cmd_line_option713 self._state.set('client', option_name, option)714 else:715 stored_option = self._state.get('client', option_name, None)716 if stored_option:717 option = stored_option718 logging.debug('Persistent option %s now set to %s', option_name, option)719 return option720 def __create_step_tuple(self, fn, args, dargs):721 # Legacy code passes in an array where the first arg is722 # the function or its name.723 if isinstance(fn, list):724 assert(len(args) == 0)725 assert(len(dargs) == 0)726 args = fn[1:]727 fn = fn[0]728 # Pickling actual functions is hairy, thus we have to call729 # them by name. 
Unfortunately, this means only functions730 # defined globally can be used as a next step.731 if callable(fn):732 fn = fn.__name__733 if not isinstance(fn, types.StringTypes):734 raise StepError("Next steps must be functions or "735 "strings containing the function name")736 ancestry = copy.copy(self._current_step_ancestry)737 return (ancestry, fn, args, dargs)738 def next_step_append(self, fn, *args, **dargs):739 """Define the next step and place it at the end"""740 steps = self._state.get('client', 'steps')741 steps.append(self.__create_step_tuple(fn, args, dargs))742 self._state.set('client', 'steps', steps)743 def next_step(self, fn, *args, **dargs):744 """Create a new step and place it after any steps added745 while running the current step but before any steps added in746 previous steps"""747 steps = self._state.get('client', 'steps')748 steps.insert(self._next_step_index,749 self.__create_step_tuple(fn, args, dargs))750 self._next_step_index += 1751 self._state.set('client', 'steps', steps)752 def next_step_prepend(self, fn, *args, **dargs):753 """Insert a new step, executing first"""754 steps = self._state.get('client', 'steps')755 steps.insert(0, self.__create_step_tuple(fn, args, dargs))756 self._next_step_index += 1757 self._state.set('client', 'steps', steps)758 def _run_step_fn(self, local_vars, fn, args, dargs):759 """Run a (step) function within the given context"""760 local_vars['__args'] = args761 local_vars['__dargs'] = dargs762 try:763 exec('__ret = %s(*__args, **__dargs)' % fn, local_vars, local_vars)764 return local_vars['__ret']765 except SystemExit:766 raise # Send error.JobContinue and JobComplete on up to runjob.767 except error.TestNAError, detail:768 self.record(detail.exit_status, None, fn, str(detail))769 except Exception, detail:770 raise error.UnhandledJobError(detail)771 def _create_frame(self, global_vars, ancestry, fn_name):772 """Set up the environment like it would have been when this773 function was first defined.774 Child step 
engine 'implementations' must have 'return locals()'775 at end end of their steps. Because of this, we can call the776 parent function and get back all child functions (i.e. those777 defined within it).778 Unfortunately, the call stack of the function calling779 job.next_step might have been deeper than the function it780 added. In order to make sure that the environment is what it781 should be, we need to then pop off the frames we built until782 we find the frame where the function was first defined."""783 # The copies ensure that the parent frames are not modified784 # while building child frames. This matters if we then785 # pop some frames in the next part of this function.786 current_frame = copy.copy(global_vars)787 frames = [current_frame]788 for steps_fn_name in ancestry:789 ret = self._run_step_fn(current_frame, steps_fn_name, [], {})790 current_frame = copy.copy(ret)791 frames.append(current_frame)792 # Walk up the stack frames until we find the place fn_name was defined.793 while len(frames) > 2:794 if fn_name not in frames[-2]:795 break796 if frames[-2][fn_name] != frames[-1][fn_name]:797 break798 frames.pop()799 ancestry.pop()800 return (frames[-1], ancestry)801 def _add_step_init(self, local_vars, current_function):802 """If the function returned a dictionary that includes a803 function named 'step_init', prepend it to our list of steps.804 This will only get run the first time a function with a nested805 use of the step engine is run."""806 if (isinstance(local_vars, dict) and807 'step_init' in local_vars and808 callable(local_vars['step_init'])):809 # The init step is a child of the function810 # we were just running.811 self._current_step_ancestry.append(current_function)812 self.next_step_prepend('step_init')813 def step_engine(self):814 """The multi-run engine used when the control file defines step_init.815 Does the next step.816 """817 # Set up the environment and then interpret the control file.818 # Some control files will have code outside 
of functions,819 # which means we need to have our state engine initialized820 # before reading in the file.821 global_control_vars = {'job': self,822 'args': self.args}823 exec(JOB_PREAMBLE, global_control_vars, global_control_vars)824 try:825 execfile(self.control, global_control_vars, global_control_vars)826 except error.TestNAError, detail:827 self.record(detail.exit_status, None, self.control, str(detail))828 except SystemExit:829 raise # Send error.JobContinue and JobComplete on up to runjob.830 except Exception, detail:831 # Syntax errors or other general Python exceptions coming out of832 # the top level of the control file itself go through here.833 raise error.UnhandledJobError(detail)834 # If we loaded in a mid-job state file, then we presumably835 # know what steps we have yet to run.836 if not self._is_continuation:837 if 'step_init' in global_control_vars:838 self.next_step(global_control_vars['step_init'])839 else:840 # if last job failed due to unexpected reboot, record it as fail841 # so harness gets called842 last_job = self._state.get('client', 'unexpected_reboot', None)843 if last_job:844 subdir, testname = last_job845 self.record('FAIL', subdir, testname, 'unexpected reboot')846 self.record('END FAIL', subdir, testname)847 # Iterate through the steps. 
If we reboot, we'll simply848 # continue iterating on the next step.849 while len(self._state.get('client', 'steps')) > 0:850 steps = self._state.get('client', 'steps')851 (ancestry, fn_name, args, dargs) = steps.pop(0)852 self._state.set('client', 'steps', steps)853 self._next_step_index = 0854 ret = self._create_frame(global_control_vars, ancestry, fn_name)855 local_vars, self._current_step_ancestry = ret856 local_vars = self._run_step_fn(local_vars, fn_name, args, dargs)857 self._add_step_init(local_vars, fn_name)858 def add_sysinfo_command(self, command, logfile=None, on_every_test=False):859 self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),860 on_every_test)861 def add_sysinfo_logfile(self, file, on_every_test=False):862 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)863 def _add_sysinfo_loggable(self, loggable, on_every_test):864 if on_every_test:865 self.sysinfo.test_loggables.add(loggable)866 else:867 self.sysinfo.boot_loggables.add(loggable)868 self._save_sysinfo_state()869 def _load_sysinfo_state(self):870 state = self._state.get('client', 'sysinfo', None)871 if state:872 self.sysinfo.deserialize(state)873 def _save_sysinfo_state(self):874 state = self.sysinfo.serialize()875 self._state.set('client', 'sysinfo', state)876class disk_usage_monitor:877 def __init__(self, logging_func, device, max_mb_per_hour):878 self.func = logging_func879 self.device = device880 self.max_mb_per_hour = max_mb_per_hour881 def start(self):882 self.initial_space = utils.freespace(self.device)883 self.start_time = time.time()...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also watch the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!