Add metrics service with user registration API endpoints

- Add complete metrics microservice structure
- Implement StarRocks database integration
- Add user registration data query APIs:
  - Daily registered users by date range
  - Recent N days registration data
  - Registration data by start date and days
  - Registration summary statistics
- Add comprehensive error handling and logging
- Include test scripts and documentation
This commit is contained in:
weicao 2025-09-11 17:35:20 +08:00
parent 046f9ffdd2
commit 44f08eee68
27 changed files with 891 additions and 0 deletions

75
apps/metrics/.gitignore vendored Normal file
View File

@ -0,0 +1,75 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# Virtual environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# Logs
logs/
*.log
# OS
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
# Test coverage
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# Environments
.env.local
.env.development.local
.env.test.local
.env.production.local

37
apps/metrics/Dockerfile Normal file
View File

@ -0,0 +1,37 @@
FROM python:3.10-slim-bullseye
# docker settings
ARG CONTAINER_APP_ROOT="/app"
ENV APP_NAME="metrics"
# Service dependencies
ENV DEVSVC_WEBAPI_URL_BASE="http://devsvc:8007/api/devsvc"
ENV NOTIFICATION_WEBAPI_URL_BASE="http://notification:8003/api/notification/"
# JWT settings
ENV JWT_SECRET_KEY="8f87ca8c3c9c3df09a9c78e0adb0927855568f6072d9efc892534aee35f5867b"
ENV JWT_ALGORITHM="HS256"
# Site settings
ENV SERVICE_API_ACCESS_HOST=0.0.0.0
ENV SERVICE_API_ACCESS_PORT=8009
ENV MONGODB_NAME=freeleaps2
ENV MONGODB_PORT=27017
ENV MONGODB_URI="mongodb://localhost:27017/"
# Log settings
ENV LOG_BASE_PATH=$CONTAINER_APP_ROOT/log/$APP_NAME
ENV BACKEND_LOG_FILE_NAME=$APP_NAME
ENV APPLICATION_ACTIVITY_LOG=$APP_NAME-activity
WORKDIR ${CONTAINER_APP_ROOT}
COPY requirements.txt .
RUN pip install --upgrade pip
RUN pip install --no-cache-dir -r requirements.txt
COPY . ${CONTAINER_APP_ROOT}
EXPOSE ${SERVICE_API_ACCESS_PORT}
# Using shell to expand environment to ensure pass the actual environment value to uvicorn
CMD uvicorn webapi.main:app --reload --port=$SERVICE_API_ACCESS_PORT --host=$SERVICE_API_ACCESS_HOST

1
apps/metrics/__init__.py Normal file
View File

@ -0,0 +1 @@
# Metrics Service

View File

@ -0,0 +1 @@
# Backend module

View File

View File

