Best Python code snippet using locust
runners.py
Source: runners.py
1# -*- coding: utf-8 -*-2import functools3import json4import logging5import os6import re7import socket8import sys9import time10import traceback11from collections import defaultdict12from collections.abc import MutableMapping13from operator import (14 itemgetter,15 methodcaller,16)17from typing import (18 Dict,19 Iterator,20 List,21 Union,22 ValuesView,23)24from uuid import uuid425import gevent26import greenlet27import psutil28from gevent.pool import Group29from . import User30from locust import __version__31from .dispatch import UsersDispatcher32from .exception import RPCError33from .log import greenlet_exception_logger34from .rpc import (35 Message,36 rpc,37)38from .stats import (39 RequestStats,40 setup_distributed_stats_event_listeners,41)42from . import argument_parser43logger = logging.getLogger(__name__)44STATE_INIT, STATE_SPAWNING, STATE_RUNNING, STATE_CLEANUP, STATE_STOPPING, STATE_STOPPED, STATE_MISSING = [45 "ready",46 "spawning",47 "running",48 "cleanup",49 "stopping",50 "stopped",51 "missing",52]53WORKER_REPORT_INTERVAL = 3.054CPU_MONITOR_INTERVAL = 5.055HEARTBEAT_INTERVAL = 156HEARTBEAT_LIVENESS = 357FALLBACK_INTERVAL = 558greenlet_exception_handler = greenlet_exception_logger(logger)59class Runner:60 """61 Orchestrates the load test by starting and stopping the users.62 Use one of the :meth:`create_local_runner <locust.env.Environment.create_local_runner>`,63 :meth:`create_master_runner <locust.env.Environment.create_master_runner>` or64 :meth:`create_worker_runner <locust.env.Environment.create_worker_runner>` methods on65 the :class:`Environment <locust.env.Environment>` instance to create a runner of the66 desired type.67 """68 def __init__(self, environment):69 self.environment = environment70 self.user_greenlets = Group()71 self.greenlet = Group()72 self.state = STATE_INIT73 self.spawning_greenlet = None74 self.shape_greenlet = None75 self.shape_last_state = None76 self.current_cpu_usage = 077 self.cpu_warning_emitted = False78 self.worker_cpu_warning_emitted = False79 self.greenlet.spawn(self.monitor_cpu).link_exception(greenlet_exception_handler)80 self.exceptions = {}81 self.target_user_classes_count: Dict[str, int] = {}82 self.custom_messages = {}83 # Only when running in standalone mode (non-distributed)84 self._local_worker_node = WorkerNode(id="local")85 self._local_worker_node.user_classes_count = self.user_classes_count86 self._users_dispatcher = None87 # set up event listeners for recording requests88 def on_request_success(request_type, name, response_time, response_length, **_kwargs):89 self.stats.log_request(request_type, name, response_time, response_length)90 def on_request_failure(request_type, name, response_time, response_length, exception, **_kwargs):91 self.stats.log_request(request_type, name, response_time, response_length)92 self.stats.log_error(request_type, name, exception)93 # temporarily set log level to ignore warnings to suppress deprication message94 loglevel = logging.getLogger().level95 logging.getLogger().setLevel(logging.ERROR)96 self.environment.events.request_success.add_listener(on_request_success)97 self.environment.events.request_failure.add_listener(on_request_failure)98 logging.getLogger().setLevel(loglevel)99 self.connection_broken = False100 # register listener that resets stats when spawning is complete101 def on_spawning_complete(user_count):102 self.update_state(STATE_RUNNING)103 if environment.reset_stats:104 logger.info("Resetting stats\n")105 self.stats.reset_all()106 
self.environment.events.spawning_complete.add_listener(on_spawning_complete)107 def __del__(self):108 # don't leave any stray greenlets if runner is removed109 if self.greenlet and len(self.greenlet) > 0:110 self.greenlet.kill(block=False)111 @property112 def user_classes(self):113 return self.environment.user_classes114 @property115 def user_classes_by_name(self):116 return self.environment.user_classes_by_name117 @property118 def stats(self) -> RequestStats:119 return self.environment.stats120 @property121 def errors(self):122 return self.stats.errors123 @property124 def user_count(self):125 """126 :returns: Number of currently running users127 """128 return len(self.user_greenlets)129 @property130 def user_classes_count(self) -> Dict[str, int]:131 """132 :returns: Number of currently running users for each user class133 """134 user_classes_count = {user_class.__name__: 0 for user_class in self.user_classes}135 for user_greenlet in self.user_greenlets:136 try:137 user = user_greenlet.args[0]138 except IndexError:139 # TODO: Find out why args is sometimes empty. In gevent code,140 # the supplied args are cleared in the gevent.greenlet.Greenlet.__free,141 # so it seems a good place to start investigating. My suspicion is that142 # the supplied args are emptied whenever the greenlet is dead, so we can143 # simply ignore the greenlets with empty args.144 logger.debug(145 "ERROR: While calculating number of running users, we encountered a user that didnt have proper args %s (user_greenlet.dead=%s)",146 user_greenlet,147 user_greenlet.dead,148 )149 continue150 user_classes_count[user.__class__.__name__] += 1151 return user_classes_count152 def update_state(self, new_state):153 """154 Updates the current state155 """156 # I (cyberwiz) commented out this logging, because it is too noisy even for debug level157 # Uncomment it if you are specifically debugging state transitions158 # logger.debug("Updating state to '%s', old state was '%s'" % (new_state, self.state))159 self.state = new_state160 def cpu_log_warning(self):161 """Called at the end of the test to repeat the warning & return the status"""162 if self.cpu_warning_emitted:163 logger.warning(164 "CPU usage was too high at some point during the test! 
See https://docs.locust.io/en/stable/running-locust-distributed.html for how to distribute the load over multiple CPU cores or machines"165 )166 return True167 return False168 def spawn_users(self, user_classes_spawn_count: Dict[str, int], wait: bool = False):169 if self.state == STATE_INIT or self.state == STATE_STOPPED:170 self.update_state(STATE_SPAWNING)171 logger.debug(172 "Spawning additional %s (%s already running)..."173 % (json.dumps(user_classes_spawn_count), json.dumps(self.user_classes_count))174 )175 def spawn(user_class: str, spawn_count: int):176 n = 0177 while n < spawn_count:178 new_user = self.user_classes_by_name[user_class](self.environment)179 new_user.start(self.user_greenlets)180 n += 1181 if n % 10 == 0 or n == spawn_count:182 logger.debug("%i users spawned" % self.user_count)183 logger.debug("All users of class %s spawned" % user_class)184 for user_class, spawn_count in user_classes_spawn_count.items():185 spawn(user_class, spawn_count)186 if wait:187 self.user_greenlets.join()188 logger.info("All users stopped\n")189 def stop_users(self, user_classes_stop_count: Dict[str, int]):190 async_calls_to_stop = Group()191 stop_group = Group()192 for user_class, stop_count in user_classes_stop_count.items():193 if self.user_classes_count[user_class] == 0:194 continue195 to_stop = []196 for user_greenlet in self.user_greenlets:197 if len(to_stop) == stop_count:198 break199 try:200 user = user_greenlet.args[0]201 except IndexError:202 logger.error(203 "While stopping users, we encountered a user that didnt have proper args %s", user_greenlet204 )205 continue206 if isinstance(user, self.user_classes_by_name[user_class]):207 to_stop.append(user)208 if not to_stop:209 continue210 while True:211 user_to_stop: User = to_stop.pop()212 logger.debug("Stopping %s" % user_to_stop.greenlet.name)213 if user_to_stop.greenlet is greenlet.getcurrent():214 # User called runner.quit(), so don't block waiting for killing to finish215 user_to_stop.group.killone(user_to_stop.greenlet, block=False)216 elif self.environment.stop_timeout:217 async_calls_to_stop.add(gevent.spawn_later(0, user_to_stop.stop, force=False))218 stop_group.add(user_to_stop.greenlet)219 else:220 async_calls_to_stop.add(gevent.spawn_later(0, user_to_stop.stop, force=True))221 if not to_stop:222 break223 async_calls_to_stop.join()224 if not stop_group.join(timeout=self.environment.stop_timeout):225 logger.info(226 "Not all users finished their tasks & terminated in %s seconds. Stopping them..."227 % self.environment.stop_timeout228 )229 stop_group.kill(block=True)230 logger.debug(231 "%g users have been stopped, %g still running", sum(user_classes_stop_count.values()), self.user_count232 )233 def monitor_cpu(self):234 process = psutil.Process()235 while True:236 self.current_cpu_usage = process.cpu_percent()237 if self.current_cpu_usage > 90 and not self.cpu_warning_emitted:238 logging.warning(239 "CPU usage above 90%! This may constrain your throughput and may even give inconsistent response time measurements! 
See https://docs.locust.io/en/stable/running-locust-distributed.html for how to distribute the load over multiple CPU cores or machines"240 )241 self.cpu_warning_emitted = True242 gevent.sleep(CPU_MONITOR_INTERVAL)243 def start(self, user_count: int, spawn_rate: float, wait: bool = False):244 """245 Start running a load test246 :param user_count: Total number of users to start247 :param spawn_rate: Number of users to spawn per second248 :param wait: If True calls to this method will block until all users are spawned.249 If False (the default), a greenlet that spawns the users will be250 started and the call to this method will return immediately.251 """252 if self.state != STATE_RUNNING and self.state != STATE_SPAWNING:253 self.stats.clear_all()254 self.exceptions = {}255 self.cpu_warning_emitted = False256 self.worker_cpu_warning_emitted = False257 self.environment.events.test_start.fire(environment=self.environment)258 if wait and user_count - self.user_count > spawn_rate:259 raise ValueError("wait is True but the amount of users to add is greater than the spawn rate")260 for user_class in self.user_classes:261 if self.environment.host is not None:262 user_class.host = self.environment.host263 if self.state != STATE_INIT and self.state != STATE_STOPPED:264 self.update_state(STATE_SPAWNING)265 if self._users_dispatcher is None:266 self._users_dispatcher = UsersDispatcher(267 worker_nodes=[self._local_worker_node], user_classes=self.user_classes268 )269 logger.info("Ramping to %d users at a rate of %.2f per second" % (user_count, spawn_rate))270 self._users_dispatcher.new_dispatch(user_count, spawn_rate)271 try:272 for dispatched_users in self._users_dispatcher:273 user_classes_spawn_count = {}274 user_classes_stop_count = {}275 user_classes_count = dispatched_users[self._local_worker_node.id]276 logger.debug("Ramping to %s" % _format_user_classes_count_for_log(user_classes_count))277 for user_class, user_class_count in user_classes_count.items():278 if self.user_classes_count[user_class] > user_class_count:279 user_classes_stop_count[user_class] = self.user_classes_count[user_class] - user_class_count280 elif self.user_classes_count[user_class] < user_class_count:281 user_classes_spawn_count[user_class] = user_class_count - self.user_classes_count[user_class]282 if wait:283 # spawn_users will block, so we need to call stop_users first284 self.stop_users(user_classes_stop_count)285 self.spawn_users(user_classes_spawn_count, wait)286 else:287 # call spawn_users before stopping the users since stop_users288 # can be blocking because of the stop_timeout289 self.spawn_users(user_classes_spawn_count, wait)290 self.stop_users(user_classes_stop_count)291 self._local_worker_node.user_classes_count = next(iter(dispatched_users.values()))292 except KeyboardInterrupt:293 # TODO: Find a cleaner way to handle that294 # We need to catch keyboard interrupt. Otherwise, if KeyboardInterrupt is received while in295 # a gevent.sleep inside the dispatch_users function, locust won't gracefully shutdown.296 self.quit()297 logger.info("All users spawned: %s" % _format_user_classes_count_for_log(self.user_classes_count))298 self.environment.events.spawning_complete.fire(user_count=sum(self.target_user_classes_count.values()))299 def start_shape(self):300 if self.shape_greenlet:301 logger.info("There is an ongoing shape test running. Editing is disabled")302 return303 logger.info("Shape test starting. 
User count and spawn rate are ignored for this type of load test")304 self.update_state(STATE_INIT)305 self.shape_greenlet = self.greenlet.spawn(self.shape_worker)306 self.shape_greenlet.link_exception(greenlet_exception_handler)307 self.environment.shape_class.reset_time()308 def shape_worker(self):309 logger.info("Shape worker starting")310 while self.state == STATE_INIT or self.state == STATE_SPAWNING or self.state == STATE_RUNNING:311 new_state = self.environment.shape_class.tick()312 if new_state is None:313 logger.info("Shape test stopping")314 if self.environment.parsed_options and self.environment.parsed_options.headless:315 self.quit()316 else:317 self.stop()318 self.shape_greenlet = None319 self.shape_last_state = None320 return321 elif self.shape_last_state == new_state:322 gevent.sleep(1)323 else:324 user_count, spawn_rate = new_state325 logger.info("Shape test updating to %d users at %.2f spawn rate" % (user_count, spawn_rate))326 # TODO: This `self.start()` call is blocking until the ramp-up is completed. This can leads327 # to unexpected behaviours such as the one in the following example:328 # A load test shape has the following stages:329 # stage 1: (user_count=100, spawn_rate=1) for t < 50s330 # stage 2: (user_count=120, spawn_rate=1) for t < 100s331 # stage 3: (user_count=130, spawn_rate=1) for t < 120s332 # Because the first stage will take 100s to complete, the second stage333 # will be skipped completely because the shape worker will be blocked334 # at the `self.start()` of the first stage.335 # Of couse, this isn't a problem if the load test shape is well-defined.336 # We should probably use a `gevent.timeout` with a duration a little over337 # `(user_count - prev_user_count) / spawn_rate` in order to limit the runtime338 # of each load test shape stage.339 self.start(user_count=user_count, spawn_rate=spawn_rate)340 self.shape_last_state = new_state341 def stop(self):342 """343 Stop a running load test by stopping all running users344 """345 if self.state == STATE_STOPPED:346 return347 logger.debug("Stopping all users")348 self.update_state(STATE_CLEANUP)349 # if we are currently spawning users we need to kill the spawning greenlet first350 if self.spawning_greenlet and not self.spawning_greenlet.ready():351 self.spawning_greenlet.kill(block=True)352 if self.environment.shape_class is not None and self.shape_greenlet is not greenlet.getcurrent():353 # If the test was not started yet and locust is354 # stopped/quit, shape_greenlet will be None.355 if self.shape_greenlet is not None:356 self.shape_greenlet.kill(block=True)357 self.shape_greenlet = None358 self.shape_last_state = None359 self.stop_users(self.user_classes_count)360 self.update_state(STATE_STOPPED)361 self.cpu_log_warning()362 self.environment.events.test_stop.fire(environment=self.environment)363 def quit(self):364 """365 Stop any running load test and kill all greenlets for the runner366 """367 self.stop()368 self.greenlet.kill(block=True)369 def log_exception(self, node_id, msg, formatted_tb):370 key = hash(formatted_tb)371 row = self.exceptions.setdefault(key, {"count": 0, "msg": msg, "traceback": formatted_tb, "nodes": set()})372 row["count"] += 1373 row["nodes"].add(node_id)374 self.exceptions[key] = row375 @property376 def target_user_count(self) -> int:377 return sum(self.target_user_classes_count.values())378 def register_message(self, msg_type, listener):379 """380 Register a listener for a custom message from another node381 :param msg_type: The type of the message to listen for382 :param 
listener: The function to execute when the message is received383 """384 self.custom_messages[msg_type] = listener385class LocalRunner(Runner):386 """387 Runner for running single process load test388 """389 def __init__(self, environment):390 """391 :param environment: Environment instance392 """393 super().__init__(environment)394 # register listener thats logs the exception for the local runner395 def on_user_error(user_instance, exception, tb):396 formatted_tb = "".join(traceback.format_tb(tb))397 self.log_exception("local", str(exception), formatted_tb)398 self.environment.events.user_error.add_listener(on_user_error)399 def start(self, user_count: int, spawn_rate: float, wait: bool = False):400 if spawn_rate > 100:401 logger.warning(402 "Your selected spawn rate is very high (>100), and this is known to sometimes cause issues. Do you really need to ramp up that fast?"403 )404 if self.spawning_greenlet:405 # kill existing spawning_greenlet before we start a new one406 self.spawning_greenlet.kill(block=True)407 self.spawning_greenlet = self.greenlet.spawn(408 lambda: super(LocalRunner, self).start(user_count, spawn_rate, wait=wait)409 )410 self.spawning_greenlet.link_exception(greenlet_exception_handler)411 def stop(self):412 if self.state == STATE_STOPPED:413 return414 super().stop()415 def send_message(self, msg_type, data=None):416 """417 Emulates internodal messaging by calling registered listeners418 :param msg_type: The type of the message to emulate sending419 :param data: Optional data to include420 """421 logger.debug(f"Running locally: sending {msg_type} message to self")422 if msg_type in self.custom_messages:423 listener = self.custom_messages[msg_type]424 msg = Message(msg_type, data, "local")425 listener(environment=self.environment, msg=msg)426 else:427 logger.warning(f"Unknown message type recieved: {msg_type}")428class DistributedRunner(Runner):429 def __init__(self, *args, **kwargs):430 super().__init__(*args, **kwargs)431 self._local_worker_node = None432 setup_distributed_stats_event_listeners(self.environment.events, self.stats)433class WorkerNode:434 def __init__(self, id: str, state=STATE_INIT, heartbeat_liveness=HEARTBEAT_LIVENESS):435 self.id: str = id436 self.state = state437 self.heartbeat = heartbeat_liveness438 self.cpu_usage = 0439 self.cpu_warning_emitted = False440 # The reported users running on the worker441 self.user_classes_count: Dict[str, int] = {}442 @property443 def user_count(self) -> int:444 return sum(self.user_classes_count.values())445class WorkerNodes(MutableMapping):446 def __init__(self):447 self._worker_nodes = {}448 def get_by_state(self, state) -> List[WorkerNode]:449 return [c for c in self.values() if c.state == state]450 @property451 def all(self) -> ValuesView[WorkerNode]:452 return self.values()453 @property454 def ready(self) -> List[WorkerNode]:455 return self.get_by_state(STATE_INIT)456 @property457 def spawning(self) -> List[WorkerNode]:458 return self.get_by_state(STATE_SPAWNING)459 @property460 def running(self) -> List[WorkerNode]:461 return self.get_by_state(STATE_RUNNING)462 @property463 def missing(self) -> List[WorkerNode]:464 return self.get_by_state(STATE_MISSING)465 def __setitem__(self, k: str, v: WorkerNode) -> None:466 self._worker_nodes[k] = v467 def __delitem__(self, k: str) -> None:468 del self._worker_nodes[k]469 def __getitem__(self, k: str) -> WorkerNode:470 return self._worker_nodes[k]471 def __len__(self) -> int:472 return len(self._worker_nodes)473 def __iter__(self) -> Iterator[WorkerNode]:474 return 
iter(self._worker_nodes)475class MasterRunner(DistributedRunner):476 """477 Runner used to run distributed load tests across multiple processes and/or machines.478 MasterRunner doesn't spawn any user greenlets itself. Instead it expects479 :class:`WorkerRunners <WorkerRunner>` to connect to it, which it will then direct480 to start and stop user greenlets. Stats sent back from the481 :class:`WorkerRunners <WorkerRunner>` will aggregated.482 """483 def __init__(self, environment, master_bind_host, master_bind_port):484 """485 :param environment: Environment instance486 :param master_bind_host: Host/interface to use for incoming worker connections487 :param master_bind_port: Port to use for incoming worker connections488 """489 super().__init__(environment)490 self.worker_cpu_warning_emitted = False491 self.master_bind_host = master_bind_host492 self.master_bind_port = master_bind_port493 self.spawn_rate: float = 0494 self.clients = WorkerNodes()495 try:496 self.server = rpc.Server(master_bind_host, master_bind_port)497 except RPCError as e:498 if e.args[0] == "Socket bind failure: Address already in use":499 port_string = (500 master_bind_host + ":" + str(master_bind_port) if master_bind_host != "*" else str(master_bind_port)501 )502 logger.error(503 f"The Locust master port ({port_string}) was busy. Close any applications using that port - perhaps an old instance of Locust master is still running? ({e.args[0]})"504 )505 sys.exit(1)506 else:507 raise508 self._users_dispatcher: Union[UsersDispatcher, None] = None509 self.greenlet.spawn(self.heartbeat_worker).link_exception(greenlet_exception_handler)510 self.greenlet.spawn(self.client_listener).link_exception(greenlet_exception_handler)511 # listener that gathers info on how many users the worker has spawned512 def on_worker_report(client_id, data):513 if client_id not in self.clients:514 logger.info("Discarded report from unrecognized worker %s", client_id)515 return516 self.clients[client_id].user_classes_count = data["user_classes_count"]517 self.environment.events.worker_report.add_listener(on_worker_report)518 # register listener that sends quit message to worker nodes519 def on_quitting(environment, **kw):520 self.quit()521 self.environment.events.quitting.add_listener(on_quitting)522 @property523 def user_count(self) -> int:524 return sum(c.user_count for c in self.clients.values())525 def cpu_log_warning(self):526 warning_emitted = Runner.cpu_log_warning(self)527 if self.worker_cpu_warning_emitted:528 logger.warning("CPU usage threshold was exceeded on workers during the test!")529 warning_emitted = True530 return warning_emitted531 def start(self, user_count: int, spawn_rate: float, **kwargs) -> None:532 num_workers = len(self.clients.ready) + len(self.clients.running) + len(self.clients.spawning)533 if not num_workers:534 logger.warning(535 "You are running in distributed mode but have no worker servers connected. 
"536 "Please connect workers prior to swarming."537 )538 return539 for user_class in self.user_classes:540 if self.environment.host is not None:541 user_class.host = self.environment.host542 self.spawn_rate = spawn_rate543 if self._users_dispatcher is None:544 self._users_dispatcher = UsersDispatcher(545 worker_nodes=list(self.clients.values()), user_classes=self.user_classes546 )547 logger.info(548 "Sending spawn jobs of %d users at %.2f spawn rate to %d ready clients"549 % (user_count, spawn_rate, num_workers)550 )551 worker_spawn_rate = float(spawn_rate) / (num_workers or 1)552 if worker_spawn_rate > 100:553 logger.warning(554 "Your selected spawn rate is very high (>100/worker), and this is known to sometimes cause issues. Do you really need to ramp up that fast?"555 )556 if self.state != STATE_RUNNING and self.state != STATE_SPAWNING:557 self.stats.clear_all()558 self.exceptions = {}559 self.environment.events.test_start.fire(environment=self.environment)560 if self.environment.shape_class:561 self.environment.shape_class.reset_time()562 self.update_state(STATE_SPAWNING)563 self._users_dispatcher.new_dispatch(target_user_count=user_count, spawn_rate=spawn_rate)564 try:565 for dispatched_users in self._users_dispatcher:566 dispatch_greenlets = Group()567 for worker_node_id, worker_user_classes_count in dispatched_users.items():568 data = {569 "timestamp": time.time(),570 "user_classes_count": worker_user_classes_count,571 "host": self.environment.host,572 "stop_timeout": self.environment.stop_timeout,573 "parsed_options": vars(self.environment.parsed_options)574 if self.environment.parsed_options575 else {},576 }577 dispatch_greenlets.add(578 gevent.spawn_later(579 0,580 self.server.send_to_client,581 Message("spawn", data, worker_node_id),582 )583 )584 dispatched_user_count = sum(map(sum, map(methodcaller("values"), dispatched_users.values())))585 logger.debug(586 "Sending spawn messages for %g total users to %i client(s)",587 dispatched_user_count,588 len(dispatch_greenlets),589 )590 dispatch_greenlets.join()591 logger.debug(592 "Currently spawned users: %s" % _format_user_classes_count_for_log(self.reported_user_classes_count)593 )594 self.target_user_classes_count = _aggregate_dispatched_users(dispatched_users)595 except KeyboardInterrupt:596 # TODO: Find a cleaner way to handle that597 # We need to catch keyboard interrupt. Otherwise, if KeyboardInterrupt is received while in598 # a gevent.sleep inside the dispatch_users function, locust won't gracefully shutdown.599 self.quit()600 # Wait a little for workers to report their users to the master601 # so that we can give an accurate log message below and fire the `spawning_complete` event602 # when the user count is really at the desired value.603 timeout = gevent.Timeout(self._wait_for_workers_report_after_ramp_up())604 timeout.start()605 try:606 while self.user_count != self.target_user_count:607 gevent.sleep()608 except gevent.Timeout:609 pass610 finally:611 timeout.cancel()612 self.environment.events.spawning_complete.fire(user_count=sum(self.target_user_classes_count.values()))613 logger.info("All users spawned: %s" % _format_user_classes_count_for_log(self.reported_user_classes_count))614 @functools.lru_cache()615 def _wait_for_workers_report_after_ramp_up(self) -> float:616 """617 The amount of time to wait after a ramp-up in order for all the workers to report their state618 to the master. If not supplied by the user, it is 100ms by default. If the supplied value is a number,619 it is taken as-is. 
If the supplied value is a pattern like "some_number * WORKER_REPORT_INTERVAL",620 the value will be "some_number * WORKER_REPORT_INTERVAL". The most sensible value would be something621 like "1.25 * WORKER_REPORT_INTERVAL". However, some users might find it too high, so it is left622 to a really small value of 100ms by default.623 """624 locust_wait_for_workers_report_after_ramp_up = os.getenv("LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP")625 if locust_wait_for_workers_report_after_ramp_up is None:626 return 0.1627 match = re.search(628 r"^(?P<coeff>(\d+)|(\d+\.\d+))[ ]*\*[ ]*WORKER_REPORT_INTERVAL$",629 locust_wait_for_workers_report_after_ramp_up,630 )631 if match is None:632 assert float(locust_wait_for_workers_report_after_ramp_up) >= 0633 return float(locust_wait_for_workers_report_after_ramp_up)634 else:635 return float(match.group("coeff")) * WORKER_REPORT_INTERVAL636 def stop(self, send_stop_to_client: bool = True):637 if self.state not in [STATE_INIT, STATE_STOPPED, STATE_STOPPING]:638 logger.debug("Stopping...")639 self.update_state(STATE_STOPPING)640 if self.environment.shape_class is not None and self.shape_greenlet is not greenlet.getcurrent():641 self.shape_greenlet.kill(block=True)642 self.shape_greenlet = None643 self.shape_last_state = None644 self._users_dispatcher = None645 if send_stop_to_client:646 for client in self.clients.all:647 logger.debug("Sending stop message to client %s" % client.id)648 self.server.send_to_client(Message("stop", None, client.id))649 # Give an additional 60s for all workers to stop650 timeout = gevent.Timeout(self.environment.stop_timeout or 0 + 60)651 timeout.start()652 try:653 while self.user_count != 0:654 gevent.sleep(1)655 except gevent.Timeout:656 logger.error("Timeout waiting for all workers to stop")657 finally:658 timeout.cancel()659 self.environment.events.test_stop.fire(environment=self.environment)660 def quit(self):661 self.stop(send_stop_to_client=False)662 logger.debug("Quitting...")663 for client in self.clients.all:664 logger.debug("Sending quit message to client %s" % (client.id))665 self.server.send_to_client(Message("quit", None, client.id))666 gevent.sleep(0.5) # wait for final stats report from all workers667 self.greenlet.kill(block=True)668 def check_stopped(self):669 if (670 not self.state == STATE_INIT671 and not self.state == STATE_STOPPED672 and all(map(lambda x: x.state not in (STATE_RUNNING, STATE_SPAWNING, STATE_INIT), self.clients.all))673 ):674 self.update_state(STATE_STOPPED)675 def heartbeat_worker(self):676 while True:677 gevent.sleep(HEARTBEAT_INTERVAL)678 if self.connection_broken:679 self.reset_connection()680 continue681 for client in self.clients.all:682 if client.heartbeat < 0 and client.state != STATE_MISSING:683 logger.info("Worker %s failed to send heartbeat, setting state to missing." % str(client.id))684 client.state = STATE_MISSING685 client.user_classes_count = {}686 if self._users_dispatcher is not None:687 self._users_dispatcher.remove_worker(client)688 # TODO: If status is `STATE_RUNNING`, call self.start()689 if self.worker_count <= 0:690 logger.info("The last worker went missing, stopping test.")691 self.stop()692 self.check_stopped()693 else:694 client.heartbeat -= 1695 def reset_connection(self):696 logger.info("Reset connection to worker")697 try:698 self.server.close()699 self.server = rpc.Server(self.master_bind_host, self.master_bind_port)700 except RPCError as e:701 logger.error("Temporary failure when resetting connection: %s, will retry later." 
% (e))702 def client_listener(self):703 while True:704 try:705 client_id, msg = self.server.recv_from_client()706 except RPCError as e:707 logger.error("RPCError found when receiving from client: %s" % (e))708 self.connection_broken = True709 gevent.sleep(FALLBACK_INTERVAL)710 continue711 self.connection_broken = False712 msg.node_id = client_id713 if msg.type == "client_ready":714 if not msg.data:715 logger.error(f"An old (pre 2.0) worker tried to connect ({client_id}). That's not going to work.")716 continue717 elif msg.data != __version__ and msg.data != -1:718 logger.warning(719 f"A worker ({client_id}) running a different version ({msg.data}) connected, master version is {__version__}"720 )721 worker_node_id = msg.node_id722 self.clients[worker_node_id] = WorkerNode(worker_node_id, heartbeat_liveness=HEARTBEAT_LIVENESS)723 if self._users_dispatcher is not None:724 self._users_dispatcher.add_worker(worker_node=self.clients[worker_node_id])725 if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:726 # TODO: Test this situation727 self.start(self.target_user_count, self.spawn_rate)728 logger.info(729 "Client %r reported as ready. Currently %i clients ready to swarm."730 % (worker_node_id, len(self.clients.ready + self.clients.running + self.clients.spawning))731 )732 # if self.state == STATE_RUNNING or self.state == STATE_SPAWNING:733 # # TODO: Necessary now that UsersDispatcher handles that?734 # # balance the load distribution when new client joins735 # self.start(self.target_user_count, self.spawn_rate)736 # emit a warning if the worker's clock seem to be out of sync with our clock737 # if abs(time() - msg.data["time"]) > 5.0:738 # warnings.warn("The worker node's clock seem to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.")739 elif msg.type == "client_stopped":740 client = self.clients[msg.node_id]741 del self.clients[msg.node_id]742 if self._users_dispatcher is not None:743 self._users_dispatcher.remove_worker(client)744 if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:745 # TODO: Test this situation746 self.start(self.target_user_count, self.spawn_rate)747 logger.info("Removing %s client from running clients" % (msg.node_id))748 elif msg.type == "heartbeat":749 if msg.node_id in self.clients:750 c = self.clients[msg.node_id]751 c.heartbeat = HEARTBEAT_LIVENESS752 client_state = msg.data["state"]753 if c.state == STATE_MISSING:754 logger.info(755 "Worker %s self-healed with heartbeat, setting state to %s." 
% (str(c.id), client_state)756 )757 if self._users_dispatcher is not None:758 self._users_dispatcher.add_worker(worker_node=c)759 if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:760 # TODO: Test this situation761 self.start(self.target_user_count, self.spawn_rate)762 c.state = client_state763 c.cpu_usage = msg.data["current_cpu_usage"]764 if not c.cpu_warning_emitted and c.cpu_usage > 90:765 self.worker_cpu_warning_emitted = True # used to fail the test in the end766 c.cpu_warning_emitted = True # used to suppress logging for this node767 logger.warning(768 "Worker %s exceeded cpu threshold (will only log this once per worker)" % (msg.node_id)769 )770 elif msg.type == "stats":771 self.environment.events.worker_report.fire(client_id=msg.node_id, data=msg.data)772 elif msg.type == "spawning":773 self.clients[msg.node_id].state = STATE_SPAWNING774 elif msg.type == "spawning_complete":775 self.clients[msg.node_id].state = STATE_RUNNING776 self.clients[msg.node_id].user_classes_count = msg.data["user_classes_count"]777 elif msg.type == "quit":778 if msg.node_id in self.clients:779 client = self.clients[msg.node_id]780 del self.clients[msg.node_id]781 if self._users_dispatcher is not None:782 self._users_dispatcher.remove_worker(client)783 if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:784 # TODO: Test this situation785 self.start(self.target_user_count, self.spawn_rate)786 logger.info(787 "Client %r quit. Currently %i clients connected." % (msg.node_id, len(self.clients.ready))788 )789 if self.worker_count - len(self.clients.missing) <= 0:790 logger.info("The last worker quit, stopping test.")791 self.stop()792 if self.environment.parsed_options and self.environment.parsed_options.headless:793 self.quit()794 elif msg.type == "exception":795 self.log_exception(msg.node_id, msg.data["msg"], msg.data["traceback"])796 elif msg.type in self.custom_messages:797 logger.debug(f"Recieved {msg.type} message from worker {msg.node_id}")798 self.custom_messages[msg.type](environment=self.environment, msg=msg)799 else:800 logger.warning(f"Unknown message type recieved from worker {msg.node_id}: {msg.type}")801 self.check_stopped()802 @property803 def worker_count(self):804 return len(self.clients.ready) + len(self.clients.spawning) + len(self.clients.running)805 @property806 def reported_user_classes_count(self) -> Dict[str, int]:807 reported_user_classes_count = defaultdict(lambda: 0)808 for client in self.clients.ready + self.clients.spawning + self.clients.running:809 for name, count in client.user_classes_count.items():810 reported_user_classes_count[name] += count811 return reported_user_classes_count812 def send_message(self, msg_type, data=None, client_id=None):813 """814 Sends a message to attached worker node(s)815 :param msg_type: The type of the message to send816 :param data: Optional data to send817 :param client_id: Optional id of the target worker node.818 If None, will send to all attached workers819 """820 if client_id:821 logger.debug(f"Sending {msg_type} message to client {client_id}")822 self.server.send_to_client(Message(msg_type, data, client_id))823 else:824 for client in self.clients.all:825 logger.debug(f"Sending {msg_type} message to client {client.id}")826 self.server.send_to_client(Message(msg_type, data, client.id))827class WorkerRunner(DistributedRunner):828 """829 Runner used to run distributed load tests across multiple processes and/or machines.830 WorkerRunner connects to a :class:`MasterRunner` from 
which it'll receive831 instructions to start and stop user greenlets. The WorkerRunner will periodically832 take the stats generated by the running users and send back to the :class:`MasterRunner`.833 """834 def __init__(self, environment, master_host, master_port):835 """836 :param environment: Environment instance837 :param master_host: Host/IP to use for connection to the master838 :param master_port: Port to use for connecting to the master839 """840 super().__init__(environment)841 self.worker_state = STATE_INIT842 self.client_id = socket.gethostname() + "_" + uuid4().hex843 self.master_host = master_host844 self.master_port = master_port845 self.worker_cpu_warning_emitted = False846 self._users_dispatcher = None847 self.client = rpc.Client(master_host, master_port, self.client_id)848 self.greenlet.spawn(self.heartbeat).link_exception(greenlet_exception_handler)849 self.greenlet.spawn(self.worker).link_exception(greenlet_exception_handler)850 self.client.send(Message("client_ready", __version__, self.client_id))851 self.greenlet.spawn(self.stats_reporter).link_exception(greenlet_exception_handler)852 # register listener for when all users have spawned, and report it to the master node853 def on_spawning_complete(user_count):854 assert user_count == sum(self.user_classes_count.values())855 self.client.send(856 Message(857 "spawning_complete",858 {"user_classes_count": self.user_classes_count, "user_count": self.user_count},859 self.client_id,860 )861 )862 self.worker_state = STATE_RUNNING863 self.environment.events.spawning_complete.add_listener(on_spawning_complete)864 # register listener that adds the current number of spawned users to the report that is sent to the master node865 def on_report_to_master(client_id, data):866 data["user_classes_count"] = self.user_classes_count867 data["user_count"] = self.user_count868 self.environment.events.report_to_master.add_listener(on_report_to_master)869 # register listener that sends quit message to master870 def on_quitting(environment, **kw):871 self.client.send(Message("quit", None, self.client_id))872 self.environment.events.quitting.add_listener(on_quitting)873 # register listener thats sends user exceptions to master874 def on_user_error(user_instance, exception, tb):875 formatted_tb = "".join(traceback.format_tb(tb))876 self.client.send(Message("exception", {"msg": str(exception), "traceback": formatted_tb}, self.client_id))877 self.environment.events.user_error.add_listener(on_user_error)878 def start(self, user_count, spawn_rate, wait=False):879 raise NotImplementedError("use start_worker")880 def start_worker(self, user_classes_count: Dict[str, int], **kwargs):881 """882 Start running a load test as a worker883 :param user_classes_count: Users to run884 """885 self.target_user_classes_count = user_classes_count886 if self.worker_state != STATE_RUNNING and self.worker_state != STATE_SPAWNING:887 self.stats.clear_all()888 self.exceptions = {}889 self.cpu_warning_emitted = False890 self.worker_cpu_warning_emitted = False891 self.environment.events.test_start.fire(environment=self.environment)892 self.worker_state = STATE_SPAWNING893 for user_class in self.user_classes:894 if self.environment.host is not None:895 user_class.host = self.environment.host896 user_classes_spawn_count = {}897 user_classes_stop_count = {}898 for user_class, user_class_count in user_classes_count.items():899 if self.user_classes_count[user_class] > user_class_count:900 user_classes_stop_count[user_class] = self.user_classes_count[user_class] - 
user_class_count901 elif self.user_classes_count[user_class] < user_class_count:902 user_classes_spawn_count[user_class] = user_class_count - self.user_classes_count[user_class]903 # call spawn_users before stopping the users since stop_users904 # can be blocking because of the stop_timeout905 self.spawn_users(user_classes_spawn_count)906 self.stop_users(user_classes_stop_count)907 self.environment.events.spawning_complete.fire(user_count=sum(self.user_classes_count.values()))908 def heartbeat(self):909 while True:910 try:911 self.client.send(912 Message(913 "heartbeat",914 {915 "state": self.worker_state,916 "current_cpu_usage": self.current_cpu_usage,917 },918 self.client_id,919 )920 )921 except RPCError as e:922 logger.error("RPCError found when sending heartbeat: %s" % (e))923 self.reset_connection()924 gevent.sleep(HEARTBEAT_INTERVAL)925 def reset_connection(self):926 logger.info("Reset connection to master")927 try:928 self.client.close()929 self.client = rpc.Client(self.master_host, self.master_port, self.client_id)930 except RPCError as e:931 logger.error("Temporary failure when resetting connection: %s, will retry later." % (e))932 def worker(self):933 last_received_spawn_timestamp = 0934 while True:935 try:936 msg = self.client.recv()937 except RPCError as e:938 logger.error("RPCError found when receiving from master: %s" % (e))939 continue940 if msg.type == "spawn":941 self.client.send(Message("spawning", None, self.client_id))942 job = msg.data943 if job["timestamp"] <= last_received_spawn_timestamp:944 logger.info(945 "Discard spawn message with older or equal timestamp than timestamp of previous spawn message"946 )947 continue948 self.environment.host = job["host"]949 self.environment.stop_timeout = job["stop_timeout"]950 # receive custom arguments951 if self.environment.parsed_options is None:952 default_parser = argument_parser.get_empty_argument_parser()953 argument_parser.setup_parser_arguments(default_parser)954 self.environment.parsed_options = default_parser.parse(args=[])955 custom_args_from_master = {956 k: v for k, v in job["parsed_options"].items() if k not in argument_parser.default_args_dict()957 }958 vars(self.environment.parsed_options).update(custom_args_from_master)959 if self.spawning_greenlet:960 # kill existing spawning greenlet before we launch new one961 self.spawning_greenlet.kill(block=True)962 self.spawning_greenlet = self.greenlet.spawn(lambda: self.start_worker(job["user_classes_count"]))963 self.spawning_greenlet.link_exception(greenlet_exception_handler)964 last_received_spawn_timestamp = job["timestamp"]965 elif msg.type == "stop":966 self.stop()967 self.client.send(Message("client_stopped", None, self.client_id))968 # +additional_wait is just a small buffer to account for the random network latencies and/or other969 # random delays inherent to distributed systems.970 additional_wait = int(os.getenv("LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP", 0))971 gevent.sleep((self.environment.stop_timeout or 0) + additional_wait)972 self.client.send(Message("client_ready", __version__, self.client_id))973 self.worker_state = STATE_INIT974 elif msg.type == "quit":975 logger.info("Got quit message from master, shutting down...")976 self.stop()977 self._send_stats() # send a final report, in case there were any samples not yet reported978 self.greenlet.kill(block=True)979 elif msg.type in self.custom_messages:980 logger.debug(f"Recieved {msg.type} message from master")981 self.custom_messages[msg.type](environment=self.environment, msg=msg)982 
else:983 logger.warning(f"Unknown message type recieved: {msg.type}")984 def stats_reporter(self):985 while True:986 try:987 self._send_stats()988 except RPCError as e:989 logger.error("Temporary connection lost to master server: %s, will retry later." % (e))990 gevent.sleep(WORKER_REPORT_INTERVAL)991 def send_message(self, msg_type, data=None):992 """993 Sends a message to master node994 :param msg_type: The type of the message to send995 :param data: Optional data to send996 """997 logger.debug(f"Sending {msg_type} message to master")998 self.client.send(Message(msg_type, data, self.client_id))999 def _send_stats(self):1000 data = {}1001 self.environment.events.report_to_master.fire(client_id=self.client_id, data=data)1002 self.client.send(Message("stats", data, self.client_id))1003def _format_user_classes_count_for_log(user_classes_count: Dict[str, int]) -> str:1004 return "{} ({} total users)".format(1005 json.dumps(dict(sorted(user_classes_count.items(), key=itemgetter(0)))),1006 sum(user_classes_count.values()),1007 )1008def _aggregate_dispatched_users(d: Dict[str, Dict[str, int]]) -> Dict[str, int]:1009 # TODO: Test it1010 user_classes = list(next(iter(d.values())).keys())...
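The runner classes in this file are normally driven through a locust Environment rather than constructed by hand. Below is a minimal sketch of that flow against a locust 2.x install; QuietUser and the "ping" message type are illustrative names, not part of the snippet above.

import gevent
from locust import User, constant, task
from locust.env import Environment

class QuietUser(User):
    # Illustrative no-op user; a real User subclass would issue requests.
    wait_time = constant(1)

    @task
    def noop(self):
        pass

env = Environment(user_classes=[QuietUser])
runner = env.create_local_runner()          # a LocalRunner, as defined above

# Ramp to 10 users at 2 users/second. LocalRunner.start() returns immediately
# because spawning happens in a background greenlet.
runner.start(user_count=10, spawn_rate=2)
gevent.sleep(5)
print(runner.user_classes_count)            # e.g. {"QuietUser": 10}

# Custom messages go through register_message()/send_message(); on a
# LocalRunner, send_message() simply invokes the registered listener in-process.
def on_ping(environment, msg, **kwargs):
    print("got ping:", msg.data)

runner.register_message("ping", on_ping)
runner.send_message("ping", {"hello": "runner"})

runner.quit()                               # stop all users and kill the runner greenlets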
eventbus.py
Source: eventbus.py
import logging

_log = logging.getLogger(__name__)


class Event(object):
    def __init__(self, data=None):
        self.data = data


class EventBus(object):
    def __init__(self):
        self.listeners = {}
        self.queue = []
        self.dispatch_in_progress = False

    def register(self, event_type, callback):
        if not issubclass(event_type, Event):
            raise TypeError("Must be a subclass of Event")
        if event_type not in self.listeners:
            self.listeners[event_type] = set()
        self.listeners[event_type].add(callback)
        _log.debug("Registering for {0}".format(event_type.__name__))
        _log.debug(self._get_stats())

    def unregister(self, event_type, callback):
        if event_type not in self.listeners:
            raise LookupError("Noone listens to '{0}'".format(event_type))
        group = self.listeners[event_type]
        if callback not in group:
            raise LookupError("Listener '{0}' wasn't registered "
                              "for '{1}'".format(callback, event_type))
        group.remove(callback)
        if len(group) == 0:
            del self.listeners[event_type]
        _log.debug("Un-registering for {0}".format(event_type.__name__))
        _log.debug(self._get_stats())

    def dispatch(self, event):
        if not isinstance(event, Event):
            raise TypeError("Must subclass Event")
        # TODO: check for infinite dispatch loops
        # add event to queue
        self.queue += [event]
        # if dispatch is currently in progress just leave event on queue
        # it will be served by currently running method
        if self.dispatch_in_progress:
            _log.debug("Event {0} added to queue (dispatch already in "
                       "progress, exiting)".format(event.__class__.__name__))
            return
        _log.debug("Event {0} added to queue, starting "
                   "dispatch".format(event.__class__.__name__))
        # lock to prevent other calls
        self.dispatch_in_progress = True
        while self.queue:
            event = self.queue.pop(0)
            # gather all callbacks registered for event
            callbacks = [cb for cb in self.listeners.get(event.__class__, [])]
            _log.debug("Invoking {0} callbacks for {1}".format(
                len(callbacks), event.__class__.__name__))
            # perform gathered calls
            [cb(event) for cb in callbacks]
        # unlock
        self.dispatch_in_progress = False
        _log.debug("Dispatch done".format(event.__class__.__name__))

    def _get_stats(self):
        groups = [(evt.__name__, len(grp))
                  for evt, grp in self.listeners.items()]
        total = sum(l for _, l in groups)
        return "Stats: {total} registered for {groups} events: {items}".format(
            groups=len(groups), total=total, items=', '.join(...
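For completeness, here is a small usage sketch for the EventBus above. It assumes it runs in the same module as (or imports from) the code above; ShutdownRequested and on_shutdown are made-up names for illustration.

class ShutdownRequested(Event):
    # Illustrative event type; the reason string travels in event.data.
    pass

bus = EventBus()

def on_shutdown(event):
    print("shutting down:", event.data)

bus.register(ShutdownRequested, on_shutdown)
bus.dispatch(ShutdownRequested(data="Ctrl+C pressed"))   # -> "shutting down: Ctrl+C pressed"
bus.unregister(ShutdownRequested, on_shutdown)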