How to use get_log_group_name method in localstack

Best Python code snippet using localstack_python

RDSCreateMetricsFromEnhancedMonitoring.py

Source:RDSCreateMetricsFromEnhancedMonitoring.py Github

copy

Full Screen

# ... (excerpt begins mid-statement; the lines before 38 are not shown)
#         },
#     ]
# )

# Top-level fields of an Enhanced Monitoring event that never become metrics.
_NON_METRIC_FIELDS = ('engine', 'instanceID', 'instanceResourceID', 'timestamp', 'version', 'uptime', 'numVCPUs',
                      'processList')


def _aws_client(service_name):
    """Return a boto3 client for ``service_name``, pinned to ``region`` when one is set."""
    if region:
        return boto3.client(service_name, region_name=region)
    return boto3.client(service_name)


def _find_rds_instance(instance_name):
    """Return the describe_db_instances record for ``instance_name``, or None.

    Uses a paginator so that instances beyond the first page of results
    are still found.
    """
    paginator = _aws_client('rds').get_paginator('describe_db_instances')
    for page in paginator.paginate():
        for rds_instance in page['DBInstances']:
            if rds_instance['DBInstanceIdentifier'] == instance_name:
                return rds_instance
    return None


def _log_group_from_instance(rds_instance):
    """Extract the log group name (7th ':'-separated ARN field) from an instance record."""
    return rds_instance['EnhancedMonitoringResourceArn'].split(':')[6]


# This function gets the log group name from the RDS instance
def get_log_group_name(instance_name):
    """Return the Enhanced Monitoring log group name of an RDS instance.

    :param instance_name: DBInstanceIdentifier of the instance to look up
    :return: the log group name, or None when no instance matches
    :raises KeyError: when the matching record has no
        'EnhancedMonitoringResourceArn' key (presumably Enhanced Monitoring
        is not enabled on the instance; confirm against the RDS docs)
    """
    rds_instance = _find_rds_instance(instance_name)
    if rds_instance is None:
        return None
    return _log_group_from_instance(rds_instance)


# This function returns a filter pattern based on the RDS instance id
def return_filter_pattern():
    """Return the metric filter pattern matching events of ``rds_instance_name``."""
    return '{($.instanceID=\"' + rds_instance_name + '\")}'


# This function returns a line of the log to retrieve the fields
def return_event_log_example():
    """Fetch the first log event of the instance named by ``rds_instance_name``.

    The log stream is the DbiResourceId of that same instance. The previous
    code read it from the first instance returned for the account, which is
    only correct when the account has a single instance.

    :return: the get_log_events response (at most one event)
    :raises ValueError: when no instance named ``rds_instance_name`` exists
    """
    rds_instance = _find_rds_instance(rds_instance_name)
    if rds_instance is None:
        raise ValueError('RDS instance not found: ' + rds_instance_name)
    return _aws_client('logs').get_log_events(
        logGroupName=_log_group_from_instance(rds_instance),
        logStreamName=rds_instance['DbiResourceId'],
        limit=1,
        startFromHead=True
    )


# ensures that the instance exists.
def does_this_rds_instance_exists(instance_name):
    """Return True when an RDS instance with this DBInstanceIdentifier exists."""
    return _find_rds_instance(instance_name) is not None


# Read a sample of the event log and selects which metrics are going to be populated with data
# (some of them are string values and we don't populate them)
def populate_metrics():
    """Create a filter and metric for every selected Enhanced Monitoring field.

    When ``metrics_to_filter`` is non-empty, exactly those fields are used.
    Otherwise one sample event is read and every non-string sub-field is used,
    skipping the fields listed in ``_NON_METRIC_FIELDS``.
    """
    # Resolve the log group once; every call is an RDS API round trip.
    log_group_name = get_log_group_name(rds_instance_name)

    if metrics_to_filter:
        for metric_to_filter in metrics_to_filter:
            create_filter_and_metric('$.' + metric_to_filter, log_group_name, rds_instance_name)
        return

    json_data = json.loads(return_event_log_example()['events'][0]['message'])
    for item, value in json_data.items():
        if item in _NON_METRIC_FIELDS:
            continue
        if isinstance(value, dict):
            subitem_list = value
        elif isinstance(value, list) and value and isinstance(value[0], dict):
            # List-valued fields are sampled through their first entry.
            subitem_list = value[0]
        else:
            # Empty list or unexpected shape: nothing to derive metrics from.
            continue
        for subitem, subvalue in subitem_list.items():
            if not isinstance(subvalue, str):
                create_filter_and_metric('$.' + item + '.' + subitem, log_group_name, rds_instance_name)


# Main
try:
    input = raw_input  # Python 2: input() would evaluate the text, raw_input() does not
except NameError:
    pass  # Python 3: raw_input is gone and input() already returns the raw string
print('WARNING: This script will create additional resources in your AWS account such as custom metrics, which may result in additional charges. Do you wish to continue? Y/N')
warning_choice = input('Do you understand and accept the above warning? yes/no: ').lower()
if warning_choice != 'yes':
    print('To accept, you must type \"yes\". Please run the script again if you wish to proceed.')
    sys.exit()
# A bare `print` is a no-op on Python 3; print('') emits the blank line on both 2 and 3.
print('')
parser = argparse.ArgumentParser(
    description='Create metrics filters based on the Enhanced Monitoring of RDS instances.')
parser.add_argument('-i', '--rds_instance', help='RDS instance name (DBInstanceIdentifier)', required=True)
# ... (excerpt ends here; the rest of the script is not shown)

