Best Python code snippet using autotest_python
scheduler_models_unittest.py
Source:scheduler_models_unittest.py
...32 self._set_monitor_stubs()33 def tearDown(self):34 self._database.disconnect()35 self._frontend_common_teardown()36 def _update_hqe(self, set, where=''):37 query = 'UPDATE afe_host_queue_entries SET ' + set38 if where:39 query += ' WHERE ' + where40 self._do_query(query)41class DelayedCallTaskTest(unittest.TestCase):42 def setUp(self):43 self.god = mock.mock_god()44 def tearDown(self):45 self.god.unstub_all()46 def test_delayed_call(self):47 test_time = self.god.create_mock_function('time')48 test_time.expect_call().and_return(33)49 test_time.expect_call().and_return(34.01)50 test_time.expect_call().and_return(34.99)51 test_time.expect_call().and_return(35.01)52 def test_callback():53 test_callback.calls += 154 test_callback.calls = 055 delay_task = scheduler_models.DelayedCallTask(56 delay_seconds=2, callback=test_callback,57 now_func=test_time) # time 3358 self.assertEqual(35, delay_task.end_time)59 delay_task.poll() # activates the task and polls it once, time 34.0160 self.assertEqual(0, test_callback.calls, "callback called early")61 delay_task.poll() # time 34.9962 self.assertEqual(0, test_callback.calls, "callback called early")63 delay_task.poll() # time 35.0164 self.assertEqual(1, test_callback.calls)65 self.assert_(delay_task.is_done())66 self.assert_(delay_task.success)67 self.assert_(not delay_task.aborted)68 self.god.check_playback()69 def test_delayed_call_abort(self):70 delay_task = scheduler_models.DelayedCallTask(71 delay_seconds=987654, callback=lambda: None)72 delay_task.abort()73 self.assert_(delay_task.aborted)74 self.assert_(delay_task.is_done())75 self.assert_(not delay_task.success)76 self.god.check_playback()77class DBObjectTest(BaseSchedulerModelsTest):78 def test_compare_fields_in_row(self):79 host = scheduler_models.Host(id=1)80 fields = list(host._fields)81 row_data = [getattr(host, fieldname) for fieldname in fields]82 self.assertEqual({}, host._compare_fields_in_row(row_data))83 row_data[fields.index('hostname')] = 'spam'84 self.assertEqual({'hostname': ('host1', 'spam')},85 host._compare_fields_in_row(row_data))86 row_data[fields.index('id')] = 2387 self.assertEqual({'hostname': ('host1', 'spam'), 'id': (1, 23)},88 host._compare_fields_in_row(row_data))89 def test_compare_fields_in_row_datetime_ignores_microseconds(self):90 datetime_with_us = datetime.datetime(2009, 10, 07, 12, 34, 56, 7890)91 datetime_without_us = datetime.datetime(2009, 10, 07, 12, 34, 56, 0)92 class TestTable(scheduler_models.DBObject):93 _table_name = 'test_table'94 _fields = ('id', 'test_datetime')95 tt = TestTable(row=[1, datetime_without_us])96 self.assertEqual({}, tt._compare_fields_in_row([1, datetime_with_us]))97 def test_always_query(self):98 host_a = scheduler_models.Host(id=2)99 self.assertEqual(host_a.hostname, 'host2')100 self._do_query('UPDATE afe_hosts SET hostname="host2-updated" '101 'WHERE id=2')102 host_b = scheduler_models.Host(id=2, always_query=True)103 self.assert_(host_a is host_b, 'Cached instance not returned.')104 self.assertEqual(host_a.hostname, 'host2-updated',105 'Database was not re-queried')106 # If either of these are called, a query was made when it shouldn't be.107 host_a._compare_fields_in_row = lambda _: self.fail('eek! a query!')108 host_a._update_fields_from_row = host_a._compare_fields_in_row109 host_c = scheduler_models.Host(id=2, always_query=False)110 self.assert_(host_a is host_c, 'Cached instance not returned')111 def test_delete(self):112 host = scheduler_models.Host(id=3)113 host.delete()114 host = self.assertRaises(scheduler_models.DBError, scheduler_models.Host, id=3,115 always_query=False)116 host = self.assertRaises(scheduler_models.DBError, scheduler_models.Host, id=3,117 always_query=True)118 def test_save(self):119 # Dummy Job to avoid creating a one in the HostQueueEntry __init__.120 class MockJob(object):121 def __init__(self, id):122 pass123 def tag(self):124 return 'MockJob'125 self.god.stub_with(scheduler_models, 'Job', MockJob)126 hqe = scheduler_models.HostQueueEntry(127 new_record=True,128 row=[0, 1, 2, 'rhel6', 'Queued', None, 0, 0, 0, '.', None, False, None])129 hqe.save()130 new_id = hqe.id131 # Force a re-query and verify that the correct data was stored.132 scheduler_models.DBObject._clear_instance_cache()133 hqe = scheduler_models.HostQueueEntry(id=new_id)134 self.assertEqual(hqe.id, new_id)135 self.assertEqual(hqe.job_id, 1)136 self.assertEqual(hqe.host_id, 2)137 self.assertEqual(hqe.profile, 'rhel6')138 self.assertEqual(hqe.status, 'Queued')139 self.assertEqual(hqe.meta_host, None)140 self.assertEqual(hqe.active, False)141 self.assertEqual(hqe.complete, False)142 self.assertEqual(hqe.deleted, False)143 self.assertEqual(hqe.execution_subdir, '.')144 self.assertEqual(hqe.atomic_group_id, None)145 self.assertEqual(hqe.started_on, None)146class HostTest(BaseSchedulerModelsTest):147 def test_cmp_for_sort(self):148 expected_order = [149 'alice', 'Host1', 'host2', 'host3', 'host09', 'HOST010',150 'host10', 'host11', 'yolkfolk']151 hostname_idx = list(scheduler_models.Host._fields).index('hostname')152 row = [None] * len(scheduler_models.Host._fields)153 hosts = []154 for hostname in expected_order:155 row[hostname_idx] = hostname156 hosts.append(scheduler_models.Host(row=row, new_record=True))157 host1 = hosts[expected_order.index('Host1')]158 host010 = hosts[expected_order.index('HOST010')]159 host10 = hosts[expected_order.index('host10')]160 host3 = hosts[expected_order.index('host3')]161 alice = hosts[expected_order.index('alice')]162 self.assertEqual(0, scheduler_models.Host.cmp_for_sort(host10, host10))163 self.assertEqual(1, scheduler_models.Host.cmp_for_sort(host10, host010))164 self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host010, host10))165 self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host1, host10))166 self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host1, host010))167 self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host3, host10))168 self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host3, host010))169 self.assertEqual(1, scheduler_models.Host.cmp_for_sort(host3, host1))170 self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(host1, host3))171 self.assertEqual(-1, scheduler_models.Host.cmp_for_sort(alice, host3))172 self.assertEqual(1, scheduler_models.Host.cmp_for_sort(host3, alice))173 self.assertEqual(0, scheduler_models.Host.cmp_for_sort(alice, alice))174 hosts.sort(cmp=scheduler_models.Host.cmp_for_sort)175 self.assertEqual(expected_order, [h.hostname for h in hosts])176 hosts.reverse()177 hosts.sort(cmp=scheduler_models.Host.cmp_for_sort)178 self.assertEqual(expected_order, [h.hostname for h in hosts])179class HostQueueEntryTest(BaseSchedulerModelsTest):180 def _create_hqe(self, dependency_labels=(), **create_job_kwargs):181 job = self._create_job(**create_job_kwargs)182 for label in dependency_labels:183 job.dependency_labels.add(label)184 hqes = list(scheduler_models.HostQueueEntry.fetch(where='job_id=%d' % job.id))185 self.assertEqual(1, len(hqes))186 return hqes[0]187 def _check_hqe_labels(self, hqe, expected_labels):188 expected_labels = set(expected_labels)189 label_names = set(label.name for label in hqe.get_labels())190 self.assertEqual(expected_labels, label_names)191 def test_get_labels_empty(self):192 hqe = self._create_hqe(hosts=[1])193 labels = list(hqe.get_labels())194 self.assertEqual([], labels)195 def test_get_labels_metahost(self):196 hqe = self._create_hqe(metahosts=[2])197 self._check_hqe_labels(hqe, ['label2'])198 def test_get_labels_dependancies(self):199 hqe = self._create_hqe(dependency_labels=(self.label3, self.label4),200 metahosts=[1])201 self._check_hqe_labels(hqe, ['label1', 'label3', 'label4'])202class JobTest(BaseSchedulerModelsTest):203 def setUp(self):204 super(JobTest, self).setUp()205 def _mock_create(**kwargs):206 task = models.SpecialTask(**kwargs)207 task.save()208 self._task = task209 self.god.stub_with(models.SpecialTask.objects, 'create', _mock_create)210 def _test_pre_job_tasks_helper(self):211 """212 Calls HQE._do_schedule_pre_job_tasks() and returns the created special213 task214 """215 self._task = None216 queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]217 queue_entry._do_schedule_pre_job_tasks()218 return self._task219 def test_job_request_abort(self):220 django_job = self._create_job(hosts=[5, 6], atomic_group=1)221 job = scheduler_models.Job(django_job.id)222 job.request_abort()223 django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))224 for hqe in django_hqes:225 self.assertTrue(hqe.aborted)226 def test__atomic_and_has_started__on_atomic(self):227 self._create_job(hosts=[5, 6], atomic_group=1)228 job = scheduler_models.Job.fetch('id = 1')[0]229 self.assertFalse(job._atomic_and_has_started())230 self._update_hqe("status='Pending'")231 self.assertFalse(job._atomic_and_has_started())232 self._update_hqe("status='Verifying'")233 self.assertFalse(job._atomic_and_has_started())234 self.assertFalse(job._atomic_and_has_started())235 self._update_hqe("status='Failed'")236 self.assertFalse(job._atomic_and_has_started())237 self._update_hqe("status='Stopped'")238 self.assertFalse(job._atomic_and_has_started())239 self._update_hqe("status='Starting'")240 self.assertTrue(job._atomic_and_has_started())241 self._update_hqe("status='Completed'")242 self.assertTrue(job._atomic_and_has_started())243 self._update_hqe("status='Aborted'")244 def test__atomic_and_has_started__not_atomic(self):245 self._create_job(hosts=[1, 2])246 job = scheduler_models.Job.fetch('id = 1')[0]247 self.assertFalse(job._atomic_and_has_started())248 self._update_hqe("status='Starting'")249 self.assertFalse(job._atomic_and_has_started())250 def _check_special_task(self, task, task_type, queue_entry_id=None):251 self.assertEquals(task.task, task_type)252 self.assertEquals(task.host.id, 1)253 if queue_entry_id:254 self.assertEquals(task.queue_entry.id, queue_entry_id)255 def test_run_asynchronous(self):256 self._create_job(hosts=[1, 2])257 task = self._test_pre_job_tasks_helper()258 self._check_special_task(task, models.SpecialTask.Task.VERIFY, 1)259 def test_run_asynchronous_skip_verify(self):260 job = self._create_job(hosts=[1, 2])261 job.run_verify = False262 job.save()263 task = self._test_pre_job_tasks_helper()264 self.assertEquals(task, None)265 def test_run_synchronous_verify(self):266 self._create_job(hosts=[1, 2], synchronous=True)267 task = self._test_pre_job_tasks_helper()268 self._check_special_task(task, models.SpecialTask.Task.VERIFY, 1)269 def test_run_synchronous_skip_verify(self):270 job = self._create_job(hosts=[1, 2], synchronous=True)271 job.run_verify = False272 job.save()273 task = self._test_pre_job_tasks_helper()274 self.assertEquals(task, None)275 def test_run_atomic_group_already_started(self):276 self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)277 self._update_hqe("status='Starting', execution_subdir=''")278 job = scheduler_models.Job.fetch('id = 1')[0]279 queue_entry = scheduler_models.HostQueueEntry.fetch('id = 1')[0]280 assert queue_entry.job is job281 self.assertEqual(None, job.run(queue_entry))282 self.god.check_playback()283 def test_reboot_before_always(self):284 job = self._create_job(hosts=[1])285 job.reboot_before = model_attributes.RebootBefore.ALWAYS286 job.save()287 task = self._test_pre_job_tasks_helper()288 self._check_special_task(task, models.SpecialTask.Task.CLEANUP)289 def _test_reboot_before_if_dirty_helper(self, expect_reboot):290 job = self._create_job(hosts=[1])291 job.reboot_before = model_attributes.RebootBefore.IF_DIRTY...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!