Best Python code snippet using autotest_python
resources.py
Source:resources.py
...48class Label(EntryWithInvalid):49 model = models.Label50 @classmethod51 def add_query_selectors(cls, query_processor):52 query_processor.add_field_selector('name')53 query_processor.add_field_selector(54 'is_platform', field='platform',55 value_transform=query_processor.read_boolean)56 @classmethod57 def from_uri_args(cls, request, label_name, **kwargs):58 return cls(request, models.Label.objects.get(name=label_name))59 def _uri_args(self):60 return {'label_name': self.instance.name}61 def short_representation(self):62 rep = super(Label, self).short_representation()63 rep.update({'name': self.instance.name,64 'is_platform': bool(self.instance.platform)})65 return rep66 def full_representation(self):67 rep = super(Label, self).full_representation()68 atomic_group_class = AtomicGroupClass.from_optional_instance(69 self._request, self.instance.atomic_group)70 rep.update({'atomic_group_class':71 atomic_group_class.short_representation(),72 'hosts': HostLabelingCollection(fixed_entry=self).link()})73 return rep74 @classmethod75 def create_instance(cls, input_dict, containing_collection):76 cls._check_for_required_fields(input_dict, ('name',))77 return models.Label.add_object(name=input_dict['name'])78 def update(self, input_dict):79 # TODO update atomic group80 if 'is_platform' in input_dict:81 self.instance.platform = input_dict['is_platform']82 self.instance.save()83class LabelCollection(resource_lib.Collection):84 queryset = models.Label.valid_objects.all()85 entry_class = Label86class AtomicLabelTagging(resource_lib.Relationship):87 related_classes = {'label': Label, 'atomic_group_class': AtomicGroupClass}88class AtomicLabelTaggingCollection(resource_lib.RelationshipCollection):89 entry_class = AtomicLabelTagging90class User(resource_lib.InstanceEntry):91 model = models.User92 _permitted_methods = ('GET,')93 @classmethod94 def from_uri_args(cls, request, username, **kwargs):95 if username == '@me':96 username = models.User.current_user().login97 return 
cls(request, models.User.objects.get(login=username))98 def _uri_args(self):99 return {'username': self.instance.login}100 def short_representation(self):101 rep = super(User, self).short_representation()102 rep['username'] = self.instance.login103 return rep104 def full_representation(self):105 rep = super(User, self).full_representation()106 accessible_hosts = HostCollection(self._request)107 accessible_hosts.set_query_parameters(accessible_by=self.instance.login)108 rep.update({'jobs': 'TODO',109 'recurring_runs': 'TODO',110 'acls':111 UserAclMembershipCollection(fixed_entry=self).link(),112 'accessible_hosts': accessible_hosts.link()})113 return rep114class UserCollection(resource_lib.Collection):115 _permitted_methods = ('GET',)116 queryset = models.User.objects.all()117 entry_class = User118class Acl(resource_lib.InstanceEntry):119 _permitted_methods = ('GET',)120 model = models.AclGroup121 @classmethod122 def from_uri_args(cls, request, acl_name, **kwargs):123 return cls(request, models.AclGroup.objects.get(name=acl_name))124 def _uri_args(self):125 return {'acl_name': self.instance.name}126 def short_representation(self):127 rep = super(Acl, self).short_representation()128 rep['name'] = self.instance.name129 return rep130 def full_representation(self):131 rep = super(Acl, self).full_representation()132 rep.update({'users':133 UserAclMembershipCollection(fixed_entry=self).link(),134 'hosts':135 HostAclMembershipCollection(fixed_entry=self).link()})136 return rep137 @classmethod138 def create_instance(cls, input_dict, containing_collection):139 cls._check_for_required_fields(input_dict, ('name',))140 return models.AclGroup.add_object(name=input_dict['name'])141 def update(self, input_dict):142 pass143class AclCollection(resource_lib.Collection):144 queryset = models.AclGroup.objects.all()145 entry_class = Acl146class UserAclMembership(resource_lib.Relationship):147 related_classes = {'user': User, 'acl': Acl}148 # TODO: check permissions149 # TODO: check for 
and add/remove "Everyone"150class UserAclMembershipCollection(resource_lib.RelationshipCollection):151 entry_class = UserAclMembership152class Host(EntryWithInvalid):153 model = models.Host154 @classmethod155 def add_query_selectors(cls, query_processor):156 query_processor.add_field_selector('hostname')157 query_processor.add_field_selector(158 'locked', value_transform=query_processor.read_boolean)159 query_processor.add_field_selector(160 'locked_by', field='locked_by__login',161 doc='Username of user who locked this host, if locked')162 query_processor.add_field_selector('status')163 query_processor.add_field_selector(164 'protection_level', field='protection',165 doc='Verify/repair protection level',166 value_transform=cls._read_protection)167 query_processor.add_field_selector(168 'accessible_by', field='aclgroup__users__login',169 doc='Username of user with access to this host')170 query_processor.add_related_existence_selector(171 'has_label', models.Label, 'name')172 @classmethod173 def _read_protection(cls, protection_input):174 return host_protections.Protection.get_value(protection_input)175 @classmethod176 def from_uri_args(cls, request, hostname, **kwargs):177 return cls(request, models.Host.objects.get(hostname=hostname))178 def _uri_args(self):179 return {'hostname': self.instance.hostname}180 def short_representation(self):181 rep = super(Host, self).short_representation()182 # TODO calling platform() over and over is inefficient183 platform_rep = (Label.from_optional_instance(self._request,184 self.instance.platform())185 .short_representation())186 rep.update({'hostname': self.instance.hostname,187 'locked': bool(self.instance.locked),188 'status': self.instance.status,189 'platform': platform_rep})190 return rep191 def full_representation(self):192 rep = super(Host, self).full_representation()193 protection = host_protections.Protection.get_string(194 self.instance.protection)195 locked_by = (User.from_optional_instance(self._request,196 
self.instance.locked_by)197 .short_representation())198 labels = HostLabelingCollection(fixed_entry=self)199 acls = HostAclMembershipCollection(fixed_entry=self)200 queue_entries = QueueEntryCollection(self._request)201 queue_entries.set_query_parameters(host=self.instance.hostname)202 health_tasks = HealthTaskCollection(self._request)203 health_tasks.set_query_parameters(host=self.instance.hostname)204 rep.update({'locked_by': locked_by,205 'locked_on': self._format_datetime(self.instance.lock_time),206 'invalid': self.instance.invalid,207 'protection_level': protection,208 # TODO make these efficient209 'labels': labels.full_representation(),210 'acls': acls.full_representation(),211 'queue_entries': queue_entries.link(),212 'health_tasks': health_tasks.link()})213 return rep214 @classmethod215 def create_instance(cls, input_dict, containing_collection):216 cls._check_for_required_fields(input_dict, ('hostname',))217 # include locked here, rather than waiting for update(), to avoid race218 # conditions219 host = models.Host.add_object(hostname=input_dict['hostname'],220 locked=input_dict.get('locked', False))221 return host222 def update(self, input_dict):223 data = {'locked': input_dict.get('locked'),224 'protection': input_dict.get('protection_level')}225 data = input_dict.remove_unspecified_fields(data)226 if 'protection' in data:227 data['protection'] = self._read_protection(data['protection'])228 self.instance.update_object(**data)229 if 'platform' in input_dict:230 label = self.resolve_link(input_dict['platform']) .instance231 if not label.platform:232 raise exceptions.BadRequest('Label %s is not a platform' % label.name)233 for label in self.instance.labels.filter(platform=True):234 self.instance.labels.remove(label)235 self.instance.labels.add(label)236class HostCollection(resource_lib.Collection):237 queryset = models.Host.valid_objects.all()238 entry_class = Host239class HostLabeling(resource_lib.Relationship):240 related_classes = {'host': Host, 
'label': Label}241class HostLabelingCollection(resource_lib.RelationshipCollection):242 entry_class = HostLabeling243class HostAclMembership(resource_lib.Relationship):244 related_classes = {'host': Host, 'acl': Acl}245 # TODO: acl.check_for_acl_violation_acl_group()246 # TODO: models.AclGroup.on_host_membership_change()247class HostAclMembershipCollection(resource_lib.RelationshipCollection):248 entry_class = HostAclMembership249class Test(resource_lib.InstanceEntry):250 model = models.Test251 @classmethod252 def add_query_selectors(cls, query_processor):253 query_processor.add_field_selector('name')254 @classmethod255 def from_uri_args(cls, request, test_name, **kwargs):256 return cls(request, models.Test.objects.get(name=test_name))257 def _uri_args(self):258 return {'test_name': self.instance.name}259 def short_representation(self):260 rep = super(Test, self).short_representation()261 rep['name'] = self.instance.name262 return rep263 def full_representation(self):264 rep = super(Test, self).full_representation()265 rep.update({'author': self.instance.author,266 'class': self.instance.test_class,267 'control_file_type':268 control_data.CONTROL_TYPE.get_string(269 self.instance.test_type),270 'control_file_path': self.instance.path,271 'sync_count': self.instance.sync_count,272 'dependencies':273 TestDependencyCollection(fixed_entry=self).link(),274 })275 return rep276 @classmethod277 def create_instance(cls, input_dict, containing_collection):278 cls._check_for_required_fields(input_dict,279 ('name', 'control_file_type',280 'control_file_path'))281 test_type = control_data.CONTROL_TYPE.get_value(282 input['control_file_type'])283 return models.Test.add_object(name=input_dict['name'],284 test_type=test_type,285 path=input_dict['control_file_path'])286 def update(self, input_dict):287 data = {'test_type': input_dict.get('control_file_type'),288 'path': input_dict.get('control_file_path'),289 'class': input_dict.get('class'),290 }291 data = 
input_dict.remove_unspecified_fields(data)292 self.instance.update_object(**data)293class TestCollection(resource_lib.Collection):294 queryset = models.Test.objects.all()295 entry_class = Test296class TestDependency(resource_lib.Relationship):297 related_classes = {'test': Test, 'label': Label}298class TestDependencyCollection(resource_lib.RelationshipCollection):299 entry_class = TestDependency300# TODO profilers301class ExecutionInfo(resource_lib.Resource):302 _permitted_methods = ('GET','POST')303 _job_fields = models.Job.get_field_dict()304 _DEFAULTS = {305 'control_file': '',306 'is_server': True,307 'dependencies': [],308 'machines_per_execution': 1,309 'run_verify': bool(_job_fields['run_verify'].default),310 'run_reset': bool(_job_fields['run_reset'].default),311 'timeout_mins': _job_fields['timeout_mins'].default,312 'maximum_runtime_mins': _job_fields['max_runtime_mins'].default,313 'cleanup_before_job':314 model_attributes.RebootBefore.get_string(315 models.DEFAULT_REBOOT_BEFORE),316 'cleanup_after_job':317 model_attributes.RebootAfter.get_string(318 models.DEFAULT_REBOOT_AFTER),319 }320 def _query_parameters_accepted(self):321 return (('tests', 'Comma-separated list of test names to run'),322 ('kernels', 'TODO'),323 ('client_control_file',324 'Client control file segment to run after all specified '325 'tests'),326 ('profilers',327 'Comma-separated list of profilers to activate during the '328 'job'),329 ('use_container', 'TODO'),330 ('profile_only',331 'If true, run only profiled iterations; otherwise, always run '332 'at least one non-profiled iteration in addition to a '333 'profiled iteration'),334 ('upload_kernel_config',335 'If true, generate a server control file code that uploads '336 'the kernel config file to the client and tells the client of '337 'the new (local) path when compiling the kernel; the tests '338 'must be server side tests'))339 @classmethod340 def execution_info_from_job(cls, job):341 return {'control_file': 
job.control_file,342 'is_server': 343 job.control_type == control_data.CONTROL_TYPE.SERVER,344 'dependencies': [label.name for label345 in job.dependency_labels.all()],346 'machines_per_execution': job.synch_count,347 'run_verify': bool(job.run_verify),348 'run_reset': bool(job.run_reset),349 'timeout_mins': job.timeout_mins,350 'maximum_runtime_mins': job.max_runtime_mins,351 'cleanup_before_job':352 model_attributes.RebootBefore.get_string(job.reboot_before),353 'cleanup_after_job':354 model_attributes.RebootAfter.get_string(job.reboot_after),355 }356 def _get_execution_info(self, input_dict):357 tests = input_dict.get('tests', '')358 client_control_file = input_dict.get('client_control_file', None)359 if not tests and not client_control_file:360 return self._DEFAULTS361 test_list = tests.split(',')362 if 'profilers' in input_dict:363 profilers_list = input_dict['profilers'].split(',')364 else:365 profilers_list = []366 kernels = input_dict.get('kernels', '') # TODO367 if kernels:368 kernels = [dict(version=kernel) for kernel in kernels.split(',')]369 cf_info, test_objects, profiler_objects, label = (370 rpc_utils.prepare_generate_control_file(371 test_list, kernels, None, profilers_list))372 control_file_contents = control_file.generate_control(373 tests=test_objects, kernels=kernels,374 profilers=profiler_objects, is_server=cf_info['is_server'],375 client_control_file=client_control_file,376 profile_only=input_dict.get('profile_only', None),377 upload_kernel_config=input_dict.get(378 'upload_kernel_config', None))379 return dict(self._DEFAULTS,380 control_file=control_file_contents,381 is_server=cf_info['is_server'],382 dependencies=cf_info['dependencies'],383 machines_per_execution=cf_info['synch_count'])384 def handle_request(self):385 result = self.link()386 result['execution_info'] = self._get_execution_info(387 self._request.REQUEST)388 return self._basic_response(result)389class QueueEntriesRequest(resource_lib.Resource):390 _permitted_methods = 
('GET',)391 def _query_parameters_accepted(self):392 return (('hosts', 'Comma-separated list of hostnames'),393 ('one_time_hosts',394 'Comma-separated list of hostnames not already in the '395 'Autotest system'),396 ('meta_hosts',397 'Comma-separated list of label names; for each one, an entry '398 'will be created and assigned at runtime to an available host '399 'with that label'),400 ('atomic_group_class', 'TODO'))401 def _read_list(self, list_string):402 if list_string:403 return list_string.split(',')404 return []405 def handle_request(self):406 request_dict = self._request.REQUEST407 hosts = self._read_list(request_dict.get('hosts'))408 one_time_hosts = self._read_list(request_dict.get('one_time_hosts'))409 meta_hosts = self._read_list(request_dict.get('meta_hosts'))410 atomic_group_class = request_dict.get('atomic_group_class')411 # TODO: bring in all the atomic groups magic from create_job()412 entries = []413 for hostname in one_time_hosts:414 models.Host.create_one_time_host(hostname)415 for hostname in hosts:416 entry = Host.from_uri_args(self._request, hostname)417 entries.append({'host': entry.link()})418 for label_name in meta_hosts:419 entry = Label.from_uri_args(self._request, label_name)420 entries.append({'meta_host': entry.link()})421 if atomic_group_class:422 entries.append({'atomic_group_class': atomic_group_class})423 result = self.link()424 result['queue_entries'] = entries425 return self._basic_response(result)426class Job(resource_lib.InstanceEntry):427 _permitted_methods = ('GET',)428 model = models.Job429 class _StatusConstraint(query_lib.Constraint):430 def apply_constraint(self, queryset, value, comparison_type,431 is_inverse):432 if comparison_type != 'equals' or is_inverse:433 raise query_lib.ConstraintError('Can only use this selector '434 'with equals')435 non_queued_statuses = [436 status for status, _437 in models.HostQueueEntry.Status.choices()438 if status != models.HostQueueEntry.Status.QUEUED]439 if value == 'queued':440 
return queryset.exclude(441 hostqueueentry__status__in=non_queued_statuses)442 elif value == 'active':443 return queryset.filter(444 hostqueueentry__status__in=non_queued_statuses).filter(445 hostqueueentry__complete=False).distinct()446 elif value == 'complete':447 return queryset.exclude(hostqueueentry__complete=False)448 else:449 raise query_lib.ConstraintError('Value must be one of queued, '450 'active or complete')451 @classmethod452 def add_query_selectors(cls, query_processor):453 query_processor.add_field_selector('id')454 query_processor.add_field_selector('name')455 query_processor.add_selector(456 query_lib.Selector('status',457 doc='One of queued, active or complete'),458 Job._StatusConstraint())459 query_processor.add_keyval_selector('has_keyval', models.JobKeyval,460 'key', 'value')461 @classmethod462 def from_uri_args(cls, request, job_id, **kwargs):463 return cls(request, models.Job.objects.get(id=job_id))464 def _uri_args(self):465 return {'job_id': self.instance.id}466 @classmethod467 def _do_prepare_for_full_representation(cls, instances):468 models.Job.objects.populate_relationships(instances, models.JobKeyval,469 'keyvals')470 def short_representation(self):471 rep = super(Job, self).short_representation()472 try:473 string_priority = priorities.Priority.get_string(474 self.instance.priority)475 except ValueError:476 string_priority = str(self.instance.priority)477 rep.update({'id': self.instance.id,478 'owner': self.instance.owner,479 'name': self.instance.name,480 'priority': string_priority,481 'created_on':482 self._format_datetime(self.instance.created_on),483 })484 return rep485 def full_representation(self):486 rep = super(Job, self).full_representation()487 queue_entries = QueueEntryCollection(self._request)488 queue_entries.set_query_parameters(job=self.instance.id)489 drone_set = self.instance.drone_set and self.instance.drone_set.name490 rep.update({'email_list': self.instance.email_list,491 'parse_failed_repair':492 
bool(self.instance.parse_failed_repair),493 'drone_set': drone_set,494 'execution_info':495 ExecutionInfo.execution_info_from_job(self.instance),496 'queue_entries': queue_entries.link(),497 'keyvals': dict((keyval.key, keyval.value)498 for keyval in self.instance.keyvals)499 })500 return rep501 @classmethod502 def create_instance(cls, input_dict, containing_collection):503 owner = input_dict.get('owner')504 if not owner:505 owner = models.User.current_user().login506 cls._check_for_required_fields(input_dict, ('name', 'execution_info',507 'queue_entries'))508 execution_info = input_dict['execution_info']509 cls._check_for_required_fields(execution_info, ('control_file',510 'is_server'))511 if execution_info['is_server']:512 control_type = control_data.CONTROL_TYPE.SERVER513 else:514 control_type = control_data.CONTROL_TYPE.CLIENT515 options = dict(516 name=input_dict['name'],517 priority=input_dict.get('priority', None),518 control_file=execution_info['control_file'],519 control_type=control_type,520 is_template=input_dict.get('is_template', None),521 timeout_mins=execution_info.get('timeout_mins'),522 max_runtime_mins=execution_info.get('maximum_runtime_mins'),523 synch_count=execution_info.get('machines_per_execution'),524 run_verify=execution_info.get('run_verify'),525 run_reset=execution_info.get('run_reset'),526 email_list=input_dict.get('email_list', None),527 dependencies=execution_info.get('dependencies', ()),528 reboot_before=execution_info.get('cleanup_before_job'),529 reboot_after=execution_info.get('cleanup_after_job'),530 parse_failed_repair=input_dict.get('parse_failed_repair', None),531 drone_set=input_dict.get('drone_set', None),532 keyvals=input_dict.get('keyvals', None))533 host_objects, metahost_label_objects, atomic_group = [], [], None534 for queue_entry in input_dict['queue_entries']:535 if 'host' in queue_entry:536 host = queue_entry['host']537 if host: # can be None, indicated a hostless job538 host_entry = 
containing_collection.resolve_link(host)539 host_objects.append(host_entry.instance)540 elif 'meta_host' in queue_entry:541 label_entry = containing_collection.resolve_link(542 queue_entry['meta_host'])543 metahost_label_objects.append(label_entry.instance)544 if 'atomic_group_class' in queue_entry:545 atomic_group_entry = containing_collection.resolve_link(546 queue_entry['atomic_group_class'])547 if atomic_group:548 assert atomic_group_entry.instance.id == atomic_group.id549 else:550 atomic_group = atomic_group_entry.instance551 job_id = rpc_utils.create_new_job(552 owner=owner,553 options=options,554 host_objects=host_objects,555 metahost_objects=metahost_label_objects,556 atomic_group=atomic_group)557 return models.Job.objects.get(id=job_id)558 def update(self, input_dict):559 # Required for POST, doesn't actually support PUT560 pass561class JobCollection(resource_lib.Collection):562 queryset = models.Job.objects.order_by('-id')563 entry_class = Job564class QueueEntry(resource_lib.InstanceEntry):565 _permitted_methods = ('GET', 'PUT')566 model = models.HostQueueEntry567 @classmethod568 def add_query_selectors(cls, query_processor):569 query_processor.add_field_selector('host', field='host__hostname')570 query_processor.add_field_selector('job', field='job__id')571 @classmethod572 def from_uri_args(cls, request, queue_entry_id):573 instance = models.HostQueueEntry.objects.get(id=queue_entry_id)574 return cls(request, instance)575 def _uri_args(self):576 return {'queue_entry_id': self.instance.id}577 def short_representation(self):578 rep = super(QueueEntry, self).short_representation()579 if self.instance.host:580 host = (Host(self._request, self.instance.host)581 .short_representation())582 else:583 host = None584 job = Job(self._request, self.instance.job)585 host = Host.from_optional_instance(self._request, self.instance.host)586 label = Label.from_optional_instance(self._request,587 self.instance.meta_host)588 atomic_group_class = 
AtomicGroupClass.from_optional_instance(589 self._request, self.instance.atomic_group)590 rep.update(591 {'job': job.short_representation(),592 'host': host.short_representation(),593 'label': label.short_representation(),594 'atomic_group_class':595 atomic_group_class.short_representation(),596 'status': self.instance.status,597 'execution_path': self.instance.execution_subdir,598 'started_on': self._format_datetime(self.instance.started_on),599 'aborted': bool(self.instance.aborted)})600 return rep601 def update(self, input_dict):602 if 'aborted' in input_dict:603 if input_dict['aborted'] != True:604 raise exceptions.BadRequest('"aborted" can only be set to true')605 query = models.HostQueueEntry.objects.filter(pk=self.instance.pk)606 models.AclGroup.check_abort_permissions(query)607 rpc_utils.check_abort_synchronous_jobs(query)608 self.instance.abort(thread_local.get_user())609class QueueEntryCollection(resource_lib.Collection):610 queryset = models.HostQueueEntry.objects.order_by('-id')611 entry_class = QueueEntry612class HealthTask(resource_lib.InstanceEntry):613 _permitted_methods = ('GET',)614 model = models.SpecialTask615 @classmethod616 def add_query_selectors(cls, query_processor):617 query_processor.add_field_selector('host', field='host__hostname')618 @classmethod619 def from_uri_args(cls, request, task_id):620 instance = models.SpecialTask.objects.get(id=task_id)621 return cls(request, instance)622 def _uri_args(self):623 return {'task_id': self.instance.id}624 def short_representation(self):625 rep = super(HealthTask, self).short_representation()626 host = Host(self._request, self.instance.host)627 queue_entry = QueueEntry.from_optional_instance(628 self._request, self.instance.queue_entry)629 rep.update(630 {'host': host.short_representation(),631 'task_type': self.instance.task,...
Learn to run automation tests from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!