From 951a7378d230fc2ce5cc979b424e505cf18a477b Mon Sep 17 00:00:00 2001 From: icecheng Date: Sun, 7 Sep 2025 10:35:24 +0800 Subject: [PATCH] --init --- .idea/.gitignore | 5 + .../inspectionProfiles/profiles_settings.xml | 6 + .idea/misc.xml | 6 + .idea/modules.xml | 8 + .idea/rabbitmq-test.iml | 8 + .idea/vcs.xml | 6 + comsumer/__init__.py | 0 comsumer/direct_consumer.py | 85 ++++++ comsumer/direct_multi_consumer.py | 66 +++++ comsumer/fanout_consumer.py | 77 ++++++ comsumer/topic_consumer.py | 62 +++++ config.py | 1 + product/__init__.py | 0 product/direct_multi_publish.py | 74 +++++ product/direct_publish.py | 73 +++++ product/fanout_publish.py | 64 +++++ product/topic_publish.py | 76 ++++++ reliable_mq/README.md | 207 ++++++++++++++ reliable_mq/__init__.py | 18 ++ reliable_mq/config.py | 61 +++++ reliable_mq/dead_letter_consumer.py | 128 +++++++++ reliable_mq/reliable_consumer.py | 253 ++++++++++++++++++ reliable_mq/reliable_producer.py | 152 +++++++++++ run_reliable_messaging.py | 60 +++++ test.py | 59 ++++ 25 files changed, 1555 insertions(+) create mode 100644 .idea/.gitignore create mode 100644 .idea/inspectionProfiles/profiles_settings.xml create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/rabbitmq-test.iml create mode 100644 .idea/vcs.xml create mode 100644 comsumer/__init__.py create mode 100644 comsumer/direct_consumer.py create mode 100644 comsumer/direct_multi_consumer.py create mode 100644 comsumer/fanout_consumer.py create mode 100644 comsumer/topic_consumer.py create mode 100644 config.py create mode 100644 product/__init__.py create mode 100644 product/direct_multi_publish.py create mode 100644 product/direct_publish.py create mode 100644 product/fanout_publish.py create mode 100644 product/topic_publish.py create mode 100644 reliable_mq/README.md create mode 100644 reliable_mq/__init__.py create mode 100644 reliable_mq/config.py create mode 100644 reliable_mq/dead_letter_consumer.py create mode 100644 reliable_mq/reliable_consumer.py create mode 100644 reliable_mq/reliable_producer.py create mode 100644 run_reliable_messaging.py create mode 100644 test.py diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..b58b603 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,5 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..b70de2b --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..a456678 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/rabbitmq-test.iml b/.idea/rabbitmq-test.iml new file mode 100644 index 0000000..21a12cc --- /dev/null +++ b/.idea/rabbitmq-test.iml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/comsumer/__init__.py b/comsumer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/comsumer/direct_consumer.py b/comsumer/direct_consumer.py new file mode 100644 index 0000000..713251d --- /dev/null +++ b/comsumer/direct_consumer.py @@ -0,0 +1,85 @@ +import asyncio +import aio_pika +from config import RABBITMQ_URI + + +async def direct_consumer(queue_name: str, consumer_label: str): + """ + Direct Exchange 消费者:监听指定队列,处理对应路由键的消息 + :param queue_name: 要监听的队列名称(与生产者中定义的队列一致) + :param consumer_label: 消费者标签(用于区分不同类型的消息处理器) + """ + # 1. 建立与 RabbitMQ 的连接(robust 模式自动重连,提升稳定性) + connection = await aio_pika.connect_robust(RABBITMQ_URI) + # 2. 创建通信信道(所有操作通过信道执行,减少 TCP 连接开销) + channel = await connection.channel() + + # 3. 开启公平调度:确保消费者处理完 1 条消息后,再接收下 1 条 + # 避免「快消费者空闲、慢消费者堆积」的不均衡问题 + await channel.set_qos(prefetch_count=1) + + # 4. 声明要监听的队列(与生产者中 queue_bindings 定义的队列完全一致) + # 若队列不存在(未执行 setup_direct_exchange),会报错提醒初始化 + queue = await channel.declare_queue( + queue_name, + durable=True, # 与生产者一致:队列持久化(重启不丢失) + auto_delete=False # 队列不自动删除(即使无消费者也保留) + ) + + # 5. 定义消息处理逻辑(核心:根据队列类型处理对应级别消息) + async def on_message_received(message: aio_pika.IncomingMessage): + # async with 上下文:自动完成消息确认(处理完后告知 RabbitMQ 删除消息) + # 若处理过程中崩溃,消息会重新回到队列,避免丢失 + async with message.process(): + # 解码消息体(生产者用 utf-8 编码,此处对应解码) + message_content = message.body.decode("utf-8") + # 打印关键信息(便于调试和日志跟踪) + print(f"[{consumer_label} 消费者] 收到消息:") + print(f" 队列名称:{queue_name}") + print(f" 消息内容:{message_content}") + print(f" 消息路由键:{message.routing_key}") # 验证路由键是否匹配 + print(f" 处理时间:{asyncio.get_running_loop().time():.2f}s\n") + + # 模拟不同级别消息的处理耗时(业务场景可替换为实际逻辑) + if "error" in queue_name: + # 错误消息:可能需要重试、告警,耗时更长 + await asyncio.sleep(2) + elif "warning" in queue_name: + # 警告消息:可能需要记录日志、轻量处理 + await asyncio.sleep(1) + elif "info" in queue_name: + # 信息/调试消息:快速处理,仅记录 + await asyncio.sleep(0.5) + + # 6. 启动队列监听:将消息处理函数绑定到队列 + await queue.consume(on_message_received) + # 打印启动日志,提示用户消费者已就绪 + print(f"[{consumer_label} 消费者] 已启动,正在监听队列:{queue_name}\n") + + # 7. 保持消费者运行(无限期阻塞,直到手动停止程序) + # 若不阻塞,协程会立即结束,消费者会断开连接 + await asyncio.Future() + + +async def start_all_direct_consumers(queue_prefix="demo.direct.queue-"): + """ + 启动所有 Direct Exchange 对应的消费者 + 与生产者 setup_direct_exchange 中的 queue_bindings 完全对应 + """ + # 定义要启动的消费者列表(队列名称 + 消费者标签) + consumers = [ + # 错误队列:处理路由键为 "error" 的消息 + direct_consumer(f"{queue_prefix}error", "错误级别"), + # 警告队列:处理路由键为 "warning" 的消息 + direct_consumer(f"{queue_prefix}warning", "警告级别"), + # 信息队列:处理路由键为 "info" 和 "debug" 的消息 + direct_consumer(f"{queue_prefix}info", "信息/调试级别") + ] + + # 同时启动所有消费者(并发运行,互不阻塞) + await asyncio.gather(*consumers) + + +if __name__ == "__main__": + # 启动所有消费者(需先执行 setup_direct_exchange 初始化队列) + asyncio.run(start_all_direct_consumers()) diff --git a/comsumer/direct_multi_consumer.py b/comsumer/direct_multi_consumer.py new file mode 100644 index 0000000..acee7a6 --- /dev/null +++ b/comsumer/direct_multi_consumer.py @@ -0,0 +1,66 @@ +import asyncio +import aio_pika +from config import RABBITMQ_URI + + +async def queue_consumer(queue_name: str, consumer_id: int): + """ + 单个队列的消费者 + :param queue_name: 要监听的队列名称 + :param consumer_id: 消费者ID,用于区分不同实例 + """ + # 建立连接 + connection = await aio_pika.connect_robust(RABBITMQ_URI) + channel = await connection.channel() + + # 开启公平调度,确保消息均匀分配 + # 消费者处理完一条消息后才会接收下一条 + await channel.set_qos(prefetch_count=1) + + # 声明要监听的队列(与生产者中创建的队列对应) + queue = await channel.declare_queue( + queue_name, + durable=True, + auto_delete=False + ) + + # 消息处理函数 + async def on_message(message: aio_pika.IncomingMessage): + # 自动确认消息(处理完成后从队列中删除) + async with message.process(): + content = message.body.decode("utf-8") + print(f"【消费者{consumer_id}】处理消息:{content}") + print(f"【消费者{consumer_id}】来自队列:{queue_name}") + print(f"【消费者{consumer_id}】路由键:{message.routing_key}\n") + + # 模拟业务处理耗时(根据实际场景调整) + await asyncio.sleep(1) + + # 开始消费队列消息 + await queue.consume(on_message) + print(f"【消费者{consumer_id}】已启动,正在监听队列:{queue_name}") + + # 保持消费者运行(无限期阻塞) + await asyncio.Future() + + +async def start_balanced_consumers( + queue_prefix="task.queue.", + queue_count=3 +): + """启动多个消费者,每个消费者对应一个队列""" + # 创建消费者任务列表 + consumer_tasks = [] + for i in range(queue_count): + queue_name = f"{queue_prefix}{i + 1}" + # 每个队列对应一个独立的消费者 + task = asyncio.create_task(queue_consumer(queue_name, i + 1)) + consumer_tasks.append(task) + + # 同时运行所有消费者 + await asyncio.gather(*consumer_tasks) + + +if __name__ == "__main__": + # 启动3个消费者,分别对应3个队列 + asyncio.run(start_balanced_consumers(queue_count=3)) diff --git a/comsumer/fanout_consumer.py b/comsumer/fanout_consumer.py new file mode 100644 index 0000000..5063415 --- /dev/null +++ b/comsumer/fanout_consumer.py @@ -0,0 +1,77 @@ +import asyncio +import aio_pika +from config import RABBITMQ_URI + + +async def fanout_consumer(queue_name: str, consumer_id: int): + """ + Fanout Exchange 消费者:监听单个队列,接收 Fanout 广播的消息 + :param queue_name: 要监听的队列名称(与生产者 setup 中创建的队列一致) + :param consumer_id: 消费者标识,区分不同队列的消费者 + """ + # 1. 建立稳健连接(自动重连机制,应对网络波动或 RabbitMQ 重启) + connection = await aio_pika.connect_robust(RABBITMQ_URI) + # 2. 创建通信信道(所有消息操作通过信道执行,减少 TCP 连接开销) + channel = await connection.channel() + + # 3. 开启公平调度:确保消费者处理完 1 条消息后再接收下 1 条 + # 避免单队列内消息堆积(尤其 Fanout 场景下多队列并行处理需均衡) + await channel.set_qos(prefetch_count=1) + + # 4. 声明要监听的队列(与生产者 setup_fanout_exchange 中创建的队列完全一致) + # 若队列未初始化(未执行 setup),会报错提醒先完成交换器和队列创建 + queue = await channel.declare_queue( + queue_name, + durable=True, # 与生产者一致:队列持久化(重启不丢失) + auto_delete=False # 队列不自动删除(即使无消费者也保留,确保广播消息不丢失) + ) + + # 5. 定义消息处理逻辑(Fanout 场景下,同一消息会被所有队列的消费者接收) + async def on_message_received(message: aio_pika.IncomingMessage): + # async with 上下文:自动确认消息(处理完成后告知 RabbitMQ 删除,避免重复消费) + # 若处理崩溃,消息会重新入队,等待消费者重启后重试 + async with message.process(): + # 解码消息体(生产者用 utf-8 编码,此处对应解码) + message_content = message.body.decode("utf-8") + # 打印关键信息(清晰展示 Fanout 广播特性:同一消息多队列接收) + print(f"【Fanout 消费者{consumer_id}】收到广播消息:") + print(f" 监听队列:{queue_name}") + print(f" 消息内容:{message_content}") + print(f" 消息持久化状态:{'是' if message.delivery_mode == 2 else '否'}") # 验证持久化 + # print(f" 处理时间:{asyncio.get_running_loop().time():.2f}s\n") + + # 模拟业务处理耗时(根据实际场景替换,如日志存储、通知推送等) + await asyncio.sleep(1) + + # 6. 启动队列监听:将处理逻辑绑定到队列,持续接收广播消息 + await queue.consume(on_message_received) + # 打印启动日志,提示用户消费者就绪 + print(f"【Fanout 消费者{consumer_id}】已启动,正在监听队列:{queue_name}\n") + + # 7. 保持消费者运行(无限期阻塞,直到手动停止程序) + # 若不阻塞,协程会立即结束,消费者连接会断开 + await asyncio.Future() + + +async def start_all_fanout_consumers(queue_prefix="demo.fanout.queue-", queue_count=3): + """ + 启动所有 Fanout 消费者:与生产者 setup 中创建的 3 个队列一一对应 + :param queue_prefix: 队列名称前缀(与生产者 queue_name_prefix 一致) + :param queue_count: 队列数量(与生产者中 range(3) 对应,默认 3 个) + """ + # 构建消费者任务列表:为每个队列创建一个独立消费者 + consumer_tasks = [ + fanout_consumer( + queue_name=f"{queue_prefix}{i}", # 队列名:demo.fanout.queue-0/1/2(与生产者一致) + consumer_id=i + 1 # 消费者标识:1/2/3 + ) + for i in range(queue_count) + ] + + # 并发启动所有消费者(3 个消费者同时运行,互不阻塞) + await asyncio.gather(*consumer_tasks) + + +if __name__ == "__main__": + # 启动所有 Fanout 消费者(需先执行 setup_fanout_exchange 初始化交换器和队列) + asyncio.run(start_all_fanout_consumers()) diff --git a/comsumer/topic_consumer.py b/comsumer/topic_consumer.py new file mode 100644 index 0000000..83c50c8 --- /dev/null +++ b/comsumer/topic_consumer.py @@ -0,0 +1,62 @@ +import asyncio +import aio_pika + +from config import RABBITMQ_URI + + +async def topic_consumer(queue_name: str, consumer_id: str): + """ + 监听指定队列的消费者 + :param queue_name: 要监听的队列名称 + :param consumer_id: 消费者标识,用于区分不同消费者 + """ + # 建立连接 + connection = await aio_pika.connect_robust(RABBITMQ_URI) + channel = await connection.channel() + + # 开启公平调度,确保消费者处理完一条消息再接收下一条 + await channel.set_qos(prefetch_count=1) + + # 声明要监听的队列(与生产者中创建的队列保持一致) + queue = await channel.declare_queue( + queue_name, + durable=True, + auto_delete=False + ) + + # 消息处理函数 + async def on_message(message: aio_pika.IncomingMessage): + # 自动确认消息(处理完成后从队列中删除) + async with message.process(): + message_content = message.body.decode("utf-8") + print(f"【消费者{consumer_id}】收到消息: {message_content}") + print(f"【消费者{consumer_id}】消息路由键: {message.routing_key}") + print(f"【消费者{consumer_id}】来自队列: {queue_name}\n") + + # 模拟消息处理耗时(根据实际业务逻辑调整) + await asyncio.sleep(1) + + # 开始消费队列中的消息 + await queue.consume(on_message) + print(f"【消费者{consumer_id}】已启动,正在监听队列: {queue_name}") + + # 保持消费者运行(无限期阻塞) + await asyncio.Future() + + +async def start_all_topic_consumers(queue_prefix="demo.topic.queue-"): + """启动所有Topic Exchange对应的消费者""" + # 与setup_topic_exchange中的队列名称对应 + consumer_tasks = [ + topic_consumer(f"{queue_prefix}critical", "CriticalHandler"), + topic_consumer(f"{queue_prefix}order", "OrderHandler"), + topic_consumer(f"{queue_prefix}user.login", "UserLoginHandler") + ] + + # 同时启动所有消费者 + await asyncio.gather(*consumer_tasks) + + +if __name__ == "__main__": + # 启动消费者 + asyncio.run(start_all_topic_consumers()) diff --git a/config.py b/config.py new file mode 100644 index 0000000..79bd821 --- /dev/null +++ b/config.py @@ -0,0 +1 @@ +RABBITMQ_URI = "amqp://guest:guest@localhost:5673/" diff --git a/product/__init__.py b/product/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/product/direct_multi_publish.py b/product/direct_multi_publish.py new file mode 100644 index 0000000..3e123cc --- /dev/null +++ b/product/direct_multi_publish.py @@ -0,0 +1,74 @@ +import aio_pika +from config import RABBITMQ_URI + + +async def setup_multi_queue_balance( + exchange_name="demo.direct.multi.queue", + queue_prefix="task.queue.", + route_prefix="route.", + queue_count=3 # 3个队列 +): + """创建多个队列,每个队列绑定不同的路由键""" + connection = await aio_pika.connect_robust(RABBITMQ_URI) + channel = await connection.channel() + + # 声明Direct交换器 + await channel.declare_exchange( + exchange_name, + aio_pika.ExchangeType.DIRECT, + durable=True + ) + + # 创建N个队列,每个队列绑定独立的路由键 + for i in range(queue_count): + queue_name = f"{queue_prefix}{i + 1}" + route_key = f"{route_prefix}{i + 1}" + + # 声明队列 + queue = await channel.declare_queue( + queue_name, + durable=True, + auto_delete=False + ) + + # 绑定路由键(每个队列对应唯一路由键) + await queue.bind(exchange_name, routing_key=route_key) + print(f"队列 {queue_name} 绑定路由键:{route_key}") + + await connection.close() + + +class BalancedProducer: + def __init__(self, exchange_name="demo.direct.multi.queue", queue_count=3): + self.exchange_name = exchange_name + self.queue_count = queue_count + self.current_index = 0 # 轮询索引 + + async def connect(self): + """建立连接(可复用连接提升性能)""" + self.connection = await aio_pika.connect_robust(RABBITMQ_URI) + self.channel = await self.connection.channel() + self.exchange = await self.channel.declare_exchange( + self.exchange_name, + aio_pika.ExchangeType.DIRECT, + durable=True + ) + + async def publish(self, message: str): + """轮询选择路由键,发送消息到对应的队列""" + # 轮询算法:每次发送后切换到下一个路由键 + self.current_index = (self.current_index + 1) % self.queue_count + route_key = f"route.{self.current_index + 1}" # 对应 route_1/2/3 + + message_obj = aio_pika.Message( + body=message.encode("utf-8"), + delivery_mode=aio_pika.DeliveryMode.PERSISTENT + ) + + await self.exchange.publish(message_obj, routing_key=route_key) + print(f"已发送消息:{message}(路由到 {route_key})") + + async def close(self): + """关闭连接""" + await self.connection.close() + diff --git a/product/direct_publish.py b/product/direct_publish.py new file mode 100644 index 0000000..3131763 --- /dev/null +++ b/product/direct_publish.py @@ -0,0 +1,73 @@ +import asyncio +import aio_pika + +from config import RABBITMQ_URI + + +async def setup_direct_exchange(exchange_name="demo.direct", queue_prefix="demo.direct.queue-"): + """设置Direct类型交换器并绑定队列""" + # 建立连接 + connection = await aio_pika.connect_robust(RABBITMQ_URI) + channel = await connection.channel() + + # 1. 声明Direct类型交换器 + direct_exchange = await channel.declare_exchange( + exchange_name, + aio_pika.ExchangeType.DIRECT, + durable=True # 交换器持久化 + ) + + # 2. 定义队列及对应的绑定键(Direct交换器需要精确匹配) + # 格式: (队列名称, 绑定键列表) + queue_bindings = [ + (f"{queue_prefix}error", ["error"]), # 处理错误级别的消息 + (f"{queue_prefix}warning", ["warning"]), # 处理警告级别的消息 + (f"{queue_prefix}info", ["info", "debug"]) # 处理信息和调试级别的消息 + ] + + # 3. 循环创建队列并绑定到交换器 + for queue_name, binding_keys in queue_bindings: + # 声明队列(持久化) + queue = await channel.declare_queue( + queue_name, + durable=True, + auto_delete=False + ) + # 绑定多个路由键到同一个队列 + for binding_key in binding_keys: + await queue.bind( + direct_exchange, + routing_key=binding_key # Direct交换器需要指定绑定键 + ) + print(f"队列 {queue_name} 已绑定路由键: {binding_keys}") + + await connection.close() + + +async def direct_publish(message: str, routing_key: str, exchange_name: str = "demo.direct"): + """向Direct交换器发送消息""" + # 建立连接 + connection = await aio_pika.connect_robust(RABBITMQ_URI) + channel = await connection.channel() + + # 声明交换器(确保交换器存在) + exchange = await channel.declare_exchange( + exchange_name, + aio_pika.ExchangeType.DIRECT, + durable=True + ) + + # 构建消息对象(持久化) + message_obj = aio_pika.Message( + body=message.encode("utf-8"), + delivery_mode=aio_pika.DeliveryMode.PERSISTENT + ) + + # 发送消息(指定路由键,Direct交换器会根据路由键匹配队列) + await exchange.publish( + message_obj, + routing_key=routing_key # 路由键决定消息流向哪个队列 + ) + print(f"已发送消息: {message} (路由键: {routing_key})") + + await connection.close() diff --git a/product/fanout_publish.py b/product/fanout_publish.py new file mode 100644 index 0000000..15f138d --- /dev/null +++ b/product/fanout_publish.py @@ -0,0 +1,64 @@ +import asyncio +import aio_pika + +from config import RABBITMQ_URI + + +async def setup_fanout_exchange(exchange_name="demo.fanout", queue_name_prefix="demo.fanout.queue-"): + # 建立连接 + connection = await aio_pika.connect_robust( + RABBITMQ_URI + ) + channel = await connection.channel() + + # 1. 声明 Fanout 类型交换器,不存在就创建,存在就复用 + fanout_exchange = await channel.declare_exchange( + exchange_name, + aio_pika.ExchangeType.FANOUT, + durable=True # 交换器持久化 + ) + + # 2. 定义需要绑定的队列名称列表 + queue_names = [queue_name_prefix + str(i) for i in range(3)] + + # 3. 循环创建队列并绑定到交换器 + for name in queue_names: + # 声明队列(持久化),不存在就创建,存在就复用 + queue = await channel.declare_queue( + name, + durable=True, + auto_delete=False + ) + # 绑定队列到 Fanout 交换器(忽略路由键) + await queue.bind(fanout_exchange, routing_key="") + + +async def fanout_publish(message: str = "", exchange_name: str = "demo.fanout"): + # 建立与RabbitMQ的连接 + connection = await aio_pika.connect_robust( + RABBITMQ_URI + ) + + # 创建通道 + channel = await connection.channel() + + # 声明一个Fanout类型的交换器 + fanout_exchange = await channel.declare_exchange( + exchange_name, # 交换器名称 + aio_pika.ExchangeType.FANOUT, # 交换器类型为FANOUT + durable=True # 持久化交换器 + ) + + # 构建消息对象 + message = aio_pika.Message( + body=message.encode("utf-8"), + delivery_mode=aio_pika.DeliveryMode.PERSISTENT # 消息持久化 + ) + + # 发送消息到Fanout交换器 + # Fanout类型不需要需要指定routing_key,即使指定也会被忽略 + await fanout_exchange.publish( + message, + routing_key="" # 路由键为空 + ) + await connection.close() diff --git a/product/topic_publish.py b/product/topic_publish.py new file mode 100644 index 0000000..3e5f558 --- /dev/null +++ b/product/topic_publish.py @@ -0,0 +1,76 @@ +import asyncio +import aio_pika + +from config import RABBITMQ_URI + + +async def setup_topic_exchange(exchange_name="demo.topic", queue_prefix="demo.topic.queue-"): + """设置Topic类型交换器并绑定队列(支持通配符路由)""" + # 建立连接 + connection = await aio_pika.connect_robust(RABBITMQ_URI) + channel = await connection.channel() + + # 1. 声明Topic类型交换器 + topic_exchange = await channel.declare_exchange( + exchange_name, + aio_pika.ExchangeType.TOPIC, + durable=True # 交换器持久化 + ) + + # 2. 定义队列及对应的绑定键(Topic交换器支持通配符) + # 格式: (队列名称, 绑定键列表) + # 通配符规则: + # * 匹配恰好1个层级 + # # 匹配0个或多个层级(层级用.分隔) + queue_bindings = [ + (f"{queue_prefix}critical", ["#.critical"]), # 匹配任意前缀+critical + (f"{queue_prefix}order", ["order.#"]), # 匹配所有order开头的路由键 + (f"{queue_prefix}user.login", ["user.login.*"]) # 匹配user.login+1个后缀 + ] + + # 3. 循环创建队列并绑定到交换器 + for queue_name, binding_keys in queue_bindings: + # 声明队列(持久化) + queue = await channel.declare_queue( + queue_name, + durable=True, + auto_delete=False + ) + # 绑定多个通配符路由键到同一个队列 + for binding_key in binding_keys: + await queue.bind( + topic_exchange, + routing_key=binding_key # Topic交换器使用带通配符的绑定键 + ) + print(f"队列 {queue_name} 已绑定路由键: {binding_keys}") + + await connection.close() + + +async def topic_publish(message: str, routing_key: str, exchange_name: str = "demo.topic"): + """向Topic交换器发送消息(使用层级化路由键)""" + # 建立连接 + connection = await aio_pika.connect_robust(RABBITMQ_URI) + channel = await connection.channel() + + # 声明交换器(确保交换器存在) + exchange = await channel.declare_exchange( + exchange_name, + aio_pika.ExchangeType.TOPIC, + durable=True + ) + + # 构建消息对象(持久化) + message_obj = aio_pika.Message( + body=message.encode("utf-8"), + delivery_mode=aio_pika.DeliveryMode.PERSISTENT + ) + + # 发送消息(指定层级化路由键,Topic交换器会按通配符匹配队列) + await exchange.publish( + message_obj, + routing_key=routing_key # 路由键为层级化字符串(如"order.create.user") + ) + print(f"已发送消息: {message} (路由键: {routing_key})") + + await connection.close() diff --git a/reliable_mq/README.md b/reliable_mq/README.md new file mode 100644 index 0000000..f4511ee --- /dev/null +++ b/reliable_mq/README.md @@ -0,0 +1,207 @@ +# RabbitMQ 可靠消息传递模块 + +这是一个模块化的RabbitMQ可靠消息传递实现,提供了完整的消息可靠性保证机制。 + +## 功能特性 + +- ✅ **消息持久化**: 交换器、队列、消息都支持持久化 +- ✅ **消息确认机制**: 自动和手动确认模式 +- ✅ **消息幂等性**: 防止重复处理消息 +- ✅ **重试机制**: 可配置的重试次数和策略 +- ✅ **死信队列**: 处理失败消息的完整解决方案 +- ✅ **批量处理**: 支持批量发送和处理消息 +- ✅ **多消费者**: 支持多个消费者并发处理 +- ✅ **异步上下文管理器**: 自动管理连接生命周期 +- ✅ **详细日志**: 完整的日志记录和错误追踪 +- ✅ **统计分析**: 死信消息的统计分析功能 + +## 模块结构 + +``` +reliable_mq/ +├── __init__.py # 模块初始化 +├── config.py # 配置管理 +├── producer.py # 消息生产者 +├── consumer.py # 消息消费者 +├── dead_letter.py # 死信队列处理 +├── test_reliable_messaging.py # 测试模块 +├── example.py # 使用示例 +└── README.md # 说明文档 +``` + +## 快速开始 + +### 1. 基本使用 + +```python +import asyncio +from reliable_mq import ReliableProducer, ReliableConsumer + +async def basic_example(): + # 创建生产者和消费者 + producer = ReliableProducer() + consumer = ReliableConsumer() + + # 连接 + await producer.connect() + await consumer.connect() + + # 启动消费者 + consumer_task = asyncio.create_task(consumer.start_consuming()) + + # 发送消息 + await producer.publish_reliable_message({ + "content": "Hello, RabbitMQ!", + "type": "greeting" + }) + + # 清理资源 + await producer.close() + await consumer.close() + consumer_task.cancel() + +asyncio.run(basic_example()) +``` + +### 2. 使用异步上下文管理器 + +```python +async def context_manager_example(): + async with ReliableProducer() as producer: + async with ReliableConsumer() as consumer: + consumer_task = asyncio.create_task(consumer.start_consuming()) + + await producer.publish_reliable_message({ + "content": "使用上下文管理器", + "type": "example" + }) + + await asyncio.sleep(2) + consumer_task.cancel() + # 连接会自动关闭 +``` + +### 3. 自定义消息处理函数 + +```python +async def custom_message_handler(message_data): + content = message_data.get('content', '') + msg_type = message_data.get('type', '') + + if msg_type == 'email': + print(f"发送邮件: {content}") + elif msg_type == 'sms': + print(f"发送短信: {content}") + else: + print(f"处理消息: {content}") + +# 创建带自定义处理函数的消费者 +consumer = ReliableConsumer( + consumer_name="custom_consumer", + message_handler=custom_message_handler +) +``` + +### 4. 死信队列处理 + +```python +from reliable_mq import DeadLetterConsumer + +async def dead_letter_example(): + # 创建死信队列消费者 + dead_letter_consumer = DeadLetterConsumer() + + await dead_letter_consumer.connect() + await dead_letter_consumer.start_consuming() + + # 死信消息会自动打印并保存到数据库 +``` + +## 配置选项 + +通过环境变量或直接修改 `config.py` 来配置: + +```python +# 环境变量配置 +RABBITMQ_URI=amqp://guest:guest@localhost:5673/ +RABBITMQ_EXCHANGE=reliable.exchange +RABBITMQ_QUEUE=reliable.queue +RABBITMQ_MAX_RETRIES=3 +RABBITMQ_MESSAGE_TTL=300000 +RABBITMQ_PREFETCH_COUNT=1 +RABBITMQ_LOG_LEVEL=INFO +``` + +## 运行测试 + +```bash +# 运行所有测试 +python -m reliable_mq.test_reliable_messaging + +# 运行示例 +python -m reliable_mq.example +``` + +## 核心机制说明 + +### 1. 消息持久化 +- 交换器持久化: `durable=True` +- 队列持久化: `durable=True` +- 消息持久化: `delivery_mode=PERSISTENT` + +### 2. 消息确认机制 +- 自动确认: `async with message.process()` +- 手动确认: `message.ack()` / `message.nack()` + +### 3. 消息幂等性 +- 使用消息ID去重 +- 业务层幂等性检查 + +### 4. 重试机制 +- 可配置最大重试次数 +- 指数退避重试策略 +- 死信队列处理失败消息 + +### 5. 死信队列 +- 自动创建死信交换器和队列 +- 详细的错误信息记录 +- 统计分析功能 + +## 最佳实践 + +1. **使用异步上下文管理器**自动管理连接 +2. **实现自定义消息处理函数**处理特定业务逻辑 +3. **配置合适的重试次数**避免无限重试 +4. **监控死信队列**及时发现和处理问题 +5. **使用批量处理**提高性能 +6. **设置合理的QoS**控制并发处理数量 + +## 错误处理 + +模块提供了完整的错误处理机制: + +- 连接失败自动重连 +- 消息处理失败自动重试 +- 超过重试次数发送到死信队列 +- 详细的错误日志记录 +- 死信消息统计分析 + +## 性能优化 + +- 使用连接池复用连接 +- 批量发送消息减少网络开销 +- 设置合适的QoS控制并发 +- 异步处理提高吞吐量 +- 消息持久化保证可靠性 + +## 监控和调试 + +- 详细的日志记录 +- 死信消息统计分析 +- 消息处理时间监控 +- 错误率统计 +- 队列长度监控 + +## 许可证 + +MIT License diff --git a/reliable_mq/__init__.py b/reliable_mq/__init__.py new file mode 100644 index 0000000..3ac508b --- /dev/null +++ b/reliable_mq/__init__.py @@ -0,0 +1,18 @@ +""" +RabbitMQ 可靠消息传递模块 +""" + +__version__ = "1.0.0" +__author__ = "RabbitMQ Test" + +from .config import RabbitMQConfig +from .reliable_producer import ReliableProducer +from .reliable_consumer import ReliableConsumer +from .dead_letter_consumer import DeadLetterConsumer + +__all__ = [ + 'RabbitMQConfig', + 'ReliableProducer', + 'ReliableConsumer', + 'DeadLetterConsumer' +] diff --git a/reliable_mq/config.py b/reliable_mq/config.py new file mode 100644 index 0000000..b02e2c3 --- /dev/null +++ b/reliable_mq/config.py @@ -0,0 +1,61 @@ +""" +RabbitMQ 配置模块 +""" + +import os +from typing import Dict, Any + + +class RabbitMQConfig: + """RabbitMQ 配置类""" + + def __init__(self): + # 从环境变量或默认值获取配置 + self.uri = os.getenv('RABBITMQ_URI', 'amqp://guest:guest@localhost:5673/') + self.exchange_name = os.getenv('RABBITMQ_EXCHANGE', 'reliable.exchange') + self.queue_name = os.getenv('RABBITMQ_QUEUE', 'reliable.queue') + self.dead_letter_exchange = os.getenv('RABBITMQ_DL_EXCHANGE', 'dead_letter_exchange') + self.dead_letter_queue = os.getenv('RABBITMQ_DL_QUEUE', 'dead_letter_queue') + + # 消息配置 + self.max_retries = int(os.getenv('RABBITMQ_MAX_RETRIES', '3')) + self.message_ttl = int(os.getenv('RABBITMQ_MESSAGE_TTL', '300000')) # 5分钟 + self.prefetch_count = int(os.getenv('RABBITMQ_PREFETCH_COUNT', '1')) + + # 日志配置 + self.log_level = os.getenv('RABBITMQ_LOG_LEVEL', 'INFO') + + def get_connection_config(self) -> Dict[str, Any]: + """获取连接配置""" + return { + 'uri': self.uri, + 'prefetch_count': self.prefetch_count + } + + def get_exchange_config(self) -> Dict[str, Any]: + """获取交换器配置""" + return { + 'exchange_name': self.exchange_name, + 'dead_letter_exchange': self.dead_letter_exchange + } + + def get_queue_config(self) -> Dict[str, Any]: + """获取队列配置""" + return { + 'queue_name': self.queue_name, + 'dead_letter_queue': self.dead_letter_queue, + 'message_ttl': self.message_ttl, + 'max_retries': self.max_retries + } + + def get_dead_letter_config(self) -> Dict[str, Any]: + """获取死信队列配置""" + return { + 'dead_letter_exchange': self.dead_letter_exchange, + 'dead_letter_queue': self.dead_letter_queue, + 'dead_letter_routing_key': 'dead_letter' + } + + +# 全局配置实例 +config = RabbitMQConfig() diff --git a/reliable_mq/dead_letter_consumer.py b/reliable_mq/dead_letter_consumer.py new file mode 100644 index 0000000..7b32e36 --- /dev/null +++ b/reliable_mq/dead_letter_consumer.py @@ -0,0 +1,128 @@ +""" +RabbitMQ 死信队列处理模块 +""" + +import asyncio +import aio_pika +import json +import logging +from datetime import datetime +from typing import Dict, Any + +from .config import config + +logger = logging.getLogger(__name__) + + +class DeadLetterConsumer: + """死信队列消费者""" + + def __init__(self): + """初始化死信队列消费者""" + self.connection = None + self.channel = None + self.dead_letter_queue = None + + async def connect(self): + """建立连接""" + try: + connection_config = config.get_connection_config() + self.connection = await aio_pika.connect_robust(connection_config['uri']) + self.channel = await self.connection.channel() + await self.channel.set_qos(prefetch_count=connection_config['prefetch_count']) + + # 声明死信队列 + dead_letter_config = config.get_dead_letter_config() + self.dead_letter_queue = await self.channel.declare_queue( + dead_letter_config['dead_letter_queue'], + durable=True, + auto_delete=False + ) + + logger.info("[死信消费者] 已连接") + + except Exception as e: + logger.error(f"[死信消费者] 连接失败: {e}") + raise + + async def process_dead_letter_message(self, message: aio_pika.IncomingMessage): + """处理死信消息""" + try: + # 解析死信消息 + dead_letter_data = json.loads(message.body.decode('utf-8')) + original_message = dead_letter_data.get('original_message', {}) + error_info = dead_letter_data.get('error_info', 'Unknown') + message_id = dead_letter_data.get('message_id', 'Unknown') + + # 打印死信消息信息 + logger.error("=" * 50) + logger.error("[死信消费者] 收到死信消息:") + logger.error(f"[死信消费者] 消息ID: {message_id}") + logger.error(f"[死信消费者] 消息内容: {json.dumps(original_message, ensure_ascii=False, indent=2)}") + logger.error(f"[死信消费者] 错误原因: {error_info}") + logger.error("=" * 50) + + # 保存到数据库 + await self.save_to_database(original_message, error_info, message_id) + + # 确认死信消息 + await message.ack() + logger.info(f"[死信消费者] 死信消息 {message_id} 处理完成") + + except Exception as e: + logger.error(f"[死信消费者] 处理死信消息失败: {e}") + await message.nack(requeue=False) # 拒绝重新入队,避免一致失败出现的死循环 + + async def save_to_database(self, original_message: Dict[str, Any], error_info: str, message_id: str): + """保存死信消息到数据库""" + # 模拟数据库保存操作 + await asyncio.sleep(0.5) + + # 构建数据库记录 + db_record = { + 'id': message_id, + 'original_message': original_message, + 'error_info': error_info, + 'created_at': datetime.now().isoformat(), + 'status': 'failed' + } + + logger.info(f"[死信消费者] 💾 死信消息已保存到数据库: {message_id}") + logger.info(f"[死信消费者] 数据库记录: {json.dumps(db_record, ensure_ascii=False, indent=2)}") + + # 这里可以添加实际的数据库操作 + # 例如:await database.insert('dead_letter_messages', db_record) + + async def start_consuming(self): + """开始消费死信消息""" + self.consumer_tag = await self.dead_letter_queue.consume(self.process_dead_letter_message) + logger.info("[死信消费者] 开始消费死信消息...") + + # 保持消费者运行 + await asyncio.Future() + + async def stop_consuming(self): + """停止消费死信消息""" + if self.dead_letter_queue and self.consumer_tag: + await self.dead_letter_queue.cancel(self.consumer_tag) + logger.info("[死信消费者] 已停止消费死信消息") + + async def close(self): + """关闭连接""" + try: + await self.stop_consuming() + if self.connection: + await self.connection.close() + logger.info("[死信消费者] 连接已关闭") + except Exception as e: + logger.error(f"[死信消费者] 关闭连接时出错: {e}") + + async def __aenter__(self): + """异步上下文管理器入口""" + await self.connect() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """异步上下文管理器出口""" + await self.close() + diff --git a/reliable_mq/reliable_consumer.py b/reliable_mq/reliable_consumer.py new file mode 100644 index 0000000..e2dc064 --- /dev/null +++ b/reliable_mq/reliable_consumer.py @@ -0,0 +1,253 @@ +""" +RabbitMQ 可靠消息消费者模块 +""" + +import asyncio +import aio_pika +import json +import logging +from datetime import datetime +from typing import Dict, Any, Optional, Callable, Set + +from .config import config + +logger = logging.getLogger(__name__) + + +class ReliableConsumer: + """可靠消息消费者""" + + def __init__(self, + queue_name: Optional[str] = None, + consumer_name: Optional[str] = None, + message_handler: Optional[Callable] = None): + """ + 初始化消费者 + + Args: + queue_name: 队列名称,默认使用配置中的值 + consumer_name: 消费者名称 + message_handler: 自定义消息处理函数 + """ + self.queue_name = queue_name or config.queue_name + self.consumer_name = consumer_name or "reliable_consumer" + self.message_handler = message_handler or self.default_message_handler + self.connection = None + self.channel = None + self.queue = None + self.processed_messages: Set[str] = set() # 记录已处理的消息ID,防止重复处理 + + async def connect(self): + """建立连接""" + try: + connection_config = config.get_connection_config() + self.connection = await aio_pika.connect_robust(connection_config['uri']) + self.channel = await self.connection.channel() + + # 设置QoS,确保一次只处理一条消息 + await self.channel.set_qos(prefetch_count=connection_config['prefetch_count']) + + # 声明队列(确保队列存在) + self.queue = await self.channel.declare_queue( + self.queue_name, + durable=True, + auto_delete=False + ) + + logger.info(f"[消费者-{self.consumer_name}] 已连接,监听队列: {self.queue_name}") + + except Exception as e: + logger.error(f"[消费者-{self.consumer_name}] 连接失败: {e}") + raise + + async def process_message(self, message: aio_pika.IncomingMessage): + """处理消息的核心逻辑""" + try: + # 解析消息 + message_data = json.loads(message.body.decode('utf-8')) + message_id = message_data.get('message_id') + + # 检查是否已经处理过此消息(幂等性检查) + if message_id in self.processed_messages: + logger.warning("=" * 50) + logger.warning(f"[消费者-{self.consumer_name}] 🚫 检测到重复消息,跳过处理:") + logger.warning(f"[消费者-{self.consumer_name}] 消息ID: {message_id}") + logger.warning(f"[消费者-{self.consumer_name}] 消息内容: {json.dumps(message_data, ensure_ascii=False, indent=2)}") + logger.warning(f"[消费者-{self.consumer_name}] 已处理消息总数: {len(self.processed_messages)}") + logger.warning("=" * 50) + await message.ack() + return + + logger.info(f"[消费者-{self.consumer_name}] 开始处理消息: {message_id}") + logger.info(f"[消费者-{self.consumer_name}] 消息内容: {message_data}") + + # 直接重试处理消息 + success = await self.retry_process_message(message_data, message_id, 0) + + # 只有在处理成功后才记录已处理的消息ID + if success: + self.processed_messages.add(message_id) + # 确认消息 + await message.ack() + logger.info(f"[消费者-{self.consumer_name}] 消息 {message_id} 处理完成并确认") + logger.info(f"[消费者-{self.consumer_name}] 当前已处理消息数量: {len(self.processed_messages)}") + else: + # 处理失败,不记录消息ID,发送到死信队列 + await self.send_to_dead_letter_queue(message, message_id, "处理失败") + await message.ack() # 确认消息以避免无限重试 + + except Exception as e: + logger.error(f"[消费者-{self.consumer_name}] 处理消息失败: {e}") + # 直接发送到死信队列,包含错误信息 + message_data = json.loads(message.body.decode('utf-8')) + message_id = message_data.get('message_id') + await self.send_to_dead_letter_queue(message, message_id, str(e)) + await message.ack() # 确认消息以避免无限重试 + + async def default_message_handler(self, message_data: Dict[str, Any]): + """默认消息处理函数""" + # 模拟处理时间 + await asyncio.sleep(1) + + # 根据消息类型决定是否失败 + message_type = message_data.get('type', '') + + if message_type == 'will_fail': + # 特定类型的消息总是失败,用于测试死信队列 + raise Exception(f"模拟业务处理失败: {message_data.get('content', '')}") + else: + pass + + logger.info(f"[消费者-{self.consumer_name}] 业务逻辑处理完成: {message_data.get('content', '')}") + + async def retry_process_message(self, message_data: Dict[str, Any], message_id: str, retry_count: int) -> bool: + """直接重试处理消息""" + max_retries = config.max_retries + last_error = None + + for attempt in range(max_retries + 1): + try: + logger.info(f"[消费者-{self.consumer_name}] 尝试处理消息 {message_id},第 {attempt + 1} 次") + await self.message_handler(message_data) + logger.info(f"[消费者-{self.consumer_name}] 消息 {message_id} 处理成功") + return True # 处理成功,返回True + + except Exception as e: + last_error = str(e) + logger.warning(f"[消费者-{self.consumer_name}] 消息 {message_id} 第 {attempt + 1} 次处理失败: {e}") + + if attempt < max_retries: + # 等待一段时间后重试 + await asyncio.sleep(1) + else: + # 所有重试都失败,返回False + logger.error(f"[消费者-{self.consumer_name}] 消息 {message_id} 重试 {max_retries} 次后仍然失败: {last_error}") + return False + + async def send_to_dead_letter_queue(self, message: aio_pika.IncomingMessage, message_id: str, + error_info: str = None): + """发送消息到死信队列""" + try: + # 解析消息内容 + message_data = json.loads(message.body.decode('utf-8')) + + # 构建死信消息,包含原始消息和错误信息 + dead_letter_data = { + 'original_message': message_data, + 'error_info': error_info or '重试失败', + 'dead_letter_timestamp': datetime.now().isoformat(), + 'message_id': message_id, + 'consumer_name': self.consumer_name, + 'queue_name': self.queue_name + } + + logger.error(f"[消费者-{self.consumer_name}] 消息发送到死信队列: {message_id}, 错误: {error_info}") + + # 创建死信交换器和队列 + dead_letter_config = config.get_dead_letter_config() + dead_letter_exchange = await self.channel.declare_exchange( + dead_letter_config['dead_letter_exchange'], + aio_pika.ExchangeType.DIRECT, + durable=True + ) + + dead_letter_queue = await self.channel.declare_queue( + dead_letter_config['dead_letter_queue'], + durable=True, + auto_delete=False + ) + + await dead_letter_queue.bind( + dead_letter_exchange, + routing_key=dead_letter_config['dead_letter_routing_key'] + ) + + # 创建死信消息 + dead_letter_message = aio_pika.Message( + body=json.dumps(dead_letter_data, ensure_ascii=False).encode('utf-8'), + delivery_mode=aio_pika.DeliveryMode.PERSISTENT, + message_id=f"dead_letter_{message_id}" + ) + + # 发送到死信队列 + await dead_letter_exchange.publish( + dead_letter_message, + routing_key=dead_letter_config['dead_letter_routing_key'] + ) + + logger.info(f"[消费者-{self.consumer_name}] 消息 {message_id} 已发送到死信队列") + + except Exception as e: + logger.error(f"[消费者-{self.consumer_name}] 发送到死信队列失败: {e}") + logger.error(f"[消费者-{self.consumer_name}] 原始消息内容: {message.body.decode('utf-8') if message.body else 'None'}") + + async def start_consuming(self): + """开始消费消息""" + self.consumer_tag = await self.queue.consume(self.process_message) + logger.info(f"[消费者-{self.consumer_name}] 开始消费消息...") + + # 保持消费者运行 + await asyncio.Future() + + async def stop_consuming(self): + """停止消费消息""" + if self.queue and self.consumer_tag: + await self.queue.cancel(self.consumer_tag) + logger.info(f"[消费者-{self.consumer_name}] 已停止消费消息") + + async def close(self): + """关闭连接""" + try: + await self.stop_consuming() + if self.connection: + await self.connection.close() + logger.info(f"[消费者-{self.consumer_name}] 连接已关闭") + # 打印最终统计信息 + self.print_processed_messages_stats() + except Exception as e: + logger.error(f"[消费者-{self.consumer_name}] 关闭连接时出错: {e}") + + def get_processed_messages_stats(self): + """获取已处理消息的统计信息""" + return { + 'total_processed': len(self.processed_messages), + 'processed_message_ids': list(self.processed_messages) + } + + def print_processed_messages_stats(self): + """打印已处理消息的统计信息""" + stats = self.get_processed_messages_stats() + logger.info("=" * 50) + logger.info(f"[消费者-{self.consumer_name}] 已处理消息统计信息:") + logger.info(f"[消费者-{self.consumer_name}] 总处理数量: {stats['total_processed']}") + logger.info(f"[消费者-{self.consumer_name}] 已处理消息ID列表: {stats['processed_message_ids']}") + logger.info("=" * 50) + + async def __aenter__(self): + """异步上下文管理器入口""" + await self.connect() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """异步上下文管理器出口""" + await self.close() diff --git a/reliable_mq/reliable_producer.py b/reliable_mq/reliable_producer.py new file mode 100644 index 0000000..644b5c3 --- /dev/null +++ b/reliable_mq/reliable_producer.py @@ -0,0 +1,152 @@ +""" +RabbitMQ 可靠消息生产者模块 +""" + +import asyncio +import aio_pika +import json +import logging +from datetime import datetime +from typing import Dict, Any, Optional + +from .config import config + +logger = logging.getLogger(__name__) + + +class ReliableProducer: + """可靠消息生产者""" + + def __init__(self, + exchange_name: Optional[str] = None, + queue_name: Optional[str] = None): + """ + 初始化生产者 + + Args: + exchange_name: 交换器名称,默认使用配置中的值 + queue_name: 队列名称,默认使用配置中的值 + """ + self.exchange_name = exchange_name or config.exchange_name + self.queue_name = queue_name or config.queue_name + self.connection = None + self.channel = None + self.exchange = None + self.queue = None + + async def connect(self): + """建立连接并设置确认机制""" + try: + # 使用 robust 连接,支持自动重连 + connection_config = config.get_connection_config() + self.connection = await aio_pika.connect_robust(connection_config['uri']) + self.channel = await self.connection.channel() + + # 开启发布确认机制 - 确保消息成功发送到队列 + await self.channel.set_qos(prefetch_count=connection_config['prefetch_count']) + + # 声明持久化交换器 + self.exchange = await self.channel.declare_exchange( + self.exchange_name, + aio_pika.ExchangeType.DIRECT, + durable=True # 交换器持久化 + ) + + # 声明持久化队列 + self.queue = await self.channel.declare_queue( + self.queue_name, + durable=True, # 队列持久化 + auto_delete=False, # 队列不自动删除 + ) + + # 绑定队列到交换器 + await self.queue.bind(self.exchange, routing_key="reliable") + + logger.info(f"[生产者] 已连接,队列: {self.queue_name}") + + except Exception as e: + logger.error(f"[生产者] 连接失败: {e}") + raise + + def _generate_message_id(self, message_data: Dict[str, Any]) -> str: + """ + 为消息生成消息ID + 对于 duplicate_test 类型的消息,生成固定的ID用于测试幂等性 + + Args: + message_data: 消息数据字典 + + Returns: + str: 消息ID + """ + message_type = message_data.get('type', '') + content = message_data.get('content', '') + + # 对于 duplicate_test 类型的消息,基于内容生成固定ID + if message_type == 'duplicate_test': + # 使用内容生成固定的消息ID + import hashlib + content_hash = hashlib.md5(content.encode('utf-8')).hexdigest() + return f"duplicate_{content_hash[:8]}" + else: + # 其他消息使用时间戳生成唯一ID + return f"msg_{asyncio.get_running_loop().time()}" + + async def publish_reliable_message(self, message_data: Dict[str, Any]) -> bool: + """ + 发送可靠消息 + + Args: + message_data: 消息数据字典 + + Returns: + bool: 发送是否成功 + """ + try: + # 生成消息ID + message_id = self._generate_message_id(message_data) + + # 添加消息元数据 + message_data.update({ + 'timestamp': datetime.now().isoformat(), + 'message_id': message_id + }) + + # 创建持久化消息 + message = aio_pika.Message( + body=json.dumps(message_data, ensure_ascii=False).encode('utf-8'), + delivery_mode=aio_pika.DeliveryMode.PERSISTENT, # 消息持久化 + message_id=message_id, + timestamp=datetime.now() + ) + + # 发送消息并等待确认 + await self.exchange.publish( + message, + routing_key="reliable" + ) + + logger.info(f"[生产者] 消息已发送: {message_id} (类型: {message_data.get('type', 'N/A')}, 内容: {message_data.get('content', 'N/A')})") + return True + + except Exception as e: + logger.error(f"[生产者] 发送消息失败: {e}") + return False + + async def close(self): + """关闭连接""" + try: + if self.connection: + await self.connection.close() + logger.info("[生产者] 连接已关闭") + except Exception as e: + logger.error(f"[生产者] 关闭连接时出错: {e}") + + async def __aenter__(self): + """异步上下文管理器入口""" + await self.connect() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """异步上下文管理器出口""" + await self.close() diff --git a/run_reliable_messaging.py b/run_reliable_messaging.py new file mode 100644 index 0000000..2087a86 --- /dev/null +++ b/run_reliable_messaging.py @@ -0,0 +1,60 @@ +""" +RabbitMQ 可靠消息传递测试模块 +""" + +import asyncio +import logging + +from reliable_mq import ReliableProducer, ReliableConsumer +from reliable_mq.dead_letter_consumer import DeadLetterConsumer +from reliable_mq.config import config + +# 配置日志 +logging.basicConfig( + level=getattr(logging, config.log_level), + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +async def run_context_manager_messaging(): + """使用上下文管理器测试可靠消息传递""" + logger.info("=== 使用上下文管理器测试可靠消息传递 ===") + + # 使用异步上下文管理器 + async with ReliableProducer() as producer: + async with ReliableConsumer(consumer_name="context_test_consumer") as consumer: + async with DeadLetterConsumer() as dead_letter_consumer: + # 启动消费者(在后台运行) + consumer_task = asyncio.create_task(consumer.start_consuming()) + dead_letter_task = asyncio.create_task(dead_letter_consumer.start_consuming()) + + # 等待消费者启动 + await asyncio.sleep(1) + + # 发送测试消息 + test_messages = [ + {"content": "重要业务消息1", "type": "business"}, + {"content": "系统通知消息2", "type": "notification"}, + {"content": "用户操作消息3", "type": "user_action"}, + {"content": "重复消息测试", "type": "duplicate_test"}, + {"content": "重复消息测试", "type": "duplicate_test"}, # 重复消息 + {"content": "会失败的消息1", "type": "will_fail"}, # 这些消息会失败并进入死信队列 + {"content": "会失败的消息2", "type": "will_fail"}, + {"content": "会失败的消息3", "type": "will_fail"}, + ] + + for msg in test_messages: + await producer.publish_reliable_message(msg) + await asyncio.sleep(0.5) + + # 等待消息处理完成 + await asyncio.sleep(30) + + # 取消任务 + consumer_task.cancel() + dead_letter_task.cancel() + + +if __name__ == '__main__': + asyncio.run(run_context_manager_messaging()) diff --git a/test.py b/test.py new file mode 100644 index 0000000..1351608 --- /dev/null +++ b/test.py @@ -0,0 +1,59 @@ +import asyncio + +from product.direct_multi_publish import setup_multi_queue_balance, BalancedProducer +from product.direct_publish import setup_direct_exchange, direct_publish +from product.fanout_publish import fanout_publish, setup_fanout_exchange +from product.topic_publish import setup_topic_exchange, topic_publish + + +async def test_fanout_publish(): + await setup_fanout_exchange("demo.fanout", "demo.fanout.queue-") + await fanout_publish(message="hello world", exchange_name="demo.fanout") + + +async def test_direct_exchange(): + """测试Direct交换器的消息发送""" + # 1. 初始化Direct交换器和队列绑定 + await setup_direct_exchange() + + # 2. 发送不同路由键的测试消息 + test_messages = [ + ("系统崩溃,无法启动", "error"), # 路由到error队列 + ("磁盘空间不足", "warning"), # 路由到warning队列 + ("用户登录成功", "info"), # 路由到info队列 + ("调试信息:数据库连接成功", "debug") # 路由到info队列(因为info队列绑定了debug键) + ] + + for msg, routing_key in test_messages: + await direct_publish(msg, routing_key) + + +async def test_topic_exchange(): + """测试Topic交换器的通配符路由功能""" + # 1. 初始化Topic交换器和队列绑定 + await setup_topic_exchange() + + # 2. 发送不同路由键的测试消息(体现层级化路由) + test_messages = [ + ("订单创建失败(严重错误)", "order.create.critical"), + ("用户登录成功", "user.login.success"), + ("订单支付完成", "order.pay.success"), + ("系统崩溃(严重错误)", "system.crash.critical"), + ("用户登录失败", "user.login.failed"), + ("普通系统日志", "system.log.info") # 不匹配任何绑定键,会被丢弃 + ] + + for msg, routing_key in test_messages: + await topic_publish(msg, routing_key) + + +async def test_multi_queue_balance(): + queue_count = 3 + await setup_multi_queue_balance(queue_count=queue_count) + + producer = BalancedProducer(queue_count=queue_count) + await producer.connect() + for i in range(10): + await producer.publish(f"任务{i + 1}:多队列负载均衡测试") + await asyncio.sleep(0.3) + await producer.close()