How to use append_metadata_headers method in localstack

Best Python code snippets using the localstack Python package

s3_listener.py

Source:s3_listener.py Github

copy

Full Screen

# NOTE: this excerpt begins mid-function. The fragment below belongs to a
# definition whose header lies before this chunk; it is preserved as a comment
# rather than guessed at (original lines 514-517):
#         response._content = content.replace(
#             "</ListBucketResult>", "%s</ListBucketResult>" % insert
#         )
#         response.headers.pop("Content-Length", None)


def append_metadata_headers(method, query_map, headers):
    """Copy object-metadata query params (x-amz-meta-*) into the request headers.

    Existing headers are never overwritten; only the first query value is used.
    """
    for key, value in query_map.items():
        if key.lower().startswith(OBJECT_METADATA_KEY_PREFIX):
            if headers.get(key) is None:
                headers[key] = value[0]


def fix_location_constraint(response):
    """Make sure we return a valid non-empty LocationConstraint, as this otherwise breaks Serverless."""
    try:
        content = to_str(response.content or "") or ""
    except Exception:
        content = ""
    if "LocationConstraint" in content:
        # fill an empty <LocationConstraint/> element with the current region
        pattern = r"<LocationConstraint([^>]*)>\s*</LocationConstraint>"
        replace = r"<LocationConstraint\1>%s</LocationConstraint>" % aws_stack.get_region()
        response._content = re.sub(pattern, replace, content)
        remove_xml_preamble(response)


def fix_range_content_type(bucket_name, path, headers, response):
    """Restore the real object Content-Type on Range requests.

    Fix content type for Range requests - https://github.com/localstack/localstack/issues/1259
    """
    if "Range" not in headers:
        return
    if response.status_code >= 400:
        return
    s3_client = aws_stack.connect_to_service("s3")
    path = urlparse.urlparse(urlparse.unquote(path)).path
    key_name = extract_key_name(headers, path)
    result = s3_client.head_object(Bucket=bucket_name, Key=key_name)
    content_type = result["ContentType"]
    # only replace the generic HTML content type the backend falls back to
    if response.headers.get("Content-Type") == "text/html; charset=utf-8":
        response.headers["Content-Type"] = content_type


def fix_delete_objects_response(bucket_name, method, parsed_path, data, headers, response):
    """Rewrite DeleteObjects errors for missing keys into successful deletions.

    Deleting non-existing keys should not result in errors.
    Fixes https://github.com/localstack/localstack/issues/1893
    """
    if not (method == "POST" and parsed_path.query == "delete" and "<Delete" in to_str(data or "")):
        return
    content = to_str(response._content)
    if "<Error>" not in content:
        return
    result = xmltodict.parse(content).get("DeleteResult")
    errors = result.get("Error")
    errors = errors if isinstance(errors, list) else [errors]
    deleted = result.get("Deleted")
    if not isinstance(result.get("Deleted"), list):
        deleted = result["Deleted"] = [deleted] if deleted else []
    for entry in list(errors):
        # an <Error> containing only a <Key> means "key did not exist" - treat as deleted
        if set(entry.keys()) == {"Key"}:
            errors.remove(entry)
            deleted.append(entry)
    if not errors:
        result.pop("Error")
    response._content = xmltodict.unparse({"DeleteResult": result})


def fix_metadata_key_underscores(request_headers=None, response=None):
    """Replace underscores in x-amz-meta-* header keys (and undo it on responses).

    fix for https://github.com/localstack/localstack/issues/1790

    Returns True if any request header key was rewritten.
    """
    # NOTE: the original used a mutable default argument (request_headers={}),
    # which is shared across calls; use a None sentinel instead.
    if request_headers is None:
        request_headers = {}
    underscore_replacement = "---"
    meta_header_prefix = "x-amz-meta-"
    prefix_len = len(meta_header_prefix)
    updated = False
    for key in list(request_headers.keys()):
        if key.lower().startswith(meta_header_prefix):
            key_new = meta_header_prefix + key[prefix_len:].replace("_", underscore_replacement)
            if key != key_new:
                request_headers[key_new] = request_headers.pop(key)
                updated = True
    if response is not None:
        # reverse mapping: restore underscores in response metadata headers
        for key in list(response.headers.keys()):
            if key.lower().startswith(meta_header_prefix):
                key_new = meta_header_prefix + key[prefix_len:].replace(underscore_replacement, "_")
                if key != key_new:
                    response.headers[key_new] = response.headers.pop(key)
    return updated


