Best Python code snippet using autotest_python
monitor_db.py
Source:monitor_db.py
# ... excerpt begins mid-method: the preceding definition (not shown) ends
# with `return agent_tasks`.
#
# NOTE(review): every definition below takes `self`, so these are methods of
# a class whose header lies outside this excerpt (presumably the scheduler's
# dispatcher -- confirm against the full monitor_db.py).


def _get_special_task_agent_tasks(self, is_active=False):
    """
    Build an AgentTask for every incomplete SpecialTask whose is_active
    flag matches the given value.

    @param is_active: value of SpecialTask.is_active to filter on
    @return: a list with one AgentTask per matching SpecialTask
    """
    special_tasks = models.SpecialTask.objects.filter(
            is_active=is_active, is_complete=False)
    return [self._get_agent_task_for_special_task(task)
            for task in special_tasks]


def _get_agent_task_for_queue_entry(self, queue_entry):
    """
    Construct an AgentTask instance for the given active HostQueueEntry.

    @param queue_entry: a HostQueueEntry
    @return: an AgentTask to run the queue entry
    @raises SchedulerError: if the entry's status maps to no AgentTask, or
            if a host used by the entry's group already has an agent
    """
    task_entries = queue_entry.job.get_group_entries(queue_entry)
    self._check_for_duplicate_host_entries(task_entries)

    if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                              models.HostQueueEntry.Status.RUNNING):
        if queue_entry.is_hostless():
            return HostlessQueueTask(queue_entry=queue_entry)
        return QueueTask(queue_entries=task_entries)
    if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
        return postjob_task.GatherLogsTask(queue_entries=task_entries)
    if queue_entry.status == models.HostQueueEntry.Status.PARSING:
        return postjob_task.FinalReparseTask(queue_entries=task_entries)
    if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
        return postjob_task.ArchiveResultsTask(queue_entries=task_entries)

    raise scheduler_lib.SchedulerError(
            '_get_agent_task_for_queue_entry got entry with '
            'invalid status %s: %s' % (queue_entry.status, queue_entry))


def _check_for_duplicate_host_entries(self, task_entries):
    """
    Verify that no host still in use by the given entries already has an
    agent.

    Entries without a host, and entries in PARSING or ARCHIVING status, are
    skipped.

    @param task_entries: an iterable of HostQueueEntries
    @raises SchedulerError: via _assert_host_has_no_agent
    """
    non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                         models.HostQueueEntry.Status.ARCHIVING)
    for task_entry in task_entries:
        using_host = (task_entry.host is not None
                      and task_entry.status not in non_host_statuses)
        if using_host:
            self._assert_host_has_no_agent(task_entry)


def _assert_host_has_no_agent(self, entry):
    """
    Raise if the host of the given entry already has an agent.

    @param entry: a HostQueueEntry or a SpecialTask
    @raises SchedulerError: if self.host_has_agent(entry.host) is true
    """
    if self.host_has_agent(entry.host):
        # Report one of the host's registered agents in the error message.
        agent = tuple(self._host_agents.get(entry.host.id))[0]
        raise scheduler_lib.SchedulerError(
                'While scheduling %s, host %s already has a host agent %s'
                % (entry, entry.host, agent.task))


def _get_agent_task_for_special_task(self, special_task):
    """
    Construct an AgentTask class to run the given SpecialTask and add it
    to this dispatcher.

    A special task is created through schedule_special_tasks, but only if
    the host doesn't already have an agent. This happens through
    add_agent_task. All special agent tasks are given a host on creation,
    and a Null hqe. To create a SpecialAgentTask object, you need a
    models.SpecialTask. If the SpecialTask used to create a SpecialAgentTask
    object contains a hqe it's passed on to the special agent task, which
    creates a HostQueueEntry and saves it as its queue_entry.

    @param special_task: a models.SpecialTask instance
    @returns an AgentTask to run this SpecialTask
    @raises SchedulerError: if the task's host already has an agent, or if
            no AgentTask class has a TASK_TYPE matching special_task.task
    """
    self._assert_host_has_no_agent(special_task)

    special_agent_task_classes = (prejob_task.CleanupTask,
                                  prejob_task.VerifyTask,
                                  prejob_task.RepairTask,
                                  prejob_task.ResetTask,
                                  prejob_task.ProvisionTask)
    for agent_task_class in special_agent_task_classes:
        if agent_task_class.TASK_TYPE == special_task.task:
            return agent_task_class(task=special_task)

    # Format the task into the message itself; passing it as a second
    # exception argument would render the error as a tuple.
    raise scheduler_lib.SchedulerError(
            'No AgentTask class for task %s' % str(special_task))


def _register_pidfiles(self, agent_tasks):
    """
    Ask every given agent task to register the pidfiles it needs.

    @param agent_tasks: an iterable of AgentTasks
    """
    for agent_task in agent_tasks:
        agent_task.register_necessary_pidfiles()


