Best Python code snippet using localstack_python
Source: logging.py
import logging.config
import os
from logging import NullHandler
from threading import local

import logstash_formatter
import watchtower
from boto3.session import Session
from gunicorn import glogging
from yaml import safe_load

OPENSHIFT_ENVIRONMENT_NAME_FILE = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
DEFAULT_AWS_LOGGING_NAMESPACE = "inventory-dev"
DEFAULT_LOGGING_CONFIG_FILE = "logconfig.yaml"
LOGGER_NAME = "inventory"
threadctx = local()


def configure_logging():
    log_config_file = os.getenv("INVENTORY_LOGGING_CONFIG_FILE", DEFAULT_LOGGING_CONFIG_FILE)
    with open(log_config_file) as log_config_file:
        logconfig_dict = safe_load(log_config_file)
    logging.config.dictConfig(logconfig_dict)
    logger = logging.getLogger(LOGGER_NAME)
    log_level = os.getenv("INVENTORY_LOG_LEVEL", "INFO").upper()
    logger.setLevel(log_level)
    # Allows for the log level of certain loggers to be redefined with an env variable
    # e.g. SQLALCHEMY_ENGINE_LOG_LEVEL=DEBUG
    for component in ("sqlalchemy.engine", "urllib3"):
        env_key = component.replace(".", "_").upper()
        level = os.getenv(f"{env_key}_LOG_LEVEL")
        if level:
            logging.getLogger(component).setLevel(level.upper())


def clowder_config():
    import app_common_python

    cfg = app_common_python.LoadedConfig
    if cfg.logging:
        cw = cfg.logging.cloudwatch
        return cw.accessKeyId, cw.secretAccessKey, cw.region, cw.logGroup, False
    else:
        return None, None, None, None, None


def non_clowder_config():
    aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID", None)
    aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY", None)
    aws_region_name = os.getenv("AWS_REGION_NAME", None)
    aws_log_group = os.getenv("AWS_LOG_GROUP", "platform")
    create_log_group = str(os.getenv("AWS_CREATE_LOG_GROUP")).lower() == "true"
    return aws_access_key_id, aws_secret_access_key, aws_region_name, aws_log_group, create_log_group


def cloudwatch_handler():
    if os.environ.get("CLOWDER_ENABLED", "").lower() == "true":
        f = clowder_config
    else:
        f = non_clowder_config
    aws_access_key_id, aws_secret_access_key, aws_region_name, aws_log_group, create_log_group = f()
    if all((aws_access_key_id, aws_secret_access_key, aws_region_name)):
        aws_log_stream = os.getenv("AWS_LOG_STREAM", _get_hostname())
        print(f"Configuring watchtower logging (log_group={aws_log_group}, stream_name={aws_log_stream})")
        boto3_session = Session(
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=aws_region_name,
        )
        return watchtower.CloudWatchLogHandler(
            boto3_session=boto3_session,
            log_group=aws_log_group,
            stream_name=aws_log_stream,
            create_log_group=create_log_group,
        )
    else:
        print("Unable to configure watchtower logging. Please verify watchtower logging configuration!")
        return NullHandler()


def _get_hostname():
    return os.uname().nodename


class ContextualFilter(logging.Filter):
    """
    This filter gets the request_id from the flask request
    and adds it to each log record. This way we do not have
    to explicitly retrieve/pass around the request id for each
    log message.
    """

    def filter(self, log_record):
        try:
            log_record.request_id = threadctx.request_id
        except Exception:
            # TODO: need to decide what to do when you log outside the context
            # of a request
            log_record.request_id = None
        try:
            log_record.account_number = threadctx.account_number
        except Exception:
            # TODO: need to decide what to do when you log outside the context
            # of a request
            log_record.account_number = None
        try:
            log_record.org_id = threadctx.org_id
        except Exception:
            # TODO: need to decide what to do when you log outside the context
            # of a request
            log_record.org_id = None
        return True


class InventoryGunicornLogger(glogging.Logger):
    """
    The logger used by the gunicorn arbiter ignores configuration from
    the logconfig.ini/--log-config. This class is required so that the
    log messages emitted by the arbiter are routed through the
    logstash formatter. If they do not get routed through the logstash
    formatter, then kibana appears to ignore them. This could cause
    us to lose "WORKER TIMEOUT" error messages, etc.
    """

    def setup(self, cfg):
        super().setup(cfg)
        self._set_handler(self.error_log, cfg.errorlog, logstash_formatter.LogstashFormatterV1())


def get_logger(name):
    ...
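The ContextualFilter above only does useful work if something populates threadctx for the current request. A minimal sketch of that wiring in a Flask app follows; the import path app.logging, the x-rh-insights-request-id header name, and the route are assumptions for illustration (and configure_logging assumes the logconfig.yaml it loads exists), not part of the snippet itself.

import logging
import uuid

from flask import Flask, request

# Assumed import path for the snippet above; adjust to wherever logging.py lives.
from app.logging import LOGGER_NAME, configure_logging, threadctx

app = Flask(__name__)
configure_logging()
logger = logging.getLogger(LOGGER_NAME)


@app.before_request
def _set_request_context():
    # Reuse an upstream request id header if present, otherwise mint one,
    # so ContextualFilter can stamp it onto every record for this request.
    threadctx.request_id = request.headers.get("x-rh-insights-request-id", str(uuid.uuid4()))


@app.route("/health")
def health():
    logger.info("health check")  # this record now carries request_id
    return "ok"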
Source: test_ensureloggroup.py
import pytest
import botocore
import ensureloggroup

LOG_GROUP_NAME = 'myLogGroup'
PHYSICAL_RESOURCE_ID = 'someUUID'


@pytest.fixture
def mock_cw_logs(mocker):
    mocker.patch.object(ensureloggroup, 'CW_LOGS')
    return ensureloggroup.CW_LOGS


def test_create_log_group_no_exist(mock_cw_logs, mocker):
    mocker.patch.object(ensureloggroup, 'uuid')
    ensureloggroup.uuid.uuid4.return_value = PHYSICAL_RESOURCE_ID
    response = ensureloggroup.create(_mock_event(), None)
    assert response == {
        'Status': 'SUCCESS',
        'PhysicalResourceId': PHYSICAL_RESOURCE_ID,
        'Data': {
            'LogGroupName': LOG_GROUP_NAME
        }
    }
    ensureloggroup.uuid.uuid4.assert_called()
    ensureloggroup.CW_LOGS.create_log_group.assert_called_with(
        logGroupName=LOG_GROUP_NAME
    )


def test_create_log_group_already_exists(mock_cw_logs, mocker):
    mocker.patch.object(ensureloggroup, 'uuid')
    ensureloggroup.uuid.uuid4.return_value = PHYSICAL_RESOURCE_ID
    ensureloggroup.CW_LOGS.create_log_group.side_effect = botocore.exceptions.ClientError(
        {
            'Error': {
                'Code': 'ResourceAlreadyExistsException'
            }
        },
        None
    )
    response = ensureloggroup.create(_mock_event(), None)
    assert response == {
        'Status': 'SUCCESS',
        'PhysicalResourceId': PHYSICAL_RESOURCE_ID,
        'Data': {
            'LogGroupName': LOG_GROUP_NAME
        }
    }
    ensureloggroup.uuid.uuid4.assert_called()
    ensureloggroup.CW_LOGS.create_log_group.assert_called_with(
        logGroupName=LOG_GROUP_NAME
    )


def test_create_log_group_other_error(mock_cw_logs, mocker):
    mocker.patch.object(ensureloggroup, 'uuid')
    ensureloggroup.uuid.uuid4.return_value = PHYSICAL_RESOURCE_ID
    ensureloggroup.CW_LOGS.create_log_group.side_effect = botocore.exceptions.ClientError(
        {
            'Error': {
                'Code': 'SomethingElse'
            }
        },
        None
    )
    with pytest.raises(botocore.exceptions.ClientError):
        ensureloggroup.create(_mock_event(), None)


def test_update_log_group_no_exist(mock_cw_logs, mocker):
    response = ensureloggroup.update(_mock_event(), None)
    assert response == {
        'Status': 'SUCCESS',
        'PhysicalResourceId': PHYSICAL_RESOURCE_ID,
        'Data': {
            'LogGroupName': LOG_GROUP_NAME
        }
    }
    ensureloggroup.CW_LOGS.create_log_group.assert_called_with(
        logGroupName=LOG_GROUP_NAME
    )


def test_update_log_group_already_exists(mock_cw_logs, mocker):
    ensureloggroup.CW_LOGS.create_log_group.side_effect = botocore.exceptions.ClientError(
        {
            'Error': {
                'Code': 'ResourceAlreadyExistsException'
            }
        },
        None
    )
    response = ensureloggroup.update(_mock_event(), None)
    assert response == {
        'Status': 'SUCCESS',
        'PhysicalResourceId': PHYSICAL_RESOURCE_ID,
        'Data': {
            'LogGroupName': LOG_GROUP_NAME
        }
    }
    ensureloggroup.CW_LOGS.create_log_group.assert_called_with(
        logGroupName=LOG_GROUP_NAME
    )


def test_update_log_group_other_error(mock_cw_logs, mocker):
    ensureloggroup.CW_LOGS.create_log_group.side_effect = botocore.exceptions.ClientError(
        {
            'Error': {
                'Code': 'SomethingElse'
            }
        },
        None
    )
    with pytest.raises(botocore.exceptions.ClientError):
        ensureloggroup.update(_mock_event(), None)


def test_delete(mock_cw_logs, mocker):
    response = ensureloggroup.delete(_mock_event(), None)
    assert response == {
        'Status': 'SUCCESS',
        'PhysicalResourceId': PHYSICAL_RESOURCE_ID,
        'Data': {
            'LogGroupName': None
        }
    }
    ensureloggroup.CW_LOGS.create_log_group.assert_not_called()


def _mock_event():
    return {
        'PhysicalResourceId': PHYSICAL_RESOURCE_ID,
        'ResourceProperties': {
            'LogGroupName': LOG_GROUP_NAME
        }
...
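The tests above pin down the contract of the ensureloggroup custom-resource handler without showing it. Below is a hypothetical reconstruction of the create function they exercise; the real module is not included on this page, so the structure is inferred from the assertions rather than taken from the source.

import uuid

import boto3
import botocore

# Module-level client so tests can patch ensureloggroup.CW_LOGS;
# assumes a region is configured via the usual AWS environment variables.
CW_LOGS = boto3.client("logs")


def create(event, context):
    log_group_name = event["ResourceProperties"]["LogGroupName"]
    try:
        CW_LOGS.create_log_group(logGroupName=log_group_name)
    except botocore.exceptions.ClientError as error:
        # An already-existing log group is fine; any other error should propagate.
        if error.response["Error"]["Code"] != "ResourceAlreadyExistsException":
            raise
    return {
        "Status": "SUCCESS",
        "PhysicalResourceId": str(uuid.uuid4()),
        "Data": {"LogGroupName": log_group_name},
    }

Judging by the remaining tests, update would behave the same way but keep the PhysicalResourceId from the incoming event, and delete would return success without calling create_log_group at all.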
Source: puptoo_logging.py
import os
import sys
import logging
import socket
from threading import local

from logstash_formatter import LogstashFormatterV1

from . import config

threadctx = local()


def clowder_config():
    # Cloudwatch Configuration with Clowder
    if os.environ.get("ACG_CONFIG"):
        import app_common_python

        cfg = app_common_python.LoadedConfig
        if cfg.logging:
            cw = cfg.logging.cloudwatch
            return cw.accessKeyId, cw.secretAccessKey, cw.region, cw.logGroup, False
        else:
            return None, None, None, None, None


def non_clowder_config():
    aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID", None)
    aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY", None)
    aws_region_name = os.getenv("AWS_REGION_NAME", None)
    aws_log_group = os.getenv("AWS_LOG_GROUP", "platform")
    create_log_group = str(os.getenv("AWS_CREATE_LOG_GROUP")).lower() == "true"
    return aws_access_key_id, aws_secret_access_key, aws_region_name, aws_log_group, create_log_group


def initialize_logging():
    kafkalogger = logging.getLogger("kafka")
    kafkalogger.setLevel(config.KAFKA_LOGGER)
    if any("KUBERNETES" in k for k in os.environ):
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(LogstashFormatterV1())
        handler.addFilter(ContextualFilter())
        logging.root.setLevel(os.getenv("LOG_LEVEL", "INFO"))
        logging.root.addHandler(handler)
    else:
        logging.basicConfig(
            level=config.LOG_LEVEL,
            format="%(threadName)s %(levelname)s %(name)s - %(message)s",
        )
    if os.environ.get("ACG_CONFIG"):
        f = clowder_config
    else:
        f = non_clowder_config
    aws_access_key_id, aws_secret_access_key, aws_region_name, aws_log_group, create_log_group = f()
    if all((aws_access_key_id, aws_secret_access_key, aws_region_name, aws_log_group)):
        from boto3.session import Session
        import watchtower

        boto3_session = Session(aws_access_key_id=aws_access_key_id,
                                aws_secret_access_key=aws_secret_access_key,
                                region_name=aws_region_name)
        # configure logging to use watchtower
        cw_handler = watchtower.CloudWatchLogHandler(boto3_session=boto3_session,
                                                     log_group=aws_log_group,
                                                     stream_name=socket.gethostname(),
                                                     create_log_group=create_log_group)
        cw_handler.setFormatter(LogstashFormatterV1())
        cw_handler.addFilter(ContextualFilter())
        logging.root.addHandler(cw_handler)
    logger = logging.getLogger(config.APP_NAME)
    return logger


class ContextualFilter(logging.Filter):
    """
    This filter gets the request_id from the message and adds it to
    each log record. This way we do not have to explicitly retrieve/pass
    around the request_id for each log message
    """

    def filter(self, log_record):
        try:
            log_record.request_id = threadctx.request_id
        except Exception:
            log_record.request_id = "-1"
        try:
            log_record.account = threadctx.account
        except Exception:
            log_record.account = "000001"
        try:
            log_record.org_id = threadctx.org_id
        except Exception:
            log_record.org_id = "000001"
...
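Both snippets ship log records to CloudWatch Logs through watchtower, which makes LocalStack a convenient way to exercise them without a real AWS account. The sketch below shows how one might verify the resulting log group and per-host stream against a local CloudWatch Logs endpoint; the port 4566 endpoint, dummy credentials, and region are assumptions for a default LocalStack setup. Note that the boto3 Session in the snippets would also need to target that endpoint (for example, via a recent botocore that honors AWS_ENDPOINT_URL, or by passing the endpoint explicitly).

import boto3

# Assumes LocalStack is running locally and exposing CloudWatch Logs
# on its default edge port.
logs = boto3.client(
    "logs",
    region_name="us-east-1",
    aws_access_key_id="test",
    aws_secret_access_key="test",
    endpoint_url="http://localhost:4566",
)

# After initialize_logging()/cloudwatch_handler() has emitted a few records,
# the "platform" group (the default AWS_LOG_GROUP above) and its streams
# should be visible through the same endpoint.
groups = logs.describe_log_groups(logGroupNamePrefix="platform")
for group in groups["logGroups"]:
    streams = logs.describe_log_streams(logGroupName=group["logGroupName"])
    print(group["logGroupName"], [s["logStreamName"] for s in streams["logStreams"]])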