How to use the request_stats method in Locust

Best Python code snippet using locust
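
Before diving into the snippets below, note that Locust itself exposes per-request statistics through `environment.stats`, a `RequestStats` object whose `.total` attribute aggregates every request. A minimal sketch of reading it from Locust's event hooks, assuming Locust 2.x; the user class, host, and print format here are illustrative and not part of any snippet below:

```python
from locust import HttpUser, task, events

class QuickUser(HttpUser):
    host = "https://example.com"  # hypothetical target host

    @task
    def index(self):
        self.client.get("/")

# environment.stats is Locust's RequestStats aggregate; .total is the
# StatsEntry summed over all request names and methods
@events.quitting.add_listener
def report_stats(environment, **kwargs):
    total = environment.stats.total
    print("requests: %d, failures: %d, avg: %.1f ms" % (
        total.num_requests, total.num_failures, total.avg_response_time))
```

The `quitting` hook fires as the test run ends, so the totals printed there cover the whole run.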

server.py

Source: server.py (GitHub)



```python
# Copyright (C) 2019 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#
from contextlib import closing
from datetime import datetime
import asyncio
import json
import logging
import math
import os
import signal
import socket
import time
from . import chunkify, DEFAULT_MAX_CHUNK

logger = logging.getLogger('hashserv.server')


class Measurement(object):
    def __init__(self, sample):
        self.sample = sample

    def start(self):
        self.start_time = time.perf_counter()

    def end(self):
        self.sample.add(time.perf_counter() - self.start_time)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args, **kwargs):
        self.end()


class Sample(object):
    def __init__(self, stats):
        self.stats = stats
        self.num_samples = 0
        self.elapsed = 0

    def measure(self):
        return Measurement(self)

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.end()

    def add(self, elapsed):
        self.num_samples += 1
        self.elapsed += elapsed

    def end(self):
        if self.num_samples:
            self.stats.add(self.elapsed)
            self.num_samples = 0
            self.elapsed = 0


class Stats(object):
    def __init__(self):
        self.reset()

    def reset(self):
        self.num = 0
        self.total_time = 0
        self.max_time = 0
        self.m = 0
        self.s = 0
        self.current_elapsed = None

    def add(self, elapsed):
        self.num += 1
        if self.num == 1:
            self.m = elapsed
            self.s = 0
        else:
            last_m = self.m
            self.m = last_m + (elapsed - last_m) / self.num
            self.s = self.s + (elapsed - last_m) * (elapsed - self.m)

        self.total_time += elapsed

        if self.max_time < elapsed:
            self.max_time = elapsed

    def start_sample(self):
        return Sample(self)

    @property
    def average(self):
        if self.num == 0:
            return 0
        return self.total_time / self.num

    @property
    def stdev(self):
        if self.num <= 1:
            return 0
        return math.sqrt(self.s / (self.num - 1))

    def todict(self):
        return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}


class ClientError(Exception):
    pass


class ServerClient(object):
    FAST_QUERY = 'SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'
    ALL_QUERY = 'SELECT * FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1'

    def __init__(self, reader, writer, db, request_stats):
        self.reader = reader
        self.writer = writer
        self.db = db
        self.request_stats = request_stats
        self.max_chunk = DEFAULT_MAX_CHUNK

        self.handlers = {
            'get': self.handle_get,
            'report': self.handle_report,
            'report-equiv': self.handle_equivreport,
            'get-stream': self.handle_get_stream,
            'get-stats': self.handle_get_stats,
            'reset-stats': self.handle_reset_stats,
            'chunk-stream': self.handle_chunk,
        }

    async def process_requests(self):
        try:
            self.addr = self.writer.get_extra_info('peername')
            logger.debug('Client %r connected' % (self.addr,))

            # Read protocol and version
            protocol = await self.reader.readline()
            if protocol is None:
                return

            (proto_name, proto_version) = protocol.decode('utf-8').rstrip().split()
            if proto_name != 'OEHASHEQUIV':
                return

            proto_version = tuple(int(v) for v in proto_version.split('.'))
            if proto_version < (1, 0) or proto_version > (1, 1):
                return

            # Read headers. Currently, no headers are implemented, so look for
            # an empty line to signal the end of the headers
            while True:
                line = await self.reader.readline()
                if line is None:
                    return

                line = line.decode('utf-8').rstrip()
                if not line:
                    break

            # Handle messages
            while True:
                d = await self.read_message()
                if d is None:
                    break
                await self.dispatch_message(d)
                await self.writer.drain()
        except ClientError as e:
            logger.error(str(e))
        finally:
            self.writer.close()

    async def dispatch_message(self, msg):
        for k in self.handlers.keys():
            if k in msg:
                logger.debug('Handling %s' % k)
                if 'stream' in k:
                    await self.handlers[k](msg[k])
                else:
                    with self.request_stats.start_sample() as self.request_sample, \
                            self.request_sample.measure():
                        await self.handlers[k](msg[k])
                return

        raise ClientError("Unrecognized command %r" % msg)

    def write_message(self, msg):
        for c in chunkify(json.dumps(msg), self.max_chunk):
            self.writer.write(c.encode('utf-8'))

    async def read_message(self):
        l = await self.reader.readline()
        if not l:
            return None

        try:
            message = l.decode('utf-8')

            if not message.endswith('\n'):
                return None

            return json.loads(message)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            logger.error('Bad message from client: %r' % message)
            raise e

    async def handle_chunk(self, request):
        lines = []
        try:
            while True:
                l = await self.reader.readline()
                l = l.rstrip(b"\n").decode("utf-8")
                if not l:
                    break
                lines.append(l)

            msg = json.loads(''.join(lines))
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # Original referenced an undefined 'message' here; log the raw lines instead
            logger.error('Bad message from client: %r' % ''.join(lines))
            raise e

        if 'chunk-stream' in msg:
            raise ClientError("Nested chunks are not allowed")

        await self.dispatch_message(msg)

    async def handle_get(self, request):
        method = request['method']
        taskhash = request['taskhash']

        if request.get('all', False):
            row = self.query_equivalent(method, taskhash, self.ALL_QUERY)
        else:
            row = self.query_equivalent(method, taskhash, self.FAST_QUERY)

        if row is not None:
            # Original passed a single tuple for two %s placeholders; pass two args
            logger.debug('Found equivalent task %s -> %s', row['taskhash'], row['unihash'])

            d = {k: row[k] for k in row.keys()}
            self.write_message(d)
        else:
            self.write_message(None)

    async def handle_get_stream(self, request):
        self.write_message('ok')

        while True:
            l = await self.reader.readline()
            if not l:
                return

            try:
                # This inner loop is very sensitive and must be as fast as
                # possible (which is why the request sample is handled manually
                # instead of using 'with', and also why logging statements are
                # commented out).
                self.request_sample = self.request_stats.start_sample()
                request_measure = self.request_sample.measure()
                request_measure.start()

                l = l.decode('utf-8').rstrip()
                if l == 'END':
                    self.writer.write('ok\n'.encode('utf-8'))
                    return

                (method, taskhash) = l.split()
                #logger.debug('Looking up %s %s' % (method, taskhash))
                row = self.query_equivalent(method, taskhash, self.FAST_QUERY)
                if row is not None:
                    msg = ('%s\n' % row['unihash']).encode('utf-8')
                    #logger.debug('Found equivalent task %s -> %s', row['taskhash'], row['unihash'])
                else:
                    msg = '\n'.encode('utf-8')

                self.writer.write(msg)
            finally:
                request_measure.end()
                self.request_sample.end()

            await self.writer.drain()

    async def handle_report(self, data):
        with closing(self.db.cursor()) as cursor:
            cursor.execute('''
                -- Find tasks with a matching outhash (that is, tasks that
                -- are equivalent)
                SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND outhash=:outhash

                -- If there is an exact match on the taskhash, return it.
                -- Otherwise return the oldest matching outhash of any
                -- taskhash
                ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END,
                    created ASC

                -- Only return one row
                LIMIT 1
                ''', {k: data[k] for k in ('method', 'outhash', 'taskhash')})

            row = cursor.fetchone()

            # If no matching outhash was found, or one *was* found but it
            # wasn't an exact match on the taskhash, a new entry for this
            # taskhash should be added
            if row is None or row['taskhash'] != data['taskhash']:
                # If a row matching the outhash was found, the unihash for
                # the new taskhash should be the same as that one.
                # Otherwise the caller provided unihash is used.
                unihash = data['unihash']
                if row is not None:
                    unihash = row['unihash']

                insert_data = {
                    'method': data['method'],
                    'outhash': data['outhash'],
                    'taskhash': data['taskhash'],
                    'unihash': unihash,
                    'created': datetime.now()
                }

                for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
                    if k in data:
                        insert_data[k] = data[k]

                cursor.execute('''INSERT INTO tasks_v2 (%s) VALUES (%s)''' % (
                    ', '.join(sorted(insert_data.keys())),
                    ', '.join(':' + k for k in sorted(insert_data.keys()))),
                    insert_data)

                self.db.commit()

                logger.info('Adding taskhash %s with unihash %s',
                            data['taskhash'], unihash)

                d = {
                    'taskhash': data['taskhash'],
                    'method': data['method'],
                    'unihash': unihash
                }
            else:
                d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}

        self.write_message(d)

    async def handle_equivreport(self, data):
        with closing(self.db.cursor()) as cursor:
            insert_data = {
                'method': data['method'],
                'outhash': "",
                'taskhash': data['taskhash'],
                'unihash': data['unihash'],
                'created': datetime.now()
            }

            for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
                if k in data:
                    insert_data[k] = data[k]

            cursor.execute('''INSERT OR IGNORE INTO tasks_v2 (%s) VALUES (%s)''' % (
                ', '.join(sorted(insert_data.keys())),
                ', '.join(':' + k for k in sorted(insert_data.keys()))),
                insert_data)

            self.db.commit()

            # Fetch the unihash that will be reported for the taskhash. If the
            # unihash matches, it means this row was inserted (or the mapping
            # was already valid)
            row = self.query_equivalent(data['method'], data['taskhash'], self.FAST_QUERY)

            if row['unihash'] == data['unihash']:
                logger.info('Adding taskhash equivalence for %s with unihash %s',
                            data['taskhash'], row['unihash'])

            d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}

        self.write_message(d)

    async def handle_get_stats(self, request):
        d = {
            'requests': self.request_stats.todict(),
        }

        self.write_message(d)

    async def handle_reset_stats(self, request):
        d = {
            'requests': self.request_stats.todict(),
        }

        self.request_stats.reset()
        self.write_message(d)

    def query_equivalent(self, method, taskhash, query):
        # This is part of the inner loop and must be as fast as possible
        try:
            cursor = self.db.cursor()
            cursor.execute(query, {'method': method, 'taskhash': taskhash})
            return cursor.fetchone()
        except:
            cursor.close()


class Server(object):
    def __init__(self, db, loop=None):
        self.request_stats = Stats()
        self.db = db

        if loop is None:
            self.loop = asyncio.new_event_loop()
            self.close_loop = True
        else:
            self.loop = loop
            self.close_loop = False

        self._cleanup_socket = None

    def start_tcp_server(self, host, port):
        self.server = self.loop.run_until_complete(
            asyncio.start_server(self.handle_client, host, port, loop=self.loop)
        )

        for s in self.server.sockets:
            logger.info('Listening on %r' % (s.getsockname(),))
            # Newer python does this automatically. Do it manually here for
            # maximum compatibility
            s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
            s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1)

        name = self.server.sockets[0].getsockname()
        if self.server.sockets[0].family == socket.AF_INET6:
            self.address = "[%s]:%d" % (name[0], name[1])
        else:
            self.address = "%s:%d" % (name[0], name[1])

    def start_unix_server(self, path):
        def cleanup():
            os.unlink(path)

        cwd = os.getcwd()
        try:
            # Work around path length limits in AF_UNIX
            os.chdir(os.path.dirname(path))
            self.server = self.loop.run_until_complete(
                asyncio.start_unix_server(self.handle_client, os.path.basename(path), loop=self.loop)
            )
        finally:
            os.chdir(cwd)

        logger.info('Listening on %r' % path)

        self._cleanup_socket = cleanup
        self.address = "unix://%s" % os.path.abspath(path)

    async def handle_client(self, reader, writer):
        # writer.transport.set_write_buffer_limits(0)
        try:
            client = ServerClient(reader, writer, self.db, self.request_stats)
            await client.process_requests()
        except Exception as e:
            import traceback
            logger.error('Error from client: %s' % str(e), exc_info=True)
            traceback.print_exc()
            writer.close()
        logger.info('Client disconnected')

    def serve_forever(self):
        def signal_handler():
            self.loop.stop()

        self.loop.add_signal_handler(signal.SIGTERM, signal_handler)

        try:
            self.loop.run_forever()
        except KeyboardInterrupt:
            pass

        self.server.close()
        self.loop.run_until_complete(self.server.wait_closed())
        logger.info('Server shutting down')

        if self.close_loop:
            self.loop.close()

        if self._cleanup_socket is not None:
            ...
```


stats.py

Source: stats.py (GitHub)


```python
from flask import current_app
from app.server.util.utils import timestamp

# We use a list to calculate requests per second
request_stats = []

def add_request():
    t = timestamp()
    while len(request_stats) > 0 and request_stats[0] < t - current_app.config['REQUEST_STATS_WINDOW']:
        del request_stats[0]
    request_stats.append(t)

def requests_per_second():
    ...
```
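
Here `request_stats` is simply a list of request timestamps pruned to a sliding window, so requests per second is presumably the list length divided by the window size (the function body is truncated above). A self-contained sketch of the same sliding-window idea, with a `deque` in place of a plain list and a hard-coded window standing in for the Flask config value; the 15-second window and the names are assumptions:

```python
import time
from collections import deque

REQUEST_STATS_WINDOW = 15  # seconds; hypothetical stand-in for app config

request_times = deque()

def add_request():
    now = time.time()
    # Evict timestamps that have aged out of the window
    while request_times and request_times[0] < now - REQUEST_STATS_WINDOW:
        request_times.popleft()
    request_times.append(now)

def requests_per_second():
    # Requests recorded in the window, averaged over the window length
    return len(request_times) / REQUEST_STATS_WINDOW
```

Call `add_request()` once per handled request and poll `requests_per_second()` from a stats endpoint. A `deque` makes each eviction O(1), whereas `del request_stats[0]` on a plain list shifts every remaining element.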


Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Locust automation tests on the LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 minutes of automation testing FREE!!

