"""RabbitMQ reliable message consumer module."""

import asyncio
import json
import logging
from datetime import datetime
from typing import Any, Callable, Dict, Optional, Set

import aio_pika

from .config import config

logger = logging.getLogger(__name__)


class ReliableConsumer:
    """Consume messages from a durable queue with retries and dead-lettering.

    Each delivery is JSON-decoded, handed to ``message_handler`` (retried up
    to ``config.max_retries`` additional times), and then acknowledged.
    Messages that still fail are republished to the dead letter exchange
    described by ``config.get_dead_letter_config()`` before being acked.

    Duplicate suppression is based on the ``message_id`` field of the
    payload. The set of seen IDs is kept in memory only: it grows without
    bound and is lost when the process exits.
    """

    def __init__(self, queue_name: Optional[str] = None,
                 consumer_name: Optional[str] = None,
                 message_handler: Optional[Callable] = None):
        """Initialize the consumer.

        Args:
            queue_name: Queue name; defaults to ``config.queue_name``.
            consumer_name: Label used in log output; defaults to
                ``"reliable_consumer"``.
            message_handler: Async callable invoked with the decoded message
                payload; defaults to :meth:`default_message_handler`.
        """
        self.queue_name = queue_name or config.queue_name
        self.consumer_name = consumer_name or "reliable_consumer"
        self.message_handler = message_handler or self.default_message_handler
        self.connection = None
        self.channel = None
        self.queue = None
        # Set by start_consuming(); initialised here so that stop_consuming()
        # and close() are safe to call even if consumption never started.
        self.consumer_tag = None
        # IDs of successfully processed messages, used for the idempotency check.
        self.processed_messages: Set[str] = set()

    async def connect(self):
        """Open the connection and channel, apply QoS and declare the queue.

        Raises:
            Exception: Any connection/declaration error is logged and
                re-raised unchanged.
        """
        try:
            connection_config = config.get_connection_config()
            self.connection = await aio_pika.connect_robust(connection_config['uri'])
            self.channel = await self.connection.channel()

            # Cap the number of unacknowledged deliveries to the configured prefetch count.
            await self.channel.set_qos(prefetch_count=connection_config['prefetch_count'])

            # Declare the queue so that it exists before consuming.
            self.queue = await self.channel.declare_queue(
                self.queue_name,
                durable=True,
                auto_delete=False
            )

            logger.info(f"[Consumer-{self.consumer_name}] Connected, listening to queue: {self.queue_name}")
        except Exception as e:
            logger.error(f"[Consumer-{self.consumer_name}] Connection failed: {e}")
            raise

    async def process_message(self, message: aio_pika.IncomingMessage):
        """Decode, de-duplicate, process and acknowledge one delivery.

        The message is always acknowledged: after successful handling, after
        being detected as a duplicate, or after being forwarded to the dead
        letter queue when handling (or decoding) fails.

        Args:
            message: The incoming delivery from the queue.
        """
        # Captured outside the try body so the error path can reuse it without
        # re-parsing a payload that may be the very reason for the failure.
        message_id = None
        try:
            message_data = json.loads(message.body.decode('utf-8'))
            message_id = message_data.get('message_id')

            # Idempotency check. Messages without an ID cannot be de-duplicated,
            # so they are never treated as duplicates of one another.
            if message_id is not None and message_id in self.processed_messages:
                logger.warning("=" * 50)
                logger.warning(f"[Consumer-{self.consumer_name}] 🚫 Duplicate message detected, skipping:")
                logger.warning(f"[Consumer-{self.consumer_name}] Message ID: {message_id}")
                logger.warning(f"[Consumer-{self.consumer_name}] Message content: {json.dumps(message_data, ensure_ascii=False, indent=2)}")
                logger.warning(f"[Consumer-{self.consumer_name}] Total processed messages: {len(self.processed_messages)}")
                logger.warning("=" * 50)
                await message.ack()
                return

            logger.info(f"[Consumer-{self.consumer_name}] Starting to process message: {message_id}")
            logger.info(f"[Consumer-{self.consumer_name}] Message content: {message_data}")

            success = await self.retry_process_message(message_data, message_id, 0)

            if success:
                # Record the ID only after successful processing.
                if message_id is not None:
                    self.processed_messages.add(message_id)
                await message.ack()
                logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed and acknowledged")
                logger.info(f"[Consumer-{self.consumer_name}] Current processed message count: {len(self.processed_messages)}")
            else:
                # Processing failed: do not record the ID, dead-letter the message.
                await self.send_to_dead_letter_queue(message, message_id, "Processing failed")
                await message.ack()  # Acknowledge to avoid infinite redelivery
        except Exception as e:
            logger.error(f"[Consumer-{self.consumer_name}] Failed to process message: {e}")
            # Forward to the dead letter queue together with the error text.
            await self.send_to_dead_letter_queue(message, message_id, str(e))
            await message.ack()  # Acknowledge to avoid infinite redelivery

    async def default_message_handler(self, message_data: Dict[str, Any]):
        """Default handler: simulate work and fail for ``type == 'will_fail'``.

        Args:
            message_data: Decoded message payload.

        Raises:
            Exception: When the payload's ``type`` is ``'will_fail'``; used to
                exercise the dead letter path.
        """
        # Simulate processing time.
        await asyncio.sleep(1)

        message_type = message_data.get('type', '')
        if message_type == 'will_fail':
            # This message type always fails, for testing the dead letter queue.
            raise Exception(f"Simulated business processing failure: {message_data.get('content', '')}")

        logger.info(f"[Consumer-{self.consumer_name}] Business logic processing completed: {message_data.get('content', '')}")

    async def retry_process_message(self, message_data: Dict[str, Any], message_id: str, retry_count: int) -> bool:
        """Run the handler, retrying in-process on failure.

        Args:
            message_data: Decoded message payload passed to the handler.
            message_id: Message identifier, used only for logging.
            retry_count: Currently unused; kept for interface compatibility.

        Returns:
            True if the handler completed without raising on any attempt,
            False once ``config.max_retries + 1`` attempts have all failed.
        """
        max_retries = config.max_retries
        last_error = None

        for attempt in range(max_retries + 1):
            try:
                logger.info(f"[Consumer-{self.consumer_name}] Attempting to process message {message_id}, attempt {attempt + 1}")
                await self.message_handler(message_data)
                logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed successfully")
                return True
            except Exception as e:
                last_error = str(e)
                logger.warning(f"[Consumer-{self.consumer_name}] Message {message_id} failed on attempt {attempt + 1}: {e}")
                if attempt < max_retries:
                    # Brief pause before the next attempt.
                    await asyncio.sleep(1)
                else:
                    logger.error(f"[Consumer-{self.consumer_name}] Message {message_id} failed after {max_retries} retries: {last_error}")
                    return False

        # Only reachable when max_retries is negative (no attempt was made).
        return False

    async def send_to_dead_letter_queue(self, message: aio_pika.IncomingMessage, message_id: str, error_info: str = None):
        """Publish a failure record for ``message`` to the dead letter queue.

        The record wraps the original payload with the error text, a
        timestamp, and the consumer/queue names. Failures while publishing
        are logged and swallowed so the caller can still acknowledge the
        original delivery.

        Args:
            message: The original delivery.
            message_id: Message identifier (may be ``None`` if unknown).
            error_info: Description of the failure.
        """
        try:
            # Payloads that are not valid JSON (or not valid UTF-8) must still be
            # dead-lettered, so fall back to the raw text instead of raising.
            raw_text = message.body.decode('utf-8', errors='replace') if message.body else ''
            try:
                message_data = json.loads(raw_text)
            except ValueError:
                message_data = raw_text

            # Dead letter record: original message plus error details.
            dead_letter_data = {
                'original_message': message_data,
                'error_info': error_info or '重试失败',
                'dead_letter_timestamp': datetime.now().isoformat(),
                'message_id': message_id,
                'consumer_name': self.consumer_name,
                'queue_name': self.queue_name
            }

            logger.error(f"[Consumer-{self.consumer_name}] Message sent to dead letter queue: {message_id}, error: {error_info}")

            # Declare the dead letter exchange and queue, and bind them.
            dead_letter_config = config.get_dead_letter_config()
            dead_letter_exchange = await self.channel.declare_exchange(
                dead_letter_config['dead_letter_exchange'],
                aio_pika.ExchangeType.DIRECT,
                durable=True
            )

            dead_letter_queue = await self.channel.declare_queue(
                dead_letter_config['dead_letter_queue'],
                durable=True,
                auto_delete=False
            )

            await dead_letter_queue.bind(
                dead_letter_exchange,
                routing_key=dead_letter_config['dead_letter_routing_key']
            )

            dead_letter_message = aio_pika.Message(
                body=json.dumps(dead_letter_data, ensure_ascii=False).encode('utf-8'),
                delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
                message_id=f"dead_letter_{message_id}"
            )

            await dead_letter_exchange.publish(
                dead_letter_message,
                routing_key=dead_letter_config['dead_letter_routing_key']
            )

            logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} sent to dead letter queue")
        except Exception as e:
            logger.error(f"[Consumer-{self.consumer_name}] Failed to send to dead letter queue: {e}")
            # errors='replace' keeps this log line from raising on invalid UTF-8.
            logger.error(f"[Consumer-{self.consumer_name}] Original message content: {message.body.decode('utf-8', errors='replace') if message.body else 'None'}")

    async def start_consuming(self):
        """Start consuming messages and block until cancelled."""
        self.consumer_tag = await self.queue.consume(self.process_message)
        logger.info(f"[消费者-{self.consumer_name}] 开始消费消息...")

        # Keep the consumer running: this future is never resolved.
        await asyncio.Future()

    async def stop_consuming(self):
        """Cancel the active consumer, if any."""
        if self.queue and self.consumer_tag:
            await self.queue.cancel(self.consumer_tag)
            # Forget the tag so a repeated stop/close does not cancel twice.
            self.consumer_tag = None
            logger.info(f"[消费者-{self.consumer_name}] 已停止消费消息")

    async def close(self):
        """Stop consuming, close the connection and log final statistics.

        Errors are logged rather than raised.
        """
        try:
            try:
                await self.stop_consuming()
            finally:
                # Close the connection even if cancelling the consumer failed.
                if self.connection:
                    await self.connection.close()
                    logger.info(f"[消费者-{self.consumer_name}] 连接已关闭")

            # Log the final statistics.
            self.print_processed_messages_stats()
        except Exception as e:
            logger.error(f"[消费者-{self.consumer_name}] 关闭连接时出错: {e}")

    def get_processed_messages_stats(self):
        """Return the count and list of processed message IDs."""
        return {
            'total_processed': len(self.processed_messages),
            'processed_message_ids': list(self.processed_messages)
        }

    def print_processed_messages_stats(self):
        """Log the processed-message statistics."""
        stats = self.get_processed_messages_stats()
        logger.info("=" * 50)
        logger.info(f"[消费者-{self.consumer_name}] 已处理消息统计信息:")
        logger.info(f"[消费者-{self.consumer_name}] 总处理数量: {stats['total_processed']}")
        logger.info(f"[消费者-{self.consumer_name}] 已处理消息ID列表: {stats['processed_message_ids']}")
        logger.info("=" * 50)

    async def __aenter__(self):
        """Async context manager entry: connect and return the consumer."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: close the consumer."""
        await self.close()