Best Python code snippet using lisa_python
schema.py
Source:schema.py
...318 )319 return result320 def _get_key(self) -> str:321 return self.type322 def _call_requirement_method(self, method_name: str, capability: Any) -> Any:323 assert isinstance(capability, FeatureSettings), f"actual: {type(capability)}"324 # default FeatureSetting is a place holder, nothing to do.325 value = FeatureSettings.create(self.type)326 # try best to intersect the extended schemas327 if method_name == search_space.RequirementMethod.intersect:328 if self.extended_schemas and capability and capability.extended_schemas:329 value.extended_schemas = deep_update_dict(330 self.extended_schemas,331 capability.extended_schemas,332 )333 else:334 value.extended_schemas = (335 capability.extended_schemas336 if capability and capability.extended_schemas337 else self.extended_schemas338 )339 return value340class DiskType(str, Enum):341 PremiumSSDLRS = "PremiumSSDLRS"342 Ephemeral = "Ephemeral"343 StandardHDDLRS = "StandardHDDLRS"344 StandardSSDLRS = "StandardSSDLRS"345# disk types are ordered by commonly and cost. 
The earlier is lower cost.346disk_type_priority: List[DiskType] = [347 DiskType.StandardHDDLRS,348 DiskType.StandardSSDLRS,349 DiskType.Ephemeral,350 DiskType.PremiumSSDLRS,351]352@dataclass_json()353@dataclass()354class DiskOptionSettings(FeatureSettings):355 type: str = constants.FEATURE_DISK356 disk_type: Optional[357 Union[search_space.SetSpace[DiskType], DiskType]358 ] = field( # type:ignore359 default_factory=partial(360 search_space.SetSpace,361 items=[362 DiskType.StandardHDDLRS,363 DiskType.StandardSSDLRS,364 DiskType.Ephemeral,365 DiskType.PremiumSSDLRS,366 ],367 ),368 metadata=field_metadata(369 decoder=partial(search_space.decode_set_space_by_type, base_type=DiskType)370 ),371 )372 data_disk_count: search_space.CountSpace = field(373 default_factory=partial(search_space.IntRange, min=0),374 metadata=field_metadata(decoder=search_space.decode_count_space),375 )376 data_disk_caching_type: str = field(377 default=constants.DATADISK_CACHING_TYPE_NONE,378 metadata=field_metadata(379 validate=validate.OneOf(380 [381 constants.DATADISK_CACHING_TYPE_NONE,382 constants.DATADISK_CACHING_TYPE_READONLY,383 constants.DATADISK_CACHING_TYPE_READYWRITE,384 ]385 ),386 ),387 )388 data_disk_iops: search_space.CountSpace = field(389 default_factory=partial(search_space.IntRange, min=0),390 metadata=field_metadata(391 allow_none=True, decoder=search_space.decode_count_space392 ),393 )394 data_disk_size: search_space.CountSpace = field(395 default_factory=partial(search_space.IntRange, min=0),396 metadata=field_metadata(397 allow_none=True, decoder=search_space.decode_count_space398 ),399 )400 max_data_disk_count: search_space.CountSpace = field(401 default=None,402 metadata=field_metadata(403 allow_none=True, decoder=search_space.decode_count_space404 ),405 )406 def __eq__(self, o: object) -> bool:407 assert isinstance(o, DiskOptionSettings), f"actual: {type(o)}"408 return (409 self.type == o.type410 and self.disk_type == o.disk_type411 and self.data_disk_count == 
o.data_disk_count412 and self.data_disk_caching_type == o.data_disk_caching_type413 and self.data_disk_iops == o.data_disk_iops414 and self.data_disk_size == o.data_disk_size415 and self.max_data_disk_count == o.max_data_disk_count416 )417 def __repr__(self) -> str:418 return (419 f"disk_type: {self.disk_type},"420 f"count: {self.data_disk_count},"421 f"caching: {self.data_disk_caching_type},"422 f"iops: {self.data_disk_iops},"423 f"size: {self.data_disk_size},"424 f"max_data_disk_count: {self.max_data_disk_count}"425 )426 def __str__(self) -> str:427 return self.__repr__()428 def __hash__(self) -> int:429 return super().__hash__()430 def check(self, capability: Any) -> search_space.ResultReason:431 result = super().check(capability)432 result.merge(433 search_space.check_countspace(434 self.data_disk_count, capability.data_disk_count435 ),436 "data_disk_count",437 )438 result.merge(439 search_space.check_countspace(440 self.max_data_disk_count, capability.max_data_disk_count441 ),442 "max_data_disk_count",443 )444 result.merge(445 search_space.check_countspace(446 self.data_disk_iops, capability.data_disk_iops447 ),448 "data_disk_iops",449 )450 return result451 def _get_key(self) -> str:452 return (453 f"{super()._get_key()}/{self.disk_type}/"454 f"{self.data_disk_count}/{self.data_disk_caching_type}/"455 f"{self.data_disk_iops}/{self.data_disk_size}"456 )457 def _call_requirement_method(self, method_name: str, capability: Any) -> Any:458 assert isinstance(capability, DiskOptionSettings), f"actual: {type(capability)}"459 parent_value = super()._call_requirement_method(method_name, capability)460 # convert parent type to child type461 value = DiskOptionSettings()462 value.extended_schemas = parent_value.extended_schemas463 search_space_countspace_method = getattr(464 search_space, f"{method_name}_countspace"465 )466 if self.disk_type or capability.disk_type:467 value.disk_type = getattr(468 search_space, f"{method_name}_setspace_by_priority"469 )(self.disk_type, 
capability.disk_type, disk_type_priority)470 if self.data_disk_count or capability.data_disk_count:471 value.data_disk_count = search_space_countspace_method(472 self.data_disk_count, capability.data_disk_count473 )474 if self.data_disk_iops or capability.data_disk_iops:475 value.data_disk_iops = search_space_countspace_method(476 self.data_disk_iops, capability.data_disk_iops477 )478 if self.data_disk_size or capability.data_disk_size:479 value.data_disk_size = search_space_countspace_method(480 self.data_disk_size, capability.data_disk_size481 )482 if self.data_disk_caching_type or capability.data_disk_caching_type:483 value.data_disk_caching_type = (484 self.data_disk_caching_type or capability.data_disk_caching_type485 )486 if self.max_data_disk_count or capability.max_data_disk_count:487 value.max_data_disk_count = search_space_countspace_method(488 self.max_data_disk_count, capability.max_data_disk_count489 )490 return value491class NetworkDataPath(str, Enum):492 Synthetic = "Synthetic"493 Sriov = "Sriov"494_network_data_path_priority: List[NetworkDataPath] = [495 NetworkDataPath.Sriov,496 NetworkDataPath.Synthetic,497]498@dataclass_json()499@dataclass()500class NetworkInterfaceOptionSettings(FeatureSettings):501 type: str = "NetworkInterface"502 data_path: Optional[503 Union[search_space.SetSpace[NetworkDataPath], NetworkDataPath]504 ] = field( # type: ignore505 default_factory=partial(506 search_space.SetSpace,507 items=[508 NetworkDataPath.Synthetic,509 NetworkDataPath.Sriov,510 ],511 ),512 metadata=field_metadata(513 decoder=partial(514 search_space.decode_set_space_by_type, base_type=NetworkDataPath515 )516 ),517 )518 # nic_count is used for specifying associated nic count during provisioning vm519 nic_count: search_space.CountSpace = field(520 default_factory=partial(search_space.IntRange, min=1),521 metadata=field_metadata(decoder=search_space.decode_count_space),522 )523 # max_nic_count is used for getting the size max nic capability, it can be used 
to524 # check how many nics the vm can be associated after provisioning525 max_nic_count: search_space.CountSpace = field(526 default_factory=partial(search_space.IntRange, min=1),527 metadata=field_metadata(528 allow_none=True, decoder=search_space.decode_count_space529 ),530 )531 def __eq__(self, o: object) -> bool:532 assert isinstance(o, NetworkInterfaceOptionSettings), f"actual: {type(o)}"533 return (534 self.type == o.type535 and self.data_path == o.data_path536 and self.nic_count == o.nic_count537 and self.max_nic_count == o.max_nic_count538 )539 def __repr__(self) -> str:540 return (541 f"data_path:{self.data_path}, nic_count:{self.nic_count},"542 f" max_nic_count:{self.max_nic_count}"543 )544 def __str__(self) -> str:545 return self.__repr__()546 def __hash__(self) -> int:547 return super().__hash__()548 def _get_key(self) -> str:549 return (550 f"{super()._get_key()}/{self.data_path}/{self.nic_count}"551 f"/{self.max_nic_count}"552 )553 def check(self, capability: Any) -> search_space.ResultReason:554 assert isinstance(555 capability, NetworkInterfaceOptionSettings556 ), f"actual: {type(capability)}"557 result = super().check(capability)558 result.merge(559 search_space.check_countspace(self.nic_count, capability.nic_count),560 "nic_count",561 )562 result.merge(563 search_space.check_setspace(self.data_path, capability.data_path),564 "data_path",565 )566 result.merge(567 search_space.check_countspace(self.max_nic_count, capability.max_nic_count),568 "max_nic_count",569 )570 return result571 def _call_requirement_method(self, method_name: str, capability: Any) -> Any:572 assert isinstance(573 capability, NetworkInterfaceOptionSettings574 ), f"actual: {type(capability)}"575 parent_value = super()._call_requirement_method(method_name, capability)576 # convert parent type to child type577 value = NetworkInterfaceOptionSettings()578 value.extended_schemas = parent_value.extended_schemas579 value.max_nic_count = getattr(search_space, 
f"{method_name}_countspace")(580 self.max_nic_count, capability.max_nic_count581 )582 if self.nic_count or capability.nic_count:583 value.nic_count = getattr(search_space, f"{method_name}_countspace")(584 self.nic_count, capability.nic_count585 )586 else:587 raise LisaException("nic_count cannot be zero")588 value.data_path = getattr(search_space, f"{method_name}_setspace_by_priority")(589 self.data_path, capability.data_path, _network_data_path_priority590 )591 return value592@dataclass_json()593@dataclass()594class FeaturesSpace(595 search_space.SetSpace[Union[str, FeatureSettings]],596):597 def __post_init__(self, *args: Any, **kwargs: Any) -> None:598 if self.items:599 for index, item in enumerate(self.items):600 if isinstance(item, dict):601 item = load_by_type(FeatureSettings, item)602 self.items[index] = item603@dataclass_json()604@dataclass()605class NodeSpace(search_space.RequirementMixin, TypedSchema, ExtendableSchemaMixin):606 type: str = field(607 default=constants.ENVIRONMENTS_NODES_REQUIREMENT,608 metadata=field_metadata(609 required=True,610 validate=validate.OneOf([constants.ENVIRONMENTS_NODES_REQUIREMENT]),611 ),612 )613 name: str = ""614 is_default: bool = field(default=False)615 node_count: search_space.CountSpace = field(616 default=search_space.IntRange(min=1),617 metadata=field_metadata(decoder=search_space.decode_count_space),618 )619 core_count: search_space.CountSpace = field(620 default=search_space.IntRange(min=1),621 metadata=field_metadata(decoder=search_space.decode_count_space),622 )623 memory_mb: search_space.CountSpace = field(624 default=search_space.IntRange(min=512),625 metadata=field_metadata(decoder=search_space.decode_count_space),626 )627 disk: Optional[DiskOptionSettings] = None628 network_interface: Optional[NetworkInterfaceOptionSettings] = None629 gpu_count: search_space.CountSpace = field(630 default=search_space.IntRange(min=0),631 metadata=field_metadata(decoder=search_space.decode_count_space),632 )633 # all features 
on requirement should be included.634 # all features on capability can be included.635 _features: Optional[FeaturesSpace] = field(636 default=None,637 metadata=field_metadata(allow_none=True, data_key="features"),638 )639 # set by requirements640 # capability's is ignored641 _excluded_features: Optional[FeaturesSpace] = field(642 default=None,643 metadata=field_metadata(644 allow_none=True,645 data_key="excluded_features",646 ),647 )648 def __post_init__(self, *args: Any, **kwargs: Any) -> None:649 # clarify types to avoid type errors in properties.650 self._features: Optional[search_space.SetSpace[FeatureSettings]]651 self._excluded_features: Optional[search_space.SetSpace[FeatureSettings]]652 def __eq__(self, o: object) -> bool:653 assert isinstance(o, NodeSpace), f"actual: {type(o)}"654 return (655 self.type == o.type656 and self.node_count == o.node_count657 and self.core_count == o.core_count658 and self.memory_mb == o.memory_mb659 and self.disk == o.disk660 and self.network_interface == o.network_interface661 and self.gpu_count == o.gpu_count662 and self.features == o.features663 and self.excluded_features == o.excluded_features664 )665 def __repr__(self) -> str:666 """667 override it for shorter text668 """669 return (670 f"type:{self.type},name:{self.name},"671 f"default:{self.is_default},"672 f"count:{self.node_count},core:{self.core_count},"673 f"mem:{self.memory_mb},disk:{self.disk},"674 f"network interface: {self.network_interface}, gpu:{self.gpu_count},"675 f"f:{self.features},ef:{self.excluded_features},"676 f"{super().__repr__()}"677 )678 @property679 def cost(self) -> float:680 core_count = search_space.generate_min_capability_countspace(681 self.core_count, self.core_count682 )683 gpu_count = search_space.generate_min_capability_countspace(684 self.gpu_count, self.gpu_count685 )686 return core_count + gpu_count * 100687 @property688 def features(self) -> Optional[search_space.SetSpace[FeatureSettings]]:689 self._features = 
self._create_feature_settings_list(self._features)690 if self._features is not None:691 self._features.is_allow_set = True692 return cast(Optional[search_space.SetSpace[FeatureSettings]], self._features)693 @features.setter694 def features(self, value: Optional[search_space.SetSpace[FeatureSettings]]) -> None:695 self._features = cast(FeaturesSpace, value)696 @property697 def excluded_features(self) -> Optional[search_space.SetSpace[FeatureSettings]]:698 if not self._excluded_features:699 self._excluded_features = self._create_feature_settings_list(700 self._excluded_features701 )702 if self._excluded_features is not None:703 self._excluded_features.is_allow_set = False704 return cast(705 Optional[search_space.SetSpace[FeatureSettings]], self._excluded_features706 )707 @excluded_features.setter708 def excluded_features(709 self, value: Optional[search_space.SetSpace[FeatureSettings]]710 ) -> None:711 self._excluded_features = cast(FeaturesSpace, value)712 def check(self, capability: Any) -> search_space.ResultReason:713 result = search_space.ResultReason()714 if capability is None:715 result.add_reason("capability shouldn't be None")716 if self.features:717 assert self.features.is_allow_set, "features should be allow set"718 if self.excluded_features:719 assert (720 not self.excluded_features.is_allow_set721 ), "excluded_features shouldn't be allow set"722 assert isinstance(capability, NodeSpace), f"actual: {type(capability)}"723 if (724 not capability.node_count725 or not capability.core_count726 or not capability.memory_mb727 ):728 result.add_reason(729 "node_count, core_count, memory_mb " "shouldn't be None or zero."730 )731 if isinstance(self.node_count, int) and isinstance(capability.node_count, int):732 if self.node_count > capability.node_count:733 result.add_reason(734 f"capability node count {capability.node_count} "735 f"must be more than requirement {self.node_count}"736 )737 else:738 result.merge(739 search_space.check_countspace(self.node_count, 
capability.node_count),740 "node_count",741 )742 result.merge(743 search_space.check_countspace(self.core_count, capability.core_count),744 "core_count",745 )746 result.merge(747 search_space.check_countspace(self.memory_mb, capability.memory_mb),748 "memory_mb",749 )750 if self.disk:751 result.merge(self.disk.check(capability.disk))752 if self.network_interface:753 result.merge(self.network_interface.check(capability.network_interface))754 result.merge(755 search_space.check_countspace(self.gpu_count, capability.gpu_count),756 "gpu_count",757 )758 if self.features:759 for feature in self.features:760 cap_feature = self._find_feature_by_type(761 feature.type, capability.features762 )763 if cap_feature:764 result.merge(feature.check(cap_feature))765 else:766 result.add_reason(767 f"no feature '{feature.type}' found in capability"768 )769 if self.excluded_features:770 for feature in self.excluded_features:771 cap_feature = self._find_feature_by_type(772 feature.type, capability.features773 )774 if cap_feature:775 result.add_reason(776 f"excluded feature '{feature.type}' found in capability"777 )778 return result779 def expand_by_node_count(self) -> List[Any]:780 # expand node count in requirement to one,781 # so that's easy to compare equalization later.782 expanded_requirements: List[NodeSpace] = []783 node_count = search_space.generate_min_capability_countspace(784 self.node_count, self.node_count785 )786 for _ in range(node_count):787 expanded_copy = copy.copy(self)788 expanded_copy.node_count = 1789 expanded_requirements.append(expanded_copy)790 return expanded_requirements791 def has_feature(self, find_type: str) -> bool:792 result = False793 if not self.features:794 return result795 return any(feature for feature in self.features if feature.type == find_type)796 def _call_requirement_method(self, method_name: str, capability: Any) -> Any:797 assert isinstance(capability, NodeSpace), f"actual: {type(capability)}"798 # copy to duplicate extended schema799 value: 
NodeSpace = copy.deepcopy(self)800 if self.node_count or capability.node_count:801 if isinstance(self.node_count, int) and isinstance(802 capability.node_count, int803 ):804 # capability can have more node805 value.node_count = capability.node_count806 else:807 value.node_count = getattr(search_space, f"{method_name}_countspace")(808 self.node_count, capability.node_count809 )810 else:...
search_space.py
Source:search_space.py
...48 return self._generate_min_capability(capability)49 def intersect(self, capability: Any) -> Any:50 self._validate_result(capability)51 return self._intersect(capability)52 def _call_requirement_method(self, method_name: str, capability: Any) -> Any:53 raise NotImplementedError(method_name)54 def _generate_min_capability(self, capability: Any) -> Any:55 return self._call_requirement_method(56 method_name=RequirementMethod.generate_min_capability,57 capability=capability,58 )59 def _intersect(self, capability: Any) -> Any:60 return self._call_requirement_method(61 method_name=RequirementMethod.intersect, capability=capability62 )63 def _validate_result(self, capability: Any) -> None:64 check_result = self.check(capability)65 if not check_result.result:66 raise NotMeetRequirementException(67 f"capability doesn't support requirement: {check_result.reasons}"68 )69T_SEARCH_SPACE = TypeVar("T_SEARCH_SPACE", bound=RequirementMixin)70@dataclass_json()71@dataclass72class IntRange(RequirementMixin):73 min: int = 074 max: int = field(default=sys.maxsize)75 max_inclusive: bool = True76 def __post_init__(self, *args: Any, **kwargs: Any) -> None:77 if self.min > self.max:78 raise LisaException(79 f"min: {self.min} shouldn't be greater than max: {self.max}"80 )81 elif self.min == self.max and self.max_inclusive is False:82 raise LisaException(83 "min shouldn't be equal to max, if max_includes is False."84 )85 def __repr__(self) -> str:86 max_value = self.max if self.max < sys.maxsize else ""87 max_inclusive = ""88 if max_value:89 max_inclusive = "(inc)" if self.max_inclusive else "(exc)"90 return f"[{self.min},{max_value}{max_inclusive}]"91 def __eq__(self, __o: object) -> bool:92 assert isinstance(__o, IntRange), f"actual type: {type(__o)}"93 return (94 self.min == __o.min95 and self.max == __o.max96 and self.max_inclusive == __o.max_inclusive97 )98 def check(self, capability: Any) -> ResultReason:99 result = ResultReason()100 if capability is None:101 
result.add_reason("capability shouldn't be None")102 else:103 if isinstance(capability, IntRange):104 if capability.max < self.min:105 result.add_reason(106 f"capability max({capability.max}) is "107 f"smaller than requirement min({self.min})"108 )109 elif capability.max == self.min and not capability.max_inclusive:110 result.add_reason(111 f"capability max({capability.max}) equals "112 f"to requirement min({self.min}), but "113 f"capability is not max_inclusive"114 )115 elif capability.min > self.max:116 result.add_reason(117 f"capability min({capability.min}) is "118 f"bigger than requirement max({self.max})"119 )120 elif capability.min == self.max and not self.max_inclusive:121 result.add_reason(122 f"capability min({capability.min}) equals "123 f"to requirement max({self.max}), but "124 f"requirement is not max_inclusive"125 )126 elif isinstance(capability, int):127 if capability < self.min:128 result.add_reason(129 f"capability({capability}) is "130 f"smaller than requirement min({self.min})"131 )132 elif capability > self.max:133 result.add_reason(134 f"capability ({capability}) is "135 f"bigger than requirement max({self.max})"136 )137 elif capability == self.max and not self.max_inclusive:138 result.add_reason(139 f"capability({capability}) equals "140 f"to requirement max({self.max}), but "141 f"requirement is not max_inclusive"142 )143 else:144 assert isinstance(capability, list), f"actual: {type(capability)}"145 temp_result = _one_of_matched(self, capability)146 if not temp_result.result:147 result.add_reason(148 "no capability matches requirement, "149 f"requirement: {self}, capability: {capability}"150 )151 return result152 def _generate_min_capability(self, capability: Any) -> int:153 if isinstance(capability, int):154 result: int = capability155 elif isinstance(capability, IntRange):156 if self.min < capability.min:157 result = capability.min158 else:159 result = self.min160 else:161 assert isinstance(capability, list), f"actual: 
{type(capability)}"162 result = self.max if self.max_inclusive else self.max - 1163 for cap_item in capability:164 temp_result = self.check(cap_item)165 if temp_result.result:166 temp_min = self.generate_min_capability(cap_item)167 result = min(temp_min, result)168 return result169 def _intersect(self, capability: Any) -> Any:170 if isinstance(capability, int):171 return capability172 elif isinstance(capability, IntRange):173 result = IntRange(174 min=self.min, max=self.max, max_inclusive=self.max_inclusive175 )176 if self.min < capability.min:177 result.min = capability.min178 if self.max > capability.max:179 result.max = capability.max180 result.max_inclusive = capability.max_inclusive181 elif self.max == capability.max:182 result.max_inclusive = capability.max_inclusive and self.max_inclusive183 else:184 raise NotImplementedError(185 f"IntRange doesn't support other intersect on {type(capability)}."186 )187 return result188CountSpace = Union[int, List[IntRange], IntRange, None]189def decode_count_space(data: Any) -> Any:190 """191 CountSpace is complex to marshmallow, so it needs customized decode.192 Anyway, marshmallow can encode it correctly.193 """194 decoded_data: CountSpace = None195 if data is None or isinstance(data, int) or isinstance(data, IntRange):196 decoded_data = data197 elif isinstance(data, list):198 decoded_data = []199 for item in data:200 if isinstance(item, dict):201 decoded_data.append(IntRange.schema().load(item)) # type: ignore202 else:203 assert isinstance(item, IntRange), f"actual: {type(item)}"204 decoded_data.append(item)205 else:206 assert isinstance(data, dict), f"actual: {type(data)}"207 decoded_data = IntRange.schema().load(data) # type: ignore208 return decoded_data209def _one_of_matched(requirement: Any, capabilities: List[Any]) -> ResultReason:210 result = ResultReason()211 supported = False212 assert isinstance(requirement, RequirementMixin), f"actual: {type(requirement)}"213 for cap_item in capabilities:214 temp_result = 
requirement.check(cap_item)215 if temp_result.result:216 supported = True217 break218 if not supported:219 result.add_reason("no one meeting requirement")220 return result221@dataclass_json()222@dataclass223class SetSpace(RequirementMixin, Set[T]):224 is_allow_set: bool = False225 items: List[T] = field(default_factory=list)226 def __init__(227 self,228 is_allow_set: Optional[bool] = None,229 items: Optional[Iterable[T]] = None,230 ) -> None:231 self.items: List[T] = []232 if items:233 self.update(items)234 if is_allow_set is not None:235 self.is_allow_set = is_allow_set236 def __repr__(self) -> str:237 return (238 f"allowed:{self.is_allow_set},"239 f"items:[{','.join([str(x) for x in self])}]"240 )241 def __post_init__(self, *args: Any, **kwargs: Any) -> None:242 self.update(self.items)243 def check(self, capability: Any) -> ResultReason:244 result = ResultReason()245 if self.is_allow_set and len(self) > 0 and not capability:246 result.add_reason(247 "if requirements is allow set and len > 0, capability shouldn't be None"248 )249 assert isinstance(capability, SetSpace), f"actual: {type(capability)}"250 assert capability.is_allow_set, "capability must be allow set"251 # if self.options is not None:252 # cap_set = capability.options253 if result.result:254 if self.is_allow_set:255 if not capability.issuperset(self):256 result.add_reason(257 "capability cannot support some of requirements, "258 f"requirement: '{self}'"259 f"capability: '{capability}', "260 )261 else:262 inter_set: Set[Any] = self.intersection(capability)263 if len(inter_set) > 0:264 names: List[str] = []265 for item in inter_set:266 if isinstance(item, type):267 names.append(item.__name__)268 elif isinstance(item, object):269 names.append(item.__class__.__name__)270 else:271 names.append(item)272 result.add_reason(f"requirements excludes {names}")273 return result274 def add(self, element: T) -> None:275 super().add(element)276 self.items.append(element)277 def remove(self, element: T) -> None:278 
super().remove(element)279 self.items.remove(element)280 def update(self, *s: Iterable[T]) -> None:281 super().update(*s)282 self.items.extend(*s)283 def _generate_min_capability(self, capability: Any) -> Optional[Set[T]]:284 result: Optional[SetSpace[T]] = None285 if self.is_allow_set and len(self) > 0:286 assert isinstance(capability, SetSpace), f"actual: {type(capability)}"287 result = SetSpace(is_allow_set=self.is_allow_set)288 if len(capability) > 0:289 for item in self:290 if item in capability:291 result.add(item)292 return result293 def _intersect(self, capability: Any) -> Any:294 return self._generate_min_capability(capability)295def decode_set_space(data: Any) -> Any:296 """297 not sure what's reason, __post_init__ won't be called automatically.298 So write this decoder to force it's called on deserializing299 """300 result = None301 if data:302 result = SetSpace.schema().load(data) # type: ignore303 return result304def decode_set_space_by_type(305 data: Any, base_type: Type[T]306) -> Optional[Union[SetSpace[T], T]]:307 if isinstance(data, dict):308 new_data = SetSpace[T](is_allow_set=True)309 types = data.get("items", [])310 for item in types:311 new_data.add(base_type(item)) # type: ignore312 decoded_data: Optional[Union[SetSpace[T], T]] = new_data313 elif isinstance(data, list):314 new_data = SetSpace[T](is_allow_set=True)315 for item in data:316 new_data.add(base_type(item)) # type: ignore317 decoded_data = new_data318 elif isinstance(data, str):319 decoded_data = base_type(data) # type: ignore320 elif isinstance(data, SetSpace):321 decoded_data = data322 else:323 raise LisaException(f"unknown data type: {type(data)}")324 return decoded_data325def check_countspace(requirement: CountSpace, capability: CountSpace) -> ResultReason:326 result = ResultReason()327 if requirement is not None:328 if capability is None:329 result.add_reason(330 "if requirements isn't None, capability shouldn't be None"331 )332 else:333 if isinstance(requirement, int):334 if 
isinstance(capability, int):335 if requirement != capability:336 result.add_reason(337 "requirement is a number, capability should be exact "338 f"much, but requirement: {requirement}, "339 f"capability: {capability}"340 )341 elif isinstance(capability, IntRange):342 temp_result = capability.check(requirement)343 if not temp_result.result:344 result.add_reason(345 "requirement is a number, capability should include it, "346 f"but requirement: {requirement}, capability: {capability}"347 )348 else:349 assert isinstance(capability, list), f"actual: {type(capability)}"350 temp_requirement = IntRange(min=requirement, max=requirement)351 temp_result = _one_of_matched(temp_requirement, capability)352 if not temp_result.result:353 result.add_reason(354 f"requirement is a number, no capability matched, "355 f"requirement: {requirement}, capability: {capability}"356 )357 elif isinstance(requirement, IntRange):358 result.merge(requirement.check(capability))359 else:360 assert isinstance(requirement, list), f"actual: {type(requirement)}"361 supported = False362 for req_item in requirement:363 temp_result = req_item.check(capability)364 if temp_result.result:365 supported = True366 if not supported:367 result.add_reason(368 "no capability matches requirement, "369 f"requirement: {requirement}, capability: {capability}"370 )371 return result372def generate_min_capability_countspace(373 requirement: CountSpace, capability: CountSpace374) -> int:375 check_result = check_countspace(requirement, capability)376 if not check_result.result:377 raise NotMeetRequirementException(378 "cannot get min value, capability doesn't support requirement: "379 f"{check_result.reasons}"380 )381 if requirement is None:382 if capability:383 requirement = capability384 result: int = sys.maxsize385 else:386 result = 0387 if isinstance(requirement, int):388 result = requirement389 elif isinstance(requirement, IntRange):390 result = requirement.generate_min_capability(capability)391 else:392 assert 
isinstance(requirement, list), f"actual: {type(requirement)}"393 result = sys.maxsize394 for req_item in requirement:395 temp_result = req_item.check(capability)396 if temp_result.result:397 temp_min = req_item.generate_min_capability(capability)398 result = min(result, temp_min)399 return result400def intersect_countspace(requirement: CountSpace, capability: CountSpace) -> Any:401 check_result = check_countspace(requirement, capability)402 if not check_result.result:403 raise NotMeetRequirementException(404 "cannot get intersect, capability doesn't support requirement: "405 f"{check_result.reasons}"406 )407 if requirement is None and capability:408 return copy.copy(capability)409 if isinstance(requirement, int):410 result = requirement411 elif isinstance(requirement, IntRange):412 result = requirement.intersect(capability)413 else:414 raise LisaException(415 f"not support to get intersect on countspace type: {type(requirement)}"416 )417 return result418def check_setspace(419 requirement: Optional[Union[SetSpace[T], T]],420 capability: Optional[Union[SetSpace[T], T]],421) -> ResultReason:422 result = ResultReason()423 if capability is None:424 result.add_reason("capability shouldn't be None")425 else:426 if requirement is not None:427 has_met_check = False428 if not isinstance(capability, SetSpace):429 capability = SetSpace[T](items=[capability])430 if not isinstance(requirement, SetSpace):431 requirement = SetSpace[T](items=[requirement])432 for item in requirement:433 if item in capability:434 has_met_check = True435 break436 if not has_met_check:437 result.add_reason(438 f"requirement not supported in capability. 
"439 f"requirement: {requirement}, "440 f"capability: {capability}"441 )442 return result443def generate_min_capability_setspace_by_priority(444 requirement: Optional[Union[SetSpace[T], T]],445 capability: Optional[Union[SetSpace[T], T]],446 priority_list: List[T],447) -> T:448 check_result = check_setspace(requirement, capability)449 if not check_result.result:450 raise NotMeetRequirementException(451 "cannot get min value, capability doesn't support requirement"452 f"{check_result.reasons}"453 )454 assert capability is not None, "Capability shouldn't be None"455 # Ensure that both cap and req are instance of SetSpace456 if not isinstance(capability, SetSpace):457 capability = SetSpace[T](items=[capability])458 if requirement is None:459 requirement = capability460 if not isinstance(requirement, SetSpace):461 requirement = SetSpace[T](items=[requirement])462 # Find min capability463 min_cap: Optional[T] = None464 for item in priority_list:465 if item in requirement and item in capability:466 min_cap = item467 break468 assert min_cap, (469 "Cannot find min capability on data path, "470 f"requirement: {requirement}"471 f"capability: {capability}"472 )473 return min_cap474def intersect_setspace_by_priority(475 requirement: Optional[Union[SetSpace[T], T]],476 capability: Optional[Union[SetSpace[T], T]],477 priority_list: List[T],478) -> Any:479 # intersect doesn't need to take care about priority.480 check_result = check_setspace(requirement, capability)481 if not check_result.result:482 raise NotMeetRequirementException(483 f"capability doesn't support requirement: {check_result.reasons}"484 )485 assert capability is not None, "Capability shouldn't be None"486 value = SetSpace[T]()487 # Ensure that both cap and req are instance of SetSpace488 if not isinstance(capability, SetSpace):489 capability = SetSpace[T](items=[capability])490 if requirement is None:491 requirement = capability492 if not isinstance(requirement, SetSpace):493 requirement = 
SetSpace[T](items=[requirement])494 # Find min capability495 for item in requirement:496 if item in capability:497 value.add(item)498 return value499def count_space_to_int_range(count_space: CountSpace) -> IntRange:500 if count_space is None:501 result = IntRange(min=sys.maxsize * -1, max=sys.maxsize)502 elif isinstance(count_space, int):503 result = IntRange(min=count_space, max=count_space)504 elif isinstance(count_space, IntRange):505 result = count_space506 else:507 raise LisaException(508 f"unsupported type: {type(count_space)}, value: '{count_space}'"509 )510 return result511def check(512 requirement: Union[T_SEARCH_SPACE, List[T_SEARCH_SPACE], None],513 capability: Union[T_SEARCH_SPACE, List[T_SEARCH_SPACE], None],514) -> ResultReason:515 result = ResultReason()516 if requirement is not None:517 if capability is None:518 result.add_reason(519 f"capability shouldn't be None, requirement: [{requirement}]"520 )521 elif isinstance(requirement, (list)):522 supported = False523 for req_item in requirement:524 temp_result = req_item.check(capability)525 if temp_result.result:526 supported = True527 if not supported:528 result.add_reason(529 "no capability meet any of requirement, "530 f"requirement: {requirement}, capability: {capability}"531 )532 else:533 result.merge(requirement.check(capability))534 return result535def _call_requirement_method(536 method: str,537 requirement: Union[T_SEARCH_SPACE, List[T_SEARCH_SPACE], None],538 capability: Union[T_SEARCH_SPACE, List[T_SEARCH_SPACE], None],539) -> Any:540 check_result = check(requirement, capability)541 if not check_result.result:542 raise NotMeetRequirementException(543 f"cannot call {method}, capability doesn't support requirement"544 )545 result: Optional[T_SEARCH_SPACE] = None546 if requirement is None:547 if capability is not None:548 requirement = capability549 if (550 isinstance(requirement, list)551 and method == RequirementMethod.generate_min_capability552 ):553 result = None554 for req_item in 
requirement:555 temp_result = req_item.check(capability)556 if temp_result.result:557 temp_min = getattr(req_item, method)(capability)558 if result is None:559 result = temp_min560 else:561 # TODO: multiple matches found, not supported well yet562 # It can be improved by implement __eq__, __lt__ functions.563 result = min(result, temp_min)564 elif requirement is not None:565 result = getattr(requirement, method)(capability)566 return result567def generate_min_capability(568 requirement: Union[T_SEARCH_SPACE, List[T_SEARCH_SPACE], None],569 capability: Union[T_SEARCH_SPACE, List[T_SEARCH_SPACE], None],570) -> Any:571 return _call_requirement_method(572 RequirementMethod.generate_min_capability,573 requirement=requirement,574 capability=capability,575 )576def intersect(577 requirement: Union[T_SEARCH_SPACE, List[T_SEARCH_SPACE], None],578 capability: Union[T_SEARCH_SPACE, List[T_SEARCH_SPACE], None],579) -> Any:580 return _call_requirement_method(581 RequirementMethod.intersect, requirement=requirement, capability=capability582 )583def equal_list(first: Optional[List[Any]], second: Optional[List[Any]]) -> bool:584 if first is None or second is None:585 result = first is second586 else:587 result = len(first) == len(second)588 result = result and all(589 f_item == second[index] for index, f_item in enumerate(first)590 )591 return result592def create_set_space(593 included_set: Optional[Iterable[T]],594 excluded_set: Optional[Iterable[T]],...
security_profile.py
Source:security_profile.py
...53 def __hash__(self) -> int:54 return hash(self._get_key())55 def _get_key(self) -> str:56 return f"{self.type}/{self.security_profile}"57 def _call_requirement_method(self, method_name: str, capability: Any) -> Any:58 value = SecurityProfileSettings()59 value.security_profile = getattr(60 search_space, f"{method_name}_setspace_by_priority"61 )(62 self.security_profile,63 capability.security_profile,64 security_profile_priority,65 )66 value.encrypt_disk = self.encrypt_disk or capability.encrypt_disk67 return value68 def check(self, capability: Any) -> search_space.ResultReason:69 assert isinstance(70 capability, SecurityProfileSettings71 ), f"actual: {type(capability)}"...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!