## 1. Basic Concepts of MQ ### 1.1 What is MQ? MQ stands for Message Queue - "Message Queue" is a container that stores messages during the transmission process - It follows the typical producer-consumer model: producers continuously produce messages to the message queue, consumers continuously retrieve messages from the queue - Benefits: Producers only need to focus on sending messages, consumers only need to focus on receiving messages, with no business logic intrusion between them, achieving decoupling between producers and consumers ### 1.2 Why Use MQ? Message Queue is a middleware used for asynchronous communication in distributed systems. Its core function is to achieve asynchronous message delivery through a **store-and-forward** mechanism, solving problems such as system coupling, traffic peak shaving, and asynchronous processing. #### Main Functions: **1. Decouple System Components** - In traditional systems, components usually call each other directly (e.g., Service A directly calls Service B), resulting in tight coupling - After introducing message queues, Service A only needs to send messages to the queue without caring who receives or processes them; Service B can get messages from the queue - Both parties communicate through message format agreements, independent of each other, reducing system coupling **2. Asynchronous Processing, Improve Efficiency** - In synchronous processing scenarios, an operation may need to wait for multiple services to complete sequentially, with total time being the sum of all steps - Message queues support asynchronous processing: after the main process is completed, only need to send messages to the queue, can return results without waiting for subsequent steps to complete - Other services asynchronously get messages from the queue and process them, significantly improving system response speed and throughput **3. Traffic Peak Shaving, Protect System** - Sudden traffic (such as e-commerce flash sales, live streaming) may instantly overwhelm backend services - Message queues can serve as a "buffer pool": peak requests first enter the queue, backend services consume messages from the queue according to their own processing capacity **4. Data Synchronization and Distribution** - The same message can be consumed by multiple consumers, achieving "send once, process multiple" - Cross-system data synchronization, ensuring data consistency through message queues **5. Retry and Fault Tolerance** - If consumer services temporarily fail, message queues will retain messages and redeliver them after service recovery - Combined with retry mechanisms, can solve problems such as network fluctuations and temporary service unavailability --- ## 2. RabbitMQ ### 2.1 Introduction RabbitMQ is an open-source message broker software (also called message queue middleware), written in Erlang, used in distributed systems to implement asynchronous communication and decoupling between applications. #### Features: - **Multiple Protocol Support**: Besides AMQP, RabbitMQ also supports STOMP, MQTT and other message protocols - **High Reliability**: Ensures no message loss through message persistence, clustering, mirrored queues and other mechanisms - **Flexible Routing Mechanism**: Exchanges provide rich message routing rules - **Multi-language Client Support**: Provides client libraries for multiple programming languages - **Friendly Management Interface**: Has a visual management interface ### 2.2 Core Components ![img_6.png](assert/img_6.png) ``` Producer → Channel → Exchange → Queue → Channel → Consumer ``` #### Core Component Details: **1. Producer** - Definition: Application or service that sends messages - Function: Encapsulate business data into messages and send to RabbitMQ server - Features: No need to care about the final receiver, only need to specify which exchange to send messages to **2. Consumer** - Definition: Application or service that receives and processes messages - Function: Continuously monitor queues, when messages arrive, get messages from queue and perform business processing - Features: Consumers are bound to queues, can inform RabbitMQ whether message processing is complete through automatic or manual acknowledgment mechanisms **3. Queue** - Definition: Container that stores messages, the final destination of messages - Core Properties: - Name: Unique identifier of the queue - Durable: If true, queue will be retained after RabbitMQ restart - Exclusive: If true, queue is only visible to current connection - Auto-delete: If true, queue is automatically deleted when the last consumer disconnects **4. Exchange** - Definition: Receives messages sent by producers and forwards them to one or more queues according to routing rules - Function: Similar to a "router", responsible for message routing logic - Types: - **Direct Exchange**: Message's Routing Key exactly matches the queue's Binding Key - **Topic Exchange**: Supports wildcards (* matches single word, # matches multiple words) - **Fanout Exchange**: Ignores Routing Key, broadcasts messages to all bound queues - **Headers Exchange**: Matches based on message properties (Headers) rather than Routing Key **5. Binding** - Definition: Association relationship between exchange and queue, containing routing rules - Function: Tells exchange "which queues need to receive what kind of messages" **6. Connection** - Definition: TCP connection between producer/consumer and RabbitMQ server - Features: TCP connection establishment has high overhead, so connections are usually reused **7. Channel** - Definition: Virtual connection built on top of TCP connection, the actual channel for message delivery - Function: Reduces number of TCP connections, lowers server resource consumption ### 2.3 Install RabbitMQ #### With Management Page ```bash docker run -d --name rabbitmq -p 5673:5672 -p 15673:15672 rabbitmq:management ``` #### Without Management Page ```bash docker run -d --name rabbitmq -p 5673:5672 -p 15673:15672 rabbitmq ``` **Access Management Page**: http://localhost:15673 - Username: guest - Password: guest ### 2.4 RabbitMQ Management Page The management page provides rich monitoring and management functions: - **Overview**: Server overview, including connection count, queue count, message rate, etc. - **Connections**: Display all client connection information - **Channels**: Display channel information for each connection - **Exchanges**: Manage exchanges - **Queues**: Manage queues - **Admin**: User and permission management --- ## 3. Python Integration - Based on python aio-pika library for integration - repo:https://gitea.freeleaps.mathmast.com/icecheng/rabbitmq-test ### 3.1 Fanout Exchange Demo #### Core Features: - Ignores routing key (Routing Key), whatever value is set will be ignored - Messages will be broadcast to all queues bound to it - Suitable for scenarios requiring "one-to-many" notifications (such as system notifications, log broadcasting) #### Architecture Diagram: ![img_1.png](assert/img_1.png) ```mermaid graph TD P[Producer
fanout_publish] --> E[Fanout Exchange
demo.fanout] E --> Q1[Queue
demo.fanout.queue-0] E --> Q2[Queue
demo.fanout.queue-1] E --> Q3[Queue
demo.fanout.queue-2] Q1 --> C1[Consumer 1
fanout_consumer_1] Q2 --> C2[Consumer 2
fanout_consumer_2] Q3 --> C3[Consumer 3
fanout_consumer_3] style P fill:#e1f5fe style E fill:#f3e5f5 style Q1 fill:#e8f5e8 style Q2 fill:#e8f5e8 style Q3 fill:#e8f5e8 style C1 fill:#fff3e0 style C2 fill:#fff3e0 style C3 fill:#fff3e0 ``` #### Producer Code: ```python import asyncio import aio_pika from config import RABBITMQ_URI async def setup_fanout_exchange(exchange_name="demo.fanout", queue_name_prefix="demo.fanout.queue-"): # Establish connection connection = await aio_pika.connect_robust(RABBITMQ_URI) channel = await connection.channel() # 1. Declare Fanout type exchange fanout_exchange = await channel.declare_exchange( exchange_name, aio_pika.ExchangeType.FANOUT, durable=True # Exchange persistence ) # 2. Define list of queue names to bind queue_names = [queue_name_prefix + str(i) for i in range(3)] # 3. Loop to create queues and bind to exchange for name in queue_names: queue = await channel.declare_queue( name, durable=True, auto_delete=False ) # Bind queue to Fanout exchange (ignore routing key) await queue.bind(fanout_exchange, routing_key="") async def fanout_publish(message: str = "", exchange_name: str = "demo.fanout"): connection = await aio_pika.connect_robust(RABBITMQ_URI) channel = await connection.channel() fanout_exchange = await channel.declare_exchange( exchange_name, aio_pika.ExchangeType.FANOUT, durable=True ) message = aio_pika.Message( body=message.encode("utf-8"), delivery_mode=aio_pika.DeliveryMode.PERSISTENT # Message persistence ) # Send message to Fanout exchange await fanout_exchange.publish(message, routing_key="") await connection.close() ``` #### Consumer Code: ```python import asyncio import aio_pika from config import RABBITMQ_URI async def fanout_consumer(queue_name: str, consumer_id: int): connection = await aio_pika.connect_robust(RABBITMQ_URI) channel = await connection.channel() await channel.set_qos(prefetch_count=1) queue = await channel.declare_queue( queue_name, durable=True, auto_delete=False ) async def on_message_received(message: aio_pika.IncomingMessage): async with message.process(): message_content = message.body.decode("utf-8") print(f"[Fanout Consumer {consumer_id}] Received broadcast message:") print(f" Listening queue: {queue_name}") print(f" Message content: {message_content}") print(f" Message persistence: {'Yes' if message.delivery_mode == 2 else 'No'}") await asyncio.sleep(1) consumer_tag = f"fanout_consumer_{consumer_id}_{queue_name}" await queue.consume(on_message_received, consumer_tag=consumer_tag) print(f"[Fanout Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})") await asyncio.Future() ``` #### Complete Test Code **Test Run Script:** ```python #!/usr/bin/env python3 """ Run Fanout Exchange Test """ import asyncio import sys import os # Add current directory to Python path sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from product.fanout_publish import fanout_publish, setup_fanout_exchange from comsumer.fanout_consumer import start_all_fanout_consumers async def run_fanout_test(): """Run fanout exchange test with producer and consumer""" print("=== Running Fanout Exchange Test ===") # Start consumer in background consumer_task = asyncio.create_task(start_all_fanout_consumers()) # Wait for consumer to start await asyncio.sleep(1) # Setup and publish messages await setup_fanout_exchange("demo.fanout", "demo.fanout.queue-") await fanout_publish(message="hello world", exchange_name="demo.fanout") await fanout_publish(message="test message 2", exchange_name="demo.fanout") await fanout_publish(message="test message 3", exchange_name="demo.fanout") # Wait for messages to be processed await asyncio.sleep(3) # Cancel consumer consumer_task.cancel() print("✅ Fanout test completed successfully!") if __name__ == "__main__": asyncio.run(run_fanout_test()) ``` #### Test Output: ``` === Running Fanout Exchange Test === [Fanout Consumer 2] Started, listening to queue: demo.fanout.queue-1 (tag: fanout_consumer_2_demo.fanout.queue-1) [Fanout Consumer 1] Started, listening to queue: demo.fanout.queue-0 (tag: fanout_consumer_1_demo.fanout.queue-0) [Fanout Consumer 3] Started, listening to queue: demo.fanout.queue-2 (tag: fanout_consumer_3_demo.fanout.queue-2) [Fanout Consumer 2] Received broadcast message: Listening queue: demo.fanout.queue-1 Message content: hello world Message persistence: Yes [Fanout Consumer 1] Received broadcast message: Listening queue: demo.fanout.queue-0 Message content: hello world Message persistence: Yes [Fanout Consumer 3] Received broadcast message: Listening queue: demo.fanout.queue-2 Message content: hello world Message persistence: Yes [Fanout Consumer 3] Received broadcast message: Listening queue: demo.fanout.queue-2 Message content: test message 2 Message persistence: Yes [Fanout Consumer 1] Received broadcast message: Listening queue: demo.fanout.queue-0 Message content: test message 2 Message persistence: Yes [Fanout Consumer 2] Received broadcast message: Listening queue: demo.fanout.queue-1 Message content: test message 2 Message persistence: Yes [Fanout Consumer 1] Received broadcast message: Listening queue: demo.fanout.queue-0 Message content: test message 3 Message persistence: Yes [Fanout Consumer 2] Received broadcast message: Listening queue: demo.fanout.queue-1 Message content: test message 3 Message persistence: Yes [Fanout Consumer 3] Received broadcast message: Listening queue: demo.fanout.queue-2 Message content: test message 3 Message persistence: Yes ✅ Fanout test completed successfully! ``` ### 3.2 Direct Exchange Demo #### Core Features: - Based on exact matching between routing key and binding key - Messages are only routed to a queue when the message's routing key exactly matches the queue's binding key - Suitable for scenarios requiring precise routing (such as log level distinction: error, warning, info routed to different queues) #### Architecture Diagram: ```mermaid graph TD P[Producer
direct_publish] --> E[Direct Exchange
demo.direct] E -->|routing_key: error| Q1[Queue
demo.direct.queue-error] E -->|routing_key: warning| Q2[Queue
demo.direct.queue-warning] E -->|routing_key: info| Q3[Queue
demo.direct.queue-info] E -->|routing_key: debug| Q3 Q1 --> C1[Error Level Consumer
direct_error_level] Q2 --> C2[Warning Level Consumer
direct_warning_level] Q3 --> C3[Info/Debug Level Consumer
direct_info/debug_level] style P fill:#e1f5fe style E fill:#f3e5f5 style Q1 fill:#ffebee style Q2 fill:#fff3e0 style Q3 fill:#e8f5e8 style C1 fill:#ffebee style C2 fill:#fff3e0 style C3 fill:#e8f5e8 ``` #### Producer Code: ```python async def setup_direct_exchange(exchange_name="demo.direct", queue_prefix="demo.direct.queue-"): connection = await aio_pika.connect_robust(RABBITMQ_URI) channel = await connection.channel() direct_exchange = await channel.declare_exchange( exchange_name, aio_pika.ExchangeType.DIRECT, durable=True ) # Define queues and corresponding binding keys queue_bindings = [ (f"{queue_prefix}error", ["error"]), # Handle error level messages (f"{queue_prefix}warning", ["warning"]), # Handle warning level messages (f"{queue_prefix}info", ["info", "debug"]) # Handle info and debug level messages ] for queue_name, binding_keys in queue_bindings: queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False) for binding_key in binding_keys: await queue.bind(direct_exchange, routing_key=binding_key) print(f"Queue {queue_name} bound to routing keys: {binding_keys}") async def direct_publish(message: str, routing_key: str, exchange_name: str = "demo.direct"): connection = await aio_pika.connect_robust(RABBITMQ_URI) channel = await connection.channel() exchange = await channel.declare_exchange( exchange_name, aio_pika.ExchangeType.DIRECT, durable=True ) message_obj = aio_pika.Message( body=message.encode("utf-8"), delivery_mode=aio_pika.DeliveryMode.PERSISTENT ) await exchange.publish(message_obj, routing_key=routing_key) print(f"Message sent: {message} (routing key: {routing_key})") ``` #### Consumer Code: ```python async def direct_consumer(queue_name: str, consumer_label: str): connection = await aio_pika.connect_robust(RABBITMQ_URI) channel = await connection.channel() await channel.set_qos(prefetch_count=1) queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False) async def on_message_received(message: aio_pika.IncomingMessage): async with message.process(): message_content = message.body.decode("utf-8") print(f"[{consumer_label} Consumer] Received message:") print(f" Queue name: {queue_name}") print(f" Message content: {message_content}") print(f" Message routing key: {message.routing_key}") print(f" Processing time: {asyncio.get_running_loop().time():.2f}s") # Simulate different processing times for different level messages if "error" in queue_name: await asyncio.sleep(2) elif "warning" in queue_name: await asyncio.sleep(1) elif "info" in queue_name: await asyncio.sleep(0.5) consumer_tag = f"direct_{consumer_label.lower().replace(' ', '_')}_{queue_name}" await queue.consume(on_message_received, consumer_tag=consumer_tag) print(f"[{consumer_label} Consumer] Started, listening to queue: {queue_name} (tag: {consumer_tag})") await asyncio.Future() ``` #### Complete Test Code **Test Run Script:** ```python #!/usr/bin/env python3 """ Run Direct Exchange Test """ import asyncio import sys import os # Add current directory to Python path sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from product.direct_publish import setup_direct_exchange, direct_publish from comsumer.direct_consumer import start_all_direct_consumers async def run_direct_exchange_test(): """Run direct exchange test with producer and consumer""" print("=== Running Direct Exchange Test ===") # Start consumer in background consumer_task = asyncio.create_task(start_all_direct_consumers()) # Wait for consumer to start await asyncio.sleep(1) # Setup exchange and publish messages await setup_direct_exchange() test_messages = [ ("System crash, unable to start", "error"), # Route to error queue ("Disk space insufficient", "warning"), # Route to warning queue ("User login successful", "info"), # Route to info queue ("Debug info: Database connection successful", "debug") # Route to info queue ] for msg, routing_key in test_messages: await direct_publish(msg, routing_key) await asyncio.sleep(0.5) # Wait for messages to be processed await asyncio.sleep(3) # Cancel consumer consumer_task.cancel() print("✅ Direct exchange test completed successfully!") if __name__ == "__main__": asyncio.run(run_direct_exchange_test()) ``` #### Test Output: ``` === Running Direct Exchange Test === [Info/Debug Level Consumer] Started, listening to queue: demo.direct.queue-info (tag: direct_info/debug_level_demo.direct.queue-info) [Warning Level Consumer] Started, listening to queue: demo.direct.queue-warning (tag: direct_warning_level_demo.direct.queue-warning) [Error Level Consumer] Started, listening to queue: demo.direct.queue-error (tag: direct_error_level_demo.direct.queue-error) Queue demo.direct.queue-error bound to routing keys: ['error'] Queue demo.direct.queue-warning bound to routing keys: ['warning'] Queue demo.direct.queue-info bound to routing keys: ['info', 'debug'] [Error Level Consumer] Received message: Queue name: demo.direct.queue-error Message content: System crash, unable to start Message routing key: error Processing time: 322774.03s Message sent: System crash, unable to start (routing key: error) [Warning Level Consumer] Received message: Queue name: demo.direct.queue-warning Message content: Disk space insufficient Message routing key: warning Processing time: 322774.54s Message sent: Disk space insufficient (routing key: warning) [Info/Debug Level Consumer] Received message: Queue name: demo.direct.queue-info Message content: User login successful Message routing key: info Processing time: 322775.06s Message sent: User login successful (routing key: info) [Info/Debug Level Consumer] Received message: Queue name: demo.direct.queue-info Message content: Debug info: Database connection successful Message routing key: debug Processing time: 322775.57s Message sent: Debug info: Database connection successful (routing key: debug) ✅ Direct exchange test completed successfully! ``` ### 3.3 Direct Exchange Demo (Load Balancing) #### Implementation Principle: 1. Create multiple queues: Each queue is bound to the same Direct Exchange but uses different routing keys 2. Producer routing strategy: Select a routing key through round-robin, random, or hash based on message characteristics 3. Consumer processing: Each queue corresponds to one or more consumers, each processing assigned messages #### Architecture Diagram: ```mermaid graph TD P[BalancedProducer
Round-robin sending] --> E[Direct Exchange
demo.direct.multi.queue] E -->|routing_key: route.1| Q1[Queue
task.queue.1] E -->|routing_key: route.2| Q2[Queue
task.queue.2] E -->|routing_key: route.3| Q3[Queue
task.queue.3] Q1 --> C1[Consumer 1
multi_consumer_1] Q2 --> C2[Consumer 2
multi_consumer_2] Q3 --> C3[Consumer 3
multi_consumer_3] P -.->|Round-robin algorithm| P style P fill:#e1f5fe style E fill:#f3e5f5 style Q1 fill:#e8f5e8 style Q2 fill:#e8f5e8 style Q3 fill:#e8f5e8 style C1 fill:#fff3e0 style C2 fill:#fff3e0 style C3 fill:#fff3e0 ``` #### Producer Code: ```python class BalancedProducer: def __init__(self, exchange_name="demo.direct.multi.queue", queue_count=3): self.exchange_name = exchange_name self.queue_count = queue_count self.current_index = 0 # Round-robin index async def connect(self): self.connection = await aio_pika.connect_robust(RABBITMQ_URI) self.channel = await self.connection.channel() self.exchange = await self.channel.declare_exchange( self.exchange_name, aio_pika.ExchangeType.DIRECT, durable=True ) async def publish(self, message: str): # Round-robin algorithm: switch to next routing key after each send self.current_index = (self.current_index + 1) % self.queue_count route_key = f"route.{self.current_index + 1}" message_obj = aio_pika.Message( body=message.encode("utf-8"), delivery_mode=aio_pika.DeliveryMode.PERSISTENT ) await self.exchange.publish(message_obj, routing_key=route_key) print(f"Message sent: {message} (routed to {route_key})") ``` #### Consumer Code: ```python async def queue_consumer(queue_name: str, consumer_id: int): connection = await aio_pika.connect_robust(RABBITMQ_URI) channel = await connection.channel() await channel.set_qos(prefetch_count=1) queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False) async def on_message(message: aio_pika.IncomingMessage): async with message.process(): content = message.body.decode("utf-8") print(f"[Consumer {consumer_id}] Processing message: {content}") print(f"[Consumer {consumer_id}] From queue: {queue_name}") print(f"[Consumer {consumer_id}] Routing key: {message.routing_key}") await asyncio.sleep(1) consumer_tag = f"multi_consumer_{consumer_id}_{queue_name}" await queue.consume(on_message, consumer_tag=consumer_tag) print(f"[Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})") await asyncio.Future() ``` #### Complete Test Code **Test Run Script:** ```python #!/usr/bin/env python3 """ Run Multi-Queue Load Balancing Test """ import asyncio import sys import os # Add current directory to Python path sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from product.direct_multi_publish import setup_multi_queue_balance, BalancedProducer from comsumer.direct_multi_consumer import start_balanced_consumers async def run_multi_queue_balance_test(): """Run multi-queue load balancing test with producer and consumer""" print("=== Running Multi-Queue Load Balancing Test ===") queue_count = 3 # Start consumer in background consumer_task = asyncio.create_task(start_balanced_consumers(queue_count=queue_count)) # Wait for consumer to start await asyncio.sleep(1) # Setup and publish messages await setup_multi_queue_balance(queue_count=queue_count) producer = BalancedProducer(queue_count=queue_count) await producer.connect() for i in range(10): await producer.publish(f"Task {i + 1}: Multi-queue load balancing test") await asyncio.sleep(0.3) await producer.close() # Wait for messages to be processed await asyncio.sleep(3) # Cancel consumer consumer_task.cancel() print("✅ Multi-queue load balancing test completed successfully!") if __name__ == "__main__": asyncio.run(run_multi_queue_balance_test()) ``` #### Test Output: ``` === Running Multi-Queue Load Balancing Test === [Consumer 2] Started, listening to queue: task.queue.2 (tag: multi_consumer_2_task.queue.2) [Consumer 3] Started, listening to queue: task.queue.3 (tag: multi_consumer_3_task.queue.3) [Consumer 1] Started, listening to queue: task.queue.1 (tag: multi_consumer_1_task.queue.1) Queue task.queue.1 bound to routing key: route.1 Queue task.queue.2 bound to routing key: route.2 Queue task.queue.3 bound to routing key: route.3 [Consumer 2] Processing message: Task 1: Multi-queue load balancing test [Consumer 2] From queue: task.queue.2 [Consumer 2] Routing key: route.2 Message sent: Task 1: Multi-queue load balancing test (routed to route.2) [Consumer 3] Processing message: Task 2: Multi-queue load balancing test [Consumer 3] From queue: task.queue.3 [Consumer 3] Routing key: route.3 Message sent: Task 2: Multi-queue load balancing test (routed to route.3) [Consumer 1] Processing message: Task 3: Multi-queue load balancing test [Consumer 1] From queue: task.queue.1 [Consumer 1] Routing key: route.1 Message sent: Task 3: Multi-queue load balancing test (routed to route.1) Message sent: Task 4: Multi-queue load balancing test (routed to route.2) [Consumer 2] Processing message: Task 4: Multi-queue load balancing test [Consumer 2] From queue: task.queue.2 [Consumer 2] Routing key: route.2 Message sent: Task 5: Multi-queue load balancing test (routed to route.3) [Consumer 3] Processing message: Task 5: Multi-queue load balancing test [Consumer 3] From queue: task.queue.3 [Consumer 3] Routing key: route.3 Message sent: Task 6: Multi-queue load balancing test (routed to route.1) [Consumer 1] Processing message: Task 6: Multi-queue load balancing test [Consumer 1] From queue: task.queue.1 [Consumer 1] Routing key: route.1 Message sent: Task 7: Multi-queue load balancing test (routed to route.2) [Consumer 2] Processing message: Task 7: Multi-queue load balancing test [Consumer 2] From queue: task.queue.2 [Consumer 2] Routing key: route.2 Message sent: Task 8: Multi-queue load balancing test (routed to route.3) [Consumer 3] Processing message: Task 8: Multi-queue load balancing test [Consumer 3] From queue: task.queue.3 [Consumer 3] Routing key: route.3 Message sent: Task 9: Multi-queue load balancing test (routed to route.1) [Consumer 1] Processing message: Task 9: Multi-queue load balancing test [Consumer 1] From queue: task.queue.1 [Consumer 1] Routing key: route.1 Message sent: Task 10: Multi-queue load balancing test (routed to route.2) [Consumer 2] Processing message: Task 10: Multi-queue load balancing test [Consumer 2] From queue: task.queue.2 [Consumer 2] Routing key: route.2 ✅ Multi-queue load balancing test completed successfully! ``` ### 3.4 Topic Exchange Demo #### Core Features: - Routing keys use hierarchical strings (separated by ., such as order.create.user) - Supports two wildcards: - `*`: Matches 1 level (e.g., user.* can match user.login but not user.login.success) - `#`: Matches 0 or more levels (e.g., order.# can match order, order.pay, order.pay.success) #### Architecture Diagram: ```mermaid graph TD P[Producer
topic_publish] --> E[Topic Exchange
demo.topic] E -->|binding: #.critical| Q1[Queue
demo.topic.queue-critical] E -->|binding: order.#| Q2[Queue
demo.topic.queue-order] E -->|binding: user.login.*| Q3[Queue
demo.topic.queue-user.login] Q1 --> C1[CriticalHandler
topic_CriticalHandler] Q2 --> C2[OrderHandler
topic_OrderHandler] Q3 --> C3[UserLoginHandler
topic_UserLoginHandler] subgraph "Routing Key Examples" R1[order.create.critical] -.->|Matches #.critical and order.#| Q1 R1 -.-> Q2 R2[user.login.success] -.->|Matches user.login.*| Q3 R3[system.log.info] -.->|No match| X[Message discarded] end style P fill:#e1f5fe style E fill:#f3e5f5 style Q1 fill:#ffebee style Q2 fill:#e8f5e8 style Q3 fill:#fff3e0 style C1 fill:#ffebee style C2 fill:#e8f5e8 style C3 fill:#fff3e0 style X fill:#f5f5f5 ``` #### Producer Code: ```python async def setup_topic_exchange(exchange_name="demo.topic", queue_prefix="demo.topic.queue-"): connection = await aio_pika.connect_robust(RABBITMQ_URI) channel = await connection.channel() topic_exchange = await channel.declare_exchange( exchange_name, aio_pika.ExchangeType.TOPIC, durable=True ) # Define queues and corresponding binding keys (supports wildcards) queue_bindings = [ (f"{queue_prefix}critical", ["#.critical"]), # Match any prefix+critical (f"{queue_prefix}order", ["order.#"]), # Match all order-prefixed routing keys (f"{queue_prefix}user.login", ["user.login.*"]) # Match user.login+1 suffix ] for queue_name, binding_keys in queue_bindings: queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False) for binding_key in binding_keys: await queue.bind(topic_exchange, routing_key=binding_key) print(f"Queue {queue_name} bound to routing keys: {binding_keys}") async def topic_publish(message: str, routing_key: str, exchange_name: str = "demo.topic"): connection = await aio_pika.connect_robust(RABBITMQ_URI) channel = await connection.channel() exchange = await channel.declare_exchange( exchange_name, aio_pika.ExchangeType.TOPIC, durable=True ) message_obj = aio_pika.Message( body=message.encode("utf-8"), delivery_mode=aio_pika.DeliveryMode.PERSISTENT ) await exchange.publish(message_obj, routing_key=routing_key) print(f"Message sent: {message} (routing key: {routing_key})") ``` #### Consumer Code: ```python async def topic_consumer(queue_name: str, consumer_id: str): connection = await aio_pika.connect_robust(RABBITMQ_URI) channel = await connection.channel() await channel.set_qos(prefetch_count=1) queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False) async def on_message(message: aio_pika.IncomingMessage): async with message.process(): message_content = message.body.decode("utf-8") print(f"[Consumer {consumer_id}] Received message: {message_content}") print(f"[Consumer {consumer_id}] Message routing key: {message.routing_key}") print(f"[Consumer {consumer_id}] From queue: {queue_name}") await asyncio.sleep(1) consumer_tag = f"topic_{consumer_id}_{queue_name}" await queue.consume(on_message, consumer_tag=consumer_tag) print(f"[Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})") await asyncio.Future() ``` #### Complete Test Code **Test Run Script:** ```python #!/usr/bin/env python3 """ Run Topic Exchange Test """ import asyncio import sys import os # Add current directory to Python path sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from product.topic_publish import setup_topic_exchange, topic_publish from comsumer.topic_consumer import start_all_topic_consumers async def run_topic_exchange_test(): """Run topic exchange test with producer and consumer""" print("=== Running Topic Exchange Test ===") # Start consumer in background consumer_task = asyncio.create_task(start_all_topic_consumers()) # Wait for consumer to start await asyncio.sleep(1) # Setup exchange and publish messages await setup_topic_exchange() test_messages = [ ("Order creation failed (critical error)", "order.create.critical"), ("User login successful", "user.login.success"), ("Order payment completed", "order.pay.success"), ("System crash (critical error)", "system.crash.critical"), ("User login failed", "user.login.failed"), ("Normal system log", "system.log.info") # Won't match any binding key, will be discarded ] for msg, routing_key in test_messages: await topic_publish(msg, routing_key) await asyncio.sleep(0.5) # Wait for messages to be processed await asyncio.sleep(3) # Cancel consumer consumer_task.cancel() print("✅ Topic exchange test completed successfully!") if __name__ == "__main__": asyncio.run(run_topic_exchange_test()) ``` #### Test Output: ``` === Running Topic Exchange Test === [Consumer CriticalHandler] Started, listening to queue: demo.topic.queue-critical (tag: topic_CriticalHandler_demo.topic.queue-critical) [Consumer UserLoginHandler] Started, listening to queue: demo.topic.queue-user.login (tag: topic_UserLoginHandler_demo.topic.queue-user.login) [Consumer OrderHandler] Started, listening to queue: demo.topic.queue-order (tag: topic_OrderHandler_demo.topic.queue-order) Queue demo.topic.queue-critical bound to routing keys: ['#.critical'] Queue demo.topic.queue-order bound to routing keys: ['order.#'] Queue demo.topic.queue-user.login bound to routing keys: ['user.login.*'] [Consumer OrderHandler] Received message: Order creation failed (critical error) [Consumer OrderHandler] Message routing key: order.create.critical [Consumer OrderHandler] From queue: demo.topic.queue-order [Consumer CriticalHandler] Received message: Order creation failed (critical error) [Consumer CriticalHandler] Message routing key: order.create.critical [Consumer CriticalHandler] From queue: demo.topic.queue-critical Message sent: Order creation failed (critical error) (routing key: order.create.critical) [Consumer UserLoginHandler] Received message: User login successful [Consumer UserLoginHandler] Message routing key: user.login.success [Consumer UserLoginHandler] From queue: demo.topic.queue-user.login Message sent: User login successful (routing key: user.login.success) [Consumer OrderHandler] Received message: Order payment completed [Consumer OrderHandler] Message routing key: order.pay.success [Consumer OrderHandler] From queue: demo.topic.queue-order Message sent: Order payment completed (routing key: order.pay.success) [Consumer CriticalHandler] Received message: System crash (critical error) [Consumer CriticalHandler] Message routing key: system.crash.critical [Consumer CriticalHandler] From queue: demo.topic.queue-critical Message sent: System crash (critical error) (routing key: system.crash.critical) [Consumer UserLoginHandler] Received message: User login failed [Consumer UserLoginHandler] Message routing key: user.login.failed [Consumer UserLoginHandler] From queue: demo.topic.queue-user.login Message sent: User login failed (routing key: user.login.failed) Message sent: Normal system log (routing key: system.log.info) ✅ Topic exchange test completed successfully! ``` ### 3.5 Reliable RabbitMQ Producer-Consumer #### Reliability Guarantee Mechanisms To ensure message consumption reliability, we need to address the following aspects: **1. Message Persistence** - Exchange persistence: `durable=True` - Queue persistence: `durable=True` - Message persistence: `delivery_mode=PERSISTENT` **2. Message Acknowledgment Mechanism** - Automatic acknowledgment: `async with message.process()` - Manual acknowledgment: `message.ack()` / `message.nack()` - Ensure message acknowledgment only after successful processing **3. Message Idempotency** - Use message ID for deduplication - Database records of processed message IDs - Prevent duplicate processing of the same message **4. Retry Mechanism** - Configurable maximum retry count - Internal consumer retry, avoid message re-queuing **5. Dead Letter Queue** - Complete solution for handling failed messages - Automatic creation of dead letter exchange and queue - Detailed error information recording #### Key Code Implementation **Reliable Producer Code:** ```python class ReliableProducer: """Reliable Message Producer""" def _generate_message_id(self, message_data: Dict[str, Any]) -> str: """Generate message ID for message""" message_type = message_data.get('type', '') content = message_data.get('content', '') # For duplicate_test type messages, generate fixed ID based on content if message_type == 'duplicate_test': import hashlib content_hash = hashlib.md5(content.encode('utf-8')).hexdigest() return f"duplicate_{content_hash[:8]}" else: return f"msg_{asyncio.get_running_loop().time()}" async def publish_reliable_message(self, message_data: Dict[str, Any]) -> bool: """Publish reliable message""" try: # Generate message ID message_id = self._generate_message_id(message_data) # Add message metadata message_data.update({ 'timestamp': datetime.now().isoformat(), 'message_id': message_id }) # Create persistent message message = aio_pika.Message( body=json.dumps(message_data, ensure_ascii=False).encode('utf-8'), delivery_mode=aio_pika.DeliveryMode.PERSISTENT, # Message persistence message_id=message_id, timestamp=datetime.now() ) # Send message and wait for confirmation await self.exchange.publish(message, routing_key="reliable") logger.info(f"[Producer] Message sent: {message_id}") return True except Exception as e: logger.error(f"[Producer] Failed to send message: {e}") return False ``` **Reliable Consumer Code:** ```python class ReliableConsumer: """Reliable Message Consumer""" def __init__(self, queue_name: Optional[str] = None, consumer_name: Optional[str] = None): self.processed_messages: Set[str] = set() # Store processed message IDs async def process_message(self, message: aio_pika.IncomingMessage): """Core message processing logic""" try: # Parse message message_data = json.loads(message.body.decode('utf-8')) message_id = message_data.get('message_id') # Check if message has been processed before (idempotency check) if message_id in self.processed_messages: logger.warning(f"[Consumer-{self.consumer_name}] 🚫 Duplicate message detected, skipping: {message_id}") await message.ack() return logger.info(f"[Consumer-{self.consumer_name}] Starting to process message: {message_id}") # Retry processing message directly success = await self.retry_process_message(message_data, message_id, 0) # Only record processed message ID after successful processing if success: self.processed_messages.add(message_id) await message.ack() logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed and acknowledged") else: # Processing failed, send to dead letter queue await self.send_to_dead_letter_queue(message, message_id, "Processing failed") await message.ack() except Exception as e: logger.error(f"[Consumer-{self.consumer_name}] Failed to process message: {e}") await message.ack() async def retry_process_message(self, message_data: Dict[str, Any], message_id: str, retry_count: int) -> bool: """Retry processing message directly""" max_retries = config.max_retries last_error = None for attempt in range(max_retries + 1): try: logger.info(f"[Consumer-{self.consumer_name}] Attempting to process message {message_id}, attempt {attempt + 1}") await self.message_handler(message_data) logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed successfully") return True except Exception as e: last_error = str(e) logger.warning(f"[Consumer-{self.consumer_name}] Message {message_id} failed on attempt {attempt + 1}: {e}") if attempt < max_retries: await asyncio.sleep(1) else: logger.error(f"[Consumer-{self.consumer_name}] Message {message_id} failed after {max_retries} retries: {last_error}") return False ``` **Dead Letter Queue Consumer Code:** ```python class DeadLetterConsumer: """Dead Letter Queue Consumer""" async def process_dead_letter_message(self, message: aio_pika.IncomingMessage): """Process dead letter message""" try: # Parse dead letter message dead_letter_data = json.loads(message.body.decode('utf-8')) original_message = dead_letter_data.get('original_message', {}) error_info = dead_letter_data.get('error_info', 'Unknown') message_id = dead_letter_data.get('message_id', 'Unknown') # Print dead letter message information logger.error("=" * 50) logger.error("[Dead Letter Consumer] Received Dead Letter Message:") logger.error(f"[Dead Letter Consumer] Message ID: {message_id}") logger.error(f"[Dead Letter Consumer] Message Content: {json.dumps(original_message, ensure_ascii=False, indent=2)}") logger.error(f"[Dead Letter Consumer] Error Reason: {error_info}") logger.error("=" * 50) # Save to database await self.save_to_database(original_message, error_info, message_id) # Acknowledge dead letter message await message.ack() logger.info(f"[Dead Letter Consumer] Dead letter message {message_id} processed") except Exception as e: logger.error(f"[Dead Letter Consumer] Failed to process dead letter message: {e}") await message.nack(requeue=False) ``` #### Complete Test Code **Test Run Script:** ```python """ RabbitMQ Reliable Messaging Test Module """ import asyncio import logging from reliable_mq import ReliableProducer, ReliableConsumer from reliable_mq.dead_letter_consumer import DeadLetterConsumer from reliable_mq.config import config # Configure logging logging.basicConfig( level=getattr(logging, config.log_level), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) async def run_context_manager_messaging(): """Test reliable messaging using context manager""" logger.info("=== Testing Reliable Messaging with Context Manager ===") # Use async context manager async with ReliableProducer() as producer: async with ReliableConsumer(consumer_name="context_test_consumer") as consumer: async with DeadLetterConsumer() as dead_letter_consumer: # Start consumers (run in background) consumer_task = asyncio.create_task(consumer.start_consuming()) dead_letter_task = asyncio.create_task(dead_letter_consumer.start_consuming()) # Wait for consumers to start await asyncio.sleep(1) # Send test messages test_messages = [ {"content": "Important business message 1", "type": "business"}, {"content": "System notification message 2", "type": "notification"}, {"content": "User action message 3", "type": "user_action"}, {"content": "Duplicate message test", "type": "duplicate_test"}, {"content": "Duplicate message test", "type": "duplicate_test"}, # Duplicate message {"content": "Message that will fail 1", "type": "will_fail"}, # These messages will fail and go to dead letter queue {"content": "Message that will fail 2", "type": "will_fail"}, {"content": "Message that will fail 3", "type": "will_fail"}, ] for msg in test_messages: await producer.publish_reliable_message(msg) await asyncio.sleep(0.5) # Wait for message processing to complete await asyncio.sleep(30) # Cancel tasks consumer_task.cancel() dead_letter_task.cancel() if __name__ == '__main__': asyncio.run(run_context_manager_messaging()) ``` **Configuration File:** ```python """ RabbitMQ Reliable Messaging Configuration Module """ import os from dataclasses import dataclass from typing import Dict, Any @dataclass class Config: """Configuration class""" # RabbitMQ connection configuration rabbitmq_uri: str = "amqp://guest:guest@localhost:5673/" # Exchange and queue configuration exchange_name: str = "reliable.exchange" queue_name: str = "reliable.queue" # Dead letter queue configuration dead_letter_exchange: str = "reliable.dead.letter.exchange" dead_letter_queue: str = "reliable.dead.letter.queue" # Retry configuration max_retries: int = 3 message_ttl: int = 300000 # 5 minutes # QoS configuration prefetch_count: int = 1 # Logging configuration log_level: str = "INFO" def get_connection_config(self) -> Dict[str, Any]: """Get connection configuration""" return { 'uri': self.rabbitmq_uri, 'prefetch_count': self.prefetch_count } def get_dead_letter_config(self) -> Dict[str, str]: """Get dead letter queue configuration""" return { 'dead_letter_exchange': self.dead_letter_exchange, 'dead_letter_queue': self.dead_letter_queue } # Global configuration instance config = Config() ``` **Custom Message Handler Function:** ```python async def default_message_handler(message_data: Dict[str, Any]): """Default message handler function""" await asyncio.sleep(1) # Simulate processing time message_type = message_data.get('type', '') content = message_data.get('content', '') # Simulate business logic processing if message_type == 'will_fail': raise Exception(f"Simulated business processing failure: {content}") logger.info(f"[Consumer] Business logic processing completed: {content}") ``` #### Test Results ``` 2025-09-07 11:25:02,498 - __main__ - INFO - === Testing Reliable Messaging with Context Manager === 2025-09-07 11:25:02,509 - reliable_mq.reliable_producer - INFO - [Producer] Connected, queue: reliable.queue 2025-09-07 11:25:02,513 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Connected, listening to queue: reliable.queue 2025-09-07 11:25:02,518 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] Connected 2025-09-07 11:25:03,523 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Starting to process message: msg_323632.845174041 2025-09-07 11:25:03,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message content: {'content': 'Important business message 1', 'type': 'business', 'timestamp': '2025-09-07T11:25:03.519240', 'message_id': 'msg_323632.845174041'} 2025-09-07 11:25:03,524 - reliable_mq.reliable_producer - INFO - [Producer] Message sent: msg_323632.845174041 (type: business, content: Important business message 1) 2025-09-07 11:25:04,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Business logic processing completed: Important business message 1 2025-09-07 11:25:04,524 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message msg_323632.845174041 processed successfully 2025-09-07 11:25:04,525 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Message msg_323632.845174041 processed and acknowledged 2025-09-07 11:25:04,525 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Current processed message count: 1 2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - ================================================== 2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] 🚫 Duplicate message detected, skipping: 2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Message ID: duplicate_090f7015 2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Message content: { "content": "Duplicate message test", "type": "duplicate_test", "timestamp": "2025-09-07T11:25:05.546930", "message_id": "duplicate_090f7015" } 2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - [Consumer-context_test_consumer] Total processed messages: 4 2025-09-07 11:25:07,539 - reliable_mq.reliable_consumer - WARNING - ================================================== 2025-09-07 11:25:14,551 - reliable_mq.reliable_consumer - ERROR - [Consumer-context_test_consumer] Message msg_323635.377526708 failed after 3 retries: Simulated business processing failure: Message that will fail 1 2025-09-07 11:25:14,551 - reliable_mq.reliable_consumer - ERROR - [Consumer-context_test_consumer] Message sent to dead letter queue: msg_323635.377526708, error: Processing failed 2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - ================================================== 2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Received Dead Letter Message: 2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Message ID: msg_323635.377526708 2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Message Content: { "content": "Message that will fail 1", "type": "will_fail", "timestamp": "2025-09-07T11:25:06.051557", "message_id": "msg_323635.377526708" } 2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - [Dead Letter Consumer] Error Reason: Processing failed 2025-09-07 11:25:14,560 - reliable_mq.dead_letter_consumer - ERROR - ================================================== 2025-09-07 11:25:15,064 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] 💾 Dead letter message saved to database: msg_323635.377526708 2025-09-07 11:25:15,064 - reliable_mq.dead_letter_consumer - INFO - [Dead Letter Consumer] Database Record: { "id": "msg_323635.377526708", "original_message": { "content": "Message that will fail 1", "type": "will_fail", "timestamp": "2025-09-07T11:25:06.051557", "message_id": "msg_323635.377526708" }, "error_info": "Processing failed", "created_at": "2025-09-07T11:25:15.064341", "status": "failed" } 2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - ================================================== 2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Processed Message Statistics: 2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Total Processed: 4 2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - [Consumer-context_test_consumer] Processed Message IDs: ['msg_323632.845174041', 'msg_323633.351571583', 'duplicate_090f7015', 'msg_323633.854272166'] 2025-09-07 11:25:37,583 - reliable_mq.reliable_consumer - INFO - ================================================== ``` #### Architecture Diagram ![img_5.png](assert/img_5.png) ```mermaid graph TD P[ReliableProducer
Reliable Producer] --> E[Reliable Exchange
reliable.exchange] E --> Q[Reliable Queue
reliable.queue] Q --> C[ReliableConsumer
Reliable Consumer] C -->|Processing success| ACK[Message ACK
Message Acknowledgment] C -->|Processing failure| RETRY[Retry Logic
Retry Mechanism] RETRY -->|Retry success| ACK RETRY -->|Retry failure| DLQ[Dead Letter Queue
Dead Letter Queue] DLQ --> DLC[DeadLetterConsumer
Dead Letter Consumer] DLC --> DB[(Database
Database)] subgraph "Reliability Guarantees" PERSIST[Message Persistence
Message Persistence] IDEMPOTENT[Idempotency Check
Idempotency Check] CONFIRM[Publisher Confirmation
Publisher Confirmation] end subgraph "Message Flow" MSG1[Normal message] --> SUCCESS[Processing success] MSG2[Duplicate message] --> SKIP[Skip processing] MSG3[Failed message] --> FAIL[Failed after retry] end style P fill:#e1f5fe style E fill:#f3e5f5 style Q fill:#e8f5e8 style C fill:#fff3e0 style DLQ fill:#ffebee style DLC fill:#ffebee style DB fill:#f3e5f5 style ACK fill:#e8f5e8 style RETRY fill:#fff3e0 ```