Best Python code snippet using lisa_python
node.py
Source:node.py
...92 return self._shell is not None and self._shell.is_connected93 @property94 def local_log_path(self) -> Path:95 if not self._local_log_path:96 part_name = self._get_node_part_path()97 log_path = constants.RUN_LOCAL_LOG_PATH / self._base_part_path / part_name98 self._local_log_path = log_path99 self._local_log_path.mkdir(parents=True, exist_ok=True)100 return self._local_log_path101 @property102 def local_working_path(self) -> Path:103 if not self._local_working_path:104 part_name = self._get_node_part_path()105 self._local_working_path = (106 constants.RUN_LOCAL_WORKING_PATH / self._base_part_path / part_name107 )108 self._local_working_path.mkdir(parents=True, exist_ok=True)109 return self._local_working_path110 @property111 def working_path(self) -> PurePath:112 """113 The working path may be a remote path on remote node. It uses to put executable.114 """115 if not self._working_path:116 self._working_path = self.get_working_path()117 self.shell.mkdir(self._working_path, parents=True, exist_ok=True)118 self.log.debug(f"working path is: '{self._working_path}'")119 return self._working_path120 @property121 def nics(self) -> Nics:122 if self._nics is None:123 self._nics = Nics(self)124 self._nics.initialize()125 return self._nics126 @property127 def is_dirty(self) -> bool:128 return self._is_dirty129 @classmethod130 def create(131 cls,132 index: int,133 runbook: schema.Node,134 logger_name: str = "node",135 base_part_path: Optional[Path] = None,136 parent_logger: Optional[Logger] = None,137 ) -> Node:138 if not cls._factory:139 cls._factory = subclasses.Factory[Node](Node)140 node = cls._factory.create_by_runbook(141 index=index,142 runbook=runbook,143 logger_name=logger_name,144 base_part_path=base_part_path,145 parent_logger=parent_logger,146 )147 node.log.debug(148 f"created, type: '{node.__class__.__name__}', default: {runbook.is_default}"149 )150 return node151 def reboot(self, time_out: int = 300) -> None:152 self.tools[Reboot].reboot(time_out)153 def 
execute(154 self,155 cmd: str,156 shell: bool = False,157 sudo: bool = False,158 no_error_log: bool = False,159 no_info_log: bool = True,160 no_debug_log: bool = False,161 cwd: Optional[PurePath] = None,162 timeout: int = 600,163 update_envs: Optional[Dict[str, str]] = None,164 expected_exit_code: Optional[int] = None,165 expected_exit_code_failure_message: str = "",166 ) -> ExecutableResult:167 process = self.execute_async(168 cmd,169 shell=shell,170 sudo=sudo,171 no_error_log=no_error_log,172 no_info_log=no_info_log,173 no_debug_log=no_debug_log,174 cwd=cwd,175 update_envs=update_envs,176 )177 return process.wait_result(178 timeout=timeout,179 expected_exit_code=expected_exit_code,180 expected_exit_code_failure_message=expected_exit_code_failure_message,181 )182 def execute_async(183 self,184 cmd: str,185 shell: bool = False,186 sudo: bool = False,187 no_error_log: bool = False,188 no_info_log: bool = True,189 no_debug_log: bool = False,190 cwd: Optional[PurePath] = None,191 update_envs: Optional[Dict[str, str]] = None,192 ) -> Process:193 self.initialize()194 if sudo and not self.support_sudo:195 raise LisaException(196 f"node doesn't support [command] or [sudo], cannot execute: {cmd}"197 )198 return self._execute(199 cmd,200 shell=shell,201 sudo=sudo,202 no_error_log=no_error_log,203 no_info_log=no_info_log,204 no_debug_log=no_debug_log,205 cwd=cwd,206 update_envs=update_envs,207 )208 def cleanup(self) -> None:209 self.log.debug("cleaning up...")210 if hasattr(self, "_log_handler") and self._log_handler:211 remove_handler(self._log_handler, self.log)212 self._log_handler.close()213 def close(self) -> None:214 self.log.debug("closing node connection...")215 if self._shell:216 self._shell.close()217 if self._nics:218 self._nics = None219 def get_pure_path(self, path: str) -> PurePath:220 # spurplus doesn't support PurePath, so it needs to resolve by the221 # node's os here.222 if self.is_posix:223 return PurePosixPath(path)224 else:225 return 
PureWindowsPath(path)226 def get_case_working_path(self, case_unique_name: str) -> PurePath:227 working_path = self.working_path / "tests" / case_unique_name228 self.shell.mkdir(path=working_path, exist_ok=True)229 return working_path230 def capture_system_information(self, name: str = "") -> None:231 """232 download key files or outputs of commands to a subfolder of the node.233 """234 saved_path = self.local_log_path / f"{get_datetime_path()}_captured_{name}"235 saved_path.mkdir(parents=True, exist_ok=True)236 self.log.debug(f"capturing system information to {saved_path}.")237 self.os.capture_system_information(saved_path)238 def find_partition_with_freespace(self, size_in_gb: int) -> str:239 if self.os.is_windows:240 raise NotImplementedError(241 (242 "find_partition_with_freespace was called on a Windows node, "243 "this function is not implemented for Windows"244 )245 )246 mount = self.tools[Mount]247 lsblk = self.tools[Lsblk]248 disks = lsblk.get_disks()249 # find a disk/partition with required space250 for disk in disks:251 # we do not write on the os disk252 if disk.is_os_disk:253 continue254 # if the disk contains partition, check the partitions255 if len(disk.partitions) > 0:256 for partition in disk.partitions:257 if not partition.size_in_gb >= size_in_gb:258 continue259 # mount partition if it is not mounted260 partition_name = partition.name261 if not partition.is_mounted:262 mountpoint = f"{PATH_REMOTE_ROOT}/{partition_name}"263 mount.mount(partition.device_name, mountpoint, format=True)264 else:265 mountpoint = partition.mountpoint266 # some distro use absolute path wrt to the root, so we need to267 # requery the mount point after mounting268 return lsblk.find_mountpoint_by_volume_name(269 partition_name, force_run=True270 )271 else:272 if not disk.size_in_gb >= size_in_gb:273 continue274 # mount the disk if it isn't mounted275 disk_name = disk.name276 if not disk.is_mounted:277 mountpoint = f"{PATH_REMOTE_ROOT}/{disk_name}"278 
self.tools[Mkfs].format_disk(disk.device_name, FileSystem.ext4)279 mount.mount(disk.device_name, mountpoint, format=True)280 else:281 mountpoint = disk.mountpoint282 # some distro use absolute path wrt to the root, so we need to requery283 # the mount point after mounting284 return lsblk.find_mountpoint_by_volume_name(disk_name, force_run=True)285 raise LisaException(286 f"No partition with Required disk space of {size_in_gb}GB found"287 )288 def get_working_path(self) -> PurePath:289 """290 It returns the path with expanded environment variables, but not create291 the folder. So, it can be used to locate a relative path from it, and292 not create extra folders.293 """294 raise NotImplementedError()295 def mark_dirty(self) -> None:296 self.log.debug("mark node to dirty")297 self._is_dirty = True298 def test_connection(self) -> bool:299 try:300 self.execute("date")301 return True302 except Exception as identifier:303 self.log.debug(f"cannot access VM {self.name}, error is {identifier}")304 return False305 def _initialize(self, *args: Any, **kwargs: Any) -> None:306 if not hasattr(self, "_log_handler"):307 self._log_handler = create_file_handler(308 self.local_log_path / "node.log", self.log309 )310 self.log.info(f"initializing node '{self.name}' {self}")311 self.shell.initialize()312 self.os: OperatingSystem = OperatingSystem.create(self)313 self.capture_system_information("started")314 def _execute(315 self,316 cmd: str,317 shell: bool = False,318 sudo: bool = False,319 no_error_log: bool = False,320 no_info_log: bool = False,321 no_debug_log: bool = False,322 cwd: Optional[PurePath] = None,323 update_envs: Optional[Dict[str, str]] = None,324 ) -> Process:325 cmd_id = str(randint(0, 10000))326 process = Process(cmd_id, self.shell, parent_logger=self.log)327 process.start(328 cmd,329 shell=shell,330 sudo=sudo,331 no_error_log=no_error_log,332 no_info_log=no_info_log,333 no_debug_log=no_debug_log,334 cwd=cwd,335 update_envs=update_envs,336 )337 return process338 def 
_get_node_part_path(self) -> PurePath:339 path_name = self.name340 if not path_name:341 if self.index:342 index = self.index343 else:344 index = randint(0, 10000)345 path_name = f"node-{index}"346 return PurePath(path_name)347class RemoteNode(Node):348 def __repr__(self) -> str:349 # it's used to handle UT failure.350 if hasattr(self, "_connection_info"):351 return str(self._connection_info)352 return ""...
Learn to run automation tests from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!