How to use bucket_exists method in localstack

Best Python code snippet using localstack_python

it_object_store_dataset_manager.py

Source:it_object_store_dataset_manager.py Github

copy

Full Screen

...84 """ Test that a simple data add creates a new object as expected. """85 ex_num = 186 dataset_name = self.examples[ex_num]['name']87 dest_object_name = 'data_file'88 self.assertFalse(self.minio_client.bucket_exists(dataset_name))89 self.manager.create(**self.examples[ex_num])90 does_exist = self.minio_client.bucket_exists(dataset_name)91 if does_exist:92 self._datasets_to_cleanup.add(dataset_name)93 self.assertNotIn(dest_object_name, self.manager.list_files(dataset_name))94 original_data = "File data contents"95 result = self.manager.add_data(dataset_name=dataset_name, dest=dest_object_name, data=original_data.encode())96 self.assertTrue(result)97 self.assertIn(dest_object_name, self.manager.list_files(dataset_name))98 def test_add_data_1_b(self):99 """ Test that a simple data add of raw data works correctly. """100 ex_num = 1101 dataset_name = self.examples[ex_num]['name']102 dest_object_name = 'data_file'103 self.assertFalse(self.minio_client.bucket_exists(dataset_name))104 self.manager.create(**self.examples[ex_num])105 does_exist = self.minio_client.bucket_exists(dataset_name)106 if does_exist:107 self._datasets_to_cleanup.add(dataset_name)108 original_data = "File data contents"109 self.manager.add_data(dataset_name=dataset_name, dest=dest_object_name, data=original_data.encode())110 raw_read_data = self.manager.get_data(dataset_name, item_name=dest_object_name)111 read_data = raw_read_data.decode()112 self.assertEqual(original_data, read_data)113 def test_add_data_1_c(self):114 """ Test that a data add of a file works correctly with specified dest. 
"""115 ex_num = 1116 dataset_name = self.examples[ex_num]['name']117 file_to_add = Path(self.find_git_root_dir()).joinpath('doc/GIT_USAGE.md')118 expected_name = 'GIT_USAGE.md'119 self.assertTrue(file_to_add.is_file())120 expected_data = file_to_add.read_bytes()121 self.assertFalse(self.minio_client.bucket_exists(dataset_name))122 self.manager.create(**self.examples[ex_num])123 does_exist = self.minio_client.bucket_exists(dataset_name)124 if does_exist:125 self._datasets_to_cleanup.add(dataset_name)126 self.manager.add_data(dataset_name=dataset_name, dest=expected_name, source=str(file_to_add))127 raw_read_data = self.manager.get_data(dataset_name, item_name=expected_name)128 self.assertEqual(expected_data, raw_read_data)129 def test_add_data_1_d(self):130 """ Test that a data add of a directory of files works correctly with implied bucket root. """131 ex_num = 1132 dataset_name = self.examples[ex_num]['name']133 dir_to_add = Path(self.find_git_root_dir()).joinpath('doc')134 # Note that if the project's doc dir is altered in certain ways, this may have to be manually updated135 self.assertTrue(dir_to_add.is_dir())136 one_files_name = 'GIT_USAGE.md'137 one_file = dir_to_add.joinpath(one_files_name)138 one_files_expected_data = one_file.read_bytes()139 num_uploaded_files = 0140 for p in dir_to_add.iterdir():141 if p.is_file():142 num_uploaded_files += 1143 # This is actually one more, because of the serialized dataset state file144 expected_num_files = num_uploaded_files + 1145 self.assertFalse(self.minio_client.bucket_exists(dataset_name))146 self.manager.create(**self.examples[ex_num])147 does_exist = self.minio_client.bucket_exists(dataset_name)148 if does_exist:149 self._datasets_to_cleanup.add(dataset_name)150 self.manager.add_data(dataset_name=dataset_name, dest='', source=str(dir_to_add))151 actual_num_files = len(self.manager.list_files(dataset_name))152 self.assertEqual(expected_num_files, actual_num_files)153 raw_read_data = 
self.manager.get_data(dataset_name, item_name=one_files_name)154 self.assertEqual(one_files_expected_data, raw_read_data)155 def test_create_1_a(self):156 """157 Test that create works for a dataset that does not already exist.158 """159 ex_num = 1160 dataset_name = self.examples[ex_num]['name']161 self.assertFalse(self.minio_client.bucket_exists(dataset_name))162 self.manager.create(**self.examples[ex_num])163 does_exist = self.minio_client.bucket_exists(dataset_name)164 if does_exist:165 self._datasets_to_cleanup.add(dataset_name)166 self.assertTrue(does_exist)167 def test_create_1_b(self):168 """169 Test that create writes the state serialization file to the newly created dataset.170 """171 ex_num = 1172 dataset_name = self.examples[ex_num]['name']173 serial_file_name = self.manager._gen_dataset_serial_obj_name(dataset_name)174 self.assertFalse(self.minio_client.bucket_exists(dataset_name))175 self.manager.create(**self.examples[ex_num])176 does_exist = self.minio_client.bucket_exists(dataset_name)177 if does_exist:178 self._datasets_to_cleanup.add(dataset_name)179 result = self.minio_client.get_object(bucket_name=dataset_name, object_name=serial_file_name)180 self.assertIsNotNone(result)181 def test_get_data_1_a(self):182 """ Test that we can get the serialized file for a newly created dataset. """183 ex_num = 1184 dataset_name = self.examples[ex_num]['name']185 serial_file_name = self.manager._gen_dataset_serial_obj_name(dataset_name)186 self.assertFalse(self.minio_client.bucket_exists(dataset_name))187 self.manager.create(**self.examples[ex_num])188 does_exist = self.minio_client.bucket_exists(dataset_name)189 if does_exist:190 self._datasets_to_cleanup.add(dataset_name)191 data = self.manager.get_data(dataset_name, item_name=serial_file_name)192 self.assertIsInstance(data, bytes)193 def test_get_data_1_b(self):194 """ Test that we can get the serialized file for a newly created dataset, and that it decodes. 
"""195 ex_num = 1196 dataset_name = self.examples[ex_num]['name']197 serial_file_name = self.manager._gen_dataset_serial_obj_name(dataset_name)198 self.assertFalse(self.minio_client.bucket_exists(dataset_name))199 self.manager.create(**self.examples[ex_num])200 does_exist = self.minio_client.bucket_exists(dataset_name)201 if does_exist:202 self._datasets_to_cleanup.add(dataset_name)203 data_dict = json.loads(self.manager.get_data(dataset_name, item_name=serial_file_name).decode())204 self.assertEqual(dataset_name, data_dict[ObjectStoreDataset._KEY_NAME])205 def test_list_files_1_a(self):206 """207 Test that list files includes the serialized file for a newly created dataset.208 """209 ex_num = 1210 dataset_name = self.examples[ex_num]['name']211 serial_file_name = self.manager._gen_dataset_serial_obj_name(dataset_name)212 self.assertFalse(self.minio_client.bucket_exists(dataset_name))213 self.manager.create(**self.examples[ex_num])214 does_exist = self.minio_client.bucket_exists(dataset_name)215 if does_exist:216 self._datasets_to_cleanup.add(dataset_name)217 self.assertTrue(serial_file_name in self.manager.list_files(dataset_name))218 def test_persist_serialized_1_a(self):219 """220 Test that serialized persistence works for new dataset after an extra, manual persist call.221 """222 ex_num = 1223 dataset_name = self.examples[ex_num]['name']224 self.assertFalse(self.minio_client.bucket_exists(dataset_name))225 self.manager.create(**self.examples[ex_num])226 does_exist = self.minio_client.bucket_exists(dataset_name)227 if does_exist:228 self._datasets_to_cleanup.add(dataset_name)229 self.manager.persist_serialized(name=dataset_name)230 expected_obj_name = self.manager._gen_dataset_serial_obj_name(dataset_name)231 result = self.minio_client.get_object(bucket_name=dataset_name, object_name=expected_obj_name)232 self.assertIsNotNone(result)233 def test_persist_serialized_1_b(self):234 """235 Test that ``persist_serialized`` (during create) writes a serialization file 
that can be deserialized properly.236 """237 ex_num = 1238 dataset_name = self.examples[ex_num]['name']239 serial_file_name = self.manager._gen_dataset_serial_obj_name(dataset_name)240 self.assertFalse(self.minio_client.bucket_exists(dataset_name))241 self.manager.create(**self.examples[ex_num])242 does_exist = self.minio_client.bucket_exists(dataset_name)243 if does_exist:244 self._datasets_to_cleanup.add(dataset_name)245 data_dict = json.loads(self.manager.get_data(dataset_name, item_name=serial_file_name).decode())246 dataset = ObjectStoreDataset.factory_init_from_deserialized_json(data_dict)247 expected_dataset = self.manager.datasets[dataset_name]248 self.assertEqual(expected_dataset, dataset)249 def test_persist_serialized_1_c(self):250 """251 Test that serialized persistence works for new dataset and correctly saves dataset domain.252 """253 ex_num = 1254 dataset_name = self.examples[ex_num]['name']255 self.assertFalse(self.minio_client.bucket_exists(dataset_name))256 self.manager.create(**self.examples[ex_num])257 does_exist = self.minio_client.bucket_exists(dataset_name)258 if does_exist:259 self._datasets_to_cleanup.add(dataset_name)260 serial_dataset_obj_name = self.manager._gen_dataset_serial_obj_name(dataset_name)261 response_obj = self.minio_client.get_object(bucket_name=dataset_name, object_name=serial_dataset_obj_name)262 response_data = json.loads(response_obj.data.decode())263 expected_domain = self.examples[ex_num]['domain']264 serialized_domain = DataDomain.factory_init_from_deserialized_json(response_data['data_domain'])265 self.assertEqual(expected_domain, serialized_domain)266 def test_persist_serialized_1_d(self):267 """268 Test that serialized persistence works for new dataset and correctly saves several other dataset attributes.269 """270 ex_num = 1271 dataset_name = self.examples[ex_num]['name']272 self.assertFalse(self.minio_client.bucket_exists(dataset_name))273 self.manager.create(**self.examples[ex_num])274 does_exist = 
self.minio_client.bucket_exists(dataset_name)275 if does_exist:276 self._datasets_to_cleanup.add(dataset_name)277 expected_dataset = self.manager.datasets[dataset_name]278 serial_dataset_obj_name = self.manager._gen_dataset_serial_obj_name(dataset_name)279 response_obj = self.minio_client.get_object(bucket_name=dataset_name, object_name=serial_dataset_obj_name)280 response_data = json.loads(response_obj.data.decode())281 deserialized_dataset = ObjectStoreDataset.factory_init_from_deserialized_json(response_data)282 self.assertEqual(expected_dataset, deserialized_dataset)283 def test_persist_serialized_1_e(self):284 """285 Test that create writes a state serialization file that is loaded properly by a new manager instance.286 """287 ex_num = 1288 dataset_name = self.examples[ex_num]['name']289 self.assertFalse(self.minio_client.bucket_exists(dataset_name))290 self.manager.create(**self.examples[ex_num])291 does_exist = self.minio_client.bucket_exists(dataset_name)292 if does_exist:293 self._datasets_to_cleanup.add(dataset_name)294 self._initialize_manager(reset_existing=True)295 self.assertIn(dataset_name, self.manager.datasets)296 def test_persist_serialized_1_f(self):297 """298 Test that create writes a state serialization file that is loaded properly by a new manager instance.299 """300 ex_num = 1301 dataset_name = self.examples[ex_num]['name']302 self.assertFalse(self.minio_client.bucket_exists(dataset_name))303 self.manager.create(**self.examples[ex_num])304 does_exist = self.minio_client.bucket_exists(dataset_name)305 if does_exist:306 self._datasets_to_cleanup.add(dataset_name)307 expected_dataset = self.manager.datasets[dataset_name]308 self._initialize_manager(reset_existing=True)309 dataset = self.manager.datasets[dataset_name]310 self.assertEqual(expected_dataset, dataset)311 def test_persist_serialized_1_g(self):312 """313 Test that create writes a state serialization file that is loaded properly by a new manager instance.314 """315 ex_num = 1316 
dataset_name = self.examples[ex_num]['name']317 serial_file_name = self.manager._gen_dataset_serial_obj_name(dataset_name)318 self.assertFalse(self.minio_client.bucket_exists(dataset_name))319 self.manager.create(**self.examples[ex_num])320 does_exist = self.minio_client.bucket_exists(dataset_name)321 if does_exist:322 self._datasets_to_cleanup.add(dataset_name)323 # Get the initial data from the serial file324 expected_data = self.manager.get_data(dataset_name, item_name=serial_file_name)325 # Create a new manager object, which should reload the dataset from the bucket and serial file326 self._initialize_manager(reset_existing=True)327 # Now load the data from the new manager328 DataFormat.NGEN_GEOJSON_HYDROFABRIC329 data = self.manager.get_data(dataset_name, item_name=serial_file_name)330 self.assertEqual(expected_data, data)331 def test_datasets_1_a(self):332 """333 Test that ``datasets`` property does not initially have testing dataset.334 """335 ex_num = 1336 dataset_name = self.examples[ex_num]['name']337 self.assertFalse(self.minio_client.bucket_exists(dataset_name))338 self.assertFalse(dataset_name in self.manager.datasets)339 def test_datasets_1_b(self):340 """341 Test that ``datasets`` property shows dataset after it is created.342 """343 ex_num = 1344 dataset_name = self.examples[ex_num]['name']345 self.assertFalse(self.minio_client.bucket_exists(dataset_name))346 all_datasets = self.manager.datasets347 self.assertFalse(dataset_name in self.manager.datasets)348 self.manager.create(**self.examples[ex_num])349 does_exist = self.minio_client.bucket_exists(dataset_name)350 if does_exist:351 self._datasets_to_cleanup.add(dataset_name)352 self.assertTrue(dataset_name in self.manager.datasets)353 def test_datasets_1_c(self):354 """355 Test that ``datasets`` property store actual dataset object of correct type after it is created.356 """357 ex_num = 1358 dataset_name = self.examples[ex_num]['name']359 
self.assertFalse(self.minio_client.bucket_exists(dataset_name))360 self.assertFalse(dataset_name in self.manager.datasets)361 self.manager.create(**self.examples[ex_num])362 does_exist = self.minio_client.bucket_exists(dataset_name)363 if does_exist:364 self._datasets_to_cleanup.add(dataset_name)365 self.assertIsInstance(self.manager.datasets[dataset_name], ObjectStoreDataset)366 def test_datasets_1_d(self):367 """368 Test that ``datasets`` property store actual dataset object after it is created.369 """370 ex_num = 1371 dataset_name = self.examples[ex_num]['name']372 self.assertFalse(self.minio_client.bucket_exists(dataset_name))373 self.assertFalse(dataset_name in self.manager.datasets)374 self.manager.create(**self.examples[ex_num])375 does_exist = self.minio_client.bucket_exists(dataset_name)376 if does_exist:377 self._datasets_to_cleanup.add(dataset_name)378 self.assertEqual(self.manager.datasets[dataset_name].name, dataset_name)...

