How to use delete_log_stream method in localstack

Best Python code snippet using localstack_python

operations.py

Source:operations.py Github

copy

Full Screen

...65 return response66 except Exception as err:67 logger.error('{}'.format(str(err)))68 raise ConnectorError(str(err))69def delete_log_stream(config, params):70 try:71 cw = CloudWatch(config)72 cw_client = cw._get_cloudwatch_client()73 params_dict = cw._build_request_payload(params)74 response = cw_client.delete_log_stream(**params_dict)75 return response76 except Exception as err:77 logger.error('{}'.format(str(err)))78 raise ConnectorError(str(err))79def create_log_retention_policy(config, params):80 try:81 cw = CloudWatch(config)82 cw_client = cw._get_cloudwatch_client()83 params_dict = cw._build_request_payload(params)84 response = cw_client.put_retention_policy(**params_dict)85 return response86 except Exception as err:87 logger.error('{}'.format(str(err)))88 raise ConnectorError(str(err))...

Full Screen

Full Screen

lambda_function.py

Source:lambda_function.py Github

copy

Full Screen

...38 except exceptions.ClientError as e:39 logger.error(e.response["Error"]["Code"])40 is_created = False41 return is_created42 def delete_log_stream(self):43 is_deleted = True44 try:45 cwlogs.delete_log_stream(logGroupName=self.group_name, logStreamName=self.stream_name)46 except exceptions.ClientError as e:47 # ResourceNotFoundException is ok48 codes = [49 "InvalidParameterException",50 "OperationAbortedException",51 "ServiceUnavailableException",52 ]53 if e.response["Error"]["Code"] in codes:54 logger.error(e.response["Error"]["Code"])55 is_deleted = False56 return is_deleted57 def init_log_stream(self):58 if not all([self.has_log_group(), self.delete_log_stream(), self.create_log_stream()]):59 raise Exception("fails to create log stream")60 logger.info("log stream created")61 def create_log_events(self, stream):62 fmt = "%Y-%m-%d %H:%M:%S,%f"63 log_events = []64 for m in [s for s in stream.getvalue().split("\n") if s]:65 match = re.search(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}", m)66 dt_str = match.group() if match else datetime.utcnow().strftime(fmt)67 log_events.append(68 {"timestamp": int(datetime.strptime(dt_str, fmt).timestamp()) * 1000, "message": m}69 )70 return log_events71 def put_log_events(self, stream):72 try:...

Full Screen

Full Screen

index.py

Source:index.py Github

copy

Full Screen

...15 16 delete_count = 017 for log_group in get_log_groups(client):18 for expired_log_stream in get_expired_log_streams(client, log_group):19 delete_log_stream(client, log_group, expired_log_stream)20 delete_count += 121 if delete_count % 5 == 0:22 # Prevents the log stream delete throttle (5 per second) from blocking execution.23 time.sleep(1)24 delete_count = 025 log_info('Completed empty expired log stream purge', delete_count = delete_count)26def get_log_groups(client):27 paginator = client.get_paginator('describe_log_groups')28 response_iterator = paginator.paginate()29 for response_item in response_iterator:30 for log_group_item in response_item.get('logGroups', []):31 if 'retentionInDays' in log_group_item:32 yield log_group_item33def get_expired_log_streams(client, log_group):34 log_group_name = log_group['logGroupName']35 retention_in_days = log_group['retentionInDays']36 cutoff_date = datetime.datetime.utcnow() - datetime.timedelta(days = retention_in_days + 2)37 log_info('Processing log group', log_group_name = log_group_name, retention_in_days = retention_in_days, cutoff_date = str(cutoff_date))38 paginator = client.get_paginator('describe_log_streams')39 response_iterator = paginator.paginate(40 logGroupName = log_group_name,41 orderBy = 'LastEventTime',42 descending = False43 )44 for reposonse_item in response_iterator:45 for log_stream_item in reposonse_item.get('logStreams'):46 log_stream_name = log_stream_item['logStreamName']47 if 'lastEventTimestamp' in log_stream_item and 'lastIngestionTime' in log_stream_item:48 last_event_timestamp = datetime.datetime.fromtimestamp(log_stream_item['lastEventTimestamp'] / 1000)49 last_event_ingestion_timestamp = datetime.datetime.fromtimestamp(log_stream_item['lastIngestionTime'] / 1000)50 if last_event_timestamp < cutoff_date and last_event_ingestion_timestamp < cutoff_date:51 log_events = client.get_log_events(52 logGroupName = log_group_name,53 logStreamName = log_stream_name,54 startFromHead = True,55 limit = 156 )57 if len(log_events.get('events', [])) == 0:58 log_info('Identified log stream to delete', log_group_name = log_group_name, log_stream_name = log_stream_name, \59 retention_in_days = retention_in_days, cutoff_date = str(cutoff_date), \60 last_event_timestamp = str(last_event_timestamp), last_event_ingestion_timestamp = str(last_event_ingestion_timestamp), \61 log_event_count = 0 )62 yield log_stream_item63 64def delete_log_stream(client, log_group, expired_log_stream):65 client.delete_log_stream(66 logGroupName = log_group['logGroupName'],67 logStreamName = expired_log_stream['logStreamName']...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful