rabbitmq-test/run_fanout_test.py
2025-09-07 10:55:19 +08:00

43 lines
1.2 KiB
Python

#!/usr/bin/env python3
"""
Run Fanout Exchange Test
"""
import asyncio
import sys
import os
# Add this script's own directory (not the current working directory) to the import path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from product.fanout_publish import fanout_publish, setup_fanout_exchange
from comsumer.fanout_consumer import start_all_fanout_consumers
async def run_fanout_test():
    """Run the fanout exchange demo: start consumers, publish, then stop.

    Steps, in order:

    1. Schedule ``start_all_fanout_consumers()`` as a background task.
    2. Sleep one second before publishing. This is a fixed delay; no
       readiness signal from the consumers is visible in this file.
    3. Call ``setup_fanout_exchange("demo.fanout", "demo.fanout.queue-")``
       (the second argument is presumably a queue-name prefix -- confirm
       against ``product.fanout_publish``).
    4. Publish three messages to the ``demo.fanout`` exchange.
    5. Sleep three seconds so the consumers have time to handle them.
    6. Cancel the consumer task and wait for it to finish.

    The consumer task is cancelled and awaited even when setup or
    publishing raises, so it does not outlive this coroutine.

    Raises:
        Whatever ``setup_fanout_exchange``, ``fanout_publish`` or the
        consumer task itself raised (other than the expected cancellation).
    """
    print("=== Running Fanout Exchange Test ===")

    # Start consumer in background
    consumer_task = asyncio.create_task(start_all_fanout_consumers())
    try:
        # Wait for consumer to start
        await asyncio.sleep(1)

        # Setup and publish messages
        await setup_fanout_exchange("demo.fanout", "demo.fanout.queue-")
        for message in ("hello world", "test message 2", "test message 3"):
            await fanout_publish(message=message, exchange_name="demo.fanout")

        # Wait for messages to be processed
        await asyncio.sleep(3)
    finally:
        # Always stop the consumer, including when publishing failed.
        consumer_task.cancel()
        try:
            # cancel() only *requests* cancellation; awaiting the task lets it
            # actually unwind before we report the outcome, and surfaces any
            # error the consumer hit earlier instead of leaving it unretrieved.
            await consumer_task
        except asyncio.CancelledError:
            # Expected when the consumer task was cancelled by us. If the task
            # is not in the cancelled state, the CancelledError was aimed at
            # this coroutine instead and must keep propagating.
            if not consumer_task.cancelled():
                raise

    print("✅ Fanout test completed successfully!")
if __name__ == "__main__":
    # Script entry point: drive the fanout test coroutine on a fresh event loop.
    test_coroutine = run_fanout_test()
    asyncio.run(test_coroutine)