diff --git a/apps/metrics/backend/services/registration_analytics_service.py b/apps/metrics/backend/services/registration_analytics_service.py deleted file mode 100644 index eb843f0..0000000 --- a/apps/metrics/backend/services/registration_analytics_service.py +++ /dev/null @@ -1,132 +0,0 @@ -from typing import List, Dict, Any -from datetime import date, timedelta -from loguru import logger -from backend.infra.external_service.starrocks_client import StarRocksClient -from backend.models.user_registration_models import UserRegistrationResponse, DailyRegisteredUsers - - -class RegistrationService: - """Service for handling user registration data queries""" - - def __init__(self): - self.starrocks_client = StarRocksClient() - - def get_daily_registered_users( - self, - start_date: date, - end_date: date, - product_id: str = "freeleaps" - ) -> UserRegistrationResponse: - """ - Get daily registered users count for a date range - - Args: - start_date: Start date for the query - end_date: End date for the query - product_id: Product identifier (default: freeleaps) - - Returns: - UserRegistrationResponse with dates and counts - """ - try: - # Query data from StarRocks - raw_data = self.starrocks_client.get_daily_registered_users( - start_date, end_date, product_id - ) - # Convert to DailyRegisteredUsers objects - daily_data = [ - DailyRegisteredUsers( - date_id=row['date_id'], - product_id=row['product_id'], - registered_cnt=row['registered_cnt'], - updated_at=row.get('updated_at') - ) - for row in raw_data - ] - # Create date-to-count mapping - data_dict = {str(item.date_id): item.registered_cnt for item in daily_data} - - # Generate complete date range - dates = [] - counts = [] - current_date = start_date - - while current_date <= end_date: - date_str = str(current_date) - dates.append(date_str) - counts.append(data_dict.get(date_str, 0)) - current_date += timedelta(days=1) - - # Calculate total registrations - total_registrations = sum(counts) - - logger.info( - f"Retrieved registration data for {len(dates)} days, " - f"total registrations: {total_registrations}" - ) - - return UserRegistrationResponse( - dates=dates, - counts=counts, - total_registrations=total_registrations, - query_period=f"{start_date} to {end_date}" - ) - - except Exception as e: - logger.error(f"Failed to get daily registered users: {e}") - raise e - - def get_registration_summary( - self, - start_date: date, - end_date: date, - product_id: str = "freeleaps" - ) -> Dict[str, Any]: - """ - Get summary statistics for user registrations - - Args: - start_date: Start date for the query - end_date: End date for the query - product_id: Product identifier - - Returns: - Dictionary with summary statistics - """ - try: - response = self.get_daily_registered_users(start_date, end_date, product_id) - - if not response.counts: - return { - "total_registrations": 0, - "average_daily": 0, - "max_daily": 0, - "min_daily": 0, - "days_with_registrations": 0, - "total_days": len(response.dates) - } - - counts = response.counts - non_zero_counts = [c for c in counts if c > 0] - - return { - "total_registrations": response.total_registrations, - "average_daily": round(sum(counts) / len(counts), 2), - "max_daily": max(counts), - "min_daily": min(counts), - "days_with_registrations": len(non_zero_counts), - "total_days": len(response.dates) - } - - except Exception as e: - logger.error(f"Failed to get registration summary: {e}") - raise e - - - - - - - - - diff --git a/apps/metrics/backend/services/starrocks_metrics_service.py b/apps/metrics/backend/services/starrocks_metrics_service.py new file mode 100644 index 0000000..329815b --- /dev/null +++ b/apps/metrics/backend/services/starrocks_metrics_service.py @@ -0,0 +1,268 @@ +from typing import Dict, List, Any, Optional, Union +from datetime import datetime, timedelta, date +from fastapi import HTTPException + +from common.log.module_logger import ModuleLogger +from ..infra.external_service.starrocks_client import StarRocksClient + + +class StarRocksMetricsService: + """ + Service class for querying StarRocks metrics with predefined SQL queries. + + This service provides a high-level interface for querying metrics data + using predefined SQL queries mapped to metric names. + """ + + # Global dictionary mapping metric names to their corresponding SQL queries + METRIC_SQL_MAP: Dict[str, Dict[str, str]] = { + "freeleaps": { + "daily_registered_users": """ + SELECT + date_id, + product_id, + registered_cnt, + updated_at + FROM dws_daily_registered_users + WHERE date_id >= %s + AND date_id <= %s + AND product_id = %s + ORDER BY date_id ASC + """, + }, + "magicleaps": { + + } + } + + def __init__(self, starrocks_endpoint: Optional[str] = None): + """ + Initialize StarRocksMetricsService. + + Args: + starrocks_endpoint: StarRocks server endpoint. If None, uses default from settings. + """ + self.module_logger = ModuleLogger(__file__) + self.starrocks_client = StarRocksClient() + + def get_available_metrics(self, product_id: Optional[str] = None) -> List[str]: + """ + Get list of available metric names that have predefined SQL queries. + + Args: + product_id: Optional product ID to filter metrics. If None, returns all metrics from all products. + + Returns: + List of available metric names + """ + if product_id: + if product_id in self.METRIC_SQL_MAP: + return list(self.METRIC_SQL_MAP[product_id].keys()) + else: + return [] + else: + # Return all metrics from all products + all_metrics = [] + for product_metrics in self.METRIC_SQL_MAP.values(): + all_metrics.extend(product_metrics.keys()) + return all_metrics + + def get_available_products(self) -> List[str]: + """ + Get list of available product IDs. + + Returns: + List of available product IDs + """ + return list(self.METRIC_SQL_MAP.keys()) + + async def query_metric_by_time_range( + self, + product_id: str, + metric_name: str, + start_date: Union[str, date], + end_date: Union[str, date] + ) -> List[Dict[str, Any]]: + """ + Query metric data for a specific date range. + + Args: + product_id: Product ID to identify which product's metrics to query + metric_name: Name of the metric to query + start_date: Start date for the query (ISO string or date) + end_date: End date for the query (ISO string or date) + + Returns: + List of dictionaries with 'date' and 'value' keys + + Raises: + ValueError: If product_id or metric_name is not found in the SQL mapping + Exception: If StarRocks query fails + + Example: + result = await service.query_metric_by_time_range( + "freeleaps", + "daily_registered_users", + start_date=date.today() - timedelta(days=30), + end_date=date.today() + ) + # Returns: [{"date": "2024-01-01", "value": 45, "labels": {...}}, ...] + """ + # Check if product_id exists in the mapping + if product_id not in self.METRIC_SQL_MAP: + available_products = ", ".join(self.get_available_products()) + error_msg = f"Product '{product_id}' not found in SQL mapping. Available products: {available_products}" + await self.module_logger.log_error(error_msg) + raise HTTPException(status_code=404, detail=error_msg) + + # Check if metric name exists in the product's mapping + if metric_name not in self.METRIC_SQL_MAP[product_id]: + available_metrics = ", ".join(self.get_available_metrics(product_id)) + error_msg = f"Metric '{metric_name}' not found in product '{product_id}' SQL mapping. Available metrics: {available_metrics}" + await self.module_logger.log_error(error_msg) + raise HTTPException(status_code=404, detail=error_msg) + + # Parse date strings if they are strings + if isinstance(start_date, str): + try: + start_dt = datetime.strptime(start_date, '%Y-%m-%d').date() + except ValueError: + raise HTTPException( + status_code=400, + detail="Invalid start_date format. Expected YYYY-MM-DD" + ) + else: + start_dt = start_date + + if isinstance(end_date, str): + try: + end_dt = datetime.strptime(end_date, '%Y-%m-%d').date() + except ValueError: + raise HTTPException( + status_code=400, + detail="Invalid end_date format. Expected YYYY-MM-DD" + ) + else: + end_dt = end_date + + # Validate date range + if start_dt >= end_dt: + raise HTTPException( + status_code=400, + detail="Start date must be before end date" + ) + + # Check date range is not too large (max 1 year) + time_diff = end_dt - start_dt + if time_diff > timedelta(days=365): + raise HTTPException( + status_code=400, + detail="Date range cannot exceed 1 year" + ) + + # Get the SQL query for the metric + sql_query = self.METRIC_SQL_MAP[product_id][metric_name] + + try: + await self.module_logger.log_info( + f"Querying metric '{metric_name}' from product '{product_id}' from {start_dt} to {end_dt}") + + # Execute the query + result = self.starrocks_client.execute_query( + query=sql_query, + params=(start_dt, end_dt, product_id) + ) + + # Parse the result and format it + formatted_data = self._format_query_result(result, metric_name, product_id) + + await self.module_logger.log_info( + f"Successfully queried metric '{metric_name}' with {len(formatted_data)} data points") + return formatted_data + + except Exception as e: + await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}") + raise + + def _format_query_result(self, starrocks_result: List[Dict[str, Any]], metric_name: str, product_id: str) -> List[Dict[str, Any]]: + """ + Format StarRocks query result into the required format. + + Args: + starrocks_result: Raw result from StarRocks query + metric_name: Name of the metric being queried + product_id: Product ID for the metric + + Returns: + List of dictionaries with 'date' and 'value' keys + """ + formatted_data = [] + + for row in starrocks_result: + # Format the date + date_value = row.get("date_id") + if date_value: + if isinstance(date_value, str): + date_str = date_value + else: + date_str = str(date_value) + else: + continue + + # Get the value + value = row.get("registered_cnt", 0) + if value is None: + value = 0 + + # Create labels dictionary + labels = { + "product_id": row.get("product_id", product_id), + "metric_type": metric_name + } + + formatted_data.append({ + "date": date_str, + "value": int(value) if value is not None else 0, + "metric": metric_name, + "labels": labels + }) + + # Sort by date + formatted_data.sort(key=lambda x: x["date"]) + + return formatted_data + + async def get_metric_info(self, product_id: str, metric_name: str) -> Dict[str, Any]: + """ + Get information about a specific metric including its SQL query. + + Args: + product_id: Product ID to identify which product's metrics to query + metric_name: Name of the metric + + Returns: + Dictionary containing metric information + + Raises: + ValueError: If product_id or metric_name is not found in the SQL mapping + """ + # Check if product_id exists in the mapping + if product_id not in self.METRIC_SQL_MAP: + available_products = ", ".join(self.get_available_products()) + error_msg = f"Product '{product_id}' not found in SQL mapping. Available products: {available_products}" + await self.module_logger.log_error(error_msg) + raise HTTPException(status_code=404, detail=error_msg) + + # Check if metric name exists in the product's mapping + if metric_name not in self.METRIC_SQL_MAP[product_id]: + available_metrics = ", ".join(self.get_available_metrics(product_id)) + error_msg = f"Metric '{metric_name}' not found in product '{product_id}' SQL mapping. Available metrics: {available_metrics}" + await self.module_logger.log_error(error_msg) + raise HTTPException(status_code=404, detail=error_msg) + + return { + "product_id": product_id, + "metric_name": metric_name, + "sql_query": self.METRIC_SQL_MAP[product_id][metric_name].strip(), + "description": "Daily registered users count from StarRocks table dws_daily_registered_users" + } \ No newline at end of file diff --git a/apps/metrics/common/config/app_settings.py b/apps/metrics/common/config/app_settings.py index 4ea8a84..927ea09 100644 --- a/apps/metrics/common/config/app_settings.py +++ b/apps/metrics/common/config/app_settings.py @@ -3,6 +3,12 @@ from typing import Optional class AppSettings(BaseSettings): + # Server settings + SERVER_HOST: str = "0.0.0.0" + SERVER_PORT: int = 8009 + SERVICE_API_ACCESS_HOST: str = "0.0.0.0" + SERVICE_API_ACCESS_PORT: int = 8009 + # Log settings LOG_BASE_PATH: str = "./logs" BACKEND_LOG_FILE_NAME: str = "freeleaps-metrics" diff --git a/apps/metrics/webapi/routes/__init__.py b/apps/metrics/webapi/routes/__init__.py index 3a2818a..242272f 100644 --- a/apps/metrics/webapi/routes/__init__.py +++ b/apps/metrics/webapi/routes/__init__.py @@ -1,5 +1,8 @@ from fastapi import APIRouter -from webapi.routes.metrics import router +from webapi.routes.starrocks_metrics import api_router as starrocks_metrics_router +from webapi.routes.prometheus_metrics import api_router as prometheus_metrics_router + api_router = APIRouter() -api_router.include_router(router, tags=["metrics"]) +api_router.include_router(starrocks_metrics_router, prefix="/starrocks", tags=["starrocks-metrics"]) +api_router.include_router(prometheus_metrics_router, prefix="/prometheus", tags=["prometheus-metrics"]) diff --git a/apps/metrics/webapi/routes/metrics/__init__.py b/apps/metrics/webapi/routes/metrics/__init__.py deleted file mode 100644 index 68613d6..0000000 --- a/apps/metrics/webapi/routes/metrics/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -from fastapi import APIRouter -from webapi.routes.starrocks_metrics import api_router as starrocks_metrics_router -from webapi.routes.prometheus_metrics import api_router as prometheus_metrics_router - -router = APIRouter() -router.include_router(starrocks_metrics_router, prefix="/metrics", tags=["starrocks-metrics"]) -router.include_router(prometheus_metrics_router, prefix="/metrics", tags=["prometheus-metrics"]) diff --git a/apps/metrics/webapi/routes/metrics/registration_metrics.py b/apps/metrics/webapi/routes/metrics/registration_metrics.py deleted file mode 100644 index e69de29..0000000 diff --git a/apps/metrics/webapi/routes/starrocks_metrics/available_metrics.py b/apps/metrics/webapi/routes/starrocks_metrics/available_metrics.py index 68c4314..c0fd3fc 100644 --- a/apps/metrics/webapi/routes/starrocks_metrics/available_metrics.py +++ b/apps/metrics/webapi/routes/starrocks_metrics/available_metrics.py @@ -1,22 +1,14 @@ from fastapi import APIRouter, HTTPException from common.log.module_logger import ModuleLogger +from backend.services.starrocks_metrics_service import StarRocksMetricsService router = APIRouter() -# Initialize logger +# Initialize service and logger +starrocks_service = StarRocksMetricsService() module_logger = ModuleLogger(__file__) -# Product -> supported StarRocks-backed metrics -SUPPORTED_STARROCKS_METRICS_MAP = { - "freeleaps": [ - "daily_registered_users", - ], - "magicleaps": [ - "daily_registered_users", - ], -} - @router.get("/starrocks/product/{product_id}/available-metrics") async def get_available_metrics(product_id: str): @@ -32,10 +24,14 @@ async def get_available_metrics(product_id: str): f"Getting StarRocks available metrics list for product_id: {product_id}" ) - if product_id not in SUPPORTED_STARROCKS_METRICS_MAP: - raise HTTPException(status_code=404, detail=f"Unknown product_id: {product_id}") - - metrics = SUPPORTED_STARROCKS_METRICS_MAP[product_id] + metrics = starrocks_service.get_available_metrics(product_id) + + if not metrics: + available_products = ", ".join(starrocks_service.get_available_products()) + raise HTTPException( + status_code=404, + detail=f"Unknown product_id: {product_id}. Available products: {available_products}" + ) return { "product_id": product_id, diff --git a/apps/metrics/webapi/routes/starrocks_metrics/metric_info.py b/apps/metrics/webapi/routes/starrocks_metrics/metric_info.py index d376cd0..e94b19e 100644 --- a/apps/metrics/webapi/routes/starrocks_metrics/metric_info.py +++ b/apps/metrics/webapi/routes/starrocks_metrics/metric_info.py @@ -1,22 +1,14 @@ from fastapi import APIRouter, HTTPException from common.log.module_logger import ModuleLogger +from backend.services.starrocks_metrics_service import StarRocksMetricsService router = APIRouter() -# Initialize logger +# Initialize service and logger +starrocks_service = StarRocksMetricsService() module_logger = ModuleLogger(__file__) -# Product -> metric -> description -STARROCKS_METRIC_DESCRIPTIONS = { - "freeleaps": { - "daily_registered_users": "Daily registered users count from StarRocks table dws_daily_registered_users", - }, - "magicleaps": { - "daily_registered_users": "Daily registered users count from StarRocks table dws_daily_registered_users", - }, -} - @router.get("/starrocks/product/{product_id}/metric/{metric_name}/info") async def get_metric_info( @@ -34,18 +26,7 @@ async def get_metric_info( f"Getting StarRocks metric info for metric '{metric_name}' from product '{product_id}'" ) - if product_id not in STARROCKS_METRIC_DESCRIPTIONS: - raise HTTPException(status_code=404, detail=f"Unknown product_id: {product_id}") - - product_metrics = STARROCKS_METRIC_DESCRIPTIONS[product_id] - if metric_name not in product_metrics: - raise HTTPException(status_code=404, detail=f"Unknown metric '{metric_name}' for product '{product_id}'") - - metric_info = { - "product_id": product_id, - "metric_name": metric_name, - "description": product_metrics[metric_name], - } + metric_info = await starrocks_service.get_metric_info(product_id, metric_name) return { "metric_info": metric_info, diff --git a/apps/metrics/webapi/routes/starrocks_metrics/metrics_query.py b/apps/metrics/webapi/routes/starrocks_metrics/metrics_query.py index 976b9c2..77078f9 100644 --- a/apps/metrics/webapi/routes/starrocks_metrics/metrics_query.py +++ b/apps/metrics/webapi/routes/starrocks_metrics/metrics_query.py @@ -1,31 +1,31 @@ from fastapi import APIRouter -from typing import Optional, List, Dict, Any +from typing import Optional, List, Dict, Any, Union from pydantic import BaseModel, Field -from datetime import date +from datetime import date, datetime from common.log.module_logger import ModuleLogger -from backend.services.registration_analytics_service import RegistrationService +from backend.services.starrocks_metrics_service import StarRocksMetricsService -class RegistrationDataPoint(BaseModel): - """Single data point in registration time series.""" +class MetricDataPoint(BaseModel): + """Single data point in metric time series.""" date: str = Field(..., description="Date in YYYY-MM-DD format") - value: int = Field(..., description="Number of registered users") - product_id: str = Field(..., description="Product identifier") + value: Union[int, float] = Field(..., description="Metric value") + labels: Dict[str, Any] = Field(default_factory=dict, description="Metric labels") -class RegistrationTimeSeriesResponse(BaseModel): - """Response model for registration time series data.""" +class MetricTimeSeriesResponse(BaseModel): + """Response model for metric time series data.""" metric_name: str = Field(..., description="Name of the queried metric") - data_points: List[RegistrationDataPoint] = Field(..., description="List of data points") + data_points: List[MetricDataPoint] = Field(..., description="List of data points") total_points: int = Field(..., description="Total number of data points") time_range: Dict[str, str] = Field(..., description="Start and end date of the query") - total_registrations: int = Field(..., description="Total number of registrations in the period") -class RegistrationQueryRequest(BaseModel): - """Request model for registration query.""" - product_id: str = Field("freeleaps", description="Product ID to identify which product's data to query") +class MetricQueryRequest(BaseModel): + """Request model for metric query.""" + product_id: str = Field(..., description="Product ID to identify which product's data to query") + metric_name: str = Field(..., description="Name of the metric to query") start_date: str = Field(..., description="Start date in YYYY-MM-DD format") end_date: str = Field(..., description="End date in YYYY-MM-DD format") @@ -33,63 +33,48 @@ class RegistrationQueryRequest(BaseModel): router = APIRouter() # Initialize service and logger -registration_service = RegistrationService() +starrocks_service = StarRocksMetricsService() module_logger = ModuleLogger(__file__) -@router.post("/starrocks/dru_query", response_model=RegistrationTimeSeriesResponse) +@router.post("/starrocks/metrics_query", response_model=MetricTimeSeriesResponse) async def metrics_query( - request: RegistrationQueryRequest + request: MetricQueryRequest ): """ - Query registration time series data. + Query StarRocks metrics time series data. - Returns XY curve data (time series) for user registrations within the given date range. + Returns XY curve data (time series) for the specified metric within the given date range. """ await module_logger.log_info( - f"Querying registration data for product '{request.product_id}' from {request.start_date} to {request.end_date}") + f"Querying metric '{request.metric_name}' from product '{request.product_id}' from {request.start_date} to {request.end_date}") - # Parse dates - handle both YYYY-M-D and YYYY-MM-DD formats - def parse_date(date_str: str) -> date: - try: - return date.fromisoformat(date_str) - except ValueError: - # Try to parse YYYY-M-D format and convert to YYYY-MM-DD - parts = date_str.split('-') - if len(parts) == 3: - year, month, day = parts - return date(int(year), int(month), int(day)) - raise ValueError(f"Invalid date format: {date_str}") - - start_date = parse_date(request.start_date) - end_date = parse_date(request.end_date) - - # Query the registration data - result = registration_service.get_daily_registered_users( - start_date=start_date, - end_date=end_date, - product_id=request.product_id + # Query the metric data + data_points = await starrocks_service.query_metric_by_time_range( + product_id=request.product_id, + metric_name=request.metric_name, + start_date=request.start_date, + end_date=request.end_date ) # Format response - response = RegistrationTimeSeriesResponse( - metric_name="daily_registered_users", + response = MetricTimeSeriesResponse( + metric_name=request.metric_name, data_points=[ - RegistrationDataPoint( - date=date_str, - value=count, - product_id=request.product_id + MetricDataPoint( + date=point["date"], + value=point["value"], + labels=point["labels"] ) - for date_str, count in zip(result.dates, result.counts) + for point in data_points ], - total_points=len(result.dates), + total_points=len(data_points), time_range={ "start": request.start_date, "end": request.end_date - }, - total_registrations=result.total_registrations + } ) await module_logger.log_info( - f"Successfully queried registration data with {len(result.dates)} data points, total registrations: {result.total_registrations}") + f"Successfully queried metric '{request.metric_name}' with {len(data_points)} data points") return response