def fix_creation_date(method, path, response):
    """Normalize <CreationDate> timestamps in ListBuckets responses to 'Z' suffix."""
    if method != "GET" or path != "/":
        return
    response._content = re.sub(
        r"(\.[0-9]+)(\+00:00)?</CreationDate>",
        r"\1Z</CreationDate>",
        to_str(response._content),
    )


def fix_delimiter(data, headers, response):
    """Replace a literal '<Delimiter>None<' in XML responses with an empty delimiter."""
    if response.status_code == 200 and response._content:
        c, xml_prefix, delimiter = response._content, "<?xml", "<Delimiter><"
        pattern = "[<]Delimiter[>]None[<]"
        if isinstance(c, bytes):
            # keep pattern/replacement in the same type as the payload
            xml_prefix, delimiter = xml_prefix.encode(), delimiter.encode()
            pattern = pattern.encode()
        if c.startswith(xml_prefix):
            response._content = re.compile(pattern).sub(delimiter, c)


def convert_to_chunked_encoding(method, path, response):
    """Force chunked Transfer-Encoding on ListBuckets responses."""
    if method != "GET" or path != "/":
        return
    if response.headers.get("Transfer-Encoding", "").lower() == "chunked":
        return
    response.headers["Transfer-Encoding"] = "chunked"
    # Content-Encoding/Content-Length must not accompany chunked responses
    response.headers.pop("Content-Encoding", None)
    response.headers.pop("Content-Length", None)


def unquote(s):
    """Strip one matching pair of surrounding single or double quotes from s."""
    # guard len > 1: the original crashed on "" (IndexError) and turned a lone
    # quote character into an empty string
    if len(s) > 1 and (s[0], s[-1]) in (('"', '"'), ("'", "'")):
        return s[1:-1]
    return s


def ret304_on_etag(data, headers, response):
    """Return 304 Not Modified when If-None-Match matches the response ETag."""
    etag = response.headers.get("ETag")
    if etag:
        match = headers.get("If-None-Match")
        if match and unquote(match) == unquote(etag):
            response.status_code = 304
            response._content = ""


def fix_etag_for_multipart(data, headers, response):
    """Recompute the ETag for streaming-v4 uploads from the de-chunked payload.

    Fix for https://github.com/localstack/localstack/issues/1978
    """
    if headers.get(CONTENT_SHA256_HEADER) == STREAMING_HMAC_PAYLOAD:
        try:
            if b"chunk-signature=" not in to_bytes(data):
                return
            correct_hash = md5(strip_chunk_signatures(data))
            tags = r"<ETag>%s</ETag>"
            pattern = r"(&#34;)?([^<&]+)(&#34;)?"
            replacement = r"\g<1>%s\g<3>" % correct_hash
            response._content = re.sub(tags % pattern, tags % replacement, to_str(response.content))
            if response.headers.get("ETag"):
                response.headers["ETag"] = re.sub(pattern, replacement, response.headers["ETag"])
        except Exception:
            # best-effort fix; leave the response untouched on any parsing error
            pass


def remove_xml_preamble(response):
    """Removes <?xml ... ?> from a response content"""
    response._content = re.sub(r"^<\?[^\?]+\?>", "", to_str(response._content))


# --------------
# HELPER METHODS
# for lifecycle/replication/...
# --------------


def get_lifecycle(bucket_name):
    """Return the stored lifecycle configuration for a bucket as an XML response."""
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    lifecycle = BUCKET_LIFECYCLE.get(bucket_name)
    status_code = 200
    if not lifecycle:
        lifecycle = {
            "Error": {
                "Code": "NoSuchLifecycleConfiguration",
                "Message": "The lifecycle configuration does not exist",
                "BucketName": bucket_name,
            }
        }
        status_code = 404
    body = xmltodict.unparse(lifecycle)
    return xml_response(body, status_code=status_code)


def get_replication(bucket_name):
    """Return the stored replication configuration for a bucket as an XML response."""
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    replication = BUCKET_REPLICATIONS.get(bucket_name)
    status_code = 200
    if not replication:
        replication = {
            "Error": {
                "Code": "ReplicationConfigurationNotFoundError",
                "Message": "The replication configuration was not found",
                "BucketName": bucket_name,
            }
        }
        status_code = 404
    body = xmltodict.unparse(replication)
    return xml_response(body, status_code=status_code)


