# Extraction metadata (not code): 380 lines, 16 KiB, Python
from typing import Dict, List, Any, Optional, Union
|
|
from datetime import datetime, timedelta, timezone
|
|
from fastapi import HTTPException
|
|
|
|
from common.log.module_logger import ModuleLogger
|
|
from ..infra.external_service.prometheus_client import PrometheusClient
|
|
|
|
|
|
class PrometheusMetricsService:
    """
    Service class for querying Prometheus metrics with predefined PromQL queries.

    This service provides a high-level interface for querying metrics data
    using predefined PromQL queries mapped to metric names.
    """

    # Maps product ID -> {metric name -> PromQL query}.
    # NOTE: the value type is a nested mapping, so the correct annotation is
    # Dict[str, Dict[str, str]] (the previous Dict[str, str] was wrong).
    METRIC_PROMQL_MAP: Dict[str, Dict[str, str]] = {
        "freeleaps": {
            # Just demo, No Usage
            "cpu_usage": "100 - (avg by (instance) (irate(node_cpu_seconds_total{mode=\"idle\"}[5m])) * 100)",
            # Just demo, No Usage
            "memory_usage": "100 - ((node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100)",
            # Just demo, No Usage
            "disk_usage": "100 - ((node_filesystem_avail_bytes{mountpoint=\"/\"} / node_filesystem_size_bytes{mountpoint=\"/\"}) * 100)",
            # Average response time for notification HTTP requests
            "latency_ms": "(sum(rate(freeleaps_notification_http_request_duration_seconds_sum{handler!~'/api/_/.*'}[30m])) / sum(rate(freeleaps_notification_http_request_duration_seconds_count{handler!~'/api/_/.*'}[30m]))) * 1000",
            # Error rate for 5xx HTTP status codes (stability metric)
            "reliability": "(1 - sum(rate(freeleaps_notification_http_requests_total{status=~'5..'}[30m])) / sum(rate(freeleaps_notification_http_requests_total[30m]))) * 1 or vector(1)",
        },
        "magicleaps": {
        },
    }
