feat(cache): use new defined database
This commit is contained in:
parent
f3a5f6321a
commit
dc1ebf2481
@ -1,191 +1,78 @@
|
|||||||
from fastapi import Request, status, Response
|
from fastapi import Request, status
|
||||||
from urllib.parse import urlparse
|
from fastapi.responses import JSONResponse
|
||||||
from motor.motor_asyncio import AsyncIOMotorClient
|
|
||||||
from beanie import init_beanie
|
|
||||||
from contextlib import asynccontextmanager
|
|
||||||
import os
|
|
||||||
|
|
||||||
from common.config.app_settings import app_settings
|
|
||||||
from common.log.module_logger import ModuleLogger
|
|
||||||
from webapi.middleware.freeleaps_auth_middleware import request_context_var
|
from webapi.middleware.freeleaps_auth_middleware import request_context_var
|
||||||
from backend.models.models import (
|
from common.log.module_logger import ModuleLogger
|
||||||
MessageTemplateDoc,
|
|
||||||
EmailSenderDoc,
|
|
||||||
EmailSendStatusDoc,
|
|
||||||
EmailTrackingDoc,
|
|
||||||
EmailBounceDoc,
|
|
||||||
UsageLogDoc
|
|
||||||
)
|
|
||||||
|
|
||||||
# MongoDB config (moved from providers/database.py)
|
|
||||||
MONGODB_URI = os.getenv('MONGODB_URI')
|
|
||||||
MONGODB_NAME = os.getenv('MONGODB_NAME')
|
|
||||||
|
|
||||||
# Create MongoDB client (moved from providers/database.py)
|
|
||||||
client = AsyncIOMotorClient(
|
|
||||||
MONGODB_URI,
|
|
||||||
serverSelectionTimeoutMS=60000,
|
|
||||||
minPoolSize=5,
|
|
||||||
maxPoolSize=20,
|
|
||||||
heartbeatFrequencyMS=20000,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Define all document models (moved from providers/database.py)
|
|
||||||
document_models = [
|
|
||||||
MessageTemplateDoc,
|
|
||||||
EmailSenderDoc,
|
|
||||||
EmailSendStatusDoc,
|
|
||||||
EmailTrackingDoc,
|
|
||||||
EmailBounceDoc,
|
|
||||||
UsageLogDoc
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
class TenantDBConnectionMiddleware:
|
class TenantDBConnectionMiddleware:
|
||||||
"""TenantDBConnectionMiddleware: Request-level tenant database isolation middleware
|
|
||||||
|
|
||||||
Depends on Auth middleware to get the product_id after API Key validation.
|
|
||||||
Must be executed after Auth middleware.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, app):
|
def __init__(self, app):
|
||||||
self.app = app
|
self.app = app
|
||||||
self.module_logger = ModuleLogger(sender_id=TenantDBConnectionMiddleware)
|
self.module_logger = ModuleLogger(sender_id=TenantDBConnectionMiddleware)
|
||||||
|
|
||||||
async def __call__(self, scope, receive, send):
|
async def __call__(self, scope, receive, send):
|
||||||
"""Process request, automatically switch tenant database"""
|
|
||||||
if scope["type"] != "http":
|
if scope["type"] != "http":
|
||||||
await self.app(scope, receive, send)
|
return await self.app(scope, receive, send)
|
||||||
return
|
|
||||||
|
|
||||||
request = Request(scope, receive)
|
request = Request(scope, receive)
|
||||||
|
|
||||||
# Check if request has API Key - if not, use default database (compatibility mode)
|
# Get tenant id from auth context (set by FreeleapsAuthMiddleware)
|
||||||
api_key = request.headers.get("X-API-KEY")
|
product_id = None
|
||||||
if not api_key or api_key == "":
|
|
||||||
await self.module_logger.log_info(f"No API Key provided - using default database for path: {request.url.path}")
|
|
||||||
# Ensure default database connection is available for business logic
|
|
||||||
await self._ensure_default_database_initialized()
|
|
||||||
await self.app(scope, receive, send)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Get product_id from Auth middleware context
|
|
||||||
try:
|
try:
|
||||||
request_context = request_context_var.get()
|
ctx = request_context_var.get()
|
||||||
product_id = request_context.product_id
|
product_id = getattr(ctx, "product_id", None)
|
||||||
|
await self.module_logger.log_info(f"Retrieved product_id from auth context: {product_id}")
|
||||||
if not product_id:
|
|
||||||
await self.module_logger.log_error(f"Request {request.url.path} API Key validation successful but no tenant information")
|
|
||||||
response = Response(
|
|
||||||
status_code=status.HTTP_400_BAD_REQUEST,
|
|
||||||
content='{"detail": "API Key does not contain valid tenant information"}',
|
|
||||||
media_type="application/json"
|
|
||||||
)
|
|
||||||
await response(scope, receive, send)
|
|
||||||
return
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
await self.module_logger.log_error(f"request {request.url.path} cannot get Auth context: {str(e)}")
|
await self.module_logger.log_error(f"Failed to get auth context: {str(e)}")
|
||||||
response = Response(
|
product_id = None
|
||||||
|
|
||||||
|
# Get tenant cache and main database from app state
|
||||||
|
try:
|
||||||
|
tenant_cache = request.app.state.tenant_cache
|
||||||
|
main_db = request.app.state.main_db
|
||||||
|
await self.module_logger.log_info(f"Retrieved app state - tenant_cache: {'✅' if tenant_cache is not None else '❌'}, main_db: {'✅' if main_db is not None else '❌'}")
|
||||||
|
except Exception as e:
|
||||||
|
await self.module_logger.log_error(f"Failed to get app state: {str(e)}")
|
||||||
|
response = JSONResponse(
|
||||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||||
content='{"detail": "Auth middleware not executed correctly, please check middleware order"}',
|
content={"detail": "Database not properly initialized"}
|
||||||
media_type="application/json"
|
|
||||||
)
|
)
|
||||||
await response(scope, receive, send)
|
return await response(scope, receive, send)
|
||||||
return
|
|
||||||
|
if not product_id:
|
||||||
|
# Compatibility / public routes: use main database with tenant models initialized
|
||||||
|
await self.module_logger.log_info(f"No product_id - using main database for path: {request.url.path}")
|
||||||
|
|
||||||
|
# Get main database with Beanie initialized for tenant models
|
||||||
|
main_db_initialized = await tenant_cache.get_main_db_initialized()
|
||||||
|
|
||||||
|
request.state.db = main_db_initialized
|
||||||
|
request.state.product_id = None
|
||||||
|
await self.module_logger.log_info(f"✅ Successfully initialized main database with tenant models")
|
||||||
|
return await self.app(scope, receive, send)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Use context manager to ensure request-level database isolation
|
# Get tenant-specific database with Beanie already initialized (cached)
|
||||||
async with self._get_tenant_database_context(product_id) as tenant_database:
|
await self.module_logger.log_info(f"Attempting to get tenant database for product_id: {product_id}")
|
||||||
await self.module_logger.log_info(f"Switch to tenant database: {tenant_database.name}")
|
tenant_db = await tenant_cache.get_initialized_db(product_id)
|
||||||
|
|
||||||
await self.app(scope, receive, send)
|
request.state.db = tenant_db
|
||||||
|
request.state.product_id = product_id
|
||||||
|
await self.module_logger.log_info(f"✅ Successfully retrieved cached tenant database with Beanie for product_id: {product_id}")
|
||||||
|
return await self.app(scope, receive, send)
|
||||||
|
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
await self.module_logger.log_error(f"Tenant validation failed: {str(e)}")
|
# Handle tenant not found or inactive (ValueError from TenantDBCache)
|
||||||
response = Response(
|
await self.module_logger.log_error(f"Tenant error for {product_id}: {str(e)}")
|
||||||
|
response = JSONResponse(
|
||||||
status_code=status.HTTP_404_NOT_FOUND,
|
status_code=status.HTTP_404_NOT_FOUND,
|
||||||
content=f'{{"detail": "{str(e)}"}}',
|
content={"detail": str(e)}
|
||||||
media_type="application/json"
|
|
||||||
)
|
)
|
||||||
await response(scope, receive, send)
|
return await response(scope, receive, send)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
await self.module_logger.log_error(f"Failed to get tenant database: {str(e)}")
|
await self.module_logger.log_error(f"Database error for tenant {product_id}: {str(e)}")
|
||||||
response = Response(
|
response = JSONResponse(
|
||||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||||
content='{"detail": "Database connection failed"}',
|
content={"detail": "Database connection error"}
|
||||||
media_type="application/json"
|
|
||||||
)
|
)
|
||||||
await response(scope, receive, send)
|
return await response(scope, receive, send)
|
||||||
|
|
||||||
@asynccontextmanager
|
|
||||||
async def _get_tenant_database_context(self, product_id: str):
|
|
||||||
"""Get tenant database context manager, ensure request-level isolation"""
|
|
||||||
tenant_client = None
|
|
||||||
try:
|
|
||||||
# Use existing main database client to get tenant information
|
|
||||||
main_db = client[app_settings.MONGODB_NAME]
|
|
||||||
tenant_collection = main_db.tenant_doc
|
|
||||||
|
|
||||||
tenant_doc = await tenant_collection.find_one({"product_id": product_id})
|
|
||||||
if not tenant_doc:
|
|
||||||
raise ValueError(f"Tenant {product_id} does not exist")
|
|
||||||
|
|
||||||
if tenant_doc.get("status") != "active":
|
|
||||||
raise ValueError(f"Tenant {product_id} is not active")
|
|
||||||
|
|
||||||
# Create new database connection for tenant (each request has its own connection)
|
|
||||||
tenant_client = AsyncIOMotorClient(
|
|
||||||
tenant_doc["mongodb_uri"],
|
|
||||||
serverSelectionTimeoutMS=10000,
|
|
||||||
minPoolSize=5,
|
|
||||||
maxPoolSize=20,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Extract database name from URI
|
|
||||||
db_name = self._extract_db_name_from_uri(tenant_doc["mongodb_uri"])
|
|
||||||
database = tenant_client[db_name]
|
|
||||||
|
|
||||||
# Initialize Beanie for this request (using independent database instance)
|
|
||||||
# TODO: check whether only MessageTemplateDoc is needed
|
|
||||||
db_models = [MessageTemplateDoc, EmailSenderDoc, EmailSendStatusDoc]
|
|
||||||
await init_beanie(database=database, document_models=db_models)
|
|
||||||
await self.module_logger.log_info(f"Only initialized base model for tenant {product_id}")
|
|
||||||
|
|
||||||
await self.module_logger.log_info(f"Created independent database connection for tenant {product_id}: {db_name}")
|
|
||||||
|
|
||||||
try:
|
|
||||||
yield database
|
|
||||||
finally:
|
|
||||||
# Close connection safely after request
|
|
||||||
try:
|
|
||||||
if tenant_client:
|
|
||||||
tenant_client.close()
|
|
||||||
await self.module_logger.log_info(f"Closed database connection for tenant {product_id}")
|
|
||||||
except Exception as close_error:
|
|
||||||
logger.warning(f"Error closing database connection for tenant {product_id}: {str(close_error)}")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
await self.module_logger.log_error(f"Failed to get tenant {product_id} database: {str(e)}")
|
|
||||||
raise
|
|
||||||
|
|
||||||
def _extract_db_name_from_uri(self, uri: str) -> str:
|
|
||||||
"""Extract database name from MongoDB URI"""
|
|
||||||
# TODO: check whether app_settings.MONGODB_NAME can be used as default database name
|
|
||||||
try:
|
|
||||||
parsed = urlparse(uri)
|
|
||||||
return parsed.path.lstrip('/') or app_settings.MONGODB_NAME
|
|
||||||
except Exception:
|
|
||||||
return app_settings.MONGODB_NAME
|
|
||||||
|
|
||||||
async def _ensure_default_database_initialized(self):
|
|
||||||
"""Ensure default database connection is properly initialized for requests without API Key"""
|
|
||||||
try:
|
|
||||||
# Initialize default database with all models for compatibility mode
|
|
||||||
default_db = client[MONGODB_NAME]
|
|
||||||
await init_beanie(database=default_db, document_models=document_models)
|
|
||||||
await self.module_logger.log_info("Default database initialized for compatibility mode")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
await self.module_logger.log_error(f"Failed to initialize default database: {str(e)}")
|
|
||||||
|
|
||||||
Loading…
Reference in New Issue
Block a user