""" RabbitMQ 可靠消息传递测试模块 """ import asyncio import logging from reliable_mq import ReliableProducer, ReliableConsumer from reliable_mq.dead_letter_consumer import DeadLetterConsumer from reliable_mq.config import config # 配置日志 logging.basicConfig( level=getattr(logging, config.log_level), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) async def run_context_manager_messaging(): """使用上下文管理器测试可靠消息传递""" logger.info("=== 使用上下文管理器测试可靠消息传递 ===") # 使用异步上下文管理器 async with ReliableProducer() as producer: async with ReliableConsumer(consumer_name="context_test_consumer") as consumer: async with DeadLetterConsumer() as dead_letter_consumer: # 启动消费者(在后台运行) consumer_task = asyncio.create_task(consumer.start_consuming()) dead_letter_task = asyncio.create_task(dead_letter_consumer.start_consuming()) # 等待消费者启动 await asyncio.sleep(1) # 发送测试消息 test_messages = [ {"content": "重要业务消息1", "type": "business"}, {"content": "系统通知消息2", "type": "notification"}, {"content": "用户操作消息3", "type": "user_action"}, {"content": "重复消息测试", "type": "duplicate_test"}, {"content": "重复消息测试", "type": "duplicate_test"}, # 重复消息 {"content": "会失败的消息1", "type": "will_fail"}, # 这些消息会失败并进入死信队列 {"content": "会失败的消息2", "type": "will_fail"}, {"content": "会失败的消息3", "type": "will_fail"}, ] for msg in test_messages: await producer.publish_reliable_message(msg) await asyncio.sleep(0.5) # 等待消息处理完成 await asyncio.sleep(30) # 取消任务 consumer_task.cancel() dead_letter_task.cancel() if __name__ == '__main__': asyncio.run(run_context_manager_messaging())