freeleaps-service-hub/apps/notification/backend/services/email/email_spam_protection_service.py

110 lines
4.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import List, Dict, Optional
from datetime import datetime, timedelta
from common.log.module_logger import ModuleLogger
from common.exception.exceptions import InvalidDataError
from common.constants.region import UserRegion
from backend.infra.email.email_spam_protection.rate_limit_handler import RateLimitHandler
from backend.infra.email.email_spam_protection.spam_detector_handler import SpamDetectorHandler
class EmailSpamProtectionService:
    """Combine rate-limit checks and spam scoring for email.

    The actual checks are delegated to ``RateLimitHandler`` and
    ``SpamDetectorHandler``; every outcome (success or failure) is recorded
    through ``ModuleLogger``.
    """

    # Mean spam score above which an email is classified as spam when the
    # caller does not supply its own threshold.
    DEFAULT_SPAM_THRESHOLD = 0.7

    def __init__(self, spam_threshold: float = DEFAULT_SPAM_THRESHOLD):
        """Create the handlers and logger used by the checks.

        Args:
            spam_threshold: Cut-off applied by ``detect_spam``; an email is
                flagged only when its mean score is strictly greater than
                this value. Defaults to ``DEFAULT_SPAM_THRESHOLD``.
        """
        self.rate_limit_handler = RateLimitHandler()
        self.spam_detector_handler = SpamDetectorHandler()
        self.module_logger = ModuleLogger(sender_id="EmailSpamProtectionService")
        self.spam_threshold = spam_threshold

    async def check_rate_limit(self, tenant_id: str, sender_email: str) -> Dict:
        """Check the tenant, sender and global rate limits.

        Args:
            tenant_id: Identifier passed to the tenant-level check.
            sender_email: Address passed to the sender-level check.

        Returns:
            A dict with ``"allowed"`` (the ``and`` of the three handler
            results' ``"allowed"`` entries) plus the raw handler results under
            ``"tenant_limit"``, ``"sender_limit"`` and ``"global_limit"``.

        Raises:
            Exception: Anything raised while checking or logging is logged
                via ``log_error`` and then re-raised unchanged.
        """
        try:
            # The three checks are awaited one after another in a fixed order.
            # NOTE(review): whether the handlers tolerate concurrent execution
            # is not visible from this file, so they are kept sequential.
            tenant_limit = await self.rate_limit_handler.check_tenant_rate_limit(tenant_id)
            sender_limit = await self.rate_limit_handler.check_sender_rate_limit(sender_email)
            global_limit = await self.rate_limit_handler.check_global_rate_limit()

            # Each handler result must carry an "allowed" entry; a missing key
            # raises KeyError, which is logged and re-raised below.
            is_allowed = (
                tenant_limit["allowed"]
                and sender_limit["allowed"]
                and global_limit["allowed"]
            )

            await self.module_logger.log_info(
                "Rate limit check completed",
                properties={
                    "tenant_id": tenant_id,
                    "sender_email": sender_email,
                    "is_allowed": is_allowed,
                    # "remaining" is optional in a handler result; log 0 when absent.
                    "tenant_remaining": tenant_limit.get("remaining", 0),
                    "sender_remaining": sender_limit.get("remaining", 0),
                    "global_remaining": global_limit.get("remaining", 0),
                },
            )
            return {
                "allowed": is_allowed,
                "tenant_limit": tenant_limit,
                "sender_limit": sender_limit,
                "global_limit": global_limit,
            }
        except Exception as e:
            await self.module_logger.log_error(
                "Rate limit check failed",
                properties={
                    "tenant_id": tenant_id,
                    "sender_email": sender_email,
                    "error": str(e),
                },
            )
            raise

    async def detect_spam(self, email_content: Dict, region: int) -> Dict:
        """Score an email for spam and classify it against the threshold.

        Args:
            email_content: Mapping read for the ``"subject"`` and ``"body"``
                keys; each falls back to an empty string when the key is
                missing.
            region: Region code forwarded unchanged to every analyzer
                (presumably a ``UserRegion`` value; confirm against callers).

        Returns:
            A dict with ``"is_spam"``, ``"total_score"`` (the mean of the
            three analyzer scores), the individual ``"content_score"``,
            ``"link_score"`` and ``"keyword_score"``, and the ``"region"``
            that was passed in.

        Raises:
            Exception: Anything raised while analyzing or logging is logged
                via ``log_error`` and then re-raised unchanged.
        """
        try:
            subject = email_content.get("subject", "")
            body = email_content.get("body", "")

            # Every analyzer receives the region alongside the text it inspects.
            detector = self.spam_detector_handler
            content_score = await detector.analyze_content(subject, body, region)
            link_score = await detector.analyze_links(body, region)
            keyword_score = await detector.analyze_keywords(subject, body, region)

            # Overall score is the unweighted mean of the three analyzers.
            total_score = (content_score + link_score + keyword_score) / 3
            # Strictly greater than: a score equal to the threshold is not spam.
            is_spam = total_score > self.spam_threshold

            await self.module_logger.log_info(
                "Spam detection completed",
                properties={
                    "region": region,
                    "content_score": content_score,
                    "link_score": link_score,
                    "keyword_score": keyword_score,
                    "total_score": total_score,
                    "is_spam": is_spam,
                },
            )
            return {
                "is_spam": is_spam,
                "total_score": total_score,
                "content_score": content_score,
                "link_score": link_score,
                "keyword_score": keyword_score,
                "region": region,
            }
        except Exception as e:
            await self.module_logger.log_error(
                "Spam detection failed",
                properties={
                    "error": str(e),
                    "region": region,
                },
            )
            raise