Best Python code snippet using playwright-python
test_executor.py
Source:test_executor.py
import threading
import time
import traceback
from copy import copy
from datetime import datetime
from functools import cmp_to_key
from typing import List

from .enumeration import TestCaseStatus, TestClassRunMode, TestFixtureStatus
from .plistener import test_listeners
from .plogger import preporter, pconsole, pconsole_err
from .test_suite import AfterSuite, BeforeSuite, AfterClass, BeforeClass, BeforeGroup, AfterGroup, AfterMethod, BeforeMethod, Test, \
    TestSuite, TestGroup, TestClass, TestCase, TestFixture
from .util import call_function, kill_thread, format_thread_stack


class TestExecutor(threading.Thread):
    """Base thread for every executor in the suite/class/group/case/fixture tree.

    Each executor keeps a private property dict (seeded from its parent) and a
    ``workers`` counter. Workers are handed down from parent to child through
    ``allocate_worker`` and handed back in ``release_worker`` when the thread
    finishes.
    """

    def __init__(self, parent_test_executor: "TestExecutor", workers: int = 0):
        """Create the executor and, when ``workers`` is 0, block until a worker is obtained.

        :param parent_test_executor: executor to inherit properties and borrow
            workers from; may be ``None`` for the root executor.
        :param workers: number of workers this executor starts with.
        """
        threading.Thread.__init__(self)
        self.parent_test_executor = parent_test_executor
        self.__properties = {}
        if self.parent_test_executor:
            for key, value in self.parent_test_executor.get_properties().items():
                # Shallow-copy container values so that rebinding/mutating the
                # top-level container here does not touch the parent's copy.
                if isinstance(value, (list, tuple, set, dict)):
                    self.__properties[key] = copy(value)
                else:
                    self.__properties[key] = value
        self.workers = workers
        self.lock = threading.RLock()
        if self.workers == 0:
            self.acquire_worker()

    def _run(self):
        """Hook overridden by subclasses with the actual work; no-op here."""
        pass

    def run(self):
        """Run ``_run`` and always give the workers back afterwards."""
        try:
            self._run()
        finally:
            self.release_worker()

    def start_and_join(self):
        """Start the thread and wait for it to finish."""
        self.start()
        self.join()

    def update_properties(self, properties):
        """Merge ``properties`` into this executor's property dict."""
        self.__properties.update(properties)

    def clear_properties(self):
        """Remove every property of this executor."""
        self.__properties.clear()

    def get_property(self, key):
        """Return the property stored under ``key``, or ``None`` if absent."""
        return self.__properties.get(key)

    def get_properties(self):
        """Return the (live, not copied) property dict."""
        return self.__properties

    def allocate_worker(self, child_test_executor: "TestExecutor"):
        """Move one worker from this executor to ``child_test_executor``.

        :return: ``True`` if a worker was transferred, ``False`` if none is free.
        """
        with self.lock:
            if self.workers > 0:
                self.workers -= 1
                child_test_executor.workers += 1
                return True
            return False

    def apply_worker(self):
        """Try once to obtain a worker for this executor.

        With a parent: ask the parent directly; if it has none, let the parent
        apply to its own ancestors and then ask it again. Without a parent:
        report whether this executor currently owns any worker.

        :return: ``True`` when a worker is available/was obtained.
        """
        if self.parent_test_executor:
            with self.lock:
                if self.parent_test_executor.allocate_worker(self):
                    return True
                if self.parent_test_executor.apply_worker():
                    return self.parent_test_executor.allocate_worker(self)
                return False
        else:
            with self.lock:
                return self.workers > 0

    def acquire_worker(self):
        """Block until ``apply_worker`` succeeds, retrying once per second."""
        while True:
            if self.apply_worker():
                return
            time.sleep(1)

    def release_worker(self):
        """Return all workers held by this executor to its parent (if any)."""
        if self.parent_test_executor:
            with self.parent_test_executor.lock:
                self.parent_test_executor.workers += self.workers
                self.workers = 0


class TestSuiteExecutor(TestExecutor):
    """Root executor: runs @BeforeSuite, all class run groups, then @AfterSuite."""

    def __init__(self, test_suite: TestSuite, workers: int):
        TestExecutor.__init__(self, None, workers)
        self.test_suite = test_suite

    def _run(self):
        before_suite_executor = TestFixtureExecutor(self, self.test_suite.before_suite)
        test_listeners.on_test_suite_start(self.test_suite)
        self.test_suite.start_time = datetime.now()
        before_suite_executor.start_and_join()

        # Run groups are started together and joined afterwards.
        test_class_run_group_executors = []
        for test_class_run_group in self.test_suite.test_class_run_groups:
            test_class_run_group_executor = TestClassRunGroupExecutor(self, test_class_run_group)
            test_class_run_group_executors.append(test_class_run_group_executor)
            test_class_run_group_executor.start()
        for executor in test_class_run_group_executors:
            executor.join()

        after_suite_executor = TestFixtureExecutor(self, self.test_suite.after_suite)
        after_suite_executor.start_and_join()
        self.test_suite.end_time = datetime.now()
        test_listeners.on_test_suite_finish(self.test_suite)


