feat(change code structure)
This commit is contained in:
parent
97240e598a
commit
98405934ce
@ -1,132 +0,0 @@
|
|||||||
from typing import List, Dict, Any
|
|
||||||
from datetime import date, timedelta
|
|
||||||
from loguru import logger
|
|
||||||
from backend.infra.external_service.starrocks_client import StarRocksClient
|
|
||||||
from backend.models.user_registration_models import UserRegistrationResponse, DailyRegisteredUsers
|
|
||||||
|
|
||||||
|
|
||||||
class RegistrationService:
|
|
||||||
"""Service for handling user registration data queries"""
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self.starrocks_client = StarRocksClient()
|
|
||||||
|
|
||||||
def get_daily_registered_users(
|
|
||||||
self,
|
|
||||||
start_date: date,
|
|
||||||
end_date: date,
|
|
||||||
product_id: str = "freeleaps"
|
|
||||||
) -> UserRegistrationResponse:
|
|
||||||
"""
|
|
||||||
Get daily registered users count for a date range
|
|
||||||
|
|
||||||
Args:
|
|
||||||
start_date: Start date for the query
|
|
||||||
end_date: End date for the query
|
|
||||||
product_id: Product identifier (default: freeleaps)
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
UserRegistrationResponse with dates and counts
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
# Query data from StarRocks
|
|
||||||
raw_data = self.starrocks_client.get_daily_registered_users(
|
|
||||||
start_date, end_date, product_id
|
|
||||||
)
|
|
||||||
# Convert to DailyRegisteredUsers objects
|
|
||||||
daily_data = [
|
|
||||||
DailyRegisteredUsers(
|
|
||||||
date_id=row['date_id'],
|
|
||||||
product_id=row['product_id'],
|
|
||||||
registered_cnt=row['registered_cnt'],
|
|
||||||
updated_at=row.get('updated_at')
|
|
||||||
)
|
|
||||||
for row in raw_data
|
|
||||||
]
|
|
||||||
# Create date-to-count mapping
|
|
||||||
data_dict = {str(item.date_id): item.registered_cnt for item in daily_data}
|
|
||||||
|
|
||||||
# Generate complete date range
|
|
||||||
dates = []
|
|
||||||
counts = []
|
|
||||||
current_date = start_date
|
|
||||||
|
|
||||||
while current_date <= end_date:
|
|
||||||
date_str = str(current_date)
|
|
||||||
dates.append(date_str)
|
|
||||||
counts.append(data_dict.get(date_str, 0))
|
|
||||||
current_date += timedelta(days=1)
|
|
||||||
|
|
||||||
# Calculate total registrations
|
|
||||||
total_registrations = sum(counts)
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
f"Retrieved registration data for {len(dates)} days, "
|
|
||||||
f"total registrations: {total_registrations}"
|
|
||||||
)
|
|
||||||
|
|
||||||
return UserRegistrationResponse(
|
|
||||||
dates=dates,
|
|
||||||
counts=counts,
|
|
||||||
total_registrations=total_registrations,
|
|
||||||
query_period=f"{start_date} to {end_date}"
|
|
||||||
)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to get daily registered users: {e}")
|
|
||||||
raise e
|
|
||||||
|
|
||||||
def get_registration_summary(
|
|
||||||
self,
|
|
||||||
start_date: date,
|
|
||||||
end_date: date,
|
|
||||||
product_id: str = "freeleaps"
|
|
||||||
) -> Dict[str, Any]:
|
|
||||||
"""
|
|
||||||
Get summary statistics for user registrations
|
|
||||||
|
|
||||||
Args:
|
|
||||||
start_date: Start date for the query
|
|
||||||
end_date: End date for the query
|
|
||||||
product_id: Product identifier
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Dictionary with summary statistics
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
response = self.get_daily_registered_users(start_date, end_date, product_id)
|
|
||||||
|
|
||||||
if not response.counts:
|
|
||||||
return {
|
|
||||||
"total_registrations": 0,
|
|
||||||
"average_daily": 0,
|
|
||||||
"max_daily": 0,
|
|
||||||
"min_daily": 0,
|
|
||||||
"days_with_registrations": 0,
|
|
||||||
"total_days": len(response.dates)
|
|
||||||
}
|
|
||||||
|
|
||||||
counts = response.counts
|
|
||||||
non_zero_counts = [c for c in counts if c > 0]
|
|
||||||
|
|
||||||
return {
|
|
||||||
"total_registrations": response.total_registrations,
|
|
||||||
"average_daily": round(sum(counts) / len(counts), 2),
|
|
||||||
"max_daily": max(counts),
|
|
||||||
"min_daily": min(counts),
|
|
||||||
"days_with_registrations": len(non_zero_counts),
|
|
||||||
"total_days": len(response.dates)
|
|
||||||
}
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to get registration summary: {e}")
|
|
||||||
raise e
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
268
apps/metrics/backend/services/starrocks_metrics_service.py
Normal file
268
apps/metrics/backend/services/starrocks_metrics_service.py
Normal file
@ -0,0 +1,268 @@
|
|||||||
|
from typing import Dict, List, Any, Optional, Union
|
||||||
|
from datetime import datetime, timedelta, date
|
||||||
|
from fastapi import HTTPException
|
||||||
|
|
||||||
|
from common.log.module_logger import ModuleLogger
|
||||||
|
from ..infra.external_service.starrocks_client import StarRocksClient
|
||||||
|
|
||||||
|
|
||||||
|
class StarRocksMetricsService:
|
||||||
|
"""
|
||||||
|
Service class for querying StarRocks metrics with predefined SQL queries.
|
||||||
|
|
||||||
|
This service provides a high-level interface for querying metrics data
|
||||||
|
using predefined SQL queries mapped to metric names.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Global dictionary mapping metric names to their corresponding SQL queries
|
||||||
|
METRIC_SQL_MAP: Dict[str, Dict[str, str]] = {
|
||||||
|
"freeleaps": {
|
||||||
|
"daily_registered_users": """
|
||||||
|
SELECT
|
||||||
|
date_id,
|
||||||
|
product_id,
|
||||||
|
registered_cnt,
|
||||||
|
updated_at
|
||||||
|
FROM dws_daily_registered_users
|
||||||
|
WHERE date_id >= %s
|
||||||
|
AND date_id <= %s
|
||||||
|
AND product_id = %s
|
||||||
|
ORDER BY date_id ASC
|
||||||
|
""",
|
||||||
|
},
|
||||||
|
"magicleaps": {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self, starrocks_endpoint: Optional[str] = None):
|
||||||
|
"""
|
||||||
|
Initialize StarRocksMetricsService.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
starrocks_endpoint: StarRocks server endpoint. If None, uses default from settings.
|
||||||
|
"""
|
||||||
|
self.module_logger = ModuleLogger(__file__)
|
||||||
|
self.starrocks_client = StarRocksClient()
|
||||||
|
|
||||||
|
def get_available_metrics(self, product_id: Optional[str] = None) -> List[str]:
|
||||||
|
"""
|
||||||
|
Get list of available metric names that have predefined SQL queries.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
product_id: Optional product ID to filter metrics. If None, returns all metrics from all products.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of available metric names
|
||||||
|
"""
|
||||||
|
if product_id:
|
||||||
|
if product_id in self.METRIC_SQL_MAP:
|
||||||
|
return list(self.METRIC_SQL_MAP[product_id].keys())
|
||||||
|
else:
|
||||||
|
return []
|
||||||
|
else:
|
||||||
|
# Return all metrics from all products
|
||||||
|
all_metrics = []
|
||||||
|
for product_metrics in self.METRIC_SQL_MAP.values():
|
||||||
|
all_metrics.extend(product_metrics.keys())
|
||||||
|
return all_metrics
|
||||||
|
|
||||||
|
def get_available_products(self) -> List[str]:
|
||||||
|
"""
|
||||||
|
Get list of available product IDs.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of available product IDs
|
||||||
|
"""
|
||||||
|
return list(self.METRIC_SQL_MAP.keys())
|
||||||
|
|
||||||
|
async def query_metric_by_time_range(
|
||||||
|
self,
|
||||||
|
product_id: str,
|
||||||
|
metric_name: str,
|
||||||
|
start_date: Union[str, date],
|
||||||
|
end_date: Union[str, date]
|
||||||
|
) -> List[Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
Query metric data for a specific date range.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
product_id: Product ID to identify which product's metrics to query
|
||||||
|
metric_name: Name of the metric to query
|
||||||
|
start_date: Start date for the query (ISO string or date)
|
||||||
|
end_date: End date for the query (ISO string or date)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of dictionaries with 'date' and 'value' keys
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If product_id or metric_name is not found in the SQL mapping
|
||||||
|
Exception: If StarRocks query fails
|
||||||
|
|
||||||
|
Example:
|
||||||
|
result = await service.query_metric_by_time_range(
|
||||||
|
"freeleaps",
|
||||||
|
"daily_registered_users",
|
||||||
|
start_date=date.today() - timedelta(days=30),
|
||||||
|
end_date=date.today()
|
||||||
|
)
|
||||||
|
# Returns: [{"date": "2024-01-01", "value": 45, "labels": {...}}, ...]
|
||||||
|
"""
|
||||||
|
# Check if product_id exists in the mapping
|
||||||
|
if product_id not in self.METRIC_SQL_MAP:
|
||||||
|
available_products = ", ".join(self.get_available_products())
|
||||||
|
error_msg = f"Product '{product_id}' not found in SQL mapping. Available products: {available_products}"
|
||||||
|
await self.module_logger.log_error(error_msg)
|
||||||
|
raise HTTPException(status_code=404, detail=error_msg)
|
||||||
|
|
||||||
|
# Check if metric name exists in the product's mapping
|
||||||
|
if metric_name not in self.METRIC_SQL_MAP[product_id]:
|
||||||
|
available_metrics = ", ".join(self.get_available_metrics(product_id))
|
||||||
|
error_msg = f"Metric '{metric_name}' not found in product '{product_id}' SQL mapping. Available metrics: {available_metrics}"
|
||||||
|
await self.module_logger.log_error(error_msg)
|
||||||
|
raise HTTPException(status_code=404, detail=error_msg)
|
||||||
|
|
||||||
|
# Parse date strings if they are strings
|
||||||
|
if isinstance(start_date, str):
|
||||||
|
try:
|
||||||
|
start_dt = datetime.strptime(start_date, '%Y-%m-%d').date()
|
||||||
|
except ValueError:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=400,
|
||||||
|
detail="Invalid start_date format. Expected YYYY-MM-DD"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
start_dt = start_date
|
||||||
|
|
||||||
|
if isinstance(end_date, str):
|
||||||
|
try:
|
||||||
|
end_dt = datetime.strptime(end_date, '%Y-%m-%d').date()
|
||||||
|
except ValueError:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=400,
|
||||||
|
detail="Invalid end_date format. Expected YYYY-MM-DD"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
end_dt = end_date
|
||||||
|
|
||||||
|
# Validate date range
|
||||||
|
if start_dt >= end_dt:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=400,
|
||||||
|
detail="Start date must be before end date"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check date range is not too large (max 1 year)
|
||||||
|
time_diff = end_dt - start_dt
|
||||||
|
if time_diff > timedelta(days=365):
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=400,
|
||||||
|
detail="Date range cannot exceed 1 year"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Get the SQL query for the metric
|
||||||
|
sql_query = self.METRIC_SQL_MAP[product_id][metric_name]
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self.module_logger.log_info(
|
||||||
|
f"Querying metric '{metric_name}' from product '{product_id}' from {start_dt} to {end_dt}")
|
||||||
|
|
||||||
|
# Execute the query
|
||||||
|
result = self.starrocks_client.execute_query(
|
||||||
|
query=sql_query,
|
||||||
|
params=(start_dt, end_dt, product_id)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Parse the result and format it
|
||||||
|
formatted_data = self._format_query_result(result, metric_name, product_id)
|
||||||
|
|
||||||
|
await self.module_logger.log_info(
|
||||||
|
f"Successfully queried metric '{metric_name}' with {len(formatted_data)} data points")
|
||||||
|
return formatted_data
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
def _format_query_result(self, starrocks_result: List[Dict[str, Any]], metric_name: str, product_id: str) -> List[Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
Format StarRocks query result into the required format.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
starrocks_result: Raw result from StarRocks query
|
||||||
|
metric_name: Name of the metric being queried
|
||||||
|
product_id: Product ID for the metric
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of dictionaries with 'date' and 'value' keys
|
||||||
|
"""
|
||||||
|
formatted_data = []
|
||||||
|
|
||||||
|
for row in starrocks_result:
|
||||||
|
# Format the date
|
||||||
|
date_value = row.get("date_id")
|
||||||
|
if date_value:
|
||||||
|
if isinstance(date_value, str):
|
||||||
|
date_str = date_value
|
||||||
|
else:
|
||||||
|
date_str = str(date_value)
|
||||||
|
else:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Get the value
|
||||||
|
value = row.get("registered_cnt", 0)
|
||||||
|
if value is None:
|
||||||
|
value = 0
|
||||||
|
|
||||||
|
# Create labels dictionary
|
||||||
|
labels = {
|
||||||
|
"product_id": row.get("product_id", product_id),
|
||||||
|
"metric_type": metric_name
|
||||||
|
}
|
||||||
|
|
||||||
|
formatted_data.append({
|
||||||
|
"date": date_str,
|
||||||
|
"value": int(value) if value is not None else 0,
|
||||||
|
"metric": metric_name,
|
||||||
|
"labels": labels
|
||||||
|
})
|
||||||
|
|
||||||
|
# Sort by date
|
||||||
|
formatted_data.sort(key=lambda x: x["date"])
|
||||||
|
|
||||||
|
return formatted_data
|
||||||
|
|
||||||
|
async def get_metric_info(self, product_id: str, metric_name: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Get information about a specific metric including its SQL query.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
product_id: Product ID to identify which product's metrics to query
|
||||||
|
metric_name: Name of the metric
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary containing metric information
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If product_id or metric_name is not found in the SQL mapping
|
||||||
|
"""
|
||||||
|
# Check if product_id exists in the mapping
|
||||||
|
if product_id not in self.METRIC_SQL_MAP:
|
||||||
|
available_products = ", ".join(self.get_available_products())
|
||||||
|
error_msg = f"Product '{product_id}' not found in SQL mapping. Available products: {available_products}"
|
||||||
|
await self.module_logger.log_error(error_msg)
|
||||||
|
raise HTTPException(status_code=404, detail=error_msg)
|
||||||
|
|
||||||
|
# Check if metric name exists in the product's mapping
|
||||||
|
if metric_name not in self.METRIC_SQL_MAP[product_id]:
|
||||||
|
available_metrics = ", ".join(self.get_available_metrics(product_id))
|
||||||
|
error_msg = f"Metric '{metric_name}' not found in product '{product_id}' SQL mapping. Available metrics: {available_metrics}"
|
||||||
|
await self.module_logger.log_error(error_msg)
|
||||||
|
raise HTTPException(status_code=404, detail=error_msg)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"product_id": product_id,
|
||||||
|
"metric_name": metric_name,
|
||||||
|
"sql_query": self.METRIC_SQL_MAP[product_id][metric_name].strip(),
|
||||||
|
"description": "Daily registered users count from StarRocks table dws_daily_registered_users"
|
||||||
|
}
|
||||||
@ -3,6 +3,12 @@ from typing import Optional
|
|||||||
|
|
||||||
|
|
||||||
class AppSettings(BaseSettings):
|
class AppSettings(BaseSettings):
|
||||||
|
# Server settings
|
||||||
|
SERVER_HOST: str = "0.0.0.0"
|
||||||
|
SERVER_PORT: int = 8009
|
||||||
|
SERVICE_API_ACCESS_HOST: str = "0.0.0.0"
|
||||||
|
SERVICE_API_ACCESS_PORT: int = 8009
|
||||||
|
|
||||||
# Log settings
|
# Log settings
|
||||||
LOG_BASE_PATH: str = "./logs"
|
LOG_BASE_PATH: str = "./logs"
|
||||||
BACKEND_LOG_FILE_NAME: str = "freeleaps-metrics"
|
BACKEND_LOG_FILE_NAME: str = "freeleaps-metrics"
|
||||||
|
|||||||
@ -1,5 +1,8 @@
|
|||||||
from fastapi import APIRouter
|
from fastapi import APIRouter
|
||||||
from webapi.routes.metrics import router
|
from webapi.routes.starrocks_metrics import api_router as starrocks_metrics_router
|
||||||
|
from webapi.routes.prometheus_metrics import api_router as prometheus_metrics_router
|
||||||
|
|
||||||
api_router = APIRouter()
|
api_router = APIRouter()
|
||||||
|
|
||||||
api_router.include_router(router, tags=["metrics"])
|
api_router.include_router(starrocks_metrics_router, prefix="/starrocks", tags=["starrocks-metrics"])
|
||||||
|
api_router.include_router(prometheus_metrics_router, prefix="/prometheus", tags=["prometheus-metrics"])
|
||||||
|
|||||||
@ -1,7 +0,0 @@
|
|||||||
from fastapi import APIRouter
|
|
||||||
from webapi.routes.starrocks_metrics import api_router as starrocks_metrics_router
|
|
||||||
from webapi.routes.prometheus_metrics import api_router as prometheus_metrics_router
|
|
||||||
|
|
||||||
router = APIRouter()
|
|
||||||
router.include_router(starrocks_metrics_router, prefix="/metrics", tags=["starrocks-metrics"])
|
|
||||||
router.include_router(prometheus_metrics_router, prefix="/metrics", tags=["prometheus-metrics"])
|
|
||||||
@ -1,22 +1,14 @@
|
|||||||
from fastapi import APIRouter, HTTPException
|
from fastapi import APIRouter, HTTPException
|
||||||
|
|
||||||
from common.log.module_logger import ModuleLogger
|
from common.log.module_logger import ModuleLogger
|
||||||
|
from backend.services.starrocks_metrics_service import StarRocksMetricsService
|
||||||
|
|
||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
# Initialize logger
|
# Initialize service and logger
|
||||||
|
starrocks_service = StarRocksMetricsService()
|
||||||
module_logger = ModuleLogger(__file__)
|
module_logger = ModuleLogger(__file__)
|
||||||
|
|
||||||
# Product -> supported StarRocks-backed metrics
|
|
||||||
SUPPORTED_STARROCKS_METRICS_MAP = {
|
|
||||||
"freeleaps": [
|
|
||||||
"daily_registered_users",
|
|
||||||
],
|
|
||||||
"magicleaps": [
|
|
||||||
"daily_registered_users",
|
|
||||||
],
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/starrocks/product/{product_id}/available-metrics")
|
@router.get("/starrocks/product/{product_id}/available-metrics")
|
||||||
async def get_available_metrics(product_id: str):
|
async def get_available_metrics(product_id: str):
|
||||||
@ -32,10 +24,14 @@ async def get_available_metrics(product_id: str):
|
|||||||
f"Getting StarRocks available metrics list for product_id: {product_id}"
|
f"Getting StarRocks available metrics list for product_id: {product_id}"
|
||||||
)
|
)
|
||||||
|
|
||||||
if product_id not in SUPPORTED_STARROCKS_METRICS_MAP:
|
metrics = starrocks_service.get_available_metrics(product_id)
|
||||||
raise HTTPException(status_code=404, detail=f"Unknown product_id: {product_id}")
|
|
||||||
|
if not metrics:
|
||||||
metrics = SUPPORTED_STARROCKS_METRICS_MAP[product_id]
|
available_products = ", ".join(starrocks_service.get_available_products())
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=404,
|
||||||
|
detail=f"Unknown product_id: {product_id}. Available products: {available_products}"
|
||||||
|
)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"product_id": product_id,
|
"product_id": product_id,
|
||||||
|
|||||||
@ -1,22 +1,14 @@
|
|||||||
from fastapi import APIRouter, HTTPException
|
from fastapi import APIRouter, HTTPException
|
||||||
|
|
||||||
from common.log.module_logger import ModuleLogger
|
from common.log.module_logger import ModuleLogger
|
||||||
|
from backend.services.starrocks_metrics_service import StarRocksMetricsService
|
||||||
|
|
||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
# Initialize logger
|
# Initialize service and logger
|
||||||
|
starrocks_service = StarRocksMetricsService()
|
||||||
module_logger = ModuleLogger(__file__)
|
module_logger = ModuleLogger(__file__)
|
||||||
|
|
||||||
# Product -> metric -> description
|
|
||||||
STARROCKS_METRIC_DESCRIPTIONS = {
|
|
||||||
"freeleaps": {
|
|
||||||
"daily_registered_users": "Daily registered users count from StarRocks table dws_daily_registered_users",
|
|
||||||
},
|
|
||||||
"magicleaps": {
|
|
||||||
"daily_registered_users": "Daily registered users count from StarRocks table dws_daily_registered_users",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/starrocks/product/{product_id}/metric/{metric_name}/info")
|
@router.get("/starrocks/product/{product_id}/metric/{metric_name}/info")
|
||||||
async def get_metric_info(
|
async def get_metric_info(
|
||||||
@ -34,18 +26,7 @@ async def get_metric_info(
|
|||||||
f"Getting StarRocks metric info for metric '{metric_name}' from product '{product_id}'"
|
f"Getting StarRocks metric info for metric '{metric_name}' from product '{product_id}'"
|
||||||
)
|
)
|
||||||
|
|
||||||
if product_id not in STARROCKS_METRIC_DESCRIPTIONS:
|
metric_info = await starrocks_service.get_metric_info(product_id, metric_name)
|
||||||
raise HTTPException(status_code=404, detail=f"Unknown product_id: {product_id}")
|
|
||||||
|
|
||||||
product_metrics = STARROCKS_METRIC_DESCRIPTIONS[product_id]
|
|
||||||
if metric_name not in product_metrics:
|
|
||||||
raise HTTPException(status_code=404, detail=f"Unknown metric '{metric_name}' for product '{product_id}'")
|
|
||||||
|
|
||||||
metric_info = {
|
|
||||||
"product_id": product_id,
|
|
||||||
"metric_name": metric_name,
|
|
||||||
"description": product_metrics[metric_name],
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"metric_info": metric_info,
|
"metric_info": metric_info,
|
||||||
|
|||||||
@ -1,31 +1,31 @@
|
|||||||
from fastapi import APIRouter
|
from fastapi import APIRouter
|
||||||
from typing import Optional, List, Dict, Any
|
from typing import Optional, List, Dict, Any, Union
|
||||||
from pydantic import BaseModel, Field
|
from pydantic import BaseModel, Field
|
||||||
from datetime import date
|
from datetime import date, datetime
|
||||||
|
|
||||||
from common.log.module_logger import ModuleLogger
|
from common.log.module_logger import ModuleLogger
|
||||||
from backend.services.registration_analytics_service import RegistrationService
|
from backend.services.starrocks_metrics_service import StarRocksMetricsService
|
||||||
|
|
||||||
|
|
||||||
class RegistrationDataPoint(BaseModel):
|
class MetricDataPoint(BaseModel):
|
||||||
"""Single data point in registration time series."""
|
"""Single data point in metric time series."""
|
||||||
date: str = Field(..., description="Date in YYYY-MM-DD format")
|
date: str = Field(..., description="Date in YYYY-MM-DD format")
|
||||||
value: int = Field(..., description="Number of registered users")
|
value: Union[int, float] = Field(..., description="Metric value")
|
||||||
product_id: str = Field(..., description="Product identifier")
|
labels: Dict[str, Any] = Field(default_factory=dict, description="Metric labels")
|
||||||
|
|
||||||
|
|
||||||
class RegistrationTimeSeriesResponse(BaseModel):
|
class MetricTimeSeriesResponse(BaseModel):
|
||||||
"""Response model for registration time series data."""
|
"""Response model for metric time series data."""
|
||||||
metric_name: str = Field(..., description="Name of the queried metric")
|
metric_name: str = Field(..., description="Name of the queried metric")
|
||||||
data_points: List[RegistrationDataPoint] = Field(..., description="List of data points")
|
data_points: List[MetricDataPoint] = Field(..., description="List of data points")
|
||||||
total_points: int = Field(..., description="Total number of data points")
|
total_points: int = Field(..., description="Total number of data points")
|
||||||
time_range: Dict[str, str] = Field(..., description="Start and end date of the query")
|
time_range: Dict[str, str] = Field(..., description="Start and end date of the query")
|
||||||
total_registrations: int = Field(..., description="Total number of registrations in the period")
|
|
||||||
|
|
||||||
|
|
||||||
class RegistrationQueryRequest(BaseModel):
|
class MetricQueryRequest(BaseModel):
|
||||||
"""Request model for registration query."""
|
"""Request model for metric query."""
|
||||||
product_id: str = Field("freeleaps", description="Product ID to identify which product's data to query")
|
product_id: str = Field(..., description="Product ID to identify which product's data to query")
|
||||||
|
metric_name: str = Field(..., description="Name of the metric to query")
|
||||||
start_date: str = Field(..., description="Start date in YYYY-MM-DD format")
|
start_date: str = Field(..., description="Start date in YYYY-MM-DD format")
|
||||||
end_date: str = Field(..., description="End date in YYYY-MM-DD format")
|
end_date: str = Field(..., description="End date in YYYY-MM-DD format")
|
||||||
|
|
||||||
@ -33,63 +33,48 @@ class RegistrationQueryRequest(BaseModel):
|
|||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
# Initialize service and logger
|
# Initialize service and logger
|
||||||
registration_service = RegistrationService()
|
starrocks_service = StarRocksMetricsService()
|
||||||
module_logger = ModuleLogger(__file__)
|
module_logger = ModuleLogger(__file__)
|
||||||
|
|
||||||
|
|
||||||
@router.post("/starrocks/dru_query", response_model=RegistrationTimeSeriesResponse)
|
@router.post("/starrocks/metrics_query", response_model=MetricTimeSeriesResponse)
|
||||||
async def metrics_query(
|
async def metrics_query(
|
||||||
request: RegistrationQueryRequest
|
request: MetricQueryRequest
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Query registration time series data.
|
Query StarRocks metrics time series data.
|
||||||
|
|
||||||
Returns XY curve data (time series) for user registrations within the given date range.
|
Returns XY curve data (time series) for the specified metric within the given date range.
|
||||||
"""
|
"""
|
||||||
await module_logger.log_info(
|
await module_logger.log_info(
|
||||||
f"Querying registration data for product '{request.product_id}' from {request.start_date} to {request.end_date}")
|
f"Querying metric '{request.metric_name}' from product '{request.product_id}' from {request.start_date} to {request.end_date}")
|
||||||
|
|
||||||
# Parse dates - handle both YYYY-M-D and YYYY-MM-DD formats
|
# Query the metric data
|
||||||
def parse_date(date_str: str) -> date:
|
data_points = await starrocks_service.query_metric_by_time_range(
|
||||||
try:
|
product_id=request.product_id,
|
||||||
return date.fromisoformat(date_str)
|
metric_name=request.metric_name,
|
||||||
except ValueError:
|
start_date=request.start_date,
|
||||||
# Try to parse YYYY-M-D format and convert to YYYY-MM-DD
|
end_date=request.end_date
|
||||||
parts = date_str.split('-')
|
|
||||||
if len(parts) == 3:
|
|
||||||
year, month, day = parts
|
|
||||||
return date(int(year), int(month), int(day))
|
|
||||||
raise ValueError(f"Invalid date format: {date_str}")
|
|
||||||
|
|
||||||
start_date = parse_date(request.start_date)
|
|
||||||
end_date = parse_date(request.end_date)
|
|
||||||
|
|
||||||
# Query the registration data
|
|
||||||
result = registration_service.get_daily_registered_users(
|
|
||||||
start_date=start_date,
|
|
||||||
end_date=end_date,
|
|
||||||
product_id=request.product_id
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Format response
|
# Format response
|
||||||
response = RegistrationTimeSeriesResponse(
|
response = MetricTimeSeriesResponse(
|
||||||
metric_name="daily_registered_users",
|
metric_name=request.metric_name,
|
||||||
data_points=[
|
data_points=[
|
||||||
RegistrationDataPoint(
|
MetricDataPoint(
|
||||||
date=date_str,
|
date=point["date"],
|
||||||
value=count,
|
value=point["value"],
|
||||||
product_id=request.product_id
|
labels=point["labels"]
|
||||||
)
|
)
|
||||||
for date_str, count in zip(result.dates, result.counts)
|
for point in data_points
|
||||||
],
|
],
|
||||||
total_points=len(result.dates),
|
total_points=len(data_points),
|
||||||
time_range={
|
time_range={
|
||||||
"start": request.start_date,
|
"start": request.start_date,
|
||||||
"end": request.end_date
|
"end": request.end_date
|
||||||
},
|
}
|
||||||
total_registrations=result.total_registrations
|
|
||||||
)
|
)
|
||||||
|
|
||||||
await module_logger.log_info(
|
await module_logger.log_info(
|
||||||
f"Successfully queried registration data with {len(result.dates)} data points, total registrations: {result.total_registrations}")
|
f"Successfully queried metric '{request.metric_name}' with {len(data_points)} data points")
|
||||||
return response
|
return response
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user