Best Python code snippet using autotest_python
site_autotest.py
Source:site_autotest.py
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import tempfile
import urllib2

from autotest_lib.client.common_lib import error, global_config
from autotest_lib.client.common_lib.cros import dev_server
from autotest_lib.server import installable_object, autoserv_parser
from autotest_lib.server import utils as server_utils
from autotest_lib.server.cros.dynamic_suite import tools
from autotest_lib.server.cros.dynamic_suite.constants import JOB_REPO_URL


_CONFIG = global_config.global_config
_PARSER = autoserv_parser.autoserv_parser


class SiteAutotest(installable_object.InstallableObject):
    """Site implementation of Autotest."""

    def get(self, location=None):
        """Record the client source location to install from.

        @param location: Path to the autotest client directory. When falsy,
                defaults to '../client' relative to |self.serverdir|
                (made absolute).
        """
        if not location:
            location = os.path.join(self.serverdir, '../client')
            location = os.path.abspath(location)
        installable_object.InstallableObject.get(self, location)
        self.got = True


    def _get_fetch_location_from_host_attribute(self):
        """Get repo to use for packages from host attribute, if possible.

        Hosts are tagged with an attribute containing the URL
        from which to source packages when running a test on that host.
        If self.host is set, attempt to look this attribute up by calling out
        to the AFE.

        @returns value of the 'job_repo_url' host attribute, if present;
                 None otherwise.
        """
        try:
            # Imported lazily: the frontend module may be unavailable, in
            # which case the ImportError below turns this into a no-op.
            from autotest_lib.server import frontend
            if self.host:
                afe = frontend.AFE(debug=False)
                hosts = afe.get_hosts(hostname=self.host.hostname)
                if hosts and JOB_REPO_URL in hosts[0].attributes:
                    return hosts[0].attributes[JOB_REPO_URL]
                logging.warning("No %s for %s", JOB_REPO_URL, self.host)
        except (ImportError, urllib2.URLError):
            logging.warning('Not attempting to look for %s', JOB_REPO_URL)
        return None


    def get_fetch_location(self):
        """Generate list of locations where autotest can look for packages.

        Old n' busted: Autotest packages are always stored at a URL that can
        be derived from the one passed via the voodoo magic --image argument.
        New hotness: Hosts are tagged with an attribute containing the URL
        from which to source packages when running a test on that host.

        @returns the list of candidate locations to check for packages.
        """
        repos = super(SiteAutotest, self).get_fetch_location()

        if _PARSER.options.image:
            image_opt = _PARSER.options.image
            if image_opt.startswith('http://'):
                # A devserver HTTP url was specified, set that as the repo_url.
                repos.append(image_opt.replace(
                    'update', 'static').rstrip('/') + '/autotest')
            else:
                # An image_name like stumpy-release/R27-3437.0.0 was specified,
                # set this as the repo_url for the host. If an AFE is not being
                # run, this will ensure that the installed build uses the
                # associated artifacts for the test specified when running
                # autoserv with --image. However, any subsequent tests run on
                # the host will no longer have the context of the image option
                # and will revert back to utilizing test code/artifacts that are
                # currently present in the users source checkout.
                devserver_url = dev_server.ImageServer.resolve(image_opt).url()
                repo_url = tools.get_package_url(devserver_url, image_opt)
                repos.append(repo_url)
        elif not server_utils.is_inside_chroot():
            # Only try to get fetch location from host attribute if the test
            # is not running inside chroot.
            # No --image option was specified, look for the repo url via
            # the host attribute. If we are not running with a full AFE
            # autoserv will fall back to serving packages itself from whatever
            # source version it is sync'd to rather than using the proper
            # artifacts for the build on the host.
            found_repo = self._get_fetch_location_from_host_attribute()
            if found_repo is not None:
                # Add our new repo to the end, the package manager will
                # later reverse the list of repositories resulting in ours
                # being first
                repos.append(found_repo)

        return repos


    def install(self, host=None, autodir=None, use_packaging=True):
        """Install autotest.  If |host| is not None, stores it in |self.host|.

        @param host A Host instance on which autotest will be installed
        @param autodir Location on the remote host to install to
        @param use_packaging Enable install modes that use the packaging system.
        """
        if host:
            self.host = host
        super(SiteAutotest, self).install(host=host, autodir=autodir,
                                          use_packaging=use_packaging)


    def _install(self, host=None, autodir=None, use_autoserv=True,
                 use_packaging=True):
        """
        Install autotest.  If get() was not called previously, an
        attempt will be made to install from the autotest svn
        repository.

        After the base install, the CLIENT section of the global config is
        written to a temporary file and sent to the host as
        <autodir>/global_config.ini.

        @param host A Host instance on which autotest will be installed
        @param autodir Location on the remote host to install to
        @param use_autoserv Enable install modes that depend on the client
            running with the autoserv harness
        @param use_packaging Enable install modes that use the packaging system

        @exception AutoservError if a tarball was not specified and
            the target host does not have svn installed in its path
        """
        # TODO(milleral): http://crbug.com/258161
        super(SiteAutotest, self)._install(host, autodir, use_autoserv,
                                           use_packaging)
        # Send over the most recent global_config.ini after installation if one
        # is available.
        # This code is a bit duplicated from
        # _BaseRun._create_client_config_file, but oh well.
        if self.installed and self.source_material:
            logging.info('Installing updated global_config.ini.')
            destination = os.path.join(self.host.get_autodir(),
                                       'global_config.ini')
            with tempfile.NamedTemporaryFile() as client_config:
                client_section = _CONFIG.get_section_values('CLIENT')
                client_section.write(client_config)
                # Flush so the full contents are on disk before the file is
                # sent by name.
                client_config.flush()
                self.host.send_file(client_config.name, destination)


    def run_static_method(self, module, method, results_dir='.', host=None,
                          *args):
        """Runs a non-instance method with |args| from |module| on the client.

        This method runs a static/class/module autotest method on the client.
        For example:
          run_static_method("autotest_lib.client.cros.cros_ui", "reboot")

        Will run autotest_lib.client.cros.cros_ui.reboot() on the client.

        @param module: module name as you would refer to it when importing in a
            control file. e.g. autotest_lib.client.common_lib.module_name.
        @param method: the method you want to call.
        @param results_dir: A str path where the results should be stored
            on the local filesystem.
        @param host: A Host instance on which the control file should
            be run.
        @param args: args to pass to the method.
        """
        # Arguments are embedded via repr(), so they must round-trip through
        # their repr to arrive intact in the generated control file.
        control = "\n".join(["import %s" % module,
                             "%s.%s(%s)\n" % (module, method,
                                              ','.join(map(repr, args)))])
        self.run(control, results_dir=results_dir, host=host)


