How to use get_object_tagging method in localstack

Best Python code snippet using localstack_python

test_s3_tagging_v.py

Source:test_s3_tagging_v.py Github

copy

Full Screen

...23 bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)24 input_tag_set = self.create_simple_tag_set(2)25 response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)26 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)27 response = client.get_object_tagging(Bucket=bucket_name, Key=key)28 self.eq(response['TagSet'], input_tag_set['TagSet'])29 def test_get_obj_head_tagging(self, s3cfg_global_unique):30 """31 测试-验证head-object里含有设置的tag32 """33 key = 'testputtags'34 client = get_client(s3cfg_global_unique)35 bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)36 count = 237 input_tag_set = self.create_simple_tag_set(count)38 response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)39 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)40 response = client.head_object(Bucket=bucket_name, Key=key)41 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)42 self.eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-tagging-count'], str(count))43 def test_put_max_tags(self, s3cfg_global_unique):44 """45 测试-验证最大允许设置的tags(10个)46 """47 key = 'testputmaxtags'48 client = get_client(s3cfg_global_unique)49 bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)50 input_tag_set = self.create_simple_tag_set(10)51 response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)52 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)53 response = client.get_object_tagging(Bucket=bucket_name, Key=key)54 self.eq(response['TagSet'], input_tag_set['TagSet'])55 def test_put_excess_tags(self, s3cfg_global_unique):56 """57 测试-验证最大允许设置的tags(11个), failed58 """59 key = 'testputmaxtags'60 client = get_client(s3cfg_global_unique)61 bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)62 input_tag_set = self.create_simple_tag_set(11)63 e = assert_raises(64 ClientError, client.put_object_tagging, Bucket=bucket_name, Key=key, Tagging=input_tag_set)65 status, error_code = self.get_status_and_error_code(e.response)66 self.eq(status, 400)67 self.eq(error_code, 'InvalidTag')68 response = client.get_object_tagging(Bucket=bucket_name, Key=key)69 self.eq(len(response['TagSet']), 0)70 def test_put_max_kvsize_tags(self, s3cfg_global_unique):71 """72 测试-验证设置tag是key和value的最大字符数:key为128,value为25673 """74 key = 'testputmaxkeysize'75 client = get_client(s3cfg_global_unique)76 bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)77 tag_set = []78 for i in range(10):79 k = self.make_random_string(128)80 v = self.make_random_string(256)81 tag_set.append({'Key': k, 'Value': v})82 input_tag_set = {'TagSet': tag_set}83 response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)84 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)85 response = client.get_object_tagging(Bucket=bucket_name, Key=key)86 for kv_pair in response['TagSet']:87 self.eq((kv_pair in input_tag_set['TagSet']), True)88 def test_put_excess_key_tags(self, s3cfg_global_unique):89 """90 测试-验证设置tag是key和value的最大字符数:key为129,value为256,failed91 """92 key = 'testputexcesskeytags'93 client = get_client(s3cfg_global_unique)94 bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)95 tag_set = []96 for i in range(10):97 k = self.make_random_string(129)98 v = self.make_random_string(256)99 tag_set.append({'Key': k, 'Value': v})100 input_tag_set = {'TagSet': tag_set}101 e = assert_raises(ClientError, client.put_object_tagging, Bucket=bucket_name, Key=key, Tagging=input_tag_set)102 status, error_code = self.get_status_and_error_code(e.response)103 self.eq(status, 400)104 self.eq(error_code, 'InvalidTag')105 response = client.get_object_tagging(Bucket=bucket_name, Key=key)106 self.eq(len(response['TagSet']), 0)107 def test_put_excess_val_tags(self, s3cfg_global_unique):108 """109 测试-验证设置tag是key和value的最大字符数:key为128,value为257,failed110 """111 key = 'testputexcesskeytags'112 client = get_client(s3cfg_global_unique)113 bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)114 tag_set = []115 for i in range(10):116 k = self.make_random_string(128)117 v = self.make_random_string(257)118 tag_set.append({'Key': k, 'Value': v})119 input_tag_set = {'TagSet': tag_set}120 e = assert_raises(ClientError, client.put_object_tagging, Bucket=bucket_name, Key=key, Tagging=input_tag_set)121 status, error_code = self.get_status_and_error_code(e.response)122 self.eq(status, 400)123 self.eq(error_code, 'InvalidTag')124 response = client.get_object_tagging(Bucket=bucket_name, Key=key)125 self.eq(len(response['TagSet']), 0)126 def test_put_modify_tags(self, s3cfg_global_unique):127 """128 测试-验证修改已存在的tags129 """130 key = 'testputmodifytags'131 client = get_client(s3cfg_global_unique)132 bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)133 tag_set = [{'Key': 'key', 'Value': 'val'}, {'Key': 'key2', 'Value': 'val2'}]134 input_tag_set = {'TagSet': tag_set}135 response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)136 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)137 response = client.get_object_tagging(Bucket=bucket_name, Key=key)138 self.eq(response['TagSet'], input_tag_set['TagSet'])139 tag_set2 = [{'Key': 'key3', 'Value': 'val3'}]140 input_tag_set2 = {'TagSet': tag_set2}141 response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set2)142 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)143 response = client.get_object_tagging(Bucket=bucket_name, Key=key)144 self.eq(response['TagSet'], input_tag_set2['TagSet'])145 def test_put_delete_tags(self, s3cfg_global_unique):146 """147 测试-验证删除tags148 """149 key = 'testputmodifytags'150 client = get_client(s3cfg_global_unique)151 bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)152 input_tag_set = self.create_simple_tag_set(2)153 response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)154 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)155 response = client.get_object_tagging(Bucket=bucket_name, Key=key)156 self.eq(response['TagSet'], input_tag_set['TagSet'])157 response = client.delete_object_tagging(Bucket=bucket_name, Key=key)158 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 204)159 response = client.get_object_tagging(Bucket=bucket_name, Key=key)160 self.eq(len(response['TagSet']), 0)161 def test_post_object_tags_anonymous_request(self, s3cfg_global_unique):162 """163 测试-验证设置对象tags,通过browser based via POST request164 """165 client = get_client(s3cfg_global_unique)166 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)167 url = self.get_post_url(s3cfg_global_unique, bucket_name)168 client.create_bucket(ACL='public-read-write', Bucket=bucket_name)169 key_name = "foo.txt"170 input_tag_set = self.create_simple_tag_set(2)171 # xml_input_tag_set is the same as input_tag_set in xml.172 # There is not a simple way to change input_tag_set to xml like there is in the boto2 tetss173 xml_input_tag_set = "<Tagging><TagSet><Tag><Key>0</Key><Value>0</Value></Tag><Tag><Key>1</Key><Value>1</Value></Tag></TagSet></Tagging>"174 payload = OrderedDict([175 ("key", key_name),176 ("acl", "public-read"),177 ("Content-Type", "text/plain"),178 ("tagging", xml_input_tag_set),179 ('file', 'bar'),180 ])181 r = requests.post(url, files=payload, verify=s3cfg_global_unique.default_ssl_verify)182 self.eq(r.status_code, 204)183 response = client.get_object(Bucket=bucket_name, Key=key_name)184 body = self.get_body(response)185 self.eq(body, 'bar')186 response = client.get_object_tagging(Bucket=bucket_name, Key=key_name)187 self.eq(response['TagSet'], input_tag_set['TagSet'])188 def test_post_object_tags_authenticated_request(self, s3cfg_global_unique):189 """190 测试-验证191 (operation='authenticated browser based upload via POST request')192 (assertion='succeeds and returns written data')193 """194 client = get_client(s3cfg_global_unique)195 bucket_name = self.get_new_bucket(client, s3cfg_global_unique)196 url = self.get_post_url(s3cfg_global_unique, bucket_name)197 utc = pytz.utc198 expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)199 policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),200 "conditions": [201 {"bucket": bucket_name},202 ["starts-with", "$key", "foo"],203 {"acl": "private"},204 ["starts-with", "$Content-Type", "text/plain"],205 ["content-length-range", 0, 1024],206 ["starts-with", "$tagging", ""]207 ]}208 # xml_input_tag_set is the same as `input_tag_set = self.create_simple_tag_set(2)` in xml209 # There is not a simple way to change input_tag_set to xml like there is in the boto2 tetss210 xml_input_tag_set = "<Tagging><TagSet><Tag><Key>0</Key><Value>0</Value></Tag><Tag><Key>1</Key><Value>1</Value></Tag></TagSet></Tagging>"211 json_policy_document = json.JSONEncoder().encode(policy_document)212 bytes_json_policy_document = bytes(json_policy_document, 'utf-8')213 policy = base64.b64encode(bytes_json_policy_document)214 aws_secret_access_key = s3cfg_global_unique.main_secret_key215 aws_access_key_id = s3cfg_global_unique.main_access_key216 signature = base64.b64encode(hmac.new(bytes(aws_secret_access_key, 'utf-8'), policy, hashlib.sha1).digest())217 payload = OrderedDict([218 ("key", "foo.txt"),219 ("AWSAccessKeyId", aws_access_key_id),220 ("acl", "private"), ("signature", signature), ("policy", policy),221 ("tagging", xml_input_tag_set),222 ("Content-Type", "text/plain"),223 ('file', 'bar')])224 r = requests.post(url, files=payload, verify=s3cfg_global_unique.default_ssl_verify)225 self.eq(r.status_code, 204)226 response = client.get_object(Bucket=bucket_name, Key='foo.txt')227 body = self.get_body(response)228 self.eq(body, 'bar')229 def test_put_obj_with_tags(self, s3cfg_global_unique):230 """231 (operation='Test PutObj with tagging headers')232 (assertion='success')233 """234 client = get_client(s3cfg_global_unique)235 bucket_name = self.get_new_bucket(client, s3cfg_global_unique)236 key = 'testtagobj1'237 data = 'A' * 100238 tag_set = [{'Key': 'bar', 'Value': ''}, {'Key': 'foo', 'Value': 'bar'}]239 put_obj_tag_headers = {240 'x-amz-tagging': 'foo=bar&bar'241 }242 lf = (lambda **kwargs: kwargs['params']['headers'].update(put_obj_tag_headers))243 client.meta.events.register('before-call.s3.PutObject', lf)244 client.put_object(Bucket=bucket_name, Key=key, Body=data)245 response = client.get_object(Bucket=bucket_name, Key=key)246 body = self.get_body(response)247 self.eq(body, data)248 response = client.get_object_tagging(Bucket=bucket_name, Key=key)249 response_tag_set = response['TagSet']250 tag_set = tag_set251 self.eq(response_tag_set, tag_set)252class TestBucketTagging(TestTaggingBase):253 @pytest.mark.ess254 @pytest.mark.fails_on_ess255 @pytest.mark.xfail(reason="预期:当没设置桶标签的时候,获取标签返回NoSuchTagSetError", run=True, strict=True)256 def test_set_bucket_tagging(self, s3cfg_global_unique):257 """258 测试-验证设置存储桶的tags259 """260 client = get_client(s3cfg_global_unique)261 bucket_name = self.get_new_bucket(client, s3cfg_global_unique)262 e = assert_raises(ClientError, client.get_bucket_tagging, Bucket=bucket_name) # won't raise ClientError...