Full Screen

Full Screen

test_gcloud.py

Source:test_gcloud.py Github

copy

Full Screen

# Copyright 2020 Neal Lathia
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

import mock
import pytest
from google.cloud import storage
from google.cloud.storage.blob import Blob

from modelstore.metadata import metadata
from modelstore.storage.gcloud import GoogleCloudStorage

# pylint: disable=unused-import
from tests.storage.test_utils import (
    TEST_FILE_CONTENTS,
    TEST_FILE_NAME,
    remote_file_path,
    temp_file,
)

# pylint: disable=redefined-outer-name
# pylint: disable=protected-access
# pylint: disable=missing-function-docstring

# NOTE(review): the GoogleCloudStorage instances held in test locals were
# previously named "storage", shadowing the google.cloud.storage module
# imported above; they are renamed gc_storage throughout.
_MOCK_BUCKET_NAME = "gcloud-bucket"
_MOCK_PROJECT_NAME = "project-name"


def gcloud_bucket(bucket_exists: bool):
    # Build an autospec'd GCS bucket whose exists() reports bucket_exists
    mock_bucket = mock.create_autospec(storage.Bucket)
    mock_bucket.name = _MOCK_BUCKET_NAME
    mock_bucket.exists.return_value = bucket_exists
    return mock_bucket


