Best Python code snippet using autotest_python
monitor_db_unittest.py
Source:monitor_db_unittest.py
# NOTE: this excerpt opens in the middle of a method whose `def` line and
# enclosing class are not part of the snippet.  The orphaned tail is kept
# verbatim below, as a comment, so that the rest of the listing parses.
#
#         tasks = list(parameter.queue.queue)
#         if len(tasks) != 1:
#             return False
#         return tasks[0] == self._task


def _set_host_and_qe_ids(agent_or_task, id_list=None):
    """Attach host/queue-entry ids and a hostname map to an agent or task.

    `host_ids` and `queue_entry_ids` are bound to the *same* list object,
    and every id in that list is mapped to the placeholder hostname
    '192.168.1.1'.

    @param agent_or_task: object that receives the three attributes.
    @param id_list: ids to assign; a fresh empty list is used when omitted.
    """
    if id_list is None:
        id_list = []
    agent_or_task.host_ids = agent_or_task.queue_entry_ids = id_list
    agent_or_task.hostnames = dict((host_id, '192.168.1.1')
                                   for host_id in id_list)


class BaseSchedulerTest(unittest.TestCase,
                        frontend_test_utils.FrontendTestMixin):
    """Shared fixture: test database, stubbed globals and a Dispatcher."""

    _config_section = 'AUTOTEST_WEB'

    def _do_query(self, sql):
        """Execute a raw SQL statement against the test database."""
        self._database.execute(sql)

    def _set_monitor_stubs(self):
        """Stub global config, database handles and drone manager hooks."""
        self.mock_config = global_config.FakeGlobalConfig()
        self.god.stub_with(global_config, 'global_config', self.mock_config)

        # Clear the instance cache as this is a brand new database.
        scheduler_models.DBObject._clear_instance_cache()

        self._database = (
            database_connection.TranslatingDatabase.get_test_database(
                translators=scheduler_lib._DB_TRANSLATORS))
        self._database.connect(db_type='django')
        self._database.debug = _DEBUG

        connection_manager = scheduler_lib.ConnectionManager(autocommit=False)
        self.god.stub_with(connection_manager, 'db_connection', self._database)
        self.god.stub_with(monitor_db, '_db_manager', connection_manager)
        self.god.stub_with(monitor_db, '_db', self._database)

        self.god.stub_with(monitor_db.Dispatcher,
                           '_get_pending_queue_entries',
                           self._get_pending_hqes)
        self.god.stub_with(scheduler_models, '_db', self._database)
        self.god.stub_with(drone_manager.instance(), '_results_dir',
                           '/test/path')
        self.god.stub_with(drone_manager.instance(), '_temporary_directory',
                           '/test/path/tmp')
        # initialize/execute_actions are replaced with no-ops for the tests.
        self.god.stub_with(drone_manager.instance(), 'initialize',
                           lambda *args: None)
        self.god.stub_with(drone_manager.instance(), 'execute_actions',
                           lambda *args: None)

        monitor_db.initialize_globals()
        scheduler_models.initialize_globals()

    def setUp(self):
        """Build the frontend fixture, install stubs, create a Dispatcher."""
        self._frontend_common_setup()
        self._set_monitor_stubs()
        self._set_global_config_values()
        self._dispatcher = monitor_db.Dispatcher()

    def tearDown(self):
        """Disconnect the test database and tear down the frontend fixture."""
        self._database.disconnect()
        self._frontend_common_teardown()

    def _set_global_config_values(self):
        """Set global_config values to suit unittest needs."""
        self.mock_config.set_config_value(
            'SCHEDULER', 'inline_host_acquisition', True)

    def _update_hqe(self, set, where=''):
        """Run an UPDATE on afe_host_queue_entries.

        @param set: SQL fragment placed after SET (the name shadows the
                builtin, but callers pass it by keyword so it is kept).
        @param where: optional SQL fragment placed after WHERE.
        """
        query = 'UPDATE afe_host_queue_entries SET ' + set
        if where:
            query += ' WHERE ' + where
        self._do_query(query)

    def _get_pending_hqes(self):
        """Return queued, inactive, incomplete HQEs as an ordered list.

        Installed by _set_monitor_stubs() as the replacement for
        Dispatcher._get_pending_queue_entries.
        """
        query_string = ('afe_jobs.priority DESC, '
                        'ifnull(nullif(host_id, NULL), host_id) DESC, '
                        'ifnull(nullif(meta_host, NULL), meta_host) DESC, '
                        'job_id')
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by=query_string))


