Best Python code snippet using autotest_python
monitor_db_functional_unittest.py
Source:monitor_db_functional_unittest.py
...111 assert pidfile_id is not None112 contents = self._pidfiles[pidfile_id]113 contents.exit_status = exit_status114 contents.num_tests_failed = 0115 def was_last_process_killed(self, pidfile_type):116 pidfile_id = self._last_pidfile_id[pidfile_type]117 return pidfile_id in self._killed_pidfiles118 def nonfinished_pidfile_ids(self):119 return [pidfile_id for pidfile_id, pidfile_contents120 in self._pidfiles.iteritems()121 if pidfile_contents.exit_status is None]122 def running_pidfile_ids(self):123 return [pidfile_id for pidfile_id in self.nonfinished_pidfile_ids()124 if self._pidfiles[pidfile_id].process is not None]125 def pidfile_from_path(self, working_directory, pidfile_name):126 return self._pidfile_index[(working_directory, pidfile_name)]127 def attached_files(self, working_directory):128 """129 Return dict mapping path to contents for attached files with specified130 paths.131 """132 return dict((path, contents) for path, contents133 in self._attached_files.get(working_directory, [])134 if path is not None)135 # DroneManager emulation APIs for use by monitor_db136 def get_orphaned_autoserv_processes(self):137 return set()138 def total_running_processes(self):139 return sum(pidfile_id._num_processes140 for pidfile_id in self.nonfinished_pidfile_ids())141 def max_runnable_processes(self, username, drone_hostnames_allowed):142 return self.process_capacity - self.total_running_processes()143 def refresh(self):144 for pidfile_id in self._unregistered_pidfiles:145 # intentionally handle non-registered pidfiles silently146 self._pidfiles.pop(pidfile_id, None)147 self._unregistered_pidfiles = set()148 def execute_actions(self):149 # executing an "execute_command" causes a pidfile to be created150 for pidfile_id in self._future_pidfiles:151 # Process objects are opaque to monitor_db152 process = object()153 self._pidfiles[pidfile_id].process = process154 self._process_index[process] = pidfile_id155 self._future_pidfiles = []156 def attach_file_to_execution(self, result_dir, file_contents,157 file_path=None):158 self._attached_files.setdefault(result_dir, set()).add((file_path,159 file_contents))160 return 'attach_path'161 def _initialize_pidfile(self, pidfile_id):162 if pidfile_id not in self._pidfiles:163 assert pidfile_id.key() not in self._pidfile_index164 self._pidfiles[pidfile_id] = drone_manager.PidfileContents()165 self._pidfile_index[pidfile_id.key()] = pidfile_id166 def _set_last_pidfile(self, pidfile_id, working_directory, pidfile_name):167 if working_directory.startswith('hosts/'):168 # such paths look like hosts/host1/1-verify, we'll grab the end169 type_string = working_directory.rsplit('-', 1)[1]170 pidfile_type = _PidfileType.get_value(type_string)171 else:172 pidfile_type = _PIDFILE_TO_PIDFILE_TYPE[pidfile_name]173 self._last_pidfile_id[pidfile_type] = pidfile_id174 def execute_command(self, command, working_directory, pidfile_name,175 num_processes, log_file=None, paired_with_pidfile=None,176 username=None, drone_hostnames_allowed=None):177 logging.debug('Executing %s in %s', command, working_directory)178 pidfile_id = self._DummyPidfileId(working_directory, pidfile_name)179 if pidfile_id.key() in self._pidfile_index:180 pidfile_id = self._pidfile_index[pidfile_id.key()]181 pidfile_id._num_processes = num_processes182 pidfile_id._paired_with_pidfile = paired_with_pidfile183 self._future_pidfiles.append(pidfile_id)184 self._initialize_pidfile(pidfile_id)185 self._pidfile_index[(working_directory, pidfile_name)] = pidfile_id186 self._set_last_pidfile(pidfile_id, working_directory, pidfile_name)187 return pidfile_id188 def get_pidfile_contents(self, pidfile_id, use_second_read=False):189 if pidfile_id not in self._pidfiles:190 logging.debug('Request for nonexistent pidfile %s' % pidfile_id)191 return self._pidfiles.get(pidfile_id, drone_manager.PidfileContents())192 def is_process_running(self, process):193 return True194 def register_pidfile(self, pidfile_id):195 self._initialize_pidfile(pidfile_id)196 def unregister_pidfile(self, pidfile_id):197 self._unregistered_pidfiles.add(pidfile_id)198 def declare_process_count(self, pidfile_id, num_processes):199 pidfile_id.num_processes = num_processes200 def absolute_path(self, path):201 return 'absolute/' + path202 def write_lines_to_file(self, file_path, lines, paired_with_process=None):203 # TODO: record this204 pass205 def get_pidfile_id_from(self, execution_tag, pidfile_name):206 default_pidfile = self._DummyPidfileId(execution_tag, pidfile_name,207 num_processes=0)208 return self._pidfile_index.get((execution_tag, pidfile_name),209 default_pidfile)210 def kill_process(self, process):211 pidfile_id = self._process_index[process]212 self._killed_pidfiles.add(pidfile_id)213 self._set_pidfile_exit_status(pidfile_id, 271)214class MockEmailManager(NullMethodObject):215 _NULL_METHODS = ('send_queued_admin', 'send')216 def enqueue_admin(self, subject, message):217 logging.warn('enqueue_notify_email: %s', subject)218 logging.warn(message)219class SchedulerFunctionalTest(unittest.TestCase,220 test_utils.FrontendTestMixin):221 # some number of ticks after which the scheduler is presumed to have222 # stabilized, given no external changes223 _A_LOT_OF_TICKS = 10224 def setUp(self):225 self._frontend_common_setup()226 self._set_stubs()227 self._set_settings_values()228 self._create_dispatcher()229 logging.basicConfig(level=logging.DEBUG)230 def _create_dispatcher(self):231 self.dispatcher = monitor_db.Dispatcher()232 def tearDown(self):233 self._database.disconnect()234 self._frontend_common_teardown()235 def _set_stubs(self):236 self.mock_config = MockGlobalConfig()237 self.god.stub_with(settings, 'settings', self.mock_config)238 self.mock_drone_manager = MockDroneManager()239 drone_manager._set_instance(self.mock_drone_manager)240 self.mock_email_manager = MockEmailManager()241 self.god.stub_with(mail, "manager", self.mock_email_manager)242 self._database = (243 database_connection.TranslatingDatabase.get_test_database(244 translators=_DB_TRANSLATORS))245 self._database.connect(db_type='django')246 self.god.stub_with(monitor_db, '_db', self._database)247 self.god.stub_with(scheduler_models, '_db', self._database)248 monitor_db.initialize_globals()249 scheduler_models.initialize_globals()250 def _set_settings_values(self):251 self.mock_config.set_config_value('SCHEDULER', 'pidfile_timeout_mins',252 1)253 self.mock_config.set_config_value('SCHEDULER', 'gc_stats_interval_mins',254 999999)255 def _initialize_test(self):256 self.dispatcher.initialize()257 def _run_dispatcher(self):258 for _ in xrange(self._A_LOT_OF_TICKS):259 self.dispatcher.tick()260 def test_idle(self):261 self._initialize_test()262 self._run_dispatcher()263 def _assert_process_executed(self, working_directory, pidfile_name):264 process_was_executed = self.mock_drone_manager.was_process_executed(265 'hosts/host1/1-verify', drone_manager.AUTOSERV_PID_FILE)266 self.assert_(process_was_executed,267 '%s/%s not executed' % (working_directory, pidfile_name))268 def _update_instance(self, model_instance):269 return type(model_instance).objects.get(pk=model_instance.pk)270 def _check_statuses(self, queue_entry, queue_entry_status,271 host_status=None):272 self._check_entry_status(queue_entry, queue_entry_status)273 if host_status:274 self._check_host_status(queue_entry.host, host_status)275 def _check_entry_status(self, queue_entry, status):276 # update from DB277 queue_entry = self._update_instance(queue_entry)278 self.assertEquals(queue_entry.status, status)279 def _check_host_status(self, host, status):280 # update from DB281 host = self._update_instance(host)282 self.assertEquals(host.status, status)283 def _run_pre_job_verify(self, queue_entry):284 self._run_dispatcher() # launches verify285 self._check_statuses(queue_entry, HqeStatus.VERIFYING,286 HostStatus.VERIFYING)287 self.mock_drone_manager.finish_process(_PidfileType.VERIFY)288 def test_simple_job(self):289 self._initialize_test()290 job, queue_entry = self._make_job_and_queue_entry()291 self._run_pre_job_verify(queue_entry)292 self._run_dispatcher() # launches job293 self._check_statuses(queue_entry, HqeStatus.RUNNING, HostStatus.RUNNING)294 self._finish_job(queue_entry)295 self._check_statuses(queue_entry, HqeStatus.COMPLETED, HostStatus.READY)296 self._assert_nothing_is_running()297 def _setup_for_pre_job_cleanup(self):298 self._initialize_test()299 job, queue_entry = self._make_job_and_queue_entry()300 job.reboot_before = model_attributes.RebootBefore.ALWAYS301 job.save()302 return queue_entry303 def _run_pre_job_cleanup_job(self, queue_entry):304 self._run_dispatcher() # cleanup305 self._check_statuses(queue_entry, HqeStatus.VERIFYING,306 HostStatus.CLEANING)307 self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)308 self._run_dispatcher() # verify309 self.mock_drone_manager.finish_process(_PidfileType.VERIFY)310 self._run_dispatcher() # job311 self._finish_job(queue_entry)312 def test_pre_job_cleanup(self):313 queue_entry = self._setup_for_pre_job_cleanup()314 self._run_pre_job_cleanup_job(queue_entry)315 def _run_pre_job_cleanup_one_failure(self):316 queue_entry = self._setup_for_pre_job_cleanup()317 self._run_dispatcher() # cleanup318 self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,319 exit_status=256)320 self._run_dispatcher() # repair321 self._check_statuses(queue_entry, HqeStatus.QUEUED,322 HostStatus.REPAIRING)323 self.mock_drone_manager.finish_process(_PidfileType.REPAIR)324 return queue_entry325 def test_pre_job_cleanup_failure(self):326 queue_entry = self._run_pre_job_cleanup_one_failure()327 # from here the job should run as normal328 self._run_pre_job_cleanup_job(queue_entry)329 def test_pre_job_cleanup_double_failure(self):330 # TODO (showard): this test isn't perfect. in reality, when the second331 # cleanup fails, it copies its results over to the job directory using332 # copy_results_on_drone() and then parses them. since we don't handle333 # that, there appear to be no results at the job directory. the334 # scheduler handles this gracefully, parsing gets effectively skipped,335 # and this test passes as is. but we ought to properly test that336 # behavior.337 queue_entry = self._run_pre_job_cleanup_one_failure()338 self._run_dispatcher() # second cleanup339 self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,340 exit_status=256)341 self._run_dispatcher()342 self._check_statuses(queue_entry, HqeStatus.FAILED,343 HostStatus.REPAIR_FAILED)344 # nothing else should run345 self._assert_nothing_is_running()346 def _assert_nothing_is_running(self):347 self.assertEquals(self.mock_drone_manager.running_pidfile_ids(), [])348 def _setup_for_post_job_cleanup(self):349 self._initialize_test()350 job, queue_entry = self._make_job_and_queue_entry()351 job.reboot_after = model_attributes.RebootAfter.ALWAYS352 job.save()353 return queue_entry354 def _run_post_job_cleanup_failure_up_to_repair(self, queue_entry,355 include_verify=True):356 if include_verify:357 self._run_pre_job_verify(queue_entry)358 self._run_dispatcher() # job359 self.mock_drone_manager.finish_process(_PidfileType.JOB)360 self._run_dispatcher() # parsing + cleanup361 self.mock_drone_manager.finish_process(_PidfileType.PARSE)362 self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,363 exit_status=256)364 self._run_dispatcher() # repair, HQE unaffected365 self.mock_drone_manager.finish_process(_PidfileType.ARCHIVE)366 self._run_dispatcher()367 return queue_entry368 def test_post_job_cleanup_failure(self):369 queue_entry = self._setup_for_post_job_cleanup()370 self._run_post_job_cleanup_failure_up_to_repair(queue_entry)371 self._check_statuses(queue_entry, HqeStatus.COMPLETED,372 HostStatus.REPAIRING)373 self.mock_drone_manager.finish_process(_PidfileType.REPAIR)374 self._run_dispatcher()375 self._check_statuses(queue_entry, HqeStatus.COMPLETED, HostStatus.READY)376 def test_post_job_cleanup_failure_repair_failure(self):377 queue_entry = self._setup_for_post_job_cleanup()378 self._run_post_job_cleanup_failure_up_to_repair(queue_entry)379 self.mock_drone_manager.finish_process(_PidfileType.REPAIR,380 exit_status=256)381 self._run_dispatcher()382 self._check_statuses(queue_entry, HqeStatus.COMPLETED,383 HostStatus.REPAIR_FAILED)384 def _ensure_post_job_process_is_paired(self, queue_entry, pidfile_type):385 pidfile_name = _PIDFILE_TYPE_TO_PIDFILE[pidfile_type]386 queue_entry = self._update_instance(queue_entry)387 pidfile_id = self.mock_drone_manager.pidfile_from_path(388 queue_entry.execution_path(), pidfile_name)389 self.assert_(pidfile_id._paired_with_pidfile)390 def _finish_job(self, queue_entry):391 self.mock_drone_manager.finish_process(_PidfileType.JOB)392 self._run_dispatcher() # launches parsing + cleanup393 self._check_statuses(queue_entry, HqeStatus.PARSING,394 HostStatus.CLEANING)395 self._ensure_post_job_process_is_paired(queue_entry, _PidfileType.PARSE)396 self._finish_parsing_and_cleanup(queue_entry)397 def _finish_parsing_and_cleanup(self, queue_entry):398 self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)399 self.mock_drone_manager.finish_process(_PidfileType.PARSE)400 self._run_dispatcher()401 self._check_entry_status(queue_entry, HqeStatus.ARCHIVING)402 self.mock_drone_manager.finish_process(_PidfileType.ARCHIVE)403 self._run_dispatcher()404 def _create_reverify_request(self):405 host = self.hosts[0]406 models.SpecialTask.schedule_special_task(407 host=host, task=models.SpecialTask.Task.VERIFY)408 return host409 def test_requested_reverify(self):410 host = self._create_reverify_request()411 self._run_dispatcher()412 self._check_host_status(host, HostStatus.VERIFYING)413 self.mock_drone_manager.finish_process(_PidfileType.VERIFY)414 self._run_dispatcher()415 self._check_host_status(host, HostStatus.READY)416 def test_requested_reverify_failure(self):417 host = self._create_reverify_request()418 self._run_dispatcher()419 self.mock_drone_manager.finish_process(_PidfileType.VERIFY,420 exit_status=256)421 self._run_dispatcher() # repair422 self._check_host_status(host, HostStatus.REPAIRING)423 self.mock_drone_manager.finish_process(_PidfileType.REPAIR)424 self._run_dispatcher()425 self._check_host_status(host, HostStatus.READY)426 def _setup_for_do_not_verify(self):427 self._initialize_test()428 job, queue_entry = self._make_job_and_queue_entry()429 queue_entry.host.protection = host_protections.Protection.DO_NOT_VERIFY430 queue_entry.host.save()431 return queue_entry432 def test_do_not_verify_job(self):433 queue_entry = self._setup_for_do_not_verify()434 self._run_dispatcher() # runs job directly435 self._finish_job(queue_entry)436 def test_do_not_verify_job_with_cleanup(self):437 queue_entry = self._setup_for_do_not_verify()438 queue_entry.job.reboot_before = model_attributes.RebootBefore.ALWAYS439 queue_entry.job.save()440 self._run_dispatcher() # cleanup441 self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)442 self._run_dispatcher() # job443 self._finish_job(queue_entry)444 def test_do_not_verify_pre_job_cleanup_failure(self):445 queue_entry = self._setup_for_do_not_verify()446 queue_entry.job.reboot_before = model_attributes.RebootBefore.ALWAYS447 queue_entry.job.save()448 self._run_dispatcher() # cleanup449 self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,450 exit_status=256)451 self._run_dispatcher() # failure ignored; job runs452 self._finish_job(queue_entry)453 def test_do_not_verify_post_job_cleanup_failure(self):454 queue_entry = self._setup_for_do_not_verify()455 self._run_post_job_cleanup_failure_up_to_repair(queue_entry,456 include_verify=False)457 # failure ignored, host still set to Ready458 self._check_statuses(queue_entry, HqeStatus.COMPLETED, HostStatus.READY)459 self._run_dispatcher() # nothing else runs460 self._assert_nothing_is_running()461 def test_do_not_verify_requested_reverify_failure(self):462 host = self._create_reverify_request()463 host.protection = host_protections.Protection.DO_NOT_VERIFY464 host.save()465 self._run_dispatcher()466 self.mock_drone_manager.finish_process(_PidfileType.VERIFY,467 exit_status=256)468 self._run_dispatcher()469 self._check_host_status(host, HostStatus.READY) # ignore failure470 self._assert_nothing_is_running()471 def test_job_abort_in_verify(self):472 self._initialize_test()473 job = self._create_job(hosts=[1])474 self._run_dispatcher() # launches verify475 job.hostqueueentry_set.update(aborted=True)476 self._run_dispatcher() # kills verify, launches cleanup477 self.assert_(self.mock_drone_manager.was_last_process_killed(478 _PidfileType.VERIFY))479 self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)480 self._run_dispatcher()481 def test_job_abort(self):482 self._initialize_test()483 job = self._create_job(hosts=[1])484 job.run_verify = False485 job.save()486 self._run_dispatcher() # launches job487 job.hostqueueentry_set.update(aborted=True)488 self._run_dispatcher() # kills job, launches gathering489 self.assert_(self.mock_drone_manager.was_last_process_killed(490 _PidfileType.JOB))491 self.mock_drone_manager.finish_process(_PidfileType.GATHER)492 self._run_dispatcher() # launches parsing + cleanup493 queue_entry = job.hostqueueentry_set.all()[0]494 self._finish_parsing_and_cleanup(queue_entry)495 def test_job_abort_queued_synchronous(self):496 self._initialize_test()497 job = self._create_job(hosts=[1, 2])498 job.synch_count = 2499 job.save()500 job.hostqueueentry_set.update(aborted=True)501 self._run_dispatcher()502 for host_queue_entry in job.hostqueueentry_set.all():503 self.assertEqual(host_queue_entry.status,...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!