Go to file
2025-09-07 11:59:09 +08:00
.idea --init 2025-09-07 10:35:24 +08:00
assert --init 2025-09-07 11:59:09 +08:00
comsumer --init 2025-09-07 10:55:19 +08:00
product --init 2025-09-07 10:55:19 +08:00
reliable_mq --init 2025-09-07 10:55:19 +08:00
config.py --init 2025-09-07 10:35:24 +08:00
README.md --init 2025-09-07 11:59:09 +08:00
run_direct_test.py --init 2025-09-07 10:55:19 +08:00
run_fanout_test.py --init 2025-09-07 10:55:19 +08:00
run_multi_queue_test.py --init 2025-09-07 10:55:19 +08:00
run_reliable_messaging.py --init 2025-09-07 10:35:24 +08:00
run_topic_test.py --init 2025-09-07 10:55:19 +08:00

RabbitMQ 培训文档


1. MQ 的基本概念

1.1 什么是MQ

MQ全称为Message Queue即消息队列

  • "消息队列" 是在消息的传输过程中保存消息的容器
  • 它是典型的生产者——消费者模型:生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息
  • 这样的好处:生产者只需要关注发消息,消费者只需要关注收消息,二者没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦

1.2 为什么要使用 MQ?

消息队列Message Queue是一种在分布式系统中用于异步通信的中间件核心作用是通过存储-转发机制实现消息的异步传递,从而解决系统间耦合、流量削峰、异步处理等问题。

主要作用:

1. 解耦系统组件

  • 传统系统中,组件间通常直接调用(如 A 服务直接调用 B 服务),导致强耦合
  • 引入消息队列后A 服务只需向队列发送消息无需关心谁接收或处理B 服务从队列中获取消息即可
  • 双方通过消息格式约定通信,互不依赖,降低了系统耦合度

2. 异步处理,提高效率

  • 同步处理场景中,一个操作可能需要等待多个服务依次完成,总耗时是各步骤之和
  • 消息队列支持异步处理:主流程完成后,只需发送消息到队列,无需等待后续步骤完成即可返回结果
  • 其他服务异步从队列获取消息并处理,显著提升系统响应速度和吞吐量

3. 流量削峰,保护系统

  • 突发流量(如电商秒杀、直播带货)可能瞬间压垮后端服务
  • 消息队列可作为"缓冲池":高峰期请求先进入队列,后端服务按自身处理能力从队列中消费消息

4. 数据同步与分发

  • 同一消息可被多个消费者消费,实现"一次发送,多端处理"
  • 跨系统数据同步,通过消息队列确保数据一致性

5. 重试与容错

  • 若消费者服务临时故障,消息队列会保留消息,待服务恢复后重新投递
  • 配合重试机制,可解决网络波动、服务暂时不可用等问题

典型应用场景:

  • 电商下单:订单创建 → 消息队列 → 库存扣减、支付处理、物流通知等
  • 日志收集:各服务日志发送到队列,由日志系统统一消费、存储、分析
  • 分布式事务:通过消息队列实现最终一致性
  • 延迟任务:如订单超时未支付自动取消

2. RabbitMQ

2.1 介绍

RabbitMQ 是一个开源的消息代理软件(也可称为消息队列中间件),由 Erlang 语言编写,基于 AMQP高级消息队列协议 实现,在分布式系统中用于实现应用程序之间的异步通信和解耦。

特点:

  • 多种协议支持:除了 AMQPRabbitMQ 还支持 STOMP、MQTT 等多种消息协议
  • 高可靠性:通过消息持久化、集群、镜像队列等机制,保证消息不丢失
  • 灵活的路由机制:交换器提供了丰富的消息路由规则
  • 多语言客户端支持:提供了多种编程语言的客户端库
  • 管理界面友好:具备一个可视化的管理界面

2.2 核心组件

生产者 → 信道 → 交换器(Exchange) → 队列(Queue) → 信道 → 消费者

核心组件详解:

1. 生产者Producer

  • 定义:发送消息的应用程序或服务
  • 作用:将业务数据封装为消息,发送到 RabbitMQ 服务器
  • 特点:无需关心消息的最终接收者,只需指定消息发送到哪个交换器

2. 消费者Consumer

  • 定义:接收并处理消息的应用程序或服务
  • 作用:持续监听队列,当有消息到达时,从队列中获取消息并进行业务处理
  • 特点:消费者与队列绑定,可通过自动确认或手动确认机制告知 RabbitMQ 消息是否处理完成

3. 队列Queue

  • 定义:存储消息的容器,是消息的最终落脚点
  • 核心属性:
    • 名称:队列的唯一标识
    • 持久化Durable若为 true队列会在 RabbitMQ 重启后保留
    • 排他性Exclusive若为 true队列仅对当前连接可见
    • 自动删除Auto-delete若为 true当最后一个消费者断开连接后队列自动删除

