diff --git a/apps/authentication/common/config/app_settings.py b/apps/authentication/common/config/app_settings.py index c8e5404..35ccca9 100644 --- a/apps/authentication/common/config/app_settings.py +++ b/apps/authentication/common/config/app_settings.py @@ -5,6 +5,9 @@ class AppSettings(BaseSettings): NAME: str = "authentication" APP_NAME: str = NAME + METRICS_ENABLED: bool = False + PROBES_ENABLED: bool = True + JWT_SECRET_KEY: str = "" JWT_ALGORITHM: str = "HS256" diff --git a/apps/authentication/common/probes/__init__.py b/apps/authentication/common/probes/__init__.py new file mode 100644 index 0000000..4071df8 --- /dev/null +++ b/apps/authentication/common/probes/__init__.py @@ -0,0 +1,140 @@ +import logging +from enum import Enum +from typing import Optional, Callable, Tuple, Dict +import inspect +from datetime import datetime, timezone + +# ProbeType is an Enum that defines the types of probes that can be registered. +class ProbeType(Enum): + LIVENESS = "liveness" + READINESS = "readiness" + STARTUP = "startup" + +# ProbeResult is a class that represents the result of a probe check. +class ProbeResult: + def __init__(self, success: bool, message: str = "ok", data: Optional[dict] = None): + self.success = success + self.message = message + self.data = data or {} + + def to_dict(self) -> dict: + return { + "success": self.success, + "message": self.message, + "data": self.data + } + +# Probe is a class that represents a probe that can be registered. +class Probe: + def __init__(self, type: ProbeType, path: str, check_fn: Callable, name: Optional[str] = None): + self.type = type + self.path = path + self.check_fn = check_fn + self.name = name or f"{type.value}-{id(self)}" + + async def execute(self) -> ProbeResult: + try: + result = self.check_fn() + if inspect.isawaitable(result): + result = await result + + if isinstance(result, ProbeResult): + return result + elif isinstance(result, bool): + return ProbeResult(result, "ok" if result else "failed") + else: + return ProbeResult(True, "ok") + except Exception as e: + return ProbeResult(False, str(e)) + +# ProbeGroup is a class that represents a group of probes that can be checked together. +class ProbeGroup: + def __init__(self, path: str): + self.path = path + self.probes: Dict[str, Probe] = {} + + def add_probe(self, probe: Probe): + self.probes[probe.name] = probe + + async def check_all(self) -> Tuple[bool, dict]: + results = {} + all_success = True + + for name, probe in self.probes.items(): + result = await probe.execute() + results[name] = result.to_dict() + if not result.success: + all_success = False + + return all_success, results + +# FrameworkAdapter is an abstract class that defines the interface for framework-specific probe adapters. +class FrameworkAdapter: + async def handle_request(self, group: ProbeGroup): + all_success, results = await group.check_all() + status_code = 200 if all_success else 503 + return {"status": "ok" if all_success else "failed", "payload": results, "timestamp": int(datetime.now(timezone.utc).timestamp())}, status_code + + def register_route(self, path: str, handler: Callable): + raise NotImplementedError + +# ProbeManager is a class that manages the registration of probes and their corresponding framework adapters. +class ProbeManager: + _default_paths = { + ProbeType.LIVENESS: "/_/livez", + ProbeType.READINESS: "/_/readyz", + ProbeType.STARTUP: "/_/healthz" + } + + def __init__(self): + self.groups: Dict[str, ProbeGroup] = {} + self.adapters: Dict[str, FrameworkAdapter] = {} + self._startup_complete = False + + def register_adapter(self, framework: str, adapter: FrameworkAdapter): + self.adapters[framework] = adapter + logging.info(f"Registered probe adapter ({adapter}) for framework: {framework}") + + def register( + self, + type: ProbeType, + check_func: Optional[Callable] = None, + path: Optional[str] = None, + prefix: str = "", + name: Optional[str] = None, + frameworks: Optional[list] = None + ): + path = path or self._default_paths.get(type, "/_/healthz") + if prefix: + path = f"{prefix}{path}" + + if type == ProbeType.STARTUP and check_func is None: + check_func = self._default_startup_check + + probe = Probe(type, path, check_func or (lambda: True), name) + + if path not in self.groups: + self.groups[path] = ProbeGroup(path) + self.groups[path].add_probe(probe) + + for framework in (frameworks or ["default"]): + self._register_route(framework, path) + logging.info(f"Registered {type.value} probe route ({path}) for framework: {framework}") + + def _register_route(self, framework: str, path: str): + if framework not in self.adapters: + return + + adapter = self.adapters[framework] + group = self.groups[path] + + async def handler(): + return await adapter.handle_request(group) + + adapter.register_route(path, handler) + + def _default_startup_check(self) -> bool: + return self._startup_complete + + def mark_startup_complete(self): + self._startup_complete = True \ No newline at end of file diff --git a/apps/authentication/common/probes/adapters.py b/apps/authentication/common/probes/adapters.py new file mode 100644 index 0000000..2ecd38a --- /dev/null +++ b/apps/authentication/common/probes/adapters.py @@ -0,0 +1,15 @@ +from . import FrameworkAdapter +from fastapi.responses import JSONResponse +from typing import Callable + +# FastAPIAdapter is a class that implements the FrameworkAdapter interface for FastAPI. +class FastAPIAdapter(FrameworkAdapter): + def __init__(self, app): + self.app = app + + def register_route(self,path: str, handler: Callable): + async def wrapper(): + data, status_code = await handler() + return JSONResponse(content=data, status_code=status_code) + + self.app.add_api_route(path, wrapper, methods=["GET"]) diff --git a/apps/authentication/webapi/bootstrap/application.py b/apps/authentication/webapi/bootstrap/application.py index e060fbf..3f6dfb3 100644 --- a/apps/authentication/webapi/bootstrap/application.py +++ b/apps/authentication/webapi/bootstrap/application.py @@ -6,11 +6,13 @@ from webapi.providers import common from webapi.providers.logger import register_logger from webapi.providers import router from webapi.providers import database +from webapi.providers import probes +from webapi.providers import metrics # from webapi.providers import scheduler from webapi.providers import exception_handler from .freeleaps_app import FreeleapsApp - +from common.config.app_settings import app_settings def create_app() -> FastAPI: logging.info("App initializing") @@ -26,6 +28,15 @@ def create_app() -> FastAPI: # Call the custom_openapi function to change the OpenAPI version customize_openapi_security(app) + + # Create probes manager to register probes if enabled + if app_settings.PROBES_ENABLED: + register(app, probes) + + # Register metrics APIs if enabled + if app_settings.METRICS_ENABLED: + register(app, metrics) + return app diff --git a/apps/authentication/webapi/providers/database.py b/apps/authentication/webapi/providers/database.py index c54f393..aaf1cca 100644 --- a/apps/authentication/webapi/providers/database.py +++ b/apps/authentication/webapi/providers/database.py @@ -1,9 +1,18 @@ import logging +import asyncio from common.config.app_settings import app_settings from beanie import init_beanie from motor.motor_asyncio import AsyncIOMotorClient from backend.models import backend_models +from common.probes import ProbeResult +client = AsyncIOMotorClient( + app_settings.MONGODB_URI, + serverSelectionTimeoutMS=60000, + minPoolSize=5, # Minimum number of connections in the pool + maxPoolSize=20, # Maximum number of connections in the pool + heartbeatFrequencyMS=20000, # Adjust heartbeat frequency to 20 seconds +) def register(app): app.debug = "auth_mongo_debug" @@ -16,15 +25,14 @@ def register(app): async def start_database(): await initiate_database() +async def check_database_initialized() -> ProbeResult: + try: + await asyncio.wait_for(client.server_info(), timeout=5) + return ProbeResult(success=True, message="service has been initialized and ready to serve") + except Exception: + return ProbeResult(success=False, message="service is not initialized yet", data={"error": "database is not ready"}) async def initiate_database(): - client = AsyncIOMotorClient( - app_settings.MONGODB_URI, - serverSelectionTimeoutMS=60000, - minPoolSize=5, # Minimum number of connections in the pool - maxPoolSize=20, # Maximum number of connections in the pool - heartbeatFrequencyMS=20000, # Adjust heartbeat frequency to 20 seconds - ) await init_beanie( database=client[app_settings.MONGODB_NAME], document_models=backend_models ) diff --git a/apps/authentication/webapi/providers/metrics.py b/apps/authentication/webapi/providers/metrics.py new file mode 100644 index 0000000..ae5a634 --- /dev/null +++ b/apps/authentication/webapi/providers/metrics.py @@ -0,0 +1,13 @@ +import logging +from prometheus_fastapi_instrumentator import Instrumentator +from common.config.app_settings import app_settings + +def register(app): + instrumentator = Instrumentator().instrument(app, + metric_namespace="freeleaps", + metric_subsystem=app_settings.APP_NAME) + + @app.on_event("startup") + async def startup(): + instrumentator.expose(app, endpoint="/api/_/metrics", should_gzip=True) + logging.info("Metrics endpoint exposed at /api/_/metrics") \ No newline at end of file diff --git a/apps/authentication/webapi/providers/probes.py b/apps/authentication/webapi/providers/probes.py new file mode 100644 index 0000000..7c5b5d8 --- /dev/null +++ b/apps/authentication/webapi/providers/probes.py @@ -0,0 +1,25 @@ +from common.probes import ProbeManager, ProbeType +from common.probes.adapters import FastAPIAdapter +from .database import check_database_initialized + +def register(app): + probes_manager = ProbeManager() + probes_manager.register_adapter("fastapi", FastAPIAdapter(app)) + + async def readiness_checker(): + return await check_database_initialized() + + probes_manager.register( + name="readiness", + prefix="/api", + type=ProbeType.READINESS, + check_func=readiness_checker, + frameworks=["fastapi"] + ) + + probes_manager.register(name="liveness", prefix="/api", type=ProbeType.LIVENESS, frameworks=["fastapi"]) + probes_manager.register(name="startup", prefix="/api", type=ProbeType.STARTUP, frameworks=["fastapi"]) + + @app.on_event("startup") + async def mark_startup_complete(): + probes_manager.mark_startup_complete() \ No newline at end of file