Best Python code snippet using locust
test_web.py
Source:test_web.py
# ... (earlier lines of test_web.py, including most of its imports, are not
# part of this excerpt)
from locust.web import WebUI

from .testcases import LocustTestCase
from .util import create_tls_cert


class _HeaderCheckMixin:
    """Mixin providing a shared assertion for CSV download response headers."""

    def _check_csv_headers(self, headers, exp_fn_prefix):
        """Assert that *headers* describe a CSV attachment.

        Args:
            headers: Mapping of response header names to values.
            exp_fn_prefix: Text expected to appear in the ``Content-disposition``
                header (the download file name prefix).
        """
        # Check common headers for csv file download request
        self.assertIn("Content-Type", headers)
        content_type = headers["Content-Type"]
        self.assertIn("text/csv", content_type)
        self.assertIn("Content-disposition", headers)
        # e.g.: 'attachment; filename=requests_full_history_1597586811.5084946.csv'
        disposition = headers["Content-disposition"]
        self.assertIn(exp_fn_prefix, disposition)


class TestWebUI(LocustTestCase, _HeaderCheckMixin):
    """Tests that issue HTTP requests against a web UI bound to 127.0.0.1."""

    def setUp(self):
        """Create a web UI on an ephemeral port with default parsed options."""
        super().setUp()
        parser = get_parser(default_config_files=[])
        self.environment.parsed_options = parser.parse_args([])
        self.stats = self.environment.stats
        self.web_ui = self.environment.create_web_ui("127.0.0.1", 0)
        self.web_ui.app.view_functions["request_stats"].clear_cache()
        # Yield briefly before reading the port the server was bound to.
        gevent.sleep(0.01)
        self.web_port = self.web_ui.server.server_port

    def tearDown(self):
        """Stop the web UI and quit the runner."""
        super().tearDown()
        self.web_ui.stop()
        self.runner.quit()

    def test_web_ui_reference_on_environment(self):
        """The created web UI is the one referenced by the environment."""
        self.assertEqual(self.web_ui, self.environment.web_ui)

    def test_web_ui_no_runner(self):
        """The index page answers 500 with an error text when no runner exists."""
        env = Environment()
        web_ui = WebUI(env, "127.0.0.1", 0)
        gevent.sleep(0.01)
        try:
            response = requests.get("http://127.0.0.1:%i/" % web_ui.server.server_port)
            self.assertEqual(500, response.status_code)
            self.assertEqual("Error: Locust Environment does not have any runner", response.text)
        finally:
            web_ui.stop()

    def test_index(self):
        """The index page answers 200."""
        self.assertEqual(200, requests.get("http://127.0.0.1:%i/" % self.web_port).status_code)

    def test_index_with_spawn_options(self):
        """Each spawn option populates only its own field in the index HTML."""
        html_to_option = {
            "user_count": ["-u", "100"],
            "spawn_rate": ["-r", "10.0"],
        }
        for html_name_to_test, cli_args in html_to_option.items():
            # Test that setting each spawn option individually populates the
            # corresponding field in the html, and none of the others
            self.environment.parsed_options = parse_options(cli_args)
            response = requests.get("http://127.0.0.1:%i/" % self.web_port)
            self.assertEqual(200, response.status_code)
            d = pq(response.content.decode("utf-8"))
            for html_name, (_flag, expected_value) in html_to_option.items():
                start_value = d(f".start [name={html_name}]").attr("value")
                edit_value = d(f".edit [name={html_name}]").attr("value")
                if html_name_to_test == html_name:
                    self.assertEqual(expected_value, start_value)
                    self.assertEqual(expected_value, edit_value)
                else:
                    self.assertEqual("", start_value)
                    self.assertEqual("", edit_value)

    def test_stats_no_data(self):
        """The stats endpoint answers 200 when nothing has been logged."""
        self.assertEqual(200, requests.get("http://127.0.0.1:%i/stats/requests" % self.web_port).status_code)

    def test_stats(self):
        """A logged request shows up in the stats JSON next to the Aggregated entry."""
        self.stats.log_request("GET", "/<html>", 120, 5612)
        response = requests.get("http://127.0.0.1:%i/stats/requests" % self.web_port)
        self.assertEqual(200, response.status_code)

        data = json.loads(response.text)
        self.assertEqual(2, len(data["stats"]))  # one entry plus Aggregated
        self.assertEqual("/<html>", data["stats"][0]["name"])
        self.assertEqual("/<html>", data["stats"][0]["safe_name"])
        self.assertEqual("GET", data["stats"][0]["method"])
        self.assertEqual(120, data["stats"][0]["avg_response_time"])

        self.assertEqual("Aggregated", data["stats"][1]["name"])
        self.assertEqual(1, data["stats"][1]["num_requests"])
        self.assertEqual(120, data["stats"][1]["avg_response_time"])

    def test_stats_cache(self):
        """The stats response is served from cache until the cache is cleared."""
        self.stats.log_request("GET", "/test", 120, 5612)
        response = requests.get("http://127.0.0.1:%i/stats/requests" % self.web_port)
        self.assertEqual(200, response.status_code)
        data = json.loads(response.text)
        self.assertEqual(2, len(data["stats"]))  # one entry plus Aggregated

        # add another entry
        self.stats.log_request("GET", "/test2", 120, 5612)
        data = json.loads(requests.get("http://127.0.0.1:%i/stats/requests" % self.web_port).text)
        self.assertEqual(2, len(data["stats"]))  # old value should be cached now

        self.web_ui.app.view_functions["request_stats"].clear_cache()

        data = json.loads(requests.get("http://127.0.0.1:%i/stats/requests" % self.web_port).text)
        self.assertEqual(3, len(data["stats"]))  # this should no longer be cached

    def test_stats_rounding(self):
        """Fractional response times are reported as 1 (min) and 1000 (max)."""
        self.stats.log_request("GET", "/test", 1.39764125, 2)
        self.stats.log_request("GET", "/test", 999.9764125, 1000)
        response = requests.get("http://127.0.0.1:%i/stats/requests" % self.web_port)
        self.assertEqual(200, response.status_code)

        data = json.loads(response.text)
        self.assertEqual(1, data["stats"][0]["min_response_time"])
        self.assertEqual(1000, data["stats"][0]["max_response_time"])

    def test_request_stats_csv(self):
        """The request stats CSV download answers 200 with CSV headers."""
        self.stats.log_request("GET", "/test2", 120, 5612)
        response = requests.get("http://127.0.0.1:%i/stats/requests/csv" % self.web_port)
        self.assertEqual(200, response.status_code)
        self._check_csv_headers(response.headers, "requests")

    def test_request_stats_full_history_csv_not_present(self):
        """The full-history CSV endpoint answers 404 in this configuration."""
        self.stats.log_request("GET", "/test2", 120, 5612)
        response = requests.get("http://127.0.0.1:%i/stats/requests_full_history/csv" % self.web_port)
        self.assertEqual(404, response.status_code)

    def test_failure_stats_csv(self):
        """The failures CSV download answers 200 with CSV headers."""
        self.stats.log_error("GET", "/", Exception("Error1337"))
        response = requests.get("http://127.0.0.1:%i/stats/failures/csv" % self.web_port)
        self.assertEqual(200, response.status_code)
        self._check_csv_headers(response.headers, "failures")

    def test_request_stats_with_errors(self):
        """A logged error message appears in the stats response."""
        self.stats.log_error("GET", "/", Exception("Error1337"))
        response = requests.get("http://127.0.0.1:%i/stats/requests" % self.web_port)
        self.assertEqual(200, response.status_code)
        self.assertIn("Error1337", response.text)

    def test_reset_stats(self):
        """The reset endpoint clears errors, exceptions and request counters."""
        try:
            raise Exception("A cool test exception")
        except Exception as e:
            tb = e.__traceback__
            # Logged twice, exactly as in the original test.
            self.runner.log_exception("local", str(e), "".join(traceback.format_tb(tb)))
            self.runner.log_exception("local", str(e), "".join(traceback.format_tb(tb)))

        self.stats.log_request("GET", "/test", 120, 5612)
        self.stats.log_error("GET", "/", Exception("Error1337"))

        response = requests.get("http://127.0.0.1:%i/stats/reset" % self.web_port)

        self.assertEqual(200, response.status_code)

        self.assertEqual({}, self.stats.errors)
        self.assertEqual({}, self.runner.exceptions)

        self.assertEqual(0, self.stats.get("/", "GET").num_requests)
        self.assertEqual(0, self.stats.get("/", "GET").num_failures)
        self.assertEqual(0, self.stats.get("/test", "GET").num_requests)
        self.assertEqual(0, self.stats.get("/test", "GET").num_failures)

    def test_exceptions(self):
        """A logged exception message appears in the exceptions response."""
        try:
            raise Exception("A cool test exception")
        except Exception as e:
            tb = e.__traceback__
            self.runner.log_exception("local", str(e), "".join(traceback.format_tb(tb)))
            self.runner.log_exception("local", str(e), "".join(traceback.format_tb(tb)))

        response = requests.get("http://127.0.0.1:%i/exceptions" % self.web_port)
        self.assertEqual(200, response.status_code)
        self.assertIn("A cool test exception", response.text)

        response = requests.get("http://127.0.0.1:%i/stats/requests" % self.web_port)
        self.assertEqual(200, response.status_code)

    def test_exceptions_csv(self):
        """The exceptions CSV holds a header row and one row with a count of 2."""
        try:
            raise Exception("Test exception")
        except Exception as e:
            tb = e.__traceback__
            self.runner.log_exception("local", str(e), "".join(traceback.format_tb(tb)))
            self.runner.log_exception("local", str(e), "".join(traceback.format_tb(tb)))

        response = requests.get("http://127.0.0.1:%i/exceptions/csv" % self.web_port)
        self.assertEqual(200, response.status_code)
        self._check_csv_headers(response.headers, "exceptions")

        rows = list(csv.reader(StringIO(response.text)))

        self.assertEqual(2, len(rows))
        self.assertEqual("Test exception", rows[1][1])
        self.assertEqual(2, int(rows[1][0]), "Exception count should be 2")

    def test_swarm_host_value_specified(self):
        """A host posted to /swarm is echoed back and stored on the environment."""

        class MyUser(User):
            wait_time = constant(1)

            @task(1)
            def my_task(self):
                pass

        self.environment.user_classes = [MyUser]
        response = requests.post(
            "http://127.0.0.1:%i/swarm" % self.web_port,
            data={"user_count": 5, "spawn_rate": 5, "host": "https://localhost"},
        )
        self.assertEqual(200, response.status_code)
        self.assertEqual("https://localhost", response.json()["host"])
        self.assertEqual(self.environment.host, "https://localhost")

    def test_swarm_host_value_not_specified(self):
        """Posting to /swarm without a host leaves the host as None."""

        class MyUser(User):
            wait_time = constant(1)

            @task(1)
            def my_task(self):
                pass

        self.environment.user_classes = [MyUser]
        response = requests.post(
            "http://127.0.0.1:%i/swarm" % self.web_port,
            data={"user_count": 5, "spawn_rate": 5},
        )
        self.assertEqual(200, response.status_code)
        self.assertEqual(None, response.json()["host"])
        self.assertEqual(self.environment.host, None)

    def test_host_value_from_user_class(self):
        """The index page shows the host declared on the single user class."""

        class MyUser(User):
            host = "http://example.com"

        self.environment.user_classes = [MyUser]
        response = requests.get("http://127.0.0.1:%i/" % self.web_port)
        self.assertEqual(200, response.status_code)
        self.assertIn("http://example.com", response.content.decode("utf-8"))
        self.assertNotIn("setting this will override the host on all User classes", response.content.decode("utf-8"))

    def test_host_value_from_multiple_user_classes(self):
        """The index page shows the host when all user classes declare the same one."""

        class MyUser(User):
            host = "http://example.com"

        class MyUser2(User):
            host = "http://example.com"

        self.environment.user_classes = [MyUser, MyUser2]
        response = requests.get("http://127.0.0.1:%i/" % self.web_port)
        self.assertEqual(200, response.status_code)
        self.assertIn("http://example.com", response.content.decode("utf-8"))
        self.assertNotIn("setting this will override the host on all User classes", response.content.decode("utf-8"))

    def test_host_value_from_multiple_user_classes_different_hosts(self):
        """The index page shows the override hint when user class hosts differ."""

        class MyUser(User):
            host = None

        class MyUser2(User):
            host = "http://example.com"

        self.environment.user_classes = [MyUser, MyUser2]
        response = requests.get("http://127.0.0.1:%i/" % self.web_port)
        self.assertEqual(200, response.status_code)
        self.assertNotIn("http://example.com", response.content.decode("utf-8"))
        self.assertIn("setting this will override the host on all User classes", response.content.decode("utf-8"))

    def test_report_page(self):
        """The report page renders its title, chart container and download link."""
        self.stats.log_request("GET", "/test", 120, 5612)
        r = requests.get("http://127.0.0.1:%i/stats/report" % self.web_port)
        self.assertEqual(200, r.status_code)
        self.assertIn("<title>Test Report</title>", r.text)
        self.assertIn("charts-container", r.text)
        self.assertIn(
            '<a href="?download=1">Download the Report</a>',
            r.text,
            "Download report link not found in HTML content",
        )

    def test_report_page_empty_stats(self):
        """The report page renders even when no requests were logged."""
        r = requests.get("http://127.0.0.1:%i/stats/report" % self.web_port)
        self.assertEqual(200, r.status_code)
        self.assertIn("<title>Test Report</title>", r.text)
        self.assertIn("charts-container", r.text)

    def test_report_download(self):
        """The downloadable report is an attachment without the download link."""
        self.stats.log_request("GET", "/test", 120, 5612)
        r = requests.get("http://127.0.0.1:%i/stats/report?download=1" % self.web_port)
        self.assertEqual(200, r.status_code)
        self.assertIn("attachment", r.headers.get("Content-Disposition", ""))
        self.assertNotIn("Download the Report", r.text, "Download report link found in HTML content")

    def test_report_host(self):
        """The report contains the host set on the environment."""
        self.environment.host = "http://test.com"
        self.stats.log_request("GET", "/test", 120, 5612)
        r = requests.get("http://127.0.0.1:%i/stats/report" % self.web_port)
        self.assertEqual(200, r.status_code)
        self.assertIn("http://test.com", r.text)

    def test_report_host2(self):
        """The report contains the user class host when the environment has none."""

        class MyUser(User):
            host = "http://test2.com"

            @task
            def my_task(self):
                pass

        self.environment.host = None
        self.environment.user_classes = [MyUser]
        self.stats.log_request("GET", "/test", 120, 5612)
        r = requests.get("http://127.0.0.1:%i/stats/report" % self.web_port)
        self.assertEqual(200, r.status_code)
        self.assertIn("http://test2.com", r.text)

    def test_report_exceptions(self):
        """The report contains an exceptions section once exceptions were logged."""
        try:
            raise Exception("Test exception")
        except Exception as e:
            tb = e.__traceback__
            self.runner.log_exception("local", str(e), "".join(traceback.format_tb(tb)))
            self.runner.log_exception("local", str(e), "".join(traceback.format_tb(tb)))
        self.stats.log_request("GET", "/test", 120, 5612)
        r = requests.get("http://127.0.0.1:%i/stats/report" % self.web_port)
        # NOTE(review): the status-code assertion below is disabled in the
        # original test; confirm why before re-enabling or deleting it.
        # self.assertEqual(200, r.status_code)
        self.assertIn("<h2>Exceptions Statistics</h2>", r.text)


