#!/usr/bin/env python3
"""
Run Topic Exchange Test

Starts the topic consumers in the background, declares the topic exchange,
publishes a fixed set of sample messages with different routing keys, then
shuts the consumers down again.
"""

import asyncio
import sys
import os

# Add current directory to Python path so the sibling packages below resolve
# when this file is executed directly as a script.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# These imports must come after the sys.path tweak above.
# NOTE(review): the package name "comsumer" is kept exactly as written; it has
# to match the package name on disk -- confirm before "fixing" the spelling.
from product.topic_publish import setup_topic_exchange, topic_publish
from comsumer.topic_consumer import start_all_topic_consumers


async def _stop_consumer(task):
    """Cancel *task*, wait for it to finish, and report how it ended.

    Returns the exception the task failed with, or ``None`` if it finished
    normally or was stopped by the cancellation requested here.
    """
    task.cancel()
    # return_exceptions=True hands the task's outcome back as a value instead
    # of raising it here, so this helper is safe to call from a ``finally``.
    (outcome,) = await asyncio.gather(task, return_exceptions=True)
    if isinstance(outcome, asyncio.CancelledError):
        return None
    if isinstance(outcome, BaseException):
        return outcome
    return None


async def run_topic_exchange_test():
    """Run topic exchange test with producer and consumer.

    Raises:
        Exception: whatever the exchange setup or publish calls raise, or the
            error the background consumer task failed with. In both cases the
            consumer task is cancelled and awaited before the error propagates.
    """
    print("=== Running Topic Exchange Test ===")

    # Start consumer in background
    consumer_task = asyncio.create_task(start_all_topic_consumers())
    consumer_error = None

    try:
        # Wait for consumer to start
        await asyncio.sleep(1)

        # Setup exchange and publish messages
        await setup_topic_exchange()

        test_messages = [
            ("Order creation failed (critical error)", "order.create.critical"),
            ("User login successful", "user.login.success"),
            ("Order payment completed", "order.pay.success"),
            ("System crash (critical error)", "system.crash.critical"),
            ("User login failed", "user.login.failed"),
            # Won't match any binding key, will be discarded
            ("Normal system log", "system.log.info"),
        ]

        for msg, routing_key in test_messages:
            await topic_publish(msg, routing_key)
            await asyncio.sleep(0.5)

        # Wait for messages to be processed
        await asyncio.sleep(3)
    finally:
        # Always stop the consumer, even when publishing failed, and wait for
        # the cancellation to be processed so the task is not left pending.
        consumer_error = await _stop_consumer(consumer_task)

    # A consumer that crashed on its own means the test did not really pass.
    if consumer_error is not None:
        raise consumer_error

    print("✅ Topic exchange test completed successfully!")


if __name__ == "__main__":
    asyncio.run(run_topic_exchange_test())