514 lines
19 KiB
Python
514 lines
19 KiB
Python
from typing import Dict, List, Any, Optional, Union
|
|
from datetime import datetime, timedelta, date
|
|
from fastapi import HTTPException
|
|
|
|
from common.log.module_logger import ModuleLogger
|
|
from ..infra.external_service.starrocks_client import StarRocksClient
|
|
|
|
|
|
class StarRocksMetricsService:
    """
    Service class for querying StarRocks metrics with predefined SQL queries.

    Metric names are mapped, per product, to a fixed SQL statement in
    ``METRIC_SQL_MAP``. Two kinds of statements exist:

    * time-series metrics, whose SQL takes ``(start, end, product_id)`` and
      whose rows are post-processed into a gap-free daily or monthly series;
    * "total" metrics (names starting with ``t``), whose SQL takes only
      ``(product_id,)`` and returns undated values.

    Every data point returned by this service is a dictionary with the keys
    ``date``, ``value``, ``metric`` and ``labels``.
    """

    # Global dictionary mapping metric names to their corresponding SQL queries
    METRIC_SQL_MAP: Dict[str, Dict[str, str]] = {
        "freeleaps": {
            "dru": """
                SELECT
                    date,
                    product_id,
                    value,
                    updated_date
                FROM dws_dru
                WHERE date >= %s
                AND date < %s
                AND product_id = %s
                ORDER BY date ASC
            """,
            "mru": """
                SELECT
                    date,
                    product_id,
                    value,
                    updated_date
                FROM dws_mru
                WHERE date >= %s
                AND date < %s
                AND product_id = %s
                ORDER BY date ASC
            """,
            "dcr": """
                SELECT
                    date,
                    product_id,
                    value,
                    updated_date
                FROM dws_dcr
                WHERE date >= %s
                AND date < %s
                AND product_id = %s
                ORDER BY date ASC
            """,
            "mrar": """
                SELECT
                    date,
                    product_id,
                    CASE
                        WHEN monthly_requests = 0 THEN 0.0
                        ELSE (monthly_accepted_requests * 1.0) / monthly_requests
                    END AS value,
                    updated_date
                FROM dws_mrar
                WHERE date >= %s
                AND date < %s
                AND product_id = %s
                ORDER BY date ASC
            """,
            "trar": """
                SELECT
                    product_id,
                    CASE
                        WHEN total_requests = 0 THEN 0.0
                        ELSE (total_accepted_requests * 1.0) / total_requests
                    END AS value,
                    updated_date
                FROM dws_trar
                WHERE product_id = %s
            """,
            "mrqr": """
                SELECT
                    date,
                    product_id,
                    CASE
                        WHEN monthly_requests = 0 THEN 0.0
                        ELSE (monthly_quoted_requests * 1.0) / monthly_requests
                    END AS value,
                    updated_date
                FROM dws_mrqr
                WHERE date >= %s
                AND date < %s
                AND product_id = %s
                ORDER BY date ASC
            """,
            "trqr": """
                SELECT
                    product_id,
                    CASE
                        WHEN total_requests = 0 THEN 0.0
                        ELSE (total_quoted_requests * 1.0) / total_requests
                    END AS value,
                    updated_date
                FROM dws_trqr
                WHERE product_id = %s
            """,
        },
        "magicleaps": {
        },
    }

    # Aggregation steps accepted by query_metric_by_time_range.
    _SUPPORTED_STEPS = ("1d", "1m")
    # Largest span allowed between start_date and end_date.
    _MAX_RANGE = timedelta(days=365)
    # Format required for start_date / end_date when given as strings.
    _BOUND_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self, starrocks_endpoint: Optional[str] = None):
        """
        Initialize StarRocksMetricsService.

        Args:
            starrocks_endpoint: StarRocks server endpoint. Kept for interface
                compatibility.
        """
        self.module_logger = ModuleLogger(__file__)
        # NOTE(review): starrocks_endpoint is accepted but never forwarded; the
        # client is always built with its own defaults. Confirm whether
        # StarRocksClient can take an endpoint before wiring it through.
        self.starrocks_client = StarRocksClient()

    def get_available_metrics(self, product_id: Optional[str] = None) -> List[str]:
        """
        Get list of available metric names that have predefined SQL queries.

        Args:
            product_id: Optional product ID to filter metrics. If falsy,
                metrics of all products are returned (concatenated in
                mapping order, duplicates not removed).

        Returns:
            List of metric names; empty when the product is unknown.
        """
        if product_id:
            return list(self.METRIC_SQL_MAP.get(product_id, {}))
        return [
            metric
            for product_metrics in self.METRIC_SQL_MAP.values()
            for metric in product_metrics
        ]

    def get_available_products(self) -> List[str]:
        """
        Get list of available product IDs.

        Returns:
            List of product IDs present in ``METRIC_SQL_MAP``.
        """
        return list(self.METRIC_SQL_MAP)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    async def _raise_logged(self, status_code: int, detail: str) -> None:
        """Log ``detail`` as an error, then raise it as an HTTPException."""
        await self.module_logger.log_error(detail)
        raise HTTPException(status_code=status_code, detail=detail)

    async def _lookup_sql(self, product_id: str, metric_name: str) -> str:
        """Return the SQL for a product/metric pair, or raise HTTP 404 (logged)."""
        product_metrics = self.METRIC_SQL_MAP.get(product_id)
        # `is None` rather than falsiness: a known product may have no metrics yet.
        if product_metrics is None:
            available_products = ", ".join(self.get_available_products())
            await self._raise_logged(
                404,
                f"Product '{product_id}' not found in SQL mapping. Available products: {available_products}",
            )

        if metric_name not in product_metrics:
            available_metrics = ", ".join(self.get_available_metrics(product_id))
            await self._raise_logged(
                404,
                f"Metric '{metric_name}' not found in product '{product_id}' SQL mapping. Available metrics: {available_metrics}",
            )

        return product_metrics[metric_name]

    @staticmethod
    def _is_total_metric(metric_name: str) -> bool:
        """Tell whether a metric is a "total" metric that takes no time range."""
        # Starting with "t" indicates a query for the total count since the very first record.
        # NOTE: This determination logic is subject to future changes.
        return metric_name.startswith('t')

    @classmethod
    def _parse_bound(cls, value: Union[str, date], field_name: str) -> datetime:
        """Convert a range bound (string, date or datetime) into a datetime."""
        if isinstance(value, str):
            try:
                return datetime.strptime(value, cls._BOUND_FORMAT)
            except ValueError:
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid {field_name} format. Expected YYYY-MM-DD HH:MM:SS"
                ) from None
        if isinstance(value, datetime):
            return value
        # A plain date cannot be compared with a datetime, so lift it to midnight.
        return datetime(value.year, value.month, value.day)

    @staticmethod
    def _month_start(d: date) -> datetime:
        """Return midnight on the first day of ``d``'s month."""
        return datetime(d.year, d.month, 1)

    @staticmethod
    def _next_month(d: datetime) -> datetime:
        """Return midnight on the first day of the month following ``d``."""
        if d.month == 12:
            return datetime(d.year + 1, 1, 1)
        return datetime(d.year, d.month + 1, 1)

    @classmethod
    def _bucket_key(cls, date_value: Any, step: str) -> str:
        """Normalize a row's date into the 'YYYY-MM-DD 00:00:00' key of its bucket."""
        if isinstance(date_value, str):
            parsed = None
            for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d'):
                try:
                    parsed = datetime.strptime(date_value, fmt)
                    break
                except ValueError:
                    continue
            if parsed is None:
                # Unknown textual format: keep it verbatim as the key.
                return date_value
            date_value = parsed
        elif not hasattr(date_value, 'strftime'):
            return str(date_value)

        if step == '1m':
            return cls._month_start(date_value).strftime('%Y-%m-01 00:00:00')
        return date_value.strftime('%Y-%m-%d') + ' 00:00:00'

    @classmethod
    def _bucket_range(cls, start_date: date, end_date: date, step: str) -> List[str]:
        """List every bucket key from start (inclusive) to end (exclusive)."""
        keys: List[str] = []
        if step == '1d':
            current = datetime(start_date.year, start_date.month, start_date.day)
            stop = datetime(end_date.year, end_date.month, end_date.day)
            while current < stop:
                keys.append(current.strftime('%Y-%m-%d') + ' 00:00:00')
                current += timedelta(days=1)
        elif step == '1m':
            current = cls._month_start(start_date)
            stop = cls._month_start(end_date)
            while current < stop:
                keys.append(current.strftime('%Y-%m-01 00:00:00'))
                current = cls._next_month(current)
        # Any other step yields no buckets.
        return keys

    @staticmethod
    def _make_point(date_str: Optional[str], value: Any, metric_name: str, product_id: Any) -> Dict[str, Any]:
        """Build one data point in the shape returned by the public query methods."""
        return {
            "date": date_str,
            "value": value,
            "metric": metric_name,
            "labels": {
                "product_id": product_id,
                "metric_type": metric_name
            }
        }

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def query_metric_by_time_range(
        self,
        product_id: str,
        metric_name: str,
        step: Optional[str],
        start_date: Optional[Union[str, date]],
        end_date: Optional[Union[str, date]]
    ) -> List[Dict[str, Any]]:
        """
        Query metric data for a specific date range.

        Missing buckets in the range are filled with 0 values so the series
        has no gaps. Total metrics (names starting with ``t``) take no time
        range: for them ``step``, ``start_date`` and ``end_date`` are ignored
        and the undated totals are returned instead.

        Args:
            product_id: Product ID to identify which product's metrics to query
            metric_name: Name of the metric to query
            step: Bucket size, '1d' (daily) or '1m' (monthly)
            start_date: Inclusive start, 'YYYY-MM-DD HH:MM:SS' string or date/datetime
            end_date: Exclusive end, 'YYYY-MM-DD HH:MM:SS' string or date/datetime

        Returns:
            List of dictionaries with 'date', 'value', 'metric' and 'labels'
            keys, one per bucket, ordered by date.

        Raises:
            HTTPException: 404 if the product or metric is unknown, or if a
                time-series metric is requested without step/start/end;
                400 if a bound is malformed, the step is unsupported, start
                is not before end, or the range exceeds 365 days.
            Exception: If the StarRocks query fails (logged and re-raised)

        Example:
            result = await service.query_metric_by_time_range(
                "freeleaps",
                "dru",
                "1d",
                start_date="2024-01-01 00:00:00",
                end_date="2024-01-31 00:00:00"
            )
            # Returns: [{"date": "2024-01-01 00:00:00", "value": 45, "metric": "dru", "labels": {...}},
            #           {"date": "2024-01-02 00:00:00", "value": 0, "metric": "dru", "labels": {...}}, ...]
        """
        sql_query = await self._lookup_sql(product_id, metric_name)

        # Total metrics bind only product_id. Sending them through the range
        # path would pass three parameters to a one-placeholder statement.
        if self._is_total_metric(metric_name):
            return await self._query_metric_by_product_id(product_id, metric_name)

        if (start_date is None) or (end_date is None) or (step is None):
            # NOTE(review): 404 is kept for compatibility with existing
            # clients; a missing parameter would normally be a 400.
            await self._raise_logged(
                404,
                f"Metric '{metric_name}' should be queried by start date, end date and step.",
            )

        start_dt = self._parse_bound(start_date, "start_date")
        end_dt = self._parse_bound(end_date, "end_date")

        # Normalize and validate step (default '1d')
        step = step or '1d'
        if step not in self._SUPPORTED_STEPS:
            raise HTTPException(
                status_code=400,
                detail="Invalid step. Supported values are '1d' and '1m'"
            )

        # Validate date range
        if start_dt >= end_dt:
            raise HTTPException(
                status_code=400,
                detail="Start date must be before end date"
            )

        # Check date range is not too large (max 1 year)
        if end_dt - start_dt > self._MAX_RANGE:
            raise HTTPException(
                status_code=400,
                detail="Date range cannot exceed 1 year"
            )

        try:
            await self.module_logger.log_info(
                f"Querying metric '{metric_name}' from product '{product_id}' from {start_dt} to {end_dt}")

            result = await self.starrocks_client.execute_query(
                query=sql_query,
                params=(start_dt, end_dt, product_id)
            )

            formatted_data = self._format_query_result(
                starrocks_result=result,
                metric_name=metric_name,
                product_id=product_id,
                step=step,
                start_date=start_dt,
                end_date=end_dt
            )

            await self.module_logger.log_info(
                f"Successfully queried metric '{metric_name}' with {len(formatted_data)} data points")
            return formatted_data

        except Exception as e:
            await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}")
            raise

    def _format_query_result(self, starrocks_result: List[Dict[str, Any]], metric_name: str, product_id: str, step: str, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
        """
        Format StarRocks query result into the required format and fill missing buckets with 0 values.

        Args:
            starrocks_result: Raw rows from the StarRocks query; each row is
                read through its 'date', 'value' and 'product_id' keys
            metric_name: Name of the metric being queried
            product_id: Product ID used when a row carries none, and for filled points
            step: '1d' for daily buckets, '1m' for monthly buckets
            start_date: Start of the range (its day/month is the first bucket)
            end_date: End of the range (its day/month is excluded)

        Returns:
            One data point per bucket in the range, in chronological order.
            Rows without a date are skipped; rows outside the generated
            range are dropped; a NULL value becomes 0. When several rows
            fall into the same bucket the last one wins.
        """
        # Index the rows by bucket key for quick lookup while filling the range.
        points_by_key: Dict[str, Dict[str, Any]] = {}
        for row in starrocks_result or []:
            date_value = row.get("date")
            if not date_value:
                continue

            key = self._bucket_key(date_value, step)
            value = row.get("value")
            points_by_key[key] = self._make_point(
                key,
                0 if value is None else value,
                metric_name,
                row.get("product_id", product_id),
            )

        # Walk the complete range and fill missing points with 0.
        formatted_data = []
        for key in self._bucket_range(start_date, end_date, step):
            point = points_by_key.get(key)
            if point is None:
                point = self._make_point(key, 0, metric_name, product_id)
            formatted_data.append(point)

        return formatted_data

    async def get_metric_info(self, product_id: str, metric_name: str) -> Dict[str, Any]:
        """
        Get information about a specific metric including its SQL query.

        Args:
            product_id: Product ID to identify which product's metrics to query
            metric_name: Name of the metric

        Returns:
            Dictionary with 'product_id', 'metric_name', 'sql_query'
            (whitespace-stripped) and 'description'.

        Raises:
            HTTPException: 404 if product_id or metric_name is not found in the SQL mapping
        """
        sql_query = await self._lookup_sql(product_id, metric_name)
        return {
            "product_id": product_id,
            "metric_name": metric_name,
            "sql_query": sql_query.strip(),
            "description": f"{metric_name} count from StarRocks table dws_{metric_name}"
        }

    async def _query_metric_by_product_id(self, product_id: str, metric_name: str) -> List[Dict[str, Any]]:
        """
        Query a metric that is not bound to a date range (e.g. totals over all records).

        The caller is expected to have validated product_id and metric_name
        against ``METRIC_SQL_MAP`` already.

        Args:
            product_id: Product ID to identify which product's metrics to query
            metric_name: Name of the metric to query

        Returns:
            One data point per returned row, each with 'date' set to None.
            Empty list when the query returns no rows.

        Raises:
            Exception: If the StarRocks query fails (logged and re-raised)

        Example:
            result = await service._query_metric_by_product_id("freeleaps", "trqr")
            # Returns: [{"date": None, "value": 0.45, "metric": "trqr", "labels": {...}}]
        """
        sql_query = self.METRIC_SQL_MAP[product_id][metric_name]

        try:
            await self.module_logger.log_info(
                f"Querying metric '{metric_name}' from product '{product_id}'")

            # Parameters must be a sequence: note the trailing comma.
            result = await self.starrocks_client.execute_query(
                query=sql_query,
                params=(product_id,)
            )

            # Created before the loop so an empty result yields [] and every
            # row is kept.
            data_points = []
            for row in result or []:
                value = row.get("value")
                data_points.append(self._make_point(
                    None,
                    0 if value is None else value,
                    metric_name,
                    row.get("product_id", product_id),
                ))

            await self.module_logger.log_info(
                f"Successfully queried metric '{metric_name}'")
            return data_points

        except Exception as e:
            await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}")
            raise