Best Python code snippet using localstack_python
test_docker.py
Source:test_docker.py
# NOTE: this excerpt starts mid-file. The imports are not shown, and the two
# lines below are the tail of a statement whose beginning is truncated:
#     ],
# )

LOG = logging.getLogger(__name__)

# prefix shared by all containers created through _random_container_name()
container_name_prefix = "lst_test_"


def _random_container_name() -> str:
    """Return a fresh container name made of the test prefix and a short uid."""
    return f"{container_name_prefix}{short_uid()}"


@pytest.fixture
def dummy_container(create_container):
    """Returns a container that is created but not started"""
    return create_container("alpine", command=["sh", "-c", "while true; do sleep 1; done"])


@pytest.fixture
def create_container(docker_client: ContainerClient, create_network):
    """
    Uses the factory as fixture pattern to wrap ContainerClient.create_container as a factory that
    removes the containers after the fixture is cleaned up.

    Depends on create network for correct cleanup order
    """
    containers = []

    def _create_container(image_name: str, **kwargs):
        # only generate a name when the caller did not pass one
        kwargs.setdefault("name", _random_container_name())
        cid = docker_client.create_container(image_name, **kwargs).strip()
        containers.append(cid)
        return ContainerInfo(cid, kwargs["name"])  # FIXME name should come from docker_client

    yield _create_container

    # teardown: best-effort removal of everything the factory created
    for c in containers:
        try:
            docker_client.remove_container(c)
        except Exception:
            LOG.warning("failed to remove test container %s", c)


@pytest.fixture
def create_network():
    """
    Uses the factory as fixture pattern to wrap the creation of networks as a factory that
    removes the networks after the fixture is cleaned up.
    """
    networks = []

    def _create_network(network_name: str):
        network_id = safe_run([config.DOCKER_CMD, "network", "create", network_name]).strip()
        networks.append(network_id)
        return network_id

    yield _create_network

    # teardown: best-effort removal; a failure is logged instead of silently dropped
    for network in networks:
        try:
            LOG.debug("Removing network %s", network)
            safe_run([config.DOCKER_CMD, "network", "remove", network])
        except CalledProcessError:
            LOG.debug("failed to remove test network %s", network, exc_info=True)


