Best Python code snippet using locust
test_runners.py
Source:test_runners.py
...335 self.assertTrue(0 <= delta <= 1.05, "Expected user count to decrease to 2 in 1s, instead it took %f" % delta)336 self.assertTrue(337 runner.user_count == 2, "User count has not decreased correctly to 2, it is : %i" % runner.user_count338 )339 def test_attributes_populated_when_calling_start(self):340 class MyUser1(User):341 wait_time = constant(0)342 @task343 def my_task(self):344 pass345 class MyUser2(User):346 wait_time = constant(0)347 @task348 def my_task(self):349 pass350 environment = Environment(user_classes=[MyUser1, MyUser2])351 runner = LocalRunner(environment)352 runner.start(user_count=10, spawn_rate=5, wait=False)353 runner.spawning_greenlet.join()354 self.assertDictEqual({"MyUser1": 5, "MyUser2": 5}, runner.user_classes_count)355 runner.start(user_count=5, spawn_rate=5, wait=False)356 runner.spawning_greenlet.join()357 self.assertDictEqual({"MyUser1": 3, "MyUser2": 2}, runner.user_classes_count)358 runner.quit()359 def test_user_classes_count(self):360 class MyUser1(User):361 wait_time = constant(0)362 @task363 def my_task(self):364 pass365 class MyUser2(User):366 wait_time = constant(0)367 @task368 def my_task(self):369 pass370 environment = Environment(user_classes=[MyUser1, MyUser2])371 runner = LocalRunner(environment)372 runner.start(user_count=10, spawn_rate=5, wait=False)373 runner.spawning_greenlet.join()374 self.assertDictEqual({"MyUser1": 5, "MyUser2": 5}, runner.user_classes_count)375 runner.start(user_count=5, spawn_rate=5, wait=False)376 runner.spawning_greenlet.join()377 self.assertDictEqual({"MyUser1": 3, "MyUser2": 2}, runner.user_classes_count)378 runner.quit()379 def test_custom_message(self):380 class MyUser(User):381 wait_time = constant(1)382 @task383 def my_task(self):384 pass385 test_custom_msg = [False]386 test_custom_msg_data = [{}]387 def on_custom_msg(msg, **kw):388 test_custom_msg[0] = True389 test_custom_msg_data[0] = msg.data390 environment = Environment(user_classes=[MyUser])391 runner = LocalRunner(environment)392 runner.register_message("test_custom_msg", on_custom_msg)393 runner.send_message("test_custom_msg", {"test_data": 123})394 self.assertTrue(test_custom_msg[0])395 self.assertEqual(123, test_custom_msg_data[0]["test_data"])396 def test_undefined_custom_message(self):397 class MyUser(User):398 wait_time = constant(1)399 @task400 def my_task(self):401 pass402 test_custom_msg = [False]403 def on_custom_msg(msg, **kw):404 test_custom_msg[0] = True405 environment = Environment(user_classes=[MyUser])406 runner = LocalRunner(environment)407 runner.register_message("test_custom_msg", on_custom_msg)408 runner.send_message("test_different_custom_msg")409 self.assertFalse(test_custom_msg[0])410 self.assertEqual(1, len(self.mocked_log.warning))411 msg = self.mocked_log.warning[0]412 self.assertIn("Unknown message type recieved", msg)413 def test_swarm_endpoint_is_non_blocking(self):414 class TestUser1(User):415 @task416 def my_task(self):417 gevent.sleep(600)418 class TestUser2(User):419 @task420 def my_task(self):421 gevent.sleep(600)422 stop_timeout = 0423 env = Environment(user_classes=[TestUser1, TestUser2], stop_timeout=stop_timeout)424 local_runner = env.create_local_runner()425 web_ui = env.create_web_ui("127.0.0.1", 0)426 gevent.sleep(0.1)427 ts = time.perf_counter()428 response = requests.post(429 "http://127.0.0.1:{}/swarm".format(web_ui.server.server_port),430 data={"user_count": 20, "spawn_rate": 5, "host": "https://localhost"},431 )432 self.assertEqual(200, response.status_code)433 self.assertTrue(0 <= time.perf_counter() - ts <= 1, "swarm endpoint is blocking")434 ts = time.perf_counter()435 while local_runner.state != STATE_RUNNING:436 self.assertTrue(time.perf_counter() - ts <= 4, local_runner.state)437 gevent.sleep(0.1)438 self.assertTrue(3 <= time.perf_counter() - ts <= 5)439 self.assertEqual(local_runner.user_count, 20)440 local_runner.stop()441 web_ui.stop()442 def test_can_call_stop_endpoint_if_currently_swarming(self):443 class TestUser1(User):444 @task445 def my_task(self):446 gevent.sleep(600)447 class TestUser2(User):448 @task449 def my_task(self):450 gevent.sleep(600)451 stop_timeout = 5452 env = Environment(user_classes=[TestUser1, TestUser2], stop_timeout=stop_timeout)453 local_runner = env.create_local_runner()454 web_ui = env.create_web_ui("127.0.0.1", 0)455 gevent.sleep(0.1)456 ts = time.perf_counter()457 response = requests.post(458 "http://127.0.0.1:{}/swarm".format(web_ui.server.server_port),459 data={"user_count": 20, "spawn_rate": 1, "host": "https://localhost"},460 )461 self.assertEqual(200, response.status_code)462 self.assertTrue(0 <= time.perf_counter() - ts <= 1, "swarm endpoint is blocking")463 gevent.sleep(5)464 self.assertEqual(local_runner.state, STATE_SPAWNING)465 self.assertLessEqual(local_runner.user_count, 10)466 ts = time.perf_counter()467 response = requests.get(468 "http://127.0.0.1:{}/stop".format(web_ui.server.server_port),469 )470 self.assertEqual(200, response.status_code)471 self.assertTrue(stop_timeout <= time.perf_counter() - ts <= stop_timeout + 5, "stop endpoint took too long")472 ts = time.perf_counter()473 while local_runner.state != STATE_STOPPED:474 self.assertTrue(time.perf_counter() - ts <= 2)475 gevent.sleep(0.1)476 self.assertLessEqual(local_runner.user_count, 0)477 local_runner.stop()478 web_ui.stop()479class TestMasterWorkerRunners(LocustTestCase):480 def test_distributed_integration_run(self):481 """482 Full integration test that starts both a MasterRunner and three WorkerRunner instances483 and makes sure that their stats is sent to the Master.484 """485 class TestUser(User):486 wait_time = constant(0.1)487 @task488 def incr_stats(self):489 self.environment.events.request.fire(490 request_type="GET",491 name="/",492 response_time=1337,493 response_length=666,494 exception=None,495 context={},496 )497 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):498 # start a Master runner499 master_env = Environment(user_classes=[TestUser])500 master = master_env.create_master_runner("*", 0)501 sleep(0)502 # start 3 Worker runners503 workers = []504 for i in range(3):505 worker_env = Environment(user_classes=[TestUser])506 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)507 workers.append(worker)508 # give workers time to connect509 sleep(0.1)510 # issue start command that should trigger TestUsers to be spawned in the Workers511 master.start(6, spawn_rate=1000)512 sleep(0.1)513 # check that worker nodes have started locusts514 for worker in workers:515 self.assertEqual(2, worker.user_count)516 # give time for users to generate stats, and stats to be sent to master517 sleep(1)518 master.quit()519 # make sure users are killed520 for worker in workers:521 self.assertEqual(0, worker.user_count)522 # check that stats are present in master523 self.assertGreater(524 master_env.runner.stats.total.num_requests,525 20,526 "For some reason the master node's stats has not come in",527 )528 def test_distributed_run_with_custom_args(self):529 """530 Full integration test that starts both a MasterRunner and three WorkerRunner instances531 and makes sure that their stats is sent to the Master.532 """533 class TestUser(User):534 wait_time = constant(0.1)535 @task536 def incr_stats(self):537 self.environment.events.request.fire(538 request_type="GET",539 name=self.environment.parsed_options.my_str_argument,540 response_time=self.environment.parsed_options.my_int_argument,541 response_length=666,542 exception=None,543 context={},544 )545 @locust.events.init_command_line_parser.add_listener546 def _(parser, **kw):547 parser.add_argument("--my-int-argument", type=int)548 parser.add_argument("--my-str-argument", type=str, default="NOOOO")549 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):550 # start a Master runner551 master_env = Environment(user_classes=[TestUser])552 master = master_env.create_master_runner("*", 0)553 master_env.parsed_options = parse_options(554 [555 "--my-int-argument",556 "42",557 "--my-str-argument",558 "cool-string",559 ]560 )561 sleep(0)562 # start 3 Worker runners563 workers = []564 for i in range(3):565 worker_env = Environment(user_classes=[TestUser])566 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)567 workers.append(worker)568 # give workers time to connect569 sleep(0.1)570 # issue start command that should trigger TestUsers to be spawned in the Workers571 master.start(6, spawn_rate=1000)572 sleep(0.1)573 # check that worker nodes have started locusts574 for worker in workers:575 self.assertEqual(2, worker.user_count)576 # give time for users to generate stats, and stats to be sent to master577 sleep(1)578 master.quit()579 # make sure users are killed580 for worker in workers:581 self.assertEqual(0, worker.user_count)582 self.assertEqual(master_env.runner.stats.total.max_response_time, 42)583 self.assertEqual(master_env.runner.stats.get("cool-string", "GET").avg_response_time, 42)584 def test_test_stop_event(self):585 class TestUser(User):586 wait_time = constant(0.1)587 @task588 def my_task(l):589 pass590 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):591 # start a Master runner592 master_env = Environment(user_classes=[TestUser])593 test_stop_count = {"master": 0, "worker": 0}594 @master_env.events.test_stop.add_listener595 def _(*args, **kwargs):596 test_stop_count["master"] += 1597 master = master_env.create_master_runner("*", 0)598 sleep(0)599 # start a Worker runner600 worker_env = Environment(user_classes=[TestUser])601 @worker_env.events.test_stop.add_listener602 def _(*args, **kwargs):603 test_stop_count["worker"] += 1604 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)605 # give worker time to connect606 sleep(0.1)607 # issue start command that should trigger TestUsers to be spawned in the Workers608 master.start(2, spawn_rate=1000)609 sleep(0.1)610 # check that worker nodes have started locusts611 self.assertEqual(2, worker.user_count)612 # give time for users to generate stats, and stats to be sent to master613 sleep(0.1)614 master_env.events.quitting.fire(environment=master_env, reverse=True)615 master.quit()616 sleep(0.1)617 # make sure users are killed618 self.assertEqual(0, worker.user_count)619 # check the test_stop event was called one time in master and one time in worker620 self.assertEqual(621 1,622 test_stop_count["master"],623 "The test_stop event was not called exactly one time in the master node",624 )625 self.assertEqual(626 1,627 test_stop_count["worker"],628 "The test_stop event was not called exactly one time in the worker node",629 )630 def test_distributed_shape(self):631 """632 Full integration test that starts both a MasterRunner and three WorkerRunner instances633 and tests a basic LoadTestShape with scaling up and down users634 """635 class TestUser(User):636 @task637 def my_task(self):638 pass639 class TestShape(LoadTestShape):640 def tick(self):641 run_time = self.get_run_time()642 if run_time < 2:643 return 9, 9644 elif run_time < 4:645 return 21, 21646 elif run_time < 6:647 return 3, 21648 else:649 return None650 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):651 test_shape = TestShape()652 master_env = Environment(user_classes=[TestUser], shape_class=test_shape)653 master_env.shape_class.reset_time()654 master = master_env.create_master_runner("*", 0)655 workers = []656 for i in range(3):657 worker_env = Environment(user_classes=[TestUser])658 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)659 workers.append(worker)660 # Give workers time to connect661 sleep(0.1)662 # Start a shape test663 master.start_shape()664 sleep(1)665 # Ensure workers have connected and started the correct amount of users666 for worker in workers:667 self.assertEqual(3, worker.user_count, "Shape test has not reached stage 1")668 self.assertEqual(669 9, test_shape.get_current_user_count(), "Shape is not seeing stage 1 runner user count correctly"670 )671 self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 9})672 # Ensure new stage with more users has been reached673 sleep(2)674 for worker in workers:675 self.assertEqual(7, worker.user_count, "Shape test has not reached stage 2")676 self.assertEqual(677 21, test_shape.get_current_user_count(), "Shape is not seeing stage 2 runner user count correctly"678 )679 self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 21})680 # Ensure new stage with less users has been reached681 sleep(2)682 for worker in workers:683 self.assertEqual(1, worker.user_count, "Shape test has not reached stage 3")684 self.assertEqual(685 3, test_shape.get_current_user_count(), "Shape is not seeing stage 3 runner user count correctly"686 )687 self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 3})688 # Ensure test stops at the end689 sleep(2)690 for worker in workers:691 self.assertEqual(0, worker.user_count, "Shape test has not stopped")692 self.assertEqual(693 0, test_shape.get_current_user_count(), "Shape is not seeing stopped runner user count correctly"694 )695 self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 0})696 self.assertEqual("stopped", master.state)697 def test_distributed_shape_with_stop_timeout(self):698 """699 Full integration test that starts both a MasterRunner and five WorkerRunner instances700 and tests a basic LoadTestShape with scaling up and down users701 """702 class TestUser1(User):703 def start(self, group: Group):704 gevent.sleep(0.5)705 return super().start(group)706 @task707 def my_task(self):708 gevent.sleep(0)709 class TestUser2(User):710 def start(self, group: Group):711 gevent.sleep(0.5)712 return super().start(group)713 @task714 def my_task(self):715 gevent.sleep(600)716 class TestUser3(User):717 def start(self, group: Group):718 gevent.sleep(0.5)719 return super().start(group)720 @task721 def my_task(self):722 gevent.sleep(600)723 class TestShape(LoadTestShape):724 def tick(self):725 run_time = self.get_run_time()726 if run_time < 10:727 return 15, 3728 elif run_time < 30:729 return 5, 10730 else:731 return None732 locust_worker_additional_wait_before_ready_after_stop = 5733 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), _patch_env(734 "LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",735 str(locust_worker_additional_wait_before_ready_after_stop),736 ):737 stop_timeout = 5738 master_env = Environment(739 user_classes=[TestUser1, TestUser2, TestUser3], shape_class=TestShape(), stop_timeout=stop_timeout740 )741 master_env.shape_class.reset_time()742 master = master_env.create_master_runner("*", 0)743 workers = []744 for i in range(5):745 worker_env = Environment(user_classes=[TestUser1, TestUser2, TestUser3])746 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)747 workers.append(worker)748 # Give workers time to connect749 sleep(0.1)750 self.assertEqual(STATE_INIT, master.state)751 self.assertEqual(5, len(master.clients.ready))752 # Re-order `workers` so that it is sorted by `id`.753 # This is required because the dispatch is done754 # on the sorted workers.755 workers = sorted(workers, key=lambda w: w.client_id)756 # Start a shape test757 master.start_shape()758 # First stage759 ts = time.time()760 while master.state != STATE_SPAWNING:761 self.assertTrue(time.time() - ts <= 1, master.state)762 sleep()763 sleep(5 - (time.time() - ts)) # runtime = 5s764 ts = time.time()765 while master.state != STATE_RUNNING:766 self.assertTrue(time.time() - ts <= 1, master.state)767 sleep()768 self.assertEqual(STATE_RUNNING, master.state)769 w1 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}770 w2 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}771 w3 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}772 w4 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}773 w5 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}774 self.assertDictEqual(w1, workers[0].user_classes_count)775 self.assertDictEqual(w2, workers[1].user_classes_count)776 self.assertDictEqual(w3, workers[2].user_classes_count)777 self.assertDictEqual(w4, workers[3].user_classes_count)778 self.assertDictEqual(w5, workers[4].user_classes_count)779 self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)780 self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)781 self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)782 self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)783 self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)784 sleep(5 - (time.time() - ts)) # runtime = 10s785 # Fourth stage786 ts = time.time()787 while master.state != STATE_SPAWNING:788 self.assertTrue(time.time() - ts <= 1, master.state)789 sleep()790 sleep(5 - (time.time() - ts)) # runtime = 15s791 # Fourth stage - Excess TestUser1 have been stopped but792 # TestUser2/TestUser3 have not reached stop timeout yet, so793 # their number are unchanged794 ts = time.time()795 while master.state != STATE_RUNNING:796 self.assertTrue(time.time() - ts <= 1, master.state)797 sleep()798 delta = time.time() - ts799 w1 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}800 w2 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 1}801 w3 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 1}802 w4 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}803 w5 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 1}804 self.assertDictEqual(w1, workers[0].user_classes_count)805 self.assertDictEqual(w2, workers[1].user_classes_count)806 self.assertDictEqual(w3, workers[2].user_classes_count)807 self.assertDictEqual(w4, workers[3].user_classes_count)808 self.assertDictEqual(w5, workers[4].user_classes_count)809 self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)810 self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)811 self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)812 self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)813 self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)814 sleep(1 - delta) # runtime = 16s815 # Fourth stage - All users are now at the desired number816 ts = time.time()817 while master.state != STATE_RUNNING:818 self.assertTrue(time.time() - ts <= 1, master.state)819 sleep()820 delta = time.time() - ts821 w1 = {"TestUser1": 1, "TestUser2": 0, "TestUser3": 0}822 w2 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 0}823 w3 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 1}824 w4 = {"TestUser1": 1, "TestUser2": 0, "TestUser3": 0}825 w5 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 0}826 self.assertDictEqual(w1, workers[0].user_classes_count)827 self.assertDictEqual(w2, workers[1].user_classes_count)828 self.assertDictEqual(w3, workers[2].user_classes_count)829 self.assertDictEqual(w4, workers[3].user_classes_count)830 self.assertDictEqual(w5, workers[4].user_classes_count)831 self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)832 self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)833 self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)834 self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)835 self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)836 sleep(10 - delta) # runtime = 26s837 # Sleep stop_timeout and make sure the test has stopped838 sleep(5) # runtime = 31s839 self.assertEqual(STATE_STOPPING, master.state)840 sleep(stop_timeout) # runtime = 36s841 # We wait for "stop_timeout" seconds to let the workers reconnect as "ready" with the master.842 # The reason for waiting an additional "stop_timeout" when we already waited for "stop_timeout"843 # above is that when a worker receives the stop message, it can take up to "stop_timeout"844 # for the worker to send the "client_stopped" message then an additional "stop_timeout" seconds845 # to send the "client_ready" message.846 ts = time.time()847 while len(master.clients.ready) != len(workers):848 self.assertTrue(849 time.time() - ts <= stop_timeout + locust_worker_additional_wait_before_ready_after_stop,850 f"expected {len(workers)} workers to be ready but only {len(master.clients.ready)} workers are",851 )852 sleep()853 sleep(1)854 # Check that no users are running855 w1 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}856 w2 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}857 w3 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}858 w4 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}859 w5 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}860 self.assertDictEqual(w1, workers[0].user_classes_count)861 self.assertDictEqual(w2, workers[1].user_classes_count)862 self.assertDictEqual(w3, workers[2].user_classes_count)863 self.assertDictEqual(w4, workers[3].user_classes_count)864 self.assertDictEqual(w5, workers[4].user_classes_count)865 self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)866 self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)867 self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)868 self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)869 self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)870 ts = time.time()871 while master.state != STATE_STOPPED:872 self.assertTrue(time.time() - ts <= 5, master.state)873 sleep()874 master.stop()875 @unittest.skip876 def test_distributed_shape_fuzzy_test(self):877 """878 Incredibility useful test to find issues with dispatch logic. This test allowed to find879 multiple small corner cases with the new dispatch logic of locust v2.880 The test is disabled by default because it takes a lot of time to run and has randomness to it.881 However, it is advised to run it a few times (you can run it in parallel) when modifying the dispatch logic.882 """883 class BaseUser(User):884 @task885 def my_task(self):886 gevent.sleep(600)887 class TestUser01(BaseUser):888 pass889 class TestUser02(BaseUser):890 pass891 class TestUser03(BaseUser):892 pass893 class TestUser04(BaseUser):894 pass895 class TestUser05(BaseUser):896 pass897 class TestUser06(BaseUser):898 pass899 class TestUser07(BaseUser):900 pass901 class TestUser08(BaseUser):902 pass903 class TestUser09(BaseUser):904 pass905 class TestUser10(BaseUser):906 pass907 class TestUser11(BaseUser):908 pass909 class TestUser12(BaseUser):910 pass911 class TestUser13(BaseUser):912 pass913 class TestUser14(BaseUser):914 pass915 class TestUser15(BaseUser):916 pass917 class TestShape(LoadTestShape):918 def __init__(self):919 super().__init__()920 self.stages = []921 runtime = 0922 for _ in range(100):923 runtime += random.uniform(3, 15)924 self.stages.append((runtime, random.randint(1, 100), random.uniform(0.1, 10)))925 def tick(self):926 run_time = self.get_run_time()927 for stage in self.stages:928 if run_time < stage[0]:929 return stage[1], stage[2]930 user_classes = [931 TestUser01,932 TestUser02,933 TestUser03,934 TestUser04,935 TestUser05,936 TestUser06,937 TestUser07,938 TestUser08,939 TestUser09,940 TestUser10,941 TestUser11,942 TestUser12,943 TestUser13,944 TestUser14,945 TestUser15,946 ]947 chosen_user_classes = random.sample(user_classes, k=random.randint(1, len(user_classes)))948 for user_class in chosen_user_classes:949 user_class.weight = random.uniform(1, 20)950 locust_worker_additional_wait_before_ready_after_stop = 5951 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), _patch_env(952 "LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",953 str(locust_worker_additional_wait_before_ready_after_stop),954 ):955 stop_timeout = 5956 master_env = Environment(957 user_classes=chosen_user_classes, shape_class=TestShape(), stop_timeout=stop_timeout958 )959 master_env.shape_class.reset_time()960 master = master_env.create_master_runner("*", 0)961 workers = []962 for i in range(random.randint(1, 30)):963 worker_env = Environment(user_classes=chosen_user_classes)964 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)965 workers.append(worker)966 # Give workers time to connect967 sleep(0.1)968 self.assertEqual(STATE_INIT, master.state)969 self.assertEqual(len(workers), len(master.clients.ready))970 # Start a shape test971 master.start_shape()972 ts = time.time()973 while master.state != STATE_STOPPED:974 self.assertTrue(time.time() - ts <= master_env.shape_class.stages[-1][0] + 60, master.state)975 print(976 "{:.2f}/{:.2f} | {} | {:.0f} | ".format(977 time.time() - ts,978 master_env.shape_class.stages[-1][0],979 master.state,980 sum(master.reported_user_classes_count.values()),981 )982 + json.dumps(dict(sorted(master.reported_user_classes_count.items(), key=itemgetter(0))))983 )984 sleep(1)985 master.stop()986 def test_distributed_shape_stop_and_restart(self):987 """988 Test stopping and then restarting a LoadTestShape989 """990 class TestUser(User):991 @task992 def my_task(self):993 pass994 class TestShape(LoadTestShape):995 def tick(self):996 run_time = self.get_run_time()997 if run_time < 10:998 return 4, 4999 else:1000 return None1001 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1002 master_env = Environment(user_classes=[TestUser], shape_class=TestShape())1003 master_env.shape_class.reset_time()1004 master = master_env.create_master_runner("*", 0)1005 workers = []1006 for i in range(2):1007 worker_env = Environment(user_classes=[TestUser])1008 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1009 workers.append(worker)1010 # Give workers time to connect1011 sleep(0.1)1012 # Start a shape test and ensure workers have connected and started the correct amount of users1013 master.start_shape()1014 sleep(1)1015 for worker in workers:1016 self.assertEqual(2, worker.user_count, "Shape test has not started correctly")1017 # Stop the test and ensure all user count is 01018 master.stop()1019 sleep(1)1020 for worker in workers:1021 self.assertEqual(0, worker.user_count, "Shape test has not stopped")1022 # Then restart the test again and ensure workers have connected and started the correct amount of users1023 master.start_shape()1024 sleep(1)1025 for worker in workers:1026 self.assertEqual(2, worker.user_count, "Shape test has not started again correctly")1027 master.stop()1028 def test_distributed_shape_statuses_transition(self):1029 """1030 Full integration test that starts both a MasterRunner and five WorkerRunner instances1031 The goal of this test is to validate the status on the master is correctly transitioned for each of the1032 test phases.1033 """1034 class TestUser1(User):1035 @task1036 def my_task(self):1037 gevent.sleep(600)1038 class TestShape(LoadTestShape):1039 def tick(self):1040 run_time = self.get_run_time()1041 if run_time < 5:1042 return 5, 2.51043 elif run_time < 10:1044 return 10, 2.51045 elif run_time < 15:1046 return 15, 2.51047 else:1048 return None1049 locust_worker_additional_wait_before_ready_after_stop = 21050 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), _patch_env(1051 "LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",1052 str(locust_worker_additional_wait_before_ready_after_stop),1053 ):1054 stop_timeout = 01055 master_env = Environment(user_classes=[TestUser1], shape_class=TestShape(), stop_timeout=stop_timeout)1056 master_env.shape_class.reset_time()1057 master = master_env.create_master_runner("*", 0)1058 workers = []1059 for i in range(5):1060 worker_env = Environment(user_classes=[TestUser1])1061 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1062 workers.append(worker)1063 # Give workers time to connect1064 sleep(0.1)1065 self.assertEqual(STATE_INIT, master.state)1066 self.assertEqual(5, len(master.clients.ready))1067 statuses = []1068 ts = time.perf_counter()1069 master.start_shape()1070 while master.state != STATE_STOPPED:1071 # +5s buffer to let master stop1072 self.assertTrue(1073 time.perf_counter() - ts <= 30 + locust_worker_additional_wait_before_ready_after_stop + 5,1074 master.state,1075 )1076 statuses.append((time.perf_counter() - ts, master.state, master.user_count))1077 sleep(0.1)1078 self.assertEqual(statuses[0][1], STATE_INIT)1079 stage = 11080 tolerance = 1 # in s1081 for (t1, state1, user_count1), (t2, state2, user_count2) in zip(statuses[:-1], statuses[1:]):1082 if state1 == STATE_SPAWNING and state2 == STATE_RUNNING and stage == 1:1083 self.assertTrue(2.5 - tolerance <= t2 <= 2.5 + tolerance)1084 elif state1 == STATE_RUNNING and state2 == STATE_SPAWNING and stage == 1:1085 self.assertTrue(5 - tolerance <= t2 <= 5 + tolerance)1086 stage += 11087 elif state1 == STATE_SPAWNING and state2 == STATE_RUNNING and stage == 2:1088 self.assertTrue(7.5 - tolerance <= t2 <= 7.5 + tolerance)1089 elif state1 == STATE_RUNNING and state2 == STATE_SPAWNING and stage == 2:1090 self.assertTrue(10 - tolerance <= t2 <= 10 + tolerance)1091 stage += 11092 elif state1 == STATE_SPAWNING and state2 == STATE_RUNNING and stage == 3:1093 self.assertTrue(12.5 - tolerance <= t2 <= 12.5 + tolerance)1094 elif state1 == STATE_RUNNING and state2 == STATE_SPAWNING and stage == 3:1095 self.assertTrue(15 - tolerance <= t2 <= 15 + tolerance)1096 stage += 11097 elif state1 == STATE_RUNNING and state2 == STATE_STOPPED and stage == 3:1098 self.assertTrue(15 - tolerance <= t2 <= 15 + tolerance)1099 def test_swarm_endpoint_is_non_blocking(self):1100 class TestUser1(User):1101 @task1102 def my_task(self):1103 gevent.sleep(600)1104 class TestUser2(User):1105 @task1106 def my_task(self):1107 gevent.sleep(600)1108 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1109 stop_timeout = 01110 master_env = Environment(user_classes=[TestUser1, TestUser2], stop_timeout=stop_timeout)1111 master = master_env.create_master_runner("*", 0)1112 web_ui = master_env.create_web_ui("127.0.0.1", 0)1113 workers = []1114 for i in range(2):1115 worker_env = Environment(user_classes=[TestUser1, TestUser2])1116 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1117 workers.append(worker)1118 # Give workers time to connect1119 sleep(0.1)1120 self.assertEqual(STATE_INIT, master.state)1121 self.assertEqual(len(master.clients.ready), len(workers))1122 ts = time.perf_counter()1123 response = requests.post(1124 "http://127.0.0.1:{}/swarm".format(web_ui.server.server_port),1125 data={"user_count": 20, "spawn_rate": 5, "host": "https://localhost"},1126 )1127 self.assertEqual(200, response.status_code)1128 self.assertTrue(0 <= time.perf_counter() - ts <= 1, "swarm endpoint is blocking")1129 ts = time.perf_counter()1130 while master.state != STATE_RUNNING:1131 self.assertTrue(time.perf_counter() - ts <= 4, master.state)1132 gevent.sleep(0.1)1133 self.assertTrue(3 <= time.perf_counter() - ts <= 5)1134 self.assertEqual(master.user_count, 20)1135 master.stop()1136 web_ui.stop()1137 def test_can_call_stop_endpoint_if_currently_swarming(self):1138 class TestUser1(User):1139 @task1140 def my_task(self):1141 gevent.sleep(600)1142 class TestUser2(User):1143 @task1144 def my_task(self):1145 gevent.sleep(600)1146 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1147 stop_timeout = 51148 master_env = Environment(user_classes=[TestUser1, TestUser2], stop_timeout=stop_timeout)1149 master = master_env.create_master_runner("*", 0)1150 web_ui = master_env.create_web_ui("127.0.0.1", 0)1151 workers = []1152 for i in range(2):1153 worker_env = Environment(user_classes=[TestUser1, TestUser2])1154 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1155 workers.append(worker)1156 # Give workers time to connect1157 sleep(0.1)1158 self.assertEqual(STATE_INIT, master.state)1159 self.assertEqual(len(master.clients.ready), len(workers))1160 ts = time.perf_counter()1161 response = requests.post(1162 "http://127.0.0.1:{}/swarm".format(web_ui.server.server_port),1163 data={"user_count": 20, "spawn_rate": 1, "host": "https://localhost"},1164 )1165 self.assertEqual(200, response.status_code)1166 self.assertTrue(0 <= time.perf_counter() - ts <= 1, "swarm endpoint is blocking")1167 gevent.sleep(5)1168 self.assertEqual(master.state, STATE_SPAWNING)1169 self.assertLessEqual(master.user_count, 10)1170 ts = time.perf_counter()1171 response = requests.get(1172 "http://127.0.0.1:{}/stop".format(web_ui.server.server_port),1173 )1174 self.assertEqual(200, response.status_code)1175 self.assertTrue(stop_timeout <= time.perf_counter() - ts <= stop_timeout + 5, "stop endpoint took too long")1176 ts = time.perf_counter()1177 while master.state != STATE_STOPPED:1178 self.assertTrue(time.perf_counter() - ts <= 2)1179 gevent.sleep(0.1)1180 self.assertLessEqual(master.user_count, 0)1181 master.stop()1182 web_ui.stop()1183class TestMasterRunner(LocustTestCase):1184 def setUp(self):1185 super().setUp()1186 self.environment = Environment(events=locust.events, catch_exceptions=False)1187 def tearDown(self):1188 super().tearDown()1189 def get_runner(self, user_classes=None):1190 if user_classes is not None:1191 self.environment.user_classes = user_classes1192 return self.environment.create_master_runner("*", 5557)1193 def test_worker_connect(self):1194 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1195 master = self.get_runner()1196 server.mocked_send(Message("client_ready", __version__, "zeh_fake_client1"))1197 self.assertEqual(1, len(master.clients))1198 self.assertTrue(1199 "zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict"1200 )1201 server.mocked_send(Message("client_ready", __version__, "zeh_fake_client2"))1202 server.mocked_send(Message("client_ready", __version__, "zeh_fake_client3"))1203 server.mocked_send(Message("client_ready", __version__, "zeh_fake_client4"))1204 self.assertEqual(4, len(master.clients))1205 server.mocked_send(Message("quit", None, "zeh_fake_client3"))1206 self.assertEqual(3, len(master.clients))1207 def test_worker_connect_with_special_versions(self):1208 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1209 master = self.get_runner()1210 server.mocked_send(Message("client_ready", None, "1.x_style_client_should_not_be_allowed"))1211 self.assertEqual(1, len(self.mocked_log.error))1212 self.assertEqual(0, len(master.clients))1213 server.mocked_send(Message("client_ready", "abcd", "other_version_mismatch_should_just_give_a_warning"))1214 self.assertEqual(1, len(self.mocked_log.warning))1215 self.assertEqual(1, len(master.clients))1216 server.mocked_send(Message("client_ready", -1, "version_check_bypass_should_not_warn"))1217 self.assertEqual(1, len(self.mocked_log.warning))1218 self.assertEqual(2, len(master.clients))1219 def test_worker_stats_report_median(self):1220 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1221 master = self.get_runner()1222 server.mocked_send(Message("client_ready", __version__, "fake_client"))1223 master.stats.get("/", "GET").log(100, 23455)1224 master.stats.get("/", "GET").log(800, 23455)1225 master.stats.get("/", "GET").log(700, 23455)1226 data = {"user_count": 1}1227 self.environment.events.report_to_master.fire(client_id="fake_client", data=data)1228 master.stats.clear_all()1229 server.mocked_send(Message("stats", data, "fake_client"))1230 s = master.stats.get("/", "GET")1231 self.assertEqual(700, s.median_response_time)1232 def test_worker_stats_report_with_none_response_times(self):1233 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1234 master = self.get_runner()1235 server.mocked_send(Message("client_ready", __version__, "fake_client"))1236 master.stats.get("/mixed", "GET").log(0, 23455)1237 master.stats.get("/mixed", "GET").log(800, 23455)1238 master.stats.get("/mixed", "GET").log(700, 23455)1239 master.stats.get("/mixed", "GET").log(None, 23455)1240 master.stats.get("/mixed", "GET").log(None, 23455)1241 master.stats.get("/mixed", "GET").log(None, 23455)1242 master.stats.get("/mixed", "GET").log(None, 23455)1243 master.stats.get("/onlyNone", "GET").log(None, 23455)1244 data = {"user_count": 1}1245 self.environment.events.report_to_master.fire(client_id="fake_client", data=data)1246 master.stats.clear_all()1247 server.mocked_send(Message("stats", data, "fake_client"))1248 s1 = master.stats.get("/mixed", "GET")1249 self.assertEqual(700, s1.median_response_time)1250 self.assertEqual(500, s1.avg_response_time)1251 s2 = master.stats.get("/onlyNone", "GET")1252 self.assertEqual(0, s2.median_response_time)1253 self.assertEqual(0, s2.avg_response_time)1254 def test_master_marks_downed_workers_as_missing(self):1255 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1256 master = self.get_runner()1257 server.mocked_send(Message("client_ready", __version__, "fake_client"))1258 sleep(6)1259 # print(master.clients['fake_client'].__dict__)1260 assert master.clients["fake_client"].state == STATE_MISSING1261 def test_last_worker_quitting_stops_test(self):1262 class TestUser(User):1263 @task1264 def my_task(self):1265 gevent.sleep(600)1266 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1267 master = self.get_runner(user_classes=[TestUser])1268 server.mocked_send(Message("client_ready", __version__, "fake_client1"))1269 server.mocked_send(Message("client_ready", __version__, "fake_client2"))1270 master.start(1, 2)1271 server.mocked_send(Message("spawning", None, "fake_client1"))1272 server.mocked_send(Message("spawning", None, "fake_client2"))1273 server.mocked_send(Message("quit", None, "fake_client1"))1274 sleep(0.1)1275 self.assertEqual(1, len(master.clients.all))1276 self.assertNotEqual(STATE_STOPPED, master.state, "Not all workers quit but test stopped anyway.")1277 server.mocked_send(Message("quit", None, "fake_client2"))1278 sleep(0.1)1279 self.assertEqual(0, len(master.clients.all))1280 self.assertEqual(STATE_STOPPED, master.state, "All workers quit but test didn't stop.")1281 @mock.patch("locust.runners.HEARTBEAT_INTERVAL", new=0.1)1282 def test_last_worker_missing_stops_test(self):1283 class TestUser(User):1284 @task1285 def my_task(self):1286 gevent.sleep(600)1287 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1288 master = self.get_runner(user_classes=[TestUser])1289 server.mocked_send(Message("client_ready", __version__, "fake_client1"))1290 server.mocked_send(Message("client_ready", __version__, "fake_client2"))1291 server.mocked_send(Message("client_ready", __version__, "fake_client3"))1292 master.start(3, 3)1293 server.mocked_send(Message("spawning", None, "fake_client1"))1294 server.mocked_send(Message("spawning", None, "fake_client2"))1295 server.mocked_send(Message("spawning", None, "fake_client3"))1296 sleep(0.2)1297 server.mocked_send(1298 Message("heartbeat", {"state": STATE_RUNNING, "current_cpu_usage": 50, "count": 1}, "fake_client1")1299 )1300 server.mocked_send(1301 Message("heartbeat", {"state": STATE_RUNNING, "current_cpu_usage": 50, "count": 1}, "fake_client2")1302 )1303 server.mocked_send(1304 Message("heartbeat", {"state": STATE_RUNNING, "current_cpu_usage": 50, "count": 1}, "fake_client3")1305 )1306 sleep(0.2)1307 self.assertEqual(0, len(master.clients.missing))1308 self.assertEqual(3, master.worker_count)1309 self.assertNotIn(1310 master.state, [STATE_STOPPED, STATE_STOPPING], "Not all workers went missing but test stopped anyway."1311 )1312 server.mocked_send(1313 Message("heartbeat", {"state": STATE_RUNNING, "current_cpu_usage": 50, "count": 1}, "fake_client1")1314 )1315 sleep(0.4)1316 self.assertEqual(2, len(master.clients.missing))1317 self.assertEqual(1, master.worker_count)1318 self.assertNotIn(1319 master.state, [STATE_STOPPED, STATE_STOPPING], "Not all workers went missing but test stopped anyway."1320 )1321 sleep(0.2)1322 self.assertEqual(3, len(master.clients.missing))1323 self.assertEqual(0, master.worker_count)1324 self.assertEqual(STATE_STOPPED, master.state, "All workers went missing but test didn't stop.")1325 def test_master_total_stats(self):1326 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1327 master = self.get_runner()1328 server.mocked_send(Message("client_ready", __version__, "fake_client"))1329 stats = RequestStats()1330 stats.log_request("GET", "/1", 100, 3546)1331 stats.log_request("GET", "/1", 800, 56743)1332 stats2 = RequestStats()1333 stats2.log_request("GET", "/2", 700, 2201)1334 server.mocked_send(1335 Message(1336 "stats",1337 {1338 "stats": stats.serialize_stats(),1339 "stats_total": stats.total.serialize(),1340 "errors": stats.serialize_errors(),1341 "user_count": 1,1342 },1343 "fake_client",1344 )1345 )1346 server.mocked_send(1347 Message(1348 "stats",1349 {1350 "stats": stats2.serialize_stats(),1351 "stats_total": stats2.total.serialize(),1352 "errors": stats2.serialize_errors(),1353 "user_count": 2,1354 },1355 "fake_client",1356 )1357 )1358 self.assertEqual(700, master.stats.total.median_response_time)1359 def test_master_total_stats_with_none_response_times(self):1360 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1361 master = self.get_runner()1362 server.mocked_send(Message("client_ready", __version__, "fake_client"))1363 stats = RequestStats()1364 stats.log_request("GET", "/1", 100, 3546)1365 stats.log_request("GET", "/1", 800, 56743)1366 stats.log_request("GET", "/1", None, 56743)1367 stats2 = RequestStats()1368 stats2.log_request("GET", "/2", 700, 2201)1369 stats2.log_request("GET", "/2", None, 2201)1370 stats3 = RequestStats()1371 stats3.log_request("GET", "/3", None, 2201)1372 server.mocked_send(1373 Message(1374 "stats",1375 {1376 "stats": stats.serialize_stats(),1377 "stats_total": stats.total.serialize(),1378 "errors": stats.serialize_errors(),1379 "user_count": 1,1380 },1381 "fake_client",1382 )1383 )1384 server.mocked_send(1385 Message(1386 "stats",1387 {1388 "stats": stats2.serialize_stats(),1389 "stats_total": stats2.total.serialize(),1390 "errors": stats2.serialize_errors(),1391 "user_count": 2,1392 },1393 "fake_client",1394 )1395 )1396 server.mocked_send(1397 Message(1398 "stats",1399 {1400 "stats": stats3.serialize_stats(),1401 "stats_total": stats3.total.serialize(),1402 "errors": stats3.serialize_errors(),1403 "user_count": 2,1404 },1405 "fake_client",1406 )1407 )1408 self.assertEqual(700, master.stats.total.median_response_time)1409 def test_master_current_response_times(self):1410 start_time = 11411 with mock.patch("time.time") as mocked_time:1412 mocked_time.return_value = start_time1413 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1414 master = self.get_runner()1415 self.environment.stats.reset_all()1416 mocked_time.return_value += 1.02341417 server.mocked_send(Message("client_ready", __version__, "fake_client"))1418 stats = RequestStats()1419 stats.log_request("GET", "/1", 100, 3546)1420 stats.log_request("GET", "/1", 800, 56743)1421 server.mocked_send(1422 Message(1423 "stats",1424 {1425 "stats": stats.serialize_stats(),1426 "stats_total": stats.total.get_stripped_report(),1427 "errors": stats.serialize_errors(),1428 "user_count": 1,1429 },1430 "fake_client",1431 )1432 )1433 mocked_time.return_value += 11434 stats2 = RequestStats()1435 stats2.log_request("GET", "/2", 400, 2201)1436 server.mocked_send(1437 Message(1438 "stats",1439 {1440 "stats": stats2.serialize_stats(),1441 "stats_total": stats2.total.get_stripped_report(),1442 "errors": stats2.serialize_errors(),1443 "user_count": 2,1444 },1445 "fake_client",1446 )1447 )1448 mocked_time.return_value += 41449 self.assertEqual(400, master.stats.total.get_current_response_time_percentile(0.5))1450 self.assertEqual(800, master.stats.total.get_current_response_time_percentile(0.95))1451 # let 10 second pass, do some more requests, send it to the master and make1452 # sure the current response time percentiles only accounts for these new requests1453 mocked_time.return_value += 10.100231454 stats.log_request("GET", "/1", 20, 1)1455 stats.log_request("GET", "/1", 30, 1)1456 stats.log_request("GET", "/1", 3000, 1)1457 server.mocked_send(1458 Message(1459 "stats",1460 {1461 "stats": stats.serialize_stats(),1462 "stats_total": stats.total.get_stripped_report(),1463 "errors": stats.serialize_errors(),1464 "user_count": 2,1465 },1466 "fake_client",1467 )1468 )1469 self.assertEqual(30, master.stats.total.get_current_response_time_percentile(0.5))1470 self.assertEqual(3000, master.stats.total.get_current_response_time_percentile(0.95))1471 @mock.patch("locust.runners.HEARTBEAT_INTERVAL", new=600)1472 def test_rebalance_locust_users_on_worker_connect(self):1473 class TestUser(User):1474 @task1475 def my_task(self):1476 pass1477 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1478 master = self.get_runner(user_classes=[TestUser])1479 server.mocked_send(Message("client_ready", __version__, "zeh_fake_client1"))1480 self.assertEqual(1, len(master.clients))1481 self.assertTrue(1482 "zeh_fake_client1" in master.clients, "Could not find fake client in master instance's clients dict"1483 )1484 master.start(100, 20)1485 self.assertEqual(5, len(server.outbox))1486 for i, (_, msg) in enumerate(server.outbox.copy()):1487 self.assertDictEqual({"TestUser": int((i + 1) * 20)}, msg.data["user_classes_count"])1488 server.outbox.pop()1489 # Normally, this attribute would be updated when the1490 # master receives the report from the worker.1491 master.clients["zeh_fake_client1"].user_classes_count = {"TestUser": 100}1492 # let another worker connect1493 server.mocked_send(Message("client_ready", __version__, "zeh_fake_client2"))1494 self.assertEqual(2, len(master.clients))1495 sleep(0.1) # give time for messages to be sent to clients1496 self.assertEqual(2, len(server.outbox))1497 client_id, msg = server.outbox.pop()1498 self.assertEqual({"TestUser": 50}, msg.data["user_classes_count"])1499 client_id, msg = server.outbox.pop()1500 self.assertEqual({"TestUser": 50}, msg.data["user_classes_count"])1501 def test_sends_spawn_data_to_ready_running_spawning_workers(self):1502 """Sends spawn job to running, ready, or spawning workers"""1503 class TestUser(User):1504 @task1505 def my_task(self):1506 pass1507 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1508 master = self.get_runner(user_classes=[TestUser])1509 master.clients[1] = WorkerNode("1")1510 master.clients[2] = WorkerNode("2")1511 master.clients[3] = WorkerNode("3")1512 master.clients[1].state = STATE_INIT1513 master.clients[2].state = STATE_SPAWNING1514 master.clients[3].state = STATE_RUNNING1515 master.start(user_count=5, spawn_rate=5)1516 self.assertEqual(3, len(server.outbox))1517 def test_start_event(self):1518 """1519 Tests that test_start event is fired1520 """1521 class TestUser(User):1522 @task1523 def my_task(self):1524 pass1525 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1526 master = self.get_runner(user_classes=[TestUser])1527 run_count = [0]1528 @self.environment.events.test_start.add_listener1529 def on_test_start(*a, **kw):1530 run_count[0] += 11531 for i in range(5):1532 server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))1533 master.start(7, 7)1534 self.assertEqual(5, len(server.outbox))1535 self.assertEqual(1, run_count[0])1536 # change number of users and check that test_start isn't fired again1537 master.start(7, 7)1538 self.assertEqual(1, run_count[0])1539 # stop and start to make sure test_start is fired again1540 master.stop()1541 master.start(3, 3)1542 self.assertEqual(2, run_count[0])1543 master.quit()1544 def test_stop_event(self):1545 """1546 Tests that test_stop event is fired1547 """1548 class TestUser(User):1549 @task1550 def my_task(self):1551 pass1552 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1553 master = self.get_runner(user_classes=[TestUser])1554 run_count = [0]1555 @self.environment.events.test_stop.add_listener1556 def on_test_stop(*a, **kw):1557 run_count[0] += 11558 for i in range(5):1559 server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))1560 master.start(7, 7)1561 self.assertEqual(5, len(server.outbox))1562 master.stop()1563 self.assertEqual(1, run_count[0])1564 run_count[0] = 01565 for i in range(5):1566 server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))1567 master.start(7, 7)1568 master.stop()1569 master.quit()1570 self.assertEqual(1, run_count[0])1571 def test_stop_event_quit(self):1572 """1573 Tests that test_stop event is fired when quit() is called directly1574 """1575 class TestUser(User):1576 @task1577 def my_task(self):1578 pass1579 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1580 master = self.get_runner(user_classes=[TestUser])1581 run_count = [0]1582 @self.environment.events.test_stop.add_listener1583 def on_test_stop(*a, **kw):1584 run_count[0] += 11585 for i in range(5):1586 server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))1587 master.start(7, 7)1588 self.assertEqual(5, len(server.outbox))1589 master.quit()1590 self.assertEqual(1, run_count[0])1591 def test_spawn_zero_locusts(self):1592 class MyTaskSet(TaskSet):1593 @task1594 def my_task(self):1595 pass1596 class MyTestUser(User):1597 tasks = [MyTaskSet]1598 wait_time = constant(0.1)1599 environment = Environment(user_classes=[MyTestUser])1600 runner = LocalRunner(environment)1601 timeout = gevent.Timeout(2.0)1602 timeout.start()1603 try:1604 runner.start(0, 1, wait=True)1605 runner.spawning_greenlet.join()1606 except gevent.Timeout:1607 self.fail("Got Timeout exception. A locust seems to have been spawned, even though 0 was specified.")1608 finally:1609 timeout.cancel()1610 def test_spawn_uneven_locusts(self):1611 """1612 Tests that we can accurately spawn a certain number of locusts, even if it's not an1613 even number of the connected workers1614 """1615 class TestUser(User):1616 @task1617 def my_task(self):1618 pass1619 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1620 master = self.get_runner(user_classes=[TestUser])1621 for i in range(5):1622 server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))1623 master.start(7, 7)1624 self.assertEqual(5, len(server.outbox))1625 num_users = sum(sum(msg.data["user_classes_count"].values()) for _, msg in server.outbox if msg.data)1626 self.assertEqual(7, num_users, "Total number of locusts that would have been spawned is not 7")1627 def test_spawn_fewer_locusts_than_workers(self):1628 class TestUser(User):1629 @task1630 def my_task(self):1631 pass1632 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1633 master = self.get_runner(user_classes=[TestUser])1634 for i in range(5):1635 server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))1636 master.start(2, 2)1637 self.assertEqual(5, len(server.outbox))1638 num_users = sum(sum(msg.data["user_classes_count"].values()) for _, msg in server.outbox if msg.data)1639 self.assertEqual(2, num_users, "Total number of locusts that would have been spawned is not 2")1640 def test_custom_shape_scale_up(self):1641 class MyUser(User):1642 @task1643 def my_task(self):1644 pass1645 class TestShape(LoadTestShape):1646 def tick(self):1647 run_time = self.get_run_time()1648 if run_time < 2:1649 return 1, 11650 elif run_time < 4:1651 return 2, 21652 else:1653 return None1654 self.environment.shape_class = TestShape()1655 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1656 master = self.get_runner(user_classes=[MyUser])1657 for i in range(5):1658 server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))1659 # Start the shape_worker1660 self.environment.shape_class.reset_time()1661 master.start_shape()1662 sleep(0.5)1663 # Wait for shape_worker to update user_count1664 num_users = sum(sum(msg.data["user_classes_count"].values()) for _, msg in server.outbox if msg.data)1665 self.assertEqual(1666 1, num_users, "Total number of users in first stage of shape test is not 1: %i" % num_users1667 )1668 # Wait for shape_worker to update user_count again1669 sleep(2)1670 num_users = sum(sum(msg.data["user_classes_count"].values()) for _, msg in server.outbox if msg.data)1671 self.assertEqual(1672 3, num_users, "Total number of users in second stage of shape test is not 3: %i" % num_users1673 )1674 # Wait to ensure shape_worker has stopped the test1675 sleep(3)1676 self.assertEqual("stopped", master.state, "The test has not been stopped by the shape class")1677 def test_custom_shape_scale_down(self):1678 class MyUser(User):1679 @task1680 def my_task(self):1681 pass1682 class TestShape(LoadTestShape):1683 def tick(self):1684 run_time = self.get_run_time()1685 if run_time < 2:1686 return 5, 51687 elif run_time < 4:1688 return 1, 51689 else:1690 return None1691 self.environment.shape_class = TestShape()1692 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1693 master = self.get_runner(user_classes=[MyUser])1694 for i in range(5):1695 server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i))1696 # Start the shape_worker1697 self.environment.shape_class.reset_time()1698 master.start_shape()1699 sleep(0.5)1700 # Wait for shape_worker to update user_count1701 num_users = sum(sum(msg.data["user_classes_count"].values()) for _, msg in server.outbox if msg.data)1702 self.assertEqual(1703 5, num_users, "Total number of users in first stage of shape test is not 5: %i" % num_users1704 )1705 # Wait for shape_worker to update user_count again1706 sleep(2)1707 msgs = defaultdict(dict)1708 for _, msg in server.outbox:1709 if not msg.data:1710 continue1711 msgs[msg.node_id][msg.data["timestamp"]] = sum(msg.data["user_classes_count"].values())1712 # Count users for the last received messages1713 num_users = sum(v[max(v.keys())] for v in msgs.values())1714 self.assertEqual(1715 1, num_users, "Total number of users in second stage of shape test is not 1: %i" % num_users1716 )1717 # Wait to ensure shape_worker has stopped the test1718 sleep(3)1719 self.assertEqual("stopped", master.state, "The test has not been stopped by the shape class")1720 def test_exception_in_task(self):1721 class MyUser(User):1722 @task1723 def will_error(self):1724 raise HeyAnException(":(")1725 self.environment.user_classes = [MyUser]1726 runner = self.environment.create_local_runner()1727 l = MyUser(self.environment)1728 self.assertRaises(HeyAnException, l.run)1729 self.assertRaises(HeyAnException, l.run)1730 self.assertEqual(1, len(runner.exceptions))1731 hash_key, exception = runner.exceptions.popitem()1732 self.assertTrue("traceback" in exception)1733 self.assertTrue("HeyAnException" in exception["traceback"])1734 self.assertEqual(2, exception["count"])1735 def test_exception_is_caught(self):1736 """Test that exceptions are stored, and execution continues"""1737 class MyTaskSet(TaskSet):1738 def __init__(self, *a, **kw):1739 super().__init__(*a, **kw)1740 self._task_queue = [self.will_error, self.will_stop]1741 @task(1)1742 def will_error(self):1743 raise HeyAnException(":(")1744 @task(1)1745 def will_stop(self):1746 raise StopUser()1747 class MyUser(User):1748 wait_time = constant(0.01)1749 tasks = [MyTaskSet]1750 # set config to catch exceptions in locust users1751 self.environment.catch_exceptions = True1752 self.environment.user_classes = [MyUser]1753 runner = LocalRunner(self.environment)1754 l = MyUser(self.environment)1755 # make sure HeyAnException isn't raised1756 l.run()1757 l.run()1758 # make sure we got two entries in the error log1759 self.assertEqual(2, len(self.mocked_log.error))1760 # make sure exception was stored1761 self.assertEqual(1, len(runner.exceptions))1762 hash_key, exception = runner.exceptions.popitem()1763 self.assertTrue("traceback" in exception)1764 self.assertTrue("HeyAnException" in exception["traceback"])1765 self.assertEqual(2, exception["count"])1766 def test_master_reset_connection(self):1767 """Test that connection will be reset when network issues found"""1768 with mock.patch("locust.runners.FALLBACK_INTERVAL", new=0.1):1769 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1770 master = self.get_runner()1771 self.assertEqual(0, len(master.clients))1772 server.mocked_send(Message("client_ready", NETWORK_BROKEN, "fake_client"))1773 self.assertTrue(master.connection_broken)1774 server.mocked_send(Message("client_ready", __version__, "fake_client"))1775 sleep(0.2)1776 self.assertFalse(master.connection_broken)1777 self.assertEqual(1, len(master.clients))1778 master.quit()1779 def test_attributes_populated_when_calling_start(self):1780 class MyUser1(User):1781 @task1782 def my_task(self):1783 pass1784 class MyUser2(User):1785 @task1786 def my_task(self):1787 pass1788 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1789 master = self.get_runner(user_classes=[MyUser1, MyUser2])1790 server.mocked_send(Message("client_ready", __version__, "fake_client1"))1791 master.start(7, 7)1792 self.assertEqual({"MyUser1": 4, "MyUser2": 3}, master.target_user_classes_count)1793 self.assertEqual(7, master.target_user_count)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!