|
def __init__(self, prometheus_endpoint: Optional[str] = None):
|
|
"""
|
|
Initialize PrometheusMetricsService.
|
|
|
|
Args:
|
|
prometheus_endpoint: Prometheus server endpoint. If None, uses default from settings.
|
|
"""
|
|
self.module_logger = ModuleLogger(__file__)
|
|
self.prometheus_client = PrometheusClient(prometheus_endpoint)
|
|
|
|
def get_available_metrics(self, product_id: Optional[str] = None) -> List[str]:
|
|
"""
|
|
Get list of available metric names that have predefined PromQL queries.
|
|
|
|
Args:
|
|
product_id: Optional product ID to filter metrics. If None, returns all metrics from all products.
|
|
|
|
Returns:
|
|
List of available metric names
|
|
"""
|
|
if product_id:
|
|
if product_id in self.METRIC_PROMQL_MAP:
|
|
return list(self.METRIC_PROMQL_MAP[product_id].keys())
|
|
else:
|
|
return []
|
|
else:
|
|
# Return all metrics from all products
|
|
all_metrics = []
|
|
for product_metrics in self.METRIC_PROMQL_MAP.values():
|
|
all_metrics.extend(product_metrics.keys())
|
|
return all_metrics
|
|
|
|
def get_available_products(self) -> List[str]:
|
|
"""
|
|
Get list of available product IDs.
|
|
|
|
Returns:
|
|
List of available product IDs
|
|
"""
|
|
return list(self.METRIC_PROMQL_MAP.keys())
|
|
|
|
def _parse_utc_datetime(self, time_input: Union[str, datetime]) -> datetime:
|
|
"""
|
|
Parse time input and ensure it's a UTC timezone-aware datetime.
|
|
Assumes all input times are already in UTC.
|
|
|
|
Args:
|
|
time_input: Time as string (RFC3339) or datetime object, assumed to be UTC
|
|
|
|
Returns:
|
|
UTC timezone-aware datetime object
|
|
"""
|
|
if isinstance(time_input, str):
|
|
# Handle RFC3339 format, assume UTC
|
|
if time_input.endswith('Z'):
|
|
return datetime.fromisoformat(time_input.replace('Z', '+00:00'))
|
|
elif '+' in time_input or time_input.count('-') > 2:
|
|
return datetime.fromisoformat(time_input)
|
|
else:
|
|
# Assume UTC if no timezone specified
|
|
return datetime.fromisoformat(time_input + '+00:00')
|
|
else:
|
|
# Assume datetime is already UTC, just ensure it's timezone-aware
|
|
if time_input.tzinfo is None:
|
|
return time_input.replace(tzinfo=timezone.utc)
|
|
else:
|
|
# Already timezone-aware, assume it's UTC
|
|
return time_input
|
|
|
|
async def query_metric_by_time_range(
|
|
self,
|
|
product_id: str,
|
|
metric_name: str,
|
|
start_time: Union[str, datetime],
|
|
end_time: Union[str, datetime],
|
|
step: str = "1m"
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Query metric data for a specific time range.
|
|
|
|
Args:
|
|
product_id: Product ID to identify which product's metrics to query
|
|
metric_name: Name of the metric to query
|
|
start_time: Start time for the query (RFC3339 string or datetime, assumed to be UTC)
|
|
end_time: End time for the query (RFC3339 string or datetime, assumed to be UTC)
|
|
step: Query resolution step width (e.g., "1m", "5m", "1h")
|
|
|
|
Returns:
|
|
List of dictionaries with 'date' and 'value' keys
|
|
|
|
Raises:
|
|
ValueError: If product_id or metric_name is not found in the PromQL mapping
|
|
Exception: If Prometheus query fails
|
|
|
|
Example:
|
|
result = await service.query_metric_by_time_range(
|
|
"freeleaps",
|
|
"cpu_usage",
|
|
start_time=datetime.now(timezone.utc) - timedelta(hours=1),
|
|
end_time=datetime.now(timezone.utc),
|
|
step="5m"
|
|
)
|
|
# Returns: [{"date": "2024-01-01T10:00:00Z", "value": 45.2}, ...]
|
|
"""
|
|
# Check if product_id exists in the mapping
|
|
if product_id not in self.METRIC_PROMQL_MAP:
|
|
available_products = ", ".join(self.get_available_products())
|
|
error_msg = f"Product '{product_id}' not found in PromQL mapping. Available products: {available_products}"
|
|
await self.module_logger.log_error(error_msg)
|
|
raise HTTPException(status_code=404, detail=error_msg)
|
|
|
|
# Check if metric name exists in the product's mapping
|
|
if metric_name not in self.METRIC_PROMQL_MAP[product_id]:
|
|
available_metrics = ", ".join(self.get_available_metrics(product_id))
|
|
error_msg = f"Metric '{metric_name}' not found in product '{product_id}' PromQL mapping. Available metrics: {available_metrics}"
|
|
await self.module_logger.log_error(error_msg)
|
|
raise HTTPException(status_code=404, detail=error_msg)
|
|
|
|
# Parse time inputs (assume all inputs are UTC)
|
|
start_dt = self._parse_utc_datetime(start_time)
|
|
end_dt = self._parse_utc_datetime(end_time)
|
|
|
|
# Validate time range
|
|
if start_dt >= end_dt:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Start time must be before end time"
|
|
)
|
|
|
|
# Check time range is not too large (max 7 days for detailed queries)
|
|
time_diff = end_dt - start_dt
|
|
if time_diff > timedelta(days=7):
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Time range cannot exceed 7 days for detailed queries"
|
|
)
|
|
|
|
# Get the PromQL query for the metric
|
|
promql_query = self.METRIC_PROMQL_MAP[product_id][metric_name]
|
|
|
|
try:
|
|
await self.module_logger.log_info(
|
|
f"Querying metric '{metric_name}' from product '{product_id}' with PromQL: {promql_query}")
|
|
|
|
# Execute the range query
|
|
result = await self.prometheus_client.query_range(
|
|
query=promql_query,
|
|
start=start_dt,
|
|
end=end_dt,
|
|
step=step
|
|
)
|
|
|
|
# Parse the result and format it
|
|
formatted_data = self._format_query_result(result, metric_name, start_dt, end_dt, step)
|
|
|
|
await self.module_logger.log_info(
|
|
f"Successfully queried metric '{metric_name}' with {len(formatted_data)} data points")
|
|
return formatted_data
|
|
|
|
except Exception as e:
|
|
await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}")
|
|
raise
|
|
|
|
def _format_query_result(self, prometheus_result: Dict[str, Any], metric_name: str, start_dt: datetime = None,
|
|
end_dt: datetime = None, step: str = "1h") -> List[Dict[str, Any]]:
|
|
"""
|
|
Format Prometheus query result into the required format and fill missing time points with 0 values.
|
|
|
|
Args:
|
|
prometheus_result: Raw result from Prometheus API
|
|
metric_name: Name of the metric being queried
|
|
start_dt: Start time for filling missing data points
|
|
end_dt: End time for filling missing data points
|
|
step: Step size for filling missing data points
|
|
|
|
Returns:
|
|
List of dictionaries with 'date' and 'value' keys, with missing time points filled with 0 values
|
|
"""
|
|
formatted_data = []
|
|
|
|
# Extract data from Prometheus result
|
|
data = prometheus_result.get("data", {})
|
|
result_type = data.get("resultType", "")
|
|
|
|
# Check if there are any warnings from Prometheus
|
|
warnings = prometheus_result.get("warnings", [])
|
|
if warnings:
|
|
self.module_logger.log_warning(f"Prometheus warnings: {warnings}")
|
|
|
|
# First, collect all data points from Prometheus
|
|
result_dict = {}
|
|
|
|
if result_type == "matrix":
|
|
# Handle range query results (matrix)
|
|
for series in data.get("result", []):
|
|
metric_labels = series.get("metric", {})
|
|
values = series.get("values", [])
|
|
|
|
for timestamp, value in values:
|
|
# Convert Unix timestamp to UTC ISO format
|
|
date_str = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
|
|
|
|
# Handle NaN values properly
|
|
if value == "NaN" or value is None:
|
|
formatted_value = None
|
|
else:
|
|
try:
|
|
formatted_value = float(value)
|
|
except (ValueError, TypeError):
|
|
formatted_value = None
|
|
|
|
result_dict[date_str] = {
|
|
"date": date_str,
|
|
"value": formatted_value,
|
|
"metric": metric_name,
|
|
"labels": metric_labels
|
|
}
|
|
|
|
elif result_type == "vector":
|
|
# Handle instant query results (vector)
|
|
for series in data.get("result", []):
|
|
metric_labels = series.get("metric", {})
|
|
timestamp = series.get("value", [None, None])[0]
|
|
value = series.get("value", [None, None])[1]
|
|
|
|
if timestamp and value:
|
|
date_str = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
|
|
|
|
# Handle NaN values properly
|
|
if value == "NaN" or value is None:
|
|
formatted_value = None
|
|
else:
|
|
try:
|
|
formatted_value = float(value)
|
|
except (ValueError, TypeError):
|
|
formatted_value = None
|
|
|
|
result_dict[date_str] = {
|
|
"date": date_str,
|
|
"value": formatted_value,
|
|
"metric": metric_name,
|
|
"labels": metric_labels
|
|
}
|
|
|
|
# If we have start and end times, fill missing time points
|
|
if start_dt and end_dt and step:
|
|
formatted_data = self._fill_missing_time_points(result_dict, start_dt, end_dt, step, metric_name)
|
|
else:
|
|
# Just return the data we have, sorted by date
|
|
formatted_data = list(result_dict.values())
|
|
formatted_data.sort(key=lambda x: x["date"])
|
|
|
|
return formatted_data
|
|
|
|
def _fill_missing_time_points(self, result_dict: Dict[str, Dict[str, Any]], start_dt: datetime, end_dt: datetime,
|
|
step: str, metric_name: str) -> List[Dict[str, Any]]:
|
|
"""
|
|
Fill missing time points in the time series data with 0 values.
|
|
|
|
Args:
|
|
result_dict: Dictionary of existing data points
|
|
start_dt: Start time for the range
|
|
end_dt: End time for the range
|
|
step: Step size (e.g., "1h", "1m", "5m")
|
|
metric_name: Name of the metric
|
|
|
|
Returns:
|
|
List of data points with missing time points filled with 0 values
|
|
"""
|
|
formatted_data = []
|
|
|
|
# Parse step to determine time increment
|
|
if step.endswith('h'):
|
|
hours = int(step[:-1])
|
|
time_increment = timedelta(hours=hours)
|
|
elif step.endswith('m'):
|
|
minutes = int(step[:-1])
|
|
time_increment = timedelta(minutes=minutes)
|
|
elif step.endswith('d'):
|
|
minutes = int(step[:-1])
|
|
time_increment = timedelta(days=minutes)
|
|
elif step.endswith('s'):
|
|
seconds = int(step[:-1])
|
|
time_increment = timedelta(seconds=seconds)
|
|
else:
|
|
# Default to 1 hour if step format is not recognized
|
|
time_increment = timedelta(hours=1)
|
|
|
|
# Generate complete time range
|
|
current_dt = start_dt
|
|
while current_dt <= end_dt:
|
|
date_str = current_dt.isoformat()
|
|
|
|
if date_str in result_dict:
|
|
# Use existing data point
|
|
result_dict[date_str]["date"] = result_dict[date_str]["date"].replace('+00:00', 'Z')
|
|
formatted_data.append(result_dict[date_str])
|
|
else:
|
|
# Fill missing data point with 0 value
|
|
formatted_data.append({
|
|
"date": date_str.replace('+00:00', 'Z'),
|
|
"value": None,
|
|
"metric": metric_name,
|
|
"labels": {}
|
|
})
|
|
|
|
current_dt += time_increment
|
|
|
|
return formatted_data
|
|
|
|
async def get_metric_info(self, product_id: str, metric_name: str) -> Dict[str, Any]:
|
|
"""
|
|
Get information about a specific metric including its PromQL query.
|
|
|
|
Args:
|
|
product_id: Product ID to identify which product's metrics to query
|
|
metric_name: Name of the metric
|
|
|
|
Returns:
|
|
Dictionary containing metric information
|
|
|
|
Raises:
|
|
ValueError: If product_id or metric_name is not found in the PromQL mapping
|
|
"""
|
|
# Check if product_id exists in the mapping
|
|
if product_id not in self.METRIC_PROMQL_MAP:
|
|
available_products = ", ".join(self.get_available_products())
|
|
error_msg = f"Product '{product_id}' not found in PromQL mapping. Available products: {available_products}"
|
|
await self.module_logger.log_error(error_msg)
|
|
raise HTTPException(status_code=404, detail=error_msg)
|
|
|
|
# Check if metric name exists in the product's mapping
|
|
if metric_name not in self.METRIC_PROMQL_MAP[product_id]:
|
|
available_metrics = ", ".join(self.get_available_metrics(product_id))
|
|
error_msg = f"Metric '{metric_name}' not found in product '{product_id}' PromQL mapping. Available metrics: {available_metrics}"
|
|
await self.module_logger.log_error(error_msg)
|
|
raise HTTPException(status_code=404, detail=error_msg)
|
|
|
|
return {
|
|
"product_id": product_id,
|
|
"metric_name": metric_name,
|
|
"promql_query": self.METRIC_PROMQL_MAP[product_id][metric_name],
|
|
"description": f"PromQL query for {metric_name} metric in product {product_id}"
|
|
}
|