diff --git a/README.md b/README.md
new file mode 100644
index 0000000..dd37e92
--- /dev/null
+++ b/README.md
@@ -0,0 +1,1375 @@

# RabbitMQ Training Guide

---

## 1. MQ Basics

### 1.1 What is MQ?

MQ stands for Message Queue.

- A "message queue" is a container that holds messages while they are in transit
- It is a classic producer–consumer model: producers keep writing messages into the queue, and consumers keep taking messages out of it
- The benefit: producers only care about sending and consumers only care about receiving; neither side's business logic intrudes on the other, which decouples producers from consumers

### 1.2 Why use MQ?

A message queue is middleware for asynchronous communication in distributed systems. Its core job is to pass messages asynchronously through a **store-and-forward** mechanism, which addresses inter-system coupling, traffic spikes, and the need for asynchronous processing.

#### Main benefits:

**1. Decoupling system components**
- In a traditional design, components call each other directly (service A calls service B), which creates tight coupling
- With a message queue, service A only publishes a message to the queue and does not care who receives or processes it; service B simply consumes from the queue
- The two sides communicate through an agreed message format and do not depend on each other, lowering the system's coupling

**2. Asynchronous processing for better throughput**
- In a synchronous flow, one operation may have to wait for several services to finish in sequence, so total latency is the sum of every step
- With a queue, once the main flow finishes it just publishes a message and returns immediately, without waiting for the follow-up steps
- Other services consume and process the message asynchronously, significantly improving response time and throughput

**3. Traffic shaping to protect the system**
- Traffic spikes (flash sales, live-stream promotions) can instantly overwhelm backend services
- A message queue acts as a buffer: during peaks, requests first enter the queue, and backend services consume them at their own pace

**4. Data synchronization and distribution**
- The same message can be consumed by multiple consumers: publish once, process everywhere
- For cross-system data synchronization, the queue helps keep data consistent

**5. Retry and fault tolerance**
- If a consumer service fails temporarily, the queue retains its messages and redelivers them once the service recovers
- Combined with a retry mechanism, this handles network jitter and transient outages

#### Typical use cases:
- E-commerce ordering: order creation → message queue → stock deduction, payment processing, shipping notification, etc.
- Log collection: every service sends logs to the queue; a logging system consumes, stores, and analyzes them centrally
- Distributed transactions: eventual consistency built on top of the queue
- Delayed tasks: e.g. automatically cancelling an order that is not paid in time

---

## 2. RabbitMQ

### 2.1 Introduction

RabbitMQ is an open-source message broker (message-queue middleware) written in Erlang and built on AMQP (Advanced Message Queuing Protocol). In distributed systems it provides asynchronous communication and decoupling between applications.

#### Features:
- **Multiple protocols**: besides AMQP, RabbitMQ also supports STOMP, MQTT, and other messaging protocols
- **High reliability**: message persistence, clustering, and mirrored queues guard against message loss
- **Flexible routing**: exchanges provide rich message-routing rules
- **Multi-language clients**: client libraries exist for many programming languages
- **Friendly management UI**: ships with a visual management interface

### 2.2 Core Components

```
Producer → Channel → Exchange → Queue → Channel → Consumer
```

#### Component details:

**1. Producer**
- Definition: the application or service that sends messages
- Role: wraps business data into messages and sends them to the RabbitMQ server
- Note: it does not need to know the final receiver, only which exchange to publish to

**2. Consumer**
- Definition: the application or service that receives and processes messages
- Role: keeps listening on a queue; when a message arrives, it fetches and processes it
- Note: a consumer is bound to a queue and tells RabbitMQ whether processing succeeded via automatic or manual acknowledgement

**3. 
Queue**
- Definition: the container that stores messages; a message's final resting place
- Key properties:
  - Name: the queue's unique identifier
  - Durable: if true, the queue survives a RabbitMQ restart
  - Exclusive: if true, the queue is visible only to the current connection
  - Auto-delete: if true, the queue is deleted automatically once its last consumer disconnects

**4. Exchange**
- Definition: receives messages from producers and forwards them to one or more queues according to routing rules
- Role: acts like a "router" and owns the routing logic
- Types:
  - **Direct Exchange**: the message's routing key must exactly match a queue's binding key
  - **Topic Exchange**: supports wildcards (`*` matches one word, `#` matches zero or more words)
  - **Fanout Exchange**: ignores the routing key and broadcasts to every bound queue
  - **Headers Exchange**: matches on message headers rather than the routing key

**5. Binding**
- Definition: the association between an exchange and a queue, including the routing rule
- Role: tells the exchange "which queues should receive which messages"

**6. Connection**
- Definition: the TCP connection between a producer/consumer and the RabbitMQ server
- Note: establishing a TCP connection is expensive, so connections are usually reused

**7. Channel**
- Definition: a virtual connection multiplexed over the TCP connection; the actual conduit for messages
- Role: reduces the number of TCP connections and thus the server's resource usage

### 2.3 Installing RabbitMQ

#### With the management UI
```bash
docker run -d --name rabbitmq -p 5673:5672 -p 15673:15672 rabbitmq:management
```

#### Without the management UI
```bash
docker run -d --name rabbitmq -p 5673:5672 -p 15673:15672 rabbitmq
```

**Management UI**: http://localhost:15673
- Username: guest
- Password: guest

### 2.4 The RabbitMQ Management UI

The management UI offers rich monitoring and administration features:

- **Overview**: server summary — connection count, queue count, message rates, etc.
- **Connections**: all client connections
- **Channels**: the channels of each connection
- **Exchanges**: manage exchanges
- **Queues**: manage queues
- **Admin**: user and permission management

---

## 3. Python Integration

The integration below is based on the Python aio-pika library.

### 3.1 Fanout Exchange Demo

#### Key characteristics:
- The routing key is ignored — whatever value you set is discarded
- Messages are broadcast to every queue bound to the exchange
- Suited to one-to-many notification scenarios (system announcements, log broadcasting)

#### Architecture:
![img_1.png](assert/img_1.png)
```mermaid
graph TD
    P[Producer<br/>fanout_publish] --> E[Fanout Exchange<br/>demo.fanout]
    E --> Q1[Queue<br/>demo.fanout.queue-0]
    E --> Q2[Queue<br/>demo.fanout.queue-1]
    E --> Q3[Queue<br/>demo.fanout.queue-2]
    Q1 --> C1[Consumer 1<br/>fanout_consumer_1]
    Q2 --> C2[Consumer 2<br/>fanout_consumer_2]
    Q3 --> C3[Consumer 3<br/>fanout_consumer_3]

    style P fill:#e1f5fe
    style E fill:#f3e5f5
    style Q1 fill:#e8f5e8
    style Q2 fill:#e8f5e8
    style Q3 fill:#e8f5e8
    style C1 fill:#fff3e0
    style C2 fill:#fff3e0
    style C3 fill:#fff3e0
```

#### Producer code:
```python
import asyncio
import aio_pika
from config import RABBITMQ_URI

async def setup_fanout_exchange(exchange_name="demo.fanout", queue_name_prefix="demo.fanout.queue-"):
    # Establish a connection
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()

    # 1. Declare a fanout exchange
    fanout_exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.FANOUT,
        durable=True  # durable exchange
    )

    # 2. Names of the queues to bind
    queue_names = [queue_name_prefix + str(i) for i in range(3)]

    # 3. Create each queue and bind it to the exchange
    for name in queue_names:
        queue = await channel.declare_queue(
            name,
            durable=True,
            auto_delete=False
        )
        # Bind the queue to the fanout exchange (the routing key is ignored)
        await queue.bind(fanout_exchange, routing_key="")

    await connection.close()

async def fanout_publish(message: str = "", exchange_name: str = "demo.fanout"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()

    fanout_exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.FANOUT,
        durable=True
    )

    message = aio_pika.Message(
        body=message.encode("utf-8"),
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT  # persistent message
    )

    # Publish to the fanout exchange
    await fanout_exchange.publish(message, routing_key="")
    await connection.close()
```

