diff --git a/.gitignore b/.gitignore index 5a50a9c..b022725 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ *.pyc freedev.code-workspace .idea/ +.pytest_cache/ \ No newline at end of file diff --git a/apps/authentication/common/config/app_settings.py b/apps/authentication/common/config/app_settings.py index 35ccca9..11d8957 100644 --- a/apps/authentication/common/config/app_settings.py +++ b/apps/authentication/common/config/app_settings.py @@ -4,6 +4,7 @@ from pydantic_settings import BaseSettings class AppSettings(BaseSettings): NAME: str = "authentication" APP_NAME: str = NAME + APP_ENV: str = "alpha" METRICS_ENABLED: bool = False PROBES_ENABLED: bool = True diff --git a/apps/authentication/common/config/log_settings.py b/apps/authentication/common/config/log_settings.py index 0b8823a..7995968 100644 --- a/apps/authentication/common/config/log_settings.py +++ b/apps/authentication/common/config/log_settings.py @@ -1,17 +1,17 @@ - -from pydantic_settings import BaseSettings +import os +from dataclasses import dataclass from .app_settings import app_settings -class LogSettings(BaseSettings): - LOG_LEVEL: str = "DEBUG" - LOG_PATH_BASE: str = app_settings.LOG_BASE_PATH - LOG_PATH: str = LOG_PATH_BASE + '/' + app_settings.BACKEND_LOG_FILE_NAME + '.log' - LOG_RETENTION: str = "14 days" - LOG_ROTATION: str = "00:00" # mid night - class Config: - env_file = ".log.env" - env_file_encoding = "utf-8" +@dataclass +class LogSettings: + LOG_PATH_BASE: str = app_settings.LOG_BASE_PATH + LOG_RETENTION: str = os.environ.get("LOG_RETENTION", "30 days") + LOG_ROTATION: str = os.environ.get("LOG_ROTATION", "00:00") # midnight + MAX_BACKUP_FILES: int = int(os.environ.get("LOG_BACKUP_FILES", 5)) + LOG_ROTATION_BYTES: int = int(os.environ.get("LOG_ROTATION_BYTES", 10 * 1024 * 1024)) # 10 MB + APP_NAME: str = app_settings.APP_NAME + ENVIRONMENT: str = app_settings.APP_ENV log_settings = LogSettings() diff --git a/apps/authentication/common/log/application_logger.py b/apps/authentication/common/log/application_logger.py index 51d2cba..67ec321 100644 --- a/apps/authentication/common/log/application_logger.py +++ b/apps/authentication/common/log/application_logger.py @@ -1,14 +1,12 @@ from .base_logger import LoggerBase from common.config.app_settings import app_settings -import json - class ApplicationLogger(LoggerBase): def __init__(self, application_activities: dict[str, any] = {}) -> None: - extra_fields = {} + extra_fileds = {} if application_activities: - extra_fields.update(application_activities) + extra_fileds.update(application_activities) super().__init__( logger_name=app_settings.APPLICATION_ACTIVITY_LOG, - extra_fields=extra_fields, + extra_fileds=extra_fileds, ) diff --git a/apps/authentication/common/log/base_logger.py b/apps/authentication/common/log/base_logger.py index 31d3c97..24f7bb0 100644 --- a/apps/authentication/common/log/base_logger.py +++ b/apps/authentication/common/log/base_logger.py @@ -1,140 +1,136 @@ from loguru import logger as guru_logger from common.config.log_settings import log_settings -from typing import Dict, Any +from typing import Dict, Any, Optional import socket +import json import threading +import os +import sys +import inspect +import logging +from common.log.json_sink import JsonSink class LoggerBase: - binded_loggers = {} # Stores logger instances - loguru_sinks_added = set() # Tracks added log sinks + binded_loggers = {} logger_lock = threading.Lock() - def __init__(self, logger_name: str, extra_fields: Dict[str, Any] = None) -> None: + def __init__(self, logger_name: str, extra_fileds: dict[str, any]) -> None: self.__logger_name = logger_name - self.extra_fields = extra_fields or {} - + self.extra_fileds = extra_fileds with LoggerBase.logger_lock: - # ✅ **If already created, reuse it** to prevent duplicates if self.__logger_name in LoggerBase.binded_loggers: self.logger = LoggerBase.binded_loggers[self.__logger_name] return - log_filename = f"{log_settings.LOG_PATH_BASE}/{self.__logger_name}.log" + log_filename = f"{log_settings.LOG_PATH_BASE}/{self.__logger_name}.log" + log_level = "INFO" + rotation_bytes = int(log_settings.LOG_ROTATION_BYTES or 10 * 1024 * 1024) - # ✅ **Ensure Loguru sink is added only once** - if log_filename not in LoggerBase.loguru_sinks_added: - guru_logger.add( - sink=log_filename, - level="INFO", - retention=log_settings.LOG_RETENTION, - rotation=log_settings.LOG_ROTATION, - format="{message}", - serialize=True, - filter=lambda record: "extra" in record - and "topic" in record["extra"] - and record["extra"]["topic"] == self.__logger_name, - ) - LoggerBase.loguru_sinks_added.add(log_filename) # ✅ Mark as added + guru_logger.remove() - host_name = socket.gethostname() - host_ip = socket.gethostbyname(host_name) - - # ✅ Bind the logger with topic and extra fields - self.logger = guru_logger.bind( - topic=self.__logger_name, - host_ip=host_ip, - host_name=host_name, - **self.extra_fields, # Include additional metadata + file_sink = JsonSink( + log_file_path=log_filename, + rotation_size_bytes=rotation_bytes, + max_backup_files=log_settings.MAX_BACKUP_FILES ) + guru_logger.add( + sink=file_sink, + level=log_level, + filter=lambda record: record["extra"].get("topic") == self.__logger_name, + ) - # ✅ Store reference to prevent duplicate instances + guru_logger.add( + sink=sys.stderr, + level=log_level, + format="{level} - {time:YYYY-MM-DD HH:mm:ss} - <{extra[log_file]}:{extra[log_line]}> - {extra[properties_str]} - {message}", + filter=lambda record: record["extra"].get("topic") == self.__logger_name, + ) + + host_name = socket.gethostname() + host_ip = socket.gethostbyname(host_name) + self.logger = guru_logger.bind( + topic=self.__logger_name, + host_ip=host_ip, + host_name=host_name, + app=log_settings.APP_NAME, + env=log_settings.ENVIRONMENT, + ) + with LoggerBase.logger_lock: LoggerBase.binded_loggers[self.__logger_name] = self.logger - async def log_event( - self, - sender_id: str, - receiver_id: str, - subject: str, - event: str, - properties: Dict[str, Any], - text: str = "", - ) -> None: - local_logger = self.logger.bind( - sender_id=sender_id, - receiver_id=receiver_id, - subject=subject, - event=event, - properties=properties, - ) + def _get_log_context(self) -> dict: + frame = inspect.currentframe().f_back.f_back + filename = os.path.basename(frame.f_code.co_filename) + lineno = frame.f_lineno + return {"log_file": filename, "log_line": lineno} + + def _prepare_properties(self, properties: Optional[Dict[str, Any]]) -> Dict[str, Any]: + props = {} if properties is None else properties.copy() + props_str = json.dumps(props, ensure_ascii=False) if props else "{}" + return props, props_str + + async def log_event(self, sender_id: str, receiver_id: str, subject: str, event: str, properties: dict[str, any], text: str = "") -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event=event, properties=props, properties_str=props_str, **context) local_logger.info(text) - async def log_exception( - self, - sender_id: str, - receiver_id: str, - subject: str, - exception: Exception, - text: str = "", - properties: Dict[str, Any] = None, - ) -> None: - local_logger = self.logger.bind( - sender_id=sender_id, - receiver_id=receiver_id, - subject=subject, - event="exception", - properties=properties, - exception=exception, - ) + async def log_exception(self, sender_id: str, receiver_id: str, subject: str, exception: Exception, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="exception", properties=props, properties_str=props_str, exception=exception, **context) local_logger.exception(text) - async def log_info( - self, - sender_id: str, - receiver_id: str, - subject: str, - text: str = "", - properties: Dict[str, Any] = None, - ) -> None: - local_logger = self.logger.bind( - sender_id=sender_id, - receiver_id=receiver_id, - subject=subject, - event="information", - properties=properties, - ) + async def log_info(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="information", properties=props, properties_str=props_str, **context) local_logger.info(text) - async def log_warning( - self, - sender_id: str, - receiver_id: str, - subject: str, - text: str = "", - properties: Dict[str, Any] = None, - ) -> None: - local_logger = self.logger.bind( - sender_id=sender_id, - receiver_id=receiver_id, - subject=subject, - event="warning", - properties=properties, - ) + async def log_warning(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="warning", properties=props, properties_str=props_str, **context) local_logger.warning(text) - async def log_error( - self, - sender_id: str, - receiver_id: str, - subject: str, - text: str = "", - properties: Dict[str, Any] = None, - ) -> None: - local_logger = self.logger.bind( - sender_id=sender_id, - receiver_id=receiver_id, - subject=subject, - event="error", - properties=properties, - ) + async def log_error(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="error", properties=props, properties_str=props_str, **context) local_logger.error(text) + + @staticmethod + def configure_uvicorn_logging(): + print("📢 Setting up uvicorn logging interception...") + + # Intercept logs from these loggers + intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"] + + class InterceptHandler(logging.Handler): + def emit(self, record): + level = ( + guru_logger.level(record.levelname).name + if guru_logger.level(record.levelname, None) + else record.levelno + ) + frame, depth = logging.currentframe(), 2 + while frame.f_code.co_filename == logging.__file__: + frame = frame.f_back + depth += 1 + + guru_logger.opt(depth=depth, exception=record.exc_info).log( + level, + f"[{record.name}] {record.getMessage()}", + ) + + # Replace default handlers + logging.root.handlers.clear() + logging.root.setLevel(logging.INFO) + logging.root.handlers = [InterceptHandler()] + + # Configure specific uvicorn loggers + for logger_name in intercept_loggers: + logging_logger = logging.getLogger(logger_name) + logging_logger.handlers.clear() # Remove default handlers + logging_logger.propagate = True # Ensure propagation through Loguru diff --git a/apps/authentication/common/log/business_metric_logger.py b/apps/authentication/common/log/business_metric_logger.py new file mode 100644 index 0000000..95383ab --- /dev/null +++ b/apps/authentication/common/log/business_metric_logger.py @@ -0,0 +1,25 @@ +from .base_logger import LoggerBase +from common.config.app_settings import app_settings +import json + + +class BusinessMetricLogger(LoggerBase): + def __init__(self, business_metrics: dict[str, any] = {}) -> None: + extra_fileds = {} + if business_metrics: + extra_fileds.update(business_metrics) + super().__init__( + logger_name=app_settings.BUSINESS_METRIC_LOG, + extra_fileds=extra_fileds, + ) + + + async def log_metrics(self, business_metrics: dict[str, any] = {}) -> None: + return await super().log_event( + sender_id="business_metric_manager", + receiver_id="business_metric_logger", + subject="metrics", + event="logging", + properties=business_metrics, + text="business metric logged" + ) diff --git a/apps/authentication/common/log/json_sink.py b/apps/authentication/common/log/json_sink.py new file mode 100644 index 0000000..a798156 --- /dev/null +++ b/apps/authentication/common/log/json_sink.py @@ -0,0 +1,85 @@ +import json +import datetime +import traceback +from pathlib import Path +from typing import Optional + +class JsonSink: + def __init__( + self, + log_file_path: str, + rotation_size_bytes: int = 10 * 1024 * 1024, + max_backup_files: int = 5, + ): + self.log_file_path = Path(log_file_path) + self.rotation_size = rotation_size_bytes + self.max_backup_files = max_backup_files + self._open_log_file() + + def _open_log_file(self): + # ensure the parent directory exists + parent_dir = self.log_file_path.parent + if not parent_dir.exists(): + parent_dir.mkdir(parents=True, exist_ok=True) + self.log_file = self.log_file_path.open("a", encoding="utf-8") + + def _should_rotate(self) -> bool: + return self.log_file_path.exists() and self.log_file_path.stat().st_size >= self.rotation_size + + def _rotate(self): + self.log_file.close() + timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + rotated_path = self.log_file_path.with_name(f"{self.log_file_path.stem}_{timestamp}{self.log_file_path.suffix}") + self.log_file_path.rename(rotated_path) + self._cleanup_old_backups() + self._open_log_file() + + def _cleanup_old_backups(self): + parent = self.log_file_path.parent + stem = self.log_file_path.stem + suffix = self.log_file_path.suffix + + backup_files = sorted( + parent.glob(f"{stem}_*{suffix}"), + key=lambda p: p.stat().st_mtime, + reverse=True, + ) + + for old_file in backup_files[self.max_backup_files:]: + try: + old_file.unlink() + except Exception as e: + print(f"Failed to delete old backup {old_file}: {e}") + + def __call__(self, message): + record = message.record + if self._should_rotate(): + self._rotate() + + log_entry = { + "level": record["level"].name.lower(), + "timestamp": int(record["time"].timestamp() * 1000), + "text": record["message"], + "fields": record["extra"].get("properties", {}), + "context": { + "app": record["extra"].get("app"), + "env": record["extra"].get("env"), + "log_file": record["extra"].get("log_file"), + "log_line": record["extra"].get("log_line"), + "topic": record["extra"].get("topic"), + "sender_id": record["extra"].get("sender_id"), + "receiver_id": record["extra"].get("receiver_id"), + "subject": record["extra"].get("subject"), + "event": record["extra"].get("event"), + "host_ip": record["extra"].get("host_ip"), + "host_name": record["extra"].get("host_name"), + }, + "stacktrace": None + } + + if record["exception"]: + exc_type, exc_value, exc_tb = record["exception"] + log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb) + + self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n") + self.log_file.flush() diff --git a/apps/authentication/common/log/user_logger.py b/apps/authentication/common/log/user_logger.py new file mode 100644 index 0000000..d931975 --- /dev/null +++ b/apps/authentication/common/log/user_logger.py @@ -0,0 +1,14 @@ +from .base_logger import LoggerBase +from common.config.app_settings import app_settings + +import json + + +class UserLogger(LoggerBase): + def __init__(self, user_activities: dict[str, any] = {}) -> None: + extra_fileds = {} + if user_activities: + extra_fileds.update(user_activities) + super().__init__( + logger_name=app_settings.USER_ACTIVITY_LOG, extra_fileds=extra_fileds + ) diff --git a/apps/authentication/webapi/providers/logger.py b/apps/authentication/webapi/providers/logger.py index afcab10..6eb1e22 100644 --- a/apps/authentication/webapi/providers/logger.py +++ b/apps/authentication/webapi/providers/logger.py @@ -1,47 +1,8 @@ -import logging -import sys from loguru import logger as guru_logger +from common.log.base_logger import LoggerBase def register_logger(): print("📢 Setting up logging interception...") - - # 🔴 **Ensure Uvicorn Logs Are Captured** - intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"] - - class InterceptHandler(logging.Handler): - def emit(self, record): - level = ( - guru_logger.level(record.levelname).name - if guru_logger.level(record.levelname, None) - else record.levelno - ) - frame, depth = logging.currentframe(), 2 - while frame.f_code.co_filename == logging.__file__: - frame = frame.f_back - depth += 1 - - guru_logger.opt(depth=depth, exception=record.exc_info).log( - level, - f"[{record.name}] {record.getMessage()}", - ) - - # 🔴 **Replace Existing Loggers with Interception** - logging.root.handlers.clear() - logging.root.setLevel(logging.INFO) - logging.root.handlers = [InterceptHandler()] - - for logger_name in intercept_loggers: - logging_logger = logging.getLogger(logger_name) - logging_logger.handlers.clear() # Remove Uvicorn default handlers - logging_logger.propagate = True # ✅ Ensure they propagate through Loguru - - # 🔴 **Redirect stdout/stderr to Loguru (Keep Green Timestamps)** - guru_logger.remove() - guru_logger.add( - sys.stdout, - level="INFO", - format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {message}", - ) - + LoggerBase.configure_uvicorn_logging() print("✅ Logging interception complete. Logs are formatted and deduplicated!") diff --git a/apps/central_storage/common/config/app_settings.py b/apps/central_storage/common/config/app_settings.py index d4639f0..9b5d516 100644 --- a/apps/central_storage/common/config/app_settings.py +++ b/apps/central_storage/common/config/app_settings.py @@ -4,7 +4,8 @@ from pydantic_settings import BaseSettings class AppSettings(BaseSettings): NAME: str = "central_storage" - APP_NAME:str = NAME + APP_NAME: str = NAME + APP_ENV: str = "alpha" METRICS_ENABLED: bool = False PROBES_ENABLED: bool = True diff --git a/apps/central_storage/common/config/log_settings.py b/apps/central_storage/common/config/log_settings.py index 48b97a4..7995968 100644 --- a/apps/central_storage/common/config/log_settings.py +++ b/apps/central_storage/common/config/log_settings.py @@ -1,17 +1,17 @@ - -from pydantic_settings import BaseSettings +import os +from dataclasses import dataclass from .app_settings import app_settings -class LogSettings(BaseSettings): - LOG_LEVEL: str = "DEBUG" - LOG_BASE_PATH: str = app_settings.LOG_BASE_PATH - LOG_PATH: str = LOG_BASE_PATH + '/' + app_settings.BACKEND_LOG_FILE_NAME + '.log' - LOG_RETENTION: str = "14 days" - LOG_ROTATION: str = "00:00" # mid night - class Config: - env_file = ".log.env" - env_file_encoding = "utf-8" +@dataclass +class LogSettings: + LOG_PATH_BASE: str = app_settings.LOG_BASE_PATH + LOG_RETENTION: str = os.environ.get("LOG_RETENTION", "30 days") + LOG_ROTATION: str = os.environ.get("LOG_ROTATION", "00:00") # midnight + MAX_BACKUP_FILES: int = int(os.environ.get("LOG_BACKUP_FILES", 5)) + LOG_ROTATION_BYTES: int = int(os.environ.get("LOG_ROTATION_BYTES", 10 * 1024 * 1024)) # 10 MB + APP_NAME: str = app_settings.APP_NAME + ENVIRONMENT: str = app_settings.APP_ENV log_settings = LogSettings() diff --git a/apps/central_storage/common/log/__init__.py b/apps/central_storage/common/log/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/central_storage/common/log/application_logger.py b/apps/central_storage/common/log/application_logger.py new file mode 100644 index 0000000..67ec321 --- /dev/null +++ b/apps/central_storage/common/log/application_logger.py @@ -0,0 +1,12 @@ +from .base_logger import LoggerBase +from common.config.app_settings import app_settings + +class ApplicationLogger(LoggerBase): + def __init__(self, application_activities: dict[str, any] = {}) -> None: + extra_fileds = {} + if application_activities: + extra_fileds.update(application_activities) + super().__init__( + logger_name=app_settings.APPLICATION_ACTIVITY_LOG, + extra_fileds=extra_fileds, + ) diff --git a/apps/central_storage/common/log/base_logger.py b/apps/central_storage/common/log/base_logger.py new file mode 100644 index 0000000..24f7bb0 --- /dev/null +++ b/apps/central_storage/common/log/base_logger.py @@ -0,0 +1,136 @@ +from loguru import logger as guru_logger +from common.config.log_settings import log_settings +from typing import Dict, Any, Optional +import socket +import json +import threading +import os +import sys +import inspect +import logging + +from common.log.json_sink import JsonSink + +class LoggerBase: + binded_loggers = {} + logger_lock = threading.Lock() + + def __init__(self, logger_name: str, extra_fileds: dict[str, any]) -> None: + self.__logger_name = logger_name + self.extra_fileds = extra_fileds + with LoggerBase.logger_lock: + if self.__logger_name in LoggerBase.binded_loggers: + self.logger = LoggerBase.binded_loggers[self.__logger_name] + return + + log_filename = f"{log_settings.LOG_PATH_BASE}/{self.__logger_name}.log" + log_level = "INFO" + rotation_bytes = int(log_settings.LOG_ROTATION_BYTES or 10 * 1024 * 1024) + + guru_logger.remove() + + file_sink = JsonSink( + log_file_path=log_filename, + rotation_size_bytes=rotation_bytes, + max_backup_files=log_settings.MAX_BACKUP_FILES + ) + guru_logger.add( + sink=file_sink, + level=log_level, + filter=lambda record: record["extra"].get("topic") == self.__logger_name, + ) + + guru_logger.add( + sink=sys.stderr, + level=log_level, + format="{level} - {time:YYYY-MM-DD HH:mm:ss} - <{extra[log_file]}:{extra[log_line]}> - {extra[properties_str]} - {message}", + filter=lambda record: record["extra"].get("topic") == self.__logger_name, + ) + + host_name = socket.gethostname() + host_ip = socket.gethostbyname(host_name) + self.logger = guru_logger.bind( + topic=self.__logger_name, + host_ip=host_ip, + host_name=host_name, + app=log_settings.APP_NAME, + env=log_settings.ENVIRONMENT, + ) + with LoggerBase.logger_lock: + LoggerBase.binded_loggers[self.__logger_name] = self.logger + + def _get_log_context(self) -> dict: + frame = inspect.currentframe().f_back.f_back + filename = os.path.basename(frame.f_code.co_filename) + lineno = frame.f_lineno + return {"log_file": filename, "log_line": lineno} + + def _prepare_properties(self, properties: Optional[Dict[str, Any]]) -> Dict[str, Any]: + props = {} if properties is None else properties.copy() + props_str = json.dumps(props, ensure_ascii=False) if props else "{}" + return props, props_str + + async def log_event(self, sender_id: str, receiver_id: str, subject: str, event: str, properties: dict[str, any], text: str = "") -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event=event, properties=props, properties_str=props_str, **context) + local_logger.info(text) + + async def log_exception(self, sender_id: str, receiver_id: str, subject: str, exception: Exception, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="exception", properties=props, properties_str=props_str, exception=exception, **context) + local_logger.exception(text) + + async def log_info(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="information", properties=props, properties_str=props_str, **context) + local_logger.info(text) + + async def log_warning(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="warning", properties=props, properties_str=props_str, **context) + local_logger.warning(text) + + async def log_error(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="error", properties=props, properties_str=props_str, **context) + local_logger.error(text) + + @staticmethod + def configure_uvicorn_logging(): + print("📢 Setting up uvicorn logging interception...") + + # Intercept logs from these loggers + intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"] + + class InterceptHandler(logging.Handler): + def emit(self, record): + level = ( + guru_logger.level(record.levelname).name + if guru_logger.level(record.levelname, None) + else record.levelno + ) + frame, depth = logging.currentframe(), 2 + while frame.f_code.co_filename == logging.__file__: + frame = frame.f_back + depth += 1 + + guru_logger.opt(depth=depth, exception=record.exc_info).log( + level, + f"[{record.name}] {record.getMessage()}", + ) + + # Replace default handlers + logging.root.handlers.clear() + logging.root.setLevel(logging.INFO) + logging.root.handlers = [InterceptHandler()] + + # Configure specific uvicorn loggers + for logger_name in intercept_loggers: + logging_logger = logging.getLogger(logger_name) + logging_logger.handlers.clear() # Remove default handlers + logging_logger.propagate = True # Ensure propagation through Loguru diff --git a/apps/central_storage/common/log/business_metric_logger.py b/apps/central_storage/common/log/business_metric_logger.py new file mode 100644 index 0000000..95383ab --- /dev/null +++ b/apps/central_storage/common/log/business_metric_logger.py @@ -0,0 +1,25 @@ +from .base_logger import LoggerBase +from common.config.app_settings import app_settings +import json + + +class BusinessMetricLogger(LoggerBase): + def __init__(self, business_metrics: dict[str, any] = {}) -> None: + extra_fileds = {} + if business_metrics: + extra_fileds.update(business_metrics) + super().__init__( + logger_name=app_settings.BUSINESS_METRIC_LOG, + extra_fileds=extra_fileds, + ) + + + async def log_metrics(self, business_metrics: dict[str, any] = {}) -> None: + return await super().log_event( + sender_id="business_metric_manager", + receiver_id="business_metric_logger", + subject="metrics", + event="logging", + properties=business_metrics, + text="business metric logged" + ) diff --git a/apps/central_storage/common/log/function_logger.py b/apps/central_storage/common/log/function_logger.py new file mode 100644 index 0000000..4388a5e --- /dev/null +++ b/apps/central_storage/common/log/function_logger.py @@ -0,0 +1,50 @@ +from .application_logger import ApplicationLogger + + +class FunctionLogger(ApplicationLogger): + def __init__(self, sender_id: str, receiver_id:str) -> None: + super().__init__() + self.event_sender_id = sender_id + self.event_receiver_id = receiver_id + self.event_subject = "function" + + async def log_enter(self, function: str, file: str): + return await super().log_event( + sender_id=self.event_sender_id, + receiver_id=self.event_receiver_id, + subject=self.event_subject, + event="enter", + properties={ + "function": function, + "file": file, + }, + text="Enter:{} of {}".format(function, file) + ) + + async def log_exit(self, function: str, file: str, excution_time_in_ns: int): + return await super().log_event( + sender_id=self.event_sender_id, + receiver_id=self.event_receiver_id, + subject=self.event_subject, + event="exit", + properties={ + "function": function, + "file": file, + "excution_time_in_ns": excution_time_in_ns + }, + text="Exit:{} of {}".format(function, file) + ) + + async def log_exception(self, exception: Exception, function: str, file: str, excution_time_in_ns: int) -> None: + return await super().log_exception( + sender_id=self.event_sender_id, + receiver_id=self.event_receiver_id, + subject=self.event_subject, + exception=exception, + text="Exception:{} of {}".format(function, file), + properties={ + "function": function, + "file": file, + "excution_time_in_ns": excution_time_in_ns + }, + ) diff --git a/apps/central_storage/common/log/json_sink.py b/apps/central_storage/common/log/json_sink.py new file mode 100644 index 0000000..a798156 --- /dev/null +++ b/apps/central_storage/common/log/json_sink.py @@ -0,0 +1,85 @@ +import json +import datetime +import traceback +from pathlib import Path +from typing import Optional + +class JsonSink: + def __init__( + self, + log_file_path: str, + rotation_size_bytes: int = 10 * 1024 * 1024, + max_backup_files: int = 5, + ): + self.log_file_path = Path(log_file_path) + self.rotation_size = rotation_size_bytes + self.max_backup_files = max_backup_files + self._open_log_file() + + def _open_log_file(self): + # ensure the parent directory exists + parent_dir = self.log_file_path.parent + if not parent_dir.exists(): + parent_dir.mkdir(parents=True, exist_ok=True) + self.log_file = self.log_file_path.open("a", encoding="utf-8") + + def _should_rotate(self) -> bool: + return self.log_file_path.exists() and self.log_file_path.stat().st_size >= self.rotation_size + + def _rotate(self): + self.log_file.close() + timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + rotated_path = self.log_file_path.with_name(f"{self.log_file_path.stem}_{timestamp}{self.log_file_path.suffix}") + self.log_file_path.rename(rotated_path) + self._cleanup_old_backups() + self._open_log_file() + + def _cleanup_old_backups(self): + parent = self.log_file_path.parent + stem = self.log_file_path.stem + suffix = self.log_file_path.suffix + + backup_files = sorted( + parent.glob(f"{stem}_*{suffix}"), + key=lambda p: p.stat().st_mtime, + reverse=True, + ) + + for old_file in backup_files[self.max_backup_files:]: + try: + old_file.unlink() + except Exception as e: + print(f"Failed to delete old backup {old_file}: {e}") + + def __call__(self, message): + record = message.record + if self._should_rotate(): + self._rotate() + + log_entry = { + "level": record["level"].name.lower(), + "timestamp": int(record["time"].timestamp() * 1000), + "text": record["message"], + "fields": record["extra"].get("properties", {}), + "context": { + "app": record["extra"].get("app"), + "env": record["extra"].get("env"), + "log_file": record["extra"].get("log_file"), + "log_line": record["extra"].get("log_line"), + "topic": record["extra"].get("topic"), + "sender_id": record["extra"].get("sender_id"), + "receiver_id": record["extra"].get("receiver_id"), + "subject": record["extra"].get("subject"), + "event": record["extra"].get("event"), + "host_ip": record["extra"].get("host_ip"), + "host_name": record["extra"].get("host_name"), + }, + "stacktrace": None + } + + if record["exception"]: + exc_type, exc_value, exc_tb = record["exception"] + log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb) + + self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n") + self.log_file.flush() diff --git a/apps/central_storage/common/log/log_utils.py b/apps/central_storage/common/log/log_utils.py new file mode 100644 index 0000000..579dee8 --- /dev/null +++ b/apps/central_storage/common/log/log_utils.py @@ -0,0 +1,25 @@ +import os +from .function_logger import FunctionLogger +import time +import functools + + +def log_entry_exit_async(func): + @functools.wraps(func) + async def wrapper(*args, **kwargs): + file_path = os.path.relpath(func.__code__.co_filename) + function_logger = FunctionLogger(sender_id="log_entry_exit_async", receiver_id="function_logger") + start_time = time.process_time_ns() + try: + await function_logger.log_enter(func.__name__, file_path) + result = await func(*args, **kwargs) + await function_logger.log_exit(func.__name__, file_path, time.process_time_ns() - start_time) + return result + except Exception as exception: + await function_logger.log_exception( + exception=exception, + function=func.__name__, + file=file_path, + excution_time_in_ns=time.process_time_ns() - start_time) + raise + return wrapper diff --git a/apps/central_storage/common/log/module_logger.py b/apps/central_storage/common/log/module_logger.py new file mode 100644 index 0000000..3426b0b --- /dev/null +++ b/apps/central_storage/common/log/module_logger.py @@ -0,0 +1,46 @@ +from .application_logger import ApplicationLogger + + +class ModuleLogger(ApplicationLogger): + def __init__(self, sender_id: str) -> None: + super().__init__() + self.event_sender_id = sender_id + self.event_receiver_id = "ModuleLogger" + self.event_subject = "module" + + async def log_exception(self, exception: Exception, text: str = "Exception", properties: dict[str, any] = None) -> None: + return await super().log_exception( + sender_id=self.event_sender_id, + receiver_id=self.event_receiver_id, + subject=self.event_subject, + exception=exception, + text=text, + properties=properties, + ) + + async def log_info(self, info: str, properties: dict[str, any] = None) -> None: + return await super().log_info( + sender_id=self.event_sender_id, + receiver_id=self.event_receiver_id, + subject=self.event_subject, + text=info, + properties=properties, + ) + + async def log_warning(self, warning: str, properties: dict[str, any] = None) -> None: + return await super().log_warning( + sender_id=self.event_sender_id, + receiver_id=self.event_receiver_id, + subject=self.event_subject, + text=warning, + properties=properties, + ) + + async def log_error(self, error: str, properties: dict[str, any] = None) -> None: + return await super().log_error( + sender_id=self.event_sender_id, + receiver_id=self.event_receiver_id, + subject=self.event_subject, + text=error, + properties=properties, + ) diff --git a/apps/central_storage/common/log/user_logger.py b/apps/central_storage/common/log/user_logger.py new file mode 100644 index 0000000..d931975 --- /dev/null +++ b/apps/central_storage/common/log/user_logger.py @@ -0,0 +1,14 @@ +from .base_logger import LoggerBase +from common.config.app_settings import app_settings + +import json + + +class UserLogger(LoggerBase): + def __init__(self, user_activities: dict[str, any] = {}) -> None: + extra_fileds = {} + if user_activities: + extra_fileds.update(user_activities) + super().__init__( + logger_name=app_settings.USER_ACTIVITY_LOG, extra_fileds=extra_fileds + ) diff --git a/apps/central_storage/webapi/providers/logger.py b/apps/central_storage/webapi/providers/logger.py index afcab10..6eb1e22 100644 --- a/apps/central_storage/webapi/providers/logger.py +++ b/apps/central_storage/webapi/providers/logger.py @@ -1,47 +1,8 @@ -import logging -import sys from loguru import logger as guru_logger +from common.log.base_logger import LoggerBase def register_logger(): print("📢 Setting up logging interception...") - - # 🔴 **Ensure Uvicorn Logs Are Captured** - intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"] - - class InterceptHandler(logging.Handler): - def emit(self, record): - level = ( - guru_logger.level(record.levelname).name - if guru_logger.level(record.levelname, None) - else record.levelno - ) - frame, depth = logging.currentframe(), 2 - while frame.f_code.co_filename == logging.__file__: - frame = frame.f_back - depth += 1 - - guru_logger.opt(depth=depth, exception=record.exc_info).log( - level, - f"[{record.name}] {record.getMessage()}", - ) - - # 🔴 **Replace Existing Loggers with Interception** - logging.root.handlers.clear() - logging.root.setLevel(logging.INFO) - logging.root.handlers = [InterceptHandler()] - - for logger_name in intercept_loggers: - logging_logger = logging.getLogger(logger_name) - logging_logger.handlers.clear() # Remove Uvicorn default handlers - logging_logger.propagate = True # ✅ Ensure they propagate through Loguru - - # 🔴 **Redirect stdout/stderr to Loguru (Keep Green Timestamps)** - guru_logger.remove() - guru_logger.add( - sys.stdout, - level="INFO", - format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {message}", - ) - + LoggerBase.configure_uvicorn_logging() print("✅ Logging interception complete. Logs are formatted and deduplicated!") diff --git a/apps/content/backend/document/document_manager.py b/apps/content/backend/document/document_manager.py index 1d62ebc..8db5f40 100644 --- a/apps/content/backend/document/document_manager.py +++ b/apps/content/backend/document/document_manager.py @@ -1,6 +1,7 @@ from common.config.app_settings import app_settings from backend.content.models import DocumentDoc from backend.document.models import BasicProfileDoc +from datetime import datetime, timezone import httpx import base64 @@ -53,9 +54,12 @@ class DocumentManager: print(f"Failed to queue deletion: {response.text}") async def cleanup_document(self): + # Get today's date at midnight (UTC) + today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) # Corrected query with regex documents = await DocumentDoc.find( - {"created_by": {"$regex": "^content-service-"}} + {"created_by": {"$regex": "^content-service-"}, + "create_time": {"$lt": today_start}} ).to_list() if documents: diff --git a/apps/content/common/config/app_settings.py b/apps/content/common/config/app_settings.py index aa5f826..c1efe3c 100644 --- a/apps/content/common/config/app_settings.py +++ b/apps/content/common/config/app_settings.py @@ -3,7 +3,8 @@ from pydantic_settings import BaseSettings class AppSettings(BaseSettings): NAME: str = "content" - APP_NAME:str = NAME + APP_NAME: str = NAME + APP_ENV: str = "alpha" METRICS_ENABLED: bool = False PROBES_ENABLED: bool = True diff --git a/apps/content/common/config/log_settings.py b/apps/content/common/config/log_settings.py index 717c1a5..2f6985c 100644 --- a/apps/content/common/config/log_settings.py +++ b/apps/content/common/config/log_settings.py @@ -1,17 +1,16 @@ -from pydantic_settings import BaseSettings +import os +from dataclasses import dataclass from .app_settings import app_settings - -class LogSettings(BaseSettings): - LOG_LEVEL: str = "DEBUG" - LOG_BASE_PATH: str = app_settings.LOG_BASE_PATH - LOG_PATH: str = LOG_BASE_PATH + "/" + app_settings.BACKEND_LOG_FILE_NAME + ".log" - LOG_RETENTION: str = "14 days" - LOG_ROTATION: str = "00:00" # mid night - - class Config: - env_file = ".log.env" - env_file_encoding = "utf-8" +@dataclass +class LogSettings: + LOG_PATH_BASE: str = app_settings.LOG_BASE_PATH + LOG_RETENTION: str = os.environ.get("LOG_RETENTION", "30 days") + LOG_ROTATION: str = os.environ.get("LOG_ROTATION", "00:00") # midnight + MAX_BACKUP_FILES: int = int(os.environ.get("LOG_BACKUP_FILES", 5)) + LOG_ROTATION_BYTES: int = int(os.environ.get("LOG_ROTATION_BYTES", 10 * 1024 * 1024)) # 10 MB + APP_NAME: str = app_settings.APP_NAME + ENVIRONMENT: str = app_settings.APP_ENV log_settings = LogSettings() diff --git a/apps/content/common/log/application_logger.py b/apps/content/common/log/application_logger.py index 51d2cba..67ec321 100644 --- a/apps/content/common/log/application_logger.py +++ b/apps/content/common/log/application_logger.py @@ -1,14 +1,12 @@ from .base_logger import LoggerBase from common.config.app_settings import app_settings -import json - class ApplicationLogger(LoggerBase): def __init__(self, application_activities: dict[str, any] = {}) -> None: - extra_fields = {} + extra_fileds = {} if application_activities: - extra_fields.update(application_activities) + extra_fileds.update(application_activities) super().__init__( logger_name=app_settings.APPLICATION_ACTIVITY_LOG, - extra_fields=extra_fields, + extra_fileds=extra_fileds, ) diff --git a/apps/content/common/log/base_logger.py b/apps/content/common/log/base_logger.py index 49e773d..24f7bb0 100644 --- a/apps/content/common/log/base_logger.py +++ b/apps/content/common/log/base_logger.py @@ -1,140 +1,136 @@ from loguru import logger as guru_logger from common.config.log_settings import log_settings -from typing import Dict, Any +from typing import Dict, Any, Optional import socket +import json import threading +import os +import sys +import inspect +import logging +from common.log.json_sink import JsonSink class LoggerBase: - binded_loggers = {} # Stores logger instances - loguru_sinks_added = set() # Tracks added log sinks + binded_loggers = {} logger_lock = threading.Lock() - def __init__(self, logger_name: str, extra_fields: Dict[str, Any] = None) -> None: + def __init__(self, logger_name: str, extra_fileds: dict[str, any]) -> None: self.__logger_name = logger_name - self.extra_fields = extra_fields or {} - + self.extra_fileds = extra_fileds with LoggerBase.logger_lock: - # ✅ **If already created, reuse it** to prevent duplicates if self.__logger_name in LoggerBase.binded_loggers: self.logger = LoggerBase.binded_loggers[self.__logger_name] return - log_filename = f"{log_settings.LOG_BASE_PATH}/{self.__logger_name}.log" + log_filename = f"{log_settings.LOG_PATH_BASE}/{self.__logger_name}.log" + log_level = "INFO" + rotation_bytes = int(log_settings.LOG_ROTATION_BYTES or 10 * 1024 * 1024) - # ✅ **Ensure Loguru sink is added only once** - if log_filename not in LoggerBase.loguru_sinks_added: - guru_logger.add( - sink=log_filename, - level="INFO", - retention=log_settings.LOG_RETENTION, - rotation=log_settings.LOG_ROTATION, - format="{message}", - serialize=True, - filter=lambda record: "extra" in record - and "topic" in record["extra"] - and record["extra"]["topic"] == self.__logger_name, - ) - LoggerBase.loguru_sinks_added.add(log_filename) # ✅ Mark as added + guru_logger.remove() - host_name = socket.gethostname() - host_ip = socket.gethostbyname(host_name) - - # ✅ Bind the logger with topic and extra fields - self.logger = guru_logger.bind( - topic=self.__logger_name, - host_ip=host_ip, - host_name=host_name, - **self.extra_fields, # Include additional metadata + file_sink = JsonSink( + log_file_path=log_filename, + rotation_size_bytes=rotation_bytes, + max_backup_files=log_settings.MAX_BACKUP_FILES ) + guru_logger.add( + sink=file_sink, + level=log_level, + filter=lambda record: record["extra"].get("topic") == self.__logger_name, + ) - # ✅ Store reference to prevent duplicate instances + guru_logger.add( + sink=sys.stderr, + level=log_level, + format="{level} - {time:YYYY-MM-DD HH:mm:ss} - <{extra[log_file]}:{extra[log_line]}> - {extra[properties_str]} - {message}", + filter=lambda record: record["extra"].get("topic") == self.__logger_name, + ) + + host_name = socket.gethostname() + host_ip = socket.gethostbyname(host_name) + self.logger = guru_logger.bind( + topic=self.__logger_name, + host_ip=host_ip, + host_name=host_name, + app=log_settings.APP_NAME, + env=log_settings.ENVIRONMENT, + ) + with LoggerBase.logger_lock: LoggerBase.binded_loggers[self.__logger_name] = self.logger - async def log_event( - self, - sender_id: str, - receiver_id: str, - subject: str, - event: str, - properties: Dict[str, Any], - text: str = "", - ) -> None: - local_logger = self.logger.bind( - sender_id=sender_id, - receiver_id=receiver_id, - subject=subject, - event=event, - properties=properties, - ) + def _get_log_context(self) -> dict: + frame = inspect.currentframe().f_back.f_back + filename = os.path.basename(frame.f_code.co_filename) + lineno = frame.f_lineno + return {"log_file": filename, "log_line": lineno} + + def _prepare_properties(self, properties: Optional[Dict[str, Any]]) -> Dict[str, Any]: + props = {} if properties is None else properties.copy() + props_str = json.dumps(props, ensure_ascii=False) if props else "{}" + return props, props_str + + async def log_event(self, sender_id: str, receiver_id: str, subject: str, event: str, properties: dict[str, any], text: str = "") -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event=event, properties=props, properties_str=props_str, **context) local_logger.info(text) - async def log_exception( - self, - sender_id: str, - receiver_id: str, - subject: str, - exception: Exception, - text: str = "", - properties: Dict[str, Any] = None, - ) -> None: - local_logger = self.logger.bind( - sender_id=sender_id, - receiver_id=receiver_id, - subject=subject, - event="exception", - properties=properties, - exception=exception, - ) + async def log_exception(self, sender_id: str, receiver_id: str, subject: str, exception: Exception, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="exception", properties=props, properties_str=props_str, exception=exception, **context) local_logger.exception(text) - async def log_info( - self, - sender_id: str, - receiver_id: str, - subject: str, - text: str = "", - properties: Dict[str, Any] = None, - ) -> None: - local_logger = self.logger.bind( - sender_id=sender_id, - receiver_id=receiver_id, - subject=subject, - event="information", - properties=properties, - ) + async def log_info(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="information", properties=props, properties_str=props_str, **context) local_logger.info(text) - async def log_warning( - self, - sender_id: str, - receiver_id: str, - subject: str, - text: str = "", - properties: Dict[str, Any] = None, - ) -> None: - local_logger = self.logger.bind( - sender_id=sender_id, - receiver_id=receiver_id, - subject=subject, - event="warning", - properties=properties, - ) + async def log_warning(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="warning", properties=props, properties_str=props_str, **context) local_logger.warning(text) - async def log_error( - self, - sender_id: str, - receiver_id: str, - subject: str, - text: str = "", - properties: Dict[str, Any] = None, - ) -> None: - local_logger = self.logger.bind( - sender_id=sender_id, - receiver_id=receiver_id, - subject=subject, - event="error", - properties=properties, - ) + async def log_error(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="error", properties=props, properties_str=props_str, **context) local_logger.error(text) + + @staticmethod + def configure_uvicorn_logging(): + print("📢 Setting up uvicorn logging interception...") + + # Intercept logs from these loggers + intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"] + + class InterceptHandler(logging.Handler): + def emit(self, record): + level = ( + guru_logger.level(record.levelname).name + if guru_logger.level(record.levelname, None) + else record.levelno + ) + frame, depth = logging.currentframe(), 2 + while frame.f_code.co_filename == logging.__file__: + frame = frame.f_back + depth += 1 + + guru_logger.opt(depth=depth, exception=record.exc_info).log( + level, + f"[{record.name}] {record.getMessage()}", + ) + + # Replace default handlers + logging.root.handlers.clear() + logging.root.setLevel(logging.INFO) + logging.root.handlers = [InterceptHandler()] + + # Configure specific uvicorn loggers + for logger_name in intercept_loggers: + logging_logger = logging.getLogger(logger_name) + logging_logger.handlers.clear() # Remove default handlers + logging_logger.propagate = True # Ensure propagation through Loguru diff --git a/apps/content/common/log/business_metric_logger.py b/apps/content/common/log/business_metric_logger.py index 734e49e..95383ab 100644 --- a/apps/content/common/log/business_metric_logger.py +++ b/apps/content/common/log/business_metric_logger.py @@ -5,14 +5,15 @@ import json class BusinessMetricLogger(LoggerBase): def __init__(self, business_metrics: dict[str, any] = {}) -> None: - extra_fields = {} + extra_fileds = {} if business_metrics: - extra_fields.update(business_metrics) + extra_fileds.update(business_metrics) super().__init__( logger_name=app_settings.BUSINESS_METRIC_LOG, - extra_fields=extra_fields, + extra_fileds=extra_fileds, ) + async def log_metrics(self, business_metrics: dict[str, any] = {}) -> None: return await super().log_event( sender_id="business_metric_manager", @@ -20,5 +21,5 @@ class BusinessMetricLogger(LoggerBase): subject="metrics", event="logging", properties=business_metrics, - text="business metric logged", + text="business metric logged" ) diff --git a/apps/content/common/log/json_sink.py b/apps/content/common/log/json_sink.py new file mode 100644 index 0000000..a798156 --- /dev/null +++ b/apps/content/common/log/json_sink.py @@ -0,0 +1,85 @@ +import json +import datetime +import traceback +from pathlib import Path +from typing import Optional + +class JsonSink: + def __init__( + self, + log_file_path: str, + rotation_size_bytes: int = 10 * 1024 * 1024, + max_backup_files: int = 5, + ): + self.log_file_path = Path(log_file_path) + self.rotation_size = rotation_size_bytes + self.max_backup_files = max_backup_files + self._open_log_file() + + def _open_log_file(self): + # ensure the parent directory exists + parent_dir = self.log_file_path.parent + if not parent_dir.exists(): + parent_dir.mkdir(parents=True, exist_ok=True) + self.log_file = self.log_file_path.open("a", encoding="utf-8") + + def _should_rotate(self) -> bool: + return self.log_file_path.exists() and self.log_file_path.stat().st_size >= self.rotation_size + + def _rotate(self): + self.log_file.close() + timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + rotated_path = self.log_file_path.with_name(f"{self.log_file_path.stem}_{timestamp}{self.log_file_path.suffix}") + self.log_file_path.rename(rotated_path) + self._cleanup_old_backups() + self._open_log_file() + + def _cleanup_old_backups(self): + parent = self.log_file_path.parent + stem = self.log_file_path.stem + suffix = self.log_file_path.suffix + + backup_files = sorted( + parent.glob(f"{stem}_*{suffix}"), + key=lambda p: p.stat().st_mtime, + reverse=True, + ) + + for old_file in backup_files[self.max_backup_files:]: + try: + old_file.unlink() + except Exception as e: + print(f"Failed to delete old backup {old_file}: {e}") + + def __call__(self, message): + record = message.record + if self._should_rotate(): + self._rotate() + + log_entry = { + "level": record["level"].name.lower(), + "timestamp": int(record["time"].timestamp() * 1000), + "text": record["message"], + "fields": record["extra"].get("properties", {}), + "context": { + "app": record["extra"].get("app"), + "env": record["extra"].get("env"), + "log_file": record["extra"].get("log_file"), + "log_line": record["extra"].get("log_line"), + "topic": record["extra"].get("topic"), + "sender_id": record["extra"].get("sender_id"), + "receiver_id": record["extra"].get("receiver_id"), + "subject": record["extra"].get("subject"), + "event": record["extra"].get("event"), + "host_ip": record["extra"].get("host_ip"), + "host_name": record["extra"].get("host_name"), + }, + "stacktrace": None + } + + if record["exception"]: + exc_type, exc_value, exc_tb = record["exception"] + log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb) + + self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n") + self.log_file.flush() diff --git a/apps/content/common/log/user_logger.py b/apps/content/common/log/user_logger.py index c866c87..d931975 100644 --- a/apps/content/common/log/user_logger.py +++ b/apps/content/common/log/user_logger.py @@ -6,9 +6,9 @@ import json class UserLogger(LoggerBase): def __init__(self, user_activities: dict[str, any] = {}) -> None: - extra_fields = {} + extra_fileds = {} if user_activities: - extra_fields.update(user_activities) + extra_fileds.update(user_activities) super().__init__( - logger_name=app_settings.USER_ACTIVITY_LOG, extra_fields=extra_fields + logger_name=app_settings.USER_ACTIVITY_LOG, extra_fileds=extra_fileds ) diff --git a/apps/content/scheduler/scheduler_manager.py b/apps/content/scheduler/scheduler_manager.py index d35fe86..fe1bb79 100755 --- a/apps/content/scheduler/scheduler_manager.py +++ b/apps/content/scheduler/scheduler_manager.py @@ -1,5 +1,6 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.date import DateTrigger +from apscheduler.triggers.cron import CronTrigger from datetime import datetime, timedelta from scheduler.refresh_sharepoint_content_job import ( refresh_sharepoint_content_job, @@ -28,15 +29,11 @@ async def register_job(scheduler: AsyncIOScheduler): "date", run_date=datetime(2025, 2, 7, 20, 0, 0), ) - # Register cleanup_document_job as a one-time job - # This job is just one-time job for removing many unused documents - # Run already, now comment it out - # await init_lock(ScheduleJobLocker.CLEANUP_DOCUMENT_JOB_LOCKER) - # execution_time = datetime.now() + timedelta( - # seconds=60 - # ) # Schedule to run 60 seconds from now - # scheduler.add_job( - # cleanup_document_job, # Job function - # trigger=DateTrigger(run_date=execution_time), # One-time trigger - # id="cleanup_document_one_time", # Optional: Give the job an ID - # ) + + await init_lock(ScheduleJobLocker.CLEANUP_DOCUMENT_JOB_LOCKER) + scheduler.add_job( + cleanup_document_job, + trigger=CronTrigger(hour=2, minute=0), # Runs every day at 2:00 AM + id="cleanup_document_daily", + ) + diff --git a/apps/content/webapi/providers/logger.py b/apps/content/webapi/providers/logger.py index afcab10..6eb1e22 100644 --- a/apps/content/webapi/providers/logger.py +++ b/apps/content/webapi/providers/logger.py @@ -1,47 +1,8 @@ -import logging -import sys from loguru import logger as guru_logger +from common.log.base_logger import LoggerBase def register_logger(): print("📢 Setting up logging interception...") - - # 🔴 **Ensure Uvicorn Logs Are Captured** - intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"] - - class InterceptHandler(logging.Handler): - def emit(self, record): - level = ( - guru_logger.level(record.levelname).name - if guru_logger.level(record.levelname, None) - else record.levelno - ) - frame, depth = logging.currentframe(), 2 - while frame.f_code.co_filename == logging.__file__: - frame = frame.f_back - depth += 1 - - guru_logger.opt(depth=depth, exception=record.exc_info).log( - level, - f"[{record.name}] {record.getMessage()}", - ) - - # 🔴 **Replace Existing Loggers with Interception** - logging.root.handlers.clear() - logging.root.setLevel(logging.INFO) - logging.root.handlers = [InterceptHandler()] - - for logger_name in intercept_loggers: - logging_logger = logging.getLogger(logger_name) - logging_logger.handlers.clear() # Remove Uvicorn default handlers - logging_logger.propagate = True # ✅ Ensure they propagate through Loguru - - # 🔴 **Redirect stdout/stderr to Loguru (Keep Green Timestamps)** - guru_logger.remove() - guru_logger.add( - sys.stdout, - level="INFO", - format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {message}", - ) - + LoggerBase.configure_uvicorn_logging() print("✅ Logging interception complete. Logs are formatted and deduplicated!") diff --git a/apps/notification/common/config/app_settings.py b/apps/notification/common/config/app_settings.py index c711676..4d634c7 100644 --- a/apps/notification/common/config/app_settings.py +++ b/apps/notification/common/config/app_settings.py @@ -5,6 +5,7 @@ from pydantic_settings import BaseSettings class AppSettings(BaseSettings): NAME: str = "notification" APP_NAME:str = NAME + APP_ENV: str = "alpha" METRICS_ENABLED: bool = False PROBES_ENABLED: bool = True diff --git a/apps/notification/common/config/log_settings.py b/apps/notification/common/config/log_settings.py index 48b97a4..2f6985c 100644 --- a/apps/notification/common/config/log_settings.py +++ b/apps/notification/common/config/log_settings.py @@ -1,17 +1,16 @@ - -from pydantic_settings import BaseSettings +import os +from dataclasses import dataclass from .app_settings import app_settings -class LogSettings(BaseSettings): - LOG_LEVEL: str = "DEBUG" - LOG_BASE_PATH: str = app_settings.LOG_BASE_PATH - LOG_PATH: str = LOG_BASE_PATH + '/' + app_settings.BACKEND_LOG_FILE_NAME + '.log' - LOG_RETENTION: str = "14 days" - LOG_ROTATION: str = "00:00" # mid night - - class Config: - env_file = ".log.env" - env_file_encoding = "utf-8" +@dataclass +class LogSettings: + LOG_PATH_BASE: str = app_settings.LOG_BASE_PATH + LOG_RETENTION: str = os.environ.get("LOG_RETENTION", "30 days") + LOG_ROTATION: str = os.environ.get("LOG_ROTATION", "00:00") # midnight + MAX_BACKUP_FILES: int = int(os.environ.get("LOG_BACKUP_FILES", 5)) + LOG_ROTATION_BYTES: int = int(os.environ.get("LOG_ROTATION_BYTES", 10 * 1024 * 1024)) # 10 MB + APP_NAME: str = app_settings.APP_NAME + ENVIRONMENT: str = app_settings.APP_ENV log_settings = LogSettings() diff --git a/apps/notification/common/log/application_logger.py b/apps/notification/common/log/application_logger.py index 51d2cba..67ec321 100644 --- a/apps/notification/common/log/application_logger.py +++ b/apps/notification/common/log/application_logger.py @@ -1,14 +1,12 @@ from .base_logger import LoggerBase from common.config.app_settings import app_settings -import json - class ApplicationLogger(LoggerBase): def __init__(self, application_activities: dict[str, any] = {}) -> None: - extra_fields = {} + extra_fileds = {} if application_activities: - extra_fields.update(application_activities) + extra_fileds.update(application_activities) super().__init__( logger_name=app_settings.APPLICATION_ACTIVITY_LOG, - extra_fields=extra_fields, + extra_fileds=extra_fileds, ) diff --git a/apps/notification/common/log/base_logger.py b/apps/notification/common/log/base_logger.py index 49e773d..24f7bb0 100644 --- a/apps/notification/common/log/base_logger.py +++ b/apps/notification/common/log/base_logger.py @@ -1,140 +1,136 @@ from loguru import logger as guru_logger from common.config.log_settings import log_settings -from typing import Dict, Any +from typing import Dict, Any, Optional import socket +import json import threading +import os +import sys +import inspect +import logging +from common.log.json_sink import JsonSink class LoggerBase: - binded_loggers = {} # Stores logger instances - loguru_sinks_added = set() # Tracks added log sinks + binded_loggers = {} logger_lock = threading.Lock() - def __init__(self, logger_name: str, extra_fields: Dict[str, Any] = None) -> None: + def __init__(self, logger_name: str, extra_fileds: dict[str, any]) -> None: self.__logger_name = logger_name - self.extra_fields = extra_fields or {} - + self.extra_fileds = extra_fileds with LoggerBase.logger_lock: - # ✅ **If already created, reuse it** to prevent duplicates if self.__logger_name in LoggerBase.binded_loggers: self.logger = LoggerBase.binded_loggers[self.__logger_name] return - log_filename = f"{log_settings.LOG_BASE_PATH}/{self.__logger_name}.log" + log_filename = f"{log_settings.LOG_PATH_BASE}/{self.__logger_name}.log" + log_level = "INFO" + rotation_bytes = int(log_settings.LOG_ROTATION_BYTES or 10 * 1024 * 1024) - # ✅ **Ensure Loguru sink is added only once** - if log_filename not in LoggerBase.loguru_sinks_added: - guru_logger.add( - sink=log_filename, - level="INFO", - retention=log_settings.LOG_RETENTION, - rotation=log_settings.LOG_ROTATION, - format="{message}", - serialize=True, - filter=lambda record: "extra" in record - and "topic" in record["extra"] - and record["extra"]["topic"] == self.__logger_name, - ) - LoggerBase.loguru_sinks_added.add(log_filename) # ✅ Mark as added + guru_logger.remove() - host_name = socket.gethostname() - host_ip = socket.gethostbyname(host_name) - - # ✅ Bind the logger with topic and extra fields - self.logger = guru_logger.bind( - topic=self.__logger_name, - host_ip=host_ip, - host_name=host_name, - **self.extra_fields, # Include additional metadata + file_sink = JsonSink( + log_file_path=log_filename, + rotation_size_bytes=rotation_bytes, + max_backup_files=log_settings.MAX_BACKUP_FILES ) + guru_logger.add( + sink=file_sink, + level=log_level, + filter=lambda record: record["extra"].get("topic") == self.__logger_name, + ) - # ✅ Store reference to prevent duplicate instances + guru_logger.add( + sink=sys.stderr, + level=log_level, + format="{level} - {time:YYYY-MM-DD HH:mm:ss} - <{extra[log_file]}:{extra[log_line]}> - {extra[properties_str]} - {message}", + filter=lambda record: record["extra"].get("topic") == self.__logger_name, + ) + + host_name = socket.gethostname() + host_ip = socket.gethostbyname(host_name) + self.logger = guru_logger.bind( + topic=self.__logger_name, + host_ip=host_ip, + host_name=host_name, + app=log_settings.APP_NAME, + env=log_settings.ENVIRONMENT, + ) + with LoggerBase.logger_lock: LoggerBase.binded_loggers[self.__logger_name] = self.logger - async def log_event( - self, - sender_id: str, - receiver_id: str, - subject: str, - event: str, - properties: Dict[str, Any], - text: str = "", - ) -> None: - local_logger = self.logger.bind( - sender_id=sender_id, - receiver_id=receiver_id, - subject=subject, - event=event, - properties=properties, - ) + def _get_log_context(self) -> dict: + frame = inspect.currentframe().f_back.f_back + filename = os.path.basename(frame.f_code.co_filename) + lineno = frame.f_lineno + return {"log_file": filename, "log_line": lineno} + + def _prepare_properties(self, properties: Optional[Dict[str, Any]]) -> Dict[str, Any]: + props = {} if properties is None else properties.copy() + props_str = json.dumps(props, ensure_ascii=False) if props else "{}" + return props, props_str + + async def log_event(self, sender_id: str, receiver_id: str, subject: str, event: str, properties: dict[str, any], text: str = "") -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event=event, properties=props, properties_str=props_str, **context) local_logger.info(text) - async def log_exception( - self, - sender_id: str, - receiver_id: str, - subject: str, - exception: Exception, - text: str = "", - properties: Dict[str, Any] = None, - ) -> None: - local_logger = self.logger.bind( - sender_id=sender_id, - receiver_id=receiver_id, - subject=subject, - event="exception", - properties=properties, - exception=exception, - ) + async def log_exception(self, sender_id: str, receiver_id: str, subject: str, exception: Exception, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="exception", properties=props, properties_str=props_str, exception=exception, **context) local_logger.exception(text) - async def log_info( - self, - sender_id: str, - receiver_id: str, - subject: str, - text: str = "", - properties: Dict[str, Any] = None, - ) -> None: - local_logger = self.logger.bind( - sender_id=sender_id, - receiver_id=receiver_id, - subject=subject, - event="information", - properties=properties, - ) + async def log_info(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="information", properties=props, properties_str=props_str, **context) local_logger.info(text) - async def log_warning( - self, - sender_id: str, - receiver_id: str, - subject: str, - text: str = "", - properties: Dict[str, Any] = None, - ) -> None: - local_logger = self.logger.bind( - sender_id=sender_id, - receiver_id=receiver_id, - subject=subject, - event="warning", - properties=properties, - ) + async def log_warning(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="warning", properties=props, properties_str=props_str, **context) local_logger.warning(text) - async def log_error( - self, - sender_id: str, - receiver_id: str, - subject: str, - text: str = "", - properties: Dict[str, Any] = None, - ) -> None: - local_logger = self.logger.bind( - sender_id=sender_id, - receiver_id=receiver_id, - subject=subject, - event="error", - properties=properties, - ) + async def log_error(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="error", properties=props, properties_str=props_str, **context) local_logger.error(text) + + @staticmethod + def configure_uvicorn_logging(): + print("📢 Setting up uvicorn logging interception...") + + # Intercept logs from these loggers + intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"] + + class InterceptHandler(logging.Handler): + def emit(self, record): + level = ( + guru_logger.level(record.levelname).name + if guru_logger.level(record.levelname, None) + else record.levelno + ) + frame, depth = logging.currentframe(), 2 + while frame.f_code.co_filename == logging.__file__: + frame = frame.f_back + depth += 1 + + guru_logger.opt(depth=depth, exception=record.exc_info).log( + level, + f"[{record.name}] {record.getMessage()}", + ) + + # Replace default handlers + logging.root.handlers.clear() + logging.root.setLevel(logging.INFO) + logging.root.handlers = [InterceptHandler()] + + # Configure specific uvicorn loggers + for logger_name in intercept_loggers: + logging_logger = logging.getLogger(logger_name) + logging_logger.handlers.clear() # Remove default handlers + logging_logger.propagate = True # Ensure propagation through Loguru diff --git a/apps/notification/common/log/business_metric_logger.py b/apps/notification/common/log/business_metric_logger.py new file mode 100644 index 0000000..95383ab --- /dev/null +++ b/apps/notification/common/log/business_metric_logger.py @@ -0,0 +1,25 @@ +from .base_logger import LoggerBase +from common.config.app_settings import app_settings +import json + + +class BusinessMetricLogger(LoggerBase): + def __init__(self, business_metrics: dict[str, any] = {}) -> None: + extra_fileds = {} + if business_metrics: + extra_fileds.update(business_metrics) + super().__init__( + logger_name=app_settings.BUSINESS_METRIC_LOG, + extra_fileds=extra_fileds, + ) + + + async def log_metrics(self, business_metrics: dict[str, any] = {}) -> None: + return await super().log_event( + sender_id="business_metric_manager", + receiver_id="business_metric_logger", + subject="metrics", + event="logging", + properties=business_metrics, + text="business metric logged" + ) diff --git a/apps/notification/common/log/json_sink.py b/apps/notification/common/log/json_sink.py new file mode 100644 index 0000000..a798156 --- /dev/null +++ b/apps/notification/common/log/json_sink.py @@ -0,0 +1,85 @@ +import json +import datetime +import traceback +from pathlib import Path +from typing import Optional + +class JsonSink: + def __init__( + self, + log_file_path: str, + rotation_size_bytes: int = 10 * 1024 * 1024, + max_backup_files: int = 5, + ): + self.log_file_path = Path(log_file_path) + self.rotation_size = rotation_size_bytes + self.max_backup_files = max_backup_files + self._open_log_file() + + def _open_log_file(self): + # ensure the parent directory exists + parent_dir = self.log_file_path.parent + if not parent_dir.exists(): + parent_dir.mkdir(parents=True, exist_ok=True) + self.log_file = self.log_file_path.open("a", encoding="utf-8") + + def _should_rotate(self) -> bool: + return self.log_file_path.exists() and self.log_file_path.stat().st_size >= self.rotation_size + + def _rotate(self): + self.log_file.close() + timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + rotated_path = self.log_file_path.with_name(f"{self.log_file_path.stem}_{timestamp}{self.log_file_path.suffix}") + self.log_file_path.rename(rotated_path) + self._cleanup_old_backups() + self._open_log_file() + + def _cleanup_old_backups(self): + parent = self.log_file_path.parent + stem = self.log_file_path.stem + suffix = self.log_file_path.suffix + + backup_files = sorted( + parent.glob(f"{stem}_*{suffix}"), + key=lambda p: p.stat().st_mtime, + reverse=True, + ) + + for old_file in backup_files[self.max_backup_files:]: + try: + old_file.unlink() + except Exception as e: + print(f"Failed to delete old backup {old_file}: {e}") + + def __call__(self, message): + record = message.record + if self._should_rotate(): + self._rotate() + + log_entry = { + "level": record["level"].name.lower(), + "timestamp": int(record["time"].timestamp() * 1000), + "text": record["message"], + "fields": record["extra"].get("properties", {}), + "context": { + "app": record["extra"].get("app"), + "env": record["extra"].get("env"), + "log_file": record["extra"].get("log_file"), + "log_line": record["extra"].get("log_line"), + "topic": record["extra"].get("topic"), + "sender_id": record["extra"].get("sender_id"), + "receiver_id": record["extra"].get("receiver_id"), + "subject": record["extra"].get("subject"), + "event": record["extra"].get("event"), + "host_ip": record["extra"].get("host_ip"), + "host_name": record["extra"].get("host_name"), + }, + "stacktrace": None + } + + if record["exception"]: + exc_type, exc_value, exc_tb = record["exception"] + log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb) + + self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n") + self.log_file.flush() diff --git a/apps/notification/common/log/user_logger.py b/apps/notification/common/log/user_logger.py new file mode 100644 index 0000000..d931975 --- /dev/null +++ b/apps/notification/common/log/user_logger.py @@ -0,0 +1,14 @@ +from .base_logger import LoggerBase +from common.config.app_settings import app_settings + +import json + + +class UserLogger(LoggerBase): + def __init__(self, user_activities: dict[str, any] = {}) -> None: + extra_fileds = {} + if user_activities: + extra_fileds.update(user_activities) + super().__init__( + logger_name=app_settings.USER_ACTIVITY_LOG, extra_fileds=extra_fileds + ) diff --git a/apps/notification/webapi/providers/logger.py b/apps/notification/webapi/providers/logger.py index afcab10..6eb1e22 100644 --- a/apps/notification/webapi/providers/logger.py +++ b/apps/notification/webapi/providers/logger.py @@ -1,47 +1,8 @@ -import logging -import sys from loguru import logger as guru_logger +from common.log.base_logger import LoggerBase def register_logger(): print("📢 Setting up logging interception...") - - # 🔴 **Ensure Uvicorn Logs Are Captured** - intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"] - - class InterceptHandler(logging.Handler): - def emit(self, record): - level = ( - guru_logger.level(record.levelname).name - if guru_logger.level(record.levelname, None) - else record.levelno - ) - frame, depth = logging.currentframe(), 2 - while frame.f_code.co_filename == logging.__file__: - frame = frame.f_back - depth += 1 - - guru_logger.opt(depth=depth, exception=record.exc_info).log( - level, - f"[{record.name}] {record.getMessage()}", - ) - - # 🔴 **Replace Existing Loggers with Interception** - logging.root.handlers.clear() - logging.root.setLevel(logging.INFO) - logging.root.handlers = [InterceptHandler()] - - for logger_name in intercept_loggers: - logging_logger = logging.getLogger(logger_name) - logging_logger.handlers.clear() # Remove Uvicorn default handlers - logging_logger.propagate = True # ✅ Ensure they propagate through Loguru - - # 🔴 **Redirect stdout/stderr to Loguru (Keep Green Timestamps)** - guru_logger.remove() - guru_logger.add( - sys.stdout, - level="INFO", - format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {message}", - ) - + LoggerBase.configure_uvicorn_logging() print("✅ Logging interception complete. Logs are formatted and deduplicated!") diff --git a/apps/payment/backend/business/stripe_manager.py b/apps/payment/backend/business/stripe_manager.py index 9b8b749..656f884 100644 --- a/apps/payment/backend/business/stripe_manager.py +++ b/apps/payment/backend/business/stripe_manager.py @@ -292,10 +292,10 @@ class StripeManager: }, }, mode="payment", - success_url="{}/work-space".format( + success_url="{}/projects".format( self.site_url_root ), # needs to be set, local: http://localhost/ - cancel_url="{}/work-space".format(self.site_url_root), + cancel_url="{}/projects".format(self.site_url_root), ) if session: diff --git a/apps/payment/common/config/app_settings.py b/apps/payment/common/config/app_settings.py index a99b386..4a63e89 100644 --- a/apps/payment/common/config/app_settings.py +++ b/apps/payment/common/config/app_settings.py @@ -4,6 +4,7 @@ from pydantic_settings import BaseSettings class AppSettings(BaseSettings): NAME: str = "payment" APP_NAME: str = NAME + APP_ENV: str = "alpha" JWT_SECRET_KEY: str = "" ACCESS_TOKEN_EXPIRE_MINUTES: int = 3600 diff --git a/apps/payment/common/config/log_settings.py b/apps/payment/common/config/log_settings.py index 48b97a4..2f6985c 100644 --- a/apps/payment/common/config/log_settings.py +++ b/apps/payment/common/config/log_settings.py @@ -1,17 +1,16 @@ - -from pydantic_settings import BaseSettings +import os +from dataclasses import dataclass from .app_settings import app_settings -class LogSettings(BaseSettings): - LOG_LEVEL: str = "DEBUG" - LOG_BASE_PATH: str = app_settings.LOG_BASE_PATH - LOG_PATH: str = LOG_BASE_PATH + '/' + app_settings.BACKEND_LOG_FILE_NAME + '.log' - LOG_RETENTION: str = "14 days" - LOG_ROTATION: str = "00:00" # mid night - - class Config: - env_file = ".log.env" - env_file_encoding = "utf-8" +@dataclass +class LogSettings: + LOG_PATH_BASE: str = app_settings.LOG_BASE_PATH + LOG_RETENTION: str = os.environ.get("LOG_RETENTION", "30 days") + LOG_ROTATION: str = os.environ.get("LOG_ROTATION", "00:00") # midnight + MAX_BACKUP_FILES: int = int(os.environ.get("LOG_BACKUP_FILES", 5)) + LOG_ROTATION_BYTES: int = int(os.environ.get("LOG_ROTATION_BYTES", 10 * 1024 * 1024)) # 10 MB + APP_NAME: str = app_settings.APP_NAME + ENVIRONMENT: str = app_settings.APP_ENV log_settings = LogSettings() diff --git a/apps/payment/common/log/application_logger.py b/apps/payment/common/log/application_logger.py index 51d2cba..67ec321 100644 --- a/apps/payment/common/log/application_logger.py +++ b/apps/payment/common/log/application_logger.py @@ -1,14 +1,12 @@ from .base_logger import LoggerBase from common.config.app_settings import app_settings -import json - class ApplicationLogger(LoggerBase): def __init__(self, application_activities: dict[str, any] = {}) -> None: - extra_fields = {} + extra_fileds = {} if application_activities: - extra_fields.update(application_activities) + extra_fileds.update(application_activities) super().__init__( logger_name=app_settings.APPLICATION_ACTIVITY_LOG, - extra_fields=extra_fields, + extra_fileds=extra_fileds, ) diff --git a/apps/payment/common/log/base_logger.py b/apps/payment/common/log/base_logger.py index 49e773d..24f7bb0 100644 --- a/apps/payment/common/log/base_logger.py +++ b/apps/payment/common/log/base_logger.py @@ -1,140 +1,136 @@ from loguru import logger as guru_logger from common.config.log_settings import log_settings -from typing import Dict, Any +from typing import Dict, Any, Optional import socket +import json import threading +import os +import sys +import inspect +import logging +from common.log.json_sink import JsonSink class LoggerBase: - binded_loggers = {} # Stores logger instances - loguru_sinks_added = set() # Tracks added log sinks + binded_loggers = {} logger_lock = threading.Lock() - def __init__(self, logger_name: str, extra_fields: Dict[str, Any] = None) -> None: + def __init__(self, logger_name: str, extra_fileds: dict[str, any]) -> None: self.__logger_name = logger_name - self.extra_fields = extra_fields or {} - + self.extra_fileds = extra_fileds with LoggerBase.logger_lock: - # ✅ **If already created, reuse it** to prevent duplicates if self.__logger_name in LoggerBase.binded_loggers: self.logger = LoggerBase.binded_loggers[self.__logger_name] return - log_filename = f"{log_settings.LOG_BASE_PATH}/{self.__logger_name}.log" + log_filename = f"{log_settings.LOG_PATH_BASE}/{self.__logger_name}.log" + log_level = "INFO" + rotation_bytes = int(log_settings.LOG_ROTATION_BYTES or 10 * 1024 * 1024) - # ✅ **Ensure Loguru sink is added only once** - if log_filename not in LoggerBase.loguru_sinks_added: - guru_logger.add( - sink=log_filename, - level="INFO", - retention=log_settings.LOG_RETENTION, - rotation=log_settings.LOG_ROTATION, - format="{message}", - serialize=True, - filter=lambda record: "extra" in record - and "topic" in record["extra"] - and record["extra"]["topic"] == self.__logger_name, - ) - LoggerBase.loguru_sinks_added.add(log_filename) # ✅ Mark as added + guru_logger.remove() - host_name = socket.gethostname() - host_ip = socket.gethostbyname(host_name) - - # ✅ Bind the logger with topic and extra fields - self.logger = guru_logger.bind( - topic=self.__logger_name, - host_ip=host_ip, - host_name=host_name, - **self.extra_fields, # Include additional metadata + file_sink = JsonSink( + log_file_path=log_filename, + rotation_size_bytes=rotation_bytes, + max_backup_files=log_settings.MAX_BACKUP_FILES ) + guru_logger.add( + sink=file_sink, + level=log_level, + filter=lambda record: record["extra"].get("topic") == self.__logger_name, + ) - # ✅ Store reference to prevent duplicate instances + guru_logger.add( + sink=sys.stderr, + level=log_level, + format="{level} - {time:YYYY-MM-DD HH:mm:ss} - <{extra[log_file]}:{extra[log_line]}> - {extra[properties_str]} - {message}", + filter=lambda record: record["extra"].get("topic") == self.__logger_name, + ) + + host_name = socket.gethostname() + host_ip = socket.gethostbyname(host_name) + self.logger = guru_logger.bind( + topic=self.__logger_name, + host_ip=host_ip, + host_name=host_name, + app=log_settings.APP_NAME, + env=log_settings.ENVIRONMENT, + ) + with LoggerBase.logger_lock: LoggerBase.binded_loggers[self.__logger_name] = self.logger - async def log_event( - self, - sender_id: str, - receiver_id: str, - subject: str, - event: str, - properties: Dict[str, Any], - text: str = "", - ) -> None: - local_logger = self.logger.bind( - sender_id=sender_id, - receiver_id=receiver_id, - subject=subject, - event=event, - properties=properties, - ) + def _get_log_context(self) -> dict: + frame = inspect.currentframe().f_back.f_back + filename = os.path.basename(frame.f_code.co_filename) + lineno = frame.f_lineno + return {"log_file": filename, "log_line": lineno} + + def _prepare_properties(self, properties: Optional[Dict[str, Any]]) -> Dict[str, Any]: + props = {} if properties is None else properties.copy() + props_str = json.dumps(props, ensure_ascii=False) if props else "{}" + return props, props_str + + async def log_event(self, sender_id: str, receiver_id: str, subject: str, event: str, properties: dict[str, any], text: str = "") -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event=event, properties=props, properties_str=props_str, **context) local_logger.info(text) - async def log_exception( - self, - sender_id: str, - receiver_id: str, - subject: str, - exception: Exception, - text: str = "", - properties: Dict[str, Any] = None, - ) -> None: - local_logger = self.logger.bind( - sender_id=sender_id, - receiver_id=receiver_id, - subject=subject, - event="exception", - properties=properties, - exception=exception, - ) + async def log_exception(self, sender_id: str, receiver_id: str, subject: str, exception: Exception, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="exception", properties=props, properties_str=props_str, exception=exception, **context) local_logger.exception(text) - async def log_info( - self, - sender_id: str, - receiver_id: str, - subject: str, - text: str = "", - properties: Dict[str, Any] = None, - ) -> None: - local_logger = self.logger.bind( - sender_id=sender_id, - receiver_id=receiver_id, - subject=subject, - event="information", - properties=properties, - ) + async def log_info(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="information", properties=props, properties_str=props_str, **context) local_logger.info(text) - async def log_warning( - self, - sender_id: str, - receiver_id: str, - subject: str, - text: str = "", - properties: Dict[str, Any] = None, - ) -> None: - local_logger = self.logger.bind( - sender_id=sender_id, - receiver_id=receiver_id, - subject=subject, - event="warning", - properties=properties, - ) + async def log_warning(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="warning", properties=props, properties_str=props_str, **context) local_logger.warning(text) - async def log_error( - self, - sender_id: str, - receiver_id: str, - subject: str, - text: str = "", - properties: Dict[str, Any] = None, - ) -> None: - local_logger = self.logger.bind( - sender_id=sender_id, - receiver_id=receiver_id, - subject=subject, - event="error", - properties=properties, - ) + async def log_error(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None: + props, props_str = self._prepare_properties(properties) + context = self._get_log_context() + local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="error", properties=props, properties_str=props_str, **context) local_logger.error(text) + + @staticmethod + def configure_uvicorn_logging(): + print("📢 Setting up uvicorn logging interception...") + + # Intercept logs from these loggers + intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"] + + class InterceptHandler(logging.Handler): + def emit(self, record): + level = ( + guru_logger.level(record.levelname).name + if guru_logger.level(record.levelname, None) + else record.levelno + ) + frame, depth = logging.currentframe(), 2 + while frame.f_code.co_filename == logging.__file__: + frame = frame.f_back + depth += 1 + + guru_logger.opt(depth=depth, exception=record.exc_info).log( + level, + f"[{record.name}] {record.getMessage()}", + ) + + # Replace default handlers + logging.root.handlers.clear() + logging.root.setLevel(logging.INFO) + logging.root.handlers = [InterceptHandler()] + + # Configure specific uvicorn loggers + for logger_name in intercept_loggers: + logging_logger = logging.getLogger(logger_name) + logging_logger.handlers.clear() # Remove default handlers + logging_logger.propagate = True # Ensure propagation through Loguru diff --git a/apps/payment/common/log/business_metric_logger.py b/apps/payment/common/log/business_metric_logger.py new file mode 100644 index 0000000..95383ab --- /dev/null +++ b/apps/payment/common/log/business_metric_logger.py @@ -0,0 +1,25 @@ +from .base_logger import LoggerBase +from common.config.app_settings import app_settings +import json + + +class BusinessMetricLogger(LoggerBase): + def __init__(self, business_metrics: dict[str, any] = {}) -> None: + extra_fileds = {} + if business_metrics: + extra_fileds.update(business_metrics) + super().__init__( + logger_name=app_settings.BUSINESS_METRIC_LOG, + extra_fileds=extra_fileds, + ) + + + async def log_metrics(self, business_metrics: dict[str, any] = {}) -> None: + return await super().log_event( + sender_id="business_metric_manager", + receiver_id="business_metric_logger", + subject="metrics", + event="logging", + properties=business_metrics, + text="business metric logged" + ) diff --git a/apps/payment/common/log/json_sink.py b/apps/payment/common/log/json_sink.py new file mode 100644 index 0000000..a798156 --- /dev/null +++ b/apps/payment/common/log/json_sink.py @@ -0,0 +1,85 @@ +import json +import datetime +import traceback +from pathlib import Path +from typing import Optional + +class JsonSink: + def __init__( + self, + log_file_path: str, + rotation_size_bytes: int = 10 * 1024 * 1024, + max_backup_files: int = 5, + ): + self.log_file_path = Path(log_file_path) + self.rotation_size = rotation_size_bytes + self.max_backup_files = max_backup_files + self._open_log_file() + + def _open_log_file(self): + # ensure the parent directory exists + parent_dir = self.log_file_path.parent + if not parent_dir.exists(): + parent_dir.mkdir(parents=True, exist_ok=True) + self.log_file = self.log_file_path.open("a", encoding="utf-8") + + def _should_rotate(self) -> bool: + return self.log_file_path.exists() and self.log_file_path.stat().st_size >= self.rotation_size + + def _rotate(self): + self.log_file.close() + timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + rotated_path = self.log_file_path.with_name(f"{self.log_file_path.stem}_{timestamp}{self.log_file_path.suffix}") + self.log_file_path.rename(rotated_path) + self._cleanup_old_backups() + self._open_log_file() + + def _cleanup_old_backups(self): + parent = self.log_file_path.parent + stem = self.log_file_path.stem + suffix = self.log_file_path.suffix + + backup_files = sorted( + parent.glob(f"{stem}_*{suffix}"), + key=lambda p: p.stat().st_mtime, + reverse=True, + ) + + for old_file in backup_files[self.max_backup_files:]: + try: + old_file.unlink() + except Exception as e: + print(f"Failed to delete old backup {old_file}: {e}") + + def __call__(self, message): + record = message.record + if self._should_rotate(): + self._rotate() + + log_entry = { + "level": record["level"].name.lower(), + "timestamp": int(record["time"].timestamp() * 1000), + "text": record["message"], + "fields": record["extra"].get("properties", {}), + "context": { + "app": record["extra"].get("app"), + "env": record["extra"].get("env"), + "log_file": record["extra"].get("log_file"), + "log_line": record["extra"].get("log_line"), + "topic": record["extra"].get("topic"), + "sender_id": record["extra"].get("sender_id"), + "receiver_id": record["extra"].get("receiver_id"), + "subject": record["extra"].get("subject"), + "event": record["extra"].get("event"), + "host_ip": record["extra"].get("host_ip"), + "host_name": record["extra"].get("host_name"), + }, + "stacktrace": None + } + + if record["exception"]: + exc_type, exc_value, exc_tb = record["exception"] + log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb) + + self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n") + self.log_file.flush() diff --git a/apps/payment/common/log/user_logger.py b/apps/payment/common/log/user_logger.py index c866c87..d931975 100644 --- a/apps/payment/common/log/user_logger.py +++ b/apps/payment/common/log/user_logger.py @@ -6,9 +6,9 @@ import json class UserLogger(LoggerBase): def __init__(self, user_activities: dict[str, any] = {}) -> None: - extra_fields = {} + extra_fileds = {} if user_activities: - extra_fields.update(user_activities) + extra_fileds.update(user_activities) super().__init__( - logger_name=app_settings.USER_ACTIVITY_LOG, extra_fields=extra_fields + logger_name=app_settings.USER_ACTIVITY_LOG, extra_fileds=extra_fileds ) diff --git a/apps/payment/webapi/providers/logger.py b/apps/payment/webapi/providers/logger.py index afcab10..6eb1e22 100644 --- a/apps/payment/webapi/providers/logger.py +++ b/apps/payment/webapi/providers/logger.py @@ -1,47 +1,8 @@ -import logging -import sys from loguru import logger as guru_logger +from common.log.base_logger import LoggerBase def register_logger(): print("📢 Setting up logging interception...") - - # 🔴 **Ensure Uvicorn Logs Are Captured** - intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"] - - class InterceptHandler(logging.Handler): - def emit(self, record): - level = ( - guru_logger.level(record.levelname).name - if guru_logger.level(record.levelname, None) - else record.levelno - ) - frame, depth = logging.currentframe(), 2 - while frame.f_code.co_filename == logging.__file__: - frame = frame.f_back - depth += 1 - - guru_logger.opt(depth=depth, exception=record.exc_info).log( - level, - f"[{record.name}] {record.getMessage()}", - ) - - # 🔴 **Replace Existing Loggers with Interception** - logging.root.handlers.clear() - logging.root.setLevel(logging.INFO) - logging.root.handlers = [InterceptHandler()] - - for logger_name in intercept_loggers: - logging_logger = logging.getLogger(logger_name) - logging_logger.handlers.clear() # Remove Uvicorn default handlers - logging_logger.propagate = True # ✅ Ensure they propagate through Loguru - - # 🔴 **Redirect stdout/stderr to Loguru (Keep Green Timestamps)** - guru_logger.remove() - guru_logger.add( - sys.stdout, - level="INFO", - format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {message}", - ) - + LoggerBase.configure_uvicorn_logging() print("✅ Logging interception complete. Logs are formatted and deduplicated!")