Merged PR 38: feat(probes): add metrics and probes APIs for application health checks
Provides metrics API and Probes APIs (readiness, liveness, startup): - Metrics Endpoint: /api/_/metrics - Readiness Probe Endpoint: /api/_/readyz - Liveness Probe Endpoint: /api/_/livez - Startup Probe Endpoint: /api/_/healthz User can controls these APIs accessible or not through these environment variables: - METRICS_ENABLED: defaults to False - PROBES_ENABLED: defaults to True
This commit is contained in:
commit
590953f33c
@ -5,6 +5,9 @@ class AppSettings(BaseSettings):
|
|||||||
NAME: str = "authentication"
|
NAME: str = "authentication"
|
||||||
APP_NAME: str = NAME
|
APP_NAME: str = NAME
|
||||||
|
|
||||||
|
METRICS_ENABLED: bool = False
|
||||||
|
PROBES_ENABLED: bool = True
|
||||||
|
|
||||||
JWT_SECRET_KEY: str = ""
|
JWT_SECRET_KEY: str = ""
|
||||||
JWT_ALGORITHM: str = "HS256"
|
JWT_ALGORITHM: str = "HS256"
|
||||||
|
|
||||||
|
|||||||
140
apps/authentication/common/probes/__init__.py
Normal file
140
apps/authentication/common/probes/__init__.py
Normal file
@ -0,0 +1,140 @@
|
|||||||
|
import logging
|
||||||
|
from enum import Enum
|
||||||
|
from typing import Optional, Callable, Tuple, Dict
|
||||||
|
import inspect
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
# ProbeType is an Enum that defines the types of probes that can be registered.
|
||||||
|
class ProbeType(Enum):
|
||||||
|
LIVENESS = "liveness"
|
||||||
|
READINESS = "readiness"
|
||||||
|
STARTUP = "startup"
|
||||||
|
|
||||||
|
# ProbeResult is a class that represents the result of a probe check.
|
||||||
|
class ProbeResult:
|
||||||
|
def __init__(self, success: bool, message: str = "ok", data: Optional[dict] = None):
|
||||||
|
self.success = success
|
||||||
|
self.message = message
|
||||||
|
self.data = data or {}
|
||||||
|
|
||||||
|
def to_dict(self) -> dict:
|
||||||
|
return {
|
||||||
|
"success": self.success,
|
||||||
|
"message": self.message,
|
||||||
|
"data": self.data
|
||||||
|
}
|
||||||
|
|
||||||
|
# Probe is a class that represents a probe that can be registered.
|
||||||
|
class Probe:
|
||||||
|
def __init__(self, type: ProbeType, path: str, check_fn: Callable, name: Optional[str] = None):
|
||||||
|
self.type = type
|
||||||
|
self.path = path
|
||||||
|
self.check_fn = check_fn
|
||||||
|
self.name = name or f"{type.value}-{id(self)}"
|
||||||
|
|
||||||
|
async def execute(self) -> ProbeResult:
|
||||||
|
try:
|
||||||
|
result = self.check_fn()
|
||||||
|
if inspect.isawaitable(result):
|
||||||
|
result = await result
|
||||||
|
|
||||||
|
if isinstance(result, ProbeResult):
|
||||||
|
return result
|
||||||
|
elif isinstance(result, bool):
|
||||||
|
return ProbeResult(result, "ok" if result else "failed")
|
||||||
|
else:
|
||||||
|
return ProbeResult(True, "ok")
|
||||||
|
except Exception as e:
|
||||||
|
return ProbeResult(False, str(e))
|
||||||
|
|
||||||
|
# ProbeGroup is a class that represents a group of probes that can be checked together.
|
||||||
|
class ProbeGroup:
|
||||||
|
def __init__(self, path: str):
|
||||||
|
self.path = path
|
||||||
|
self.probes: Dict[str, Probe] = {}
|
||||||
|
|
||||||
|
def add_probe(self, probe: Probe):
|
||||||
|
self.probes[probe.name] = probe
|
||||||
|
|
||||||
|
async def check_all(self) -> Tuple[bool, dict]:
|
||||||
|
results = {}
|
||||||
|
all_success = True
|
||||||
|
|
||||||
|
for name, probe in self.probes.items():
|
||||||
|
result = await probe.execute()
|
||||||
|
results[name] = result.to_dict()
|
||||||
|
if not result.success:
|
||||||
|
all_success = False
|
||||||
|
|
||||||
|
return all_success, results
|
||||||
|
|
||||||
|
# FrameworkAdapter is an abstract class that defines the interface for framework-specific probe adapters.
|
||||||
|
class FrameworkAdapter:
|
||||||
|
async def handle_request(self, group: ProbeGroup):
|
||||||
|
all_success, results = await group.check_all()
|
||||||
|
status_code = 200 if all_success else 503
|
||||||
|
return {"status": "ok" if all_success else "failed", "payload": results, "timestamp": int(datetime.now(timezone.utc).timestamp())}, status_code
|
||||||
|
|
||||||
|
def register_route(self, path: str, handler: Callable):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
# ProbeManager is a class that manages the registration of probes and their corresponding framework adapters.
|
||||||
|
class ProbeManager:
|
||||||
|
_default_paths = {
|
||||||
|
ProbeType.LIVENESS: "/_/livez",
|
||||||
|
ProbeType.READINESS: "/_/readyz",
|
||||||
|
ProbeType.STARTUP: "/_/healthz"
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.groups: Dict[str, ProbeGroup] = {}
|
||||||
|
self.adapters: Dict[str, FrameworkAdapter] = {}
|
||||||
|
self._startup_complete = False
|
||||||
|
|
||||||
|
def register_adapter(self, framework: str, adapter: FrameworkAdapter):
|
||||||
|
self.adapters[framework] = adapter
|
||||||
|
logging.info(f"Registered probe adapter ({adapter}) for framework: {framework}")
|
||||||
|
|
||||||
|
def register(
|
||||||
|
self,
|
||||||
|
type: ProbeType,
|
||||||
|
check_func: Optional[Callable] = None,
|
||||||
|
path: Optional[str] = None,
|
||||||
|
prefix: str = "",
|
||||||
|
name: Optional[str] = None,
|
||||||
|
frameworks: Optional[list] = None
|
||||||
|
):
|
||||||
|
path = path or self._default_paths.get(type, "/_/healthz")
|
||||||
|
if prefix:
|
||||||
|
path = f"{prefix}{path}"
|
||||||
|
|
||||||
|
if type == ProbeType.STARTUP and check_func is None:
|
||||||
|
check_func = self._default_startup_check
|
||||||
|
|
||||||
|
probe = Probe(type, path, check_func or (lambda: True), name)
|
||||||
|
|
||||||
|
if path not in self.groups:
|
||||||
|
self.groups[path] = ProbeGroup(path)
|
||||||
|
self.groups[path].add_probe(probe)
|
||||||
|
|
||||||
|
for framework in (frameworks or ["default"]):
|
||||||
|
self._register_route(framework, path)
|
||||||
|
logging.info(f"Registered {type.value} probe route ({path}) for framework: {framework}")
|
||||||
|
|
||||||
|
def _register_route(self, framework: str, path: str):
|
||||||
|
if framework not in self.adapters:
|
||||||
|
return
|
||||||
|
|
||||||
|
adapter = self.adapters[framework]
|
||||||
|
group = self.groups[path]
|
||||||
|
|
||||||
|
async def handler():
|
||||||
|
return await adapter.handle_request(group)
|
||||||
|
|
||||||
|
adapter.register_route(path, handler)
|
||||||
|
|
||||||
|
def _default_startup_check(self) -> bool:
|
||||||
|
return self._startup_complete
|
||||||
|
|
||||||
|
def mark_startup_complete(self):
|
||||||
|
self._startup_complete = True
|
||||||
15
apps/authentication/common/probes/adapters.py
Normal file
15
apps/authentication/common/probes/adapters.py
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
from . import FrameworkAdapter
|
||||||
|
from fastapi.responses import JSONResponse
|
||||||
|
from typing import Callable
|
||||||
|
|
||||||
|
# FastAPIAdapter is a class that implements the FrameworkAdapter interface for FastAPI.
|
||||||
|
class FastAPIAdapter(FrameworkAdapter):
|
||||||
|
def __init__(self, app):
|
||||||
|
self.app = app
|
||||||
|
|
||||||
|
def register_route(self,path: str, handler: Callable):
|
||||||
|
async def wrapper():
|
||||||
|
data, status_code = await handler()
|
||||||
|
return JSONResponse(content=data, status_code=status_code)
|
||||||
|
|
||||||
|
self.app.add_api_route(path, wrapper, methods=["GET"])
|
||||||
@ -6,11 +6,13 @@ from webapi.providers import common
|
|||||||
from webapi.providers.logger import register_logger
|
from webapi.providers.logger import register_logger
|
||||||
from webapi.providers import router
|
from webapi.providers import router
|
||||||
from webapi.providers import database
|
from webapi.providers import database
|
||||||
|
from webapi.providers import probes
|
||||||
|
from webapi.providers import metrics
|
||||||
|
|
||||||
# from webapi.providers import scheduler
|
# from webapi.providers import scheduler
|
||||||
from webapi.providers import exception_handler
|
from webapi.providers import exception_handler
|
||||||
from .freeleaps_app import FreeleapsApp
|
from .freeleaps_app import FreeleapsApp
|
||||||
|
from common.config.app_settings import app_settings
|
||||||
|
|
||||||
def create_app() -> FastAPI:
|
def create_app() -> FastAPI:
|
||||||
logging.info("App initializing")
|
logging.info("App initializing")
|
||||||
@ -26,6 +28,15 @@ def create_app() -> FastAPI:
|
|||||||
|
|
||||||
# Call the custom_openapi function to change the OpenAPI version
|
# Call the custom_openapi function to change the OpenAPI version
|
||||||
customize_openapi_security(app)
|
customize_openapi_security(app)
|
||||||
|
|
||||||
|
# Create probes manager to register probes if enabled
|
||||||
|
if app_settings.PROBES_ENABLED:
|
||||||
|
register(app, probes)
|
||||||
|
|
||||||
|
# Register metrics APIs if enabled
|
||||||
|
if app_settings.METRICS_ENABLED:
|
||||||
|
register(app, metrics)
|
||||||
|
|
||||||
return app
|
return app
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -1,9 +1,18 @@
|
|||||||
import logging
|
import logging
|
||||||
|
import asyncio
|
||||||
from common.config.app_settings import app_settings
|
from common.config.app_settings import app_settings
|
||||||
from beanie import init_beanie
|
from beanie import init_beanie
|
||||||
from motor.motor_asyncio import AsyncIOMotorClient
|
from motor.motor_asyncio import AsyncIOMotorClient
|
||||||
from backend.models import backend_models
|
from backend.models import backend_models
|
||||||
|
from common.probes import ProbeResult
|
||||||
|
|
||||||
|
client = AsyncIOMotorClient(
|
||||||
|
app_settings.MONGODB_URI,
|
||||||
|
serverSelectionTimeoutMS=60000,
|
||||||
|
minPoolSize=5, # Minimum number of connections in the pool
|
||||||
|
maxPoolSize=20, # Maximum number of connections in the pool
|
||||||
|
heartbeatFrequencyMS=20000, # Adjust heartbeat frequency to 20 seconds
|
||||||
|
)
|
||||||
|
|
||||||
def register(app):
|
def register(app):
|
||||||
app.debug = "auth_mongo_debug"
|
app.debug = "auth_mongo_debug"
|
||||||
@ -16,15 +25,14 @@ def register(app):
|
|||||||
async def start_database():
|
async def start_database():
|
||||||
await initiate_database()
|
await initiate_database()
|
||||||
|
|
||||||
|
async def check_database_initialized() -> ProbeResult:
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(client.server_info(), timeout=5)
|
||||||
|
return ProbeResult(success=True, message="service has been initialized and ready to serve")
|
||||||
|
except Exception:
|
||||||
|
return ProbeResult(success=False, message="service is not initialized yet", data={"error": "database is not ready"})
|
||||||
|
|
||||||
async def initiate_database():
|
async def initiate_database():
|
||||||
client = AsyncIOMotorClient(
|
|
||||||
app_settings.MONGODB_URI,
|
|
||||||
serverSelectionTimeoutMS=60000,
|
|
||||||
minPoolSize=5, # Minimum number of connections in the pool
|
|
||||||
maxPoolSize=20, # Maximum number of connections in the pool
|
|
||||||
heartbeatFrequencyMS=20000, # Adjust heartbeat frequency to 20 seconds
|
|
||||||
)
|
|
||||||
await init_beanie(
|
await init_beanie(
|
||||||
database=client[app_settings.MONGODB_NAME], document_models=backend_models
|
database=client[app_settings.MONGODB_NAME], document_models=backend_models
|
||||||
)
|
)
|
||||||
|
|||||||
13
apps/authentication/webapi/providers/metrics.py
Normal file
13
apps/authentication/webapi/providers/metrics.py
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
import logging
|
||||||
|
from prometheus_fastapi_instrumentator import Instrumentator
|
||||||
|
from common.config.app_settings import app_settings
|
||||||
|
|
||||||
|
def register(app):
|
||||||
|
instrumentator = Instrumentator().instrument(app,
|
||||||
|
metric_namespace="freeleaps",
|
||||||
|
metric_subsystem=app_settings.APP_NAME)
|
||||||
|
|
||||||
|
@app.on_event("startup")
|
||||||
|
async def startup():
|
||||||
|
instrumentator.expose(app, endpoint="/api/_/metrics", should_gzip=True)
|
||||||
|
logging.info("Metrics endpoint exposed at /api/_/metrics")
|
||||||
25
apps/authentication/webapi/providers/probes.py
Normal file
25
apps/authentication/webapi/providers/probes.py
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
from common.probes import ProbeManager, ProbeType
|
||||||
|
from common.probes.adapters import FastAPIAdapter
|
||||||
|
from .database import check_database_initialized
|
||||||
|
|
||||||
|
def register(app):
|
||||||
|
probes_manager = ProbeManager()
|
||||||
|
probes_manager.register_adapter("fastapi", FastAPIAdapter(app))
|
||||||
|
|
||||||
|
async def readiness_checker():
|
||||||
|
return await check_database_initialized()
|
||||||
|
|
||||||
|
probes_manager.register(
|
||||||
|
name="readiness",
|
||||||
|
prefix="/api",
|
||||||
|
type=ProbeType.READINESS,
|
||||||
|
check_func=readiness_checker,
|
||||||
|
frameworks=["fastapi"]
|
||||||
|
)
|
||||||
|
|
||||||
|
probes_manager.register(name="liveness", prefix="/api", type=ProbeType.LIVENESS, frameworks=["fastapi"])
|
||||||
|
probes_manager.register(name="startup", prefix="/api", type=ProbeType.STARTUP, frameworks=["fastapi"])
|
||||||
|
|
||||||
|
@app.on_event("startup")
|
||||||
|
async def mark_startup_complete():
|
||||||
|
probes_manager.mark_startup_complete()
|
||||||
Loading…
Reference in New Issue
Block a user