freeleaps-service-hub/apps/metrics/webapi/providers/database.py

77 lines
2.6 KiB
Python

import sys
import aiomysql
from typing import Optional
from common.config.app_settings import app_settings
from common.log.module_logger import ModuleLogger
class DatabaseConnectionPool:
"""Database connection pool manager for StarRocks"""
def __init__(self):
self.pool: Optional[aiomysql.Pool] = None
self.module_logger = ModuleLogger(__file__)
async def create_pool(self) -> bool:
"""Create database connection pool"""
try:
self.pool = await aiomysql.create_pool(
host=app_settings.STARROCKS_HOST,
port=app_settings.STARROCKS_PORT,
user=app_settings.STARROCKS_USER,
password=app_settings.STARROCKS_PASSWORD,
db=app_settings.STARROCKS_DATABASE,
charset='utf8mb4',
autocommit=True,
minsize=5, # Minimum number of connections in the pool
maxsize=10, # Maximum number of connections in the pool
pool_recycle=app_settings.STARROCKS_POOL_RECYCLE, # Recycle connections after 0.5 hours
connect_timeout=30, # Connection timeout in seconds
echo=False # Set to True for SQL query logging
)
await self.module_logger.log_info(
f"Database connection pool created successfully for StarRocks"
)
return True
except Exception as e:
await self.module_logger.log_error(f"Failed to create database connection pool: {e}")
sys.exit(1)
async def close_pool(self):
"""Close database connection pool"""
if self.pool:
self.pool.close()
await self.pool.wait_closed()
self.pool = None
await self.module_logger.log_info("Database connection pool closed")
def get_pool(self) -> aiomysql.Pool:
"""Get the database connection pool"""
if not self.pool:
raise Exception("Database connection pool not initialized")
return self.pool
# Global database connection pool instance
db_pool = DatabaseConnectionPool()
def register(app):
"""Register database provider with FastAPI application"""
@app.on_event("startup")
async def startup():
"""Initialize database connection pool on application startup"""
await db_pool.create_pool()
@app.on_event("shutdown")
async def shutdown():
"""Close database connection pool on application shutdown"""
await db_pool.close_pool()
def boot(app):
"""Boot database provider (if needed for additional setup)"""
pass