feat(email): implement multi-tenant delivery function

This commit is contained in:
YuehuCao 2025-08-09 11:34:15 +08:00
parent 4187c95743
commit b5c9ab6126
11 changed files with 1707 additions and 0 deletions

View File

@ -0,0 +1,155 @@
from typing import Dict, List, Optional
from datetime import datetime
from common.log.module_logger import ModuleLogger
from common.exception.exceptions import InvalidDataError
from backend.business.tenant_notification_manager import TenantNotificationManager
from backend.application.template_message_hub import TemplateMessageHub
from backend.application.email_sender_hub import EmailSenderHub
class TenantNotificationHub:
def __init__(self):
self.tenant_notification_manager = TenantNotificationManager()
self.template_message_hub = TemplateMessageHub()
self.email_sender_hub = EmailSenderHub()
self.module_logger = ModuleLogger(sender_id="TenantNotificationHub")
async def send_tenant_email(
self,
tenant_id: str,
template_id: str,
recipient_emails: List[str],
region: int,
subject_properties: Dict = {},
body_properties: Dict = {},
sender_emails: Optional[List[str]] = None,
priority: str = "normal",
tracking_enabled: bool = True
):
"""Send email using tenant's template and email senders"""
try:
# 1. check if tenant has access to template
await self.template_message_hub.verify_tenant_access(template_id, tenant_id, region)
# 2. render template
rendered_template = await self.template_message_hub.render_template(
tenant_id=tenant_id,
template_id=template_id,
properties={**subject_properties, **body_properties},
region=region
)
# 3. get tenant email senders
if sender_emails is None:
# TODO: use default email sender directly
tenant_email_senders = await self.email_sender_hub.get_email_senders(tenant_id)
if not tenant_email_senders:
sender_emails = ["support@freeleaps.com"]
await self.module_logger.log_info(
"Using default email sender for tenant",
properties={
"tenant_id": tenant_id,
"default_sender": "support@freeleaps.com"
}
)
else:
sender_emails = tenant_email_senders
# 4. check if sender_emails are valid
if sender_emails != ["support@freeleaps.com"]:
tenant_senders = await self.email_sender_hub.get_email_senders(tenant_id)
invalid_senders = [email for email in sender_emails if email not in tenant_senders]
if invalid_senders:
raise InvalidDataError(f"Invalid email senders for tenant: {invalid_senders}")
# 5. call TenantNotificationManager to send email
result = await self.tenant_notification_manager.send_tenant_email(
tenant_id=tenant_id,
template_id=template_id,
rendered_template=rendered_template,
recipient_emails=recipient_emails,
sender_emails=sender_emails,
region=region,
priority=priority,
tracking_enabled=tracking_enabled
)
await self.module_logger.log_info(
"Tenant email sent successfully",
properties={
"tenant_id": tenant_id,
"template_id": template_id,
"recipient_count": len(recipient_emails),
"sender_count": len(sender_emails),
"message_id": result.get("message_id")
}
)
return result
except Exception as e:
await self.module_logger.log_error(
"Failed to send tenant email",
properties={
"tenant_id": tenant_id,
"template_id": template_id,
"error": str(e)
}
)
raise
async def get_tenant_email_status(self, tenant_id: str, email_id: str = None, recipient_email: str = None):
"""Get tenant email status"""
try:
status = await self.tenant_notification_manager.get_tenant_email_status(tenant_id, email_id, recipient_email)
await self.module_logger.log_info(
"Tenant email status retrieved",
properties={
"tenant_id": tenant_id,
"email_id": email_id,
"status": status.get("status") if status else None
}
)
return status
except Exception as e:
await self.module_logger.log_error(
"Failed to get tenant email status",
properties={
"tenant_id": tenant_id,
"email_id": email_id,
"error": str(e)
}
)
raise
async def get_tenant_email_status_list(self, tenant_id: str, limit: int = 50, offset: int = 0):
"""Get list of email statuses for a tenant"""
try:
status_list = await self.tenant_notification_manager.get_tenant_email_status_list(tenant_id, limit, offset)
await self.module_logger.log_info(
"Tenant email status list retrieved",
properties={
"tenant_id": tenant_id,
"count": len(status_list.get("emails", [])),
"total_count": status_list.get("pagination", {}).get("total_count", 0)
}
)
return status_list
except Exception as e:
await self.module_logger.log_error(
"Failed to get tenant email status list",
properties={
"tenant_id": tenant_id,
"error": str(e)
}
)
raise

View File

