rabbitmq-test/README_en.md
2025-09-11 09:57:10 +08:00

1. Basic Concepts of MQ

1.1 What is MQ?

MQ stands for Message Queue.

  • A message queue is a container that holds messages while they are in transit
  • It follows the typical producer-consumer model: producers keep publishing messages to the queue, and consumers keep retrieving messages from it
  • Benefits: Producers only need to focus on sending messages and consumers only on receiving them; neither side's business logic intrudes on the other, so producers and consumers are decoupled
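The producer-consumer model described above can be sketched in-process with Python's `asyncio.Queue`. This is only an analogy and not RabbitMQ itself: the function names (`producer`, `consumer`, `run_demo`) and the `None` sentinel are illustrative assumptions.

```python
import asyncio


async def producer(queue: asyncio.Queue, count: int) -> None:
    """Put `count` messages on the queue, then a sentinel meaning 'no more'."""
    for i in range(count):
        await queue.put(f"message-{i}")
    await queue.put(None)  # sentinel (an assumption of this sketch)


async def consumer(queue: asyncio.Queue, received: list) -> None:
    """Take messages off the queue until the sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:
            break
        received.append(item)


async def run_demo(count: int = 3) -> list:
    # Producer and consumer only share the queue; neither calls the other
    queue: asyncio.Queue = asyncio.Queue()
    received: list = []
    await asyncio.gather(producer(queue, count), consumer(queue, received))
    return received


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

The producer never references the consumer, which is the decoupling the bullet points describe; a real broker adds persistence and network transport on top of this idea.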

1.2 Why Use MQ?

A message queue is middleware for asynchronous communication in distributed systems. Its core function is asynchronous message delivery through a store-and-forward mechanism, which addresses tight coupling between systems, traffic spikes (peak shaving), and slow synchronous processing.

Main Functions:

1. Decouple System Components

  • In traditional systems, components usually call each other directly (e.g., Service A directly calls Service B), resulting in tight coupling
  • After introducing message queues, Service A only needs to send messages to the queue without caring who receives or processes them; Service B can get messages from the queue
  • Both parties communicate through message format agreements, independent of each other, reducing system coupling

2. Asynchronous Processing, Improve Efficiency

  • In synchronous processing scenarios, an operation may need to wait for multiple services to complete sequentially, so the total time is the sum of all steps
  • Message queues support asynchronous processing: once the main flow completes, the service only needs to send a message to the queue and can return the result without waiting for the subsequent steps
  • Other services fetch messages from the queue and process them asynchronously, which significantly improves response time and throughput

3. Traffic Peak Shaving, Protect System

  • Sudden traffic (such as e-commerce flash sales, live streaming) may instantly overwhelm backend services
  • Message queues can serve as a "buffer pool": peak requests first enter the queue, backend services consume messages from the queue according to their own processing capacity
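The buffering effect can be illustrated with a small, deterministic simulation. This is a sketch under simplifying assumptions: time is divided into ticks, `arrivals` lists how many requests arrive per tick, and the hypothetical `shave_peak` helper processes at most `capacity_per_tick` requests per tick.

```python
from collections import deque


def shave_peak(arrivals, capacity_per_tick):
    """Return how many requests the backend processes in each tick.

    Requests that exceed the capacity wait in the queue instead of
    hitting the backend all at once.
    """
    queue = deque()
    processed_per_tick = []
    next_id = 0
    tick = 0
    while tick < len(arrivals) or queue:
        # New requests enter the queue first
        if tick < len(arrivals):
            for _ in range(arrivals[tick]):
                queue.append(next_id)
                next_id += 1
        # The backend drains the queue at its own pace
        processed = 0
        while queue and processed < capacity_per_tick:
            queue.popleft()
            processed += 1
        processed_per_tick.append(processed)
        tick += 1
    return processed_per_tick


if __name__ == "__main__":
    # A burst of 10 requests with a capacity of 4 per tick
    print(shave_peak([10, 0, 0], 4))  # [4, 4, 2]
```

The burst of 10 is spread over three ticks, so the backend never sees more than 4 requests at a time; the cost is added latency for the requests that wait.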

4. Data Synchronization and Distribution

  • The same message can be consumed by multiple consumers, achieving "send once, process multiple"
  • Data can be synchronized across systems through message queues, which helps keep the systems (eventually) consistent

5. Retry and Fault Tolerance

  • If a consumer service fails temporarily, the message queue retains unacknowledged messages and redelivers them after the service recovers
  • Combined with retry mechanisms, this handles problems such as network fluctuations and temporary service unavailability
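One possible consumer-side retry mechanism is exponential backoff around the message handler. The sketch below is independent of any broker; `process_with_retry`, `flaky_handler`, and the choice of `ConnectionError` as the retryable error are assumptions made for illustration.

```python
import asyncio


async def process_with_retry(handler, message, max_attempts=3, base_delay=0.1):
    """Call `handler(message)`, retrying on ConnectionError with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await handler(message)
        except ConnectionError:
            if attempt == max_attempts:
                raise  # give up; the caller decides what to do with the message
            # Wait base_delay, 2*base_delay, 4*base_delay, ... between attempts
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


async def demo():
    calls = {"count": 0}

    async def flaky_handler(message):
        # Hypothetical handler: fails twice, then succeeds
        calls["count"] += 1
        if calls["count"] < 3:
            raise ConnectionError("temporary outage")
        return f"processed {message}"

    result = await process_with_retry(flaky_handler, "order-1", base_delay=0)
    return result, calls["count"]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ('processed order-1', 3)
```

When the final attempt fails, the exception propagates, so the surrounding consumer code can decide whether to reject the message or leave it for redelivery.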

2. RabbitMQ

2.1 Introduction

RabbitMQ is an open-source message broker software (also called message queue middleware), written in Erlang, used in distributed systems to implement asynchronous communication and decoupling between applications.

Features:

  • Multiple Protocol Support: Besides AMQP, RabbitMQ also supports STOMP, MQTT and other message protocols
  • High Reliability: Guards against message loss through message persistence, acknowledgments, clustering, and replicated queues (quorum queues; classic mirrored queues in older releases), among other mechanisms
  • Flexible Routing Mechanism: Exchanges provide rich message routing rules
  • Multi-language Client Support: Provides client libraries for multiple programming languages
  • Friendly Management Interface: Has a visual management interface

2.2 Core Components

img_6.png

Producer → Channel → Exchange → Queue → Channel → Consumer

Core Component Details:

1. Producer

  • Definition: Application or service that sends messages
  • Function: Encapsulate business data into messages and send to RabbitMQ server
  • Features: No need to care about the final receiver, only need to specify which exchange to send messages to

2. Consumer

  • Definition: Application or service that receives and processes messages
  • Function: Continuously listens on queues; when a message arrives, takes it from the queue and performs the business processing
  • Features: Consumers subscribe to queues and tell RabbitMQ whether message processing is complete through automatic or manual acknowledgment

3. Queue

  • Definition: Container that stores messages, the final destination of messages
  • Core Properties:
    • Name: Unique identifier of the queue
    • Durable: If true, queue will be retained after RabbitMQ restart
    • Exclusive: If true, queue can only be used by the connection that declared it and is deleted when that connection closes
    • Auto-delete: If true, queue is automatically deleted when the last consumer disconnects

4. Exchange

  • Definition: Receives messages sent by producers and forwards them to one or more queues according to routing rules
  • Function: Similar to a "router", responsible for message routing logic
  • Types:
    • Direct Exchange: Message's Routing Key exactly matches the queue's Binding Key
    • Topic Exchange: Supports wildcards in the Binding Key (* matches exactly one word, # matches zero or more words)
    • Fanout Exchange: Ignores Routing Key, broadcasts messages to all bound queues
    • Headers Exchange: Matches based on message properties (Headers) rather than Routing Key
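The routing rules of the direct, fanout, and headers types can be sketched as a pure-Python simulation. This is a simplified model and not how the broker is implemented: the `route` and `headers_match` helpers, the binding tuple layout, and the queue names are assumptions, and only the `x-match` values `all` (the default) and `any` are handled.

```python
def headers_match(binding_args: dict, headers: dict) -> bool:
    """Compare message headers with a binding's arguments (x-match: all / any)."""
    mode = binding_args.get("x-match", "all")
    wanted = {k: v for k, v in binding_args.items() if not k.startswith("x-")}
    hits = [k in headers and headers[k] == v for k, v in wanted.items()]
    return all(hits) if mode == "all" else any(hits)


def route(exchange_type, bindings, routing_key="", headers=None):
    """Return the queues a message would reach.

    bindings: list of (queue_name, binding_key, binding_args) tuples.
    """
    headers = headers or {}
    matched = []
    for queue_name, binding_key, binding_args in bindings:
        if exchange_type == "fanout":
            ok = True  # routing key is ignored
        elif exchange_type == "direct":
            ok = routing_key == binding_key  # exact match
        elif exchange_type == "headers":
            ok = headers_match(binding_args, headers)
        else:
            raise ValueError(f"unsupported exchange type: {exchange_type}")
        if ok and queue_name not in matched:
            matched.append(queue_name)  # a queue receives the message once
    return matched


if __name__ == "__main__":
    log_bindings = [
        ("q.error", "error", {}),
        ("q.info", "info", {}),
        ("q.info", "debug", {}),
    ]
    print(route("direct", log_bindings, "debug"))    # ['q.info']
    print(route("fanout", log_bindings, "ignored"))  # ['q.error', 'q.info']
```

Topic matching is left out here because its wildcard rules need their own matcher.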

5. Binding

  • Definition: Association relationship between exchange and queue, containing routing rules
  • Function: Tells exchange "which queues need to receive what kind of messages"

6. Connection

  • Definition: TCP connection between producer/consumer and RabbitMQ server
  • Features: TCP connection establishment has high overhead, so connections are usually reused

7. Channel

  • Definition: Virtual connection built on top of TCP connection, the actual channel for message delivery
  • Function: Reduces number of TCP connections, lowers server resource consumption

2.3 Install RabbitMQ

With Management Page

docker run -d --name rabbitmq -p 5673:5672 -p 15673:15672 rabbitmq:management

Without Management Page

docker run -d --name rabbitmq -p 5673:5672 rabbitmq

Access Management Page (requires the rabbitmq:management image): http://localhost:15673

  • Username: guest
  • Password: guest
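The Python examples in this document import `RABBITMQ_URI` from a `config` module that is not shown. A minimal sketch of such a `config.py`, assuming the Docker port mapping and default credentials above, could look like this; the `build_rabbitmq_uri` helper and the `RABBITMQ_URI` environment variable override are assumptions and may differ from the repository's actual file.

```python
import os
from urllib.parse import quote


def build_rabbitmq_uri(user="guest", password="guest", host="localhost", port=5673):
    """Build an AMQP URI for the default virtual host.

    Port 5673 is the host port mapped to the container's 5672 in the
    docker run command; user and password are percent-encoded in case
    they contain special characters.
    """
    return f"amqp://{quote(user, safe='')}:{quote(password, safe='')}@{host}:{port}/"


# Allow an override through the environment (an assumption of this sketch)
RABBITMQ_URI = os.environ.get("RABBITMQ_URI", build_rabbitmq_uri())

if __name__ == "__main__":
    print(build_rabbitmq_uri())  # amqp://guest:guest@localhost:5673/
```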

2.4 RabbitMQ Management Page

The management page provides rich monitoring and management functions:

  • Overview: Server overview, including connection count, queue count, message rate, etc.
  • Connections: Display all client connection information
  • Channels: Display channel information for each connection
  • Exchanges: Manage exchanges
  • Queues: Manage queues
  • Admin: User and permission management

3. Python Integration

3.1 Fanout Exchange Demo

Core Features:

  • Ignores the routing key: whatever value is set has no effect on routing
  • Messages will be broadcast to all queues bound to it
  • Suitable for scenarios requiring "one-to-many" notifications (such as system notifications, log broadcasting)

Architecture Diagram:

img_1.png

graph TD
    P[Producer<br/>fanout_publish] --> E[Fanout Exchange<br/>demo.fanout]
    E --> Q1[Queue<br/>demo.fanout.queue-0]
    E --> Q2[Queue<br/>demo.fanout.queue-1]
    E --> Q3[Queue<br/>demo.fanout.queue-2]
    Q1 --> C1[Consumer 1<br/>fanout_consumer_1]
    Q2 --> C2[Consumer 2<br/>fanout_consumer_2]
    Q3 --> C3[Consumer 3<br/>fanout_consumer_3]
    
    style P fill:#e1f5fe
    style E fill:#f3e5f5
    style Q1 fill:#e8f5e8
    style Q2 fill:#e8f5e8
    style Q3 fill:#e8f5e8
    style C1 fill:#fff3e0
    style C2 fill:#fff3e0
    style C3 fill:#fff3e0

Producer Code:

import asyncio
import aio_pika
from config import RABBITMQ_URI

async def setup_fanout_exchange(exchange_name="demo.fanout", queue_name_prefix="demo.fanout.queue-"):
    # Establish connection; "async with" closes it once setup is done
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    async with connection:
        channel = await connection.channel()

        # 1. Declare Fanout type exchange
        fanout_exchange = await channel.declare_exchange(
            exchange_name,
            aio_pika.ExchangeType.FANOUT,
            durable=True  # Exchange persistence
        )

        # 2. Define list of queue names to bind
        queue_names = [queue_name_prefix + str(i) for i in range(3)]

        # 3. Loop to create queues and bind to exchange
        for name in queue_names:
            queue = await channel.declare_queue(
                name,
                durable=True,
                auto_delete=False
            )
            # Bind queue to Fanout exchange (the routing key is ignored)
            await queue.bind(fanout_exchange, routing_key="")

async def fanout_publish(message: str = "", exchange_name: str = "demo.fanout"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    async with connection:  # Close the connection even if publishing fails
        channel = await connection.channel()

        fanout_exchange = await channel.declare_exchange(
            exchange_name,
            aio_pika.ExchangeType.FANOUT,
            durable=True
        )

        # Use a separate name so the str argument is not shadowed
        message_obj = aio_pika.Message(
            body=message.encode("utf-8"),
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT  # Message persistence
        )

        # Send message to Fanout exchange
        await fanout_exchange.publish(message_obj, routing_key="")

Consumer Code:

import asyncio
import aio_pika
from config import RABBITMQ_URI

async def fanout_consumer(queue_name: str, consumer_id: int):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    queue = await channel.declare_queue(
        queue_name,
        durable=True,
        auto_delete=False
    )

    async def on_message_received(message: aio_pika.IncomingMessage):
        async with message.process():
            message_content = message.body.decode("utf-8")
            print(f"[Fanout Consumer {consumer_id}] Received broadcast message:")
            print(f"  Listening queue: {queue_name}")
            print(f"  Message content: {message_content}")
            print(f"  Message persistence: {'Yes' if message.delivery_mode == 2 else 'No'}")
            await asyncio.sleep(1)

    consumer_tag = f"fanout_consumer_{consumer_id}_{queue_name}"
    await queue.consume(on_message_received, consumer_tag=consumer_tag)
    print(f"[Fanout Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})")
    await asyncio.Future()

Complete Test Code

Test Run Script:

#!/usr/bin/env python3
"""
Run Fanout Exchange Test
"""

import asyncio
import sys
import os

# Add current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from product.fanout_publish import fanout_publish, setup_fanout_exchange
from comsumer.fanout_consumer import start_all_fanout_consumers


async def run_fanout_test():
    """Run fanout exchange test with producer and consumer"""
    print("=== Running Fanout Exchange Test ===")
    
    # Start consumer in background
    consumer_task = asyncio.create_task(start_all_fanout_consumers())
    
    # Wait for consumer to start
    await asyncio.sleep(1)
    
    # Setup and publish messages
    await setup_fanout_exchange("demo.fanout", "demo.fanout.queue-")
    await fanout_publish(message="hello world", exchange_name="demo.fanout")
    await fanout_publish(message="test message 2", exchange_name="demo.fanout")
    await fanout_publish(message="test message 3", exchange_name="demo.fanout")
    
    # Wait for messages to be processed
    await asyncio.sleep(3)
    
    # Cancel consumer
    consumer_task.cancel()
    print("✅ Fanout test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_fanout_test())

Test Output:

=== Running Fanout Exchange Test ===
[Fanout Consumer 2] Started, listening to queue: demo.fanout.queue-1 (tag: fanout_consumer_2_demo.fanout.queue-1)
[Fanout Consumer 1] Started, listening to queue: demo.fanout.queue-0 (tag: fanout_consumer_1_demo.fanout.queue-0)
[Fanout Consumer 3] Started, listening to queue: demo.fanout.queue-2 (tag: fanout_consumer_3_demo.fanout.queue-2)

[Fanout Consumer 2] Received broadcast message:
  Listening queue: demo.fanout.queue-1
  Message content: hello world
  Message persistence: Yes
[Fanout Consumer 1] Received broadcast message:
  Listening queue: demo.fanout.queue-0
  Message content: hello world
  Message persistence: Yes
[Fanout Consumer 3] Received broadcast message:
  Listening queue: demo.fanout.queue-2
  Message content: hello world
  Message persistence: Yes
[Fanout Consumer 3] Received broadcast message:
  Listening queue: demo.fanout.queue-2
  Message content: test message 2
  Message persistence: Yes
[Fanout Consumer 1] Received broadcast message:
  Listening queue: demo.fanout.queue-0
  Message content: test message 2
  Message persistence: Yes
[Fanout Consumer 2] Received broadcast message:
  Listening queue: demo.fanout.queue-1
  Message content: test message 2
  Message persistence: Yes
[Fanout Consumer 1] Received broadcast message:
  Listening queue: demo.fanout.queue-0
  Message content: test message 3
  Message persistence: Yes
[Fanout Consumer 2] Received broadcast message:
  Listening queue: demo.fanout.queue-1
  Message content: test message 3
  Message persistence: Yes
[Fanout Consumer 3] Received broadcast message:
  Listening queue: demo.fanout.queue-2
  Message content: test message 3
  Message persistence: Yes
✅ Fanout test completed successfully!

3.2 Direct Exchange Demo

Core Features:

  • Based on exact matching between routing key and binding key
  • Messages are only routed to a queue when the message's routing key exactly matches the queue's binding key
  • Suitable for scenarios requiring precise routing (such as log level distinction: error, warning, info routed to different queues)

Architecture Diagram:

graph TD
    P[Producer<br/>direct_publish] --> E[Direct Exchange<br/>demo.direct]
    E -->|routing_key: error| Q1[Queue<br/>demo.direct.queue-error]
    E -->|routing_key: warning| Q2[Queue<br/>demo.direct.queue-warning]
    E -->|routing_key: info| Q3[Queue<br/>demo.direct.queue-info]
    E -->|routing_key: debug| Q3
    Q1 --> C1[Error Level Consumer<br/>direct_error_level]
    Q2 --> C2[Warning Level Consumer<br/>direct_warning_level]
    Q3 --> C3[Info/Debug Level Consumer<br/>direct_info/debug_level]
    
    style P fill:#e1f5fe
    style E fill:#f3e5f5
    style Q1 fill:#ffebee
    style Q2 fill:#fff3e0
    style Q3 fill:#e8f5e8
    style C1 fill:#ffebee
    style C2 fill:#fff3e0
    style C3 fill:#e8f5e8

Producer Code:

import aio_pika
from config import RABBITMQ_URI

async def setup_direct_exchange(exchange_name="demo.direct", queue_prefix="demo.direct.queue-"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    async with connection:  # Close the connection once setup is done
        channel = await connection.channel()

        direct_exchange = await channel.declare_exchange(
            exchange_name,
            aio_pika.ExchangeType.DIRECT,
            durable=True
        )

        # Define queues and corresponding binding keys
        queue_bindings = [
            (f"{queue_prefix}error", ["error"]),        # Handle error level messages
            (f"{queue_prefix}warning", ["warning"]),    # Handle warning level messages
            (f"{queue_prefix}info", ["info", "debug"])  # Handle info and debug level messages
        ]

        for queue_name, binding_keys in queue_bindings:
            queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)
            for binding_key in binding_keys:
                await queue.bind(direct_exchange, routing_key=binding_key)
            print(f"Queue {queue_name} bound to routing keys: {binding_keys}")

async def direct_publish(message: str, routing_key: str, exchange_name: str = "demo.direct"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    async with connection:  # Avoid leaking one connection per published message
        channel = await connection.channel()

        exchange = await channel.declare_exchange(
            exchange_name,
            aio_pika.ExchangeType.DIRECT,
            durable=True
        )

        message_obj = aio_pika.Message(
            body=message.encode("utf-8"),
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT
        )

        await exchange.publish(message_obj, routing_key=routing_key)
        print(f"Message sent: {message} (routing key: {routing_key})")

Consumer Code:

async def direct_consumer(queue_name: str, consumer_label: str):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)

    async def on_message_received(message: aio_pika.IncomingMessage):
        async with message.process():
            message_content = message.body.decode("utf-8")
            print(f"[{consumer_label} Consumer] Received message:")
            print(f"  Queue name: {queue_name}")
            print(f"  Message content: {message_content}")
            print(f"  Message routing key: {message.routing_key}")
            # Note: loop.time() is the event loop's monotonic clock (a timestamp), not a duration
            print(f"  Processing time: {asyncio.get_running_loop().time():.2f}s")

            # Simulate different processing times for different level messages
            if "error" in queue_name:
                await asyncio.sleep(2)
            elif "warning" in queue_name:
                await asyncio.sleep(1)
            elif "info" in queue_name:
                await asyncio.sleep(0.5)

    consumer_tag = f"direct_{consumer_label.lower().replace(' ', '_')}_{queue_name}"
    await queue.consume(on_message_received, consumer_tag=consumer_tag)
    print(f"[{consumer_label} Consumer] Started, listening to queue: {queue_name} (tag: {consumer_tag})")
    await asyncio.Future()

Complete Test Code

Test Run Script:

#!/usr/bin/env python3
"""
Run Direct Exchange Test
"""

import asyncio
import sys
import os

# Add current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from product.direct_publish import setup_direct_exchange, direct_publish
from comsumer.direct_consumer import start_all_direct_consumers


async def run_direct_exchange_test():
    """Run direct exchange test with producer and consumer"""
    print("=== Running Direct Exchange Test ===")
    
    # Start consumer in background
    consumer_task = asyncio.create_task(start_all_direct_consumers())
    
    # Wait for consumer to start
    await asyncio.sleep(1)
    
    # Setup exchange and publish messages
    await setup_direct_exchange()
    
    test_messages = [
        ("System crash, unable to start", "error"),  # Route to error queue
        ("Disk space insufficient", "warning"),  # Route to warning queue
        ("User login successful", "info"),  # Route to info queue
        ("Debug info: Database connection successful", "debug")  # Route to info queue
    ]

    for msg, routing_key in test_messages:
        await direct_publish(msg, routing_key)
        await asyncio.sleep(0.5)
    
    # Wait for messages to be processed
    await asyncio.sleep(3)
    
    # Cancel consumer
    consumer_task.cancel()
    print("✅ Direct exchange test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_direct_exchange_test())

Test Output:

=== Running Direct Exchange Test ===
[Info/Debug Level Consumer] Started, listening to queue: demo.direct.queue-info (tag: direct_info/debug_level_demo.direct.queue-info)
[Warning Level Consumer] Started, listening to queue: demo.direct.queue-warning (tag: direct_warning_level_demo.direct.queue-warning)
[Error Level Consumer] Started, listening to queue: demo.direct.queue-error (tag: direct_error_level_demo.direct.queue-error)

Queue demo.direct.queue-error bound to routing keys: ['error']
Queue demo.direct.queue-warning bound to routing keys: ['warning']
Queue demo.direct.queue-info bound to routing keys: ['info', 'debug']

[Error Level Consumer] Received message:
  Queue name: demo.direct.queue-error
  Message content: System crash, unable to start
  Message routing key: error
  Processing time: 322774.03s

Message sent: System crash, unable to start (routing key: error)
[Warning Level Consumer] Received message:
  Queue name: demo.direct.queue-warning
  Message content: Disk space insufficient
  Message routing key: warning
  Processing time: 322774.54s

Message sent: Disk space insufficient (routing key: warning)
[Info/Debug Level Consumer] Received message:
  Queue name: demo.direct.queue-info
  Message content: User login successful
  Message routing key: info
  Processing time: 322775.06s

Message sent: User login successful (routing key: info)
[Info/Debug Level Consumer] Received message:
  Queue name: demo.direct.queue-info
  Message content: Debug info: Database connection successful
  Message routing key: debug
  Processing time: 322775.57s

Message sent: Debug info: Database connection successful (routing key: debug)
✅ Direct exchange test completed successfully!

3.3 Direct Exchange Demo (Load Balancing)

Implementation Principle:

  1. Create multiple queues: Each queue is bound to the same Direct Exchange but uses different routing keys
  2. Producer routing strategy: Select a routing key through round-robin, random, or hash based on message characteristics
  3. Consumer processing: Each queue corresponds to one or more consumers, each processing assigned messages
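The three routing strategies named in step 2 can be sketched as small helpers. The function names and the `route.N` key format (taken from the diagram below) are illustrative assumptions; this round-robin variant starts at `route.1`, which is an arbitrary choice.

```python
import hashlib
import itertools
import random


def make_round_robin(queue_count: int):
    """Return a function that cycles through route.1 .. route.N."""
    counter = itertools.cycle(range(1, queue_count + 1))
    return lambda: f"route.{next(counter)}"


def random_key(queue_count: int, rng=random) -> str:
    """Pick a routing key uniformly at random."""
    return f"route.{rng.randint(1, queue_count)}"


def hashed_key(message_key: str, queue_count: int) -> str:
    """Map a message attribute (e.g. an order ID) to a stable routing key.

    A cryptographic digest is used instead of the built-in hash(), which
    is randomized per process for strings.
    """
    digest = hashlib.sha256(message_key.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], "big") % queue_count
    return f"route.{index + 1}"


if __name__ == "__main__":
    next_key = make_round_robin(3)
    print([next_key() for _ in range(4)])  # ['route.1', 'route.2', 'route.3', 'route.1']
    print(hashed_key("order-42", 3) == hashed_key("order-42", 3))  # True
```

Hashing keeps all messages with the same attribute on one queue, which preserves their relative order; round-robin spreads load most evenly but gives no such guarantee.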

Architecture Diagram:

graph TD
    P[BalancedProducer<br/>Round-robin sending] --> E[Direct Exchange<br/>demo.direct.multi.queue]
    E -->|routing_key: route.1| Q1[Queue<br/>task.queue.1]
    E -->|routing_key: route.2| Q2[Queue<br/>task.queue.2]
    E -->|routing_key: route.3| Q3[Queue<br/>task.queue.3]
    Q1 --> C1[Consumer 1<br/>multi_consumer_1]
    Q2 --> C2[Consumer 2<br/>multi_consumer_2]
    Q3 --> C3[Consumer 3<br/>multi_consumer_3]
    
    P -.->|Round-robin algorithm| P
    
    style P fill:#e1f5fe
    style E fill:#f3e5f5
    style Q1 fill:#e8f5e8
    style Q2 fill:#e8f5e8
    style Q3 fill:#e8f5e8
    style C1 fill:#fff3e0
    style C2 fill:#fff3e0
    style C3 fill:#fff3e0

Producer Code:

import aio_pika
from config import RABBITMQ_URI

class BalancedProducer:
    def __init__(self, exchange_name="demo.direct.multi.queue", queue_count=3):
        self.exchange_name = exchange_name
        self.queue_count = queue_count
        self.current_index = 0  # Round-robin index

    async def connect(self):
        self.connection = await aio_pika.connect_robust(RABBITMQ_URI)
        self.channel = await self.connection.channel()
        self.exchange = await self.channel.declare_exchange(
            self.exchange_name,
            aio_pika.ExchangeType.DIRECT,
            durable=True
        )

    async def publish(self, message: str):
        # Round-robin algorithm: advance to the next routing key before each send,
        # so with the initial index 0 the first message goes to route.2
        self.current_index = (self.current_index + 1) % self.queue_count
        route_key = f"route.{self.current_index + 1}"

        message_obj = aio_pika.Message(
            body=message.encode("utf-8"),
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT
        )

        await self.exchange.publish(message_obj, routing_key=route_key)
        print(f"Message sent: {message} (routed to {route_key})")

    async def close(self):
        # The test script calls close(); it is not shown in this excerpt, so this is a minimal version
        await self.connection.close()

Consumer Code:

async def queue_consumer(queue_name: str, consumer_id: int):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)

    async def on_message(message: aio_pika.IncomingMessage):
        async with message.process():
            content = message.body.decode("utf-8")
            print(f"[Consumer {consumer_id}] Processing message: {content}")
            print(f"[Consumer {consumer_id}] From queue: {queue_name}")
            print(f"[Consumer {consumer_id}] Routing key: {message.routing_key}")
            await asyncio.sleep(1)

    consumer_tag = f"multi_consumer_{consumer_id}_{queue_name}"
    await queue.consume(on_message, consumer_tag=consumer_tag)
    print(f"[Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})")
    await asyncio.Future()

Complete Test Code

Test Run Script:

#!/usr/bin/env python3
"""
Run Multi-Queue Load Balancing Test
"""

import asyncio
import sys
import os

# Add current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from product.direct_multi_publish import setup_multi_queue_balance, BalancedProducer
from comsumer.direct_multi_consumer import start_balanced_consumers


async def run_multi_queue_balance_test():
    """Run multi-queue load balancing test with producer and consumer"""
    print("=== Running Multi-Queue Load Balancing Test ===")
    
    queue_count = 3
    
    # Start consumer in background
    consumer_task = asyncio.create_task(start_balanced_consumers(queue_count=queue_count))
    
    # Wait for consumer to start
    await asyncio.sleep(1)
    
    # Setup and publish messages
    await setup_multi_queue_balance(queue_count=queue_count)
    
    producer = BalancedProducer(queue_count=queue_count)
    await producer.connect()
    
    for i in range(10):
        await producer.publish(f"Task {i + 1}: Multi-queue load balancing test")
        await asyncio.sleep(0.3)
    
    await producer.close()
    
    # Wait for messages to be processed
    await asyncio.sleep(3)
    
    # Cancel consumer
    consumer_task.cancel()
    print("✅ Multi-queue load balancing test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_multi_queue_balance_test())

Test Output:

=== Running Multi-Queue Load Balancing Test ===
[Consumer 2] Started, listening to queue: task.queue.2 (tag: multi_consumer_2_task.queue.2)
[Consumer 3] Started, listening to queue: task.queue.3 (tag: multi_consumer_3_task.queue.3)
[Consumer 1] Started, listening to queue: task.queue.1 (tag: multi_consumer_1_task.queue.1)

Queue task.queue.1 bound to routing key: route.1
Queue task.queue.2 bound to routing key: route.2
Queue task.queue.3 bound to routing key: route.3

[Consumer 2] Processing message: Task 1: Multi-queue load balancing test
[Consumer 2] From queue: task.queue.2
[Consumer 2] Routing key: route.2

Message sent: Task 1: Multi-queue load balancing test (routed to route.2)
[Consumer 3] Processing message: Task 2: Multi-queue load balancing test
[Consumer 3] From queue: task.queue.3
[Consumer 3] Routing key: route.3

Message sent: Task 2: Multi-queue load balancing test (routed to route.3)
[Consumer 1] Processing message: Task 3: Multi-queue load balancing test
[Consumer 1] From queue: task.queue.1
[Consumer 1] Routing key: route.1

Message sent: Task 3: Multi-queue load balancing test (routed to route.1)
Message sent: Task 4: Multi-queue load balancing test (routed to route.2)
[Consumer 2] Processing message: Task 4: Multi-queue load balancing test
[Consumer 2] From queue: task.queue.2
[Consumer 2] Routing key: route.2

Message sent: Task 5: Multi-queue load balancing test (routed to route.3)
[Consumer 3] Processing message: Task 5: Multi-queue load balancing test
[Consumer 3] From queue: task.queue.3
[Consumer 3] Routing key: route.3

Message sent: Task 6: Multi-queue load balancing test (routed to route.1)
[Consumer 1] Processing message: Task 6: Multi-queue load balancing test
[Consumer 1] From queue: task.queue.1
[Consumer 1] Routing key: route.1

Message sent: Task 7: Multi-queue load balancing test (routed to route.2)
[Consumer 2] Processing message: Task 7: Multi-queue load balancing test
[Consumer 2] From queue: task.queue.2
[Consumer 2] Routing key: route.2

Message sent: Task 8: Multi-queue load balancing test (routed to route.3)
[Consumer 3] Processing message: Task 8: Multi-queue load balancing test
[Consumer 3] From queue: task.queue.3
[Consumer 3] Routing key: route.3

Message sent: Task 9: Multi-queue load balancing test (routed to route.1)
[Consumer 1] Processing message: Task 9: Multi-queue load balancing test
[Consumer 1] From queue: task.queue.1
[Consumer 1] Routing key: route.1

Message sent: Task 10: Multi-queue load balancing test (routed to route.2)
[Consumer 2] Processing message: Task 10: Multi-queue load balancing test
[Consumer 2] From queue: task.queue.2
[Consumer 2] Routing key: route.2

✅ Multi-queue load balancing test completed successfully!

3.4 Topic Exchange Demo

Core Features:

  • Routing keys use hierarchical strings (separated by ., such as order.create.user)
  • Supports two wildcards:
    • *: Matches 1 level (e.g., user.* can match user.login but not user.login.success)
    • #: Matches 0 or more levels (e.g., order.# can match order, order.pay, order.pay.success)
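The wildcard rules above can be expressed as a small matcher. This is a minimal sketch of the matching semantics, not the broker's implementation; `topic_matches`, `matching_queues`, and the `BINDINGS` table (which mirrors the bindings used in this demo) are names introduced for illustration.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key.

    '*' matches exactly one word, '#' matches zero or more words;
    words are separated by '.'.
    """
    p = pattern.split(".") if pattern else []
    k = routing_key.split(".") if routing_key else []

    def match(i: int, j: int) -> bool:
        if i == len(p):
            return j == len(k)  # pattern used up: key must be used up too
        if p[i] == "#":
            # '#' may absorb zero or more of the remaining words
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j < len(k) and (p[i] == "*" or p[i] == k[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


# Bindings used by this demo (queue name -> binding key)
BINDINGS = {
    "demo.topic.queue-critical": "#.critical",
    "demo.topic.queue-order": "order.#",
    "demo.topic.queue-user.login": "user.login.*",
}


def matching_queues(routing_key: str) -> list:
    return [q for q, pattern in BINDINGS.items() if topic_matches(pattern, routing_key)]


if __name__ == "__main__":
    print(matching_queues("order.create.critical"))
    # ['demo.topic.queue-critical', 'demo.topic.queue-order']
    print(matching_queues("system.log.info"))  # []
```

A routing key that matches no binding, such as `system.log.info` here, reaches no queue.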

Architecture Diagram:

graph TD
    P[Producer<br/>topic_publish] --> E[Topic Exchange<br/>demo.topic]
    E -->|binding: #.critical| Q1[Queue<br/>demo.topic.queue-critical]
    E -->|binding: order.#| Q2[Queue<br/>demo.topic.queue-order]
    E -->|binding: user.login.*| Q3[Queue<br/>demo.topic.queue-user.login]
    Q1 --> C1[CriticalHandler<br/>topic_CriticalHandler]
    Q2 --> C2[OrderHandler<br/>topic_OrderHandler]
    Q3 --> C3[UserLoginHandler<br/>topic_UserLoginHandler]
    
    subgraph "Routing Key Examples"
        R1[order.create.critical] -.->|Matches #.critical and order.#| Q1
        R1 -.-> Q2
        R2[user.login.success] -.->|Matches user.login.*| Q3
        R3[system.log.info] -.->|No match| X[Message discarded]
    end
    
    style P fill:#e1f5fe
    style E fill:#f3e5f5
    style Q1 fill:#ffebee
    style Q2 fill:#e8f5e8
    style Q3 fill:#fff3e0
    style C1 fill:#ffebee
    style C2 fill:#e8f5e8
    style C3 fill:#fff3e0
    style X fill:#f5f5f5

Producer Code:

import aio_pika
from config import RABBITMQ_URI

async def setup_topic_exchange(exchange_name="demo.topic", queue_prefix="demo.topic.queue-"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    async with connection:  # Close the connection once setup is done
        channel = await connection.channel()

        topic_exchange = await channel.declare_exchange(
            exchange_name,
            aio_pika.ExchangeType.TOPIC,
            durable=True
        )

        # Define queues and corresponding binding keys (supports wildcards)
        queue_bindings = [
            (f"{queue_prefix}critical", ["#.critical"]),      # Match any routing key ending in critical
            (f"{queue_prefix}order", ["order.#"]),            # Match all order-prefixed routing keys
            (f"{queue_prefix}user.login", ["user.login.*"])   # Match user.login plus exactly one more word
        ]

        for queue_name, binding_keys in queue_bindings:
            queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)
            for binding_key in binding_keys:
                await queue.bind(topic_exchange, routing_key=binding_key)
            print(f"Queue {queue_name} bound to routing keys: {binding_keys}")

async def topic_publish(message: str, routing_key: str, exchange_name: str = "demo.topic"):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    async with connection:  # Avoid leaking one connection per published message
        channel = await connection.channel()

        exchange = await channel.declare_exchange(
            exchange_name,
            aio_pika.ExchangeType.TOPIC,
            durable=True
        )

        message_obj = aio_pika.Message(
            body=message.encode("utf-8"),
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT
        )

        await exchange.publish(message_obj, routing_key=routing_key)
        print(f"Message sent: {message} (routing key: {routing_key})")

Consumer Code:

async def topic_consumer(queue_name: str, consumer_id: str):
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)

    async def on_message(message: aio_pika.IncomingMessage):
        async with message.process():
            message_content = message.body.decode("utf-8")
            print(f"[Consumer {consumer_id}] Received message: {message_content}")
            print(f"[Consumer {consumer_id}] Message routing key: {message.routing_key}")
            print(f"[Consumer {consumer_id}] From queue: {queue_name}")
            await asyncio.sleep(1)

    consumer_tag = f"topic_{consumer_id}_{queue_name}"
    await queue.consume(on_message, consumer_tag=consumer_tag)
    print(f"[Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})")
    await asyncio.Future()

Complete Test Code

Test Run Script:

#!/usr/bin/env python3
"""
Run Topic Exchange Test
"""

import asyncio
import sys
import os

# Add current directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from product.topic_publish import setup_topic_exchange, topic_publish
from comsumer.topic_consumer import start_all_topic_consumers


async def run_topic_exchange_test():
    """Run topic exchange test with producer and consumer"""
    print("=== Running Topic Exchange Test ===")
    
    # Start consumer in background
    consumer_task = asyncio.create_task(start_all_topic_consumers())
    
    # Wait for consumer to start
    await asyncio.sleep(1)
    
    # Setup exchange and publish messages
    await setup_topic_exchange()
    
    test_messages = [
        ("Order creation failed (critical error)", "order.create.critical"),
        ("User login successful", "user.login.success"),
        ("Order payment completed", "order.pay.success"),
        ("System crash (critical error)", "system.crash.critical"),
        ("User login failed", "user.login.failed"),
        ("Normal system log", "system.log.info")  # Won't match any binding key, will be discarded
    ]

    for msg, routing_key in test_messages:
        await topic_publish(msg, routing_key)
        await asyncio.sleep(0.5)
    
    # Wait for messages to be processed
    await asyncio.sleep(3)
    
    # Cancel consumer
    consumer_task.cancel()
    print("✅ Topic exchange test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_topic_exchange_test())

Test Output:

=== Running Topic Exchange Test ===
[Consumer CriticalHandler] Started, listening to queue: demo.topic.queue-critical (tag: topic_CriticalHandler_demo.topic.queue-critical)
[Consumer UserLoginHandler] Started, listening to queue: demo.topic.queue-user.login (tag: topic_UserLoginHandler_demo.topic.queue-user.login)
[Consumer OrderHandler] Started, listening to queue: demo.topic.queue-order (tag: topic_OrderHandler_demo.topic.queue-order)

Queue demo.topic.queue-critical bound to routing keys: ['#.critical']
Queue demo.topic.queue-order bound to routing keys: ['order.#']
Queue demo.topic.queue-user.login bound to routing keys: ['user.login.*']

[Consumer OrderHandler] Received message: Order creation failed (critical error)
[Consumer OrderHandler] Message routing key: order.create.critical
[Consumer OrderHandler] From queue: demo.topic.queue-order

[Consumer CriticalHandler] Received message: Order creation failed (critical error)
[Consumer CriticalHandler] Message routing key: order.create.critical
[Consumer CriticalHandler] From queue: demo.topic.queue-critical

Message sent: Order creation failed (critical error) (routing key: order.create.critical)
[Consumer UserLoginHandler] Received message: User login successful
[Consumer UserLoginHandler] Message routing key: user.login.success
[Consumer UserLoginHandler] From queue: demo.topic.queue-user.login

Message sent: User login successful (routing key: user.login.success)
[Consumer OrderHandler] Received message: Order payment completed
[Consumer OrderHandler] Message routing key: order.pay.success
[Consumer OrderHandler] From queue: demo.topic.queue-order

Message sent: Order payment completed (routing key: order.pay.success)
[Consumer CriticalHandler] Received message: System crash (critical error)
[Consumer CriticalHandler] Message routing key: system.crash.critical
[Consumer CriticalHandler] From queue: demo.topic.queue-critical

Message sent: System crash (critical error) (routing key: system.crash.critical)
[Consumer UserLoginHandler] Received message: User login failed
[Consumer UserLoginHandler] Message routing key: user.login.failed
[Consumer UserLoginHandler] From queue: demo.topic.queue-user.login

Message sent: User login failed (routing key: user.login.failed)
Message sent: Normal system log (routing key: system.log.info)
✅ Topic exchange test completed successfully!
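
The routing seen in the output above can be reproduced without a broker. The following is a minimal sketch of topic-exchange matching, assuming the standard rules (`*` matches exactly one word, `#` matches zero or more words). The helper names `topic_matches` and `matching_queues` are hypothetical and not part of this repository or of aio_pika.

```python
from typing import Dict, List


def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key matches binding_key under topic-exchange rules."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            # Pattern exhausted: it matches only if every word was consumed
            return w == len(words)
        if pattern[p] == "#":
            # '#' absorbs zero or more words: try every possible split point
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            # Words exhausted but the pattern still expects one more
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            # '*' or an equal literal consumes exactly one word
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


def matching_queues(bindings: Dict[str, List[str]], routing_key: str) -> List[str]:
    """List the queues whose binding keys match the routing key."""
    return [
        queue
        for queue, keys in bindings.items()
        if any(topic_matches(key, routing_key) for key in keys)
    ]


# Bindings copied from the test output above
BINDINGS = {
    "demo.topic.queue-critical": ["#.critical"],
    "demo.topic.queue-order": ["order.#"],
    "demo.topic.queue-user.login": ["user.login.*"],
}

for key in ("order.create.critical", "user.login.success", "system.log.info"):
    print(key, "->", matching_queues(BINDINGS, key))
```

With these bindings, `order.create.critical` reaches both the critical and the order queue, while `system.log.info` matches nothing, which is why no consumer logs it in the test run.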

3.5 Reliable RabbitMQ Producer-Consumer

Reliability Guarantee Mechanisms

To ensure message consumption reliability, we need to address the following aspects:

1. Message Persistence

  • Exchange persistence: durable=True
  • Queue persistence: durable=True
  • Message persistence: delivery_mode=PERSISTENT

2. Message Acknowledgment Mechanism

  • Context-managed acknowledgment: async with message.process() acks when the block exits normally and rejects the message if the block raises
  • Manual acknowledgment: message.ack() / message.nack()
  • Acknowledge a message only after it has been processed successfully

3. Message Idempotency

  • Use message ID for deduplication
  • Database records of processed message IDs
  • Prevent duplicate processing of the same message

4. Retry Mechanism

  • Configurable maximum retry count
  • Retries run inside the consumer, so a failing message is not re-queued on the broker

5. Dead Letter Queue

  • Messages that still fail after all retries are routed to a dead letter queue
  • Automatic creation of dead letter exchange and queue
  • Detailed error information recording
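
The "database records of processed message IDs" idea in point 3 might look like the sketch below. It assumes SQLite purely for illustration; the class name `ProcessedMessageStore` and the table name `processed_messages` are hypothetical and do not appear in this project, which keeps processed IDs in an in-memory set.

```python
import sqlite3


class ProcessedMessageStore:
    """Records processed message IDs so duplicates can be skipped (hypothetical helper)."""

    def __init__(self, path: str = ":memory:"):
        # Pass a file path instead of ":memory:" to keep the IDs across restarts
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS processed_messages ("
            "message_id TEXT PRIMARY KEY, "
            "processed_at TEXT DEFAULT CURRENT_TIMESTAMP)"
        )
        self._conn.commit()

    def mark_processed(self, message_id: str) -> bool:
        """Record the ID. Returns True if it was new, False if it was already recorded."""
        cursor = self._conn.execute(
            "INSERT OR IGNORE INTO processed_messages (message_id) VALUES (?)",
            (message_id,),
        )
        self._conn.commit()
        # rowcount is 1 when a row was inserted and 0 when the insert was ignored
        return cursor.rowcount == 1

    def is_processed(self, message_id: str) -> bool:
        row = self._conn.execute(
            "SELECT 1 FROM processed_messages WHERE message_id = ?",
            (message_id,),
        ).fetchone()
        return row is not None


store = ProcessedMessageStore()
print(store.mark_processed("duplicate_090f7015"))  # first delivery
print(store.mark_processed("duplicate_090f7015"))  # redelivery of the same ID
```

Relying on the primary key makes the check and the write a single statement, which avoids a race between "check if seen" and "mark as seen" when several consumers share one store.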

Key Code Implementation

Reliable Producer Code:

class ReliableProducer:
    """Reliable Message Producer"""
    
    def _generate_message_id(self, message_data: Dict[str, Any]) -> str:
        """Generate message ID for message"""
        message_type = message_data.get('type', '')
        content = message_data.get('content', '')
        
        # For duplicate_test type messages, generate fixed ID based on content
        if message_type == 'duplicate_test':
            import hashlib
            content_hash = hashlib.md5(content.encode('utf-8')).hexdigest()
            return f"duplicate_{content_hash[:8]}"
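        else:
            # NOTE: loop.time() is a monotonic clock reading, so these IDs are only unique within a single process run
            return f"msg_{asyncio.get_running_loop().time()}"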
    
    async def publish_reliable_message(self, message_data: Dict[str, Any]) -> bool:
        """Publish reliable message"""
        try:
            # Generate message ID
            message_id = self._generate_message_id(message_data)
            
            # Add message metadata
            message_data.update({
                'timestamp': datetime.now().isoformat(),
                'message_id': message_id
            })

            # Create persistent message
            message = aio_pika.Message(
                body=json.dumps(message_data, ensure_ascii=False).encode('utf-8'),
                delivery_mode=aio_pika.DeliveryMode.PERSISTENT,  # Message persistence
                message_id=message_id,
                timestamp=datetime.now()
            )

            # Send message and wait for confirmation
            await self.exchange.publish(message, routing_key="reliable")
            logger.info(f"[Producer] Message sent: {message_id}")
            return True
        except Exception as e:
            logger.error(f"[Producer] Failed to send message: {e}")
            return False
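
As a possible alternative to the ID scheme in `_generate_message_id`, the sketch below shows two common options: a random UUID for messages that must always be distinct, and a content hash for messages that should deduplicate when their payload is identical. The function names and the `msg` / `dup` prefixes are assumptions for illustration only.

```python
import hashlib
import json
import uuid
from typing import Any, Dict


def random_message_id(prefix: str = "msg") -> str:
    """ID that is unique across processes and restarts."""
    return f"{prefix}_{uuid.uuid4().hex}"


def content_message_id(message_data: Dict[str, Any], prefix: str = "dup") -> str:
    """ID derived from the payload: equal payloads always yield the same ID."""
    # sort_keys makes the serialization independent of dict insertion order
    canonical = json.dumps(
        message_data, sort_keys=True, ensure_ascii=False, separators=(",", ":")
    )
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{prefix}_{digest[:16]}"


first = content_message_id({"type": "duplicate_test", "content": "Duplicate message test"})
second = content_message_id({"content": "Duplicate message test", "type": "duplicate_test"})
print(first == second)
print(random_message_id() != random_message_id())
```

If a content hash is used, it should be computed before volatile fields such as a timestamp are added to the payload. Otherwise two logically identical messages would hash differently.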

Reliable Consumer Code:

class ReliableConsumer:
    """Reliable Message Consumer"""
    
    def __init__(self, queue_name: Optional[str] = None, consumer_name: Optional[str] = None):
        self.processed_messages: Set[str] = set()  # Store processed message IDs
    
    async def process_message(self, message: aio_pika.IncomingMessage):
        """Core message processing logic"""
        try:
            # Parse message
            message_data = json.loads(message.body.decode('utf-8'))
            message_id = message_data.get('message_id')

            # Check if message has been processed before (idempotency check)
            if message_id in self.processed_messages:
                logger.warning(f"[Consumer-{self.consumer_name}] 🚫 Duplicate message detected, skipping: {message_id}")
                await message.ack()
                return

            logger.info(f"[Consumer-{self.consumer_name}] Starting to process message: {message_id}")

            # Retry processing message directly
            success = await self.retry_process_message(message_data, message_id, 0)

            # Only record processed message ID after successful processing
            if success:
                self.processed_messages.add(message_id)
                await message.ack()
                logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed and acknowledged")
            else:
                # Processing failed, send to dead letter queue
                await self.send_to_dead_letter_queue(message, message_id, "Processing failed")
                await message.ack()
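        except Exception as e:
            # Unexpected error (e.g. malformed JSON): ack so the message is not redelivered endlessly.
            # Note that the message is dropped here without being sent to the dead letter queue.
            logger.error(f"[Consumer-{self.consumer_name}] Failed to process message: {e}")
            await message.ack()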
    
    async def retry_process_message(self, message_data: Dict[str, Any], message_id: str, retry_count: int) -> bool:
        """Retry processing message directly"""
        max_retries = config.max_retries
        last_error = None
        
        for attempt in range(max_retries + 1):
            try:
                logger.info(f"[Consumer-{self.consumer_name}] Attempting to process message {message_id}, attempt {attempt + 1}")
                await self.message_handler(message_data)
                logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed successfully")
                return True
            except Exception as e:
                last_error = str(e)
                logger.warning(f"[Consumer-{self.consumer_name}] Message {message_id} failed on attempt {attempt + 1}: {e}")
                if attempt < max_retries:
                    await asyncio.sleep(1)
                else:
                    logger.error(f"[Consumer-{self.consumer_name}] Message {message_id} failed after {max_retries} retries: {last_error}")
                    return False
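
The retry loop above waits a fixed one second between attempts. A variation that is often used instead is exponential backoff; the sketch below shows one way it could look. The names `backoff_delays` and `process_with_backoff`, as well as the `base_delay` and `max_delay` parameters, are hypothetical and not part of this project's configuration.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


def backoff_delays(max_retries: int, base_delay: float = 1.0, max_delay: float = 30.0) -> List[float]:
    """Delay before each retry: base_delay doubled every time, capped at max_delay."""
    return [min(max_delay, base_delay * (2 ** i)) for i in range(max_retries)]


async def process_with_backoff(
    handler: Callable[[Dict[str, Any]], Awaitable[None]],
    payload: Dict[str, Any],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Call handler once, then retry up to max_retries times. True on success."""
    delays = backoff_delays(max_retries, base_delay, max_delay)
    for attempt in range(max_retries + 1):
        try:
            await handler(payload)
            return True
        except Exception:
            if attempt == max_retries:
                return False  # retries exhausted; the caller can dead-letter the message
            await sleep(delays[attempt])
    return False


async def always_fails(payload: Dict[str, Any]) -> None:
    raise RuntimeError("simulated failure")


async def demo() -> None:
    # Tiny delays keep the demo fast; the shape of the schedule is what matters
    ok = await process_with_backoff(always_fails, {"type": "will_fail"}, base_delay=0.01)
    print("processed:", ok)


asyncio.run(demo())
```

The `sleep` parameter exists only so the waiting can be replaced in tests; in normal use the default `asyncio.sleep` applies.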

Dead Letter Queue Consumer Code:

class DeadLetterConsumer:
    """Dead Letter Queue Consumer"""
    
    async def process_dead_letter_message(self, message: aio_pika.IncomingMessage):
        """Process dead letter message"""
        try:
            # Parse dead letter message
            dead_letter_data = json.loads(message.body.decode('utf-8'))
            original_message = dead_letter_data.get('original_message', {})
            error_info = dead_letter_data.get('error_info', 'Unknown')
            message_id = dead_letter_data.get('message_id', 'Unknown')
            
            # Print dead letter message information
            logger.error("=" * 50)
            logger.error("[Dead Letter Consumer] Received Dead Letter Message:")
            logger.error(f"[Dead Letter Consumer] Message ID: {message_id}")
            logger.error(f"[Dead Letter Consumer] Message Content: {json.dumps(original_message, ensure_ascii=False, indent=2)}")
            logger.error(f"[Dead Letter Consumer] Error Reason: {error_info}")
            logger.error("=" * 50)
            
            # Save to database
            await self.save_to_database(original_message, error_info, message_id)
            
            # Acknowledge dead letter message
            await message.ack()
            logger.info(f"[Dead Letter Consumer] Dead letter message {message_id} processed")
        except Exception as e:
            logger.error(f"[Dead Letter Consumer] Failed to process dead letter message: {e}")
            await message.nack(requeue=False)
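
The dead letter consumer above reads the keys `original_message`, `error_info` and `message_id` from the message body. The body that `send_to_dead_letter_queue` publishes is not shown in this section, so the following is only a sketch of a payload that would satisfy that consumer. The function name `build_dead_letter_body` and the extra `failed_at` field are assumptions.

```python
import json
from datetime import datetime
from typing import Any, Dict


def build_dead_letter_body(
    original_message: Dict[str, Any], message_id: str, error_info: str
) -> bytes:
    """Wrap a failed message in the envelope the dead letter consumer expects."""
    envelope = {
        "message_id": message_id,
        "original_message": original_message,
        "error_info": error_info,
        "failed_at": datetime.now().isoformat(),  # hypothetical extra field
    }
    return json.dumps(envelope, ensure_ascii=False).encode("utf-8")


body = build_dead_letter_body(
    {"content": "Message that will fail 1", "type": "will_fail"},
    "msg_example",
    "Processing failed",
)
decoded = json.loads(body.decode("utf-8"))
print(decoded["message_id"], "-", decoded["error_info"])
```

Keeping the original payload nested under its own key leaves it untouched, so a failed message can later be republished as it was first sent.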

Complete Test Code

Test Run Script:

"""
RabbitMQ Reliable Messaging Test Module
"""

import asyncio
import logging

from reliable_mq import ReliableProducer, ReliableConsumer
from reliable_mq.dead_letter_consumer import DeadLetterConsumer
from reliable_mq.config import config

# Configure logging
logging.basicConfig(
    level=getattr(logging, config.log_level),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


async def run_context_manager_messaging():
    """Test reliable messaging using context manager"""
    logger.info("=== Testing Reliable Messaging with Context Manager ===")

    # Use async context manager
    async with ReliableProducer() as producer:
        async with ReliableConsumer(consumer_name="context_test_consumer") as consumer:
            async with DeadLetterConsumer() as dead_letter_consumer:
                # Start consumers (run in background)
                consumer_task = asyncio.create_task(consumer.start_consuming())
                dead_letter_task = asyncio.create_task(dead_letter_consumer.start_consuming())

                # Wait for consumers to start
                await asyncio.sleep(1)

                # Send test messages
                test_messages = [
                    {"content": "Important business message 1", "type": "business"},
                    {"content": "System notification message 2", "type": "notification"},
                    {"content": "User action message 3", "type": "user_action"},
                    {"content": "Duplicate message test", "type": "duplicate_test"},
                    {"content": "Duplicate message test", "type": "duplicate_test"},  # Duplicate message
                    {"content": "Message that will fail 1", "type": "will_fail"},  # These messages will fail and go to dead letter queue
                    {"content": "Message that will fail 2", "type": "will_fail"},
                    {"content": "Message that will fail 3", "type": "will_fail"},
                ]

                for msg in test_messages:
                    await producer.publish_reliable_message(msg)
                    await asyncio.sleep(0.5)

                # Wait for message processing to complete
                await asyncio.sleep(30)

                # Cancel tasks
                consumer_task.cancel()
                dead_letter_task.cancel()


if __name__ == '__main__':
    asyncio.run(run_context_manager_messaging())

Configuration File:

"""
RabbitMQ Reliable Messaging Configuration Module
"""

import os
from dataclasses import dataclass
from typing import Dict, Any


@dataclass
class Config:
    """Configuration class"""
    # RabbitMQ connection configuration
    rabbitmq_uri: str = "amqp://guest:guest@localhost:5673/"
    
    # Exchange and queue configuration
    exchange_name: str = "reliable.exchange"
    queue_name: str = "reliable.queue"
    
    # Dead letter queue configuration
    dead_letter_exchange: str = "reliable.dead.letter.exchange"
    dead_letter_queue: str = "reliable.dead.letter.queue"
    
    # Retry configuration
    max_retries: int = 3
    message_ttl: int = 300000  # milliseconds (5 minutes)
    
    # QoS configuration
    prefetch_count: int = 1
    
    # Logging configuration
    log_level: str = "INFO"
    
    def get_connection_config(self) -> Dict[str, Any]:
        """Get connection configuration"""
        return {
            'uri': self.rabbitmq_uri,
            'prefetch_count': self.prefetch_count
        }
    
    def get_dead_letter_config(self) -> Dict[str, str]:
        """Get dead letter queue configuration"""
        return {
            'dead_letter_exchange': self.dead_letter_exchange,
            'dead_letter_queue': self.dead_letter_queue
        }


# Global configuration instance
config = Config()
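
The configuration module imports `os` but all values are hard-coded defaults. If overriding them per environment is desired, one option is to read environment variables, as in the sketch below. The `RELIABLE_MQ_` prefix and the function `load_config_from_env` are assumptions, and the dataclass is shortened to a few fields from the configuration above to keep the example self-contained.

```python
import os
from dataclasses import dataclass, fields
from typing import Mapping, Optional


@dataclass
class Config:
    """Shortened copy of the configuration class, for illustration only."""
    rabbitmq_uri: str = "amqp://guest:guest@localhost:5673/"
    max_retries: int = 3
    prefetch_count: int = 1
    log_level: str = "INFO"


def load_config_from_env(
    environ: Optional[Mapping[str, str]] = None, prefix: str = "RELIABLE_MQ_"
) -> Config:
    """Build a Config, overriding defaults with <prefix><FIELD_NAME> variables."""
    environ = os.environ if environ is None else environ
    overrides = {}
    for field in fields(Config):
        raw = environ.get(prefix + field.name.upper())
        if raw is None:
            continue
        # Convert the string using the type of the field's default (str or int here)
        overrides[field.name] = type(field.default)(raw)
    return Config(**overrides)


cfg = load_config_from_env({"RELIABLE_MQ_MAX_RETRIES": "5"})
print(cfg.max_retries, cfg.log_level)
```

Passing a plain dictionary instead of `os.environ` keeps the function easy to test; calling it with no arguments reads the real process environment.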

Custom Message Handler Function:

async def default_message_handler(message_data: Dict[str, Any]):
    """Default message handler function"""
    await asyncio.sleep(1)  # Simulate processing time
    
    message_type = message_data.get('type', '')
    content = message_data.get('content', '')
    
    # Simulate business logic processing
    if message_type == 'will_fail':
        raise Exception(f"Simulated business processing failure: {content}")
    
    logger.info(f"[Consumer] Business logic processing completed: {content}")

Test Results

2025-09-07 11:25:02,498 - __main__ - INFO - === Testing Reliable Messaging with Context Manager ===
2025-09-07 11:25:02,509 - reliable_mq.reliable_producer - INFO - [Producer] Connected, queue: reliable.queue
2025-09-07 11:25:02,513 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Connected, listening to queue: reliable.queue
2025-09-07 11:25:02,518 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] Connected

2025-09-07 11:25:03,523 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Starting to process message: msg_323632.845174041
2025-09-07 11:25:03,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message content: {'content': 'Important business message 1', 'type': 'business', 'timestamp': '2025-09-07T11:25:03.519240', 'message_id': 'msg_323632.845174041'}
2025-09-07 11:25:03,524 - reliable_mq.reliable_producer - INFO - [Producer] Message sent: msg_323632.845174041 (type: business, content: Important business message 1)
2025-09-07 11:25:04,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Business logic processing completed: Important business message 1
2025-09-07 11:25:04,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message msg_323632.845174041 processed successfully
2025-09-07 11:25:04,525 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message msg_323632.845174041 processed and acknowledged
2025-09-07 11:25:04,525 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Current processed message count: 1

2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - ==================================================
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] 🚫 Duplicate message detected, skipping:
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Message ID: duplicate_090f7015
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Message content: {
  "content": "Duplicate message test",
  "type": "duplicate_test",
  "timestamp": "2025-09-07T11:25:05.546930",
  "message_id": "duplicate_090f7015"
}
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Total processed messages: 4
2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - ==================================================

2025-09-07 11:25:14,551 - reliable_mq.reliable_consumer - ERROR - [Consumer-context_test_consumer] Message msg_323635.377526708 failed after 3 retries: Simulated business processing failure: Message that will fail 1
2025-09-07 11:25:14,551 - reliable_mq.reliable_consumer - ERROR - [Consumer-context_test_consumer] Message sent to dead letter queue: msg_323635.377526708, error: Processing failed

2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - ==================================================
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Received Dead Letter Message:
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Message ID: msg_323635.377526708
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Message Content: {
  "content": "Message that will fail 1",
  "type": "will_fail",
  "timestamp": "2025-09-07T11:25:06.051557",
  "message_id": "msg_323635.377526708"
}
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Error Reason: Processing failed
2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - ==================================================

2025-09-07 11:25:15,064 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] 💾 Dead letter message saved to database: msg_323635.377526708
2025-09-07 11:25:15,064 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] Database Record: {
  "id": "msg_323635.377526708",
  "original_message": {
    "content": "Message that will fail 1",
    "type": "will_fail",
    "timestamp": "2025-09-07T11:25:06.051557",
    "message_id": "msg_323635.377526708"
  },
  "error_info": "Processing failed",
  "created_at": "2025-09-07T11:25:15.064341",
  "status": "failed"
}

2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - ==================================================
2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Processed Message Statistics:
2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Total Processed: 4
2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Processed Message IDs: ['msg_323632.845174041', 'msg_323633.351571583', 'duplicate_090f7015', 'msg_323633.854272166']
2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - ==================================================

Architecture Diagram

img_5.png

graph TD
    P[ReliableProducer<br/>Reliable Producer] --> E[Reliable Exchange<br/>reliable.exchange]
    E --> Q[Reliable Queue<br/>reliable.queue]
    Q --> C[ReliableConsumer<br/>Reliable Consumer]
    
    C -->|Processing success| ACK[Message ACK<br/>Message Acknowledgment]
    C -->|Processing failure| RETRY[Retry Logic<br/>Retry Mechanism]
    RETRY -->|Retry success| ACK
    RETRY -->|Retry failure| DLQ[Dead Letter Queue]
    
    DLQ --> DLC[DeadLetterConsumer<br/>Dead Letter Consumer]
    DLC --> DB[(Database)]
    
    subgraph "Reliability Guarantees"
        PERSIST[Message Persistence]
        IDEMPOTENT[Idempotency Check]
        CONFIRM[Publisher Confirmation]
    end
    
    subgraph "Message Flow"
        MSG1[Normal message] --> SUCCESS[Processing success]
        MSG2[Duplicate message] --> SKIP[Skip processing]
        MSG3[Failed message] --> FAIL[Failed after retry]
    end
    
    style P fill:#e1f5fe
    style E fill:#f3e5f5
    style Q fill:#e8f5e8
    style C fill:#fff3e0
    style DLQ fill:#ffebee
    style DLC fill:#ffebee
    style DB fill:#f3e5f5
    style ACK fill:#e8f5e8
    style RETRY fill:#fff3e0