Merge pull request 'merge dev to master' (#69) from dev into master
Reviewed-on: freeleaps/freeleaps-service-hub#69
This commit is contained in:
commit
6500c5b23f
@ -1,10 +1,11 @@
|
||||
import os
|
||||
from pydantic_settings import BaseSettings
|
||||
|
||||
|
||||
class AppSettings(BaseSettings):
|
||||
NAME: str = "authentication"
|
||||
APP_NAME: str = NAME
|
||||
APP_ENV: str = "alpha"
|
||||
APP_ENV: str = os.environ.get("APP_ENV", "alpha")
|
||||
|
||||
METRICS_ENABLED: bool = False
|
||||
PROBES_ENABLED: bool = True
|
||||
|
||||
@ -92,6 +92,7 @@ class StarRocksMetricsService:
|
||||
self,
|
||||
product_id: str,
|
||||
metric_name: str,
|
||||
step: str,
|
||||
start_date: Union[str, date],
|
||||
end_date: Union[str, date]
|
||||
) -> List[Dict[str, Any]]:
|
||||
@ -162,6 +163,14 @@ class StarRocksMetricsService:
|
||||
else:
|
||||
end_dt = end_date
|
||||
|
||||
# Normalize and validate step (default '1d')
|
||||
step = step or '1d'
|
||||
if step not in {"1d", "1m"}:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Invalid step. Supported values are '1d' and '1m'"
|
||||
)
|
||||
|
||||
# Validate date range
|
||||
if start_dt >= end_dt:
|
||||
raise HTTPException(
|
||||
@ -191,7 +200,14 @@ class StarRocksMetricsService:
|
||||
)
|
||||
|
||||
# Parse the result and format it
|
||||
formatted_data = self._format_query_result(result, metric_name, product_id, start_dt, end_dt)
|
||||
formatted_data = self._format_query_result(
|
||||
starrocks_result=result,
|
||||
metric_name=metric_name,
|
||||
product_id=product_id,
|
||||
step=step,
|
||||
start_date=start_dt,
|
||||
end_date=end_dt
|
||||
)
|
||||
|
||||
await self.module_logger.log_info(
|
||||
f"Successfully queried metric '{metric_name}' with {len(formatted_data)} data points")
|
||||
@ -201,7 +217,7 @@ class StarRocksMetricsService:
|
||||
await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}")
|
||||
raise
|
||||
|
||||
def _format_query_result(self, starrocks_result: List[Dict[str, Any]], metric_name: str, product_id: str, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
|
||||
def _format_query_result(self, starrocks_result: List[Dict[str, Any]], metric_name: str, product_id: str, step: str, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Format StarRocks query result into the required format and fill missing dates with 0 values.
|
||||
|
||||
@ -219,25 +235,42 @@ class StarRocksMetricsService:
|
||||
result_dict = {}
|
||||
|
||||
for row in starrocks_result:
|
||||
# Format the date
|
||||
# Normalize the date according to step granularity
|
||||
date_value = row.get("date")
|
||||
if date_value:
|
||||
if isinstance(date_value, str):
|
||||
date_str = date_value
|
||||
else:
|
||||
# If it's a datetime object, format it as a string
|
||||
if hasattr(date_value, 'strftime'):
|
||||
# Convert to date first, then format consistently
|
||||
if hasattr(date_value, 'date'):
|
||||
date_obj = date_value.date() if hasattr(date_value, 'date') else date_value
|
||||
else:
|
||||
date_obj = date_value
|
||||
date_str = date_obj.strftime('%Y-%m-%d') + ' 00:00:00'
|
||||
else:
|
||||
date_str = str(date_value)
|
||||
else:
|
||||
if not date_value:
|
||||
continue
|
||||
|
||||
def month_start(d: datetime) -> datetime:
|
||||
return datetime(d.year, d.month, 1)
|
||||
|
||||
# Parse and normalize
|
||||
if isinstance(date_value, str):
|
||||
parsed_dt = None
|
||||
for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d'):
|
||||
try:
|
||||
parsed_dt = datetime.strptime(date_value, fmt)
|
||||
break
|
||||
except ValueError:
|
||||
continue
|
||||
if parsed_dt is None:
|
||||
date_str = str(date_value)
|
||||
else:
|
||||
if step == '1m':
|
||||
date_str = month_start(parsed_dt).strftime('%Y-%m-01 00:00:00')
|
||||
else:
|
||||
date_str = parsed_dt.strftime('%Y-%m-%d') + ' 00:00:00'
|
||||
else:
|
||||
if hasattr(date_value, 'strftime'):
|
||||
dt_obj = date_value
|
||||
if step == '1m':
|
||||
date_str = month_start(dt_obj).strftime('%Y-%m-01 00:00:00')
|
||||
else:
|
||||
if hasattr(dt_obj, 'date'):
|
||||
dt_obj = dt_obj.date()
|
||||
date_str = dt_obj.strftime('%Y-%m-%d') + ' 00:00:00'
|
||||
else:
|
||||
date_str = str(date_value)
|
||||
|
||||
# Get the value
|
||||
value = row.get("value", 0)
|
||||
if value is None:
|
||||
@ -256,32 +289,54 @@ class StarRocksMetricsService:
|
||||
"labels": labels
|
||||
}
|
||||
|
||||
# Generate complete date range and fill missing dates with 0
|
||||
# Generate complete range and fill missing points with 0
|
||||
formatted_data = []
|
||||
current_date = start_date.date()
|
||||
end_date_only = end_date.date()
|
||||
if step == '1d':
|
||||
current_dt = datetime(start_date.year, start_date.month, start_date.day)
|
||||
end_dt_exclusive = datetime(end_date.year, end_date.month, end_date.day)
|
||||
while current_dt < end_dt_exclusive:
|
||||
date_str = current_dt.strftime('%Y-%m-%d') + ' 00:00:00'
|
||||
if date_str in result_dict:
|
||||
formatted_data.append(result_dict[date_str])
|
||||
else:
|
||||
labels = {
|
||||
"product_id": product_id,
|
||||
"metric_type": metric_name
|
||||
}
|
||||
formatted_data.append({
|
||||
"date": date_str,
|
||||
"value": 0,
|
||||
"metric": metric_name,
|
||||
"labels": labels
|
||||
})
|
||||
current_dt += timedelta(days=1)
|
||||
elif step == '1m':
|
||||
def month_start(d: datetime) -> datetime:
|
||||
return datetime(d.year, d.month, 1)
|
||||
|
||||
while current_date < end_date_only:
|
||||
date_str = current_date.strftime('%Y-%m-%d') + ' 00:00:00'
|
||||
def add_one_month(d: datetime) -> datetime:
|
||||
year = d.year + (1 if d.month == 12 else 0)
|
||||
month = 1 if d.month == 12 else d.month + 1
|
||||
return datetime(year, month, 1)
|
||||
|
||||
if date_str in result_dict:
|
||||
# Use existing data
|
||||
formatted_data.append(result_dict[date_str])
|
||||
else:
|
||||
# Fill missing date with 0 value
|
||||
labels = {
|
||||
"product_id": product_id,
|
||||
"metric_type": metric_name
|
||||
}
|
||||
|
||||
formatted_data.append({
|
||||
"date": date_str,
|
||||
"value": 0,
|
||||
"metric": metric_name,
|
||||
"labels": labels
|
||||
})
|
||||
|
||||
current_date += timedelta(days=1)
|
||||
current_dt = month_start(start_date)
|
||||
end_month_exclusive = month_start(end_date)
|
||||
while current_dt < end_month_exclusive:
|
||||
date_str = current_dt.strftime('%Y-%m-01 00:00:00')
|
||||
if date_str in result_dict:
|
||||
formatted_data.append(result_dict[date_str])
|
||||
else:
|
||||
labels = {
|
||||
"product_id": product_id,
|
||||
"metric_type": metric_name
|
||||
}
|
||||
formatted_data.append({
|
||||
"date": date_str,
|
||||
"value": 0,
|
||||
"metric": metric_name,
|
||||
"labels": labels
|
||||
})
|
||||
current_dt = add_one_month(current_dt)
|
||||
|
||||
return formatted_data
|
||||
|
||||
|
||||
@ -26,6 +26,7 @@ class MetricQueryRequest(BaseModel):
|
||||
"""Request model for metric query."""
|
||||
product_id: str = Field(..., description="Product ID to identify which product's data to query")
|
||||
metric_name: str = Field(..., description="Name of the metric to query")
|
||||
step: str = Field(..., description="Aggregation step, e.g., 1d or 1m")
|
||||
start_date: str = Field(..., description="Start date in YYYY-MM-DD HH:MM:SS format")
|
||||
end_date: str = Field(..., description="End date in YYYY-MM-DD HH:MM:SS format")
|
||||
|
||||
@ -53,6 +54,7 @@ async def metrics_query(
|
||||
data_points = await starrocks_service.query_metric_by_time_range(
|
||||
product_id=request.product_id,
|
||||
metric_name=request.metric_name,
|
||||
step=request.step,
|
||||
start_date=request.start_date,
|
||||
end_date=request.end_date
|
||||
)
|
||||
|
||||
@ -16,6 +16,10 @@ class AppSettings(BaseSettings):
|
||||
RABBITMQ_PASSWORD: str = ""
|
||||
RABBITMQ_VIRTUAL_HOST: str = ""
|
||||
|
||||
MONGODB_URI: str = ""
|
||||
MONGODB_NAME: str = ""
|
||||
TENANT_CACHE_MAX: int = 64
|
||||
|
||||
SYSTEM_USER_ID: str = ""
|
||||
SMS_FROM: str = ""
|
||||
EMAIL_FROM: str = ""
|
||||
|
||||
@ -28,12 +28,12 @@ export RABBITMQ_HOST=localhost
|
||||
export RABBITMQ_PORT=5672
|
||||
|
||||
export AUTH_SERVICE_ENDPOINT=http://localhost:9000/api/v1/keys/
|
||||
export AUTH_SERVICE_PORT=9000
|
||||
export TENANT_CACHE_MAX=64
|
||||
|
||||
# for local environment
|
||||
export MONGODB_URI=mongodb://localhost:27017/
|
||||
# connectivity from local to alpha
|
||||
#export MONGODB_URI=mongodb+srv://jetli:8IHKx6dZK8BfugGp@freeleaps2.hanbj.mongodb.net/
|
||||
export MONGODB_NAME=interview
|
||||
export MONGODB_NAME=freeleaps2
|
||||
export FREELEAPS_ENV=local
|
||||
export LOG_BASE_PATH=./log
|
||||
130
apps/notification/webapi/middleware/README.md
Normal file
130
apps/notification/webapi/middleware/README.md
Normal file
@ -0,0 +1,130 @@
|
||||
# Notification Service Middleware Guide
|
||||
|
||||
This guide explains how to use and test the middleware system of the notification service.
|
||||
|
||||
## Middleware Architecture
|
||||
|
||||
### Middleware Execution Order
|
||||
```
|
||||
Client Request → FreeleapsAuthMiddleware → TenantDBConnectionMiddleware → Business Logic
|
||||
```
|
||||
|
||||
1. **FreeleapsAuthMiddleware**: API Key validation and path skipping
|
||||
2. **TenantDBConnectionMiddleware**: Tenant database switching (based on product_id)
|
||||
|
||||
## 1. Setup API Key
|
||||
|
||||
### 1.1 Register API Key via freeleaps-auth service
|
||||
|
||||
Ensure the freeleaps-auth service is running on port 9000, then execute the following commands:
|
||||
|
||||
```bash
|
||||
# Register API KEY for magicleaps tenant
|
||||
curl -X POST "http://localhost:9000/api/v1/keys/register_api_key" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{
|
||||
"tenant_name": "magicleaps",
|
||||
"product_id": "68a3f19119cfaf36316f6d14",
|
||||
"scopes": ["notify.send_notification"],
|
||||
"expires_at": "2099-12-31T23:59:59Z"
|
||||
}'
|
||||
|
||||
# Register API KEY for test-a tenant
|
||||
curl -X POST "http://localhost:9000/api/v1/keys/register_api_key" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{
|
||||
"tenant_name": "test-a",
|
||||
"product_id": "68a3f19119cfaf36316f6d15",
|
||||
"scopes": ["notify.send_notification"],
|
||||
"expires_at": "2099-12-31T23:59:59Z"
|
||||
}'
|
||||
|
||||
# Register API KEY for test-b tenant
|
||||
curl -X POST "http://localhost:9000/api/v1/keys/register_api_key" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{
|
||||
"tenant_name": "test-b",
|
||||
"product_id": "68a3f19119cfaf36316f6d16",
|
||||
"scopes": ["notify.send_notification"],
|
||||
"expires_at": "2099-12-31T23:59:59Z"
|
||||
}'
|
||||
```
|
||||
|
||||
### 1.2 Record the returned API KEY
|
||||
|
||||
Example successful response:
|
||||
```json
|
||||
{
|
||||
"success": true,
|
||||
"data": {
|
||||
"tenant_name": "magicleaps",
|
||||
"product_id": "68a3f19119cfaf36316f6d14",
|
||||
"api_key": {
|
||||
"key_id": "ak_live_UkIcMxwBXIw",
|
||||
"api_key": "ak_live_UkIcMxwBXIw.J7qWirjL0IJkmvqktjEh3ViveP8dgiturxyy0KJ5sKk",
|
||||
"status": "active"
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## 2. Configure Tenant Database
|
||||
|
||||
Create tenant documents in the `tenant_doc` collection of the main database:
|
||||
|
||||
```json
|
||||
[
|
||||
{
|
||||
"tenant_name": "magicleaps",
|
||||
"product_id": "68a3f19119cfaf36316f6d14",
|
||||
"mongodb_uri": "mongodb://localhost:27017/interview",
|
||||
"status": "active"
|
||||
},
|
||||
{
|
||||
"tenant_name": "test-a",
|
||||
"product_id": "68a3f19119cfaf36316f6d15",
|
||||
"mongodb_uri": "mongodb://localhost:27017/test-a",
|
||||
"status": "active"
|
||||
},
|
||||
{
|
||||
"tenant_name": "test-b",
|
||||
"product_id": "68a3f19119cfaf36316f6d16",
|
||||
"mongodb_uri": "mongodb://localhost:27017/test-b",
|
||||
"status": "active"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
## 3. Test Middleware Functionality
|
||||
|
||||
### 3.1 Test System Endpoints (Skip Validation)
|
||||
|
||||
```bash
|
||||
# Health check endpoints - should return 200, skip all validation
|
||||
curl -v "http://localhost:8104/api/_/healthz"
|
||||
curl -v "http://localhost:8104/api/_/readyz"
|
||||
curl -v "http://localhost:8104/api/_/livez"
|
||||
|
||||
# Documentation and monitoring endpoints - should return 200, skip all validation
|
||||
curl -v "http://localhost:8104/docs"
|
||||
curl -v "http://localhost:8104/metrics"
|
||||
```
|
||||
|
||||
### 3.2 Test API Key Validation
|
||||
|
||||
```bash
|
||||
# No API Key - should return 200 (compatibility mode)
|
||||
curl -v "http://localhost:8104/api/notification/global_templates/list?region=1"
|
||||
|
||||
# Invalid API Key - should return 400/401
|
||||
curl -v "http://localhost:8104/api/notification/global_templates/list?region=1" \
|
||||
-H "X-API-KEY: invalid_key"
|
||||
|
||||
# Valid API Key - should return 200 and switch to tenant database
|
||||
curl -v "http://localhost:8104/api/notification/global_templates/list?region=1" \
|
||||
-H "X-API-KEY: ak_live_UkIcMxwBXIw.J7qWirjL0IJkmvqktjEh3ViveP8dgiturxyy0KJ5sKk"
|
||||
```
|
||||
|
||||
### 3.3 Check Log Output
|
||||
|
||||
View logs in `/apps/notification/log/notification-activity.log`:
|
||||
@ -1,3 +1,4 @@
|
||||
from .freeleaps_auth_middleware import FreeleapsAuthMiddleware
|
||||
from .tenant_DBConnection_middleware import TenantDBConnectionMiddleware
|
||||
|
||||
__all__ = ['FreeleapsAuthMiddleware']
|
||||
__all__ = ['FreeleapsAuthMiddleware', 'TenantDBConnectionMiddleware']
|
||||
@ -0,0 +1,78 @@
|
||||
from fastapi import Request, status
|
||||
from fastapi.responses import JSONResponse
|
||||
from webapi.middleware.freeleaps_auth_middleware import request_context_var
|
||||
from common.log.module_logger import ModuleLogger
|
||||
|
||||
|
||||
class TenantDBConnectionMiddleware:
|
||||
def __init__(self, app):
|
||||
self.app = app
|
||||
self.module_logger = ModuleLogger(sender_id=TenantDBConnectionMiddleware)
|
||||
|
||||
async def __call__(self, scope, receive, send):
|
||||
if scope["type"] != "http":
|
||||
return await self.app(scope, receive, send)
|
||||
|
||||
request = Request(scope, receive)
|
||||
|
||||
# Get tenant id from auth context (set by FreeleapsAuthMiddleware)
|
||||
product_id = None
|
||||
try:
|
||||
ctx = request_context_var.get()
|
||||
product_id = getattr(ctx, "product_id", None)
|
||||
await self.module_logger.log_info(f"Retrieved product_id from auth context: {product_id}")
|
||||
except Exception as e:
|
||||
await self.module_logger.log_error(f"Failed to get auth context: {str(e)}")
|
||||
product_id = None
|
||||
|
||||
# Get tenant cache and main database from app state
|
||||
try:
|
||||
tenant_cache = request.app.state.tenant_cache
|
||||
main_db = request.app.state.main_db
|
||||
await self.module_logger.log_info(f"Retrieved app state - tenant_cache: {'success' if tenant_cache is not None else 'fail'}, main_db: {'success' if main_db is not None else 'fail'}")
|
||||
except Exception as e:
|
||||
await self.module_logger.log_error(f"Failed to get app state: {str(e)}")
|
||||
response = JSONResponse(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
content={"detail": "Database not properly initialized"}
|
||||
)
|
||||
return await response(scope, receive, send)
|
||||
|
||||
if not product_id:
|
||||
# Compatibility / public routes: use main database with tenant models initialized
|
||||
await self.module_logger.log_info(f"No product_id - using main database for path: {request.url.path}")
|
||||
|
||||
# Get main database with Beanie initialized for tenant models
|
||||
main_db_initialized = await tenant_cache.get_main_db_initialized()
|
||||
|
||||
request.state.db = main_db_initialized
|
||||
request.state.product_id = None
|
||||
await self.module_logger.log_info(f"Successfully initialized main database with tenant models")
|
||||
return await self.app(scope, receive, send)
|
||||
|
||||
try:
|
||||
# Get tenant-specific database with Beanie already initialized (cached)
|
||||
await self.module_logger.log_info(f"Attempting to get tenant database for product_id: {product_id}")
|
||||
tenant_db = await tenant_cache.get_initialized_db(product_id)
|
||||
|
||||
request.state.db = tenant_db
|
||||
request.state.product_id = product_id
|
||||
await self.module_logger.log_info(f"Successfully retrieved cached tenant database with Beanie for product_id: {product_id}")
|
||||
return await self.app(scope, receive, send)
|
||||
|
||||
except ValueError as e:
|
||||
# Handle tenant not found or inactive (ValueError from TenantDBCache)
|
||||
await self.module_logger.log_error(f"Tenant error for {product_id}: {str(e)}")
|
||||
response = JSONResponse(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
content={"detail": str(e)}
|
||||
)
|
||||
return await response(scope, receive, send)
|
||||
|
||||
except Exception as e:
|
||||
await self.module_logger.log_error(f"Database error for tenant {product_id}: {str(e)}")
|
||||
response = JSONResponse(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
content={"detail": "Database connection error"}
|
||||
)
|
||||
return await response(scope, receive, send)
|
||||
@ -1,23 +1,21 @@
|
||||
from webapi.config.site_settings import site_settings
|
||||
from beanie import init_beanie
|
||||
from motor.motor_asyncio import AsyncIOMotorClient
|
||||
from fastapi import HTTPException
|
||||
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
|
||||
from backend.models.models import MessageTemplateDoc, EmailSenderDoc, EmailSendStatusDoc, EmailTrackingDoc, EmailBounceDoc, UsageLogDoc
|
||||
from common.config.app_settings import app_settings
|
||||
from common.log.module_logger import ModuleLogger
|
||||
import asyncio
|
||||
from collections import OrderedDict
|
||||
from typing import Optional, Union
|
||||
import os
|
||||
|
||||
# MongoDB config
|
||||
MONGODB_URI = os.getenv('MONGODB_URI')
|
||||
MONGODB_NAME = os.getenv('MONGODB_NAME')
|
||||
|
||||
# create MongoDB client
|
||||
client = AsyncIOMotorClient(
|
||||
MONGODB_URI,
|
||||
serverSelectionTimeoutMS=60000,
|
||||
minPoolSize=5,
|
||||
maxPoolSize=20,
|
||||
heartbeatFrequencyMS=20000,
|
||||
)
|
||||
# Global variables for database management
|
||||
MAIN_CLIENT: Optional[AsyncIOMotorClient] = None
|
||||
TENANT_CACHE: Optional['TenantDBCache'] = None
|
||||
|
||||
# define all document models
|
||||
# Define document models
|
||||
document_models = [
|
||||
MessageTemplateDoc,
|
||||
EmailSenderDoc,
|
||||
@ -26,19 +24,200 @@ document_models = [
|
||||
EmailBounceDoc,
|
||||
UsageLogDoc
|
||||
]
|
||||
tenant_document_models = [
|
||||
MessageTemplateDoc,
|
||||
EmailSenderDoc,
|
||||
EmailSendStatusDoc
|
||||
]
|
||||
|
||||
class TenantDBCache:
|
||||
"""
|
||||
Enhanced tenant database cache that includes Beanie initialization.
|
||||
product_id -> (AsyncIOMotorClient, AsyncIOMotorDatabase, beanie_initialized: bool)
|
||||
Uses main_db.tenant_doc to resolve mongodb_uri; caches clients/dbs with LRU and Beanie state.
|
||||
"""
|
||||
|
||||
def __init__(self, main_db: AsyncIOMotorDatabase, max_size: int = 64):
|
||||
self.main_db = main_db
|
||||
self.max_size = max_size
|
||||
self._cache: "OrderedDict[str, tuple[AsyncIOMotorClient, AsyncIOMotorDatabase, bool]]" = OrderedDict()
|
||||
self._locks: dict[str, asyncio.Lock] = {}
|
||||
self._global_lock = asyncio.Lock()
|
||||
self.module_logger = ModuleLogger(sender_id="TenantDBCache")
|
||||
|
||||
async def get_initialized_db(self, product_id: str) -> AsyncIOMotorDatabase:
|
||||
"""Get tenant database with Beanie already initialized"""
|
||||
|
||||
# fast-path
|
||||
cached = self._cache.get(product_id)
|
||||
if cached:
|
||||
client, db, beanie_initialized = cached
|
||||
await self.module_logger.log_info(f"Found cached database for {product_id}, beanie_initialized: {beanie_initialized}")
|
||||
self._cache.move_to_end(product_id)
|
||||
|
||||
if beanie_initialized:
|
||||
return db
|
||||
else:
|
||||
# Initialize Beanie if not done yet
|
||||
await init_beanie(database=db, document_models=tenant_document_models)
|
||||
await self.module_logger.log_info(f"Beanie initialization completed for {product_id}")
|
||||
|
||||
# Update cache with beanie_initialized = True
|
||||
self._cache[product_id] = (client, db, True)
|
||||
return db
|
||||
|
||||
# double-checked under per-tenant lock
|
||||
lock = self._locks.setdefault(product_id, asyncio.Lock())
|
||||
async with lock:
|
||||
cached = self._cache.get(product_id)
|
||||
if cached:
|
||||
client, db, beanie_initialized = cached
|
||||
await self.module_logger.log_info(f"Double-check found cached database for {product_id}, beanie_initialized: {beanie_initialized}")
|
||||
self._cache.move_to_end(product_id)
|
||||
|
||||
if beanie_initialized:
|
||||
return db
|
||||
else:
|
||||
# Initialize Beanie if not done yet
|
||||
await init_beanie(database=db, document_models=tenant_document_models)
|
||||
await self.module_logger.log_info(f"Beanie initialization completed for {product_id} (double-check)")
|
||||
|
||||
# Update cache with beanie_initialized = True
|
||||
self._cache[product_id] = (client, db, True)
|
||||
return db
|
||||
|
||||
# Create new tenant connection - use raw MongoDB query since we don't have TenantDoc model
|
||||
"""
|
||||
tenant_doc content:
|
||||
{
|
||||
"tenant_name": "magicleaps",
|
||||
"product_id": "68a3f19119cfaf36316f6d14",
|
||||
"mongodb_uri": "mongodb://localhost:27017/interview",
|
||||
"status": "active"
|
||||
}
|
||||
"""
|
||||
tenant = await self.main_db["tenant_doc"].find_one({"product_id": product_id})
|
||||
if not tenant:
|
||||
await self.module_logger.log_error(f"Tenant {product_id} does not exist in main database")
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail=f"Tenant {product_id} does not exist",
|
||||
headers={"X-Error-Message": f"Tenant {product_id} does not exist"}
|
||||
)
|
||||
if tenant.get("status") != "active":
|
||||
await self.module_logger.log_error(f"Tenant {product_id} is not active, status: {tenant.get('status')}")
|
||||
raise HTTPException(
|
||||
status_code=403,
|
||||
detail=f"Tenant {product_id} is not active",
|
||||
headers={"X-Error-Message": f"Tenant {product_id} is not active, status: {tenant.get('status')}"}
|
||||
)
|
||||
|
||||
uri = tenant["mongodb_uri"]
|
||||
client = AsyncIOMotorClient(uri, minPoolSize=3, maxPoolSize=20, serverSelectionTimeoutMS=10000)
|
||||
|
||||
# robust db name resolution (get_default_database handles mongodb+srv and empty paths)
|
||||
default_db = client.get_default_database()
|
||||
if default_db is not None:
|
||||
db = default_db
|
||||
await self.module_logger.log_info(f"Using default database for tenant {product_id}: {db.name}")
|
||||
else:
|
||||
await self.module_logger.log_error(f"No default database found for tenant {product_id}")
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"No default database found for tenant {product_id}",
|
||||
headers={"X-Error-Message": f"No default database found for tenant {product_id}"}
|
||||
)
|
||||
# Initialize Beanie for this tenant database
|
||||
await init_beanie(database=db, document_models=tenant_document_models)
|
||||
await self.module_logger.log_info(f"Beanie initialization completed for new tenant database {product_id}")
|
||||
|
||||
# LRU put with beanie_initialized = True
|
||||
await self._lru_put(product_id, (client, db, True))
|
||||
await self.module_logger.log_info(f"Tenant database {product_id} cached successfully with Beanie initialized")
|
||||
return db
|
||||
|
||||
async def get_main_db_initialized(self) -> AsyncIOMotorDatabase:
|
||||
"""Get main database with Beanie initialized for tenant models"""
|
||||
# Re-initialize Beanie for main database with business models
|
||||
await init_beanie(database=self.main_db, document_models=document_models)
|
||||
await self.module_logger.log_info("Beanie initialization completed for main database")
|
||||
return self.main_db
|
||||
|
||||
async def _lru_put(self, key: str, value: tuple[AsyncIOMotorClient, AsyncIOMotorDatabase, bool]):
|
||||
async with self._global_lock:
|
||||
self._cache[key] = value
|
||||
self._cache.move_to_end(key)
|
||||
if len(self._cache) > self.max_size:
|
||||
old_key, (old_client, _, _) = self._cache.popitem(last=False)
|
||||
await self.module_logger.log_info(f"Cache full, removing LRU tenant: {old_key}")
|
||||
try:
|
||||
old_client.close()
|
||||
await self.module_logger.log_info(f"Closed connection for evicted tenant: {old_key}")
|
||||
except Exception as e:
|
||||
await self.module_logger.log_error(f"Error closing connection for {old_key}: {str(e)}")
|
||||
self._locks.pop(old_key, None)
|
||||
|
||||
async def aclose(self):
|
||||
async with self._global_lock:
|
||||
for key, (client, _, _) in self._cache.items():
|
||||
try:
|
||||
client.close()
|
||||
await self.module_logger.log_info(f"Closed connection for tenant: {key}")
|
||||
except Exception as e:
|
||||
await self.module_logger.log_error(f"Error closing connection for {key}: {str(e)}")
|
||||
self._cache.clear()
|
||||
self._locks.clear()
|
||||
await self.module_logger.log_info("Tenant cache cleared successfully")
|
||||
|
||||
|
||||
def register(app):
|
||||
"""Register database-related configurations and setup"""
|
||||
app.debug = site_settings.DEBUG
|
||||
app.title = site_settings.NAME
|
||||
|
||||
@app.on_event("startup")
|
||||
async def start_database():
|
||||
await initiate_database()
|
||||
await initiate_database(app)
|
||||
|
||||
@app.on_event("shutdown")
|
||||
async def shutdown_database():
|
||||
await cleanup_database()
|
||||
|
||||
|
||||
async def initiate_database():
|
||||
"""initiate Beanie database connection"""
|
||||
await init_beanie(
|
||||
database=client[MONGODB_NAME],
|
||||
document_models=document_models
|
||||
)
|
||||
async def initiate_database(app):
|
||||
"""Initialize main database and tenant cache"""
|
||||
global MAIN_CLIENT, TENANT_CACHE
|
||||
|
||||
module_logger = ModuleLogger(sender_id="DatabaseInit")
|
||||
|
||||
# 1) Create main/catalog client + DB
|
||||
MAIN_CLIENT = AsyncIOMotorClient(app_settings.MONGODB_URI)
|
||||
main_db = MAIN_CLIENT[app_settings.MONGODB_NAME]
|
||||
|
||||
# 2) Initialize Beanie for main DB with business document models
|
||||
await init_beanie(database=main_db, document_models=document_models)
|
||||
|
||||
# 3) Create tenant cache that uses main_db lookups to resolve product_id -> tenant db
|
||||
max_cache_size = getattr(app_settings, 'TENANT_CACHE_MAX', 64)
|
||||
TENANT_CACHE = TenantDBCache(main_db, max_size=max_cache_size)
|
||||
|
||||
# 4) Store on app state for middleware to access
|
||||
app.state.main_db = main_db
|
||||
app.state.tenant_cache = TENANT_CACHE
|
||||
|
||||
await module_logger.log_info("Database and tenant cache initialized successfully")
|
||||
|
||||
|
||||
async def cleanup_database():
|
||||
"""Cleanup database connections and cache"""
|
||||
global MAIN_CLIENT, TENANT_CACHE
|
||||
|
||||
module_logger = ModuleLogger(sender_id="DatabaseCleanup")
|
||||
|
||||
if TENANT_CACHE:
|
||||
await TENANT_CACHE.aclose()
|
||||
|
||||
if MAIN_CLIENT:
|
||||
MAIN_CLIENT.close()
|
||||
|
||||
await module_logger.log_info("Database connections closed successfully")
|
||||
@ -1,9 +1,11 @@
|
||||
from webapi.middleware.freeleaps_auth_middleware import FreeleapsAuthMiddleware
|
||||
from webapi.middleware.tenant_DBConnection_middleware import TenantDBConnectionMiddleware
|
||||
|
||||
|
||||
def register(app):
|
||||
"""
|
||||
Register middleware to FastAPI application
|
||||
"""
|
||||
# Register API Key middleware
|
||||
# Register middlewares
|
||||
app.add_middleware(TenantDBConnectionMiddleware)
|
||||
app.add_middleware(FreeleapsAuthMiddleware)
|
||||
Loading…
Reference in New Issue
Block a user