4. 交换器Exchange

  • 定义:接收生产者发送的消息,并根据路由规则将消息转发到一个或多个队列
  • 作用:类似于"路由器",负责消息的路由逻辑
  • 类型:
    • 直连交换器Direct Exchange:消息的 Routing Key 与队列绑定的 Binding Key 完全匹配
    • 主题交换器Topic Exchange:支持通配符(* 匹配单个单词,# 匹配多个单词)
    • 扇出交换器Fanout Exchange:忽略 Routing Key将消息广播到所有绑定的队列
    • 头交换器Headers Exchange根据消息属性Headers而非 Routing Key 匹配

5. 绑定Binding

  • 定义:交换器与队列之间的关联关系,包含路由规则
  • 作用:告诉交换器"哪些队列需要接收什么样的消息"

6. 连接Connection

  • 定义:生产者/消费者与 RabbitMQ 服务器之间的 TCP 连接
  • 特点:建立 TCP 连接开销较大,因此通常会复用连接

7. 信道Channel

  • 定义:建立在 TCP 连接之上的虚拟连接,是消息传递的实际通道
  • 作用:减少 TCP 连接数量,降低服务器资源消耗

2.3 安装RabbitMQ

带Management页面

docker run -d --name rabbitmq -p 5673:5672 -p 15673:15672 rabbitmq:management

不带Management页面

docker run -d --name rabbitmq -p 5673:5672 -p 15673:15672 rabbitmq

访问管理页面http://localhost:15673

  • 账号guest
  • 密码guest

2.4 RabbitMQ管理页面

管理页面提供了丰富的监控和管理功能:

  • Overview:服务器概览,包括连接数、队列数、消息速率等
  • Connections:显示所有客户端连接信息
  • Channels:显示各个连接的信道信息
  • Exchanges:管理交换器
  • Queues:管理队列
  • Admin:用户和权限管理

3. Python集成

基于python aio-pika库进行集成

3.1 Fanout Exchange Demo

核心特点:

  • 忽略路由键Routing Key无论设置什么值都会被忽略
  • 消息会广播到所有与之绑定的队列
  • 适合需要 "一对多" 通知的场景(如系统通知、日志广播)

架构关系图:

img_1.png

graph TD
    P[Producer<br/>fanout_publish] --> E[Fanout Exchange<br/>demo.fanout]
    E --> Q1[Queue<br/>demo.fanout.queue-0]
    E --> Q2[Queue<br/>demo.fanout.queue-1]
    E --> Q3[Queue<br/>demo.fanout.queue-2]
    Q1 --> C1[Consumer 1<br/>fanout_consumer_1]
    Q2 --> C2[Consumer 2<br/>fanout_consumer_2]
    Q3 --> C3[Consumer 3<br/>fanout_consumer_3]
    
    style P fill:#e1f5fe
    style E fill:#f3e5f5
    style Q1 fill:#e8f5e8
    style Q2 fill:#e8f5e8
    style Q3 fill:#e8f5e8
    style C1 fill:#fff3e0
    style C2 fill:#fff3e0
    style C3 fill:#fff3e0

生产者代码:

import asyncio
import aio_pika
from config import RABBITMQ_URI

async def setup_fanout_exchange(exchange_name="demo.fanout", queue_name_prefix="demo.fanout.queue-"):
    # 建立连接
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()

    # 1. 声明 Fanout 类型交换器
    fanout_exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.FANOUT,
        durable=True  # 交换器持久化
    )

    # 2. 定义需要绑定的队列名称列表
    queue_names = [queue_name_prefix + str(i) for i in range(3)]

    # 3. 循环创建队列并绑定到交换器
    for name in queue_names:
        queue = await channel.declare_queue(
            name,
            durable=True,
            auto_delete=False
        )
        # 绑定队列到 Fanout 交换器(忽略路由键)
        await queue.bind(fanout_exchange, routing_key="")

async def fanout_publish(message: str = "", exchange_name: str = "demo.fanout"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()

    fanout_exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.FANOUT,
        durable=True
    )

    message = aio_pika.Message(
        body=message.encode("utf-8"),
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT  # 消息持久化
    )

    # 发送消息到Fanout交换器
    await fanout_exchange.publish(message, routing_key="")
    await connection.close()

消费者代码:

import asyncio
import aio_pika
from config import RABBITMQ_URI

async def fanout_consumer(queue_name: str, consumer_id: int):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    queue = await channel.declare_queue(
        queue_name,
        durable=True,
        auto_delete=False
    )

    async def on_message_received(message: aio_pika.IncomingMessage):
        async with message.process():
            message_content = message.body.decode("utf-8")
            print(f"[Fanout Consumer {consumer_id}] Received broadcast message:")
            print(f"  Listening queue: {queue_name}")
            print(f"  Message content: {message_content}")
            print(f"  Message persistence: {'Yes' if message.delivery_mode == 2 else 'No'}")
            await asyncio.sleep(1)

    consumer_tag = f"fanout_consumer_{consumer_id}_{queue_name}"
    await queue.consume(on_message_received, consumer_tag=consumer_tag)
    print(f"[Fanout Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})")
    await asyncio.Future()

完整测试代码

测试运行脚本:

#!/usr/bin/env python3
"""
Run Fanout Exchange Test
"""

import asyncio
import sys
import os

# Add current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from product.fanout_publish import fanout_publish, setup_fanout_exchange
from comsumer.fanout_consumer import start_all_fanout_consumers


async def run_fanout_test():
    """Run fanout exchange test with producer and consumer"""
    print("=== Running Fanout Exchange Test ===")
    
    # Start consumer in background
    consumer_task = asyncio.create_task(start_all_fanout_consumers())
    
    # Wait for consumer to start
    await asyncio.sleep(1)
    
    # Setup and publish messages
    await setup_fanout_exchange("demo.fanout", "demo.fanout.queue-")
    await fanout_publish(message="hello world", exchange_name="demo.fanout")
    await fanout_publish(message="test message 2", exchange_name="demo.fanout")
    await fanout_publish(message="test message 3", exchange_name="demo.fanout")
    
    # Wait for messages to be processed
    await asyncio.sleep(3)
    
    # Cancel consumer
    consumer_task.cancel()
    print("✅ Fanout test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_fanout_test())

测试输出:

=== Running Fanout Exchange Test ===
[Fanout Consumer 2] Started, listening to queue: demo.fanout.queue-1 (tag: fanout_consumer_2_demo.fanout.queue-1)
[Fanout Consumer 1] Started, listening to queue: demo.fanout.queue-0 (tag: fanout_consumer_1_demo.fanout.queue-0)
[Fanout Consumer 3] Started, listening to queue: demo.fanout.queue-2 (tag: fanout_consumer_3_demo.fanout.queue-2)

[Fanout Consumer 2] Received broadcast message:
  Listening queue: demo.fanout.queue-1
  Message content: hello world
  Message persistence: Yes
[Fanout Consumer 1] Received broadcast message:
  Listening queue: demo.fanout.queue-0
  Message content: hello world
  Message persistence: Yes
[Fanout Consumer 3] Received broadcast message:
  Listening queue: demo.fanout.queue-2
  Message content: hello world
  Message persistence: Yes
[Fanout Consumer 3] Received broadcast message:
  Listening queue: demo.fanout.queue-2
  Message content: test message 2
  Message persistence: Yes
[Fanout Consumer 1] Received broadcast message:
  Listening queue: demo.fanout.queue-0
  Message content: test message 2
  Message persistence: Yes
[Fanout Consumer 2] Received broadcast message:
  Listening queue: demo.fanout.queue-1
  Message content: test message 2
  Message persistence: Yes
[Fanout Consumer 1] Received broadcast message:
  Listening queue: demo.fanout.queue-0
  Message content: test message 3
  Message persistence: Yes
[Fanout Consumer 2] Received broadcast message:
  Listening queue: demo.fanout.queue-1
  Message content: test message 3
  Message persistence: Yes
[Fanout Consumer 3] Received broadcast message:
  Listening queue: demo.fanout.queue-2
  Message content: test message 3
  Message persistence: Yes
✅ Fanout test completed successfully!

3.2 Direct Exchange Demo

核心特点:

  • 基于路由键routing key与绑定键binding key的精确匹配
  • 只有当消息的路由键与队列的绑定键完全一致时,消息才会被路由到该队列
  • 适合需要精准路由的场景如日志级别区分error、warning、info 分别路由到不同队列)

