Best Python code snippet using stestr_python
test_test_commands.py
Source:test_test_commands.py
# coding=utf-8
"""
Copyright 2017 Load Impact
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
# Without this the config will prompt for a token
# (so the variable must be set before `loadimpactcli` is imported below).
import os
os.environ['LOADIMPACT_API_V3_TOKEN'] = 'token'

import unittest
from collections import namedtuple
from datetime import datetime

from click.testing import CliRunner

from loadimpactcli import test_commands
from loadimpactcli.util import Metric, ColumnFormatter

try:
    from unittest.mock import MagicMock
except ImportError:
    from mock import MagicMock

# Lightweight stand-ins for the objects the mocked client hands back.
Test = namedtuple('Test', ['id', 'name', 'last_test_run_id', 'config'])
TestRun = namedtuple('TestRun', ['id', 'queued', 'status', 'status_text'])
Organization = namedtuple('Organization', ['id'])
Project = namedtuple('Project', ['id'])
StreamData = namedtuple('StreamData', ['timestamp', 'value'])


class TestTestsList(unittest.TestCase):
    """Tests for the `list_tests` command and the `summarize_config` helper."""

    def setUp(self):
        """Create a CLI runner and the three tests every mocked project returns."""
        self.runner = CliRunner()
        self.tests = [Test(1, 'Test1', 10003, ''),
                      Test(2, 'Test2', 10002, ''),
                      Test(3, 'Test3', 10001, '')]

    def test_list_tests_one_project_id(self):
        """
        Test "test list" with 1 project id and 3 tests.
        """
        client = test_commands.client

        # Setup mockers.
        client.list_tests = MagicMock(return_value=self.tests)
        client.get_test_run = MagicMock(
            return_value=TestRun(1, datetime.now(), 0, 'status'))

        result = self.runner.invoke(test_commands.list_tests,
                                    ['--project_id', '1', '--full_width'])

        self.assertEqual(client.list_tests.call_count, 1)
        self.assertEqual(client.get_test_run.call_count, 3)

        # Expected split length: 2 non-test entries plus one row per test.
        output = result.output.split('\n')
        self.assertEqual(len(output), 2 + 3)
        self.assertTrue(output[1].startswith('1\tTest1\t'))
        self.assertTrue(output[2].startswith('2\tTest2\t'))
        self.assertTrue(output[3].startswith('3\tTest3\t'))

    def test_list_tests_several_project_id(self):
        """
        Test "test list" with 2 project id and 6 tests.
        """
        client = test_commands.client

        # Setup mockers.
        client.list_tests = MagicMock(return_value=self.tests)
        client.get_test_run = MagicMock(
            return_value=TestRun(1, datetime.now(), 0, 'status'))

        result = self.runner.invoke(test_commands.list_tests,
                                    ['--project_id', '1',
                                     '--project_id', '2'])

        self.assertEqual(client.list_tests.call_count, 2)
        self.assertEqual(client.get_test_run.call_count, 6)

        output = result.output.split('\n')
        self.assertEqual(len(output), 2 + 2 * 3)

    def test_list_tests_no_project_id(self):
        """
        Test "test list" with no project id. The test simulates 2
        organizations, with 2 projects per organization, with 3 tests per
        project.
        """
        client = test_commands.client

        # Setup mockers.
        client.list_tests = MagicMock(return_value=self.tests)
        client.get_test_run = MagicMock(
            return_value=TestRun(1, datetime.now(), 0, 'status'))
        client.list_organizations = MagicMock(
            return_value=[Organization(1), Organization(2)])
        # side_effect yields a different project list on each successive call.
        client.list_organization_projects = MagicMock(
            side_effect=[[Project(1), Project(2)],
                         [Project(3), Project(4)]])

        result = self.runner.invoke(test_commands.list_tests)

        self.assertEqual(client.list_organizations.call_count, 1)
        self.assertEqual(client.list_organization_projects.call_count, 2)
        self.assertEqual(client.list_tests.call_count, 2 * 2)
        self.assertEqual(client.get_test_run.call_count, 2 * 2 * 3)

        output = result.output.split('\n')
        self.assertEqual(len(output), 2 + 2 * 2 * 3)

    def test_list_tests_limit_not_hit(self):
        """
        Test "test list" with the limit flag.
        """
        client = test_commands.client

        # Setup mockers.
        client.list_tests = MagicMock(return_value=self.tests)
        client.get_test_run = MagicMock(
            return_value=TestRun(1, datetime.now(), 0, 'status'))

        # Invocation without flag: limit is 20, all tests are listed, no extra line.
        result = self.runner.invoke(test_commands.list_tests, ['--project_id', '1'])
        self.assertEqual(client.list_tests.call_count, 1)
        self.assertEqual(client.get_test_run.call_count, 3)
        output = result.output.split('\n')
        self.assertEqual(len(output), 2 + 3)

        # Invocation with flag: limit is 1, 1 test listed, extra line.
        client.list_tests.reset_mock()
        client.get_test_run.reset_mock()
        result = self.runner.invoke(test_commands.list_tests,
                                    ['--project_id', '1',
                                     '--limit', '1'])
        self.assertEqual(client.list_tests.call_count, 1)
        self.assertEqual(client.get_test_run.call_count, 1)
        output = result.output.split('\n')
        self.assertEqual(len(output), 2 + 1 + 1)

    def test_summarize_valid_config(self):
        """A config with a two-step load schedule is summarized step by step."""
        config = {u'new_relic_applications': [],
                  u'network_emulation': {u'client': u'li', u'network': u'unlimited'},
                  u'user_type': u'sbu',
                  u'server_metric_agents': [],
                  u'tracks': [
                      {u'clips': [{u'user_scenario_id': 225, u'percent': 100}],
                       u'loadzone': u'amazon:ie:dublin'}],
                  u'url_groups': [],
                  u'load_schedule': [
                      {u'duration': 1, u'users': 50},
                      {u'duration': 2, u'users': 100}],
                  u'source_ips': 0}
        summary = test_commands.summarize_config(config)
        self.assertEqual(summary, '50 users 1s; 100 users 2s')

    def test_summarize_invalid_config(self):
        """A config that is not a mapping is summarized as a dash."""
        config = 'INVALID'
        summary = test_commands.summarize_config(config)
        self.assertEqual(summary, '-')


class TestTestsRun(unittest.TestCase):
    """Tests for the `run_test` command and its header/row printing helpers."""

    def setUp(self):
        """Create the CLI runner used to invoke the command."""
        self.runner = CliRunner()

    def _assertMetricHeader(self, string, metrics):
        """Assert the header printed for `metrics` is 'TIMESTAMP:' + tab + `string`."""
        formatter = test_commands.get_run_test_formatter(True, metrics)
        self.assertEqual(test_commands.pprint_header(formatter, metrics),
                         u'TIMESTAMP:\t{0}'.format(string))

    def test_ui_header(self):
        """
        Test the header of metric streaming.
        """
        # Default metrics: they are automatically appended the load zone.
        default_metrics = [Metric.from_raw('clients_active'),
                           Metric.from_raw('requests_per_second'),
                           Metric.from_raw('bandwidth'),
                           Metric.from_raw('user_load_time'),
                           Metric.from_raw('failure_rate')]
        self._assertMetricHeader(
            u'VUs [1]:\treqs/s [1]:\tbandwidth [1]:\tuser load time [1]:\tfailure rate [1]:',
            default_metrics)

        # Same metrics, different parameters.
        repeated_metrics = [Metric.from_raw('clients_active'),
                            Metric.from_raw('__li_clients_active:2')]
        self._assertMetricHeader(u'VUs [1]:\tVUs [2]:', repeated_metrics)

        # Unicode metrics.
        # NOTE(review): the non-ASCII literals below look mis-encoded in this
        # listing (UTF-8 read as Latin-1); confirm against the upstream file.
        unicode_metrics = [Metric.from_raw(u'foöbÃ¥r:0:ÃÃ¥Å')]
        self._assertMetricHeader(u'foöbÃ¥r [0 ÃÃ¥Å]:', unicode_metrics)

    def test_ui_row(self):
        """
        Test the row generation of metric streaming.
        """
        # NOTE(review): same apparent mis-encoding as in test_ui_header.
        metrics = [Metric.from_raw('clients_active'),
                   Metric.from_raw(u'foöbÃ¥r:1')]

        # No results returned.
        formatter = test_commands.get_run_test_formatter(True, metrics)
        data = {}
        self.assertEqual(test_commands.pprint_row(formatter, data, metrics), '')

        # Only timestamp (not one of the requested metrics) returned.
        # split('\t', 1)[1] drops the leading timestamp column before comparing.
        data = {'somerandomkey': StreamData(datetime.now(), 1)}
        self.assertEqual(
            test_commands.pprint_row(formatter, data, metrics).split('\t', 1)[1],
            u'-\t-')

        # Data returned for one metric.
        data = {metrics[0].str_raw(True): StreamData(datetime.now(), 123)}
        self.assertEqual(
            test_commands.pprint_row(formatter, data, metrics).split('\t', 1)[1],
            u'123\t-')

        # Data returned for both metrics.
        data = {metrics[0].str_raw(True): StreamData(datetime.now(), 123),
                metrics[1].str_raw(True): StreamData(datetime.now(), 'xyz')}
        self.assertEqual(
            test_commands.pprint_row(formatter, data, metrics).split('\t', 1)[1],
            u'123\txyz')

    def test_run_no_streaming(self):
        """
        Test `test run` with no streaming of the results.
        """
        client = test_commands.client

        # Setup mockers.
        MockedTest = namedtuple('Test', ['id', 'name', 'last_test_run_id',
                                         'config', 'start_test_run'])
        test_run = MagicMock(return_value=TestRun(222, datetime.now(), 0, 'status'))
        test = MockedTest(1, 'Test1', 10001, '', test_run)
        client.get_test = MagicMock(return_value=test)

        result = self.runner.invoke(test_commands.run_test, ['--quiet', '1'])

        # Client and test methods have been called.
        self.assertEqual(client.get_test.call_count, 1)
        self.assertEqual(test.start_test_run.call_count, 1)

        # Last line of the output contains new TestRun ID.
        output = result.output.split('\n')
        self.assertEqual(output[-2], u'222')

    def test_run_streaming(self):
        """
        Test `test run` with streaming of the results using default metrics.
        """
        def fake_stream(*args, **kwargs):
            """
            Mimic stream() generator, yielding one row of results.
            """
            yield {'__li_clients_active:1': StreamData(datetime.now(), '1.23')}

        client = test_commands.client

        # Setup mockers.
        MockedTest = namedtuple('MockedTest',
                                ['id', 'name', 'last_test_run_id', 'config',
                                 'start_test_run'])
        MockedTestRun = namedtuple('MockedTestRun',
                                   ['id', 'queued', 'status', 'status_text',
                                    'result_stream'])
        # Calling the mock returns the generator *function* itself; presumably
        # `run_test` then calls that to iterate rows -- confirm against the CLI.
        mocked_stream = MagicMock(return_value=fake_stream)
        test_run = MagicMock(
            return_value=MockedTestRun(222, datetime.now(), 0, 'status',
                                       mocked_stream))
        test = MockedTest(1, 'Test1', 10001, '', test_run)
        client.get_test = MagicMock(return_value=test)

        result = self.runner.invoke(test_commands.run_test, ['1', '--full_width'])

        # Client and test methods have been called.
        self.assertEqual(client.get_test.call_count, 1)
        self.assertEqual(test.start_test_run.call_count, 1)
        self.assertEqual(mocked_stream.call_count, 1)

        # Assertions on the output.
        output = result.output.split('\n')

        # Results table assertions (one row, one metric).
        self.assertEqual(len(output), 6)
        self.assertEqual(len(output[-2].split('\t')), 6)
        # ... (the listing is truncated here)
example_adf_run_pipeline.py
Source:example_adf_run_pipeline.py
...
#from tests.system.utils import get_test_run  # noqa: E402
#
## Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
#
...
route.py
Source:route.py
# ... (earlier lines of route.py are elided in this listing)
from ..dependencies import get_db

# Every route below is mounted under /run and tagged "Test Run".
router = APIRouter(prefix="/run", tags=["Test Run"])


# Fetch one test run by id; respond 404 when the lookup returns a falsy value.
@router.get("/{id}", response_model=TestRun, response_description="Test run retrieved")
def get_run(id: int, db: Session = Depends(get_db)):
    run = get_test_run(db, run_id=id)
    if not run:
        raise HTTPException(404, "Test run not found")
    return run


# Delete one test run by id and return whatever `delete_test_run` gives back.
# NOTE(review): unlike get_run there is no 404 branch here -- confirm how
# `delete_test_run` behaves for an unknown id.
@router.delete("/{id}")
def delete_run(id: int, db: Session = Depends(get_db)):
    run = delete_test_run(db, run_id=id)
    return run


@router.patch("/{id}")
def update_run(id: int, run: TestRunUpdate, db: Session = Depends(get_db)):
    # The handler body is elided in this listing.
    ...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!