From eae025576674b6f788f178d0496ac221e4bc8878 Mon Sep 17 00:00:00 2001 From: icecheng Date: Thu, 25 Sep 2025 18:25:51 +0800 Subject: [PATCH] feat: add starrock connect pool --- .../external_service/starrocks_client.py | 56 +++----------- apps/metrics/requirements.txt | 1 + apps/metrics/webapi/bootstrap/application.py | 3 +- apps/metrics/webapi/providers/database.py | 74 +++++++++++++++++++ 4 files changed, 89 insertions(+), 45 deletions(-) create mode 100644 apps/metrics/webapi/providers/database.py diff --git a/apps/metrics/backend/infra/external_service/starrocks_client.py b/apps/metrics/backend/infra/external_service/starrocks_client.py index d88de67..b3ffac7 100644 --- a/apps/metrics/backend/infra/external_service/starrocks_client.py +++ b/apps/metrics/backend/infra/external_service/starrocks_client.py @@ -1,59 +1,27 @@ -import pymysql +import aiomysql from typing import List, Dict, Any, Optional from datetime import date from common.log.module_logger import ModuleLogger -from common.config.app_settings import app_settings +from webapi.providers.database import db_pool class StarRocksClient: - """StarRocks database client for querying user registration data""" + """StarRocks database client for querying user registration data using connection pool""" def __init__(self): - self.host = app_settings.STARROCKS_HOST - self.port = app_settings.STARROCKS_PORT - self.user = app_settings.STARROCKS_USER - self.password = app_settings.STARROCKS_PASSWORD - self.database = app_settings.STARROCKS_DATABASE - self.connection = None self.module_logger = ModuleLogger(__file__) - async def connect(self) -> bool: - """Establish connection to StarRocks database""" - try: - self.connection = pymysql.connect( - host=self.host, - port=self.port, - user=self.user, - password=self.password, - database=self.database, - charset='utf8mb4', - autocommit=True - ) - await self.module_logger.log_info(f"Successfully connected to StarRocks at {self.host}:{self.port}") - return True - except Exception as e: - await self.module_logger.log_error(f"Failed to connect to StarRocks: {e}") - return False - - async def disconnect(self): - """Close database connection""" - if self.connection: - self.connection.close() - self.connection = None - await self.module_logger.log_info("Disconnected from StarRocks") - async def execute_query(self, query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]: - """Execute SQL query and return results""" - if not self.connection: - if not await self.connect(): - raise Exception("Failed to connect to StarRocks database") - + """Execute SQL query and return results using connection pool""" try: - with self.connection.cursor(pymysql.cursors.DictCursor) as cursor: - cursor.execute(query, params) - results = cursor.fetchall() - await self.module_logger.log_info(f"Query executed successfully, returned {len(results)} rows") - return results + # Get connection from pool + pool = db_pool.get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cursor: + await cursor.execute(query, params) + results = await cursor.fetchall() + await self.module_logger.log_info(f"Query executed successfully, returned {len(results)} rows") + return results except Exception as e: await self.module_logger.log_error(f"Query execution failed: {e}") raise e diff --git a/apps/metrics/requirements.txt b/apps/metrics/requirements.txt index 1422c6b..251c9bc 100644 --- a/apps/metrics/requirements.txt +++ b/apps/metrics/requirements.txt @@ -14,4 +14,5 @@ pytest==8.4.1 pytest-asyncio==0.21.2 pymysql==1.1.0 sqlalchemy==2.0.23 +aiomysql==0.2.0 python-dotenv diff --git a/apps/metrics/webapi/bootstrap/application.py b/apps/metrics/webapi/bootstrap/application.py index 66d4a65..34d997f 100644 --- a/apps/metrics/webapi/bootstrap/application.py +++ b/apps/metrics/webapi/bootstrap/application.py @@ -4,7 +4,7 @@ from fastapi import FastAPI from fastapi.openapi.utils import get_openapi from common.config.app_settings import app_settings -from webapi.providers import exception_handler, common, probes, metrics, router +from webapi.providers import exception_handler, common, probes, metrics, router, database from webapi.providers.logger import register_logger @@ -14,6 +14,7 @@ def create_app() -> FastAPI: app = FreeleapsMetricsApp() register_logger() + register(app, database) register(app, exception_handler) register(app, router) register(app, common) diff --git a/apps/metrics/webapi/providers/database.py b/apps/metrics/webapi/providers/database.py new file mode 100644 index 0000000..c5b09be --- /dev/null +++ b/apps/metrics/webapi/providers/database.py @@ -0,0 +1,74 @@ +import aiomysql +from typing import Optional +from common.config.app_settings import app_settings +from common.log.module_logger import ModuleLogger + + +class DatabaseConnectionPool: + """Database connection pool manager for StarRocks""" + + def __init__(self): + self.pool: Optional[aiomysql.Pool] = None + self.module_logger = ModuleLogger(__file__) + + async def create_pool(self) -> bool: + """Create database connection pool""" + try: + self.pool = await aiomysql.create_pool( + host=app_settings.STARROCKS_HOST, + port=app_settings.STARROCKS_PORT, + user=app_settings.STARROCKS_USER, + password=app_settings.STARROCKS_PASSWORD, + db=app_settings.STARROCKS_DATABASE, + charset='utf8mb4', + autocommit=True, + minsize=5, # Minimum number of connections in the pool + maxsize=20, # Maximum number of connections in the pool + pool_recycle=3600, # Recycle connections after 1 hour + echo=False # Set to True for SQL query logging + ) + await self.module_logger.log_info( + f"Database connection pool created successfully for StarRocks at " + f"{app_settings.STARROCKS_HOST}:{app_settings.STARROCKS_PORT}" + ) + return True + except Exception as e: + await self.module_logger.log_error(f"Failed to create database connection pool: {e}") + return False + + async def close_pool(self): + """Close database connection pool""" + if self.pool: + self.pool.close() + await self.pool.wait_closed() + self.pool = None + await self.module_logger.log_info("Database connection pool closed") + + def get_pool(self) -> aiomysql.Pool: + """Get the database connection pool""" + if not self.pool: + raise Exception("Database connection pool not initialized") + return self.pool + + +# Global database connection pool instance +db_pool = DatabaseConnectionPool() + + +def register(app): + """Register database provider with FastAPI application""" + + @app.on_event("startup") + async def startup(): + """Initialize database connection pool on application startup""" + await db_pool.create_pool() + + @app.on_event("shutdown") + async def shutdown(): + """Close database connection pool on application shutdown""" + await db_pool.close_pool() + + +def boot(app): + """Boot database provider (if needed for additional setup)""" + pass