架构关系图:

graph TD
    P[Producer<br/>direct_publish] --> E[Direct Exchange<br/>demo.direct]
    E -->|routing_key: error| Q1[Queue<br/>demo.direct.queue-error]
    E -->|routing_key: warning| Q2[Queue<br/>demo.direct.queue-warning]
    E -->|routing_key: info| Q3[Queue<br/>demo.direct.queue-info]
    E -->|routing_key: debug| Q3
    Q1 --> C1[Error Level Consumer<br/>direct_error_level]
    Q2 --> C2[Warning Level Consumer<br/>direct_warning_level]
    Q3 --> C3[Info/Debug Level Consumer<br/>direct_info/debug_level]
    
    style P fill:#e1f5fe
    style E fill:#f3e5f5
    style Q1 fill:#ffebee
    style Q2 fill:#fff3e0
    style Q3 fill:#e8f5e8
    style C1 fill:#ffebee
    style C2 fill:#fff3e0
    style C3 fill:#e8f5e8

生产者代码:

async def setup_direct_exchange(exchange_name="demo.direct", queue_prefix="demo.direct.queue-"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()

    direct_exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.DIRECT,
        durable=True
    )

    # 定义队列及对应的绑定键
    queue_bindings = [
        (f"{queue_prefix}error", ["error"]),      # 处理错误级别的消息
        (f"{queue_prefix}warning", ["warning"]),  # 处理警告级别的消息
        (f"{queue_prefix}info", ["info", "debug"]) # 处理信息和调试级别的消息
    ]

    for queue_name, binding_keys in queue_bindings:
        queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)
        for binding_key in binding_keys:
            await queue.bind(direct_exchange, routing_key=binding_key)
        print(f"Queue {queue_name} bound to routing keys: {binding_keys}")

async def direct_publish(message: str, routing_key: str, exchange_name: str = "demo.direct"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()

    exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.DIRECT,
        durable=True
    )

    message_obj = aio_pika.Message(
        body=message.encode("utf-8"),
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT
    )

    await exchange.publish(message_obj, routing_key=routing_key)
    print(f"Message sent: {message} (routing key: {routing_key})")

消费者代码:

async def direct_consumer(queue_name: str, consumer_label: str):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)

    async def on_message_received(message: aio_pika.IncomingMessage):
        async with message.process():
            message_content = message.body.decode("utf-8")
            print(f"[{consumer_label} Consumer] Received message:")
            print(f"  Queue name: {queue_name}")
            print(f"  Message content: {message_content}")
            print(f"  Message routing key: {message.routing_key}")
            print(f"  Processing time: {asyncio.get_running_loop().time():.2f}s")

            # 模拟不同级别消息的处理耗时
            if "error" in queue_name:
                await asyncio.sleep(2)
            elif "warning" in queue_name:
                await asyncio.sleep(1)
            elif "info" in queue_name:
                await asyncio.sleep(0.5)

    consumer_tag = f"direct_{consumer_label.lower().replace(' ', '_')}_{queue_name}"
    await queue.consume(on_message_received, consumer_tag=consumer_tag)
    print(f"[{consumer_label} Consumer] Started, listening to queue: {queue_name} (tag: {consumer_tag})")
    await asyncio.Future()

完整测试代码

测试运行脚本:

#!/usr/bin/env python3
"""
Run Direct Exchange Test
"""

import asyncio
import sys
import os

# Add current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from product.direct_publish import setup_direct_exchange, direct_publish
from comsumer.direct_consumer import start_all_direct_consumers


