Best Python code snippet using autotest_python
monitor_db_functional_test.py
Source:monitor_db_functional_test.py
...305 def _check_host_status(self, host, status):306 # update from DB307 host = self._update_instance(host)308 self.assertEquals(host.status, status)309 def _run_pre_job_verify(self, queue_entry):310 self._run_dispatcher() # launches verify311 self._check_statuses(queue_entry, HqeStatus.VERIFYING,312 HostStatus.VERIFYING)313 self.mock_drone_manager.finish_process(_PidfileType.VERIFY)314 def test_simple_job(self):315 self._initialize_test()316 job, queue_entry = self._make_job_and_queue_entry()317 self._run_pre_job_verify(queue_entry)318 self._run_dispatcher() # launches job319 self._check_statuses(queue_entry, HqeStatus.RUNNING, HostStatus.RUNNING)320 self._finish_job(queue_entry)321 self._check_statuses(queue_entry, HqeStatus.COMPLETED, HostStatus.READY)322 self._assert_nothing_is_running()323 def _setup_for_pre_job_reset(self):324 self._initialize_test()325 job, queue_entry = self._make_job_and_queue_entry()326 job.reboot_before = model_attributes.RebootBefore.ALWAYS327 job.save()328 return queue_entry329 def _run_pre_job_reset_job(self, queue_entry):330 self._run_dispatcher() # reset331 self._check_statuses(queue_entry, HqeStatus.RESETTING,332 HostStatus.RESETTING)333 self.mock_drone_manager.finish_process(_PidfileType.RESET)334 self._run_dispatcher() # job335 self._finish_job(queue_entry)336 def test_pre_job_reset(self):337 queue_entry = self._setup_for_pre_job_reset()338 self._run_pre_job_reset_job(queue_entry)339 def _run_pre_job_reset_one_failure(self):340 queue_entry = self._setup_for_pre_job_reset()341 self._run_dispatcher() # reset342 self.mock_drone_manager.finish_process(_PidfileType.RESET,343 exit_status=256)344 self._run_dispatcher() # repair345 self._check_statuses(queue_entry, HqeStatus.QUEUED,346 HostStatus.REPAIRING)347 self.mock_drone_manager.finish_process(_PidfileType.REPAIR)348 return queue_entry349 def test_pre_job_reset_failure(self):350 queue_entry = self._run_pre_job_reset_one_failure()351 # from here the job should run as normal352 self._run_pre_job_reset_job(queue_entry)353 def test_pre_job_reset_double_failure(self):354 # TODO (showard): this test isn't perfect. in reality, when the second355 # reset fails, it copies its results over to the job directory using356 # copy_results_on_drone() and then parses them. since we don't handle357 # that, there appear to be no results at the job directory. the358 # scheduler handles this gracefully, parsing gets effectively skipped,359 # and this test passes as is. but we ought to properly test that360 # behavior.361 queue_entry = self._run_pre_job_reset_one_failure()362 self._run_dispatcher() # second reset363 self.mock_drone_manager.finish_process(_PidfileType.RESET,364 exit_status=256)365 self._run_dispatcher()366 self._check_statuses(queue_entry, HqeStatus.FAILED,367 HostStatus.REPAIR_FAILED)368 # nothing else should run369 self._assert_nothing_is_running()370 def _assert_nothing_is_running(self):371 self.assertEquals(self.mock_drone_manager.running_pidfile_ids(), [])372 def _setup_for_post_job_cleanup(self):373 self._initialize_test()374 job, queue_entry = self._make_job_and_queue_entry()375 job.reboot_after = model_attributes.RebootAfter.ALWAYS376 job.save()377 return queue_entry378 def _run_post_job_cleanup_failure_up_to_repair(self, queue_entry,379 include_verify=True):380 if include_verify:381 self._run_pre_job_verify(queue_entry)382 self._run_dispatcher() # job383 self.mock_drone_manager.finish_process(_PidfileType.JOB)384 self._run_dispatcher() # parsing + cleanup385 self.mock_drone_manager.finish_process(_PidfileType.PARSE)386 self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,387 exit_status=256)388 self._run_dispatcher() # repair, HQE unaffected389 return queue_entry390 def test_post_job_cleanup_failure(self):391 queue_entry = self._setup_for_post_job_cleanup()392 self._run_post_job_cleanup_failure_up_to_repair(queue_entry)393 self._check_statuses(queue_entry, HqeStatus.COMPLETED,394 HostStatus.REPAIRING)395 self.mock_drone_manager.finish_process(_PidfileType.REPAIR)396 self._run_dispatcher()397 self._check_statuses(queue_entry, HqeStatus.COMPLETED, HostStatus.READY)398 def test_post_job_cleanup_failure_repair_failure(self):399 queue_entry = self._setup_for_post_job_cleanup()400 self._run_post_job_cleanup_failure_up_to_repair(queue_entry)401 self.mock_drone_manager.finish_process(_PidfileType.REPAIR,402 exit_status=256)403 self._run_dispatcher()404 self._check_statuses(queue_entry, HqeStatus.COMPLETED,405 HostStatus.REPAIR_FAILED)406 def _ensure_post_job_process_is_paired(self, queue_entry, pidfile_type):407 pidfile_name = _PIDFILE_TYPE_TO_PIDFILE[pidfile_type]408 queue_entry = self._update_instance(queue_entry)409 pidfile_id = self.mock_drone_manager.pidfile_from_path(410 queue_entry.execution_path(), pidfile_name)411 self.assert_(pidfile_id._paired_with_pidfile)412 def _finish_job(self, queue_entry):413 self._check_statuses(queue_entry, HqeStatus.RUNNING)414 self.mock_drone_manager.finish_process(_PidfileType.JOB)415 self._run_dispatcher() # launches parsing416 self._check_statuses(queue_entry, HqeStatus.PARSING)417 self._ensure_post_job_process_is_paired(queue_entry, _PidfileType.PARSE)418 self._finish_parsing()419 def _finish_parsing(self):420 self.mock_drone_manager.finish_process(_PidfileType.PARSE)421 self._run_dispatcher()422 def _create_reverify_request(self):423 host = self.hosts[0]424 models.SpecialTask.schedule_special_task(425 host=host, task=models.SpecialTask.Task.VERIFY)426 return host427 def test_requested_reverify(self):428 host = self._create_reverify_request()429 self._run_dispatcher()430 self._check_host_status(host, HostStatus.VERIFYING)431 self.mock_drone_manager.finish_process(_PidfileType.VERIFY)432 self._run_dispatcher()433 self._check_host_status(host, HostStatus.READY)434 def test_requested_reverify_failure(self):435 host = self._create_reverify_request()436 self._run_dispatcher()437 self.mock_drone_manager.finish_process(_PidfileType.VERIFY,438 exit_status=256)439 self._run_dispatcher() # repair440 self._check_host_status(host, HostStatus.REPAIRING)441 self.mock_drone_manager.finish_process(_PidfileType.REPAIR)442 self._run_dispatcher()443 self._check_host_status(host, HostStatus.READY)444 def _setup_for_do_not_verify(self):445 self._initialize_test()446 job, queue_entry = self._make_job_and_queue_entry()447 queue_entry.host.protection = host_protections.Protection.DO_NOT_VERIFY448 queue_entry.host.save()449 return queue_entry450 def test_do_not_verify_job(self):451 queue_entry = self._setup_for_do_not_verify()452 self._run_dispatcher() # runs job directly453 self._finish_job(queue_entry)454 def test_do_not_verify_job_with_cleanup(self):455 queue_entry = self._setup_for_do_not_verify()456 queue_entry.job.reboot_before = model_attributes.RebootBefore.ALWAYS457 queue_entry.job.save()458 self._run_dispatcher() # cleanup459 self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)460 self._run_dispatcher() # job461 self._finish_job(queue_entry)462 def test_do_not_verify_pre_job_cleanup_failure(self):463 queue_entry = self._setup_for_do_not_verify()464 queue_entry.job.reboot_before = model_attributes.RebootBefore.ALWAYS465 queue_entry.job.save()466 self._run_dispatcher() # cleanup467 self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,468 exit_status=256)469 self._run_dispatcher() # failure ignored; job runs470 self._finish_job(queue_entry)471 def test_do_not_verify_post_job_cleanup_failure(self):472 queue_entry = self._setup_for_do_not_verify()473 queue_entry.job.reboot_after = model_attributes.RebootAfter.ALWAYS474 queue_entry.job.save()475 self._run_post_job_cleanup_failure_up_to_repair(queue_entry,476 include_verify=False)477 # failure ignored, host still set to Ready478 self._check_statuses(queue_entry, HqeStatus.COMPLETED, HostStatus.READY)479 self._run_dispatcher() # nothing else runs480 self._assert_nothing_is_running()481 def test_do_not_verify_requested_reverify_failure(self):482 host = self._create_reverify_request()483 host.protection = host_protections.Protection.DO_NOT_VERIFY484 host.save()485 self._run_dispatcher()486 self.mock_drone_manager.finish_process(_PidfileType.VERIFY,487 exit_status=256)488 self._run_dispatcher()489 self._check_host_status(host, HostStatus.READY) # ignore failure490 self._assert_nothing_is_running()491 def test_job_abort_in_verify(self):492 self._initialize_test()493 job = self._create_job(hosts=[1])494 queue_entries = list(job.hostqueueentry_set.all())495 self._run_dispatcher() # launches verify496 self._check_statuses(queue_entries[0], HqeStatus.VERIFYING)497 job.hostqueueentry_set.update(aborted=True)498 self._run_dispatcher() # kills verify, launches cleanup499 self.assert_(self.mock_drone_manager.was_last_process_killed(500 _PidfileType.VERIFY, set([signal.SIGKILL])))501 self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)502 self._run_dispatcher()503 def test_job_abort(self):504 self._initialize_test()505 job = self._create_job(hosts=[1])506 job.run_reset = False507 job.save()508 queue_entries = list(job.hostqueueentry_set.all())509 self._run_dispatcher() # launches job510 self._check_statuses(queue_entries[0], HqeStatus.RUNNING)511 job.hostqueueentry_set.update(aborted=True)512 self._run_dispatcher() # kills job, launches gathering513 self._check_statuses(queue_entries[0], HqeStatus.GATHERING)514 self.mock_drone_manager.finish_process(_PidfileType.GATHER)515 self._run_dispatcher() # launches parsing + cleanup516 queue_entry = job.hostqueueentry_set.all()[0]517 self._finish_parsing()518 # The abort will cause gathering to launch a cleanup.519 self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)520 self._run_dispatcher()521 self.mock_drone_manager.finish_process(_PidfileType.RESET)522 self._run_dispatcher()523 def test_job_abort_queued_synchronous(self):524 self._initialize_test()525 job = self._create_job(hosts=[1,2])526 job.synch_count = 2527 job.save()528 job.hostqueueentry_set.update(aborted=True)529 self._run_dispatcher()530 for host_queue_entry in job.hostqueueentry_set.all():531 self.assertEqual(host_queue_entry.status,532 HqeStatus.ABORTED)533 def test_no_pidfile_leaking(self):534 self._initialize_test()535 self.test_simple_job()536 self.mock_drone_manager.refresh()537 self.assertEquals(self.mock_drone_manager._pidfiles, {})538 self.test_job_abort_in_verify()539 self.mock_drone_manager.refresh()540 self.assertEquals(self.mock_drone_manager._pidfiles, {})541 self.test_job_abort()542 self.mock_drone_manager.refresh()543 self.assertEquals(self.mock_drone_manager._pidfiles, {})544 def _make_job_and_queue_entry(self):545 job = self._create_job(hosts=[1])546 queue_entry = job.hostqueueentry_set.all()[0]547 return job, queue_entry548 def test_recover_running_no_process(self):549 # recovery should re-execute a Running HQE if no process is found550 _, queue_entry = self._make_job_and_queue_entry()551 queue_entry.status = HqeStatus.RUNNING552 queue_entry.execution_subdir = '1-myuser/host1'553 queue_entry.save()554 queue_entry.host.status = HostStatus.RUNNING555 queue_entry.host.save()556 self._initialize_test()557 self._run_dispatcher()558 self._finish_job(queue_entry)559 def test_recover_verifying_hqe_no_special_task(self):560 # recovery should move a Resetting HQE with no corresponding561 # Verify or Reset SpecialTask back to Queued.562 _, queue_entry = self._make_job_and_queue_entry()563 queue_entry.status = HqeStatus.RESETTING564 queue_entry.save()565 # make some dummy SpecialTasks that shouldn't count566 models.SpecialTask.objects.create(567 host=queue_entry.host,568 task=models.SpecialTask.Task.RESET,569 requested_by=models.User.current_user())570 models.SpecialTask.objects.create(571 host=queue_entry.host,572 task=models.SpecialTask.Task.CLEANUP,573 queue_entry=queue_entry,574 is_complete=True,575 requested_by=models.User.current_user())576 self._initialize_test()577 self._check_statuses(queue_entry, HqeStatus.QUEUED)578 def _test_recover_verifying_hqe_helper(self, task, pidfile_type):579 _, queue_entry = self._make_job_and_queue_entry()580 queue_entry.status = HqeStatus.VERIFYING581 queue_entry.save()582 special_task = models.SpecialTask.objects.create(583 host=queue_entry.host, task=task, queue_entry=queue_entry)584 self._initialize_test()585 self._run_dispatcher()586 self.mock_drone_manager.finish_process(pidfile_type)587 self._run_dispatcher()588 # don't bother checking the rest of the job execution, as long as the589 # SpecialTask ran590 def test_recover_verifying_hqe_with_cleanup(self):591 # recover an HQE that was in pre-job cleanup592 self._test_recover_verifying_hqe_helper(models.SpecialTask.Task.CLEANUP,593 _PidfileType.CLEANUP)594 def test_recover_verifying_hqe_with_verify(self):595 # recover an HQE that was in pre-job verify596 self._test_recover_verifying_hqe_helper(models.SpecialTask.Task.VERIFY,597 _PidfileType.VERIFY)598 def test_recover_parsing(self):599 self._initialize_test()600 job, queue_entry = self._make_job_and_queue_entry()601 job.run_verify = False602 job.run_reset = False603 job.reboot_after = model_attributes.RebootAfter.NEVER604 job.save()605 self._run_dispatcher() # launches job606 self.mock_drone_manager.finish_process(_PidfileType.JOB)607 self._run_dispatcher() # launches parsing608 # now "restart" the scheduler609 self._create_dispatcher()610 self._initialize_test()611 self._run_dispatcher()612 self.mock_drone_manager.finish_process(_PidfileType.PARSE)613 self._run_dispatcher()614 def test_recover_parsing__no_process_already_aborted(self):615 _, queue_entry = self._make_job_and_queue_entry()616 queue_entry.execution_subdir = 'host1'617 queue_entry.status = HqeStatus.PARSING618 queue_entry.aborted = True619 queue_entry.save()620 self._initialize_test()621 self._run_dispatcher()622 def test_job_scheduled_just_after_abort(self):623 # test a pretty obscure corner case where a job is aborted while queued,624 # another job is ready to run, and throttling is active. the post-abort625 # cleanup must not be pre-empted by the second job.626 # This test kind of doesn't make sense anymore after verify+cleanup627 # were merged into reset. It should maybe just be removed.628 job1, queue_entry1 = self._make_job_and_queue_entry()629 queue_entry1.save()630 job2, queue_entry2 = self._make_job_and_queue_entry()631 job2.reboot_before = model_attributes.RebootBefore.IF_DIRTY632 job2.save()633 self.mock_drone_manager.process_capacity = 0634 self._run_dispatcher() # schedule job1, but won't start verify635 job1.hostqueueentry_set.update(aborted=True)636 self.mock_drone_manager.process_capacity = 100637 self._run_dispatcher() # reset must run here, not verify for job2638 self._check_statuses(queue_entry1, HqeStatus.ABORTED,639 HostStatus.RESETTING)640 self.mock_drone_manager.finish_process(_PidfileType.RESET)641 self._run_dispatcher() # now verify starts for job2642 self._check_statuses(queue_entry2, HqeStatus.RUNNING,643 HostStatus.RUNNING)644 def test_reverify_interrupting_pre_job(self):645 # ensure things behave sanely if a reverify is scheduled in the middle646 # of pre-job actions647 _, queue_entry = self._make_job_and_queue_entry()648 self._run_dispatcher() # pre-job verify649 self._create_reverify_request()650 self.mock_drone_manager.finish_process(_PidfileType.VERIFY,651 exit_status=256)652 self._run_dispatcher() # repair653 self.mock_drone_manager.finish_process(_PidfileType.REPAIR)654 self._run_dispatcher() # reverify runs now655 self.mock_drone_manager.finish_process(_PidfileType.VERIFY)656 self._run_dispatcher() # pre-job verify657 self.mock_drone_manager.finish_process(_PidfileType.VERIFY)658 self._run_dispatcher() # and job runs...659 self._check_statuses(queue_entry, HqeStatus.RUNNING, HostStatus.RUNNING)660 self._finish_job(queue_entry) # reverify has been deleted661 self._check_statuses(queue_entry, HqeStatus.COMPLETED,662 HostStatus.READY)663 self._assert_nothing_is_running()664 def test_reverify_while_job_running(self):665 # once a job is running, a reverify must not be allowed to preempt666 # Gathering667 _, queue_entry = self._make_job_and_queue_entry()668 self._run_pre_job_verify(queue_entry)669 self._run_dispatcher() # job runs670 self._create_reverify_request()671 # make job end with a signal, so gathering will run672 self.mock_drone_manager.finish_process(_PidfileType.JOB,673 exit_status=271)674 self._run_dispatcher() # gathering must start675 self.mock_drone_manager.finish_process(_PidfileType.GATHER)676 self._run_dispatcher() # parsing and cleanup677 self._finish_parsing()678 self._run_dispatcher() # now reverify runs679 self._check_statuses(queue_entry, HqeStatus.FAILED,680 HostStatus.VERIFYING)681 self.mock_drone_manager.finish_process(_PidfileType.VERIFY)682 self._run_dispatcher()683 self._check_host_status(queue_entry.host, HostStatus.READY)684 def test_reverify_while_host_pending(self):685 # ensure that if a reverify is scheduled while a host is in Pending, it686 # won't run until the host is actually free687 job = self._create_job(hosts=[1,2])688 queue_entry = job.hostqueueentry_set.get(host__hostname='host1')689 job.synch_count = 2690 job.save()691 host2 = self.hosts[1]692 host2.locked = True693 host2.save()694 self._run_dispatcher() # verify host1695 self.mock_drone_manager.finish_process(_PidfileType.VERIFY)696 self._run_dispatcher() # host1 Pending697 self._check_statuses(queue_entry, HqeStatus.PENDING, HostStatus.PENDING)698 self._create_reverify_request()699 self._run_dispatcher() # nothing should happen here700 self._check_statuses(queue_entry, HqeStatus.PENDING, HostStatus.PENDING)701 # now let the job run702 host2.locked = False703 host2.save()704 self._run_dispatcher() # verify host2705 self.mock_drone_manager.finish_process(_PidfileType.VERIFY)706 self._run_dispatcher() # run job707 self._finish_job(queue_entry)708 # the reverify should now be running709 self._check_statuses(queue_entry, HqeStatus.COMPLETED,710 HostStatus.VERIFYING)711 self.mock_drone_manager.finish_process(_PidfileType.VERIFY)712 self._run_dispatcher()713 self._check_host_status(queue_entry.host, HostStatus.READY)714 def test_throttling(self):715 job = self._create_job(hosts=[1,2,3])716 job.synch_count = 3717 job.save()718 queue_entries = list(job.hostqueueentry_set.all())719 def _check_hqe_statuses(*statuses):720 for queue_entry, status in zip(queue_entries, statuses):721 self._check_statuses(queue_entry, status)722 self.mock_drone_manager.process_capacity = 2723 self._run_dispatcher() # verify runs on 1 and 2724 queue_entries = list(job.hostqueueentry_set.all())725 _check_hqe_statuses(HqeStatus.QUEUED,726 HqeStatus.VERIFYING, HqeStatus.VERIFYING)727 self.assertEquals(len(self.mock_drone_manager.running_pidfile_ids()), 2)728 self.mock_drone_manager.finish_specific_process(729 'hosts/host3/1-verify', drone_manager.AUTOSERV_PID_FILE)730 self.mock_drone_manager.finish_process(_PidfileType.VERIFY)731 self._run_dispatcher() # verify runs on 3732 _check_hqe_statuses(HqeStatus.VERIFYING, HqeStatus.PENDING,733 HqeStatus.PENDING)734 self.mock_drone_manager.finish_process(_PidfileType.VERIFY)735 self._run_dispatcher() # job won't run due to throttling736 _check_hqe_statuses(HqeStatus.STARTING, HqeStatus.STARTING,737 HqeStatus.STARTING)738 self._assert_nothing_is_running()739 self.mock_drone_manager.process_capacity = 3740 self._run_dispatcher() # now job runs741 _check_hqe_statuses(HqeStatus.RUNNING, HqeStatus.RUNNING,742 HqeStatus.RUNNING)743 self.mock_drone_manager.process_capacity = 2744 self.mock_drone_manager.finish_process(_PidfileType.JOB,745 exit_status=271)746 self._run_dispatcher() # gathering won't run due to throttling747 _check_hqe_statuses(HqeStatus.GATHERING, HqeStatus.GATHERING,748 HqeStatus.GATHERING)749 self._assert_nothing_is_running()750 self.mock_drone_manager.process_capacity = 3751 self._run_dispatcher() # now gathering runs752 self.mock_drone_manager.process_capacity = 0753 self.mock_drone_manager.finish_process(_PidfileType.GATHER)754 self._run_dispatcher() # parsing runs despite throttling755 _check_hqe_statuses(HqeStatus.PARSING, HqeStatus.PARSING,756 HqeStatus.PARSING)757 def test_abort_starting_while_throttling(self):758 self._initialize_test()759 job = self._create_job(hosts=[1,2], synchronous=True)760 queue_entry = job.hostqueueentry_set.all()[0]761 job.run_verify = False762 job.run_reset = False763 job.reboot_after = model_attributes.RebootAfter.NEVER764 job.save()765 self.mock_drone_manager.process_capacity = 0766 self._run_dispatcher() # go to starting, but don't start job767 self._check_statuses(queue_entry, HqeStatus.STARTING,768 HostStatus.PENDING)769 job.hostqueueentry_set.update(aborted=True)770 self._run_dispatcher()771 self._check_statuses(queue_entry, HqeStatus.GATHERING,772 HostStatus.RUNNING)773 self.mock_drone_manager.process_capacity = 5774 self._run_dispatcher()775 self._check_statuses(queue_entry, HqeStatus.ABORTED,776 HostStatus.CLEANING)777 def test_simple_metahost_assignment(self):778 job = self._create_job(metahosts=[1])779 self._run_dispatcher()780 entry = job.hostqueueentry_set.all()[0]781 self.assertEquals(entry.host.hostname, 'host1')782 self._check_statuses(entry, HqeStatus.VERIFYING, HostStatus.VERIFYING)783 self.mock_drone_manager.finish_process(_PidfileType.VERIFY)784 self._run_dispatcher()785 self._check_statuses(entry, HqeStatus.RUNNING, HostStatus.RUNNING)786 # rest of job proceeds normally787 def test_metahost_fail_verify(self):788 self.hosts[1].labels.add(self.labels[0]) # put label1 also on host2789 job = self._create_job(metahosts=[1])790 self._run_dispatcher() # assigned to host1791 self.mock_drone_manager.finish_process(_PidfileType.VERIFY,792 exit_status=256)793 self._run_dispatcher() # host1 failed, gets reassigned to host2794 entry = job.hostqueueentry_set.all()[0]795 self.assertEquals(entry.host.hostname, 'host2')796 self._check_statuses(entry, HqeStatus.VERIFYING, HostStatus.VERIFYING)797 self._check_host_status(self.hosts[0], HostStatus.REPAIRING)798 self.mock_drone_manager.finish_process(_PidfileType.VERIFY)799 self._run_dispatcher()800 self._check_statuses(entry, HqeStatus.RUNNING, HostStatus.RUNNING)801 def test_hostless_job(self):802 job = self._create_job(hostless=True)803 entry = job.hostqueueentry_set.all()[0]804 self._run_dispatcher()805 self._check_entry_status(entry, HqeStatus.RUNNING)806 self.mock_drone_manager.finish_process(_PidfileType.JOB)807 self._run_dispatcher()808 self._check_entry_status(entry, HqeStatus.PARSING)809 self.mock_drone_manager.finish_process(_PidfileType.PARSE)810 self._run_dispatcher()811 self._check_entry_status(entry, HqeStatus.COMPLETED)812 def test_pre_job_keyvals(self):813 job = self._create_job(hosts=[1])814 job.run_verify = False815 job.run_reset = False816 job.reboot_before = model_attributes.RebootBefore.NEVER817 job.save()818 models.JobKeyval.objects.create(job=job, key='mykey', value='myvalue')819 self._run_dispatcher()820 self._finish_job(job.hostqueueentry_set.all()[0])821 attached_files = self.mock_drone_manager.attached_files(822 '1-autotest_system/host1')823 job_keyval_path = '1-autotest_system/host1/keyval'824 self.assert_(job_keyval_path in attached_files, attached_files)825 keyval_contents = attached_files[job_keyval_path]826 keyval_dict = dict(line.strip().split('=', 1)827 for line in keyval_contents.splitlines())828 self.assert_('job_queued' in keyval_dict, keyval_dict)829 self.assertEquals(keyval_dict['mykey'], 'myvalue')830# This tests the scheduler functions with archiving step disabled831class SchedulerFunctionalTestNoArchiving(SchedulerFunctionalTest):832 def _set_global_config_values(self):833 super(SchedulerFunctionalTestNoArchiving, self834 )._set_global_config_values()835 self.mock_config.set_config_value('SCHEDULER', 'enable_archiving',836 False)837 def _finish_parsing(self):838 self.mock_drone_manager.finish_process(_PidfileType.PARSE)839 self._run_dispatcher()840 def _run_post_job_cleanup_failure_up_to_repair(self, queue_entry,841 include_verify=True):842 if include_verify:843 self._run_pre_job_verify(queue_entry)844 self._run_dispatcher() # job845 self.mock_drone_manager.finish_process(_PidfileType.JOB)846 self._run_dispatcher() # parsing + cleanup847 self.mock_drone_manager.finish_process(_PidfileType.PARSE)848 self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,849 exit_status=256)850 self._run_dispatcher() # repair, HQE unaffected851 return queue_entry852 def test_hostless_job(self):853 job = self._create_job(hostless=True)854 entry = job.hostqueueentry_set.all()[0]855 self._run_dispatcher()856 self._check_entry_status(entry, HqeStatus.RUNNING)857 self.mock_drone_manager.finish_process(_PidfileType.JOB)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!