Best Python code snippet using localstack_python
Source: s3_listener.py
...
    if c.startswith(xml_prefix):
        response._content = re.compile(search).sub(replace, c)

def fix_delimiter(response):
    replace_in_xml_response(response, "<Delimiter>None<", "<Delimiter><")

def fix_xml_preamble_newline(method, path, headers, response):
    # some tools (Serverless) require a newline after the "<?xml ...>\n" preamble line, e.g., for LocationConstraint
    # this is required because upstream moto is generally collapsing all S3 XML responses:
    # https://github.com/spulec/moto/blob/3718cde444b3e0117072c29b087237e1787c3a66/moto/core/responses.py#L102-L104
    if is_object_download_request(method, path, headers):
        return
    replace_in_xml_response(response, r"(<\?xml [^>]+>)<", r"\1\n<")

def convert_to_chunked_encoding(method, path, response):
    if method != "GET" or path != "/":
        return
    if response.headers.get("Transfer-Encoding", "").lower() == "chunked":
        return
    response.headers["Transfer-Encoding"] = "chunked"
    response.headers.pop("Content-Encoding", None)
    response.headers.pop("Content-Length", None)

def strip_surrounding_quotes(s):
    if (s[0], s[-1]) in (('"', '"'), ("'", "'")):
        return s[1:-1]
    return s

def ret304_on_etag(data, headers, response):
    etag = response.headers.get("ETag")
    if etag:
        match = headers.get("If-None-Match")
        if match and strip_surrounding_quotes(match) == strip_surrounding_quotes(etag):
            response.status_code = 304
            response._content = ""

def remove_xml_preamble(response):
    """Removes <?xml ... ?> from a response content"""
    response._content = re.sub(r"^<\?[^\?]+\?>", "", to_str(response._content))

# --------------
# HELPER METHODS
# for lifecycle/replication/...
# --------------

def get_lifecycle(bucket_name):
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    lifecycle = BUCKET_LIFECYCLE.get(bucket_name)
    status_code = 200
    if not lifecycle:
        lifecycle = {
            "Error": {
                "Code": "NoSuchLifecycleConfiguration",
                "Message": "The lifecycle configuration does not exist",
                "BucketName": bucket_name,
            }
        }
        status_code = 404
    body = xmltodict.unparse(lifecycle)
    return xml_response(body, status_code=status_code)

def get_replication(bucket_name):
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    replication = BUCKET_REPLICATIONS.get(bucket_name)
    status_code = 200
    if not replication:
        replication = {
            "Error": {
                "Code": "ReplicationConfigurationNotFoundError",
                "Message": "The replication configuration was not found",
                "BucketName": bucket_name,
            }
        }
        status_code = 404
    body = xmltodict.unparse(replication)
    return xml_response(body, status_code=status_code)

def set_lifecycle(bucket_name, lifecycle):
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    if isinstance(to_str(lifecycle), str):
        lifecycle = xmltodict.parse(lifecycle)
    BUCKET_LIFECYCLE[bucket_name] = lifecycle
    return 200

def delete_lifecycle(bucket_name):
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    if BUCKET_LIFECYCLE.get(bucket_name):
        BUCKET_LIFECYCLE.pop(bucket_name)
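# --- illustrative sketch, not part of s3_listener.py ---
# Shows how an error dict like the one built in get_lifecycle() above serializes
# to the XML body returned to the client. Assumes the xmltodict package is
# installed; "my-bucket" is a made-up bucket name.
import xmltodict

error_doc = {
    "Error": {
        "Code": "NoSuchLifecycleConfiguration",
        "Message": "The lifecycle configuration does not exist",
        "BucketName": "my-bucket",
    }
}
print(xmltodict.unparse(error_doc, full_document=False))
# <Error><Code>NoSuchLifecycleConfiguration</Code><Message>The lifecycle
# configuration does not exist</Message><BucketName>my-bucket</BucketName></Error>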
def set_replication(bucket_name, replication):
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    if isinstance(to_str(replication), str):
        replication = xmltodict.parse(replication)
    BUCKET_REPLICATIONS[bucket_name] = replication
    return 200

# -------------
# UTIL METHODS
# -------------

def is_bucket_available(bucket_name):
    body = {"Code": "200"}
    exists, code = bucket_exists(bucket_name)
    if not exists:
        body = {
            "Error": {
                "Code": code,
                "Message": "The bucket does not exist",
                "BucketName": bucket_name,
            }
        }
        return exists, code, body
    return True, 200, body

def bucket_exists(bucket_name):
    """Tests for the existence of the specified bucket. Returns the error code
    if the bucket does not exist (200 if the bucket does exist).
    """
    bucket_name = normalize_bucket_name(bucket_name)
    s3_client = aws_stack.connect_to_service("s3")
    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except ClientError as err:
        error_code = err.response.get("Error").get("Code")
        return False, error_code
    return True, 200

def strip_chunk_signatures(body, content_length):
    # borrowed from https://github.com/spulec/moto/pull/4201
    body_io = io.BytesIO(body)
    new_body = bytearray(content_length)
    pos = 0
    line = body_io.readline()
    while line:
        # https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html#sigv4-chunked-body-definition
        # str(hex(chunk-size)) + ";chunk-signature=" + signature + \r\n + chunk-data + \r\n
        chunk_size = int(line[: line.find(b";")].decode("utf8"), 16)
        new_body[pos : pos + chunk_size] = body_io.read(chunk_size)
        pos = pos + chunk_size
        body_io.read(2)  # skip trailing \r\n
        line = body_io.readline()
    return bytes(new_body)

def check_content_md5(data, headers):
    if headers.get("x-amz-content-sha256", None) == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD":
        content_length = headers.get("x-amz-decoded-content-length")
        if not content_length:
            return error_response(
                '"X-Amz-Decoded-Content-Length" header is missing',
                "SignatureDoesNotMatch",
                status_code=403,
            )
        try:
            content_length = int(content_length)
        except ValueError:
            return error_response(
                'Wrong "X-Amz-Decoded-Content-Length" header',
                "SignatureDoesNotMatch",
                status_code=403,
            )
        data = strip_chunk_signatures(data, content_length)
    actual = md5(data)
    try:
        md5_header = headers["Content-MD5"]
        if not is_base64(md5_header):
            raise Exception('Content-MD5 header is not in Base64 format: "%s"' % md5_header)
        expected = to_str(codecs.encode(base64.b64decode(md5_header), "hex"))
    except Exception:
        return error_response(
            "The Content-MD5 you specified is not valid.",
            "InvalidDigest",
            status_code=400,
        )
    if actual != expected:
        return error_response(
            "The Content-MD5 you specified did not match what we received.",
            "BadDigest",
            status_code=400,
        )
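# --- illustrative sketch, not part of s3_listener.py ---
# Demonstrates the aws-chunked wire format that strip_chunk_signatures() above
# decodes: each chunk is "<hex-size>;chunk-signature=<sig>\r\n<data>\r\n",
# terminated by a zero-length chunk. The helper below mirrors that logic with
# made-up signatures; _strip_chunks_demo is a hypothetical name used only here.
import io

def _strip_chunks_demo(body: bytes, decoded_length: int) -> bytes:
    new_body = bytearray(decoded_length)
    body_io = io.BytesIO(body)
    pos = 0
    line = body_io.readline()
    while line:
        chunk_size = int(line[: line.find(b";")].decode("utf8"), 16)
        new_body[pos : pos + chunk_size] = body_io.read(chunk_size)
        pos += chunk_size
        body_io.read(2)  # skip the trailing \r\n after the chunk data
        line = body_io.readline()
    return bytes(new_body)

raw = (
    b"5;chunk-signature=aaaa\r\nhello\r\n"
    b"6;chunk-signature=bbbb\r\n world\r\n"
    b"0;chunk-signature=cccc\r\n\r\n"
)
# 11 corresponds to the x-amz-decoded-content-length header value
assert _strip_chunks_demo(raw, 11) == b"hello world"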
def error_response(message, code, status_code=400):
    result = {"Error": {"Code": code, "Message": message}}
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)

def xml_response(content, status_code=200):
    headers = {"Content-Type": "application/xml"}
    return requests_response(content, status_code=status_code, headers=headers)

def no_such_key_error(resource, requestId=None, status_code=400):
    result = {
        "Error": {
            "Code": "NoSuchKey",
            "Message": "The resource you requested does not exist",
            "Resource": resource,
            "RequestId": requestId,
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)

def no_such_bucket(bucket_name, requestId=None, status_code=404):
    # TODO: fix the response to match AWS bucket response when the webconfig is not set and bucket not exists
    result = {
        "Error": {
            "Code": "NoSuchBucket",
            "Message": "The specified bucket does not exist",
            "BucketName": bucket_name,
            "RequestId": requestId,
            "HostId": short_uid(),
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)

def token_expired_error(resource, requestId=None, status_code=400):
    result = {
        "Error": {
            "Code": "ExpiredToken",
            "Message": "The provided token has expired.",
            "Resource": resource,
            "RequestId": requestId,
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)

def expand_redirect_url(starting_url, key, bucket):
    """Add key and bucket parameters to starting URL query string."""
    parsed = urlparse(starting_url)
    query = collections.OrderedDict(parse_qsl(parsed.query))
    query.update([("key", key), ("bucket", bucket)])
    redirect_url = urlunparse(
        (
            parsed.scheme,
            parsed.netloc,
            parsed.path,
            parsed.params,
            urlencode(query),
            None,
        )
    )
    return redirect_url

def is_bucket_specified_in_domain_name(path, headers):
    host = headers.get("host", "")
    return re.match(r".*s3(\-website)?\.([^\.]+\.)?amazonaws.com", host)

def is_object_specific_request(path, headers):
    """Return whether the given request is specific to a certain S3 object.
    Note: the bucket name is usually specified as a path parameter,
    but may also be part of the domain name!"""
    bucket_in_domain = is_bucket_specified_in_domain_name(path, headers)
    parts = len(path.split("/"))
    return parts > (1 if bucket_in_domain else 2)

def empty_response():
    response = Response()
    response.status_code = 200
    response._content = ""
    return response

def handle_notification_request(bucket, method, data):
    if method == "GET":
        return handle_get_bucket_notification(bucket)
    if method == "PUT":
        return handle_put_bucket_notification(bucket, data)
    return empty_response()

def handle_get_bucket_notification(bucket):
    response = Response()
    response.status_code = 200
    response._content = ""
    # TODO check if bucket exists
    result = '<NotificationConfiguration xmlns="%s">' % XMLNS_S3
    if bucket in S3_NOTIFICATIONS:
        notifs = S3_NOTIFICATIONS[bucket]
        for notif in notifs:
            for dest in NOTIFICATION_DESTINATION_TYPES:
                if dest in notif:
                    dest_dict = {
                        "%sConfiguration" % dest: {
                            "Id": notif["Id"],
                            dest: notif[dest],
                            "Event": notif["Event"],
                            "Filter": notif["Filter"],
                        }
                    }
                    result += xmltodict.unparse(dest_dict, full_document=False)
    result += "</NotificationConfiguration>"
    response._content = result
    return response

def _validate_filter_rules(filter_doc):
    rules = filter_doc.get("FilterRule")
    if not rules:
        return
    for rule in rules:
        name = rule.get("Name", "")
        if name.lower() not in ["suffix", "prefix"]:
            raise InvalidFilterRuleName(name)
        # TODO: check what other rules there are

def _sanitize_notification_filter_rules(filter_doc):
    rules = filter_doc.get("FilterRule")
    if not rules:
        return
    for rule in rules:
        name = rule.get("Name", "")
        if name.lower() not in ["suffix", "prefix"]:
            raise InvalidFilterRuleName(name)
        rule["Name"] = name.title()
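# --- illustrative sketch, not part of s3_listener.py ---
# Demonstrates the normalization that _sanitize_notification_filter_rules() above
# performs: only "prefix"/"suffix" rule names are accepted (case-insensitively)
# and they are stored title-cased. ValueError stands in for InvalidFilterRuleName,
# which is defined elsewhere in LocalStack and not shown in this excerpt.
filter_doc = {"FilterRule": [{"Name": "suffix", "Value": ".jpg"}]}

for rule in filter_doc["FilterRule"]:
    name = rule.get("Name", "")
    if name.lower() not in ("suffix", "prefix"):
        raise ValueError("unsupported filter rule name: %r" % name)
    rule["Name"] = name.title()

print(filter_doc)  # {'FilterRule': [{'Name': 'Suffix', 'Value': '.jpg'}]}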
def handle_put_bucket_notification(bucket, data):
    parsed = strip_xmlns(xmltodict.parse(data))
    notif_config = parsed.get("NotificationConfiguration")
    notifications = []
    for dest in NOTIFICATION_DESTINATION_TYPES:
        config = notif_config.get("%sConfiguration" % dest)
        configs = config if isinstance(config, list) else [config] if config else []
        for config in configs:
            events = config.get("Event")
            if isinstance(events, str):
                events = [events]
            event_filter = config.get("Filter", {})
            # make sure FilterRule is an array
            s3_filter = _get_s3_filter(event_filter)
            if s3_filter and not isinstance(s3_filter.get("FilterRule", []), list):
                s3_filter["FilterRule"] = [s3_filter["FilterRule"]]
            # make sure FilterRules are valid and sanitize if necessary
            _sanitize_notification_filter_rules(s3_filter)
            # create final details dict
            notification_details = {
                "Id": config.get("Id", str(uuid.uuid4())),
                "Event": events,
                dest: config.get(dest),
                "Filter": event_filter,
            }
            notifications.append(clone(notification_details))
    S3_NOTIFICATIONS[bucket] = notifications
    return empty_response()

def remove_bucket_notification(bucket):
    if bucket in S3_NOTIFICATIONS:
        del S3_NOTIFICATIONS[bucket]

class ProxyListenerS3(PersistingProxyListener):
    def api_name(self):
        return "s3"

    @staticmethod
    def is_s3_copy_request(headers, path):
        return "x-amz-copy-source" in headers or "x-amz-copy-source" in path

    @staticmethod
    def is_create_multipart_request(query):
        return query.startswith("uploads")

    @staticmethod
    def is_multipart_upload(query):
        return query.startswith("uploadId")

    @staticmethod
    def get_201_response(key, bucket_name):
        return """
        <PostResponse>
            <Location>{protocol}://{host}/{encoded_key}</Location>
            <Bucket>{bucket}</Bucket>
            <Key>{key}</Key>
            <ETag>{etag}</ETag>
        </PostResponse>
        """.format(
            protocol=get_service_protocol(),
            host=config.HOSTNAME_EXTERNAL,
            encoded_key=quote(key, safe=""),
            key=key,
            bucket=bucket_name,
            etag="d41d8cd98f00b204e9800998ecf8427f",
        )

    @staticmethod
    def _update_location(content, bucket_name):
        bucket_name = normalize_bucket_name(bucket_name)
        host = config.HOSTNAME_EXTERNAL
        if ":" not in host:
            host = f"{host}:{config.service_port('s3')}"
        return re.sub(
            r"<Location>\s*([a-zA-Z0-9\-]+)://[^/]+/([^<]+)\s*</Location>",
            r"<Location>%s://%s/%s/\2</Location>" % (get_service_protocol(), host, bucket_name),
            content,
            flags=re.MULTILINE,
        )

    @staticmethod
    def is_query_allowable(method, query):
        # Generally if there is a query (some/path/with?query) we don't want to send notifications
        if not query:
            return True
        # Except we do want to notify on multipart and presigned url upload completion
        contains_cred = "X-Amz-Credential" in query and "X-Amz-Signature" in query
        contains_key = "AWSAccessKeyId" in query and "Signature" in query
        # nodejs sdk putObjectCommand is adding x-id=putobject in the query
        allowed_query = "x-id=" in query.lower()
        if (
            (method == "POST" and query.startswith("uploadId"))
            or contains_cred
            or contains_key
            or allowed_query
        ):
            return True

    @staticmethod
    def parse_policy_expiration_date(expiration_string):
        try:
            dt = datetime.datetime.strptime(expiration_string, POLICY_EXPIRATION_FORMAT1)
        except Exception:
            dt = datetime.datetime.strptime(expiration_string, POLICY_EXPIRATION_FORMAT2)
        # both date formats assume a UTC timezone ('Z' suffix), but it's not parsed as tzinfo into the datetime object
        dt = dt.replace(tzinfo=datetime.timezone.utc)
        return dt
    def forward_request(self, method, path, data, headers):
        # Create list of query parameteres from the url
        parsed = urlparse("{}{}".format(config.get_edge_url(), path))
        query_params = parse_qs(parsed.query)
        path_orig = path
        path = path.replace(
            "#", "%23"
        )  # support key names containing hashes (e.g., required by Amplify)
        # extracting bucket name from the request
        parsed_path = urlparse(path)
        bucket_name = extract_bucket_name(headers, parsed_path.path)
        if method == "PUT" and bucket_name and not re.match(BUCKET_NAME_REGEX, bucket_name):
            if len(parsed_path.path) <= 1:
                return error_response(
                    "Unable to extract valid bucket name. Please ensure that your AWS SDK is "
                    + "configured to use path style addressing, or send a valid "
                    + '<Bucket>.s3.localhost.localstack.cloud "Host" header',
                    "InvalidBucketName",
                    status_code=400,
                )
            return error_response(
                "The specified bucket is not valid.",
                "InvalidBucketName",
                status_code=400,
            )
        # Detecting pre-sign url and checking signature
        if any(p in query_params for p in SIGNATURE_V2_PARAMS) or any(
            p in query_params for p in SIGNATURE_V4_PARAMS
        ):
            response = authenticate_presign_url(
                method=method, path=path, data=data, headers=headers
            )
            if response is not None:
                return response
        # handling s3 website hosting requests
        if is_static_website(headers) and method == "GET":
            return serve_static_website(headers=headers, path=path, bucket_name=bucket_name)
        # check content md5 hash integrity if not a copy request or multipart initialization
        if (
            "Content-MD5" in headers
            and not self.is_s3_copy_request(headers, path)
            and not self.is_create_multipart_request(parsed_path.query)
        ):
            response = check_content_md5(data, headers)
            if response is not None:
                return response
        modified_data = None
        # TODO: For some reason, moto doesn't allow us to put a location constraint on us-east-1
        to_find1 = to_bytes("<LocationConstraint>us-east-1</LocationConstraint>")
        to_find2 = to_bytes("<CreateBucketConfiguration")
        if data and data.startswith(to_bytes("<")) and to_find1 in data and to_find2 in data:
            # Note: with the latest version, <CreateBucketConfiguration> must either
            # contain a valid <LocationConstraint>, or not be present at all in the body.
            modified_data = b""
        # POST requests to S3 may include a "${filename}" placeholder in the
        # key, which should be replaced with an actual file name before storing.
        if method == "POST":
            original_data = not_none_or(modified_data, data)
            expanded_data = multipart_content.expand_multipart_filename(original_data, headers)
            if expanded_data is not original_data:
                modified_data = expanded_data
        # If no content-type is provided, 'binary/octet-stream' should be used
        # src: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html
        if method == "PUT" and not headers.get("content-type"):
            headers["content-type"] = "binary/octet-stream"
        # parse query params
        query = parsed_path.query
        path = parsed_path.path
        query_map = parse_qs(query, keep_blank_values=True)
        # remap metadata query params (not supported in moto) to request headers
        append_metadata_headers(method, query_map, headers)
        # apply fixes
        headers_changed = fix_metadata_key_underscores(request_headers=headers)
        if query == "notification" or "notification" in query_map:
            # handle and return response for ?notification request
            response = handle_notification_request(bucket_name, method, data)
            return response
        # if the Expires key in the url is already expired then return error
        if method == "GET" and "Expires" in query_map:
            ts = datetime.datetime.fromtimestamp(
                int(query_map.get("Expires")[0]), tz=datetime.timezone.utc
            )
            if is_expired(ts):
                return token_expired_error(path, headers.get("x-amz-request-id"), 400)
        # If multipart POST with policy in the params, return error if the policy has expired
        if method == "POST":
            policy_key, policy_value = multipart_content.find_multipart_key_value(
                data, headers, "policy"
            )
            if policy_key and policy_value:
                policy = json.loads(base64.b64decode(policy_value).decode("utf-8"))
                expiration_string = policy.get("expiration", None)  # Example: 2020-06-05T13:37:12Z
                if expiration_string:
                    expiration_datetime = self.parse_policy_expiration_date(expiration_string)
                    if is_expired(expiration_datetime):
                        return token_expired_error(path, headers.get("x-amz-request-id"), 400)
        if query == "cors" or "cors" in query_map:
            if method == "GET":
                return get_cors(bucket_name)
            if method == "PUT":
                return set_cors(bucket_name, data)
            if method == "DELETE":
                return delete_cors(bucket_name)
        if query == "requestPayment" or "requestPayment" in query_map:
            if method == "GET":
                return get_request_payment(bucket_name)
            if method == "PUT":
                return set_request_payment(bucket_name, data)
        if query == "lifecycle" or "lifecycle" in query_map:
            if method == "GET":
                return get_lifecycle(bucket_name)
            if method == "PUT":
                return set_lifecycle(bucket_name, data)
            if method == "DELETE":
                delete_lifecycle(bucket_name)
        if query == "replication" or "replication" in query_map:
            if method == "GET":
                return get_replication(bucket_name)
            if method == "PUT":
                return set_replication(bucket_name, data)
            if method == "DELETE" and validate_bucket_name(bucket_name):
                delete_lifecycle(bucket_name)
        path_orig_escaped = path_orig.replace("#", "%23")
        if modified_data is not None or headers_changed or path_orig != path_orig_escaped:
            data_to_return = not_none_or(modified_data, data)
            if modified_data is not None:
                headers["Content-Length"] = str(len(data_to_return or ""))
            return Request(
                url=path_orig_escaped,
                data=data_to_return,
                headers=headers,
                method=method,
            )
        return True

    def return_response(self, method, path, data, headers, response):
        path = to_str(path)
        method = to_str(method)
        path = path.replace("#", "%23")
        # persist this API call to disk
        super(ProxyListenerS3, self).return_response(method, path, data, headers, response)
        bucket_name = extract_bucket_name(headers, path)
        # POST requests to S3 may include a success_action_redirect or
        # success_action_status field, which should be used to redirect a
        # client to a new location.
        key = None
        if method == "POST":
            key, redirect_url = multipart_content.find_multipart_key_value(data, headers)
            if key and redirect_url:
                response.status_code = 303
                response.headers["Location"] = expand_redirect_url(redirect_url, key, bucket_name)
                LOGGER.debug(
                    "S3 POST {} to {}".format(response.status_code, response.headers["Location"])
                )
            expanded_data = multipart_content.expand_multipart_filename(data, headers)
            key, status_code = multipart_content.find_multipart_key_value(
                expanded_data, headers, "success_action_status"
            )
            if response.status_code == 201 and key:
                response._content = self.get_201_response(key, bucket_name)
                response.headers["Content-Length"] = str(len(response._content or ""))
                response.headers["Content-Type"] = "application/xml; charset=utf-8"
                return response
        if response.status_code == 416:
            if method == "GET":
                return error_response(
                    "The requested range cannot be satisfied.", "InvalidRange", 416
                )
            elif method == "HEAD":
                response.status_code = 200
                return response
        parsed = urlparse(path)
        bucket_name_in_host = uses_host_addressing(headers)
        should_send_notifications = all(
            [
                method in ("PUT", "POST", "DELETE"),
                "/" in path[1:] or bucket_name_in_host or key,
                # check if this is an actual put object request, because it could also be
                # a put bucket request with a path like this: /bucket_name/
                bucket_name_in_host
                or key
                or (len(path[1:].split("/")) > 1 and len(path[1:].split("/")[1]) > 0),
                self.is_query_allowable(method, parsed.query),
            ]
        )
        # get subscribers and send bucket notifications
        if should_send_notifications:
            # if we already have a good key, use it, otherwise examine the path
            if key:
                object_path = "/" + key
            elif bucket_name_in_host:
                object_path = parsed.path
            else:
                parts = parsed.path[1:].split("/", 1)
                object_path = parts[1] if parts[1][0] == "/" else "/%s" % parts[1]
            version_id = response.headers.get("x-amz-version-id", None)
            send_notifications(method, bucket_name, object_path, version_id, headers)
        # publish event for creation/deletion of buckets:
        if method in ("PUT", "DELETE") and (
            "/" not in path[1:] or len(path[1:].split("/")[1]) <= 0
        ):
            event_type = (
                event_publisher.EVENT_S3_CREATE_BUCKET
                if method == "PUT"
                else event_publisher.EVENT_S3_DELETE_BUCKET
            )
            event_publisher.fire_event(
                event_type, payload={"n": event_publisher.get_hash(bucket_name)}
            )
        # fix an upstream issue in moto S3 (see https://github.com/localstack/localstack/issues/382)
        if method == "PUT":
            if parsed.query == "policy":
                response._content = ""
                response.status_code = 204
                return response
            # when creating s3 bucket using aws s3api the return header contains 'Location' param
            if key is None:
                # if the bucket is created in 'us-east-1' the location header contains bucket as path
                # else the the header contains bucket url
                if aws_stack.get_region() == "us-east-1":
                    response.headers["Location"] = "/{}".format(bucket_name)
                else:
                    # Note: we need to set the correct protocol here
                    protocol = (
                        headers.get(constants.HEADER_LOCALSTACK_EDGE_URL, "").split("://")[0]
                        or "http"
                    )
                    response.headers["Location"] = "{}://{}.{}:{}/".format(
                        protocol,
                        bucket_name,
                        constants.S3_VIRTUAL_HOSTNAME,
                        config.EDGE_PORT,
                    )
        if response is not None:
            reset_content_length = False
            # append CORS headers and other annotations/patches to response
            append_cors_headers(
                bucket_name,
                request_method=method,
                request_headers=headers,
                response=response,
            )
            append_last_modified_headers(response=response)
            append_list_objects_marker(method, path, data, response)
            fix_range_content_type(bucket_name, path, headers, response)
            fix_delete_objects_response(bucket_name, method, parsed, data, headers, response)
            fix_metadata_key_underscores(response=response)
            fix_creation_date(method, path, response=response)
            ret304_on_etag(data, headers, response)
            append_aws_request_troubleshooting_headers(response)
            fix_delimiter(response)
            fix_xml_preamble_newline(method, path, headers, response)
            if method == "PUT":
                set_object_expiry(path, headers)
            # Remove body from PUT response on presigned URL
            # https://github.com/localstack/localstack/issues/1317
            if (
                method == "PUT"
                and int(response.status_code) < 400
                and (
                    "X-Amz-Security-Token=" in path
                    or "X-Amz-Credential=" in path
                    or "AWSAccessKeyId=" in path
                )
            ):
                response._content = ""
...
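The POST-policy expiration check in forward_request relies on parse_policy_expiration_date and on the POLICY_EXPIRATION_FORMAT1/POLICY_EXPIRATION_FORMAT2 constants, which are defined earlier in the file and not shown in this excerpt. The following is a minimal, self-contained sketch of that parsing logic, assuming the two common S3 policy timestamp formats (with and without fractional seconds); the constant names below are placeholders for illustration only.

import datetime

FMT_NO_FRACTION = "%Y-%m-%dT%H:%M:%SZ"       # e.g. 2020-06-05T13:37:12Z
FMT_WITH_FRACTION = "%Y-%m-%dT%H:%M:%S.%fZ"  # e.g. 2020-06-05T13:37:12.000Z

def parse_expiration(value: str) -> datetime.datetime:
    try:
        dt = datetime.datetime.strptime(value, FMT_NO_FRACTION)
    except ValueError:
        dt = datetime.datetime.strptime(value, FMT_WITH_FRACTION)
    # the 'Z' suffix means UTC, but strptime does not attach tzinfo, so add it explicitly
    return dt.replace(tzinfo=datetime.timezone.utc)

print(parse_expiration("2020-06-05T13:37:12Z").isoformat())  # 2020-06-05T13:37:12+00:00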