Best Python code snippet using localstack_python
firehose_to_s3.py
Source: firehose_to_s3.py
def wait_for_active_firehose(firehose_name, delay=2.0):
    """Wait until the Firehose delivery stream is active.

    Polls ``describe_delivery_stream`` in a loop until the stream reports
    ACTIVE, and bails out early if the stream is being deleted or the
    describe call itself fails.

    :param firehose_name: Name of Firehose delivery stream
    :param delay: Seconds to sleep between status polls (default 2.0)
    :return: True if delivery stream is active. Otherwise, False.
    """
    firehose_client = boto3.client('firehose')
    while True:
        try:
            # Get the stream's current status
            result = firehose_client.describe_delivery_stream(
                DeliveryStreamName=firehose_name)
        except ClientError as e:
            logging.error(e)
            return False
        status = result['DeliveryStreamDescription']['DeliveryStreamStatus']
        if status == 'ACTIVE':
            return True
        if status == 'DELETING':
            logging.error(f'Firehose delivery stream {firehose_name} is being deleted.')
            return False
        # Stream is still CREATING; wait before polling again.
        time.sleep(delay)
The Firehose will receive258 # data from direct puts.259 firehose_arn = create_firehose_to_s3(firehose_name, bucket_arn, iam_role_name)260 if firehose_arn is None:261 exit(1)262 logging.info(f'Created Firehose delivery stream to S3: {firehose_arn}')263 # Wait for the stream to become active264 if not wait_for_active_firehose(firehose_name):265 exit(1)266 logging.info('Firehose stream is active')267 # Put records into the Firehose stream268 test_data_file = 'kinesis_test_data.txt'269 firehose_client = boto3.client('firehose')270 with open(test_data_file, 'r') as f:271 logging.info('Putting 20 records into the Firehose one at a time')272 for i in range(20):273 # Read a record of test data274 line = next(f)275 # Put the record into the Firehose stream276 try:277 firehose_client.put_record(DeliveryStreamName=firehose_name,278 Record={'Data': line})...
firehose_stack.py
Source: firehose_stack.py
...10 self._supported_in_region = self.is_service_supported_in_region()11 delivery_bucket = self.create_s3_delivery_bucket()12 self.create_log_group_and_stream()13 firehose_role_arn = self.create_firehose_role(delivery_bucket)14 firehose = self.create_firehose(delivery_bucket, firehose_role_arn)15 firehose_stream_name = firehose.ref16 self._parameters_to_save["firehose_stream_name"] = firehose_stream_name17 self.create_test_policies(common_stack)18 self.save_parameters_in_parameter_store(Platform.IOS)19 def create_s3_delivery_bucket(self) -> aws_s3.Bucket:20 delivery_bucket = aws_s3.Bucket(21 self, "integ_test_firehose_delivery_bucket", removal_policy=core.RemovalPolicy.DESTROY22 )23 return delivery_bucket24 def create_log_group_and_stream(self) -> aws_logs.LogGroup:25 log_group = aws_logs.LogGroup(26 self,27 "integ_test_firehose_delivery_log_group",28 log_group_name=FirehoseStack.LOG_GROUP_NAME,29 removal_policy=core.RemovalPolicy.DESTROY,30 retention=aws_logs.RetentionDays.FIVE_DAYS,31 )32 aws_logs.LogStream(33 self,34 "integ_test_firehose_delivery_log_stream",35 log_group=log_group,36 log_stream_name=FirehoseStack.LOG_STREAM_NAME,37 removal_policy=core.RemovalPolicy.DESTROY,38 )39 return log_group40 def create_firehose_role(self, delivery_bucket) -> str:41 """42 Creates an IAM role to allow Kinesis to deliver records to S3, per43 https://docs.aws.amazon.com/firehose/latest/dev/controlling-access.html44 :param delivery_bucket: The destination bucket45 :return: IAM Role ARN46 """47 firehose_role = aws_iam.Role(48 self,49 "integ_test_firehose_delivery_role",50 assumed_by=aws_iam.ServicePrincipal("firehose.amazonaws.com"),51 )52 firehose_role.add_to_policy(53 aws_iam.PolicyStatement(54 effect=aws_iam.Effect.ALLOW,55 actions=[56 "s3:AbortMultipartUpload",57 "s3:GetBucketLocation",58 "s3:GetObject",59 "s3:ListBucket",60 "s3:ListBucketMultipartUploads",61 "s3:PutObject",62 ],63 resources=[delivery_bucket.bucket_arn, f"{delivery_bucket.bucket_arn}/*"],64 )65 )66 
firehose_role.add_to_policy(67 aws_iam.PolicyStatement(68 effect=aws_iam.Effect.ALLOW,69 actions=[70 "kinesis:DescribeStream",71 "kinesis:GetShardIterator",72 "kinesis:GetRecords",73 "kinesis:ListShards",74 ],75 resources=[f"arn:aws:kinesis:{self.region}:{self.account}:stream/*"],76 )77 )78 log_stream_arn = ":".join(79 [80 "arn:aws:logs",81 self.region,82 self.account,83 "log-group",84 FirehoseStack.LOG_GROUP_NAME,85 "log-stream",86 FirehoseStack.LOG_STREAM_NAME,87 ]88 )89 firehose_role.add_to_policy(90 aws_iam.PolicyStatement(91 effect=aws_iam.Effect.ALLOW,92 actions=["logs:PutLogEvents"],93 resources=[log_stream_arn],94 )95 )96 return firehose_role.role_arn97 def create_firehose(98 self, delivery_bucket, firehose_role_arn99 ) -> aws_kinesisfirehose.CfnDeliveryStream:100 """101 Creates a Firehose DeliveryStream configured to deliver to the S3 Bucket `delivery_bucket`,102 and log errors to a log stream named 'S3Delivery' in `log_group`. Firehose will adopt the103 role specified in `firehose_role_arn`.104 :param delivery_bucket: The delivery destination bucket for the Firehose105 :param firehose_role_arn: The role to adopt106 :return: a CfnDeliveryStream107 """108 firehose = aws_kinesisfirehose.CfnDeliveryStream(109 self,110 "integ_test_firehose",111 extended_s3_destination_configuration={...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites to running your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, i.e., Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!