#### Consumer code:
```python
import asyncio
import aio_pika
from config import RABBITMQ_URI

async def fanout_consumer(queue_name: str, consumer_id: int):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    queue = await channel.declare_queue(
        queue_name,
        durable=True,
        auto_delete=False
    )

    async def on_message_received(message: aio_pika.IncomingMessage):
        async with message.process():
            message_content = message.body.decode("utf-8")
            print(f"[Fanout Consumer {consumer_id}] Received broadcast message:")
            print(f"  Listening queue: {queue_name}")
            print(f"  Message content: {message_content}")
            print(f"  Message persistence: {'Yes' if message.delivery_mode == 2 else 'No'}")
            await asyncio.sleep(1)

    consumer_tag = f"fanout_consumer_{consumer_id}_{queue_name}"
    await queue.consume(on_message_received, consumer_tag=consumer_tag)
    print(f"[Fanout Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})")
    await asyncio.Future()
```

#### Full test

**Test runner:**
```python
#!/usr/bin/env python3
"""
Run Fanout Exchange Test
"""

import asyncio
import sys
import os

# Add current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from product.fanout_publish import fanout_publish, setup_fanout_exchange
from comsumer.fanout_consumer import start_all_fanout_consumers


async def run_fanout_test():
    """Run fanout exchange test with producer and consumer"""
    print("=== Running Fanout Exchange Test ===")

    # Start consumer in background
    consumer_task = asyncio.create_task(start_all_fanout_consumers())

    # Wait for consumer to start
    await asyncio.sleep(1)

    # Setup and publish messages
    await setup_fanout_exchange("demo.fanout", "demo.fanout.queue-")
    await fanout_publish(message="hello world", exchange_name="demo.fanout")
    await fanout_publish(message="test message 2", exchange_name="demo.fanout")
    await fanout_publish(message="test message 3", exchange_name="demo.fanout")

    # Wait for messages to be processed
    await asyncio.sleep(3)

    # Cancel consumer
    consumer_task.cancel()
    print("✅ Fanout test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_fanout_test())
```

#### Test output:
```
=== Running Fanout Exchange Test ===
[Fanout Consumer 2] Started, listening to queue: demo.fanout.queue-1 (tag: 
fanout_consumer_2_demo.fanout.queue-1)
[Fanout Consumer 1] Started, listening to queue: demo.fanout.queue-0 (tag: fanout_consumer_1_demo.fanout.queue-0)
[Fanout Consumer 3] Started, listening to queue: demo.fanout.queue-2 (tag: fanout_consumer_3_demo.fanout.queue-2)

[Fanout Consumer 2] Received broadcast message:
  Listening queue: demo.fanout.queue-1
  Message content: hello world
  Message persistence: Yes
[Fanout Consumer 1] Received broadcast message:
  Listening queue: demo.fanout.queue-0
  Message content: hello world
  Message persistence: Yes
[Fanout Consumer 3] Received broadcast message:
  Listening queue: demo.fanout.queue-2
  Message content: hello world
  Message persistence: Yes
[Fanout Consumer 3] Received broadcast message:
  Listening queue: demo.fanout.queue-2
  Message content: test message 2
  Message persistence: Yes
[Fanout Consumer 1] Received broadcast message:
  Listening queue: demo.fanout.queue-0
  Message content: test message 2
  Message persistence: Yes
[Fanout Consumer 2] Received broadcast message:
  Listening queue: demo.fanout.queue-1
  Message content: test message 2
  Message persistence: Yes
[Fanout Consumer 1] Received broadcast message:
  Listening queue: demo.fanout.queue-0
  Message content: test message 3
  Message persistence: Yes
[Fanout Consumer 2] Received broadcast message:
  Listening queue: demo.fanout.queue-1
  Message content: test message 3
  Message persistence: Yes
[Fanout Consumer 3] Received broadcast message:
  Listening queue: demo.fanout.queue-2
  Message content: test message 3
  Message persistence: Yes
✅ Fanout test completed successfully!
```

### 3.2 Direct Exchange Demo

#### Key characteristics:
- Routing is an exact match between the message's routing key and a queue's binding key
- A message reaches a queue only when its routing key is identical to that queue's binding key
- Suited to precise routing (e.g. log levels: error, warning, and info routed to separate queues)

#### Architecture:

```mermaid
graph TD
    P[Producer<br/>direct_publish] --> E[Direct Exchange<br/>demo.direct]
    E -->|routing_key: error| Q1[Queue<br/>demo.direct.queue-error]
    E -->|routing_key: warning| Q2[Queue<br/>demo.direct.queue-warning]
    E -->|routing_key: info| Q3[Queue<br/>demo.direct.queue-info]
    E -->|routing_key: debug| Q3
    Q1 --> C1[Error Level Consumer<br/>direct_error_level]
    Q2 --> C2[Warning Level Consumer<br/>direct_warning_level]
    Q3 --> C3[Info/Debug Level Consumer<br/>direct_info/debug_level]

    style P fill:#e1f5fe
    style E fill:#f3e5f5
    style Q1 fill:#ffebee
    style Q2 fill:#fff3e0
    style Q3 fill:#e8f5e8
    style C1 fill:#ffebee
    style C2 fill:#fff3e0
    style C3 fill:#e8f5e8
```

#### Producer code:
```python
async def setup_direct_exchange(exchange_name="demo.direct", queue_prefix="demo.direct.queue-"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()

    direct_exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.DIRECT,
        durable=True
    )

    # Queues and their binding keys
    queue_bindings = [
        (f"{queue_prefix}error", ["error"]),        # error-level messages
        (f"{queue_prefix}warning", ["warning"]),    # warning-level messages
        (f"{queue_prefix}info", ["info", "debug"])  # info- and debug-level messages
    ]

    for queue_name, binding_keys in queue_bindings:
        queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)
        for binding_key in binding_keys:
            await queue.bind(direct_exchange, routing_key=binding_key)
        print(f"Queue {queue_name} bound to routing keys: {binding_keys}")

    await connection.close()

async def direct_publish(message: str, routing_key: str, exchange_name: str = "demo.direct"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()

    exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.DIRECT,
        durable=True
    )

    message_obj = aio_pika.Message(
        body=message.encode("utf-8"),
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT
    )

    await exchange.publish(message_obj, routing_key=routing_key)
    print(f"Message sent: {message} (routing key: {routing_key})")
    await connection.close()
```

#### Consumer code:
```python
async def direct_consumer(queue_name: str, consumer_label: str):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)

    async def on_message_received(message: aio_pika.IncomingMessage):
        async with message.process():
            message_content = message.body.decode("utf-8")
            print(f"[{consumer_label} Consumer] Received message:")
            print(f"  Queue name: {queue_name}")
            print(f"  Message content: {message_content}")
            print(f"  Message routing key: {message.routing_key}")
            print(f"  Processing time: {asyncio.get_running_loop().time():.2f}s")

            # Simulate different processing times per level
            if "error" in queue_name:
                await asyncio.sleep(2)
            elif "warning" in queue_name:
                await asyncio.sleep(1)
            elif "info" in queue_name:
                await asyncio.sleep(0.5)

    consumer_tag = f"direct_{consumer_label.lower().replace(' ', '_')}_{queue_name}"
    await queue.consume(on_message_received, consumer_tag=consumer_tag)
    print(f"[{consumer_label} Consumer] Started, listening to queue: {queue_name} (tag: {consumer_tag})")
    await asyncio.Future()
```

#### Full test

