feat: add support for Prometheus-related metrics

This commit is contained in:
icecheng 2025-09-16 17:04:53 +08:00
parent 7027e8c3f7
commit d008c1a8bc
12 changed files with 549 additions and 77 deletions

View File

@ -0,0 +1,119 @@
import httpx
from typing import Dict, Any, Optional, Union
from datetime import datetime
import json
from fastapi import HTTPException
from common.config.app_settings import app_settings
from common.log.module_logger import ModuleLogger
class PrometheusClient:
    """
    Async Prometheus client for querying metrics data using PromQL.

    This client provides methods to:
    - Query data over a time range using PromQL expressions

    Notes:
        A fresh ``httpx.AsyncClient`` is created per request, which is fine
        for low request volumes but re-does connection setup every call.
    """

    def __init__(self, endpoint: Optional[str] = None, timeout: float = 30.0):
        """
        Initialize Prometheus client.

        Args:
            endpoint: Prometheus server endpoint. If None, uses
                PROMETHEUS_ENDPOINT from settings.
            timeout: HTTP request timeout in seconds. Defaults to 30.0,
                matching the previously hard-coded value.
        """
        self.module_logger = ModuleLogger(__file__)
        self.endpoint = endpoint or app_settings.PROMETHEUS_ENDPOINT
        self.base_url = f"{self.endpoint.rstrip('/')}/api/v1"
        self.timeout = timeout

    @staticmethod
    def _format_time(value: Union[str, datetime]) -> str:
        """
        Return an RFC3339 time string for *value*.

        Strings pass through unchanged; aware datetimes keep their offset.
        Naive datetimes are suffixed with "Z".

        NOTE(review): a naive datetime is labeled as UTC here — confirm
        callers pass UTC (a bare ``datetime.now()`` would be local time).
        """
        if isinstance(value, datetime):
            if value.tzinfo is None:
                return value.isoformat() + "Z"
            return value.isoformat()
        return value

    async def request(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Make HTTP request to Prometheus API.

        Args:
            endpoint: API endpoint path
            params: Query parameters

        Returns:
            JSON response data

        Raises:
            HTTPException: 400 if Prometheus reports an error or returns
                invalid JSON, 502 if the HTTP request itself fails.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        try:
            await self.module_logger.log_info(f"Making request to Prometheus: {url} with params: {params}")
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.get(url, params=params)
                response.raise_for_status()
                data = response.json()
                # Prometheus wraps results in {"status": ..., "data": ...};
                # anything other than "success" is surfaced as a client error.
                if data.get("status") != "success":
                    error_msg = data.get('error', 'Unknown error')
                    await self.module_logger.log_error(f"Prometheus API error: {error_msg}")
                    raise HTTPException(status_code=400, detail=f"Prometheus API error: {error_msg}")
                return data
        except httpx.HTTPError as e:
            await self.module_logger.log_error(f"HTTP error querying Prometheus: {e}")
            raise HTTPException(status_code=502, detail=f"Failed to connect to Prometheus: {str(e)}")
        except json.JSONDecodeError as e:
            await self.module_logger.log_error(f"Invalid JSON response from Prometheus: {e}")
            raise HTTPException(status_code=400, detail=f"Invalid response from Prometheus: {str(e)}")

    async def query_range(
        self,
        query: str,
        start: Union[str, datetime],
        end: Union[str, datetime],
        step: str = "15s"
    ) -> Dict[str, Any]:
        """
        Execute a PromQL range query.

        Args:
            query: PromQL query string
            start: Start time (RFC3339 string or datetime)
            end: End time (RFC3339 string or datetime)
            step: Query resolution step width (e.g., "15s", "1m", "1h")

        Returns:
            Range query result data

        Example:
            result = await client.query_range(
                "up{job='prometheus'}",
                start=datetime.now() - timedelta(hours=1),
                end=datetime.now(),
                step="1m"
            )
        """
        params = {
            "query": query,
            "step": step,
            # Shared helper replaces the duplicated datetime/str handling.
            "start": self._format_time(start),
            "end": self._format_time(end),
        }
        return await self.request("query_range", params)

View File

@ -0,0 +1,263 @@
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Union

from fastapi import HTTPException

from common.log.module_logger import ModuleLogger
from ..infra.external_service.prometheus_client import PrometheusClient
class PrometheusMetricsService:
    """
    Service class for querying Prometheus metrics with predefined PromQL queries.

    This service provides a high-level interface for querying metrics data
    using predefined PromQL queries mapped to metric names. Queries are
    looked up per product, then executed as range queries against Prometheus.
    """

    # Maps product ID -> (metric name -> PromQL query).
    # Annotation fixed: the value type is a nested mapping, not a plain str.
    METRIC_PROMQL_MAP: Dict[str, Dict[str, str]] = {
        "freeleaps": {
            # Just demo, No Usage
            "cpu_usage": "100 - (avg by (instance) (irate(node_cpu_seconds_total{mode=\"idle\"}[5m])) * 100)",
            # Just demo, No Usage
            "memory_usage": "100 - ((node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100)",
            # Just demo, No Usage
            "disk_usage": "100 - ((node_filesystem_avail_bytes{mountpoint=\"/\"} / node_filesystem_size_bytes{mountpoint=\"/\"}) * 100)",
        },
        "magicleaps": {
        }
    }

    def __init__(self, prometheus_endpoint: Optional[str] = None):
        """
        Initialize PrometheusMetricsService.

        Args:
            prometheus_endpoint: Prometheus server endpoint. If None, uses
                default from settings.
        """
        self.module_logger = ModuleLogger(__file__)
        self.prometheus_client = PrometheusClient(prometheus_endpoint)

    def get_available_metrics(self, product_id: Optional[str] = None) -> List[str]:
        """
        Get list of available metric names that have predefined PromQL queries.

        Args:
            product_id: Optional product ID to filter metrics. If None,
                returns all metrics from all products. An unknown product_id
                yields an empty list rather than an error.

        Returns:
            List of available metric names
        """
        if product_id:
            return list(self.METRIC_PROMQL_MAP.get(product_id, {}).keys())
        # No filter: flatten metric names across every product.
        all_metrics: List[str] = []
        for product_metrics in self.METRIC_PROMQL_MAP.values():
            all_metrics.extend(product_metrics.keys())
        return all_metrics

    def get_available_products(self) -> List[str]:
        """
        Get list of available product IDs.

        Returns:
            List of available product IDs
        """
        return list(self.METRIC_PROMQL_MAP.keys())

    async def _ensure_metric_defined(self, product_id: str, metric_name: str) -> None:
        """
        Validate that product_id and metric_name exist in METRIC_PROMQL_MAP.

        Deduplicates the identical 404 checks previously copied in both
        query_metric_by_time_range and get_metric_info; error messages are
        unchanged.

        Raises:
            HTTPException: 404 with a descriptive message, after logging.
        """
        if product_id not in self.METRIC_PROMQL_MAP:
            available_products = ", ".join(self.get_available_products())
            error_msg = f"Product '{product_id}' not found in PromQL mapping. Available products: {available_products}"
            await self.module_logger.log_error(error_msg)
            raise HTTPException(status_code=404, detail=error_msg)
        if metric_name not in self.METRIC_PROMQL_MAP[product_id]:
            available_metrics = ", ".join(self.get_available_metrics(product_id))
            error_msg = f"Metric '{metric_name}' not found in product '{product_id}' PromQL mapping. Available metrics: {available_metrics}"
            await self.module_logger.log_error(error_msg)
            raise HTTPException(status_code=404, detail=error_msg)

    @staticmethod
    def _coerce_datetime(value: Union[str, datetime]) -> datetime:
        """
        Parse an ISO/RFC3339 string (or pass through a datetime) and return
        a timezone-aware datetime.

        Naive datetimes are treated as UTC — the Prometheus client already
        labels naive datetimes with a 'Z' suffix. Making both endpoints
        aware also fixes a latent TypeError: comparing an aware datetime
        (parsed from a '...Z' string) with a naive one raised at runtime.
        """
        if isinstance(value, str):
            parsed = datetime.fromisoformat(value.replace('Z', '+00:00'))
        else:
            parsed = value
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    @staticmethod
    def _format_timestamp(ts: float) -> str:
        """
        Convert a Unix timestamp to an RFC3339 UTC string ending in 'Z'.

        Bug fix: the previous code used datetime.fromtimestamp() (local
        time) and appended 'Z', mislabelling local time as UTC on any host
        not running in UTC.
        """
        return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat().replace("+00:00", "Z")

    async def query_metric_by_time_range(
        self,
        product_id: str,
        metric_name: str,
        start_time: Union[str, datetime],
        end_time: Union[str, datetime],
        step: str = "1m"
    ) -> List[Dict[str, Any]]:
        """
        Query metric data for a specific time range.

        Args:
            product_id: Product ID to identify which product's metrics to query
            metric_name: Name of the metric to query
            start_time: Start time for the query (RFC3339 string or datetime)
            end_time: End time for the query (RFC3339 string or datetime)
            step: Query resolution step width (e.g., "1m", "5m", "1h")

        Returns:
            List of dictionaries with 'date' and 'value' keys

        Raises:
            HTTPException: 404 if the product/metric is not mapped, 400 for
                an invalid or oversized time range.
            Exception: Re-raised (after logging) if the Prometheus query fails.

        Example:
            result = await service.query_metric_by_time_range(
                "freeleaps",
                "cpu_usage",
                start_time=datetime.now() - timedelta(hours=1),
                end_time=datetime.now(),
                step="5m"
            )
            # Returns: [{"date": "2024-01-01T10:00:00Z", "value": 45.2}, ...]
        """
        await self._ensure_metric_defined(product_id, metric_name)

        start_dt = self._coerce_datetime(start_time)
        end_dt = self._coerce_datetime(end_time)

        if start_dt >= end_dt:
            raise HTTPException(
                status_code=400,
                detail="Start time must be before end time"
            )
        # Cap the window so detailed queries stay cheap (max 7 days).
        if end_dt - start_dt > timedelta(days=7):
            raise HTTPException(
                status_code=400,
                detail="Time range cannot exceed 7 days for detailed queries"
            )

        promql_query = self.METRIC_PROMQL_MAP[product_id][metric_name]
        try:
            await self.module_logger.log_info(
                f"Querying metric '{metric_name}' from product '{product_id}' with PromQL: {promql_query}")
            result = await self.prometheus_client.query_range(
                query=promql_query,
                start=start_dt,
                end=end_dt,
                step=step
            )
            formatted_data = self._format_query_result(result, metric_name)
            await self.module_logger.log_info(
                f"Successfully queried metric '{metric_name}' with {len(formatted_data)} data points")
            return formatted_data
        except Exception as e:
            await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}")
            raise

    def _format_query_result(self, prometheus_result: Dict[str, Any], metric_name: str) -> List[Dict[str, Any]]:
        """
        Format Prometheus query result into the required format.

        Args:
            prometheus_result: Raw result from Prometheus API
            metric_name: Name of the metric being queried

        Returns:
            List of dictionaries with 'date', 'value', 'metric' and 'labels'
            keys, sorted by date.
        """
        formatted_data: List[Dict[str, Any]] = []
        data = prometheus_result.get("data", {})
        result_type = data.get("resultType", "")

        if result_type == "matrix":
            # Range query results: each series carries a list of samples.
            for series in data.get("result", []):
                metric_labels = series.get("metric", {})
                for ts, value in series.get("values", []):
                    formatted_data.append({
                        "date": self._format_timestamp(ts),
                        # Prometheus encodes missing samples as the string "NaN".
                        "value": float(value) if value != "NaN" else None,
                        "metric": metric_name,
                        "labels": metric_labels
                    })
        elif result_type == "vector":
            # Instant query results: one [timestamp, value] pair per series.
            for series in data.get("result", []):
                metric_labels = series.get("metric", {})
                sample = series.get("value", [None, None])
                ts, value = sample[0], sample[1]
                # Explicit None checks: a timestamp of 0 is valid data and
                # was previously dropped by the truthiness test.
                if ts is not None and value is not None:
                    formatted_data.append({
                        "date": self._format_timestamp(ts),
                        "value": float(value) if value != "NaN" else None,
                        "metric": metric_name,
                        "labels": metric_labels
                    })

        # ISO-8601 strings sort chronologically.
        formatted_data.sort(key=lambda point: point["date"])
        return formatted_data

    async def get_metric_info(self, product_id: str, metric_name: str) -> Dict[str, Any]:
        """
        Get information about a specific metric including its PromQL query.

        Args:
            product_id: Product ID to identify which product's metrics to query
            metric_name: Name of the metric

        Returns:
            Dictionary containing metric information

        Raises:
            HTTPException: 404 if the product or metric is not mapped.
        """
        await self._ensure_metric_defined(product_id, metric_name)
        return {
            "product_id": product_id,
            "metric_name": metric_name,
            "promql_query": self.METRIC_PROMQL_MAP[product_id][metric_name],
            "description": f"PromQL query for {metric_name} metric in product {product_id}"
        }

