From 9754576d28066c9805e5e4673e03fd79b3a603bd Mon Sep 17 00:00:00 2001 From: zhenyus Date: Wed, 12 Mar 2025 15:59:06 +0800 Subject: [PATCH 1/3] feat(probes): add metrics and probes APIs for application health checks Signed-off-by: zhenyus --- .../common/config/app_settings.py | 3 + apps/authentication/common/probes/__init__.py | 140 ++++++++++++++++++ apps/authentication/common/probes/adapters.py | 15 ++ .../webapi/bootstrap/application.py | 13 +- .../webapi/providers/database.py | 24 ++- .../webapi/providers/metrics.py | 13 ++ .../authentication/webapi/providers/probes.py | 25 ++++ 7 files changed, 224 insertions(+), 9 deletions(-) create mode 100644 apps/authentication/common/probes/__init__.py create mode 100644 apps/authentication/common/probes/adapters.py create mode 100644 apps/authentication/webapi/providers/metrics.py create mode 100644 apps/authentication/webapi/providers/probes.py diff --git a/apps/authentication/common/config/app_settings.py b/apps/authentication/common/config/app_settings.py index c8e5404..35ccca9 100644 --- a/apps/authentication/common/config/app_settings.py +++ b/apps/authentication/common/config/app_settings.py @@ -5,6 +5,9 @@ class AppSettings(BaseSettings): NAME: str = "authentication" APP_NAME: str = NAME + METRICS_ENABLED: bool = False + PROBES_ENABLED: bool = True + JWT_SECRET_KEY: str = "" JWT_ALGORITHM: str = "HS256" diff --git a/apps/authentication/common/probes/__init__.py b/apps/authentication/common/probes/__init__.py new file mode 100644 index 0000000..4071df8 --- /dev/null +++ b/apps/authentication/common/probes/__init__.py @@ -0,0 +1,140 @@ +import logging +from enum import Enum +from typing import Optional, Callable, Tuple, Dict +import inspect +from datetime import datetime, timezone + +# ProbeType is an Enum that defines the types of probes that can be registered. +class ProbeType(Enum): + LIVENESS = "liveness" + READINESS = "readiness" + STARTUP = "startup" + +# ProbeResult is a class that represents the result of a probe check. +class ProbeResult: + def __init__(self, success: bool, message: str = "ok", data: Optional[dict] = None): + self.success = success + self.message = message + self.data = data or {} + + def to_dict(self) -> dict: + return { + "success": self.success, + "message": self.message, + "data": self.data + } + +# Probe is a class that represents a probe that can be registered. +class Probe: + def __init__(self, type: ProbeType, path: str, check_fn: Callable, name: Optional[str] = None): + self.type = type + self.path = path + self.check_fn = check_fn + self.name = name or f"{type.value}-{id(self)}" + + async def execute(self) -> ProbeResult: + try: + result = self.check_fn() + if inspect.isawaitable(result): + result = await result + + if isinstance(result, ProbeResult): + return result + elif isinstance(result, bool): + return ProbeResult(result, "ok" if result else "failed") + else: + return ProbeResult(True, "ok") + except Exception as e: + return ProbeResult(False, str(e)) + +# ProbeGroup is a class that represents a group of probes that can be checked together. +class ProbeGroup: + def __init__(self, path: str): + self.path = path + self.probes: Dict[str, Probe] = {} + + def add_probe(self, probe: Probe): + self.probes[probe.name] = probe + + async def check_all(self) -> Tuple[bool, dict]: + results = {} + all_success = True + + for name, probe in self.probes.items(): + result = await probe.execute() + results[name] = result.to_dict() + if not result.success: + all_success = False + + return all_success, results + +# FrameworkAdapter is an abstract class that defines the interface for framework-specific probe adapters. +class FrameworkAdapter: + async def handle_request(self, group: ProbeGroup): + all_success, results = await group.check_all() + status_code = 200 if all_success else 503 + return {"status": "ok" if all_success else "failed", "payload": results, "timestamp": int(datetime.now(timezone.utc).timestamp())}, status_code + + def register_route(self, path: str, handler: Callable): + raise NotImplementedError + +# ProbeManager is a class that manages the registration of probes and their corresponding framework adapters. +class ProbeManager: + _default_paths = { + ProbeType.LIVENESS: "/_/livez", + ProbeType.READINESS: "/_/readyz", + ProbeType.STARTUP: "/_/healthz" + } + + def __init__(self): + self.groups: Dict[str, ProbeGroup] = {} + self.adapters: Dict[str, FrameworkAdapter] = {} + self._startup_complete = False + + def register_adapter(self, framework: str, adapter: FrameworkAdapter): + self.adapters[framework] = adapter + logging.info(f"Registered probe adapter ({adapter}) for framework: {framework}") + + def register( + self, + type: ProbeType, + check_func: Optional[Callable] = None, + path: Optional[str] = None, + prefix: str = "", + name: Optional[str] = None, + frameworks: Optional[list] = None + ): + path = path or self._default_paths.get(type, "/_/healthz") + if prefix: + path = f"{prefix}{path}" + + if type == ProbeType.STARTUP and check_func is None: + check_func = self._default_startup_check + + probe = Probe(type, path, check_func or (lambda: True), name) + + if path not in self.groups: + self.groups[path] = ProbeGroup(path) + self.groups[path].add_probe(probe) + + for framework in (frameworks or ["default"]): + self._register_route(framework, path) + logging.info(f"Registered {type.value} probe route ({path}) for framework: {framework}") + + def _register_route(self, framework: str, path: str): + if framework not in self.adapters: + return + + adapter = self.adapters[framework] + group = self.groups[path] + + async def handler(): + return await adapter.handle_request(group) + + adapter.register_route(path, handler) + + def _default_startup_check(self) -> bool: + return self._startup_complete + + def mark_startup_complete(self): + self._startup_complete = True \ No newline at end of file diff --git a/apps/authentication/common/probes/adapters.py b/apps/authentication/common/probes/adapters.py new file mode 100644 index 0000000..2ecd38a --- /dev/null +++ b/apps/authentication/common/probes/adapters.py @@ -0,0 +1,15 @@ +from . import FrameworkAdapter +from fastapi.responses import JSONResponse +from typing import Callable + +# FastAPIAdapter is a class that implements the FrameworkAdapter interface for FastAPI. +class FastAPIAdapter(FrameworkAdapter): + def __init__(self, app): + self.app = app + + def register_route(self,path: str, handler: Callable): + async def wrapper(): + data, status_code = await handler() + return JSONResponse(content=data, status_code=status_code) + + self.app.add_api_route(path, wrapper, methods=["GET"]) diff --git a/apps/authentication/webapi/bootstrap/application.py b/apps/authentication/webapi/bootstrap/application.py index e060fbf..3f6dfb3 100644 --- a/apps/authentication/webapi/bootstrap/application.py +++ b/apps/authentication/webapi/bootstrap/application.py @@ -6,11 +6,13 @@ from webapi.providers import common from webapi.providers.logger import register_logger from webapi.providers import router from webapi.providers import database +from webapi.providers import probes +from webapi.providers import metrics # from webapi.providers import scheduler from webapi.providers import exception_handler from .freeleaps_app import FreeleapsApp - +from common.config.app_settings import app_settings def create_app() -> FastAPI: logging.info("App initializing") @@ -26,6 +28,15 @@ def create_app() -> FastAPI: # Call the custom_openapi function to change the OpenAPI version customize_openapi_security(app) + + # Create probes manager to register probes if enabled + if app_settings.PROBES_ENABLED: + register(app, probes) + + # Register metrics APIs if enabled + if app_settings.METRICS_ENABLED: + register(app, metrics) + return app diff --git a/apps/authentication/webapi/providers/database.py b/apps/authentication/webapi/providers/database.py index c54f393..334279a 100644 --- a/apps/authentication/webapi/providers/database.py +++ b/apps/authentication/webapi/providers/database.py @@ -1,9 +1,18 @@ import logging +import asyncio from common.config.app_settings import app_settings from beanie import init_beanie from motor.motor_asyncio import AsyncIOMotorClient from backend.models import backend_models +from common.probes import ProbeResult +client = AsyncIOMotorClient( + app_settings.MONGODB_URI, + serverSelectionTimeoutMS=60000, + minPoolSize=5, # Minimum number of connections in the pool + maxPoolSize=20, # Maximum number of connections in the pool + heartbeatFrequencyMS=20000, # Adjust heartbeat frequency to 20 seconds +) def register(app): app.debug = "auth_mongo_debug" @@ -12,19 +21,18 @@ def register(app): # Configure logging for pymongo logging.getLogger("pymongo").setLevel(logging.WARNING) # Suppress DEBUG logs - @app.on_event("startup") + # @app.on_event("startup") async def start_database(): await initiate_database() +async def check_database_initialized() -> ProbeResult: + try: + await asyncio.wait_for(client.server_info(), timeout=5) + return ProbeResult(success=True, message="service has been initialized and ready to serve") + except Exception: + return ProbeResult(success=False, message="service is not initialized yet", data={"error": "database is not ready"}) async def initiate_database(): - client = AsyncIOMotorClient( - app_settings.MONGODB_URI, - serverSelectionTimeoutMS=60000, - minPoolSize=5, # Minimum number of connections in the pool - maxPoolSize=20, # Maximum number of connections in the pool - heartbeatFrequencyMS=20000, # Adjust heartbeat frequency to 20 seconds - ) await init_beanie( database=client[app_settings.MONGODB_NAME], document_models=backend_models ) diff --git a/apps/authentication/webapi/providers/metrics.py b/apps/authentication/webapi/providers/metrics.py new file mode 100644 index 0000000..ae5a634 --- /dev/null +++ b/apps/authentication/webapi/providers/metrics.py @@ -0,0 +1,13 @@ +import logging +from prometheus_fastapi_instrumentator import Instrumentator +from common.config.app_settings import app_settings + +def register(app): + instrumentator = Instrumentator().instrument(app, + metric_namespace="freeleaps", + metric_subsystem=app_settings.APP_NAME) + + @app.on_event("startup") + async def startup(): + instrumentator.expose(app, endpoint="/api/_/metrics", should_gzip=True) + logging.info("Metrics endpoint exposed at /api/_/metrics") \ No newline at end of file diff --git a/apps/authentication/webapi/providers/probes.py b/apps/authentication/webapi/providers/probes.py new file mode 100644 index 0000000..7c5b5d8 --- /dev/null +++ b/apps/authentication/webapi/providers/probes.py @@ -0,0 +1,25 @@ +from common.probes import ProbeManager, ProbeType +from common.probes.adapters import FastAPIAdapter +from .database import check_database_initialized + +def register(app): + probes_manager = ProbeManager() + probes_manager.register_adapter("fastapi", FastAPIAdapter(app)) + + async def readiness_checker(): + return await check_database_initialized() + + probes_manager.register( + name="readiness", + prefix="/api", + type=ProbeType.READINESS, + check_func=readiness_checker, + frameworks=["fastapi"] + ) + + probes_manager.register(name="liveness", prefix="/api", type=ProbeType.LIVENESS, frameworks=["fastapi"]) + probes_manager.register(name="startup", prefix="/api", type=ProbeType.STARTUP, frameworks=["fastapi"]) + + @app.on_event("startup") + async def mark_startup_complete(): + probes_manager.mark_startup_complete() \ No newline at end of file From 962292cfe0b640d2e01ef48a7308ace0b995d692 Mon Sep 17 00:00:00 2001 From: zhenyus Date: Wed, 12 Mar 2025 16:10:47 +0800 Subject: [PATCH 2/3] chore: enabled orm initialization when startup Signed-off-by: zhenyus --- apps/authentication/webapi/providers/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/authentication/webapi/providers/database.py b/apps/authentication/webapi/providers/database.py index 334279a..aaf1cca 100644 --- a/apps/authentication/webapi/providers/database.py +++ b/apps/authentication/webapi/providers/database.py @@ -21,7 +21,7 @@ def register(app): # Configure logging for pymongo logging.getLogger("pymongo").setLevel(logging.WARNING) # Suppress DEBUG logs - # @app.on_event("startup") + @app.on_event("startup") async def start_database(): await initiate_database() From d5e42d31a4ce1ac64f6c5dcf5688c0acae1fdaa6 Mon Sep 17 00:00:00 2001 From: zhenyus Date: Mon, 17 Mar 2025 10:46:02 +0800 Subject: [PATCH 3/3] feat(observability): add configuration options and implement metrics/probes APIs Signed-off-by: zhenyus --- .../common/config/app_settings.py | 3 + .../central_storage/common/probes/__init__.py | 140 ++++++++++++++++++ .../central_storage/common/probes/adapters.py | 15 ++ .../webapi/bootstrap/application.py | 12 ++ .../webapi/providers/database.py | 10 +- .../webapi/providers/metrics.py | 13 ++ .../webapi/providers/probes.py | 25 ++++ apps/content/common/config/app_settings.py | 3 + apps/content/common/probes/__init__.py | 140 ++++++++++++++++++ apps/content/common/probes/adapters.py | 15 ++ apps/content/database/mongo/mongo_driver.py | 12 +- apps/content/webapi/bootstrap/application.py | 11 ++ apps/content/webapi/providers/metrics.py | 13 ++ apps/content/webapi/providers/probes.py | 25 ++++ .../common/config/app_settings.py | 3 + apps/notification/common/probes/__init__.py | 140 ++++++++++++++++++ apps/notification/common/probes/adapters.py | 15 ++ .../webapi/bootstrap/application.py | 11 ++ apps/notification/webapi/providers/metrics.py | 13 ++ apps/notification/webapi/providers/probes.py | 44 ++++++ apps/payment/common/config/app_settings.py | 3 + apps/payment/common/probes/__init__.py | 140 ++++++++++++++++++ apps/payment/common/probes/adapters.py | 15 ++ apps/payment/webapi/bootstrap/application.py | 17 ++- apps/payment/webapi/providers/database.py | 21 ++- apps/payment/webapi/providers/metrics.py | 13 ++ apps/payment/webapi/providers/probes.py | 25 ++++ 27 files changed, 882 insertions(+), 15 deletions(-) create mode 100644 apps/central_storage/common/probes/__init__.py create mode 100644 apps/central_storage/common/probes/adapters.py create mode 100644 apps/central_storage/webapi/providers/metrics.py create mode 100644 apps/central_storage/webapi/providers/probes.py create mode 100644 apps/content/common/probes/__init__.py create mode 100644 apps/content/common/probes/adapters.py create mode 100644 apps/content/webapi/providers/metrics.py create mode 100644 apps/content/webapi/providers/probes.py create mode 100644 apps/notification/common/probes/__init__.py create mode 100644 apps/notification/common/probes/adapters.py create mode 100644 apps/notification/webapi/providers/metrics.py create mode 100644 apps/notification/webapi/providers/probes.py create mode 100644 apps/payment/common/probes/__init__.py create mode 100644 apps/payment/common/probes/adapters.py create mode 100644 apps/payment/webapi/providers/metrics.py create mode 100644 apps/payment/webapi/providers/probes.py diff --git a/apps/central_storage/common/config/app_settings.py b/apps/central_storage/common/config/app_settings.py index d9ba0ed..d4639f0 100644 --- a/apps/central_storage/common/config/app_settings.py +++ b/apps/central_storage/common/config/app_settings.py @@ -6,6 +6,9 @@ class AppSettings(BaseSettings): NAME: str = "central_storage" APP_NAME:str = NAME + METRICS_ENABLED: bool = False + PROBES_ENABLED: bool = True + AZURE_STORAGE_DOCUMENT_API_ENDPOINT: str = "" AZURE_STORAGE_DOCUMENT_API_KEY: str = "" diff --git a/apps/central_storage/common/probes/__init__.py b/apps/central_storage/common/probes/__init__.py new file mode 100644 index 0000000..4071df8 --- /dev/null +++ b/apps/central_storage/common/probes/__init__.py @@ -0,0 +1,140 @@ +import logging +from enum import Enum +from typing import Optional, Callable, Tuple, Dict +import inspect +from datetime import datetime, timezone + +# ProbeType is an Enum that defines the types of probes that can be registered. +class ProbeType(Enum): + LIVENESS = "liveness" + READINESS = "readiness" + STARTUP = "startup" + +# ProbeResult is a class that represents the result of a probe check. +class ProbeResult: + def __init__(self, success: bool, message: str = "ok", data: Optional[dict] = None): + self.success = success + self.message = message + self.data = data or {} + + def to_dict(self) -> dict: + return { + "success": self.success, + "message": self.message, + "data": self.data + } + +# Probe is a class that represents a probe that can be registered. +class Probe: + def __init__(self, type: ProbeType, path: str, check_fn: Callable, name: Optional[str] = None): + self.type = type + self.path = path + self.check_fn = check_fn + self.name = name or f"{type.value}-{id(self)}" + + async def execute(self) -> ProbeResult: + try: + result = self.check_fn() + if inspect.isawaitable(result): + result = await result + + if isinstance(result, ProbeResult): + return result + elif isinstance(result, bool): + return ProbeResult(result, "ok" if result else "failed") + else: + return ProbeResult(True, "ok") + except Exception as e: + return ProbeResult(False, str(e)) + +# ProbeGroup is a class that represents a group of probes that can be checked together. +class ProbeGroup: + def __init__(self, path: str): + self.path = path + self.probes: Dict[str, Probe] = {} + + def add_probe(self, probe: Probe): + self.probes[probe.name] = probe + + async def check_all(self) -> Tuple[bool, dict]: + results = {} + all_success = True + + for name, probe in self.probes.items(): + result = await probe.execute() + results[name] = result.to_dict() + if not result.success: + all_success = False + + return all_success, results + +# FrameworkAdapter is an abstract class that defines the interface for framework-specific probe adapters. +class FrameworkAdapter: + async def handle_request(self, group: ProbeGroup): + all_success, results = await group.check_all() + status_code = 200 if all_success else 503 + return {"status": "ok" if all_success else "failed", "payload": results, "timestamp": int(datetime.now(timezone.utc).timestamp())}, status_code + + def register_route(self, path: str, handler: Callable): + raise NotImplementedError + +# ProbeManager is a class that manages the registration of probes and their corresponding framework adapters. +class ProbeManager: + _default_paths = { + ProbeType.LIVENESS: "/_/livez", + ProbeType.READINESS: "/_/readyz", + ProbeType.STARTUP: "/_/healthz" + } + + def __init__(self): + self.groups: Dict[str, ProbeGroup] = {} + self.adapters: Dict[str, FrameworkAdapter] = {} + self._startup_complete = False + + def register_adapter(self, framework: str, adapter: FrameworkAdapter): + self.adapters[framework] = adapter + logging.info(f"Registered probe adapter ({adapter}) for framework: {framework}") + + def register( + self, + type: ProbeType, + check_func: Optional[Callable] = None, + path: Optional[str] = None, + prefix: str = "", + name: Optional[str] = None, + frameworks: Optional[list] = None + ): + path = path or self._default_paths.get(type, "/_/healthz") + if prefix: + path = f"{prefix}{path}" + + if type == ProbeType.STARTUP and check_func is None: + check_func = self._default_startup_check + + probe = Probe(type, path, check_func or (lambda: True), name) + + if path not in self.groups: + self.groups[path] = ProbeGroup(path) + self.groups[path].add_probe(probe) + + for framework in (frameworks or ["default"]): + self._register_route(framework, path) + logging.info(f"Registered {type.value} probe route ({path}) for framework: {framework}") + + def _register_route(self, framework: str, path: str): + if framework not in self.adapters: + return + + adapter = self.adapters[framework] + group = self.groups[path] + + async def handler(): + return await adapter.handle_request(group) + + adapter.register_route(path, handler) + + def _default_startup_check(self) -> bool: + return self._startup_complete + + def mark_startup_complete(self): + self._startup_complete = True \ No newline at end of file diff --git a/apps/central_storage/common/probes/adapters.py b/apps/central_storage/common/probes/adapters.py new file mode 100644 index 0000000..2ecd38a --- /dev/null +++ b/apps/central_storage/common/probes/adapters.py @@ -0,0 +1,15 @@ +from . import FrameworkAdapter +from fastapi.responses import JSONResponse +from typing import Callable + +# FastAPIAdapter is a class that implements the FrameworkAdapter interface for FastAPI. +class FastAPIAdapter(FrameworkAdapter): + def __init__(self, app): + self.app = app + + def register_route(self,path: str, handler: Callable): + async def wrapper(): + data, status_code = await handler() + return JSONResponse(content=data, status_code=status_code) + + self.app.add_api_route(path, wrapper, methods=["GET"]) diff --git a/apps/central_storage/webapi/bootstrap/application.py b/apps/central_storage/webapi/bootstrap/application.py index 8d0844d..aae7a72 100644 --- a/apps/central_storage/webapi/bootstrap/application.py +++ b/apps/central_storage/webapi/bootstrap/application.py @@ -8,7 +8,10 @@ from webapi.providers import router from webapi.providers import database from webapi.providers import scheduler from webapi.providers import exception_handler +from webapi.providers import probes +from webapi.providers import metrics from .freeleaps_app import FreeleapsApp +from common.config.app_settings import app_settings def create_app() -> FastAPI: @@ -25,6 +28,15 @@ def create_app() -> FastAPI: # Call the custom_openapi function to change the OpenAPI version customize_openapi_security(app) + + # Register probe APIs if enabled + if app_settings.PROBES_ENABLED: + register(app, probes) + + # Register metrics APIs if enabled + if app_settings.METRICS_ENABLED: + register(app, metrics) + return app diff --git a/apps/central_storage/webapi/providers/database.py b/apps/central_storage/webapi/providers/database.py index 0024f82..c002028 100644 --- a/apps/central_storage/webapi/providers/database.py +++ b/apps/central_storage/webapi/providers/database.py @@ -2,7 +2,10 @@ from webapi.config.site_settings import site_settings from beanie import init_beanie from motor.motor_asyncio import AsyncIOMotorClient from backend.models import backend_models +from common.probes import ProbeResult +import asyncio +client = AsyncIOMotorClient(site_settings.MONGODB_URI, serverSelectionTimeoutMS=60000) def register(app): app.debug = "mongo_debug" @@ -12,9 +15,14 @@ def register(app): async def start_database(): await initiate_database() +async def check_database_initialized() -> ProbeResult: + try: + await asyncio.wait_for(client.server_info(), timeout=5) + return ProbeResult(success=True, message="service has been initialized and ready to serve") + except Exception: + return ProbeResult(success=False, message="service is not initialized yet", data={"error": "database is not ready"}) async def initiate_database(): - client = AsyncIOMotorClient(site_settings.MONGODB_URI, serverSelectionTimeoutMS=60000) await init_beanie( database=client[site_settings.MONGODB_NAME], document_models=backend_models ) diff --git a/apps/central_storage/webapi/providers/metrics.py b/apps/central_storage/webapi/providers/metrics.py new file mode 100644 index 0000000..ae5a634 --- /dev/null +++ b/apps/central_storage/webapi/providers/metrics.py @@ -0,0 +1,13 @@ +import logging +from prometheus_fastapi_instrumentator import Instrumentator +from common.config.app_settings import app_settings + +def register(app): + instrumentator = Instrumentator().instrument(app, + metric_namespace="freeleaps", + metric_subsystem=app_settings.APP_NAME) + + @app.on_event("startup") + async def startup(): + instrumentator.expose(app, endpoint="/api/_/metrics", should_gzip=True) + logging.info("Metrics endpoint exposed at /api/_/metrics") \ No newline at end of file diff --git a/apps/central_storage/webapi/providers/probes.py b/apps/central_storage/webapi/providers/probes.py new file mode 100644 index 0000000..7c5b5d8 --- /dev/null +++ b/apps/central_storage/webapi/providers/probes.py @@ -0,0 +1,25 @@ +from common.probes import ProbeManager, ProbeType +from common.probes.adapters import FastAPIAdapter +from .database import check_database_initialized + +def register(app): + probes_manager = ProbeManager() + probes_manager.register_adapter("fastapi", FastAPIAdapter(app)) + + async def readiness_checker(): + return await check_database_initialized() + + probes_manager.register( + name="readiness", + prefix="/api", + type=ProbeType.READINESS, + check_func=readiness_checker, + frameworks=["fastapi"] + ) + + probes_manager.register(name="liveness", prefix="/api", type=ProbeType.LIVENESS, frameworks=["fastapi"]) + probes_manager.register(name="startup", prefix="/api", type=ProbeType.STARTUP, frameworks=["fastapi"]) + + @app.on_event("startup") + async def mark_startup_complete(): + probes_manager.mark_startup_complete() \ No newline at end of file diff --git a/apps/content/common/config/app_settings.py b/apps/content/common/config/app_settings.py index 8a84051..aa5f826 100644 --- a/apps/content/common/config/app_settings.py +++ b/apps/content/common/config/app_settings.py @@ -5,6 +5,9 @@ class AppSettings(BaseSettings): NAME: str = "content" APP_NAME:str = NAME + METRICS_ENABLED: bool = False + PROBES_ENABLED: bool = True + FREELEAPS_WWW_AS_AZURE_CLIENT_SECRET: str = "" CENTRAL_STORAGE_WEBAPI_URL_BASE:str ="" diff --git a/apps/content/common/probes/__init__.py b/apps/content/common/probes/__init__.py new file mode 100644 index 0000000..4071df8 --- /dev/null +++ b/apps/content/common/probes/__init__.py @@ -0,0 +1,140 @@ +import logging +from enum import Enum +from typing import Optional, Callable, Tuple, Dict +import inspect +from datetime import datetime, timezone + +# ProbeType is an Enum that defines the types of probes that can be registered. +class ProbeType(Enum): + LIVENESS = "liveness" + READINESS = "readiness" + STARTUP = "startup" + +# ProbeResult is a class that represents the result of a probe check. +class ProbeResult: + def __init__(self, success: bool, message: str = "ok", data: Optional[dict] = None): + self.success = success + self.message = message + self.data = data or {} + + def to_dict(self) -> dict: + return { + "success": self.success, + "message": self.message, + "data": self.data + } + +# Probe is a class that represents a probe that can be registered. +class Probe: + def __init__(self, type: ProbeType, path: str, check_fn: Callable, name: Optional[str] = None): + self.type = type + self.path = path + self.check_fn = check_fn + self.name = name or f"{type.value}-{id(self)}" + + async def execute(self) -> ProbeResult: + try: + result = self.check_fn() + if inspect.isawaitable(result): + result = await result + + if isinstance(result, ProbeResult): + return result + elif isinstance(result, bool): + return ProbeResult(result, "ok" if result else "failed") + else: + return ProbeResult(True, "ok") + except Exception as e: + return ProbeResult(False, str(e)) + +# ProbeGroup is a class that represents a group of probes that can be checked together. +class ProbeGroup: + def __init__(self, path: str): + self.path = path + self.probes: Dict[str, Probe] = {} + + def add_probe(self, probe: Probe): + self.probes[probe.name] = probe + + async def check_all(self) -> Tuple[bool, dict]: + results = {} + all_success = True + + for name, probe in self.probes.items(): + result = await probe.execute() + results[name] = result.to_dict() + if not result.success: + all_success = False + + return all_success, results + +# FrameworkAdapter is an abstract class that defines the interface for framework-specific probe adapters. +class FrameworkAdapter: + async def handle_request(self, group: ProbeGroup): + all_success, results = await group.check_all() + status_code = 200 if all_success else 503 + return {"status": "ok" if all_success else "failed", "payload": results, "timestamp": int(datetime.now(timezone.utc).timestamp())}, status_code + + def register_route(self, path: str, handler: Callable): + raise NotImplementedError + +# ProbeManager is a class that manages the registration of probes and their corresponding framework adapters. +class ProbeManager: + _default_paths = { + ProbeType.LIVENESS: "/_/livez", + ProbeType.READINESS: "/_/readyz", + ProbeType.STARTUP: "/_/healthz" + } + + def __init__(self): + self.groups: Dict[str, ProbeGroup] = {} + self.adapters: Dict[str, FrameworkAdapter] = {} + self._startup_complete = False + + def register_adapter(self, framework: str, adapter: FrameworkAdapter): + self.adapters[framework] = adapter + logging.info(f"Registered probe adapter ({adapter}) for framework: {framework}") + + def register( + self, + type: ProbeType, + check_func: Optional[Callable] = None, + path: Optional[str] = None, + prefix: str = "", + name: Optional[str] = None, + frameworks: Optional[list] = None + ): + path = path or self._default_paths.get(type, "/_/healthz") + if prefix: + path = f"{prefix}{path}" + + if type == ProbeType.STARTUP and check_func is None: + check_func = self._default_startup_check + + probe = Probe(type, path, check_func or (lambda: True), name) + + if path not in self.groups: + self.groups[path] = ProbeGroup(path) + self.groups[path].add_probe(probe) + + for framework in (frameworks or ["default"]): + self._register_route(framework, path) + logging.info(f"Registered {type.value} probe route ({path}) for framework: {framework}") + + def _register_route(self, framework: str, path: str): + if framework not in self.adapters: + return + + adapter = self.adapters[framework] + group = self.groups[path] + + async def handler(): + return await adapter.handle_request(group) + + adapter.register_route(path, handler) + + def _default_startup_check(self) -> bool: + return self._startup_complete + + def mark_startup_complete(self): + self._startup_complete = True \ No newline at end of file diff --git a/apps/content/common/probes/adapters.py b/apps/content/common/probes/adapters.py new file mode 100644 index 0000000..2ecd38a --- /dev/null +++ b/apps/content/common/probes/adapters.py @@ -0,0 +1,15 @@ +from . import FrameworkAdapter +from fastapi.responses import JSONResponse +from typing import Callable + +# FastAPIAdapter is a class that implements the FrameworkAdapter interface for FastAPI. +class FastAPIAdapter(FrameworkAdapter): + def __init__(self, app): + self.app = app + + def register_route(self,path: str, handler: Callable): + async def wrapper(): + data, status_code = await handler() + return JSONResponse(content=data, status_code=status_code) + + self.app.add_api_route(path, wrapper, methods=["GET"]) diff --git a/apps/content/database/mongo/mongo_driver.py b/apps/content/database/mongo/mongo_driver.py index 92ad7c3..1f340ac 100755 --- a/apps/content/database/mongo/mongo_driver.py +++ b/apps/content/database/mongo/mongo_driver.py @@ -2,13 +2,23 @@ from common.config.app_settings import app_settings from beanie import init_beanie from motor.motor_asyncio import AsyncIOMotorClient from database.mongo.models import mongo_models +import asyncio +from common.probes import ProbeResult + +client = AsyncIOMotorClient(app_settings.MONGODB_URI, serverSelectionTimeoutMS=60000) + +async def check_database_initialized() -> ProbeResult: + try: + await asyncio.wait_for(client.server_info(), timeout=5) + return ProbeResult(success=True, message="service has been initialized and ready to serve") + except Exception: + return ProbeResult(success=False, message="service is not initialized yet", data={"error": "database is not ready"}) class MongoDriver: def __init__(self): pass async def initiate_database(self): - client = AsyncIOMotorClient(app_settings.MONGODB_URI, serverSelectionTimeoutMS=60000) await init_beanie( database=client[app_settings.MONGODB_NAME], document_models=mongo_models ) diff --git a/apps/content/webapi/bootstrap/application.py b/apps/content/webapi/bootstrap/application.py index 7d0b26d..038f61b 100644 --- a/apps/content/webapi/bootstrap/application.py +++ b/apps/content/webapi/bootstrap/application.py @@ -9,7 +9,10 @@ from webapi.providers import router from webapi.providers import database from webapi.providers import scheduler from webapi.providers import exception_handler +from webapi.providers import probes +from webapi.providers import metrics from .freeleaps_app import FreeleapsApp +from common.config.app_settings import app_settings def create_app() -> FastAPI: @@ -26,6 +29,14 @@ def create_app() -> FastAPI: # Call the custom_openapi function to change the OpenAPI version customize_openapi_security(app) + + # Register probe APIs if enabled + if app_settings.PROBES_ENABLED: + register(app, probes) + + # Register metrics APIs if enabled + if app_settings.METRICS_ENABLED: + register(app, metrics) return app diff --git a/apps/content/webapi/providers/metrics.py b/apps/content/webapi/providers/metrics.py new file mode 100644 index 0000000..ae5a634 --- /dev/null +++ b/apps/content/webapi/providers/metrics.py @@ -0,0 +1,13 @@ +import logging +from prometheus_fastapi_instrumentator import Instrumentator +from common.config.app_settings import app_settings + +def register(app): + instrumentator = Instrumentator().instrument(app, + metric_namespace="freeleaps", + metric_subsystem=app_settings.APP_NAME) + + @app.on_event("startup") + async def startup(): + instrumentator.expose(app, endpoint="/api/_/metrics", should_gzip=True) + logging.info("Metrics endpoint exposed at /api/_/metrics") \ No newline at end of file diff --git a/apps/content/webapi/providers/probes.py b/apps/content/webapi/providers/probes.py new file mode 100644 index 0000000..3f2566f --- /dev/null +++ b/apps/content/webapi/providers/probes.py @@ -0,0 +1,25 @@ +from common.probes import ProbeManager, ProbeType +from common.probes.adapters import FastAPIAdapter +from database.mongo.mongo_driver import check_database_initialized + +def register(app): + probes_manager = ProbeManager() + probes_manager.register_adapter("fastapi", FastAPIAdapter(app)) + + async def readiness_checker(): + return await check_database_initialized() + + probes_manager.register( + name="readiness", + prefix="/api", + type=ProbeType.READINESS, + check_func=readiness_checker, + frameworks=["fastapi"] + ) + + probes_manager.register(name="liveness", prefix="/api", type=ProbeType.LIVENESS, frameworks=["fastapi"]) + probes_manager.register(name="startup", prefix="/api", type=ProbeType.STARTUP, frameworks=["fastapi"]) + + @app.on_event("startup") + async def mark_startup_complete(): + probes_manager.mark_startup_complete() \ No newline at end of file diff --git a/apps/notification/common/config/app_settings.py b/apps/notification/common/config/app_settings.py index ec72713..c711676 100644 --- a/apps/notification/common/config/app_settings.py +++ b/apps/notification/common/config/app_settings.py @@ -5,6 +5,9 @@ from pydantic_settings import BaseSettings class AppSettings(BaseSettings): NAME: str = "notification" APP_NAME:str = NAME + + METRICS_ENABLED: bool = False + PROBES_ENABLED: bool = True RABBITMQ_HOST: str = "" RABBITMQ_PORT: int = 5672 diff --git a/apps/notification/common/probes/__init__.py b/apps/notification/common/probes/__init__.py new file mode 100644 index 0000000..4071df8 --- /dev/null +++ b/apps/notification/common/probes/__init__.py @@ -0,0 +1,140 @@ +import logging +from enum import Enum +from typing import Optional, Callable, Tuple, Dict +import inspect +from datetime import datetime, timezone + +# ProbeType is an Enum that defines the types of probes that can be registered. +class ProbeType(Enum): + LIVENESS = "liveness" + READINESS = "readiness" + STARTUP = "startup" + +# ProbeResult is a class that represents the result of a probe check. +class ProbeResult: + def __init__(self, success: bool, message: str = "ok", data: Optional[dict] = None): + self.success = success + self.message = message + self.data = data or {} + + def to_dict(self) -> dict: + return { + "success": self.success, + "message": self.message, + "data": self.data + } + +# Probe is a class that represents a probe that can be registered. +class Probe: + def __init__(self, type: ProbeType, path: str, check_fn: Callable, name: Optional[str] = None): + self.type = type + self.path = path + self.check_fn = check_fn + self.name = name or f"{type.value}-{id(self)}" + + async def execute(self) -> ProbeResult: + try: + result = self.check_fn() + if inspect.isawaitable(result): + result = await result + + if isinstance(result, ProbeResult): + return result + elif isinstance(result, bool): + return ProbeResult(result, "ok" if result else "failed") + else: + return ProbeResult(True, "ok") + except Exception as e: + return ProbeResult(False, str(e)) + +# ProbeGroup is a class that represents a group of probes that can be checked together. +class ProbeGroup: + def __init__(self, path: str): + self.path = path + self.probes: Dict[str, Probe] = {} + + def add_probe(self, probe: Probe): + self.probes[probe.name] = probe + + async def check_all(self) -> Tuple[bool, dict]: + results = {} + all_success = True + + for name, probe in self.probes.items(): + result = await probe.execute() + results[name] = result.to_dict() + if not result.success: + all_success = False + + return all_success, results + +# FrameworkAdapter is an abstract class that defines the interface for framework-specific probe adapters. +class FrameworkAdapter: + async def handle_request(self, group: ProbeGroup): + all_success, results = await group.check_all() + status_code = 200 if all_success else 503 + return {"status": "ok" if all_success else "failed", "payload": results, "timestamp": int(datetime.now(timezone.utc).timestamp())}, status_code + + def register_route(self, path: str, handler: Callable): + raise NotImplementedError + +# ProbeManager is a class that manages the registration of probes and their corresponding framework adapters. +class ProbeManager: + _default_paths = { + ProbeType.LIVENESS: "/_/livez", + ProbeType.READINESS: "/_/readyz", + ProbeType.STARTUP: "/_/healthz" + } + + def __init__(self): + self.groups: Dict[str, ProbeGroup] = {} + self.adapters: Dict[str, FrameworkAdapter] = {} + self._startup_complete = False + + def register_adapter(self, framework: str, adapter: FrameworkAdapter): + self.adapters[framework] = adapter + logging.info(f"Registered probe adapter ({adapter}) for framework: {framework}") + + def register( + self, + type: ProbeType, + check_func: Optional[Callable] = None, + path: Optional[str] = None, + prefix: str = "", + name: Optional[str] = None, + frameworks: Optional[list] = None + ): + path = path or self._default_paths.get(type, "/_/healthz") + if prefix: + path = f"{prefix}{path}" + + if type == ProbeType.STARTUP and check_func is None: + check_func = self._default_startup_check + + probe = Probe(type, path, check_func or (lambda: True), name) + + if path not in self.groups: + self.groups[path] = ProbeGroup(path) + self.groups[path].add_probe(probe) + + for framework in (frameworks or ["default"]): + self._register_route(framework, path) + logging.info(f"Registered {type.value} probe route ({path}) for framework: {framework}") + + def _register_route(self, framework: str, path: str): + if framework not in self.adapters: + return + + adapter = self.adapters[framework] + group = self.groups[path] + + async def handler(): + return await adapter.handle_request(group) + + adapter.register_route(path, handler) + + def _default_startup_check(self) -> bool: + return self._startup_complete + + def mark_startup_complete(self): + self._startup_complete = True \ No newline at end of file diff --git a/apps/notification/common/probes/adapters.py b/apps/notification/common/probes/adapters.py new file mode 100644 index 0000000..2ecd38a --- /dev/null +++ b/apps/notification/common/probes/adapters.py @@ -0,0 +1,15 @@ +from . import FrameworkAdapter +from fastapi.responses import JSONResponse +from typing import Callable + +# FastAPIAdapter is a class that implements the FrameworkAdapter interface for FastAPI. +class FastAPIAdapter(FrameworkAdapter): + def __init__(self, app): + self.app = app + + def register_route(self,path: str, handler: Callable): + async def wrapper(): + data, status_code = await handler() + return JSONResponse(content=data, status_code=status_code) + + self.app.add_api_route(path, wrapper, methods=["GET"]) diff --git a/apps/notification/webapi/bootstrap/application.py b/apps/notification/webapi/bootstrap/application.py index 718c166..67d4437 100644 --- a/apps/notification/webapi/bootstrap/application.py +++ b/apps/notification/webapi/bootstrap/application.py @@ -9,7 +9,10 @@ from webapi.providers import database from webapi.providers import scheduler from webapi.providers import message_queue from webapi.providers import exception_handler +from webapi.providers import probes +from webapi.providers import metrics from .freeleaps_app import FreeleapsApp +from common.config import app_settings def create_app() -> FastAPI: @@ -19,6 +22,14 @@ def create_app() -> FastAPI: register_logger() register(app, exception_handler) + # Register probe APIs if enabled + if app_settings.PROBES_ENABLED: + register(app, probes) + + # Register metrics APIs if enabled + if app_settings.METRICS_ENABLED: + register(app, metrics) + register(app, database) register(app, router) register(app, scheduler) diff --git a/apps/notification/webapi/providers/metrics.py b/apps/notification/webapi/providers/metrics.py new file mode 100644 index 0000000..ae5a634 --- /dev/null +++ b/apps/notification/webapi/providers/metrics.py @@ -0,0 +1,13 @@ +import logging +from prometheus_fastapi_instrumentator import Instrumentator +from common.config.app_settings import app_settings + +def register(app): + instrumentator = Instrumentator().instrument(app, + metric_namespace="freeleaps", + metric_subsystem=app_settings.APP_NAME) + + @app.on_event("startup") + async def startup(): + instrumentator.expose(app, endpoint="/api/_/metrics", should_gzip=True) + logging.info("Metrics endpoint exposed at /api/_/metrics") \ No newline at end of file diff --git a/apps/notification/webapi/providers/probes.py b/apps/notification/webapi/providers/probes.py new file mode 100644 index 0000000..bef9e72 --- /dev/null +++ b/apps/notification/webapi/providers/probes.py @@ -0,0 +1,44 @@ +import asyncio +import socket +from common.probes import ProbeManager, ProbeType, ProbeResult +from common.probes.adapters import FastAPIAdapter +from common.config.app_settings import app_settings + +async def check_rabbitmq_connectivity() -> ProbeResult: + """ + Check if RabbitMQ server is accessible by attempting a TCP connection. + Does not require any RabbitMQ libraries. + + Returns a ProbeResult with success=True if the connection is successful. + """ + try: + _, writer = await asyncio.wait_for( + asyncio.open_connection(app_settings.RABBITMQ_HOST, app_settings.RABBITMQ_PORT), + timeout=5 + ) + writer.close() + await writer.wait_closed() + return ProbeResult(success=True) + except (socket.error, asyncio.TimeoutError, ConnectionRefusedError) as e: + return ProbeResult(success=False, error=f"RabbitMQ connection failed: {str(e)}") + except Exception as e: + return ProbeResult(success=False, error=f"Unexpected error when checking RabbitMQ: {str(e)}") + +def register(app): + probes_manager = ProbeManager() + probes_manager.register_adapter("fastapi", FastAPIAdapter(app)) + + probes_manager.register( + name="readiness", + prefix="/api", + type=ProbeType.READINESS, + check_func=lambda: check_rabbitmq_connectivity(), + frameworks=["fastapi"] + ) + + probes_manager.register(name="liveness", prefix="/api", type=ProbeType.LIVENESS, frameworks=["fastapi"]) + probes_manager.register(name="startup", prefix="/api", type=ProbeType.STARTUP, frameworks=["fastapi"]) + + @app.on_event("startup") + async def mark_startup_complete(): + probes_manager.mark_startup_complete() \ No newline at end of file diff --git a/apps/payment/common/config/app_settings.py b/apps/payment/common/config/app_settings.py index d316055..a99b386 100644 --- a/apps/payment/common/config/app_settings.py +++ b/apps/payment/common/config/app_settings.py @@ -9,6 +9,9 @@ class AppSettings(BaseSettings): ACCESS_TOKEN_EXPIRE_MINUTES: int = 3600 REFRESH_TOKEN_EXPIRE_DAYS: int = 1 + METRICS_ENABLED: bool = False + PROBES_ENABLED: bool = True + MONGODB_URI: str = "" MONGODB_NAME: str = "" diff --git a/apps/payment/common/probes/__init__.py b/apps/payment/common/probes/__init__.py new file mode 100644 index 0000000..4071df8 --- /dev/null +++ b/apps/payment/common/probes/__init__.py @@ -0,0 +1,140 @@ +import logging +from enum import Enum +from typing import Optional, Callable, Tuple, Dict +import inspect +from datetime import datetime, timezone + +# ProbeType is an Enum that defines the types of probes that can be registered. +class ProbeType(Enum): + LIVENESS = "liveness" + READINESS = "readiness" + STARTUP = "startup" + +# ProbeResult is a class that represents the result of a probe check. +class ProbeResult: + def __init__(self, success: bool, message: str = "ok", data: Optional[dict] = None): + self.success = success + self.message = message + self.data = data or {} + + def to_dict(self) -> dict: + return { + "success": self.success, + "message": self.message, + "data": self.data + } + +# Probe is a class that represents a probe that can be registered. +class Probe: + def __init__(self, type: ProbeType, path: str, check_fn: Callable, name: Optional[str] = None): + self.type = type + self.path = path + self.check_fn = check_fn + self.name = name or f"{type.value}-{id(self)}" + + async def execute(self) -> ProbeResult: + try: + result = self.check_fn() + if inspect.isawaitable(result): + result = await result + + if isinstance(result, ProbeResult): + return result + elif isinstance(result, bool): + return ProbeResult(result, "ok" if result else "failed") + else: + return ProbeResult(True, "ok") + except Exception as e: + return ProbeResult(False, str(e)) + +# ProbeGroup is a class that represents a group of probes that can be checked together. +class ProbeGroup: + def __init__(self, path: str): + self.path = path + self.probes: Dict[str, Probe] = {} + + def add_probe(self, probe: Probe): + self.probes[probe.name] = probe + + async def check_all(self) -> Tuple[bool, dict]: + results = {} + all_success = True + + for name, probe in self.probes.items(): + result = await probe.execute() + results[name] = result.to_dict() + if not result.success: + all_success = False + + return all_success, results + +# FrameworkAdapter is an abstract class that defines the interface for framework-specific probe adapters. +class FrameworkAdapter: + async def handle_request(self, group: ProbeGroup): + all_success, results = await group.check_all() + status_code = 200 if all_success else 503 + return {"status": "ok" if all_success else "failed", "payload": results, "timestamp": int(datetime.now(timezone.utc).timestamp())}, status_code + + def register_route(self, path: str, handler: Callable): + raise NotImplementedError + +# ProbeManager is a class that manages the registration of probes and their corresponding framework adapters. +class ProbeManager: + _default_paths = { + ProbeType.LIVENESS: "/_/livez", + ProbeType.READINESS: "/_/readyz", + ProbeType.STARTUP: "/_/healthz" + } + + def __init__(self): + self.groups: Dict[str, ProbeGroup] = {} + self.adapters: Dict[str, FrameworkAdapter] = {} + self._startup_complete = False + + def register_adapter(self, framework: str, adapter: FrameworkAdapter): + self.adapters[framework] = adapter + logging.info(f"Registered probe adapter ({adapter}) for framework: {framework}") + + def register( + self, + type: ProbeType, + check_func: Optional[Callable] = None, + path: Optional[str] = None, + prefix: str = "", + name: Optional[str] = None, + frameworks: Optional[list] = None + ): + path = path or self._default_paths.get(type, "/_/healthz") + if prefix: + path = f"{prefix}{path}" + + if type == ProbeType.STARTUP and check_func is None: + check_func = self._default_startup_check + + probe = Probe(type, path, check_func or (lambda: True), name) + + if path not in self.groups: + self.groups[path] = ProbeGroup(path) + self.groups[path].add_probe(probe) + + for framework in (frameworks or ["default"]): + self._register_route(framework, path) + logging.info(f"Registered {type.value} probe route ({path}) for framework: {framework}") + + def _register_route(self, framework: str, path: str): + if framework not in self.adapters: + return + + adapter = self.adapters[framework] + group = self.groups[path] + + async def handler(): + return await adapter.handle_request(group) + + adapter.register_route(path, handler) + + def _default_startup_check(self) -> bool: + return self._startup_complete + + def mark_startup_complete(self): + self._startup_complete = True \ No newline at end of file diff --git a/apps/payment/common/probes/adapters.py b/apps/payment/common/probes/adapters.py new file mode 100644 index 0000000..2ecd38a --- /dev/null +++ b/apps/payment/common/probes/adapters.py @@ -0,0 +1,15 @@ +from . import FrameworkAdapter +from fastapi.responses import JSONResponse +from typing import Callable + +# FastAPIAdapter is a class that implements the FrameworkAdapter interface for FastAPI. +class FastAPIAdapter(FrameworkAdapter): + def __init__(self, app): + self.app = app + + def register_route(self,path: str, handler: Callable): + async def wrapper(): + data, status_code = await handler() + return JSONResponse(content=data, status_code=status_code) + + self.app.add_api_route(path, wrapper, methods=["GET"]) diff --git a/apps/payment/webapi/bootstrap/application.py b/apps/payment/webapi/bootstrap/application.py index 95e485e..edebb28 100644 --- a/apps/payment/webapi/bootstrap/application.py +++ b/apps/payment/webapi/bootstrap/application.py @@ -6,14 +6,12 @@ from webapi.providers import common from webapi.providers.logger import register_logger from webapi.providers import router from webapi.providers import database - # from webapi.providers import scheduler +from webapi.providers import metrics +from webapi.providers import probes from webapi.providers import exception_handler from .freeleaps_app import FreeleapsApp - -# prometheus -from prometheus_fastapi_instrumentator import Instrumentator - +from common.config import app_settings def create_app() -> FastAPI: logging.info("App initializing") @@ -29,8 +27,13 @@ def create_app() -> FastAPI: # Call the custom_openapi function to change the OpenAPI version customize_openapi_security(app) - # expose prometheus metrics - Instrumentator().instrument(app).expose(app) + # Register probe APIs if enabled + if app_settings.PROBES_ENABLED: + register(app, probes) + + # Register metrics APIs if enabled + if app_settings.METRICS_ENABLED: + register(app, metrics) return app diff --git a/apps/payment/webapi/providers/database.py b/apps/payment/webapi/providers/database.py index 70d5246..cba87d0 100644 --- a/apps/payment/webapi/providers/database.py +++ b/apps/payment/webapi/providers/database.py @@ -1,8 +1,16 @@ +import asyncio from common.config.app_settings import app_settings from beanie import init_beanie from motor.motor_asyncio import AsyncIOMotorClient from backend.models import backend_models +from common.probes import ProbeResult +client = AsyncIOMotorClient( + app_settings.MONGODB_URI, + serverSelectionTimeoutMS=60000, + minPoolSize=5, # Minimum number of connections in the pool + maxPoolSize=20, # Maximum number of connections in the pool +) def register(app): app.debug = "auth_mongo_debug" @@ -12,14 +20,15 @@ def register(app): async def start_database(): await initiate_database() +async def check_database_initialized() -> ProbeResult: + try: + await asyncio.wait_for(client.server_info(), timeout=5) + return ProbeResult(success=True, message="service has been initialized and ready to serve") + except Exception: + return ProbeResult(success=False, message="service is not initialized yet", data={"error": "database is not ready"}) + async def initiate_database(): - client = AsyncIOMotorClient( - app_settings.MONGODB_URI, - serverSelectionTimeoutMS=60000, - minPoolSize=5, # Minimum number of connections in the pool - maxPoolSize=20, # Maximum number of connections in the pool - ) await init_beanie( database=client[app_settings.MONGODB_NAME], document_models=backend_models ) diff --git a/apps/payment/webapi/providers/metrics.py b/apps/payment/webapi/providers/metrics.py new file mode 100644 index 0000000..ae5a634 --- /dev/null +++ b/apps/payment/webapi/providers/metrics.py @@ -0,0 +1,13 @@ +import logging +from prometheus_fastapi_instrumentator import Instrumentator +from common.config.app_settings import app_settings + +def register(app): + instrumentator = Instrumentator().instrument(app, + metric_namespace="freeleaps", + metric_subsystem=app_settings.APP_NAME) + + @app.on_event("startup") + async def startup(): + instrumentator.expose(app, endpoint="/api/_/metrics", should_gzip=True) + logging.info("Metrics endpoint exposed at /api/_/metrics") \ No newline at end of file diff --git a/apps/payment/webapi/providers/probes.py b/apps/payment/webapi/providers/probes.py new file mode 100644 index 0000000..7c5b5d8 --- /dev/null +++ b/apps/payment/webapi/providers/probes.py @@ -0,0 +1,25 @@ +from common.probes import ProbeManager, ProbeType +from common.probes.adapters import FastAPIAdapter +from .database import check_database_initialized + +def register(app): + probes_manager = ProbeManager() + probes_manager.register_adapter("fastapi", FastAPIAdapter(app)) + + async def readiness_checker(): + return await check_database_initialized() + + probes_manager.register( + name="readiness", + prefix="/api", + type=ProbeType.READINESS, + check_func=readiness_checker, + frameworks=["fastapi"] + ) + + probes_manager.register(name="liveness", prefix="/api", type=ProbeType.LIVENESS, frameworks=["fastapi"]) + probes_manager.register(name="startup", prefix="/api", type=ProbeType.STARTUP, frameworks=["fastapi"]) + + @app.on_event("startup") + async def mark_startup_complete(): + probes_manager.mark_startup_complete() \ No newline at end of file