refactor: enhance lazy initialization and error handling in message consumer startup

Updated the message consumer initialization to occur lazily during the startup event, improving application startup performance. Added detailed logging for service initialization, registration, and error handling to enhance traceability and robustness during startup and shutdown processes.

Signed-off-by: zhenyus <zhenyus@mathmast.com>
This commit is contained in:
zhenyus 2025-08-03 03:01:11 +08:00
parent 95b6560ffd
commit 2dd73c0734

View File

@ -4,33 +4,54 @@ from app.backend.services.deployment_status_update_service import DeploymentStat
def register(app):
# Initialize the message subscriber and status update service
app.deployment_heartbeat_subscriber = AsyncMQSubscriber("devops_reconcile_heartbeat")
app.deployment_status_service = DeploymentStatusUpdateService()
# Initialize the message subscriber and status update service lazily to avoid blocking startup
app.deployment_heartbeat_subscriber = None
app.deployment_status_service = None
@app.on_event("startup")
async def start_message_consumers():
print("Starting message consumers")
# Register the heartbeat processor
await app.deployment_heartbeat_subscriber.register_consumer(
registry_key="deployment_heartbeat_processor",
callback_method=app.deployment_status_service.process_heartbeat_message,
args={}
)
print("Registered deployment heartbeat processor")
print("🚀 Starting message consumers...")
# Start the subscriber in the background
loop = asyncio.get_running_loop()
loop.create_task(
app.deployment_heartbeat_subscriber.subscribe(max_retries=5, event_loop=loop)
)
print("Started deployment heartbeat subscriber")
try:
# Initialize services during startup to avoid blocking app initialization
print("🔧 Initializing services...")
app.deployment_heartbeat_subscriber = AsyncMQSubscriber("devops_reconcile_heartbeat")
app.deployment_status_service = DeploymentStatusUpdateService()
print("✅ Services initialized")
# Register the heartbeat processor
print("📝 Registering deployment heartbeat processor...")
await app.deployment_heartbeat_subscriber.register_consumer(
registry_key="deployment_heartbeat_processor",
callback_method=app.deployment_status_service.process_heartbeat_message,
args={}
)
print("✅ Registered deployment heartbeat processor")
# Start the subscriber in the background
print("🔄 Starting subscriber in background...")
loop = asyncio.get_running_loop()
loop.create_task(
app.deployment_heartbeat_subscriber.subscribe(max_retries=5, event_loop=loop)
)
print("✅ Started deployment heartbeat subscriber")
print("🎉 Message consumers startup complete!")
except Exception as e:
print(f"❌ Error in message consumer startup: {e}")
# Don't raise the exception to prevent app startup failure
print("⚠️ App will continue without message queue functionality")
@app.on_event("shutdown")
async def stop_message_consumers():
# Clear consumers and close connection
print("Stopping message consumers")
await app.deployment_heartbeat_subscriber.clear_all_consumers()
print("Cleared all consumers")
await app.deployment_heartbeat_subscriber.close()
print("Closed deployment heartbeat subscriber")
if app.deployment_heartbeat_subscriber:
try:
await app.deployment_heartbeat_subscriber.clear_all_consumers()
print("Cleared all consumers")
await app.deployment_heartbeat_subscriber.close()
print("Closed deployment heartbeat subscriber")
except Exception as e:
print(f"Error during shutdown: {e}")
else:
print("No message consumers to stop")