class TestClassRunGroupExecutor(TestExecutor):
    """Runs the test classes of one run group one after another."""

    def __init__(self, test_suite_executor: TestSuiteExecutor, test_class_run_group: List[TestClass]):
        TestExecutor.__init__(self, test_suite_executor)
        self.test_class_run_group = test_class_run_group

    def _run(self):
        for test_class in self.test_class_run_group:
            TestClassExecutor(self, test_class).start_and_join()


class TestClassExecutor(TestExecutor):
    """Runs @BeforeClass, the class's test groups, then @AfterClass."""

    def __init__(self, test_class_run_group_executor: TestClassRunGroupExecutor, test_class: TestClass):
        TestExecutor.__init__(self, test_class_run_group_executor)
        self.test_class = test_class

    def _run(self):
        before_class_executor = TestFixtureExecutor(self, self.test_class.before_class)
        test_listeners.on_test_class_start(self.test_class)
        self.test_class.start_time = datetime.now()
        before_class_executor.start_and_join()

        if self.test_class.run_mode == TestClassRunMode.SingleLine:
            # SingleLine: groups run strictly one at a time.
            for test_group in self.test_class.test_groups:
                TestGroupExecutor(self, test_group).start_and_join()
        else:
            # Otherwise: start every group, then wait for all of them.
            test_group_executors = []
            for test_group in self.test_class.test_groups:
                test_group_executor = TestGroupExecutor(self, test_group)
                test_group_executors.append(test_group_executor)
                test_group_executor.start()
            for executor in test_group_executors:
                executor.join()

        after_class_executor = TestFixtureExecutor(self, self.test_class.after_class)
        after_class_executor.start_and_join()
        self.test_class.end_time = datetime.now()
        test_listeners.on_test_class_finish(self.test_class)


class TestGroupExecutor(TestExecutor):
    """Runs @BeforeGroup, the group's test cases, then @AfterGroup."""

    def __init__(self, test_class_executor: TestClassExecutor, test_group: TestGroup):
        TestExecutor.__init__(self, test_class_executor)
        self.test_group = test_group

    def _run(self):
        before_group_executor = TestFixtureExecutor(self, self.test_group.before_group)
        test_listeners.on_test_group_start(self.test_group)
        self.test_group.start_time = datetime.now()
        before_group_executor.start_and_join()

        if self.test_group.test_class.run_mode == TestClassRunMode.SingleLine:
            # SingleLine: cases run strictly one at a time.
            for test_case in self.test_group.test_cases:
                TestCaseExecutor(self, test_case).start_and_join()
        else:
            # Otherwise: start every case, then wait for all of them.
            test_case_executors = []
            for test_case in self.test_group.test_cases:
                test_case_executor = TestCaseExecutor(self, test_case)
                test_case_executors.append(test_case_executor)
                test_case_executor.start()
            for executor in test_case_executors:
                executor.join()

        after_group_executor = TestFixtureExecutor(self, self.test_group.after_group)
        after_group_executor.start_and_join()
        self.test_group.end_time = datetime.now()
        test_listeners.on_test_group_finish(self.test_group)


class TestCaseExecutor(TestExecutor):
    """Runs @BeforeMethod, the test itself, then @AfterMethod, and prints the result line."""

    def __init__(self, test_group_executor: TestGroupExecutor, test_case: TestCase):
        TestExecutor.__init__(self, test_group_executor)
        self.test_case = test_case

    def _run(self):
        before_method_executor = TestFixtureExecutor(self, self.test_case.before_method)
        test_listeners.on_test_case_start(self.test_case)
        self.test_case.start_time = datetime.now()
        before_method_executor.start_and_join()

        test_executor = TestFixtureExecutor(self, self.test_case.test)
        test_executor.start_and_join()

        # Pad with dashes so "<name>---|XXXX|" is 100 characters wide
        # (the trailing "|PASS|"-style tag is 6 characters).
        logger_filler = "-" * (100 - len(self.test_case.full_name) - 6)
        if self.test_case.status == TestCaseStatus.PASSED:
            pconsole.write_line("%s%s|PASS|" % (self.test_case.full_name, logger_filler))
        elif self.test_case.status == TestCaseStatus.FAILED:
            pconsole.write_line("%s%s|FAIL|" % (self.test_case.full_name, logger_filler))
        elif self.test_case.status == TestCaseStatus.SKIPPED:
            pconsole.write_line("%s%s|SKIP|" % (self.test_case.full_name, logger_filler))

        after_method_executor = TestFixtureExecutor(self, self.test_case.after_method)
        after_method_executor.start_and_join()
        self.test_case.end_time = datetime.now()
        test_listeners.on_test_case_finish(self.test_case)