class DispatcherSchedulingTest(BaseSchedulerTest):
    """Check which (job, host) pairs the dispatcher schedules."""

    # Class-level default only; setUp() rebinds a fresh list per test.
    _jobs_scheduled = []

    def tearDown(self):
        super(DispatcherSchedulingTest, self).tearDown()

    def _set_monitor_stubs(self):
        """Add a stub that records scheduling instead of running pre-job work."""
        super(DispatcherSchedulingTest, self)._set_monitor_stubs()

        def hqe__do_schedule_pre_job_tasks_stub(queue_entry):
            """Called by HostQueueEntry.run()."""
            self._record_job_scheduled(queue_entry.job.id, queue_entry.host.id)
            queue_entry.set_status('Starting')

        self.god.stub_with(scheduler_models.HostQueueEntry,
                           '_do_schedule_pre_job_tasks',
                           hqe__do_schedule_pre_job_tasks_stub)

    def _record_job_scheduled(self, job_id, host_id):
        """Record a scheduling, failing if the same pair was already seen."""
        record = (job_id, host_id)
        self.assertTrue(record not in self._jobs_scheduled,
                        'Job %d scheduled on host %d twice' %
                        (job_id, host_id))
        self._jobs_scheduled.append(record)

    def _assert_job_scheduled_on(self, job_id, host_id):
        """Assert the pair was scheduled, then consume the record."""
        record = (job_id, host_id)
        self.assertTrue(record in self._jobs_scheduled,
                        'Job %d not scheduled on host %d as expected\n'
                        'Jobs scheduled: %s' %
                        (job_id, host_id, self._jobs_scheduled))
        self._jobs_scheduled.remove(record)

    def _assert_job_scheduled_on_number_of(self, job_id, host_ids, number):
        """Assert job was scheduled on exactly number hosts out of a set."""
        found = []
        for host_id in host_ids:
            record = (job_id, host_id)
            if record in self._jobs_scheduled:
                found.append(record)
                self._jobs_scheduled.remove(record)
        if len(found) < number:
            self.fail('Job %d scheduled on fewer than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))
        elif len(found) > number:
            self.fail('Job %d scheduled on more than %d hosts in %s.\n'
                      'Jobs scheduled: %s' % (job_id, number, host_ids, found))

    def _check_for_extra_schedulings(self):
        """Fail if any recorded scheduling has not been consumed."""
        if self._jobs_scheduled:
            self.fail('Extra jobs scheduled: ' +
                      str(self._jobs_scheduled))

    def _convert_jobs_to_metahosts(self, *job_ids):
        """Move host_id into meta_host for every HQE of the given jobs."""
        sql_tuple = '(' + ','.join(str(i) for i in job_ids) + ')'
        self._do_query('UPDATE afe_host_queue_entries SET '
                       'meta_host=host_id, host_id=NULL '
                       'WHERE job_id IN ' + sql_tuple)

    def _lock_host(self, host_id):
        """Mark the given host as locked in afe_hosts."""
        self._do_query('UPDATE afe_hosts SET locked=1 WHERE id=' +
                       str(host_id))

    def setUp(self):
        super(DispatcherSchedulingTest, self).setUp()
        self._jobs_scheduled = []

    def _run_scheduler(self):
        """Tick the host scheduler once, then schedule new jobs twice."""
        self._dispatcher._host_scheduler.tick()
        for _ in xrange(2):  # metahost scheduling can take two ticks
            self._dispatcher._schedule_new_jobs()

    def _test_basic_scheduling_helper(self, use_metahosts):
        """Two jobs on two distinct hosts each get their own host."""
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._assert_job_scheduled_on(2, 2)
        self._check_for_extra_schedulings()

    def _test_priorities_helper(self, use_metahosts):
        """Test prioritization ordering"""
        self._create_job_simple([1], use_metahosts)
        self._create_job_simple([2], use_metahosts)
        self._create_job_simple([1, 2], use_metahosts)
        self._create_job_simple([1], use_metahosts, priority=1)
        self._run_scheduler()
        self._assert_job_scheduled_on(4, 1)  # higher priority
        self._assert_job_scheduled_on(2, 2)  # earlier job over later
        self._check_for_extra_schedulings()

    def _test_hosts_ready_helper(self, use_metahosts):
        """
        Only hosts that are status=Ready, unlocked and not invalid get
        scheduled.
        """
        self._create_job_simple([1], use_metahosts)
        self._do_query('UPDATE afe_hosts SET status="Running" WHERE id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE afe_hosts SET status="Ready", locked=1 '
                       'WHERE id=1')
        self._run_scheduler()
        self._check_for_extra_schedulings()

        self._do_query('UPDATE afe_hosts SET locked=0, invalid=1 '
                       'WHERE id=1')
        self._run_scheduler()
        # Only the non-metahost variant expects a scheduling on the
        # invalid host.
        if not use_metahosts:
            self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()

    def _test_hosts_idle_helper(self, use_metahosts):
        """Only idle hosts get scheduled"""
        self._create_job(hosts=[1], active=True)
        self._create_job_simple([1], use_metahosts)
        self._run_scheduler()
        self._check_for_extra_schedulings()

    def _test_obey_ACLs_helper(self, use_metahosts):
        """With host 1 removed from all ACL groups, nothing is scheduled."""
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._create_job_simple([1], use_metahosts)
        self._run_scheduler()
        self._check_for_extra_schedulings()

    def test_basic_scheduling(self):
        self._test_basic_scheduling_helper(False)

    def test_priorities(self):
        self._test_priorities_helper(False)

    def test_hosts_ready(self):
        self._test_hosts_ready_helper(False)

    def test_hosts_idle(self):
        self._test_hosts_idle_helper(False)

    def test_obey_ACLs(self):
        self._test_obey_ACLs_helper(False)

    def test_one_time_hosts_ignore_ACLs(self):
        self._do_query('DELETE FROM afe_acl_groups_hosts WHERE host_id=1')
        self._do_query('UPDATE afe_hosts SET invalid=1 WHERE id=1')
        self._create_job_simple([1])
        self._run_scheduler()
        self._assert_job_scheduled_on(1, 1)
        self._check_for_extra_schedulings()

    def test_non_metahost_on_invalid_host(self):
        """
        Non-metahost entries can get scheduled on invalid hosts (this is how
        one-time hosts work).
        """
        self._do_query('UPDATE afe_hosts SET invalid=1')
        self._test_basic_scheduling_helper(False)

    def test_metahost_scheduling(self):
        """
        Basic metahost scheduling
        """
        self._test_basic_scheduling_helper(True)

    def test_metahost_priorities(self):
        self._test_priorities_helper(True)

    def test_metahost_hosts_ready(self):
        self._test_hosts_ready_helper(True)

    def test_metahost_hosts_idle(self):
        self._test_hosts_idle_helper(True)

    def test_metahost_obey_ACLs(self):
        self._test_obey_ACLs_helper(True)

    def test_nonmetahost_over_metahost(self):
        """
        Non-metahost entries should take priority over metahost entries
        for the same host
        """
        self._create_job(metahosts=[1])
        self._create_job(hosts=[1])
        self._run_scheduler()
        self._assert_job_scheduled_on(2, 1)
        self._check_for_extra_schedulings()

    def test_no_execution_subdir_not_found(self):
        """Reproduce bug crosbug.com/334353 and recover from it."""
        self.mock_config.set_config_value('SCHEDULER', 'drones', 'localhost')
        job = self._create_job(hostless=True)

        # Ensure execution_subdir is set before status
        original_set_status = scheduler_models.HostQueueEntry.set_status

        def fake_set_status(hqe, *args, **kwargs):
            self.assertEqual(hqe.execution_subdir, 'hostless')
            original_set_status(hqe, *args, **kwargs)

        self.god.stub_with(scheduler_models.HostQueueEntry, 'set_status',
                           fake_set_status)

        self._dispatcher._schedule_new_jobs()

        hqe = job.hostqueueentry_set.all()[0]
        self.assertEqual(models.HostQueueEntry.Status.STARTING, hqe.status)
        self.assertEqual('hostless', hqe.execution_subdir)

    def test_only_schedule_queued_entries(self):
        self._create_job(metahosts=[1])
        self._update_hqe(set='active=1, host_id=2')
        self._run_scheduler()
        self._check_for_extra_schedulings()

    def test_no_ready_hosts(self):
        self._create_job(hosts=[1])
        self._do_query('UPDATE afe_hosts SET status="Repair Failed"')
        self._run_scheduler()
        self._check_for_extra_schedulings()

    def test_garbage_collection(self):
        self.god.stub_with(self._dispatcher, '_seconds_between_garbage_stats',
                           999999)
        self.god.stub_function(gc, 'collect')
        self.god.stub_function(gc_stats, '_log_garbage_collector_stats')
        gc.collect.expect_call().and_return(0)
        gc_stats._log_garbage_collector_stats.expect_call()
        # Force a garbage collection run
        self._dispatcher._last_garbage_stats_time = 0
        self._dispatcher._garbage_collection()
        # The previous call should have reset the time, it won't do anything
        # the second time.  If it does, we'll get an unexpected call.
        self._dispatcher._garbage_collection()


class DispatcherThrottlingTest(BaseSchedulerTest):
    """
    Test that the dispatcher throttles:
     * total number of running processes
     * number of processes started per cycle
    """
    _MAX_RUNNING = 3
    _MAX_STARTED = 2

    def setUp(self):
        """Cap processes per drone and stub the drone manager's capacity."""
        super(DispatcherThrottlingTest, self).setUp()
        scheduler_config.config.max_processes_per_drone = self._MAX_RUNNING

        def fake_max_runnable_processes(fake_self, username,
                                        drone_hostnames_allowed):
            # Remaining capacity = cap minus processes of agents that have
            # started and are not yet done.
            running = sum(agent.task.num_processes
                          for agent in self._agents
                          if agent.started and not agent.is_done())
            return self._MAX_RUNNING - running

        self.god.stub_with(drone_manager.DroneManager, 'max_runnable_processes',
                           fake_max_runnable_processes)

    def _setup_some_agents(self, num_agents):
        """Create num_agents DummyAgents and hand a copy to the dispatcher."""
        self._agents = [DummyAgent() for i in xrange(num_agents)]
        self._dispatcher._agents = list(self._agents)

    def _run_a_few_ticks(self):
        """Call the dispatcher's _handle_agents() four times."""
        for i in xrange(4):
            self._dispatcher._handle_agents()

    def _assert_agents_started(self, indexes, is_started=True):
        """Assert every agent at the given indexes has started == is_started."""
        for i in indexes:
            self.assertTrue(self._agents[i].started == is_started,
                            'Agent %d %sstarted' %
                            (i, 'not ' if is_started else ''))

    def _assert_agents_not_started(self, indexes):
        """Assert none of the agents at the given indexes has started."""
        self._assert_agents_started(indexes, False)

    def test_throttle_total(self):
        self._setup_some_agents(4)
        self._run_a_few_ticks()
        self._assert_agents_started([0, 1, 2])
        self._assert_agents_not_started([3])

    def test_throttle_with_synchronous(self):
        self._setup_some_agents(2)
        self._agents[0].task.num_processes = 3
        self._run_a_few_ticks()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1])

    def test_large_agent_starvation(self):
        """
        Ensure large agents don't get starved by lower-priority agents.
        """
        self._setup_some_agents(3)
        self._agents[1].task.num_processes = 3
        self._run_a_few_ticks()
        self._assert_agents_started([0])
        self._assert_agents_not_started([1, 2])

        self._agents[0].set_done(True)
        self._run_a_few_ticks()
        self._assert_agents_started([1])
        self._assert_agents_not_started([2])

    def test_zero_process_agent(self):
        self._setup_some_agents(5)
        self._agents[4].task.num_processes = 0
        self._run_a_few_ticks()
        self._assert_agents_started([0, 1, 2, 4])
        self._assert_agents_not_started([3])


