Merge pull request 'feature/wc' (#54) from feature/wc into dev

Reviewed-on: freeleaps/freeleaps-service-hub#54
This commit is contained in:
icecheng 2025-09-19 04:53:43 +00:00
commit 39d7d2a0b4
10 changed files with 483 additions and 230 deletions

View File

@ -1,132 +0,0 @@
from typing import List, Dict, Any
from datetime import date, timedelta
from loguru import logger
from backend.infra.external_service.starrocks_client import StarRocksClient
from backend.models.user_registration_models import UserRegistrationResponse, DailyRegisteredUsers
class RegistrationService:
"""Service for handling user registration data queries"""
def __init__(self):
self.starrocks_client = StarRocksClient()
def get_daily_registered_users(
self,
start_date: date,
end_date: date,
product_id: str = "freeleaps"
) -> UserRegistrationResponse:
"""
Get daily registered users count for a date range
Args:
start_date: Start date for the query
end_date: End date for the query
product_id: Product identifier (default: freeleaps)
Returns:
UserRegistrationResponse with dates and counts
"""
try:
# Query data from StarRocks
raw_data = self.starrocks_client.get_daily_registered_users(
start_date, end_date, product_id
)
# Convert to DailyRegisteredUsers objects
daily_data = [
DailyRegisteredUsers(
date_id=row['date_id'],
product_id=row['product_id'],
registered_cnt=row['registered_cnt'],
updated_at=row.get('updated_at')
)
for row in raw_data
]
# Create date-to-count mapping
data_dict = {str(item.date_id): item.registered_cnt for item in daily_data}
# Generate complete date range
dates = []
counts = []
current_date = start_date
while current_date <= end_date:
date_str = str(current_date)
dates.append(date_str)
counts.append(data_dict.get(date_str, 0))
current_date += timedelta(days=1)
# Calculate total registrations
total_registrations = sum(counts)
logger.info(
f"Retrieved registration data for {len(dates)} days, "
f"total registrations: {total_registrations}"
)
return UserRegistrationResponse(
dates=dates,
counts=counts,
total_registrations=total_registrations,
query_period=f"{start_date} to {end_date}"
)
except Exception as e:
logger.error(f"Failed to get daily registered users: {e}")
raise e
def get_registration_summary(
self,
start_date: date,
end_date: date,
product_id: str = "freeleaps"
) -> Dict[str, Any]:
"""
Get summary statistics for user registrations
Args:
start_date: Start date for the query
end_date: End date for the query
product_id: Product identifier
Returns:
Dictionary with summary statistics
"""
try:
response = self.get_daily_registered_users(start_date, end_date, product_id)
if not response.counts:
return {
"total_registrations": 0,
"average_daily": 0,
"max_daily": 0,
"min_daily": 0,
"days_with_registrations": 0,
"total_days": len(response.dates)
}
counts = response.counts
non_zero_counts = [c for c in counts if c > 0]
return {
"total_registrations": response.total_registrations,
"average_daily": round(sum(counts) / len(counts), 2),
"max_daily": max(counts),
"min_daily": min(counts),
"days_with_registrations": len(non_zero_counts),
"total_days": len(response.dates)
}
except Exception as e:
logger.error(f"Failed to get registration summary: {e}")
raise e

View File

@ -0,0 +1,268 @@
from typing import Dict, List, Any, Optional, Union
from datetime import datetime, timedelta, date
from fastapi import HTTPException
from common.log.module_logger import ModuleLogger
from ..infra.external_service.starrocks_client import StarRocksClient
class StarRocksMetricsService:
"""
Service class for querying StarRocks metrics with predefined SQL queries.
This service provides a high-level interface for querying metrics data
using predefined SQL queries mapped to metric names.
"""
# Global dictionary mapping metric names to their corresponding SQL queries
METRIC_SQL_MAP: Dict[str, Dict[str, str]] = {
"freeleaps": {
"daily_registered_users": """
SELECT
date_id,
product_id,
registered_cnt,
updated_at
FROM dws_daily_registered_users
WHERE date_id >= %s
AND date_id < %s
AND product_id = %s
ORDER BY date_id ASC
""",
},
"magicleaps": {
}
}
def __init__(self, starrocks_endpoint: Optional[str] = None):
"""
Initialize StarRocksMetricsService.
Args:
starrocks_endpoint: StarRocks server endpoint. If None, uses default from settings.
"""
self.module_logger = ModuleLogger(__file__)
self.starrocks_client = StarRocksClient()
def get_available_metrics(self, product_id: Optional[str] = None) -> List[str]:
"""
Get list of available metric names that have predefined SQL queries.
Args:
product_id: Optional product ID to filter metrics. If None, returns all metrics from all products.
Returns:
List of available metric names
"""
if product_id:
if product_id in self.METRIC_SQL_MAP:
return list(self.METRIC_SQL_MAP[product_id].keys())
else:
return []
else:
# Return all metrics from all products
all_metrics = []
for product_metrics in self.METRIC_SQL_MAP.values():
all_metrics.extend(product_metrics.keys())
return all_metrics
def get_available_products(self) -> List[str]:
"""
Get list of available product IDs.
Returns:
List of available product IDs
"""
return list(self.METRIC_SQL_MAP.keys())
async def query_metric_by_time_range(
self,
product_id: str,
metric_name: str,
start_date: Union[str, date],
end_date: Union[str, date]
) -> List[Dict[str, Any]]:
"""
Query metric data for a specific date range.
Args:
product_id: Product ID to identify which product's metrics to query
metric_name: Name of the metric to query
start_date: Start date for the query (ISO string or date)
end_date: End date for the query (ISO string or date)
Returns:
List of dictionaries with 'date' and 'value' keys
Raises:
ValueError: If product_id or metric_name is not found in the SQL mapping
Exception: If StarRocks query fails
Example:
result = await service.query_metric_by_time_range(
"freeleaps",
"daily_registered_users",
start_date=date.today() - timedelta(days=30),
end_date=date.today()
)
# Returns: [{"date": "2024-01-01", "value": 45, "labels": {...}}, ...]
"""
# Check if product_id exists in the mapping
if product_id not in self.METRIC_SQL_MAP:
available_products = ", ".join(self.get_available_products())
error_msg = f"Product '{product_id}' not found in SQL mapping. Available products: {available_products}"
await self.module_logger.log_error(error_msg)
raise HTTPException(status_code=404, detail=error_msg)
# Check if metric name exists in the product's mapping
if metric_name not in self.METRIC_SQL_MAP[product_id]:
available_metrics = ", ".join(self.get_available_metrics(product_id))
error_msg = f"Metric '{metric_name}' not found in product '{product_id}' SQL mapping. Available metrics: {available_metrics}"
await self.module_logger.log_error(error_msg)
raise HTTPException(status_code=404, detail=error_msg)
# Parse date strings if they are strings
if isinstance(start_date, str):
try:
start_dt = datetime.strptime(start_date, '%Y-%m-%d').date()
except ValueError:
raise HTTPException(
status_code=400,
detail="Invalid start_date format. Expected YYYY-MM-DD"
)
else:
start_dt = start_date
if isinstance(end_date, str):
try:
end_dt = datetime.strptime(end_date, '%Y-%m-%d').date()
except ValueError:
raise HTTPException(
status_code=400,
detail="Invalid end_date format. Expected YYYY-MM-DD"
)
else:
end_dt = end_date
# Validate date range
if start_dt >= end_dt:
raise HTTPException(
status_code=400,
detail="Start date must be before end date"
)
# Check date range is not too large (max 1 year)
time_diff = end_dt - start_dt
if time_diff > timedelta(days=365):
raise HTTPException(
status_code=400,
detail="Date range cannot exceed 1 year"
)
# Get the SQL query for the metric
sql_query = self.METRIC_SQL_MAP[product_id][metric_name]
try:
await self.module_logger.log_info(
f"Querying metric '{metric_name}' from product '{product_id}' from {start_dt} to {end_dt}")
# Execute the query
result = self.starrocks_client.execute_query(
query=sql_query,
params=(start_dt, end_dt, product_id)
)
# Parse the result and format it
formatted_data = self._format_query_result(result, metric_name, product_id)
await self.module_logger.log_info(
f"Successfully queried metric '{metric_name}' with {len(formatted_data)} data points")
return formatted_data
except Exception as e:
await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}")
raise
def _format_query_result(self, starrocks_result: List[Dict[str, Any]], metric_name: str, product_id: str) -> List[Dict[str, Any]]:
"""
Format StarRocks query result into the required format.
Args:
starrocks_result: Raw result from StarRocks query
metric_name: Name of the metric being queried
product_id: Product ID for the metric
Returns:
List of dictionaries with 'date' and 'value' keys
"""
formatted_data = []
for row in starrocks_result:
# Format the date
date_value = row.get("date_id")
if date_value:
if isinstance(date_value, str):
date_str = date_value
else:
date_str = str(date_value)
else:
continue
# Get the value
value = row.get("registered_cnt", 0)
if value is None:
value = 0
# Create labels dictionary
labels = {
"product_id": row.get("product_id", product_id),
"metric_type": metric_name
}
formatted_data.append({
"date": date_str,
"value": int(value) if value is not None else 0,
"metric": metric_name,
"labels": labels
})
# Sort by date
formatted_data.sort(key=lambda x: x["date"])
return formatted_data
async def get_metric_info(self, product_id: str, metric_name: str) -> Dict[str, Any]:
"""
Get information about a specific metric including its SQL query.
Args:
product_id: Product ID to identify which product's metrics to query
metric_name: Name of the metric
Returns:
Dictionary containing metric information
Raises:
ValueError: If product_id or metric_name is not found in the SQL mapping
"""
# Check if product_id exists in the mapping
if product_id not in self.METRIC_SQL_MAP:
available_products = ", ".join(self.get_available_products())
error_msg = f"Product '{product_id}' not found in SQL mapping. Available products: {available_products}"
await self.module_logger.log_error(error_msg)
raise HTTPException(status_code=404, detail=error_msg)
# Check if metric name exists in the product's mapping
if metric_name not in self.METRIC_SQL_MAP[product_id]:
available_metrics = ", ".join(self.get_available_metrics(product_id))
error_msg = f"Metric '{metric_name}' not found in product '{product_id}' SQL mapping. Available metrics: {available_metrics}"
await self.module_logger.log_error(error_msg)
raise HTTPException(status_code=404, detail=error_msg)
return {
"product_id": product_id,
"metric_name": metric_name,
"sql_query": self.METRIC_SQL_MAP[product_id][metric_name].strip(),
"description": "Daily registered users count from StarRocks table dws_daily_registered_users"
}

