5.2 KiB
5.2 KiB
RabbitMQ 可靠消息传递模块
这是一个模块化的RabbitMQ可靠消息传递实现,提供了完整的消息可靠性保证机制。
功能特性
- ✅ 消息持久化: 交换器、队列、消息都支持持久化
- ✅ 消息确认机制: 自动和手动确认模式
- ✅ 消息幂等性: 防止重复处理消息
- ✅ 重试机制: 可配置的重试次数和策略
- ✅ 死信队列: 处理失败消息的完整解决方案
- ✅ 批量处理: 支持批量发送和处理消息
- ✅ 多消费者: 支持多个消费者并发处理
- ✅ 异步上下文管理器: 自动管理连接生命周期
- ✅ 详细日志: 完整的日志记录和错误追踪
- ✅ 统计分析: 死信消息的统计分析功能
模块结构
reliable_mq/
├── __init__.py # 模块初始化
├── config.py # 配置管理
├── producer.py # 消息生产者
├── consumer.py # 消息消费者
├── dead_letter.py # 死信队列处理
├── test_reliable_messaging.py # 测试模块
├── example.py # 使用示例
└── README.md # 说明文档
快速开始
1. 基本使用
import asyncio
from reliable_mq import ReliableProducer, ReliableConsumer
async def basic_example():
# 创建生产者和消费者
producer = ReliableProducer()
consumer = ReliableConsumer()
# 连接
await producer.connect()
await consumer.connect()
# 启动消费者
consumer_task = asyncio.create_task(consumer.start_consuming())
# 发送消息
await producer.publish_reliable_message({
"content": "Hello, RabbitMQ!",
"type": "greeting"
})
# 清理资源
await producer.close()
await consumer.close()
consumer_task.cancel()
asyncio.run(basic_example())
2. 使用异步上下文管理器
async def context_manager_example():
async with ReliableProducer() as producer:
async with ReliableConsumer() as consumer:
consumer_task = asyncio.create_task(consumer.start_consuming())
await producer.publish_reliable_message({
"content": "使用上下文管理器",
"type": "example"
})
await asyncio.sleep(2)
consumer_task.cancel()
# 连接会自动关闭
3. 自定义消息处理函数
async def custom_message_handler(message_data):
content = message_data.get('content', '')
msg_type = message_data.get('type', '')
if msg_type == 'email':
print(f"发送邮件: {content}")
elif msg_type == 'sms':
print(f"发送短信: {content}")
else:
print(f"处理消息: {content}")
# 创建带自定义处理函数的消费者
consumer = ReliableConsumer(
consumer_name="custom_consumer",
message_handler=custom_message_handler
)
4. 死信队列处理
from reliable_mq import DeadLetterConsumer
async def dead_letter_example():
# 创建死信队列消费者
dead_letter_consumer = DeadLetterConsumer()
await dead_letter_consumer.connect()
await dead_letter_consumer.start_consuming()
# 死信消息会自动打印并保存到数据库
配置选项
通过环境变量或直接修改 config.py 来配置:
# 环境变量配置
RABBITMQ_URI=amqp://guest:guest@localhost:5673/
RABBITMQ_EXCHANGE=reliable.exchange
RABBITMQ_QUEUE=reliable.queue
RABBITMQ_MAX_RETRIES=3
RABBITMQ_MESSAGE_TTL=300000
RABBITMQ_PREFETCH_COUNT=1
RABBITMQ_LOG_LEVEL=INFO
运行测试
# 运行所有测试
python -m reliable_mq.test_reliable_messaging
# 运行示例
python -m reliable_mq.example
核心机制说明
1. 消息持久化
- 交换器持久化:
durable=True - 队列持久化:
durable=True - 消息持久化:
delivery_mode=PERSISTENT
2. 消息确认机制
- 自动确认:
async with message.process() - 手动确认:
message.ack()/message.nack()
3. 消息幂等性
- 使用消息ID去重
- 业务层幂等性检查
4. 重试机制
- 可配置最大重试次数
- 指数退避重试策略
- 死信队列处理失败消息
5. 死信队列
- 自动创建死信交换器和队列
- 详细的错误信息记录
- 统计分析功能
最佳实践
- 使用异步上下文管理器自动管理连接
- 实现自定义消息处理函数处理特定业务逻辑
- 配置合适的重试次数避免无限重试
- 监控死信队列及时发现和处理问题
- 使用批量处理提高性能
- 设置合理的QoS控制并发处理数量
错误处理
模块提供了完整的错误处理机制:
- 连接失败自动重连
- 消息处理失败自动重试
- 超过重试次数发送到死信队列
- 详细的错误日志记录
- 死信消息统计分析
性能优化
- 使用连接池复用连接
- 批量发送消息减少网络开销
- 设置合适的QoS控制并发
- 异步处理提高吞吐量
- 消息持久化保证可靠性
监控和调试
- 详细的日志记录
- 死信消息统计分析
- 消息处理时间监控
- 错误率统计
- 队列长度监控
许可证
MIT License