Best Python code snippet using autotest_python
postjob_task.py
Source:postjob_task.py
...59 # we'll use a PidfileRunMonitor to read the autoserv exit status60 if self._autoserv_monitor.exit_code() == 0:61 return models.HostQueueEntry.Status.COMPLETED62 return models.HostQueueEntry.Status.FAILED63 def _set_all_statuses(self, status):64 for queue_entry in self.queue_entries:65 queue_entry.set_status(status)66 def abort(self):67 # override AgentTask.abort() to avoid killing the process and ending68 # the task. post-job tasks continue when the job is aborted.69 pass70 def _pidfile_label(self):71 # '.autoserv_execute' -> 'autoserv'72 return self._pidfile_name()[1:-len('_execute')]73class SelfThrottledPostJobTask(PostJobTask):74 """75 PostJobTask that maintains its own process limit.76 We throttle tasks like parsing because we don't want them to77 hold up tests. At the same time we don't wish to build up load78 that will take forever to parse.79 """80 _num_running_processes = 081 # Last known limit of max processes, used to check whether82 # max processes config has been changed.83 _last_known_max_processes = 084 # Whether an email should be sent to notifiy process limit being hit.85 _notification_on = True86 # Once process limit is hit, an email will be sent.87 # To prevent spams, do not send another email until88 # it drops to lower than the following level.89 REVIVE_NOTIFICATION_THRESHOLD = 0.8090 @classmethod91 def _gauge_metrics(cls):92 """Report to monarch the number of running processes."""93 m = metrics.Gauge('chromeos/autotest/scheduler/postjob_tasks')94 m.set(cls._num_running_processes, fields={'task_name': cls.__name__})95 @classmethod96 def _increment_running_processes(cls):97 cls._num_running_processes += 198 cls._gauge_metrics()99 @classmethod100 def _decrement_running_processes(cls):101 cls._num_running_processes -= 1102 cls._gauge_metrics()103 @classmethod104 def _max_processes(cls):105 raise NotImplementedError106 @classmethod107 def _can_run_new_process(cls):108 return cls._num_running_processes < cls._max_processes()109 def _process_started(self):110 return bool(self.monitor)111 def tick(self):112 # override tick to keep trying to start until the process count goes113 # down and we can, at which point we revert to default behavior114 if self._process_started():115 super(SelfThrottledPostJobTask, self).tick()116 else:117 self._try_starting_process()118 def run(self):119 # override run() to not actually run unless we can120 self._try_starting_process()121 @classmethod122 def _notify_process_limit_hit(cls):123 """Send an email to notify that process limit is hit."""124 if cls._notification_on:125 subject = '%s: hitting max process limit.' % cls.__name__126 message = ('Running processes/Max processes: %d/%d'127 % (cls._num_running_processes, cls._max_processes()))128 email_manager.manager.enqueue_notify_email(subject, message)129 cls._notification_on = False130 @classmethod131 def _reset_notification_switch_if_necessary(cls):132 """Reset _notification_on if necessary.133 Set _notification_on to True on the following cases:134 1) If the limit of max processes configuration changes;135 2) If _notification_on is False and the number of running processes136 drops to lower than a level defined in REVIVE_NOTIFICATION_THRESHOLD.137 """138 if cls._last_known_max_processes != cls._max_processes():139 cls._notification_on = True140 cls._last_known_max_processes = cls._max_processes()141 return142 percentage = float(cls._num_running_processes) / cls._max_processes()143 if (not cls._notification_on and144 percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):145 cls._notification_on = True146 def _try_starting_process(self):147 self._reset_notification_switch_if_necessary()148 if not self._can_run_new_process():149 self._notify_process_limit_hit()150 return151 # actually run the command152 super(SelfThrottledPostJobTask, self).run()153 if self._process_started():154 self._increment_running_processes()155 def finished(self, success):156 super(SelfThrottledPostJobTask, self).finished(success)157 if self._process_started():158 self._decrement_running_processes()159class GatherLogsTask(PostJobTask):160 """161 Task responsible for162 * gathering uncollected logs (if Autoserv crashed hard or was killed)163 * copying logs to the results repository164 * spawning CleanupTasks for hosts, if necessary165 * spawning a FinalReparseTask for the job166 * setting the final status of the host, directly or through a cleanup167 """168 def __init__(self, queue_entries, recover_run_monitor=None):169 self._job = queue_entries[0].job170 super(GatherLogsTask, self).__init__(171 queue_entries, log_file_name='.collect_crashinfo.log')172 self._set_ids(queue_entries=queue_entries)173 # TODO: Refactor into autoserv_utils. crbug.com/243090174 def _generate_command(self, results_dir):175 host_list = ','.join(queue_entry.host.hostname176 for queue_entry in self.queue_entries)177 return [autoserv_utils.autoserv_path , '-p',178 '--pidfile-label=%s' % self._pidfile_label(),179 '--use-existing-results', '--collect-crashinfo',180 '-m', host_list, '-r', results_dir]181 @property182 def num_processes(self):183 return len(self.queue_entries)184 def _pidfile_name(self):185 return drone_manager.CRASHINFO_PID_FILE186 def prolog(self):187 self._check_queue_entry_statuses(188 self.queue_entries,189 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),190 allowed_host_statuses=(models.Host.Status.RUNNING,))191 super(GatherLogsTask, self).prolog()192 def epilog(self):193 super(GatherLogsTask, self).epilog()194 self._parse_results(self.queue_entries)195 final_success, num_tests_failed = self._get_monitor_info()196 reset_after_failure = (197 not self._job.run_reset and (198 not final_success or num_tests_failed > 0))199 self._reboot_hosts(final_success, num_tests_failed, reset_after_failure)200 if reset_after_failure:201 m = metrics.Counter('chromeos/autotest/scheduler/postjob_tasks/'202 'reset_after_failure')203 m.increment(fields={'autoserv_process_success': final_success,204 'num_tests_failed': num_tests_failed > 0})205 self._reset_after_failure()206 def _get_monitor_info(self):207 """Read monitor info from pidfile.208 @returns: a tuple including209 final_success: whether the monitor is successfully finished;210 num_tests_failed: how many failed tests in the process.211 """212 if self._autoserv_monitor.has_process():213 final_success = (self._final_status() ==214 models.HostQueueEntry.Status.COMPLETED)215 num_tests_failed = self._autoserv_monitor.num_tests_failed()216 else:217 final_success = False218 num_tests_failed = 0219 return final_success, num_tests_failed220 def _reboot_hosts(self, final_success, num_tests_failed,221 reset_after_failure):222 """Reboot hosts by scheduling a CLEANUP task on host if needed.223 @param final_success: whether the monitor successfully exits.224 @param num_tests_failed: how many failed tests in total.225 @param reset_after_failure: whether to schedule RESET task later.226 """227 reboot_after = self._job.reboot_after228 do_reboot = (229 # always reboot after aborted jobs230 self._final_status() == models.HostQueueEntry.Status.ABORTED231 or reboot_after == model_attributes.RebootAfter.ALWAYS232 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED233 and final_success and num_tests_failed == 0)234 or (num_tests_failed > 0 and not reset_after_failure))235 for queue_entry in self.queue_entries:236 if do_reboot:237 # don't pass the queue entry to the CleanupTask. if the cleanup238 # fails, the job doesn't care -- it's over.239 models.SpecialTask.objects.create(240 host=models.Host.objects.get(id=queue_entry.host.id),241 task=models.SpecialTask.Task.CLEANUP,242 requested_by=self._job.owner_model())243 else:244 queue_entry.host.set_status(models.Host.Status.READY)245 def _reset_after_failure(self):246 """Queue a RESET job for the host if job fails.247 The current hqe entry is not passed into the RESET job.248 """249 for queue_entry in self.queue_entries:250 models.SpecialTask.objects.create(251 host=models.Host.objects.get(id=queue_entry.host.id),252 task=models.SpecialTask.Task.RESET,253 requested_by=self._job.owner_model())254 def run(self):255 autoserv_exit_code = self._autoserv_monitor.exit_code()256 # only run if Autoserv exited due to some signal. if we have no exit257 # code, assume something bad (and signal-like) happened.258 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):259 super(GatherLogsTask, self).run()260 else:261 self.finished(True)262class FinalReparseTask(SelfThrottledPostJobTask):263 def __init__(self, queue_entries):264 super(FinalReparseTask, self).__init__(queue_entries,265 log_file_name='.parse.log')266 # don't use _set_ids, since we don't want to set the host_ids267 self.queue_entry_ids = [entry.id for entry in queue_entries]268 def _generate_command(self, results_dir):269 return [_parser_path, '--detach', '--write-pidfile',270 '--record-duration', '--suite-report', '-l', '2', '-r', '-o',271 results_dir]272 @property273 def num_processes(self):274 return 0 # don't include parser processes in accounting275 def _pidfile_name(self):276 return drone_manager.PARSER_PID_FILE277 @classmethod278 def _max_processes(cls):279 return scheduler_config.config.max_parse_processes280 def prolog(self):281 self._check_queue_entry_statuses(282 self.queue_entries,283 allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))284 super(FinalReparseTask, self).prolog()285 def epilog(self):286 super(FinalReparseTask, self).epilog()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!