freeleaps-service-hub/apps/notification/backend/business/tenant_notification_manager.py

214 lines
9.2 KiB
Python

from typing import Dict, List, Optional
from datetime import datetime
from common.log.module_logger import ModuleLogger
from common.exception.exceptions import InvalidDataError
from backend.business.notification_manager import NotificationManager
from backend.models.constants import NotificationChannel, NotificationMessage
from backend.services.email.email_validation_service import EmailValidationService
from backend.services.email.email_bounce_service import EmailBounceService
from backend.services.email.email_spam_protection_service import EmailSpamProtectionService
from backend.services.notification_publisher_service import NotificationPublisherService
from backend.services.email.email_status_service import EmailStatusService
class TenantNotificationManager:
def __init__(self):
self.notification_manager = NotificationManager()
self.email_publisher = NotificationPublisherService(channel=NotificationChannel.EMAIL)
self.email_validation_service = EmailValidationService()
self.email_bounce_service = EmailBounceService()
self.email_spam_protection_service = EmailSpamProtectionService()
self.email_status_service = EmailStatusService()
self.module_logger = ModuleLogger(sender_id="TenantNotificationManager")
async def send_tenant_email(
self,
tenant_id: str,
template_id: str,
rendered_template: Dict,
recipient_emails: List[str],
sender_emails: List[str],
region: int,
priority: str = "normal",
tracking_enabled: bool = True
):
"""Send tenant email using existing EMAIL queue with validation and protection"""
try:
# 1. validate recipient emails
valid_recipients, invalid_recipients = await self.email_validation_service.validate_emails(recipient_emails)
valid_senders, invalid_senders = await self.email_validation_service.validate_sender_emails(tenant_id, sender_emails)
if not valid_recipients:
raise InvalidDataError("No valid recipient emails found")
if not valid_senders:
raise InvalidDataError("No valid sender emails found")
# 2. check blacklisted recipients
blacklisted_recipients = []
for recipient in valid_recipients:
if await self.email_bounce_service.is_blacklisted(recipient, tenant_id):
blacklisted_recipients.append(recipient)
# remove blacklisted recipients from valid recipients
valid_recipients = [r for r in valid_recipients if r not in blacklisted_recipients]
if not valid_recipients:
raise InvalidDataError("All recipient emails are blacklisted")
# 3. check rate limit
rate_limit_result = await self.email_spam_protection_service.check_rate_limit(tenant_id, valid_senders[0])
if not rate_limit_result["allowed"]:
raise InvalidDataError("Rate limit exceeded")
# 4. spam detection with region
email_content = {
"subject": rendered_template.get("subject_properties", {}).get("subject", ""),
"body": rendered_template.get("body_properties", {}).get("html_content", "")
}
spam_result = await self.email_spam_protection_service.detect_spam(email_content, region)
if spam_result["is_spam"]:
raise InvalidDataError("Email content detected as spam")
# 5. build message properties
properties = {
"tenant_id": tenant_id,
"template_id": template_id,
"destination_emails": valid_recipients,
"sender_emails": valid_senders,
"subject_properties": rendered_template.get("subject_properties", {}),
"body_properties": rendered_template.get("body_properties", {}),
"region": region,
"priority": priority,
"tracking_enabled": tracking_enabled,
"content_text": rendered_template.get("body_properties", {}).get("html_content", ""),
"content_subject": rendered_template.get("subject_properties", {}).get("subject", ""),
"receiver_type": "email",
"validation_info": {
"invalid_recipients": invalid_recipients,
"invalid_senders": invalid_senders,
"blacklisted_recipients": blacklisted_recipients,
"rate_limit_info": rate_limit_result,
"spam_detection_info": spam_result
}
}
# 6. create message for each recipient
for recipient_email in valid_recipients:
# create NotificationMessage
message = NotificationMessage(
sender_id=tenant_id,
receiver_id=recipient_email,
subject=template_id,
event="tenant_email",
properties=properties
)
await self.email_publisher.publish(message=message.model_dump_json())
await self.module_logger.log_info(
"Tenant email messages published to EMAIL queue",
properties={
"tenant_id": tenant_id,
"template_id": template_id,
"valid_recipient_count": len(valid_recipients),
"invalid_recipient_count": len(invalid_recipients),
"blacklisted_recipient_count": len(blacklisted_recipients),
"valid_sender_count": len(valid_senders),
"invalid_sender_count": len(invalid_senders)
}
)
return {
"message_id": f"{tenant_id}_{template_id}_{datetime.utcnow().isoformat()}",
"email_ids": [],
"status": "queued",
"tenant_id": tenant_id,
"template_id": template_id,
"validation_summary": {
"total_recipients": len(recipient_emails),
"valid_recipients": len(valid_recipients),
"invalid_recipients": len(invalid_recipients),
"blacklisted_recipients": len(blacklisted_recipients),
"total_senders": len(sender_emails),
"valid_senders": len(valid_senders),
"invalid_senders": len(invalid_senders)
}
}
except Exception as e:
await self.module_logger.log_error(
"Failed to send tenant email",
properties={
"tenant_id": tenant_id,
"template_id": template_id,
"error": str(e)
}
)
raise
async def get_tenant_email_status(self, tenant_id: str, email_id: str = None, recipient_email: str = None):
"""Get tenant email status from database"""
try:
status = await self.email_status_service.get_email_status(email_id, tenant_id, recipient_email)
if not status:
await self.module_logger.log_warning(
"Email status not found",
properties={
"tenant_id": tenant_id,
"email_id": email_id
}
)
return None
await self.module_logger.log_info(
"Tenant email status retrieved successfully",
properties={
"tenant_id": tenant_id,
"email_id": email_id,
"status": status.get("status") if status else None
}
)
return status
except Exception as e:
await self.module_logger.log_error(
"Failed to get tenant email status",
properties={
"tenant_id": tenant_id,
"email_id": email_id,
"error": str(e)
}
)
raise
async def get_tenant_email_status_list(self, tenant_id: str, limit: int = 50, offset: int = 0):
"""Get list of email statuses for a tenant"""
try:
status_list = await self.email_status_service.get_tenant_email_status_list(tenant_id, limit, offset)
await self.module_logger.log_info(
"Tenant email status list retrieved successfully",
properties={
"tenant_id": tenant_id,
"count": len(status_list.get("emails", [])),
"total_count": status_list.get("pagination", {}).get("total_count", 0)
}
)
return status_list
except Exception as e:
await self.module_logger.log_error(
"Failed to get tenant email status list",
properties={
"tenant_id": tenant_id,
"error": str(e)
}
)
raise