diff --git a/apps/notification/webapi/middleware/tenant_DB_connection_middleware.py b/apps/notification/webapi/middleware/tenant_DB_connection_middleware.py index c251b93..6af73ce 100644 --- a/apps/notification/webapi/middleware/tenant_DB_connection_middleware.py +++ b/apps/notification/webapi/middleware/tenant_DB_connection_middleware.py @@ -1,191 +1,78 @@ -from fastapi import Request, status, Response -from urllib.parse import urlparse -from motor.motor_asyncio import AsyncIOMotorClient -from beanie import init_beanie -from contextlib import asynccontextmanager -import os - -from common.config.app_settings import app_settings -from common.log.module_logger import ModuleLogger +from fastapi import Request, status +from fastapi.responses import JSONResponse from webapi.middleware.freeleaps_auth_middleware import request_context_var -from backend.models.models import ( - MessageTemplateDoc, - EmailSenderDoc, - EmailSendStatusDoc, - EmailTrackingDoc, - EmailBounceDoc, - UsageLogDoc -) - -# MongoDB config (moved from providers/database.py) -MONGODB_URI = os.getenv('MONGODB_URI') -MONGODB_NAME = os.getenv('MONGODB_NAME') - -# Create MongoDB client (moved from providers/database.py) -client = AsyncIOMotorClient( - MONGODB_URI, - serverSelectionTimeoutMS=60000, - minPoolSize=5, - maxPoolSize=20, - heartbeatFrequencyMS=20000, -) - -# Define all document models (moved from providers/database.py) -document_models = [ - MessageTemplateDoc, - EmailSenderDoc, - EmailSendStatusDoc, - EmailTrackingDoc, - EmailBounceDoc, - UsageLogDoc -] +from common.log.module_logger import ModuleLogger class TenantDBConnectionMiddleware: - """TenantDBConnectionMiddleware: Request-level tenant database isolation middleware - - Depends on Auth middleware to get the product_id after API Key validation. - Must be executed after Auth middleware. - """ - def __init__(self, app): self.app = app self.module_logger = ModuleLogger(sender_id=TenantDBConnectionMiddleware) - + async def __call__(self, scope, receive, send): - """Process request, automatically switch tenant database""" if scope["type"] != "http": - await self.app(scope, receive, send) - return - + return await self.app(scope, receive, send) + request = Request(scope, receive) - - # Check if request has API Key - if not, use default database (compatibility mode) - api_key = request.headers.get("X-API-KEY") - if not api_key or api_key == "": - await self.module_logger.log_info(f"No API Key provided - using default database for path: {request.url.path}") - # Ensure default database connection is available for business logic - await self._ensure_default_database_initialized() - await self.app(scope, receive, send) - return - - # Get product_id from Auth middleware context + + # Get tenant id from auth context (set by FreeleapsAuthMiddleware) + product_id = None try: - request_context = request_context_var.get() - product_id = request_context.product_id - - if not product_id: - await self.module_logger.log_error(f"Request {request.url.path} API Key validation successful but no tenant information") - response = Response( - status_code=status.HTTP_400_BAD_REQUEST, - content='{"detail": "API Key does not contain valid tenant information"}', - media_type="application/json" - ) - await response(scope, receive, send) - return - + ctx = request_context_var.get() + product_id = getattr(ctx, "product_id", None) + await self.module_logger.log_info(f"Retrieved product_id from auth context: {product_id}") except Exception as e: - await self.module_logger.log_error(f"request {request.url.path} cannot get Auth context: {str(e)}") - response = Response( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - content='{"detail": "Auth middleware not executed correctly, please check middleware order"}', - media_type="application/json" - ) - await response(scope, receive, send) - return - + await self.module_logger.log_error(f"Failed to get auth context: {str(e)}") + product_id = None + + # Get tenant cache and main database from app state try: - # Use context manager to ensure request-level database isolation - async with self._get_tenant_database_context(product_id) as tenant_database: - await self.module_logger.log_info(f"Switch to tenant database: {tenant_database.name}") - - await self.app(scope, receive, send) + tenant_cache = request.app.state.tenant_cache + main_db = request.app.state.main_db + await self.module_logger.log_info(f"Retrieved app state - tenant_cache: {'✅' if tenant_cache is not None else '❌'}, main_db: {'✅' if main_db is not None else '❌'}") + except Exception as e: + await self.module_logger.log_error(f"Failed to get app state: {str(e)}") + response = JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={"detail": "Database not properly initialized"} + ) + return await response(scope, receive, send) + + if not product_id: + # Compatibility / public routes: use main database with tenant models initialized + await self.module_logger.log_info(f"No product_id - using main database for path: {request.url.path}") + + # Get main database with Beanie initialized for tenant models + main_db_initialized = await tenant_cache.get_main_db_initialized() + + request.state.db = main_db_initialized + request.state.product_id = None + await self.module_logger.log_info(f"✅ Successfully initialized main database with tenant models") + return await self.app(scope, receive, send) + + try: + # Get tenant-specific database with Beanie already initialized (cached) + await self.module_logger.log_info(f"Attempting to get tenant database for product_id: {product_id}") + tenant_db = await tenant_cache.get_initialized_db(product_id) + + request.state.db = tenant_db + request.state.product_id = product_id + await self.module_logger.log_info(f"✅ Successfully retrieved cached tenant database with Beanie for product_id: {product_id}") + return await self.app(scope, receive, send) except ValueError as e: - await self.module_logger.log_error(f"Tenant validation failed: {str(e)}") - response = Response( - status_code=status.HTTP_404_NOT_FOUND, - content=f'{{"detail": "{str(e)}"}}', - media_type="application/json" + # Handle tenant not found or inactive (ValueError from TenantDBCache) + await self.module_logger.log_error(f"Tenant error for {product_id}: {str(e)}") + response = JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content={"detail": str(e)} ) - await response(scope, receive, send) - except Exception as e: - await self.module_logger.log_error(f"Failed to get tenant database: {str(e)}") - response = Response( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - content='{"detail": "Database connection failed"}', - media_type="application/json" - ) - await response(scope, receive, send) - - @asynccontextmanager - async def _get_tenant_database_context(self, product_id: str): - """Get tenant database context manager, ensure request-level isolation""" - tenant_client = None - try: - # Use existing main database client to get tenant information - main_db = client[app_settings.MONGODB_NAME] - tenant_collection = main_db.tenant_doc - - tenant_doc = await tenant_collection.find_one({"product_id": product_id}) - if not tenant_doc: - raise ValueError(f"Tenant {product_id} does not exist") - - if tenant_doc.get("status") != "active": - raise ValueError(f"Tenant {product_id} is not active") - - # Create new database connection for tenant (each request has its own connection) - tenant_client = AsyncIOMotorClient( - tenant_doc["mongodb_uri"], - serverSelectionTimeoutMS=10000, - minPoolSize=5, - maxPoolSize=20, - ) - - # Extract database name from URI - db_name = self._extract_db_name_from_uri(tenant_doc["mongodb_uri"]) - database = tenant_client[db_name] - - # Initialize Beanie for this request (using independent database instance) - # TODO: check whether only MessageTemplateDoc is needed - db_models = [MessageTemplateDoc, EmailSenderDoc, EmailSendStatusDoc] - await init_beanie(database=database, document_models=db_models) - await self.module_logger.log_info(f"Only initialized base model for tenant {product_id}") - - await self.module_logger.log_info(f"Created independent database connection for tenant {product_id}: {db_name}") - - try: - yield database - finally: - # Close connection safely after request - try: - if tenant_client: - tenant_client.close() - await self.module_logger.log_info(f"Closed database connection for tenant {product_id}") - except Exception as close_error: - logger.warning(f"Error closing database connection for tenant {product_id}: {str(close_error)}") - - except Exception as e: - await self.module_logger.log_error(f"Failed to get tenant {product_id} database: {str(e)}") - raise - - def _extract_db_name_from_uri(self, uri: str) -> str: - """Extract database name from MongoDB URI""" - # TODO: check whether app_settings.MONGODB_NAME can be used as default database name - try: - parsed = urlparse(uri) - return parsed.path.lstrip('/') or app_settings.MONGODB_NAME - except Exception: - return app_settings.MONGODB_NAME - - async def _ensure_default_database_initialized(self): - """Ensure default database connection is properly initialized for requests without API Key""" - try: - # Initialize default database with all models for compatibility mode - default_db = client[MONGODB_NAME] - await init_beanie(database=default_db, document_models=document_models) - await self.module_logger.log_info("Default database initialized for compatibility mode") + return await response(scope, receive, send) except Exception as e: - await self.module_logger.log_error(f"Failed to initialize default database: {str(e)}") - \ No newline at end of file + await self.module_logger.log_error(f"Database error for tenant {product_id}: {str(e)}") + response = JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={"detail": "Database connection error"} + ) + return await response(scope, receive, send) \ No newline at end of file