class TestDockerClient:
    """Tests for the ContainerClient provided by the ``docker_client`` fixture."""

    def test_container_lifecycle_commands(self, docker_client: ContainerClient):
        container_name = _random_container_name()
        output = docker_client.create_container(
            "alpine",
            name=container_name,
            command=["sh", "-c", "for i in `seq 30`; do sleep 1; echo $i; done"],
        )
        container_id = output.strip()
        assert container_id

        try:
            docker_client.start_container(container_id)
            assert DockerContainerStatus.UP == docker_client.get_container_status(container_name)

            docker_client.stop_container(container_id)
            assert DockerContainerStatus.DOWN == docker_client.get_container_status(container_name)
        finally:
            docker_client.remove_container(container_id)

        assert DockerContainerStatus.NON_EXISTENT == docker_client.get_container_status(
            container_name
        )

    def test_create_container_remove_removes_container(
        self, docker_client: ContainerClient, create_container
    ):
        info = create_container("alpine", remove=True, command=["echo", "foobar"])
        # make sure it was correctly created
        assert 1 == len(docker_client.list_containers(f"id={info.container_id}"))

        # start the container
        output, _ = docker_client.start_container(info.container_id, attach=True)
        output = output.decode(config.DEFAULT_ENCODING)

        # it takes a while for the container to be removed after execution, so
        # poll for a bounded time instead of relying on a single fixed sleep
        deadline = time.time() + 5
        while time.time() < deadline:
            if not docker_client.list_containers(f"id={info.container_id}"):
                break
            time.sleep(0.2)

        assert 0 == len(docker_client.list_containers(f"id={info.container_id}"))
        assert "foobar" in output

    def test_create_container_non_existing_image(self, docker_client: ContainerClient):
        with pytest.raises(NoSuchImage):
            docker_client.create_container("this_image_does_hopefully_not_exist_42069")

    def test_exec_in_container(
        self, docker_client: ContainerClient, dummy_container: ContainerInfo
    ):
        docker_client.start_container(dummy_container.container_id)

        output, _ = docker_client.exec_in_container(
            dummy_container.container_id, command=["echo", "foobar"]
        )
        output = output.decode(config.DEFAULT_ENCODING)
        assert "foobar" == output.strip()

    def test_exec_in_container_not_running_raises_exception(
        self, docker_client: ContainerClient, dummy_container
    ):
        with pytest.raises(ContainerException):
            # can't exec into a non-running container
            docker_client.exec_in_container(
                dummy_container.container_id, command=["echo", "foobar"]
            )

    def test_exec_in_container_with_env(self, docker_client: ContainerClient, dummy_container):
        docker_client.start_container(dummy_container.container_id)

        env = {"MYVAR": "foo_var"}

        output, _ = docker_client.exec_in_container(
            dummy_container.container_id, env_vars=env, command=["env"]
        )
        output = output.decode(config.DEFAULT_ENCODING)
        assert "MYVAR=foo_var" in output

    def test_exec_error_in_container(self, docker_client: ContainerClient, dummy_container):
        docker_client.start_container(dummy_container.container_id)

        with pytest.raises(ContainerException) as ex:
            docker_client.exec_in_container(
                dummy_container.container_id, command=["./doesnotexist"]
            )

        # checked after the block: statements following the raising call inside
        # the ``with`` body would never run
        assert ex.match("doesnotexist: no such file or directory")

    def test_create_container_with_max_env_vars(
        self, docker_client: ContainerClient, create_container
    ):
        # default ARG_MAX=131072 in Docker
        env = {f"IVAR_{i:05d}": f"VAL_{i:05d}" for i in range(2000)}

        # make sure we're really triggering the relevant code
        assert len(str(env)) >= Util.MAX_ENV_ARGS_LENGTH

        info = create_container("alpine", env_vars=env, command=["env"])
        output, _ = docker_client.start_container(info.container_id, attach=True)
        output = output.decode(config.DEFAULT_ENCODING)

        assert "IVAR_00001=VAL_00001" in output
        assert "IVAR_01000=VAL_01000" in output
        assert "IVAR_01999=VAL_01999" in output

    def test_run_container(self, docker_client: ContainerClient):
        container_name = _random_container_name()
        try:
            output, _ = docker_client.run_container(
                "alpine",
                name=container_name,
                command=["echo", "foobared"],
            )
            output = output.decode(config.DEFAULT_ENCODING)
            assert "foobared" in output
        finally:
            docker_client.remove_container(container_name)

    def test_run_container_error(self, docker_client: ContainerClient):
        container_name = _random_container_name()
        try:
            with pytest.raises(ContainerException):
                docker_client.run_container(
                    "alpine",
                    name=container_name,
                    command=["./doesnotexist"],
                )
        finally:
            docker_client.remove_container(container_name)

    def test_stop_non_existing_container(self, docker_client: ContainerClient):
        with pytest.raises(NoSuchContainer):
            docker_client.stop_container("this_container_does_not_exist")

    def test_remove_non_existing_container(self, docker_client: ContainerClient):
        with pytest.raises(NoSuchContainer):
            docker_client.remove_container("this_container_does_not_exist", force=False)

    def test_start_non_existing_container(self, docker_client: ContainerClient):
        with pytest.raises(NoSuchContainer):
            docker_client.start_container("this_container_does_not_exist")

    def test_get_network(self, docker_client: ContainerClient, dummy_container):
        n = docker_client.get_network(dummy_container.container_name)
        assert "default" == n

    def test_create_with_host_network(self, docker_client: ContainerClient, create_container):
        info = create_container("alpine", network="host")
        network = docker_client.get_network(info.container_name)
        assert "host" == network

    def test_create_with_port_mapping(self, docker_client: ContainerClient, create_container):
        ports = PortMappings()
        ports.add(45122, 22)
        ports.add(45180, 80)
        # no assertion: the test only checks that creation does not raise
        create_container("alpine", ports=ports)

    def test_create_with_volume(self, tmpdir, docker_client: ContainerClient, create_container):
        mount_volumes = [(tmpdir.realpath(), "/tmp/mypath")]

        c = create_container(
            "alpine",
            command=["sh", "-c", "echo 'foobar' > /tmp/mypath/foo.log"],
            mount_volumes=mount_volumes,
        )
        docker_client.start_container(c.container_id)
        assert tmpdir.join("foo.log").isfile(), "foo.log was not created in mounted dir"

    def test_copy_into_container(self, tmpdir, docker_client: ContainerClient, create_container):
        local_path = tmpdir.join("myfile.txt")
        container_path = "/tmp/myfile_differentpath.txt"

        self._test_copy_into_container(
            docker_client,
            create_container,
            ["cat", container_path],
            local_path,
            local_path,
            container_path,
        )

    def test_copy_into_non_existent_container(self, tmpdir, docker_client: ContainerClient):
        local_path = tmpdir.mkdir("test_dir")
        file_path = local_path.join("test_file")
        with file_path.open(mode="w") as fd:
            fd.write("foobared\n")
        with pytest.raises(NoSuchContainer):
            docker_client.copy_into_container(
                "hopefully_non_existent_container_%s" % short_uid(), str(file_path), "test_file"
            )

    def test_copy_into_container_without_target_filename(
        self, tmpdir, docker_client: ContainerClient, create_container
    ):
        local_path = tmpdir.join("myfile.txt")
        container_path = "/tmp"

        self._test_copy_into_container(
            docker_client,
            create_container,
            ["cat", "/tmp/myfile.txt"],
            local_path,
            local_path,
            container_path,
        )

    def test_copy_directory_into_container(
        self, tmpdir, docker_client: ContainerClient, create_container
    ):
        local_path = tmpdir.join("fancy_folder")
        local_path.mkdir()

        file_path = local_path.join("myfile.txt")
        container_path = "/tmp/fancy_other_folder"

        self._test_copy_into_container(
            docker_client,
            create_container,
            ["cat", "/tmp/fancy_other_folder/myfile.txt"],
            file_path,
            local_path,
            container_path,
        )

    def _test_copy_into_container(
        self, docker_client, create_container, command, file_path, local_path, container_path
    ):
        """
        Shared body of the copy tests: write "foobared" to ``file_path``, copy ``local_path``
        to ``container_path`` in a new container, then start the container with ``command``
        and check that the written content shows up in its output.
        """
        c = create_container("alpine", command=command)

        with file_path.open(mode="w") as fd:
            fd.write("foobared\n")

        docker_client.copy_into_container(c.container_name, str(local_path), container_path)

        output, _ = docker_client.start_container(c.container_id, attach=True)
        output = output.decode(config.DEFAULT_ENCODING)

        assert "foobared" in output

    def test_copy_into_container_with_existing_target(
        self, tmpdir, docker_client: ContainerClient, dummy_container
    ):
        local_path = tmpdir.join("myfile.txt")
        container_path = "/tmp/myfile.txt"

        with local_path.open(mode="w") as fd:
            fd.write("foo\n")

        docker_client.start_container(dummy_container.container_id)
        # create the target inside the container first, with different content
        docker_client.exec_in_container(
            dummy_container.container_id, command=["sh", "-c", f"echo bar > {container_path}"]
        )

        out, _ = docker_client.exec_in_container(
            dummy_container.container_id,
            command=[
                "cat",
                "/tmp/myfile.txt",
            ],
        )
        assert "bar" in out.decode(config.DEFAULT_ENCODING)

        docker_client.copy_into_container(
            dummy_container.container_id, str(local_path), container_path
        )

        out, _ = docker_client.exec_in_container(
            dummy_container.container_id,
            command=[
                "cat",
                "/tmp/myfile.txt",
            ],
        )
        assert "foo" in out.decode(config.DEFAULT_ENCODING)

    def test_copy_directory_content_into_container(
        self, tmpdir, docker_client: ContainerClient, dummy_container
    ):
        local_path = tmpdir.join("fancy_folder")
        local_path.mkdir()

        file_path = local_path.join("myfile.txt")
        with file_path.open(mode="w") as fd:
            fd.write("foo\n")
        file_path = local_path.join("myfile2.txt")
        with file_path.open(mode="w") as fd:
            fd.write("bar\n")

        container_path = "/tmp/fancy_other_folder"

        docker_client.start_container(dummy_container.container_id)
        docker_client.exec_in_container(
            dummy_container.container_id, command=["mkdir", "-p", container_path]
        )
        # the source path ends in "/." (directory content rather than the directory)
        docker_client.copy_into_container(
            dummy_container.container_id, f"{str(local_path)}/.", container_path
        )

        out, _ = docker_client.exec_in_container(
            dummy_container.container_id,
            command=[
                "cat",
                "/tmp/fancy_other_folder/myfile.txt",
                "/tmp/fancy_other_folder/myfile2.txt",
            ],
        )
        assert "foo" in out.decode(config.DEFAULT_ENCODING)
        assert "bar" in out.decode(config.DEFAULT_ENCODING)

    def test_get_network_non_existing_container(self, docker_client: ContainerClient):
        with pytest.raises(ContainerException):
            docker_client.get_network("this_container_does_not_exist")

    def test_list_containers(self, docker_client: ContainerClient, create_container):
        c1 = create_container("alpine", command=["echo", "1"])
        c2 = create_container("alpine", command=["echo", "2"])
        c3 = create_container("alpine", command=["echo", "3"])

        container_list = docker_client.list_containers()

        # ">=" because containers not created by this test may be listed as well
        assert len(container_list) >= 3

        image_names = [info["name"] for info in container_list]

        assert c1.container_name in image_names
        assert c2.container_name in image_names
        assert c3.container_name in image_names

    def test_list_containers_filter_non_existing(self, docker_client: ContainerClient):
        container_list = docker_client.list_containers(filter="id=DOES_NOT_EXST")
        assert 0 == len(container_list)

    def test_list_containers_filter_illegal_filter(self, docker_client: ContainerClient):
        with pytest.raises(ContainerException):
            docker_client.list_containers(filter="illegalfilter=foobar")

    def test_list_containers_filter(self, docker_client: ContainerClient, create_container):
        name_prefix = "filter_tests_"
        cn1 = name_prefix + _random_container_name()
        cn2 = name_prefix + _random_container_name()
        cn3 = name_prefix + _random_container_name()

        c1 = create_container("alpine", name=cn1, command=["echo", "1"])
        c2 = create_container("alpine", name=cn2, command=["echo", "2"])
        c3 = create_container("alpine", name=cn3, command=["echo", "3"])

        # per id
        container_list = docker_client.list_containers(filter=f"id={c2.container_id}")
        assert 1 == len(container_list)
        assert c2.container_id.startswith(container_list[0]["id"])
        assert c2.container_name == container_list[0]["name"]
        assert "created" == container_list[0]["status"]

        # per name pattern
        container_list = docker_client.list_containers(filter=f"name={name_prefix}")
        assert 3 == len(container_list)
        image_names = [info["name"] for info in container_list]
        assert c1.container_name in image_names
        assert c2.container_name in image_names
        assert c3.container_name in image_names

        # multiple patterns
        container_list = docker_client.list_containers(
            filter=[
                f"id={c1.container_id}",
                f"name={container_name_prefix}",
            ]
        )
        assert 1 == len(container_list)
        assert c1.container_name == container_list[0]["name"]

    def test_get_container_entrypoint(self, docker_client: ContainerClient):
        entrypoint = docker_client.get_image_entrypoint("alpine")
        assert "" == entrypoint

    def test_get_container_entrypoint_non_existing_image(self, docker_client: ContainerClient):
        with pytest.raises(NoSuchImage):
            docker_client.get_image_entrypoint("thisdoesnotexist")

    def test_get_container_command(self, docker_client: ContainerClient):
        command = docker_client.get_image_cmd("alpine")
        assert "/bin/sh" == command

    def test_get_container_command_non_existing_image(self, docker_client: ContainerClient):
        with pytest.raises(NoSuchImage):
            docker_client.get_image_cmd("thisdoesnotexist")

    def test_create_start_container_with_stdin_to_stdout(self, docker_client: ContainerClient):
        container_name = _random_container_name()
        message = "test_message_stdin"
        try:
            docker_client.create_container(
                "alpine",
                name=container_name,
                interactive=True,
                command=["cat"],
            )

            output, _ = docker_client.start_container(
                container_name, interactive=True, stdin=message.encode(config.DEFAULT_ENCODING)
            )

            assert message == output.decode(config.DEFAULT_ENCODING).strip()
        finally:
            docker_client.remove_container(container_name)

    def test_create_start_container_with_stdin_to_file(
        self, tmpdir, docker_client: ContainerClient
    ):
        container_name = _random_container_name()
        message = "test_message_stdin"
        try:
            docker_client.create_container(
                "alpine",
                name=container_name,
                interactive=True,
                command=["sh", "-c", "cat > test_file"],
            )

            # the container output is irrelevant here; the message ends up in test_file
            docker_client.start_container(
                container_name, interactive=True, stdin=message.encode(config.DEFAULT_ENCODING)
            )
            target_path = tmpdir.join("test_file")
            docker_client.copy_from_container(container_name, str(target_path), "test_file")

            assert message == target_path.read().strip()
        finally:
            docker_client.remove_container(container_name)

    def test_run_container_with_stdin(self, docker_client: ContainerClient):
        container_name = _random_container_name()
        message = "test_message_stdin"
        try:
            output, _ = docker_client.run_container(
                "alpine",
                name=container_name,
                interactive=True,
                stdin=message.encode(config.DEFAULT_ENCODING),
                command=["cat"],
            )

            assert message == output.decode(config.DEFAULT_ENCODING).strip()
        finally:
            docker_client.remove_container(container_name)

    def test_exec_in_container_with_stdin(self, docker_client: ContainerClient, dummy_container):
        docker_client.start_container(dummy_container.container_id)
        message = "test_message_stdin"
        output, _ = docker_client.exec_in_container(
            dummy_container.container_id,
            interactive=True,
            stdin=message.encode(config.DEFAULT_ENCODING),
            command=["cat"],
        )

        assert message == output.decode(config.DEFAULT_ENCODING).strip()

    def test_exec_in_container_with_stdin_stdout_stderr(
        self, docker_client: ContainerClient, dummy_container
    ):
        docker_client.start_container(dummy_container.container_id)
        message = "test_message_stdin"
        output, stderr = docker_client.exec_in_container(
            dummy_container.container_id,
            interactive=True,
            stdin=message.encode(config.DEFAULT_ENCODING),
            command=["sh", "-c", "cat; >&2 echo stderrtest"],
        )

        assert message == output.decode(config.DEFAULT_ENCODING).strip()
        assert "stderrtest" == stderr.decode(config.DEFAULT_ENCODING).strip()

    def test_run_detached_with_logs(self, docker_client: ContainerClient):
        container_name = _random_container_name()
        message = "test_message"
        try:
            output, _ = docker_client.run_container(
                "alpine",
                name=container_name,
                detach=True,
                command=["echo", message],
            )
            # the output of the detached run is used as the container id
            container_id = output.decode(config.DEFAULT_ENCODING).strip()
            logs = docker_client.get_container_logs(container_id)

            assert message == logs.strip()
        finally:
            docker_client.remove_container(container_name)

    def test_get_logs_non_existent_container(self, docker_client: ContainerClient):
        ...  # body truncated in this excerpt
