From a2fc3c8a7ea39f90c4849cea6e817fde2c1fa26d Mon Sep 17 00:00:00 2001 From: YuehuCao Date: Thu, 18 Sep 2025 15:53:03 +0800 Subject: [PATCH 01/14] feat(tenant-middleware): add tenant middleware to switch tenant's database --- .../webapi/middleware/tenant_middleware.py | 140 ++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 apps/notification/webapi/middleware/tenant_middleware.py diff --git a/apps/notification/webapi/middleware/tenant_middleware.py b/apps/notification/webapi/middleware/tenant_middleware.py new file mode 100644 index 0000000..89a587e --- /dev/null +++ b/apps/notification/webapi/middleware/tenant_middleware.py @@ -0,0 +1,140 @@ +from fastapi import Request, status, Response +from urllib.parse import urlparse +from motor.motor_asyncio import AsyncIOMotorClient +from beanie import init_beanie +from contextlib import asynccontextmanager + +from common.config.app_settings import app_settings +from common.log.module_logger import ModuleLogger +from webapi.providers.database import client +from webapi.middleware.freeleaps_auth_middleware import request_context_var +from backend.models.models import MessageTemplateDoc + + +class TenantMiddleware: + """TenantMiddleware: Request-level tenant database isolation middleware + + Depends on Auth middleware to get the product_id after API Key validation. + Must be executed after Auth middleware. + """ + + def __init__(self, app): + self.app = app + self.module_logger = ModuleLogger(sender_id=TenantMiddleware) + + async def __call__(self, scope, receive, send): + """Process request, automatically switch tenant database""" + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + request = Request(scope, receive) + + # Get product_id from Auth middleware context + try: + request_context = request_context_var.get() + product_id = request_context.product_id + + if not product_id: + await self.module_logger.log_error(f"Request {request.url.path} API Key validation successful but no tenant information") + response = Response( + status_code=status.HTTP_400_BAD_REQUEST, + content='{"detail": "API Key does not contain valid tenant information"}', + media_type="application/json" + ) + await response(scope, receive, send) + return + + except Exception as e: + await self.module_logger.log_error(f"request {request.url.path} cannot get Auth context: {str(e)}") + response = Response( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content='{"detail": "Auth middleware not executed correctly, please check middleware order"}', + media_type="application/json" + ) + await response(scope, receive, send) + return + + try: + # Use context manager to ensure request-level database isolation + async with self._get_tenant_database_context(product_id) as tenant_database: + await self.module_logger.log_info(f"Switch to tenant database: {tenant_database.name}") + + await self.app(scope, receive, send) + + except ValueError as e: + await self.module_logger.log_error(f"Tenant validation failed: {str(e)}") + response = Response( + status_code=status.HTTP_404_NOT_FOUND, + content=f'{{"detail": "{str(e)}"}}', + media_type="application/json" + ) + await response(scope, receive, send) + except Exception as e: + await self.module_logger.log_error(f"Failed to get tenant database: {str(e)}") + response = Response( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content='{"detail": "Database connection failed"}', + media_type="application/json" + ) + await response(scope, receive, send) + + @asynccontextmanager + async def _get_tenant_database_context(self, product_id: str): + """Get tenant database context manager, ensure request-level isolation""" + try: + # Get tenant information directly from main database + main_db = client[app_settings.MONGODB_NAME] + tenant_collection = main_db.tenant_doc + + tenant_doc = await tenant_collection.find_one({"product_id": product_id}) + if not tenant_doc: + raise ValueError(f"Tenant {product_id} does not exist") + + if tenant_doc.get("status") != "active": + raise ValueError(f"Tenant {product_id} is not active") + + # Create new database connection (each request has its own connection) + client = AsyncIOMotorClient( + tenant_doc["mongodb_uri"], + serverSelectionTimeoutMS=10000, + minPoolSize=5, + maxPoolSize=20, + ) + + # Extract database name from URI + db_name = self._extract_db_name_from_uri(tenant_doc["mongodb_uri"]) + database = client[db_name] + + # Initialize Beanie for this request (using independent database instance) + # TODO: check whether only MessageTemplateDoc is needed + db_models = [MessageTemplateDoc] + await init_beanie(database=database, document_models=db_models) + await self.module_logger.log_info(f"Only initialized base model for tenant {product_id}") + + await self.module_logger.log_info(f"Created independent database connection for tenant {product_id}: {db_name}") + + try: + yield database + finally: + # Close connection safely after request + try: + if client: + client.close() + await self.module_logger.log_info(f"Closed database connection for tenant {product_id}") + except Exception as close_error: + logger.warning(f"Error closing database connection for tenant {product_id}: {str(close_error)}") + + except Exception as e: + await self.module_logger.log_error(f"Failed to get tenant {product_id} database: {str(e)}") + raise + + def _extract_db_name_from_uri(self, uri: str) -> str: + """Extract database name from MongoDB URI""" + # TODO: check whether app_settings.MONGODB_NAME can be used as default database name + try: + parsed = urlparse(uri) + return parsed.path.lstrip('/') or app_settings.MONGODB_NAME + except Exception: + return app_settings.MONGODB_NAME + From 1ba9a614a0925627189a3c79e5f92144e973760a Mon Sep 17 00:00:00 2001 From: YuehuCao Date: Thu, 18 Sep 2025 15:54:57 +0800 Subject: [PATCH 02/14] feat(config): add configs regarding the tenant middleware --- apps/notification/common/config/app_settings.py | 3 +++ apps/notification/webapi/middleware/__init__.py | 3 ++- apps/notification/webapi/providers/middleware.py | 4 +++- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/apps/notification/common/config/app_settings.py b/apps/notification/common/config/app_settings.py index 5346dfe..fb241cb 100644 --- a/apps/notification/common/config/app_settings.py +++ b/apps/notification/common/config/app_settings.py @@ -16,6 +16,9 @@ class AppSettings(BaseSettings): RABBITMQ_PASSWORD: str = "" RABBITMQ_VIRTUAL_HOST: str = "" + MONGODB_URI: str = "" + MONGODB_NAME: str = "" + SYSTEM_USER_ID: str = "" SMS_FROM: str = "" EMAIL_FROM: str = "" diff --git a/apps/notification/webapi/middleware/__init__.py b/apps/notification/webapi/middleware/__init__.py index 5700ce5..68feba8 100644 --- a/apps/notification/webapi/middleware/__init__.py +++ b/apps/notification/webapi/middleware/__init__.py @@ -1,3 +1,4 @@ from .freeleaps_auth_middleware import FreeleapsAuthMiddleware +from .tenant_middleware import TenantMiddleware -__all__ = ['FreeleapsAuthMiddleware'] \ No newline at end of file +__all__ = ['FreeleapsAuthMiddleware', 'TenantMiddleware'] \ No newline at end of file diff --git a/apps/notification/webapi/providers/middleware.py b/apps/notification/webapi/providers/middleware.py index 43df09b..8eac8c1 100644 --- a/apps/notification/webapi/providers/middleware.py +++ b/apps/notification/webapi/providers/middleware.py @@ -1,9 +1,11 @@ from webapi.middleware.freeleaps_auth_middleware import FreeleapsAuthMiddleware +from webapi.middleware.tenant_middleware import TenantMiddleware def register(app): """ Register middleware to FastAPI application """ - # Register API Key middleware + # Register middlewares + app.add_middleware(TenantMiddleware) app.add_middleware(FreeleapsAuthMiddleware) \ No newline at end of file From 115b54ad5802e31e869eab161bf874af87df2aa1 Mon Sep 17 00:00:00 2001 From: YuehuCao Date: Fri, 19 Sep 2025 12:11:51 +0800 Subject: [PATCH 03/14] refactor(name): rename the tenant DB connection middleware --- ....py => tenant_DB_connection_middleware.py} | 75 ++++++++++++++++--- 1 file changed, 63 insertions(+), 12 deletions(-) rename apps/notification/webapi/middleware/{tenant_middleware.py => tenant_DB_connection_middleware.py} (68%) diff --git a/apps/notification/webapi/middleware/tenant_middleware.py b/apps/notification/webapi/middleware/tenant_DB_connection_middleware.py similarity index 68% rename from apps/notification/webapi/middleware/tenant_middleware.py rename to apps/notification/webapi/middleware/tenant_DB_connection_middleware.py index 89a587e..c251b93 100644 --- a/apps/notification/webapi/middleware/tenant_middleware.py +++ b/apps/notification/webapi/middleware/tenant_DB_connection_middleware.py @@ -3,16 +3,46 @@ from urllib.parse import urlparse from motor.motor_asyncio import AsyncIOMotorClient from beanie import init_beanie from contextlib import asynccontextmanager +import os from common.config.app_settings import app_settings from common.log.module_logger import ModuleLogger -from webapi.providers.database import client from webapi.middleware.freeleaps_auth_middleware import request_context_var -from backend.models.models import MessageTemplateDoc +from backend.models.models import ( + MessageTemplateDoc, + EmailSenderDoc, + EmailSendStatusDoc, + EmailTrackingDoc, + EmailBounceDoc, + UsageLogDoc +) + +# MongoDB config (moved from providers/database.py) +MONGODB_URI = os.getenv('MONGODB_URI') +MONGODB_NAME = os.getenv('MONGODB_NAME') + +# Create MongoDB client (moved from providers/database.py) +client = AsyncIOMotorClient( + MONGODB_URI, + serverSelectionTimeoutMS=60000, + minPoolSize=5, + maxPoolSize=20, + heartbeatFrequencyMS=20000, +) + +# Define all document models (moved from providers/database.py) +document_models = [ + MessageTemplateDoc, + EmailSenderDoc, + EmailSendStatusDoc, + EmailTrackingDoc, + EmailBounceDoc, + UsageLogDoc +] -class TenantMiddleware: - """TenantMiddleware: Request-level tenant database isolation middleware +class TenantDBConnectionMiddleware: + """TenantDBConnectionMiddleware: Request-level tenant database isolation middleware Depends on Auth middleware to get the product_id after API Key validation. Must be executed after Auth middleware. @@ -20,7 +50,7 @@ class TenantMiddleware: def __init__(self, app): self.app = app - self.module_logger = ModuleLogger(sender_id=TenantMiddleware) + self.module_logger = ModuleLogger(sender_id=TenantDBConnectionMiddleware) async def __call__(self, scope, receive, send): """Process request, automatically switch tenant database""" @@ -30,6 +60,15 @@ class TenantMiddleware: request = Request(scope, receive) + # Check if request has API Key - if not, use default database (compatibility mode) + api_key = request.headers.get("X-API-KEY") + if not api_key or api_key == "": + await self.module_logger.log_info(f"No API Key provided - using default database for path: {request.url.path}") + # Ensure default database connection is available for business logic + await self._ensure_default_database_initialized() + await self.app(scope, receive, send) + return + # Get product_id from Auth middleware context try: request_context = request_context_var.get() @@ -82,8 +121,9 @@ class TenantMiddleware: @asynccontextmanager async def _get_tenant_database_context(self, product_id: str): """Get tenant database context manager, ensure request-level isolation""" + tenant_client = None try: - # Get tenant information directly from main database + # Use existing main database client to get tenant information main_db = client[app_settings.MONGODB_NAME] tenant_collection = main_db.tenant_doc @@ -94,8 +134,8 @@ class TenantMiddleware: if tenant_doc.get("status") != "active": raise ValueError(f"Tenant {product_id} is not active") - # Create new database connection (each request has its own connection) - client = AsyncIOMotorClient( + # Create new database connection for tenant (each request has its own connection) + tenant_client = AsyncIOMotorClient( tenant_doc["mongodb_uri"], serverSelectionTimeoutMS=10000, minPoolSize=5, @@ -104,11 +144,11 @@ class TenantMiddleware: # Extract database name from URI db_name = self._extract_db_name_from_uri(tenant_doc["mongodb_uri"]) - database = client[db_name] + database = tenant_client[db_name] # Initialize Beanie for this request (using independent database instance) # TODO: check whether only MessageTemplateDoc is needed - db_models = [MessageTemplateDoc] + db_models = [MessageTemplateDoc, EmailSenderDoc, EmailSendStatusDoc] await init_beanie(database=database, document_models=db_models) await self.module_logger.log_info(f"Only initialized base model for tenant {product_id}") @@ -119,8 +159,8 @@ class TenantMiddleware: finally: # Close connection safely after request try: - if client: - client.close() + if tenant_client: + tenant_client.close() await self.module_logger.log_info(f"Closed database connection for tenant {product_id}") except Exception as close_error: logger.warning(f"Error closing database connection for tenant {product_id}: {str(close_error)}") @@ -137,4 +177,15 @@ class TenantMiddleware: return parsed.path.lstrip('/') or app_settings.MONGODB_NAME except Exception: return app_settings.MONGODB_NAME + + async def _ensure_default_database_initialized(self): + """Ensure default database connection is properly initialized for requests without API Key""" + try: + # Initialize default database with all models for compatibility mode + default_db = client[MONGODB_NAME] + await init_beanie(database=default_db, document_models=document_models) + await self.module_logger.log_info("Default database initialized for compatibility mode") + except Exception as e: + await self.module_logger.log_error(f"Failed to initialize default database: {str(e)}") + \ No newline at end of file From e726d7e7d5fe41a11529591ec7ec5f55ac3c1647 Mon Sep 17 00:00:00 2001 From: YuehuCao Date: Fri, 19 Sep 2025 12:13:12 +0800 Subject: [PATCH 04/14] feat(name): use new name --- apps/notification/webapi/middleware/__init__.py | 4 ++-- apps/notification/webapi/providers/middleware.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/notification/webapi/middleware/__init__.py b/apps/notification/webapi/middleware/__init__.py index 68feba8..dfd330f 100644 --- a/apps/notification/webapi/middleware/__init__.py +++ b/apps/notification/webapi/middleware/__init__.py @@ -1,4 +1,4 @@ from .freeleaps_auth_middleware import FreeleapsAuthMiddleware -from .tenant_middleware import TenantMiddleware +from .tenant_DB_connection_middleware import TenantDBConnectionMiddleware -__all__ = ['FreeleapsAuthMiddleware', 'TenantMiddleware'] \ No newline at end of file +__all__ = ['FreeleapsAuthMiddleware', 'TenantDBConnectionMiddleware'] \ No newline at end of file diff --git a/apps/notification/webapi/providers/middleware.py b/apps/notification/webapi/providers/middleware.py index 8eac8c1..0de606c 100644 --- a/apps/notification/webapi/providers/middleware.py +++ b/apps/notification/webapi/providers/middleware.py @@ -1,5 +1,5 @@ from webapi.middleware.freeleaps_auth_middleware import FreeleapsAuthMiddleware -from webapi.middleware.tenant_middleware import TenantMiddleware +from webapi.middleware.tenant_DB_connection_middleware import TenantDBConnectionMiddleware def register(app): @@ -7,5 +7,5 @@ def register(app): Register middleware to FastAPI application """ # Register middlewares - app.add_middleware(TenantMiddleware) + app.add_middleware(TenantDBConnectionMiddleware) app.add_middleware(FreeleapsAuthMiddleware) \ No newline at end of file From 9dc8811886a20ca3979515fe199f8083072a4bc5 Mon Sep 17 00:00:00 2001 From: YuehuCao Date: Fri, 19 Sep 2025 12:14:16 +0800 Subject: [PATCH 05/14] refactor(database): move the initialization of database into tenant middleware --- .../notification/webapi/providers/database.py | 40 ++----------------- 1 file changed, 3 insertions(+), 37 deletions(-) diff --git a/apps/notification/webapi/providers/database.py b/apps/notification/webapi/providers/database.py index 8a6e350..f259607 100644 --- a/apps/notification/webapi/providers/database.py +++ b/apps/notification/webapi/providers/database.py @@ -1,44 +1,10 @@ from webapi.config.site_settings import site_settings -from beanie import init_beanie -from motor.motor_asyncio import AsyncIOMotorClient -from backend.models.models import MessageTemplateDoc, EmailSenderDoc, EmailSendStatusDoc, EmailTrackingDoc, EmailBounceDoc, UsageLogDoc -import os -# MongoDB config -MONGODB_URI = os.getenv('MONGODB_URI') -MONGODB_NAME = os.getenv('MONGODB_NAME') - -# create MongoDB client -client = AsyncIOMotorClient( - MONGODB_URI, - serverSelectionTimeoutMS=60000, - minPoolSize=5, - maxPoolSize=20, - heartbeatFrequencyMS=20000, -) - -# define all document models -document_models = [ - MessageTemplateDoc, - EmailSenderDoc, - EmailSendStatusDoc, - EmailTrackingDoc, - EmailBounceDoc, - UsageLogDoc -] def register(app): + """Register database-related configurations""" app.debug = site_settings.DEBUG app.title = site_settings.NAME - @app.on_event("startup") - async def start_database(): - await initiate_database() - - -async def initiate_database(): - """initiate Beanie database connection""" - await init_beanie( - database=client[MONGODB_NAME], - document_models=document_models - ) \ No newline at end of file + # Database initialization is now handled by TenantDBConnectionMiddleware + # to support per-request tenant database switching \ No newline at end of file From 065c082aa7dfbd07b46b95c5c3da7c8ba174ee90 Mon Sep 17 00:00:00 2001 From: YuehuCao Date: Fri, 19 Sep 2025 12:15:05 +0800 Subject: [PATCH 06/14] feat(guide): guide to use tenant middleware --- apps/notification/webapi/middleware/README.md | 130 ++++++++++++++++++ 1 file changed, 130 insertions(+) create mode 100644 apps/notification/webapi/middleware/README.md diff --git a/apps/notification/webapi/middleware/README.md b/apps/notification/webapi/middleware/README.md new file mode 100644 index 0000000..a01f1ce --- /dev/null +++ b/apps/notification/webapi/middleware/README.md @@ -0,0 +1,130 @@ +# Notification Service Middleware Guide + +This guide explains how to use and test the middleware system of the notification service. + +## Middleware Architecture + +### Middleware Execution Order +``` +Client Request โ†’ FreeleapsAuthMiddleware โ†’ TenantDBConnectionMiddleware โ†’ Business Logic +``` + +1. **FreeleapsAuthMiddleware**: API Key validation and path skipping +2. **TenantDBConnectionMiddleware**: Tenant database switching (based on product_id) + +## 1. Setup API Key + +### 1.1 Register API Key via freeleaps-auth service + +Ensure the freeleaps-auth service is running on port 9000, then execute the following commands: + +```bash +# Register API KEY for magicleaps tenant +curl -X POST "http://localhost:9000/api/v1/keys/register_api_key" \ + -H "Content-Type: application/json" \ + -d '{ + "tenant_name": "magicleaps", + "product_id": "68a3f19119cfaf36316f6d14", + "scopes": ["notify.send_notification"], + "expires_at": "2099-12-31T23:59:59Z" + }' + +# Register API KEY for test-a tenant +curl -X POST "http://localhost:9000/api/v1/keys/register_api_key" \ + -H "Content-Type: application/json" \ + -d '{ + "tenant_name": "test-a", + "product_id": "68a3f19119cfaf36316f6d15", + "scopes": ["notify.send_notification"], + "expires_at": "2099-12-31T23:59:59Z" + }' + +# Register API KEY for test-b tenant +curl -X POST "http://localhost:9000/api/v1/keys/register_api_key" \ + -H "Content-Type: application/json" \ + -d '{ + "tenant_name": "test-b", + "product_id": "68a3f19119cfaf36316f6d16", + "scopes": ["notify.send_notification"], + "expires_at": "2099-12-31T23:59:59Z" + }' +``` + +### 1.2 Record the returned API KEY + +Example successful response: +```json +{ + "success": true, + "data": { + "tenant_name": "magicleaps", + "product_id": "68a3f19119cfaf36316f6d14", + "api_key": { + "key_id": "ak_live_UkIcMxwBXIw", + "api_key": "ak_live_UkIcMxwBXIw.J7qWirjL0IJkmvqktjEh3ViveP8dgiturxyy0KJ5sKk", + "status": "active" + } + } +} +``` + +## 2. Configure Tenant Database + +Create tenant documents in the `tenant_doc` collection of the main database: + +```json +[ + { + "tenant_name": "magicleaps", + "product_id": "68a3f19119cfaf36316f6d14", + "mongodb_uri": "mongodb://localhost:27017/interview", + "status": "active" + }, + { + "tenant_name": "test-a", + "product_id": "68a3f19119cfaf36316f6d15", + "mongodb_uri": "mongodb://localhost:27017/test-a", + "status": "active" + }, + { + "tenant_name": "test-b", + "product_id": "68a3f19119cfaf36316f6d16", + "mongodb_uri": "mongodb://localhost:27017/test-b", + "status": "active" + } +] +``` + +## 3. Test Middleware Functionality + +### 3.1 Test System Endpoints (Skip Validation) + +```bash +# Health check endpoints - should return 200, skip all validation +curl -v "http://localhost:8104/api/_/healthz" +curl -v "http://localhost:8104/api/_/readyz" +curl -v "http://localhost:8104/api/_/livez" + +# Documentation and monitoring endpoints - should return 200, skip all validation +curl -v "http://localhost:8104/docs" +curl -v "http://localhost:8104/metrics" +``` + +### 3.2 Test API Key Validation + +```bash +# No API Key - should return 200 (compatibility mode) +curl -v "http://localhost:8104/api/notification/global_templates/list?region=1" + +# Invalid API Key - should return 400/401 +curl -v "http://localhost:8104/api/notification/global_templates/list?region=1" \ + -H "X-API-KEY: invalid_key" + +# Valid API Key - should return 200 and switch to tenant database +curl -v "http://localhost:8104/api/notification/global_templates/list?region=1" \ + -H "X-API-KEY: ak_live_UkIcMxwBXIw.J7qWirjL0IJkmvqktjEh3ViveP8dgiturxyy0KJ5sKk" +``` + +### 3.3 Check Log Output + +View logs in `/apps/notification/log/notification-activity.log`: From f3a5f6321af92658f07604d9b65250b3641da8f8 Mon Sep 17 00:00:00 2001 From: YuehuCao Date: Mon, 22 Sep 2025 10:57:41 +0800 Subject: [PATCH 07/14] refactor(cache): cache the tenant database and recall --- .../notification/webapi/providers/database.py | 237 +++++++++++++++++- 1 file changed, 234 insertions(+), 3 deletions(-) diff --git a/apps/notification/webapi/providers/database.py b/apps/notification/webapi/providers/database.py index f259607..9fb9fe4 100644 --- a/apps/notification/webapi/providers/database.py +++ b/apps/notification/webapi/providers/database.py @@ -1,10 +1,241 @@ from webapi.config.site_settings import site_settings +from beanie import init_beanie +from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase +from backend.models.models import MessageTemplateDoc, EmailSenderDoc, EmailSendStatusDoc, EmailTrackingDoc, EmailBounceDoc, UsageLogDoc +from common.config.app_settings import app_settings +from common.log.module_logger import ModuleLogger +import asyncio +from collections import OrderedDict +from typing import Optional, Union +import os + + +# Global variables for database management +MAIN_CLIENT: Optional[AsyncIOMotorClient] = None +TENANT_CACHE: Optional['TenantDBCache'] = None + +# Define document models +document_models = [ + MessageTemplateDoc, + EmailSenderDoc, + EmailSendStatusDoc, + EmailTrackingDoc, + EmailBounceDoc, + UsageLogDoc +] +tenant_document_models = [ + MessageTemplateDoc, + EmailSenderDoc, + EmailSendStatusDoc +] + +class TenantDBCache: + """ + Enhanced tenant database cache that includes Beanie initialization. + product_id -> (AsyncIOMotorClient, AsyncIOMotorDatabase, beanie_initialized: bool) + Uses main_db.tenant_doc to resolve mongodb_uri; caches clients/dbs with LRU and Beanie state. + """ + + def __init__(self, main_db: AsyncIOMotorDatabase, max_size: int = 64): + self.main_db = main_db + self.max_size = max_size + self._cache: "OrderedDict[str, tuple[AsyncIOMotorClient, AsyncIOMotorDatabase, bool]]" = OrderedDict() + self._locks: dict[str, asyncio.Lock] = {} + self._global_lock = asyncio.Lock() + self.module_logger = ModuleLogger(sender_id="TenantDBCache") + + async def get_initialized_db(self, product_id: str) -> AsyncIOMotorDatabase: + """Get tenant database with Beanie already initialized""" + await self.module_logger.log_info(f"๐Ÿ” Starting database retrieval for product_id: {product_id}") + + # fast-path + cached = self._cache.get(product_id) + if cached: + client, db, beanie_initialized = cached + await self.module_logger.log_info(f"๐Ÿ“ฆ Found cached database for {product_id}, beanie_initialized: {beanie_initialized}") + self._cache.move_to_end(product_id) + + if beanie_initialized: + await self.module_logger.log_info(f"โœ… Returning cached database with Beanie already initialized for {product_id}") + return db + else: + # Initialize Beanie if not done yet + await self.module_logger.log_info(f"๐Ÿ”ง Initializing Beanie for cached database {product_id}") + await init_beanie(database=db, document_models=tenant_document_models) + await self.module_logger.log_info(f"โœ… Beanie initialization completed for {product_id}") + + # Update cache with beanie_initialized = True + self._cache[product_id] = (client, db, True) + return db + + # double-checked under per-tenant lock + await self.module_logger.log_info(f"๐Ÿ”’ Acquiring lock for product_id: {product_id}") + lock = self._locks.setdefault(product_id, asyncio.Lock()) + async with lock: + await self.module_logger.log_info(f"๐Ÿ”“ Lock acquired for product_id: {product_id}") + + cached = self._cache.get(product_id) + if cached: + client, db, beanie_initialized = cached + await self.module_logger.log_info(f"๐Ÿ“ฆ Double-check found cached database for {product_id}, beanie_initialized: {beanie_initialized}") + self._cache.move_to_end(product_id) + + if beanie_initialized: + await self.module_logger.log_info(f"โœ… Returning cached database with Beanie already initialized for {product_id}") + return db + else: + # Initialize Beanie if not done yet + await self.module_logger.log_info(f"๐Ÿ”ง Initializing Beanie for cached database {product_id} (double-check)") + await init_beanie(database=db, document_models=tenant_document_models) + await self.module_logger.log_info(f"โœ… Beanie initialization completed for {product_id} (double-check)") + + # Update cache with beanie_initialized = True + self._cache[product_id] = (client, db, True) + return db + + # Create new tenant connection - use raw MongoDB query since we don't have TenantDoc model + await self.module_logger.log_info(f"๐Ÿ” Creating new tenant connection for product_id: {product_id}") + tenant = await self.main_db["tenant_doc"].find_one({"product_id": product_id}) + if not tenant: + await self.module_logger.log_error(f"โŒ Tenant {product_id} does not exist in main database") + raise ValueError(f"Tenant {product_id} does not exist") + if tenant.get("status") != "active": + await self.module_logger.log_error(f"โŒ Tenant {product_id} is not active, status: {tenant.get('status')}") + raise ValueError(f"Tenant {product_id} is not active") + + uri = tenant["mongodb_uri"] + await self.module_logger.log_info(f"๐Ÿ”— Creating Motor client for tenant {product_id} with URI: {uri[:20]}...") + client = AsyncIOMotorClient(uri, minPoolSize=3, maxPoolSize=20, serverSelectionTimeoutMS=10000) + + # robust db name resolution (get_default_database handles mongodb+srv and empty paths) + default_db = client.get_default_database() + if default_db is not None: + db = default_db + await self.module_logger.log_info(f"๐Ÿ“Š Using default database for tenant {product_id}: {db.name}") + else: + # fallback: if no default, just use 'admin' or a convention; or raise + # Here we default to 'admin' to avoid surprises. Customize as needed. + db = client["admin"] + await self.module_logger.log_info(f"๐Ÿ“Š Using fallback database 'admin' for tenant {product_id}") + + # Initialize Beanie for this tenant database + await self.module_logger.log_info(f"๐Ÿ”ง Initializing Beanie for new tenant database {product_id}") + await init_beanie(database=db, document_models=tenant_document_models) + await self.module_logger.log_info(f"โœ… Beanie initialization completed for new tenant database {product_id}") + + # LRU put with beanie_initialized = True + await self._lru_put(product_id, (client, db, True)) + await self.module_logger.log_info(f"๐Ÿ’พ Tenant database {product_id} cached successfully with Beanie initialized") + return db + + async def get_main_db_initialized(self) -> AsyncIOMotorDatabase: + """Get main database with Beanie initialized for tenant models""" + await self.module_logger.log_info("๐Ÿ”ง Initializing Beanie for main database with business models") + # Re-initialize Beanie for main database with business models + await init_beanie(database=self.main_db, document_models=document_models) + await self.module_logger.log_info("โœ… Beanie initialization completed for main database") + return self.main_db + + async def get_db(self, product_id: str) -> AsyncIOMotorDatabase: + """Legacy method for compatibility - just returns database without Beanie initialization""" + cached = self._cache.get(product_id) + if cached: + self._cache.move_to_end(product_id) + return cached[1] + + # Use get_initialized_db and return the database part + db = await self.get_initialized_db(product_id) + return db + + async def _lru_put(self, key: str, value: tuple[AsyncIOMotorClient, AsyncIOMotorDatabase, bool]): + async with self._global_lock: + await self.module_logger.log_info(f"๐Ÿ“ Adding {key} to cache, current cache size: {len(self._cache)}") + self._cache[key] = value + self._cache.move_to_end(key) + if len(self._cache) > self.max_size: + old_key, (old_client, _, _) = self._cache.popitem(last=False) + await self.module_logger.log_info(f"๐Ÿ—‘๏ธ Cache full, removing LRU tenant: {old_key}") + try: + old_client.close() + await self.module_logger.log_info(f"๐Ÿ”Œ Closed connection for evicted tenant: {old_key}") + except Exception as e: + await self.module_logger.log_error(f"โŒ Error closing connection for {old_key}: {str(e)}") + self._locks.pop(old_key, None) + + async def aclose(self): + async with self._global_lock: + await self.module_logger.log_info(f"๐Ÿ”Œ Closing all cached connections, count: {len(self._cache)}") + for key, (client, _, _) in self._cache.items(): + try: + client.close() + await self.module_logger.log_info(f"โœ… Closed connection for tenant: {key}") + except Exception as e: + await self.module_logger.log_error(f"โŒ Error closing connection for {key}: {str(e)}") + self._cache.clear() + self._locks.clear() + await self.module_logger.log_info("๐Ÿงน Tenant cache cleared successfully") def register(app): - """Register database-related configurations""" + """Register database-related configurations and setup""" app.debug = site_settings.DEBUG app.title = site_settings.NAME - # Database initialization is now handled by TenantDBConnectionMiddleware - # to support per-request tenant database switching \ No newline at end of file + @app.on_event("startup") + async def start_database(): + await initiate_database(app) + + @app.on_event("shutdown") + async def shutdown_database(): + await cleanup_database() + + +async def initiate_database(app): + """Initialize main database and tenant cache""" + global MAIN_CLIENT, TENANT_CACHE + + module_logger = ModuleLogger(sender_id="DatabaseInit") + + await module_logger.log_info("๐Ÿš€ Starting database initialization") + + # 1) Create main/catalog client + DB + await module_logger.log_info(f"๐Ÿ”— Creating main MongoDB client with URI: {app_settings.MONGODB_URI[:20]}...") + MAIN_CLIENT = AsyncIOMotorClient(app_settings.MONGODB_URI) + main_db = MAIN_CLIENT[app_settings.MONGODB_NAME] + await module_logger.log_info(f"๐Ÿ“Š Main database created: {app_settings.MONGODB_NAME}") + + # 2) Initialize Beanie for main DB with business document models + await module_logger.log_info(f"๐Ÿ”ง Initializing Beanie for main database with {len(document_models)} document models") + await init_beanie(database=main_db, document_models=document_models) + await module_logger.log_info("โœ… Beanie initialization completed for main database") + + # 3) Create tenant cache that uses main_db lookups to resolve product_id -> tenant db + max_cache_size = getattr(app_settings, 'TENANT_CACHE_MAX', 64) + await module_logger.log_info(f"๐Ÿ“ฆ Creating tenant cache with max size: {max_cache_size}") + TENANT_CACHE = TenantDBCache(main_db, max_size=max_cache_size) + + # 4) Store on app state for middleware to access + app.state.main_db = main_db + app.state.tenant_cache = TENANT_CACHE + + await module_logger.log_info("โœ… Database and tenant cache initialized successfully") + print("Database and tenant cache initialized successfully") + + +async def cleanup_database(): + """Cleanup database connections and cache""" + global MAIN_CLIENT, TENANT_CACHE + + module_logger = ModuleLogger(sender_id="DatabaseCleanup") + await module_logger.log_info("๐Ÿงน Starting database cleanup") + + if TENANT_CACHE: + await module_logger.log_info("๐Ÿ“ฆ Closing tenant cache") + await TENANT_CACHE.aclose() + + if MAIN_CLIENT: + await module_logger.log_info("๐Ÿ”Œ Closing main database client") + MAIN_CLIENT.close() + + await module_logger.log_info("โœ… Database connections closed successfully") + print("Database connections closed successfully") \ No newline at end of file From dc1ebf2481398313cb9bd18457e3fd44563d8472 Mon Sep 17 00:00:00 2001 From: YuehuCao Date: Mon, 22 Sep 2025 10:58:40 +0800 Subject: [PATCH 08/14] feat(cache): use new defined database --- .../tenant_DB_connection_middleware.py | 233 +++++------------- 1 file changed, 60 insertions(+), 173 deletions(-) diff --git a/apps/notification/webapi/middleware/tenant_DB_connection_middleware.py b/apps/notification/webapi/middleware/tenant_DB_connection_middleware.py index c251b93..6af73ce 100644 --- a/apps/notification/webapi/middleware/tenant_DB_connection_middleware.py +++ b/apps/notification/webapi/middleware/tenant_DB_connection_middleware.py @@ -1,191 +1,78 @@ -from fastapi import Request, status, Response -from urllib.parse import urlparse -from motor.motor_asyncio import AsyncIOMotorClient -from beanie import init_beanie -from contextlib import asynccontextmanager -import os - -from common.config.app_settings import app_settings -from common.log.module_logger import ModuleLogger +from fastapi import Request, status +from fastapi.responses import JSONResponse from webapi.middleware.freeleaps_auth_middleware import request_context_var -from backend.models.models import ( - MessageTemplateDoc, - EmailSenderDoc, - EmailSendStatusDoc, - EmailTrackingDoc, - EmailBounceDoc, - UsageLogDoc -) - -# MongoDB config (moved from providers/database.py) -MONGODB_URI = os.getenv('MONGODB_URI') -MONGODB_NAME = os.getenv('MONGODB_NAME') - -# Create MongoDB client (moved from providers/database.py) -client = AsyncIOMotorClient( - MONGODB_URI, - serverSelectionTimeoutMS=60000, - minPoolSize=5, - maxPoolSize=20, - heartbeatFrequencyMS=20000, -) - -# Define all document models (moved from providers/database.py) -document_models = [ - MessageTemplateDoc, - EmailSenderDoc, - EmailSendStatusDoc, - EmailTrackingDoc, - EmailBounceDoc, - UsageLogDoc -] +from common.log.module_logger import ModuleLogger class TenantDBConnectionMiddleware: - """TenantDBConnectionMiddleware: Request-level tenant database isolation middleware - - Depends on Auth middleware to get the product_id after API Key validation. - Must be executed after Auth middleware. - """ - def __init__(self, app): self.app = app self.module_logger = ModuleLogger(sender_id=TenantDBConnectionMiddleware) - + async def __call__(self, scope, receive, send): - """Process request, automatically switch tenant database""" if scope["type"] != "http": - await self.app(scope, receive, send) - return - + return await self.app(scope, receive, send) + request = Request(scope, receive) - - # Check if request has API Key - if not, use default database (compatibility mode) - api_key = request.headers.get("X-API-KEY") - if not api_key or api_key == "": - await self.module_logger.log_info(f"No API Key provided - using default database for path: {request.url.path}") - # Ensure default database connection is available for business logic - await self._ensure_default_database_initialized() - await self.app(scope, receive, send) - return - - # Get product_id from Auth middleware context + + # Get tenant id from auth context (set by FreeleapsAuthMiddleware) + product_id = None try: - request_context = request_context_var.get() - product_id = request_context.product_id - - if not product_id: - await self.module_logger.log_error(f"Request {request.url.path} API Key validation successful but no tenant information") - response = Response( - status_code=status.HTTP_400_BAD_REQUEST, - content='{"detail": "API Key does not contain valid tenant information"}', - media_type="application/json" - ) - await response(scope, receive, send) - return - + ctx = request_context_var.get() + product_id = getattr(ctx, "product_id", None) + await self.module_logger.log_info(f"Retrieved product_id from auth context: {product_id}") except Exception as e: - await self.module_logger.log_error(f"request {request.url.path} cannot get Auth context: {str(e)}") - response = Response( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - content='{"detail": "Auth middleware not executed correctly, please check middleware order"}', - media_type="application/json" - ) - await response(scope, receive, send) - return - + await self.module_logger.log_error(f"Failed to get auth context: {str(e)}") + product_id = None + + # Get tenant cache and main database from app state try: - # Use context manager to ensure request-level database isolation - async with self._get_tenant_database_context(product_id) as tenant_database: - await self.module_logger.log_info(f"Switch to tenant database: {tenant_database.name}") - - await self.app(scope, receive, send) + tenant_cache = request.app.state.tenant_cache + main_db = request.app.state.main_db + await self.module_logger.log_info(f"Retrieved app state - tenant_cache: {'โœ…' if tenant_cache is not None else 'โŒ'}, main_db: {'โœ…' if main_db is not None else 'โŒ'}") + except Exception as e: + await self.module_logger.log_error(f"Failed to get app state: {str(e)}") + response = JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={"detail": "Database not properly initialized"} + ) + return await response(scope, receive, send) + + if not product_id: + # Compatibility / public routes: use main database with tenant models initialized + await self.module_logger.log_info(f"No product_id - using main database for path: {request.url.path}") + + # Get main database with Beanie initialized for tenant models + main_db_initialized = await tenant_cache.get_main_db_initialized() + + request.state.db = main_db_initialized + request.state.product_id = None + await self.module_logger.log_info(f"โœ… Successfully initialized main database with tenant models") + return await self.app(scope, receive, send) + + try: + # Get tenant-specific database with Beanie already initialized (cached) + await self.module_logger.log_info(f"Attempting to get tenant database for product_id: {product_id}") + tenant_db = await tenant_cache.get_initialized_db(product_id) + + request.state.db = tenant_db + request.state.product_id = product_id + await self.module_logger.log_info(f"โœ… Successfully retrieved cached tenant database with Beanie for product_id: {product_id}") + return await self.app(scope, receive, send) except ValueError as e: - await self.module_logger.log_error(f"Tenant validation failed: {str(e)}") - response = Response( - status_code=status.HTTP_404_NOT_FOUND, - content=f'{{"detail": "{str(e)}"}}', - media_type="application/json" + # Handle tenant not found or inactive (ValueError from TenantDBCache) + await self.module_logger.log_error(f"Tenant error for {product_id}: {str(e)}") + response = JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content={"detail": str(e)} ) - await response(scope, receive, send) - except Exception as e: - await self.module_logger.log_error(f"Failed to get tenant database: {str(e)}") - response = Response( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - content='{"detail": "Database connection failed"}', - media_type="application/json" - ) - await response(scope, receive, send) - - @asynccontextmanager - async def _get_tenant_database_context(self, product_id: str): - """Get tenant database context manager, ensure request-level isolation""" - tenant_client = None - try: - # Use existing main database client to get tenant information - main_db = client[app_settings.MONGODB_NAME] - tenant_collection = main_db.tenant_doc - - tenant_doc = await tenant_collection.find_one({"product_id": product_id}) - if not tenant_doc: - raise ValueError(f"Tenant {product_id} does not exist") - - if tenant_doc.get("status") != "active": - raise ValueError(f"Tenant {product_id} is not active") - - # Create new database connection for tenant (each request has its own connection) - tenant_client = AsyncIOMotorClient( - tenant_doc["mongodb_uri"], - serverSelectionTimeoutMS=10000, - minPoolSize=5, - maxPoolSize=20, - ) - - # Extract database name from URI - db_name = self._extract_db_name_from_uri(tenant_doc["mongodb_uri"]) - database = tenant_client[db_name] - - # Initialize Beanie for this request (using independent database instance) - # TODO: check whether only MessageTemplateDoc is needed - db_models = [MessageTemplateDoc, EmailSenderDoc, EmailSendStatusDoc] - await init_beanie(database=database, document_models=db_models) - await self.module_logger.log_info(f"Only initialized base model for tenant {product_id}") - - await self.module_logger.log_info(f"Created independent database connection for tenant {product_id}: {db_name}") - - try: - yield database - finally: - # Close connection safely after request - try: - if tenant_client: - tenant_client.close() - await self.module_logger.log_info(f"Closed database connection for tenant {product_id}") - except Exception as close_error: - logger.warning(f"Error closing database connection for tenant {product_id}: {str(close_error)}") - - except Exception as e: - await self.module_logger.log_error(f"Failed to get tenant {product_id} database: {str(e)}") - raise - - def _extract_db_name_from_uri(self, uri: str) -> str: - """Extract database name from MongoDB URI""" - # TODO: check whether app_settings.MONGODB_NAME can be used as default database name - try: - parsed = urlparse(uri) - return parsed.path.lstrip('/') or app_settings.MONGODB_NAME - except Exception: - return app_settings.MONGODB_NAME - - async def _ensure_default_database_initialized(self): - """Ensure default database connection is properly initialized for requests without API Key""" - try: - # Initialize default database with all models for compatibility mode - default_db = client[MONGODB_NAME] - await init_beanie(database=default_db, document_models=document_models) - await self.module_logger.log_info("Default database initialized for compatibility mode") + return await response(scope, receive, send) except Exception as e: - await self.module_logger.log_error(f"Failed to initialize default database: {str(e)}") - \ No newline at end of file + await self.module_logger.log_error(f"Database error for tenant {product_id}: {str(e)}") + response = JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={"detail": "Database connection error"} + ) + return await response(scope, receive, send) \ No newline at end of file From e9e863053d7b568ae466b5cf3d309585d068dfb0 Mon Sep 17 00:00:00 2001 From: YuehuCao Date: Mon, 22 Sep 2025 11:01:34 +0800 Subject: [PATCH 09/14] feat(cache): add the cache volume --- apps/notification/common/config/app_settings.py | 1 + apps/notification/tests/integration_tests/local.env | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/apps/notification/common/config/app_settings.py b/apps/notification/common/config/app_settings.py index fb241cb..2b95132 100644 --- a/apps/notification/common/config/app_settings.py +++ b/apps/notification/common/config/app_settings.py @@ -18,6 +18,7 @@ class AppSettings(BaseSettings): MONGODB_URI: str = "" MONGODB_NAME: str = "" + TENANT_CACHE_MAX: int = 64 SYSTEM_USER_ID: str = "" SMS_FROM: str = "" diff --git a/apps/notification/tests/integration_tests/local.env b/apps/notification/tests/integration_tests/local.env index f5b5a2b..b9a1142 100644 --- a/apps/notification/tests/integration_tests/local.env +++ b/apps/notification/tests/integration_tests/local.env @@ -28,12 +28,12 @@ export RABBITMQ_HOST=localhost export RABBITMQ_PORT=5672 export AUTH_SERVICE_ENDPOINT=http://localhost:9000/api/v1/keys/ -export AUTH_SERVICE_PORT=9000 +export TENANT_CACHE_MAX=64 # for local environment export MONGODB_URI=mongodb://localhost:27017/ # connectivity from local to alpha #export MONGODB_URI=mongodb+srv://jetli:8IHKx6dZK8BfugGp@freeleaps2.hanbj.mongodb.net/ -export MONGODB_NAME=interview +export MONGODB_NAME=freeleaps2 export FREELEAPS_ENV=local export LOG_BASE_PATH=./log \ No newline at end of file From 24fe043fc3eb7c6b1e67e3c89c389db9ecfafa33 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Mon, 22 Sep 2025 18:02:39 +0800 Subject: [PATCH 10/14] Set up the logging environment for authentication --- apps/authentication/common/config/app_settings.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/authentication/common/config/app_settings.py b/apps/authentication/common/config/app_settings.py index 11d8957..05412a5 100644 --- a/apps/authentication/common/config/app_settings.py +++ b/apps/authentication/common/config/app_settings.py @@ -1,10 +1,11 @@ +import os from pydantic_settings import BaseSettings class AppSettings(BaseSettings): NAME: str = "authentication" APP_NAME: str = NAME - APP_ENV: str = "alpha" + APP_ENV: str = os.environ.get("APP_ENV", "alpha") METRICS_ENABLED: bool = False PROBES_ENABLED: bool = True From 98f2ce4871e2981e3ae794b581ca9915a9b73a99 Mon Sep 17 00:00:00 2001 From: YuehuCao Date: Tue, 23 Sep 2025 13:07:11 +0800 Subject: [PATCH 11/14] refactor(name): rename the middleware --- apps/notification/webapi/middleware/__init__.py | 2 +- ...tion_middleware.py => tenant_DBConnection_middleware.py} | 6 +++--- apps/notification/webapi/providers/middleware.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) rename apps/notification/webapi/middleware/{tenant_DB_connection_middleware.py => tenant_DBConnection_middleware.py} (90%) diff --git a/apps/notification/webapi/middleware/__init__.py b/apps/notification/webapi/middleware/__init__.py index dfd330f..e607c44 100644 --- a/apps/notification/webapi/middleware/__init__.py +++ b/apps/notification/webapi/middleware/__init__.py @@ -1,4 +1,4 @@ from .freeleaps_auth_middleware import FreeleapsAuthMiddleware -from .tenant_DB_connection_middleware import TenantDBConnectionMiddleware +from .tenant_DBConnection_middleware import TenantDBConnectionMiddleware __all__ = ['FreeleapsAuthMiddleware', 'TenantDBConnectionMiddleware'] \ No newline at end of file diff --git a/apps/notification/webapi/middleware/tenant_DB_connection_middleware.py b/apps/notification/webapi/middleware/tenant_DBConnection_middleware.py similarity index 90% rename from apps/notification/webapi/middleware/tenant_DB_connection_middleware.py rename to apps/notification/webapi/middleware/tenant_DBConnection_middleware.py index 6af73ce..d568cd9 100644 --- a/apps/notification/webapi/middleware/tenant_DB_connection_middleware.py +++ b/apps/notification/webapi/middleware/tenant_DBConnection_middleware.py @@ -29,7 +29,7 @@ class TenantDBConnectionMiddleware: try: tenant_cache = request.app.state.tenant_cache main_db = request.app.state.main_db - await self.module_logger.log_info(f"Retrieved app state - tenant_cache: {'โœ…' if tenant_cache is not None else 'โŒ'}, main_db: {'โœ…' if main_db is not None else 'โŒ'}") + await self.module_logger.log_info(f"Retrieved app state - tenant_cache: {'success' if tenant_cache is not None else 'fail'}, main_db: {'success' if main_db is not None else 'fail'}") except Exception as e: await self.module_logger.log_error(f"Failed to get app state: {str(e)}") response = JSONResponse( @@ -47,7 +47,7 @@ class TenantDBConnectionMiddleware: request.state.db = main_db_initialized request.state.product_id = None - await self.module_logger.log_info(f"โœ… Successfully initialized main database with tenant models") + await self.module_logger.log_info(f"Successfully initialized main database with tenant models") return await self.app(scope, receive, send) try: @@ -57,7 +57,7 @@ class TenantDBConnectionMiddleware: request.state.db = tenant_db request.state.product_id = product_id - await self.module_logger.log_info(f"โœ… Successfully retrieved cached tenant database with Beanie for product_id: {product_id}") + await self.module_logger.log_info(f"Successfully retrieved cached tenant database with Beanie for product_id: {product_id}") return await self.app(scope, receive, send) except ValueError as e: diff --git a/apps/notification/webapi/providers/middleware.py b/apps/notification/webapi/providers/middleware.py index 0de606c..ee47f3e 100644 --- a/apps/notification/webapi/providers/middleware.py +++ b/apps/notification/webapi/providers/middleware.py @@ -1,5 +1,5 @@ from webapi.middleware.freeleaps_auth_middleware import FreeleapsAuthMiddleware -from webapi.middleware.tenant_DB_connection_middleware import TenantDBConnectionMiddleware +from webapi.middleware.tenant_DBConnection_middleware import TenantDBConnectionMiddleware def register(app): From 794536c3727d9f5c00bc5cea8890d22f025459f4 Mon Sep 17 00:00:00 2001 From: YuehuCao Date: Tue, 23 Sep 2025 13:08:54 +0800 Subject: [PATCH 12/14] feat(log): delete some logs and unused methond, raise the http error --- .../notification/webapi/providers/database.py | 101 +++++++----------- 1 file changed, 39 insertions(+), 62 deletions(-) diff --git a/apps/notification/webapi/providers/database.py b/apps/notification/webapi/providers/database.py index 9fb9fe4..2f48c57 100644 --- a/apps/notification/webapi/providers/database.py +++ b/apps/notification/webapi/providers/database.py @@ -46,134 +46,123 @@ class TenantDBCache: async def get_initialized_db(self, product_id: str) -> AsyncIOMotorDatabase: """Get tenant database with Beanie already initialized""" - await self.module_logger.log_info(f"๐Ÿ” Starting database retrieval for product_id: {product_id}") # fast-path cached = self._cache.get(product_id) if cached: client, db, beanie_initialized = cached - await self.module_logger.log_info(f"๐Ÿ“ฆ Found cached database for {product_id}, beanie_initialized: {beanie_initialized}") + await self.module_logger.log_info(f"Found cached database for {product_id}, beanie_initialized: {beanie_initialized}") self._cache.move_to_end(product_id) if beanie_initialized: - await self.module_logger.log_info(f"โœ… Returning cached database with Beanie already initialized for {product_id}") return db else: # Initialize Beanie if not done yet - await self.module_logger.log_info(f"๐Ÿ”ง Initializing Beanie for cached database {product_id}") await init_beanie(database=db, document_models=tenant_document_models) - await self.module_logger.log_info(f"โœ… Beanie initialization completed for {product_id}") + await self.module_logger.log_info(f"Beanie initialization completed for {product_id}") # Update cache with beanie_initialized = True self._cache[product_id] = (client, db, True) return db # double-checked under per-tenant lock - await self.module_logger.log_info(f"๐Ÿ”’ Acquiring lock for product_id: {product_id}") lock = self._locks.setdefault(product_id, asyncio.Lock()) async with lock: - await self.module_logger.log_info(f"๐Ÿ”“ Lock acquired for product_id: {product_id}") - cached = self._cache.get(product_id) if cached: client, db, beanie_initialized = cached - await self.module_logger.log_info(f"๐Ÿ“ฆ Double-check found cached database for {product_id}, beanie_initialized: {beanie_initialized}") + await self.module_logger.log_info(f"Double-check found cached database for {product_id}, beanie_initialized: {beanie_initialized}") self._cache.move_to_end(product_id) if beanie_initialized: - await self.module_logger.log_info(f"โœ… Returning cached database with Beanie already initialized for {product_id}") return db else: # Initialize Beanie if not done yet - await self.module_logger.log_info(f"๐Ÿ”ง Initializing Beanie for cached database {product_id} (double-check)") await init_beanie(database=db, document_models=tenant_document_models) - await self.module_logger.log_info(f"โœ… Beanie initialization completed for {product_id} (double-check)") + await self.module_logger.log_info(f"Beanie initialization completed for {product_id} (double-check)") # Update cache with beanie_initialized = True self._cache[product_id] = (client, db, True) return db # Create new tenant connection - use raw MongoDB query since we don't have TenantDoc model - await self.module_logger.log_info(f"๐Ÿ” Creating new tenant connection for product_id: {product_id}") + # tenant_doc content: + # "tenant_name": "magicleaps", + # "product_id": "68a3f19119cfaf36316f6d14", + # "mongodb_uri": "mongodb://localhost:27017/interview", + # "status": "active" tenant = await self.main_db["tenant_doc"].find_one({"product_id": product_id}) if not tenant: - await self.module_logger.log_error(f"โŒ Tenant {product_id} does not exist in main database") - raise ValueError(f"Tenant {product_id} does not exist") + await self.module_logger.log_error(f"Tenant {product_id} does not exist in main database") + raise HTTPException( + status_code=404, + detail=f"Tenant {product_id} does not exist", + headers={"X-Error-Message": f"Tenant {product_id} does not exist"} + ) if tenant.get("status") != "active": - await self.module_logger.log_error(f"โŒ Tenant {product_id} is not active, status: {tenant.get('status')}") - raise ValueError(f"Tenant {product_id} is not active") + await self.module_logger.log_error(f"Tenant {product_id} is not active, status: {tenant.get('status')}") + raise HTTPException( + status_code=403, + detail=f"Tenant {product_id} is not active", + headers={"X-Error-Message": f"Tenant {product_id} is not active, status: {tenant.get('status')}"} + ) uri = tenant["mongodb_uri"] - await self.module_logger.log_info(f"๐Ÿ”— Creating Motor client for tenant {product_id} with URI: {uri[:20]}...") client = AsyncIOMotorClient(uri, minPoolSize=3, maxPoolSize=20, serverSelectionTimeoutMS=10000) # robust db name resolution (get_default_database handles mongodb+srv and empty paths) default_db = client.get_default_database() if default_db is not None: db = default_db - await self.module_logger.log_info(f"๐Ÿ“Š Using default database for tenant {product_id}: {db.name}") + await self.module_logger.log_info(f"Using default database for tenant {product_id}: {db.name}") else: - # fallback: if no default, just use 'admin' or a convention; or raise - # Here we default to 'admin' to avoid surprises. Customize as needed. - db = client["admin"] - await self.module_logger.log_info(f"๐Ÿ“Š Using fallback database 'admin' for tenant {product_id}") - + await self.module_logger.log_error(f"No default database found for tenant {product_id}") + raise HTTPException( + status_code=500, + detail=f"No default database found for tenant {product_id}", + headers={"X-Error-Message": f"No default database found for tenant {product_id}"} + ) # Initialize Beanie for this tenant database - await self.module_logger.log_info(f"๐Ÿ”ง Initializing Beanie for new tenant database {product_id}") await init_beanie(database=db, document_models=tenant_document_models) - await self.module_logger.log_info(f"โœ… Beanie initialization completed for new tenant database {product_id}") + await self.module_logger.log_info(f"Beanie initialization completed for new tenant database {product_id}") # LRU put with beanie_initialized = True await self._lru_put(product_id, (client, db, True)) - await self.module_logger.log_info(f"๐Ÿ’พ Tenant database {product_id} cached successfully with Beanie initialized") + await self.module_logger.log_info(f"Tenant database {product_id} cached successfully with Beanie initialized") return db async def get_main_db_initialized(self) -> AsyncIOMotorDatabase: """Get main database with Beanie initialized for tenant models""" - await self.module_logger.log_info("๐Ÿ”ง Initializing Beanie for main database with business models") # Re-initialize Beanie for main database with business models await init_beanie(database=self.main_db, document_models=document_models) - await self.module_logger.log_info("โœ… Beanie initialization completed for main database") + await self.module_logger.log_info("Beanie initialization completed for main database") return self.main_db - async def get_db(self, product_id: str) -> AsyncIOMotorDatabase: - """Legacy method for compatibility - just returns database without Beanie initialization""" - cached = self._cache.get(product_id) - if cached: - self._cache.move_to_end(product_id) - return cached[1] - - # Use get_initialized_db and return the database part - db = await self.get_initialized_db(product_id) - return db - async def _lru_put(self, key: str, value: tuple[AsyncIOMotorClient, AsyncIOMotorDatabase, bool]): async with self._global_lock: - await self.module_logger.log_info(f"๐Ÿ“ Adding {key} to cache, current cache size: {len(self._cache)}") self._cache[key] = value self._cache.move_to_end(key) if len(self._cache) > self.max_size: old_key, (old_client, _, _) = self._cache.popitem(last=False) - await self.module_logger.log_info(f"๐Ÿ—‘๏ธ Cache full, removing LRU tenant: {old_key}") + await self.module_logger.log_info(f"Cache full, removing LRU tenant: {old_key}") try: old_client.close() - await self.module_logger.log_info(f"๐Ÿ”Œ Closed connection for evicted tenant: {old_key}") + await self.module_logger.log_info(f"Closed connection for evicted tenant: {old_key}") except Exception as e: - await self.module_logger.log_error(f"โŒ Error closing connection for {old_key}: {str(e)}") + await self.module_logger.log_error(f"Error closing connection for {old_key}: {str(e)}") self._locks.pop(old_key, None) async def aclose(self): async with self._global_lock: - await self.module_logger.log_info(f"๐Ÿ”Œ Closing all cached connections, count: {len(self._cache)}") for key, (client, _, _) in self._cache.items(): try: client.close() - await self.module_logger.log_info(f"โœ… Closed connection for tenant: {key}") + await self.module_logger.log_info(f"Closed connection for tenant: {key}") except Exception as e: - await self.module_logger.log_error(f"โŒ Error closing connection for {key}: {str(e)}") + await self.module_logger.log_error(f"Error closing connection for {key}: {str(e)}") self._cache.clear() self._locks.clear() - await self.module_logger.log_info("๐Ÿงน Tenant cache cleared successfully") + await self.module_logger.log_info("Tenant cache cleared successfully") def register(app): @@ -196,30 +185,22 @@ async def initiate_database(app): module_logger = ModuleLogger(sender_id="DatabaseInit") - await module_logger.log_info("๐Ÿš€ Starting database initialization") - # 1) Create main/catalog client + DB - await module_logger.log_info(f"๐Ÿ”— Creating main MongoDB client with URI: {app_settings.MONGODB_URI[:20]}...") MAIN_CLIENT = AsyncIOMotorClient(app_settings.MONGODB_URI) main_db = MAIN_CLIENT[app_settings.MONGODB_NAME] - await module_logger.log_info(f"๐Ÿ“Š Main database created: {app_settings.MONGODB_NAME}") # 2) Initialize Beanie for main DB with business document models - await module_logger.log_info(f"๐Ÿ”ง Initializing Beanie for main database with {len(document_models)} document models") await init_beanie(database=main_db, document_models=document_models) - await module_logger.log_info("โœ… Beanie initialization completed for main database") # 3) Create tenant cache that uses main_db lookups to resolve product_id -> tenant db max_cache_size = getattr(app_settings, 'TENANT_CACHE_MAX', 64) - await module_logger.log_info(f"๐Ÿ“ฆ Creating tenant cache with max size: {max_cache_size}") TENANT_CACHE = TenantDBCache(main_db, max_size=max_cache_size) # 4) Store on app state for middleware to access app.state.main_db = main_db app.state.tenant_cache = TENANT_CACHE - await module_logger.log_info("โœ… Database and tenant cache initialized successfully") - print("Database and tenant cache initialized successfully") + await module_logger.log_info("Database and tenant cache initialized successfully") async def cleanup_database(): @@ -227,15 +208,11 @@ async def cleanup_database(): global MAIN_CLIENT, TENANT_CACHE module_logger = ModuleLogger(sender_id="DatabaseCleanup") - await module_logger.log_info("๐Ÿงน Starting database cleanup") if TENANT_CACHE: - await module_logger.log_info("๐Ÿ“ฆ Closing tenant cache") await TENANT_CACHE.aclose() if MAIN_CLIENT: - await module_logger.log_info("๐Ÿ”Œ Closing main database client") MAIN_CLIENT.close() - await module_logger.log_info("โœ… Database connections closed successfully") - print("Database connections closed successfully") \ No newline at end of file + await module_logger.log_info("Database connections closed successfully") \ No newline at end of file From 769e32a6e3529f3a05878d9ebf19294b4084ee3e Mon Sep 17 00:00:00 2001 From: YuehuCao Date: Tue, 23 Sep 2025 13:18:35 +0800 Subject: [PATCH 13/14] refactor(comment): use new comment type --- apps/notification/webapi/providers/database.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/apps/notification/webapi/providers/database.py b/apps/notification/webapi/providers/database.py index 2f48c57..7f6fc01 100644 --- a/apps/notification/webapi/providers/database.py +++ b/apps/notification/webapi/providers/database.py @@ -1,5 +1,6 @@ from webapi.config.site_settings import site_settings from beanie import init_beanie +from fastapi import HTTPException from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase from backend.models.models import MessageTemplateDoc, EmailSenderDoc, EmailSendStatusDoc, EmailTrackingDoc, EmailBounceDoc, UsageLogDoc from common.config.app_settings import app_settings @@ -86,11 +87,15 @@ class TenantDBCache: return db # Create new tenant connection - use raw MongoDB query since we don't have TenantDoc model - # tenant_doc content: - # "tenant_name": "magicleaps", - # "product_id": "68a3f19119cfaf36316f6d14", - # "mongodb_uri": "mongodb://localhost:27017/interview", - # "status": "active" + """ + tenant_doc content: + { + "tenant_name": "magicleaps", + "product_id": "68a3f19119cfaf36316f6d14", + "mongodb_uri": "mongodb://localhost:27017/interview", + "status": "active" + } + """ tenant = await self.main_db["tenant_doc"].find_one({"product_id": product_id}) if not tenant: await self.module_logger.log_error(f"Tenant {product_id} does not exist in main database") From 5b521c4ec488bd43b35648e0339deaa473aeaf87 Mon Sep 17 00:00:00 2001 From: weicao Date: Tue, 23 Sep 2025 16:23:59 +0800 Subject: [PATCH 14/14] 1d and 1m --- .../services/starrocks_metrics_service.py | 141 ++++++++++++------ .../routes/starrocks_metrics/metrics_query.py | 2 + 2 files changed, 100 insertions(+), 43 deletions(-) diff --git a/apps/metrics/backend/services/starrocks_metrics_service.py b/apps/metrics/backend/services/starrocks_metrics_service.py index 7c5f45b..fbf3080 100644 --- a/apps/metrics/backend/services/starrocks_metrics_service.py +++ b/apps/metrics/backend/services/starrocks_metrics_service.py @@ -92,6 +92,7 @@ class StarRocksMetricsService: self, product_id: str, metric_name: str, + step: str, start_date: Union[str, date], end_date: Union[str, date] ) -> List[Dict[str, Any]]: @@ -162,6 +163,14 @@ class StarRocksMetricsService: else: end_dt = end_date + # Normalize and validate step (default '1d') + step = step or '1d' + if step not in {"1d", "1m"}: + raise HTTPException( + status_code=400, + detail="Invalid step. Supported values are '1d' and '1m'" + ) + # Validate date range if start_dt >= end_dt: raise HTTPException( @@ -191,7 +200,14 @@ class StarRocksMetricsService: ) # Parse the result and format it - formatted_data = self._format_query_result(result, metric_name, product_id, start_dt, end_dt) + formatted_data = self._format_query_result( + starrocks_result=result, + metric_name=metric_name, + product_id=product_id, + step=step, + start_date=start_dt, + end_date=end_dt + ) await self.module_logger.log_info( f"Successfully queried metric '{metric_name}' with {len(formatted_data)} data points") @@ -201,7 +217,7 @@ class StarRocksMetricsService: await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}") raise - def _format_query_result(self, starrocks_result: List[Dict[str, Any]], metric_name: str, product_id: str, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]: + def _format_query_result(self, starrocks_result: List[Dict[str, Any]], metric_name: str, product_id: str, step: str, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]: """ Format StarRocks query result into the required format and fill missing dates with 0 values. @@ -219,25 +235,42 @@ class StarRocksMetricsService: result_dict = {} for row in starrocks_result: - # Format the date + # Normalize the date according to step granularity date_value = row.get("date") - if date_value: - if isinstance(date_value, str): - date_str = date_value - else: - # If it's a datetime object, format it as a string - if hasattr(date_value, 'strftime'): - # Convert to date first, then format consistently - if hasattr(date_value, 'date'): - date_obj = date_value.date() if hasattr(date_value, 'date') else date_value - else: - date_obj = date_value - date_str = date_obj.strftime('%Y-%m-%d') + ' 00:00:00' - else: - date_str = str(date_value) - else: + if not date_value: continue + def month_start(d: datetime) -> datetime: + return datetime(d.year, d.month, 1) + + # Parse and normalize + if isinstance(date_value, str): + parsed_dt = None + for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d'): + try: + parsed_dt = datetime.strptime(date_value, fmt) + break + except ValueError: + continue + if parsed_dt is None: + date_str = str(date_value) + else: + if step == '1m': + date_str = month_start(parsed_dt).strftime('%Y-%m-01 00:00:00') + else: + date_str = parsed_dt.strftime('%Y-%m-%d') + ' 00:00:00' + else: + if hasattr(date_value, 'strftime'): + dt_obj = date_value + if step == '1m': + date_str = month_start(dt_obj).strftime('%Y-%m-01 00:00:00') + else: + if hasattr(dt_obj, 'date'): + dt_obj = dt_obj.date() + date_str = dt_obj.strftime('%Y-%m-%d') + ' 00:00:00' + else: + date_str = str(date_value) + # Get the value value = row.get("value", 0) if value is None: @@ -256,32 +289,54 @@ class StarRocksMetricsService: "labels": labels } - # Generate complete date range and fill missing dates with 0 + # Generate complete range and fill missing points with 0 formatted_data = [] - current_date = start_date.date() - end_date_only = end_date.date() - - while current_date < end_date_only: - date_str = current_date.strftime('%Y-%m-%d') + ' 00:00:00' - - if date_str in result_dict: - # Use existing data - formatted_data.append(result_dict[date_str]) - else: - # Fill missing date with 0 value - labels = { - "product_id": product_id, - "metric_type": metric_name - } - - formatted_data.append({ - "date": date_str, - "value": 0, - "metric": metric_name, - "labels": labels - }) - - current_date += timedelta(days=1) + if step == '1d': + current_dt = datetime(start_date.year, start_date.month, start_date.day) + end_dt_exclusive = datetime(end_date.year, end_date.month, end_date.day) + while current_dt < end_dt_exclusive: + date_str = current_dt.strftime('%Y-%m-%d') + ' 00:00:00' + if date_str in result_dict: + formatted_data.append(result_dict[date_str]) + else: + labels = { + "product_id": product_id, + "metric_type": metric_name + } + formatted_data.append({ + "date": date_str, + "value": 0, + "metric": metric_name, + "labels": labels + }) + current_dt += timedelta(days=1) + elif step == '1m': + def month_start(d: datetime) -> datetime: + return datetime(d.year, d.month, 1) + + def add_one_month(d: datetime) -> datetime: + year = d.year + (1 if d.month == 12 else 0) + month = 1 if d.month == 12 else d.month + 1 + return datetime(year, month, 1) + + current_dt = month_start(start_date) + end_month_exclusive = month_start(end_date) + while current_dt < end_month_exclusive: + date_str = current_dt.strftime('%Y-%m-01 00:00:00') + if date_str in result_dict: + formatted_data.append(result_dict[date_str]) + else: + labels = { + "product_id": product_id, + "metric_type": metric_name + } + formatted_data.append({ + "date": date_str, + "value": 0, + "metric": metric_name, + "labels": labels + }) + current_dt = add_one_month(current_dt) return formatted_data diff --git a/apps/metrics/webapi/routes/starrocks_metrics/metrics_query.py b/apps/metrics/webapi/routes/starrocks_metrics/metrics_query.py index 37ac3ea..a035a05 100644 --- a/apps/metrics/webapi/routes/starrocks_metrics/metrics_query.py +++ b/apps/metrics/webapi/routes/starrocks_metrics/metrics_query.py @@ -26,6 +26,7 @@ class MetricQueryRequest(BaseModel): """Request model for metric query.""" product_id: str = Field(..., description="Product ID to identify which product's data to query") metric_name: str = Field(..., description="Name of the metric to query") + step: str = Field(..., description="Aggregation step, e.g., 1d or 1m") start_date: str = Field(..., description="Start date in YYYY-MM-DD HH:MM:SS format") end_date: str = Field(..., description="End date in YYYY-MM-DD HH:MM:SS format") @@ -53,6 +54,7 @@ async def metrics_query( data_points = await starrocks_service.query_metric_by_time_range( product_id=request.product_id, metric_name=request.metric_name, + step=request.step, start_date=request.start_date, end_date=request.end_date )