diff --git a/apps/notification/.env b/apps/notification/.env index 2403990..5070192 100644 --- a/apps/notification/.env +++ b/apps/notification/.env @@ -28,4 +28,5 @@ export RABBITMQ_PORT=5672 export FREELEAPS_ENV=local export LOG_BASE_PATH=${CODEBASE_ROOT}/log - +export AUTH_SERVICE_ENDPOINT=http://localhost:9000/api/v1/keys/ +export AUTH_SERVICE_PORT=9000 diff --git a/apps/notification/Dockerfile b/apps/notification/Dockerfile index b246de3..dce32fc 100644 --- a/apps/notification/Dockerfile +++ b/apps/notification/Dockerfile @@ -20,6 +20,9 @@ ENV SENDGRID_API_KEY=SG.OrxsRI0IRaOxkd7xTfb8SA.J8CfOXsJy3vrJgTubbLmZOR6ii7z7m7C9 ENV TWILIO_ACCOUNT_SID=ACf8c9283a6acda060258eadb29be58bc8 ENV TWILIO_AUTH_TOKEN=120165c0550111ddfd58efc97dafc2fe +# Freeleaps Auth Config +ENV AUTH_SERVICE_ENDPOINT="" + #log_settings ENV LOG_BASE_PATH=$CONTAINER_APP_ROOT/log/$APP_NAME ENV BACKEND_LOG_FILE_NAME=$APP_NAME diff --git a/apps/notification/backend/infra/api_key_introspect_handler.py b/apps/notification/backend/infra/api_key_introspect_handler.py new file mode 100644 index 0000000..ceb111e --- /dev/null +++ b/apps/notification/backend/infra/api_key_introspect_handler.py @@ -0,0 +1,52 @@ +from typing import Dict, Any +import httpx +from fastapi import HTTPException +from common.config.app_settings import app_settings +from common.log.log_utils import log_entry_exit_async +from common.log.module_logger import ModuleLogger + + +class ApiKeyIntrospectHandler: + """ + Freeleaps Auth Service API Key Introspect Handle + """ + + def __init__(self) -> None: + self.module_logger = ModuleLogger(sender_id=ApiKeyIntrospectHandler.__name__) + self.auth_service_base = app_settings.AUTH_SERVICE_ENDPOINT + + + @log_entry_exit_async + async def api_key_introspect(self, api_key: str) -> Dict[str, Any]: + """ + Introspect API key by calling external auth service + + Args: + api_key: The API key to introspect + + Returns: + Dictionary containing the API key details + + Raises: + HTTPException: If the external service call fails + """ + api_url = self.auth_service_base + "introspect_api_key" + await self.module_logger.log_info(f"Starting API Key validation for key: {api_key[:8]}...") + + async with httpx.AsyncClient() as client: + response = await client.post( + api_url, + json={"api_key": api_key} + ) + + if response.status_code != 200: + error_detail = response.json() if response.content else {"error": "Unknown error"} + await self.module_logger.log_error(f"API Key validation failed - Status: {response.status_code}, Error: {error_detail}") + raise HTTPException( + status_code=response.status_code, + detail=error_detail + ) + + validation_result = response.json() + await self.module_logger.log_info(f"API Key validation successful - Active: {validation_result.get('active', False)}") + return validation_result diff --git a/apps/notification/backend/models/models.py b/apps/notification/backend/models/models.py index 542aa64..f1a02d1 100644 --- a/apps/notification/backend/models/models.py +++ b/apps/notification/backend/models/models.py @@ -55,7 +55,7 @@ class EmailSendStatusDoc(Document): indexes = [ "email_id", "tenant_id" - ] + ] class EmailTrackingDoc(Document): email_id: str @@ -83,7 +83,7 @@ class EmailTrackingDoc(Document): indexes = [ "email_id", "tenant_id" - ] + ] class EmailBounceDoc(Document): email: str @@ -104,5 +104,25 @@ class EmailBounceDoc(Document): indexes = [ "email", "tenant_id" - ] - \ No newline at end of file + ] + +class UsageLogDoc(Document): + timestamp: datetime = datetime.utcnow() # timestamp + tenant_id: str # tenant id + operation: str # operation type + request_id: str # request id # TODO: use true one + units: int # units + status: str # operation status + latency_ms: int # latency time(milliseconds) + bytes_in: int # input bytes + bytes_out: int # output bytes + key_id: Optional[str] = None # API Key ID + extra: dict = {} # extra information + + class Settings: + name = "usage_log_doc" + indexes = [ + "tenant_id", + "request_id", + "key_id" + ] \ No newline at end of file diff --git a/apps/notification/common/config/app_settings.py b/apps/notification/common/config/app_settings.py index b1e3dc5..5346dfe 100644 --- a/apps/notification/common/config/app_settings.py +++ b/apps/notification/common/config/app_settings.py @@ -25,6 +25,8 @@ class AppSettings(BaseSettings): ACCESS_TOKEN_EXPIRE_MINUTES: int = 3600 REFRESH_TOKEN_EXPIRE_DAYS: int = 1 + AUTH_SERVICE_ENDPOINT: str = "" + SENDGRID_API_KEY: str = "" diff --git a/apps/notification/common/log/json_sink.py b/apps/notification/common/log/json_sink.py index a798156..867ef42 100644 --- a/apps/notification/common/log/json_sink.py +++ b/apps/notification/common/log/json_sink.py @@ -81,5 +81,5 @@ class JsonSink: exc_type, exc_value, exc_tb = record["exception"] log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb) - self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n") + self.log_file.write(json.dumps(log_entry, ensure_ascii=False, default=str) + "\n") self.log_file.flush() diff --git a/apps/notification/tests/integration_tests/local.env b/apps/notification/tests/integration_tests/local.env index 93d5880..f5b5a2b 100644 --- a/apps/notification/tests/integration_tests/local.env +++ b/apps/notification/tests/integration_tests/local.env @@ -27,10 +27,13 @@ export DOCKER_BACKEND_LOG_HOME=$DOCKER_BACKEND_HOME/log export RABBITMQ_HOST=localhost export RABBITMQ_PORT=5672 +export AUTH_SERVICE_ENDPOINT=http://localhost:9000/api/v1/keys/ +export AUTH_SERVICE_PORT=9000 + # for local environment export MONGODB_URI=mongodb://localhost:27017/ # connectivity from local to alpha #export MONGODB_URI=mongodb+srv://jetli:8IHKx6dZK8BfugGp@freeleaps2.hanbj.mongodb.net/ export MONGODB_NAME=interview export FREELEAPS_ENV=local -export LOG_BASE_PATH=./log +export LOG_BASE_PATH=./log \ No newline at end of file diff --git a/apps/notification/webapi/bootstrap/application.py b/apps/notification/webapi/bootstrap/application.py index d98218d..65adca9 100644 --- a/apps/notification/webapi/bootstrap/application.py +++ b/apps/notification/webapi/bootstrap/application.py @@ -11,6 +11,7 @@ from webapi.providers import message_queue from webapi.providers import exception_handler from webapi.providers import probes from webapi.providers import metrics +from webapi.providers import middleware from .freeleaps_app import FreeleapsApp from common.config.app_settings import app_settings @@ -22,6 +23,11 @@ def create_app() -> FastAPI: app = FreeleapsApp() register_logger() + + # 1. Register middleware firstly + register(app, middleware) + + # 2. Register other providers register(app, exception_handler) # Register probe APIs if enabled if app_settings.PROBES_ENABLED: diff --git a/apps/notification/webapi/middleware/__init__.py b/apps/notification/webapi/middleware/__init__.py new file mode 100644 index 0000000..5700ce5 --- /dev/null +++ b/apps/notification/webapi/middleware/__init__.py @@ -0,0 +1,3 @@ +from .freeleaps_auth_middleware import FreeleapsAuthMiddleware + +__all__ = ['FreeleapsAuthMiddleware'] \ No newline at end of file diff --git a/apps/notification/webapi/middleware/freeleaps_auth_middleware.py b/apps/notification/webapi/middleware/freeleaps_auth_middleware.py new file mode 100644 index 0000000..5ef7188 --- /dev/null +++ b/apps/notification/webapi/middleware/freeleaps_auth_middleware.py @@ -0,0 +1,192 @@ +import httpx +import asyncio +import time +import contextvars +from datetime import datetime +from starlette.requests import Request +from fastapi import HTTPException, Response +from typing import Dict, Any, Optional +from common.log.module_logger import ModuleLogger + +from backend.models.models import UsageLogDoc +from backend.infra.api_key_introspect_handler import ApiKeyIntrospectHandler + +# Define context data class +class RequestContext: + def __init__(self, tenant_name: str = None, product_id: str = None, key_id: str = None): + self.tenant_name = tenant_name + self.product_id = product_id + self.key_id = key_id + + def __repr__(self): + return f"RequestContext(tenant_name='{self.tenant_name}', product_id='{self.product_id}', key_id='{self.key_id}')" + +# Create context variable, store RequestContext object +request_context_var = contextvars.ContextVar('request_context', default=RequestContext()) + +class FreeleapsAuthMiddleware: + """ + Notification service API Key middleware + """ + + def __init__(self, app): + self.app = app + self.api_key_introspect_handler = ApiKeyIntrospectHandler() + self.module_logger = ModuleLogger(sender_id=FreeleapsAuthMiddleware) + + async def __call__(self, scope, receive, send): + """ + Middleware main function, execute before and after request processing + """ + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + request = Request(scope, receive) + start_time = time.time() + validation_result = None + + try: + # 1. Skip paths that do not need validation + if self._should_skip_validation(request.url.path): + await self.module_logger.log_info(f"Path skipped validation: {request.url.path}") + await self.app(scope, receive, send) + return + + # 2. Extract API Key from request header + api_key = request.headers.get("X-API-KEY") + # if the API_KEY field is empty, the request can be processed directly without validation. + # for compatibility + if not api_key or api_key == "": + await self.module_logger.log_info(f"API Key is empty: {request.url.path}") + await self.app(scope, receive, send) + return + + # 3. Call freeleaps_auth to validate API Key + validation_result = await self.api_key_introspect_handler.api_key_introspect(api_key) + + # 4. Store validation result in contextvars for later use + request_context = RequestContext( + tenant_name=validation_result.get("tenant_name"), + product_id=validation_result.get("product_id"), + key_id=validation_result.get("key_id") + ) + request_context_var.set(request_context) + + # 6. Process request and capture response + response_captured = None + + async def send_wrapper(message): + nonlocal response_captured + if message["type"] == "http.response.start": + # Convert bytes headers to string headers + headers = {} + for header_name, header_value in message.get("headers", []): + if isinstance(header_name, bytes): + header_name = header_name.decode('latin-1') + if isinstance(header_value, bytes): + header_value = header_value.decode('latin-1') + headers[header_name] = header_value + + response_captured = Response( + status_code=message["status"], + headers=headers, + media_type="application/json" + ) + await send(message) + + await self.app(scope, receive, send_wrapper) + + # 7. Record usage log after request processing + if response_captured: + await self._log_usage(validation_result, request, response_captured, start_time) + + except HTTPException as http_exc: + # Pass through HTTP exceptions (401, 403, etc.) from auth service + await self.module_logger.log_info(f"API Key validation failed: {http_exc.status_code} - {http_exc.detail}") + response = Response( + status_code=http_exc.status_code, + content=f'{{"error": "Authentication failed", "message": "{str(http_exc.detail)}"}}', + media_type="application/json" + ) + await response(scope, receive, send) + except Exception as e: + await self.module_logger.log_error(f"Middleware error: {str(e)}") + response = Response( + status_code=500, + content=f'{{"error": "Internal error", "message": "Failed to process request", "details": "{str(e)}"}}', + media_type="application/json" + ) + await response(scope, receive, send) + + def _should_skip_validation(self, path: str) -> bool: + """ + Check if the path should be skipped for validation + """ + skip_paths = [ + "/api/_/healthz", # Health check endpoint + "/api/_/readyz", # Readiness check endpoint + "/api/_/livez", # Liveness check endpoint + "/metrics", # Metrics endpoint + "/docs", # API documentation + "/openapi.json", # OpenAPI specification + "/favicon.ico" # Website icon + ] + + # Check exact match for root path + if path == "/": + return True + + # Check startswith for other paths + return any(path.startswith(skip_path) for skip_path in skip_paths) + + async def _log_usage(self, validation_result: Dict[str, Any], request: Request, + response: Response, start_time: float) -> None: + """ + Record API usage log + """ + try: + # calculate processing time + process_time = (time.time() - start_time) * 1000 + + # get request body size + try: + request_body = await request.body() + bytes_in = len(request_body) if request.method in ["POST", "PUT", "PATCH"] else 0 + except Exception: + bytes_in = 0 + + bytes_out = 0 + if hasattr(response, 'headers'): + content_length = response.headers.get('content-length') + if content_length: + bytes_out = int(content_length) + + # create usage log document + usage_log_doc = UsageLogDoc( + timestamp=datetime.utcnow(), + tenant_id=validation_result.get("tenant_name"), + operation=f"{request.method} {request.url.path}", + request_id=request.headers.get("X-Request-ID", "unknown"), + units=1, # TODO: adjust according to business logic + status="success" if response.status_code < 400 else "error", + latency_ms=int(process_time), + bytes_in=bytes_in, + bytes_out=bytes_out, + key_id=validation_result.get("key_id"), + extra={ + "tenant_name": request_context_var.get().tenant_name, + "product_id": request_context_var.get().product_id, + "scopes": validation_result.get("scopes"), + "user_agent": request.headers.get("User-Agent"), + "ip_address": request.client.host if request.client else "unknown", + "response_status": response.status_code + } + ) + + # save to database + await usage_log_doc.save() + await self.module_logger.log_info(f"API Usage logged: {usage_log_doc.operation} for tenant {usage_log_doc.tenant_id}") + + except Exception as e: + await self.module_logger.log_error(f"Failed to log usage: {str(e)}") diff --git a/apps/notification/webapi/providers/database.py b/apps/notification/webapi/providers/database.py index 00e0d66..8a6e350 100644 --- a/apps/notification/webapi/providers/database.py +++ b/apps/notification/webapi/providers/database.py @@ -1,15 +1,12 @@ from webapi.config.site_settings import site_settings from beanie import init_beanie from motor.motor_asyncio import AsyncIOMotorClient -from backend.models.models import MessageTemplateDoc, EmailSenderDoc, EmailSendStatusDoc, EmailTrackingDoc, EmailBounceDoc +from backend.models.models import MessageTemplateDoc, EmailSenderDoc, EmailSendStatusDoc, EmailTrackingDoc, EmailBounceDoc, UsageLogDoc import os # MongoDB config -# TODO: for non-local environment, use the following config -#MONGODB_URI = os.getenv('MONGODB_URI', 'mongodb+srv://jetli:8IHKx6dZK8BfugGp@freeleaps2.hanbj.mongodb.net/') -#MONGODB_NAME = os.getenv('MONGODB_NAME', 'freeleaps2') -MONGODB_URI = os.getenv('MONGODB_URI', 'mongodb://localhost:27017/') -MONGODB_NAME = os.getenv('MONGODB_NAME', 'interview') +MONGODB_URI = os.getenv('MONGODB_URI') +MONGODB_NAME = os.getenv('MONGODB_NAME') # create MongoDB client client = AsyncIOMotorClient( @@ -26,7 +23,8 @@ document_models = [ EmailSenderDoc, EmailSendStatusDoc, EmailTrackingDoc, - EmailBounceDoc + EmailBounceDoc, + UsageLogDoc ] def register(app): @@ -40,13 +38,7 @@ def register(app): async def initiate_database(): """initiate Beanie database connection""" - try: - await init_beanie( - database=client[MONGODB_NAME], - document_models=document_models - ) - print(f"✅ database initialized successfully: {MONGODB_NAME}") - print(f" URI: {MONGODB_URI}") - except Exception as e: - print(f"❌ database initialization failed: {e}") - raise + await init_beanie( + database=client[MONGODB_NAME], + document_models=document_models + ) \ No newline at end of file diff --git a/apps/notification/webapi/providers/middleware.py b/apps/notification/webapi/providers/middleware.py new file mode 100644 index 0000000..43df09b --- /dev/null +++ b/apps/notification/webapi/providers/middleware.py @@ -0,0 +1,9 @@ +from webapi.middleware.freeleaps_auth_middleware import FreeleapsAuthMiddleware + + +def register(app): + """ + Register middleware to FastAPI application + """ + # Register API Key middleware + app.add_middleware(FreeleapsAuthMiddleware) \ No newline at end of file