diff --git a/README.md b/README.md index dd37e92..4b9e0ec 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,3 @@ -# RabbitMQ 培训文档 - ---- - ## 1. MQ 的基本概念 ### 1.1 什么是MQ? @@ -39,20 +35,13 @@ MQ全称为Message Queue即消息队列 **5. 重试与容错** - 若消费者服务临时故障,消息队列会保留消息,待服务恢复后重新投递 - 配合重试机制,可解决网络波动、服务暂时不可用等问题 - -#### 典型应用场景: -- 电商下单:订单创建 → 消息队列 → 库存扣减、支付处理、物流通知等 -- 日志收集:各服务日志发送到队列,由日志系统统一消费、存储、分析 -- 分布式事务:通过消息队列实现最终一致性 -- 延迟任务:如订单超时未支付自动取消 - --- ## 2. RabbitMQ ### 2.1 介绍 -RabbitMQ 是一个开源的消息代理软件(也可称为消息队列中间件),由 Erlang 语言编写,基于 AMQP(高级消息队列协议) 实现,在分布式系统中用于实现应用程序之间的异步通信和解耦。 +RabbitMQ 是一个开源的消息代理软件(也可称为消息队列中间件),由 Erlang 语言编写,在分布式系统中用于实现应用程序之间的异步通信和解耦。 #### 特点: - **多种协议支持**:除了 AMQP,RabbitMQ 还支持 STOMP、MQTT 等多种消息协议 @@ -62,7 +51,7 @@ RabbitMQ 是一个开源的消息代理软件(也可称为消息队列中间 - **管理界面友好**:具备一个可视化的管理界面 ### 2.2 核心组件 - +![img_6.png](assert/img_6.png) ``` 生产者 → 信道 → 交换器(Exchange) → 队列(Queue) → 信道 → 消费者 ``` @@ -140,6 +129,7 @@ docker run -d --name rabbitmq -p 5673:5672 -p 15673:15672 rabbitmq ## 3. Python集成 基于python aio-pika库进行集成 +仓库地址:https://gitea.freeleaps.mathmast.com/icecheng/rabbitmq-test ### 3.1 Fanout Exchange Demo @@ -979,13 +969,12 @@ Message sent: Normal system log (routing key: system.log.info) **3. 消息幂等性** - 使用消息ID去重 -- 内存中记录已处理的消息ID +- 数据库记录已处理的消息ID - 防止重复处理相同消息 **4. 重试机制** - 可配置最大重试次数 - 消费者内部重试,避免消息重新入队 -- 指数退避重试策略 **5. 死信队列** - 处理失败消息的完整解决方案 diff --git a/README_en.md b/README_en.md new file mode 100644 index 0000000..5d2023c --- /dev/null +++ b/README_en.md @@ -0,0 +1,1362 @@ +## 1. Basic Concepts of MQ + +### 1.1 What is MQ? + +MQ stands for Message Queue + +- "Message Queue" is a container that stores messages during the transmission process +- It follows the typical producer-consumer model: producers continuously produce messages to the message queue, consumers continuously retrieve messages from the queue +- Benefits: Producers only need to focus on sending messages, consumers only need to focus on receiving messages, with no business logic intrusion between them, achieving decoupling between producers and consumers + +### 1.2 Why Use MQ? + +Message Queue is a middleware used for asynchronous communication in distributed systems. Its core function is to achieve asynchronous message delivery through a **store-and-forward** mechanism, solving problems such as system coupling, traffic peak shaving, and asynchronous processing. + +#### Main Functions: + +**1. Decouple System Components** +- In traditional systems, components usually call each other directly (e.g., Service A directly calls Service B), resulting in tight coupling +- After introducing message queues, Service A only needs to send messages to the queue without caring who receives or processes them; Service B can get messages from the queue +- Both parties communicate through message format agreements, independent of each other, reducing system coupling + +**2. Asynchronous Processing, Improve Efficiency** +- In synchronous processing scenarios, an operation may need to wait for multiple services to complete sequentially, with total time being the sum of all steps +- Message queues support asynchronous processing: after the main process is completed, only need to send messages to the queue, can return results without waiting for subsequent steps to complete +- Other services asynchronously get messages from the queue and process them, significantly improving system response speed and throughput + +**3. Traffic Peak Shaving, Protect System** +- Sudden traffic (such as e-commerce flash sales, live streaming) may instantly overwhelm backend services +- Message queues can serve as a "buffer pool": peak requests first enter the queue, backend services consume messages from the queue according to their own processing capacity + +**4. Data Synchronization and Distribution** +- The same message can be consumed by multiple consumers, achieving "send once, process multiple" +- Cross-system data synchronization, ensuring data consistency through message queues + +**5. Retry and Fault Tolerance** +- If consumer services temporarily fail, message queues will retain messages and redeliver them after service recovery +- Combined with retry mechanisms, can solve problems such as network fluctuations and temporary service unavailability +--- + +## 2. RabbitMQ + +### 2.1 Introduction + +RabbitMQ is an open-source message broker software (also called message queue middleware), written in Erlang, used in distributed systems to implement asynchronous communication and decoupling between applications. + +#### Features: +- **Multiple Protocol Support**: Besides AMQP, RabbitMQ also supports STOMP, MQTT and other message protocols +- **High Reliability**: Ensures no message loss through message persistence, clustering, mirrored queues and other mechanisms +- **Flexible Routing Mechanism**: Exchanges provide rich message routing rules +- **Multi-language Client Support**: Provides client libraries for multiple programming languages +- **Friendly Management Interface**: Has a visual management interface + +### 2.2 Core Components +![img_6.png](assert/img_6.png) +``` +Producer → Channel → Exchange → Queue → Channel → Consumer +``` + +#### Core Component Details: + +**1. Producer** +- Definition: Application or service that sends messages +- Function: Encapsulate business data into messages and send to RabbitMQ server +- Features: No need to care about the final receiver, only need to specify which exchange to send messages to + +**2. Consumer** +- Definition: Application or service that receives and processes messages +- Function: Continuously monitor queues, when messages arrive, get messages from queue and perform business processing +- Features: Consumers are bound to queues, can inform RabbitMQ whether message processing is complete through automatic or manual acknowledgment mechanisms + +**3. Queue** +- Definition: Container that stores messages, the final destination of messages +- Core Properties: + - Name: Unique identifier of the queue + - Durable: If true, queue will be retained after RabbitMQ restart + - Exclusive: If true, queue is only visible to current connection + - Auto-delete: If true, queue is automatically deleted when the last consumer disconnects + +**4. Exchange** +- Definition: Receives messages sent by producers and forwards them to one or more queues according to routing rules +- Function: Similar to a "router", responsible for message routing logic +- Types: + - **Direct Exchange**: Message's Routing Key exactly matches the queue's Binding Key + - **Topic Exchange**: Supports wildcards (* matches single word, # matches multiple words) + - **Fanout Exchange**: Ignores Routing Key, broadcasts messages to all bound queues + - **Headers Exchange**: Matches based on message properties (Headers) rather than Routing Key + +**5. Binding** +- Definition: Association relationship between exchange and queue, containing routing rules +- Function: Tells exchange "which queues need to receive what kind of messages" + +**6. Connection** +- Definition: TCP connection between producer/consumer and RabbitMQ server +- Features: TCP connection establishment has high overhead, so connections are usually reused + +**7. Channel** +- Definition: Virtual connection built on top of TCP connection, the actual channel for message delivery +- Function: Reduces number of TCP connections, lowers server resource consumption + +### 2.3 Install RabbitMQ + +#### With Management Page +```bash +docker run -d --name rabbitmq -p 5673:5672 -p 15673:15672 rabbitmq:management +``` + +#### Without Management Page +```bash +docker run -d --name rabbitmq -p 5673:5672 -p 15673:15672 rabbitmq +``` + +**Access Management Page**: http://localhost:15673 +- Username: guest +- Password: guest + +### 2.4 RabbitMQ Management Page + +The management page provides rich monitoring and management functions: + +- **Overview**: Server overview, including connection count, queue count, message rate, etc. +- **Connections**: Display all client connection information +- **Channels**: Display channel information for each connection +- **Exchanges**: Manage exchanges +- **Queues**: Manage queues +- **Admin**: User and permission management + +--- + +## 3. Python Integration + +- Based on python aio-pika library for integration +- repo:https://gitea.freeleaps.mathmast.com/icecheng/rabbitmq-test +### 3.1 Fanout Exchange Demo + +#### Core Features: +- Ignores routing key (Routing Key), whatever value is set will be ignored +- Messages will be broadcast to all queues bound to it +- Suitable for scenarios requiring "one-to-many" notifications (such as system notifications, log broadcasting) + +#### Architecture Diagram: +![img_1.png](assert/img_1.png) +```mermaid +graph TD + P[Producer
fanout_publish] --> E[Fanout Exchange
demo.fanout] + E --> Q1[Queue
demo.fanout.queue-0] + E --> Q2[Queue
demo.fanout.queue-1] + E --> Q3[Queue
demo.fanout.queue-2] + Q1 --> C1[Consumer 1
fanout_consumer_1] + Q2 --> C2[Consumer 2
fanout_consumer_2] + Q3 --> C3[Consumer 3
fanout_consumer_3] + + style P fill:#e1f5fe + style E fill:#f3e5f5 + style Q1 fill:#e8f5e8 + style Q2 fill:#e8f5e8 + style Q3 fill:#e8f5e8 + style C1 fill:#fff3e0 + style C2 fill:#fff3e0 + style C3 fill:#fff3e0 +``` + +#### Producer Code: +```python +import asyncio +import aio_pika +from config import RABBITMQ_URI + +async def setup_fanout_exchange(exchange_name="demo.fanout", queue_name_prefix="demo.fanout.queue-"): + # Establish connection + connection = await aio_pika.connect_robust(RABBITMQ_URI) + channel = await connection.channel() + + # 1. Declare Fanout type exchange + fanout_exchange = await channel.declare_exchange( + exchange_name, + aio_pika.ExchangeType.FANOUT, + durable=True # Exchange persistence + ) + + # 2. Define list of queue names to bind + queue_names = [queue_name_prefix + str(i) for i in range(3)] + + # 3. Loop to create queues and bind to exchange + for name in queue_names: + queue = await channel.declare_queue( + name, + durable=True, + auto_delete=False + ) + # Bind queue to Fanout exchange (ignore routing key) + await queue.bind(fanout_exchange, routing_key="") + +async def fanout_publish(message: str = "", exchange_name: str = "demo.fanout"): + connection = await aio_pika.connect_robust(RABBITMQ_URI) + channel = await connection.channel() + + fanout_exchange = await channel.declare_exchange( + exchange_name, + aio_pika.ExchangeType.FANOUT, + durable=True + ) + + message = aio_pika.Message( + body=message.encode("utf-8"), + delivery_mode=aio_pika.DeliveryMode.PERSISTENT # Message persistence + ) + + # Send message to Fanout exchange + await fanout_exchange.publish(message, routing_key="") + await connection.close() +``` + +#### Consumer Code: +```python +import asyncio +import aio_pika +from config import RABBITMQ_URI + +async def fanout_consumer(queue_name: str, consumer_id: int): + connection = await aio_pika.connect_robust(RABBITMQ_URI) + channel = await connection.channel() + await channel.set_qos(prefetch_count=1) + + queue = await channel.declare_queue( + queue_name, + durable=True, + auto_delete=False + ) + + async def on_message_received(message: aio_pika.IncomingMessage): + async with message.process(): + message_content = message.body.decode("utf-8") + print(f"[Fanout Consumer {consumer_id}] Received broadcast message:") + print(f" Listening queue: {queue_name}") + print(f" Message content: {message_content}") + print(f" Message persistence: {'Yes' if message.delivery_mode == 2 else 'No'}") + await asyncio.sleep(1) + + consumer_tag = f"fanout_consumer_{consumer_id}_{queue_name}" + await queue.consume(on_message_received, consumer_tag=consumer_tag) + print(f"[Fanout Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})") + await asyncio.Future() +``` + +#### Complete Test Code + +**Test Run Script:** +```python +#!/usr/bin/env python3 +""" +Run Fanout Exchange Test +""" + +import asyncio +import sys +import os + +# Add current directory to Python path +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from product.fanout_publish import fanout_publish, setup_fanout_exchange +from comsumer.fanout_consumer import start_all_fanout_consumers + + +async def run_fanout_test(): + """Run fanout exchange test with producer and consumer""" + print("=== Running Fanout Exchange Test ===") + + # Start consumer in background + consumer_task = asyncio.create_task(start_all_fanout_consumers()) + + # Wait for consumer to start + await asyncio.sleep(1) + + # Setup and publish messages + await setup_fanout_exchange("demo.fanout", "demo.fanout.queue-") + await fanout_publish(message="hello world", exchange_name="demo.fanout") + await fanout_publish(message="test message 2", exchange_name="demo.fanout") + await fanout_publish(message="test message 3", exchange_name="demo.fanout") + + # Wait for messages to be processed + await asyncio.sleep(3) + + # Cancel consumer + consumer_task.cancel() + print("✅ Fanout test completed successfully!") + + +if __name__ == "__main__": + asyncio.run(run_fanout_test()) +``` + +#### Test Output: +``` +=== Running Fanout Exchange Test === +[Fanout Consumer 2] Started, listening to queue: demo.fanout.queue-1 (tag: fanout_consumer_2_demo.fanout.queue-1) +[Fanout Consumer 1] Started, listening to queue: demo.fanout.queue-0 (tag: fanout_consumer_1_demo.fanout.queue-0) +[Fanout Consumer 3] Started, listening to queue: demo.fanout.queue-2 (tag: fanout_consumer_3_demo.fanout.queue-2) + +[Fanout Consumer 2] Received broadcast message: + Listening queue: demo.fanout.queue-1 + Message content: hello world + Message persistence: Yes +[Fanout Consumer 1] Received broadcast message: + Listening queue: demo.fanout.queue-0 + Message content: hello world + Message persistence: Yes +[Fanout Consumer 3] Received broadcast message: + Listening queue: demo.fanout.queue-2 + Message content: hello world + Message persistence: Yes +[Fanout Consumer 3] Received broadcast message: + Listening queue: demo.fanout.queue-2 + Message content: test message 2 + Message persistence: Yes +[Fanout Consumer 1] Received broadcast message: + Listening queue: demo.fanout.queue-0 + Message content: test message 2 + Message persistence: Yes +[Fanout Consumer 2] Received broadcast message: + Listening queue: demo.fanout.queue-1 + Message content: test message 2 + Message persistence: Yes +[Fanout Consumer 1] Received broadcast message: + Listening queue: demo.fanout.queue-0 + Message content: test message 3 + Message persistence: Yes +[Fanout Consumer 2] Received broadcast message: + Listening queue: demo.fanout.queue-1 + Message content: test message 3 + Message persistence: Yes +[Fanout Consumer 3] Received broadcast message: + Listening queue: demo.fanout.queue-2 + Message content: test message 3 + Message persistence: Yes +✅ Fanout test completed successfully! +``` + +### 3.2 Direct Exchange Demo + +#### Core Features: +- Based on exact matching between routing key and binding key +- Messages are only routed to a queue when the message's routing key exactly matches the queue's binding key +- Suitable for scenarios requiring precise routing (such as log level distinction: error, warning, info routed to different queues) + +#### Architecture Diagram: + +```mermaid +graph TD + P[Producer
direct_publish] --> E[Direct Exchange
demo.direct] + E -->|routing_key: error| Q1[Queue
demo.direct.queue-error] + E -->|routing_key: warning| Q2[Queue
demo.direct.queue-warning] + E -->|routing_key: info| Q3[Queue
demo.direct.queue-info] + E -->|routing_key: debug| Q3 + Q1 --> C1[Error Level Consumer
direct_error_level] + Q2 --> C2[Warning Level Consumer
direct_warning_level] + Q3 --> C3[Info/Debug Level Consumer
direct_info/debug_level] + + style P fill:#e1f5fe + style E fill:#f3e5f5 + style Q1 fill:#ffebee + style Q2 fill:#fff3e0 + style Q3 fill:#e8f5e8 + style C1 fill:#ffebee + style C2 fill:#fff3e0 + style C3 fill:#e8f5e8 +``` + +#### Producer Code: +```python +async def setup_direct_exchange(exchange_name="demo.direct", queue_prefix="demo.direct.queue-"): + connection = await aio_pika.connect_robust(RABBITMQ_URI) + channel = await connection.channel() + + direct_exchange = await channel.declare_exchange( + exchange_name, + aio_pika.ExchangeType.DIRECT, + durable=True + ) + + # Define queues and corresponding binding keys + queue_bindings = [ + (f"{queue_prefix}error", ["error"]), # Handle error level messages + (f"{queue_prefix}warning", ["warning"]), # Handle warning level messages + (f"{queue_prefix}info", ["info", "debug"]) # Handle info and debug level messages + ] + + for queue_name, binding_keys in queue_bindings: + queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False) + for binding_key in binding_keys: + await queue.bind(direct_exchange, routing_key=binding_key) + print(f"Queue {queue_name} bound to routing keys: {binding_keys}") + +async def direct_publish(message: str, routing_key: str, exchange_name: str = "demo.direct"): + connection = await aio_pika.connect_robust(RABBITMQ_URI) + channel = await connection.channel() + + exchange = await channel.declare_exchange( + exchange_name, + aio_pika.ExchangeType.DIRECT, + durable=True + ) + + message_obj = aio_pika.Message( + body=message.encode("utf-8"), + delivery_mode=aio_pika.DeliveryMode.PERSISTENT + ) + + await exchange.publish(message_obj, routing_key=routing_key) + print(f"Message sent: {message} (routing key: {routing_key})") +``` + +#### Consumer Code: +```python +async def direct_consumer(queue_name: str, consumer_label: str): + connection = await aio_pika.connect_robust(RABBITMQ_URI) + channel = await connection.channel() + await channel.set_qos(prefetch_count=1) + + queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False) + + async def on_message_received(message: aio_pika.IncomingMessage): + async with message.process(): + message_content = message.body.decode("utf-8") + print(f"[{consumer_label} Consumer] Received message:") + print(f" Queue name: {queue_name}") + print(f" Message content: {message_content}") + print(f" Message routing key: {message.routing_key}") + print(f" Processing time: {asyncio.get_running_loop().time():.2f}s") + + # Simulate different processing times for different level messages + if "error" in queue_name: + await asyncio.sleep(2) + elif "warning" in queue_name: + await asyncio.sleep(1) + elif "info" in queue_name: + await asyncio.sleep(0.5) + + consumer_tag = f"direct_{consumer_label.lower().replace(' ', '_')}_{queue_name}" + await queue.consume(on_message_received, consumer_tag=consumer_tag) + print(f"[{consumer_label} Consumer] Started, listening to queue: {queue_name} (tag: {consumer_tag})") + await asyncio.Future() +``` + +#### Complete Test Code + +**Test Run Script:** +```python +#!/usr/bin/env python3 +""" +Run Direct Exchange Test +""" + +import asyncio +import sys +import os + +# Add current directory to Python path +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from product.direct_publish import setup_direct_exchange, direct_publish +from comsumer.direct_consumer import start_all_direct_consumers + + +async def run_direct_exchange_test(): + """Run direct exchange test with producer and consumer""" + print("=== Running Direct Exchange Test ===") + + # Start consumer in background + consumer_task = asyncio.create_task(start_all_direct_consumers()) + + # Wait for consumer to start + await asyncio.sleep(1) + + # Setup exchange and publish messages + await setup_direct_exchange() + + test_messages = [ + ("System crash, unable to start", "error"), # Route to error queue + ("Disk space insufficient", "warning"), # Route to warning queue + ("User login successful", "info"), # Route to info queue + ("Debug info: Database connection successful", "debug") # Route to info queue + ] + + for msg, routing_key in test_messages: + await direct_publish(msg, routing_key) + await asyncio.sleep(0.5) + + # Wait for messages to be processed + await asyncio.sleep(3) + + # Cancel consumer + consumer_task.cancel() + print("✅ Direct exchange test completed successfully!") + + +if __name__ == "__main__": + asyncio.run(run_direct_exchange_test()) +``` + +#### Test Output: +``` +=== Running Direct Exchange Test === +[Info/Debug Level Consumer] Started, listening to queue: demo.direct.queue-info (tag: direct_info/debug_level_demo.direct.queue-info) +[Warning Level Consumer] Started, listening to queue: demo.direct.queue-warning (tag: direct_warning_level_demo.direct.queue-warning) +[Error Level Consumer] Started, listening to queue: demo.direct.queue-error (tag: direct_error_level_demo.direct.queue-error) + +Queue demo.direct.queue-error bound to routing keys: ['error'] +Queue demo.direct.queue-warning bound to routing keys: ['warning'] +Queue demo.direct.queue-info bound to routing keys: ['info', 'debug'] + +[Error Level Consumer] Received message: + Queue name: demo.direct.queue-error + Message content: System crash, unable to start + Message routing key: error + Processing time: 322774.03s + +Message sent: System crash, unable to start (routing key: error) +[Warning Level Consumer] Received message: + Queue name: demo.direct.queue-warning + Message content: Disk space insufficient + Message routing key: warning + Processing time: 322774.54s + +Message sent: Disk space insufficient (routing key: warning) +[Info/Debug Level Consumer] Received message: + Queue name: demo.direct.queue-info + Message content: User login successful + Message routing key: info + Processing time: 322775.06s + +Message sent: User login successful (routing key: info) +[Info/Debug Level Consumer] Received message: + Queue name: demo.direct.queue-info + Message content: Debug info: Database connection successful + Message routing key: debug + Processing time: 322775.57s + +Message sent: Debug info: Database connection successful (routing key: debug) +✅ Direct exchange test completed successfully! +``` + +### 3.3 Direct Exchange Demo (Load Balancing) + +#### Implementation Principle: +1. Create multiple queues: Each queue is bound to the same Direct Exchange but uses different routing keys +2. Producer routing strategy: Select a routing key through round-robin, random, or hash based on message characteristics +3. Consumer processing: Each queue corresponds to one or more consumers, each processing assigned messages + +#### Architecture Diagram: +```mermaid +graph TD + P[BalancedProducer
Round-robin sending] --> E[Direct Exchange
demo.direct.multi.queue] + E -->|routing_key: route.1| Q1[Queue
task.queue.1] + E -->|routing_key: route.2| Q2[Queue
task.queue.2] + E -->|routing_key: route.3| Q3[Queue
task.queue.3] + Q1 --> C1[Consumer 1
multi_consumer_1] + Q2 --> C2[Consumer 2
multi_consumer_2] + Q3 --> C3[Consumer 3
multi_consumer_3] + + P -.->|Round-robin algorithm| P + + style P fill:#e1f5fe + style E fill:#f3e5f5 + style Q1 fill:#e8f5e8 + style Q2 fill:#e8f5e8 + style Q3 fill:#e8f5e8 + style C1 fill:#fff3e0 + style C2 fill:#fff3e0 + style C3 fill:#fff3e0 +``` + +#### Producer Code: +```python +class BalancedProducer: + def __init__(self, exchange_name="demo.direct.multi.queue", queue_count=3): + self.exchange_name = exchange_name + self.queue_count = queue_count + self.current_index = 0 # Round-robin index + + async def connect(self): + self.connection = await aio_pika.connect_robust(RABBITMQ_URI) + self.channel = await self.connection.channel() + self.exchange = await self.channel.declare_exchange( + self.exchange_name, + aio_pika.ExchangeType.DIRECT, + durable=True + ) + + async def publish(self, message: str): + # Round-robin algorithm: switch to next routing key after each send + self.current_index = (self.current_index + 1) % self.queue_count + route_key = f"route.{self.current_index + 1}" + + message_obj = aio_pika.Message( + body=message.encode("utf-8"), + delivery_mode=aio_pika.DeliveryMode.PERSISTENT + ) + + await self.exchange.publish(message_obj, routing_key=route_key) + print(f"Message sent: {message} (routed to {route_key})") +``` + +#### Consumer Code: +```python +async def queue_consumer(queue_name: str, consumer_id: int): + connection = await aio_pika.connect_robust(RABBITMQ_URI) + channel = await connection.channel() + await channel.set_qos(prefetch_count=1) + + queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False) + + async def on_message(message: aio_pika.IncomingMessage): + async with message.process(): + content = message.body.decode("utf-8") + print(f"[Consumer {consumer_id}] Processing message: {content}") + print(f"[Consumer {consumer_id}] From queue: {queue_name}") + print(f"[Consumer {consumer_id}] Routing key: {message.routing_key}") + await asyncio.sleep(1) + + consumer_tag = f"multi_consumer_{consumer_id}_{queue_name}" + await queue.consume(on_message, consumer_tag=consumer_tag) + print(f"[Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})") + await asyncio.Future() +``` + +#### Complete Test Code + +**Test Run Script:** +```python +#!/usr/bin/env python3 +""" +Run Multi-Queue Load Balancing Test +""" + +import asyncio +import sys +import os + +# Add current directory to Python path +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from product.direct_multi_publish import setup_multi_queue_balance, BalancedProducer +from comsumer.direct_multi_consumer import start_balanced_consumers + + +async def run_multi_queue_balance_test(): + """Run multi-queue load balancing test with producer and consumer""" + print("=== Running Multi-Queue Load Balancing Test ===") + + queue_count = 3 + + # Start consumer in background + consumer_task = asyncio.create_task(start_balanced_consumers(queue_count=queue_count)) + + # Wait for consumer to start + await asyncio.sleep(1) + + # Setup and publish messages + await setup_multi_queue_balance(queue_count=queue_count) + + producer = BalancedProducer(queue_count=queue_count) + await producer.connect() + + for i in range(10): + await producer.publish(f"Task {i + 1}: Multi-queue load balancing test") + await asyncio.sleep(0.3) + + await producer.close() + + # Wait for messages to be processed + await asyncio.sleep(3) + + # Cancel consumer + consumer_task.cancel() + print("✅ Multi-queue load balancing test completed successfully!") + + +if __name__ == "__main__": + asyncio.run(run_multi_queue_balance_test()) +``` + +#### Test Output: +``` +=== Running Multi-Queue Load Balancing Test === +[Consumer 2] Started, listening to queue: task.queue.2 (tag: multi_consumer_2_task.queue.2) +[Consumer 3] Started, listening to queue: task.queue.3 (tag: multi_consumer_3_task.queue.3) +[Consumer 1] Started, listening to queue: task.queue.1 (tag: multi_consumer_1_task.queue.1) + +Queue task.queue.1 bound to routing key: route.1 +Queue task.queue.2 bound to routing key: route.2 +Queue task.queue.3 bound to routing key: route.3 + +[Consumer 2] Processing message: Task 1: Multi-queue load balancing test +[Consumer 2] From queue: task.queue.2 +[Consumer 2] Routing key: route.2 + +Message sent: Task 1: Multi-queue load balancing test (routed to route.2) +[Consumer 3] Processing message: Task 2: Multi-queue load balancing test +[Consumer 3] From queue: task.queue.3 +[Consumer 3] Routing key: route.3 + +Message sent: Task 2: Multi-queue load balancing test (routed to route.3) +[Consumer 1] Processing message: Task 3: Multi-queue load balancing test +[Consumer 1] From queue: task.queue.1 +[Consumer 1] Routing key: route.1 + +Message sent: Task 3: Multi-queue load balancing test (routed to route.1) +Message sent: Task 4: Multi-queue load balancing test (routed to route.2) +[Consumer 2] Processing message: Task 4: Multi-queue load balancing test +[Consumer 2] From queue: task.queue.2 +[Consumer 2] Routing key: route.2 + +Message sent: Task 5: Multi-queue load balancing test (routed to route.3) +[Consumer 3] Processing message: Task 5: Multi-queue load balancing test +[Consumer 3] From queue: task.queue.3 +[Consumer 3] Routing key: route.3 + +Message sent: Task 6: Multi-queue load balancing test (routed to route.1) +[Consumer 1] Processing message: Task 6: Multi-queue load balancing test +[Consumer 1] From queue: task.queue.1 +[Consumer 1] Routing key: route.1 + +Message sent: Task 7: Multi-queue load balancing test (routed to route.2) +[Consumer 2] Processing message: Task 7: Multi-queue load balancing test +[Consumer 2] From queue: task.queue.2 +[Consumer 2] Routing key: route.2 + +Message sent: Task 8: Multi-queue load balancing test (routed to route.3) +[Consumer 3] Processing message: Task 8: Multi-queue load balancing test +[Consumer 3] From queue: task.queue.3 +[Consumer 3] Routing key: route.3 + +Message sent: Task 9: Multi-queue load balancing test (routed to route.1) +[Consumer 1] Processing message: Task 9: Multi-queue load balancing test +[Consumer 1] From queue: task.queue.1 +[Consumer 1] Routing key: route.1 + +Message sent: Task 10: Multi-queue load balancing test (routed to route.2) +[Consumer 2] Processing message: Task 10: Multi-queue load balancing test +[Consumer 2] From queue: task.queue.2 +[Consumer 2] Routing key: route.2 + +✅ Multi-queue load balancing test completed successfully! +``` + +### 3.4 Topic Exchange Demo + +#### Core Features: +- Routing keys use hierarchical strings (separated by ., such as order.create.user) +- Supports two wildcards: + - `*`: Matches 1 level (e.g., user.* can match user.login but not user.login.success) + - `#`: Matches 0 or more levels (e.g., order.# can match order, order.pay, order.pay.success) + +#### Architecture Diagram: +```mermaid +graph TD + P[Producer
topic_publish] --> E[Topic Exchange
demo.topic] + E -->|binding: #.critical| Q1[Queue
demo.topic.queue-critical] + E -->|binding: order.#| Q2[Queue
demo.topic.queue-order] + E -->|binding: user.login.*| Q3[Queue
demo.topic.queue-user.login] + Q1 --> C1[CriticalHandler
topic_CriticalHandler] + Q2 --> C2[OrderHandler
topic_OrderHandler] + Q3 --> C3[UserLoginHandler
topic_UserLoginHandler] + + subgraph "Routing Key Examples" + R1[order.create.critical] -.->|Matches #.critical and order.#| Q1 + R1 -.-> Q2 + R2[user.login.success] -.->|Matches user.login.*| Q3 + R3[system.log.info] -.->|No match| X[Message discarded] + end + + style P fill:#e1f5fe + style E fill:#f3e5f5 + style Q1 fill:#ffebee + style Q2 fill:#e8f5e8 + style Q3 fill:#fff3e0 + style C1 fill:#ffebee + style C2 fill:#e8f5e8 + style C3 fill:#fff3e0 + style X fill:#f5f5f5 +``` + +#### Producer Code: +```python +async def setup_topic_exchange(exchange_name="demo.topic", queue_prefix="demo.topic.queue-"): + connection = await aio_pika.connect_robust(RABBITMQ_URI) + channel = await connection.channel() + + topic_exchange = await channel.declare_exchange( + exchange_name, + aio_pika.ExchangeType.TOPIC, + durable=True + ) + + # Define queues and corresponding binding keys (supports wildcards) + queue_bindings = [ + (f"{queue_prefix}critical", ["#.critical"]), # Match any prefix+critical + (f"{queue_prefix}order", ["order.#"]), # Match all order-prefixed routing keys + (f"{queue_prefix}user.login", ["user.login.*"]) # Match user.login+1 suffix + ] + + for queue_name, binding_keys in queue_bindings: + queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False) + for binding_key in binding_keys: + await queue.bind(topic_exchange, routing_key=binding_key) + print(f"Queue {queue_name} bound to routing keys: {binding_keys}") + +async def topic_publish(message: str, routing_key: str, exchange_name: str = "demo.topic"): + connection = await aio_pika.connect_robust(RABBITMQ_URI) + channel = await connection.channel() + + exchange = await channel.declare_exchange( + exchange_name, + aio_pika.ExchangeType.TOPIC, + durable=True + ) + + message_obj = aio_pika.Message( + body=message.encode("utf-8"), + delivery_mode=aio_pika.DeliveryMode.PERSISTENT + ) + + await exchange.publish(message_obj, routing_key=routing_key) + print(f"Message sent: {message} (routing key: {routing_key})") +``` + +#### Consumer Code: +```python +async def topic_consumer(queue_name: str, consumer_id: str): + connection = await aio_pika.connect_robust(RABBITMQ_URI) + channel = await connection.channel() + await channel.set_qos(prefetch_count=1) + + queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False) + + async def on_message(message: aio_pika.IncomingMessage): + async with message.process(): + message_content = message.body.decode("utf-8") + print(f"[Consumer {consumer_id}] Received message: {message_content}") + print(f"[Consumer {consumer_id}] Message routing key: {message.routing_key}") + print(f"[Consumer {consumer_id}] From queue: {queue_name}") + await asyncio.sleep(1) + + consumer_tag = f"topic_{consumer_id}_{queue_name}" + await queue.consume(on_message, consumer_tag=consumer_tag) + print(f"[Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})") + await asyncio.Future() +``` + +#### Complete Test Code + +**Test Run Script:** +```python +#!/usr/bin/env python3 +""" +Run Topic Exchange Test +""" + +import asyncio +import sys +import os + +# Add current directory to Python path +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from product.topic_publish import setup_topic_exchange, topic_publish +from comsumer.topic_consumer import start_all_topic_consumers + + +async def run_topic_exchange_test(): + """Run topic exchange test with producer and consumer""" + print("=== Running Topic Exchange Test ===") + + # Start consumer in background + consumer_task = asyncio.create_task(start_all_topic_consumers()) + + # Wait for consumer to start + await asyncio.sleep(1) + + # Setup exchange and publish messages + await setup_topic_exchange() + + test_messages = [ + ("Order creation failed (critical error)", "order.create.critical"), + ("User login successful", "user.login.success"), + ("Order payment completed", "order.pay.success"), + ("System crash (critical error)", "system.crash.critical"), + ("User login failed", "user.login.failed"), + ("Normal system log", "system.log.info") # Won't match any binding key, will be discarded + ] + + for msg, routing_key in test_messages: + await topic_publish(msg, routing_key) + await asyncio.sleep(0.5) + + # Wait for messages to be processed + await asyncio.sleep(3) + + # Cancel consumer + consumer_task.cancel() + print("✅ Topic exchange test completed successfully!") + + +if __name__ == "__main__": + asyncio.run(run_topic_exchange_test()) +``` + +#### Test Output: +``` +=== Running Topic Exchange Test === +[Consumer CriticalHandler] Started, listening to queue: demo.topic.queue-critical (tag: topic_CriticalHandler_demo.topic.queue-critical) +[Consumer UserLoginHandler] Started, listening to queue: demo.topic.queue-user.login (tag: topic_UserLoginHandler_demo.topic.queue-user.login) +[Consumer OrderHandler] Started, listening to queue: demo.topic.queue-order (tag: topic_OrderHandler_demo.topic.queue-order) + +Queue demo.topic.queue-critical bound to routing keys: ['#.critical'] +Queue demo.topic.queue-order bound to routing keys: ['order.#'] +Queue demo.topic.queue-user.login bound to routing keys: ['user.login.*'] + +[Consumer OrderHandler] Received message: Order creation failed (critical error) +[Consumer OrderHandler] Message routing key: order.create.critical +[Consumer OrderHandler] From queue: demo.topic.queue-order + +[Consumer CriticalHandler] Received message: Order creation failed (critical error) +[Consumer CriticalHandler] Message routing key: order.create.critical +[Consumer CriticalHandler] From queue: demo.topic.queue-critical + +Message sent: Order creation failed (critical error) (routing key: order.create.critical) +[Consumer UserLoginHandler] Received message: User login successful +[Consumer UserLoginHandler] Message routing key: user.login.success +[Consumer UserLoginHandler] From queue: demo.topic.queue-user.login + +Message sent: User login successful (routing key: user.login.success) +[Consumer OrderHandler] Received message: Order payment completed +[Consumer OrderHandler] Message routing key: order.pay.success +[Consumer OrderHandler] From queue: demo.topic.queue-order + +Message sent: Order payment completed (routing key: order.pay.success) +[Consumer CriticalHandler] Received message: System crash (critical error) +[Consumer CriticalHandler] Message routing key: system.crash.critical +[Consumer CriticalHandler] From queue: demo.topic.queue-critical + +Message sent: System crash (critical error) (routing key: system.crash.critical) +[Consumer UserLoginHandler] Received message: User login failed +[Consumer UserLoginHandler] Message routing key: user.login.failed +[Consumer UserLoginHandler] From queue: demo.topic.queue-user.login + +Message sent: User login failed (routing key: user.login.failed) +Message sent: Normal system log (routing key: system.log.info) +✅ Topic exchange test completed successfully! +``` + +### 3.5 Reliable RabbitMQ Producer-Consumer + +#### Reliability Guarantee Mechanisms + +To ensure message consumption reliability, we need to address the following aspects: + +**1. Message Persistence** +- Exchange persistence: `durable=True` +- Queue persistence: `durable=True` +- Message persistence: `delivery_mode=PERSISTENT` + +**2. Message Acknowledgment Mechanism** +- Automatic acknowledgment: `async with message.process()` +- Manual acknowledgment: `message.ack()` / `message.nack()` +- Ensure message acknowledgment only after successful processing + +**3. Message Idempotency** +- Use message ID for deduplication +- Database records of processed message IDs +- Prevent duplicate processing of the same message + +**4. Retry Mechanism** +- Configurable maximum retry count +- Internal consumer retry, avoid message re-queuing + +**5. Dead Letter Queue** +- Complete solution for handling failed messages +- Automatic creation of dead letter exchange and queue +- Detailed error information recording + +#### Key Code Implementation + +**Reliable Producer Code:** +```python +class ReliableProducer: + """Reliable Message Producer""" + + def _generate_message_id(self, message_data: Dict[str, Any]) -> str: + """Generate message ID for message""" + message_type = message_data.get('type', '') + content = message_data.get('content', '') + + # For duplicate_test type messages, generate fixed ID based on content + if message_type == 'duplicate_test': + import hashlib + content_hash = hashlib.md5(content.encode('utf-8')).hexdigest() + return f"duplicate_{content_hash[:8]}" + else: + return f"msg_{asyncio.get_running_loop().time()}" + + async def publish_reliable_message(self, message_data: Dict[str, Any]) -> bool: + """Publish reliable message""" + try: + # Generate message ID + message_id = self._generate_message_id(message_data) + + # Add message metadata + message_data.update({ + 'timestamp': datetime.now().isoformat(), + 'message_id': message_id + }) + + # Create persistent message + message = aio_pika.Message( + body=json.dumps(message_data, ensure_ascii=False).encode('utf-8'), + delivery_mode=aio_pika.DeliveryMode.PERSISTENT, # Message persistence + message_id=message_id, + timestamp=datetime.now() + ) + + # Send message and wait for confirmation + await self.exchange.publish(message, routing_key="reliable") + logger.info(f"[Producer] Message sent: {message_id}") + return True + except Exception as e: + logger.error(f"[Producer] Failed to send message: {e}") + return False +``` + +**Reliable Consumer Code:** +```python +class ReliableConsumer: + """Reliable Message Consumer""" + + def __init__(self, queue_name: Optional[str] = None, consumer_name: Optional[str] = None): + self.processed_messages: Set[str] = set() # Store processed message IDs + + async def process_message(self, message: aio_pika.IncomingMessage): + """Core message processing logic""" + try: + # Parse message + message_data = json.loads(message.body.decode('utf-8')) + message_id = message_data.get('message_id') + + # Check if message has been processed before (idempotency check) + if message_id in self.processed_messages: + logger.warning(f"[Consumer-{self.consumer_name}] 🚫 Duplicate message detected, skipping: {message_id}") + await message.ack() + return + + logger.info(f"[Consumer-{self.consumer_name}] Starting to process message: {message_id}") + + # Retry processing message directly + success = await self.retry_process_message(message_data, message_id, 0) + + # Only record processed message ID after successful processing + if success: + self.processed_messages.add(message_id) + await message.ack() + logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed and acknowledged") + else: + # Processing failed, send to dead letter queue + await self.send_to_dead_letter_queue(message, message_id, "Processing failed") + await message.ack() + except Exception as e: + logger.error(f"[Consumer-{self.consumer_name}] Failed to process message: {e}") + await message.ack() + + async def retry_process_message(self, message_data: Dict[str, Any], message_id: str, retry_count: int) -> bool: + """Retry processing message directly""" + max_retries = config.max_retries + last_error = None + + for attempt in range(max_retries + 1): + try: + logger.info(f"[Consumer-{self.consumer_name}] Attempting to process message {message_id}, attempt {attempt + 1}") + await self.message_handler(message_data) + logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed successfully") + return True + except Exception as e: + last_error = str(e) + logger.warning(f"[Consumer-{self.consumer_name}] Message {message_id} failed on attempt {attempt + 1}: {e}") + if attempt < max_retries: + await asyncio.sleep(1) + else: + logger.error(f"[Consumer-{self.consumer_name}] Message {message_id} failed after {max_retries} retries: {last_error}") + return False +``` + +**Dead Letter Queue Consumer Code:** +```python +class DeadLetterConsumer: + """Dead Letter Queue Consumer""" + + async def process_dead_letter_message(self, message: aio_pika.IncomingMessage): + """Process dead letter message""" + try: + # Parse dead letter message + dead_letter_data = json.loads(message.body.decode('utf-8')) + original_message = dead_letter_data.get('original_message', {}) + error_info = dead_letter_data.get('error_info', 'Unknown') + message_id = dead_letter_data.get('message_id', 'Unknown') + + # Print dead letter message information + logger.error("=" * 50) + logger.error("[Dead Letter Consumer] Received Dead Letter Message:") + logger.error(f"[Dead Letter Consumer] Message ID: {message_id}") + logger.error(f"[Dead Letter Consumer] Message Content: {json.dumps(original_message, ensure_ascii=False, indent=2)}") + logger.error(f"[Dead Letter Consumer] Error Reason: {error_info}") + logger.error("=" * 50) + + # Save to database + await self.save_to_database(original_message, error_info, message_id) + + # Acknowledge dead letter message + await message.ack() + logger.info(f"[Dead Letter Consumer] Dead letter message {message_id} processed") + except Exception as e: + logger.error(f"[Dead Letter Consumer] Failed to process dead letter message: {e}") + await message.nack(requeue=False) +``` + +#### Complete Test Code + +**Test Run Script:** +```python +""" +RabbitMQ Reliable Messaging Test Module +""" + +import asyncio +import logging + +from reliable_mq import ReliableProducer, ReliableConsumer +from reliable_mq.dead_letter_consumer import DeadLetterConsumer +from reliable_mq.config import config + +# Configure logging +logging.basicConfig( + level=getattr(logging, config.log_level), + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +async def run_context_manager_messaging(): + """Test reliable messaging using context manager""" + logger.info("=== Testing Reliable Messaging with Context Manager ===") + + # Use async context manager + async with ReliableProducer() as producer: + async with ReliableConsumer(consumer_name="context_test_consumer") as consumer: + async with DeadLetterConsumer() as dead_letter_consumer: + # Start consumers (run in background) + consumer_task = asyncio.create_task(consumer.start_consuming()) + dead_letter_task = asyncio.create_task(dead_letter_consumer.start_consuming()) + + # Wait for consumers to start + await asyncio.sleep(1) + + # Send test messages + test_messages = [ + {"content": "Important business message 1", "type": "business"}, + {"content": "System notification message 2", "type": "notification"}, + {"content": "User action message 3", "type": "user_action"}, + {"content": "Duplicate message test", "type": "duplicate_test"}, + {"content": "Duplicate message test", "type": "duplicate_test"}, # Duplicate message + {"content": "Message that will fail 1", "type": "will_fail"}, # These messages will fail and go to dead letter queue + {"content": "Message that will fail 2", "type": "will_fail"}, + {"content": "Message that will fail 3", "type": "will_fail"}, + ] + + for msg in test_messages: + await producer.publish_reliable_message(msg) + await asyncio.sleep(0.5) + + # Wait for message processing to complete + await asyncio.sleep(30) + + # Cancel tasks + consumer_task.cancel() + dead_letter_task.cancel() + + +if __name__ == '__main__': + asyncio.run(run_context_manager_messaging()) +``` + +**Configuration File:** +```python +""" +RabbitMQ Reliable Messaging Configuration Module +""" + +import os +from dataclasses import dataclass +from typing import Dict, Any + + +@dataclass +class Config: + """Configuration class""" + # RabbitMQ connection configuration + rabbitmq_uri: str = "amqp://guest:guest@localhost:5673/" + + # Exchange and queue configuration + exchange_name: str = "reliable.exchange" + queue_name: str = "reliable.queue" + + # Dead letter queue configuration + dead_letter_exchange: str = "reliable.dead.letter.exchange" + dead_letter_queue: str = "reliable.dead.letter.queue" + + # Retry configuration + max_retries: int = 3 + message_ttl: int = 300000 # 5 minutes + + # QoS configuration + prefetch_count: int = 1 + + # Logging configuration + log_level: str = "INFO" + + def get_connection_config(self) -> Dict[str, Any]: + """Get connection configuration""" + return { + 'uri': self.rabbitmq_uri, + 'prefetch_count': self.prefetch_count + } + + def get_dead_letter_config(self) -> Dict[str, str]: + """Get dead letter queue configuration""" + return { + 'dead_letter_exchange': self.dead_letter_exchange, + 'dead_letter_queue': self.dead_letter_queue + } + + +# Global configuration instance +config = Config() +``` + +**Custom Message Handler Function:** +```python +async def default_message_handler(message_data: Dict[str, Any]): + """Default message handler function""" + await asyncio.sleep(1) # Simulate processing time + + message_type = message_data.get('type', '') + content = message_data.get('content', '') + + # Simulate business logic processing + if message_type == 'will_fail': + raise Exception(f"Simulated business processing failure: {content}") + + logger.info(f"[Consumer] Business logic processing completed: {content}") +``` + +#### Test Results + +``` +2025-09-07 11:25:02,498 - __main__ - INFO - === Testing Reliable Messaging with Context Manager === +2025-09-07 11:25:02,509 - reliable_mq.reliable_producer - INFO - [Producer] Connected, queue: reliable.queue +2025-09-07 11:25:02,513 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Connected, listening to queue: reliable.queue +2025-09-07 11:25:02,518 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] Connected + +2025-09-07 11:25:03,523 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Starting to process message: msg_323632.845174041 +2025-09-07 11:25:03,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message content: {'content': 'Important business message 1', 'type': 'business', 'timestamp': '2025-09-07T11:25:03.519240', 'message_id': 'msg_323632.845174041'} +2025-09-07 11:25:03,524 - reliable_mq.reliable_producer - INFO - [Producer] Message sent: msg_323632.845174041 (type: business, content: Important business message 1) +2025-09-07 11:25:04,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Business logic processing completed: Important business message 1 +2025-09-07 11:25:04,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message msg_323632.845174041 processed successfully +2025-09-07 11:25:04,525 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message msg_323632.845174041 processed and acknowledged +2025-09-07 11:25:04,525 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Current processed message count: 1 + +2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - ================================================== +2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] 🚫 Duplicate message detected, skipping: +2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Message ID: duplicate_090f7015 +2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Message content: { + "content": "Duplicate message test", + "type": "duplicate_test", + "timestamp": "2025-09-07T11:25:05.546930", + "message_id": "duplicate_090f7015" +} +2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Total processed messages: 4 +2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - ================================================== + +2025-09-07 11:25:14,551 - reliable_mq.reliable_consumer - ERROR - [Consumer-context_test_consumer] Message msg_323635.377526708 failed after 3 retries: Simulated business processing failure: Message that will fail 1 +2025-09-07 11:25:14,551 - reliable_mq.reliable_consumer - ERROR - [Consumer-context_test_consumer] Message sent to dead letter queue: msg_323635.377526708, error: Processing failed + +2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - ================================================== +2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Received Dead Letter Message: +2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Message ID: msg_323635.377526708 +2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Message Content: { + "content": "Message that will fail 1", + "type": "will_fail", + "timestamp": "2025-09-07T11:25:06.051557", + "message_id": "msg_323635.377526708" +} +2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Error Reason: Processing failed +2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - ================================================== + +2025-09-07 11:25:15,064 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] 💾 Dead letter message saved to database: msg_323635.377526708 +2025-09-07 11:25:15,064 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] Database Record: { + "id": "msg_323635.377526708", + "original_message": { + "content": "Message that will fail 1", + "type": "will_fail", + "timestamp": "2025-09-07T11:25:06.051557", + "message_id": "msg_323635.377526708" + }, + "error_info": "Processing failed", + "created_at": "2025-09-07T11:25:15.064341", + "status": "failed" +} + +2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - ================================================== +2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Processed Message Statistics: +2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Total Processed: 4 +2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Processed Message IDs: ['msg_323632.845174041', 'msg_323633.351571583', 'duplicate_090f7015', 'msg_323633.854272166'] +2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - ================================================== +``` + +#### Architecture Diagram +![img_5.png](assert/img_5.png) +```mermaid +graph TD + P[ReliableProducer
Reliable Producer] --> E[Reliable Exchange
reliable.exchange] + E --> Q[Reliable Queue
reliable.queue] + Q --> C[ReliableConsumer
Reliable Consumer] + + C -->|Processing success| ACK[Message ACK
Message Acknowledgment] + C -->|Processing failure| RETRY[Retry Logic
Retry Mechanism] + RETRY -->|Retry success| ACK + RETRY -->|Retry failure| DLQ[Dead Letter Queue
Dead Letter Queue] + + DLQ --> DLC[DeadLetterConsumer
Dead Letter Consumer] + DLC --> DB[(Database
Database)] + + subgraph "Reliability Guarantees" + PERSIST[Message Persistence
Message Persistence] + IDEMPOTENT[Idempotency Check
Idempotency Check] + CONFIRM[Publisher Confirmation
Publisher Confirmation] + end + + subgraph "Message Flow" + MSG1[Normal message] --> SUCCESS[Processing success] + MSG2[Duplicate message] --> SKIP[Skip processing] + MSG3[Failed message] --> FAIL[Failed after retry] + end + + style P fill:#e1f5fe + style E fill:#f3e5f5 + style Q fill:#e8f5e8 + style C fill:#fff3e0 + style DLQ fill:#ffebee + style DLC fill:#ffebee + style DB fill:#f3e5f5 + style ACK fill:#e8f5e8 + style RETRY fill:#fff3e0 +``` diff --git a/RabbitMQ Usage.docx b/RabbitMQ Usage.docx new file mode 100644 index 0000000..9e1136e Binary files /dev/null and b/RabbitMQ Usage.docx differ diff --git a/__pycache__/config.cpython-312.pyc b/__pycache__/config.cpython-312.pyc new file mode 100644 index 0000000..095821f Binary files /dev/null and b/__pycache__/config.cpython-312.pyc differ diff --git a/__pycache__/config.cpython-313.pyc b/__pycache__/config.cpython-313.pyc new file mode 100644 index 0000000..032fc76 Binary files /dev/null and b/__pycache__/config.cpython-313.pyc differ diff --git a/__pycache__/test.cpython-312-pytest-8.4.1.pyc b/__pycache__/test.cpython-312-pytest-8.4.1.pyc new file mode 100644 index 0000000..d0ac656 Binary files /dev/null and b/__pycache__/test.cpython-312-pytest-8.4.1.pyc differ diff --git a/comsumer/__pycache__/__init__.cpython-312.pyc b/comsumer/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..7b3f889 Binary files /dev/null and b/comsumer/__pycache__/__init__.cpython-312.pyc differ diff --git a/comsumer/__pycache__/__init__.cpython-313.pyc b/comsumer/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..15e035b Binary files /dev/null and b/comsumer/__pycache__/__init__.cpython-313.pyc differ diff --git a/comsumer/__pycache__/direct_consumer.cpython-312.pyc b/comsumer/__pycache__/direct_consumer.cpython-312.pyc new file mode 100644 index 0000000..f92bb8c Binary files /dev/null and b/comsumer/__pycache__/direct_consumer.cpython-312.pyc differ diff --git a/comsumer/__pycache__/direct_consumer.cpython-313.pyc b/comsumer/__pycache__/direct_consumer.cpython-313.pyc new file mode 100644 index 0000000..ae624bb Binary files /dev/null and b/comsumer/__pycache__/direct_consumer.cpython-313.pyc differ diff --git a/comsumer/__pycache__/direct_multi_consumer.cpython-312.pyc b/comsumer/__pycache__/direct_multi_consumer.cpython-312.pyc new file mode 100644 index 0000000..157505f Binary files /dev/null and b/comsumer/__pycache__/direct_multi_consumer.cpython-312.pyc differ diff --git a/comsumer/__pycache__/direct_multi_consumer.cpython-313.pyc b/comsumer/__pycache__/direct_multi_consumer.cpython-313.pyc new file mode 100644 index 0000000..2a40cf6 Binary files /dev/null and b/comsumer/__pycache__/direct_multi_consumer.cpython-313.pyc differ diff --git a/comsumer/__pycache__/fanout_consumer.cpython-312.pyc b/comsumer/__pycache__/fanout_consumer.cpython-312.pyc new file mode 100644 index 0000000..08ab318 Binary files /dev/null and b/comsumer/__pycache__/fanout_consumer.cpython-312.pyc differ diff --git a/comsumer/__pycache__/fanout_consumer.cpython-313.pyc b/comsumer/__pycache__/fanout_consumer.cpython-313.pyc new file mode 100644 index 0000000..16d5d12 Binary files /dev/null and b/comsumer/__pycache__/fanout_consumer.cpython-313.pyc differ diff --git a/comsumer/__pycache__/topic_consumer.cpython-312.pyc b/comsumer/__pycache__/topic_consumer.cpython-312.pyc new file mode 100644 index 0000000..ea547af Binary files /dev/null and b/comsumer/__pycache__/topic_consumer.cpython-312.pyc differ diff --git a/comsumer/__pycache__/topic_consumer.cpython-313.pyc b/comsumer/__pycache__/topic_consumer.cpython-313.pyc new file mode 100644 index 0000000..42a2c06 Binary files /dev/null and b/comsumer/__pycache__/topic_consumer.cpython-313.pyc differ diff --git a/product/__pycache__/__init__.cpython-312.pyc b/product/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..b3e5ccf Binary files /dev/null and b/product/__pycache__/__init__.cpython-312.pyc differ diff --git a/product/__pycache__/__init__.cpython-313.pyc b/product/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..71bd1a7 Binary files /dev/null and b/product/__pycache__/__init__.cpython-313.pyc differ diff --git a/product/__pycache__/direct_multi_publish.cpython-312-pytest-8.4.1.pyc b/product/__pycache__/direct_multi_publish.cpython-312-pytest-8.4.1.pyc new file mode 100644 index 0000000..c3258ae Binary files /dev/null and b/product/__pycache__/direct_multi_publish.cpython-312-pytest-8.4.1.pyc differ diff --git a/product/__pycache__/direct_multi_publish.cpython-312.pyc b/product/__pycache__/direct_multi_publish.cpython-312.pyc new file mode 100644 index 0000000..e090d10 Binary files /dev/null and b/product/__pycache__/direct_multi_publish.cpython-312.pyc differ diff --git a/product/__pycache__/direct_multi_publish.cpython-313.pyc b/product/__pycache__/direct_multi_publish.cpython-313.pyc new file mode 100644 index 0000000..21f38d2 Binary files /dev/null and b/product/__pycache__/direct_multi_publish.cpython-313.pyc differ diff --git a/product/__pycache__/direct_publish.cpython-312.pyc b/product/__pycache__/direct_publish.cpython-312.pyc new file mode 100644 index 0000000..89b75a0 Binary files /dev/null and b/product/__pycache__/direct_publish.cpython-312.pyc differ diff --git a/product/__pycache__/direct_publish.cpython-313.pyc b/product/__pycache__/direct_publish.cpython-313.pyc new file mode 100644 index 0000000..a37cbf9 Binary files /dev/null and b/product/__pycache__/direct_publish.cpython-313.pyc differ diff --git a/product/__pycache__/fanout_publish.cpython-312.pyc b/product/__pycache__/fanout_publish.cpython-312.pyc new file mode 100644 index 0000000..c863150 Binary files /dev/null and b/product/__pycache__/fanout_publish.cpython-312.pyc differ diff --git a/product/__pycache__/fanout_publish.cpython-313.pyc b/product/__pycache__/fanout_publish.cpython-313.pyc new file mode 100644 index 0000000..df0c957 Binary files /dev/null and b/product/__pycache__/fanout_publish.cpython-313.pyc differ diff --git a/product/__pycache__/topic_publish.cpython-312.pyc b/product/__pycache__/topic_publish.cpython-312.pyc new file mode 100644 index 0000000..6a5b7ee Binary files /dev/null and b/product/__pycache__/topic_publish.cpython-312.pyc differ diff --git a/product/__pycache__/topic_publish.cpython-313.pyc b/product/__pycache__/topic_publish.cpython-313.pyc new file mode 100644 index 0000000..6124752 Binary files /dev/null and b/product/__pycache__/topic_publish.cpython-313.pyc differ diff --git a/reliable_mq/__pycache__/__init__.cpython-312.pyc b/reliable_mq/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..748da6a Binary files /dev/null and b/reliable_mq/__pycache__/__init__.cpython-312.pyc differ diff --git a/reliable_mq/__pycache__/__init__.cpython-313.pyc b/reliable_mq/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..b864899 Binary files /dev/null and b/reliable_mq/__pycache__/__init__.cpython-313.pyc differ diff --git a/reliable_mq/__pycache__/config.cpython-312.pyc b/reliable_mq/__pycache__/config.cpython-312.pyc new file mode 100644 index 0000000..5fcef2f Binary files /dev/null and b/reliable_mq/__pycache__/config.cpython-312.pyc differ diff --git a/reliable_mq/__pycache__/config.cpython-313.pyc b/reliable_mq/__pycache__/config.cpython-313.pyc new file mode 100644 index 0000000..75ce2df Binary files /dev/null and b/reliable_mq/__pycache__/config.cpython-313.pyc differ diff --git a/reliable_mq/__pycache__/dead_letter_consumer.cpython-312.pyc b/reliable_mq/__pycache__/dead_letter_consumer.cpython-312.pyc new file mode 100644 index 0000000..bfd4945 Binary files /dev/null and b/reliable_mq/__pycache__/dead_letter_consumer.cpython-312.pyc differ diff --git a/reliable_mq/__pycache__/dead_letter_consumer.cpython-313.pyc b/reliable_mq/__pycache__/dead_letter_consumer.cpython-313.pyc new file mode 100644 index 0000000..ab3b19f Binary files /dev/null and b/reliable_mq/__pycache__/dead_letter_consumer.cpython-313.pyc differ diff --git a/reliable_mq/__pycache__/queue_manager.cpython-312.pyc b/reliable_mq/__pycache__/queue_manager.cpython-312.pyc new file mode 100644 index 0000000..02f4ff4 Binary files /dev/null and b/reliable_mq/__pycache__/queue_manager.cpython-312.pyc differ diff --git a/reliable_mq/__pycache__/reliable_consumer.cpython-312.pyc b/reliable_mq/__pycache__/reliable_consumer.cpython-312.pyc new file mode 100644 index 0000000..ed29ee8 Binary files /dev/null and b/reliable_mq/__pycache__/reliable_consumer.cpython-312.pyc differ diff --git a/reliable_mq/__pycache__/reliable_consumer.cpython-313.pyc b/reliable_mq/__pycache__/reliable_consumer.cpython-313.pyc new file mode 100644 index 0000000..1f1d379 Binary files /dev/null and b/reliable_mq/__pycache__/reliable_consumer.cpython-313.pyc differ diff --git a/reliable_mq/__pycache__/reliable_producer.cpython-312.pyc b/reliable_mq/__pycache__/reliable_producer.cpython-312.pyc new file mode 100644 index 0000000..8217f9b Binary files /dev/null and b/reliable_mq/__pycache__/reliable_producer.cpython-312.pyc differ diff --git a/reliable_mq/__pycache__/reliable_producer.cpython-313.pyc b/reliable_mq/__pycache__/reliable_producer.cpython-313.pyc new file mode 100644 index 0000000..765dc10 Binary files /dev/null and b/reliable_mq/__pycache__/reliable_producer.cpython-313.pyc differ diff --git a/reliable_mq/__pycache__/run_reliable_messaging.cpython-312-pytest-8.4.1.pyc b/reliable_mq/__pycache__/run_reliable_messaging.cpython-312-pytest-8.4.1.pyc new file mode 100644 index 0000000..50b14fa Binary files /dev/null and b/reliable_mq/__pycache__/run_reliable_messaging.cpython-312-pytest-8.4.1.pyc differ diff --git a/reliable_mq/__pycache__/test_reliable_messaging.cpython-312-pytest-8.4.1.pyc b/reliable_mq/__pycache__/test_reliable_messaging.cpython-312-pytest-8.4.1.pyc new file mode 100644 index 0000000..1747d7f Binary files /dev/null and b/reliable_mq/__pycache__/test_reliable_messaging.cpython-312-pytest-8.4.1.pyc differ