**Test runner:**
```python
#!/usr/bin/env python3
"""
Run Direct Exchange Test
"""

import asyncio
import sys
import os

# Add current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from product.direct_publish import setup_direct_exchange, direct_publish
from comsumer.direct_consumer import start_all_direct_consumers


async def run_direct_exchange_test():
    """Run direct exchange test with producer and consumer"""
    print("=== Running Direct Exchange Test ===")

    # Start consumer in background
    consumer_task = asyncio.create_task(start_all_direct_consumers())

    # Wait for consumer to start
    await asyncio.sleep(1)

    # Setup exchange and publish messages
    await setup_direct_exchange()

    test_messages = [
        ("System crash, unable to start", "error"),               # Route to error queue
        ("Disk space insufficient", "warning"),                   # Route to warning queue
        ("User login successful", "info"),                        # Route to info queue
        ("Debug info: Database connection successful", "debug")   # Route to info queue
    ]

    for msg, routing_key in test_messages:
        await direct_publish(msg, routing_key)
        await asyncio.sleep(0.5)

    # Wait for messages to be processed
    await asyncio.sleep(3)

    # Cancel consumer
    consumer_task.cancel()
    print("✅ Direct exchange test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_direct_exchange_test())
```

#### Test output:
```
=== Running Direct Exchange Test ===
[Info/Debug Level Consumer] Started, listening to queue: demo.direct.queue-info (tag: direct_info/debug_level_demo.direct.queue-info)
[Warning Level Consumer] Started, listening to queue: demo.direct.queue-warning (tag: direct_warning_level_demo.direct.queue-warning)
[Error Level Consumer] Started, listening to queue: demo.direct.queue-error (tag: direct_error_level_demo.direct.queue-error)

Queue demo.direct.queue-error bound to routing keys: ['error']
Queue demo.direct.queue-warning bound to routing keys: ['warning']
Queue demo.direct.queue-info bound to routing keys: ['info', 'debug']

[Error Level Consumer] Received message:
  Queue name: demo.direct.queue-error
  Message content: System crash, unable to start
  Message routing key: error
  Processing time: 322774.03s

Message sent: System crash, unable to start (routing key: error)
[Warning Level Consumer] Received message:
  Queue name: demo.direct.queue-warning
  Message content: Disk space insufficient
  Message routing key: warning
  Processing time: 322774.54s

Message sent: Disk space insufficient (routing key: warning)
[Info/Debug Level Consumer] Received message:
  Queue name: demo.direct.queue-info
  Message content: User login successful
  Message routing key: info
  Processing time: 322775.06s

Message sent: User login successful (routing key: info)
[Info/Debug Level Consumer] Received message:
  Queue name: demo.direct.queue-info
  Message content: Debug info: Database connection successful
  Message routing key: debug
  Processing time: 322775.57s

Message sent: Debug info: Database connection 
successful (routing key: debug)
✅ Direct exchange test completed successfully!
```

### 3.3 Direct Exchange Demo (Load Balancing)

#### How it works:
1. Create several queues, each bound to the same direct exchange with a different routing key
2. Producer routing strategy: pick a routing key by round-robin, at random, or by hashing a message attribute
3. Consumers: each queue has one or more consumers that process the messages routed to it

#### Architecture:
```mermaid
graph TD
    P[BalancedProducer<br/>round-robin publish] --> E[Direct Exchange<br/>demo.direct.multi.queue]
    E -->|routing_key: route.1| Q1[Queue<br/>task.queue.1]
    E -->|routing_key: route.2| Q2[Queue<br/>task.queue.2]
    E -->|routing_key: route.3| Q3[Queue<br/>task.queue.3]
    Q1 --> C1[Consumer 1<br/>multi_consumer_1]
    Q2 --> C2[Consumer 2<br/>multi_consumer_2]
    Q3 --> C3[Consumer 3<br/>multi_consumer_3]

    P -.->|round-robin| P

    style P fill:#e1f5fe
    style E fill:#f3e5f5
    style Q1 fill:#e8f5e8
    style Q2 fill:#e8f5e8
    style Q3 fill:#e8f5e8
    style C1 fill:#fff3e0
    style C2 fill:#fff3e0
    style C3 fill:#fff3e0
```

#### Producer code:
```python
class BalancedProducer:
    def __init__(self, exchange_name="demo.direct.multi.queue", queue_count=3):
        self.exchange_name = exchange_name
        self.queue_count = queue_count
        self.current_index = 0  # round-robin index

    async def connect(self):
        self.connection = await aio_pika.connect_robust(RABBITMQ_URI)
        self.channel = await self.connection.channel()
        self.exchange = await self.channel.declare_exchange(
            self.exchange_name,
            aio_pika.ExchangeType.DIRECT,
            durable=True
        )

    async def publish(self, message: str):
        # Round-robin: advance to the next routing key on every publish
        self.current_index = (self.current_index + 1) % self.queue_count
        route_key = f"route.{self.current_index + 1}"

        message_obj = aio_pika.Message(
            body=message.encode("utf-8"),
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT
        )

        await self.exchange.publish(message_obj, routing_key=route_key)
        print(f"Message sent: {message} (routed to {route_key})")

    async def close(self):
        # Close the underlying connection (the test runner calls this)
        await self.connection.close()
```

#### Consumer code:
```python
async def queue_consumer(queue_name: str, consumer_id: int):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)

    async def on_message(message: aio_pika.IncomingMessage):
        async with message.process():
            content = message.body.decode("utf-8")
            print(f"[Consumer {consumer_id}] Processing message: {content}")
            print(f"[Consumer {consumer_id}] From queue: {queue_name}")
            print(f"[Consumer {consumer_id}] Routing key: {message.routing_key}")
            await asyncio.sleep(1)

    consumer_tag = f"multi_consumer_{consumer_id}_{queue_name}"
    await queue.consume(on_message, consumer_tag=consumer_tag)
    print(f"[Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})")
    await asyncio.Future()
```

#### Full test

**Test runner:**
```python
#!/usr/bin/env python3
"""
Run Multi-Queue Load Balancing Test
"""

import asyncio
import sys
import os

# Add current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from product.direct_multi_publish import setup_multi_queue_balance, BalancedProducer
from comsumer.direct_multi_consumer import start_balanced_consumers


async def run_multi_queue_balance_test():
    """Run multi-queue load balancing test with producer and consumer"""
    print("=== Running Multi-Queue Load Balancing Test ===")

    queue_count = 3

    # Start consumer in background
    consumer_task = asyncio.create_task(start_balanced_consumers(queue_count=queue_count))

    # Wait for consumer to start
    await asyncio.sleep(1)

    # Setup and publish messages
    await setup_multi_queue_balance(queue_count=queue_count)

    producer = BalancedProducer(queue_count=queue_count)
    await producer.connect()

    for i in range(10):
        await producer.publish(f"Task {i + 1}: Multi-queue load balancing test")
        await asyncio.sleep(0.3)

    await producer.close()

    # Wait for messages to be processed
    await asyncio.sleep(3)

    # Cancel consumer
    consumer_task.cancel()
    print("✅ Multi-queue load balancing test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_multi_queue_balance_test())
```