def set_lifecycle(bucket_name, lifecycle):
    """Store a lifecycle configuration (XML string or parsed dict) for a bucket."""
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    if isinstance(to_str(lifecycle), six.string_types):
        lifecycle = xmltodict.parse(lifecycle)
    BUCKET_LIFECYCLE[bucket_name] = lifecycle
    return 200


def delete_lifecycle(bucket_name):
    """Remove the stored lifecycle configuration for a bucket, if any."""
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    if BUCKET_LIFECYCLE.get(bucket_name):
        BUCKET_LIFECYCLE.pop(bucket_name)


def set_replication(bucket_name, replication):
    """Store a replication configuration (XML string or parsed dict) for a bucket."""
    bucket_name = normalize_bucket_name(bucket_name)
    exists, code, body = is_bucket_available(bucket_name)
    if not exists:
        return xml_response(body, status_code=code)
    if isinstance(to_str(replication), six.string_types):
        replication = xmltodict.parse(replication)
    BUCKET_REPLICATIONS[bucket_name] = replication
    return 200


# -------------
# UTIL METHODS
# -------------


def strip_chunk_signatures(data):
    """Strip AWS streaming-v4 chunk signature lines from a request body.

    For clients that use streaming v4 authentication, the request contains chunk signatures
    in the HTTP body (see example below) which we need to strip as moto cannot handle them

    17;chunk-signature=6e162122ec4962bea0b18bc624025e6ae4e9322bdc632762d909e87793ac5921
    <payload data ...>
    0;chunk-signature=927ab45acd82fc90a3c210ca7314d59fedc77ce0c914d79095f8cc9563cf2c70
    """
    data_new = ""
    if data is not None:
        data_new = re.sub(
            b"(^|\r\n)[0-9a-fA-F]+;chunk-signature=[0-9a-f]{64}(\r\n)(\r\n$)?",
            b"",
            to_bytes(data),
            flags=re.MULTILINE | re.DOTALL,
        )
    return data_new


def is_bucket_available(bucket_name):
    """Return (exists, status_code, body) for the given bucket."""
    body = {"Code": "200"}
    exists, code = bucket_exists(bucket_name)
    if not exists:
        body = {
            "Error": {
                "Code": code,
                "Message": "The bucket does not exist",
                "BucketName": bucket_name,
            }
        }
        return exists, code, body
    return True, 200, body


def bucket_exists(bucket_name):
    """Tests for the existence of the specified bucket. Returns the error code
    if the bucket does not exist (200 if the bucket does exist).
    """
    bucket_name = normalize_bucket_name(bucket_name)
    s3_client = aws_stack.connect_to_service("s3")
    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except ClientError as err:
        error_code = err.response.get("Error").get("Code")
        return False, error_code
    return True, 200


def check_content_md5(data, headers):
    """Validate the Content-MD5 header against the (de-chunked) body.

    Returns an XML error response on mismatch/invalid header, or None if OK.
    """
    actual = md5(strip_chunk_signatures(data))
    try:
        md5_header = headers["Content-MD5"]
        if not is_base64(md5_header):
            raise Exception('Content-MD5 header is not in Base64 format: "%s"' % md5_header)
        expected = to_str(codecs.encode(base64.b64decode(md5_header), "hex"))
    except Exception:
        return error_response(
            "The Content-MD5 you specified is not valid.",
            "InvalidDigest",
            status_code=400,
        )
    if actual != expected:
        return error_response(
            "The Content-MD5 you specified did not match what we received.",
            "BadDigest",
            status_code=400,
        )


def error_response(message, code, status_code=400):
    """Build a generic S3-style <Error> XML response."""
    result = {"Error": {"Code": code, "Message": message}}
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)


def xml_response(content, status_code=200):
    """Wrap content in a Response with an application/xml content type."""
    headers = {"Content-Type": "application/xml"}
    return requests_response(content, status_code=status_code, headers=headers)


