Best Python code snippet using localstack_python
test_s3.py
Source:test_s3.py
1import base642import datetime3import gzip4import hashlib5import io6import json7import logging8import os9import re10import shutil11import tempfile12import time13from io import BytesIO14from operator import itemgetter15from typing import TYPE_CHECKING16from urllib.parse import parse_qs, quote, urlparse17import boto3 as boto318import pytest19import requests20import xmltodict21from boto3.s3.transfer import KB, TransferConfig22from botocore import UNSIGNED23from botocore.client import Config24from botocore.exceptions import ClientError25from pytz import timezone26from localstack import config, constants27from localstack.constants import (28 S3_VIRTUAL_HOSTNAME,29 TEST_AWS_ACCESS_KEY_ID,30 TEST_AWS_SECRET_ACCESS_KEY,31)32from localstack.services.awslambda.lambda_utils import (33 LAMBDA_RUNTIME_NODEJS14X,34 LAMBDA_RUNTIME_PYTHON39,35)36from localstack.testing.aws.util import is_aws_cloud37from localstack.testing.pytest.fixtures import _client38from localstack.utils import testutil39from localstack.utils.aws import aws_stack40from localstack.utils.collections import is_sub_dict41from localstack.utils.files import load_file42from localstack.utils.run import run43from localstack.utils.server import http2_server44from localstack.utils.strings import (45 checksum_crc32,46 checksum_crc32c,47 hash_sha1,48 hash_sha256,49 short_uid,50 to_bytes,51 to_str,52)53from localstack.utils.sync import retry54from localstack.utils.testutil import check_expected_lambda_log_events_length55if TYPE_CHECKING:56 from mypy_boto3_s3 import S3Client57LOG = logging.getLogger(__name__)58@pytest.fixture(scope="class")59def s3_client_for_region():60 def _s3_client(61 region_name: str = None,62 ):63 return _client("s3", region_name=region_name)64 return _s3_client65@pytest.fixture66def s3_create_bucket_with_client(s3_resource):67 buckets = []68 def factory(s3_client, **kwargs) -> str:69 if "Bucket" not in kwargs:70 kwargs["Bucket"] = f"test-bucket-{short_uid()}"71 response = s3_client.create_bucket(**kwargs)72 buckets.append(kwargs["Bucket"])73 return response74 yield factory75 # cleanup76 for bucket in buckets:77 try:78 bucket = s3_resource.Bucket(bucket)79 bucket.objects.all().delete()80 bucket.object_versions.all().delete()81 bucket.delete()82 except Exception as e:83 LOG.debug(f"error cleaning up bucket {bucket}: {e}")84@pytest.fixture85def s3_multipart_upload(s3_client):86 def perform_multipart_upload(bucket, key, data=None, zipped=False, acl=None):87 kwargs = {"ACL": acl} if acl else {}88 multipart_upload_dict = s3_client.create_multipart_upload(Bucket=bucket, Key=key, **kwargs)89 upload_id = multipart_upload_dict["UploadId"]90 # Write contents to memory rather than a file.91 data = data or (5 * short_uid())92 data = to_bytes(data)93 upload_file_object = BytesIO(data)94 if zipped:95 upload_file_object = BytesIO()96 with gzip.GzipFile(fileobj=upload_file_object, mode="w") as filestream:97 filestream.write(data)98 response = s3_client.upload_part(99 Bucket=bucket,100 Key=key,101 Body=upload_file_object,102 PartNumber=1,103 UploadId=upload_id,104 )105 multipart_upload_parts = [{"ETag": response["ETag"], "PartNumber": 1}]106 return s3_client.complete_multipart_upload(107 Bucket=bucket,108 Key=key,109 MultipartUpload={"Parts": multipart_upload_parts},110 UploadId=upload_id,111 )112 return perform_multipart_upload113@pytest.fixture114def create_tmp_folder_lambda():115 cleanup_folders = []116 def prepare_folder(path_to_lambda, run_command=None):117 tmp_dir = tempfile.mkdtemp()118 shutil.copy(path_to_lambda, tmp_dir)119 if run_command:120 run(f"cd {tmp_dir}; {run_command}")121 cleanup_folders.append(tmp_dir)122 return tmp_dir123 yield prepare_folder124 for folder in cleanup_folders:125 try:126 shutil.rmtree(folder)127 except Exception:128 LOG.warning(f"could not delete folder {folder}")129class TestS3:130 @pytest.mark.aws_validated131 @pytest.mark.skip_snapshot_verify(paths=["$..EncodingType"])132 def test_region_header_exists(self, s3_client, s3_create_bucket, snapshot):133 snapshot.add_transformer(snapshot.transform.s3_api())134 bucket_name = s3_create_bucket(135 CreateBucketConfiguration={"LocationConstraint": "eu-west-1"},136 )137 response = s3_client.head_bucket(Bucket=bucket_name)138 assert response["ResponseMetadata"]["HTTPHeaders"]["x-amz-bucket-region"] == "eu-west-1"139 snapshot.match("head_bucket", response)140 response = s3_client.list_objects_v2(Bucket=bucket_name)141 assert response["ResponseMetadata"]["HTTPHeaders"]["x-amz-bucket-region"] == "eu-west-1"142 snapshot.match("list_objects_v2", response)143 @pytest.mark.aws_validated144 # TODO list-buckets contains other buckets when running in CI145 @pytest.mark.skip_snapshot_verify(146 paths=["$..Marker", "$..Prefix", "$..EncodingType", "$..list-buckets.Buckets"]147 )148 def test_delete_bucket_with_content(self, s3_client, s3_resource, s3_bucket, snapshot):149 snapshot.add_transformer(snapshot.transform.s3_api())150 bucket_name = s3_bucket151 for i in range(0, 10, 1):152 body = "test-" + str(i)153 key = "test-key-" + str(i)154 s3_client.put_object(Bucket=bucket_name, Key=key, Body=body)155 resp = s3_client.list_objects(Bucket=bucket_name, MaxKeys=100)156 snapshot.match("list-objects", resp)157 assert 10 == len(resp["Contents"])158 bucket = s3_resource.Bucket(bucket_name)159 bucket.objects.all().delete()160 bucket.delete()161 resp = s3_client.list_buckets()162 # TODO - this fails in the CI pipeline and is currently skipped from verification163 snapshot.match("list-buckets", resp)164 assert bucket_name not in [b["Name"] for b in resp["Buckets"]]165 @pytest.mark.aws_validated166 @pytest.mark.skip_snapshot_verify(paths=["$..VersionId", "$..ContentLanguage"])167 def test_put_and_get_object_with_utf8_key(self, s3_client, s3_bucket, snapshot):168 snapshot.add_transformer(snapshot.transform.s3_api())169 response = s3_client.put_object(Bucket=s3_bucket, Key="Ä0Ã", Body=b"abc123")170 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200171 snapshot.match("put-object", response)172 response = s3_client.get_object(Bucket=s3_bucket, Key="Ä0Ã")173 snapshot.match("get-object", response)174 assert response["Body"].read() == b"abc123"175 @pytest.mark.aws_validated176 def test_resource_object_with_slashes_in_key(self, s3_resource, s3_bucket):177 s3_resource.Object(s3_bucket, "/foo").put(Body="foobar")178 s3_resource.Object(s3_bucket, "bar").put(Body="barfoo")179 with pytest.raises(ClientError) as e:180 s3_resource.Object(s3_bucket, "foo").get()181 e.match("NoSuchKey")182 with pytest.raises(ClientError) as e:183 s3_resource.Object(s3_bucket, "/bar").get()184 e.match("NoSuchKey")185 response = s3_resource.Object(s3_bucket, "/foo").get()186 assert response["Body"].read() == b"foobar"187 response = s3_resource.Object(s3_bucket, "bar").get()188 assert response["Body"].read() == b"barfoo"189 @pytest.mark.aws_validated190 def test_metadata_header_character_decoding(self, s3_client, s3_bucket, snapshot):191 snapshot.add_transformer(snapshot.transform.s3_api())192 # Object metadata keys should accept keys with underscores193 # https://github.com/localstack/localstack/issues/1790194 # put object195 object_key = "key-with-metadata"196 metadata = {"TEST_META_1": "foo", "__meta_2": "bar"}197 s3_client.put_object(Bucket=s3_bucket, Key=object_key, Metadata=metadata, Body="foo")198 metadata_saved = s3_client.head_object(Bucket=s3_bucket, Key=object_key)["Metadata"]199 snapshot.match("head-object", metadata_saved)200 # note that casing is removed (since headers are case-insensitive)201 assert metadata_saved == {"test_meta_1": "foo", "__meta_2": "bar"}202 @pytest.mark.aws_validated203 @pytest.mark.skip_snapshot_verify(paths=["$..VersionId", "$..ContentLanguage"])204 def test_upload_file_multipart(self, s3_client, s3_bucket, tmpdir, snapshot):205 snapshot.add_transformer(snapshot.transform.s3_api())206 key = "my-key"207 # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3.html#multipart-transfers208 tranfer_config = TransferConfig(multipart_threshold=5 * KB, multipart_chunksize=1 * KB)209 file = tmpdir / "test-file.bin"210 data = b"1" * (6 * KB) # create 6 kilobytes of ones211 file.write(data=data, mode="w")212 s3_client.upload_file(213 Bucket=s3_bucket, Key=key, Filename=str(file.realpath()), Config=tranfer_config214 )215 obj = s3_client.get_object(Bucket=s3_bucket, Key=key)216 assert obj["Body"].read() == data, f"body did not contain expected data {obj}"217 snapshot.match("get_object", obj)218 @pytest.mark.aws_validated219 @pytest.mark.parametrize("delimiter", ["/", "%2F"])220 def test_list_objects_with_prefix(self, s3_client, s3_create_bucket, delimiter):221 bucket_name = s3_create_bucket()222 key = "test/foo/bar/123"223 s3_client.put_object(Bucket=bucket_name, Key=key, Body=b"content 123")224 response = s3_client.list_objects(225 Bucket=bucket_name, Prefix="test/", Delimiter=delimiter, MaxKeys=1, EncodingType="url"226 )227 sub_dict = {228 "Delimiter": delimiter,229 "EncodingType": "url",230 "IsTruncated": False,231 "Marker": "",232 "MaxKeys": 1,233 "Name": bucket_name,234 "Prefix": "test/",235 }236 if delimiter == "/":237 # if delimiter is "/", then common prefixes are returned238 sub_dict["CommonPrefixes"] = [{"Prefix": "test/foo/"}]239 else:240 # if delimiter is "%2F" (or other non-contained character), then the actual keys are returned in Contents241 assert len(response["Contents"]) == 1242 assert response["Contents"][0]["Key"] == key243 sub_dict["Delimiter"] = "%252F"244 assert is_sub_dict(sub_dict, response)245 @pytest.mark.aws_validated246 @pytest.mark.skip_snapshot_verify(path="$..Error.BucketName")247 def test_get_object_no_such_bucket(self, s3_client, snapshot):248 with pytest.raises(ClientError) as e:249 s3_client.get_object(Bucket=f"does-not-exist-{short_uid()}", Key="foobar")250 snapshot.match("expected_error", e.value.response)251 @pytest.mark.aws_validated252 @pytest.mark.skip_snapshot_verify(path="$..RequestID")253 def test_delete_bucket_no_such_bucket(self, s3_client, snapshot):254 with pytest.raises(ClientError) as e:255 s3_client.delete_bucket(Bucket=f"does-not-exist-{short_uid()}")256 snapshot.match("expected_error", e.value.response)257 @pytest.mark.aws_validated258 @pytest.mark.skip_snapshot_verify(path="$..Error.BucketName")259 def test_get_bucket_notification_configuration_no_such_bucket(self, s3_client, snapshot):260 with pytest.raises(ClientError) as e:261 s3_client.get_bucket_notification_configuration(Bucket=f"doesnotexist-{short_uid()}")262 snapshot.match("expected_error", e.value.response)263 @pytest.mark.aws_validated264 @pytest.mark.xfail(265 reason="currently not implemented in moto, see https://github.com/localstack/localstack/issues/6217"266 )267 # TODO: see also XML issue in https://github.com/localstack/localstack/issues/6422268 def test_get_object_attributes(self, s3_client, s3_bucket, snapshot):269 s3_client.put_object(Bucket=s3_bucket, Key="data.txt", Body=b"69\n420\n")270 response = s3_client.get_object_attributes(271 Bucket=s3_bucket,272 Key="data.txt",273 ObjectAttributes=["StorageClass", "ETag", "ObjectSize"],274 )275 snapshot.match("object-attrs", response)276 @pytest.mark.aws_validated277 @pytest.mark.skip_snapshot_verify(paths=["$..VersionId", "$..ContentLanguage"])278 def test_put_and_get_object_with_hash_prefix(self, s3_client, s3_bucket, snapshot):279 snapshot.add_transformer(snapshot.transform.s3_api())280 key_name = "#key-with-hash-prefix"281 content = b"test 123"282 response = s3_client.put_object(Bucket=s3_bucket, Key=key_name, Body=content)283 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200284 snapshot.match("put-object", response)285 response = s3_client.get_object(Bucket=s3_bucket, Key=key_name)286 snapshot.match("get-object", response)287 assert response["Body"].read() == content288 @pytest.mark.aws_validated289 @pytest.mark.xfail(reason="error message is different in current implementation")290 def test_invalid_range_error(self, s3_client, s3_bucket):291 key = "my-key"292 s3_client.put_object(Bucket=s3_bucket, Key=key, Body=b"abcdefgh")293 with pytest.raises(ClientError) as e:294 s3_client.get_object(Bucket=s3_bucket, Key=key, Range="bytes=1024-4096")295 e.match("InvalidRange")296 e.match("The requested range is not satisfiable")297 @pytest.mark.aws_validated298 def test_range_key_not_exists(self, s3_client, s3_bucket):299 key = "my-key"300 with pytest.raises(ClientError) as e:301 s3_client.get_object(Bucket=s3_bucket, Key=key, Range="bytes=1024-4096")302 e.match("NoSuchKey")303 e.match("The specified key does not exist.")304 @pytest.mark.aws_validated305 def test_create_bucket_via_host_name(self, s3_vhost_client):306 # TODO check redirection (happens in AWS because of region name), should it happen in LS?307 # https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#VirtualHostingBackwardsCompatibility308 bucket_name = f"test-{short_uid()}"309 try:310 response = s3_vhost_client.create_bucket(311 Bucket=bucket_name,312 CreateBucketConfiguration={"LocationConstraint": "eu-central-1"},313 )314 assert "Location" in response315 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200316 response = s3_vhost_client.get_bucket_location(Bucket=bucket_name)317 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200318 assert response["LocationConstraint"] == "eu-central-1"319 finally:320 s3_vhost_client.delete_bucket(Bucket=bucket_name)321 @pytest.mark.aws_validated322 def test_put_and_get_bucket_policy(self, s3_client, s3_bucket):323 # put bucket policy324 policy = {325 "Version": "2012-10-17",326 "Statement": [327 {328 "Action": "s3:GetObject",329 "Effect": "Allow",330 "Resource": f"arn:aws:s3:::{s3_bucket}/*",331 "Principal": {"AWS": "*"},332 }333 ],334 }335 response = s3_client.put_bucket_policy(Bucket=s3_bucket, Policy=json.dumps(policy))336 assert response["ResponseMetadata"]["HTTPStatusCode"] == 204337 # retrieve and check policy config338 saved_policy = s3_client.get_bucket_policy(Bucket=s3_bucket)["Policy"]339 assert policy == json.loads(saved_policy)340 @pytest.mark.aws_validated341 @pytest.mark.xfail(reason="see https://github.com/localstack/localstack/issues/5769")342 def test_put_object_tagging_empty_list(self, s3_client, s3_bucket, snapshot):343 key = "my-key"344 s3_client.put_object(Bucket=s3_bucket, Key=key, Body=b"abcdefgh")345 object_tags = s3_client.get_object_tagging(Bucket=s3_bucket, Key=key)346 snapshot.match("created-object-tags", object_tags)347 tag_set = {"TagSet": [{"Key": "tag1", "Value": "tag1"}]}348 s3_client.put_object_tagging(Bucket=s3_bucket, Key=key, Tagging=tag_set)349 object_tags = s3_client.get_object_tagging(Bucket=s3_bucket, Key=key)350 snapshot.match("updated-object-tags", object_tags)351 s3_client.put_object_tagging(Bucket=s3_bucket, Key=key, Tagging={"TagSet": []})352 object_tags = s3_client.get_object_tagging(Bucket=s3_bucket, Key=key)353 snapshot.match("deleted-object-tags", object_tags)354 @pytest.mark.aws_validated355 @pytest.mark.xfail(reason="see https://github.com/localstack/localstack/issues/6218")356 def test_head_object_fields(self, s3_client, s3_bucket, snapshot):357 key = "my-key"358 s3_client.put_object(Bucket=s3_bucket, Key=key, Body=b"abcdefgh")359 response = s3_client.head_object(Bucket=s3_bucket, Key=key)360 # missing AcceptRanges field361 # see https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html362 # https://stackoverflow.com/questions/58541696/s3-not-returning-accept-ranges-header363 # https://www.keycdn.com/support/frequently-asked-questions#is-byte-range-not-working-in-combination-with-s3364 snapshot.match("head-object", response)365 @pytest.mark.aws_validated366 @pytest.mark.xfail(reason="see https://github.com/localstack/localstack/issues/6553")367 def test_get_object_after_deleted_in_versioned_bucket(368 self, s3_client, s3_bucket, s3_resource, snapshot369 ):370 bucket = s3_resource.Bucket(s3_bucket)371 bucket.Versioning().enable()372 key = "my-key"373 s3_client.put_object(Bucket=s3_bucket, Key=key, Body=b"abcdefgh")374 s3_obj = s3_client.get_object(Bucket=s3_bucket, Key=key)375 snapshot.match("get-object", s3_obj)376 s3_client.delete_object(Bucket=s3_bucket, Key=key)377 with pytest.raises(ClientError) as e:378 s3_client.get_object(Bucket=s3_bucket, Key=key)379 snapshot.match("get-object-after-delete", e.value.response)380 @pytest.mark.aws_validated381 @pytest.mark.parametrize("algorithm", ["CRC32", "CRC32C", "SHA1", "SHA256"])382 def test_put_object_checksum(self, s3_client, s3_create_bucket, algorithm):383 bucket = s3_create_bucket()384 key = f"file-{short_uid()}"385 data = b"test data.."386 params = {387 "Bucket": bucket,388 "Key": key,389 "Body": data,390 "ChecksumAlgorithm": algorithm,391 f"Checksum{algorithm}": short_uid(),392 }393 with pytest.raises(ClientError) as e:394 s3_client.put_object(**params)395 error = e.value.response["Error"]396 assert error["Code"] == "InvalidRequest"397 checksum_header = f"x-amz-checksum-{algorithm.lower()}"398 assert error["Message"] == f"Value for {checksum_header} header is invalid."399 # Test our generated checksums400 match algorithm:401 case "CRC32":402 checksum = checksum_crc32(data)403 case "CRC32C":404 checksum = checksum_crc32c(data)405 case "SHA1":406 checksum = hash_sha1(data)407 case "SHA256":408 checksum = hash_sha256(data)409 case _:410 checksum = ""411 params.update({f"Checksum{algorithm}": checksum})412 response = s3_client.put_object(**params)413 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200414 # Test the autogenerated checksums415 params.pop(f"Checksum{algorithm}")416 response = s3_client.put_object(**params)417 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200418 @pytest.mark.aws_validated419 @pytest.mark.skip_snapshot_verify(paths=["$..AcceptRanges"])420 def test_s3_copy_metadata_replace(self, s3_client, s3_create_bucket, snapshot):421 snapshot.add_transformer(snapshot.transform.s3_api())422 object_key = "source-object"423 bucket_name = s3_create_bucket()424 resp = s3_client.put_object(425 Bucket=bucket_name,426 Key=object_key,427 Body='{"key": "value"}',428 ContentType="application/json",429 Metadata={"key": "value"},430 )431 snapshot.match("put_object", resp)432 head_object = s3_client.head_object(Bucket=bucket_name, Key=object_key)433 snapshot.match("head_object", head_object)434 object_key_copy = f"{object_key}-copy"435 resp = s3_client.copy_object(436 Bucket=bucket_name,437 CopySource=f"{bucket_name}/{object_key}",438 Key=object_key_copy,439 Metadata={"another-key": "value"},440 ContentType="application/javascript",441 MetadataDirective="REPLACE",442 )443 snapshot.match("copy_object", resp)444 head_object = s3_client.head_object(Bucket=bucket_name, Key=object_key_copy)445 snapshot.match("head_object_copy", head_object)446 @pytest.mark.aws_validated447 @pytest.mark.skip_snapshot_verify(paths=["$..AcceptRanges"])448 def test_s3_copy_content_type_and_metadata(self, s3_client, s3_create_bucket, snapshot):449 snapshot.add_transformer(snapshot.transform.s3_api())450 object_key = "source-object"451 bucket_name = s3_create_bucket()452 resp = s3_client.put_object(453 Bucket=bucket_name,454 Key=object_key,455 Body='{"key": "value"}',456 ContentType="application/json",457 Metadata={"key": "value"},458 )459 snapshot.match("put_object", resp)460 head_object = s3_client.head_object(Bucket=bucket_name, Key=object_key)461 snapshot.match("head_object", head_object)462 object_key_copy = f"{object_key}-copy"463 resp = s3_client.copy_object(464 Bucket=bucket_name, CopySource=f"{bucket_name}/{object_key}", Key=object_key_copy465 )466 snapshot.match("copy_object", resp)467 head_object = s3_client.head_object(Bucket=bucket_name, Key=object_key_copy)468 snapshot.match("head_object_copy", head_object)469 s3_client.delete_objects(Bucket=bucket_name, Delete={"Objects": [{"Key": object_key_copy}]})470 # does not set MetadataDirective=REPLACE, so the original metadata should be kept471 object_key_copy = f"{object_key}-second-copy"472 resp = s3_client.copy_object(473 Bucket=bucket_name,474 CopySource=f"{bucket_name}/{object_key}",475 Key=object_key_copy,476 Metadata={"another-key": "value"},477 ContentType="application/javascript",478 )479 snapshot.match("copy_object_second", resp)480 head_object = s3_client.head_object(Bucket=bucket_name, Key=object_key_copy)481 snapshot.match("head_object_second_copy", head_object)482 @pytest.mark.aws_validated483 @pytest.mark.xfail(484 reason="wrong behaviour, see https://docs.aws.amazon.com/AmazonS3/latest/userguide/managing-acls.html"485 )486 def test_s3_multipart_upload_acls(487 self, s3_client, s3_create_bucket, s3_multipart_upload, snapshot488 ):489 # The basis for this test is wrong - see:490 # https://docs.aws.amazon.com/AmazonS3/latest/userguide/managing-acls.html491 # > Bucket and object permissions are independent of each other. An object does not inherit the permissions492 # > from its bucket. For example, if you create a bucket and grant write access to a user, you can't access493 # > that userâs objects unless the user explicitly grants you access.494 snapshot.add_transformer(495 [496 snapshot.transform.key_value("DisplayName"),497 snapshot.transform.key_value("ID", value_replacement="owner-id"),498 ]499 )500 bucket_name = f"test-bucket-{short_uid()}"501 s3_create_bucket(Bucket=bucket_name, ACL="public-read")502 response = s3_client.get_bucket_acl(Bucket=bucket_name)503 snapshot.match("bucket-acl", response)504 def check_permissions(key):505 acl_response = s3_client.get_object_acl(Bucket=bucket_name, Key=key)506 snapshot.match(f"permission-{key}", acl_response)507 # perform uploads (multipart and regular) and check ACLs508 s3_client.put_object(Bucket=bucket_name, Key="acl-key0", Body="something")509 check_permissions("acl-key0")510 s3_multipart_upload(bucket=bucket_name, key="acl-key1")511 check_permissions("acl-key1")512 s3_multipart_upload(bucket=bucket_name, key="acl-key2", acl="public-read-write")513 check_permissions("acl-key2")514 @pytest.mark.only_localstack515 @pytest.mark.parametrize("case_sensitive_headers", [True, False])516 def test_s3_get_response_case_sensitive_headers(517 self, s3_client, s3_bucket, case_sensitive_headers518 ):519 # Test that RETURN_CASE_SENSITIVE_HEADERS is respected520 object_key = "key-by-hostname"521 s3_client.put_object(Bucket=s3_bucket, Key=object_key, Body="something")522 # get object and assert headers523 case_sensitive_before = http2_server.RETURN_CASE_SENSITIVE_HEADERS524 try:525 url = s3_client.generate_presigned_url(526 "get_object", Params={"Bucket": s3_bucket, "Key": object_key}527 )528 http2_server.RETURN_CASE_SENSITIVE_HEADERS = case_sensitive_headers529 response = requests.get(url, verify=False)530 # expect that Etag is contained531 header_names = list(response.headers.keys())532 expected_etag = "ETag" if case_sensitive_headers else "etag"533 assert expected_etag in header_names534 finally:535 http2_server.RETURN_CASE_SENSITIVE_HEADERS = case_sensitive_before536 @pytest.mark.aws_validated537 @pytest.mark.skip_snapshot_verify(538 paths=[539 "$..AcceptRanges",540 "$..ContentLanguage",541 "$..VersionId",542 "$..Restore",543 ]544 )545 def test_s3_object_expiry(self, s3_client, s3_bucket, snapshot):546 # AWS only cleans up S3 expired object once a day usually547 # the object stays accessible for quite a while after being expired548 # https://stackoverflow.com/questions/38851456/aws-s3-object-expiration-less-than-24-hours549 # handle s3 object expiry550 # https://github.com/localstack/localstack/issues/1685551 # TODO: should we have a config var to not deleted immediately in the new provider? and schedule it?552 snapshot.add_transformer(snapshot.transform.s3_api())553 # put object554 short_expire = datetime.datetime.now(timezone("GMT")) + datetime.timedelta(seconds=1)555 object_key_expired = "key-object-expired"556 object_key_not_expired = "key-object-not-expired"557 s3_client.put_object(558 Bucket=s3_bucket,559 Key=object_key_expired,560 Body="foo",561 Expires=short_expire,562 )563 # sleep so it expires564 time.sleep(3)565 # head_object does not raise an error for now in LS566 response = s3_client.head_object(Bucket=s3_bucket, Key=object_key_expired)567 assert response["Expires"] < datetime.datetime.now(timezone("GMT"))568 snapshot.match("head-object-expired", response)569 # try to fetch an object which is already expired570 if not is_aws_cloud(): # fixme for now behaviour differs, have a look at it and discuss571 with pytest.raises(Exception) as e: # this does not raise in AWS572 s3_client.get_object(Bucket=s3_bucket, Key=object_key_expired)573 e.match("NoSuchKey")574 s3_client.put_object(575 Bucket=s3_bucket,576 Key=object_key_not_expired,577 Body="foo",578 Expires=datetime.datetime.now(timezone("GMT")) + datetime.timedelta(hours=1),579 )580 # try to fetch has not been expired yet.581 resp = s3_client.get_object(Bucket=s3_bucket, Key=object_key_not_expired)582 assert "Expires" in resp583 assert resp["Expires"] > datetime.datetime.now(timezone("GMT"))584 snapshot.match("get-object-not-yet-expired", resp)585 @pytest.mark.aws_validated586 @pytest.mark.skip_snapshot_verify(587 paths=[588 "$..ContentLanguage",589 "$..VersionId",590 ]591 )592 def test_upload_file_with_xml_preamble(self, s3_client, s3_create_bucket, snapshot):593 snapshot.add_transformer(snapshot.transform.s3_api())594 bucket_name = f"bucket-{short_uid()}"595 object_key = f"key-{short_uid()}"596 body = '<?xml version="1.0" encoding="UTF-8"?><test/>'597 s3_create_bucket(Bucket=bucket_name)598 s3_client.put_object(Bucket=bucket_name, Key=object_key, Body=body)599 response = s3_client.get_object(Bucket=bucket_name, Key=object_key)600 snapshot.match("get_object", response)601 @pytest.mark.aws_validated602 @pytest.mark.xfail(reason="The error format is wrong in s3_listener (is_bucket_available)")603 def test_bucket_availability(self, s3_client, snapshot):604 bucket_name = "test-bucket-lifecycle"605 with pytest.raises(ClientError) as e:606 s3_client.get_bucket_lifecycle(Bucket=bucket_name)607 snapshot.match("bucket-lifecycle", e.value.response)608 with pytest.raises(ClientError) as e:609 s3_client.get_bucket_replication(Bucket=bucket_name)610 snapshot.match("bucket-replication", e.value.response)611 @pytest.mark.aws_validated612 def test_location_path_url(self, s3_client, s3_create_bucket, account_id):613 region = "us-east-2"614 bucket_name = s3_create_bucket(615 CreateBucketConfiguration={"LocationConstraint": region}, ACL="public-read"616 )617 response = s3_client.get_bucket_location(Bucket=bucket_name)618 assert region == response["LocationConstraint"]619 url = _bucket_url(bucket_name, region)620 # https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLocation.html621 # make raw request, assert that newline is contained after XML preamble: <?xml ...>\n622 response = requests.get(f"{url}?location?x-amz-expected-bucket-owner={account_id}")623 assert response.ok624 content = to_str(response.content)625 assert re.match(r"^<\?xml [^>]+>\n<.*", content, flags=re.MULTILINE)626 @pytest.mark.aws_validated627 @pytest.mark.skip_snapshot_verify(paths=["$..Error.RequestID"])628 def test_different_location_constraint(629 self,630 s3_client,631 s3_create_bucket,632 s3_client_for_region,633 s3_create_bucket_with_client,634 snapshot,635 ):636 snapshot.add_transformer(snapshot.transform.s3_api())637 snapshot.add_transformer(638 snapshot.transform.key_value("Location", "<location>", reference_replacement=False)639 )640 bucket_1_name = f"bucket-{short_uid()}"641 s3_create_bucket(Bucket=bucket_1_name)642 response = s3_client.get_bucket_location(Bucket=bucket_1_name)643 snapshot.match("get_bucket_location_bucket_1", response)644 region_2 = "us-east-2"645 client_2 = s3_client_for_region(region_name=region_2)646 bucket_2_name = f"bucket-{short_uid()}"647 s3_create_bucket_with_client(648 client_2,649 Bucket=bucket_2_name,650 CreateBucketConfiguration={"LocationConstraint": region_2},651 )652 response = client_2.get_bucket_location(Bucket=bucket_2_name)653 snapshot.match("get_bucket_location_bucket_2", response)654 # assert creation fails without location constraint for us-east-2 region655 with pytest.raises(Exception) as exc:656 client_2.create_bucket(Bucket=f"bucket-{short_uid()}")657 snapshot.match("create_bucket_constraint_exc", exc.value.response)658 bucket_3_name = f"bucket-{short_uid()}"659 response = s3_create_bucket_with_client(660 client_2,661 Bucket=bucket_3_name,662 CreateBucketConfiguration={"LocationConstraint": region_2},663 )664 snapshot.match("create_bucket_bucket_3", response)665 response = client_2.get_bucket_location(Bucket=bucket_3_name)666 snapshot.match("get_bucket_location_bucket_3", response)667 @pytest.mark.aws_validated668 @pytest.mark.skip_snapshot_verify(669 paths=[670 "$..ContentLanguage",671 "$..VersionId",672 ]673 )674 def test_get_object_with_anon_credentials(self, s3_client, s3_create_bucket, snapshot):675 snapshot.add_transformer(snapshot.transform.s3_api())676 bucket_name = f"bucket-{short_uid()}"677 object_key = f"key-{short_uid()}"678 body = "body data"679 s3_create_bucket(Bucket=bucket_name, ACL="public-read")680 s3_client.put_object(681 Bucket=bucket_name,682 Key=object_key,683 Body=body,684 )685 s3_client.put_object_acl(Bucket=bucket_name, Key=object_key, ACL="public-read")686 s3_anon_client = _anon_client("s3")687 response = s3_anon_client.get_object(Bucket=bucket_name, Key=object_key)688 snapshot.match("get_object", response)689 @pytest.mark.aws_validated690 @pytest.mark.skip_snapshot_verify(691 paths=["$..ContentLanguage", "$..VersionId", "$..AcceptRanges"]692 )693 def test_putobject_with_multiple_keys(self, s3_client, s3_create_bucket, snapshot):694 snapshot.add_transformer(snapshot.transform.s3_api())695 bucket = f"bucket-{short_uid()}"696 key_by_path = "aws/key1/key2/key3"697 s3_create_bucket(Bucket=bucket)698 s3_client.put_object(Body=b"test", Bucket=bucket, Key=key_by_path)699 result = s3_client.get_object(Bucket=bucket, Key=key_by_path)700 snapshot.match("get_object", result)701 @pytest.mark.aws_validated702 def test_delete_bucket_lifecycle_configuration(self, s3_client, s3_bucket, snapshot):703 snapshot.add_transformer(snapshot.transform.key_value("BucketName"))704 lfc = {705 "Rules": [706 {707 "Expiration": {"Days": 7},708 "ID": "wholebucket",709 "Filter": {"Prefix": ""},710 "Status": "Enabled",711 }712 ]713 }714 s3_client.put_bucket_lifecycle_configuration(Bucket=s3_bucket, LifecycleConfiguration=lfc)715 result = s3_client.get_bucket_lifecycle_configuration(Bucket=s3_bucket)716 snapshot.match("get-bucket-lifecycle-conf", result)717 s3_client.delete_bucket_lifecycle(Bucket=s3_bucket)718 with pytest.raises(ClientError) as e:719 s3_client.get_bucket_lifecycle_configuration(Bucket=s3_bucket)720 snapshot.match("get-bucket-lifecycle-exc", e.value.response)721 @pytest.mark.aws_validated722 def test_delete_lifecycle_configuration_on_bucket_deletion(723 self, s3_client, s3_create_bucket, snapshot724 ):725 snapshot.add_transformer(snapshot.transform.key_value("BucketName"))726 bucket_name = f"test-bucket-{short_uid()}" # keep the same name for both bucket727 s3_create_bucket(Bucket=bucket_name)728 lfc = {729 "Rules": [730 {731 "Expiration": {"Days": 7},732 "ID": "wholebucket",733 "Filter": {"Prefix": ""},734 "Status": "Enabled",735 }736 ]737 }738 s3_client.put_bucket_lifecycle_configuration(Bucket=bucket_name, LifecycleConfiguration=lfc)739 result = s3_client.get_bucket_lifecycle_configuration(Bucket=bucket_name)740 snapshot.match("get-bucket-lifecycle-conf", result)741 s3_client.delete_bucket(Bucket=bucket_name)742 s3_create_bucket(Bucket=bucket_name)743 with pytest.raises(ClientError) as e:744 s3_client.get_bucket_lifecycle_configuration(Bucket=bucket_name)745 snapshot.match("get-bucket-lifecycle-exc", e.value.response)746 @pytest.mark.aws_validated747 @pytest.mark.skip_snapshot_verify(748 paths=[749 "$..ContentLanguage",750 "$..VersionId",751 "$..ETag", # TODO ETag should be the same?752 ]753 )754 def test_range_header_body_length(self, s3_client, s3_bucket, snapshot):755 # Test for https://github.com/localstack/localstack/issues/1952756 object_key = "sample.bin"757 chunk_size = 1024758 with io.BytesIO() as data:759 data.write(os.urandom(chunk_size * 2))760 data.seek(0)761 s3_client.upload_fileobj(data, s3_bucket, object_key)762 range_header = f"bytes=0-{(chunk_size - 1)}"763 resp = s3_client.get_object(Bucket=s3_bucket, Key=object_key, Range=range_header)764 content = resp["Body"].read()765 assert chunk_size == len(content)766 snapshot.match("get-object", resp)767 @pytest.mark.aws_validated768 def test_get_range_object_headers(self, s3_client, s3_bucket):769 object_key = "sample.bin"770 chunk_size = 1024771 with io.BytesIO() as data:772 data.write(os.urandom(chunk_size * 2))773 data.seek(0)774 s3_client.upload_fileobj(data, s3_bucket, object_key)775 range_header = f"bytes=0-{(chunk_size - 1)}"776 resp = s3_client.get_object(Bucket=s3_bucket, Key=object_key, Range=range_header)777 assert resp.get("AcceptRanges") == "bytes"778 resp_headers = resp["ResponseMetadata"]["HTTPHeaders"]779 assert "x-amz-request-id" in resp_headers780 assert "x-amz-id-2" in resp_headers781 # `content-language` should not be in the response782 if is_aws_cloud(): # fixme parity issue783 assert "content-language" not in resp_headers784 # We used to return `cache-control: no-cache` if the header wasn't set785 # by the client, but this was a bug because s3 doesn't do that. It simply786 # omits it.787 assert "cache-control" not in resp_headers788 # Do not send a content-encoding header as discussed in Issue #3608789 assert "content-encoding" not in resp_headers790 @pytest.mark.only_localstack791 def test_put_object_chunked_newlines(self, s3_client, s3_bucket):792 # Boto still does not support chunk encoding, which means we can't test with the client nor793 # aws_http_client_factory. See open issue: https://github.com/boto/boto3/issues/751794 # Test for https://github.com/localstack/localstack/issues/1571795 object_key = "data"796 body = "Hello\r\n\r\n\r\n\r\n"797 headers = {798 "Authorization": aws_stack.mock_aws_request_headers("s3")["Authorization"],799 "Content-Type": "audio/mpeg",800 "X-Amz-Content-Sha256": "STREAMING-AWS4-HMAC-SHA256-PAYLOAD",801 "X-Amz-Date": "20190918T051509Z",802 "X-Amz-Decoded-Content-Length": str(len(body)),803 }804 data = (805 "d;chunk-signature=af5e6c0a698b0192e9aa5d9083553d4d241d81f69ec62b184d05c509ad5166af\r\n"806 f"{body}\r\n0;chunk-signature=f2a50a8c0ad4d212b579c2489c6d122db88d8a0d0b987ea1f3e9d081074a5937\r\n"807 )808 # put object809 url = f"{config.service_url('s3')}/{s3_bucket}/{object_key}"810 requests.put(url, data, headers=headers, verify=False)811 # get object and assert content length812 downloaded_object = s3_client.get_object(Bucket=s3_bucket, Key=object_key)813 download_file_object = to_str(downloaded_object["Body"].read())814 assert len(body) == len(str(download_file_object))815 assert body == str(download_file_object)816 @pytest.mark.only_localstack817 def test_put_object_with_md5_and_chunk_signature(self, s3_client, s3_bucket):818 # Boto still does not support chunk encoding, which means we can't test with the client nor819 # aws_http_client_factory. See open issue: https://github.com/boto/boto3/issues/751820 # Test for https://github.com/localstack/localstack/issues/4987821 object_key = "test-runtime.properties"822 object_data = (823 "#20211122+0100\n"824 "#Mon Nov 22 20:10:44 CET 2021\n"825 "last.sync.url.test-space-key=2822a50f-4992-425a-b8fb-923735a9ddff317e3479-5907-46cf-b33a-60da9709274f\n"826 )827 object_data_chunked = (828 "93;chunk-signature=5be6b2d473e96bb9f297444da60bdf0ff8f5d2e211e1d551b3cf3646c0946641\r\n"829 f"{object_data}"830 "\r\n0;chunk-signature=bd5c830b94346b57ddc8805ba26c44a122256c207014433bf6579b0985f21df7\r\n\r\n"831 )832 content_md5 = base64.b64encode(hashlib.md5(object_data.encode()).digest()).decode()833 headers = {834 "Content-Md5": content_md5,835 "Content-Type": "application/octet-stream",836 "User-Agent": (837 "aws-sdk-java/1.11.951 Mac_OS_X/10.15.7 OpenJDK_64-Bit_Server_VM/11.0.11+9-LTS "838 "java/11.0.11 scala/2.13.6 kotlin/1.5.31 vendor/Amazon.com_Inc."839 ),840 "X-Amz-Content-Sha256": "STREAMING-AWS4-HMAC-SHA256-PAYLOAD",841 "X-Amz-Date": "20211122T191045Z",842 "X-Amz-Decoded-Content-Length": str(len(object_data)),843 "Content-Length": str(len(object_data_chunked)),844 "Connection": "Keep-Alive",845 "Expect": "100-continue",846 }847 url = s3_client.generate_presigned_url(848 "put_object",849 Params={850 "Bucket": s3_bucket,851 "Key": object_key,852 "ContentType": "application/octet-stream",853 "ContentMD5": content_md5,854 },855 )856 result = requests.put(url, data=object_data_chunked, headers=headers)857 assert result.status_code == 200, (result, result.content)858 @pytest.mark.aws_validated859 @pytest.mark.skip_snapshot_verify(paths=["$..VersionId", "$..ContentLanguage"])860 def test_delete_object_tagging(self, s3_client, s3_bucket, snapshot):861 object_key = "test-key-tagging"862 s3_client.put_object(Bucket=s3_bucket, Key=object_key, Body="something")863 # get object and assert response864 s3_obj = s3_client.get_object(Bucket=s3_bucket, Key=object_key)865 snapshot.match("get-obj", s3_obj)866 # delete object tagging867 s3_client.delete_object_tagging(Bucket=s3_bucket, Key=object_key)868 # assert that the object still exists869 s3_obj = s3_client.get_object(Bucket=s3_bucket, Key=object_key)870 snapshot.match("get-obj-after-tag-deletion", s3_obj)871 @pytest.mark.aws_validated872 @pytest.mark.skip_snapshot_verify(paths=["$..VersionId"])873 def test_delete_non_existing_keys(self, s3_client, s3_bucket, snapshot):874 object_key = "test-key-nonexistent"875 s3_client.put_object(Bucket=s3_bucket, Key=object_key, Body="something")876 response = s3_client.delete_objects(877 Bucket=s3_bucket,878 Delete={"Objects": [{"Key": object_key}, {"Key": "dummy1"}, {"Key": "dummy2"}]},879 )880 response["Deleted"].sort(key=itemgetter("Key"))881 snapshot.match("deleted-resp", response)882 assert len(response["Deleted"]) == 3883 assert "Errors" not in response884 @pytest.mark.aws_validated885 @pytest.mark.skip_snapshot_verify(886 paths=["$..Error.RequestID"]887 ) # fixme RequestID not in AWS response888 def test_delete_non_existing_keys_in_non_existing_bucket(self, s3_client, snapshot):889 with pytest.raises(ClientError) as e:890 s3_client.delete_objects(891 Bucket="non-existent-bucket",892 Delete={"Objects": [{"Key": "dummy1"}, {"Key": "dummy2"}]},893 )894 assert "NoSuchBucket" == e.value.response["Error"]["Code"]895 snapshot.match("error-non-existent-bucket", e.value.response)896 @pytest.mark.aws_validated897 def test_s3_request_payer(self, s3_client, s3_bucket, snapshot):898 response = s3_client.put_bucket_request_payment(899 Bucket=s3_bucket, RequestPaymentConfiguration={"Payer": "Requester"}900 )901 snapshot.match("put-bucket-request-payment", response)902 assert response["ResponseMetadata"]["HTTPStatusCode"] == 200903 response = s3_client.get_bucket_request_payment(Bucket=s3_bucket)904 snapshot.match("get-bucket-request-payment", response)905 assert "Requester" == response["Payer"]906 @pytest.mark.aws_validated907 @pytest.mark.skip_snapshot_verify(908 paths=["$..Error.RequestID", "$..Grants..Grantee.DisplayName"]909 )910 def test_bucket_exists(self, s3_client, s3_bucket, snapshot):911 snapshot.add_transformer(912 [913 snapshot.transform.key_value("DisplayName"),914 snapshot.transform.key_value("ID", value_replacement="owner-id"),915 ]916 )917 s3_client.put_bucket_cors(918 Bucket=s3_bucket,919 CORSConfiguration={920 "CORSRules": [921 {922 "AllowedMethods": ["GET", "POST", "PUT", "DELETE"],923 "AllowedOrigins": ["localhost"],924 }925 ]926 },927 )928 response = s3_client.get_bucket_cors(Bucket=s3_bucket)929 snapshot.match("get-bucket-cors", response)930 result = s3_client.get_bucket_acl(Bucket=s3_bucket)931 snapshot.match("get-bucket-acl", result)932 with pytest.raises(ClientError) as e:933 s3_client.get_bucket_acl(Bucket="bucket-not-exists")934 snapshot.match("get-bucket-not-exists", e.value.response)935 @pytest.mark.aws_validated936 @pytest.mark.skip_snapshot_verify(937 paths=["$..VersionId", "$..ContentLanguage", "$..Error.RequestID"]938 )939 def test_s3_uppercase_key_names(self, s3_client, s3_create_bucket, snapshot):940 # bucket name should be case-sensitive941 bucket_name = f"testuppercase-{short_uid()}"942 s3_create_bucket(Bucket=bucket_name)943 # key name should be case-sensitive944 object_key = "camelCaseKey"945 s3_client.put_object(Bucket=bucket_name, Key=object_key, Body="something")946 res = s3_client.get_object(Bucket=bucket_name, Key=object_key)947 snapshot.match("response", res)948 with pytest.raises(ClientError) as e:949 s3_client.get_object(Bucket=bucket_name, Key="camelcasekey")950 snapshot.match("wrong-case-key", e.value.response)951 @pytest.mark.aws_validated952 def test_s3_download_object_with_lambda(953 self,954 s3_client,955 s3_create_bucket,956 create_lambda_function,957 lambda_client,958 lambda_su_role,959 logs_client,960 ):961 bucket_name = f"bucket-{short_uid()}"962 function_name = f"func-{short_uid()}"963 key = f"key-{short_uid()}"964 s3_create_bucket(Bucket=bucket_name)965 s3_client.put_object(Bucket=bucket_name, Key=key, Body="something..")966 create_lambda_function(967 handler_file=os.path.join(968 os.path.dirname(__file__),969 "../awslambda",970 "functions",971 "lambda_triggered_by_sqs_download_s3_file.py",972 ),973 func_name=function_name,974 role=lambda_su_role,975 runtime=LAMBDA_RUNTIME_PYTHON39,976 envvars=dict(977 {978 "BUCKET_NAME": bucket_name,979 "OBJECT_NAME": key,980 "LOCAL_FILE_NAME": "/tmp/" + key,981 }982 ),983 )984 lambda_client.invoke(FunctionName=function_name, InvocationType="Event")985 # TODO maybe this check can be improved (do not rely on logs)986 retry(987 check_expected_lambda_log_events_length,988 retries=10,989 sleep=1,990 function_name=function_name,991 regex_filter="success",992 expected_length=1,993 logs_client=logs_client,994 )995 @pytest.mark.aws_validated996 # TODO LocalStack adds this RequestID to the error response997 @pytest.mark.skip_snapshot_verify(paths=["$..Error.RequestID"])998 def test_precondition_failed_error(self, s3_client, s3_create_bucket, snapshot):999 bucket = f"bucket-{short_uid()}"1000 s3_create_bucket(Bucket=bucket)1001 s3_client.put_object(Bucket=bucket, Key="foo", Body=b'{"foo": "bar"}')1002 with pytest.raises(ClientError) as e:1003 s3_client.get_object(Bucket=bucket, Key="foo", IfMatch='"not good etag"')1004 snapshot.match("get-object-if-match", e.value.response)1005 @pytest.mark.aws_validated1006 @pytest.mark.xfail(reason="Error format is wrong and missing keys")1007 def test_s3_invalid_content_md5(self, s3_client, s3_bucket, snapshot):1008 # put object with invalid content MD51009 hashes = ["__invalid__", "000", "not base64 encoded checksum", "MTIz"]1010 for index, md5hash in enumerate(hashes):1011 with pytest.raises(ClientError) as e:1012 s3_client.put_object(1013 Bucket=s3_bucket,1014 Key="test-key",1015 Body="something",1016 ContentMD5=md5hash,1017 )1018 snapshot.match(f"md5-error-{index}", e.value.response)1019 @pytest.mark.aws_validated1020 @pytest.mark.skip_snapshot_verify(paths=["$..VersionId", "$..ContentLanguage", "$..ETag"])1021 def test_s3_upload_download_gzip(self, s3_client, s3_bucket, snapshot):1022 data = "1234567890 " * 1001023 # Write contents to memory rather than a file.1024 upload_file_object = BytesIO()1025 with gzip.GzipFile(fileobj=upload_file_object, mode="w") as filestream:1026 filestream.write(data.encode("utf-8"))1027 # Upload gzip1028 response = s3_client.put_object(1029 Bucket=s3_bucket,1030 Key="test.gz",1031 ContentEncoding="gzip",1032 Body=upload_file_object.getvalue(),1033 )1034 snapshot.match("put-object", response)1035 # TODO: check why ETag is different1036 # Download gzip1037 downloaded_object = s3_client.get_object(Bucket=s3_bucket, Key="test.gz")1038 snapshot.match("get-object", downloaded_object)1039 download_file_object = BytesIO(downloaded_object["Body"].read())1040 with gzip.GzipFile(fileobj=download_file_object, mode="rb") as filestream:1041 downloaded_data = filestream.read().decode("utf-8")1042 assert downloaded_data == data1043 @pytest.mark.aws_validated1044 @pytest.mark.skip_snapshot_verify(paths=["$..VersionId"])1045 def test_multipart_copy_object_etag(self, s3_client, s3_bucket, s3_multipart_upload, snapshot):1046 snapshot.add_transformer(1047 [1048 snapshot.transform.key_value("Location"),1049 snapshot.transform.key_value("Bucket"),1050 ]1051 )1052 key = "test.file"1053 copy_key = "copy.file"1054 src_object_path = f"{s3_bucket}/{key}"1055 content = "test content 123"1056 response = s3_multipart_upload(bucket=s3_bucket, key=key, data=content)1057 snapshot.match("multipart-upload", response)1058 multipart_etag = response["ETag"]1059 response = s3_client.copy_object(Bucket=s3_bucket, CopySource=src_object_path, Key=copy_key)1060 snapshot.match("copy-object", response)1061 copy_etag = response["CopyObjectResult"]["ETag"]1062 # etags should be different1063 assert copy_etag != multipart_etag1064 @pytest.mark.aws_validated1065 @pytest.mark.skip_snapshot_verify(paths=["$..VersionId"])1066 def test_set_external_hostname(1067 self, s3_client, s3_bucket, s3_multipart_upload, monkeypatch, snapshot1068 ):1069 snapshot.add_transformer(1070 [1071 snapshot.transform.key_value("Location"),1072 snapshot.transform.key_value("Bucket"),1073 ]1074 )1075 monkeypatch.setattr(config, "HOSTNAME_EXTERNAL", "foobar")1076 key = "test.file"1077 content = "test content 123"1078 acl = "public-read"1079 # upload file1080 response = s3_multipart_upload(bucket=s3_bucket, key=key, data=content, acl=acl)1081 snapshot.match("multipart-upload", response)1082 if is_aws_cloud(): # TODO: default addressing is vhost for AWS1083 expected_url = f"{_bucket_url_vhost(bucket_name=s3_bucket)}/{key}"1084 else: # LS default is path addressing1085 expected_url = f"{_bucket_url(bucket_name=s3_bucket, localstack_host=config.HOSTNAME_EXTERNAL)}/{key}"1086 assert response["Location"] == expected_url1087 # download object via API1088 downloaded_object = s3_client.get_object(Bucket=s3_bucket, Key=key)1089 snapshot.match("get-object", response)1090 assert content == to_str(downloaded_object["Body"].read())1091 # download object directly from download link1092 download_url = response["Location"].replace(f"{config.HOSTNAME_EXTERNAL}:", "localhost:")1093 response = requests.get(download_url)1094 assert response.status_code == 2001095 assert to_str(response.content) == content1096 @pytest.mark.skip_offline1097 @pytest.mark.aws_validated1098 @pytest.mark.skip_snapshot_verify(paths=["$..AcceptRanges"])1099 def test_s3_lambda_integration(1100 self,1101 lambda_client,1102 create_lambda_function,1103 lambda_su_role,1104 s3_client,1105 s3_create_bucket,1106 create_tmp_folder_lambda,1107 snapshot,1108 ):1109 snapshot.add_transformer(snapshot.transform.s3_api())1110 handler_file = os.path.join(1111 os.path.dirname(__file__), "../awslambda", "functions", "lambda_s3_integration.js"1112 )1113 temp_folder = create_tmp_folder_lambda(1114 handler_file,1115 run_command="npm i @aws-sdk/client-s3; npm i @aws-sdk/s3-request-presigner",1116 )1117 function_name = f"func-integration-{short_uid()}"1118 create_lambda_function(1119 func_name=function_name,1120 zip_file=testutil.create_zip_file(temp_folder, get_content=True),1121 runtime=LAMBDA_RUNTIME_NODEJS14X,1122 handler="lambda_s3_integration.handler",1123 role=lambda_su_role,1124 )1125 s3_create_bucket(Bucket=function_name)1126 response = lambda_client.invoke(FunctionName=function_name)1127 presigned_url = response["Payload"].read()1128 presigned_url = json.loads(to_str(presigned_url))["body"].strip('"')1129 response = requests.put(presigned_url, verify=False)1130 assert 200 == response.status_code1131 response = s3_client.head_object(Bucket=function_name, Key="key.png")1132 snapshot.match("head_object", response)1133class TestS3TerraformRawRequests:1134 @pytest.mark.only_localstack1135 def test_terraform_request_sequence(self):1136 reqs = load_file(os.path.join(os.path.dirname(__file__), "../files", "s3.requests.txt"))1137 reqs = reqs.split("---")1138 for req in reqs:1139 header, _, body = req.strip().partition("\n\n")1140 req, _, headers = header.strip().partition("\n")1141 headers = {h.split(":")[0]: h.partition(":")[2].strip() for h in headers.split("\n")}1142 method, path, _ = req.split(" ")1143 url = f"{config.get_edge_url()}{path}"1144 result = getattr(requests, method.lower())(url, data=body, headers=headers)1145 assert result.status_code < 4001146class TestS3PresignedUrl:1147 """1148 These tests pertain to S3's presigned URL feature.1149 """1150 @pytest.mark.aws_validated1151 @pytest.mark.skip_snapshot_verify(paths=["$..VersionId", "$..ContentLanguage", "$..Expires"])1152 def test_put_object(self, s3_client, s3_bucket, snapshot):1153 snapshot.add_transformer(snapshot.transform.s3_api())1154 key = "my-key"1155 url = s3_client.generate_presigned_url(1156 "put_object", Params={"Bucket": s3_bucket, "Key": key}1157 )1158 requests.put(url, data="something", verify=False)1159 response = s3_client.get_object(Bucket=s3_bucket, Key=key)1160 assert response["Body"].read() == b"something"1161 snapshot.match("get_object", response)1162 @pytest.mark.aws_validated1163 @pytest.mark.xfail(1164 condition=not config.LEGACY_EDGE_PROXY, reason="failing with new HTTP gateway (only in CI)"1165 )1166 def test_post_object_with_files(self, s3_client, s3_bucket):1167 object_key = "test-presigned-post-key"1168 body = b"something body"1169 presigned_request = s3_client.generate_presigned_post(1170 Bucket=s3_bucket, Key=object_key, ExpiresIn=601171 )1172 # put object1173 response = requests.post(1174 presigned_request["url"],1175 data=presigned_request["fields"],1176 files={"file": body},1177 verify=False,1178 )1179 assert response.status_code == 2041180 # get object and compare results1181 downloaded_object = s3_client.get_object(Bucket=s3_bucket, Key=object_key)1182 assert downloaded_object["Body"].read() == body1183 @pytest.mark.aws_validated1184 def test_post_request_expires(self, s3_client, s3_bucket):1185 # presign a post with a short expiry time1186 object_key = "test-presigned-post-key"1187 presigned_request = s3_client.generate_presigned_post(1188 Bucket=s3_bucket, Key=object_key, ExpiresIn=21189 )1190 # sleep so it expires1191 time.sleep(3)1192 # attempt to use the presigned request1193 response = requests.post(1194 presigned_request["url"],1195 data=presigned_request["fields"],1196 files={"file": "file content"},1197 verify=False,1198 )1199 # FIXME: localstack returns 400 but aws returns 4031200 assert response.status_code in [400, 403]1201 assert "ExpiredToken" in response.text1202 @pytest.mark.aws_validated1203 def test_delete_has_empty_content_length_header(self, s3_client, s3_bucket):1204 for encoding in None, "gzip":1205 # put object1206 object_key = "key-by-hostname"1207 s3_client.put_object(1208 Bucket=s3_bucket,1209 Key=object_key,1210 Body="something",1211 ContentType="text/html; charset=utf-8",1212 )1213 url = s3_client.generate_presigned_url(1214 "delete_object", Params={"Bucket": s3_bucket, "Key": object_key}1215 )1216 # get object and assert headers1217 headers = {}1218 if encoding:1219 headers["Accept-Encoding"] = encoding1220 response = requests.delete(url, headers=headers, verify=False)1221 assert response.status_code == 2041222 assert not response.text1223 # AWS does not send a content-length header at all, legacy localstack sends a 0 length header1224 assert response.headers.get("content-length") in [1225 "0",1226 None,1227 ], f"Unexpected content-length in headers {response.headers}"1228 @pytest.mark.aws_validated1229 def test_head_has_correct_content_length_header(self, s3_client, s3_bucket):1230 body = "something body \n \n\r"1231 # put object1232 object_key = "key-by-hostname"1233 s3_client.put_object(1234 Bucket=s3_bucket,1235 Key=object_key,1236 Body=body,1237 ContentType="text/html; charset=utf-8",1238 )1239 url = s3_client.generate_presigned_url(1240 "head_object", Params={"Bucket": s3_bucket, "Key": object_key}1241 )1242 # get object and assert headers1243 response = requests.head(url, verify=False)1244 assert response.headers.get("content-length") == str(len(body))1245 @pytest.mark.aws_validated1246 @pytest.mark.skip_snapshot_verify(paths=["$..Expires", "$..AcceptRanges"])1247 def test_put_url_metadata(self, s3_client, s3_bucket, snapshot):1248 snapshot.add_transformer(snapshot.transform.s3_api())1249 # Object metadata should be passed as query params via presigned URL1250 # https://github.com/localstack/localstack/issues/5441251 metadata = {"foo": "bar"}1252 object_key = "key-by-hostname"1253 # put object via presigned URL1254 url = s3_client.generate_presigned_url(1255 "put_object",1256 Params={"Bucket": s3_bucket, "Key": object_key, "Metadata": metadata},1257 )1258 assert "x-amz-meta-foo=bar" in url1259 response = requests.put(url, data="content 123", verify=False)1260 assert response.ok, f"response returned {response.status_code}: {response.text}"1261 # response body should be empty, see https://github.com/localstack/localstack/issues/13171262 assert not response.text1263 # assert metadata is present1264 response = s3_client.head_object(Bucket=s3_bucket, Key=object_key)1265 assert response.get("Metadata", {}).get("foo") == "bar"1266 snapshot.match("head_object", response)1267 @pytest.mark.aws_validated1268 def test_get_object_ignores_request_body(self, s3_client, s3_bucket):1269 key = "foo-key"1270 body = "foobar"1271 s3_client.put_object(Bucket=s3_bucket, Key=key, Body=body)1272 url = s3_client.generate_presigned_url(1273 "get_object", Params={"Bucket": s3_bucket, "Key": key}1274 )1275 response = requests.get(url, data=b"get body is ignored by AWS")1276 assert response.status_code == 2001277 assert response.text == body1278 @pytest.mark.aws_validated1279 def test_put_object_with_md5_and_chunk_signature_bad_headers(1280 self,1281 s3_client,1282 s3_create_bucket,1283 ):1284 bucket_name = f"bucket-{short_uid()}"1285 object_key = "test-runtime.properties"1286 content_md5 = "pX8KKuGXS1f2VTcuJpqjkw=="1287 headers = {1288 "Content-Md5": content_md5,1289 "Content-Type": "application/octet-stream",1290 "X-Amz-Content-Sha256": "STREAMING-AWS4-HMAC-SHA256-PAYLOAD",1291 "X-Amz-Date": "20211122T191045Z",1292 "X-Amz-Decoded-Content-Length": "test", # string instead of int1293 "Content-Length": "10",1294 "Connection": "Keep-Alive",1295 "Expect": "100-continue",1296 }1297 s3_create_bucket(Bucket=bucket_name)1298 url = s3_client.generate_presigned_url(1299 "put_object",1300 Params={1301 "Bucket": bucket_name,1302 "Key": object_key,1303 "ContentType": "application/octet-stream",1304 "ContentMD5": content_md5,1305 },1306 )1307 result = requests.put(url, data="test", headers=headers)1308 assert result.status_code == 4031309 assert b"SignatureDoesNotMatch" in result.content1310 # check also no X-Amz-Decoded-Content-Length1311 headers.pop("X-Amz-Decoded-Content-Length")1312 result = requests.put(url, data="test", headers=headers)1313 assert result.status_code == 403, (result, result.content)1314 assert b"SignatureDoesNotMatch" in result.content1315 @pytest.mark.aws_validated1316 def test_s3_get_response_default_content_type(self, s3_client, s3_bucket):1317 # When no content type is provided by a PUT request1318 # 'binary/octet-stream' should be used1319 # src: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html1320 # put object1321 object_key = "key-by-hostname"1322 s3_client.put_object(Bucket=s3_bucket, Key=object_key, Body="something")1323 # get object and assert headers1324 url = s3_client.generate_presigned_url(1325 "get_object", Params={"Bucket": s3_bucket, "Key": object_key}1326 )1327 response = requests.get(url, verify=False)1328 assert response.headers["content-type"] == "binary/octet-stream"1329 @pytest.mark.aws_validated1330 def test_s3_presigned_url_expired(self, s3_presigned_client, s3_bucket, monkeypatch):1331 if not is_aws_cloud():1332 monkeypatch.setattr(config, "S3_SKIP_SIGNATURE_VALIDATION", False)1333 object_key = "key-expires-in-2"1334 s3_presigned_client.put_object(Bucket=s3_bucket, Key=object_key, Body="something")1335 # get object and assert headers1336 url = s3_presigned_client.generate_presigned_url(1337 "get_object", Params={"Bucket": s3_bucket, "Key": object_key}, ExpiresIn=21338 )1339 # retrieving it before expiry1340 resp = requests.get(url, verify=False)1341 assert resp.status_code == 2001342 assert to_str(resp.content) == "something"1343 time.sleep(3) # wait for the URL to expire1344 resp = requests.get(url, verify=False)1345 resp_content = to_str(resp.content)1346 assert resp.status_code == 4031347 assert "<Code>AccessDenied</Code>" in resp_content1348 assert "<Message>Request has expired</Message>" in resp_content1349 url = s3_presigned_client.generate_presigned_url(1350 "get_object",1351 Params={"Bucket": s3_bucket, "Key": object_key},1352 ExpiresIn=120,1353 )1354 resp = requests.get(url, verify=False)1355 assert resp.status_code == 2001356 assert to_str(resp.content) == "something"1357 @pytest.mark.aws_validated1358 def test_s3_get_response_content_type_same_as_upload_and_range(self, s3_client, s3_bucket):1359 # put object1360 object_key = "foo/bar/key-by-hostname"1361 content_type = "foo/bar; charset=utf-8"1362 s3_client.put_object(1363 Bucket=s3_bucket,1364 Key=object_key,1365 Body="something " * 20,1366 ContentType=content_type,1367 )1368 url = s3_client.generate_presigned_url(1369 "get_object", Params={"Bucket": s3_bucket, "Key": object_key}1370 )1371 # get object and assert headers1372 response = requests.get(url, verify=False)1373 assert content_type == response.headers["content-type"]1374 # get object using range query and assert headers1375 response = requests.get(url, headers={"Range": "bytes=0-18"}, verify=False)1376 assert content_type == response.headers["content-type"]1377 # test we only get the first 18 bytes from the object1378 assert "something something" == to_str(response.content)1379 @pytest.mark.aws_validated1380 def test_s3_presigned_post_success_action_status_201_response(self, s3_client, s3_bucket):1381 # a security policy is required if the bucket is not publicly writable1382 # see https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPOST.html#RESTObjectPOST-requests-form-fields1383 body = "something body"1384 # get presigned URL1385 object_key = "key-${filename}"1386 presigned_request = s3_client.generate_presigned_post(1387 Bucket=s3_bucket,1388 Key=object_key,1389 Fields={"success_action_status": "201"},1390 Conditions=[{"bucket": s3_bucket}, ["eq", "$success_action_status", "201"]],1391 ExpiresIn=60,1392 )1393 files = {"file": ("my-file", body)}1394 response = requests.post(1395 presigned_request["url"],1396 data=presigned_request["fields"],1397 files=files,1398 verify=False,1399 )1400 # test1401 assert 201 == response.status_code1402 json_response = xmltodict.parse(response.content)1403 assert "PostResponse" in json_response1404 json_response = json_response["PostResponse"]1405 # fixme 201 response is hardcoded1406 # see localstack.services.s3.s3_listener.ProxyListenerS3.get_201_response1407 if is_aws_cloud():1408 location = f"{_bucket_url_vhost(s3_bucket, aws_stack.get_region())}/key-my-file"1409 etag = '"43281e21fce675ac3bcb3524b38ca4ed"' # TODO check quoting of etag1410 else:1411 location = "http://localhost/key-my-file"1412 etag = "d41d8cd98f00b204e9800998ecf8427f"1413 assert json_response["Location"] == location1414 assert json_response["Bucket"] == s3_bucket1415 assert json_response["Key"] == "key-my-file"1416 assert json_response["ETag"] == etag1417 @pytest.mark.aws_validated1418 @pytest.mark.xfail(reason="Access-Control-Allow-Origin returns Origin value in LS")1419 def test_s3_get_response_headers(self, s3_client, s3_bucket, snapshot):1420 # put object and CORS configuration1421 object_key = "key-by-hostname"1422 s3_client.put_object(Bucket=s3_bucket, Key=object_key, Body="something")1423 s3_client.put_bucket_cors(1424 Bucket=s3_bucket,1425 CORSConfiguration={1426 "CORSRules": [1427 {1428 "AllowedMethods": ["GET", "PUT", "POST"],1429 "AllowedOrigins": ["*"],1430 "ExposeHeaders": ["ETag", "x-amz-version-id"],1431 }1432 ]1433 },1434 )1435 bucket_cors_res = s3_client.get_bucket_cors(Bucket=s3_bucket)1436 snapshot.match("bucket-cors-response", bucket_cors_res)1437 # get object and assert headers1438 url = s3_client.generate_presigned_url(1439 "get_object", Params={"Bucket": s3_bucket, "Key": object_key}1440 )1441 # need to add Origin headers for S3 to send back the Access-Control-* headers1442 # as CORS is made for browsers1443 response = requests.get(url, verify=False, headers={"Origin": "http://localhost"})1444 assert response.headers["Access-Control-Expose-Headers"] == "ETag, x-amz-version-id"1445 assert response.headers["Access-Control-Allow-Methods"] == "GET, PUT, POST"1446 assert (1447 response.headers["Access-Control-Allow-Origin"] == "*"1448 ) # returns http://localhost in LS1449 @pytest.mark.aws_validated1450 @pytest.mark.xfail(reason="Behaviour diverges from AWS, Access-Control-* headers always added")1451 def test_s3_get_response_headers_without_origin(self, s3_client, s3_bucket):1452 # put object and CORS configuration1453 object_key = "key-by-hostname"1454 s3_client.put_object(Bucket=s3_bucket, Key=object_key, Body="something")1455 s3_client.put_bucket_cors(1456 Bucket=s3_bucket,1457 CORSConfiguration={1458 "CORSRules": [1459 {1460 "AllowedMethods": ["GET", "PUT", "POST"],1461 "AllowedOrigins": ["*"],1462 "ExposeHeaders": ["ETag", "x-amz-version-id"],1463 }1464 ]1465 },1466 )1467 # get object and assert headers1468 url = s3_client.generate_presigned_url(1469 "get_object", Params={"Bucket": s3_bucket, "Key": object_key}1470 )1471 response = requests.get(url, verify=False)1472 assert "Access-Control-Expose-Headers" not in response.headers1473 assert "Access-Control-Allow-Methods" not in response.headers1474 assert "Access-Control-Allow-Origin" not in response.headers1475 @pytest.mark.aws_validated1476 def test_presigned_url_with_session_token(self, s3_create_bucket_with_client, sts_client):1477 bucket_name = f"bucket-{short_uid()}"1478 key_name = "key"1479 response = sts_client.get_session_token()1480 client = boto3.client(1481 "s3",1482 config=Config(signature_version="s3v4"),1483 endpoint_url=None1484 if os.environ.get("TEST_TARGET") == "AWS_CLOUD"1485 else "http://127.0.0.1:4566",1486 aws_access_key_id=response["Credentials"]["AccessKeyId"],1487 aws_secret_access_key=response["Credentials"]["SecretAccessKey"],1488 aws_session_token=response["Credentials"]["SessionToken"],1489 )1490 s3_create_bucket_with_client(s3_client=client, Bucket=bucket_name)1491 client.put_object(Body="test-value", Bucket=bucket_name, Key=key_name)1492 presigned_url = client.generate_presigned_url(1493 ClientMethod="get_object",1494 Params={"Bucket": bucket_name, "Key": key_name},1495 ExpiresIn=600,1496 )1497 response = requests.get(presigned_url)1498 assert response._content == b"test-value"1499 @pytest.mark.aws_validated1500 def test_s3_get_response_header_overrides(self, s3_client, s3_bucket):1501 # Signed requests may include certain header overrides in the querystring1502 # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html1503 object_key = "key-header-overrides"1504 s3_client.put_object(Bucket=s3_bucket, Key=object_key, Body="something")1505 # get object and assert headers1506 expiry_date = "Wed, 21 Oct 2015 07:28:00 GMT"1507 url = s3_client.generate_presigned_url(1508 "get_object",1509 Params={1510 "Bucket": s3_bucket,1511 "Key": object_key,1512 "ResponseCacheControl": "max-age=74",1513 "ResponseContentDisposition": 'attachment; filename="foo.jpg"',1514 "ResponseContentEncoding": "identity",1515 "ResponseContentLanguage": "de-DE",1516 "ResponseContentType": "image/jpeg",1517 "ResponseExpires": expiry_date,1518 },1519 )1520 response = requests.get(url, verify=False)1521 headers = response.headers1522 assert headers["cache-control"] == "max-age=74"1523 assert headers["content-disposition"] == 'attachment; filename="foo.jpg"'1524 assert headers["content-encoding"] == "identity"1525 assert headers["content-language"] == "de-DE"1526 assert headers["content-type"] == "image/jpeg"1527 # Note: looks like depending on the environment/libraries, we can get different date formats...1528 possible_date_formats = ["2015-10-21T07:28:00Z", expiry_date]1529 assert headers["expires"] in possible_date_formats1530 @pytest.mark.aws_validated1531 def test_s3_copy_md5(self, s3_client, s3_bucket, snapshot):1532 src_key = "src"1533 s3_client.put_object(Bucket=s3_bucket, Key=src_key, Body="something")1534 # copy object1535 dest_key = "dest"1536 response = s3_client.copy_object(1537 Bucket=s3_bucket,1538 CopySource={"Bucket": s3_bucket, "Key": src_key},1539 Key=dest_key,1540 )1541 snapshot.match("copy-obj", response)1542 # Create copy object to try to match s3a setting Content-MD51543 dest_key2 = "dest"1544 url = s3_client.generate_presigned_url(1545 "copy_object",1546 Params={1547 "Bucket": s3_bucket,1548 "CopySource": {"Bucket": s3_bucket, "Key": src_key},1549 "Key": dest_key2,1550 },1551 )1552 request_response = requests.put(url, verify=False)1553 assert request_response.status_code == 2001554 @pytest.mark.aws_validated1555 @pytest.mark.xfail(reason="ACL behaviour is not implemented, see comments")1556 def test_s3_batch_delete_objects_using_requests_with_acl(1557 self, s3_client, s3_create_bucket, snapshot1558 ):1559 # If an object is created in a public bucket by the owner, it can't be deleted by anonymous clients1560 # https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html#specifying-grantee-predefined-groups1561 # only "public" created objects can be deleted by anonymous clients1562 snapshot.add_transformer(snapshot.transform.s3_api())1563 bucket_name = f"bucket-{short_uid()}"1564 object_key_1 = "key-created-by-owner"1565 object_key_2 = "key-created-by-anonymous"1566 s3_create_bucket(Bucket=bucket_name, ACL="public-read-write")1567 s3_client.put_object(1568 Bucket=bucket_name, Key=object_key_1, Body="This body document", ACL="public-read-write"1569 )1570 anon = _anon_client("s3")1571 anon.put_object(1572 Bucket=bucket_name,1573 Key=object_key_2,1574 Body="This body document #2",1575 ACL="public-read-write",1576 )1577 # TODO delete does currently not work with S3_VIRTUAL_HOSTNAME1578 url = f"{_bucket_url(bucket_name, localstack_host=config.LOCALSTACK_HOSTNAME)}?delete"1579 data = f"""1580 <Delete xmlns="http://s3.amazonaws.com/doc/2006-03-01/">1581 <Object>1582 <Key>{object_key_1}</Key>1583 </Object>1584 <Object>1585 <Key>{object_key_2}</Key>1586 </Object>1587 </Delete>1588 """1589 md = hashlib.md5(data.encode("utf-8")).digest()1590 contents_md5 = base64.b64encode(md).decode("utf-8")1591 header = {"content-md5": contents_md5, "x-amz-request-payer": "requester"}1592 r = requests.post(url=url, data=data, headers=header)1593 assert 200 == r.status_code1594 response = xmltodict.parse(r.content)1595 response["DeleteResult"].pop("@xmlns")1596 assert response["DeleteResult"]["Error"]["Key"] == object_key_11597 assert response["DeleteResult"]["Error"]["Code"] == "AccessDenied"1598 assert response["DeleteResult"]["Deleted"]["Key"] == object_key_21599 snapshot.match("multi-delete-with-requests", response)1600 response = s3_client.list_objects(Bucket=bucket_name)1601 assert 200 == response["ResponseMetadata"]["HTTPStatusCode"]1602 assert len(response["Contents"]) == 11603 snapshot.match("list-remaining-objects", response)1604 @pytest.mark.aws_validated1605 @pytest.mark.skip_snapshot_verify(1606 paths=[1607 "$..DeleteResult.Deleted..VersionId",1608 "$..Prefix",1609 ]1610 )1611 def test_s3_batch_delete_public_objects_using_requests(1612 self, s3_client, s3_create_bucket, snapshot1613 ):1614 # only "public" created objects can be deleted by anonymous clients1615 # https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html#specifying-grantee-predefined-groups1616 snapshot.add_transformer(snapshot.transform.s3_api())1617 bucket_name = f"bucket-{short_uid()}"1618 object_key_1 = "key-created-by-anonymous-1"1619 object_key_2 = "key-created-by-anonymous-2"1620 s3_create_bucket(Bucket=bucket_name, ACL="public-read-write")1621 anon = _anon_client("s3")1622 anon.put_object(1623 Bucket=bucket_name, Key=object_key_1, Body="This body document", ACL="public-read-write"1624 )1625 anon.put_object(1626 Bucket=bucket_name,1627 Key=object_key_2,1628 Body="This body document #2",1629 ACL="public-read-write",1630 )1631 # TODO delete does currently not work with S3_VIRTUAL_HOSTNAME1632 url = f"{_bucket_url(bucket_name, localstack_host=config.LOCALSTACK_HOSTNAME)}?delete"1633 data = f"""1634 <Delete xmlns="http://s3.amazonaws.com/doc/2006-03-01/">1635 <Object>1636 <Key>{object_key_1}</Key>1637 </Object>1638 <Object>1639 <Key>{object_key_2}</Key>1640 </Object>1641 </Delete>1642 """1643 md = hashlib.md5(data.encode("utf-8")).digest()1644 contents_md5 = base64.b64encode(md).decode("utf-8")1645 header = {"content-md5": contents_md5, "x-amz-request-payer": "requester"}1646 r = requests.post(url=url, data=data, headers=header)1647 assert 200 == r.status_code1648 response = xmltodict.parse(r.content)1649 response["DeleteResult"].pop("@xmlns")1650 snapshot.match("multi-delete-with-requests", response)1651 response = s3_client.list_objects(Bucket=bucket_name)1652 snapshot.match("list-remaining-objects", response)1653 @pytest.mark.aws_validated1654 @pytest.mark.skip_snapshot_verify(1655 paths=[1656 "$..Error.Message", # TODO AWS does not include dot at the end1657 "$..Error.RequestID", # AWS has no RequestID here1658 "$..Error.StorageClass", # Missing in Localstack1659 "$..StorageClass", # Missing in Localstack1660 ]1661 )1662 def test_s3_get_deep_archive_object_restore(self, s3_client, s3_create_bucket, snapshot):1663 snapshot.add_transformer(snapshot.transform.s3_api())1664 bucket_name = f"bucket-{short_uid()}"1665 object_key = f"key-{short_uid()}"1666 s3_create_bucket(Bucket=bucket_name)1667 # put DEEP_ARCHIVE object1668 s3_client.put_object(1669 Bucket=bucket_name,1670 Key=object_key,1671 Body="body data",1672 StorageClass="DEEP_ARCHIVE",1673 )1674 with pytest.raises(ClientError) as e:1675 s3_client.get_object(Bucket=bucket_name, Key=object_key)1676 e.match("InvalidObjectState")1677 snapshot.match("get_object_invalid_state", e.value.response)1678 response = s3_client.restore_object(1679 Bucket=bucket_name,1680 Key=object_key,1681 RestoreRequest={1682 "Days": 30,1683 "GlacierJobParameters": {1684 "Tier": "Bulk",1685 },1686 },1687 )1688 snapshot.match("restore_object", response)1689 # AWS tier is currently configured to retrieve within 48 hours, so we cannot test the get-object here1690 response = s3_client.head_object(Bucket=bucket_name, Key=object_key)1691 if 'ongoing-request="false"' in response.get("Restore", ""):1692 # if the restoring happens in LocalStack (or was fast in AWS) we can retrieve the object1693 response = s3_client.get_object(Bucket=bucket_name, Key=object_key)1694 assert "etag" in response.get("ResponseMetadata").get("HTTPHeaders")1695 @pytest.mark.aws_validated1696 def test_create_bucket_with_existing_name(self, s3_client, s3_create_bucket, snapshot):1697 snapshot.add_transformer(snapshot.transform.s3_api())1698 bucket_name = f"bucket-{short_uid()}"1699 s3_create_bucket(1700 Bucket=bucket_name,1701 CreateBucketConfiguration={"LocationConstraint": "us-west-1"},1702 )1703 for loc_constraint in ["us-west-1", "us-east-2"]:1704 with pytest.raises(ClientError) as e:1705 s3_client.create_bucket(1706 Bucket=bucket_name,1707 CreateBucketConfiguration={"LocationConstraint": loc_constraint},1708 )1709 e.match("BucketAlreadyOwnedByYou")1710 snapshot.match(f"create-bucket-{loc_constraint}", e.value.response)1711 @pytest.mark.aws_validated1712 @pytest.mark.skip_snapshot_verify(paths=["$..Prefix"])1713 def test_s3_list_objects_empty_marker(self, s3_client, s3_create_bucket, snapshot):1714 snapshot.add_transformer(snapshot.transform.s3_api())1715 bucket_name = "test" + short_uid()1716 s3_create_bucket(Bucket=bucket_name)1717 resp = s3_client.list_objects(Bucket=bucket_name, Marker="")1718 snapshot.match("list-objects", resp)1719 @pytest.mark.aws_validated1720 @pytest.mark.skip_snapshot_verify(paths=["$..Prefix", "$..ContentLanguage", "$..VersionId"])1721 def test_s3_put_more_than_1000_items(self, s3_client, s3_create_bucket, snapshot):1722 snapshot.add_transformer(snapshot.transform.s3_api())1723 bucket_name = "test" + short_uid()1724 s3_create_bucket(Bucket=bucket_name)1725 for i in range(0, 1010, 1):1726 body = "test-" + str(i)1727 key = "test-key-" + str(i)1728 s3_client.put_object(Bucket=bucket_name, Key=key, Body=body)1729 # trying to get the last item of 1010 items added.1730 resp = s3_client.get_object(Bucket=bucket_name, Key="test-key-1009")1731 snapshot.match("get_object-1009", resp)1732 # trying to get the first item of 1010 items added.1733 resp = s3_client.get_object(Bucket=bucket_name, Key="test-key-0")1734 snapshot.match("get_object-0", resp)1735 # according docs for MaxKeys: the response might contain fewer keys but will never contain more.1736 # AWS returns less during testing1737 resp = s3_client.list_objects(Bucket=bucket_name, MaxKeys=1010)1738 assert 1010 >= len(resp["Contents"])1739 resp = s3_client.list_objects(Bucket=bucket_name, Delimiter="/")1740 assert 1000 == len(resp["Contents"])1741 # way too much content, remove it from this match1742 snapshot.add_transformer(1743 snapshot.transform.jsonpath(1744 "$..list-objects.Contents", "<content>", reference_replacement=False1745 )1746 )1747 snapshot.match("list-objects", resp)1748 next_marker = resp["NextMarker"]1749 # Second list1750 resp = s3_client.list_objects(Bucket=bucket_name, Marker=next_marker)1751 snapshot.match("list-objects-next_marker", resp)1752 assert 10 == len(resp["Contents"])1753 @pytest.mark.aws_validated1754 @pytest.mark.skip_snapshot_verify(paths=["$..AcceptRanges"])1755 def test_upload_big_file(self, s3_client, s3_create_bucket, snapshot):1756 snapshot.add_transformer(snapshot.transform.s3_api())1757 bucket_name = f"bucket-{short_uid()}"1758 key1 = "test_key1"1759 key2 = "test_key1"1760 s3_create_bucket(Bucket=bucket_name)1761 body1 = "\x01" * 100000001762 rs = s3_client.put_object(Bucket=bucket_name, Key=key1, Body=body1)1763 snapshot.match("put_object_key1", rs)1764 body2 = "a" * 100000001765 rs = s3_client.put_object(Bucket=bucket_name, Key=key2, Body=body2)1766 snapshot.match("put_object_key2", rs)1767 rs = s3_client.head_object(Bucket=bucket_name, Key=key1)1768 snapshot.match("head_object_key1", rs)1769 rs = s3_client.head_object(Bucket=bucket_name, Key=key2)1770 snapshot.match("head_object_key2", rs)1771 @pytest.mark.aws_validated1772 @pytest.mark.skip_snapshot_verify(1773 paths=["$..Delimiter", "$..EncodingType", "$..VersionIdMarker"]1774 )1775 def test_get_bucket_versioning_order(self, s3_client, s3_create_bucket, snapshot):1776 snapshot.add_transformer(snapshot.transform.s3_api())1777 bucket_name = f"bucket-{short_uid()}"1778 s3_create_bucket(Bucket=bucket_name)1779 rs = s3_client.list_object_versions(Bucket=bucket_name, EncodingType="url")1780 snapshot.match("list_object_versions_before", rs)1781 rs = s3_client.put_bucket_versioning(1782 Bucket=bucket_name, VersioningConfiguration={"Status": "Enabled"}1783 )1784 snapshot.match("put_bucket_versioning", rs)1785 rs = s3_client.get_bucket_versioning(Bucket=bucket_name)1786 snapshot.match("get_bucket_versioning", rs)1787 s3_client.put_object(Bucket=bucket_name, Key="test", Body="body")1788 s3_client.put_object(Bucket=bucket_name, Key="test", Body="body")1789 s3_client.put_object(Bucket=bucket_name, Key="test2", Body="body")1790 rs = s3_client.list_object_versions(1791 Bucket=bucket_name,1792 )1793 snapshot.match("list_object_versions", rs)1794 @pytest.mark.aws_validated1795 @pytest.mark.skip_snapshot_verify(paths=["$..ContentLanguage", "$..VersionId"])1796 def test_etag_on_get_object_call(self, s3_client, s3_create_bucket, snapshot):1797 snapshot.add_transformer(snapshot.transform.s3_api())1798 bucket_name = f"bucket-{short_uid()}"1799 object_key = "my-key"1800 s3_create_bucket(Bucket=bucket_name)1801 body = "Lorem ipsum dolor sit amet, ... " * 301802 rs = s3_client.put_object(Bucket=bucket_name, Key=object_key, Body=body)1803 rs = s3_client.get_object(Bucket=bucket_name, Key=object_key)1804 snapshot.match("get_object", rs)1805 range = 171806 rs = s3_client.get_object(1807 Bucket=bucket_name,1808 Key=object_key,1809 Range=f"bytes=0-{range-1}",1810 )1811 snapshot.match("get_object_range", rs)1812 @pytest.mark.aws_validated1813 @pytest.mark.skip_snapshot_verify(1814 paths=["$..Delimiter", "$..EncodingType", "$..VersionIdMarker"]1815 )1816 def test_s3_delete_object_with_version_id(self, s3_client, s3_create_bucket, snapshot):1817 snapshot.add_transformer(snapshot.transform.s3_api())1818 bucket_name = f"bucket-{short_uid()}"1819 test_1st_key = "aws/s3/testkey1.txt"1820 test_2nd_key = "aws/s3/testkey2.txt"1821 body = "Lorem ipsum dolor sit amet, ... " * 301822 s3_create_bucket(Bucket=bucket_name)1823 s3_client.put_bucket_versioning(1824 Bucket=bucket_name,1825 VersioningConfiguration={"Status": "Enabled"},1826 )1827 rs = s3_client.get_bucket_versioning(Bucket=bucket_name)1828 snapshot.match("get_bucket_versioning", rs)1829 # put 2 objects1830 rs = s3_client.put_object(Bucket=bucket_name, Key=test_1st_key, Body=body)1831 s3_client.put_object(Bucket=bucket_name, Key=test_2nd_key, Body=body)1832 version_id = rs["VersionId"]1833 # delete 1st object with version1834 rs = s3_client.delete_objects(1835 Bucket=bucket_name,1836 Delete={"Objects": [{"Key": test_1st_key, "VersionId": version_id}]},1837 )1838 deleted = rs["Deleted"][0]1839 assert test_1st_key == deleted["Key"]1840 assert version_id == deleted["VersionId"]1841 snapshot.match("delete_objects", rs)1842 rs = s3_client.list_object_versions(Bucket=bucket_name)1843 object_versions = [object["VersionId"] for object in rs["Versions"]]1844 snapshot.match("list_object_versions_after_delete", rs)1845 assert version_id not in object_versions1846 # disable versioning1847 s3_client.put_bucket_versioning(1848 Bucket=bucket_name,1849 VersioningConfiguration={"Status": "Suspended"},1850 )1851 rs = s3_client.get_bucket_versioning(Bucket=bucket_name)1852 snapshot.match("get_bucket_versioning_suspended", rs)1853 @pytest.mark.aws_validated1854 def test_s3_static_website_index(self, s3_client, s3_create_bucket):1855 bucket_name = f"bucket-{short_uid()}"1856 s3_create_bucket(Bucket=bucket_name, ACL="public-read")1857 s3_client.put_object(1858 Bucket=bucket_name,1859 Key="index.html",1860 Body="index",1861 ContentType="text/html",1862 ACL="public-read",1863 )1864 s3_client.put_bucket_website(1865 Bucket=bucket_name,1866 WebsiteConfiguration={1867 "IndexDocument": {"Suffix": "index.html"},1868 },1869 )1870 url = _website_bucket_url(bucket_name)1871 response = requests.get(url, verify=False)1872 assert 200 == response.status_code1873 assert "index" == response.text1874 @pytest.mark.aws_validated1875 def test_s3_static_website_hosting(self, s3_client, s3_create_bucket):1876 bucket_name = f"bucket-{short_uid()}"1877 s3_create_bucket(Bucket=bucket_name, ACL="public-read")1878 index_obj = s3_client.put_object(1879 Bucket=bucket_name,1880 Key="test/index.html",1881 Body="index",1882 ContentType="text/html",1883 ACL="public-read",1884 )1885 error_obj = s3_client.put_object(1886 Bucket=bucket_name,1887 Key="test/error.html",1888 Body="error",1889 ContentType="text/html",1890 ACL="public-read",1891 )1892 actual_key_obj = s3_client.put_object(1893 Bucket=bucket_name,1894 Key="actual/key.html",1895 Body="key",1896 ContentType="text/html",1897 ACL="public-read",1898 )1899 with_content_type_obj = s3_client.put_object(1900 Bucket=bucket_name,1901 Key="with-content-type/key.js",1902 Body="some js",1903 ContentType="application/javascript; charset=utf-8",1904 ACL="public-read",1905 )1906 s3_client.put_object(1907 Bucket=bucket_name,1908 Key="to-be-redirected.html",1909 WebsiteRedirectLocation="/actual/key.html",1910 ACL="public-read",1911 )1912 s3_client.put_bucket_website(1913 Bucket=bucket_name,1914 WebsiteConfiguration={1915 "IndexDocument": {"Suffix": "index.html"},1916 "ErrorDocument": {"Key": "test/error.html"},1917 },1918 )1919 website_url = _website_bucket_url(bucket_name)1920 # actual key1921 url = f"{website_url}/actual/key.html"1922 response = requests.get(url, verify=False)1923 assert 200 == response.status_code1924 assert "key" == response.text1925 assert "content-type" in response.headers1926 assert "text/html" == response.headers["content-type"]1927 assert "etag" in response.headers1928 assert actual_key_obj["ETag"] in response.headers["etag"]1929 # If-None-Match and Etag1930 response = requests.get(1931 url, headers={"If-None-Match": actual_key_obj["ETag"]}, verify=False1932 )1933 assert 304 == response.status_code1934 # key with specified content-type1935 url = f"{website_url}/with-content-type/key.js"1936 response = requests.get(url, verify=False)1937 assert 200 == response.status_code1938 assert "some js" == response.text1939 assert "content-type" in response.headers1940 assert "application/javascript; charset=utf-8" == response.headers["content-type"]1941 assert "etag" in response.headers1942 assert with_content_type_obj["ETag"] == response.headers["etag"]1943 # index document1944 url = f"{website_url}/test"1945 response = requests.get(url, verify=False)1946 assert 200 == response.status_code1947 assert "index" == response.text1948 assert "content-type" in response.headers1949 assert "text/html" in response.headers["content-type"]1950 assert "etag" in response.headers1951 assert index_obj["ETag"] == response.headers["etag"]1952 # root path test1953 url = f"{website_url}/"1954 response = requests.get(url, verify=False)1955 assert 404 == response.status_code1956 assert "error" == response.text1957 assert "content-type" in response.headers1958 assert "text/html" in response.headers["content-type"]1959 assert "etag" in response.headers1960 assert error_obj["ETag"] == response.headers["etag"]1961 # error document1962 url = f"{website_url}/something"1963 assert 404 == response.status_code1964 assert "error" == response.text1965 assert "content-type" in response.headers1966 assert "text/html" in response.headers["content-type"]1967 assert "etag" in response.headers1968 assert error_obj["ETag"] == response.headers["etag"]1969 # redirect object1970 url = f"{website_url}/to-be-redirected.html"1971 response = requests.get(url, verify=False, allow_redirects=False)1972 assert 301 == response.status_code1973 assert "location" in response.headers1974 assert "actual/key.html" in response.headers["location"]1975 response = requests.get(url, verify=False)1976 assert 200 == response.status_code1977 assert actual_key_obj["ETag"] == response.headers["etag"]1978class TestS3Cors:1979 @pytest.mark.aws_validated1980 # TODO x-amzn-requestid should be 'x-amz-request-id'1981 # TODO "Vary" contains more in AWS, other params are added additional in LocalStack1982 @pytest.mark.skip_snapshot_verify(1983 paths=[1984 "$..Access-Control-Allow-Headers",1985 "$..Connection",1986 "$..Location",1987 "$..Vary",1988 "$..Content-Type",1989 "$..x-amzn-requestid",1990 "$..last-modified",1991 "$..Last-Modified",1992 ]1993 )1994 def test_cors_with_allowed_origins(self, s3_client, s3_create_bucket, snapshot, monkeypatch):1995 monkeypatch.setattr(config, "DISABLE_CUSTOM_CORS_S3", False)1996 snapshot.add_transformer(self._get_cors_result_header_snapshot_transformer(snapshot))1997 bucket_cors_config = {1998 "CORSRules": [1999 {2000 "AllowedOrigins": ["https://localhost:4200"],2001 "AllowedMethods": ["GET", "PUT"],2002 "MaxAgeSeconds": 3000,2003 "AllowedHeaders": ["*"],2004 }2005 ]2006 }2007 bucket_name = f"bucket-{short_uid()}"2008 object_key = "424f6bae-c48f-42d8-9e25-52046aecc64d/document.pdf"2009 s3_create_bucket(Bucket=bucket_name)2010 s3_client.put_bucket_cors(Bucket=bucket_name, CORSConfiguration=bucket_cors_config)2011 # create signed url2012 url = s3_client.generate_presigned_url(2013 ClientMethod="put_object",2014 Params={2015 "Bucket": bucket_name,2016 "Key": object_key,2017 "ContentType": "application/pdf",2018 "ACL": "bucket-owner-full-control",2019 },2020 ExpiresIn=3600,2021 )2022 result = requests.put(2023 url,2024 data="something",2025 verify=False,2026 headers={2027 "Origin": "https://localhost:4200",2028 "Content-Type": "application/pdf",2029 },2030 )2031 assert result.status_code == 2002032 # result.headers is type CaseInsensitiveDict and needs to be converted first2033 snapshot.match("raw-response-headers", dict(result.headers))2034 bucket_cors_config = {2035 "CORSRules": [2036 {2037 "AllowedOrigins": [2038 "https://localhost:4200",2039 "https://localhost:4201",2040 ],2041 "AllowedMethods": ["GET", "PUT"],2042 "MaxAgeSeconds": 3000,2043 "AllowedHeaders": ["*"],2044 }2045 ]2046 }2047 s3_client.put_bucket_cors(Bucket=bucket_name, CORSConfiguration=bucket_cors_config)2048 # create signed url2049 url = s3_client.generate_presigned_url(2050 ClientMethod="put_object",2051 Params={2052 "Bucket": bucket_name,2053 "Key": object_key,2054 "ContentType": "application/pdf",2055 "ACL": "bucket-owner-full-control",2056 },2057 ExpiresIn=3600,2058 )2059 # mimic chrome behavior, sending OPTIONS request first for strict-origin-when-cross-origin2060 result = requests.options(2061 url,2062 headers={2063 "Origin": "https://localhost:4200",2064 "Access-Control-Request-Method": "PUT",2065 },2066 )2067 snapshot.match("raw-response-headers-2", dict(result.headers))2068 result = requests.put(2069 url,2070 data="something",2071 verify=False,2072 headers={2073 "Origin": "https://localhost:4200",2074 "Content-Type": "application/pdf",2075 },2076 )2077 assert result.status_code == 2002078 snapshot.match("raw-response-headers-3", dict(result.headers))2079 result = requests.put(2080 url,2081 data="something",2082 verify=False,2083 headers={2084 "Origin": "https://localhost:4201",2085 "Content-Type": "application/pdf",2086 },2087 )2088 assert result.status_code == 2002089 snapshot.match("raw-response-headers-4", dict(result.headers))2090 @pytest.mark.aws_validated2091 @pytest.mark.skip_snapshot_verify(2092 paths=[2093 "$..Access-Control-Allow-Headers",2094 "$..Connection",2095 "$..Location",2096 "$..Vary",2097 "$..Content-Type",2098 "$..x-amzn-requestid",2099 "$..last-modified",2100 "$..accept-ranges",2101 "$..content-language",2102 "$..content-md5",2103 "$..content-type",2104 "$..x-amz-version-id",2105 "$..Last-Modified",2106 "$..Accept-Ranges",2107 "$..raw-response-headers-2.Access-Control-Allow-Credentials",2108 ]2109 )2110 def test_cors_configurations(self, s3_client, s3_create_bucket, monkeypatch, snapshot):2111 monkeypatch.setattr(config, "DISABLE_CUSTOM_CORS_S3", False)2112 snapshot.add_transformer(self._get_cors_result_header_snapshot_transformer(snapshot))2113 bucket = f"test-cors-{short_uid()}"2114 object_key = "index.html"2115 url = "{}/{}".format(_bucket_url(bucket), object_key)2116 BUCKET_CORS_CONFIG = {2117 "CORSRules": [2118 {2119 "AllowedOrigins": [config.get_edge_url()],2120 "AllowedMethods": ["GET", "PUT"],2121 "MaxAgeSeconds": 3000,2122 "AllowedHeaders": ["x-amz-tagging"],2123 }2124 ]2125 }2126 s3_create_bucket(Bucket=bucket, ACL="public-read")2127 s3_client.put_bucket_cors(Bucket=bucket, CORSConfiguration=BUCKET_CORS_CONFIG)2128 s3_client.put_object(2129 Bucket=bucket, Key=object_key, Body="<h1>Index</html>", ACL="public-read"2130 )2131 response = requests.get(2132 url, headers={"Origin": config.get_edge_url(), "Content-Type": "text/html"}2133 )2134 assert 200 == response.status_code2135 snapshot.match("raw-response-headers", dict(response.headers))2136 BUCKET_CORS_CONFIG = {2137 "CORSRules": [2138 {2139 "AllowedOrigins": ["https://anydomain.com"],2140 "AllowedMethods": ["GET", "PUT"],2141 "MaxAgeSeconds": 3000,2142 "AllowedHeaders": ["x-amz-tagging"],2143 }2144 ]2145 }2146 s3_client.put_bucket_cors(Bucket=bucket, CORSConfiguration=BUCKET_CORS_CONFIG)2147 response = requests.get(2148 url, headers={"Origin": config.get_edge_url(), "Content-Type": "text/html"}2149 )2150 assert 200 == response.status_code2151 snapshot.match("raw-response-headers-2", dict(response.headers))2152 def _get_cors_result_header_snapshot_transformer(self, snapshot):2153 return [2154 snapshot.transform.key_value("x-amz-id-2", "<id>", reference_replacement=False),2155 snapshot.transform.key_value(2156 "x-amz-request-id", "<request-id>", reference_replacement=False2157 ),2158 snapshot.transform.key_value("Date", "<date>", reference_replacement=False),2159 snapshot.transform.key_value("Server", "<server>", reference_replacement=False),2160 snapshot.transform.key_value("Last-Modified", "<date>", reference_replacement=False),2161 ]2162 @pytest.mark.parametrize(2163 "signature_version, use_virtual_address",2164 [2165 ("s3", False),2166 ("s3", True),2167 ("s3v4", False),2168 ("s3v4", True),2169 ],2170 )2171 @pytest.mark.aws_validated2172 def test_presigned_url_signature_authentication_multi_part(2173 self,2174 s3_client,2175 s3_create_bucket,2176 signature_version,2177 use_virtual_address,2178 monkeypatch,2179 ):2180 monkeypatch.setattr(config, "S3_SKIP_SIGNATURE_VALIDATION", False)2181 bucket_name = f"presign-{short_uid()}"2182 s3_endpoint_path_style = _endpoint_url()2183 s3_create_bucket(Bucket=bucket_name)2184 object_key = "temp.txt"2185 s3_config = {"addressing_style": "virtual"} if use_virtual_address else {}2186 client = _s3_client_custom_config(2187 Config(signature_version=signature_version, s3=s3_config),2188 endpoint_url=s3_endpoint_path_style,2189 )2190 upload_id = client.create_multipart_upload(2191 Bucket=bucket_name,2192 Key=object_key,2193 )["UploadId"]2194 data = to_bytes("hello this is a upload test")2195 upload_file_object = BytesIO(data)2196 signed_url = _generate_presigned_url(2197 client,2198 {2199 "Bucket": bucket_name,2200 "Key": object_key,2201 "UploadId": upload_id,2202 "PartNumber": 1,2203 },2204 expires=4,2205 client_method="upload_part",2206 )2207 response = requests.put(signed_url, data=upload_file_object)2208 assert response.status_code == 2002209 multipart_upload_parts = [{"ETag": response.headers["ETag"], "PartNumber": 1}]2210 response = client.complete_multipart_upload(2211 Bucket=bucket_name,2212 Key=object_key,2213 MultipartUpload={"Parts": multipart_upload_parts},2214 UploadId=upload_id,2215 )2216 assert 200 == response["ResponseMetadata"]["HTTPStatusCode"]2217 simple_params = {"Bucket": bucket_name, "Key": object_key}2218 response = requests.get(_generate_presigned_url(client, simple_params, 4))2219 assert 200 == response.status_code2220 assert response.content == data2221 @pytest.mark.parametrize(2222 "signature_version, use_virtual_address",2223 [2224 ("s3", False),2225 ("s3", True),2226 ("s3v4", False),2227 ("s3v4", True),2228 ],2229 )2230 @pytest.mark.aws_validated2231 def test_presigned_url_signature_authentication_expired(2232 self,2233 s3_client,2234 s3_create_bucket,2235 signature_version,2236 use_virtual_address,2237 monkeypatch,2238 ):2239 monkeypatch.setattr(config, "S3_SKIP_SIGNATURE_VALIDATION", False)2240 bucket_name = f"presign-{short_uid()}"2241 s3_endpoint_path_style = _endpoint_url()2242 s3_create_bucket(Bucket=bucket_name)2243 object_key = "temp.txt"2244 s3_client.put_object(Key=object_key, Bucket=bucket_name, Body="123")2245 s3_config = {"addressing_style": "virtual"} if use_virtual_address else {}2246 client = _s3_client_custom_config(2247 Config(signature_version=signature_version, s3=s3_config),2248 endpoint_url=s3_endpoint_path_style,2249 )2250 url = _generate_presigned_url(client, {"Bucket": bucket_name, "Key": object_key}, expires=1)2251 time.sleep(1)2252 assert 403 == requests.get(url).status_code2253 @pytest.mark.parametrize(2254 "signature_version, use_virtual_address",2255 [2256 ("s3", False),2257 ("s3", True),2258 ("s3v4", False),2259 ("s3v4", True),2260 ],2261 )2262 @pytest.mark.aws_validated2263 def test_presigned_url_signature_authentication(2264 self,2265 s3_client,2266 s3_create_bucket,2267 signature_version,2268 use_virtual_address,2269 monkeypatch,2270 ):2271 monkeypatch.setattr(config, "S3_SKIP_SIGNATURE_VALIDATION", False)2272 bucket_name = f"presign-{short_uid()}"2273 s3_endpoint_path_style = _endpoint_url()2274 s3_url = _bucket_url_vhost(bucket_name) if use_virtual_address else _bucket_url(bucket_name)2275 s3_create_bucket(Bucket=bucket_name)2276 object_key = "temp.txt"2277 s3_client.put_object(Key=object_key, Bucket=bucket_name, Body="123")2278 s3_config = {"addressing_style": "virtual"} if use_virtual_address else {}2279 client = _s3_client_custom_config(2280 Config(signature_version=signature_version, s3=s3_config),2281 endpoint_url=s3_endpoint_path_style,2282 )2283 expires = 42284 # GET requests2285 simple_params = {"Bucket": bucket_name, "Key": object_key}2286 response = requests.get(_generate_presigned_url(client, simple_params, expires))2287 assert 200 == response.status_code2288 assert response.content == b"123"2289 params = {2290 "Bucket": bucket_name,2291 "Key": object_key,2292 "ResponseContentType": "text/plain",2293 "ResponseContentDisposition": "attachment; filename=test.txt",2294 }2295 presigned = _generate_presigned_url(client, params, expires)2296 response = requests.get(_generate_presigned_url(client, params, expires))2297 assert 200 == response.status_code2298 assert response.content == b"123"2299 object_data = "this should be found in when you download {}.".format(object_key)2300 # invalid requests2301 # TODO check how much sense it makes to make this url "invalid"...2302 assert (2303 4032304 == requests.get(2305 _make_url_invalid(s3_url, object_key, presigned),2306 data=object_data,2307 headers={"Content-Type": "my-fake-content/type"},2308 ).status_code2309 )2310 # put object valid2311 assert (2312 2002313 == requests.put(2314 _generate_presigned_url(client, simple_params, expires, client_method="put_object"),2315 data=object_data,2316 ).status_code2317 )2318 params = {2319 "Bucket": bucket_name,2320 "Key": object_key,2321 "ContentType": "text/plain",2322 }2323 presigned_put_url = _generate_presigned_url(2324 client, params, expires, client_method="put_object"2325 )2326 assert (2327 2002328 == requests.put(2329 presigned_put_url,2330 data=object_data,2331 headers={"Content-Type": "text/plain"},2332 ).status_code2333 )2334 # Invalid request2335 response = requests.put(2336 _make_url_invalid(s3_url, object_key, presigned_put_url),2337 data=object_data,2338 headers={"Content-Type": "my-fake-content/type"},2339 )2340 assert 403 == response.status_code2341 # DELETE requests2342 presigned_delete_url = _generate_presigned_url(2343 client, simple_params, expires, client_method="delete_object"2344 )2345 response = requests.delete(presigned_delete_url)2346 assert 204 == response.status_code2347class TestS3DeepArchive:2348 """2349 Test to cover DEEP_ARCHIVE Storage Class functionality.2350 """2351 @pytest.mark.aws_validated2352 def test_storage_class_deep_archive(self, s3_client, s3_resource, s3_bucket, tmpdir):2353 key = "my-key"2354 transfer_config = TransferConfig(multipart_threshold=5 * KB, multipart_chunksize=1 * KB)2355 def upload_file(size_in_kb: int):2356 file = tmpdir / f"test-file-{short_uid()}.bin"2357 data = b"1" * (size_in_kb * KB)2358 file.write(data=data, mode="w")2359 s3_client.upload_file(2360 Bucket=s3_bucket,2361 Key=key,2362 Filename=str(file.realpath()),2363 ExtraArgs={"StorageClass": "DEEP_ARCHIVE"},2364 Config=transfer_config,2365 )2366 upload_file(1)2367 upload_file(9)2368 upload_file(15)2369 objects = s3_resource.Bucket(s3_bucket).objects.all()2370 keys = []2371 for obj in objects:2372 keys.append(obj.key)2373 assert obj.storage_class == "DEEP_ARCHIVE"2374def _anon_client(service: str):2375 conf = Config(signature_version=UNSIGNED)2376 if os.environ.get("TEST_TARGET") == "AWS_CLOUD":2377 return boto3.client(service, config=conf, region_name=None)2378 return aws_stack.create_external_boto_client(service, config=conf)2379def _s3_client_custom_config(conf: Config, endpoint_url: str):2380 if os.environ.get("TEST_TARGET") == "AWS_CLOUD":2381 return boto3.client("s3", config=conf, endpoint_url=endpoint_url)2382 # TODO in future this should work with aws_stack.create_external_boto_client2383 # currently it doesn't as authenticate_presign_url_signv2 requires the secret_key to be 'test'2384 # return aws_stack.create_external_boto_client(2385 # "s3",2386 # config=conf,2387 # endpoint_url=endpoint_url,2388 # aws_access_key_id=TEST_AWS_ACCESS_KEY_ID,2389 # )2390 return boto3.client(2391 "s3",2392 endpoint_url=endpoint_url,2393 config=conf,2394 aws_access_key_id=TEST_AWS_ACCESS_KEY_ID,2395 aws_secret_access_key=TEST_AWS_SECRET_ACCESS_KEY,2396 )2397def _endpoint_url(region: str = "", localstack_host: str = None) -> str:2398 if not region:2399 region = config.DEFAULT_REGION2400 if os.environ.get("TEST_TARGET") == "AWS_CLOUD":2401 if region == "us-east-1":2402 return "https://s3.amazonaws.com"2403 else:2404 return f"http://s3.{region}.amazonaws.com"2405 return f"{config.get_edge_url(localstack_hostname=localstack_host or S3_VIRTUAL_HOSTNAME)}"2406def _bucket_url(bucket_name: str, region: str = "", localstack_host: str = None) -> str:2407 return f"{_endpoint_url(region, localstack_host)}/{bucket_name}"2408def _website_bucket_url(bucket_name: str):2409 # TODO depending on region the syntax of the website variy (dot vs dash before region)2410 if os.environ.get("TEST_TARGET") == "AWS_CLOUD":2411 region = config.DEFAULT_REGION2412 return f"http://{bucket_name}.s3-website-{region}.amazonaws.com"2413 return _bucket_url_vhost(bucket_name, localstack_host=constants.S3_STATIC_WEBSITE_HOSTNAME)2414def _bucket_url_vhost(bucket_name: str, region: str = "", localstack_host: str = None) -> str:2415 if not region:2416 region = config.DEFAULT_REGION2417 if os.environ.get("TEST_TARGET") == "AWS_CLOUD":2418 if region == "us-east-1":2419 return f"https://{bucket_name}.s3.amazonaws.com"2420 else:2421 return f"https://{bucket_name}.s3.{region}.amazonaws.com"2422 host = localstack_host or S3_VIRTUAL_HOSTNAME2423 s3_edge_url = config.get_edge_url(localstack_hostname=host)2424 # TODO might add the region here2425 return s3_edge_url.replace(f"://{host}", f"://{bucket_name}.{host}")2426def _generate_presigned_url(2427 client: "S3Client", params: dict, expires: int, client_method: str = "get_object"2428) -> str:2429 return client.generate_presigned_url(2430 client_method,2431 Params=params,2432 ExpiresIn=expires,2433 )2434def _make_url_invalid(url_prefix: str, object_key: str, url: str) -> str:2435 parsed = urlparse(url)2436 query_params = parse_qs(parsed.query)2437 if "Signature" in query_params:2438 # v2 style2439 return "{}/{}?AWSAccessKeyId={}&Signature={}&Expires={}".format(2440 url_prefix,2441 object_key,2442 query_params["AWSAccessKeyId"][0],2443 query_params["Signature"][0],2444 query_params["Expires"][0],2445 )2446 else:2447 # v4 style2448 return (2449 "{}/{}?X-Amz-Algorithm=AWS4-HMAC-SHA256&"2450 "X-Amz-Credential={}&X-Amz-Date={}&"2451 "X-Amz-Expires={}&X-Amz-SignedHeaders=host&"2452 "X-Amz-Signature={}"2453 ).format(2454 url_prefix,2455 object_key,2456 quote(query_params["X-Amz-Credential"][0]).replace("/", "%2F"),2457 query_params["X-Amz-Date"][0],2458 query_params["X-Amz-Expires"][0],2459 query_params["X-Amz-Signature"][0],...
fixtures.py
Source:fixtures.py
...174 )175 boto_config = boto_config.merge(external_boto_config)176 return aws_stack.create_external_boto_client("s3", config=boto_config)177@pytest.fixture(scope="class")178def s3_presigned_client() -> "S3Client":179 if os.environ.get("TEST_TARGET") == "AWS_CLOUD":180 return _client("s3")181 # can't set the timeouts to 0 like in the AWS CLI because the underlying http client requires values > 0182 boto_config = (183 botocore.config.Config(184 connect_timeout=1_000, read_timeout=1_000, retries={"total_max_attempts": 1}185 )186 if os.environ.get("TEST_DISABLE_RETRIES_AND_TIMEOUTS")187 else None188 )189 return aws_stack.connect_to_service(190 "s3",191 config=boto_config,192 aws_access_key_id=TEST_AWS_ACCESS_KEY_ID,...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!