metrics: restructure starrocks routes, move database client, align APIs, Docker updates

weicao 2025-09-18 17:19:27 +08:00
parent 35fbda6954
commit 3a05ec5001
42 changed files with 1720 additions and 430 deletions

View File

@@ -10,9 +10,28 @@ COPY requirements.txt .
# Install dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy environment file
COPY local.env .
# Copy application code
COPY . .
ENV MONGODB_NAME="freeleaps2"
ENV MONGODB_URI="mongodb://freeleaps2-mongodb:27017"
# app_settings
ENV GITEA_TOKEN=""
ENV GITEA_URL=""
ENV GITEA_DEPOT_ORGANIZATION=""
ENV CODE_DEPOT_HTTP_PORT=""
ENV CODE_DEPOT_SSH_PORT=""
ENV CODE_DEPOT_DOMAIN_NAME=""
# log_settings
ENV LOG_BASE_PATH="./logs"
ENV BACKEND_LOG_FILE_NAME="freeleaps-metrics"
ENV APPLICATION_ACTIVITY_LOG="freeleaps-metrics-activity"
# Expose port
EXPOSE 8009

View File

@@ -1,35 +1,28 @@
# 📊 Metrics Service
> A lightweight FastAPI microservice for user registration analytics and metrics
[![Python](https://img.shields.io/badge/Python-3.12+-blue.svg)](https://python.org)
[![FastAPI](https://img.shields.io/badge/FastAPI-0.114+-green.svg)](https://fastapi.tiangolo.com)
[![Docker](https://img.shields.io/badge/Docker-Ready-blue.svg)](https://docker.com)
The Metrics service provides real-time APIs for querying user registration data (via StarRocks) and monitoring metrics (via Prometheus).
## ✨ Features
### 📊 Registration Analytics (StarRocks)
- Date Range Query (start_date ~ end_date)
- Typed responses with Pydantic models
### 📈 Prometheus Metrics
- Predefined PromQL metrics per product
- Time-range queries and metric info discovery
### 🔧 Technical Features
- Data Models: Pydantic v2
- Async HTTP and robust error handling
- Structured logging
- Auto-generated Swagger/OpenAPI docs
## 📁 Project Structure
@@ -37,31 +30,43 @@ The Metrics service provides real-time APIs for querying user registration data
```
metrics/
├── backend/                       # Business logic layer
│   ├── infra/                     # Infrastructure components
│   │   └── external_service/
│   │       ├── prometheus_client.py
│   │       └── starrocks_client.py
│   ├── models/                    # Data models
│   │   └── user_registration_models.py
│   └── services/                  # Business services
│       ├── prometheus_metrics_service.py
│       └── registration_analytics_service.py
├── webapi/                        # API layer
│   ├── routes/                    # API endpoints
│   │   ├── metrics/               # Aggregated routers (prefix: /api/metrics)
│   │   ├── prometheus_metrics/
│   │   │   ├── __init__.py
│   │   │   ├── available_metrics.py
│   │   │   ├── metric_info.py
│   │   │   └── metrics_query.py
│   │   └── starrocks_metrics/
│   │       ├── __init__.py
│   │       ├── available_metrics.py
│   │       ├── metric_info.py
│   │       └── metrics_query.py
│   ├── bootstrap/
│   │   └── application.py
│   └── main.py
├── common/
├── requirements.txt
├── Dockerfile
├── local.env
└── README.md
```
## 🚀 Quick Start
### Prerequisites
- Python 3.12+ or Docker
- Access to StarRocks database (for registration analytics)
- Access to Prometheus (for monitoring metrics)
### 🐍 Python Setup
@@ -79,8 +84,12 @@ python3 -m uvicorn webapi.main:app --host 0.0.0.0 --port 8009 --reload
# 1. Build image
docker build -t metrics:latest .
# 2. Run container (env from baked local.env)
docker run --rm -p 8009:8009 metrics:latest
# Alternatively: run with a custom env file
# (this overrides envs copied into the image)
docker run --rm -p 8009:8009 --env-file local.env metrics:latest
```
### 📖 Access Documentation
@@ -88,69 +97,36 @@ Visit `http://localhost:8009/docs` for interactive API documentation.
## 📊 API Endpoints
All endpoints are exposed under the API base prefix `/api/metrics`.
### StarRocks (Registration Analytics)
- POST `/api/metrics/starrocks/dru_query` — Query daily registered users time series for a date range
- GET `/api/metrics/starrocks/product/{product_id}/available-metrics` — List available StarRocks-backed metrics for the product
- GET `/api/metrics/starrocks/product/{product_id}/metric/{metric_name}/info` — Get metric info for the product
Example request:
```bash
curl -X POST "http://localhost:8009/api/metrics/starrocks/dru_query" \
  -H "Content-Type: application/json" \
  -d '{
        "product_id": "freeleaps",
        "start_date": "2024-09-10",
        "end_date": "2024-09-20"
      }'
```
### Prometheus
- POST `/api/metrics/prometheus/metrics_query` — Query metric time series by product/metric
- GET `/api/metrics/prometheus/product/{product_id}/available-metrics` — List available metrics for product
- GET `/api/metrics/prometheus/product/{product_id}/metric/{metric_name}/info` — Get metric info
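Example request (illustrative values; the request body mirrors `MetricQueryRequest` in `metrics_query.py`):
```bash
curl -X POST "http://localhost:8009/api/metrics/prometheus/metrics_query" \
  -H "Content-Type: application/json" \
  -d '{
        "product_id": "freeleaps",
        "metric_name": "latency_ms",
        "start_time": "2025-09-12T00:00:00Z",
        "end_time": "2025-09-16T01:00:00Z",
        "step": "1h"
      }'
```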
## 🧪 Testing
```bash
# Health check
curl http://localhost:8009/
```
## ⚙️ Configuration
### Environment Variables
@@ -170,9 +146,12 @@ STARROCKS_DATABASE=freeleaps
LOG_BASE_PATH=./logs
BACKEND_LOG_FILE_NAME=metrics
APPLICATION_ACTIVITY_LOG=metrics-activity
# Prometheus
PROMETHEUS_ENDPOINT=http://localhost:9090
```
> Tip: Copy `local.env` to `.env` and modify as needed for your environment.
### 🐳 Docker Deployment
@@ -206,20 +185,16 @@ python -m uvicorn webapi.main:app --reload
```
## 📝 API Documentation
- Swagger UI: `http://localhost:8009/docs`
- ReDoc: `http://localhost:8009/redoc`
- OpenAPI JSON: `http://localhost:8009/openapi.json`
## ⚠️ Important Notes
- Date format: `YYYY-MM-DD` (single-digit month/day also accepted by API)
- Default `product_id`: "freeleaps"
- Requires StarRocks database access and/or Prometheus endpoint
## 🐛 Troubleshooting
| Issue | Solution |
|-------|----------|
| Port in use | `docker stop $(docker ps -q --filter ancestor=metrics:latest)` |

View File

@@ -0,0 +1,119 @@
import httpx
from typing import Dict, Any, Optional, Union
from datetime import datetime
import json
from fastapi import HTTPException
from common.config.app_settings import app_settings
from common.log.module_logger import ModuleLogger
class PrometheusClient:
"""
Async Prometheus client for querying metrics data using PromQL.
This client provides methods to:
- Query data using PromQL expressions
- Get all available metrics
- Get labels for specific metrics
- Query metric series with label filters
"""
def __init__(self, endpoint: Optional[str] = None):
"""
Initialize Prometheus client.
Args:
endpoint: Prometheus server endpoint. If None, uses PROMETHEUS_ENDPOINT from settings.
"""
self.module_logger = ModuleLogger(__file__)
self.endpoint = endpoint or app_settings.PROMETHEUS_ENDPOINT
self.base_url = f"{self.endpoint.rstrip('/')}/api/v1"
async def request(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Make HTTP request to Prometheus API.
Args:
endpoint: API endpoint path
params: Query parameters
Returns:
JSON response data
Raises:
httpx.HTTPError: If request fails
ValueError: If response is not valid JSON
"""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
try:
await self.module_logger.log_info(f"Making request to Prometheus: {url} with params: {params}")
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(url, params=params)
response.raise_for_status()
data = response.json()
if data.get("status") != "success":
error_msg = data.get('error', 'Unknown error')
await self.module_logger.log_error(f"Prometheus API error: {error_msg}")
raise HTTPException(status_code=400, detail=f"Prometheus API error: {error_msg}")
return data
except httpx.HTTPError as e:
await self.module_logger.log_error(f"HTTP error querying Prometheus: {e}")
raise HTTPException(status_code=502, detail=f"Failed to connect to Prometheus: {str(e)}")
except json.JSONDecodeError as e:
await self.module_logger.log_error(f"Invalid JSON response from Prometheus: {e}")
raise HTTPException(status_code=400, detail=f"Invalid response from Prometheus: {str(e)}")
async def query_range(
self,
query: str,
start: Union[str, datetime],
end: Union[str, datetime],
step: str = "15s"
) -> Dict[str, Any]:
"""
Execute a PromQL range query.
Args:
query: PromQL query string
start: Start time (RFC3339 string or datetime)
end: End time (RFC3339 string or datetime)
step: Query resolution step width (e.g., "15s", "1m", "1h")
Returns:
Range query result data
Example:
result = await client.query_range(
"up{job='prometheus'}",
start=datetime.now() - timedelta(hours=1),
end=datetime.now(),
step="1m"
)
"""
params = {
"query": query,
"step": step
}
        # Convert datetime to RFC3339; naive datetimes are assumed to be in UTC
if isinstance(start, datetime):
if start.tzinfo is None:
params["start"] = start.isoformat() + "Z"
else:
params["start"] = start.isoformat()
else:
params["start"] = start
if isinstance(end, datetime):
if end.tzinfo is None:
params["end"] = end.isoformat() + "Z"
else:
params["end"] = end.isoformat()
else:
params["end"] = end
return await self.request("query_range", params)

View File

@@ -2,18 +2,18 @@ import pymysql
from typing import List, Dict, Any, Optional
from datetime import date
from loguru import logger
from common.config.app_settings import app_settings
class StarRocksClient:
    """StarRocks database client for querying user registration data"""
    def __init__(self):
        self.host = app_settings.STARROCKS_HOST
        self.port = app_settings.STARROCKS_PORT
        self.user = app_settings.STARROCKS_USER
        self.password = app_settings.STARROCKS_PASSWORD
        self.database = app_settings.STARROCKS_DATABASE
        self.connection = None
    def connect(self) -> bool:
@@ -88,12 +88,3 @@ class StarRocksClient:
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit"""
        self.disconnect()

View File

@@ -0,0 +1,268 @@
from typing import Dict, List, Any, Optional, Union
from datetime import datetime, timedelta, timezone
from fastapi import HTTPException
from common.log.module_logger import ModuleLogger
from ..infra.external_service.prometheus_client import PrometheusClient
class PrometheusMetricsService:
"""
Service class for querying Prometheus metrics with predefined PromQL queries.
This service provides a high-level interface for querying metrics data
using predefined PromQL queries mapped to metric names.
"""
    # Maps product IDs to {metric name -> PromQL query} dictionaries
    METRIC_PROMQL_MAP: Dict[str, Dict[str, str]] = {
        "freeleaps": {
            # Demo only, not used
            "cpu_usage": "100 - (avg by (instance) (irate(node_cpu_seconds_total{mode=\"idle\"}[5m])) * 100)",
            # Demo only, not used
            "memory_usage": "100 - ((node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100)",
            # Demo only, not used
            "disk_usage": "100 - ((node_filesystem_avail_bytes{mountpoint=\"/\"} / node_filesystem_size_bytes{mountpoint=\"/\"}) * 100)",
            # Average response time for notification HTTP requests
            "latency_ms": "1000*avg(freeleaps_notification_http_request_duration_seconds_sum{handler!=\"none\"} / freeleaps_notification_http_request_duration_seconds_count)",
            # Error rate for 5xx HTTP status codes (stability metric)
            "reliability": "1-sum(rate(freeleaps_notification_http_requests_total{status=\"5xx\"}[1m]))",
        },
        "magicleaps": {},
    }
def __init__(self, prometheus_endpoint: Optional[str] = None):
"""
Initialize PrometheusMetricsService.
Args:
prometheus_endpoint: Prometheus server endpoint. If None, uses default from settings.
"""
self.module_logger = ModuleLogger(__file__)
self.prometheus_client = PrometheusClient(prometheus_endpoint)
def get_available_metrics(self, product_id: Optional[str] = None) -> List[str]:
"""
Get list of available metric names that have predefined PromQL queries.
Args:
product_id: Optional product ID to filter metrics. If None, returns all metrics from all products.
Returns:
List of available metric names
"""
if product_id:
if product_id in self.METRIC_PROMQL_MAP:
return list(self.METRIC_PROMQL_MAP[product_id].keys())
else:
return []
else:
# Return all metrics from all products
all_metrics = []
for product_metrics in self.METRIC_PROMQL_MAP.values():
all_metrics.extend(product_metrics.keys())
return all_metrics
def get_available_products(self) -> List[str]:
"""
Get list of available product IDs.
Returns:
List of available product IDs
"""
return list(self.METRIC_PROMQL_MAP.keys())
async def query_metric_by_time_range(
self,
product_id: str,
metric_name: str,
start_time: Union[str, datetime],
end_time: Union[str, datetime],
step: str = "1m"
) -> List[Dict[str, Any]]:
"""
Query metric data for a specific time range.
Args:
product_id: Product ID to identify which product's metrics to query
metric_name: Name of the metric to query
start_time: Start time for the query (RFC3339 string or datetime)
end_time: End time for the query (RFC3339 string or datetime)
step: Query resolution step width (e.g., "1m", "5m", "1h")
Returns:
List of dictionaries with 'date' and 'value' keys
        Raises:
            HTTPException: 404 if product_id or metric_name is unknown, 400 if the time range is invalid
            Exception: If the Prometheus query fails
Example:
result = await service.query_metric_by_time_range(
"freeleaps",
"cpu_usage",
start_time=datetime.now() - timedelta(hours=1),
end_time=datetime.now(),
step="5m"
)
# Returns: [{"date": "2024-01-01T10:00:00Z", "value": 45.2}, ...]
"""
# Check if product_id exists in the mapping
if product_id not in self.METRIC_PROMQL_MAP:
available_products = ", ".join(self.get_available_products())
error_msg = f"Product '{product_id}' not found in PromQL mapping. Available products: {available_products}"
await self.module_logger.log_error(error_msg)
raise HTTPException(status_code=404, detail=error_msg)
# Check if metric name exists in the product's mapping
if metric_name not in self.METRIC_PROMQL_MAP[product_id]:
available_metrics = ", ".join(self.get_available_metrics(product_id))
error_msg = f"Metric '{metric_name}' not found in product '{product_id}' PromQL mapping. Available metrics: {available_metrics}"
await self.module_logger.log_error(error_msg)
raise HTTPException(status_code=404, detail=error_msg)
# Parse datetime strings if they are strings
if isinstance(start_time, str):
start_dt = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
else:
start_dt = start_time
if isinstance(end_time, str):
end_dt = datetime.fromisoformat(end_time.replace('Z', '+00:00'))
else:
end_dt = end_time
# Validate time range
if start_dt >= end_dt:
raise HTTPException(
status_code=400,
detail="Start time must be before end time"
)
# Check time range is not too large (max 7 days for detailed queries)
time_diff = end_dt - start_dt
if time_diff > timedelta(days=7):
raise HTTPException(
status_code=400,
detail="Time range cannot exceed 7 days for detailed queries"
)
# Get the PromQL query for the metric
promql_query = self.METRIC_PROMQL_MAP[product_id][metric_name]
try:
await self.module_logger.log_info(
f"Querying metric '{metric_name}' from product '{product_id}' with PromQL: {promql_query}")
# Execute the range query
result = await self.prometheus_client.query_range(
query=promql_query,
start=start_dt,
end=end_dt,
step=step
)
# Parse the result and format it
formatted_data = self._format_query_result(result, metric_name)
await self.module_logger.log_info(
f"Successfully queried metric '{metric_name}' with {len(formatted_data)} data points")
return formatted_data
except Exception as e:
await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}")
raise
def _format_query_result(self, prometheus_result: Dict[str, Any], metric_name: str) -> List[Dict[str, Any]]:
"""
Format Prometheus query result into the required format.
Args:
prometheus_result: Raw result from Prometheus API
metric_name: Name of the metric being queried
Returns:
List of dictionaries with 'date' and 'value' keys
"""
formatted_data = []
# Extract data from Prometheus result
data = prometheus_result.get("data", {})
result_type = data.get("resultType", "")
if result_type == "matrix":
# Handle range query results (matrix)
for series in data.get("result", []):
metric_labels = series.get("metric", {})
values = series.get("values", [])
for timestamp, value in values:
# Convert Unix timestamp to ISO format
                    date_str = datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")  # UTC, so the "Z" suffix is accurate
formatted_data.append({
"date": date_str,
"value": float(value) if value != "NaN" else None,
"metric": metric_name,
"labels": metric_labels
})
elif result_type == "vector":
# Handle instant query results (vector)
for series in data.get("result", []):
metric_labels = series.get("metric", {})
timestamp = series.get("value", [None, None])[0]
value = series.get("value", [None, None])[1]
if timestamp and value:
                    date_str = datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")  # UTC, so the "Z" suffix is accurate
formatted_data.append({
"date": date_str,
"value": float(value) if value != "NaN" else None,
"metric": metric_name,
"labels": metric_labels
})
# Sort by date
formatted_data.sort(key=lambda x: x["date"])
return formatted_data
async def get_metric_info(self, product_id: str, metric_name: str) -> Dict[str, Any]:
"""
Get information about a specific metric including its PromQL query.
Args:
product_id: Product ID to identify which product's metrics to query
metric_name: Name of the metric
Returns:
Dictionary containing metric information
        Raises:
            HTTPException: 404 if product_id or metric_name is not found in the PromQL mapping
"""
# Check if product_id exists in the mapping
if product_id not in self.METRIC_PROMQL_MAP:
available_products = ", ".join(self.get_available_products())
error_msg = f"Product '{product_id}' not found in PromQL mapping. Available products: {available_products}"
await self.module_logger.log_error(error_msg)
raise HTTPException(status_code=404, detail=error_msg)
# Check if metric name exists in the product's mapping
if metric_name not in self.METRIC_PROMQL_MAP[product_id]:
available_metrics = ", ".join(self.get_available_metrics(product_id))
error_msg = f"Metric '{metric_name}' not found in product '{product_id}' PromQL mapping. Available metrics: {available_metrics}"
await self.module_logger.log_error(error_msg)
raise HTTPException(status_code=404, detail=error_msg)
return {
"product_id": product_id,
"metric_name": metric_name,
"promql_query": self.METRIC_PROMQL_MAP[product_id][metric_name],
"description": f"PromQL query for {metric_name} metric in product {product_id}"
}
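For orientation, a minimal sketch of calling the service directly (assumes a reachable Prometheus and the predefined `freeleaps` metrics above; not part of the committed code):
```python
import asyncio
from datetime import datetime, timedelta

from backend.services.prometheus_metrics_service import PrometheusMetricsService


async def main() -> None:
    service = PrometheusMetricsService()
    # Query the predefined "latency_ms" metric over the last six hours
    points = await service.query_metric_by_time_range(
        product_id="freeleaps",
        metric_name="latency_ms",
        start_time=datetime.now() - timedelta(hours=6),
        end_time=datetime.now(),
        step="1h",
    )
    for point in points[:3]:
        print(point["date"], point["value"])


asyncio.run(main())
```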

View File

@@ -1,7 +1,7 @@
from typing import List, Dict, Any
from datetime import date, timedelta
from loguru import logger
from backend.infra.external_service.starrocks_client import StarRocksClient
from backend.models.user_registration_models import UserRegistrationResponse, DailyRegisteredUsers
@@ -33,7 +33,6 @@ class RegistrationService:
        raw_data = self.starrocks_client.get_daily_registered_users(
            start_date, end_date, product_id
        )
        # Convert to DailyRegisteredUsers objects
        daily_data = [
            DailyRegisteredUsers(
@@ -44,7 +43,6 @@ class RegistrationService:
            )
            for row in raw_data
        ]
        # Create date-to-count mapping
        data_dict = {str(item.date_id): item.registered_cnt for item in daily_data}

View File

View File

@@ -2,7 +2,7 @@ from pydantic_settings import BaseSettings
from typing import Optional
class AppSettings(BaseSettings):
    # Server settings
    SERVER_HOST: str = "0.0.0.0"
    SERVER_PORT: int = 8009
@@ -11,8 +11,8 @@ class SiteSettings(BaseSettings):
    # Log settings
    LOG_BASE_PATH: str = "./logs"
    BACKEND_LOG_FILE_NAME: str = "freeleaps-metrics"
    APPLICATION_ACTIVITY_LOG: str = "freeleaps-metrics-activity"
    # StarRocks database settings
    STARROCKS_HOST: str = "freeleaps-starrocks-cluster-fe-service.freeleaps-data-platform.svc"
@@ -21,8 +21,15 @@ class SiteSettings(BaseSettings):
    STARROCKS_PASSWORD: str = ""
    STARROCKS_DATABASE: str = "freeleaps"
    # Prometheus settings
    PROMETHEUS_ENDPOINT: str = "http://localhost:9090"
    METRICS_ENABLED: bool = False
    PROBES_ENABLED: bool = True
    class Config:
        env_file = "local.env"
app_settings = AppSettings()

View File

@@ -0,0 +1,17 @@
import os
from dataclasses import dataclass
from .app_settings import app_settings
from .site_settings import site_settings
@dataclass
class LogSettings:
LOG_PATH_BASE: str = app_settings.LOG_BASE_PATH
LOG_RETENTION: str = os.environ.get("LOG_RETENTION", "30 days")
LOG_ROTATION: str = os.environ.get("LOG_ROTATION", "00:00") # midnight
MAX_BACKUP_FILES: int = int(os.environ.get("LOG_BACKUP_FILES", 5))
LOG_ROTATION_BYTES: int = int(os.environ.get("LOG_ROTATION_BYTES", 10 * 1024 * 1024)) # 10 MB
APP_NAME: str = site_settings.NAME
ENVIRONMENT: str = site_settings.ENV
log_settings = LogSettings()

View File

@@ -0,0 +1,26 @@
import os
from pydantic_settings import BaseSettings
# NOTE: Values fall back to your environment variables when not set here
class SiteSettings(BaseSettings):
NAME: str = "FREELEAPS-METRICS"
DEBUG: bool = True
ENV: str = "dev"
SERVER_HOST: str = "localhost"
SERVER_PORT: int = 9000
URL: str = "http://localhost"
TIME_ZONE: str = "UTC"
BASE_PATH: str = os.path.dirname(os.path.dirname((os.path.abspath(__file__))))
class Config:
env_file = ".devbase-webapi.env"
env_file_encoding = "utf-8"
site_settings = SiteSettings()

View File

View File

@@ -0,0 +1,12 @@
from typing import Any
from .base_logger import LoggerBase
from common.config.app_settings import app_settings
class ApplicationLogger(LoggerBase):
    def __init__(self, application_activities: dict[str, Any] = {}) -> None:
        extra_fields = {}
        if application_activities:
            extra_fields.update(application_activities)
        super().__init__(
            logger_name=app_settings.APPLICATION_ACTIVITY_LOG,
            extra_fields=extra_fields,
        )

View File

@@ -0,0 +1,136 @@
from loguru import logger as guru_logger
from common.config.log_settings import log_settings
from typing import Dict, Any, Optional
import socket
import json
import threading
import os
import sys
import inspect
import logging
from common.log.json_sink import JsonSink
class LoggerBase:
binded_loggers = {}
logger_lock = threading.Lock()
    def __init__(self, logger_name: str, extra_fields: dict[str, Any]) -> None:
        self.__logger_name = logger_name
        self.extra_fields = extra_fields
with LoggerBase.logger_lock:
if self.__logger_name in LoggerBase.binded_loggers:
self.logger = LoggerBase.binded_loggers[self.__logger_name]
return
log_filename = f"{log_settings.LOG_PATH_BASE}/{self.__logger_name}.log"
log_level = "INFO"
rotation_bytes = int(log_settings.LOG_ROTATION_BYTES or 10 * 1024 * 1024)
        # Remove Loguru's default handler only before the first sink is added;
        # removing unconditionally would drop sinks registered by previously
        # constructed loggers
        if not LoggerBase.binded_loggers:
            guru_logger.remove()
file_sink = JsonSink(
log_file_path=log_filename,
rotation_size_bytes=rotation_bytes,
max_backup_files=log_settings.MAX_BACKUP_FILES
)
guru_logger.add(
sink=file_sink,
level=log_level,
filter=lambda record: record["extra"].get("topic") == self.__logger_name,
)
guru_logger.add(
sink=sys.stderr,
level=log_level,
format="{level} - {time:YYYY-MM-DD HH:mm:ss} - <{extra[log_file]}:{extra[log_line]}> - {extra[properties_str]} - {message}",
filter=lambda record: record["extra"].get("topic") == self.__logger_name,
)
host_name = socket.gethostname()
host_ip = socket.gethostbyname(host_name)
self.logger = guru_logger.bind(
topic=self.__logger_name,
host_ip=host_ip,
host_name=host_name,
app=log_settings.APP_NAME,
env=log_settings.ENVIRONMENT,
)
with LoggerBase.logger_lock:
LoggerBase.binded_loggers[self.__logger_name] = self.logger
def _get_log_context(self) -> dict:
frame = inspect.currentframe().f_back.f_back
filename = os.path.basename(frame.f_code.co_filename)
lineno = frame.f_lineno
return {"log_file": filename, "log_line": lineno}
def _prepare_properties(self, properties: Optional[Dict[str, Any]]) -> Dict[str, Any]:
props = {} if properties is None else properties.copy()
props_str = json.dumps(props, ensure_ascii=False) if props else "{}"
return props, props_str
    async def log_event(self, sender_id: str, receiver_id: str, subject: str, event: str, properties: dict[str, Any], text: str = "") -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event=event, properties=props, properties_str=props_str, **context)
local_logger.info(text)
    async def log_exception(self, sender_id: str, receiver_id: str, subject: str, exception: Exception, text: str = "", properties: dict[str, Any] = None) -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="exception", properties=props, properties_str=props_str, exception=exception, **context)
local_logger.exception(text)
    async def log_info(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, Any] = None) -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="information", properties=props, properties_str=props_str, **context)
local_logger.info(text)
    async def log_warning(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, Any] = None) -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="warning", properties=props, properties_str=props_str, **context)
local_logger.warning(text)
    async def log_error(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, Any] = None) -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="error", properties=props, properties_str=props_str, **context)
local_logger.error(text)
@staticmethod
def configure_uvicorn_logging():
print("📢 Setting up uvicorn logging interception...")
# Intercept logs from these loggers
intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"]
class InterceptHandler(logging.Handler):
def emit(self, record):
level = (
guru_logger.level(record.levelname).name
if guru_logger.level(record.levelname, None)
else record.levelno
)
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
guru_logger.opt(depth=depth, exception=record.exc_info).log(
level,
f"[{record.name}] {record.getMessage()}",
)
# Replace default handlers
logging.root.handlers.clear()
logging.root.setLevel(logging.INFO)
logging.root.handlers = [InterceptHandler()]
# Configure specific uvicorn loggers
for logger_name in intercept_loggers:
logging_logger = logging.getLogger(logger_name)
logging_logger.handlers.clear() # Remove default handlers
logging_logger.propagate = True # Ensure propagation through Loguru

View File

@@ -0,0 +1,84 @@
import json
import datetime
import traceback
from pathlib import Path
class JsonSink:
def __init__(
self,
log_file_path: str,
rotation_size_bytes: int = 10 * 1024 * 1024,
max_backup_files: int = 5,
):
self.log_file_path = Path(log_file_path)
self.rotation_size = rotation_size_bytes
self.max_backup_files = max_backup_files
self._open_log_file()
def _open_log_file(self):
# ensure the parent directory exists
parent_dir = self.log_file_path.parent
if not parent_dir.exists():
parent_dir.mkdir(parents=True, exist_ok=True)
self.log_file = self.log_file_path.open("a", encoding="utf-8")
def _should_rotate(self) -> bool:
return self.log_file_path.exists() and self.log_file_path.stat().st_size >= self.rotation_size
def _rotate(self):
self.log_file.close()
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
rotated_path = self.log_file_path.with_name(f"{self.log_file_path.stem}_{timestamp}{self.log_file_path.suffix}")
self.log_file_path.rename(rotated_path)
self._cleanup_old_backups()
self._open_log_file()
def _cleanup_old_backups(self):
parent = self.log_file_path.parent
stem = self.log_file_path.stem
suffix = self.log_file_path.suffix
backup_files = sorted(
parent.glob(f"{stem}_*{suffix}"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
for old_file in backup_files[self.max_backup_files:]:
try:
old_file.unlink()
except Exception as e:
print(f"Failed to delete old backup {old_file}: {e}")
def __call__(self, message):
record = message.record
if self._should_rotate():
self._rotate()
log_entry = {
"level": record["level"].name.lower(),
"timestamp": int(record["time"].timestamp() * 1000),
"text": record["message"],
"fields": record["extra"].get("properties", {}),
"context": {
"app": record["extra"].get("app"),
"env": record["extra"].get("env"),
"log_file": record["extra"].get("log_file"),
"log_line": record["extra"].get("log_line"),
"topic": record["extra"].get("topic"),
"sender_id": record["extra"].get("sender_id"),
"receiver_id": record["extra"].get("receiver_id"),
"subject": record["extra"].get("subject"),
"event": record["extra"].get("event"),
"host_ip": record["extra"].get("host_ip"),
"host_name": record["extra"].get("host_name"),
},
"stacktrace": None
}
if record["exception"]:
exc_type, exc_value, exc_tb = record["exception"]
log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb)
self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
self.log_file.flush()
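For reference, each emitted line is a self-contained JSON object of roughly this shape (values illustrative):
```json
{"level": "info", "timestamp": 1726650000000, "text": "Handling request", "fields": {"product_id": "freeleaps"}, "context": {"app": "FREELEAPS-METRICS", "env": "dev", "log_file": "metrics_query.py", "log_line": 42, "topic": "freeleaps-metrics-activity", "sender_id": "/app/webapi/routes/prometheus_metrics/metrics_query.py", "receiver_id": "ModuleLogger", "subject": "module", "event": "information", "host_ip": "10.0.0.1", "host_name": "metrics-pod"}, "stacktrace": null}
```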

View File

@@ -0,0 +1,46 @@
from typing import Any
from .application_logger import ApplicationLogger
class ModuleLogger(ApplicationLogger):
def __init__(self, sender_id: str) -> None:
super().__init__()
self.event_sender_id = sender_id
self.event_receiver_id = "ModuleLogger"
self.event_subject = "module"
    async def log_exception(self, exception: Exception, text: str = "Exception", properties: dict[str, Any] = None) -> None:
return await super().log_exception(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
exception=exception,
text=text,
properties=properties,
)
    async def log_info(self, text: str, data: dict[str, Any] = None) -> None:
return await super().log_info(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
text=text,
properties=data,
)
    async def log_warning(self, text: str, data: dict[str, Any] = None) -> None:
return await super().log_warning(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
text=text,
properties=data,
)
    async def log_error(self, text: str, data: dict[str, Any] = None) -> None:
return await super().log_error(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
text=text,
properties=data,
)
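Typical usage, mirroring the route modules in this commit:
```python
from common.log.module_logger import ModuleLogger

module_logger = ModuleLogger(__file__)


async def handler() -> None:
    # All log_* methods are async; file/line context is captured automatically
    await module_logger.log_info("Handling request", {"product_id": "freeleaps"})
```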

View File

@@ -0,0 +1,140 @@
import logging
from enum import Enum
from typing import Optional, Callable, Tuple, Dict
import inspect
from datetime import datetime, timezone
# ProbeType is an Enum that defines the types of probes that can be registered.
class ProbeType(Enum):
LIVENESS = "liveness"
READINESS = "readiness"
STARTUP = "startup"
# ProbeResult is a class that represents the result of a probe check.
class ProbeResult:
def __init__(self, success: bool, message: str = "ok", data: Optional[dict] = None):
self.success = success
self.message = message
self.data = data or {}
def to_dict(self) -> dict:
return {
"success": self.success,
"message": self.message,
"data": self.data
}
# Probe is a class that represents a probe that can be registered.
class Probe:
def __init__(self, type: ProbeType, path: str, check_fn: Callable, name: Optional[str] = None):
self.type = type
self.path = path
self.check_fn = check_fn
self.name = name or f"{type.value}-{id(self)}"
async def execute(self) -> ProbeResult:
try:
result = self.check_fn()
if inspect.isawaitable(result):
result = await result
if isinstance(result, ProbeResult):
return result
elif isinstance(result, bool):
return ProbeResult(result, "ok" if result else "failed")
else:
return ProbeResult(True, "ok")
except Exception as e:
return ProbeResult(False, str(e))
# ProbeGroup is a class that represents a group of probes that can be checked together.
class ProbeGroup:
def __init__(self, path: str):
self.path = path
self.probes: Dict[str, Probe] = {}
def add_probe(self, probe: Probe):
self.probes[probe.name] = probe
async def check_all(self) -> Tuple[bool, dict]:
results = {}
all_success = True
for name, probe in self.probes.items():
result = await probe.execute()
results[name] = result.to_dict()
if not result.success:
all_success = False
return all_success, results
# FrameworkAdapter is an abstract class that defines the interface for framework-specific probe adapters.
class FrameworkAdapter:
async def handle_request(self, group: ProbeGroup):
all_success, results = await group.check_all()
status_code = 200 if all_success else 503
return {"status": "ok" if all_success else "failed", "payload": results, "timestamp": int(datetime.now(timezone.utc).timestamp())}, status_code
def register_route(self, path: str, handler: Callable):
raise NotImplementedError
# ProbeManager is a class that manages the registration of probes and their corresponding framework adapters.
class ProbeManager:
_default_paths = {
ProbeType.LIVENESS: "/_/livez",
ProbeType.READINESS: "/_/readyz",
ProbeType.STARTUP: "/_/healthz"
}
def __init__(self):
self.groups: Dict[str, ProbeGroup] = {}
self.adapters: Dict[str, FrameworkAdapter] = {}
self._startup_complete = False
def register_adapter(self, framework: str, adapter: FrameworkAdapter):
self.adapters[framework] = adapter
logging.info(f"Registered probe adapter ({adapter}) for framework: {framework}")
def register(
self,
type: ProbeType,
check_func: Optional[Callable] = None,
path: Optional[str] = None,
prefix: str = "",
name: Optional[str] = None,
frameworks: Optional[list] = None
):
path = path or self._default_paths.get(type, "/_/healthz")
if prefix:
path = f"{prefix}{path}"
if type == ProbeType.STARTUP and check_func is None:
check_func = self._default_startup_check
probe = Probe(type, path, check_func or (lambda: True), name)
if path not in self.groups:
self.groups[path] = ProbeGroup(path)
self.groups[path].add_probe(probe)
for framework in (frameworks or ["default"]):
self._register_route(framework, path)
logging.info(f"Registered {type.value} probe route ({path}) for framework: {framework}")
def _register_route(self, framework: str, path: str):
if framework not in self.adapters:
return
adapter = self.adapters[framework]
group = self.groups[path]
async def handler():
return await adapter.handle_request(group)
adapter.register_route(path, handler)
def _default_startup_check(self) -> bool:
return self._startup_complete
def mark_startup_complete(self):
self._startup_complete = True

View File

@@ -0,0 +1,15 @@
from . import FrameworkAdapter
from fastapi.responses import JSONResponse
from typing import Callable
# FastAPIAdapter is a class that implements the FrameworkAdapter interface for FastAPI.
class FastAPIAdapter(FrameworkAdapter):
def __init__(self, app):
self.app = app
    def register_route(self, path: str, handler: Callable):
async def wrapper():
data, status_code = await handler()
return JSONResponse(content=data, status_code=status_code)
self.app.add_api_route(path, wrapper, methods=["GET"])
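A minimal wiring sketch (the same pattern appears in `webapi/providers/probes.py` later in this commit):
```python
from fastapi import FastAPI
from common.probes import ProbeManager, ProbeType
from common.probes.adapters import FastAPIAdapter

app = FastAPI()
manager = ProbeManager()
manager.register_adapter("fastapi", FastAPIAdapter(app))
# Registers GET /_/livez, which returns 200 while all liveness checks pass
manager.register(type=ProbeType.LIVENESS, frameworks=["fastapi"])
```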

apps/metrics/docs/design.md
View File

@@ -0,0 +1,146 @@
# 1. Overview
We support two ways to query metrics:
- Connect to StarRocks data warehouse and query metrics from it
- Query Prometheus directly and retrieve metrics from it
# 2. StarRocks Metrics
We can implement StarRocks metric queries in the same way as Prometheus metric queries; the only difference is that PromQL is replaced with SQL executed through the StarRocks API.
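For example, the daily-registered-users metric could map to a SQL template like the one below (a sketch assuming the `dws_daily_registered_users` table used by the registration analytics service; the placeholders would be filled per request):
```sql
SELECT date_id, registered_cnt
FROM dws_daily_registered_users
WHERE product_id = %(product_id)s
  AND date_id BETWEEN %(start_date)s AND %(end_date)s
ORDER BY date_id;
```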
# 3. Prometheus Metrics
## 3.1. Metrics Config
Currently, metrics are configured in code. In the future, they can be configured through a database or other mechanisms.
Organization structure: Product ID -> Metric Name -> Metric Query Method (PromQL)
```json
{
"freeleaps": {
// Just for demo
"cpu_usage": "100 - (avg by (instance) (irate(node_cpu_seconds_total{mode=\"idle\"}[5m])) * 100)",
// Just for demo
"memory_usage": "100 - ((node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100)",
// Just for demo
"disk_usage": "100 - ((node_filesystem_avail_bytes{mountpoint=\"/\"} / node_filesystem_size_bytes{mountpoint=\"/\"}) * 100)",
"latency_ms": "1000*avg(freeleaps_notification_http_request_duration_seconds_sum{handler!=\"none\"} / freeleaps_notification_http_request_duration_seconds_count)",
"reliability": "1-sum(rate(freeleaps_notification_http_requests_total{status=\"5xx\"}[1m]))"
},
"magicleaps": {}
}
```
If we want to add a new metric, in theory we only need to add one configuration entry (provided the metric already exists in Prometheus and can be queried directly with PromQL, without any additional code).
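For instance, adding a hypothetical `request_rate` metric (name and query illustrative) would be a single new entry:
```json
{
  "freeleaps": {
    "request_rate": "sum(rate(freeleaps_notification_http_requests_total[5m]))"
  }
}
```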
## 3.2. API Design
### 3.2.1. Query Metrics by Product ID
API: `/api/metrics/prometheus/product/{product_id}/available-metrics`
Method: GET
Request:
```
product_id=freeleaps
```
Response:
```json
{
"product_id": "freeleaps",
"available_metrics": [
"cpu_usage",
"memory_usage",
"disk_usage",
"latency_ms",
"reliability"
],
"total_count": 5,
"description": "List of metrics with predefined PromQL queries for product 'freeleaps'"
}
```
### 3.2.2. Query Metric Info
API: `/api/metrics/prometheus/product/{product_id}/metric/{metric_name}/info`
Method: GET
Request:
```
product_id=freeleaps
metric_name=cpu_usage
```
Response:
```json
{
"metric_info": {
"product_id": "freeleaps",
"metric_name": "cpu_usage",
"promql_query": "100 - (avg by (instance) (irate(node_cpu_seconds_total{mode=\"idle\"}[5m])) * 100)",
"description": "PromQL query for cpu_usage metric in product freeleaps"
},
"description": "Information about metric 'cpu_usage' in product 'freeleaps'"
}
```
### 3.2.3. Query Metric Data
API: `/api/metrics/prometheus/metrics_query`
Method: POST
Request:
```
{
"product_id":"freeleaps",
"metric_name": "latency_ms",
"start_time": "2025-09-12T00:00:00Z",
"end_time": "2025-09-16T01:00:00Z",
"step":"1h" # Interval between data points in the query result
}
```
Response:
```json
{
"metric_name": "latency_ms",
"data_points": [
{
"date": "2025-09-12T08:00:00Z",
"value": 41.37141507698155,
"labels": {} # Optional: Additional labels for prometheus, Just for debugging
},
{
"date": "2025-09-12T09:00:00Z",
"value": 41.371992733188385,
"labels": {}
},
{
"date": "2025-09-12T10:00:00Z",
"value": 41.37792878125675,
"labels": {}
},
{
"date": "2025-09-12T11:00:00Z",
"value": 41.37297490632533,
"labels": {}
},
...
{
"date": "2025-09-16T08:00:00Z",
"value": 40.72491916149973,
"labels": {}
},
{
"date": "2025-09-16T09:00:00Z",
"value": 40.72186597550194,
"labels": {}
}
],
"total_points": 98,
"time_range": {
"start": "2025-09-12T00:00:00Z",
"end": "2025-09-16T01:00:00Z"
},
"step": "1h"
}
```
# 4. Universal Metrics
In the future, we can create an abstraction layer above StarRocks Metrics and Prometheus Metrics to unify metric queries from both data sources!
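A sketch of what that layer could look like (illustrative only; `MetricsBackend` does not exist in this commit):
```python
from typing import Any, Dict, List, Protocol


class MetricsBackend(Protocol):
    """Common query surface for Prometheus- and StarRocks-backed metrics."""

    async def query_metric_by_time_range(
        self,
        product_id: str,
        metric_name: str,
        start_time: str,
        end_time: str,
        step: str = "1h",
    ) -> List[Dict[str, Any]]:
        ...

# PrometheusMetricsService already matches this shape; a StarRocks-backed
# service exposing the same signature would keep the route layer backend-agnostic.
```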

View File

@@ -15,3 +15,5 @@ STARROCKS_DATABASE=freeleaps
LOG_BASE_PATH=./logs
BACKEND_LOG_FILE_NAME=metrics
APPLICATION_ACTIVITY_LOG=metrics-activity
PROMETHEUS_ENDPOINT=http://localhost:9090

View File

@@ -14,3 +14,4 @@ pytest==8.4.1
pytest-asyncio==0.21.2
pymysql==1.1.0
sqlalchemy==2.0.23
python-dotenv

View File

@@ -1,69 +0,0 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from prometheus_fastapi_instrumentator import Instrumentator
from webapi.config.app_settings import site_settings
from loguru import logger
import os
def create_app() -> FastAPI:
"""
Create and configure the FastAPI application
"""
app = FastAPI(
title="Metrics Service API",
description="Metrics Service for Freeleaps Platform",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Setup logging
setup_logging()
# Setup Prometheus metrics
Instrumentator().instrument(app).expose(app)
# Include routers
# from webapi.routes import health, api
# app.include_router(health.router, prefix="/health", tags=["health"])
# app.include_router(api.router, prefix="/api/metrics", tags=["metrics"])
# Note: Registration router is included in main.py
return app
def setup_logging():
"""
Setup logging configuration
"""
# Create log directory if it doesn't exist
log_dir = site_settings.LOG_BASE_PATH
os.makedirs(log_dir, exist_ok=True)
# Configure loguru
logger.add(
f"{log_dir}/{site_settings.BACKEND_LOG_FILE_NAME}.log",
rotation="1 day",
retention="30 days",
level="INFO",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} | {message}"
)
logger.add(
f"{log_dir}/{site_settings.APPLICATION_ACTIVITY_LOG}.log",
rotation="1 day",
retention="30 days",
level="INFO",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} | {message}",
filter=lambda record: record["level"].name == "INFO"
)

View File

@@ -0,0 +1,77 @@
import logging
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
from common.config.app_settings import app_settings
from webapi.providers import exception_handler, common, probes, metrics, router
from webapi.providers.logger import register_logger
def create_app() -> FastAPI:
logging.info("App initializing")
app = FreeleapsMetricsApp()
register_logger()
register(app, exception_handler)
register(app, router)
register(app, common)
    # Customize the generated OpenAPI schema (title, version, global JWT bearer security)
    customize_openapi_security(app)
# Register probe APIs if enabled
if app_settings.PROBES_ENABLED:
register(app, probes)
# Register metrics APIs if enabled
if app_settings.METRICS_ENABLED:
register(app, metrics)
return app
# Builds a customized OpenAPI schema and attaches a global JWT bearer security scheme
def customize_openapi_security(app: FastAPI) -> None:
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
# Generate OpenAPI schema
openapi_schema = get_openapi(
title="FreeLeaps Metrics API",
version="3.1.0",
description="FreeLeaps Metrics API Documentation",
routes=app.routes,
)
# Ensure the components section exists in the OpenAPI schema
if "components" not in openapi_schema:
openapi_schema["components"] = {}
# Add security scheme to components
openapi_schema["components"]["securitySchemes"] = {
"bearerAuth": {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"}
}
# Add security requirement globally
openapi_schema["security"] = [{"bearerAuth": []}]
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
def register(app, provider):
logging.info(provider.__name__ + " registering")
provider.register(app)
def boot(app, provider):
logging.info(provider.__name__ + " booting")
provider.boot(app)
class FreeleapsMetricsApp(FastAPI):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

View File

@@ -1,15 +1,14 @@
from common.config.site_settings import site_settings
from fastapi.responses import RedirectResponse
import uvicorn
from webapi.bootstrap.application import create_app
from webapi.routes.starrocks_metrics import metrics_query
app = create_app()
# Include routers
app.include_router(metrics_query.router)
@app.get("/", status_code=301)
@@ -24,13 +23,3 @@ if __name__ == "__main__":
    uvicorn.run(
        app="main:app", host=site_settings.SERVER_HOST, port=site_settings.SERVER_PORT
    )

View File

@@ -0,0 +1,31 @@
from fastapi.middleware.cors import CORSMiddleware
from common.config.site_settings import site_settings
def register(app):
app.debug = site_settings.DEBUG
app.title = site_settings.NAME
add_global_middleware(app)
    # Startup/shutdown hooks; currently no-ops, kept as extension points for
    # resources such as database connections.
    @app.on_event("startup")
    async def startup():
        pass
    @app.on_event("shutdown")
    async def shutdown():
        pass
def add_global_middleware(app):
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

View File

@@ -0,0 +1,39 @@
from fastapi import FastAPI, HTTPException
from fastapi.exceptions import RequestValidationError
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.status import (
HTTP_400_BAD_REQUEST,
HTTP_401_UNAUTHORIZED,
HTTP_403_FORBIDDEN,
HTTP_404_NOT_FOUND,
HTTP_422_UNPROCESSABLE_ENTITY,
HTTP_500_INTERNAL_SERVER_ERROR,
)
async def custom_http_exception_handler(request: Request, exc: HTTPException):
return JSONResponse(
status_code=exc.status_code,
content={"error": exc.detail},
)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return JSONResponse(
status_code=HTTP_400_BAD_REQUEST,
content={"error": str(exc)},
)
async def exception_handler(request: Request, exc: Exception):
return JSONResponse(
status_code=HTTP_500_INTERNAL_SERVER_ERROR,
content={"error": str(exc)},
)
def register(app: FastAPI):
app.add_exception_handler(HTTPException, custom_http_exception_handler)
app.add_exception_handler(RequestValidationError, validation_exception_handler)
app.add_exception_handler(Exception, exception_handler)

View File

@@ -0,0 +1,7 @@
from common.log.base_logger import LoggerBase
def register_logger():
print("📢 Setting up logging interception...")
LoggerBase.configure_uvicorn_logging()
print("✅ Logging interception complete. Logs are formatted and deduplicated!")

View File

@@ -0,0 +1,16 @@
import logging
from prometheus_fastapi_instrumentator import Instrumentator
from common.config.app_settings import app_settings
def register(app):
instrumentator = (
Instrumentator().instrument(
app,
metric_namespace="freeleaps-mertics",
metric_subsystem=app_settings.APP_NAME)
)
@app.on_event("startup")
async def startup():
instrumentator.expose(app, endpoint="/api/_/metrics", should_gzip=True)
logging.info("Metrics endpoint exposed at /api/_/metrics")

View File

@@ -0,0 +1,24 @@
from common.probes import ProbeManager, ProbeType
from common.probes.adapters import FastAPIAdapter
def register(app):
probes_manager = ProbeManager()
probes_manager.register_adapter("fastapi", FastAPIAdapter(app))
async def readiness_checker():
return {"success": True, "message": "Ready"}
probes_manager.register(
name="readiness",
prefix="/api",
type=ProbeType.READINESS,
check_func=readiness_checker,
frameworks=["fastapi"]
)
probes_manager.register(name="liveness", prefix="/api", type=ProbeType.LIVENESS, frameworks=["fastapi"])
probes_manager.register(name="startup", prefix="/api", type=ProbeType.STARTUP, frameworks=["fastapi"])
@app.on_event("startup")
async def mark_startup_complete():
probes_manager.mark_startup_complete()

View File

@@ -0,0 +1,34 @@
from webapi.routes import api_router
from starlette import routing
def register(app):
app.include_router(
api_router,
prefix="/api",
tags=["api"],
dependencies=[],
responses={404: {"description": "no page found"}},
)
if app.debug:
for route in app.routes:
if not isinstance(route, routing.WebSocketRoute):
print(
{
"path": route.path,
"endpoint": route.endpoint,
"name": route.name,
"methods": route.methods,
}
)
else:
print(
{
"path": route.path,
"endpoint": route.endpoint,
"name": route.name,
"type": "web socket route",
}
)

View File

@@ -0,0 +1,5 @@
from fastapi import APIRouter
from webapi.routes.metrics import router
api_router = APIRouter()
api_router.include_router(router, tags=["metrics"])

View File

@@ -0,0 +1,7 @@
from fastapi import APIRouter
from webapi.routes.starrocks_metrics import api_router as starrocks_metrics_router
from webapi.routes.prometheus_metrics import api_router as prometheus_metrics_router
router = APIRouter()
router.include_router(starrocks_metrics_router, prefix="/metrics", tags=["starrocks-metrics"])
router.include_router(prometheus_metrics_router, prefix="/metrics", tags=["prometheus-metrics"])

View File

@@ -0,0 +1,9 @@
from fastapi import APIRouter
from .available_metrics import router as available_metrics_router
from .metrics_query import router as metrics_query_router
from .metric_info import router as metric_info_router
api_router = APIRouter()
api_router.include_router(available_metrics_router, tags=["prometheus-metrics"])
api_router.include_router(metrics_query_router, tags=["prometheus-metrics"])
api_router.include_router(metric_info_router, tags=["prometheus-metrics"])

View File

@@ -0,0 +1,31 @@
from fastapi import APIRouter
from common.log.module_logger import ModuleLogger
from backend.services.prometheus_metrics_service import PrometheusMetricsService
router = APIRouter()
# Initialize service and logger
prometheus_service = PrometheusMetricsService()
module_logger = ModuleLogger(__file__)
@router.get("/prometheus/product/{product_id}/available-metrics")
async def get_available_metrics(product_id: str):
"""
Get list of available metrics for a specific product.
Args:
product_id: Product ID to get metrics for (required).
Returns a list of metric names that have predefined PromQL queries for the specified product.
"""
await module_logger.log_info(f"Getting available metrics list for product_id: {product_id}")
metrics = prometheus_service.get_available_metrics(product_id)
return {
"product_id": product_id,
"available_metrics": metrics,
"total_count": len(metrics),
"description": f"List of metrics with predefined PromQL queries for product '{product_id}'"
}

View File

@@ -0,0 +1,32 @@
from fastapi import APIRouter, HTTPException
from common.log.module_logger import ModuleLogger
from backend.services.prometheus_metrics_service import PrometheusMetricsService
router = APIRouter()
# Initialize service and logger
prometheus_service = PrometheusMetricsService()
module_logger = ModuleLogger(__file__)
@router.get("/prometheus/product/{product_id}/metric/{metric_name}/info")
async def get_metric_info(
product_id: str,
metric_name: str
):
"""
Get information about a specific metric including its PromQL query.
Args:
product_id: Product ID to identify which product's metrics to query
metric_name: Name of the metric to get information for
"""
await module_logger.log_info(f"Getting info for metric '{metric_name}' from product '{product_id}'")
metric_info = await prometheus_service.get_metric_info(product_id, metric_name)
return {
"metric_info": metric_info,
"description": f"Information about metric '{metric_name}' in product '{product_id}'"
}

View File

@@ -0,0 +1,83 @@
from fastapi import APIRouter
from typing import Optional, List, Dict
from pydantic import BaseModel, Field
from common.log.module_logger import ModuleLogger
from backend.services.prometheus_metrics_service import PrometheusMetricsService
class MetricDataPoint(BaseModel):
"""Single data point in a time series."""
date: str = Field(..., description="Timestamp in ISO format")
value: Optional[float] = Field(None, description="Metric value")
labels: Optional[Dict[str, str]] = Field(None, description="Metric labels")
class MetricTimeSeriesResponse(BaseModel):
"""Response model for metric time series data."""
metric_name: str = Field(..., description="Name of the queried metric")
data_points: List[MetricDataPoint] = Field(..., description="List of data points")
total_points: int = Field(..., description="Total number of data points")
time_range: Dict[str, str] = Field(..., description="Start and end time of the query")
step: str = Field("1h", description="Query resolution step")
class MetricQueryRequest(BaseModel):
"""Request model for metric query."""
product_id: str = Field(..., description="Product ID to identify which product's metrics to query")
metric_name: str = Field(..., description="Name of the metric to query")
start_time: str = Field(..., description="Start time in ISO format or RFC3339")
end_time: str = Field(..., description="End time in ISO format or RFC3339")
step: str = Field("1h", description="Query resolution step (e.g., 1m, 5m, 1h)")
router = APIRouter()
# Initialize service and logger
prometheus_service = PrometheusMetricsService()
module_logger = ModuleLogger(__file__)
@router.post("/prometheus/metrics_query", response_model=MetricTimeSeriesResponse)
async def metrics_query(
request: MetricQueryRequest
):
"""
Query time-series data for a metric.
Returns the metric's time series (one timestamp/value point per step) within the given time range.
"""
await module_logger.log_info(
f"Querying metric '{request.metric_name}' from product '{request.product_id}' from {request.start_time} to {request.end_time}")
# Query the metric data
data_points = await prometheus_service.query_metric_by_time_range(
product_id=request.product_id,
metric_name=request.metric_name,
start_time=request.start_time,
end_time=request.end_time,
step=request.step
)
# Format response
response = MetricTimeSeriesResponse(
metric_name=request.metric_name,
data_points=[
MetricDataPoint(
date=point["date"],
value=point["value"],
labels=point["labels"]
)
for point in data_points
],
total_points=len(data_points),
time_range={
"start": request.start_time,
"end": request.end_time
},
step=request.step
)
await module_logger.log_info(
f"Successfully queried metric '{request.metric_name}' with {len(data_points)} data points")
return response
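A request sketch for the query endpoint above; the metric name here is hypothetical — real names should be discovered via the available-metrics endpoint:

```python
import httpx

payload = {
    "product_id": "freeleaps",
    "metric_name": "cpu_usage",  # hypothetical -- list real names via available-metrics
    "start_time": "2024-01-01T00:00:00Z",
    "end_time": "2024-01-02T00:00:00Z",
    "step": "5m",
}
resp = httpx.post(
    "http://localhost:8009/api/metrics/prometheus/metrics_query", json=payload
)
resp.raise_for_status()
series = resp.json()
print(series["total_points"], series["time_range"])
```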

View File

@@ -1,229 +0,0 @@
from fastapi import APIRouter, HTTPException, Query
from datetime import date, datetime, timedelta
from typing import Optional
from loguru import logger
from backend.services.registration_analytics_service import RegistrationService
from backend.models.user_registration_models import UserRegistrationResponse, UserRegistrationQuery
router = APIRouter(prefix="/api/metrics", tags=["registration"])
# Initialize service
registration_service = RegistrationService()
@router.get("/daily-registered-users", response_model=UserRegistrationResponse)
async def get_daily_registered_users(
start_date: date = Query(..., description="Start date in YYYY-MM-DD format"),
end_date: date = Query(..., description="End date in YYYY-MM-DD format"),
product_id: str = Query("freeleaps", description="Product identifier")
):
"""
Get daily registered users count for a date range
Returns two lists:
- dates: List of dates in YYYY-MM-DD format
- counts: List of daily registration counts
Example:
- GET /api/metrics/daily-registered-users?start_date=2024-01-01&end_date=2024-01-07
"""
try:
# Validate date range
if start_date > end_date:
raise HTTPException(
status_code=400,
detail="Start date must be before or equal to end date"
)
# Check date range is not too large (max 1 year)
if (end_date - start_date).days > 365:
raise HTTPException(
status_code=400,
detail="Date range cannot exceed 365 days"
)
logger.info(f"Querying registration data from {start_date} to {end_date} for product {product_id}")
# Get data from service
result = registration_service.get_daily_registered_users(
start_date, end_date, product_id
)
logger.info(f"Successfully retrieved data for {len(result.dates)} days")
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get daily registered users: {e}")
raise HTTPException(
status_code=500,
detail=f"Internal server error: {str(e)}"
)
@router.get("/registration-summary")
async def get_registration_summary(
start_date: date = Query(..., description="Start date in YYYY-MM-DD format"),
end_date: date = Query(..., description="End date in YYYY-MM-DD format"),
product_id: str = Query("freeleaps", description="Product identifier")
):
"""
Get summary statistics for user registrations in a date range
Returns summary statistics including:
- total_registrations: Total number of registrations
- average_daily: Average daily registrations
- max_daily: Maximum daily registrations
- min_daily: Minimum daily registrations
- days_with_registrations: Number of days with registrations
- total_days: Total number of days in range
"""
try:
# Validate date range
if start_date > end_date:
raise HTTPException(
status_code=400,
detail="Start date must be before or equal to end date"
)
if (end_date - start_date).days > 365:
raise HTTPException(
status_code=400,
detail="Date range cannot exceed 365 days"
)
logger.info(f"Querying registration summary from {start_date} to {end_date} for product {product_id}")
# Get summary from service
summary = registration_service.get_registration_summary(
start_date, end_date, product_id
)
return summary
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get registration summary: {e}")
raise HTTPException(
status_code=500,
detail=f"Internal server error: {str(e)}"
)
@router.get("/recent-registered-users", response_model=UserRegistrationResponse)
async def get_recent_registered_users(
days: int = Query(7, ge=1, le=365, description="Number of recent days to query"),
product_id: str = Query("freeleaps", description="Product identifier")
):
"""
Get daily registered users count for recent N days
Returns registration data for the last N days from today
Example:
- GET /api/metrics/recent-registered-users?days=7
- GET /api/metrics/recent-registered-users?days=30&product_id=freeleaps
"""
try:
# Calculate date range
end_date = date.today()
start_date = end_date - timedelta(days=days-1)
logger.info(f"Querying recent {days} days registration data from {start_date} to {end_date} for product {product_id}")
# Get data from service
result = registration_service.get_daily_registered_users(
start_date, end_date, product_id
)
logger.info(f"Successfully retrieved recent {days} days data, total registrations: {result.total_registrations}")
return result
except Exception as e:
logger.error(f"Failed to get recent registered users: {e}")
raise HTTPException(
status_code=500,
detail=f"Internal server error: {str(e)}"
)
@router.get("/registered-users-by-days", response_model=UserRegistrationResponse)
async def get_registered_users_by_days(
start_date: date = Query(..., description="Start date in YYYY-MM-DD format"),
days: int = Query(..., ge=1, le=365, description="Number of days from start date"),
product_id: str = Query("freeleaps", description="Product identifier")
):
"""
Get daily registered users count starting from a specific date for N days
Returns registration data for N days starting from the specified start date
Example:
- GET /api/metrics/registered-users-by-days?start_date=2024-01-01&days=7
- GET /api/metrics/registered-users-by-days?start_date=2024-09-01&days=30&product_id=freeleaps
"""
try:
# Calculate end date
end_date = start_date + timedelta(days=days-1)
logger.info(f"Querying registration data from {start_date} for {days} days (until {end_date}) for product {product_id}")
# Get data from service
result = registration_service.get_daily_registered_users(
start_date, end_date, product_id
)
logger.info(f"Successfully retrieved {days} days data from {start_date}, total registrations: {result.total_registrations}")
return result
except Exception as e:
logger.error(f"Failed to get registered users by days: {e}")
raise HTTPException(
status_code=500,
detail=f"Internal server error: {str(e)}"
)
@router.post("/daily-registered-users", response_model=UserRegistrationResponse)
async def get_daily_registered_users_post(
query: UserRegistrationQuery
):
"""
Get daily registered users count for a date range (POST method)
Same as GET method but accepts parameters in request body
"""
try:
# Validate date range
if query.start_date > query.end_date:
raise HTTPException(
status_code=400,
detail="Start date must be before or equal to end date"
)
if (query.end_date - query.start_date).days > 365:
raise HTTPException(
status_code=400,
detail="Date range cannot exceed 365 days"
)
logger.info(f"Querying registration data from {query.start_date} to {query.end_date} for product {query.product_id}")
# Get data from service
result = registration_service.get_daily_registered_users(
query.start_date, query.end_date, query.product_id
)
logger.info(f"Successfully retrieved data for {len(result.dates)} days")
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get daily registered users: {e}")
raise HTTPException(
status_code=500,
detail=f"Internal server error: {str(e)}"
)

View File

@@ -0,0 +1,9 @@
from fastapi import APIRouter
from .metrics_query import router as metrics_query_router
from .available_metrics import router as available_metrics_router
from .metric_info import router as metric_info_router
api_router = APIRouter()
api_router.include_router(available_metrics_router, tags=["starrocks-metrics"])
api_router.include_router(metrics_query_router, tags=["starrocks-metrics"])
api_router.include_router(metric_info_router, tags=["starrocks-metrics"])

View File

@@ -0,0 +1,45 @@
from fastapi import APIRouter, HTTPException
from common.log.module_logger import ModuleLogger
router = APIRouter()
# Initialize logger
module_logger = ModuleLogger(__file__)
# Product -> supported StarRocks-backed metrics
SUPPORTED_STARROCKS_METRICS_MAP = {
"freeleaps": [
"daily_registered_users",
],
"magicleaps": [
"daily_registered_users",
],
}
@router.get("/starrocks/product/{product_id}/available-metrics")
async def get_available_metrics(product_id: str):
"""
Get list of available StarRocks-backed metrics for a specific product.
Args:
product_id: Product ID to get metrics for (required).
Returns a list of metric names available via StarRocks for the specified product.
"""
await module_logger.log_info(
f"Getting StarRocks available metrics list for product_id: {product_id}"
)
if product_id not in SUPPORTED_STARROCKS_METRICS_MAP:
raise HTTPException(status_code=404, detail=f"Unknown product_id: {product_id}")
metrics = SUPPORTED_STARROCKS_METRICS_MAP[product_id]
return {
"product_id": product_id,
"available_metrics": metrics,
"total_count": len(metrics),
"description": f"List of StarRocks-backed metrics for product '{product_id}'",
}
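A usage sketch for this endpoint (local service assumed); note that an unknown product_id yields a 404 rather than an empty list:

```python
import httpx

ok = httpx.get(
    "http://localhost:8009/api/metrics/starrocks/product/freeleaps/available-metrics"
)
print(ok.json()["available_metrics"])  # ["daily_registered_users"]

bad = httpx.get(
    "http://localhost:8009/api/metrics/starrocks/product/unknown/available-metrics"
)
print(bad.status_code)  # 404
```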

View File

@@ -0,0 +1,53 @@
from fastapi import APIRouter, HTTPException
from common.log.module_logger import ModuleLogger
router = APIRouter()
# Initialize logger
module_logger = ModuleLogger(__file__)
# Product -> metric -> description
STARROCKS_METRIC_DESCRIPTIONS = {
"freeleaps": {
"daily_registered_users": "Daily registered users count from StarRocks table dws_daily_registered_users",
},
"magicleaps": {
"daily_registered_users": "Daily registered users count from StarRocks table dws_daily_registered_users",
},
}
@router.get("/starrocks/product/{product_id}/metric/{metric_name}/info")
async def get_metric_info(
product_id: str,
metric_name: str
):
"""
Get information about a specific StarRocks-backed metric.
Args:
product_id: Product ID to identify which product's metrics to query.
metric_name: Name of the StarRocks-backed metric.
"""
await module_logger.log_info(
f"Getting StarRocks metric info for metric '{metric_name}' from product '{product_id}'"
)
if product_id not in STARROCKS_METRIC_DESCRIPTIONS:
raise HTTPException(status_code=404, detail=f"Unknown product_id: {product_id}")
product_metrics = STARROCKS_METRIC_DESCRIPTIONS[product_id]
if metric_name not in product_metrics:
raise HTTPException(status_code=404, detail=f"Unknown metric '{metric_name}' for product '{product_id}'")
metric_info = {
"product_id": product_id,
"metric_name": metric_name,
"description": product_metrics[metric_name],
}
return {
"metric_info": metric_info,
"description": f"Information about StarRocks metric '{metric_name}' in product '{product_id}'",
}

View File

@@ -0,0 +1,95 @@
from fastapi import APIRouter
from typing import List, Dict
from pydantic import BaseModel, Field
from datetime import date
from common.log.module_logger import ModuleLogger
from backend.services.registration_analytics_service import RegistrationService
class RegistrationDataPoint(BaseModel):
"""Single data point in registration time series."""
date: str = Field(..., description="Date in YYYY-MM-DD format")
value: int = Field(..., description="Number of registered users")
product_id: str = Field(..., description="Product identifier")
class RegistrationTimeSeriesResponse(BaseModel):
"""Response model for registration time series data."""
metric_name: str = Field(..., description="Name of the queried metric")
data_points: List[RegistrationDataPoint] = Field(..., description="List of data points")
total_points: int = Field(..., description="Total number of data points")
time_range: Dict[str, str] = Field(..., description="Start and end date of the query")
total_registrations: int = Field(..., description="Total number of registrations in the period")
class RegistrationQueryRequest(BaseModel):
"""Request model for registration query."""
product_id: str = Field("freeleaps", description="Product ID to identify which product's data to query")
start_date: str = Field(..., description="Start date in YYYY-MM-DD format")
end_date: str = Field(..., description="End date in YYYY-MM-DD format")
router = APIRouter()
# Initialize service and logger
registration_service = RegistrationService()
module_logger = ModuleLogger(__file__)
@router.post("/starrocks/dru_query", response_model=RegistrationTimeSeriesResponse)
async def dru_query(
request: RegistrationQueryRequest
):
"""
Query registration time-series data.
Returns one date/count point per day for user registrations within the given date range.
"""
await module_logger.log_info(
f"Querying registration data for product '{request.product_id}' from {request.start_date} to {request.end_date}")
# Parse dates - handle both YYYY-M-D and YYYY-MM-DD formats
def parse_date(date_str: str) -> date:
try:
return date.fromisoformat(date_str)
except ValueError:
# Try to parse YYYY-M-D format and convert to YYYY-MM-DD
parts = date_str.split('-')
if len(parts) == 3:
year, month, day = parts
return date(int(year), int(month), int(day))
raise ValueError(f"Invalid date format: {date_str}")
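# Worked examples of the fallback: date.fromisoformat requires zero-padded
# parts, so "2024-1-5" takes the manual branch while "2024-01-05" parses directly:
#   parse_date("2024-01-05") -> date(2024, 1, 5)   (fromisoformat)
#   parse_date("2024-1-5")   -> date(2024, 1, 5)   (split fallback)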
start_date = parse_date(request.start_date)
end_date = parse_date(request.end_date)
# Query the registration data
result = registration_service.get_daily_registered_users(
start_date=start_date,
end_date=end_date,
product_id=request.product_id
)
# Format response
response = RegistrationTimeSeriesResponse(
metric_name="daily_registered_users",
data_points=[
RegistrationDataPoint(
date=date_str,
value=count,
product_id=request.product_id
)
for date_str, count in zip(result.dates, result.counts)
],
total_points=len(result.dates),
time_range={
"start": request.start_date,
"end": request.end_date
},
total_registrations=result.total_registrations
)
await module_logger.log_info(
f"Successfully queried registration data with {len(result.dates)} data points, total registrations: {result.total_registrations}")
return response
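A matching request sketch for the StarRocks-backed endpoint (local service and httpx assumed):

```python
import httpx

payload = {
    "product_id": "freeleaps",
    "start_date": "2024-01-01",
    "end_date": "2024-01-07",
}
resp = httpx.post("http://localhost:8009/api/metrics/starrocks/dru_query", json=payload)
resp.raise_for_status()
series = resp.json()
print(series["total_registrations"], series["total_points"])
```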