Best Python code snippet using localstack_python
manage.py
Source:manage.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Review and resubmit failed Kinesis Firehose delivery records.

Firehose backs up records it could not deliver into an S3 bucket; the
commands here list those failure reports, show their decoded contents,
and resubmit them to the stream's Elasticsearch destination.
"""
import base64
import json
import os
import re
import sys

import boto3
from aws_requests_auth.boto_utils import BotoAWSRequestsAuth
from elasticsearch import Elasticsearch, RequestsHttpConnection, TransportError
from manager import Manager

manager = Manager()
boto_session = boto3.Session()
# Cache of Elasticsearch clients, keyed by ES domain ARN.
es = {}


def get_es_conn(es_domain_arn):
    """Return a cached, SigV4-authenticated Elasticsearch client for *es_domain_arn*."""
    if es_domain_arn not in es:
        # Extract the bare domain name from the ARN.
        es_domain = re.sub(
            r'^arn:aws:es:[a-z]*-[a-z]*-[0-9]*:[0-9]*:domain/(.*)$',
            r'\1',
            es_domain_arn)
        aws_es = boto_session.client('es')
        es_config = aws_es.describe_elasticsearch_domain(DomainName=es_domain)
        awsauth = BotoAWSRequestsAuth(
            aws_host=es_config['DomainStatus']['Endpoint'],
            aws_region=boto_session.region_name,
            aws_service='es'
        )
        es[es_domain_arn] = Elasticsearch(
            hosts=[{'host': es_config['DomainStatus']['Endpoint'], 'port': 443}],
            http_auth=awsauth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
            retry_on_timeout=True
        )
    return es[es_domain_arn]


def get_firehose_config(stream=None):
    """Return the DeliveryStreamDescription for *stream*.

    Falls back to the STREAM_NAME environment variable when *stream* is
    None. Exits the process on any failure.
    """
    if stream is None:
        # BUG FIX: the original only *checked* STREAM_NAME and then passed
        # the unchanged None on to the API call; actually use the fallback.
        stream = os.environ.get('STREAM_NAME')
        if not stream:
            print('No stream name passed and STREAM_NAME not set in environment')
            sys.exit(1)
    firehose = boto_session.client('firehose')
    try:
        exists_resp = firehose.describe_delivery_stream(DeliveryStreamName=stream)
        if 'DeliveryStreamDescription' in exists_resp:
            return exists_resp['DeliveryStreamDescription']
    except firehose.exceptions.ResourceNotFoundException as e:
        # BUG FIX: exceptions have no `.message` attribute on Python 3.
        print(str(e))
    except Exception as e:
        # BUG FIX: the original referenced an undefined name `e` here.
        print(str(e))
    print('Something went wrong in retrieving the Firehose Stream configuration')
    sys.exit(1)


def get_s3info_from_config(config):
    """Extract S3 backup info from one destination description.

    Returns a dict with 'ARN', 'prefix' and 'bucket' keys, or an empty
    dict when the destination has no S3 backup enabled.
    """
    info = {}
    if 'S3BackupMode' not in config:
        return info
    # S3 and Redshift destinations use this key.
    # BUG FIX: the Firehose API reports 'Enabled' (capitalised); the
    # original compared against 'enabled', which never matched.
    if config['S3BackupMode'] == 'Enabled':
        info.update({
            'ARN': config['S3BackupDescription']['BucketARN'],
            'prefix': config['S3BackupDescription']['Prefix'],
            'bucket': config['S3BackupDescription']['BucketARN'].replace('arn:aws:s3:::', '')
        })
    # ES destinations use this key.
    elif config['S3BackupMode'] == 'FailedDocumentsOnly':
        info.update({
            'ARN': config['S3DestinationDescription']['BucketARN'],
            'prefix': config['S3DestinationDescription']['Prefix'],
            'bucket': config['S3DestinationDescription']['BucketARN'].replace('arn:aws:s3:::', '')
        })
    return info


@manager.command
def review(stream, pattern=None, year=None, month=None, day=None):
    """
    Review the list of failures from a Kinesis Firehose Stream
    """
    stream_config = get_firehose_config(stream)
    buckets = _extract_s3_info(stream_config)
    if len(buckets) < 1:
        print('Nothing to review, no stream destinations have S3 backups enabled.')
        sys.exit(1)
    elif len(buckets) > 1:
        print('WARNING: Review is possible, but cannot resubmit to firehose for delivery since there are multiple destinations defined on the stream.')
        print('Some destinations may have already accepted the records.')
    for destination in buckets:
        print('Keys in {}'.format(destination['bucket']))
        prefix = _build_prefix(destination, year=year, month=month, day=day)
        print('Prefix: {}'.format(prefix))
        bucket = boto_session.resource('s3').Bucket(destination['bucket'])
        bucketlist = bucket.objects.filter(Prefix=prefix)
        for subobj in sorted(bucketlist, key=lambda k: k.last_modified):
            display_key = subobj.key.replace(prefix, '')
            if pattern:
                # BUG FIX: str has no .contains() method; use the `in` operator.
                if pattern in subobj.key:
                    print('    {}'.format(display_key))
            else:
                print('    {}'.format(display_key))


def _build_prefix(s3_destination, year=None, month=None, day=None):
    """Build the S3 key prefix for failure reports.

    Optionally narrowed to year/month/day; month and day are zero-padded
    to match the key layout Firehose writes.
    """
    prefix = s3_destination['prefix']
    if year:
        if prefix.endswith('/'):
            prefix = prefix[:-1]
        prefix = '{}/elasticsearch-failed/{}'.format(prefix, year)
        if month:
            if int(month) < 10:
                month = '0{}'.format(int(month))
            prefix = '{}/{}'.format(prefix, month)
            if day:
                if int(day) < 10:
                    day = '0{}'.format(int(day))
                prefix = '{}/{}'.format(prefix, day)
        prefix = '{}/'.format(prefix)
    return prefix


def _extract_s3_info(stream_config, es_only=False):
    """Collect S3 backup info for each destination on the stream.

    When *es_only* is true, only Elasticsearch destinations are returned.
    BUG FIX: the original accepted but ignored this flag, so callers such
    as show()/resubmit_to_es() that take element [0] expecting an ES
    backup bucket could silently get an S3/Redshift one instead.
    """
    buckets = []
    for destination in stream_config['Destinations']:
        if not es_only:
            if 'ExtendedS3DestinationDescription' in destination:
                info = get_s3info_from_config(destination['ExtendedS3DestinationDescription'])
                if info:
                    buckets.append(info)
            if 'RedshiftDestinationDescription' in destination:
                info = get_s3info_from_config(destination['RedshiftDestinationDescription'])
                if info:
                    buckets.append(info)
        if 'ElasticsearchDestinationDescription' in destination:
            info = get_s3info_from_config(destination['ElasticsearchDestinationDescription'])
            if info:
                buckets.append(info)
    return buckets


@manager.command
def show(stream, key, year=None, month=None, day=None):
    """
    Show the contents of a single failure report. Needs S3 key as provided by the review command
    """
    stream_config = get_firehose_config(stream)
    try:
        s3_info = _extract_s3_info(stream_config, es_only=True)[0]
    except IndexError:
        print('Cannot resubmit to ES, no ES destination defined in stream config.')
        sys.exit(1)
    prefix = _build_prefix(s3_info, year=year, month=month, day=day)
    print(json.dumps(get_failure_report(s3_info['bucket'], '{}{}'.format(prefix, key)), indent=4))


def get_failure_report(bucket, key):
    """
    Retrieves and decodes a failure report for display or resubmission
    """
    obj = boto_session.resource('s3').Object(bucket, key).get()
    # BUG FIX: .read() returns bytes on Python 3; decode before splitting
    # on a str separator.
    body = obj['Body'].read().decode('utf-8')
    lines = []
    for line in body.split('\n'):
        if not line:
            continue
        try:
            newline = json.loads(line.replace('\r', ''))
            # rawData is a base64-encoded JSON document.
            newline['rawData'] = json.loads(base64.b64decode(newline['rawData']))
            lines.append(newline)
        except (ValueError, TypeError):
            # Surface the offending line before propagating the error.
            print(line)
            raise
    return lines


@manager.command
def resubmit_to_es(stream, year=None, month=None, day=None):
    """
    Resubmit a day of failed records to ElasticSearch
    """
    stream_config = get_firehose_config(stream)
    try:
        s3_info = _extract_s3_info(stream_config, es_only=True)[0]
    except IndexError:
        print('Cannot resubmit to ES, no ES destination defined in stream config.')
        sys.exit(1)
    prefix = _build_prefix(s3_info, year=year, month=month, day=day)
    print('Prefix: {}'.format(prefix))
    bucket = boto_session.resource('s3').Bucket(s3_info['bucket'])
    bucketlist = bucket.objects.filter(Prefix=prefix)
    # Get list in reverse, because typically when processing a full month, we
    # want the most recent entries populated fastest
    for subobj in sorted(bucketlist, key=lambda k: k.last_modified, reverse=True):
        print('Attempting to resubmit: {}'.format(subobj.key))
        _resubmit_to_es(s3_info['bucket'], subobj.key, stream_config)


def _resubmit_to_es(bucket, key, stream_config):
    """
    Resubmit a failed record to ElasticSearch
    """
    es_destination_arn = None
    for destination in stream_config['Destinations']:
        if 'ElasticsearchDestinationDescription' in destination:
            es_destination_arn = destination['ElasticsearchDestinationDescription']['DomainARN']
            break
    if not es_destination_arn:
        # BUG FIX: `stream` was undefined in this scope; report the stream
        # name carried in the config instead.
        print('There is no elasticsearch destination configured for stream {}'.format(
            stream_config.get('DeliveryStreamName')))
        sys.exit(1)
    report = get_failure_report(bucket, key)
    es_conn = get_es_conn(es_destination_arn)
    all_resubmitted = True
    for failures in report:
        try:
            if 'esIndexName' not in failures:
                continue
            # Stringify nested structures that would otherwise collide with
            # existing index mappings, moving them to raw_* fields.
            exception_message = failures['rawData'].get('response', {}).get('exception', {}).get('message', {})
            if exception_message:
                if not isinstance(exception_message, str):
                    failures['rawData']['response']['exception']['raw_message'] = json.dumps(exception_message)
                else:
                    failures['rawData']['response']['exception']['raw_message'] = exception_message
                failures['rawData']['response']['exception'].pop('message')
            request_data = failures['rawData'].get('request', {}).get('data', {})
            if request_data:
                if not isinstance(request_data, str):
                    failures['rawData']['request']['raw_data'] = json.dumps(request_data)
                else:
                    failures['rawData']['request']['raw_data'] = request_data
                failures['rawData']['request'].pop('data')
            print('Resubmitting document id: {}'.format(failures['esDocumentId']))
            es_conn.create(
                index=failures['esIndexName'],
                doc_type=failures['esTypeName'],
                body=failures['rawData'],
                id=failures['esDocumentId']
            )
        except TransportError as e:
            # A 409 version conflict means the document already exists,
            # which counts as a successful resubmission.
            if e.status_code == 409 and e.error == 'version_conflict_engine_exception':
                pass
            else:
                all_resubmitted = False
                print('A transport error occurred resubmitting the data for {}: {}'.format(failures['esDocumentId'], e))
        except Exception as e:
            print('An error occurred resubmitting the data for {}: {}'.format(failures['esDocumentId'], e))
            all_resubmitted = False
    # Only delete the backup object once every record made it in.
    if all_resubmitted:
        boto_session.resource('s3').Object(bucket, key).delete()
        print('Removed {}'.format(key))
    else:
        print('Did not remove {}'.format(key))


if __name__ == '__main__':
    # NOTE(review): the scraped listing is truncated here; manager.main()
    # is the documented entry point for the `manager` package -- confirm.
    manager.main()
kinesis_firehose_stream.py
Source:kinesis_firehose_stream.py
from typing import List, Optional
from cloudrail.knowledge.context.aws.resources.networking_config.network_configuration import NetworkConfiguration
from cloudrail.knowledge.context.aws.resources.networking_config.network_entity import NetworkEntity
from cloudrail.knowledge.context.aws.resources.service_name import AwsServiceName
from cloudrail.knowledge.utils.tags_utils import filter_tags


class KinesisFirehoseStream(NetworkEntity):
    """
    Attributes:
        stream_name: The name of the Kinesis Firehose Stream.
        stream_arn: The ARN of the Kinesis Firehose Stream.
        encrypted_at_rest: True if the stream is set to be encrypted.
        es_domain_arn: The ARN of the related ElasticSearch Domain, if any.
        es_vpc_config: The VPC configuration of the ElasticSearch Domain related
            to this stream, if any.
    """

    def __init__(self,
                 stream_name: str,
                 stream_arn: str,
                 encrypted_at_rest: bool,
                 account: str,
                 region: str,
                 es_domain_arn: Optional[str],
                 es_vpc_config: Optional[NetworkConfiguration]):
        super().__init__(stream_name, account, region, AwsServiceName.AWS_KINESIS_FIREHOSE_DELIVERY_STREAM)
        self.stream_name: str = stream_name
        self.stream_arn: str = stream_arn
        self.encrypted_at_rest: bool = encrypted_at_rest
        self.es_domain_arn: Optional[str] = es_domain_arn
        self.es_vpc_config: Optional[NetworkConfiguration] = es_vpc_config

    def get_keys(self) -> List[str]:
        # The stream ARN uniquely identifies this resource.
        return [self.stream_arn]

    def get_name(self) -> str:
        return self.stream_name

    def get_arn(self) -> str:
        return self.stream_arn

    def get_type(self, is_plural: bool = False) -> str:
        if not is_plural:
            return 'Kinesis Data Firehose'
        else:
            return 'Kinesis Data Firehoses'

    def get_all_network_configurations(self) -> Optional[List[NetworkConfiguration]]:
        # Only ES destinations carry a VPC configuration; None otherwise.
        if self.es_vpc_config:
            return [NetworkConfiguration(self.es_vpc_config.assign_public_ip,
                                         self.es_vpc_config.security_groups_ids,
                                         self.es_vpc_config.subnet_list_ids)]
        else:
            return None

    def get_cloud_resource_url(self) -> str:
        return '{0}firehose/home?region={1}#/details/{2}' \
            .format(self.AWS_CONSOLE_URL, self.region, self.stream_name)

    @property
    def is_tagable(self) -> bool:
        return True

    def to_drift_detection_object(self) -> dict:
        # `self.es_vpc_config and ...` yields None when there is no VPC config.
        return {'tags': filter_tags(self.tags), 'stream_name': self.stream_name,
                'encrypted_at_rest': self.encrypted_at_rest,
                'es_domain_arn': self.es_domain_arn,
                'assign_public_ip': self.es_vpc_config and self.es_vpc_config.assign_public_ip,
                'security_groups_ids': self.es_vpc_config and self.es_vpc_config.security_groups_ids,
                # NOTE(review): the scraped listing was truncated here; this
                # final entry follows the VPC-config pattern used above and in
                # get_all_network_configurations -- confirm against upstream.
                'subnet_list_ids': self.es_vpc_config and self.es_vpc_config.subnet_list_ids}
app.py
Source:app.py
1#!/usr/bin/env python32from aws_cdk import core3from aws_es_recommended_cw_alarms.aws_es_recommended_cw_alarms_stack import (4 AwsEsRecommendedCwAlarmsStack,5)6import os7import sys8app = core.App()9CFN_STACK_NAME, ES_DOMAIN_ARN, AWS_PROFILE, CW_TRIGGER_SNS_ARN_LIST, ENABLE_ES_API_OUTPUT, ES_API_OUTPUT_SNS_ARN = (10 sys.argv[1],11 sys.argv[2],12 sys.argv[3],13 None if sys.argv[4] == 'None' else sys.argv[4].split(","),14 sys.argv[5] == "True",15 None if sys.argv[6] == 'None' else sys.argv[6],16)17AwsEsRecommendedCwAlarmsStack(18 app,19 CFN_STACK_NAME,20 ES_DOMAIN_ARN,21 AWS_PROFILE,22 CW_TRIGGER_SNS_ARN_LIST,23 ENABLE_ES_API_OUTPUT,24 ES_API_OUTPUT_SNS_ARN,25 env={"account": ES_DOMAIN_ARN.split(":")[4], "region": ES_DOMAIN_ARN.split(":")[3]},26)...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub — right from setting up the prerequisites to running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!