Best Python code snippet using avocado_python
packages.py
Source:packages.py
1#2# Copyright (c) 1999--2017 Red Hat, Inc.3#4# This software is licensed to you under the GNU General Public License,5# version 2 (GPLv2). There is NO WARRANTY for this software, express or6# implied, including the implied warranties of MERCHANTABILITY or FITNESS7# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv28# along with this software; if not, see9# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.10#11# Red Hat trademarks are not licensed under GPLv2. No permission is12# granted to use or replicate Red Hat trademarks that are incorporated13# in this software or its documentation.14#15import os16import sys17import time18import yum19import yum.Errors20from yum.plugins import PluginYumExit21sys.path.append("/usr/share/yum-cli")22import callback23from up2date_client import up2dateLog24from up2date_client import config25from up2date_client import rpmUtils26from up2date_client import rhnPackageInfo27from rpm import RPMPROB_FILTER_OLDPACKAGE28from rpm import labelCompare29log = up2dateLog.initLog()30YUM_PID_FILE = '/var/run/yum.pid'31# file used to keep track of the next time rhn_check32# is allowed to update the package list on the server33LAST_UPDATE_FILE="/var/lib/up2date/dbtimestamp"34# mark this module as acceptable35__rhnexport__ = [36 'update',37 'remove',38 'refresh_list',39 'fullUpdate',40 'checkNeedUpdate',41 'runTransaction',42 'verify'43]44class YumAction(yum.YumBase):45 def __init__(self):46 yum.YumBase.__init__(self)47 self.cfg = config.initUp2dateConfig()48 self.doConfigSetup(debuglevel=self.cfg["debug"])49 self.cache_only = None50 self.doLock()51 self.doTsSetup()52 self.doRpmDBSetup()53 self.doRepoSetup()54 self.doSackSetup()55 # Copied from yum/cli.py, more or less56 def doTransaction(self):57 """takes care of package downloading, checking, user confirmation and actually58 RUNNING the transaction"""59 #allow downgrades to support rollbacks60 self.tsInfo.probFilterFlags.append(RPMPROB_FILTER_OLDPACKAGE)61 # Check which packages have to be downloaded62 downloadpkgs = []63 do_erase = False;64 for txmbr in self.tsInfo.getMembers():65 if txmbr.ts_state in ['i', 'u']:66 po = txmbr.po67 if po:68 downloadpkgs.append(po)69 elif txmbr.ts_state in ['e']:70 do_erase = True;71 log.log_debug('Downloading Packages:')72 problems = self.downloadPkgs(downloadpkgs)73 if len(problems.keys()) > 0:74 errstring = ''75 errstring += 'Error Downloading Packages:\n'76 for key in problems.keys():77 errors = yum.misc.unique(problems[key])78 for error in errors:79 errstring += ' %s: %s\n' % (key, error)80 raise yum.Errors.YumBaseError, errstring81 if self.cfg['retrieveOnly']:82 # We are configured to only download packages (rather than install/update)83 #84 # If all we were asked to do was install/update, skip rest of transaction85 # and return now.86 #87 # If all we were asked to do was remove packages, process the transaction.88 #89 # If we were asked to do a mixture of install/erase - then we can't do a90 # complete transaction, and we have to FAIL, rather than execute or show a91 # success.92 #93 if downloadpkgs and do_erase: # FAIL94 err = 'Mix of install/erase and "retrieveOnly" set - transaction FAILS'95 log.log_debug(err)96 raise yumErrors.YumBaseError, err97 elif downloadpkgs and not do_erase: # download and exit98 log.log_debug('Configured to "retrieveOnly" so skipping package install')99 return 0100 else: # erase-only, continue101 log.log_debug('Configured to "retrieveOnly" but only erase-commands - continuing')102 if self.cache_only:103 log.log_debug('Just pre-caching packages, skipping package install')104 return 0105 # Check GPG signatures106 if self.gpgsigcheck(downloadpkgs) != 0:107 return 1108 log.log_debug('Running Transaction Test')109 tsConf = {}110 for feature in ['diskspacecheck']: # more to come, I'm sure111 tsConf[feature] = getattr(self.conf, feature)112 # clean out the ts b/c we have to give it new paths to the rpms113 del self.ts114 self.initActionTs()115 # save our dsCallback out116 testcb = callback.RPMInstallCallback(output=0)117 testcb.tsInfo = self.tsInfo118 dscb = self.dsCallback119 self.dsCallback = None # dumb, dumb dumb dumb!120 self.populateTs(keepold=0) # sigh121 tserrors = self.ts.test(testcb, conf=tsConf)122 log.log_debug('Finished Transaction Test')123 if len(tserrors) > 0:124 errstring = 'Transaction Check Error: '125 for descr in tserrors:126 errstring += ' %s\n' % descr127 raise yum.Errors.YumBaseError, errstring128 log.log_debug('Transaction Test Succeeded')129 del self.ts130 self.initActionTs() # make a new, blank ts to populate131 self.populateTs(keepold=0) # populate the ts132 self.ts.check() #required for ordering133 self.ts.order() # order134 # put back our depcheck callback135 self.dsCallback = dscb136 log.log_debug('Running Transaction')137 self.runTransaction(testcb)138 # close things139 return 0140 # Also taken from yum/cli.py141 def gpgsigcheck(self, pkgs):142 '''Perform GPG signature verification on the given packages, installing143 keys if possible144 Returns non-zero if execution should stop (user abort).145 Will raise YumBaseError if there's a problem146 '''147 for po in pkgs:148 result, errmsg = self.sigCheckPkg(po)149 if result == 0:150 # Verified ok, or verify not req'd151 continue152 elif result == 1:153 # bz 433781154 # If the package is a Red Hat pkg, try to install the key and see if it helps155 if self.isRepoUsingRedHatGPG(po):156 log.log_debug("GPG check wasn't successful, will attempt to import key")157 self.getKeyForPackage(po, askcb=lambda x,y,z: True)158 log.log_debug("GPG key import was good.")159 # if we got here, the key worked, otherwise an exception is thrown160 else:161 raise yum.Errors.YumBaseError, \162 'Refusing to automatically import keys when running ' \163 'unattended.'164 else:165 # Fatal error166 raise yum.Errors.YumBaseError, errmsg167 return 0168 def isRepoUsingRedHatGPG(self, po):169 goodValues = ["file:///etc/pki/rpm-gpg/RPM-GPG-KEY-redhat-release"]170 repo = self.repos.getRepo(po.repoid)171 keyurls = repo.gpgkey172 if len(keyurls) < 1:173 return False174 for keyurl in keyurls:175 if keyurl not in goodValues:176 log.log_debug(177 "keyurl = %s, isn't a known Red Hat key, so this " \178 "will not be imported. Manually import this key " \179 "or set gpgcheck=0 in the RHN yum plugin configuration file"180 % (keyurl))181 return False182 return True183 def getInstalledPkgObject(self, package_tup):184 installed = self.rpmdb.returnPackages()185 log.log_debug("Searching for installed package to remove: %s"186 % str(package_tup))187 exactmatch, matched, unmatched = yum.packages.parsePackages(188 installed, (package_tup), casematch=1)189 erases = yum.misc.unique(matched + exactmatch)190 if len(erases) >= 1:191 log.log_debug("Found %d package(s) to remove" % len(erases))192 return erases193 else:194 # TODO: we should just fail, I think195 log.log_debug("Couldn't find packages to remove")196 return ()197 def add_transaction_data(self, transaction_data):198 """ Add packages to transaction.199 transaction_data is in format:200 { 'packages' : [201 [['name', '1.0.0', '1', '', ''], 'e'], ...202 # name, versio, rel., epoch, arch, flag203 ]}204 where flag can be:205 i - install206 u - update207 e - remove208 r - rollback209 Note: install and update will check for dependecies and210 obsoletes and will install them if needed.211 Rollback do not check anything and will assume that state212 to which we are rolling back should be correct.213 """214 for pkgtup, action in transaction_data['packages']:215 pkgkeys = {216 'name' : pkgtup[0],217 'epoch' : pkgtup[3],218 'version' : pkgtup[1],219 'release' : pkgtup[2],220 }221 if len(pkgtup) > 4:222 pkgkeys['arch'] = pkgtup[4]223 else:224 pkgtup.append('')225 pkgkeys['arch'] = None226 if action == 'u':227 self.update(**pkgkeys)228 elif action == 'i':229 self.install(**pkgkeys)230 elif action == 'r':231 # we are doing rollback, we want exact version232 # no dependecy check233 pkgs = self.pkgSack.searchNevra(name=pkgkeys['name'],234 epoch=pkgkeys['epoch'], arch=pkgkeys['arch'],235 ver=pkgkeys['version'], rel=pkgkeys['release'])236 if not pkgs:237 raise yum.Errors.YumBaseError, \238 "Cannot find package %s:%s-%s-%s.%s in any of enabled repositories." \239 % (pkgkeys['epoch'], pkgkeys['name'], pkgkeys['version'],240 pkgkeys['release'], pkgkeys['arch'])241 for po in pkgs:242 self.tsInfo.addInstall(po)243 elif action == 'e':244 package_tup = _yum_package_tup(pkgtup)245 packages = self.getInstalledPkgObject(package_tup)246 for package in packages:247 self.remove(package)248 else:249 assert False, "Unknown package transaction action."250# global module level reference to YumAction251yum_base = YumAction()252def _yum_package_tup(package_tup):253 """ Create a yum-style package tuple from an rhn package tuple.254 Allowed styles: n, n.a, n-v-r, n-e:v-r.a, n-v, n-v-r.a,255 e:n-v-r.a256 Choose from the above styles to be compatible with yum.parsePackage257 """258 n, v, r, e, a = package_tup[:]259 if not e:260 # set epoch to 0 as yum expects261 e = '0'262 if not a:263 pkginfo = '%s-%s-%s' % (n, v, r)264 else:265 pkginfo = '%s-%s:%s-%s.%s' % (n, e, v, r, a)266 return (pkginfo,)267def remove(package_list, cache_only=None):268 """We have been told that we should remove packages"""269 if cache_only:270 return (0, "no-ops for caching", {})271 if type(package_list) != type([]):272 return (13, "Invalid arguments passed to function", {})273 log.log_debug("Called remove_packages", package_list)274 transaction_data = __make_transaction(package_list, 'e')275 return _runTransaction(transaction_data)276def update(package_list, cache_only=None):277 """We have been told that we should retrieve/install packages"""278 if type(package_list) != type([]):279 return (13, "Invalid arguments passed to function", {})280 log.log_debug("Called update", package_list)281 err = None282 errmsgs = []283 # Remove already installed packages from the list284 for package in package_list[:]:285 pkgkeys = {286 'name' : package[0],287 'epoch' : package[3],288 'version' : package[1],289 'release' : package[2],290 }291 if len(package) == 5 \292 and package[1] == '' \293 and package[2] == '' \294 and package[3] == '' \295 and package[4] == '' \296 and yum_base.rpmdb.searchNevra(name=package[0]):297 log.log_debug('Package %s is already installed' % package[0])298 package_list.remove(package)299 continue300 if len(package) > 4:301 pkgkeys['arch'] = package[4]302 else:303 package.append('')304 pkgkeys['arch'] = None305 if pkgkeys['epoch'] == '':306 pkgkeys['epoch'] = None307 pkgs = yum_base.rpmdb.searchNevra(name=pkgkeys['name'], arch=pkgkeys['arch'])308 evr = yum.packages.PackageEVR(pkgkeys['epoch'], pkgkeys['version'], pkgkeys['release'])309 found = False310 for pkg in pkgs:311 current = pkg.returnEVR()312 currentEVR = (current.epoch, current.version, current.release)313 candidateEVR = (evr.epoch, evr.version, evr.release)314 compare = labelCompare(currentEVR, candidateEVR)315 if compare == 0:316 log.log_debug('Package %s already installed' \317 % _yum_package_tup(package))318 package_list.remove(package)319 found = True320 break321 elif compare > 0:322 log.log_debug('More recent version of package %s is already installed' \323 % _yum_package_tup(package))324 package_list.remove(package)325 found = True326 break327 if not found:328 available = yum_base.pkgSack.searchNevra(name=pkgkeys['name'], arch=pkgkeys['arch'],329 epoch=pkgkeys['epoch'], ver=pkgkeys['version'], rel=pkgkeys['release'])330 if not available:331 err = 'Package %s is not available for installation' \332 % _yum_package_tup(package)333 log.log_me('E: ', err )334 package_list.remove(package)335 errmsgs.append(err)336 # Don't proceed further with empty list,337 # since this would result into an empty yum transaction338 if not package_list:339 if err:340 ret = (32, "Failed: Packages failed to install properly:\n" + '\n'.join(errmsgs),341 {'version': '1', 'name': "package_install_failure"})342 else:343 ret = (0, "Requested packages already installed", {})344 return ret345 transaction_data = __make_transaction(package_list, 'i')346 return _runTransaction(transaction_data, cache_only)347def __make_transaction(package_list, action):348 """349 Build transaction Data like _runTransaction would expect.350 This is a list of ((n,v,r,e,a), m) where m is either e, i, or u351 """352 transaction_data = {}353 transaction_data['packages'] = []354 #We don't care about this stuff.355 transaction_data['flags'] = []356 transaction_data['vsflags'] = []357 transaction_data['probFilterFlags'] = []358 for package in package_list:359 transaction_data['packages'].append((package, action))360 return transaction_data361class RunTransactionCommand:362 def __init__(self, transaction_data):363 self.transaction_data = transaction_data364 def execute(self, yum_base):365 yum_base.add_transaction_data(self.transaction_data)366def _runTransaction(transaction_data, cache_only=None):367 """ Run a tranaction on a group of packages. """368 command = RunTransactionCommand(transaction_data)369 return _run_yum_action(command, cache_only)370def runTransaction(transaction_data, cache_only=None):371 """ Run a transaction on a group of packages.372 This was historicaly meant as generic call, but373 is only called for rollback.374 Therefore we change all actions "i" (install) to375 "r" (rollback) where we will not check dependencies and obsoletes.376 """377 if cache_only:378 return (0, "no-ops for caching", {})379 for package_object in transaction_data['packages'][:]:380 [package, action] = package_object381 pkgkeys = {382 'name' : package[0],383 'version' : package[1],384 'release' : package[2],385 'epoch' : package[3],386 }387 if len(package) > 4:388 pkgkeys['arch'] = package[4]389 else:390 pkgkeys['arch'] = None391 if pkgkeys['arch'] == '':392 pkgkeys['arch'] = None393 if pkgkeys['epoch'] == '':394 pkgkeys['epoch'] = None395 package_exists = yum_base.rpmdb.searchNevra(name=pkgkeys['name'],396 epoch=pkgkeys['epoch'], arch=pkgkeys['arch'],397 ver=pkgkeys['version'], rel=pkgkeys['release'])398 # if we're deleting a package that no longer exists, noop399 if action == 'e' and not package_exists:400 transaction_data['packages'].remove(package_object)401 # if we're installing a package that is already installed, noop402 elif action == 'i' and package_exists:403 transaction_data['packages'].remove(package_object)404 # Don't proceed further with empty package list,405 # since this would result in an empty yum transaction406 if not transaction_data['packages']:407 return (0, "Requested package actions have already been performed.", {})408 for index, data in enumerate(transaction_data['packages']):409 if data[1] == 'i':410 transaction_data['packages'][index][1] = 'r'411 return _runTransaction(transaction_data)412class FullUpdateCommand:413 def execute(self, yum_base):414 yum_base.update()415def fullUpdate(force=0, cache_only=None):416 """ Update all packages on the system. """417 #TODO: force doesn't mean anything for yum.418 command = FullUpdateCommand()419 return _run_yum_action(command, cache_only)420def _run_yum_action(command, cache_only=None):421 """422 Do something with yum.423 command is an object with an 'execute' method taking yum_base,424 so we can apply different operations to yum_base.425 """426 # TODO: Note to future programmers:427 # When this is running on python 2.5,428 # use the unified try/except/finally429 try:430 try:431 yum_base.doLock(YUM_PID_FILE)432 # Accumulate transaction data433 oldcount = len(yum_base.tsInfo)434 command.execute(yum_base)435 if not len(yum_base.tsInfo) > oldcount:436 raise yum.Errors.YumBaseError, 'empty transaction'437 # depSolving stage438 (result, resultmsgs) = yum_base.buildTransaction()439 if result == 1:440 # Fatal Error441 for msg in resultmsgs:442 log.log_debug('Error: %s' % msg)443 raise yum.Errors.DepError, resultmsgs444 elif result == 0 or result == 2:445 # Continue on446 pass447 else:448 # Unknown Error449 for msg in resultmsgs:450 log.log_debug('Error: %s' % msg)451 raise yum.Errors.YumBaseError, resultmsgs452 log.log_debug("Dependencies Resolved")453 yum_base.cache_only=cache_only454 yum_base.doTransaction()455 finally:456 yum_base.closeRpmDB()457 yum_base.doUnlock(YUM_PID_FILE)458 except (yum.Errors.InstallError, yum.Errors.UpdateError), e:459 data = {}460 data['version'] = "1"461 data['name'] = "package_install_failure"462 return (32, u"Failed: Packages failed to install "\463 "properly: %s" % unicode(e), data)464 except yum.Errors.RemoveError, e:465 data = {}466 data['version'] = 0467 data['name'] = "rpmremoveerrors"468 return (15, u"%s" % unicode(e), data)469 except yum.Errors.DepError, e:470 data = {}471 data["version"] = "1"472 data["name"] = "failed_deps"473 return (18, u"Failed: packages requested raised "\474 "dependency problems: %s" % unicode(e), data)475 except (yum.Errors.YumBaseError, PluginYumExit), e:476 status = 6,477 message = u"Error while executing packages action: %s" % unicode(e)478 data = {}479 return (status, message, data)480 return (0, "Update Succeeded", {})481# The following functions are the same as the old up2date ones.482def checkNeedUpdate(rhnsd=None, cache_only=None):483 """ Check if the locally installed package list changed, if484 needed the list is updated on the server485 In case of error avoid pushing data to stay safe486 """487 if cache_only:488 return (0, "no-ops for caching", {})489 data = {}490 dbpath = "/var/lib/rpm"491 cfg = config.initUp2dateConfig()492 if cfg['dbpath']:493 dbpath = cfg['dbpath']494 RPM_PACKAGE_FILE="%s/Packages" % dbpath495 try:496 dbtime = os.stat(RPM_PACKAGE_FILE)[8] # 8 is st_mtime497 except:498 return (0, "unable to stat the rpm database", data)499 try:500 last = os.stat(LAST_UPDATE_FILE)[8]501 except:502 last = 0;503 # Never update the package list more than once every 1/2 hour504 if last >= (dbtime - 10):505 return (0, "rpm database not modified since last update (or package "506 "list recently updated)", data)507 if last == 0:508 try:509 file = open(LAST_UPDATE_FILE, "w+")510 file.close()511 except:512 return (0, "unable to open the timestamp file", data)513 # call the refresh_list action with a argument so we know it's514 # from rhnsd515 return refresh_list(rhnsd=1)516def refresh_list(rhnsd=None, cache_only=None):517 """ push again the list of rpm packages to the server """518 if cache_only:519 return (0, "no-ops for caching", {})520 log.log_debug("Called refresh_rpmlist")521 ret = None522 try:523 rhnPackageInfo.updatePackageProfile()524 except:525 print "ERROR: refreshing remote package list for System Profile"526 return (20, "Error refreshing package list", {})527 touch_time_stamp()528 return (0, "rpmlist refreshed", {})529def touch_time_stamp():530 try:531 file_d = open(LAST_UPDATE_FILE, "w+")532 file_d.close()533 except:534 return (0, "unable to open the timestamp file", {})535 # Never update the package list more than once every hour.536 t = time.time()537 try:538 os.utime(LAST_UPDATE_FILE, (t, t))539 except:540 return (0, "unable to set the time stamp on the time stamp file %s"541 % LAST_UPDATE_FILE, {})542def verify(packages, cache_only=None):543 log.log_debug("Called packages.verify")544 if cache_only:545 return (0, "no-ops for caching", {})546 data = {}547 data['name'] = "packages.verify"548 data['version'] = 0549 ret, missing_packages = rpmUtils.verifyPackages(packages)550 data['verify_info'] = ret551 if len(missing_packages):552 data['name'] = "packages.verify.missing_packages"553 data['version'] = 0554 data['missing_packages'] = missing_packages555 return(43, "packages requested to be verified are missing", data)...
yyoom
Source:yyoom
1#!/usr/bin/python2# This program is free software; you can redistribute it and/or modify3# it under the terms of the GNU General Public License as published by4# the Free Software Foundation; either version 2 of the License, or5# (at your option) any later version.6#7# This program is distributed in the hope that it will be useful,8# but WITHOUT ANY WARRANTY; without even the implied warranty of9# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the10# GNU Library General Public License for more details.11#12# You should have received a copy of the GNU General Public License13# along with this program; if not, write to the Free Software14# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.15# Copyright 2005 Duke University16# Parts Copyright 2007 Red Hat, Inc17"""YYOOM: a package management utility18Using Yum API instead of /usr/bin/yum provides several interesting19capabilities, some of which we are desperate to use, including:20- installing and removing packages in same transaction;21- JSON output.22"""23import argparse24import json25import logging26import os27import sys28import yum29import functools30import collections31import pkg_resources32from contextlib import contextmanager33LOG = logging.getLogger('yyoom')34OUTPUT = None35ACTION_TYPE_MAP = {36 yum.constants.TS_INSTALL: 'install',37 yum.constants.TS_TRUEINSTALL: 'install',38 yum.constants.TS_UPDATE: 'upgrade',39 yum.constants.TS_OBSOLETING: 'upgrade',40 yum.constants.TS_ERASE: 'erase',41 yum.constants.TS_OBSOLETED: 'erase',42 yum.constants.TS_UPDATED: 'erase',43 yum.constants.TS_FAILED: 'error'44}45def _extended_yum_raises(method):46 """Decorator to extend error messages when manipulating packages with yum.47 """48 @functools.wraps(method)49 def wrapper(self, *args, **kwargs):50 try:51 return method(self, *args, **kwargs)52 except yum.Errors.YumBaseError as e:53 data = dict(54 method_name=method.__name__,55 args=[str(arg) for arg in args],56 kwargs='\n'.join(' %s: %s' % item57 for item in sorted(kwargs.items())))58 details = yum.i18n._("\nDetails:\n"59 " method name: %(method_name)s\n"60 " arguments: %(args)s\n"61 " keyword arguments:\n%(kwargs)s")62 e.value += details % data63 raise64 return wrapper65class _YyoomBase(yum.YumBase):66 def __init__(self, *args, **kwargs):67 """Reintroduced init to preset some settings.68 """69 super(_YyoomBase, self).__init__(*args, **kwargs)70 self.setCacheDir(force=True)71 def _askForGPGKeyImport(self, po, userid, hexkeyid):72 """Tell yum to import GPG keys if needed.73 Fixes: https://bugs.launchpad.net/anvil/+bug/121065774 Fixes: https://bugs.launchpad.net/anvil/+bug/121872875 """76 return True77 @_extended_yum_raises78 def install(self, po=None, **kwargs):79 return super(_YyoomBase, self).install(po, **kwargs)80 @_extended_yum_raises81 def remove(self, po=None, **kwargs):82 return super(_YyoomBase, self).remove(po, **kwargs)83def _setup_output():84 """Do some nasty manipulations with fds85 Yum internals may sometimes write to stdout, just out of a sudden.86 To prevent this output form interfering with our JSON, we save87 current stdout to other fd via os.dup, and replace fd 1 with88 /dev/null opened for writing.89 """90 global OUTPUT91 # save current stdout for later use92 OUTPUT = os.fdopen(os.dup(sys.stdout.fileno()), 'wb')93 # close the stream94 sys.stdout.close()95 # open /dev/null -- all writes to stdout from now on will go there96 devnull_fd = os.open(os.devnull, os.O_WRONLY)97 if devnull_fd != 1:98 os.dup2(devnull_fd, 1)99 os.close(devnull_fd)100 sys.stdout = os.fdopen(1, 'w')101def _write_output(data):102 """Dump given object as pretty json"""103 OUTPUT.write(json.dumps(data, indent=4,104 separators=(',', ': '),105 sort_keys=True) + '\n')106def _action_type_from_code(action):107 """Return value according to action code in dictionary mapping Yum states.108 Yum has a mapping that sometimes really isn't that accurate enough109 for our needs, so make a mapping that will suit our needs instead.110 """111 return ACTION_TYPE_MAP.get(action, 'other')112def _package_info(pkg, **kwargs):113 if isinstance(pkg, basestring):114 result = dict(name=pkg, **kwargs)115 else:116 result = dict(117 name=pkg.name,118 epoch=pkg.epoch,119 version=pkg.version,120 release=pkg.release,121 provides=pkg.provides,122 repo=str(pkg.repo),123 arch=pkg.arch,124 **kwargs125 )126 return result127class _RPMCallback(yum.rpmtrans.RPMBaseCallback):128 """Listen to events from RPM transactions"""129 def event(self, package, action, te_current, te_total,130 ts_current, ts_total):131 pass132 def scriptout(self, package, msg):133 if not msg or not LOG.isEnabledFor(logging.INFO):134 return135 for line in msg.splitlines():136 line = line.strip()137 if line:138 LOG.info("%s: %s", package, line)139 def errorlog(self, msg):140 LOG.error("%s", msg)141 def filelog(self, package, action):142 if not LOG.isEnabledFor(logging.INFO):143 return144 LOG.info("Performed %(action_type)s (code %(action)s) on %(package)s",145 dict(package=package,146 action=action,147 action_type=_action_type_from_code(action)))148class _OutputtingRPMCallback(_RPMCallback):149 def __init__(self, skip_missing=False):150 _RPMCallback.__init__(self)151 self._skip_missing = skip_missing152 self._missing = []153 def yyoom_post_transaction(self, base, _code):154 output = []155 for txmbr in base.tsInfo:156 action_type = _action_type_from_code(txmbr.output_state)157 info = _package_info(txmbr.po,158 action_code=txmbr.output_state,159 action_type=action_type)160 output.append(info)161 _write_output(output + self._missing)162 def yyoom_on_missing_package(self, pkg_req):163 if not self._skip_missing:164 raise yum.Errors.InstallError("The '%s' package not found." % pkg_req)165 req = pkg_resources.Requirement.parse(pkg_req)166 self._missing.append(_package_info(req.unsafe_name,167 action_type="missing",168 requirement=pkg_req,169 action=None))170def log_list(items, title=''):171 if not items:172 return173 if title:174 if not title.endswith(':'):175 title = str(title) + ":"176 LOG.info(title)177 for i in items:178 LOG.info(" - %s" % (i))179def build_yum_map(base):180 rpms = base.doPackageLists(ignore_case=True,181 showdups=True)182 all_rpms = []183 for name in ('available', 'installed', 'extras', 'reinstall_available'):184 all_rpms.extend(getattr(rpms, name, []))185 yum_map = collections.defaultdict(list)186 for rpm in all_rpms:187 for provides in rpm.provides:188 yum_map[provides[0]].append((rpm.version, rpm))189 return dict(yum_map)190def _find_packages(yum_map, pkg_req):191 """Find suitable packages in YUM packages map"""192 req = pkg_resources.Requirement.parse(pkg_req)193 matches = [rpm194 for (version, rpm) in yum_map.get(req.unsafe_name, [])195 if version in req]196 if matches:197 return matches198def _run(yum_base, options):199 """Handler of `transaction` command200 Installs and erases packages, prints what was done in JSON201 """202 log_list(options.erase, title='Erasing packages:')203 log_list(options.install, title='Installing packages:')204 with _transaction(yum_base,205 _OutputtingRPMCallback(options.skip_missing)) as cb:206 yum_map = build_yum_map(yum_base)207 # erase packages208 for pkg_name in options.erase or ():209 matches = _find_packages(yum_map, pkg_name)210 if matches is None:211 cb.yyoom_on_missing_package(pkg_name)212 else:213 installed_packages = yum_base.rpmdb.returnPackages()214 for package in matches:215 if package in installed_packages:216 yum_base.remove(package)217 # install packages218 for pkg_name in options.install or ():219 matches = _find_packages(yum_map, pkg_name)220 if matches is None:221 cb.yyoom_on_missing_package(pkg_name)222 else:223 # try to install package from preferred repositories,224 # if not found - install from default ones225 repo_matches = [m for m in matches226 if m.repoid in options.prefer_repo]227 matches = repo_matches if repo_matches else matches228 yum_base.install(max(matches))229def _list(yum_base, options):230 """Handler of `list` command"""231 pkgnarrow = options.what[0] if len(options.what) == 1 else 'all'232 lists = yum_base.doPackageLists(pkgnarrow=pkgnarrow, showdups=True)233 LOG.debug("Got packages for '%s': %s installed, %s available,"234 "%s available for reinstall, %s extras",235 pkgnarrow, len(lists.installed), len(lists.available),236 len(lists.reinstall_available), len(lists.extras))237 result = []238 if 'installed' in options.what:239 result.extend(_package_info(pkg, status='installed')240 for pkg in lists.installed)241 if 'available' in options.what:242 result.extend(_package_info(pkg, status='available')243 for pkg in lists.available)244 result.extend(_package_info(pkg, status='available')245 for pkg in lists.reinstall_available)246 if 'extras' in options.what:247 result.extend(_package_info(pkg, status='installed')248 for pkg in lists.extras)249 _write_output(result)250def _cleanall(yum_base, options):251 """Handler of `cleanall` command"""252 LOG.info("Running yum cleanup")253 code = sum((254 _run_yum_api('packages clean up', yum_base.cleanPackages),255 _run_yum_api('headers clean up', yum_base.cleanHeaders),256 _run_yum_api('metadata clean up', yum_base.cleanMetadata),257 _run_yum_api('sqlite clean up', yum_base.cleanSqlite),258 _run_yum_api('rpm db clean up', yum_base.cleanRpmDB),259 ))260 return code261def _builddep(yum_base, options):262 """Handler of `builddep` command263 Installs build dependencies for given package, prints what was done264 in JSON.265 """266 LOG.info("Installing build dependencies for package %s", options.srpm)267 srpm = yum.packages.YumLocalPackage(yum_base.ts, options.srpm)268 with _transaction(yum_base, _OutputtingRPMCallback()):269 for req in srpm.requiresList():270 LOG.debug('Processing dependency: %s', req)271 if not (272 req.startswith('rpmlib(') or273 yum_base.returnInstalledPackagesByDep(req)274 ):275 pkg = yum_base.returnPackageByDep(req)276 LOG.debug('Installing %s', pkg)277 yum_base.install(pkg)278def _parse_arguments(args):279 parser = argparse.ArgumentParser(prog=args[0])280 parser.add_argument('--verbose', '-v', action='store_true',281 help='verbose operation')282 # TODO(imelnikov): --format283 subparsers = parser.add_subparsers(title='subcommands')284 # Arg: list285 parser_list = subparsers.add_parser('list', help='list packages')286 parser_list.add_argument('what', nargs='+',287 choices=('installed', 'available', 'extras'),288 help='what packages to list')289 parser_list.set_defaults(func=_list, operation='List')290 # Arg: transaction291 parser_run = subparsers.add_parser('transaction',292 help='install or remove packages')293 parser_run.add_argument('--install', '-i', action='append',294 metavar='package',295 help='install package')296 parser_run.add_argument('--erase', '-e', action='append',297 metavar='package',298 help='erase package')299 parser_run.add_argument('--skip-missing', action='store_true',300 default=False,301 help='do not fail on missing packages')302 parser_run.add_argument('--prefer-repo', '-r', action='append',303 metavar='repository',304 default=[],305 help='preferred repository name')306 parser_run.set_defaults(func=_run, operation='Transaction')307 # Arg: srpm308 parser_builddep = subparsers.add_parser(309 'builddep', help='install build dependencies of srpm')310 parser_builddep.add_argument('srpm', help='path to source RPM package')311 parser_builddep.set_defaults(func=_builddep, operation='Builddep')312 # Arg: cleanall313 parser_cleanall = subparsers.add_parser('cleanall', help='clean all')314 parser_cleanall.set_defaults(func=_cleanall, operation='Cleanall')315 return parser.parse_args(args[1:])316def _setup_logging(verbose=True):317 """Initialize logging"""318 # setup logging -- put messages to stderr319 handler = logging.StreamHandler(sys.stderr)320 handler.setFormatter(logging.Formatter('YYOOM %(levelname)s: %(message)s'))321 root_logger = logging.getLogger()322 root_logger.addHandler(handler)323 root_logger.setLevel(logging.DEBUG if verbose else logging.INFO)324def _run_yum_api(name, func, ok_codes=(0,), *args, **kwargs):325 code, results = func(*args, **kwargs)326 for msg in results:327 LOG.debug(msg)328 if code not in ok_codes:329 LOG.error('%s failed', name.title())330 return code331@contextmanager332def _transaction(base, callback):333 """Manage Yum transactions334 Locks and unlocks Yum database, builds and processes transaction335 on __exit__.336 """337 try:338 base.doLock()339 yield callback340 code = _run_yum_api('building transaction',341 base.buildTransaction, ok_codes=(0, 2))342 failed = []343 if code == 0:344 LOG.debug('Nothing to do')345 elif code == 2:346 base.processTransaction(rpmTestDisplay=callback,347 rpmDisplay=callback)348 failed = [txmbr for txmbr in base.tsInfo349 if txmbr.output_state == yum.constants.TS_FAILED]350 else:351 raise RuntimeError("Transaction failed: %s" % code)352 post_cb = getattr(callback, 'yyoom_post_transaction', None)353 if post_cb:354 post_cb(base, code)355 if failed:356 raise RuntimeError("Operation failed for %s" %357 ', '.join(txmbr.name for txmbr in failed))358 finally:359 del base.tsInfo360 del base.ts361 base.doUnlock()362def main(args):363 options = _parse_arguments(args)364 _setup_output()365 _setup_logging(options.verbose)366 try:367 yum_base = _YyoomBase()368 return options.func(yum_base, options) or 0369 except Exception as e:370 if options.verbose:371 LOG.exception("%s failed", options.operation)372 else:373 LOG.error("%s failed: %s", options.operation, e)374 return 1375if __name__ == '__main__':...
yumlistmodel.py
Source:yumlistmodel.py
1import yum2import menu3import observable4class ListModel(menu.MenuModel, observable.Observable):5 """ Model of a list of packages. """6 7 def __init__(self):8 menu.MenuModel.__init__(self)9 observable.Observable.__init__(self)10 self.register_signal("selection-changed")11 12 self.title = "All Packages"13 self.name = self.title14 yum_base = yum.YumBase()15 16 yum_base.doConfigSetup()17 yum_base.closeRpmDB()18 yum_base.doTsSetup()19 yum_base.doRpmDBSetup()20 yum_base.doRepoSetup()21 yum_base.doSackSetup()22 yum_base.repos.populateSack()23 self.packages = []24 25 for po in yum_base.pkgSack.returnNewestByNameArch():26 if self.simpleDBInstalled(yum_base, po.returnSimple('name')):27 continue28 self.packages.append(DetailsModel(po, False))29 for po in yum_base.rpmdb.returnPackages():30 self.packages.append(DetailsModel(po, True))31 def simpleDBInstalled(self, yum_base, name):32 # From pirut.33 # FIXME: doing this directly instead of using self.rpmdb.installed()34 # speeds things up by 400%35 mi = yum_base.ts.ts.dbMatch('name', name)36 if mi.count() > 0:37 return True38 return False39 def add_sub_list(self, sub_list):40 """ Add a sub list to this list. """41 self.packages.append(sub_list)42 43 def move_up(self):44 """ Move the selection up one entry. """45 if (self.selected_entry > 0):46 self.selected_entry = self.selected_entry - 147 self.emit_signal("selection-changed")48 49 def move_down(self):50 """ Move the selection down one entry. """51 if (self.selected_entry < self.length - 1):52 self.selected_entry = self.selected_entry + 153 self.emit_signal("selection-changed")54 55 def __get_length(self):56 """ Return the length of the list including sublists. """57 length = 1 # Include the title.58 for entry in self.packages:59 if entry.__class__ == ListModel:60 length = length + entry.__get_length()61 else:62 length = length + 163 return length64 length = property(__get_length)65 def __get_selected(self):66 return self.packages[self.selected_entry - 1]67 selected = property(__get_selected)68class DetailsModel(object):69 """ A model of package details. """70 71 def __init__(self, pkg, installed):72 self.pkg = pkg73 self.name = pkg.name74 self.avail_version = pkg.version75 self.avail_release = pkg.release76 self.section = pkg.returnSimple("group")77 self.priority = "Required"78 self.arch = pkg.arch79 self.summary = pkg.returnSimple('summary')80 self.description = pkg.returnSimple('description')81 self.installed = installed82 if self.installed:83 self.version = pkg.version84 self.release = pkg.release85 self.action = 'INSTALL'86 else:87 self.version = '<none>'88 self.release = None89 self.action = 'REMOVE'90 def __get_eiom(self):91 eiom = " "92 if self.installed:93 eiom = eiom + "*" + "*"94 else:95 eiom = eiom + " " + "_"96 if self.action == 'INSTALL':97 eiom = eiom + "*"98 else:99 eiom = eiom + "_"100 return eiom...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!