## 1. MQ 的基本概念 ### 1.1 什么是MQ? MQ全称为Message Queue即消息队列 - "消息队列" 是在消息的传输过程中保存消息的容器 - 它是典型的生产者——消费者模型:生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息 - 这样的好处:生产者只需要关注发消息,消费者只需要关注收消息,二者没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦 ### 1.2 为什么要使用 MQ? 消息队列(Message Queue)是一种在分布式系统中用于异步通信的中间件,核心作用是通过**存储-转发**机制实现消息的异步传递,从而解决系统间耦合、流量削峰、异步处理等问题。 #### 主要作用: **1. 解耦系统组件** - 传统系统中,组件间通常直接调用(如 A 服务直接调用 B 服务),导致强耦合 - 引入消息队列后,A 服务只需向队列发送消息,无需关心谁接收或处理;B 服务从队列中获取消息即可 - 双方通过消息格式约定通信,互不依赖,降低了系统耦合度 **2. 异步处理,提高效率** - 同步处理场景中,一个操作可能需要等待多个服务依次完成,总耗时是各步骤之和 - 消息队列支持异步处理:主流程完成后,只需发送消息到队列,无需等待后续步骤完成即可返回结果 - 其他服务异步从队列获取消息并处理,显著提升系统响应速度和吞吐量 **3. 流量削峰,保护系统** - 突发流量(如电商秒杀、直播带货)可能瞬间压垮后端服务 - 消息队列可作为"缓冲池":高峰期请求先进入队列,后端服务按自身处理能力从队列中消费消息 **4. 数据同步与分发** - 同一消息可被多个消费者消费,实现"一次发送,多端处理" - 跨系统数据同步,通过消息队列确保数据一致性 **5. 重试与容错** - 若消费者服务临时故障,消息队列会保留消息,待服务恢复后重新投递 - 配合重试机制,可解决网络波动、服务暂时不可用等问题 --- ## 2. RabbitMQ ### 2.1 介绍 RabbitMQ 是一个开源的消息代理软件(也可称为消息队列中间件),由 Erlang 语言编写,在分布式系统中用于实现应用程序之间的异步通信和解耦。 #### 特点: - **多种协议支持**:除了 AMQP,RabbitMQ 还支持 STOMP、MQTT 等多种消息协议 - **高可靠性**:通过消息持久化、集群、镜像队列等机制,保证消息不丢失 - **灵活的路由机制**:交换器提供了丰富的消息路由规则 - **多语言客户端支持**:提供了多种编程语言的客户端库 - **管理界面友好**:具备一个可视化的管理界面 ### 2.2 核心组件 ![img_6.png](assert/img_6.png) ``` 生产者 → 信道 → 交换器(Exchange) → 队列(Queue) → 信道 → 消费者 ``` #### 核心组件详解: **1. 生产者(Producer)** - 定义:发送消息的应用程序或服务 - 作用:将业务数据封装为消息,发送到 RabbitMQ 服务器 - 特点:无需关心消息的最终接收者,只需指定消息发送到哪个交换器 **2. 消费者(Consumer)** - 定义:接收并处理消息的应用程序或服务 - 作用:持续监听队列,当有消息到达时,从队列中获取消息并进行业务处理 - 特点:消费者与队列绑定,可通过自动确认或手动确认机制告知 RabbitMQ 消息是否处理完成 **3. 队列(Queue)** - 定义:存储消息的容器,是消息的最终落脚点 - 核心属性: - 名称:队列的唯一标识 - 持久化(Durable):若为 true,队列会在 RabbitMQ 重启后保留 - 排他性(Exclusive):若为 true,队列仅对当前连接可见 - 自动删除(Auto-delete):若为 true,当最后一个消费者断开连接后,队列自动删除 **4. 交换器(Exchange)** - 定义:接收生产者发送的消息,并根据路由规则将消息转发到一个或多个队列 - 作用:类似于"路由器",负责消息的路由逻辑 - 类型: - **直连交换器(Direct Exchange)**:消息的 Routing Key 与队列绑定的 Binding Key 完全匹配 - **主题交换器(Topic Exchange)**:支持通配符(* 匹配单个单词,# 匹配多个单词) - **扇出交换器(Fanout Exchange)**:忽略 Routing Key,将消息广播到所有绑定的队列 - **头交换器(Headers Exchange)**:根据消息属性(Headers)而非 Routing Key 匹配 **5. 绑定(Binding)** - 定义:交换器与队列之间的关联关系,包含路由规则 - 作用:告诉交换器"哪些队列需要接收什么样的消息" **6. 连接(Connection)** - 定义:生产者/消费者与 RabbitMQ 服务器之间的 TCP 连接 - 特点:建立 TCP 连接开销较大,因此通常会复用连接 **7. 信道(Channel)** - 定义:建立在 TCP 连接之上的虚拟连接,是消息传递的实际通道 - 作用:减少 TCP 连接数量,降低服务器资源消耗 ### 2.3 安装RabbitMQ #### 带Management页面 ```bash docker run -d --name rabbitmq -p 5673:5672 -p 15673:15672 rabbitmq:management ``` #### 不带Management页面 ```bash docker run -d --name rabbitmq -p 5673:5672 -p 15673:15672 rabbitmq ``` **访问管理页面**:http://localhost:15673 - 账号:guest - 密码:guest ### 2.4 RabbitMQ管理页面 管理页面提供了丰富的监控和管理功能: - **Overview**:服务器概览,包括连接数、队列数、消息速率等 - **Connections**:显示所有客户端连接信息 - **Channels**:显示各个连接的信道信息 - **Exchanges**:管理交换器 - **Queues**:管理队列 - **Admin**:用户和权限管理 --- ## 3. Python集成 基于python aio-pika库进行集成 仓库地址:https://gitea.freeleaps.mathmast.com/icecheng/rabbitmq-test ### 3.1 Fanout Exchange Demo #### 核心特点: - 忽略路由键(Routing Key),无论设置什么值都会被忽略 - 消息会广播到所有与之绑定的队列 - 适合需要 "一对多" 通知的场景(如系统通知、日志广播) #### 架构关系图: ![img_1.png](assert/img_1.png) ```mermaid graph TD P[Producer
fanout_publish] --> E[Fanout Exchange
demo.fanout] E --> Q1[Queue
demo.fanout.queue-0] E --> Q2[Queue
demo.fanout.queue-1] E --> Q3[Queue
demo.fanout.queue-2] Q1 --> C1[Consumer 1
fanout_consumer_1] Q2 --> C2[Consumer 2
fanout_consumer_2] Q3 --> C3[Consumer 3
fanout_consumer_3] style P fill:#e1f5fe style E fill:#f3e5f5 style Q1 fill:#e8f5e8 style Q2 fill:#e8f5e8 style Q3 fill:#e8f5e8 style C1 fill:#fff3e0 style C2 fill:#fff3e0 style C3 fill:#fff3e0 ``` #### 生产者代码: ```python import asyncio import aio_pika from config import RABBITMQ_URI async def setup_fanout_exchange(exchange_name="demo.fanout", queue_name_prefix="demo.fanout.queue-"): # 建立连接 connection = await aio_pika.connect_robust(RABBITMQ_URI) channel = await connection.channel() # 1. 声明 Fanout 类型交换器 fanout_exchange = await channel.declare_exchange( exchange_name, aio_pika.ExchangeType.FANOUT, durable=True # 交换器持久化 ) # 2. 定义需要绑定的队列名称列表 queue_names = [queue_name_prefix + str(i) for i in range(3)] # 3. 循环创建队列并绑定到交换器 for name in queue_names: queue = await channel.declare_queue( name, durable=True, auto_delete=False ) # 绑定队列到 Fanout 交换器(忽略路由键) await queue.bind(fanout_exchange, routing_key="") async def fanout_publish(message: str = "", exchange_name: str = "demo.fanout"): connection = await aio_pika.connect_robust(RABBITMQ_URI) channel = await connection.channel() fanout_exchange = await channel.declare_exchange( exchange_name, aio_pika.ExchangeType.FANOUT, durable=True ) message = aio_pika.Message( body=message.encode("utf-8"), delivery_mode=aio_pika.DeliveryMode.PERSISTENT # 消息持久化 ) # 发送消息到Fanout交换器 await fanout_exchange.publish(message, routing_key="") await connection.close() ``` #### 消费者代码: ```python import asyncio import aio_pika from config import RABBITMQ_URI async def fanout_consumer(queue_name: str, consumer_id: int): connection = await aio_pika.connect_robust(RABBITMQ_URI) channel = await connection.channel() await channel.set_qos(prefetch_count=1) queue = await channel.declare_queue( queue_name, durable=True, auto_delete=False ) async def on_message_received(message: aio_pika.IncomingMessage): async with message.process(): message_content = message.body.decode("utf-8") print(f"[Fanout Consumer {consumer_id}] Received broadcast message:") print(f" Listening queue: {queue_name}") print(f" Message content: {message_content}") print(f" Message persistence: {'Yes' if message.delivery_mode == 2 else 'No'}") await asyncio.sleep(1) consumer_tag = f"fanout_consumer_{consumer_id}_{queue_name}" await queue.consume(on_message_received, consumer_tag=consumer_tag) print(f"[Fanout Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})") await asyncio.Future() ``` #### 完整测试代码 **测试运行脚本:** ```python #!/usr/bin/env python3 """ Run Fanout Exchange Test """ import asyncio import sys import os # Add current directory to Python path sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from product.fanout_publish import fanout_publish, setup_fanout_exchange from comsumer.fanout_consumer import start_all_fanout_consumers async def run_fanout_test(): """Run fanout exchange test with producer and consumer""" print("=== Running Fanout Exchange Test ===") # Start consumer in background consumer_task = asyncio.create_task(start_all_fanout_consumers()) # Wait for consumer to start await asyncio.sleep(1) # Setup and publish messages await setup_fanout_exchange("demo.fanout", "demo.fanout.queue-") await fanout_publish(message="hello world", exchange_name="demo.fanout") await fanout_publish(message="test message 2", exchange_name="demo.fanout") await fanout_publish(message="test message 3", exchange_name="demo.fanout") # Wait for messages to be processed await asyncio.sleep(3) # Cancel consumer consumer_task.cancel() print("✅ Fanout test completed successfully!") if __name__ == "__main__": asyncio.run(run_fanout_test()) ``` #### 测试输出: ``` === Running Fanout Exchange Test === [Fanout Consumer 2] Started, listening to queue: demo.fanout.queue-1 (tag: fanout_consumer_2_demo.fanout.queue-1) [Fanout Consumer 1] Started, listening to queue: demo.fanout.queue-0 (tag: fanout_consumer_1_demo.fanout.queue-0) [Fanout Consumer 3] Started, listening to queue: demo.fanout.queue-2 (tag: fanout_consumer_3_demo.fanout.queue-2) [Fanout Consumer 2] Received broadcast message: Listening queue: demo.fanout.queue-1 Message content: hello world Message persistence: Yes [Fanout Consumer 1] Received broadcast message: Listening queue: demo.fanout.queue-0 Message content: hello world Message persistence: Yes [Fanout Consumer 3] Received broadcast message: Listening queue: demo.fanout.queue-2 Message content: hello world Message persistence: Yes [Fanout Consumer 3] Received broadcast message: Listening queue: demo.fanout.queue-2 Message content: test message 2 Message persistence: Yes [Fanout Consumer 1] Received broadcast message: Listening queue: demo.fanout.queue-0 Message content: test message 2 Message persistence: Yes [Fanout Consumer 2] Received broadcast message: Listening queue: demo.fanout.queue-1 Message content: test message 2 Message persistence: Yes [Fanout Consumer 1] Received broadcast message: Listening queue: demo.fanout.queue-0 Message content: test message 3 Message persistence: Yes [Fanout Consumer 2] Received broadcast message: Listening queue: demo.fanout.queue-1 Message content: test message 3 Message persistence: Yes [Fanout Consumer 3] Received broadcast message: Listening queue: demo.fanout.queue-2 Message content: test message 3 Message persistence: Yes ✅ Fanout test completed successfully! ``` ### 3.2 Direct Exchange Demo #### 核心特点: - 基于路由键(routing key)与绑定键(binding key)的精确匹配 - 只有当消息的路由键与队列的绑定键完全一致时,消息才会被路由到该队列 - 适合需要精准路由的场景(如日志级别区分:error、warning、info 分别路由到不同队列) #### 架构关系图: ```mermaid graph TD P[Producer
direct_publish] --> E[Direct Exchange
demo.direct] E -->|routing_key: error| Q1[Queue
demo.direct.queue-error] E -->|routing_key: warning| Q2[Queue
demo.direct.queue-warning] E -->|routing_key: info| Q3[Queue
demo.direct.queue-info] E -->|routing_key: debug| Q3 Q1 --> C1[Error Level Consumer
direct_error_level] Q2 --> C2[Warning Level Consumer
direct_warning_level] Q3 --> C3[Info/Debug Level Consumer
direct_info/debug_level] style P fill:#e1f5fe style E fill:#f3e5f5 style Q1 fill:#ffebee style Q2 fill:#fff3e0 style Q3 fill:#e8f5e8 style C1 fill:#ffebee style C2 fill:#fff3e0 style C3 fill:#e8f5e8 ``` #### 生产者代码: ```python async def setup_direct_exchange(exchange_name="demo.direct", queue_prefix="demo.direct.queue-"): connection = await aio_pika.connect_robust(RABBITMQ_URI) channel = await connection.channel() direct_exchange = await channel.declare_exchange( exchange_name, aio_pika.ExchangeType.DIRECT, durable=True ) # 定义队列及对应的绑定键 queue_bindings = [ (f"{queue_prefix}error", ["error"]), # 处理错误级别的消息 (f"{queue_prefix}warning", ["warning"]), # 处理警告级别的消息 (f"{queue_prefix}info", ["info", "debug"]) # 处理信息和调试级别的消息 ] for queue_name, binding_keys in queue_bindings: queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False) for binding_key in binding_keys: await queue.bind(direct_exchange, routing_key=binding_key) print(f"Queue {queue_name} bound to routing keys: {binding_keys}") async def direct_publish(message: str, routing_key: str, exchange_name: str = "demo.direct"): connection = await aio_pika.connect_robust(RABBITMQ_URI) channel = await connection.channel() exchange = await channel.declare_exchange( exchange_name, aio_pika.ExchangeType.DIRECT, durable=True ) message_obj = aio_pika.Message( body=message.encode("utf-8"), delivery_mode=aio_pika.DeliveryMode.PERSISTENT ) await exchange.publish(message_obj, routing_key=routing_key) print(f"Message sent: {message} (routing key: {routing_key})") ``` #### 消费者代码: ```python async def direct_consumer(queue_name: str, consumer_label: str): connection = await aio_pika.connect_robust(RABBITMQ_URI) channel = await connection.channel() await channel.set_qos(prefetch_count=1) queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False) async def on_message_received(message: aio_pika.IncomingMessage): async with message.process(): message_content = message.body.decode("utf-8") print(f"[{consumer_label} Consumer] Received message:") print(f" Queue name: {queue_name}") print(f" Message content: {message_content}") print(f" Message routing key: {message.routing_key}") print(f" Processing time: {asyncio.get_running_loop().time():.2f}s") # 模拟不同级别消息的处理耗时 if "error" in queue_name: await asyncio.sleep(2) elif "warning" in queue_name: await asyncio.sleep(1) elif "info" in queue_name: await asyncio.sleep(0.5) consumer_tag = f"direct_{consumer_label.lower().replace(' ', '_')}_{queue_name}" await queue.consume(on_message_received, consumer_tag=consumer_tag) print(f"[{consumer_label} Consumer] Started, listening to queue: {queue_name} (tag: {consumer_tag})") await asyncio.Future() ``` #### 完整测试代码 **测试运行脚本:** ```python #!/usr/bin/env python3 """ Run Direct Exchange Test """ import asyncio import sys import os # Add current directory to Python path sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from product.direct_publish import setup_direct_exchange, direct_publish from comsumer.direct_consumer import start_all_direct_consumers async def run_direct_exchange_test(): """Run direct exchange test with producer and consumer""" print("=== Running Direct Exchange Test ===") # Start consumer in background consumer_task = asyncio.create_task(start_all_direct_consumers()) # Wait for consumer to start await asyncio.sleep(1) # Setup exchange and publish messages await setup_direct_exchange() test_messages = [ ("System crash, unable to start", "error"), # Route to error queue ("Disk space insufficient", "warning"), # Route to warning queue ("User login successful", "info"), # Route to info queue ("Debug info: Database connection successful", "debug") # Route to info queue ] for msg, routing_key in test_messages: await direct_publish(msg, routing_key) await asyncio.sleep(0.5) # Wait for messages to be processed await asyncio.sleep(3) # Cancel consumer consumer_task.cancel() print("✅ Direct exchange test completed successfully!") if __name__ == "__main__": asyncio.run(run_direct_exchange_test()) ``` #### 测试输出: ``` === Running Direct Exchange Test === [Info/Debug Level Consumer] Started, listening to queue: demo.direct.queue-info (tag: direct_info/debug_level_demo.direct.queue-info) [Warning Level Consumer] Started, listening to queue: demo.direct.queue-warning (tag: direct_warning_level_demo.direct.queue-warning) [Error Level Consumer] Started, listening to queue: demo.direct.queue-error (tag: direct_error_level_demo.direct.queue-error) Queue demo.direct.queue-error bound to routing keys: ['error'] Queue demo.direct.queue-warning bound to routing keys: ['warning'] Queue demo.direct.queue-info bound to routing keys: ['info', 'debug'] [Error Level Consumer] Received message: Queue name: demo.direct.queue-error Message content: System crash, unable to start Message routing key: error Processing time: 322774.03s Message sent: System crash, unable to start (routing key: error) [Warning Level Consumer] Received message: Queue name: demo.direct.queue-warning Message content: Disk space insufficient Message routing key: warning Processing time: 322774.54s Message sent: Disk space insufficient (routing key: warning) [Info/Debug Level Consumer] Received message: Queue name: demo.direct.queue-info Message content: User login successful Message routing key: info Processing time: 322775.06s Message sent: User login successful (routing key: info) [Info/Debug Level Consumer] Received message: Queue name: demo.direct.queue-info Message content: Debug info: Database connection successful Message routing key: debug Processing time: 322775.57s Message sent: Debug info: Database connection successful (routing key: debug) ✅ Direct exchange test completed successfully! ``` ### 3.3 Direct Exchange Demo (负载均衡) #### 实现原理: 1. 创建多个队列:每个队列绑定到同一个 Direct Exchange,但使用不同的路由键 2. 生产者路由策略:通过轮询、随机或按消息特征哈希的方式,选择一个路由键 3. 消费者处理:每个队列对应一个或多个消费者,各自处理分配到的消息 #### 架构关系图: ```mermaid graph TD P[BalancedProducer
轮询发送] --> E[Direct Exchange
demo.direct.multi.queue] E -->|routing_key: route.1| Q1[Queue
task.queue.1] E -->|routing_key: route.2| Q2[Queue
task.queue.2] E -->|routing_key: route.3| Q3[Queue
task.queue.3] Q1 --> C1[Consumer 1
multi_consumer_1] Q2 --> C2[Consumer 2
multi_consumer_2] Q3 --> C3[Consumer 3
multi_consumer_3] P -.->|轮询算法| P style P fill:#e1f5fe style E fill:#f3e5f5 style Q1 fill:#e8f5e8 style Q2 fill:#e8f5e8 style Q3 fill:#e8f5e8 style C1 fill:#fff3e0 style C2 fill:#fff3e0 style C3 fill:#fff3e0 ``` #### 生产者代码: ```python class BalancedProducer: def __init__(self, exchange_name="demo.direct.multi.queue", queue_count=3): self.exchange_name = exchange_name self.queue_count = queue_count self.current_index = 0 # 轮询索引 async def connect(self): self.connection = await aio_pika.connect_robust(RABBITMQ_URI) self.channel = await self.connection.channel() self.exchange = await self.channel.declare_exchange( self.exchange_name, aio_pika.ExchangeType.DIRECT, durable=True ) async def publish(self, message: str): # 轮询算法:每次发送后切换到下一个路由键 self.current_index = (self.current_index + 1) % self.queue_count route_key = f"route.{self.current_index + 1}" message_obj = aio_pika.Message( body=message.encode("utf-8"), delivery_mode=aio_pika.DeliveryMode.PERSISTENT ) await self.exchange.publish(message_obj, routing_key=route_key) print(f"Message sent: {message} (routed to {route_key})") ``` #### 消费者代码: ```python async def queue_consumer(queue_name: str, consumer_id: int): connection = await aio_pika.connect_robust(RABBITMQ_URI) channel = await connection.channel() await channel.set_qos(prefetch_count=1) queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False) async def on_message(message: aio_pika.IncomingMessage): async with message.process(): content = message.body.decode("utf-8") print(f"[Consumer {consumer_id}] Processing message: {content}") print(f"[Consumer {consumer_id}] From queue: {queue_name}") print(f"[Consumer {consumer_id}] Routing key: {message.routing_key}") await asyncio.sleep(1) consumer_tag = f"multi_consumer_{consumer_id}_{queue_name}" await queue.consume(on_message, consumer_tag=consumer_tag) print(f"[Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})") await asyncio.Future() ``` #### 完整测试代码 **测试运行脚本:** ```python #!/usr/bin/env python3 """ Run Multi-Queue Load Balancing Test """ import asyncio import sys import os # Add current directory to Python path sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from product.direct_multi_publish import setup_multi_queue_balance, BalancedProducer from comsumer.direct_multi_consumer import start_balanced_consumers async def run_multi_queue_balance_test(): """Run multi-queue load balancing test with producer and consumer""" print("=== Running Multi-Queue Load Balancing Test ===") queue_count = 3 # Start consumer in background consumer_task = asyncio.create_task(start_balanced_consumers(queue_count=queue_count)) # Wait for consumer to start await asyncio.sleep(1) # Setup and publish messages await setup_multi_queue_balance(queue_count=queue_count) producer = BalancedProducer(queue_count=queue_count) await producer.connect() for i in range(10): await producer.publish(f"Task {i + 1}: Multi-queue load balancing test") await asyncio.sleep(0.3) await producer.close() # Wait for messages to be processed await asyncio.sleep(3) # Cancel consumer consumer_task.cancel() print("✅ Multi-queue load balancing test completed successfully!") if __name__ == "__main__": asyncio.run(run_multi_queue_balance_test()) ``` #### 测试输出: ``` === Running Multi-Queue Load Balancing Test === [Consumer 2] Started, listening to queue: task.queue.2 (tag: multi_consumer_2_task.queue.2) [Consumer 3] Started, listening to queue: task.queue.3 (tag: multi_consumer_3_task.queue.3) [Consumer 1] Started, listening to queue: task.queue.1 (tag: multi_consumer_1_task.queue.1) Queue task.queue.1 bound to routing key: route.1 Queue task.queue.2 bound to routing key: route.2 Queue task.queue.3 bound to routing key: route.3 [Consumer 2] Processing message: Task 1: Multi-queue load balancing test [Consumer 2] From queue: task.queue.2 [Consumer 2] Routing key: route.2 Message sent: Task 1: Multi-queue load balancing test (routed to route.2) [Consumer 3] Processing message: Task 2: Multi-queue load balancing test [Consumer 3] From queue: task.queue.3 [Consumer 3] Routing key: route.3 Message sent: Task 2: Multi-queue load balancing test (routed to route.3) [Consumer 1] Processing message: Task 3: Multi-queue load balancing test [Consumer 1] From queue: task.queue.1 [Consumer 1] Routing key: route.1 Message sent: Task 3: Multi-queue load balancing test (routed to route.1) Message sent: Task 4: Multi-queue load balancing test (routed to route.2) [Consumer 2] Processing message: Task 4: Multi-queue load balancing test [Consumer 2] From queue: task.queue.2 [Consumer 2] Routing key: route.2 Message sent: Task 5: Multi-queue load balancing test (routed to route.3) [Consumer 3] Processing message: Task 5: Multi-queue load balancing test [Consumer 3] From queue: task.queue.3 [Consumer 3] Routing key: route.3 Message sent: Task 6: Multi-queue load balancing test (routed to route.1) [Consumer 1] Processing message: Task 6: Multi-queue load balancing test [Consumer 1] From queue: task.queue.1 [Consumer 1] Routing key: route.1 Message sent: Task 7: Multi-queue load balancing test (routed to route.2) [Consumer 2] Processing message: Task 7: Multi-queue load balancing test [Consumer 2] From queue: task.queue.2 [Consumer 2] Routing key: route.2 Message sent: Task 8: Multi-queue load balancing test (routed to route.3) [Consumer 3] Processing message: Task 8: Multi-queue load balancing test [Consumer 3] From queue: task.queue.3 [Consumer 3] Routing key: route.3 Message sent: Task 9: Multi-queue load balancing test (routed to route.1) [Consumer 1] Processing message: Task 9: Multi-queue load balancing test [Consumer 1] From queue: task.queue.1 [Consumer 1] Routing key: route.1 Message sent: Task 10: Multi-queue load balancing test (routed to route.2) [Consumer 2] Processing message: Task 10: Multi-queue load balancing test [Consumer 2] From queue: task.queue.2 [Consumer 2] Routing key: route.2 ✅ Multi-queue load balancing test completed successfully! ``` ### 3.4 Topic Exchange Demo #### 核心特性: - 路由键采用层级化字符串(用 . 分隔,如 order.create.user) - 支持两种通配符: - `*`:匹配1 个层级(如 user.* 可匹配 user.login 但不匹配 user.login.success) - `#`:匹配0 个或多个层级(如 order.# 可匹配 order、order.pay、order.pay.success) #### 架构关系图: ```mermaid graph TD P[Producer
topic_publish] --> E[Topic Exchange
demo.topic] E -->|binding: #.critical| Q1[Queue
demo.topic.queue-critical] E -->|binding: order.#| Q2[Queue
demo.topic.queue-order] E -->|binding: user.login.*| Q3[Queue
demo.topic.queue-user.login] Q1 --> C1[CriticalHandler
topic_CriticalHandler] Q2 --> C2[OrderHandler
topic_OrderHandler] Q3 --> C3[UserLoginHandler
topic_UserLoginHandler] subgraph "路由键示例" R1[order.create.critical] -.->|匹配 #.critical 和 order.#| Q1 R1 -.-> Q2 R2[user.login.success] -.->|匹配 user.login.*| Q3 R3[system.log.info] -.->|无匹配| X[消息丢弃] end style P fill:#e1f5fe style E fill:#f3e5f5 style Q1 fill:#ffebee style Q2 fill:#e8f5e8 style Q3 fill:#fff3e0 style C1 fill:#ffebee style C2 fill:#e8f5e8 style C3 fill:#fff3e0 style X fill:#f5f5f5 ``` #### 生产者代码: ```python async def setup_topic_exchange(exchange_name="demo.topic", queue_prefix="demo.topic.queue-"): connection = await aio_pika.connect_robust(RABBITMQ_URI) channel = await connection.channel() topic_exchange = await channel.declare_exchange( exchange_name, aio_pika.ExchangeType.TOPIC, durable=True ) # 定义队列及对应的绑定键(支持通配符) queue_bindings = [ (f"{queue_prefix}critical", ["#.critical"]), # 匹配任意前缀+critical (f"{queue_prefix}order", ["order.#"]), # 匹配所有order开头的路由键 (f"{queue_prefix}user.login", ["user.login.*"]) # 匹配user.login+1个后缀 ] for queue_name, binding_keys in queue_bindings: queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False) for binding_key in binding_keys: await queue.bind(topic_exchange, routing_key=binding_key) print(f"Queue {queue_name} bound to routing keys: {binding_keys}") async def topic_publish(message: str, routing_key: str, exchange_name: str = "demo.topic"): connection = await aio_pika.connect_robust(RABBITMQ_URI) channel = await connection.channel() exchange = await channel.declare_exchange( exchange_name, aio_pika.ExchangeType.TOPIC, durable=True ) message_obj = aio_pika.Message( body=message.encode("utf-8"), delivery_mode=aio_pika.DeliveryMode.PERSISTENT ) await exchange.publish(message_obj, routing_key=routing_key) print(f"Message sent: {message} (routing key: {routing_key})") ``` #### 消费者代码: ```python async def topic_consumer(queue_name: str, consumer_id: str): connection = await aio_pika.connect_robust(RABBITMQ_URI) channel = await connection.channel() await channel.set_qos(prefetch_count=1) queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False) async def on_message(message: aio_pika.IncomingMessage): async with message.process(): message_content = message.body.decode("utf-8") print(f"[Consumer {consumer_id}] Received message: {message_content}") print(f"[Consumer {consumer_id}] Message routing key: {message.routing_key}") print(f"[Consumer {consumer_id}] From queue: {queue_name}") await asyncio.sleep(1) consumer_tag = f"topic_{consumer_id}_{queue_name}" await queue.consume(on_message, consumer_tag=consumer_tag) print(f"[Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})") await asyncio.Future() ``` #### 完整测试代码 **测试运行脚本:** ```python #!/usr/bin/env python3 """ Run Topic Exchange Test """ import asyncio import sys import os # Add current directory to Python path sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from product.topic_publish import setup_topic_exchange, topic_publish from comsumer.topic_consumer import start_all_topic_consumers async def run_topic_exchange_test(): """Run topic exchange test with producer and consumer""" print("=== Running Topic Exchange Test ===") # Start consumer in background consumer_task = asyncio.create_task(start_all_topic_consumers()) # Wait for consumer to start await asyncio.sleep(1) # Setup exchange and publish messages await setup_topic_exchange() test_messages = [ ("Order creation failed (critical error)", "order.create.critical"), ("User login successful", "user.login.success"), ("Order payment completed", "order.pay.success"), ("System crash (critical error)", "system.crash.critical"), ("User login failed", "user.login.failed"), ("Normal system log", "system.log.info") # Won't match any binding key, will be discarded ] for msg, routing_key in test_messages: await topic_publish(msg, routing_key) await asyncio.sleep(0.5) # Wait for messages to be processed await asyncio.sleep(3) # Cancel consumer consumer_task.cancel() print("✅ Topic exchange test completed successfully!") if __name__ == "__main__": asyncio.run(run_topic_exchange_test()) ``` #### 测试输出: ``` === Running Topic Exchange Test === [Consumer CriticalHandler] Started, listening to queue: demo.topic.queue-critical (tag: topic_CriticalHandler_demo.topic.queue-critical) [Consumer UserLoginHandler] Started, listening to queue: demo.topic.queue-user.login (tag: topic_UserLoginHandler_demo.topic.queue-user.login) [Consumer OrderHandler] Started, listening to queue: demo.topic.queue-order (tag: topic_OrderHandler_demo.topic.queue-order) Queue demo.topic.queue-critical bound to routing keys: ['#.critical'] Queue demo.topic.queue-order bound to routing keys: ['order.#'] Queue demo.topic.queue-user.login bound to routing keys: ['user.login.*'] [Consumer OrderHandler] Received message: Order creation failed (critical error) [Consumer OrderHandler] Message routing key: order.create.critical [Consumer OrderHandler] From queue: demo.topic.queue-order [Consumer CriticalHandler] Received message: Order creation failed (critical error) [Consumer CriticalHandler] Message routing key: order.create.critical [Consumer CriticalHandler] From queue: demo.topic.queue-critical Message sent: Order creation failed (critical error) (routing key: order.create.critical) [Consumer UserLoginHandler] Received message: User login successful [Consumer UserLoginHandler] Message routing key: user.login.success [Consumer UserLoginHandler] From queue: demo.topic.queue-user.login Message sent: User login successful (routing key: user.login.success) [Consumer OrderHandler] Received message: Order payment completed [Consumer OrderHandler] Message routing key: order.pay.success [Consumer OrderHandler] From queue: demo.topic.queue-order Message sent: Order payment completed (routing key: order.pay.success) [Consumer CriticalHandler] Received message: System crash (critical error) [Consumer CriticalHandler] Message routing key: system.crash.critical [Consumer CriticalHandler] From queue: demo.topic.queue-critical Message sent: System crash (critical error) (routing key: system.crash.critical) [Consumer UserLoginHandler] Received message: User login failed [Consumer UserLoginHandler] Message routing key: user.login.failed [Consumer UserLoginHandler] From queue: demo.topic.queue-user.login Message sent: User login failed (routing key: user.login.failed) Message sent: Normal system log (routing key: system.log.info) ✅ Topic exchange test completed successfully! ``` ### 3.5 可靠的RabbitMQ生产者消费者 #### 可靠性保证机制 要确保消息消费的可靠性,需要从以下几个方面入手: **1. 消息持久化** - 交换器持久化:`durable=True` - 队列持久化:`durable=True` - 消息持久化:`delivery_mode=PERSISTENT` **2. 消息确认机制** - 自动确认:`async with message.process()` - 手动确认:`message.ack()` / `message.nack()` - 确保消息处理完成后才确认 **3. 消息幂等性** - 使用消息ID去重 - 数据库记录已处理的消息ID - 防止重复处理相同消息 **4. 重试机制** - 可配置最大重试次数 - 消费者内部重试,避免消息重新入队 **5. 死信队列** - 处理失败消息的完整解决方案 - 自动创建死信交换器和队列 - 详细的错误信息记录 #### 关键代码实现 **可靠生产者代码:** ```python class ReliableProducer: """Reliable Message Producer""" def _generate_message_id(self, message_data: Dict[str, Any]) -> str: """Generate message ID for message""" message_type = message_data.get('type', '') content = message_data.get('content', '') # For duplicate_test type messages, generate fixed ID based on content if message_type == 'duplicate_test': import hashlib content_hash = hashlib.md5(content.encode('utf-8')).hexdigest() return f"duplicate_{content_hash[:8]}" else: return f"msg_{asyncio.get_running_loop().time()}" async def publish_reliable_message(self, message_data: Dict[str, Any]) -> bool: """Publish reliable message""" try: # Generate message ID message_id = self._generate_message_id(message_data) # Add message metadata message_data.update({ 'timestamp': datetime.now().isoformat(), 'message_id': message_id }) # Create persistent message message = aio_pika.Message( body=json.dumps(message_data, ensure_ascii=False).encode('utf-8'), delivery_mode=aio_pika.DeliveryMode.PERSISTENT, # Message persistence message_id=message_id, timestamp=datetime.now() ) # Send message and wait for confirmation await self.exchange.publish(message, routing_key="reliable") logger.info(f"[Producer] Message sent: {message_id}") return True except Exception as e: logger.error(f"[Producer] Failed to send message: {e}") return False ``` **可靠消费者代码:** ```python class ReliableConsumer: """Reliable Message Consumer""" def __init__(self, queue_name: Optional[str] = None, consumer_name: Optional[str] = None): self.processed_messages: Set[str] = set() # Store processed message IDs async def process_message(self, message: aio_pika.IncomingMessage): """Core message processing logic""" try: # Parse message message_data = json.loads(message.body.decode('utf-8')) message_id = message_data.get('message_id') # Check if message has been processed before (idempotency check) if message_id in self.processed_messages: logger.warning(f"[Consumer-{self.consumer_name}] 🚫 Duplicate message detected, skipping: {message_id}") await message.ack() return logger.info(f"[Consumer-{self.consumer_name}] Starting to process message: {message_id}") # Retry processing message directly success = await self.retry_process_message(message_data, message_id, 0) # Only record processed message ID after successful processing if success: self.processed_messages.add(message_id) await message.ack() logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed and acknowledged") else: # Processing failed, send to dead letter queue await self.send_to_dead_letter_queue(message, message_id, "Processing failed") await message.ack() except Exception as e: logger.error(f"[Consumer-{self.consumer_name}] Failed to process message: {e}") await message.ack() async def retry_process_message(self, message_data: Dict[str, Any], message_id: str, retry_count: int) -> bool: """Retry processing message directly""" max_retries = config.max_retries last_error = None for attempt in range(max_retries + 1): try: logger.info(f"[Consumer-{self.consumer_name}] Attempting to process message {message_id}, attempt {attempt + 1}") await self.message_handler(message_data) logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed successfully") return True except Exception as e: last_error = str(e) logger.warning(f"[Consumer-{self.consumer_name}] Message {message_id} failed on attempt {attempt + 1}: {e}") if attempt < max_retries: await asyncio.sleep(1) else: logger.error(f"[Consumer-{self.consumer_name}] Message {message_id} failed after {max_retries} retries: {last_error}") return False ``` **死信队列消费者代码:** ```python class DeadLetterConsumer: """Dead Letter Queue Consumer""" async def process_dead_letter_message(self, message: aio_pika.IncomingMessage): """Process dead letter message""" try: # Parse dead letter message dead_letter_data = json.loads(message.body.decode('utf-8')) original_message = dead_letter_data.get('original_message', {}) error_info = dead_letter_data.get('error_info', 'Unknown') message_id = dead_letter_data.get('message_id', 'Unknown') # Print dead letter message information logger.error("=" * 50) logger.error("[Dead Letter Consumer] Received Dead Letter Message:") logger.error(f"[Dead Letter Consumer] Message ID: {message_id}") logger.error(f"[Dead Letter Consumer] Message Content: {json.dumps(original_message, ensure_ascii=False, indent=2)}") logger.error(f"[Dead Letter Consumer] Error Reason: {error_info}") logger.error("=" * 50) # Save to database await self.save_to_database(original_message, error_info, message_id) # Acknowledge dead letter message await message.ack() logger.info(f"[Dead Letter Consumer] Dead letter message {message_id} processed") except Exception as e: logger.error(f"[Dead Letter Consumer] Failed to process dead letter message: {e}") await message.nack(requeue=False) ``` #### 完整测试代码 **测试运行脚本:** ```python """ RabbitMQ 可靠消息传递测试模块 """ import asyncio import logging from reliable_mq import ReliableProducer, ReliableConsumer from reliable_mq.dead_letter_consumer import DeadLetterConsumer from reliable_mq.config import config # 配置日志 logging.basicConfig( level=getattr(logging, config.log_level), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) async def run_context_manager_messaging(): """使用上下文管理器测试可靠消息传递""" logger.info("=== 使用上下文管理器测试可靠消息传递 ===") # 使用异步上下文管理器 async with ReliableProducer() as producer: async with ReliableConsumer(consumer_name="context_test_consumer") as consumer: async with DeadLetterConsumer() as dead_letter_consumer: # 启动消费者(在后台运行) consumer_task = asyncio.create_task(consumer.start_consuming()) dead_letter_task = asyncio.create_task(dead_letter_consumer.start_consuming()) # 等待消费者启动 await asyncio.sleep(1) # 发送测试消息 test_messages = [ {"content": "重要业务消息1", "type": "business"}, {"content": "系统通知消息2", "type": "notification"}, {"content": "用户操作消息3", "type": "user_action"}, {"content": "重复消息测试", "type": "duplicate_test"}, {"content": "重复消息测试", "type": "duplicate_test"}, # 重复消息 {"content": "会失败的消息1", "type": "will_fail"}, # 这些消息会失败并进入死信队列 {"content": "会失败的消息2", "type": "will_fail"}, {"content": "会失败的消息3", "type": "will_fail"}, ] for msg in test_messages: await producer.publish_reliable_message(msg) await asyncio.sleep(0.5) # 等待消息处理完成 await asyncio.sleep(30) # 取消任务 consumer_task.cancel() dead_letter_task.cancel() if __name__ == '__main__': asyncio.run(run_context_manager_messaging()) ``` **配置文件:** ```python """ RabbitMQ 可靠消息传递配置模块 """ import os from dataclasses import dataclass from typing import Dict, Any @dataclass class Config: """配置类""" # RabbitMQ 连接配置 rabbitmq_uri: str = "amqp://guest:guest@localhost:5673/" # 交换器和队列配置 exchange_name: str = "reliable.exchange" queue_name: str = "reliable.queue" # 死信队列配置 dead_letter_exchange: str = "reliable.dead.letter.exchange" dead_letter_queue: str = "reliable.dead.letter.queue" # 重试配置 max_retries: int = 3 message_ttl: int = 300000 # 5分钟 # QoS 配置 prefetch_count: int = 1 # 日志配置 log_level: str = "INFO" def get_connection_config(self) -> Dict[str, Any]: """获取连接配置""" return { 'uri': self.rabbitmq_uri, 'prefetch_count': self.prefetch_count } def get_dead_letter_config(self) -> Dict[str, str]: """获取死信队列配置""" return { 'dead_letter_exchange': self.dead_letter_exchange, 'dead_letter_queue': self.dead_letter_queue } # 全局配置实例 config = Config() ``` **自定义消息处理函数:** ```python async def default_message_handler(message_data: Dict[str, Any]): """Default message handler function""" await asyncio.sleep(1) # Simulate processing time message_type = message_data.get('type', '') content = message_data.get('content', '') # Simulate business logic processing if message_type == 'will_fail': raise Exception(f"Simulated business processing failure: {content}") logger.info(f"[Consumer] Business logic processing completed: {content}") ``` #### 测试结果 ``` 2025-09-07 11:25:02,498 - __main__ - INFO - === 使用上下文管理器测试可靠消息传递 === 2025-09-07 11:25:02,509 - reliable_mq.reliable_producer - INFO - [Producer] Connected, queue: reliable.queue 2025-09-07 11:25:02,513 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Connected, listening to queue: reliable.queue 2025-09-07 11:25:02,518 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] Connected 2025-09-07 11:25:03,523 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Starting to process message: msg_323632.845174041 2025-09-07 11:25:03,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message content: {'content': '重要业务消息1', 'type': 'business', 'timestamp': '2025-09-07T11:25:03.519240', 'message_id': 'msg_323632.845174041'} 2025-09-07 11:25:03,524 - reliable_mq.reliable_producer - INFO - [Producer] Message sent: msg_323632.845174041 (type: business, content: 重要业务消息1) 2025-09-07 11:25:04,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Business logic processing completed: 重要业务消息1 2025-09-07 11:25:04,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message msg_323632.845174041 processed successfully 2025-09-07 11:25:04,525 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message msg_323632.845174041 processed and acknowledged 2025-09-07 11:25:04,525 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Current processed message count: 1 2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - ================================================== 2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] 🚫 Duplicate message detected, skipping: 2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Message ID: duplicate_090f7015 2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Message content: { "content": "重复消息测试", "type": "duplicate_test", "timestamp": "2025-09-07T11:25:05.546930", "message_id": "duplicate_090f7015" } 2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Total processed messages: 4 2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - ================================================== 2025-09-07 11:25:14,551 - reliable_mq.reliable_consumer - ERROR - [Consumer-context_test_consumer] Message msg_323635.377526708 failed after 3 retries: Simulated business processing failure: 会失败的消息1 2025-09-07 11:25:14,551 - reliable_mq.reliable_consumer - ERROR - [Consumer-context_test_consumer] Message sent to dead letter queue: msg_323635.377526708, error: Processing failed 2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - ================================================== 2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Received Dead Letter Message: 2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Message ID: msg_323635.377526708 2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Message Content: { "content": "会失败的消息1", "type": "will_fail", "timestamp": "2025-09-07T11:25:06.051557", "message_id": "msg_323635.377526708" } 2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Error Reason: Processing failed 2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - ================================================== 2025-09-07 11:25:15,064 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] 💾 Dead letter message saved to database: msg_323635.377526708 2025-09-07 11:25:15,064 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] Database Record: { "id": "msg_323635.377526708", "original_message": { "content": "会失败的消息1", "type": "will_fail", "timestamp": "2025-09-07T11:25:06.051557", "message_id": "msg_323635.377526708" }, "error_info": "Processing failed", "created_at": "2025-09-07T11:25:15.064341", "status": "failed" } 2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - ================================================== 2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Processed Message Statistics: 2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Total Processed: 4 2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Processed Message IDs: ['msg_323632.845174041', 'msg_323633.351571583', 'duplicate_090f7015', 'msg_323633.854272166'] 2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - ================================================== ``` #### 架构关系图 ![img_5.png](assert/img_5.png) ```mermaid graph TD P[ReliableProducer
可靠生产者] --> E[Reliable Exchange
reliable.exchange] E --> Q[Reliable Queue
reliable.queue] Q --> C[ReliableConsumer
可靠消费者] C -->|处理成功| ACK[Message ACK
消息确认] C -->|处理失败| RETRY[Retry Logic
重试机制] RETRY -->|重试成功| ACK RETRY -->|重试失败| DLQ[Dead Letter Queue
死信队列] DLQ --> DLC[DeadLetterConsumer
死信消费者] DLC --> DB[(Database
数据库)] subgraph "可靠性保证" PERSIST[Message Persistence
消息持久化] IDEMPOTENT[Idempotency Check
幂等性检查] CONFIRM[Publisher Confirmation
发布确认] end subgraph "消息流程" MSG1[正常消息] --> SUCCESS[处理成功] MSG2[重复消息] --> SKIP[跳过处理] MSG3[失败消息] --> FAIL[重试后失败] end style P fill:#e1f5fe style E fill:#f3e5f5 style Q fill:#e8f5e8 style C fill:#fff3e0 style DLQ fill:#ffebee style DLC fill:#ffebee style DB fill:#f3e5f5 style ACK fill:#e8f5e8 style RETRY fill:#fff3e0 ```