Best Python code snippets using localstack_python
impl.py
Source:impl.py
...45 @available46 def delete_s3_object(self, s3_path):47 bucket, prefix = self.split_s3_path(s3_path)48 creds = self.get_creds()49 boto3_session = get_boto3_session(creds.region_name, creds.aws_profile_name)50 s3_client = boto3_session.client("s3")51 s3_resource = boto3_session.resource("s3")52 if self.s3_path_exists(s3_path, s3_client):53 logger.info(f"Delete objects from bucket={bucket}, prefix={prefix}")54 s3_resource.Bucket(bucket).objects.filter(Prefix=prefix).delete()55 @available56 def s3_table_location(self, schema_name: str, table_name: str) -> str:57 creds = self.get_creds()58 if creds.s3_data_dir is not None:59 s3_path = creds.s3_data_dir.format(60 schema_name=schema_name, table_name=table_name61 )62 return s3_path63 else:64 raise ValueError("s3_data_dir is required for the profile config")65 @available66 def clean_up_partitions(67 self, database_name: str, table_name: str, where_condition: str68 ):69 # Look up Glue partitions & clean up70 creds = self.get_creds()71 boto3_session = get_boto3_session(creds.region_name, creds.aws_profile_name)72 with boto3_client_lock:73 glue_client = boto3_session.client("glue")74 s3_resource = boto3_session.resource("s3")75 partitions = glue_client.get_partitions(76 # CatalogId='123456789012', # Need to make this configurable if it is different from default AWS Account ID77 DatabaseName=database_name,78 TableName=table_name,79 Expression=where_condition,80 )81 p = re.compile("s3://([^/]*)/(.*)")82 for partition in partitions["Partitions"]:83 logger.debug(84 "Deleting objects for partition '{}' at '{}'",85 partition["Values"],86 partition["StorageDescriptor"]["Location"],87 )88 m = p.match(partition["StorageDescriptor"]["Location"])89 if m is not None:90 bucket_name = m.group(1)91 prefix = m.group(2)92 s3_bucket = s3_resource.Bucket(bucket_name)93 s3_bucket.objects.filter(Prefix=prefix).delete()94 @available95 def clean_up_table(self, database_name: str, table_name: str):96 # Look up Glue partitions & clean up97 creds = 
self.get_creds()98 boto3_session = get_boto3_session(creds.region_name, creds.aws_profile_name)99 with boto3_client_lock:100 glue_client = boto3_session.client("glue")101 try:102 table = glue_client.get_table(DatabaseName=database_name, Name=table_name)103 except ClientError as e:104 if e.response["Error"]["Code"] == "EntityNotFoundException":105 logger.debug("Table '{}' does not exists - Ignoring", table_name)106 return107 if table is not None:108 logger.debug(109 "Deleting table data from'{}'",110 table["Table"]["StorageDescriptor"]["Location"],111 )112 p = re.compile("s3://([^/]*)/(.*)")...
schema_validator.py
Source:schema_validator.py
from abc import ABC, abstractmethod

import boto3
import awswrangler as wr  # Ensure Lambda has an AWS Wrangler Layer configured

from ..commons import init_logger

logger = init_logger(__name__)


class GlueSchemaValidator(ABC):
    """
    Abstract class to validate objects against Glue Table schemas.
    """

    def __init__(self, boto3_session=None):
        """
        Initializes Glue client based on the supplied or a new session.
        Args:
            boto3_session: Boto3 session
        """
        self.boto3_session = boto3_session
        # Reuse session or create a default one
        self.glue_client = boto3_session.client(
            'glue') if boto3_session else boto3.client('glue')

    def _get_table_parameters(self, database_name, table_name):
        """
        Returns the parameters of the Glue table
        Args:
            database_name: Glue database name
            table_name: Glue table name
        Returns: dict of table parameters
        """
        return self.glue_client.get_table(
            DatabaseName=database_name, Name=table_name)['Table']['Parameters']

    def _get_table_schema(self, database_name, table_name):
        """
        Returns column names and respective types of the Glue table
        Args:
            database_name: Glue database name
            table_name: Glue table name
        Returns: list of dicts of a form { 'Name': ..., 'Type': ...}
        """
        return self.glue_client.get_table(
            DatabaseName=database_name, Name=table_name)['Table']['StorageDescriptor']['Columns']

    @abstractmethod
    def validate(self, prefix, keys, database_name, table_name):
        """
        Validates the object(s) against the Glue schema
        Args:
            prefix: S3 prefix
            keys: list of S3 keys
            database_name: Glue database name
            table_name: Glue table name
        Returns: validation result: True or False
        """
        pass


