rabbitmq-test/run_reliable_messaging.py
2025-09-07 10:35:24 +08:00

61 lines
2.3 KiB
Python

"""
RabbitMQ 可靠消息传递测试模块
"""
import asyncio
import logging
from reliable_mq import ReliableProducer, ReliableConsumer
from reliable_mq.dead_letter_consumer import DeadLetterConsumer
from reliable_mq.config import config
# 配置日志
logging.basicConfig(
level=getattr(logging, config.log_level),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
async def run_context_manager_messaging():
"""使用上下文管理器测试可靠消息传递"""
logger.info("=== 使用上下文管理器测试可靠消息传递 ===")
# 使用异步上下文管理器
async with ReliableProducer() as producer:
async with ReliableConsumer(consumer_name="context_test_consumer") as consumer:
async with DeadLetterConsumer() as dead_letter_consumer:
# 启动消费者(在后台运行)
consumer_task = asyncio.create_task(consumer.start_consuming())
dead_letter_task = asyncio.create_task(dead_letter_consumer.start_consuming())
# 等待消费者启动
await asyncio.sleep(1)
# 发送测试消息
test_messages = [
{"content": "重要业务消息1", "type": "business"},
{"content": "系统通知消息2", "type": "notification"},
{"content": "用户操作消息3", "type": "user_action"},
{"content": "重复消息测试", "type": "duplicate_test"},
{"content": "重复消息测试", "type": "duplicate_test"}, # 重复消息
{"content": "会失败的消息1", "type": "will_fail"}, # 这些消息会失败并进入死信队列
{"content": "会失败的消息2", "type": "will_fail"},
{"content": "会失败的消息3", "type": "will_fail"},
]
for msg in test_messages:
await producer.publish_reliable_message(msg)
await asyncio.sleep(0.5)
# 等待消息处理完成
await asyncio.sleep(30)
# 取消任务
consumer_task.cancel()
dead_letter_task.cancel()
if __name__ == '__main__':
asyncio.run(run_context_manager_messaging())