@ -0,0 +1,90 @@
import pymysql
from typing import List, Dict, Any, Optional
from datetime import date
from loguru import logger
from webapi.config.site_settings import site_settings
class StarRocksClient:
"""StarRocks database client for querying user registration data"""
def __init__(self):
self.host = site_settings.STARROCKS_HOST
self.port = site_settings.STARROCKS_PORT
self.user = site_settings.STARROCKS_USER
self.password = site_settings.STARROCKS_PASSWORD
self.database = site_settings.STARROCKS_DATABASE
self.connection = None
def connect(self) -> bool:
"""Establish connection to StarRocks database"""
try:
self.connection = pymysql.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
charset='utf8mb4',
autocommit=True
)
logger.info(f"Successfully connected to StarRocks at {self.host}:{self.port}")
return True
except Exception as e:
logger.error(f"Failed to connect to StarRocks: {e}")
return False
def disconnect(self):
"""Close database connection"""
if self.connection:
self.connection.close()
self.connection = None
logger.info("Disconnected from StarRocks")
def execute_query(self, query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
"""Execute SQL query and return results"""
if not self.connection:
if not self.connect():
raise Exception("Failed to connect to StarRocks database")
try:
with self.connection.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute(query, params)
results = cursor.fetchall()
logger.info(f"Query executed successfully, returned {len(results)} rows")
return results
except Exception as e:
logger.error(f"Query execution failed: {e}")
raise e
def get_daily_registered_users(
self,
start_date: date,
end_date: date,
product_id: str = "freeleaps"
) -> List[Dict[str, Any]]:
"""Query daily registered users from StarRocks"""
query = """
SELECT
date_id,
product_id,
registered_cnt,
updated_at
FROM dws_daily_registered_users
WHERE date_id >= %s
AND date_id <= %s
AND product_id = %s
ORDER BY date_id ASC
"""
params = (start_date, end_date, product_id)
return self.execute_query(query, params)
def __enter__(self):
"""Context manager entry"""
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
self.disconnect()

View File

View File

@ -0,0 +1,26 @@
from pydantic import BaseModel
from datetime import date, datetime
from typing import List, Optional
class DailyRegisteredUsers(BaseModel):
"""Daily registered users data model"""
date_id: date
product_id: str = "freeleaps"
registered_cnt: int
updated_at: Optional[datetime] = None
class UserRegistrationQuery(BaseModel):
"""Query parameters for user registration data"""
start_date: date
end_date: date
product_id: str = "freeleaps"
class UserRegistrationResponse(BaseModel):
"""Response model for user registration data"""
dates: List[str]
counts: List[int]
total_registrations: int
query_period: str

View File

@ -0,0 +1,125 @@
from typing import List, Dict, Any
from datetime import date, timedelta
from loguru import logger
from backend.infra.starrocks_client import StarRocksClient
from backend.models.registered_users import UserRegistrationResponse, DailyRegisteredUsers
class RegistrationService:
"""Service for handling user registration data queries"""
def __init__(self):
self.starrocks_client = StarRocksClient()
def get_daily_registered_users(
self,
start_date: date,
end_date: date,
product_id: str = "freeleaps"
) -> UserRegistrationResponse:
"""
Get daily registered users count for a date range
Args:
start_date: Start date for the query
end_date: End date for the query
product_id: Product identifier (default: freeleaps)
Returns:
UserRegistrationResponse with dates and counts
"""
try:
# Query data from StarRocks
raw_data = self.starrocks_client.get_daily_registered_users(
start_date, end_date, product_id
)
# Convert to DailyRegisteredUsers objects
daily_data = [
DailyRegisteredUsers(
date_id=row['date_id'],
product_id=row['product_id'],
registered_cnt=row['registered_cnt'],
updated_at=row.get('updated_at')
)
for row in raw_data
]
# Create date-to-count mapping
data_dict = {str(item.date_id): item.registered_cnt for item in daily_data}
# Generate complete date range
dates = []
counts = []
current_date = start_date
while current_date <= end_date:
date_str = str(current_date)
dates.append(date_str)
counts.append(data_dict.get(date_str, 0))
current_date += timedelta(days=1)
# Calculate total registrations
total_registrations = sum(counts)
logger.info(
f"Retrieved registration data for {len(dates)} days, "
f"total registrations: {total_registrations}"
)
return UserRegistrationResponse(
dates=dates,
counts=counts,
total_registrations=total_registrations,
query_period=f"{start_date} to {end_date}"
)
except Exception as e:
logger.error(f"Failed to get daily registered users: {e}")
raise e
def get_registration_summary(
self,
start_date: date,
end_date: date,
product_id: str = "freeleaps"
) -> Dict[str, Any]:
"""
Get summary statistics for user registrations
Args:
start_date: Start date for the query
end_date: End date for the query
product_id: Product identifier
Returns:
Dictionary with summary statistics
"""
try:
response = self.get_daily_registered_users(start_date, end_date, product_id)
if not response.counts:
return {
"total_registrations": 0,
"average_daily": 0,
"max_daily": 0,
"min_daily": 0,
"days_with_registrations": 0,
"total_days": len(response.dates)
}
counts = response.counts
non_zero_counts = [c for c in counts if c > 0]
return {
"total_registrations": response.total_registrations,
"average_daily": round(sum(counts) / len(counts), 2),
"max_daily": max(counts),
"min_daily": min(counts),
"days_with_registrations": len(non_zero_counts),
"total_days": len(response.dates)
}
except Exception as e:
logger.error(f"Failed to get registration summary: {e}")
raise e

View File

View File

@ -0,0 +1,16 @@
fastapi==0.114.0
pydantic==2.9.2
loguru==0.7.2
uvicorn==0.23.2
beanie==1.21.0
pika==1.3.2
aio-pika
httpx
pydantic-settings
python-jose
passlib[bcrypt]
prometheus-fastapi-instrumentator==7.0.2
pytest==8.4.1
pytest-asyncio==0.21.2
pymysql==1.1.0
sqlalchemy==2.0.23

38
apps/metrics/start_fastapi.sh Executable file
View File

@ -0,0 +1,38 @@
#!/bin/bash
rp=$(dirname "$(realpath '$1')")
pushd $rp
APP_NAME=metrics
APP_PARENT_FOLDER=apps
GIT_REPO_ROOT=$(git rev-parse --show-toplevel)
CODEBASE_ROOT=$GIT_REPO_ROOT/$APP_PARENT_FOLDER/$APP_NAME
SITE_DEPLOY_FOLDER=$GIT_REPO_ROOT/sites/$APP_NAME/deploy
echo APP_NAME=$APP_NAME > .env
cat $SITE_DEPLOY_FOLDER/common/.env >> .env
echo GIT_REPO_ROOT=$(git rev-parse --show-toplevel) >> .env
echo CODEBASE_ROOT=$GIT_REPO_ROOT/$APP_PARENT_FOLDER/$APP_NAME >> .env
echo SITE_DEPLOY_FOLDER=$GIT_REPO_ROOT/sites/$APP_NAME/deploy >> .env
cat $SITE_DEPLOY_FOLDER/common/.host.env >> .env
cat $SITE_DEPLOY_FOLDER/local/.env >> .env
. .env
if [ -d "$VENV_DIR" ]
then
echo "Folder $VENV_DIR exists. Proceed to next steps"
else
echo "Folder $VENV_DIR doesn't exist. create it"
sudo apt install python3-pip
python3 -m pip install virtualenv
python3 -m virtualenv $VENV_DIR
fi
source $VENV_DIR/bin/activate
pip install --upgrade pip
pip install -r requirements.txt
uvicorn webapi.main:app --reload --host 0.0.0.0 --port $SERVICE_API_ACCESS_PORT
popd

View File

@ -0,0 +1,106 @@
#!/usr/bin/env python3
"""
Test script for registration API endpoints
"""
import requests
import json
from datetime import date, timedelta
# API base URL
BASE_URL = "http://localhost:8009"
def test_daily_registered_users():
"""Test the daily registered users endpoint"""
print("Testing daily registered users endpoint...")
# Test with last 7 days
end_date = date.today()
start_date = end_date - timedelta(days=6)
url = f"{BASE_URL}/api/metrics/daily-registered-users"
params = {
"start_date": str(start_date),
"end_date": str(end_date),
"product_id": "freeleaps"
}
try:
response = requests.get(url, params=params)
print(f"Status Code: {response.status_code}")
if response.status_code == 200:
data = response.json()
print(f"Response: {json.dumps(data, indent=2)}")
print(f"Number of days: {len(data['dates'])}")
print(f"Total registrations: {data['total_registrations']}")
else:
print(f"Error: {response.text}")
except Exception as e:
print(f"Request failed: {e}")
def test_registration_summary():
"""Test the registration summary endpoint"""
print("\nTesting registration summary endpoint...")
end_date = date.today()
start_date = end_date - timedelta(days=6)
url = f"{BASE_URL}/api/metrics/registration-summary"
params = {
"start_date": str(start_date),
"end_date": str(end_date),
"product_id": "freeleaps"
}
try:
response = requests.get(url, params=params)
print(f"Status Code: {response.status_code}")
if response.status_code == 200:
data = response.json()
print(f"Summary: {json.dumps(data, indent=2)}")
else:
print(f"Error: {response.text}")
except Exception as e:
print(f"Request failed: {e}")
def test_post_method():
"""Test the POST method for daily registered users"""
print("\nTesting POST method for daily registered users...")
end_date = date.today()
start_date = end_date - timedelta(days=6)
url = f"{BASE_URL}/api/metrics/daily-registered-users"
payload = {
"start_date": str(start_date),
"end_date": str(end_date),
"product_id": "freeleaps"
}
try:
response = requests.post(url, json=payload)
print(f"Status Code: {response.status_code}")
if response.status_code == 200:
data = response.json()
print(f"Response: {json.dumps(data, indent=2)}")
else:
print(f"Error: {response.text}")
except Exception as e:
print(f"Request failed: {e}")
if __name__ == "__main__":
print("Starting registration API tests...")
print(f"Testing against: {BASE_URL}")
print("=" * 50)
test_daily_registered_users()
test_registration_summary()
test_post_method()
print("\n" + "=" * 50)
print("Tests completed!")

View File

View File

@ -0,0 +1 @@
# WebAPI module

View File

@ -0,0 +1,69 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from prometheus_fastapi_instrumentator import Instrumentator
from webapi.config.site_settings import site_settings
from loguru import logger
import os
def create_app() -> FastAPI:
"""
Create and configure the FastAPI application
"""
app = FastAPI(
title="Metrics Service API",
description="Metrics Service for Freeleaps Platform",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Setup logging
setup_logging()
# Setup Prometheus metrics
Instrumentator().instrument(app).expose(app)
# Include routers
# from webapi.routes import health, api
# app.include_router(health.router, prefix="/health", tags=["health"])
# app.include_router(api.router, prefix="/api/metrics", tags=["metrics"])
# Note: Registration router is included in main.py
return app
def setup_logging():
"""
Setup logging configuration
"""
# Create log directory if it doesn't exist
log_dir = site_settings.LOG_BASE_PATH
os.makedirs(log_dir, exist_ok=True)
# Configure loguru
logger.add(
f"{log_dir}/{site_settings.BACKEND_LOG_FILE_NAME}.log",
rotation="1 day",
retention="30 days",
level="INFO",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} | {message}"
)
logger.add(
f"{log_dir}/{site_settings.APPLICATION_ACTIVITY_LOG}.log",
rotation="1 day",
retention="30 days",
level="INFO",
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} | {message}",
filter=lambda record: record["level"].name == "INFO"
)

