rabbitmq-test/product/fanout_publish.py
2025-09-07 10:35:24 +08:00

65 lines
1.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import aio_pika
from config import RABBITMQ_URI
async def setup_fanout_exchange(exchange_name="demo.fanout", queue_name_prefix="demo.fanout.queue-"):
# 建立连接
connection = await aio_pika.connect_robust(
RABBITMQ_URI
)
channel = await connection.channel()
# 1. 声明 Fanout 类型交换器,不存在就创建,存在就复用
fanout_exchange = await channel.declare_exchange(
exchange_name,
aio_pika.ExchangeType.FANOUT,
durable=True # 交换器持久化
)
# 2. 定义需要绑定的队列名称列表
queue_names = [queue_name_prefix + str(i) for i in range(3)]
# 3. 循环创建队列并绑定到交换器
for name in queue_names:
# 声明队列(持久化),不存在就创建,存在就复用
queue = await channel.declare_queue(
name,
durable=True,
auto_delete=False
)
# 绑定队列到 Fanout 交换器(忽略路由键)
await queue.bind(fanout_exchange, routing_key="")
async def fanout_publish(message: str = "", exchange_name: str = "demo.fanout"):
# 建立与RabbitMQ的连接
connection = await aio_pika.connect_robust(
RABBITMQ_URI
)
# 创建通道
channel = await connection.channel()
# 声明一个Fanout类型的交换器
fanout_exchange = await channel.declare_exchange(
exchange_name, # 交换器名称
aio_pika.ExchangeType.FANOUT, # 交换器类型为FANOUT
durable=True # 持久化交换器
)
# 构建消息对象
message = aio_pika.Message(
body=message.encode("utf-8"),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT # 消息持久化
)
# 发送消息到Fanout交换器
# Fanout类型不需要需要指定routing_key即使指定也会被忽略
await fanout_exchange.publish(
message,
routing_key="" # 路由键为空
)
await connection.close()