freeleaps-service-hub/apps/metrics/backend/infra/external_service/starrocks_client.py
2025-09-25 18:25:51 +08:00

29 lines
1.1 KiB
Python

import aiomysql
from typing import List, Dict, Any, Optional
from datetime import date
from common.log.module_logger import ModuleLogger
from webapi.providers.database import db_pool
class StarRocksClient:
    """StarRocks database client for querying user registration data using connection pool.

    Every query obtains its pool through ``db_pool.get_pool()`` and acquires a
    connection with ``async with``, so the connection and cursor are scoped
    to a single ``execute_query`` call.
    """

    def __init__(self):
        # ModuleLogger is constructed with this file's path.
        self.module_logger = ModuleLogger(__file__)

    async def execute_query(self, query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
        """Execute SQL query and return results using connection pool.

        Args:
            query: SQL text handed to ``cursor.execute``.
            params: Optional values passed to ``cursor.execute`` alongside
                ``query``; ``None`` when the query takes no parameters.

        Returns:
            Every row returned by ``cursor.fetchall()`` as a list. The cursor
            is created as an ``aiomysql.DictCursor``.

        Raises:
            Exception: Any error raised while obtaining the pool, acquiring
                the connection, or executing/fetching is logged through
                ``log_error`` and then re-raised unchanged.
        """
        try:
            # Get connection from pool
            pool = db_pool.get_pool()
            async with pool.acquire() as conn:
                async with conn.cursor(aiomysql.DictCursor) as cursor:
                    await cursor.execute(query, params)
                    # Normalise to a list so the annotated return type holds
                    # whatever container type the driver hands back.
                    rows = list(await cursor.fetchall())
        except Exception as e:
            await self.module_logger.log_error(f"Query execution failed: {e}")
            # Bare raise re-raises the active exception as-is.
            raise
        else:
            # Logged outside the try body so a logging failure is never
            # misreported as a failed query.
            await self.module_logger.log_info(f"Query executed successfully, returned {len(rows)} rows")
            return rows