Best Python code snippet using tempest_python
ssh.py
Source:ssh.py
...44 ssh = paramiko.SSHClient()45 ssh.set_missing_host_key_policy(46 paramiko.AutoAddPolicy())47 _start_time = time.time()48 while not self._is_timed_out(_start_time):49 try:50 ssh.connect(self.host, username=self.username,51 password=self.password,52 look_for_keys=self.look_for_keys,53 key_filename=self.key_filename,54 timeout=self.channel_timeout, pkey=self.pkey)55 _timeout = False56 break57 except (socket.error,58 paramiko.AuthenticationException,59 paramiko.SSHException):60 time.sleep(bsleep)61 bsleep *= backoff62 continue63 if _timeout:64 raise exceptions.SSHTimeout(host=self.host,65 user=self.username,66 password=self.password)67 return ssh68 def _is_timed_out(self, start_time):69 return (time.time() - self.timeout) > start_time70 def connect_until_closed(self):71 """Connect to the server and wait until connection is lost."""72 try:73 ssh = self._get_ssh_connection()74 _transport = ssh.get_transport()75 _start_time = time.time()76 _timed_out = self._is_timed_out(_start_time)77 while _transport.is_active() and not _timed_out:78 time.sleep(5)79 _timed_out = self._is_timed_out(_start_time)80 ssh.close()81 except (EOFError, paramiko.AuthenticationException, socket.error):82 return83 def exec_command(self, cmd):84 """85 Execute the specified command on the server.86 Note that this method is reading whole command outputs to memory, thus87 shouldn't be used for large outputs.88 :returns: data read from standard output of the command.89 :raises: SSHExecCommandFailed if command returns nonzero90 status. The exception contains command status stderr content.91 """92 ssh = self._get_ssh_connection()93 transport = ssh.get_transport()94 channel = transport.open_session()95 channel.fileno() # Register event pipe96 channel.exec_command(cmd)97 channel.shutdown_write()98 out_data = []99 err_data = []100 poll = select.poll()101 poll.register(channel, select.POLLIN)102 start_time = time.time()103 while True:104 ready = poll.poll(self.channel_timeout)105 if not any(ready):106 if not self._is_timed_out(start_time):107 continue108 raise exceptions.TimeoutException(109 "Command: '{0}' executed on host '{1}'.".format(110 cmd, self.host))111 if not ready[0]: # If there is nothing to read.112 continue113 out_chunk = err_chunk = None114 if channel.recv_ready():115 out_chunk = channel.recv(self.buf_size)116 out_data += out_chunk,117 if channel.recv_stderr_ready():118 err_chunk = channel.recv_stderr(self.buf_size)119 err_data += err_chunk,120 if channel.closed and not err_chunk and not out_chunk:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!