def gcloud_blob(blob_exists: bool, file_contents: str):
    # Build an autospec'd blob; only an existing blob has downloadable contents
    mock_blob = mock.create_autospec(storage.Blob)
    mock_blob.exists.return_value = blob_exists
    if blob_exists:
        mock_blob.download_as_string.return_value = file_contents
    return mock_blob


def gcloud_anon_client(bucket_exists: bool, blob_exists: bool, file_contents: str):
    # Create a storage client
    mock_client = mock.create_autospec(storage.Client)
    # Add a bucket to the client; anonymous clients use .bucket
    # instead of .get_bucket
    mock_bucket = gcloud_bucket(bucket_exists)
    mock_client.bucket.return_value = mock_bucket
    # If the bucket exists, add a file to it
    if bucket_exists:
        mock_blob = gcloud_blob(blob_exists, file_contents)
        mock_bucket.blob.return_value = mock_blob
        if blob_exists:
            # The anonymous client can list the blobs
            mock_client.list_blobs.return_value = [mock_blob]
    return mock_client


def gcloud_authed_client(bucket_exists: bool, blob_exists: bool, file_contents: str):
    # Create a storage client
    mock_client = mock.create_autospec(storage.Client)
    # Add a bucket to the client
    mock_bucket = gcloud_bucket(bucket_exists)
    mock_client.get_bucket.return_value = mock_bucket
    # If the bucket exists, add a file to it
    if bucket_exists:
        mock_blob = gcloud_blob(blob_exists, file_contents)
        mock_bucket.blob.return_value = mock_blob
    return mock_client


