Best Python code snippet using autotest_python
server_job.py
Source:server_job.py
...353 def _get_status_logger(self):354 """Return a reference to the status logger."""355 return self._logger356 @staticmethod357 def _load_control_file(path):358 f = open(path)359 try:360 control_file = f.read()361 finally:362 f.close()363 return re.sub('\r', '', control_file)364 def _register_subcommand_hooks(self):365 """366 Register some hooks into the subcommand modules that allow us367 to properly clean up self.hosts created in forked subprocesses.368 """369 def on_fork(cmd):370 self._existing_hosts_on_fork = set(self.hosts)371 def on_join(cmd):372 new_hosts = self.hosts - self._existing_hosts_on_fork373 for host in new_hosts:374 host.close()375 subcommand.subcommand.register_fork_hook(on_fork)376 subcommand.subcommand.register_join_hook(on_join)377 # TODO crbug.com/285395 add a kwargs parameter.378 def _make_namespace(self):379 """Create a namespace dictionary to be passed along to control file.380 Creates a namespace argument populated with standard values:381 machines, job, ssh_user, ssh_port, ssh_pass, ssh_verbosity_flag,382 and ssh_options.383 """384 namespace = {'machines' : self.machine_dict_list,385 'job' : self,386 'ssh_user' : self._ssh_user,387 'ssh_port' : self._ssh_port,388 'ssh_pass' : self._ssh_pass,389 'ssh_verbosity_flag' : self._ssh_verbosity_flag,390 'ssh_options' : self._ssh_options}391 return namespace392 def cleanup(self, labels):393 """Cleanup machines.394 @param labels: Comma separated job labels, will be used to395 determine special task actions.396 """397 if not self.machines:398 raise error.AutoservError('No machines specified to cleanup')399 if self.resultdir:400 os.chdir(self.resultdir)401 namespace = self._make_namespace()402 namespace.update({'job_labels': labels, 'args': ''})403 self._execute_code(CLEANUP_CONTROL_FILE, namespace, protect=False)404 def verify(self, labels):405 """Verify machines are all ssh-able.406 @param labels: Comma separated job labels, will be used to407 determine special task actions.408 """409 if not self.machines:410 raise error.AutoservError('No machines specified to verify')411 if self.resultdir:412 os.chdir(self.resultdir)413 namespace = self._make_namespace()414 namespace.update({'job_labels': labels, 'args': ''})415 self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)416 def reset(self, labels):417 """Reset machines by first cleanup then verify each machine.418 @param labels: Comma separated job labels, will be used to419 determine special task actions.420 """421 if not self.machines:422 raise error.AutoservError('No machines specified to reset.')423 if self.resultdir:424 os.chdir(self.resultdir)425 namespace = self._make_namespace()426 namespace.update({'job_labels': labels, 'args': ''})427 self._execute_code(RESET_CONTROL_FILE, namespace, protect=False)428 def repair(self, labels):429 """Repair machines.430 @param labels: Comma separated job labels, will be used to431 determine special task actions.432 """433 if not self.machines:434 raise error.AutoservError('No machines specified to repair')435 if self.resultdir:436 os.chdir(self.resultdir)437 namespace = self._make_namespace()438 namespace.update({'job_labels': labels, 'args': ''})439 self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)440 def provision(self, labels):441 """442 Provision all hosts to match |labels|.443 @param labels: A comma seperated string of labels to provision the444 host to.445 """446 control = self._load_control_file(PROVISION_CONTROL_FILE)447 self.run(control=control, job_labels=labels)448 def precheck(self):449 """450 perform any additional checks in derived classes.451 """452 pass453 def enable_external_logging(self):454 """455 Start or restart external logging mechanism.456 """457 pass458 def disable_external_logging(self):459 """460 Pause or stop external logging mechanism.461 """462 pass463 def use_external_logging(self):464 """465 Return True if external logging should be used.466 """467 return False468 def _make_parallel_wrapper(self, function, machines, log):469 """Wrap function as appropriate for calling by parallel_simple."""470 # machines could be a list of dictionaries, e.g.,471 # [{'host_attributes': {}, 'hostname': '100.96.51.226'}]472 # The dictionary is generated in server_job.__init__, refer to473 # variable machine_dict_list, then passed in with namespace, see method474 # server_job._make_namespace.475 # To compare the machinese to self.machines, which is a list of machine476 # hostname, we need to convert machines back to a list of hostnames.477 if (machines and isinstance(machines, list)478 and isinstance(machines[0], dict)):479 machines = [m['hostname'] for m in machines]480 if len(machines) > 1 and log:481 def wrapper(machine):482 hostname = server_utils.get_hostname_from_machine(machine)483 self.push_execution_context(hostname)484 os.chdir(self.resultdir)485 machine_data = {'hostname' : hostname,486 'status_version' : str(self._STATUS_VERSION)}487 utils.write_keyval(self.resultdir, machine_data)488 result = function(machine)489 return result490 else:491 wrapper = function492 return wrapper493 def parallel_simple(self, function, machines, log=True, timeout=None,494 return_results=False):495 """496 Run 'function' using parallel_simple, with an extra wrapper to handle497 the necessary setup for continuous parsing, if possible. If continuous498 parsing is already properly initialized then this should just work.499 @param function: A callable to run in parallel given each machine.500 @param machines: A list of machine names to be passed one per subcommand501 invocation of function.502 @param log: If True, output will be written to output in a subdirectory503 named after each machine.504 @param timeout: Seconds after which the function call should timeout.505 @param return_results: If True instead of an AutoServError being raised506 on any error a list of the results|exceptions from the function507 called on each arg is returned. [default: False]508 @raises error.AutotestError: If any of the functions failed.509 """510 wrapper = self._make_parallel_wrapper(function, machines, log)511 return subcommand.parallel_simple(512 wrapper, machines,513 subdir_name_constructor=server_utils.get_hostname_from_machine,514 log=log, timeout=timeout, return_results=return_results)515 def parallel_on_machines(self, function, machines, timeout=None):516 """517 @param function: Called in parallel with one machine as its argument.518 @param machines: A list of machines to call function(machine) on.519 @param timeout: Seconds after which the function call should timeout.520 @returns A list of machines on which function(machine) returned521 without raising an exception.522 """523 results = self.parallel_simple(function, machines, timeout=timeout,524 return_results=True)525 success_machines = []526 for result, machine in itertools.izip(results, machines):527 if not isinstance(result, Exception):528 success_machines.append(machine)529 return success_machines530 def record_skipped_test(self, skipped_test, message=None):531 """Insert a failure record into status.log for this test."""532 msg = message533 if msg is None:534 msg = 'No valid machines found for test %s.' % skipped_test535 logging.info(msg)536 self.record('START', None, skipped_test.test_name)537 self.record('INFO', None, skipped_test.test_name, msg)538 self.record('END TEST_NA', None, skipped_test.test_name, msg)539 def _has_failed_tests(self):540 """Parse status log for failed tests.541 This checks the current working directory and is intended only for use542 by the run() method.543 @return boolean544 """545 path = os.getcwd()546 # TODO(ayatane): Copied from tko/parse.py. Needs extensive refactor to547 # make code reuse plausible.548 job_keyval = tko_models.job.read_keyval(path)549 status_version = job_keyval.get("status_version", 0)550 # parse out the job551 parser = parser_lib.parser(status_version)552 job = parser.make_job(path)553 status_log = os.path.join(path, "status.log")554 if not os.path.exists(status_log):555 status_log = os.path.join(path, "status")556 if not os.path.exists(status_log):557 logging.warning("! Unable to parse job, no status file")558 return True559 # parse the status logs560 status_lines = open(status_log).readlines()561 parser.start(job)562 tests = parser.end(status_lines)563 # parser.end can return the same object multiple times, so filter out564 # dups565 job.tests = []566 already_added = set()567 for test in tests:568 if test not in already_added:569 already_added.add(test)570 job.tests.append(test)571 failed = False572 for test in job.tests:573 # The current job is still running and shouldn't count as failed.574 # The parser will fail to parse the exit status of the job since it575 # hasn't exited yet (this running right now is the job).576 failed = failed or (test.status != 'GOOD'577 and not _is_current_server_job(test))578 return failed579 def _collect_crashes(self, namespace, collect_crashinfo):580 """Collect crashes.581 @param namespace: namespace dict.582 @param collect_crashinfo: whether to collect crashinfo in addition to583 dumps584 """585 if collect_crashinfo:586 # includes crashdumps587 crash_control_file = CRASHINFO_CONTROL_FILE588 else:589 crash_control_file = CRASHDUMPS_CONTROL_FILE590 self._execute_code(crash_control_file, namespace)591 _USE_TEMP_DIR = object()592 def run(self, collect_crashdumps=True, namespace={}, control=None,593 control_file_dir=None, verify_job_repo_url=False,594 only_collect_crashinfo=False, skip_crash_collection=False,595 job_labels='', use_packaging=True):596 # for a normal job, make sure the uncollected logs file exists597 # for a crashinfo-only run it should already exist, bail out otherwise598 created_uncollected_logs = False599 logging.info("I am PID %s", os.getpid())600 if self.resultdir and not os.path.exists(self._uncollected_log_file):601 if only_collect_crashinfo:602 # if this is a crashinfo-only run, and there were no existing603 # uncollected logs, just bail out early604 logging.info("No existing uncollected logs, "605 "skipping crashinfo collection")606 return607 else:608 log_file = open(self._uncollected_log_file, "w")609 pickle.dump([], log_file)610 log_file.close()611 created_uncollected_logs = True612 # use a copy so changes don't affect the original dictionary613 namespace = namespace.copy()614 machines = self.machines615 if control is None:616 if self.control is None:617 control = ''618 elif self._use_client_trampoline:619 control = self._load_control_file(620 CLIENT_TRAMPOLINE_CONTROL_FILE)621 # repr of a string is safe for eval.622 control = (('trampoline_testname = %r\n' % str(self.control))623 + control)624 else:625 control = self._load_control_file(self.control)626 if control_file_dir is None:627 control_file_dir = self.resultdir628 self.aborted = False629 namespace.update(self._make_namespace())630 namespace.update({631 'args': self.args,632 'job_labels': job_labels,633 'gtest_runner': site_gtest_runner.gtest_runner(),634 })635 test_start_time = int(time.time())636 if self.resultdir:637 os.chdir(self.resultdir)638 # touch status.log so that the parser knows a job is running here639 open(self.get_status_log_path(), 'a').close()640 self.enable_external_logging()641 collect_crashinfo = True642 temp_control_file_dir = None643 try:644 try:645 if not self.fast:646 with metrics.SecondsTimer(647 'chromeos/autotest/job/get_network_stats',648 fields = {'stage': 'start'}):649 namespace['network_stats_label'] = 'at-start'650 self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,651 namespace)652 if only_collect_crashinfo:653 return654 # If the verify_job_repo_url option is set but we're unable655 # to actually verify that the job_repo_url contains the autotest656 # package, this job will fail.657 if verify_job_repo_url:658 self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE,659 namespace)660 else:661 logging.warning('Not checking if job_repo_url contains '662 'autotest packages on %s', machines)663 # determine the dir to write the control files to664 cfd_specified = (control_file_dir665 and control_file_dir is not self._USE_TEMP_DIR)666 if cfd_specified:667 temp_control_file_dir = None668 else:669 temp_control_file_dir = tempfile.mkdtemp(670 suffix='temp_control_file_dir')671 control_file_dir = temp_control_file_dir672 server_control_file = os.path.join(control_file_dir,673 self._control_filename)674 client_control_file = os.path.join(control_file_dir,675 CLIENT_CONTROL_FILENAME)676 if self._client:677 namespace['control'] = control678 utils.open_write_close(client_control_file, control)679 shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,680 server_control_file)681 else:682 utils.open_write_close(server_control_file, control)683 logging.info("Processing control file")684 namespace['use_packaging'] = use_packaging685 self._execute_code(server_control_file, namespace)686 logging.info("Finished processing control file")687 # If no device error occured, no need to collect crashinfo.688 collect_crashinfo = self.failed_with_device_error689 except Exception as e:690 try:691 logging.exception(692 'Exception escaped control file, job aborting:')693 reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX,694 ' ', str(e))695 self.record('INFO', None, None, str(e),696 {'job_abort_reason': reason})697 except:698 pass # don't let logging exceptions here interfere699 raise700 finally:701 if temp_control_file_dir:702 # Clean up temp directory used for copies of the control files703 try:704 shutil.rmtree(temp_control_file_dir)705 except Exception as e:706 logging.warning('Could not remove temp directory %s: %s',707 temp_control_file_dir, e)708 if machines and (collect_crashdumps or collect_crashinfo):709 if skip_crash_collection or self.fast:710 logging.info('Skipping crash dump/info collection '711 'as requested.')712 else:713 with metrics.SecondsTimer(714 'chromeos/autotest/job/collect_crashinfo'):715 namespace['test_start_time'] = test_start_time716 # Remove crash files for passing tests.717 # TODO(ayatane): Tests that create crash files should be718 # reported.719 namespace['has_failed_tests'] = self._has_failed_tests()720 self._collect_crashes(namespace, collect_crashinfo)721 self.disable_external_logging()722 if self._uncollected_log_file and created_uncollected_logs:723 os.remove(self._uncollected_log_file)724 if not self.fast:725 with metrics.SecondsTimer(726 'chromeos/autotest/job/get_network_stats',727 fields = {'stage': 'end'}):728 namespace['network_stats_label'] = 'at-end'729 self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,730 namespace)731 def run_test(self, url, *args, **dargs):732 """733 Summon a test object and run it.734 tag735 tag to add to testname736 url737 url of the test to run738 """739 if self._disable_sysinfo:740 dargs['disable_sysinfo'] = True741 group, testname = self.pkgmgr.get_package_name(url, 'test')742 testname, subdir, tag = self._build_tagged_test_name(testname, dargs)743 outputdir = self._make_test_outputdir(subdir)744 def group_func():745 try:746 test.runtest(self, url, tag, args, dargs)747 except error.TestBaseException as e:748 self.record(e.exit_status, subdir, testname, str(e))749 raise750 except Exception as e:751 info = str(e) + "\n" + traceback.format_exc()752 self.record('FAIL', subdir, testname, info)753 raise754 else:755 self.record('GOOD', subdir, testname, 'completed successfully')756 try:757 result = self._run_group(testname, subdir, group_func)758 except error.TestBaseException as e:759 return False760 else:761 return True762 def _run_group(self, name, subdir, function, *args, **dargs):763 """Underlying method for running something inside of a group."""764 result, exc_info = None, None765 try:766 self.record('START', subdir, name)767 result = function(*args, **dargs)768 except error.TestBaseException as e:769 self.record("END %s" % e.exit_status, subdir, name)770 raise771 except Exception as e:772 err_msg = str(e) + '\n'773 err_msg += traceback.format_exc()774 self.record('END ABORT', subdir, name, err_msg)775 raise error.JobError(name + ' failed\n' + traceback.format_exc())776 else:777 self.record('END GOOD', subdir, name)778 finally:779 for hook in self._post_run_hooks:780 hook()781 return result782 def run_group(self, function, *args, **dargs):783 """\784 @param function: subroutine to run785 @returns: (result, exc_info). When the call succeeds, result contains786 the return value of |function| and exc_info is None. If787 |function| raises an exception, exc_info contains the tuple788 returned by sys.exc_info(), and result is None.789 """790 name = function.__name__791 # Allow the tag for the group to be specified.792 tag = dargs.pop('tag', None)793 if tag:794 name = tag795 try:796 result = self._run_group(name, None, function, *args, **dargs)[0]797 except error.TestBaseException:798 return None, sys.exc_info()799 return result, None800 def run_op(self, op, op_func, get_kernel_func):801 """\802 A specialization of run_group meant specifically for handling803 management operation. Includes support for capturing the kernel version804 after the operation.805 Args:806 op: name of the operation.807 op_func: a function that carries out the operation (reboot, suspend)808 get_kernel_func: a function that returns a string809 representing the kernel version.810 """811 try:812 self.record('START', None, op)813 op_func()814 except Exception as e:815 err_msg = str(e) + '\n' + traceback.format_exc()816 self.record('END FAIL', None, op, err_msg)817 raise818 else:819 kernel = get_kernel_func()820 self.record('END GOOD', None, op,821 optional_fields={"kernel": kernel})822 def run_control(self, path):823 """Execute a control file found at path (relative to the autotest824 path). Intended for executing a control file within a control file,825 not for running the top-level job control file."""826 path = os.path.join(self.autodir, path)827 control_file = self._load_control_file(path)828 self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)829 def add_sysinfo_command(self, command, logfile=None, on_every_test=False):830 self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),831 on_every_test)832 def add_sysinfo_logfile(self, file, on_every_test=False):833 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)834 def _add_sysinfo_loggable(self, loggable, on_every_test):835 if on_every_test:836 self.sysinfo.test_loggables.add(loggable)837 else:838 self.sysinfo.boot_loggables.add(loggable)839 def _read_warnings(self):840 """Poll all the warning loggers and extract any new warnings that have841 been logged. If the warnings belong to a category that is currently...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!