How to use log_group_exists method in localstack

Best Python code snippet using localstack_python

loggers.py

Source:loggers.py Github

copy

Full Screen

import datetime
import logging

import boto3

from dagster import Field, check, logger, seven
from dagster.core.log_manager import coerce_valid_log_level

# The maximum batch size is 1,048,576 bytes, and this size is calculated as the sum of all event
# messages in UTF-8, plus 26 bytes for each log event.
MAXIMUM_BATCH_SIZE = 1048576
OVERHEAD = 26

EPOCH = datetime.datetime(1970, 1, 1)


def millisecond_timestamp(dt):
    """Return the Unix epoch timestamp of naive datetime ``dt`` in milliseconds.

    Computed via explicit timedelta arithmetic (days/seconds/microseconds)
    so it works the same on every platform, unlike ``dt.timestamp()``.
    """
    td = dt - EPOCH
    microsecond_timestamp = (
        td.days * 24 * 60 * 60 * 1000000 + td.seconds * 1000000 + td.microseconds
    )
    return int(microsecond_timestamp / 1000)


class CloudwatchLogsHandler(logging.Handler):
    """``logging.Handler`` that ships each record to an AWS Cloudwatch log stream.

    Both the log group and the log stream must already exist; the constructor
    raises ``Exception`` if either cannot be found. Each record is serialized
    as JSON (the record's ``__dict__``) and sent via ``put_log_events``, with
    one automatic retry on sequence-token or service-availability errors.
    """

    def __init__(
        self,
        log_group_name,
        log_stream_name,
        aws_region=None,
        aws_secret_access_key=None,
        aws_access_key_id=None,
    ):
        self.client = boto3.client(
            'logs',
            region_name=aws_region,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )
        self.log_group_name = check.str_param(log_group_name, 'log_group_name')
        # Maybe we should make this optional, and default to the run_id
        self.log_stream_name = check.str_param(log_stream_name, 'log_stream_name')
        self.overhead = OVERHEAD
        self.maximum_batch_size = MAXIMUM_BATCH_SIZE
        # Cloudwatch sequence token for put_log_events; refreshed from the
        # stream metadata and from each successful put.
        self.sequence_token = None
        self.check_log_group()
        self.check_log_stream()
        super(CloudwatchLogsHandler, self).__init__()

    def check_log_group(self):
        """Raise unless ``self.log_group_name`` exists, paging through describe_log_groups."""
        log_group_exists = False
        next_token = None
        while not log_group_exists:
            describe_log_group_kwargs = {'logGroupNamePrefix': self.log_group_name}
            if next_token is not None:
                describe_log_group_kwargs['nextToken'] = next_token
            res = self.client.describe_log_groups(**describe_log_group_kwargs)
            if self.log_group_name in (
                log_group['logGroupName'] for log_group in res['logGroups']
            ):
                log_group_exists = True
                break
            # Not on this page; follow the pagination token if there is one.
            next_token = res.get('nextToken')
            if next_token is None:
                break
        if not log_group_exists:
            raise Exception(
                'Failed to initialize Cloudwatch logger: Could not find log group with name '
                '{log_group_name}'.format(log_group_name=self.log_group_name)
            )

    def check_log_stream(self):
        """Raise unless the log stream exists; caches its upload sequence token."""
        log_stream_exists = False
        next_token = None
        while not log_stream_exists:
            describe_log_stream_kwargs = {
                'logGroupName': self.log_group_name,
                'logStreamNamePrefix': self.log_stream_name,
            }
            if next_token is not None:
                describe_log_stream_kwargs['nextToken'] = next_token
            res = self.client.describe_log_streams(**describe_log_stream_kwargs)
            for log_stream in res['logStreams']:
                if self.log_stream_name == log_stream['logStreamName']:
                    log_stream_exists = True
                    # May be absent for a stream that has never been written to.
                    self.sequence_token = log_stream.get('uploadSequenceToken')
                    break
            else:
                next_token = res.get('nextToken')
                if next_token is None:
                    break
        if not log_stream_exists:
            raise Exception(
                'Failed to initialize Cloudwatch logger: Could not find log stream with name '
                '{log_stream_name}'.format(log_stream_name=self.log_stream_name)
            )

    def log_error(self, record, exc):
        """Last-resort reporting: dump the failed record and the exception to the root logger."""
        logging.critical('Error while logging!')
        try:
            logging.error(
                'Attempted to log: {record}'.format(record=seven.json.dumps(record.__dict__))
            )
        except Exception:  # pylint: disable=broad-except
            # Best effort only -- the record may not be JSON-serializable.
            pass
        logging.exception(str(exc))

    def emit(self, record):
        self._emit(record, retry=False)

    def retry(self, record):
        self._emit(record, retry=True)

    def _emit(self, record, retry=False):
        """Send one record to Cloudwatch; ``retry=True`` marks the single retry attempt."""
        message = seven.json.dumps(record.__dict__)
        timestamp = millisecond_timestamp(
            datetime.datetime.strptime(record.dagster_meta['log_timestamp'], '%Y-%m-%dT%H:%M:%S.%f')
        )
        params = {
            'logGroupName': self.log_group_name,
            'logStreamName': self.log_stream_name,
            'logEvents': [{'timestamp': timestamp, 'message': message}],
        }
        if self.sequence_token is not None:
            params['sequenceToken'] = self.sequence_token
        try:
            res = self.client.put_log_events(**params)
            self.sequence_token = res['nextSequenceToken']
            log_events_rejected = res.get('rejectedLogEventsInfo')
            if log_events_rejected is not None:
                logging.error('Cloudwatch logger: log events rejected: {res}'.format(res=res))
        # NOTE: in all handlers below we format ``exc`` into the message, not
        # ``res`` -- ``res`` is unbound when put_log_events itself raised.
        except self.client.exceptions.InvalidSequenceTokenException as exc:
            if not retry:
                # Refresh the sequence token from the stream, then retry once.
                self.check_log_stream()
                self.retry(record)
            else:
                self.log_error(record, exc)
        except self.client.exceptions.DataAlreadyAcceptedException as exc:
            logging.error('Cloudwatch logger: log events already accepted: {exc}'.format(exc=exc))
        except self.client.exceptions.InvalidParameterException as exc:
            logging.error(
                'Cloudwatch logger: Invalid parameter exception while logging: {exc}'.format(
                    exc=exc
                )
            )
        except self.client.exceptions.ResourceNotFoundException as exc:
            logging.error(
                'Cloudwatch logger: Resource not found. Check that the log stream or log group '
                'was not deleted: {exc}'.format(exc=exc)
            )
        except self.client.exceptions.ServiceUnavailableException as exc:
            if not retry:
                self.retry(record)
            else:
                logging.error('Cloudwatch logger: Service unavailable: {exc}'.format(exc=exc))
        # BUG FIX: this handler previously duplicated ServiceUnavailableException
        # (unreachable); its message shows it was meant for UnrecognizedClientException,
        # which PutLogEvents raises on invalid credentials.
        except self.client.exceptions.UnrecognizedClientException as exc:
            if not retry:
                self.retry(record)
            else:
                logging.error(
                    'Cloudwatch logger: Unrecognized client. Check your AWS access key id and '
                    'secret key: {exc}'.format(exc=exc)
                )


