Best Python code snippet using autotest_python
scheduler_models.py
Source:scheduler_models.py
...763 this job can run.764 """765 return min(self._hosts_assigned_count(),766 atomic_group.max_number_of_machines)767 def _min_hosts_needed_to_run(self):768 """Return the minumum number of hsots needed to run this job."""769 return self.synch_count770 def is_ready(self):771 # NOTE: Atomic group jobs stop reporting ready after they have been772 # started to avoid launching multiple copies of one atomic job.773 # Only possible if synch_count is less than than half the number of774 # machines in the atomic group.775 pending_count = self._pending_count()776 atomic_and_has_started = self._atomic_and_has_started()777 ready = (pending_count >= self.synch_count778 and not atomic_and_has_started)779 if not ready:780 logging.info(781 'Job %s not ready: %s pending, %s required '782 '(Atomic and started: %s)',783 self, pending_count, self.synch_count,784 atomic_and_has_started)785 return ready786 def num_machines(self, clause = None):787 sql = "job_id=%s" % self.id788 if clause:789 sql += " AND (%s)" % clause790 return self.count(sql, table='afe_host_queue_entries')791 def num_queued(self):792 return self.num_machines('not complete')793 def num_active(self):794 return self.num_machines('active')795 def num_complete(self):796 return self.num_machines('complete')797 def is_finished(self):798 return self.num_complete() == self.num_machines()799 def _not_yet_run_entries(self, include_verifying=True):800 statuses = [models.HostQueueEntry.Status.QUEUED,801 models.HostQueueEntry.Status.PENDING]802 if include_verifying:803 statuses.append(models.HostQueueEntry.Status.VERIFYING)804 return models.HostQueueEntry.objects.filter(job=self.id,805 status__in=statuses)806 def _stop_all_entries(self):807 entries_to_stop = self._not_yet_run_entries(808 include_verifying=False)809 for child_entry in entries_to_stop:810 assert not child_entry.complete, (811 '%s status=%s, active=%s, complete=%s' %812 (child_entry.id, child_entry.status, child_entry.active,813 child_entry.complete))814 if child_entry.status == models.HostQueueEntry.Status.PENDING:815 child_entry.host.status = models.Host.Status.READY816 child_entry.host.save()817 child_entry.status = models.HostQueueEntry.Status.STOPPED818 child_entry.save()819 def stop_if_necessary(self):820 not_yet_run = self._not_yet_run_entries()821 if not_yet_run.count() < self.synch_count:822 self._stop_all_entries()823 def write_to_machines_file(self, queue_entry):824 hostname = queue_entry.host.hostname825 file_path = os.path.join(self.tag(), '.machines')826 _drone_manager.write_lines_to_file(file_path, [hostname])827 def _next_group_name(self, group_name=''):828 """@returns a directory name to use for the next host group results."""829 if group_name:830 # Sanitize for use as a pathname.831 group_name = group_name.replace(os.path.sep, '_')832 if group_name.startswith('.'):833 group_name = '_' + group_name[1:]834 # Add a separator between the group name and 'group%d'.835 group_name += '.'836 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))837 query = models.HostQueueEntry.objects.filter(838 job=self.id).values('execution_subdir').distinct()839 subdirs = (entry['execution_subdir'] for entry in query)840 group_matches = (group_count_re.match(subdir) for subdir in subdirs)841 ids = [int(match.group(1)) for match in group_matches if match]842 if ids:843 next_id = max(ids) + 1844 else:845 next_id = 0846 return '%sgroup%d' % (group_name, next_id)847 def get_group_entries(self, queue_entry_from_group):848 """849 @param queue_entry_from_group: A HostQueueEntry instance to find other850 group entries on this job for.851 @returns A list of HostQueueEntry objects all executing this job as852 part of the same group as the one supplied (having the same853 execution_subdir).854 """855 execution_subdir = queue_entry_from_group.execution_subdir856 return list(HostQueueEntry.fetch(857 where='job_id=%s AND execution_subdir=%s',858 params=(self.id, execution_subdir)))859 def _should_run_cleanup(self, queue_entry):860 if self.reboot_before == model_attributes.RebootBefore.ALWAYS:861 return True862 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:863 return queue_entry.host.dirty864 return False865 def _should_run_verify(self, queue_entry):866 do_not_verify = (queue_entry.host.protection ==867 host_protections.Protection.DO_NOT_VERIFY)868 if do_not_verify:869 return False870 return self.run_verify871 def schedule_pre_job_tasks(self, queue_entry):872 """873 Get a list of tasks to perform before the host_queue_entry874 may be used to run this Job (such as Cleanup & Verify).875 @returns A list of tasks to be done to the given queue_entry before876 it should be considered be ready to run this job. The last877 task in the list calls HostQueueEntry.on_pending(), which878 continues the flow of the job.879 """880 if self._should_run_cleanup(queue_entry):881 task = models.SpecialTask.Task.CLEANUP882 elif self._should_run_verify(queue_entry):883 task = models.SpecialTask.Task.VERIFY884 else:885 queue_entry.on_pending()886 return887 queue_entry = models.HostQueueEntry.objects.get(id=queue_entry.id)888 models.SpecialTask.objects.create(889 host=models.Host.objects.get(id=queue_entry.host_id),890 queue_entry=queue_entry, task=task)891 def _assign_new_group(self, queue_entries, group_name=''):892 if len(queue_entries) == 1:893 group_subdir_name = queue_entries[0].host.hostname894 else:895 group_subdir_name = self._next_group_name(group_name)896 logging.info('Running synchronous job %d hosts %s as %s',897 self.id, [entry.host.hostname for entry in queue_entries],898 group_subdir_name)899 for queue_entry in queue_entries:900 queue_entry.set_execution_subdir(group_subdir_name)901 def _choose_group_to_run(self, include_queue_entry):902 """903 @returns A tuple containing a list of HostQueueEntry instances to be904 used to run this Job, a string group name to suggest giving905 to this job in the results database.906 """907 atomic_group = include_queue_entry.atomic_group908 chosen_entries = [include_queue_entry]909 if atomic_group:910 num_entries_wanted = atomic_group.max_number_of_machines911 else:912 num_entries_wanted = self.synch_count913 num_entries_wanted -= len(chosen_entries)914 if num_entries_wanted > 0:915 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'916 pending_entries = list(HostQueueEntry.fetch(917 where=where_clause,918 params=(self.id, include_queue_entry.id)))919 # Sort the chosen hosts by hostname before slicing.920 def cmp_queue_entries_by_hostname(entry_a, entry_b):921 return Host.cmp_for_sort(entry_a.host, entry_b.host)922 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)923 chosen_entries += pending_entries[:num_entries_wanted]924 # Sanity check. We'll only ever be called if this can be met.925 if len(chosen_entries) < self.synch_count:926 message = ('job %s got less than %s chosen entries: %s' % (927 self.id, self.synch_count, chosen_entries))928 logging.error(message)929 email_manager.manager.enqueue_notify_email(930 'Job not started, too few chosen entries', message)931 return []932 group_name = include_queue_entry.get_group_name()933 self._assign_new_group(chosen_entries, group_name=group_name)934 return chosen_entries935 def run_if_ready(self, queue_entry):936 """937 Run this job by kicking its HQEs into status='Starting' if enough938 hosts are ready for it to run.939 Cleans up by kicking HQEs into status='Stopped' if this Job is not940 ready to run.941 """942 if not self.is_ready():943 self.stop_if_necessary()944 elif queue_entry.atomic_group:945 self.run_with_ready_delay(queue_entry)946 else:947 self.run(queue_entry)948 def run_with_ready_delay(self, queue_entry):949 """950 Start a delay to wait for more hosts to enter Pending state before951 launching an atomic group job. Once set, the a delay cannot be reset.952 @param queue_entry: The HostQueueEntry object to get atomic group953 info from and pass to run_if_ready when the delay is up.954 @returns An Agent to run the job as appropriate or None if a delay955 has already been set.956 """957 assert queue_entry.job_id == self.id958 assert queue_entry.atomic_group959 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts960 over_max_threshold = (self._pending_count() >=961 self._max_hosts_needed_to_run(queue_entry.atomic_group))962 delay_expired = (self._delay_ready_task and963 time.time() >= self._delay_ready_task.end_time)964 # Delay is disabled or we already have enough? Do not wait to run.965 if not delay or over_max_threshold or delay_expired:966 self.run(queue_entry)967 else:968 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)969 def request_abort(self):970 """Request that this Job be aborted on the next scheduler cycle."""971 self.model().abort()972 def schedule_delayed_callback_task(self, queue_entry):973 queue_entry.set_status(models.HostQueueEntry.Status.PENDING)974 if self._delay_ready_task:975 return None976 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts977 def run_job_after_delay():978 logging.info('Job %s done waiting for extra hosts.', self)979 # Check to see if the job is still relevant. It could have aborted980 # while we were waiting or hosts could have disappearred, etc.981 if self._pending_count() < self._min_hosts_needed_to_run():982 logging.info('Job %s had too few Pending hosts after waiting '983 'for extras. Not running.', self)984 self.request_abort()985 return986 return self.run(queue_entry)987 logging.info('Job %s waiting up to %s seconds for more hosts.',988 self.id, delay)989 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,990 callback=run_job_after_delay)991 return self._delay_ready_task992 def run(self, queue_entry):993 """994 @param queue_entry: The HostQueueEntry instance calling this method.995 """...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!