feat(email_comsumer): introduce multi-tenant delivery system
This commit is contained in:
parent
d164b6f567
commit
4187c95743
@ -1,13 +1,126 @@
|
|||||||
from common.config.app_settings import app_settings
|
from common.config.app_settings import app_settings
|
||||||
from backend.infra.rabbitmq.async_subscriber import AsyncMQSubscriber
|
from backend.infra.rabbitmq.async_subscriber import AsyncMQSubscriber
|
||||||
from backend.infra.email_handler import EmailHandler
|
from backend.infra.email_handler import EmailHandler
|
||||||
|
from common.log.module_logger import ModuleLogger
|
||||||
|
|
||||||
|
|
||||||
class EmailMQConsumer:
|
class EmailMQConsumer:
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def mail_handler(register_key: str, message: dict, args: dict):
|
async def mail_handler(register_key: str, message: dict, args: dict):
|
||||||
|
"""Handle both regular email and multi-tenant email messages"""
|
||||||
|
try:
|
||||||
|
module_logger = ModuleLogger(sender_id="EmailMQConsumer")
|
||||||
|
|
||||||
|
# examin whether it is a multi-tenant email message
|
||||||
|
if "tenant_id" in message and "template_id" in message:
|
||||||
|
await module_logger.log_info(
|
||||||
|
"Processing tenant email message",
|
||||||
|
properties={
|
||||||
|
"template_id": message.get("template_id"),
|
||||||
|
"tenant_id": message.get("tenant_id"),
|
||||||
|
"destination_count": len(message.get("destination_emails", []))
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# call the tenant email handler
|
||||||
|
return await EmailMQConsumer.tenant_mail_handler(register_key, message, args)
|
||||||
|
else:
|
||||||
|
# this is a regular email message
|
||||||
|
await module_logger.log_info(
|
||||||
|
"Processing regular email message",
|
||||||
|
properties={
|
||||||
|
"receiver_id": message.get("receiver_id"),
|
||||||
|
"subject": message.get("subject")
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return await EmailMQConsumer.regular_mail_handler(register_key, message, args)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
module_logger = ModuleLogger(sender_id="EmailMQConsumer")
|
||||||
|
await module_logger.log_error(
|
||||||
|
"Failed to process email message",
|
||||||
|
properties={
|
||||||
|
"message_type": "tenant" if "tenant_id" in message else "regular",
|
||||||
|
"error": str(e)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
async def regular_mail_handler(register_key: str, message: dict, args: dict):
|
||||||
|
"""Handle regular email messages"""
|
||||||
return await EmailHandler().send_email(message)
|
return await EmailHandler().send_email(message)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
async def tenant_mail_handler(register_key: str, message: dict, args: dict):
|
||||||
|
"""Handle tenant email messages"""
|
||||||
|
try:
|
||||||
|
tenant_id = message.get("tenant_id")
|
||||||
|
template_id = message.get("template_id")
|
||||||
|
destination_emails = message.get("destination_emails", [])
|
||||||
|
sender_emails = message.get("sender_emails", [])
|
||||||
|
subject_properties = message.get("subject_properties", {})
|
||||||
|
body_properties = message.get("body_properties", {})
|
||||||
|
|
||||||
|
email_handler = EmailHandler()
|
||||||
|
email_ids = []
|
||||||
|
|
||||||
|
for recipient_email in destination_emails:
|
||||||
|
try:
|
||||||
|
email_id = await email_handler.send_tenant_email(
|
||||||
|
tenant_id=tenant_id,
|
||||||
|
template_id=template_id,
|
||||||
|
recipient_email=recipient_email,
|
||||||
|
sender_emails=sender_emails,
|
||||||
|
subject_properties=subject_properties,
|
||||||
|
body_properties=body_properties
|
||||||
|
)
|
||||||
|
|
||||||
|
email_ids.append(email_id)
|
||||||
|
|
||||||
|
module_logger = ModuleLogger(sender_id="EmailMQConsumer")
|
||||||
|
await module_logger.log_info(
|
||||||
|
f"Tenant email sent to {recipient_email}",
|
||||||
|
properties={
|
||||||
|
"tenant_id": tenant_id,
|
||||||
|
"template_id": template_id,
|
||||||
|
"email_id": email_id,
|
||||||
|
"recipient_email": recipient_email
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
module_logger = ModuleLogger(sender_id="EmailMQConsumer")
|
||||||
|
await module_logger.log_error(
|
||||||
|
f"Failed to send tenant email to {recipient_email}",
|
||||||
|
properties={
|
||||||
|
"tenant_id": tenant_id,
|
||||||
|
"template_id": template_id,
|
||||||
|
"recipient_email": recipient_email,
|
||||||
|
"error": str(e)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"email_ids": email_ids,
|
||||||
|
"status": "processed" if email_ids else "failed",
|
||||||
|
"tenant_id": tenant_id,
|
||||||
|
"template_id": template_id
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
module_logger = ModuleLogger(sender_id="EmailMQConsumer")
|
||||||
|
await module_logger.log_error(
|
||||||
|
"Failed to process tenant email",
|
||||||
|
properties={
|
||||||
|
"tenant_id": message.get("tenant_id"),
|
||||||
|
"template_id": message.get("template_id"),
|
||||||
|
"error": str(e)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
def __init__(self, mq_client: AsyncMQSubscriber) -> None:
|
def __init__(self, mq_client: AsyncMQSubscriber) -> None:
|
||||||
self.sender_id = app_settings.EMAIL_FROM
|
self.sender_id = app_settings.EMAIL_FROM
|
||||||
self.email_handler = EmailHandler()
|
self.email_handler = EmailHandler()
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user