feat(observability): add configuration options and implement metrics/probes APIs

Signed-off-by: zhenyus <zhenyus@mathmast.com>
This commit is contained in:
zhenyus 2025-03-17 10:46:02 +08:00
parent 590953f33c
commit d5e42d31a4
27 changed files with 882 additions and 15 deletions

View File

@ -6,6 +6,9 @@ class AppSettings(BaseSettings):
NAME: str = "central_storage" NAME: str = "central_storage"
APP_NAME:str = NAME APP_NAME:str = NAME
METRICS_ENABLED: bool = False
PROBES_ENABLED: bool = True
AZURE_STORAGE_DOCUMENT_API_ENDPOINT: str = "" AZURE_STORAGE_DOCUMENT_API_ENDPOINT: str = ""
AZURE_STORAGE_DOCUMENT_API_KEY: str = "" AZURE_STORAGE_DOCUMENT_API_KEY: str = ""

View File

@ -0,0 +1,140 @@
import logging
from enum import Enum
from typing import Optional, Callable, Tuple, Dict
import inspect
from datetime import datetime, timezone
# ProbeType is an Enum that defines the types of probes that can be registered.
class ProbeType(Enum):
LIVENESS = "liveness"
READINESS = "readiness"
STARTUP = "startup"
# ProbeResult is a class that represents the result of a probe check.
class ProbeResult:
def __init__(self, success: bool, message: str = "ok", data: Optional[dict] = None):
self.success = success
self.message = message
self.data = data or {}
def to_dict(self) -> dict:
return {
"success": self.success,
"message": self.message,
"data": self.data
}
# Probe is a class that represents a probe that can be registered.
class Probe:
def __init__(self, type: ProbeType, path: str, check_fn: Callable, name: Optional[str] = None):
self.type = type
self.path = path
self.check_fn = check_fn
self.name = name or f"{type.value}-{id(self)}"
async def execute(self) -> ProbeResult:
try:
result = self.check_fn()
if inspect.isawaitable(result):
result = await result
if isinstance(result, ProbeResult):
return result
elif isinstance(result, bool):
return ProbeResult(result, "ok" if result else "failed")
else:
return ProbeResult(True, "ok")
except Exception as e:
return ProbeResult(False, str(e))
# ProbeGroup is a class that represents a group of probes that can be checked together.
class ProbeGroup:
def __init__(self, path: str):
self.path = path
self.probes: Dict[str, Probe] = {}
def add_probe(self, probe: Probe):
self.probes[probe.name] = probe
async def check_all(self) -> Tuple[bool, dict]:
results = {}
all_success = True
for name, probe in self.probes.items():
result = await probe.execute()
results[name] = result.to_dict()
if not result.success:
all_success = False
return all_success, results
# FrameworkAdapter is an abstract class that defines the interface for framework-specific probe adapters.
class FrameworkAdapter:
async def handle_request(self, group: ProbeGroup):
all_success, results = await group.check_all()
status_code = 200 if all_success else 503
return {"status": "ok" if all_success else "failed", "payload": results, "timestamp": int(datetime.now(timezone.utc).timestamp())}, status_code
def register_route(self, path: str, handler: Callable):
raise NotImplementedError
# ProbeManager is a class that manages the registration of probes and their corresponding framework adapters.
class ProbeManager:
_default_paths = {
ProbeType.LIVENESS: "/_/livez",
ProbeType.READINESS: "/_/readyz",
ProbeType.STARTUP: "/_/healthz"
}
def __init__(self):
self.groups: Dict[str, ProbeGroup] = {}
self.adapters: Dict[str, FrameworkAdapter] = {}
self._startup_complete = False
def register_adapter(self, framework: str, adapter: FrameworkAdapter):
self.adapters[framework] = adapter
logging.info(f"Registered probe adapter ({adapter}) for framework: {framework}")
def register(
self,
type: ProbeType,
check_func: Optional[Callable] = None,
path: Optional[str] = None,
prefix: str = "",
name: Optional[str] = None,
frameworks: Optional[list] = None
):
path = path or self._default_paths.get(type, "/_/healthz")
if prefix:
path = f"{prefix}{path}"
if type == ProbeType.STARTUP and check_func is None:
check_func = self._default_startup_check
probe = Probe(type, path, check_func or (lambda: True), name)
if path not in self.groups:
self.groups[path] = ProbeGroup(path)
self.groups[path].add_probe(probe)
for framework in (frameworks or ["default"]):
self._register_route(framework, path)
logging.info(f"Registered {type.value} probe route ({path}) for framework: {framework}")
def _register_route(self, framework: str, path: str):
if framework not in self.adapters:
return
adapter = self.adapters[framework]
group = self.groups[path]
async def handler():
return await adapter.handle_request(group)
adapter.register_route(path, handler)
def _default_startup_check(self) -> bool:
return self._startup_complete
def mark_startup_complete(self):
self._startup_complete = True

