Best Python code snippet using tempest_python
test_container_sync.py
Source:test_container_sync.py
# ... (earlier lines of this excerpt are not shown)
# NOTE: the definitions below are methods of ContainerSyncTest (see the
# super() call); the class header lies outside this excerpt.
def resource_cleanup(cls):
    """Delete the test containers via every client pair, then run the
    parent class cleanup.
    """
    for client in cls.clients.values():
        # client[0] / client[1] are the two clients stored per entry
        # (used elsewhere in this excerpt as container / object client).
        cls.delete_containers(cls.containers, client[0], client[1])
    super(ContainerSyncTest, cls).resource_cleanup()


def _test_container_synchronization(self, make_headers):
    """Check container-to-container synchronization in both directions.

    Synchronization is switched on for each container towards the other
    one and one object is created in each.  The container listings are
    then polled until both are non-empty and hold the same object names,
    or until the retry budget is used up.  Finally the listings are
    asserted equal and the content of every listed object is verified
    through both object clients.

    The listings are always fetched at least once, so a retry budget of
    zero yields a regular assertion failure rather than a NameError, and
    no sleep is performed after the final poll.

    :param make_headers: callable taking ``(container, container_client)``
        of the sync target and returning the headers for the PUT request
        that turns synchronization on.
    """
    # to allow/accept sync requests to/from other accounts,
    # turn container synchronization on and create object in container
    for cont in (self.containers, self.containers[::-1]):
        cont_client = [self.clients[c][0] for c in cont]
        obj_client = [self.clients[c][1] for c in cont]
        headers = make_headers(cont[1], cont_client[1])
        cont_client[0].put(str(cont[0]), body=None, headers=headers)
        # create object in container; the payload is the reversed object
        # name so it can be re-derived from the name during verification
        object_name = data_utils.rand_name(name='TestSyncObject')
        data = object_name[::-1]
        obj_client[0].create_object(cont[0], object_name, data)
        self.objects.append(object_name)

    # wait until container contents list is not empty
    cont_client = [self.clients[c][0] for c in self.containers]
    params = {'format': 'json'}
    # Work on a local copy so the configured retry budget is not consumed.
    attempts_left = self.attempts
    while True:
        object_lists = []
        for c_client, cont in zip(cont_client, self.containers):
            _, object_list = c_client.list_container_contents(
                cont, params=params)
            object_lists.append({obj['name']: obj for obj in object_list})
        # check that containers are not empty and have equal keys()
        # or wait for next attempt
        if (object_lists[0] and object_lists[1] and
                set(object_lists[0]) == set(object_lists[1])):
            break
        attempts_left -= 1
        if attempts_left <= 0:
            # Out of attempts: let the assertion below report the mismatch.
            break
        time.sleep(self.container_sync_interval)
    self.assertEqual(object_lists[0], object_lists[1],
                     'Different object lists in containers.')

    # Verify object content
    for cont in self.containers:
        obj_client = self.clients[cont][1]
        for obj_name in object_lists[0]:
            _, object_content = obj_client.get_object(cont, obj_name)
            self.assertEqual(object_content, obj_name[::-1])


@test.attr(type='slow')
@decorators.skip_because(bug='1317133')
@test.idempotent_id('be008325-1bba-4925-b7dd-93b58f22ce9b')
@testtools.skipIf(
    not CONF.object_storage_feature_enabled.container_sync,
    'Old-style container sync function is disabled')
def test_container_synchronization(self):
    def make_headers(cont, cont_client):
        # tell first container to synchronize to a second
        # The host part of the client's base URL is swapped for
        # self.local_ip to build the sync target URL.
        client_proxy_ip = \
            urlparse.urlparse(cont_client.base_url).netloc.split(':')[0]
        client_base_url = \
            cont_client.base_url.replace(client_proxy_ip,
                                         self.local_ip)
        headers = {'X-Container-Sync-Key': 'sync_key',
                   'X-Container-Sync-To': "%s/%s" %
                   (client_base_url, str(cont))}
        return headers
    # ... (the remaining lines of this excerpt are not shown)
Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!