class TestWebUIAuth(LocustTestCase):
    """Tests for a web UI created with ``--web-auth john:doe`` credentials."""

    def setUp(self):
        """Create a runner and a web UI that receives the parsed credentials."""
        super().setUp()

        parser = get_parser(default_config_files=[])
        options = parser.parse_args(["--web-auth", "john:doe"])
        self.runner = Runner(self.environment)
        self.stats = self.runner.stats
        self.web_ui = self.environment.create_web_ui("127.0.0.1", 0, auth_credentials=options.web_auth)
        self.web_ui.app.view_functions["request_stats"].clear_cache()
        gevent.sleep(0.01)
        self.web_port = self.web_ui.server.server_port

    def tearDown(self):
        """Stop the web UI and quit the runner."""
        super().tearDown()
        self.web_ui.stop()
        self.runner.quit()

    def test_index_with_basic_auth_enabled_correct_credentials(self):
        """Matching credentials yield 200."""
        self.assertEqual(
            200, requests.get("http://127.0.0.1:%i/?ele=phino" % self.web_port, auth=("john", "doe")).status_code
        )

    def test_index_with_basic_auth_enabled_incorrect_credentials(self):
        """A wrong password yields 401."""
        self.assertEqual(
            401, requests.get("http://127.0.0.1:%i/?ele=phino" % self.web_port, auth=("john", "invalid")).status_code
        )

    def test_index_with_basic_auth_enabled_blank_credentials(self):
        """A request without credentials yields 401."""
        self.assertEqual(401, requests.get("http://127.0.0.1:%i/?ele=phino" % self.web_port).status_code)


