Merge pull request 'tania_middleware' (#46) from tania_middleware into dev
Reviewed-on: freeleaps/freeleaps-service-hub#46
This commit is contained in:
commit
071694cefe
@ -28,4 +28,5 @@ export RABBITMQ_PORT=5672
|
||||
export FREELEAPS_ENV=local
|
||||
export LOG_BASE_PATH=${CODEBASE_ROOT}/log
|
||||
|
||||
|
||||
export AUTH_SERVICE_ENDPOINT=http://localhost:9000/api/v1/keys/
|
||||
export AUTH_SERVICE_PORT=9000
|
||||
|
||||
@ -20,6 +20,9 @@ ENV SENDGRID_API_KEY=SG.OrxsRI0IRaOxkd7xTfb8SA.J8CfOXsJy3vrJgTubbLmZOR6ii7z7m7C9
|
||||
ENV TWILIO_ACCOUNT_SID=ACf8c9283a6acda060258eadb29be58bc8
|
||||
ENV TWILIO_AUTH_TOKEN=120165c0550111ddfd58efc97dafc2fe
|
||||
|
||||
# Freeleaps Auth Config
|
||||
ENV AUTH_SERVICE_ENDPOINT=""
|
||||
|
||||
#log_settings
|
||||
ENV LOG_BASE_PATH=$CONTAINER_APP_ROOT/log/$APP_NAME
|
||||
ENV BACKEND_LOG_FILE_NAME=$APP_NAME
|
||||
|
||||
@ -0,0 +1,52 @@
|
||||
from typing import Dict, Any
|
||||
import httpx
|
||||
from fastapi import HTTPException
|
||||
from common.config.app_settings import app_settings
|
||||
from common.log.log_utils import log_entry_exit_async
|
||||
from common.log.module_logger import ModuleLogger
|
||||
|
||||
|
||||
class ApiKeyIntrospectHandler:
|
||||
"""
|
||||
Freeleaps Auth Service API Key Introspect Handle
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.module_logger = ModuleLogger(sender_id=ApiKeyIntrospectHandler.__name__)
|
||||
self.auth_service_base = app_settings.AUTH_SERVICE_ENDPOINT
|
||||
|
||||
|
||||
@log_entry_exit_async
|
||||
async def api_key_introspect(self, api_key: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Introspect API key by calling external auth service
|
||||
|
||||
Args:
|
||||
api_key: The API key to introspect
|
||||
|
||||
Returns:
|
||||
Dictionary containing the API key details
|
||||
|
||||
Raises:
|
||||
HTTPException: If the external service call fails
|
||||
"""
|
||||
api_url = self.auth_service_base + "introspect_api_key"
|
||||
await self.module_logger.log_info(f"Starting API Key validation for key: {api_key[:8]}...")
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(
|
||||
api_url,
|
||||
json={"api_key": api_key}
|
||||
)
|
||||
|
||||
if response.status_code != 200:
|
||||
error_detail = response.json() if response.content else {"error": "Unknown error"}
|
||||
await self.module_logger.log_error(f"API Key validation failed - Status: {response.status_code}, Error: {error_detail}")
|
||||
raise HTTPException(
|
||||
status_code=response.status_code,
|
||||
detail=error_detail
|
||||
)
|
||||
|
||||
validation_result = response.json()
|
||||
await self.module_logger.log_info(f"API Key validation successful - Active: {validation_result.get('active', False)}")
|
||||
return validation_result
|
||||
@ -55,7 +55,7 @@ class EmailSendStatusDoc(Document):
|
||||
indexes = [
|
||||
"email_id",
|
||||
"tenant_id"
|
||||
]
|
||||
]
|
||||
|
||||
class EmailTrackingDoc(Document):
|
||||
email_id: str
|
||||
@ -83,7 +83,7 @@ class EmailTrackingDoc(Document):
|
||||
indexes = [
|
||||
"email_id",
|
||||
"tenant_id"
|
||||
]
|
||||
]
|
||||
|
||||
class EmailBounceDoc(Document):
|
||||
email: str
|
||||
@ -104,5 +104,25 @@ class EmailBounceDoc(Document):
|
||||
indexes = [
|
||||
"email",
|
||||
"tenant_id"
|
||||
]
|
||||
|
||||
]
|
||||
|
||||
class UsageLogDoc(Document):
|
||||
timestamp: datetime = datetime.utcnow() # timestamp
|
||||
tenant_id: str # tenant id
|
||||
operation: str # operation type
|
||||
request_id: str # request id # TODO: use true one
|
||||
units: int # units
|
||||
status: str # operation status
|
||||
latency_ms: int # latency time(milliseconds)
|
||||
bytes_in: int # input bytes
|
||||
bytes_out: int # output bytes
|
||||
key_id: Optional[str] = None # API Key ID
|
||||
extra: dict = {} # extra information
|
||||
|
||||
class Settings:
|
||||
name = "usage_log_doc"
|
||||
indexes = [
|
||||
"tenant_id",
|
||||
"request_id",
|
||||
"key_id"
|
||||
]
|
||||
@ -25,6 +25,8 @@ class AppSettings(BaseSettings):
|
||||
ACCESS_TOKEN_EXPIRE_MINUTES: int = 3600
|
||||
REFRESH_TOKEN_EXPIRE_DAYS: int = 1
|
||||
|
||||
AUTH_SERVICE_ENDPOINT: str = ""
|
||||
|
||||
SENDGRID_API_KEY: str = ""
|
||||
|
||||
|
||||
|
||||
@ -81,5 +81,5 @@ class JsonSink:
|
||||
exc_type, exc_value, exc_tb = record["exception"]
|
||||
log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb)
|
||||
|
||||
self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
|
||||
self.log_file.write(json.dumps(log_entry, ensure_ascii=False, default=str) + "\n")
|
||||
self.log_file.flush()
|
||||
|
||||
@ -27,10 +27,13 @@ export DOCKER_BACKEND_LOG_HOME=$DOCKER_BACKEND_HOME/log
|
||||
export RABBITMQ_HOST=localhost
|
||||
export RABBITMQ_PORT=5672
|
||||
|
||||
export AUTH_SERVICE_ENDPOINT=http://localhost:9000/api/v1/keys/
|
||||
export AUTH_SERVICE_PORT=9000
|
||||
|
||||
# for local environment
|
||||
export MONGODB_URI=mongodb://localhost:27017/
|
||||
# connectivity from local to alpha
|
||||
#export MONGODB_URI=mongodb+srv://jetli:8IHKx6dZK8BfugGp@freeleaps2.hanbj.mongodb.net/
|
||||
export MONGODB_NAME=interview
|
||||
export FREELEAPS_ENV=local
|
||||
export LOG_BASE_PATH=./log
|
||||
export LOG_BASE_PATH=./log
|
||||
@ -11,6 +11,7 @@ from webapi.providers import message_queue
|
||||
from webapi.providers import exception_handler
|
||||
from webapi.providers import probes
|
||||
from webapi.providers import metrics
|
||||
from webapi.providers import middleware
|
||||
from .freeleaps_app import FreeleapsApp
|
||||
from common.config.app_settings import app_settings
|
||||
|
||||
@ -22,6 +23,11 @@ def create_app() -> FastAPI:
|
||||
app = FreeleapsApp()
|
||||
|
||||
register_logger()
|
||||
|
||||
# 1. Register middleware firstly
|
||||
register(app, middleware)
|
||||
|
||||
# 2. Register other providers
|
||||
register(app, exception_handler)
|
||||
# Register probe APIs if enabled
|
||||
if app_settings.PROBES_ENABLED:
|
||||
|
||||
3
apps/notification/webapi/middleware/__init__.py
Normal file
3
apps/notification/webapi/middleware/__init__.py
Normal file
@ -0,0 +1,3 @@
|
||||
from .freeleaps_auth_middleware import FreeleapsAuthMiddleware
|
||||
|
||||
__all__ = ['FreeleapsAuthMiddleware']
|
||||
192
apps/notification/webapi/middleware/freeleaps_auth_middleware.py
Normal file
192
apps/notification/webapi/middleware/freeleaps_auth_middleware.py
Normal file
@ -0,0 +1,192 @@
|
||||
import httpx
|
||||
import asyncio
|
||||
import time
|
||||
import contextvars
|
||||
from datetime import datetime
|
||||
from starlette.requests import Request
|
||||
from fastapi import HTTPException, Response
|
||||
from typing import Dict, Any, Optional
|
||||
from common.log.module_logger import ModuleLogger
|
||||
|
||||
from backend.models.models import UsageLogDoc
|
||||
from backend.infra.api_key_introspect_handler import ApiKeyIntrospectHandler
|
||||
|
||||
# Define context data class
|
||||
class RequestContext:
|
||||
def __init__(self, tenant_name: str = None, product_id: str = None, key_id: str = None):
|
||||
self.tenant_name = tenant_name
|
||||
self.product_id = product_id
|
||||
self.key_id = key_id
|
||||
|
||||
def __repr__(self):
|
||||
return f"RequestContext(tenant_name='{self.tenant_name}', product_id='{self.product_id}', key_id='{self.key_id}')"
|
||||
|
||||
# Create context variable, store RequestContext object
|
||||
request_context_var = contextvars.ContextVar('request_context', default=RequestContext())
|
||||
|
||||
class FreeleapsAuthMiddleware:
|
||||
"""
|
||||
Notification service API Key middleware
|
||||
"""
|
||||
|
||||
def __init__(self, app):
|
||||
self.app = app
|
||||
self.api_key_introspect_handler = ApiKeyIntrospectHandler()
|
||||
self.module_logger = ModuleLogger(sender_id=FreeleapsAuthMiddleware)
|
||||
|
||||
async def __call__(self, scope, receive, send):
|
||||
"""
|
||||
Middleware main function, execute before and after request processing
|
||||
"""
|
||||
if scope["type"] != "http":
|
||||
await self.app(scope, receive, send)
|
||||
return
|
||||
|
||||
request = Request(scope, receive)
|
||||
start_time = time.time()
|
||||
validation_result = None
|
||||
|
||||
try:
|
||||
# 1. Skip paths that do not need validation
|
||||
if self._should_skip_validation(request.url.path):
|
||||
await self.module_logger.log_info(f"Path skipped validation: {request.url.path}")
|
||||
await self.app(scope, receive, send)
|
||||
return
|
||||
|
||||
# 2. Extract API Key from request header
|
||||
api_key = request.headers.get("X-API-KEY")
|
||||
# if the API_KEY field is empty, the request can be processed directly without validation.
|
||||
# for compatibility
|
||||
if not api_key or api_key == "":
|
||||
await self.module_logger.log_info(f"API Key is empty: {request.url.path}")
|
||||
await self.app(scope, receive, send)
|
||||
return
|
||||
|
||||
# 3. Call freeleaps_auth to validate API Key
|
||||
validation_result = await self.api_key_introspect_handler.api_key_introspect(api_key)
|
||||
|
||||
# 4. Store validation result in contextvars for later use
|
||||
request_context = RequestContext(
|
||||
tenant_name=validation_result.get("tenant_name"),
|
||||
product_id=validation_result.get("product_id"),
|
||||
key_id=validation_result.get("key_id")
|
||||
)
|
||||
request_context_var.set(request_context)
|
||||
|
||||
# 6. Process request and capture response
|
||||
response_captured = None
|
||||
|
||||
async def send_wrapper(message):
|
||||
nonlocal response_captured
|
||||
if message["type"] == "http.response.start":
|
||||
# Convert bytes headers to string headers
|
||||
headers = {}
|
||||
for header_name, header_value in message.get("headers", []):
|
||||
if isinstance(header_name, bytes):
|
||||
header_name = header_name.decode('latin-1')
|
||||
if isinstance(header_value, bytes):
|
||||
header_value = header_value.decode('latin-1')
|
||||
headers[header_name] = header_value
|
||||
|
||||
response_captured = Response(
|
||||
status_code=message["status"],
|
||||
headers=headers,
|
||||
media_type="application/json"
|
||||
)
|
||||
await send(message)
|
||||
|
||||
await self.app(scope, receive, send_wrapper)
|
||||
|
||||
# 7. Record usage log after request processing
|
||||
if response_captured:
|
||||
await self._log_usage(validation_result, request, response_captured, start_time)
|
||||
|
||||
except HTTPException as http_exc:
|
||||
# Pass through HTTP exceptions (401, 403, etc.) from auth service
|
||||
await self.module_logger.log_info(f"API Key validation failed: {http_exc.status_code} - {http_exc.detail}")
|
||||
response = Response(
|
||||
status_code=http_exc.status_code,
|
||||
content=f'{{"error": "Authentication failed", "message": "{str(http_exc.detail)}"}}',
|
||||
media_type="application/json"
|
||||
)
|
||||
await response(scope, receive, send)
|
||||
except Exception as e:
|
||||
await self.module_logger.log_error(f"Middleware error: {str(e)}")
|
||||
response = Response(
|
||||
status_code=500,
|
||||
content=f'{{"error": "Internal error", "message": "Failed to process request", "details": "{str(e)}"}}',
|
||||
media_type="application/json"
|
||||
)
|
||||
await response(scope, receive, send)
|
||||
|
||||
def _should_skip_validation(self, path: str) -> bool:
|
||||
"""
|
||||
Check if the path should be skipped for validation
|
||||
"""
|
||||
skip_paths = [
|
||||
"/api/_/healthz", # Health check endpoint
|
||||
"/api/_/readyz", # Readiness check endpoint
|
||||
"/api/_/livez", # Liveness check endpoint
|
||||
"/metrics", # Metrics endpoint
|
||||
"/docs", # API documentation
|
||||
"/openapi.json", # OpenAPI specification
|
||||
"/favicon.ico" # Website icon
|
||||
]
|
||||
|
||||
# Check exact match for root path
|
||||
if path == "/":
|
||||
return True
|
||||
|
||||
# Check startswith for other paths
|
||||
return any(path.startswith(skip_path) for skip_path in skip_paths)
|
||||
|
||||
async def _log_usage(self, validation_result: Dict[str, Any], request: Request,
|
||||
response: Response, start_time: float) -> None:
|
||||
"""
|
||||
Record API usage log
|
||||
"""
|
||||
try:
|
||||
# calculate processing time
|
||||
process_time = (time.time() - start_time) * 1000
|
||||
|
||||
# get request body size
|
||||
try:
|
||||
request_body = await request.body()
|
||||
bytes_in = len(request_body) if request.method in ["POST", "PUT", "PATCH"] else 0
|
||||
except Exception:
|
||||
bytes_in = 0
|
||||
|
||||
bytes_out = 0
|
||||
if hasattr(response, 'headers'):
|
||||
content_length = response.headers.get('content-length')
|
||||
if content_length:
|
||||
bytes_out = int(content_length)
|
||||
|
||||
# create usage log document
|
||||
usage_log_doc = UsageLogDoc(
|
||||
timestamp=datetime.utcnow(),
|
||||
tenant_id=validation_result.get("tenant_name"),
|
||||
operation=f"{request.method} {request.url.path}",
|
||||
request_id=request.headers.get("X-Request-ID", "unknown"),
|
||||
units=1, # TODO: adjust according to business logic
|
||||
status="success" if response.status_code < 400 else "error",
|
||||
latency_ms=int(process_time),
|
||||
bytes_in=bytes_in,
|
||||
bytes_out=bytes_out,
|
||||
key_id=validation_result.get("key_id"),
|
||||
extra={
|
||||
"tenant_name": request_context_var.get().tenant_name,
|
||||
"product_id": request_context_var.get().product_id,
|
||||
"scopes": validation_result.get("scopes"),
|
||||
"user_agent": request.headers.get("User-Agent"),
|
||||
"ip_address": request.client.host if request.client else "unknown",
|
||||
"response_status": response.status_code
|
||||
}
|
||||
)
|
||||
|
||||
# save to database
|
||||
await usage_log_doc.save()
|
||||
await self.module_logger.log_info(f"API Usage logged: {usage_log_doc.operation} for tenant {usage_log_doc.tenant_id}")
|
||||
|
||||
except Exception as e:
|
||||
await self.module_logger.log_error(f"Failed to log usage: {str(e)}")
|
||||
@ -1,15 +1,12 @@
|
||||
from webapi.config.site_settings import site_settings
|
||||
from beanie import init_beanie
|
||||
from motor.motor_asyncio import AsyncIOMotorClient
|
||||
from backend.models.models import MessageTemplateDoc, EmailSenderDoc, EmailSendStatusDoc, EmailTrackingDoc, EmailBounceDoc
|
||||
from backend.models.models import MessageTemplateDoc, EmailSenderDoc, EmailSendStatusDoc, EmailTrackingDoc, EmailBounceDoc, UsageLogDoc
|
||||
import os
|
||||
|
||||
# MongoDB config
|
||||
# TODO: for non-local environment, use the following config
|
||||
#MONGODB_URI = os.getenv('MONGODB_URI', 'mongodb+srv://jetli:8IHKx6dZK8BfugGp@freeleaps2.hanbj.mongodb.net/')
|
||||
#MONGODB_NAME = os.getenv('MONGODB_NAME', 'freeleaps2')
|
||||
MONGODB_URI = os.getenv('MONGODB_URI', 'mongodb://localhost:27017/')
|
||||
MONGODB_NAME = os.getenv('MONGODB_NAME', 'interview')
|
||||
MONGODB_URI = os.getenv('MONGODB_URI')
|
||||
MONGODB_NAME = os.getenv('MONGODB_NAME')
|
||||
|
||||
# create MongoDB client
|
||||
client = AsyncIOMotorClient(
|
||||
@ -26,7 +23,8 @@ document_models = [
|
||||
EmailSenderDoc,
|
||||
EmailSendStatusDoc,
|
||||
EmailTrackingDoc,
|
||||
EmailBounceDoc
|
||||
EmailBounceDoc,
|
||||
UsageLogDoc
|
||||
]
|
||||
|
||||
def register(app):
|
||||
@ -40,13 +38,7 @@ def register(app):
|
||||
|
||||
async def initiate_database():
|
||||
"""initiate Beanie database connection"""
|
||||
try:
|
||||
await init_beanie(
|
||||
database=client[MONGODB_NAME],
|
||||
document_models=document_models
|
||||
)
|
||||
print(f"✅ database initialized successfully: {MONGODB_NAME}")
|
||||
print(f" URI: {MONGODB_URI}")
|
||||
except Exception as e:
|
||||
print(f"❌ database initialization failed: {e}")
|
||||
raise
|
||||
await init_beanie(
|
||||
database=client[MONGODB_NAME],
|
||||
document_models=document_models
|
||||
)
|
||||
9
apps/notification/webapi/providers/middleware.py
Normal file
9
apps/notification/webapi/providers/middleware.py
Normal file
@ -0,0 +1,9 @@
|
||||
from webapi.middleware.freeleaps_auth_middleware import FreeleapsAuthMiddleware
|
||||
|
||||
|
||||
def register(app):
|
||||
"""
|
||||
Register middleware to FastAPI application
|
||||
"""
|
||||
# Register API Key middleware
|
||||
app.add_middleware(FreeleapsAuthMiddleware)
|
||||
Loading…
Reference in New Issue
Block a user