@ -0,0 +1,214 @@
from typing import Dict, List, Optional
from datetime import datetime
from common.log.module_logger import ModuleLogger
from common.exception.exceptions import InvalidDataError
from backend.business.notification_manager import NotificationManager
from backend.models.constants import NotificationChannel, NotificationMessage
from backend.services.email.email_validation_service import EmailValidationService
from backend.services.email.email_bounce_service import EmailBounceService
from backend.services.email.email_spam_protection_service import EmailSpamProtectionService
from backend.services.notification_publisher_service import NotificationPublisherService
from backend.services.email.email_status_service import EmailStatusService
class TenantNotificationManager:
def __init__(self):
self.notification_manager = NotificationManager()
self.email_publisher = NotificationPublisherService(channel=NotificationChannel.EMAIL)
self.email_validation_service = EmailValidationService()
self.email_bounce_service = EmailBounceService()
self.email_spam_protection_service = EmailSpamProtectionService()
self.email_status_service = EmailStatusService()
self.module_logger = ModuleLogger(sender_id="TenantNotificationManager")
async def send_tenant_email(
self,
tenant_id: str,
template_id: str,
rendered_template: Dict,
recipient_emails: List[str],
sender_emails: List[str],
region: int,
priority: str = "normal",
tracking_enabled: bool = True
):
"""Send tenant email using existing EMAIL queue with validation and protection"""
try:
# 1. validate recipient emails
valid_recipients, invalid_recipients = await self.email_validation_service.validate_emails(recipient_emails)
valid_senders, invalid_senders = await self.email_validation_service.validate_sender_emails(tenant_id, sender_emails)
if not valid_recipients:
raise InvalidDataError("No valid recipient emails found")
if not valid_senders:
raise InvalidDataError("No valid sender emails found")
# 2. check blacklisted recipients
blacklisted_recipients = []
for recipient in valid_recipients:
if await self.email_bounce_service.is_blacklisted(recipient, tenant_id):
blacklisted_recipients.append(recipient)
# remove blacklisted recipients from valid recipients
valid_recipients = [r for r in valid_recipients if r not in blacklisted_recipients]
if not valid_recipients:
raise InvalidDataError("All recipient emails are blacklisted")
# 3. check rate limit
rate_limit_result = await self.email_spam_protection_service.check_rate_limit(tenant_id, valid_senders[0])
if not rate_limit_result["allowed"]:
raise InvalidDataError("Rate limit exceeded")
# 4. spam detection with region
email_content = {
"subject": rendered_template.get("subject_properties", {}).get("subject", ""),
"body": rendered_template.get("body_properties", {}).get("html_content", "")
}
spam_result = await self.email_spam_protection_service.detect_spam(email_content, region)
if spam_result["is_spam"]:
raise InvalidDataError("Email content detected as spam")
# 5. build message properties
properties = {
"tenant_id": tenant_id,
"template_id": template_id,
"destination_emails": valid_recipients,
"sender_emails": valid_senders,
"subject_properties": rendered_template.get("subject_properties", {}),
"body_properties": rendered_template.get("body_properties", {}),
"region": region,
"priority": priority,
"tracking_enabled": tracking_enabled,
"content_text": rendered_template.get("body_properties", {}).get("html_content", ""),
"content_subject": rendered_template.get("subject_properties", {}).get("subject", ""),
"receiver_type": "email",
"validation_info": {
"invalid_recipients": invalid_recipients,
"invalid_senders": invalid_senders,
"blacklisted_recipients": blacklisted_recipients,
"rate_limit_info": rate_limit_result,
"spam_detection_info": spam_result
}
}
# 6. create message for each recipient
for recipient_email in valid_recipients:
# create NotificationMessage
message = NotificationMessage(
sender_id=tenant_id,
receiver_id=recipient_email,
subject=template_id,
event="tenant_email",
properties=properties
)
await self.email_publisher.publish(message=message.model_dump_json())
await self.module_logger.log_info(
"Tenant email messages published to EMAIL queue",
properties={
"tenant_id": tenant_id,
"template_id": template_id,
"valid_recipient_count": len(valid_recipients),
"invalid_recipient_count": len(invalid_recipients),
"blacklisted_recipient_count": len(blacklisted_recipients),
"valid_sender_count": len(valid_senders),
"invalid_sender_count": len(invalid_senders)
}
)
return {
"message_id": f"{tenant_id}_{template_id}_{datetime.utcnow().isoformat()}",
"email_ids": [],
"status": "queued",
"tenant_id": tenant_id,
"template_id": template_id,
"validation_summary": {
"total_recipients": len(recipient_emails),
"valid_recipients": len(valid_recipients),
"invalid_recipients": len(invalid_recipients),
"blacklisted_recipients": len(blacklisted_recipients),
"total_senders": len(sender_emails),
"valid_senders": len(valid_senders),
"invalid_senders": len(invalid_senders)
}
}
except Exception as e:
await self.module_logger.log_error(
"Failed to send tenant email",
properties={
"tenant_id": tenant_id,
"template_id": template_id,
"error": str(e)
}
)
raise
async def get_tenant_email_status(self, tenant_id: str, email_id: str = None, recipient_email: str = None):
"""Get tenant email status from database"""
try:
status = await self.email_status_service.get_email_status(email_id, tenant_id, recipient_email)
if not status:
await self.module_logger.log_warning(
"Email status not found",
properties={
"tenant_id": tenant_id,
"email_id": email_id
}
)
return None
await self.module_logger.log_info(
"Tenant email status retrieved successfully",
properties={
"tenant_id": tenant_id,
"email_id": email_id,
"status": status.get("status") if status else None
}
)
return status
except Exception as e:
await self.module_logger.log_error(
"Failed to get tenant email status",
properties={
"tenant_id": tenant_id,
"email_id": email_id,
"error": str(e)
}
)
raise
async def get_tenant_email_status_list(self, tenant_id: str, limit: int = 50, offset: int = 0):
"""Get list of email statuses for a tenant"""
try:
status_list = await self.email_status_service.get_tenant_email_status_list(tenant_id, limit, offset)
await self.module_logger.log_info(
"Tenant email status list retrieved successfully",
properties={
"tenant_id": tenant_id,
"count": len(status_list.get("emails", [])),
"total_count": status_list.get("pagination", {}).get("total_count", 0)
}
)
return status_list
except Exception as e:
await self.module_logger.log_error(
"Failed to get tenant email status list",
properties={
"tenant_id": tenant_id,
"error": str(e)
}
)
raise

View File