def gcloud_storage(mock_client: storage.Client, is_anon_client: bool):
    return GoogleCloudStorage(
        project_name=_MOCK_PROJECT_NAME,
        bucket_name=_MOCK_BUCKET_NAME,
        client=mock_client,
        is_anon_client=is_anon_client,
    )


def gcloud_client(bucket_exists: bool, blob_exists: bool, is_anon_client: bool,
                  file_contents: str = TEST_FILE_CONTENTS):
    # Returns (mock client, GoogleCloudStorage wrapping it); the previous
    # version duplicated the return statement in both branches.
    if is_anon_client:
        client = gcloud_anon_client(bucket_exists, blob_exists, file_contents)
    else:
        client = gcloud_authed_client(bucket_exists, blob_exists, file_contents)
    return client, gcloud_storage(client, is_anon_client)


def test_create_from_environment_variables(monkeypatch):
    monkeypatch.setenv("MODEL_STORE_GCP_PROJECT", _MOCK_PROJECT_NAME)
    monkeypatch.setenv("MODEL_STORE_GCP_BUCKET", _MOCK_BUCKET_NAME)
    # Does not fail when environment variables exist; narrowed from a bare
    # "except:" -- any construction error still counts as a failure.
    try:
        _ = GoogleCloudStorage()
    except Exception:
        pytest.fail("Failed to initialise storage from env variables")


