class PrometheusClient:
    """
    Async client for the Prometheus HTTP API.

    This client provides methods to:
    - Issue GET requests against the ``/api/v1`` endpoints (``request``)
    - Run PromQL range queries (``query_range``)

    Failures are raised as ``fastapi.HTTPException``.
    """

    def __init__(self, endpoint: Optional[str] = None):
        """
        Initialize Prometheus client.

        Args:
            endpoint: Prometheus server endpoint. If None, uses PROMETHEUS_ENDPOINT from settings.
        """
        self.module_logger = ModuleLogger(__file__)
        self.endpoint = endpoint or app_settings.PROMETHEUS_ENDPOINT
        # Every call is issued relative to the versioned API root.
        self.base_url = f"{self.endpoint.rstrip('/')}/api/v1"

    @staticmethod
    def _to_rfc3339(moment: Union[str, datetime]) -> str:
        """
        Render a query boundary as the string sent to Prometheus.

        Strings are passed through untouched. Naive datetimes are treated as
        UTC and get a ``Z`` suffix; aware datetimes keep their own offset.
        """
        if not isinstance(moment, datetime):
            return moment
        rendered = moment.isoformat()
        return rendered + "Z" if moment.tzinfo is None else rendered

    @staticmethod
    def _error_detail(response: Any) -> str:
        """Return the ``error`` field of a JSON error body, or a generic text."""
        try:
            payload = response.json()
        except ValueError:
            return "Unknown error"
        if isinstance(payload, dict):
            return payload.get("error", "Unknown error")
        return "Unknown error"

    async def request(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Make HTTP request to Prometheus API.

        Args:
            endpoint: API endpoint path relative to ``/api/v1`` (a leading slash is optional)
            params: Query parameters

        Returns:
            The decoded JSON response object whose ``status`` is ``"success"``

        Raises:
            HTTPException: 400 when Prometheus rejects the request (HTTP
                400/422) or answers with a non-success ``status``; 502 when
                Prometheus cannot be reached, answers with any other error
                status, or returns a body that is not a JSON object.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        try:
            await self.module_logger.log_info(f"Making request to Prometheus: {url} with params: {params}")
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.get(url, params=params)
                response.raise_for_status()
                data = response.json()
        except httpx.HTTPStatusError as e:
            # raise_for_status() converts error replies into exceptions before
            # the body is inspected. Prometheus is documented to answer
            # rejected queries with 400/422 plus a JSON error body, so surface
            # that message instead of reporting a connection failure.
            if e.response.status_code in (400, 422):
                error_msg = self._error_detail(e.response)
                await self.module_logger.log_error(f"Prometheus API error: {error_msg}")
                raise HTTPException(status_code=400, detail=f"Prometheus API error: {error_msg}") from e
            await self.module_logger.log_error(f"HTTP error querying Prometheus: {e}")
            raise HTTPException(status_code=502, detail=f"Failed to connect to Prometheus: {str(e)}") from e
        except httpx.HTTPError as e:
            await self.module_logger.log_error(f"HTTP error querying Prometheus: {e}")
            raise HTTPException(status_code=502, detail=f"Failed to connect to Prometheus: {str(e)}") from e
        except json.JSONDecodeError as e:
            # An undecodable body is an upstream fault, not a bad client request.
            await self.module_logger.log_error(f"Invalid JSON response from Prometheus: {e}")
            raise HTTPException(status_code=502, detail=f"Invalid response from Prometheus: {str(e)}") from e

        if not isinstance(data, dict):
            # Valid JSON but not an object: there is no "status" field to read.
            await self.module_logger.log_error("Invalid JSON response from Prometheus: expected a JSON object")
            raise HTTPException(
                status_code=502,
                detail="Invalid response from Prometheus: expected a JSON object"
            )

        if data.get("status") != "success":
            error_msg = data.get('error', 'Unknown error')
            await self.module_logger.log_error(f"Prometheus API error: {error_msg}")
            raise HTTPException(status_code=400, detail=f"Prometheus API error: {error_msg}")

        return data

    async def query_range(
        self,
        query: str,
        start: Union[str, datetime],
        end: Union[str, datetime],
        step: str = "15s"
    ) -> Dict[str, Any]:
        """
        Execute a PromQL range query.

        Args:
            query: PromQL query string
            start: Start time (RFC3339 string or datetime)
            end: End time (RFC3339 string or datetime)
            step: Query resolution step width (e.g., "15s", "1m", "1h")

        Returns:
            Range query result data

        Raises:
            HTTPException: Propagated from ``request``.

        Example:
            result = await client.query_range(
                "up{job='prometheus'}",
                start=datetime.now() - timedelta(hours=1),
                end=datetime.now(),
                step="1m"
            )
        """
        params = {
            "query": query,
            "step": step,
            "start": self._to_rfc3339(start),
            "end": self._to_rfc3339(end),
        }

        return await self.request("query_range", params)
class PrometheusMetricsService:
    """
    Service class for querying Prometheus metrics with predefined PromQL queries.

    Callers name a product and a metric; the service looks up the PromQL
    expression registered for that pair in ``METRIC_PROMQL_MAP``, runs it
    through ``PrometheusClient`` and flattens the response into a list of
    data points.
    """

    # product ID -> {metric name -> PromQL expression}
    METRIC_PROMQL_MAP: Dict[str, Dict[str, str]] = {
        "freeleaps": {
            # Just demo, No Usage
            "cpu_usage": "100 - (avg by (instance) (irate(node_cpu_seconds_total{mode=\"idle\"}[5m])) * 100)",
            # Just demo, No Usage
            "memory_usage": "100 - ((node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100)",
            # Just demo, No Usage
            "disk_usage": "100 - ((node_filesystem_avail_bytes{mountpoint=\"/\"} / node_filesystem_size_bytes{mountpoint=\"/\"}) * 100)",
        },
        "magicleaps": {

        }
    }

    # Longest span a single range query may cover.
    MAX_QUERY_RANGE: timedelta = timedelta(days=7)

    def __init__(self, prometheus_endpoint: Optional[str] = None):
        """
        Initialize PrometheusMetricsService.

        Args:
            prometheus_endpoint: Prometheus server endpoint. If None, uses default from settings.
        """
        self.module_logger = ModuleLogger(__file__)
        self.prometheus_client = PrometheusClient(prometheus_endpoint)

    def get_available_metrics(self, product_id: Optional[str] = None) -> List[str]:
        """
        Get list of available metric names that have predefined PromQL queries.

        Args:
            product_id: Optional product ID to filter metrics. If None (or
                empty), returns the metrics of all products concatenated.

        Returns:
            List of available metric names; empty for an unknown product.
        """
        if product_id:
            return list(self.METRIC_PROMQL_MAP.get(product_id, {}))

        return [
            metric_name
            for product_metrics in self.METRIC_PROMQL_MAP.values()
            for metric_name in product_metrics
        ]

    def get_available_products(self) -> List[str]:
        """
        Get list of available product IDs.

        Returns:
            List of available product IDs
        """
        return list(self.METRIC_PROMQL_MAP)

    async def _resolve_promql(self, product_id: str, metric_name: str) -> str:
        """
        Look up the PromQL expression for a product/metric pair.

        Raises:
            HTTPException: 404 (after logging) when the product or the metric
                is not present in ``METRIC_PROMQL_MAP``.
        """
        product_metrics = self.METRIC_PROMQL_MAP.get(product_id)
        if product_metrics is None:
            available_products = ", ".join(self.get_available_products())
            error_msg = f"Product '{product_id}' not found in PromQL mapping. Available products: {available_products}"
            await self.module_logger.log_error(error_msg)
            raise HTTPException(status_code=404, detail=error_msg)

        if metric_name not in product_metrics:
            available_metrics = ", ".join(product_metrics)
            error_msg = f"Metric '{metric_name}' not found in product '{product_id}' PromQL mapping. Available metrics: {available_metrics}"
            await self.module_logger.log_error(error_msg)
            raise HTTPException(status_code=404, detail=error_msg)

        return product_metrics[metric_name]

    @staticmethod
    def _coerce_utc(value: Union[str, datetime], field_name: str) -> datetime:
        """
        Turn a query boundary into a timezone-aware datetime.

        Strings are parsed as ISO 8601 (a ``Z`` suffix is accepted). Naive
        values are taken to be UTC, matching how ``PrometheusClient`` renders
        naive datetimes, so the two boundaries can always be compared.

        Raises:
            HTTPException: 400 when a string cannot be parsed.
        """
        from datetime import timezone

        if isinstance(value, str):
            try:
                parsed = datetime.fromisoformat(value.replace('Z', '+00:00'))
            except ValueError as exc:
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid {field_name}: '{value}' is not a valid ISO 8601 timestamp"
                ) from exc
        else:
            parsed = value

        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    async def query_metric_by_time_range(
        self,
        product_id: str,
        metric_name: str,
        start_time: Union[str, datetime],
        end_time: Union[str, datetime],
        step: str = "1m"
    ) -> List[Dict[str, Any]]:
        """
        Query metric data for a specific time range.

        Args:
            product_id: Product ID to identify which product's metrics to query
            metric_name: Name of the metric to query
            start_time: Start time for the query (RFC3339 string or datetime)
            end_time: End time for the query (RFC3339 string or datetime)
            step: Query resolution step width (e.g., "1m", "5m", "1h")

        Returns:
            List of dictionaries with 'date', 'value', 'metric' and 'labels' keys

        Raises:
            HTTPException: 404 if product_id or metric_name is not found in
                the PromQL mapping; 400 if a time string cannot be parsed, the
                start is not before the end, or the range exceeds
                ``MAX_QUERY_RANGE``.
            Exception: Whatever the Prometheus query raises is logged and re-raised.

        Example:
            result = await service.query_metric_by_time_range(
                "freeleaps",
                "cpu_usage",
                start_time=datetime.now() - timedelta(hours=1),
                end_time=datetime.now(),
                step="5m"
            )
            # Returns: [{"date": "2024-01-01T10:00:00Z", "value": 45.2, ...}, ...]
        """
        promql_query = await self._resolve_promql(product_id, metric_name)

        # Normalise both boundaries so naive and aware inputs can be mixed.
        start_dt = self._coerce_utc(start_time, "start_time")
        end_dt = self._coerce_utc(end_time, "end_time")

        # Validate time range
        if start_dt >= end_dt:
            raise HTTPException(
                status_code=400,
                detail="Start time must be before end time"
            )

        # Check time range is not too large
        if end_dt - start_dt > self.MAX_QUERY_RANGE:
            raise HTTPException(
                status_code=400,
                detail=f"Time range cannot exceed {self.MAX_QUERY_RANGE.days} days for detailed queries"
            )

        try:
            await self.module_logger.log_info(
                f"Querying metric '{metric_name}' from product '{product_id}' with PromQL: {promql_query}")

            # Execute the range query
            result = await self.prometheus_client.query_range(
                query=promql_query,
                start=start_dt,
                end=end_dt,
                step=step
            )

            # Parse the result and format it
            formatted_data = self._format_query_result(result, metric_name)

            await self.module_logger.log_info(
                f"Successfully queried metric '{metric_name}' with {len(formatted_data)} data points")
            return formatted_data

        except Exception as e:
            await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}")
            raise

    def _format_query_result(self, prometheus_result: Dict[str, Any], metric_name: str) -> List[Dict[str, Any]]:
        """
        Format Prometheus query result into the required format.

        Args:
            prometheus_result: Raw result from Prometheus API
            metric_name: Name of the metric being queried

        Returns:
            List of dictionaries with 'date' (UTC, ISO format with a 'Z'
            suffix), 'value' (float, or None when the sample is not a finite
            number), 'metric' and 'labels' keys, ordered by timestamp. Result
            types other than "matrix" and "vector" produce an empty list.
        """
        import math
        from datetime import timezone

        data = prometheus_result.get("data", {})
        result_type = data.get("resultType", "")

        # Collect (timestamp, raw value, labels) triples from either shape.
        samples = []
        if result_type == "matrix":
            # Handle range query results (matrix)
            for series in data.get("result", []):
                labels = series.get("metric", {})
                samples.extend((ts, raw, labels) for ts, raw in series.get("values", []))
        elif result_type == "vector":
            # Handle instant query results (vector)
            for series in data.get("result", []):
                ts, raw = series.get("value", [None, None])
                if ts is not None and raw is not None:
                    samples.append((ts, raw, series.get("metric", {})))

        # Order on the numeric timestamp: ISO strings with and without a
        # fractional part do not sort chronologically as text.
        samples.sort(key=lambda sample: float(sample[0]))

        formatted_data = []
        for ts, raw, labels in samples:
            number = float(raw)
            # Convert explicitly in UTC so the trailing "Z" is truthful
            # whatever the server's local timezone is.
            moment = datetime.fromtimestamp(float(ts), tz=timezone.utc)
            formatted_data.append({
                "date": moment.replace(tzinfo=None).isoformat() + "Z",
                # NaN and +/-Inf have no JSON number form; report them as None.
                "value": number if math.isfinite(number) else None,
                "metric": metric_name,
                "labels": labels
            })

        return formatted_data

    async def get_metric_info(self, product_id: str, metric_name: str) -> Dict[str, Any]:
        """
        Get information about a specific metric including its PromQL query.

        Args:
            product_id: Product ID to identify which product's metrics to query
            metric_name: Name of the metric

        Returns:
            Dictionary containing metric information

        Raises:
            HTTPException: 404 if product_id or metric_name is not found in the PromQL mapping
        """
        promql_query = await self._resolve_promql(product_id, metric_name)

        return {
            "product_id": product_id,
            "metric_name": metric_name,
            "promql_query": promql_query,
            "description": f"PromQL query for {metric_name} metric in product {product_id}"
        }
