diff --git a/apps/notification/webapi/utils/email_consumer.py b/apps/notification/webapi/utils/email_consumer.py index 0473ddc..18893eb 100644 --- a/apps/notification/webapi/utils/email_consumer.py +++ b/apps/notification/webapi/utils/email_consumer.py @@ -1,13 +1,126 @@ from common.config.app_settings import app_settings from backend.infra.rabbitmq.async_subscriber import AsyncMQSubscriber from backend.infra.email_handler import EmailHandler +from common.log.module_logger import ModuleLogger class EmailMQConsumer: @staticmethod async def mail_handler(register_key: str, message: dict, args: dict): + """Handle both regular email and multi-tenant email messages""" + try: + module_logger = ModuleLogger(sender_id="EmailMQConsumer") + + # examin whether it is a multi-tenant email message + if "tenant_id" in message and "template_id" in message: + await module_logger.log_info( + "Processing tenant email message", + properties={ + "template_id": message.get("template_id"), + "tenant_id": message.get("tenant_id"), + "destination_count": len(message.get("destination_emails", [])) + } + ) + + # call the tenant email handler + return await EmailMQConsumer.tenant_mail_handler(register_key, message, args) + else: + # this is a regular email message + await module_logger.log_info( + "Processing regular email message", + properties={ + "receiver_id": message.get("receiver_id"), + "subject": message.get("subject") + } + ) + + return await EmailMQConsumer.regular_mail_handler(register_key, message, args) + + except Exception as e: + module_logger = ModuleLogger(sender_id="EmailMQConsumer") + await module_logger.log_error( + "Failed to process email message", + properties={ + "message_type": "tenant" if "tenant_id" in message else "regular", + "error": str(e) + } + ) + raise + + @staticmethod + async def regular_mail_handler(register_key: str, message: dict, args: dict): + """Handle regular email messages""" return await EmailHandler().send_email(message) + @staticmethod + async def tenant_mail_handler(register_key: str, message: dict, args: dict): + """Handle tenant email messages""" + try: + tenant_id = message.get("tenant_id") + template_id = message.get("template_id") + destination_emails = message.get("destination_emails", []) + sender_emails = message.get("sender_emails", []) + subject_properties = message.get("subject_properties", {}) + body_properties = message.get("body_properties", {}) + + email_handler = EmailHandler() + email_ids = [] + + for recipient_email in destination_emails: + try: + email_id = await email_handler.send_tenant_email( + tenant_id=tenant_id, + template_id=template_id, + recipient_email=recipient_email, + sender_emails=sender_emails, + subject_properties=subject_properties, + body_properties=body_properties + ) + + email_ids.append(email_id) + + module_logger = ModuleLogger(sender_id="EmailMQConsumer") + await module_logger.log_info( + f"Tenant email sent to {recipient_email}", + properties={ + "tenant_id": tenant_id, + "template_id": template_id, + "email_id": email_id, + "recipient_email": recipient_email + } + ) + + except Exception as e: + module_logger = ModuleLogger(sender_id="EmailMQConsumer") + await module_logger.log_error( + f"Failed to send tenant email to {recipient_email}", + properties={ + "tenant_id": tenant_id, + "template_id": template_id, + "recipient_email": recipient_email, + "error": str(e) + } + ) + + return { + "email_ids": email_ids, + "status": "processed" if email_ids else "failed", + "tenant_id": tenant_id, + "template_id": template_id + } + + except Exception as e: + module_logger = ModuleLogger(sender_id="EmailMQConsumer") + await module_logger.log_error( + "Failed to process tenant email", + properties={ + "tenant_id": message.get("tenant_id"), + "template_id": message.get("template_id"), + "error": str(e) + } + ) + raise + def __init__(self, mq_client: AsyncMQSubscriber) -> None: self.sender_id = app_settings.EMAIL_FROM self.email_handler = EmailHandler()