from webapi.config.site_settings import site_settings
from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from pymongo.errors import ConfigurationError  # raised when a URI names no default database
from backend.models.models import MessageTemplateDoc, EmailSenderDoc, EmailSendStatusDoc, EmailTrackingDoc, EmailBounceDoc, UsageLogDoc
from common.config.app_settings import app_settings
from common.log.module_logger import ModuleLogger
import asyncio
from collections import OrderedDict
from typing import Optional, Union
import os

# Global variables for database management
MAIN_CLIENT: Optional[AsyncIOMotorClient] = None
TENANT_CACHE: Optional['TenantDBCache'] = None

# Define document models
document_models = [
    MessageTemplateDoc,
    EmailSenderDoc,
    EmailSendStatusDoc,
    EmailTrackingDoc,
    EmailBounceDoc,
    UsageLogDoc
]

tenant_document_models = [
    MessageTemplateDoc,
    EmailSenderDoc,
    EmailSendStatusDoc
]


class TenantDBCache:
    """
    Enhanced tenant database cache that also tracks Beanie initialization.

    Maps product_id -> (AsyncIOMotorClient, AsyncIOMotorDatabase, beanie_initialized: bool).
    Uses main_db.tenant_doc to resolve each tenant's mongodb_uri, and caches
    clients/databases with LRU eviction plus the Beanie-initialized flag.
    See the illustrative usage sketch after this class.
    """

    def __init__(self, main_db: AsyncIOMotorDatabase, max_size: int = 64):
        self.main_db = main_db
        self.max_size = max_size
        self._cache: "OrderedDict[str, tuple[AsyncIOMotorClient, AsyncIOMotorDatabase, bool]]" = OrderedDict()
        self._locks: dict[str, asyncio.Lock] = {}
        self._global_lock = asyncio.Lock()
        self.module_logger = ModuleLogger(sender_id="TenantDBCache")

    async def get_initialized_db(self, product_id: str) -> AsyncIOMotorDatabase:
        """Get the tenant database with Beanie already initialized"""
        await self.module_logger.log_info(f"🔍 Starting database retrieval for product_id: {product_id}")

        # fast-path: lock-free return only when Beanie is already initialized for this
        # tenant; otherwise fall through to the per-tenant lock below so that
        # init_beanie is never run concurrently for the same database
        cached = self._cache.get(product_id)
        if cached:
            _, db, beanie_initialized = cached
            await self.module_logger.log_info(f"📦 Found cached database for {product_id}, beanie_initialized: {beanie_initialized}")
            self._cache.move_to_end(product_id)

            if beanie_initialized:
                await self.module_logger.log_info(f"✅ Returning cached database with Beanie already initialized for {product_id}")
                return db

        # double-checked under per-tenant lock
        await self.module_logger.log_info(f"🔒 Acquiring lock for product_id: {product_id}")
        lock = self._locks.setdefault(product_id, asyncio.Lock())
        async with lock:
            await self.module_logger.log_info(f"🔓 Lock acquired for product_id: {product_id}")

            cached = self._cache.get(product_id)
            if cached:
                client, db, beanie_initialized = cached
                await self.module_logger.log_info(f"📦 Double-check found cached database for {product_id}, beanie_initialized: {beanie_initialized}")
                self._cache.move_to_end(product_id)

                if beanie_initialized:
                    await self.module_logger.log_info(f"✅ Returning cached database with Beanie already initialized for {product_id}")
                    return db
                else:
                    # Initialize Beanie if not done yet
                    await self.module_logger.log_info(f"🔧 Initializing Beanie for cached database {product_id} (double-check)")
                    await init_beanie(database=db, document_models=tenant_document_models)
                    await self.module_logger.log_info(f"✅ Beanie initialization completed for {product_id} (double-check)")

                    # Update cache with beanie_initialized = True
                    self._cache[product_id] = (client, db, True)
                    return db

            # Create new tenant connection - use a raw MongoDB query since there is no TenantDoc model
            await self.module_logger.log_info(f"🔍 Creating new tenant connection for product_id: {product_id}")
            tenant = await self.main_db["tenant_doc"].find_one({"product_id": product_id})
            if not tenant:
                await self.module_logger.log_error(f"❌ Tenant {product_id} does not exist in main database")
                raise ValueError(f"Tenant {product_id} does not exist")
            if tenant.get("status") != "active":
                await self.module_logger.log_error(f"❌ Tenant {product_id} is not active, status: {tenant.get('status')}")
                raise ValueError(f"Tenant {product_id} is not active")

            uri = tenant["mongodb_uri"]
            await self.module_logger.log_info(f"🔗 Creating Motor client for tenant {product_id} with URI: {uri[:20]}...")
            client = AsyncIOMotorClient(uri, minPoolSize=3, maxPoolSize=20, serverSelectionTimeoutMS=10000)

            # robust db name resolution: get_default_database() raises ConfigurationError
            # when the URI (including mongodb+srv) names no database, so handle both
            # outcomes explicitly
            try:
                default_db = client.get_default_database()
            except ConfigurationError:
                default_db = None
            if default_db is not None:
                db = default_db
                await self.module_logger.log_info(f"📊 Using default database for tenant {product_id}: {db.name}")
            else:
                # fallback: if the URI names no database, default to 'admin' to avoid
                # surprises. Customize as needed.
                db = client["admin"]
                await self.module_logger.log_info(f"📊 Using fallback database 'admin' for tenant {product_id}")

            # Initialize Beanie for this tenant database
            await self.module_logger.log_info(f"🔧 Initializing Beanie for new tenant database {product_id}")
            await init_beanie(database=db, document_models=tenant_document_models)
            await self.module_logger.log_info(f"✅ Beanie initialization completed for new tenant database {product_id}")

            # LRU put with beanie_initialized = True
            await self._lru_put(product_id, (client, db, True))
            await self.module_logger.log_info(f"💾 Tenant database {product_id} cached successfully with Beanie initialized")
            return db

    async def get_main_db_initialized(self) -> AsyncIOMotorDatabase:
        """Get the main database with Beanie initialized for the business document models"""
        await self.module_logger.log_info("🔧 Initializing Beanie for main database with business models")
        # Re-initialize Beanie for the main database with the business document models
        await init_beanie(database=self.main_db, document_models=document_models)
        await self.module_logger.log_info("✅ Beanie initialization completed for main database")
        return self.main_db

    async def get_db(self, product_id: str) -> AsyncIOMotorDatabase:
        """Legacy compatibility method: return the cached database if present (without
        re-running Beanie initialization); otherwise fall back to get_initialized_db()"""
        cached = self._cache.get(product_id)
        if cached:
            self._cache.move_to_end(product_id)
            return cached[1]

        # Not cached yet: delegate to get_initialized_db and return the database part
        db = await self.get_initialized_db(product_id)
        return db

    async def _lru_put(self, key: str, value: tuple[AsyncIOMotorClient, AsyncIOMotorDatabase, bool]):
        async with self._global_lock:
            await self.module_logger.log_info(f"📝 Adding {key} to cache, current cache size: {len(self._cache)}")
            self._cache[key] = value
            self._cache.move_to_end(key)
            if len(self._cache) > self.max_size:
                old_key, (old_client, _, _) = self._cache.popitem(last=False)
                await self.module_logger.log_info(f"🗑️ Cache full, removing LRU tenant: {old_key}")
                try:
                    old_client.close()
                    await self.module_logger.log_info(f"🔌 Closed connection for evicted tenant: {old_key}")
                except Exception as e:
                    await self.module_logger.log_error(f"❌ Error closing connection for {old_key}: {str(e)}")
                self._locks.pop(old_key, None)

    async def aclose(self):
        async with self._global_lock:
            await self.module_logger.log_info(f"🔌 Closing all cached connections, count: {len(self._cache)}")
            for key, (client, _, _) in self._cache.items():
                try:
                    client.close()
                    await self.module_logger.log_info(f"✅ Closed connection for tenant: {key}")
                except Exception as e:
                    await self.module_logger.log_error(f"❌ Error closing connection for {key}: {str(e)}")
            self._cache.clear()
            self._locks.clear()
            await self.module_logger.log_info("🧹 Tenant cache cleared successfully")


def register(app):
    """Register database-related configurations and setup"""
    app.debug = site_settings.DEBUG
    app.title = site_settings.NAME

    @app.on_event("startup")
    async def start_database():
        await initiate_database(app)

    @app.on_event("shutdown")
    async def shutdown_database():
        await cleanup_database()
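

# Illustrative wiring sketch (assumption: `app` is a FastAPI application, which the
# on_event hooks above imply). Not called anywhere in this module; shown only to make
# the intended setup explicit.
def example_create_app():
    from fastapi import FastAPI  # local import keeps the sketch optional

    app = FastAPI()
    register(app)  # attaches startup/shutdown handlers and site settings
    return app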


async def initiate_database(app):
    """Initialize main database and tenant cache"""
    global MAIN_CLIENT, TENANT_CACHE

    module_logger = ModuleLogger(sender_id="DatabaseInit")

    await module_logger.log_info("🚀 Starting database initialization")

    # 1) Create main/catalog client + DB
    await module_logger.log_info(f"🔗 Creating main MongoDB client with URI: {app_settings.MONGODB_URI[:20]}...")
    MAIN_CLIENT = AsyncIOMotorClient(app_settings.MONGODB_URI)
    main_db = MAIN_CLIENT[app_settings.MONGODB_NAME]
    await module_logger.log_info(f"📊 Main database created: {app_settings.MONGODB_NAME}")

    # 2) Initialize Beanie for the main DB with the business document models
    await module_logger.log_info(f"🔧 Initializing Beanie for main database with {len(document_models)} document models")
    await init_beanie(database=main_db, document_models=document_models)
    await module_logger.log_info("✅ Beanie initialization completed for main database")

    # 3) Create the tenant cache that uses main_db lookups to resolve product_id -> tenant db
    max_cache_size = getattr(app_settings, 'TENANT_CACHE_MAX', 64)
    await module_logger.log_info(f"📦 Creating tenant cache with max size: {max_cache_size}")
    TENANT_CACHE = TenantDBCache(main_db, max_size=max_cache_size)

    # 4) Store on app state for middleware to access
    app.state.main_db = main_db
    app.state.tenant_cache = TENANT_CACHE

    await module_logger.log_info("✅ Database and tenant cache initialized successfully")
    print("Database and tenant cache initialized successfully")


async def cleanup_database():
    """Cleanup database connections and cache"""
    global MAIN_CLIENT, TENANT_CACHE

    module_logger = ModuleLogger(sender_id="DatabaseCleanup")
    await module_logger.log_info("🧹 Starting database cleanup")

    if TENANT_CACHE:
        await module_logger.log_info("📦 Closing tenant cache")
        await TENANT_CACHE.aclose()

    if MAIN_CLIENT:
        await module_logger.log_info("🔌 Closing main database client")
        MAIN_CLIENT.close()

    await module_logger.log_info("✅ Database connections closed successfully")
    print("Database connections closed successfully")