freeleaps-service-hub/apps/devops/app/backend/infra/rabbitmq/async_client.py
Nicolas 04acd78d78 fix: connect to existing named queue instead of creating anonymous queue
- Change AsyncMQClient to connect to existing persistent queue by name
- Fix issue where DevOps Service created temporary anonymous queues instead of consuming from the correct named queue
- This allows consuming the 42 backlogged messages in freeleaps.devops.reconciler.output queue
- Change queue properties: exclusive=False, auto_delete=False, durable=True
- Resolves the core issue where messages were split between persistent and temporary queues
2025-08-08 12:45:21 +08:00

66 lines
2.8 KiB
Python

from app.common.config.app_settings import app_settings
from app.common.log.module_logger import ModuleLogger
import asyncio
from asyncio import AbstractEventLoop
import aio_pika
class AsyncMQClient:
exchange_name_format = "freeleaps.devops.exchange.{}"
exchange_type = "direct"
def __init__(self, channel_name: str) -> None:
self.exchange_name_format = AsyncMQClient.exchange_name_format
self.channel_name = channel_name
self.exchange_type = AsyncMQClient.exchange_type
self.exchange_name = self.exchange_name_format.format(self.channel_name)
self.process_callable = None
self.routing_key = self.channel_name
self.module_logger = ModuleLogger(sender_id="AsyncMQClient")
async def bind(self, max_retries=10, event_loop: AbstractEventLoop = None):
retry_count = 0
retry_interval = 1
while retry_count < max_retries:
try:
self.connection = await aio_pika.connect_robust(
host=app_settings.RABBITMQ_HOST,
port=int(app_settings.RABBITMQ_PORT),
login=app_settings.RABBITMQ_USERNAME,
password=app_settings.RABBITMQ_PASSWORD,
virtualhost=app_settings.RABBITMQ_VIRTUAL_HOST,
loop=event_loop,
)
self.channel = await self.connection.channel()
self.exchange = await self.channel.declare_exchange(
name=self.exchange_name, type="direct", auto_delete=False
)
# Connect to existing named queue instead of creating anonymous queue
# channel_name already contains the full queue name from environment variable
self.queue = await self.channel.declare_queue(
name=self.channel_name, exclusive=False, auto_delete=False, durable=True
)
await self.queue.bind(
exchange=self.exchange, routing_key=self.routing_key
)
break
except Exception as e:
await self.module_logger.log_exception(
exception=e,
text=f"Reconnection attempt {retry_count + 1}/{max_retries} failed: {e}",
)
await asyncio.sleep(retry_interval)
retry_interval = min(retry_interval * 2, 60)
retry_count += 1
if retry_count >= max_retries:
raise ConnectionError(
"Unable to connect to RabbitMQ after multiple retries."
)
async def close(self):
"""Unbind the queue and close the connection gracefully."""
await self.queue.unbind(self.exchange, self.routing_key)
await self.connection.close()