import asyncio
from asyncio import AbstractEventLoop
from typing import Optional

import aio_pika

from common.config.app_settings import app_settings
from common.log.module_logger import ModuleLogger


class AsyncMQClient:
    """Async RabbitMQ client bound to one notification exchange.

    The exchange name is derived from ``channel_name`` through
    ``exchange_name_format``; the same ``channel_name`` is used as the
    routing key when the queue is bound to the exchange.
    """

    exchange_name_format = "freeleaps.notification.exchange.{}"
    exchange_type = "direct"

    def __init__(self, channel_name: str) -> None:
        """Store naming/routing settings; no network I/O happens here.

        Args:
            channel_name: Suffix of the exchange name and routing key.
        """
        self.exchange_name_format = AsyncMQClient.exchange_name_format
        self.channel_name = channel_name
        self.exchange_type = AsyncMQClient.exchange_type
        self.exchange_name = self.exchange_name_format.format(self.channel_name)
        self.process_callable = None
        self.routing_key = self.channel_name
        self.module_logger = ModuleLogger(sender_id="AsyncMQClient")

        # Populated by a successful bind(); defined up front so close() is
        # safe to call even when bind() never ran or never succeeded.
        self.connection = None
        self.channel = None
        self.exchange = None
        self.queue = None

    async def bind(
        self, max_retries=10, event_loop: Optional[AbstractEventLoop] = None
    ):
        """Connect to RabbitMQ and bind a queue, retrying with backoff.

        Each failed attempt is logged. Between attempts the coroutine sleeps
        for an interval that starts at 1 second and doubles up to 60 seconds.
        No sleep is performed after the final failed attempt.

        Args:
            max_retries: Maximum number of connection attempts.
            event_loop: Event loop forwarded to ``aio_pika.connect_robust``.

        Raises:
            ConnectionError: If every attempt failed (or ``max_retries`` is
                not positive). The last underlying error is chained as the
                cause.
        """
        retry_interval = 1  # Start with a 1-second interval
        attempt = 0
        last_error = None

        while attempt < max_retries:
            attempt += 1
            try:
                await self._connect_and_bind(event_loop)
                return
            except Exception as e:
                last_error = e
                await self.module_logger.log_exception(
                    exception=e,
                    text=f"Reconnection attempt {attempt}/{max_retries} failed: {e}",
                )

            # Only back off when another attempt will actually follow.
            if attempt < max_retries:
                await asyncio.sleep(retry_interval)
                # Exponential backoff, up to 60s max
                retry_interval = min(retry_interval * 2, 60)

        raise ConnectionError(
            "Unable to connect to RabbitMQ after multiple retries."
        ) from last_error

    async def _connect_and_bind(self, event_loop):
        """Run one connect/declare/bind attempt.

        Instance attributes are assigned only after every step succeeded.
        When a step after the connect fails, the freshly opened connection
        is closed before the error propagates, so retries do not accumulate
        open connections.
        """
        connection = await aio_pika.connect_robust(
            host=app_settings.RABBITMQ_HOST,
            port=int(app_settings.RABBITMQ_PORT),
            loop=event_loop,
        )
        try:
            channel = await connection.channel()
            exchange = await channel.declare_exchange(
                name=self.exchange_name,
                type=self.exchange_type,
                auto_delete=False,
            )
            # Declare and bind queue if it's not set by a specific client
            queue = await channel.declare_queue(
                name=None, exclusive=True, auto_delete=True, durable=False
            )
            await queue.bind(exchange=exchange, routing_key=self.routing_key)
        except BaseException:
            # BaseException so that task cancellation also releases the
            # connection; the original error is always re-raised.
            try:
                await connection.close()
            except Exception as close_error:
                await self.module_logger.log_exception(
                    exception=close_error,
                    text=f"Failed to close RabbitMQ connection after setup error: {close_error}",
                )
            raise

        self.connection = connection
        self.channel = channel
        self.exchange = exchange
        self.queue = queue

    async def close(self):
        """Unbind the queue and close the connection gracefully.

        Does nothing when no connection was established. The connection is
        closed even if unbinding the queue raises; that error still
        propagates to the caller.
        """
        if self.connection is None:
            return
        try:
            if self.queue is not None and self.exchange is not None:
                await self.queue.unbind(self.exchange, self.routing_key)
        finally:
            await self.connection.close()