freeleaps-service-hub/apps/metrics/backend/services/starrocks_metrics_service.py
2025-09-22 14:36:56 +08:00

321 lines
12 KiB
Python

from typing import Dict, List, Any, Optional, Union
from datetime import datetime, timedelta, date
from fastapi import HTTPException
from common.log.module_logger import ModuleLogger
from ..infra.external_service.starrocks_client import StarRocksClient
class StarRocksMetricsService:
    """
    Service class for querying StarRocks metrics with predefined SQL queries.

    Metric queries are looked up in ``METRIC_SQL_MAP`` by product ID and
    metric name, executed through ``StarRocksClient`` and returned as a
    gap-free daily series (days without a row are reported with value 0).
    """

    # Mapping of product ID -> metric name -> parameterized SQL query.
    # Every query takes three positional parameters, in this order:
    # range start (inclusive), range end (exclusive), product_id.
    METRIC_SQL_MAP: Dict[str, Dict[str, str]] = {
        "freeleaps": {
            "dru": """
                SELECT
                    date,
                    product_id,
                    value,
                    updated_date
                FROM dws_dru
                WHERE date >= %s
                    AND date < %s
                    AND product_id = %s
                ORDER BY date ASC
            """,
            "mru": """
                SELECT
                    date,
                    product_id,
                    value,
                    updated_date
                FROM dws_mru
                WHERE date >= %s
                    AND date < %s
                    AND product_id = %s
                ORDER BY date ASC
            """,
        },
        "magicleaps": {
        },
    }

    # Format required for string date arguments, and the matching
    # human-readable form used in validation error messages.
    _DATE_INPUT_FORMAT = '%Y-%m-%d %H:%M:%S'
    _DATE_INPUT_HINT = "YYYY-MM-DD HH:MM:SS"

    # Largest time span a single query may cover.
    MAX_QUERY_RANGE = timedelta(days=365)

    def __init__(self, starrocks_endpoint: Optional[str] = None):
        """
        Initialize StarRocksMetricsService.

        Args:
            starrocks_endpoint: StarRocks server endpoint. NOTE(review): this
                argument is accepted for interface compatibility but is not
                forwarded to ``StarRocksClient``, which is constructed with
                its own defaults — confirm whether it should be wired through.
        """
        self.module_logger = ModuleLogger(__file__)
        self.starrocks_client = StarRocksClient()

    def get_available_metrics(self, product_id: Optional[str] = None) -> List[str]:
        """
        Get list of available metric names that have predefined SQL queries.

        Args:
            product_id: Optional product ID to filter metrics. If None (or
                empty), returns the metrics of all products.

        Returns:
            List of metric names; empty if ``product_id`` is unknown.
        """
        if product_id:
            return list(self.METRIC_SQL_MAP.get(product_id, {}))
        # No filter: concatenate the metric names of every product.
        return [
            metric_name
            for product_metrics in self.METRIC_SQL_MAP.values()
            for metric_name in product_metrics
        ]

    def get_available_products(self) -> List[str]:
        """
        Get list of available product IDs.

        Returns:
            List of available product IDs
        """
        return list(self.METRIC_SQL_MAP)

    async def _ensure_metric_exists(self, product_id: str, metric_name: str) -> None:
        """
        Verify that ``product_id`` and ``metric_name`` are present in the SQL mapping.

        Raises:
            HTTPException: 404 (after logging the error) if either is unknown.
        """
        if product_id not in self.METRIC_SQL_MAP:
            available_products = ", ".join(self.get_available_products())
            error_msg = f"Product '{product_id}' not found in SQL mapping. Available products: {available_products}"
            await self.module_logger.log_error(error_msg)
            raise HTTPException(status_code=404, detail=error_msg)

        if metric_name not in self.METRIC_SQL_MAP[product_id]:
            available_metrics = ", ".join(self.get_available_metrics(product_id))
            error_msg = f"Metric '{metric_name}' not found in product '{product_id}' SQL mapping. Available metrics: {available_metrics}"
            await self.module_logger.log_error(error_msg)
            raise HTTPException(status_code=404, detail=error_msg)

    @classmethod
    def _parse_date_param(cls, value: Union[str, date], field_name: str) -> datetime:
        """
        Normalize a date argument to a ``datetime``.

        Strings must match ``YYYY-MM-DD HH:MM:SS``; ``datetime`` objects are
        returned unchanged; plain ``date`` objects are promoted to midnight of
        that day so both bounds can be compared and subtracted safely.

        Raises:
            HTTPException: 400 if a string does not match the expected format.
        """
        if isinstance(value, str):
            try:
                return datetime.strptime(value, cls._DATE_INPUT_FORMAT)
            except ValueError as exc:
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid {field_name} format. Expected {cls._DATE_INPUT_HINT}"
                ) from exc
        # datetime is a subclass of date, so it has to be checked first.
        if isinstance(value, datetime):
            return value
        if isinstance(value, date):
            return datetime.combine(value, datetime.min.time())
        return value

    async def query_metric_by_time_range(
        self,
        product_id: str,
        metric_name: str,
        start_date: Union[str, date],
        end_date: Union[str, date]
    ) -> List[Dict[str, Any]]:
        """
        Query metric data for a specific date range.

        Missing dates in the range are filled with 0 values to ensure a
        complete daily time series with no gaps. The start is inclusive and
        the end is exclusive.

        Args:
            product_id: Product ID to identify which product's metrics to query
            metric_name: Name of the metric to query
            start_date: Start of the range ('YYYY-MM-DD HH:MM:SS' string,
                date or datetime)
            end_date: End of the range ('YYYY-MM-DD HH:MM:SS' string,
                date or datetime)

        Returns:
            List of dictionaries with 'date', 'value', 'metric' and 'labels'
            keys, one per day in the range.

        Raises:
            HTTPException: 404 if product_id or metric_name is not found in
                the SQL mapping; 400 if a date string is malformed, the start
                is not before the end, or the range exceeds 365 days.
            Exception: Any error raised by the StarRocks query is logged and
                re-raised unchanged.

        Example:
            result = await service.query_metric_by_time_range(
                "freeleaps",
                "dru",
                start_date=date.today() - timedelta(days=30),
                end_date=date.today()
            )
            # Returns: [{"date": "2024-01-01 00:00:00", "value": 45, "metric": "dru", "labels": {...}},
            #           {"date": "2024-01-02 00:00:00", "value": 0, "metric": "dru", "labels": {...}}, ...]
        """
        await self._ensure_metric_exists(product_id, metric_name)

        start_dt = self._parse_date_param(start_date, "start_date")
        end_dt = self._parse_date_param(end_date, "end_date")

        # Validate date range
        if start_dt >= end_dt:
            raise HTTPException(
                status_code=400,
                detail="Start date must be before end date"
            )

        # Check date range is not too large (max 1 year)
        if end_dt - start_dt > self.MAX_QUERY_RANGE:
            raise HTTPException(
                status_code=400,
                detail="Date range cannot exceed 1 year"
            )

        sql_query = self.METRIC_SQL_MAP[product_id][metric_name]

        try:
            await self.module_logger.log_info(
                f"Querying metric '{metric_name}' from product '{product_id}' from {start_dt} to {end_dt}")

            result = await self.starrocks_client.execute_query(
                query=sql_query,
                params=(start_dt, end_dt, product_id)
            )

            formatted_data = self._format_query_result(result, metric_name, product_id, start_dt, end_dt)

            await self.module_logger.log_info(
                f"Successfully queried metric '{metric_name}' with {len(formatted_data)} data points")
            return formatted_data
        except Exception as e:
            # Service boundary: record the failure, then let the caller handle it.
            await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}")
            raise

    @staticmethod
    def _normalize_row_date(date_value: Any) -> str:
        """
        Convert a row's date value to the 'YYYY-MM-DD 00:00:00' key format.

        ISO-formatted strings and date/datetime-like objects are truncated to
        the day. Strings that cannot be parsed are returned unchanged and
        other objects are passed through ``str()``.
        """
        if isinstance(date_value, str):
            try:
                parsed = datetime.fromisoformat(date_value.strip())
            except ValueError:
                return date_value
            return parsed.strftime('%Y-%m-%d') + ' 00:00:00'
        if hasattr(date_value, 'strftime'):
            # datetime-like objects expose .date(); plain dates do not.
            day = date_value.date() if hasattr(date_value, 'date') else date_value
            return day.strftime('%Y-%m-%d') + ' 00:00:00'
        return str(date_value)

    @staticmethod
    def _as_date(value: Union[date, datetime]) -> date:
        """Return the calendar day of a ``date`` or ``datetime``."""
        return value.date() if isinstance(value, datetime) else value

    def _format_query_result(self, starrocks_result: List[Dict[str, Any]], metric_name: str, product_id: str, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
        """
        Format StarRocks query result into the required format and fill missing dates with 0 values.

        Args:
            starrocks_result: Raw result rows from the StarRocks query
            metric_name: Name of the metric being queried
            product_id: Product ID for the metric
            start_date: Start of the query range (inclusive); date or datetime
            end_date: End of the query range (exclusive); date or datetime

        Returns:
            List of dictionaries with 'date', 'value', 'metric' and 'labels'
            keys, one per day in [start_date, end_date), with missing dates
            filled with 0. Rows with an empty date are skipped; if several
            rows fall on the same day the last one wins.
        """
        # Index the returned rows by normalized day string for quick lookup.
        rows_by_day: Dict[str, Dict[str, Any]] = {}
        for row in starrocks_result:
            date_value = row.get("date")
            if not date_value:
                continue
            date_str = self._normalize_row_date(date_value)

            value = row.get("value", 0)
            if value is None:
                value = 0

            rows_by_day[date_str] = {
                "date": date_str,
                "value": int(value),
                "metric": metric_name,
                "labels": {
                    "product_id": row.get("product_id", product_id),
                    "metric_type": metric_name
                }
            }

        # Walk the complete date range and fill missing dates with 0.
        formatted_data = []
        current_date = self._as_date(start_date)
        end_date_only = self._as_date(end_date)
        while current_date < end_date_only:
            date_str = current_date.strftime('%Y-%m-%d') + ' 00:00:00'
            entry = rows_by_day.get(date_str)
            if entry is None:
                entry = {
                    "date": date_str,
                    "value": 0,
                    "metric": metric_name,
                    "labels": {
                        "product_id": product_id,
                        "metric_type": metric_name
                    }
                }
            formatted_data.append(entry)
            current_date += timedelta(days=1)
        return formatted_data

    async def get_metric_info(self, product_id: str, metric_name: str) -> Dict[str, Any]:
        """
        Get information about a specific metric including its SQL query.

        Args:
            product_id: Product ID to identify which product's metrics to query
            metric_name: Name of the metric

        Returns:
            Dictionary with 'product_id', 'metric_name', 'sql_query'
            (whitespace-stripped) and 'description' keys.

        Raises:
            HTTPException: 404 if product_id or metric_name is not found in
                the SQL mapping
        """
        await self._ensure_metric_exists(product_id, metric_name)
        return {
            "product_id": product_id,
            "metric_name": metric_name,
            "sql_query": self.METRIC_SQL_MAP[product_id][metric_name].strip(),
            "description": f"{metric_name} count from StarRocks table dws_{metric_name}"
        }