class ParquetSchemaValidator(GlueSchemaValidator):

    def validate(self, prefix, keys, database_name, table_name):
        """
        Validates the Parquet S3 object(s) against the Glue schema
        Args:
            prefix: S3 prefix
            keys: list of S3 keys
            database_name: Glue database name
            table_name: Glue table name
        Returns: validation result: True or False
        """
        # Retrieve table parameters
        table_parameters = self._get_table_parameters(
            database_name, table_name)
        logger.info(f"Table parameters: {table_parameters}")
        # Parse table parameters
        validate_schema, validate_latest = self._parse_table_parameters(
            table_parameters)
        if validate_schema:
            # Retrieve table schema
            # Columns must be sorted in order to compare the schema because Parquet
            # does not respect the order
            table_schema = sorted(
                self._get_table_schema(database_name, table_name), key=lambda x: x['Name'])
            logger.info(f"Table schema: {table_schema}")
            # Retrieve object schema
            object_schema = self._get_object_schema(
                prefix, keys, validate_latest)
            logger.info(
                f"Object prefix: {prefix}, keys: {keys}, schema: {object_schema}")
            if table_schema != object_schema:
                return False
        return True

    def _get_object_schema(self, prefix, keys, get_latest):
        """
        Retrieves object schema from a Parquet file
        Args:
            prefix: S3 prefix
            keys: list of S3 keys
            get_latest: Flag to get only latest object (otherwise - get all available objects)
        Returns: list of dicts of a form { 'Name': ..., 'Type': ...}
        """
        # Retrieve object metadata
        s3_objects = wr.s3.describe_objects(
            path=keys if keys and len(keys) > 0 else prefix,
            boto3_session=self.boto3_session
        )
        # Sort by last modified date and filter out empty objects
        object_keys = [object_key for object_key, object_metadata in sorted(
            s3_objects.items(),
            key=lambda k_v: k_v[1]['LastModified']
        ) if object_metadata['ContentLength'] > 0]
        # Retrieve Parquet metadata
        column_types, _ = wr.s3.read_parquet_metadata(
            path=object_keys[0] if get_latest else object_keys,
            boto3_session=self.boto3_session
        )
        # Columns must be sorted in order to compare the schema because Parquet
        # does not respect the order.
        # `col_type` avoids shadowing the `type` builtin (was `type` originally).
        return sorted(
            [{'Name': name.lower(), 'Type': col_type}
             for name, col_type in column_types.items()],
            key=lambda x: x['Name'])

    @staticmethod
    def _parse_table_parameters(table_parameters):
        """
        Retrieves specific table parameters and provides defaults
        Args:
            table_parameters: table parameters dict
        Returns: tuple of Booleans validate_schema, validate_latest
        """
        # Simplified from try/except KeyError around a conditional: a missing
        # key and a non-'true' value both mean False, which dict.get expresses
        # directly.
        if not table_parameters:
            return False, False
        validate_schema = table_parameters.get('validate_schema') == 'true'
        validate_latest = table_parameters.get('validate_latest') == 'true'
        # BUG FIX: the original snippet was truncated and never returned;
        # validate() unpacks two values from this call, so return them.
        return validate_schema, validate_latest
secrets.py
Source:secrets.py
import json
import os

import boto3

from logger import logger


def export(boto3_session=None, **params):
    """Fetch a secret and export its key/value pairs into ``os.environ``.

    Args:
        boto3_session: optional Boto3 session; forwarded to :func:`fetch`.
        **params: forwarded to ``secretsmanager.get_secret_value``
            (e.g. ``SecretId=...``).
    """
    secret = fetch(boto3_session, **params)
    os.environ.update(**secret)


def fetch(boto3_session=None, **params):
    """Fetch a secret from AWS Secrets Manager and return it as a dict.

    Args:
        boto3_session: optional Boto3 session; a default session is created
            when omitted.
        **params: forwarded to ``secretsmanager.get_secret_value``
            (e.g. ``SecretId=...``).
    Returns:
        dict parsed from the secret's JSON ``SecretString``.
    """
    boto3_session = boto3_session or boto3.Session()
    secrets = boto3_session.client('secretsmanager')
    logger.info('GET SECRET %s', json.dumps(params))
    secret = json.loads(secrets.get_secret_value(**params)['SecretString'])
    # BUG FIX: the source snippet was truncated before this line; export()
    # consumes fetch()'s return value, so the secret dict must be returned.
    return secret
Learn to execute automation testing from scratch with the LambdaTest Learning Hub — right from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!