test_storage.py
Source:test_storage.py
...34 obj.delete()35 container.delete()36 def test_containers(self):37 # make a new container38 container_name = _random_container_name()39 container = self.driver.create_container(container_name)40 self.assertEqual(container.name, container_name)41 container = self.driver.get_container(container_name)42 self.assertEqual(container.name, container_name)43 # check that an existing container can't be re-created44 with self.assertRaises(types.ContainerAlreadyExistsError):45 self.driver.create_container(container_name)46 # check that the new container can be listed47 containers = self.driver.list_containers()48 self.assertEqual([c.name for c in containers], [container_name])49 # delete the container50 self.driver.delete_container(container)51 # check that a deleted container can't be looked up52 with self.assertRaises(types.ContainerDoesNotExistError):53 self.driver.get_container(container_name)54 # check that the container is deleted55 containers = self.driver.list_containers()56 self.assertEqual([c.name for c in containers], [])57 def _test_objects(self, do_upload, do_download, size=1 * MB):58 content = os.urandom(size)59 blob_name = "testblob"60 container = self.driver.create_container(_random_container_name())61 # upload a file62 obj = do_upload(container, blob_name, content)63 self.assertEqual(obj.name, blob_name)64 obj = self.driver.get_object(container.name, blob_name)65 # check that the file can be listed66 blobs = self.driver.list_container_objects(container)67 self.assertEqual([blob.name for blob in blobs], [blob_name])68 # upload another file and check it's excluded in prefix listing69 do_upload(container, blob_name[::-1], content[::-1])70 blobs = self.driver.list_container_objects(container, ex_prefix=blob_name[0:3])71 self.assertEqual([blob.name for blob in blobs], [blob_name])72 # check that the file can be read back73 self.assertEqual(do_download(obj), content)74 # delete the file75 self.driver.delete_object(obj)76 # check that a missing file can't be 
deleted or looked up77 with self.assertRaises(types.ObjectDoesNotExistError):78 self.driver.delete_object(obj)79 with self.assertRaises(types.ObjectDoesNotExistError):80 self.driver.get_object(container.name, blob_name)81 # check that the file is deleted82 blobs = self.driver.list_container_objects(container)83 self.assertEqual([blob.name for blob in blobs], [blob_name[::-1]])84 def test_objects(self, size=1 * MB):85 def do_upload(container, blob_name, content):86 infile = self._create_tempfile(content=content)87 return self.driver.upload_object(infile, container, blob_name)88 def do_download(obj):89 outfile = self._create_tempfile()90 self.driver.download_object(obj, outfile, overwrite_existing=True)91 with open(outfile, "rb") as fobj:92 return fobj.read()93 self._test_objects(do_upload, do_download, size)94 def test_objects_range_downloads(self):95 blob_name = "testblob-range"96 content = b"0123456789"97 container = self.driver.create_container(_random_container_name())98 infile = self._create_tempfile(content=content)99 obj = self.driver.upload_object(infile, container, blob_name)100 self.assertEqual(obj.name, blob_name)101 self.assertEqual(obj.size, len(content))102 obj = self.driver.get_object(container.name, blob_name)103 self.assertEqual(obj.name, blob_name)104 self.assertEqual(obj.size, len(content))105 values = [106 {"start_bytes": 0, "end_bytes": 1, "expected_content": b"0"},107 {"start_bytes": 1, "end_bytes": 5, "expected_content": b"1234"},108 {"start_bytes": 5, "end_bytes": None, "expected_content": b"56789"},109 {"start_bytes": 5, "end_bytes": len(content), "expected_content": b"56789"},110 {"start_bytes": 0, "end_bytes": None, "expected_content": b"0123456789"},111 {112 "start_bytes": 0,113 "end_bytes": len(content),114 "expected_content": b"0123456789",115 },116 ]117 for value in values:118 # 1. 
download_object_range119 start_bytes = value["start_bytes"]120 end_bytes = value["end_bytes"]121 outfile = self._create_tempfile()122 result = self.driver.download_object_range(123 obj,124 outfile,125 start_bytes=start_bytes,126 end_bytes=end_bytes,127 overwrite_existing=True,128 )129 self.assertTrue(result)130 with open(outfile, "rb") as fobj:131 downloaded_content = fobj.read()132 if end_bytes is not None:133 expected_content = content[start_bytes:end_bytes]134 else:135 expected_content = content[start_bytes:]136 msg = 'Expected "%s", got "%s" for values: %s' % (137 expected_content,138 downloaded_content,139 str(value),140 )141 self.assertEqual(downloaded_content, expected_content, msg)142 self.assertEqual(downloaded_content, value["expected_content"], msg)143 # 2. download_object_range_as_stream144 downloaded_content = _read_stream(145 self.driver.download_object_range_as_stream(146 obj, start_bytes=start_bytes, end_bytes=end_bytes147 )148 )149 self.assertEqual(downloaded_content, expected_content)150 @unittest.skipUnless(os.getenv("LARGE_FILE_SIZE_MB"), "config not set")151 def test_objects_large(self):152 size = int(float(os.environ["LARGE_FILE_SIZE_MB"]) * MB)153 self.test_objects(size)154 def test_objects_stream_io(self):155 def do_upload(container, blob_name, content):156 content = io.BytesIO(content)157 return self.driver.upload_object_via_stream(content, container, blob_name)158 def do_download(obj):159 return _read_stream(self.driver.download_object_as_stream(obj))160 self._test_objects(do_upload, do_download)161 def test_objects_stream_iterable(self):162 def do_upload(container, blob_name, content):163 content = iter([content[i : i + 1] for i in range(len(content))])164 return self.driver.upload_object_via_stream(content, container, blob_name)165 def do_download(obj):166 return _read_stream(self.driver.download_object_as_stream(obj))167 self._test_objects(do_upload, do_download)168 def test_upload_via_stream_with_content_encoding(self):169 object_name 
= "content_encoding.gz"170 content = gzip.compress(os.urandom(MB // 100))171 container = self.driver.create_container(_random_container_name())172 self.driver.upload_object_via_stream(173 iter(content), container, object_name, headers={"Content-Encoding": "gzip"}174 )175 obj = self.driver.get_object(container.name, object_name)176 self.assertEqual(obj.extra.get("content_encoding"), "gzip")177 def test_cdn_url(self):178 content = os.urandom(MB // 100)179 container = self.driver.create_container(_random_container_name())180 obj = self.driver.upload_object_via_stream(iter(content), container, "cdn")181 response = requests.get(self.driver.get_object_cdn_url(obj))182 response.raise_for_status()183 self.assertEqual(response.content, content)184 def _create_tempfile(self, prefix="", content=b""):185 fobj, path = tempfile.mkstemp(prefix=prefix, text=False)186 os.write(fobj, content)187 os.close(fobj)188 self.addCleanup(os.remove, path)189 return path190class AzureStorageTest(SmokeStorageTest):191 class Config:192 username = None193 password = None194 tenant = None195 subscription = None196 location = "eastus"197 template_file = os.path.splitext(__file__)[0] + ".arm.json"198 kind = "StorageV2"199 client = None200 resource_group_name = None201 @classmethod202 def setUpClass(cls):203 try:204 from azure.common.credentials import ServicePrincipalCredentials205 from azure.mgmt.resource import ResourceManagementClient206 except ImportError:207 raise unittest.SkipTest("missing azure-mgmt-resource library")208 cls.client = ResourceManagementClient(209 credentials=ServicePrincipalCredentials(210 client_id=cls.Config.username,211 secret=cls.Config.password,212 tenant=cls.Config.tenant,213 ),214 subscription_id=cls.Config.subscription,215 )216 cls.resource_group_name = "libcloudtest" + _random_string(length=23)217 cls.client.resource_groups.create_or_update(218 resource_group_name=cls.resource_group_name,219 parameters={"location": cls.Config.location},220 )221 with 
io.open(cls.Config.template_file, encoding="utf-8") as fobj:222 template = json.load(fobj)223 deployment = cls.client.deployments.create_or_update(224 resource_group_name=cls.resource_group_name,225 deployment_name=os.path.basename(__file__),226 properties={227 "template": template,228 "mode": "incremental",229 "parameters": {"storageAccountKind": {"value": cls.Config.kind}},230 },231 ).result()232 for key, value in deployment.properties.outputs.items():233 setattr(cls.Config, key.lower(), value["value"])234 cls.Config.provider = "azure_blobs"235 @classmethod236 def tearDownClass(cls):237 while True:238 groups = [239 group240 for group in cls.client.resource_groups.list()241 if group.name.startswith(cls.resource_group_name)242 ]243 if not groups:244 break245 for group in groups:246 if group.properties.provisioning_state != "Deleting":247 try:248 cls.client.resource_groups.delete(249 resource_group_name=group.name, polling=False250 )251 except Exception: # pylint: disable=broad-except252 pass253 time.sleep(3)254class AzuriteStorageTest(SmokeStorageTest):255 class Config:256 port = 10000257 version = "latest"258 client = None259 container = None260 image = "arafato/azurite"261 has_sas_support = False262 @classmethod263 def setUpClass(cls):264 cls.client = _new_docker_client()265 cls.container = cls.client.containers.run(266 "{}:{}".format(cls.image, cls.Config.version),267 detach=True,268 auto_remove=True,269 ports={cls.Config.port: 10000},270 environment={"executable": "blob"},271 )272 cls.Config.account = "devstoreaccount1"273 cls.Config.secret = (274 "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uS"275 "RZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="276 )277 cls.Config.host = "localhost"278 cls.Config.secure = False279 cls.Config.provider = "azure_blobs"280 time.sleep(5)281 @classmethod282 def tearDownClass(cls):283 _kill_and_log(cls.container)284 def test_cdn_url(self):285 if not self.has_sas_support:286 self.skipTest("Storage backend has no account SAS 
support")287class AzuriteV3StorageTest(AzuriteStorageTest):288 image = "mcr.microsoft.com/azure-storage/azurite"289 has_sas_support = True290 def test_upload_via_stream_with_content_encoding(self):291 self.skipTest(292 "Possible bug in AzuriteV3, see https://github.com/Azure/Azurite/issues/629"293 )294class IotedgeStorageTest(SmokeStorageTest):295 class Config:296 port = 11002297 version = "latest"298 client = None299 container = None300 @classmethod301 def setUpClass(cls):302 cls.client = _new_docker_client()303 account = _random_string(10)304 key = base64.b64encode(_random_string(20).encode("ascii")).decode("ascii")305 cls.container = cls.client.containers.run(306 "mcr.microsoft.com/azure-blob-storage:{}".format(cls.Config.version),307 detach=True,308 auto_remove=True,309 ports={cls.Config.port: 11002},310 environment={311 "LOCAL_STORAGE_ACCOUNT_NAME": account,312 "LOCAL_STORAGE_ACCOUNT_KEY": key,313 },314 )315 cls.Config.account = account316 cls.Config.secret = key317 cls.Config.host = "localhost"318 cls.Config.secure = False319 cls.Config.provider = "azure_blobs"320 time.sleep(5)321 @classmethod322 def tearDownClass(cls):323 _kill_and_log(cls.container)324def _new_docker_client():325 try:326 import docker327 except ImportError:328 raise unittest.SkipTest("missing docker library")329 return docker.from_env()330def _kill_and_log(container):331 for line in container.logs().splitlines():332 print(line)333 container.kill()334def _random_container_name(prefix="test"):335 max_length = 63336 suffix = _random_string(max_length)337 name = prefix + suffix338 name = re.sub("[^a-z0-9-]", "-", name)339 name = re.sub("-+", "-", name)340 name = name[:max_length]341 name = name.lower()342 return name343def _random_string(length, alphabet=string.ascii_lowercase + string.digits):344 return "".join(random.choice(alphabet) for _ in range(length))345def _read_stream(stream):346 buffer = io.BytesIO()347 buffer.writelines(stream)348 buffer.seek(0)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!