def test_create_fails_with_missing_environment_variables(monkeypatch):
    # Fails when environment variables are missing
    for key in GoogleCloudStorage.BUILD_FROM_ENVIRONMENT.get("required", []):
        monkeypatch.delenv(key, raising=False)
    with pytest.raises(KeyError):
        _ = GoogleCloudStorage()


@pytest.mark.parametrize(
    "bucket_exists,is_anon_client,validate_should_pass",
    [
        (False, False, False),
        (True, False, True),
        (False, True, False),
        (True, True, True),
    ],
)
def test_validate(bucket_exists, is_anon_client, validate_should_pass):
    _, gc_storage = gcloud_client(
        bucket_exists=bucket_exists,
        blob_exists=False,
        is_anon_client=is_anon_client,
    )
    assert gc_storage.validate() == validate_should_pass


def test_push(tmp_path):
    _, gc_storage = gcloud_client(
        bucket_exists=True,
        blob_exists=False,
        is_anon_client=False,
    )
    # Push a file
    prefix = remote_file_path()
    result = gc_storage._push(temp_file(tmp_path), prefix)
    # Assert that the correct prefix is returned
    # and that an upload happened
    assert result == prefix
    # Assert that an upload happened
    mock_bucket = gc_storage.client.get_bucket(gc_storage.bucket_name)
    mock_blob = mock_bucket.blob(prefix)
    mock_blob.upload_from_file.assert_called()


