## 1. Basic Concepts of MQ

### 1.1 What is MQ?

MQ stands for Message Queue.

- A "message queue" is a container that stores messages while they are in transit
- It follows the classic producer-consumer model: producers continuously publish messages to the queue, and consumers continuously retrieve messages from it
- Benefit: producers only need to focus on sending messages, and consumers only on receiving them, with no business-logic intrusion between the two; this decouples producers from consumers

### 1.2 Why Use MQ?

A message queue is middleware used for asynchronous communication in distributed systems. Its core function is to deliver messages asynchronously through a **store-and-forward** mechanism, addressing problems such as system coupling, traffic peak shaving, and asynchronous processing.

#### Main Functions:

**1. Decouple System Components**

- In traditional systems, components usually call each other directly (e.g., Service A calls Service B), resulting in tight coupling
- After introducing a message queue, Service A only needs to send messages to the queue without caring who receives or processes them; Service B fetches messages from the queue
- Both parties communicate through an agreed message format and remain independent of each other, reducing system coupling

**2. Asynchronous Processing, Improved Efficiency**

- In synchronous processing, an operation may need to wait for multiple services to complete sequentially, so the total time is the sum of all steps
- Message queues support asynchronous processing: once the main flow completes, it only needs to send a message to the queue and can return a result without waiting for subsequent steps
- Other services fetch messages from the queue asynchronously and process them, significantly improving response time and throughput

**3. Traffic Peak Shaving, System Protection**

- Sudden traffic (e.g., e-commerce flash sales, live streaming) may instantly overwhelm backend services
- A message queue can act as a "buffer pool": peak requests first enter the queue, and backend services consume messages at their own pace

**4. Data Synchronization and Distribution**

- The same message can be consumed by multiple consumers, achieving "send once, process many times"
- Cross-system data synchronization, with the queue helping to keep data consistent

**5. Retry and Fault Tolerance**

- If a consumer service fails temporarily, the message queue retains its messages and redelivers them after the service recovers
- Combined with a retry mechanism, this handles problems such as network jitter and temporary service unavailability

The minimal in-process sketch below illustrates this produce/consume decoupling.

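A broker-free sketch using Python's built-in `asyncio.Queue` (not RabbitMQ, just the model): the producer returns as soon as it enqueues, while the consumer drains the queue at its own pace.

```python
import asyncio

async def producer(queue: asyncio.Queue):
    for i in range(5):
        await queue.put(f"task-{i}")   # "send" and return immediately
        print(f"produced task-{i}")

async def consumer(queue: asyncio.Queue):
    while True:
        task = await queue.get()       # block until a message is available
        await asyncio.sleep(0.5)       # simulate slow processing (peak shaving)
        print(f"consumed {task}")
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    worker = asyncio.create_task(consumer(queue))
    await producer(queue)              # finishes long before consumption does
    await queue.join()                 # wait until every task is processed
    worker.cancel()

asyncio.run(main())
```
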
---

## 2. RabbitMQ

### 2.1 Introduction

RabbitMQ is open-source message broker software (also called message queue middleware) written in Erlang, used in distributed systems to implement asynchronous communication and decoupling between applications.

#### Features:

- **Multiple Protocol Support**: Besides AMQP, RabbitMQ also supports STOMP, MQTT, and other messaging protocols
- **High Reliability**: Prevents message loss through message persistence, clustering, mirrored queues, and other mechanisms
- **Flexible Routing**: Exchanges provide rich message routing rules
- **Multi-language Clients**: Provides client libraries for many programming languages
- **Friendly Management Interface**: Ships with a visual management UI

### 2.2 Core Components

```
Producer → Channel → Exchange → Queue → Channel → Consumer
```

#### Core Component Details:

**1. Producer**

- Definition: An application or service that sends messages
- Function: Encapsulates business data into messages and sends them to the RabbitMQ server
- Features: Does not need to know the final receiver; it only needs to specify which exchange to send messages to

**2. Consumer**

- Definition: An application or service that receives and processes messages
- Function: Continuously listens on queues; when messages arrive, it takes them from the queue and performs business processing
- Features: Consumers are bound to queues and can inform RabbitMQ whether a message was processed via automatic or manual acknowledgment

**3. Queue**

- Definition: The container that stores messages; the final destination of a message
- Core Properties:
  - Name: Unique identifier of the queue
  - Durable: If true, the queue is retained after a RabbitMQ restart
  - Exclusive: If true, the queue is only visible to the current connection
  - Auto-delete: If true, the queue is automatically deleted when the last consumer disconnects

**4. Exchange**

- Definition: Receives messages from producers and forwards them to one or more queues according to routing rules
- Function: Similar to a "router", responsible for message routing logic
- Types:
  - **Direct Exchange**: Routes when the message's routing key exactly matches the queue's binding key
  - **Topic Exchange**: Supports wildcards (`*` matches exactly one word, `#` matches zero or more words)
  - **Fanout Exchange**: Ignores the routing key and broadcasts messages to all bound queues
  - **Headers Exchange**: Matches on message headers rather than the routing key

**5. Binding**

- Definition: The association between an exchange and a queue, containing the routing rule
- Function: Tells the exchange "which queues should receive which kinds of messages"

**6. Connection**

- Definition: The TCP connection between a producer/consumer and the RabbitMQ server
- Features: Establishing a TCP connection is expensive, so connections are usually reused

**7. Channel**

- Definition: A virtual connection multiplexed over the TCP connection; the actual channel through which messages flow
- Function: Reduces the number of TCP connections, lowering server resource consumption (see the reuse sketch below)

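Since connections are expensive and channels are cheap, a common pattern is one connection per process with separate channels for independent tasks. A minimal `aio_pika` sketch; the URI and the `demo.channels` names are illustrative assumptions matching the Docker setup below:

```python
import asyncio
import aio_pika

async def main():
    # One TCP connection for the whole process...
    connection = await aio_pika.connect_robust("amqp://guest:guest@localhost:5673/")

    # ...but separate lightweight channels for independent tasks
    publish_channel = await connection.channel()
    consume_channel = await connection.channel()

    exchange = await publish_channel.declare_exchange(
        "demo.channels", aio_pika.ExchangeType.FANOUT, durable=True
    )
    queue = await consume_channel.declare_queue("demo.channels.queue", durable=True)
    await queue.bind(exchange, routing_key="")

    await exchange.publish(aio_pika.Message(body=b"hello"), routing_key="")
    await connection.close()

asyncio.run(main())
```
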
### 2.3 Install RabbitMQ

#### With Management Page

```bash
docker run -d --name rabbitmq -p 5673:5672 -p 15673:15672 rabbitmq:management
```

#### Without Management Page

```bash
# The plain image has no management plugin, so only the AMQP port is mapped
docker run -d --name rabbitmq -p 5673:5672 rabbitmq
```

**Access Management Page**: http://localhost:15673

- Username: guest
- Password: guest

### 2.4 RabbitMQ Management Page

The management page provides rich monitoring and management functions:

- **Overview**: Server overview, including connection count, queue count, message rates, etc.
- **Connections**: Shows all client connection information
- **Channels**: Shows the channels of each connection
- **Exchanges**: Manage exchanges
- **Queues**: Manage queues
- **Admin**: User and permission management

---

## 3. Python Integration

- Integration is based on the Python `aio-pika` library
- Repo: https://gitea.freeleaps.mathmast.com/icecheng/rabbitmq-test

All demos import `RABBITMQ_URI` from a shared `config` module; a minimal sketch of it follows.

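A minimal `config.py`, assuming it simply pins the URI for the Docker container above (the repo's actual file may differ; the same value appears in the `Config` dataclass in section 3.5):

```python
# config.py - shared connection settings for the demos
RABBITMQ_URI = "amqp://guest:guest@localhost:5673/"
```
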
### 3.1 Fanout Exchange Demo

#### Core Features:

- Ignores the routing key; whatever value is set will be ignored
- Messages are broadcast to all queues bound to the exchange
- Suitable for "one-to-many" notification scenarios (such as system notifications or log broadcasting)

#### Architecture Diagram:

```mermaid
graph TD
    P[Producer<br/>fanout_publish] --> E[Fanout Exchange<br/>demo.fanout]
    E --> Q1[Queue<br/>demo.fanout.queue-0]
    E --> Q2[Queue<br/>demo.fanout.queue-1]
    E --> Q3[Queue<br/>demo.fanout.queue-2]
    Q1 --> C1[Consumer 1<br/>fanout_consumer_1]
    Q2 --> C2[Consumer 2<br/>fanout_consumer_2]
    Q3 --> C3[Consumer 3<br/>fanout_consumer_3]

    style P fill:#e1f5fe
    style E fill:#f3e5f5
    style Q1 fill:#e8f5e8
    style Q2 fill:#e8f5e8
    style Q3 fill:#e8f5e8
    style C1 fill:#fff3e0
    style C2 fill:#fff3e0
    style C3 fill:#fff3e0
```

#### Producer Code:

```python
import asyncio
import aio_pika
from config import RABBITMQ_URI

async def setup_fanout_exchange(exchange_name="demo.fanout", queue_name_prefix="demo.fanout.queue-"):
    # Establish connection
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()

    # 1. Declare a Fanout-type exchange
    fanout_exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.FANOUT,
        durable=True  # Exchange persistence
    )

    # 2. Define the list of queue names to bind
    queue_names = [queue_name_prefix + str(i) for i in range(3)]

    # 3. Create queues in a loop and bind them to the exchange
    for name in queue_names:
        queue = await channel.declare_queue(
            name,
            durable=True,
            auto_delete=False
        )
        # Bind the queue to the Fanout exchange (routing key is ignored)
        await queue.bind(fanout_exchange, routing_key="")

    await connection.close()

async def fanout_publish(message: str = "", exchange_name: str = "demo.fanout"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()

    fanout_exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.FANOUT,
        durable=True
    )

    # Use a separate name to avoid shadowing the `message` parameter
    message_obj = aio_pika.Message(
        body=message.encode("utf-8"),
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT  # Message persistence
    )

    # Send message to the Fanout exchange
    await fanout_exchange.publish(message_obj, routing_key="")
    await connection.close()
```

#### Consumer Code:

```python
import asyncio
import aio_pika
from config import RABBITMQ_URI

async def fanout_consumer(queue_name: str, consumer_id: int):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    queue = await channel.declare_queue(
        queue_name,
        durable=True,
        auto_delete=False
    )

    async def on_message_received(message: aio_pika.IncomingMessage):
        async with message.process():  # acks on clean exit of the block
            message_content = message.body.decode("utf-8")
            print(f"[Fanout Consumer {consumer_id}] Received broadcast message:")
            print(f"  Listening queue: {queue_name}")
            print(f"  Message content: {message_content}")
            print(f"  Message persistence: {'Yes' if message.delivery_mode == 2 else 'No'}")
            await asyncio.sleep(1)

    consumer_tag = f"fanout_consumer_{consumer_id}_{queue_name}"
    await queue.consume(on_message_received, consumer_tag=consumer_tag)
    print(f"[Fanout Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})")
    await asyncio.Future()  # keep the consumer running until cancelled
```

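The test script below also imports `start_all_fanout_consumers`, which is not shown above. A minimal sketch consistent with the test output (consumer *i+1* listens on queue `-i`), assuming it simply runs the three consumers concurrently:

```python
async def start_all_fanout_consumers(queue_name_prefix="demo.fanout.queue-", count=3):
    # Run one fanout_consumer per queue, all in parallel
    await asyncio.gather(*(
        fanout_consumer(f"{queue_name_prefix}{i}", i + 1)
        for i in range(count)
    ))
```
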
#### Complete Test Code

**Test Run Script:**

```python
#!/usr/bin/env python3
"""
Run Fanout Exchange Test
"""

import asyncio
import sys
import os

# Add current directory to the Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from product.fanout_publish import fanout_publish, setup_fanout_exchange
from comsumer.fanout_consumer import start_all_fanout_consumers


async def run_fanout_test():
    """Run fanout exchange test with producer and consumer"""
    print("=== Running Fanout Exchange Test ===")

    # Start consumers in the background
    consumer_task = asyncio.create_task(start_all_fanout_consumers())

    # Wait for consumers to start
    await asyncio.sleep(1)

    # Set up the topology and publish messages
    await setup_fanout_exchange("demo.fanout", "demo.fanout.queue-")
    await fanout_publish(message="hello world", exchange_name="demo.fanout")
    await fanout_publish(message="test message 2", exchange_name="demo.fanout")
    await fanout_publish(message="test message 3", exchange_name="demo.fanout")

    # Wait for messages to be processed
    await asyncio.sleep(3)

    # Cancel consumers
    consumer_task.cancel()
    print("✅ Fanout test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_fanout_test())
```

#### Test Output:

```
=== Running Fanout Exchange Test ===
[Fanout Consumer 2] Started, listening to queue: demo.fanout.queue-1 (tag: fanout_consumer_2_demo.fanout.queue-1)
[Fanout Consumer 1] Started, listening to queue: demo.fanout.queue-0 (tag: fanout_consumer_1_demo.fanout.queue-0)
[Fanout Consumer 3] Started, listening to queue: demo.fanout.queue-2 (tag: fanout_consumer_3_demo.fanout.queue-2)

[Fanout Consumer 2] Received broadcast message:
  Listening queue: demo.fanout.queue-1
  Message content: hello world
  Message persistence: Yes
[Fanout Consumer 1] Received broadcast message:
  Listening queue: demo.fanout.queue-0
  Message content: hello world
  Message persistence: Yes
[Fanout Consumer 3] Received broadcast message:
  Listening queue: demo.fanout.queue-2
  Message content: hello world
  Message persistence: Yes
[Fanout Consumer 3] Received broadcast message:
  Listening queue: demo.fanout.queue-2
  Message content: test message 2
  Message persistence: Yes
[Fanout Consumer 1] Received broadcast message:
  Listening queue: demo.fanout.queue-0
  Message content: test message 2
  Message persistence: Yes
[Fanout Consumer 2] Received broadcast message:
  Listening queue: demo.fanout.queue-1
  Message content: test message 2
  Message persistence: Yes
[Fanout Consumer 1] Received broadcast message:
  Listening queue: demo.fanout.queue-0
  Message content: test message 3
  Message persistence: Yes
[Fanout Consumer 2] Received broadcast message:
  Listening queue: demo.fanout.queue-1
  Message content: test message 3
  Message persistence: Yes
[Fanout Consumer 3] Received broadcast message:
  Listening queue: demo.fanout.queue-2
  Message content: test message 3
  Message persistence: Yes
✅ Fanout test completed successfully!
```

### 3.2 Direct Exchange Demo

#### Core Features:

- Based on exact matching between the routing key and the binding key
- A message is routed to a queue only when its routing key exactly matches the queue's binding key
- Suitable for precise routing scenarios (such as separating log levels: error, warning, and info routed to different queues)

#### Architecture Diagram:

```mermaid
graph TD
    P[Producer<br/>direct_publish] --> E[Direct Exchange<br/>demo.direct]
    E -->|routing_key: error| Q1[Queue<br/>demo.direct.queue-error]
    E -->|routing_key: warning| Q2[Queue<br/>demo.direct.queue-warning]
    E -->|routing_key: info| Q3[Queue<br/>demo.direct.queue-info]
    E -->|routing_key: debug| Q3
    Q1 --> C1[Error Level Consumer<br/>direct_error_level]
    Q2 --> C2[Warning Level Consumer<br/>direct_warning_level]
    Q3 --> C3[Info/Debug Level Consumer<br/>direct_info/debug_level]

    style P fill:#e1f5fe
    style E fill:#f3e5f5
    style Q1 fill:#ffebee
    style Q2 fill:#fff3e0
    style Q3 fill:#e8f5e8
    style C1 fill:#ffebee
    style C2 fill:#fff3e0
    style C3 fill:#e8f5e8
```

#### Producer Code:

```python
import asyncio
import aio_pika
from config import RABBITMQ_URI

async def setup_direct_exchange(exchange_name="demo.direct", queue_prefix="demo.direct.queue-"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()

    direct_exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.DIRECT,
        durable=True
    )

    # Define queues and their binding keys
    queue_bindings = [
        (f"{queue_prefix}error", ["error"]),         # Handle error-level messages
        (f"{queue_prefix}warning", ["warning"]),     # Handle warning-level messages
        (f"{queue_prefix}info", ["info", "debug"])   # Handle info- and debug-level messages
    ]

    for queue_name, binding_keys in queue_bindings:
        queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)
        for binding_key in binding_keys:
            await queue.bind(direct_exchange, routing_key=binding_key)
        print(f"Queue {queue_name} bound to routing keys: {binding_keys}")

    await connection.close()

async def direct_publish(message: str, routing_key: str, exchange_name: str = "demo.direct"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()

    exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.DIRECT,
        durable=True
    )

    message_obj = aio_pika.Message(
        body=message.encode("utf-8"),
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT
    )

    await exchange.publish(message_obj, routing_key=routing_key)
    print(f"Message sent: {message} (routing key: {routing_key})")
    await connection.close()
```

#### Consumer Code:

```python
import asyncio
import aio_pika
from config import RABBITMQ_URI

async def direct_consumer(queue_name: str, consumer_label: str):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)

    async def on_message_received(message: aio_pika.IncomingMessage):
        async with message.process():
            message_content = message.body.decode("utf-8")
            print(f"[{consumer_label} Consumer] Received message:")
            print(f"  Queue name: {queue_name}")
            print(f"  Message content: {message_content}")
            print(f"  Message routing key: {message.routing_key}")
            # Event-loop time is monotonic, not wall-clock - hence the large values in the output
            print(f"  Processing time: {asyncio.get_running_loop().time():.2f}s")

            # Simulate different processing times for different log levels
            if "error" in queue_name:
                await asyncio.sleep(2)
            elif "warning" in queue_name:
                await asyncio.sleep(1)
            elif "info" in queue_name:
                await asyncio.sleep(0.5)

    consumer_tag = f"direct_{consumer_label.lower().replace(' ', '_')}_{queue_name}"
    await queue.consume(on_message_received, consumer_tag=consumer_tag)
    print(f"[{consumer_label} Consumer] Started, listening to queue: {queue_name} (tag: {consumer_tag})")
    await asyncio.Future()
```

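The test script imports `start_all_direct_consumers`, not shown above. A minimal sketch consistent with the labels and queue names in the test output:

```python
async def start_all_direct_consumers(queue_prefix="demo.direct.queue-"):
    # Run the three level-specific consumers in parallel
    await asyncio.gather(
        direct_consumer(f"{queue_prefix}error", "Error Level"),
        direct_consumer(f"{queue_prefix}warning", "Warning Level"),
        direct_consumer(f"{queue_prefix}info", "Info/Debug Level"),
    )
```
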
#### Complete Test Code

**Test Run Script:**

```python
#!/usr/bin/env python3
"""
Run Direct Exchange Test
"""

import asyncio
import sys
import os

# Add current directory to the Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from product.direct_publish import setup_direct_exchange, direct_publish
from comsumer.direct_consumer import start_all_direct_consumers


async def run_direct_exchange_test():
    """Run direct exchange test with producer and consumer"""
    print("=== Running Direct Exchange Test ===")

    # Start consumers in the background
    consumer_task = asyncio.create_task(start_all_direct_consumers())

    # Wait for consumers to start
    await asyncio.sleep(1)

    # Set up the exchange and publish messages
    await setup_direct_exchange()

    test_messages = [
        ("System crash, unable to start", "error"),               # Routed to the error queue
        ("Disk space insufficient", "warning"),                   # Routed to the warning queue
        ("User login successful", "info"),                        # Routed to the info queue
        ("Debug info: Database connection successful", "debug")   # Routed to the info queue
    ]

    for msg, routing_key in test_messages:
        await direct_publish(msg, routing_key)
        await asyncio.sleep(0.5)

    # Wait for messages to be processed
    await asyncio.sleep(3)

    # Cancel consumers
    consumer_task.cancel()
    print("✅ Direct exchange test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_direct_exchange_test())
```

#### Test Output:

```
=== Running Direct Exchange Test ===
[Info/Debug Level Consumer] Started, listening to queue: demo.direct.queue-info (tag: direct_info/debug_level_demo.direct.queue-info)
[Warning Level Consumer] Started, listening to queue: demo.direct.queue-warning (tag: direct_warning_level_demo.direct.queue-warning)
[Error Level Consumer] Started, listening to queue: demo.direct.queue-error (tag: direct_error_level_demo.direct.queue-error)

Queue demo.direct.queue-error bound to routing keys: ['error']
Queue demo.direct.queue-warning bound to routing keys: ['warning']
Queue demo.direct.queue-info bound to routing keys: ['info', 'debug']

[Error Level Consumer] Received message:
  Queue name: demo.direct.queue-error
  Message content: System crash, unable to start
  Message routing key: error
  Processing time: 322774.03s

Message sent: System crash, unable to start (routing key: error)
[Warning Level Consumer] Received message:
  Queue name: demo.direct.queue-warning
  Message content: Disk space insufficient
  Message routing key: warning
  Processing time: 322774.54s

Message sent: Disk space insufficient (routing key: warning)
[Info/Debug Level Consumer] Received message:
  Queue name: demo.direct.queue-info
  Message content: User login successful
  Message routing key: info
  Processing time: 322775.06s

Message sent: User login successful (routing key: info)
[Info/Debug Level Consumer] Received message:
  Queue name: demo.direct.queue-info
  Message content: Debug info: Database connection successful
  Message routing key: debug
  Processing time: 322775.57s

Message sent: Debug info: Database connection successful (routing key: debug)
✅ Direct exchange test completed successfully!
```

### 3.3 Direct Exchange Demo (Load Balancing)

#### Implementation Principle:

1. Create multiple queues: each queue is bound to the same Direct Exchange but with a different routing key
2. Producer routing strategy: select a routing key by round-robin, at random, or by hashing a message attribute
3. Consumer processing: each queue has one or more consumers, each processing its assigned messages

#### Architecture Diagram:

```mermaid
graph TD
    P[BalancedProducer<br/>Round-robin sending] --> E[Direct Exchange<br/>demo.direct.multi.queue]
    E -->|routing_key: route.1| Q1[Queue<br/>task.queue.1]
    E -->|routing_key: route.2| Q2[Queue<br/>task.queue.2]
    E -->|routing_key: route.3| Q3[Queue<br/>task.queue.3]
    Q1 --> C1[Consumer 1<br/>multi_consumer_1]
    Q2 --> C2[Consumer 2<br/>multi_consumer_2]
    Q3 --> C3[Consumer 3<br/>multi_consumer_3]

    P -.->|Round-robin algorithm| P

    style P fill:#e1f5fe
    style E fill:#f3e5f5
    style Q1 fill:#e8f5e8
    style Q2 fill:#e8f5e8
    style Q3 fill:#e8f5e8
    style C1 fill:#fff3e0
    style C2 fill:#fff3e0
    style C3 fill:#fff3e0
```

#### Producer Code:

```python
import asyncio
import aio_pika
from config import RABBITMQ_URI

class BalancedProducer:
    def __init__(self, exchange_name="demo.direct.multi.queue", queue_count=3):
        self.exchange_name = exchange_name
        self.queue_count = queue_count
        self.current_index = 0  # Round-robin index

    async def connect(self):
        self.connection = await aio_pika.connect_robust(RABBITMQ_URI)
        self.channel = await self.connection.channel()
        self.exchange = await self.channel.declare_exchange(
            self.exchange_name,
            aio_pika.ExchangeType.DIRECT,
            durable=True
        )

    async def publish(self, message: str):
        # Round-robin: advance to the next routing key before each send
        # (the index is incremented first, so the first message goes to route.2)
        self.current_index = (self.current_index + 1) % self.queue_count
        route_key = f"route.{self.current_index + 1}"

        message_obj = aio_pika.Message(
            body=message.encode("utf-8"),
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT
        )

        await self.exchange.publish(message_obj, routing_key=route_key)
        print(f"Message sent: {message} (routed to {route_key})")

    async def close(self):
        # Called by the test script below
        await self.connection.close()
```

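The test script also imports `setup_multi_queue_balance`, which is not shown above; a minimal sketch consistent with the test output (queue `task.queue.i` bound to `route.i`):

```python
async def setup_multi_queue_balance(exchange_name="demo.direct.multi.queue", queue_count=3):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()

    exchange = await channel.declare_exchange(
        exchange_name, aio_pika.ExchangeType.DIRECT, durable=True
    )

    # One queue per routing key: task.queue.i <- route.i
    for i in range(1, queue_count + 1):
        queue = await channel.declare_queue(f"task.queue.{i}", durable=True, auto_delete=False)
        await queue.bind(exchange, routing_key=f"route.{i}")
        print(f"Queue task.queue.{i} bound to routing key: route.{i}")

    await connection.close()
```
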
#### Consumer Code:

```python
async def queue_consumer(queue_name: str, consumer_id: int):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)

    async def on_message(message: aio_pika.IncomingMessage):
        async with message.process():
            content = message.body.decode("utf-8")
            print(f"[Consumer {consumer_id}] Processing message: {content}")
            print(f"[Consumer {consumer_id}] From queue: {queue_name}")
            print(f"[Consumer {consumer_id}] Routing key: {message.routing_key}")
            await asyncio.sleep(1)

    consumer_tag = f"multi_consumer_{consumer_id}_{queue_name}"
    await queue.consume(on_message, consumer_tag=consumer_tag)
    print(f"[Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})")
    await asyncio.Future()
```

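Likewise, a minimal sketch of the imported `start_balanced_consumers` (consumer *i* listens on `task.queue.i`, matching the output below):

```python
async def start_balanced_consumers(queue_count=3):
    # One consumer per queue, all running in parallel
    await asyncio.gather(*(
        queue_consumer(f"task.queue.{i}", i)
        for i in range(1, queue_count + 1)
    ))
```
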
#### Complete Test Code

**Test Run Script:**

```python
#!/usr/bin/env python3
"""
Run Multi-Queue Load Balancing Test
"""

import asyncio
import sys
import os

# Add current directory to the Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from product.direct_multi_publish import setup_multi_queue_balance, BalancedProducer
from comsumer.direct_multi_consumer import start_balanced_consumers


async def run_multi_queue_balance_test():
    """Run multi-queue load balancing test with producer and consumer"""
    print("=== Running Multi-Queue Load Balancing Test ===")

    queue_count = 3

    # Start consumers in the background
    consumer_task = asyncio.create_task(start_balanced_consumers(queue_count=queue_count))

    # Wait for consumers to start
    await asyncio.sleep(1)

    # Set up the topology and publish messages
    await setup_multi_queue_balance(queue_count=queue_count)

    producer = BalancedProducer(queue_count=queue_count)
    await producer.connect()

    for i in range(10):
        await producer.publish(f"Task {i + 1}: Multi-queue load balancing test")
        await asyncio.sleep(0.3)

    await producer.close()

    # Wait for messages to be processed
    await asyncio.sleep(3)

    # Cancel consumers
    consumer_task.cancel()
    print("✅ Multi-queue load balancing test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_multi_queue_balance_test())
```

#### Test Output:

```
=== Running Multi-Queue Load Balancing Test ===
[Consumer 2] Started, listening to queue: task.queue.2 (tag: multi_consumer_2_task.queue.2)
[Consumer 3] Started, listening to queue: task.queue.3 (tag: multi_consumer_3_task.queue.3)
[Consumer 1] Started, listening to queue: task.queue.1 (tag: multi_consumer_1_task.queue.1)

Queue task.queue.1 bound to routing key: route.1
Queue task.queue.2 bound to routing key: route.2
Queue task.queue.3 bound to routing key: route.3

[Consumer 2] Processing message: Task 1: Multi-queue load balancing test
[Consumer 2] From queue: task.queue.2
[Consumer 2] Routing key: route.2

Message sent: Task 1: Multi-queue load balancing test (routed to route.2)
[Consumer 3] Processing message: Task 2: Multi-queue load balancing test
[Consumer 3] From queue: task.queue.3
[Consumer 3] Routing key: route.3

Message sent: Task 2: Multi-queue load balancing test (routed to route.3)
[Consumer 1] Processing message: Task 3: Multi-queue load balancing test
[Consumer 1] From queue: task.queue.1
[Consumer 1] Routing key: route.1

Message sent: Task 3: Multi-queue load balancing test (routed to route.1)
Message sent: Task 4: Multi-queue load balancing test (routed to route.2)
[Consumer 2] Processing message: Task 4: Multi-queue load balancing test
[Consumer 2] From queue: task.queue.2
[Consumer 2] Routing key: route.2

Message sent: Task 5: Multi-queue load balancing test (routed to route.3)
[Consumer 3] Processing message: Task 5: Multi-queue load balancing test
[Consumer 3] From queue: task.queue.3
[Consumer 3] Routing key: route.3

Message sent: Task 6: Multi-queue load balancing test (routed to route.1)
[Consumer 1] Processing message: Task 6: Multi-queue load balancing test
[Consumer 1] From queue: task.queue.1
[Consumer 1] Routing key: route.1

Message sent: Task 7: Multi-queue load balancing test (routed to route.2)
[Consumer 2] Processing message: Task 7: Multi-queue load balancing test
[Consumer 2] From queue: task.queue.2
[Consumer 2] Routing key: route.2

Message sent: Task 8: Multi-queue load balancing test (routed to route.3)
[Consumer 3] Processing message: Task 8: Multi-queue load balancing test
[Consumer 3] From queue: task.queue.3
[Consumer 3] Routing key: route.3

Message sent: Task 9: Multi-queue load balancing test (routed to route.1)
[Consumer 1] Processing message: Task 9: Multi-queue load balancing test
[Consumer 1] From queue: task.queue.1
[Consumer 1] Routing key: route.1

Message sent: Task 10: Multi-queue load balancing test (routed to route.2)
[Consumer 2] Processing message: Task 10: Multi-queue load balancing test
[Consumer 2] From queue: task.queue.2
[Consumer 2] Routing key: route.2

✅ Multi-queue load balancing test completed successfully!
```

### 3.4 Topic Exchange Demo

#### Core Features:

- Routing keys are hierarchical strings with dot-separated words (e.g., order.create.user)
- Supports two wildcards:
  - `*`: matches exactly 1 word (e.g., user.* matches user.login but not user.login.success)
  - `#`: matches 0 or more words (e.g., order.# matches order, order.pay, order.pay.success)

#### Architecture Diagram:

```mermaid
graph TD
    P[Producer<br/>topic_publish] --> E[Topic Exchange<br/>demo.topic]
    E -->|binding: #.critical| Q1[Queue<br/>demo.topic.queue-critical]
    E -->|binding: order.#| Q2[Queue<br/>demo.topic.queue-order]
    E -->|binding: user.login.*| Q3[Queue<br/>demo.topic.queue-user.login]
    Q1 --> C1[CriticalHandler<br/>topic_CriticalHandler]
    Q2 --> C2[OrderHandler<br/>topic_OrderHandler]
    Q3 --> C3[UserLoginHandler<br/>topic_UserLoginHandler]

    subgraph "Routing Key Examples"
        R1[order.create.critical] -.->|Matches #.critical and order.#| Q1
        R1 -.-> Q2
        R2[user.login.success] -.->|Matches user.login.*| Q3
        R3[system.log.info] -.->|No match| X[Message discarded]
    end

    style P fill:#e1f5fe
    style E fill:#f3e5f5
    style Q1 fill:#ffebee
    style Q2 fill:#e8f5e8
    style Q3 fill:#fff3e0
    style C1 fill:#ffebee
    style C2 fill:#e8f5e8
    style C3 fill:#fff3e0
    style X fill:#f5f5f5
```

#### Producer Code:

```python
import asyncio
import aio_pika
from config import RABBITMQ_URI

async def setup_topic_exchange(exchange_name="demo.topic", queue_prefix="demo.topic.queue-"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()

    topic_exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.TOPIC,
        durable=True
    )

    # Define queues and their binding keys (wildcards supported)
    queue_bindings = [
        (f"{queue_prefix}critical", ["#.critical"]),      # Match any routing key ending in .critical
        (f"{queue_prefix}order", ["order.#"]),            # Match all order-prefixed routing keys
        (f"{queue_prefix}user.login", ["user.login.*"])   # Match user.login plus exactly one more word
    ]

    for queue_name, binding_keys in queue_bindings:
        queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)
        for binding_key in binding_keys:
            await queue.bind(topic_exchange, routing_key=binding_key)
        print(f"Queue {queue_name} bound to routing keys: {binding_keys}")

    await connection.close()

async def topic_publish(message: str, routing_key: str, exchange_name: str = "demo.topic"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()

    exchange = await channel.declare_exchange(
        exchange_name,
        aio_pika.ExchangeType.TOPIC,
        durable=True
    )

    message_obj = aio_pika.Message(
        body=message.encode("utf-8"),
        delivery_mode=aio_pika.DeliveryMode.PERSISTENT
    )

    await exchange.publish(message_obj, routing_key=routing_key)
    print(f"Message sent: {message} (routing key: {routing_key})")
    await connection.close()
```

#### Consumer Code:

```python
async def topic_consumer(queue_name: str, consumer_id: str):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)

    async def on_message(message: aio_pika.IncomingMessage):
        async with message.process():
            message_content = message.body.decode("utf-8")
            print(f"[Consumer {consumer_id}] Received message: {message_content}")
            print(f"[Consumer {consumer_id}] Message routing key: {message.routing_key}")
            print(f"[Consumer {consumer_id}] From queue: {queue_name}")
            await asyncio.sleep(1)

    consumer_tag = f"topic_{consumer_id}_{queue_name}"
    await queue.consume(on_message, consumer_tag=consumer_tag)
    print(f"[Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})")
    await asyncio.Future()
```

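A minimal sketch of the imported `start_all_topic_consumers`, consistent with the handler names and queues in the test output:

```python
async def start_all_topic_consumers(queue_prefix="demo.topic.queue-"):
    # One named handler per queue, all running in parallel
    await asyncio.gather(
        topic_consumer(f"{queue_prefix}critical", "CriticalHandler"),
        topic_consumer(f"{queue_prefix}order", "OrderHandler"),
        topic_consumer(f"{queue_prefix}user.login", "UserLoginHandler"),
    )
```
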
#### Complete Test Code

**Test Run Script:**

```python
#!/usr/bin/env python3
"""
Run Topic Exchange Test
"""

import asyncio
import sys
import os

# Add current directory to the Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from product.topic_publish import setup_topic_exchange, topic_publish
from comsumer.topic_consumer import start_all_topic_consumers


async def run_topic_exchange_test():
    """Run topic exchange test with producer and consumer"""
    print("=== Running Topic Exchange Test ===")

    # Start consumers in the background
    consumer_task = asyncio.create_task(start_all_topic_consumers())

    # Wait for consumers to start
    await asyncio.sleep(1)

    # Set up the exchange and publish messages
    await setup_topic_exchange()

    test_messages = [
        ("Order creation failed (critical error)", "order.create.critical"),
        ("User login successful", "user.login.success"),
        ("Order payment completed", "order.pay.success"),
        ("System crash (critical error)", "system.crash.critical"),
        ("User login failed", "user.login.failed"),
        ("Normal system log", "system.log.info")  # Matches no binding key; discarded
    ]

    for msg, routing_key in test_messages:
        await topic_publish(msg, routing_key)
        await asyncio.sleep(0.5)

    # Wait for messages to be processed
    await asyncio.sleep(3)

    # Cancel consumers
    consumer_task.cancel()
    print("✅ Topic exchange test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_topic_exchange_test())
```

#### Test Output:

```
=== Running Topic Exchange Test ===
[Consumer CriticalHandler] Started, listening to queue: demo.topic.queue-critical (tag: topic_CriticalHandler_demo.topic.queue-critical)
[Consumer UserLoginHandler] Started, listening to queue: demo.topic.queue-user.login (tag: topic_UserLoginHandler_demo.topic.queue-user.login)
[Consumer OrderHandler] Started, listening to queue: demo.topic.queue-order (tag: topic_OrderHandler_demo.topic.queue-order)

Queue demo.topic.queue-critical bound to routing keys: ['#.critical']
Queue demo.topic.queue-order bound to routing keys: ['order.#']
Queue demo.topic.queue-user.login bound to routing keys: ['user.login.*']

[Consumer OrderHandler] Received message: Order creation failed (critical error)
[Consumer OrderHandler] Message routing key: order.create.critical
[Consumer OrderHandler] From queue: demo.topic.queue-order

[Consumer CriticalHandler] Received message: Order creation failed (critical error)
[Consumer CriticalHandler] Message routing key: order.create.critical
[Consumer CriticalHandler] From queue: demo.topic.queue-critical

Message sent: Order creation failed (critical error) (routing key: order.create.critical)
[Consumer UserLoginHandler] Received message: User login successful
[Consumer UserLoginHandler] Message routing key: user.login.success
[Consumer UserLoginHandler] From queue: demo.topic.queue-user.login

Message sent: User login successful (routing key: user.login.success)
[Consumer OrderHandler] Received message: Order payment completed
[Consumer OrderHandler] Message routing key: order.pay.success
[Consumer OrderHandler] From queue: demo.topic.queue-order

Message sent: Order payment completed (routing key: order.pay.success)
[Consumer CriticalHandler] Received message: System crash (critical error)
[Consumer CriticalHandler] Message routing key: system.crash.critical
[Consumer CriticalHandler] From queue: demo.topic.queue-critical

Message sent: System crash (critical error) (routing key: system.crash.critical)
[Consumer UserLoginHandler] Received message: User login failed
[Consumer UserLoginHandler] Message routing key: user.login.failed
[Consumer UserLoginHandler] From queue: demo.topic.queue-user.login

Message sent: User login failed (routing key: user.login.failed)
Message sent: Normal system log (routing key: system.log.info)
✅ Topic exchange test completed successfully!
```

### 3.5 Reliable RabbitMQ Producer-Consumer

#### Reliability Guarantee Mechanisms

To ensure messages are consumed reliably, the following aspects need to be addressed (a dead-letter setup sketch follows this list):

**1. Message Persistence**

- Exchange persistence: `durable=True`
- Queue persistence: `durable=True`
- Message persistence: `delivery_mode=PERSISTENT`

**2. Message Acknowledgment Mechanism**

- Context-manager acknowledgment: `async with message.process()` acks on success
- Manual acknowledgment: `message.ack()` / `message.nack()`
- A message should only be acknowledged after it has been processed successfully

**3. Message Idempotency**

- Use a message ID for deduplication
- Keep a record (e.g., in a database) of processed message IDs
- Prevent duplicate processing of the same message

**4. Retry Mechanism**

- Configurable maximum retry count
- Retry inside the consumer to avoid re-queuing the message

**5. Dead Letter Queue**

- A complete fallback for messages that keep failing
- Automatic creation of the dead letter exchange and queue
- Records detailed error information

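Besides publishing to a dead-letter queue explicitly (as the code below does), RabbitMQ can re-route rejected or expired messages automatically via queue arguments. A minimal sketch of declaring a work queue with a dead-letter exchange attached; the `demo.*` names are illustrative, not the ones used in this demo:

```python
import aio_pika

async def declare_with_dead_letter(channel):
    # Dead-letter exchange and queue
    dlx = await channel.declare_exchange("demo.dlx", aio_pika.ExchangeType.DIRECT, durable=True)
    dlq = await channel.declare_queue("demo.dlq", durable=True)
    await dlq.bind(dlx, routing_key="dead")

    # Work queue: messages rejected with nack(requeue=False) or expired by TTL
    # are re-routed by the broker itself to the dead-letter exchange
    work_queue = await channel.declare_queue(
        "demo.work",
        durable=True,
        arguments={
            "x-dead-letter-exchange": "demo.dlx",
            "x-dead-letter-routing-key": "dead",
        },
    )
    return work_queue
```
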
#### Key Code Implementation

**Reliable Producer Code:**

```python
class ReliableProducer:
    """Reliable Message Producer"""

    def _generate_message_id(self, message_data: Dict[str, Any]) -> str:
        """Generate an ID for a message"""
        message_type = message_data.get('type', '')
        content = message_data.get('content', '')

        # For duplicate_test messages, derive a fixed ID from the content so that
        # identical payloads get identical IDs (this is what makes dedup testable)
        if message_type == 'duplicate_test':
            import hashlib
            content_hash = hashlib.md5(content.encode('utf-8')).hexdigest()
            return f"duplicate_{content_hash[:8]}"
        else:
            return f"msg_{asyncio.get_running_loop().time()}"

    async def publish_reliable_message(self, message_data: Dict[str, Any]) -> bool:
        """Publish a reliable message"""
        try:
            # Generate message ID
            message_id = self._generate_message_id(message_data)

            # Add message metadata
            message_data.update({
                'timestamp': datetime.now().isoformat(),
                'message_id': message_id
            })

            # Create a persistent message
            message = aio_pika.Message(
                body=json.dumps(message_data, ensure_ascii=False).encode('utf-8'),
                delivery_mode=aio_pika.DeliveryMode.PERSISTENT,  # Message persistence
                message_id=message_id,
                timestamp=datetime.now()
            )

            # Send the message and wait for broker confirmation
            await self.exchange.publish(message, routing_key="reliable")
            logger.info(f"[Producer] Message sent: {message_id}")
            return True
        except Exception as e:
            logger.error(f"[Producer] Failed to send message: {e}")
            return False
```

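"Wait for confirmation" relies on publisher confirms. In `aio_pika`, confirms are enabled per channel (and are on by default); with confirms on, `exchange.publish(...)` does not return until the broker acknowledges the message. A sketch of explicit wiring, assuming the producer's `connect()` follows the same shape as the earlier demos:

```python
async def connect(self):
    self.connection = await aio_pika.connect_robust(config.rabbitmq_uri)
    # publisher_confirms=True makes publish() await the broker's ack
    self.channel = await self.connection.channel(publisher_confirms=True)
    self.exchange = await self.channel.declare_exchange(
        config.exchange_name, aio_pika.ExchangeType.DIRECT, durable=True
    )
```
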
**Reliable Consumer Code:**

```python
class ReliableConsumer:
    """Reliable Message Consumer"""

    def __init__(self, queue_name: Optional[str] = None, consumer_name: Optional[str] = None):
        self.processed_messages: Set[str] = set()  # IDs of already-processed messages

    async def process_message(self, message: aio_pika.IncomingMessage):
        """Core message processing logic"""
        try:
            # Parse the message
            message_data = json.loads(message.body.decode('utf-8'))
            message_id = message_data.get('message_id')

            # Idempotency check: has this message been processed before?
            if message_id in self.processed_messages:
                logger.warning(f"[Consumer-{self.consumer_name}] 🚫 Duplicate message detected, skipping: {message_id}")
                await message.ack()
                return

            logger.info(f"[Consumer-{self.consumer_name}] Starting to process message: {message_id}")

            # Process the message with in-consumer retries
            success = await self.retry_process_message(message_data, message_id, 0)

            # Only record the message ID after successful processing
            if success:
                self.processed_messages.add(message_id)
                await message.ack()
                logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed and acknowledged")
            else:
                # Processing failed: forward to the dead letter queue, then ack the original
                await self.send_to_dead_letter_queue(message, message_id, "Processing failed")
                await message.ack()
        except Exception as e:
            logger.error(f"[Consumer-{self.consumer_name}] Failed to process message: {e}")
            # Ack even on unexpected errors to avoid endless redelivery
            await message.ack()

    async def retry_process_message(self, message_data: Dict[str, Any], message_id: str, retry_count: int) -> bool:
        """Retry processing a message inside the consumer"""
        max_retries = config.max_retries
        last_error = None

        for attempt in range(max_retries + 1):
            try:
                logger.info(f"[Consumer-{self.consumer_name}] Attempting to process message {message_id}, attempt {attempt + 1}")
                await self.message_handler(message_data)
                logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed successfully")
                return True
            except Exception as e:
                last_error = str(e)
                logger.warning(f"[Consumer-{self.consumer_name}] Message {message_id} failed on attempt {attempt + 1}: {e}")
                if attempt < max_retries:
                    await asyncio.sleep(1)
                else:
                    logger.error(f"[Consumer-{self.consumer_name}] Message {message_id} failed after {max_retries} retries: {last_error}")
        return False
```

**Dead Letter Queue Consumer Code:**

```python
class DeadLetterConsumer:
    """Dead Letter Queue Consumer"""

    async def process_dead_letter_message(self, message: aio_pika.IncomingMessage):
        """Process a dead letter message"""
        try:
            # Parse the dead letter message
            dead_letter_data = json.loads(message.body.decode('utf-8'))
            original_message = dead_letter_data.get('original_message', {})
            error_info = dead_letter_data.get('error_info', 'Unknown')
            message_id = dead_letter_data.get('message_id', 'Unknown')

            # Log the dead letter message details
            logger.error("=" * 50)
            logger.error("[Dead Letter Consumer] Received Dead Letter Message:")
            logger.error(f"[Dead Letter Consumer] Message ID: {message_id}")
            logger.error(f"[Dead Letter Consumer] Message Content: {json.dumps(original_message, ensure_ascii=False, indent=2)}")
            logger.error(f"[Dead Letter Consumer] Error Reason: {error_info}")
            logger.error("=" * 50)

            # Save to database
            await self.save_to_database(original_message, error_info, message_id)

            # Acknowledge the dead letter message
            await message.ack()
            logger.info(f"[Dead Letter Consumer] Dead letter message {message_id} processed")
        except Exception as e:
            logger.error(f"[Dead Letter Consumer] Failed to process dead letter message: {e}")
            await message.nack(requeue=False)
```

#### Complete Test Code

**Test Run Script:**

```python
"""
RabbitMQ Reliable Messaging Test Module
"""

import asyncio
import logging

from reliable_mq import ReliableProducer, ReliableConsumer
from reliable_mq.dead_letter_consumer import DeadLetterConsumer
from reliable_mq.config import config

# Configure logging
logging.basicConfig(
    level=getattr(logging, config.log_level),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


async def run_context_manager_messaging():
    """Test reliable messaging using context managers"""
    logger.info("=== Testing Reliable Messaging with Context Manager ===")

    # Use async context managers
    async with ReliableProducer() as producer:
        async with ReliableConsumer(consumer_name="context_test_consumer") as consumer:
            async with DeadLetterConsumer() as dead_letter_consumer:
                # Start consumers (run in the background)
                consumer_task = asyncio.create_task(consumer.start_consuming())
                dead_letter_task = asyncio.create_task(dead_letter_consumer.start_consuming())

                # Wait for consumers to start
                await asyncio.sleep(1)

                # Send test messages
                test_messages = [
                    {"content": "Important business message 1", "type": "business"},
                    {"content": "System notification message 2", "type": "notification"},
                    {"content": "User action message 3", "type": "user_action"},
                    {"content": "Duplicate message test", "type": "duplicate_test"},
                    {"content": "Duplicate message test", "type": "duplicate_test"},  # Duplicate message
                    {"content": "Message that will fail 1", "type": "will_fail"},    # These messages fail and go to the dead letter queue
                    {"content": "Message that will fail 2", "type": "will_fail"},
                    {"content": "Message that will fail 3", "type": "will_fail"},
                ]

                for msg in test_messages:
                    await producer.publish_reliable_message(msg)
                    await asyncio.sleep(0.5)

                # Wait for message processing to complete
                await asyncio.sleep(30)

                # Cancel the tasks
                consumer_task.cancel()
                dead_letter_task.cancel()


if __name__ == '__main__':
    asyncio.run(run_context_manager_messaging())
```

**Configuration File:**

```python
"""
RabbitMQ Reliable Messaging Configuration Module
"""

import os
from dataclasses import dataclass
from typing import Dict, Any


@dataclass
class Config:
    """Configuration class"""
    # RabbitMQ connection configuration
    rabbitmq_uri: str = "amqp://guest:guest@localhost:5673/"

    # Exchange and queue configuration
    exchange_name: str = "reliable.exchange"
    queue_name: str = "reliable.queue"

    # Dead letter queue configuration
    dead_letter_exchange: str = "reliable.dead.letter.exchange"
    dead_letter_queue: str = "reliable.dead.letter.queue"

    # Retry configuration
    max_retries: int = 3
    message_ttl: int = 300000  # 5 minutes, in milliseconds

    # QoS configuration
    prefetch_count: int = 1

    # Logging configuration
    log_level: str = "INFO"

    def get_connection_config(self) -> Dict[str, Any]:
        """Get connection configuration"""
        return {
            'uri': self.rabbitmq_uri,
            'prefetch_count': self.prefetch_count
        }

    def get_dead_letter_config(self) -> Dict[str, str]:
        """Get dead letter queue configuration"""
        return {
            'dead_letter_exchange': self.dead_letter_exchange,
            'dead_letter_queue': self.dead_letter_queue
        }


# Global configuration instance
config = Config()
```

**Custom Message Handler Function:**

```python
async def default_message_handler(message_data: Dict[str, Any]):
    """Default message handler function"""
    await asyncio.sleep(1)  # Simulate processing time

    message_type = message_data.get('type', '')
    content = message_data.get('content', '')

    # Simulate business logic: messages of type 'will_fail' always raise
    if message_type == 'will_fail':
        raise Exception(f"Simulated business processing failure: {content}")

    logger.info(f"[Consumer] Business logic processing completed: {content}")
```

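How the handler reaches the consumer is not shown; a plausible, hypothetical constructor sketch, assuming it is injected and stored as the `self.message_handler` that `retry_process_message` calls (the real constructor lives in the repo):

```python
# Hypothetical wiring - illustration only
class ReliableConsumer:
    def __init__(self, queue_name=None, consumer_name=None, message_handler=None):
        self.consumer_name = consumer_name
        self.processed_messages = set()
        # retry_process_message() calls self.message_handler(message_data)
        self.message_handler = message_handler or default_message_handler
```
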
#### Test Results

```
2025-09-07 11:25:02,498 - __main__ - INFO - === Testing Reliable Messaging with Context Manager ===
2025-09-07 11:25:02,509 - reliable_mq.reliable_producer - INFO - [Producer] Connected, queue: reliable.queue
2025-09-07 11:25:02,513 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Connected, listening to queue: reliable.queue
2025-09-07 11:25:02,518 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] Connected

2025-09-07 11:25:03,523 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Starting to process message: msg_323632.845174041
2025-09-07 11:25:03,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message content: {'content': 'Important business message 1', 'type': 'business', 'timestamp': '2025-09-07T11:25:03.519240', 'message_id': 'msg_323632.845174041'}
2025-09-07 11:25:03,524 - reliable_mq.reliable_producer - INFO - [Producer] Message sent: msg_323632.845174041 (type: business, content: Important business message 1)
2025-09-07 11:25:04,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Business logic processing completed: Important business message 1
2025-09-07 11:25:04,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message msg_323632.845174041 processed successfully
2025-09-07 11:25:04,525 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message msg_323632.845174041 processed and acknowledged
2025-09-07 11:25:04,525 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Current processed message count: 1

2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - ==================================================
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] 🚫 Duplicate message detected, skipping:
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Message ID: duplicate_090f7015
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Message content: {
    "content": "Duplicate message test",
    "type": "duplicate_test",
    "timestamp": "2025-09-07T11:25:05.546930",
    "message_id": "duplicate_090f7015"
}
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Total processed messages: 4
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - ==================================================

2025-09-07 11:25:14,551 - reliable_mq.reliable_consumer - ERROR - [Consumer-context_test_consumer] Message msg_323635.377526708 failed after 3 retries: Simulated business processing failure: Message that will fail 1
2025-09-07 11:25:14,551 - reliable_mq.reliable_consumer - ERROR - [Consumer-context_test_consumer] Message sent to dead letter queue: msg_323635.377526708, error: Processing failed

2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - ==================================================
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Received Dead Letter Message:
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Message ID: msg_323635.377526708
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Message Content: {
    "content": "Message that will fail 1",
    "type": "will_fail",
    "timestamp": "2025-09-07T11:25:06.051557",
    "message_id": "msg_323635.377526708"
}
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Error Reason: Processing failed
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - ==================================================

2025-09-07 11:25:15,064 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] 💾 Dead letter message saved to database: msg_323635.377526708
2025-09-07 11:25:15,064 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] Database Record: {
    "id": "msg_323635.377526708",
    "original_message": {
        "content": "Message that will fail 1",
        "type": "will_fail",
        "timestamp": "2025-09-07T11:25:06.051557",
        "message_id": "msg_323635.377526708"
    },
    "error_info": "Processing failed",
    "created_at": "2025-09-07T11:25:15.064341",
    "status": "failed"
}

2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - ==================================================
2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Processed Message Statistics:
2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Total Processed: 4
2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Processed Message IDs: ['msg_323632.845174041', 'msg_323633.351571583', 'duplicate_090f7015', 'msg_323633.854272166']
2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - ==================================================
```

#### Architecture Diagram

```mermaid
graph TD
    P[ReliableProducer] --> E[Reliable Exchange<br/>reliable.exchange]
    E --> Q[Reliable Queue<br/>reliable.queue]
    Q --> C[ReliableConsumer]

    C -->|Processing success| ACK[Message ACK]
    C -->|Processing failure| RETRY[Retry Mechanism]
    RETRY -->|Retry success| ACK
    RETRY -->|Retry failure| DLQ[Dead Letter Queue]

    DLQ --> DLC[DeadLetterConsumer]
    DLC --> DB[(Database)]

    subgraph "Reliability Guarantees"
        PERSIST[Message Persistence]
        IDEMPOTENT[Idempotency Check]
        CONFIRM[Publisher Confirmation]
    end

    subgraph "Message Flow"
        MSG1[Normal message] --> SUCCESS[Processing success]
        MSG2[Duplicate message] --> SKIP[Skip processing]
        MSG3[Failed message] --> FAIL[Failed after retry]
    end

    style P fill:#e1f5fe
    style E fill:#f3e5f5
    style Q fill:#e8f5e8
    style C fill:#fff3e0
    style DLQ fill:#ffebee
    style DLC fill:#ffebee
    style DB fill:#f3e5f5
    style ACK fill:#e8f5e8
    style RETRY fill:#fff3e0
```