Best Python code snippet using autotest_python
paramiko_host.py
Source: paramiko_host.py
...166 def close(self):167 super(ParamikoHost, self).close()168 self._close_transport()169 @classmethod170 def _exhaust_stream(cls, tee, output_list, recvfunc):171 while True:172 try:173 output_list.append(recvfunc(cls.BUFFSIZE))174 except socket.timeout:175 return176 tee.write(output_list[-1])177 if not output_list[-1]:178 return179 @classmethod180 def __send_stdin(cls, channel, stdin):181 if not stdin or not channel.send_ready():182 # nothing more to send or just no space to send now183 return184 sent = channel.send(stdin[:cls.BUFFSIZE])185 if not sent:186 logging.warning('Could not send a single stdin byte.')187 else:188 stdin = stdin[sent:]189 if not stdin:190 # no more stdin input, close output direction191 channel.shutdown_write()192 return stdin193 def run(self, command, timeout=3600, ignore_status=False,194 stdout_tee=utils.TEE_TO_LOGS, stderr_tee=utils.TEE_TO_LOGS,195 connect_timeout=30, stdin=None, verbose=True, args=(),196 ignore_timeout=False):197 """198 Run a command on the remote host.199 @see common_lib.hosts.host.run()200 @param connect_timeout: connection timeout (in seconds)201 @param options: string with additional ssh command options202 @param verbose: log the commands203 @param ignore_timeout: bool True command timeouts should be204 ignored. 
Will return None on command timeout.205 @raises AutoservRunError: if the command failed206 @raises AutoservSSHTimeout: ssh connection has timed out207 """208 stdout = utils.get_stream_tee_file(209 stdout_tee, utils.DEFAULT_STDOUT_LEVEL,210 prefix=utils.STDOUT_PREFIX)211 stderr = utils.get_stream_tee_file(212 stderr_tee, utils.get_stderr_level(ignore_status),213 prefix=utils.STDERR_PREFIX)214 for arg in args:215 command += ' "%s"' % utils.sh_escape(arg)216 if verbose:217 logging.debug("Running (ssh-paramiko) '%s'", command)218 # start up the command219 start_time = time.time()220 try:221 channel = self._open_channel(timeout)222 channel.exec_command(command)223 except (socket.error, paramiko.SSHException, EOFError), e:224 # This has to match the string from paramiko *exactly*.225 if str(e) != 'Channel closed.':226 raise error.AutoservSSHTimeout("ssh failed: %s" % e)227 # pull in all the stdout, stderr until the command terminates228 raw_stdout, raw_stderr = [], []229 timed_out = False230 while not channel.exit_status_ready():231 if channel.recv_ready():232 raw_stdout.append(channel.recv(self.BUFFSIZE))233 stdout.write(raw_stdout[-1])234 if channel.recv_stderr_ready():235 raw_stderr.append(channel.recv_stderr(self.BUFFSIZE))236 stderr.write(raw_stderr[-1])237 if timeout and time.time() - start_time > timeout:238 timed_out = True239 break240 stdin = self.__send_stdin(channel, stdin)241 time.sleep(1)242 if timed_out:243 exit_status = -signal.SIGTERM244 else:245 exit_status = channel.recv_exit_status()246 channel.settimeout(10)247 self._exhaust_stream(stdout, raw_stdout, channel.recv)248 self._exhaust_stream(stderr, raw_stderr, channel.recv_stderr)249 channel.close()250 duration = time.time() - start_time251 # create the appropriate results252 stdout = "".join(raw_stdout)253 stderr = "".join(raw_stderr)254 result = utils.CmdResult(command, stdout, stderr, exit_status,255 duration)256 if exit_status == -signal.SIGHUP:257 msg = "ssh connection unexpectedly terminated"258 
raise error.AutoservRunError(msg, result)259 if timed_out:260 logging.warning('Paramiko command timed out after %s sec: %s', timeout,261 command)262 if not ignore_timeout:...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!