Best Python code snippet using localstack_python
test_s3.py
Source: test_s3.py
...
        # TODO check how much sense it makes to make this url "invalid"...
        assert (
            403
            == requests.get(
                _make_url_invalid(s3_url, object_key, presigned),
                data=object_data,
                headers={"Content-Type": "my-fake-content/type"},
            ).status_code
        )
        # put object valid
        assert (
            200
            == requests.put(
                _generate_presigned_url(client, simple_params, expires, client_method="put_object"),
                data=object_data,
            ).status_code
        )
        params = {
            "Bucket": bucket_name,
            "Key": object_key,
            "ContentType": "text/plain",
        }
        presigned_put_url = _generate_presigned_url(
            client, params, expires, client_method="put_object"
        )
        assert (
            200
            == requests.put(
                presigned_put_url,
                data=object_data,
                headers={"Content-Type": "text/plain"},
            ).status_code
        )
        # Invalid request
        response = requests.put(
            _make_url_invalid(s3_url, object_key, presigned_put_url),
            data=object_data,
            headers={"Content-Type": "my-fake-content/type"},
        )
        assert 403 == response.status_code
        # DELETE requests
        presigned_delete_url = _generate_presigned_url(
            client, simple_params, expires, client_method="delete_object"
        )
        response = requests.delete(presigned_delete_url)
        assert 204 == response.status_code


class TestS3DeepArchive:
    """
    Test to cover DEEP_ARCHIVE Storage Class functionality.
    """

    @pytest.mark.aws_validated
    def test_storage_class_deep_archive(self, s3_client, s3_resource, s3_bucket, tmpdir):
        key = "my-key"
        transfer_config = TransferConfig(multipart_threshold=5 * KB, multipart_chunksize=1 * KB)

        def upload_file(size_in_kb: int):
            file = tmpdir / f"test-file-{short_uid()}.bin"
            data = b"1" * (size_in_kb * KB)
            file.write(data=data, mode="w")
            s3_client.upload_file(
                Bucket=s3_bucket,
                Key=key,
                Filename=str(file.realpath()),
                ExtraArgs={"StorageClass": "DEEP_ARCHIVE"},
                Config=transfer_config,
            )

        upload_file(1)
        upload_file(9)
        upload_file(15)
        objects = s3_resource.Bucket(s3_bucket).objects.all()
        keys = []
        for obj in objects:
            keys.append(obj.key)
            assert obj.storage_class == "DEEP_ARCHIVE"


def _anon_client(service: str):
    conf = Config(signature_version=UNSIGNED)
    if os.environ.get("TEST_TARGET") == "AWS_CLOUD":
        return boto3.client(service, config=conf, region_name=None)
    return aws_stack.create_external_boto_client(service, config=conf)


def _s3_client_custom_config(conf: Config, endpoint_url: str):
    if os.environ.get("TEST_TARGET") == "AWS_CLOUD":
        return boto3.client("s3", config=conf, endpoint_url=endpoint_url)
    # TODO in future this should work with aws_stack.create_external_boto_client
    # currently it doesn't as authenticate_presign_url_signv2 requires the secret_key to be 'test'
    # return aws_stack.create_external_boto_client(
    #     "s3",
    #     config=conf,
    #     endpoint_url=endpoint_url,
    #     aws_access_key_id=TEST_AWS_ACCESS_KEY_ID,
    # )
    return boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        config=conf,
        aws_access_key_id=TEST_AWS_ACCESS_KEY_ID,
        aws_secret_access_key=TEST_AWS_SECRET_ACCESS_KEY,
    )


def _endpoint_url(region: str = "", localstack_host: str = None) -> str:
    if not region:
        region = config.DEFAULT_REGION
    if os.environ.get("TEST_TARGET") == "AWS_CLOUD":
        if region == "us-east-1":
            return "https://s3.amazonaws.com"
        else:
            return f"http://s3.{region}.amazonaws.com"
    return f"{config.get_edge_url(localstack_hostname=localstack_host or S3_VIRTUAL_HOSTNAME)}"


def _bucket_url(bucket_name: str, region: str = "", localstack_host: str = None) -> str:
    return f"{_endpoint_url(region, localstack_host)}/{bucket_name}"


def _website_bucket_url(bucket_name: str):
    # TODO depending on region the syntax of the website varies (dot vs dash before region)
    if os.environ.get("TEST_TARGET") == "AWS_CLOUD":
        region = config.DEFAULT_REGION
        return f"http://{bucket_name}.s3-website-{region}.amazonaws.com"
    return _bucket_url_vhost(bucket_name, localstack_host=constants.S3_STATIC_WEBSITE_HOSTNAME)


def _bucket_url_vhost(bucket_name: str, region: str = "", localstack_host: str = None) -> str:
    if not region:
        region = config.DEFAULT_REGION
    if os.environ.get("TEST_TARGET") == "AWS_CLOUD":
        if region == "us-east-1":
            return f"https://{bucket_name}.s3.amazonaws.com"
        else:
            return f"https://{bucket_name}.s3.{region}.amazonaws.com"
    host = localstack_host or S3_VIRTUAL_HOSTNAME
    s3_edge_url = config.get_edge_url(localstack_hostname=host)
    # TODO might add the region here
    return s3_edge_url.replace(f"://{host}", f"://{bucket_name}.{host}")


def _generate_presigned_url(
    client: "S3Client", params: dict, expires: int, client_method: str = "get_object"
) -> str:
    return client.generate_presigned_url(
        client_method,
        Params=params,
        ExpiresIn=expires,
    )


def _make_url_invalid(url_prefix: str, object_key: str, url: str) -> str:
    parsed = urlparse(url)
    query_params = parse_qs(parsed.query)
    if "Signature" in query_params:
        # v2 style
        return "{}/{}?AWSAccessKeyId={}&Signature={}&Expires={}".format(
            url_prefix,
            object_key,
            query_params["AWSAccessKeyId"][0],
            query_params["Signature"][0],
            query_params["Expires"][0],
        )
    else:
        # v4 style
        return (...