View File

@ -0,0 +1,15 @@
from . import FrameworkAdapter
from fastapi.responses import JSONResponse
from typing import Callable
# FastAPIAdapter is a class that implements the FrameworkAdapter interface for FastAPI.
class FastAPIAdapter(FrameworkAdapter):
def __init__(self, app):
self.app = app
def register_route(self,path: str, handler: Callable):
async def wrapper():
data, status_code = await handler()
return JSONResponse(content=data, status_code=status_code)
self.app.add_api_route(path, wrapper, methods=["GET"])

View File

@ -8,7 +8,10 @@ from webapi.providers import router
from webapi.providers import database from webapi.providers import database
from webapi.providers import scheduler from webapi.providers import scheduler
from webapi.providers import exception_handler from webapi.providers import exception_handler
from webapi.providers import probes
from webapi.providers import metrics
from .freeleaps_app import FreeleapsApp from .freeleaps_app import FreeleapsApp
from common.config.app_settings import app_settings
def create_app() -> FastAPI: def create_app() -> FastAPI:
@ -25,6 +28,15 @@ def create_app() -> FastAPI:
# Call the custom_openapi function to change the OpenAPI version # Call the custom_openapi function to change the OpenAPI version
customize_openapi_security(app) customize_openapi_security(app)
# Register probe APIs if enabled
if app_settings.PROBES_ENABLED:
register(app, probes)
# Register metrics APIs if enabled
if app_settings.METRICS_ENABLED:
register(app, metrics)
return app return app

View File

@ -2,7 +2,10 @@ from webapi.config.site_settings import site_settings
from beanie import init_beanie from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient from motor.motor_asyncio import AsyncIOMotorClient
from backend.models import backend_models from backend.models import backend_models
from common.probes import ProbeResult
import asyncio
client = AsyncIOMotorClient(site_settings.MONGODB_URI, serverSelectionTimeoutMS=60000)
def register(app): def register(app):
app.debug = "mongo_debug" app.debug = "mongo_debug"
@ -12,9 +15,14 @@ def register(app):
async def start_database(): async def start_database():
await initiate_database() await initiate_database()
async def check_database_initialized() -> ProbeResult:
try:
await asyncio.wait_for(client.server_info(), timeout=5)
return ProbeResult(success=True, message="service has been initialized and ready to serve")
except Exception:
return ProbeResult(success=False, message="service is not initialized yet", data={"error": "database is not ready"})
async def initiate_database(): async def initiate_database():
client = AsyncIOMotorClient(site_settings.MONGODB_URI, serverSelectionTimeoutMS=60000)
await init_beanie( await init_beanie(
database=client[site_settings.MONGODB_NAME], document_models=backend_models database=client[site_settings.MONGODB_NAME], document_models=backend_models
) )

View File

@ -0,0 +1,13 @@
import logging
from prometheus_fastapi_instrumentator import Instrumentator
from common.config.app_settings import app_settings
def register(app):
instrumentator = Instrumentator().instrument(app,
metric_namespace="freeleaps",
metric_subsystem=app_settings.APP_NAME)
@app.on_event("startup")
async def startup():
instrumentator.expose(app, endpoint="/api/_/metrics", should_gzip=True)
logging.info("Metrics endpoint exposed at /api/_/metrics")

View File

@ -0,0 +1,25 @@
from common.probes import ProbeManager, ProbeType
from common.probes.adapters import FastAPIAdapter
from .database import check_database_initialized
def register(app):
probes_manager = ProbeManager()
probes_manager.register_adapter("fastapi", FastAPIAdapter(app))
async def readiness_checker():
return await check_database_initialized()
probes_manager.register(
name="readiness",
prefix="/api",
type=ProbeType.READINESS,
check_func=readiness_checker,
frameworks=["fastapi"]
)
probes_manager.register(name="liveness", prefix="/api", type=ProbeType.LIVENESS, frameworks=["fastapi"])
probes_manager.register(name="startup", prefix="/api", type=ProbeType.STARTUP, frameworks=["fastapi"])
@app.on_event("startup")
async def mark_startup_complete():
probes_manager.mark_startup_complete()