class TestFixtureExecutor(TestExecutor):
    """Runs (or skips) a single test fixture and enforces its timeout."""

    def __init__(self, parent_test_executor: TestExecutor, test_fixture: TestFixture):
        TestExecutor.__init__(self, parent_test_executor)
        self.test_fixture = test_fixture

    def _run(self):
        """Decide whether the fixture runs or is skipped, then propagate setup attributes."""
        if self.test_fixture.is_empty: return

        self.test_fixture.start_time = datetime.now()
        self.update_properties({"running_test_fixture": self.test_fixture})

        # A failed setup fixture causes everything to be skipped, except the
        # matching "after" fixture when it is flagged always_run.
        failed_setup_fixture = self.test_fixture.context.get_failed_setup_fixture()
        if not failed_setup_fixture:
            self.run_test_fixture()
        elif isinstance(self.test_fixture, AfterSuite) and isinstance(failed_setup_fixture, BeforeSuite) and self.test_fixture.always_run:
            self.run_test_fixture()
        elif isinstance(self.test_fixture, AfterClass) and isinstance(failed_setup_fixture, BeforeClass) and self.test_fixture.always_run:
            self.run_test_fixture()
        elif isinstance(self.test_fixture, AfterGroup) and isinstance(failed_setup_fixture, BeforeGroup) and self.test_fixture.always_run:
            self.run_test_fixture()
        elif isinstance(self.test_fixture, AfterMethod) and isinstance(failed_setup_fixture, BeforeMethod) and self.test_fixture.always_run:
            self.run_test_fixture()
        else:
            self.skip_test_fixture(failed_setup_fixture)

        # spread before's attributes
        # (copy the instance attributes of the object the "before" fixture is
        # bound to onto the objects used by the lower levels)
        if isinstance(self.test_fixture, BeforeSuite):
            before_suite_dict = self.test_fixture.test_fixture_ref.__self__.__dict__
            for test_class in self.test_fixture.test_suite.test_classes:
                test_class.test_class_ref.__dict__.update(before_suite_dict)
                for test_group in test_class.test_groups:
                    test_group.test_class_ref.__dict__.update(before_suite_dict)
                    for test_case in test_group.test_cases:
                        test_case.test_case_ref.__self__.__dict__.update(before_suite_dict)
        elif isinstance(self.test_fixture, BeforeClass):
            before_class_dict = self.test_fixture.test_fixture_ref.__self__.__dict__
            for test_group in self.test_fixture.test_class.test_groups:
                test_group.test_class_ref.__dict__.update(before_class_dict)
                for test_case in test_group.test_cases:
                    test_case.test_case_ref.__self__.__dict__.update(before_class_dict)
        elif isinstance(self.test_fixture, BeforeGroup):
            before_group_dict = self.test_fixture.test_fixture_ref.__self__.__dict__
            for test_case in self.test_fixture.test_group.test_cases:
                test_case.test_case_ref.__self__.__dict__.update(before_group_dict)

        self.update_properties({"running_test_fixture": None})
        self.test_fixture.end_time = datetime.now()

    def run_test_fixture(self):
        """Run the fixture in a sub-thread; fail it if it exceeds its timeout."""
        self.test_fixture.status = TestFixtureStatus.RUNNING
        test_fixture_sub_executor = TestFixtureSubExecutor(self)
        test_fixture_sub_executor.start()
        if self.test_fixture.timeout > 0:
            test_fixture_sub_executor.join(self.test_fixture.timeout)
            # Thread.isAlive() was removed in Python 3.9; is_alive() is the
            # supported spelling.
            if test_fixture_sub_executor.is_alive():
                # Capture the stack before attempting to kill the thread.
                stack_trace = format_thread_stack(test_fixture_sub_executor)
                try:
                    kill_thread(test_fixture_sub_executor)
                except Exception as e:
                    pconsole_err.write_line(e)
                from .plogger import preporter
                self.test_fixture.status = TestFixtureStatus.FAILED
                self.test_fixture.failure_message = "Timed out executing this test fixture in %s seconds." % self.test_fixture.timeout
                self.test_fixture.failure_type = "TimeoutException"
                self.test_fixture.stack_trace = stack_trace
                preporter.error(
                    "Failed with following message:\n%s\n%s" % (self.test_fixture.failure_message, self.test_fixture.stack_trace), True)
        else:
            test_fixture_sub_executor.join()

    def skip_test_fixture(self, caused_test_fixture: TestFixture):
        """Mark the fixture as skipped because ``caused_test_fixture`` failed."""
        from .plogger import preporter
        self.test_fixture.status = TestFixtureStatus.SKIPPED
        self.test_fixture.skip_message = "@%s failed, so skipped." % caused_test_fixture.fixture_type.value
        preporter.warn("@%s failed, so skipped." % caused_test_fixture.fixture_type.value)


