From f3a5f6321af92658f07604d9b65250b3641da8f8 Mon Sep 17 00:00:00 2001
From: YuehuCao
Date: Mon, 22 Sep 2025 10:57:41 +0800
Subject: [PATCH] refactor(cache): cache and recall the tenant database

---
 .../notification/webapi/providers/database.py | 237 +++++++++++++++++-
 1 file changed, 234 insertions(+), 3 deletions(-)

diff --git a/apps/notification/webapi/providers/database.py b/apps/notification/webapi/providers/database.py
index f259607..9fb9fe4 100644
--- a/apps/notification/webapi/providers/database.py
+++ b/apps/notification/webapi/providers/database.py
@@ -1,10 +1,241 @@
 from webapi.config.site_settings import site_settings
+from beanie import init_beanie
+from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
+from pymongo.errors import ConfigurationError
+from backend.models.models import MessageTemplateDoc, EmailSenderDoc, EmailSendStatusDoc, EmailTrackingDoc, EmailBounceDoc, UsageLogDoc
+from common.config.app_settings import app_settings
+from common.log.module_logger import ModuleLogger
+import asyncio
+from collections import OrderedDict
+from typing import Optional
+
+
+# Module-level handles shared by the startup and shutdown hooks
+MAIN_CLIENT: Optional[AsyncIOMotorClient] = None
+TENANT_CACHE: Optional['TenantDBCache'] = None
+
+# Document models registered on the main database
+document_models = [
+    MessageTemplateDoc,
+    EmailSenderDoc,
+    EmailSendStatusDoc,
+    EmailTrackingDoc,
+    EmailBounceDoc,
+    UsageLogDoc
+]
+# Subset of models registered on each tenant database
+tenant_document_models = [
+    MessageTemplateDoc,
+    EmailSenderDoc,
+    EmailSendStatusDoc
+]
+
+
+class TenantDBCache:
+    """
+    Tenant database cache that also tracks Beanie initialization.
+
+    Maps product_id -> (AsyncIOMotorClient, AsyncIOMotorDatabase, beanie_initialized).
+    Resolves each tenant's mongodb_uri from main_db.tenant_doc and keeps the
+    resulting clients/databases in an LRU cache together with their Beanie state.
+    """
+
+    def __init__(self, main_db: AsyncIOMotorDatabase, max_size: int = 64):
+        self.main_db = main_db
+        self.max_size = max_size
+        self._cache: "OrderedDict[str, tuple[AsyncIOMotorClient, AsyncIOMotorDatabase, bool]]" = OrderedDict()
+        self._locks: dict[str, asyncio.Lock] = {}
+        self._global_lock = asyncio.Lock()
+        self.module_logger = ModuleLogger(sender_id="TenantDBCache")
+
+    async def get_initialized_db(self, product_id: str) -> AsyncIOMotorDatabase:
+        """Get the tenant database with Beanie already initialized"""
+        await self.module_logger.log_info(f"🔍 Starting database retrieval for product_id: {product_id}")
+
+        # fast path: serve from the cache without taking any lock
+        cached = self._cache.get(product_id)
+        if cached:
+            client, db, beanie_initialized = cached
+            await self.module_logger.log_info(f"📦 Found cached database for {product_id}, beanie_initialized: {beanie_initialized}")
+            self._cache.move_to_end(product_id)
+
+            if beanie_initialized:
+                await self.module_logger.log_info(f"✅ Returning cached database with Beanie already initialized for {product_id}")
+                return db
+            else:
+                # Initialize Beanie if it has not been done yet
+                await self.module_logger.log_info(f"🔧 Initializing Beanie for cached database {product_id}")
+                await init_beanie(database=db, document_models=tenant_document_models)
+                await self.module_logger.log_info(f"✅ Beanie initialization completed for {product_id}")
+
+                # Update the cache entry with beanie_initialized = True
+                self._cache[product_id] = (client, db, True)
+                return db
+
+        # slow path: double-checked under a per-tenant lock
+        await self.module_logger.log_info(f"🔒 Acquiring lock for product_id: {product_id}")
+        lock = self._locks.setdefault(product_id, asyncio.Lock())
+        async with lock:
+            await self.module_logger.log_info(f"🔓 Lock acquired for product_id: {product_id}")
+
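+            # Re-check the cache while holding the per-tenant lock: another coroutine
+            # may have resolved this tenant while we were waiting, and re-checking
+            # avoids a second tenant_doc lookup and a duplicate Motor client.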
+            cached = self._cache.get(product_id)
+            if cached:
+                client, db, beanie_initialized = cached
+                await self.module_logger.log_info(f"📦 Double-check found cached database for {product_id}, beanie_initialized: {beanie_initialized}")
+                self._cache.move_to_end(product_id)
+
+                if beanie_initialized:
+                    await self.module_logger.log_info(f"✅ Returning cached database with Beanie already initialized for {product_id}")
+                    return db
+                else:
+                    # Initialize Beanie if it has not been done yet
+                    await self.module_logger.log_info(f"🔧 Initializing Beanie for cached database {product_id} (double-check)")
+                    await init_beanie(database=db, document_models=tenant_document_models)
+                    await self.module_logger.log_info(f"✅ Beanie initialization completed for {product_id} (double-check)")
+
+                    # Update the cache entry with beanie_initialized = True
+                    self._cache[product_id] = (client, db, True)
+                    return db
+
+            # Create a new tenant connection - use a raw MongoDB query since we don't have a TenantDoc model
+            await self.module_logger.log_info(f"🔍 Creating new tenant connection for product_id: {product_id}")
+            tenant = await self.main_db["tenant_doc"].find_one({"product_id": product_id})
+            if not tenant:
+                await self.module_logger.log_error(f"❌ Tenant {product_id} does not exist in main database")
+                raise ValueError(f"Tenant {product_id} does not exist")
+            if tenant.get("status") != "active":
+                await self.module_logger.log_error(f"❌ Tenant {product_id} is not active, status: {tenant.get('status')}")
+                raise ValueError(f"Tenant {product_id} is not active")
+
+            uri = tenant["mongodb_uri"]
+            await self.module_logger.log_info(f"🔗 Creating Motor client for tenant {product_id} with URI: {uri[:20]}...")
+            client = AsyncIOMotorClient(uri, minPoolSize=3, maxPoolSize=20, serverSelectionTimeoutMS=10000)
+
+            # Resolve the database named in the URI; get_default_database() raises
+            # ConfigurationError when the URI carries no database name, so fall back
+            # to a conventional default instead of surfacing the error.
+            try:
+                db = client.get_default_database()
+                await self.module_logger.log_info(f"📊 Using default database for tenant {product_id}: {db.name}")
+            except ConfigurationError:
+                # No database name in the URI: default to 'admin'. Customize as needed.
+                db = client["admin"]
+                await self.module_logger.log_info(f"📊 Using fallback database 'admin' for tenant {product_id}")
+
+            # Initialize Beanie for this tenant database
+            await self.module_logger.log_info(f"🔧 Initializing Beanie for new tenant database {product_id}")
+            await init_beanie(database=db, document_models=tenant_document_models)
+            await self.module_logger.log_info(f"✅ Beanie initialization completed for new tenant database {product_id}")
+
+            # LRU put with beanie_initialized = True
+            await self._lru_put(product_id, (client, db, True))
+            await self.module_logger.log_info(f"💾 Tenant database {product_id} cached successfully with Beanie initialized")
+            return db
+
+    async def get_main_db_initialized(self) -> AsyncIOMotorDatabase:
+        """Get the main database with Beanie re-initialized for the business document models"""
+        await self.module_logger.log_info("🔧 Initializing Beanie for main database with business models")
+        # Re-initialize Beanie for the main database with the business models
+        await init_beanie(database=self.main_db, document_models=document_models)
+        await self.module_logger.log_info("✅ Beanie initialization completed for main database")
+        return self.main_db
+
+    async def get_db(self, product_id: str) -> AsyncIOMotorDatabase:
+        """Legacy accessor kept for compatibility: returns the cached database if present,
+        otherwise falls back to get_initialized_db()."""
+        cached = self._cache.get(product_id)
+        if cached:
+            self._cache.move_to_end(product_id)
+            return cached[1]
+
+        return await self.get_initialized_db(product_id)
+
+    async def _lru_put(self, key: str, value: tuple[AsyncIOMotorClient, AsyncIOMotorDatabase, bool]):
+        async with self._global_lock:
+            await self.module_logger.log_info(f"📝 Adding {key} to cache, current cache size: {len(self._cache)}")
+            self._cache[key] = value
+            self._cache.move_to_end(key)
+            if len(self._cache) > self.max_size:
+                old_key, (old_client, _, _) = self._cache.popitem(last=False)
+                await self.module_logger.log_info(f"🗑️ Cache full, removing LRU tenant: {old_key}")
+                try:
+                    old_client.close()
+                    await self.module_logger.log_info(f"🔌 Closed connection for evicted tenant: {old_key}")
+                except Exception as e:
+                    await self.module_logger.log_error(f"❌ Error closing connection for {old_key}: {str(e)}")
+                self._locks.pop(old_key, None)
+
+    async def aclose(self):
+        async with self._global_lock:
+            await self.module_logger.log_info(f"🔌 Closing all cached connections, count: {len(self._cache)}")
+            for key, (client, _, _) in self._cache.items():
+                try:
+                    client.close()
+                    await self.module_logger.log_info(f"✅ Closed connection for tenant: {key}")
+                except Exception as e:
+                    await self.module_logger.log_error(f"❌ Error closing connection for {key}: {str(e)}")
+            self._cache.clear()
+            self._locks.clear()
+            await self.module_logger.log_info("🧹 Tenant cache cleared successfully")
+
+
 def register(app):
-    """Register database-related configurations"""
+    """Register database-related configurations and setup"""
     app.debug = site_settings.DEBUG
     app.title = site_settings.NAME
-    # Database initialization is now handled by TenantDBConnectionMiddleware
-    # to support per-request tenant database switching
\ No newline at end of file
+
+    @app.on_event("startup")
+    async def start_database():
+        await initiate_database(app)
+
+    @app.on_event("shutdown")
+    async def shutdown_database():
+        await cleanup_database()
+
+
+async def initiate_database(app):
+    """Initialize the main database and the tenant cache"""
+    global MAIN_CLIENT, TENANT_CACHE
+
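+    # The module-level handles let cleanup_database() close the same client and
+    # tenant cache on application shutdown.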
+    module_logger = ModuleLogger(sender_id="DatabaseInit")
+
+    await module_logger.log_info("🚀 Starting database initialization")
+
+    # 1) Create the main/catalog client and database
+    await module_logger.log_info(f"🔗 Creating main MongoDB client with URI: {app_settings.MONGODB_URI[:20]}...")
+    MAIN_CLIENT = AsyncIOMotorClient(app_settings.MONGODB_URI)
+    main_db = MAIN_CLIENT[app_settings.MONGODB_NAME]
+    await module_logger.log_info(f"📊 Main database created: {app_settings.MONGODB_NAME}")
+
+    # 2) Initialize Beanie for the main DB with the business document models
+    await module_logger.log_info(f"🔧 Initializing Beanie for main database with {len(document_models)} document models")
+    await init_beanie(database=main_db, document_models=document_models)
+    await module_logger.log_info("✅ Beanie initialization completed for main database")
+
+    # 3) Create the tenant cache that uses main_db lookups to resolve product_id -> tenant db
+    max_cache_size = getattr(app_settings, 'TENANT_CACHE_MAX', 64)
+    await module_logger.log_info(f"📦 Creating tenant cache with max size: {max_cache_size}")
+    TENANT_CACHE = TenantDBCache(main_db, max_size=max_cache_size)
+
+    # 4) Store on the app state for middleware to access
+    app.state.main_db = main_db
+    app.state.tenant_cache = TENANT_CACHE
+
+    await module_logger.log_info("✅ Database and tenant cache initialized successfully")
+    print("Database and tenant cache initialized successfully")
+
+
+async def cleanup_database():
+    """Clean up database connections and the tenant cache"""
+    global MAIN_CLIENT, TENANT_CACHE
+
+    module_logger = ModuleLogger(sender_id="DatabaseCleanup")
+    await module_logger.log_info("🧹 Starting database cleanup")
+
+    if TENANT_CACHE:
+        await module_logger.log_info("📦 Closing tenant cache")
+        await TENANT_CACHE.aclose()
+        TENANT_CACHE = None
+
+    if MAIN_CLIENT:
+        await module_logger.log_info("🔌 Closing main database client")
+        MAIN_CLIENT.close()
+        MAIN_CLIENT = None
+
+    await module_logger.log_info("✅ Database connections closed successfully")
+    print("Database connections closed successfully")
\ No newline at end of file
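
Usage sketch, not part of the patch: one way the per-request middleware referenced
by the removed comments could consume app.state.tenant_cache. The middleware wiring,
the X-Product-Id header, and the request.state.tenant_db attribute are illustrative
assumptions, not names defined by this change.

    from fastapi import FastAPI, Request
    from fastapi.responses import JSONResponse

    def add_tenant_db_middleware(app: FastAPI):
        @app.middleware("http")
        async def tenant_db_middleware(request: Request, call_next):
            # Hypothetical header carrying the tenant identifier.
            product_id = request.headers.get("X-Product-Id")
            if product_id:
                try:
                    # Resolve (and lazily create) the tenant database; the cache
                    # initializes Beanie for the tenant document models.
                    request.state.tenant_db = await request.app.state.tenant_cache.get_initialized_db(product_id)
                except ValueError as exc:
                    # Raised by TenantDBCache for unknown or inactive tenants.
                    return JSONResponse(status_code=404, content={"detail": str(exc)})
            return await call_next(request)

Handlers behind the middleware could then read request.state.tenant_db, while requests
without the header keep using the main database initialized at startup.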