Best Python code snippet using autotest_python
rpc_interface_unittest.py
Source: rpc_interface_unittest.py
...347 host.save()348 self.assertFalse(host.locked)349 self.god.stub_class_method(frontend.AFE, 'run')350 if host_on_shard and not on_shard:351 mock_afe = self.god.create_mock_class_obj(352 frontend_wrappers.RetryingAFE, 'MockAFE')353 self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)354 mock_afe2 = frontend_wrappers.RetryingAFE.expect_new(355 server=shard_hostname, user=None)356 mock_afe2.run.expect_call('modify_host_local', id=host.id,357 locked=True, lock_reason='_modify_host_helper lock',358 lock_time=datetime.datetime(2015, 12, 15))359 elif on_shard:360 mock_afe = self.god.create_mock_class_obj(361 frontend_wrappers.RetryingAFE, 'MockAFE')362 self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)363 mock_afe2 = frontend_wrappers.RetryingAFE.expect_new(364 server=server_utils.get_global_afe_hostname(), user=None)365 mock_afe2.run.expect_call('modify_host', id=host.id,366 locked=True, lock_reason='_modify_host_helper lock',367 lock_time=datetime.datetime(2015, 12, 15))368 rpc_interface.modify_host(id=host.id, locked=True,369 lock_reason='_modify_host_helper lock',370 lock_time=datetime.datetime(2015, 12, 15))371 host = models.Host.objects.get(pk=host.id)372 if on_shard:373 # modify_host on shard does nothing but routing the RPC to master.374 self.assertFalse(host.locked)375 else:376 self.assertTrue(host.locked)377 self.god.check_playback()378 def test_modify_host_on_master_host_on_master(self):379 """Call modify_host to master for host in master."""380 self._modify_host_helper()381 def test_modify_host_on_master_host_on_shard(self):382 """Call modify_host to master for host in shard."""383 self._modify_host_helper(host_on_shard=True)384 def test_modify_host_on_shard(self):385 """Call modify_host to shard for host in shard."""386 self._modify_host_helper(on_shard=True, host_on_shard=True)387 def test_modify_hosts_on_master_host_on_shard(self):388 """Ensure calls to modify_hosts are correctly forwarded to shards."""389 host1 = 
models.Host.objects.all()[0]390 host2 = models.Host.objects.all()[1]391 shard1 = models.Shard.objects.create(hostname='shard1')392 host1.shard = shard1393 host1.save()394 shard2 = models.Shard.objects.create(hostname='shard2')395 host2.shard = shard2396 host2.save()397 self.assertFalse(host1.locked)398 self.assertFalse(host2.locked)399 mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE,400 'MockAFE')401 self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)402 # The statuses of one host might differ on master and shard.403 # Filters are always applied on the master. So the host on the shard404 # will be affected no matter what his status is.405 filters_to_use = {'status': 'Ready'}406 mock_afe2 = frontend_wrappers.RetryingAFE.expect_new(407 server='shard2', user=None)408 mock_afe2.run.expect_call(409 'modify_hosts_local',410 host_filter_data={'id__in': [shard1.id, shard2.id]},411 update_data={'locked': True,412 'lock_reason': 'Testing forward to shard',413 'lock_time' : datetime.datetime(2015, 12, 15) })414 mock_afe1 = frontend_wrappers.RetryingAFE.expect_new(415 server='shard1', user=None)416 mock_afe1.run.expect_call(417 'modify_hosts_local',418 host_filter_data={'id__in': [shard1.id, shard2.id]},419 update_data={'locked': True,420 'lock_reason': 'Testing forward to shard',421 'lock_time' : datetime.datetime(2015, 12, 15)})422 rpc_interface.modify_hosts(423 host_filter_data={'status': 'Ready'},424 update_data={'locked': True,425 'lock_reason': 'Testing forward to shard',426 'lock_time' : datetime.datetime(2015, 12, 15) })427 host1 = models.Host.objects.get(pk=host1.id)428 self.assertTrue(host1.locked)429 host2 = models.Host.objects.get(pk=host2.id)430 self.assertTrue(host2.locked)431 self.god.check_playback()432 def test_delete_host(self):433 """Ensure an RPC is made on delete a host, if it is on a shard."""434 host1 = models.Host.objects.all()[0]435 shard1 = models.Shard.objects.create(hostname='shard1')436 host1.shard = shard1437 
host1.save()438 host1_id = host1.id439 mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE,440 'MockAFE')441 self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)442 mock_afe1 = frontend_wrappers.RetryingAFE.expect_new(443 server='shard1', user=None)444 mock_afe1.run.expect_call('delete_host', id=host1.id)445 rpc_interface.delete_host(id=host1.id)446 self.assertRaises(models.Host.DoesNotExist,447 models.Host.smart_get, host1_id)448 self.god.check_playback()449 def test_modify_label(self):450 label1 = models.Label.objects.all()[0]451 self.assertEqual(label1.invalid, 0)452 host2 = models.Host.objects.all()[1]453 shard1 = models.Shard.objects.create(hostname='shard1')454 host2.shard = shard1455 host2.labels.add(label1)456 host2.save()457 mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE,458 'MockAFE')459 self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)460 mock_afe1 = frontend_wrappers.RetryingAFE.expect_new(461 server='shard1', user=None)462 mock_afe1.run.expect_call('modify_label', id=label1.id, invalid=1)463 rpc_interface.modify_label(label1.id, invalid=1)464 self.assertEqual(models.Label.objects.all()[0].invalid, 1)465 self.god.check_playback()466 def test_delete_label(self):467 label1 = models.Label.objects.all()[0]468 host2 = models.Host.objects.all()[1]469 shard1 = models.Shard.objects.create(hostname='shard1')470 host2.shard = shard1471 host2.labels.add(label1)472 host2.save()473 mock_afe = self.god.create_mock_class_obj(frontend_wrappers.RetryingAFE,474 'MockAFE')475 self.god.stub_with(frontend_wrappers, 'RetryingAFE', mock_afe)476 mock_afe1 = frontend_wrappers.RetryingAFE.expect_new(477 server='shard1', user=None)478 mock_afe1.run.expect_call('delete_label', id=label1.id)479 rpc_interface.delete_label(id=label1.id)480 self.assertRaises(models.Label.DoesNotExist,481 models.Label.smart_get, label1.id)482 self.god.check_playback()483if __name__ == '__main__':...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!