freeleaps-service-hub/apps/metrics/backend/services/starrocks_metrics_service.py

284 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import Dict, List, Any, Optional, Union
from datetime import datetime, timedelta, date
from fastapi import HTTPException
from common.log.module_logger import ModuleLogger
from ..infra.external_service.starrocks_client import StarRocksClient
class StarRocksMetricsService:
    """
    Service class for querying StarRocks metrics with predefined SQL queries.

    Metric queries are looked up in ``METRIC_SQL_MAP`` by product ID and
    metric name, executed through ``StarRocksClient`` with bound parameters,
    and the returned rows are reshaped into a list of data-point dictionaries.
    """

    # Format required for date arguments that are passed as strings.
    _DATE_FORMAT = '%Y-%m-%d %H:%M:%S'

    # Largest span allowed between start and end of a single query.
    _MAX_QUERY_RANGE = timedelta(days=365)

    # Global dictionary mapping product IDs to {metric name: SQL query}.
    # Every query takes three bound parameters, in order:
    # start date (inclusive), end date (exclusive), product ID.
    METRIC_SQL_MAP: Dict[str, Dict[str, str]] = {
        "freeleaps": {
            "daily_registered_users": """
                SELECT
                    date,
                    product_id,
                    value,
                    updated_date
                FROM dws_daily_registered_users_test
                WHERE date >= %s
                    AND date < %s
                    AND product_id = %s
                ORDER BY date ASC
            """,
            "monthly_registered_users": """
                SELECT
                    date,
                    product_id,
                    value,
                    updated_date
                FROM dws_monthly_registered_users_test
                WHERE date >= %s
                    AND date < %s
                    AND product_id = %s
                ORDER BY date ASC
            """,
        },
        "magicleaps": {
        }
    }

    def __init__(self, starrocks_endpoint: Optional[str] = None):
        """
        Initialize StarRocksMetricsService.

        Args:
            starrocks_endpoint: StarRocks server endpoint.
                NOTE(review): this argument is accepted but not used here;
                the client is constructed without arguments. Confirm whether
                it should be forwarded to StarRocksClient.
        """
        self.module_logger = ModuleLogger(__file__)
        self.starrocks_client = StarRocksClient()

    def get_available_metrics(self, product_id: Optional[str] = None) -> List[str]:
        """
        Get list of available metric names that have predefined SQL queries.

        Args:
            product_id: Optional product ID to filter metrics. If None (or
                empty), returns the metrics of all products concatenated.

        Returns:
            List of available metric names; empty if the product is unknown.
        """
        if product_id:
            return list(self.METRIC_SQL_MAP.get(product_id, {}))
        # No filter: flatten the metric names of every product.
        return [
            name
            for product_metrics in self.METRIC_SQL_MAP.values()
            for name in product_metrics
        ]

    def get_available_products(self) -> List[str]:
        """
        Get list of available product IDs.

        Returns:
            List of available product IDs
        """
        return list(self.METRIC_SQL_MAP)

    async def _ensure_metric_exists(self, product_id: str, metric_name: str) -> None:
        """
        Verify that a product/metric pair has a predefined SQL query.

        Logs the problem and raises HTTPException (404) when either the
        product or the metric is missing from ``METRIC_SQL_MAP``.
        """
        if product_id not in self.METRIC_SQL_MAP:
            available_products = ", ".join(self.get_available_products())
            error_msg = (
                f"Product '{product_id}' not found in SQL mapping. "
                f"Available products: {available_products}"
            )
            await self.module_logger.log_error(error_msg)
            raise HTTPException(status_code=404, detail=error_msg)

        if metric_name not in self.METRIC_SQL_MAP[product_id]:
            available_metrics = ", ".join(self.get_available_metrics(product_id))
            error_msg = (
                f"Metric '{metric_name}' not found in product '{product_id}' SQL mapping. "
                f"Available metrics: {available_metrics}"
            )
            await self.module_logger.log_error(error_msg)
            raise HTTPException(status_code=404, detail=error_msg)

    @classmethod
    def _parse_date_param(cls, value: Union[str, date], field_name: str) -> Union[date, datetime]:
        """
        Turn a string date argument into a datetime; pass date objects through.

        Raises HTTPException (400) naming ``field_name`` when a string does
        not match ``YYYY-MM-DD HH:MM:SS``.
        """
        if not isinstance(value, str):
            return value
        try:
            return datetime.strptime(value, cls._DATE_FORMAT)
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid {field_name} format. Expected YYYY-MM-DD HH:MM:SS"
            ) from None

    @staticmethod
    def _as_datetime(value: Union[date, datetime]) -> datetime:
        """Promote a plain date to midnight of that day; keep datetimes as-is."""
        if isinstance(value, datetime):
            return value
        return datetime.combine(value, datetime.min.time())

    @classmethod
    def _validate_date_range(cls, start: Union[date, datetime], end: Union[date, datetime]) -> None:
        """
        Check that start precedes end and that the span is at most one year.

        Both bounds are promoted to datetime first, because Python refuses to
        compare or subtract a ``datetime`` and a plain ``date``.

        Raises HTTPException (400) when the range is empty, reversed or too long.
        """
        range_start = cls._as_datetime(start)
        range_end = cls._as_datetime(end)
        if range_start >= range_end:
            raise HTTPException(
                status_code=400,
                detail="Start date must be before end date"
            )
        if range_end - range_start > cls._MAX_QUERY_RANGE:
            raise HTTPException(
                status_code=400,
                detail="Date range cannot exceed 1 year"
            )

    async def query_metric_by_time_range(
        self,
        product_id: str,
        metric_name: str,
        start_date: Union[str, date],
        end_date: Union[str, date]
    ) -> List[Dict[str, Any]]:
        """
        Query metric data for a specific date range.

        Args:
            product_id: Product ID to identify which product's metrics to query
            metric_name: Name of the metric to query
            start_date: Inclusive start of the range, either a
                'YYYY-MM-DD HH:MM:SS' string or a date/datetime object
            end_date: Exclusive end of the range, same accepted forms

        Returns:
            List of dictionaries with 'date', 'value', 'metric' and 'labels'
            keys, sorted by 'date'.

        Raises:
            HTTPException: 404 if product_id or metric_name is not found in
                the SQL mapping; 400 if a date string is malformed, the start
                is not before the end, or the range exceeds one year.
            Exception: Any error from the StarRocks query is logged and
                re-raised unchanged.

        Example:
            result = await service.query_metric_by_time_range(
                "freeleaps",
                "daily_registered_users",
                start_date=date.today() - timedelta(days=30),
                end_date=date.today()
            )
            # Returns: [{"date": "...", "value": 45, "metric": "...", "labels": {...}}, ...]
        """
        await self._ensure_metric_exists(product_id, metric_name)

        start_dt = self._parse_date_param(start_date, "start_date")
        end_dt = self._parse_date_param(end_date, "end_date")
        self._validate_date_range(start_dt, end_dt)

        # Get the SQL query for the metric
        sql_query = self.METRIC_SQL_MAP[product_id][metric_name]
        try:
            await self.module_logger.log_info(
                f"Querying metric '{metric_name}' from product '{product_id}' from {start_dt} to {end_dt}")
            # Values are bound as parameters, never interpolated into the SQL.
            result = await self.starrocks_client.execute_query(
                query=sql_query,
                params=(start_dt, end_dt, product_id)
            )
            formatted_data = self._format_query_result(result, metric_name, product_id)
            await self.module_logger.log_info(
                f"Successfully queried metric '{metric_name}' with {len(formatted_data)} data points")
            return formatted_data
        except Exception as e:
            await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}")
            raise

    def _format_query_result(self, starrocks_result: List[Dict[str, Any]], metric_name: str, product_id: str) -> List[Dict[str, Any]]:
        """
        Format StarRocks query result into the required format.

        Rows without a (truthy) 'date' are skipped. A missing or None 'value'
        becomes 0; any other value is converted with int().

        Args:
            starrocks_result: Raw result rows from the StarRocks query
            metric_name: Name of the metric being queried
            product_id: Product ID used as a label when a row has no
                'product_id' key of its own

        Returns:
            List of dictionaries with 'date', 'value', 'metric' and 'labels'
            keys, sorted by the 'date' string.
        """
        formatted_data = []
        for row in starrocks_result:
            date_value = row.get("date")
            if not date_value:
                continue

            if isinstance(date_value, str):
                date_str = date_value
            elif hasattr(date_value, 'strftime'):
                # date/datetime-like objects are rendered as a string
                date_str = date_value.strftime('%Y-%m-%d %H:%M:%S')
            else:
                date_str = str(date_value)

            value = row.get("value")
            formatted_data.append({
                "date": date_str,
                "value": int(value) if value is not None else 0,
                "metric": metric_name,
                "labels": {
                    "product_id": row.get("product_id", product_id),
                    "metric_type": metric_name
                }
            })

        # Sort by the formatted date string
        formatted_data.sort(key=lambda point: point["date"])
        return formatted_data

    async def get_metric_info(self, product_id: str, metric_name: str) -> Dict[str, Any]:
        """
        Get information about a specific metric including its SQL query.

        Args:
            product_id: Product ID to identify which product's metrics to query
            metric_name: Name of the metric

        Returns:
            Dictionary with 'product_id', 'metric_name', 'sql_query' (the
            mapped SQL with surrounding whitespace stripped) and 'description'.

        Raises:
            HTTPException: 404 if product_id or metric_name is not found in
                the SQL mapping
        """
        await self._ensure_metric_exists(product_id, metric_name)
        return {
            "product_id": product_id,
            "metric_name": metric_name,
            "sql_query": self.METRIC_SQL_MAP[product_id][metric_name].strip(),
            # NOTE(review): the mapped queries read tables with a `_test`
            # suffix, which this description does not mention — confirm.
            "description": f"{metric_name} count from StarRocks table dws_{metric_name}"
        }