from typing import Dict, List, Any, Optional, Union
from datetime import datetime, timedelta, date

from fastapi import HTTPException

from common.log.module_logger import ModuleLogger
from ..infra.external_service.starrocks_client import StarRocksClient


class StarRocksMetricsService:
    """
    Service class for querying StarRocks metrics with predefined SQL queries.

    Metric names are mapped, per product, to parameterized SQL statements in
    METRIC_SQL_MAP. Callers never supply SQL; they pick a product and a metric
    name and the service binds the date range and product ID as parameters.
    """

    # Mapping: product_id -> metric_name -> parameterized SQL query.
    # Every query is executed with the parameters (start_date, end_date, product_id),
    # in that order, so new queries must use exactly three placeholders in that order.
    METRIC_SQL_MAP: Dict[str, Dict[str, str]] = {
        "freeleaps": {
            "daily_registered_users": """
                SELECT
                    date_id,
                    product_id,
                    registered_cnt,
                    updated_at
                FROM dws_daily_registered_users
                WHERE date_id >= %s
                  AND date_id < %s
                  AND product_id = %s
                ORDER BY date_id ASC
            """,
        },
        "magicleaps": {
        },
    }

    # Largest date span a single query may cover.
    MAX_QUERY_RANGE: timedelta = timedelta(days=365)

    def __init__(self, starrocks_endpoint: Optional[str] = None):
        """
        Initialize StarRocksMetricsService.

        Args:
            starrocks_endpoint: StarRocks server endpoint. Kept for interface
                compatibility; it is currently NOT forwarded to the client.
        """
        self.module_logger = ModuleLogger(__file__)
        # NOTE(review): starrocks_endpoint is ignored here and the client is built
        # with its own defaults. Forward it once StarRocksClient's constructor
        # signature is confirmed to accept an endpoint.
        self.starrocks_client = StarRocksClient()

    def get_available_metrics(self, product_id: Optional[str] = None) -> List[str]:
        """
        Get list of available metric names that have predefined SQL queries.

        Args:
            product_id: Optional product ID to filter metrics. If None (or empty),
                returns the metrics of all products concatenated.

        Returns:
            List of available metric names; empty if the product is unknown.
        """
        if product_id:
            return list(self.METRIC_SQL_MAP.get(product_id, {}))

        # No filter: flatten the metric names of every product.
        all_metrics: List[str] = []
        for product_metrics in self.METRIC_SQL_MAP.values():
            all_metrics.extend(product_metrics)
        return all_metrics

    def get_available_products(self) -> List[str]:
        """
        Get list of available product IDs.

        Returns:
            List of available product IDs
        """
        return list(self.METRIC_SQL_MAP)

    async def _get_metric_sql(self, product_id: str, metric_name: str) -> str:
        """
        Look up the SQL for a product/metric pair, logging and raising on a miss.

        Args:
            product_id: Product ID to look up in METRIC_SQL_MAP
            metric_name: Metric name to look up under that product

        Returns:
            The SQL query string registered for the metric

        Raises:
            HTTPException: 404 if the product or the metric is not registered
        """
        if product_id not in self.METRIC_SQL_MAP:
            available_products = ", ".join(self.get_available_products())
            error_msg = f"Product '{product_id}' not found in SQL mapping. Available products: {available_products}"
            await self.module_logger.log_error(error_msg)
            raise HTTPException(status_code=404, detail=error_msg)

        if metric_name not in self.METRIC_SQL_MAP[product_id]:
            available_metrics = ", ".join(self.get_available_metrics(product_id))
            error_msg = f"Metric '{metric_name}' not found in product '{product_id}' SQL mapping. Available metrics: {available_metrics}"
            await self.module_logger.log_error(error_msg)
            raise HTTPException(status_code=404, detail=error_msg)

        return self.METRIC_SQL_MAP[product_id][metric_name]

    @staticmethod
    def _coerce_date(value: Union[str, date], field_name: str) -> date:
        """
        Convert a 'YYYY-MM-DD' string, datetime or date into a plain date.

        Args:
            value: The raw date input
            field_name: Parameter name used in the error message

        Returns:
            The value as a datetime.date

        Raises:
            HTTPException: 400 if the value is a malformed string or an unsupported type
        """
        if isinstance(value, str):
            try:
                return datetime.strptime(value, '%Y-%m-%d').date()
            except ValueError as exc:
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid {field_name} format. Expected YYYY-MM-DD"
                ) from exc
        # datetime is a subclass of date and must be checked first; comparing a
        # datetime against a plain date would raise TypeError further down.
        if isinstance(value, datetime):
            return value.date()
        if isinstance(value, date):
            return value
        raise HTTPException(
            status_code=400,
            detail=f"Invalid {field_name} format. Expected YYYY-MM-DD"
        )

    async def query_metric_by_time_range(
        self,
        product_id: str,
        metric_name: str,
        start_date: Union[str, date],
        end_date: Union[str, date]
    ) -> List[Dict[str, Any]]:
        """
        Query metric data for a specific date range.

        The range is half-open: rows with start_date <= date_id < end_date are returned.

        Args:
            product_id: Product ID to identify which product's metrics to query
            metric_name: Name of the metric to query
            start_date: Start date for the query ('YYYY-MM-DD' string or date)
            end_date: End date for the query ('YYYY-MM-DD' string or date)

        Returns:
            List of dictionaries with 'date', 'value', 'metric' and 'labels' keys,
            sorted by date

        Raises:
            HTTPException: 404 if product_id or metric_name is not in the SQL mapping;
                400 if a date is malformed, start is not before end, or the range
                exceeds one year
            Exception: Any error raised by the StarRocks query is logged and re-raised

        Example:
            result = await service.query_metric_by_time_range(
                "freeleaps",
                "daily_registered_users",
                start_date=date.today() - timedelta(days=30),
                end_date=date.today()
            )
            # Returns: [{"date": "2024-01-01", "value": 45, "metric": "...", "labels": {...}}, ...]
        """
        # Resolve the query first so unknown products/metrics fail with 404
        # before any date validation happens.
        sql_query = await self._get_metric_sql(product_id, metric_name)

        start_dt = self._coerce_date(start_date, "start_date")
        end_dt = self._coerce_date(end_date, "end_date")

        # Validate date range
        if start_dt >= end_dt:
            raise HTTPException(
                status_code=400,
                detail="Start date must be before end date"
            )

        # Check date range is not too large (max 1 year)
        if end_dt - start_dt > self.MAX_QUERY_RANGE:
            raise HTTPException(
                status_code=400,
                detail="Date range cannot exceed 1 year"
            )

        try:
            await self.module_logger.log_info(
                f"Querying metric '{metric_name}' from product '{product_id}' from {start_dt} to {end_dt}")

            # NOTE(review): execute_query is called without await; if it performs
            # blocking I/O it will stall the event loop - confirm against the client.
            result = self.starrocks_client.execute_query(
                query=sql_query,
                params=(start_dt, end_dt, product_id)
            )

            formatted_data = self._format_query_result(result, metric_name, product_id)

            await self.module_logger.log_info(
                f"Successfully queried metric '{metric_name}' with {len(formatted_data)} data points")
            return formatted_data

        except Exception as e:
            # Log for observability, then let the caller decide how to respond.
            await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}")
            raise

    def _format_query_result(self, starrocks_result: List[Dict[str, Any]], metric_name: str, product_id: str) -> List[Dict[str, Any]]:
        """
        Format StarRocks query result into the required format.

        Rows without a truthy 'date_id' are skipped. The value is read from the
        'registered_cnt' column; a missing or NULL count becomes 0.

        Args:
            starrocks_result: Raw result rows from the StarRocks query
            metric_name: Name of the metric being queried
            product_id: Product ID used as a fallback label when a row has none

        Returns:
            List of dictionaries with 'date', 'value', 'metric' and 'labels' keys,
            sorted by the 'date' string
        """
        formatted_data = []

        for row in starrocks_result:
            date_value = row.get("date_id")
            if not date_value:
                continue

            raw_value = row.get("registered_cnt")

            formatted_data.append({
                "date": str(date_value),
                "value": 0 if raw_value is None else int(raw_value),
                "metric": metric_name,
                "labels": {
                    "product_id": row.get("product_id", product_id),
                    "metric_type": metric_name,
                },
            })

        # Sort by the date string (lexicographic order).
        formatted_data.sort(key=lambda item: item["date"])

        return formatted_data

    async def get_metric_info(self, product_id: str, metric_name: str) -> Dict[str, Any]:
        """
        Get information about a specific metric including its SQL query.

        Args:
            product_id: Product ID to identify which product's metrics to query
            metric_name: Name of the metric

        Returns:
            Dictionary with 'product_id', 'metric_name', 'sql_query' and 'description'

        Raises:
            HTTPException: 404 if product_id or metric_name is not in the SQL mapping
        """
        sql_query = await self._get_metric_sql(product_id, metric_name)

        return {
            "product_id": product_id,
            "metric_name": metric_name,
            "sql_query": sql_query.strip(),
            # NOTE(review): this description is fixed text for the only metric
            # registered today; make it per-metric when more metrics are added.
            "description": "Daily registered users count from StarRocks table dws_daily_registered_users"
        }