- Add complete metrics microservice structure - Implement StarRocks database integration - Add user registration data query APIs: - Daily registered users by date range - Recent N days registration data - Registration data by start date and days - Registration summary statistics - Add comprehensive error handling and logging - Include test scripts and documentation
91 lines
3.0 KiB
Python
91 lines
3.0 KiB
Python
import pymysql
|
|
from typing import List, Dict, Any, Optional
|
|
from datetime import date
|
|
from loguru import logger
|
|
from webapi.config.site_settings import site_settings
|
|
|
|
|
|
class StarRocksClient:
|
|
"""StarRocks database client for querying user registration data"""
|
|
|
|
def __init__(self):
|
|
self.host = site_settings.STARROCKS_HOST
|
|
self.port = site_settings.STARROCKS_PORT
|
|
self.user = site_settings.STARROCKS_USER
|
|
self.password = site_settings.STARROCKS_PASSWORD
|
|
self.database = site_settings.STARROCKS_DATABASE
|
|
self.connection = None
|
|
|
|
def connect(self) -> bool:
|
|
"""Establish connection to StarRocks database"""
|
|
try:
|
|
self.connection = pymysql.connect(
|
|
host=self.host,
|
|
port=self.port,
|
|
user=self.user,
|
|
password=self.password,
|
|
database=self.database,
|
|
charset='utf8mb4',
|
|
autocommit=True
|
|
)
|
|
logger.info(f"Successfully connected to StarRocks at {self.host}:{self.port}")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Failed to connect to StarRocks: {e}")
|
|
return False
|
|
|
|
def disconnect(self):
|
|
"""Close database connection"""
|
|
if self.connection:
|
|
self.connection.close()
|
|
self.connection = None
|
|
logger.info("Disconnected from StarRocks")
|
|
|
|
def execute_query(self, query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
|
|
"""Execute SQL query and return results"""
|
|
if not self.connection:
|
|
if not self.connect():
|
|
raise Exception("Failed to connect to StarRocks database")
|
|
|
|
try:
|
|
with self.connection.cursor(pymysql.cursors.DictCursor) as cursor:
|
|
cursor.execute(query, params)
|
|
results = cursor.fetchall()
|
|
logger.info(f"Query executed successfully, returned {len(results)} rows")
|
|
return results
|
|
except Exception as e:
|
|
logger.error(f"Query execution failed: {e}")
|
|
raise e
|
|
|
|
def get_daily_registered_users(
|
|
self,
|
|
start_date: date,
|
|
end_date: date,
|
|
product_id: str = "freeleaps"
|
|
) -> List[Dict[str, Any]]:
|
|
"""Query daily registered users from StarRocks"""
|
|
query = """
|
|
SELECT
|
|
date_id,
|
|
product_id,
|
|
registered_cnt,
|
|
updated_at
|
|
FROM dws_daily_registered_users
|
|
WHERE date_id >= %s
|
|
AND date_id <= %s
|
|
AND product_id = %s
|
|
ORDER BY date_id ASC
|
|
"""
|
|
|
|
params = (start_date, end_date, product_id)
|
|
return self.execute_query(query, params)
|
|
|
|
def __enter__(self):
|
|
"""Context manager entry"""
|
|
self.connect()
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
"""Context manager exit"""
|
|
self.disconnect()
|