async def run_direct_exchange_test():
    """Run direct exchange test with producer and consumer"""
    print("=== Running Direct Exchange Test ===")
    
    # Start consumer in background
    consumer_task = asyncio.create_task(start_all_direct_consumers())
    
    # Wait for consumer to start
    await asyncio.sleep(1)
    
    # Setup exchange and publish messages
    await setup_direct_exchange()
    
    test_messages = [
        ("System crash, unable to start", "error"),  # Route to error queue
        ("Disk space insufficient", "warning"),  # Route to warning queue
        ("User login successful", "info"),  # Route to info queue
        ("Debug info: Database connection successful", "debug")  # Route to info queue
    ]

    for msg, routing_key in test_messages:
        await direct_publish(msg, routing_key)
        await asyncio.sleep(0.5)
    
    # Wait for messages to be processed
    await asyncio.sleep(3)
    
    # Cancel consumer
    consumer_task.cancel()
    print("✅ Direct exchange test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_direct_exchange_test())

测试输出:

=== Running Direct Exchange Test ===
[Info/Debug Level Consumer] Started, listening to queue: demo.direct.queue-info (tag: direct_info/debug_level_demo.direct.queue-info)
[Warning Level Consumer] Started, listening to queue: demo.direct.queue-warning (tag: direct_warning_level_demo.direct.queue-warning)
[Error Level Consumer] Started, listening to queue: demo.direct.queue-error (tag: direct_error_level_demo.direct.queue-error)

Queue demo.direct.queue-error bound to routing keys: ['error']
Queue demo.direct.queue-warning bound to routing keys: ['warning']
Queue demo.direct.queue-info bound to routing keys: ['info', 'debug']

[Error Level Consumer] Received message:
  Queue name: demo.direct.queue-error
  Message content: System crash, unable to start
  Message routing key: error
  Processing time: 322774.03s

Message sent: System crash, unable to start (routing key: error)
[Warning Level Consumer] Received message:
  Queue name: demo.direct.queue-warning
  Message content: Disk space insufficient
  Message routing key: warning
  Processing time: 322774.54s

Message sent: Disk space insufficient (routing key: warning)
[Info/Debug Level Consumer] Received message:
  Queue name: demo.direct.queue-info
  Message content: User login successful
  Message routing key: info
  Processing time: 322775.06s

Message sent: User login successful (routing key: info)
[Info/Debug Level Consumer] Received message:
  Queue name: demo.direct.queue-info
  Message content: Debug info: Database connection successful
  Message routing key: debug
  Processing time: 322775.57s

Message sent: Debug info: Database connection successful (routing key: debug)
✅ Direct exchange test completed successfully!

3.3 Direct Exchange Demo (负载均衡)

实现原理:

  1. 创建多个队列:每个队列绑定到同一个 Direct Exchange但使用不同的路由键
  2. 生产者路由策略:通过轮询、随机或按消息特征哈希的方式,选择一个路由键
  3. 消费者处理:每个队列对应一个或多个消费者,各自处理分配到的消息

架构关系图:

graph TD
    P[BalancedProducer<br/>轮询发送] --> E[Direct Exchange<br/>demo.direct.multi.queue]
    E -->|routing_key: route.1| Q1[Queue<br/>task.queue.1]
    E -->|routing_key: route.2| Q2[Queue<br/>task.queue.2]
    E -->|routing_key: route.3| Q3[Queue<br/>task.queue.3]
    Q1 --> C1[Consumer 1<br/>multi_consumer_1]
    Q2 --> C2[Consumer 2<br/>multi_consumer_2]
    Q3 --> C3[Consumer 3<br/>multi_consumer_3]
    
    P -.->|轮询算法| P
    
    style P fill:#e1f5fe
    style E fill:#f3e5f5
    style Q1 fill:#e8f5e8
    style Q2 fill:#e8f5e8
    style Q3 fill:#e8f5e8
    style C1 fill:#fff3e0
    style C2 fill:#fff3e0
    style C3 fill:#fff3e0

生产者代码:

class BalancedProducer:
    def __init__(self, exchange_name="demo.direct.multi.queue", queue_count=3):
        self.exchange_name = exchange_name
        self.queue_count = queue_count
        self.current_index = 0  # 轮询索引

    async def connect(self):
        self.connection = await aio_pika.connect_robust(RABBITMQ_URI)
        self.channel = await self.connection.channel()
        self.exchange = await self.channel.declare_exchange(
            self.exchange_name,
            aio_pika.ExchangeType.DIRECT,
            durable=True
        )

    async def publish(self, message: str):
        # 轮询算法:每次发送后切换到下一个路由键
        self.current_index = (self.current_index + 1) % self.queue_count
        route_key = f"route.{self.current_index + 1}"

        message_obj = aio_pika.Message(
            body=message.encode("utf-8"),
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT
        )

        await self.exchange.publish(message_obj, routing_key=route_key)
        print(f"Message sent: {message} (routed to {route_key})")

消费者代码:

async def queue_consumer(queue_name: str, consumer_id: int):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)

    async def on_message(message: aio_pika.IncomingMessage):
        async with message.process():
            content = message.body.decode("utf-8")
            print(f"[Consumer {consumer_id}] Processing message: {content}")
            print(f"[Consumer {consumer_id}] From queue: {queue_name}")
            print(f"[Consumer {consumer_id}] Routing key: {message.routing_key}")
            await asyncio.sleep(1)

    consumer_tag = f"multi_consumer_{consumer_id}_{queue_name}"
    await queue.consume(on_message, consumer_tag=consumer_tag)
    print(f"[Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})")
    await asyncio.Future()

完整测试代码

测试运行脚本:

#!/usr/bin/env python3
"""
Run Multi-Queue Load Balancing Test
"""

import asyncio
import sys
import os

# Add current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from product.direct_multi_publish import setup_multi_queue_balance, BalancedProducer
from comsumer.direct_multi_consumer import start_balanced_consumers


