"""RabbitMQ configuration module.

Settings are read from environment variables when a ``RabbitMQConfig`` is
constructed; each setting falls back to a built-in default when its
variable is not set.  A shared module-level instance is exposed as
``config``.
"""

import os
from typing import Dict, Any


def _int_from_env(name: str, default: int) -> int:
    """Return the integer value of environment variable *name*.

    Args:
        name: Name of the environment variable to read.
        default: Value returned when the variable is not set.

    Returns:
        The parsed integer, or *default* when the variable is absent.

    Raises:
        ValueError: If the variable is set but is not a valid integer
            (this includes an empty string).  The message names the
            variable so a misconfiguration is easy to locate.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError as err:
        raise ValueError(
            "Environment variable {} must be an integer, got {!r}".format(
                name, raw
            )
        ) from err


class RabbitMQConfig:
    """RabbitMQ settings loaded from environment variables.

    Environment variables (with defaults):
        RABBITMQ_URI             amqp://guest:guest@localhost:5673/
        RABBITMQ_EXCHANGE        reliable.exchange
        RABBITMQ_QUEUE           reliable.queue
        RABBITMQ_DL_EXCHANGE     dead_letter_exchange
        RABBITMQ_DL_QUEUE        dead_letter_queue
        RABBITMQ_DL_ROUTING_KEY  dead_letter
        RABBITMQ_MAX_RETRIES     3
        RABBITMQ_MESSAGE_TTL     300000
        RABBITMQ_PREFETCH_COUNT  1
        RABBITMQ_LOG_LEVEL       INFO
    """

    def __init__(self):
        """Read every setting from the environment, or use its default.

        Raises:
            ValueError: If one of the integer settings is set to a value
                that cannot be parsed as an integer.
        """
        # Connection and topology names.
        self.uri = os.getenv(
            'RABBITMQ_URI', 'amqp://guest:guest@localhost:5673/'
        )
        self.exchange_name = os.getenv(
            'RABBITMQ_EXCHANGE', 'reliable.exchange'
        )
        self.queue_name = os.getenv('RABBITMQ_QUEUE', 'reliable.queue')
        self.dead_letter_exchange = os.getenv(
            'RABBITMQ_DL_EXCHANGE', 'dead_letter_exchange'
        )
        self.dead_letter_queue = os.getenv(
            'RABBITMQ_DL_QUEUE', 'dead_letter_queue'
        )
        # Previously a literal inside get_dead_letter_config(); now
        # overridable like the other names, with the same default.
        self.dead_letter_routing_key = os.getenv(
            'RABBITMQ_DL_ROUTING_KEY', 'dead_letter'
        )

        # Message handling settings.
        self.max_retries = _int_from_env('RABBITMQ_MAX_RETRIES', 3)
        # Default of 300000 corresponds to 5 minutes (milliseconds).
        self.message_ttl = _int_from_env('RABBITMQ_MESSAGE_TTL', 300000)
        self.prefetch_count = _int_from_env('RABBITMQ_PREFETCH_COUNT', 1)

        # Logging settings.
        self.log_level = os.getenv('RABBITMQ_LOG_LEVEL', 'INFO')

    def get_connection_config(self) -> Dict[str, Any]:
        """Return the connection settings.

        Returns:
            A dict with the keys ``uri`` and ``prefetch_count``.
        """
        return {
            'uri': self.uri,
            'prefetch_count': self.prefetch_count,
        }

    def get_exchange_config(self) -> Dict[str, Any]:
        """Return the exchange settings.

        Returns:
            A dict with the keys ``exchange_name`` and
            ``dead_letter_exchange``.
        """
        return {
            'exchange_name': self.exchange_name,
            'dead_letter_exchange': self.dead_letter_exchange,
        }

    def get_queue_config(self) -> Dict[str, Any]:
        """Return the queue settings.

        Returns:
            A dict with the keys ``queue_name``, ``dead_letter_queue``,
            ``message_ttl`` and ``max_retries``.
        """
        return {
            'queue_name': self.queue_name,
            'dead_letter_queue': self.dead_letter_queue,
            'message_ttl': self.message_ttl,
            'max_retries': self.max_retries,
        }

    def get_dead_letter_config(self) -> Dict[str, Any]:
        """Return the dead-letter settings.

        Returns:
            A dict with the keys ``dead_letter_exchange``,
            ``dead_letter_queue`` and ``dead_letter_routing_key``.
        """
        return {
            'dead_letter_exchange': self.dead_letter_exchange,
            'dead_letter_queue': self.dead_letter_queue,
            'dead_letter_routing_key': self.dead_letter_routing_key,
        }


# Shared module-level configuration instance, built at import time.
config = RabbitMQConfig()