View File

View File

@ -0,0 +1,41 @@
from pydantic_settings import BaseSettings
from typing import Optional
class SiteSettings(BaseSettings):
# Server settings
SERVER_HOST: str = "0.0.0.0"
SERVER_PORT: int = 8009
SERVICE_API_ACCESS_HOST: str = "0.0.0.0"
SERVICE_API_ACCESS_PORT: int = 8009
# Database settings
MONGODB_URI: str = "mongodb://localhost:27017/"
MONGODB_NAME: str = "freeleaps2"
MONGODB_PORT: int = 27017
# JWT settings
JWT_SECRET_KEY: str = "8f87ca8c3c9c3df09a9c78e0adb0927855568f6072d9efc892534aee35f5867b"
JWT_ALGORITHM: str = "HS256"
# Log settings
LOG_BASE_PATH: str = "./logs"
BACKEND_LOG_FILE_NAME: str = "metrics"
APPLICATION_ACTIVITY_LOG: str = "metrics-activity"
# Service dependencies
DEVSVC_WEBAPI_URL_BASE: str = "http://devsvc:8007/api/devsvc"
NOTIFICATION_WEBAPI_URL_BASE: str = "http://notification:8003/api/notification/"
# StarRocks database settings
STARROCKS_HOST: str = "freeleaps-starrocks-cluster-fe-service.freeleaps-data-platform.svc"
STARROCKS_PORT: int = 9030
STARROCKS_USER: str = "root"
STARROCKS_PASSWORD: str = ""
STARROCKS_DATABASE: str = "freeleaps"
class Config:
env_file = ".env"
site_settings = SiteSettings()

