Best Python code snippet using locust
runners.py
Source:runners.py
...336 :param environment: Environment instance337 """338 super().__init__(environment)339 # register listener thats logs the exception for the local runner340 def on_user_error(user_instance, exception, tb):341 formatted_tb = "".join(traceback.format_tb(tb))342 self.log_exception("local", str(exception), formatted_tb)343 self.environment.events.user_error.add_listener(on_user_error)344 def start(self, user_count, spawn_rate, wait=False):345 self.target_user_count = user_count346 if spawn_rate > 100:347 logger.warning(348 "Your selected spawn rate is very high (>100), and this is known to sometimes cause issues. Do you really need to ramp up that fast?"349 )350 if self.state != STATE_RUNNING and self.state != STATE_SPAWNING:351 # if we're not already running we'll fire the test_start event352 self.environment.events.test_start.fire(environment=self.environment)353 if self.spawning_greenlet:354 # kill existing spawning_greenlet before we start a new one355 self.spawning_greenlet.kill(block=True)356 self.spawning_greenlet = self.greenlet.spawn(357 lambda: super(LocalRunner, self).start(user_count, spawn_rate, wait=wait)358 )359 self.spawning_greenlet.link_exception(greenlet_exception_handler)360 def stop(self):361 if self.state == STATE_STOPPED:362 return363 super().stop()364 self.environment.events.test_stop.fire(environment=self.environment)365class DistributedRunner(Runner):366 def __init__(self, *args, **kwargs):367 super().__init__(*args, **kwargs)368 setup_distributed_stats_event_listeners(self.environment.events, self.stats)369class WorkerNode:370 def __init__(self, id, state=STATE_INIT, heartbeat_liveness=HEARTBEAT_LIVENESS):371 self.id = id372 self.state = state373 self.user_count = 0374 self.heartbeat = heartbeat_liveness375 self.cpu_usage = 0376 self.cpu_warning_emitted = False377class MasterRunner(DistributedRunner):378 """379 Runner used to run distributed load tests across multiple processes and/or machines.380 MasterRunner doesn't spawn any user 
greenlets itself. Instead it expects381 :class:`WorkerRunners <WorkerRunner>` to connect to it, which it will then direct382 to start and stop user greenlets. Stats sent back from the383 :class:`WorkerRunners <WorkerRunner>` will aggregated.384 """385 def __init__(self, environment, master_bind_host, master_bind_port):386 """387 :param environment: Environment instance388 :param master_bind_host: Host/interface to use for incoming worker connections389 :param master_bind_port: Port to use for incoming worker connections390 """391 super().__init__(environment)392 self.worker_cpu_warning_emitted = False393 self.master_bind_host = master_bind_host394 self.master_bind_port = master_bind_port395 class WorkerNodesDict(dict):396 def get_by_state(self, state):397 return [c for c in self.values() if c.state == state]398 @property399 def all(self):400 return self.values()401 @property402 def ready(self):403 return self.get_by_state(STATE_INIT)404 @property405 def spawning(self):406 return self.get_by_state(STATE_SPAWNING)407 @property408 def running(self):409 return self.get_by_state(STATE_RUNNING)410 @property411 def missing(self):412 return self.get_by_state(STATE_MISSING)413 self.clients = WorkerNodesDict()414 try:415 self.server = rpc.Server(master_bind_host, master_bind_port)416 except RPCError as e:417 if e.args[0] == "Socket bind failure: Address already in use":418 port_string = (419 master_bind_host + ":" + str(master_bind_port) if master_bind_host != "*" else str(master_bind_port)420 )421 logger.error(422 f"The Locust master port ({port_string}) was busy. Close any applications using that port - perhaps an old instance of Locust master is still running? 
({e.args[0]})"423 )424 sys.exit(1)425 else:426 raise427 self.greenlet.spawn(self.heartbeat_worker).link_exception(greenlet_exception_handler)428 self.greenlet.spawn(self.client_listener).link_exception(greenlet_exception_handler)429 # listener that gathers info on how many users the worker has spawned430 def on_worker_report(client_id, data):431 if client_id not in self.clients:432 logger.info("Discarded report from unrecognized worker %s", client_id)433 return434 self.clients[client_id].user_count = data["user_count"]435 self.environment.events.worker_report.add_listener(on_worker_report)436 # register listener that sends quit message to worker nodes437 def on_quitting(environment, **kw):438 self.quit()439 self.environment.events.quitting.add_listener(on_quitting)440 @property441 def user_count(self):442 return sum([c.user_count for c in self.clients.values()])443 def cpu_log_warning(self):444 warning_emitted = Runner.cpu_log_warning(self)445 if self.worker_cpu_warning_emitted:446 logger.warning("CPU usage threshold was exceeded on workers during the test!")447 warning_emitted = True448 return warning_emitted449 def start(self, user_count, spawn_rate):450 self.target_user_count = user_count451 num_workers = len(self.clients.ready) + len(self.clients.running) + len(self.clients.spawning)452 if not num_workers:453 logger.warning(454 "You are running in distributed mode but have no worker servers connected. "455 "Please connect workers prior to swarming."456 )457 return458 self.spawn_rate = spawn_rate459 worker_num_users = user_count // (num_workers or 1)460 worker_spawn_rate = float(spawn_rate) / (num_workers or 1)461 remaining = user_count % num_workers462 logger.info(463 "Sending spawn jobs of %d users and %.2f spawn rate to %d ready clients"464 % (worker_num_users, worker_spawn_rate, num_workers)465 )466 if worker_spawn_rate > 100:467 logger.warning(468 "Your selected spawn rate is very high (>100/worker), and this is known to sometimes cause issues. 
Do you really need to ramp up that fast?"469 )470 if self.state != STATE_RUNNING and self.state != STATE_SPAWNING:471 self.stats.clear_all()472 self.exceptions = {}473 self.environment.events.test_start.fire(environment=self.environment)474 if self.environment.shape_class:475 self.environment.shape_class.reset_time()476 for client in self.clients.ready + self.clients.running + self.clients.spawning:477 data = {478 "spawn_rate": worker_spawn_rate,479 "num_users": worker_num_users,480 "host": self.environment.host,481 "stop_timeout": self.environment.stop_timeout,482 }483 if remaining > 0:484 data["num_users"] += 1485 remaining -= 1486 logger.debug("Sending spawn message to client %s" % (client.id))487 self.server.send_to_client(Message("spawn", data, client.id))488 self.update_state(STATE_SPAWNING)489 def stop(self):490 if self.state not in [STATE_INIT, STATE_STOPPED, STATE_STOPPING]:491 logger.debug("Stopping...")492 self.update_state(STATE_STOPPING)493 if self.environment.shape_class:494 self.shape_last_state = None495 for client in self.clients.all:496 logger.debug("Sending stop message to client %s" % (client.id))497 self.server.send_to_client(Message("stop", None, client.id))498 self.environment.events.test_stop.fire(environment=self.environment)499 def quit(self):500 self.stop()501 logger.debug("Quitting...")502 for client in self.clients.all:503 logger.debug("Sending quit message to client %s" % (client.id))504 self.server.send_to_client(Message("quit", None, client.id))505 gevent.sleep(0.5) # wait for final stats report from all workers506 self.greenlet.kill(block=True)507 def check_stopped(self):508 if (509 not self.state == STATE_INIT510 and not self.state == STATE_STOPPED511 and all(map(lambda x: x.state not in (STATE_RUNNING, STATE_SPAWNING, STATE_INIT), self.clients.all))512 ):513 self.update_state(STATE_STOPPED)514 def heartbeat_worker(self):515 while True:516 gevent.sleep(HEARTBEAT_INTERVAL)517 if self.connection_broken:518 self.reset_connection()519 
continue520 for client in self.clients.all:521 if client.heartbeat < 0 and client.state != STATE_MISSING:522 logger.info("Worker %s failed to send heartbeat, setting state to missing." % str(client.id))523 client.state = STATE_MISSING524 client.user_count = 0525 if self.worker_count <= 0:526 logger.info("The last worker went missing, stopping test.")527 self.stop()528 self.check_stopped()529 else:530 client.heartbeat -= 1531 def reset_connection(self):532 logger.info("Reset connection to worker")533 try:534 self.server.close()535 self.server = rpc.Server(self.master_bind_host, self.master_bind_port)536 except RPCError as e:537 logger.error("Temporary failure when resetting connection: %s, will retry later." % (e))538 def client_listener(self):539 while True:540 try:541 client_id, msg = self.server.recv_from_client()542 except RPCError as e:543 logger.error("RPCError found when receiving from client: %s" % (e))544 self.connection_broken = True545 gevent.sleep(FALLBACK_INTERVAL)546 continue547 self.connection_broken = False548 msg.node_id = client_id549 if msg.type == "client_ready":550 id = msg.node_id551 self.clients[id] = WorkerNode(id, heartbeat_liveness=HEARTBEAT_LIVENESS)552 logger.info(553 "Client %r reported as ready. Currently %i clients ready to swarm."554 % (id, len(self.clients.ready + self.clients.running + self.clients.spawning))555 )556 if self.state == STATE_RUNNING or self.state == STATE_SPAWNING:557 # balance the load distribution when new client joins558 self.start(self.target_user_count, self.spawn_rate)559 # emit a warning if the worker's clock seem to be out of sync with our clock560 # if abs(time() - msg.data["time"]) > 5.0:561 # warnings.warn("The worker node's clock seem to be out of sync. 
For the statistics to be correct the different locust servers need to have synchronized clocks.")562 elif msg.type == "client_stopped":563 del self.clients[msg.node_id]564 logger.info("Removing %s client from running clients" % (msg.node_id))565 elif msg.type == "heartbeat":566 if msg.node_id in self.clients:567 c = self.clients[msg.node_id]568 c.heartbeat = HEARTBEAT_LIVENESS569 client_state = msg.data["state"]570 if c.state == STATE_MISSING:571 logger.info(572 "Worker %s self-healed with heartbeat, setting state to %s." % (str(c.id), client_state)573 )574 user_count = msg.data.get("count")575 if user_count:576 c.user_count = user_count577 c.state = client_state578 c.cpu_usage = msg.data["current_cpu_usage"]579 if not c.cpu_warning_emitted and c.cpu_usage > 90:580 self.worker_cpu_warning_emitted = True # used to fail the test in the end581 c.cpu_warning_emitted = True # used to suppress logging for this node582 logger.warning(583 "Worker %s exceeded cpu threshold (will only log this once per worker)" % (msg.node_id)584 )585 elif msg.type == "stats":586 self.environment.events.worker_report.fire(client_id=msg.node_id, data=msg.data)587 elif msg.type == "spawning":588 self.clients[msg.node_id].state = STATE_SPAWNING589 elif msg.type == "spawning_complete":590 self.clients[msg.node_id].state = STATE_RUNNING591 self.clients[msg.node_id].user_count = msg.data["count"]592 if len(self.clients.spawning) == 0:593 count = sum(c.user_count for c in self.clients.values())594 self.environment.events.spawning_complete.fire(user_count=count)595 elif msg.type == "quit":596 if msg.node_id in self.clients:597 del self.clients[msg.node_id]598 logger.info(599 "Client %r quit. Currently %i clients connected." 
% (msg.node_id, len(self.clients.ready))600 )601 if self.worker_count - len(self.clients.missing) <= 0:602 logger.info("The last worker quit, stopping test.")603 self.stop()604 if self.environment.parsed_options and self.environment.parsed_options.headless:605 self.quit()606 elif msg.type == "exception":607 self.log_exception(msg.node_id, msg.data["msg"], msg.data["traceback"])608 self.check_stopped()609 @property610 def worker_count(self):611 return len(self.clients.ready) + len(self.clients.spawning) + len(self.clients.running)612class WorkerRunner(DistributedRunner):613 """614 Runner used to run distributed load tests across multiple processes and/or machines.615 WorkerRunner connects to a :class:`MasterRunner` from which it'll receive616 instructions to start and stop user greenlets. The WorkerRunner will periodically617 take the stats generated by the running users and send back to the :class:`MasterRunner`.618 """619 def __init__(self, environment, master_host, master_port):620 """621 :param environment: Environment instance622 :param master_host: Host/IP to use for connection to the master623 :param master_port: Port to use for connecting to the master624 """625 super().__init__(environment)626 self.worker_state = STATE_INIT627 self.client_id = socket.gethostname() + "_" + uuid4().hex628 self.master_host = master_host629 self.master_port = master_port630 self.client = rpc.Client(master_host, master_port, self.client_id)631 self.greenlet.spawn(self.heartbeat).link_exception(greenlet_exception_handler)632 self.greenlet.spawn(self.worker).link_exception(greenlet_exception_handler)633 self.client.send(Message("client_ready", None, self.client_id))634 self.greenlet.spawn(self.stats_reporter).link_exception(greenlet_exception_handler)635 # register listener for when all users have spawned, and report it to the master node636 def on_spawning_complete(user_count):637 self.client.send(Message("spawning_complete", {"count": user_count}, self.client_id))638 
self.worker_state = STATE_RUNNING639 self.environment.events.spawning_complete.add_listener(on_spawning_complete)640 # register listener that adds the current number of spawned users to the report that is sent to the master node641 def on_report_to_master(client_id, data):642 data["user_count"] = self.user_count643 self.environment.events.report_to_master.add_listener(on_report_to_master)644 # register listener that sends quit message to master645 def on_quitting(environment, **kw):646 self.client.send(Message("quit", None, self.client_id))647 self.environment.events.quitting.add_listener(on_quitting)648 # register listener thats sends user exceptions to master649 def on_user_error(user_instance, exception, tb):650 formatted_tb = "".join(traceback.format_tb(tb))651 self.client.send(Message("exception", {"msg": str(exception), "traceback": formatted_tb}, self.client_id))652 self.environment.events.user_error.add_listener(on_user_error)653 def heartbeat(self):654 while True:655 try:656 self.client.send(657 Message(658 "heartbeat",659 {660 "state": self.worker_state,661 "current_cpu_usage": self.current_cpu_usage,662 "count": self.user_count,663 },...
test_http_client.py
Source:test_http_client.py
import json
import os
import socket