def test_anonymous_push(tmp_path):
    _, gc_storage = gcloud_client(
        bucket_exists=True,
        blob_exists=False,
        is_anon_client=True,
    )
    prefix = remote_file_path()
    with pytest.raises(NotImplementedError):
        _ = gc_storage._push(temp_file(tmp_path), prefix)


def test_pull(tmp_path):
    _, gc_storage = gcloud_client(
        bucket_exists=True,
        blob_exists=True,
        is_anon_client=False,
    )
    # Pull the file back from storage
    prefix = remote_file_path()
    result = gc_storage._pull(prefix, tmp_path)
    # Assert returned path
    local_destination = os.path.join(tmp_path, TEST_FILE_NAME)
    assert result == local_destination
    # Assert download happened
    mock_bucket = gc_storage.client.get_bucket(gc_storage.bucket_name)
    mock_blob = mock_bucket.blob(prefix)
    mock_blob.download_to_filename.assert_called_with(local_destination)


def test_anonymous_pull(tmp_path):
    _, gc_storage = gcloud_client(
        bucket_exists=True,
        blob_exists=True,
        is_anon_client=True,
    )
    # Pull the file back from storage
    prefix = remote_file_path()
    result = gc_storage._pull(prefix, tmp_path)
    # Assert returned path
    local_destination = os.path.join(tmp_path, TEST_FILE_NAME)
    assert result == local_destination
    # Assert download happened; anonymous clients use .bucket, not .get_bucket
    mock_bucket = gc_storage.client.bucket(gc_storage.bucket_name)
    mock_blob = mock_bucket.blob(prefix)
    mock_blob.download_to_filename.assert_called_with(local_destination)


@pytest.mark.parametrize(
    "blob_exists,should_call_delete",
    [
        (False, False),
        (True, True),
    ],
)
def test_remove(blob_exists, should_call_delete):
    mock_client, gc_storage = gcloud_client(
        bucket_exists=True,
        blob_exists=blob_exists,
        is_anon_client=False,
    )
    prefix = remote_file_path()
    mock_bucket = mock_client.get_bucket(gc_storage.bucket_name)
    mock_blob = mock_bucket.blob(prefix)
    file_removed = gc_storage._remove(prefix)
    assert file_removed == should_call_delete
    if should_call_delete:
        # Asserts that removing the file results in a removal
        mock_blob.delete.assert_called()
    else:
        # Asserts that we don't call delete on a file that doesn't exist
        mock_blob.delete.assert_not_called()


