""" RabbitMQ Reliable Message Producer Module """ import asyncio import aio_pika import json import logging from datetime import datetime from typing import Dict, Any, Optional from .config import config logger = logging.getLogger(__name__) class ReliableProducer: """Reliable Message Producer""" def __init__(self, exchange_name: Optional[str] = None, queue_name: Optional[str] = None): """ Initialize producer Args: exchange_name: Exchange name, defaults to config value queue_name: Queue name, defaults to config value """ self.exchange_name = exchange_name or config.exchange_name self.queue_name = queue_name or config.queue_name self.connection = None self.channel = None self.exchange = None self.queue = None async def connect(self): """Establish connection and setup confirmation mechanism""" try: # Use robust connection with auto-reconnect support connection_config = config.get_connection_config() self.connection = await aio_pika.connect_robust(connection_config['uri']) self.channel = await self.connection.channel() # Enable publisher confirmations - ensure messages are successfully sent to queue await self.channel.set_qos(prefetch_count=connection_config['prefetch_count']) # Declare durable exchange self.exchange = await self.channel.declare_exchange( self.exchange_name, aio_pika.ExchangeType.DIRECT, durable=True # Exchange persistence ) # Declare durable queue self.queue = await self.channel.declare_queue( self.queue_name, durable=True, # Queue persistence auto_delete=False, # Queue not auto-deleted ) # Bind queue to exchange await self.queue.bind(self.exchange, routing_key="reliable") logger.info(f"[Producer] Connected, queue: {self.queue_name}") except Exception as e: logger.error(f"[Producer] Connection failed: {e}") raise def _generate_message_id(self, message_data: Dict[str, Any]) -> str: """ Generate message ID for message For duplicate_test type messages, generate fixed ID for idempotency testing Args: message_data: Message data dictionary Returns: str: Message ID """ message_type = message_data.get('type', '') content = message_data.get('content', '') # For duplicate_test type messages, generate fixed ID based on content if message_type == 'duplicate_test': # Use content to generate fixed message ID import hashlib content_hash = hashlib.md5(content.encode('utf-8')).hexdigest() return f"duplicate_{content_hash[:8]}" else: # Other messages use timestamp to generate unique ID return f"msg_{asyncio.get_running_loop().time()}" async def publish_reliable_message(self, message_data: Dict[str, Any]) -> bool: """ Publish reliable message Args: message_data: Message data dictionary Returns: bool: Whether sending was successful """ try: # Generate message ID message_id = self._generate_message_id(message_data) # Add message metadata message_data.update({ 'timestamp': datetime.now().isoformat(), 'message_id': message_id }) # Create persistent message message = aio_pika.Message( body=json.dumps(message_data, ensure_ascii=False).encode('utf-8'), delivery_mode=aio_pika.DeliveryMode.PERSISTENT, # Message persistence message_id=message_id, timestamp=datetime.now() ) # Send message and wait for confirmation await self.exchange.publish( message, routing_key="reliable" ) logger.info(f"[Producer] Message sent: {message_id} (type: {message_data.get('type', 'N/A')}, content: {message_data.get('content', 'N/A')})") return True except Exception as e: logger.error(f"[Producer] Failed to send message: {e}") return False async def close(self): """Close connection""" try: if self.connection: await self.connection.close() logger.info("[Producer] Connection closed") except Exception as e: logger.error(f"[Producer] Error closing connection: {e}") async def __aenter__(self): """Async context manager entry""" await self.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit""" await self.close()