View File

@ -5,8 +5,8 @@ from typing import Optional
class AppSettings(BaseSettings):
# Log settings
LOG_BASE_PATH: str = "./logs"
BACKEND_LOG_FILE_NAME: str = "metrics"
APPLICATION_ACTIVITY_LOG: str = "metrics-activity"
BACKEND_LOG_FILE_NAME: str = "freeleaps-metrics"
APPLICATION_ACTIVITY_LOG: str = "freeleaps-metrics-activity"
# StarRocks database settings
STARROCKS_HOST: str = "freeleaps-starrocks-cluster-fe-service.freeleaps-data-platform.svc"

View File

@ -1,5 +1,5 @@
from .base_logger import LoggerBase
from app.common.config.app_settings import app_settings
from common.config.app_settings import app_settings
class ApplicationLogger(LoggerBase):
def __init__(self, application_activities: dict[str, any] = {}) -> None:

View File

@ -15,3 +15,5 @@ STARROCKS_DATABASE=freeleaps
LOG_BASE_PATH=./logs
BACKEND_LOG_FILE_NAME=metrics
APPLICATION_ACTIVITY_LOG=metrics-activity
PROMETHEUS_ENDPOINT=http://localhost:9090

View File

@ -1,69 +0,0 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from prometheus_fastapi_instrumentator import Instrumentator
from common.config.app_settings import site_settings
from loguru import logger
import os
def create_app() -> FastAPI:
    """
    Create and configure the FastAPI application
    """
    application = FastAPI(
        title="Metrics Service API",
        description="Metrics Service for Freeleaps Platform",
        version="1.0.0",
        docs_url="/docs",
        redoc_url="/redoc"
    )

    # Permissive CORS: any origin, method and header, credentials allowed.
    application.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Configure file-based log sinks before serving traffic.
    setup_logging()

    # Expose default HTTP metrics via the Prometheus instrumentator.
    Instrumentator().instrument(application).expose(application)

    # Routers are wired elsewhere:
    # from webapi.routes import health, api
    # app.include_router(health.router, prefix="/health", tags=["health"])
    # app.include_router(api.router, prefix="/api/metrics", tags=["metrics"])
    # Note: Registration router is included in main.py
    return application
