51 lines
2.0 KiB
Python
51 lines
2.0 KiB
Python
from infra.log.module_logger import ModuleLogger
|
|
from .async_client import AsyncMQClient
|
|
import aio_pika
|
|
import json
|
|
import asyncio
|
|
|
|
|
|
class AsyncMQPublisher(AsyncMQClient):
    """Asynchronous message publisher layered on ``AsyncMQClient``.

    The class uses ``bind()``, ``exchange`` and ``routing_key`` without
    defining them; they are presumably supplied by ``AsyncMQClient``
    (not visible here -- confirm against the base class).
    """

    def __init__(self, channel_name: str) -> None:
        """Set up the base client and a logger dedicated to this publisher.

        Args:
            channel_name: Passed through unchanged to
                ``AsyncMQClient.__init__`` as ``channel_name``.
        """
        super().__init__(channel_name=channel_name)
        self.module_logger = ModuleLogger(sender_id="AsyncMQPublisher")

    async def publish(self, message: str | object, max_retries: int = 3):
        """Publish ``message`` on the exchange, retrying on channel errors.

        A ``str`` message is sent as-is; any other object is serialised with
        ``json.dumps``. The text is UTF-8 encoded and sent with content type
        ``text/plain`` using ``self.routing_key``.

        Each time the publish raises
        ``aio_pika.exceptions.ChannelInvalidStateError`` the error is logged,
        the coroutine sleeps for two seconds, ``bind()`` is called again and
        the next attempt (if any remain) is made. Any other exception
        propagates to the caller.

        Args:
            message: Text or JSON-serialisable object to publish.
            max_retries: Maximum number of publish attempts.

        Returns:
            None. When every attempt fails, a ``ConnectionError`` is handed
            to the module logger but is not raised.
        """
        for attempt in range(1, max_retries + 1):
            try:
                if getattr(self, "exchange", None) is None:
                    # Ensure the exchange is bound before publishing
                    await self.bind()

                # Strings go out verbatim; everything else is JSON-encoded.
                text = message if isinstance(message, str) else json.dumps(message)
                await self.exchange.publish(
                    aio_pika.Message(
                        text.encode("utf-8"),
                        content_type="text/plain",
                    ),
                    self.routing_key,
                )
            except aio_pika.exceptions.ChannelInvalidStateError as err:
                await self.module_logger.log_exception(
                    exception=err,
                    text=f"Attempting reconnect and retry {attempt}/{max_retries} "
                    "for publish ran into ChannelInvalidStateError",
                )
                # Short delay before retrying
                await asyncio.sleep(2)
                # Rebind to re-establish connection/channel
                await self.bind()
            else:
                # Exit after successful publish
                return

        # Reached only when every attempt failed: log the final failure.
        await self.module_logger.log_exception(
            exception=ConnectionError("Unable to publish after max retries."),
            text=f"Publish failed after {max_retries} retries.",
        )
|