diff --git a/apps/notification/webapi/middleware/tenant_middleware.py b/apps/notification/webapi/middleware/tenant_middleware.py new file mode 100644 index 0000000..89a587e --- /dev/null +++ b/apps/notification/webapi/middleware/tenant_middleware.py @@ -0,0 +1,140 @@ +from fastapi import Request, status, Response +from urllib.parse import urlparse +from motor.motor_asyncio import AsyncIOMotorClient +from beanie import init_beanie +from contextlib import asynccontextmanager + +from common.config.app_settings import app_settings +from common.log.module_logger import ModuleLogger +from webapi.providers.database import client +from webapi.middleware.freeleaps_auth_middleware import request_context_var +from backend.models.models import MessageTemplateDoc + + +class TenantMiddleware: + """TenantMiddleware: Request-level tenant database isolation middleware + + Depends on Auth middleware to get the product_id after API Key validation. + Must be executed after Auth middleware. + """ + + def __init__(self, app): + self.app = app + self.module_logger = ModuleLogger(sender_id=TenantMiddleware) + + async def __call__(self, scope, receive, send): + """Process request, automatically switch tenant database""" + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + request = Request(scope, receive) + + # Get product_id from Auth middleware context + try: + request_context = request_context_var.get() + product_id = request_context.product_id + + if not product_id: + await self.module_logger.log_error(f"Request {request.url.path} API Key validation successful but no tenant information") + response = Response( + status_code=status.HTTP_400_BAD_REQUEST, + content='{"detail": "API Key does not contain valid tenant information"}', + media_type="application/json" + ) + await response(scope, receive, send) + return + + except Exception as e: + await self.module_logger.log_error(f"request {request.url.path} cannot get Auth context: {str(e)}") + response = Response( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content='{"detail": "Auth middleware not executed correctly, please check middleware order"}', + media_type="application/json" + ) + await response(scope, receive, send) + return + + try: + # Use context manager to ensure request-level database isolation + async with self._get_tenant_database_context(product_id) as tenant_database: + await self.module_logger.log_info(f"Switch to tenant database: {tenant_database.name}") + + await self.app(scope, receive, send) + + except ValueError as e: + await self.module_logger.log_error(f"Tenant validation failed: {str(e)}") + response = Response( + status_code=status.HTTP_404_NOT_FOUND, + content=f'{{"detail": "{str(e)}"}}', + media_type="application/json" + ) + await response(scope, receive, send) + except Exception as e: + await self.module_logger.log_error(f"Failed to get tenant database: {str(e)}") + response = Response( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content='{"detail": "Database connection failed"}', + media_type="application/json" + ) + await response(scope, receive, send) + + @asynccontextmanager + async def _get_tenant_database_context(self, product_id: str): + """Get tenant database context manager, ensure request-level isolation""" + try: + # Get tenant information directly from main database + main_db = client[app_settings.MONGODB_NAME] + tenant_collection = main_db.tenant_doc + + tenant_doc = await tenant_collection.find_one({"product_id": product_id}) + if not tenant_doc: + raise ValueError(f"Tenant {product_id} does not exist") + + if tenant_doc.get("status") != "active": + raise ValueError(f"Tenant {product_id} is not active") + + # Create new database connection (each request has its own connection) + client = AsyncIOMotorClient( + tenant_doc["mongodb_uri"], + serverSelectionTimeoutMS=10000, + minPoolSize=5, + maxPoolSize=20, + ) + + # Extract database name from URI + db_name = self._extract_db_name_from_uri(tenant_doc["mongodb_uri"]) + database = client[db_name] + + # Initialize Beanie for this request (using independent database instance) + # TODO: check whether only MessageTemplateDoc is needed + db_models = [MessageTemplateDoc] + await init_beanie(database=database, document_models=db_models) + await self.module_logger.log_info(f"Only initialized base model for tenant {product_id}") + + await self.module_logger.log_info(f"Created independent database connection for tenant {product_id}: {db_name}") + + try: + yield database + finally: + # Close connection safely after request + try: + if client: + client.close() + await self.module_logger.log_info(f"Closed database connection for tenant {product_id}") + except Exception as close_error: + logger.warning(f"Error closing database connection for tenant {product_id}: {str(close_error)}") + + except Exception as e: + await self.module_logger.log_error(f"Failed to get tenant {product_id} database: {str(e)}") + raise + + def _extract_db_name_from_uri(self, uri: str) -> str: + """Extract database name from MongoDB URI""" + # TODO: check whether app_settings.MONGODB_NAME can be used as default database name + try: + parsed = urlparse(uri) + return parsed.path.lstrip('/') or app_settings.MONGODB_NAME + except Exception: + return app_settings.MONGODB_NAME +