class TestFixtureSubExecutor(TestExecutor):
    """Daemon thread that actually calls the fixture function and records the outcome."""

    def __init__(self, test_fixture_executor: TestFixtureExecutor):
        TestExecutor.__init__(self, test_fixture_executor)
        self.test_fixture = test_fixture_executor.test_fixture
        # setDaemon() is deprecated; the attribute is the supported form.
        self.daemon = True

    def _run(self):
        if isinstance(self.test_fixture, Test):
            self.run_test()
        else:
            self.run_test_configuration()

    def run_test(self):
        """Call a @Test fixture and set status/failure details on it.

        When ``expected_exceptions`` is set (a mapping of exception class to an
        optional compiled message pattern), the test passes only if a matching
        exception is raised and its message matches the pattern, if any.
        """
        if self.test_fixture.expected_exceptions:
            expected_exceptions = self.test_fixture.expected_exceptions
            expected_exceptions_names = str(["%s.%s" % (e.__module__, e.__name__) for e in expected_exceptions.keys()])
            try:
                params = self.test_fixture.parameters or []
                call_function(self.test_fixture.test_fixture_ref, *params)
            except Exception as e:
                exception = e.__class__
                exception_name = "%s.%s" % (exception.__module__, exception.__name__)
                matched_exceptions = [expected_exception for expected_exception in expected_exceptions.keys() if
                                      issubclass(exception, expected_exception)]
                if matched_exceptions:
                    # Order subclasses before their bases so the first entry
                    # is the most specific expected exception.
                    def cmp_matched_exception(exception_a, exception_b):
                        return -1 if issubclass(exception_a, exception_b) else 1

                    matched_exception = sorted(matched_exceptions, key=cmp_to_key(cmp_matched_exception))[0]
                    if not expected_exceptions[matched_exception] or expected_exceptions[matched_exception].search(str(e)):
                        self.test_fixture.status = TestFixtureStatus.PASSED
                    else:
                        self.test_fixture.status = TestFixtureStatus.FAILED
                        self.test_fixture.failure_message = "The exception <%s> was thrown with the wrong message: Expected message regex: <%s>, Actual message: <%s>." \
                                                            % (exception_name, expected_exceptions[matched_exception].pattern, str(e))
                        self.test_fixture.failure_type = "WrongExceptionMessageError"
                        self.test_fixture.stack_trace = traceback.format_exc()
                        preporter.error("Failed with following message:\n%s\n%s" % (self.test_fixture.failure_message, self.test_fixture.stack_trace), True)
                else:
                    self.test_fixture.status = TestFixtureStatus.FAILED
                    self.test_fixture.failure_message = "Expected exception: one of %s, Actual exception: <%s>." \
                                                        % (expected_exceptions_names, exception_name)
                    self.test_fixture.failure_type = "WrongExceptionThrownError"
                    self.test_fixture.stack_trace = traceback.format_exc()
                    preporter.error("Failed with following message:\n%s\n%s" % (self.test_fixture.failure_message, self.test_fixture.stack_trace), True)
            else:
                self.test_fixture.status = TestFixtureStatus.FAILED
                self.test_fixture.failure_message = "Expected exception: one of %s, Actual: NO exception was thrown." \
                                                    % expected_exceptions_names
                self.test_fixture.failure_type = "NoExceptionThrownError"
                self.test_fixture.stack_trace = self.test_fixture.failure_message
                preporter.error("Failed with following message:\n%s" % self.test_fixture.failure_message, True)
        else:
            try:
                params = self.test_fixture.parameters or []
                call_function(self.test_fixture.test_fixture_ref, *params)
            except Exception as e:
                self.test_fixture.status = TestFixtureStatus.FAILED
                self.test_fixture.failure_message = str(e).strip() or "\n".join([str(arg) for arg in e.args])
                self.test_fixture.failure_type = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
                self.test_fixture.stack_trace = traceback.format_exc()
                preporter.error("Failed with following message:\n%s" % self.test_fixture.stack_trace, True)
            else:
                self.test_fixture.status = TestFixtureStatus.PASSED

    def run_test_configuration(self):
        """Call a before/after fixture, passing the context when it declares two parameters."""
        try:
            # parameters_count 1 -> no extra argument; 2 -> pass the context.
            # Any other count raises KeyError, which is reported as a failure below.
            params = {1: [], 2: [self.test_fixture.context]}[self.test_fixture.parameters_count]
            call_function(self.test_fixture.test_fixture_ref, *params)
        except Exception as e:
            self.test_fixture.status = TestFixtureStatus.FAILED
            self.test_fixture.failure_message = str(e).strip() or "\n".join([str(arg) for arg in e.args])
            self.test_fixture.failure_type = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
            self.test_fixture.stack_trace = traceback.format_exc()
            preporter.error("Failed with following message:\n%s" % self.test_fixture.stack_trace, True)
        else:
            self.test_fixture.status = TestFixtureStatus.PASSED


# NOTE(review): the listing is truncated here; the body of current_executor()
# is not visible in this snippet.
def current_executor():...
test_service_builder.py
Source:test_service_builder.py
# Copyright 2016 F5 Networks Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mock
import pytest
from uuid import uuid4

from f5lbaasdriver.v2.bigip import exceptions as f5_exc
from f5lbaasdriver.v2.bigip.service_builder import LBaaSv2ServiceBuilder


