Best Python code snippet using autotest_python
models.py
Source:models.py
...510 """ % id_list)511 all_job_counts = dict((job_id, {}) for job_id in job_ids)512 for job_id, status, aborted, complete, count in cursor.fetchall():513 job_dict = all_job_counts[job_id]514 full_status = HostQueueEntry.compute_full_status(status, aborted,515 complete)516 job_dict.setdefault(full_status, 0)517 job_dict[full_status] += count518 return all_job_counts519class Job(dbmodels.Model, model_logic.ModelExtensions):520 """\521 owner: username of job owner522 name: job name (does not have to be unique)523 priority: Low, Medium, High, Urgent (or 0-3)524 control_file: contents of control file525 control_type: Client or Server526 created_on: date of job creation527 submitted_on: date of job submission528 synch_count: how many hosts should be used per autoserv execution529 run_verify: Whether or not to run the verify phase530 timeout: hours from queuing time until job times out531 max_runtime_hrs: hours from job starting time until job times out532 email_list: list of people to email on completion delimited by any of:533 white space, ',', ':', ';'534 dependency_labels: many-to-many relationship with labels corresponding to535 job dependencies536 reboot_before: Never, If dirty, or Always537 reboot_after: Never, If all tests passed, or Always538 parse_failed_repair: if True, a failed repair launched by this job will have539 its results parsed as part of the job.540 """541 DEFAULT_TIMEOUT = global_config.global_config.get_config_value(542 'AUTOTEST_WEB', 'job_timeout_default', default=240)543 DEFAULT_MAX_RUNTIME_HRS = global_config.global_config.get_config_value(544 'AUTOTEST_WEB', 'job_max_runtime_hrs_default', default=72)545 DEFAULT_PARSE_FAILED_REPAIR = global_config.global_config.get_config_value(546 'AUTOTEST_WEB', 'parse_failed_repair_default', type=bool,547 default=False)548 Priority = enum.Enum('Low', 'Medium', 'High', 'Urgent')549 ControlType = enum.Enum('Server', 'Client', start_value=1)550 owner = dbmodels.CharField(maxlength=255)551 name = 
dbmodels.CharField(maxlength=255)552 priority = dbmodels.SmallIntegerField(choices=Priority.choices(),553 blank=True, # to allow 0554 default=Priority.MEDIUM)555 control_file = dbmodels.TextField()556 control_type = dbmodels.SmallIntegerField(choices=ControlType.choices(),557 blank=True, # to allow 0558 default=ControlType.CLIENT)559 created_on = dbmodels.DateTimeField()560 synch_count = dbmodels.IntegerField(null=True, default=1)561 timeout = dbmodels.IntegerField(default=DEFAULT_TIMEOUT)562 run_verify = dbmodels.BooleanField(default=True)563 email_list = dbmodels.CharField(maxlength=250, blank=True)564 dependency_labels = dbmodels.ManyToManyField(565 Label, blank=True, filter_interface=dbmodels.HORIZONTAL)566 reboot_before = dbmodels.SmallIntegerField(choices=RebootBefore.choices(),567 blank=True,568 default=DEFAULT_REBOOT_BEFORE)569 reboot_after = dbmodels.SmallIntegerField(choices=RebootAfter.choices(),570 blank=True,571 default=DEFAULT_REBOOT_AFTER)572 parse_failed_repair = dbmodels.BooleanField(573 default=DEFAULT_PARSE_FAILED_REPAIR)574 max_runtime_hrs = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_HRS)575 # custom manager576 objects = JobManager()577 def is_server_job(self):578 return self.control_type == self.ControlType.SERVER579 @classmethod580 def create(cls, owner, options, hosts):581 """\582 Creates a job by taking some information (the listed args)583 and filling in the rest of the necessary information.584 """585 AclGroup.check_for_acl_violation_hosts(hosts)586 job = cls.add_object(587 owner=owner,588 name=options['name'],589 priority=options['priority'],590 control_file=options['control_file'],591 control_type=options['control_type'],592 synch_count=options.get('synch_count'),593 timeout=options.get('timeout'),594 max_runtime_hrs=options.get('max_runtime_hrs'),595 run_verify=options.get('run_verify'),596 email_list=options.get('email_list'),597 reboot_before=options.get('reboot_before'),598 reboot_after=options.get('reboot_after'),599 
parse_failed_repair=options.get('parse_failed_repair'),600 created_on=datetime.now())601 job.dependency_labels = options['dependencies']602 return job603 def queue(self, hosts, atomic_group=None, is_template=False):604 """Enqueue a job on the given hosts."""605 if atomic_group and not hosts:606 # No hosts or labels are required to queue an atomic group607 # Job. However, if they are given, we respect them below.608 atomic_group.enqueue_job(self, is_template=is_template)609 for host in hosts:610 host.enqueue_job(self, atomic_group=atomic_group,611 is_template=is_template)612 def create_recurring_job(self, start_date, loop_period, loop_count, owner):613 rec = RecurringRun(job=self, start_date=start_date,614 loop_period=loop_period,615 loop_count=loop_count,616 owner=User.objects.get(login=owner))617 rec.save()618 return rec.id619 def user(self):620 try:621 return User.objects.get(login=self.owner)622 except self.DoesNotExist:623 return None624 def abort(self, aborted_by):625 for queue_entry in self.hostqueueentry_set.all():626 queue_entry.abort(aborted_by)627 class Meta:628 db_table = 'jobs'629 if settings.FULL_ADMIN:630 class Admin:631 list_display = ('id', 'owner', 'name', 'control_type')632 def __str__(self):633 return '%s (%s-%s)' % (self.name, self.id, self.owner)634class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions):635 job = dbmodels.ForeignKey(Job)636 host = dbmodels.ForeignKey(Host)637 objects = model_logic.ExtendedManager()638 class Meta:639 db_table = 'ineligible_host_queues'640 if settings.FULL_ADMIN:641 class Admin:642 list_display = ('id', 'job', 'host')643class HostQueueEntry(dbmodels.Model, model_logic.ModelExtensions):644 Status = enum.Enum('Queued', 'Starting', 'Verifying', 'Pending', 'Running',645 'Gathering', 'Parsing', 'Aborted', 'Completed',646 'Failed', 'Stopped', 'Template', string_values=True)647 ACTIVE_STATUSES = (Status.STARTING, Status.VERIFYING, Status.PENDING,648 Status.RUNNING, Status.GATHERING)649 COMPLETE_STATUSES = 
(Status.ABORTED, Status.COMPLETED, Status.FAILED,650 Status.STOPPED, Status.TEMPLATE)651 job = dbmodels.ForeignKey(Job)652 host = dbmodels.ForeignKey(Host, blank=True, null=True)653 status = dbmodels.CharField(maxlength=255)654 meta_host = dbmodels.ForeignKey(Label, blank=True, null=True,655 db_column='meta_host')656 active = dbmodels.BooleanField(default=False)657 complete = dbmodels.BooleanField(default=False)658 deleted = dbmodels.BooleanField(default=False)659 execution_subdir = dbmodels.CharField(maxlength=255, blank=True, default='')660 # If atomic_group is set, this is a virtual HostQueueEntry that will661 # be expanded into many actual hosts within the group at schedule time.662 atomic_group = dbmodels.ForeignKey(AtomicGroup, blank=True, null=True)663 aborted = dbmodels.BooleanField(default=False)664 started_on = dbmodels.DateTimeField(null=True)665 objects = model_logic.ExtendedManager()666 def __init__(self, *args, **kwargs):667 super(HostQueueEntry, self).__init__(*args, **kwargs)668 self._record_attributes(['status'])669 @classmethod670 def create(cls, job, host=None, meta_host=None, atomic_group=None,671 is_template=False):672 if is_template:673 status = cls.Status.TEMPLATE674 else:675 status = cls.Status.QUEUED676 return cls(job=job, host=host, meta_host=meta_host,677 atomic_group=atomic_group, status=status)678 def save(self):679 self._set_active_and_complete()680 super(HostQueueEntry, self).save()681 self._check_for_updated_attributes()682 def execution_path(self):683 """684 Path to this entry's results (relative to the base results directory).685 """686 return self.execution_subdir687 def host_or_metahost_name(self):688 if self.host:689 return self.host.hostname690 else:691 assert self.meta_host692 return self.meta_host.name693 def _set_active_and_complete(self):694 if self.status in self.ACTIVE_STATUSES:695 self.active, self.complete = True, False696 elif self.status in self.COMPLETE_STATUSES:697 self.active, self.complete = False, True698 
else:699 self.active, self.complete = False, False700 def on_attribute_changed(self, attribute, old_value):701 assert attribute == 'status'702 logging.info('%s/%d (%d) -> %s' % (self.host, self.job.id, self.id,703 self.status))704 def is_meta_host_entry(self):705 'True if this is a entry has a meta_host instead of a host.'706 return self.host is None and self.meta_host is not None707 def log_abort(self, user):708 if user is None:709 # automatic system abort (i.e. job timeout)710 return711 abort_log = AbortedHostQueueEntry(queue_entry=self, aborted_by=user)712 abort_log.save()713 def abort(self, user):714 # this isn't completely immune to race conditions since it's not atomic,715 # but it should be safe given the scheduler's behavior.716 if not self.complete and not self.aborted:717 self.log_abort(user)718 self.aborted = True719 self.save()720 @classmethod721 def compute_full_status(cls, status, aborted, complete):722 if aborted and not complete:723 return 'Aborted (%s)' % status724 return status725 def full_status(self):726 return self.compute_full_status(self.status, self.aborted,727 self.complete)728 def _postprocess_object_dict(self, object_dict):729 object_dict['full_status'] = self.full_status()730 class Meta:731 db_table = 'host_queue_entries'732 if settings.FULL_ADMIN:733 class Admin:734 list_display = ('id', 'job', 'host', 'status',735 'meta_host')736 def __str__(self):737 hostname = None738 if self.host:739 hostname = self.host.hostname740 return "%s/%d (%d)" % (hostname, self.job.id, self.id)...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!