@logger(
    {
        'log_level': Field(str, is_optional=True, default_value='INFO'),
        'name': Field(str, is_optional=True, default_value='dagster'),
        'log_group_name': Field(str),
        'log_stream_name': Field(str),
        'aws_region': Field(str, is_optional=True),
        'aws_secret_access_key': Field(str, is_optional=True),
        'aws_access_key_id': Field(str, is_optional=True),
    },
    # FIX: previous description ('The default colored console logger.') was a
    # copy-paste error from another logger definition.
    description='Logger that streams log events to AWS Cloudwatch Logs.',
)
def cloudwatch_logger(init_context):
    """Construct a stdlib logger whose records are forwarded to Cloudwatch."""
    level = coerce_valid_log_level(init_context.logger_config['log_level'])
    name = init_context.logger_config['name']
    klass = logging.getLoggerClass()
    logger_ = klass(name, level=level)
    logger_.addHandler(
        CloudwatchLogsHandler(
            init_context.logger_config['log_group_name'],
            init_context.logger_config['log_stream_name'],
            aws_region=init_context.logger_config.get('aws_region'),
            aws_secret_access_key=init_context.logger_config.get('aws_secret_access_key'),
            aws_access_key_id=init_context.logger_config.get('aws_access_key_id'),
        )
    )
    # The snippet was truncated here; a dagster @logger must return the logger
    # instance, so the missing return is restored.
    return logger_

Full Screen

Full Screen

cloudwatchlogs.py

Source:cloudwatchlogs.py Github

copy

Full Screen

import boto3
from botocore.exceptions import ClientError


class LogStatus:
    """Mutable record of what is known about one log group / log stream pair."""

    def __init__(self, log_group, log_stream):
        self.log_group = log_group
        self.log_stream = log_stream
        # None = unknown, True/False once determined.
        self.log_group_exists = None
        self.log_stream_exists = None
        # Cloudwatch upload sequence token, when the stream already has one.
        self.upload_token = None