class FakeDict(dict):
    """Can be used as Neutron model object or as service builder dict"""

    def __init__(self, *args, **kwargs):
        super(FakeDict, self).__init__(*args, **kwargs)
        # Check the populated dict rather than only kwargs, so an 'id'
        # supplied through a positional mapping is not overwritten.
        if 'id' not in self:
            self['id'] = _uuid()

    def __getattr__(self, item):
        """Needed for using as a model object"""
        # Missing keys read as None instead of raising AttributeError.
        if item in self:
            return self[item]
        else:
            return None

    def to_api_dict(self):
        return self

    def to_dict(self, **kwargs):
        return self


def _uuid():
    """Create a random UUID string for model object IDs"""
    return str(uuid4())


@pytest.fixture
def listeners():
    """Two listeners, each with a default pool and no L7 policies."""
    return [FakeDict(default_pool=FakeDict(),
                     l7_policies=[]),
            FakeDict(default_pool=FakeDict(),
                     l7_policies=[])]


@pytest.fixture
def l7policies():
    """Two policies, each attached to exactly one listener."""
    policies = []
    for listener_id in (_uuid(), _uuid()):
        policy = FakeDict(listener_id=listener_id,
                          listeners=[FakeDict(id=listener_id)])
        assert policy.listener_id == policy.listeners[0].id == listener_id
        policies.append(policy)
    return policies


@pytest.fixture
def two_listener_l7policies():
    """A single policy attached to two listeners (invalid case)."""
    return [FakeDict(listeners=[FakeDict(), FakeDict()])]


@pytest.fixture
def l7rules():
    """Two rules, each attached to exactly one policy."""
    return [FakeDict(policies=[FakeDict()]),
            FakeDict(policies=[FakeDict()])]


@pytest.fixture
def two_policy_l7rules():
    """A single rule attached to two policies (invalid case)."""
    return [FakeDict(policies=[FakeDict(), FakeDict()])]


@pytest.fixture
def loadbalancer():
    return FakeDict()


@pytest.fixture
def monitors():
    return [FakeDict(),
            FakeDict()]


@pytest.fixture
def pools(monitors):
    """One pool per monitor, cross-linked through healthmonitor_id / pool_id."""
    pools = []
    for monitor in monitors:
        pool = FakeDict(healthmonitor_id=monitor['id'])
        monitor['pool_id'] = pool['id']
        pools.append(pool)
    return pools


@pytest.fixture
def members():
    return [FakeDict(subnet_id=_uuid())]


# NOTE(review): unlike its neighbours this helper carries no @pytest.fixture
# decorator and is not used by any test visible here -- confirm whether that
# is intentional.
def subnet():
    return FakeDict(network_id=_uuid())


def test_get_l7policies(listeners, l7policies):
    """Test that get_l7policies returns valid list of dict"""
    context = mock.MagicMock()
    driver = mock.MagicMock()
    service_builder = LBaaSv2ServiceBuilder(driver)

    service_builder.driver.plugin.db.get_l7policies = \
        mock.MagicMock(return_value=l7policies)

    policies = service_builder._get_l7policies(context, listeners)

    assert len(policies) > 0
    assert policies[0] is l7policies[0]


def test_get_l7policies_filter(listeners):
    """Test that get_l7policies() is called with filter of listener IDs"""
    context = mock.MagicMock()
    driver = mock.MagicMock()

    # construct an equivalent filter to what service_builder should use
    filters = {'listener_id': [listener['id'] for listener in listeners]}

    service_builder = LBaaSv2ServiceBuilder(driver)
    service_builder._get_l7policies(context, listeners)

    # assert that the expected filter was used
    service_builder.driver.plugin.db.get_l7policies.assert_called_with(
        context, filters=filters)


def test_get_l7policies_no_listeners():
    """Test that an empty listener list input returns an empty policy list."""
    context = mock.MagicMock()
    driver = mock.MagicMock()
    listeners = []

    service_builder = LBaaSv2ServiceBuilder(driver)
    l7policies = service_builder._get_l7policies(context, listeners)

    assert not l7policies


def test_get_l7policy_rules(l7policies, l7rules):
    """Test that get_l7policy_rules returns valid list of dict"""
    context = mock.MagicMock()
    driver = mock.MagicMock()
    service_builder = LBaaSv2ServiceBuilder(driver)

    service_builder.driver.plugin.db.get_l7policy_rules = mock.MagicMock(
        return_value=l7rules)

    rules = service_builder._get_l7policy_rules(context, l7policies)

    assert len(rules) > 0
    assert rules[0] is l7rules[0].to_api_dict()


def test_get_l7policy_rules_filter(l7policies):
    """Test that get_l7policy_rules() is called with filter of l7policy IDs"""
    context = mock.MagicMock()
    driver = mock.MagicMock()

    service_builder = LBaaSv2ServiceBuilder(driver)
    service_builder._get_l7policy_rules(context, l7policies)

    assert service_builder.driver.plugin.db.get_l7policy_rules.call_args_list \
        == [mock.call(context, l7policies[0]['id']),
            mock.call(context, l7policies[1]['id'])]