View File

@ -5,6 +5,9 @@ class AppSettings(BaseSettings):
NAME: str = "content" NAME: str = "content"
APP_NAME:str = NAME APP_NAME:str = NAME
METRICS_ENABLED: bool = False
PROBES_ENABLED: bool = True
FREELEAPS_WWW_AS_AZURE_CLIENT_SECRET: str = "" FREELEAPS_WWW_AS_AZURE_CLIENT_SECRET: str = ""
CENTRAL_STORAGE_WEBAPI_URL_BASE:str ="" CENTRAL_STORAGE_WEBAPI_URL_BASE:str =""

View File

@ -0,0 +1,140 @@
import logging
from enum import Enum
from typing import Optional, Callable, Tuple, Dict
import inspect
from datetime import datetime, timezone
# ProbeType is an Enum that defines the types of probes that can be registered.
class ProbeType(Enum):
LIVENESS = "liveness"
READINESS = "readiness"
STARTUP = "startup"
# ProbeResult is a class that represents the result of a probe check.
class ProbeResult:
def __init__(self, success: bool, message: str = "ok", data: Optional[dict] = None):
self.success = success
self.message = message
self.data = data or {}
def to_dict(self) -> dict:
return {
"success": self.success,
"message": self.message,
"data": self.data
}
# Probe is a class that represents a probe that can be registered.
class Probe:
def __init__(self, type: ProbeType, path: str, check_fn: Callable, name: Optional[str] = None):
self.type = type
self.path = path
self.check_fn = check_fn
self.name = name or f"{type.value}-{id(self)}"
async def execute(self) -> ProbeResult:
try:
result = self.check_fn()
if inspect.isawaitable(result):
result = await result
if isinstance(result, ProbeResult):
return result
elif isinstance(result, bool):
return ProbeResult(result, "ok" if result else "failed")
else:
return ProbeResult(True, "ok")
except Exception as e:
return ProbeResult(False, str(e))
# ProbeGroup is a class that represents a group of probes that can be checked together.
class ProbeGroup:
def __init__(self, path: str):
self.path = path
self.probes: Dict[str, Probe] = {}
def add_probe(self, probe: Probe):
self.probes[probe.name] = probe
async def check_all(self) -> Tuple[bool, dict]:
results = {}
all_success = True
for name, probe in self.probes.items():
result = await probe.execute()
results[name] = result.to_dict()
if not result.success:
all_success = False
return all_success, results
# FrameworkAdapter is an abstract class that defines the interface for framework-specific probe adapters.
class FrameworkAdapter:
async def handle_request(self, group: ProbeGroup):
all_success, results = await group.check_all()
status_code = 200 if all_success else 503
return {"status": "ok" if all_success else "failed", "payload": results, "timestamp": int(datetime.now(timezone.utc).timestamp())}, status_code
def register_route(self, path: str, handler: Callable):
raise NotImplementedError
# ProbeManager is a class that manages the registration of probes and their corresponding framework adapters.
class ProbeManager:
_default_paths = {
ProbeType.LIVENESS: "/_/livez",
ProbeType.READINESS: "/_/readyz",
ProbeType.STARTUP: "/_/healthz"
}
def __init__(self):
self.groups: Dict[str, ProbeGroup] = {}
self.adapters: Dict[str, FrameworkAdapter] = {}
self._startup_complete = False
def register_adapter(self, framework: str, adapter: FrameworkAdapter):
self.adapters[framework] = adapter
logging.info(f"Registered probe adapter ({adapter}) for framework: {framework}")
def register(
self,
type: ProbeType,
check_func: Optional[Callable] = None,
path: Optional[str] = None,
prefix: str = "",
name: Optional[str] = None,
frameworks: Optional[list] = None
):
path = path or self._default_paths.get(type, "/_/healthz")
if prefix:
path = f"{prefix}{path}"
if type == ProbeType.STARTUP and check_func is None:
check_func = self._default_startup_check
probe = Probe(type, path, check_func or (lambda: True), name)
if path not in self.groups:
self.groups[path] = ProbeGroup(path)
self.groups[path].add_probe(probe)
for framework in (frameworks or ["default"]):
self._register_route(framework, path)
logging.info(f"Registered {type.value} probe route ({path}) for framework: {framework}")
def _register_route(self, framework: str, path: str):
if framework not in self.adapters:
return
adapter = self.adapters[framework]
group = self.groups[path]
async def handler():
return await adapter.handle_request(group)
adapter.register_route(path, handler)
def _default_startup_check(self) -> bool:
return self._startup_complete
def mark_startup_complete(self):
self._startup_complete = True

