Best Python code snippet using localstack_python
redshift.py
Source:redshift.py
import contextlib
import uuid
from datetime import datetime
from typing import (
    Callable,
    ContextManager,
    Dict,
    Iterator,
    List,
    Optional,
    Tuple,
    Union,
)

import numpy as np
import pandas as pd
import pyarrow as pa
from dateutil import parser
from pydantic import StrictStr
from pydantic.typing import Literal
from pytz import utc

from feast import OnDemandFeatureView, RedshiftSource
from feast.data_source import DataSource
from feast.errors import InvalidEntityType
from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView
from feast.infra.offline_stores import offline_utils
from feast.infra.offline_stores.offline_store import (
    OfflineStore,
    RetrievalJob,
    RetrievalMetadata,
)
from feast.infra.offline_stores.redshift_source import SavedDatasetRedshiftStorage
from feast.infra.utils import aws_utils
from feast.registry import Registry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.usage import log_exceptions_and_usage


class RedshiftOfflineStoreConfig(FeastConfigBaseModel):
    """ Offline store config for AWS Redshift """

    type: Literal["redshift"] = "redshift"
    """ Offline store type selector """

    cluster_id: StrictStr
    """ Redshift cluster identifier """

    region: StrictStr
    """ Redshift cluster's AWS region """

    user: StrictStr
    """ Redshift user name """

    database: StrictStr
    """ Redshift database name """

    s3_staging_location: StrictStr
    """ S3 path for importing & exporting data to Redshift """

    iam_role: StrictStr
    """ IAM Role for Redshift, granting it access to S3 """


class RedshiftOfflineStore(OfflineStore):
    @staticmethod
    @log_exceptions_and_usage(offline_store="redshift")
    def pull_latest_from_table_or_query(
        config: RepoConfig,
        data_source: DataSource,
        join_key_columns: List[str],
        feature_name_columns: List[str],
        event_timestamp_column: str,
        created_timestamp_column: Optional[str],
        start_date: datetime,
        end_date: datetime,
    ) -> RetrievalJob:
        assert isinstance(data_source, RedshiftSource)
        assert isinstance(config.offline_store, RedshiftOfflineStoreConfig)

        from_expression = data_source.get_table_query_string()

        partition_by_join_key_string = ", ".join(join_key_columns)
        if partition_by_join_key_string != "":
            partition_by_join_key_string = (
                "PARTITION BY " + partition_by_join_key_string
            )
        timestamp_columns = [event_timestamp_column]
        if created_timestamp_column:
            timestamp_columns.append(created_timestamp_column)
        timestamp_desc_string = " DESC, ".join(timestamp_columns) + " DESC"
        field_string = ", ".join(
            join_key_columns + feature_name_columns + timestamp_columns
        )

        redshift_client = aws_utils.get_redshift_data_client(
            config.offline_store.region
        )
        s3_resource = aws_utils.get_s3_resource(config.offline_store.region)

        start_date = start_date.astimezone(tz=utc)
        end_date = end_date.astimezone(tz=utc)

        query = f"""
            SELECT
                {field_string}
                {f", {repr(DUMMY_ENTITY_VAL)} AS {DUMMY_ENTITY_ID}" if not join_key_columns else ""}
            FROM (
                SELECT {field_string},
                ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row
                FROM {from_expression}
                WHERE {event_timestamp_column} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}'
            )
            WHERE _feast_row = 1
            """
        # When materializing a single feature view, we don't need full feature names.
        # On demand transforms aren't materialized.
        return RedshiftRetrievalJob(
            query=query,
            redshift_client=redshift_client,
            s3_resource=s3_resource,
            config=config,
            full_feature_names=False,
        )

    @staticmethod
    @log_exceptions_and_usage(offline_store="redshift")
    def pull_all_from_table_or_query(
        config: RepoConfig,
        data_source: DataSource,
        join_key_columns: List[str],
        feature_name_columns: List[str],
        event_timestamp_column: str,
        start_date: datetime,
        end_date: datetime,
    ) -> RetrievalJob:
        assert isinstance(data_source, RedshiftSource)

        from_expression = data_source.get_table_query_string()
        field_string = ", ".join(
            join_key_columns + feature_name_columns + [event_timestamp_column]
        )

        redshift_client = aws_utils.get_redshift_data_client(
            config.offline_store.region
        )
        s3_resource = aws_utils.get_s3_resource(config.offline_store.region)

        start_date = start_date.astimezone(tz=utc)
        end_date = end_date.astimezone(tz=utc)

        query = f"""
            SELECT {field_string}
            FROM {from_expression}
            WHERE {event_timestamp_column} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}'
        """
        return RedshiftRetrievalJob(
            query=query,
            redshift_client=redshift_client,
            s3_resource=s3_resource,
            config=config,
            full_feature_names=False,
        )

    @staticmethod
    @log_exceptions_and_usage(offline_store="redshift")
    def get_historical_features(
        config: RepoConfig,
        feature_views: List[FeatureView],
        feature_refs: List[str],
        entity_df: Union[pd.DataFrame, str],
        registry: Registry,
        project: str,
        full_feature_names: bool = False,
    ) -> RetrievalJob:
        assert isinstance(config.offline_store, RedshiftOfflineStoreConfig)

        redshift_client = aws_utils.get_redshift_data_client(
            config.offline_store.region
        )
        s3_resource = aws_utils.get_s3_resource(config.offline_store.region)

        entity_schema = _get_entity_schema(
            entity_df, redshift_client, config, s3_resource
        )

        entity_df_event_timestamp_col = offline_utils.infer_event_timestamp_from_entity_df(
            entity_schema
        )

        entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range(
            entity_df, entity_df_event_timestamp_col, redshift_client, config,
        )

        @contextlib.contextmanager
        def query_generator() -> Iterator[str]:
            table_name = offline_utils.get_temp_entity_table_name()

            _upload_entity_df(
                entity_df, redshift_client, config, s3_resource, table_name
            )

            expected_join_keys = offline_utils.get_expected_join_keys(
                project, feature_views, registry
            )

            offline_utils.assert_expected_columns_in_entity_df(
                entity_schema, expected_join_keys, entity_df_event_timestamp_col
            )

            # Build a query context containing all information required to template the Redshift SQL query
            query_context = offline_utils.get_feature_view_query_context(
                feature_refs,
                feature_views,
                registry,
                project,
                entity_df_event_timestamp_range,
            )

            # Generate the Redshift SQL query from the query context
            query = offline_utils.build_point_in_time_query(
                query_context,
                left_table_query_string=table_name,
                entity_df_event_timestamp_col=entity_df_event_timestamp_col,
                entity_df_columns=entity_schema.keys(),
                query_template=MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN,
                full_feature_names=full_feature_names,
            )

            try:
                yield query
            finally:
                # Always clean up the uploaded Redshift table
                aws_utils.execute_redshift_statement(
                    redshift_client,
                    config.offline_store.cluster_id,
                    config.offline_store.database,
                    config.offline_store.user,
                    f"DROP TABLE IF EXISTS {table_name}",
                )

        return RedshiftRetrievalJob(
            query=query_generator,
            redshift_client=redshift_client,
            s3_resource=s3_resource,
            config=config,
            full_feature_names=full_feature_names,
            on_demand_feature_views=OnDemandFeatureView.get_requested_odfvs(
                feature_refs, project, registry
            ),
            metadata=RetrievalMetadata(
                features=feature_refs,
                keys=list(entity_schema.keys() - {entity_df_event_timestamp_col}),
                min_event_timestamp=entity_df_event_timestamp_range[0],
                max_event_timestamp=entity_df_event_timestamp_range[1],
            ),
        )


class RedshiftRetrievalJob(RetrievalJob):
    def __init__(
        self,
        query: Union[str, Callable[[], ContextManager[str]]],
        redshift_client,
        s3_resource,
        config: RepoConfig,
        full_feature_names: bool,
        on_demand_feature_views: Optional[List[OnDemandFeatureView]] = None,
        metadata: Optional[RetrievalMetadata] = None,
    ):
        """Initialize RedshiftRetrievalJob object.

        Args:
            query: Redshift SQL query to execute. Either a string, or a generator function that handles the artifact cleanup.
            redshift_client: boto3 redshift-data client
            s3_resource: boto3 s3 resource object
            config: Feast repo config
            full_feature_names: Whether to add the feature view prefixes to the feature names
            on_demand_feature_views (optional): A list of on demand transforms to apply at retrieval time
        """
        if not isinstance(query, str):
            self._query_generator = query
        else:

            @contextlib.contextmanager
            def query_generator() -> Iterator[str]:
                assert isinstance(query, str)
                yield query

            self._query_generator = query_generator
        self._redshift_client = redshift_client
        self._s3_resource = s3_resource
        self._config = config
        self._s3_path = (
            self._config.offline_store.s3_staging_location
            + "/unload/"
            + str(uuid.uuid4())
        )
        self._full_feature_names = full_feature_names
        self._on_demand_feature_views = (
            on_demand_feature_views if on_demand_feature_views else []
        )
        self._metadata = metadata

    @property
    def full_feature_names(self) -> bool:
        return self._full_feature_names

    @property
    def on_demand_feature_views(self) -> Optional[List[OnDemandFeatureView]]:
        return self._on_demand_feature_views

    @log_exceptions_and_usage
    def _to_df_internal(self) -> pd.DataFrame:
        with self._query_generator() as query:
            return aws_utils.unload_redshift_query_to_df(
                self._redshift_client,
                self._config.offline_store.cluster_id,
                self._config.offline_store.database,
                self._config.offline_store.user,
                self._s3_resource,
                self._s3_path,
                self._config.offline_store.iam_role,
                query,
            )

    @log_exceptions_and_usage
    def _to_arrow_internal(self) -> pa.Table:
        with self._query_generator() as query:
            return aws_utils.unload_redshift_query_to_pa(
                self._redshift_client,
                self._config.offline_store.cluster_id,
                self._config.offline_store.database,
                self._config.offline_store.user,
                self._s3_resource,
                self._s3_path,
                self._config.offline_store.iam_role,
                query,
            )

    @log_exceptions_and_usage
    def to_s3(self) -> str:
        """ Export dataset to S3 in Parquet format and return path """
        if self.on_demand_feature_views:
            transformed_df = self.to_df()
            aws_utils.upload_df_to_s3(self._s3_resource, self._s3_path, transformed_df)
            return self._s3_path

        with self._query_generator() as query:
            aws_utils.execute_redshift_query_and_unload_to_s3(
                self._redshift_client,
                self._config.offline_store.cluster_id,
                self._config.offline_store.database,
                self._config.offline_store.user,
                self._s3_path,
                self._config.offline_store.iam_role,
                query,
            )
            return self._s3_path

    @log_exceptions_and_usage
    def to_redshift(self, table_name: str) -> None:
        """ Save dataset as a new Redshift table """
        if self.on_demand_feature_views:
            transformed_df = self.to_df()
            aws_utils.upload_df_to_redshift(
                self._redshift_client,
                self._config.offline_store.cluster_id,
                self._config.offline_store.database,
                self._config.offline_store.user,
                self._s3_resource,
                f"{self._config.offline_store.s3_staging_location}/features_df/{table_name}.parquet",
                self._config.offline_store.iam_role,
                table_name,
                transformed_df,
            )
            return

        with self._query_generator() as query:
            query = f'CREATE TABLE "{table_name}" AS ({query});\n'
            aws_utils.execute_redshift_statement(
                self._redshift_client,
                self._config.offline_store.cluster_id,
                self._config.offline_store.database,
                self._config.offline_store.user,
                query,
            )

    def persist(self, storage: SavedDatasetStorage):
        assert isinstance(storage, SavedDatasetRedshiftStorage)
        self.to_redshift(table_name=storage.redshift_options.table)

    @property
    def metadata(self) -> Optional[RetrievalMetadata]:
        return self._metadata


def _upload_entity_df(
    entity_df: Union[pd.DataFrame, str],
    redshift_client,
    config: RepoConfig,
    s3_resource,
    table_name: str,
):
    if isinstance(entity_df, pd.DataFrame):
        # If the entity_df is a pandas dataframe, upload it to Redshift
        aws_utils.upload_df_to_redshift(
            redshift_client,
            config.offline_store.cluster_id,
            config.offline_store.database,
            config.offline_store.user,
            s3_resource,
            f"{config.offline_store.s3_staging_location}/entity_df/{table_name}.parquet",
            config.offline_store.iam_role,
            table_name,
            entity_df,
        )
    elif isinstance(entity_df, str):
        # If the entity_df is a string (SQL query), create a Redshift table out of it
        aws_utils.execute_redshift_statement(
            redshift_client,
            config.offline_store.cluster_id,
            config.offline_store.database,
            config.offline_store.user,
            f"CREATE TABLE {table_name} AS ({entity_df})",
        )
    else:
        raise InvalidEntityType(type(entity_df))


def _get_entity_schema(
    entity_df: Union[pd.DataFrame, str],
    redshift_client,
    config: RepoConfig,
    s3_resource,
) -> Dict[str, np.dtype]:
    if isinstance(entity_df, pd.DataFrame):
        return dict(zip(entity_df.columns, entity_df.dtypes))
    elif isinstance(entity_df, str):
        # get pandas dataframe consisting of 1 row (LIMIT 1) and generate the schema out of it
        entity_df_sample = RedshiftRetrievalJob(
            f"SELECT * FROM ({entity_df}) LIMIT 1",
            redshift_client,
            s3_resource,
            config,
            full_feature_names=False,
        ).to_df()
        return dict(zip(entity_df_sample.columns, entity_df_sample.dtypes))
    else:
        raise InvalidEntityType(type(entity_df))


def _get_entity_df_event_timestamp_range(
    entity_df: Union[pd.DataFrame, str],
    entity_df_event_timestamp_col: str,
    redshift_client,
    config: RepoConfig,
) -> Tuple[datetime, datetime]:
    if isinstance(entity_df, pd.DataFrame):
        entity_df_event_timestamp = entity_df.loc[
            :, entity_df_event_timestamp_col
        ].infer_objects()
        if pd.api.types.is_string_dtype(entity_df_event_timestamp):
            entity_df_event_timestamp = pd.to_datetime(
                entity_df_event_timestamp, utc=True
            )
        entity_df_event_timestamp_range = (
            entity_df_event_timestamp.min().to_pydatetime(),
            entity_df_event_timestamp.max().to_pydatetime(),
        )
    elif isinstance(entity_df, str):
        # If the entity_df is a string (SQL query), determine the range
        # from the table
        statement_id = aws_utils.execute_redshift_statement(
            redshift_client,
            config.offline_store.cluster_id,
            config.offline_store.database,
            config.offline_store.user,
            f"SELECT MIN({entity_df_event_timestamp_col}) AS min, MAX({entity_df_event_timestamp_col}) AS max "
            f"FROM ({entity_df})",
        )
        res = aws_utils.get_redshift_statement_result(redshift_client, statement_id)[
            "Records"
        ][0]
        entity_df_event_timestamp_range = (
            parser.parse(res[0]["stringValue"]),
            parser.parse(res[1]["stringValue"]),
        )
    else:
        raise InvalidEntityType(type(entity_df))

    return entity_df_event_timestamp_range


# This query is based on sdk/python/feast/infra/offline_stores/bigquery.py:MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN
# There are a couple of changes from BigQuery:
# 1. Use VARCHAR instead of STRING type
# 2. Use "t - x * interval '1' second" instead of "Timestamp_sub(...)"
# 3. Replace `SELECT * EXCEPT (...)` with `SELECT *`, because `EXCEPT` is not supported by Redshift.
#    Instead, we drop the column later after creating the table out of the query.
# We need to keep this query in sync with BigQuery.
MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """
/*
 Compute a deterministic hash for the `left_table_query_string` that will be used throughout
 all the logic as the field to GROUP BY the data
*/
WITH entity_dataframe AS (
    SELECT *,
        {{entity_df_event_timestamp_col}} AS entity_timestamp
        {% for featureview in featureviews %}
            {% if featureview.entities %}
            ,(
                {% for entity in featureview.entities %}
                    CAST({{entity}} as VARCHAR) ||
                {% endfor %}
                CAST({{entity_df_event_timestamp_col}} AS VARCHAR)
            ) AS {{featureview.name}}__entity_row_unique_id
            {% else %}
            ,CAST({{entity_df_event_timestamp_col}} AS VARCHAR) AS {{featureview.name}}__entity_row_unique_id
            {% endif %}
        {% endfor %}
    FROM {{ left_table_query_string }}
),

{% for featureview in featureviews %}

{{ featureview.name }}__entity_dataframe AS (
    SELECT
        {{ featureview.entities | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
        entity_timestamp,
        {{featureview.name}}__entity_row_unique_id
    FROM entity_dataframe
    GROUP BY
        {{ featureview.entities | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
        entity_timestamp,
        {{featureview.name}}__entity_row_unique_id
),

/*
 This query template performs the point-in-time correctness join for a single feature set table
 to the provided entity table.

 1. We first join the current feature_view to the entity dataframe that has been passed.
 This JOIN has the following logic:
    - For each row of the entity dataframe, only keep the rows where the `event_timestamp_column`
      is less than the one provided in the entity dataframe
    - If there is a TTL for the current feature_view, also keep the rows where the `event_timestamp_column`
      is higher than the one provided minus the TTL
    - For each row, join on the entity key and retrieve the `entity_row_unique_id` that has been
      computed previously

 The output of this CTE will contain all the necessary information and already filtered out most
 of the data that is not relevant.
*/

{{ featureview.name }}__subquery AS (
    SELECT
        {{ featureview.event_timestamp_column }} as event_timestamp,
        {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
        {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
        {% for feature in featureview.features %}
            {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
        {% endfor %}
    FROM {{ featureview.table_subquery }}
    WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}'
    {% if featureview.ttl == 0 %}{% else %}
    AND {{ featureview.event_timestamp_column }} >= '{{ featureview.min_event_timestamp }}'
    {% endif %}
),

{{ featureview.name }}__base AS (
    SELECT
        subquery.*,
        entity_dataframe.entity_timestamp,
        entity_dataframe.{{featureview.name}}__entity_row_unique_id
    FROM {{ featureview.name }}__subquery AS subquery
    INNER JOIN {{ featureview.name }}__entity_dataframe AS entity_dataframe
    ON TRUE
        AND subquery.event_timestamp <= entity_dataframe.entity_timestamp
        {% if featureview.ttl == 0 %}{% else %}
        AND subquery.event_timestamp >= entity_dataframe.entity_timestamp - {{ featureview.ttl }} * interval '1' second
        {% endif %}
        {% for entity in featureview.entities %}
        AND subquery.{{ entity }} = entity_dataframe.{{ entity }}
        {% endfor %}
),

/*
 2. If the `created_timestamp_column` has been set, we need to
 deduplicate the data first. This is done by calculating the
 `MAX(created_at_timestamp)` for each event_timestamp.
 We then join the data on the next CTE
*/
{% if featureview.created_timestamp_column %}
{{ featureview.name }}__dedup AS (
    SELECT
        {{featureview.name}}__entity_row_unique_id,
        event_timestamp,
        MAX(created_timestamp) as created_timestamp
    FROM {{ featureview.name }}__base
    GROUP BY {{featureview.name}}__entity_row_unique_id, event_timestamp
),
{% endif %}

/*
 3. The data has been filtered during the first CTE "*__base"
 Thus we only need to compute the latest timestamp of each feature.
*/
{{ featureview.name }}__latest AS (
    SELECT
        event_timestamp,
        {% if featureview.created_timestamp_column %}created_timestamp,{% endif %}
        {{featureview.name}}__entity_row_unique_id
    FROM
    (
        SELECT *,
            ROW_NUMBER() OVER(
                PARTITION BY {{featureview.name}}__entity_row_unique_id
                ORDER BY event_timestamp DESC{% if featureview.created_timestamp_column %},created_timestamp DESC{% endif %}
            ) AS row_number
        FROM {{ featureview.name }}__base
        {% if featureview.created_timestamp_column %}
            INNER JOIN {{ featureview.name }}__dedup
            USING ({{featureview.name}}__entity_row_unique_id, event_timestamp, created_timestamp)
        {% endif %}
    )
    WHERE row_number = 1
),

/*
 4. Once we know the latest value of each feature for a given timestamp,
 we can join again the data back to the original "base" dataset
*/
{{ featureview.name }}__cleaned AS (
    SELECT base.*
    FROM {{ featureview.name }}__base as base
    INNER JOIN {{ featureview.name }}__latest
    USING(
        {{featureview.name}}__entity_row_unique_id,
        event_timestamp
        {% if featureview.created_timestamp_column %}
        ,created_timestamp
        {% endif %}
    )
){% if loop.last %}{% else %}, {% endif %}
{% endfor %}

/*
 Joins the outputs of multiple time travel joins to a single table.
 The entity_dataframe dataset being our source of truth here.
 */
SELECT {{ final_output_feature_names | join(', ')}}
FROM entity_dataframe
{% for featureview in featureviews %}
LEFT JOIN (
    SELECT
        {{featureview.name}}__entity_row_unique_id
        {% for feature in featureview.features %}
            ,{% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}
        {% endfor %}
    FROM {{ featureview.name }}__cleaned
) USING ({{featureview.name}}__entity_row_unique_id)
{% endfor %}
"""
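The module above is Feast's Redshift offline store; it is normally not called directly but exercised through the public Feast API once feature_store.yaml selects an offline store of type "redshift" with the fields declared in RedshiftOfflineStoreConfig (cluster_id, region, user, database, s3_staging_location, iam_role). The following is a minimal sketch of that flow, not part of the snippet above; the repo path, join key, and feature reference are assumed placeholders.

import pandas as pd
from feast import FeatureStore

# Assumes a Feast repo in the current directory whose feature_store.yaml
# configures the Redshift offline store (placeholder setup, not shown above).
store = FeatureStore(repo_path=".")

entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],  # placeholder join key column
        "event_timestamp": pd.to_datetime(["2022-01-01", "2022-01-02"], utc=True),
    }
)

# Internally this dispatches to RedshiftOfflineStore.get_historical_features():
# the entity_df is uploaded to a temporary Redshift table, the
# MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN template is rendered, and a
# RedshiftRetrievalJob is returned.
job = store.get_historical_features(
    entity_df=entity_df,
    features=["driver_hourly_stats:conv_rate"],  # placeholder feature reference
)

training_df = job.to_df()  # UNLOADs the query result to S3 and reads it back
print(training_df.head())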
manage_dwh.py
Source:manage_dwh.py
import argparse
import configparser
import json
import time

import boto3


def create_clients(access_key_id, secret_access_key, region_name):
    """ Creates a boto3 resource for EC2 and clients for IAM and Redshift.

    Args:
        access_key_id (str): AWS access key id
        secret_access_key (str): AWS secret access key
        region_name (str): AWS region name

    Returns:
        ec2_resource: boto3 resource for EC2
        iam_client: boto3 client for IAM
        redshift_client: boto3 client for Redshift
    """
    ec2_resource = boto3.resource('ec2', region_name=region_name,
                                  aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key)
    iam_client = boto3.client('iam', region_name=region_name,
                              aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key)
    redshift_client = boto3.client('redshift', region_name=region_name,
                                   aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key)
    return ec2_resource, iam_client, redshift_client


def check_iam_role_exists(iam_client, iam_role_name):
    """ Checks whether the given IAM role exists.

    Args:
        iam_client (boto3 IAM client)
        iam_role_name (str): IAM role name

    Returns: True or False
    """
    try:
        iam_client.get_role(RoleName=iam_role_name)
        return True
    except iam_client.exceptions.NoSuchEntityException:
        return False


def delete_iam_role(iam_client, iam_role_name):
    """ Deletes the given IAM role if it exists.

    Args:
        iam_client (boto3 IAM client)
        iam_role_name (str): IAM role name
    """
    # Detach the policies attached to the IAM role
    response = iam_client.list_attached_role_policies(RoleName=iam_role_name)
    attached_policies = response['AttachedPolicies']
    for attached_policy in attached_policies:
        iam_client.detach_role_policy(RoleName=iam_role_name,
                                      PolicyArn=attached_policy['PolicyArn'])
    # Delete the IAM role
    iam_client.delete_role(RoleName=iam_role_name)
    print('IAM role \'{}\' is deleted along with its attached policies'.format(iam_role_name))


def create_iam_role_with_policy(iam_client, iam_role_name, policy_arn):
    """ Creates a new IAM role and attaches the given policy.

    Args:
        iam_client (boto3 IAM client)
        iam_role_name (str): IAM role name
        policy_arn (str): IAM policy ARN
    """
    # If the given role already exists, delete it
    if check_iam_role_exists(iam_client, iam_role_name):
        print('IAM role \'{}\' already exists, will delete it'.format(iam_role_name))
        delete_iam_role(iam_client, iam_role_name)
    # Create a new IAM role
    new_iam_role = iam_client.create_role(
        Path='/',
        RoleName=iam_role_name,
        AssumeRolePolicyDocument=json.dumps(
            {"Version": "2012-10-17",
             "Statement": [{"Effect": "Allow",
                            "Principal": {"Service": ["redshift.amazonaws.com"]},
                            "Action": ["sts:AssumeRole"]}]
             }
        ),
        Description='Allows Redshift clusters to call AWS services on your behalf',
    )
    print('New IAM role \'{}\' is successfully created'.format(iam_role_name))
    # Attach the given policy to the new role
    iam_client.attach_role_policy(RoleName=iam_role_name, PolicyArn=policy_arn)
    print('The role policy \'{}\' is attached to IAM role \'{}\''.format(policy_arn, iam_role_name))


def launch_redshift_cluster(redshift_client, cluster_identifier,
                            cluster_type, node_type, num_nodes,
                            db_name, db_user, db_password, iam_role_arn):
    """ Launches a new Redshift cluster with the provided cluster properties.

    Args:
        redshift_client (boto3 Redshift client)
        cluster_identifier (str): Cluster identifier
        cluster_type (str): type of cluster (multi-node / single-node)
        node_type (str): node type (e.g. dc2.large)
        num_nodes (int): number of nodes
        db_name (str): database name
        db_user (str): master username
        db_password (str): master password
        iam_role_arn (str): IAM role ARN for the Redshift cluster
    """
    if cluster_type == 'single-node':
        response = redshift_client.create_cluster(
            # add parameters for hardware
            ClusterType=cluster_type,
            NodeType=node_type,
            # add parameters for identifiers & credentials
            DBName=db_name,
            ClusterIdentifier=cluster_identifier,
            MasterUsername=db_user,
            MasterUserPassword=db_password,
            # add parameter for IAM role
            IamRoles=[iam_role_arn])
    else:
        response = redshift_client.create_cluster(
            # add parameters for hardware
            ClusterType=cluster_type,
            NodeType=node_type,
            NumberOfNodes=num_nodes,
            # add parameters for identifiers & credentials
            DBName=db_name,
            ClusterIdentifier=cluster_identifier,
            MasterUsername=db_user,
            MasterUserPassword=db_password,
            # add parameter for IAM role
            IamRoles=[iam_role_arn])
    print('The request for launching the Redshift cluster is successfully submitted')


def wait_for_cluster_status_available(redshift_client, cluster_identifier, timeout=10.0):
    """ Checks and waits for the Redshift cluster to become available, until timeout.

    Args:
        redshift_client (boto3 Redshift client)
        cluster_identifier (str): Cluster identifier
        timeout (float): timeout value in minutes
    """
    sleep_period = 2
    timeout_seconds = timeout * 60
    time_start = time.time()
    time_pass = time.time() - time_start
    while time_pass < timeout_seconds:
        try:
            cluster_props = redshift_client.describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters'][0]
            if cluster_props['ClusterStatus'] == 'available':
                print('Time passed: {:05.1f} seconds, Redshift cluster is now available for use'.format(time_pass))
                return cluster_props
            else:
                print('Time passed: {:05.1f} seconds, Redshift cluster status: \'{}\''.format(time_pass, cluster_props['ClusterStatus']))
        except redshift_client.exceptions.ClusterNotFoundFault:
            print('Time passed: {:05.1f} seconds, Redshift cluster is not found yet ...'.format(time_pass))
        time.sleep(sleep_period)
        time_pass = time.time() - time_start
    raise Exception('WARNING: Redshift cluster did not become available, please delete it on the AWS web console and try again.')


def create_cluster(config, ec2_resource, iam_client, redshift_client):
    """ Creates the IAM role, launches the Redshift cluster and opens an incoming TCP access port.

    Args:
        config: configuration for AWS/S3/Redshift cluster/IAM role
        ec2_resource (boto3 EC2 resource)
        iam_client (boto3 IAM client)
        redshift_client (boto3 Redshift client)
    """
    # Get the IAM role name and its policy
    iam_role_name = config.get('IAM_ROLE', 'role_name')
    policy_arn = config.get('IAM_ROLE', 'policy_arn')
    # Create an IAM role for Redshift to provide S3 read-only access
    try:
        create_iam_role_with_policy(iam_client, iam_role_name, policy_arn)
    except Exception as e:
        print('IAM role (RoleName={}) could not be created\n{}'.format(iam_role_name, e))
        return
    # Update the IAM role ARN in the config file
    iam_role_arn = iam_client.get_role(RoleName=iam_role_name)['Role']['Arn']
    config.set('IAM_ROLE', 'arn', "'{}'".format(iam_role_arn))
    # Get cluster/database properties
    cluster_type = config.get('CLUSTER_PROP', 'cp_cluster_type')
    node_type = config.get('CLUSTER_PROP', 'cp_node_type')
    num_nodes = config.get('CLUSTER_PROP', 'cp_num_nodes')
    cluster_identifier = config.get('CLUSTER_PROP', 'cp_cluster_identifier')
    db_name = config.get('CLUSTER', 'db_name')
    db_user = config.get('CLUSTER', 'db_user')
    db_password = config.get('CLUSTER', 'db_password')
    # Launch the Redshift cluster
    try:
        launch_redshift_cluster(redshift_client, cluster_identifier,
                                cluster_type, node_type, int(num_nodes),
                                db_name, db_user, db_password, iam_role_arn)
    except Exception as e:
        print(e)
        return
    # Wait for the Redshift cluster to become available
    try:
        cluster_props = wait_for_cluster_status_available(redshift_client, cluster_identifier)
    except Exception as e:
        print(e)
        return
    # Update the cluster host in the config file
    db_host = cluster_props['Endpoint']['Address']
    config.set('CLUSTER', 'host', db_host)
    print('The cluster endpoint address: {}'.format(db_host))
    # Save the updated config file for later use
    with open(config_filename, 'w') as configfile:
        config.write(configfile)
    # Open an incoming TCP port to access the cluster endpoint
    print('Creating an incoming TCP port to access the cluster endpoint...')
    db_port = config.get('CLUSTER', 'db_port')
    try:
        vpc = ec2_resource.Vpc(id=cluster_props['VpcId'])
        defaultSg = list(vpc.security_groups.all())[0]
        print(defaultSg)
        defaultSg.authorize_ingress(
            GroupName=defaultSg.group_name,
            CidrIp='0.0.0.0/0',
            IpProtocol='TCP',
            FromPort=int(db_port),
            ToPort=int(db_port))
    except Exception as e:
        if 'InvalidPermission.Duplicate' in str(e):
            print('TCP port access rule already exists for the default security group')
        else:
            print(e)
            return
    print('Redshift cluster setup is now completed successfully and ready for use')


def delete_cluster(config, redshift_client):
    """ Deletes the Redshift cluster with the given identifier.

    Args:
        config: configuration for AWS/S3/Redshift cluster/IAM role
        redshift_client (boto3 Redshift client)
    """
    # Get the cluster identifier
    cluster_identifier = config.get('CLUSTER_PROP', 'cp_cluster_identifier')
    # Delete the Redshift cluster if it exists
    try:
        cluster_props = redshift_client.describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters'][0]
        print('Redshift cluster with identifier \'{}\' is found with status \'{}\', will delete it'.format(cluster_identifier, cluster_props['ClusterStatus']))
        # Submit a request to delete the cluster
        redshift_client.delete_cluster(ClusterIdentifier=cluster_identifier, SkipFinalClusterSnapshot=True)
        print('The request for Redshift cluster deletion is successfully submitted')
    except redshift_client.exceptions.ClusterNotFoundFault:
        print('Redshift cluster with identifier \'{}\' is not found, cannot delete it'.format(cluster_identifier))
        return
    except Exception as e:
        print(e)
        return
    # Check the cluster status until it is deleted
    deleted = False
    sleep_period = 2
    time_start = time.time()
    time_pass = time.time() - time_start
    while deleted == False:
        try:
            cluster_props = redshift_client.describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters'][0]
            print('Time passed: {:05.1f} seconds, status of the Redshift cluster: \'{}\''.format(time_pass, cluster_props['ClusterStatus']))
        except redshift_client.exceptions.ClusterNotFoundFault:
            print('Time passed: {:05.1f} seconds, the Redshift cluster is now successfully deleted'.format(time_pass))
            deleted = True
        except Exception as e:
            print(e)
            return
        time.sleep(sleep_period)
        time_pass = time.time() - time_start


def describe_cluster(config, redshift_client):
    """ Checks and reports the cluster status and its endpoint address.

    Args:
        config: configuration for AWS/S3/Redshift cluster/IAM role
        redshift_client (boto3 Redshift client)
    """
    # Get the Redshift cluster identifier
    cluster_identifier = config.get('CLUSTER_PROP', 'cp_cluster_identifier')
    try:
        cluster_props = redshift_client.describe_clusters(ClusterIdentifier=cluster_identifier)['Clusters'][0]
        print('Redshift cluster with identifier \'{}\' is found with status \'{}\''.format(cluster_identifier, cluster_props['ClusterStatus']))
        cluster_endpoint = cluster_props.get('Endpoint')
        if cluster_endpoint is not None:
            db_host = cluster_endpoint.get('Address')
            if db_host is not None:
                print('The cluster endpoint address: {}'.format(db_host))
    except redshift_client.exceptions.ClusterNotFoundFault:
        print('Redshift cluster with identifier \'{}\' is not found, cannot describe it'.format(cluster_identifier))
    except Exception as e:
        print(e)


def main(action):
    """ Performs the selected action using the boto3 library.

    Args:
        action (str): options are 'create', 'delete', 'describe'
    """
    # Parse the configuration file
    config = configparser.ConfigParser()
    config.read(config_filename)
    # Get AWS access credentials
    access_key_id = config.get('AWS', 'KEY')
    secret_access_key = config.get('AWS', 'SECRET')
    region_name = config.get('AWS', 'REGION')
    # Create the boto3 clients required for cluster setup
    try:
        ec2_resource, iam_client, redshift_client = create_clients(access_key_id, secret_access_key, region_name)
    except Exception as e:
        print('AWS boto3 clients could not be initialized!\n{}'.format(e))
        return
    if action == 'create':
        create_cluster(config, ec2_resource, iam_client, redshift_client)
    elif action == 'delete':
        delete_cluster(config, redshift_client)
    elif action == 'describe':
        describe_cluster(config, redshift_client)


if __name__ == "__main__":
    # Parse arguments
    parser = argparse.ArgumentParser(description="A python script to manage AWS Redshift Clusters through SDKs", add_help=True)
    parser.add_argument("action", type=str, choices=['create', 'delete', 'describe'],
                        help="""create: creates the Redshift cluster,
                        delete: deletes the Redshift cluster,
                        describe: reports status of the Redshift cluster
                        """)
    args = parser.parse_args()
    # Configuration filename
    config_filename = 'dwh.cfg'
    # Call main function
    main(args.action)
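manage_dwh.py reads its settings from a dwh.cfg file in the working directory. Below is a hedged sketch, inferred from the config.get(...) calls above, of a helper that writes a compatible dwh.cfg; every value is a placeholder to replace with your own. The script is then driven with `python manage_dwh.py create`, `python manage_dwh.py describe`, or `python manage_dwh.py delete`.

# A sketch of the dwh.cfg layout the script expects; all values are placeholders.
import configparser

cfg = configparser.ConfigParser()
cfg["AWS"] = {
    "KEY": "<aws-access-key-id>",
    "SECRET": "<aws-secret-access-key>",
    "REGION": "us-west-2",
}
cfg["IAM_ROLE"] = {
    "role_name": "dwhRole",
    "policy_arn": "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess",
    "arn": "",  # filled in by create_cluster()
}
cfg["CLUSTER_PROP"] = {
    "cp_cluster_type": "multi-node",
    "cp_node_type": "dc2.large",
    "cp_num_nodes": "4",
    "cp_cluster_identifier": "dwhCluster",
}
cfg["CLUSTER"] = {
    "db_name": "dwh",
    "db_user": "dwhuser",
    "db_password": "<master-password>",
    "db_port": "5439",
    "host": "",  # filled in by create_cluster() once the cluster is available
}
with open("dwh.cfg", "w") as f:
    cfg.write(f)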
redshift_database_creation.py
Source:redshift_database_creation.py
import argparse
import configparser
import os
import pprint
import time

import psycopg2
import boto3
from botocore.exceptions import ClientError

config = configparser.ConfigParser()
config.read('dl.cfg')
cluster_config = config['CLUSTER']

# The section header in the dl.cfg file must match the name used here: [AWS]
os.environ['AWS_ACCESS_KEY_ID'] = config["AWS"]['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config["AWS"]['AWS_SECRET_ACCESS_KEY']


def create_redshift_security_group():
    """
    Creates the Redshift security group and returns the security group id.
    """
    ec2 = boto3.client('ec2')
    # Each region has a unique VPC.
    response = ec2.describe_vpcs()
    vpc_id = response.get('Vpcs', [{}])[0].get('VpcId', '')
    if not vpc_id:
        raise RuntimeError("You must create a VPC first!")
    port = int(cluster_config['DB_PORT'])
    group_name = config['VPC']['SECURITY_GROUP_NAME']
    try:
        response = ec2.create_security_group(
            GroupName=group_name,
            Description='redshift security group',
            VpcId=vpc_id)
        security_group_id = response['GroupId']
        print(f"Security Group {security_group_id} Created in vpc {vpc_id}.")
        data = ec2.authorize_security_group_ingress(
            GroupId=security_group_id,
            IpPermissions=[
                {'IpProtocol': 'tcp',
                 'FromPort': port,
                 'ToPort': port,
                 'IpRanges': [{'CidrIp': '0.0.0.0/0'}]}
            ])
        print(f"Ingress Successfully Set {data}")
        return security_group_id
    except ClientError as e:
        if e.response['Error']['Code'] == 'InvalidGroup.Duplicate':
            response = ec2.describe_security_groups(
                Filters=[
                    dict(Name='group-name', Values=[group_name])
                ]
            )
            return response['SecurityGroups'][0]['GroupId']
        raise e


def get_iam_role_arns():
    """
    Gets the Amazon Resource Name (ARN) of the IAM role created beforehand.
    """
    iam_client = boto3.client('iam')
    # The IAM role was created by hand.
    iam_role_name = config['IAM_ROLE']['ROLE_NAME']
    role_arns = [
        iam_client.get_role(RoleName=iam_role_name)['Role']['Arn']
    ]
    return role_arns


def create_redshift_cluster():
    """
    Creates the Amazon Redshift cluster.
    """
    identifier = cluster_config['IDENTIFIER']
    security_group_id = create_redshift_security_group()
    iam_role_arns = get_iam_role_arns()
    redshift_client = boto3.client('redshift')
    try:
        response = redshift_client.create_cluster(
            ClusterType="multi-node",
            NodeType=cluster_config['NODE_TYPE'],
            NumberOfNodes=int(cluster_config['NODE_COUNT']),
            DBName=cluster_config['DB_NAME'],
            ClusterIdentifier=identifier,
            MasterUsername=cluster_config['DB_USER'],
            MasterUserPassword=cluster_config['DB_PASSWORD'],
            Port=int(cluster_config['DB_PORT']),
            IamRoles=iam_role_arns,
            VpcSecurityGroupIds=[security_group_id],
        )
    except ClientError as e:
        response = None
        if e.response['Error']['Code'] != 'ClusterAlreadyExists':
            raise e
    # Wait until the status of the cluster becomes 'available'.
    print("Creating Redshift cluster ...")
    while True:
        info = redshift_client.describe_clusters(ClusterIdentifier=identifier)[
            'Clusters'][0]
        if info['ClusterStatus'] == 'available':
            pprint.PrettyPrinter().pprint(info)
            break
        else:
            time.sleep(10)
    return response


def get_redshift_cluster_endpoint():
    """
    Returns the Redshift cluster endpoint.
    """
    redshift_client = boto3.client('redshift')
    endpoint = redshift_client.describe_clusters(
        ClusterIdentifier=cluster_config['IDENTIFIER'])[
        'Clusters'][0]['Endpoint']
    return endpoint['Address'], endpoint['Port']


def delete_redshift_cluster():
    """
    Deletes the created cluster.
    """
    identifier = cluster_config['IDENTIFIER']
    redshift_client = boto3.client('redshift')
    print(f"Deleting Redshift cluster {identifier} ...")
    redshift_client.delete_cluster(ClusterIdentifier=identifier,
                                   SkipFinalClusterSnapshot=True)


def create_tables():
    """
    Creates the database tables based on the schema.
    """
    from sql_queries import create_table_queries
    address, port = get_redshift_cluster_endpoint()
    # Connect to the database on the cluster
    print(f"Connecting to Redshift cluster at {address}:{port} ...")
    conn = psycopg2.connect(
        f"host={address} "
        f"dbname={cluster_config['DB_NAME']} "
        f"user={cluster_config['DB_USER']} "
        f"password={cluster_config['DB_PASSWORD']} "
        f"port={port}")
    cur = conn.cursor()
    for query in create_table_queries:
        cur.execute(query)
    conn.commit()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--delete", action="store_true")
    args = parser.parse_args()
    if args.delete:
        delete_redshift_cluster()
    else:
        create_redshift_cluster()
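redshift_database_creation.py expects a dl.cfg with [AWS], [CLUSTER], [VPC], and [IAM_ROLE] sections containing the keys read above, and create_tables() imports create_table_queries from a local sql_queries module that is not shown. A minimal, assumed stub of that module is sketched below so the import resolves; the table definition is illustrative only, since the real project defines its own schema. Running `python redshift_database_creation.py` creates the cluster and security group, and `python redshift_database_creation.py --delete` tears the cluster down.

# sql_queries.py -- assumed minimal stub; replace with the project's real DDL.
create_table_queries = [
    """
    CREATE TABLE IF NOT EXISTS staging_events (
        event_id   BIGINT IDENTITY(0, 1),
        user_id    INTEGER,
        song_title VARCHAR(256),
        ts         TIMESTAMP
    );
    """,
]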