freeleaps-service-hub/app/notification/backend/infra/rabbitmq/async_publisher.py

51 lines
2.0 KiB
Python

from infra.log.module_logger import ModuleLogger
from .async_client import AsyncMQClient
import aio_pika
import json
import asyncio
class AsyncMQPublisher(AsyncMQClient):
    """Asynchronous message publisher built on top of ``AsyncMQClient``.

    The publisher relies on the parent class for ``bind()``, ``exchange`` and
    ``routing_key``.
    NOTE(review): those members are not defined in this class; presumably
    ``bind()`` (re)creates the connection/channel and populates
    ``self.exchange`` -- confirm against ``AsyncMQClient``.
    """

    def __init__(self, channel_name: str) -> None:
        """Initialise the parent client and a logger tagged for this class.

        Args:
            channel_name: Forwarded unchanged to ``AsyncMQClient.__init__``.
        """
        super().__init__(channel_name=channel_name)
        self.module_logger = ModuleLogger(sender_id="AsyncMQPublisher")

    async def publish(
        self,
        message: str | object,
        max_retries: int = 3,
        retry_delay: float = 2,
    ) -> bool:
        """Publish ``message`` to the bound exchange, retrying on channel errors.

        Strings are sent as-is; any other object is serialised with
        ``json.dumps``. The payload is UTF-8 encoded and published with
        ``content_type="text/plain"`` under ``self.routing_key``.

        Args:
            message: Text to send, or a JSON-serialisable object.
            max_retries: Maximum number of publish attempts. With a value
                of 0 or less no attempt is made and the failure is logged.
            retry_delay: Seconds to wait after a failed attempt before
                rebinding. Defaults to the previously hard-coded 2 seconds.

        Returns:
            ``True`` once the message has been published, ``False`` when every
            attempt failed with ``ChannelInvalidStateError``. Failure is
            reported through the return value and the log rather than an
            exception, so existing best-effort callers keep working.

        Raises:
            TypeError: If ``message`` is not a ``str`` and ``json.dumps``
                cannot serialise it.

        Any exception other than ``ChannelInvalidStateError`` raised while
        publishing, and any exception raised by ``bind()``, is not handled
        here and propagates to the caller.
        """
        # The payload does not change between attempts, so serialise and
        # encode it once instead of on every retry.
        payload = message if isinstance(message, str) else json.dumps(message)
        body = bytes(payload, "utf-8")

        for attempt in range(1, max_retries + 1):
            try:
                if getattr(self, "exchange", None) is None:
                    # Ensure the exchange is bound before publishing
                    await self.bind()
                await self.exchange.publish(
                    aio_pika.Message(body, content_type="text/plain"),
                    self.routing_key,
                )
                return True  # Exit after successful publish
            except aio_pika.exceptions.ChannelInvalidStateError as e:
                await self.module_logger.log_exception(
                    exception=e,
                    text=f"Attempting reconnect and retry {attempt}/{max_retries} "
                    "for publish ran into ChannelInvalidStateError",
                )
                # Short delay before rebinding. This also runs after the final
                # failed attempt, so a rebind is attempted before the method
                # gives up.
                await asyncio.sleep(retry_delay)
                await self.bind()  # Rebind to re-establish connection/channel

        # Log final failure if retries are exhausted
        await self.module_logger.log_exception(
            exception=ConnectionError("Unable to publish after max retries."),
            text=f"Publish failed after {max_retries} retries.",
        )
        return False