import json import datetime import traceback from pathlib import Path from typing import Optional class JsonSink: def __init__( self, log_file_path: str, rotation_size_bytes: int = 10 * 1024 * 1024, max_backup_files: int = 5, ): self.log_file_path = Path(log_file_path) self.rotation_size = rotation_size_bytes self.max_backup_files = max_backup_files self._open_log_file() def _open_log_file(self): # ensure the parent directory exists parent_dir = self.log_file_path.parent if not parent_dir.exists(): parent_dir.mkdir(parents=True, exist_ok=True) self.log_file = self.log_file_path.open("a", encoding="utf-8") def _should_rotate(self) -> bool: return self.log_file_path.exists() and self.log_file_path.stat().st_size >= self.rotation_size def _rotate(self): self.log_file.close() timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S") rotated_path = self.log_file_path.with_name(f"{self.log_file_path.stem}_{timestamp}{self.log_file_path.suffix}") self.log_file_path.rename(rotated_path) self._cleanup_old_backups() self._open_log_file() def _cleanup_old_backups(self): parent = self.log_file_path.parent stem = self.log_file_path.stem suffix = self.log_file_path.suffix backup_files = sorted( parent.glob(f"{stem}_*{suffix}"), key=lambda p: p.stat().st_mtime, reverse=True, ) for old_file in backup_files[self.max_backup_files:]: try: old_file.unlink() except Exception as e: print(f"Failed to delete old backup {old_file}: {e}") def __call__(self, message): record = message.record if self._should_rotate(): self._rotate() log_entry = { "level": record["level"].name.lower(), "timestamp": int(record["time"].timestamp() * 1000), "text": record["message"], "fields": record["extra"].get("properties", {}), "context": { "app": record["extra"].get("app"), "env": record["extra"].get("env"), "log_file": record["extra"].get("log_file"), "log_line": record["extra"].get("log_line"), "topic": record["extra"].get("topic"), "sender_id": record["extra"].get("sender_id"), "receiver_id": record["extra"].get("receiver_id"), "subject": record["extra"].get("subject"), "event": record["extra"].get("event"), "host_ip": record["extra"].get("host_ip"), "host_name": record["extra"].get("host_name"), }, "stacktrace": None } if record["exception"]: exc_type, exc_value, exc_tb = record["exception"] log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb) self.log_file.write(json.dumps(log_entry, ensure_ascii=False, default=str) + "\n") self.log_file.flush()