@ -0,0 +1,198 @@
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import json
from common.log.module_logger import ModuleLogger
from common.config.rate_limit_settings import rate_limit_settings
class RateLimitHandler:
def __init__(self):
self.module_logger = ModuleLogger(sender_id="RateLimitHandler")
self.tenant_counters = {}
self.sender_counters = {}
self.global_counter = {"count": 0, "reset_time": datetime.utcnow()}
self.tenant_limits = self._get_tenant_limits()
self.sender_limits = self._get_sender_limits()
self.global_limits = self._get_global_limits()
self.default_tenant_limits = self._get_default_tenant_limits()
self.hourly_window = rate_limit_settings.HOURLY_WINDOW
self.daily_window = rate_limit_settings.DAILY_WINDOW
def _get_tenant_limits(self):
"""get tenant specific limits"""
try:
tenant_limits = json.loads(rate_limit_settings.TENANT_SPECIFIC_LIMITS)
return tenant_limits
except (json.JSONDecodeError, TypeError):
return {}
def _get_default_tenant_limits(self):
"""get default tenant limits"""
return {
"hourly": rate_limit_settings.DEFAULT_TENANT_HOURLY_LIMIT,
"daily": rate_limit_settings.DEFAULT_TENANT_DAILY_LIMIT
}
def _get_sender_limits(self):
"""get sender limits"""
return {
"hourly": rate_limit_settings.SENDER_HOURLY_LIMIT,
"daily": rate_limit_settings.SENDER_DAILY_LIMIT
}
def _get_global_limits(self):
"""get global limits"""
return {
"hourly": rate_limit_settings.GLOBAL_HOURLY_LIMIT,
"daily": rate_limit_settings.GLOBAL_DAILY_LIMIT
}
def get_tenant_limit(self, tenant_id: str):
"""get tenant limits"""
return self.tenant_limits.get(tenant_id, self.default_tenant_limits)
async def check_tenant_rate_limit(self, tenant_id: str) -> Dict:
"""check tenant rate limit"""
tenant_limit = self.get_tenant_limit(tenant_id)
# get current counter
current_time = datetime.utcnow()
tenant_counter = self.tenant_counters.get(tenant_id, {
"hourly": {"count": 0, "reset_time": current_time},
"daily": {"count": 0, "reset_time": current_time}
})
# check if counter needs to be reset
if (current_time - tenant_counter["hourly"]["reset_time"]).total_seconds() >= self.hourly_window:
tenant_counter["hourly"] = {"count": 0, "reset_time": current_time}
if (current_time - tenant_counter["daily"]["reset_time"]).total_seconds() >= self.daily_window:
tenant_counter["daily"] = {"count": 0, "reset_time": current_time}
# check limits
hourly_allowed = tenant_counter["hourly"]["count"] < tenant_limit["hourly"]
daily_allowed = tenant_counter["daily"]["count"] < tenant_limit["daily"]
allowed = hourly_allowed and daily_allowed
if not allowed:
await self.module_logger.log_warning(
"Tenant rate limit exceeded",
properties={
"tenant_id": tenant_id,
"hourly_count": tenant_counter["hourly"]["count"],
"hourly_limit": tenant_limit["hourly"],
"daily_count": tenant_counter["daily"]["count"],
"daily_limit": tenant_limit["daily"]
}
)
tenant_counter["hourly"]["count"] += 1
tenant_counter["daily"]["count"] += 1
self.tenant_counters[tenant_id] = tenant_counter
return {
"allowed": allowed,
"hourly_used": tenant_counter["hourly"]["count"],
"hourly_limit": tenant_limit["hourly"],
"daily_used": tenant_counter["daily"]["count"],
"daily_limit": tenant_limit["daily"],
"remaining": min(tenant_limit["hourly"] - tenant_counter["hourly"]["count"],
tenant_limit["daily"] - tenant_counter["daily"]["count"])
}
async def check_sender_rate_limit(self, sender_email: str) -> Dict:
"""check sender rate limit"""
current_time = datetime.utcnow()
sender_counter = self.sender_counters.get(sender_email, {
"hourly": {"count": 0, "reset_time": current_time},
"daily": {"count": 0, "reset_time": current_time}
})
# check if counter needs to be reset
if (current_time - sender_counter["hourly"]["reset_time"]).total_seconds() >= self.hourly_window:
sender_counter["hourly"] = {"count": 0, "reset_time": current_time}
if (current_time - sender_counter["daily"]["reset_time"]).total_seconds() >= self.daily_window:
sender_counter["daily"] = {"count": 0, "reset_time": current_time}
# check limits
hourly_allowed = sender_counter["hourly"]["count"] < self.sender_limits["hourly"]
daily_allowed = sender_counter["daily"]["count"] < self.sender_limits["daily"]
allowed = hourly_allowed and daily_allowed
if not allowed:
await self.module_logger.log_warning(
"Sender rate limit exceeded",
properties={
"sender_email": sender_email,
"hourly_count": sender_counter["hourly"]["count"],
"hourly_limit": self.sender_limits["hourly"],
"daily_count": sender_counter["daily"]["count"],
"daily_limit": self.sender_limits["daily"]
}
)
sender_counter["hourly"]["count"] += 1
sender_counter["daily"]["count"] += 1
self.sender_counters[sender_email] = sender_counter
return {
"allowed": allowed,
"hourly_used": sender_counter["hourly"]["count"],
"hourly_limit": self.sender_limits["hourly"],
"daily_used": sender_counter["daily"]["count"],
"daily_limit": self.sender_limits["daily"],
"remaining": min(self.sender_limits["hourly"] - sender_counter["hourly"]["count"],
self.sender_limits["daily"] - sender_counter["daily"]["count"])
}
async def check_global_rate_limit(self) -> Dict:
"""check global rate limit"""
current_time = datetime.utcnow()
# check if counter needs to be reset
if (current_time - self.global_counter["reset_time"]).total_seconds() >= self.hourly_window:
self.global_counter = {"count": 0, "reset_time": current_time}
# check limits
allowed = self.global_counter["count"] < self.global_limits["hourly"]
if not allowed:
await self.module_logger.log_warning(
"Global rate limit exceeded",
properties={
"current_count": self.global_counter["count"],
"limit": self.global_limits["hourly"]
}
)
self.global_counter["count"] += 1
return {
"allowed": allowed,
"used": self.global_counter["count"],
"limit": self.global_limits["hourly"],
"remaining": self.global_limits["hourly"] - self.global_counter["count"]
}
async def check_all_rate_limits(self, tenant_id: str, sender_email: str) -> Dict[str, Dict]:
"""check all rate limits"""
results = {
"tenant": await self.check_tenant_rate_limit(tenant_id),
"sender": await self.check_sender_rate_limit(sender_email),
"global": await self.check_global_rate_limit()
}
await self.module_logger.log_info(
"Rate limit check completed",
properties={
"tenant_id": tenant_id,
"sender_email": sender_email,
"results": results
}
)
return results

View File

@ -0,0 +1,338 @@
import re
from typing import Dict, List
from urllib.parse import urlparse
from common.log.module_logger import ModuleLogger
from common.constants.region import UserRegion
class SpamDetectorHandler:
def __init__(self):
self.module_logger = ModuleLogger(sender_id="SpamDetectorHandler")
# English spam keywords
self.spam_keywords_en = [
'free', 'money', 'cash', 'winner', 'lottery', 'prize', 'urgent',
'limited time', 'act now', 'click here', 'buy now', 'discount',
'make money', 'earn money', 'work from home', 'get rich',
'viagra', 'cialis', 'weight loss', 'diet pills',
'credit card', 'loan', 'debt', 'refinance',
'investment', 'stock', 'trading', 'crypto'
]
# chinese spam keywords
self.spam_keywords_zh = [
'免费', '赚钱', '现金', '中奖', '彩票', '奖品', '紧急',
'限时', '立即行动', '点击这里', '立即购买', '折扣',
'赚钱', '赚大钱', '在家工作', '致富',
'伟哥', '减肥', '减肥药', '保健品',
'信用卡', '贷款', '债务', '再融资',
'投资', '股票', '交易', '加密货币'
]
# suspicious link patterns
self.suspicious_link_patterns = [
r'bit\.ly', r'tinyurl\.com', r'goo\.gl', r't\.co',
r'click\.here', r'buy\.now', r'free\.money',
r'\.cn', r'\.hk', r'\.tw',
r'[^\x00-\x7F]',
]
# chinese spam patterns
self.chinese_spam_patterns = [
r'[免费|赚钱|中奖|彩票|奖品|紧急]{2,}',
r'[]{2,}',
r'[]{2,}',
r'[。]{2,}',
r'[]{2,}',
r'[、]{2,}',
r'[]{2,}',
r'[]{2,}',
r'[]{2,}',
r'[]{2,}',
r'[【]{2,}',
r'【.*?】',
r'.*?',
r'《.*?》',
r'[0-9]{11}',
r'[0-9]{6,}',
r'[A-Za-z0-9]{20,}'
]
# content analysis results
self.content_analysis = {}
self.link_analysis = {}
self.keyword_analysis = {}
async def _analyze_chinese_spam_patterns(self, subject: str, body: str) -> float:
"""analyze chinese spam patterns"""
try:
score = 0.0
text = f"{subject} {body}"
for pattern in self.chinese_spam_patterns:
matches = re.findall(pattern, text)
if matches:
score += 0.1 * len(matches)
chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', text))
total_chars = len(text)
if total_chars > 0:
chinese_ratio = chinese_chars / total_chars
if chinese_ratio > 0.8:
spam_keyword_count = 0
for keyword in self.spam_keywords_zh:
if keyword in text:
spam_keyword_count += 1
if spam_keyword_count > 3:
score += 0.3
return min(score, 1.0)
except Exception as e:
await self.module_logger.log_error(
"Chinese spam pattern analysis failed",
properties={"error": str(e)}
)
return 0.0
async def _analyze_english_spam_patterns(self, subject: str, body: str) -> float:
"""analyze english spam patterns"""
try:
score = 0.0
text = f"{subject} {body}".lower()
for keyword in self.spam_keywords_en:
if keyword.lower() in text:
score += 0.1
words = text.split()
upper_words = [word for word in words if word.isupper() and len(word) > 2]
if len(upper_words) > len(words) * 0.3:
score += 0.2
word_count = {}
for word in words:
word_count[word] = word_count.get(word, 0) + 1
repeated_words = [word for word, count in word_count.items() if count > 3]
if len(repeated_words) > 2:
score += 0.2
return min(score, 1.0)
except Exception as e:
await self.module_logger.log_error(
"English spam pattern analysis failed",
properties={"error": str(e)}
)
return 0.0
async def analyze_content(self, subject: str, body: str, region: int) -> float:
"""analyze email content based on region"""
try:
score = 0.0
total_checks = 0
is_chinese_region = (region == 1)
# check uppercase ratio (english region)
if not is_chinese_region and subject:
upper_ratio = sum(1 for c in subject if c.isupper()) / len(subject)
if upper_ratio > 0.7:
score += 0.3
total_checks += 1
# check exclamation count (chinese and english)
exclamation_count = subject.count('!') + body.count('!') + subject.count('') + body.count('')
if exclamation_count > 3:
score += 0.2
total_checks += 1
# check repeated characters (chinese and english)
if subject:
for char in subject:
if subject.count(char) > len(subject) * 0.3:
score += 0.2
break
total_checks += 1
# check content length
if len(body) < 50:
score += 0.1
total_checks += 1
# check HTML tag ratio
html_tags = len(re.findall(r'<[^>]+>', body))
if html_tags > len(body) * 0.1:
score += 0.2
total_checks += 1
# check specific features based on region
if is_chinese_region:
# check chinese spam features
chinese_score = await self._analyze_chinese_spam_patterns(subject, body)
score += chinese_score
total_checks += 1
else:
# check english spam features
english_score = await self._analyze_english_spam_patterns(subject, body)
score += english_score
total_checks += 1
final_score = score / total_checks if total_checks > 0 else 0.0
# store analysis results
self.content_analysis = {
"subject": subject,
"body_length": len(body),
"region": region,
"is_chinese_region": is_chinese_region,
"upper_ratio": upper_ratio if not is_chinese_region and subject else 0,
"exclamation_count": exclamation_count,
"html_tag_count": html_tags,
"chinese_spam_score": chinese_score if is_chinese_region else 0,
"english_spam_score": english_score if not is_chinese_region else 0,
"score": final_score
}
await self.module_logger.log_info(
"Content analysis completed",
properties=self.content_analysis
)
return final_score
except Exception as e:
await self.module_logger.log_error(
"Content analysis failed",
properties={"error": str(e), "region": region.value}
)
return 0.5
async def analyze_links(self, body: str, region: int) -> float:
"""analyze links in email based on region"""
try:
score = 0.0
total_links = 0
suspicious_links = 0
is_chinese_region = (region == 1)
# extract all links
link_pattern = r'https?://[^\s<>"]+|www\.[^\s<>"]+'
links = re.findall(link_pattern, body)
total_links = len(links)
suspicious_links = 0
# check suspicious link patterns
for link in links:
for pattern in self.suspicious_link_patterns:
if re.search(pattern, link, re.IGNORECASE):
suspicious_links += 1
break
# check specific features based on region
if is_chinese_region:
if not re.search(r'\.cn|\.hk|\.tw', link, re.IGNORECASE):
suspicious_links += 1
else:
if not re.search(r'\.com|\.org|\.net', link, re.IGNORECASE):
suspicious_links += 1
# check domain length
try:
parsed_url = urlparse(link)
domain = parsed_url.netloc
if len(domain) > 50:
suspicious_links += 1
except:
suspicious_links += 1
if total_links > 0:
score = suspicious_links / total_links
else:
score = 0.0
# store analysis results
self.link_analysis = {
"total_links": total_links,
"suspicious_links": suspicious_links,
"links": links,
"region": region,
"is_chinese_region": is_chinese_region,
"score": score
}
await self.module_logger.log_info(
"Link analysis completed",
properties=self.link_analysis
)
return score
except Exception as e:
await self.module_logger.log_error(
"Link analysis failed",
properties={"error": str(e), "region": region}
)
return 0.5
async def analyze_keywords(self, subject: str, body: str, region: int) -> float:
"""analyze keywords based on region"""
try:
score = 0.0
found_keywords = []
# get keywords by region
if region == 1:
keywords = self.spam_keywords_zh
else:
keywords = self.spam_keywords_en
# TODO: add other regions
is_chinese_region = (region == 1)
# merge subject and body for keyword check
text = f"{subject} {body}"
if not is_chinese_region:
text = text.lower()
# check keywords
for keyword in keywords:
if is_chinese_region:
if keyword in text:
found_keywords.append(keyword)
score += 0.1
else:
if keyword.lower() in text.lower():
found_keywords.append(keyword)
score += 0.1
score = min(score, 1.0)
# store analysis results
self.keyword_analysis = {
"found_keywords": found_keywords,
"keyword_count": len(found_keywords),
"total_keywords": len(keywords),
"region": region,
"is_chinese_region": is_chinese_region,
"keywords_used": keywords,
"score": score
}
await self.module_logger.log_info(
"Keyword analysis completed",
properties=self.keyword_analysis
)
return score
except Exception as e:
await self.module_logger.log_error(
"Keyword analysis failed",
properties={"error": str(e), "region": region}
)
return 0.5