class SiteClientLogger(object):
    """Overrides default client logger to allow for using a local package cache.
    """

    def _process_line(self, line):
        """Serve the package checksum file from the local cache if requested.

        If |line| is a fetch-package request for 'packages.checksum' and
        serving packages from autoserv is enabled, the cached checksum file
        (when it exists) is sent to the host and the client is notified via
        its fifo. Any other line is handed to the default implementation.

        @param line: A line of client output to process.
        """
        logging.debug(line)
        fetch_package_match = self.fetch_package_parser.search(line)
        if fetch_package_match:
            pkg_name, dest_path, fifo_path = fetch_package_match.groups()
            serve_packages = _CONFIG.get_config_value(
                "PACKAGES", "serve_packages_from_autoserv", type=bool)
            if serve_packages and pkg_name == 'packages.checksum':
                try:
                    checksum_file = os.path.join(
                        self.job.pkgmgr.pkgmgr_dir, 'packages', pkg_name)
                    if os.path.exists(checksum_file):
                        self.host.send_file(checksum_file, dest_path)
                except error.AutoservRunError:
                    msg = "Package checksum file not found, continuing anyway"
                    logging.exception(msg)

                try:
                    # When fetching a package, the client expects to be
                    # notified when the fetching is complete. Autotest
                    # does this pushing a B to a fifo queue to the client.
                    self.host.run("echo B > %s" % fifo_path)
                except error.AutoservRunError:
                    msg = "Checksum installation failed, continuing anyway"
                    logging.exception(msg)
                except Exception:
                    # Previously a `finally: return` silently discarded every
                    # in-flight exception here (including KeyboardInterrupt
                    # and SystemExit). Stay best-effort for ordinary errors,
                    # but log them and let interpreter-exit signals propagate.
                    msg = ("Unexpected error notifying client of checksum "
                           "fetch, continuing anyway")
                    logging.exception(msg)
                return

        # Fall through to process the line using the default method.
        super(SiteClientLogger, self)._process_line(line)


    def _send_tarball(self, pkg_name, remote_dest):
        """Uses tarballs in package manager by default.

        @param pkg_name: Name of the package tarball to send.
        @param remote_dest: Destination path on the remote host.
        """
        try:
            server_package = os.path.join(self.job.pkgmgr.pkgmgr_dir,
                                          'packages', pkg_name)
            if os.path.exists(server_package):
                self.host.send_file(server_package, remote_dest)
                return
        except error.AutoservRunError:
            msg = ("Package %s could not be sent from the package cache." %
                   pkg_name)
            logging.exception(msg)

        # Fall through to send tarball the default method.
        super(SiteClientLogger, self)._send_tarball(pkg_name, remote_dest)


# NOTE: the remainder of this definition is truncated in this excerpt.
class _SiteRun(object):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios, LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!