From d48f72544d140676280ed69fa5d3326f4ccb3a80 Mon Sep 17 00:00:00 2001 From: hazelzhu Date: Thu, 23 Oct 2025 12:54:03 +0800 Subject: [PATCH 1/2] add request metric queries --- .../services/starrocks_metrics_service.py | 160 +++++++++++++++++- .../routes/starrocks_metrics/metrics_query.py | 40 ++++- 2 files changed, 194 insertions(+), 6 deletions(-) diff --git a/apps/metrics/backend/services/starrocks_metrics_service.py b/apps/metrics/backend/services/starrocks_metrics_service.py index fbf3080..01b3b71 100644 --- a/apps/metrics/backend/services/starrocks_metrics_service.py +++ b/apps/metrics/backend/services/starrocks_metrics_service.py @@ -41,6 +41,70 @@ class StarRocksMetricsService: AND product_id = %s ORDER BY date ASC """, + "dcr": """ + SELECT + date, + product_id, + value, + updated_date + FROM dws_dcr + WHERE date >= %s + AND date < %s + AND product_id = %s + ORDER BY date ASC + """, + "mrar": """ + SELECT + date, + product_id, + CASE + WHEN monthly_requests = 0 THEN 0.0 + ELSE (monthly_accepted_requests * 1.0) / monthly_requests + END AS value, + updated_date + FROM dws_mrar + WHERE date >= %s + AND date < %s + AND product_id = %s + ORDER BY date ASC + """, + "trar": """ + SELECT + product_id, + CASE + WHEN total_requests = 0 THEN 0.0 + ELSE (total_accepted_requests * 1.0) / total_requests + END AS value, + updated_date + FROM dws_trar + WHERE product_id = %s + """, + "mrqr": """ + SELECT + date, + product_id, + CASE + WHEN monthly_requests = 0 THEN 0.0 + ELSE (monthly_quoted_requests * 1.0) / monthly_requests + END AS value, + updated_date + FROM dws_mrqr + WHERE date >= %s + AND date < %s + AND product_id = %s + ORDER BY date ASC + """, + "trqr": """ + SELECT + product_id, + CASE + WHEN total_requests = 0 THEN 0.0 + ELSE (total_quoted_requests * 1.0) / total_requests + END AS value, + updated_date + FROM dws_trqr + WHERE product_id = %s + """, }, "magicleaps": { @@ -140,6 +204,14 @@ class StarRocksMetricsService: await self.module_logger.log_error(error_msg) raise HTTPException(status_code=404, detail=error_msg) + # Check if metric need time params + # Starting with "t" indicates a query for the total count since the very first record. + # NOTE: This determination logic is subject to future changes. + if metric_name.startswith('t'): + error_msg = f"Metric '{metric_name}' can not be queried by time range." + await self.module_logger.log_error(error_msg) + raise HTTPException(status_code=404, detail=error_msg) + # Parse date strings if they are strings if isinstance(start_date, str): try: @@ -284,7 +356,7 @@ class StarRocksMetricsService: result_dict[date_str] = { "date": date_str, - "value": int(value) if value is not None else 0, + "value": value if value is not None else 0, "metric": metric_name, "labels": labels } @@ -373,4 +445,88 @@ class StarRocksMetricsService: "metric_name": metric_name, "sql_query": self.METRIC_SQL_MAP[product_id][metric_name].strip(), "description": f"{metric_name} count from StarRocks table dws_{metric_name}" - } \ No newline at end of file + } + + async def query_metric_by_product_id(self, product_id: str, metric_name: str) -> Dict[str, Any]: + """ + Query metric not suitable for date range (e.g. data related to calculating total records). + + Args: + product_id: Product ID to identify which product's metrics to query + metric_name: Name of the metric to query + + Returns: + List of dictionaries with 'product_id' key. + + Raises: + ValueError: If product_id or metric_name is not found in the SQL mapping + Exception: If StarRocks query fails + + Example: + result = await service.query_metric_by_time_range( + "freeleaps", + "total_request_quoted_rate", + ) + # Returns: [{"date": "freeleaps", "value": 45, "labels": {...}},] + """ + + # Check if product_id exists in the mapping + if product_id not in self.METRIC_SQL_MAP: + available_products = ", ".join(self.get_available_products()) + error_msg = f"Product '{product_id}' not found in SQL mapping. Available products: {available_products}" + await self.module_logger.log_error(error_msg) + raise HTTPException(status_code=404, detail=error_msg) + + # Starting with "t" indicates a query for the total count since the very first record. + # NOTE: This determination logic is subject to future changes. + if not metric_name.startswith('t'): + error_msg = f"Metric '{metric_name}' should be queried by time range." + await self.module_logger.log_error(error_msg) + raise HTTPException(status_code=404, detail=error_msg) + + # Check if metric name exists in the product's mapping + if metric_name not in self.METRIC_SQL_MAP[product_id]: + available_metrics = ", ".join(self.get_available_metrics(product_id)) + error_msg = f"Metric '{metric_name}' not found in product '{product_id}' SQL mapping. Available metrics: {available_metrics}" + await self.module_logger.log_error(error_msg) + raise HTTPException(status_code=404, detail=error_msg) + + # Get the SQL query for the metric + sql_query = self.METRIC_SQL_MAP[product_id][metric_name] + + try: + await self.module_logger.log_info( + f"Querying metric '{metric_name}' from product '{product_id}'") + + # Execute the query + result = await self.starrocks_client.execute_query( + query=sql_query, + params=(product_id) + ) + + # Parse the result and format it + for row in result: + # Get the value + value = row.get("value", 0) + if value is None: + value = 0 + + # Create labels dictionary + labels = { + "product_id": row.get("product_id", product_id), + "metric_type": metric_name, + } + + result_dict = { + "value": value if value is not None else 0, + "metric": metric_name, + "labels": labels + } + + await self.module_logger.log_info( + f"Successfully queried metric '{metric_name}'") + return result_dict + + except Exception as e: + await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}") + raise \ No newline at end of file diff --git a/apps/metrics/webapi/routes/starrocks_metrics/metrics_query.py b/apps/metrics/webapi/routes/starrocks_metrics/metrics_query.py index a035a05..978a311 100644 --- a/apps/metrics/webapi/routes/starrocks_metrics/metrics_query.py +++ b/apps/metrics/webapi/routes/starrocks_metrics/metrics_query.py @@ -9,7 +9,7 @@ from backend.services.starrocks_metrics_service import StarRocksMetricsService class MetricDataPoint(BaseModel): """Single data point in metric time series.""" - date: str = Field(..., description="Date in YYYY-MM-DD format") + date: Optional[str] = Field(None, description="Date in YYYY-MM-DD format") value: Union[int, float] = Field(..., description="Metric value") labels: Dict[str, Any] = Field(default_factory=dict, description="Metric labels") @@ -22,14 +22,18 @@ class MetricTimeSeriesResponse(BaseModel): time_range: Dict[str, str] = Field(..., description="Start and end date of the query") -class MetricQueryRequest(BaseModel): - """Request model for metric query.""" +class MetricQueryTimeRequest(BaseModel): + """Request model for metric query by time range.""" product_id: str = Field(..., description="Product ID to identify which product's data to query") metric_name: str = Field(..., description="Name of the metric to query") step: str = Field(..., description="Aggregation step, e.g., 1d or 1m") start_date: str = Field(..., description="Start date in YYYY-MM-DD HH:MM:SS format") end_date: str = Field(..., description="End date in YYYY-MM-DD HH:MM:SS format") +class MetricQueryRequest(BaseModel): + """Request model for metric query without time range.""" + product_id: str = Field(..., description="Product ID to identify which product's data to query") + metric_name: str = Field(..., description="Name of the metric to query") router = APIRouter() @@ -40,7 +44,7 @@ module_logger = ModuleLogger(__file__) @router.post("/starrocks/metrics_query", response_model=MetricTimeSeriesResponse) async def metrics_query( - request: MetricQueryRequest + request: MetricQueryTimeRequest ): """ Query StarRocks metrics time series data. @@ -80,3 +84,31 @@ async def metrics_query( await module_logger.log_info( f"Successfully queried metric '{request.metric_name}' with {len(data_points)} data points") return response + +@router.post("/starrocks/metrics_query_without_timerange", response_model=MetricDataPoint) +async def metrics_query_without_timerange( + request: MetricQueryRequest +): + """ + Query StarRocks metrics time series data. + + Returns XY curve data (time series) for the specified metric within the given date range. + """ + await module_logger.log_info( + f"Querying metric '{request.metric_name}' from product '{request.product_id}'") + + # Query the metric data + result_data = await starrocks_service.query_metric_by_product_id( + product_id=request.product_id, + metric_name=request.metric_name, + ) + + # Format response + response = MetricDataPoint( + value=result_data["value"], + labels=result_data["labels"] + ) + + await module_logger.log_info( + f"Successfully queried metric '{request.metric_name}'") + return response \ No newline at end of file From b1ec14a98413fdcdcad6b2753d02292a250b9cf6 Mon Sep 17 00:00:00 2001 From: hazelzhu Date: Thu, 23 Oct 2025 13:48:35 +0800 Subject: [PATCH 2/2] simplify api routes --- .../services/starrocks_metrics_service.py | 48 ++++++------------ .../routes/starrocks_metrics/metrics_query.py | 49 +++---------------- 2 files changed, 23 insertions(+), 74 deletions(-) diff --git a/apps/metrics/backend/services/starrocks_metrics_service.py b/apps/metrics/backend/services/starrocks_metrics_service.py index 01b3b71..29cb07f 100644 --- a/apps/metrics/backend/services/starrocks_metrics_service.py +++ b/apps/metrics/backend/services/starrocks_metrics_service.py @@ -156,9 +156,9 @@ class StarRocksMetricsService: self, product_id: str, metric_name: str, - step: str, - start_date: Union[str, date], - end_date: Union[str, date] + step: Optional[str], + start_date: Optional[Union[str, date]], + end_date: Optional[Union[str, date]] ) -> List[Dict[str, Any]]: """ Query metric data for a specific date range. @@ -207,10 +207,13 @@ class StarRocksMetricsService: # Check if metric need time params # Starting with "t" indicates a query for the total count since the very first record. # NOTE: This determination logic is subject to future changes. - if metric_name.startswith('t'): - error_msg = f"Metric '{metric_name}' can not be queried by time range." - await self.module_logger.log_error(error_msg) - raise HTTPException(status_code=404, detail=error_msg) + if (start_date is None) or (end_date is None) or (step is None): + if metric_name.startswith('t'): + return await self._query_metric_by_product_id(product_id, metric_name) + else: + error_msg = f"Metric '{metric_name}' should be queried by start date, end date and step." + await self.module_logger.log_error(error_msg) + raise HTTPException(status_code=404, detail=error_msg) # Parse date strings if they are strings if isinstance(start_date, str): @@ -447,7 +450,7 @@ class StarRocksMetricsService: "description": f"{metric_name} count from StarRocks table dws_{metric_name}" } - async def query_metric_by_product_id(self, product_id: str, metric_name: str) -> Dict[str, Any]: + async def _query_metric_by_product_id(self, product_id: str, metric_name: str) -> List[Dict[str, Any]]: """ Query metric not suitable for date range (e.g. data related to calculating total records). @@ -459,7 +462,6 @@ class StarRocksMetricsService: List of dictionaries with 'product_id' key. Raises: - ValueError: If product_id or metric_name is not found in the SQL mapping Exception: If StarRocks query fails Example: @@ -469,27 +471,6 @@ class StarRocksMetricsService: ) # Returns: [{"date": "freeleaps", "value": 45, "labels": {...}},] """ - - # Check if product_id exists in the mapping - if product_id not in self.METRIC_SQL_MAP: - available_products = ", ".join(self.get_available_products()) - error_msg = f"Product '{product_id}' not found in SQL mapping. Available products: {available_products}" - await self.module_logger.log_error(error_msg) - raise HTTPException(status_code=404, detail=error_msg) - - # Starting with "t" indicates a query for the total count since the very first record. - # NOTE: This determination logic is subject to future changes. - if not metric_name.startswith('t'): - error_msg = f"Metric '{metric_name}' should be queried by time range." - await self.module_logger.log_error(error_msg) - raise HTTPException(status_code=404, detail=error_msg) - - # Check if metric name exists in the product's mapping - if metric_name not in self.METRIC_SQL_MAP[product_id]: - available_metrics = ", ".join(self.get_available_metrics(product_id)) - error_msg = f"Metric '{metric_name}' not found in product '{product_id}' SQL mapping. Available metrics: {available_metrics}" - await self.module_logger.log_error(error_msg) - raise HTTPException(status_code=404, detail=error_msg) # Get the SQL query for the metric sql_query = self.METRIC_SQL_MAP[product_id][metric_name] @@ -516,12 +497,13 @@ class StarRocksMetricsService: "product_id": row.get("product_id", product_id), "metric_type": metric_name, } - - result_dict = { + result_dict = [] + result_dict.append({ + "date": None, "value": value if value is not None else 0, "metric": metric_name, "labels": labels - } + }) await self.module_logger.log_info( f"Successfully queried metric '{metric_name}'") diff --git a/apps/metrics/webapi/routes/starrocks_metrics/metrics_query.py b/apps/metrics/webapi/routes/starrocks_metrics/metrics_query.py index 978a311..0e2eb66 100644 --- a/apps/metrics/webapi/routes/starrocks_metrics/metrics_query.py +++ b/apps/metrics/webapi/routes/starrocks_metrics/metrics_query.py @@ -19,21 +19,16 @@ class MetricTimeSeriesResponse(BaseModel): metric_name: str = Field(..., description="Name of the queried metric") data_points: List[MetricDataPoint] = Field(..., description="List of data points") total_points: int = Field(..., description="Total number of data points") - time_range: Dict[str, str] = Field(..., description="Start and end date of the query") + time_range: Dict[Optional[str], Optional[str]] = Field([None, None], description="Start and end date of the query") -class MetricQueryTimeRequest(BaseModel): +class MetricQueryRequest(BaseModel): """Request model for metric query by time range.""" product_id: str = Field(..., description="Product ID to identify which product's data to query") metric_name: str = Field(..., description="Name of the metric to query") - step: str = Field(..., description="Aggregation step, e.g., 1d or 1m") - start_date: str = Field(..., description="Start date in YYYY-MM-DD HH:MM:SS format") - end_date: str = Field(..., description="End date in YYYY-MM-DD HH:MM:SS format") - -class MetricQueryRequest(BaseModel): - """Request model for metric query without time range.""" - product_id: str = Field(..., description="Product ID to identify which product's data to query") - metric_name: str = Field(..., description="Name of the metric to query") + step: Optional[str] = Field(None, description="Aggregation step, e.g., 1d or 1m") + start_date: Optional[str] = Field(None, description="Start date in YYYY-MM-DD HH:MM:SS format") + end_date: Optional[str] = Field(None, description="End date in YYYY-MM-DD HH:MM:SS format") router = APIRouter() @@ -44,7 +39,7 @@ module_logger = ModuleLogger(__file__) @router.post("/starrocks/metrics_query", response_model=MetricTimeSeriesResponse) async def metrics_query( - request: MetricQueryTimeRequest + request: MetricQueryRequest ): """ Query StarRocks metrics time series data. @@ -76,39 +71,11 @@ async def metrics_query( ], total_points=len(data_points), time_range={ - "start": request.start_date, - "end": request.end_date + "start": request.start_date if request.start_date else None, + "end": request.end_date if request.end_date else None } ) await module_logger.log_info( f"Successfully queried metric '{request.metric_name}' with {len(data_points)} data points") - return response - -@router.post("/starrocks/metrics_query_without_timerange", response_model=MetricDataPoint) -async def metrics_query_without_timerange( - request: MetricQueryRequest -): - """ - Query StarRocks metrics time series data. - - Returns XY curve data (time series) for the specified metric within the given date range. - """ - await module_logger.log_info( - f"Querying metric '{request.metric_name}' from product '{request.product_id}'") - - # Query the metric data - result_data = await starrocks_service.query_metric_by_product_id( - product_id=request.product_id, - metric_name=request.metric_name, - ) - - # Format response - response = MetricDataPoint( - value=result_data["value"], - labels=result_data["labels"] - ) - - await module_logger.log_info( - f"Successfully queried metric '{request.metric_name}'") return response \ No newline at end of file