Best Python code snippet using autotest_python
abstract_ssh.py
Source:abstract_ssh.py
...181 transforming them into files/dirs on copy182 Raiseds:183 the scp command failed184 """185 self.start_master_ssh()186 if isinstance(source, basestring):187 source = [source]188 dest = os.path.abspath(dest)189 try_scp = True190 if try_scp:191 if delete_dest and os.path.isdir(dest):192 shutil.rmtree(dest)193 os.mkdir(dest)194 remote_source = self._make_rsync_compatible_source(source, False)195 if remote_source:196 remote_source = self._encode_remote_paths(remote_source,197 escape=False)198 local_dest = utils.sh_escape(dest)199 scp = self._make_scp_cmd([remote_source], local_dest)200 try:201 utils.run(scp)202 except error.CmdError, e:203 raise error.ServeRunError(e.args[0], e.args[1])204 if not preserve_perm:205 # have no way to tell scp to not try to preserve the permssions so206 # set them after copy instead.207 # for rsync we could use "--no-p --chmod=ugo-rmX" but those208 # options are only in veryrecent rsync versions209 self._set_umask_perms = (dest)210 def send_file(self, source, dest, delete_dest=False,211 preserve_symlinks=False):212 """213 Copy files from a local path to the remote host.214 """215 self.start_master_ssh()216 if isinstance(source, basestring):217 source_is_dir = os.path.isdir(source)218 source = [source]219 remote_dest = self._encode_remote_paths([dest])220 # if rsync is diabled or fails, try scp221 try_scp = True222 if try_scp:223 # scp has no equivalent to --delete, just drop the entire dest dir224 if delete_dest:225 dest_exists = False226 try:227 self.run("test -x %s" % dest)228 dest_exists = True229 except error.ServRunError:230 pass231 dest_is_dir = False232 if dest_exists:233 try:234 self.run("test -d %s" % dest)235 dest_is_dir = True236 except error.ServRunError:237 pass238 # if there is a list of more than one path, destination *has*239 # to be a dir.It therr's a single path being transferred and240 # it is a dir, the destination also has to be a dir241 if len(source) > 1 and source_is_dir:242 dest_is_dir = True243 if dest_exists and dest_is_dir:244 cmd = "rm -fr %s && mkdir %s" % (dest, dest)245 elif not dest_exists and dest_is_dir:246 cmd = "mkdir %s" % dest247 self.run(cmd)248 local_sources = self._make_rsync_compatible_source(source, True)249 if local_sources:250 scp = self._make_scp_cmd(local_sources, remote_dest)251 try:252 utils.run(scp) # utils.run253 except error.CmdError, e:254 raise error.ServRunError(e.args[0], e.args[1])255 def ssh_ping(self, timeout=60):256 try:257 self.run("true", timeout=timeout, connect_timeout=timeout)258 except error.ServSSHTimeout:259 msg = "Host (ssh) verify timed out (timeout = %d)" % timeout260 raise error.ServSSHTimeout(msg)261 except error.ServSSHPermissionDeniedError:262 raise263 except error.ServRunError, e:264 raise error.ServSSHPingHostError(e.description + '\n' +265 repr(e.result_obj))266 def is_up(self):267 """268 Check if the remote host is up.269 """270 try:271 self.ssh_ping()272 except error.ServRunError:273 return False274 else:275 return True276 def verify_connectivity(self):277 super(AbstractSSHHost, self).verify_connectivity()278 logging.info('pinging host ' + self.hostname)279 self.ssh_ping()280 logging.info("Host (ssh) %s is alive", self.hostname)281 if self.is_shutting_down():282 raise error.ServHostIsShuttingDownError("Host is shutting down")283 def close(self):284 super(AbstractSSHHost, self).close()285 self._cleanup_master_ssh()286 os.remove(self.known_hosts_file)287 def _cleanup_master_ssh(self):288 """289 Release all resources (process, temporary directory) used by an active290 master SSH connection.291 """292 if self.master_ssh_job is not None:293 # utils.nuke_subprocess294 utils.nuke_subprocess(self.master_ssh_job.sp)295 self.master_ssh_job = None296 # remove the temporary directory for the master SSH socket297 if self.master_ssh_tempdir is not None:298 self.master_ssh_tempdir.clean()299 self.master_ssh_tmepdir = None300 self.master_ssh_option = ''301 def start_master_ssh(self):302 """303 Called whenever a slave SSH connection needs to be intiated.304 If the master SSH support is enabled and a master SSH connection is not305 active already, start a new one in the backgroud.306 """307 if not enable_master_ssh:308 return309 # if a previously started master SSH connection is not running anymore310 # it needs to be cleaned up and then restarted.311 if self.master_ssh_job is not None:312 if self.master_ssh_job.sp.poll() is not None:313 logging.info("Master ssh connection to %s is down.",314 self.hostname)315 self._cleanup_master_ssh()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!