refactor: refactor the metric project structure

This commit is contained in:
icecheng 2025-09-16 15:11:53 +08:00
parent 35fbda6954
commit 7027e8c3f7
26 changed files with 735 additions and 31 deletions

View File

@ -2,18 +2,18 @@ import pymysql
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
from datetime import date from datetime import date
from loguru import logger from loguru import logger
from webapi.config.app_settings import site_settings from common.config.app_settings import app_settings
class StarRocksClient: class StarRocksClient:
"""StarRocks database client for querying user registration data""" """StarRocks database client for querying user registration data"""
def __init__(self): def __init__(self):
self.host = site_settings.STARROCKS_HOST self.host = app_settings.STARROCKS_HOST
self.port = site_settings.STARROCKS_PORT self.port = app_settings.STARROCKS_PORT
self.user = site_settings.STARROCKS_USER self.user = app_settings.STARROCKS_USER
self.password = site_settings.STARROCKS_PASSWORD self.password = app_settings.STARROCKS_PASSWORD
self.database = site_settings.STARROCKS_DATABASE self.database = app_settings.STARROCKS_DATABASE
self.connection = None self.connection = None
def connect(self) -> bool: def connect(self) -> bool:

View File

View File

@ -2,13 +2,7 @@ from pydantic_settings import BaseSettings
from typing import Optional from typing import Optional
class SiteSettings(BaseSettings): class AppSettings(BaseSettings):
# Server settings
SERVER_HOST: str = "0.0.0.0"
SERVER_PORT: int = 8009
SERVICE_API_ACCESS_HOST: str = "0.0.0.0"
SERVICE_API_ACCESS_PORT: int = 8009
# Log settings # Log settings
LOG_BASE_PATH: str = "./logs" LOG_BASE_PATH: str = "./logs"
BACKEND_LOG_FILE_NAME: str = "metrics" BACKEND_LOG_FILE_NAME: str = "metrics"
@ -21,8 +15,15 @@ class SiteSettings(BaseSettings):
STARROCKS_PASSWORD: str = "" STARROCKS_PASSWORD: str = ""
STARROCKS_DATABASE: str = "freeleaps" STARROCKS_DATABASE: str = "freeleaps"
# Prometheus settings
PROMETHEUS_ENDPOINT: str = "http://localhost:9090"
METRICS_ENABLED: bool = False
PROBES_ENABLED: bool = True
class Config: class Config:
env_file = "local.env" env_file = "local.env"
site_settings = SiteSettings() app_settings = AppSettings()

View File

@ -0,0 +1,17 @@
import os
from dataclasses import dataclass
from .app_settings import app_settings
from .site_settings import site_settings
@dataclass
class LogSettings:
LOG_PATH_BASE: str = app_settings.LOG_BASE_PATH
LOG_RETENTION: str = os.environ.get("LOG_RETENTION", "30 days")
LOG_ROTATION: str = os.environ.get("LOG_ROTATION", "00:00") # midnight
MAX_BACKUP_FILES: int = int(os.environ.get("LOG_BACKUP_FILES", 5))
LOG_ROTATION_BYTES: int = int(os.environ.get("LOG_ROTATION_BYTES", 10 * 1024 * 1024)) # 10 MB
APP_NAME: str = site_settings.NAME
ENVIRONMENT: str = site_settings.ENV
log_settings = LogSettings()

View File

@ -0,0 +1,26 @@
import os
from pydantic_settings import BaseSettings
# NOTE: The values fall backs to your environment variables when not set here
class SiteSettings(BaseSettings):
NAME: str = "FREELEAPS-METRICS"
DEBUG: bool = True
ENV: str = "dev"
SERVER_HOST: str = "localhost"
SERVER_PORT: int = 9000
URL: str = "http://localhost"
TIME_ZONE: str = "UTC"
BASE_PATH: str = os.path.dirname(os.path.dirname((os.path.abspath(__file__))))
class Config:
env_file = ".devbase-webapi.env"
env_file_encoding = "utf-8"
site_settings = SiteSettings()

View File

View File