def _recover_tasks(self, agent_tasks):
    """
    Recover each agent task, add it to this dispatcher, and then check for
    orphaned autoserv processes that no recovered task accounts for.

    @param agent_tasks: an iterable of AgentTasks
    """
    orphans = _drone_manager.get_orphaned_autoserv_processes()

    for agent_task in agent_tasks:
        agent_task.recover()
        if agent_task.monitor and agent_task.monitor.has_process():
            # This process belongs to a recovered task, so it is no orphan.
            orphans.discard(agent_task.monitor.get_process())
        self.add_agent_task(agent_task)

    self._check_for_remaining_orphan_processes(orphans)


def _get_unassigned_entries(self, status):
    """
    Yield the HostQueueEntries in the given status that have no agents.

    @param status: the HostQueueEntry status to fetch
    """
    for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                       % status):
        if entry.status == status and not self.get_agents_for_entry(entry):
            # The status can change during iteration, e.g., if job.run()
            # sets a group of queue entries to Starting
            yield entry


def _check_for_remaining_orphan_processes(self, orphans):
    """
    Report orphaned autoserv processes by email and, when the
    'die_on_orphans' config value is set, abort.

    @param orphans: a collection of orphaned processes; empty means no-op
    @raises RuntimeError: if orphans remain and die_on_orphans is enabled
    """
    if not orphans:
        return
    subject = 'Unrecovered orphan autoserv processes remain'
    message = '\n'.join(str(process) for process in orphans)
    email_manager.manager.enqueue_notify_email(subject, message)

    die_on_orphans = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

    if die_on_orphans:
        raise RuntimeError(subject + '\n' + message)


def _recover_pending_entries(self):
    """
    Call on_pending() on every PENDING HostQueueEntry that has no agent.
    """
    for entry in self._get_unassigned_entries(
            models.HostQueueEntry.Status.PENDING):
        logging.info('Recovering Pending entry %s', entry)
        entry.on_pending()


def _check_for_unrecovered_verifying_entries(self):
    """
    Ensure every VERIFYING HostQueueEntry has an incomplete CLEANUP or
    VERIFY SpecialTask attached to it.

    @raises SchedulerError: listing the entries that have no such task
    """
    # Single-quoted SQL string literal, matching the other queries here.
    queue_entries = scheduler_models.HostQueueEntry.fetch(
        where="status = '%s'" % models.HostQueueEntry.Status.VERIFYING)
    unrecovered_hqes = []
    for queue_entry in queue_entries:
        special_tasks = models.SpecialTask.objects.filter(
                task__in=(models.SpecialTask.Task.CLEANUP,
                          models.SpecialTask.Task.VERIFY),
                queue_entry__id=queue_entry.id,
                is_complete=False)
        if special_tasks.count() == 0:
            unrecovered_hqes.append(queue_entry)

    if unrecovered_hqes:
        message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
        raise scheduler_lib.SchedulerError(
                '%d unrecovered verifying host queue entries:\n%s' %
                (len(unrecovered_hqes), message))


def _schedule_special_tasks(self):
    """
    Execute queued SpecialTasks that are ready to run on idle hosts.

    Special tasks include PreJobTasks like verify, reset and cleanup.
    They are created through _schedule_new_jobs and associated with a hqe.
    This method translates SpecialTasks to the appropriate AgentTask and
    adds them to the dispatcher's agents list, so _handle_agents can execute
    them.
    """
    # When the host scheduler is responsible for acquisition we only want
    # to run tasks with leased hosts. All hqe tasks will already have
    # leased hosts, and we don't want to run frontend tasks till the host
    # scheduler has vetted the assignment. Note that this doesn't include
    # frontend tasks with hosts leased by other active hqes.
    for task in self._job_query_manager.get_prioritized_special_tasks(
            only_tasks_with_leased_hosts=not _inline_host_acquisition):
        if self.host_has_agent(task.host):
            continue
        self.add_agent_task(self._get_agent_task_for_special_task(task))


def _reverify_remaining_hosts(self):
    """
    Reverify unlocked, valid hosts still in Repairing, Verifying, Cleaning
    or Provisioning status.
    """
    # recover active hosts that have not yet been recovered, although this
    # should never happen
    message = ('Recovering active host %s - this probably indicates a '
               'scheduler bug')
    self._reverify_hosts_where(
            "status IN ('Repairing', 'Verifying', 'Cleaning', 'Provisioning')",
            print_message=message)


def _reverify_hosts_where(self, where,
                          print_message='Reverifying host %s'):
    full_where = 'locked = 0 AND invalid = 0 AND ' + where
    for host in scheduler_models.Host.fetch(where=full_where):
        if self.host_has_agent(host):
            # host has already been recovered in some way
            ...  # excerpt is truncated here; the rest of this method is not shown
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!