Best Python code snippet using avocado_python
process.py
Source:process.py
...207 if ' -s' not in sudo_cmd:208 sudo_cmd = '%s -s' % sudo_cmd209 cmd = '%s %s' % (sudo_cmd, cmd)210 return cmd211 def _init_subprocess(self):212 if self._popen is None:213 if self.verbose:214 log.info("Running '%s'", self.cmd)215 if self.shell is False:216 cmd = shlex.split(self.cmd)217 else:218 cmd = self.cmd219 try:220 self._popen = subprocess.Popen(cmd,221 stdout=subprocess.PIPE,222 stderr=subprocess.PIPE,223 shell=self.shell,224 env=self.env)225 except OSError as details:226 details.strerror += " (%s)" % self.cmd227 raise details228 self.start_time = time.time()229 self.stdout_file = StringIO()230 self.stderr_file = StringIO()231 self.stdout_lock = threading.Lock()232 ignore_bg_processes = self._ignore_bg_processes233 self.stdout_thread = threading.Thread(target=self._fd_drainer,234 name="%s-stdout" % self.cmd,235 args=[self._popen.stdout,236 ignore_bg_processes])237 self.stdout_thread.daemon = True238 self.stderr_lock = threading.Lock()239 self.stderr_thread = threading.Thread(target=self._fd_drainer,240 name="%s-stderr" % self.cmd,241 args=[self._popen.stderr,242 ignore_bg_processes])243 self.stderr_thread.daemon = True244 self.stdout_thread.start()245 self.stderr_thread.start()246 def signal_handler(signum, frame):247 self.result.interrupted = "signal/ctrl+c"248 self.wait()249 signal.default_int_handler()250 try:251 signal.signal(signal.SIGINT, signal_handler)252 except ValueError:253 if self.verbose:254 log.info("Command %s running on a thread", self.cmd)255 def _fd_drainer(self, input_pipe, ignore_bg_processes=False):256 """257 Read from input_pipe, storing and logging output.258 :param input_pipe: File like object to the stream.259 """260 stream_prefix = "%s"261 if input_pipe == self._popen.stdout:262 prefix = '[stdout] %s'263 if self.allow_output_check in ['none', 'stderr']:264 stream_logger = None265 else:266 stream_logger = stdout_log267 output_file = self.stdout_file268 lock = self.stdout_lock269 elif input_pipe == self._popen.stderr:270 prefix = '[stderr] %s'271 if self.allow_output_check in ['none', 'stdout']:272 stream_logger = None273 else:274 stream_logger = stderr_log275 output_file = self.stderr_file276 lock = self.stderr_lock277 fileno = input_pipe.fileno()278 bfr = ''279 while True:280 if ignore_bg_processes:281 # Exit if there are no new data and the main process finished282 if (not select.select([fileno], [], [], 1)[0] and283 self.result.exit_status is not None):284 break285 # Don't read unless there are new data available:286 if not select.select([fileno], [], [], 1)[0]:287 continue288 tmp = os.read(fileno, 8192).decode()289 if tmp == '':290 break291 lock.acquire()292 try:293 output_file.write(tmp)294 if self.verbose:295 bfr += tmp296 if tmp.endswith('\n'):297 for line in bfr.splitlines():298 log.debug(prefix, line)299 if stream_logger is not None:300 stream_logger.debug(stream_prefix, line)301 bfr = ''302 finally:303 lock.release()304 # Write the rest of the bfr unfinished by \n305 if self.verbose and bfr:306 for line in bfr.splitlines():307 log.debug(prefix, line)308 if stream_logger is not None:309 stream_logger.debug(stream_prefix, line)310 def _fill_results(self, rc):311 self._init_subprocess()312 self.result.exit_status = rc313 if self.result.duration == 0:314 self.result.duration = time.time() - self.start_time315 if self.verbose:316 log.info("Command '%s' finished with %s after %ss", self.cmd, rc,317 self.result.duration)318 self.result.pid = self._popen.pid319 self._fill_streams()320 def _fill_streams(self):321 """322 Close subprocess stdout and stderr, and put values into result obj.323 """324 # Cleaning up threads325 self.stdout_thread.join()326 self.stderr_thread.join()327 # Clean subprocess pipes and populate stdout/err328 self._popen.stdout.close()329 self._popen.stderr.close()330 self.result.stdout = self.get_stdout()331 self.result.stderr = self.get_stderr()332 def start(self):333 """334 Start running the subprocess.335 This method is particularly useful for background processes, since336 you can start the subprocess and not block your test flow.337 :return: Subprocess PID.338 :rtype: int339 """340 self._init_subprocess()341 return self._popen.pid342 def get_stdout(self):343 """344 Get the full stdout of the subprocess so far.345 :return: Standard output of the process.346 :rtype: str347 """348 self._init_subprocess()349 self.stdout_lock.acquire()350 stdout = self.stdout_file.getvalue()351 self.stdout_lock.release()352 return stdout353 def get_stderr(self):354 """355 Get the full stderr of the subprocess so far.356 :return: Standard error of the process.357 :rtype: str358 """359 self._init_subprocess()360 self.stderr_lock.acquire()361 stderr = self.stderr_file.getvalue()362 self.stderr_lock.release()363 return stderr364 def terminate(self):365 """366 Send a :attr:`signal.SIGTERM` to the process.367 """368 self._init_subprocess()369 self.send_signal(signal.SIGTERM)370 def kill(self):371 """372 Send a :attr:`signal.SIGKILL` to the process.373 """374 self._init_subprocess()375 self.send_signal(signal.SIGKILL)376 def send_signal(self, sig):377 """378 Send the specified signal to the process.379 :param sig: Signal to send.380 """381 self._init_subprocess()382 self._popen.send_signal(sig)383 def poll(self):384 """385 Call the subprocess poll() method, fill results if rc is not None.386 """387 self._init_subprocess()388 rc = self._popen.poll()389 if rc is not None:390 self._fill_results(rc)391 return rc392 def wait(self):393 """394 Call the subprocess poll() method, fill results if rc is not None.395 """396 self._init_subprocess()397 rc = self._popen.wait()398 if rc is not None:399 self._fill_results(rc)400 return rc401 def stop(self):402 """403 Stop background subprocess.404 Call this method to terminate the background subprocess and405 wait for it results.406 """407 self._init_subprocess()408 if self.result.exit_status is None:409 self.terminate()410 return self.wait()411 def get_pid(self):412 """413 Reports PID of this process414 """415 self._init_subprocess()416 return self._popen.pid417 def run(self, timeout=None, sig=signal.SIGTERM):418 """419 Start a process and wait for it to end, returning the result attr.420 If the process was already started using .start(), this will simply421 wait for it to end.422 :param timeout: Time (seconds) we'll wait until the process is423 finished. If it's not, we'll try to terminate it424 and get a status.425 :type timeout: float426 :param sig: Signal to send to the process in case it did not end after427 the specified timeout.428 :type sig: int429 :returns: The command result object.430 :rtype: A :class:`CmdResult` instance.431 """432 def timeout_handler():433 self.send_signal(sig)434 self.result.interrupted = "timeout after %ss" % timeout435 self._init_subprocess()436 if timeout is None:437 self.wait()438 elif timeout > 0.0:439 timer = threading.Timer(timeout, timeout_handler)440 try:441 timer.start()442 self.wait()443 finally:444 timer.cancel()445 if self.result.exit_status is None:446 stop_time = time.time() + 1447 while time.time() < stop_time:448 self.poll()449 if self.result.exit_status is not None:...
002.compute_cascades.py
Source:002.compute_cascades.py
...8from tqdm import tqdm9from s4.cascade.analysis import compute_cascade10from s4.data import open_data11logging.basicConfig(level=logging.WARNING, format='[%(levelname)s]: %(message)s')12def _init_subprocess(o_icsd, use_temperature):13 _init_subprocess.only_icsd = o_icsd14 _init_subprocess.use_temperature = use_temperature15 sys.stdout = open(os.devnull, 'w')16 sys.stderr = open(os.devnull, 'w')17def _compute_cascade_for_one(args):18 k, d = args19 temperatures = _init_subprocess.use_temperature20 results = []21 for j, reaction in enumerate(d.reactions):22 per_reaction = {}23 for i, temp in enumerate([d.exp_t] + temperatures):24 furnace = [temp + 273.15] * 1025 try:26 step_info = compute_cascade(reaction, furnace, only_icsd=_init_subprocess.only_icsd)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!