Best Python code snippet using autotest_python
monitor_db.py
Source:monitor_db.py
...504 if agent_task.monitor and agent_task.monitor.has_process():505 orphans.discard(agent_task.monitor.get_process())506 self.add_agent_task(agent_task)507 self._check_for_remaining_orphan_processes(orphans)508 def _get_unassigned_entries(self, status):509 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"510 % status):511 if entry.status == status and not self.get_agents_for_entry(entry):512 # The status can change during iteration, e.g., if job.run()513 # sets a group of queue entries to Starting514 yield entry515 def _check_for_remaining_orphan_processes(self, orphans):516 if not orphans:517 return518 subject = 'Unrecovered orphan autoserv processes remain'519 message = '\n'.join(str(process) for process in orphans)520 email_manager.manager.enqueue_notify_email(subject, message)521 die_on_orphans = global_config.global_config.get_config_value(522 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)523 if die_on_orphans:524 raise RuntimeError(subject + '\n' + message)525 def _recover_pending_entries(self):526 for entry in self._get_unassigned_entries(527 models.HostQueueEntry.Status.PENDING):528 logging.info('Recovering Pending entry %s', entry)529 entry.on_pending()530 def _check_for_unrecovered_verifying_entries(self):531 queue_entries = scheduler_models.HostQueueEntry.fetch(532 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)533 unrecovered_hqes = []534 for queue_entry in queue_entries:535 special_tasks = models.SpecialTask.objects.filter(536 task__in=(models.SpecialTask.Task.CLEANUP,537 models.SpecialTask.Task.VERIFY),538 queue_entry__id=queue_entry.id,539 is_complete=False)540 if special_tasks.count() == 0:...
assign_tasks.py
Source:assign_tasks.py
...70 tasks = cls._get_tasks_by_topic(tasks, topic)71 incomplete_tasks, incomplete_tasks_num \72 = cls._get_incomplete_tasks(tasks, users_per_task)73 unassigned_entries \74 = cls._get_unassigned_entries(entries, tasks, exclusion)75 polygraphs, total_polygraphs_per_user \76 = cls._get_polygraphs(entries, polygraphs_per_user)77 print(total_polygraphs_per_user)78 return cls(unassigned_entries,79 users, tasks,80 incomplete_tasks,81 incomplete_tasks_num,82 topic,83 users_per_task,84 tasks_per_user,85 polygraphs_per_user,86 total_polygraphs_per_user,87 polygraphs)8889 @classmethod90 def _get_tasks_by_topic(cls,91 tasks: List[Dict],92 topic: str,93 ) -> List[Dict]:94 tasks_with_certain_topic = []95 for task in tasks:96 if task['topic'] == topic:97 tasks_with_certain_topic.append(task)98 return tasks_with_certain_topic99100 @classmethod101 def _get_incomplete_tasks(cls,102 tasks: List[Dict],103 users_per_task: int,104 ) -> Tuple[List[Dict], int]:105 incomplete_tasks = []106 incomplete_tasks_num = 0107108 def _remove_incomplete_user(task: Dict):109 if len(task['submitters']) != len(task['to']):110 removals = []111 for user in task['to']:112 if user not in task['submitters']:113 removals.append(user)114 for user in removals:115 task['to'].remove(user)116117 for task in tasks:118 _remove_incomplete_user(task)119 if len(task['submitters']) < users_per_task:120 incomplete_tasks.append(task)121 incomplete_tasks_num += users_per_task - len(task['submitters'])122 return incomplete_tasks, incomplete_tasks_num123124 @classmethod125 def _get_unassigned_entries(cls,126 entries: List[Dict],127 tasks: List[Dict],128 exclusion: List[str],129 ):130 unassigned_entries = []131 assigned_entries = set([task['entry'] for task in tasks])132 for entry in entries:133 # this entry is a polygraph134 if 'polygraph' in entry:135 continue136 # this entry is exclusive137 if entry['info']['rpId'] in exclusion:138 continue139 # this entry is assigned
...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!