Merge branch 'dev' into Nicolas_service_hub

commit 69b75932f7

CHANGELOG.md (13 additions)
@@ -1,3 +1,16 @@
+# [1.10.0](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/compare/v1.9.0...v1.10.0) (2025-09-24)
+
+
+### Features
+
+
+* **cache:** add the cache volume ([e9e8630](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/e9e863053d7b568ae466b5cf3d309585d068dfb0))
+* **cache:** use new defined database ([dc1ebf2](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/dc1ebf2481398313cb9bd18457e3fd44563d8472))
+* **config:** add configs regarding the tenant middleware ([1ba9a61](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/1ba9a614a0925627189a3c79e5f92144e973760a))
+* **guide:** guide to use tenant middleware ([065c082](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/065c082aa7dfbd07b46b95c5c3da7c8ba174ee90))
+* **log:** delete some logs and unused methond, raise the http error ([794536c](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/794536c3727d9f5c00bc5cea8890d22f025459f4))
+* **name:** use new name ([e726d7e](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/e726d7e7d5fe41a11529591ec7ec5f55ac3c1647))
+* **tenant-middleware:** add tenant middleware to switch tenant's database ([a2fc3c8](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/commit/a2fc3c8a7ea39f90c4849cea6e817fde2c1fa26d))
+
 # [1.9.0](https://gitea.freeleaps.mathmast.com/freeleaps/freeleaps-service-hub/compare/v1.8.0...v1.9.0) (2025-09-22)
@@ -1,6 +1,6 @@
 import httpx
 from typing import Dict, Any, Optional, Union
-from datetime import datetime
+from datetime import datetime, timezone
 import json
 from fastapi import HTTPException

@@ -79,8 +79,8 @@ class PrometheusClient:

         Args:
             query: PromQL query string
-            start: Start time (RFC3339 string or datetime)
+            start: Start time (RFC3339 string or datetime, assumed to be UTC)
-            end: End time (RFC3339 string or datetime)
+            end: End time (RFC3339 string or datetime, assumed to be UTC)
             step: Query resolution step width (e.g., "15s", "1m", "1h")

         Returns:
@@ -89,8 +89,8 @@ class PrometheusClient:
         Example:
             result = await client.query_range(
                 "up{job='prometheus'}",
-                start=datetime.now() - timedelta(hours=1),
+                start=datetime.now(timezone.utc) - timedelta(hours=1),
-                end=datetime.now(),
+                end=datetime.now(timezone.utc),
                 step="1m"
             )
         """
@@ -99,21 +99,27 @@ class PrometheusClient:
             "step": step
         }

-        # Convert datetime to RFC3339 string if needed
+        # Convert datetime to RFC3339 string (assume input is UTC)
         if isinstance(start, datetime):
+            # Assume datetime is already UTC, just ensure it's timezone-aware
             if start.tzinfo is None:
-                params["start"] = start.isoformat() + "Z"
+                start_utc = start.replace(tzinfo=timezone.utc)
             else:
-                params["start"] = start.isoformat()
+                start_utc = start  # Assume it's already UTC
+            params["start"] = start_utc.isoformat()
         else:
+            # Assume string is already in UTC format
             params["start"] = start

         if isinstance(end, datetime):
+            # Assume datetime is already UTC, just ensure it's timezone-aware
             if end.tzinfo is None:
-                params["end"] = end.isoformat() + "Z"
+                end_utc = end.replace(tzinfo=timezone.utc)
             else:
-                params["end"] = end.isoformat()
+                end_utc = end  # Assume it's already UTC
+            params["end"] = end_utc.isoformat()
         else:
+            # Assume string is already in UTC format
            params["end"] = end

         return await self.request("query_range", params)
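Side note on the conversion above: instead of appending a literal "Z", the new code tags naive datetimes with `timezone.utc` and lets `isoformat()` emit the offset, so naive and aware inputs converge on the same RFC3339 shape. A minimal stand-alone sketch of that pattern (the helper name `to_rfc3339_utc` is illustrative, not part of this codebase):

```python
from datetime import datetime, timezone

def to_rfc3339_utc(t):
    """Mirror query_range's normalization: tag naive datetimes as UTC
    (no clock shift), trust aware ones, pass strings through."""
    if isinstance(t, datetime):
        if t.tzinfo is None:
            t = t.replace(tzinfo=timezone.utc)
        return t.isoformat()
    return t  # assumed to already be an RFC3339 UTC string

print(to_rfc3339_utc(datetime(2025, 9, 24, 12, 0)))
# -> 2025-09-24T12:00:00+00:00
print(to_rfc3339_utc("2025-09-24T12:00:00Z"))
# -> 2025-09-24T12:00:00Z (unchanged)
```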
@@ -1,59 +1,27 @@
-import pymysql
+import aiomysql
 from typing import List, Dict, Any, Optional
 from datetime import date
 from common.log.module_logger import ModuleLogger
-from common.config.app_settings import app_settings
+from webapi.providers.database import db_pool


 class StarRocksClient:
-    """StarRocks database client for querying user registration data"""
+    """StarRocks database client for querying user registration data using connection pool"""

     def __init__(self):
-        self.host = app_settings.STARROCKS_HOST
-        self.port = app_settings.STARROCKS_PORT
-        self.user = app_settings.STARROCKS_USER
-        self.password = app_settings.STARROCKS_PASSWORD
-        self.database = app_settings.STARROCKS_DATABASE
-        self.connection = None
         self.module_logger = ModuleLogger(__file__)

-    async def connect(self) -> bool:
-        """Establish connection to StarRocks database"""
-        try:
-            self.connection = pymysql.connect(
-                host=self.host,
-                port=self.port,
-                user=self.user,
-                password=self.password,
-                database=self.database,
-                charset='utf8mb4',
-                autocommit=True
-            )
-            await self.module_logger.log_info(f"Successfully connected to StarRocks at {self.host}:{self.port}")
-            return True
-        except Exception as e:
-            await self.module_logger.log_error(f"Failed to connect to StarRocks: {e}")
-            return False
-
-    async def disconnect(self):
-        """Close database connection"""
-        if self.connection:
-            self.connection.close()
-            self.connection = None
-            await self.module_logger.log_info("Disconnected from StarRocks")
-
     async def execute_query(self, query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
-        """Execute SQL query and return results"""
+        """Execute SQL query and return results using connection pool"""
-        if not self.connection:
-            if not await self.connect():
-                raise Exception("Failed to connect to StarRocks database")
-
         try:
-            with self.connection.cursor(pymysql.cursors.DictCursor) as cursor:
-                cursor.execute(query, params)
-                results = cursor.fetchall()
-                await self.module_logger.log_info(f"Query executed successfully, returned {len(results)} rows")
-                return results
+            # Get connection from pool
+            pool = db_pool.get_pool()
+            async with pool.acquire() as conn:
+                async with conn.cursor(aiomysql.DictCursor) as cursor:
+                    await cursor.execute(query, params)
+                    results = await cursor.fetchall()
+                    await self.module_logger.log_info(f"Query executed successfully, returned {len(results)} rows")
+                    return results
         except Exception as e:
             await self.module_logger.log_error(f"Query execution failed: {e}")
             raise e
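The rewritten client no longer owns a connection; it borrows one from the shared pool per call (the pool itself is created by the new `database.py` provider further down). A hedged usage sketch, with an illustrative query and table name that are not taken from this diff:

```python
import asyncio

async def demo():
    # Assumes db_pool.create_pool() already ran (the FastAPI startup hook does this).
    client = StarRocksClient()
    rows = await client.execute_query(
        "SELECT registration_date, COUNT(*) AS n "
        "FROM user_registrations WHERE registration_date >= %s "
        "GROUP BY registration_date",  # illustrative schema, not from the diff
        ("2025-09-01",),
    )
    for row in rows:  # aiomysql.DictCursor yields one dict per row
        print(row["registration_date"], row["n"])

asyncio.run(demo())
```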
@@ -1,5 +1,5 @@
 from typing import Dict, List, Any, Optional, Union
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from fastapi import HTTPException

 from common.log.module_logger import ModuleLogger
@@ -24,10 +24,9 @@ class PrometheusMetricsService:
             # Just demo, No Usage
             "disk_usage": "100 - ((node_filesystem_avail_bytes{mountpoint=\"/\"} / node_filesystem_size_bytes{mountpoint=\"/\"}) * 100)",
             # Average response time for notification HTTP requests
-            "latency_ms": "1000*avg(freeleaps_notification_http_request_duration_seconds_sum{handler!=\"none\"} / freeleaps_notification_http_request_duration_seconds_count)",
+            "latency_ms": "(sum(rate(freeleaps_notification_http_request_duration_seconds_sum{handler!~'/api/_/.*'}[30m])) / sum(rate(freeleaps_notification_http_request_duration_seconds_count{handler!~'/api/_/.*'}[30m]))) * 1000",
             # Error rate for 5xx HTTP status codes (stability metric)
-            "reliability": "1-sum(rate(freeleaps_notification_http_requests_total{status=\"5xx\"}[1m]))",
-
+            "reliability": "(1 - sum(rate(freeleaps_notification_http_requests_total{status=~'5..'}[30m])) / sum(rate(freeleaps_notification_http_requests_total[30m]))) * 1 or vector(1)",
         },
         "magicleaps": {
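Two substantive PromQL changes here: `latency_ms` switches from a lifetime average over cumulative `_sum`/`_count` counters to a ratio of windowed `rate()`s, and `reliability` matches status codes with the regex `5..` (the literal label value `"5xx"` would only match if the exporter emitted that exact string) and falls back to `vector(1)` when there is no traffic. A small worked example with made-up counter samples shows why the rate ratio tracks recent traffic:

```python
# Two scrapes of the _sum/_count counters, 30 minutes apart (made-up numbers).
sum_t0, count_t0 = 120.0, 4000   # cumulative seconds of latency, cumulative requests
sum_t1, count_t1 = 156.0, 4900
window = 30 * 60                 # seconds, matching the [30m] range selector

rate_sum = (sum_t1 - sum_t0) / window        # seconds of latency accrued per second
rate_count = (count_t1 - count_t0) / window  # requests per second

# The window cancels in the ratio: this is the mean latency of requests
# *within the window*, not the lifetime mean that avg(sum/count) gives.
latency_ms = rate_sum / rate_count * 1000
print(f"{latency_ms:.1f} ms")    # -> 40.0 ms
```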
@@ -75,6 +74,34 @@ class PrometheusMetricsService:
         """
         return list(self.METRIC_PROMQL_MAP.keys())

+    def _parse_utc_datetime(self, time_input: Union[str, datetime]) -> datetime:
+        """
+        Parse time input and ensure it's a UTC timezone-aware datetime.
+        Assumes all input times are already in UTC.
+
+        Args:
+            time_input: Time as string (RFC3339) or datetime object, assumed to be UTC
+
+        Returns:
+            UTC timezone-aware datetime object
+        """
+        if isinstance(time_input, str):
+            # Handle RFC3339 format, assume UTC
+            if time_input.endswith('Z'):
+                return datetime.fromisoformat(time_input.replace('Z', '+00:00'))
+            elif '+' in time_input or time_input.count('-') > 2:
+                return datetime.fromisoformat(time_input)
+            else:
+                # Assume UTC if no timezone specified
+                return datetime.fromisoformat(time_input + '+00:00')
+        else:
+            # Assume datetime is already UTC, just ensure it's timezone-aware
+            if time_input.tzinfo is None:
+                return time_input.replace(tzinfo=timezone.utc)
+            else:
+                # Already timezone-aware, assume it's UTC
+                return time_input
+
     async def query_metric_by_time_range(
         self,
         product_id: str,
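The helper accepts three string shapes plus naive and aware datetimes; the `count('-') > 2` test is a heuristic for a numeric offset like `-07:00` (a bare date-time has only two dashes). A sketch of its behavior, assuming the service can be instantiated with no arguments (construction is not shown in this diff):

```python
from datetime import datetime, timezone

svc = PrometheusMetricsService()  # assumption: default construction works

svc._parse_utc_datetime("2025-09-24T10:00:00Z")        # 'Z' rewritten to '+00:00'
svc._parse_utc_datetime("2025-09-24T10:00:00+00:00")   # explicit offset parsed as-is
svc._parse_utc_datetime("2025-09-24T10:00:00")         # no offset: '+00:00' appended
svc._parse_utc_datetime(datetime(2025, 9, 24, 10, 0))  # naive: tagged UTC, not shifted
# All four return datetime(2025, 9, 24, 10, 0, tzinfo=timezone.utc)
```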
@@ -89,8 +116,8 @@ class PrometheusMetricsService:
         Args:
             product_id: Product ID to identify which product's metrics to query
             metric_name: Name of the metric to query
-            start_time: Start time for the query (RFC3339 string or datetime)
+            start_time: Start time for the query (RFC3339 string or datetime, assumed to be UTC)
-            end_time: End time for the query (RFC3339 string or datetime)
+            end_time: End time for the query (RFC3339 string or datetime, assumed to be UTC)
             step: Query resolution step width (e.g., "1m", "5m", "1h")

         Returns:
@@ -104,8 +131,8 @@ class PrometheusMetricsService:
             result = await service.query_metric_by_time_range(
                 "freeleaps",
                 "cpu_usage",
-                start_time=datetime.now() - timedelta(hours=1),
+                start_time=datetime.now(timezone.utc) - timedelta(hours=1),
-                end_time=datetime.now(),
+                end_time=datetime.now(timezone.utc),
                 step="5m"
             )
             # Returns: [{"date": "2024-01-01T10:00:00Z", "value": 45.2}, ...]
@@ -124,16 +151,9 @@ class PrometheusMetricsService:
             await self.module_logger.log_error(error_msg)
             raise HTTPException(status_code=404, detail=error_msg)

-        # Parse datetime strings if they are strings
-        if isinstance(start_time, str):
-            start_dt = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
-        else:
-            start_dt = start_time
-
-        if isinstance(end_time, str):
-            end_dt = datetime.fromisoformat(end_time.replace('Z', '+00:00'))
-        else:
-            end_dt = end_time
+        # Parse time inputs (assume all inputs are UTC)
+        start_dt = self._parse_utc_datetime(start_time)
+        end_dt = self._parse_utc_datetime(end_time)

         # Validate time range
         if start_dt >= end_dt:
@@ -166,7 +186,7 @@ class PrometheusMetricsService:
             )

             # Parse the result and format it
-            formatted_data = self._format_query_result(result, metric_name)
+            formatted_data = self._format_query_result(result, metric_name, start_dt, end_dt, step)

             await self.module_logger.log_info(
                 f"Successfully queried metric '{metric_name}' with {len(formatted_data)} data points")
@@ -176,16 +196,20 @@ class PrometheusMetricsService:
             await self.module_logger.log_error(f"Failed to query metric '{metric_name}': {e}")
             raise

-    def _format_query_result(self, prometheus_result: Dict[str, Any], metric_name: str) -> List[Dict[str, Any]]:
+    def _format_query_result(self, prometheus_result: Dict[str, Any], metric_name: str, start_dt: datetime = None,
+                             end_dt: datetime = None, step: str = "1h") -> List[Dict[str, Any]]:
         """
-        Format Prometheus query result into the required format.
+        Format Prometheus query result into the required format and fill missing time points with 0 values.

         Args:
             prometheus_result: Raw result from Prometheus API
             metric_name: Name of the metric being queried
+            start_dt: Start time for filling missing data points
+            end_dt: End time for filling missing data points
+            step: Step size for filling missing data points

         Returns:
-            List of dictionaries with 'date' and 'value' keys
+            List of dictionaries with 'date' and 'value' keys, with missing time points filled with 0 values
         """
         formatted_data = []

@@ -193,6 +217,14 @@ class PrometheusMetricsService:
         data = prometheus_result.get("data", {})
         result_type = data.get("resultType", "")

+        # Check if there are any warnings from Prometheus
+        warnings = prometheus_result.get("warnings", [])
+        if warnings:
+            self.module_logger.log_warning(f"Prometheus warnings: {warnings}")
+
+        # First, collect all data points from Prometheus
+        result_dict = {}
+
         if result_type == "matrix":
             # Handle range query results (matrix)
             for series in data.get("result", []):
@@ -200,15 +232,24 @@ class PrometheusMetricsService:
                 values = series.get("values", [])

                 for timestamp, value in values:
-                    # Convert Unix timestamp to ISO format
+                    # Convert Unix timestamp to UTC ISO format
-                    date_str = datetime.fromtimestamp(timestamp).isoformat() + "Z"
+                    date_str = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()

-                    formatted_data.append({
+                    # Handle NaN values properly
+                    if value == "NaN" or value is None:
+                        formatted_value = None
+                    else:
+                        try:
+                            formatted_value = float(value)
+                        except (ValueError, TypeError):
+                            formatted_value = None
+
+                    result_dict[date_str] = {
                         "date": date_str,
-                        "value": float(value) if value != "NaN" else None,
+                        "value": formatted_value,
                         "metric": metric_name,
                         "labels": metric_labels
-                    })
+                    }

         elif result_type == "vector":
             # Handle instant query results (vector)
@@ -218,17 +259,87 @@ class PrometheusMetricsService:
                 value = series.get("value", [None, None])[1]

                 if timestamp and value:
-                    date_str = datetime.fromtimestamp(timestamp).isoformat() + "Z"
+                    date_str = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()

-                    formatted_data.append({
+                    # Handle NaN values properly
+                    if value == "NaN" or value is None:
+                        formatted_value = None
+                    else:
+                        try:
+                            formatted_value = float(value)
+                        except (ValueError, TypeError):
+                            formatted_value = None
+
+                    result_dict[date_str] = {
                         "date": date_str,
-                        "value": float(value) if value != "NaN" else None,
+                        "value": formatted_value,
                         "metric": metric_name,
                         "labels": metric_labels
-                    })
+                    }

-        # Sort by date
-        formatted_data.sort(key=lambda x: x["date"])
+        # If we have start and end times, fill missing time points
+        if start_dt and end_dt and step:
+            formatted_data = self._fill_missing_time_points(result_dict, start_dt, end_dt, step, metric_name)
+        else:
+            # Just return the data we have, sorted by date
+            formatted_data = list(result_dict.values())
+            formatted_data.sort(key=lambda x: x["date"])
+
+        return formatted_data
+
+    def _fill_missing_time_points(self, result_dict: Dict[str, Dict[str, Any]], start_dt: datetime, end_dt: datetime,
+                                  step: str, metric_name: str) -> List[Dict[str, Any]]:
+        """
+        Fill missing time points in the time series data with 0 values.
+
+        Args:
+            result_dict: Dictionary of existing data points
+            start_dt: Start time for the range
+            end_dt: End time for the range
+            step: Step size (e.g., "1h", "1m", "5m")
+            metric_name: Name of the metric
+
+        Returns:
+            List of data points with missing time points filled with 0 values
+        """
+        formatted_data = []
+
+        # Parse step to determine time increment
+        if step.endswith('h'):
+            hours = int(step[:-1])
+            time_increment = timedelta(hours=hours)
+        elif step.endswith('m'):
+            minutes = int(step[:-1])
+            time_increment = timedelta(minutes=minutes)
+        elif step.endswith('d'):
+            days = int(step[:-1])
+            time_increment = timedelta(days=days)
+        elif step.endswith('s'):
+            seconds = int(step[:-1])
+            time_increment = timedelta(seconds=seconds)
+        else:
+            # Default to 1 hour if step format is not recognized
+            time_increment = timedelta(hours=1)
+
+        # Generate complete time range
+        current_dt = start_dt
+        while current_dt <= end_dt:
+            date_str = current_dt.isoformat()
+
+            if date_str in result_dict:
+                # Use existing data point
+                result_dict[date_str]["date"] = result_dict[date_str]["date"].replace('+00:00', 'Z')
+                formatted_data.append(result_dict[date_str])
+            else:
+                # Fill missing data point with 0 value
+                formatted_data.append({
+                    "date": date_str.replace('+00:00', 'Z'),
+                    "value": None,
+                    "metric": metric_name,
+                    "labels": {}
+                })
+
+            current_dt += time_increment
+
         return formatted_data
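Two things worth noting about the gap filler before moving on: keys only line up when Prometheus timestamps fall exactly on step boundaries (both sides produce `...+00:00` strings via `isoformat()`), and despite the docstring's "0 values" the filler actually inserts `None`. A hedged sketch of the expected output, again assuming a constructible service instance:

```python
from datetime import datetime, timedelta, timezone

start = datetime(2025, 9, 24, 10, 0, tzinfo=timezone.utc)
end = start + timedelta(hours=3)

# Points exist for 10:00 and 12:00 only; 11:00 and 13:00 are gaps.
result_dict = {
    (start + timedelta(hours=h)).isoformat(): {
        "date": (start + timedelta(hours=h)).isoformat(),
        "value": v,
        "metric": "cpu_usage",
        "labels": {},
    }
    for h, v in [(0, 41.5), (2, 44.0)]
}

svc = PrometheusMetricsService()  # assumption: default construction works
for point in svc._fill_missing_time_points(result_dict, start, end, "1h", "cpu_usage"):
    print(point["date"], point["value"])
# 2025-09-24T10:00:00Z 41.5
# 2025-09-24T11:00:00Z None   <- gap filled with None, not 0
# 2025-09-24T12:00:00Z 44.0
# 2025-09-24T13:00:00Z None
```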
@@ -22,7 +22,7 @@ class AppSettings(BaseSettings):
    STARROCKS_DATABASE: str = "freeleaps"

    # Prometheus settings
-   PROMETHEUS_ENDPOINT: str = "http://localhost:9090"
+   PROMETHEUS_ENDPOINT: str = "http://kube-prometheus-stack-prometheus.freeleaps-monitoring-system:9090"

    METRICS_ENABLED: bool = True
    PROBES_ENABLED: bool = True
@@ -16,6 +16,6 @@ LOG_BASE_PATH=./logs
 BACKEND_LOG_FILE_NAME=metrics
 APPLICATION_ACTIVITY_LOG=metrics-activity

-PROMETHEUS_ENDPOINT=http://localhost:9090
+PROMETHEUS_ENDPOINT=http://localhost:57828

 METRICS_ENABLED=True
@@ -14,4 +14,5 @@ pytest==8.4.1
 pytest-asyncio==0.21.2
 pymysql==1.1.0
 sqlalchemy==2.0.23
+aiomysql==0.2.0
 python-dotenv
@@ -4,7 +4,7 @@ from fastapi import FastAPI
 from fastapi.openapi.utils import get_openapi

 from common.config.app_settings import app_settings
-from webapi.providers import exception_handler, common, probes, metrics, router
+from webapi.providers import exception_handler, common, probes, metrics, router, database
 from webapi.providers.logger import register_logger

@@ -14,6 +14,7 @@ def create_app() -> FastAPI:
     app = FreeleapsMetricsApp()

     register_logger()
+    register(app, database)
     register(app, exception_handler)
     register(app, router)
     register(app, common)
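The `register` helper itself is outside this diff; for orientation, a plausible minimal reading of the provider contract it implies (an assumption, not the project's actual code):

```python
# Each provider module (database, router, probes, ...) is assumed to expose
# a module-level register(app) hook; the app factory fans out to it.
def register(app, provider_module):
    provider_module.register(app)  # e.g. database.register(app), defined below
```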
apps/metrics/webapi/providers/database.py (new file, 75 lines)
@@ -0,0 +1,75 @@
+import sys
+
+import aiomysql
+from typing import Optional
+from common.config.app_settings import app_settings
+from common.log.module_logger import ModuleLogger
+
+
+class DatabaseConnectionPool:
+    """Database connection pool manager for StarRocks"""
+
+    def __init__(self):
+        self.pool: Optional[aiomysql.Pool] = None
+        self.module_logger = ModuleLogger(__file__)
+
+    async def create_pool(self) -> bool:
+        """Create database connection pool"""
+        try:
+            self.pool = await aiomysql.create_pool(
+                host=app_settings.STARROCKS_HOST,
+                port=app_settings.STARROCKS_PORT,
+                user=app_settings.STARROCKS_USER,
+                password=app_settings.STARROCKS_PASSWORD,
+                db=app_settings.STARROCKS_DATABASE,
+                charset='utf8mb4',
+                autocommit=True,
+                minsize=5,  # Minimum number of connections in the pool
+                maxsize=20,  # Maximum number of connections in the pool
+                pool_recycle=3600,  # Recycle connections after 1 hour
+                echo=False  # Set to True for SQL query logging
+            )
+            await self.module_logger.log_info(
+                f"Database connection pool created successfully for StarRocks"
+            )
+            return True
+        except Exception as e:
+            await self.module_logger.log_error(f"Failed to create database connection pool: {e}")
+            sys.exit(1)
+
+    async def close_pool(self):
+        """Close database connection pool"""
+        if self.pool:
+            self.pool.close()
+            await self.pool.wait_closed()
+            self.pool = None
+            await self.module_logger.log_info("Database connection pool closed")
+
+    def get_pool(self) -> aiomysql.Pool:
+        """Get the database connection pool"""
+        if not self.pool:
+            raise Exception("Database connection pool not initialized")
+        return self.pool
+
+
+# Global database connection pool instance
+db_pool = DatabaseConnectionPool()
+
+
+def register(app):
+    """Register database provider with FastAPI application"""
+
+    @app.on_event("startup")
+    async def startup():
+        """Initialize database connection pool on application startup"""
+        await db_pool.create_pool()
+
+    @app.on_event("shutdown")
+    async def shutdown():
+        """Close database connection pool on application shutdown"""
+        await db_pool.close_pool()
+
+
+def boot(app):
+    """Boot database provider (if needed for additional setup)"""
+    pass
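Outside FastAPI (scripts, tests), the same pool can be driven by hand; a minimal sketch, keeping in mind that `create_pool()` calls `sys.exit(1)` on failure, so a standalone caller gets no chance to retry:

```python
import asyncio

async def main():
    await db_pool.create_pool()  # exits the process on failure
    try:
        pool = db_pool.get_pool()
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT 1")  # trivial liveness check
                print(await cur.fetchone())
    finally:
        await db_pool.close_pool()

asyncio.run(main())
```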