Best Python code snippet using avocado_python
base_sysinfo.py
Source:base_sysinfo.py
1import glob2import gzip3import logging4import os5import re6import shutil7import subprocess8from autotest.client import utils9from autotest.client.shared import log, software_manager, utils_memory10from autotest.client.shared.settings import settings11_LOG_INSTALLED_PACKAGES = settings.get_value('CLIENT', 'log_installed_packages',12 type=bool, default=False)13_DEFAULT_COMMANDS_TO_LOG_PER_TEST = []14_DEFAULT_COMMANDS_TO_LOG_PER_BOOT = [15 "lspci -vvnn", "gcc --version", "ld --version", "mount", "hostname",16 "uptime", "dmidecode", "ifconfig -a", "brctl show", "ip link",17 "numactl --hardware show", "lscpu", "fdisk -l",18]19_DEFAULT_COMMANDS_TO_LOG_BEFORE_ITERATION = []20_DEFAULT_COMMANDS_TO_LOG_AFTER_ITERATION = []21_DEFAULT_FILES_TO_LOG_PER_TEST = []22_DEFAULT_FILES_TO_LOG_PER_BOOT = [23 "/proc/pci", "/proc/meminfo", "/proc/slabinfo", "/proc/version",24 "/proc/cpuinfo", "/proc/modules", "/proc/interrupts", "/proc/partitions",25 "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor",26 "/sys/kernel/debug/sched_features",27 "/sys/devices/system/clocksource/clocksource0/current_clocksource"28]29_DEFAULT_FILES_TO_LOG_BEFORE_ITERATION = [30 "/proc/schedstat", "/proc/meminfo", "/proc/slabinfo", "/proc/interrupts",31 "/proc/buddyinfo"32]33_DEFAULT_FILES_TO_LOG_AFTER_ITERATION = [34 "/proc/schedstat", "/proc/meminfo", "/proc/slabinfo", "/proc/interrupts",35 "/proc/buddyinfo"36]37class loggable(object):38 """ Abstract class for representing all things "loggable" by sysinfo. """39 def __init__(self, logf, log_in_keyval):40 self.logf = logf41 self.log_in_keyval = log_in_keyval42 def readline(self, logdir):43 path = os.path.join(logdir, self.logf)44 if os.path.exists(path):45 return utils.read_one_line(path)46 else:47 return ""48class logfile(loggable):49 def __init__(self, path, logf=None, log_in_keyval=False):50 if not logf:51 logf = os.path.basename(path)52 super(logfile, self).__init__(logf, log_in_keyval)53 self.path = path54 def __repr__(self):55 r = "sysinfo.logfile(%r, %r, %r)"56 r %= (self.path, self.logf, self.log_in_keyval)57 return r58 def __eq__(self, other):59 if isinstance(other, logfile):60 return (self.path, self.logf) == (other.path, other.logf)61 elif isinstance(other, loggable):62 return False63 return NotImplemented64 def __ne__(self, other):65 result = self.__eq__(other)66 if result is NotImplemented:67 return result68 return not result69 def __hash__(self):70 return hash((self.path, self.logf))71 def run(self, logdir):72 if os.path.exists(self.path):73 try:74 shutil.copyfile(self.path, os.path.join(logdir, self.logf))75 except IOError:76 logging.info("Not logging %s (lack of permissions)",77 self.path)78class command(loggable):79 def __init__(self, cmd, logf=None, log_in_keyval=False, compress_log=False):80 if not logf:81 logf = cmd.replace(" ", "_")82 super(command, self).__init__(logf, log_in_keyval)83 self.cmd = cmd84 self._compress_log = compress_log85 def __repr__(self):86 r = "sysinfo.command(%r, %r, %r)"87 r %= (self.cmd, self.logf, self.log_in_keyval)88 return r89 def __eq__(self, other):90 if isinstance(other, command):91 return (self.cmd, self.logf) == (other.cmd, other.logf)92 elif isinstance(other, loggable):93 return False94 return NotImplemented95 def __ne__(self, other):96 result = self.__eq__(other)97 if result is NotImplemented:98 return result99 return not result100 def __hash__(self):101 return hash((self.cmd, self.logf))102 def run(self, logdir):103 env = os.environ.copy()104 if "PATH" not in env:105 env["PATH"] = "/usr/bin:/bin"106 logf_path = os.path.join(logdir, self.logf)107 stdin = open(os.devnull, "r")108 stderr = open(os.devnull, "w")109 stdout = open(logf_path, "w")110 try:111 subprocess.call(self.cmd, stdin=stdin, stdout=stdout, stderr=stderr,112 shell=True, env=env)113 finally:114 for f in (stdin, stdout, stderr):115 f.close()116 if self._compress_log and os.path.exists(logf_path):117 utils.run('gzip -9 "%s"' % logf_path, ignore_status=True,118 verbose=False)119class base_sysinfo(object):120 def __init__(self, job_resultsdir):121 self.sysinfodir = self._get_sysinfodir(job_resultsdir)122 # pull in the post-test logs to collect123 self.test_loggables = set()124 for cmd in _DEFAULT_COMMANDS_TO_LOG_PER_TEST:125 self.test_loggables.add(command(cmd))126 for filename in _DEFAULT_FILES_TO_LOG_PER_TEST:127 self.test_loggables.add(logfile(filename))128 # pull in the EXTRA post-boot logs to collect129 self.boot_loggables = set()130 for cmd in _DEFAULT_COMMANDS_TO_LOG_PER_BOOT:131 self.boot_loggables.add(command(cmd))132 for filename in _DEFAULT_FILES_TO_LOG_PER_BOOT:133 self.boot_loggables.add(logfile(filename))134 # pull in the pre test iteration logs to collect135 self.before_iteration_loggables = set()136 for cmd in _DEFAULT_COMMANDS_TO_LOG_BEFORE_ITERATION:137 self.before_iteration_loggables.add(138 command(cmd, logf=cmd.replace(" ", "_") + '.before'))139 for fname in _DEFAULT_FILES_TO_LOG_BEFORE_ITERATION:140 self.before_iteration_loggables.add(141 logfile(fname, logf=os.path.basename(fname) + '.before'))142 # pull in the post test iteration logs to collect143 self.after_iteration_loggables = set()144 for cmd in _DEFAULT_COMMANDS_TO_LOG_AFTER_ITERATION:145 self.after_iteration_loggables.add(146 command(cmd, logf=cmd.replace(" ", "_") + '.after'))147 for fname in _DEFAULT_FILES_TO_LOG_AFTER_ITERATION:148 self.after_iteration_loggables.add(149 logfile(fname, logf=os.path.basename(fname) + '.after'))150 # add in a couple of extra files and commands we want to grab151 self.test_loggables.add(command("df -mP", logf="df"))152 # We compress the dmesg because it can get large when kernels are153 # configured with a large buffer and some tests trigger OOMs or154 # other large "spam" that fill it up...155 self.test_loggables.add(command("dmesg -c", logf="dmesg",156 compress_log=True))157 self.boot_loggables.add(logfile("/proc/cmdline",158 log_in_keyval=True))159 # log /proc/mounts but with custom filename since we already160 # log the output of the "mount" command as the filename "mount"161 self.boot_loggables.add(logfile('/proc/mounts', logf='proc_mounts'))162 self.boot_loggables.add(command("uname -a", logf="uname",163 log_in_keyval=True))164 self.sm = software_manager.SoftwareManager()165 def __getstate__(self):166 ret = dict(self.__dict__)167 ret["sm"] = None168 return ret169 def serialize(self):170 return {"boot": self.boot_loggables, "test": self.test_loggables}171 def deserialize(self, serialized):172 self.boot_loggables = serialized["boot"]173 self.test_loggables = serialized["test"]174 @staticmethod175 def _get_sysinfodir(resultsdir):176 sysinfodir = os.path.join(resultsdir, "sysinfo")177 if not os.path.exists(sysinfodir):178 os.makedirs(sysinfodir)179 return sysinfodir180 def _get_reboot_count(self):181 if not glob.glob(os.path.join(self.sysinfodir, "*")):182 return -1183 else:184 return len(glob.glob(os.path.join(self.sysinfodir, "boot.*")))185 def _get_boot_subdir(self, next=False):186 reboot_count = self._get_reboot_count()187 if next:188 reboot_count += 1189 if reboot_count < 1:190 return self.sysinfodir191 else:192 boot_dir = "boot.%d" % (reboot_count - 1)193 return os.path.join(self.sysinfodir, boot_dir)194 def _get_iteration_subdir(self, test, iteration):195 iter_dir = "iteration.%d" % iteration196 logdir = os.path.join(self._get_sysinfodir(test.outputdir), iter_dir)197 if not os.path.exists(logdir):198 os.mkdir(logdir)199 return logdir200 @log.log_and_ignore_errors("post-reboot sysinfo error:")201 def log_per_reboot_data(self):202 """ Logging hook called whenever a job starts, and again after203 any reboot. """204 logdir = self._get_boot_subdir(next=True)205 if not os.path.exists(logdir):206 os.mkdir(logdir)207 for log in (self.test_loggables | self.boot_loggables):208 log.run(logdir)209 if _LOG_INSTALLED_PACKAGES:210 # also log any installed packages211 installed_path = os.path.join(logdir, "installed_packages")212 installed_packages = "\n".join(self.sm.list_all()) + "\n"213 utils.open_write_close(installed_path, installed_packages)214 @log.log_and_ignore_errors("pre-test sysinfo error:")215 def log_before_each_test(self, test):216 """ Logging hook called before a test starts. """217 if _LOG_INSTALLED_PACKAGES:218 self._installed_packages = self.sm.list_all()219 # Also log the list of installed packaged before each test starts220 test_sysinfodir = self._get_sysinfodir(test.outputdir)221 installed_path = os.path.join(test_sysinfodir, "installed_packages")222 installed_packages = "\n".join(self._installed_packages)223 utils.open_write_close(installed_path, installed_packages)224 if os.path.exists("/var/log/messages"):225 stat = os.stat("/var/log/messages")226 self._messages_size = stat.st_size227 self._messages_inode = stat.st_ino228 elif os.path.exists("/var/log/syslog"):229 stat = os.stat("/var/log/syslog")230 self._messages_size = stat.st_size231 self._messages_inode = stat.st_ino232 @log.log_and_ignore_errors("post-test sysinfo error:")233 def log_after_each_test(self, test):234 """ Logging hook called after a test finishs. """235 test_sysinfodir = self._get_sysinfodir(test.outputdir)236 # create a symlink in the test sysinfo dir to the current boot237 reboot_dir = self._get_boot_subdir()238 assert os.path.exists(reboot_dir)239 symlink_dest = os.path.join(test_sysinfodir, "reboot_current")240 symlink_src = utils.get_relative_path(reboot_dir,241 os.path.dirname(symlink_dest))242 try:243 os.symlink(symlink_src, symlink_dest)244 except Exception, e:245 raise Exception('%s: whilst linking %s to %s' % (e, symlink_src,246 symlink_dest))247 # run all the standard logging commands248 for log in self.test_loggables:249 log.run(test_sysinfodir)250 # grab any new data from the system log251 self._log_messages(test_sysinfodir)252 # log some sysinfo data into the test keyval file253 keyval = self.log_test_keyvals(test_sysinfodir)254 test.write_test_keyval(keyval)255 if _LOG_INSTALLED_PACKAGES:256 # log any changes to installed packages257 old_packages = set(self._installed_packages)258 new_packages = set(self.sm.list_all())259 added_path = os.path.join(test_sysinfodir, "added_packages")260 added_packages = "\n".join(new_packages - old_packages) + "\n"261 utils.open_write_close(added_path, added_packages)262 removed_path = os.path.join(test_sysinfodir, "removed_packages")263 removed_packages = "\n".join(old_packages - new_packages) + "\n"264 utils.open_write_close(removed_path, removed_packages)265 @log.log_and_ignore_errors("pre-test siteration sysinfo error:")266 def log_before_each_iteration(self, test, iteration=None):267 """ Logging hook called before a test iteration."""268 if not iteration:269 iteration = test.iteration270 logdir = self._get_iteration_subdir(test, iteration)271 for log in self.before_iteration_loggables:272 log.run(logdir)273 @log.log_and_ignore_errors("post-test siteration sysinfo error:")274 def log_after_each_iteration(self, test, iteration=None):275 """ Logging hook called after a test iteration."""276 if not iteration:277 iteration = test.iteration278 logdir = self._get_iteration_subdir(test, iteration)279 for log in self.after_iteration_loggables:280 log.run(logdir)281 def _log_messages(self, logdir):282 """ Log all of the new data in the system log. """283 try:284 # log all of the new data in the system log285 logpaths = ["/var/log/messages", "/var/log/syslog"]286 for logpath in logpaths:287 if os.path.exists(logpath):288 break289 else:290 raise ValueError("System log file not found (looked for %s)" %291 logpaths)292 bytes_to_skip = 0293 if hasattr(self, "_messages_size"):294 current_inode = os.stat(logpath).st_ino295 if current_inode == self._messages_inode:296 bytes_to_skip = self._messages_size297 in_messages = open(logpath)298 out_file_basename = os.path.basename(logpath) + ".gz"299 out_file_name = os.path.join(logdir, out_file_basename)300 out_messages = gzip.GzipFile(out_file_name, "w")301 try:302 in_messages.seek(bytes_to_skip)303 while True:304 # Read data in managable chunks rather than all at once.305 in_data = in_messages.read(200000)306 if not in_data:307 break308 out_messages.write(in_data)309 finally:310 out_messages.close()311 in_messages.close()312 except ValueError, e:313 logging.info(e)314 except (IOError, OSError):315 logging.info("Not logging %s (lack of permissions)", logpath)316 except Exception, e:317 logging.info("System log collection failed: %s", e)318 @staticmethod319 def _read_sysinfo_keyvals(loggables, logdir):320 keyval = {}321 for log in loggables:322 if log.log_in_keyval:323 keyval["sysinfo-" + log.logf] = log.readline(logdir)324 return keyval325 def log_test_keyvals(self, test_sysinfodir):326 """ Logging hook called by log_after_each_test to collect keyval327 entries to be written in the test keyval. """328 keyval = {}329 # grab any loggables that should be in the keyval330 keyval.update(self._read_sysinfo_keyvals(331 self.test_loggables, test_sysinfodir))332 keyval.update(self._read_sysinfo_keyvals(333 self.boot_loggables,334 os.path.join(test_sysinfodir, "reboot_current")))335 # remove hostname from uname info336 # Linux lpt36 2.6.18-smp-230.1 #1 [4069269] SMP Fri Oct 24 11:30:...337 if "sysinfo-uname" in keyval:338 kernel_vers = " ".join(keyval["sysinfo-uname"].split()[2:])339 keyval["sysinfo-uname"] = kernel_vers340 # grab the total avail memory, not used by sys tables341 path = os.path.join(test_sysinfodir, "reboot_current", "meminfo")342 if os.path.exists(path):343 mem_data = open(path).read()344 match = re.search(r"^MemTotal:\s+(\d+) kB$", mem_data,345 re.MULTILINE)346 if match:347 keyval["sysinfo-memtotal-in-kb"] = match.group(1)348 # guess the system's total physical memory, including sys tables349 keyval["sysinfo-phys-mbytes"] = utils_memory.rounded_memtotal() // 1024350 # return what we collected...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!