#### Test output:
```
=== Running Multi-Queue Load Balancing Test ===
[Consumer 2] Started, listening to queue: task.queue.2 (tag: multi_consumer_2_task.queue.2)
[Consumer 3] Started, listening to queue: task.queue.3 (tag: multi_consumer_3_task.queue.3)
[Consumer 1] Started, listening to queue: task.queue.1 (tag: multi_consumer_1_task.queue.1)

Queue task.queue.1 bound to routing key: route.1
Queue task.queue.2 bound to routing key: route.2
Queue task.queue.3 bound to routing key: route.3
[Consumer 2] Processing message: Task 1: Multi-queue load balancing test
[Consumer 2] From queue: task.queue.2
[Consumer 2] Routing key: route.2

Message sent: Task 1: Multi-queue load balancing test (routed to route.2)
[Consumer 3] Processing message: Task 2: Multi-queue load balancing test
[Consumer 3] From queue: task.queue.3
[Consumer 3] Routing key: route.3

Message sent: Task 2: Multi-queue load balancing test (routed to route.3)
[Consumer 1] Processing message: Task 3: Multi-queue load balancing test
[Consumer 1] From queue: task.queue.1
[Consumer 1] Routing key: route.1

Message sent: Task 3: Multi-queue load balancing test (routed to route.1)
Message sent: Task 4: Multi-queue load balancing test (routed to route.2)
[Consumer 2] Processing message: Task 4: Multi-queue load balancing test
[Consumer 2] From queue: task.queue.2
[Consumer 2] Routing key: route.2

Message sent: Task 5: Multi-queue load balancing test (routed to route.3)
[Consumer 3] Processing message: Task 5: Multi-queue load balancing test
[Consumer 3] From queue: task.queue.3
[Consumer 3] Routing key: route.3

Message sent: Task 6: Multi-queue load balancing test (routed to route.1)
[Consumer 1] Processing message: Task 6: Multi-queue load balancing test
[Consumer 1] From queue: task.queue.1
[Consumer 1] Routing key: route.1

Message sent: Task 7: Multi-queue load balancing test (routed to route.2)
[Consumer 2] Processing message: Task 7: Multi-queue load balancing test
[Consumer 2] From queue: task.queue.2
[Consumer 2] Routing key: route.2

Message sent: Task 8: Multi-queue load balancing test (routed to route.3)
[Consumer 3] Processing message: Task 8: Multi-queue load balancing test
[Consumer 3] From queue: task.queue.3
[Consumer 3] Routing key: route.3

Message sent: Task 9: Multi-queue load balancing test (routed to route.1)
[Consumer 1] Processing message: Task 9: Multi-queue load balancing test
[Consumer 1] From queue: task.queue.1
[Consumer 
1] Routing key: route.1

Message sent: Task 10: Multi-queue load balancing test (routed to route.2)
[Consumer 2] Processing message: Task 10: Multi-queue load balancing test
[Consumer 2] From queue: task.queue.2
[Consumer 2] Routing key: route.2

✅ Multi-queue load balancing test completed successfully!
```

### 3.4 Topic Exchange Demo