View File

@ -0,0 +1,15 @@
from . import FrameworkAdapter
from fastapi.responses import JSONResponse
from typing import Callable
# FastAPIAdapter is a class that implements the FrameworkAdapter interface for FastAPI.
class FastAPIAdapter(FrameworkAdapter):
def __init__(self, app):
self.app = app
def register_route(self,path: str, handler: Callable):
async def wrapper():
data, status_code = await handler()
return JSONResponse(content=data, status_code=status_code)
self.app.add_api_route(path, wrapper, methods=["GET"])

View File

@ -2,13 +2,23 @@ from common.config.app_settings import app_settings
from beanie import init_beanie from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient from motor.motor_asyncio import AsyncIOMotorClient
from database.mongo.models import mongo_models from database.mongo.models import mongo_models
import asyncio
from common.probes import ProbeResult
client = AsyncIOMotorClient(app_settings.MONGODB_URI, serverSelectionTimeoutMS=60000)
async def check_database_initialized() -> ProbeResult:
try:
await asyncio.wait_for(client.server_info(), timeout=5)
return ProbeResult(success=True, message="service has been initialized and ready to serve")
except Exception:
return ProbeResult(success=False, message="service is not initialized yet", data={"error": "database is not ready"})
class MongoDriver: class MongoDriver:
def __init__(self): def __init__(self):
pass pass
async def initiate_database(self): async def initiate_database(self):
client = AsyncIOMotorClient(app_settings.MONGODB_URI, serverSelectionTimeoutMS=60000)
await init_beanie( await init_beanie(
database=client[app_settings.MONGODB_NAME], document_models=mongo_models database=client[app_settings.MONGODB_NAME], document_models=mongo_models
) )

View File

@ -9,7 +9,10 @@ from webapi.providers import router
from webapi.providers import database from webapi.providers import database
from webapi.providers import scheduler from webapi.providers import scheduler
from webapi.providers import exception_handler from webapi.providers import exception_handler
from webapi.providers import probes
from webapi.providers import metrics
from .freeleaps_app import FreeleapsApp from .freeleaps_app import FreeleapsApp
from common.config.app_settings import app_settings
def create_app() -> FastAPI: def create_app() -> FastAPI:
@ -26,6 +29,14 @@ def create_app() -> FastAPI:
# Call the custom_openapi function to change the OpenAPI version # Call the custom_openapi function to change the OpenAPI version
customize_openapi_security(app) customize_openapi_security(app)
# Register probe APIs if enabled
if app_settings.PROBES_ENABLED:
register(app, probes)
# Register metrics APIs if enabled
if app_settings.METRICS_ENABLED:
register(app, metrics)
return app return app

View File

@ -0,0 +1,13 @@
import logging
from prometheus_fastapi_instrumentator import Instrumentator
from common.config.app_settings import app_settings
def register(app):
instrumentator = Instrumentator().instrument(app,
metric_namespace="freeleaps",
metric_subsystem=app_settings.APP_NAME)
@app.on_event("startup")
async def startup():
instrumentator.expose(app, endpoint="/api/_/metrics", should_gzip=True)
logging.info("Metrics endpoint exposed at /api/_/metrics")

View File

@ -0,0 +1,25 @@
from common.probes import ProbeManager, ProbeType
from common.probes.adapters import FastAPIAdapter
from database.mongo.mongo_driver import check_database_initialized
def register(app):
probes_manager = ProbeManager()
probes_manager.register_adapter("fastapi", FastAPIAdapter(app))
async def readiness_checker():
return await check_database_initialized()
probes_manager.register(
name="readiness",
prefix="/api",
type=ProbeType.READINESS,
check_func=readiness_checker,
frameworks=["fastapi"]
)
probes_manager.register(name="liveness", prefix="/api", type=ProbeType.LIVENESS, frameworks=["fastapi"])
probes_manager.register(name="startup", prefix="/api", type=ProbeType.STARTUP, frameworks=["fastapi"])
@app.on_event("startup")
async def mark_startup_complete():
probes_manager.mark_startup_complete()

