from app.common.config.app_settings import app_settings from app.common.log.module_logger import ModuleLogger import asyncio from asyncio import AbstractEventLoop import aio_pika class AsyncMQClient: exchange_name_format = "freeleaps.devops.exchange.{}" exchange_type = "direct" def __init__(self, channel_name: str) -> None: self.exchange_name_format = AsyncMQClient.exchange_name_format self.channel_name = channel_name self.exchange_type = AsyncMQClient.exchange_type self.exchange_name = self.exchange_name_format.format(self.channel_name) self.process_callable = None self.routing_key = self.channel_name self.module_logger = ModuleLogger(sender_id="AsyncMQClient") async def bind(self, max_retries=10, event_loop: AbstractEventLoop = None): retry_count = 0 retry_interval = 1 while retry_count < max_retries: try: self.connection = await aio_pika.connect_robust( host=app_settings.RABBITMQ_HOST, port=int(app_settings.RABBITMQ_PORT), login=app_settings.RABBITMQ_USERNAME, password=app_settings.RABBITMQ_PASSWORD, virtualhost=app_settings.RABBITMQ_VIRTUAL_HOST, loop=event_loop, ) self.channel = await self.connection.channel() self.exchange = await self.channel.declare_exchange( name=self.exchange_name, type="direct", auto_delete=False ) # Connect to existing named queue instead of creating anonymous queue # channel_name already contains the full queue name from environment variable self.queue = await self.channel.declare_queue( name=self.channel_name, exclusive=False, auto_delete=False, durable=True ) await self.queue.bind( exchange=self.exchange, routing_key=self.routing_key ) break except Exception as e: await self.module_logger.log_exception( exception=e, text=f"Reconnection attempt {retry_count + 1}/{max_retries} failed: {e}", ) await asyncio.sleep(retry_interval) retry_interval = min(retry_interval * 2, 60) retry_count += 1 if retry_count >= max_retries: raise ConnectionError( "Unable to connect to RabbitMQ after multiple retries." ) async def close(self): """Unbind the queue and close the connection gracefully.""" await self.queue.unbind(self.exchange, self.routing_key) await self.connection.close()