Merge pull request 'feature/icecheng/metric-fix' (#72) from feature/icecheng/metric-fix into master

Reviewed-on: freeleaps/freeleaps-service-hub#72
Committed by icecheng on 2025-09-25 14:14:06 +00:00 (commit d84be8de15)
8 changed files with 252 additions and 90 deletions

File 1 of 8: Prometheus API client (PrometheusClient)

@@ -1,6 +1,6 @@
 import httpx
 from typing import Dict, Any, Optional, Union
-from datetime import datetime
+from datetime import datetime, timezone
 import json
 from fastapi import HTTPException
@@ -79,8 +79,8 @@ class PrometheusClient:
         Args:
             query: PromQL query string
-            start: Start time (RFC3339 string or datetime)
-            end: End time (RFC3339 string or datetime)
+            start: Start time (RFC3339 string or datetime, assumed to be UTC)
+            end: End time (RFC3339 string or datetime, assumed to be UTC)
             step: Query resolution step width (e.g., "15s", "1m", "1h")

         Returns:
@@ -89,8 +89,8 @@ class PrometheusClient:
         Example:
             result = await client.query_range(
                 "up{job='prometheus'}",
-                start=datetime.now() - timedelta(hours=1),
-                end=datetime.now(),
+                start=datetime.now(timezone.utc) - timedelta(hours=1),
+                end=datetime.now(timezone.utc),
                 step="1m"
             )
         """
@@ -99,21 +99,27 @@ class PrometheusClient:
             "step": step
         }

-        # Convert datetime to RFC3339 string if needed
+        # Convert datetime to RFC3339 string (assume input is UTC)
         if isinstance(start, datetime):
+            # Assume datetime is already UTC, just ensure it's timezone-aware
             if start.tzinfo is None:
-                params["start"] = start.isoformat() + "Z"
+                start_utc = start.replace(tzinfo=timezone.utc)
             else:
-                params["start"] = start.isoformat()
+                start_utc = start  # Assume it's already UTC
+            params["start"] = start_utc.isoformat()
         else:
+            # Assume string is already in UTC format
             params["start"] = start

         if isinstance(end, datetime):
+            # Assume datetime is already UTC, just ensure it's timezone-aware
             if end.tzinfo is None:
-                params["end"] = end.isoformat() + "Z"
+                end_utc = end.replace(tzinfo=timezone.utc)
             else:
-                params["end"] = end.isoformat()
+                end_utc = end  # Assume it's already UTC
+            params["end"] = end_utc.isoformat()
         else:
+            # Assume string is already in UTC format
             params["end"] = end

         return await self.request("query_range", params)
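Side note on the naive-datetime branch: datetime.replace(tzinfo=timezone.utc) attaches the UTC zone without shifting the wall-clock value, which is exactly the "naive means UTC" assumption documented above. A minimal sketch (values illustrative):

    from datetime import datetime, timezone

    naive = datetime(2025, 9, 25, 14, 0, 0)      # naive, assumed to already be UTC wall-clock
    aware = naive.replace(tzinfo=timezone.utc)   # attach UTC without converting
    print(aware.isoformat())                     # -> 2025-09-25T14:00:00+00:00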

File 2 of 8: StarRocks client (StarRocksClient)

@@ -1,59 +1,27 @@
-import pymysql
+import aiomysql
 from typing import List, Dict, Any, Optional
 from datetime import date
 from common.log.module_logger import ModuleLogger
-from common.config.app_settings import app_settings
+from webapi.providers.database import db_pool


 class StarRocksClient:
-    """StarRocks database client for querying user registration data"""
+    """StarRocks database client for querying user registration data using connection pool"""

     def __init__(self):
-        self.host = app_settings.STARROCKS_HOST
-        self.port = app_settings.STARROCKS_PORT
-        self.user = app_settings.STARROCKS_USER
-        self.password = app_settings.STARROCKS_PASSWORD
-        self.database = app_settings.STARROCKS_DATABASE
-        self.connection = None
         self.module_logger = ModuleLogger(__file__)

-    async def connect(self) -> bool:
-        """Establish connection to StarRocks database"""
-        try:
-            self.connection = pymysql.connect(
-                host=self.host,
-                port=self.port,
-                user=self.user,
-                password=self.password,
-                database=self.database,
-                charset='utf8mb4',
-                autocommit=True
-            )
-            await self.module_logger.log_info(f"Successfully connected to StarRocks at {self.host}:{self.port}")
-            return True
-        except Exception as e:
-            await self.module_logger.log_error(f"Failed to connect to StarRocks: {e}")
-            return False
-
-    async def disconnect(self):
-        """Close database connection"""
-        if self.connection:
-            self.connection.close()
-            self.connection = None
-            await self.module_logger.log_info("Disconnected from StarRocks")
-
     async def execute_query(self, query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
-        """Execute SQL query and return results"""
-        if not self.connection:
-            if not await self.connect():
-                raise Exception("Failed to connect to StarRocks database")
-
+        """Execute SQL query and return results using connection pool"""
         try:
-            with self.connection.cursor(pymysql.cursors.DictCursor) as cursor:
-                cursor.execute(query, params)
-                results = cursor.fetchall()
-                await self.module_logger.log_info(f"Query executed successfully, returned {len(results)} rows")
-                return results
+            # Get connection from pool
+            pool = db_pool.get_pool()
+            async with pool.acquire() as conn:
+                async with conn.cursor(aiomysql.DictCursor) as cursor:
+                    await cursor.execute(query, params)
+                    results = await cursor.fetchall()
+                    await self.module_logger.log_info(f"Query executed successfully, returned {len(results)} rows")
+                    return results
         except Exception as e:
             await self.module_logger.log_error(f"Query execution failed: {e}")
             raise e
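A minimal usage sketch of the pooled client (the SQL, table and column names are illustrative assumptions; db_pool must have been initialized by the startup hook added in the new database provider below):

    client = StarRocksClient()
    rows = await client.execute_query(
        "SELECT reg_date, COUNT(*) AS registrations "
        "FROM users WHERE reg_date >= %s GROUP BY reg_date",
        ("2025-09-01",),
    )
    # aiomysql.DictCursor returns dict rows, e.g.
    # [{"reg_date": date(2025, 9, 1), "registrations": 42}, ...]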

File 3 of 8: metrics service (PrometheusMetricsService)

@@ -1,5 +1,5 @@
 from typing import Dict, List, Any, Optional, Union
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from fastapi import HTTPException
 from common.log.module_logger import ModuleLogger
@@ -24,10 +24,9 @@ class PrometheusMetricsService:
             # Just demo, No Usage
             "disk_usage": "100 - ((node_filesystem_avail_bytes{mountpoint=\"/\"} / node_filesystem_size_bytes{mountpoint=\"/\"}) * 100)",
             # Average response time for notification HTTP requests
-            "latency_ms": "1000*avg(freeleaps_notification_http_request_duration_seconds_sum{handler!=\"none\"} / freeleaps_notification_http_request_duration_seconds_count)",
+            "latency_ms": "(sum(rate(freeleaps_notification_http_request_duration_seconds_sum{handler!~'/api/_/.*'}[30m])) / sum(rate(freeleaps_notification_http_request_duration_seconds_count{handler!~'/api/_/.*'}[30m]))) * 1000",
             # Error rate for 5xx HTTP status codes (stability metric)
-            "reliability": "1-sum(rate(freeleaps_notification_http_requests_total{status=\"5xx\"}[1m]))",
+            "reliability": "(1 - sum(rate(freeleaps_notification_http_requests_total{status=~'5..'}[30m])) / sum(rate(freeleaps_notification_http_requests_total[30m]))) * 1 or vector(1)",
         },

         "magicleaps": {
@@ -75,6 +74,34 @@ class PrometheusMetricsService:
         """
         return list(self.METRIC_PROMQL_MAP.keys())

+    def _parse_utc_datetime(self, time_input: Union[str, datetime]) -> datetime:
+        """
+        Parse time input and ensure it's a UTC timezone-aware datetime.
+        Assumes all input times are already in UTC.
+
+        Args:
+            time_input: Time as string (RFC3339) or datetime object, assumed to be UTC
+
+        Returns:
+            UTC timezone-aware datetime object
+        """
+        if isinstance(time_input, str):
+            # Handle RFC3339 format, assume UTC
+            if time_input.endswith('Z'):
+                return datetime.fromisoformat(time_input.replace('Z', '+00:00'))
+            elif '+' in time_input or time_input.count('-') > 2:
+                return datetime.fromisoformat(time_input)
+            else:
+                # Assume UTC if no timezone specified
+                return datetime.fromisoformat(time_input + '+00:00')
+        else:
+            # Assume datetime is already UTC, just ensure it's timezone-aware
+            if time_input.tzinfo is None:
+                return time_input.replace(tzinfo=timezone.utc)
+            else:
+                # Already timezone-aware, assume it's UTC
+                return time_input
+
     async def query_metric_by_time_range(
         self,
         product_id: str,
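How the new helper resolves each input shape, as a sketch (svc is an assumed PrometheusMetricsService instance; results shown as comments):

    svc._parse_utc_datetime("2024-01-01T10:00:00Z")       # 'Z' rewritten to '+00:00', parsed as aware UTC
    svc._parse_utc_datetime("2024-01-01T10:00:00+00:00")  # explicit offset present, parsed as-is
    svc._parse_utc_datetime("2024-01-01T10:00:00")        # only two '-' and no '+': '+00:00' appended
    svc._parse_utc_datetime(datetime(2024, 1, 1, 10))     # naive datetime: tzinfo=timezone.utc attached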
@@ -89,8 +116,8 @@ class PrometheusMetricsService:
         Args:
             product_id: Product ID to identify which product's metrics to query
             metric_name: Name of the metric to query
-            start_time: Start time for the query (RFC3339 string or datetime)
-            end_time: End time for the query (RFC3339 string or datetime)
+            start_time: Start time for the query (RFC3339 string or datetime, assumed to be UTC)
+            end_time: End time for the query (RFC3339 string or datetime, assumed to be UTC)
             step: Query resolution step width (e.g., "1m", "5m", "1h")

         Returns:
@@ -104,8 +131,8 @@ class PrometheusMetricsService:
             result = await service.query_metric_by_time_range(
                 "freeleaps",
                 "cpu_usage",
-                start_time=datetime.now() - timedelta(hours=1),
-                end_time=datetime.now(),
+                start_time=datetime.now(timezone.utc) - timedelta(hours=1),
+                end_time=datetime.now(timezone.utc),
                 step="5m"
             )
             # Returns: [{"date": "2024-01-01T10:00:00Z", "value": 45.2}, ...]
@@ -124,16 +151,9 @@ class PrometheusMetricsService:
             await self.module_logger.log_error(error_msg)
             raise HTTPException(status_code=404, detail=error_msg)

-        # Parse datetime strings if they are strings
-        if isinstance(start_time, str):
-            start_dt = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
-        else:
-            start_dt = start_time
-
-        if isinstance(end_time, str):
-            end_dt = datetime.fromisoformat(end_time.replace('Z', '+00:00'))
-        else:
-            end_dt = end_time
+        # Parse time inputs (assume all inputs are UTC)
+        start_dt = self._parse_utc_datetime(start_time)
+        end_dt = self._parse_utc_datetime(end_time)

         # Validate time range
         if start_dt >= end_dt:
@@ -166,7 +186,7 @@ class PrometheusMetricsService:
             )

             # Parse the result and format it
-            formatted_data = self._format_query_result(result, metric_name)
+            formatted_data = self._format_query_result(result, metric_name, start_dt, end_dt, step)

             await self.module_logger.log_info(
                 f"Successfully queried metric '{metric_name}' with {len(formatted_data)} data points")
@@ -176,16 +196,20 @@ class PrometheusMetricsService:
             await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}")
             raise

-    def _format_query_result(self, prometheus_result: Dict[str, Any], metric_name: str) -> List[Dict[str, Any]]:
+    def _format_query_result(self, prometheus_result: Dict[str, Any], metric_name: str, start_dt: datetime = None,
+                             end_dt: datetime = None, step: str = "1h") -> List[Dict[str, Any]]:
         """
-        Format Prometheus query result into the required format.
+        Format Prometheus query result into the required format and fill in missing time points.

         Args:
             prometheus_result: Raw result from Prometheus API
             metric_name: Name of the metric being queried
+            start_dt: Start time for filling missing data points
+            end_dt: End time for filling missing data points
+            step: Step size for filling missing data points

         Returns:
-            List of dictionaries with 'date' and 'value' keys
+            List of dictionaries with 'date' and 'value' keys, with missing time points filled with null (None) values
         """
         formatted_data = []
@@ -193,6 +217,14 @@ class PrometheusMetricsService:
         data = prometheus_result.get("data", {})
         result_type = data.get("resultType", "")

+        # Check if there are any warnings from Prometheus
+        warnings = prometheus_result.get("warnings", [])
+        if warnings:
+            self.module_logger.log_warning(f"Prometheus warnings: {warnings}")
+
+        # First, collect all data points from Prometheus
+        result_dict = {}
+
         if result_type == "matrix":
             # Handle range query results (matrix)
             for series in data.get("result", []):
@@ -200,15 +232,24 @@ class PrometheusMetricsService:
                 values = series.get("values", [])

                 for timestamp, value in values:
-                    # Convert Unix timestamp to ISO format
-                    date_str = datetime.fromtimestamp(timestamp).isoformat() + "Z"
-                    formatted_data.append({
+                    # Convert Unix timestamp to UTC ISO format
+                    date_str = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
+                    # Handle NaN values properly
+                    if value == "NaN" or value is None:
+                        formatted_value = None
+                    else:
+                        try:
+                            formatted_value = float(value)
+                        except (ValueError, TypeError):
+                            formatted_value = None
+                    result_dict[date_str] = {
                         "date": date_str,
-                        "value": float(value) if value != "NaN" else None,
+                        "value": formatted_value,
                         "metric": metric_name,
                         "labels": metric_labels
-                    })
+                    }

         elif result_type == "vector":
             # Handle instant query results (vector)
@@ -218,17 +259,87 @@ class PrometheusMetricsService:
                 value = series.get("value", [None, None])[1]

                 if timestamp and value:
-                    date_str = datetime.fromtimestamp(timestamp).isoformat() + "Z"
-                    formatted_data.append({
+                    date_str = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
+                    # Handle NaN values properly
+                    if value == "NaN" or value is None:
+                        formatted_value = None
+                    else:
+                        try:
+                            formatted_value = float(value)
+                        except (ValueError, TypeError):
+                            formatted_value = None
+                    result_dict[date_str] = {
                         "date": date_str,
-                        "value": float(value) if value != "NaN" else None,
+                        "value": formatted_value,
                         "metric": metric_name,
                         "labels": metric_labels
-                    })
+                    }

-        # Sort by date
-        formatted_data.sort(key=lambda x: x["date"])
+        # If we have start and end times, fill missing time points
+        if start_dt and end_dt and step:
+            formatted_data = self._fill_missing_time_points(result_dict, start_dt, end_dt, step, metric_name)
+        else:
+            # Just return the data we have, sorted by date
+            formatted_data = list(result_dict.values())
+            formatted_data.sort(key=lambda x: x["date"])

         return formatted_data
+
+    def _fill_missing_time_points(self, result_dict: Dict[str, Dict[str, Any]], start_dt: datetime, end_dt: datetime,
+                                  step: str, metric_name: str) -> List[Dict[str, Any]]:
+        """
+        Fill missing time points in the time series data with null (None) values.
+
+        Args:
+            result_dict: Dictionary of existing data points
+            start_dt: Start time for the range
+            end_dt: End time for the range
+            step: Step size (e.g., "1h", "1m", "5m")
+            metric_name: Name of the metric
+
+        Returns:
+            List of data points with missing time points filled with null values
+        """
+        formatted_data = []
+
+        # Parse step to determine time increment
+        if step.endswith('h'):
+            hours = int(step[:-1])
+            time_increment = timedelta(hours=hours)
+        elif step.endswith('m'):
+            minutes = int(step[:-1])
+            time_increment = timedelta(minutes=minutes)
+        elif step.endswith('d'):
+            days = int(step[:-1])
+            time_increment = timedelta(days=days)
+        elif step.endswith('s'):
+            seconds = int(step[:-1])
+            time_increment = timedelta(seconds=seconds)
+        else:
+            # Default to 1 hour if step format is not recognized
+            time_increment = timedelta(hours=1)
+
+        # Generate complete time range
+        current_dt = start_dt
+        while current_dt <= end_dt:
+            date_str = current_dt.isoformat()
+
+            if date_str in result_dict:
+                # Use existing data point (normalize the '+00:00' suffix to 'Z')
+                result_dict[date_str]["date"] = result_dict[date_str]["date"].replace('+00:00', 'Z')
+                formatted_data.append(result_dict[date_str])
+            else:
+                # Fill missing data point with a null value
+                formatted_data.append({
+                    "date": date_str.replace('+00:00', 'Z'),
+                    "value": None,
+                    "metric": metric_name,
+                    "labels": {}
+                })
+
+            current_dt += time_increment
+
+        return formatted_data
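To make the gap-filling concrete, a hypothetical trace: querying 00:00 to 03:00 UTC with step="1h" when Prometheus only returned points at 00:00 and 02:00 yields four entries, with the gaps carrying value None (labels here are made up):

    [
        {"date": "2024-01-01T00:00:00Z", "value": 41.0, "metric": "cpu_usage", "labels": {"job": "node"}},
        {"date": "2024-01-01T01:00:00Z", "value": None, "metric": "cpu_usage", "labels": {}},  # filled
        {"date": "2024-01-01T02:00:00Z", "value": 44.5, "metric": "cpu_usage", "labels": {"job": "node"}},
        {"date": "2024-01-01T03:00:00Z", "value": None, "metric": "cpu_usage", "labels": {}},  # filled
    ]

The fill only matches points whose timestamps land exactly on the start-aligned step grid; Prometheus range queries evaluate at start, start+step, ..., so returned samples normally do.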

File 4 of 8: application settings (AppSettings)

@@ -22,7 +22,7 @@ class AppSettings(BaseSettings):
     STARROCKS_DATABASE: str = "freeleaps"

     # Prometheus settings
-    PROMETHEUS_ENDPOINT: str = "http://localhost:9090"
+    PROMETHEUS_ENDPOINT: str = "http://kube-prometheus-stack-prometheus.freeleaps-monitoring-system:9090"
     METRICS_ENABLED: bool = True
     PROBES_ENABLED: bool = True
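The new default is the in-cluster DNS name of the kube-prometheus-stack Prometheus Service; Kubernetes resolves <service>.<namespace> from any pod in the cluster, so no port-forward is needed in deployed environments.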

File 5 of 8: local environment file

@@ -16,6 +16,6 @@ LOG_BASE_PATH=./logs
 BACKEND_LOG_FILE_NAME=metrics
 APPLICATION_ACTIVITY_LOG=metrics-activity

-PROMETHEUS_ENDPOINT=http://localhost:9090
+PROMETHEUS_ENDPOINT=http://localhost:57828
 METRICS_ENABLED=True
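(localhost:57828 is presumably a locally forwarded port for development, e.g. from kubectl port-forward; the in-cluster default in app_settings still applies when deployed.)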

File 6 of 8: Python requirements

@@ -14,4 +14,5 @@ pytest==8.4.1
 pytest-asyncio==0.21.2
 pymysql==1.1.0
 sqlalchemy==2.0.23
+aiomysql==0.2.0
 python-dotenv

File 7 of 8: FastAPI application factory

@@ -4,7 +4,7 @@ from fastapi import FastAPI
 from fastapi.openapi.utils import get_openapi
 from common.config.app_settings import app_settings
-from webapi.providers import exception_handler, common, probes, metrics, router
+from webapi.providers import exception_handler, common, probes, metrics, router, database
 from webapi.providers.logger import register_logger
@@ -14,6 +14,7 @@ def create_app() -> FastAPI:
     app = FreeleapsMetricsApp()

     register_logger()
+    register(app, database)
     register(app, exception_handler)
     register(app, router)
     register(app, common)

File 8 of 8: webapi/providers/database.py (new file)

@@ -0,0 +1,75 @@
+import sys
+import aiomysql
+from typing import Optional
+from common.config.app_settings import app_settings
+from common.log.module_logger import ModuleLogger
+
+
+class DatabaseConnectionPool:
+    """Database connection pool manager for StarRocks"""
+
+    def __init__(self):
+        self.pool: Optional[aiomysql.Pool] = None
+        self.module_logger = ModuleLogger(__file__)
+
+    async def create_pool(self) -> bool:
+        """Create database connection pool"""
+        try:
+            self.pool = await aiomysql.create_pool(
+                host=app_settings.STARROCKS_HOST,
+                port=app_settings.STARROCKS_PORT,
+                user=app_settings.STARROCKS_USER,
+                password=app_settings.STARROCKS_PASSWORD,
+                db=app_settings.STARROCKS_DATABASE,
+                charset='utf8mb4',
+                autocommit=True,
+                minsize=5,          # Minimum number of connections in the pool
+                maxsize=20,         # Maximum number of connections in the pool
+                pool_recycle=3600,  # Recycle connections after 1 hour
+                echo=False          # Set to True for SQL query logging
+            )
+            await self.module_logger.log_info(
+                f"Database connection pool created successfully for StarRocks"
+            )
+            return True
+        except Exception as e:
+            await self.module_logger.log_error(f"Failed to create database connection pool: {e}")
+            sys.exit(1)
+
+    async def close_pool(self):
+        """Close database connection pool"""
+        if self.pool:
+            self.pool.close()
+            await self.pool.wait_closed()
+            self.pool = None
+            await self.module_logger.log_info("Database connection pool closed")
+
+    def get_pool(self) -> aiomysql.Pool:
+        """Get the database connection pool"""
+        if not self.pool:
+            raise Exception("Database connection pool not initialized")
+        return self.pool
+
+
+# Global database connection pool instance
+db_pool = DatabaseConnectionPool()
+
+
+def register(app):
+    """Register database provider with FastAPI application"""
+
+    @app.on_event("startup")
+    async def startup():
+        """Initialize database connection pool on application startup"""
+        await db_pool.create_pool()
+
+    @app.on_event("shutdown")
+    async def shutdown():
+        """Close database connection pool on application shutdown"""
+        await db_pool.close_pool()
+
+
+def boot(app):
+    """Boot database provider (if needed for additional setup)"""
+    pass
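A minimal consumption sketch (the query is an illustrative assumption; register(app) must have run at startup so the pool exists):

    from webapi.providers.database import db_pool

    async def fetch_server_version() -> str:
        pool = db_pool.get_pool()
        async with pool.acquire() as conn:      # connection returned to the pool on exit
            async with conn.cursor() as cursor:
                await cursor.execute("SELECT VERSION()")  # StarRocks speaks the MySQL protocol
                (version,) = await cursor.fetchone()
                return version

One design note: create_pool calls sys.exit(1) on failure, so a bad StarRocks configuration now fails the whole service at startup instead of surfacing later on the first query.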