Full Screen

Full Screen

test_s3.py

Source:test_s3.py Github

copy

Full Screen

1# Licensed under the Apache License, Version 2.0 (the "License"); you may2# not use this file except in compliance with the License. You may obtain3# a copy of the License at4#5# http://www.apache.org/licenses/LICENSE-2.06#7# Unless required by applicable law or agreed to in writing, software8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the10# License for the specific language governing permissions and limitations11# under the License.12from botocore import exceptions as boto_exc13import mock14import testtools15from s3uploader import exceptions16from s3uploader import s317class TestAssetManager(testtools.TestCase):18 def setUp(self):19 super(TestAssetManager, self).setUp()20 self.storage_mock = mock.Mock()21 self.manager = s3.AssetManager(storage_manager=self.storage_mock)22 self.addCleanup(mock.patch.stopall)23 def test_create_asset(self):24 self.storage_mock.get_url_for_upload.return_value = 'foo_url'25 asset = self.manager.create_asset()26 self.assertIsNotNone(asset)27 self.assertEqual('foo_url', asset.url)28 def test_update_asset(self):29 self.manager.update_asset('foo_asset_id')30 self.storage_mock.update_upload_status.assert_called_once_with(31 'foo_asset_id')32 def test_get_asset(self):33 self.storage_mock.get_url_for_download.return_value = 'foo_url'34 asset = self.manager.get_asset('foo_asset_id', 50)35 self.assertIsNotNone(asset)36 self.assertEqual('foo_url', asset.url)37class TestS3Manager(testtools.TestCase):38 def setUp(self):39 super(TestS3Manager, self).setUp()40 self.manager = s3.S3Manager(config=mock.Mock(), client=mock.Mock())41 self.addCleanup(mock.patch.stopall)42 def test_get_url_for_upload(self):43 self.manager.get_url_for_upload('foo_asset_id')44 self.manager.client.generate_presigned_post.assert_called_once()45 def test_update_upload_status(self):46 self.manager.update_upload_status('foo_asset_id')47 self.manager.client.put_object_tagging.assert_called_once()48 def test_update_upload_status_raise_not_found(self):49 exc = boto_exc.ClientError(50 error_response={}, operation_name='foo')51 exc.response['Error'] = {'Code': 'NoSuchKey'}52 self.manager.client.put_object_tagging.side_effect = exc53 self.assertRaises(54 exceptions.AssetNotFoundError,55 self.manager.update_upload_status, 'foo_asset_id')56 def test_get_url_for_download_ready_for_download(self):57 self.manager.client.get_object_tagging.return_value = (58 {'TagSet': [{'Key': 'Status', 'Value': 'Uploaded'}]})59 self.manager.get_url_for_download('foo_asset_id', 50)60 self.manager.client.get_object_tagging.assert_called_once()61 self.manager.client.generate_presigned_url.assert_called_once()62 def test_get_url_for_download_not_ready_for_download(self):63 self.manager.client.get_object_tagging.return_value = None64 self.manager.get_url_for_download('foo_asset_id', 50)65 self.manager.client.get_object_tagging.assert_called_once()66 self.assertFalse(self.manager.client.generate_presigned_url.call_count)67 def test_get_url_for_download_not_found(self):68 exc = boto_exc.ClientError(69 error_response={}, operation_name='foo')70 exc.response['Error'] = {'Code': 'NoSuchKey'}71 self.manager.client.get_object_tagging.side_effect = exc72 self.assertRaises(73 exceptions.AssetNotFoundError,74 self.manager.get_url_for_download, 'foo_asset_id', 50)75 def test_get_url_for_download_unknown_error(self):76 exc = boto_exc.ClientError(77 error_response={}, operation_name='foo')78 exc.response['Error'] = {'Code': 'UncaughtError'}79 self.manager.client.get_object_tagging.side_effect = exc80 self.assertRaises(81 exceptions.AssetError,82 self.manager.get_url_for_download, 'foo_asset_id', 50)83 def test_get_url_for_upload_raises(self):84 self.manager.client.generate_presigned_post.side_effect = (85 boto_exc.BotoCoreError)86 self.assertRaises(exceptions.AssetError,87 self.manager.get_url_for_upload, 'foo_asset_id')88 def test_update_upload_status_raises(self):89 self.manager.client.put_object_tagging.side_effect = (90 boto_exc.ClientError(error_response={}, operation_name='foo'))91 self.assertRaises(exceptions.AssetError,...

