Best Python code snippet using lisa_python
environment.py
Source:environment.py
...51 ],52)53_global_environment_id = 054_global_environment_id_lock: Lock = Lock()55def _get_environment_id() -> int:56 """57 Return an unique id crossing threads, runners.58 """59 global _global_environment_id_lock60 global _global_environment_id61 with _global_environment_id_lock:62 id = _global_environment_id63 _global_environment_id += 164 return id65@dataclass66class EnvironmentMessage(MessageBase):67 type: str = "Environment"68 name: str = ""69 runbook: schema.Environment = schema.Environment()70 status: EnvironmentStatus = EnvironmentStatus.New71@dataclass_json()72@dataclass73class EnvironmentSpace(search_space.RequirementMixin):74 """75 Search space of an environment. It uses to76 1. Specify test suite requirement, see TestCaseRequirement77 2. Describe capability of an environment, see Environment.capability78 """79 topology: str = field(80 default=constants.ENVIRONMENTS_SUBNET,81 metadata=field_metadata(82 validate=validate.OneOf([constants.ENVIRONMENTS_SUBNET])83 ),84 )85 nodes: List[schema.NodeSpace] = field(default_factory=list)86 def __post_init__(self, *args: Any, **kwargs: Any) -> None:87 self._expand_node_space()88 def __eq__(self, o: object) -> bool:89 assert isinstance(o, EnvironmentSpace), f"actual: {type(o)}"90 return self.topology == o.topology and search_space.equal_list(91 self.nodes, o.nodes92 )93 def check(self, capability: Any) -> search_space.ResultReason:94 assert isinstance(capability, EnvironmentSpace), f"actual: {type(capability)}"95 result = search_space.ResultReason()96 if not capability.nodes:97 result.add_reason("no node instance found")98 elif len(self.nodes) > len(capability.nodes):99 result.add_reason(100 f"no enough nodes, "101 f"requirement: {len(self.nodes)}, "102 f"capability: {len(capability.nodes)}."103 )104 else:105 if self.nodes:106 for index, current_req in enumerate(self.nodes):107 current_cap = capability.nodes[index]108 result.merge(109 search_space.check(current_req, current_cap),110 str(index),111 )112 if not result.result:113 break114 return result115 def _generate_min_capability(self, capability: Any) -> Any:116 env = EnvironmentSpace(topology=self.topology)117 assert isinstance(capability, EnvironmentSpace), f"actual: {type(capability)}"118 assert capability.nodes119 for index, current_req in enumerate(self.nodes):120 if len(capability.nodes) == 1:121 current_cap = capability.nodes[0]122 else:123 current_cap = capability.nodes[index]124 env.nodes.append(current_req.generate_min_capability(current_cap))125 return env126 def _expand_node_space(self) -> None:127 if self.nodes:128 expanded_requirements: List[schema.NodeSpace] = []129 for node in self.nodes:130 expanded_requirements.extend(node.expand_by_node_count())131 self.nodes = expanded_requirements132class Environment(ContextMixin, InitializableMixin):133 def __init__(134 self,135 is_predefined: bool,136 warn_as_error: bool,137 id_: int,138 runbook: schema.Environment,139 ) -> None:140 super().__init__()141 self.nodes: Nodes142 self.runbook = runbook143 self.name = runbook.name144 self.is_predefined: bool = is_predefined145 self.is_new: bool = True146 self.id: str = str(id_)147 self.warn_as_error = warn_as_error148 self.platform: Optional[Platform] = None149 self.log = get_logger("env", self.name)150 self.source_test_result: Optional[TestResult] = None151 self._default_node: Optional[Node] = None152 self._is_dirty: bool = False153 # track retried times to provide an unique name.154 self._raw_id = id_155 self._retries: int = 0156 # cost uses to plan order of environments.157 # cheaper env can fit cases earlier to run more cases on it.158 # 1. smaller is higher priority, it can be index of candidate environment159 # 2. 0 means no cost.160 self.cost: float = 0161 # indicate is this environment is deploying, preparing, testing or not.162 self.is_in_use: bool = False163 # Not to set the log path until its first used. Because the path164 # contains environment name, which is not set in __init__.165 self._log_path: Optional[Path] = None166 self._working_path: Optional[Path] = None167 # it's used to compose a consistent path for both log and working path.168 self.environment_part_path = Path(169 f"environments/{get_datetime_path()}-{self.name}"170 )171 self._status: Optional[EnvironmentStatus] = None172 self.status = EnvironmentStatus.New173 def __repr__(self) -> str:174 return self.name175 @property176 def status(self) -> EnvironmentStatus:177 assert self._status178 return self._status179 @status.setter180 def status(self, value: EnvironmentStatus) -> None:181 # sometimes there are duplicated messages, ignore if no change.182 if self._status != value:183 if value == EnvironmentStatus.New:184 self._reset()185 self._status = value186 environment_message = EnvironmentMessage(187 name=self.name, status=self._status, runbook=self.runbook188 )189 notifier.notify(environment_message)190 @property191 def is_alive(self) -> bool:192 return self._status in [193 EnvironmentStatus.New,194 EnvironmentStatus.Prepared,195 EnvironmentStatus.Deployed,196 EnvironmentStatus.Connected,197 ]198 @property199 def default_node(self) -> Node:200 return self.nodes.default201 @property202 def log_path(self) -> Path:203 # avoid to create path for UT. There may be path conflict in UT.204 if is_unittest():205 return Path()206 if not self._log_path:207 self._log_path = constants.RUN_LOCAL_LOG_PATH / self.environment_part_path208 self._log_path.mkdir(parents=True, exist_ok=True)209 return self._log_path210 @property211 def working_path(self) -> Path:212 if is_unittest():213 return Path()214 if not self._working_path:215 self._working_path = (216 constants.RUN_LOCAL_WORKING_PATH / self.environment_part_path217 )218 self._working_path.mkdir(parents=True, exist_ok=True)219 return self._working_path220 @property221 def capability(self) -> EnvironmentSpace:222 result = EnvironmentSpace(topology=self.runbook.topology)223 for node in self.nodes.list():224 result.nodes.append(node.capability)225 if (226 self.status in [EnvironmentStatus.Prepared, EnvironmentStatus.New]227 and self.runbook.nodes_requirement228 ):229 result.nodes.extend(self.runbook.nodes_requirement)230 return result231 @property232 def is_dirty(self) -> bool:233 return self._is_dirty or any(x.is_dirty for x in self.nodes.list())234 def cleanup(self) -> None:235 self.nodes.cleanup()236 if hasattr(self, "_log_handler") and self._log_handler:237 remove_handler(self._log_handler, self.log)238 self._log_handler.close()239 def close(self) -> None:240 self.nodes.close()241 def create_node_from_exists(242 self,243 node_runbook: schema.Node,244 ) -> Node:245 node = Node.create(246 index=len(self.nodes),247 runbook=node_runbook,248 base_part_path=self.environment_part_path,249 parent_logger=self.log,250 )251 self.nodes.append(node)252 return node253 def create_node_from_requirement(254 self,255 node_requirement: schema.NodeSpace,256 ) -> Node:257 min_requirement = cast(258 schema.Capability,259 node_requirement.generate_min_capability(node_requirement),260 )261 assert isinstance(min_requirement.node_count, int), (262 f"must be int after generate_min_capability, "263 f"actual: {min_requirement.node_count}"264 )265 # node count should be expanded in platform already266 assert min_requirement.node_count == 1, f"actual: {min_requirement.node_count}"267 mock_runbook = schema.RemoteNode(268 type=constants.ENVIRONMENTS_NODES_REMOTE,269 capability=min_requirement,270 is_default=node_requirement.is_default,271 )272 node = Node.create(273 index=len(self.nodes),274 runbook=mock_runbook,275 base_part_path=self.environment_part_path,276 parent_logger=self.log,277 )278 self.nodes.append(node)279 return node280 def get_information(self) -> Dict[str, str]:281 final_information: Dict[str, str] = {}282 informations: List[283 Dict[str, str]284 ] = plugin_manager.hook.get_environment_information(environment=self)285 # reverse it, since it's FILO order,286 # try basic earlier, and they are allowed to be overwritten287 informations.reverse()288 for current_information in informations:289 final_information.update(current_information)290 return final_information291 def mark_dirty(self) -> None:292 self.log.debug("mark environment to dirty")293 self._is_dirty = True294 def _initialize(self, *args: Any, **kwargs: Any) -> None:295 if self.status != EnvironmentStatus.Deployed:296 raise LisaException("environment is not deployed, cannot be initialized")297 if not hasattr(self, "_log_handler"):298 self._log_handler = create_file_handler(299 self.log_path / "environment.log", self.log300 )301 self.nodes.initialize()302 self.status = EnvironmentStatus.Connected303 def _reset(self) -> None:304 self.nodes = Nodes()305 self.runbook.reload_requirements()306 self.is_new = True307 if not self.runbook.nodes_requirement and not self.runbook.nodes:308 raise LisaException("not found any node or requirement in environment")309 has_default_node = False310 for node_runbook in self.runbook.nodes:311 self.create_node_from_exists(312 node_runbook=node_runbook,313 )314 has_default_node = self._validate_single_default(315 has_default_node, node_runbook.is_default316 )317 if self._retries > 0:318 # provide an unique id.319 self.id = f"{self._raw_id}-{self._retries}"320 self.remove_context()321 self._retries += 1322 def _validate_single_default(323 self, has_default: bool, is_default: Optional[bool]324 ) -> bool:325 if is_default:326 if has_default:327 raise LisaException("only one node can set isDefault to True")328 has_default = True329 return has_default330if TYPE_CHECKING:331 EnvironmentsDict = UserDict[str, Environment]332else:333 EnvironmentsDict = UserDict334class Environments(EnvironmentsDict):335 def __init__(336 self,337 warn_as_error: bool = False,338 ) -> None:339 super().__init__()340 self.warn_as_error = warn_as_error341 def get_or_create(self, requirement: EnvironmentSpace) -> Optional[Environment]:342 result: Optional[Environment] = None343 for environment in self.values():344 # find exact match, or create a new one.345 if requirement == environment.capability:346 result = environment347 break348 else:349 result = self.from_requirement(requirement)350 return result351 def from_requirement(self, requirement: EnvironmentSpace) -> Optional[Environment]:352 runbook = schema.Environment(353 topology=requirement.topology,354 nodes_requirement=requirement.nodes,355 )356 id_ = _get_environment_id()357 return self.from_runbook(358 runbook=runbook,359 name=f"generated_{id_}",360 is_predefined_runbook=False,361 id_=id_,362 )363 def from_runbook(364 self,365 runbook: schema.Environment,366 name: str,367 is_predefined_runbook: bool,368 id_: int,369 ) -> Optional[Environment]:370 assert runbook371 assert name372 env: Optional[Environment] = None373 # make a copy, so that modification on env won't impact test case374 copied_runbook = copy.copy(runbook)375 copied_runbook.name = name376 env = Environment(377 is_predefined=is_predefined_runbook,378 warn_as_error=self.warn_as_error,379 id_=id_,380 runbook=copied_runbook,381 )382 self[name] = env383 log = _get_init_logger()384 log.debug(f"created {env.name}: {env.runbook}")385 return env386def load_environments(387 root_runbook: Optional[schema.EnvironmentRoot],388) -> Environments:389 if root_runbook:390 environments = Environments(391 warn_as_error=root_runbook.warn_as_error,392 )393 environments_runbook = root_runbook.environments394 for environment_runbook in environments_runbook:395 id_ = _get_environment_id()396 env = environments.from_runbook(397 runbook=environment_runbook,398 name=environment_runbook.name or f"customized_{id_}",399 is_predefined_runbook=True,400 id_=id_,401 )402 assert env, "created from runbook shouldn't be None"403 else:404 environments = Environments()405 return environments406class EnvironmentHookSpec:407 @hookspec408 def get_environment_information(self, environment: Environment) -> Dict[str, str]:409 ......
test_registration.py
Source:test_registration.py
...115 self.stub_cp_provider.basic_auth_cp = mock_uep116 rc = RegisterCommand()117 rc.options = Mock()118 rc.options.activation_keys = None119 env_id = rc._get_environment_id(mock_uep, 'owner', None)120 expected = None121 self.assertEqual(expected, env_id)122 def test_get_environment_id_one_available(self):123 def env_list(*args, **kwargs):124 return [{"id": "1234", "name": "somename"}]125 with patch('rhsm.connection.UEPConnection', new_callable=StubUEP) as mock_uep:126 mock_uep.getEnvironmentList = env_list127 mock_uep.supports_resource = Mock(return_value=True)128 self.stub_cp_provider.basic_auth_cp = mock_uep129 rc = RegisterCommand()130 rc.options = Mock()131 rc.options.activation_keys = None132 env_id = rc._get_environment_id(mock_uep, 'owner', None)133 expected = "1234"134 self.assertEqual(expected, env_id)135 def test_get_environment_id_multi_available(self):136 def env_list(*args, **kwargs):137 return [{"id": "1234", "name": "somename"},138 {"id": "5678", "name": "othername"}]139 with patch('rhsm.connection.UEPConnection', new_callable=StubUEP) as mock_uep:140 mock_uep.getEnvironmentList = env_list141 mock_uep.supports_resource = Mock(return_value=True)142 self.stub_cp_provider.basic_auth_cp = mock_uep143 rc = RegisterCommand()144 rc.options = Mock()145 rc.options.activation_keys = None146 rc._prompt_for_environment = Mock(return_value="othername")147 env_id = rc._get_environment_id(mock_uep, 'owner', None)148 expected = "5678"149 self.assertEqual(expected, env_id)150 def test_get_environment_id_multi_available_bad_name(self):151 def env_list(*args, **kwargs):152 return [{"id": "1234", "name": "somename"},153 {"id": "5678", "name": "othername"}]154 with patch('rhsm.connection.UEPConnection', new_callable=StubUEP) as mock_uep:155 mock_uep.getEnvironmentList = env_list156 mock_uep.supports_resource = Mock(return_value=True)157 self.stub_cp_provider.basic_auth_cp = mock_uep158 rc = RegisterCommand()159 rc.options = Mock()160 rc.options.activation_keys = None161 rc._prompt_for_environment = Mock(return_value="not_an_env")162 with Capture(silent=True):163 with self.assertRaises(SystemExit):164 rc._get_environment_id(mock_uep, 'owner', None)165 def test_registration_with_failed_profile_upload(self):166 with patch('rhsm.connection.UEPConnection', new_callable=StubUEP) as mock_uep:167 profile_mgr = inj.require(inj.PROFILE_MANAGER)168 def raise_remote_server_exception(*args, **kwargs):169 """Raise remote server exception (uploading of profile failed)"""170 from rhsm.connection import RemoteServerException171 raise RemoteServerException(172 502,173 request_type="PUT",174 handler="/subscription/consumers/xxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/profiles"175 )176 profile_mgr.update_check = Mock(side_effect=raise_remote_server_exception)177 self.stub_cp_provider.basic_auth_cp = mock_uep178 cmd = RegisterCommand()...
base.py
Source:base.py
...53 self._api = self._th.teachable(self.teachable)54 self._deployment_id = None55 self._paths_to_be_deleted = []56 self._archived_model_path = None57 self._environment_id = self._get_environment_id(environment)58 self._deployment_data = {59 "framework": self.framework,60 "environment": self._environment_id61 }62 version = kwargs.get('version', None)63 if version:64 self._version = int(version)65 self._retrieve_existing_deployment()66 # this is used where we can get the schema from the model67 # like in the ludwig for example68 self._auto_schema = True69 def _get_environment_id(self, environment_name):70 all_envs = self._api.get_all_environments()71 for env in all_envs:72 if env['name'] == environment_name:73 return env['id']74 def _retrieve_existing_deployment(self):75 r = self._api.get_deployment_by_env_id_and_version(self._environment_id, self._version)76 data = r.json()77 if len(data) == 1:78 self._deployment_data = data[0]79 self._deployment_id = data[0]['uuid']80 self._version = data[0]['version']81 return self._deployment_data82 else:83 raise AssertionError("The deployment with this version and environment should be only one.")...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!