def setup_logging():
    """
    Setup logging configuration
    """
    # Ensure the log directory exists before adding file sinks.
    log_dir = site_settings.LOG_BASE_PATH
    os.makedirs(log_dir, exist_ok=True)

    # Shared sink options: daily rotation, 30-day retention, INFO level.
    shared = dict(
        rotation="1 day",
        retention="30 days",
        level="INFO",
        format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} | {message}",
    )

    # General backend log.
    logger.add(f"{log_dir}/{site_settings.BACKEND_LOG_FILE_NAME}.log", **shared)

    # Activity log: restricted to INFO records only.
    logger.add(
        f"{log_dir}/{site_settings.APPLICATION_ACTIVITY_LOG}.log",
        filter=lambda record: record["level"].name == "INFO",
        **shared,
    )

View File

@ -6,7 +6,7 @@ def register(app):
instrumentator = (
Instrumentator().instrument(
app,
metric_namespace="freeleaps-auth",
metric_namespace="freeleaps-mertics",
metric_subsystem=app_settings.APP_NAME)
)

View File

@ -1,5 +1,7 @@
from fastapi import APIRouter
from webapi.routes.metrics.registration_metrics import router
from webapi.routes.metrics.registration_metrics import router as registration_router
from webapi.routes.prometheus_metrics import api_router as prometheus_metrics_router
api_router = APIRouter()
api_router.include_router(router,prefix="/metrics", tags=["metrics"])
router = APIRouter()
router.include_router(registration_router, prefix="/metrics", tags=["registration-metrics"])
router.include_router(prometheus_metrics_router, prefix="/metrics", tags=["prometheus-metrics"])

