diff --git a/apps/devops/app/providers/message_queue.py b/apps/devops/app/providers/message_queue.py index 51cb071..978e232 100644 --- a/apps/devops/app/providers/message_queue.py +++ b/apps/devops/app/providers/message_queue.py @@ -4,33 +4,54 @@ from app.backend.services.deployment_status_update_service import DeploymentStat def register(app): - # Initialize the message subscriber and status update service - app.deployment_heartbeat_subscriber = AsyncMQSubscriber("devops_reconcile_heartbeat") - app.deployment_status_service = DeploymentStatusUpdateService() + # Initialize the message subscriber and status update service lazily to avoid blocking startup + app.deployment_heartbeat_subscriber = None + app.deployment_status_service = None @app.on_event("startup") async def start_message_consumers(): - print("Starting message consumers") - # Register the heartbeat processor - await app.deployment_heartbeat_subscriber.register_consumer( - registry_key="deployment_heartbeat_processor", - callback_method=app.deployment_status_service.process_heartbeat_message, - args={} - ) - print("Registered deployment heartbeat processor") + print("🚀 Starting message consumers...") - # Start the subscriber in the background - loop = asyncio.get_running_loop() - loop.create_task( - app.deployment_heartbeat_subscriber.subscribe(max_retries=5, event_loop=loop) - ) - print("Started deployment heartbeat subscriber") + try: + # Initialize services during startup to avoid blocking app initialization + print("🔧 Initializing services...") + app.deployment_heartbeat_subscriber = AsyncMQSubscriber("devops_reconcile_heartbeat") + app.deployment_status_service = DeploymentStatusUpdateService() + print("✅ Services initialized") + + # Register the heartbeat processor + print("📝 Registering deployment heartbeat processor...") + await app.deployment_heartbeat_subscriber.register_consumer( + registry_key="deployment_heartbeat_processor", + callback_method=app.deployment_status_service.process_heartbeat_message, + args={} + ) + print("✅ Registered deployment heartbeat processor") + + # Start the subscriber in the background + print("🔄 Starting subscriber in background...") + loop = asyncio.get_running_loop() + loop.create_task( + app.deployment_heartbeat_subscriber.subscribe(max_retries=5, event_loop=loop) + ) + print("✅ Started deployment heartbeat subscriber") + print("🎉 Message consumers startup complete!") + except Exception as e: + print(f"❌ Error in message consumer startup: {e}") + # Don't raise the exception to prevent app startup failure + print("⚠️ App will continue without message queue functionality") @app.on_event("shutdown") async def stop_message_consumers(): # Clear consumers and close connection print("Stopping message consumers") - await app.deployment_heartbeat_subscriber.clear_all_consumers() - print("Cleared all consumers") - await app.deployment_heartbeat_subscriber.close() - print("Closed deployment heartbeat subscriber") \ No newline at end of file + if app.deployment_heartbeat_subscriber: + try: + await app.deployment_heartbeat_subscriber.clear_all_consumers() + print("Cleared all consumers") + await app.deployment_heartbeat_subscriber.close() + print("Closed deployment heartbeat subscriber") + except Exception as e: + print(f"Error during shutdown: {e}") + else: + print("No message consumers to stop") \ No newline at end of file