feat: update prometheus query logic, use utc timezone and update promql

This commit is contained in:
icecheng 2025-09-25 22:12:32 +08:00
parent eae0255766
commit 8ee447a9fb
5 changed files with 167 additions and 49 deletions

View File

@ -1,6 +1,6 @@
import httpx import httpx
from typing import Dict, Any, Optional, Union from typing import Dict, Any, Optional, Union
from datetime import datetime from datetime import datetime, timezone
import json import json
from fastapi import HTTPException from fastapi import HTTPException
@ -79,8 +79,8 @@ class PrometheusClient:
Args: Args:
query: PromQL query string query: PromQL query string
start: Start time (RFC3339 string or datetime) start: Start time (RFC3339 string or datetime, assumed to be UTC)
end: End time (RFC3339 string or datetime) end: End time (RFC3339 string or datetime, assumed to be UTC)
step: Query resolution step width (e.g., "15s", "1m", "1h") step: Query resolution step width (e.g., "15s", "1m", "1h")
Returns: Returns:
@ -89,8 +89,8 @@ class PrometheusClient:
Example: Example:
result = await client.query_range( result = await client.query_range(
"up{job='prometheus'}", "up{job='prometheus'}",
start=datetime.now() - timedelta(hours=1), start=datetime.now(timezone.utc) - timedelta(hours=1),
end=datetime.now(), end=datetime.now(timezone.utc),
step="1m" step="1m"
) )
""" """
@ -99,21 +99,27 @@ class PrometheusClient:
"step": step "step": step
} }
# Convert datetime to RFC3339 string if needed # Convert datetime to RFC3339 string (assume input is UTC)
if isinstance(start, datetime): if isinstance(start, datetime):
# Assume datetime is already UTC, just ensure it's timezone-aware
if start.tzinfo is None: if start.tzinfo is None:
params["start"] = start.isoformat() + "Z" start_utc = start.replace(tzinfo=timezone.utc)
else: else:
params["start"] = start.isoformat() start_utc = start # Assume it's already UTC
params["start"] = start_utc.isoformat()
else: else:
# Assume string is already in UTC format
params["start"] = start params["start"] = start
if isinstance(end, datetime): if isinstance(end, datetime):
# Assume datetime is already UTC, just ensure it's timezone-aware
if end.tzinfo is None: if end.tzinfo is None:
params["end"] = end.isoformat() + "Z" end_utc = end.replace(tzinfo=timezone.utc)
else: else:
params["end"] = end.isoformat() end_utc = end # Assume it's already UTC
params["end"] = end_utc.isoformat()
else: else:
# Assume string is already in UTC format
params["end"] = end params["end"] = end
return await self.request("query_range", params) return await self.request("query_range", params)

View File

