Best Python code snippet using autotest_python
base_packages.py
Source:base_packages.py
...159 """160 self.run_command = package_manager._run_command161 self.url = repository_url162 self.pkgmgr = package_manager163 def install_pkg_setup(self, name, fetch_dir, install):164 """165 Install setup for a package based on fetcher type.166 :type name: string167 :param name: The filename to be munged168 :type fetch_dir: string169 :param fetch_dir: The destination path to be munged170 :type install: boolean171 :param install: Whether this is be called from the install path or not172 :return: tuple with (name, fetch_dir)173 """174 if install:175 fetch_dir = os.path.join(fetch_dir, re.sub("/", "_", name))176 return (name, fetch_dir)177 def fetch_pkg_file(self, filename, dest_path):178 """179 Fetch a package file from a package repository.180 :type filename: string181 :param filename: The filename of the package file to fetch.182 :type dest_path: string183 :param dest_path: Destination path to download the file to.184 :raise PackageFetchError: if the fetch failed185 """186 raise NotImplementedError()187 def install_pkg_post(self, filename, fetch_dir,188 install_dir, preserve_install_dir=False):189 """190 Fetcher specific post install191 :type filename: string192 :param filename: The filename of the package to install193 :type fetch_dir: string194 :param fetch_dir: The fetched path of the package195 :type install_dir: string196 :param install_dir: The path to install the package to197 :type preserve_install_dir: boolean198 @preserve_install_dir: Preserve the install directory199 """200 # check to see if the install_dir exists and if it does201 # then check to see if the .checksum file is the latest202 install_dir_exists = False203 try:204 self.pkgmgr._run_command("ls %s" % install_dir)205 install_dir_exists = True206 except (error.CmdError, error.AutoservRunError):207 pass208 fetch_path = os.path.join(fetch_dir, re.sub("/", "_", filename))209 if (install_dir_exists and210 not self.pkgmgr.untar_required(fetch_path, install_dir)):211 return212 # untar the package into install_dir and213 # update the checksum in that directory214 if not preserve_install_dir:215 # Make sure we clean up the install_dir216 self.pkgmgr._run_command('rm -rf %s' % install_dir)217 self.pkgmgr._run_command('mkdir -p %s' % install_dir)218 self.pkgmgr.untar_pkg(fetch_path, install_dir)219class HttpFetcher(RepositoryFetcher):220 '''221 Repository Fetcher using HTTP222 '''223 #224 # parameters: url, destination file path225 #226 wget_cmd_pattern = 'wget --connect-timeout=15 -nv %s -O %s'227 def _quick_http_test(self):228 """229 Runs a wget command with a 30s timeout230 This checks that the repository is reachable, and avoids the need to231 wait for a full 10min timeout.232 """233 # just make a temp file to write a test fetch into234 mktemp = 'mktemp -u /tmp/tmp.XXXXXX'235 dest_file_path = self.run_command(mktemp).stdout.strip()236 try:237 # build up a wget command238 http_cmd = self.wget_cmd_pattern % (self.url, dest_file_path)239 try:240 self.run_command(http_cmd, _run_command_dargs={'timeout': 30})241 except Exception, e:242 msg = 'HTTP test failed, unable to contact %s: %s'243 raise error.PackageFetchError(msg % (self.url, e))244 finally:245 self.run_command('rm -rf %s' % dest_file_path)246 def fetch_pkg_file(self, filename, dest_path):247 """248 Fetch a package file from a package repository.249 :type filename: string250 :param filename: The filename of the package file to fetch.251 :type dest_path: string252 :param dest_path: Destination path to download the file to.253 :raise PackageFetchError: if the fetch failed254 """255 logging.info('Fetching %s from %s to %s', filename, self.url,256 dest_path)257 # do a quick test to verify the repo is reachable258 self._quick_http_test()259 # try to retrieve the package via http260 package_url = os.path.join(self.url, filename)261 try:262 cmd = self.wget_cmd_pattern % (package_url, dest_path)263 result = self.run_command(cmd)264 file_exists = self.run_command(265 'ls %s' % dest_path,266 _run_command_dargs={'ignore_status': True}).exit_status == 0267 if not file_exists:268 logging.error('wget failed: %s', result)269 raise error.CmdError(cmd, result)270 logging.debug('Successfully fetched %s from %s', filename,271 package_url)272 except error.CmdError:273 # remove whatever junk was retrieved when the get failed274 self.run_command('rm -f %s' % dest_path)275 raise error.PackageFetchError('%s not found in %s' % (filename,276 package_url))277class GitFetcher(RepositoryFetcher):278 """279 A git based repository fetcher280 """281 #282 # parameters: url, destination file path, <branch>:<file name>283 #284 git_archive_cmd_pattern = 'git archive --remote=%s -o %s %s'285 def __init__(self, package_manager, repository_url):286 """287 Initializes a new GitFetcher288 :type package_manager: BasePackageManager class289 :param package_manager: and instance of BasePackageManager class290 :type repository_url: string291 :param repository_url: The base URL of the git repository292 """293 super(GitFetcher, self).__init__(package_manager, repository_url)294 self._set_repo_url_branch(repository_url)295 logging.debug('GitFetcher initialized with repo=%s and branch=%s',296 self.url, self.branch)297 def _set_repo_url_branch(self, repository_url):298 '''299 Parse the url, look for a branch and set it accordingly300 :type repository_url: string301 :param repository_url: The base URL of the git repository302 '''303 # do we have branch info in the repoistory_url?304 branch = "master"305 match = repository_url.split(":")306 if len(match) > 2:307 # we have a branch308 branch = match[2]309 repository_url = re.sub(":" + branch, "", repository_url)310 self.branch = branch311 def fetch_pkg_file(self, filename, dest_path):312 """313 Fetch a package file and save it to the given destination path314 git is an SCM, you can download the test directly. No need to fetch315 a bz2'd tarball file. However 'filename' is <type>-<name>.tar.bz2316 break this up and only fetch <name>.317 :type filename: string318 :param filename: The filename of the package file to fetch.319 :type dest_path: string320 :param dest_path: Destination path to download the file to.321 """322 logging.info('Fetching %s from %s to %s', filename, self.url,323 dest_path)324 name, _ = self.pkgmgr.parse_tarball_name(filename)325 package_path = self.branch + " " + name326 try:327 cmd = self.git_archive_cmd_pattern % (self.url, dest_path, package_path)328 result = self.run_command(cmd)329 file_exists = self.run_command(330 'ls %s' % dest_path,331 _run_command_dargs={'ignore_status': True}).exit_status == 0332 if not file_exists:333 logging.error('git archive failed: %s', result)334 raise error.CmdError(cmd, result)335 logging.debug('Successfully fetched %s from %s', package_path,336 self.url)337 except error.CmdError:338 raise error.PackageFetchError('%s not found in %s' % (name,339 package_path))340 def install_pkg_post(self, filename, fetch_dir, install_dir,341 preserve_install_dir=False):342 os_dep.command("tar")343 filename, _ = self.pkgmgr.parse_tarball_name(filename)344 install_path = re.sub(filename, "", install_dir)345 for suffix in ['', '.tar', '.tar.bz2']:346 pkg_name = "%s%s" % (suffix, re.sub("/", "_", filename))347 fetch_path = os.path.join(fetch_dir, pkg_name)348 if os.path.exists(fetch_path):349 self.pkgmgr._run_command('tar -xf %s -C %s' % (fetch_path,350 install_path))351class LocalFilesystemFetcher(RepositoryFetcher):352 def fetch_pkg_file(self, filename, dest_path):353 logging.info('Fetching %s from %s to %s', filename, self.url,354 dest_path)355 local_path = os.path.join(self.url, filename)356 try:357 self.run_command('cp %s %s' % (local_path, dest_path))358 logging.debug('Successfully fetched %s from %s', filename,359 local_path)360 except error.CmdError, e:361 raise error.PackageFetchError(362 'Package %s could not be fetched from %s'363 % (filename, self.url), e)364class BasePackageManager(object):365 def __init__(self, pkgmgr_dir, hostname=None, repo_urls=None,366 upload_paths=None, do_locking=True, run_function=utils.run,367 run_function_args=[], run_function_dargs={}):368 '''369 Initializes a new BasePackageManager instance370 One of most used interfaces on this class is the _run_command(), which371 is controlled by the run_function parameter. It defaults to utils.run()372 but a custom method (if provided) should be of the same schema as373 utils.run. It should return a CmdResult object and throw a CmdError374 exception. The reason for using a separate function to run the commands375 is that the same code can be run to fetch a package on the local376 machine or on a remote machine (in which case ssh_host's run function377 is passed in for run_function).378 :type pkgmgr_dir: string379 :param pkgmgr_dir: A directory that can be used by the package manager380 to dump stuff (like checksum files of the repositories etc)381 :type hostname: string382 :param hostname: hostname from where to fetch a list of package repos383 :type repo_urls: list of strings384 :param repo_urls: The list of the repository urls which is consulted385 whilst fetching the package386 :type upload_paths: list of strings387 :param upload_paths: The list of the upload of repositories to which388 the package is uploaded to389 :type do_locking: boolean390 :param do_locking: Enable locking when the packages are installed.391 :type run_function: function392 :param run_function: function used to execute commands.393 :type run_function_args: tuple394 :param run_function_args: positional (tuple-like) arguments to395 run_function396 :param run_function_dargs: dictionary397 :param run_function_dargs: named (dictionary-like) arguments to398 run_function399 '''400 # In memory dictionary that stores the checksum's of packages401 self._checksum_dict = {}402 self.pkgmgr_dir = pkgmgr_dir403 self.do_locking = do_locking404 self.hostname = hostname405 self.repositories = []406 # Create an internal function that is a simple wrapper of407 # run_function and takes in the args and dargs as arguments408 def _run_command(command, _run_command_args=run_function_args,409 _run_command_dargs={}):410 '''411 Special internal function that takes in a command as412 argument and passes it on to run_function (if specified).413 The _run_command_dargs are merged into run_function_dargs414 with the former having more precedence than the latter.415 '''416 new_dargs = dict(run_function_dargs)417 new_dargs.update(_run_command_dargs)418 # avoid polluting logs with extremely verbose packaging output419 new_dargs.update({'stdout_tee': None})420 return run_function(command, *_run_command_args,421 **new_dargs)422 self._run_command = _run_command423 # Process the repository URLs424 if not repo_urls:425 repo_urls = []426 elif hostname:427 repo_urls = self.get_mirror_list(repo_urls)428 for url in repo_urls:429 self.add_repository(url)430 # Process the upload URLs431 if not upload_paths:432 self.upload_paths = []433 else:434 self.upload_paths = list(upload_paths)435 def add_repository(self, repo):436 if isinstance(repo, basestring):437 self.repositories.append(self.get_fetcher(repo))438 elif isinstance(repo, RepositoryFetcher):439 self.repositories.append(repo)440 else:441 raise TypeError("repo must be RepositoryFetcher or url string")442 def get_fetcher(self, url):443 if url.startswith('http://'):444 return HttpFetcher(self, url)445 elif url.startswith('git://'):446 return GitFetcher(self, url)447 else:448 return LocalFilesystemFetcher(self, url)449 def repo_check(self, repo):450 '''451 Check to make sure the repo is in a sane state:452 ensure we have at least XX amount of free space453 Make sure we can write to the repo454 '''455 if not repo.startswith('/') and not repo.startswith('ssh:'):456 return457 try:458 create_directory(repo)459 check_diskspace(repo)460 check_write(repo)461 except (error.RepoWriteError, error.RepoUnknownError,462 error.RepoDiskFullError), e:463 raise error.RepoError("ERROR: Repo %s: %s" % (repo, e))464 def upkeep(self, custom_repos=None):465 '''466 Clean up custom upload/download areas467 '''468 from autotest.server import subcommand469 if not custom_repos:470 # Not all package types necessarily require or allow custom repos471 try:472 custom_repos = settings.get_value('PACKAGES',473 'custom_upload_location').split(',')474 except SettingsError:475 custom_repos = []476 try:477 custom_download = settings.get_value('PACKAGES',478 'custom_download_location')479 custom_repos += [custom_download]480 except SettingsError:481 pass482 if not custom_repos:483 return484 subcommand.parallel_simple(trim_custom_directories, custom_repos,485 log=False)486 def install_pkg(self, name, pkg_type, fetch_dir, install_dir,487 preserve_install_dir=False, repo_url=None):488 '''489 Remove install_dir if it already exists and then recreate it unless490 preserve_install_dir is specified as True.491 Fetch the package into the pkg_dir. Untar the package into install_dir492 The assumption is that packages are of the form :493 <pkg_type>.<pkg_name>.tar.bz2494 name : name of the package495 type : type of the package496 fetch_dir : The directory into which the package tarball will be497 fetched to.498 install_dir : the directory where the package files will be untarred to499 repo_url : the url of the repository to fetch the package from.500 '''501 # do_locking flag is on by default unless you disable it (typically502 # in the cases where packages are directly installed from the server503 # onto the client in which case fcntl stuff wont work as the code504 # will run on the server in that case..505 if self.do_locking:506 lockfile_name = '.%s-%s-lock' % (re.sub("/", "_", name), pkg_type)507 lockfile = open(os.path.join(self.pkgmgr_dir, lockfile_name), 'w')508 try:509 if self.do_locking:510 fcntl.flock(lockfile, fcntl.LOCK_EX)511 self._run_command('mkdir -p %s' % fetch_dir)512 pkg_name = self.get_tarball_name(name, pkg_type)513 try:514 # Fetch the package into fetch_dir515 fetcher = self.fetch_pkg(pkg_name, fetch_dir, use_checksum=True,516 repo_url=repo_url, install=True)517 fetcher.install_pkg_post(pkg_name, fetch_dir, install_dir, preserve_install_dir)518 except error.PackageFetchError, why:519 raise error.PackageInstallError(520 'Installation of %s(type:%s) failed : %s'521 % (name, pkg_type, why))522 finally:523 if self.do_locking:524 fcntl.flock(lockfile, fcntl.LOCK_UN)525 lockfile.close()526 def fetch_pkg(self, pkg_name, dest_path, repo_url=None, use_checksum=False, install=False):527 '''528 Fetch the package into dest_dir from repo_url. By default repo_url529 is None and the package is looked in all the repositories specified.530 Otherwise it fetches it from the specific repo_url.531 pkg_name : name of the package (ex: test-sleeptest.tar.bz2,532 dep-gcc.tar.bz2, kernel.1-1.rpm)533 repo_url : the URL of the repository where the package is located.534 dest_path : complete path of where the package will be fetched to.535 use_checksum : This is set to False to fetch the packages.checksum file536 so that the checksum comparison is bypassed for the537 checksum file itself. This is used internally by the538 packaging system. It should be ignored by externals539 callers of this method who use it fetch custom packages.540 install : install path has unique name and destination requirements541 that vary based on the fetcher that is used. So call them542 here as opposed to install_pkg.543 '''544 try:545 self._run_command("ls %s" % os.path.dirname(dest_path))546 except (error.CmdError, error.AutoservRunError):547 raise error.PackageFetchError("Please provide a valid "548 "destination: %s " % dest_path)549 # See if the package was already fetched earlier, if so550 # the checksums need to be compared and the package is now551 # fetched only if they differ.552 pkg_exists = False553 try:554 self._run_command("ls %s" % dest_path)555 pkg_exists = True556 except (error.CmdError, error.AutoservRunError):557 pass558 # if a repository location is explicitly provided, fetch the package559 # from there and return560 if repo_url:561 repositories = [self.get_fetcher(repo_url)]562 elif self.repositories:563 repositories = self.repositories564 else:565 raise error.PackageFetchError("No repository urls specified")566 # install the package from the package repos, try the repos in567 # reverse order, assuming that the 'newest' repos are most desirable568 for fetcher in reversed(repositories):569 try:570 if isinstance(fetcher, GitFetcher):571 use_checksum = False572 # different fetchers have different install requirements573 dest = fetcher.install_pkg_setup(pkg_name, dest_path, install)[1]574 # Fetch the package if it is not there, the checksum does575 # not match, or checksums are disabled entirely576 need_to_fetch = (not use_checksum or577 not pkg_exists or578 not self.compare_checksum(dest, fetcher.url))579 if need_to_fetch:580 fetcher.fetch_pkg_file(pkg_name, dest)581 # update checksum so we won't refetch next time.582 if use_checksum:583 self.update_checksum(dest)584 return fetcher585 except (error.PackageFetchError, error.AutoservRunError):586 # The package could not be found in this repo, continue looking587 logging.debug('%s could not be fetched from %s', pkg_name,...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!