Best Python code snippet using lisa_python
schema.py
Source: schema.py
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import copy
from dataclasses import dataclass, field
from enum import Enum
from functools import partial
from pathlib import Path
from typing import Any, Dict, List, Optional, Type, TypeVar, Union, cast

from dataclasses_json import (
    CatchAll,
    DataClassJsonMixin,
    Undefined,
    config,
    dataclass_json,
)
from marshmallow import ValidationError, fields, validate

from lisa import search_space
from lisa.secret import PATTERN_HEADTAIL, add_secret
from lisa.util import (
    BaseClassMixin,
    LisaException,
    constants,
    deep_update_dict,
    field_metadata,
    strip_strs,
)

"""
Schema is dealt with three components,
1. dataclasses. It's a builtin class, uses to define schema of an instance. field()
   function uses to describe a field.
2. dataclasses_json. Serializer. config() function customizes this component.
3. marshmallow. Validator. It's wrapped by dataclasses_json. config(mm_field=xxx)
   function customizes this component.
"""

T = TypeVar("T")


class ListableValidator(validate.Validator):
    default_message = ""

    def __init__(
        self,
        value_type: type,
        value_validator: Optional[
            Union[validate.Validator, List[validate.Validator]]
        ] = None,
        error: str = "",
    ) -> None:
        self._value_type: Any = value_type
        if value_validator is None:
            self._inner_validator: List[validate.Validator] = []
        elif callable(value_validator):
            self._inner_validator = [value_validator]
        elif isinstance(value_validator, list):
            self._inner_validator = list(value_validator)
        else:
            raise ValueError(
                "The 'value_validator' parameter must be a callable "
                "or a collection of callables."
            )
        self.error: str = error or self.default_message

    def _repr_args(self) -> str:
        return f"_inner_validator={self._inner_validator}"

    def _format_error(self, value: Any) -> str:
        return self.error.format(input=value)

    def __call__(self, value: Any) -> Any:
        if isinstance(value, self._value_type):
            if self._inner_validator:
                for validator in self._inner_validator:
                    validator(value)
        elif isinstance(value, list):
            for value_item in value:
                assert isinstance(value_item, self._value_type), (
                    f"must be '{self._value_type}' but '{value_item}' "
                    f"is '{type(value_item)}'"
                )
                if self._inner_validator:
                    for validator in self._inner_validator:
                        validator(value_item)
        elif value is not None:
            raise ValidationError(
                f"must be Union[{self._value_type}, List[{self._value_type}]], "
                f"but '{value}' is '{type(value)}'"
            )
        return value


@dataclass_json(undefined=Undefined.INCLUDE)
@dataclass
class ExtendableSchemaMixin:
    extended_schemas: CatchAll = field(default_factory=dict)  # type: ignore

    def get_extended_runbook(self, runbook_type: Type[T], type_name: str = "") -> T:
        """
        runbook_type: type of runbook
        field_name: the field name which stores the data, if it's "", get it from type
        """
        if not hasattr(self, "_extended_runbook"):
            type_name = self.__resolve_type_name(
                runbook_type=runbook_type, type_name=type_name
            )
            if self.extended_schemas and type_name in self.extended_schemas:
                self._extended_runbook: T = load_by_type(
                    runbook_type, self.extended_schemas[type_name]
                )
            else:
                # value may be filled outside, so hold and return an object.
                self._extended_runbook = runbook_type()

            # if there is any extra key, raise exception to help user find it earlier.
            if self.extended_schemas and len(self.extended_schemas) > 0:
                expected_extra_count = 0
                if type_name in self.extended_schemas:
                    expected_extra_count = 1
                if len(self.extended_schemas) > expected_extra_count:
                    extra_names = [
                        name for name in self.extended_schemas if name != type_name
                    ]
                    raise LisaException(
                        f"unknown keys in extendable schema [{runbook_type.__name__}]: "
                        f"{extra_names}"
                    )

        return self._extended_runbook

    def set_extended_runbook(self, runbook: Any, type_name: str = "") -> None:
        self._extended_runbook = runbook
        if self.extended_schemas and type_name in self.extended_schemas:
            # save extended runbook back to raw dict
            self.extended_schemas[type_name] = runbook.to_dict()

    def __resolve_type_name(self, runbook_type: Type[Any], type_name: str) -> str:
        assert issubclass(
            runbook_type, DataClassJsonMixin
        ), "runbook_type must annotate from DataClassJsonMixin"
        if not type_name:
            assert hasattr(self, constants.TYPE), (
                f"cannot find type attr on '{runbook_type.__name__}'."
                f"either set field_name or make sure type attr exists."
            )
            type_name = getattr(self, constants.TYPE)
        return type_name

    def __repr__(self) -> str:
        result = ""
        if hasattr(self, "_extended_runbook"):
            result = f"ext:{self._extended_runbook}"
        elif self.extended_schemas:
            result = f"ext:{self.extended_schemas}"
        return result

    def __hash__(self) -> int:
        return super().__hash__()


@dataclass_json()
@dataclass
class TypedSchema:
    type: str = field(default="", metadata=field_metadata(required=True))

    def __hash__(self) -> int:
        return super().__hash__()


@dataclass_json()
@dataclass
class Transformer(TypedSchema, ExtendableSchemaMixin):
    # the name can be referenced by other transformers. If it's not specified,
    # the type will be used.
    name: str = ""
    # prefix of generated variables. if it's not specified, the name will be
    # used. For example, a variable called "a" with the prefix "b", so the
    # variable name will be "b_a" in the variable dict
    prefix: str = ""
    # specify which transformers are depended.
    depends_on: List[str] = field(default_factory=list)
    # rename some of variables for easier use.
    rename: Dict[str, str] = field(default_factory=dict)
    # enable this transformer or not, only enabled transformers run actually.
    enabled: bool = True
    # decide when the transformer run. The init means run at very beginning
    # phase, which is before the combinator. The expanded means run after
    # combinator expanded variables.
    phase: str = field(
        default=constants.TRANSFORMER_PHASE_INIT,
        metadata=field_metadata(
            validate=validate.OneOf(
                [
                    constants.TRANSFORMER_PHASE_INIT,
                    constants.TRANSFORMER_PHASE_EXPANDED,
                    constants.TRANSFORMER_PHASE_CLEANUP,
                ]
            ),
        ),
    )

    def __post_init__(self, *args: Any, **kwargs: Any) -> None:
        if not self.name:
            self.name = self.type
        if not self.prefix:
            self.prefix = self.name


@dataclass_json()
@dataclass
class Combinator(TypedSchema, ExtendableSchemaMixin):
    type: str = field(
        default=constants.COMBINATOR_GRID, metadata=field_metadata(required=True)
    )


@dataclass_json()
@dataclass
class Strategy:
    """
    node_path is the path of yaml node. For example:
        environment.nodes
    if node_path doesn't present, it means to all.
    operations include:
    overwrite: default behavior. add non-exist items and replace exist.
    remove: remove specified path totally.
    add: add non-exist, not replace exist.
    """

    node_path: str = field(default="", metadata=field_metadata(required=True))
    operation: str = field(
        default=constants.OPERATION_OVERWRITE,
        metadata=field_metadata(
            required=True,
            validate=validate.OneOf(
                [
                    constants.OPERATION_ADD,
                    constants.OPERATION_OVERWRITE,
                    constants.OPERATION_REMOVE,
                ]
            ),
        ),
    )


@dataclass_json()
@dataclass
class Include:
    """
    Inclusion of runbook logic, for similar runs.
    """

    path: str = field(default="", metadata=field_metadata(required=True))
    strategy: Union[List[Strategy], Strategy, None] = None


@dataclass_json()
@dataclass
class Extension:
    path: str
    name: Optional[str] = None

    @classmethod
    def from_raw(cls, raw_data: Any) -> List["Extension"]:
        results: List[Extension] = []
        assert isinstance(raw_data, list), f"actual: {type(raw_data)}"
        for extension in raw_data:
            # convert to structured Extension
            if isinstance(extension, str):
                extension = Extension(path=extension)
            elif isinstance(extension, dict):
                extension = load_by_type(Extension, extension)
            results.append(extension)
        return results


@dataclass_json()
@dataclass
class Variable:
    """
    it uses to support variables in other fields.
    duplicate items will be overwritten one by one.
    if a variable is not defined here, LISA can fail earlier to ask check it.
    file path is relative to LISA command starts.
    """

    # If it's secret, it will be removed from log and other output information.
    # secret files also need to be removed after test
    # it's not recommended highly to put secret in runbook directly.
    is_secret: bool = False
    # continue to support v2 format. it's simple.
    file: str = field(
        default="",
        metadata=field_metadata(
            validate=validate.Regexp(r"([\w\W]+[.](xml|yml|yaml)$)|(^$)")
        ),
    )
    name: str = field(default="")
    value: Union[str, bool, int, Dict[Any, Any], List[Any], None] = field(default="")
    # True means this variable can be used in test cases.
    is_case_visible: bool = False
    mask: str = ""

    def __post_init__(self, *args: Any, **kwargs: Any) -> None:
        if self.file and (self.name or self.value):
            raise LisaException(
                f"file cannot be specified with name or value"
                f"file: '{self.file}'"
                f"name: '{self.name}'"
                f"value: '{self.value}'"
            )


@dataclass_json()
@dataclass
class Notifier(TypedSchema, ExtendableSchemaMixin):
    """
    it sends test progress and results to any place wanted.
    detail types are defined in notifier itself, allowed items are handled in code.
    """

    # A notifier is disabled, if it's false. It helps to disable notifier by
    # variables.
    enabled: bool = True


@dataclass_json()
@dataclass()
class FeatureSettings(
    search_space.RequirementMixin, TypedSchema, ExtendableSchemaMixin
):
    """
    It's the default feature setting. It's used by features without settings,
    and it's the base class of specified settings.
    """

    def __eq__(self, o: object) -> bool:
        assert isinstance(o, FeatureSettings), f"actual: {type(o)}"
        return self.type == o.type

    def __repr__(self) -> str:
        return self.type

    def __hash__(self) -> int:
        return hash(self._get_key())

    @staticmethod
    def create(
        type: str, extended_schemas: Optional[Dict[Any, Any]] = None
    ) -> "FeatureSettings":
        # If a feature has no setting, it will return the default settings.
        if extended_schemas:
            feature = FeatureSettings(type=type, extended_schemas=extended_schemas)
        else:
            feature = FeatureSettings(type=type)
        return feature

    def check(self, capability: Any) -> search_space.ResultReason:
        assert isinstance(capability, FeatureSettings), f"actual: {type(capability)}"
        # default FeatureSetting is a place holder, nothing to do.
        result = search_space.ResultReason()
        if self.type != capability.type:
            result.add_reason(
                f"settings are different, "
                f"requirement: {self.type}, capability: {capability.type}"
            )
        return result

    def _get_key(self) -> str:
        return self.type

    def _call_requirement_method(self, method_name: str, capability: Any) -> Any:
        assert isinstance(capability, FeatureSettings), f"actual: {type(capability)}"
        # default FeatureSetting is a place holder, nothing to do.
        value = FeatureSettings.create(self.type)
        # try best to intersect the extended schemas
        if method_name == search_space.RequirementMethod.intersect:
            if self.extended_schemas and capability and capability.extended_schemas:
                value.extended_schemas = deep_update_dict(
                    self.extended_schemas,
                    capability.extended_schemas,
                )
            else:
                value.extended_schemas = (
                    capability.extended_schemas
                    if capability and capability.extended_schemas
                    else self.extended_schemas
                )
        return value


class DiskType(str, Enum):
    PremiumSSDLRS = "PremiumSSDLRS"
    Ephemeral = "Ephemeral"
    StandardHDDLRS = "StandardHDDLRS"
    StandardSSDLRS = "StandardSSDLRS"


# disk types are ordered by commonly and cost. The earlier is lower cost.
disk_type_priority: List[DiskType] = [
    DiskType.StandardHDDLRS,
    DiskType.StandardSSDLRS,
    DiskType.Ephemeral,
    DiskType.PremiumSSDLRS,
]


@dataclass_json()
@dataclass()
class DiskOptionSettings(FeatureSettings):
    type: str = constants.FEATURE_DISK
    disk_type: Optional[
        Union[search_space.SetSpace[DiskType], DiskType]
    ] = field(  # type:ignore
        default_factory=partial(
            search_space.SetSpace,
            items=[
                DiskType.StandardHDDLRS,
                DiskType.StandardSSDLRS,
                DiskType.Ephemeral,
                DiskType.PremiumSSDLRS,
            ],
        ),
        metadata=field_metadata(
            decoder=partial(search_space.decode_set_space_by_type, base_type=DiskType)
        ),
    )
    data_disk_count: search_space.CountSpace = field(
        default_factory=partial(search_space.IntRange, min=0),
        metadata=field_metadata(decoder=search_space.decode_count_space),
    )
    data_disk_caching_type: str = field(
        default=constants.DATADISK_CACHING_TYPE_NONE,
        metadata=field_metadata(
            validate=validate.OneOf(
                [
                    constants.DATADISK_CACHING_TYPE_NONE,
                    constants.DATADISK_CACHING_TYPE_READONLY,
                    constants.DATADISK_CACHING_TYPE_READYWRITE,
                ]
            ),
        ),
    )
    data_disk_iops: search_space.CountSpace = field(
        default_factory=partial(search_space.IntRange, min=0),
        metadata=field_metadata(
            allow_none=True, decoder=search_space.decode_count_space
        ),
    )
    data_disk_size: search_space.CountSpace = field(
        default_factory=partial(search_space.IntRange, min=0),
        metadata=field_metadata(
            allow_none=True, decoder=search_space.decode_count_space
        ),
    )
    max_data_disk_count: search_space.CountSpace = field(
        default=None,
        metadata=field_metadata(
            allow_none=True, decoder=search_space.decode_count_space
        ),
    )

    def __eq__(self, o: object) -> bool:
        assert isinstance(o, DiskOptionSettings), f"actual: {type(o)}"
        return (
            self.type == o.type
            and self.disk_type == o.disk_type
            and self.data_disk_count == o.data_disk_count
            and self.data_disk_caching_type == o.data_disk_caching_type
            and self.data_disk_iops == o.data_disk_iops
            and self.data_disk_size == o.data_disk_size
            and self.max_data_disk_count == o.max_data_disk_count
        )

    def __repr__(self) -> str:
        return (
            f"disk_type: {self.disk_type},"
            f"count: {self.data_disk_count},"
            f"caching: {self.data_disk_caching_type},"
            f"iops: {self.data_disk_iops},"
            f"size: {self.data_disk_size},"
            f"max_data_disk_count: {self.max_data_disk_count}"
        )

    def __str__(self) -> str:
        return self.__repr__()

    def __hash__(self) -> int:
        return super().__hash__()

    def check(self, capability: Any) -> search_space.ResultReason:
        result = super().check(capability)
        result.merge(
            search_space.check_countspace(
                self.data_disk_count, capability.data_disk_count
            ),
            "data_disk_count",
        )
        result.merge(
            search_space.check_countspace(
                self.max_data_disk_count, capability.max_data_disk_count
            ),
            "max_data_disk_count",
        )
        result.merge(
            search_space.check_countspace(
                self.data_disk_iops, capability.data_disk_iops
            ),
            "data_disk_iops",
        )
        return result

    def _get_key(self) -> str:
        return (
            f"{super()._get_key()}/{self.disk_type}/"
            f"{self.data_disk_count}/{self.data_disk_caching_type}/"
            f"{self.data_disk_iops}/{self.data_disk_size}"
        )

    def _call_requirement_method(self, method_name: str, capability: Any) -> Any:
        assert isinstance(capability, DiskOptionSettings), f"actual: {type(capability)}"
        parent_value = super()._call_requirement_method(method_name, capability)
        # convert parent type to child type
        value = DiskOptionSettings()
        value.extended_schemas = parent_value.extended_schemas
        search_space_countspace_method = getattr(
            search_space, f"{method_name}_countspace"
        )
        if self.disk_type or capability.disk_type:
            value.disk_type = getattr(
                search_space, f"{method_name}_setspace_by_priority"
            )(self.disk_type, capability.disk_type, disk_type_priority)
        if self.data_disk_count or capability.data_disk_count:
            value.data_disk_count = search_space_countspace_method(
                self.data_disk_count, capability.data_disk_count
            )
        if self.data_disk_iops or capability.data_disk_iops:
            value.data_disk_iops = search_space_countspace_method(
                self.data_disk_iops, capability.data_disk_iops
            )
        if self.data_disk_size or capability.data_disk_size:
            value.data_disk_size = search_space_countspace_method(
                self.data_disk_size, capability.data_disk_size
            )
        if self.data_disk_caching_type or capability.data_disk_caching_type:
            value.data_disk_caching_type = (
                self.data_disk_caching_type or capability.data_disk_caching_type
            )
        if self.max_data_disk_count or capability.max_data_disk_count:
            value.max_data_disk_count = search_space_countspace_method(
                self.max_data_disk_count, capability.max_data_disk_count
            )
        return value


class NetworkDataPath(str, Enum):
    Synthetic = "Synthetic"
    Sriov = "Sriov"


_network_data_path_priority: List[NetworkDataPath] = [
    NetworkDataPath.Sriov,
    NetworkDataPath.Synthetic,
]


@dataclass_json()
@dataclass()
class NetworkInterfaceOptionSettings(FeatureSettings):
    type: str = "NetworkInterface"
    data_path: Optional[
        Union[search_space.SetSpace[NetworkDataPath], NetworkDataPath]
    ] = field(  # type: ignore
        default_factory=partial(
            search_space.SetSpace,
            items=[
                NetworkDataPath.Synthetic,
                NetworkDataPath.Sriov,
            ],
        ),
        metadata=field_metadata(
            decoder=partial(
                search_space.decode_set_space_by_type, base_type=NetworkDataPath
            )
        ),
    )
    # nic_count is used for specifying associated nic count during provisioning vm
    nic_count: search_space.CountSpace = field(
        default_factory=partial(search_space.IntRange, min=1),
        metadata=field_metadata(decoder=search_space.decode_count_space),
    )
    # max_nic_count is used for getting the size max nic capability, it can be used to
    # check how many nics the vm can be associated after provisioning
    max_nic_count: search_space.CountSpace = field(
        default_factory=partial(search_space.IntRange, min=1),
        metadata=field_metadata(
            allow_none=True, decoder=search_space.decode_count_space
        ),
    )

    def __eq__(self, o: object) -> bool:
        assert isinstance(o, NetworkInterfaceOptionSettings), f"actual: {type(o)}"
        return (
            self.type == o.type
            and self.data_path == o.data_path
            and self.nic_count == o.nic_count
            and self.max_nic_count == o.max_nic_count
        )

    def __repr__(self) -> str:
        return (
            f"data_path:{self.data_path}, nic_count:{self.nic_count},"
            f" max_nic_count:{self.max_nic_count}"
        )

    def __str__(self) -> str:
        return self.__repr__()

    def __hash__(self) -> int:
        return super().__hash__()

    def _get_key(self) -> str:
        return (
            f"{super()._get_key()}/{self.data_path}/{self.nic_count}"
            f"/{self.max_nic_count}"
        )

    def check(self, capability: Any) -> search_space.ResultReason:
        assert isinstance(
            capability, NetworkInterfaceOptionSettings
        ), f"actual: {type(capability)}"
        result = super().check(capability)
        result.merge(
            search_space.check_countspace(self.nic_count, capability.nic_count),
            "nic_count",
        )
        result.merge(
            search_space.check_setspace(self.data_path, capability.data_path),
            "data_path",
        )
        result.merge(
            search_space.check_countspace(self.max_nic_count, capability.max_nic_count),
            "max_nic_count",
        )
        return result

    def _call_requirement_method(self, method_name: str, capability: Any) -> Any:
        assert isinstance(
            capability, NetworkInterfaceOptionSettings
        ), f"actual: {type(capability)}"
        parent_value = super()._call_requirement_method(method_name, capability)
        # convert parent type to child type
        value = NetworkInterfaceOptionSettings()
        value.extended_schemas = parent_value.extended_schemas
        value.max_nic_count = getattr(search_space, f"{method_name}_countspace")(
            self.max_nic_count, capability.max_nic_count
        )
        if self.nic_count or capability.nic_count:
            value.nic_count = getattr(search_space, f"{method_name}_countspace")(
                self.nic_count, capability.nic_count
            )
        else:
            raise LisaException("nic_count cannot be zero")
        value.data_path = getattr(search_space, f"{method_name}_setspace_by_priority")(
            self.data_path, capability.data_path, _network_data_path_priority
        )
        return value


@dataclass_json()
@dataclass()
class FeaturesSpace(
    search_space.SetSpace[Union[str, FeatureSettings]],
):
    def __post_init__(self, *args: Any, **kwargs: Any) -> None:
        if self.items:
            for index, item in enumerate(self.items):
                if isinstance(item, dict):
                    item = load_by_type(FeatureSettings, item)
                    self.items[index] = item


@dataclass_json()
@dataclass()
class NodeSpace(search_space.RequirementMixin, TypedSchema, ExtendableSchemaMixin):
    type: str = field(
        default=constants.ENVIRONMENTS_NODES_REQUIREMENT,
        metadata=field_metadata(
            required=True,
            validate=validate.OneOf([constants.ENVIRONMENTS_NODES_REQUIREMENT]),
        ),
    )
    name: str = ""
    is_default: bool = field(default=False)
    node_count: search_space.CountSpace = field(
        default=search_space.IntRange(min=1),
        metadata=field_metadata(decoder=search_space.decode_count_space),
    )
    core_count: search_space.CountSpace = field(
        default=search_space.IntRange(min=1),
        metadata=field_metadata(decoder=search_space.decode_count_space),
    )
    memory_mb: search_space.CountSpace = field(
        default=search_space.IntRange(min=512),
        metadata=field_metadata(decoder=search_space.decode_count_space),
    )
    disk: Optional[DiskOptionSettings] = None
    network_interface: Optional[NetworkInterfaceOptionSettings] = None
    gpu_count: search_space.CountSpace = field(
        default=search_space.IntRange(min=0),
        metadata=field_metadata(decoder=search_space.decode_count_space),
    )
    # all features on requirement should be included.
    # all features on capability can be included.
    _features: Optional[FeaturesSpace] = field(
        default=None,
        metadata=field_metadata(allow_none=True, data_key="features"),
    )
    # set by requirements
    # capability's is ignored
    _excluded_features: Optional[FeaturesSpace] = field(
        default=None,
        metadata=field_metadata(
            allow_none=True,
            data_key="excluded_features",
        ),
    )

    def __post_init__(self, *args: Any, **kwargs: Any) -> None:
        # clarify types to avoid type errors in properties.
        self._features: Optional[search_space.SetSpace[FeatureSettings]]
        self._excluded_features: Optional[search_space.SetSpace[FeatureSettings]]

    def __eq__(self, o: object) -> bool:
        assert isinstance(o, NodeSpace), f"actual: {type(o)}"
        return (
            self.type == o.type
            and self.node_count == o.node_count
            and self.core_count == o.core_count
            and self.memory_mb == o.memory_mb
            and self.disk == o.disk
            and self.network_interface == o.network_interface
            and self.gpu_count == o.gpu_count
            and self.features == o.features
            and self.excluded_features == o.excluded_features
        )

    def __repr__(self) -> str:
        """
        override it for shorter text
        """
        return (
            f"type:{self.type},name:{self.name},"
            f"default:{self.is_default},"
            f"count:{self.node_count},core:{self.core_count},"
            f"mem:{self.memory_mb},disk:{self.disk},"
            f"network interface: {self.network_interface}, gpu:{self.gpu_count},"
            f"f:{self.features},ef:{self.excluded_features},"
            f"{super().__repr__()}"
        )

    @property
    def cost(self) -> float:
        core_count = search_space.generate_min_capability_countspace(
            self.core_count, self.core_count
        )
        gpu_count = search_space.generate_min_capability_countspace(
            self.gpu_count, self.gpu_count
        )
        return core_count + gpu_count * 100

    @property
    def features(self) -> Optional[search_space.SetSpace[FeatureSettings]]:
        self._features = self._create_feature_settings_list(self._features)
        if self._features is not None:
            self._features.is_allow_set = True
        return cast(Optional[search_space.SetSpace[FeatureSettings]], self._features)

    @features.setter
    def features(self, value: Optional[search_space.SetSpace[FeatureSettings]]) -> None:
        self._features = cast(FeaturesSpace, value)

    @property
    def excluded_features(self) -> Optional[search_space.SetSpace[FeatureSettings]]:
        if not self._excluded_features:
            self._excluded_features = self._create_feature_settings_list(
                self._excluded_features
            )
        if self._excluded_features is not None:
            self._excluded_features.is_allow_set = False
        return cast(
            Optional[search_space.SetSpace[FeatureSettings]], self._excluded_features
        )

    @excluded_features.setter
    def excluded_features(
        self, value: Optional[search_space.SetSpace[FeatureSettings]]
    ) -> None:
        self._excluded_features = cast(FeaturesSpace, value)

    def check(self, capability: Any) -> search_space.ResultReason:
        result = search_space.ResultReason()
        if capability is None:
            result.add_reason("capability shouldn't be None")
        if self.features:
            assert self.features.is_allow_set, "features should be allow set"
        if self.excluded_features:
            assert (
                not self.excluded_features.is_allow_set
            ), "excluded_features shouldn't be allow set"
        assert isinstance(capability, NodeSpace), f"actual: {type(capability)}"
        if (
            not capability.node_count
            or not capability.core_count
            or not capability.memory_mb
        ):
            result.add_reason(
                "node_count, core_count, memory_mb shouldn't be None or zero."
            )
        if isinstance(self.node_count, int) and isinstance(capability.node_count, int):
            if self.node_count > capability.node_count:
                result.add_reason(
                    f"capability node count {capability.node_count} "
                    f"must be more than requirement {self.node_count}"
                )
        else:
            result.merge(
                search_space.check_countspace(self.node_count, capability.node_count),
                "node_count",
            )
        result.merge(
            search_space.check_countspace(self.core_count, capability.core_count),
            "core_count",
        )
        result.merge(
            search_space.check_countspace(self.memory_mb, capability.memory_mb),
            "memory_mb",
        )
        if self.disk:
            result.merge(self.disk.check(capability.disk))
        if self.network_interface:
            result.merge(self.network_interface.check(capability.network_interface))
        result.merge(
            search_space.check_countspace(self.gpu_count, capability.gpu_count),
            "gpu_count",
        )
        if self.features:
            for feature in self.features:
                cap_feature = self._find_feature_by_type(
                    feature.type, capability.features
                )
                if cap_feature:
                    result.merge(feature.check(cap_feature))
                else:
                    result.add_reason(
                        f"no feature '{feature.type}' found in capability"
                    )
        if self.excluded_features:
            for feature in self.excluded_features:
                cap_feature = self._find_feature_by_type(
                    feature.type, capability.features
                )
                if cap_feature:
                    result.add_reason(
                        f"excluded feature '{feature.type}' found in capability"
                    )
        return result

    def expand_by_node_count(self) -> List[Any]:
        # expand node count in requirement to one,
        # so that's easy to compare equalization later.
        expanded_requirements: List[NodeSpace] = []
        node_count = search_space.generate_min_capability_countspace(
            self.node_count, self.node_count
        )
        for _ in range(node_count):
            expanded_copy = copy.copy(self)
            expanded_copy.node_count = 1
            expanded_requirements.append(expanded_copy)
        return expanded_requirements

    def has_feature(self, find_type: str) -> bool:
        result = False
        if not self.features:
            return result
        return any(feature for feature in self.features if feature.type == find_type)

    def _call_requirement_method(self, method_name: str, capability: Any) -> Any:
        assert isinstance(capability, NodeSpace), f"actual: {type(capability)}"
        # copy to duplicate extended schema
        value: NodeSpace = copy.deepcopy(self)
        if self.node_count or capability.node_count:
            if isinstance(self.node_count, int) and isinstance(
                capability.node_count, int
            ):
                # capability can have more node
                value.node_count = capability.node_count
            else:
                value.node_count = getattr(search_space, f"{method_name}_countspace")(
                    self.node_count, capability.node_count
                )
        else:
            raise LisaException("node_count cannot be zero")
        if self.core_count or capability.core_count:
            value.core_count = getattr(search_space, f"{method_name}_countspace")(
                self.core_count, capability.core_count
            )
        else:
            raise LisaException("core_count cannot be zero")
        if self.memory_mb or capability.memory_mb:
            value.memory_mb = getattr(search_space, f"{method_name}_countspace")(
                self.memory_mb, capability.memory_mb
            )
        else:
            raise LisaException("memory_mb cannot be zero")
        if self.disk or capability.disk:
            value.disk = getattr(search_space, method_name)(self.disk, capability.disk)
        if self.network_interface or capability.network_interface:
            value.network_interface = getattr(search_space, method_name)(
                self.network_interface, capability.network_interface
            )
        if self.gpu_count or capability.gpu_count:
            value.gpu_count = getattr(search_space, f"{method_name}_countspace")(
                self.gpu_count, capability.gpu_count
            )
        else:
            value.gpu_count = 0
        if (
            capability.features
            and method_name == search_space.RequirementMethod.generate_min_capability
        ):
            # The requirement features are ignored, if cap doesn't have it.
            value.features = search_space.SetSpace[FeatureSettings](is_allow_set=True)
            for original_cap_feature in capability.features:
                capability_feature = self._get_or_create_feature_settings(
                    original_cap_feature
                )
                requirement_feature = (
                    self._find_feature_by_type(capability_feature.type, self.features)
                    or capability_feature
                )
                current_feature = getattr(requirement_feature, method_name)(
                    capability_feature
                )
                value.features.add(current_feature)
        elif method_name == search_space.RequirementMethod.intersect and (
            capability.features or self.features
        ):
            # This is a hack to work with lisa_runner. The capability features
            # are joined case req and runbook req. Here just take the results
            # from capability.
            value.features = capability.features
        if (
            capability.excluded_features
            and method_name == search_space.RequirementMethod.generate_min_capability
        ):
            # TODO: the min value for excluded feature is not clear. It may need
            # to be improved with real scenarios.
            value.excluded_features = search_space.SetSpace[FeatureSettings](
                is_allow_set=True
            )
            for original_cap_feature in capability.excluded_features:
                capability_feature = self._get_or_create_feature_settings(
                    original_cap_feature
                )
                requirement_feature = (
                    self._find_feature_by_type(
                        capability_feature.type, self.excluded_features
                    )
                    or capability_feature
                )
                current_feature = getattr(requirement_feature, method_name)(
                    capability_feature
                )
                value.excluded_features.add(current_feature)
        elif method_name == search_space.RequirementMethod.intersect and (
            capability.excluded_features or self.excluded_features
        ):
            # This is a hack to work with lisa_runner. The capability features
            # are joined case req and runbook req. Here just take the results
            # from capability.
            value.excluded_features = capability.excluded_features
        return value

    def _find_feature_by_type(
        self,
        find_type: str,
        features: Optional[search_space.SetSpace[Any]],
    ) -> Optional[FeatureSettings]:
        result: Optional[FeatureSettings] = None
        if not features:
            return result
        is_found = False
        for original_feature in features.items:
            feature = self._get_or_create_feature_settings(original_feature)
            if feature.type == find_type:
                is_found = True
                break
        if is_found:
            result = feature
        return result

    def _create_feature_settings_list(
        self, features: Optional[search_space.SetSpace[Any]]
    ) -> Optional[FeaturesSpace]:
        result: Optional[FeaturesSpace] = None
        if features is None:
            return result
        result = cast(
            FeaturesSpace,
            search_space.SetSpace[FeatureSettings](is_allow_set=features.is_allow_set),
        )
        for raw_feature in features.items:
            feature = self._get_or_create_feature_settings(raw_feature)
            result.add(feature)
        return result

    def _get_or_create_feature_settings(self, feature: Any) -> FeatureSettings:
        if isinstance(feature, str):
            feature_setting = FeatureSettings.create(feature)
        elif isinstance(feature, FeatureSettings):
            feature_setting = feature
        else:
            raise LisaException(
                f"unsupported type {type(feature)} found in features, "
                "only str and FeatureSettings supported."
            )
        return feature_setting


@dataclass_json()
@dataclass
class Capability(NodeSpace):
    type: str = constants.ENVIRONMENTS_NODES_REQUIREMENT

    def __post_init__(self, *args: Any, **kwargs: Any) -> None:
        super().__post_init__(*args, **kwargs)
        self.node_count = 1


@dataclass_json()
@dataclass
class Node(TypedSchema, ExtendableSchemaMixin):
    capability: Capability = field(default_factory=Capability)
    name: str = ""
    is_default: bool = field(default=False)


@dataclass_json()
@dataclass
class LocalNode(Node):
    type: str = constants.ENVIRONMENTS_NODES_LOCAL


@dataclass_json()
@dataclass
class RemoteNode(Node):
    type: str = constants.ENVIRONMENTS_NODES_REMOTE
    address: str = ""
    port: int = field(
        default=22,
        metadata=field_metadata(
            field_function=fields.Int, validate=validate.Range(min=1, max=65535)
        ),
    )
    public_address: str = ""
    public_port: int = field(
        default=22,
        metadata=field_metadata(
            field_function=fields.Int, validate=validate.Range(min=1, max=65535)
        ),
    )
    username: str = constants.DEFAULT_USER_NAME
    password: str = ""
    private_key_file: str = ""

    def __post_init__(self, *args: Any, **kwargs: Any) -> None:
        add_secret(self.username, PATTERN_HEADTAIL)
        add_secret(self.password)
        add_secret(self.private_key_file)


@dataclass_json()
@dataclass
class Environment:
    name: str = field(default="")
    topology: str = field(
        default=constants.ENVIRONMENTS_SUBNET,
        metadata=field_metadata(
            validate=validate.OneOf([constants.ENVIRONMENTS_SUBNET])
        ),
    )
    nodes_raw: Optional[List[Any]] = field(
        default=None,
        metadata=field_metadata(data_key=constants.NODES),
    )
    nodes_requirement: Optional[List[NodeSpace]] = None
    _original_nodes_requirement: Optional[List[NodeSpace]] = None

    def __post_init__(self, *args: Any, **kwargs: Any) -> None:
        self._original_nodes_requirement = self.nodes_requirement
        self.reload_requirements()

    def reload_requirements(self) -> None:
        results: List[Node] = []
        self.nodes = []
        self.nodes_requirement = None
        if self._original_nodes_requirement:
            self.nodes_requirement = []
            self.nodes_requirement.extend(copy.copy(self._original_nodes_requirement))
        if self.nodes_raw:
            for node_raw in self.nodes_raw:
                node_type = node_raw[constants.TYPE]
                if node_type == constants.ENVIRONMENTS_NODES_REQUIREMENT:
                    original_req: NodeSpace = load_by_type(NodeSpace, node_raw)
                    expanded_req = original_req.expand_by_node_count()
                    if self.nodes_requirement is None:
                        self.nodes_requirement = []
                    self.nodes_requirement.extend(expanded_req)
                else:
                    # load base schema for future parsing
                    node: Node = load_by_type(Node, node_raw)
                    results.append(node)
        self.nodes = results


@dataclass_json()
@dataclass
class EnvironmentRoot:
    warn_as_error: bool = field(default=False)
    environments: List[Environment] = field(default_factory=list)


@dataclass_json()
@dataclass
class Platform(TypedSchema, ExtendableSchemaMixin):
    type: str = field(
        default=constants.PLATFORM_READY,
        metadata=field_metadata(required=True),
    )
    admin_username: str = constants.DEFAULT_USER_NAME
    admin_password: str = ""
    admin_private_key_file: str = ""
    # no/False: means to delete the environment regardless case fail or pass
    # yes/always/True: means to keep the environment regardless case fail or pass
    keep_environment: Optional[Union[str, bool]] = constants.ENVIRONMENT_KEEP_NO
    # platform can specify a default environment requirement
    requirement: Optional[Dict[str, Any]] = None

    def __post_init__(self, *args: Any, **kwargs: Any) -> None:
        add_secret(self.admin_username, PATTERN_HEADTAIL)
        add_secret(self.admin_password)
        if self.type != constants.PLATFORM_READY:
            if not self.admin_password and not self.admin_private_key_file:
                raise LisaException(
                    "one of admin_password and admin_private_key_file must be set"
                )
        if isinstance(self.keep_environment, bool):
            if self.keep_environment:
                self.keep_environment = constants.ENVIRONMENT_KEEP_ALWAYS
            else:
                self.keep_environment = constants.ENVIRONMENT_KEEP_NO
        allow_list = [
            constants.ENVIRONMENT_KEEP_ALWAYS,
            constants.ENVIRONMENT_KEEP_FAILED,
            constants.ENVIRONMENT_KEEP_NO,
        ]
        assert isinstance(self.keep_environment, str), (
            f"keep_environment should be {allow_list} or bool, "
            f"but it's {type(self.keep_environment)}, '{self.keep_environment}'"
        )
        if isinstance(self.keep_environment, str):
            self.keep_environment = self.keep_environment.lower()
            if self.keep_environment not in allow_list:
                raise LisaException(
                    f"keep_environment only can be set as one of {allow_list}"
                )
        # this requirement in platform will be applied to each test case
        # requirement. It means the set value will override value in test cases.
        # But the schema will be validated here. The original NodeSpace object holds
        if self.requirement:
            # validate schema of raw inputs
            load_by_type(Capability, self.requirement)


@dataclass_json()
@dataclass
class Criteria:
    """
    all rules in same criteria are AND condition.
    we may support richer conditions later.
    match case by name pattern
    """

    name: Optional[str] = None
    area: Optional[str] = None
    category: Optional[str] = None
    priority: Optional[Union[int, List[int]]] = field(
        default=None,
        metadata=field_metadata(
            validate=ListableValidator(int, validate.Range(min=0, max=4)),
            allow_none=True,
        ),
    )
    # tags is a simple way to include test cases within same topic.
    tags: Optional[Union[str, List[str]]] = field(
        default=None,
        metadata=field_metadata(validate=ListableValidator(str), allow_none=True),
    )

    def __post_init__(self, *args: Any, **kwargs: Any) -> None:
        strip_strs(self, ["name", "area", "category", "tags"])


@dataclass_json()
@dataclass
class BaseTestCaseFilter(TypedSchema, ExtendableSchemaMixin, BaseClassMixin):
    """
    base test case filters for subclass factory
    """

    type: str = field(
        default=constants.TESTCASE_TYPE_LISA,
    )
    # if it's false, current filter is ineffective.
    enabled: bool = field(default=True)


@dataclass_json()
@dataclass
class TestCase(BaseTestCaseFilter):
    type: str = field(
        default=constants.TESTCASE_TYPE_LISA,
        metadata=field_metadata(
            validate=validate.OneOf([constants.TESTCASE_TYPE_LISA]),
        ),
    )
    name: str = ""
    criteria: Optional[Criteria] = None
    # specify use this rule to select or drop test cases. if it's forced include or
    # exclude, it won't be effect by following select actions. And it fails if
    # there are force rules conflict.
    select_action: str = field(
        default=constants.TESTCASE_SELECT_ACTION_INCLUDE,
        metadata=config(
            mm_field=fields.String(
                validate=validate.OneOf(
                    [
                        # none means this action part doesn't include or exclude cases
                        constants.TESTCASE_SELECT_ACTION_NONE,
                        constants.TESTCASE_SELECT_ACTION_INCLUDE,
                        constants.TESTCASE_SELECT_ACTION_FORCE_INCLUDE,
                        constants.TESTCASE_SELECT_ACTION_EXCLUDE,
                        constants.TESTCASE_SELECT_ACTION_FORCE_EXCLUDE,
                    ]
                )
            ),
        ),
    )
    # run this group of test cases several times
    # default is 1
    times: int = field(
        default=1,
        metadata=field_metadata(
            field_function=fields.Int, validate=validate.Range(min=1)
        ),
    )
    # retry times if fails. Default is 0, not to retry.
    retry: int = field(
        default=0,
        metadata=field_metadata(
            field_function=fields.Int, validate=validate.Range(min=0)
        ),
    )
    # each case with this rule will be run in a new environment.
    use_new_environment: bool = False
    # Once it's set, failed test result will be rewrite to success
    # it uses to work around some cases temporarily, don't overuse it.
    # default is false
    ignore_failure: bool = False
    # case should run on a specified environment
    environment: str = ""

    @classmethod
    def type_name(cls) -> str:
        return constants.TESTCASE_TYPE_LISA


@dataclass_json()
@dataclass
class LegacyTestCase(BaseTestCaseFilter):
    type: str = field(
        default=constants.TESTCASE_TYPE_LEGACY,
        metadata=field_metadata(
            required=True,
            validate=validate.OneOf([constants.TESTCASE_TYPE_LEGACY]),
        ),
    )
    repo: str = "https://github.com/microsoft/lisa.git"
    branch: str = "master"
    command: str = ""

    @classmethod
    def type_name(cls) -> str:
        return constants.TESTCASE_TYPE_LEGACY


@dataclass_json()
@dataclass
class ConnectionInfo:
    address: str = ""
    port: int = field(
        default=22,
        metadata=field_metadata(
            field_function=fields.Int, validate=validate.Range(min=1, max=65535)
        ),
    )
    username: str = constants.DEFAULT_USER_NAME
    password: Optional[str] = ""
    private_key_file: Optional[str] = ""

    def __post_init__(self, *args: Any, **kwargs: Any) -> None:
        add_secret(self.username, PATTERN_HEADTAIL)
        add_secret(self.password)
        add_secret(self.private_key_file)
        if not self.password and not self.private_key_file:
            raise LisaException(
                "at least one of password or private_key_file need to be set when "
                "connecting"
            )
        elif not self.private_key_file:
            # use password
            # spurplus doesn't process empty string correctly, use None
            self.private_key_file = None
        else:
            if not Path(self.private_key_file).exists():
                raise FileNotFoundError(self.private_key_file)
            self.password = None
        if not self.username:
            raise LisaException("username must be set")

    def __str__(self) -> str:
        return f"{self.username}@{self.address}:{self.port}"


@dataclass_json()
@dataclass
class ProxyConnectionInfo(ConnectionInfo):
    private_address: str = ""
    private_port: int = field(
        default=22,
        metadata=field_metadata(
            field_function=fields.Int, validate=validate.Range(min=1, max=65535)
        ),
    )


@dataclass_json()
@dataclass
class Development:
    enabled: bool = True
    enable_trace: bool = False
    mock_tcp_ping: bool = False
    jump_boxes: List[ProxyConnectionInfo] = field(default_factory=list)


@dataclass_json()
@dataclass
class Runbook:
    # run name prefix to help grouping results and put it in title.
    name: str = "not_named"
    exit_with_failed_count: bool = True
    test_project: str = ""
    test_pass: str = ""
    tags: Optional[List[str]] = None
    concurrency: int = 1
    # minutes to wait for resource
    wait_resource_timeout: float = 5
    include: Optional[List[Include]] = field(default=None)
    extension: Optional[List[Union[str, Extension]]] = field(default=None)
    variable: Optional[List[Variable]] = field(default=None)
    transformer: Optional[List[Transformer]] = field(default=None)
    combinator: Optional[Combinator] = field(default=None)
    environment: Optional[EnvironmentRoot] = field(default=None)
    notifier: Optional[List[Notifier]] = field(default=None)
    platform: List[Platform] = field(default_factory=list)
    # will be parsed in runner.
    testcase_raw: List[Any] = field(
        default_factory=list, metadata=field_metadata(data_key=constants.TESTCASE)
    )
    dev: Optional[Development] = field(default=None)

    def __post_init__(self, *args: Any, **kwargs: Any) -> None:
        if not self.platform:
            self.platform = [Platform(type=constants.PLATFORM_READY)]
        if not self.testcase_raw:
            self.testcase_raw = [
                {
                    constants.TESTCASE_CRITERIA: {
                        constants.TESTCASE_CRITERIA_AREA: "demo"
                    }
                }
            ]
        self.testcase: List[Any] = []


def load_by_type(schema_type: Type[T], raw_runbook: Any, many: bool = False) -> T:
    """
    Convert dict, list or base typed schema to specified typed schema.
    """
    if type(raw_runbook) == schema_type:
        return raw_runbook
    if not isinstance(raw_runbook, dict) and not many:
        raw_runbook = raw_runbook.to_dict()
    result: T = schema_type.schema().load(raw_runbook, many=many)  # type: ignore
    return result


def load_by_type_many(schema_type: Type[T], raw_runbook: Any) -> List[T]:
    """
    Convert raw list to list of typed schema. It has different returned type
    with load_by_type.
    """
    result = load_by_type(schema_type, raw_runbook=raw_runbook, many=True)
...
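Usage note: the module docstring above names three cooperating components (dataclasses for shape, dataclasses_json for serialization, marshmallow for validation), and load_by_type shows that loading is schema_type.schema().load(...) underneath. Below is a minimal, self-contained sketch of the same pattern using dataclasses_json and marshmallow directly; the RetrySettings class and its field are illustrative only, not part of LISA.

# Minimal sketch of the three-component pattern (illustrative names, not
# from LISA): dataclasses defines the shape, dataclasses_json serializes,
# and marshmallow validates via config(mm_field=...).
from dataclasses import dataclass, field

from dataclasses_json import config, dataclass_json
from marshmallow import fields, validate


@dataclass_json()
@dataclass
class RetrySettings:
    # marshmallow runs this validator when loading through .schema()
    times: int = field(
        default=1,
        metadata=config(mm_field=fields.Int(validate=validate.Range(min=1, max=10))),
    )


# .schema().load() is the same call LISA's load_by_type makes underneath;
# an out-of-range value raises marshmallow.ValidationError.
settings = RetrySettings.schema().load({"times": 3})
print(settings.times)  # 3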
search_space.py
Source: search_space.py
...
                f"IntRange doesn't support other intersect on {type(capability)}."
            )
        return result


CountSpace = Union[int, List[IntRange], IntRange, None]


def decode_count_space(data: Any) -> Any:
    """
    CountSpace is complex to marshmallow, so it needs customized decode.
    Anyway, marshmallow can encode it correctly.
    """
    decoded_data: CountSpace = None
    if data is None or isinstance(data, int) or isinstance(data, IntRange):
        decoded_data = data
    elif isinstance(data, list):
        decoded_data = []
        for item in data:
            if isinstance(item, dict):
                decoded_data.append(IntRange.schema().load(item))  # type: ignore
            else:
                assert isinstance(item, IntRange), f"actual: {type(item)}"
...
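A quick usage sketch of the decoder above, covering only the branches visible in the snippet (hedged: assumes the lisa package is importable in your environment):

# An int passes through unchanged; dicts inside a list are loaded into
# IntRange instances through IntRange.schema().load().
from lisa import search_space

print(search_space.decode_count_space(4))                       # 4
print(search_space.decode_count_space([{"min": 1, "max": 8}]))  # [IntRange(...)]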
nvme.py
Source: nvme.py
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
import re
from dataclasses import dataclass, field
from typing import Any, List, Type

from dataclasses_json import dataclass_json

from lisa import schema, search_space
from lisa.feature import Feature
from lisa.schema import FeatureSettings
from lisa.tools import Lspci, Nvmecli
from lisa.tools.lspci import PciDevice
from lisa.util import field_metadata


class Nvme(Feature):
    # crw------- 1 root root 251, 0 Jun 21 03:08 /dev/nvme0
    _device_pattern = re.compile(r".*(?P<device_name>/dev/nvme[0-9]$)", re.MULTILINE)
    # brw-rw---- 1 root disk 259, 0 Jun 21 03:08 /dev/nvme0n1
    _namespace_pattern = re.compile(
        r".*(?P<namespace>/dev/nvme[0-9]n[0-9]$)", re.MULTILINE
    )
    # '/dev/nvme0n1 351f1f720e5a00000001 Microsoft NVMe Direct Disk 1 0.00 B / 1.92 TB 512 B + 0 B NVMDV001'  # noqa: E501
    _namespace_cli_pattern = re.compile(
        r"(?P<namespace>/dev/nvme[0-9]n[0-9])", re.MULTILINE
    )
    _pci_device_name = "Non-Volatile memory controller"
    _ls_devices: str = ""

    @classmethod
    def settings_type(cls) -> Type[schema.FeatureSettings]:
        return NvmeSettings

    @classmethod
    def can_disable(cls) -> bool:
        return True

    def enabled(self) -> bool:
        return True

    def get_devices(self) -> List[str]:
        devices_list = []
        self._get_device_from_ls()
        for row in self._ls_devices.splitlines():
            matched_result = self._device_pattern.match(row)
            if matched_result:
                devices_list.append(matched_result.group("device_name"))
        return devices_list

    def get_namespaces(self) -> List[str]:
        namespaces = []
        self._get_device_from_ls()
        for row in self._ls_devices.splitlines():
            matched_result = self._namespace_pattern.match(row)
            if matched_result:
                namespaces.append(matched_result.group("namespace"))
        return namespaces

    def get_namespaces_from_cli(self) -> List[str]:
        namespaces_cli = []
        nvme_cli = self._node.tools[Nvmecli]
        nvme_list = nvme_cli.run("list", shell=True, sudo=True)
        for row in nvme_list.stdout.splitlines():
            matched_result = self._namespace_cli_pattern.match(row)
            if matched_result:
                namespaces_cli.append(matched_result.group("namespace"))
        return namespaces_cli

    def get_devices_from_lspci(self) -> List[PciDevice]:
        devices_from_lspci = []
        lspci_tool = self._node.tools[Lspci]
        device_list = lspci_tool.get_devices()
        devices_from_lspci = [
            x for x in device_list if self._pci_device_name == x.device_class
        ]
        return devices_from_lspci

    def get_raw_data_disks(self) -> List[str]:
        return self.get_namespaces()

    def _get_device_from_ls(self, force_run: bool = False) -> None:
        if (not self._ls_devices) or force_run:
            execute_results = self._node.execute(
                "ls -l /dev/nvme*", shell=True, sudo=True
            )
            self._ls_devices = execute_results.stdout


@dataclass_json()
@dataclass()
class NvmeSettings(FeatureSettings):
    type: str = "Nvme"
    disk_count: search_space.CountSpace = field(
        default=search_space.IntRange(min=0),
        metadata=field_metadata(decoder=search_space.decode_count_space),
    )

    def __eq__(self, o: object) -> bool:
        assert isinstance(o, NvmeSettings), f"actual: {type(o)}"
        return self.type == o.type and self.disk_count == o.disk_count

    def __repr__(self) -> str:
        return f"disk_count:{self.disk_count}"

    def __str__(self) -> str:
        return self.__repr__()

    def __hash__(self) -> int:
        return super().__hash__()

    def _get_key(self) -> str:
        return f"{super()._get_key()}/{self.disk_count}"

    def check(self, capability: Any) -> search_space.ResultReason:
        assert isinstance(capability, NvmeSettings), f"actual: {type(capability)}"
        result = super().check(capability)
        result.merge(
            search_space.check_countspace(self.disk_count, capability.disk_count),
            "disk_count",
        )
        return result

    def _generate_min_capability(self, capability: Any) -> Any:
        assert isinstance(capability, NvmeSettings), f"actual: {type(capability)}"
        min_value = NvmeSettings()
        if self.disk_count or capability.disk_count:
            min_value.disk_count = search_space.generate_min_capability_countspace(
                self.disk_count, capability.disk_count
            )
...
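The feature's discovery is plain regex over `ls -l /dev/nvme*` output. Here is a standalone sketch (stdlib only; sample rows taken from the comments in the class above) showing how the device and namespace patterns split controllers from namespaces:

# The same patterns as in Nvme, applied to sample `ls -l` rows.
import re

_device_pattern = re.compile(r".*(?P<device_name>/dev/nvme[0-9]$)", re.MULTILINE)
_namespace_pattern = re.compile(r".*(?P<namespace>/dev/nvme[0-9]n[0-9]$)", re.MULTILINE)

sample = (
    "crw------- 1 root root 251, 0 Jun 21 03:08 /dev/nvme0\n"
    "brw-rw---- 1 root disk 259, 0 Jun 21 03:08 /dev/nvme0n1\n"
)
for row in sample.splitlines():
    device = _device_pattern.match(row)
    if device:
        print("device:", device.group("device_name"))      # /dev/nvme0
    namespace = _namespace_pattern.match(row)
    if namespace:
        print("namespace:", namespace.group("namespace"))  # /dev/nvme0n1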