Best Python code snippet using autotest_python
remote.py
Source:remote.py
...10from autotest.server import utils11from autotest.server.hosts import base_classes, install_server12class InstallServerUnavailable(Exception):13 pass14def get_install_server_info():15 server_info = {}16 settings.parse_config_file()17 for option, value in settings.config.items('INSTALL_SERVER'):18 server_info[option] = value19 return server_info20def install_server_is_configured():21 server_info = get_install_server_info()22 if server_info.get('xmlrpc_url', None):23 return True24 return False25class RemoteHost(base_classes.Host):26 """27 This class represents a remote machine on which you can run28 programs.29 It may be accessed through a network, a serial line, ...30 It is not the machine autoserv is running on.31 Implementation details:32 This is an abstract class, leaf subclasses must implement the methods33 listed here and in parent classes which have no implementation. They34 may reimplement methods which already have an implementation. You35 must not instantiate this class but should instantiate one of those36 leaf subclasses.37 """38 DEFAULT_REBOOT_TIMEOUT = base_classes.Host.DEFAULT_REBOOT_TIMEOUT39 LAST_BOOT_TAG = object()40 DEFAULT_HALT_TIMEOUT = 2 * 6041 VAR_LOG_MESSAGES_COPY_PATH = "/var/tmp/messages.autotest_start"42 VAR_LOG_MESSAGES_PATHS = ["/var/log/messages", "/var/log/syslog"]43 INSTALL_SERVER_MAPPING = {'cobbler': install_server.CobblerInterface}44 def _initialize(self, hostname, autodir=None, profile='',45 *args, **dargs):46 super(RemoteHost, self)._initialize(*args, **dargs)47 self.hostname = hostname48 self.autodir = autodir49 self.profile = profile50 self.tmp_dirs = []51 def __repr__(self):52 return "<remote host: %s, profile: %s>" % (self.hostname,53 self.profile)54 def close(self):55 super(RemoteHost, self).close()56 self.stop_loggers()57 if hasattr(self, 'tmp_dirs'):58 for dir in self.tmp_dirs:59 try:60 self.run('rm -rf "%s"' % (utils.sh_escape(dir)))61 except error.AutoservRunError:62 pass63 def machine_install(self, profile='', timeout=None):64 """65 Install a profile using the install server.66 :param profile: Profile name inside the install server database.67 """68 if timeout is None:69 timeout = settings.get_value('INSTALL_SERVER',70 'default_install_timeout',71 type=int,72 default=3600)73 server_info = get_install_server_info()74 if install_server_is_configured():75 if not profile:76 profile = self.profile77 if profile in ['Do_not_install', 'N/A']:78 return79 num_attempts = int(server_info.get('num_attempts', 2))80 ServerInterface = self.INSTALL_SERVER_MAPPING[server_info['type']]81 end_time = time.time() + (timeout / 10)82 step = int(timeout / 100)83 server_interface = None84 while time.time() < end_time:85 try:86 server_interface = ServerInterface(**server_info)87 break88 except socket.error:89 logging.error('Install server unavailable. Trying '90 'again in %s s...', step)91 time.sleep(step)92 if server_interface is None:93 raise InstallServerUnavailable("%s install server at (%s) "94 "unavailable. Tried to "95 "communicate for %s s" %96 (server_info['type'],97 server_info['xmlrpc_url'],98 timeout / 10))99 server_interface.install_host(self, profile=profile,100 timeout=timeout,101 num_attempts=num_attempts)102 def hardreset(self, timeout=DEFAULT_REBOOT_TIMEOUT, wait=True,103 num_attempts=1, halt=False, **wait_for_restart_kwargs):104 """105 Reboot the machine using the install server.106 :params timeout: timelimit in seconds before the machine is107 considered unreachable108 :params wait: Whether or not to wait for the machine to reboot109 :params num_attempts: Number of times to attempt hard reset erroring110 on the last attempt.111 :params halt: Halts the machine before hardresetting.112 :params **wait_for_restart_kwargs: keyword arguments passed to113 wait_for_restart()114 """115 server_info = get_install_server_info()116 if server_info.get('xmlrpc_url', None) is not None:117 ServerInterface = self.INSTALL_SERVER_MAPPING[server_info['type']]118 server_interface = ServerInterface(**server_info)119 try:120 old_boot_id = self.get_boot_id()121 except error.AutoservSSHTimeout:122 old_boot_id = 'unknown boot_id prior to RemoteHost.hardreset'123 def reboot():124 power_state = "reboot"125 if halt:126 self.halt()127 power_state = "on"128 server_interface.power_host(host=self, state=power_state)129 self.record("GOOD", None, "reboot.start", "hard reset")...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!