Best Python code snippet using lisa_python
testsuite.py
Source:testsuite.py
...167 if hasattr(self, "_timer"):168 self.elapsed = self._timer.elapsed(False)169 fields = ["status", "elapsed", "id_", "log_file"]170 result_message = TestResultMessage()171 set_filtered_fields(self, result_message, fields=fields)172 metadata_fields = [173 "area",174 "category",175 "tags",176 "description",177 "priority",178 "owner",179 ]180 metadata_information = fields_to_dict(181 src=self.runtime_data.metadata, fields=metadata_fields182 )183 self.information.update(metadata_information)184 # get information of default node, and send to notifier.185 if self.environment:186 self.information.update(self.environment.get_information())187 self.information["environment"] = self.environment.name188 result_message.information.update(self.information)189 result_message.message = self.message[0:2048] if self.message else ""190 result_message.name = self.runtime_data.metadata.name191 result_message.full_name = self.runtime_data.metadata.full_name192 result_message.suite_name = self.runtime_data.metadata.suite.name193 result_message.suite_full_name = self.runtime_data.metadata.suite.full_name194 result_message.stacktrace = stacktrace195 # some extensions may need to update or fill information.196 plugin_manager.hook.update_test_result_message(message=result_message)197 notifier.notify(result_message)198@dataclass199class TestCaseRequirement:200 environment: Optional[EnvironmentSpace] = None201 environment_status: EnvironmentStatus = EnvironmentStatus.Connected202 platform_type: Optional[search_space.SetSpace[str]] = None203 os_type: Optional[search_space.SetSpace[Type[OperatingSystem]]] = None204def _create_test_case_requirement(205 node: schema.NodeSpace,206 supported_platform_type: Optional[List[str]] = None,207 unsupported_platform_type: Optional[List[str]] = None,208 supported_os: Optional[List[Type[OperatingSystem]]] = None,209 unsupported_os: Optional[List[Type[OperatingSystem]]] = None,210 supported_features: Optional[211 List[Union[Type[Feature], 
schema.FeatureSettings, str]]212 ] = None,213 unsupported_features: Optional[214 List[Union[Type[Feature], schema.FeatureSettings, str]]215 ] = None,216 environment_status: EnvironmentStatus = EnvironmentStatus.Connected,217) -> TestCaseRequirement:218 if supported_features:219 node.features = search_space.SetSpace[schema.FeatureSettings](220 is_allow_set=True,221 items=[Feature.get_feature_settings(x) for x in supported_features],222 )223 if unsupported_features:224 node.excluded_features = search_space.SetSpace[schema.FeatureSettings](225 is_allow_set=False,226 items=[Feature.get_feature_settings(x) for x in unsupported_features],227 )228 nodes: List[schema.NodeSpace] = [node]229 platform_types = search_space.create_set_space(230 supported_platform_type, unsupported_platform_type, "platform type"231 )232 # Most test cases are applied to Linux, exclude Windows by default.233 if unsupported_os is None and supported_os is None:234 unsupported_os = [Windows]235 os = search_space.create_set_space(supported_os, unsupported_os, "operating system")236 return TestCaseRequirement(237 environment=EnvironmentSpace(nodes=nodes),238 platform_type=platform_types,239 os_type=os,240 environment_status=environment_status,241 )242def node_requirement(243 node: schema.NodeSpace,244 supported_platform_type: Optional[List[str]] = None,245 unsupported_platform_type: Optional[List[str]] = None,246 supported_os: Optional[List[Type[OperatingSystem]]] = None,247 unsupported_os: Optional[List[Type[OperatingSystem]]] = None,248 environment_status: EnvironmentStatus = EnvironmentStatus.Connected,249) -> TestCaseRequirement:250 return _create_test_case_requirement(251 node,252 supported_platform_type,253 unsupported_platform_type,254 supported_os,255 unsupported_os,256 None,257 None,258 environment_status,259 )260def simple_requirement(261 min_count: int = 1,262 min_core_count: int = 1,263 min_nic_count: Optional[int] = None,264 min_data_disk_count: Optional[int] = None,265 disk: 
Optional[schema.DiskOptionSettings] = None,266 network_interface: Optional[schema.NetworkInterfaceOptionSettings] = None,267 supported_platform_type: Optional[List[str]] = None,268 unsupported_platform_type: Optional[List[str]] = None,269 supported_os: Optional[List[Type[OperatingSystem]]] = None,270 unsupported_os: Optional[List[Type[OperatingSystem]]] = None,271 supported_features: Optional[272 List[Union[Type[Feature], schema.FeatureSettings, str]]273 ] = None,274 unsupported_features: Optional[275 List[Union[Type[Feature], schema.FeatureSettings, str]]276 ] = None,277 environment_status: EnvironmentStatus = EnvironmentStatus.Connected,278) -> TestCaseRequirement:279 """280 define a simple requirement to support most test cases.281 """282 node = schema.NodeSpace()283 node.node_count = search_space.IntRange(min=min_count)284 node.core_count = search_space.IntRange(min=min_core_count)285 if min_data_disk_count or disk:286 if not disk:287 disk = schema.DiskOptionSettings()288 if min_data_disk_count:289 disk.data_disk_count = search_space.IntRange(min=min_data_disk_count)290 node.disk = disk291 if min_nic_count or network_interface:292 if not network_interface:293 network_interface = schema.NetworkInterfaceOptionSettings()294 if min_nic_count:295 network_interface.nic_count = search_space.IntRange(min=min_nic_count)296 node.network_interface = network_interface297 return _create_test_case_requirement(298 node,299 supported_platform_type,300 unsupported_platform_type,301 supported_os,302 unsupported_os,303 supported_features,304 unsupported_features,305 environment_status,306 )307DEFAULT_REQUIREMENT = simple_requirement()308class TestSuiteMetadata:309 def __init__(310 self,311 area: str,312 category: str,313 description: str,314 tags: Optional[List[str]] = None,315 name: str = "",316 requirement: TestCaseRequirement = DEFAULT_REQUIREMENT,317 owner: str = "Microsoft",318 full_name: str = "",319 ) -> None:320 self.name = name321 self.full_name = full_name322 
self.cases: List[TestCaseMetadata] = []323 self.tags: List[str] = tags if tags else []324 self.area = area325 self.category = category326 if tags:327 self.tags = tags328 else:329 self.tags = []330 self.description = description331 self.requirement = requirement332 self.owner = owner333 def __call__(self, test_class: Type[TestSuite]) -> Callable[..., object]:334 self.test_class = test_class335 if not self.name:336 self.name = test_class.__name__337 self.full_name = test_class.__qualname__338 _add_suite_metadata(self)339 @wraps(self.test_class)340 def wrapper(341 test_class: Type[TestSuite],342 metadata: TestSuiteMetadata,343 ) -> TestSuite:344 return test_class(metadata)345 return wrapper346class TestCaseMetadata:347 def __init__(348 self,349 description: str,350 priority: int = 2,351 timeout: int = 3600,352 use_new_environment: bool = False,353 owner: str = "",354 requirement: Optional[TestCaseRequirement] = None,355 ) -> None:356 self.suite: TestSuiteMetadata357 self.priority = priority358 self.description = description359 self.timeout = timeout360 self.use_new_environment = use_new_environment361 if requirement:362 self.requirement = requirement363 self._owner = owner364 def __getattr__(self, key: str) -> Any:365 # return attributes of test suite, if it's not redefined in case level366 assert self.suite, "suite is not set before use metadata"367 return getattr(self.suite, key)368 def __call__(self, func: Callable[..., None]) -> Callable[..., None]:369 self.name = func.__name__370 self.full_name = func.__qualname__371 self.qualname = func.__qualname__372 self._func = func373 _add_case_metadata(self)374 @wraps(self._func)375 def wrapper(*args: Any, **kwargs: Any) -> None:376 parameters: Dict[str, Any] = {}377 for name in kwargs.keys():378 if name in func.__annotations__:379 parameters[name] = kwargs[name]380 func(*args, **parameters)381 return wrapper382 @property383 def owner(self) -> str:384 if self._owner:385 return self._owner386 return self.suite.owner387class 
TestCaseRuntimeData:388 def __init__(self, metadata: TestCaseMetadata):389 self.metadata = metadata390 # all runtime setting fields391 self.select_action: str = ""392 self.times: int = 1393 self.retry: int = 0394 self.use_new_environment: bool = metadata.use_new_environment395 self.ignore_failure: bool = False396 self.environment_name: str = ""397 def __getattr__(self, key: str) -> Any:398 # return attributes of metadata for convenient399 assert self.metadata400 return getattr(self.metadata, key)401 def __repr__(self) -> str:402 return (403 f"name: {self.metadata.name}, "404 f"action: {self.select_action}, "405 f"times: {self.times}, retry: {self.retry}, "406 f"new_env: {self.use_new_environment}, "407 f"ignore_failure: {self.ignore_failure}, "408 f"env_name: {self.environment_name}"409 )410 def clone(self) -> TestCaseRuntimeData:411 cloned = TestCaseRuntimeData(self.metadata)412 fields = [413 constants.TESTCASE_SELECT_ACTION,414 constants.TESTCASE_TIMES,415 constants.TESTCASE_RETRY,416 constants.TESTCASE_USE_NEW_ENVIRONMENT,417 constants.TESTCASE_IGNORE_FAILURE,418 constants.ENVIRONMENT,419 ]420 set_filtered_fields(self, cloned, fields)421 return cloned422class TestSuite:423 def __init__(424 self,425 metadata: TestSuiteMetadata,426 ) -> None:427 super().__init__()428 self._metadata = metadata429 self._should_stop = False430 self.__log = get_logger("suite", metadata.name)431 def before_suite(self, log: Logger, **kwargs: Any) -> None:432 ...433 def after_suite(self, log: Logger, **kwargs: Any) -> None:434 ......
features.py
Source:features.py
...342 value = AwsDiskOptionSettings()343 super_value = schema.DiskOptionSettings._call_requirement_method(344 self, method_name, capability345 )346 set_filtered_fields(super_value, value, ["data_disk_count"])347 cap_disk_type = capability.disk_type348 if isinstance(cap_disk_type, search_space.SetSpace):349 assert (350 len(cap_disk_type) > 0351 ), "capability should have at least one disk type, but it's empty"352 elif isinstance(cap_disk_type, schema.DiskType):353 cap_disk_type = search_space.SetSpace[schema.DiskType](354 is_allow_set=True, items=[cap_disk_type]355 )356 else:357 raise LisaException(358 f"unknown disk type on capability, type: {cap_disk_type}"359 )360 value.disk_type = getattr(search_space, f"{method_name}_setspace_by_priority")(...
testselector.py
Source:testselector.py
...103 constants.TESTCASE_RETRY,104 constants.TESTCASE_IGNORE_FAILURE,105 constants.ENVIRONMENT,106 ]107 set_filtered_fields(case_runbook, applied_case_data, fields)108 applied_case_data.use_new_environment = (109 applied_case_data.use_new_environment or case_runbook.use_new_environment110 )111 # use default value from selector112 applied_case_data.select_action = action113def _force_check(114 name: str,115 is_force: bool,116 force_expected_set: Set[str],117 force_exclusive_set: Set[str],118 temp_force_exclusive_set: Set[str],119 case_runbook: schema.TestCase,120) -> bool:121 is_skip = False...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!