class PidfileRunMonitorTest(unittest.TestCase):
    """Exercise PidfileRunMonitor against a mocked drone manager."""

    execution_tag = 'test_tag'
    pid = 12345
    process = drone_manager.Process('myhost', pid)
    num_tests_failed = 1

    def setUp(self):
        """Install the mock drone manager and attach a monitor to the tag."""
        self.god = mock.mock_god()
        self.mock_drone_manager = self.god.create_mock_class(
            drone_manager.DroneManager, 'drone_manager')
        self.god.stub_with(drone_manager, '_the_instance',
                           self.mock_drone_manager)
        self.god.stub_with(pidfile_monitor, '_get_pidfile_timeout_secs',
                           self._mock_get_pidfile_timeout_secs)

        self.pidfile_id = object()

        (self.mock_drone_manager.get_pidfile_id_from
             .expect_call(self.execution_tag,
                          pidfile_name=drone_manager.AUTOSERV_PID_FILE)
             .and_return(self.pidfile_id))

        self.monitor = pidfile_monitor.PidfileRunMonitor()
        self.monitor.attach_to_existing_process(self.execution_tag)

    def tearDown(self):
        self.god.unstub_all()

    def _mock_get_pidfile_timeout_secs(self):
        """Fixed timeout used in place of the real lookup."""
        return 300

    def setup_pidfile(self, pid=None, exit_code=None, tests_failed=None,
                      use_second_read=False):
        """Queue one expected get_pidfile_contents() call and its result.

        @param pid: when not None, the contents carry a Process on 'myhost'.
        @param exit_code: value stored as the contents' exit_status.
        @param tests_failed: value stored as the contents' num_tests_failed.
        @param use_second_read: forwarded to the expected call.
        """
        contents = drone_manager.PidfileContents()
        if pid is not None:
            contents.process = drone_manager.Process('myhost', pid)
        contents.exit_status = exit_code
        contents.num_tests_failed = tests_failed
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=use_second_read).and_return(
            contents)

    def set_not_yet_run(self):
        """Expect a read that returns contents with nothing filled in."""
        self.setup_pidfile()

    def set_empty_pidfile(self):
        """Expect a read that returns contents with nothing filled in."""
        self.setup_pidfile()

    def set_running(self, use_second_read=False):
        """Expect a read that returns a pid but no exit status."""
        self.setup_pidfile(self.pid, use_second_read=use_second_read)

    def set_complete(self, error_code, use_second_read=False):
        """Expect a read that returns pid, exit status and failure count."""
        self.setup_pidfile(self.pid, error_code, self.num_tests_failed,
                           use_second_read=use_second_read)

    def _check_monitor(self, expected_pid, expected_exit_status,
                       expected_num_tests_failed):
        """Compare the monitor's state to expectations and verify the mocks."""
        if expected_pid is None:
            self.assertEqual(self.monitor._state.process, None)
        else:
            self.assertEqual(self.monitor._state.process.pid, expected_pid)
        self.assertEqual(self.monitor._state.exit_status, expected_exit_status)
        self.assertEqual(self.monitor._state.num_tests_failed,
                         expected_num_tests_failed)

        self.god.check_playback()

    def _test_read_pidfile_helper(self, expected_pid, expected_exit_status,
                                  expected_num_tests_failed):
        """Run _read_pidfile() and check the resulting monitor state."""
        self.monitor._read_pidfile()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)

    def _get_expected_tests_failed(self, expected_exit_status):
        """Return None for a None exit status, else num_tests_failed."""
        if expected_exit_status is None:
            expected_tests_failed = None
        else:
            expected_tests_failed = self.num_tests_failed
        return expected_tests_failed

    def test_read_pidfile(self):
        self.set_not_yet_run()
        self._test_read_pidfile_helper(None, None, None)

        self.set_empty_pidfile()
        self._test_read_pidfile_helper(None, None, None)

        self.set_running()
        self._test_read_pidfile_helper(self.pid, None, None)

        self.set_complete(123)
        self._test_read_pidfile_helper(self.pid, 123, self.num_tests_failed)

    def test_read_pidfile_error(self):
        self.mock_drone_manager.get_pidfile_contents.expect_call(
            self.pidfile_id, use_second_read=False).and_return(
            drone_manager.InvalidPidfile('error'))
        self.assertRaises(pidfile_monitor.PidfileRunMonitor._PidfileException,
                          self.monitor._read_pidfile)
        self.god.check_playback()

    def setup_is_running(self, is_running):
        """Queue one expected is_process_running() call and its result."""
        self.mock_drone_manager.is_process_running.expect_call(
            self.process).and_return(is_running)

    def _test_get_pidfile_info_helper(self, expected_pid, expected_exit_status,
                                      expected_num_tests_failed):
        """Run _get_pidfile_info() and check the resulting monitor state."""
        self.monitor._get_pidfile_info()
        self._check_monitor(expected_pid, expected_exit_status,
                            expected_num_tests_failed)

    def test_get_pidfile_info(self):
        """
        normal cases for get_pidfile_info
        """
        # running
        self.set_running()
        self.setup_is_running(True)
        self._test_get_pidfile_info_helper(self.pid, None, None)

        # exited during check
        self.set_running()
        self.setup_is_running(False)
        self.set_complete(123, use_second_read=True)  # pidfile gets read again
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)

        # completed
        self.set_complete(123)
        self._test_get_pidfile_info_helper(self.pid, 123, self.num_tests_failed)

    def test_get_pidfile_info_running_no_proc(self):
        """
        pidfile shows process running, but no proc exists
        """
        # running but no proc
        self.set_running()
        self.setup_is_running(False)
        self.set_running(use_second_read=True)
        self._test_get_pidfile_info_helper(self.pid, 1, 0)
        self.assertTrue(self.monitor.lost_process)

    def test_get_pidfile_info_not_yet_run(self):
        """
        pidfile hasn't been written yet
        """
        self.set_not_yet_run()
        self._test_get_pidfile_info_helper(None, None, None)

    def test_process_failed_to_write_pidfile(self):
        self.set_not_yet_run()
        # Back-date the start so the (stubbed) pidfile timeout has elapsed.
        self.monitor._start_time = (
            time.time() - pidfile_monitor._get_pidfile_timeout_secs() - 1)
        self._test_get_pidfile_info_helper(None, 1, 0)
        self.assertTrue(self.monitor.lost_process)


class AgentTest(unittest.TestCase):
    """Tests for monitor_db.Agent driven by mock tasks."""

    def setUp(self):
        self.god = mock.mock_god()
        self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
                                                      'dispatcher')

    def tearDown(self):
        self.god.unstub_all()

    def _create_mock_task(self, name):
        """Return a mock AgentTask with one process and empty id lists."""
        task = self.god.create_mock_class(agent_task.AgentTask, name)
        task.num_processes = 1
        _set_host_and_qe_ids(task)
        return task

    def _create_agent(self, task):
        """Wrap task in an Agent bound to the mock dispatcher."""
        agent = monitor_db.Agent(task)
        agent.dispatcher = self._dispatcher
        return agent

    def _finish_agent(self, agent):
        """Tick the agent until it reports done."""
        while not agent.is_done():
            agent.tick()

    def test_agent_abort(self):
        task = self._create_mock_task('task')
        task.poll.expect_call()
        task.is_done.expect_call().and_return(False)
        task.abort.expect_call()
        task.aborted = True
        # NOTE: the excerpt is truncated here; the remainder of this test
        # (and of the module) is not part of the snippet.
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!