class TestWebUIWithTLS(LocustTestCase):
    """Tests for a web UI created with a generated TLS certificate and key."""

    def setUp(self):
        """Write a certificate/key pair to temporary files and start the web UI."""
        super().setUp()
        tls_cert, tls_key = create_tls_cert("127.0.0.1")
        self.tls_cert_file = NamedTemporaryFile(delete=False)
        self.tls_key_file = NamedTemporaryFile(delete=False)
        # Only the generated file names are needed. Close the handles right
        # away so they are not leaked and so the files can be reopened by name
        # on platforms that forbid opening an already-open temporary file.
        self.tls_cert_file.close()
        self.tls_key_file.close()
        with open(self.tls_cert_file.name, "w") as f:
            f.write(tls_cert.decode())
        with open(self.tls_key_file.name, "w") as f:
            f.write(tls_key.decode())

        parser = get_parser(default_config_files=[])
        options = parser.parse_args(
            [
                "--tls-cert",
                self.tls_cert_file.name,
                "--tls-key",
                self.tls_key_file.name,
            ]
        )
        self.runner = Runner(self.environment)
        self.stats = self.runner.stats
        self.web_ui = self.environment.create_web_ui("127.0.0.1", 0, tls_cert=options.tls_cert, tls_key=options.tls_key)
        gevent.sleep(0.01)
        self.web_port = self.web_ui.server.server_port

    def tearDown(self):
        """Stop the web UI, quit the runner and delete the temporary files."""
        super().tearDown()
        self.web_ui.stop()
        self.runner.quit()
        os.unlink(self.tls_cert_file.name)
        os.unlink(self.tls_key_file.name)

    def test_index_with_https(self):
        """The index page answers 200 over HTTPS with verification disabled."""
        # Suppress only the single warning from urllib3 needed.
        from urllib3.exceptions import InsecureRequestWarning

        requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
        self.assertEqual(200, requests.get("https://127.0.0.1:%i/" % self.web_port, verify=False).status_code)


