import inspect
import logging
from datetime import datetime, timezone
from enum import Enum
from typing import Callable, Dict, Optional, Tuple


# ProbeType is an Enum that defines the types of probes that can be registered.
class ProbeType(Enum):
    """Kinds of probe; each member's value is its lowercase label."""

    LIVENESS = "liveness"
    READINESS = "readiness"
    STARTUP = "startup"


# ProbeResult is a class that represents the result of a probe check.
class ProbeResult:
    """Outcome of a single probe check.

    Args:
        success: Whether the check passed.
        message: Short human-readable status; defaults to ``"ok"``.
        data: Optional extra details; a falsy value is replaced by a new
            empty dict.
    """

    def __init__(self, success: bool, message: str = "ok", data: Optional[dict] = None):
        self.success = success
        self.message = message
        self.data = data or {}

    def to_dict(self) -> dict:
        """Return the result as a dict with ``success``, ``message`` and ``data`` keys."""
        return {
            "success": self.success,
            "message": self.message,
            "data": self.data,
        }


# Probe is a class that represents a probe that can be registered.
class Probe:
    """A named check function bound to a probe type and a path.

    Args:
        type: The kind of probe.
        path: The path the probe is associated with.
        check_fn: Zero-argument callable, sync or async, that performs the check.
        name: Optional name; defaults to ``"<type value>-<id of this object>"``.
    """

    def __init__(self, type: ProbeType, path: str, check_fn: Callable, name: Optional[str] = None):
        self.type = type
        self.path = path
        self.check_fn = check_fn
        self.name = name or f"{type.value}-{id(self)}"

    async def execute(self) -> ProbeResult:
        """Run the check function and normalise its outcome to a ``ProbeResult``.

        An awaitable returned by the check function is awaited first. Then:

        * a ``ProbeResult`` is returned unchanged;
        * a ``bool`` becomes a result with message ``"ok"`` or ``"failed"``;
        * any other value (including ``None``) counts as success;
        * an ``Exception`` becomes a failed result carrying the exception
          text, or the exception class name when the text is empty.
        """
        try:
            outcome = self.check_fn()
            if inspect.isawaitable(outcome):
                outcome = await outcome
        except Exception as exc:
            # Exceptions raised without arguments stringify to "", which would
            # leave the failure without any explanation.
            return ProbeResult(False, str(exc) or type(exc).__name__)

        if isinstance(outcome, ProbeResult):
            return outcome
        if isinstance(outcome, bool):
            return ProbeResult(outcome, "ok" if outcome else "failed")
        return ProbeResult(True, "ok")


# ProbeGroup is a class that represents a group of probes that can be checked together.
class ProbeGroup:
    """Probes that share one path, stored by probe name.

    Args:
        path: The path the group is associated with.
    """

    def __init__(self, path: str):
        self.path = path
        self.probes: Dict[str, Probe] = {}

    def add_probe(self, probe: Probe):
        """Store ``probe`` under its name, replacing any probe of the same name.

        A warning is logged when a different probe object is replaced, because
        the earlier check will no longer run.
        """
        existing = self.probes.get(probe.name)
        if existing is not None and existing is not probe:
            logging.warning(
                "Probe %r on %s replaces an existing probe with the same name",
                probe.name,
                self.path,
            )
        self.probes[probe.name] = probe

    async def check_all(self) -> Tuple[bool, dict]:
        """Execute every probe in insertion order, one after another.

        Returns:
            ``(all_success, results)`` where ``results`` maps each probe name
            to its result dict and ``all_success`` is ``True`` only if every
            probe succeeded (also ``True`` for an empty group).
        """
        results = {}
        all_success = True
        for name, probe in self.probes.items():
            result = await probe.execute()
            results[name] = result.to_dict()
            all_success = all_success and result.success
        return all_success, results


# FrameworkAdapter is an abstract class that defines the interface for framework-specific probe adapters.
class FrameworkAdapter:
    """Base class for framework-specific probe adapters.

    ``handle_request`` is implemented here; subclasses must override
    ``register_route``, which raises ``NotImplementedError`` by default.
    """

    async def handle_request(self, group: ProbeGroup):
        """Run every probe in ``group`` and build a response.

        Returns:
            ``(body, status_code)`` where ``body`` has the keys ``status``
            (``"ok"`` or ``"failed"``), ``payload`` (per-probe results) and
            ``timestamp`` (current UTC time as integer epoch seconds), and
            ``status_code`` is 200 when all probes passed, otherwise 503.
        """
        all_success, results = await group.check_all()
        body = {
            "status": "ok" if all_success else "failed",
            "payload": results,
            "timestamp": int(datetime.now(timezone.utc).timestamp()),
        }
        return body, 200 if all_success else 503

    def register_route(self, path: str, handler: Callable):
        """Attach the zero-argument async ``handler`` to ``path``; must be overridden."""
        raise NotImplementedError