a/apps/metrics/common/log/application_logger.py b/apps/metrics/common/log/application_logger.py index 896c044..67ec321 100644 --- a/apps/metrics/common/log/application_logger.py +++ b/apps/metrics/common/log/application_logger.py @@ -1,5 +1,5 @@ from .base_logger import LoggerBase -from app.common.config.app_settings import app_settings +from common.config.app_settings import app_settings class ApplicationLogger(LoggerBase): def __init__(self, application_activities: dict[str, any] = {}) -> None: diff --git a/apps/metrics/local.env b/apps/metrics/local.env index 616515e..4b601db 100644 --- a/apps/metrics/local.env +++ b/apps/metrics/local.env @@ -14,4 +14,6 @@ STARROCKS_DATABASE=freeleaps # log settings LOG_BASE_PATH=./logs BACKEND_LOG_FILE_NAME=metrics -APPLICATION_ACTIVITY_LOG=metrics-activity \ No newline at end of file +APPLICATION_ACTIVITY_LOG=metrics-activity + +PROMETHEUS_ENDPOINT=http://localhost:9090 \ No newline at end of file diff --git a/apps/metrics/webapi/bootstrap/app_factory.py b/apps/metrics/webapi/bootstrap/app_factory.py deleted file mode 100644 index feee790..0000000 --- a/apps/metrics/webapi/bootstrap/app_factory.py +++ /dev/null @@ -1,69 +0,0 @@ -from fastapi import FastAPI -from fastapi.middleware.cors import CORSMiddleware -from prometheus_fastapi_instrumentator import Instrumentator -from common.config.app_settings import site_settings -from loguru import logger -import os - - -def create_app() -> FastAPI: - """ - Create and configure the FastAPI application - """ - app = FastAPI( - title="Metrics Service API", - description="Metrics Service for Freeleaps Platform", - version="1.0.0", - docs_url="/docs", - redoc_url="/redoc" - ) - - # Add CORS middleware - app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], - ) - - # Setup logging - setup_logging() - - # Setup Prometheus metrics - Instrumentator().instrument(app).expose(app) - - # Include routers - # from 
webapi.routes import health, api - # app.include_router(health.router, prefix="/health", tags=["health"]) - # app.include_router(api.router, prefix="/api/metrics", tags=["metrics"]) - # Note: Registration router is included in main.py - - return app - - -def setup_logging(): - """ - Setup logging configuration - """ - # Create log directory if it doesn't exist - log_dir = site_settings.LOG_BASE_PATH - os.makedirs(log_dir, exist_ok=True) - - # Configure loguru - logger.add( - f"{log_dir}/{site_settings.BACKEND_LOG_FILE_NAME}.log", - rotation="1 day", - retention="30 days", - level="INFO", - format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} | {message}" - ) - - logger.add( - f"{log_dir}/{site_settings.APPLICATION_ACTIVITY_LOG}.log", - rotation="1 day", - retention="30 days", - level="INFO", - format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} | {message}", - filter=lambda record: record["level"].name == "INFO" - ) diff --git a/apps/metrics/webapi/providers/metrics.py b/apps/metrics/webapi/providers/metrics.py index 593369d..08811ba 100644 --- a/apps/metrics/webapi/providers/metrics.py +++ b/apps/metrics/webapi/providers/metrics.py @@ -6,7 +6,7 @@ def register(app): instrumentator = ( Instrumentator().instrument( app, - metric_namespace="freeleaps-auth", + metric_namespace="freeleaps-mertics", metric_subsystem=app_settings.APP_NAME) ) diff --git a/apps/metrics/webapi/routes/metrics/__init__.py b/apps/metrics/webapi/routes/metrics/__init__.py index e7012cd..5e05f09 100644 --- a/apps/metrics/webapi/routes/metrics/__init__.py +++ b/apps/metrics/webapi/routes/metrics/__init__.py @@ -1,5 +1,7 @@ from fastapi import APIRouter -from webapi.routes.metrics.registration_metrics import router +from webapi.routes.metrics.registration_metrics import router as registration_router +from webapi.routes.prometheus_metrics import api_router as prometheus_metrics_router -api_router = APIRouter() 
@router.get("/prometheus/product/{product_id}/available-metrics")
async def get_available_metrics(product_id: str):
    """
    List the metrics that can be queried for one product.

    Args:
        product_id: Product ID to get metrics for (required).

    Returns the product ID, the metric names that have a predefined PromQL
    query for it, how many there are, and a short description.
    """
    await module_logger.log_info(f"Getting available metrics list for product_id: {product_id}")

    # The service returns an empty list for a product it does not know.
    metric_names = prometheus_service.get_available_metrics(product_id)

    payload = dict(
        product_id=product_id,
        available_metrics=metric_names,
        total_count=len(metric_names),
        description=f"List of metrics with predefined PromQL queries for product '{product_id}'",
    )
    return payload