async def run_multi_queue_balance_test():
    """Run multi-queue load balancing test with producer and consumer"""
    print("=== Running Multi-Queue Load Balancing Test ===")
    
    queue_count = 3
    
    # Start consumer in background
    consumer_task = asyncio.create_task(start_balanced_consumers(queue_count=queue_count))
    
    # Wait for consumer to start
    await asyncio.sleep(1)
    
    # Setup and publish messages
    await setup_multi_queue_balance(queue_count=queue_count)
    
    producer = BalancedProducer(queue_count=queue_count)
    await producer.connect()
    
    for i in range(10):
        await producer.publish(f"Task {i + 1}: Multi-queue load balancing test")
        await asyncio.sleep(0.3)
    
    await producer.close()
    
    # Wait for messages to be processed
    await asyncio.sleep(3)
    
    # Cancel consumer
    consumer_task.cancel()
    print("✅ Multi-queue load balancing test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_multi_queue_balance_test())

测试输出:

=== Running Multi-Queue Load Balancing Test ===
[Consumer 2] Started, listening to queue: task.queue.2 (tag: multi_consumer_2_task.queue.2)
[Consumer 3] Started, listening to queue: task.queue.3 (tag: multi_consumer_3_task.queue.3)
[Consumer 1] Started, listening to queue: task.queue.1 (tag: multi_consumer_1_task.queue.1)

Queue task.queue.1 bound to routing key: route.1
Queue task.queue.2 bound to routing key: route.2
Queue task.queue.3 bound to routing key: route.3

[Consumer 2] Processing message: Task 1: Multi-queue load balancing test
[Consumer 2] From queue: task.queue.2
[Consumer 2] Routing key: route.2

Message sent: Task 1: Multi-queue load balancing test (routed to route.2)
[Consumer 3] Processing message: Task 2: Multi-queue load balancing test
[Consumer 3] From queue: task.queue.3
[Consumer 3] Routing key: route.3

Message sent: Task 2: Multi-queue load balancing test (routed to route.3)
[Consumer 1] Processing message: Task 3: Multi-queue load balancing test
[Consumer 1] From queue: task.queue.1
[Consumer 1] Routing key: route.1

Message sent: Task 3: Multi-queue load balancing test (routed to route.1)
Message sent: Task 4: Multi-queue load balancing test (routed to route.2)
[Consumer 2] Processing message: Task 4: Multi-queue load balancing test
[Consumer 2] From queue: task.queue.2
[Consumer 2] Routing key: route.2

Message sent: Task 5: Multi-queue load balancing test (routed to route.3)
[Consumer 3] Processing message: Task 5: Multi-queue load balancing test
[Consumer 3] From queue: task.queue.3
[Consumer 3] Routing key: route.3

Message sent: Task 6: Multi-queue load balancing test (routed to route.1)
[Consumer 1] Processing message: Task 6: Multi-queue load balancing test
[Consumer 1] From queue: task.queue.1
[Consumer 1] Routing key: route.1

Message sent: Task 7: Multi-queue load balancing test (routed to route.2)
[Consumer 2] Processing message: Task 7: Multi-queue load balancing test
[Consumer 2] From queue: task.queue.2
[Consumer 2] Routing key: route.2

Message sent: Task 8: Multi-queue load balancing test (routed to route.3)
[Consumer 3] Processing message: Task 8: Multi-queue load balancing test
[Consumer 3] From queue: task.queue.3
[Consumer 3] Routing key: route.3

Message sent: Task 9: Multi-queue load balancing test (routed to route.1)
[Consumer 1] Processing message: Task 9: Multi-queue load balancing test
[Consumer 1] From queue: task.queue.1
[Consumer 1] Routing key: route.1

Message sent: Task 10: Multi-queue load balancing test (routed to route.2)
[Consumer 2] Processing message: Task 10: Multi-queue load balancing test
[Consumer 2] From queue: task.queue.2
[Consumer 2] Routing key: route.2

✅ Multi-queue load balancing test completed successfully!

3.4 Topic Exchange Demo

核心特性:

  • 路由键采用层级化字符串(用 . 分隔,如 order.create.user
  • 支持两种通配符:
    • *匹配1 个层级(如 user.* 可匹配 user.login 但不匹配 user.login.success
    • #匹配0 个或多个层级(如 order.# 可匹配 order、order.pay、order.pay.success

架构关系图:

graph TD
    P[Producer<br/>topic_publish] --> E[Topic Exchange<br/>demo.topic]
    E -->|binding: #.critical| Q1[Queue<br/>demo.topic.queue-critical]
    E -->|binding: order.#| Q2[Queue<br/>demo.topic.queue-order]
    E -->|binding: user.login.*| Q3[Queue<br/>demo.topic.queue-user.login]
    Q1 --> C1[CriticalHandler<br/>topic_CriticalHandler]
    Q2 --> C2[OrderHandler<br/>topic_OrderHandler]
    Q3 --> C3[UserLoginHandler<br/>topic_UserLoginHandler]
    
    subgraph "路由键示例"
        R1[order.create.critical] -.->|匹配 #.critical 和 order.#| Q1
        R1 -.-> Q2
        R2[user.login.success] -.->|匹配 user.login.*| Q3
        R3[system.log.info] -.->|无匹配| X[消息丢弃]
    end
    
    style P fill:#e1f5fe
    style E fill:#f3e5f5
    style Q1 fill:#ffebee
    style Q2 fill:#e8f5e8
    style Q3 fill:#fff3e0
    style C1 fill:#ffebee
    style C2 fill:#e8f5e8
    style C3 fill:#fff3e0
    style X fill:#f5f5f5

生产者代码:

async def setup_topic_exchange(exchange_name="demo.topic", queue_prefix="demo.topic.queue-"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()

    topic_exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.TOPIC,
        durable=True
    )

    # 定义队列及对应的绑定键(支持通配符)
    queue_bindings = [
        (f"{queue_prefix}critical", ["#.critical"]),      # 匹配任意前缀+critical
        (f"{queue_prefix}order", ["order.#"]),            # 匹配所有order开头的路由键
        (f"{queue_prefix}user.login", ["user.login.*"])   # 匹配user.login+1个后缀
    ]

    for queue_name, binding_keys in queue_bindings:
        queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)
        for binding_key in binding_keys:
            await queue.bind(topic_exchange, routing_key=binding_key)
        print(f"Queue {queue_name} bound to routing keys: {binding_keys}")

async def topic_publish(message: str, routing_key: str, exchange_name: str = "demo.topic"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()

    exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.TOPIC,
        durable=True
    )

    message_obj = aio_pika.Message(
        body=message.encode("utf-8"),
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT
    )

    await exchange.publish(message_obj, routing_key=routing_key)
    print(f"Message sent: {message} (routing key: {routing_key})")

消费者代码:

async def topic_consumer(queue_name: str, consumer_id: str):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)

    async def on_message(message: aio_pika.IncomingMessage):
        async with message.process():
            message_content = message.body.decode("utf-8")
            print(f"[Consumer {consumer_id}] Received message: {message_content}")
            print(f"[Consumer {consumer_id}] Message routing key: {message.routing_key}")
            print(f"[Consumer {consumer_id}] From queue: {queue_name}")
            await asyncio.sleep(1)

    consumer_tag = f"topic_{consumer_id}_{queue_name}"
    await queue.consume(on_message, consumer_tag=consumer_tag)
    print(f"[Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})")
    await asyncio.Future()

完整测试代码

测试运行脚本:

#!/usr/bin/env python3
"""
Run Topic Exchange Test
"""

import asyncio
import sys
import os

# Add current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from product.topic_publish import setup_topic_exchange, topic_publish
from comsumer.topic_consumer import start_all_topic_consumers


async def run_topic_exchange_test():
    """Run topic exchange test with producer and consumer"""
    print("=== Running Topic Exchange Test ===")
    
    # Start consumer in background
    consumer_task = asyncio.create_task(start_all_topic_consumers())
    
    # Wait for consumer to start
    await asyncio.sleep(1)
    
    # Setup exchange and publish messages
    await setup_topic_exchange()
    
    test_messages = [
        ("Order creation failed (critical error)", "order.create.critical"),
        ("User login successful", "user.login.success"),
        ("Order payment completed", "order.pay.success"),
        ("System crash (critical error)", "system.crash.critical"),
        ("User login failed", "user.login.failed"),
        ("Normal system log", "system.log.info")  # Won't match any binding key, will be discarded
    ]

    for msg, routing_key in test_messages:
        await topic_publish(msg, routing_key)
        await asyncio.sleep(0.5)
    
    # Wait for messages to be processed
    await asyncio.sleep(3)
    
    # Cancel consumer
    consumer_task.cancel()
    print("✅ Topic exchange test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_topic_exchange_test())

测试输出:

=== Running Topic Exchange Test ===
[Consumer CriticalHandler] Started, listening to queue: demo.topic.queue-critical (tag: topic_CriticalHandler_demo.topic.queue-critical)
[Consumer UserLoginHandler] Started, listening to queue: demo.topic.queue-user.login (tag: topic_UserLoginHandler_demo.topic.queue-user.login)
[Consumer OrderHandler] Started, listening to queue: demo.topic.queue-order (tag: topic_OrderHandler_demo.topic.queue-order)

Queue demo.topic.queue-critical bound to routing keys: ['#.critical']
Queue demo.topic.queue-order bound to routing keys: ['order.#']
Queue demo.topic.queue-user.login bound to routing keys: ['user.login.*']

[Consumer OrderHandler] Received message: Order creation failed (critical error)
[Consumer OrderHandler] Message routing key: order.create.critical
[Consumer OrderHandler] From queue: demo.topic.queue-order

[Consumer CriticalHandler] Received message: Order creation failed (critical error)
[Consumer CriticalHandler] Message routing key: order.create.critical
[Consumer CriticalHandler] From queue: demo.topic.queue-critical

Message sent: Order creation failed (critical error) (routing key: order.create.critical)
[Consumer UserLoginHandler] Received message: User login successful
[Consumer UserLoginHandler] Message routing key: user.login.success
[Consumer UserLoginHandler] From queue: demo.topic.queue-user.login

Message sent: User login successful (routing key: user.login.success)
[Consumer OrderHandler] Received message: Order payment completed
[Consumer OrderHandler] Message routing key: order.pay.success
[Consumer OrderHandler] From queue: demo.topic.queue-order

Message sent: Order payment completed (routing key: order.pay.success)
[Consumer CriticalHandler] Received message: System crash (critical error)
[Consumer CriticalHandler] Message routing key: system.crash.critical
[Consumer CriticalHandler] From queue: demo.topic.queue-critical

Message sent: System crash (critical error) (routing key: system.crash.critical)
[Consumer UserLoginHandler] Received message: User login failed
[Consumer UserLoginHandler] Message routing key: user.login.failed
[Consumer UserLoginHandler] From queue: demo.topic.queue-user.login

Message sent: User login failed (routing key: user.login.failed)
Message sent: Normal system log (routing key: system.log.info)
✅ Topic exchange test completed successfully!

3.5 可靠的RabbitMQ生产者消费者

可靠性保证机制

要确保消息消费的可靠性,需要从以下几个方面入手:

1. 消息持久化

  • 交换器持久化:durable=True
  • 队列持久化:durable=True
  • 消息持久化:delivery_mode=PERSISTENT

2. 消息确认机制

  • 自动确认:async with message.process()
  • 手动确认:message.ack() / message.nack()
  • 确保消息处理完成后才确认

3. 消息幂等性

  • 使用消息ID去重
  • 内存中记录已处理的消息ID
  • 防止重复处理相同消息

4. 重试机制

  • 可配置最大重试次数
  • 消费者内部重试,避免消息重新入队
  • 指数退避重试策略

5. 死信队列

  • 处理失败消息的完整解决方案
  • 自动创建死信交换器和队列
  • 详细的错误信息记录

关键代码实现

可靠生产者代码:

class ReliableProducer:
    """Reliable Message Producer"""
    
    def _generate_message_id(self, message_data: Dict[str, Any]) -> str:
        """Generate message ID for message"""
        message_type = message_data.get('type', '')
        content = message_data.get('content', '')
        
        # For duplicate_test type messages, generate fixed ID based on content
        if message_type == 'duplicate_test':
            import hashlib
            content_hash = hashlib.md5(content.encode('utf-8')).hexdigest()
            return f"duplicate_{content_hash[:8]}"
        else:
            return f"msg_{asyncio.get_running_loop().time()}"
    
    async def publish_reliable_message(self, message_data: Dict[str, Any]) -> bool:
        """Publish reliable message"""
        try:
            # Generate message ID
            message_id = self._generate_message_id(message_data)
            
            # Add message metadata
            message_data.update({
                'timestamp': datetime.now().isoformat(),
                'message_id': message_id
            })

            # Create persistent message
            message = aio_pika.Message(
                body=json.dumps(message_data, ensure_ascii=False).encode('utf-8'),
                delivery_mode=aio_pika.DeliveryMode.PERSISTENT,  # Message persistence
                message_id=message_id,
                timestamp=datetime.now()
            )

            # Send message and wait for confirmation
            await self.exchange.publish(message, routing_key="reliable")
            logger.info(f"[Producer] Message sent: {message_id}")
            return True
        except Exception as e:
            logger.error(f"[Producer] Failed to send message: {e}")
            return False

可靠消费者代码:

class ReliableConsumer:
    """Reliable Message Consumer"""
    
    def __init__(self, queue_name: Optional[str] = None, consumer_name: Optional[str] = None):
        self.processed_messages: Set[str] = set()  # Store processed message IDs
    
    async def process_message(self, message: aio_pika.IncomingMessage):
        """Core message processing logic"""
        try:
            # Parse message
            message_data = json.loads(message.body.decode('utf-8'))
            message_id = message_data.get('message_id')

            # Check if message has been processed before (idempotency check)
            if message_id in self.processed_messages:
                logger.warning(f"[Consumer-{self.consumer_name}] 🚫 Duplicate message detected, skipping: {message_id}")
                await message.ack()
                return

            logger.info(f"[Consumer-{self.consumer_name}] Starting to process message: {message_id}")

            # Retry processing message directly
            success = await self.retry_process_message(message_data, message_id, 0)

            # Only record processed message ID after successful processing
            if success:
                self.processed_messages.add(message_id)
                await message.ack()
                logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed and acknowledged")
            else:
                # Processing failed, send to dead letter queue
                await self.send_to_dead_letter_queue(message, message_id, "Processing failed")
                await message.ack()
        except Exception as e:
            logger.error(f"[Consumer-{self.consumer_name}] Failed to process message: {e}")
            await message.ack()
    
    async def retry_process_message(self, message_data: Dict[str, Any], message_id: str, retry_count: int) -> bool:
        """Retry processing message directly"""
        max_retries = config.max_retries
        last_error = None
        
        for attempt in range(max_retries + 1):
            try:
                logger.info(f"[Consumer-{self.consumer_name}] Attempting to process message {message_id}, attempt {attempt + 1}")
                await self.message_handler(message_data)
                logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed successfully")
                return True
            except Exception as e:
                last_error = str(e)
                logger.warning(f"[Consumer-{self.consumer_name}] Message {message_id} failed on attempt {attempt + 1}: {e}")
                if attempt < max_retries:
                    await asyncio.sleep(1)
                else:
                    logger.error(f"[Consumer-{self.consumer_name}] Message {message_id} failed after {max_retries} retries: {last_error}")
                    return False

死信队列消费者代码:

class DeadLetterConsumer:
    """Dead Letter Queue Consumer"""
    
    async def process_dead_letter_message(self, message: aio_pika.IncomingMessage):
        """Process dead letter message"""
        try:
            # Parse dead letter message
            dead_letter_data = json.loads(message.body.decode('utf-8'))
            original_message = dead_letter_data.get('original_message', {})
            error_info = dead_letter_data.get('error_info', 'Unknown')
            message_id = dead_letter_data.get('message_id', 'Unknown')
            
            # Print dead letter message information
            logger.error("=" * 50)
            logger.error("[Dead Letter Consumer] Received Dead Letter Message:")
            logger.error(f"[Dead Letter Consumer] Message ID: {message_id}")
            logger.error(f"[Dead Letter Consumer] Message Content: {json.dumps(original_message, ensure_ascii=False, indent=2)}")
            logger.error(f"[Dead Letter Consumer] Error Reason: {error_info}")
            logger.error("=" * 50)
            
            # Save to database
            await self.save_to_database(original_message, error_info, message_id)
            
            # Acknowledge dead letter message
            await message.ack()
            logger.info(f"[Dead Letter Consumer] Dead letter message {message_id} processed")
        except Exception as e:
            logger.error(f"[Dead Letter Consumer] Failed to process dead letter message: {e}")
            await message.nack(requeue=False)

完整测试代码

测试运行脚本:

"""
RabbitMQ 可靠消息传递测试模块
"""

import asyncio
import logging

from reliable_mq import ReliableProducer, ReliableConsumer
from reliable_mq.dead_letter_consumer import DeadLetterConsumer
from reliable_mq.config import config

# 配置日志
logging.basicConfig(
    level=getattr(logging, config.log_level),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


async def run_context_manager_messaging():
    """使用上下文管理器测试可靠消息传递"""
    logger.info("=== 使用上下文管理器测试可靠消息传递 ===")

    # 使用异步上下文管理器
    async with ReliableProducer() as producer:
        async with ReliableConsumer(consumer_name="context_test_consumer") as consumer:
            async with DeadLetterConsumer() as dead_letter_consumer:
                # 启动消费者(在后台运行)
                consumer_task = asyncio.create_task(consumer.start_consuming())
                dead_letter_task = asyncio.create_task(dead_letter_consumer.start_consuming())

                # 等待消费者启动
                await asyncio.sleep(1)

                # 发送测试消息
                test_messages = [
                    {"content": "重要业务消息1", "type": "business"},
                    {"content": "系统通知消息2", "type": "notification"},
                    {"content": "用户操作消息3", "type": "user_action"},
                    {"content": "重复消息测试", "type": "duplicate_test"},
                    {"content": "重复消息测试", "type": "duplicate_test"},  # 重复消息
                    {"content": "会失败的消息1", "type": "will_fail"},  # 这些消息会失败并进入死信队列
                    {"content": "会失败的消息2", "type": "will_fail"},
                    {"content": "会失败的消息3", "type": "will_fail"},
                ]

                for msg in test_messages:
                    await producer.publish_reliable_message(msg)
                    await asyncio.sleep(0.5)

                # 等待消息处理完成
                await asyncio.sleep(30)

                # 取消任务
                consumer_task.cancel()
                dead_letter_task.cancel()


if __name__ == '__main__':
    asyncio.run(run_context_manager_messaging())

配置文件:

"""
RabbitMQ 可靠消息传递配置模块
"""

import os
from dataclasses import dataclass
from typing import Dict, Any


@dataclass
class Config:
    """配置类"""
    # RabbitMQ 连接配置
    rabbitmq_uri: str = "amqp://guest:guest@localhost:5673/"
    
    # 交换器和队列配置
    exchange_name: str = "reliable.exchange"
    queue_name: str = "reliable.queue"
    
    # 死信队列配置
    dead_letter_exchange: str = "reliable.dead.letter.exchange"
    dead_letter_queue: str = "reliable.dead.letter.queue"
    
    # 重试配置
    max_retries: int = 3
    message_ttl: int = 300000  # 5分钟
    
    # QoS 配置
    prefetch_count: int = 1
    
    # 日志配置
    log_level: str = "INFO"
    
    def get_connection_config(self) -> Dict[str, Any]:
        """获取连接配置"""
        return {
            'uri': self.rabbitmq_uri,
            'prefetch_count': self.prefetch_count
        }
    
    def get_dead_letter_config(self) -> Dict[str, str]:
        """获取死信队列配置"""
        return {
            'dead_letter_exchange': self.dead_letter_exchange,
            'dead_letter_queue': self.dead_letter_queue
        }


# 全局配置实例
config = Config()

自定义消息处理函数:

async def default_message_handler(message_data: Dict[str, Any]):
    """Default message handler function"""
    await asyncio.sleep(1)  # Simulate processing time
    
    message_type = message_data.get('type', '')
    content = message_data.get('content', '')
    
    # Simulate business logic processing
    if message_type == 'will_fail':
        raise Exception(f"Simulated business processing failure: {content}")
    
    logger.info(f"[Consumer] Business logic processing completed: {content}")

测试结果

2025-09-07 11:25:02,498 - __main__ - INFO - === 使用上下文管理器测试可靠消息传递 ===
2025-09-07 11:25:02,509 - reliable_mq.reliable_producer - INFO - [Producer] Connected, queue: reliable.queue
2025-09-07 11:25:02,513 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Connected, listening to queue: reliable.queue
2025-09-07 11:25:02,518 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] Connected

2025-09-07 11:25:03,523 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Starting to process message: msg_323632.845174041
2025-09-07 11:25:03,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message content: {'content': '重要业务消息1', 'type': 'business', 'timestamp': '2025-09-07T11:25:03.519240', 'message_id': 'msg_323632.845174041'}
2025-09-07 11:25:03,524 - reliable_mq.reliable_producer - INFO - [Producer] Message sent: msg_323632.845174041 (type: business, content: 重要业务消息1)
2025-09-07 11:25:04,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Business logic processing completed: 重要业务消息1
2025-09-07 11:25:04,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message msg_323632.845174041 processed successfully
2025-09-07 11:25:04,525 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message msg_323632.845174041 processed and acknowledged
2025-09-07 11:25:04,525 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Current processed message count: 1

2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - ==================================================
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] 🚫 Duplicate message detected, skipping:
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Message ID: duplicate_090f7015
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Message content: {
  "content": "重复消息测试",
  "type": "duplicate_test",
  "timestamp": "2025-09-07T11:25:05.546930",
  "message_id": "duplicate_090f7015"
}
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Total processed messages: 4
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - ==================================================

2025-09-07 11:25:14,551 - reliable_mq.reliable_consumer - ERROR - [Consumer-context_test_consumer] Message msg_323635.377526708 failed after 3 retries: Simulated business processing failure: 会失败的消息1
2025-09-07 11:25:14,551 - reliable_mq.reliable_consumer - ERROR - [Consumer-context_test_consumer] Message sent to dead letter queue: msg_323635.377526708, error: Processing failed

2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - ==================================================
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Received Dead Letter Message:
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Message ID: msg_323635.377526708
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Message Content: {
  "content": "会失败的消息1",
  "type": "will_fail",
  "timestamp": "2025-09-07T11:25:06.051557",
  "message_id": "msg_323635.377526708"
}
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Error Reason: Processing failed
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - ==================================================

2025-09-07 11:25:15,064 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] 💾 Dead letter message saved to database: msg_323635.377526708
2025-09-07 11:25:15,064 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] Database Record: {
  "id": "msg_323635.377526708",
  "original_message": {
    "content": "会失败的消息1",
    "type": "will_fail",
    "timestamp": "2025-09-07T11:25:06.051557",
    "message_id": "msg_323635.377526708"
  },
  "error_info": "Processing failed",
  "created_at": "2025-09-07T11:25:15.064341",
  "status": "failed"
}

