rabbitmq-test/README.md
2025-09-11 09:57:10 +08:00

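## 1. Basic Concepts of MQ
### 1.1 What Is MQ
MQ stands for Message Queue.
- A message queue is a container that holds messages while they are in transit
- It follows the classic producer-consumer model: producers keep publishing messages to the queue, and consumers keep fetching messages from it
- The benefit: producers only care about sending messages and consumers only care about receiving them; neither intrudes on the other's business logic, which decouples producers from consumers
### 1.2 Why Use MQ?
A message queue is middleware for asynchronous communication in distributed systems. Its core function is to deliver messages asynchronously through a **store-and-forward** mechanism, which addresses tight coupling between systems, traffic spikes, and slow synchronous processing.
#### Main benefits:
**1. Decoupling system components**
- In a traditional system, components usually call each other directly (for example, service A calls service B), which couples them tightly
- With a message queue, service A only publishes a message and does not need to know who receives or handles it; service B simply fetches messages from the queue
- The two sides depend only on an agreed message format, not on each other, which lowers coupling
**2. Asynchronous processing for better efficiency**
- In a synchronous flow, one operation may wait for several services to finish in turn, so the total latency is the sum of all steps
- With a message queue, the main flow publishes a message when it finishes and returns immediately, without waiting for the follow-up steps
- Other services fetch and handle the messages asynchronously, which improves response time and throughput
**3. Peak shaving to protect the system**
- A sudden burst of traffic (for example, a flash sale or a live-stream promotion) can overwhelm backend services in an instant
- The queue acts as a buffer: during the peak, requests enter the queue first, and backend services consume them at the rate they can sustain
**4. Data synchronization and distribution**
- One message can be consumed by several consumers: publish once, process in many places
- Cross-system data synchronization: propagating changes through the queue helps systems converge on consistent data
**5. Retry and fault tolerance**
- If a consumer service fails temporarily, the queue keeps the messages and redelivers them once the service recovers
- Combined with a retry mechanism, this handles network glitches and briefly unavailable services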
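
The decoupling and buffering described above can be illustrated without a broker. The following is a minimal in-process sketch that uses `asyncio.Queue` as a stand-in for a message queue; the names `producer`, `consumer`, and `run_demo` are illustrative and are not part of the repository.

```python
import asyncio


async def producer(queue: asyncio.Queue, count: int) -> None:
    """Publish a burst of messages without waiting for them to be handled."""
    for i in range(count):
        await queue.put(f"order-{i}")


async def consumer(queue: asyncio.Queue, handled: list) -> None:
    """Drain the queue at the consumer's own pace."""
    while True:
        message = await queue.get()
        await asyncio.sleep(0.01)  # simulated processing time
        handled.append(message)
        queue.task_done()


async def run_demo(count: int = 5) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    handled: list = []
    worker = asyncio.create_task(consumer(queue, handled))
    await producer(queue, count)  # returns as soon as the burst is buffered
    await queue.join()            # wait until every message has been processed
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return handled
```

The producer finishes as soon as its messages are buffered, while the consumer works through them at its own rate; a real broker adds persistence, routing, and delivery across processes on top of this idea.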
---
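## 2. RabbitMQ
### 2.1 Introduction
RabbitMQ is an open-source message broker (also called message-queue middleware) written in Erlang. In distributed systems it provides asynchronous communication and decoupling between applications.
#### Features:
- **Multi-protocol support**: besides AMQP, RabbitMQ supports other messaging protocols such as STOMP and MQTT
- **High reliability**: message persistence, clustering, and replicated queues (mirrored queues in older releases, quorum queues in current ones) protect against message loss
- **Flexible routing**: exchanges provide a rich set of message routing rules
- **Clients for many languages**: client libraries are available for many programming languages
- **Friendly management UI**: a web-based management interface is included
### 2.2 Core Components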
![img_6.png](assert/img_6.png)
```
Producer → Channel → Exchange → Queue → Channel → Consumer
```
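#### Core components in detail:
**1. Producer**
- Definition: an application or service that sends messages
- Role: wraps business data in a message and sends it to the RabbitMQ server
- Note: the producer does not need to know the final receiver; it only names the exchange to publish to
**2. Consumer**
- Definition: an application or service that receives and handles messages
- Role: listens on a queue and, when a message arrives, fetches it and runs the business logic
- Note: a consumer subscribes to a queue and tells RabbitMQ whether a message was handled, using automatic or manual acknowledgment
**3. Queue**
- Definition: the container that stores messages; a message's final destination inside the broker
- Core properties:
  - Name: the unique identifier of the queue
  - Durable: if true, the queue survives a RabbitMQ restart
  - Exclusive: if true, the queue is visible only to the connection that declared it
  - Auto-delete: if true, the queue is deleted once its last consumer disconnects
**4. Exchange**
- Definition: receives messages from producers and forwards them to one or more queues according to routing rules
- Role: acts like a router and owns the routing logic
- Types:
  - **Direct exchange**: delivers when the message's routing key exactly matches the queue's binding key
  - **Topic exchange**: supports wildcards (`*` matches exactly one word, `#` matches zero or more words)
  - **Fanout exchange**: ignores the routing key and broadcasts the message to every bound queue
  - **Headers exchange**: matches on message header attributes instead of the routing key
**5. Binding**
- Definition: the link between an exchange and a queue, carrying the routing rule
- Role: tells the exchange which queues should receive which messages
**6. Connection**
- Definition: the TCP connection between a producer or consumer and the RabbitMQ server
- Note: TCP connections are expensive to establish, so they are usually reused
**7. Channel**
- Definition: a virtual connection multiplexed over a TCP connection; the path messages actually travel on
- Role: reduces the number of TCP connections and lowers server resource usage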
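
To make the relationship between exchanges, bindings, and queues concrete, here is a toy in-memory model of direct and fanout routing. It is only a sketch of the routing rules described above, not RabbitMQ's implementation; the class name `ToyExchange` and the queue names in the usage note are made up for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class ToyExchange:
    """In-memory stand-in for an exchange (illustrative only)."""
    kind: str                                     # "direct" or "fanout"
    bindings: list = field(default_factory=list)  # (queue_name, binding_key) pairs

    def bind(self, queue_name: str, binding_key: str = "") -> None:
        self.bindings.append((queue_name, binding_key))

    def route(self, routing_key: str) -> list:
        """Return the queues that would receive a message with this routing key."""
        if self.kind == "fanout":
            matched = [q for q, _ in self.bindings]
        elif self.kind == "direct":
            matched = [q for q, key in self.bindings if key == routing_key]
        else:
            raise ValueError(f"unsupported exchange kind: {self.kind}")
        # A queue bound several times still receives a single copy.
        return sorted(set(matched))
```

For example, after `ex = ToyExchange("direct")`, `ex.bind("info", "info")`, and `ex.bind("info", "debug")`, both `ex.route("info")` and `ex.route("debug")` select the `info` queue, while an unbound key selects no queue at all.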
### 2.3 Installing RabbitMQ
#### With the management UI
```bash
docker run -d --name rabbitmq -p 5673:5672 -p 15673:15672 rabbitmq:management
```
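#### Without the management UI
```bash
# The plain image does not serve the management UI, so only the AMQP port is mapped
docker run -d --name rabbitmq -p 5673:5672 rabbitmq
```
**Management UI**: http://localhost:15673 (available only with the `rabbitmq:management` image)
- Username: guest
- Password: guest
### 2.4 RabbitMQ Management UI
The management UI offers monitoring and administration features:
- **Overview**: server summary, including connection count, queue count, and message rates
- **Connections**: lists all client connections
- **Channels**: lists the channels of each connection
- **Exchanges**: manage exchanges
- **Queues**: manage queues
- **Admin**: manage users and permissions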
---
## 3. Python Integration
The integration is built on the Python `aio-pika` library.
Repository: https://gitea.freeleaps.mathmast.com/icecheng/rabbitmq-test
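
The examples in this section import `RABBITMQ_URI` from a `config` module that this README does not show. The following is a hypothetical sketch of such a module: the environment variable names are assumptions, and the default port `5673` matches the Docker port mapping used earlier.

```python
import os
from urllib.parse import quote


def build_rabbitmq_uri(env=os.environ) -> str:
    """Build an AMQP URI from (assumed) environment variables."""
    # Percent-encode credentials so characters such as '@' stay valid in a URI
    user = quote(env.get("RABBITMQ_USER", "guest"), safe="")
    password = quote(env.get("RABBITMQ_PASSWORD", "guest"), safe="")
    host = env.get("RABBITMQ_HOST", "localhost")
    port = env.get("RABBITMQ_PORT", "5673")
    return f"amqp://{user}:{password}@{host}:{port}/"


RABBITMQ_URI = build_rabbitmq_uri()
```

Reading the settings from the environment keeps credentials out of the source tree; the repository's actual `config.py` may be organized differently.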
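### 3.1 Fanout Exchange Demo
#### Key characteristics:
- The routing key is ignored, whatever value is set
- Every message is broadcast to all queues bound to the exchange
- Suited to one-to-many notification scenarios (for example, system notifications or log broadcasting)
#### Architecture diagram: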
![img_1.png](assert/img_1.png)
```mermaid
graph TD
P[Producer<br/>fanout_publish] --> E[Fanout Exchange<br/>demo.fanout]
E --> Q1[Queue<br/>demo.fanout.queue-0]
E --> Q2[Queue<br/>demo.fanout.queue-1]
E --> Q3[Queue<br/>demo.fanout.queue-2]
Q1 --> C1[Consumer 1<br/>fanout_consumer_1]
Q2 --> C2[Consumer 2<br/>fanout_consumer_2]
Q3 --> C3[Consumer 3<br/>fanout_consumer_3]
style P fill:#e1f5fe
style E fill:#f3e5f5
style Q1 fill:#e8f5e8
style Q2 fill:#e8f5e8
style Q3 fill:#e8f5e8
style C1 fill:#fff3e0
style C2 fill:#fff3e0
style C3 fill:#fff3e0
```
#### Producer code:
```python
import asyncio
import aio_pika
from config import RABBITMQ_URI


async def setup_fanout_exchange(exchange_name="demo.fanout", queue_name_prefix="demo.fanout.queue-"):
    # Open a connection and a channel
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    # 1. Declare a fanout exchange
    fanout_exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.FANOUT,
        durable=True  # the exchange survives a broker restart
    )
    # 2. Names of the queues to bind
    queue_names = [queue_name_prefix + str(i) for i in range(3)]
    # 3. Declare each queue and bind it to the exchange
    for name in queue_names:
        queue = await channel.declare_queue(
            name,
            durable=True,
            auto_delete=False
        )
        # Bind the queue to the fanout exchange (the routing key is ignored)
        await queue.bind(fanout_exchange, routing_key="")
    # Release the connection once the topology is declared
    await connection.close()


async def fanout_publish(message: str = "", exchange_name: str = "demo.fanout"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    fanout_exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.FANOUT,
        durable=True
    )
    message_obj = aio_pika.Message(
        body=message.encode("utf-8"),
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT  # persistent message
    )
    # Publish to the fanout exchange
    await fanout_exchange.publish(message_obj, routing_key="")
    await connection.close()
```
#### Consumer code:
```python
import asyncio
import aio_pika
from config import RABBITMQ_URI


async def fanout_consumer(queue_name: str, consumer_id: int):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    # Deliver at most one unacknowledged message at a time to this consumer
    await channel.set_qos(prefetch_count=1)
    queue = await channel.declare_queue(
        queue_name,
        durable=True,
        auto_delete=False
    )

    async def on_message_received(message: aio_pika.IncomingMessage):
        # process() acks when the block exits normally and rejects if it raises
        async with message.process():
            message_content = message.body.decode("utf-8")
            print(f"[Fanout Consumer {consumer_id}] Received broadcast message:")
            print(f"   Listening queue: {queue_name}")
            print(f"   Message content: {message_content}")
            print(f"   Message persistence: {'Yes' if message.delivery_mode == 2 else 'No'}")
            await asyncio.sleep(1)  # simulate work

    consumer_tag = f"fanout_consumer_{consumer_id}_{queue_name}"
    await queue.consume(on_message_received, consumer_tag=consumer_tag)
    print(f"[Fanout Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})")
    # Wait forever so the consumer keeps running until the task is cancelled
    await asyncio.Future()
```
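
The test script imports `start_all_fanout_consumers`, which this README does not show. One way such a helper could look is sketched below; it takes the consumer coroutine as a parameter so the sketch stays self-contained. The name `start_all_consumers` and its parameters are assumptions, not the repository's code.

```python
import asyncio
from typing import Any, Callable, Coroutine

ConsumerFn = Callable[[str, int], Coroutine[Any, Any, None]]


async def start_all_consumers(
    consumer: ConsumerFn,
    queue_name_prefix: str = "demo.fanout.queue-",
    count: int = 3,
) -> None:
    """Run one consumer coroutine per queue until the caller cancels this task."""
    tasks = [
        asyncio.create_task(consumer(f"{queue_name_prefix}{i}", i + 1))
        for i in range(count)
    ]
    try:
        await asyncio.gather(*tasks)
    finally:
        # Cancelling this coroutine also stops every per-queue consumer
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
```

With the consumer above, `asyncio.create_task(start_all_consumers(fanout_consumer))` would start three consumers on `demo.fanout.queue-0` to `demo.fanout.queue-2`, and cancelling that task would stop all of them.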
#### Complete test code
**Test runner script:**
```python
#!/usr/bin/env python3
"""
Run Fanout Exchange Test
"""
import asyncio
import sys
import os

# Add current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from product.fanout_publish import fanout_publish, setup_fanout_exchange
from comsumer.fanout_consumer import start_all_fanout_consumers


async def run_fanout_test():
    """Run fanout exchange test with producer and consumer"""
    print("=== Running Fanout Exchange Test ===")
    # Start consumer in background
    consumer_task = asyncio.create_task(start_all_fanout_consumers())
    # Wait for consumer to start
    await asyncio.sleep(1)
    # Setup and publish messages
    await setup_fanout_exchange("demo.fanout", "demo.fanout.queue-")
    await fanout_publish(message="hello world", exchange_name="demo.fanout")
    await fanout_publish(message="test message 2", exchange_name="demo.fanout")
    await fanout_publish(message="test message 3", exchange_name="demo.fanout")
    # Wait for messages to be processed
    await asyncio.sleep(3)
    # Cancel consumer
    consumer_task.cancel()
    print("✅ Fanout test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_fanout_test())
```
#### Test output:
```
=== Running Fanout Exchange Test ===
[Fanout Consumer 2] Started, listening to queue: demo.fanout.queue-1 (tag: fanout_consumer_2_demo.fanout.queue-1)
[Fanout Consumer 1] Started, listening to queue: demo.fanout.queue-0 (tag: fanout_consumer_1_demo.fanout.queue-0)
[Fanout Consumer 3] Started, listening to queue: demo.fanout.queue-2 (tag: fanout_consumer_3_demo.fanout.queue-2)
[Fanout Consumer 2] Received broadcast message:
Listening queue: demo.fanout.queue-1
Message content: hello world
Message persistence: Yes
[Fanout Consumer 1] Received broadcast message:
Listening queue: demo.fanout.queue-0
Message content: hello world
Message persistence: Yes
[Fanout Consumer 3] Received broadcast message:
Listening queue: demo.fanout.queue-2
Message content: hello world
Message persistence: Yes
[Fanout Consumer 3] Received broadcast message:
Listening queue: demo.fanout.queue-2
Message content: test message 2
Message persistence: Yes
[Fanout Consumer 1] Received broadcast message:
Listening queue: demo.fanout.queue-0
Message content: test message 2
Message persistence: Yes
[Fanout Consumer 2] Received broadcast message:
Listening queue: demo.fanout.queue-1
Message content: test message 2
Message persistence: Yes
[Fanout Consumer 1] Received broadcast message:
Listening queue: demo.fanout.queue-0
Message content: test message 3
Message persistence: Yes
[Fanout Consumer 2] Received broadcast message:
Listening queue: demo.fanout.queue-1
Message content: test message 3
Message persistence: Yes
[Fanout Consumer 3] Received broadcast message:
Listening queue: demo.fanout.queue-2
Message content: test message 3
Message persistence: Yes
✅ Fanout test completed successfully!
```
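### 3.2 Direct Exchange Demo
#### Key characteristics:
- Routing is based on an exact match between the routing key and the binding key
- A message reaches a queue only when its routing key is identical to one of the queue's binding keys
- Suited to precise routing, such as splitting logs by level (error, warning, and info each go to a different queue)
#### Architecture diagram: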
```mermaid
graph TD
P[Producer<br/>direct_publish] --> E[Direct Exchange<br/>demo.direct]
E -->|routing_key: error| Q1[Queue<br/>demo.direct.queue-error]
E -->|routing_key: warning| Q2[Queue<br/>demo.direct.queue-warning]
E -->|routing_key: info| Q3[Queue<br/>demo.direct.queue-info]
E -->|routing_key: debug| Q3
Q1 --> C1[Error Level Consumer<br/>direct_error_level]
Q2 --> C2[Warning Level Consumer<br/>direct_warning_level]
Q3 --> C3[Info/Debug Level Consumer<br/>direct_info/debug_level]
style P fill:#e1f5fe
style E fill:#f3e5f5
style Q1 fill:#ffebee
style Q2 fill:#fff3e0
style Q3 fill:#e8f5e8
style C1 fill:#ffebee
style C2 fill:#fff3e0
style C3 fill:#e8f5e8
```
#### Producer code:
```python
import aio_pika
from config import RABBITMQ_URI


async def setup_direct_exchange(exchange_name="demo.direct", queue_prefix="demo.direct.queue-"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    direct_exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.DIRECT,
        durable=True
    )
    # Queues and the binding keys each one listens for
    queue_bindings = [
        (f"{queue_prefix}error", ["error"]),        # error-level messages
        (f"{queue_prefix}warning", ["warning"]),    # warning-level messages
        (f"{queue_prefix}info", ["info", "debug"])  # info- and debug-level messages
    ]
    for queue_name, binding_keys in queue_bindings:
        queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)
        for binding_key in binding_keys:
            await queue.bind(direct_exchange, routing_key=binding_key)
        print(f"Queue {queue_name} bound to routing keys: {binding_keys}")
    # Release the connection once the topology is declared
    await connection.close()


async def direct_publish(message: str, routing_key: str, exchange_name: str = "demo.direct"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.DIRECT,
        durable=True
    )
    message_obj = aio_pika.Message(
        body=message.encode("utf-8"),
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT
    )
    await exchange.publish(message_obj, routing_key=routing_key)
    print(f"Message sent: {message} (routing key: {routing_key})")
    await connection.close()
```
#### Consumer code:
```python
import asyncio
import aio_pika
from config import RABBITMQ_URI


async def direct_consumer(queue_name: str, consumer_label: str):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)
    queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)

    async def on_message_received(message: aio_pika.IncomingMessage):
        async with message.process():
            message_content = message.body.decode("utf-8")
            print(f"[{consumer_label} Consumer] Received message:")
            print(f"   Queue name: {queue_name}")
            print(f"   Message content: {message_content}")
            print(f"   Message routing key: {message.routing_key}")
            # Event-loop monotonic clock, not wall-clock time
            print(f"   Processing time: {asyncio.get_running_loop().time():.2f}s")
            # Simulate a different processing cost per level
            if "error" in queue_name:
                await asyncio.sleep(2)
            elif "warning" in queue_name:
                await asyncio.sleep(1)
            elif "info" in queue_name:
                await asyncio.sleep(0.5)

    consumer_tag = f"direct_{consumer_label.lower().replace(' ', '_')}_{queue_name}"
    await queue.consume(on_message_received, consumer_tag=consumer_tag)
    print(f"[{consumer_label} Consumer] Started, listening to queue: {queue_name} (tag: {consumer_tag})")
    # Wait forever so the consumer keeps running until the task is cancelled
    await asyncio.Future()
```
#### Complete test code
**Test runner script:**
```python
#!/usr/bin/env python3
"""
Run Direct Exchange Test
"""
import asyncio
import sys
import os

# Add current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from product.direct_publish import setup_direct_exchange, direct_publish
from comsumer.direct_consumer import start_all_direct_consumers


async def run_direct_exchange_test():
    """Run direct exchange test with producer and consumer"""
    print("=== Running Direct Exchange Test ===")
    # Start consumer in background
    consumer_task = asyncio.create_task(start_all_direct_consumers())
    # Wait for consumer to start
    await asyncio.sleep(1)
    # Setup exchange and publish messages
    await setup_direct_exchange()
    test_messages = [
        ("System crash, unable to start", "error"),              # Route to error queue
        ("Disk space insufficient", "warning"),                  # Route to warning queue
        ("User login successful", "info"),                       # Route to info queue
        ("Debug info: Database connection successful", "debug")  # Route to info queue
    ]
    for msg, routing_key in test_messages:
        await direct_publish(msg, routing_key)
        await asyncio.sleep(0.5)
    # Wait for messages to be processed
    await asyncio.sleep(3)
    # Cancel consumer
    consumer_task.cancel()
    print("✅ Direct exchange test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_direct_exchange_test())
```
#### Test output:
```
=== Running Direct Exchange Test ===
[Info/Debug Level Consumer] Started, listening to queue: demo.direct.queue-info (tag: direct_info/debug_level_demo.direct.queue-info)
[Warning Level Consumer] Started, listening to queue: demo.direct.queue-warning (tag: direct_warning_level_demo.direct.queue-warning)
[Error Level Consumer] Started, listening to queue: demo.direct.queue-error (tag: direct_error_level_demo.direct.queue-error)
Queue demo.direct.queue-error bound to routing keys: ['error']
Queue demo.direct.queue-warning bound to routing keys: ['warning']
Queue demo.direct.queue-info bound to routing keys: ['info', 'debug']
[Error Level Consumer] Received message:
Queue name: demo.direct.queue-error
Message content: System crash, unable to start
Message routing key: error
Processing time: 322774.03s
Message sent: System crash, unable to start (routing key: error)
[Warning Level Consumer] Received message:
Queue name: demo.direct.queue-warning
Message content: Disk space insufficient
Message routing key: warning
Processing time: 322774.54s
Message sent: Disk space insufficient (routing key: warning)
[Info/Debug Level Consumer] Received message:
Queue name: demo.direct.queue-info
Message content: User login successful
Message routing key: info
Processing time: 322775.06s
Message sent: User login successful (routing key: info)
[Info/Debug Level Consumer] Received message:
Queue name: demo.direct.queue-info
Message content: Debug info: Database connection successful
Message routing key: debug
Processing time: 322775.57s
Message sent: Debug info: Database connection successful (routing key: debug)
✅ Direct exchange test completed successfully!
```
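### 3.3 Direct Exchange Demo (Load Balancing)
#### How it works:
1. Create multiple queues: each queue is bound to the same direct exchange but with a different routing key
2. Producer routing strategy: pick a routing key by round-robin, at random, or by hashing a property of the message
3. Consumers: each queue has one or more consumers that handle the messages routed to it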
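
The three selection strategies named in step 2 can be sketched in plain Python as follows. The function names are illustrative, and the `route.N` key format follows the routing keys used in this demo; unlike `BalancedProducer`, this round-robin iterator starts at `route.1`.

```python
import hashlib
import itertools
import random


def route_keys(queue_count: int) -> list:
    """Routing keys route.1 .. route.N, one per queue."""
    return [f"route.{i + 1}" for i in range(queue_count)]


def round_robin(queue_count: int):
    """Endless iterator: route.1, route.2, ..., route.N, route.1, ..."""
    return itertools.cycle(route_keys(queue_count))


def pick_random(queue_count: int, rng: random.Random) -> str:
    """Pick a routing key uniformly at random."""
    return rng.choice(route_keys(queue_count))


def pick_by_hash(message_key: str, queue_count: int) -> str:
    """Map the same message_key to the same routing key every time."""
    digest = hashlib.sha256(message_key.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], "big") % queue_count
    return f"route.{index + 1}"
```

Round-robin spreads load evenly, random selection needs no state, and hashing keeps related messages (for example, all messages for one user) on the same queue so they are handled in order by that queue's consumer.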
#### Architecture diagram:
```mermaid
graph TD
P[BalancedProducer<br/>round-robin send] --> E[Direct Exchange<br/>demo.direct.multi.queue]
E -->|routing_key: route.1| Q1[Queue<br/>task.queue.1]
E -->|routing_key: route.2| Q2[Queue<br/>task.queue.2]
E -->|routing_key: route.3| Q3[Queue<br/>task.queue.3]
Q1 --> C1[Consumer 1<br/>multi_consumer_1]
Q2 --> C2[Consumer 2<br/>multi_consumer_2]
Q3 --> C3[Consumer 3<br/>multi_consumer_3]
P -.->|round-robin| P
style P fill:#e1f5fe
style E fill:#f3e5f5
style Q1 fill:#e8f5e8
style Q2 fill:#e8f5e8
style Q3 fill:#e8f5e8
style C1 fill:#fff3e0
style C2 fill:#fff3e0
style C3 fill:#fff3e0
```
#### Producer code:
```python
import aio_pika
from config import RABBITMQ_URI


class BalancedProducer:
    def __init__(self, exchange_name="demo.direct.multi.queue", queue_count=3):
        self.exchange_name = exchange_name
        self.queue_count = queue_count
        self.current_index = 0  # round-robin index

    async def connect(self):
        self.connection = await aio_pika.connect_robust(RABBITMQ_URI)
        self.channel = await self.connection.channel()
        self.exchange = await self.channel.declare_exchange(
            self.exchange_name,
            aio_pika.ExchangeType.DIRECT,
            durable=True
        )

    async def publish(self, message: str):
        # Round-robin: the index advances before each send, so the first
        # message goes to route.2 and the keys then cycle 3, 1, 2, ...
        self.current_index = (self.current_index + 1) % self.queue_count
        route_key = f"route.{self.current_index + 1}"
        message_obj = aio_pika.Message(
            body=message.encode("utf-8"),
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT
        )
        await self.exchange.publish(message_obj, routing_key=route_key)
        print(f"Message sent: {message} (routed to {route_key})")

    async def close(self):
        # Called by the test script; this body is an assumption because
        # the original excerpt does not show it
        await self.connection.close()
```
#### Consumer code:
```python
import asyncio
import aio_pika
from config import RABBITMQ_URI


async def queue_consumer(queue_name: str, consumer_id: int):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)
    queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)

    async def on_message(message: aio_pika.IncomingMessage):
        async with message.process():
            content = message.body.decode("utf-8")
            print(f"[Consumer {consumer_id}] Processing message: {content}")
            print(f"[Consumer {consumer_id}] From queue: {queue_name}")
            print(f"[Consumer {consumer_id}] Routing key: {message.routing_key}")
            await asyncio.sleep(1)  # simulate work

    consumer_tag = f"multi_consumer_{consumer_id}_{queue_name}"
    await queue.consume(on_message, consumer_tag=consumer_tag)
    print(f"[Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})")
    # Wait forever so the consumer keeps running until the task is cancelled
    await asyncio.Future()
```
#### Complete test code
**Test runner script:**
```python
#!/usr/bin/env python3
"""
Run Multi-Queue Load Balancing Test
"""
import asyncio
import sys
import os

# Add current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from product.direct_multi_publish import setup_multi_queue_balance, BalancedProducer
from comsumer.direct_multi_consumer import start_balanced_consumers


async def run_multi_queue_balance_test():
    """Run multi-queue load balancing test with producer and consumer"""
    print("=== Running Multi-Queue Load Balancing Test ===")
    queue_count = 3
    # Start consumer in background
    consumer_task = asyncio.create_task(start_balanced_consumers(queue_count=queue_count))
    # Wait for consumer to start
    await asyncio.sleep(1)
    # Setup and publish messages
    await setup_multi_queue_balance(queue_count=queue_count)
    producer = BalancedProducer(queue_count=queue_count)
    await producer.connect()
    for i in range(10):
        await producer.publish(f"Task {i + 1}: Multi-queue load balancing test")
        await asyncio.sleep(0.3)
    await producer.close()
    # Wait for messages to be processed
    await asyncio.sleep(3)
    # Cancel consumer
    consumer_task.cancel()
    print("✅ Multi-queue load balancing test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_multi_queue_balance_test())
```
#### Test output:
```
=== Running Multi-Queue Load Balancing Test ===
[Consumer 2] Started, listening to queue: task.queue.2 (tag: multi_consumer_2_task.queue.2)
[Consumer 3] Started, listening to queue: task.queue.3 (tag: multi_consumer_3_task.queue.3)
[Consumer 1] Started, listening to queue: task.queue.1 (tag: multi_consumer_1_task.queue.1)
Queue task.queue.1 bound to routing key: route.1
Queue task.queue.2 bound to routing key: route.2
Queue task.queue.3 bound to routing key: route.3
[Consumer 2] Processing message: Task 1: Multi-queue load balancing test
[Consumer 2] From queue: task.queue.2
[Consumer 2] Routing key: route.2
Message sent: Task 1: Multi-queue load balancing test (routed to route.2)
[Consumer 3] Processing message: Task 2: Multi-queue load balancing test
[Consumer 3] From queue: task.queue.3
[Consumer 3] Routing key: route.3
Message sent: Task 2: Multi-queue load balancing test (routed to route.3)
[Consumer 1] Processing message: Task 3: Multi-queue load balancing test
[Consumer 1] From queue: task.queue.1
[Consumer 1] Routing key: route.1
Message sent: Task 3: Multi-queue load balancing test (routed to route.1)
Message sent: Task 4: Multi-queue load balancing test (routed to route.2)
[Consumer 2] Processing message: Task 4: Multi-queue load balancing test
[Consumer 2] From queue: task.queue.2
[Consumer 2] Routing key: route.2
Message sent: Task 5: Multi-queue load balancing test (routed to route.3)
[Consumer 3] Processing message: Task 5: Multi-queue load balancing test
[Consumer 3] From queue: task.queue.3
[Consumer 3] Routing key: route.3
Message sent: Task 6: Multi-queue load balancing test (routed to route.1)
[Consumer 1] Processing message: Task 6: Multi-queue load balancing test
[Consumer 1] From queue: task.queue.1
[Consumer 1] Routing key: route.1
Message sent: Task 7: Multi-queue load balancing test (routed to route.2)
[Consumer 2] Processing message: Task 7: Multi-queue load balancing test
[Consumer 2] From queue: task.queue.2
[Consumer 2] Routing key: route.2
Message sent: Task 8: Multi-queue load balancing test (routed to route.3)
[Consumer 3] Processing message: Task 8: Multi-queue load balancing test
[Consumer 3] From queue: task.queue.3
[Consumer 3] Routing key: route.3
Message sent: Task 9: Multi-queue load balancing test (routed to route.1)
[Consumer 1] Processing message: Task 9: Multi-queue load balancing test
[Consumer 1] From queue: task.queue.1
[Consumer 1] Routing key: route.1
Message sent: Task 10: Multi-queue load balancing test (routed to route.2)
[Consumer 2] Processing message: Task 10: Multi-queue load balancing test
[Consumer 2] From queue: task.queue.2
[Consumer 2] Routing key: route.2
✅ Multi-queue load balancing test completed successfully!
```
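### 3.4 Topic Exchange Demo
#### Key characteristics:
- Routing keys are hierarchical strings whose words are separated by `.` (for example, `order.create.user`)
- Two wildcards are supported:
  - `*` matches exactly one word (for example, `user.*` matches `user.login` but not `user.login.success`)
  - `#` matches zero or more words (for example, `order.#` matches `order`, `order.pay`, and `order.pay.success`)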
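
The wildcard rules above can be checked with a small matcher. This is a sketch of the matching semantics only, not the broker's own algorithm; `topic_matches` is an illustrative name.

```python
from functools import lru_cache


def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key matches the topic pattern binding_key."""
    pattern = tuple(binding_key.split("."))
    words = tuple(routing_key.split(".")) if routing_key else ()

    @lru_cache(maxsize=None)
    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            return w == len(words)
        if pattern[p] == "#":
            # '#' matches zero words, or consumes one word and stays active
            return match(p + 1, w) or (w < len(words) and match(p, w + 1))
        if w == len(words):
            return False
        # '*' matches any single word; otherwise the words must be equal
        return pattern[p] in ("*", words[w]) and match(p + 1, w + 1)

    return match(0, 0)
```

Applied to the bindings used in this demo, `order.create.critical` matches both `#.critical` and `order.#`, while `system.log.info` matches none of the three binding keys, so that message is not routed to any queue.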
#### Architecture diagram:
```mermaid
graph TD
P[Producer<br/>topic_publish] --> E[Topic Exchange<br/>demo.topic]
E -->|binding: #.critical| Q1[Queue<br/>demo.topic.queue-critical]
E -->|binding: order.#| Q2[Queue<br/>demo.topic.queue-order]
E -->|binding: user.login.*| Q3[Queue<br/>demo.topic.queue-user.login]
Q1 --> C1[CriticalHandler<br/>topic_CriticalHandler]
Q2 --> C2[OrderHandler<br/>topic_OrderHandler]
Q3 --> C3[UserLoginHandler<br/>topic_UserLoginHandler]
subgraph "Routing key examples"
R1[order.create.critical] -.->|matches #.critical and order.#| Q1
R1 -.-> Q2
R2[user.login.success] -.->|matches user.login.*| Q3
R3[system.log.info] -.->|no match| X[Message discarded]
end
style P fill:#e1f5fe
style E fill:#f3e5f5
style Q1 fill:#ffebee
style Q2 fill:#e8f5e8
style Q3 fill:#fff3e0
style C1 fill:#ffebee
style C2 fill:#e8f5e8
style C3 fill:#fff3e0
style X fill:#f5f5f5
```
#### Producer code:
```python
import aio_pika
from config import RABBITMQ_URI


async def setup_topic_exchange(exchange_name="demo.topic", queue_prefix="demo.topic.queue-"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    topic_exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.TOPIC,
        durable=True
    )
    # Queues and their binding keys (wildcards allowed)
    queue_bindings = [
        (f"{queue_prefix}critical", ["#.critical"]),     # any prefix ending in critical
        (f"{queue_prefix}order", ["order.#"]),           # every routing key starting with order
        (f"{queue_prefix}user.login", ["user.login.*"])  # user.login plus exactly one more word
    ]
    for queue_name, binding_keys in queue_bindings:
        queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)
        for binding_key in binding_keys:
            await queue.bind(topic_exchange, routing_key=binding_key)
        print(f"Queue {queue_name} bound to routing keys: {binding_keys}")
    # Release the connection once the topology is declared
    await connection.close()


async def topic_publish(message: str, routing_key: str, exchange_name: str = "demo.topic"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.TOPIC,
        durable=True
    )
    message_obj = aio_pika.Message(
        body=message.encode("utf-8"),
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT
    )
    await exchange.publish(message_obj, routing_key=routing_key)
    print(f"Message sent: {message} (routing key: {routing_key})")
    await connection.close()
```
#### Consumer code:
```python
import asyncio
import aio_pika
from config import RABBITMQ_URI


async def topic_consumer(queue_name: str, consumer_id: str):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)
    queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)

    async def on_message(message: aio_pika.IncomingMessage):
        async with message.process():
            message_content = message.body.decode("utf-8")
            print(f"[Consumer {consumer_id}] Received message: {message_content}")
            print(f"[Consumer {consumer_id}] Message routing key: {message.routing_key}")
            print(f"[Consumer {consumer_id}] From queue: {queue_name}")
            await asyncio.sleep(1)  # simulate work

    consumer_tag = f"topic_{consumer_id}_{queue_name}"
    await queue.consume(on_message, consumer_tag=consumer_tag)
    print(f"[Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})")
    # Wait forever so the consumer keeps running until the task is cancelled
    await asyncio.Future()
```
#### Complete test code
**Test runner script:**
```python
#!/usr/bin/env python3
"""
Run Topic Exchange Test
"""
import asyncio
import sys
import os

# Add current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from product.topic_publish import setup_topic_exchange, topic_publish
from comsumer.topic_consumer import start_all_topic_consumers


async def run_topic_exchange_test():
    """Run topic exchange test with producer and consumer"""
    print("=== Running Topic Exchange Test ===")
    # Start consumer in background
    consumer_task = asyncio.create_task(start_all_topic_consumers())
    # Wait for consumer to start
    await asyncio.sleep(1)
    # Setup exchange and publish messages
    await setup_topic_exchange()
    test_messages = [
        ("Order creation failed (critical error)", "order.create.critical"),
        ("User login successful", "user.login.success"),
        ("Order payment completed", "order.pay.success"),
        ("System crash (critical error)", "system.crash.critical"),
        ("User login failed", "user.login.failed"),
        ("Normal system log", "system.log.info")  # Won't match any binding key, will be discarded
    ]
    for msg, routing_key in test_messages:
        await topic_publish(msg, routing_key)
        await asyncio.sleep(0.5)
    # Wait for messages to be processed
    await asyncio.sleep(3)
    # Cancel consumer
    consumer_task.cancel()
    print("✅ Topic exchange test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_topic_exchange_test())
```
#### Test output:
```
=== Running Topic Exchange Test ===
[Consumer CriticalHandler] Started, listening to queue: demo.topic.queue-critical (tag: topic_CriticalHandler_demo.topic.queue-critical)
[Consumer UserLoginHandler] Started, listening to queue: demo.topic.queue-user.login (tag: topic_UserLoginHandler_demo.topic.queue-user.login)
[Consumer OrderHandler] Started, listening to queue: demo.topic.queue-order (tag: topic_OrderHandler_demo.topic.queue-order)
Queue demo.topic.queue-critical bound to routing keys: ['#.critical']
Queue demo.topic.queue-order bound to routing keys: ['order.#']
Queue demo.topic.queue-user.login bound to routing keys: ['user.login.*']
[Consumer OrderHandler] Received message: Order creation failed (critical error)
[Consumer OrderHandler] Message routing key: order.create.critical
[Consumer OrderHandler] From queue: demo.topic.queue-order
[Consumer CriticalHandler] Received message: Order creation failed (critical error)
[Consumer CriticalHandler] Message routing key: order.create.critical
[Consumer CriticalHandler] From queue: demo.topic.queue-critical
Message sent: Order creation failed (critical error) (routing key: order.create.critical)
[Consumer UserLoginHandler] Received message: User login successful
[Consumer UserLoginHandler] Message routing key: user.login.success
[Consumer UserLoginHandler] From queue: demo.topic.queue-user.login
Message sent: User login successful (routing key: user.login.success)
[Consumer OrderHandler] Received message: Order payment completed
[Consumer OrderHandler] Message routing key: order.pay.success
[Consumer OrderHandler] From queue: demo.topic.queue-order
Message sent: Order payment completed (routing key: order.pay.success)
[Consumer CriticalHandler] Received message: System crash (critical error)
[Consumer CriticalHandler] Message routing key: system.crash.critical
[Consumer CriticalHandler] From queue: demo.topic.queue-critical
Message sent: System crash (critical error) (routing key: system.crash.critical)
[Consumer UserLoginHandler] Received message: User login failed
[Consumer UserLoginHandler] Message routing key: user.login.failed
[Consumer UserLoginHandler] From queue: demo.topic.queue-user.login
Message sent: User login failed (routing key: user.login.failed)
Message sent: Normal system log (routing key: system.log.info)
✅ Topic exchange test completed successfully!
```
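### 3.5 Reliable RabbitMQ Producer and Consumer
#### Reliability mechanisms
To make message consumption reliable, address the following areas:
**1. Message persistence**
- Durable exchange: `durable=True`
- Durable queue: `durable=True`
- Persistent messages: `delivery_mode=PERSISTENT`
**2. Message acknowledgment**
- Automatic acknowledgment: `async with message.process()` (acks when the block exits normally, rejects if it raises)
- Manual acknowledgment: `message.ack()` / `message.nack()`
- Acknowledge only after processing has finished
**3. Idempotency**
- Deduplicate by message ID
- Record processed message IDs in a database
- Prevent the same message from being processed twice
**4. Retry mechanism**
- Configurable maximum number of retries
- Retry inside the consumer instead of requeueing the message
**5. Dead-letter queue**
- A complete path for handling failed messages
- The dead-letter exchange and queue are created automatically
- Detailed error information is recorded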
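
For the idempotency point, an in-memory set of processed IDs grows without bound in a long-running consumer. The sketch below shows one way to cap memory by remembering only the most recent IDs. `RecentMessageIds` is a hypothetical helper, not part of the repository, and it does not replace the database record mentioned above, which is still needed for deduplication across restarts.

```python
from collections import OrderedDict


class RecentMessageIds:
    """Remember the most recent message IDs, evicting the oldest first."""

    def __init__(self, capacity: int = 10_000) -> None:
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self._capacity = capacity
        self._seen: "OrderedDict[str, None]" = OrderedDict()

    def __contains__(self, message_id: str) -> bool:
        return message_id in self._seen

    def add(self, message_id: str) -> None:
        self._seen[message_id] = None
        self._seen.move_to_end(message_id)  # mark as most recently seen
        while len(self._seen) > self._capacity:
            self._seen.popitem(last=False)  # drop the oldest ID
```

A consumer could check `message_id in recent` before processing and call `recent.add(message_id)` after a successful run. The trade-off is that a duplicate arriving after its ID has been evicted is no longer detected.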
#### Key code
**Reliable producer code:**
```python
import asyncio
import hashlib
import json
import logging
from datetime import datetime
from typing import Any, Dict

import aio_pika

logger = logging.getLogger(__name__)


class ReliableProducer:
    """Reliable Message Producer"""
    # Excerpt: connection setup (which sets self.exchange) is not shown

    def _generate_message_id(self, message_data: Dict[str, Any]) -> str:
        """Generate message ID for message"""
        message_type = message_data.get('type', '')
        content = message_data.get('content', '')
        # For duplicate_test type messages, generate fixed ID based on content
        if message_type == 'duplicate_test':
            content_hash = hashlib.md5(content.encode('utf-8')).hexdigest()
            return f"duplicate_{content_hash[:8]}"
        else:
            # Event-loop clock: unique enough for a demo, not across processes
            return f"msg_{asyncio.get_running_loop().time()}"

    async def publish_reliable_message(self, message_data: Dict[str, Any]) -> bool:
        """Publish reliable message"""
        try:
            # Generate message ID
            message_id = self._generate_message_id(message_data)
            # Add message metadata
            message_data.update({
                'timestamp': datetime.now().isoformat(),
                'message_id': message_id
            })
            # Create persistent message
            message = aio_pika.Message(
                body=json.dumps(message_data, ensure_ascii=False).encode('utf-8'),
                delivery_mode=aio_pika.DeliveryMode.PERSISTENT,  # Message persistence
                message_id=message_id,
                timestamp=datetime.now()
            )
            # Send message and wait for confirmation
            await self.exchange.publish(message, routing_key="reliable")
            logger.info(f"[Producer] Message sent: {message_id}")
            return True
        except Exception as e:
            logger.error(f"[Producer] Failed to send message: {e}")
            return False
```
**可靠消费者代码:**
```python
class ReliableConsumer:
    """Reliable Message Consumer"""

    def __init__(self, queue_name: Optional[str] = None, consumer_name: Optional[str] = None):
        # Excerpt: connection setup, self.message_handler and
        # send_to_dead_letter_queue() are omitted here. The two attribute
        # assignments below are assumed, since later methods read consumer_name.
        self.queue_name = queue_name
        self.consumer_name = consumer_name
        self.processed_messages: Set[str] = set()  # Store processed message IDs

    async def process_message(self, message: aio_pika.IncomingMessage):
        """Core message processing logic"""
        try:
            # Parse message
            message_data = json.loads(message.body.decode('utf-8'))
            message_id = message_data.get('message_id')
            # Check if message has been processed before (idempotency check)
            if message_id in self.processed_messages:
                logger.warning(f"[Consumer-{self.consumer_name}] 🚫 Duplicate message detected, skipping: {message_id}")
                await message.ack()
                return
            logger.info(f"[Consumer-{self.consumer_name}] Starting to process message: {message_id}")
            # Process the message, retrying on failure
            success = await self.retry_process_message(message_data, message_id, 0)
            # Only record processed message ID after successful processing
            if success:
                self.processed_messages.add(message_id)
                await message.ack()
                logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed and acknowledged")
            else:
                # Processing failed, send to dead letter queue
                await self.send_to_dead_letter_queue(message, message_id, "Processing failed")
                await message.ack()
        except Exception as e:
            logger.error(f"[Consumer-{self.consumer_name}] Failed to process message: {e}")
            # Ack anyway so a message that cannot be handled is not redelivered forever
            await message.ack()

    async def retry_process_message(self, message_data: Dict[str, Any], message_id: str, retry_count: int) -> bool:
        """Process the message in place, making up to max_retries + 1 attempts"""
        max_retries = config.max_retries
        last_error = None
        for attempt in range(max_retries + 1):
            try:
                logger.info(f"[Consumer-{self.consumer_name}] Attempting to process message {message_id}, attempt {attempt + 1}")
                await self.message_handler(message_data)
                logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed successfully")
                return True
            except Exception as e:
                last_error = str(e)
                logger.warning(f"[Consumer-{self.consumer_name}] Message {message_id} failed on attempt {attempt + 1}: {e}")
                if attempt < max_retries:
                    # Fixed 1-second pause before the next attempt
                    await asyncio.sleep(1)
                else:
                    logger.error(f"[Consumer-{self.consumer_name}] Message {message_id} failed after {max_retries} retries: {last_error}")
                    return False
        return False
```
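The retry loop above can be exercised without a broker. The following is a minimal standalone sketch of the same pattern; `process_with_retries`, `flaky_handler`, and `always_fails` are hypothetical names introduced for illustration, and the delay is set to zero so the example finishes immediately. It shows that `max_retries = 3` means one initial attempt plus up to three retries.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def process_with_retries(
    handler: Callable[[Dict[str, Any]], Awaitable[None]],
    message_data: Dict[str, Any],
    max_retries: int = 3,
    delay: float = 0.0,
) -> bool:
    """Call handler up to max_retries + 1 times; return True on the first success."""
    for attempt in range(max_retries + 1):
        try:
            await handler(message_data)
            return True
        except Exception:
            if attempt < max_retries:
                # Pause before the next attempt (fixed delay, as in the consumer above)
                await asyncio.sleep(delay)
    return False


calls: List[int] = []


async def flaky_handler(data: Dict[str, Any]) -> None:
    """Hypothetical handler that fails twice, then succeeds."""
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient failure")


async def always_fails(data: Dict[str, Any]) -> None:
    """Hypothetical handler that never succeeds."""
    raise RuntimeError("permanent failure")


recovered = asyncio.run(process_with_retries(flaky_handler, {"type": "business"}))
gave_up = asyncio.run(process_with_retries(always_fails, {"type": "will_fail"}))
print(recovered, len(calls), gave_up)
```

Retrying inside the consumer keeps the message unacknowledged while the attempts run, so with `prefetch_count = 1` a slow failing message holds up the messages behind it. That trade-off is acceptable for a demo, but it is worth weighing for production workloads.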
**Dead letter queue consumer code:**
```python
class DeadLetterConsumer:
    """Dead Letter Queue Consumer"""

    async def process_dead_letter_message(self, message: aio_pika.IncomingMessage):
        """Process dead letter message"""
        try:
            # Parse dead letter message
            dead_letter_data = json.loads(message.body.decode('utf-8'))
            original_message = dead_letter_data.get('original_message', {})
            error_info = dead_letter_data.get('error_info', 'Unknown')
            message_id = dead_letter_data.get('message_id', 'Unknown')
            # Print dead letter message information
            logger.error("=" * 50)
            logger.error("[Dead Letter Consumer] Received Dead Letter Message:")
            logger.error(f"[Dead Letter Consumer] Message ID: {message_id}")
            logger.error(f"[Dead Letter Consumer] Message Content: {json.dumps(original_message, ensure_ascii=False, indent=2)}")
            logger.error(f"[Dead Letter Consumer] Error Reason: {error_info}")
            logger.error("=" * 50)
            # Save to database
            await self.save_to_database(original_message, error_info, message_id)
            # Acknowledge dead letter message
            await message.ack()
            logger.info(f"[Dead Letter Consumer] Dead letter message {message_id} processed")
        except Exception as e:
            logger.error(f"[Dead Letter Consumer] Failed to process dead letter message: {e}")
            await message.nack(requeue=False)
```
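Neither `send_to_dead_letter_queue` nor `save_to_database` is shown in this excerpt. The sketch below illustrates the data they exchange, under stated assumptions: the payload keys are the ones `process_dead_letter_message` reads, the record fields mirror the database record printed in the test output, and an in-memory list stands in for the database. The function names `build_dead_letter_payload` and `save_failed_record` are hypothetical.

```python
from datetime import datetime
from typing import Any, Dict, List

# In-memory stand-in for a database table (assumption for illustration)
failed_records: List[Dict[str, Any]] = []


def build_dead_letter_payload(original: Dict[str, Any], message_id: str,
                              error_info: str) -> Dict[str, Any]:
    """Wrap a failed message in the envelope the dead letter consumer parses."""
    return {
        "message_id": message_id,
        "original_message": original,
        "error_info": error_info,
    }


def save_failed_record(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Turn a dead letter payload into a record and store it."""
    record = {
        "id": payload.get("message_id", "Unknown"),
        "original_message": payload.get("original_message", {}),
        "error_info": payload.get("error_info", "Unknown"),
        "created_at": datetime.now().isoformat(),
        "status": "failed",
    }
    failed_records.append(record)
    return record


payload = build_dead_letter_payload(
    {"content": "example", "type": "will_fail"}, "msg_demo", "Processing failed"
)
record = save_failed_record(payload)
print(record["id"], record["status"], len(failed_records))
```

Keeping the original message intact inside the envelope means a failed message can later be inspected or republished without reconstructing it from logs.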
#### Complete Test Code
**Test runner script:**
```python
"""
RabbitMQ reliable messaging test module
"""
import asyncio
import logging

from reliable_mq import ReliableProducer, ReliableConsumer
from reliable_mq.dead_letter_consumer import DeadLetterConsumer
from reliable_mq.config import config

# Configure logging
logging.basicConfig(
    level=getattr(logging, config.log_level),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


async def run_context_manager_messaging():
    """Test reliable messaging using async context managers"""
    # Log text and payloads are kept verbatim so they match the recorded output.
    # The banner reads: "Testing reliable messaging with context managers"
    logger.info("=== 使用上下文管理器测试可靠消息传递 ===")
    # Use async context managers
    async with ReliableProducer() as producer:
        async with ReliableConsumer(consumer_name="context_test_consumer") as consumer:
            async with DeadLetterConsumer() as dead_letter_consumer:
                # Start the consumers (running in the background)
                consumer_task = asyncio.create_task(consumer.start_consuming())
                dead_letter_task = asyncio.create_task(dead_letter_consumer.start_consuming())
                # Wait for the consumers to start
                await asyncio.sleep(1)
                # Send test messages
                test_messages = [
                    {"content": "重要业务消息1", "type": "business"},        # "Important business message 1"
                    {"content": "系统通知消息2", "type": "notification"},    # "System notification message 2"
                    {"content": "用户操作消息3", "type": "user_action"},     # "User action message 3"
                    {"content": "重复消息测试", "type": "duplicate_test"},   # "Duplicate message test"
                    {"content": "重复消息测试", "type": "duplicate_test"},   # Duplicate of the previous message
                    {"content": "会失败的消息1", "type": "will_fail"},       # These fail and go to the dead letter queue
                    {"content": "会失败的消息2", "type": "will_fail"},
                    {"content": "会失败的消息3", "type": "will_fail"},
                ]
                for msg in test_messages:
                    await producer.publish_reliable_message(msg)
                    await asyncio.sleep(0.5)
                # Wait for message processing to finish
                await asyncio.sleep(30)
                # Cancel the background tasks and wait for them to stop
                consumer_task.cancel()
                dead_letter_task.cancel()
                await asyncio.gather(consumer_task, dead_letter_task, return_exceptions=True)


if __name__ == '__main__':
    asyncio.run(run_context_manager_messaging())
```
**Configuration file:**
```python
"""
RabbitMQ reliable messaging configuration module
"""
import os
from dataclasses import dataclass
from typing import Dict, Any


@dataclass
class Config:
    """Configuration class"""
    # RabbitMQ connection settings
    rabbitmq_uri: str = "amqp://guest:guest@localhost:5673/"
    # Exchange and queue settings
    exchange_name: str = "reliable.exchange"
    queue_name: str = "reliable.queue"
    # Dead letter queue settings
    dead_letter_exchange: str = "reliable.dead.letter.exchange"
    dead_letter_queue: str = "reliable.dead.letter.queue"
    # Retry settings
    max_retries: int = 3
    message_ttl: int = 300000  # 5 minutes, in milliseconds
    # QoS settings
    prefetch_count: int = 1
    # Logging settings
    log_level: str = "INFO"

    def get_connection_config(self) -> Dict[str, Any]:
        """Return the connection settings"""
        return {
            'uri': self.rabbitmq_uri,
            'prefetch_count': self.prefetch_count
        }

    def get_dead_letter_config(self) -> Dict[str, str]:
        """Return the dead letter queue settings"""
        return {
            'dead_letter_exchange': self.dead_letter_exchange,
            'dead_letter_queue': self.dead_letter_queue
        }


# Global configuration instance
config = Config()
```
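The configuration module hard-codes its defaults. When the same code runs against different brokers, the values could be overridden from environment variables. The following is a minimal sketch of that idea, not part of the project: the `EnvConfig` class, its `from_env` method, and the variable names `RABBITMQ_URI`, `MQ_MAX_RETRIES`, `MQ_PREFETCH_COUNT`, and `MQ_LOG_LEVEL` are all assumptions chosen for illustration.

```python
import os
from dataclasses import dataclass


@dataclass
class EnvConfig:
    """Subset of the settings above, overridable via environment variables."""
    rabbitmq_uri: str = "amqp://guest:guest@localhost:5673/"
    max_retries: int = 3
    prefetch_count: int = 1
    log_level: str = "INFO"

    @classmethod
    def from_env(cls) -> "EnvConfig":
        """Build a config, falling back to the dataclass defaults when a variable is unset."""
        defaults = cls()
        return cls(
            rabbitmq_uri=os.getenv("RABBITMQ_URI", defaults.rabbitmq_uri),
            max_retries=int(os.getenv("MQ_MAX_RETRIES", str(defaults.max_retries))),
            prefetch_count=int(os.getenv("MQ_PREFETCH_COUNT", str(defaults.prefetch_count))),
            log_level=os.getenv("MQ_LOG_LEVEL", defaults.log_level),
        )


# Simulate a deployment that overrides two settings
os.environ["RABBITMQ_URI"] = "amqp://guest:guest@rabbit:5672/"
os.environ["MQ_MAX_RETRIES"] = "5"
env_config = EnvConfig.from_env()
print(env_config.rabbitmq_uri, env_config.max_retries)
```

Numeric variables are parsed with `int()`, so a malformed value fails at startup with a `ValueError` instead of surfacing later inside the retry loop.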
**Custom message handler function:**
```python
import asyncio
import logging
from typing import Any, Dict

logger = logging.getLogger(__name__)


async def default_message_handler(message_data: Dict[str, Any]):
    """Default message handler function"""
    await asyncio.sleep(1)  # Simulate processing time
    message_type = message_data.get('type', '')
    content = message_data.get('content', '')
    # Simulate business logic processing: 'will_fail' messages always raise
    if message_type == 'will_fail':
        raise Exception(f"Simulated business processing failure: {content}")
    logger.info(f"[Consumer] Business logic processing completed: {content}")
```
#### Test Results
```
2025-09-07 11:25:02,498 - __main__ - INFO - === 使用上下文管理器测试可靠消息传递 ===
2025-09-07 11:25:02,509 - reliable_mq.reliable_producer - INFO - [Producer] Connected, queue: reliable.queue
2025-09-07 11:25:02,513 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Connected, listening to queue: reliable.queue
2025-09-07 11:25:02,518 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] Connected
2025-09-07 11:25:03,523 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Starting to process message: msg_323632.845174041
2025-09-07 11:25:03,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message content: {'content': '重要业务消息1', 'type': 'business', 'timestamp': '2025-09-07T11:25:03.519240', 'message_id': 'msg_323632.845174041'}
2025-09-07 11:25:03,524 - reliable_mq.reliable_producer - INFO - [Producer] Message sent: msg_323632.845174041 (type: business, content: 重要业务消息1)
2025-09-07 11:25:04,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Business logic processing completed: 重要业务消息1
2025-09-07 11:25:04,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message msg_323632.845174041 processed successfully
2025-09-07 11:25:04,525 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message msg_323632.845174041 processed and acknowledged
2025-09-07 11:25:04,525 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Current processed message count: 1
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - ==================================================
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] 🚫 Duplicate message detected, skipping:
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Message ID: duplicate_090f7015
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Message content: {
"content": "重复消息测试",
"type": "duplicate_test",
"timestamp": "2025-09-07T11:25:05.546930",
"message_id": "duplicate_090f7015"
}
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Total processed messages: 4
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - ==================================================
2025-09-07 11:25:14,551 - reliable_mq.reliable_consumer - ERROR - [Consumer-context_test_consumer] Message msg_323635.377526708 failed after 3 retries: Simulated business processing failure: 会失败的消息1
2025-09-07 11:25:14,551 - reliable_mq.reliable_consumer - ERROR - [Consumer-context_test_consumer] Message sent to dead letter queue: msg_323635.377526708, error: Processing failed
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - ==================================================
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Received Dead Letter Message:
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Message ID: msg_323635.377526708
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Message Content: {
"content": "会失败的消息1",
"type": "will_fail",
"timestamp": "2025-09-07T11:25:06.051557",
"message_id": "msg_323635.377526708"
}
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Error Reason: Processing failed
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - ==================================================
2025-09-07 11:25:15,064 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] 💾 Dead letter message saved to database: msg_323635.377526708
2025-09-07 11:25:15,064 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] Database Record: {
"id": "msg_323635.377526708",
"original_message": {
"content": "会失败的消息1",
"type": "will_fail",
"timestamp": "2025-09-07T11:25:06.051557",
"message_id": "msg_323635.377526708"
},
"error_info": "Processing failed",
"created_at": "2025-09-07T11:25:15.064341",
"status": "failed"
}
2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - ==================================================
2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Processed Message Statistics:
2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Total Processed: 4
2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Processed Message IDs: ['msg_323632.845174041', 'msg_323633.351571583', 'duplicate_090f7015', 'msg_323633.854272166']
2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - ==================================================
```
#### Architecture Diagram
![img_5.png](assert/img_5.png)
```mermaid
graph TD
    P[ReliableProducer] --> E[Reliable Exchange<br/>reliable.exchange]
    E --> Q[Reliable Queue<br/>reliable.queue]
    Q --> C[ReliableConsumer]
    C -->|Processing succeeded| ACK[Message ACK]
    C -->|Processing failed| RETRY[Retry Logic]
    RETRY -->|Retry succeeded| ACK
    RETRY -->|Retries exhausted| DLQ[Dead Letter Queue]
    DLQ --> DLC[DeadLetterConsumer]
    DLC --> DB[(Database)]
    subgraph "Reliability Guarantees"
        PERSIST[Message Persistence]
        IDEMPOTENT[Idempotency Check]
        CONFIRM[Publisher Confirmation]
    end
    subgraph "Message Flow"
        MSG1[Normal message] --> SUCCESS[Processed successfully]
        MSG2[Duplicate message] --> SKIP[Skipped]
        MSG3[Failing message] --> FAIL[Fails after retries]
    end
    style P fill:#e1f5fe
    style E fill:#f3e5f5
    style Q fill:#e8f5e8
    style C fill:#fff3e0
    style DLQ fill:#ffebee
    style DLC fill:#ffebee
    style DB fill:#f3e5f5
    style ACK fill:#e8f5e8
    style RETRY fill:#fff3e0
```
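In the flow above, the consumer forwards failed messages to the dead letter queue in application code. RabbitMQ can also dead-letter messages at the broker level through queue arguments. Whether this project configures that is not shown here, so the following is only a sketch of how the `dead_letter_exchange` and `message_ttl` settings could map onto RabbitMQ's standard `x-dead-letter-exchange`, `x-dead-letter-routing-key`, and `x-message-ttl` arguments. The helper name `build_queue_arguments` is hypothetical.

```python
from typing import Any, Dict


def build_queue_arguments(dead_letter_exchange: str, message_ttl_ms: int,
                          dead_letter_routing_key: str = "") -> Dict[str, Any]:
    """Return queue arguments that enable broker-side dead-lettering and a message TTL."""
    arguments: Dict[str, Any] = {
        # Rejected or expired messages are republished to this exchange
        "x-dead-letter-exchange": dead_letter_exchange,
        # Messages older than this many milliseconds expire
        "x-message-ttl": message_ttl_ms,
    }
    if dead_letter_routing_key:
        # Optional: replace the original routing key when dead-lettering
        arguments["x-dead-letter-routing-key"] = dead_letter_routing_key
    return arguments


queue_arguments = build_queue_arguments("reliable.dead.letter.exchange", 300000)
print(queue_arguments)

# With aio_pika, the dict would be passed when declaring the queue, e.g.:
#   await channel.declare_queue("reliable.queue", durable=True, arguments=queue_arguments)
```

Queue arguments are fixed when a queue is first declared. Redeclaring an existing queue with different arguments fails with a precondition error, so a change like this typically requires deleting and recreating the queue or applying a policy instead.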