"""Request-level tenant database isolation middleware for the web API."""

import json
import os
from contextlib import asynccontextmanager
from urllib.parse import urlparse

from fastapi import Request, status, Response
from motor.motor_asyncio import AsyncIOMotorClient
from beanie import init_beanie

from common.config.app_settings import app_settings
from common.log.module_logger import ModuleLogger
from webapi.middleware.freeleaps_auth_middleware import request_context_var
from backend.models.models import (
    MessageTemplateDoc,
    EmailSenderDoc,
    EmailSendStatusDoc,
    EmailTrackingDoc,
    EmailBounceDoc,
    UsageLogDoc,
)

# MongoDB config (moved from providers/database.py)
# NOTE(review): the tenant lookup below reads app_settings.MONGODB_NAME while
# the default-database path reads this environment variable; if the variable
# is unset, MONGODB_NAME is None. Confirm both are meant to name the same DB.
MONGODB_URI = os.getenv('MONGODB_URI')
MONGODB_NAME = os.getenv('MONGODB_NAME')

# Create MongoDB client (moved from providers/database.py)
client = AsyncIOMotorClient(
    MONGODB_URI,
    serverSelectionTimeoutMS=60000,
    minPoolSize=5,
    maxPoolSize=20,
    heartbeatFrequencyMS=20000,
)

# Define all document models (moved from providers/database.py)
document_models = [
    MessageTemplateDoc,
    EmailSenderDoc,
    EmailSendStatusDoc,
    EmailTrackingDoc,
    EmailBounceDoc,
    UsageLogDoc,
]


class TenantDBConnectionMiddleware:
    """Request-level tenant database isolation middleware.

    Depends on the Auth middleware to obtain ``product_id`` after API Key
    validation, so it must be executed after the Auth middleware.

    Requests without an ``X-API-KEY`` header run against the default database
    (compatibility mode). Requests with a key get a dedicated Motor client for
    the tenant's database, which is closed once the request has finished.
    """

    def __init__(self, app):
        """Wrap the downstream ASGI application ``app``."""
        self.app = app
        self.module_logger = ModuleLogger(sender_id=TenantDBConnectionMiddleware)

    async def __call__(self, scope, receive, send):
        """Process a request, switching to the tenant database when applicable.

        Non-HTTP scopes are passed through untouched. Failures that happen
        while resolving the tenant are answered here with a JSON error body;
        exceptions raised by the downstream application are re-raised so the
        framework's own error handling deals with them.
        """
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        request = Request(scope, receive)
        path = request.url.path

        # No API Key (missing or empty header): use the default database
        # (compatibility mode).
        if not request.headers.get("X-API-KEY"):
            await self.module_logger.log_info(
                f"No API Key provided - using default database for path: {path}"
            )
            # Ensure default database connection is available for business logic
            await self._ensure_default_database_initialized()
            await self.app(scope, receive, send)
            return

        # Get product_id from the Auth middleware context. Only the lookup is
        # guarded: a failure here means the Auth middleware did not run first.
        try:
            product_id = request_context_var.get().product_id
        except Exception as e:
            await self.module_logger.log_error(
                f"request {path} cannot get Auth context: {str(e)}"
            )
            await self._send_json_error(
                scope,
                receive,
                send,
                status.HTTP_500_INTERNAL_SERVER_ERROR,
                "Auth middleware not executed correctly, please check middleware order",
            )
            return

        if not product_id:
            await self.module_logger.log_error(
                f"Request {path} API Key validation successful but no tenant information"
            )
            await self._send_json_error(
                scope,
                receive,
                send,
                status.HTTP_400_BAD_REQUEST,
                "API Key does not contain valid tenant information",
            )
            return

        # Tracks whether control was handed to the downstream app. After that
        # point an exception belongs to the application (and the response may
        # already have started), so it must not be rewritten into a tenant
        # error response.
        handed_off = False
        try:
            # Use context manager to ensure request-level database isolation
            async with self._get_tenant_database_context(product_id) as tenant_database:
                await self.module_logger.log_info(
                    f"Switch to tenant database: {tenant_database.name}"
                )
                handed_off = True
                await self.app(scope, receive, send)
        except ValueError as e:
            if handed_off:
                raise
            await self.module_logger.log_error(f"Tenant validation failed: {str(e)}")
            await self._send_json_error(
                scope, receive, send, status.HTTP_404_NOT_FOUND, str(e)
            )
        except Exception as e:
            if handed_off:
                raise
            await self.module_logger.log_error(f"Failed to get tenant database: {str(e)}")
            await self._send_json_error(
                scope,
                receive,
                send,
                status.HTTP_500_INTERNAL_SERVER_ERROR,
                "Database connection failed",
            )

    async def _send_json_error(self, scope, receive, send, status_code, detail):
        """Send a ``{"detail": ...}`` JSON response with the given status code.

        The body is produced with ``json.dumps`` so quotes or backslashes in
        ``detail`` cannot yield malformed JSON.
        """
        response = Response(
            status_code=status_code,
            content=json.dumps({"detail": detail}),
            media_type="application/json",
        )
        await response(scope, receive, send)

    @asynccontextmanager
    async def _get_tenant_database_context(self, product_id: str):
        """Yield a per-request database handle for the tenant ``product_id``.

        Looks the tenant up in the ``tenant_doc`` collection of the main
        database, opens a dedicated Motor client for the tenant's
        ``mongodb_uri`` and initialises Beanie against it.

        Raises:
            ValueError: if the tenant does not exist or its status is not
                ``"active"``.
            Exception: any other setup failure is logged and re-raised.

        The tenant client, once created, is closed on every exit path,
        including a failure during setup.
        """
        tenant_client = None
        try:
            try:
                # Use existing main database client to get tenant information
                main_db = client[app_settings.MONGODB_NAME]
                tenant_doc = await main_db.tenant_doc.find_one({"product_id": product_id})
                if not tenant_doc:
                    raise ValueError(f"Tenant {product_id} does not exist")
                if tenant_doc.get("status") != "active":
                    raise ValueError(f"Tenant {product_id} is not active")

                # Create new database connection for tenant
                # (each request has its own connection)
                tenant_client = AsyncIOMotorClient(
                    tenant_doc["mongodb_uri"],
                    serverSelectionTimeoutMS=10000,
                    minPoolSize=5,
                    maxPoolSize=20,
                )

                # Extract database name from URI
                db_name = self._extract_db_name_from_uri(tenant_doc["mongodb_uri"])
                database = tenant_client[db_name]

                # Initialize Beanie for this request (using independent database instance)
                # TODO: check whether only MessageTemplateDoc is needed
                # NOTE(review): Beanie appears to bind document classes to a
                # database at class level, so concurrent requests for
                # different tenants may see each other's binding - confirm.
                db_models = [MessageTemplateDoc, EmailSenderDoc, EmailSendStatusDoc]
                await init_beanie(database=database, document_models=db_models)
                await self.module_logger.log_info(
                    f"Only initialized base model for tenant {product_id}"
                )
                await self.module_logger.log_info(
                    f"Created independent database connection for tenant {product_id}: {db_name}"
                )
            except Exception as e:
                # Covers setup only; exceptions thrown in at the yield below
                # come from the request handler and are not tenant failures.
                await self.module_logger.log_error(
                    f"Failed to get tenant {product_id} database: {str(e)}"
                )
                raise

            yield database
        finally:
            # Close connection safely after the request or after failed setup
            if tenant_client is not None:
                await self._close_tenant_client(tenant_client, product_id)

    async def _close_tenant_client(self, tenant_client, product_id: str):
        """Close a tenant Motor client, logging instead of raising on failure."""
        try:
            tenant_client.close()
            await self.module_logger.log_info(
                f"Closed database connection for tenant {product_id}"
            )
        except Exception as close_error:
            # Best effort: a failed close must not mask the request outcome.
            await self.module_logger.log_error(
                f"Error closing database connection for tenant {product_id}: {str(close_error)}"
            )

    def _extract_db_name_from_uri(self, uri: str) -> str:
        """Extract the database name from a MongoDB URI.

        Returns the URI path without its leading slash, or
        ``app_settings.MONGODB_NAME`` when the URI carries no database name
        or cannot be parsed.
        """
        # TODO: check whether app_settings.MONGODB_NAME can be used as default database name
        try:
            parsed = urlparse(uri)
            return parsed.path.lstrip('/') or app_settings.MONGODB_NAME
        except (ValueError, TypeError, AttributeError):
            return app_settings.MONGODB_NAME

    async def _ensure_default_database_initialized(self):
        """Initialise Beanie against the default database for keyless requests.

        Failures are logged and swallowed so the request still reaches the
        application.
        """
        # NOTE(review): this runs init_beanie on every request without an API
        # Key; it cannot simply be cached because tenant requests re-initialise
        # Beanie against a different database - confirm the intended design.
        try:
            # Initialize default database with all models for compatibility mode
            default_db = client[MONGODB_NAME]
            await init_beanie(database=default_db, document_models=document_models)
            await self.module_logger.log_info(
                "Default database initialized for compatibility mode"
            )
        except Exception as e:
            await self.module_logger.log_error(
                f"Failed to initialize default database: {str(e)}"
            )