""" RabbitMQ 可靠消息消费者模块 """ import asyncio import aio_pika import json import logging from datetime import datetime from typing import Dict, Any, Optional, Callable, Set from .config import config logger = logging.getLogger(__name__) class ReliableConsumer: """可靠消息消费者""" def __init__(self, queue_name: Optional[str] = None, consumer_name: Optional[str] = None, message_handler: Optional[Callable] = None): """ 初始化消费者 Args: queue_name: 队列名称,默认使用配置中的值 consumer_name: 消费者名称 message_handler: 自定义消息处理函数 """ self.queue_name = queue_name or config.queue_name self.consumer_name = consumer_name or "reliable_consumer" self.message_handler = message_handler or self.default_message_handler self.connection = None self.channel = None self.queue = None self.processed_messages: Set[str] = set() # 记录已处理的消息ID,防止重复处理 async def connect(self): """建立连接""" try: connection_config = config.get_connection_config() self.connection = await aio_pika.connect_robust(connection_config['uri']) self.channel = await self.connection.channel() # 设置QoS,确保一次只处理一条消息 await self.channel.set_qos(prefetch_count=connection_config['prefetch_count']) # 声明队列(确保队列存在) self.queue = await self.channel.declare_queue( self.queue_name, durable=True, auto_delete=False ) logger.info(f"[消费者-{self.consumer_name}] 已连接,监听队列: {self.queue_name}") except Exception as e: logger.error(f"[消费者-{self.consumer_name}] 连接失败: {e}") raise async def process_message(self, message: aio_pika.IncomingMessage): """处理消息的核心逻辑""" try: # 解析消息 message_data = json.loads(message.body.decode('utf-8')) message_id = message_data.get('message_id') # 检查是否已经处理过此消息(幂等性检查) if message_id in self.processed_messages: logger.warning("=" * 50) logger.warning(f"[消费者-{self.consumer_name}] 🚫 检测到重复消息,跳过处理:") logger.warning(f"[消费者-{self.consumer_name}] 消息ID: {message_id}") logger.warning(f"[消费者-{self.consumer_name}] 消息内容: {json.dumps(message_data, ensure_ascii=False, indent=2)}") logger.warning(f"[消费者-{self.consumer_name}] 已处理消息总数: {len(self.processed_messages)}") logger.warning("=" * 50) await message.ack() return logger.info(f"[消费者-{self.consumer_name}] 开始处理消息: {message_id}") logger.info(f"[消费者-{self.consumer_name}] 消息内容: {message_data}") # 直接重试处理消息 success = await self.retry_process_message(message_data, message_id, 0) # 只有在处理成功后才记录已处理的消息ID if success: self.processed_messages.add(message_id) # 确认消息 await message.ack() logger.info(f"[消费者-{self.consumer_name}] 消息 {message_id} 处理完成并确认") logger.info(f"[消费者-{self.consumer_name}] 当前已处理消息数量: {len(self.processed_messages)}") else: # 处理失败,不记录消息ID,发送到死信队列 await self.send_to_dead_letter_queue(message, message_id, "处理失败") await message.ack() # 确认消息以避免无限重试 except Exception as e: logger.error(f"[消费者-{self.consumer_name}] 处理消息失败: {e}") # 直接发送到死信队列,包含错误信息 message_data = json.loads(message.body.decode('utf-8')) message_id = message_data.get('message_id') await self.send_to_dead_letter_queue(message, message_id, str(e)) await message.ack() # 确认消息以避免无限重试 async def default_message_handler(self, message_data: Dict[str, Any]): """默认消息处理函数""" # 模拟处理时间 await asyncio.sleep(1) # 根据消息类型决定是否失败 message_type = message_data.get('type', '') if message_type == 'will_fail': # 特定类型的消息总是失败,用于测试死信队列 raise Exception(f"模拟业务处理失败: {message_data.get('content', '')}") else: pass logger.info(f"[消费者-{self.consumer_name}] 业务逻辑处理完成: {message_data.get('content', '')}") async def retry_process_message(self, message_data: Dict[str, Any], message_id: str, retry_count: int) -> bool: """直接重试处理消息""" max_retries = config.max_retries last_error = None for attempt in range(max_retries + 1): try: logger.info(f"[消费者-{self.consumer_name}] 尝试处理消息 {message_id},第 {attempt + 1} 次") await self.message_handler(message_data) logger.info(f"[消费者-{self.consumer_name}] 消息 {message_id} 处理成功") return True # 处理成功,返回True except Exception as e: last_error = str(e) logger.warning(f"[消费者-{self.consumer_name}] 消息 {message_id} 第 {attempt + 1} 次处理失败: {e}") if attempt < max_retries: # 等待一段时间后重试 await asyncio.sleep(1) else: # 所有重试都失败,返回False logger.error(f"[消费者-{self.consumer_name}] 消息 {message_id} 重试 {max_retries} 次后仍然失败: {last_error}") return False async def send_to_dead_letter_queue(self, message: aio_pika.IncomingMessage, message_id: str, error_info: str = None): """发送消息到死信队列""" try: # 解析消息内容 message_data = json.loads(message.body.decode('utf-8')) # 构建死信消息,包含原始消息和错误信息 dead_letter_data = { 'original_message': message_data, 'error_info': error_info or '重试失败', 'dead_letter_timestamp': datetime.now().isoformat(), 'message_id': message_id, 'consumer_name': self.consumer_name, 'queue_name': self.queue_name } logger.error(f"[消费者-{self.consumer_name}] 消息发送到死信队列: {message_id}, 错误: {error_info}") # 创建死信交换器和队列 dead_letter_config = config.get_dead_letter_config() dead_letter_exchange = await self.channel.declare_exchange( dead_letter_config['dead_letter_exchange'], aio_pika.ExchangeType.DIRECT, durable=True ) dead_letter_queue = await self.channel.declare_queue( dead_letter_config['dead_letter_queue'], durable=True, auto_delete=False ) await dead_letter_queue.bind( dead_letter_exchange, routing_key=dead_letter_config['dead_letter_routing_key'] ) # 创建死信消息 dead_letter_message = aio_pika.Message( body=json.dumps(dead_letter_data, ensure_ascii=False).encode('utf-8'), delivery_mode=aio_pika.DeliveryMode.PERSISTENT, message_id=f"dead_letter_{message_id}" ) # 发送到死信队列 await dead_letter_exchange.publish( dead_letter_message, routing_key=dead_letter_config['dead_letter_routing_key'] ) logger.info(f"[消费者-{self.consumer_name}] 消息 {message_id} 已发送到死信队列") except Exception as e: logger.error(f"[消费者-{self.consumer_name}] 发送到死信队列失败: {e}") logger.error(f"[消费者-{self.consumer_name}] 原始消息内容: {message.body.decode('utf-8') if message.body else 'None'}") async def start_consuming(self): """开始消费消息""" self.consumer_tag = await self.queue.consume(self.process_message) logger.info(f"[消费者-{self.consumer_name}] 开始消费消息...") # 保持消费者运行 await asyncio.Future() async def stop_consuming(self): """停止消费消息""" if self.queue and self.consumer_tag: await self.queue.cancel(self.consumer_tag) logger.info(f"[消费者-{self.consumer_name}] 已停止消费消息") async def close(self): """关闭连接""" try: await self.stop_consuming() if self.connection: await self.connection.close() logger.info(f"[消费者-{self.consumer_name}] 连接已关闭") # 打印最终统计信息 self.print_processed_messages_stats() except Exception as e: logger.error(f"[消费者-{self.consumer_name}] 关闭连接时出错: {e}") def get_processed_messages_stats(self): """获取已处理消息的统计信息""" return { 'total_processed': len(self.processed_messages), 'processed_message_ids': list(self.processed_messages) } def print_processed_messages_stats(self): """打印已处理消息的统计信息""" stats = self.get_processed_messages_stats() logger.info("=" * 50) logger.info(f"[消费者-{self.consumer_name}] 已处理消息统计信息:") logger.info(f"[消费者-{self.consumer_name}] 总处理数量: {stats['total_processed']}") logger.info(f"[消费者-{self.consumer_name}] 已处理消息ID列表: {stats['processed_message_ids']}") logger.info("=" * 50) async def __aenter__(self): """异步上下文管理器入口""" await self.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """异步上下文管理器出口""" await self.close()