feat: add starrock connect pool
This commit is contained in:
parent
d4c027ed03
commit
eae0255766
@ -1,57 +1,25 @@
|
|||||||
import pymysql
|
import aiomysql
|
||||||
from typing import List, Dict, Any, Optional
|
from typing import List, Dict, Any, Optional
|
||||||
from datetime import date
|
from datetime import date
|
||||||
from common.log.module_logger import ModuleLogger
|
from common.log.module_logger import ModuleLogger
|
||||||
from common.config.app_settings import app_settings
|
from webapi.providers.database import db_pool
|
||||||
|
|
||||||
|
|
||||||
class StarRocksClient:
|
class StarRocksClient:
|
||||||
"""StarRocks database client for querying user registration data"""
|
"""StarRocks database client for querying user registration data using connection pool"""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.host = app_settings.STARROCKS_HOST
|
|
||||||
self.port = app_settings.STARROCKS_PORT
|
|
||||||
self.user = app_settings.STARROCKS_USER
|
|
||||||
self.password = app_settings.STARROCKS_PASSWORD
|
|
||||||
self.database = app_settings.STARROCKS_DATABASE
|
|
||||||
self.connection = None
|
|
||||||
self.module_logger = ModuleLogger(__file__)
|
self.module_logger = ModuleLogger(__file__)
|
||||||
|
|
||||||
async def connect(self) -> bool:
|
|
||||||
"""Establish connection to StarRocks database"""
|
|
||||||
try:
|
|
||||||
self.connection = pymysql.connect(
|
|
||||||
host=self.host,
|
|
||||||
port=self.port,
|
|
||||||
user=self.user,
|
|
||||||
password=self.password,
|
|
||||||
database=self.database,
|
|
||||||
charset='utf8mb4',
|
|
||||||
autocommit=True
|
|
||||||
)
|
|
||||||
await self.module_logger.log_info(f"Successfully connected to StarRocks at {self.host}:{self.port}")
|
|
||||||
return True
|
|
||||||
except Exception as e:
|
|
||||||
await self.module_logger.log_error(f"Failed to connect to StarRocks: {e}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
async def disconnect(self):
|
|
||||||
"""Close database connection"""
|
|
||||||
if self.connection:
|
|
||||||
self.connection.close()
|
|
||||||
self.connection = None
|
|
||||||
await self.module_logger.log_info("Disconnected from StarRocks")
|
|
||||||
|
|
||||||
async def execute_query(self, query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
|
async def execute_query(self, query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
|
||||||
"""Execute SQL query and return results"""
|
"""Execute SQL query and return results using connection pool"""
|
||||||
if not self.connection:
|
|
||||||
if not await self.connect():
|
|
||||||
raise Exception("Failed to connect to StarRocks database")
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with self.connection.cursor(pymysql.cursors.DictCursor) as cursor:
|
# Get connection from pool
|
||||||
cursor.execute(query, params)
|
pool = db_pool.get_pool()
|
||||||
results = cursor.fetchall()
|
async with pool.acquire() as conn:
|
||||||
|
async with conn.cursor(aiomysql.DictCursor) as cursor:
|
||||||
|
await cursor.execute(query, params)
|
||||||
|
results = await cursor.fetchall()
|
||||||
await self.module_logger.log_info(f"Query executed successfully, returned {len(results)} rows")
|
await self.module_logger.log_info(f"Query executed successfully, returned {len(results)} rows")
|
||||||
return results
|
return results
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@ -14,4 +14,5 @@ pytest==8.4.1
|
|||||||
pytest-asyncio==0.21.2
|
pytest-asyncio==0.21.2
|
||||||
pymysql==1.1.0
|
pymysql==1.1.0
|
||||||
sqlalchemy==2.0.23
|
sqlalchemy==2.0.23
|
||||||
|
aiomysql==0.2.0
|
||||||
python-dotenv
|
python-dotenv
|
||||||
|
|||||||
@ -4,7 +4,7 @@ from fastapi import FastAPI
|
|||||||
from fastapi.openapi.utils import get_openapi
|
from fastapi.openapi.utils import get_openapi
|
||||||
|
|
||||||
from common.config.app_settings import app_settings
|
from common.config.app_settings import app_settings
|
||||||
from webapi.providers import exception_handler, common, probes, metrics, router
|
from webapi.providers import exception_handler, common, probes, metrics, router, database
|
||||||
from webapi.providers.logger import register_logger
|
from webapi.providers.logger import register_logger
|
||||||
|
|
||||||
|
|
||||||
@ -14,6 +14,7 @@ def create_app() -> FastAPI:
|
|||||||
app = FreeleapsMetricsApp()
|
app = FreeleapsMetricsApp()
|
||||||
|
|
||||||
register_logger()
|
register_logger()
|
||||||
|
register(app, database)
|
||||||
register(app, exception_handler)
|
register(app, exception_handler)
|
||||||
register(app, router)
|
register(app, router)
|
||||||
register(app, common)
|
register(app, common)
|
||||||
|
|||||||
74
apps/metrics/webapi/providers/database.py
Normal file
74
apps/metrics/webapi/providers/database.py
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
import aiomysql
|
||||||
|
from typing import Optional
|
||||||
|
from common.config.app_settings import app_settings
|
||||||
|
from common.log.module_logger import ModuleLogger
|
||||||
|
|
||||||
|
|
||||||
|
class DatabaseConnectionPool:
|
||||||
|
"""Database connection pool manager for StarRocks"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.pool: Optional[aiomysql.Pool] = None
|
||||||
|
self.module_logger = ModuleLogger(__file__)
|
||||||
|
|
||||||
|
async def create_pool(self) -> bool:
|
||||||
|
"""Create database connection pool"""
|
||||||
|
try:
|
||||||
|
self.pool = await aiomysql.create_pool(
|
||||||
|
host=app_settings.STARROCKS_HOST,
|
||||||
|
port=app_settings.STARROCKS_PORT,
|
||||||
|
user=app_settings.STARROCKS_USER,
|
||||||
|
password=app_settings.STARROCKS_PASSWORD,
|
||||||
|
db=app_settings.STARROCKS_DATABASE,
|
||||||
|
charset='utf8mb4',
|
||||||
|
autocommit=True,
|
||||||
|
minsize=5, # Minimum number of connections in the pool
|
||||||
|
maxsize=20, # Maximum number of connections in the pool
|
||||||
|
pool_recycle=3600, # Recycle connections after 1 hour
|
||||||
|
echo=False # Set to True for SQL query logging
|
||||||
|
)
|
||||||
|
await self.module_logger.log_info(
|
||||||
|
f"Database connection pool created successfully for StarRocks at "
|
||||||
|
f"{app_settings.STARROCKS_HOST}:{app_settings.STARROCKS_PORT}"
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
await self.module_logger.log_error(f"Failed to create database connection pool: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def close_pool(self):
|
||||||
|
"""Close database connection pool"""
|
||||||
|
if self.pool:
|
||||||
|
self.pool.close()
|
||||||
|
await self.pool.wait_closed()
|
||||||
|
self.pool = None
|
||||||
|
await self.module_logger.log_info("Database connection pool closed")
|
||||||
|
|
||||||
|
def get_pool(self) -> aiomysql.Pool:
|
||||||
|
"""Get the database connection pool"""
|
||||||
|
if not self.pool:
|
||||||
|
raise Exception("Database connection pool not initialized")
|
||||||
|
return self.pool
|
||||||
|
|
||||||
|
|
||||||
|
# Global database connection pool instance
|
||||||
|
db_pool = DatabaseConnectionPool()
|
||||||
|
|
||||||
|
|
||||||
|
def register(app):
|
||||||
|
"""Register database provider with FastAPI application"""
|
||||||
|
|
||||||
|
@app.on_event("startup")
|
||||||
|
async def startup():
|
||||||
|
"""Initialize database connection pool on application startup"""
|
||||||
|
await db_pool.create_pool()
|
||||||
|
|
||||||
|
@app.on_event("shutdown")
|
||||||
|
async def shutdown():
|
||||||
|
"""Close database connection pool on application shutdown"""
|
||||||
|
await db_pool.close_pool()
|
||||||
|
|
||||||
|
|
||||||
|
def boot(app):
|
||||||
|
"""Boot database provider (if needed for additional setup)"""
|
||||||
|
pass
|
||||||
Loading…
Reference in New Issue
Block a user