fix(cache): cache client rather than the db
This commit is contained in:
parent
4d3b763ea3
commit
7a2686998b
@ -32,15 +32,16 @@ tenant_document_models = [
|
|||||||
|
|
||||||
class TenantDBCache:
|
class TenantDBCache:
|
||||||
"""
|
"""
|
||||||
Enhanced tenant database cache that includes Beanie initialization.
|
Enhanced tenant database cache that caches only clients, not databases.
|
||||||
product_id -> (AsyncIOMotorClient, AsyncIOMotorDatabase, beanie_initialized: bool)
|
product_id -> AsyncIOMotorClient
|
||||||
Uses main_db.tenant_doc to resolve mongodb_uri; caches clients/dbs with LRU and Beanie state.
|
Uses main_db.tenant_doc to resolve mongodb_uri; caches clients with LRU.
|
||||||
|
Database instances are created fresh each time from cached clients.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, main_db: AsyncIOMotorDatabase, max_size: int = 64):
|
def __init__(self, main_db: AsyncIOMotorDatabase, max_size: int = 64):
|
||||||
self.main_db = main_db
|
self.main_db = main_db
|
||||||
self.max_size = max_size
|
self.max_size = max_size
|
||||||
self._cache: "OrderedDict[str, tuple[AsyncIOMotorClient, AsyncIOMotorDatabase, bool]]" = OrderedDict()
|
self._cache: "OrderedDict[str, AsyncIOMotorClient]" = OrderedDict()
|
||||||
self._locks: dict[str, asyncio.Lock] = {}
|
self._locks: dict[str, asyncio.Lock] = {}
|
||||||
self._global_lock = asyncio.Lock()
|
self._global_lock = asyncio.Lock()
|
||||||
self.module_logger = ModuleLogger(sender_id="TenantDBCache")
|
self.module_logger = ModuleLogger(sender_id="TenantDBCache")
|
||||||
@ -48,43 +49,43 @@ class TenantDBCache:
|
|||||||
async def get_initialized_db(self, product_id: str) -> AsyncIOMotorDatabase:
|
async def get_initialized_db(self, product_id: str) -> AsyncIOMotorDatabase:
|
||||||
"""Get tenant database with Beanie already initialized"""
|
"""Get tenant database with Beanie already initialized"""
|
||||||
|
|
||||||
# fast-path
|
# fast-path: check if client is cached
|
||||||
cached = self._cache.get(product_id)
|
cached_client = self._cache.get(product_id)
|
||||||
if cached:
|
if cached_client:
|
||||||
client, db, beanie_initialized = cached
|
await self.module_logger.log_info(f"Found cached client for {product_id}")
|
||||||
await self.module_logger.log_info(f"Found cached database for {product_id}, beanie_initialized: {beanie_initialized}")
|
|
||||||
self._cache.move_to_end(product_id)
|
self._cache.move_to_end(product_id)
|
||||||
|
|
||||||
if beanie_initialized:
|
# Get fresh database instance from cached client
|
||||||
|
db = cached_client.get_default_database()
|
||||||
|
if db is not None:
|
||||||
|
# Initialize Beanie for this fresh database instance
|
||||||
|
await init_beanie(database=db, document_models=tenant_document_models)
|
||||||
|
await self.module_logger.log_info(f"Beanie initialization completed for {product_id} using cached client")
|
||||||
return db
|
return db
|
||||||
else:
|
else:
|
||||||
# Initialize Beanie if not done yet
|
await self.module_logger.log_error(f"No default database found for cached client {product_id}")
|
||||||
await init_beanie(database=db, document_models=tenant_document_models)
|
# Remove invalid cached client
|
||||||
await self.module_logger.log_info(f"Beanie initialization completed for {product_id}")
|
del self._cache[product_id]
|
||||||
|
|
||||||
# Update cache with beanie_initialized = True
|
|
||||||
self._cache[product_id] = (client, db, True)
|
|
||||||
return db
|
|
||||||
|
|
||||||
# double-checked under per-tenant lock
|
# double-checked under per-tenant lock
|
||||||
lock = self._locks.setdefault(product_id, asyncio.Lock())
|
lock = self._locks.setdefault(product_id, asyncio.Lock())
|
||||||
async with lock:
|
async with lock:
|
||||||
cached = self._cache.get(product_id)
|
cached_client = self._cache.get(product_id)
|
||||||
if cached:
|
if cached_client:
|
||||||
client, db, beanie_initialized = cached
|
await self.module_logger.log_info(f"Double-check found cached client for {product_id}")
|
||||||
await self.module_logger.log_info(f"Double-check found cached database for {product_id}, beanie_initialized: {beanie_initialized}")
|
|
||||||
self._cache.move_to_end(product_id)
|
self._cache.move_to_end(product_id)
|
||||||
|
|
||||||
if beanie_initialized:
|
# Get fresh database instance from cached client
|
||||||
|
db = cached_client.get_default_database()
|
||||||
|
if db is not None:
|
||||||
|
# Initialize Beanie for this fresh database instance
|
||||||
|
await init_beanie(database=db, document_models=tenant_document_models)
|
||||||
|
await self.module_logger.log_info(f"Beanie initialization completed for {product_id} using cached client (double-check)")
|
||||||
return db
|
return db
|
||||||
else:
|
else:
|
||||||
# Initialize Beanie if not done yet
|
await self.module_logger.log_error(f"No default database found for cached client {product_id}")
|
||||||
await init_beanie(database=db, document_models=tenant_document_models)
|
# Remove invalid cached client
|
||||||
await self.module_logger.log_info(f"Beanie initialization completed for {product_id} (double-check)")
|
del self._cache[product_id]
|
||||||
|
|
||||||
# Update cache with beanie_initialized = True
|
|
||||||
self._cache[product_id] = (client, db, True)
|
|
||||||
return db
|
|
||||||
|
|
||||||
# Create new tenant connection - use raw MongoDB query since we don't have TenantDoc model
|
# Create new tenant connection - use raw MongoDB query since we don't have TenantDoc model
|
||||||
"""
|
"""
|
||||||
@ -127,13 +128,14 @@ class TenantDBCache:
|
|||||||
detail=f"No default database found for tenant {product_id}",
|
detail=f"No default database found for tenant {product_id}",
|
||||||
headers={"X-Error-Message": f"No default database found for tenant {product_id}"}
|
headers={"X-Error-Message": f"No default database found for tenant {product_id}"}
|
||||||
)
|
)
|
||||||
|
|
||||||
# Initialize Beanie for this tenant database
|
# Initialize Beanie for this tenant database
|
||||||
await init_beanie(database=db, document_models=tenant_document_models)
|
await init_beanie(database=db, document_models=tenant_document_models)
|
||||||
await self.module_logger.log_info(f"Beanie initialization completed for new tenant database {product_id}")
|
await self.module_logger.log_info(f"Beanie initialization completed for new tenant database {product_id}")
|
||||||
|
|
||||||
# LRU put with beanie_initialized = True
|
# Cache only the client
|
||||||
await self._lru_put(product_id, (client, db, True))
|
await self._lru_put(product_id, client)
|
||||||
await self.module_logger.log_info(f"Tenant database {product_id} cached successfully with Beanie initialized")
|
await self.module_logger.log_info(f"Tenant client {product_id} cached successfully")
|
||||||
return db
|
return db
|
||||||
|
|
||||||
async def get_main_db_initialized(self) -> AsyncIOMotorDatabase:
|
async def get_main_db_initialized(self) -> AsyncIOMotorDatabase:
|
||||||
@ -143,12 +145,12 @@ class TenantDBCache:
|
|||||||
await self.module_logger.log_info("Beanie initialization completed for main database")
|
await self.module_logger.log_info("Beanie initialization completed for main database")
|
||||||
return self.main_db
|
return self.main_db
|
||||||
|
|
||||||
async def _lru_put(self, key: str, value: tuple[AsyncIOMotorClient, AsyncIOMotorDatabase, bool]):
|
async def _lru_put(self, key: str, client: AsyncIOMotorClient):
|
||||||
async with self._global_lock:
|
async with self._global_lock:
|
||||||
self._cache[key] = value
|
self._cache[key] = client
|
||||||
self._cache.move_to_end(key)
|
self._cache.move_to_end(key)
|
||||||
if len(self._cache) > self.max_size:
|
if len(self._cache) > self.max_size:
|
||||||
old_key, (old_client, _, _) = self._cache.popitem(last=False)
|
old_key, old_client = self._cache.popitem(last=False)
|
||||||
await self.module_logger.log_info(f"Cache full, removing LRU tenant: {old_key}")
|
await self.module_logger.log_info(f"Cache full, removing LRU tenant: {old_key}")
|
||||||
try:
|
try:
|
||||||
old_client.close()
|
old_client.close()
|
||||||
@ -159,7 +161,7 @@ class TenantDBCache:
|
|||||||
|
|
||||||
async def aclose(self):
|
async def aclose(self):
|
||||||
async with self._global_lock:
|
async with self._global_lock:
|
||||||
for key, (client, _, _) in self._cache.items():
|
for key, client in self._cache.items():
|
||||||
try:
|
try:
|
||||||
client.close()
|
client.close()
|
||||||
await self.module_logger.log_info(f"Closed connection for tenant: {key}")
|
await self.module_logger.log_info(f"Closed connection for tenant: {key}")
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user