Best Python code snippet using localstack_python
test_logs.py
Source:test_logs.py
...201 firehose_name = f"test-firehose-{short_uid()}"202 s3_bucket_arn = f"arn:aws:s3:::{s3_bucket}"203 role = f"test-firehose-s3-role-{short_uid()}"204 policy_name = f"test-firehose-s3-role-policy-{short_uid()}"205 role_arn = create_iam_role_with_policy(206 RoleName=role,207 PolicyName=policy_name,208 RoleDefinition=s3_firehose_role,209 PolicyDefinition=s3_firehose_permission,210 )211 # TODO AWS has troubles creating the delivery stream the first time212 # policy is not accepted at first, so we try again213 def create_delivery_stream():214 firehose_client.create_delivery_stream(215 DeliveryStreamName=firehose_name,216 S3DestinationConfiguration={217 "BucketARN": s3_bucket_arn,218 "RoleARN": role_arn,219 "BufferingHints": {"SizeInMBs": 1, "IntervalInSeconds": 60},220 },221 )222 retry(create_delivery_stream, retries=5, sleep=10.0)223 response = firehose_client.describe_delivery_stream(DeliveryStreamName=firehose_name)224 firehose_arn = response["DeliveryStreamDescription"]["DeliveryStreamARN"]225 role = f"test-firehose-role-{short_uid()}"226 policy_name = f"test-firehose-role-policy-{short_uid()}"227 role_arn_logs = create_iam_role_with_policy(228 RoleName=role,229 PolicyName=policy_name,230 RoleDefinition=logs_role,231 PolicyDefinition=firehose_permission,232 )233 def check_stream_active():234 state = firehose_client.describe_delivery_stream(DeliveryStreamName=firehose_name)[235 "DeliveryStreamDescription"236 ]["DeliveryStreamStatus"]237 if state != "ACTIVE":238 raise Exception(f"DeliveryStreamStatus is {state}")239 retry(check_stream_active, retries=60, sleep=30.0)240 logs_client.put_subscription_filter(241 logGroupName=logs_log_group,242 filterName="Destination",243 filterPattern="",244 destinationArn=firehose_arn,245 roleArn=role_arn_logs,246 )247 logs_client.put_log_events(248 logGroupName=logs_log_group,249 logStreamName=logs_log_stream,250 logEvents=[251 {"timestamp": now_utc(millis=True), "message": "test"},252 {"timestamp": now_utc(millis=True), "message": 
"test 2"},253 ],254 )255 def list_objects():256 response = s3_client.list_objects(Bucket=s3_bucket)257 assert len(response["Contents"]) >= 1258 retry(list_objects, retries=60, sleep=30.0)259 response = s3_client.list_objects(Bucket=s3_bucket)260 key = response["Contents"][-1]["Key"]261 response = s3_client.get_object(Bucket=s3_bucket, Key=key)262 content = gzip.decompress(response["Body"].read()).decode("utf-8")263 assert "DATA_MESSAGE" in content264 assert "test" in content265 assert "test 2" in content266 finally:267 # clean up268 firehose_client.delete_delivery_stream(269 DeliveryStreamName=firehose_name, AllowForceDelete=True270 )271 def test_put_subscription_filter_kinesis(272 self,273 logs_client,274 logs_log_group,275 logs_log_stream,276 kinesis_client,277 iam_client,278 create_iam_role_with_policy,279 ):280 kinesis_name = f"test-kinesis-{short_uid()}"281 filter_name = "Destination"282 kinesis_client.create_stream(StreamName=kinesis_name, ShardCount=1)283 try:284 result = kinesis_client.describe_stream(StreamName=kinesis_name)["StreamDescription"]285 kinesis_arn = result["StreamARN"]286 role = f"test-kinesis-role-{short_uid()}"287 policy_name = f"test-kinesis-role-policy-{short_uid()}"288 role_arn = create_iam_role_with_policy(289 RoleName=role,290 PolicyName=policy_name,291 RoleDefinition=logs_role,292 PolicyDefinition=kinesis_permission,293 )294 # wait for stream-status "ACTIVE"295 status = result["StreamStatus"]296 if status != "ACTIVE":297 def check_stream_active():298 state = kinesis_client.describe_stream(StreamName=kinesis_name)[299 "StreamDescription"300 ]["StreamStatus"]301 if state != "ACTIVE":302 raise Exception(f"StreamStatus is {state}")...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!