Best Python code snippet using locust
test_runners.py
Source:test_runners.py
...220 self.assertEqual(0, test_stop_run[0])221 runner.stop()222 runner.quit()223 self.assertEqual(1, test_stop_run[0])224 def test_change_user_count_during_spawning(self):225 class MyUser(User):226 wait_time = constant(1)227 @task228 def my_task(self):229 pass230 environment = Environment(user_classes=[MyUser])231 runner = LocalRunner(environment)232 runner.start(user_count=10, spawn_rate=5, wait=False)233 sleep(0.6)234 runner.start(user_count=5, spawn_rate=5, wait=False)235 runner.spawning_greenlet.join()236 self.assertEqual(5, len(runner.user_greenlets))237 runner.quit()238 def test_reset_stats(self):239 class MyUser(User):240 @task241 class task_set(TaskSet):242 @task243 def my_task(self):244 self.user.environment.events.request.fire(245 request_type="GET",246 name="/test",247 response_time=666,248 response_length=1337,249 exception=None,250 context={},251 )252 sleep(2)253 environment = Environment(user_classes=[MyUser], reset_stats=True)254 runner = LocalRunner(environment)255 runner.start(user_count=6, spawn_rate=12, wait=False)256 sleep(0.25)257 self.assertGreaterEqual(runner.stats.get("/test", "GET").num_requests, 3)258 sleep(0.3)259 self.assertLessEqual(runner.stats.get("/test", "GET").num_requests, 1)260 runner.quit()261 def test_no_reset_stats(self):262 class MyUser(User):263 @task264 class task_set(TaskSet):265 @task266 def my_task(self):267 self.user.environment.events.request.fire(268 request_type="GET",269 name="/test",270 response_time=666,271 response_length=1337,272 exception=None,273 context={},274 )275 sleep(2)276 environment = Environment(reset_stats=False, user_classes=[MyUser])277 runner = LocalRunner(environment)278 runner.start(user_count=6, spawn_rate=12, wait=False)279 sleep(0.25)280 self.assertGreaterEqual(runner.stats.get("/test", "GET").num_requests, 3)281 sleep(0.3)282 self.assertEqual(6, runner.stats.get("/test", "GET").num_requests)283 runner.quit()284 def test_runner_reference_on_environment(self):285 env = Environment()286 runner 
= env.create_local_runner()287 self.assertEqual(env, runner.environment)288 self.assertEqual(runner, env.runner)289 def test_users_can_call_runner_quit_without_deadlocking(self):290 class BaseUser(User):291 stop_triggered = False292 @task293 def trigger(self):294 self.environment.runner.quit()295 def on_stop(self):296 BaseUser.stop_triggered = True297 runner = Environment(user_classes=[BaseUser]).create_local_runner()298 runner.spawn_users(1, 1, wait=False)299 timeout = gevent.Timeout(0.5)300 timeout.start()301 try:302 runner.greenlet.join()303 except gevent.Timeout:304 self.fail("Got Timeout exception, runner must have hung somehow.")305 finally:306 timeout.cancel()307 self.assertTrue(BaseUser.stop_triggered)308 def test_runner_quit_can_run_on_stop_for_multiple_users_concurrently(self):309 class BaseUser(User):310 stop_count = 0311 @task312 def trigger(self):313 pass314 def on_stop(self):315 gevent.sleep(0.1)316 BaseUser.stop_count += 1317 runner = Environment(user_classes=[BaseUser]).create_local_runner()318 runner.spawn_users(10, 10, wait=False)319 timeout = gevent.Timeout(0.3)320 timeout.start()321 try:322 runner.quit()323 except gevent.Timeout:324 self.fail("Got Timeout exception, runner must have hung somehow.")325 finally:326 timeout.cancel()327 self.assertEqual(10, BaseUser.stop_count) # verify that all users executed on_stop328 def test_stop_users_with_spawn_rate(self):329 class MyUser(User):330 wait_time = constant(1)331 @task332 def my_task(self):333 pass334 environment = Environment(user_classes=[MyUser])335 runner = LocalRunner(environment)336 # Start load test, wait for users to start, then trigger ramp down337 runner.start(10, 10, wait=False)338 sleep(1)339 runner.start(2, 4, wait=False)340 # Wait a moment and then ensure the user count has started to drop but341 # not immediately to user_count342 sleep(1)343 user_count = len(runner.user_greenlets)344 self.assertTrue(user_count > 5, "User count has decreased too quickly: %i" % user_count)345 
self.assertTrue(user_count < 10, "User count has not decreased at all: %i" % user_count)346 # Wait and ensure load test users eventually dropped to desired count347 sleep(2)348 user_count = len(runner.user_greenlets)349 self.assertTrue(user_count == 2, "User count has not decreased correctly to 2, it is : %i" % user_count)350class TestMasterWorkerRunners(LocustTestCase):351 def test_distributed_integration_run(self):352 """353 Full integration test that starts both a MasterRunner and three WorkerRunner instances354 and makes sure that their stats is sent to the Master.355 """356 class TestUser(User):357 wait_time = constant(0.1)358 @task359 def incr_stats(l):360 l.environment.events.request.fire(361 request_type="GET",362 name="/",363 response_time=1337,364 response_length=666,365 exception=None,366 context={},367 )368 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):369 # start a Master runner370 master_env = Environment(user_classes=[TestUser])371 master = master_env.create_master_runner("*", 0)372 sleep(0)373 # start 3 Worker runners374 workers = []375 for i in range(3):376 worker_env = Environment(user_classes=[TestUser])377 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)378 workers.append(worker)379 # give workers time to connect380 sleep(0.1)381 # issue start command that should trigger TestUsers to be spawned in the Workers382 master.start(6, spawn_rate=1000)383 sleep(0.1)384 # check that worker nodes have started locusts385 for worker in workers:386 self.assertEqual(2, worker.user_count)387 # give time for users to generate stats, and stats to be sent to master388 sleep(1)389 master.quit()390 # make sure users are killed391 for worker in workers:392 self.assertEqual(0, worker.user_count)393 # check that stats are present in master394 self.assertGreater(395 master_env.runner.stats.total.num_requests,396 20,397 "For some reason the master node's stats has not come in",398 )399 def test_test_stop_event(self):400 class 
TestUser(User):401 wait_time = constant(0.1)402 @task403 def my_task(l):404 pass405 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):406 # start a Master runner407 master_env = Environment(user_classes=[TestUser])408 test_stop_count = {"master": 0, "worker": 0}409 @master_env.events.test_stop.add_listener410 def _(*args, **kwargs):411 test_stop_count["master"] += 1412 master = master_env.create_master_runner("*", 0)413 sleep(0)414 # start a Worker runner415 worker_env = Environment(user_classes=[TestUser])416 @worker_env.events.test_stop.add_listener417 def _(*args, **kwargs):418 test_stop_count["worker"] += 1419 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)420 # give worker time to connect421 sleep(0.1)422 # issue start command that should trigger TestUsers to be spawned in the Workers423 master.start(2, spawn_rate=1000)424 sleep(0.1)425 # check that worker nodes have started locusts426 self.assertEqual(2, worker.user_count)427 # give time for users to generate stats, and stats to be sent to master428 sleep(0.1)429 master_env.events.quitting.fire(environment=master_env, reverse=True)430 master.quit()431 sleep(0.1)432 # make sure users are killed433 self.assertEqual(0, worker.user_count)434 # check the test_stop event was called one time in master and zero times in workder435 self.assertEqual(436 1,437 test_stop_count["master"],438 "The test_stop event was not called exactly one time in the master node",439 )440 self.assertEqual(441 0,442 test_stop_count["worker"],443 "The test_stop event was called in the worker node",444 )445 def test_distributed_shape(self):446 """447 Full integration test that starts both a MasterRunner and three WorkerRunner instances448 and tests a basic LoadTestShape with scaling up and down users449 """450 class TestUser(User):451 @task452 def my_task(self):453 pass454 class TestShape(LoadTestShape):455 def tick(self):456 run_time = self.get_run_time()457 if run_time < 2:458 return (9, 9)459 elif 
run_time < 4:460 return (21, 21)461 elif run_time < 6:462 return (3, 21)463 else:464 return None465 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):466 test_shape = TestShape()467 master_env = Environment(user_classes=[TestUser], shape_class=test_shape)468 master_env.shape_class.reset_time()469 master = master_env.create_master_runner("*", 0)470 workers = []471 for i in range(3):472 worker_env = Environment(user_classes=[TestUser])473 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)474 workers.append(worker)475 # Give workers time to connect476 sleep(0.1)477 # Start a shape test478 master.start_shape()479 sleep(1)480 # Ensure workers have connected and started the correct amounf of users481 for worker in workers:482 self.assertEqual(3, worker.user_count, "Shape test has not reached stage 1")483 self.assertEqual(484 9, test_shape.get_current_user_count(), "Shape is not seeing stage 1 runner user count correctly"485 )486 # Ensure new stage with more users has been reached487 sleep(2)488 for worker in workers:489 self.assertEqual(7, worker.user_count, "Shape test has not reached stage 2")490 self.assertEqual(491 21, test_shape.get_current_user_count(), "Shape is not seeing stage 2 runner user count correctly"492 )493 # Ensure new stage with less users has been reached494 sleep(2)495 for worker in workers:496 self.assertEqual(1, worker.user_count, "Shape test has not reached stage 3")497 self.assertEqual(498 3, test_shape.get_current_user_count(), "Shape is not seeing stage 3 runner user count correctly"499 )500 # Ensure test stops at the end501 sleep(2)502 for worker in workers:503 self.assertEqual(0, worker.user_count, "Shape test has not stopped")504 self.assertEqual(505 0, test_shape.get_current_user_count(), "Shape is not seeing stopped runner user count correctly"506 )507 def test_distributed_shape_stop_and_restart(self):508 """509 Test stopping and then restarting a LoadTestShape510 """511 class TestUser(User):512 
@task513 def my_task(self):514 pass515 class TestShape(LoadTestShape):516 def tick(self):517 run_time = self.get_run_time()518 if run_time < 10:519 return (4, 4)520 else:521 return None522 with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):523 master_env = Environment(user_classes=[TestUser], shape_class=TestShape())524 master_env.shape_class.reset_time()525 master = master_env.create_master_runner("*", 0)526 workers = []527 for i in range(2):528 worker_env = Environment(user_classes=[TestUser])529 worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)530 workers.append(worker)531 # Give workers time to connect532 sleep(0.1)533 # Start a shape test and ensure workers have connected and started the correct amounf of users534 master.start_shape()535 sleep(1)536 for worker in workers:537 self.assertEqual(2, worker.user_count, "Shape test has not started correctly")538 # Stop the test and ensure all user count is 0539 master.stop()540 sleep(1)541 for worker in workers:542 self.assertEqual(0, worker.user_count, "Shape test has not stopped")543 # Then restart the test again and ensure workers have connected and started the correct amounf of users544 master.start_shape()545 sleep(1)546 for worker in workers:547 self.assertEqual(2, worker.user_count, "Shape test has not started again correctly")548 master.stop()549class TestMasterRunner(LocustTestCase):550 def setUp(self):551 super().setUp()552 self.environment = Environment(events=locust.events, catch_exceptions=False)553 def tearDown(self):554 super().tearDown()555 def get_runner(self):556 return self.environment.create_master_runner("*", 5557)557 def test_worker_connect(self):558 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:559 master = self.get_runner()560 server.mocked_send(Message("client_ready", None, "zeh_fake_client1"))561 self.assertEqual(1, len(master.clients))562 self.assertTrue(563 "zeh_fake_client1" in master.clients, "Could not find fake client in master 
instance's clients dict"564 )565 server.mocked_send(Message("client_ready", None, "zeh_fake_client2"))566 server.mocked_send(Message("client_ready", None, "zeh_fake_client3"))567 server.mocked_send(Message("client_ready", None, "zeh_fake_client4"))568 self.assertEqual(4, len(master.clients))569 server.mocked_send(Message("quit", None, "zeh_fake_client3"))570 self.assertEqual(3, len(master.clients))571 def test_worker_stats_report_median(self):572 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:573 master = self.get_runner()574 server.mocked_send(Message("client_ready", None, "fake_client"))575 master.stats.get("/", "GET").log(100, 23455)576 master.stats.get("/", "GET").log(800, 23455)577 master.stats.get("/", "GET").log(700, 23455)578 data = {"user_count": 1}579 self.environment.events.report_to_master.fire(client_id="fake_client", data=data)580 master.stats.clear_all()581 server.mocked_send(Message("stats", data, "fake_client"))582 s = master.stats.get("/", "GET")583 self.assertEqual(700, s.median_response_time)584 def test_worker_stats_report_with_none_response_times(self):585 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:586 master = self.get_runner()587 server.mocked_send(Message("client_ready", None, "fake_client"))588 master.stats.get("/mixed", "GET").log(0, 23455)589 master.stats.get("/mixed", "GET").log(800, 23455)590 master.stats.get("/mixed", "GET").log(700, 23455)591 master.stats.get("/mixed", "GET").log(None, 23455)592 master.stats.get("/mixed", "GET").log(None, 23455)593 master.stats.get("/mixed", "GET").log(None, 23455)594 master.stats.get("/mixed", "GET").log(None, 23455)595 master.stats.get("/onlyNone", "GET").log(None, 23455)596 data = {"user_count": 1}597 self.environment.events.report_to_master.fire(client_id="fake_client", data=data)598 master.stats.clear_all()599 server.mocked_send(Message("stats", data, "fake_client"))600 s1 = master.stats.get("/mixed", "GET")601 self.assertEqual(700, 
s1.median_response_time)602 self.assertEqual(500, s1.avg_response_time)603 s2 = master.stats.get("/onlyNone", "GET")604 self.assertEqual(0, s2.median_response_time)605 self.assertEqual(0, s2.avg_response_time)606 def test_master_marks_downed_workers_as_missing(self):607 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:608 master = self.get_runner()609 server.mocked_send(Message("client_ready", None, "fake_client"))610 sleep(6)611 # print(master.clients['fake_client'].__dict__)612 assert master.clients["fake_client"].state == STATE_MISSING613 def test_last_worker_quitting_stops_test(self):614 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:615 master = self.get_runner()616 server.mocked_send(Message("client_ready", None, "fake_client1"))617 server.mocked_send(Message("client_ready", None, "fake_client2"))618 master.start(1, 2)619 server.mocked_send(Message("spawning", None, "fake_client1"))620 server.mocked_send(Message("spawning", None, "fake_client2"))621 server.mocked_send(Message("quit", None, "fake_client1"))622 sleep(0)623 self.assertEqual(1, len(master.clients.all))624 self.assertNotEqual(STATE_STOPPED, master.state, "Not all workers quit but test stopped anyway.")625 server.mocked_send(Message("quit", None, "fake_client2"))626 sleep(0)627 self.assertEqual(0, len(master.clients.all))628 self.assertEqual(STATE_STOPPED, master.state, "All workers quit but test didn't stop.")629 @mock.patch("locust.runners.HEARTBEAT_INTERVAL", new=0.1)630 def test_last_worker_missing_stops_test(self):631 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:632 master = self.get_runner()633 server.mocked_send(Message("client_ready", None, "fake_client1"))634 server.mocked_send(Message("client_ready", None, "fake_client2"))635 server.mocked_send(Message("client_ready", None, "fake_client3"))636 master.start(3, 3)637 server.mocked_send(Message("spawning", None, "fake_client1"))638 server.mocked_send(Message("spawning", None, 
"fake_client2"))639 server.mocked_send(Message("spawning", None, "fake_client3"))640 sleep(0.2)641 server.mocked_send(642 Message("heartbeat", {"state": STATE_RUNNING, "current_cpu_usage": 50, "count": 1}, "fake_client1")643 )644 server.mocked_send(645 Message("heartbeat", {"state": STATE_RUNNING, "current_cpu_usage": 50, "count": 1}, "fake_client2")646 )647 server.mocked_send(648 Message("heartbeat", {"state": STATE_RUNNING, "current_cpu_usage": 50, "count": 1}, "fake_client3")649 )650 sleep(0.2)651 self.assertEqual(0, len(master.clients.missing))652 self.assertEqual(3, master.worker_count)653 self.assertNotIn(654 master.state, [STATE_STOPPED, STATE_STOPPING], "Not all workers went missing but test stopped anyway."655 )656 server.mocked_send(657 Message("heartbeat", {"state": STATE_RUNNING, "current_cpu_usage": 50, "count": 1}, "fake_client1")658 )659 sleep(0.4)660 self.assertEqual(2, len(master.clients.missing))661 self.assertEqual(1, master.worker_count)662 self.assertNotIn(663 master.state, [STATE_STOPPED, STATE_STOPPING], "Not all workers went missing but test stopped anyway."664 )665 sleep(0.2)666 self.assertEqual(3, len(master.clients.missing))667 self.assertEqual(0, master.worker_count)668 self.assertEqual(STATE_STOPPED, master.state, "All workers went missing but test didn't stop.")669 def test_master_total_stats(self):670 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:671 master = self.get_runner()672 server.mocked_send(Message("client_ready", None, "fake_client"))673 stats = RequestStats()674 stats.log_request("GET", "/1", 100, 3546)675 stats.log_request("GET", "/1", 800, 56743)676 stats2 = RequestStats()677 stats2.log_request("GET", "/2", 700, 2201)678 server.mocked_send(679 Message(680 "stats",681 {682 "stats": stats.serialize_stats(),683 "stats_total": stats.total.serialize(),684 "errors": stats.serialize_errors(),685 "user_count": 1,686 },687 "fake_client",688 )689 )690 server.mocked_send(691 Message(692 "stats",693 {694 "stats": 
stats2.serialize_stats(),695 "stats_total": stats2.total.serialize(),696 "errors": stats2.serialize_errors(),697 "user_count": 2,698 },699 "fake_client",700 )701 )702 self.assertEqual(700, master.stats.total.median_response_time)703 def test_master_total_stats_with_none_response_times(self):704 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:705 master = self.get_runner()706 server.mocked_send(Message("client_ready", None, "fake_client"))707 stats = RequestStats()708 stats.log_request("GET", "/1", 100, 3546)709 stats.log_request("GET", "/1", 800, 56743)710 stats.log_request("GET", "/1", None, 56743)711 stats2 = RequestStats()712 stats2.log_request("GET", "/2", 700, 2201)713 stats2.log_request("GET", "/2", None, 2201)714 stats3 = RequestStats()715 stats3.log_request("GET", "/3", None, 2201)716 server.mocked_send(717 Message(718 "stats",719 {720 "stats": stats.serialize_stats(),721 "stats_total": stats.total.serialize(),722 "errors": stats.serialize_errors(),723 "user_count": 1,724 },725 "fake_client",726 )727 )728 server.mocked_send(729 Message(730 "stats",731 {732 "stats": stats2.serialize_stats(),733 "stats_total": stats2.total.serialize(),734 "errors": stats2.serialize_errors(),735 "user_count": 2,736 },737 "fake_client",738 )739 )740 server.mocked_send(741 Message(742 "stats",743 {744 "stats": stats3.serialize_stats(),745 "stats_total": stats3.total.serialize(),746 "errors": stats3.serialize_errors(),747 "user_count": 2,748 },749 "fake_client",750 )751 )752 self.assertEqual(700, master.stats.total.median_response_time)753 def test_master_current_response_times(self):754 start_time = 1755 with mock.patch("time.time") as mocked_time:756 mocked_time.return_value = start_time757 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:758 master = self.get_runner()759 self.environment.stats.reset_all()760 mocked_time.return_value += 1.0234761 server.mocked_send(Message("client_ready", None, "fake_client"))762 stats = RequestStats()763 
stats.log_request("GET", "/1", 100, 3546)764 stats.log_request("GET", "/1", 800, 56743)765 server.mocked_send(766 Message(767 "stats",768 {769 "stats": stats.serialize_stats(),770 "stats_total": stats.total.get_stripped_report(),771 "errors": stats.serialize_errors(),772 "user_count": 1,773 },774 "fake_client",775 )776 )777 mocked_time.return_value += 1778 stats2 = RequestStats()779 stats2.log_request("GET", "/2", 400, 2201)780 server.mocked_send(781 Message(782 "stats",783 {784 "stats": stats2.serialize_stats(),785 "stats_total": stats2.total.get_stripped_report(),786 "errors": stats2.serialize_errors(),787 "user_count": 2,788 },789 "fake_client",790 )791 )792 mocked_time.return_value += 4793 self.assertEqual(400, master.stats.total.get_current_response_time_percentile(0.5))794 self.assertEqual(800, master.stats.total.get_current_response_time_percentile(0.95))795 # let 10 second pass, do some more requests, send it to the master and make796 # sure the current response time percentiles only accounts for these new requests797 mocked_time.return_value += 10.10023798 stats.log_request("GET", "/1", 20, 1)799 stats.log_request("GET", "/1", 30, 1)800 stats.log_request("GET", "/1", 3000, 1)801 server.mocked_send(802 Message(803 "stats",804 {805 "stats": stats.serialize_stats(),806 "stats_total": stats.total.get_stripped_report(),807 "errors": stats.serialize_errors(),808 "user_count": 2,809 },810 "fake_client",811 )812 )813 self.assertEqual(30, master.stats.total.get_current_response_time_percentile(0.5))814 self.assertEqual(3000, master.stats.total.get_current_response_time_percentile(0.95))815 def test_rebalance_locust_users_on_worker_connect(self):816 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:817 master = self.get_runner()818 server.mocked_send(Message("client_ready", None, "zeh_fake_client1"))819 self.assertEqual(1, len(master.clients))820 self.assertTrue(821 "zeh_fake_client1" in master.clients, "Could not find fake client in master instance's 
clients dict"822 )823 master.start(100, 20)824 self.assertEqual(1, len(server.outbox))825 client_id, msg = server.outbox.pop()826 self.assertEqual(100, msg.data["num_users"])827 self.assertEqual(20, msg.data["spawn_rate"])828 # let another worker connect829 server.mocked_send(Message("client_ready", None, "zeh_fake_client2"))830 self.assertEqual(2, len(master.clients))831 self.assertEqual(2, len(server.outbox))832 client_id, msg = server.outbox.pop()833 self.assertEqual(50, msg.data["num_users"])834 self.assertEqual(10, msg.data["spawn_rate"])835 client_id, msg = server.outbox.pop()836 self.assertEqual(50, msg.data["num_users"])837 self.assertEqual(10, msg.data["spawn_rate"])838 def test_sends_spawn_data_to_ready_running_spawning_workers(self):839 """Sends spawn job to running, ready, or spawning workers"""840 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:841 master = self.get_runner()842 master.clients[1] = WorkerNode(1)843 master.clients[2] = WorkerNode(2)844 master.clients[3] = WorkerNode(3)845 master.clients[1].state = STATE_INIT846 master.clients[2].state = STATE_SPAWNING847 master.clients[3].state = STATE_RUNNING848 master.start(user_count=5, spawn_rate=5)849 self.assertEqual(3, len(server.outbox))850 def test_start_event(self):851 """852 Tests that test_start event is fired853 """854 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:855 master = self.get_runner()856 run_count = [0]857 @self.environment.events.test_start.add_listener858 def on_test_start(*a, **kw):859 run_count[0] += 1860 for i in range(5):861 server.mocked_send(Message("client_ready", None, "fake_client%i" % i))862 master.start(7, 7)863 self.assertEqual(5, len(server.outbox))864 self.assertEqual(1, run_count[0])865 # change number of users and check that test_start isn't fired again866 master.start(7, 7)867 self.assertEqual(1, run_count[0])868 # stop and start to make sure test_start is fired again869 master.stop()870 master.start(3, 3)871 
self.assertEqual(2, run_count[0])872 master.quit()873 def test_stop_event(self):874 """875 Tests that test_stop event is fired876 """877 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:878 master = self.get_runner()879 run_count = [0]880 @self.environment.events.test_stop.add_listener881 def on_test_stop(*a, **kw):882 run_count[0] += 1883 for i in range(5):884 server.mocked_send(Message("client_ready", None, "fake_client%i" % i))885 master.start(7, 7)886 self.assertEqual(5, len(server.outbox))887 master.stop()888 self.assertEqual(1, run_count[0])889 run_count[0] = 0890 for i in range(5):891 server.mocked_send(Message("client_ready", None, "fake_client%i" % i))892 master.start(7, 7)893 master.stop()894 master.quit()895 self.assertEqual(1, run_count[0])896 def test_stop_event_quit(self):897 """898 Tests that test_stop event is fired when quit() is called directly899 """900 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:901 master = self.get_runner()902 run_count = [0]903 @self.environment.events.test_stop.add_listener904 def on_test_stop(*a, **kw):905 run_count[0] += 1906 for i in range(5):907 server.mocked_send(Message("client_ready", None, "fake_client%i" % i))908 master.start(7, 7)909 self.assertEqual(5, len(server.outbox))910 master.quit()911 self.assertEqual(1, run_count[0])912 def test_spawn_zero_locusts(self):913 class MyTaskSet(TaskSet):914 @task915 def my_task(self):916 pass917 class MyTestUser(User):918 tasks = [MyTaskSet]919 wait_time = constant(0.1)920 environment = Environment(user_classes=[MyTestUser])921 runner = LocalRunner(environment)922 timeout = gevent.Timeout(2.0)923 timeout.start()924 try:925 runner.start(0, 1, wait=True)926 runner.spawning_greenlet.join()927 except gevent.Timeout:928 self.fail("Got Timeout exception. 
A locust seems to have been spawned, even though 0 was specified.")929 finally:930 timeout.cancel()931 def test_spawn_uneven_locusts(self):932 """933 Tests that we can accurately spawn a certain number of locusts, even if it's not an934 even number of the connected workers935 """936 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:937 master = self.get_runner()938 for i in range(5):939 server.mocked_send(Message("client_ready", None, "fake_client%i" % i))940 master.start(7, 7)941 self.assertEqual(5, len(server.outbox))942 num_users = 0943 for _, msg in server.outbox:944 num_users += msg.data["num_users"]945 self.assertEqual(7, num_users, "Total number of locusts that would have been spawned is not 7")946 def test_spawn_fewer_locusts_than_workers(self):947 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:948 master = self.get_runner()949 for i in range(5):950 server.mocked_send(Message("client_ready", None, "fake_client%i" % i))951 master.start(2, 2)952 self.assertEqual(5, len(server.outbox))953 num_users = 0954 for _, msg in server.outbox:955 num_users += msg.data["num_users"]956 self.assertEqual(2, num_users, "Total number of locusts that would have been spawned is not 2")957 def test_custom_shape_scale_up(self):958 class MyUser(User):959 @task960 def my_task(self):961 pass962 class TestShape(LoadTestShape):963 def tick(self):964 run_time = self.get_run_time()965 if run_time < 2:966 return (1, 1)967 elif run_time < 4:968 return (2, 2)969 else:970 return None971 self.environment.user_classes = [MyUser]972 self.environment.shape_class = TestShape()973 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:974 master = self.get_runner()975 for i in range(5):976 server.mocked_send(Message("client_ready", None, "fake_client%i" % i))977 # Start the shape_worker978 self.environment.shape_class.reset_time()979 master.start_shape()980 sleep(0.5)981 # Wait for shape_worker to update user_count982 num_users = 0983 for _, msg in 
server.outbox:984 if msg.data:985 num_users += msg.data["num_users"]986 self.assertEqual(987 1, num_users, "Total number of users in first stage of shape test is not 1: %i" % num_users988 )989 # Wait for shape_worker to update user_count again990 sleep(2)991 num_users = 0992 for _, msg in server.outbox:993 if msg.data:994 num_users += msg.data["num_users"]995 self.assertEqual(996 3, num_users, "Total number of users in second stage of shape test is not 3: %i" % num_users997 )998 # Wait to ensure shape_worker has stopped the test999 sleep(3)1000 self.assertEqual("stopped", master.state, "The test has not been stopped by the shape class")1001 def test_custom_shape_scale_down(self):1002 class MyUser(User):1003 @task1004 def my_task(self):1005 pass1006 class TestShape(LoadTestShape):1007 def tick(self):1008 run_time = self.get_run_time()1009 if run_time < 2:1010 return (5, 5)1011 elif run_time < 4:1012 return (-4, 4)1013 else:1014 return None1015 self.environment.user_classes = [MyUser]1016 self.environment.shape_class = TestShape()1017 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1018 master = self.get_runner()1019 for i in range(5):1020 server.mocked_send(Message("client_ready", None, "fake_client%i" % i))1021 # Start the shape_worker1022 self.environment.shape_class.reset_time()1023 master.start_shape()1024 sleep(0.5)1025 # Wait for shape_worker to update user_count1026 num_users = 01027 for _, msg in server.outbox:1028 if msg.data:1029 num_users += msg.data["num_users"]1030 self.assertEqual(1031 5, num_users, "Total number of users in first stage of shape test is not 5: %i" % num_users1032 )1033 # Wait for shape_worker to update user_count again1034 sleep(2)1035 num_users = 01036 for _, msg in server.outbox:1037 if msg.data:1038 num_users += msg.data["num_users"]1039 self.assertEqual(1040 1, num_users, "Total number of users in second stage of shape test is not 1: %i" % num_users1041 )1042 # Wait to ensure shape_worker has stopped the test1043 
sleep(3)1044 self.assertEqual("stopped", master.state, "The test has not been stopped by the shape class")1045 def test_exception_in_task(self):1046 class MyUser(User):1047 @task1048 def will_error(self):1049 raise HeyAnException(":(")1050 self.environment.user_classes = [MyUser]1051 runner = self.environment.create_local_runner()1052 l = MyUser(self.environment)1053 self.assertRaises(HeyAnException, l.run)1054 self.assertRaises(HeyAnException, l.run)1055 self.assertEqual(1, len(runner.exceptions))1056 hash_key, exception = runner.exceptions.popitem()1057 self.assertTrue("traceback" in exception)1058 self.assertTrue("HeyAnException" in exception["traceback"])1059 self.assertEqual(2, exception["count"])1060 def test_exception_is_caught(self):1061 """Test that exceptions are stored, and execution continues"""1062 class MyTaskSet(TaskSet):1063 def __init__(self, *a, **kw):1064 super().__init__(*a, **kw)1065 self._task_queue = [self.will_error, self.will_stop]1066 @task(1)1067 def will_error(self):1068 raise HeyAnException(":(")1069 @task(1)1070 def will_stop(self):1071 raise StopUser()1072 class MyUser(User):1073 wait_time = constant(0.01)1074 tasks = [MyTaskSet]1075 # set config to catch exceptions in locust users1076 self.environment.catch_exceptions = True1077 self.environment.user_classes = [MyUser]1078 runner = LocalRunner(self.environment)1079 l = MyUser(self.environment)1080 # make sure HeyAnException isn't raised1081 l.run()1082 l.run()1083 # make sure we got two entries in the error log1084 self.assertEqual(2, len(self.mocked_log.error))1085 # make sure exception was stored1086 self.assertEqual(1, len(runner.exceptions))1087 hash_key, exception = runner.exceptions.popitem()1088 self.assertTrue("traceback" in exception)1089 self.assertTrue("HeyAnException" in exception["traceback"])1090 self.assertEqual(2, exception["count"])1091 def test_master_reset_connection(self):1092 """Test that connection will be reset when network issues found"""1093 with 
mock.patch("locust.runners.FALLBACK_INTERVAL", new=0.1):1094 with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server:1095 master = self.get_runner()1096 self.assertEqual(0, len(master.clients))1097 server.mocked_send(Message("client_ready", NETWORK_BROKEN, "fake_client"))1098 self.assertTrue(master.connection_broken)1099 server.mocked_send(Message("client_ready", None, "fake_client"))1100 sleep(0.2)1101 self.assertFalse(master.connection_broken)1102 self.assertEqual(1, len(master.clients))1103 master.quit()1104class TestWorkerRunner(LocustTestCase):1105 def setUp(self):1106 super().setUp()1107 # self._report_to_master_event_handlers = [h for h in events.report_to_master._handlers]1108 def tearDown(self):1109 # events.report_to_master._handlers = self._report_to_master_event_handlers1110 super().tearDown()1111 def get_runner(self, environment=None, user_classes=[]):1112 if environment is None:1113 environment = self.environment1114 environment.user_classes = user_classes1115 return WorkerRunner(environment, master_host="localhost", master_port=5557)1116 def test_worker_stop_timeout(self):1117 class MyTestUser(User):1118 _test_state = 01119 @task1120 def the_task(self):1121 MyTestUser._test_state = 11122 gevent.sleep(0.2)1123 MyTestUser._test_state = 21124 with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:1125 environment = Environment()1126 test_start_run = [False]1127 @environment.events.test_start.add_listener1128 def on_test_start(_environment, **kw):1129 test_start_run[0] = True1130 worker = self.get_runner(environment=environment, user_classes=[MyTestUser])1131 self.assertEqual(1, len(client.outbox))1132 self.assertEqual("client_ready", client.outbox[0].type)1133 client.mocked_send(1134 Message(1135 "spawn",1136 {1137 "spawn_rate": 1,1138 "num_users": 1,1139 "host": "",1140 "stop_timeout": 1,1141 },1142 "dummy_client_id",1143 )1144 )1145 # print("outbox:", client.outbox)1146 # wait for worker to spawn locusts1147 
self.assertIn("spawning", [m.type for m in client.outbox])1148 worker.spawning_greenlet.join()1149 self.assertEqual(1, len(worker.user_greenlets))1150 # check that locust has started running1151 gevent.sleep(0.01)1152 self.assertEqual(1, MyTestUser._test_state)1153 # send stop message1154 client.mocked_send(Message("stop", None, "dummy_client_id"))1155 worker.user_greenlets.join()1156 # check that locust user got to finish1157 self.assertEqual(2, MyTestUser._test_state)1158 # make sure the test_start was never fired on the worker1159 self.assertFalse(test_start_run[0])1160 def test_worker_without_stop_timeout(self):1161 class MyTestUser(User):1162 _test_state = 01163 @task1164 def the_task(self):1165 MyTestUser._test_state = 11166 gevent.sleep(0.2)1167 MyTestUser._test_state = 21168 with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:1169 environment = Environment(stop_timeout=None)1170 worker = self.get_runner(environment=environment, user_classes=[MyTestUser])1171 self.assertEqual(1, len(client.outbox))1172 self.assertEqual("client_ready", client.outbox[0].type)1173 client.mocked_send(1174 Message(1175 "spawn",1176 {1177 "spawn_rate": 1,1178 "num_users": 1,1179 "host": "",1180 "stop_timeout": None,1181 },1182 "dummy_client_id",1183 )1184 )1185 # print("outbox:", client.outbox)1186 # wait for worker to spawn locusts1187 self.assertIn("spawning", [m.type for m in client.outbox])1188 worker.spawning_greenlet.join()1189 self.assertEqual(1, len(worker.user_greenlets))1190 # check that locust has started running1191 gevent.sleep(0.01)1192 self.assertEqual(1, MyTestUser._test_state)1193 # send stop message1194 client.mocked_send(Message("stop", None, "dummy_client_id"))1195 worker.user_greenlets.join()1196 # check that locust user did not get to finish1197 self.assertEqual(1, MyTestUser._test_state)1198 def test_change_user_count_during_spawning(self):1199 class MyUser(User):1200 wait_time = constant(1)1201 @task1202 def my_task(self):1203 pass1204 with 
mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:1205 environment = Environment()1206 worker = self.get_runner(environment=environment, user_classes=[MyUser])1207 client.mocked_send(1208 Message(1209 "spawn",1210 {1211 "spawn_rate": 5,1212 "num_users": 10,...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!