Best Python code snippet using localstack_python
test_s3_tagging_v.py
Source:test_s3_tagging_v.py
...23 bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)24 input_tag_set = self.create_simple_tag_set(2)25 response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)26 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)27 response = client.get_object_tagging(Bucket=bucket_name, Key=key)28 self.eq(response['TagSet'], input_tag_set['TagSet'])29 def test_get_obj_head_tagging(self, s3cfg_global_unique):30 """31 æµè¯-éªè¯head-objectéå«æ设置çtag32 """33 key = 'testputtags'34 client = get_client(s3cfg_global_unique)35 bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)36 count = 237 input_tag_set = self.create_simple_tag_set(count)38 response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)39 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)40 response = client.head_object(Bucket=bucket_name, Key=key)41 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)42 self.eq(response['ResponseMetadata']['HTTPHeaders']['x-amz-tagging-count'], str(count))43 def test_put_max_tags(self, s3cfg_global_unique):44 """45 æµè¯-éªè¯æ大å
许设置çtagsï¼10个ï¼46 """47 key = 'testputmaxtags'48 client = get_client(s3cfg_global_unique)49 bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)50 input_tag_set = self.create_simple_tag_set(10)51 response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)52 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)53 response = client.get_object_tagging(Bucket=bucket_name, Key=key)54 self.eq(response['TagSet'], input_tag_set['TagSet'])55 def test_put_excess_tags(self, s3cfg_global_unique):56 """57 æµè¯-éªè¯æ大å
许设置çtagsï¼11个ï¼ï¼ failed58 """59 key = 'testputmaxtags'60 client = get_client(s3cfg_global_unique)61 bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)62 input_tag_set = self.create_simple_tag_set(11)63 e = assert_raises(64 ClientError, client.put_object_tagging, Bucket=bucket_name, Key=key, Tagging=input_tag_set)65 status, error_code = self.get_status_and_error_code(e.response)66 self.eq(status, 400)67 self.eq(error_code, 'InvalidTag')68 response = client.get_object_tagging(Bucket=bucket_name, Key=key)69 self.eq(len(response['TagSet']), 0)70 def test_put_max_kvsize_tags(self, s3cfg_global_unique):71 """72 æµè¯-éªè¯è®¾ç½®tagæ¯keyåvalueçæ大å符æ°ï¼key为128ï¼value为25673 """74 key = 'testputmaxkeysize'75 client = get_client(s3cfg_global_unique)76 bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)77 tag_set = []78 for i in range(10):79 k = self.make_random_string(128)80 v = self.make_random_string(256)81 tag_set.append({'Key': k, 'Value': v})82 input_tag_set = {'TagSet': tag_set}83 response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)84 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)85 response = client.get_object_tagging(Bucket=bucket_name, Key=key)86 for kv_pair in response['TagSet']:87 self.eq((kv_pair in input_tag_set['TagSet']), True)88 def test_put_excess_key_tags(self, s3cfg_global_unique):89 """90 æµè¯-éªè¯è®¾ç½®tagæ¯keyåvalueçæ大å符æ°ï¼key为129ï¼value为256ï¼failed91 """92 key = 'testputexcesskeytags'93 client = get_client(s3cfg_global_unique)94 bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)95 tag_set = []96 for i in range(10):97 k = self.make_random_string(129)98 v = self.make_random_string(256)99 tag_set.append({'Key': k, 'Value': v})100 input_tag_set = {'TagSet': tag_set}101 e = assert_raises(ClientError, client.put_object_tagging, Bucket=bucket_name, Key=key, Tagging=input_tag_set)102 status, error_code = self.get_status_and_error_code(e.response)103 self.eq(status, 400)104 self.eq(error_code, 'InvalidTag')105 response = client.get_object_tagging(Bucket=bucket_name, Key=key)106 self.eq(len(response['TagSet']), 0)107 def test_put_excess_val_tags(self, s3cfg_global_unique):108 """109 æµè¯-éªè¯è®¾ç½®tagæ¯keyåvalueçæ大å符æ°ï¼key为128ï¼value为257ï¼failed110 """111 key = 'testputexcesskeytags'112 client = get_client(s3cfg_global_unique)113 bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)114 tag_set = []115 for i in range(10):116 k = self.make_random_string(128)117 v = self.make_random_string(257)118 tag_set.append({'Key': k, 'Value': v})119 input_tag_set = {'TagSet': tag_set}120 e = assert_raises(ClientError, client.put_object_tagging, Bucket=bucket_name, Key=key, Tagging=input_tag_set)121 status, error_code = self.get_status_and_error_code(e.response)122 self.eq(status, 400)123 self.eq(error_code, 'InvalidTag')124 response = client.get_object_tagging(Bucket=bucket_name, Key=key)125 self.eq(len(response['TagSet']), 0)126 def test_put_modify_tags(self, s3cfg_global_unique):127 """128 æµè¯-éªè¯ä¿®æ¹å·²åå¨çtags129 """130 key = 'testputmodifytags'131 client = get_client(s3cfg_global_unique)132 bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)133 tag_set = [{'Key': 'key', 'Value': 'val'}, {'Key': 'key2', 'Value': 'val2'}]134 input_tag_set = {'TagSet': tag_set}135 response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)136 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)137 response = client.get_object_tagging(Bucket=bucket_name, Key=key)138 self.eq(response['TagSet'], input_tag_set['TagSet'])139 tag_set2 = [{'Key': 'key3', 'Value': 'val3'}]140 input_tag_set2 = {'TagSet': tag_set2}141 response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set2)142 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)143 response = client.get_object_tagging(Bucket=bucket_name, Key=key)144 self.eq(response['TagSet'], input_tag_set2['TagSet'])145 def test_put_delete_tags(self, s3cfg_global_unique):146 """147 æµè¯-éªè¯å é¤tags148 """149 key = 'testputmodifytags'150 client = get_client(s3cfg_global_unique)151 bucket_name = self.create_key_with_random_content(s3cfg_global_unique, key)152 input_tag_set = self.create_simple_tag_set(2)153 response = client.put_object_tagging(Bucket=bucket_name, Key=key, Tagging=input_tag_set)154 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 200)155 response = client.get_object_tagging(Bucket=bucket_name, Key=key)156 self.eq(response['TagSet'], input_tag_set['TagSet'])157 response = client.delete_object_tagging(Bucket=bucket_name, Key=key)158 self.eq(response['ResponseMetadata']['HTTPStatusCode'], 204)159 response = client.get_object_tagging(Bucket=bucket_name, Key=key)160 self.eq(len(response['TagSet']), 0)161 def test_post_object_tags_anonymous_request(self, s3cfg_global_unique):162 """163 æµè¯-éªè¯è®¾ç½®å¯¹è±¡tagsï¼éè¿browser based via POST request164 """165 client = get_client(s3cfg_global_unique)166 bucket_name = self.get_new_bucket_name(s3cfg_global_unique)167 url = self.get_post_url(s3cfg_global_unique, bucket_name)168 client.create_bucket(ACL='public-read-write', Bucket=bucket_name)169 key_name = "foo.txt"170 input_tag_set = self.create_simple_tag_set(2)171 # xml_input_tag_set is the same as input_tag_set in xml.172 # There is not a simple way to change input_tag_set to xml like there is in the boto2 tetss173 xml_input_tag_set = "<Tagging><TagSet><Tag><Key>0</Key><Value>0</Value></Tag><Tag><Key>1</Key><Value>1</Value></Tag></TagSet></Tagging>"174 payload = OrderedDict([175 ("key", key_name),176 ("acl", "public-read"),177 ("Content-Type", "text/plain"),178 ("tagging", xml_input_tag_set),179 ('file', 'bar'),180 ])181 r = requests.post(url, files=payload, verify=s3cfg_global_unique.default_ssl_verify)182 self.eq(r.status_code, 204)183 response = client.get_object(Bucket=bucket_name, Key=key_name)184 body = self.get_body(response)185 self.eq(body, 'bar')186 response = client.get_object_tagging(Bucket=bucket_name, Key=key_name)187 self.eq(response['TagSet'], input_tag_set['TagSet'])188 def test_post_object_tags_authenticated_request(self, s3cfg_global_unique):189 """190 æµè¯-éªè¯191 (operation='authenticated browser based upload via POST request')192 (assertion='succeeds and returns written data')193 """194 client = get_client(s3cfg_global_unique)195 bucket_name = self.get_new_bucket(client, s3cfg_global_unique)196 url = self.get_post_url(s3cfg_global_unique, bucket_name)197 utc = pytz.utc198 expires = datetime.datetime.now(utc) + datetime.timedelta(seconds=+6000)199 policy_document = {"expiration": expires.strftime("%Y-%m-%dT%H:%M:%SZ"),200 "conditions": [201 {"bucket": bucket_name},202 ["starts-with", "$key", "foo"],203 {"acl": "private"},204 ["starts-with", "$Content-Type", "text/plain"],205 ["content-length-range", 0, 1024],206 ["starts-with", "$tagging", ""]207 ]}208 # xml_input_tag_set is the same as `input_tag_set = self.create_simple_tag_set(2)` in xml209 # There is not a simple way to change input_tag_set to xml like there is in the boto2 tetss210 xml_input_tag_set = "<Tagging><TagSet><Tag><Key>0</Key><Value>0</Value></Tag><Tag><Key>1</Key><Value>1</Value></Tag></TagSet></Tagging>"211 json_policy_document = json.JSONEncoder().encode(policy_document)212 bytes_json_policy_document = bytes(json_policy_document, 'utf-8')213 policy = base64.b64encode(bytes_json_policy_document)214 aws_secret_access_key = s3cfg_global_unique.main_secret_key215 aws_access_key_id = s3cfg_global_unique.main_access_key216 signature = base64.b64encode(hmac.new(bytes(aws_secret_access_key, 'utf-8'), policy, hashlib.sha1).digest())217 payload = OrderedDict([218 ("key", "foo.txt"),219 ("AWSAccessKeyId", aws_access_key_id),220 ("acl", "private"), ("signature", signature), ("policy", policy),221 ("tagging", xml_input_tag_set),222 ("Content-Type", "text/plain"),223 ('file', 'bar')])224 r = requests.post(url, files=payload, verify=s3cfg_global_unique.default_ssl_verify)225 self.eq(r.status_code, 204)226 response = client.get_object(Bucket=bucket_name, Key='foo.txt')227 body = self.get_body(response)228 self.eq(body, 'bar')229 def test_put_obj_with_tags(self, s3cfg_global_unique):230 """231 (operation='Test PutObj with tagging headers')232 (assertion='success')233 """234 client = get_client(s3cfg_global_unique)235 bucket_name = self.get_new_bucket(client, s3cfg_global_unique)236 key = 'testtagobj1'237 data = 'A' * 100238 tag_set = [{'Key': 'bar', 'Value': ''}, {'Key': 'foo', 'Value': 'bar'}]239 put_obj_tag_headers = {240 'x-amz-tagging': 'foo=bar&bar'241 }242 lf = (lambda **kwargs: kwargs['params']['headers'].update(put_obj_tag_headers))243 client.meta.events.register('before-call.s3.PutObject', lf)244 client.put_object(Bucket=bucket_name, Key=key, Body=data)245 response = client.get_object(Bucket=bucket_name, Key=key)246 body = self.get_body(response)247 self.eq(body, data)248 response = client.get_object_tagging(Bucket=bucket_name, Key=key)249 response_tag_set = response['TagSet']250 tag_set = tag_set251 self.eq(response_tag_set, tag_set)252class TestBucketTagging(TestTaggingBase):253 @pytest.mark.ess254 @pytest.mark.fails_on_ess255 @pytest.mark.xfail(reason="é¢æï¼å½æ²¡è®¾ç½®æ¡¶æ ç¾çæ¶åï¼è·åæ ç¾è¿åNoSuchTagSetError", run=True, strict=True)256 def test_set_bucket_tagging(self, s3cfg_global_unique):257 """258 æµè¯-éªè¯è®¾ç½®åå¨æ¡¶çtags259 """260 client = get_client(s3cfg_global_unique)261 bucket_name = self.get_new_bucket(client, s3cfg_global_unique)262 e = assert_raises(ClientError, client.get_bucket_tagging, Bucket=bucket_name) # won't raise ClientError...
test_s3.py
Source:test_s3.py
1# Licensed under the Apache License, Version 2.0 (the "License"); you may2# not use this file except in compliance with the License. You may obtain3# a copy of the License at4#5# http://www.apache.org/licenses/LICENSE-2.06#7# Unless required by applicable law or agreed to in writing, software8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the10# License for the specific language governing permissions and limitations11# under the License.12from botocore import exceptions as boto_exc13import mock14import testtools15from s3uploader import exceptions16from s3uploader import s317class TestAssetManager(testtools.TestCase):18 def setUp(self):19 super(TestAssetManager, self).setUp()20 self.storage_mock = mock.Mock()21 self.manager = s3.AssetManager(storage_manager=self.storage_mock)22 self.addCleanup(mock.patch.stopall)23 def test_create_asset(self):24 self.storage_mock.get_url_for_upload.return_value = 'foo_url'25 asset = self.manager.create_asset()26 self.assertIsNotNone(asset)27 self.assertEqual('foo_url', asset.url)28 def test_update_asset(self):29 self.manager.update_asset('foo_asset_id')30 self.storage_mock.update_upload_status.assert_called_once_with(31 'foo_asset_id')32 def test_get_asset(self):33 self.storage_mock.get_url_for_download.return_value = 'foo_url'34 asset = self.manager.get_asset('foo_asset_id', 50)35 self.assertIsNotNone(asset)36 self.assertEqual('foo_url', asset.url)37class TestS3Manager(testtools.TestCase):38 def setUp(self):39 super(TestS3Manager, self).setUp()40 self.manager = s3.S3Manager(config=mock.Mock(), client=mock.Mock())41 self.addCleanup(mock.patch.stopall)42 def test_get_url_for_upload(self):43 self.manager.get_url_for_upload('foo_asset_id')44 self.manager.client.generate_presigned_post.assert_called_once()45 def test_update_upload_status(self):46 self.manager.update_upload_status('foo_asset_id')47 self.manager.client.put_object_tagging.assert_called_once()48 def test_update_upload_status_raise_not_found(self):49 exc = boto_exc.ClientError(50 error_response={}, operation_name='foo')51 exc.response['Error'] = {'Code': 'NoSuchKey'}52 self.manager.client.put_object_tagging.side_effect = exc53 self.assertRaises(54 exceptions.AssetNotFoundError,55 self.manager.update_upload_status, 'foo_asset_id')56 def test_get_url_for_download_ready_for_download(self):57 self.manager.client.get_object_tagging.return_value = (58 {'TagSet': [{'Key': 'Status', 'Value': 'Uploaded'}]})59 self.manager.get_url_for_download('foo_asset_id', 50)60 self.manager.client.get_object_tagging.assert_called_once()61 self.manager.client.generate_presigned_url.assert_called_once()62 def test_get_url_for_download_not_ready_for_download(self):63 self.manager.client.get_object_tagging.return_value = None64 self.manager.get_url_for_download('foo_asset_id', 50)65 self.manager.client.get_object_tagging.assert_called_once()66 self.assertFalse(self.manager.client.generate_presigned_url.call_count)67 def test_get_url_for_download_not_found(self):68 exc = boto_exc.ClientError(69 error_response={}, operation_name='foo')70 exc.response['Error'] = {'Code': 'NoSuchKey'}71 self.manager.client.get_object_tagging.side_effect = exc72 self.assertRaises(73 exceptions.AssetNotFoundError,74 self.manager.get_url_for_download, 'foo_asset_id', 50)75 def test_get_url_for_download_unknown_error(self):76 exc = boto_exc.ClientError(77 error_response={}, operation_name='foo')78 exc.response['Error'] = {'Code': 'UncaughtError'}79 self.manager.client.get_object_tagging.side_effect = exc80 self.assertRaises(81 exceptions.AssetError,82 self.manager.get_url_for_download, 'foo_asset_id', 50)83 def test_get_url_for_upload_raises(self):84 self.manager.client.generate_presigned_post.side_effect = (85 boto_exc.BotoCoreError)86 self.assertRaises(exceptions.AssetError,87 self.manager.get_url_for_upload, 'foo_asset_id')88 def test_update_upload_status_raises(self):89 self.manager.client.put_object_tagging.side_effect = (90 boto_exc.ClientError(error_response={}, operation_name='foo'))91 self.assertRaises(exceptions.AssetError,...
boto3_tagging.py
Source:boto3_tagging.py
...17 Bucket='s3-obj-tagging-test',18 Key='put-obj-test',19 Tagging='two=2&Project=X'20 )21# response = client.get_object_tagging(Bucket='s3-obj-tagging-test',22# Key='test-s3-file',23# VersionId='eVQkyDiNOUF.ipmMi7Tawtk5GQslpHtK')24# print (response)25# response = client.delete_object_tagging(Bucket='s3-obj-tagging-test',26# Key='test-s3-file',27# VersionId='jk.ArjftY0Htz7weCNXNhX34YmSzJo73')28# print response29# response = client.put_object_tagging(Bucket='s3-obj-tagging-test',30# Key='test-s3-file',31# Tagging={32# 'TagSet': [33# { 'Key': 'Owner1', 'Value': '1'}34# ]35# })36# print response37# response = client.get_object_tagging(Bucket='s3-obj-tagging-test',38# Key='test-s3-file')39# print (response)40# response = client.put_object_tagging(Bucket='s3-obj-tagging-test',41# Key='test',42# Tagging={43# 'TagSet': [44# { 'Key': 'first1', 'Value': '1'},45# { 'Key': 'second2', 'Value': '2'}46# ]47# })48# print response49#50# response = client.get_object_tagging(Bucket='s3-obj-tagging-test', Key='test')...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!