import asyncio import aio_pika from config import RABBITMQ_URI async def direct_consumer(queue_name: str, consumer_label: str): """ Direct Exchange 消费者:监听指定队列,处理对应路由键的消息 :param queue_name: 要监听的队列名称(与生产者中定义的队列一致) :param consumer_label: 消费者标签(用于区分不同类型的消息处理器) """ # 1. 建立与 RabbitMQ 的连接(robust 模式自动重连,提升稳定性) connection = await aio_pika.connect_robust(RABBITMQ_URI) # 2. 创建通信信道(所有操作通过信道执行,减少 TCP 连接开销) channel = await connection.channel() # 3. 开启公平调度:确保消费者处理完 1 条消息后,再接收下 1 条 # 避免「快消费者空闲、慢消费者堆积」的不均衡问题 await channel.set_qos(prefetch_count=1) # 4. 声明要监听的队列(与生产者中 queue_bindings 定义的队列完全一致) # 若队列不存在(未执行 setup_direct_exchange),会报错提醒初始化 queue = await channel.declare_queue( queue_name, durable=True, # 与生产者一致:队列持久化(重启不丢失) auto_delete=False # 队列不自动删除(即使无消费者也保留) ) # 5. 定义消息处理逻辑(核心:根据队列类型处理对应级别消息) async def on_message_received(message: aio_pika.IncomingMessage): # async with 上下文:自动完成消息确认(处理完后告知 RabbitMQ 删除消息) # 若处理过程中崩溃,消息会重新回到队列,避免丢失 async with message.process(): # 解码消息体(生产者用 utf-8 编码,此处对应解码) message_content = message.body.decode("utf-8") # Print key information (for debugging and log tracking) print(f"[{consumer_label} Consumer] Received message:") print(f" Queue name: {queue_name}") print(f" Message content: {message_content}") print(f" Message routing key: {message.routing_key}") # Verify routing key match print(f" Processing time: {asyncio.get_running_loop().time():.2f}s\n") # 模拟不同级别消息的处理耗时(业务场景可替换为实际逻辑) if "error" in queue_name: # 错误消息:可能需要重试、告警,耗时更长 await asyncio.sleep(2) elif "warning" in queue_name: # 警告消息:可能需要记录日志、轻量处理 await asyncio.sleep(1) elif "info" in queue_name: # 信息/调试消息:快速处理,仅记录 await asyncio.sleep(0.5) # 6. 启动队列监听:将消息处理函数绑定到队列 consumer_tag = f"direct_{consumer_label.lower().replace(' ', '_')}_{queue_name}" await queue.consume(on_message_received, consumer_tag=consumer_tag) # Print startup log, indicating consumer is ready print(f"[{consumer_label} Consumer] Started, listening to queue: {queue_name} (tag: {consumer_tag})\n") # 7. 保持消费者运行(无限期阻塞,直到手动停止程序) # 若不阻塞,协程会立即结束,消费者会断开连接 await asyncio.Future() async def start_all_direct_consumers(queue_prefix="demo.direct.queue-"): """ 启动所有 Direct Exchange 对应的消费者 与生产者 setup_direct_exchange 中的 queue_bindings 完全对应 """ # Define list of consumers to start (queue name + consumer label) consumers = [ # Error queue: handles messages with routing key "error" direct_consumer(f"{queue_prefix}error", "Error Level"), # Warning queue: handles messages with routing key "warning" direct_consumer(f"{queue_prefix}warning", "Warning Level"), # Info queue: handles messages with routing keys "info" and "debug" direct_consumer(f"{queue_prefix}info", "Info/Debug Level") ] # 同时启动所有消费者(并发运行,互不阻塞) await asyncio.gather(*consumers) if __name__ == "__main__": # 启动所有消费者(需先执行 setup_direct_exchange 初始化队列) asyncio.run(start_all_direct_consumers())