Best Python code snippet using locust
main.py
Source:main.py
import inspect
import logging
import os
import importlib
import signal
import socket
import sys
import time

import gevent

import locust

from . import log
from .argument_parser import parse_locustfile_option, parse_options
from .env import Environment
from .log import setup_logging, greenlet_exception_logger
from . import stats
from .stats import print_error_report, print_percentile_stats, print_stats, stats_printer, stats_history
from .stats import StatsCSV, StatsCSVFileWriter
from .user import User
from .user.inspectuser import get_task_ratio_dict, print_task_ratio
from .util.timespan import parse_timespan
from .exception import AuthCredentialsError
from .shape import LoadTestShape
from .input_events import input_listener

version = locust.__version__


def is_user_class(item):
    """
    Check if a variable is a runnable (non-abstract) User class
    """
    return bool(inspect.isclass(item) and issubclass(item, User) and item.abstract is False)


def is_shape_class(item):
    """
    Check if a class is a LoadTestShape

    Classes whose ``__module__`` is ``"locust.shape"`` are rejected, so only
    subclasses declared in some other module qualify.
    """
    return bool(
        inspect.isclass(item) and issubclass(item, LoadTestShape) and item.__dict__["__module__"] != "locust.shape"
    )


def load_locustfile(path):
    """
    Import given locustfile path and return (docstring, user_classes, shape_class).

    Specifically:

    * the locustfile's ``__doc__`` attribute (a string, or ``None``),
    * a dictionary of ``{'name': class}`` containing every module-level name
      that passes :func:`is_user_class`,
    * an *instance* of the first class passing :func:`is_shape_class`, or
      ``None`` when the locustfile defines no shape class.

    ``sys.path`` is adjusted so the locustfile's directory is searched first
    during the import and restored afterwards; the current working directory
    inserted at the start is left in place.
    """
    # Start with making sure the current working dir is in the sys.path
    sys.path.insert(0, os.getcwd())
    # Get directory and locustfile name
    directory, locustfile = os.path.split(path)
    # If the directory isn't in the PYTHONPATH, add it so our import will work
    added_to_path = False
    index = None
    if directory not in sys.path:
        sys.path.insert(0, directory)
        added_to_path = True
    # If the directory IS in the PYTHONPATH, move it to the front temporarily,
    # otherwise other locustfiles -- like Locust's own -- may scoop the intended
    # one.
    else:
        i = sys.path.index(directory)
        if i != 0:
            # Store index for later restoration
            index = i
            # Add to front, then remove from original position
            sys.path.insert(0, directory)
            del sys.path[i + 1]
    # Perform the import
    source = importlib.machinery.SourceFileLoader(os.path.splitext(locustfile)[0], path)
    imported = source.load_module()
    # Remove directory from path if we added it ourselves (just to be neat)
    if added_to_path:
        del sys.path[0]
    # Put back in original index if we moved it
    if index is not None:
        # Insert the second copy one past the old slot, then drop the front
        # copy: the directory ends up at exactly ``index`` again.
        sys.path.insert(index + 1, directory)
        del sys.path[0]
    # Collect the runnable User classes defined (or imported) by the locustfile
    user_classes = {name: value for name, value in vars(imported).items() if is_user_class(value)}
    # Find shape class, if any, return it
    shape_classes = [value for name, value in vars(imported).items() if is_shape_class(value)]
    if shape_classes:
        shape_class = shape_classes[0]()
    else:
        shape_class = None
    return imported.__doc__, user_classes, shape_class


def create_environment(user_classes, options, events=None, shape_class=None):
    """
    Create an Environment instance from options
    """
    return Environment(
        user_classes=user_classes,
        shape_class=shape_class,
        tags=options.tags,
        exclude_tags=options.exclude_tags,
        events=events,
        host=options.host,
        reset_stats=options.reset_stats,
        stop_timeout=options.stop_timeout,
        parsed_options=options,
    )