class MetricDataPoint(BaseModel):
    """Single data point in a time series."""
    date: str = Field(..., description="Timestamp in ISO format")
    value: Optional[float] = Field(None, description="Metric value")
    labels: Optional[Dict[str, str]] = Field(None, description="Metric labels")


class MetricTimeSeriesResponse(BaseModel):
    """Response model for metric time series data."""
    metric_name: str = Field(..., description="Name of the queried metric")
    data_points: List[MetricDataPoint] = Field(..., description="List of data points")
    total_points: int = Field(..., description="Total number of data points")
    time_range: Dict[str, str] = Field(..., description="Start and end time of the query")
    step: str = Field("1h", description="Query resolution step")


class MetricQueryRequest(BaseModel):
    """Request model for metric query."""
    product_id: str = Field(..., description="Product ID to identify which product's metrics to query")
    metric_name: str = Field(..., description="Name of the metric to query")
    start_time: str = Field(..., description="Start time in ISO format or RFC3339")
    end_time: str = Field(..., description="End time in ISO format or RFC3339")
    step: str = Field("1h", description="Query resolution step (e.g., 1m, 5m, 1h)")


router = APIRouter()

# Module-level service and logger used by the route below
prometheus_service = PrometheusMetricsService()
module_logger = ModuleLogger(__file__)