def test_anonymous_remove():
    _, gc_storage = gcloud_client(
        bucket_exists=True,
        blob_exists=True,
        is_anon_client=True,
    )
    prefix = remote_file_path()
    with pytest.raises(NotImplementedError):
        _ = gc_storage._remove(prefix)


def test_read_json_objects_ignores_non_json():
    mock_client, gc_storage = gcloud_client(
        bucket_exists=True,
        blob_exists=False,
        is_anon_client=False,
    )
    mock_client.list_blobs.return_value = [
        Blob(name="test-file-source-1.txt", bucket=_MOCK_BUCKET_NAME),
        Blob(name="test-file-source-2.txt", bucket=_MOCK_BUCKET_NAME),
    ]
    items = gc_storage._read_json_objects("")
    assert len(items) == 0


def test_read_json_object_fails_gracefully():
    _, gc_storage = gcloud_client(
        bucket_exists=True,
        blob_exists=True,
        is_anon_client=False,
        file_contents="not json",
    )
    prefix = remote_file_path()
    # Read a file that does not contain any JSON
    # Argument (remote prefix) is ignored here because of mock above
    item = gc_storage._read_json_object(prefix)
    assert item is None


def test_storage_location():
    _, gc_storage = gcloud_client(
        bucket_exists=False,
        blob_exists=False,
        is_anon_client=False,
    )
    # Asserts that the location meta data is correctly formatted
    prefix = remote_file_path()
    expected = metadata.Storage.from_bucket(
        storage_type="google:cloud-storage",
        bucket=_MOCK_BUCKET_NAME,
        prefix=prefix,
    )
    assert gc_storage._storage_location(prefix) == expected


@pytest.mark.parametrize(
    "meta_data,should_raise,result",
    [
        (
            metadata.Storage(
                type=None,
                path=None,
                bucket=_MOCK_BUCKET_NAME,
                container=None,
                prefix="/path/to/file",
            ),
            False,
            "/path/to/file",
        ),
        (
            metadata.Storage(
                type=None,
                path=None,
                bucket="a-different-bucket",
                container=None,
                prefix="/path/to/file",
            ),
            True,
            None,
        ),
    ],
)
def test_get_location(meta_data, should_raise, result):
    _, gc_storage = gcloud_client(
        bucket_exists=False,
        blob_exists=False,
        is_anon_client=False,
    )
    # Asserts that pulling the location out of meta data is correct
    if should_raise:
        with pytest.raises(ValueError):
            gc_storage._get_storage_location(meta_data)
    else:
        # NOTE(review): the scraped source was truncated at this branch;
        # comparing the extracted location to the parametrized ``result`` is
        # the evident intent -- confirm against the original file.
        assert gc_storage._get_storage_location(meta_data) == result

Full Screen

Full Screen

bucket_exists.py

Source:bucket_exists.py Github

copy

Full Screen

...22# language governing permissions and limitations under the License.23import logging24import boto325from botocore.exceptions import ClientError26def bucket_exists(bucket_name):27 """Determine whether bucket_name exists and the user has permission to access it28 :param bucket_name: string29 :return: True if the referenced bucket_name exists, otherwise False30 """31 s3 = boto3.client('s3')32 try:33 response = s3.head_bucket(Bucket=bucket_name)34 except ClientError as e:35 logging.debug(e)36 return False37 return True38def main():39 """Exercise bucket_exists()"""40 # Assign this value before running the program41 test_bucket_name = 'BUCKET_NAME'42 # Set up logging43 logging.basicConfig(level=logging.DEBUG,44 format='%(levelname)s: %(asctime)s: %(message)s')45 # Check if the bucket exists46 if bucket_exists(test_bucket_name):47 logging.info(f'{test_bucket_name} exists and you have permission to access it.')48 else:49 logging.info(f'{test_bucket_name} does not exist or '50 f'you do not have permission to access it.')51if __name__ == '__main__':...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub — right from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful