Merge pull request 'chore(code_base): merging from dev to master' (#7) from dev into master

Reviewed-on: freeleaps/freeleaps-service-hub#7
This commit is contained in:
jingyao1991 2025-04-24 06:48:19 +00:00
commit fbf2f23ad4
48 changed files with 1372 additions and 750 deletions

1
.gitignore vendored
View File

@ -7,3 +7,4 @@
*.pyc *.pyc
freedev.code-workspace freedev.code-workspace
.idea/ .idea/
.pytest_cache/

View File

@ -4,6 +4,7 @@ from pydantic_settings import BaseSettings
class AppSettings(BaseSettings): class AppSettings(BaseSettings):
NAME: str = "authentication" NAME: str = "authentication"
APP_NAME: str = NAME APP_NAME: str = NAME
APP_ENV: str = "alpha"
METRICS_ENABLED: bool = False METRICS_ENABLED: bool = False
PROBES_ENABLED: bool = True PROBES_ENABLED: bool = True

View File

@ -1,17 +1,17 @@
import os
from pydantic_settings import BaseSettings from dataclasses import dataclass
from .app_settings import app_settings from .app_settings import app_settings
class LogSettings(BaseSettings):
LOG_LEVEL: str = "DEBUG"
LOG_PATH_BASE: str = app_settings.LOG_BASE_PATH
LOG_PATH: str = LOG_PATH_BASE + '/' + app_settings.BACKEND_LOG_FILE_NAME + '.log'
LOG_RETENTION: str = "14 days"
LOG_ROTATION: str = "00:00" # mid night
class Config: @dataclass
env_file = ".log.env" class LogSettings:
env_file_encoding = "utf-8" LOG_PATH_BASE: str = app_settings.LOG_BASE_PATH
LOG_RETENTION: str = os.environ.get("LOG_RETENTION", "30 days")
LOG_ROTATION: str = os.environ.get("LOG_ROTATION", "00:00") # midnight
MAX_BACKUP_FILES: int = int(os.environ.get("LOG_BACKUP_FILES", 5))
LOG_ROTATION_BYTES: int = int(os.environ.get("LOG_ROTATION_BYTES", 10 * 1024 * 1024)) # 10 MB
APP_NAME: str = app_settings.APP_NAME
ENVIRONMENT: str = app_settings.APP_ENV
log_settings = LogSettings() log_settings = LogSettings()

View File

@ -1,14 +1,12 @@
from .base_logger import LoggerBase from .base_logger import LoggerBase
from common.config.app_settings import app_settings from common.config.app_settings import app_settings
import json
class ApplicationLogger(LoggerBase): class ApplicationLogger(LoggerBase):
def __init__(self, application_activities: dict[str, any] = {}) -> None: def __init__(self, application_activities: dict[str, any] = {}) -> None:
extra_fields = {} extra_fileds = {}
if application_activities: if application_activities:
extra_fields.update(application_activities) extra_fileds.update(application_activities)
super().__init__( super().__init__(
logger_name=app_settings.APPLICATION_ACTIVITY_LOG, logger_name=app_settings.APPLICATION_ACTIVITY_LOG,
extra_fields=extra_fields, extra_fileds=extra_fileds,
) )

View File

@ -1,140 +1,136 @@
from loguru import logger as guru_logger from loguru import logger as guru_logger
from common.config.log_settings import log_settings from common.config.log_settings import log_settings
from typing import Dict, Any from typing import Dict, Any, Optional
import socket import socket
import json
import threading import threading
import os
import sys
import inspect
import logging
from common.log.json_sink import JsonSink
class LoggerBase: class LoggerBase:
binded_loggers = {} # Stores logger instances binded_loggers = {}
loguru_sinks_added = set() # Tracks added log sinks
logger_lock = threading.Lock() logger_lock = threading.Lock()
def __init__(self, logger_name: str, extra_fields: Dict[str, Any] = None) -> None: def __init__(self, logger_name: str, extra_fileds: dict[str, any]) -> None:
self.__logger_name = logger_name self.__logger_name = logger_name
self.extra_fields = extra_fields or {} self.extra_fileds = extra_fileds
with LoggerBase.logger_lock: with LoggerBase.logger_lock:
# ✅ **If already created, reuse it** to prevent duplicates
if self.__logger_name in LoggerBase.binded_loggers: if self.__logger_name in LoggerBase.binded_loggers:
self.logger = LoggerBase.binded_loggers[self.__logger_name] self.logger = LoggerBase.binded_loggers[self.__logger_name]
return return
log_filename = f"{log_settings.LOG_PATH_BASE}/{self.__logger_name}.log" log_filename = f"{log_settings.LOG_PATH_BASE}/{self.__logger_name}.log"
log_level = "INFO"
rotation_bytes = int(log_settings.LOG_ROTATION_BYTES or 10 * 1024 * 1024)
# ✅ **Ensure Loguru sink is added only once** guru_logger.remove()
if log_filename not in LoggerBase.loguru_sinks_added:
guru_logger.add( file_sink = JsonSink(
sink=log_filename, log_file_path=log_filename,
level="INFO", rotation_size_bytes=rotation_bytes,
retention=log_settings.LOG_RETENTION, max_backup_files=log_settings.MAX_BACKUP_FILES
rotation=log_settings.LOG_ROTATION, )
format="{message}", guru_logger.add(
serialize=True, sink=file_sink,
filter=lambda record: "extra" in record level=log_level,
and "topic" in record["extra"] filter=lambda record: record["extra"].get("topic") == self.__logger_name,
and record["extra"]["topic"] == self.__logger_name, )
guru_logger.add(
sink=sys.stderr,
level=log_level,
format="{level} - {time:YYYY-MM-DD HH:mm:ss} - <{extra[log_file]}:{extra[log_line]}> - {extra[properties_str]} - {message}",
filter=lambda record: record["extra"].get("topic") == self.__logger_name,
) )
LoggerBase.loguru_sinks_added.add(log_filename) # ✅ Mark as added
host_name = socket.gethostname() host_name = socket.gethostname()
host_ip = socket.gethostbyname(host_name) host_ip = socket.gethostbyname(host_name)
# ✅ Bind the logger with topic and extra fields
self.logger = guru_logger.bind( self.logger = guru_logger.bind(
topic=self.__logger_name, topic=self.__logger_name,
host_ip=host_ip, host_ip=host_ip,
host_name=host_name, host_name=host_name,
**self.extra_fields, # Include additional metadata app=log_settings.APP_NAME,
env=log_settings.ENVIRONMENT,
) )
with LoggerBase.logger_lock:
# ✅ Store reference to prevent duplicate instances
LoggerBase.binded_loggers[self.__logger_name] = self.logger LoggerBase.binded_loggers[self.__logger_name] = self.logger
async def log_event( def _get_log_context(self) -> dict:
self, frame = inspect.currentframe().f_back.f_back
sender_id: str, filename = os.path.basename(frame.f_code.co_filename)
receiver_id: str, lineno = frame.f_lineno
subject: str, return {"log_file": filename, "log_line": lineno}
event: str,
properties: Dict[str, Any], def _prepare_properties(self, properties: Optional[Dict[str, Any]]) -> Dict[str, Any]:
text: str = "", props = {} if properties is None else properties.copy()
) -> None: props_str = json.dumps(props, ensure_ascii=False) if props else "{}"
local_logger = self.logger.bind( return props, props_str
sender_id=sender_id,
receiver_id=receiver_id, async def log_event(self, sender_id: str, receiver_id: str, subject: str, event: str, properties: dict[str, any], text: str = "") -> None:
subject=subject, props, props_str = self._prepare_properties(properties)
event=event, context = self._get_log_context()
properties=properties, local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event=event, properties=props, properties_str=props_str, **context)
)
local_logger.info(text) local_logger.info(text)
async def log_exception( async def log_exception(self, sender_id: str, receiver_id: str, subject: str, exception: Exception, text: str = "", properties: dict[str, any] = None) -> None:
self, props, props_str = self._prepare_properties(properties)
sender_id: str, context = self._get_log_context()
receiver_id: str, local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="exception", properties=props, properties_str=props_str, exception=exception, **context)
subject: str,
exception: Exception,
text: str = "",
properties: Dict[str, Any] = None,
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event="exception",
properties=properties,
exception=exception,
)
local_logger.exception(text) local_logger.exception(text)
async def log_info( async def log_info(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None:
self, props, props_str = self._prepare_properties(properties)
sender_id: str, context = self._get_log_context()
receiver_id: str, local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="information", properties=props, properties_str=props_str, **context)
subject: str,
text: str = "",
properties: Dict[str, Any] = None,
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event="information",
properties=properties,
)
local_logger.info(text) local_logger.info(text)
async def log_warning( async def log_warning(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None:
self, props, props_str = self._prepare_properties(properties)
sender_id: str, context = self._get_log_context()
receiver_id: str, local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="warning", properties=props, properties_str=props_str, **context)
subject: str,
text: str = "",
properties: Dict[str, Any] = None,
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event="warning",
properties=properties,
)
local_logger.warning(text) local_logger.warning(text)
async def log_error( async def log_error(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None:
self, props, props_str = self._prepare_properties(properties)
sender_id: str, context = self._get_log_context()
receiver_id: str, local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="error", properties=props, properties_str=props_str, **context)
subject: str,
text: str = "",
properties: Dict[str, Any] = None,
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event="error",
properties=properties,
)
local_logger.error(text) local_logger.error(text)
@staticmethod
def configure_uvicorn_logging():
print("📢 Setting up uvicorn logging interception...")
# Intercept logs from these loggers
intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"]
class InterceptHandler(logging.Handler):
def emit(self, record):
level = (
guru_logger.level(record.levelname).name
if guru_logger.level(record.levelname, None)
else record.levelno
)
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
guru_logger.opt(depth=depth, exception=record.exc_info).log(
level,
f"[{record.name}] {record.getMessage()}",
)
# Replace default handlers
logging.root.handlers.clear()
logging.root.setLevel(logging.INFO)
logging.root.handlers = [InterceptHandler()]
# Configure specific uvicorn loggers
for logger_name in intercept_loggers:
logging_logger = logging.getLogger(logger_name)
logging_logger.handlers.clear() # Remove default handlers
logging_logger.propagate = True # Ensure propagation through Loguru

View File

@ -0,0 +1,25 @@
from .base_logger import LoggerBase
from common.config.app_settings import app_settings
import json
class BusinessMetricLogger(LoggerBase):
def __init__(self, business_metrics: dict[str, any] = {}) -> None:
extra_fileds = {}
if business_metrics:
extra_fileds.update(business_metrics)
super().__init__(
logger_name=app_settings.BUSINESS_METRIC_LOG,
extra_fileds=extra_fileds,
)
async def log_metrics(self, business_metrics: dict[str, any] = {}) -> None:
return await super().log_event(
sender_id="business_metric_manager",
receiver_id="business_metric_logger",
subject="metrics",
event="logging",
properties=business_metrics,
text="business metric logged"
)

View File

@ -0,0 +1,85 @@
import json
import datetime
import traceback
from pathlib import Path
from typing import Optional
class JsonSink:
def __init__(
self,
log_file_path: str,
rotation_size_bytes: int = 10 * 1024 * 1024,
max_backup_files: int = 5,
):
self.log_file_path = Path(log_file_path)
self.rotation_size = rotation_size_bytes
self.max_backup_files = max_backup_files
self._open_log_file()
def _open_log_file(self):
# ensure the parent directory exists
parent_dir = self.log_file_path.parent
if not parent_dir.exists():
parent_dir.mkdir(parents=True, exist_ok=True)
self.log_file = self.log_file_path.open("a", encoding="utf-8")
def _should_rotate(self) -> bool:
return self.log_file_path.exists() and self.log_file_path.stat().st_size >= self.rotation_size
def _rotate(self):
self.log_file.close()
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
rotated_path = self.log_file_path.with_name(f"{self.log_file_path.stem}_{timestamp}{self.log_file_path.suffix}")
self.log_file_path.rename(rotated_path)
self._cleanup_old_backups()
self._open_log_file()
def _cleanup_old_backups(self):
parent = self.log_file_path.parent
stem = self.log_file_path.stem
suffix = self.log_file_path.suffix
backup_files = sorted(
parent.glob(f"{stem}_*{suffix}"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
for old_file in backup_files[self.max_backup_files:]:
try:
old_file.unlink()
except Exception as e:
print(f"Failed to delete old backup {old_file}: {e}")
def __call__(self, message):
record = message.record
if self._should_rotate():
self._rotate()
log_entry = {
"level": record["level"].name.lower(),
"timestamp": int(record["time"].timestamp() * 1000),
"text": record["message"],
"fields": record["extra"].get("properties", {}),
"context": {
"app": record["extra"].get("app"),
"env": record["extra"].get("env"),
"log_file": record["extra"].get("log_file"),
"log_line": record["extra"].get("log_line"),
"topic": record["extra"].get("topic"),
"sender_id": record["extra"].get("sender_id"),
"receiver_id": record["extra"].get("receiver_id"),
"subject": record["extra"].get("subject"),
"event": record["extra"].get("event"),
"host_ip": record["extra"].get("host_ip"),
"host_name": record["extra"].get("host_name"),
},
"stacktrace": None
}
if record["exception"]:
exc_type, exc_value, exc_tb = record["exception"]
log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb)
self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
self.log_file.flush()

View File

@ -0,0 +1,14 @@
from .base_logger import LoggerBase
from common.config.app_settings import app_settings
import json
class UserLogger(LoggerBase):
def __init__(self, user_activities: dict[str, any] = {}) -> None:
extra_fileds = {}
if user_activities:
extra_fileds.update(user_activities)
super().__init__(
logger_name=app_settings.USER_ACTIVITY_LOG, extra_fileds=extra_fileds
)

View File

@ -1,47 +1,8 @@
import logging
import sys
from loguru import logger as guru_logger from loguru import logger as guru_logger
from common.log.base_logger import LoggerBase
def register_logger(): def register_logger():
print("📢 Setting up logging interception...") print("📢 Setting up logging interception...")
LoggerBase.configure_uvicorn_logging()
# 🔴 **Ensure Uvicorn Logs Are Captured**
intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"]
class InterceptHandler(logging.Handler):
def emit(self, record):
level = (
guru_logger.level(record.levelname).name
if guru_logger.level(record.levelname, None)
else record.levelno
)
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
guru_logger.opt(depth=depth, exception=record.exc_info).log(
level,
f"[{record.name}] {record.getMessage()}",
)
# 🔴 **Replace Existing Loggers with Interception**
logging.root.handlers.clear()
logging.root.setLevel(logging.INFO)
logging.root.handlers = [InterceptHandler()]
for logger_name in intercept_loggers:
logging_logger = logging.getLogger(logger_name)
logging_logger.handlers.clear() # Remove Uvicorn default handlers
logging_logger.propagate = True # ✅ Ensure they propagate through Loguru
# 🔴 **Redirect stdout/stderr to Loguru (Keep Green Timestamps)**
guru_logger.remove()
guru_logger.add(
sys.stdout,
level="INFO",
format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | {level} | {message}",
)
print("✅ Logging interception complete. Logs are formatted and deduplicated!") print("✅ Logging interception complete. Logs are formatted and deduplicated!")

View File

@ -5,6 +5,7 @@ from pydantic_settings import BaseSettings
class AppSettings(BaseSettings): class AppSettings(BaseSettings):
NAME: str = "central_storage" NAME: str = "central_storage"
APP_NAME: str = NAME APP_NAME: str = NAME
APP_ENV: str = "alpha"
METRICS_ENABLED: bool = False METRICS_ENABLED: bool = False
PROBES_ENABLED: bool = True PROBES_ENABLED: bool = True

View File

@ -1,17 +1,17 @@
import os
from pydantic_settings import BaseSettings from dataclasses import dataclass
from .app_settings import app_settings from .app_settings import app_settings
class LogSettings(BaseSettings):
LOG_LEVEL: str = "DEBUG"
LOG_BASE_PATH: str = app_settings.LOG_BASE_PATH
LOG_PATH: str = LOG_BASE_PATH + '/' + app_settings.BACKEND_LOG_FILE_NAME + '.log'
LOG_RETENTION: str = "14 days"
LOG_ROTATION: str = "00:00" # mid night
class Config: @dataclass
env_file = ".log.env" class LogSettings:
env_file_encoding = "utf-8" LOG_PATH_BASE: str = app_settings.LOG_BASE_PATH
LOG_RETENTION: str = os.environ.get("LOG_RETENTION", "30 days")
LOG_ROTATION: str = os.environ.get("LOG_ROTATION", "00:00") # midnight
MAX_BACKUP_FILES: int = int(os.environ.get("LOG_BACKUP_FILES", 5))
LOG_ROTATION_BYTES: int = int(os.environ.get("LOG_ROTATION_BYTES", 10 * 1024 * 1024)) # 10 MB
APP_NAME: str = app_settings.APP_NAME
ENVIRONMENT: str = app_settings.APP_ENV
log_settings = LogSettings() log_settings = LogSettings()

View File

@ -0,0 +1,12 @@
from .base_logger import LoggerBase
from common.config.app_settings import app_settings
class ApplicationLogger(LoggerBase):
def __init__(self, application_activities: dict[str, any] = {}) -> None:
extra_fileds = {}
if application_activities:
extra_fileds.update(application_activities)
super().__init__(
logger_name=app_settings.APPLICATION_ACTIVITY_LOG,
extra_fileds=extra_fileds,
)

View File

@ -0,0 +1,136 @@
from loguru import logger as guru_logger
from common.config.log_settings import log_settings
from typing import Dict, Any, Optional
import socket
import json
import threading
import os
import sys
import inspect
import logging
from common.log.json_sink import JsonSink
class LoggerBase:
binded_loggers = {}
logger_lock = threading.Lock()
def __init__(self, logger_name: str, extra_fileds: dict[str, any]) -> None:
self.__logger_name = logger_name
self.extra_fileds = extra_fileds
with LoggerBase.logger_lock:
if self.__logger_name in LoggerBase.binded_loggers:
self.logger = LoggerBase.binded_loggers[self.__logger_name]
return
log_filename = f"{log_settings.LOG_PATH_BASE}/{self.__logger_name}.log"
log_level = "INFO"
rotation_bytes = int(log_settings.LOG_ROTATION_BYTES or 10 * 1024 * 1024)
guru_logger.remove()
file_sink = JsonSink(
log_file_path=log_filename,
rotation_size_bytes=rotation_bytes,
max_backup_files=log_settings.MAX_BACKUP_FILES
)
guru_logger.add(
sink=file_sink,
level=log_level,
filter=lambda record: record["extra"].get("topic") == self.__logger_name,
)
guru_logger.add(
sink=sys.stderr,
level=log_level,
format="{level} - {time:YYYY-MM-DD HH:mm:ss} - <{extra[log_file]}:{extra[log_line]}> - {extra[properties_str]} - {message}",
filter=lambda record: record["extra"].get("topic") == self.__logger_name,
)
host_name = socket.gethostname()
host_ip = socket.gethostbyname(host_name)
self.logger = guru_logger.bind(
topic=self.__logger_name,
host_ip=host_ip,
host_name=host_name,
app=log_settings.APP_NAME,
env=log_settings.ENVIRONMENT,
)
with LoggerBase.logger_lock:
LoggerBase.binded_loggers[self.__logger_name] = self.logger
def _get_log_context(self) -> dict:
frame = inspect.currentframe().f_back.f_back
filename = os.path.basename(frame.f_code.co_filename)
lineno = frame.f_lineno
return {"log_file": filename, "log_line": lineno}
def _prepare_properties(self, properties: Optional[Dict[str, Any]]) -> Dict[str, Any]:
props = {} if properties is None else properties.copy()
props_str = json.dumps(props, ensure_ascii=False) if props else "{}"
return props, props_str
async def log_event(self, sender_id: str, receiver_id: str, subject: str, event: str, properties: dict[str, any], text: str = "") -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event=event, properties=props, properties_str=props_str, **context)
local_logger.info(text)
async def log_exception(self, sender_id: str, receiver_id: str, subject: str, exception: Exception, text: str = "", properties: dict[str, any] = None) -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="exception", properties=props, properties_str=props_str, exception=exception, **context)
local_logger.exception(text)
async def log_info(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="information", properties=props, properties_str=props_str, **context)
local_logger.info(text)
async def log_warning(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="warning", properties=props, properties_str=props_str, **context)
local_logger.warning(text)
async def log_error(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="error", properties=props, properties_str=props_str, **context)
local_logger.error(text)
@staticmethod
def configure_uvicorn_logging():
print("📢 Setting up uvicorn logging interception...")
# Intercept logs from these loggers
intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"]
class InterceptHandler(logging.Handler):
def emit(self, record):
level = (
guru_logger.level(record.levelname).name
if guru_logger.level(record.levelname, None)
else record.levelno
)
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
guru_logger.opt(depth=depth, exception=record.exc_info).log(
level,
f"[{record.name}] {record.getMessage()}",
)
# Replace default handlers
logging.root.handlers.clear()
logging.root.setLevel(logging.INFO)
logging.root.handlers = [InterceptHandler()]
# Configure specific uvicorn loggers
for logger_name in intercept_loggers:
logging_logger = logging.getLogger(logger_name)
logging_logger.handlers.clear() # Remove default handlers
logging_logger.propagate = True # Ensure propagation through Loguru

View File

@ -0,0 +1,25 @@
from .base_logger import LoggerBase
from common.config.app_settings import app_settings
import json
class BusinessMetricLogger(LoggerBase):
def __init__(self, business_metrics: dict[str, any] = {}) -> None:
extra_fileds = {}
if business_metrics:
extra_fileds.update(business_metrics)
super().__init__(
logger_name=app_settings.BUSINESS_METRIC_LOG,
extra_fileds=extra_fileds,
)
async def log_metrics(self, business_metrics: dict[str, any] = {}) -> None:
return await super().log_event(
sender_id="business_metric_manager",
receiver_id="business_metric_logger",
subject="metrics",
event="logging",
properties=business_metrics,
text="business metric logged"
)

View File

@ -0,0 +1,50 @@
from .application_logger import ApplicationLogger
class FunctionLogger(ApplicationLogger):
def __init__(self, sender_id: str, receiver_id:str) -> None:
super().__init__()
self.event_sender_id = sender_id
self.event_receiver_id = receiver_id
self.event_subject = "function"
async def log_enter(self, function: str, file: str):
return await super().log_event(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
event="enter",
properties={
"function": function,
"file": file,
},
text="Enter:{} of {}".format(function, file)
)
async def log_exit(self, function: str, file: str, excution_time_in_ns: int):
return await super().log_event(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
event="exit",
properties={
"function": function,
"file": file,
"excution_time_in_ns": excution_time_in_ns
},
text="Exit:{} of {}".format(function, file)
)
async def log_exception(self, exception: Exception, function: str, file: str, excution_time_in_ns: int) -> None:
return await super().log_exception(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
exception=exception,
text="Exception:{} of {}".format(function, file),
properties={
"function": function,
"file": file,
"excution_time_in_ns": excution_time_in_ns
},
)

View File

@ -0,0 +1,85 @@
import json
import datetime
import traceback
from pathlib import Path
from typing import Optional
class JsonSink:
def __init__(
self,
log_file_path: str,
rotation_size_bytes: int = 10 * 1024 * 1024,
max_backup_files: int = 5,
):
self.log_file_path = Path(log_file_path)
self.rotation_size = rotation_size_bytes
self.max_backup_files = max_backup_files
self._open_log_file()
def _open_log_file(self):
# ensure the parent directory exists
parent_dir = self.log_file_path.parent
if not parent_dir.exists():
parent_dir.mkdir(parents=True, exist_ok=True)
self.log_file = self.log_file_path.open("a", encoding="utf-8")
def _should_rotate(self) -> bool:
return self.log_file_path.exists() and self.log_file_path.stat().st_size >= self.rotation_size
def _rotate(self):
self.log_file.close()
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
rotated_path = self.log_file_path.with_name(f"{self.log_file_path.stem}_{timestamp}{self.log_file_path.suffix}")
self.log_file_path.rename(rotated_path)
self._cleanup_old_backups()
self._open_log_file()
def _cleanup_old_backups(self):
parent = self.log_file_path.parent
stem = self.log_file_path.stem
suffix = self.log_file_path.suffix
backup_files = sorted(
parent.glob(f"{stem}_*{suffix}"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
for old_file in backup_files[self.max_backup_files:]:
try:
old_file.unlink()
except Exception as e:
print(f"Failed to delete old backup {old_file}: {e}")
def __call__(self, message):
record = message.record
if self._should_rotate():
self._rotate()
log_entry = {
"level": record["level"].name.lower(),
"timestamp": int(record["time"].timestamp() * 1000),
"text": record["message"],
"fields": record["extra"].get("properties", {}),
"context": {
"app": record["extra"].get("app"),
"env": record["extra"].get("env"),
"log_file": record["extra"].get("log_file"),
"log_line": record["extra"].get("log_line"),
"topic": record["extra"].get("topic"),
"sender_id": record["extra"].get("sender_id"),
"receiver_id": record["extra"].get("receiver_id"),
"subject": record["extra"].get("subject"),
"event": record["extra"].get("event"),
"host_ip": record["extra"].get("host_ip"),
"host_name": record["extra"].get("host_name"),
},
"stacktrace": None
}
if record["exception"]:
exc_type, exc_value, exc_tb = record["exception"]
log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb)
self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
self.log_file.flush()

View File

@ -0,0 +1,25 @@
import os
from .function_logger import FunctionLogger
import time
import functools
def log_entry_exit_async(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
file_path = os.path.relpath(func.__code__.co_filename)
function_logger = FunctionLogger(sender_id="log_entry_exit_async", receiver_id="function_logger")
start_time = time.process_time_ns()
try:
await function_logger.log_enter(func.__name__, file_path)
result = await func(*args, **kwargs)
await function_logger.log_exit(func.__name__, file_path, time.process_time_ns() - start_time)
return result
except Exception as exception:
await function_logger.log_exception(
exception=exception,
function=func.__name__,
file=file_path,
excution_time_in_ns=time.process_time_ns() - start_time)
raise
return wrapper

View File

@ -0,0 +1,46 @@
from .application_logger import ApplicationLogger
class ModuleLogger(ApplicationLogger):
def __init__(self, sender_id: str) -> None:
super().__init__()
self.event_sender_id = sender_id
self.event_receiver_id = "ModuleLogger"
self.event_subject = "module"
async def log_exception(self, exception: Exception, text: str = "Exception", properties: dict[str, any] = None) -> None:
return await super().log_exception(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
exception=exception,
text=text,
properties=properties,
)
async def log_info(self, info: str, properties: dict[str, any] = None) -> None:
return await super().log_info(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
text=info,
properties=properties,
)
async def log_warning(self, warning: str, properties: dict[str, any] = None) -> None:
return await super().log_warning(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
text=warning,
properties=properties,
)
async def log_error(self, error: str, properties: dict[str, any] = None) -> None:
return await super().log_error(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
text=error,
properties=properties,
)

View File

@ -0,0 +1,14 @@
from .base_logger import LoggerBase
from common.config.app_settings import app_settings
import json
class UserLogger(LoggerBase):
def __init__(self, user_activities: dict[str, any] = {}) -> None:
extra_fileds = {}
if user_activities:
extra_fileds.update(user_activities)
super().__init__(
logger_name=app_settings.USER_ACTIVITY_LOG, extra_fileds=extra_fileds
)

View File

@ -1,47 +1,8 @@
import logging
import sys
from loguru import logger as guru_logger from loguru import logger as guru_logger
from common.log.base_logger import LoggerBase
def register_logger(): def register_logger():
print("📢 Setting up logging interception...") print("📢 Setting up logging interception...")
LoggerBase.configure_uvicorn_logging()
# 🔴 **Ensure Uvicorn Logs Are Captured**
intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"]
class InterceptHandler(logging.Handler):
def emit(self, record):
level = (
guru_logger.level(record.levelname).name
if guru_logger.level(record.levelname, None)
else record.levelno
)
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
guru_logger.opt(depth=depth, exception=record.exc_info).log(
level,
f"[{record.name}] {record.getMessage()}",
)
# 🔴 **Replace Existing Loggers with Interception**
logging.root.handlers.clear()
logging.root.setLevel(logging.INFO)
logging.root.handlers = [InterceptHandler()]
for logger_name in intercept_loggers:
logging_logger = logging.getLogger(logger_name)
logging_logger.handlers.clear() # Remove Uvicorn default handlers
logging_logger.propagate = True # ✅ Ensure they propagate through Loguru
# 🔴 **Redirect stdout/stderr to Loguru (Keep Green Timestamps)**
guru_logger.remove()
guru_logger.add(
sys.stdout,
level="INFO",
format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | {level} | {message}",
)
print("✅ Logging interception complete. Logs are formatted and deduplicated!") print("✅ Logging interception complete. Logs are formatted and deduplicated!")

View File

@ -1,6 +1,7 @@
from common.config.app_settings import app_settings from common.config.app_settings import app_settings
from backend.content.models import DocumentDoc from backend.content.models import DocumentDoc
from backend.document.models import BasicProfileDoc from backend.document.models import BasicProfileDoc
from datetime import datetime, timezone
import httpx import httpx
import base64 import base64
@ -53,9 +54,12 @@ class DocumentManager:
print(f"Failed to queue deletion: {response.text}") print(f"Failed to queue deletion: {response.text}")
async def cleanup_document(self): async def cleanup_document(self):
# Get today's date at midnight (UTC)
today_start = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
# Corrected query with regex # Corrected query with regex
documents = await DocumentDoc.find( documents = await DocumentDoc.find(
{"created_by": {"$regex": "^content-service-"}} {"created_by": {"$regex": "^content-service-"},
"create_time": {"$lt": today_start}}
).to_list() ).to_list()
if documents: if documents:

View File

@ -4,6 +4,7 @@ from pydantic_settings import BaseSettings
class AppSettings(BaseSettings): class AppSettings(BaseSettings):
NAME: str = "content" NAME: str = "content"
APP_NAME: str = NAME APP_NAME: str = NAME
APP_ENV: str = "alpha"
METRICS_ENABLED: bool = False METRICS_ENABLED: bool = False
PROBES_ENABLED: bool = True PROBES_ENABLED: bool = True

View File

@ -1,17 +1,16 @@
from pydantic_settings import BaseSettings import os
from dataclasses import dataclass
from .app_settings import app_settings from .app_settings import app_settings
@dataclass
class LogSettings(BaseSettings): class LogSettings:
LOG_LEVEL: str = "DEBUG" LOG_PATH_BASE: str = app_settings.LOG_BASE_PATH
LOG_BASE_PATH: str = app_settings.LOG_BASE_PATH LOG_RETENTION: str = os.environ.get("LOG_RETENTION", "30 days")
LOG_PATH: str = LOG_BASE_PATH + "/" + app_settings.BACKEND_LOG_FILE_NAME + ".log" LOG_ROTATION: str = os.environ.get("LOG_ROTATION", "00:00") # midnight
LOG_RETENTION: str = "14 days" MAX_BACKUP_FILES: int = int(os.environ.get("LOG_BACKUP_FILES", 5))
LOG_ROTATION: str = "00:00" # mid night LOG_ROTATION_BYTES: int = int(os.environ.get("LOG_ROTATION_BYTES", 10 * 1024 * 1024)) # 10 MB
APP_NAME: str = app_settings.APP_NAME
class Config: ENVIRONMENT: str = app_settings.APP_ENV
env_file = ".log.env"
env_file_encoding = "utf-8"
log_settings = LogSettings() log_settings = LogSettings()

View File

@ -1,14 +1,12 @@
from .base_logger import LoggerBase from .base_logger import LoggerBase
from common.config.app_settings import app_settings from common.config.app_settings import app_settings
import json
class ApplicationLogger(LoggerBase): class ApplicationLogger(LoggerBase):
def __init__(self, application_activities: dict[str, any] = {}) -> None: def __init__(self, application_activities: dict[str, any] = {}) -> None:
extra_fields = {} extra_fileds = {}
if application_activities: if application_activities:
extra_fields.update(application_activities) extra_fileds.update(application_activities)
super().__init__( super().__init__(
logger_name=app_settings.APPLICATION_ACTIVITY_LOG, logger_name=app_settings.APPLICATION_ACTIVITY_LOG,
extra_fields=extra_fields, extra_fileds=extra_fileds,
) )

View File

@ -1,140 +1,136 @@
from loguru import logger as guru_logger from loguru import logger as guru_logger
from common.config.log_settings import log_settings from common.config.log_settings import log_settings
from typing import Dict, Any from typing import Dict, Any, Optional
import socket import socket
import json
import threading import threading
import os
import sys
import inspect
import logging
from common.log.json_sink import JsonSink
class LoggerBase: class LoggerBase:
binded_loggers = {} # Stores logger instances binded_loggers = {}
loguru_sinks_added = set() # Tracks added log sinks
logger_lock = threading.Lock() logger_lock = threading.Lock()
def __init__(self, logger_name: str, extra_fields: Dict[str, Any] = None) -> None: def __init__(self, logger_name: str, extra_fileds: dict[str, any]) -> None:
self.__logger_name = logger_name self.__logger_name = logger_name
self.extra_fields = extra_fields or {} self.extra_fileds = extra_fileds
with LoggerBase.logger_lock: with LoggerBase.logger_lock:
# ✅ **If already created, reuse it** to prevent duplicates
if self.__logger_name in LoggerBase.binded_loggers: if self.__logger_name in LoggerBase.binded_loggers:
self.logger = LoggerBase.binded_loggers[self.__logger_name] self.logger = LoggerBase.binded_loggers[self.__logger_name]
return return
log_filename = f"{log_settings.LOG_BASE_PATH}/{self.__logger_name}.log" log_filename = f"{log_settings.LOG_PATH_BASE}/{self.__logger_name}.log"
log_level = "INFO"
rotation_bytes = int(log_settings.LOG_ROTATION_BYTES or 10 * 1024 * 1024)
# ✅ **Ensure Loguru sink is added only once** guru_logger.remove()
if log_filename not in LoggerBase.loguru_sinks_added:
guru_logger.add( file_sink = JsonSink(
sink=log_filename, log_file_path=log_filename,
level="INFO", rotation_size_bytes=rotation_bytes,
retention=log_settings.LOG_RETENTION, max_backup_files=log_settings.MAX_BACKUP_FILES
rotation=log_settings.LOG_ROTATION, )
format="{message}", guru_logger.add(
serialize=True, sink=file_sink,
filter=lambda record: "extra" in record level=log_level,
and "topic" in record["extra"] filter=lambda record: record["extra"].get("topic") == self.__logger_name,
and record["extra"]["topic"] == self.__logger_name, )
guru_logger.add(
sink=sys.stderr,
level=log_level,
format="{level} - {time:YYYY-MM-DD HH:mm:ss} - <{extra[log_file]}:{extra[log_line]}> - {extra[properties_str]} - {message}",
filter=lambda record: record["extra"].get("topic") == self.__logger_name,
) )
LoggerBase.loguru_sinks_added.add(log_filename) # ✅ Mark as added
host_name = socket.gethostname() host_name = socket.gethostname()
host_ip = socket.gethostbyname(host_name) host_ip = socket.gethostbyname(host_name)
# ✅ Bind the logger with topic and extra fields
self.logger = guru_logger.bind( self.logger = guru_logger.bind(
topic=self.__logger_name, topic=self.__logger_name,
host_ip=host_ip, host_ip=host_ip,
host_name=host_name, host_name=host_name,
**self.extra_fields, # Include additional metadata app=log_settings.APP_NAME,
env=log_settings.ENVIRONMENT,
) )
with LoggerBase.logger_lock:
# ✅ Store reference to prevent duplicate instances
LoggerBase.binded_loggers[self.__logger_name] = self.logger LoggerBase.binded_loggers[self.__logger_name] = self.logger
async def log_event( def _get_log_context(self) -> dict:
self, frame = inspect.currentframe().f_back.f_back
sender_id: str, filename = os.path.basename(frame.f_code.co_filename)
receiver_id: str, lineno = frame.f_lineno
subject: str, return {"log_file": filename, "log_line": lineno}
event: str,
properties: Dict[str, Any], def _prepare_properties(self, properties: Optional[Dict[str, Any]]) -> Dict[str, Any]:
text: str = "", props = {} if properties is None else properties.copy()
) -> None: props_str = json.dumps(props, ensure_ascii=False) if props else "{}"
local_logger = self.logger.bind( return props, props_str
sender_id=sender_id,
receiver_id=receiver_id, async def log_event(self, sender_id: str, receiver_id: str, subject: str, event: str, properties: dict[str, any], text: str = "") -> None:
subject=subject, props, props_str = self._prepare_properties(properties)
event=event, context = self._get_log_context()
properties=properties, local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event=event, properties=props, properties_str=props_str, **context)
)
local_logger.info(text) local_logger.info(text)
async def log_exception( async def log_exception(self, sender_id: str, receiver_id: str, subject: str, exception: Exception, text: str = "", properties: dict[str, any] = None) -> None:
self, props, props_str = self._prepare_properties(properties)
sender_id: str, context = self._get_log_context()
receiver_id: str, local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="exception", properties=props, properties_str=props_str, exception=exception, **context)
subject: str,
exception: Exception,
text: str = "",
properties: Dict[str, Any] = None,
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event="exception",
properties=properties,
exception=exception,
)
local_logger.exception(text) local_logger.exception(text)
async def log_info( async def log_info(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None:
self, props, props_str = self._prepare_properties(properties)
sender_id: str, context = self._get_log_context()
receiver_id: str, local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="information", properties=props, properties_str=props_str, **context)
subject: str,
text: str = "",
properties: Dict[str, Any] = None,
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event="information",
properties=properties,
)
local_logger.info(text) local_logger.info(text)
async def log_warning( async def log_warning(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None:
self, props, props_str = self._prepare_properties(properties)
sender_id: str, context = self._get_log_context()
receiver_id: str, local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="warning", properties=props, properties_str=props_str, **context)
subject: str,
text: str = "",
properties: Dict[str, Any] = None,
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event="warning",
properties=properties,
)
local_logger.warning(text) local_logger.warning(text)
async def log_error( async def log_error(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None:
self, props, props_str = self._prepare_properties(properties)
sender_id: str, context = self._get_log_context()
receiver_id: str, local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="error", properties=props, properties_str=props_str, **context)
subject: str,
text: str = "",
properties: Dict[str, Any] = None,
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event="error",
properties=properties,
)
local_logger.error(text) local_logger.error(text)
@staticmethod
def configure_uvicorn_logging():
print("📢 Setting up uvicorn logging interception...")
# Intercept logs from these loggers
intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"]
class InterceptHandler(logging.Handler):
def emit(self, record):
level = (
guru_logger.level(record.levelname).name
if guru_logger.level(record.levelname, None)
else record.levelno
)
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
guru_logger.opt(depth=depth, exception=record.exc_info).log(
level,
f"[{record.name}] {record.getMessage()}",
)
# Replace default handlers
logging.root.handlers.clear()
logging.root.setLevel(logging.INFO)
logging.root.handlers = [InterceptHandler()]
# Configure specific uvicorn loggers
for logger_name in intercept_loggers:
logging_logger = logging.getLogger(logger_name)
logging_logger.handlers.clear() # Remove default handlers
logging_logger.propagate = True # Ensure propagation through Loguru

View File

@ -5,14 +5,15 @@ import json
class BusinessMetricLogger(LoggerBase): class BusinessMetricLogger(LoggerBase):
def __init__(self, business_metrics: dict[str, any] = {}) -> None: def __init__(self, business_metrics: dict[str, any] = {}) -> None:
extra_fields = {} extra_fileds = {}
if business_metrics: if business_metrics:
extra_fields.update(business_metrics) extra_fileds.update(business_metrics)
super().__init__( super().__init__(
logger_name=app_settings.BUSINESS_METRIC_LOG, logger_name=app_settings.BUSINESS_METRIC_LOG,
extra_fields=extra_fields, extra_fileds=extra_fileds,
) )
async def log_metrics(self, business_metrics: dict[str, any] = {}) -> None: async def log_metrics(self, business_metrics: dict[str, any] = {}) -> None:
return await super().log_event( return await super().log_event(
sender_id="business_metric_manager", sender_id="business_metric_manager",
@ -20,5 +21,5 @@ class BusinessMetricLogger(LoggerBase):
subject="metrics", subject="metrics",
event="logging", event="logging",
properties=business_metrics, properties=business_metrics,
text="business metric logged", text="business metric logged"
) )

View File

@ -0,0 +1,85 @@
import json
import datetime
import traceback
from pathlib import Path
from typing import Optional
class JsonSink:
def __init__(
self,
log_file_path: str,
rotation_size_bytes: int = 10 * 1024 * 1024,
max_backup_files: int = 5,
):
self.log_file_path = Path(log_file_path)
self.rotation_size = rotation_size_bytes
self.max_backup_files = max_backup_files
self._open_log_file()
def _open_log_file(self):
# ensure the parent directory exists
parent_dir = self.log_file_path.parent
if not parent_dir.exists():
parent_dir.mkdir(parents=True, exist_ok=True)
self.log_file = self.log_file_path.open("a", encoding="utf-8")
def _should_rotate(self) -> bool:
return self.log_file_path.exists() and self.log_file_path.stat().st_size >= self.rotation_size
def _rotate(self):
self.log_file.close()
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
rotated_path = self.log_file_path.with_name(f"{self.log_file_path.stem}_{timestamp}{self.log_file_path.suffix}")
self.log_file_path.rename(rotated_path)
self._cleanup_old_backups()
self._open_log_file()
def _cleanup_old_backups(self):
parent = self.log_file_path.parent
stem = self.log_file_path.stem
suffix = self.log_file_path.suffix
backup_files = sorted(
parent.glob(f"{stem}_*{suffix}"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
for old_file in backup_files[self.max_backup_files:]:
try:
old_file.unlink()
except Exception as e:
print(f"Failed to delete old backup {old_file}: {e}")
def __call__(self, message):
record = message.record
if self._should_rotate():
self._rotate()
log_entry = {
"level": record["level"].name.lower(),
"timestamp": int(record["time"].timestamp() * 1000),
"text": record["message"],
"fields": record["extra"].get("properties", {}),
"context": {
"app": record["extra"].get("app"),
"env": record["extra"].get("env"),
"log_file": record["extra"].get("log_file"),
"log_line": record["extra"].get("log_line"),
"topic": record["extra"].get("topic"),
"sender_id": record["extra"].get("sender_id"),
"receiver_id": record["extra"].get("receiver_id"),
"subject": record["extra"].get("subject"),
"event": record["extra"].get("event"),
"host_ip": record["extra"].get("host_ip"),
"host_name": record["extra"].get("host_name"),
},
"stacktrace": None
}
if record["exception"]:
exc_type, exc_value, exc_tb = record["exception"]
log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb)
self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
self.log_file.flush()

View File

@ -6,9 +6,9 @@ import json
class UserLogger(LoggerBase): class UserLogger(LoggerBase):
def __init__(self, user_activities: dict[str, any] = {}) -> None: def __init__(self, user_activities: dict[str, any] = {}) -> None:
extra_fields = {} extra_fileds = {}
if user_activities: if user_activities:
extra_fields.update(user_activities) extra_fileds.update(user_activities)
super().__init__( super().__init__(
logger_name=app_settings.USER_ACTIVITY_LOG, extra_fields=extra_fields logger_name=app_settings.USER_ACTIVITY_LOG, extra_fileds=extra_fileds
) )

View File

@ -1,5 +1,6 @@
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime, timedelta from datetime import datetime, timedelta
from scheduler.refresh_sharepoint_content_job import ( from scheduler.refresh_sharepoint_content_job import (
refresh_sharepoint_content_job, refresh_sharepoint_content_job,
@ -28,15 +29,11 @@ async def register_job(scheduler: AsyncIOScheduler):
"date", "date",
run_date=datetime(2025, 2, 7, 20, 0, 0), run_date=datetime(2025, 2, 7, 20, 0, 0),
) )
# Register cleanup_document_job as a one-time job
# This job is just one-time job for removing many unused documents await init_lock(ScheduleJobLocker.CLEANUP_DOCUMENT_JOB_LOCKER)
# Run already, now comment it out scheduler.add_job(
# await init_lock(ScheduleJobLocker.CLEANUP_DOCUMENT_JOB_LOCKER) cleanup_document_job,
# execution_time = datetime.now() + timedelta( trigger=CronTrigger(hour=2, minute=0), # Runs every day at 2:00 AM
# seconds=60 id="cleanup_document_daily",
# ) # Schedule to run 60 seconds from now )
# scheduler.add_job(
# cleanup_document_job, # Job function
# trigger=DateTrigger(run_date=execution_time), # One-time trigger
# id="cleanup_document_one_time", # Optional: Give the job an ID
# )

View File

@ -1,47 +1,8 @@
import logging
import sys
from loguru import logger as guru_logger from loguru import logger as guru_logger
from common.log.base_logger import LoggerBase
def register_logger(): def register_logger():
print("📢 Setting up logging interception...") print("📢 Setting up logging interception...")
LoggerBase.configure_uvicorn_logging()
# 🔴 **Ensure Uvicorn Logs Are Captured**
intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"]
class InterceptHandler(logging.Handler):
def emit(self, record):
level = (
guru_logger.level(record.levelname).name
if guru_logger.level(record.levelname, None)
else record.levelno
)
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
guru_logger.opt(depth=depth, exception=record.exc_info).log(
level,
f"[{record.name}] {record.getMessage()}",
)
# 🔴 **Replace Existing Loggers with Interception**
logging.root.handlers.clear()
logging.root.setLevel(logging.INFO)
logging.root.handlers = [InterceptHandler()]
for logger_name in intercept_loggers:
logging_logger = logging.getLogger(logger_name)
logging_logger.handlers.clear() # Remove Uvicorn default handlers
logging_logger.propagate = True # ✅ Ensure they propagate through Loguru
# 🔴 **Redirect stdout/stderr to Loguru (Keep Green Timestamps)**
guru_logger.remove()
guru_logger.add(
sys.stdout,
level="INFO",
format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | {level} | {message}",
)
print("✅ Logging interception complete. Logs are formatted and deduplicated!") print("✅ Logging interception complete. Logs are formatted and deduplicated!")

View File

@ -5,6 +5,7 @@ from pydantic_settings import BaseSettings
class AppSettings(BaseSettings): class AppSettings(BaseSettings):
NAME: str = "notification" NAME: str = "notification"
APP_NAME:str = NAME APP_NAME:str = NAME
APP_ENV: str = "alpha"
METRICS_ENABLED: bool = False METRICS_ENABLED: bool = False
PROBES_ENABLED: bool = True PROBES_ENABLED: bool = True

View File

@ -1,17 +1,16 @@
import os
from pydantic_settings import BaseSettings from dataclasses import dataclass
from .app_settings import app_settings from .app_settings import app_settings
class LogSettings(BaseSettings): @dataclass
LOG_LEVEL: str = "DEBUG" class LogSettings:
LOG_BASE_PATH: str = app_settings.LOG_BASE_PATH LOG_PATH_BASE: str = app_settings.LOG_BASE_PATH
LOG_PATH: str = LOG_BASE_PATH + '/' + app_settings.BACKEND_LOG_FILE_NAME + '.log' LOG_RETENTION: str = os.environ.get("LOG_RETENTION", "30 days")
LOG_RETENTION: str = "14 days" LOG_ROTATION: str = os.environ.get("LOG_ROTATION", "00:00") # midnight
LOG_ROTATION: str = "00:00" # mid night MAX_BACKUP_FILES: int = int(os.environ.get("LOG_BACKUP_FILES", 5))
LOG_ROTATION_BYTES: int = int(os.environ.get("LOG_ROTATION_BYTES", 10 * 1024 * 1024)) # 10 MB
class Config: APP_NAME: str = app_settings.APP_NAME
env_file = ".log.env" ENVIRONMENT: str = app_settings.APP_ENV
env_file_encoding = "utf-8"
log_settings = LogSettings() log_settings = LogSettings()

View File

@ -1,14 +1,12 @@
from .base_logger import LoggerBase from .base_logger import LoggerBase
from common.config.app_settings import app_settings from common.config.app_settings import app_settings
import json
class ApplicationLogger(LoggerBase): class ApplicationLogger(LoggerBase):
def __init__(self, application_activities: dict[str, any] = {}) -> None: def __init__(self, application_activities: dict[str, any] = {}) -> None:
extra_fields = {} extra_fileds = {}
if application_activities: if application_activities:
extra_fields.update(application_activities) extra_fileds.update(application_activities)
super().__init__( super().__init__(
logger_name=app_settings.APPLICATION_ACTIVITY_LOG, logger_name=app_settings.APPLICATION_ACTIVITY_LOG,
extra_fields=extra_fields, extra_fileds=extra_fileds,
) )

View File

@ -1,140 +1,136 @@
from loguru import logger as guru_logger from loguru import logger as guru_logger
from common.config.log_settings import log_settings from common.config.log_settings import log_settings
from typing import Dict, Any from typing import Dict, Any, Optional
import socket import socket
import json
import threading import threading
import os
import sys
import inspect
import logging
from common.log.json_sink import JsonSink
class LoggerBase: class LoggerBase:
binded_loggers = {} # Stores logger instances binded_loggers = {}
loguru_sinks_added = set() # Tracks added log sinks
logger_lock = threading.Lock() logger_lock = threading.Lock()
def __init__(self, logger_name: str, extra_fields: Dict[str, Any] = None) -> None: def __init__(self, logger_name: str, extra_fileds: dict[str, any]) -> None:
self.__logger_name = logger_name self.__logger_name = logger_name
self.extra_fields = extra_fields or {} self.extra_fileds = extra_fileds
with LoggerBase.logger_lock: with LoggerBase.logger_lock:
# ✅ **If already created, reuse it** to prevent duplicates
if self.__logger_name in LoggerBase.binded_loggers: if self.__logger_name in LoggerBase.binded_loggers:
self.logger = LoggerBase.binded_loggers[self.__logger_name] self.logger = LoggerBase.binded_loggers[self.__logger_name]
return return
log_filename = f"{log_settings.LOG_BASE_PATH}/{self.__logger_name}.log" log_filename = f"{log_settings.LOG_PATH_BASE}/{self.__logger_name}.log"
log_level = "INFO"
rotation_bytes = int(log_settings.LOG_ROTATION_BYTES or 10 * 1024 * 1024)
# ✅ **Ensure Loguru sink is added only once** guru_logger.remove()
if log_filename not in LoggerBase.loguru_sinks_added:
guru_logger.add( file_sink = JsonSink(
sink=log_filename, log_file_path=log_filename,
level="INFO", rotation_size_bytes=rotation_bytes,
retention=log_settings.LOG_RETENTION, max_backup_files=log_settings.MAX_BACKUP_FILES
rotation=log_settings.LOG_ROTATION, )
format="{message}", guru_logger.add(
serialize=True, sink=file_sink,
filter=lambda record: "extra" in record level=log_level,
and "topic" in record["extra"] filter=lambda record: record["extra"].get("topic") == self.__logger_name,
and record["extra"]["topic"] == self.__logger_name, )
guru_logger.add(
sink=sys.stderr,
level=log_level,
format="{level} - {time:YYYY-MM-DD HH:mm:ss} - <{extra[log_file]}:{extra[log_line]}> - {extra[properties_str]} - {message}",
filter=lambda record: record["extra"].get("topic") == self.__logger_name,
) )
LoggerBase.loguru_sinks_added.add(log_filename) # ✅ Mark as added
host_name = socket.gethostname() host_name = socket.gethostname()
host_ip = socket.gethostbyname(host_name) host_ip = socket.gethostbyname(host_name)
# ✅ Bind the logger with topic and extra fields
self.logger = guru_logger.bind( self.logger = guru_logger.bind(
topic=self.__logger_name, topic=self.__logger_name,
host_ip=host_ip, host_ip=host_ip,
host_name=host_name, host_name=host_name,
**self.extra_fields, # Include additional metadata app=log_settings.APP_NAME,
env=log_settings.ENVIRONMENT,
) )
with LoggerBase.logger_lock:
# ✅ Store reference to prevent duplicate instances
LoggerBase.binded_loggers[self.__logger_name] = self.logger LoggerBase.binded_loggers[self.__logger_name] = self.logger
async def log_event( def _get_log_context(self) -> dict:
self, frame = inspect.currentframe().f_back.f_back
sender_id: str, filename = os.path.basename(frame.f_code.co_filename)
receiver_id: str, lineno = frame.f_lineno
subject: str, return {"log_file": filename, "log_line": lineno}
event: str,
properties: Dict[str, Any], def _prepare_properties(self, properties: Optional[Dict[str, Any]]) -> Dict[str, Any]:
text: str = "", props = {} if properties is None else properties.copy()
) -> None: props_str = json.dumps(props, ensure_ascii=False) if props else "{}"
local_logger = self.logger.bind( return props, props_str
sender_id=sender_id,
receiver_id=receiver_id, async def log_event(self, sender_id: str, receiver_id: str, subject: str, event: str, properties: dict[str, any], text: str = "") -> None:
subject=subject, props, props_str = self._prepare_properties(properties)
event=event, context = self._get_log_context()
properties=properties, local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event=event, properties=props, properties_str=props_str, **context)
)
local_logger.info(text) local_logger.info(text)
async def log_exception( async def log_exception(self, sender_id: str, receiver_id: str, subject: str, exception: Exception, text: str = "", properties: dict[str, any] = None) -> None:
self, props, props_str = self._prepare_properties(properties)
sender_id: str, context = self._get_log_context()
receiver_id: str, local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="exception", properties=props, properties_str=props_str, exception=exception, **context)
subject: str,
exception: Exception,
text: str = "",
properties: Dict[str, Any] = None,
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event="exception",
properties=properties,
exception=exception,
)
local_logger.exception(text) local_logger.exception(text)
async def log_info( async def log_info(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None:
self, props, props_str = self._prepare_properties(properties)
sender_id: str, context = self._get_log_context()
receiver_id: str, local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="information", properties=props, properties_str=props_str, **context)
subject: str,
text: str = "",
properties: Dict[str, Any] = None,
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event="information",
properties=properties,
)
local_logger.info(text) local_logger.info(text)
async def log_warning( async def log_warning(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None:
self, props, props_str = self._prepare_properties(properties)
sender_id: str, context = self._get_log_context()
receiver_id: str, local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="warning", properties=props, properties_str=props_str, **context)
subject: str,
text: str = "",
properties: Dict[str, Any] = None,
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event="warning",
properties=properties,
)
local_logger.warning(text) local_logger.warning(text)
async def log_error( async def log_error(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None:
self, props, props_str = self._prepare_properties(properties)
sender_id: str, context = self._get_log_context()
receiver_id: str, local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="error", properties=props, properties_str=props_str, **context)
subject: str,
text: str = "",
properties: Dict[str, Any] = None,
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event="error",
properties=properties,
)
local_logger.error(text) local_logger.error(text)
@staticmethod
def configure_uvicorn_logging():
print("📢 Setting up uvicorn logging interception...")
# Intercept logs from these loggers
intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"]
class InterceptHandler(logging.Handler):
def emit(self, record):
level = (
guru_logger.level(record.levelname).name
if guru_logger.level(record.levelname, None)
else record.levelno
)
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
guru_logger.opt(depth=depth, exception=record.exc_info).log(
level,
f"[{record.name}] {record.getMessage()}",
)
# Replace default handlers
logging.root.handlers.clear()
logging.root.setLevel(logging.INFO)
logging.root.handlers = [InterceptHandler()]
# Configure specific uvicorn loggers
for logger_name in intercept_loggers:
logging_logger = logging.getLogger(logger_name)
logging_logger.handlers.clear() # Remove default handlers
logging_logger.propagate = True # Ensure propagation through Loguru

View File

@ -0,0 +1,25 @@
from .base_logger import LoggerBase
from common.config.app_settings import app_settings
import json
class BusinessMetricLogger(LoggerBase):
def __init__(self, business_metrics: dict[str, any] = {}) -> None:
extra_fileds = {}
if business_metrics:
extra_fileds.update(business_metrics)
super().__init__(
logger_name=app_settings.BUSINESS_METRIC_LOG,
extra_fileds=extra_fileds,
)
async def log_metrics(self, business_metrics: dict[str, any] = {}) -> None:
return await super().log_event(
sender_id="business_metric_manager",
receiver_id="business_metric_logger",
subject="metrics",
event="logging",
properties=business_metrics,
text="business metric logged"
)

View File

@ -0,0 +1,85 @@
import json
import datetime
import traceback
from pathlib import Path
from typing import Optional
class JsonSink:
def __init__(
self,
log_file_path: str,
rotation_size_bytes: int = 10 * 1024 * 1024,
max_backup_files: int = 5,
):
self.log_file_path = Path(log_file_path)
self.rotation_size = rotation_size_bytes
self.max_backup_files = max_backup_files
self._open_log_file()
def _open_log_file(self):
# ensure the parent directory exists
parent_dir = self.log_file_path.parent
if not parent_dir.exists():
parent_dir.mkdir(parents=True, exist_ok=True)
self.log_file = self.log_file_path.open("a", encoding="utf-8")
def _should_rotate(self) -> bool:
return self.log_file_path.exists() and self.log_file_path.stat().st_size >= self.rotation_size
def _rotate(self):
self.log_file.close()
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
rotated_path = self.log_file_path.with_name(f"{self.log_file_path.stem}_{timestamp}{self.log_file_path.suffix}")
self.log_file_path.rename(rotated_path)
self._cleanup_old_backups()
self._open_log_file()
def _cleanup_old_backups(self):
parent = self.log_file_path.parent
stem = self.log_file_path.stem
suffix = self.log_file_path.suffix
backup_files = sorted(
parent.glob(f"{stem}_*{suffix}"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
for old_file in backup_files[self.max_backup_files:]:
try:
old_file.unlink()
except Exception as e:
print(f"Failed to delete old backup {old_file}: {e}")
def __call__(self, message):
record = message.record
if self._should_rotate():
self._rotate()
log_entry = {
"level": record["level"].name.lower(),
"timestamp": int(record["time"].timestamp() * 1000),
"text": record["message"],
"fields": record["extra"].get("properties", {}),
"context": {
"app": record["extra"].get("app"),
"env": record["extra"].get("env"),
"log_file": record["extra"].get("log_file"),
"log_line": record["extra"].get("log_line"),
"topic": record["extra"].get("topic"),
"sender_id": record["extra"].get("sender_id"),
"receiver_id": record["extra"].get("receiver_id"),
"subject": record["extra"].get("subject"),
"event": record["extra"].get("event"),
"host_ip": record["extra"].get("host_ip"),
"host_name": record["extra"].get("host_name"),
},
"stacktrace": None
}
if record["exception"]:
exc_type, exc_value, exc_tb = record["exception"]
log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb)
self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
self.log_file.flush()

View File

@ -0,0 +1,14 @@
from .base_logger import LoggerBase
from common.config.app_settings import app_settings
import json
class UserLogger(LoggerBase):
def __init__(self, user_activities: dict[str, any] = {}) -> None:
extra_fileds = {}
if user_activities:
extra_fileds.update(user_activities)
super().__init__(
logger_name=app_settings.USER_ACTIVITY_LOG, extra_fileds=extra_fileds
)

View File

@ -1,47 +1,8 @@
import logging
import sys
from loguru import logger as guru_logger from loguru import logger as guru_logger
from common.log.base_logger import LoggerBase
def register_logger(): def register_logger():
print("📢 Setting up logging interception...") print("📢 Setting up logging interception...")
LoggerBase.configure_uvicorn_logging()
# 🔴 **Ensure Uvicorn Logs Are Captured**
intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"]
class InterceptHandler(logging.Handler):
def emit(self, record):
level = (
guru_logger.level(record.levelname).name
if guru_logger.level(record.levelname, None)
else record.levelno
)
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
guru_logger.opt(depth=depth, exception=record.exc_info).log(
level,
f"[{record.name}] {record.getMessage()}",
)
# 🔴 **Replace Existing Loggers with Interception**
logging.root.handlers.clear()
logging.root.setLevel(logging.INFO)
logging.root.handlers = [InterceptHandler()]
for logger_name in intercept_loggers:
logging_logger = logging.getLogger(logger_name)
logging_logger.handlers.clear() # Remove Uvicorn default handlers
logging_logger.propagate = True # ✅ Ensure they propagate through Loguru
# 🔴 **Redirect stdout/stderr to Loguru (Keep Green Timestamps)**
guru_logger.remove()
guru_logger.add(
sys.stdout,
level="INFO",
format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | {level} | {message}",
)
print("✅ Logging interception complete. Logs are formatted and deduplicated!") print("✅ Logging interception complete. Logs are formatted and deduplicated!")

View File

@ -292,10 +292,10 @@ class StripeManager:
}, },
}, },
mode="payment", mode="payment",
success_url="{}/work-space".format( success_url="{}/projects".format(
self.site_url_root self.site_url_root
), # needs to be set, local: http://localhost/ ), # needs to be set, local: http://localhost/
cancel_url="{}/work-space".format(self.site_url_root), cancel_url="{}/projects".format(self.site_url_root),
) )
if session: if session:

View File

@ -4,6 +4,7 @@ from pydantic_settings import BaseSettings
class AppSettings(BaseSettings): class AppSettings(BaseSettings):
NAME: str = "payment" NAME: str = "payment"
APP_NAME: str = NAME APP_NAME: str = NAME
APP_ENV: str = "alpha"
JWT_SECRET_KEY: str = "" JWT_SECRET_KEY: str = ""
ACCESS_TOKEN_EXPIRE_MINUTES: int = 3600 ACCESS_TOKEN_EXPIRE_MINUTES: int = 3600

View File

@ -1,17 +1,16 @@
import os
from pydantic_settings import BaseSettings from dataclasses import dataclass
from .app_settings import app_settings from .app_settings import app_settings
class LogSettings(BaseSettings): @dataclass
LOG_LEVEL: str = "DEBUG" class LogSettings:
LOG_BASE_PATH: str = app_settings.LOG_BASE_PATH LOG_PATH_BASE: str = app_settings.LOG_BASE_PATH
LOG_PATH: str = LOG_BASE_PATH + '/' + app_settings.BACKEND_LOG_FILE_NAME + '.log' LOG_RETENTION: str = os.environ.get("LOG_RETENTION", "30 days")
LOG_RETENTION: str = "14 days" LOG_ROTATION: str = os.environ.get("LOG_ROTATION", "00:00") # midnight
LOG_ROTATION: str = "00:00" # mid night MAX_BACKUP_FILES: int = int(os.environ.get("LOG_BACKUP_FILES", 5))
LOG_ROTATION_BYTES: int = int(os.environ.get("LOG_ROTATION_BYTES", 10 * 1024 * 1024)) # 10 MB
class Config: APP_NAME: str = app_settings.APP_NAME
env_file = ".log.env" ENVIRONMENT: str = app_settings.APP_ENV
env_file_encoding = "utf-8"
log_settings = LogSettings() log_settings = LogSettings()

View File

@ -1,14 +1,12 @@
from .base_logger import LoggerBase from .base_logger import LoggerBase
from common.config.app_settings import app_settings from common.config.app_settings import app_settings
import json
class ApplicationLogger(LoggerBase): class ApplicationLogger(LoggerBase):
def __init__(self, application_activities: dict[str, any] = {}) -> None: def __init__(self, application_activities: dict[str, any] = {}) -> None:
extra_fields = {} extra_fileds = {}
if application_activities: if application_activities:
extra_fields.update(application_activities) extra_fileds.update(application_activities)
super().__init__( super().__init__(
logger_name=app_settings.APPLICATION_ACTIVITY_LOG, logger_name=app_settings.APPLICATION_ACTIVITY_LOG,
extra_fields=extra_fields, extra_fileds=extra_fileds,
) )

View File

@ -1,140 +1,136 @@
from loguru import logger as guru_logger from loguru import logger as guru_logger
from common.config.log_settings import log_settings from common.config.log_settings import log_settings
from typing import Dict, Any from typing import Dict, Any, Optional
import socket import socket
import json
import threading import threading
import os
import sys
import inspect
import logging
from common.log.json_sink import JsonSink
class LoggerBase: class LoggerBase:
binded_loggers = {} # Stores logger instances binded_loggers = {}
loguru_sinks_added = set() # Tracks added log sinks
logger_lock = threading.Lock() logger_lock = threading.Lock()
def __init__(self, logger_name: str, extra_fields: Dict[str, Any] = None) -> None: def __init__(self, logger_name: str, extra_fileds: dict[str, any]) -> None:
self.__logger_name = logger_name self.__logger_name = logger_name
self.extra_fields = extra_fields or {} self.extra_fileds = extra_fileds
with LoggerBase.logger_lock: with LoggerBase.logger_lock:
# ✅ **If already created, reuse it** to prevent duplicates
if self.__logger_name in LoggerBase.binded_loggers: if self.__logger_name in LoggerBase.binded_loggers:
self.logger = LoggerBase.binded_loggers[self.__logger_name] self.logger = LoggerBase.binded_loggers[self.__logger_name]
return return
log_filename = f"{log_settings.LOG_BASE_PATH}/{self.__logger_name}.log" log_filename = f"{log_settings.LOG_PATH_BASE}/{self.__logger_name}.log"
log_level = "INFO"
rotation_bytes = int(log_settings.LOG_ROTATION_BYTES or 10 * 1024 * 1024)
# ✅ **Ensure Loguru sink is added only once** guru_logger.remove()
if log_filename not in LoggerBase.loguru_sinks_added:
guru_logger.add( file_sink = JsonSink(
sink=log_filename, log_file_path=log_filename,
level="INFO", rotation_size_bytes=rotation_bytes,
retention=log_settings.LOG_RETENTION, max_backup_files=log_settings.MAX_BACKUP_FILES
rotation=log_settings.LOG_ROTATION, )
format="{message}", guru_logger.add(
serialize=True, sink=file_sink,
filter=lambda record: "extra" in record level=log_level,
and "topic" in record["extra"] filter=lambda record: record["extra"].get("topic") == self.__logger_name,
and record["extra"]["topic"] == self.__logger_name, )
guru_logger.add(
sink=sys.stderr,
level=log_level,
format="{level} - {time:YYYY-MM-DD HH:mm:ss} - <{extra[log_file]}:{extra[log_line]}> - {extra[properties_str]} - {message}",
filter=lambda record: record["extra"].get("topic") == self.__logger_name,
) )
LoggerBase.loguru_sinks_added.add(log_filename) # ✅ Mark as added
host_name = socket.gethostname() host_name = socket.gethostname()
host_ip = socket.gethostbyname(host_name) host_ip = socket.gethostbyname(host_name)
# ✅ Bind the logger with topic and extra fields
self.logger = guru_logger.bind( self.logger = guru_logger.bind(
topic=self.__logger_name, topic=self.__logger_name,
host_ip=host_ip, host_ip=host_ip,
host_name=host_name, host_name=host_name,
**self.extra_fields, # Include additional metadata app=log_settings.APP_NAME,
env=log_settings.ENVIRONMENT,
) )
with LoggerBase.logger_lock:
# ✅ Store reference to prevent duplicate instances
LoggerBase.binded_loggers[self.__logger_name] = self.logger LoggerBase.binded_loggers[self.__logger_name] = self.logger
async def log_event( def _get_log_context(self) -> dict:
self, frame = inspect.currentframe().f_back.f_back
sender_id: str, filename = os.path.basename(frame.f_code.co_filename)
receiver_id: str, lineno = frame.f_lineno
subject: str, return {"log_file": filename, "log_line": lineno}
event: str,
properties: Dict[str, Any], def _prepare_properties(self, properties: Optional[Dict[str, Any]]) -> Dict[str, Any]:
text: str = "", props = {} if properties is None else properties.copy()
) -> None: props_str = json.dumps(props, ensure_ascii=False) if props else "{}"
local_logger = self.logger.bind( return props, props_str
sender_id=sender_id,
receiver_id=receiver_id, async def log_event(self, sender_id: str, receiver_id: str, subject: str, event: str, properties: dict[str, any], text: str = "") -> None:
subject=subject, props, props_str = self._prepare_properties(properties)
event=event, context = self._get_log_context()
properties=properties, local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event=event, properties=props, properties_str=props_str, **context)
)
local_logger.info(text) local_logger.info(text)
async def log_exception( async def log_exception(self, sender_id: str, receiver_id: str, subject: str, exception: Exception, text: str = "", properties: dict[str, any] = None) -> None:
self, props, props_str = self._prepare_properties(properties)
sender_id: str, context = self._get_log_context()
receiver_id: str, local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="exception", properties=props, properties_str=props_str, exception=exception, **context)
subject: str,
exception: Exception,
text: str = "",
properties: Dict[str, Any] = None,
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event="exception",
properties=properties,
exception=exception,
)
local_logger.exception(text) local_logger.exception(text)
async def log_info( async def log_info(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None:
self, props, props_str = self._prepare_properties(properties)
sender_id: str, context = self._get_log_context()
receiver_id: str, local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="information", properties=props, properties_str=props_str, **context)
subject: str,
text: str = "",
properties: Dict[str, Any] = None,
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event="information",
properties=properties,
)
local_logger.info(text) local_logger.info(text)
async def log_warning( async def log_warning(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None:
self, props, props_str = self._prepare_properties(properties)
sender_id: str, context = self._get_log_context()
receiver_id: str, local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="warning", properties=props, properties_str=props_str, **context)
subject: str,
text: str = "",
properties: Dict[str, Any] = None,
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event="warning",
properties=properties,
)
local_logger.warning(text) local_logger.warning(text)
async def log_error( async def log_error(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None:
self, props, props_str = self._prepare_properties(properties)
sender_id: str, context = self._get_log_context()
receiver_id: str, local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="error", properties=props, properties_str=props_str, **context)
subject: str,
text: str = "",
properties: Dict[str, Any] = None,
) -> None:
local_logger = self.logger.bind(
sender_id=sender_id,
receiver_id=receiver_id,
subject=subject,
event="error",
properties=properties,
)
local_logger.error(text) local_logger.error(text)
@staticmethod
def configure_uvicorn_logging():
print("📢 Setting up uvicorn logging interception...")
# Intercept logs from these loggers
intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"]
class InterceptHandler(logging.Handler):
def emit(self, record):
level = (
guru_logger.level(record.levelname).name
if guru_logger.level(record.levelname, None)
else record.levelno
)
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
guru_logger.opt(depth=depth, exception=record.exc_info).log(
level,
f"[{record.name}] {record.getMessage()}",
)
# Replace default handlers
logging.root.handlers.clear()
logging.root.setLevel(logging.INFO)
logging.root.handlers = [InterceptHandler()]
# Configure specific uvicorn loggers
for logger_name in intercept_loggers:
logging_logger = logging.getLogger(logger_name)
logging_logger.handlers.clear() # Remove default handlers
logging_logger.propagate = True # Ensure propagation through Loguru

View File

@ -0,0 +1,25 @@
from .base_logger import LoggerBase
from common.config.app_settings import app_settings
import json
class BusinessMetricLogger(LoggerBase):
def __init__(self, business_metrics: dict[str, any] = {}) -> None:
extra_fileds = {}
if business_metrics:
extra_fileds.update(business_metrics)
super().__init__(
logger_name=app_settings.BUSINESS_METRIC_LOG,
extra_fileds=extra_fileds,
)
async def log_metrics(self, business_metrics: dict[str, any] = {}) -> None:
return await super().log_event(
sender_id="business_metric_manager",
receiver_id="business_metric_logger",
subject="metrics",
event="logging",
properties=business_metrics,
text="business metric logged"
)

View File

@ -0,0 +1,85 @@
import json
import datetime
import traceback
from pathlib import Path
from typing import Optional
class JsonSink:
def __init__(
self,
log_file_path: str,
rotation_size_bytes: int = 10 * 1024 * 1024,
max_backup_files: int = 5,
):
self.log_file_path = Path(log_file_path)
self.rotation_size = rotation_size_bytes
self.max_backup_files = max_backup_files
self._open_log_file()
def _open_log_file(self):
# ensure the parent directory exists
parent_dir = self.log_file_path.parent
if not parent_dir.exists():
parent_dir.mkdir(parents=True, exist_ok=True)
self.log_file = self.log_file_path.open("a", encoding="utf-8")
def _should_rotate(self) -> bool:
return self.log_file_path.exists() and self.log_file_path.stat().st_size >= self.rotation_size
def _rotate(self):
self.log_file.close()
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
rotated_path = self.log_file_path.with_name(f"{self.log_file_path.stem}_{timestamp}{self.log_file_path.suffix}")
self.log_file_path.rename(rotated_path)
self._cleanup_old_backups()
self._open_log_file()
def _cleanup_old_backups(self):
parent = self.log_file_path.parent
stem = self.log_file_path.stem
suffix = self.log_file_path.suffix
backup_files = sorted(
parent.glob(f"{stem}_*{suffix}"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
for old_file in backup_files[self.max_backup_files:]:
try:
old_file.unlink()
except Exception as e:
print(f"Failed to delete old backup {old_file}: {e}")
def __call__(self, message):
record = message.record
if self._should_rotate():
self._rotate()
log_entry = {
"level": record["level"].name.lower(),
"timestamp": int(record["time"].timestamp() * 1000),
"text": record["message"],
"fields": record["extra"].get("properties", {}),
"context": {
"app": record["extra"].get("app"),
"env": record["extra"].get("env"),
"log_file": record["extra"].get("log_file"),
"log_line": record["extra"].get("log_line"),
"topic": record["extra"].get("topic"),
"sender_id": record["extra"].get("sender_id"),
"receiver_id": record["extra"].get("receiver_id"),
"subject": record["extra"].get("subject"),
"event": record["extra"].get("event"),
"host_ip": record["extra"].get("host_ip"),
"host_name": record["extra"].get("host_name"),
},
"stacktrace": None
}
if record["exception"]:
exc_type, exc_value, exc_tb = record["exception"]
log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb)
self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
self.log_file.flush()

View File

@ -6,9 +6,9 @@ import json
class UserLogger(LoggerBase): class UserLogger(LoggerBase):
def __init__(self, user_activities: dict[str, any] = {}) -> None: def __init__(self, user_activities: dict[str, any] = {}) -> None:
extra_fields = {} extra_fileds = {}
if user_activities: if user_activities:
extra_fields.update(user_activities) extra_fileds.update(user_activities)
super().__init__( super().__init__(
logger_name=app_settings.USER_ACTIVITY_LOG, extra_fields=extra_fields logger_name=app_settings.USER_ACTIVITY_LOG, extra_fileds=extra_fileds
) )

View File

@ -1,47 +1,8 @@
import logging
import sys
from loguru import logger as guru_logger from loguru import logger as guru_logger
from common.log.base_logger import LoggerBase
def register_logger(): def register_logger():
print("📢 Setting up logging interception...") print("📢 Setting up logging interception...")
LoggerBase.configure_uvicorn_logging()
# 🔴 **Ensure Uvicorn Logs Are Captured**
intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"]
class InterceptHandler(logging.Handler):
def emit(self, record):
level = (
guru_logger.level(record.levelname).name
if guru_logger.level(record.levelname, None)
else record.levelno
)
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
guru_logger.opt(depth=depth, exception=record.exc_info).log(
level,
f"[{record.name}] {record.getMessage()}",
)
# 🔴 **Replace Existing Loggers with Interception**
logging.root.handlers.clear()
logging.root.setLevel(logging.INFO)
logging.root.handlers = [InterceptHandler()]
for logger_name in intercept_loggers:
logging_logger = logging.getLogger(logger_name)
logging_logger.handlers.clear() # Remove Uvicorn default handlers
logging_logger.propagate = True # ✅ Ensure they propagate through Loguru
# 🔴 **Redirect stdout/stderr to Loguru (Keep Green Timestamps)**
guru_logger.remove()
guru_logger.add(
sys.stdout,
level="INFO",
format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | {level} | {message}",
)
print("✅ Logging interception complete. Logs are formatted and deduplicated!") print("✅ Logging interception complete. Logs are formatted and deduplicated!")