25 lines
879 B
Python
25 lines
879 B
Python
from common.probes import ProbeManager, ProbeType
|
|
from common.probes.adapters import FastAPIAdapter
|
|
from .database import check_database_initialized
|
|
|
|
def register(app):
|
|
probes_manager = ProbeManager()
|
|
probes_manager.register_adapter("fastapi", FastAPIAdapter(app))
|
|
|
|
async def readiness_checker():
|
|
return await check_database_initialized()
|
|
|
|
probes_manager.register(
|
|
name="readiness",
|
|
prefix="/api",
|
|
type=ProbeType.READINESS,
|
|
check_func=readiness_checker,
|
|
frameworks=["fastapi"]
|
|
)
|
|
|
|
probes_manager.register(name="liveness", prefix="/api", type=ProbeType.LIVENESS, frameworks=["fastapi"])
|
|
probes_manager.register(name="startup", prefix="/api", type=ProbeType.STARTUP, frameworks=["fastapi"])
|
|
|
|
@app.on_event("startup")
|
|
async def mark_startup_complete():
|
|
probes_manager.mark_startup_complete() |