Best Python code snippet using locust
test_runners.py
Source:test_runners.py
...556 gevent.sleep(0.1)557 self.assertLessEqual(local_runner.user_count, 0)558 local_runner.stop()559 web_ui.stop()560 def test_target_user_count_is_set_before_ramp_up(self):561 """Test for https://github.com/locustio/locust/issues/1883"""562 class MyUser1(User):563 wait_time = constant(0)564 @task565 def my_task(self):566 pass567 environment = Environment(user_classes=[MyUser1])568 runner = LocalRunner(environment)569 test_start_event_fired = [False]570 @environment.events.test_start.add_listener571 def on_test_start(*args, **kwargs):572 test_start_event_fired[0] = True573 self.assertEqual(runner.target_user_count, 3)574 runner.start(user_count=3, spawn_rate=1, wait=False)575 gevent.sleep(1)576 self.assertEqual(runner.target_user_count, 3)577 self.assertEqual(runner.user_count, 1)578 # However, target_user_classes_count is only updated at the end of the ramp-up/ramp-down579 # due to the way it is implemented.580 self.assertDictEqual({}, runner.target_user_classes_count)581 runner.spawning_greenlet.join()582 self.assertEqual(runner.target_user_count, 3)583 self.assertEqual(runner.user_count, 3)584 self.assertDictEqual({"MyUser1": 3}, runner.target_user_classes_count)585 runner.quit()586 self.assertTrue(test_start_event_fired[0])587 def test_stop_users_count(self):588 user_count = 10589 class BaseUser1(User):590 wait_time = constant(1)591 @task592 def task_a(self):593 pass594 class BaseUser2(BaseUser1):595 wait_time = constant(1)596 runner = Environment(user_classes=[BaseUser1, BaseUser2]).create_local_runner()597 runner.start(user_count=user_count, spawn_rate=10)598 sleep(1)599 self.assertEqual(user_count, runner.user_count)600 runner.stop()601 sleep(1)602 self.assertEqual(0, runner.user_count)603class TestMasterWorkerRunners(LocustTestCase):604 def test_distributed_integration_run(self):605 """606 Full integration test that starts both a MasterRunner and three WorkerRunner instances607 and makes sure that their stats is sent to the Master.608 """609 class 
TestUser(User):610 wait_time = constant(0.1)611 @task612 def incr_stats(self):613 self.environment.events.request.fire(614 request_type="GET",615 name="/",616 response_time=1337,617 response_length=666,618 exception=None,619 context={},620 )621 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):622 # start a Master runner623 master_env = Environment(user_classes=[TestUser])624 master = master_env.create_master_runner("*", 0)625 sleep(0)626 # start 3 Worker runners627 workers = []628 for i in range(3):629 worker_env = Environment(user_classes=[TestUser])630 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)631 workers.append(worker)632 # give workers time to connect633 sleep(0.1)634 # issue start command that should trigger TestUsers to be spawned in the Workers635 master.start(6, spawn_rate=1000)636 sleep(0.1)637 # check that worker nodes have started locusts638 for worker in workers:639 self.assertEqual(2, worker.user_count)640 # give time for users to generate stats, and stats to be sent to master641 sleep(1)642 master.quit()643 # make sure users are killed644 for worker in workers:645 self.assertEqual(0, worker.user_count)646 # check that stats are present in master647 self.assertGreater(648 master_env.runner.stats.total.num_requests,649 20,650 "For some reason the master node's stats has not come in",651 )652 def test_distributed_rebalanced_integration_run(self):653 """654 Full integration test that starts both a MasterRunner and three WorkerRunner instances655 and makes sure that their stats is sent to the Master.656 """657 class TestUser(User):658 wait_time = constant(0.1)659 @task660 def incr_stats(self):661 self.environment.events.request.fire(662 request_type="GET",663 name="/",664 response_time=1337,665 response_length=666,666 exception=None,667 context={},668 )669 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), patch_env(670 "LOCUST_WAIT_FOR_WORKERS_REPORT_AFTER_RAMP_UP", "0.1"671 ):672 # start a 
Master runner673 options = parse_options(["--enable-rebalancing"])674 master_env = Environment(user_classes=[TestUser], parsed_options=options)675 master = master_env.create_master_runner("*", 0)676 sleep(0)677 # start 3 Worker runners678 workers = []679 def add_worker():680 worker_env = Environment(user_classes=[TestUser])681 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)682 workers.append(worker)683 for i in range(3):684 add_worker()685 # give workers time to connect686 sleep(0.1)687 # issue start command that should trigger TestUsers to be spawned in the Workers688 master.start(6, spawn_rate=1000)689 sleep(0.1)690 # check that worker nodes have started locusts691 for worker in workers:692 self.assertEqual(2, worker.user_count)693 # give time for users to generate stats, and stats to be sent to master694 # Add 1 more workers (should be 4 now)695 add_worker()696 @retry(AssertionError, tries=10, delay=0.5)697 def check_rebalanced_true():698 for worker in workers:699 self.assertTrue(worker.user_count > 0)700 # Check that all workers have a user count > 0 at least701 check_rebalanced_true()702 # Add 2 more workers (should be 6 now)703 add_worker()704 add_worker()705 @retry(AssertionError, tries=10, delay=0.5)706 def check_rebalanced_equals():707 for worker in workers:708 self.assertEqual(1, worker.user_count)709 # Check that all workers have a user count = 1 now710 check_rebalanced_equals()711 # Simulate that some workers are missing by "killing" them abrutly712 for i in range(3):713 workers[i].greenlet.kill(block=True)714 @retry(AssertionError, tries=10, delay=1)715 def check_master_worker_missing_count():716 self.assertEqual(3, len(master.clients.missing))717 # Check that master detected the missing workers718 check_master_worker_missing_count()719 @retry(AssertionError, tries=10, delay=1)720 def check_remaing_worker_new_user_count():721 for i in range(3, 6):722 self.assertEqual(2, workers[i].user_count)723 # Check that remaining workers 
have a new count of user due to rebalancing.724 check_remaing_worker_new_user_count()725 sleep(1)726 # Finally quit and check states of remaining workers.727 master.quit()728 # make sure users are killed on remaining workers729 for i in range(3, 6):730 self.assertEqual(0, workers[i].user_count)731 # check that stats are present in master732 self.assertGreater(733 master_env.runner.stats.total.num_requests,734 20,735 "For some reason the master node's stats has not come in",736 )737 def test_distributed_run_with_custom_args(self):738 """739 Full integration test that starts both a MasterRunner and three WorkerRunner instances740 and makes sure that their stats is sent to the Master.741 """742 class TestUser(User):743 wait_time = constant(0.1)744 @task745 def incr_stats(self):746 self.environment.events.request.fire(747 request_type="GET",748 name=self.environment.parsed_options.my_str_argument,749 response_time=self.environment.parsed_options.my_int_argument,750 response_length=666,751 exception=None,752 context={},753 )754 @locust.events.init_command_line_parser.add_listener755 def _(parser, **kw):756 parser.add_argument("--my-int-argument", type=int)757 parser.add_argument("--my-str-argument", type=str, default="NOOOO")758 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):759 # start a Master runner760 master_env = Environment(user_classes=[TestUser])761 master = master_env.create_master_runner("*", 0)762 master_env.parsed_options = parse_options(763 [764 "--my-int-argument",765 "42",766 "--my-str-argument",767 "cool-string",768 ]769 )770 sleep(0)771 # start 3 Worker runners772 workers = []773 for i in range(3):774 worker_env = Environment(user_classes=[TestUser])775 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)776 workers.append(worker)777 # give workers time to connect778 sleep(0.1)779 # issue start command that should trigger TestUsers to be spawned in the Workers780 master.start(6, spawn_rate=1000)781 sleep(0.1)782 # 
check that worker nodes have started locusts783 for worker in workers:784 self.assertEqual(2, worker.user_count)785 # give time for users to generate stats, and stats to be sent to master786 sleep(1)787 master.quit()788 # make sure users are killed789 for worker in workers:790 self.assertEqual(0, worker.user_count)791 self.assertEqual(master_env.runner.stats.total.max_response_time, 42)792 self.assertEqual(master_env.runner.stats.get("cool-string", "GET").avg_response_time, 42)793 def test_test_stop_event(self):794 class TestUser(User):795 wait_time = constant(0.1)796 @task797 def my_task(l):798 pass799 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):800 # start a Master runner801 master_env = Environment(user_classes=[TestUser])802 test_stop_count = {"master": 0, "worker": 0}803 @master_env.events.test_stop.add_listener804 def _(*args, **kwargs):805 test_stop_count["master"] += 1806 master = master_env.create_master_runner("*", 0)807 sleep(0)808 # start a Worker runner809 worker_env = Environment(user_classes=[TestUser])810 @worker_env.events.test_stop.add_listener811 def _(*args, **kwargs):812 test_stop_count["worker"] += 1813 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)814 # give worker time to connect815 sleep(0.1)816 # issue start command that should trigger TestUsers to be spawned in the Workers817 master.start(2, spawn_rate=1000)818 sleep(0.1)819 # check that worker nodes have started locusts820 self.assertEqual(2, worker.user_count)821 # give time for users to generate stats, and stats to be sent to master822 sleep(0.1)823 master_env.events.quitting.fire(environment=master_env, reverse=True)824 master.quit()825 sleep(0.1)826 # make sure users are killed827 self.assertEqual(0, worker.user_count)828 # check the test_stop event was called one time in master and one time in worker829 self.assertEqual(830 1,831 test_stop_count["master"],832 "The test_stop event was not called exactly one time in the master node",833 
)834 self.assertEqual(835 1,836 test_stop_count["worker"],837 "The test_stop event was not called exactly one time in the worker node",838 )839 def test_distributed_shape(self):840 """841 Full integration test that starts both a MasterRunner and three WorkerRunner instances842 and tests a basic LoadTestShape with scaling up and down users843 """844 class TestUser(User):845 @task846 def my_task(self):847 pass848 class TestShape(LoadTestShape):849 def tick(self):850 run_time = self.get_run_time()851 if run_time < 2:852 return 9, 9853 elif run_time < 4:854 return 21, 21855 elif run_time < 6:856 return 3, 21857 else:858 return None859 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):860 test_shape = TestShape()861 master_env = Environment(user_classes=[TestUser], shape_class=test_shape)862 master_env.shape_class.reset_time()863 master = master_env.create_master_runner("*", 0)864 workers = []865 for i in range(3):866 worker_env = Environment(user_classes=[TestUser])867 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)868 workers.append(worker)869 # Give workers time to connect870 sleep(0.1)871 # Start a shape test872 master.start_shape()873 sleep(1)874 # Ensure workers have connected and started the correct amount of users875 for worker in workers:876 self.assertEqual(3, worker.user_count, "Shape test has not reached stage 1")877 self.assertEqual(878 9, test_shape.get_current_user_count(), "Shape is not seeing stage 1 runner user count correctly"879 )880 self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 9})881 # Ensure new stage with more users has been reached882 sleep(2)883 for worker in workers:884 self.assertEqual(7, worker.user_count, "Shape test has not reached stage 2")885 self.assertEqual(886 21, test_shape.get_current_user_count(), "Shape is not seeing stage 2 runner user count correctly"887 )888 self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 21})889 # Ensure new stage with less 
users has been reached890 sleep(2)891 for worker in workers:892 self.assertEqual(1, worker.user_count, "Shape test has not reached stage 3")893 self.assertEqual(894 3, test_shape.get_current_user_count(), "Shape is not seeing stage 3 runner user count correctly"895 )896 self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 3})897 # Ensure test stops at the end898 sleep(2)899 for worker in workers:900 self.assertEqual(0, worker.user_count, "Shape test has not stopped")901 self.assertEqual(902 0, test_shape.get_current_user_count(), "Shape is not seeing stopped runner user count correctly"903 )904 self.assertDictEqual(master.reported_user_classes_count, {"TestUser": 0})905 self.assertEqual("stopped", master.state)906 def test_distributed_shape_with_fixed_users(self):907 """908 Full integration test that starts both a MasterRunner and three WorkerRunner instances909 and tests a basic LoadTestShape with scaling up and down users with 'fixed count' users910 """911 class TestUser(User):912 @task913 def my_task(self):914 pass915 class FixedUser1(User):916 fixed_count = 1917 @task918 def my_task(self):919 pass920 class FixedUser2(User):921 fixed_count = 11922 @task923 def my_task(self):924 pass925 class TestShape(LoadTestShape):926 def tick(self):927 run_time = self.get_run_time()928 if run_time < 1:929 return 12, 12930 elif run_time < 2:931 return 36, 24932 elif run_time < 3:933 return 12, 24934 else:935 return None936 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):937 test_shape = TestShape()938 master_env = Environment(user_classes=[TestUser, FixedUser1, FixedUser2], shape_class=test_shape)939 master_env.shape_class.reset_time()940 master = master_env.create_master_runner("*", 0)941 workers = []942 for _ in range(3):943 worker_env = Environment(user_classes=[TestUser, FixedUser1, FixedUser2])944 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)945 workers.append(worker)946 # Give workers time to connect947 
sleep(0.1)948 # Start a shape test949 master.start_shape()950 sleep(1)951 # Ensure workers have connected and started the correct amount of users (fixed is spawn first)952 for worker in workers:953 self.assertEqual(4, worker.user_count, "Shape test has not reached stage 1")954 self.assertEqual(955 12, test_shape.get_current_user_count(), "Shape is not seeing stage 1 runner user count correctly"956 )957 self.assertDictEqual(master.reported_user_classes_count, {"FixedUser1": 1, "FixedUser2": 11, "TestUser": 0})958 # Ensure new stage with more users has been reached959 sleep(1)960 for worker in workers:961 self.assertEqual(12, worker.user_count, "Shape test has not reached stage 2")962 self.assertEqual(963 36, test_shape.get_current_user_count(), "Shape is not seeing stage 2 runner user count correctly"964 )965 self.assertDictEqual(966 master.reported_user_classes_count, {"FixedUser1": 1, "FixedUser2": 11, "TestUser": 24}967 )968 # Ensure new stage with less users has been reached969 # and expected count of the fixed users is present970 sleep(1)971 for worker in workers:972 self.assertEqual(4, worker.user_count, "Shape test has not reached stage 3")973 self.assertEqual(974 12, test_shape.get_current_user_count(), "Shape is not seeing stage 3 runner user count correctly"975 )976 self.assertDictEqual(master.reported_user_classes_count, {"FixedUser1": 1, "FixedUser2": 11, "TestUser": 0})977 # Ensure test stops at the end978 sleep(0.5)979 for worker in workers:980 self.assertEqual(0, worker.user_count, "Shape test has not stopped")981 self.assertEqual(982 0, test_shape.get_current_user_count(), "Shape is not seeing stopped runner user count correctly"983 )984 self.assertDictEqual(master.reported_user_classes_count, {"FixedUser1": 0, "FixedUser2": 0, "TestUser": 0})985 try:986 with gevent.Timeout(3.0):987 while master.state != STATE_STOPPED:988 sleep(0.1)989 finally:990 self.assertEqual(STATE_STOPPED, master.state)991 def test_distributed_shape_with_stop_timeout(self):992 
"""993 Full integration test that starts both a MasterRunner and five WorkerRunner instances994 and tests a basic LoadTestShape with scaling up and down users995 """996 class TestUser1(User):997 def start(self, group: Group):998 gevent.sleep(0.5)999 return super().start(group)1000 @task1001 def my_task(self):1002 gevent.sleep(0)1003 class TestUser2(User):1004 def start(self, group: Group):1005 gevent.sleep(0.5)1006 return super().start(group)1007 @task1008 def my_task(self):1009 gevent.sleep(600)1010 class TestUser3(User):1011 def start(self, group: Group):1012 gevent.sleep(0.5)1013 return super().start(group)1014 @task1015 def my_task(self):1016 gevent.sleep(600)1017 class TestShape(LoadTestShape):1018 def tick(self):1019 run_time = self.get_run_time()1020 if run_time < 10:1021 return 15, 31022 elif run_time < 30:1023 return 5, 101024 else:1025 return None1026 locust_worker_additional_wait_before_ready_after_stop = 51027 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), patch_env(1028 "LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",1029 str(locust_worker_additional_wait_before_ready_after_stop),1030 ):1031 stop_timeout = 51032 master_env = Environment(1033 user_classes=[TestUser1, TestUser2, TestUser3], shape_class=TestShape(), stop_timeout=stop_timeout1034 )1035 master_env.shape_class.reset_time()1036 master = master_env.create_master_runner("*", 0)1037 workers = []1038 for i in range(5):1039 worker_env = Environment(user_classes=[TestUser1, TestUser2, TestUser3])1040 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1041 workers.append(worker)1042 # Give workers time to connect1043 sleep(0.1)1044 self.assertEqual(STATE_INIT, master.state)1045 self.assertEqual(5, len(master.clients.ready))1046 # Re-order `workers` so that it is sorted by `id`.1047 # This is required because the dispatch is done1048 # on the sorted workers.1049 workers = sorted(workers, key=lambda w: w.client_id)1050 # Start a shape test1051 
master.start_shape()1052 # First stage1053 ts = time.time()1054 while master.state != STATE_SPAWNING:1055 self.assertTrue(time.time() - ts <= 1, master.state)1056 sleep()1057 sleep(5 - (time.time() - ts)) # runtime = 5s1058 ts = time.time()1059 while master.state != STATE_RUNNING:1060 self.assertTrue(time.time() - ts <= 1, master.state)1061 sleep()1062 self.assertEqual(STATE_RUNNING, master.state)1063 w1 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1064 w2 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1065 w3 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1066 w4 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1067 w5 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1068 self.assertDictEqual(w1, workers[0].user_classes_count)1069 self.assertDictEqual(w2, workers[1].user_classes_count)1070 self.assertDictEqual(w3, workers[2].user_classes_count)1071 self.assertDictEqual(w4, workers[3].user_classes_count)1072 self.assertDictEqual(w5, workers[4].user_classes_count)1073 self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)1074 self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)1075 self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)1076 self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)1077 self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)1078 sleep(5 - (time.time() - ts)) # runtime = 10s1079 # Fourth stage1080 ts = time.time()1081 while master.state != STATE_SPAWNING:1082 self.assertTrue(time.time() - ts <= 1, master.state)1083 sleep()1084 sleep(5 - (time.time() - ts)) # runtime = 15s1085 # Fourth stage - Excess TestUser1 have been stopped but1086 # TestUser2/TestUser3 have not reached stop timeout yet, so1087 # their number are unchanged1088 ts = time.time()1089 while master.state != STATE_RUNNING:1090 self.assertTrue(time.time() - ts <= 1, master.state)1091 sleep()1092 delta = time.time() - ts1093 w1 = 
{"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1094 w2 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 1}1095 w3 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 1}1096 w4 = {"TestUser1": 1, "TestUser2": 1, "TestUser3": 1}1097 w5 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 1}1098 self.assertDictEqual(w1, workers[0].user_classes_count)1099 self.assertDictEqual(w2, workers[1].user_classes_count)1100 self.assertDictEqual(w3, workers[2].user_classes_count)1101 self.assertDictEqual(w4, workers[3].user_classes_count)1102 self.assertDictEqual(w5, workers[4].user_classes_count)1103 self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)1104 self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)1105 self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)1106 self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)1107 self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)1108 sleep(1 - delta) # runtime = 16s1109 # Fourth stage - All users are now at the desired number1110 ts = time.time()1111 while master.state != STATE_RUNNING:1112 self.assertTrue(time.time() - ts <= 1, master.state)1113 sleep()1114 delta = time.time() - ts1115 w1 = {"TestUser1": 1, "TestUser2": 0, "TestUser3": 0}1116 w2 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 0}1117 w3 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 1}1118 w4 = {"TestUser1": 1, "TestUser2": 0, "TestUser3": 0}1119 w5 = {"TestUser1": 0, "TestUser2": 1, "TestUser3": 0}1120 self.assertDictEqual(w1, workers[0].user_classes_count)1121 self.assertDictEqual(w2, workers[1].user_classes_count)1122 self.assertDictEqual(w3, workers[2].user_classes_count)1123 self.assertDictEqual(w4, workers[3].user_classes_count)1124 self.assertDictEqual(w5, workers[4].user_classes_count)1125 self.assertDictEqual(w1, master.clients[workers[0].client_id].user_classes_count)1126 self.assertDictEqual(w2, 
master.clients[workers[1].client_id].user_classes_count)1127 self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)1128 self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)1129 self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)1130 sleep(10 - delta) # runtime = 26s1131 # Sleep stop_timeout and make sure the test has stopped1132 sleep(5) # runtime = 31s1133 self.assertEqual(STATE_STOPPING, master.state)1134 sleep(stop_timeout) # runtime = 36s1135 # We wait for "stop_timeout" seconds to let the workers reconnect as "ready" with the master.1136 # The reason for waiting an additional "stop_timeout" when we already waited for "stop_timeout"1137 # above is that when a worker receives the stop message, it can take up to "stop_timeout"1138 # for the worker to send the "client_stopped" message then an additional "stop_timeout" seconds1139 # to send the "client_ready" message.1140 ts = time.time()1141 while len(master.clients.ready) != len(workers):1142 self.assertTrue(1143 time.time() - ts <= stop_timeout + locust_worker_additional_wait_before_ready_after_stop,1144 f"expected {len(workers)} workers to be ready but only {len(master.clients.ready)} workers are",1145 )1146 sleep()1147 sleep(1)1148 # Check that no users are running1149 w1 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}1150 w2 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}1151 w3 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}1152 w4 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}1153 w5 = {"TestUser1": 0, "TestUser2": 0, "TestUser3": 0}1154 self.assertDictEqual(w1, workers[0].user_classes_count)1155 self.assertDictEqual(w2, workers[1].user_classes_count)1156 self.assertDictEqual(w3, workers[2].user_classes_count)1157 self.assertDictEqual(w4, workers[3].user_classes_count)1158 self.assertDictEqual(w5, workers[4].user_classes_count)1159 self.assertDictEqual(w1, 
master.clients[workers[0].client_id].user_classes_count)1160 self.assertDictEqual(w2, master.clients[workers[1].client_id].user_classes_count)1161 self.assertDictEqual(w3, master.clients[workers[2].client_id].user_classes_count)1162 self.assertDictEqual(w4, master.clients[workers[3].client_id].user_classes_count)1163 self.assertDictEqual(w5, master.clients[workers[4].client_id].user_classes_count)1164 ts = time.time()1165 while master.state != STATE_STOPPED:1166 self.assertTrue(time.time() - ts <= 5, master.state)1167 sleep()1168 master.stop()1169 @unittest.skip(reason="takes a lot of time and has randomness to it")1170 def test_distributed_shape_fuzzy_test(self):1171 """1172 Incredibility useful test to find issues with dispatch logic. This test allowed to find1173 multiple small corner cases with the new dispatch logic of locust v2.1174 The test is disabled by default because it takes a lot of time to run and has randomness to it.1175 However, it is advised to run it a few times (you can run it in parallel) when modifying the dispatch logic.1176 """1177 class BaseUser(User):1178 @task1179 def my_task(self):1180 gevent.sleep(600)1181 class TestUser01(BaseUser):1182 pass1183 class TestUser02(BaseUser):1184 pass1185 class TestUser03(BaseUser):1186 pass1187 class TestUser04(BaseUser):1188 pass1189 class TestUser05(BaseUser):1190 pass1191 class TestUser06(BaseUser):1192 pass1193 class TestUser07(BaseUser):1194 pass1195 class TestUser08(BaseUser):1196 pass1197 class TestUser09(BaseUser):1198 pass1199 class TestUser10(BaseUser):1200 pass1201 class TestUser11(BaseUser):1202 pass1203 class TestUser12(BaseUser):1204 pass1205 class TestUser13(BaseUser):1206 pass1207 class TestUser14(BaseUser):1208 pass1209 class TestUser15(BaseUser):1210 pass1211 class TestShape(LoadTestShape):1212 def __init__(self):1213 super().__init__()1214 self.stages = []1215 runtime = 01216 for _ in range(100):1217 runtime += random.uniform(3, 15)1218 self.stages.append((runtime, random.randint(1, 
100), random.uniform(0.1, 10)))1219 def tick(self):1220 run_time = self.get_run_time()1221 for stage in self.stages:1222 if run_time < stage[0]:1223 return stage[1], stage[2]1224 user_classes = [1225 TestUser01,1226 TestUser02,1227 TestUser03,1228 TestUser04,1229 TestUser05,1230 TestUser06,1231 TestUser07,1232 TestUser08,1233 TestUser09,1234 TestUser10,1235 TestUser11,1236 TestUser12,1237 TestUser13,1238 TestUser14,1239 TestUser15,1240 ]1241 chosen_user_classes = random.sample(user_classes, k=random.randint(1, len(user_classes)))1242 for user_class in chosen_user_classes:1243 user_class.weight = random.uniform(1, 20)1244 locust_worker_additional_wait_before_ready_after_stop = 51245 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), patch_env(1246 "LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",1247 str(locust_worker_additional_wait_before_ready_after_stop),1248 ):1249 stop_timeout = 51250 master_env = Environment(1251 user_classes=chosen_user_classes, shape_class=TestShape(), stop_timeout=stop_timeout1252 )1253 master_env.shape_class.reset_time()1254 master = master_env.create_master_runner("*", 0)1255 workers = []1256 for i in range(random.randint(1, 30)):1257 worker_env = Environment(user_classes=chosen_user_classes)1258 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1259 workers.append(worker)1260 # Give workers time to connect1261 sleep(0.1)1262 self.assertEqual(STATE_INIT, master.state)1263 self.assertEqual(len(workers), len(master.clients.ready))1264 # Start a shape test1265 master.start_shape()1266 ts = time.time()1267 while master.state != STATE_STOPPED:1268 self.assertTrue(time.time() - ts <= master_env.shape_class.stages[-1][0] + 60, master.state)1269 print(1270 "{:.2f}/{:.2f} | {} | {:.0f} | ".format(1271 time.time() - ts,1272 master_env.shape_class.stages[-1][0],1273 master.state,1274 sum(master.reported_user_classes_count.values()),1275 )1276 + 
json.dumps(dict(sorted(master.reported_user_classes_count.items(), key=itemgetter(0))))1277 )1278 sleep(1)1279 master.stop()1280 def test_distributed_shape_stop_and_restart(self):1281 """1282 Test stopping and then restarting a LoadTestShape1283 """1284 class TestUser(User):1285 @task1286 def my_task(self):1287 pass1288 class TestShape(LoadTestShape):1289 def tick(self):1290 run_time = self.get_run_time()1291 if run_time < 10:1292 return 4, 41293 else:1294 return None1295 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1296 master_env = Environment(user_classes=[TestUser], shape_class=TestShape())1297 master_env.shape_class.reset_time()1298 master = master_env.create_master_runner("*", 0)1299 workers = []1300 for i in range(2):1301 worker_env = Environment(user_classes=[TestUser])1302 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1303 workers.append(worker)1304 # Give workers time to connect1305 sleep(0.1)1306 # Start a shape test and ensure workers have connected and started the correct amount of users1307 master.start_shape()1308 sleep(1)1309 for worker in workers:1310 self.assertEqual(2, worker.user_count, "Shape test has not started correctly")1311 # Stop the test and ensure all user count is 01312 master.stop()1313 sleep(1)1314 for worker in workers:1315 self.assertEqual(0, worker.user_count, "Shape test has not stopped")1316 # Then restart the test again and ensure workers have connected and started the correct amount of users1317 master.start_shape()1318 sleep(1)1319 for worker in workers:1320 self.assertEqual(2, worker.user_count, "Shape test has not started again correctly")1321 master.stop()1322 def test_distributed_stop_with_stopping_state(self):1323 """1324 Test stopping state when workers have stopped and now ready for a next test1325 """1326 class TestUser(User):1327 @task1328 def my_task(self):1329 pass1330 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1331 master_env = 
Environment(user_classes=[TestUser])1332 master = master_env.create_master_runner("*", 0)1333 workers = []1334 for i in range(3):1335 worker_env = Environment(user_classes=[TestUser])1336 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1337 workers.append(worker)1338 for worker in workers:1339 worker.send_message("client_stopped", None)1340 sleep(1)1341 for worker in workers:1342 self.assertEqual(STATE_INIT, worker.state, "Worker sent a client_stopped, should be ready once stopped")1343 self.assertEqual(STATE_STOPPED, master.state)1344 def test_distributed_shape_statuses_transition(self):1345 """1346 Full integration test that starts both a MasterRunner and five WorkerRunner instances1347 The goal of this test is to validate the status on the master is correctly transitioned for each of the1348 test phases.1349 """1350 class TestUser1(User):1351 @task1352 def my_task(self):1353 gevent.sleep(600)1354 class TestShape(LoadTestShape):1355 def tick(self):1356 run_time = self.get_run_time()1357 if run_time < 5:1358 return 5, 2.51359 elif run_time < 10:1360 return 10, 2.51361 elif run_time < 15:1362 return 15, 2.51363 else:1364 return None1365 locust_worker_additional_wait_before_ready_after_stop = 21366 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3), patch_env(1367 "LOCUST_WORKER_ADDITIONAL_WAIT_BEFORE_READY_AFTER_STOP",1368 str(locust_worker_additional_wait_before_ready_after_stop),1369 ):1370 stop_timeout = 01371 master_env = Environment(user_classes=[TestUser1], shape_class=TestShape(), stop_timeout=stop_timeout)1372 master_env.shape_class.reset_time()1373 master = master_env.create_master_runner("*", 0)1374 workers = []1375 for i in range(5):1376 worker_env = Environment(user_classes=[TestUser1])1377 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1378 workers.append(worker)1379 # Give workers time to connect1380 sleep(0.1)1381 self.assertEqual(STATE_INIT, master.state)1382 self.assertEqual(5, 
len(master.clients.ready))1383 statuses = []1384 ts = time.perf_counter()1385 master.start_shape()1386 while master.state != STATE_STOPPED:1387 # +5s buffer to let master stop1388 self.assertTrue(1389 time.perf_counter() - ts <= 30 + locust_worker_additional_wait_before_ready_after_stop + 5,1390 master.state,1391 )1392 statuses.append((time.perf_counter() - ts, master.state, master.user_count))1393 sleep(0.1)1394 self.assertEqual(statuses[0][1], STATE_INIT)1395 stage = 11396 tolerance = 1 # in s1397 for (t1, state1, user_count1), (t2, state2, user_count2) in zip(statuses[:-1], statuses[1:]):1398 if state1 == STATE_SPAWNING and state2 == STATE_RUNNING and stage == 1:1399 self.assertTrue(2.5 - tolerance <= t2 <= 2.5 + tolerance)1400 elif state1 == STATE_RUNNING and state2 == STATE_SPAWNING and stage == 1:1401 self.assertTrue(5 - tolerance <= t2 <= 5 + tolerance)1402 stage += 11403 elif state1 == STATE_SPAWNING and state2 == STATE_RUNNING and stage == 2:1404 self.assertTrue(7.5 - tolerance <= t2 <= 7.5 + tolerance)1405 elif state1 == STATE_RUNNING and state2 == STATE_SPAWNING and stage == 2:1406 self.assertTrue(10 - tolerance <= t2 <= 10 + tolerance)1407 stage += 11408 elif state1 == STATE_SPAWNING and state2 == STATE_RUNNING and stage == 3:1409 self.assertTrue(12.5 - tolerance <= t2 <= 12.5 + tolerance)1410 elif state1 == STATE_RUNNING and state2 == STATE_SPAWNING and stage == 3:1411 self.assertTrue(15 - tolerance <= t2 <= 15 + tolerance)1412 stage += 11413 elif state1 == STATE_RUNNING and state2 == STATE_STOPPED and stage == 3:1414 self.assertTrue(15 - tolerance <= t2 <= 15 + tolerance)1415 def test_swarm_endpoint_is_non_blocking(self):1416 class TestUser1(User):1417 @task1418 def my_task(self):1419 gevent.sleep(600)1420 class TestUser2(User):1421 @task1422 def my_task(self):1423 gevent.sleep(600)1424 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1425 stop_timeout = 01426 master_env = Environment(user_classes=[TestUser1, TestUser2], 
stop_timeout=stop_timeout)1427 master = master_env.create_master_runner("*", 0)1428 web_ui = master_env.create_web_ui("127.0.0.1", 0)1429 workers = []1430 for i in range(2):1431 worker_env = Environment(user_classes=[TestUser1, TestUser2])1432 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1433 workers.append(worker)1434 # Give workers time to connect1435 sleep(0.1)1436 self.assertEqual(STATE_INIT, master.state)1437 self.assertEqual(len(master.clients.ready), len(workers))1438 ts = time.perf_counter()1439 response = requests.post(1440 f"http://127.0.0.1:{web_ui.server.server_port}/swarm",1441 data={"user_count": 20, "spawn_rate": 5, "host": "https://localhost"},1442 )1443 self.assertEqual(200, response.status_code)1444 self.assertTrue(0 <= time.perf_counter() - ts <= 1, "swarm endpoint is blocking")1445 ts = time.perf_counter()1446 while master.state != STATE_RUNNING:1447 self.assertTrue(time.perf_counter() - ts <= 4, master.state)1448 gevent.sleep(0.1)1449 self.assertTrue(3 <= time.perf_counter() - ts <= 5)1450 self.assertEqual(master.user_count, 20)1451 master.stop()1452 web_ui.stop()1453 def test_can_call_stop_endpoint_if_currently_swarming(self):1454 class TestUser1(User):1455 @task1456 def my_task(self):1457 gevent.sleep(600)1458 class TestUser2(User):1459 @task1460 def my_task(self):1461 gevent.sleep(600)1462 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1463 stop_timeout = 51464 master_env = Environment(user_classes=[TestUser1, TestUser2], stop_timeout=stop_timeout)1465 master = master_env.create_master_runner("*", 0)1466 web_ui = master_env.create_web_ui("127.0.0.1", 0)1467 workers = []1468 for i in range(2):1469 worker_env = Environment(user_classes=[TestUser1, TestUser2])1470 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)1471 workers.append(worker)1472 # Give workers time to connect1473 sleep(0.1)1474 self.assertEqual(STATE_INIT, master.state)1475 
self.assertEqual(len(master.clients.ready), len(workers))1476 ts = time.perf_counter()1477 response = requests.post(1478 f"http://127.0.0.1:{web_ui.server.server_port}/swarm",1479 data={"user_count": 20, "spawn_rate": 1, "host": "https://localhost"},1480 )1481 self.assertEqual(200, response.status_code)1482 self.assertTrue(0 <= time.perf_counter() - ts <= 1, "swarm endpoint is blocking")1483 gevent.sleep(5)1484 self.assertEqual(master.state, STATE_SPAWNING)1485 self.assertLessEqual(master.user_count, 10)1486 ts = time.perf_counter()1487 response = requests.get(1488 f"http://127.0.0.1:{web_ui.server.server_port}/stop",1489 )1490 self.assertEqual(200, response.status_code)1491 self.assertTrue(stop_timeout <= time.perf_counter() - ts <= stop_timeout + 5, "stop endpoint took too long")1492 ts = time.perf_counter()1493 while master.state != STATE_STOPPED:1494 self.assertTrue(time.perf_counter() - ts <= 2)1495 gevent.sleep(0.1)1496 self.assertLessEqual(master.user_count, 0)1497 master.stop()1498 web_ui.stop()1499 def test_target_user_count_is_set_before_ramp_up(self):1500 """Test for https://github.com/locustio/locust/issues/1883"""1501 class MyUser1(User):1502 wait_time = constant(0)1503 @task1504 def my_task(self):1505 pass1506 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):1507 # start a Master runner1508 master_env = Environment(user_classes=[MyUser1])1509 master = master_env.create_master_runner("*", 0)1510 test_start_event_fired = [False]1511 @master_env.events.test_start.add_listener1512 def on_test_start(*args, **kwargs):1513 test_start_event_fired[0] = True...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!