diff --git a/apps/notification/webapi/providers/database.py b/apps/notification/webapi/providers/database.py index 9fb9fe4..2f48c57 100644 --- a/apps/notification/webapi/providers/database.py +++ b/apps/notification/webapi/providers/database.py @@ -46,134 +46,123 @@ class TenantDBCache: async def get_initialized_db(self, product_id: str) -> AsyncIOMotorDatabase: """Get tenant database with Beanie already initialized""" - await self.module_logger.log_info(f"๐Ÿ” Starting database retrieval for product_id: {product_id}") # fast-path cached = self._cache.get(product_id) if cached: client, db, beanie_initialized = cached - await self.module_logger.log_info(f"๐Ÿ“ฆ Found cached database for {product_id}, beanie_initialized: {beanie_initialized}") + await self.module_logger.log_info(f"Found cached database for {product_id}, beanie_initialized: {beanie_initialized}") self._cache.move_to_end(product_id) if beanie_initialized: - await self.module_logger.log_info(f"โœ… Returning cached database with Beanie already initialized for {product_id}") return db else: # Initialize Beanie if not done yet - await self.module_logger.log_info(f"๐Ÿ”ง Initializing Beanie for cached database {product_id}") await init_beanie(database=db, document_models=tenant_document_models) - await self.module_logger.log_info(f"โœ… Beanie initialization completed for {product_id}") + await self.module_logger.log_info(f"Beanie initialization completed for {product_id}") # Update cache with beanie_initialized = True self._cache[product_id] = (client, db, True) return db # double-checked under per-tenant lock - await self.module_logger.log_info(f"๐Ÿ”’ Acquiring lock for product_id: {product_id}") lock = self._locks.setdefault(product_id, asyncio.Lock()) async with lock: - await self.module_logger.log_info(f"๐Ÿ”“ Lock acquired for product_id: {product_id}") - cached = self._cache.get(product_id) if cached: client, db, beanie_initialized = cached - await self.module_logger.log_info(f"๐Ÿ“ฆ Double-check found cached database for {product_id}, beanie_initialized: {beanie_initialized}") + await self.module_logger.log_info(f"Double-check found cached database for {product_id}, beanie_initialized: {beanie_initialized}") self._cache.move_to_end(product_id) if beanie_initialized: - await self.module_logger.log_info(f"โœ… Returning cached database with Beanie already initialized for {product_id}") return db else: # Initialize Beanie if not done yet - await self.module_logger.log_info(f"๐Ÿ”ง Initializing Beanie for cached database {product_id} (double-check)") await init_beanie(database=db, document_models=tenant_document_models) - await self.module_logger.log_info(f"โœ… Beanie initialization completed for {product_id} (double-check)") + await self.module_logger.log_info(f"Beanie initialization completed for {product_id} (double-check)") # Update cache with beanie_initialized = True self._cache[product_id] = (client, db, True) return db # Create new tenant connection - use raw MongoDB query since we don't have TenantDoc model - await self.module_logger.log_info(f"๐Ÿ” Creating new tenant connection for product_id: {product_id}") + # tenant_doc content: + # "tenant_name": "magicleaps", + # "product_id": "68a3f19119cfaf36316f6d14", + # "mongodb_uri": "mongodb://localhost:27017/interview", + # "status": "active" tenant = await self.main_db["tenant_doc"].find_one({"product_id": product_id}) if not tenant: - await self.module_logger.log_error(f"โŒ Tenant {product_id} does not exist in main database") - raise ValueError(f"Tenant {product_id} does not exist") + await self.module_logger.log_error(f"Tenant {product_id} does not exist in main database") + raise HTTPException( + status_code=404, + detail=f"Tenant {product_id} does not exist", + headers={"X-Error-Message": f"Tenant {product_id} does not exist"} + ) if tenant.get("status") != "active": - await self.module_logger.log_error(f"โŒ Tenant {product_id} is not active, status: {tenant.get('status')}") - raise ValueError(f"Tenant {product_id} is not active") + await self.module_logger.log_error(f"Tenant {product_id} is not active, status: {tenant.get('status')}") + raise HTTPException( + status_code=403, + detail=f"Tenant {product_id} is not active", + headers={"X-Error-Message": f"Tenant {product_id} is not active, status: {tenant.get('status')}"} + ) uri = tenant["mongodb_uri"] - await self.module_logger.log_info(f"๐Ÿ”— Creating Motor client for tenant {product_id} with URI: {uri[:20]}...") client = AsyncIOMotorClient(uri, minPoolSize=3, maxPoolSize=20, serverSelectionTimeoutMS=10000) # robust db name resolution (get_default_database handles mongodb+srv and empty paths) default_db = client.get_default_database() if default_db is not None: db = default_db - await self.module_logger.log_info(f"๐Ÿ“Š Using default database for tenant {product_id}: {db.name}") + await self.module_logger.log_info(f"Using default database for tenant {product_id}: {db.name}") else: - # fallback: if no default, just use 'admin' or a convention; or raise - # Here we default to 'admin' to avoid surprises. Customize as needed. - db = client["admin"] - await self.module_logger.log_info(f"๐Ÿ“Š Using fallback database 'admin' for tenant {product_id}") - + await self.module_logger.log_error(f"No default database found for tenant {product_id}") + raise HTTPException( + status_code=500, + detail=f"No default database found for tenant {product_id}", + headers={"X-Error-Message": f"No default database found for tenant {product_id}"} + ) # Initialize Beanie for this tenant database - await self.module_logger.log_info(f"๐Ÿ”ง Initializing Beanie for new tenant database {product_id}") await init_beanie(database=db, document_models=tenant_document_models) - await self.module_logger.log_info(f"โœ… Beanie initialization completed for new tenant database {product_id}") + await self.module_logger.log_info(f"Beanie initialization completed for new tenant database {product_id}") # LRU put with beanie_initialized = True await self._lru_put(product_id, (client, db, True)) - await self.module_logger.log_info(f"๐Ÿ’พ Tenant database {product_id} cached successfully with Beanie initialized") + await self.module_logger.log_info(f"Tenant database {product_id} cached successfully with Beanie initialized") return db async def get_main_db_initialized(self) -> AsyncIOMotorDatabase: """Get main database with Beanie initialized for tenant models""" - await self.module_logger.log_info("๐Ÿ”ง Initializing Beanie for main database with business models") # Re-initialize Beanie for main database with business models await init_beanie(database=self.main_db, document_models=document_models) - await self.module_logger.log_info("โœ… Beanie initialization completed for main database") + await self.module_logger.log_info("Beanie initialization completed for main database") return self.main_db - async def get_db(self, product_id: str) -> AsyncIOMotorDatabase: - """Legacy method for compatibility - just returns database without Beanie initialization""" - cached = self._cache.get(product_id) - if cached: - self._cache.move_to_end(product_id) - return cached[1] - - # Use get_initialized_db and return the database part - db = await self.get_initialized_db(product_id) - return db - async def _lru_put(self, key: str, value: tuple[AsyncIOMotorClient, AsyncIOMotorDatabase, bool]): async with self._global_lock: - await self.module_logger.log_info(f"๐Ÿ“ Adding {key} to cache, current cache size: {len(self._cache)}") self._cache[key] = value self._cache.move_to_end(key) if len(self._cache) > self.max_size: old_key, (old_client, _, _) = self._cache.popitem(last=False) - await self.module_logger.log_info(f"๐Ÿ—‘๏ธ Cache full, removing LRU tenant: {old_key}") + await self.module_logger.log_info(f"Cache full, removing LRU tenant: {old_key}") try: old_client.close() - await self.module_logger.log_info(f"๐Ÿ”Œ Closed connection for evicted tenant: {old_key}") + await self.module_logger.log_info(f"Closed connection for evicted tenant: {old_key}") except Exception as e: - await self.module_logger.log_error(f"โŒ Error closing connection for {old_key}: {str(e)}") + await self.module_logger.log_error(f"Error closing connection for {old_key}: {str(e)}") self._locks.pop(old_key, None) async def aclose(self): async with self._global_lock: - await self.module_logger.log_info(f"๐Ÿ”Œ Closing all cached connections, count: {len(self._cache)}") for key, (client, _, _) in self._cache.items(): try: client.close() - await self.module_logger.log_info(f"โœ… Closed connection for tenant: {key}") + await self.module_logger.log_info(f"Closed connection for tenant: {key}") except Exception as e: - await self.module_logger.log_error(f"โŒ Error closing connection for {key}: {str(e)}") + await self.module_logger.log_error(f"Error closing connection for {key}: {str(e)}") self._cache.clear() self._locks.clear() - await self.module_logger.log_info("๐Ÿงน Tenant cache cleared successfully") + await self.module_logger.log_info("Tenant cache cleared successfully") def register(app): @@ -196,30 +185,22 @@ async def initiate_database(app): module_logger = ModuleLogger(sender_id="DatabaseInit") - await module_logger.log_info("๐Ÿš€ Starting database initialization") - # 1) Create main/catalog client + DB - await module_logger.log_info(f"๐Ÿ”— Creating main MongoDB client with URI: {app_settings.MONGODB_URI[:20]}...") MAIN_CLIENT = AsyncIOMotorClient(app_settings.MONGODB_URI) main_db = MAIN_CLIENT[app_settings.MONGODB_NAME] - await module_logger.log_info(f"๐Ÿ“Š Main database created: {app_settings.MONGODB_NAME}") # 2) Initialize Beanie for main DB with business document models - await module_logger.log_info(f"๐Ÿ”ง Initializing Beanie for main database with {len(document_models)} document models") await init_beanie(database=main_db, document_models=document_models) - await module_logger.log_info("โœ… Beanie initialization completed for main database") # 3) Create tenant cache that uses main_db lookups to resolve product_id -> tenant db max_cache_size = getattr(app_settings, 'TENANT_CACHE_MAX', 64) - await module_logger.log_info(f"๐Ÿ“ฆ Creating tenant cache with max size: {max_cache_size}") TENANT_CACHE = TenantDBCache(main_db, max_size=max_cache_size) # 4) Store on app state for middleware to access app.state.main_db = main_db app.state.tenant_cache = TENANT_CACHE - await module_logger.log_info("โœ… Database and tenant cache initialized successfully") - print("Database and tenant cache initialized successfully") + await module_logger.log_info("Database and tenant cache initialized successfully") async def cleanup_database(): @@ -227,15 +208,11 @@ async def cleanup_database(): global MAIN_CLIENT, TENANT_CACHE module_logger = ModuleLogger(sender_id="DatabaseCleanup") - await module_logger.log_info("๐Ÿงน Starting database cleanup") if TENANT_CACHE: - await module_logger.log_info("๐Ÿ“ฆ Closing tenant cache") await TENANT_CACHE.aclose() if MAIN_CLIENT: - await module_logger.log_info("๐Ÿ”Œ Closing main database client") MAIN_CLIENT.close() - await module_logger.log_info("โœ… Database connections closed successfully") - print("Database connections closed successfully") \ No newline at end of file + await module_logger.log_info("Database connections closed successfully") \ No newline at end of file