freeleaps-service-hub/apps/metrics/backend/infra/external_service/starrocks_client.py
2025-09-19 15:32:00 +08:00

61 lines
2.3 KiB
Python

import pymysql
from typing import List, Dict, Any, Optional
from datetime import date
from common.log.module_logger import ModuleLogger
from common.config.app_settings import app_settings
class StarRocksClient:
"""StarRocks database client for querying user registration data"""
def __init__(self):
self.host = app_settings.STARROCKS_HOST
self.port = app_settings.STARROCKS_PORT
self.user = app_settings.STARROCKS_USER
self.password = app_settings.STARROCKS_PASSWORD
self.database = app_settings.STARROCKS_DATABASE
self.connection = None
self.module_logger = ModuleLogger(__file__)
async def connect(self) -> bool:
"""Establish connection to StarRocks database"""
try:
self.connection = pymysql.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
charset='utf8mb4',
autocommit=True
)
await self.module_logger.log_info(f"Successfully connected to StarRocks at {self.host}:{self.port}")
return True
except Exception as e:
await self.module_logger.log_error(f"Failed to connect to StarRocks: {e}")
return False
async def disconnect(self):
"""Close database connection"""
if self.connection:
self.connection.close()
self.connection = None
await self.module_logger.log_info("Disconnected from StarRocks")
async def execute_query(self, query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
"""Execute SQL query and return results"""
if not self.connection:
if not await self.connect():
raise Exception("Failed to connect to StarRocks database")
try:
with self.connection.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute(query, params)
results = cursor.fetchall()
await self.module_logger.log_info(f"Query executed successfully, returned {len(results)} rows")
return results
except Exception as e:
await self.module_logger.log_error(f"Query execution failed: {e}")
raise e