- Add RABBITMQ_OUTPUT_QUEUE_NAME environment variable support.
- Fix the hardcoded queue name 'reconciler.output' to use a configurable queue name.
- Default to 'freeleaps.devops.reconciler.output' if the environment variable is not set.
- Add debug logging to show which queue name is being used.
- This fixes the issue where 42 messages were stuck in the output queue due to a queue name mismatch.
60 lines
2.8 KiB
Python
60 lines
2.8 KiB
Python
import asyncio
|
|
import os
|
|
from app.backend.infra.rabbitmq.async_subscriber import AsyncMQSubscriber
|
|
from app.backend.services.deployment_status_update_service import DeploymentStatusUpdateService
|
|
|
|
|
|
def register(app):
    """Attach the deployment-heartbeat consumer lifecycle hooks to ``app``.

    Sets three attributes on ``app`` (all ``None`` until startup runs):

    * ``deployment_heartbeat_subscriber`` - the ``AsyncMQSubscriber`` bound to
      the output queue.
    * ``deployment_status_service`` - the ``DeploymentStatusUpdateService``
      whose ``process_heartbeat_message`` is registered as the consumer
      callback.
    * ``deployment_heartbeat_task`` - the background task running
      ``subscribe()``.

    It then registers a ``startup`` and a ``shutdown`` handler through
    ``app.on_event``.

    The queue name is read from the ``RABBITMQ_OUTPUT_QUEUE_NAME`` environment
    variable; an unset, empty or whitespace-only value falls back to
    ``freeleaps.devops.reconciler.output``.

    Args:
        app: Application object exposing an ``on_event(name)`` decorator and
            accepting arbitrary attribute assignment.
    """
    default_queue_name = "freeleaps.devops.reconciler.output"

    # Initialize the message subscriber and status update service lazily to avoid blocking startup
    app.deployment_heartbeat_subscriber = None
    app.deployment_status_service = None
    # Strong reference to the background subscribe() task. The event loop only
    # keeps weak references to tasks, so an unreferenced task may be
    # garbage-collected while it is still running.
    app.deployment_heartbeat_task = None

    def report_subscriber_exit(task):
        # Holding a reference to the task suppresses asyncio's
        # "exception was never retrieved" report, so surface failures here.
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            print(f"❌ Deployment heartbeat subscriber stopped with error: {error}")

    @app.on_event("startup")
    async def start_message_consumers():
        print("🚀 Starting message consumers...")

        try:
            # Initialize services during startup to avoid blocking app initialization
            print("🔧 Initializing services...")
            # An empty or blank value would produce an unusable queue name, so
            # treat it the same as an unset variable.
            configured_name = os.getenv("RABBITMQ_OUTPUT_QUEUE_NAME", "").strip()
            output_queue_name = configured_name or default_queue_name
            print(f"Using output queue: {output_queue_name}")
            app.deployment_heartbeat_subscriber = AsyncMQSubscriber(output_queue_name)
            app.deployment_status_service = DeploymentStatusUpdateService()
            print("✅ Services initialized")

            # Register the heartbeat processor
            print("📝 Registering deployment heartbeat processor...")
            await app.deployment_heartbeat_subscriber.register_consumer(
                registry_key="deployment_heartbeat_processor",
                callback_method=app.deployment_status_service.process_heartbeat_message,
                args={},
            )
            print("✅ Registered deployment heartbeat processor")

            # Start the subscriber in the background
            print("🔄 Starting subscriber in background...")
            loop = asyncio.get_running_loop()
            app.deployment_heartbeat_task = loop.create_task(
                app.deployment_heartbeat_subscriber.subscribe(max_retries=5, event_loop=loop)
            )
            app.deployment_heartbeat_task.add_done_callback(report_subscriber_exit)
            print("✅ Started deployment heartbeat subscriber")
            print("🎉 Message consumers startup complete!")
        except Exception as e:
            print(f"❌ Error in message consumer startup: {e}")
            # Don't raise the exception to prevent app startup failure
            print("⚠️ App will continue without message queue functionality")

    @app.on_event("shutdown")
    async def stop_message_consumers():
        # Clear consumers and close connection
        print("Stopping message consumers")
        subscriber = app.deployment_heartbeat_subscriber
        if subscriber is not None:
            try:
                await subscriber.clear_all_consumers()
                print("Cleared all consumers")
                await subscriber.close()
                print("Closed deployment heartbeat subscriber")
            except Exception as e:
                print(f"Error during shutdown: {e}")
        else:
            print("No message consumers to stop")

        # Stop the background subscribe() task if it is still pending so it
        # does not keep running (or retrying) after the subscriber is closed.
        background_task = app.deployment_heartbeat_task
        if background_task is not None and not background_task.done():
            background_task.cancel()
        app.deployment_heartbeat_task = None