from typing import Dict, List, Any, Optional, Union
from datetime import datetime, timedelta, timezone

from fastapi import HTTPException

from common.log.module_logger import ModuleLogger

from ..infra.external_service.prometheus_client import PrometheusClient


class PrometheusMetricsService:
    """
    Service class for querying Prometheus metrics with predefined PromQL queries.

    This service provides a high-level interface for querying metrics data
    using predefined PromQL queries mapped to metric names.
    """

    # Maps product ID -> (metric name -> PromQL query).
    # NOTE: the annotation was previously Dict[str, str], which did not match
    # the nested literal below; corrected to Dict[str, Dict[str, str]].
    METRIC_PROMQL_MAP: Dict[str, Dict[str, str]] = {
        "freeleaps": {
            # Just demo, No Usage
            "cpu_usage": "100 - (avg by (instance) (irate(node_cpu_seconds_total{mode=\"idle\"}[5m])) * 100)",
            # Just demo, No Usage
            "memory_usage": "100 - ((node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100)",
            # Just demo, No Usage
            "disk_usage": "100 - ((node_filesystem_avail_bytes{mountpoint=\"/\"} / node_filesystem_size_bytes{mountpoint=\"/\"}) * 100)",
            # Average response time for notification HTTP requests
            "latency_ms": "(sum(rate(freeleaps_notification_http_request_duration_seconds_sum{handler!~'/api/_/.*'}[30m])) / sum(rate(freeleaps_notification_http_request_duration_seconds_count{handler!~'/api/_/.*'}[30m]))) * 1000",
            # Error rate for 5xx HTTP status codes (stability metric)
            "reliability": "(1 - sum(rate(freeleaps_notification_http_requests_total{status=~'5..'}[30m])) / sum(rate(freeleaps_notification_http_requests_total[30m]))) * 1 or vector(1)",
        },
        "magicleaps": {
        }
    }

    def __init__(self, prometheus_endpoint: Optional[str] = None):
        """
        Initialize PrometheusMetricsService.

        Args:
            prometheus_endpoint: Prometheus server endpoint. If None, uses
                default from settings.
        """
        self.module_logger = ModuleLogger(__file__)
        self.prometheus_client = PrometheusClient(prometheus_endpoint)

    def get_available_metrics(self, product_id: Optional[str] = None) -> List[str]:
        """
        Get list of available metric names that have predefined PromQL queries.

        Args:
            product_id: Optional product ID to filter metrics. If None, returns
                all metrics from all products.

        Returns:
            List of available metric names (empty if product_id is unknown).
        """
        if product_id:
            if product_id in self.METRIC_PROMQL_MAP:
                return list(self.METRIC_PROMQL_MAP[product_id].keys())
            else:
                return []
        else:
            # Return all metrics from all products
            all_metrics = []
            for product_metrics in self.METRIC_PROMQL_MAP.values():
                all_metrics.extend(product_metrics.keys())
            return all_metrics

    def get_available_products(self) -> List[str]:
        """
        Get list of available product IDs.

        Returns:
            List of available product IDs
        """
        return list(self.METRIC_PROMQL_MAP.keys())

    def _parse_utc_datetime(self, time_input: Union[str, datetime]) -> datetime:
        """
        Parse time input and ensure it's a UTC timezone-aware datetime.
        Assumes all input times are already in UTC.

        Args:
            time_input: Time as string (RFC3339) or datetime object, assumed
                to be UTC.

        Returns:
            UTC timezone-aware datetime object
        """
        if isinstance(time_input, str):
            # Handle RFC3339 format, assume UTC
            if time_input.endswith('Z'):
                return datetime.fromisoformat(time_input.replace('Z', '+00:00'))
            elif '+' in time_input or time_input.count('-') > 2:
                # A '+' or a third '-' means an explicit UTC offset is present
                # (two '-' are already used by the date part YYYY-MM-DD).
                return datetime.fromisoformat(time_input)
            else:
                # Assume UTC if no timezone specified
                return datetime.fromisoformat(time_input + '+00:00')
        else:
            # Assume datetime is already UTC, just ensure it's timezone-aware
            if time_input.tzinfo is None:
                return time_input.replace(tzinfo=timezone.utc)
            else:
                # Already timezone-aware, assume it's UTC
                return time_input

    async def _ensure_metric_defined(self, product_id: str, metric_name: str) -> None:
        """
        Validate that product_id and metric_name exist in METRIC_PROMQL_MAP.

        Shared by query_metric_by_time_range and get_metric_info, which
        previously duplicated this validation verbatim.

        Args:
            product_id: Product ID to validate
            metric_name: Metric name to validate within the product

        Raises:
            HTTPException: 404 if the product or metric is not in the mapping
                (the error detail lists the valid alternatives).
        """
        if product_id not in self.METRIC_PROMQL_MAP:
            available_products = ", ".join(self.get_available_products())
            error_msg = f"Product '{product_id}' not found in PromQL mapping. Available products: {available_products}"
            await self.module_logger.log_error(error_msg)
            raise HTTPException(status_code=404, detail=error_msg)

        if metric_name not in self.METRIC_PROMQL_MAP[product_id]:
            available_metrics = ", ".join(self.get_available_metrics(product_id))
            error_msg = f"Metric '{metric_name}' not found in product '{product_id}' PromQL mapping. Available metrics: {available_metrics}"
            await self.module_logger.log_error(error_msg)
            raise HTTPException(status_code=404, detail=error_msg)

    async def query_metric_by_time_range(
        self,
        product_id: str,
        metric_name: str,
        start_time: Union[str, datetime],
        end_time: Union[str, datetime],
        step: str = "1m"
    ) -> List[Dict[str, Any]]:
        """
        Query metric data for a specific time range.

        Args:
            product_id: Product ID to identify which product's metrics to query
            metric_name: Name of the metric to query
            start_time: Start time for the query (RFC3339 string or datetime,
                assumed to be UTC)
            end_time: End time for the query (RFC3339 string or datetime,
                assumed to be UTC)
            step: Query resolution step width (e.g., "1m", "5m", "1h")

        Returns:
            List of dictionaries with 'date' and 'value' keys

        Raises:
            HTTPException: 404 if product_id or metric_name is not found in
                the PromQL mapping; 400 if the time range is invalid or
                exceeds 7 days.
            Exception: If the Prometheus query fails (re-raised after logging).

        Example:
            result = await service.query_metric_by_time_range(
                "freeleaps",
                "cpu_usage",
                start_time=datetime.now(timezone.utc) - timedelta(hours=1),
                end_time=datetime.now(timezone.utc),
                step="5m"
            )
            # Returns: [{"date": "2024-01-01T10:00:00Z", "value": 45.2}, ...]
        """
        await self._ensure_metric_defined(product_id, metric_name)

        # Parse time inputs (assume all inputs are UTC)
        start_dt = self._parse_utc_datetime(start_time)
        end_dt = self._parse_utc_datetime(end_time)

        # Validate time range
        if start_dt >= end_dt:
            raise HTTPException(
                status_code=400,
                detail="Start time must be before end time"
            )

        # Check time range is not too large (max 7 days for detailed queries)
        time_diff = end_dt - start_dt
        if time_diff > timedelta(days=7):
            raise HTTPException(
                status_code=400,
                detail="Time range cannot exceed 7 days for detailed queries"
            )

        # Get the PromQL query for the metric
        promql_query = self.METRIC_PROMQL_MAP[product_id][metric_name]

        try:
            await self.module_logger.log_info(
                f"Querying metric '{metric_name}' from product '{product_id}' with PromQL: {promql_query}")

            # Execute the range query
            result = await self.prometheus_client.query_range(
                query=promql_query,
                start=start_dt,
                end=end_dt,
                step=step
            )

            # Parse the result and format it
            formatted_data = self._format_query_result(result, metric_name, start_dt, end_dt, step)

            await self.module_logger.log_info(
                f"Successfully queried metric '{metric_name}' with {len(formatted_data)} data points")

            return formatted_data

        except Exception as e:
            await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}")
            raise

    def _format_query_result(self, prometheus_result: Dict[str, Any], metric_name: str,
                             start_dt: Optional[datetime] = None,
                             end_dt: Optional[datetime] = None,
                             step: str = "1h") -> List[Dict[str, Any]]:
        """
        Format a Prometheus query result into the required format and fill
        missing time points with None values.

        Args:
            prometheus_result: Raw result from Prometheus API
            metric_name: Name of the metric being queried
            start_dt: Start time for filling missing data points
            end_dt: End time for filling missing data points
            step: Step size for filling missing data points

        Returns:
            List of dictionaries with 'date' and 'value' keys; when start/end
            and step are given, missing time points are filled with None
            values.
        """
        formatted_data = []

        # Extract data from Prometheus result
        data = prometheus_result.get("data", {})
        result_type = data.get("resultType", "")

        # Check if there are any warnings from Prometheus
        warnings = prometheus_result.get("warnings", [])
        if warnings:
            # NOTE(review): every other logger call in this class is awaited,
            # but this method is sync so log_warning cannot be awaited here.
            # If ModuleLogger.log_warning is a coroutine this call silently
            # does nothing — confirm against ModuleLogger's API.
            self.module_logger.log_warning(f"Prometheus warnings: {warnings}")

        # First, collect all data points from Prometheus, keyed by ISO date
        result_dict = {}

        if result_type == "matrix":
            # Handle range query results (matrix)
            for series in data.get("result", []):
                metric_labels = series.get("metric", {})
                values = series.get("values", [])

                for timestamp, value in values:
                    # Convert Unix timestamp to UTC ISO format
                    date_str = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()

                    # Handle NaN values properly
                    if value == "NaN" or value is None:
                        formatted_value = None
                    else:
                        try:
                            formatted_value = float(value)
                        except (ValueError, TypeError):
                            formatted_value = None

                    result_dict[date_str] = {
                        "date": date_str,
                        "value": formatted_value,
                        "metric": metric_name,
                        "labels": metric_labels
                    }

        elif result_type == "vector":
            # Handle instant query results (vector)
            for series in data.get("result", []):
                metric_labels = series.get("metric", {})
                timestamp = series.get("value", [None, None])[0]
                value = series.get("value", [None, None])[1]

                # Explicit None checks: a truthiness test would wrongly drop a
                # legitimate epoch-0 timestamp.
                if timestamp is not None and value is not None:
                    date_str = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()

                    # Handle NaN values properly
                    if value == "NaN" or value is None:
                        formatted_value = None
                    else:
                        try:
                            formatted_value = float(value)
                        except (ValueError, TypeError):
                            formatted_value = None

                    result_dict[date_str] = {
                        "date": date_str,
                        "value": formatted_value,
                        "metric": metric_name,
                        "labels": metric_labels
                    }

        # If we have start and end times, fill missing time points
        if start_dt and end_dt and step:
            formatted_data = self._fill_missing_time_points(result_dict, start_dt, end_dt, step, metric_name)
        else:
            # Just return the data we have, sorted by date
            formatted_data = list(result_dict.values())
            formatted_data.sort(key=lambda x: x["date"])

        return formatted_data

    def _fill_missing_time_points(self, result_dict: Dict[str, Dict[str, Any]],
                                  start_dt: datetime, end_dt: datetime,
                                  step: str, metric_name: str) -> List[Dict[str, Any]]:
        """
        Fill missing time points in the time series data with None values.

        Args:
            result_dict: Dictionary of existing data points keyed by ISO date
            start_dt: Start time for the range
            end_dt: End time for the range
            step: Step size (e.g., "1h", "1m", "5m")
            metric_name: Name of the metric

        Returns:
            List of data points covering [start_dt, end_dt] at the given step;
            time points absent from result_dict get a None value.
        """
        formatted_data = []

        # Parse step to determine time increment
        if step.endswith('h'):
            hours = int(step[:-1])
            time_increment = timedelta(hours=hours)
        elif step.endswith('m'):
            minutes = int(step[:-1])
            time_increment = timedelta(minutes=minutes)
        elif step.endswith('d'):
            days = int(step[:-1])
            time_increment = timedelta(days=days)
        elif step.endswith('s'):
            seconds = int(step[:-1])
            time_increment = timedelta(seconds=seconds)
        else:
            # Default to 1 hour if step format is not recognized
            time_increment = timedelta(hours=1)

        # Generate complete time range; matching relies on Prometheus
        # returning timestamps aligned to start_dt + k*step.
        current_dt = start_dt
        while current_dt <= end_dt:
            date_str = current_dt.isoformat()

            if date_str in result_dict:
                # Use existing data point, normalizing the offset suffix to 'Z'
                result_dict[date_str]["date"] = result_dict[date_str]["date"].replace('+00:00', 'Z')
                formatted_data.append(result_dict[date_str])
            else:
                # Fill missing data point with a None value
                formatted_data.append({
                    "date": date_str.replace('+00:00', 'Z'),
                    "value": None,
                    "metric": metric_name,
                    "labels": {}
                })

            current_dt += time_increment

        return formatted_data

    async def get_metric_info(self, product_id: str, metric_name: str) -> Dict[str, Any]:
        """
        Get information about a specific metric including its PromQL query.

        Args:
            product_id: Product ID to identify which product's metrics to query
            metric_name: Name of the metric

        Returns:
            Dictionary containing metric information

        Raises:
            HTTPException: 404 if product_id or metric_name is not found in
                the PromQL mapping.
        """
        await self._ensure_metric_defined(product_id, metric_name)

        return {
            "product_id": product_id,
            "metric_name": metric_name,
            "promql_query": self.METRIC_PROMQL_MAP[product_id][metric_name],
            "description": f"PromQL query for {metric_name} metric in product {product_id}"
        }