From 7844906cfe64b1b2f2d8ae29b064213385c6ce98 Mon Sep 17 00:00:00 2001 From: YuehuCao Date: Sat, 11 Oct 2025 09:52:52 +0800 Subject: [PATCH] feat(middleware): modify database to tolerate the tenant cache --- .../webapi/providers/database.py | 263 ++++++++++++++++-- 1 file changed, 239 insertions(+), 24 deletions(-) diff --git a/apps/authentication/webapi/providers/database.py b/apps/authentication/webapi/providers/database.py index aaf1cca..e1a2cf8 100644 --- a/apps/authentication/webapi/providers/database.py +++ b/apps/authentication/webapi/providers/database.py @@ -1,38 +1,253 @@ -import logging -import asyncio -from common.config.app_settings import app_settings +from webapi.config.site_settings import site_settings from beanie import init_beanie -from motor.motor_asyncio import AsyncIOMotorClient -from backend.models import backend_models -from common.probes import ProbeResult - -client = AsyncIOMotorClient( - app_settings.MONGODB_URI, - serverSelectionTimeoutMS=60000, - minPoolSize=5, # Minimum number of connections in the pool - maxPoolSize=20, # Maximum number of connections in the pool - heartbeatFrequencyMS=20000, # Adjust heartbeat frequency to 20 seconds +from fastapi import HTTPException +from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase +from backend.models.user.models import ( + UserAccountDoc, + UserPasswordDoc, + UserEmailDoc, + UserMobileDoc, + AuthCodeDoc, + UsageLogDoc ) +from backend.models.user_profile.models import BasicProfileDoc +from backend.models.permission.models import PermissionDoc, RoleDoc, UserRoleDoc +from common.config.app_settings import app_settings +from common.log.module_logger import ModuleLogger +from common.probes import ProbeResult +import asyncio +from collections import OrderedDict +from typing import Optional, Union +import os + + +# Global variables for database management +MAIN_CLIENT: Optional[AsyncIOMotorClient] = None +TENANT_CACHE: Optional['TenantDBCache'] = None + +# Define document models +document_models = [ + UsageLogDoc, + UserAccountDoc, + UserPasswordDoc, + UserEmailDoc, + UserMobileDoc, + AuthCodeDoc, + BasicProfileDoc, + PermissionDoc, + RoleDoc, + UserRoleDoc +] + +tenant_document_models = [ + UserAccountDoc, + UserPasswordDoc, + UserEmailDoc, + UserMobileDoc, + AuthCodeDoc, + BasicProfileDoc, + PermissionDoc, + RoleDoc, + UserRoleDoc +] + +class TenantDBCache: + """ + Enhanced tenant database cache that caches only clients, not databases. + product_id -> AsyncIOMotorClient + Uses main_db.tenant_doc to resolve mongodb_uri; caches clients with LRU. + Database instances are created fresh each time from cached clients. + """ + + def __init__(self, main_db: AsyncIOMotorDatabase, max_size: int = 64): + self.main_db = main_db + self.max_size = max_size + self._cache: "OrderedDict[str, AsyncIOMotorClient]" = OrderedDict() + self._locks: dict[str, asyncio.Lock] = {} + self._global_lock = asyncio.Lock() + self.module_logger = ModuleLogger(sender_id="TenantDBCache") + + async def get_initialized_db(self, product_id: str) -> AsyncIOMotorDatabase: + """Get tenant database with Beanie already initialized""" + + # fast-path: check if client is cached + cached_client = self._cache.get(product_id) + if cached_client: + await self.module_logger.log_info(f"Found cached client for {product_id}") + self._cache.move_to_end(product_id) + + # Get fresh database instance from cached client + db = cached_client.get_default_database() + if db is not None: + # Initialize Beanie for this fresh database instance + await init_beanie(database=db, document_models=tenant_document_models) + await self.module_logger.log_info(f"Beanie initialization completed for {product_id} using cached client") + return db + else: + await self.module_logger.log_error(f"No default database found for cached client {product_id}") + # Remove invalid cached client + del self._cache[product_id] + + # double-checked under per-tenant lock + lock = self._locks.setdefault(product_id, asyncio.Lock()) + async with lock: + cached_client = self._cache.get(product_id) + if cached_client: + await self.module_logger.log_info(f"Double-check found cached client for {product_id}") + self._cache.move_to_end(product_id) + + # Get fresh database instance from cached client + db = cached_client.get_default_database() + if db is not None: + # Initialize Beanie for this fresh database instance + await init_beanie(database=db, document_models=tenant_document_models) + await self.module_logger.log_info(f"Beanie initialization completed for {product_id} using cached client (double-check)") + return db + else: + await self.module_logger.log_error(f"No default database found for cached client {product_id}") + # Remove invalid cached client + del self._cache[product_id] + + # Create new tenant connection - use raw MongoDB query since we don't have TenantDoc model + """ + tenant_doc content: + { + "tenant_name": "magicleaps", + "product_id": "68a3f19119cfaf36316f6d14", + "mongodb_uri": "mongodb://localhost:27017/interview", + "status": "active" + } + """ + tenant = await self.main_db["tenant_doc"].find_one({"product_id": product_id}) + if not tenant: + await self.module_logger.log_error(f"Tenant {product_id} does not exist in main database") + raise HTTPException( + status_code=404, + detail=f"Tenant {product_id} does not exist", + headers={"X-Error-Message": f"Tenant {product_id} does not exist"} + ) + if tenant.get("status") != "active": + await self.module_logger.log_error(f"Tenant {product_id} is not active, status: {tenant.get('status')}") + raise HTTPException( + status_code=403, + detail=f"Tenant {product_id} is not active", + headers={"X-Error-Message": f"Tenant {product_id} is not active, status: {tenant.get('status')}"} + ) + + uri = tenant["mongodb_uri"] + client = AsyncIOMotorClient(uri, minPoolSize=3, maxPoolSize=20, serverSelectionTimeoutMS=10000) + + # robust db name resolution (get_default_database handles mongodb+srv and empty paths) + default_db = client.get_default_database() + if default_db is not None: + db = default_db + await self.module_logger.log_info(f"Using default database for tenant {product_id}: {db.name}") + else: + await self.module_logger.log_error(f"No default database found for tenant {product_id}") + raise HTTPException( + status_code=500, + detail=f"No default database found for tenant {product_id}", + headers={"X-Error-Message": f"No default database found for tenant {product_id}"} + ) + + # Initialize Beanie for this tenant database + await init_beanie(database=db, document_models=tenant_document_models) + await self.module_logger.log_info(f"Beanie initialization completed for new tenant database {product_id}") + + # Cache only the client + await self._lru_put(product_id, client) + await self.module_logger.log_info(f"Tenant client {product_id} cached successfully") + return db + + async def get_main_db_initialized(self) -> AsyncIOMotorDatabase: + """Get main database with Beanie initialized for tenant models""" + # Re-initialize Beanie for main database with business models + await init_beanie(database=self.main_db, document_models=document_models) + await self.module_logger.log_info("Beanie initialization completed for main database") + return self.main_db + + async def _lru_put(self, key: str, client: AsyncIOMotorClient): + async with self._global_lock: + self._cache[key] = client + self._cache.move_to_end(key) + if len(self._cache) > self.max_size: + old_key, old_client = self._cache.popitem(last=False) + await self.module_logger.log_info(f"Cache full, removing LRU tenant: {old_key}") + try: + old_client.close() + await self.module_logger.log_info(f"Closed connection for evicted tenant: {old_key}") + except Exception as e: + await self.module_logger.log_error(f"Error closing connection for {old_key}: {str(e)}") + self._locks.pop(old_key, None) + + async def aclose(self): + async with self._global_lock: + for key, client in self._cache.items(): + try: + client.close() + await self.module_logger.log_info(f"Closed connection for tenant: {key}") + except Exception as e: + await self.module_logger.log_error(f"Error closing connection for {key}: {str(e)}") + self._cache.clear() + self._locks.clear() + await self.module_logger.log_info("Tenant cache cleared successfully") + def register(app): - app.debug = "auth_mongo_debug" - app.title = "auth_mongo_name" - - # Configure logging for pymongo - logging.getLogger("pymongo").setLevel(logging.WARNING) # Suppress DEBUG logs + """Register database-related configurations and setup""" + app.debug = site_settings.DEBUG + app.title = site_settings.NAME @app.on_event("startup") async def start_database(): - await initiate_database() + await initiate_database(app) + + @app.on_event("shutdown") + async def shutdown_database(): + await cleanup_database() async def check_database_initialized() -> ProbeResult: try: - await asyncio.wait_for(client.server_info(), timeout=5) + await asyncio.wait_for(MAIN_CLIENT.server_info(), timeout=5) return ProbeResult(success=True, message="service has been initialized and ready to serve") except Exception: return ProbeResult(success=False, message="service is not initialized yet", data={"error": "database is not ready"}) -async def initiate_database(): - await init_beanie( - database=client[app_settings.MONGODB_NAME], document_models=backend_models - ) + +async def initiate_database(app): + """Initialize main database and tenant cache""" + global MAIN_CLIENT, TENANT_CACHE + + module_logger = ModuleLogger(sender_id="DatabaseInit") + + # 1) Create main/catalog client + DB + MAIN_CLIENT = AsyncIOMotorClient(app_settings.MONGODB_URI) + main_db = MAIN_CLIENT[app_settings.MONGODB_NAME] + + # 2) Initialize Beanie for main DB with business document models + await init_beanie(database=main_db, document_models=document_models) + + # 3) Create tenant cache that uses main_db lookups to resolve product_id -> tenant db + max_cache_size = getattr(app_settings, 'TENANT_CACHE_MAX', 64) + TENANT_CACHE = TenantDBCache(main_db, max_size=max_cache_size) + + # 4) Store on app state for middleware to access + app.state.main_db = main_db + app.state.tenant_cache = TENANT_CACHE + + await module_logger.log_info("Database and tenant cache initialized successfully") + + +async def cleanup_database(): + """Cleanup database connections and cache""" + global MAIN_CLIENT, TENANT_CACHE + + module_logger = ModuleLogger(sender_id="DatabaseCleanup") + + if TENANT_CACHE: + await TENANT_CACHE.aclose() + + if MAIN_CLIENT: + MAIN_CLIENT.close() + + await module_logger.log_info("Database connections closed successfully") \ No newline at end of file