Best Python code snippet using locust
performance_report.py
Source:performance_report.py
1#!/usr/bin/python2import sys3import os4if os.name != 'posix':5 sys.exit('platform not supported')6os.environ['PATH'] += ':/usr/bin:/sbin:/bin'7import atexit8import time9import commands10import re11import string12import argparse13import psutil14from threading import Thread15import json16import signal17import sys18# Exit statuses recognized by Nagios19NAGIOS_OK = 020NAGIOS_WARNING = 121NAGIOS_CRITICAL = 222NAGIOS_UNKNOWN = 323def toKbps(n):24 return float(n >> 7)25 26def byte_to_bit(n):27 return n << 328def trunc(f, n):29 '''Truncates/pads a float f to n decimal places without rounding'''30 slen = len('%.*f' % (n, f))31 return str(f)[:slen]32class CircularList:33 def __init__(self, size):34 self.list = []35 self.max_size = size36 37 def append(self, data):38 self.list = [data] + self.list[:self.max_size - 1]39 40 def avg(self):41 return sum(self.list) / float(len(self.list))42 43class processesAnalyzer(Thread):44 '''collect cpu process data and print them to the stdout'''45 def __init__ (self,refreshRate):46 Thread.__init__(self)47 self.refreshRate = refreshRate48 self.terminate = False49 50 def kill(self):51 self.terminate = True52 53 def run(self):54 while True:55 if self.terminate:56 return57 processList = []58 for p in psutil.process_iter():59 processList.append(p)60 try:61 processesSortByMem = sorted(processList, key=lambda p: p.get_memory_percent(), reverse=True)62 processesSortByProc = sorted(processList, key=lambda p: p.get_cpu_percent(interval=0), reverse=True)63 #to use later. Print top 5 processes on mem and proc usage64 printProcStatus = False65# if printProcStatus:66# print "sorted by memory usage"67# for i, p in zip(range(5),processesSortByMem):68# print (" process name: " + str(p.name) + " mem use: " + str(p.get_memory_percent()))69# print "\n"70# print "sorted by processor usage"71# for i, p in zip(range(5),processesSortByProc):72# print (" process name: " + str(p.name) + " proc use: " + str(p.get_cpu_percent(interval=0)))73# print "\n\n\n\n\n\n\n\n"74 except psutil.NoSuchProcess:75 #just to catch the error and avoid killing the thread76 #the raised error is because the process maybe killed before the get_cpu_percent or get_memory_percent calls77 pass78 time.sleep(self.refreshRate)79 80class Sender(Thread):81 def __init__(self, config, reporters):82 Thread.__init__(self)83 self.config = config84 self.reporters = reporters85 self.terminate = False86 def kill(self):87 self.terminate = True88 def threadLoop(self):89 for reporter in self.reporters:90 service = reporter.service91 message, state = reporter.data()92 self.__sendReport(service, state, message)93 94 def run(self):95 while not self.terminate:96 time.sleep(self.config.send_rate)97 self.threadLoop()98 99 def __sendReport(self, service, state, message):100 '''send report to nagios server'''101 print "%s\t%s\t%s\t%s\t" % (self.config.hostname, service, str(state), message)102 sys.stdout.flush()103class Reporter(Thread):104 '''base reporter thread class'''105 def __init__ (self, config):106 Thread.__init__(self)107 self.terminate = False108 self.config = config109 self.minimum = 0110 self.maximum = None111 112 def kill(self):113 #send kill sign to terminate the thread in the next data collection loop114 self.terminate = True115 116 def threadLoop(self):117 #nothing on the base class 118 #Should be implemented by each reporter service and set the self.state and self.message variables119 return120 121 def run(self):122 while not self.terminate:123 #call method that actually do what the threads needs to do124 self.threadLoop()125 def formatMessage(self, data, label, unit):126 list_avg = data.avg()127 list_max = max(data.list)128 list_min = min(data.list)129 if self.maximum == None:130 format = "%s_%%s=%%.2f%s;%d;%d;%d; " % (label, unit.replace("%", "%%"), self.warning, self.critical, self.minimum)131 else:132 format = "%s_%%s=%%.2f%s;%d;%d;%d;%d " % (label, unit.replace("%", "%%"), self.warning, self.critical, self.minimum, self.maximum)133 return format % ("avg", list_avg) + format % ("max", list_max) + format % ("min", list_min)134 135 def checkStatus(self, data):136 if data >= self.critical:137 return NAGIOS_CRITICAL138 elif data >= self.warning:139 return NAGIOS_WARNING140 else:141 return NAGIOS_OK142 143class MemoryReporter(Reporter):144 '''reporter class to collect and report memory data'''145 def __init__(self, config):146 Reporter.__init__(self, config)147 self.service = "Memory Report"148 self.list = CircularList(self.config.send_rate)149 self.maximum = psutil.phymem_usage().total / (1024 * 1024)150 self.warning = (self.config.memory_warning * self.maximum) / 100151 self.critical = (self.config.memory_critical * self.maximum) / 100152 153 def threadLoop(self):154 time.sleep(1)155 #self.list.append(psutil.phymem_usage().used / (1024 * 1024))156 self.list.append((psutil.phymem_usage().percent * self.maximum) / 100)157 158 def data(self):159 # message mount160 list_avg = self.list.avg()161 message = "Memory usage: %dMB of %dMB (%d%%)" % (list_avg, \162 self.maximum, (list_avg * 100) / self.maximum) \163 + "|" + self.formatMessage(self.list, "mem", "MB")164 # state mount165 state = self.checkStatus(list_avg)166 return message, state167class DiskReporter(Reporter):168 def __init__(self, config):169 Reporter.__init__(self, config)170 self.service = "Disk Report"171 self.list = CircularList(self.config.send_rate)172 self.maximum = psutil.disk_usage('/').total / (1024 * 1024 * 1024)173 self.warning = (self.config.disk_warning * self.maximum) / 100174 self.critical = (self.config.disk_critical * self.maximum) / 100175 176 def threadLoop(self):177 time.sleep(1)178 self.list.append((psutil.disk_usage('/').percent * self.maximum) / 100)179 180 def data(self):181 list_avg = self.list.avg()182 # message mount183 message = "Disk usage: %dGB of %dGB (%d%%)" % (list_avg, \184 self.maximum, (list_avg * 100) / self.maximum) \185 + "|" + self.formatMessage(self.list, "disk", "GB")186 # state mount187 state = self.checkStatus(list_avg)188 return message, state189class MountedDisksReporterHelper(Reporter):190 def __init__(self, config, path):191 Reporter.__init__(self, config)192 self.service = "Disk Report"193 self.mountedDiskPath = path194 self.maximum, self.unit = self.findBestUnit(float(psutil.disk_usage(self.mountedDiskPath).total))195 self.warning = (self.config.disk_warning * self.maximum) / 100196 self.critical = (self.config.disk_critical * self.maximum) / 100197 def findBestUnit(self, value):198 if value >= 1024 * 1024 * 1024 * 1024:199 return value / (1024 * 1024 * 1024 * 1024), "TB"200 if value >= 1024 * 1024 * 1024:201 return value / (1024 * 1024 * 1024), "GB"202 if value >= 1024 * 1024:203 return value / (1024 * 1024), "MB"204 if value >= 1024:205 return value / (1024), "KB"206 return value, "B"207 def data(self): 208 currentUsage = (psutil.disk_usage(self.mountedDiskPath).percent * self.maximum) / 100209 state = self.checkStatus(currentUsage)210 humamMessage = "%s: %d%s of %d%s (%d%%)" % (self.mountedDiskPath, currentUsage, self.unit, self.maximum, self.unit, (currentUsage * 100) / self.maximum) \211 nagiosMessage = self.formatMessage(currentUsage, self.mountedDiskPath, self.unit)212 return humamMessage, nagiosMessage, state213 def formatMessage(self, usage, label, unit):214 format = "%s%%s=%%.2f%s;%d;%d;%d;%d" % (label, unit.replace("%", "%%"), self.warning, self.critical, self.minimum, self.maximum)215 return format % ("", usage)216class MountedDisksReporter(Reporter):217 def __init__(self, config): 218 Reporter.__init__(self, config)219 self.service = "Mounted Disks Report"220 self.mountedDiskReporters = []221 # get all VALID mounted disks222 for partition in psutil.disk_partitions(all=False):223 if psutil.disk_usage(partition.mountpoint).total > 0:224 self.mountedDiskReporters.append(MountedDisksReporterHelper(config, partition.mountpoint))225 def data(self):226 humamMessages = []227 nagiosMessages = []228 diskStates = []229 230 for diskReporter in self.mountedDiskReporters:231 humamMessage, nagiosMessage, state = diskReporter.data()232 humamMessages.append(humamMessage)233 nagiosMessages.append(nagiosMessage)234 diskStates.append(state)235 message = self.formatMessage(humamMessages, nagiosMessages)236 return message,max(diskStates)237 def formatMessage(self,humamMessages, nagiosMessages):238 concatenatedHumamMessages = string.join(humamMessages, ', ')239 concatenatedNagiosMessages = string.join(nagiosMessages,' ')240 return concatenatedHumamMessages + "| " + concatenatedNagiosMessages241 def kill(self):242 self.terminate = True243 def threadLoop(self):244 time.sleep(1)245class ProcessorReporter(Reporter):246 '''reporter class to collect and report processor data'''247 def __init__ (self,config):248 Reporter.__init__(self, config)249 self.service = "Processor Report"250 self.list = CircularList(self.config.send_rate)251 self.maximum = 100252 self.warning = self.config.cpu_warning253 self.critical = self.config.cpu_critical254 self.processor = commands.getoutput("cat /proc/cpuinfo | grep 'model name' | head -n 1 | sed 's:.*\: *\(.*\):\\1:g' | sed 's/ */\ /g'")255 self.numberOfCores = psutil.NUM_CPUS256 257 def threadLoop(self):258 self.list.append(psutil.cpu_percent(1, percpu=False))259 260 def data(self):261 list_avg = self.list.avg()262 # message mount263 message = "CPU usage: %.1f%% Model: %s (%s cores) " % (list_avg, self.processor, self.numberOfCores) \264 + "|" + self.formatMessage(self.list, "cpu", "%") + "cores=" + str(self.numberOfCores) + ";;;;"265 # state mount266 state = self.checkStatus(list_avg)267 return message, state268class NetworkReporter(Reporter):269 '''reporter class to collect and report network data'''270 def __init__(self, config):271 Reporter.__init__(self, config)272 self.service = "Network Report"273 self.sent = CircularList(self.config.send_rate)274 self.recv = CircularList(self.config.send_rate)275 self.warning = self.config.network_warning * 1000 # in Mbit/s276 self.critical = self.config.network_critical * 1000 # in Mbit/s277 278 def threadLoop(self):279 pnic_before = psutil.network_io_counters(pernic=True)280 281 if not pnic_before.has_key(self.config.network_interface):282# print "Couldn't find the network interface %s" % (self.config.network_interface)283 self.config.network_interface = None284 for i in pnic_before.keys():285 if i != "lo":286 self.config.network_interface = i287 break288 if self.config.network_interface == None:289 return290# print "Using %s instead" % (self.config.network_interface)291 stats_before = pnic_before[self.config.network_interface]292 293 while not self.terminate:294 time.sleep(1)295 296 pnic_after = psutil.network_io_counters(pernic=True)297 stats_after = pnic_after[self.config.network_interface]298 # format bytes to string299 bytesSent = byte_to_bit(stats_after.bytes_sent - stats_before.bytes_sent) #toKbps(stats_after.bytes_sent - stats_before.bytes_sent) / 1300 bytesReceived = byte_to_bit(stats_after.bytes_recv - stats_before.bytes_recv) #toKbps(stats_after.bytes_recv - stats_before.bytes_recv) / 1301 # store on a circular list302 self.sent.append(bytesSent)303 self.recv.append(bytesReceived)304 stats_before = stats_after305 306 def normalize(self, value):307 if value >= 1000000000:308 return (value / 1000000000, "Gbit/s")309 elif value >= 1000000:310 return (value / 1000000, "Mbit/s")311 elif value >= 1000:312 return (value / 1000, "kbit/s")313 else:314 return (value, "bit/s")315 def data(self):316 sent_avg = self.sent.avg()317 recv_avg = self.recv.avg()318 # state mount319 state = max(int(self.checkStatus(sent_avg)), int(self.checkStatus(recv_avg)))320 sent_avg, sent_unit = self.normalize(sent_avg)321 recv_avg, recv_unit = self.normalize(recv_avg)322 # message mount323 message = "Network bandwidth used: up %.1f%s - down %.1f%s" \324 % (sent_avg, sent_unit, recv_avg, recv_unit) + " |" \325 + self.formatMessage(self.sent, "sent", "") \326 + self.formatMessage(self.recv, "recv", "")327 return message, state328def parse_args():329 parser = argparse.ArgumentParser(description = "Fetches information for a Performance Reporter")330 parser.add_argument("--network_interface",331 required = False,332 help = "network interface to be monitored",333 dest = "network_interface",334 default = "eth0",335 metavar = "<network_interface>")336 parser.add_argument("--hostname",337 required = False,338 help = "name of the caller host",339 dest = "hostname",340 default = "`ifconfig | grep 'inet addr:'| grep -v '127.0.0.1' | cut -d: -f2 | awk '{ print $1}'`",341 metavar = "<hostname>")342 parser.add_argument("--send_rate",343 required = False,344 help = "set the interval in which the script will send data to the Nagios server, in seconds",345 dest = "send_rate",346 default = "2",347 metavar = "<send_rate>")348 parser.add_argument("--network-warning", required=False, default="40000",349 help="define the warning limit in kbps", dest="network_warning", 350 metavar="<network_warning>")351 parser.add_argument("--network-critical", required=False, default="70000", 352 help="define the critical limit in kbps", dest="network_critical", 353 metavar="<network_critical>")354 parser.add_argument("--cpu-warning", required=False, default="90", 355 help="define the warning limit in %", dest="cpu_warning", 356 metavar="<cpu_warning>")357 parser.add_argument("--cpu-critical", required=False, default="100", 358 help="define the critical limit in %", dest="cpu_critical", 359 metavar="<cpu_critical>")360 parser.add_argument("--memory-warning", required=False, default="70", 361 help="define the warning limit in %", dest="memory_warning", 362 metavar="<memory_warning>")363 parser.add_argument("--memory-critical", required=False, default="90", 364 help="define the critical limit in %", dest="memory_critical", 365 metavar="<memory_critical>")366 parser.add_argument("--disk-warning", required=False, default="80", 367 help="define the warning limit in %", dest="disk_warning", 368 metavar="<disk_warning>")369 parser.add_argument("--disk-critical", required=False, default="90", 370 help="define the critical limit in %", dest="disk_critical", 371 metavar="<disk_critical>")372 return parser.parse_args()373class Configuration:374 def __init__(self, args):375 self.network_interface = args.network_interface376 self.hostname = args.hostname377 self.send_rate = int(args.send_rate)378 self.network_warning = int(args.network_warning)379 self.network_critical = int(args.network_critical)380 self.cpu_warning = int(args.cpu_warning)381 self.cpu_critical = int(args.cpu_critical)382 self.memory_warning = int(args.memory_warning)383 self.memory_critical = int(args.memory_critical)384 self.disk_warning = int(args.disk_warning)385 self.disk_critical = int(args.disk_critical)386# http://stackoverflow.com/questions/1112343/how-do-i-capture-sigint-in-python387def signal_handler(signal, frame):388 print '\nYou pressed Ctrl+C!'389 sender.kill()390 for reporterThread in threadsList:391 reporterThread.kill()392 sender.join()393 for reporterThread in threadsList:394 reporterThread.join()395 sys.exit(0)396 397if __name__ == '__main__':398 threadsList = []399 400 config = Configuration(parse_args())401 402 # here we should have the main call to the reporter threads403 threadsList.append(NetworkReporter(config))404 threadsList.append(ProcessorReporter(config))405 threadsList.append(MemoryReporter(config))406 threadsList.append(DiskReporter(config))407 threadsList.append(MountedDisksReporter(config))408 #processesAnalyzer thread409# threadsList.append(processesAnalyzer(config))410 sender = Sender(config, threadsList)411 # start every thread412 for reporterThread in threadsList:413 reporterThread.start()414 sender.start()415 416 signal.signal(signal.SIGINT, signal_handler)417 print 'Press Ctrl+C'418 signal.pause()...
api.py
Source:api.py
1# -*- coding: utf-8 -*-2"""3This module contains the public API.4Assume you have a function for batched computation of nearest neighbors using brute-force distance calculation.5.. code-block:: python6 import torch7 def knn(x, y, batch_size, k: int = 3):8 return torch.cat(9 [10 torch.cdist(x[start : start + batch_size], y).topk(k=k, dim=1, largest=False).indices11 for start in range(0, x.shape[0], batch_size)12 ],13 dim=0,14 )15Using :func:`maximize_memory_utilization` you can decorate this function to reduce the batch size until no more16out-of-memory error occurs.17.. code-block:: python18 import torch19 from torch_max_mem import maximize_memory_utilization20 @maximize_memory_utilization()21 def knn(x, y, batch_size, k: int = 3):22 return torch.cat(23 [24 torch.cdist(x[start : start + batch_size], y).topk(k=k, dim=0, largest=False).indices25 for start in range(0, x.shape[0], batch_size)26 ],27 dim=0,28 )29In the code, you can now always pass the largest sensible batch size, e.g.,30.. code-block:: python31 x = torch.rand(100, 100, device="cuda")32 y = torch.rand(200, 100, device="cuda")33 knn(x, y, batch_size=x.shape[0])34"""35# cf. https://gist.github.com/mberr/c37a8068b38cabc98228db2cbe35804336import functools37import inspect38import itertools39import logging40from typing import Any, Callable, Collection, Mapping, MutableMapping, Optional, Tuple, TypeVar41import torch42logger = logging.getLogger(__name__)43__all__ = [44 "maximize_memory_utilization",45]46R = TypeVar("R")47def is_oom_error(error: RuntimeError) -> bool:48 """Check whether a runtime error was caused by insufficient memory."""49 if not error.args:50 logger.debug(f"Cannot check empty error message for {error}.")51 return False52 message = error.args[0]53 logger.debug(f"Checking error for OOM: {message}")54 # CUDA out of memory55 if "CUDA out of memory." in message:56 return True57 # CUDA error (dimension was larger than int limit)58 if "RuntimeError: CUDA error: invalid configuration argument" in message:59 return True60 # CPU out of memory61 if "DefaultCPUAllocator: can't allocate memory:" in message:62 return True63 return False64def maximize_memory_utilization_decorator(65 parameter_name: str = "batch_size",66 q: int = 32,67 cpu_warning: bool = True,68) -> Callable[[Callable[..., R]], Callable[..., Tuple[R, int]]]:69 """70 Create decorators to create methods for memory utilization maximization.71 :param parameter_name:72 The parameter name.73 :param q:74 Prefer multiples of q as size.75 :param cpu_warning:76 Whether to check the input for CPU tensors and warn about potential CPU OOM problems.77 :return:78 A decorator for functions.79 """80 if cpu_warning:81 def check_for_cpu_tensors(*args, **kwargs):82 """Check whether any tensor argument is on CPU."""83 if any(84 (torch.is_tensor(obj) and obj.device.type == "cpu")85 for obj in itertools.chain(args, kwargs.values())86 ):87 logger.warning(88 "Using maximize_memory_utilization on non-CUDA tensors. This may lead to "89 "undocumented crashes due to CPU OOM killer.",90 )91 else:92 def check_for_cpu_tensors(*args, **kwargs):93 """Skip checking whether any tensor argument is on CPU."""94 def decorator_maximize_memory_utilization(95 func: Callable[..., R],96 ) -> Callable[..., Tuple[R, int]]:97 """98 Decorate a function to maximize memory utilization.99 :param func:100 The function to decorate.101 :return:102 The decorated function.103 :raises ValueError:104 if the provided function does not contain a suitable parameter105 """106 # Input validation107 signature = inspect.signature(func)108 if parameter_name not in signature.parameters.keys():109 raise ValueError(f"{func} does not have a parameter {parameter_name}.")110 _parameter = signature.parameters[parameter_name]111 if _parameter.kind != inspect.Parameter.POSITIONAL_OR_KEYWORD:112 # TODO: we could also support positional ones by saving the position113 raise ValueError(f"{parameter_name} must be a keyword based parameter.")114 if _parameter.annotation != inspect.Parameter.empty and _parameter.annotation != int:115 logger.warning(116 f"Memory utilization maximization is written for integer parameters, but the "117 f"{parameter_name} is annotated as {_parameter.annotation}",118 )119 if _parameter.default != inspect.Parameter.empty:120 default_max_value = _parameter.default121 else:122 default_max_value = None123 @functools.wraps(func)124 def wrapper_maximize_memory_utilization(*args, **kwargs) -> Tuple[R, int]:125 """126 Wrap a function to maximize memory utilization by successive halving.127 :param args:128 The positional arguments.129 :param kwargs:130 The key-word based arguments.131 :return:132 A tuple (result, max_value).133 :raises RuntimeError:134 any runtime error which was not caused by (CUDA) OOM.135 :raises MemoryError:136 if the execution did not even succeed with the smallest parameter value137 :raises ValueError:138 if an invalid (or no) maximum parameter value is found139 """140 check_for_cpu_tensors(*args, **kwargs)141 max_value = kwargs.pop(parameter_name, default_max_value)142 if max_value is None:143 raise ValueError(144 f"Invalid maximum value for parameter {parameter_name}: {max_value}",145 )146 elif callable(max_value):147 max_value = max_value(*args, **kwargs)148 while max_value > 0:149 p_kwargs = {150 parameter_name: max_value,151 }152 try:153 return func(*args, **p_kwargs, **kwargs), max_value154 except RuntimeError as runtime_error:155 # clear cache156 torch.cuda.empty_cache()157 # check whether the error is an out-of-memory error158 if not is_oom_error(error=runtime_error):159 raise runtime_error160 logger.info(f"Execution failed with {parameter_name}={max_value}")161 max_value //= 2162 if max_value > q:163 max_value = max_value // q * q164 raise MemoryError(f"Execution did not even succeed with {parameter_name}=1.")165 return wrapper_maximize_memory_utilization166 return decorator_maximize_memory_utilization167class KeyHasher:168 """A hasher based on (a subset of) keys."""169 def __init__(self, keys: Optional[Collection[str]]) -> None:170 """171 Initialize the hasher.172 :param keys:173 the keys whose associated values should be used for hashing174 """175 self.keys = keys or []176 def __call__(self, kwargs: Mapping[str, Any]) -> int:177 """178 Calculate the hash based on the values associated with the selected keys.179 :param kwargs:180 the key-value dictionary181 :return:182 the hash of the tuple of values associated with the stored keys.183 """184 return hash(tuple(*(kwargs.get(key, None) for key in self.keys)))185class MemoryUtilizationMaximizer:186 """Stateful memory utilization maximizer."""187 def __init__(188 self,189 parameter_name: str = "batch_size",190 q: int = 32,191 cpu_warning: bool = True,192 hasher: Optional[Callable[[Mapping[str, Any]], int]] = None,193 keys: Optional[str] = None,194 ) -> None:195 """196 Initialize the stateful maximizer.197 :param parameter_name:198 The parameter name.199 :param q:200 Prefer multiples of q as size.201 :param cpu_warning:202 Whether to check the input for CPU tensors and warn about potential CPU OOM problems.203 :param hasher:204 a hashing function for separate parameter values depending on hash value; if None, use the same for all205 :param keys:206 the keys to use for creating a hasher. Only used if hasher is None.207 """208 self.parameter_name = parameter_name209 self.q = q210 self.cpu_warning = cpu_warning211 self.parameter_value: MutableMapping[int, int] = dict()212 if hasher is None:213 hasher = KeyHasher(keys=keys)214 self.hasher = hasher215 def __call__(self, func: Callable[..., R]) -> Callable[..., R]:216 """Wrap the function."""217 wrapped = maximize_memory_utilization_decorator(218 parameter_name=self.parameter_name,219 q=self.q,220 cpu_warning=self.cpu_warning,221 )(func)222 @functools.wraps(wrapped)223 def inner(*args, **kwargs):224 """Evaluate function with the stored parameter size."""225 h = self.hasher(kwargs)226 kwargs[self.parameter_name] = self.parameter_value.get(h) or kwargs[self.parameter_name]227 result, self.parameter_value[h] = wrapped(*args, **kwargs)228 return result229 return inner230# alias...
monitor.py
Source:monitor.py
1"""2[Unit]3Description=Monitor Server Ram, Cpu, Disk4After=multi-user.target5[Service]6Type=simple7ExecStart=/home/do/Desktop/file/service/.venv/bin/python3 /home/do/Desktop/file/service/monitor_disk_ram.py8ExecStop=/bin/kill -s QUIT $MAINPID9[Install]10WantedBy=multi-user.target11"""12from collections import namedtuple13import psutil, time, datetime, threading14from threading import Thread15RamInfo = namedtuple('RamInfo', ['total', 'available', 'used', 'free', 'ram_percent', 'ram_percent_txt', 'ram'])16def truncate(num, n):17 integer = int(num * (10 ** n)) / (10 ** n)18 return float(integer)19def current_milli_time():20 return round(time.time() * 1000)21def byte_to_gigabyte(byte):22 return byte / 1024 ** 323def get_ram_info():24 ram = psutil.virtual_memory()25 swap_ram = psutil.swap_memory()26 return {27 'ram_total': truncate(byte_to_gigabyte(ram.total), 2),28 'ram_available': truncate(byte_to_gigabyte(ram.available), 2),29 'ram_used': truncate(byte_to_gigabyte(ram.used), 2),30 'ram_free': truncate(byte_to_gigabyte(ram.free), 2),31 'ram_percent': ram.percent,32 'ram_percent_txt': str(ram.percent) + ' %',33 'swap_ram_total': truncate(byte_to_gigabyte(swap_ram.total), 2),34 'swap_ram_used': truncate(byte_to_gigabyte(swap_ram.used), 2),35 'swap_ram_free': truncate(byte_to_gigabyte(swap_ram.free), 2),36 'swap_ram_percent': swap_ram.percent,37 'swap_ram_percent_txt': str(swap_ram.percent) + ' %',38 'ram': ram,39 'swap_ram': swap_ram40 }41def get_disk_info():42 disk = psutil.disk_usage('/')43 return {44 'disk_total': truncate(byte_to_gigabyte(disk.total), 2),45 'disk_used': truncate(byte_to_gigabyte(disk.used), 2),46 'disk_free': truncate(byte_to_gigabyte(disk.free), 2),47 'disk_percent': disk.percent,48 'disk_percent_txt': str(disk.percent) + ' %',49 'disk': disk50 }51folder_save = '/home/do/Desktop/file/'52file_disk = folder_save + 'disk.txt'53file_ram = folder_save + 'ram.txt'54file_cpu = folder_save + 'cpu.txt'55repeat_time = 1 # thá»i gian monitor lặp lại [ram, disk, cpu_prenc56# Äo lÆ°á»ng mức Äá» sá» dụng tà i nguyên trung bình trong má»t khoảng thá»i gian mesure_workload (giây) Äá» ÄÆ°a ra cảnh báo57mesure_workload = 1058ram_warning = 5059disk_warning = 9060cpu_warning = 9061def start_monitor_ram_and_disk():62 list_avg_ram = []63 list_avg_disk = []64 with open(file_disk, 'w', encoding='utf-8') as f:65 f.write('disk_free,disk_used,percent\n')66 with open(file_ram, 'w', encoding='utf-8') as f:67 f.write('ram_free,ram_used,ram_percent\n')68 while True:69 if len(list_avg_disk) >= mesure_workload / repeat_time:70 avg_ram = sum(list_avg_ram) / len(list_avg_ram)71 avg_disk = sum(list_avg_disk) / len(list_avg_disk)72 print(avg_ram)73 print(avg_disk)74 if avg_ram > ram_warning:75 # TODO: warning ram overload76 print(f'Ram overload {datetime.datetime.now()}')77 if avg_disk > disk_warning:78 # TODO: warning disk overload79 print(f'Disk overload {datetime.datetime.now()}')80 list_avg_ram.clear()81 list_avg_disk.clear()82 disk = get_disk_info()83 list_avg_disk.append(disk['disk_percent'])84 result = str(disk['disk_free']) + ',' + str(disk['disk_used']) + ',' + str(disk['disk_percent']) + '\n'85 with open(file_disk, 'a', encoding='utf-8') as f:86 f.write(result)87 print('monitor disk', result)88 print(threading.current_thread())89 ram = get_ram_info()90 list_avg_ram.append(ram['ram_percent'])91 result = str(ram['ram_free']) + ',' + str(ram['ram_used']) + ',' + str(ram['ram_percent']) + '\n'92 with open(file_ram, 'a', encoding='utf-8') as f:93 f.write(result)94 print('monitor ram', result)95 time.sleep(repeat_time)96def start_monitor_cpu():97 with open(file_cpu, 'w', encoding='utf-8') as f:98 f.write('percent\n')99 while True:100 percent = psutil.cpu_percent(interval=repeat_time)101 with open(file_cpu, 'a', encoding='utf-8') as f:102 f.write(str(percent) + '\n')103 print('monitor cpu:', percent)104 print(threading.current_thread())105def start_montior_warning_cpu():106 while True:107 percent = psutil.cpu_percent(interval=mesure_workload)108 if percent >= cpu_warning:109 # TODO: Warning110 print(f'Disk overload {datetime.datetime.now()}')111 print('warning cpu:', percent)112if __name__ == '__main__':113 thread_ram_disk = threading.Thread(target=start_monitor_ram_and_disk, name='Thread Monitor Ram Disk')114 thread_monitor_cpu = Thread(target=start_monitor_cpu, name='Thread Monitor Cpu')115 thread_warning_cpu = Thread(target=start_montior_warning_cpu, name='Thread Warning Cpu')116 thread_ram_disk.start()117 thread_monitor_cpu.start()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!