@ -0,0 +1,12 @@
from .base_logger import LoggerBase
from app.common.config.app_settings import app_settings
class ApplicationLogger(LoggerBase):
def __init__(self, application_activities: dict[str, any] = {}) -> None:
extra_fileds = {}
if application_activities:
extra_fileds.update(application_activities)
super().__init__(
logger_name=app_settings.APPLICATION_ACTIVITY_LOG,
extra_fileds=extra_fileds,
)

View File

@ -0,0 +1,136 @@
from loguru import logger as guru_logger
from common.config.log_settings import log_settings
from typing import Dict, Any, Optional
import socket
import json
import threading
import os
import sys
import inspect
import logging
from common.log.json_sink import JsonSink
class LoggerBase:
binded_loggers = {}
logger_lock = threading.Lock()
def __init__(self, logger_name: str, extra_fileds: dict[str, any]) -> None:
self.__logger_name = logger_name
self.extra_fileds = extra_fileds
with LoggerBase.logger_lock:
if self.__logger_name in LoggerBase.binded_loggers:
self.logger = LoggerBase.binded_loggers[self.__logger_name]
return
log_filename = f"{log_settings.LOG_PATH_BASE}/{self.__logger_name}.log"
log_level = "INFO"
rotation_bytes = int(log_settings.LOG_ROTATION_BYTES or 10 * 1024 * 1024)
guru_logger.remove()
file_sink = JsonSink(
log_file_path=log_filename,
rotation_size_bytes=rotation_bytes,
max_backup_files=log_settings.MAX_BACKUP_FILES
)
guru_logger.add(
sink=file_sink,
level=log_level,
filter=lambda record: record["extra"].get("topic") == self.__logger_name,
)
guru_logger.add(
sink=sys.stderr,
level=log_level,
format="{level} - {time:YYYY-MM-DD HH:mm:ss} - <{extra[log_file]}:{extra[log_line]}> - {extra[properties_str]} - {message}",
filter=lambda record: record["extra"].get("topic") == self.__logger_name,
)
host_name = socket.gethostname()
host_ip = socket.gethostbyname(host_name)
self.logger = guru_logger.bind(
topic=self.__logger_name,
host_ip=host_ip,
host_name=host_name,
app=log_settings.APP_NAME,
env=log_settings.ENVIRONMENT,
)
with LoggerBase.logger_lock:
LoggerBase.binded_loggers[self.__logger_name] = self.logger
def _get_log_context(self) -> dict:
frame = inspect.currentframe().f_back.f_back
filename = os.path.basename(frame.f_code.co_filename)
lineno = frame.f_lineno
return {"log_file": filename, "log_line": lineno}
def _prepare_properties(self, properties: Optional[Dict[str, Any]]) -> Dict[str, Any]:
props = {} if properties is None else properties.copy()
props_str = json.dumps(props, ensure_ascii=False) if props else "{}"
return props, props_str
async def log_event(self, sender_id: str, receiver_id: str, subject: str, event: str, properties: dict[str, any], text: str = "") -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event=event, properties=props, properties_str=props_str, **context)
local_logger.info(text)
async def log_exception(self, sender_id: str, receiver_id: str, subject: str, exception: Exception, text: str = "", properties: dict[str, any] = None) -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="exception", properties=props, properties_str=props_str, exception=exception, **context)
local_logger.exception(text)
async def log_info(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="information", properties=props, properties_str=props_str, **context)
local_logger.info(text)
async def log_warning(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="warning", properties=props, properties_str=props_str, **context)
local_logger.warning(text)
async def log_error(self, sender_id: str, receiver_id: str, subject: str, text: str = "", properties: dict[str, any] = None) -> None:
props, props_str = self._prepare_properties(properties)
context = self._get_log_context()
local_logger = self.logger.bind(sender_id=sender_id, receiver_id=receiver_id, subject=subject, event="error", properties=props, properties_str=props_str, **context)
local_logger.error(text)
@staticmethod
def configure_uvicorn_logging():
print("📢 Setting up uvicorn logging interception...")
# Intercept logs from these loggers
intercept_loggers = ["uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"]
class InterceptHandler(logging.Handler):
def emit(self, record):
level = (
guru_logger.level(record.levelname).name
if guru_logger.level(record.levelname, None)
else record.levelno
)
frame, depth = logging.currentframe(), 2
while frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
guru_logger.opt(depth=depth, exception=record.exc_info).log(
level,
f"[{record.name}] {record.getMessage()}",
)
# Replace default handlers
logging.root.handlers.clear()
logging.root.setLevel(logging.INFO)
logging.root.handlers = [InterceptHandler()]
# Configure specific uvicorn loggers
for logger_name in intercept_loggers:
logging_logger = logging.getLogger(logger_name)
logging_logger.handlers.clear() # Remove default handlers
logging_logger.propagate = True # Ensure propagation through Loguru