@router.post("/prometheus/metrics_query", response_model=MetricTimeSeriesResponse)
async def metrics_query(request: MetricQueryRequest):
    """
    Query metrics time series data.

    Runs the predefined query for the requested product/metric over the given
    time range and returns the resulting data points together with the
    requested range and step.
    """
    metric_name = request.metric_name
    # The response echoes the boundaries exactly as the caller sent them.
    requested_range = {
        "start": request.start_time,
        "end": request.end_time
    }

    await module_logger.log_info(
        f"Querying metric '{request.metric_name}' from product '{request.product_id}' from {request.start_time} to {request.end_time}")

    rows = await prometheus_service.query_metric_by_time_range(
        product_id=request.product_id,
        metric_name=metric_name,
        start_time=request.start_time,
        end_time=request.end_time,
        step=request.step
    )

    # Keep only the fields the response model declares for a data point.
    series = []
    for row in rows:
        series.append(MetricDataPoint(date=row["date"], value=row["value"], labels=row["labels"]))

    result = MetricTimeSeriesResponse(
        metric_name=metric_name,
        data_points=series,
        total_points=len(rows),
        time_range=requested_range,
        step=request.step
    )

    await module_logger.log_info(
        f"Successfully queried metric '{request.metric_name}' with {len(rows)} data points")
    return result