View File

@ -0,0 +1,36 @@
from webapi.bootstrap.application import create_app
from webapi.config.site_settings import site_settings
from fastapi.responses import RedirectResponse
import uvicorn
from typing import Any
from webapi.routes import registration
app = create_app()
# Include routers
app.include_router(registration.router)
@app.get("/", status_code=301)
async def root():
"""
TODO: redirect client to /docs
"""
return RedirectResponse("docs")
if __name__ == "__main__":
uvicorn.run(
app="main:app", host=site_settings.SERVER_HOST, port=site_settings.SERVER_PORT
)
def get_context() -> Any:
# Define your context function. This is where you can set up authentication, database connections, etc.
return {}
def get_root_value() -> Any:
# Define your root value function. This is where you can set up the root value for GraphQL.
return {}

View File

View File

@ -0,0 +1,229 @@
from fastapi import APIRouter, HTTPException, Query
from datetime import date, datetime, timedelta
from typing import Optional
from loguru import logger
from backend.services.registration_service import RegistrationService
from backend.models.registered_users import UserRegistrationResponse, UserRegistrationQuery
router = APIRouter(prefix="/api/metrics", tags=["registration"])
# Initialize service
registration_service = RegistrationService()
@router.get("/daily-registered-users", response_model=UserRegistrationResponse)
async def get_daily_registered_users(
start_date: date = Query(..., description="Start date in YYYY-MM-DD format"),
end_date: date = Query(..., description="End date in YYYY-MM-DD format"),
product_id: str = Query("freeleaps", description="Product identifier")
):
"""
Get daily registered users count for a date range
Returns two lists:
- dates: List of dates in YYYY-MM-DD format
- counts: List of daily registration counts
Example:
- GET /api/metrics/daily-registered-users?start_date=2024-01-01&end_date=2024-01-07
"""
try:
# Validate date range
if start_date > end_date:
raise HTTPException(
status_code=400,
detail="Start date must be before or equal to end date"
)
# Check date range is not too large (max 1 year)
if (end_date - start_date).days > 365:
raise HTTPException(
status_code=400,
detail="Date range cannot exceed 365 days"
)
logger.info(f"Querying registration data from {start_date} to {end_date} for product {product_id}")
# Get data from service
result = registration_service.get_daily_registered_users(
start_date, end_date, product_id
)
logger.info(f"Successfully retrieved data for {len(result.dates)} days")
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get daily registered users: {e}")
raise HTTPException(
status_code=500,
detail=f"Internal server error: {str(e)}"
)
@router.get("/registration-summary")
async def get_registration_summary(
start_date: date = Query(..., description="Start date in YYYY-MM-DD format"),
end_date: date = Query(..., description="End date in YYYY-MM-DD format"),
product_id: str = Query("freeleaps", description="Product identifier")
):
"""
Get summary statistics for user registrations in a date range
Returns summary statistics including:
- total_registrations: Total number of registrations
- average_daily: Average daily registrations
- max_daily: Maximum daily registrations
- min_daily: Minimum daily registrations
- days_with_registrations: Number of days with registrations
- total_days: Total number of days in range
"""
try:
# Validate date range
if start_date > end_date:
raise HTTPException(
status_code=400,
detail="Start date must be before or equal to end date"
)
if (end_date - start_date).days > 365:
raise HTTPException(
status_code=400,
detail="Date range cannot exceed 365 days"
)
logger.info(f"Querying registration summary from {start_date} to {end_date} for product {product_id}")
# Get summary from service
summary = registration_service.get_registration_summary(
start_date, end_date, product_id
)
return summary
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get registration summary: {e}")
raise HTTPException(
status_code=500,
detail=f"Internal server error: {str(e)}"
)
@router.get("/recent-registered-users", response_model=UserRegistrationResponse)
async def get_recent_registered_users(
days: int = Query(7, ge=1, le=365, description="Number of recent days to query"),
product_id: str = Query("freeleaps", description="Product identifier")
):
"""
Get daily registered users count for recent N days
Returns registration data for the last N days from today
Example:
- GET /api/metrics/recent-registered-users?days=7
- GET /api/metrics/recent-registered-users?days=30&product_id=freeleaps
"""
try:
# Calculate date range
end_date = date.today()
start_date = end_date - timedelta(days=days-1)
logger.info(f"Querying recent {days} days registration data from {start_date} to {end_date} for product {product_id}")
# Get data from service
result = registration_service.get_daily_registered_users(
start_date, end_date, product_id
)
logger.info(f"Successfully retrieved recent {days} days data, total registrations: {result.total_registrations}")
return result
except Exception as e:
logger.error(f"Failed to get recent registered users: {e}")
raise HTTPException(
status_code=500,
detail=f"Internal server error: {str(e)}"
)
@router.get("/registered-users-by-days", response_model=UserRegistrationResponse)
async def get_registered_users_by_days(
start_date: date = Query(..., description="Start date in YYYY-MM-DD format"),
days: int = Query(..., ge=1, le=365, description="Number of days from start date"),
product_id: str = Query("freeleaps", description="Product identifier")
):
"""
Get daily registered users count starting from a specific date for N days
Returns registration data for N days starting from the specified start date
Example:
- GET /api/metrics/registered-users-by-days?start_date=2024-01-01&days=7
- GET /api/metrics/registered-users-by-days?start_date=2024-09-01&days=30&product_id=freeleaps
"""
try:
# Calculate end date
end_date = start_date + timedelta(days=days-1)
logger.info(f"Querying registration data from {start_date} for {days} days (until {end_date}) for product {product_id}")
# Get data from service
result = registration_service.get_daily_registered_users(
start_date, end_date, product_id
)
logger.info(f"Successfully retrieved {days} days data from {start_date}, total registrations: {result.total_registrations}")
return result
except Exception as e:
logger.error(f"Failed to get registered users by days: {e}")
raise HTTPException(
status_code=500,
detail=f"Internal server error: {str(e)}"
)
@router.post("/daily-registered-users", response_model=UserRegistrationResponse)
async def get_daily_registered_users_post(
query: UserRegistrationQuery
):
"""
Get daily registered users count for a date range (POST method)
Same as GET method but accepts parameters in request body
"""
try:
# Validate date range
if query.start_date > query.end_date:
raise HTTPException(
status_code=400,
detail="Start date must be before or equal to end date"
)
if (query.end_date - query.start_date).days > 365:
raise HTTPException(
status_code=400,
detail="Date range cannot exceed 365 days"
)
logger.info(f"Querying registration data from {query.start_date} to {query.end_date} for product {query.product_id}")
# Get data from service
result = registration_service.get_daily_registered_users(
query.start_date, query.end_date, query.product_id
)
logger.info(f"Successfully retrieved data for {len(result.dates)} days")
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get daily registered users: {e}")
raise HTTPException(
status_code=500,
detail=f"Internal server error: {str(e)}"
)