65 lines
1.9 KiB
Python
65 lines
1.9 KiB
Python
import asyncio
|
||
import aio_pika
|
||
|
||
from config import RABBITMQ_URI
|
||
|
||
|
||
async def setup_fanout_exchange(exchange_name="demo.fanout", queue_name_prefix="demo.fanout.queue-"):
|
||
# 建立连接
|
||
connection = await aio_pika.connect_robust(
|
||
RABBITMQ_URI
|
||
)
|
||
channel = await connection.channel()
|
||
|
||
# 1. 声明 Fanout 类型交换器,不存在就创建,存在就复用
|
||
fanout_exchange = await channel.declare_exchange(
|
||
exchange_name,
|
||
aio_pika.ExchangeType.FANOUT,
|
||
durable=True # 交换器持久化
|
||
)
|
||
|
||
# 2. 定义需要绑定的队列名称列表
|
||
queue_names = [queue_name_prefix + str(i) for i in range(3)]
|
||
|
||
# 3. 循环创建队列并绑定到交换器
|
||
for name in queue_names:
|
||
# 声明队列(持久化),不存在就创建,存在就复用
|
||
queue = await channel.declare_queue(
|
||
name,
|
||
durable=True,
|
||
auto_delete=False
|
||
)
|
||
# 绑定队列到 Fanout 交换器(忽略路由键)
|
||
await queue.bind(fanout_exchange, routing_key="")
|
||
|
||
|
||
async def fanout_publish(message: str = "", exchange_name: str = "demo.fanout"):
|
||
# 建立与RabbitMQ的连接
|
||
connection = await aio_pika.connect_robust(
|
||
RABBITMQ_URI
|
||
)
|
||
|
||
# 创建通道
|
||
channel = await connection.channel()
|
||
|
||
# 声明一个Fanout类型的交换器
|
||
fanout_exchange = await channel.declare_exchange(
|
||
exchange_name, # 交换器名称
|
||
aio_pika.ExchangeType.FANOUT, # 交换器类型为FANOUT
|
||
durable=True # 持久化交换器
|
||
)
|
||
|
||
# 构建消息对象
|
||
message = aio_pika.Message(
|
||
body=message.encode("utf-8"),
|
||
delivery_mode=aio_pika.DeliveryMode.PERSISTENT # 消息持久化
|
||
)
|
||
|
||
# 发送消息到Fanout交换器
|
||
# Fanout类型不需要需要指定routing_key,即使指定也会被忽略
|
||
await fanout_exchange.publish(
|
||
message,
|
||
routing_key="" # 路由键为空
|
||
)
|
||
await connection.close()
|