Best Python code snippet using localstack_python
s3_listener.py
Source:s3_listener.py
...1020 if response is not None:1021 return response1022 # handling s3 website hosting requests1023 if is_static_website(headers) and method == "GET":1024 return serve_static_website(headers=headers, path=path, bucket_name=bucket_name)1025 # check content md5 hash integrity if not a copy request or multipart initialization1026 if (1027 "Content-MD5" in headers1028 and not self.is_s3_copy_request(headers, path)1029 and not self.is_create_multipart_request(parsed_path.query)1030 ):1031 response = check_content_md5(data, headers)1032 if response is not None:1033 return response1034 modified_data = None1035 # TODO: For some reason, moto doesn't allow us to put a location constraint on us-east-11036 to_find1 = to_bytes("<LocationConstraint>us-east-1</LocationConstraint>")1037 to_find2 = to_bytes("<CreateBucketConfiguration")1038 if data and data.startswith(to_bytes("<")) and to_find1 in data and to_find2 in data:1039 # Note: with the latest version, <CreateBucketConfiguration> must either1040 # contain a valid <LocationConstraint>, or not be present at all in the body.1041 modified_data = b""1042 # If this request contains streaming v4 authentication signatures, strip them from the message1043 # Related isse: https://github.com/localstack/localstack/issues/981044 # TODO: can potentially be removed after this fix in moto: https://github.com/spulec/moto/pull/42011045 is_streaming_payload = headers.get(CONTENT_SHA256_HEADER) == STREAMING_HMAC_PAYLOAD1046 if is_streaming_payload:1047 modified_data = strip_chunk_signatures(not_none_or(modified_data, data))1048 headers["Content-Length"] = headers.get("x-amz-decoded-content-length")1049 headers.pop(CONTENT_SHA256_HEADER)1050 # POST requests to S3 may include a "${filename}" placeholder in the1051 # key, which should be replaced with an actual file name before storing.1052 if method == "POST":1053 original_data = not_none_or(modified_data, data)1054 expanded_data = multipart_content.expand_multipart_filename(original_data, 
headers)1055 if expanded_data is not original_data:1056 modified_data = expanded_data1057 # If no content-type is provided, 'binary/octet-stream' should be used1058 # src: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html1059 if method == "PUT" and not headers.get("content-type"):1060 headers["content-type"] = "binary/octet-stream"1061 # parse query params1062 query = parsed_path.query1063 path = parsed_path.path1064 query_map = urlparse.parse_qs(query, keep_blank_values=True)1065 # remap metadata query params (not supported in moto) to request headers1066 append_metadata_headers(method, query_map, headers)1067 # apply fixes1068 headers_changed = fix_metadata_key_underscores(request_headers=headers)1069 if query == "notification" or "notification" in query_map:1070 # handle and return response for ?notification request1071 response = handle_notification_request(bucket_name, method, data)1072 return response1073 # if the Expires key in the url is already expired then return error1074 if method == "GET" and "Expires" in query_map:1075 ts = datetime.datetime.fromtimestamp(1076 int(query_map.get("Expires")[0]), tz=datetime.timezone.utc1077 )1078 if is_expired(ts):1079 return token_expired_error(path, headers.get("x-amz-request-id"), 400)1080 # If multipart POST with policy in the params, return error if the policy has expired1081 if method == "POST":1082 policy_key, policy_value = multipart_content.find_multipart_key_value(1083 data, headers, "policy"1084 )1085 if policy_key and policy_value:1086 policy = json.loads(base64.b64decode(policy_value).decode("utf-8"))1087 expiration_string = policy.get("expiration", None) # Example: 2020-06-05T13:37:12Z1088 if expiration_string:1089 expiration_datetime = self.parse_policy_expiration_date(expiration_string)1090 if is_expired(expiration_datetime):1091 return token_expired_error(path, headers.get("x-amz-request-id"), 400)1092 if query == "cors" or "cors" in query_map:1093 if method == "GET":1094 return 
get_cors(bucket_name)1095 if method == "PUT":1096 return set_cors(bucket_name, data)1097 if method == "DELETE":1098 return delete_cors(bucket_name)1099 if query == "requestPayment" or "requestPayment" in query_map:1100 if method == "GET":1101 return get_request_payment(bucket_name)1102 if method == "PUT":1103 return set_request_payment(bucket_name, data)1104 if query == "lifecycle" or "lifecycle" in query_map:1105 if method == "GET":1106 return get_lifecycle(bucket_name)1107 if method == "PUT":1108 return set_lifecycle(bucket_name, data)1109 if method == "DELETE":1110 delete_lifecycle(bucket_name)1111 if query == "replication" or "replication" in query_map:1112 if method == "GET":1113 return get_replication(bucket_name)1114 if method == "PUT":1115 return set_replication(bucket_name, data)1116 if method == "DELETE" and validate_bucket_name(bucket_name):1117 delete_lifecycle(bucket_name)1118 path_orig_escaped = path_orig.replace("#", "%23")1119 if modified_data is not None or headers_changed or path_orig != path_orig_escaped:1120 data_to_return = not_none_or(modified_data, data)1121 if modified_data is not None:1122 headers["Content-Length"] = str(len(data_to_return or ""))1123 return Request(1124 url=path_orig_escaped,1125 data=data_to_return,1126 headers=headers,1127 method=method,1128 )1129 return True1130 def return_response(self, method, path, data, headers, response, request_handler=None):1131 path = to_str(path)1132 method = to_str(method)1133 path = path.replace("#", "%23")1134 # persist this API call to disk1135 super(ProxyListenerS3, self).return_response(1136 method, path, data, headers, response, request_handler1137 )1138 bucket_name = extract_bucket_name(headers, path)1139 # POST requests to S3 may include a success_action_redirect or1140 # success_action_status field, which should be used to redirect a1141 # client to a new location.1142 key = None1143 if method == "POST":1144 key, redirect_url = multipart_content.find_multipart_key_value(data, 
headers)1145 if key and redirect_url:1146 response.status_code = 3031147 response.headers["Location"] = expand_redirect_url(redirect_url, key, bucket_name)1148 LOGGER.debug(1149 "S3 POST {} to {}".format(response.status_code, response.headers["Location"])1150 )1151 expanded_data = multipart_content.expand_multipart_filename(data, headers)1152 key, status_code = multipart_content.find_multipart_key_value(1153 expanded_data, headers, "success_action_status"1154 )1155 if response.status_code == 201 and key:1156 response._content = self.get_201_response(key, bucket_name)1157 response.headers["Content-Length"] = str(len(response._content or ""))1158 response.headers["Content-Type"] = "application/xml; charset=utf-8"1159 return response1160 if response.status_code == 416:1161 if method == "GET":1162 return error_response(1163 "The requested range cannot be satisfied.", "InvalidRange", 4161164 )1165 elif method == "HEAD":1166 response.status_code = 2001167 return response1168 parsed = urlparse.urlparse(path)1169 bucket_name_in_host = uses_host_addressing(headers)1170 should_send_notifications = all(1171 [1172 method in ("PUT", "POST", "DELETE"),1173 "/" in path[1:] or bucket_name_in_host or key,1174 # check if this is an actual put object request, because it could also be1175 # a put bucket request with a path like this: /bucket_name/1176 bucket_name_in_host1177 or key1178 or (len(path[1:].split("/")) > 1 and len(path[1:].split("/")[1]) > 0),1179 self.is_query_allowable(method, parsed.query),1180 ]1181 )1182 # get subscribers and send bucket notifications1183 if should_send_notifications:1184 # if we already have a good key, use it, otherwise examine the path1185 if key:1186 object_path = "/" + key1187 elif bucket_name_in_host:1188 object_path = parsed.path1189 else:1190 parts = parsed.path[1:].split("/", 1)1191 object_path = parts[1] if parts[1][0] == "/" else "/%s" % parts[1]1192 version_id = response.headers.get("x-amz-version-id", None)1193 send_notifications(method, 
bucket_name, object_path, version_id, headers)1194 # publish event for creation/deletion of buckets:1195 if method in ("PUT", "DELETE") and (1196 "/" not in path[1:] or len(path[1:].split("/")[1]) <= 01197 ):1198 event_type = (1199 event_publisher.EVENT_S3_CREATE_BUCKET1200 if method == "PUT"1201 else event_publisher.EVENT_S3_DELETE_BUCKET1202 )1203 event_publisher.fire_event(1204 event_type, payload={"n": event_publisher.get_hash(bucket_name)}1205 )1206 # fix an upstream issue in moto S3 (see https://github.com/localstack/localstack/issues/382)1207 if method == "PUT":1208 if parsed.query == "policy":1209 response._content = ""1210 response.status_code = 2041211 return response1212 # when creating s3 bucket using aws s3api the return header contains 'Location' param1213 if key is None:1214 # if the bucket is created in 'us-east-1' the location header contains bucket as path1215 # else the the header contains bucket url1216 if aws_stack.get_region() == "us-east-1":1217 response.headers["Location"] = "/{}".format(bucket_name)1218 else:1219 # Note: we need to set the correct protocol here1220 protocol = (1221 headers.get(constants.HEADER_LOCALSTACK_EDGE_URL, "").split("://")[0]1222 or "http"1223 )1224 response.headers["Location"] = "{}://{}.{}:{}/".format(1225 protocol,1226 bucket_name,1227 constants.S3_VIRTUAL_HOSTNAME,1228 config.EDGE_PORT,1229 )1230 if response is not None:1231 reset_content_length = False1232 # append CORS headers and other annotations/patches to response1233 append_cors_headers(1234 bucket_name,1235 request_method=method,1236 request_headers=headers,1237 response=response,1238 )1239 append_last_modified_headers(response=response)1240 append_list_objects_marker(method, path, data, response)1241 fix_location_constraint(response)1242 fix_range_content_type(bucket_name, path, headers, response)1243 fix_delete_objects_response(bucket_name, method, parsed, data, headers, response)1244 fix_metadata_key_underscores(response=response)1245 
fix_creation_date(method, path, response=response)1246 fix_etag_for_multipart(data, headers, response)1247 ret304_on_etag(data, headers, response)1248 append_aws_request_troubleshooting_headers(response)1249 fix_delimiter(data, headers, response)1250 if method == "PUT":1251 set_object_expiry(path, headers)1252 # Remove body from PUT response on presigned URL1253 # https://github.com/localstack/localstack/issues/13171254 if (1255 method == "PUT"1256 and int(response.status_code) < 4001257 and (1258 "X-Amz-Security-Token=" in path1259 or "X-Amz-Credential=" in path1260 or "AWSAccessKeyId=" in path1261 )1262 ):1263 response._content = ""1264 reset_content_length = True1265 response_content_str = None1266 try:1267 response_content_str = to_str(response._content)1268 except Exception:1269 pass1270 # Honor response header overrides1271 # https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html1272 if method == "GET":1273 add_accept_range_header(response)1274 add_response_metadata_headers(response)1275 if is_object_expired(path):1276 return no_such_key_error(path, headers.get("x-amz-request-id"), 400)1277 # AWS C# SDK uses get bucket acl to check the existence of the bucket1278 # If not exists, raises a NoSuchBucket Error1279 if bucket_name and "/?acl" in path:1280 exists, code, body = is_bucket_available(bucket_name)1281 if not exists:1282 return no_such_bucket(bucket_name, headers.get("x-amz-request-id"), 404)1283 query_map = urlparse.parse_qs(parsed.query, keep_blank_values=True)1284 for param_name, header_name in ALLOWED_HEADER_OVERRIDES.items():1285 if param_name in query_map:1286 response.headers[header_name] = query_map[param_name][0]1287 if response_content_str and response_content_str.startswith("<"):1288 is_bytes = isinstance(response._content, six.binary_type)1289 response._content = response_content_str1290 append_last_modified_headers(response=response, content=response_content_str)1291 # We need to un-pretty-print the XML, otherwise we run into 
this issue with Spark:1292 # https://github.com/jserver/mock-s3/pull/9/files1293 # https://github.com/localstack/localstack/issues/1831294 # Note: yet, we need to make sure we have a newline after the first line: <?xml ...>\n1295 # Note: make sure to return XML docs verbatim: https://github.com/localstack/localstack/issues/10371296 if method != "GET" or not is_object_specific_request(path, headers):1297 response._content = re.sub(1298 r"([^\?])>\n\s*<",1299 r"\1><",1300 response_content_str,1301 flags=re.MULTILINE,1302 )1303 # update Location information in response payload1304 response._content = self._update_location(response._content, bucket_name)1305 # convert back to bytes1306 if is_bytes:1307 response._content = to_bytes(response._content)1308 # fix content-type: https://github.com/localstack/localstack/issues/6181309 # https://github.com/localstack/localstack/issues/5491310 # https://github.com/localstack/localstack/issues/8541311 if "text/html" in response.headers.get(1312 "Content-Type", ""1313 ) and not response_content_str.lower().startswith("<!doctype html"):1314 response.headers["Content-Type"] = "application/xml; charset=utf-8"1315 reset_content_length = True1316 # update Content-Length headers (fix https://github.com/localstack/localstack/issues/541)1317 if method == "DELETE":1318 reset_content_length = True1319 if reset_content_length:1320 response.headers["Content-Length"] = str(len(response._content or ""))1321 # convert to chunked encoding, for compatibility with certain SDKs (e.g., AWS PHP SDK)1322 convert_to_chunked_encoding(method, path, response)1323def serve_static_website(headers, path, bucket_name):1324 s3_client = aws_stack.connect_to_service("s3")1325 # check if bucket exists1326 try:1327 s3_client.head_bucket(Bucket=bucket_name)1328 except ClientError:1329 return no_such_bucket(bucket_name, headers.get("x-amz-request-id"), 404)1330 def respond_with_key(status_code, key):1331 obj = s3_client.get_object(Bucket=bucket_name, Key=key)1332 
response_headers = {}1333 if "if-none-match" in headers and "ETag" in obj and obj["ETag"] in headers["if-none-match"]:1334 return requests_response(status_code=304, content="", headers=response_headers)1335 if "WebsiteRedirectLocation" in obj:1336 response_headers["location"] = obj["WebsiteRedirectLocation"]1337 return requests_response(status_code=301, content="", headers=response_headers)...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!