View File

@ -0,0 +1,9 @@
from fastapi import APIRouter
from .available_metrics import router as available_metrics_router
from .metrics_query import router as metrics_query_router
from .metric_info import router as metric_info_router
api_router = APIRouter()

# Register every Prometheus metrics sub-router under a shared tag,
# preserving the original inclusion order.
for _sub_router in (available_metrics_router, metrics_query_router, metric_info_router):
    api_router.include_router(_sub_router, tags=["prometheus-metrics"])

View File

@ -0,0 +1,31 @@
from fastapi import APIRouter
from common.log.module_logger import ModuleLogger
from backend.services.prometheus_metrics_service import PrometheusMetricsService
router = APIRouter()
# Initialize service and logger
# NOTE(review): module-level singletons, created at import time and shared
# by every request handled in this module; PrometheusMetricsService() eagerly
# constructs its PrometheusClient.
prometheus_service = PrometheusMetricsService()
module_logger = ModuleLogger(__file__)
@router.get("/prometheus/product/{product_id}/available-metrics")
async def get_available_metrics(product_id: str):
    """
    Get list of available metrics for a specific product.

    Args:
        product_id: Product ID to get metrics for (required).

    Returns a list of metric names that have predefined PromQL queries for the specified product.
    """
    await module_logger.log_info(f"Getting available metrics list for product_id: {product_id}")

    # An unknown product yields an empty list from the service.
    metric_names = prometheus_service.get_available_metrics(product_id)

    payload = {
        "product_id": product_id,
        "available_metrics": metric_names,
        "total_count": len(metric_names),
        "description": f"List of metrics with predefined PromQL queries for product '{product_id}'",
    }
    return payload