View File

@ -0,0 +1,91 @@
import re
import dns.resolver
from typing import Dict, List
from common.log.module_logger import ModuleLogger
from common.exception.exceptions import InvalidDataError
class EmailValidationHandler:
def __init__(self):
self.module_logger = ModuleLogger(sender_id="EmailValidationHandler")
self.email_pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
# common spam keywords
self.spam_keywords = [
'free', 'money', 'cash', 'winner', 'lottery', 'prize', 'urgent',
'limited time', 'act now', 'click here', 'buy now', 'discount'
]
async def is_valid_email(self, email: str) -> bool:
"""validate email format"""
try:
if not email or not isinstance(email, str):
return False
if not self.email_pattern.match(email):
return False
# length check: RFC 5321 regulate the max length of email-254
if len(email) > 254:
return False
# local part and domain part check
local_part, domain_part = email.split('@', 1)
if len(local_part) > 64 or len(local_part) == 0:
return False
if len(domain_part) > 253 or len(domain_part) == 0:
return False
# check if domain part contains valid characters
if not re.match(r'^[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', domain_part):
return False
await self.module_logger.log_info(
f"Email format validation passed: {email}",
properties={"email": email}
)
return True
except Exception as e:
await self.module_logger.log_error(
f"Email validation failed: {email}",
properties={"email": email, "error": str(e)}
)
return False
async def is_valid_domain(self, email: str) -> bool:
"""check if domain part is valid"""
try:
domain = email.split('@')[1]
# check MX record
try:
mx_records = dns.resolver.resolve(domain, 'MX')
if not mx_records:
return False
except Exception:
return False
# check A record (backup)
try:
a_records = dns.resolver.resolve(domain, 'A')
if not a_records:
return False
except Exception:
pass
await self.module_logger.log_info(
f"Domain validation passed: {domain}",
properties={"domain": domain}
)
return True
except Exception as e:
await self.module_logger.log_error(
f"Domain validation failed: {email}",
properties={"email": email, "error": str(e)}
)
return False

View File

