140 lines
6.0 KiB
Python
140 lines
6.0 KiB
Python
from common.config.app_settings import app_settings
|
|
from backend.infra.rabbitmq.async_subscriber import AsyncMQSubscriber
|
|
from backend.infra.email_handler import EmailHandler
|
|
from common.log.module_logger import ModuleLogger
|
|
|
|
|
|
class EmailMQConsumer:
    """Message-queue consumer that routes incoming email messages to ``EmailHandler``.

    ``mail_handler`` is the callback registered with the MQ client. It inspects
    each message and forwards it to ``tenant_mail_handler`` or
    ``regular_mail_handler``. All three handlers are static methods, so they do
    not depend on instance state.
    """

    # Sender id given to every ModuleLogger created by this class.
    _LOGGER_SENDER_ID = "EmailMQConsumer"

    @staticmethod
    async def mail_handler(register_key: str, message: dict, args: dict):
        """Handle both regular email and multi-tenant email messages.

        A message is routed to the tenant handler when it has a top-level
        ``tenant_id`` key and its ``properties`` value contains
        ``template_id``. Every other message goes to the regular handler.

        Args:
            register_key: Forwarded unchanged to the selected handler.
            message: Message payload to route.
            args: Forwarded unchanged to the selected handler.

        Returns:
            Whatever the selected handler returns.

        Raises:
            Exception: Any error raised while routing or handling is logged
                and then re-raised unchanged.
        """
        module_logger = ModuleLogger(sender_id=EmailMQConsumer._LOGGER_SENDER_ID)
        try:
            properties = message.get("properties", {})

            # Examine whether it is a multi-tenant email message.
            if "tenant_id" in message and "template_id" in properties:
                await module_logger.log_info(
                    "Processing tenant email message",
                    properties={
                        "template_id": properties.get("template_id"),
                        "tenant_id": message.get("tenant_id"),
                        "destination_count": len(properties.get("destination_emails", [])),
                    },
                )
                # Call the tenant email handler.
                return await EmailMQConsumer.tenant_mail_handler(register_key, message, args)

            # Anything else is a regular email message.
            await module_logger.log_info(
                "Processing regular email message",
                properties={
                    "receiver_id": message.get("receiver_id"),
                    "subject": message.get("subject"),
                },
            )
            return await EmailMQConsumer.regular_mail_handler(register_key, message, args)

        except Exception as e:
            await module_logger.log_error(
                "Failed to process email message",
                properties={
                    "message_type": "tenant" if "tenant_id" in message else "regular",
                    "error": str(e),
                },
            )
            raise

    @staticmethod
    async def regular_mail_handler(register_key: str, message: dict, args: dict):
        """Handle regular email messages.

        Args:
            register_key: Not used by this handler.
            message: Message payload, passed as-is to ``EmailHandler.send_email``.
            args: Not used by this handler.

        Returns:
            The result of ``EmailHandler().send_email(message)``.
        """
        return await EmailHandler().send_email(message)

    @staticmethod
    async def tenant_mail_handler(register_key: str, message: dict, args: dict):
        """Handle tenant email messages.

        Sends one email per entry in ``properties["destination_emails"]``,
        using the rendered subject (``content_subject``) and body
        (``content_text``) carried in the message. A failure for one recipient
        is logged and does not stop the remaining recipients.

        Args:
            register_key: Not used by this handler.
            message: Message payload; tenant fields are read from its
                ``properties`` value.
            args: Not used by this handler.

        Returns:
            A dict with ``email_ids`` (ids returned by the successful sends),
            ``status`` (``"processed"`` if at least one send succeeded,
            otherwise ``"failed"``), ``tenant_id`` and ``template_id``.

        Raises:
            Exception: Errors raised outside the per-recipient send are logged
                and then re-raised unchanged.
        """
        module_logger = ModuleLogger(sender_id=EmailMQConsumer._LOGGER_SENDER_ID)
        try:
            # Get fields from message structure.
            properties = message.get("properties", {})
            tenant_id = message.get("tenant_id") or properties.get("tenant_id")
            template_id = properties.get("template_id")
            destination_emails = properties.get("destination_emails", [])
            sender_emails = properties.get("sender_emails", [])
            # Use rendered content instead of template properties.
            subject = properties.get("content_subject", "No Subject")
            html_content = properties.get("content_text", "")

            email_handler = EmailHandler()
            email_ids = []

            for recipient_email in destination_emails:
                try:
                    email_id = await email_handler.send_tenant_email(
                        tenant_id=tenant_id,
                        template_id=template_id,
                        recipient_email=recipient_email,
                        sender_emails=sender_emails,
                        subject_properties={"subject": subject},
                        body_properties={"html_content": html_content},
                    )
                    email_ids.append(email_id)

                    await module_logger.log_info(
                        f"Tenant email sent to {recipient_email}",
                        properties={
                            "tenant_id": tenant_id,
                            "template_id": template_id,
                            "email_id": email_id,
                            "recipient_email": recipient_email,
                        },
                    )
                except Exception as e:
                    # Best effort: record the failure and continue with the
                    # next recipient instead of aborting the whole batch.
                    await module_logger.log_error(
                        f"Failed to send tenant email to {recipient_email}",
                        properties={
                            "tenant_id": tenant_id,
                            "template_id": template_id,
                            "recipient_email": recipient_email,
                            "error": str(e),
                        },
                    )

            return {
                "email_ids": email_ids,
                "status": "processed" if email_ids else "failed",
                "tenant_id": tenant_id,
                "template_id": template_id,
            }

        except Exception as e:
            # template_id lives under "properties", not at the top level of the
            # message. Read it defensively because a malformed "properties"
            # value may be what raised in the first place.
            raw_properties = message.get("properties")
            failed_template_id = (
                raw_properties.get("template_id") if isinstance(raw_properties, dict) else None
            )
            await module_logger.log_error(
                "Failed to process tenant email",
                properties={
                    "tenant_id": message.get("tenant_id"),
                    "template_id": failed_template_id,
                    "error": str(e),
                },
            )
            raise

    def __init__(self, mq_client: AsyncMQSubscriber) -> None:
        """Store the MQ client and the registry key used for (un)registration.

        Args:
            mq_client: Subscriber used to register and unregister the consumer.
        """
        # Registry key for this consumer, captured once from the settings.
        self.sender_id = app_settings.EMAIL_FROM
        # NOTE(review): the static handlers build their own EmailHandler and do
        # not read this attribute within this class.
        self.email_handler = EmailHandler()
        self.mq_client = mq_client

    async def register_consumer(self):
        """Register ``mail_handler`` with the MQ client under ``self.sender_id``."""
        await self.mq_client.register_consumer(
            # Same key that unregister_consumer uses, so the pair stays symmetric.
            registry_key=self.sender_id,
            callback_method=EmailMQConsumer.mail_handler,
            # NOTE(review): the value is this consumer instance, not an
            # EmailHandler; the handlers in this class do not read ``args``.
            args={"email_handler": self},
        )

    async def unregister_consumer(self):
        """Unregister the consumer that was registered under ``self.sender_id``."""
        await self.mq_client.unregister_consumer(registry_key=self.sender_id)
|