# freeleaps-service-hub/apps/notification/backend/services/email/email_status_service.py
#
# Source listing metadata: 195 lines, 9.2 KiB, Python

from typing import Dict, Optional, List
from datetime import datetime
from common.log.module_logger import ModuleLogger
from backend.models.models import EmailSendStatusDoc, EmailTrackingDoc, EmailBounceDoc
from common.constants.email import EmailSendStatus, BounceType
class EmailStatusService:
    """Read-only service that reports the delivery status of notification emails.

    It combines three documents into plain dictionaries:
    ``EmailSendStatusDoc`` (send status), ``EmailTrackingDoc`` (open/click
    tracking) and ``EmailBounceDoc`` (bounce information).
    """

    def __init__(self):
        self.module_logger = ModuleLogger(sender_id="EmailStatusService")

    @staticmethod
    def _iso(value):
        """Return ``value.isoformat()``, or ``None`` when the timestamp is unset."""
        return value.isoformat() if value is not None else None

    async def _find_by_email_id(self, email_id, tenant_id):
        """Return ``(bounce_doc, status_doc)`` for ``email_id``; ``(None, None)`` on lookup failure."""
        lookup_filter = {"email_id": email_id, "tenant_id": tenant_id}
        try:
            bounce_doc = await EmailBounceDoc.find_one(lookup_filter)
            status_doc = await EmailSendStatusDoc.find_one(lookup_filter)
        except Exception as exc:
            # Best effort (e.g. database not initialised in tests): treat the
            # failure as "not found", but leave a trace instead of hiding it.
            await self.module_logger.log_warning(
                "Email status lookup by email_id failed",
                properties={
                    "email_id": email_id,
                    "tenant_id": tenant_id,
                    "error": str(exc),
                },
            )
            return None, None
        return bounce_doc, status_doc

    async def _find_by_recipient(self, recipient_email, tenant_id):
        """Return ``(bounce_doc, status_doc)`` for a recipient; ``(None, None)`` on lookup failure.

        If a bounce record exists and carries an ``email_id``, the send status
        is resolved through that id. Otherwise the most recently created send
        status for the recipient is used.
        """
        try:
            bounce_doc = await EmailBounceDoc.find_one(
                {"email": recipient_email, "tenant_id": tenant_id}
            )
            if bounce_doc and bounce_doc.email_id:
                status_doc = await EmailSendStatusDoc.find_one(
                    {"email_id": bounce_doc.email_id, "tenant_id": tenant_id}
                )
            else:
                # A single-document query cannot be sorted, so run a sorted
                # multi-document query and keep only its first result.
                newest = await EmailSendStatusDoc.find(
                    {"recipient_email": recipient_email, "tenant_id": tenant_id}
                ).sort(-EmailSendStatusDoc.created_at).limit(1).to_list()
                status_doc = newest[0] if newest else None
        except Exception as exc:
            # Best effort, see _find_by_email_id.
            await self.module_logger.log_warning(
                "Email status lookup by recipient_email failed",
                properties={
                    "recipient_email": recipient_email,
                    "tenant_id": tenant_id,
                    "error": str(exc),
                },
            )
            return None, None
        return bounce_doc, status_doc

    async def _find_tracking(self, email_id, tenant_id):
        """Return the tracking document for ``email_id``, or ``None`` if absent or on lookup failure."""
        try:
            return await EmailTrackingDoc.find_one(
                {"email_id": email_id, "tenant_id": tenant_id}
            )
        except Exception as exc:
            # Tracking data is optional in the result; degrade to "no tracking".
            await self.module_logger.log_warning(
                "Email tracking lookup failed",
                properties={
                    "email_id": email_id,
                    "tenant_id": tenant_id,
                    "error": str(exc),
                },
            )
            return None

    def _build_status_info(self, status_doc, tracking_doc, bounce_doc, tenant_id):
        """Assemble the response dictionary from the three (possibly missing) documents."""
        iso = self._iso
        has_tracking = tracking_doc is not None
        has_bounce = bounce_doc is not None
        return {
            "email_id": status_doc.email_id,
            "tenant_id": tenant_id,
            "recipient_email": status_doc.recipient_email,
            "template_id": status_doc.template_id,
            "subject": status_doc.subject,
            "status": status_doc.status.value,
            "sent_at": iso(status_doc.sent_at),
            "failed_at": iso(status_doc.failed_at),
            "error_message": status_doc.error_message,
            "retry_count": status_doc.retry_count,
            "max_retries": status_doc.max_retries,
            "message_id": status_doc.message_id,
            "created_at": iso(status_doc.created_at),
            "updated_at": iso(status_doc.updated_at),
            "email_senders": status_doc.email_senders,
            "tracking": {
                "enabled": tracking_doc.tracking_enabled if has_tracking else False,
                "opened_at": iso(tracking_doc.opened_at) if has_tracking else None,
                "opened_count": tracking_doc.opened_count if has_tracking else 0,
                "clicked_at": iso(tracking_doc.clicked_at) if has_tracking else None,
                "clicked_count": tracking_doc.clicked_count if has_tracking else 0,
                "clicked_links": tracking_doc.clicked_links if has_tracking else [],
                "user_agent": tracking_doc.user_agent if has_tracking else None,
                "ip_address": tracking_doc.ip_address if has_tracking else None,
            },
            "bounce": {
                "bounced": has_bounce,
                "bounce_type": bounce_doc.bounce_type.value if has_bounce else None,
                "bounce_reason": bounce_doc.reason if has_bounce else None,
                "bounced_at": iso(bounce_doc.bounced_at) if has_bounce else None,
                "processed": bounce_doc.processed if has_bounce else False,
                "processed_at": iso(bounce_doc.processed_at) if has_bounce else None,
            },
        }

    async def get_email_status(
        self,
        email_id: Optional[str] = None,
        tenant_id: Optional[str] = None,
        recipient_email: Optional[str] = None,
    ) -> Optional[Dict]:
        """Get comprehensive email status including send status, tracking, and bounce info.

        Args:
            email_id: Identifier of the email. Takes precedence when given.
            tenant_id: Tenant used to filter every lookup. Required when
                looking up by ``recipient_email``.
            recipient_email: Recipient address, used only when ``email_id``
                is not given (for bounce scenarios).

        Returns:
            A dictionary with the send status plus nested ``"tracking"`` and
            ``"bounce"`` sections, or ``None`` when no send status is found.

        Raises:
            ValueError: If neither ``email_id`` nor both ``recipient_email``
                and ``tenant_id`` are provided.
        """
        try:
            # 1. Resolve the send-status and bounce documents.
            if email_id:
                bounce_doc, status_doc = await self._find_by_email_id(email_id, tenant_id)
                if status_doc is None:
                    await self.module_logger.log_warning(
                        "Email status not found by email_id",
                        properties={
                            "email_id": email_id,
                            "tenant_id": tenant_id
                        }
                    )
                    return None
            elif recipient_email and tenant_id:
                bounce_doc, status_doc = await self._find_by_recipient(recipient_email, tenant_id)
                if status_doc is None:
                    await self.module_logger.log_warning(
                        "Email status not found by recipient_email",
                        properties={
                            "recipient_email": recipient_email,
                            "tenant_id": tenant_id
                        }
                    )
                    return None
            else:
                raise ValueError("Either email_id or (recipient_email, tenant_id) must be provided")

            # 2. Tracking info is optional; a missing document yields defaults.
            tracking_doc = await self._find_tracking(status_doc.email_id, tenant_id)

            # 3. Build and return the result.
            status_info = self._build_status_info(status_doc, tracking_doc, bounce_doc, tenant_id)
            await self.module_logger.log_info(
                "Email status retrieved successfully",
                properties={
                    "email_id": status_doc.email_id,
                    "tenant_id": tenant_id,
                    "status": status_info["status"]
                }
            )
            return status_info
        except Exception as e:
            await self.module_logger.log_error(
                "Failed to get email status",
                properties={
                    "email_id": email_id or "unknown",
                    "recipient_email": recipient_email,
                    "tenant_id": tenant_id,
                    "error": str(e)
                }
            )
            raise

    async def get_tenant_email_status_list(self, tenant_id: str, limit: int = 50, offset: int = 0) -> Dict:
        """Get a page of email statuses for a tenant, newest first.

        Args:
            tenant_id: Tenant whose emails are listed.
            limit: Maximum number of entries to return.
            offset: Number of entries to skip before the page starts.

        Returns:
            A dictionary with ``"tenant_id"``, the ``"emails"`` summaries and
            a ``"pagination"`` section (``limit``, ``offset``, ``total_count``,
            ``has_more``).

        Raises:
            Exception: Any query error is logged and re-raised unchanged.
        """
        try:
            # One filter for both queries so the page and the total always agree.
            tenant_filter = {"tenant_id": tenant_id}
            page_docs = await EmailSendStatusDoc.find(
                tenant_filter
            ).sort(-EmailSendStatusDoc.created_at).skip(offset).limit(limit).to_list()
            status_list = [
                {
                    "email_id": doc.email_id,
                    "recipient_email": doc.recipient_email,
                    "template_id": doc.template_id,
                    "subject": doc.subject,
                    "status": doc.status.value,
                    "sent_at": self._iso(doc.sent_at),
                    "created_at": self._iso(doc.created_at),
                }
                for doc in page_docs
            ]
            total_count = await EmailSendStatusDoc.find(tenant_filter).count()
            await self.module_logger.log_info(
                "Tenant email status list retrieved",
                properties={
                    "tenant_id": tenant_id,
                    "count": len(status_list),
                    "total_count": total_count
                }
            )
            return {
                "tenant_id": tenant_id,
                "emails": status_list,
                "pagination": {
                    "limit": limit,
                    "offset": offset,
                    "total_count": total_count,
                    "has_more": offset + limit < total_count
                }
            }
        except Exception as e:
            await self.module_logger.log_error(
                "Failed to get tenant email status list",
                properties={
                    "tenant_id": tenant_id,
                    "error": str(e)
                }
            )
            raise