View File

@ -3,6 +3,12 @@ from typing import Optional
class AppSettings(BaseSettings):
# Server settings
SERVER_HOST: str = "0.0.0.0"
SERVER_PORT: int = 8009
SERVICE_API_ACCESS_HOST: str = "0.0.0.0"
SERVICE_API_ACCESS_PORT: int = 8009
# Log settings
LOG_BASE_PATH: str = "./logs"
BACKEND_LOG_FILE_NAME: str = "freeleaps-metrics"

View File

@ -8,6 +8,159 @@ We support two ways to query metrics:
We can implement StarRocks Metric queries similar to Prometheus Metric queries. The only difference is replacing PromQL with SQL and querying through StarRocks API.
## 2.1.Metrics Config
Currently, metrics are configured in code through the `StarRocksMetricsService.METRIC_SQL_MAP` dictionary. In the future, they will be configured through database or other methods.
Organization structure: Product ID -> Metric Name -> SQL Query
```python
METRIC_SQL_MAP: Dict[str, Dict[str, str]] = {
"freeleaps": {
"daily_registered_users": """
SELECT
date_id,
product_id,
registered_cnt,
updated_at
FROM dws_daily_registered_users
WHERE date_id >= %s
AND date_id <= %s
AND product_id = %s
ORDER BY date_id ASC
""",
},
"magicleaps": {
# Future metrics can be added here
}
}
```
## 2.2.API Design
### 2.2.1.Query Metrics by Product ID
API: `/api/metrics/starrocks/product/{product_id}/available-metrics`
Method: GET
Request:
```
product_id=freeleaps
```
Response:
```json
{
"product_id": "freeleaps",
"available_metrics": [
"daily_registered_users"
],
"total_count": 1,
"description": "List of StarRocks-backed metrics for product 'freeleaps'"
}
```
### 2.2.2.Query Metric Info
API: `/api/metrics/starrocks/product/{product_id}/metric/{metric_name}/info`
Method: GET
Request:
```
product_id=freeleaps
metric_name=daily_registered_users
```
Response:
```json
{
"metric_info": {
"product_id": "freeleaps",
"metric_name": "daily_registered_users",
"sql_query": "SELECT date_id, product_id, registered_cnt, updated_at FROM dws_daily_registered_users WHERE date_id >= %s AND date_id <= %s AND product_id = %s ORDER BY date_id ASC",
"description": "Daily registered users count from StarRocks table dws_daily_registered_users"
},
"description": "Information about StarRocks metric 'daily_registered_users' in product 'freeleaps'"
}
```
### 2.2.3.Query Metric Data
API: `/api/metrics/starrocks/metrics_query`
Method: POST
Request:
```json
{
"product_id": "freeleaps",
"metric_name": "daily_registered_users",
"start_date": "2024-09-10",
"end_date": "2024-09-20"
}
```
Response:
```json
{
"metric_name": "daily_registered_users",
"data_points": [
{
"date": "2024-09-10",
"value": 45,
"labels": {
"product_id": "freeleaps",
"metric_type": "daily_registered_users"
}
},
{
"date": "2024-09-11",
"value": 52,
"labels": {
"product_id": "freeleaps",
"metric_type": "daily_registered_users"
}
},
{
"date": "2024-09-12",
"value": 38,
"labels": {
"product_id": "freeleaps",
"metric_type": "daily_registered_users"
}
},
...
{
"date": "2024-09-19",
"value": 67,
"labels": {
"product_id": "freeleaps",
"metric_type": "daily_registered_users"
}
}
],
"total_points": 10,
"time_range": {
"start": "2024-09-10",
"end": "2024-09-19"
}
}
```
## 2.3.Technical Implementation
### 2.3.1.StarRocks Client
- Uses PyMySQL to connect to StarRocks database
- Supports parameterized queries for security
- Automatic connection management with context manager
- Error handling and logging
### 2.3.2.Data Format
- Date format: `YYYY-MM-DD`
- Values are returned as integers or floats
- Labels include product_id and metric_type for debugging
- Results are sorted by date in ascending order
### 2.3.3.Validation
- Date range validation (start_date < end_date)
- Maximum date range limit (1 year)
- Product ID and metric name validation against available mappings
- Input format validation for date strings
# 3.Prometheus Metric
## 3.1.Metrics Config

