freeleaps-service-hub/apps/notification/webapi/utils/email_consumer.py

138 lines
5.7 KiB
Python

from common.config.app_settings import app_settings
from backend.infra.rabbitmq.async_subscriber import AsyncMQSubscriber
from backend.infra.email_handler import EmailHandler
from common.log.module_logger import ModuleLogger
class EmailMQConsumer:
@staticmethod
async def mail_handler(register_key: str, message: dict, args: dict):
"""Handle both regular email and multi-tenant email messages"""
try:
module_logger = ModuleLogger(sender_id="EmailMQConsumer")
# examin whether it is a multi-tenant email message
if "tenant_id" in message and "template_id" in message:
await module_logger.log_info(
"Processing tenant email message",
properties={
"template_id": message.get("template_id"),
"tenant_id": message.get("tenant_id"),
"destination_count": len(message.get("destination_emails", []))
}
)
# call the tenant email handler
return await EmailMQConsumer.tenant_mail_handler(register_key, message, args)
else:
# this is a regular email message
await module_logger.log_info(
"Processing regular email message",
properties={
"receiver_id": message.get("receiver_id"),
"subject": message.get("subject")
}
)
return await EmailMQConsumer.regular_mail_handler(register_key, message, args)
except Exception as e:
module_logger = ModuleLogger(sender_id="EmailMQConsumer")
await module_logger.log_error(
"Failed to process email message",
properties={
"message_type": "tenant" if "tenant_id" in message else "regular",
"error": str(e)
}
)
raise
@staticmethod
async def regular_mail_handler(register_key: str, message: dict, args: dict):
"""Handle regular email messages"""
return await EmailHandler().send_email(message)
@staticmethod
async def tenant_mail_handler(register_key: str, message: dict, args: dict):
"""Handle tenant email messages"""
try:
tenant_id = message.get("tenant_id")
template_id = message.get("template_id")
destination_emails = message.get("destination_emails", [])
sender_emails = message.get("sender_emails", [])
subject_properties = message.get("subject_properties", {})
body_properties = message.get("body_properties", {})
email_handler = EmailHandler()
email_ids = []
for recipient_email in destination_emails:
try:
email_id = await email_handler.send_tenant_email(
tenant_id=tenant_id,
template_id=template_id,
recipient_email=recipient_email,
sender_emails=sender_emails,
subject_properties=subject_properties,
body_properties=body_properties
)
email_ids.append(email_id)
module_logger = ModuleLogger(sender_id="EmailMQConsumer")
await module_logger.log_info(
f"Tenant email sent to {recipient_email}",
properties={
"tenant_id": tenant_id,
"template_id": template_id,
"email_id": email_id,
"recipient_email": recipient_email
}
)
except Exception as e:
module_logger = ModuleLogger(sender_id="EmailMQConsumer")
await module_logger.log_error(
f"Failed to send tenant email to {recipient_email}",
properties={
"tenant_id": tenant_id,
"template_id": template_id,
"recipient_email": recipient_email,
"error": str(e)
}
)
return {
"email_ids": email_ids,
"status": "processed" if email_ids else "failed",
"tenant_id": tenant_id,
"template_id": template_id
}
except Exception as e:
module_logger = ModuleLogger(sender_id="EmailMQConsumer")
await module_logger.log_error(
"Failed to process tenant email",
properties={
"tenant_id": message.get("tenant_id"),
"template_id": message.get("template_id"),
"error": str(e)
}
)
raise
def __init__(self, mq_client: AsyncMQSubscriber) -> None:
self.sender_id = app_settings.EMAIL_FROM
self.email_handler = EmailHandler()
self.mq_client = mq_client
async def register_consumer(self):
await self.mq_client.register_consumer(
registry_key=app_settings.EMAIL_FROM,
callback_method=EmailMQConsumer.mail_handler,
args={"email_handler": self},
)
async def unregister_consumer(self):
await self.mq_client.unregister_consumer(registry_key=self.sender_id)