def test_get_l7policy_rules_no_policies():
    """Test that an empty policies input list returns an empty rule list."""
    context = mock.MagicMock()
    driver = mock.MagicMock()
    l7policies = []

    service_builder = LBaaSv2ServiceBuilder(driver)
    rules = service_builder._get_l7policy_rules(context, l7policies)

    assert not rules


def test_get_l7policies_more_than_one_listener_error(
        listeners, two_listener_l7policies):
    """Exception is raised when > 1 listener for a policy."""
    context = mock.MagicMock()
    driver = mock.MagicMock()
    service_builder = LBaaSv2ServiceBuilder(driver)

    service_builder.driver.plugin.db.get_l7policies = mock.MagicMock(
        return_value=two_listener_l7policies)

    with pytest.raises(f5_exc.PolicyHasMoreThanOneListener) as ex:
        service_builder._get_l7policies(context, listeners)
    # Checked after the with-block: statements placed after the raising call
    # inside the block would never execute.
    assert 'A policy should have only one listener, but found 2 for policy ' +\
        two_listener_l7policies[0].id in ex.value.message


def test_get_l7policy_rules_more_than_one_policy(
        l7policies, two_policy_l7rules):
    """Exception is raised when > 1 policy for a rule."""
    context = mock.MagicMock()
    driver = mock.MagicMock()
    service_builder = LBaaSv2ServiceBuilder(driver)

    service_builder.driver.plugin.db.get_l7policy_rules = mock.MagicMock(
        return_value=two_policy_l7rules)

    with pytest.raises(f5_exc.RuleHasMoreThanOnePolicy) as ex:
        service_builder._get_l7policy_rules(context, l7policies)
    # Checked after the with-block for the same reason as above.
    assert 'A rule should have only one policy, but found 2 for rule ' + \
        two_policy_l7rules[0].id in ex.value.message


def test_get_listeners(loadbalancer, listeners):
    """_get_listeners returns the API dicts of the listeners from the DB."""
    context = mock.MagicMock()
    driver = mock.MagicMock()
    service_builder = LBaaSv2ServiceBuilder(driver)

    service_builder.driver.plugin.db.get_listeners = mock.MagicMock(
        return_value=listeners)

    test_listeners = service_builder._get_listeners(context, loadbalancer)

    assert len(test_listeners) == len(listeners)
    assert test_listeners[0] == listeners[0].to_api_dict()
    assert test_listeners[1] == listeners[1].to_api_dict()


def test_get_pools(loadbalancer, pools, monitors):
    """Returned pools are the DB pools and stay linked to their monitors."""
    context = mock.MagicMock()
    driver = mock.MagicMock()
    service_builder = LBaaSv2ServiceBuilder(driver)

    service_builder.driver.plugin.db.get_pools_and_healthmonitors = \
        mock.MagicMock(return_value=(pools, monitors))

    test_pools, test_monitors = \
        service_builder._get_pools_and_healthmonitors(
            context, loadbalancer)

    for pool, test_pool, monitor in zip(pools, test_pools, monitors):
        assert test_pool is pool
        assert test_pool['healthmonitor_id'] == monitor['id']


def test_get_members(pools, members):
    """Returned members are the very objects provided by the DB layer."""
    context = mock.MagicMock()
    driver = mock.MagicMock()
    subnet_map = mock.MagicMock()
    network_map = mock.MagicMock()
    service_builder = LBaaSv2ServiceBuilder(driver)

    service_builder.driver.plugin.db._get_members = \
        mock.MagicMock(return_value=members)

    test_members = service_builder._get_members(context, pools,
                                                subnet_map, network_map)

    for test_member, member in zip(test_members, members):
        assert test_member is member


def test__pool_to_dict():
    '''Ensure function does not add listeners or listener_id to pool dict.'''
    driver = mock.MagicMock()
    fake_pool = FakeDict()
    fake_pool.members = []
    fake_pool.l7_policies = []

    sb = LBaaSv2ServiceBuilder(driver)
    pool_dict = sb._pool_to_dict(fake_pool)

    assert 'listener_id' not in pool_dict
    # ... (the source listing is truncated at this point)
test_listeners.py
Source:test_listeners.py
import unittest

from robot.output.listeners import Listeners
from robot.output import LOGGER
from robot.utils.asserts import *
from robot.utils import JYTHON
from robot.running.outputcapture import OutputCapturer


LOGGER.unregister_console_logger()


# Stand-in model objects: any attribute that was not set explicitly reads
# as an empty string.
class _Mock:

    def __getattr__(self, name):
        return ''


class SuiteMock(_Mock):
    stat_message = 'stat message'
    full_message = 'full message'

    def __init__(self):
        self.name = 'suitemock'
        self.doc = 'somedoc'
        self.status = 'PASS'
        self.tests = self.suites = []