View File

@ -1,5 +1,8 @@
from fastapi import APIRouter
from webapi.routes.metrics import router
from webapi.routes.starrocks_metrics import api_router as starrocks_metrics_router
from webapi.routes.prometheus_metrics import api_router as prometheus_metrics_router
api_router = APIRouter()
api_router.include_router(router, tags=["metrics"])
api_router.include_router(starrocks_metrics_router, prefix="/starrocks", tags=["starrocks-metrics"])
api_router.include_router(prometheus_metrics_router, prefix="/prometheus", tags=["prometheus-metrics"])

View File

@ -1,7 +0,0 @@
from fastapi import APIRouter
from webapi.routes.starrocks_metrics import api_router as starrocks_metrics_router
from webapi.routes.prometheus_metrics import api_router as prometheus_metrics_router
router = APIRouter()
router.include_router(starrocks_metrics_router, prefix="/metrics", tags=["starrocks-metrics"])
router.include_router(prometheus_metrics_router, prefix="/metrics", tags=["prometheus-metrics"])

View File

@ -1,22 +1,14 @@
from fastapi import APIRouter, HTTPException
from common.log.module_logger import ModuleLogger
from backend.services.starrocks_metrics_service import StarRocksMetricsService
router = APIRouter()
# Initialize logger
# Initialize service and logger
starrocks_service = StarRocksMetricsService()
module_logger = ModuleLogger(__file__)
# Product -> supported StarRocks-backed metrics
SUPPORTED_STARROCKS_METRICS_MAP = {
"freeleaps": [
"daily_registered_users",
],
"magicleaps": [
"daily_registered_users",
],
}
@router.get("/starrocks/product/{product_id}/available-metrics")
async def get_available_metrics(product_id: str):
@ -32,10 +24,14 @@ async def get_available_metrics(product_id: str):
f"Getting StarRocks available metrics list for product_id: {product_id}"
)
if product_id not in SUPPORTED_STARROCKS_METRICS_MAP:
raise HTTPException(status_code=404, detail=f"Unknown product_id: {product_id}")
metrics = starrocks_service.get_available_metrics(product_id)
metrics = SUPPORTED_STARROCKS_METRICS_MAP[product_id]
if not metrics:
available_products = ", ".join(starrocks_service.get_available_products())
raise HTTPException(
status_code=404,
detail=f"Unknown product_id: {product_id}. Available products: {available_products}"
)
return {
"product_id": product_id,

View File

@ -1,22 +1,14 @@
from fastapi import APIRouter, HTTPException
from common.log.module_logger import ModuleLogger
from backend.services.starrocks_metrics_service import StarRocksMetricsService
router = APIRouter()
# Initialize logger
# Initialize service and logger
starrocks_service = StarRocksMetricsService()
module_logger = ModuleLogger(__file__)
# Product -> metric -> description
STARROCKS_METRIC_DESCRIPTIONS = {
"freeleaps": {
"daily_registered_users": "Daily registered users count from StarRocks table dws_daily_registered_users",
},
"magicleaps": {
"daily_registered_users": "Daily registered users count from StarRocks table dws_daily_registered_users",
},
}
@router.get("/starrocks/product/{product_id}/metric/{metric_name}/info")
async def get_metric_info(
@ -34,18 +26,7 @@ async def get_metric_info(
f"Getting StarRocks metric info for metric '{metric_name}' from product '{product_id}'"
)
if product_id not in STARROCKS_METRIC_DESCRIPTIONS:
raise HTTPException(status_code=404, detail=f"Unknown product_id: {product_id}")
product_metrics = STARROCKS_METRIC_DESCRIPTIONS[product_id]
if metric_name not in product_metrics:
raise HTTPException(status_code=404, detail=f"Unknown metric '{metric_name}' for product '{product_id}'")
metric_info = {
"product_id": product_id,
"metric_name": metric_name,
"description": product_metrics[metric_name],
}
metric_info = await starrocks_service.get_metric_info(product_id, metric_name)
return {
"metric_info": metric_info,

View File

@ -1,31 +1,31 @@
from fastapi import APIRouter
from typing import Optional, List, Dict, Any
from typing import Optional, List, Dict, Any, Union
from pydantic import BaseModel, Field
from datetime import date
from datetime import date, datetime
from common.log.module_logger import ModuleLogger
from backend.services.registration_analytics_service import RegistrationService
from backend.services.starrocks_metrics_service import StarRocksMetricsService
class RegistrationDataPoint(BaseModel):
"""Single data point in registration time series."""
class MetricDataPoint(BaseModel):
"""Single data point in metric time series."""
date: str = Field(..., description="Date in YYYY-MM-DD format")
value: int = Field(..., description="Number of registered users")
product_id: str = Field(..., description="Product identifier")
value: Union[int, float] = Field(..., description="Metric value")
labels: Dict[str, Any] = Field(default_factory=dict, description="Metric labels")
class RegistrationTimeSeriesResponse(BaseModel):
"""Response model for registration time series data."""
class MetricTimeSeriesResponse(BaseModel):
"""Response model for metric time series data."""
metric_name: str = Field(..., description="Name of the queried metric")
data_points: List[RegistrationDataPoint] = Field(..., description="List of data points")
data_points: List[MetricDataPoint] = Field(..., description="List of data points")
total_points: int = Field(..., description="Total number of data points")
time_range: Dict[str, str] = Field(..., description="Start and end date of the query")
total_registrations: int = Field(..., description="Total number of registrations in the period")
class RegistrationQueryRequest(BaseModel):
"""Request model for registration query."""
product_id: str = Field("freeleaps", description="Product ID to identify which product's data to query")
class MetricQueryRequest(BaseModel):
"""Request model for metric query."""
product_id: str = Field(..., description="Product ID to identify which product's data to query")
metric_name: str = Field(..., description="Name of the metric to query")
start_date: str = Field(..., description="Start date in YYYY-MM-DD format")
end_date: str = Field(..., description="End date in YYYY-MM-DD format")
@ -33,63 +33,48 @@ class RegistrationQueryRequest(BaseModel):
router = APIRouter()
# Initialize service and logger
registration_service = RegistrationService()
starrocks_service = StarRocksMetricsService()
module_logger = ModuleLogger(__file__)
@router.post("/starrocks/dru_query", response_model=RegistrationTimeSeriesResponse)
@router.post("/starrocks/metrics_query", response_model=MetricTimeSeriesResponse)
async def metrics_query(
request: RegistrationQueryRequest
request: MetricQueryRequest
):
"""
Query registration time series data.
Query StarRocks metrics time series data.
Returns XY curve data (time series) for user registrations within the given date range.
Returns XY curve data (time series) for the specified metric within the given date range.
"""
await module_logger.log_info(
f"Querying registration data for product '{request.product_id}' from {request.start_date} to {request.end_date}")
f"Querying metric '{request.metric_name}' from product '{request.product_id}' from {request.start_date} to {request.end_date}")
# Parse dates - handle both YYYY-M-D and YYYY-MM-DD formats
def parse_date(date_str: str) -> date:
try:
return date.fromisoformat(date_str)
except ValueError:
# Try to parse YYYY-M-D format and convert to YYYY-MM-DD
parts = date_str.split('-')
if len(parts) == 3:
year, month, day = parts
return date(int(year), int(month), int(day))
raise ValueError(f"Invalid date format: {date_str}")
start_date = parse_date(request.start_date)
end_date = parse_date(request.end_date)
# Query the registration data
result = registration_service.get_daily_registered_users(
start_date=start_date,
end_date=end_date,
product_id=request.product_id
# Query the metric data
data_points = await starrocks_service.query_metric_by_time_range(
product_id=request.product_id,
metric_name=request.metric_name,
start_date=request.start_date,
end_date=request.end_date
)
# Format response
response = RegistrationTimeSeriesResponse(
metric_name="daily_registered_users",
response = MetricTimeSeriesResponse(
metric_name=request.metric_name,
data_points=[
RegistrationDataPoint(
date=date_str,
value=count,
product_id=request.product_id
MetricDataPoint(
date=point["date"],
value=point["value"],
labels=point["labels"]
)
for date_str, count in zip(result.dates, result.counts)
for point in data_points
],
total_points=len(result.dates),
total_points=len(data_points),
time_range={
"start": request.start_date,
"end": request.end_date
},
total_registrations=result.total_registrations
}
)
await module_logger.log_info(
f"Successfully queried registration data with {len(result.dates)} data points, total registrations: {result.total_registrations}")
f"Successfully queried metric '{request.metric_name}' with {len(data_points)} data points")
return response