from django.test import TestCase
import mock

from fixcity.bmabr.management.commands.http import FixcityHttp

HERE = os.path.abspath(os.path.dirname(__file__))


class TestFixcityHttp(TestCase):
    """Tests for ``FixcityHttp`` with the HTTP layer and the notifier mocked.

    ``mock.patch`` decorators are applied bottom-up, so the decorator closest
    to the ``def`` supplies the first mock argument of each test.
    """

    @mock.patch('httplib2.Response')
    @mock.patch('httplib2.Http.request')
    @mock.patch('logging.Logger.debug')
    @mock.patch('fixcity.bmabr.management.commands.handle_mailin.Notifier')
    def test_do_post__success(self, mock_notifier, mock_debug, mock_request,
                              mock_response):
        """A 200 response is returned as (status, content); nothing bounces."""
        response = mock_response()
        notifier = mock_notifier()
        response.status = 200
        mock_request.return_value = (response, 'hello POST world')
        # Reuse the same notifier instance that the assertion below inspects.
        http = FixcityHttp(notifier)
        status, content = http.do_post('http://example.com', 'test body')
        self.assertEqual(content, 'hello POST world')
        self.assertEqual(status, 200)
        self.assertFalse(notifier.bounce.call_count)

    @mock.patch('httplib2.Response')
    @mock.patch('httplib2.Http.request')
    @mock.patch('logging.Logger.debug')
    @mock.patch('fixcity.bmabr.management.commands.handle_mailin.Notifier')
    def test_do_post__500_error(self, mock_notifier, mock_debug, mock_request,
                                mock_response):
        """A 500 response is passed once to ``notifier.on_server_error``."""
        response = mock_response()
        notifier = mock_notifier()
        response.status = 500
        mock_request.return_value = (response, 'hello POST world')
        http = FixcityHttp(notifier)
        status, content = http.do_post('http://example.com', 'test body')
        self.assertEqual(status, 500)
        self.assertEqual(content, 'hello POST world')
        self.assertEqual(notifier.on_server_error.call_count, 1)
        self.assertEqual(notifier.on_server_error.call_args[0][0], content)

    @mock.patch('httplib2.Http.request')
    @mock.patch('logging.Logger.debug')
    @mock.patch('fixcity.bmabr.management.commands.handle_mailin.Notifier')
    def test_do_post__socket_error(self, mock_notifier, mock_debug,
                                   mock_request):
        """A socket error yields (None, None) and a temp-failure notice."""
        notifier = mock_notifier()
        mock_request.side_effect = socket.error("kaboom")
        http = FixcityHttp(notifier)
        status, content = http.do_post('http://example.com', 'test body')
        self.assertEqual(status, None)
        self.assertEqual(content, None)
        self.assertEqual(notifier.on_server_temp_failure.call_count, 1)

    @mock.patch('httplib2.Response')
    @mock.patch('httplib2.Http.request')
    @mock.patch('logging.Logger.debug')
    @mock.patch('fixcity.bmabr.management.commands.handle_mailin.Notifier')
    def test_do_post_json(self, mock_notifier, mock_debug, mock_request,
                          mock_response):
        """A JSON response body is returned decoded; nothing bounces."""
        response = mock_response()
        response.status = 200
        notifier = mock_notifier()
        mock_request.return_value = (response, '{"foo": "bar"}')
        http = FixcityHttp(notifier)
        content = http.do_post_json('http://example.com',
                                    "{'some key': 'some value'}")
        self.assertEqual(content, {'foo': 'bar'})
        self.assertFalse(notifier.bounce.call_count)

    @mock.patch('httplib2.Response')
    @mock.patch('httplib2.Http.request')
    @mock.patch('logging.Logger._log')
    @mock.patch('fixcity.bmabr.management.commands.handle_mailin.Notifier')
    def test_do_post_json__parse_error(self, mock_notifier, mock_log,
                                       mock_request, mock_response):
        """An unparseable body returns None and reports a server error."""
        response = mock_response()
        response.status = 200
        notifier = mock_notifier()
        mock_request.return_value = (response, 'this is not my beautiful JSON')
        http = FixcityHttp(notifier)
        content = http.do_post_json('http://example.com',
                                    "{'some key': 'some value'}")
        self.assertEqual(content, None)
        self.assertEqual(notifier.on_server_error.call_count, 1)
        self.assertEqual(mock_log.call_count, 2)

    @mock.patch('fixcity.bmabr.management.commands.http.FixcityHttp.do_post')
    @mock.patch('logging.Logger._log')
    @mock.patch('fixcity.bmabr.management.commands.handle_mailin.Notifier')
    def test_do_post_json__non_string(self, mock_notifier, mock_log,
                                      mock_do_post):
        """A non-string body from ``do_post`` raises ``AssertionError``."""
        notifier = mock_notifier()
        mock_do_post.return_value = (200, 12345)
        http = FixcityHttp(notifier)
        self.assertRaises(AssertionError,
                          http.do_post_json, 'http://example.com',
                          "{'some key': 'some value'}")

    @mock.patch('httplib2.Response')
    @mock.patch('httplib2.Http.request')
    @mock.patch('logging.Logger.debug')
    @mock.patch('fixcity.bmabr.management.commands.handle_mailin.Notifier')
    def test_do_post_json__validation_errors(self, mock_notifier, mock_debug,
                                             mock_request, mock_response):
        """An ``errors`` payload is returned and reported as a user error."""
        response = mock_response()
        notifier = mock_notifier()
        response.status = 200
        error_body = json.dumps(
            {'errors': {'title': ['This field is required.']}})
        mock_request.return_value = (response, error_body)
        http = FixcityHttp(notifier)
        content = http.do_post_json('http://example.com',
                                    {'user': 'bob', 'some key': 'some value'})
        self.assertEqual(content, json.loads(error_body))
        self.assertEqual(notifier.on_user_error.call_count, 1)

    @mock.patch('httplib2.Response')
    @mock.patch('httplib2.Http.request')
    @mock.patch('logging.Logger.debug')
    @mock.patch('fixcity.bmabr.management.commands.handle_mailin.Notifier')
    def test_submit__successful_empty_result(self, mock_notifier, mock_debug,
                                             mock_request, mock_response):
        """An empty JSON result returns None without a success notice."""
        response = mock_response()
        notifier = mock_notifier()
        response.status = 200
        mock_request.return_value = (response, '{}')
        http = FixcityHttp(notifier)
        self.assertEqual(http.submit({}), None)
        self.assertEqual(notifier.on_submit_success.call_count, 0)

    @mock.patch('fixcity.bmabr.management.commands.tweeter.Notifier')
    @mock.patch('httplib2.Response')
    @mock.patch('httplib2.Http.request')
    @mock.patch('logging.Logger.debug')
    def test_submit__server_error(self, mock_debug,
                                  mock_request, mock_response, mock_notifier):
        """A 500 during submit reports the body via ``on_server_error``."""
        # The patched classes themselves (not instances) act as the response
        # and the notifier in this test.
        mock_response.status = 500
        mock_request.return_value = (mock_response, 'blah')
        http = FixcityHttp(mock_notifier)
        data = {}
        http.submit(data)
        self.assertEqual(mock_notifier.on_server_error.call_count, 1)
        args = mock_notifier.on_server_error.call_args[0]
        self.assertEqual(args, ('blah',))

    @mock.patch('httplib2.Response')
    @mock.patch('fixcity.bmabr.management.commands.http.FixcityHttp.do_post')
    @mock.patch('logging.Logger.debug')
    @mock.patch('fixcity.bmabr.management.commands.handle_mailin.Notifier')
    def test_submit__with_photos_and_user(self, mock_notifier, mock_debug,
                                          mock_do_post,
                                          mock_response):
        """Submitting with a photo ends in exactly one success notice."""
        # Mock typically uses side_effect() to specify multiple return
        # values; clunky API but works fine.
        do_post_return_values = [
            (200, '''{
                "user": "bob",
                "photo_post_url": "/photos/",
                "rack_url": "/racks/1"
            }'''),
            (200, 'OK'),
        ]

        def side_effect(*args, **kw):
            return do_post_return_values.pop(0)

        mock_do_post.side_effect = side_effect
        notifier = mock_notifier()
        http = FixcityHttp(notifier)
        # Mock photo needs to be just file-like enough.
        mock_photo_file = mock.Mock()
        mock_photo_file.name = 'foo.jpg'
        mock_photo_file.fileno.side_effect = AttributeError()
        mock_photo_file.tell.return_value = 12345
        mock_photo_file.read.return_value = ''
        self.assertEqual(http.submit({'photos': {'photo': mock_photo_file}}),
                         None)
        self.assertEqual(notifier.on_submit_success.call_count, 1)
        template_vars = notifier.on_submit_success.call_args[0][0]
        self.assertTrue('rack_url' in template_vars)
        self.assertTrue('rack_user' in template_vars)

    @mock.patch('fixcity.bmabr.management.commands.tweeter.Notifier')
    @mock.patch('fixcity.bmabr.management.commands.tweeter.shorten_url')
    @mock.patch('logging.Logger.info')
    @mock.patch('fixcity.bmabr.management.commands.http.FixcityHttp.do_post')
    def test_submit__user_errors(self, mock_do_post, mock_info,
                                 mock_shorten, mock_notifier):
        """An ``errors`` response notifies the user once, with the data."""
        http = FixcityHttp(mock_notifier)
        mock_do_post.return_value = (200, '{"errors": {"any": "thing at all"}}')
        mock_shorten.return_value = 'http://short_url/'
        data = {'title': 'TITLE', 'address': 'ADDRESS', 'twitter_user': 'USER',
                'date': 'DATE', 'twitter_id': 123}
        http.submit(data)
        self.assertEqual(mock_do_post.call_count, 1)
        # We notified the user of failure.
        self.assertEqual(mock_notifier.on_user_error.call_count, 1)
        notify_args, _ = mock_notifier.on_user_error.call_args
        self.assertEqual(notify_args[0], data)

# ... (the scraped excerpt is truncated at this point)
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!