class CloudWatchLogs:
    """Thin wrapper around the boto3 'logs' client that self-heals on
    missing groups/streams and stale sequence tokens."""

    def __init__(self, client=None):
        # BUG FIX: the original signature was ``client=boto3.client('logs')``,
        # which creates the client once at import time (shared across all
        # instances, and raising at import if no region/credentials are
        # configured). Defaulting to None keeps callers backward-compatible.
        self.client = client if client is not None else boto3.client('logs')

    def put_log_events(self, request, sequence_token_cache):
        """Put log events, creating the group/stream on ResourceNotFound and retrying once.

        ``request`` is a dict with logGroupName/logStreamName/logEvents;
        ``sequence_token_cache`` maps "<group>.<stream>" to the last known token.
        """
        try:
            return self._put_log_events(request, sequence_token_cache)
        except ClientError as e:
            if e.response.get("Error", {}).get("Code") != 'ResourceNotFoundException':
                raise e
            self.prepare(request['logGroupName'], request['logStreamName'])
            return self._put_log_events(request, sequence_token_cache)

    def _put_log_events(self, request, sequence_token_cache):
        """Put log events with the cached sequence token, recovering from token errors.

        On InvalidSequenceToken/DataAlreadyAccepted, the correct token is parsed
        from the error message (it is the last whitespace-separated word) and
        the call is retried recursively.
        """
        cache_key = f"{request['logGroupName']}.{request['logStreamName']}"
        try:
            if cache_key in sequence_token_cache:
                sequence_token = sequence_token_cache[cache_key]
                response = self.client.put_log_events(
                    logGroupName=request['logGroupName'],
                    logStreamName=request['logStreamName'],
                    logEvents=request['logEvents'],
                    sequenceToken=sequence_token
                )
            else:
                response = self.client.put_log_events(
                    logGroupName=request['logGroupName'],
                    logStreamName=request['logStreamName'],
                    logEvents=request['logEvents'],
                )
            print(f"{request['logGroupName']}: put {len(request['logEvents'])} log events, next sequence token is {response['nextSequenceToken']}")
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code")
            if error_code not in {'InvalidSequenceTokenException', 'DataAlreadyAcceptedException'}:
                raise e
            # The expected token is the last word of the error message ('null'
            # when the stream has no token yet).
            correct_sequence_token = str(e).split(' ')[-1]
            print(f"{request['logGroupName']}: handling {error_code}, new token is {correct_sequence_token}.")
            if correct_sequence_token != 'null':
                sequence_token_cache[cache_key] = correct_sequence_token
            else:
                sequence_token_cache.pop(cache_key, None)
            response = self._put_log_events(request, sequence_token_cache)
        sequence_token_cache[cache_key] = response['nextSequenceToken']
        return response

    def prepare(self, log_group, log_stream):
        """Ensure the group and stream exist; return the current upload token (or None)."""
        status = LogStatus(log_group=log_group, log_stream=log_stream)
        status = self._get_upload_token_if_possible(status)
        status = self._create_log_group_if_needed(status)
        status = self._create_log_stream_if_needed(status)
        return status.upload_token

    def _get_upload_token_if_possible(self, status):
        if status.upload_token:
            return status
        return self._update_status(status)

    def _create_log_group_if_needed(self, status):
        if status.log_group_exists:
            # FIX: stray '$' removed from the f-string (JS template residue).
            print(f"Log group/stream exists, obtained sequence token: {status.upload_token}")
            return status
        self.client.create_log_group(logGroupName=status.log_group)
        return status

    def _create_log_stream_if_needed(self, status):
        if status.log_stream_exists:
            return status
        try:
            self.client.create_log_stream(
                logGroupName=status.log_group,
                logStreamName=status.log_stream,
            )
            status.log_stream_exists = True
            # A freshly created stream has no sequence token yet.
            status.upload_token = None
            return status
        except ClientError as e:
            if e.response.get("Error", {}).get("Code") != 'ResourceAlreadyExistsException':
                raise e
            # FIX: stray '$' removed from the f-string.
            print(f"{status.log_stream}: stream already existed, retrieving sequence token.")
            return self._update_status(status)

    def _update_status(self, status):
        """Refresh existence flags and upload token by describing the log streams."""
        try:
            pages = self.client.get_paginator('describe_log_streams').paginate(
                logGroupName=status.log_group,
                logStreamNamePrefix=status.log_stream
            )
            for page in pages:
                # BUG FIX: the original iterated ``page`` directly, which yields
                # the dict's keys (strings), so stream['logStreamName'] raised
                # TypeError. Each page holds the streams under 'logStreams'.
                for stream in page['logStreams']:
                    if stream['logStreamName'] == status.log_stream:
                        status.log_group_exists = True
                        status.log_stream_exists = True
                        # .get: token is absent for streams never written to.
                        status.upload_token = stream.get('uploadSequenceToken')
                        return status
            print(f"{status.log_group}: Log group/stream exists!")
            return status
        except ClientError as e:
            if e.response.get("Error", {}).get("Code") != 'ResourceNotFoundException':
                raise e
            print(f"{status.log_group}: Log group/stream does not exist!")
            status.log_group_exists = False
            status.log_stream_exists = False
            # BUG FIX: the original fell off the end here, returning None and
            # breaking prepare(); the status must be returned.
            return status

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios, LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful