Best Python code snippet using lisa_python
scope.py
Source:scope.py
...301 # if not platform.is_absent_index(prev_local_idx):302 # continue303 refs._set_refs(static_ref_id, static_ref)304 return refs305 def _create_exports(self):306 if plist.is_empty(self.__declared_exports):307 # somewhat dirty hacks here for auto export308 # cant think of anything better unfortunately309 skip_start = lang_names.SKIP_ON_AUTO_EXPORT_START310 skip_middle = lang_names.SKIP_ON_AUTO_EXPORT_MIDDLE311 syms = []312 keys = self.__locals.keys_list()313 for key in keys:314 keyval = key.string315 # avoid exporting _name but enable _name_316 if keyval.startswith_s(skip_start) and not keyval.endswith_s(skip_start):317 continue318 index_colon = keyval.index_s(skip_middle)319 # so operators like := will be exported but names arza:foo will not320 if index_colon > 1:321 continue322 syms.append(key)323 return space.newlist(syms)324 for exported in self.__declared_exports:325 if not self.has_local(exported):326 error.throw_2(error.Errors.EXPORT_ERROR, space.newstring(u"Unreachable export variable"), exported)327 return self.__declared_exports328 def finalize(self, previous_scope, parse_scope):329 if parse_scope is not None:330 self.add_operators(parse_scope.operators.to_list())331 self.exports = self._create_exports()332 self.references = self._create_references(previous_scope)333 return self334 def create_references(self):335 if self.references is None:336 return None337 return api.clone(self.references)338 def create_operators(self):339 if self.__operators is None:340 return None341 return api.clone(self.__operators)342 def create_env_bindings(self):343 return api.clone(self.__locals)344 def create_temporaries(self):345 if self.__temp_index == 0:...
process.py
Source:process.py
...156 else:157 split_command = []158 if sudo:159 split_command += ["sudo"]160 envs = _create_exports(update_envs=update_envs)161 if envs:162 command = f"{envs} {command}"163 split_command += ["sh", "-c", command]164 # expand variables in posix mode165 update_envs.clear()166 else:167 if sudo and self._is_posix:168 command = f"sudo {command}"169 try:170 split_command = shlex.split(command, posix=self._is_posix)171 except Exception as identifier:172 raise LisaException(f"failed on split command: {command}: {identifier}")173 return split_command174 def wait_result(175 self,176 timeout: float = 600,177 expected_exit_code: Optional[int] = None,178 expected_exit_code_failure_message: str = "",179 ) -> ExecutableResult:180 timer = create_timer()181 is_timeout = False182 while self.is_running() and timeout >= timer.elapsed(False):183 time.sleep(0.01)184 if timeout < timer.elapsed():185 if self._process is not None:186 self._log.info(f"timeout in {timeout} sec, and killed")187 self.kill()188 is_timeout = True189 if self._result is None:190 assert self._process191 if is_timeout:192 process_result = spur.results.result(193 return_code=1, allow_error=True, output="", stderr_output=""194 )195 else:196 process_result = self._process.wait_for_result()197 if not self._is_posix and self._shell.is_remote:198 # special handle remote windows. There are extra control chars199 # and on extra line at the end.200 # remove extra controls in remote Windows201 process_result.output = filter_ansi_escape(process_result.output)202 process_result.stderr_output = filter_ansi_escape(203 process_result.stderr_output204 )205 self._stdout_writer.close()206 self._stderr_writer.close()207 # cache for future queries, in case it's queried twice.208 self._result = ExecutableResult(209 process_result.output.strip(),210 process_result.stderr_output.strip(),211 process_result.return_code,212 self._cmd,213 self._timer.elapsed(),214 )215 self._recycle_resource()216 self._log.debug(217 f"execution time: {self._timer}, exit code: {self._result.exit_code}"218 )219 if expected_exit_code is not None:220 self._result.assert_exit_code(221 expected_exit_code=expected_exit_code,222 message=expected_exit_code_failure_message,223 )224 if self._is_posix and self._sudo:225 self._result.stdout = self._filter_sudo_result(self._result.stdout)226 return self._result227 def kill(self) -> None:228 if self._process:229 self._log.debug(f"Killing process : {self._id_}")230 try:231 if self._shell.is_remote:232 # Support remote Posix so far233 self._process.send_signal(9)234 else:235 # local process should use the compiled value236 # the value is different between windows and posix237 self._process.send_signal(signal.SIGTERM)238 except Exception as identifier:239 self._log.debug(f"failed on killing process: {identifier}")240 def is_running(self) -> bool:241 if self._running and self._process:242 self._running = self._process.is_running()243 return self._running244 def wait_output(245 self,246 keyword: str,247 timeout: int = 300,248 error_on_missing: bool = True,249 interval: int = 1,250 ) -> None:251 # check if stdout buffers contain the string "keyword" to determine if252 # it is running253 start_time = time.time()254 while time.time() - start_time < timeout:255 # LogWriter only flushes if "\n" is written, so we need to flush256 # manually.257 self._stdout_writer.flush()258 # check if buffer contains the keyword259 if keyword in self._log_buffer.getvalue():260 return261 time.sleep(interval)262 if error_on_missing:263 raise LisaException(264 f"{keyword} not found in stdout after {timeout} seconds"265 )266 else:267 self._log.debug(268 f"not found '{keyword}' in {timeout} seconds, but ignore it."269 )270 def _recycle_resource(self) -> None:271 # TODO: The spur library is not very good and leaves open272 # resources (probably due to it starting the process with273 # `bufsize=0`). We need to replace it, but for now, we274 # manually close the leaks.275 if isinstance(self._process, spur.local.LocalProcess):276 popen: subprocess.Popen[str] = self._process._subprocess277 if popen.stdin:278 popen.stdin.close()279 if popen.stdout:280 popen.stdout.close()281 if popen.stderr:282 popen.stderr.close()283 elif isinstance(self._process, spur.ssh.SshProcess):284 if self._process._stdin:285 self._process._stdin.close()286 if self._process._stdout:287 self._process._stdout.close()288 if self._process._stderr:289 self._process._stderr.close()290 self._process = None291 def _filter_sudo_result(self, raw_input: str) -> str:292 # this warning message may break commands, so remove it from the first line293 # of standard output.294 if raw_input.startswith("sudo: unable to resolve host"):295 lines = raw_input.splitlines(keepends=True)296 raw_input = "".join(lines[1:])297 self._log.debug(f'found error message in sudo: "{lines[0]}"')298 return raw_input299def _create_exports(update_envs: Dict[str, str]) -> str:300 result: str = ""301 for key, value in update_envs.items():302 value = value.replace('"', '\\"')303 result += f'export {key}="{value}";'...
unfs.py
Source:unfs.py
...7 def __init__(self, unfs_path, mount_dir):8 self.unfs = None9 self.unfs_path = unfs_path10 self.mount_dir = mount_dir11 self.exports = self._create_exports()12 def _create_exports(self):13 options = [14 self.mount_dir,15 '(rw,no_root_squash,no_all_squash,insecure)\n'16 ]17 with tempfile.NamedTemporaryFile(delete=False, mode="w") as exports:18 exports.write(' '.join(options))19 return exports.name20 def stop(self):21 if self.unfs is not None:22 self.unfs.terminate()23 os.unlink(self.exports)24 def serve(self):25 nfsport, mountport = get_free_port(udp=True), get_free_port(udp=True)26 args = [...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!