View File

@ -0,0 +1,84 @@
import json
import datetime
import traceback
from pathlib import Path
class JsonSink:
def __init__(
self,
log_file_path: str,
rotation_size_bytes: int = 10 * 1024 * 1024,
max_backup_files: int = 5,
):
self.log_file_path = Path(log_file_path)
self.rotation_size = rotation_size_bytes
self.max_backup_files = max_backup_files
self._open_log_file()
def _open_log_file(self):
# ensure the parent directory exists
parent_dir = self.log_file_path.parent
if not parent_dir.exists():
parent_dir.mkdir(parents=True, exist_ok=True)
self.log_file = self.log_file_path.open("a", encoding="utf-8")
def _should_rotate(self) -> bool:
return self.log_file_path.exists() and self.log_file_path.stat().st_size >= self.rotation_size
def _rotate(self):
self.log_file.close()
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
rotated_path = self.log_file_path.with_name(f"{self.log_file_path.stem}_{timestamp}{self.log_file_path.suffix}")
self.log_file_path.rename(rotated_path)
self._cleanup_old_backups()
self._open_log_file()
def _cleanup_old_backups(self):
parent = self.log_file_path.parent
stem = self.log_file_path.stem
suffix = self.log_file_path.suffix
backup_files = sorted(
parent.glob(f"{stem}_*{suffix}"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
for old_file in backup_files[self.max_backup_files:]:
try:
old_file.unlink()
except Exception as e:
print(f"Failed to delete old backup {old_file}: {e}")
def __call__(self, message):
record = message.record
if self._should_rotate():
self._rotate()
log_entry = {
"level": record["level"].name.lower(),
"timestamp": int(record["time"].timestamp() * 1000),
"text": record["message"],
"fields": record["extra"].get("properties", {}),
"context": {
"app": record["extra"].get("app"),
"env": record["extra"].get("env"),
"log_file": record["extra"].get("log_file"),
"log_line": record["extra"].get("log_line"),
"topic": record["extra"].get("topic"),
"sender_id": record["extra"].get("sender_id"),
"receiver_id": record["extra"].get("receiver_id"),
"subject": record["extra"].get("subject"),
"event": record["extra"].get("event"),
"host_ip": record["extra"].get("host_ip"),
"host_name": record["extra"].get("host_name"),
},
"stacktrace": None
}
if record["exception"]:
exc_type, exc_value, exc_tb = record["exception"]
log_entry["stacktrace"] = traceback.format_exception(exc_type, exc_value, exc_tb)
self.log_file.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
self.log_file.flush()

View File

@ -0,0 +1,46 @@
from .application_logger import ApplicationLogger
class ModuleLogger(ApplicationLogger):
def __init__(self, sender_id: str) -> None:
super().__init__()
self.event_sender_id = sender_id
self.event_receiver_id = "ModuleLogger"
self.event_subject = "module"
async def log_exception(self, exception: Exception, text: str = "Exception", properties: dict[str, any] = None) -> None:
return await super().log_exception(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
exception=exception,
text=text,
properties=properties,
)
async def log_info(self, text: str, data: dict[str, any] = None) -> None:
return await super().log_info(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
text=text,
properties=data,
)
async def log_warning(self, text: str, data: dict[str, any] = None) -> None:
return await super().log_warning(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
text=text,
properties=data,
)
async def log_error(self, text: str, data: dict[str, any] = None) -> None:
return await super().log_error(
sender_id=self.event_sender_id,
receiver_id=self.event_receiver_id,
subject=self.event_subject,
text=text,
properties=data,
)

View File

@ -0,0 +1,140 @@
import logging
from enum import Enum
from typing import Optional, Callable, Tuple, Dict
import inspect
from datetime import datetime, timezone
# ProbeType is an Enum that defines the types of probes that can be registered.
class ProbeType(Enum):
LIVENESS = "liveness"
READINESS = "readiness"
STARTUP = "startup"
# ProbeResult is a class that represents the result of a probe check.
class ProbeResult:
def __init__(self, success: bool, message: str = "ok", data: Optional[dict] = None):
self.success = success
self.message = message
self.data = data or {}
def to_dict(self) -> dict:
return {
"success": self.success,
"message": self.message,
"data": self.data
}
# Probe is a class that represents a probe that can be registered.
class Probe:
def __init__(self, type: ProbeType, path: str, check_fn: Callable, name: Optional[str] = None):
self.type = type
self.path = path
self.check_fn = check_fn
self.name = name or f"{type.value}-{id(self)}"
async def execute(self) -> ProbeResult:
try:
result = self.check_fn()
if inspect.isawaitable(result):
result = await result
if isinstance(result, ProbeResult):
return result
elif isinstance(result, bool):
return ProbeResult(result, "ok" if result else "failed")
else:
return ProbeResult(True, "ok")
except Exception as e:
return ProbeResult(False, str(e))
# ProbeGroup is a class that represents a group of probes that can be checked together.
class ProbeGroup:
def __init__(self, path: str):
self.path = path
self.probes: Dict[str, Probe] = {}
def add_probe(self, probe: Probe):
self.probes[probe.name] = probe
async def check_all(self) -> Tuple[bool, dict]:
results = {}
all_success = True
for name, probe in self.probes.items():
result = await probe.execute()
results[name] = result.to_dict()
if not result.success:
all_success = False
return all_success, results
# FrameworkAdapter is an abstract class that defines the interface for framework-specific probe adapters.
class FrameworkAdapter:
async def handle_request(self, group: ProbeGroup):
all_success, results = await group.check_all()
status_code = 200 if all_success else 503
return {"status": "ok" if all_success else "failed", "payload": results, "timestamp": int(datetime.now(timezone.utc).timestamp())}, status_code
def register_route(self, path: str, handler: Callable):
raise NotImplementedError
# ProbeManager is a class that manages the registration of probes and their corresponding framework adapters.
class ProbeManager:
_default_paths = {
ProbeType.LIVENESS: "/_/livez",
ProbeType.READINESS: "/_/readyz",
ProbeType.STARTUP: "/_/healthz"
}
def __init__(self):
self.groups: Dict[str, ProbeGroup] = {}
self.adapters: Dict[str, FrameworkAdapter] = {}
self._startup_complete = False
def register_adapter(self, framework: str, adapter: FrameworkAdapter):
self.adapters[framework] = adapter
logging.info(f"Registered probe adapter ({adapter}) for framework: {framework}")
def register(
self,
type: ProbeType,
check_func: Optional[Callable] = None,
path: Optional[str] = None,
prefix: str = "",
name: Optional[str] = None,
frameworks: Optional[list] = None
):
path = path or self._default_paths.get(type, "/_/healthz")
if prefix:
path = f"{prefix}{path}"
if type == ProbeType.STARTUP and check_func is None:
check_func = self._default_startup_check
probe = Probe(type, path, check_func or (lambda: True), name)
if path not in self.groups:
self.groups[path] = ProbeGroup(path)
self.groups[path].add_probe(probe)
for framework in (frameworks or ["default"]):
self._register_route(framework, path)
logging.info(f"Registered {type.value} probe route ({path}) for framework: {framework}")
def _register_route(self, framework: str, path: str):
if framework not in self.adapters:
return
adapter = self.adapters[framework]
group = self.groups[path]
async def handler():
return await adapter.handle_request(group)
adapter.register_route(path, handler)
def _default_startup_check(self) -> bool:
return self._startup_complete
def mark_startup_complete(self):
self._startup_complete = True

View File

@ -0,0 +1,15 @@
from . import FrameworkAdapter
from fastapi.responses import JSONResponse
from typing import Callable
# FastAPIAdapter is a class that implements the FrameworkAdapter interface for FastAPI.
class FastAPIAdapter(FrameworkAdapter):
def __init__(self, app):
self.app = app
def register_route(self,path: str, handler: Callable):
async def wrapper():
data, status_code = await handler()
return JSONResponse(content=data, status_code=status_code)
self.app.add_api_route(path, wrapper, methods=["GET"])

View File

@ -1,7 +1,7 @@
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from prometheus_fastapi_instrumentator import Instrumentator from prometheus_fastapi_instrumentator import Instrumentator
from webapi.config.app_settings import site_settings from common.config.app_settings import site_settings
from loguru import logger from loguru import logger
import os import os

View File

@ -0,0 +1,77 @@
import logging
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
from common.config.app_settings import app_settings
from webapi.providers import exception_handler, common, probes, metrics, router
from webapi.providers.logger import register_logger
def create_app() -> FastAPI:
logging.info("App initializing")
app = FreeleapsMetricsApp()
register_logger()
register(app, exception_handler)
register(app, router)
register(app, common)
# Call the custom_openapi function to change the OpenAPI version
customize_openapi_security(app)
# Register probe APIs if enabled
if app_settings.PROBES_ENABLED:
register(app, probes)
# Register metrics APIs if enabled
if app_settings.METRICS_ENABLED:
register(app, metrics)
return app
# This function overrides the OpenAPI schema version to 3.0.0
def customize_openapi_security(app: FastAPI) -> None:
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
# Generate OpenAPI schema
openapi_schema = get_openapi(
title="FreeLeaps Metrics API",
version="3.1.0",
description="FreeLeaps Metrics API Documentation",
routes=app.routes,
)
# Ensure the components section exists in the OpenAPI schema
if "components" not in openapi_schema:
openapi_schema["components"] = {}
# Add security scheme to components
openapi_schema["components"]["securitySchemes"] = {
"bearerAuth": {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"}
}
# Add security requirement globally
openapi_schema["security"] = [{"bearerAuth": []}]
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
def register(app, provider):
logging.info(provider.__name__ + " registering")
provider.register(app)
def boot(app, provider):
logging.info(provider.__name__ + " booting")
provider.boot(app)
class FreeleapsMetricsApp(FastAPI):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

View File

@ -1,10 +1,9 @@
from webapi.bootstrap.app_factory import create_app from common.config.site_settings import site_settings
from webapi.config.app_settings import site_settings
from fastapi.responses import RedirectResponse from fastapi.responses import RedirectResponse
import uvicorn import uvicorn
from typing import Any
from webapi.routes import registration_metrics
from webapi.bootstrap.application import create_app
from webapi.routes.metrics import registration_metrics
app = create_app() app = create_app()
@ -24,13 +23,3 @@ if __name__ == "__main__":
uvicorn.run( uvicorn.run(
app="main:app", host=site_settings.SERVER_HOST, port=site_settings.SERVER_PORT app="main:app", host=site_settings.SERVER_HOST, port=site_settings.SERVER_PORT
) )
def get_context() -> Any:
# Define your context function. This is where you can set up authentication, database connections, etc.
return {}
def get_root_value() -> Any:
# Define your root value function. This is where you can set up the root value for GraphQL.
return {}

View File

@ -0,0 +1,31 @@
from fastapi.middleware.cors import CORSMiddleware
from common.config.site_settings import site_settings
def register(app):
app.debug = site_settings.DEBUG
app.title = site_settings.NAME
add_global_middleware(app)
# This hook ensures that a connection is opened to handle any queries
# generated by the request.
@app.on_event("startup")
async def startup():
pass
# This hook ensures that the connection is closed when we've finished
# processing the request.
@app.on_event("shutdown")
async def shutdown():
pass
def add_global_middleware(app):
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

View File

@ -0,0 +1,39 @@
from fastapi import FastAPI, HTTPException
from fastapi.exceptions import RequestValidationError
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.status import (
HTTP_400_BAD_REQUEST,
HTTP_401_UNAUTHORIZED,
HTTP_403_FORBIDDEN,
HTTP_404_NOT_FOUND,
HTTP_422_UNPROCESSABLE_ENTITY,
HTTP_500_INTERNAL_SERVER_ERROR,
)
async def custom_http_exception_handler(request: Request, exc: HTTPException):
return JSONResponse(
status_code=exc.status_code,
content={"error": exc.detail},
)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return JSONResponse(
status_code=HTTP_400_BAD_REQUEST,
content={"error": str(exc)},
)
async def exception_handler(request: Request, exc: Exception):
return JSONResponse(
status_code=HTTP_500_INTERNAL_SERVER_ERROR,
content={"error": str(exc)},
)
def register(app: FastAPI):
app.add_exception_handler(HTTPException, custom_http_exception_handler)
app.add_exception_handler(RequestValidationError, validation_exception_handler)
app.add_exception_handler(Exception, exception_handler)

View File

@ -0,0 +1,7 @@
from common.log.base_logger import LoggerBase
def register_logger():
print("📢 Setting up logging interception...")
LoggerBase.configure_uvicorn_logging()
print("✅ Logging interception complete. Logs are formatted and deduplicated!")

View File

@ -0,0 +1,16 @@
import logging
from prometheus_fastapi_instrumentator import Instrumentator
from common.config.app_settings import app_settings
def register(app):
instrumentator = (
Instrumentator().instrument(
app,
metric_namespace="freeleaps-auth",
metric_subsystem=app_settings.APP_NAME)
)
@app.on_event("startup")
async def startup():
instrumentator.expose(app, endpoint="/api/_/metrics", should_gzip=True)
logging.info("Metrics endpoint exposed at /api/_/metrics")

View File

@ -0,0 +1,24 @@
from common.probes import ProbeManager, ProbeType
from common.probes.adapters import FastAPIAdapter
def register(app):
probes_manager = ProbeManager()
probes_manager.register_adapter("fastapi", FastAPIAdapter(app))
async def readiness_checker():
return {"success": True, "message": "Ready"}
probes_manager.register(
name="readiness",
prefix="/api",
type=ProbeType.READINESS,
check_func=readiness_checker,
frameworks=["fastapi"]
)
probes_manager.register(name="liveness", prefix="/api", type=ProbeType.LIVENESS, frameworks=["fastapi"])
probes_manager.register(name="startup", prefix="/api", type=ProbeType.STARTUP, frameworks=["fastapi"])
@app.on_event("startup")
async def mark_startup_complete():
probes_manager.mark_startup_complete()

View File

@ -0,0 +1,34 @@
from webapi.routes import api_router
from starlette import routing
def register(app):
app.include_router(
api_router,
prefix="/api",
tags=["api"],
dependencies=[],
responses={404: {"description": "no page found"}},
)
if app.debug:
for route in app.routes:
if not isinstance(route, routing.WebSocketRoute):
print(
{
"path": route.path,
"endpoint": route.endpoint,
"name": route.name,
"methods": route.methods,
}
)
else:
print(
{
"path": route.path,
"endpoint": route.endpoint,
"name": route.name,
"type": "web socket route",
}
)

View File

@ -0,0 +1,5 @@
from fastapi import APIRouter
from webapi.routes.metrics import router
api_router = APIRouter()
api_router.include_router(router, tags=["metrics"])

View File

@ -0,0 +1,5 @@
from fastapi import APIRouter
from webapi.routes.metrics.registration_metrics import router
api_router = APIRouter()
api_router.include_router(router,prefix="/metrics", tags=["metrics"])

View File

@ -5,7 +5,7 @@ from loguru import logger
from backend.services.registration_analytics_service import RegistrationService from backend.services.registration_analytics_service import RegistrationService
from backend.models.user_registration_models import UserRegistrationResponse, UserRegistrationQuery from backend.models.user_registration_models import UserRegistrationResponse, UserRegistrationQuery
router = APIRouter(prefix="/api/metrics", tags=["registration"]) router = APIRouter(tags=["registration"])
# Initialize service # Initialize service
registration_service = RegistrationService() registration_service = RegistrationService()