Best Python code snippet using autotest_python
host.py
Source:host.py
...38 self.topic_parse_info = topic_common.item_parse_info(39 attribute_name='hosts',40 filename_option='mlist',41 use_leftover=True)42 def _parse_lock_options(self, options):43 if options.lock and options.unlock:44 self.invalid_syntax('Only specify one of '45 '--lock and --unlock.')46 if options.lock:47 self.data['locked'] = True48 self.messages.append('Locked host')49 elif options.unlock:50 self.data['locked'] = False51 self.messages.append('Unlocked host')52 def _cleanup_labels(self, labels, platform=None):53 """Removes the platform label from the overall labels"""54 if platform:55 return [label for label in labels56 if label != platform]57 else:58 try:59 return [label for label in labels60 if not label['platform']]61 except TypeError:62 # This is a hack - the server will soon63 # do this, so all this code should be removed.64 return labels65 def get_items(self):66 return self.hosts67class host_help(host):68 """Just here to get the atest logic working.69 Usage is set by its parent"""70 pass71class host_list(action_common.atest_list, host):72 """atest host list [--mlist <file>|<hosts>] [--label <label>]73 [--status <status1,status2>] [--acl <ACL>] [--user <user>]"""74 def __init__(self):75 super(host_list, self).__init__()76 self.parser.add_option('-b', '--label',77 default='',78 help='Only list hosts with all these labels '79 '(comma separated)')80 self.parser.add_option('-s', '--status',81 default='',82 help='Only list hosts with any of these '83 'statuses (comma separated)')84 self.parser.add_option('-a', '--acl',85 default='',86 help='Only list hosts within this ACL')87 self.parser.add_option('-u', '--user',88 default='',89 help='Only list hosts available to this user')90 self.parser.add_option('-N', '--hostnames-only', help='Only return '91 'hostnames for the machines queried.',92 action='store_true')93 self.parser.add_option('--locked',94 default=False,95 help='Only list locked hosts',96 action='store_true')97 self.parser.add_option('--unlocked',98 default=False,99 help='Only list unlocked hosts',100 action='store_true')101 def parse(self):102 """Consume the specific options"""103 label_info = topic_common.item_parse_info(attribute_name='labels',104 inline_option='label')105 (options, leftover) = super(host_list, self).parse([label_info])106 self.status = options.status107 self.acl = options.acl108 self.user = options.user109 self.hostnames_only = options.hostnames_only110 if options.locked and options.unlocked:111 self.invalid_syntax('--locked and --unlocked are '112 'mutually exclusive')113 self.locked = options.locked114 self.unlocked = options.unlocked115 return (options, leftover)116 def execute(self):117 filters = {}118 check_results = {}119 if self.hosts:120 filters['hostname__in'] = self.hosts121 check_results['hostname__in'] = 'hostname'122 if self.labels:123 if len(self.labels) == 1:124 # This is needed for labels with wildcards (x86*)125 filters['labels__name__in'] = self.labels126 check_results['labels__name__in'] = None127 else:128 filters['multiple_labels'] = self.labels129 check_results['multiple_labels'] = None130 if self.status:131 statuses = self.status.split(',')132 statuses = [status.strip() for status in statuses133 if status.strip()]134 filters['status__in'] = statuses135 check_results['status__in'] = None136 if self.acl:137 filters['aclgroup__name'] = self.acl138 check_results['aclgroup__name'] = None139 if self.user:140 filters['aclgroup__users__login'] = self.user141 check_results['aclgroup__users__login'] = None142 if self.locked or self.unlocked:143 filters['locked'] = self.locked144 check_results['locked'] = None145 return super(host_list, self).execute(op='get_hosts',146 filters=filters,147 check_results=check_results)148 def output(self, results):149 if results:150 # Remove the platform from the labels.151 for result in results:152 result['labels'] = self._cleanup_labels(result['labels'],153 result['platform'])154 if self.hostnames_only:155 self.print_list(results, key='hostname')156 else:157 super(host_list, self).output(results, keys=['hostname', 'status',158 'locked', 'platform', 'labels'])159class host_stat(host):160 """atest host stat --mlist <file>|<hosts>"""161 usage_action = 'stat'162 def execute(self):163 results = []164 # Convert wildcards into real host stats.165 existing_hosts = []166 for host in self.hosts:167 if host.endswith('*'):168 stats = self.execute_rpc('get_hosts',169 hostname__startswith=host.rstrip('*'))170 if len(stats) == 0:171 self.failure('No hosts matching %s' % host, item=host,172 what_failed='Failed to stat')173 continue174 else:175 stats = self.execute_rpc('get_hosts', hostname=host)176 if len(stats) == 0:177 self.failure('Unknown host %s' % host, item=host,178 what_failed='Failed to stat')179 continue180 existing_hosts.extend(stats)181 for stat in existing_hosts:182 host = stat['hostname']183 # The host exists, these should succeed184 acls = self.execute_rpc('get_acl_groups', hosts__hostname=host)185 labels = self.execute_rpc('get_labels', host__hostname=host)186 results.append([[stat], acls, labels])187 return results188 def output(self, results):189 for stats, acls, labels in results:190 print '-' * 5191 self.print_fields(stats,192 keys=['hostname', 'platform',193 'status', 'locked', 'locked_by',194 'lock_time', 'protection', ])195 self.print_by_ids(acls, 'ACLs', line_before=True)196 labels = self._cleanup_labels(labels)197 self.print_by_ids(labels, 'Labels', line_before=True)198class host_jobs(host):199 """atest host jobs [--max-query] --mlist <file>|<hosts>"""200 usage_action = 'jobs'201 def __init__(self):202 super(host_jobs, self).__init__()203 self.parser.add_option('-q', '--max-query',204 help='Limits the number of results '205 '(20 by default)',206 type='int', default=20)207 def parse(self):208 """Consume the specific options"""209 (options, leftover) = super(host_jobs, self).parse()210 self.max_queries = options.max_query211 return (options, leftover)212 def execute(self):213 results = []214 real_hosts = []215 for host in self.hosts:216 if host.endswith('*'):217 stats = self.execute_rpc('get_hosts',218 hostname__startswith=host.rstrip('*'))219 if len(stats) == 0:220 self.failure('No host matching %s' % host, item=host,221 what_failed='Failed to stat')222 [real_hosts.append(stat['hostname']) for stat in stats]223 else:224 real_hosts.append(host)225 for host in real_hosts:226 queue_entries = self.execute_rpc('get_host_queue_entries',227 host__hostname=host,228 query_limit=self.max_queries,229 sort_by=['-job__id'])230 jobs = []231 for entry in queue_entries:232 job = {'job_id': entry['job']['id'],233 'job_owner': entry['job']['owner'],234 'job_name': entry['job']['name'],235 'status': entry['status']}236 jobs.append(job)237 results.append((host, jobs))238 return results239 def output(self, results):240 for host, jobs in results:241 print '-' * 5242 print 'Hostname: %s' % host243 self.print_table(jobs, keys_header=['job_id',244 'job_owner',245 'job_name',246 'status'])247class host_mod(host):248 """atest host mod --lock|--unlock|--protection249 --mlist <file>|<hosts>"""250 usage_action = 'mod'251 def __init__(self):252 """Add the options specific to the mod action"""253 self.data = {}254 self.messages = []255 super(host_mod, self).__init__()256 self.parser.add_option('-l', '--lock',257 help='Lock hosts',258 action='store_true')259 self.parser.add_option('-u', '--unlock',260 help='Unlock hosts',261 action='store_true')262 self.parser.add_option('-p', '--protection', type='choice',263 help=('Set the protection level on a host. '264 'Must be one of: %s' %265 ', '.join('"%s"' % p266 for p in self.protections)),267 choices=self.protections)268 def parse(self):269 """Consume the specific options"""270 (options, leftover) = super(host_mod, self).parse()271 self._parse_lock_options(options)272 if options.protection:273 self.data['protection'] = options.protection274 self.messages.append('Protection set to "%s"' % options.protection)275 if len(self.data) == 0:276 self.invalid_syntax('No modification requested')277 return (options, leftover)278 def execute(self):279 successes = []280 for host in self.hosts:281 try:282 res = self.execute_rpc('modify_host', item=host,283 id=host, **self.data)284 # TODO: Make the AFE return True or False,285 # especially for lock286 successes.append(host)287 except topic_common.CliError, full_error:288 # Already logged by execute_rpc()289 pass290 return successes291 def output(self, hosts):292 for msg in self.messages:293 self.print_wrapped(msg, hosts)294class host_create(host):295 """atest host create [--lock|--unlock --platform <arch>296 --labels <labels>|--blist <label_file>297 --acls <acls>|--alist <acl_file>298 --protection <protection_type>299 --mlist <mach_file>] <hosts>"""300 usage_action = 'create'301 def __init__(self):302 self.messages = []303 super(host_create, self).__init__()304 self.parser.add_option('-l', '--lock',305 help='Create the hosts as locked',306 action='store_true', default=False)307 self.parser.add_option('-u', '--unlock',308 help='Create the hosts as '309 'unlocked (default)',310 action='store_true')311 self.parser.add_option('-t', '--platform',312 help='Sets the platform label')313 self.parser.add_option('-b', '--labels',314 help='Comma separated list of labels')315 self.parser.add_option('-B', '--blist',316 help='File listing the labels',317 type='string',318 metavar='LABEL_FLIST')319 self.parser.add_option('-a', '--acls',320 help='Comma separated list of ACLs')321 self.parser.add_option('-A', '--alist',322 help='File listing the acls',323 type='string',324 metavar='ACL_FLIST')325 self.parser.add_option('-p', '--protection', type='choice',326 help=('Set the protection level on a host. '327 'Must be one of: %s' %328 ', '.join('"%s"' % p329 for p in self.protections)),330 choices=self.protections)331 def parse(self):332 label_info = topic_common.item_parse_info(attribute_name='labels',333 inline_option='labels',334 filename_option='blist')335 acl_info = topic_common.item_parse_info(attribute_name='acls',336 inline_option='acls',337 filename_option='alist')338 (options, leftover) = super(host_create, self).parse([label_info,339 acl_info],340 req_items='hosts')341 self._parse_lock_options(options)342 self.locked = options.lock343 self.platform = getattr(options, 'platform', None)344 if options.protection:345 self.data['protection'] = options.protection346 return (options, leftover)347 def _execute_add_one_host(self, host):348 # Always add the hosts as locked to avoid the host349 # being picked up by the scheduler before it's ACL'ed350 self.data['locked'] = True351 self.execute_rpc('add_host', hostname=host,352 status="Ready", **self.data)353 # Now add the platform label354 labels = self.labels[:]355 if self.platform:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!