View File

@ -0,0 +1,32 @@
from fastapi import APIRouter, HTTPException
from common.log.module_logger import ModuleLogger
from backend.services.prometheus_metrics_service import PrometheusMetricsService
router = APIRouter()
# Initialize service and logger
# NOTE(review): module-level singletons created at import time, shared by
# every request handled in this module.
prometheus_service = PrometheusMetricsService()
module_logger = ModuleLogger(__file__)
@router.get("/prometheus/product/{product_id}/metric/{metric_name}/info")
async def get_metric_info(
    product_id: str,
    metric_name: str
):
    """
    Get information about a specific metric including its PromQL query.

    Args:
        product_id: Product ID to identify which product's metrics to query
        metric_name: Name of the metric to get information for
    """
    await module_logger.log_info(f"Getting info for metric '{metric_name}' from product '{product_id}'")

    # The service raises HTTPException(404) for an unknown product or metric.
    info = await prometheus_service.get_metric_info(product_id, metric_name)

    response = {
        "metric_info": info,
        "description": f"Information about metric '{metric_name}' in product '{product_id}'",
    }
    return response

View File

@ -0,0 +1,83 @@
from fastapi import APIRouter
from typing import Optional, List, Dict, Any
from pydantic import BaseModel, Field
from common.log.module_logger import ModuleLogger
from backend.services.prometheus_metrics_service import PrometheusMetricsService
class MetricDataPoint(BaseModel):
    """Single data point in a time series."""
    # Timestamp string as produced by the metrics service (ISO 8601).
    date: str = Field(..., description="Timestamp in ISO format")
    # None when Prometheus reported "NaN" for this sample.
    value: Optional[float] = Field(None, description="Metric value")
    labels: Optional[Dict[str, str]] = Field(None, description="Metric labels")
class MetricTimeSeriesResponse(BaseModel):
    """Response model for metric time series data."""
    metric_name: str = Field(..., description="Name of the queried metric")
    data_points: List[MetricDataPoint] = Field(..., description="List of data points")
    total_points: int = Field(..., description="Total number of data points")
    # Populated with keys "start" and "end" echoing the request times.
    time_range: Dict[str, str] = Field(..., description="Start and end time of the query")
    # Echoes the request's step; default mirrors MetricQueryRequest.step.
    step: str = Field("1h", description="Query resolution step")
class MetricQueryRequest(BaseModel):
    """Request model for metric query."""
    product_id: str = Field(..., description="Product ID to identify which product's metrics to query")
    metric_name: str = Field(..., description="Name of the metric to query")
    start_time: str = Field(..., description="Start time in ISO format or RFC3339")
    end_time: str = Field(..., description="End time in ISO format or RFC3339")
    # NOTE: the default here is "1h", while the service-level default is
    # "1m"; this request default is what the endpoint actually uses.
    step: str = Field("1h", description="Query resolution step (e.g., 1m, 5m, 1h)")
router = APIRouter()
# Initialize service and logger
# NOTE(review): module-level singletons created at import time, shared by
# every request handled in this module.
prometheus_service = PrometheusMetricsService()
module_logger = ModuleLogger(__file__)
@router.post("/prometheus/metrics_query", response_model=MetricTimeSeriesResponse)
async def metrics_query(
    request: MetricQueryRequest
):
    """
    Query metrics time series data.

    Returns XY curve data (time series) for the specified metric within the given time range.
    """
    await module_logger.log_info(
        f"Querying metric '{request.metric_name}' from product '{request.product_id}' from {request.start_time} to {request.end_time}")

    # Delegate to the service; it validates product/metric and time range.
    raw_points = await prometheus_service.query_metric_by_time_range(
        product_id=request.product_id,
        metric_name=request.metric_name,
        start_time=request.start_time,
        end_time=request.end_time,
        step=request.step,
    )

    # Wrap the raw dicts in typed response models.
    series = [
        MetricDataPoint(date=p["date"], value=p["value"], labels=p["labels"])
        for p in raw_points
    ]
    result = MetricTimeSeriesResponse(
        metric_name=request.metric_name,
        data_points=series,
        total_points=len(series),
        time_range={"start": request.start_time, "end": request.end_time},
        step=request.step,
    )

    await module_logger.log_info(
        f"Successfully queried metric '{request.metric_name}' with {len(raw_points)} data points")
    return result