Best Python code snippet using localstack_python
test_s3.py
Source: test_s3.py
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import gzip as gz
import os
import tempfile
from unittest import mock
from unittest.mock import Mock

import boto3
import pytest
from botocore.exceptions import ClientError, NoCredentialsError

from airflow.exceptions import AirflowException
from airflow.models import Connection
from airflow.providers.amazon.aws.hooks.s3 import S3Hook, provide_bucket_name, unify_bucket_name_and_key

try:
    from moto import mock_s3
except ImportError:
    mock_s3 = None


@pytest.mark.skipif(mock_s3 is None, reason='moto package not present')
class TestAwsS3Hook:
    @mock_s3
    def test_get_conn(self):
        hook = S3Hook()
        assert hook.get_conn() is not None

    def test_parse_s3_url(self):
        parsed = S3Hook.parse_s3_url("s3://test/this/is/not/a-real-key.txt")
        assert parsed == ("test", "this/is/not/a-real-key.txt"), "Incorrect parsing of the s3 url"

    def test_check_for_bucket(self, s3_bucket):
        hook = S3Hook()
        assert hook.check_for_bucket(s3_bucket) is True
        assert hook.check_for_bucket('not-a-bucket') is False

    def test_check_for_bucket_raises_error_with_invalid_conn_id(self, s3_bucket, monkeypatch):
        monkeypatch.delenv('AWS_PROFILE', raising=False)
        monkeypatch.delenv('AWS_ACCESS_KEY_ID', raising=False)
        monkeypatch.delenv('AWS_SECRET_ACCESS_KEY', raising=False)
        hook = S3Hook(aws_conn_id="does_not_exist")
        with pytest.raises(NoCredentialsError):
            hook.check_for_bucket(s3_bucket)

    @mock_s3
    def test_get_bucket(self):
        hook = S3Hook()
        assert hook.get_bucket('bucket') is not None

    @mock_s3
    def test_create_bucket_default_region(self):
        hook = S3Hook()
        hook.create_bucket(bucket_name='new_bucket')
        assert hook.get_bucket('new_bucket') is not None

    @mock_s3
    def test_create_bucket_us_standard_region(self, monkeypatch):
        monkeypatch.delenv('AWS_DEFAULT_REGION', raising=False)
        hook = S3Hook()
        hook.create_bucket(bucket_name='new_bucket', region_name='us-east-1')
        bucket = hook.get_bucket('new_bucket')
        assert bucket is not None
        region = bucket.meta.client.get_bucket_location(Bucket=bucket.name).get('LocationConstraint')
        # https://github.com/spulec/moto/pull/1961
        # If location is "us-east-1", LocationConstraint should be None
        assert region is None

    @mock_s3
    def test_create_bucket_other_region(self):
        hook = S3Hook()
        hook.create_bucket(bucket_name='new_bucket', region_name='us-east-2')
        bucket = hook.get_bucket('new_bucket')
        assert bucket is not None
        region = bucket.meta.client.get_bucket_location(Bucket=bucket.name).get('LocationConstraint')
        assert region == 'us-east-2'

    def test_check_for_prefix(self, s3_bucket):
        hook = S3Hook()
        bucket = hook.get_bucket(s3_bucket)
        bucket.put_object(Key='a', Body=b'a')
        bucket.put_object(Key='dir/b', Body=b'b')
        assert hook.check_for_prefix(bucket_name=s3_bucket, prefix='dir/', delimiter='/') is True
        assert hook.check_for_prefix(bucket_name=s3_bucket, prefix='a', delimiter='/') is False

    def test_list_prefixes(self, s3_bucket):
        hook = S3Hook()
        bucket = hook.get_bucket(s3_bucket)
        bucket.put_object(Key='a', Body=b'a')
        bucket.put_object(Key='dir/b', Body=b'b')
        assert [] == hook.list_prefixes(s3_bucket, prefix='non-existent/')
        assert ['dir/'] == hook.list_prefixes(s3_bucket, delimiter='/')
        assert ['a'] == hook.list_keys(s3_bucket, delimiter='/')
        assert ['dir/b'] == hook.list_keys(s3_bucket, prefix='dir/')

    def test_list_prefixes_paged(self, s3_bucket):
        hook = S3Hook()
        bucket = hook.get_bucket(s3_bucket)
        # we don't need to test the paginator; that's covered by boto tests
        keys = ["%s/b" % i for i in range(2)]
        dirs = ["%s/" % i for i in range(2)]
        for key in keys:
            bucket.put_object(Key=key, Body=b'a')
        assert sorted(dirs) == sorted(hook.list_prefixes(s3_bucket, delimiter='/', page_size=1))

    def test_list_keys(self, s3_bucket):
        hook = S3Hook()
        bucket = hook.get_bucket(s3_bucket)
        bucket.put_object(Key='a', Body=b'a')
        bucket.put_object(Key='dir/b', Body=b'b')
        assert [] == hook.list_keys(s3_bucket, prefix='non-existent/')
        assert ['a', 'dir/b'] == hook.list_keys(s3_bucket)
        assert ['a'] == hook.list_keys(s3_bucket, delimiter='/')
        assert ['dir/b'] == hook.list_keys(s3_bucket, prefix='dir/')

    def test_list_keys_paged(self, s3_bucket):
        hook = S3Hook()
        bucket = hook.get_bucket(s3_bucket)
        keys = [str(i) for i in range(2)]
        for key in keys:
            bucket.put_object(Key=key, Body=b'a')
        assert sorted(keys) == sorted(hook.list_keys(s3_bucket, delimiter='/', page_size=1))

    def test_check_for_key(self, s3_bucket):
        hook = S3Hook()
        bucket = hook.get_bucket(s3_bucket)
        bucket.put_object(Key='a', Body=b'a')
        assert hook.check_for_key('a', s3_bucket) is True
        assert hook.check_for_key(f's3://{s3_bucket}//a') is True
        assert hook.check_for_key('b', s3_bucket) is False
        assert hook.check_for_key(f's3://{s3_bucket}//b') is False

    def test_check_for_key_raises_error_with_invalid_conn_id(self, monkeypatch, s3_bucket):
        monkeypatch.delenv('AWS_PROFILE', raising=False)
        monkeypatch.delenv('AWS_ACCESS_KEY_ID', raising=False)
        monkeypatch.delenv('AWS_SECRET_ACCESS_KEY', raising=False)
        hook = S3Hook(aws_conn_id="does_not_exist")
        with pytest.raises(NoCredentialsError):
            hook.check_for_key('a', s3_bucket)

    def test_get_key(self, s3_bucket):
        hook = S3Hook()
        bucket = hook.get_bucket(s3_bucket)
        bucket.put_object(Key='a', Body=b'a')
        assert hook.get_key('a', s3_bucket).key == 'a'
        assert hook.get_key(f's3://{s3_bucket}/a').key == 'a'

    def test_read_key(self, s3_bucket):
        hook = S3Hook()
        bucket = hook.get_bucket(s3_bucket)
        bucket.put_object(Key='my_key', Body=b'Cont\xC3\xA9nt')
        assert hook.read_key('my_key', s3_bucket) == 'Contént'

    # As of 1.3.2, Moto doesn't support select_object_content yet.
    @mock.patch('airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook.get_client_type')
    def test_select_key(self, mock_get_client_type, s3_bucket):
        mock_get_client_type.return_value.select_object_content.return_value = {
            'Payload': [{'Records': {'Payload': b'Cont\xC3\xA9nt'}}]
        }
        hook = S3Hook()
        assert hook.select_key('my_key', s3_bucket) == 'Contént'

    def test_check_for_wildcard_key(self, s3_bucket):
        hook = S3Hook()
        bucket = hook.get_bucket(s3_bucket)
        bucket.put_object(Key='abc', Body=b'a')
        bucket.put_object(Key='a/b', Body=b'a')
        assert hook.check_for_wildcard_key('a*', s3_bucket) is True
        assert hook.check_for_wildcard_key('abc', s3_bucket) is True
        assert hook.check_for_wildcard_key(f's3://{s3_bucket}//a*') is True
        assert hook.check_for_wildcard_key(f's3://{s3_bucket}//abc') is True
        assert hook.check_for_wildcard_key('a', s3_bucket) is False
        assert hook.check_for_wildcard_key('b', s3_bucket) is False
        assert hook.check_for_wildcard_key(f's3://{s3_bucket}//a') is False
        assert hook.check_for_wildcard_key(f's3://{s3_bucket}//b') is False

    def test_get_wildcard_key(self, s3_bucket):
        hook = S3Hook()
        bucket = hook.get_bucket(s3_bucket)
        bucket.put_object(Key='abc', Body=b'a')
        bucket.put_object(Key='a/b', Body=b'a')
        # The boto3 Class API is _odd_, and we can't do an isinstance check as
        # each instance is a different class, so lets just check one property
        # on S3.Object. Not great but...
        assert hook.get_wildcard_key('a*', s3_bucket).key == 'a/b'
        assert hook.get_wildcard_key('a*', s3_bucket, delimiter='/').key == 'abc'
        assert hook.get_wildcard_key('abc', s3_bucket, delimiter='/').key == 'abc'
        assert hook.get_wildcard_key(f's3://{s3_bucket}/a*').key == 'a/b'
        assert hook.get_wildcard_key(f's3://{s3_bucket}/a*', delimiter='/').key == 'abc'
        assert hook.get_wildcard_key(f's3://{s3_bucket}/abc', delimiter='/').key == 'abc'
        assert hook.get_wildcard_key('a', s3_bucket) is None
        assert hook.get_wildcard_key('b', s3_bucket) is None
        assert hook.get_wildcard_key(f's3://{s3_bucket}/a') is None
        assert hook.get_wildcard_key(f's3://{s3_bucket}/b') is None

    def test_load_string(self, s3_bucket):
        hook = S3Hook()
        hook.load_string("Contént", "my_key", s3_bucket)
        resource = boto3.resource('s3').Object(s3_bucket, 'my_key')  # pylint: disable=no-member
        assert resource.get()['Body'].read() == b'Cont\xC3\xA9nt'

    def test_load_string_acl(self, s3_bucket):
        hook = S3Hook()
        hook.load_string("Contént", "my_key", s3_bucket, acl_policy='public-read')
        response = boto3.client('s3').get_object_acl(Bucket=s3_bucket, Key="my_key", RequestPayer='requester')
        assert (response['Grants'][1]['Permission'] == 'READ') and (
            response['Grants'][0]['Permission'] == 'FULL_CONTROL'
        )

    def test_load_bytes(self, s3_bucket):
        hook = S3Hook()
        hook.load_bytes(b"Content", "my_key", s3_bucket)
        resource = boto3.resource('s3').Object(s3_bucket, 'my_key')  # pylint: disable=no-member
        assert resource.get()['Body'].read() == b'Content'

    def test_load_bytes_acl(self, s3_bucket):
        hook = S3Hook()
        hook.load_bytes(b"Content", "my_key", s3_bucket, acl_policy='public-read')
        response = boto3.client('s3').get_object_acl(Bucket=s3_bucket, Key="my_key", RequestPayer='requester')
        assert (response['Grants'][1]['Permission'] == 'READ') and (
            response['Grants'][0]['Permission'] == 'FULL_CONTROL'
        )

    def test_load_fileobj(self, s3_bucket):
        hook = S3Hook()
        with tempfile.TemporaryFile() as temp_file:
            temp_file.write(b"Content")
            temp_file.seek(0)
            hook.load_file_obj(temp_file, "my_key", s3_bucket)
            resource = boto3.resource('s3').Object(s3_bucket, 'my_key')  # pylint: disable=no-member
            assert resource.get()['Body'].read() == b'Content'

    def test_load_fileobj_acl(self, s3_bucket):
        hook = S3Hook()
        with tempfile.TemporaryFile() as temp_file:
            temp_file.write(b"Content")
            temp_file.seek(0)
            hook.load_file_obj(temp_file, "my_key", s3_bucket, acl_policy='public-read')
            response = boto3.client('s3').get_object_acl(
                Bucket=s3_bucket, Key="my_key", RequestPayer='requester'
            )  # pylint: disable=no-member # noqa: E501 # pylint: disable=C0301
            assert (response['Grants'][1]['Permission'] == 'READ') and (
                response['Grants'][0]['Permission'] == 'FULL_CONTROL'
            )

    def test_load_file_gzip(self, s3_bucket):
        hook = S3Hook()
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            temp_file.write(b"Content")
            temp_file.seek(0)
            hook.load_file(temp_file.name, "my_key", s3_bucket, gzip=True)
            resource = boto3.resource('s3').Object(s3_bucket, 'my_key')  # pylint: disable=no-member
            assert gz.decompress(resource.get()['Body'].read()) == b'Content'
            os.unlink(temp_file.name)

    def test_load_file_acl(self, s3_bucket):
        hook = S3Hook()
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            temp_file.write(b"Content")
            temp_file.seek(0)
            hook.load_file(temp_file.name, "my_key", s3_bucket, gzip=True, acl_policy='public-read')
            response = boto3.client('s3').get_object_acl(
                Bucket=s3_bucket, Key="my_key", RequestPayer='requester'
            )  # pylint: disable=no-member # noqa: E501 # pylint: disable=C0301
            assert (response['Grants'][1]['Permission'] == 'READ') and (
                response['Grants'][0]['Permission'] == 'FULL_CONTROL'
            )
            os.unlink(temp_file.name)

    def test_copy_object_acl(self, s3_bucket):
        hook = S3Hook()
        with tempfile.NamedTemporaryFile() as temp_file:
            temp_file.write(b"Content")
            temp_file.seek(0)
            hook.load_file_obj(temp_file, "my_key", s3_bucket)
            hook.copy_object("my_key", "my_key", s3_bucket, s3_bucket)
            response = boto3.client('s3').get_object_acl(
                Bucket=s3_bucket, Key="my_key", RequestPayer='requester'
            )  # pylint: disable=no-member # noqa: E501 # pylint: disable=C0301
            assert (response['Grants'][0]['Permission'] == 'FULL_CONTROL') and (len(response['Grants']) == 1)

    @mock_s3
    def test_delete_bucket_if_bucket_exist(self, s3_bucket):
        # assert that the bucket is created, then deleted
        mock_hook = S3Hook()
        mock_hook.create_bucket(bucket_name=s3_bucket)
        assert mock_hook.check_for_bucket(bucket_name=s3_bucket)
        mock_hook.delete_bucket(bucket_name=s3_bucket, force_delete=True)
        assert not mock_hook.check_for_bucket(s3_bucket)

    @mock_s3
    def test_delete_bucket_if_not_bucket_exist(self, s3_bucket):
        # assert that an exception is raised if the bucket is not present
        mock_hook = S3Hook()
        with pytest.raises(ClientError) as error:
            assert mock_hook.delete_bucket(bucket_name=s3_bucket, force_delete=True)
        assert error.value.response['Error']['Code'] == 'NoSuchBucket'

    @mock.patch.object(S3Hook, 'get_connection', return_value=Connection(schema='test_bucket'))
    def test_provide_bucket_name(self, mock_get_connection):
        class FakeS3Hook(S3Hook):
            @provide_bucket_name
            def test_function(self, bucket_name=None):
                return bucket_name

        fake_s3_hook = FakeS3Hook()

        test_bucket_name = fake_s3_hook.test_function()
        assert test_bucket_name == mock_get_connection.return_value.schema

        test_bucket_name = fake_s3_hook.test_function(bucket_name='bucket')
        assert test_bucket_name == 'bucket'

    def test_delete_objects_key_does_not_exist(self, s3_bucket):
        hook = S3Hook()
        with pytest.raises(AirflowException) as err:
            hook.delete_objects(bucket=s3_bucket, keys=['key-1'])

        assert isinstance(err.value, AirflowException)
        assert str(err.value) == "Errors when deleting: ['key-1']"

    def test_delete_objects_one_key(self, mocked_s3_res, s3_bucket):
        key = 'key-1'
        mocked_s3_res.Object(s3_bucket, key).put(Body=b'Data')
        hook = S3Hook()
        hook.delete_objects(bucket=s3_bucket, keys=[key])
        assert [o.key for o in mocked_s3_res.Bucket(s3_bucket).objects.all()] == []

    def test_delete_objects_many_keys(self, mocked_s3_res, s3_bucket):
        num_keys_to_remove = 1001
        keys = []
        for index in range(num_keys_to_remove):
            key = f'key-{index}'
            mocked_s3_res.Object(s3_bucket, key).put(Body=b'Data')
            keys.append(key)

        assert sum(1 for _ in mocked_s3_res.Bucket(s3_bucket).objects.all()) == num_keys_to_remove
        hook = S3Hook()
        hook.delete_objects(bucket=s3_bucket, keys=keys)
        assert [o.key for o in mocked_s3_res.Bucket(s3_bucket).objects.all()] == []

    def test_unify_bucket_name_and_key(self):
        class FakeS3Hook(S3Hook):
            @unify_bucket_name_and_key
            def test_function_with_wildcard_key(self, wildcard_key, bucket_name=None):
                return bucket_name, wildcard_key

            @unify_bucket_name_and_key
            def test_function_with_key(self, key, bucket_name=None):
                return bucket_name, key

            @unify_bucket_name_and_key
            def test_function_with_test_key(self, test_key, bucket_name=None):
                return bucket_name, test_key

        fake_s3_hook = FakeS3Hook()

        test_bucket_name_with_wildcard_key = fake_s3_hook.test_function_with_wildcard_key('s3://foo/bar*.csv')
        assert ('foo', 'bar*.csv') == test_bucket_name_with_wildcard_key

        test_bucket_name_with_key = fake_s3_hook.test_function_with_key('s3://foo/bar.csv')
        assert ('foo', 'bar.csv') == test_bucket_name_with_key

        with pytest.raises(ValueError) as err:
            fake_s3_hook.test_function_with_test_key('s3://foo/bar.csv')
        assert isinstance(err.value, ValueError)

    @mock.patch('airflow.providers.amazon.aws.hooks.s3.NamedTemporaryFile')
    def test_download_file(self, mock_temp_file):
        mock_temp_file.return_value.__enter__ = Mock(return_value=mock_temp_file)
        s3_hook = S3Hook(aws_conn_id='s3_test')
        s3_hook.check_for_key = Mock(return_value=True)
        s3_obj = Mock()
        s3_obj.download_fileobj = Mock(return_value=None)
        s3_hook.get_key = Mock(return_value=s3_obj)
        key = 'test_key'
        bucket = 'test_bucket'

        s3_hook.download_file(key=key, bucket_name=bucket)

        s3_hook.check_for_key.assert_called_once_with(key, bucket)
        s3_hook.get_key.assert_called_once_with(key, bucket)
        s3_obj.download_fileobj.assert_called_once_with(mock_temp_file)

    def test_generate_presigned_url(self, s3_bucket):
        hook = S3Hook()
        presigned_url = hook.generate_presigned_url(
            client_method="get_object", params={'Bucket': s3_bucket, 'Key': "my_key"}
        )

        url = presigned_url.split("?")[1]
        params = {x[0]: x[1] for x in [x.split("=") for x in url[0:].split("&")]}

        assert {"AWSAccessKeyId", "Signature", "Expires"}.issubset(set(params.keys()))

    def test_should_throw_error_if_extra_args_is_not_dict(self):
        with pytest.raises(ValueError):
            S3Hook(extra_args=1)

    def test_should_throw_error_if_extra_args_contains_unknown_arg(self, s3_bucket):
        hook = S3Hook(extra_args={"unknown_s3_args": "value"})
        with tempfile.TemporaryFile() as temp_file:
            temp_file.write(b"Content")
            temp_file.seek(0)
            with pytest.raises(ValueError):
                hook.load_file_obj(temp_file, "my_key", s3_bucket, acl_policy='public-read')

    def test_should_pass_extra_args(self, s3_bucket):
        hook = S3Hook(extra_args={"ContentLanguage": "value"})
        with tempfile.TemporaryFile() as temp_file:
            temp_file.write(b"Content")
            temp_file.seek(0)
            hook.load_file_obj(temp_file, "my_key", s3_bucket, acl_policy='public-read')
            resource = boto3.resource('s3').Object(s3_bucket, 'my_key')  # pylint: disable=no-member
...
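Note that these tests rely on `s3_bucket` and `mocked_s3_res` pytest fixtures defined in the suite's conftest.py, which this snippet does not include. Below is a minimal sketch of what such fixtures might look like, assuming moto's `mock_s3` context manager; the fixture bodies and the bucket name are illustrative, not the actual Airflow conftest.

# Hypothetical conftest.py sketch: moto-backed fixtures matching the names
# the tests above expect. The real Airflow conftest may differ.
import boto3
import pytest
from moto import mock_s3


@pytest.fixture
def mocked_s3_res():
    # Start moto's in-memory S3 and yield a boto3 resource bound to it.
    with mock_s3():
        yield boto3.resource('s3', region_name='us-east-1')


@pytest.fixture
def s3_bucket(mocked_s3_res):
    # Create a throwaway bucket and return its name; the tests pass this
    # value around as a plain string.
    bucket = 'airflow-test-s3-bucket'  # illustrative name
    mocked_s3_res.create_bucket(Bucket=bucket)
    return bucket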
common.py
Source: common.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from io import BufferedReader, BufferedWriter

import boto3
import paramiko
from boto3.exceptions import S3UploadFailedError
from botocore.config import Config
from botocore.exceptions import NoRegionError, ClientError

from dataeng.utils.data_type import represent_int


def get_s3_client(logger, session, config=None):
    """
    Gets an instance of S3 client.
    :param logger: logger
    :param session: AWS session
    :param config: Boto config
    :type logger: logging.Logger
    :type session: boto3.session.Session
    :type config: botocore.config.Config
    :returns: S3 client
    :rtype: botocore.client.S3
    """
    if not config:
        config = Config(retries=dict(max_attempts=10))
    try:
        logger.debug("Getting S3 client.")
        return session.client("s3", config=config)
    except NoRegionError as e:
        logger.error("Unable to get S3 client - {error}".format(error=e.__str__()))


def get_s3_resource(logger, session, config=None):
    """
    Gets an instance of S3 resource.
    :param logger: logger
    :param session: AWS session
    :param config: Boto config
    :type logger: logging.Logger
    :type session: boto3.session.Session
    :type config: botocore.config.Config
    :returns: S3 resource
    :rtype: boto3.resources.factory.s3.ServiceResource
    """
    if not config:
        config = Config(retries=dict(max_attempts=10))
    try:
        logger.debug("Getting S3 resource.")
        return session.resource("s3", config=config)
    except NoRegionError as e:
        logger.error("Unable to get S3 resource - {error}".format(error=e.__str__()))


def is_s3_key_exists(logger, s3_resource, s3_bucket, s3_key):
    """
    Checks the existence of S3 key.
    :param logger: logger
    :param s3_resource: S3 resource
    :param s3_bucket: S3 bucket
    :param s3_key: S3 key
    :type logger: logging.Logger
    :type s3_resource: boto3.resources.factory.s3.ServiceResource
    :type s3_bucket: str
    :type s3_key: str
    :returns: True if key exists, False if key does not exist, None if unable to determine
    :rtype: bool
    """
    try:
        logger.debug("Checking the existence of s3://{s3_bucket}/{s3_key}".format(s3_bucket=s3_bucket, s3_key=s3_key))
        objs = list(s3_resource.Bucket(s3_bucket).objects.filter(Prefix=s3_key))
        return len(objs) > 0 and objs[0].key == s3_key
    except ClientError as e:
        if represent_int(e.response["Error"]["Code"]):
            error_code = int(e.response["Error"]["Code"])
            if error_code == 403:
                logger.debug("Bucket {s3_bucket} is forbidden".format(s3_bucket=s3_bucket))
                return False
        logger.warning("Unable to determine S3 key {s3_key} - {error}".format(s3_key=s3_key, error=e.__str__()))


def is_s3_bucket_exists(logger, s3_resource, s3_bucket):
    """
    Checks the existence of S3 bucket.
    :param logger: logger
    :param s3_resource: S3 resource
    :param s3_bucket: S3 bucket
    :type logger: logging.Logger
    :type s3_resource: boto3.resources.factory.s3.ServiceResource
    :type s3_bucket: str
    :returns: True if bucket exists, False if bucket does not exist or is forbidden, None if unable to determine
    :rtype: bool
    """
    try:
        logger.debug("Checking the existence of s3://{s3_bucket}".format(s3_bucket=s3_bucket))
        s3_resource.meta.client.head_bucket(Bucket=s3_bucket)
        return True
    except ClientError as e:
        if represent_int(e.response["Error"]["Code"]):
            error_code = int(e.response["Error"]["Code"])
            if error_code == 403:
                logger.debug("Bucket {s3_bucket} is forbidden".format(s3_bucket=s3_bucket))
                return False
            elif error_code == 404:
                logger.debug("Bucket {s3_bucket} does not exist".format(s3_bucket=s3_bucket))
                return False
        logger.warning(
            "Unable to determine S3 bucket {s3_bucket} - {error}".format(s3_bucket=s3_bucket, error=e.__str__()))


def upload_s3_object(logger, s3_resource, f, s3_bucket, s3_key, callback=None):
    """
    Uploads file object to S3.
    :param logger: logger
    :param s3_resource: S3 resource
    :param f: file path or file object
    :param s3_bucket: S3 bucket
    :param s3_key: S3 key
    :param callback: callback for monitoring progress
    :type logger: logging.Logger
    :type s3_resource: boto3.resources.factory.s3.ServiceResource
    :type f: str | io.BufferedReader | paramiko.SFTPFile
    :type s3_bucket: str
    :type s3_key: str
    :type callback: typing.Callable
    """
    try:
        if isinstance(f, str):
            logger.debug(
                "Uploading {f} to s3://{s3_bucket}/{s3_key}".format(f=f, s3_bucket=s3_bucket, s3_key=s3_key))
            s3_resource.meta.client.upload_file(f, s3_bucket, s3_key, Callback=callback)
        elif isinstance(f, (BufferedReader, paramiko.SFTPFile)):
            logger.debug(
                "Uploading a file object to s3://{s3_bucket}/{s3_key}".format(s3_bucket=s3_bucket, s3_key=s3_key))
            s3_resource.meta.client.upload_fileobj(f, s3_bucket, s3_key, Callback=callback)
        else:
            logger.error("Invalid input type for upload - {input_type}".format(input_type=type(f).__name__))
    except S3UploadFailedError as e:
        logger.warning(
            "Unable to upload file to s3://{s3_bucket}/{s3_key} - {error}".format(s3_bucket=s3_bucket, s3_key=s3_key,
                                                                                  error=e.__str__()))


def download_s3_object(logger, s3_resource, f, s3_bucket, s3_key, callback=None):
    """
    Downloads file object from S3.
    :param logger: logger
    :param s3_resource: S3 resource
    :param f: file path or file object
    :param s3_bucket: S3 bucket
    :param s3_key: S3 key
    :param callback: callback for monitoring progress
    :type logger: logging.Logger
    :type s3_resource: boto3.resources.factory.s3.ServiceResource
    :type f: str | io.BufferedWriter
    :type s3_bucket: str
    :type s3_key: str
    :type callback: typing.Callable
    """
    try:
        if isinstance(f, str):
            logger.debug("Downloading s3://{s3_bucket}/{s3_key} to {f}".format(s3_bucket=s3_bucket, s3_key=s3_key, f=f))
            s3_resource.meta.client.download_file(s3_bucket, s3_key, f, Callback=callback)
        elif isinstance(f, BufferedWriter):
            logger.debug(
                "Downloading s3://{s3_bucket}/{s3_key} to a file object".format(s3_bucket=s3_bucket, s3_key=s3_key))
            s3_resource.meta.client.download_fileobj(s3_bucket, s3_key, f, Callback=callback)
        else:
            logger.error("Invalid input type for download - {input_type}".format(input_type=type(f).__name__))
    except ClientError as e:
        if represent_int(e.response["Error"]["Code"]):
            error_code = int(e.response["Error"]["Code"])
            if error_code == 403:
                logger.debug(
                    "S3 object s3://{s3_bucket}/{s3_key} is forbidden".format(s3_bucket=s3_bucket, s3_key=s3_key))
                return
            elif error_code == 404:
                logger.debug(
                    "S3 object s3://{s3_bucket}/{s3_key} does not exist".format(s3_bucket=s3_bucket, s3_key=s3_key))
                return
        logger.warning(
            "Unable to download s3://{s3_bucket}/{s3_key} - {error}".format(s3_bucket=s3_bucket, s3_key=s3_key,
                                                                            error=e.__str__()))


def delete_s3_object(logger, s3_resource, s3_bucket, s3_key):
    """
    Deletes an S3 object.
    :param logger: logger
    :param s3_resource: S3 resource
    :param s3_bucket: S3 bucket
    :param s3_key: S3 key
    :type logger: logging.Logger
    :type s3_resource: boto3.resources.factory.s3.ServiceResource
    :type s3_bucket: str
    :type s3_key: str
    """
    try:
        logger.debug("Deleting s3://{s3_bucket}/{s3_key}".format(s3_bucket=s3_bucket, s3_key=s3_key))
        s3_resource.Object(s3_bucket, s3_key).delete()
    except ClientError as e:
        logger.warning("Unable to delete S3 key {s3_key} - {error}".format(s3_key=s3_key, error=e.__str__()))


def delete_s3_prefix(logger, s3_resource, s3_bucket, s3_prefix):
    """
    Deletes an S3 prefix.
    :param logger: logger
    :param s3_resource: S3 resource
    :param s3_bucket: S3 bucket
    :param s3_prefix: S3 prefix
    :type logger: logging.Logger
    :type s3_resource: boto3.resources.factory.s3.ServiceResource
    :type s3_bucket: str
    :type s3_prefix: str
    """
    try:
        logger.debug("Deleting s3://{s3_bucket}/{s3_prefix}/".format(s3_bucket=s3_bucket, s3_prefix=s3_prefix))
        s3_resource.Bucket(s3_bucket).objects.filter(Prefix=s3_prefix).delete()
        s3_resource.Object(bucket_name=s3_bucket, key=s3_prefix).delete()
    except ClientError as e:
        if represent_int(e.response["Error"]["Code"]):
            error_code = int(e.response["Error"]["Code"])
            if error_code == 403:
                logger.debug(
                    "s3://{s3_bucket}/{s3_prefix}/ is forbidden".format(s3_bucket=s3_bucket, s3_prefix=s3_prefix))
                return
            elif error_code == 404:
                logger.debug(
                    "s3://{s3_bucket}/{s3_prefix}/ does not exist".format(s3_bucket=s3_bucket, s3_prefix=s3_prefix))
                return
        logger.warning("Unable to delete S3 prefix s3://{s3_bucket}/{s3_prefix} - {error}".format(s3_bucket=s3_bucket,
                                                                                                  s3_prefix=s3_prefix,
                                                                                                  error=e.__str__()))


def list_s3_keys(logger, s3_resource, s3_bucket, s3_prefix="", search="Contents"):
    """
    Lists S3 keys for a given S3 bucket and S3 prefix.
    :param logger: logger
    :param s3_resource: S3 resource
    :param s3_bucket: S3 bucket
    :param s3_prefix: S3 prefix
    :param search: JMESPath search string
    :type logger: logging.Logger
    :type s3_resource: boto3.resources.factory.s3.ServiceResource
    :type s3_bucket: str
    :type s3_prefix: str
    :type search: str
    :returns: list of S3 keys
    :rtype: list
    """
    try:
        if s3_prefix and s3_prefix[0] == "/":
            s3_prefix = s3_prefix[1:]
        if s3_prefix and s3_prefix[len(s3_prefix) - 1] != "/":
            s3_prefix = "{s3_prefix}/".format(s3_prefix=s3_prefix)
        paginator = s3_resource.meta.client.get_paginator("list_objects_v2")
        page_iterator = paginator.paginate(Bucket=s3_bucket, Prefix=s3_prefix)
        if search is not None:
            page_iterator = page_iterator.search(search)
        s3_keys = []
        for key_data in page_iterator:
            if key_data is not None:
                if not key_data["Key"].endswith("/"):
                    s3_keys.append(key_data["Key"])
        return s3_keys
    except ClientError as e:
        if represent_int(e.response["Error"]["Code"]):
            error_code = int(e.response["Error"]["Code"])
            if error_code == 403:
                logger.debug(
                    "s3://{s3_bucket}/{s3_prefix} is forbidden".format(s3_bucket=s3_bucket, s3_prefix=s3_prefix))
                return
            elif error_code == 404:
                logger.debug(
                    "s3://{s3_bucket}/{s3_prefix} does not exist".format(s3_bucket=s3_bucket, s3_prefix=s3_prefix))
                return
        logger.warning("Unable to list S3 keys from s3://{s3_bucket}/{s3_prefix} - {error}".format(s3_bucket=s3_bucket,
                                                                                                   s3_prefix=s3_prefix,
                                                                                                   error=e.__str__()))


def get_s3_object_size(logger, s3_resource, s3_bucket, s3_key):
    """
    Gets S3 object size.
    :param logger: logger
    :param s3_resource: S3 resource
    :param s3_bucket: S3 bucket
    :param s3_key: S3 key
    :type logger: logging.Logger
    :type s3_resource: boto3.resources.factory.s3.ServiceResource
    :type s3_bucket: str
    :type s3_key: str
    :returns: size of S3 object in bytes
    :rtype: int
    """
    try:
        return s3_resource.meta.client.head_object(Bucket=s3_bucket, Key=s3_key)["ContentLength"]
    except ClientError as e:
        if represent_int(e.response["Error"]["Code"]):
            error_code = int(e.response["Error"]["Code"])
            if error_code == 403:
                logger.debug("s3://{s3_bucket}/{s3_key} is forbidden".format(s3_bucket=s3_bucket, s3_key=s3_key))
                return
            elif error_code == 404:
                logger.debug("s3://{s3_bucket}/{s3_key} does not exist".format(s3_bucket=s3_bucket, s3_key=s3_key))
                return
        logger.warning("Unable to get size of s3://{s3_bucket}/{s3_key} - {error}".format(s3_bucket=s3_bucket,
                                                                                           s3_key=s3_key,
...
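A short usage sketch for these helpers: it wires in a standard `logging.Logger` and a default boto3 session, uploads a file, checks the key, and downloads it back. The bucket name, key, local paths, and region are placeholders, and it assumes the helpers above are importable from the same module.

# Hypothetical caller for the common.py helpers above.
import logging

import boto3

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("s3_common_demo")

# Any boto3 session works; a region avoids NoRegionError in the getters.
session = boto3.session.Session(region_name="us-east-1")
s3_resource = get_s3_resource(logger, session)

# Upload a local file, confirm the key exists, then download a copy.
upload_s3_object(logger, s3_resource, "/tmp/report.csv", "my-bucket", "reports/report.csv")
if is_s3_key_exists(logger, s3_resource, "my-bucket", "reports/report.csv"):
    download_s3_object(logger, s3_resource, "/tmp/report_copy.csv", "my-bucket", "reports/report.csv")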
table_quality_check.py
Source: table_quality_check.py
import boto3
import configparser
import os
import pyspark.sql.functions as F
from pyspark.sql import types as T
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col

config = configparser.ConfigParser()
config.read('/home/workspace/dwh.cfg')

os.environ["AWS_ACCESS_KEY_ID"] = config.get("AWS_CREDENTIALS", "AWS_ACCESS_KEY_ID")
os.environ["AWS_SECRET_ACCESS_KEY"] = config.get("AWS_CREDENTIALS", "AWS_SECRET_ACCESS_KEY")
os.environ["s3_bucket"] = config.get("S3", "s3_bucket")


def check(path, table, spark):
    """
    Count the rows of the parquet table at path + table and report PASSED/FAILED.
    """
    print("======================================")
    checkvar = path + table
    print("Check Activated : ", checkvar)

    temp_table = spark.read.parquet(checkvar)
    temp_table.createOrReplaceTempView("temp_table")

    temp_table = spark.sql("SELECT count(*) count FROM temp_table").first()
    print(table, " count :", temp_table[0])
    if temp_table[0] > 0:
        print("PASSED")
    else:
        print("FAILED")

    print("======================================")
    print("")


def create_spark_session():
    """
    Create spark session for processing
    """
    print("Create Spark Session")
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark


def main():
    """
    Main function to run quality checks on data loaded to S3 using Spark.
    """
    # Create the Spark session before running checks; the original snippet
    # referenced `spark` in main() without ever defining it.
    spark = create_spark_session()

    # Print S3 bucket location
    s3_bucket = os.environ["s3_bucket"]
    s3_bucket = s3_bucket.replace("'", "")
    print(s3_bucket)

    # Invoke the row-count check for each table in the data lake
    check(s3_bucket + "datalake/", "country_table", spark)
    check(s3_bucket + "datalake/", "airport_table", spark)
    check(s3_bucket + "datalake/", "immigration_table", spark)
    check(s3_bucket + "datalake/", "immigrant_table", spark)
    check(s3_bucket + "datalake/", "weather_table", spark)
    check(s3_bucket + "datalake/", "city_state_table", spark)
    check(s3_bucket + "datalake/", "city_weather_table", spark)
    check(s3_bucket + "datalake/", "demographics_city_table", spark)
    check(s3_bucket + "datalake/", "immigration_demographic_table", spark)
    check(s3_bucket + "datalake/", "airports_weather_table", spark)


if __name__ == "__main__":
...
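The ten repeated `check(...)` calls in `main()` could be driven from a single table list instead. A small refactoring sketch, with the table names taken from the calls above and the function name `run_all_checks` being illustrative:

# Refactoring sketch: run the same quality check over a list of tables.
TABLES = [
    "country_table", "airport_table", "immigration_table", "immigrant_table",
    "weather_table", "city_state_table", "city_weather_table",
    "demographics_city_table", "immigration_demographic_table",
    "airports_weather_table",
]


def run_all_checks(s3_bucket, spark):
    # Same behavior as the unrolled calls in main(), one loop instead of ten lines.
    for table in TABLES:
        check(s3_bucket + "datalake/", table, spark)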