Best Python code snippet using lisa_python
node.py
Source:node.py
...371 return constants.ENVIRONMENTS_NODES_REMOTE372 @classmethod373 def type_schema(cls) -> Type[schema.TypedSchema]:374 return schema.RemoteNode375 def set_connection_info_by_runbook(376 self,377 default_username: str = "",378 default_password: str = "",379 default_private_key_file: str = "",380 ) -> None:381 fields = [382 constants.ENVIRONMENTS_NODES_REMOTE_ADDRESS,383 constants.ENVIRONMENTS_NODES_REMOTE_PORT,384 constants.ENVIRONMENTS_NODES_REMOTE_PUBLIC_ADDRESS,385 constants.ENVIRONMENTS_NODES_REMOTE_PUBLIC_PORT,386 ]387 parameters = fields_to_dict(self.runbook, fields)388 # use default credential, if they are not specified389 node_runbook = cast(schema.RemoteNode, self.runbook)390 parameters[constants.ENVIRONMENTS_NODES_REMOTE_USERNAME] = (391 node_runbook.username if node_runbook.username else default_username392 )393 parameters[constants.ENVIRONMENTS_NODES_REMOTE_PASSWORD] = (394 node_runbook.password if node_runbook.password else default_password395 )396 parameters[constants.ENVIRONMENTS_NODES_REMOTE_PRIVATE_KEY_FILE] = (397 node_runbook.private_key_file398 if node_runbook.private_key_file399 else default_private_key_file400 )401 self.set_connection_info(**parameters)402 def set_connection_info(403 self,404 address: str = "",405 port: Optional[int] = 22,406 public_address: str = "",407 public_port: Optional[int] = 22,408 username: str = "root",409 password: str = "",410 private_key_file: str = "",411 ) -> None:412 if not address and not public_address:413 raise LisaException(414 "at least one of address and public_address need to be set"415 )416 elif not address:417 address = public_address418 elif not public_address:419 public_address = address420 if not port and not public_port:421 raise LisaException("at least one of port and public_port need to be set")422 elif not port:423 port = public_port424 elif not public_port:425 public_port = port426 assert public_port427 assert port428 self._connection_info: schema.ConnectionInfo = schema.ConnectionInfo(429 public_address,430 public_port,431 username,432 password,433 private_key_file,434 )435 self._shell = SshShell(self._connection_info)436 self.public_address = public_address437 self.public_port = public_port438 self.internal_address = address439 self.internal_port = port440 def _initialize(self, *args: Any, **kwargs: Any) -> None:441 assert self._connection_info, "call setConnectionInfo before use remote node"442 super()._initialize(*args, **kwargs)443 def get_working_path(self) -> PurePath:444 if self.is_posix:445 remote_root_path = Path("$HOME")446 else:447 remote_root_path = Path("%TEMP%")448 working_path = remote_root_path.joinpath(449 constants.PATH_REMOTE_ROOT, constants.RUN_LOGIC_PATH450 ).as_posix()451 # expand environment variables in path452 echo = self.tools[Echo]453 result = echo.run(working_path, shell=True)454 return self.get_pure_path(result.stdout)455class LocalNode(Node):456 def __init__(457 self,458 runbook: schema.Node,459 index: int,460 logger_name: str,461 base_part_path: Optional[Path],462 parent_logger: Optional[Logger] = None,463 ) -> None:464 super().__init__(465 index=index,466 runbook=runbook,467 logger_name=logger_name,468 base_part_path=base_part_path,469 parent_logger=parent_logger,470 )471 self._shell = LocalShell()472 @property473 def is_remote(self) -> bool:474 return False475 @classmethod476 def type_name(cls) -> str:477 return constants.ENVIRONMENTS_NODES_LOCAL478 @classmethod479 def type_schema(cls) -> Type[schema.TypedSchema]:480 return schema.LocalNode481 def get_working_path(self) -> PurePath:482 return self.local_working_path483 def __repr__(self) -> str:484 return "local"485class Nodes:486 def __init__(self) -> None:487 super().__init__()488 self._default: Optional[Node] = None489 self._list: List[Node] = []490 def __getitem__(self, key: Union[int, str]) -> Node:491 found = None492 if not self._list:493 raise LisaException("no node found")494 if isinstance(key, int):495 if len(self._list) > key:496 found = self._list[key]497 else:498 for node in self._list:499 if node.name == key:500 found = node501 break502 if not found:503 raise KeyError(f"cannot find node {key}")504 return found505 def __setitem__(self, key: Union[int, str], v: Node) -> None:506 raise NotImplementedError("don't set node directly, call from_*")507 def __len__(self) -> int:508 return len(self._list)509 @property510 def default(self) -> Node:511 if self._default is None:512 default = None513 for node in self._list:514 if node.is_default:515 default = node516 break517 if default is None:518 if len(self._list) == 0:519 raise LisaException("No node found in current environment")520 else:521 default = self._list[0]522 self._default = default523 return self._default524 def list(self) -> Iterable[Node]:525 for node in self._list:526 yield node527 def initialize(self) -> None:528 run_in_parallel([x.initialize for x in self._list])529 def close(self) -> None:530 for node in self._list:531 node.close()532 def cleanup(self) -> None:533 for node in self._list:534 node.cleanup()535 def append(self, node: Node) -> None:536 self._list.append(node)537 def test_connections(self) -> bool:538 return all(run_in_parallel([x.test_connection for x in self._list]))539def local_node_connect(540 index: int = -1,541 name: str = "local",542 base_part_path: Optional[Path] = None,543 parent_logger: Optional[Logger] = None,544) -> Node:545 node_runbook = schema.LocalNode(name=name, capability=schema.Capability())546 node = Node.create(547 index=index,548 runbook=node_runbook,549 logger_name=name,550 base_part_path=base_part_path,551 parent_logger=parent_logger,552 )553 node.initialize()554 return node555def local() -> Node:556 """557 Return a default local node. There is no special configuration.558 """559 global __local_node560 if __local_node is None:561 __local_node = local_node_connect()562 return __local_node563def quick_connect(564 runbook: schema.Node,565 logger_name: str = "",566 index: int = -1,567 parent_logger: Optional[Logger] = None,568) -> Node:569 """570 setup node information and initialize connection.571 """572 node = Node.create(573 index, runbook, logger_name=logger_name, parent_logger=parent_logger574 )575 if isinstance(node, RemoteNode):576 node.set_connection_info_by_runbook()577 node.initialize()...
platform_.py
Source:platform_.py
...105 # nodes.106 platform_runbook = cast(schema.Platform, self.runbook)107 for node in environment.nodes.list():108 if isinstance(node, RemoteNode):109 node.set_connection_info_by_runbook(110 default_username=platform_runbook.admin_username,111 default_password=platform_runbook.admin_password,112 default_private_key_file=platform_runbook.admin_private_key_file,113 )114 try:115 is_success = self._prepare_environment(environment, log)116 except NotMeetRequirementException as identifier:117 raise SkippedException(identifier)118 if is_success:119 environment.status = EnvironmentStatus.Prepared120 else:121 raise LisaException(122 f"no capability found for environment: {environment.runbook}"123 )...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!