from typing import Dict, List, Any, Optional, Union
from datetime import datetime, timedelta, date

from fastapi import HTTPException

from common.log.module_logger import ModuleLogger
from ..infra.external_service.starrocks_client import StarRocksClient


class StarRocksMetricsService:
    """
    Service class for querying StarRocks metrics with predefined SQL queries.

    This service provides a high-level interface for querying metrics data
    using predefined SQL queries mapped to metric names.
    """

    # Format accepted for string start/end dates and used in error messages.
    _INPUT_DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
    # Supported aggregation steps: one day and one month.
    _SUPPORTED_STEPS = ("1d", "1m")
    # Largest time window a single range query may cover.
    _MAX_RANGE = timedelta(days=365)

    # Global dictionary mapping metric names to their corresponding SQL queries.
    # Outer key: product ID. Inner key: metric name.
    # Time-range queries take (start, end, product_id); "total" queries
    # (names starting with "t") take (product_id,) only.
    METRIC_SQL_MAP: Dict[str, Dict[str, str]] = {
        "freeleaps": {
            "dru": """
                SELECT
                    date,
                    product_id,
                    value,
                    updated_date
                FROM dws_dru
                WHERE date >= %s
                  AND date < %s
                  AND product_id = %s
                ORDER BY date ASC
            """,
            "mru": """
                SELECT
                    date,
                    product_id,
                    value,
                    updated_date
                FROM dws_mru
                WHERE date >= %s
                  AND date < %s
                  AND product_id = %s
                ORDER BY date ASC
            """,
            "dcr": """
                SELECT
                    date,
                    product_id,
                    value,
                    updated_date
                FROM dws_dcr
                WHERE date >= %s
                  AND date < %s
                  AND product_id = %s
                ORDER BY date ASC
            """,
            "mrar": """
                SELECT
                    date,
                    product_id,
                    CASE
                        WHEN monthly_requests = 0 THEN 0.0
                        ELSE (monthly_accepted_requests * 1.0) / monthly_requests
                    END AS value,
                    updated_date
                FROM dws_mrar
                WHERE date >= %s
                  AND date < %s
                  AND product_id = %s
                ORDER BY date ASC
            """,
            "trar": """
                SELECT
                    product_id,
                    CASE
                        WHEN total_requests = 0 THEN 0.0
                        ELSE (total_accepted_requests * 1.0) / total_requests
                    END AS value,
                    updated_date
                FROM dws_trar
                WHERE product_id = %s
            """,
            "mrqr": """
                SELECT
                    date,
                    product_id,
                    CASE
                        WHEN monthly_requests = 0 THEN 0.0
                        ELSE (monthly_quoted_requests * 1.0) / monthly_requests
                    END AS value,
                    updated_date
                FROM dws_mrqr
                WHERE date >= %s
                  AND date < %s
                  AND product_id = %s
                ORDER BY date ASC
            """,
            "trqr": """
                SELECT
                    product_id,
                    CASE
                        WHEN total_requests = 0 THEN 0.0
                        ELSE (total_quoted_requests * 1.0) / total_requests
                    END AS value,
                    updated_date
                FROM dws_trqr
                WHERE product_id = %s
            """,
        },
        "magicleaps": {
        }
    }

    def __init__(self, starrocks_endpoint: Optional[str] = None):
        """
        Initialize StarRocksMetricsService.

        Args:
            starrocks_endpoint: StarRocks server endpoint. Accepted for
                interface compatibility.
        """
        # NOTE(review): starrocks_endpoint is currently not forwarded to
        # StarRocksClient; the client is built with its own defaults. Confirm
        # whether the client accepts an endpoint before wiring it through.
        self.module_logger = ModuleLogger(__file__)
        self.starrocks_client = StarRocksClient()

    def get_available_metrics(self, product_id: Optional[str] = None) -> List[str]:
        """
        Get list of available metric names that have predefined SQL queries.

        Args:
            product_id: Optional product ID to filter metrics. If None (or
                empty), returns all metrics from all products.

        Returns:
            List of available metric names. Empty if the product is unknown.
        """
        if product_id:
            return list(self.METRIC_SQL_MAP.get(product_id, {}))

        # Return all metrics from all products
        return [
            metric_name
            for product_metrics in self.METRIC_SQL_MAP.values()
            for metric_name in product_metrics
        ]

    def get_available_products(self) -> List[str]:
        """
        Get list of available product IDs.

        Returns:
            List of available product IDs
        """
        return list(self.METRIC_SQL_MAP)

    async def _require_product(self, product_id: str) -> None:
        """Log and raise a 404 HTTPException if product_id has no SQL mapping."""
        if product_id in self.METRIC_SQL_MAP:
            return
        available_products = ", ".join(self.get_available_products())
        error_msg = f"Product '{product_id}' not found in SQL mapping. Available products: {available_products}"
        await self.module_logger.log_error(error_msg)
        raise HTTPException(status_code=404, detail=error_msg)

    async def _require_metric(self, product_id: str, metric_name: str) -> None:
        """Log and raise a 404 HTTPException if the product lacks metric_name.

        The product itself must already be known to exist in METRIC_SQL_MAP.
        """
        if metric_name in self.METRIC_SQL_MAP[product_id]:
            return
        available_metrics = ", ".join(self.get_available_metrics(product_id))
        error_msg = f"Metric '{metric_name}' not found in product '{product_id}' SQL mapping. Available metrics: {available_metrics}"
        await self.module_logger.log_error(error_msg)
        raise HTTPException(status_code=404, detail=error_msg)

    @classmethod
    def _coerce_to_datetime(cls, value: Union[str, date], field_name: str) -> datetime:
        """Convert a range bound (string, date or datetime) into a datetime.

        Strings must match 'YYYY-MM-DD HH:MM:SS'. Plain dates are promoted to
        midnight so both bounds are always comparable with each other.
        Raises a 400 HTTPException naming field_name on invalid input.
        """
        invalid = HTTPException(
            status_code=400,
            detail=f"Invalid {field_name} format. Expected YYYY-MM-DD HH:MM:SS"
        )
        if isinstance(value, str):
            try:
                return datetime.strptime(value, cls._INPUT_DATETIME_FORMAT)
            except ValueError as exc:
                raise invalid from exc
        # datetime is a subclass of date, so it has to be tested first.
        if isinstance(value, datetime):
            return value
        if isinstance(value, date):
            return datetime(value.year, value.month, value.day)
        raise invalid

    async def query_metric_by_time_range(
        self,
        product_id: str,
        metric_name: str,
        step: str,
        start_date: Union[str, date],
        end_date: Union[str, date]
    ) -> List[Dict[str, Any]]:
        """
        Query metric data for a specific date range.

        This method will fill missing dates in the range with 0 values to
        ensure a complete time series with no gaps.

        Args:
            product_id: Product ID to identify which product's metrics to query
            metric_name: Name of the metric to query
            step: Granularity of the series, '1d' (daily) or '1m' (monthly).
                A falsy value defaults to '1d'.
            start_date: Inclusive start of the range
                ('YYYY-MM-DD HH:MM:SS' string, date or datetime)
            end_date: Exclusive end of the range
                ('YYYY-MM-DD HH:MM:SS' string, date or datetime)

        Returns:
            List of dictionaries with 'date', 'value', 'metric' and 'labels'
            keys. Missing dates in the range will be filled with 0 values.

        Raises:
            HTTPException: 404 if product_id or metric_name is not found in
                the SQL mapping, or if the metric is a "total" metric that
                cannot be queried by time range; 400 if the dates or step are
                invalid, the range is empty/inverted, or it exceeds one year.
            Exception: If the StarRocks query fails (logged and re-raised).

        Example:
            result = await service.query_metric_by_time_range(
                "freeleaps",
                "dru",
                "1d",
                start_date="2024-01-01 00:00:00",
                end_date="2024-01-31 00:00:00"
            )
            # Returns: [{"date": "2024-01-01 00:00:00", "value": 45, "metric": "dru", "labels": {...}},
            #           {"date": "2024-01-02 00:00:00", "value": 0, "metric": "dru", "labels": {...}}, ...]
        """
        await self._require_product(product_id)
        await self._require_metric(product_id, metric_name)

        # Check if metric need time params
        # Starting with "t" indicates a query for the total count since the very first record.
        # NOTE: This determination logic is subject to future changes.
        if metric_name.startswith('t'):
            error_msg = f"Metric '{metric_name}' can not be queried by time range."
            await self.module_logger.log_error(error_msg)
            raise HTTPException(status_code=404, detail=error_msg)

        # Normalize both bounds to datetime so they can be compared safely
        start_dt = self._coerce_to_datetime(start_date, "start_date")
        end_dt = self._coerce_to_datetime(end_date, "end_date")

        # Normalize and validate step (default '1d')
        step = step or '1d'
        if step not in self._SUPPORTED_STEPS:
            raise HTTPException(
                status_code=400,
                detail="Invalid step. Supported values are '1d' and '1m'"
            )

        # Validate date range
        if start_dt >= end_dt:
            raise HTTPException(
                status_code=400,
                detail="Start date must be before end date"
            )

        # Check date range is not too large (max 1 year)
        if end_dt - start_dt > self._MAX_RANGE:
            raise HTTPException(
                status_code=400,
                detail="Date range cannot exceed 1 year"
            )

        # Get the SQL query for the metric
        sql_query = self.METRIC_SQL_MAP[product_id][metric_name]

        try:
            await self.module_logger.log_info(
                f"Querying metric '{metric_name}' from product '{product_id}' from {start_dt} to {end_dt}")

            # Execute the query
            result = await self.starrocks_client.execute_query(
                query=sql_query,
                params=(start_dt, end_dt, product_id)
            )

            # Parse the result and format it
            formatted_data = self._format_query_result(
                starrocks_result=result,
                metric_name=metric_name,
                product_id=product_id,
                step=step,
                start_date=start_dt,
                end_date=end_dt
            )

            await self.module_logger.log_info(
                f"Successfully queried metric '{metric_name}' with {len(formatted_data)} data points")
            return formatted_data

        except Exception as e:
            await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}")
            raise

    @staticmethod
    def _month_start(moment: date) -> datetime:
        """Return midnight on the first day of moment's month."""
        return datetime(moment.year, moment.month, 1)

    @staticmethod
    def _next_month(moment: datetime) -> datetime:
        """Return midnight on the first day of the month after moment."""
        if moment.month == 12:
            return datetime(moment.year + 1, 1, 1)
        return datetime(moment.year, moment.month + 1, 1)

    @classmethod
    def _bucket_key(cls, date_value: Any, step: str) -> str:
        """Map a row's raw date to the 'YYYY-MM-DD 00:00:00' key of its bucket.

        For step '1m' the key is the first day of the month; otherwise it is
        the calendar day. Values that cannot be interpreted as a date are
        returned as str(date_value) unchanged.
        """
        if isinstance(date_value, str):
            moment = None
            for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d'):
                try:
                    moment = datetime.strptime(date_value, fmt)
                    break
                except ValueError:
                    continue
            if moment is None:
                return str(date_value)
        elif hasattr(date_value, 'strftime'):
            # date / datetime (or a compatible object) returned by the driver
            moment = date_value
        else:
            return str(date_value)

        if step == '1m':
            return cls._month_start(moment).strftime('%Y-%m-01 00:00:00')
        return moment.strftime('%Y-%m-%d') + ' 00:00:00'

    def _format_query_result(self, starrocks_result: List[Dict[str, Any]], metric_name: str, product_id: str, step: str, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
        """
        Format StarRocks query result into the required format and fill
        missing dates with 0 values.

        Args:
            starrocks_result: Raw result from StarRocks query
            metric_name: Name of the metric being queried
            product_id: Product ID for the metric
            step: Granularity, '1d' (one point per day) or '1m' (one point
                per month). Any other value yields an empty list.
            start_date: Start date of the query range (inclusive)
            end_date: End date of the query range (exclusive)

        Returns:
            List of dictionaries with 'date', 'value', 'metric' and 'labels'
            keys, one per day/month in the range, with missing points filled
            with 0
        """
        # First, index the query results by bucket for quick lookup.
        # If several rows fall in the same bucket, the last one wins.
        points_by_bucket: Dict[str, Dict[str, Any]] = {}
        for row in starrocks_result:
            date_value = row.get("date")
            if not date_value:
                continue

            bucket = self._bucket_key(date_value, step)

            # A NULL value from the database is reported as 0
            value = row.get("value", 0)
            if value is None:
                value = 0

            points_by_bucket[bucket] = {
                "date": bucket,
                "value": value,
                "metric": metric_name,
                "labels": {
                    "product_id": row.get("product_id", product_id),
                    "metric_type": metric_name
                }
            }

        # Pick the range boundaries and stepping function for the granularity
        if step == '1d':
            current_dt = datetime(start_date.year, start_date.month, start_date.day)
            end_exclusive = datetime(end_date.year, end_date.month, end_date.day)

            def advance(moment: datetime) -> datetime:
                return moment + timedelta(days=1)
        elif step == '1m':
            current_dt = self._month_start(start_date)
            end_exclusive = self._month_start(end_date)
            advance = self._next_month
        else:
            return []

        # Generate complete range and fill missing points with 0
        formatted_data = []
        while current_dt < end_exclusive:
            bucket = current_dt.strftime('%Y-%m-%d') + ' 00:00:00'
            point = points_by_bucket.get(bucket)
            if point is None:
                point = {
                    "date": bucket,
                    "value": 0,
                    "metric": metric_name,
                    "labels": {
                        "product_id": product_id,
                        "metric_type": metric_name
                    }
                }
            formatted_data.append(point)
            current_dt = advance(current_dt)

        return formatted_data

    async def get_metric_info(self, product_id: str, metric_name: str) -> Dict[str, Any]:
        """
        Get information about a specific metric including its SQL query.

        Args:
            product_id: Product ID to identify which product's metrics to query
            metric_name: Name of the metric

        Returns:
            Dictionary with 'product_id', 'metric_name', 'sql_query' and
            'description' keys

        Raises:
            HTTPException: 404 if product_id or metric_name is not found in
                the SQL mapping
        """
        await self._require_product(product_id)
        await self._require_metric(product_id, metric_name)

        return {
            "product_id": product_id,
            "metric_name": metric_name,
            "sql_query": self.METRIC_SQL_MAP[product_id][metric_name].strip(),
            "description": f"{metric_name} count from StarRocks table dws_{metric_name}"
        }

    async def query_metric_by_product_id(self, product_id: str, metric_name: str) -> Dict[str, Any]:
        """
        Query metric not suitable for date range (e.g. data related to
        calculating total records).

        Args:
            product_id: Product ID to identify which product's metrics to query
            metric_name: Name of the metric to query (must start with "t")

        Returns:
            Dictionary with 'value', 'metric' and 'labels' keys. If the query
            returns no rows, the value is 0. If it returns several rows, the
            last one is used.

        Raises:
            HTTPException: 404 if product_id or metric_name is not found in
                the SQL mapping, or if the metric must be queried by time
                range instead.
            Exception: If the StarRocks query fails (logged and re-raised).

        Example:
            result = await service.query_metric_by_product_id(
                "freeleaps",
                "trqr",
            )
            # Returns: {"value": 0.45, "metric": "trqr", "labels": {...}}
        """
        await self._require_product(product_id)

        # Starting with "t" indicates a query for the total count since the very first record.
        # NOTE: This determination logic is subject to future changes.
        if not metric_name.startswith('t'):
            error_msg = f"Metric '{metric_name}' should be queried by time range."
            await self.module_logger.log_error(error_msg)
            raise HTTPException(status_code=404, detail=error_msg)

        await self._require_metric(product_id, metric_name)

        # Get the SQL query for the metric
        sql_query = self.METRIC_SQL_MAP[product_id][metric_name]

        try:
            await self.module_logger.log_info(
                f"Querying metric '{metric_name}' from product '{product_id}'")

            # Execute the query. The trailing comma matters: (product_id)
            # would be a plain string rather than a one-element tuple.
            result = await self.starrocks_client.execute_query(
                query=sql_query,
                params=(product_id,)
            )

            # Default used when the table holds no row for this product
            result_dict = {
                "value": 0,
                "metric": metric_name,
                "labels": {
                    "product_id": product_id,
                    "metric_type": metric_name,
                }
            }

            # Parse the result and format it
            for row in result:
                value = row.get("value", 0)
                if value is None:
                    value = 0

                result_dict = {
                    "value": value,
                    "metric": metric_name,
                    "labels": {
                        "product_id": row.get("product_id", product_id),
                        "metric_type": metric_name,
                    }
                }

            await self.module_logger.log_info(
                f"Successfully queried metric '{metric_name}'")
            return result_dict

        except Exception as e:
            await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}")
            raise