View File

@ -6,6 +6,9 @@ class AppSettings(BaseSettings):
NAME: str = "notification" NAME: str = "notification"
APP_NAME:str = NAME APP_NAME:str = NAME
METRICS_ENABLED: bool = False
PROBES_ENABLED: bool = True
RABBITMQ_HOST: str = "" RABBITMQ_HOST: str = ""
RABBITMQ_PORT: int = 5672 RABBITMQ_PORT: int = 5672
RABBITMQ_USERNAME: str = "" RABBITMQ_USERNAME: str = ""

View File

@ -0,0 +1,140 @@
import logging
from enum import Enum
from typing import Optional, Callable, Tuple, Dict
import inspect
from datetime import datetime, timezone
# ProbeType is an Enum that defines the types of probes that can be registered.
class ProbeType(Enum):
LIVENESS = "liveness"
READINESS = "readiness"
STARTUP = "startup"
# ProbeResult is a class that represents the result of a probe check.
class ProbeResult:
def __init__(self, success: bool, message: str = "ok", data: Optional[dict] = None):
self.success = success
self.message = message
self.data = data or {}
def to_dict(self) -> dict:
return {
"success": self.success,
"message": self.message,
"data": self.data
}
# Probe is a class that represents a probe that can be registered.
class Probe:
def __init__(self, type: ProbeType, path: str, check_fn: Callable, name: Optional[str] = None):
self.type = type
self.path = path
self.check_fn = check_fn
self.name = name or f"{type.value}-{id(self)}"
async def execute(self) -> ProbeResult:
try:
result = self.check_fn()
if inspect.isawaitable(result):
result = await result
if isinstance(result, ProbeResult):
return result
elif isinstance(result, bool):
return ProbeResult(result, "ok" if result else "failed")
else:
return ProbeResult(True, "ok")
except Exception as e:
return ProbeResult(False, str(e))
# ProbeGroup is a class that represents a group of probes that can be checked together.
class ProbeGroup:
def __init__(self, path: str):
self.path = path
self.probes: Dict[str, Probe] = {}
def add_probe(self, probe: Probe):
self.probes[probe.name] = probe
async def check_all(self) -> Tuple[bool, dict]:
results = {}
all_success = True
for name, probe in self.probes.items():
result = await probe.execute()
results[name] = result.to_dict()
if not result.success:
all_success = False
return all_success, results
# FrameworkAdapter is an abstract class that defines the interface for framework-specific probe adapters.
class FrameworkAdapter:
async def handle_request(self, group: ProbeGroup):
all_success, results = await group.check_all()
status_code = 200 if all_success else 503
return {"status": "ok" if all_success else "failed", "payload": results, "timestamp": int(datetime.now(timezone.utc).timestamp())}, status_code
def register_route(self, path: str, handler: Callable):
raise NotImplementedError
# ProbeManager is a class that manages the registration of probes and their corresponding framework adapters.
class ProbeManager:
_default_paths = {
ProbeType.LIVENESS: "/_/livez",
ProbeType.READINESS: "/_/readyz",
ProbeType.STARTUP: "/_/healthz"
}
def __init__(self):
self.groups: Dict[str, ProbeGroup] = {}
self.adapters: Dict[str, FrameworkAdapter] = {}
self._startup_complete = False
def register_adapter(self, framework: str, adapter: FrameworkAdapter):
self.adapters[framework] = adapter
logging.info(f"Registered probe adapter ({adapter}) for framework: {framework}")
def register(
self,
type: ProbeType,
check_func: Optional[Callable] = None,
path: Optional[str] = None,
prefix: str = "",
name: Optional[str] = None,
frameworks: Optional[list] = None
):
path = path or self._default_paths.get(type, "/_/healthz")
if prefix:
path = f"{prefix}{path}"
if type == ProbeType.STARTUP and check_func is None:
check_func = self._default_startup_check
probe = Probe(type, path, check_func or (lambda: True), name)
if path not in self.groups:
self.groups[path] = ProbeGroup(path)
self.groups[path].add_probe(probe)
for framework in (frameworks or ["default"]):
self._register_route(framework, path)
logging.info(f"Registered {type.value} probe route ({path}) for framework: {framework}")
def _register_route(self, framework: str, path: str):
if framework not in self.adapters:
return
adapter = self.adapters[framework]
group = self.groups[path]
async def handler():
return await adapter.handle_request(group)
adapter.register_route(path, handler)
def _default_startup_check(self) -> bool:
return self._startup_complete
def mark_startup_complete(self):
self._startup_complete = True

