Best Python code snippet using autotest_python
models.py
Source:models.py
...426 or entry.id in public_ids]427 if len(cannot_abort) == 0:428 return429 entry_names = ', '.join('%s-%s/%s' % (entry.job.id, entry.job.owner,430 entry.host_or_metahost_name())431 for entry in cannot_abort)432 raise AclAccessViolation('You cannot abort the following job entries: '433 + entry_names)434 def check_for_acl_violation_acl_group(self):435 user = thread_local.get_user()436 if user.is_superuser():437 return None438 if not user in self.users.all():439 raise AclAccessViolation("You do not have access to %s"440 % self.name)441 @staticmethod442 def on_host_membership_change():443 everyone = AclGroup.objects.get(name='Everyone')444 # find hosts that aren't in any ACL group and add them to Everyone445 # TODO(showard): this is a bit of a hack, since the fact that this query446 # works is kind of a coincidence of Django internals. This trick447 # doesn't work in general (on all foreign key relationships). I'll448 # replace it with a better technique when the need arises.449 orphaned_hosts = Host.valid_objects.filter(aclgroup__id__isnull=True)450 everyone.hosts.add(*orphaned_hosts.distinct())451 # find hosts in both Everyone and another ACL group, and remove them452 # from Everyone453 hosts_in_everyone = Host.valid_objects.filter_custom_join(454 '_everyone', aclgroup__name='Everyone')455 acled_hosts = hosts_in_everyone.exclude(aclgroup__name='Everyone')456 everyone.hosts.remove(*acled_hosts.distinct())457 def delete(self):458 if (self.name == 'Everyone'):459 raise AclAccessViolation("You cannot delete 'Everyone'!")460 self.check_for_acl_violation_acl_group()461 super(AclGroup, self).delete()462 self.on_host_membership_change()463 def add_current_user_if_empty(self):464 if not self.users.count():465 self.users.add(thread_local.get_user())466 # if you have a model attribute called "Manipulator", Django will467 # automatically insert it into the beginning of the superclass list468 # for the model's manipulators469 class Manipulator(object):470 """471 Custom 
manipulator to get notification when ACLs are changed through472 the admin interface.473 """474 def save(self, new_data):475 user = thread_local.get_user()476 if hasattr(self, 'original_object'):477 if (not user.is_superuser()478 and self.original_object.name == 'Everyone'):479 raise AclAccessViolation("You cannot modify 'Everyone'!")480 self.original_object.check_for_acl_violation_acl_group()481 obj = super(AclGroup.Manipulator, self).save(new_data)482 if not hasattr(self, 'original_object'):483 obj.users.add(thread_local.get_user())484 obj.add_current_user_if_empty()485 obj.on_host_membership_change()486 return obj487 class Meta:488 db_table = 'acl_groups'489 class Admin:490 list_display = ('name', 'description')491 search_fields = ('name',)492 def __str__(self):493 return self.name494class JobManager(model_logic.ExtendedManager):495 'Custom manager to provide efficient status counts querying.'496 def get_status_counts(self, job_ids):497 """\498 Returns a dictionary mapping the given job IDs to their status499 count dictionaries.500 """501 if not job_ids:502 return {}503 id_list = '(%s)' % ','.join(str(job_id) for job_id in job_ids)504 cursor = connection.cursor()505 cursor.execute("""506 SELECT job_id, status, aborted, complete, COUNT(*)507 FROM host_queue_entries508 WHERE job_id IN %s509 GROUP BY job_id, status, aborted, complete510 """ % id_list)511 all_job_counts = dict((job_id, {}) for job_id in job_ids)512 for job_id, status, aborted, complete, count in cursor.fetchall():513 job_dict = all_job_counts[job_id]514 full_status = HostQueueEntry.compute_full_status(status, aborted,515 complete)516 job_dict.setdefault(full_status, 0)517 job_dict[full_status] += count518 return all_job_counts519class Job(dbmodels.Model, model_logic.ModelExtensions):520 """\521 owner: username of job owner522 name: job name (does not have to be unique)523 priority: Low, Medium, High, Urgent (or 0-3)524 control_file: contents of control file525 control_type: Client or Server526 
created_on: date of job creation527 submitted_on: date of job submission528 synch_count: how many hosts should be used per autoserv execution529 run_verify: Whether or not to run the verify phase530 timeout: hours from queuing time until job times out531 max_runtime_hrs: hours from job starting time until job times out532 email_list: list of people to email on completion delimited by any of:533 white space, ',', ':', ';'534 dependency_labels: many-to-many relationship with labels corresponding to535 job dependencies536 reboot_before: Never, If dirty, or Always537 reboot_after: Never, If all tests passed, or Always538 parse_failed_repair: if True, a failed repair launched by this job will have539 its results parsed as part of the job.540 """541 DEFAULT_TIMEOUT = global_config.global_config.get_config_value(542 'AUTOTEST_WEB', 'job_timeout_default', default=240)543 DEFAULT_MAX_RUNTIME_HRS = global_config.global_config.get_config_value(544 'AUTOTEST_WEB', 'job_max_runtime_hrs_default', default=72)545 DEFAULT_PARSE_FAILED_REPAIR = global_config.global_config.get_config_value(546 'AUTOTEST_WEB', 'parse_failed_repair_default', type=bool,547 default=False)548 Priority = enum.Enum('Low', 'Medium', 'High', 'Urgent')549 ControlType = enum.Enum('Server', 'Client', start_value=1)550 owner = dbmodels.CharField(maxlength=255)551 name = dbmodels.CharField(maxlength=255)552 priority = dbmodels.SmallIntegerField(choices=Priority.choices(),553 blank=True, # to allow 0554 default=Priority.MEDIUM)555 control_file = dbmodels.TextField()556 control_type = dbmodels.SmallIntegerField(choices=ControlType.choices(),557 blank=True, # to allow 0558 default=ControlType.CLIENT)559 created_on = dbmodels.DateTimeField()560 synch_count = dbmodels.IntegerField(null=True, default=1)561 timeout = dbmodels.IntegerField(default=DEFAULT_TIMEOUT)562 run_verify = dbmodels.BooleanField(default=True)563 email_list = dbmodels.CharField(maxlength=250, blank=True)564 dependency_labels = 
dbmodels.ManyToManyField(565 Label, blank=True, filter_interface=dbmodels.HORIZONTAL)566 reboot_before = dbmodels.SmallIntegerField(choices=RebootBefore.choices(),567 blank=True,568 default=DEFAULT_REBOOT_BEFORE)569 reboot_after = dbmodels.SmallIntegerField(choices=RebootAfter.choices(),570 blank=True,571 default=DEFAULT_REBOOT_AFTER)572 parse_failed_repair = dbmodels.BooleanField(573 default=DEFAULT_PARSE_FAILED_REPAIR)574 max_runtime_hrs = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_HRS)575 # custom manager576 objects = JobManager()577 def is_server_job(self):578 return self.control_type == self.ControlType.SERVER579 @classmethod580 def create(cls, owner, options, hosts):581 """\582 Creates a job by taking some information (the listed args)583 and filling in the rest of the necessary information.584 """585 AclGroup.check_for_acl_violation_hosts(hosts)586 job = cls.add_object(587 owner=owner,588 name=options['name'],589 priority=options['priority'],590 control_file=options['control_file'],591 control_type=options['control_type'],592 synch_count=options.get('synch_count'),593 timeout=options.get('timeout'),594 max_runtime_hrs=options.get('max_runtime_hrs'),595 run_verify=options.get('run_verify'),596 email_list=options.get('email_list'),597 reboot_before=options.get('reboot_before'),598 reboot_after=options.get('reboot_after'),599 parse_failed_repair=options.get('parse_failed_repair'),600 created_on=datetime.now())601 job.dependency_labels = options['dependencies']602 return job603 def queue(self, hosts, atomic_group=None, is_template=False):604 """Enqueue a job on the given hosts."""605 if atomic_group and not hosts:606 # No hosts or labels are required to queue an atomic group607 # Job. 
However, if they are given, we respect them below.608 atomic_group.enqueue_job(self, is_template=is_template)609 for host in hosts:610 host.enqueue_job(self, atomic_group=atomic_group,611 is_template=is_template)612 def create_recurring_job(self, start_date, loop_period, loop_count, owner):613 rec = RecurringRun(job=self, start_date=start_date,614 loop_period=loop_period,615 loop_count=loop_count,616 owner=User.objects.get(login=owner))617 rec.save()618 return rec.id619 def user(self):620 try:621 return User.objects.get(login=self.owner)622 except self.DoesNotExist:623 return None624 def abort(self, aborted_by):625 for queue_entry in self.hostqueueentry_set.all():626 queue_entry.abort(aborted_by)627 class Meta:628 db_table = 'jobs'629 if settings.FULL_ADMIN:630 class Admin:631 list_display = ('id', 'owner', 'name', 'control_type')632 def __str__(self):633 return '%s (%s-%s)' % (self.name, self.id, self.owner)634class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions):635 job = dbmodels.ForeignKey(Job)636 host = dbmodels.ForeignKey(Host)637 objects = model_logic.ExtendedManager()638 class Meta:639 db_table = 'ineligible_host_queues'640 if settings.FULL_ADMIN:641 class Admin:642 list_display = ('id', 'job', 'host')643class HostQueueEntry(dbmodels.Model, model_logic.ModelExtensions):644 Status = enum.Enum('Queued', 'Starting', 'Verifying', 'Pending', 'Running',645 'Gathering', 'Parsing', 'Aborted', 'Completed',646 'Failed', 'Stopped', 'Template', string_values=True)647 ACTIVE_STATUSES = (Status.STARTING, Status.VERIFYING, Status.PENDING,648 Status.RUNNING, Status.GATHERING)649 COMPLETE_STATUSES = (Status.ABORTED, Status.COMPLETED, Status.FAILED,650 Status.STOPPED, Status.TEMPLATE)651 job = dbmodels.ForeignKey(Job)652 host = dbmodels.ForeignKey(Host, blank=True, null=True)653 status = dbmodels.CharField(maxlength=255)654 meta_host = dbmodels.ForeignKey(Label, blank=True, null=True,655 db_column='meta_host')656 active = 
dbmodels.BooleanField(default=False)657 complete = dbmodels.BooleanField(default=False)658 deleted = dbmodels.BooleanField(default=False)659 execution_subdir = dbmodels.CharField(maxlength=255, blank=True, default='')660 # If atomic_group is set, this is a virtual HostQueueEntry that will661 # be expanded into many actual hosts within the group at schedule time.662 atomic_group = dbmodels.ForeignKey(AtomicGroup, blank=True, null=True)663 aborted = dbmodels.BooleanField(default=False)664 started_on = dbmodels.DateTimeField(null=True)665 objects = model_logic.ExtendedManager()666 def __init__(self, *args, **kwargs):667 super(HostQueueEntry, self).__init__(*args, **kwargs)668 self._record_attributes(['status'])669 @classmethod670 def create(cls, job, host=None, meta_host=None, atomic_group=None,671 is_template=False):672 if is_template:673 status = cls.Status.TEMPLATE674 else:675 status = cls.Status.QUEUED676 return cls(job=job, host=host, meta_host=meta_host,677 atomic_group=atomic_group, status=status)678 def save(self):679 self._set_active_and_complete()680 super(HostQueueEntry, self).save()681 self._check_for_updated_attributes()682 def execution_path(self):683 """684 Path to this entry's results (relative to the base results directory).685 """686 return self.execution_subdir687 def host_or_metahost_name(self):688 if self.host:689 return self.host.hostname690 else:691 assert self.meta_host692 return self.meta_host.name693 def _set_active_and_complete(self):694 if self.status in self.ACTIVE_STATUSES:695 self.active, self.complete = True, False696 elif self.status in self.COMPLETE_STATUSES:697 self.active, self.complete = False, True698 else:699 self.active, self.complete = False, False700 def on_attribute_changed(self, attribute, old_value):701 assert attribute == 'status'...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!