diff --git a/apps/notification/.env b/apps/notification/.env
index 2403990..5070192 100644
--- a/apps/notification/.env
+++ b/apps/notification/.env
@@ -28,4 +28,5 @@ export RABBITMQ_PORT=5672
export FREELEAPS_ENV=local
export LOG_BASE_PATH=${CODEBASE_ROOT}/log
-
+export AUTH_SERVICE_ENDPOINT=http://localhost:9000/api/v1/keys/
+export AUTH_SERVICE_PORT=9000
diff --git a/apps/notification/Dockerfile b/apps/notification/Dockerfile
index b246de3..dce32fc 100644
--- a/apps/notification/Dockerfile
+++ b/apps/notification/Dockerfile
@@ -20,6 +20,9 @@ ENV SENDGRID_API_KEY=SG.OrxsRI0IRaOxkd7xTfb8SA.J8CfOXsJy3vrJgTubbLmZOR6ii7z7m7C9
ENV TWILIO_ACCOUNT_SID=ACf8c9283a6acda060258eadb29be58bc8
ENV TWILIO_AUTH_TOKEN=120165c0550111ddfd58efc97dafc2fe
+# Freeleaps Auth Config
+ENV AUTH_SERVICE_ENDPOINT=""
+
#log_settings
ENV LOG_BASE_PATH=$CONTAINER_APP_ROOT/log/$APP_NAME
ENV BACKEND_LOG_FILE_NAME=$APP_NAME
diff --git a/apps/notification/backend/infra/api_key_introspect_handler.py b/apps/notification/backend/infra/api_key_introspect_handler.py
new file mode 100644
index 0000000..ceb111e
--- /dev/null
+++ b/apps/notification/backend/infra/api_key_introspect_handler.py
@@ -0,0 +1,52 @@
+from typing import Dict, Any
+import httpx
+from fastapi import HTTPException
+from common.config.app_settings import app_settings
+from common.log.log_utils import log_entry_exit_async
+from common.log.module_logger import ModuleLogger
+
+
+class ApiKeyIntrospectHandler:
+ """
+ Freeleaps Auth Service API Key Introspect Handle
+ """
+
+ def __init__(self) -> None:
+ self.module_logger = ModuleLogger(sender_id=ApiKeyIntrospectHandler.__name__)
+ self.auth_service_base = app_settings.AUTH_SERVICE_ENDPOINT
+
+
+ @log_entry_exit_async
+ async def api_key_introspect(self, api_key: str) -> Dict[str, Any]:
+ """
+ Introspect API key by calling external auth service
+
+ Args:
+ api_key: The API key to introspect
+
+ Returns:
+ Dictionary containing the API key details
+
+ Raises:
+ HTTPException: If the external service call fails
+ """
+ api_url = self.auth_service_base + "introspect_api_key"
+ await self.module_logger.log_info(f"Starting API Key validation for key: {api_key[:8]}...")
+
+ async with httpx.AsyncClient() as client:
+ response = await client.post(
+ api_url,
+ json={"api_key": api_key}
+ )
+
+ if response.status_code != 200:
+ error_detail = response.json() if response.content else {"error": "Unknown error"}
+ await self.module_logger.log_error(f"API Key validation failed - Status: {response.status_code}, Error: {error_detail}")
+ raise HTTPException(
+ status_code=response.status_code,
+ detail=error_detail
+ )
+
+ validation_result = response.json()
+ await self.module_logger.log_info(f"API Key validation successful - Active: {validation_result.get('active', False)}")
+ return validation_result
diff --git a/apps/notification/backend/infra/rabbitmq/async_subscriber.py b/apps/notification/backend/infra/rabbitmq/async_subscriber.py
index 0820d31..66dc1d3 100644
--- a/apps/notification/backend/infra/rabbitmq/async_subscriber.py
+++ b/apps/notification/backend/infra/rabbitmq/async_subscriber.py
@@ -39,7 +39,7 @@ class AsyncMQSubscriber(AsyncMQClient):
try:
await self.bind(max_retries=5, event_loop=event_loop)
await self.queue.consume(
- no_ack=False, exclusive=True, callback=self.process_incoming_message
+ no_ack=False, exclusive=False, callback=self.process_incoming_message
)
break # Exit loop if subscription is successful
except Exception as e:
diff --git a/apps/notification/backend/models/models.py b/apps/notification/backend/models/models.py
index 542aa64..f1a02d1 100644
--- a/apps/notification/backend/models/models.py
+++ b/apps/notification/backend/models/models.py
@@ -55,7 +55,7 @@ class EmailSendStatusDoc(Document):
indexes = [
"email_id",
"tenant_id"
- ]
+ ]
class EmailTrackingDoc(Document):
email_id: str
@@ -83,7 +83,7 @@ class EmailTrackingDoc(Document):
indexes = [
"email_id",
"tenant_id"
- ]
+ ]
class EmailBounceDoc(Document):
email: str
@@ -104,5 +104,25 @@ class EmailBounceDoc(Document):
indexes = [
"email",
"tenant_id"
- ]
-
\ No newline at end of file
+ ]
+
+class UsageLogDoc(Document):
+ timestamp: datetime = datetime.utcnow() # timestamp
+ tenant_id: str # tenant id
+ operation: str # operation type
+ request_id: str # request id # TODO: use true one
+ units: int # units
+ status: str # operation status
+ latency_ms: int # latency time(milliseconds)
+ bytes_in: int # input bytes
+ bytes_out: int # output bytes
+ key_id: Optional[str] = None # API Key ID
+ extra: dict = {} # extra information
+
+ class Settings:
+ name = "usage_log_doc"
+ indexes = [
+ "tenant_id",
+ "request_id",
+ "key_id"
+ ]
\ No newline at end of file
diff --git a/apps/notification/common/config/app_settings.py b/apps/notification/common/config/app_settings.py
index b1e3dc5..5346dfe 100644
--- a/apps/notification/common/config/app_settings.py
+++ b/apps/notification/common/config/app_settings.py
@@ -25,6 +25,8 @@ class AppSettings(BaseSettings):
ACCESS_TOKEN_EXPIRE_MINUTES: int = 3600
REFRESH_TOKEN_EXPIRE_DAYS: int = 1
+ AUTH_SERVICE_ENDPOINT: str = ""
+
SENDGRID_API_KEY: str = ""
diff --git a/apps/notification/common/log/json_sink.py b/apps/notification/common/log/json_sink.py
index a798156..867ef42 100644
--- a/apps/notification/common/log/json_sink.py
+++ b/apps/notification/common/log/json_sink.py
@@ -81,5 +81,5 @@ class JsonSink:
exc_type, exc_value, exc_tb = record["exception"]
log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb)
- self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
+ self.log_file.write(json.dumps(log_entry, ensure_ascii=False, default=str) + "\n")
self.log_file.flush()
diff --git a/apps/notification/tests/integration_tests/create_global_templates.py b/apps/notification/tests/integration_tests/create_global_templates.py
index c71e73b..3501cd7 100644
--- a/apps/notification/tests/integration_tests/create_global_templates.py
+++ b/apps/notification/tests/integration_tests/create_global_templates.py
@@ -995,6 +995,93 @@ class GlobalTemplateCreator:
return success_cn and success_en
+ async def create_job_opportunity_template(self):
+ """create job opportunity notification template"""
+ print("\n📝 create job opportunity notification template...")
+ print("🔗 Deep link format: {{job_deep_link_url}}")
+ # TODO: the deep link should be modified and updated
+ print(" Example: https://freeleaps-alpha.com/jobs/{{job_id}}?source=email&user={{user_id}}&token={{auth_token}}")
+ print(" This will redirect to login if needed, then to the specific job posting")
+
+ # Chinese version (region: 1)
+ template_data_cn = {
+ "template_id": "job_opportunity_notification",
+ "region": 1,
+ "subject": "新的工作机会 - {{candidate_name}}",
+ "body": """
+
+
freeleaps
+
+
+
+
新的工作机会
+
+
亲爱的 {{candidate_name}},
+
+
在 {{company_domain}} 上有一个新的职位发布,可能符合您的个人资料。点击此链接或登录 {{company_domain}} 查看详情。
+
+
+
+
+
祝好!
Freeleaps 团队
+
+
+
+
freeleaps
+
+ We make software development the easiest ever.
+
+
+
"""
+ }
+
+ # English version (region: 0)
+ template_data_en = {
+ "template_id": "job_opportunity_notification",
+ "region": 0,
+ "subject": "New Job Opportunity - {{candidate_name}}",
+ "body": """
+
+
freeleaps
+
+
+
+
New Job Opportunity Available
+
+
Dear {{candidate_name}},
+
+
There is a new job posting on {{company_domain}}. It could be something matching your profile. Click this link or log in to {{company_domain}} to check it out.
+
+
+
+
+
Best regards,
Freeleaps Team
+
+
+
+
freeleaps
+
+ We make software development the easiest ever.
+
+
+
"""
+ }
+
+ success_cn = await self.create_template(template_data_cn)
+ success_en = await self.create_template(template_data_en)
+
+ return success_cn and success_en
+
async def create_all_templates(self):
"""create all templates"""
print("🚀 start creating global templates...")
@@ -1010,7 +1097,8 @@ class GlobalTemplateCreator:
self.create_interview_status_update_template,
self.create_welcome_email_template,
self.create_password_reset_template,
- self.create_account_verification_template
+ self.create_account_verification_template,
+ self.create_job_opportunity_template
]
success_count = 0
@@ -1028,7 +1116,8 @@ class GlobalTemplateCreator:
"interview_status_update - 面试状态更新",
"welcome_email - 欢迎邮件",
"password_reset_email - 密码重置邮件",
- "account_verification_email - 账号验证邮件"
+ "account_verification_email - 账号验证邮件",
+ "job_opportunity_notification - 工作机会通知"
]
for i, name in enumerate(template_names, 1):
diff --git a/apps/notification/tests/integration_tests/local.env b/apps/notification/tests/integration_tests/local.env
index 93d5880..f5b5a2b 100644
--- a/apps/notification/tests/integration_tests/local.env
+++ b/apps/notification/tests/integration_tests/local.env
@@ -27,10 +27,13 @@ export DOCKER_BACKEND_LOG_HOME=$DOCKER_BACKEND_HOME/log
export RABBITMQ_HOST=localhost
export RABBITMQ_PORT=5672
+export AUTH_SERVICE_ENDPOINT=http://localhost:9000/api/v1/keys/
+export AUTH_SERVICE_PORT=9000
+
# for local environment
export MONGODB_URI=mongodb://localhost:27017/
# connectivity from local to alpha
#export MONGODB_URI=mongodb+srv://jetli:8IHKx6dZK8BfugGp@freeleaps2.hanbj.mongodb.net/
export MONGODB_NAME=interview
export FREELEAPS_ENV=local
-export LOG_BASE_PATH=./log
+export LOG_BASE_PATH=./log
\ No newline at end of file
diff --git a/apps/notification/webapi/bootstrap/application.py b/apps/notification/webapi/bootstrap/application.py
index d98218d..65adca9 100644
--- a/apps/notification/webapi/bootstrap/application.py
+++ b/apps/notification/webapi/bootstrap/application.py
@@ -11,6 +11,7 @@ from webapi.providers import message_queue
from webapi.providers import exception_handler
from webapi.providers import probes
from webapi.providers import metrics
+from webapi.providers import middleware
from .freeleaps_app import FreeleapsApp
from common.config.app_settings import app_settings
@@ -22,6 +23,11 @@ def create_app() -> FastAPI:
app = FreeleapsApp()
register_logger()
+
+ # 1. Register middleware firstly
+ register(app, middleware)
+
+ # 2. Register other providers
register(app, exception_handler)
# Register probe APIs if enabled
if app_settings.PROBES_ENABLED:
diff --git a/apps/notification/webapi/middleware/__init__.py b/apps/notification/webapi/middleware/__init__.py
new file mode 100644
index 0000000..5700ce5
--- /dev/null
+++ b/apps/notification/webapi/middleware/__init__.py
@@ -0,0 +1,3 @@
+from .freeleaps_auth_middleware import FreeleapsAuthMiddleware
+
+__all__ = ['FreeleapsAuthMiddleware']
\ No newline at end of file
diff --git a/apps/notification/webapi/middleware/freeleaps_auth_middleware.py b/apps/notification/webapi/middleware/freeleaps_auth_middleware.py
new file mode 100644
index 0000000..5ef7188
--- /dev/null
+++ b/apps/notification/webapi/middleware/freeleaps_auth_middleware.py
@@ -0,0 +1,192 @@
+import httpx
+import asyncio
+import time
+import contextvars
+from datetime import datetime
+from starlette.requests import Request
+from fastapi import HTTPException, Response
+from typing import Dict, Any, Optional
+from common.log.module_logger import ModuleLogger
+
+from backend.models.models import UsageLogDoc
+from backend.infra.api_key_introspect_handler import ApiKeyIntrospectHandler
+
+# Define context data class
+class RequestContext:
+ def __init__(self, tenant_name: str = None, product_id: str = None, key_id: str = None):
+ self.tenant_name = tenant_name
+ self.product_id = product_id
+ self.key_id = key_id
+
+ def __repr__(self):
+ return f"RequestContext(tenant_name='{self.tenant_name}', product_id='{self.product_id}', key_id='{self.key_id}')"
+
+# Create context variable, store RequestContext object
+request_context_var = contextvars.ContextVar('request_context', default=RequestContext())
+
+class FreeleapsAuthMiddleware:
+ """
+ Notification service API Key middleware
+ """
+
+ def __init__(self, app):
+ self.app = app
+ self.api_key_introspect_handler = ApiKeyIntrospectHandler()
+ self.module_logger = ModuleLogger(sender_id=FreeleapsAuthMiddleware)
+
+ async def __call__(self, scope, receive, send):
+ """
+ Middleware main function, execute before and after request processing
+ """
+ if scope["type"] != "http":
+ await self.app(scope, receive, send)
+ return
+
+ request = Request(scope, receive)
+ start_time = time.time()
+ validation_result = None
+
+ try:
+ # 1. Skip paths that do not need validation
+ if self._should_skip_validation(request.url.path):
+ await self.module_logger.log_info(f"Path skipped validation: {request.url.path}")
+ await self.app(scope, receive, send)
+ return
+
+ # 2. Extract API Key from request header
+ api_key = request.headers.get("X-API-KEY")
+ # if the API_KEY field is empty, the request can be processed directly without validation.
+ # for compatibility
+ if not api_key or api_key == "":
+ await self.module_logger.log_info(f"API Key is empty: {request.url.path}")
+ await self.app(scope, receive, send)
+ return
+
+ # 3. Call freeleaps_auth to validate API Key
+ validation_result = await self.api_key_introspect_handler.api_key_introspect(api_key)
+
+ # 4. Store validation result in contextvars for later use
+ request_context = RequestContext(
+ tenant_name=validation_result.get("tenant_name"),
+ product_id=validation_result.get("product_id"),
+ key_id=validation_result.get("key_id")
+ )
+ request_context_var.set(request_context)
+
+ # 6. Process request and capture response
+ response_captured = None
+
+ async def send_wrapper(message):
+ nonlocal response_captured
+ if message["type"] == "http.response.start":
+ # Convert bytes headers to string headers
+ headers = {}
+ for header_name, header_value in message.get("headers", []):
+ if isinstance(header_name, bytes):
+ header_name = header_name.decode('latin-1')
+ if isinstance(header_value, bytes):
+ header_value = header_value.decode('latin-1')
+ headers[header_name] = header_value
+
+ response_captured = Response(
+ status_code=message["status"],
+ headers=headers,
+ media_type="application/json"
+ )
+ await send(message)
+
+ await self.app(scope, receive, send_wrapper)
+
+ # 7. Record usage log after request processing
+ if response_captured:
+ await self._log_usage(validation_result, request, response_captured, start_time)
+
+ except HTTPException as http_exc:
+ # Pass through HTTP exceptions (401, 403, etc.) from auth service
+ await self.module_logger.log_info(f"API Key validation failed: {http_exc.status_code} - {http_exc.detail}")
+ response = Response(
+ status_code=http_exc.status_code,
+ content=f'{{"error": "Authentication failed", "message": "{str(http_exc.detail)}"}}',
+ media_type="application/json"
+ )
+ await response(scope, receive, send)
+ except Exception as e:
+ await self.module_logger.log_error(f"Middleware error: {str(e)}")
+ response = Response(
+ status_code=500,
+ content=f'{{"error": "Internal error", "message": "Failed to process request", "details": "{str(e)}"}}',
+ media_type="application/json"
+ )
+ await response(scope, receive, send)
+
+ def _should_skip_validation(self, path: str) -> bool:
+ """
+ Check if the path should be skipped for validation
+ """
+ skip_paths = [
+ "/api/_/healthz", # Health check endpoint
+ "/api/_/readyz", # Readiness check endpoint
+ "/api/_/livez", # Liveness check endpoint
+ "/metrics", # Metrics endpoint
+ "/docs", # API documentation
+ "/openapi.json", # OpenAPI specification
+ "/favicon.ico" # Website icon
+ ]
+
+ # Check exact match for root path
+ if path == "/":
+ return True
+
+ # Check startswith for other paths
+ return any(path.startswith(skip_path) for skip_path in skip_paths)
+
+ async def _log_usage(self, validation_result: Dict[str, Any], request: Request,
+ response: Response, start_time: float) -> None:
+ """
+ Record API usage log
+ """
+ try:
+ # calculate processing time
+ process_time = (time.time() - start_time) * 1000
+
+ # get request body size
+ try:
+ request_body = await request.body()
+ bytes_in = len(request_body) if request.method in ["POST", "PUT", "PATCH"] else 0
+ except Exception:
+ bytes_in = 0
+
+ bytes_out = 0
+ if hasattr(response, 'headers'):
+ content_length = response.headers.get('content-length')
+ if content_length:
+ bytes_out = int(content_length)
+
+ # create usage log document
+ usage_log_doc = UsageLogDoc(
+ timestamp=datetime.utcnow(),
+ tenant_id=validation_result.get("tenant_name"),
+ operation=f"{request.method} {request.url.path}",
+ request_id=request.headers.get("X-Request-ID", "unknown"),
+ units=1, # TODO: adjust according to business logic
+ status="success" if response.status_code < 400 else "error",
+ latency_ms=int(process_time),
+ bytes_in=bytes_in,
+ bytes_out=bytes_out,
+ key_id=validation_result.get("key_id"),
+ extra={
+ "tenant_name": request_context_var.get().tenant_name,
+ "product_id": request_context_var.get().product_id,
+ "scopes": validation_result.get("scopes"),
+ "user_agent": request.headers.get("User-Agent"),
+ "ip_address": request.client.host if request.client else "unknown",
+ "response_status": response.status_code
+ }
+ )
+
+ # save to database
+ await usage_log_doc.save()
+ await self.module_logger.log_info(f"API Usage logged: {usage_log_doc.operation} for tenant {usage_log_doc.tenant_id}")
+
+ except Exception as e:
+ await self.module_logger.log_error(f"Failed to log usage: {str(e)}")
diff --git a/apps/notification/webapi/providers/database.py b/apps/notification/webapi/providers/database.py
index 00e0d66..8a6e350 100644
--- a/apps/notification/webapi/providers/database.py
+++ b/apps/notification/webapi/providers/database.py
@@ -1,15 +1,12 @@
from webapi.config.site_settings import site_settings
from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
-from backend.models.models import MessageTemplateDoc, EmailSenderDoc, EmailSendStatusDoc, EmailTrackingDoc, EmailBounceDoc
+from backend.models.models import MessageTemplateDoc, EmailSenderDoc, EmailSendStatusDoc, EmailTrackingDoc, EmailBounceDoc, UsageLogDoc
import os
# MongoDB config
-# TODO: for non-local environment, use the following config
-#MONGODB_URI = os.getenv('MONGODB_URI', 'mongodb+srv://jetli:8IHKx6dZK8BfugGp@freeleaps2.hanbj.mongodb.net/')
-#MONGODB_NAME = os.getenv('MONGODB_NAME', 'freeleaps2')
-MONGODB_URI = os.getenv('MONGODB_URI', 'mongodb://localhost:27017/')
-MONGODB_NAME = os.getenv('MONGODB_NAME', 'interview')
+MONGODB_URI = os.getenv('MONGODB_URI')
+MONGODB_NAME = os.getenv('MONGODB_NAME')
# create MongoDB client
client = AsyncIOMotorClient(
@@ -26,7 +23,8 @@ document_models = [
EmailSenderDoc,
EmailSendStatusDoc,
EmailTrackingDoc,
- EmailBounceDoc
+ EmailBounceDoc,
+ UsageLogDoc
]
def register(app):
@@ -40,13 +38,7 @@ def register(app):
async def initiate_database():
"""initiate Beanie database connection"""
- try:
- await init_beanie(
- database=client[MONGODB_NAME],
- document_models=document_models
- )
- print(f"✅ database initialized successfully: {MONGODB_NAME}")
- print(f" URI: {MONGODB_URI}")
- except Exception as e:
- print(f"❌ database initialization failed: {e}")
- raise
+ await init_beanie(
+ database=client[MONGODB_NAME],
+ document_models=document_models
+ )
\ No newline at end of file
diff --git a/apps/notification/webapi/providers/middleware.py b/apps/notification/webapi/providers/middleware.py
new file mode 100644
index 0000000..43df09b
--- /dev/null
+++ b/apps/notification/webapi/providers/middleware.py
@@ -0,0 +1,9 @@
+from webapi.middleware.freeleaps_auth_middleware import FreeleapsAuthMiddleware
+
+
+def register(app):
+ """
+ Register middleware to FastAPI application
+ """
+ # Register API Key middleware
+ app.add_middleware(FreeleapsAuthMiddleware)
\ No newline at end of file