View File

@ -0,0 +1,15 @@
from . import FrameworkAdapter
from fastapi.responses import JSONResponse
from typing import Callable
# FastAPIAdapter is a class that implements the FrameworkAdapter interface for FastAPI.
class FastAPIAdapter(FrameworkAdapter):
def __init__(self, app):
self.app = app
def register_route(self,path: str, handler: Callable):
async def wrapper():
data, status_code = await handler()
return JSONResponse(content=data, status_code=status_code)
self.app.add_api_route(path, wrapper, methods=["GET"])

View File

@ -9,7 +9,10 @@ from webapi.providers import database
from webapi.providers import scheduler from webapi.providers import scheduler
from webapi.providers import message_queue from webapi.providers import message_queue
from webapi.providers import exception_handler from webapi.providers import exception_handler
from webapi.providers import probes
from webapi.providers import metrics
from .freeleaps_app import FreeleapsApp from .freeleaps_app import FreeleapsApp
from common.config import app_settings
def create_app() -> FastAPI: def create_app() -> FastAPI:
@ -19,6 +22,14 @@ def create_app() -> FastAPI:
register_logger() register_logger()
register(app, exception_handler) register(app, exception_handler)
# Register probe APIs if enabled
if app_settings.PROBES_ENABLED:
register(app, probes)
# Register metrics APIs if enabled
if app_settings.METRICS_ENABLED:
register(app, metrics)
register(app, database) register(app, database)
register(app, router) register(app, router)
register(app, scheduler) register(app, scheduler)

View File

@ -0,0 +1,13 @@
import logging
from prometheus_fastapi_instrumentator import Instrumentator
from common.config.app_settings import app_settings
def register(app):
instrumentator = Instrumentator().instrument(app,
metric_namespace="freeleaps",
metric_subsystem=app_settings.APP_NAME)
@app.on_event("startup")
async def startup():
instrumentator.expose(app, endpoint="/api/_/metrics", should_gzip=True)
logging.info("Metrics endpoint exposed at /api/_/metrics")

View File

@ -0,0 +1,44 @@
import asyncio
import socket
from common.probes import ProbeManager, ProbeType, ProbeResult
from common.probes.adapters import FastAPIAdapter
from common.config.app_settings import app_settings
async def check_rabbitmq_connectivity() -> ProbeResult:
"""
Check if RabbitMQ server is accessible by attempting a TCP connection.
Does not require any RabbitMQ libraries.
Returns a ProbeResult with success=True if the connection is successful.
"""
try:
_, writer = await asyncio.wait_for(
asyncio.open_connection(app_settings.RABBITMQ_HOST, app_settings.RABBITMQ_PORT),
timeout=5
)
writer.close()
await writer.wait_closed()
return ProbeResult(success=True)
except (socket.error, asyncio.TimeoutError, ConnectionRefusedError) as e:
return ProbeResult(success=False, error=f"RabbitMQ connection failed: {str(e)}")
except Exception as e:
return ProbeResult(success=False, error=f"Unexpected error when checking RabbitMQ: {str(e)}")
def register(app):
probes_manager = ProbeManager()
probes_manager.register_adapter("fastapi", FastAPIAdapter(app))
probes_manager.register(
name="readiness",
prefix="/api",
type=ProbeType.READINESS,
check_func=lambda: check_rabbitmq_connectivity(),
frameworks=["fastapi"]
)
probes_manager.register(name="liveness", prefix="/api", type=ProbeType.LIVENESS, frameworks=["fastapi"])
probes_manager.register(name="startup", prefix="/api", type=ProbeType.STARTUP, frameworks=["fastapi"])
@app.on_event("startup")
async def mark_startup_complete():
probes_manager.mark_startup_complete()

View File

@ -9,6 +9,9 @@ class AppSettings(BaseSettings):
ACCESS_TOKEN_EXPIRE_MINUTES: int = 3600 ACCESS_TOKEN_EXPIRE_MINUTES: int = 3600
REFRESH_TOKEN_EXPIRE_DAYS: int = 1 REFRESH_TOKEN_EXPIRE_DAYS: int = 1
METRICS_ENABLED: bool = False
PROBES_ENABLED: bool = True
MONGODB_URI: str = "" MONGODB_URI: str = ""
MONGODB_NAME: str = "" MONGODB_NAME: str = ""

