Best Python code snippet using autotest_python
iostat.py
Source:iostat.py
1#!/usr/bin/env python2# coding=utf-83#4# collectd-iostat-python5# ======================6#7# Collectd-iostat-python is an iostat plugin for collectd that allows you to8# graph Linux iostat metrics in Graphite or other output formats that are9# supported by collectd.10#11# https://github.com/powdahound/redis-collectd-plugin12# - was used as template13# https://github.com/keirans/collectd-iostat/14# - was used as inspiration and contains some code from15# https://bitbucket.org/jakamkon/python-iostat16# - by Kuba KoÅczyk <jakamkon at users.sourceforge.net>17#18import signal19import string20import subprocess21import sys22__version__ = '0.0.3'23__author__ = 'denis.zhdanov@gmail.com'24class IOStatError(Exception):25 pass26class CmdError(IOStatError):27 pass28class ParseError(IOStatError):29 pass30class IOStat(object):31 def __init__(self, path='/usr/bin/iostat', interval=2, count=2, disks=[]):32 self.path = path33 self.interval = interval34 self.count = count35 self.disks = disks36 def parse_diskstats(self, input):37 """38 Parse iostat -d and -dx output.If there are more39 than one series of statistics, get the last one.40 By default parse statistics for all avaliable block devices.41 @type input: C{string}42 @param input: iostat output43 @type disks: list of C{string}s44 @param input: lists of block devices that45 statistics are taken for.46 @return: C{dictionary} contains per block device statistics.47 Statistics are in form of C{dictonary}.48 Main statistics:49 tps Blk_read/s Blk_wrtn/s Blk_read Blk_wrtn50 Extended staistics (available with post 2.5 kernels):51 rrqm/s wrqm/s r/s w/s rsec/s wsec/s rkB/s wkB/s avgrq-sz \52 avgqu-sz await svctm %util53 See I{man iostat} for more details.54 """55 dstats = {}56 dsi = input.rfind('Device:')57 if dsi == -1:58 raise ParseError('Unknown input format: %r' % input)59 ds = input[dsi:].splitlines()60 hdr = ds.pop(0).split()[1:]61 for d in ds:62 if d:63 d = d.split()64 dev = d.pop(0)65 if (dev in self.disks) or not self.disks:66 dstats[dev] = dict([(k, float(v)) for k, v in zip(hdr, d)])67 return dstats68 def sum_dstats(self, stats, smetrics):69 """70 Compute the summary statistics for chosen metrics.71 """72 avg = {}73 for disk, metrics in stats.iteritems():74 for mname, metric in metrics.iteritems():75 if mname not in smetrics:76 continue77 if mname in avg:78 avg[mname] += metric79 else:80 avg[mname] = metric81 return avg82 def _run(self, options=None):83 """84 Run iostat command.85 """86 close_fds = 'posix' in sys.builtin_module_names87 args = '%s %s %s %s %s' % (88 self.path,89 ''.join(options),90 self.interval,91 self.count,92 ' '.join(self.disks))93 return subprocess.Popen(94 args,95 bufsize=1,96 shell=True,97 stdout=subprocess.PIPE,98 close_fds=close_fds)99 @staticmethod100 def _get_childs_data(child):101 """102 Return child's data when avaliable.103 """104 (stdout, stderr) = child.communicate()105 ecode = child.poll()106 if ecode != 0:107 raise CmdError('Command %r returned %d' % (child.cmd, ecode))108 return stdout109 def get_diskstats(self):110 """111 Get all avaliable disks statistics that we can get.112 """113 dstats = self._run(options=['-kNd'])114 extdstats = self._run(options=['-kNdx'])115 dsd = self._get_childs_data(dstats)116 edd = self._get_childs_data(extdstats)117 ds = self.parse_diskstats(dsd)118 eds = self.parse_diskstats(edd)119 for dk, dv in ds.iteritems():120 if dk in eds:121 ds[dk].update(eds[dk])122 return ds123class IOMon(object):124 def __init__(self):125 self.plugin_name = 'collectd-iostat-python'126 self.iostat_path = '/usr/bin/iostat'127 self.iostat_interval = 2128 self.iostat_count = 2129 self.iostat_disks = []130 self.iostat_nice_names = False131 self.iostat_disks_regex = ''132 self.verbose_logging = False133 self.names = {134 'tps': {'t': 'transfers_per_second'},135 'Blk_read/s': {'t': 'blocks_per_second', 'ti': 'read'},136 'kB_read/s': {'t': 'bytes_per_second', 'ti': 'read', 'm': 10e3},137 'MB_read/s': {'t': 'bytes_per_second', 'ti': 'read', 'm': 10e6},138 'Blk_wrtn/s': {'t': 'blocks_per_second', 'ti': 'write'},139 'kB_wrtn/s': {'t': 'bytes_per_second', 'ti': 'write', 'm': 10e3},140 'MB_wrtn/s': {'t': 'bytes_per_second', 'ti': 'write', 'm': 10e6},141 'Blk_read': {'t': 'blocks', 'ti': 'read'},142 'kB_read': {'t': 'bytes', 'ti': 'read', 'm': 10e3},143 'MB_read': {'t': 'bytes', 'ti': 'read', 'm': 10e6},144 'Blk_wrtn': {'t': 'blocks', 'ti': 'write'},145 'kB_wrtn': {'t': 'bytes', 'ti': 'write', 'm': 10e3},146 'MB_wrtn': {'t': 'bytes', 'ti': 'write', 'm': 10e6},147 'rrqm/s': {'t': 'requests_merged_per_second', 'ti': 'read'},148 'wrqm/s': {'t': 'requests_merged_per_second', 'ti': 'write'},149 'r/s': {'t': 'per_second', 'ti': 'read'},150 'w/s': {'t': 'per_second', 'ti': 'write'},151 'rsec/s': {'t': 'sectors_per_second', 'ti': 'read'},152 'rkB/s': {'t': 'bytes_per_second', 'ti': 'read', 'm': 10e3},153 'rMB/s': {'t': 'bytes_per_second', 'ti': 'read', 'm': 10e6},154 'wsec/s': {'t': 'sectors_per_second', 'ti': 'write'},155 'wkB/s': {'t': 'bytes_per_second', 'ti': 'write', 'm': 10e3},156 'wMB/s': {'t': 'bytes_per_second', 'ti': 'write', 'm': 10e6},157 'avgrq-sz': {'t': 'avg_request_size'},158 'avgqu-sz': {'t': 'avg_request_queue'},159 'await': {'t': 'avg_wait_time'},160 'r_await': {'t': 'avg_wait_time', 'ti': 'read'},161 'w_await': {'t': 'avg_wait_time', 'ti': 'write'},162 'svctm': {'t': 'avg_service_time'},163 '%util': {'t': 'percent', 'ti': 'util'}164 }165 def log_verbose(self, msg):166 if not self.verbose_logging:167 return168 collectd.info('%s plugin [verbose]: %s' % (self.plugin_name, msg))169 def configure_callback(self, conf):170 """171 Receive configuration block172 """173 for node in conf.children:174 val = str(node.values[0])175 if node.key == 'Path':176 self.iostat_path = val177 elif node.key == 'Interval':178 self.iostat_interval = int(float(val))179 elif node.key == 'Count':180 self.iostat_count = int(float(val))181 elif node.key == 'Disks':182 self.iostat_disks = val.split(',')183 elif node.key == 'NiceNames':184 self.iostat_nice_names = val in ['True', 'true']185 elif node.key == 'DisksRegex':186 self.iostat_disks_regex = val187 elif node.key == 'PluginName':188 self.plugin_name = val189 elif node.key == 'Verbose':190 self.verbose_logging = val in ['True', 'true']191 else:192 collectd.warning(193 '%s plugin: Unknown config key: %s.' % (194 self.plugin_name,195 node.key))196 self.log_verbose(197 'Configured with iostat=%s, interval=%s, count=%s, disks=%s, '198 'disks_regex=%s' % (199 self.iostat_path,200 self.iostat_interval,201 self.iostat_count,202 self.iostat_disks,203 self.iostat_disks_regex))204 def dispatch_value(self, plugin_instance, val_type, type_instance, value):205 """206 Dispatch a value to collectd207 """208 self.log_verbose(209 'Sending value: %s-%s.%s=%s' % (210 self.plugin_name,211 plugin_instance,212 '-'.join([val_type, type_instance]),213 value))214 val = collectd.Values()215 val.plugin = self.plugin_name216 val.plugin_instance = plugin_instance217 val.type = val_type218 if len(type_instance):219 val.type_instance = type_instance220 val.values = [value, ]221 val.dispatch()222 def read_callback(self):223 """224 Collectd read callback225 """226 self.log_verbose('Read callback called')227 iostat = IOStat(228 path=self.iostat_path,229 interval=self.iostat_interval,230 count=self.iostat_count,231 disks=self.iostat_disks)232 ds = iostat.get_diskstats()233 if not ds:234 self.log_verbose('%s plugin: No info received.' % self.plugin_name)235 return236 for disk in ds:237 for name in ds[disk]:238 if self.iostat_nice_names and name in self.names:239 val_type = self.names[name]['t']240 if 'ti' in self.names[name]:241 type_instance = self.names[name]['ti']242 else:243 type_instance = ''244 value = ds[disk][name]245 if 'm' in self.names[name]:246 value *= self.names[name]['m']247 else:248 val_type = 'gauge'249 tbl = string.maketrans('/-%', '___')250 type_instance = name.translate(tbl)251 value = ds[disk][name]252 self.dispatch_value(253 disk, val_type, type_instance, value)254def restore_sigchld():255 """256 Restore SIGCHLD handler for python <= v2.6257 It will BREAK exec plugin!!!258 See https://github.com/deniszh/collectd-iostat-python/issues/2 for details259 """260 if sys.version_info[0] == 2 and sys.version_info[1] <= 6:261 signal.signal(signal.SIGCHLD, signal.SIG_DFL)262if __name__ == '__main__':263 iostat = IOStat()264 ds = iostat.get_diskstats()265 for disk in ds:266 for metric in ds[disk]:267 tbl = string.maketrans('/-%', '___')268 metric_name = metric.translate(tbl)269 print("%s.%s:%s" % (disk, metric_name, ds[disk][metric]))270 sys.exit(0)271else:272 import collectd273 iomon = IOMon()274 # Register callbacks275 collectd.register_init(restore_sigchld)276 collectd.register_config(iomon.configure_callback)...
iostat_tab.py
Source:iostat_tab.py
1# pandas and numpy for data manipulation2import pandas as pd3import numpy as np4import sqlite35from bokeh.plotting import Figure6from bokeh.models import (7 CategoricalColorMapper,8 HoverTool,9 ColumnDataSource,10 Panel,11 FuncTickFormatter,12 SingleIntervalTicker,13 LinearAxis,14 Legend,15)16from bokeh.models.widgets import (17 CheckboxGroup,18 Slider,19 RangeSlider,20 Tabs,21 CheckboxButtonGroup,22 TableColumn,23 DataTable,24 Select,25)26from bokeh.layouts import column, row, WidgetBox27import matplotlib28matplotlib.use("Agg")29import matplotlib.pyplot as plt30import matplotlib.colors as colors31def iostat_tab(db):32 def make_dataset(iostat_list):33 newdf = iostat[iostat_list]34 # Convert dataframe to column data source35 return ColumnDataSource(newdf)36 def make_plot(src):37 # Blank plot with correct labels38 p = Figure(39 plot_width=1024,40 plot_height=768,41 x_axis_type="datetime",42 title="iostat",43 output_backend="webgl",44 )45 cm = plt.get_cmap("gist_rainbow")46 numlines = len(iostat.columns)47 mypal = [cm(1.0 * i / numlines) for i in range(numlines)]48 mypal = list(map(lambda x: colors.rgb2hex(x), mypal))49 col = 050 legenditems = []51 for key in src.data.keys():52 if key == "datetime":53 continue54 l = key + " "55 col = col + 156 p.line(57 iostat.index.values,58 iostat[key],59 line_width=1,60 alpha=0.8,61 color=mypal[col],62 )63 legenditems += [(key, [cline])]64 p.legend.click_policy = "hide"65 legend = Legend(items=legenditems, location=(0, -30))66 p.add_layout(legend, "right")67 return p68 def update(attr, old, new):69 iostats_to_plot = [iostat_selection.labels[i] for i in iostat_selection.active]70 new_src = make_dataset(iostats_to_plot)71 plot = make_plot(new_src)72 # TODO:crude hack in lack of a better solution so far73 layout.children[1] = plot74 # get data from DB, setup index75 cur = db.cursor()76 cur.execute(77 "SELECT name FROM sqlite_master WHERE type='table' AND name=?", ["iostat"]78 )79 if len(cur.fetchall()) == 0:80 return None81 iostat = pd.read_sql_query("select * from iostat", db)82 iostat.index = pd.to_datetime(iostat["datetime"])83 iostat = iostat.drop(["datetime"], axis=1)84 iostat.index.name = "datetime"85 iostat_selection = CheckboxGroup(labels=list(iostat.columns), active=[0, 5])86 iostat_list = [iostat_selection.labels[i] for i in iostat_selection.active]87 src = make_dataset(iostat_list)88 plot = make_plot(src)89 iostat_selection.on_change("active", update)90 controls = WidgetBox(iostat_selection)91 layout = row(controls, plot)92 tab = Panel(child=layout, title="iostat")...
events_feeder_iostat_all_disks.py
Source:events_feeder_iostat_all_disks.py
1#!/usr/bin/env python2"""This continuously display disks state with iostat command."""3import os4import re5import subprocess6import logging7import lib_uris8import lib_common9import lib_util10import lib_properties11def Usable(entity_type, entity_ids_arr):12 return lib_util.check_program_exists("iostat")13################################################################################14# Typical output of the iostat command:15# Device: tps Blk_read/s Blk_wrtn/s Blk_read Blk_wrtn16# sda 3,00 0,00 48,00 0 4817# sdb 0,00 0,00 0,00 0 018def _io_stat_to_graph(grph, spl, iostat_header):19 """This runs in the HTTP server and uses the data from the queue.20 This reads a record from the queue and builds a RDF relation with it."""21 device_node = lib_uris.gUriGen.DiskUri(spl[0])22 property_name_to_node = dict()23 for idx in range(1, len(spl)):24 # No idea why doubles are written with a comma. Maybe the locale?25 qty = float(spl[idx].replace(",", "."))26 property_name = iostat_header[idx]27 property_node = property_name_to_node.get(property_name, lib_properties.MakeProp(property_name))28 grph.add((device_node, property_node, lib_util.NodeLiteral(qty)))29def Main(loop_number=1):30 """This runs iostat and parses its output."""31 # TODO: The delay could be a parameter.32 iostat_cmd = ["iostat", "-d", "1"]33 logging.debug("iostat_cmd=%s" % str(iostat_cmd))34 # Contains the last header read.35 iostat_header = []36 cgiEnv = lib_common.ScriptEnvironment()37 proc_open = None38 try:39 proc_popen = subprocess.Popen(iostat_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)40 for lin in proc_popen.stdout.readlines():41 if not lin:42 continue43 logging.debug("lin=%s" % lin)44 # We transfer also the header.45 spl = re.split(' +', lin)46 if spl[0] == 'Device:':47 iostat_header = spl48 return49 grph = cgiEnv.ReinitGraph()50 _io_stat_to_graph(grph, spl, iostat_header)51 cgiEnv.OutCgiRdf()52 loop_number -= 153 if loop_number == 0:54 break55 except Exception as exc:56 logging.error("Caught:%s" % exc)57 lib_common.ErrorMessageHtml("iostat error:%s" % str(exc))58 finally:59 if proc_open:60 proc_open.kill()61 proc_open.communicate()62 proc_open.terminate()63if __name__ == '__main__':64 logging.debug("Starting")65 if lib_util.is_snapshot_behaviour():66 Main()67 else:68 while True:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!