Best Python code snippet using autotest_python
monitor_db_unittest.py
Source:monitor_db_unittest.py
...604 self.assert_(isinstance(agent, monitor_db.Agent))605 self.assert_(agent.task)606 return agent.task607 def test_run_if_ready_delays(self):608 # Also tests Job.run_with_ready_delay() on atomic group jobs.609 django_job = self._create_job(hosts=[5, 6], atomic_group=1)610 job = scheduler_models.Job(django_job.id)611 self.assertEqual(1, job.synch_count)612 django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))613 self.assertEqual(2, len(django_hqes))614 self.assertEqual(2, django_hqes[0].atomic_group.max_number_of_machines)615 def set_hqe_status(django_hqe, status):616 django_hqe.status = status617 django_hqe.save()618 scheduler_models.HostQueueEntry(django_hqe.id).host.set_status(status)619 # An initial state, our synch_count is 1620 set_hqe_status(django_hqes[0], models.HostQueueEntry.Status.VERIFYING)621 set_hqe_status(django_hqes[1], models.HostQueueEntry.Status.PENDING)622 # So that we don't depend on the config file value during the test.623 self.assert_(scheduler_config.config624 .secs_to_wait_for_atomic_group_hosts is not None)625 self.god.stub_with(scheduler_config.config,626 'secs_to_wait_for_atomic_group_hosts', 123456)627 # Get the pending one as a scheduler_models.HostQueueEntry object.628 hqe = scheduler_models.HostQueueEntry(django_hqes[1].id)629 self.assert_(not job._delay_ready_task)630 self.assertTrue(job.is_ready())631 # Ready with one pending, one verifying and an atomic group should632 # result in a DelayCallTask to re-check if we're ready a while later.633 job.run_if_ready(hqe)634 self.assertEquals('Waiting', hqe.status)635 self._dispatcher._schedule_delay_tasks()636 self.assertEquals('Pending', hqe.status)637 agent = self._dispatcher._agents[0]638 self.assert_(job._delay_ready_task)639 self.assert_(isinstance(agent, monitor_db.Agent))640 self.assert_(agent.task)641 delay_task = agent.task642 self.assert_(isinstance(delay_task, scheduler_models.DelayedCallTask))643 self.assert_(not delay_task.is_done())644 
self.god.stub_function(delay_task, 'abort')645 self.god.stub_function(job, 'run')646 self.god.stub_function(job, '_pending_count')647 self.god.stub_with(job, 'synch_count', 9)648 self.god.stub_function(job, 'request_abort')649 # Test that the DelayedCallTask's callback queued up above does the650 # correct thing and does not call run if there are not enough hosts651 # in pending after the delay.652 job._pending_count.expect_call().and_return(0)653 job.request_abort.expect_call()654 delay_task._callback()655 self.god.check_playback()656 # Test that the DelayedCallTask's callback queued up above does the657 # correct thing and returns the Agent returned by job.run() if658 # there are still enough hosts pending after the delay.659 job.synch_count = 4660 job._pending_count.expect_call().and_return(4)661 job.run.expect_call(hqe)662 delay_task._callback()663 self.god.check_playback()664 job._pending_count.expect_call().and_return(4)665 # Adjust the delay deadline so that enough time has passed.666 job._delay_ready_task.end_time = time.time() - 111111667 job.run.expect_call(hqe)668 # ...the delay_expired condition should cause us to call run()669 self._dispatcher._handle_agents()670 self.god.check_playback()671 delay_task.success = False672 # Adjust the delay deadline back so that enough time has not passed.673 job._delay_ready_task.end_time = time.time() + 111111674 self._dispatcher._handle_agents()675 self.god.check_playback()676 # Now max_number_of_machines HQEs are in pending state. 
Remaining677 # delay will now be ignored.678 other_hqe = scheduler_models.HostQueueEntry(django_hqes[0].id)679 self.god.unstub(job, 'run')680 self.god.unstub(job, '_pending_count')681 self.god.unstub(job, 'synch_count')682 self.god.unstub(job, 'request_abort')683 # ...the over_max_threshold test should cause us to call run()684 delay_task.abort.expect_call()685 other_hqe.on_pending()686 self.assertEquals('Starting', other_hqe.status)687 self.assertEquals('Starting', hqe.status)688 self.god.stub_function(job, 'run')689 self.god.unstub(delay_task, 'abort')690 hqe.set_status('Pending')691 other_hqe.set_status('Pending')692 # Now we're not over the max for the atomic group. But all assigned693 # hosts are in pending state. over_max_threshold should make us run().694 hqe.atomic_group.max_number_of_machines += 1695 hqe.atomic_group.save()696 job.run.expect_call(hqe)697 hqe.on_pending()698 self.god.check_playback()699 hqe.atomic_group.max_number_of_machines -= 1700 hqe.atomic_group.save()701 other_hqe = scheduler_models.HostQueueEntry(django_hqes[0].id)702 self.assertTrue(hqe.job is other_hqe.job)703 # DBObject classes should reuse instances so these should be the same.704 self.assertEqual(job, other_hqe.job)705 self.assertEqual(other_hqe.job, hqe.job)706 # Be sure our delay was not lost during the other_hqe construction.707 self.assertEqual(job._delay_ready_task, delay_task)708 self.assert_(job._delay_ready_task)709 self.assertFalse(job._delay_ready_task.is_done())710 self.assertFalse(job._delay_ready_task.aborted)711 # We want the real run() to be called below.712 self.god.unstub(job, 'run')713 # We pass in the other HQE this time the same way it would happen714 # for real when one host finishes verifying and enters pending.715 job.run_if_ready(other_hqe)716 # The delayed task must be aborted by the actual run() call above.717 self.assertTrue(job._delay_ready_task.aborted)718 self.assertFalse(job._delay_ready_task.success)719 
self.assertTrue(job._delay_ready_task.is_done())720 # Check that job run() and _finish_run() were called by the above:721 self._dispatcher._schedule_running_host_queue_entries()722 agent = self._dispatcher._agents[0]723 self.assert_(agent.task)724 task = agent.task725 self.assert_(isinstance(task, monitor_db.QueueTask))726 # Requery these hqes in order to verify the status from the DB.727 django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))728 for entry in django_hqes:729 self.assertEqual(models.HostQueueEntry.Status.STARTING,730 entry.status)731 # We're already running, but more calls to run_with_ready_delay can732 # continue to come in due to straggler hosts enter Pending. Make733 # sure we don't do anything.734 self.god.stub_function(job, 'run')735 job.run_with_ready_delay(hqe)736 self.god.check_playback()737 self.god.unstub(job, 'run')738 def test_run_synchronous_atomic_group_ready(self):739 self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)740 self._update_hqe("status='Pending', execution_subdir=''")741 queue_task = self._test_run_helper(expect_starting=True)742 self.assert_(isinstance(queue_task, monitor_db.QueueTask))743 # Atomic group jobs that do not depend on a specific label in the744 # atomic group will use the atomic group name as their group name.745 self.assertEquals(queue_task.queue_entries[0].get_group_name(),746 'atomic1')747 def test_run_synchronous_atomic_group_with_label_ready(self):748 job = self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)749 job.dependency_labels.add(self.label4)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!