from loguru import logger as guru_logger from common.config.log_settings import log_settings from typing import Dict, Any import socket import threading class LoggerBase: binded_loggers = {} # Stores logger instances loguru_sinks_added = set() # Tracks added log sinks logger_lock = threading.Lock() def __init__(self, logger_name: str, extra_fields: Dict[str, Any] = None) -> None: self.__logger_name = logger_name self.extra_fields = extra_fields or {} with LoggerBase.logger_lock: # ✅ **If already created, reuse it** to prevent duplicates if self.__logger_name in LoggerBase.binded_loggers: self.logger = LoggerBase.binded_loggers[self.__logger_name] return log_filename = f"{log_settings.LOG_BASE_PATH}/{self.__logger_name}.log" # ✅ **Ensure Loguru sink is added only once** if log_filename not in LoggerBase.loguru_sinks_added: guru_logger.add( sink=log_filename, level="INFO", retention=log_settings.LOG_RETENTION, rotation=log_settings.LOG_ROTATION, format="{message}", serialize=True, filter=lambda record: "extra" in record and "topic" in record["extra"] and record["extra"]["topic"] == self.__logger_name, ) LoggerBase.loguru_sinks_added.add(log_filename) # ✅ Mark as added host_name = socket.gethostname() host_ip = socket.gethostbyname(host_name) # ✅ Bind the logger with topic and extra fields self.logger = guru_logger.bind( topic=self.__logger_name, host_ip=host_ip, host_name=host_name, **self.extra_fields, # Include additional metadata ) # ✅ Store reference to prevent duplicate instances LoggerBase.binded_loggers[self.__logger_name] = self.logger async def log_event( self, sender_id: str, receiver_id: str, subject: str, event: str, properties: Dict[str, Any], text: str = "", ) -> None: local_logger = self.logger.bind( sender_id=sender_id, receiver_id=receiver_id, subject=subject, event=event, properties=properties, ) local_logger.info(text) async def log_exception( self, sender_id: str, receiver_id: str, subject: str, exception: Exception, text: str = "", properties: Dict[str, Any] = None, ) -> None: local_logger = self.logger.bind( sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="exception", properties=properties, exception=exception, ) local_logger.exception(text) async def log_info( self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: Dict[str, Any] = None, ) -> None: local_logger = self.logger.bind( sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="information", properties=properties, ) local_logger.info(text) async def log_warning( self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: Dict[str, Any] = None, ) -> None: local_logger = self.logger.bind( sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="warning", properties=properties, ) local_logger.warning(text) async def log_error( self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: Dict[str, Any] = None, ) -> None: local_logger = self.logger.bind( sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="error", properties=properties, ) local_logger.error(text)