# ProbeManager is a class that manages the registration of probes and their corresponding framework adapters.
class ProbeManager:
    """Registry of probe groups (keyed by path) and framework adapters (keyed by name)."""

    # Path used for each probe type when ``register`` is called without ``path``.
    _default_paths = {
        ProbeType.LIVENESS: "/_/livez",
        ProbeType.READINESS: "/_/readyz",
        ProbeType.STARTUP: "/_/healthz",
    }

    def __init__(self):
        self.groups: Dict[str, ProbeGroup] = {}
        self.adapters: Dict[str, FrameworkAdapter] = {}
        self._startup_complete = False
        # framework name -> paths already registered on that framework's
        # current adapter, so each route is registered only once.
        self._routed_paths: Dict[str, set] = {}

    def register_adapter(self, framework: str, adapter: FrameworkAdapter):
        """Store ``adapter`` under ``framework``, replacing any previous adapter.

        Routes are only registered from ``register``; probes registered before
        their adapter exists are not routed by this call.
        """
        if self.adapters.get(framework) is not adapter:
            # A different adapter has none of the old adapter's routes, so
            # later ``register`` calls must be allowed to add them again.
            self._routed_paths.pop(framework, None)
        self.adapters[framework] = adapter
        logging.info("Registered probe adapter (%s) for framework: %s", adapter, framework)

    def register(
        self,
        type: ProbeType,
        check_func: Optional[Callable] = None,
        path: Optional[str] = None,
        prefix: str = "",
        name: Optional[str] = None,
        frameworks: Optional[list] = None,
    ):
        """Create a probe, add it to the group for its path, and route that path.

        Args:
            type: Kind of probe.
            check_func: Check callable. When omitted, STARTUP probes use the
                manager's startup flag and other types always succeed.
            path: Route path; defaults to the per-type default path.
            prefix: Optional string prepended to the path. A ``/`` at the end
                of the prefix is not doubled with a leading ``/`` on the path.
            name: Optional probe name, passed to ``Probe``.
            frameworks: Names of adapters to route on; defaults to ``["default"]``.
        """
        path = path or self._default_paths.get(type, "/_/healthz")
        if prefix:
            if prefix.endswith("/") and path.startswith("/"):
                # Avoid "//" at the junction of prefix and path.
                path = path[1:]
            path = f"{prefix}{path}"

        if check_func is None:
            if type == ProbeType.STARTUP:
                check_func = self._default_startup_check
            else:
                check_func = lambda: True  # noqa: E731 - trivial always-pass check

        probe = Probe(type, path, check_func, name)
        if path not in self.groups:
            self.groups[path] = ProbeGroup(path)
        self.groups[path].add_probe(probe)

        for framework in (frameworks or ["default"]):
            if self._register_route(framework, path):
                logging.info(
                    "Registered %s probe route (%s) for framework: %s",
                    type.value,
                    path,
                    framework,
                )
            else:
                logging.warning(
                    "No adapter registered for framework %r; %s probe route (%s) was not registered",
                    framework,
                    type.value,
                    path,
                )

    def _register_route(self, framework: str, path: str) -> bool:
        """Ensure ``path`` is routed on the adapter registered for ``framework``.

        The adapter's ``register_route`` is called at most once per path; the
        handler serves the whole group, so probes added to the path later are
        covered by the existing route.

        Returns:
            ``False`` when no adapter is registered under ``framework``,
            otherwise ``True``.
        """
        adapter = self.adapters.get(framework)
        if adapter is None:
            return False

        routed = self._routed_paths.setdefault(framework, set())
        if path in routed:
            return True

        group = self.groups[path]

        async def handler():
            return await adapter.handle_request(group)

        adapter.register_route(path, handler)
        routed.add(path)
        return True

    def _default_startup_check(self) -> bool:
        """Return whether ``mark_startup_complete`` has been called."""
        return self._startup_complete

    def mark_startup_complete(self):
        """Set the startup flag so the default startup check passes."""
        self._startup_complete = True