Full Screen

Full Screen

aws_logs.py

Source:aws_logs.py Github

copy

Full Screen

...5 name = 'aws_logs'6 dependencies = ['aws', 'aws_envs']7 service_name = 'logs'8 required_keys = ['namespace']9 def get_log_group_name(self, app_env=None):10 """11 Returns log group name for specified or current environment.12 :param app_env: str | None application environment13 :return: str log group name14 """15 return '{namespace}/{env_name}'.format(16 namespace=self.config['namespace'],17 env_name=app_env or self.get_current_env(),18 )19 def create_log_group(self, name):20 """21 Creates log group22 :param name: str log group name23 """24 response = self.client.create_log_group(25 logGroupName=name,26 tags={27 'chops-project': self.app.config['project_name'],28 'chops-aws-project': self.app.config['aws']['project_name'],29 }30 )31 assert response['ResponseMetadata']['HTTPStatusCode'] == 20032 def delete_log_group(self, name):33 """34 Deletes log group35 :param name: str log group name36 """37 response = self.client.delete_log_group(logGroupName=name)38 assert response['ResponseMetadata']['HTTPStatusCode'] == 20039 def get_tasks(self):40 @task(iterable=['env'])41 def create_group(ctx, env=None):42 """43 Creates log group for current or specified environment[s].44 Use --env=* for all environments45 """46 for app_env in self.envs_from_string(env):47 log_group_name = self.get_log_group_name(app_env)48 self.create_log_group(log_group_name)49 ctx.info('Log group "{}" successfully created.'.format(log_group_name))50 @task(iterable=['env'])51 def delete_group(ctx, env=None):52 """53 Delete log group for current or specified environment[s].54 Use --env=* for all environments55 """56 for app_env in self.envs_from_string(env):57 log_group_name = self.get_log_group_name(app_env)58 self.delete_log_group(log_group_name)59 ctx.info('Log group "{}" successfully deleted.'.format(log_group_name))60 return [create_group, delete_group]61class AwsLogsPluginMixin:62 def get_log_group_name(self, env=None):63 return self.app.plugins['aws_logs'].get_log_group_name(env)...

Full Screen

Full Screen

cloudwatch.py

Source:cloudwatch.py Github

copy

Full Screen

# ... (lines 1-6 of cloudwatch.py are not shown in this excerpt)
LAMBDA_CLIENT_LOGS = client("logs")


def get_logs(
    query: str, lambda_name: str, start_time: Timestamp, retry_count: int = 32, sleep_per_loop: int = 20
) -> str:
    """Run a log query for a lambda until it returns results.

    The query is restarted each time it completes with no results. Every
    poll of the query results counts towards ``retry_count``.

    Args:
        query: Query string passed to ``start_query``.
        lambda_name: Name passed to ``get_log_group_name`` to find the log group.
        start_time: Start of the search window; converted with ``int()``
            (presumably epoch seconds, like the end time; confirm with callers).
        retry_count: Maximum number of result polls before giving up.
        sleep_per_loop: Seconds to sleep before each poll.

    Returns:
        The final ``get_query_results`` response as an indented JSON string.

    Raises:
        ValueError: When ``retry_count`` polls were made without any results.
    """
    log_group_name = get_log_group_name(lambda_name)
    counter = 0
    while True:
        start_query_response = LAMBDA_CLIENT_LOGS.start_query(
            logGroupName=log_group_name,
            startTime=int(start_time),
            endTime=int(datetime.now().timestamp()),
            queryString=query,
        )
        query_id = start_query_response["queryId"]
        response = None
        while response is None or response["status"] != "Complete":
            # Bound the polling here as well: a query that never reaches
            # "Complete" would otherwise spin forever.
            if counter >= retry_count:
                raise ValueError("Log search retries exceeded.. no logs found")
            sleep(sleep_per_loop)
            response = LAMBDA_CLIENT_LOGS.get_query_results(queryId=query_id)
            counter += 1
        if response["results"]:
            return dumps(response, indent=2)
        # ">=" rather than "==": counter advances once per poll, so it can step
        # past retry_count inside the inner loop and "==" would never fire.
        if counter >= retry_count:
            raise ValueError("Log search retries exceeded.. no logs found")


def negative_log_check(query: str, event_lambda: str, start_time: Timestamp) -> bool:
    """Check that a log query for a lambda returns no results.

    Starts the query, waits 30 seconds, then reads the results once.

    Args:
        query: Query string passed to ``start_query``.
        event_lambda: Name passed to ``get_log_group_name`` to find the log group.
        start_time: Start of the search window; converted with ``int()``.

    Returns:
        True when the query returned no results.

    Raises:
        ValueError: When the query returned at least one result.
    """
    log_group_name = get_log_group_name(event_lambda)
    start_query_response = LAMBDA_CLIENT_LOGS.start_query(
        logGroupName=log_group_name,
        startTime=int(start_time),
        endTime=int(datetime.now().timestamp()),
        queryString=query,
    )
    query_id = start_query_response["queryId"]
    # NOTE(review): the query status is not checked after this fixed wait; a
    # query still running at this point would also report no results.
    sleep(30)
    response = LAMBDA_CLIENT_LOGS.get_query_results(queryId=query_id)
    if response["results"]:
        raise ValueError("Matching logs have been found")
    return True


def get_log_group_name(lambda_name: str) -> str:...  # body not shown; excerpt ends here

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful