Best Python code snippet using autotest_python
base_packages.py
Source:base_packages.py
...434 self.upload_pkg_dir(pkg_path, upload_path)435 else:436 self.upload_pkg_file(pkg_path, upload_path)437 if update_checksum:438 self.upload_pkg_file(self._get_checksum_file_path(),439 upload_path)440 def upload_pkg_file(self, file_path, upload_path):441 '''442 Upload a single file. Depending on the upload path, the appropriate443 method for that protocol is called. Currently this simply copies the444 file to the target directory (but can be extended for other protocols)445 This assumes that the web server is running on the same machine where446 the method is being called from. The upload_path's files are447 basically served by that web server.448 '''449 try:450 if upload_path.startswith('ssh://'):451 # parse ssh://user@host/usr/local/autotest/packages452 hostline, remote_path = parse_ssh_path(upload_path)453 try:454 utils.run('scp %s %s:%s' % (file_path, hostline,455 remote_path))456 r_path = os.path.join(remote_path,457 os.path.basename(file_path))458 utils.run("ssh %s 'chmod 644 %s'" % (hostline, r_path))459 except error.CmdError:460 logging.error("Error uploading to repository %s",461 upload_path)462 else:463 shutil.copy(file_path, upload_path)464 os.chmod(os.path.join(upload_path,465 os.path.basename(file_path)), 0644)466 except (IOError, os.error), why:467 logging.error("Upload of %s to %s failed: %s", file_path,468 upload_path, why)469 def upload_pkg_dir(self, dir_path, upload_path):470 '''471 Upload a full directory. Depending on the upload path, the appropriate472 method for that protocol is called. Currently this copies the whole473 tmp package directory to the target directory.474 This assumes that the web server is running on the same machine where475 the method is being called from. 
The upload_path's files are476 basically served by that web server.477 '''478 local_path = os.path.join(dir_path, "*")479 try:480 if upload_path.startswith('ssh://'):481 hostline, remote_path = parse_ssh_path(upload_path)482 try:483 utils.run('scp %s %s:%s' % (local_path, hostline,484 remote_path))485 ssh_path = os.path.join(remote_path, "*")486 utils.run("ssh %s 'chmod 644 %s'" % (hostline, ssh_path))487 except error.CmdError:488 logging.error("Error uploading to repository: %s",489 upload_path)490 else:491 utils.run("cp %s %s " % (local_path, upload_path))492 up_path = os.path.join(upload_path, "*")493 utils.run("chmod 644 %s" % up_path)494 except (IOError, os.error), why:495 raise error.PackageUploadError("Upload of %s to %s failed: %s"496 % (dir_path, upload_path, why))497 def remove_pkg(self, pkg_name, remove_path=None, remove_checksum=False):498 '''499 Remove the package from the specified remove_path500 pkg_name : name of the package (ex: test-sleeptest.tar.bz2,501 dep-gcc.tar.bz2)502 remove_path : the location to remove the package from.503 '''504 if remove_path:505 remove_path_list = [remove_path]506 elif len(self.upload_paths) > 0:507 remove_path_list = self.upload_paths508 else:509 raise error.PackageRemoveError(510 "Invalid path to remove the pkg from")511 checksum_path = self._get_checksum_file_path()512 if remove_checksum:513 self.remove_checksum(pkg_name)514 # remove the package and upload the checksum file to the repos515 for path in remove_path_list:516 self.remove_pkg_file(pkg_name, path)517 self.upload_pkg_file(checksum_path, path)518 def remove_pkg_file(self, filename, pkg_dir):519 '''520 Remove the file named filename from pkg_dir521 '''522 try:523 # Remove the file524 if pkg_dir.startswith('ssh://'):525 hostline, remote_path = parse_ssh_path(pkg_dir)526 path = os.path.join(remote_path, filename)527 utils.run("ssh %s 'rm -rf %s/%s'" % (hostline, remote_path,528 path))529 else:530 os.remove(os.path.join(pkg_dir, filename))531 except (IOError, 
os.error), why:532 raise error.PackageRemoveError("Could not remove %s from %s: %s "533 % (filename, pkg_dir, why))534 def get_mirror_list(self, repo_urls):535 '''536 Stub function for site specific mirrors.537 Returns:538 Priority ordered list539 '''540 return repo_urls541 def _get_checksum_file_path(self):542 '''543 Return the complete path of the checksum file (assumed to be stored544 in self.pkgmgr_dir545 '''546 return os.path.join(self.pkgmgr_dir, CHECKSUM_FILE)547 def _get_checksum_dict(self):548 '''549 Fetch the checksum file if not already fetched. If the checksum file550 cannot be fetched from the repos then a new file is created with551 the current package's (specified in pkg_path) checksum value in it.552 Populate the local checksum dictionary with the values read from553 the checksum file.554 The checksum file is assumed to be present in self.pkgmgr_dir555 '''556 checksum_path = self._get_checksum_file_path()557 if not self._checksum_dict:558 # Fetch the checksum file559 try:560 try:561 self._run_command("ls %s" % checksum_path)562 except (error.CmdError, error.AutoservRunError):563 # The packages checksum file does not exist locally.564 # See if it is present in the repositories.565 self.fetch_pkg(CHECKSUM_FILE, checksum_path)566 except error.PackageFetchError:567 # This should not happen whilst fetching a package..if a568 # package is present in the repository, the corresponding569 # checksum file should also be automatically present. 
This570 # case happens only when a package571 # is being uploaded and if it is the first package to be572 # uploaded to the repos (hence no checksum file created yet)573 # Return an empty dictionary in that case574 return {}575 # Read the checksum file into memory576 checksum_file_contents = self._run_command('cat '577 + checksum_path).stdout578 # Return {} if we have an empty checksum file present579 if not checksum_file_contents.strip():580 return {}581 # Parse the checksum file contents into self._checksum_dict582 for line in checksum_file_contents.splitlines():583 checksum, package_name = line.split(None, 1)584 self._checksum_dict[package_name] = checksum585 return self._checksum_dict586 def _save_checksum_dict(self, checksum_dict):587 '''588 Save the checksum dictionary onto the checksum file. Update the589 local _checksum_dict variable with this new set of values.590 checksum_dict : New checksum dictionary591 checksum_dir : The directory in which to store the checksum file to.592 '''593 checksum_path = self._get_checksum_file_path()594 self._checksum_dict = checksum_dict.copy()595 checksum_contents = '\n'.join(checksum + ' ' + pkg_name596 for pkg_name, checksum in597 checksum_dict.iteritems())598 # Write the checksum file back to disk599 self._run_command('echo "%s" > %s' % (checksum_contents,600 checksum_path),601 _run_command_dargs={'verbose': False})602 def compute_checksum(self, pkg_path):603 '''604 Compute the MD5 checksum for the package file and return it.605 pkg_path : The complete path for the package file606 '''607 md5sum_output = self._run_command("md5sum %s " % pkg_path).stdout...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!