@ -1,5 +1,5 @@
from typing import Dict, List, Any, Optional, Union from typing import Dict, List, Any, Optional, Union
from datetime import datetime, timedelta from datetime import datetime, timedelta, timezone
from fastapi import HTTPException from fastapi import HTTPException
from common.log.module_logger import ModuleLogger from common.log.module_logger import ModuleLogger
@ -24,10 +24,9 @@ class PrometheusMetricsService:
# Just demo, No Usage # Just demo, No Usage
"disk_usage": "100 - ((node_filesystem_avail_bytes{mountpoint=\"/\"} / node_filesystem_size_bytes{mountpoint=\"/\"}) * 100)", "disk_usage": "100 - ((node_filesystem_avail_bytes{mountpoint=\"/\"} / node_filesystem_size_bytes{mountpoint=\"/\"}) * 100)",
# Average response time for notification HTTP requests # Average response time for notification HTTP requests
"latency_ms": "1000*avg(freeleaps_notification_http_request_duration_seconds_sum{handler!=\"none\"} / freeleaps_notification_http_request_duration_seconds_count)", "latency_ms": "(sum(rate(freeleaps_notification_http_request_duration_seconds_sum{handler!~'/api/_/.*'}[30m])) / sum(rate(freeleaps_notification_http_request_duration_seconds_count{handler!~'/api/_/.*'}[30m]))) * 1000",
# Success ratio of HTTP requests, i.e. 1 - 5xx error ratio (stability metric) # Success ratio of HTTP requests, i.e. 1 - 5xx error ratio (stability metric)
"reliability": "1-sum(rate(freeleaps_notification_http_requests_total{status=\"5xx\"}[1m]))", "reliability": "(1 - sum(rate(freeleaps_notification_http_requests_total{status=~'5..'}[30m])) / sum(rate(freeleaps_notification_http_requests_total[30m]))) * 1 or vector(1)",
}, },
"magicleaps": { "magicleaps": {
@ -75,6 +74,34 @@ class PrometheusMetricsService:
""" """
return list(self.METRIC_PROMQL_MAP.keys()) return list(self.METRIC_PROMQL_MAP.keys())
def _parse_utc_datetime(self, time_input: Union[str, datetime]) -> datetime:
    """
    Parse a time input into a timezone-aware datetime normalized to UTC.

    Inputs that carry no timezone information are assumed to already be in
    UTC. Inputs that carry an explicit offset are converted to UTC, so the
    returned value always renders with a "+00:00" offset.

    Args:
        time_input: Time as string (RFC3339 / ISO 8601) or datetime object

    Returns:
        UTC timezone-aware datetime object

    Raises:
        ValueError: If a string input is not a valid ISO 8601 timestamp
    """
    if isinstance(time_input, str):
        text = time_input.strip()
        # datetime.fromisoformat() only understands a trailing "Z" on
        # Python 3.11+, so spell the UTC designator as an explicit offset.
        if text.endswith(('Z', 'z')):
            text = text[:-1] + '+00:00'
        parsed = datetime.fromisoformat(text)
    else:
        parsed = time_input

    if parsed.tzinfo is None:
        # Naive value: assume it is already expressed in UTC
        return parsed.replace(tzinfo=timezone.utc)
    # Aware value: convert (a no-op shift when it is already UTC)
    return parsed.astimezone(timezone.utc)
async def query_metric_by_time_range( async def query_metric_by_time_range(
self, self,
product_id: str, product_id: str,
@ -89,8 +116,8 @@ class PrometheusMetricsService:
Args: Args:
product_id: Product ID to identify which product's metrics to query product_id: Product ID to identify which product's metrics to query
metric_name: Name of the metric to query metric_name: Name of the metric to query
start_time: Start time for the query (RFC3339 string or datetime) start_time: Start time for the query (RFC3339 string or datetime, assumed to be UTC)
end_time: End time for the query (RFC3339 string or datetime) end_time: End time for the query (RFC3339 string or datetime, assumed to be UTC)
step: Query resolution step width (e.g., "1m", "5m", "1h") step: Query resolution step width (e.g., "1m", "5m", "1h")
Returns: Returns:
@ -104,8 +131,8 @@ class PrometheusMetricsService:
result = await service.query_metric_by_time_range( result = await service.query_metric_by_time_range(
"freeleaps", "freeleaps",
"cpu_usage", "cpu_usage",
start_time=datetime.now() - timedelta(hours=1), start_time=datetime.now(timezone.utc) - timedelta(hours=1),
end_time=datetime.now(), end_time=datetime.now(timezone.utc),
step="5m" step="5m"
) )
# Returns: [{"date": "2024-01-01T10:00:00Z", "value": 45.2}, ...] # Returns: [{"date": "2024-01-01T10:00:00Z", "value": 45.2}, ...]
@ -124,16 +151,9 @@ class PrometheusMetricsService:
await self.module_logger.log_error(error_msg) await self.module_logger.log_error(error_msg)
raise HTTPException(status_code=404, detail=error_msg) raise HTTPException(status_code=404, detail=error_msg)
# Parse datetime strings if they are strings # Parse time inputs (assume all inputs are UTC)
if isinstance(start_time, str): start_dt = self._parse_utc_datetime(start_time)
start_dt = datetime.fromisoformat(start_time.replace('Z', '+00:00')) end_dt = self._parse_utc_datetime(end_time)
else:
start_dt = start_time
if isinstance(end_time, str):
end_dt = datetime.fromisoformat(end_time.replace('Z', '+00:00'))
else:
end_dt = end_time
# Validate time range # Validate time range
if start_dt >= end_dt: if start_dt >= end_dt:
@ -166,7 +186,7 @@ class PrometheusMetricsService:
) )
# Parse the result and format it # Parse the result and format it
formatted_data = self._format_query_result(result, metric_name) formatted_data = self._format_query_result(result, metric_name, start_dt, end_dt, step)
await self.module_logger.log_info( await self.module_logger.log_info(
f"Successfully queried metric '{metric_name}' with {len(formatted_data)} data points") f"Successfully queried metric '{metric_name}' with {len(formatted_data)} data points")
@ -176,16 +196,20 @@ class PrometheusMetricsService:
await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}") await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}")
raise raise
def _format_query_result(self, prometheus_result: Dict[str, Any], metric_name: str) -> List[Dict[str, Any]]: def _format_query_result(self, prometheus_result: Dict[str, Any], metric_name: str, start_dt: datetime = None,
end_dt: datetime = None, step: str = "1h") -> List[Dict[str, Any]]:
""" """
Format Prometheus query result into the required format. Format Prometheus query result into the required format and fill missing time points with 0 values.
Args: Args:
prometheus_result: Raw result from Prometheus API prometheus_result: Raw result from Prometheus API
metric_name: Name of the metric being queried metric_name: Name of the metric being queried
start_dt: Start time for filling missing data points
end_dt: End time for filling missing data points
step: Step size for filling missing data points
Returns: Returns:
List of dictionaries with 'date' and 'value' keys List of dictionaries with 'date' and 'value' keys, with missing time points filled with 0 values
""" """
formatted_data = [] formatted_data = []
@ -193,6 +217,14 @@ class PrometheusMetricsService:
data = prometheus_result.get("data", {}) data = prometheus_result.get("data", {})
result_type = data.get("resultType", "") result_type = data.get("resultType", "")
# Check if there are any warnings from Prometheus
warnings = prometheus_result.get("warnings", [])
if warnings:
self.module_logger.log_warning(f"Prometheus warnings: {warnings}")
# First, collect all data points from Prometheus
result_dict = {}
if result_type == "matrix": if result_type == "matrix":
# Handle range query results (matrix) # Handle range query results (matrix)
for series in data.get("result", []): for series in data.get("result", []):
@ -200,15 +232,24 @@ class PrometheusMetricsService:
values = series.get("values", []) values = series.get("values", [])
for timestamp, value in values: for timestamp, value in values:
# Convert Unix timestamp to ISO format # Convert Unix timestamp to UTC ISO format
date_str = datetime.fromtimestamp(timestamp).isoformat() + "Z" date_str = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
formatted_data.append({ # Handle NaN values properly
if value == "NaN" or value is None:
formatted_value = None
else:
try:
formatted_value = float(value)
except (ValueError, TypeError):
formatted_value = None
result_dict[date_str] = {
"date": date_str, "date": date_str,
"value": float(value) if value != "NaN" else None, "value": formatted_value,
"metric": metric_name, "metric": metric_name,
"labels": metric_labels "labels": metric_labels
}) }
elif result_type == "vector": elif result_type == "vector":
# Handle instant query results (vector) # Handle instant query results (vector)
@ -218,17 +259,87 @@ class PrometheusMetricsService:
value = series.get("value", [None, None])[1] value = series.get("value", [None, None])[1]
if timestamp and value: if timestamp and value:
date_str = datetime.fromtimestamp(timestamp).isoformat() + "Z" date_str = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
formatted_data.append({ # Handle NaN values properly
if value == "NaN" or value is None:
formatted_value = None
else:
try:
formatted_value = float(value)
except (ValueError, TypeError):
formatted_value = None
result_dict[date_str] = {
"date": date_str, "date": date_str,
"value": float(value) if value != "NaN" else None, "value": formatted_value,
"metric": metric_name, "metric": metric_name,
"labels": metric_labels "labels": metric_labels
}) }
# Sort by date # If we have start and end times, fill missing time points
formatted_data.sort(key=lambda x: x["date"]) if start_dt and end_dt and step:
formatted_data = self._fill_missing_time_points(result_dict, start_dt, end_dt, step, metric_name)
else:
# Just return the data we have, sorted by date
formatted_data = list(result_dict.values())
formatted_data.sort(key=lambda x: x["date"])
return formatted_data
def _fill_missing_time_points(self, result_dict: Dict[str, Dict[str, Any]], start_dt: datetime, end_dt: datetime,
                              step: str, metric_name: str) -> List[Dict[str, Any]]:
    """
    Build a gap-free time series between start_dt and end_dt.

    Walks the range from start_dt to end_dt (inclusive) in increments of
    `step`. For every tick, the existing data point is looked up in
    result_dict by its UTC ISO 8601 timestamp ("...+00:00"); ticks without a
    data point get a placeholder whose value is None (not 0), so that
    "no data" stays distinguishable from a measured zero.

    Args:
        result_dict: Existing data points keyed by UTC ISO 8601 timestamp
        start_dt: Start time for the range (naive values are treated as UTC)
        end_dt: End time for the range (naive values are treated as UTC)
        step: Step size as a Prometheus duration (e.g., "30s", "5m", "1h",
            "1h30m", "500ms") or a plain number of seconds (e.g., "300")
        metric_name: Name of the metric

    Returns:
        List of data points ordered by time, with dates rendered using a "Z"
        suffix and missing time points filled with None values

    Raises:
        ValueError: If step resolves to a zero or negative duration
    """
    import re  # local import: only needed for step parsing

    unit_durations = {
        "ms": timedelta(milliseconds=1),
        "s": timedelta(seconds=1),
        "m": timedelta(minutes=1),
        "h": timedelta(hours=1),
        "d": timedelta(days=1),
        "w": timedelta(weeks=1),
        "y": timedelta(days=365),
    }

    # Parse step to determine time increment
    step_text = str(step).strip()
    tokens = re.findall(r"(\d+)(ms|[smhdwy])", step_text)
    try:
        if tokens and "".join(number + unit for number, unit in tokens) == step_text:
            time_increment = sum(
                (int(number) * unit_durations[unit] for number, unit in tokens),
                timedelta(),
            )
        else:
            # Prometheus also accepts a bare float number of seconds
            time_increment = timedelta(seconds=float(step_text))
    except (ValueError, OverflowError):
        # Default to 1 hour if step format is not recognized
        time_increment = timedelta(hours=1)

    # A non-positive increment would never advance the loop below
    if time_increment <= timedelta(0):
        raise ValueError(f"Step must be a positive duration, got '{step}'")

    def _as_utc(value: datetime) -> datetime:
        # Keys in result_dict are rendered in UTC, so the ticks must be too
        if value.tzinfo is None:
            return value.replace(tzinfo=timezone.utc)
        return value.astimezone(timezone.utc)

    formatted_data = []

    # Generate complete time range
    current_dt = _as_utc(start_dt)
    end_utc = _as_utc(end_dt)
    while current_dt <= end_utc:
        date_str = current_dt.isoformat()
        existing_point = result_dict.get(date_str)

        if existing_point is not None:
            # Use existing data point (copied so the caller's dict is not mutated)
            point = dict(existing_point)
            point["date"] = str(point.get("date", date_str)).replace('+00:00', 'Z')
            formatted_data.append(point)
        else:
            # Fill missing data point with a None value
            formatted_data.append({
                "date": date_str.replace('+00:00', 'Z'),
                "value": None,
                "metric": metric_name,
                "labels": {}
            })

        current_dt += time_increment

    return formatted_data

