From 7a2686998b4509aefd044c2093b8ffb344e82729 Mon Sep 17 00:00:00 2001
From: YuehuCao
Date: Fri, 26 Sep 2025 14:08:42 +0800
Subject: [PATCH] fix(cache): cache client rather than the db

---
 .../notification/webapi/providers/database.py | 74 ++++++++++---------
 1 file changed, 38 insertions(+), 36 deletions(-)

diff --git a/apps/notification/webapi/providers/database.py b/apps/notification/webapi/providers/database.py
index 7f6fc01..1dc1843 100644
--- a/apps/notification/webapi/providers/database.py
+++ b/apps/notification/webapi/providers/database.py
@@ -32,15 +32,16 @@ tenant_document_models = [
 
 class TenantDBCache:
     """
-    Enhanced tenant database cache that includes Beanie initialization.
-    product_id -> (AsyncIOMotorClient, AsyncIOMotorDatabase, beanie_initialized: bool)
-    Uses main_db.tenant_doc to resolve mongodb_uri; caches clients/dbs with LRU and Beanie state.
+    Enhanced tenant database cache that caches only clients, not databases.
+    product_id -> AsyncIOMotorClient
+    Uses main_db.tenant_doc to resolve mongodb_uri; caches clients with LRU.
+    Database instances are created fresh each time from cached clients.
     """
 
     def __init__(self, main_db: AsyncIOMotorDatabase, max_size: int = 64):
         self.main_db = main_db
         self.max_size = max_size
-        self._cache: "OrderedDict[str, tuple[AsyncIOMotorClient, AsyncIOMotorDatabase, bool]]" = OrderedDict()
+        self._cache: "OrderedDict[str, AsyncIOMotorClient]" = OrderedDict()
         self._locks: dict[str, asyncio.Lock] = {}
         self._global_lock = asyncio.Lock()
         self.module_logger = ModuleLogger(sender_id="TenantDBCache")
@@ -48,43 +49,43 @@ class TenantDBCache:
 
     async def get_initialized_db(self, product_id: str) -> AsyncIOMotorDatabase:
         """Get tenant database with Beanie already initialized"""
-        # fast-path
-        cached = self._cache.get(product_id)
-        if cached:
-            client, db, beanie_initialized = cached
-            await self.module_logger.log_info(f"Found cached database for {product_id}, beanie_initialized: {beanie_initialized}")
+        # fast-path: check if client is cached
+        cached_client = self._cache.get(product_id)
+        if cached_client:
+            await self.module_logger.log_info(f"Found cached client for {product_id}")
             self._cache.move_to_end(product_id)
-            if beanie_initialized:
+            # Get fresh database instance from cached client
+            db = cached_client.get_default_database()
+            if db is not None:
+                # Initialize Beanie for this fresh database instance
+                await init_beanie(database=db, document_models=tenant_document_models)
+                await self.module_logger.log_info(f"Beanie initialization completed for {product_id} using cached client")
                 return db
             else:
-                # Initialize Beanie if not done yet
-                await init_beanie(database=db, document_models=tenant_document_models)
-                await self.module_logger.log_info(f"Beanie initialization completed for {product_id}")
-
-                # Update cache with beanie_initialized = True
-                self._cache[product_id] = (client, db, True)
-                return db
+                await self.module_logger.log_error(f"No default database found for cached client {product_id}")
+                # Remove invalid cached client
+                del self._cache[product_id]
 
         # double-checked under per-tenant lock
         lock = self._locks.setdefault(product_id, asyncio.Lock())
         async with lock:
-            cached = self._cache.get(product_id)
-            if cached:
-                client, db, beanie_initialized = cached
-                await self.module_logger.log_info(f"Double-check found cached database for {product_id}, beanie_initialized: {beanie_initialized}")
+            cached_client = self._cache.get(product_id)
+            if cached_client:
+                await self.module_logger.log_info(f"Double-check found cached client for {product_id}")
                 self._cache.move_to_end(product_id)
-                if beanie_initialized:
+                # Get fresh database instance from cached client
+                db = cached_client.get_default_database()
+                if db is not None:
+                    # Initialize Beanie for this fresh database instance
+                    await init_beanie(database=db, document_models=tenant_document_models)
+                    await self.module_logger.log_info(f"Beanie initialization completed for {product_id} using cached client (double-check)")
                     return db
                 else:
-                    # Initialize Beanie if not done yet
-                    await init_beanie(database=db, document_models=tenant_document_models)
-                    await self.module_logger.log_info(f"Beanie initialization completed for {product_id} (double-check)")
-
-                    # Update cache with beanie_initialized = True
-                    self._cache[product_id] = (client, db, True)
-                    return db
+                    await self.module_logger.log_error(f"No default database found for cached client {product_id}")
+                    # Remove invalid cached client
+                    del self._cache[product_id]
 
             # Create new tenant connection - use raw MongoDB query since we don't have TenantDoc model
             """
@@ -127,13 +128,14 @@ class TenantDBCache:
                     detail=f"No default database found for tenant {product_id}",
                     headers={"X-Error-Message": f"No default database found for tenant {product_id}"}
                 )
+            # Initialize Beanie for this tenant database
             await init_beanie(database=db, document_models=tenant_document_models)
             await self.module_logger.log_info(f"Beanie initialization completed for new tenant database {product_id}")
-            # LRU put with beanie_initialized = True
-            await self._lru_put(product_id, (client, db, True))
-            await self.module_logger.log_info(f"Tenant database {product_id} cached successfully with Beanie initialized")
+            # Cache only the client
+            await self._lru_put(product_id, client)
+            await self.module_logger.log_info(f"Tenant client {product_id} cached successfully")
             return db
 
     async def get_main_db_initialized(self) -> AsyncIOMotorDatabase:
         """Get main database with Beanie initialized for main DB models"""
         await init_beanie(database=self.main_db, document_models=main_document_models)
@@ -143,12 +145,12 @@
         await self.module_logger.log_info("Beanie initialization completed for main database")
         return self.main_db
 
-    async def _lru_put(self, key: str, value: tuple[AsyncIOMotorClient, AsyncIOMotorDatabase, bool]):
+    async def _lru_put(self, key: str, client: AsyncIOMotorClient):
         async with self._global_lock:
-            self._cache[key] = value
+            self._cache[key] = client
             self._cache.move_to_end(key)
             if len(self._cache) > self.max_size:
-                old_key, (old_client, _, _) = self._cache.popitem(last=False)
+                old_key, old_client = self._cache.popitem(last=False)
                 await self.module_logger.log_info(f"Cache full, removing LRU tenant: {old_key}")
                 try:
                     old_client.close()
@@ -159,7 +161,7 @@
 
     async def aclose(self):
         async with self._global_lock:
-            for key, (client, _, _) in self._cache.items():
+            for key, client in self._cache.items():
                 try:
                     client.close()
                     await self.module_logger.log_info(f"Closed connection for tenant: {key}")
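Reviewer note (not part of the patch): below is a minimal standalone sketch of the pattern this commit adopts, caching one AsyncIOMotorClient per tenant in an LRU and deriving the database object fresh from the cached client on every lookup instead of caching the database itself. The class name, tenant ID, and connection URI are illustrative assumptions, not code from this repository; in the patched module the freshly derived database is additionally passed through init_beanie on each access.

import asyncio
from collections import OrderedDict

from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase


class ClientOnlyLRUCache:
    """Hypothetical client-only LRU cache mirroring the approach in this patch."""

    def __init__(self, max_size: int = 2):
        self.max_size = max_size
        self._cache: "OrderedDict[str, AsyncIOMotorClient]" = OrderedDict()
        self._lock = asyncio.Lock()

    async def get_db(self, tenant_id: str, uri: str) -> AsyncIOMotorDatabase:
        async with self._lock:
            client = self._cache.get(tenant_id)
            if client is None:
                # One client (and one connection pool) per tenant, created lazily.
                client = AsyncIOMotorClient(uri)
                self._cache[tenant_id] = client
            self._cache.move_to_end(tenant_id)
            if len(self._cache) > self.max_size:
                # Evict the least recently used tenant and release its pool.
                _, old_client = self._cache.popitem(last=False)
                old_client.close()
        # Database objects are lightweight views over the client, so build one
        # fresh per call instead of caching it; the URI must name a default db.
        return client.get_default_database()


async def _demo():
    cache = ClientOnlyLRUCache()
    # Hypothetical tenant and URI; no live server is needed just to derive the db.
    db = await cache.get_db("tenant-a", "mongodb://localhost:27017/tenant_a")
    print(db.name)


if __name__ == "__main__":
    asyncio.run(_demo())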