class TestWebUIFullHistory(LocustTestCase, _HeaderCheckMixin):
    """Tests for the full-history CSV download backed by a stats CSV writer."""

    STATS_BASE_NAME = "web_test"
    STATS_FILENAME = f"{STATS_BASE_NAME}_stats.csv"
    STATS_HISTORY_FILENAME = f"{STATS_BASE_NAME}_stats_history.csv"
    STATS_FAILURES_FILENAME = f"{STATS_BASE_NAME}_failures.csv"

    def setUp(self):
        """Create a full-history CSV writer and a web UI that uses it."""
        super().setUp()
        self.remove_files_if_exists()

        parser = get_parser(default_config_files=[])
        self.environment.parsed_options = parser.parse_args(["--csv", self.STATS_BASE_NAME, "--csv-full-history"])
        self.stats = self.environment.stats
        # NOTE(review): this sets an attribute on the stats object, not on the
        # locust.stats module; confirm that anything reads it.
        self.stats.CSV_STATS_INTERVAL_SEC = 0.02

        # The module-level interval is changed for this test; register a
        # cleanup that restores the previous value so it does not leak into
        # tests that run afterwards.
        self.addCleanup(setattr, locust.stats, "CSV_STATS_INTERVAL_SEC", locust.stats.CSV_STATS_INTERVAL_SEC)
        locust.stats.CSV_STATS_INTERVAL_SEC = 0.1
        self.stats_csv_writer = StatsCSVFileWriter(
            self.environment, stats.PERCENTILES_TO_REPORT, self.STATS_BASE_NAME, full_history=True
        )
        self.web_ui = self.environment.create_web_ui("127.0.0.1", 0, stats_csv_writer=self.stats_csv_writer)
        self.web_ui.app.view_functions["request_stats"].clear_cache()
        gevent.sleep(0.01)
        self.web_port = self.web_ui.server.server_port

    def tearDown(self):
        """Stop the web UI, quit the runner and remove generated CSV files."""
        super().tearDown()
        self.web_ui.stop()
        self.runner.quit()
        self.remove_files_if_exists()

    def remove_file_if_exists(self, filename):
        """Delete *filename*, doing nothing when it does not exist."""
        try:
            os.remove(filename)
        except FileNotFoundError:
            pass  # nothing to clean up

    def remove_files_if_exists(self):
        """Delete the stats, stats-history and failures CSV files if present."""
        self.remove_file_if_exists(self.STATS_FILENAME)
        self.remove_file_if_exists(self.STATS_HISTORY_FILENAME)
        self.remove_file_if_exists(self.STATS_FAILURES_FILENAME)

    def test_request_stats_full_history_csv(self):
        """The full-history CSV download contains the logged request rows."""
        self.stats.log_request("GET", "/test", 1.39764125, 2)
        self.stats.log_request("GET", "/test", 999.9764125, 1000)
        self.stats.log_request("GET", "/test2", 120, 5612)

        # Run the writer briefly, flush what it wrote, then stop it.
        greenlet = gevent.spawn(self.stats_csv_writer.stats_writer)
        gevent.sleep(0.01)
        self.stats_csv_writer.stats_history_flush()
        gevent.kill(greenlet)

        response = requests.get("http://127.0.0.1:%i/stats/requests_full_history/csv" % self.web_port)
        self.assertEqual(200, response.status_code)
        self._check_csv_headers(response.headers, "requests_full_history")
        self.assertIn("Content-Length", response.headers)

        rows = list(csv.reader(StringIO(response.text)))

        self.assertEqual(4, len(rows))
        self.assertEqual("Timestamp", rows[0][0])
        self.assertEqual("GET", rows[1][2])
        self.assertEqual("/test", rows[1][3])
        self.assertEqual("/test2", rows[2][3])
        self.assertEqual("", rows[3][2])
        # ... (the excerpt is truncated here; the original test may continue)
Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!