Merge pull request 'merge dev to master' (#84) from dev into master
Reviewed-on: freeleaps/freeleaps-service-hub#84
This commit is contained in:
commit
10cc4940f7
@ -22,3 +22,4 @@ export MONGODB_URI=mongodb://localhost:27017/
|
|||||||
export FREELEAPS_ENV=local
|
export FREELEAPS_ENV=local
|
||||||
export LOG_BASE_PATH=${CODEBASE_ROOT}/log
|
export LOG_BASE_PATH=${CODEBASE_ROOT}/log
|
||||||
|
|
||||||
|
export AUTH_SERVICE_ENDPOINT=http://localhost:9000/api/v1/
|
||||||
@ -17,6 +17,9 @@ ENV MONGODB_NAME=freeleaps2
|
|||||||
ENV MONGODB_PORT=27017
|
ENV MONGODB_PORT=27017
|
||||||
ENV MONGODB_URI="mongodb://localhost:27017/"
|
ENV MONGODB_URI="mongodb://localhost:27017/"
|
||||||
|
|
||||||
|
# Freeleaps Auth Config
|
||||||
|
ENV AUTH_SERVICE_ENDPOINT=""
|
||||||
|
|
||||||
#log_settings
|
#log_settings
|
||||||
ENV LOG_BASE_PATH=$CONTAINER_APP_ROOT/log/$APP_NAME
|
ENV LOG_BASE_PATH=$CONTAINER_APP_ROOT/log/$APP_NAME
|
||||||
ENV BACKEND_LOG_FILE_NAME=$APP_NAME
|
ENV BACKEND_LOG_FILE_NAME=$APP_NAME
|
||||||
|
|||||||
@ -0,0 +1,52 @@
|
|||||||
|
from typing import Dict, Any
|
||||||
|
import httpx
|
||||||
|
from fastapi import HTTPException
|
||||||
|
from common.config.app_settings import app_settings
|
||||||
|
from common.log.log_utils import log_entry_exit_async
|
||||||
|
from common.log.module_logger import ModuleLogger
|
||||||
|
|
||||||
|
|
||||||
|
class ApiKeyIntrospectHandler:
|
||||||
|
"""
|
||||||
|
Freeleaps Auth Service API Key Introspect Handle
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.module_logger = ModuleLogger(sender_id=ApiKeyIntrospectHandler.__name__)
|
||||||
|
self.auth_service_base = app_settings.AUTH_SERVICE_ENDPOINT
|
||||||
|
|
||||||
|
|
||||||
|
@log_entry_exit_async
|
||||||
|
async def api_key_introspect(self, api_key: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Introspect API key by calling external auth service
|
||||||
|
|
||||||
|
Args:
|
||||||
|
api_key: The API key to introspect
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary containing the API key details
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
HTTPException: If the external service call fails
|
||||||
|
"""
|
||||||
|
api_url = self.auth_service_base + "keys/introspect_api_key"
|
||||||
|
await self.module_logger.log_info(f"Starting API Key validation for key")
|
||||||
|
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
response = await client.post(
|
||||||
|
api_url,
|
||||||
|
json={"api_key": api_key}
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code != 200:
|
||||||
|
error_detail = response.json() if response.content else {"error": "Unknown error"}
|
||||||
|
await self.module_logger.log_error(f"API Key validation failed - Status: {response.status_code}, Error: {error_detail}")
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=response.status_code,
|
||||||
|
detail=error_detail
|
||||||
|
)
|
||||||
|
|
||||||
|
validation_result = response.json()
|
||||||
|
await self.module_logger.log_info(f"API Key validation successful - Active: {validation_result.get('active', False)}")
|
||||||
|
return validation_result
|
||||||
@ -4,6 +4,7 @@ from .models import (
|
|||||||
UserEmailDoc,
|
UserEmailDoc,
|
||||||
UserMobileDoc,
|
UserMobileDoc,
|
||||||
AuthCodeDoc,
|
AuthCodeDoc,
|
||||||
|
UsageLogDoc,
|
||||||
)
|
)
|
||||||
|
|
||||||
user_models = [
|
user_models = [
|
||||||
@ -12,4 +13,5 @@ user_models = [
|
|||||||
UserEmailDoc,
|
UserEmailDoc,
|
||||||
UserMobileDoc,
|
UserMobileDoc,
|
||||||
AuthCodeDoc,
|
AuthCodeDoc,
|
||||||
|
UsageLogDoc,
|
||||||
]
|
]
|
||||||
|
|||||||
@ -57,3 +57,23 @@ class AuthCodeDoc(Document):
|
|||||||
|
|
||||||
class Settings:
|
class Settings:
|
||||||
name = "user_auth_code"
|
name = "user_auth_code"
|
||||||
|
|
||||||
|
class UsageLogDoc(Document):
|
||||||
|
timestamp: datetime = datetime.utcnow() # timestamp
|
||||||
|
tenant_id: str # tenant id
|
||||||
|
operation: str # operation type
|
||||||
|
request_id: str # request id # TODO: use true one
|
||||||
|
status: str # operation status
|
||||||
|
latency_ms: int # latency time(milliseconds)
|
||||||
|
bytes_in: int # input bytes
|
||||||
|
bytes_out: int # output bytes
|
||||||
|
key_id: Optional[str] = None # API Key ID
|
||||||
|
extra: dict = {} # extra information
|
||||||
|
|
||||||
|
class Settings:
|
||||||
|
name = "usage_log_doc"
|
||||||
|
indexes = [
|
||||||
|
"tenant_id",
|
||||||
|
"request_id",
|
||||||
|
"key_id"
|
||||||
|
]
|
||||||
@ -19,8 +19,11 @@ class AppSettings(BaseSettings):
|
|||||||
DEVSVC_WEBAPI_URL_BASE: str = "http://localhost:8007/api/devsvc/"
|
DEVSVC_WEBAPI_URL_BASE: str = "http://localhost:8007/api/devsvc/"
|
||||||
NOTIFICATION_WEBAPI_URL_BASE: str = "http://localhost:8003/api/notification/"
|
NOTIFICATION_WEBAPI_URL_BASE: str = "http://localhost:8003/api/notification/"
|
||||||
|
|
||||||
|
AUTH_SERVICE_ENDPOINT: str = ""
|
||||||
|
|
||||||
MONGODB_URI: str = ""
|
MONGODB_URI: str = ""
|
||||||
MONGODB_NAME: str = ""
|
MONGODB_NAME: str = ""
|
||||||
|
TENANT_CACHE_MAX: int = 64
|
||||||
SYSTEM_USER_ID: str = "117f191e810c19729de860aa"
|
SYSTEM_USER_ID: str = "117f191e810c19729de860aa"
|
||||||
|
|
||||||
LOG_BASE_PATH: str = "./log"
|
LOG_BASE_PATH: str = "./log"
|
||||||
|
|||||||
@ -81,5 +81,5 @@ class JsonSink:
|
|||||||
exc_type, exc_value, exc_tb = record["exception"]
|
exc_type, exc_value, exc_tb = record["exception"]
|
||||||
log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb)
|
log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb)
|
||||||
|
|
||||||
self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
|
self.log_file.write(json.dumps(log_entry, ensure_ascii=False, default=str) + "\n")
|
||||||
self.log_file.flush()
|
self.log_file.flush()
|
||||||
|
|||||||
@ -1,12 +1,15 @@
|
|||||||
APP_NAME=authentication
|
APP_NAME=authentication
|
||||||
export SERVICE_API_ACCESS_HOST=0.0.0.0
|
export SERVICE_API_ACCESS_HOST=0.0.0.0
|
||||||
export SERVICE_API_ACCESS_PORT=8004
|
export SERVICE_API_ACCESS_PORT=8004
|
||||||
|
export AUTH_SERVICE_ENDPOINT=http://localhost:9000/api/v1/
|
||||||
|
export AUTH_SERVICE_PORT=9000
|
||||||
export CONTAINER_APP_ROOT=/app
|
export CONTAINER_APP_ROOT=/app
|
||||||
export LOG_BASE_PATH=$CONTAINER_APP_ROOT/log/$APP_NAME
|
export LOG_BASE_PATH=$CONTAINER_APP_ROOT/log/$APP_NAME
|
||||||
export BACKEND_LOG_FILE_NAME=$APP_NAME
|
export BACKEND_LOG_FILE_NAME=$APP_NAME
|
||||||
export APPLICATION_ACTIVITY_LOG=$APP_NAME-activity
|
export APPLICATION_ACTIVITY_LOG=$APP_NAME-activity
|
||||||
export MONGODB_NAME=freeleaps2
|
export MONGODB_NAME=freeleaps2
|
||||||
export MONGODB_PORT=27017
|
export MONGODB_PORT=27017
|
||||||
|
export TENANT_CACHE_MAX=64
|
||||||
export JWT_SECRET_KEY=ea84edf152976b2fcec12b78aa8e45bc26a5cf0ef61bf16f5c317ae33b3fd8b0
|
export JWT_SECRET_KEY=ea84edf152976b2fcec12b78aa8e45bc26a5cf0ef61bf16f5c317ae33b3fd8b0
|
||||||
GIT_REPO_ROOT=/mnt/freeleaps/freeleaps-service-hub
|
GIT_REPO_ROOT=/mnt/freeleaps/freeleaps-service-hub
|
||||||
CODEBASE_ROOT=/mnt/freeleaps/freeleaps-service-hub/apps/authentication
|
CODEBASE_ROOT=/mnt/freeleaps/freeleaps-service-hub/apps/authentication
|
||||||
|
|||||||
@ -11,6 +11,7 @@ from webapi.providers import metrics
|
|||||||
|
|
||||||
# from webapi.providers import scheduler
|
# from webapi.providers import scheduler
|
||||||
from webapi.providers import exception_handler
|
from webapi.providers import exception_handler
|
||||||
|
from webapi.providers import middleware
|
||||||
from .freeleaps_app import FreeleapsApp
|
from .freeleaps_app import FreeleapsApp
|
||||||
from common.config.app_settings import app_settings
|
from common.config.app_settings import app_settings
|
||||||
|
|
||||||
@ -20,6 +21,10 @@ def create_app() -> FastAPI:
|
|||||||
app = FreeleapsApp()
|
app = FreeleapsApp()
|
||||||
|
|
||||||
register_logger()
|
register_logger()
|
||||||
|
# 1. Register middleware firstly
|
||||||
|
register(app, middleware)
|
||||||
|
|
||||||
|
# 2. Register other providers
|
||||||
register(app, exception_handler)
|
register(app, exception_handler)
|
||||||
register(app, database)
|
register(app, database)
|
||||||
register(app, router)
|
register(app, router)
|
||||||
|
|||||||
4
apps/authentication/webapi/middleware/__init__.py
Normal file
4
apps/authentication/webapi/middleware/__init__.py
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
from .freeleaps_auth_middleware import FreeleapsAuthMiddleware
|
||||||
|
from .database_middleware import DatabaseMiddleware
|
||||||
|
|
||||||
|
__all__ = ['FreeleapsAuthMiddleware', 'DatabaseMiddleware']
|
||||||
78
apps/authentication/webapi/middleware/database_middleware.py
Normal file
78
apps/authentication/webapi/middleware/database_middleware.py
Normal file
@ -0,0 +1,78 @@
|
|||||||
|
from fastapi import Request, status
|
||||||
|
from fastapi.responses import JSONResponse
|
||||||
|
from webapi.middleware.freeleaps_auth_middleware import request_context_var
|
||||||
|
from common.log.module_logger import ModuleLogger
|
||||||
|
|
||||||
|
|
||||||
|
class DatabaseMiddleware:
|
||||||
|
def __init__(self, app):
|
||||||
|
self.app = app
|
||||||
|
self.module_logger = ModuleLogger(sender_id=DatabaseMiddleware)
|
||||||
|
|
||||||
|
async def __call__(self, scope, receive, send):
|
||||||
|
if scope["type"] != "http":
|
||||||
|
return await self.app(scope, receive, send)
|
||||||
|
|
||||||
|
request = Request(scope, receive)
|
||||||
|
|
||||||
|
# Get tenant id from auth context (set by FreeleapsAuthMiddleware)
|
||||||
|
product_id = None
|
||||||
|
try:
|
||||||
|
ctx = request_context_var.get()
|
||||||
|
product_id = getattr(ctx, "product_id", None)
|
||||||
|
await self.module_logger.log_info(f"Retrieved product_id from auth context: {product_id}")
|
||||||
|
except Exception as e:
|
||||||
|
await self.module_logger.log_error(f"Failed to get auth context: {str(e)}")
|
||||||
|
product_id = None
|
||||||
|
|
||||||
|
# Get tenant cache and main database from app state
|
||||||
|
try:
|
||||||
|
tenant_cache = request.app.state.tenant_cache
|
||||||
|
main_db = request.app.state.main_db
|
||||||
|
await self.module_logger.log_info(f"Retrieved app state - tenant_cache: {'success' if tenant_cache is not None else 'fail'}, main_db: {'success' if main_db is not None else 'fail'}")
|
||||||
|
except Exception as e:
|
||||||
|
await self.module_logger.log_error(f"Failed to get app state: {str(e)}")
|
||||||
|
response = JSONResponse(
|
||||||
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||||
|
content={"detail": "Database not properly initialized"}
|
||||||
|
)
|
||||||
|
return await response(scope, receive, send)
|
||||||
|
|
||||||
|
if not product_id:
|
||||||
|
# Compatibility / public routes: use main database with tenant models initialized
|
||||||
|
await self.module_logger.log_info(f"No product_id - using main database for path: {request.url.path}")
|
||||||
|
|
||||||
|
# Get main database with Beanie initialized for tenant models
|
||||||
|
main_db_initialized = await tenant_cache.get_main_db_initialized()
|
||||||
|
|
||||||
|
request.state.db = main_db_initialized
|
||||||
|
request.state.product_id = None
|
||||||
|
await self.module_logger.log_info(f"Successfully initialized main database with tenant models")
|
||||||
|
return await self.app(scope, receive, send)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Get tenant-specific database with Beanie already initialized (cached)
|
||||||
|
await self.module_logger.log_info(f"Attempting to get tenant database for product_id: {product_id}")
|
||||||
|
tenant_db = await tenant_cache.get_initialized_db(product_id)
|
||||||
|
|
||||||
|
request.state.db = tenant_db
|
||||||
|
request.state.product_id = product_id
|
||||||
|
await self.module_logger.log_info(f"Successfully retrieved cached tenant database with Beanie for product_id: {product_id}")
|
||||||
|
return await self.app(scope, receive, send)
|
||||||
|
|
||||||
|
except ValueError as e:
|
||||||
|
# Handle tenant not found or inactive (ValueError from TenantDBCache)
|
||||||
|
await self.module_logger.log_error(f"Tenant error for {product_id}: {str(e)}")
|
||||||
|
response = JSONResponse(
|
||||||
|
status_code=status.HTTP_404_NOT_FOUND,
|
||||||
|
content={"detail": str(e)}
|
||||||
|
)
|
||||||
|
return await response(scope, receive, send)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
await self.module_logger.log_error(f"Database error for tenant {product_id}: {str(e)}")
|
||||||
|
response = JSONResponse(
|
||||||
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||||
|
content={"detail": "Database connection error"}
|
||||||
|
)
|
||||||
|
return await response(scope, receive, send)
|
||||||
@ -0,0 +1,191 @@
|
|||||||
|
import httpx
|
||||||
|
import asyncio
|
||||||
|
import time
|
||||||
|
import contextvars
|
||||||
|
from datetime import datetime
|
||||||
|
from starlette.requests import Request
|
||||||
|
from fastapi import HTTPException, Response
|
||||||
|
from typing import Dict, Any, Optional
|
||||||
|
from common.log.module_logger import ModuleLogger
|
||||||
|
|
||||||
|
from backend.models.user.models import UsageLogDoc
|
||||||
|
from backend.infra.api_key_introspect_handler import ApiKeyIntrospectHandler
|
||||||
|
|
||||||
|
# Define context data class
|
||||||
|
class RequestContext:
|
||||||
|
def __init__(self, tenant_name: str = None, product_id: str = None, key_id: str = None):
|
||||||
|
self.tenant_name = tenant_name
|
||||||
|
self.product_id = product_id
|
||||||
|
self.key_id = key_id
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"RequestContext(tenant_name='{self.tenant_name}', product_id='{self.product_id}', key_id='{self.key_id}')"
|
||||||
|
|
||||||
|
# Create context variable, store RequestContext object
|
||||||
|
request_context_var = contextvars.ContextVar('request_context', default=RequestContext())
|
||||||
|
|
||||||
|
class FreeleapsAuthMiddleware:
|
||||||
|
"""
|
||||||
|
Notification service API Key middleware
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, app):
|
||||||
|
self.app = app
|
||||||
|
self.api_key_introspect_handler = ApiKeyIntrospectHandler()
|
||||||
|
self.module_logger = ModuleLogger(sender_id=FreeleapsAuthMiddleware)
|
||||||
|
|
||||||
|
async def __call__(self, scope, receive, send):
|
||||||
|
"""
|
||||||
|
Middleware main function, execute before and after request processing
|
||||||
|
"""
|
||||||
|
if scope["type"] != "http":
|
||||||
|
await self.app(scope, receive, send)
|
||||||
|
return
|
||||||
|
|
||||||
|
request = Request(scope, receive)
|
||||||
|
start_time = time.time()
|
||||||
|
validation_result = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
# 1. Skip paths that do not need validation
|
||||||
|
if self._should_skip_validation(request.url.path):
|
||||||
|
await self.module_logger.log_info(f"Path skipped validation: {request.url.path}")
|
||||||
|
await self.app(scope, receive, send)
|
||||||
|
return
|
||||||
|
|
||||||
|
# 2. Extract API Key from request header
|
||||||
|
api_key = request.headers.get("X-API-KEY")
|
||||||
|
# if the API_KEY field is empty, the request can be processed directly without validation.
|
||||||
|
# for compatibility
|
||||||
|
if not api_key or api_key == "":
|
||||||
|
await self.module_logger.log_info(f"API Key is empty: {request.url.path}")
|
||||||
|
await self.app(scope, receive, send)
|
||||||
|
return
|
||||||
|
|
||||||
|
# 3. Call freeleaps_auth to validate API Key
|
||||||
|
validation_result = await self.api_key_introspect_handler.api_key_introspect(api_key)
|
||||||
|
|
||||||
|
# 4. Store validation result in contextvars for later use
|
||||||
|
request_context = RequestContext(
|
||||||
|
tenant_name=validation_result.get("tenant_name"),
|
||||||
|
product_id=validation_result.get("product_id"),
|
||||||
|
key_id=validation_result.get("key_id")
|
||||||
|
)
|
||||||
|
request_context_var.set(request_context)
|
||||||
|
|
||||||
|
# 6. Process request and capture response
|
||||||
|
response_captured = None
|
||||||
|
|
||||||
|
async def send_wrapper(message):
|
||||||
|
nonlocal response_captured
|
||||||
|
if message["type"] == "http.response.start":
|
||||||
|
# Convert bytes headers to string headers
|
||||||
|
headers = {}
|
||||||
|
for header_name, header_value in message.get("headers", []):
|
||||||
|
if isinstance(header_name, bytes):
|
||||||
|
header_name = header_name.decode('latin-1')
|
||||||
|
if isinstance(header_value, bytes):
|
||||||
|
header_value = header_value.decode('latin-1')
|
||||||
|
headers[header_name] = header_value
|
||||||
|
|
||||||
|
response_captured = Response(
|
||||||
|
status_code=message["status"],
|
||||||
|
headers=headers,
|
||||||
|
media_type="application/json"
|
||||||
|
)
|
||||||
|
await send(message)
|
||||||
|
|
||||||
|
await self.app(scope, receive, send_wrapper)
|
||||||
|
|
||||||
|
# 7. Record usage log after request processing
|
||||||
|
if response_captured:
|
||||||
|
await self._log_usage(validation_result, request, response_captured, start_time)
|
||||||
|
|
||||||
|
except HTTPException as http_exc:
|
||||||
|
# Pass through HTTP exceptions (401, 403, etc.) from auth service
|
||||||
|
await self.module_logger.log_info(f"API Key validation failed: {http_exc.status_code} - {http_exc.detail}")
|
||||||
|
response = Response(
|
||||||
|
status_code=http_exc.status_code,
|
||||||
|
content=f'{{"error": "Authentication failed", "message": "{str(http_exc.detail)}"}}',
|
||||||
|
media_type="application/json"
|
||||||
|
)
|
||||||
|
await response(scope, receive, send)
|
||||||
|
except Exception as e:
|
||||||
|
await self.module_logger.log_error(f"Middleware error: {str(e)}")
|
||||||
|
response = Response(
|
||||||
|
status_code=500,
|
||||||
|
content=f'{{"error": "Internal error", "message": "Failed to process request", "details": "{str(e)}"}}',
|
||||||
|
media_type="application/json"
|
||||||
|
)
|
||||||
|
await response(scope, receive, send)
|
||||||
|
|
||||||
|
def _should_skip_validation(self, path: str) -> bool:
|
||||||
|
"""
|
||||||
|
Check if the path should be skipped for validation
|
||||||
|
"""
|
||||||
|
skip_paths = [
|
||||||
|
"/api/_/healthz", # Health check endpoint
|
||||||
|
"/api/_/readyz", # Readiness check endpoint
|
||||||
|
"/api/_/livez", # Liveness check endpoint
|
||||||
|
"/metrics", # Metrics endpoint
|
||||||
|
"/docs", # API documentation
|
||||||
|
"/openapi.json", # OpenAPI specification
|
||||||
|
"/favicon.ico" # Website icon
|
||||||
|
]
|
||||||
|
|
||||||
|
# Check exact match for root path
|
||||||
|
if path == "/":
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Check startswith for other paths
|
||||||
|
return any(path.startswith(skip_path) for skip_path in skip_paths)
|
||||||
|
|
||||||
|
async def _log_usage(self, validation_result: Dict[str, Any], request: Request,
|
||||||
|
response: Response, start_time: float) -> None:
|
||||||
|
"""
|
||||||
|
Record API usage log
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# calculate processing time
|
||||||
|
process_time = (time.time() - start_time) * 1000
|
||||||
|
|
||||||
|
# get request body size
|
||||||
|
try:
|
||||||
|
request_body = await request.body()
|
||||||
|
bytes_in = len(request_body) if request.method in ["POST", "PUT", "PATCH"] else 0
|
||||||
|
except Exception:
|
||||||
|
bytes_in = 0
|
||||||
|
|
||||||
|
bytes_out = 0
|
||||||
|
if hasattr(response, 'headers'):
|
||||||
|
content_length = response.headers.get('content-length')
|
||||||
|
if content_length:
|
||||||
|
bytes_out = int(content_length)
|
||||||
|
|
||||||
|
# create usage log document
|
||||||
|
usage_log_doc = UsageLogDoc(
|
||||||
|
timestamp=datetime.utcnow(),
|
||||||
|
tenant_id=validation_result.get("tenant_name"),
|
||||||
|
operation=f"{request.method} {request.url.path}",
|
||||||
|
request_id=request.headers.get("X-Request-ID", "unknown"),
|
||||||
|
status="success" if response.status_code < 400 else "error",
|
||||||
|
latency_ms=int(process_time),
|
||||||
|
bytes_in=bytes_in,
|
||||||
|
bytes_out=bytes_out,
|
||||||
|
key_id=validation_result.get("key_id"),
|
||||||
|
extra={
|
||||||
|
"tenant_name": request_context_var.get().tenant_name,
|
||||||
|
"product_id": request_context_var.get().product_id,
|
||||||
|
"scopes": validation_result.get("scopes"),
|
||||||
|
"user_agent": request.headers.get("User-Agent"),
|
||||||
|
"ip_address": request.client.host if request.client else "unknown",
|
||||||
|
"response_status": response.status_code
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# save to database
|
||||||
|
await usage_log_doc.save()
|
||||||
|
await self.module_logger.log_info(f"API Usage logged: {usage_log_doc.operation} for tenant {usage_log_doc.tenant_id}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
await self.module_logger.log_error(f"Failed to log usage: {str(e)}")
|
||||||
@ -1,38 +1,253 @@
|
|||||||
import logging
|
from webapi.config.site_settings import site_settings
|
||||||
import asyncio
|
|
||||||
from common.config.app_settings import app_settings
|
|
||||||
from beanie import init_beanie
|
from beanie import init_beanie
|
||||||
from motor.motor_asyncio import AsyncIOMotorClient
|
from fastapi import HTTPException
|
||||||
from backend.models import backend_models
|
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
|
||||||
from common.probes import ProbeResult
|
from backend.models.user.models import (
|
||||||
|
UserAccountDoc,
|
||||||
client = AsyncIOMotorClient(
|
UserPasswordDoc,
|
||||||
app_settings.MONGODB_URI,
|
UserEmailDoc,
|
||||||
serverSelectionTimeoutMS=60000,
|
UserMobileDoc,
|
||||||
minPoolSize=5, # Minimum number of connections in the pool
|
AuthCodeDoc,
|
||||||
maxPoolSize=20, # Maximum number of connections in the pool
|
UsageLogDoc
|
||||||
heartbeatFrequencyMS=20000, # Adjust heartbeat frequency to 20 seconds
|
|
||||||
)
|
)
|
||||||
|
from backend.models.user_profile.models import BasicProfileDoc
|
||||||
|
from backend.models.permission.models import PermissionDoc, RoleDoc, UserRoleDoc
|
||||||
|
from common.config.app_settings import app_settings
|
||||||
|
from common.log.module_logger import ModuleLogger
|
||||||
|
from common.probes import ProbeResult
|
||||||
|
import asyncio
|
||||||
|
from collections import OrderedDict
|
||||||
|
from typing import Optional, Union
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
# Global variables for database management
|
||||||
|
MAIN_CLIENT: Optional[AsyncIOMotorClient] = None
|
||||||
|
TENANT_CACHE: Optional['TenantDBCache'] = None
|
||||||
|
|
||||||
|
# Define document models
|
||||||
|
document_models = [
|
||||||
|
UsageLogDoc,
|
||||||
|
UserAccountDoc,
|
||||||
|
UserPasswordDoc,
|
||||||
|
UserEmailDoc,
|
||||||
|
UserMobileDoc,
|
||||||
|
AuthCodeDoc,
|
||||||
|
BasicProfileDoc,
|
||||||
|
PermissionDoc,
|
||||||
|
RoleDoc,
|
||||||
|
UserRoleDoc
|
||||||
|
]
|
||||||
|
|
||||||
|
tenant_document_models = [
|
||||||
|
UserAccountDoc,
|
||||||
|
UserPasswordDoc,
|
||||||
|
UserEmailDoc,
|
||||||
|
UserMobileDoc,
|
||||||
|
AuthCodeDoc,
|
||||||
|
BasicProfileDoc,
|
||||||
|
PermissionDoc,
|
||||||
|
RoleDoc,
|
||||||
|
UserRoleDoc
|
||||||
|
]
|
||||||
|
|
||||||
|
class TenantDBCache:
|
||||||
|
"""
|
||||||
|
Enhanced tenant database cache that caches only clients, not databases.
|
||||||
|
product_id -> AsyncIOMotorClient
|
||||||
|
Uses main_db.tenant_doc to resolve mongodb_uri; caches clients with LRU.
|
||||||
|
Database instances are created fresh each time from cached clients.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, main_db: AsyncIOMotorDatabase, max_size: int = 64):
|
||||||
|
self.main_db = main_db
|
||||||
|
self.max_size = max_size
|
||||||
|
self._cache: "OrderedDict[str, AsyncIOMotorClient]" = OrderedDict()
|
||||||
|
self._locks: dict[str, asyncio.Lock] = {}
|
||||||
|
self._global_lock = asyncio.Lock()
|
||||||
|
self.module_logger = ModuleLogger(sender_id="TenantDBCache")
|
||||||
|
|
||||||
|
async def get_initialized_db(self, product_id: str) -> AsyncIOMotorDatabase:
|
||||||
|
"""Get tenant database with Beanie already initialized"""
|
||||||
|
|
||||||
|
# fast-path: check if client is cached
|
||||||
|
cached_client = self._cache.get(product_id)
|
||||||
|
if cached_client:
|
||||||
|
await self.module_logger.log_info(f"Found cached client for {product_id}")
|
||||||
|
self._cache.move_to_end(product_id)
|
||||||
|
|
||||||
|
# Get fresh database instance from cached client
|
||||||
|
db = cached_client.get_default_database()
|
||||||
|
if db is not None:
|
||||||
|
# Initialize Beanie for this fresh database instance
|
||||||
|
await init_beanie(database=db, document_models=tenant_document_models)
|
||||||
|
await self.module_logger.log_info(f"Beanie initialization completed for {product_id} using cached client")
|
||||||
|
return db
|
||||||
|
else:
|
||||||
|
await self.module_logger.log_error(f"No default database found for cached client {product_id}")
|
||||||
|
# Remove invalid cached client
|
||||||
|
del self._cache[product_id]
|
||||||
|
|
||||||
|
# double-checked under per-tenant lock
|
||||||
|
lock = self._locks.setdefault(product_id, asyncio.Lock())
|
||||||
|
async with lock:
|
||||||
|
cached_client = self._cache.get(product_id)
|
||||||
|
if cached_client:
|
||||||
|
await self.module_logger.log_info(f"Double-check found cached client for {product_id}")
|
||||||
|
self._cache.move_to_end(product_id)
|
||||||
|
|
||||||
|
# Get fresh database instance from cached client
|
||||||
|
db = cached_client.get_default_database()
|
||||||
|
if db is not None:
|
||||||
|
# Initialize Beanie for this fresh database instance
|
||||||
|
await init_beanie(database=db, document_models=tenant_document_models)
|
||||||
|
await self.module_logger.log_info(f"Beanie initialization completed for {product_id} using cached client (double-check)")
|
||||||
|
return db
|
||||||
|
else:
|
||||||
|
await self.module_logger.log_error(f"No default database found for cached client {product_id}")
|
||||||
|
# Remove invalid cached client
|
||||||
|
del self._cache[product_id]
|
||||||
|
|
||||||
|
# Create new tenant connection - use raw MongoDB query since we don't have TenantDoc model
|
||||||
|
"""
|
||||||
|
tenant_doc content:
|
||||||
|
{
|
||||||
|
"tenant_name": "magicleaps",
|
||||||
|
"product_id": "68a3f19119cfaf36316f6d14",
|
||||||
|
"mongodb_uri": "mongodb://localhost:27017/interview",
|
||||||
|
"status": "active"
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
tenant = await self.main_db["tenant_doc"].find_one({"product_id": product_id})
|
||||||
|
if not tenant:
|
||||||
|
await self.module_logger.log_error(f"Tenant {product_id} does not exist in main database")
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=404,
|
||||||
|
detail=f"Tenant {product_id} does not exist",
|
||||||
|
headers={"X-Error-Message": f"Tenant {product_id} does not exist"}
|
||||||
|
)
|
||||||
|
if tenant.get("status") != "active":
|
||||||
|
await self.module_logger.log_error(f"Tenant {product_id} is not active, status: {tenant.get('status')}")
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=403,
|
||||||
|
detail=f"Tenant {product_id} is not active",
|
||||||
|
headers={"X-Error-Message": f"Tenant {product_id} is not active, status: {tenant.get('status')}"}
|
||||||
|
)
|
||||||
|
|
||||||
|
uri = tenant["mongodb_uri"]
|
||||||
|
client = AsyncIOMotorClient(uri, minPoolSize=3, maxPoolSize=20, serverSelectionTimeoutMS=10000)
|
||||||
|
|
||||||
|
# robust db name resolution (get_default_database handles mongodb+srv and empty paths)
|
||||||
|
default_db = client.get_default_database()
|
||||||
|
if default_db is not None:
|
||||||
|
db = default_db
|
||||||
|
await self.module_logger.log_info(f"Using default database for tenant {product_id}: {db.name}")
|
||||||
|
else:
|
||||||
|
await self.module_logger.log_error(f"No default database found for tenant {product_id}")
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=500,
|
||||||
|
detail=f"No default database found for tenant {product_id}",
|
||||||
|
headers={"X-Error-Message": f"No default database found for tenant {product_id}"}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Initialize Beanie for this tenant database
|
||||||
|
await init_beanie(database=db, document_models=tenant_document_models)
|
||||||
|
await self.module_logger.log_info(f"Beanie initialization completed for new tenant database {product_id}")
|
||||||
|
|
||||||
|
# Cache only the client
|
||||||
|
await self._lru_put(product_id, client)
|
||||||
|
await self.module_logger.log_info(f"Tenant client {product_id} cached successfully")
|
||||||
|
return db
|
||||||
|
|
||||||
|
async def get_main_db_initialized(self) -> AsyncIOMotorDatabase:
|
||||||
|
"""Get main database with Beanie initialized for tenant models"""
|
||||||
|
# Re-initialize Beanie for main database with business models
|
||||||
|
await init_beanie(database=self.main_db, document_models=document_models)
|
||||||
|
await self.module_logger.log_info("Beanie initialization completed for main database")
|
||||||
|
return self.main_db
|
||||||
|
|
||||||
|
async def _lru_put(self, key: str, client: AsyncIOMotorClient):
|
||||||
|
async with self._global_lock:
|
||||||
|
self._cache[key] = client
|
||||||
|
self._cache.move_to_end(key)
|
||||||
|
if len(self._cache) > self.max_size:
|
||||||
|
old_key, old_client = self._cache.popitem(last=False)
|
||||||
|
await self.module_logger.log_info(f"Cache full, removing LRU tenant: {old_key}")
|
||||||
|
try:
|
||||||
|
old_client.close()
|
||||||
|
await self.module_logger.log_info(f"Closed connection for evicted tenant: {old_key}")
|
||||||
|
except Exception as e:
|
||||||
|
await self.module_logger.log_error(f"Error closing connection for {old_key}: {str(e)}")
|
||||||
|
self._locks.pop(old_key, None)
|
||||||
|
|
||||||
|
async def aclose(self):
|
||||||
|
async with self._global_lock:
|
||||||
|
for key, client in self._cache.items():
|
||||||
|
try:
|
||||||
|
client.close()
|
||||||
|
await self.module_logger.log_info(f"Closed connection for tenant: {key}")
|
||||||
|
except Exception as e:
|
||||||
|
await self.module_logger.log_error(f"Error closing connection for {key}: {str(e)}")
|
||||||
|
self._cache.clear()
|
||||||
|
self._locks.clear()
|
||||||
|
await self.module_logger.log_info("Tenant cache cleared successfully")
|
||||||
|
|
||||||
|
|
||||||
def register(app):
|
def register(app):
|
||||||
app.debug = "auth_mongo_debug"
|
"""Register database-related configurations and setup"""
|
||||||
app.title = "auth_mongo_name"
|
app.debug = site_settings.DEBUG
|
||||||
|
app.title = site_settings.NAME
|
||||||
# Configure logging for pymongo
|
|
||||||
logging.getLogger("pymongo").setLevel(logging.WARNING) # Suppress DEBUG logs
|
|
||||||
|
|
||||||
@app.on_event("startup")
|
@app.on_event("startup")
|
||||||
async def start_database():
|
async def start_database():
|
||||||
await initiate_database()
|
await initiate_database(app)
|
||||||
|
|
||||||
|
@app.on_event("shutdown")
|
||||||
|
async def shutdown_database():
|
||||||
|
await cleanup_database()
|
||||||
|
|
||||||
async def check_database_initialized() -> ProbeResult:
|
async def check_database_initialized() -> ProbeResult:
|
||||||
try:
|
try:
|
||||||
await asyncio.wait_for(client.server_info(), timeout=5)
|
await asyncio.wait_for(MAIN_CLIENT.server_info(), timeout=5)
|
||||||
return ProbeResult(success=True, message="service has been initialized and ready to serve")
|
return ProbeResult(success=True, message="service has been initialized and ready to serve")
|
||||||
except Exception:
|
except Exception:
|
||||||
return ProbeResult(success=False, message="service is not initialized yet", data={"error": "database is not ready"})
|
return ProbeResult(success=False, message="service is not initialized yet", data={"error": "database is not ready"})
|
||||||
|
|
||||||
async def initiate_database():
|
|
||||||
await init_beanie(
|
async def initiate_database(app):
|
||||||
database=client[app_settings.MONGODB_NAME], document_models=backend_models
|
"""Initialize main database and tenant cache"""
|
||||||
)
|
global MAIN_CLIENT, TENANT_CACHE
|
||||||
|
|
||||||
|
module_logger = ModuleLogger(sender_id="DatabaseInit")
|
||||||
|
|
||||||
|
# 1) Create main/catalog client + DB
|
||||||
|
MAIN_CLIENT = AsyncIOMotorClient(app_settings.MONGODB_URI)
|
||||||
|
main_db = MAIN_CLIENT[app_settings.MONGODB_NAME]
|
||||||
|
|
||||||
|
# 2) Initialize Beanie for main DB with business document models
|
||||||
|
await init_beanie(database=main_db, document_models=document_models)
|
||||||
|
|
||||||
|
# 3) Create tenant cache that uses main_db lookups to resolve product_id -> tenant db
|
||||||
|
max_cache_size = getattr(app_settings, 'TENANT_CACHE_MAX', 64)
|
||||||
|
TENANT_CACHE = TenantDBCache(main_db, max_size=max_cache_size)
|
||||||
|
|
||||||
|
# 4) Store on app state for middleware to access
|
||||||
|
app.state.main_db = main_db
|
||||||
|
app.state.tenant_cache = TENANT_CACHE
|
||||||
|
|
||||||
|
await module_logger.log_info("Database and tenant cache initialized successfully")
|
||||||
|
|
||||||
|
|
||||||
|
async def cleanup_database():
|
||||||
|
"""Cleanup database connections and cache"""
|
||||||
|
global MAIN_CLIENT, TENANT_CACHE
|
||||||
|
|
||||||
|
module_logger = ModuleLogger(sender_id="DatabaseCleanup")
|
||||||
|
|
||||||
|
if TENANT_CACHE:
|
||||||
|
await TENANT_CACHE.aclose()
|
||||||
|
|
||||||
|
if MAIN_CLIENT:
|
||||||
|
MAIN_CLIENT.close()
|
||||||
|
|
||||||
|
await module_logger.log_info("Database connections closed successfully")
|
||||||
11
apps/authentication/webapi/providers/middleware.py
Normal file
11
apps/authentication/webapi/providers/middleware.py
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
from webapi.middleware.freeleaps_auth_middleware import FreeleapsAuthMiddleware
|
||||||
|
from webapi.middleware.database_middleware import DatabaseMiddleware
|
||||||
|
|
||||||
|
|
||||||
|
def register(app):
|
||||||
|
"""
|
||||||
|
Register middleware to FastAPI application
|
||||||
|
"""
|
||||||
|
# Register middlewares
|
||||||
|
app.add_middleware(DatabaseMiddleware)
|
||||||
|
app.add_middleware(FreeleapsAuthMiddleware)
|
||||||
@ -1,19 +1,14 @@
|
|||||||
from common.probes import ProbeManager, ProbeType
|
from common.probes import ProbeManager, ProbeType
|
||||||
from common.probes.adapters import FastAPIAdapter
|
from common.probes.adapters import FastAPIAdapter
|
||||||
from .database import check_database_initialized
|
|
||||||
|
|
||||||
def register(app):
|
def register(app):
|
||||||
probes_manager = ProbeManager()
|
probes_manager = ProbeManager()
|
||||||
probes_manager.register_adapter("fastapi", FastAPIAdapter(app))
|
probes_manager.register_adapter("fastapi", FastAPIAdapter(app))
|
||||||
|
|
||||||
async def readiness_checker():
|
|
||||||
return await check_database_initialized()
|
|
||||||
|
|
||||||
probes_manager.register(
|
probes_manager.register(
|
||||||
name="readiness",
|
name="readiness",
|
||||||
prefix="/api",
|
prefix="/api",
|
||||||
type=ProbeType.READINESS,
|
type=ProbeType.READINESS,
|
||||||
check_func=readiness_checker,
|
|
||||||
frameworks=["fastapi"]
|
frameworks=["fastapi"]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@ -28,5 +28,5 @@ export RABBITMQ_PORT=5672
|
|||||||
export FREELEAPS_ENV=local
|
export FREELEAPS_ENV=local
|
||||||
export LOG_BASE_PATH=${CODEBASE_ROOT}/log
|
export LOG_BASE_PATH=${CODEBASE_ROOT}/log
|
||||||
|
|
||||||
export AUTH_SERVICE_ENDPOINT=http://localhost:9000/api/v1/keys/
|
export AUTH_SERVICE_ENDPOINT=http://localhost:9000/api/v1/
|
||||||
export AUTH_SERVICE_PORT=9000
|
export AUTH_SERVICE_PORT=9000
|
||||||
|
|||||||
@ -30,8 +30,8 @@ class ApiKeyIntrospectHandler:
|
|||||||
Raises:
|
Raises:
|
||||||
HTTPException: If the external service call fails
|
HTTPException: If the external service call fails
|
||||||
"""
|
"""
|
||||||
api_url = self.auth_service_base + "introspect_api_key"
|
api_url = self.auth_service_base + "keys/introspect_api_key"
|
||||||
await self.module_logger.log_info(f"Starting API Key validation for key: {api_key[:8]}...")
|
await self.module_logger.log_info(f"Starting API Key validation for key")
|
||||||
|
|
||||||
async with httpx.AsyncClient() as client:
|
async with httpx.AsyncClient() as client:
|
||||||
response = await client.post(
|
response = await client.post(
|
||||||
|
|||||||
@ -111,7 +111,6 @@ class UsageLogDoc(Document):
|
|||||||
tenant_id: str # tenant id
|
tenant_id: str # tenant id
|
||||||
operation: str # operation type
|
operation: str # operation type
|
||||||
request_id: str # request id # TODO: use true one
|
request_id: str # request id # TODO: use true one
|
||||||
units: int # units
|
|
||||||
status: str # operation status
|
status: str # operation status
|
||||||
latency_ms: int # latency time(milliseconds)
|
latency_ms: int # latency time(milliseconds)
|
||||||
bytes_in: int # input bytes
|
bytes_in: int # input bytes
|
||||||
|
|||||||
@ -5,7 +5,7 @@ export CONTAINER_APP_ROOT=/app
|
|||||||
export LOG_BASE_PATH=$CONTAINER_APP_ROOT/log/$APP_NAME
|
export LOG_BASE_PATH=$CONTAINER_APP_ROOT/log/$APP_NAME
|
||||||
export BACKEND_LOG_FILE_NAME=$APP_NAME
|
export BACKEND_LOG_FILE_NAME=$APP_NAME
|
||||||
export APPLICATION_ACTIVITY_LOG=$APP_NAME-activity
|
export APPLICATION_ACTIVITY_LOG=$APP_NAME-activity
|
||||||
export SENDGRID_API_KEY=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
|
export SENDGRID_API_KEY=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
|
||||||
export EMAIL_FROM=freeleaps@freeleaps.com
|
export EMAIL_FROM=freeleaps@freeleaps.com
|
||||||
export TWILIO_ACCOUNT_SID=ACf8c9283a6acda060258eadb29be58bc8
|
export TWILIO_ACCOUNT_SID=ACf8c9283a6acda060258eadb29be58bc8
|
||||||
export TWILIO_AUTH_TOKEN=ef160748cc22c8b7195b49df4b8eca7e
|
export TWILIO_AUTH_TOKEN=ef160748cc22c8b7195b49df4b8eca7e
|
||||||
@ -27,7 +27,7 @@ export DOCKER_BACKEND_LOG_HOME=$DOCKER_BACKEND_HOME/log
|
|||||||
export RABBITMQ_HOST=localhost
|
export RABBITMQ_HOST=localhost
|
||||||
export RABBITMQ_PORT=5672
|
export RABBITMQ_PORT=5672
|
||||||
|
|
||||||
export AUTH_SERVICE_ENDPOINT=http://localhost:9000/api/v1/keys/
|
export AUTH_SERVICE_ENDPOINT=http://localhost:9000/api/v1/
|
||||||
export TENANT_CACHE_MAX=64
|
export TENANT_CACHE_MAX=64
|
||||||
|
|
||||||
# for local environment
|
# for local environment
|
||||||
|
|||||||
@ -168,7 +168,6 @@ class FreeleapsAuthMiddleware:
|
|||||||
tenant_id=validation_result.get("tenant_name"),
|
tenant_id=validation_result.get("tenant_name"),
|
||||||
operation=f"{request.method} {request.url.path}",
|
operation=f"{request.method} {request.url.path}",
|
||||||
request_id=request.headers.get("X-Request-ID", "unknown"),
|
request_id=request.headers.get("X-Request-ID", "unknown"),
|
||||||
units=1, # TODO: adjust according to business logic
|
|
||||||
status="success" if response.status_code < 400 else "error",
|
status="success" if response.status_code < 400 else "error",
|
||||||
latency_ms=int(process_time),
|
latency_ms=int(process_time),
|
||||||
bytes_in=bytes_in,
|
bytes_in=bytes_in,
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user