rabbitmq-test/reliable_mq/README.md
2025-09-07 10:35:24 +08:00

5.2 KiB
Raw Permalink Blame History

RabbitMQ 可靠消息传递模块

这是一个模块化的RabbitMQ可靠消息传递实现提供了完整的消息可靠性保证机制。

功能特性

  • 消息持久化: 交换器、队列、消息都支持持久化
  • 消息确认机制: 自动和手动确认模式
  • 消息幂等性: 防止重复处理消息
  • 重试机制: 可配置的重试次数和策略
  • 死信队列: 处理失败消息的完整解决方案
  • 批量处理: 支持批量发送和处理消息
  • 多消费者: 支持多个消费者并发处理
  • 异步上下文管理器: 自动管理连接生命周期
  • 详细日志: 完整的日志记录和错误追踪
  • 统计分析: 死信消息的统计分析功能

模块结构

reliable_mq/
├── __init__.py              # 模块初始化
├── config.py                # 配置管理
├── producer.py              # 消息生产者
├── consumer.py              # 消息消费者
├── dead_letter.py           # 死信队列处理
├── test_reliable_messaging.py  # 测试模块
├── example.py               # 使用示例
└── README.md                # 说明文档

快速开始

1. 基本使用

import asyncio
from reliable_mq import ReliableProducer, ReliableConsumer

async def basic_example():
    # 创建生产者和消费者
    producer = ReliableProducer()
    consumer = ReliableConsumer()
    
    # 连接
    await producer.connect()
    await consumer.connect()
    
    # 启动消费者
    consumer_task = asyncio.create_task(consumer.start_consuming())
    
    # 发送消息
    await producer.publish_reliable_message({
        "content": "Hello, RabbitMQ!",
        "type": "greeting"
    })
    
    # 清理资源
    await producer.close()
    await consumer.close()
    consumer_task.cancel()

asyncio.run(basic_example())

2. 使用异步上下文管理器

async def context_manager_example():
    async with ReliableProducer() as producer:
        async with ReliableConsumer() as consumer:
            consumer_task = asyncio.create_task(consumer.start_consuming())
            
            await producer.publish_reliable_message({
                "content": "使用上下文管理器",
                "type": "example"
            })
            
            await asyncio.sleep(2)
            consumer_task.cancel()
    # 连接会自动关闭

3. 自定义消息处理函数

async def custom_message_handler(message_data):
    content = message_data.get('content', '')
    msg_type = message_data.get('type', '')
    
    if msg_type == 'email':
        print(f"发送邮件: {content}")
    elif msg_type == 'sms':
        print(f"发送短信: {content}")
    else:
        print(f"处理消息: {content}")

# 创建带自定义处理函数的消费者
consumer = ReliableConsumer(
    consumer_name="custom_consumer",
    message_handler=custom_message_handler
)

4. 死信队列处理

from reliable_mq import DeadLetterConsumer

async def dead_letter_example():
    # 创建死信队列消费者
    dead_letter_consumer = DeadLetterConsumer()
    
    await dead_letter_consumer.connect()
    await dead_letter_consumer.start_consuming()
    
    # 死信消息会自动打印并保存到数据库

配置选项

通过环境变量或直接修改 config.py 来配置:

# 环境变量配置
RABBITMQ_URI=amqp://guest:guest@localhost:5673/
RABBITMQ_EXCHANGE=reliable.exchange
RABBITMQ_QUEUE=reliable.queue
RABBITMQ_MAX_RETRIES=3
RABBITMQ_MESSAGE_TTL=300000
RABBITMQ_PREFETCH_COUNT=1
RABBITMQ_LOG_LEVEL=INFO

运行测试

# 运行所有测试
python -m reliable_mq.test_reliable_messaging

# 运行示例
python -m reliable_mq.example

核心机制说明

1. 消息持久化

  • 交换器持久化: durable=True
  • 队列持久化: durable=True
  • 消息持久化: delivery_mode=PERSISTENT

2. 消息确认机制

  • 自动确认: async with message.process()
  • 手动确认: message.ack() / message.nack()

3. 消息幂等性

  • 使用消息ID去重
  • 业务层幂等性检查

4. 重试机制

  • 可配置最大重试次数
  • 指数退避重试策略
  • 死信队列处理失败消息

5. 死信队列

  • 自动创建死信交换器和队列
  • 详细的错误信息记录
  • 统计分析功能

最佳实践

  1. 使用异步上下文管理器自动管理连接
  2. 实现自定义消息处理函数处理特定业务逻辑
  3. 配置合适的重试次数避免无限重试
  4. 监控死信队列及时发现和处理问题
  5. 使用批量处理提高性能
  6. 设置合理的QoS控制并发处理数量

错误处理

模块提供了完整的错误处理机制:

  • 连接失败自动重连
  • 消息处理失败自动重试
  • 超过重试次数发送到死信队列
  • 详细的错误日志记录
  • 死信消息统计分析

性能优化

  • 使用连接池复用连接
  • 批量发送消息减少网络开销
  • 设置合适的QoS控制并发
  • 异步处理提高吞吐量
  • 消息持久化保证可靠性

监控和调试

  • 详细的日志记录
  • 死信消息统计分析
  • 消息处理时间监控
  • 错误率统计
  • 队列长度监控

许可证

MIT License