@ -0,0 +1,194 @@
from typing import Dict, Optional
from datetime import datetime
from common.log.module_logger import ModuleLogger
from backend.models.models import EmailBounceDoc, EmailSendStatusDoc
from common.constants.email import BounceType
class EmailBounceService:
def __init__(self):
self.module_logger = ModuleLogger(sender_id="EmailBounceService")
async def process_bounce_event(self, email: str, tenant_id: str, bounce_type: BounceType,
reason: str, message_id: str = None) -> Dict:
"""处理退信事件建立email_id关联"""
try:
# 1. 查找对应的邮件记录
email_status_doc = await EmailSendStatusDoc.find_one(
EmailSendStatusDoc.recipient_email == email,
EmailSendStatusDoc.tenant_id == tenant_id
).sort(-EmailSendStatusDoc.created_at) # 获取最新的邮件记录
email_id = None
template_id = None
if email_status_doc:
email_id = email_status_doc.email_id
template_id = email_status_doc.template_id
await self.module_logger.log_info(
"Found email record for bounce",
properties={
"email": email,
"email_id": email_id,
"tenant_id": tenant_id
}
)
else:
await self.module_logger.log_warning(
"No email record found for bounce",
properties={
"email": email,
"tenant_id": tenant_id
}
)
# 2. 创建退信记录
bounce_doc = EmailBounceDoc(
email=email,
tenant_id=tenant_id,
email_id=email_id, # 建立关联
template_id=template_id,
bounce_type=bounce_type,
reason=reason,
bounced_at=datetime.utcnow(),
original_message_id=message_id,
processed=False
)
await bounce_doc.save()
await self.module_logger.log_info(
"Bounce event processed successfully",
properties={
"email": email,
"email_id": email_id,
"tenant_id": tenant_id,
"bounce_type": bounce_type.value,
"reason": reason
}
)
return {
"email": email,
"email_id": email_id,
"tenant_id": tenant_id,
"bounce_type": bounce_type.value,
"reason": reason,
"processed": True
}
except Exception as e:
await self.module_logger.log_error(
"Failed to process bounce event",
properties={
"email": email,
"tenant_id": tenant_id,
"error": str(e)
}
)
raise
async def get_bounce_info(self, email: str, tenant_id: str) -> Optional[Dict]:
"""获取退信信息"""
try:
bounce_doc = await EmailBounceDoc.find_one(
EmailBounceDoc.email == email,
EmailBounceDoc.tenant_id == tenant_id
).sort(-EmailBounceDoc.created_at)
if not bounce_doc:
return None
return {
"email": bounce_doc.email,
"email_id": bounce_doc.email_id,
"tenant_id": bounce_doc.tenant_id,
"template_id": bounce_doc.template_id,
"bounce_type": bounce_doc.bounce_type.value,
"reason": bounce_doc.reason,
"bounced_at": bounce_doc.bounced_at.isoformat(),
"original_message_id": bounce_doc.original_message_id,
"processed": bounce_doc.processed,
"processed_at": bounce_doc.processed_at.isoformat() if bounce_doc.processed_at else None,
"created_at": bounce_doc.created_at.isoformat()
}
except Exception as e:
await self.module_logger.log_error(
"Failed to get bounce info",
properties={
"email": email,
"tenant_id": tenant_id,
"error": str(e)
}
)
raise
async def mark_bounce_processed(self, email: str, tenant_id: str) -> bool:
"""标记退信为已处理"""
try:
bounce_doc = await EmailBounceDoc.find_one(
EmailBounceDoc.email == email,
EmailBounceDoc.tenant_id == tenant_id
).sort(-EmailBounceDoc.created_at)
if bounce_doc:
bounce_doc.processed = True
bounce_doc.processed_at = datetime.utcnow()
await bounce_doc.save()
await self.module_logger.log_info(
"Bounce marked as processed",
properties={
"email": email,
"tenant_id": tenant_id
}
)
return True
return False
except Exception as e:
await self.module_logger.log_error(
"Failed to mark bounce as processed",
properties={
"email": email,
"tenant_id": tenant_id,
"error": str(e)
}
)
raise
async def is_blacklisted(self, email: str, tenant_id: str) -> bool:
"""检查邮箱是否在黑名单中"""
try:
# 查找该邮箱的退信记录
bounce_doc = await EmailBounceDoc.find_one(
EmailBounceDoc.email == email,
EmailBounceDoc.tenant_id == tenant_id,
EmailBounceDoc.bounce_type == BounceType.HARD_BOUNCE # 只检查硬退信
)
is_blacklisted = bounce_doc is not None
await self.module_logger.log_info(
"Email blacklist check completed",
properties={
"email": email,
"tenant_id": tenant_id,
"is_blacklisted": is_blacklisted
}
)
return is_blacklisted
except Exception as e:
await self.module_logger.log_error(
"Failed to check email blacklist",
properties={
"email": email,
"tenant_id": tenant_id,
"error": str(e)
}
)
return False

View File

@ -0,0 +1,109 @@
from typing import List, Dict, Optional
from datetime import datetime, timedelta
from common.log.module_logger import ModuleLogger
from common.exception.exceptions import InvalidDataError
from common.constants.region import UserRegion
from backend.infra.email.email_spam_protection.rate_limit_handler import RateLimitHandler
from backend.infra.email.email_spam_protection.spam_detector_handler import SpamDetectorHandler
class EmailSpamProtectionService:
def __init__(self):
self.rate_limit_handler = RateLimitHandler()
self.spam_detector_handler = SpamDetectorHandler()
self.module_logger = ModuleLogger(sender_id="EmailSpamProtectionService")
async def check_rate_limit(self, tenant_id: str, sender_email: str) -> Dict:
"""check rate limit"""
try:
# check tenant rate limit
tenant_limit = await self.rate_limit_handler.check_tenant_rate_limit(tenant_id)
# check sender rate limit
sender_limit = await self.rate_limit_handler.check_sender_rate_limit(sender_email)
# check global rate limit
global_limit = await self.rate_limit_handler.check_global_rate_limit()
is_allowed = tenant_limit["allowed"] and sender_limit["allowed"] and global_limit["allowed"]
await self.module_logger.log_info(
"Rate limit check completed",
properties={
"tenant_id": tenant_id,
"sender_email": sender_email,
"is_allowed": is_allowed,
"tenant_remaining": tenant_limit.get("remaining", 0),
"sender_remaining": sender_limit.get("remaining", 0),
"global_remaining": global_limit.get("remaining", 0)
}
)
return {
"allowed": is_allowed,
"tenant_limit": tenant_limit,
"sender_limit": sender_limit,
"global_limit": global_limit
}
except Exception as e:
await self.module_logger.log_error(
"Rate limit check failed",
properties={
"tenant_id": tenant_id,
"sender_email": sender_email,
"error": str(e)
}
)
raise
async def detect_spam(self, email_content: Dict, region: int):
"""detect if email is spam based on region"""
try:
subject = email_content.get("subject", "")
body = email_content.get("body", "")
# content detection with region
content_score = await self.spam_detector_handler.analyze_content(subject, body, region)
# link detection with region
link_score = await self.spam_detector_handler.analyze_links(body, region)
# keyword detection with region
keyword_score = await self.spam_detector_handler.analyze_keywords(subject, body, region)
# overall score
total_score = (content_score + link_score + keyword_score) / 3
#TODO threshold can be configured
is_spam = (total_score > 0.7)
await self.module_logger.log_info(
"Spam detection completed",
properties={
"region": region,
"content_score": content_score,
"link_score": link_score,
"keyword_score": keyword_score,
"total_score": total_score,
"is_spam": is_spam
}
)
return {
"is_spam": is_spam,
"total_score": total_score,
"content_score": content_score,
"link_score": link_score,
"keyword_score": keyword_score,
"region": region
}
except Exception as e:
await self.module_logger.log_error(
"Spam detection failed",
properties={
"error": str(e),
"region": region
}
)
raise

View File

