From 6c71045b3a8de381a99c3c4bbd842ad2d5555565 Mon Sep 17 00:00:00 2001 From: icecheng Date: Sun, 7 Sep 2025 10:55:09 +0800 Subject: [PATCH] --init --- README_TESTS.md | 136 ++++++++++++++++++++++++++ comsumer/direct_consumer.py | 33 +++---- comsumer/direct_multi_consumer.py | 11 ++- comsumer/fanout_consumer.py | 17 ++-- comsumer/topic_consumer.py | 11 ++- help.py | 75 +++++++++++++++ product/direct_multi_publish.py | 4 +- product/direct_publish.py | 4 +- product/topic_publish.py | 4 +- reliable_mq/reliable_consumer.py | 106 ++++++++++---------- reliable_mq/reliable_producer.py | 76 +++++++-------- run_all_tests.py | 113 ++++++++++++++++++++++ run_direct_test.py | 50 ++++++++++ run_fanout_test.py | 42 ++++++++ run_multi_queue_test.py | 50 ++++++++++ run_topic_test.py | 52 ++++++++++ test.py | 154 +++++++++++++++++++++++++----- 17 files changed, 783 insertions(+), 155 deletions(-) create mode 100644 README_TESTS.md create mode 100644 help.py create mode 100644 run_all_tests.py create mode 100644 run_direct_test.py create mode 100644 run_fanout_test.py create mode 100644 run_multi_queue_test.py create mode 100644 run_topic_test.py diff --git a/README_TESTS.md b/README_TESTS.md new file mode 100644 index 0000000..c595462 --- /dev/null +++ b/README_TESTS.md @@ -0,0 +1,136 @@ +# RabbitMQ Test Suite + +This directory contains comprehensive tests for different RabbitMQ exchange types and patterns. + +## Test Files + +### Individual Test Files + +1. **`run_fanout_test.py`** - Tests Fanout Exchange + - Broadcasts messages to all bound queues + - Demonstrates one-to-many messaging pattern + - Run: `python run_fanout_test.py` + +2. **`run_direct_test.py`** - Tests Direct Exchange + - Routes messages based on exact routing key matches + - Demonstrates selective message routing + - Run: `python run_direct_test.py` + +3. **`run_topic_test.py`** - Tests Topic Exchange + - Routes messages using wildcard patterns (* and #) + - Demonstrates hierarchical message routing + - Run: `python run_topic_test.py` + +4. **`run_multi_queue_test.py`** - Tests Multi-Queue Load Balancing + - Distributes messages across multiple queues + - Demonstrates load balancing and parallel processing + - Run: `python run_multi_queue_test.py` + +### Combined Test File + +5. **`test.py`** - Runs all tests sequentially + - Executes all exchange type tests in order + - Run: `python test.py` + +## Test Features + +### Producer and Consumer Coordination +- Each test starts consumers in the background +- Producers send messages after consumers are ready +- Tests demonstrate real-time message processing +- Automatic cleanup and task cancellation + +### Message Patterns Tested + +#### Fanout Exchange +- **Pattern**: One-to-many broadcasting +- **Queues**: 3 queues (demo.fanout.queue-0, demo.fanout.queue-1, demo.fanout.queue-2) +- **Behavior**: All queues receive every message +- **Use Case**: Notifications, announcements, logging + +#### Direct Exchange +- **Pattern**: Exact routing key matching +- **Queues**: error, warning, info (with debug routing to info) +- **Behavior**: Messages routed based on exact routing key +- **Use Case**: Log level routing, priority-based processing + +#### Topic Exchange +- **Pattern**: Wildcard pattern matching +- **Queues**: Critical, Success, Failed +- **Behavior**: Messages routed using * and # wildcards +- **Use Case**: Hierarchical event routing, microservice communication + +#### Multi-Queue Load Balancing +- **Pattern**: Round-robin distribution +- **Queues**: 3 balanced queues +- **Behavior**: Messages distributed evenly across queues +- **Use Case**: Horizontal scaling, parallel processing + +## Running Tests + +### Prerequisites +- RabbitMQ server running on localhost:5673 +- Python 3.7+ with asyncio support +- Required packages: aio-pika + +### Individual Test Execution +```bash +# Test Fanout Exchange +python run_fanout_test.py + +# Test Direct Exchange +python run_direct_test.py + +# Test Topic Exchange +python run_topic_test.py + +# Test Multi-Queue Load Balancing +python run_multi_queue_test.py +``` + +### Run All Tests +```bash +python test.py +``` + +## Test Output + +Each test provides detailed output showing: +- Consumer startup messages +- Message reception and processing +- Queue routing behavior +- Message persistence status +- Test completion status + +## Configuration + +Tests use the configuration from `config.py`: +- RabbitMQ URI: `amqp://guest:guest@localhost:5673/` +- Exchange and queue naming conventions +- Message persistence settings + +## Architecture + +### Producer Side +- Sets up exchanges and queues +- Publishes test messages with appropriate routing keys +- Handles connection management + +### Consumer Side +- Starts multiple consumers for different queues +- Processes messages with simulated business logic +- Demonstrates concurrent message handling + +### Test Coordination +- Uses asyncio tasks for concurrent execution +- Implements proper startup/shutdown sequences +- Ensures clean resource cleanup + +## Extending Tests + +To add new test scenarios: +1. Create a new test file following the naming pattern `run_xxx_test.py` +2. Import appropriate producer and consumer functions +3. Implement the test logic with proper async/await patterns +4. Add consumer startup, message publishing, and cleanup phases +5. Update this README with the new test description diff --git a/comsumer/direct_consumer.py b/comsumer/direct_consumer.py index 713251d..97eec5a 100644 --- a/comsumer/direct_consumer.py +++ b/comsumer/direct_consumer.py @@ -33,12 +33,12 @@ async def direct_consumer(queue_name: str, consumer_label: str): async with message.process(): # 解码消息体(生产者用 utf-8 编码,此处对应解码) message_content = message.body.decode("utf-8") - # 打印关键信息(便于调试和日志跟踪) - print(f"[{consumer_label} 消费者] 收到消息:") - print(f" 队列名称:{queue_name}") - print(f" 消息内容:{message_content}") - print(f" 消息路由键:{message.routing_key}") # 验证路由键是否匹配 - print(f" 处理时间:{asyncio.get_running_loop().time():.2f}s\n") + # Print key information (for debugging and log tracking) + print(f"[{consumer_label} Consumer] Received message:") + print(f" Queue name: {queue_name}") + print(f" Message content: {message_content}") + print(f" Message routing key: {message.routing_key}") # Verify routing key match + print(f" Processing time: {asyncio.get_running_loop().time():.2f}s\n") # 模拟不同级别消息的处理耗时(业务场景可替换为实际逻辑) if "error" in queue_name: @@ -52,9 +52,10 @@ async def direct_consumer(queue_name: str, consumer_label: str): await asyncio.sleep(0.5) # 6. 启动队列监听:将消息处理函数绑定到队列 - await queue.consume(on_message_received) - # 打印启动日志,提示用户消费者已就绪 - print(f"[{consumer_label} 消费者] 已启动,正在监听队列:{queue_name}\n") + consumer_tag = f"direct_{consumer_label.lower().replace(' ', '_')}_{queue_name}" + await queue.consume(on_message_received, consumer_tag=consumer_tag) + # Print startup log, indicating consumer is ready + print(f"[{consumer_label} Consumer] Started, listening to queue: {queue_name} (tag: {consumer_tag})\n") # 7. 保持消费者运行(无限期阻塞,直到手动停止程序) # 若不阻塞,协程会立即结束,消费者会断开连接 @@ -66,14 +67,14 @@ async def start_all_direct_consumers(queue_prefix="demo.direct.queue-"): 启动所有 Direct Exchange 对应的消费者 与生产者 setup_direct_exchange 中的 queue_bindings 完全对应 """ - # 定义要启动的消费者列表(队列名称 + 消费者标签) + # Define list of consumers to start (queue name + consumer label) consumers = [ - # 错误队列:处理路由键为 "error" 的消息 - direct_consumer(f"{queue_prefix}error", "错误级别"), - # 警告队列:处理路由键为 "warning" 的消息 - direct_consumer(f"{queue_prefix}warning", "警告级别"), - # 信息队列:处理路由键为 "info" 和 "debug" 的消息 - direct_consumer(f"{queue_prefix}info", "信息/调试级别") + # Error queue: handles messages with routing key "error" + direct_consumer(f"{queue_prefix}error", "Error Level"), + # Warning queue: handles messages with routing key "warning" + direct_consumer(f"{queue_prefix}warning", "Warning Level"), + # Info queue: handles messages with routing keys "info" and "debug" + direct_consumer(f"{queue_prefix}info", "Info/Debug Level") ] # 同时启动所有消费者(并发运行,互不阻塞) diff --git a/comsumer/direct_multi_consumer.py b/comsumer/direct_multi_consumer.py index acee7a6..ac4032e 100644 --- a/comsumer/direct_multi_consumer.py +++ b/comsumer/direct_multi_consumer.py @@ -29,16 +29,17 @@ async def queue_consumer(queue_name: str, consumer_id: int): # 自动确认消息(处理完成后从队列中删除) async with message.process(): content = message.body.decode("utf-8") - print(f"【消费者{consumer_id}】处理消息:{content}") - print(f"【消费者{consumer_id}】来自队列:{queue_name}") - print(f"【消费者{consumer_id}】路由键:{message.routing_key}\n") + print(f"[Consumer {consumer_id}] Processing message: {content}") + print(f"[Consumer {consumer_id}] From queue: {queue_name}") + print(f"[Consumer {consumer_id}] Routing key: {message.routing_key}\n") # 模拟业务处理耗时(根据实际场景调整) await asyncio.sleep(1) # 开始消费队列消息 - await queue.consume(on_message) - print(f"【消费者{consumer_id}】已启动,正在监听队列:{queue_name}") + consumer_tag = f"multi_consumer_{consumer_id}_{queue_name}" + await queue.consume(on_message, consumer_tag=consumer_tag) + print(f"[Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})") # 保持消费者运行(无限期阻塞) await asyncio.Future() diff --git a/comsumer/fanout_consumer.py b/comsumer/fanout_consumer.py index 5063415..49bc9a1 100644 --- a/comsumer/fanout_consumer.py +++ b/comsumer/fanout_consumer.py @@ -33,20 +33,21 @@ async def fanout_consumer(queue_name: str, consumer_id: int): async with message.process(): # 解码消息体(生产者用 utf-8 编码,此处对应解码) message_content = message.body.decode("utf-8") - # 打印关键信息(清晰展示 Fanout 广播特性:同一消息多队列接收) - print(f"【Fanout 消费者{consumer_id}】收到广播消息:") - print(f" 监听队列:{queue_name}") - print(f" 消息内容:{message_content}") - print(f" 消息持久化状态:{'是' if message.delivery_mode == 2 else '否'}") # 验证持久化 + # Print key information (clearly showing Fanout broadcast feature: same message received by multiple queues) + print(f"[Fanout Consumer {consumer_id}] Received broadcast message:") + print(f" Listening queue: {queue_name}") + print(f" Message content: {message_content}") + print(f" Message persistence: {'Yes' if message.delivery_mode == 2 else 'No'}") # Verify persistence # print(f" 处理时间:{asyncio.get_running_loop().time():.2f}s\n") # 模拟业务处理耗时(根据实际场景替换,如日志存储、通知推送等) await asyncio.sleep(1) # 6. 启动队列监听:将处理逻辑绑定到队列,持续接收广播消息 - await queue.consume(on_message_received) - # 打印启动日志,提示用户消费者就绪 - print(f"【Fanout 消费者{consumer_id}】已启动,正在监听队列:{queue_name}\n") + consumer_tag = f"fanout_consumer_{consumer_id}_{queue_name}" + await queue.consume(on_message_received, consumer_tag=consumer_tag) + # Print startup log, indicating consumer is ready + print(f"[Fanout Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})\n") # 7. 保持消费者运行(无限期阻塞,直到手动停止程序) # 若不阻塞,协程会立即结束,消费者连接会断开 diff --git a/comsumer/topic_consumer.py b/comsumer/topic_consumer.py index 83c50c8..bf1e214 100644 --- a/comsumer/topic_consumer.py +++ b/comsumer/topic_consumer.py @@ -29,16 +29,17 @@ async def topic_consumer(queue_name: str, consumer_id: str): # 自动确认消息(处理完成后从队列中删除) async with message.process(): message_content = message.body.decode("utf-8") - print(f"【消费者{consumer_id}】收到消息: {message_content}") - print(f"【消费者{consumer_id}】消息路由键: {message.routing_key}") - print(f"【消费者{consumer_id}】来自队列: {queue_name}\n") + print(f"[Consumer {consumer_id}] Received message: {message_content}") + print(f"[Consumer {consumer_id}] Message routing key: {message.routing_key}") + print(f"[Consumer {consumer_id}] From queue: {queue_name}\n") # 模拟消息处理耗时(根据实际业务逻辑调整) await asyncio.sleep(1) # 开始消费队列中的消息 - await queue.consume(on_message) - print(f"【消费者{consumer_id}】已启动,正在监听队列: {queue_name}") + consumer_tag = f"topic_{consumer_id}_{queue_name}" + await queue.consume(on_message, consumer_tag=consumer_tag) + print(f"[Consumer {consumer_id}] Started, listening to queue: {queue_name} (tag: {consumer_tag})") # 保持消费者运行(无限期阻塞) await asyncio.Future() diff --git a/help.py b/help.py new file mode 100644 index 0000000..160274e --- /dev/null +++ b/help.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python3 +""" +RabbitMQ Test Help +Shows available test commands and usage information. +""" + +import os +import sys + + +def show_help(): + """Display help information for RabbitMQ tests""" + print("🐰 RabbitMQ Test Suite Help") + print("=" * 50) + + print("\n📋 Available Test Files:") + print("-" * 30) + + test_files = [ + ("run_fanout_test.py", "Test Fanout Exchange (broadcast messaging)"), + ("run_direct_test.py", "Test Direct Exchange (routing by key)"), + ("run_topic_test.py", "Test Topic Exchange (wildcard routing)"), + ("run_multi_queue_test.py", "Test Multi-Queue Load Balancing"), + ("test.py", "Run all tests sequentially"), + ("run_all_tests.py", "Run all individual test files with summary") + ] + + for filename, description in test_files: + exists = "✅" if os.path.exists(filename) else "❌" + print(f"{exists} {filename:<25} - {description}") + + print("\n🚀 Usage Examples:") + print("-" * 20) + print("python run_fanout_test.py # Test fanout exchange") + print("python run_direct_test.py # Test direct exchange") + print("python run_topic_test.py # Test topic exchange") + print("python run_multi_queue_test.py # Test load balancing") + print("python test.py # Run all tests") + print("python run_all_tests.py # Run with detailed summary") + + print("\n📖 Test Patterns:") + print("-" * 20) + print("• Fanout: One-to-many broadcasting") + print("• Direct: Exact routing key matching") + print("• Topic: Wildcard pattern matching (* and #)") + print("• Multi: Round-robin load balancing") + + print("\n⚙️ Prerequisites:") + print("-" * 20) + print("• RabbitMQ server running on localhost:5673") + print("• Python 3.7+ with asyncio support") + print("• aio-pika package installed") + + print("\n📁 File Structure:") + print("-" * 20) + print("product/ - Message producers") + print("comsumer/ - Message consumers") + print("config.py - RabbitMQ configuration") + print("run_*.py - Individual test files") + print("test.py - Combined test runner") + + print("\n🔧 Configuration:") + print("-" * 20) + print("Edit config.py to change:") + print("• RabbitMQ connection URI") + print("• Exchange and queue names") + print("• Message persistence settings") + + print("\n📚 Documentation:") + print("-" * 20) + print("See README_TESTS.md for detailed information") + + +if __name__ == "__main__": + show_help() diff --git a/product/direct_multi_publish.py b/product/direct_multi_publish.py index 3e123cc..4a8f239 100644 --- a/product/direct_multi_publish.py +++ b/product/direct_multi_publish.py @@ -33,7 +33,7 @@ async def setup_multi_queue_balance( # 绑定路由键(每个队列对应唯一路由键) await queue.bind(exchange_name, routing_key=route_key) - print(f"队列 {queue_name} 绑定路由键:{route_key}") + print(f"Queue {queue_name} bound to routing key: {route_key}") await connection.close() @@ -66,7 +66,7 @@ class BalancedProducer: ) await self.exchange.publish(message_obj, routing_key=route_key) - print(f"已发送消息:{message}(路由到 {route_key})") + print(f"Message sent: {message} (routed to {route_key})") async def close(self): """关闭连接""" diff --git a/product/direct_publish.py b/product/direct_publish.py index 3131763..8bfa28f 100644 --- a/product/direct_publish.py +++ b/product/direct_publish.py @@ -39,7 +39,7 @@ async def setup_direct_exchange(exchange_name="demo.direct", queue_prefix="demo. direct_exchange, routing_key=binding_key # Direct交换器需要指定绑定键 ) - print(f"队列 {queue_name} 已绑定路由键: {binding_keys}") + print(f"Queue {queue_name} bound to routing keys: {binding_keys}") await connection.close() @@ -68,6 +68,6 @@ async def direct_publish(message: str, routing_key: str, exchange_name: str = "d message_obj, routing_key=routing_key # 路由键决定消息流向哪个队列 ) - print(f"已发送消息: {message} (路由键: {routing_key})") + print(f"Message sent: {message} (routing key: {routing_key})") await connection.close() diff --git a/product/topic_publish.py b/product/topic_publish.py index 3e5f558..cff2239 100644 --- a/product/topic_publish.py +++ b/product/topic_publish.py @@ -42,7 +42,7 @@ async def setup_topic_exchange(exchange_name="demo.topic", queue_prefix="demo.to topic_exchange, routing_key=binding_key # Topic交换器使用带通配符的绑定键 ) - print(f"队列 {queue_name} 已绑定路由键: {binding_keys}") + print(f"Queue {queue_name} bound to routing keys: {binding_keys}") await connection.close() @@ -71,6 +71,6 @@ async def topic_publish(message: str, routing_key: str, exchange_name: str = "de message_obj, routing_key=routing_key # 路由键为层级化字符串(如"order.create.user") ) - print(f"已发送消息: {message} (路由键: {routing_key})") + print(f"Message sent: {message} (routing key: {routing_key})") await connection.close() diff --git a/reliable_mq/reliable_consumer.py b/reliable_mq/reliable_consumer.py index e2dc064..c58038b 100644 --- a/reliable_mq/reliable_consumer.py +++ b/reliable_mq/reliable_consumer.py @@ -1,5 +1,5 @@ """ -RabbitMQ 可靠消息消费者模块 +RabbitMQ Reliable Message Consumer Module """ import asyncio @@ -15,19 +15,19 @@ logger = logging.getLogger(__name__) class ReliableConsumer: - """可靠消息消费者""" + """Reliable Message Consumer""" def __init__(self, queue_name: Optional[str] = None, consumer_name: Optional[str] = None, message_handler: Optional[Callable] = None): """ - 初始化消费者 + Initialize consumer Args: - queue_name: 队列名称,默认使用配置中的值 - consumer_name: 消费者名称 - message_handler: 自定义消息处理函数 + queue_name: Queue name, defaults to config value + consumer_name: Consumer name + message_handler: Custom message handler function """ self.queue_name = queue_name or config.queue_name self.consumer_name = consumer_name or "reliable_consumer" @@ -35,113 +35,113 @@ class ReliableConsumer: self.connection = None self.channel = None self.queue = None - self.processed_messages: Set[str] = set() # 记录已处理的消息ID,防止重复处理 + self.processed_messages: Set[str] = set() # Store processed message IDs to prevent duplicate processing async def connect(self): - """建立连接""" + """Establish connection""" try: connection_config = config.get_connection_config() self.connection = await aio_pika.connect_robust(connection_config['uri']) self.channel = await self.connection.channel() - # 设置QoS,确保一次只处理一条消息 + # Set QoS to ensure only one message is processed at a time await self.channel.set_qos(prefetch_count=connection_config['prefetch_count']) - # 声明队列(确保队列存在) + # Declare queue (ensure queue exists) self.queue = await self.channel.declare_queue( self.queue_name, durable=True, auto_delete=False ) - logger.info(f"[消费者-{self.consumer_name}] 已连接,监听队列: {self.queue_name}") + logger.info(f"[Consumer-{self.consumer_name}] Connected, listening to queue: {self.queue_name}") except Exception as e: - logger.error(f"[消费者-{self.consumer_name}] 连接失败: {e}") + logger.error(f"[Consumer-{self.consumer_name}] Connection failed: {e}") raise async def process_message(self, message: aio_pika.IncomingMessage): - """处理消息的核心逻辑""" + """Core message processing logic""" try: - # 解析消息 + # Parse message message_data = json.loads(message.body.decode('utf-8')) message_id = message_data.get('message_id') - # 检查是否已经处理过此消息(幂等性检查) + # Check if message has been processed before (idempotency check) if message_id in self.processed_messages: logger.warning("=" * 50) - logger.warning(f"[消费者-{self.consumer_name}] 🚫 检测到重复消息,跳过处理:") - logger.warning(f"[消费者-{self.consumer_name}] 消息ID: {message_id}") - logger.warning(f"[消费者-{self.consumer_name}] 消息内容: {json.dumps(message_data, ensure_ascii=False, indent=2)}") - logger.warning(f"[消费者-{self.consumer_name}] 已处理消息总数: {len(self.processed_messages)}") + logger.warning(f"[Consumer-{self.consumer_name}] 🚫 Duplicate message detected, skipping:") + logger.warning(f"[Consumer-{self.consumer_name}] Message ID: {message_id}") + logger.warning(f"[Consumer-{self.consumer_name}] Message content: {json.dumps(message_data, ensure_ascii=False, indent=2)}") + logger.warning(f"[Consumer-{self.consumer_name}] Total processed messages: {len(self.processed_messages)}") logger.warning("=" * 50) await message.ack() return - logger.info(f"[消费者-{self.consumer_name}] 开始处理消息: {message_id}") - logger.info(f"[消费者-{self.consumer_name}] 消息内容: {message_data}") + logger.info(f"[Consumer-{self.consumer_name}] Starting to process message: {message_id}") + logger.info(f"[Consumer-{self.consumer_name}] Message content: {message_data}") - # 直接重试处理消息 + # Retry processing message directly success = await self.retry_process_message(message_data, message_id, 0) - # 只有在处理成功后才记录已处理的消息ID + # Only record processed message ID after successful processing if success: self.processed_messages.add(message_id) - # 确认消息 + # Acknowledge message await message.ack() - logger.info(f"[消费者-{self.consumer_name}] 消息 {message_id} 处理完成并确认") - logger.info(f"[消费者-{self.consumer_name}] 当前已处理消息数量: {len(self.processed_messages)}") + logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed and acknowledged") + logger.info(f"[Consumer-{self.consumer_name}] Current processed message count: {len(self.processed_messages)}") else: - # 处理失败,不记录消息ID,发送到死信队列 - await self.send_to_dead_letter_queue(message, message_id, "处理失败") - await message.ack() # 确认消息以避免无限重试 + # Processing failed, don't record message ID, send to dead letter queue + await self.send_to_dead_letter_queue(message, message_id, "Processing failed") + await message.ack() # Acknowledge message to avoid infinite retry except Exception as e: - logger.error(f"[消费者-{self.consumer_name}] 处理消息失败: {e}") - # 直接发送到死信队列,包含错误信息 + logger.error(f"[Consumer-{self.consumer_name}] Failed to process message: {e}") + # Send directly to dead letter queue with error information message_data = json.loads(message.body.decode('utf-8')) message_id = message_data.get('message_id') await self.send_to_dead_letter_queue(message, message_id, str(e)) - await message.ack() # 确认消息以避免无限重试 + await message.ack() # Acknowledge message to avoid infinite retry async def default_message_handler(self, message_data: Dict[str, Any]): - """默认消息处理函数""" - # 模拟处理时间 + """Default message handler function""" + # Simulate processing time await asyncio.sleep(1) - # 根据消息类型决定是否失败 + # Decide whether to fail based on message type message_type = message_data.get('type', '') if message_type == 'will_fail': - # 特定类型的消息总是失败,用于测试死信队列 - raise Exception(f"模拟业务处理失败: {message_data.get('content', '')}") + # Specific type of messages always fail, used for testing dead letter queue + raise Exception(f"Simulated business processing failure: {message_data.get('content', '')}") else: pass - logger.info(f"[消费者-{self.consumer_name}] 业务逻辑处理完成: {message_data.get('content', '')}") + logger.info(f"[Consumer-{self.consumer_name}] Business logic processing completed: {message_data.get('content', '')}") async def retry_process_message(self, message_data: Dict[str, Any], message_id: str, retry_count: int) -> bool: - """直接重试处理消息""" + """Retry processing message directly""" max_retries = config.max_retries last_error = None for attempt in range(max_retries + 1): try: - logger.info(f"[消费者-{self.consumer_name}] 尝试处理消息 {message_id},第 {attempt + 1} 次") + logger.info(f"[Consumer-{self.consumer_name}] Attempting to process message {message_id}, attempt {attempt + 1}") await self.message_handler(message_data) - logger.info(f"[消费者-{self.consumer_name}] 消息 {message_id} 处理成功") - return True # 处理成功,返回True + logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} processed successfully") + return True # Processing successful, return True except Exception as e: last_error = str(e) - logger.warning(f"[消费者-{self.consumer_name}] 消息 {message_id} 第 {attempt + 1} 次处理失败: {e}") + logger.warning(f"[Consumer-{self.consumer_name}] Message {message_id} failed on attempt {attempt + 1}: {e}") if attempt < max_retries: - # 等待一段时间后重试 + # Wait for a while before retrying await asyncio.sleep(1) else: - # 所有重试都失败,返回False - logger.error(f"[消费者-{self.consumer_name}] 消息 {message_id} 重试 {max_retries} 次后仍然失败: {last_error}") + # All retries failed, return False + logger.error(f"[Consumer-{self.consumer_name}] Message {message_id} failed after {max_retries} retries: {last_error}") return False async def send_to_dead_letter_queue(self, message: aio_pika.IncomingMessage, message_id: str, @@ -161,9 +161,9 @@ class ReliableConsumer: 'queue_name': self.queue_name } - logger.error(f"[消费者-{self.consumer_name}] 消息发送到死信队列: {message_id}, 错误: {error_info}") + logger.error(f"[Consumer-{self.consumer_name}] Message sent to dead letter queue: {message_id}, error: {error_info}") - # 创建死信交换器和队列 + # Create dead letter exchange and queue dead_letter_config = config.get_dead_letter_config() dead_letter_exchange = await self.channel.declare_exchange( dead_letter_config['dead_letter_exchange'], @@ -182,24 +182,24 @@ class ReliableConsumer: routing_key=dead_letter_config['dead_letter_routing_key'] ) - # 创建死信消息 + # Create dead letter message dead_letter_message = aio_pika.Message( body=json.dumps(dead_letter_data, ensure_ascii=False).encode('utf-8'), delivery_mode=aio_pika.DeliveryMode.PERSISTENT, message_id=f"dead_letter_{message_id}" ) - # 发送到死信队列 + # Send to dead letter queue await dead_letter_exchange.publish( dead_letter_message, routing_key=dead_letter_config['dead_letter_routing_key'] ) - logger.info(f"[消费者-{self.consumer_name}] 消息 {message_id} 已发送到死信队列") + logger.info(f"[Consumer-{self.consumer_name}] Message {message_id} sent to dead letter queue") except Exception as e: - logger.error(f"[消费者-{self.consumer_name}] 发送到死信队列失败: {e}") - logger.error(f"[消费者-{self.consumer_name}] 原始消息内容: {message.body.decode('utf-8') if message.body else 'None'}") + logger.error(f"[Consumer-{self.consumer_name}] Failed to send to dead letter queue: {e}") + logger.error(f"[Consumer-{self.consumer_name}] Original message content: {message.body.decode('utf-8') if message.body else 'None'}") async def start_consuming(self): """开始消费消息""" diff --git a/reliable_mq/reliable_producer.py b/reliable_mq/reliable_producer.py index 644b5c3..dcb7a44 100644 --- a/reliable_mq/reliable_producer.py +++ b/reliable_mq/reliable_producer.py @@ -1,5 +1,5 @@ """ -RabbitMQ 可靠消息生产者模块 +RabbitMQ Reliable Message Producer Module """ import asyncio @@ -15,17 +15,17 @@ logger = logging.getLogger(__name__) class ReliableProducer: - """可靠消息生产者""" + """Reliable Message Producer""" def __init__(self, exchange_name: Optional[str] = None, queue_name: Optional[str] = None): """ - 初始化生产者 + Initialize producer Args: - exchange_name: 交换器名称,默认使用配置中的值 - queue_name: 队列名称,默认使用配置中的值 + exchange_name: Exchange name, defaults to config value + queue_name: Queue name, defaults to config value """ self.exchange_name = exchange_name or config.exchange_name self.queue_name = queue_name or config.queue_name @@ -35,118 +35,118 @@ class ReliableProducer: self.queue = None async def connect(self): - """建立连接并设置确认机制""" + """Establish connection and setup confirmation mechanism""" try: - # 使用 robust 连接,支持自动重连 + # Use robust connection with auto-reconnect support connection_config = config.get_connection_config() self.connection = await aio_pika.connect_robust(connection_config['uri']) self.channel = await self.connection.channel() - # 开启发布确认机制 - 确保消息成功发送到队列 + # Enable publisher confirmations - ensure messages are successfully sent to queue await self.channel.set_qos(prefetch_count=connection_config['prefetch_count']) - # 声明持久化交换器 + # Declare durable exchange self.exchange = await self.channel.declare_exchange( self.exchange_name, aio_pika.ExchangeType.DIRECT, - durable=True # 交换器持久化 + durable=True # Exchange persistence ) - # 声明持久化队列 + # Declare durable queue self.queue = await self.channel.declare_queue( self.queue_name, - durable=True, # 队列持久化 - auto_delete=False, # 队列不自动删除 + durable=True, # Queue persistence + auto_delete=False, # Queue not auto-deleted ) - # 绑定队列到交换器 + # Bind queue to exchange await self.queue.bind(self.exchange, routing_key="reliable") - logger.info(f"[生产者] 已连接,队列: {self.queue_name}") + logger.info(f"[Producer] Connected, queue: {self.queue_name}") except Exception as e: - logger.error(f"[生产者] 连接失败: {e}") + logger.error(f"[Producer] Connection failed: {e}") raise def _generate_message_id(self, message_data: Dict[str, Any]) -> str: """ - 为消息生成消息ID - 对于 duplicate_test 类型的消息,生成固定的ID用于测试幂等性 + Generate message ID for message + For duplicate_test type messages, generate fixed ID for idempotency testing Args: - message_data: 消息数据字典 + message_data: Message data dictionary Returns: - str: 消息ID + str: Message ID """ message_type = message_data.get('type', '') content = message_data.get('content', '') - # 对于 duplicate_test 类型的消息,基于内容生成固定ID + # For duplicate_test type messages, generate fixed ID based on content if message_type == 'duplicate_test': - # 使用内容生成固定的消息ID + # Use content to generate fixed message ID import hashlib content_hash = hashlib.md5(content.encode('utf-8')).hexdigest() return f"duplicate_{content_hash[:8]}" else: - # 其他消息使用时间戳生成唯一ID + # Other messages use timestamp to generate unique ID return f"msg_{asyncio.get_running_loop().time()}" async def publish_reliable_message(self, message_data: Dict[str, Any]) -> bool: """ - 发送可靠消息 + Publish reliable message Args: - message_data: 消息数据字典 + message_data: Message data dictionary Returns: - bool: 发送是否成功 + bool: Whether sending was successful """ try: - # 生成消息ID + # Generate message ID message_id = self._generate_message_id(message_data) - # 添加消息元数据 + # Add message metadata message_data.update({ 'timestamp': datetime.now().isoformat(), 'message_id': message_id }) - # 创建持久化消息 + # Create persistent message message = aio_pika.Message( body=json.dumps(message_data, ensure_ascii=False).encode('utf-8'), - delivery_mode=aio_pika.DeliveryMode.PERSISTENT, # 消息持久化 + delivery_mode=aio_pika.DeliveryMode.PERSISTENT, # Message persistence message_id=message_id, timestamp=datetime.now() ) - # 发送消息并等待确认 + # Send message and wait for confirmation await self.exchange.publish( message, routing_key="reliable" ) - logger.info(f"[生产者] 消息已发送: {message_id} (类型: {message_data.get('type', 'N/A')}, 内容: {message_data.get('content', 'N/A')})") + logger.info(f"[Producer] Message sent: {message_id} (type: {message_data.get('type', 'N/A')}, content: {message_data.get('content', 'N/A')})") return True except Exception as e: - logger.error(f"[生产者] 发送消息失败: {e}") + logger.error(f"[Producer] Failed to send message: {e}") return False async def close(self): - """关闭连接""" + """Close connection""" try: if self.connection: await self.connection.close() - logger.info("[生产者] 连接已关闭") + logger.info("[Producer] Connection closed") except Exception as e: - logger.error(f"[生产者] 关闭连接时出错: {e}") + logger.error(f"[Producer] Error closing connection: {e}") async def __aenter__(self): - """异步上下文管理器入口""" + """Async context manager entry""" await self.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb): - """异步上下文管理器出口""" + """Async context manager exit""" await self.close() diff --git a/run_all_tests.py b/run_all_tests.py new file mode 100644 index 0000000..e9b68f0 --- /dev/null +++ b/run_all_tests.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 +""" +Run All RabbitMQ Tests +This script runs all individual test files in sequence. +""" + +import asyncio +import sys +import os +import subprocess + +# Add current directory to Python path +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + + +def run_test_file(test_file): + """Run a test file and return success status""" + try: + print(f"\n{'='*60}") + print(f"Running {test_file}") + print(f"{'='*60}") + + result = subprocess.run([sys.executable, test_file], + capture_output=True, + text=True, + timeout=30) + + if result.returncode == 0: + print(f"✅ {test_file} completed successfully") + if result.stdout: + print("Output:") + print(result.stdout) + return True + else: + print(f"❌ {test_file} failed with return code {result.returncode}") + if result.stderr: + print("Error:") + print(result.stderr) + return False + + except subprocess.TimeoutExpired: + print(f"⏰ {test_file} timed out after 30 seconds") + return False + except Exception as e: + print(f"💥 {test_file} failed with exception: {e}") + return False + + +def main(): + """Main function to run all tests""" + print("🚀 Starting RabbitMQ Test Suite") + print("This will run all individual test files in sequence.") + + # List of test files to run + test_files = [ + "run_fanout_test.py", + "run_direct_test.py", + "run_topic_test.py", + "run_multi_queue_test.py" + ] + + # Check if test files exist + missing_files = [] + for test_file in test_files: + if not os.path.exists(test_file): + missing_files.append(test_file) + + if missing_files: + print(f"❌ Missing test files: {missing_files}") + sys.exit(1) + + # Run all tests + results = [] + for test_file in test_files: + success = run_test_file(test_file) + results.append((test_file, success)) + + # Wait between tests + if test_file != test_files[-1]: # Don't wait after last test + print("\n⏳ Waiting 2 seconds before next test...") + import time + time.sleep(2) + + # Print summary + print(f"\n{'='*60}") + print("TEST SUMMARY") + print(f"{'='*60}") + + passed = 0 + failed = 0 + + for test_file, success in results: + status = "✅ PASSED" if success else "❌ FAILED" + print(f"{test_file:<25} {status}") + if success: + passed += 1 + else: + failed += 1 + + print(f"\nTotal: {len(results)} tests") + print(f"Passed: {passed}") + print(f"Failed: {failed}") + + if failed == 0: + print("\n🎉 All tests passed!") + sys.exit(0) + else: + print(f"\n💥 {failed} test(s) failed!") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/run_direct_test.py b/run_direct_test.py new file mode 100644 index 0000000..3f9effe --- /dev/null +++ b/run_direct_test.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 +""" +Run Direct Exchange Test +""" + +import asyncio +import sys +import os + +# Add current directory to Python path +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from product.direct_publish import setup_direct_exchange, direct_publish +from comsumer.direct_consumer import start_all_direct_consumers + + +async def run_direct_exchange_test(): + """Run direct exchange test with producer and consumer""" + print("=== Running Direct Exchange Test ===") + + # Start consumer in background + consumer_task = asyncio.create_task(start_all_direct_consumers()) + + # Wait for consumer to start + await asyncio.sleep(1) + + # Setup exchange and publish messages + await setup_direct_exchange() + + test_messages = [ + ("System crash, unable to start", "error"), # Route to error queue + ("Disk space insufficient", "warning"), # Route to warning queue + ("User login successful", "info"), # Route to info queue + ("Debug info: Database connection successful", "debug") # Route to info queue + ] + + for msg, routing_key in test_messages: + await direct_publish(msg, routing_key) + await asyncio.sleep(0.5) + + # Wait for messages to be processed + await asyncio.sleep(3) + + # Cancel consumer + consumer_task.cancel() + print("✅ Direct exchange test completed successfully!") + + +if __name__ == "__main__": + asyncio.run(run_direct_exchange_test()) diff --git a/run_fanout_test.py b/run_fanout_test.py new file mode 100644 index 0000000..590fe83 --- /dev/null +++ b/run_fanout_test.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +""" +Run Fanout Exchange Test +""" + +import asyncio +import sys +import os + +# Add current directory to Python path +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from product.fanout_publish import fanout_publish, setup_fanout_exchange +from comsumer.fanout_consumer import start_all_fanout_consumers + + +async def run_fanout_test(): + """Run fanout exchange test with producer and consumer""" + print("=== Running Fanout Exchange Test ===") + + # Start consumer in background + consumer_task = asyncio.create_task(start_all_fanout_consumers()) + + # Wait for consumer to start + await asyncio.sleep(1) + + # Setup and publish messages + await setup_fanout_exchange("demo.fanout", "demo.fanout.queue-") + await fanout_publish(message="hello world", exchange_name="demo.fanout") + await fanout_publish(message="test message 2", exchange_name="demo.fanout") + await fanout_publish(message="test message 3", exchange_name="demo.fanout") + + # Wait for messages to be processed + await asyncio.sleep(3) + + # Cancel consumer + consumer_task.cancel() + print("✅ Fanout test completed successfully!") + + +if __name__ == "__main__": + asyncio.run(run_fanout_test()) diff --git a/run_multi_queue_test.py b/run_multi_queue_test.py new file mode 100644 index 0000000..03b6211 --- /dev/null +++ b/run_multi_queue_test.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 +""" +Run Multi-Queue Load Balancing Test +""" + +import asyncio +import sys +import os + +# Add current directory to Python path +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from product.direct_multi_publish import setup_multi_queue_balance, BalancedProducer +from comsumer.direct_multi_consumer import start_balanced_consumers + + +async def run_multi_queue_balance_test(): + """Run multi-queue load balancing test with producer and consumer""" + print("=== Running Multi-Queue Load Balancing Test ===") + + queue_count = 3 + + # Start consumer in background + consumer_task = asyncio.create_task(start_balanced_consumers(queue_count=queue_count)) + + # Wait for consumer to start + await asyncio.sleep(1) + + # Setup and publish messages + await setup_multi_queue_balance(queue_count=queue_count) + + producer = BalancedProducer(queue_count=queue_count) + await producer.connect() + + for i in range(10): + await producer.publish(f"Task {i + 1}: Multi-queue load balancing test") + await asyncio.sleep(0.3) + + await producer.close() + + # Wait for messages to be processed + await asyncio.sleep(3) + + # Cancel consumer + consumer_task.cancel() + print("✅ Multi-queue load balancing test completed successfully!") + + +if __name__ == "__main__": + asyncio.run(run_multi_queue_balance_test()) diff --git a/run_topic_test.py b/run_topic_test.py new file mode 100644 index 0000000..7a91456 --- /dev/null +++ b/run_topic_test.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 +""" +Run Topic Exchange Test +""" + +import asyncio +import sys +import os + +# Add current directory to Python path +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from product.topic_publish import setup_topic_exchange, topic_publish +from comsumer.topic_consumer import start_all_topic_consumers + + +async def run_topic_exchange_test(): + """Run topic exchange test with producer and consumer""" + print("=== Running Topic Exchange Test ===") + + # Start consumer in background + consumer_task = asyncio.create_task(start_all_topic_consumers()) + + # Wait for consumer to start + await asyncio.sleep(1) + + # Setup exchange and publish messages + await setup_topic_exchange() + + test_messages = [ + ("Order creation failed (critical error)", "order.create.critical"), + ("User login successful", "user.login.success"), + ("Order payment completed", "order.pay.success"), + ("System crash (critical error)", "system.crash.critical"), + ("User login failed", "user.login.failed"), + ("Normal system log", "system.log.info") # Won't match any binding key, will be discarded + ] + + for msg, routing_key in test_messages: + await topic_publish(msg, routing_key) + await asyncio.sleep(0.5) + + # Wait for messages to be processed + await asyncio.sleep(3) + + # Cancel consumer + consumer_task.cancel() + print("✅ Topic exchange test completed successfully!") + + +if __name__ == "__main__": + asyncio.run(run_topic_exchange_test()) diff --git a/test.py b/test.py index 1351608..657c33e 100644 --- a/test.py +++ b/test.py @@ -1,59 +1,165 @@ import asyncio +import sys +import os + +# Add current directory to Python path +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from product.direct_multi_publish import setup_multi_queue_balance, BalancedProducer from product.direct_publish import setup_direct_exchange, direct_publish from product.fanout_publish import fanout_publish, setup_fanout_exchange from product.topic_publish import setup_topic_exchange, topic_publish +from comsumer.direct_consumer import start_all_direct_consumers +from comsumer.fanout_consumer import start_all_fanout_consumers +from comsumer.topic_consumer import start_all_topic_consumers +from comsumer.direct_multi_consumer import start_balanced_consumers -async def test_fanout_publish(): +async def run_fanout_test(): + """Run fanout exchange test with producer and consumer""" + print("=== Running Fanout Exchange Test ===") + + # Start consumer in background + consumer_task = asyncio.create_task(start_all_fanout_consumers()) + + # Wait for consumer to start + await asyncio.sleep(1) + + # Setup and publish messages await setup_fanout_exchange("demo.fanout", "demo.fanout.queue-") await fanout_publish(message="hello world", exchange_name="demo.fanout") + await fanout_publish(message="test message 2", exchange_name="demo.fanout") + await fanout_publish(message="test message 3", exchange_name="demo.fanout") + + # Wait for messages to be processed + await asyncio.sleep(3) + + # Cancel consumer + consumer_task.cancel() + print("✅ Fanout test completed successfully!") -async def test_direct_exchange(): - """测试Direct交换器的消息发送""" - # 1. 初始化Direct交换器和队列绑定 +async def run_direct_exchange_test(): + """Run direct exchange test with producer and consumer""" + print("=== Running Direct Exchange Test ===") + + # Start consumer in background + consumer_task = asyncio.create_task(start_all_direct_consumers()) + + # Wait for consumer to start + await asyncio.sleep(1) + + # Setup exchange and publish messages await setup_direct_exchange() - - # 2. 发送不同路由键的测试消息 + test_messages = [ - ("系统崩溃,无法启动", "error"), # 路由到error队列 - ("磁盘空间不足", "warning"), # 路由到warning队列 - ("用户登录成功", "info"), # 路由到info队列 - ("调试信息:数据库连接成功", "debug") # 路由到info队列(因为info队列绑定了debug键) + ("System crash, unable to start", "error"), # Route to error queue + ("Disk space insufficient", "warning"), # Route to warning queue + ("User login successful", "info"), # Route to info queue + ("Debug info: Database connection successful", "debug") # Route to info queue ] for msg, routing_key in test_messages: await direct_publish(msg, routing_key) + await asyncio.sleep(0.5) + + # Wait for messages to be processed + await asyncio.sleep(3) + + # Cancel consumer + consumer_task.cancel() + print("✅ Direct exchange test completed successfully!") -async def test_topic_exchange(): - """测试Topic交换器的通配符路由功能""" - # 1. 初始化Topic交换器和队列绑定 +async def run_topic_exchange_test(): + """Run topic exchange test with producer and consumer""" + print("=== Running Topic Exchange Test ===") + + # Start consumer in background + consumer_task = asyncio.create_task(start_all_topic_consumers()) + + # Wait for consumer to start + await asyncio.sleep(1) + + # Setup exchange and publish messages await setup_topic_exchange() - - # 2. 发送不同路由键的测试消息(体现层级化路由) + test_messages = [ - ("订单创建失败(严重错误)", "order.create.critical"), - ("用户登录成功", "user.login.success"), - ("订单支付完成", "order.pay.success"), - ("系统崩溃(严重错误)", "system.crash.critical"), - ("用户登录失败", "user.login.failed"), - ("普通系统日志", "system.log.info") # 不匹配任何绑定键,会被丢弃 + ("Order creation failed (critical error)", "order.create.critical"), + ("User login successful", "user.login.success"), + ("Order payment completed", "order.pay.success"), + ("System crash (critical error)", "system.crash.critical"), + ("User login failed", "user.login.failed"), + ("Normal system log", "system.log.info") # Won't match any binding key, will be discarded ] for msg, routing_key in test_messages: await topic_publish(msg, routing_key) + await asyncio.sleep(0.5) + + # Wait for messages to be processed + await asyncio.sleep(3) + + # Cancel consumer + consumer_task.cancel() + print("✅ Topic exchange test completed successfully!") -async def test_multi_queue_balance(): +async def run_multi_queue_balance_test(): + """Run multi-queue load balancing test with producer and consumer""" + print("=== Running Multi-Queue Load Balancing Test ===") + queue_count = 3 + + # Start consumer in background + consumer_task = asyncio.create_task(start_balanced_consumers(queue_count=queue_count)) + + # Wait for consumer to start + await asyncio.sleep(1) + + # Setup and publish messages await setup_multi_queue_balance(queue_count=queue_count) - + producer = BalancedProducer(queue_count=queue_count) await producer.connect() + for i in range(10): - await producer.publish(f"任务{i + 1}:多队列负载均衡测试") + await producer.publish(f"Task {i + 1}: Multi-queue load balancing test") await asyncio.sleep(0.3) + await producer.close() + + # Wait for messages to be processed + await asyncio.sleep(3) + + # Cancel consumer + consumer_task.cancel() + print("✅ Multi-queue load balancing test completed successfully!") + + +async def main(): + """Main function to run all tests""" + print("🚀 Starting RabbitMQ Tests...") + + try: + # Run all tests + await run_fanout_test() + await asyncio.sleep(1) + + await run_direct_exchange_test() + await asyncio.sleep(1) + + await run_topic_exchange_test() + await asyncio.sleep(1) + + await run_multi_queue_balance_test() + + print("\n🎉 All tests completed successfully!") + + except Exception as e: + print(f"❌ Test failed: {e}") + sys.exit(1) + + +if __name__ == "__main__": + asyncio.run(main())