def main():
    """
    Command line entry point.

    Loads the locustfile, parses the command line options, creates the
    Environment and a master/worker/local runner, optionally starts the web
    UI, spawns the helper greenlets (run time limit, input listener, stats
    printer, CSV writer, stats history) and then waits on the main greenlet.
    Most failure paths, as well as the normal shutdown, end in ``sys.exit``.
    """
    # find specified locustfile and make sure it exists, using a very simplified
    # command line parser that is only used to parse the -f option
    locustfile = parse_locustfile_option()

    # import the locustfile
    docstring, user_classes, shape_class = load_locustfile(locustfile)

    # parse all command line options
    options = parse_options()

    if options.headful:
        options.headless = False

    if options.slave or options.expect_slaves:
        sys.stderr.write("The --slave/--expect-slaves parameters have been renamed --worker/--expect-workers\n")
        sys.exit(1)

    if options.step_time or options.step_load or options.step_users or options.step_clients:
        sys.stderr.write(
            "The step load feature was removed in Locust 1.3. You can achieve similar results using a LoadTestShape class. See https://docs.locust.io/en/stable/generating-custom-load-shape.html\n"
        )
        sys.exit(1)

    if options.hatch_rate:
        sys.stderr.write("[DEPRECATED] The --hatch-rate parameter has been renamed --spawn-rate\n")
        options.spawn_rate = options.hatch_rate

    # setup logging
    if not options.skip_log_setup:
        if options.loglevel.upper() in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]:
            setup_logging(options.loglevel, options.logfile)
        else:
            sys.stderr.write("Invalid --loglevel. Valid values are: DEBUG/INFO/WARNING/ERROR/CRITICAL\n")
            sys.exit(1)

    logger = logging.getLogger(__name__)
    greenlet_exception_handler = greenlet_exception_logger(logger)

    if options.list_commands:
        print("Available Users:")
        for name in user_classes:
            # NOTE(review): the indent literal may have been wider before the
            # page collapsed whitespace -- confirm against the full file.
            print(" " + name)
        sys.exit(0)

    if not user_classes:
        logger.error("No User class found!")
        sys.exit(1)

    # make sure specified User exists
    if options.user_classes:
        missing = set(options.user_classes) - set(user_classes.keys())
        if missing:
            logger.error("Unknown User(s): %s\n" % (", ".join(missing)))
            sys.exit(1)
        else:
            names = set(options.user_classes) & set(user_classes.keys())
            user_classes = [user_classes[n] for n in names]
    else:
        # list() call is needed to consume the dict_view object in Python 3
        user_classes = list(user_classes.values())

    if os.name != "nt" and not options.master:
        try:
            import resource

            minimum_open_file_limit = 10000
            current_open_file_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[0]

            if current_open_file_limit < minimum_open_file_limit:
                # Increasing the limit to 10000 within a running process should work on at least MacOS.
                # It does not work on all OS:es, but we should be no worse off for trying.
                resource.setrlimit(resource.RLIMIT_NOFILE, [minimum_open_file_limit, resource.RLIM_INFINITY])
        except BaseException:
            # Best effort only: failing to raise the limit is reported, not fatal.
            logger.warning(
                (
                    f"System open file limit '{current_open_file_limit}' is below minimum setting '{minimum_open_file_limit}'. "
                    "It's not high enough for load testing, and the OS didn't allow locust to increase it by itself. "
                    "See https://github.com/locustio/locust/wiki/Installation#increasing-maximum-number-of-open-files-limit for more info."
                )
            )

    # create locust Environment
    environment = create_environment(user_classes, options, events=locust.events, shape_class=shape_class)

    if shape_class and (options.num_users or options.spawn_rate):
        logger.warning(
            "The specified locustfile contains a shape class but a conflicting argument was specified: users or spawn-rate. Ignoring arguments"
        )

    if options.show_task_ratio:
        print("\n Task ratio per User class")
        print("-" * 80)
        print_task_ratio(user_classes)
        print("\n Total task ratio")
        print("-" * 80)
        print_task_ratio(user_classes, total=True)
        sys.exit(0)
    if options.show_task_ratio_json:
        from json import dumps

        task_data = {
            "per_class": get_task_ratio_dict(user_classes),
            "total": get_task_ratio_dict(user_classes, total=True),
        }
        print(dumps(task_data))
        sys.exit(0)

    if options.master:
        runner = environment.create_master_runner(
            master_bind_host=options.master_bind_host,
            master_bind_port=options.master_bind_port,
        )
    elif options.worker:
        try:
            runner = environment.create_worker_runner(options.master_host, options.master_port)
            logger.debug("Connected to locust master: %s:%s", options.master_host, options.master_port)
        except socket.error as e:
            logger.error("Failed to connect to the Locust master: %s", e)
            sys.exit(-1)
    else:
        runner = environment.create_local_runner()

    # main_greenlet is pointing to runners.greenlet by default, it will point the web greenlet later if in web mode
    main_greenlet = runner.greenlet

    if options.run_time:
        if not options.headless:
            logger.error("The --run-time argument can only be used together with --headless")
            sys.exit(1)
        if options.worker:
            logger.error("--run-time should be specified on the master node, and not on worker nodes")
            sys.exit(1)
        try:
            # From here on options.run_time holds the parsed value, not the raw string
            options.run_time = parse_timespan(options.run_time)
        except ValueError:
            logger.error("Valid --run-time formats are: 20, 20s, 3m, 2h, 1h20m, 3h30m10s, etc.")
            sys.exit(1)

    if options.csv_prefix:
        stats_csv_writer = StatsCSVFileWriter(
            environment, stats.PERCENTILES_TO_REPORT, options.csv_prefix, options.stats_history_enabled
        )
    else:
        stats_csv_writer = StatsCSV(environment, stats.PERCENTILES_TO_REPORT)

    # start Web UI
    if not options.headless and not options.worker:
        # spawn web greenlet
        protocol = "https" if options.tls_cert and options.tls_key else "http"
        try:
            if options.web_host == "*":
                # special check for "*" so that we're consistent with --master-bind-host
                web_host = ""
            else:
                web_host = options.web_host
            if web_host:
                logger.info("Starting web interface at %s://%s:%s" % (protocol, web_host, options.web_port))
            else:
                logger.info(
                    "Starting web interface at %s://0.0.0.0:%s (accepting connections from all network interfaces)"
                    % (protocol, options.web_port)
                )
            web_ui = environment.create_web_ui(
                host=web_host,
                port=options.web_port,
                auth_credentials=options.web_auth,
                tls_cert=options.tls_cert,
                tls_key=options.tls_key,
                stats_csv_writer=stats_csv_writer,
                delayed_start=True,
            )
        except AuthCredentialsError:
            logger.error("Credentials supplied with --web-auth should have the format: username:password")
            sys.exit(1)
    else:
        web_ui = None

    # Fire locust init event which can be used by end-users' code to run setup code that
    # need access to the Environment, Runner or WebUI.
    environment.events.init.fire(environment=environment, runner=runner, web_ui=web_ui)

    if web_ui:
        # The UI was created with delayed_start=True, so it is started only
        # after the init event has fired.
        web_ui.start()
        main_greenlet = web_ui.greenlet

    if options.headless:
        # headless mode
        if options.master:
            # wait for worker nodes to connect
            while len(runner.clients.ready) < options.expect_workers:
                logging.info(
                    "Waiting for workers to be ready, %s of %s connected",
                    len(runner.clients.ready),
                    options.expect_workers,
                )
                time.sleep(1)
        if not options.worker:
            # apply headless mode defaults
            if options.num_users is None:
                options.num_users = 1
            if options.spawn_rate is None:
                options.spawn_rate = 1

            # start the test
            if environment.shape_class:
                environment.runner.start_shape()
            else:
                runner.start(options.num_users, options.spawn_rate)

    def spawn_run_time_limit_greenlet():
        """Schedule a greenlet that quits the runner once options.run_time has elapsed."""

        def timelimit_stop():
            logger.info("Time limit reached. Stopping Locust.")
            runner.quit()

        gevent.spawn_later(options.run_time, timelimit_stop).link_exception(greenlet_exception_handler)

    if options.run_time:
        logger.info("Run time limit set to %s seconds" % options.run_time)
        spawn_run_time_limit_greenlet()
    elif options.headless:
        logger.info("No run time limit set, use CTRL+C to interrupt.")
    else:
        pass  # don't log anything - not having a time limit is normal when not running headless

    input_listener_greenlet = None
    if not options.worker:
        # spawn input listener greenlet
        # Key bindings: w/W spawn 1/10 users, s/S stop 1/10 users; every action
        # is refused with a warning while the runner is in the "spawning" state.
        input_listener_greenlet = gevent.spawn(
            input_listener(
                {
                    "w": lambda: runner.spawn_users(1, 100)
                    if runner.state != "spawning"
                    else logging.warning("Already spawning users, can't spawn more right now"),
                    "W": lambda: runner.spawn_users(10, 100)
                    if runner.state != "spawning"
                    else logging.warning("Already spawning users, can't spawn more right now"),
                    "s": lambda: runner.stop_users(1)
                    if runner.state != "spawning"
                    else logging.warning("Spawning users, can't stop right now"),
                    "S": lambda: runner.stop_users(10)
                    if runner.state != "spawning"
                    else logging.warning("Spawning users, can't stop right now"),
                }
            )
        )
        input_listener_greenlet.link_exception(greenlet_exception_handler)

    stats_printer_greenlet = None
    if not options.only_summary and (options.print_stats or (options.headless and not options.worker)):
        # spawn stats printing greenlet
        stats_printer_greenlet = gevent.spawn(stats_printer(runner.stats))
        stats_printer_greenlet.link_exception(greenlet_exception_handler)

    if options.csv_prefix:
        gevent.spawn(stats_csv_writer.stats_writer).link_exception(greenlet_exception_handler)

    # NOTE(review): nesting reconstructed from a whitespace-collapsed source;
    # this call is assumed to be unconditional -- confirm against the full file.
    gevent.spawn(stats_history, runner)

    def shutdown():
        """
        Shut down locust by firing quitting event, printing/writing stats and exiting
        """
        logger.info("Running teardowns...")

        if input_listener_greenlet is not None:
            input_listener_greenlet.kill(block=False)

        environment.events.quitting.fire(environment=environment, reverse=True)

        # determine the process exit code
        if log.unhandled_greenlet_exception:
            code = 2
        elif environment.process_exit_code is not None:
            code = environment.process_exit_code
        elif len(runner.errors) or len(runner.exceptions):
            code = options.exit_code_on_error
        else:
            code = 0

        logger.info("Shutting down (exit code %s), bye." % code)
        if stats_printer_greenlet is not None:
            stats_printer_greenlet.kill(block=False)
        logger.info("Cleaning up runner...")
        if runner is not None:
            runner.quit()

        print_stats(runner.stats, current=False)
        print_percentile_stats(runner.stats)
        print_error_report(runner.stats)

        sys.exit(code)

    # install SIGTERM handler
    def sig_term_handler():
        logger.info("Got SIGTERM signal")
        shutdown()

    gevent.signal_handler(signal.SIGTERM, sig_term_handler)

    try:
        logger.info("Starting Locust %s" % version)
        main_greenlet.join()
        shutdown()
    except KeyboardInterrupt:
        # NOTE(review): the handler body is truncated in the available source;
        # `...` is kept as a placeholder -- restore it from the full file.
        ...
regulation.py
Source:regulation.py
from pycurb import PyCurbObject


class Regulation(PyCurbObject):
    """A regulation made up of a rule plus optional user classes, time spans,
    priority and payment."""

    # Names of the attributes set by __init__.
    # NOTE(review): presumably consumed by PyCurbObject / to_dict -- confirm.
    fields = ['rule', 'user_classes', 'time_spans', 'priority', 'payment']

    def __init__(self,
                 rule,
                 user_classes=None,
                 time_spans=None,
                 priority=None,
                 payment=None):
        """Store the regulation's parts, normalizing empty placeholders to None.

        ``user_classes == [{}]`` and ``time_spans == []`` are both stored as
        ``None``; every other value is stored unchanged.
        """
        self.rule = rule
        self.user_classes = None if user_classes == [{}] else user_classes
        self.time_spans = None if time_spans == [] else time_spans
        self.priority = priority
        self.payment = payment

    def to_dict(self):
        # NOTE(review): the method body is truncated in the available source;
        # `...` is kept as a placeholder -- restore it from the full file.
        ...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!