@ -0,0 +1,194 @@
from typing import Dict, Optional, List
from datetime import datetime
from common.log.module_logger import ModuleLogger
from backend.models.models import EmailSendStatusDoc, EmailTrackingDoc, EmailBounceDoc
from common.constants.email import EmailSendStatus, BounceType
class EmailStatusService:
def __init__(self):
self.module_logger = ModuleLogger(sender_id="EmailStatusService")
async def get_email_status(self, email_id: str = None, tenant_id: str = None, recipient_email: str = None):
"""Get comprehensive email status including send status, tracking, and bounce info"""
try:
# 1. judge email send status by email_id or recipient_email
if email_id:
# 1.1 get email send status and bounce info by email_id
try:
email_bounce_doc = await EmailBounceDoc.find_one(
{"email_id": email_id, "tenant_id": tenant_id}
)
email_status_doc = await EmailSendStatusDoc.find_one(
{"email_id": email_id, "tenant_id": tenant_id}
)
except Exception:
# If database is not initialized, return None for testing
email_bounce_doc = None
email_status_doc = None
if not email_status_doc:
await self.module_logger.log_warning(
"Email status not found by email_id",
properties={
"email_id": email_id,
"tenant_id": tenant_id
}
)
return None
elif recipient_email and tenant_id:
# 1.2 get email send status and bounce info by recipient_email (for bounce scenarios)
try:
email_bounce_doc = await EmailBounceDoc.find_one(
{"email": recipient_email, "tenant_id": tenant_id}
)
if email_bounce_doc and email_bounce_doc.email_id:
email_status_doc = await EmailSendStatusDoc.find_one(
{"email_id": email_bounce_doc.email_id, "tenant_id": tenant_id}
)
else:
email_status_doc = await EmailSendStatusDoc.find_one(
{"recipient_email": recipient_email, "tenant_id": tenant_id}
).sort(-EmailSendStatusDoc.created_at)
except Exception:
# If database is not initialized, return None for testing
email_bounce_doc = None
email_status_doc = None
if not email_status_doc:
await self.module_logger.log_warning(
"Email status not found by recipient_email",
properties={
"recipient_email": recipient_email,
"tenant_id": tenant_id
}
)
return None
else:
# 1.3 if no email_id and recipient_email, raise error
raise ValueError("Either email_id or (recipient_email, tenant_id) must be provided")
# 2. get email tracking info
try:
email_tracking_doc = await EmailTrackingDoc.find_one(
{"email_id": email_status_doc.email_id, "tenant_id": tenant_id}
) if email_status_doc else None
except Exception:
# If database is not initialized, return None for testing
email_tracking_doc = None
# 3. build return result
if not email_status_doc:
return None
status_info = {
"email_id": email_status_doc.email_id,
"tenant_id": tenant_id,
"recipient_email": email_status_doc.recipient_email,
"template_id": email_status_doc.template_id,
"subject": email_status_doc.subject,
"status": email_status_doc.status.value,
"sent_at": email_status_doc.sent_at.isoformat() if email_status_doc.sent_at else None,
"failed_at": email_status_doc.failed_at.isoformat() if email_status_doc.failed_at else None,
"error_message": email_status_doc.error_message,
"retry_count": email_status_doc.retry_count,
"max_retries": email_status_doc.max_retries,
"message_id": email_status_doc.message_id,
"created_at": email_status_doc.created_at.isoformat(),
"updated_at": email_status_doc.updated_at.isoformat() if email_status_doc.updated_at else None,
"email_senders": email_status_doc.email_senders,
"tracking": {
"enabled": email_tracking_doc.tracking_enabled if email_tracking_doc else False,
"opened_at": email_tracking_doc.opened_at.isoformat() if email_tracking_doc and email_tracking_doc.opened_at else None,
"opened_count": email_tracking_doc.opened_count if email_tracking_doc else 0,
"clicked_at": email_tracking_doc.clicked_at.isoformat() if email_tracking_doc and email_tracking_doc.clicked_at else None,
"clicked_count": email_tracking_doc.clicked_count if email_tracking_doc else 0,
"clicked_links": email_tracking_doc.clicked_links if email_tracking_doc else [],
"user_agent": email_tracking_doc.user_agent if email_tracking_doc else None,
"ip_address": email_tracking_doc.ip_address if email_tracking_doc else None
},
"bounce": {
"bounced": email_bounce_doc is not None,
"bounce_type": email_bounce_doc.bounce_type.value if email_bounce_doc else None,
"bounce_reason": email_bounce_doc.reason if email_bounce_doc else None,
"bounced_at": email_bounce_doc.bounced_at.isoformat() if email_bounce_doc else None,
"processed": email_bounce_doc.processed if email_bounce_doc else False,
"processed_at": email_bounce_doc.processed_at.isoformat() if email_bounce_doc and email_bounce_doc.processed_at else None
}
}
await self.module_logger.log_info(
"Email status retrieved successfully",
properties={
"email_id": email_status_doc.email_id,
"tenant_id": tenant_id,
"status": status_info["status"]
}
)
return status_info
except Exception as e:
await self.module_logger.log_error(
"Failed to get email status",
properties={
"email_id": email_id or "unknown",
"tenant_id": tenant_id,
"error": str(e)
}
)
raise
async def get_tenant_email_status_list(self, tenant_id: str, limit: int = 50, offset: int = 0) -> Dict:
"""Get list of email statuses for a tenant"""
try:
email_status_docs = await EmailSendStatusDoc.find(
{"tenant_id": tenant_id}
).skip(offset).limit(limit).sort(-EmailSendStatusDoc.created_at).to_list()
status_list = []
for doc in email_status_docs:
status_list.append({
"email_id": doc.email_id,
"recipient_email": doc.recipient_email,
"template_id": doc.template_id,
"subject": doc.subject,
"status": doc.status.value,
"sent_at": doc.sent_at.isoformat() if doc.sent_at else None,
"created_at": doc.created_at.isoformat()
})
total_count = await EmailSendStatusDoc.find(
EmailSendStatusDoc.tenant_id == tenant_id
).count()
await self.module_logger.log_info(
"Tenant email status list retrieved",
properties={
"tenant_id": tenant_id,
"count": len(status_list),
"total_count": total_count
}
)
return {
"tenant_id": tenant_id,
"emails": status_list,
"pagination": {
"limit": limit,
"offset": offset,
"total_count": total_count,
"has_more": offset + limit < total_count
}
}
except Exception as e:
await self.module_logger.log_error(
"Failed to get tenant email status list",
properties={
"tenant_id": tenant_id,
"error": str(e)
}
)
raise

View File