Full Screen

Full Screen

boto3_tagging.py

Source:boto3_tagging.py Github

copy

Full Screen

...17 Bucket='s3-obj-tagging-test',18 Key='put-obj-test',19 Tagging='two=2&Project=X'20 )21# response = client.get_object_tagging(Bucket='s3-obj-tagging-test',22# Key='test-s3-file',23# VersionId='eVQkyDiNOUF.ipmMi7Tawtk5GQslpHtK')24# print (response)25# response = client.delete_object_tagging(Bucket='s3-obj-tagging-test',26# Key='test-s3-file',27# VersionId='jk.ArjftY0Htz7weCNXNhX34YmSzJo73')28# print response29# response = client.put_object_tagging(Bucket='s3-obj-tagging-test',30# Key='test-s3-file',31# Tagging={32# 'TagSet': [33# { 'Key': 'Owner1', 'Value': '1'}34# ]35# })36# print response37# response = client.get_object_tagging(Bucket='s3-obj-tagging-test',38# Key='test-s3-file')39# print (response)40# response = client.put_object_tagging(Bucket='s3-obj-tagging-test',41# Key='test',42# Tagging={43# 'TagSet': [44# { 'Key': 'first1', 'Value': '1'},45# { 'Key': 'second2', 'Value': '2'}46# ]47# })48# print response49#50# response = client.get_object_tagging(Bucket='s3-obj-tagging-test', Key='test')...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful