feat(tenant-middleware): add tenant middleware to switch tenant's database
This commit is contained in:
parent
f27080452c
commit
a2fc3c8a7e
140
apps/notification/webapi/middleware/tenant_middleware.py
Normal file
140
apps/notification/webapi/middleware/tenant_middleware.py
Normal file
@ -0,0 +1,140 @@
|
||||
from fastapi import Request, status, Response
|
||||
from urllib.parse import urlparse
|
||||
from motor.motor_asyncio import AsyncIOMotorClient
|
||||
from beanie import init_beanie
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from common.config.app_settings import app_settings
|
||||
from common.log.module_logger import ModuleLogger
|
||||
from webapi.providers.database import client
|
||||
from webapi.middleware.freeleaps_auth_middleware import request_context_var
|
||||
from backend.models.models import MessageTemplateDoc
|
||||
|
||||
|
||||
class TenantMiddleware:
|
||||
"""TenantMiddleware: Request-level tenant database isolation middleware
|
||||
|
||||
Depends on Auth middleware to get the product_id after API Key validation.
|
||||
Must be executed after Auth middleware.
|
||||
"""
|
||||
|
||||
def __init__(self, app):
|
||||
self.app = app
|
||||
self.module_logger = ModuleLogger(sender_id=TenantMiddleware)
|
||||
|
||||
async def __call__(self, scope, receive, send):
|
||||
"""Process request, automatically switch tenant database"""
|
||||
if scope["type"] != "http":
|
||||
await self.app(scope, receive, send)
|
||||
return
|
||||
|
||||
request = Request(scope, receive)
|
||||
|
||||
# Get product_id from Auth middleware context
|
||||
try:
|
||||
request_context = request_context_var.get()
|
||||
product_id = request_context.product_id
|
||||
|
||||
if not product_id:
|
||||
await self.module_logger.log_error(f"Request {request.url.path} API Key validation successful but no tenant information")
|
||||
response = Response(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
content='{"detail": "API Key does not contain valid tenant information"}',
|
||||
media_type="application/json"
|
||||
)
|
||||
await response(scope, receive, send)
|
||||
return
|
||||
|
||||
except Exception as e:
|
||||
await self.module_logger.log_error(f"request {request.url.path} cannot get Auth context: {str(e)}")
|
||||
response = Response(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
content='{"detail": "Auth middleware not executed correctly, please check middleware order"}',
|
||||
media_type="application/json"
|
||||
)
|
||||
await response(scope, receive, send)
|
||||
return
|
||||
|
||||
try:
|
||||
# Use context manager to ensure request-level database isolation
|
||||
async with self._get_tenant_database_context(product_id) as tenant_database:
|
||||
await self.module_logger.log_info(f"Switch to tenant database: {tenant_database.name}")
|
||||
|
||||
await self.app(scope, receive, send)
|
||||
|
||||
except ValueError as e:
|
||||
await self.module_logger.log_error(f"Tenant validation failed: {str(e)}")
|
||||
response = Response(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
content=f'{{"detail": "{str(e)}"}}',
|
||||
media_type="application/json"
|
||||
)
|
||||
await response(scope, receive, send)
|
||||
except Exception as e:
|
||||
await self.module_logger.log_error(f"Failed to get tenant database: {str(e)}")
|
||||
response = Response(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
content='{"detail": "Database connection failed"}',
|
||||
media_type="application/json"
|
||||
)
|
||||
await response(scope, receive, send)
|
||||
|
||||
@asynccontextmanager
|
||||
async def _get_tenant_database_context(self, product_id: str):
|
||||
"""Get tenant database context manager, ensure request-level isolation"""
|
||||
try:
|
||||
# Get tenant information directly from main database
|
||||
main_db = client[app_settings.MONGODB_NAME]
|
||||
tenant_collection = main_db.tenant_doc
|
||||
|
||||
tenant_doc = await tenant_collection.find_one({"product_id": product_id})
|
||||
if not tenant_doc:
|
||||
raise ValueError(f"Tenant {product_id} does not exist")
|
||||
|
||||
if tenant_doc.get("status") != "active":
|
||||
raise ValueError(f"Tenant {product_id} is not active")
|
||||
|
||||
# Create new database connection (each request has its own connection)
|
||||
client = AsyncIOMotorClient(
|
||||
tenant_doc["mongodb_uri"],
|
||||
serverSelectionTimeoutMS=10000,
|
||||
minPoolSize=5,
|
||||
maxPoolSize=20,
|
||||
)
|
||||
|
||||
# Extract database name from URI
|
||||
db_name = self._extract_db_name_from_uri(tenant_doc["mongodb_uri"])
|
||||
database = client[db_name]
|
||||
|
||||
# Initialize Beanie for this request (using independent database instance)
|
||||
# TODO: check whether only MessageTemplateDoc is needed
|
||||
db_models = [MessageTemplateDoc]
|
||||
await init_beanie(database=database, document_models=db_models)
|
||||
await self.module_logger.log_info(f"Only initialized base model for tenant {product_id}")
|
||||
|
||||
await self.module_logger.log_info(f"Created independent database connection for tenant {product_id}: {db_name}")
|
||||
|
||||
try:
|
||||
yield database
|
||||
finally:
|
||||
# Close connection safely after request
|
||||
try:
|
||||
if client:
|
||||
client.close()
|
||||
await self.module_logger.log_info(f"Closed database connection for tenant {product_id}")
|
||||
except Exception as close_error:
|
||||
logger.warning(f"Error closing database connection for tenant {product_id}: {str(close_error)}")
|
||||
|
||||
except Exception as e:
|
||||
await self.module_logger.log_error(f"Failed to get tenant {product_id} database: {str(e)}")
|
||||
raise
|
||||
|
||||
def _extract_db_name_from_uri(self, uri: str) -> str:
|
||||
"""Extract database name from MongoDB URI"""
|
||||
# TODO: check whether app_settings.MONGODB_NAME can be used as default database name
|
||||
try:
|
||||
parsed = urlparse(uri)
|
||||
return parsed.path.lstrip('/') or app_settings.MONGODB_NAME
|
||||
except Exception:
|
||||
return app_settings.MONGODB_NAME
|
||||
|
||||
Loading…
Reference in New Issue
Block a user