2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - ==================================================
2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Processed Message Statistics:
2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Total Processed: 4
2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Processed Message IDs: ['msg_323632.845174041', 'msg_323633.351571583', 'duplicate_090f7015', 'msg_323633.854272166']
2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - ==================================================

架构关系图

img_5.png

graph TD
    P[ReliableProducer<br/>可靠生产者] --> E[Reliable Exchange<br/>reliable.exchange]
    E --> Q[Reliable Queue<br/>reliable.queue]
    Q --> C[ReliableConsumer<br/>可靠消费者]
    
    C -->|处理成功| ACK[Message ACK<br/>消息确认]
    C -->|处理失败| RETRY[Retry Logic<br/>重试机制]
    RETRY -->|重试成功| ACK
    RETRY -->|重试失败| DLQ[Dead Letter Queue<br/>死信队列]
    
    DLQ --> DLC[DeadLetterConsumer<br/>死信消费者]
    DLC --> DB[(Database<br/>数据库)]
    
    subgraph "可靠性保证"
        PERSIST[Message Persistence<br/>消息持久化]
        IDEMPOTENT[Idempotency Check<br/>幂等性检查]
        CONFIRM[Publisher Confirmation<br/>发布确认]
    end
    
    subgraph "消息流程"
        MSG1[正常消息] --> SUCCESS[处理成功]
        MSG2[重复消息] --> SKIP[跳过处理]
        MSG3[失败消息] --> FAIL[重试后失败]
    end
    
    style P fill:#e1f5fe
    style E fill:#f3e5f5
    style Q fill:#e8f5e8
    style C fill:#fff3e0
    style DLQ fill:#ffebee
    style DLC fill:#ffebee
    style DB fill:#f3e5f5
    style ACK fill:#e8f5e8
    style RETRY fill:#fff3e0