# freeleaps-service-hub/apps/notification/webapi/providers/probes.py
# 2025-03-17 10:46:02 +08:00
#
# 44 lines
# 1.7 KiB
# Python

import asyncio
import socket
from common.probes import ProbeManager, ProbeType, ProbeResult
from common.probes.adapters import FastAPIAdapter
from common.config.app_settings import app_settings
async def check_rabbitmq_connectivity(timeout: float = 5) -> ProbeResult:
    """Check whether the RabbitMQ server accepts TCP connections.

    Opens a plain TCP connection to ``app_settings.RABBITMQ_HOST`` on
    ``app_settings.RABBITMQ_PORT`` and closes it again. No AMQP handshake is
    performed, so no RabbitMQ client library is required.

    Args:
        timeout: Maximum number of seconds to wait for the connection to be
            established. Defaults to 5, the previously hard-coded value.

    Returns:
        ``ProbeResult(success=True)`` when the connection was opened and
        closed without error; otherwise ``ProbeResult(success=False,
        error=...)`` describing the failure. Errors derived from
        ``Exception`` are reported through the result instead of being
        raised.
    """
    try:
        _, writer = await asyncio.wait_for(
            asyncio.open_connection(
                app_settings.RABBITMQ_HOST, app_settings.RABBITMQ_PORT
            ),
            timeout=timeout,
        )
        writer.close()
        await writer.wait_closed()
        return ProbeResult(success=True)
    except (OSError, asyncio.TimeoutError) as e:
        # socket.error is an alias of OSError and ConnectionRefusedError is a
        # subclass of it, so OSError alone covers the network failures.
        # The timeout raised by asyncio.wait_for carries no message; fall back
        # to the exception class name so the probe error is never blank.
        detail = str(e) or type(e).__name__
        return ProbeResult(
            success=False, error=f"RabbitMQ connection failed: {detail}"
        )
    except Exception as e:
        detail = str(e) or type(e).__name__
        return ProbeResult(
            success=False,
            error=f"Unexpected error when checking RabbitMQ: {detail}",
        )
def register(app):
    """Attach the readiness, liveness and startup probes to ``app``.

    A ``ProbeManager`` is created with a ``FastAPIAdapter`` wrapping ``app``
    registered under the name "fastapi". Three probes are then registered
    with the "/api" prefix, in this order: readiness (backed by
    ``check_rabbitmq_connectivity``), liveness and startup. Finally an
    application "startup" event handler is installed that tells the manager
    startup has completed.

    Args:
        app: The application object handed to ``FastAPIAdapter`` and used to
            register the "startup" event handler.
    """
    manager = ProbeManager()
    manager.register_adapter("fastapi", FastAPIAdapter(app))

    # Readiness is the only probe with a custom check.
    manager.register(
        name="readiness",
        prefix="/api",
        type=ProbeType.READINESS,
        check_func=lambda: check_rabbitmq_connectivity(),
        frameworks=["fastapi"],
    )

    # Liveness and startup take no check function; register them in the
    # same order as before.
    for probe_name, probe_type in (
        ("liveness", ProbeType.LIVENESS),
        ("startup", ProbeType.STARTUP),
    ):
        manager.register(
            name=probe_name,
            prefix="/api",
            type=probe_type,
            frameworks=["fastapi"],
        )

    @app.on_event("startup")
    async def mark_startup_complete():
        # Runs when the application emits its "startup" event.
        manager.mark_startup_complete()