def no_such_key_error(resource, requestId=None, status_code=400):
    """Build a NoSuchKey error response for the given resource."""
    result = {
        "Error": {
            "Code": "NoSuchKey",
            "Message": "The resource you requested does not exist",
            "Resource": resource,
            "RequestId": requestId,
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)


def no_such_bucket(bucket_name, requestId=None, status_code=404):
    """Build a NoSuchBucket error response for the given bucket."""
    # TODO: fix the response to match AWS bucket response when the webconfig is not set and bucket not exists
    result = {
        "Error": {
            "Code": "NoSuchBucket",
            "Message": "The specified bucket does not exist",
            "BucketName": bucket_name,
            "RequestId": requestId,
            "HostId": short_uid(),
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)


def token_expired_error(resource, requestId=None, status_code=400):
    """Build an ExpiredToken error response for the given resource."""
    result = {
        "Error": {
            "Code": "ExpiredToken",
            "Message": "The provided token has expired.",
            "Resource": resource,
            "RequestId": requestId,
        }
    }
    content = xmltodict.unparse(result)
    return xml_response(content, status_code=status_code)


def expand_redirect_url(starting_url, key, bucket):
    """Add key and bucket parameters to starting URL query string."""
    parsed = urlparse.urlparse(starting_url)
    query = collections.OrderedDict(urlparse.parse_qsl(parsed.query))
    query.update([("key", key), ("bucket", bucket)])
    redirect_url = urlparse.urlunparse(
        (
            parsed.scheme,
            parsed.netloc,
            parsed.path,
            parsed.params,
            urlparse.urlencode(query),
            None,
        )
    )
    return redirect_url


def is_bucket_specified_in_domain_name(path, headers):
    """Return a match object if the Host header is an S3 virtual-hosted-style domain."""
    host = headers.get("host", "")
    return re.match(r".*s3(\-website)?\.([^\.]+\.)?amazonaws.com", host)


def is_object_specific_request(path, headers):
    """Return whether the given request is specific to a certain S3 object.
    Note: the bucket name is usually specified as a path parameter,
    but may also be part of the domain name!"""
    bucket_in_domain = is_bucket_specified_in_domain_name(path, headers)
    parts = len(path.split("/"))
    return parts > (1 if bucket_in_domain else 2)


def empty_response():
    """Return an empty 200 Response."""
    response = Response()
    response.status_code = 200
    response._content = ""
    return response


def handle_notification_request(bucket, method, data):
    """Dispatch a bucket ?notification request to the GET/PUT handler."""
    if method == "GET":
        return handle_get_bucket_notification(bucket)
    if method == "PUT":
        return handle_put_bucket_notification(bucket, data)
    return empty_response()


def handle_get_bucket_notification(bucket):
    """Render the stored notification configurations of a bucket as XML."""
    response = Response()
    response.status_code = 200
    response._content = ""
    # TODO check if bucket exists
    result = '<NotificationConfiguration xmlns="%s">' % XMLNS_S3
    if bucket in S3_NOTIFICATIONS:
        notifs = S3_NOTIFICATIONS[bucket]
        for notif in notifs:
            for dest in NOTIFICATION_DESTINATION_TYPES:
                if dest in notif:
                    dest_dict = {
                        "%sConfiguration"
                        % dest: {
                            "Id": notif["Id"],
                            dest: notif[dest],
                            "Event": notif["Event"],
                            "Filter": notif["Filter"],
                        }
                    }
                    result += xmltodict.unparse(dest_dict, full_document=False)
    result += "</NotificationConfiguration>"
    response._content = result
    return response


def _validate_filter_rules(filter_doc):
    """Raise InvalidFilterRuleName for any rule name other than suffix/prefix."""
    rules = filter_doc.get("FilterRule")
    if not rules:
        return
    for rule in rules:
        name = rule.get("Name", "")
        if name.lower() not in ["suffix", "prefix"]:
            raise InvalidFilterRuleName(name)
        # TODO: check what other rules there are


def _sanitize_notification_filter_rules(filter_doc):
    """Validate filter rule names and normalize them to title case."""
    rules = filter_doc.get("FilterRule")
    if not rules:
        return
    for rule in rules:
        name = rule.get("Name", "")
        if name.lower() not in ["suffix", "prefix"]:
            raise InvalidFilterRuleName(name)
        rule["Name"] = name.title()


def handle_put_bucket_notification(bucket, data):
    """Parse and store the notification configurations submitted for a bucket."""
    parsed = xmltodict.parse(data)
    notif_config = parsed.get("NotificationConfiguration")
    notifications = []
    for dest in NOTIFICATION_DESTINATION_TYPES:
        config = notif_config.get("%sConfiguration" % dest)
        configs = config if isinstance(config, list) else [config] if config else []
        for config in configs:
            events = config.get("Event")
            if isinstance(events, six.string_types):
                events = [events]
            event_filter = config.get("Filter", {})
            # make sure FilterRule is an array
            s3_filter = _get_s3_filter(event_filter)
            if s3_filter and not isinstance(s3_filter.get("FilterRule", []), list):
                s3_filter["FilterRule"] = [s3_filter["FilterRule"]]
            # make sure FilterRules are valid and sanitize if necessary
            _sanitize_notification_filter_rules(s3_filter)
            # create final details dict
            notification_details = {
                "Id": config.get("Id", str(uuid.uuid4())),
                "Event": events,
                dest: config.get(dest),
                "Filter": event_filter,
            }
            notifications.append(clone(notification_details))
    S3_NOTIFICATIONS[bucket] = notifications
    return empty_response()


def remove_bucket_notification(bucket):
    """Drop all stored notification configurations for a bucket."""
    if bucket in S3_NOTIFICATIONS:
        del S3_NOTIFICATIONS[bucket]


class ProxyListenerS3(PersistingProxyListener):
    """Proxy listener that adapts S3 requests/responses between clients and moto."""

    def api_name(self):
        return "s3"

    @staticmethod
    def is_s3_copy_request(headers, path):
        return "x-amz-copy-source" in headers or "x-amz-copy-source" in path

    @staticmethod
    def is_create_multipart_request(query):
        return query.startswith("uploads")

    @staticmethod
    def is_multipart_upload(query):
        return query.startswith("uploadId")

    @staticmethod
    def get_201_response(key, bucket_name):
        # NOTE(review): the hard-coded ETag looks like the empty-body MD5
        # placeholder; confirm against upstream before changing it.
        return """
            <PostResponse>
                <Location>{protocol}://{host}/{encoded_key}</Location>
                <Bucket>{bucket}</Bucket>
                <Key>{key}</Key>
                <ETag>{etag}</ETag>
            </PostResponse>
            """.format(
            protocol=get_service_protocol(),
            host=config.HOSTNAME_EXTERNAL,
            encoded_key=urlparse.quote(key, safe=""),
            key=key,
            bucket=bucket_name,
            etag="d41d8cd98f00b204e9800998ecf8427f",
        )

    @staticmethod
    def _update_location(content, bucket_name):
        # rewrite <Location> URLs to point at the externally visible host/port
        bucket_name = normalize_bucket_name(bucket_name)
        host = config.HOSTNAME_EXTERNAL
        if ":" not in host:
            host = "%s:%s" % (host, config.PORT_S3)
        return re.sub(
            r"<Location>\s*([a-zA-Z0-9\-]+)://[^/]+/([^<]+)\s*</Location>",
            r"<Location>%s://%s/%s/\2</Location>" % (get_service_protocol(), host, bucket_name),
            content,
            flags=re.MULTILINE,
        )

    @staticmethod
    def is_query_allowable(method, query):
        # Generally if there is a query (some/path/with?query) we don't want to send notifications
        if not query:
            return True
        # Except we do want to notify on multipart and presigned url upload completion
        contains_cred = "X-Amz-Credential" in query and "X-Amz-Signature" in query
        contains_key = "AWSAccessKeyId" in query and "Signature" in query
        # nodejs sdk putObjectCommand is adding x-id=putobject in the query
        allowed_query = "x-id=" in query.lower()
        if (
            (method == "POST" and query.startswith("uploadId"))
            or contains_cred
            or contains_key
            or allowed_query
        ):
            return True

    @staticmethod
    def parse_policy_expiration_date(expiration_string):
        try:
            dt = datetime.datetime.strptime(expiration_string, POLICY_EXPIRATION_FORMAT1)
        except Exception:
            dt = datetime.datetime.strptime(expiration_string, POLICY_EXPIRATION_FORMAT2)
        # both date formats assume a UTC timezone ('Z' suffix), but it's not parsed as tzinfo into the datetime object
        dt = dt.replace(tzinfo=datetime.timezone.utc)
        return dt

    def forward_request(self, method, path, data, headers):
        # Create list of query parameters from the url
        parsed = urlparse.urlparse("{}{}".format(config.get_edge_url(), path))
        query_params = parse_qs(parsed.query)
        path_orig = path
        path = path.replace(
            "#", "%23"
        )  # support key names containing hashes (e.g., required by Amplify)
        # extracting bucket name from the request
        parsed_path = urlparse.urlparse(path)
        bucket_name = extract_bucket_name(headers, parsed_path.path)
        if method == "PUT" and bucket_name and not re.match(BUCKET_NAME_REGEX, bucket_name):
            if len(parsed_path.path) <= 1:
                return error_response(
                    "Unable to extract valid bucket name. Please ensure that your AWS SDK is "
                    + "configured to use path style addressing, or send a valid "
                    + '<Bucket>.s3.localhost.localstack.cloud "Host" header',
                    "InvalidBucketName",
                    status_code=400,
                )
            return error_response(
                "The specified bucket is not valid.",
                "InvalidBucketName",
                status_code=400,
            )
        # Detecting pre-sign url and checking signature
        if any([p in query_params for p in SIGNATURE_V2_PARAMS]) or any(
            [p in query_params for p in SIGNATURE_V4_PARAMS]
        ):
            response = authenticate_presign_url(
                method=method, path=path, data=data, headers=headers
            )
            if response is not None:
                return response
        # handling s3 website hosting requests
        if is_static_website(headers) and method == "GET":
            return serve_static_website(headers=headers, path=path, bucket_name=bucket_name)
        # check content md5 hash integrity if not a copy request or multipart initialization
        if (
            "Content-MD5" in headers
            and not self.is_s3_copy_request(headers, path)
            and not self.is_create_multipart_request(parsed_path.query)
        ):
            response = check_content_md5(data, headers)
            if response is not None:
                return response
        modified_data = None
        # TODO: For some reason, moto doesn't allow us to put a location constraint on us-east-1
        to_find1 = to_bytes("<LocationConstraint>us-east-1</LocationConstraint>")
        to_find2 = to_bytes("<CreateBucketConfiguration")
        if data and data.startswith(to_bytes("<")) and to_find1 in data and to_find2 in data:
            # Note: with the latest version, <CreateBucketConfiguration> must either
            # contain a valid <LocationConstraint>, or not be present at all in the body.
            modified_data = b""
        # If this request contains streaming v4 authentication signatures, strip them from the message
        # Related issue: https://github.com/localstack/localstack/issues/98
        # TODO: can potentially be removed after this fix in moto: https://github.com/spulec/moto/pull/4201
        is_streaming_payload = headers.get(CONTENT_SHA256_HEADER) == STREAMING_HMAC_PAYLOAD
        if is_streaming_payload:
            modified_data = strip_chunk_signatures(not_none_or(modified_data, data))
            headers["Content-Length"] = headers.get("x-amz-decoded-content-length")
            headers.pop(CONTENT_SHA256_HEADER)
        # POST requests to S3 may include a "$(unknown)" placeholder in the
        # key, which should be replaced with an actual file name before storing.
        if method == "POST":
            original_data = not_none_or(modified_data, data)
            expanded_data = multipart_content.expand_multipart_filename(original_data, headers)
            if expanded_data is not original_data:
                modified_data = expanded_data
        # If no content-type is provided, 'binary/octet-stream' should be used
        # src: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html
        if method == "PUT" and not headers.get("content-type"):
            headers["content-type"] = "binary/octet-stream"
        # parse query params
        query = parsed_path.query
        path = parsed_path.path
        query_map = urlparse.parse_qs(query, keep_blank_values=True)
        # remap metadata query params (not supported in moto) to request headers
        append_metadata_headers(method, query_map, headers)
        # apply fixes
        headers_changed = fix_metadata_key_underscores(request_headers=headers)
        if query == "notification" or "notification" in query_map:
            # handle and return response for ?notification request
            response = handle_notification_request(bucket_name, method, data)
            return response
        # if the Expires key in the url is already expired then return error
        if method == "GET" and "Expires" in query_map:
            ts = datetime.datetime.fromtimestamp(
                int(query_map.get("Expires")[0]), tz=datetime.timezone.utc
            )
            if is_expired(ts):
                return token_expired_error(path, headers.get("x-amz-request-id"), 400)
        # If multipart POST with policy in the params, return error if the policy has expired
        # ... (the remainder of this method is truncated in the source excerpt)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub — right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation testing minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful