Best Python code snippet using autotest_python
scheduler_models.py
Source:scheduler_models.py
...588 task=models.SpecialTask.Task.CLEANUP,589 host=models.Host.objects.get(id=self.host.id),590 requested_by=self.job.owner_model())591 self.set_status(Status.ABORTED)592 self.job.abort_delay_ready_task()593 def get_group_name(self):594 atomic_group = self.atomic_group595 if not atomic_group:596 return ''597 # Look at any meta_host and dependency labels and pick the first598 # one that also specifies this atomic group. Use that label name599 # as the group name if possible (it is more specific).600 for label in self.get_labels():601 if label.atomic_group_id:602 assert label.atomic_group_id == atomic_group.id603 return label.name604 return atomic_group.name605 def execution_tag(self):606 assert self.execution_subdir607 return "%s/%s" % (self.job.tag(), self.execution_subdir)608 def execution_path(self):609 return self.execution_tag()610 def set_started_on_now(self):611 self.update_field('started_on', datetime.datetime.now())612 def is_hostless(self):613 return (self.host_id is None614 and self.meta_host is None615 and self.atomic_group_id is None)616class Job(DBObject):617 _table_name = 'afe_jobs'618 _fields = ('id', 'owner', 'name', 'priority', 'control_file',619 'control_type', 'created_on', 'synch_count', 'timeout',620 'run_verify', 'email_list', 'reboot_before', 'reboot_after',621 'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',622 'parameterized_job_id')623 # This does not need to be a column in the DB. The delays are likely to624 # be configured short. If the scheduler is stopped and restarted in625 # the middle of a job's delay cycle, the delay cycle will either be626 # repeated or skipped depending on the number of Pending machines found627 # when the restarted scheduler recovers to track it. Not a problem.628 #629 # A reference to the DelayedCallTask that will wake up the job should630 # no other HQEs change state in time. 
Its end_time attribute is used631 # by our run_with_ready_delay() method to determine if the wait is over.632 _delay_ready_task = None633 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on634 # all status='Pending' atomic group HQEs incase a delay was running when the635 # scheduler was restarted and no more hosts ever successfully exit Verify.636 def __init__(self, id=None, row=None, **kwargs):637 assert id or row638 super(Job, self).__init__(id=id, row=row, **kwargs)639 self._owner_model = None # caches model instance of owner640 def model(self):641 return models.Job.objects.get(id=self.id)642 def owner_model(self):643 # work around the fact that the Job owner field is a string, not a644 # foreign key645 if not self._owner_model:646 self._owner_model = models.User.objects.get(login=self.owner)647 return self._owner_model648 def is_server_job(self):649 return self.control_type != 2650 def tag(self):651 return "%s-%s" % (self.id, self.owner)652 def get_host_queue_entries(self):653 rows = _db.execute("""654 SELECT * FROM afe_host_queue_entries655 WHERE job_id= %s656 """, (self.id,))657 entries = [HostQueueEntry(row=i) for i in rows]658 assert len(entries)>0659 return entries660 def get_execution_details(self):661 """662 Get test execution details for this job.663 @return: Dictionary with test execution details664 """665 def _find_test_jobs(rows):666 """667 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*668 Those are autotest 'internal job' tests, so they should not be669 counted when evaluating the test stats.670 @param rows: List of rows (matrix) with database results.671 """672 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')673 n_test_jobs = 0674 for r in rows:675 test_name = r[0]676 if job_test_pattern.match(test_name):677 n_test_jobs += 1678 return n_test_jobs679 stats = {}680 rows = _db.execute("""681 SELECT t.test, s.word, t.reason682 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s683 WHERE t.job_idx = 
j.job_idx684 AND s.status_idx = t.status685 AND j.afe_job_id = %s686 ORDER BY t.reason687 """ % self.id)688 failed_rows = [r for r in rows if not 'GOOD' in r]689 n_test_jobs = _find_test_jobs(rows)690 n_test_jobs_failed = _find_test_jobs(failed_rows)691 total_executed = len(rows) - n_test_jobs692 total_failed = len(failed_rows) - n_test_jobs_failed693 if total_executed > 0:694 success_rate = 100 - ((total_failed / float(total_executed)) * 100)695 else:696 success_rate = 0697 stats['total_executed'] = total_executed698 stats['total_failed'] = total_failed699 stats['total_passed'] = total_executed - total_failed700 stats['success_rate'] = success_rate701 status_header = ("Test Name", "Status", "Reason")702 if failed_rows:703 stats['failed_rows'] = utils.matrix_to_string(failed_rows,704 status_header)705 else:706 stats['failed_rows'] = ''707 time_row = _db.execute("""708 SELECT started_time, finished_time709 FROM tko_jobs710 WHERE afe_job_id = %s711 """ % self.id)712 if time_row:713 t_begin, t_end = time_row[0]714 try:715 delta = t_end - t_begin716 minutes, seconds = divmod(delta.seconds, 60)717 hours, minutes = divmod(minutes, 60)718 stats['execution_time'] = ("%02d:%02d:%02d" %719 (hours, minutes, seconds))720 # One of t_end or t_begin are None721 except TypeError:722 stats['execution_time'] = '(could not determine)'723 else:724 stats['execution_time'] = '(none)'725 return stats726 def set_status(self, status, update_queues=False):727 self.update_field('status',status)728 if update_queues:729 for queue_entry in self.get_host_queue_entries():730 queue_entry.set_status(status)731 def keyval_dict(self):732 return self.model().keyval_dict()733 def _atomic_and_has_started(self):734 """735 @returns True if any of the HostQueueEntries associated with this job736 have entered the Status.STARTING state or beyond.737 """738 atomic_entries = models.HostQueueEntry.objects.filter(739 job=self.id, atomic_group__isnull=False)740 if atomic_entries.count() <= 0:741 return False742 # 
These states may *only* be reached if Job.run() has been called.743 started_statuses = (models.HostQueueEntry.Status.STARTING,744 models.HostQueueEntry.Status.RUNNING,745 models.HostQueueEntry.Status.COMPLETED)746 started_entries = atomic_entries.filter(status__in=started_statuses)747 return started_entries.count() > 0748 def _hosts_assigned_count(self):749 """The number of HostQueueEntries assigned a Host for this job."""750 entries = models.HostQueueEntry.objects.filter(job=self.id,751 host__isnull=False)752 return entries.count()753 def _pending_count(self):754 """The number of HostQueueEntries for this job in the Pending state."""755 pending_entries = models.HostQueueEntry.objects.filter(756 job=self.id, status=models.HostQueueEntry.Status.PENDING)757 return pending_entries.count()758 def _max_hosts_needed_to_run(self, atomic_group):759 """760 @param atomic_group: The AtomicGroup associated with this job that we761 are using to set an upper bound on the threshold.762 @returns The maximum number of HostQueueEntries assigned a Host before763 this job can run.764 """765 return min(self._hosts_assigned_count(),766 atomic_group.max_number_of_machines)767 def _min_hosts_needed_to_run(self):768 """Return the minumum number of hsots needed to run this job."""769 return self.synch_count770 def is_ready(self):771 # NOTE: Atomic group jobs stop reporting ready after they have been772 # started to avoid launching multiple copies of one atomic job.773 # Only possible if synch_count is less than than half the number of774 # machines in the atomic group.775 pending_count = self._pending_count()776 atomic_and_has_started = self._atomic_and_has_started()777 ready = (pending_count >= self.synch_count778 and not atomic_and_has_started)779 if not ready:780 logging.info(781 'Job %s not ready: %s pending, %s required '782 '(Atomic and started: %s)',783 self, pending_count, self.synch_count,784 atomic_and_has_started)785 return ready786 def num_machines(self, clause = None):787 sql = 
"job_id=%s" % self.id788 if clause:789 sql += " AND (%s)" % clause790 return self.count(sql, table='afe_host_queue_entries')791 def num_queued(self):792 return self.num_machines('not complete')793 def num_active(self):794 return self.num_machines('active')795 def num_complete(self):796 return self.num_machines('complete')797 def is_finished(self):798 return self.num_complete() == self.num_machines()799 def _not_yet_run_entries(self, include_verifying=True):800 statuses = [models.HostQueueEntry.Status.QUEUED,801 models.HostQueueEntry.Status.PENDING]802 if include_verifying:803 statuses.append(models.HostQueueEntry.Status.VERIFYING)804 return models.HostQueueEntry.objects.filter(job=self.id,805 status__in=statuses)806 def _stop_all_entries(self):807 entries_to_stop = self._not_yet_run_entries(808 include_verifying=False)809 for child_entry in entries_to_stop:810 assert not child_entry.complete, (811 '%s status=%s, active=%s, complete=%s' %812 (child_entry.id, child_entry.status, child_entry.active,813 child_entry.complete))814 if child_entry.status == models.HostQueueEntry.Status.PENDING:815 child_entry.host.status = models.Host.Status.READY816 child_entry.host.save()817 child_entry.status = models.HostQueueEntry.Status.STOPPED818 child_entry.save()819 def stop_if_necessary(self):820 not_yet_run = self._not_yet_run_entries()821 if not_yet_run.count() < self.synch_count:822 self._stop_all_entries()823 def write_to_machines_file(self, queue_entry):824 hostname = queue_entry.host.hostname825 file_path = os.path.join(self.tag(), '.machines')826 _drone_manager.write_lines_to_file(file_path, [hostname])827 def _next_group_name(self, group_name=''):828 """@returns a directory name to use for the next host group results."""829 if group_name:830 # Sanitize for use as a pathname.831 group_name = group_name.replace(os.path.sep, '_')832 if group_name.startswith('.'):833 group_name = '_' + group_name[1:]834 # Add a separator between the group name and 'group%d'.835 group_name += 
'.'836 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))837 query = models.HostQueueEntry.objects.filter(838 job=self.id).values('execution_subdir').distinct()839 subdirs = (entry['execution_subdir'] for entry in query)840 group_matches = (group_count_re.match(subdir) for subdir in subdirs)841 ids = [int(match.group(1)) for match in group_matches if match]842 if ids:843 next_id = max(ids) + 1844 else:845 next_id = 0846 return '%sgroup%d' % (group_name, next_id)847 def get_group_entries(self, queue_entry_from_group):848 """849 @param queue_entry_from_group: A HostQueueEntry instance to find other850 group entries on this job for.851 @returns A list of HostQueueEntry objects all executing this job as852 part of the same group as the one supplied (having the same853 execution_subdir).854 """855 execution_subdir = queue_entry_from_group.execution_subdir856 return list(HostQueueEntry.fetch(857 where='job_id=%s AND execution_subdir=%s',858 params=(self.id, execution_subdir)))859 def _should_run_cleanup(self, queue_entry):860 if self.reboot_before == model_attributes.RebootBefore.ALWAYS:861 return True862 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:863 return queue_entry.host.dirty864 return False865 def _should_run_verify(self, queue_entry):866 do_not_verify = (queue_entry.host.protection ==867 host_protections.Protection.DO_NOT_VERIFY)868 if do_not_verify:869 return False870 return self.run_verify871 def schedule_pre_job_tasks(self, queue_entry):872 """873 Get a list of tasks to perform before the host_queue_entry874 may be used to run this Job (such as Cleanup & Verify).875 @returns A list of tasks to be done to the given queue_entry before876 it should be considered be ready to run this job. 
The last877 task in the list calls HostQueueEntry.on_pending(), which878 continues the flow of the job.879 """880 if self._should_run_cleanup(queue_entry):881 task = models.SpecialTask.Task.CLEANUP882 elif self._should_run_verify(queue_entry):883 task = models.SpecialTask.Task.VERIFY884 else:885 queue_entry.on_pending()886 return887 queue_entry = models.HostQueueEntry.objects.get(id=queue_entry.id)888 models.SpecialTask.objects.create(889 host=models.Host.objects.get(id=queue_entry.host_id),890 queue_entry=queue_entry, task=task)891 def _assign_new_group(self, queue_entries, group_name=''):892 if len(queue_entries) == 1:893 group_subdir_name = queue_entries[0].host.hostname894 else:895 group_subdir_name = self._next_group_name(group_name)896 logging.info('Running synchronous job %d hosts %s as %s',897 self.id, [entry.host.hostname for entry in queue_entries],898 group_subdir_name)899 for queue_entry in queue_entries:900 queue_entry.set_execution_subdir(group_subdir_name)901 def _choose_group_to_run(self, include_queue_entry):902 """903 @returns A tuple containing a list of HostQueueEntry instances to be904 used to run this Job, a string group name to suggest giving905 to this job in the results database.906 """907 atomic_group = include_queue_entry.atomic_group908 chosen_entries = [include_queue_entry]909 if atomic_group:910 num_entries_wanted = atomic_group.max_number_of_machines911 else:912 num_entries_wanted = self.synch_count913 num_entries_wanted -= len(chosen_entries)914 if num_entries_wanted > 0:915 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'916 pending_entries = list(HostQueueEntry.fetch(917 where=where_clause,918 params=(self.id, include_queue_entry.id)))919 # Sort the chosen hosts by hostname before slicing.920 def cmp_queue_entries_by_hostname(entry_a, entry_b):921 return Host.cmp_for_sort(entry_a.host, entry_b.host)922 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)923 chosen_entries += 
pending_entries[:num_entries_wanted]924 # Sanity check. We'll only ever be called if this can be met.925 if len(chosen_entries) < self.synch_count:926 message = ('job %s got less than %s chosen entries: %s' % (927 self.id, self.synch_count, chosen_entries))928 logging.error(message)929 email_manager.manager.enqueue_notify_email(930 'Job not started, too few chosen entries', message)931 return []932 group_name = include_queue_entry.get_group_name()933 self._assign_new_group(chosen_entries, group_name=group_name)934 return chosen_entries935 def run_if_ready(self, queue_entry):936 """937 Run this job by kicking its HQEs into status='Starting' if enough938 hosts are ready for it to run.939 Cleans up by kicking HQEs into status='Stopped' if this Job is not940 ready to run.941 """942 if not self.is_ready():943 self.stop_if_necessary()944 elif queue_entry.atomic_group:945 self.run_with_ready_delay(queue_entry)946 else:947 self.run(queue_entry)948 def run_with_ready_delay(self, queue_entry):949 """950 Start a delay to wait for more hosts to enter Pending state before951 launching an atomic group job. Once set, the a delay cannot be reset.952 @param queue_entry: The HostQueueEntry object to get atomic group953 info from and pass to run_if_ready when the delay is up.954 @returns An Agent to run the job as appropriate or None if a delay955 has already been set.956 """957 assert queue_entry.job_id == self.id958 assert queue_entry.atomic_group959 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts960 over_max_threshold = (self._pending_count() >=961 self._max_hosts_needed_to_run(queue_entry.atomic_group))962 delay_expired = (self._delay_ready_task and963 time.time() >= self._delay_ready_task.end_time)964 # Delay is disabled or we already have enough? 
Do not wait to run.965 if not delay or over_max_threshold or delay_expired:966 self.run(queue_entry)967 else:968 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)969 def request_abort(self):970 """Request that this Job be aborted on the next scheduler cycle."""971 self.model().abort()972 def schedule_delayed_callback_task(self, queue_entry):973 queue_entry.set_status(models.HostQueueEntry.Status.PENDING)974 if self._delay_ready_task:975 return None976 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts977 def run_job_after_delay():978 logging.info('Job %s done waiting for extra hosts.', self)979 # Check to see if the job is still relevant. It could have aborted980 # while we were waiting or hosts could have disappearred, etc.981 if self._pending_count() < self._min_hosts_needed_to_run():982 logging.info('Job %s had too few Pending hosts after waiting '983 'for extras. Not running.', self)984 self.request_abort()985 return986 return self.run(queue_entry)987 logging.info('Job %s waiting up to %s seconds for more hosts.',988 self.id, delay)989 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,990 callback=run_job_after_delay)991 return self._delay_ready_task992 def run(self, queue_entry):993 """994 @param queue_entry: The HostQueueEntry instance calling this method.995 """996 if queue_entry.atomic_group and self._atomic_and_has_started():997 logging.error('Job.run() called on running atomic Job %d '998 'with HQE %s.', self.id, queue_entry)999 return1000 queue_entries = self._choose_group_to_run(queue_entry)1001 if queue_entries:1002 self._finish_run(queue_entries)1003 def _finish_run(self, queue_entries):1004 for queue_entry in queue_entries:1005 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)1006 self.abort_delay_ready_task()1007 def abort_delay_ready_task(self):1008 """Abort the delayed task associated with this job, if any."""1009 if self._delay_ready_task:1010 # Cancel any pending callback that would try to run 
again1011 # as we are already running.1012 self._delay_ready_task.abort()1013 def __str__(self):...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!