class TestMock(_Mock):

    def __init__(self):
        self.name = 'testmock'
        self.doc = 'cod'
        self.tags = ['foo', 'bar']
        self.message = 'Expected failure'
        self.status = 'FAIL'


class KwMock(_Mock):

    def __init__(self):
        self.name = 'kwmock'
        self.args = ['a1', 'a2']
        self.status = 'PASS'


# Shared "<Kind>: <path>" printing for every output-file callback.
class ListenOutputs(object):

    def output_file(self, path):
        self._out_file('Output', path)

    def report_file(self, path):
        self._out_file('Report', path)

    def log_file(self, path):
        self._out_file('Log', path)

    def debug_file(self, path):
        self._out_file('Debug', path)

    def xunit_file(self, path):
        self._out_file('XUnit', path)

    def _out_file(self, name, path):
        print('%s: %s' % (name, path))


# Listener without ROBOT_LISTENER_API_VERSION: callbacks take positional values.
class ListenAllOldStyle(ListenOutputs):

    def start_suite(self, name, doc):
        print("SUITE START: %s '%s'" % (name, doc))

    def start_test(self, name, doc, tags):
        joined_tags = ', '.join(str(tag) for tag in tags)
        print("TEST START: %s '%s' %s" % (name, doc, joined_tags))

    def start_keyword(self, name, args):
        arg_strings = [str(arg) for arg in args]
        print("KW START: %s %s" % (name, arg_strings))

    def end_keyword(self, status):
        print("KW END: %s" % (status))

    def end_test(self, status, message):
        if status != 'PASS':
            print("TEST END: %s %s" % (status, message))
        else:
            print('TEST END: PASS')

    def end_suite(self, status, message):
        print('SUITE END: %s %s' % (status, message))

    def close(self):
        print('Closing...')


# Listener declaring API version 2: callbacks take a name and an attrs dict.
class ListenAllNewStyle(ListenOutputs):
    ROBOT_LISTENER_API_VERSION = '2'

    def start_suite(self, name, attrs):
        print("SUITE START: %s '%s'" % (name, attrs['doc']))

    def start_test(self, name, attrs):
        joined_tags = ', '.join(attrs['tags'])
        print("TEST START: %s '%s' %s" % (name, attrs['doc'], joined_tags))

    def start_keyword(self, name, attrs):
        arg_strings = [str(arg) for arg in attrs['args']]
        print("KW START: %s %s" % (name, arg_strings))

    def end_keyword(self, name, attrs):
        print("KW END: %s" % attrs['status'])

    def end_test(self, name, attrs):
        if attrs['status'] != 'PASS':
            print("TEST END: %s %s" % (attrs['status'], attrs['message']))
        else:
            print('TEST END: PASS')

    def end_suite(self, name, attrs):
        print('SUITE END: %s %s' % (attrs['status'], attrs['statistics']))

    def close(self):
        print('Closing...')


# Every callback here is broken on purpose: start_suite has the wrong arity
# and the others raise ZeroDivisionError when called.
class InvalidListenerOldStyle:

    def start_suite(self, wrong, number, of, args):
        pass

    end_suite = start_test = end_test = start_keyword = end_keyword =\
    log_file = close = lambda self, *args: 1/0


# Test mixin; concrete subclasses provide listener_name (and stat_message).
class _BaseListenerTest:
    stat_message = ''

    def setUp(self):
        self.listeners = Listeners([(self.listener_name, [])])
        self.listener = self.listeners._listeners[0]
        self.capturer = OutputCapturer()

    def test_start_suite(self):
        self.listeners.start_suite(SuiteMock())
        self._assert_output("SUITE START: suitemock 'somedoc'")

    def test_start_test(self):
        self.listeners.start_test(TestMock())
        self._assert_output("TEST START: testmock 'cod' foo, bar")

    def test_start_keyword(self):
        self.listeners.start_keyword(KwMock())
        self._assert_output("KW START: kwmock ['a1', 'a2']")

    def test_end_keyword(self):
        self.listeners.end_keyword(KwMock())
        self._assert_output("KW END: PASS")

    def test_end_test(self):
        self.listeners.end_test(TestMock())
        self._assert_output('TEST END: FAIL Expected failure')

    def test_end_suite(self):
        self.listeners.end_suite(SuiteMock())
        self._assert_output('SUITE END: PASS ' + self.stat_message)

    def test_output_file(self):
        self.listeners.output_file('output', 'path/to/output')
        self._assert_output('Output: path/to/output')

    def test_log_file(self):
        self.listeners.output_file('log', 'path/to/log')
        self._assert_output('Log: path/to/log')

    def test_report_file(self):
        self.listeners.output_file('report', 'path/to/report')
        self._assert_output('Report: path/to/report')

    def test_debug_file(self):
        self.listeners.output_file('debug', 'path/to/debug')
        self._assert_output('Debug: path/to/debug')

    def test_xunit_file(self):
        self.listeners.output_file('XUnit', 'path/to/xunit')
        self._assert_output('XUnit: path/to/xunit')

    def test_close(self):
        self.listeners.close()
        self._assert_output('Closing...')

    def _assert_output(self, expected):
        # Nothing may reach stderr; stdout must equal the expected line
        # once trailing whitespace is stripped.
        captured_out, captured_err = self.capturer._release()
        assert_equals(captured_err, '')
        assert_equals(captured_out.rstrip(), expected)