View File

@ -0,0 +1,140 @@
import logging
from enum import Enum
from typing import Optional, Callable, Tuple, Dict
import inspect
from datetime import datetime, timezone
# ProbeType is an Enum that defines the types of probes that can be registered.
class ProbeType(Enum):
LIVENESS = "liveness"
READINESS = "readiness"
STARTUP = "startup"
# ProbeResult is a class that represents the result of a probe check.
class ProbeResult:
def __init__(self, success: bool, message: str = "ok", data: Optional[dict] = None):
self.success = success
self.message = message
self.data = data or {}
def to_dict(self) -> dict:
return {
"success": self.success,
"message": self.message,
"data": self.data
}
# Probe is a class that represents a probe that can be registered.
class Probe:
def __init__(self, type: ProbeType, path: str, check_fn: Callable, name: Optional[str] = None):
self.type = type
self.path = path
self.check_fn = check_fn
self.name = name or f"{type.value}-{id(self)}"
async def execute(self) -> ProbeResult:
try:
result = self.check_fn()
if inspect.isawaitable(result):
result = await result
if isinstance(result, ProbeResult):
return result
elif isinstance(result, bool):
return ProbeResult(result, "ok" if result else "failed")
else:
return ProbeResult(True, "ok")
except Exception as e:
return ProbeResult(False, str(e))
# ProbeGroup is a class that represents a group of probes that can be checked together.
class ProbeGroup:
def __init__(self, path: str):
self.path = path
self.probes: Dict[str, Probe] = {}
def add_probe(self, probe: Probe):
self.probes[probe.name] = probe
async def check_all(self) -> Tuple[bool, dict]:
results = {}
all_success = True
for name, probe in self.probes.items():
result = await probe.execute()
results[name] = result.to_dict()
if not result.success:
all_success = False
return all_success, results
# FrameworkAdapter is an abstract class that defines the interface for framework-specific probe adapters.
class FrameworkAdapter:
async def handle_request(self, group: ProbeGroup):
all_success, results = await group.check_all()
status_code = 200 if all_success else 503
return {"status": "ok" if all_success else "failed", "payload": results, "timestamp": int(datetime.now(timezone.utc).timestamp())}, status_code
def register_route(self, path: str, handler: Callable):
raise NotImplementedError
# ProbeManager is a class that manages the registration of probes and their corresponding framework adapters.
class ProbeManager:
_default_paths = {
ProbeType.LIVENESS: "/_/livez",
ProbeType.READINESS: "/_/readyz",
ProbeType.STARTUP: "/_/healthz"
}
def __init__(self):
self.groups: Dict[str, ProbeGroup] = {}
self.adapters: Dict[str, FrameworkAdapter] = {}
self._startup_complete = False
def register_adapter(self, framework: str, adapter: FrameworkAdapter):
self.adapters[framework] = adapter
logging.info(f"Registered probe adapter ({adapter}) for framework: {framework}")
def register(
self,
type: ProbeType,
check_func: Optional[Callable] = None,
path: Optional[str] = None,
prefix: str = "",
name: Optional[str] = None,
frameworks: Optional[list] = None
):
path = path or self._default_paths.get(type, "/_/healthz")
if prefix:
path = f"{prefix}{path}"
if type == ProbeType.STARTUP and check_func is None:
check_func = self._default_startup_check
probe = Probe(type, path, check_func or (lambda: True), name)
if path not in self.groups:
self.groups[path] = ProbeGroup(path)
self.groups[path].add_probe(probe)
for framework in (frameworks or ["default"]):
self._register_route(framework, path)
logging.info(f"Registered {type.value} probe route ({path}) for framework: {framework}")
def _register_route(self, framework: str, path: str):
if framework not in self.adapters:
return
adapter = self.adapters[framework]
group = self.groups[path]
async def handler():
return await adapter.handle_request(group)
adapter.register_route(path, handler)
def _default_startup_check(self) -> bool:
return self._startup_complete
def mark_startup_complete(self):
self._startup_complete = True

View File

@ -0,0 +1,15 @@
from . import FrameworkAdapter
from fastapi.responses import JSONResponse
from typing import Callable
# FastAPIAdapter is a class that implements the FrameworkAdapter interface for FastAPI.
class FastAPIAdapter(FrameworkAdapter):
def __init__(self, app):
self.app = app
def register_route(self,path: str, handler: Callable):
async def wrapper():
data, status_code = await handler()
return JSONResponse(content=data, status_code=status_code)
self.app.add_api_route(path, wrapper, methods=["GET"])