#### Key characteristics:
- Routing keys are hierarchical, dot-separated strings (e.g. order.create.user)
- Two wildcards are supported:
  - `*`: matches exactly one word (user.* matches user.login but not user.login.success)
  - `#`: matches zero or more words (order.# matches order, order.pay, and order.pay.success)

#### Architecture:
```mermaid
graph TD
    P[Producer<br/>topic_publish] --> E[Topic Exchange<br/>demo.topic]
    E -->|binding: #.critical| Q1[Queue<br/>demo.topic.queue-critical]
    E -->|binding: order.#| Q2[Queue<br/>demo.topic.queue-order]
    E -->|binding: user.login.*| Q3[Queue<br/>demo.topic.queue-user.login]
    Q1 --> C1[CriticalHandler<br/>topic_CriticalHandler]
    Q2 --> C2[OrderHandler<br/>topic_OrderHandler]
    Q3 --> C3[UserLoginHandler<br/>topic_UserLoginHandler]

    subgraph "Routing key examples"
        R1[order.create.critical] -.->|matches #.critical and order.#| Q1
        R1 -.-> Q2
        R2[user.login.success] -.->|matches user.login.*| Q3
        R3[system.log.info] -.->|no match| X[message discarded]
    end

    style P fill:#e1f5fe
    style E fill:#f3e5f5
    style Q1 fill:#ffebee
    style Q2 fill:#e8f5e8
    style Q3 fill:#fff3e0
    style C1 fill:#ffebee
    style C2 fill:#e8f5e8
    style C3 fill:#fff3e0
    style X fill:#f5f5f5
```

#### Producer code:
```python
async def setup_topic_exchange(exchange_name="demo.topic", queue_prefix="demo.topic.queue-"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()

    topic_exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.TOPIC,
        durable=True
    )

    # Queues and their binding keys (wildcards supported)
    queue_bindings = [
        (f"{queue_prefix}critical", ["#.critical"]),     # any prefix ending in .critical
        (f"{queue_prefix}order", ["order.#"]),           # every routing key starting with order
        (f"{queue_prefix}user.login", ["user.login.*"])  # user.login plus exactly one more word
    ]

    for queue_name, binding_keys in queue_bindings:
        queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)
        for binding_key in binding_keys:
            await queue.bind(topic_exchange, routing_key=binding_key)
        print(f"Queue {queue_name} bound to routing keys: {binding_keys}")

    await connection.close()

async def topic_publish(message: str, routing_key: str, exchange_name: str = "demo.topic"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()

    exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.TOPIC,
        durable=True
    )

    message_obj = aio_pika.Message(
        body=message.encode("utf-8"),
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT
    )

    await exchange.publish(message_obj, routing_key=routing_key)
    print(f"Message sent: {message} (routing key: {routing_key})")
    await connection.close()
```

#### Consumer code:
```python
async def topic_consumer(queue_name: str, consumer_id: str):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)

    async def on_message(message: aio_pika.IncomingMessage):
        async with message.process():
            message_content = message.body.decode("utf-8")
            print(f"[Consumer {consumer_id}] Received message: {message_content}")
            print(f"[Consumer {consumer_id}] Message routing key: {message.routing_key}")
            print(f"[Consumer {consumer_id}] From queue: {queue_name}")
            await asyncio.sleep(1)

    consumer_tag = f"topic_{consumer_id}_{queue_name}"
    await queue.consume(on_message, consumer_tag=consumer_tag)
    print(f"[Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})")
    await asyncio.Future()
```

#### Full test

**Test runner:**
```python
#!/usr/bin/env python3
"""
Run Topic Exchange Test
"""

import asyncio
import sys
import os

# Add current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from product.topic_publish import setup_topic_exchange, topic_publish
from comsumer.topic_consumer import start_all_topic_consumers


async def run_topic_exchange_test():
    """Run topic exchange test with producer and consumer"""
    print("=== Running Topic Exchange Test ===")

    # Start consumer in background
    consumer_task = asyncio.create_task(start_all_topic_consumers())

    # Wait for consumer to start
    await asyncio.sleep(1)

    # Setup exchange and publish messages
    await setup_topic_exchange()

    test_messages = [
        ("Order creation failed (critical error)", "order.create.critical"),
        ("User login successful", "user.login.success"),
        ("Order payment completed", "order.pay.success"),
        ("System crash (critical error)", "system.crash.critical"),
        ("User login failed", "user.login.failed"),
        ("Normal system log", "system.log.info")  # Won't match any binding key, will be discarded
    ]
    for msg, routing_key in test_messages:
        await topic_publish(msg, routing_key)
        await asyncio.sleep(0.5)

    # Wait for messages to be processed
    await asyncio.sleep(3)

    # Cancel consumer
    consumer_task.cancel()
    print("✅ Topic exchange test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_topic_exchange_test())
```

#### Test output:
```
=== Running Topic Exchange Test ===
[Consumer CriticalHandler] Started, listening to queue: demo.topic.queue-critical (tag: topic_CriticalHandler_demo.topic.queue-critical)
[Consumer UserLoginHandler] Started, listening to queue: demo.topic.queue-user.login (tag: topic_UserLoginHandler_demo.topic.queue-user.login)
[Consumer OrderHandler] Started, listening to queue: demo.topic.queue-order (tag: topic_OrderHandler_demo.topic.queue-order)

Queue demo.topic.queue-critical bound to routing keys: ['#.critical']
Queue demo.topic.queue-order bound to routing keys: ['order.#']
Queue demo.topic.queue-user.login bound to routing keys: ['user.login.*']

[Consumer OrderHandler] Received message: Order creation failed (critical error)
[Consumer OrderHandler] Message routing key: order.create.critical
[Consumer OrderHandler] From queue: demo.topic.queue-order

[Consumer CriticalHandler] Received message: Order creation failed (critical error)
[Consumer CriticalHandler] Message routing key: order.create.critical
[Consumer CriticalHandler] From queue: demo.topic.queue-critical

Message sent: Order creation failed (critical error) (routing key: order.create.critical)
[Consumer UserLoginHandler] Received message: User login successful
[Consumer UserLoginHandler] Message routing key: user.login.success
[Consumer UserLoginHandler] From queue: demo.topic.queue-user.login

Message sent: User login successful (routing key: user.login.success)
[Consumer OrderHandler] Received message: Order payment completed
[Consumer OrderHandler] Message routing key: order.pay.success
[Consumer OrderHandler] 
From queue: demo.topic.queue-order + +Message sent: Order payment completed (routing key: order.pay.success) +[Consumer CriticalHandler] Received message: System crash (critical error) +[Consumer CriticalHandler] Message routing key: system.crash.critical +[Consumer CriticalHandler] From queue: demo.topic.queue-critical + +Message sent: System crash (critical error) (routing key: system.crash.critical) +[Consumer UserLoginHandler] Received message: User login failed +[Consumer UserLoginHandler] Message routing key: user.login.failed +[Consumer UserLoginHandler] From queue: demo.topic.queue-user.login + +Message sent: User login failed (routing key: user.login.failed) +Message sent: Normal system log (routing key: system.log.info) +✅ Topic exchange test completed successfully! +``` + +### 3.5 可靠的RabbitMQ生产者消费者 + +#### 可靠性保证机制 + +要确保消息消费的可靠性,需要从以下几个方面入手: + +**1. 消息持久化** +- 交换器持久化:`durable=True` +- 队列持久化:`durable=True` +- 消息持久化:`delivery_mode=PERSISTENT` + +**2. 消息确认机制** +- 自动确认:`async with message.process()` +- 手动确认:`message.ack()` / `message.nack()` +- 确保消息处理完成后才确认 + +**3. 消息幂等性** +- 使用消息ID去重 +- 内存中记录已处理的消息ID +- 防止重复处理相同消息 + +**4. 重试机制** +- 可配置最大重试次数 +- 消费者内部重试,避免消息重新入队 +- 指数退避重试策略 + +**5. 
死信队列** +- 处理失败消息的完整解决方案 +- 自动创建死信交换器和队列 +- 详细的错误信息记录 + +#### 关键代码实现 + +**可靠生产者代码:** +```python +class ReliableProducer: + """Reliable Message Producer""" + + def _generate_message_id(self, message_data: Dict[str, Any]) -> str: + """Generate message ID for message""" + message_type = message_data.get('type', '') + content = message_data.get('content', '') + + # For duplicate_test type messages, generate fixed ID based on content + if message_type == 'duplicate_test': + import hashlib + content_hash = hashlib.md5(content.encode('utf-8')).hexdigest() + return f"duplicate_{content_hash[:8]}" + else: + return f"msg_{asyncio.get_running_loop().time()}" + + async def publish_reliable_message(self, message_data: Dict[str, Any]) -> bool: + """Publish reliable message""" + try: + # Generate message ID + message_id = self._generate_message_id(message_data) + + # Add message metadata + message_data.update({ + 'timestamp': datetime.now().isoformat(), + 'message_id': message_id + }) + + # Create persistent message + message = aio_pika.Message( + body=json.dumps(message_data, ensure_ascii=False).encode('utf-8'), + delivery_mode=aio_pika.DeliveryMode.PERSISTENT, # Message persistence + message_id=message_id, + timestamp=datetime.now() + ) + + # Send message and wait for confirmation + await self.exchange.publish(message, routing_key="reliable") + logger.info(f"[Producer] Message sent: {message_id}") + return True + except Exception as e: + logger.error(f"[Producer] Failed to send message: {e}") + return False +``` + +**可靠消费者代码:** +```python +class ReliableConsumer: + """Reliable Message Consumer""" + + def __init__(self, queue_name: Optional[str] = None, consumer_name: Optional[str] = None): + self.processed_messages: Set[str] = set() # Store processed message IDs + + async def process_message(self, message: aio_pika.IncomingMessage): + """Core message processing logic""" + try: + # Parse message + message_data = json.loads(message.body.decode('utf-8')) + message_id = 
message_data.get('message_id')

            # Check if message has been processed before (idempotency check)
            if message_id in self.processed_messages:
                logger.warning(f"[Consumer-{self.consumer_name}] 🚫 Duplicate message detected, skipping: {message_id}")
                await message.ack()
                return

            logger.info(f"[Consumer-{self.consumer_name}] Starting to process message: {message_id}")

            # Retry processing message directly
            success = await self.retry_process_message(message_data, message_id)

            # Only record processed message ID after successful processing
            if success:
                self.processed_messages.add(message_id)
                await message.ack()
                logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed and acknowledged")
            else:
                # Processing failed, send to dead letter queue
                await self.send_to_dead_letter_queue(message, message_id, "Processing failed")
                await message.ack()
        except Exception as e:
            # Unrecoverable error (e.g. malformed message body): ack to avoid endless redelivery
            logger.error(f"[Consumer-{self.consumer_name}] Failed to process message: {e}")
            await message.ack()

    async def retry_process_message(self, message_data: Dict[str, Any], message_id: str) -> bool:
        """Retry processing message directly (fixed 1-second interval between attempts)"""
        max_retries = config.max_retries
        last_error = None

        for attempt in range(max_retries + 1):
            try:
                logger.info(f"[Consumer-{self.consumer_name}] Attempting to process message {message_id}, attempt {attempt + 1}")
                await self.message_handler(message_data)
                logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed successfully")
                return True
            except Exception as e:
                last_error = str(e)
                logger.warning(f"[Consumer-{self.consumer_name}] Message {message_id} failed on attempt {attempt + 1}: {e}")
                if attempt < max_retries:
                    await asyncio.sleep(1)  # Wait before the next attempt
                else:
                    logger.error(f"[Consumer-{self.consumer_name}] Message {message_id} failed after {max_retries} retries: {last_error}")
        return False
```

**死信队列消费者代码:**
```python
class DeadLetterConsumer:
    """Dead Letter Queue 
Consumer""" + + async def process_dead_letter_message(self, message: aio_pika.IncomingMessage): + """Process dead letter message""" + try: + # Parse dead letter message + dead_letter_data = json.loads(message.body.decode('utf-8')) + original_message = dead_letter_data.get('original_message', {}) + error_info = dead_letter_data.get('error_info', 'Unknown') + message_id = dead_letter_data.get('message_id', 'Unknown') + + # Print dead letter message information + logger.error("=" * 50) + logger.error("[Dead Letter Consumer] Received Dead Letter Message:") + logger.error(f"[Dead Letter Consumer] Message ID: {message_id}") + logger.error(f"[Dead Letter Consumer] Message Content: {json.dumps(original_message, ensure_ascii=False, indent=2)}") + logger.error(f"[Dead Letter Consumer] Error Reason: {error_info}") + logger.error("=" * 50) + + # Save to database + await self.save_to_database(original_message, error_info, message_id) + + # Acknowledge dead letter message + await message.ack() + logger.info(f"[Dead Letter Consumer] Dead letter message {message_id} processed") + except Exception as e: + logger.error(f"[Dead Letter Consumer] Failed to process dead letter message: {e}") + await message.nack(requeue=False) +``` + +#### 完整测试代码 + +**测试运行脚本:** +```python +""" +RabbitMQ 可靠消息传递测试模块 +""" + +import asyncio +import logging + +from reliable_mq import ReliableProducer, ReliableConsumer +from reliable_mq.dead_letter_consumer import DeadLetterConsumer +from reliable_mq.config import config + +# 配置日志 +logging.basicConfig( + level=getattr(logging, config.log_level), + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +async def run_context_manager_messaging(): + """使用上下文管理器测试可靠消息传递""" + logger.info("=== 使用上下文管理器测试可靠消息传递 ===") + + # 使用异步上下文管理器 + async with ReliableProducer() as producer: + async with ReliableConsumer(consumer_name="context_test_consumer") as consumer: + async with DeadLetterConsumer() as 
dead_letter_consumer: + # 启动消费者(在后台运行) + consumer_task = asyncio.create_task(consumer.start_consuming()) + dead_letter_task = asyncio.create_task(dead_letter_consumer.start_consuming()) + + # 等待消费者启动 + await asyncio.sleep(1) + + # 发送测试消息 + test_messages = [ + {"content": "重要业务消息1", "type": "business"}, + {"content": "系统通知消息2", "type": "notification"}, + {"content": "用户操作消息3", "type": "user_action"}, + {"content": "重复消息测试", "type": "duplicate_test"}, + {"content": "重复消息测试", "type": "duplicate_test"}, # 重复消息 + {"content": "会失败的消息1", "type": "will_fail"}, # 这些消息会失败并进入死信队列 + {"content": "会失败的消息2", "type": "will_fail"}, + {"content": "会失败的消息3", "type": "will_fail"}, + ] + + for msg in test_messages: + await producer.publish_reliable_message(msg) + await asyncio.sleep(0.5) + + # 等待消息处理完成 + await asyncio.sleep(30) + + # 取消任务 + consumer_task.cancel() + dead_letter_task.cancel() + + +if __name__ == '__main__': + asyncio.run(run_context_manager_messaging()) +``` + +**配置文件:** +```python +""" +RabbitMQ 可靠消息传递配置模块 +""" + +import os +from dataclasses import dataclass +from typing import Dict, Any + + +@dataclass +class Config: + """配置类""" + # RabbitMQ 连接配置 + rabbitmq_uri: str = "amqp://guest:guest@localhost:5673/" + + # 交换器和队列配置 + exchange_name: str = "reliable.exchange" + queue_name: str = "reliable.queue" + + # 死信队列配置 + dead_letter_exchange: str = "reliable.dead.letter.exchange" + dead_letter_queue: str = "reliable.dead.letter.queue" + + # 重试配置 + max_retries: int = 3 + message_ttl: int = 300000 # 5分钟 + + # QoS 配置 + prefetch_count: int = 1 + + # 日志配置 + log_level: str = "INFO" + + def get_connection_config(self) -> Dict[str, Any]: + """获取连接配置""" + return { + 'uri': self.rabbitmq_uri, + 'prefetch_count': self.prefetch_count + } + + def get_dead_letter_config(self) -> Dict[str, str]: + """获取死信队列配置""" + return { + 'dead_letter_exchange': self.dead_letter_exchange, + 'dead_letter_queue': self.dead_letter_queue + } + + +# 全局配置实例 +config = Config() +``` + +**自定义消息处理函数:** +```python 
+async def default_message_handler(message_data: Dict[str, Any]): + """Default message handler function""" + await asyncio.sleep(1) # Simulate processing time + + message_type = message_data.get('type', '') + content = message_data.get('content', '') + + # Simulate business logic processing + if message_type == 'will_fail': + raise Exception(f"Simulated business processing failure: {content}") + + logger.info(f"[Consumer] Business logic processing completed: {content}") +``` + +#### 测试结果 + +``` +2025-09-07 11:25:02,498 - __main__ - INFO - === 使用上下文管理器测试可靠消息传递 === +2025-09-07 11:25:02,509 - reliable_mq.reliable_producer - INFO - [Producer] Connected, queue: reliable.queue +2025-09-07 11:25:02,513 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Connected, listening to queue: reliable.queue +2025-09-07 11:25:02,518 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] Connected + +2025-09-07 11:25:03,523 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Starting to process message: msg_323632.845174041 +2025-09-07 11:25:03,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message content: {'content': '重要业务消息1', 'type': 'business', 'timestamp': '2025-09-07T11:25:03.519240', 'message_id': 'msg_323632.845174041'} +2025-09-07 11:25:03,524 - reliable_mq.reliable_producer - INFO - [Producer] Message sent: msg_323632.845174041 (type: business, content: 重要业务消息1) +2025-09-07 11:25:04,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Business logic processing completed: 重要业务消息1 +2025-09-07 11:25:04,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message msg_323632.845174041 processed successfully +2025-09-07 11:25:04,525 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message msg_323632.845174041 processed and acknowledged +2025-09-07 11:25:04,525 - reliable_mq.reliable_consumer - INFO - 
[Consumer-context_test_consumer] Current processed message count: 1 + +2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - ================================================== +2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] 🚫 Duplicate message detected, skipping: +2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Message ID: duplicate_090f7015 +2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Message content: { + "content": "重复消息测试", + "type": "duplicate_test", + "timestamp": "2025-09-07T11:25:05.546930", + "message_id": "duplicate_090f7015" +} +2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Total processed messages: 4 +2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - ================================================== + +2025-09-07 11:25:14,551 - reliable_mq.reliable_consumer - ERROR - [Consumer-context_test_consumer] Message msg_323635.377526708 failed after 3 retries: Simulated business processing failure: 会失败的消息1 +2025-09-07 11:25:14,551 - reliable_mq.reliable_consumer - ERROR - [Consumer-context_test_consumer] Message sent to dead letter queue: msg_323635.377526708, error: Processing failed + +2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - ================================================== +2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Received Dead Letter Message: +2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Message ID: msg_323635.377526708 +2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Message Content: { + "content": "会失败的消息1", + "type": "will_fail", + "timestamp": "2025-09-07T11:25:06.051557", + "message_id": "msg_323635.377526708" +} +2025-09-07 11:25:14,560 
- reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Error Reason: Processing failed +2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - ================================================== + +2025-09-07 11:25:15,064 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] 💾 Dead letter message saved to database: msg_323635.377526708 +2025-09-07 11:25:15,064 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] Database Record: { + "id": "msg_323635.377526708", + "original_message": { + "content": "会失败的消息1", + "type": "will_fail", + "timestamp": "2025-09-07T11:25:06.051557", + "message_id": "msg_323635.377526708" + }, + "error_info": "Processing failed", + "created_at": "2025-09-07T11:25:15.064341", + "status": "failed" +} + +2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - ================================================== +2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Processed Message Statistics: +2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Total Processed: 4 +2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Processed Message IDs: ['msg_323632.845174041', 'msg_323633.351571583', 'duplicate_090f7015', 'msg_323633.854272166'] +2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - ================================================== +``` + +#### 架构关系图 +![img_5.png](assert/img_5.png) +```mermaid +graph TD + P[ReliableProducer
可靠生产者] --> E[Reliable Exchange<br/>reliable.exchange]
    E --> Q[Reliable Queue<br/>reliable.queue]
    Q --> C[ReliableConsumer<br/>可靠消费者]

    C -->|处理成功| ACK[Message ACK<br/>消息确认]
    C -->|处理失败| RETRY[Retry Logic<br/>重试机制]
    RETRY -->|重试成功| ACK
    RETRY -->|重试失败| DLQ[Dead Letter Queue<br/>死信队列]

    DLQ --> DLC[DeadLetterConsumer<br/>死信消费者]
    DLC --> DB[(Database<br/>数据库)]

    subgraph "可靠性保证"
        PERSIST[Message Persistence<br/>消息持久化]
        IDEMPOTENT[Idempotency Check<br/>幂等性检查]
        CONFIRM[Publisher Confirmation
发布确认] + end + + subgraph "消息流程" + MSG1[正常消息] --> SUCCESS[处理成功] + MSG2[重复消息] --> SKIP[跳过处理] + MSG3[失败消息] --> FAIL[重试后失败] + end + + style P fill:#e1f5fe + style E fill:#f3e5f5 + style Q fill:#e8f5e8 + style C fill:#fff3e0 + style DLQ fill:#ffebee + style DLC fill:#ffebee + style DB fill:#f3e5f5 + style ACK fill:#e8f5e8 + style RETRY fill:#fff3e0 +``` + diff --git a/README_TESTS.md b/README_TESTS.md deleted file mode 100644 index c595462..0000000 --- a/README_TESTS.md +++ /dev/null @@ -1,136 +0,0 @@ -# RabbitMQ Test Suite - -This directory contains comprehensive tests for different RabbitMQ exchange types and patterns. - -## Test Files - -### Individual Test Files - -1. **`run_fanout_test.py`** - Tests Fanout Exchange - - Broadcasts messages to all bound queues - - Demonstrates one-to-many messaging pattern - - Run: `python run_fanout_test.py` - -2. **`run_direct_test.py`** - Tests Direct Exchange - - Routes messages based on exact routing key matches - - Demonstrates selective message routing - - Run: `python run_direct_test.py` - -3. **`run_topic_test.py`** - Tests Topic Exchange - - Routes messages using wildcard patterns (* and #) - - Demonstrates hierarchical message routing - - Run: `python run_topic_test.py` - -4. **`run_multi_queue_test.py`** - Tests Multi-Queue Load Balancing - - Distributes messages across multiple queues - - Demonstrates load balancing and parallel processing - - Run: `python run_multi_queue_test.py` - -### Combined Test File - -5. 
**`test.py`** - Runs all tests sequentially - - Executes all exchange type tests in order - - Run: `python test.py` - -## Test Features - -### Producer and Consumer Coordination -- Each test starts consumers in the background -- Producers send messages after consumers are ready -- Tests demonstrate real-time message processing -- Automatic cleanup and task cancellation - -### Message Patterns Tested - -#### Fanout Exchange -- **Pattern**: One-to-many broadcasting -- **Queues**: 3 queues (demo.fanout.queue-0, demo.fanout.queue-1, demo.fanout.queue-2) -- **Behavior**: All queues receive every message -- **Use Case**: Notifications, announcements, logging - -#### Direct Exchange -- **Pattern**: Exact routing key matching -- **Queues**: error, warning, info (with debug routing to info) -- **Behavior**: Messages routed based on exact routing key -- **Use Case**: Log level routing, priority-based processing - -#### Topic Exchange -- **Pattern**: Wildcard pattern matching -- **Queues**: Critical, Success, Failed -- **Behavior**: Messages routed using * and # wildcards -- **Use Case**: Hierarchical event routing, microservice communication - -#### Multi-Queue Load Balancing -- **Pattern**: Round-robin distribution -- **Queues**: 3 balanced queues -- **Behavior**: Messages distributed evenly across queues -- **Use Case**: Horizontal scaling, parallel processing - -## Running Tests - -### Prerequisites -- RabbitMQ server running on localhost:5673 -- Python 3.7+ with asyncio support -- Required packages: aio-pika - -### Individual Test Execution -```bash -# Test Fanout Exchange -python run_fanout_test.py - -# Test Direct Exchange -python run_direct_test.py - -# Test Topic Exchange -python run_topic_test.py - -# Test Multi-Queue Load Balancing -python run_multi_queue_test.py -``` - -### Run All Tests -```bash -python test.py -``` - -## Test Output - -Each test provides detailed output showing: -- Consumer startup messages -- Message reception and processing -- Queue routing 
behavior -- Message persistence status -- Test completion status - -## Configuration - -Tests use the configuration from `config.py`: -- RabbitMQ URI: `amqp://guest:guest@localhost:5673/` -- Exchange and queue naming conventions -- Message persistence settings - -## Architecture - -### Producer Side -- Sets up exchanges and queues -- Publishes test messages with appropriate routing keys -- Handles connection management - -### Consumer Side -- Starts multiple consumers for different queues -- Processes messages with simulated business logic -- Demonstrates concurrent message handling - -### Test Coordination -- Uses asyncio tasks for concurrent execution -- Implements proper startup/shutdown sequences -- Ensures clean resource cleanup - -## Extending Tests - -To add new test scenarios: -1. Create a new test file following the naming pattern `run_xxx_test.py` -2. Import appropriate producer and consumer functions -3. Implement the test logic with proper async/await patterns -4. Add consumer startup, message publishing, and cleanup phases -5. 
Update this README with the new test description diff --git a/assert/img.png b/assert/img.png new file mode 100644 index 0000000..70f34d9 Binary files /dev/null and b/assert/img.png differ diff --git a/assert/img_1.png b/assert/img_1.png new file mode 100644 index 0000000..767417d Binary files /dev/null and b/assert/img_1.png differ diff --git a/assert/img_2.png b/assert/img_2.png new file mode 100644 index 0000000..e9bb856 Binary files /dev/null and b/assert/img_2.png differ diff --git a/assert/img_3.png b/assert/img_3.png new file mode 100644 index 0000000..38c9d51 Binary files /dev/null and b/assert/img_3.png differ diff --git a/assert/img_4.png b/assert/img_4.png new file mode 100644 index 0000000..df27954 Binary files /dev/null and b/assert/img_4.png differ diff --git a/assert/img_5.png b/assert/img_5.png new file mode 100644 index 0000000..6c4d91a Binary files /dev/null and b/assert/img_5.png differ diff --git a/help.py b/help.py deleted file mode 100644 index 160274e..0000000 --- a/help.py +++ /dev/null @@ -1,75 +0,0 @@ -#!/usr/bin/env python3 -""" -RabbitMQ Test Help -Shows available test commands and usage information. 
-""" - -import os -import sys - - -def show_help(): - """Display help information for RabbitMQ tests""" - print("🐰 RabbitMQ Test Suite Help") - print("=" * 50) - - print("\n📋 Available Test Files:") - print("-" * 30) - - test_files = [ - ("run_fanout_test.py", "Test Fanout Exchange (broadcast messaging)"), - ("run_direct_test.py", "Test Direct Exchange (routing by key)"), - ("run_topic_test.py", "Test Topic Exchange (wildcard routing)"), - ("run_multi_queue_test.py", "Test Multi-Queue Load Balancing"), - ("test.py", "Run all tests sequentially"), - ("run_all_tests.py", "Run all individual test files with summary") - ] - - for filename, description in test_files: - exists = "✅" if os.path.exists(filename) else "❌" - print(f"{exists} {filename:<25} - {description}") - - print("\n🚀 Usage Examples:") - print("-" * 20) - print("python run_fanout_test.py # Test fanout exchange") - print("python run_direct_test.py # Test direct exchange") - print("python run_topic_test.py # Test topic exchange") - print("python run_multi_queue_test.py # Test load balancing") - print("python test.py # Run all tests") - print("python run_all_tests.py # Run with detailed summary") - - print("\n📖 Test Patterns:") - print("-" * 20) - print("• Fanout: One-to-many broadcasting") - print("• Direct: Exact routing key matching") - print("• Topic: Wildcard pattern matching (* and #)") - print("• Multi: Round-robin load balancing") - - print("\n⚙️ Prerequisites:") - print("-" * 20) - print("• RabbitMQ server running on localhost:5673") - print("• Python 3.7+ with asyncio support") - print("• aio-pika package installed") - - print("\n📁 File Structure:") - print("-" * 20) - print("product/ - Message producers") - print("comsumer/ - Message consumers") - print("config.py - RabbitMQ configuration") - print("run_*.py - Individual test files") - print("test.py - Combined test runner") - - print("\n🔧 Configuration:") - print("-" * 20) - print("Edit config.py to change:") - print("• RabbitMQ connection URI") 
- print("• Exchange and queue names") - print("• Message persistence settings") - - print("\n📚 Documentation:") - print("-" * 20) - print("See README_TESTS.md for detailed information") - - -if __name__ == "__main__": - show_help() diff --git a/run_all_tests.py b/run_all_tests.py deleted file mode 100644 index e9b68f0..0000000 --- a/run_all_tests.py +++ /dev/null @@ -1,113 +0,0 @@ -#!/usr/bin/env python3 -""" -Run All RabbitMQ Tests -This script runs all individual test files in sequence. -""" - -import asyncio -import sys -import os -import subprocess - -# Add current directory to Python path -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - - -def run_test_file(test_file): - """Run a test file and return success status""" - try: - print(f"\n{'='*60}") - print(f"Running {test_file}") - print(f"{'='*60}") - - result = subprocess.run([sys.executable, test_file], - capture_output=True, - text=True, - timeout=30) - - if result.returncode == 0: - print(f"✅ {test_file} completed successfully") - if result.stdout: - print("Output:") - print(result.stdout) - return True - else: - print(f"❌ {test_file} failed with return code {result.returncode}") - if result.stderr: - print("Error:") - print(result.stderr) - return False - - except subprocess.TimeoutExpired: - print(f"⏰ {test_file} timed out after 30 seconds") - return False - except Exception as e: - print(f"💥 {test_file} failed with exception: {e}") - return False - - -def main(): - """Main function to run all tests""" - print("🚀 Starting RabbitMQ Test Suite") - print("This will run all individual test files in sequence.") - - # List of test files to run - test_files = [ - "run_fanout_test.py", - "run_direct_test.py", - "run_topic_test.py", - "run_multi_queue_test.py" - ] - - # Check if test files exist - missing_files = [] - for test_file in test_files: - if not os.path.exists(test_file): - missing_files.append(test_file) - - if missing_files: - print(f"❌ Missing test files: {missing_files}") - 
sys.exit(1) - - # Run all tests - results = [] - for test_file in test_files: - success = run_test_file(test_file) - results.append((test_file, success)) - - # Wait between tests - if test_file != test_files[-1]: # Don't wait after last test - print("\n⏳ Waiting 2 seconds before next test...") - import time - time.sleep(2) - - # Print summary - print(f"\n{'='*60}") - print("TEST SUMMARY") - print(f"{'='*60}") - - passed = 0 - failed = 0 - - for test_file, success in results: - status = "✅ PASSED" if success else "❌ FAILED" - print(f"{test_file:<25} {status}") - if success: - passed += 1 - else: - failed += 1 - - print(f"\nTotal: {len(results)} tests") - print(f"Passed: {passed}") - print(f"Failed: {failed}") - - if failed == 0: - print("\n🎉 All tests passed!") - sys.exit(0) - else: - print(f"\n💥 {failed} test(s) failed!") - sys.exit(1) - - -if __name__ == "__main__": - main() diff --git a/test.py b/test.py deleted file mode 100644 index 657c33e..0000000 --- a/test.py +++ /dev/null @@ -1,165 +0,0 @@ -import asyncio -import sys -import os - -# Add current directory to Python path -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -from product.direct_multi_publish import setup_multi_queue_balance, BalancedProducer -from product.direct_publish import setup_direct_exchange, direct_publish -from product.fanout_publish import fanout_publish, setup_fanout_exchange -from product.topic_publish import setup_topic_exchange, topic_publish -from comsumer.direct_consumer import start_all_direct_consumers -from comsumer.fanout_consumer import start_all_fanout_consumers -from comsumer.topic_consumer import start_all_topic_consumers -from comsumer.direct_multi_consumer import start_balanced_consumers - - -async def run_fanout_test(): - """Run fanout exchange test with producer and consumer""" - print("=== Running Fanout Exchange Test ===") - - # Start consumer in background - consumer_task = asyncio.create_task(start_all_fanout_consumers()) - - # Wait for consumer to 
start - await asyncio.sleep(1) - - # Setup and publish messages - await setup_fanout_exchange("demo.fanout", "demo.fanout.queue-") - await fanout_publish(message="hello world", exchange_name="demo.fanout") - await fanout_publish(message="test message 2", exchange_name="demo.fanout") - await fanout_publish(message="test message 3", exchange_name="demo.fanout") - - # Wait for messages to be processed - await asyncio.sleep(3) - - # Cancel consumer - consumer_task.cancel() - print("✅ Fanout test completed successfully!") - - -async def run_direct_exchange_test(): - """Run direct exchange test with producer and consumer""" - print("=== Running Direct Exchange Test ===") - - # Start consumer in background - consumer_task = asyncio.create_task(start_all_direct_consumers()) - - # Wait for consumer to start - await asyncio.sleep(1) - - # Setup exchange and publish messages - await setup_direct_exchange() - - test_messages = [ - ("System crash, unable to start", "error"), # Route to error queue - ("Disk space insufficient", "warning"), # Route to warning queue - ("User login successful", "info"), # Route to info queue - ("Debug info: Database connection successful", "debug") # Route to info queue - ] - - for msg, routing_key in test_messages: - await direct_publish(msg, routing_key) - await asyncio.sleep(0.5) - - # Wait for messages to be processed - await asyncio.sleep(3) - - # Cancel consumer - consumer_task.cancel() - print("✅ Direct exchange test completed successfully!") - - -async def run_topic_exchange_test(): - """Run topic exchange test with producer and consumer""" - print("=== Running Topic Exchange Test ===") - - # Start consumer in background - consumer_task = asyncio.create_task(start_all_topic_consumers()) - - # Wait for consumer to start - await asyncio.sleep(1) - - # Setup exchange and publish messages - await setup_topic_exchange() - - test_messages = [ - ("Order creation failed (critical error)", "order.create.critical"), - ("User login successful", 
"user.login.success"), - ("Order payment completed", "order.pay.success"), - ("System crash (critical error)", "system.crash.critical"), - ("User login failed", "user.login.failed"), - ("Normal system log", "system.log.info") # Won't match any binding key, will be discarded - ] - - for msg, routing_key in test_messages: - await topic_publish(msg, routing_key) - await asyncio.sleep(0.5) - - # Wait for messages to be processed - await asyncio.sleep(3) - - # Cancel consumer - consumer_task.cancel() - print("✅ Topic exchange test completed successfully!") - - -async def run_multi_queue_balance_test(): - """Run multi-queue load balancing test with producer and consumer""" - print("=== Running Multi-Queue Load Balancing Test ===") - - queue_count = 3 - - # Start consumer in background - consumer_task = asyncio.create_task(start_balanced_consumers(queue_count=queue_count)) - - # Wait for consumer to start - await asyncio.sleep(1) - - # Setup and publish messages - await setup_multi_queue_balance(queue_count=queue_count) - - producer = BalancedProducer(queue_count=queue_count) - await producer.connect() - - for i in range(10): - await producer.publish(f"Task {i + 1}: Multi-queue load balancing test") - await asyncio.sleep(0.3) - - await producer.close() - - # Wait for messages to be processed - await asyncio.sleep(3) - - # Cancel consumer - consumer_task.cancel() - print("✅ Multi-queue load balancing test completed successfully!") - - -async def main(): - """Main function to run all tests""" - print("🚀 Starting RabbitMQ Tests...") - - try: - # Run all tests - await run_fanout_test() - await asyncio.sleep(1) - - await run_direct_exchange_test() - await asyncio.sleep(1) - - await run_topic_exchange_test() - await asyncio.sleep(1) - - await run_multi_queue_balance_test() - - print("\n🎉 All tests completed successfully!") - - except Exception as e: - print(f"❌ Test failed: {e}") - sys.exit(1) - - -if __name__ == "__main__": - asyncio.run(main())