import inspect
import json
import logging
import os
import socket
import sys
import threading
from typing import Any, Dict, Optional

from loguru import logger as guru_logger

from common.config.log_settings import log_settings
from common.log.json_sink import JsonSink


class LoggerBase:
    """Topic-scoped structured logger built on top of loguru.

    Each distinct ``logger_name`` gets one JSON file sink and one stderr sink,
    both filtered on the ``topic`` extra, plus a bound loguru logger that is
    cached in ``binded_loggers`` and shared by every instance created with the
    same name.
    """

    # Bound loguru loggers keyed by logger name (topic).
    binded_loggers = {}
    # Serialises cache lookups and sink registration.
    logger_lock = threading.Lock()
    # True once the pre-existing loguru handlers have been removed.
    _handlers_reset = False

    def __init__(self, logger_name: str, extra_fileds: dict[str, Any]) -> None:
        """Create, or reuse, the bound logger for ``logger_name``.

        Args:
            logger_name: Topic name; also used as the log file base name.
            extra_fileds: Stored on the instance as-is.
        """
        self.__logger_name = logger_name
        self.extra_fileds = extra_fileds

        # The lock is held across lookup, sink registration and cache store so
        # two threads can never register duplicate sinks for the same topic.
        with LoggerBase.logger_lock:
            cached = LoggerBase.binded_loggers.get(logger_name)
            if cached is not None:
                self.logger = cached
                return

            LoggerBase._register_sinks(logger_name)
            host_name, host_ip = LoggerBase._resolve_host()
            self.logger = guru_logger.bind(
                topic=logger_name,
                host_ip=host_ip,
                host_name=host_name,
                app=log_settings.APP_NAME,
                env=log_settings.ENVIRONMENT,
            )
            LoggerBase.binded_loggers[logger_name] = self.logger

    @staticmethod
    def _register_sinks(topic: str) -> None:
        """Attach the JSON file sink and the stderr sink for ``topic``."""
        # Wipe pre-existing handlers only once. Doing it for every new topic
        # would also delete the sinks registered for earlier topics.
        if not LoggerBase._handlers_reset:
            guru_logger.remove()
            LoggerBase._handlers_reset = True

        log_level = "INFO"
        log_filename = f"{log_settings.LOG_PATH_BASE}/{topic}.log"
        rotation_bytes = int(log_settings.LOG_ROTATION_BYTES or 10 * 1024 * 1024)

        def only_this_topic(record) -> bool:
            return record["extra"].get("topic") == topic

        file_sink = JsonSink(
            log_file_path=log_filename,
            rotation_size_bytes=rotation_bytes,
            max_backup_files=log_settings.MAX_BACKUP_FILES,
        )
        guru_logger.add(sink=file_sink, level=log_level, filter=only_this_topic)
        guru_logger.add(
            sink=sys.stderr,
            level=log_level,
            format="{level} - {time:YYYY-MM-DD HH:mm:ss} - <{extra[log_file]}:{extra[log_line]}> - {extra[properties_str]} - {message}",
            filter=only_this_topic,
        )

    @staticmethod
    def _resolve_host() -> tuple[str, str]:
        """Return ``(host_name, host_ip)``; the IP is "unknown" if lookup fails."""
        host_name = socket.gethostname()
        try:
            host_ip = socket.gethostbyname(host_name)
        except OSError:
            # An unresolvable hostname must not prevent the logger from
            # being created.
            host_ip = "unknown"
        return host_name, host_ip

    def _get_log_context(self) -> dict:
        """Return the file name and line number of the caller's caller.

        Must be called directly from a public ``log_*`` method so that two
        frames up is the code that invoked that method.
        """
        frame = inspect.currentframe()
        try:
            caller = None
            if frame is not None and frame.f_back is not None:
                caller = frame.f_back.f_back
            if caller is None:
                # The stderr format requires both keys, so always supply them.
                return {"log_file": "<unknown>", "log_line": 0}
            return {
                "log_file": os.path.basename(caller.f_code.co_filename),
                "log_line": caller.f_lineno,
            }
        finally:
            # Drop frame references promptly to avoid reference cycles.
            del frame

    def _prepare_properties(
        self, properties: Optional[Dict[str, Any]]
    ) -> tuple[Dict[str, Any], str]:
        """Return a shallow copy of ``properties`` and its JSON text.

        ``None`` and empty mappings yield ``({}, "{}")``.
        """
        props = {} if properties is None else properties.copy()
        # default=str is deliberate: a value that is not JSON-serialisable
        # should be rendered as text rather than abort the log call.
        props_str = json.dumps(props, ensure_ascii=False, default=str) if props else "{}"
        return props, props_str

    def _bind_record(
        self,
        sender_id: str,
        receiver_id: str,
        subject: str,
        event: str,
        properties: Optional[Dict[str, Any]],
        context: dict,
        **extra: Any,
    ):
        """Return ``self.logger`` bound with the standard per-record fields."""
        props, props_str = self._prepare_properties(properties)
        return self.logger.bind(
            sender_id=sender_id,
            receiver_id=receiver_id,
            subject=subject,
            event=event,
            properties=props,
            properties_str=props_str,
            **extra,
            **context,
        )

    async def log_event(
        self,
        sender_id: str,
        receiver_id: str,
        subject: str,
        event: str,
        properties: dict[str, Any],
        text: str = "",
    ) -> None:
        """Log ``text`` at INFO level with a caller-supplied ``event`` name."""
        context = self._get_log_context()
        self._bind_record(sender_id, receiver_id, subject, event, properties, context).info(text)

    async def log_exception(
        self,
        sender_id: str,
        receiver_id: str,
        subject: str,
        exception: Exception,
        text: str = "",
        properties: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Log ``text`` at ERROR level together with ``exception``'s traceback."""
        context = self._get_log_context()
        record_logger = self._bind_record(
            sender_id, receiver_id, subject, "exception", properties, context,
            exception=exception,
        )
        # Attach the exception that was passed in, so its traceback is logged
        # even outside an ``except`` block. With no exception object, fall
        # back to the active exception (exception=True).
        exc_option = exception if exception is not None else True
        record_logger.opt(exception=exc_option).error(text)

    async def log_info(
        self,
        sender_id: str,
        receiver_id: str,
        subject: str,
        text: str = "",
        properties: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Log ``text`` at INFO level with event "information"."""
        context = self._get_log_context()
        self._bind_record(sender_id, receiver_id, subject, "information", properties, context).info(text)

    async def log_warning(
        self,
        sender_id: str,
        receiver_id: str,
        subject: str,
        text: str = "",
        properties: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Log ``text`` at WARNING level with event "warning"."""
        context = self._get_log_context()
        self._bind_record(sender_id, receiver_id, subject, "warning", properties, context).warning(text)

    async def log_error(
        self,
        sender_id: str,
        receiver_id: str,
        subject: str,
        text: str = "",
        properties: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Log ``text`` at ERROR level with event "error"."""
        context = self._get_log_context()
        self._bind_record(sender_id, receiver_id, subject, "error", properties, context).error(text)

    @staticmethod
    def configure_uvicorn_logging() -> None:
        """Route standard-library logging (uvicorn, fastapi) through loguru.

        Replaces the root logger's handlers with a single intercepting handler
        and clears the handlers of the uvicorn/fastapi loggers so that their
        records propagate to the root.
        """
        print("📢 Setting up uvicorn logging interception...")

        # Intercept logs from these loggers
        intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"]

        class InterceptHandler(logging.Handler):
            """Forward stdlib ``logging`` records to loguru."""

            def emit(self, record):
                # loguru raises ValueError for level names it does not know;
                # fall back to the numeric level in that case.
                try:
                    level = guru_logger.level(record.levelname).name
                except ValueError:
                    level = record.levelno

                # Skip frames inside the logging module so loguru reports the
                # original call site.
                frame, depth = logging.currentframe(), 2
                while frame is not None and frame.f_code.co_filename == logging.__file__:
                    frame = frame.f_back
                    depth += 1

                # NOTE(review): records forwarded here carry no ``topic``
                # extra, so the topic-filtered sinks registered in __init__
                # will not emit them. Confirm that another sink handles them.
                guru_logger.opt(depth=depth, exception=record.exc_info).log(
                    level,
                    f"[{record.name}] {record.getMessage()}",
                )

        # Replace default handlers
        logging.root.handlers.clear()
        logging.root.setLevel(logging.INFO)
        logging.root.handlers = [InterceptHandler()]

        # Configure specific uvicorn loggers
        for logger_name in intercept_loggers:
            logging_logger = logging.getLogger(logger_name)
            logging_logger.handlers.clear()  # Remove default handlers
            logging_logger.propagate = True  # Ensure propagation through Loguru