freeleaps-service-hub/apps/metrics/backend/infra/database_client.py
weicao 35fbda6954 Refactor metrics service: rename files for better readability
- Rename starrocks_client.py -> database_client.py
- Rename daily_registered_users.py -> user_registration_models.py
- Rename daily_registration_service.py -> registration_analytics_service.py
- Rename daily_registration.py -> registration_metrics.py
- Rename site_settings.py -> app_settings.py
- Rename application.py -> app_factory.py
- Update all import statements and references
- Update README.md with new file structure
2025-09-15 16:31:20 +08:00

100 lines
3.0 KiB
Python

import pymysql
from typing import List, Dict, Any, Optional
from datetime import date
from loguru import logger
from webapi.config.app_settings import site_settings
class StarRocksClient:
    """StarRocks database client for querying user registration data.

    Connection parameters are read from ``site_settings`` when the client is
    constructed. The database is reached through ``pymysql``. The client holds
    at most one connection at a time; it is opened explicitly by
    :meth:`connect`, implicitly by :meth:`execute_query`, or by using the
    client as a context manager.
    """

    def __init__(self):
        """Copy the StarRocks connection settings from ``site_settings``.

        No connection is opened here; see :meth:`connect`.
        """
        self.host = site_settings.STARROCKS_HOST
        self.port = site_settings.STARROCKS_PORT
        self.user = site_settings.STARROCKS_USER
        self.password = site_settings.STARROCKS_PASSWORD
        self.database = site_settings.STARROCKS_DATABASE
        # Live pymysql connection, or None while disconnected.
        self.connection = None

    def connect(self) -> bool:
        """Establish a connection to the StarRocks database.

        If the client already holds a connection, the new one is opened
        first and the previous one is closed afterwards, so repeated calls
        do not leak sockets and a failed attempt leaves the existing
        connection untouched.

        Returns:
            True when a connection was opened, False when the attempt failed
            (the error is logged rather than raised).
        """
        try:
            new_connection = pymysql.connect(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                database=self.database,
                charset='utf8mb4',
                autocommit=True,
            )
        except Exception as e:
            logger.error(f"Failed to connect to StarRocks: {e}")
            return False

        previous_connection = self.connection
        self.connection = new_connection
        if previous_connection is not None:
            try:
                previous_connection.close()
            except Exception as e:
                # The replacement is already in place; a stale handle that
                # refuses to close must not turn a successful connect into
                # a failure.
                logger.warning(f"Error while closing previous StarRocks connection: {e}")

        logger.info(f"Successfully connected to StarRocks at {self.host}:{self.port}")
        return True

    def disconnect(self):
        """Close the database connection, if one is open.

        Calling this while already disconnected is a no-op. The stored handle
        is cleared even when ``close()`` raises, so a later query opens a
        fresh connection instead of reusing a broken one; the error from
        ``close()`` still propagates to the caller.
        """
        if self.connection is None:
            return
        try:
            self.connection.close()
        finally:
            self.connection = None
        logger.info("Disconnected from StarRocks")

    def execute_query(self, query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
        """Execute a SQL query and return all rows as dictionaries.

        A connection is opened on demand when none exists. Before running the
        statement the connection is pinged with ``reconnect=True`` so that a
        connection dropped while idle is re-established instead of failing
        every subsequent query.

        Args:
            query: SQL text; values should be passed via ``%s`` placeholders.
            params: Values bound to the placeholders by the driver, or None
                when the query has no placeholders.

        Returns:
            A list with one dict per fetched row (fetched via ``DictCursor``).

        Raises:
            ConnectionError: No connection could be established.
            Exception: Any error raised by the driver while pinging,
                executing or fetching; it is logged and re-raised unchanged.
        """
        if self.connection is None:
            if not self.connect():
                raise ConnectionError("Failed to connect to StarRocks database")

        try:
            self.connection.ping(reconnect=True)
            with self.connection.cursor(pymysql.cursors.DictCursor) as cursor:
                cursor.execute(query, params)
                # The driver may hand back an empty tuple when nothing
                # matched; normalise so the declared list type always holds.
                results = list(cursor.fetchall())
            logger.info(f"Query executed successfully, returned {len(results)} rows")
            return results
        except Exception as e:
            logger.error(f"Query execution failed: {e}")
            # Bare raise keeps the original traceback intact.
            raise

    def get_daily_registered_users(
        self,
        start_date: date,
        end_date: date,
        product_id: str = "freeleaps"
    ) -> List[Dict[str, Any]]:
        """Query daily registered users from StarRocks.

        Reads ``dws_daily_registered_users`` for one product over a date
        range; both bounds are inclusive.

        Args:
            start_date: First ``date_id`` to include.
            end_date: Last ``date_id`` to include.
            product_id: Product whose rows are selected.

        Returns:
            Rows with the keys ``date_id``, ``product_id``,
            ``registered_cnt`` and ``updated_at``, ordered by ``date_id``
            ascending.

        Raises:
            Exception: Propagated from :meth:`execute_query`.
        """
        query = """
            SELECT
                date_id,
                product_id,
                registered_cnt,
                updated_at
            FROM dws_daily_registered_users
            WHERE date_id >= %s
              AND date_id <= %s
              AND product_id = %s
            ORDER BY date_id ASC
        """
        params = (start_date, end_date, product_id)
        return self.execute_query(query, params)

    def __enter__(self):
        """Context manager entry: try to connect and return the client.

        A failed connection attempt is only logged here; the first call to
        :meth:`execute_query` retries and raises if it still fails.
        """
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: close the connection.

        Returns None, so an exception raised inside the ``with`` body is not
        suppressed.
        """
        self.disconnect()