freeleaps-service-hub/apps/devops/app/providers/message_queue.py
Nicolas e4fe9394b1 fix: use environment variable for RabbitMQ output queue name
- Add RABBITMQ_OUTPUT_QUEUE_NAME environment variable support
- Fix hardcoded queue name 'reconciler.output' to use configurable queue name
- Default to 'freeleaps.devops.reconciler.output' if env var not set
- Add debug logging to show which queue name is being used
- This fixes the issue where 42 messages were stuck in the output queue due to queue name mismatch
2025-08-08 12:01:47 +08:00

60 lines
2.8 KiB
Python

import asyncio
import os
from app.backend.infra.rabbitmq.async_subscriber import AsyncMQSubscriber
from app.backend.services.deployment_status_update_service import DeploymentStatusUpdateService
def register(app):
    """Attach the deployment-heartbeat message consumer to the app lifecycle.

    Registers a "startup" handler that creates the subscriber and the status
    update service, registers the heartbeat callback, and launches the
    subscriber as a background task, plus a "shutdown" handler that tears the
    subscriber down again.

    Args:
        app: Application object exposing an ``on_event(name)`` decorator and
            accepting arbitrary attributes. The following attributes are set
            on it: ``deployment_heartbeat_subscriber``,
            ``deployment_status_service`` and ``deployment_heartbeat_task``.

    Startup errors are reported and swallowed on purpose so that the app can
    still come up without message queue functionality.
    """
    # Initialize the message subscriber and status update service lazily to avoid blocking startup
    app.deployment_heartbeat_subscriber = None
    app.deployment_status_service = None
    # Strong reference to the background subscribe task. The event loop only
    # keeps weak references to tasks, so an unreferenced task may be
    # garbage-collected before it finishes.
    app.deployment_heartbeat_task = None

    def _report_subscriber_exit(task):
        """Print the failure of the background subscriber task, if it had one."""
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            print(f"❌ Deployment heartbeat subscriber stopped with error: {error}")

    @app.on_event("startup")
    async def start_message_consumers():
        """Create the services, register the heartbeat consumer and start subscribing."""
        print("🚀 Starting message consumers...")

        try:
            # Initialize services during startup to avoid blocking app initialization
            print("🔧 Initializing services...")
            output_queue_name = os.getenv("RABBITMQ_OUTPUT_QUEUE_NAME", "freeleaps.devops.reconciler.output")
            print(f"Using output queue: {output_queue_name}")
            app.deployment_heartbeat_subscriber = AsyncMQSubscriber(output_queue_name)
            app.deployment_status_service = DeploymentStatusUpdateService()
            print("✅ Services initialized")

            # Register the heartbeat processor
            print("📝 Registering deployment heartbeat processor...")
            await app.deployment_heartbeat_subscriber.register_consumer(
                registry_key="deployment_heartbeat_processor",
                callback_method=app.deployment_status_service.process_heartbeat_message,
                args={}
            )
            print("✅ Registered deployment heartbeat processor")

            # Start the subscriber in the background, keeping a reference to
            # the task and surfacing any failure instead of losing it silently.
            print("🔄 Starting subscriber in background...")
            loop = asyncio.get_running_loop()
            app.deployment_heartbeat_task = loop.create_task(
                app.deployment_heartbeat_subscriber.subscribe(max_retries=5, event_loop=loop)
            )
            app.deployment_heartbeat_task.add_done_callback(_report_subscriber_exit)
            print("✅ Started deployment heartbeat subscriber")
            print("🎉 Message consumers startup complete!")

        except Exception as e:
            print(f"❌ Error in message consumer startup: {e}")
            # Don't raise the exception to prevent app startup failure
            print("⚠️ App will continue without message queue functionality")

    @app.on_event("shutdown")
    async def stop_message_consumers():
        """Clear consumers, close the subscriber and stop the background task."""
        # Clear consumers and close connection
        print("Stopping message consumers")
        subscriber = app.deployment_heartbeat_subscriber
        if not subscriber:
            print("No message consumers to stop")
            return

        # Each step is guarded separately so that a failure while clearing
        # consumers cannot prevent the subscriber from being closed.
        try:
            await subscriber.clear_all_consumers()
            print("Cleared all consumers")
        except Exception as e:
            print(f"Error during shutdown: {e}")

        try:
            await subscriber.close()
            print("Closed deployment heartbeat subscriber")
        except Exception as e:
            print(f"Error during shutdown: {e}")

        # NOTE(review): subscribe() is started with max_retries=5, so it may
        # retry after the subscriber is closed; cancel it explicitly.
        task = app.deployment_heartbeat_task
        if task is not None and not task.done():
            task.cancel()