@ -0,0 +1,95 @@
from typing import List, Dict, Tuple
from common.log.module_logger import ModuleLogger
from common.exception.exceptions import InvalidDataError
from backend.infra.email.email_validation_handler import EmailValidationHandler
from backend.infra.email_sender_handler import EmailSenderHandler
class EmailValidationService:
def __init__(self):
self.email_validation_handler = EmailValidationHandler()
self.email_sender_handler = EmailSenderHandler()
self.module_logger = ModuleLogger(sender_id="EmailValidationService")
async def validate_emails(self, emails: List[str]):
"""validate email list, return valid and invalid emails"""
try:
valid_emails = []
invalid_emails = []
for email in emails:
if await self.email_validation_handler.is_valid_email(email):
valid_emails.append(email)
else:
invalid_emails.append(email)
await self.module_logger.log_warning(
f"Invalid email detected: {email}",
properties={"email": email}
)
await self.module_logger.log_info(
"Email validation completed",
properties={
"total_emails": len(emails),
"valid_count": len(valid_emails),
"invalid_count": len(invalid_emails)
}
)
return valid_emails, invalid_emails
except Exception as e:
await self.module_logger.log_error(
"Email validation failed",
properties={"error": str(e)}
)
raise
async def validate_sender_emails(self, tenant_id: str, sender_emails: List[str]):
"""validate sender emails, including format validation and permission validation"""
try:
valid_senders = []
invalid_senders = []
authorized_senders = await self.email_sender_handler.get_email_senders(tenant_id)
for sender_email in sender_emails:
# format validation
if not await self.email_validation_handler.is_valid_email(sender_email):
invalid_senders.append(sender_email)
continue
# domain validation
if not await self.email_validation_handler.is_valid_domain(sender_email):
invalid_senders.append(sender_email)
continue
# sender permission validation
# Allow support@freeleaps.com as default sender even if not in authorized_senders
if sender_email not in authorized_senders and sender_email != "support@freeleaps.com":
invalid_senders.append(sender_email)
continue
valid_senders.append(sender_email)
await self.module_logger.log_info(
"Sender email validation completed",
properties={
"tenant_id": tenant_id,
"total_senders": len(sender_emails),
"valid_count": len(valid_senders),
"invalid_count": len(invalid_senders)
}
)
return valid_senders, invalid_senders
except Exception as e:
await self.module_logger.log_error(
"Sender email validation failed",
properties={
"tenant_id": tenant_id,
"error": str(e)
}
)
raise

View File

@ -0,0 +1,29 @@
import os
from typing import Dict, Any
from pydantic_settings import BaseSettings
class RateLimitSettings(BaseSettings):
GLOBAL_HOURLY_LIMIT: int = int(os.getenv("GLOBAL_HOURLY_LIMIT", "10000"))
GLOBAL_DAILY_LIMIT: int = int(os.getenv("GLOBAL_DAILY_LIMIT", "100000"))
SENDER_HOURLY_LIMIT: int = int(os.getenv("SENDER_HOURLY_LIMIT", "100"))
SENDER_DAILY_LIMIT: int = int(os.getenv("SENDER_DAILY_LIMIT", "1000"))
DEFAULT_TENANT_HOURLY_LIMIT: int = int(os.getenv("DEFAULT_TENANT_HOURLY_LIMIT", "500"))
DEFAULT_TENANT_DAILY_LIMIT: int = int(os.getenv("DEFAULT_TENANT_DAILY_LIMIT", "5000"))
TENANT_SPECIFIC_LIMITS: str = os.getenv("TENANT_SPECIFIC_LIMITS", "{}")
HOURLY_WINDOW: int = 3600
DAILY_WINDOW: int = 86400
RESET_TIME_HOUR: int = 0
RESET_TIME_MINUTE: int = 0
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
extra = "ignore"
rate_limit_settings = RateLimitSettings()

View File

@ -0,0 +1,90 @@
from fastapi import APIRouter, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Dict, List, Optional
from backend.application.tenant_notification_hub import TenantNotificationHub
router = APIRouter()
tenant_notification_hub = TenantNotificationHub()
class TenantEmailRequest(BaseModel):
tenant_id: str
template_id: str
recipient_emails: List[str]
subject_properties: Dict = {}
body_properties: Dict = {}
region: int
sender_emails: Optional[List[str]] = None
priority: str = "normal"
tracking_enabled: bool = True
@router.post(
"/send_tenant_email",
operation_id="send_tenant_email",
summary="Send email using tenant's template and email senders",
description="Send email using tenant's selected template and email senders",
response_description="Success/failure response in processing the tenant email send request",
)
async def send_tenant_email(request: TenantEmailRequest):
try:
result = await tenant_notification_hub.send_tenant_email(
tenant_id=request.tenant_id,
template_id=request.template_id,
recipient_emails=request.recipient_emails,
subject_properties=request.subject_properties,
body_properties=request.body_properties,
region=request.region,
sender_emails=request.sender_emails,
priority=request.priority,
tracking_enabled=request.tracking_enabled
)
return JSONResponse(
content={
"message": "Tenant email queued successfully.",
"message_id": result.get("message_id"),
"email_ids": result.get("email_ids", [])
},
status_code=200
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to send tenant email: {str(e)}")
@router.get(
"/tenant_email_status/{tenant_id}",
operation_id="get_tenant_email_status",
summary="Get tenant email status",
description="Get the status of a tenant email by email_id or recipient_email",
)
async def get_tenant_email_status(tenant_id: str, email_id: str = None, recipient_email: str = None):
try:
status = await tenant_notification_hub.get_tenant_email_status(tenant_id, email_id, recipient_email)
return JSONResponse(content=status, status_code=200)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to get tenant email status: {str(e)}")
@router.get(
"/tenant_email_status_list/{tenant_id}",
operation_id="get_tenant_email_status_list",
summary="Get tenant email status list",
description="Get a list of email statuses for a tenant with pagination",
)
async def get_tenant_email_status_list(
tenant_id: str,
limit: int = 50,
offset: int = 0
):
try:
status_list = await tenant_notification_hub.get_tenant_email_status_list(tenant_id, limit, offset)
return JSONResponse(content=status_list, status_code=200)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to get tenant email status list: {str(e)}")