class TestOldStyleListeners(_BaseListenerTest, unittest.TestCase):
    listener_name = 'test_listeners.ListenAllOldStyle'
    stat_message = 'full message'

    def test_importing(self):
        assert_equals(self.listener.version, 1)


class TestNewStyleListeners(_BaseListenerTest, unittest.TestCase):
    listener_name = 'test_listeners.ListenAllNewStyle'
    stat_message = 'stat message'

    def test_importing(self):
        assert_equals(self.listener.version, 2)


class TestInvalidOldStyleListener(unittest.TestCase):

    def test_calling_listener_methods_fails(self):
        # Each call goes through the Listeners facade; the test only requires
        # that none of them propagates an exception.
        listeners = Listeners([('test_listeners.InvalidListenerOldStyle', [])])
        calls = [('start_suite', [SuiteMock()]),
                 ('end_suite', [SuiteMock()]),
                 ('start_test', [TestMock()]),
                 ('end_test', [TestMock()]),
                 ('start_keyword', [KwMock()]),
                 ('end_keyword', [KwMock()]),
                 ('output_file', ['log', '/path']),
                 ('close', [])]
        for method_name, call_args in calls:
            getattr(listeners, method_name)(*call_args)


if JYTHON:

    class TestJavaListener(_BaseListenerTest, unittest.TestCase):
        listener_name = 'NewStyleJavaListener'
        stat_message = 'stat message'

        def test_importing(self):
            assert_equals(self.listener.version, 2)


# NOTE(review): the listing is truncated here; the body of the main guard is
# not visible in this snippet.
if __name__ == '__main__':...
ptest_runner.py
Source:ptest_runner.py
try:
    import ptest
except ImportError:
    raise NameError("No ptest runner found in selected interpreter.")

try:
    from tc_messages import TeamcityServiceMessages
except ImportError:
    raise NameError("No tc_messages module found in selected interpreter.")

from ptest.plistener import TestListener
from ptest.enumeration import TestCaseStatus


def _group_feature_enabled(test_group):
    """Return a truthy value when group events should be reported.

    Test classes lacking ``is_group_feature_used`` are treated as using the
    group feature.
    """
    return getattr(test_group.test_class, "is_group_feature_used", True)


class TeamcityTestListener(TestListener):
    """ptest listener that forwards test events as TeamCity service messages."""

    def __init__(self):
        self.messages = TeamcityServiceMessages(prepend_linebreak=True)

    def on_test_suite_start(self, test_suite):
        """Announce the run and the total number of test cases."""
        messages = self.messages
        messages.testMatrixEntered()
        messages.testCount(len(test_suite.test_cases))

    def on_test_suite_finish(self, test_suite):
        """Nothing is reported when the suite finishes."""
        pass

    def on_test_class_start(self, test_class):
        """Open a TeamCity suite named after the test class."""
        self.messages.testSuiteStarted(suiteName=test_class.full_name)

    def on_test_class_finish(self, test_class):
        """Close the TeamCity suite of the test class."""
        self.messages.testSuiteFinished(suiteName=test_class.full_name)

    def on_test_group_start(self, test_group):
        """Open a nested suite for the group, when groups are in use."""
        if _group_feature_enabled(test_group):
            self.messages.testSuiteStarted(suiteName=test_group.name)

    def on_test_group_finish(self, test_group):
        """Close the nested suite of the group, when groups are in use."""
        if _group_feature_enabled(test_group):
            self.messages.testSuiteFinished(suiteName=test_group.name)

    def on_test_case_start(self, test_case):
        """Report the start of a test case together with its location."""
        self.messages.testStarted(testName=test_case.name, location=test_case.location)

    def on_test_case_finish(self, test_case):
        """Report failure/ignore status (if any), then the finish with its duration."""
        status = test_case.status
        if status == TestCaseStatus.FAILED:
            self.messages.testFailed(testName=test_case.name)
        elif status == TestCaseStatus.SKIPPED:
            self.messages.testIgnored(testName=test_case.name)
        # elapsed_time is multiplied by 1000 and truncated to an int for the
        # duration argument.
        duration = int(test_case.elapsed_time * 1000.0)
        self.messages.testFinished(testName=test_case.name, duration=duration)


def _main():
    """Install the TeamCity listener as ptest's outer listener and run ptest."""
    from ptest.main import main
    from ptest.plistener import test_listeners

    listener = TeamcityTestListener()
    test_listeners.set_outer_test_listener(listener)
    main()


# NOTE(review): the listing is truncated here; the body of the main guard is
# not visible in this snippet.
if __name__ == '__main__':...
LambdaTest’s Playwright tutorial gives you a broad overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial offers A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!