View File

@ -22,7 +22,7 @@ class AppSettings(BaseSettings):
STARROCKS_DATABASE: str = "freeleaps" STARROCKS_DATABASE: str = "freeleaps"
# Prometheus settings # Prometheus settings
PROMETHEUS_ENDPOINT: str = "http://localhost:9090" PROMETHEUS_ENDPOINT: str = "http://kube-prometheus-stack-prometheus.freeleaps-monitoring-system:9090"
METRICS_ENABLED: bool = True METRICS_ENABLED: bool = True
PROBES_ENABLED: bool = True PROBES_ENABLED: bool = True

View File

@ -16,6 +16,6 @@ LOG_BASE_PATH=./logs
BACKEND_LOG_FILE_NAME=metrics BACKEND_LOG_FILE_NAME=metrics
APPLICATION_ACTIVITY_LOG=metrics-activity APPLICATION_ACTIVITY_LOG=metrics-activity
PROMETHEUS_ENDPOINT=http://localhost:9090 PROMETHEUS_ENDPOINT=http://localhost:57828
METRICS_ENABLED=True METRICS_ENABLED=True

View File

@ -1,3 +1,5 @@
import sys
import aiomysql import aiomysql
from typing import Optional from typing import Optional
from common.config.app_settings import app_settings from common.config.app_settings import app_settings
@ -28,13 +30,12 @@ class DatabaseConnectionPool:
echo=False # Set to True for SQL query logging echo=False # Set to True for SQL query logging
) )
await self.module_logger.log_info( await self.module_logger.log_info(
f"Database connection pool created successfully for StarRocks at " f"Database connection pool created successfully for StarRocks"
f"{app_settings.STARROCKS_HOST}:{app_settings.STARROCKS_PORT}"
) )
return True return True
except Exception as e: except Exception as e:
await self.module_logger.log_error(f"Failed to create database connection pool: {e}") await self.module_logger.log_error(f"Failed to create database connection pool: {e}")
return False sys.exit(1)
async def close_pool(self): async def close_pool(self):
"""Close database connection pool""" """Close database connection pool"""