View File

@ -6,14 +6,12 @@ from webapi.providers import common
from webapi.providers.logger import register_logger from webapi.providers.logger import register_logger
from webapi.providers import router from webapi.providers import router
from webapi.providers import database from webapi.providers import database
# from webapi.providers import scheduler # from webapi.providers import scheduler
from webapi.providers import metrics
from webapi.providers import probes
from webapi.providers import exception_handler from webapi.providers import exception_handler
from .freeleaps_app import FreeleapsApp from .freeleaps_app import FreeleapsApp
from common.config import app_settings
# prometheus
from prometheus_fastapi_instrumentator import Instrumentator
def create_app() -> FastAPI: def create_app() -> FastAPI:
logging.info("App initializing") logging.info("App initializing")
@ -29,8 +27,13 @@ def create_app() -> FastAPI:
# Call the custom_openapi function to change the OpenAPI version # Call the custom_openapi function to change the OpenAPI version
customize_openapi_security(app) customize_openapi_security(app)
# expose prometheus metrics # Register probe APIs if enabled
Instrumentator().instrument(app).expose(app) if app_settings.PROBES_ENABLED:
register(app, probes)
# Register metrics APIs if enabled
if app_settings.METRICS_ENABLED:
register(app, metrics)
return app return app

View File

@ -1,8 +1,16 @@
import asyncio
from common.config.app_settings import app_settings from common.config.app_settings import app_settings
from beanie import init_beanie from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient from motor.motor_asyncio import AsyncIOMotorClient
from backend.models import backend_models from backend.models import backend_models
from common.probes import ProbeResult
client = AsyncIOMotorClient(
app_settings.MONGODB_URI,
serverSelectionTimeoutMS=60000,
minPoolSize=5, # Minimum number of connections in the pool
maxPoolSize=20, # Maximum number of connections in the pool
)
def register(app): def register(app):
app.debug = "auth_mongo_debug" app.debug = "auth_mongo_debug"
@ -12,14 +20,15 @@ def register(app):
async def start_database(): async def start_database():
await initiate_database() await initiate_database()
async def check_database_initialized() -> ProbeResult:
try:
await asyncio.wait_for(client.server_info(), timeout=5)
return ProbeResult(success=True, message="service has been initialized and ready to serve")
except Exception:
return ProbeResult(success=False, message="service is not initialized yet", data={"error": "database is not ready"})
async def initiate_database(): async def initiate_database():
client = AsyncIOMotorClient(
app_settings.MONGODB_URI,
serverSelectionTimeoutMS=60000,
minPoolSize=5, # Minimum number of connections in the pool
maxPoolSize=20, # Maximum number of connections in the pool
)
await init_beanie( await init_beanie(
database=client[app_settings.MONGODB_NAME], document_models=backend_models database=client[app_settings.MONGODB_NAME], document_models=backend_models
) )

View File

@ -0,0 +1,13 @@
import logging
from prometheus_fastapi_instrumentator import Instrumentator
from common.config.app_settings import app_settings
def register(app):
instrumentator = Instrumentator().instrument(app,
metric_namespace="freeleaps",
metric_subsystem=app_settings.APP_NAME)
@app.on_event("startup")
async def startup():
instrumentator.expose(app, endpoint="/api/_/metrics", should_gzip=True)
logging.info("Metrics endpoint exposed at /api/_/metrics")

View File

@ -0,0 +1,25 @@
from common.probes import ProbeManager, ProbeType
from common.probes.adapters import FastAPIAdapter
from .database import check_database_initialized
def register(app):
probes_manager = ProbeManager()
probes_manager.register_adapter("fastapi", FastAPIAdapter(app))
async def readiness_checker():
return await check_database_initialized()
probes_manager.register(
name="readiness",
prefix="/api",
type=ProbeType.READINESS,
check_func=readiness_checker,
frameworks=["fastapi"]
)
probes_manager.register(name="liveness", prefix="/api", type=ProbeType.LIVENESS, frameworks=["fastapi"])
probes_manager.register(name="startup", prefix="/api", type=ProbeType.STARTUP, frameworks=["fastapi"])
@app.on_event("startup")
async def mark_startup_complete():
probes_manager.mark_startup_complete()