From b5c9ab6126cb88e9be63752eb23a21bdc010a01b Mon Sep 17 00:00:00 2001 From: YuehuCao Date: Sat, 9 Aug 2025 11:34:15 +0800 Subject: [PATCH] feat(email): implement multi-tenant delivery function --- .../application/tenant_notification_hub.py | 155 ++++++++ .../business/tenant_notification_manager.py | 214 +++++++++++ .../rate_limit_handler.py | 198 ++++++++++ .../spam_detector_handler.py | 338 ++++++++++++++++++ .../infra/email/email_validation_handler.py | 91 +++++ .../services/email/email_bounce_service.py | 194 ++++++++++ .../email/email_spam_protection_service.py | 109 ++++++ .../services/email/email_status_service.py | 194 ++++++++++ .../email/email_validation_service.py | 95 +++++ .../common/config/rate_limit_settings.py | 29 ++ .../webapi/routes/tenant_notification.py | 90 +++++ 11 files changed, 1707 insertions(+) create mode 100644 apps/notification/backend/application/tenant_notification_hub.py create mode 100644 apps/notification/backend/business/tenant_notification_manager.py create mode 100644 apps/notification/backend/infra/email/email_spam_protection/rate_limit_handler.py create mode 100644 apps/notification/backend/infra/email/email_spam_protection/spam_detector_handler.py create mode 100644 apps/notification/backend/infra/email/email_validation_handler.py create mode 100644 apps/notification/backend/services/email/email_bounce_service.py create mode 100644 apps/notification/backend/services/email/email_spam_protection_service.py create mode 100644 apps/notification/backend/services/email/email_status_service.py create mode 100644 apps/notification/backend/services/email/email_validation_service.py create mode 100644 apps/notification/common/config/rate_limit_settings.py create mode 100644 apps/notification/webapi/routes/tenant_notification.py diff --git a/apps/notification/backend/application/tenant_notification_hub.py b/apps/notification/backend/application/tenant_notification_hub.py new file mode 100644 index 0000000..f4ebd45 --- /dev/null +++ b/apps/notification/backend/application/tenant_notification_hub.py @@ -0,0 +1,155 @@ +from typing import Dict, List, Optional +from datetime import datetime +from common.log.module_logger import ModuleLogger +from common.exception.exceptions import InvalidDataError + +from backend.business.tenant_notification_manager import TenantNotificationManager +from backend.application.template_message_hub import TemplateMessageHub +from backend.application.email_sender_hub import EmailSenderHub + + +class TenantNotificationHub: + def __init__(self): + self.tenant_notification_manager = TenantNotificationManager() + self.template_message_hub = TemplateMessageHub() + self.email_sender_hub = EmailSenderHub() + self.module_logger = ModuleLogger(sender_id="TenantNotificationHub") + + async def send_tenant_email( + self, + tenant_id: str, + template_id: str, + recipient_emails: List[str], + region: int, + subject_properties: Dict = {}, + body_properties: Dict = {}, + sender_emails: Optional[List[str]] = None, + priority: str = "normal", + tracking_enabled: bool = True + ): + """Send email using tenant's template and email senders""" + try: + # 1. check if tenant has access to template + await self.template_message_hub.verify_tenant_access(template_id, tenant_id, region) + + # 2. render template + rendered_template = await self.template_message_hub.render_template( + tenant_id=tenant_id, + template_id=template_id, + properties={**subject_properties, **body_properties}, + region=region + ) + + # 3. get tenant email senders + if sender_emails is None: + # TODO: use default email sender directly + tenant_email_senders = await self.email_sender_hub.get_email_senders(tenant_id) + if not tenant_email_senders: + sender_emails = ["support@freeleaps.com"] + await self.module_logger.log_info( + "Using default email sender for tenant", + properties={ + "tenant_id": tenant_id, + "default_sender": "support@freeleaps.com" + } + ) + else: + sender_emails = tenant_email_senders + + # 4. check if sender_emails are valid + if sender_emails != ["support@freeleaps.com"]: + tenant_senders = await self.email_sender_hub.get_email_senders(tenant_id) + invalid_senders = [email for email in sender_emails if email not in tenant_senders] + if invalid_senders: + raise InvalidDataError(f"Invalid email senders for tenant: {invalid_senders}") + + # 5. call TenantNotificationManager to send email + result = await self.tenant_notification_manager.send_tenant_email( + tenant_id=tenant_id, + template_id=template_id, + rendered_template=rendered_template, + recipient_emails=recipient_emails, + sender_emails=sender_emails, + region=region, + priority=priority, + tracking_enabled=tracking_enabled + ) + + await self.module_logger.log_info( + "Tenant email sent successfully", + properties={ + "tenant_id": tenant_id, + "template_id": template_id, + "recipient_count": len(recipient_emails), + "sender_count": len(sender_emails), + "message_id": result.get("message_id") + } + ) + + return result + + except Exception as e: + await self.module_logger.log_error( + "Failed to send tenant email", + properties={ + "tenant_id": tenant_id, + "template_id": template_id, + "error": str(e) + } + ) + raise + + async def get_tenant_email_status(self, tenant_id: str, email_id: str = None, recipient_email: str = None): + """Get tenant email status""" + try: + status = await self.tenant_notification_manager.get_tenant_email_status(tenant_id, email_id, recipient_email) + + await self.module_logger.log_info( + "Tenant email status retrieved", + properties={ + "tenant_id": tenant_id, + "email_id": email_id, + "status": status.get("status") if status else None + } + ) + + return status + + except Exception as e: + await self.module_logger.log_error( + "Failed to get tenant email status", + properties={ + "tenant_id": tenant_id, + "email_id": email_id, + "error": str(e) + } + ) + raise + + async def get_tenant_email_status_list(self, tenant_id: str, limit: int = 50, offset: int = 0): + """Get list of email statuses for a tenant""" + try: + status_list = await self.tenant_notification_manager.get_tenant_email_status_list(tenant_id, limit, offset) + + await self.module_logger.log_info( + "Tenant email status list retrieved", + properties={ + "tenant_id": tenant_id, + "count": len(status_list.get("emails", [])), + "total_count": status_list.get("pagination", {}).get("total_count", 0) + } + ) + + return status_list + + except Exception as e: + await self.module_logger.log_error( + "Failed to get tenant email status list", + properties={ + "tenant_id": tenant_id, + "error": str(e) + } + ) + raise + + \ No newline at end of file diff --git a/apps/notification/backend/business/tenant_notification_manager.py b/apps/notification/backend/business/tenant_notification_manager.py new file mode 100644 index 0000000..d86142a --- /dev/null +++ b/apps/notification/backend/business/tenant_notification_manager.py @@ -0,0 +1,214 @@ +from typing import Dict, List, Optional +from datetime import datetime +from common.log.module_logger import ModuleLogger +from common.exception.exceptions import InvalidDataError + +from backend.business.notification_manager import NotificationManager +from backend.models.constants import NotificationChannel, NotificationMessage +from backend.services.email.email_validation_service import EmailValidationService +from backend.services.email.email_bounce_service import EmailBounceService +from backend.services.email.email_spam_protection_service import EmailSpamProtectionService +from backend.services.notification_publisher_service import NotificationPublisherService +from backend.services.email.email_status_service import EmailStatusService + + +class TenantNotificationManager: + def __init__(self): + self.notification_manager = NotificationManager() + self.email_publisher = NotificationPublisherService(channel=NotificationChannel.EMAIL) + self.email_validation_service = EmailValidationService() + self.email_bounce_service = EmailBounceService() + self.email_spam_protection_service = EmailSpamProtectionService() + self.email_status_service = EmailStatusService() + self.module_logger = ModuleLogger(sender_id="TenantNotificationManager") + + async def send_tenant_email( + self, + tenant_id: str, + template_id: str, + rendered_template: Dict, + recipient_emails: List[str], + sender_emails: List[str], + region: int, + priority: str = "normal", + tracking_enabled: bool = True + ): + """Send tenant email using existing EMAIL queue with validation and protection""" + try: + # 1. validate recipient emails + valid_recipients, invalid_recipients = await self.email_validation_service.validate_emails(recipient_emails) + valid_senders, invalid_senders = await self.email_validation_service.validate_sender_emails(tenant_id, sender_emails) + + if not valid_recipients: + raise InvalidDataError("No valid recipient emails found") + + if not valid_senders: + raise InvalidDataError("No valid sender emails found") + + # 2. check blacklisted recipients + blacklisted_recipients = [] + for recipient in valid_recipients: + if await self.email_bounce_service.is_blacklisted(recipient, tenant_id): + blacklisted_recipients.append(recipient) + + # remove blacklisted recipients from valid recipients + valid_recipients = [r for r in valid_recipients if r not in blacklisted_recipients] + + if not valid_recipients: + raise InvalidDataError("All recipient emails are blacklisted") + + # 3. check rate limit + rate_limit_result = await self.email_spam_protection_service.check_rate_limit(tenant_id, valid_senders[0]) + if not rate_limit_result["allowed"]: + raise InvalidDataError("Rate limit exceeded") + + # 4. spam detection with region + email_content = { + "subject": rendered_template.get("subject_properties", {}).get("subject", ""), + "body": rendered_template.get("body_properties", {}).get("html_content", "") + } + spam_result = await self.email_spam_protection_service.detect_spam(email_content, region) + if spam_result["is_spam"]: + raise InvalidDataError("Email content detected as spam") + + # 5. build message properties + properties = { + "tenant_id": tenant_id, + "template_id": template_id, + "destination_emails": valid_recipients, + "sender_emails": valid_senders, + "subject_properties": rendered_template.get("subject_properties", {}), + "body_properties": rendered_template.get("body_properties", {}), + "region": region, + "priority": priority, + "tracking_enabled": tracking_enabled, + "content_text": rendered_template.get("body_properties", {}).get("html_content", ""), + "content_subject": rendered_template.get("subject_properties", {}).get("subject", ""), + "receiver_type": "email", + "validation_info": { + "invalid_recipients": invalid_recipients, + "invalid_senders": invalid_senders, + "blacklisted_recipients": blacklisted_recipients, + "rate_limit_info": rate_limit_result, + "spam_detection_info": spam_result + } + } + + # 6. create message for each recipient + for recipient_email in valid_recipients: + # create NotificationMessage + message = NotificationMessage( + sender_id=tenant_id, + receiver_id=recipient_email, + subject=template_id, + event="tenant_email", + properties=properties + ) + + await self.email_publisher.publish(message=message.model_dump_json()) + + await self.module_logger.log_info( + "Tenant email messages published to EMAIL queue", + properties={ + "tenant_id": tenant_id, + "template_id": template_id, + "valid_recipient_count": len(valid_recipients), + "invalid_recipient_count": len(invalid_recipients), + "blacklisted_recipient_count": len(blacklisted_recipients), + "valid_sender_count": len(valid_senders), + "invalid_sender_count": len(invalid_senders) + } + ) + + return { + "message_id": f"{tenant_id}_{template_id}_{datetime.utcnow().isoformat()}", + "email_ids": [], + "status": "queued", + "tenant_id": tenant_id, + "template_id": template_id, + "validation_summary": { + "total_recipients": len(recipient_emails), + "valid_recipients": len(valid_recipients), + "invalid_recipients": len(invalid_recipients), + "blacklisted_recipients": len(blacklisted_recipients), + "total_senders": len(sender_emails), + "valid_senders": len(valid_senders), + "invalid_senders": len(invalid_senders) + } + } + + except Exception as e: + await self.module_logger.log_error( + "Failed to send tenant email", + properties={ + "tenant_id": tenant_id, + "template_id": template_id, + "error": str(e) + } + ) + raise + + async def get_tenant_email_status(self, tenant_id: str, email_id: str = None, recipient_email: str = None): + """Get tenant email status from database""" + try: + status = await self.email_status_service.get_email_status(email_id, tenant_id, recipient_email) + + if not status: + await self.module_logger.log_warning( + "Email status not found", + properties={ + "tenant_id": tenant_id, + "email_id": email_id + } + ) + return None + + await self.module_logger.log_info( + "Tenant email status retrieved successfully", + properties={ + "tenant_id": tenant_id, + "email_id": email_id, + "status": status.get("status") if status else None + } + ) + + return status + + except Exception as e: + await self.module_logger.log_error( + "Failed to get tenant email status", + properties={ + "tenant_id": tenant_id, + "email_id": email_id, + "error": str(e) + } + ) + raise + + + + async def get_tenant_email_status_list(self, tenant_id: str, limit: int = 50, offset: int = 0): + """Get list of email statuses for a tenant""" + try: + status_list = await self.email_status_service.get_tenant_email_status_list(tenant_id, limit, offset) + + await self.module_logger.log_info( + "Tenant email status list retrieved successfully", + properties={ + "tenant_id": tenant_id, + "count": len(status_list.get("emails", [])), + "total_count": status_list.get("pagination", {}).get("total_count", 0) + } + ) + + return status_list + + except Exception as e: + await self.module_logger.log_error( + "Failed to get tenant email status list", + properties={ + "tenant_id": tenant_id, + "error": str(e) + } + ) + raise \ No newline at end of file diff --git a/apps/notification/backend/infra/email/email_spam_protection/rate_limit_handler.py b/apps/notification/backend/infra/email/email_spam_protection/rate_limit_handler.py new file mode 100644 index 0000000..d5e5776 --- /dev/null +++ b/apps/notification/backend/infra/email/email_spam_protection/rate_limit_handler.py @@ -0,0 +1,198 @@ +from datetime import datetime, timedelta +from typing import Dict, List, Optional +import json +from common.log.module_logger import ModuleLogger +from common.config.rate_limit_settings import rate_limit_settings + + +class RateLimitHandler: + def __init__(self): + self.module_logger = ModuleLogger(sender_id="RateLimitHandler") + + self.tenant_counters = {} + self.sender_counters = {} + self.global_counter = {"count": 0, "reset_time": datetime.utcnow()} + + self.tenant_limits = self._get_tenant_limits() + self.sender_limits = self._get_sender_limits() + self.global_limits = self._get_global_limits() + self.default_tenant_limits = self._get_default_tenant_limits() + + self.hourly_window = rate_limit_settings.HOURLY_WINDOW + self.daily_window = rate_limit_settings.DAILY_WINDOW + + def _get_tenant_limits(self): + """get tenant specific limits""" + try: + tenant_limits = json.loads(rate_limit_settings.TENANT_SPECIFIC_LIMITS) + return tenant_limits + except (json.JSONDecodeError, TypeError): + return {} + + def _get_default_tenant_limits(self): + """get default tenant limits""" + return { + "hourly": rate_limit_settings.DEFAULT_TENANT_HOURLY_LIMIT, + "daily": rate_limit_settings.DEFAULT_TENANT_DAILY_LIMIT + } + + def _get_sender_limits(self): + """get sender limits""" + return { + "hourly": rate_limit_settings.SENDER_HOURLY_LIMIT, + "daily": rate_limit_settings.SENDER_DAILY_LIMIT + } + + def _get_global_limits(self): + """get global limits""" + return { + "hourly": rate_limit_settings.GLOBAL_HOURLY_LIMIT, + "daily": rate_limit_settings.GLOBAL_DAILY_LIMIT + } + + def get_tenant_limit(self, tenant_id: str): + """get tenant limits""" + return self.tenant_limits.get(tenant_id, self.default_tenant_limits) + + async def check_tenant_rate_limit(self, tenant_id: str) -> Dict: + """check tenant rate limit""" + tenant_limit = self.get_tenant_limit(tenant_id) + + # get current counter + current_time = datetime.utcnow() + tenant_counter = self.tenant_counters.get(tenant_id, { + "hourly": {"count": 0, "reset_time": current_time}, + "daily": {"count": 0, "reset_time": current_time} + }) + + # check if counter needs to be reset + if (current_time - tenant_counter["hourly"]["reset_time"]).total_seconds() >= self.hourly_window: + tenant_counter["hourly"] = {"count": 0, "reset_time": current_time} + + if (current_time - tenant_counter["daily"]["reset_time"]).total_seconds() >= self.daily_window: + tenant_counter["daily"] = {"count": 0, "reset_time": current_time} + + # check limits + hourly_allowed = tenant_counter["hourly"]["count"] < tenant_limit["hourly"] + daily_allowed = tenant_counter["daily"]["count"] < tenant_limit["daily"] + allowed = hourly_allowed and daily_allowed + + if not allowed: + await self.module_logger.log_warning( + "Tenant rate limit exceeded", + properties={ + "tenant_id": tenant_id, + "hourly_count": tenant_counter["hourly"]["count"], + "hourly_limit": tenant_limit["hourly"], + "daily_count": tenant_counter["daily"]["count"], + "daily_limit": tenant_limit["daily"] + } + ) + + tenant_counter["hourly"]["count"] += 1 + tenant_counter["daily"]["count"] += 1 + self.tenant_counters[tenant_id] = tenant_counter + + return { + "allowed": allowed, + "hourly_used": tenant_counter["hourly"]["count"], + "hourly_limit": tenant_limit["hourly"], + "daily_used": tenant_counter["daily"]["count"], + "daily_limit": tenant_limit["daily"], + "remaining": min(tenant_limit["hourly"] - tenant_counter["hourly"]["count"], + tenant_limit["daily"] - tenant_counter["daily"]["count"]) + } + + async def check_sender_rate_limit(self, sender_email: str) -> Dict: + """check sender rate limit""" + current_time = datetime.utcnow() + sender_counter = self.sender_counters.get(sender_email, { + "hourly": {"count": 0, "reset_time": current_time}, + "daily": {"count": 0, "reset_time": current_time} + }) + + # check if counter needs to be reset + if (current_time - sender_counter["hourly"]["reset_time"]).total_seconds() >= self.hourly_window: + sender_counter["hourly"] = {"count": 0, "reset_time": current_time} + + if (current_time - sender_counter["daily"]["reset_time"]).total_seconds() >= self.daily_window: + sender_counter["daily"] = {"count": 0, "reset_time": current_time} + + # check limits + hourly_allowed = sender_counter["hourly"]["count"] < self.sender_limits["hourly"] + daily_allowed = sender_counter["daily"]["count"] < self.sender_limits["daily"] + allowed = hourly_allowed and daily_allowed + + if not allowed: + await self.module_logger.log_warning( + "Sender rate limit exceeded", + properties={ + "sender_email": sender_email, + "hourly_count": sender_counter["hourly"]["count"], + "hourly_limit": self.sender_limits["hourly"], + "daily_count": sender_counter["daily"]["count"], + "daily_limit": self.sender_limits["daily"] + } + ) + + sender_counter["hourly"]["count"] += 1 + sender_counter["daily"]["count"] += 1 + self.sender_counters[sender_email] = sender_counter + + return { + "allowed": allowed, + "hourly_used": sender_counter["hourly"]["count"], + "hourly_limit": self.sender_limits["hourly"], + "daily_used": sender_counter["daily"]["count"], + "daily_limit": self.sender_limits["daily"], + "remaining": min(self.sender_limits["hourly"] - sender_counter["hourly"]["count"], + self.sender_limits["daily"] - sender_counter["daily"]["count"]) + } + + async def check_global_rate_limit(self) -> Dict: + """check global rate limit""" + current_time = datetime.utcnow() + + # check if counter needs to be reset + if (current_time - self.global_counter["reset_time"]).total_seconds() >= self.hourly_window: + self.global_counter = {"count": 0, "reset_time": current_time} + + # check limits + allowed = self.global_counter["count"] < self.global_limits["hourly"] + + if not allowed: + await self.module_logger.log_warning( + "Global rate limit exceeded", + properties={ + "current_count": self.global_counter["count"], + "limit": self.global_limits["hourly"] + } + ) + + self.global_counter["count"] += 1 + + return { + "allowed": allowed, + "used": self.global_counter["count"], + "limit": self.global_limits["hourly"], + "remaining": self.global_limits["hourly"] - self.global_counter["count"] + } + + async def check_all_rate_limits(self, tenant_id: str, sender_email: str) -> Dict[str, Dict]: + """check all rate limits""" + results = { + "tenant": await self.check_tenant_rate_limit(tenant_id), + "sender": await self.check_sender_rate_limit(sender_email), + "global": await self.check_global_rate_limit() + } + + await self.module_logger.log_info( + "Rate limit check completed", + properties={ + "tenant_id": tenant_id, + "sender_email": sender_email, + "results": results + } + ) + + return results \ No newline at end of file diff --git a/apps/notification/backend/infra/email/email_spam_protection/spam_detector_handler.py b/apps/notification/backend/infra/email/email_spam_protection/spam_detector_handler.py new file mode 100644 index 0000000..aca7270 --- /dev/null +++ b/apps/notification/backend/infra/email/email_spam_protection/spam_detector_handler.py @@ -0,0 +1,338 @@ +import re +from typing import Dict, List +from urllib.parse import urlparse +from common.log.module_logger import ModuleLogger +from common.constants.region import UserRegion + + +class SpamDetectorHandler: + def __init__(self): + self.module_logger = ModuleLogger(sender_id="SpamDetectorHandler") + + # English spam keywords + self.spam_keywords_en = [ + 'free', 'money', 'cash', 'winner', 'lottery', 'prize', 'urgent', + 'limited time', 'act now', 'click here', 'buy now', 'discount', + 'make money', 'earn money', 'work from home', 'get rich', + 'viagra', 'cialis', 'weight loss', 'diet pills', + 'credit card', 'loan', 'debt', 'refinance', + 'investment', 'stock', 'trading', 'crypto' + ] + + # chinese spam keywords + self.spam_keywords_zh = [ + '免费', '赚钱', '现金', '中奖', '彩票', '奖品', '紧急', + '限时', '立即行动', '点击这里', '立即购买', '折扣', + '赚钱', '赚大钱', '在家工作', '致富', + '伟哥', '减肥', '减肥药', '保健品', + '信用卡', '贷款', '债务', '再融资', + '投资', '股票', '交易', '加密货币' + ] + + # suspicious link patterns + self.suspicious_link_patterns = [ + r'bit\.ly', r'tinyurl\.com', r'goo\.gl', r't\.co', + r'click\.here', r'buy\.now', r'free\.money', + r'\.cn', r'\.hk', r'\.tw', + r'[^\x00-\x7F]', + ] + + # chinese spam patterns + self.chinese_spam_patterns = [ + r'[免费|赚钱|中奖|彩票|奖品|紧急]{2,}', + r'[!]{2,}', + r'[?]{2,}', + r'[。]{2,}', + r'[,]{2,}', + r'[、]{2,}', + r'[:]{2,}', + r'[;]{2,}', + r'[(]{2,}', + r'[)]{2,}', + r'[【]{2,}', + r'【.*?】', + r'(.*?)', + r'《.*?》', + r'[0-9]{11}', + r'[0-9]{6,}', + r'[A-Za-z0-9]{20,}' + ] + + # content analysis results + self.content_analysis = {} + self.link_analysis = {} + self.keyword_analysis = {} + + async def _analyze_chinese_spam_patterns(self, subject: str, body: str) -> float: + """analyze chinese spam patterns""" + try: + score = 0.0 + text = f"{subject} {body}" + + for pattern in self.chinese_spam_patterns: + matches = re.findall(pattern, text) + if matches: + score += 0.1 * len(matches) + + chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', text)) + total_chars = len(text) + if total_chars > 0: + chinese_ratio = chinese_chars / total_chars + if chinese_ratio > 0.8: + spam_keyword_count = 0 + for keyword in self.spam_keywords_zh: + if keyword in text: + spam_keyword_count += 1 + + if spam_keyword_count > 3: + score += 0.3 + + return min(score, 1.0) + + except Exception as e: + await self.module_logger.log_error( + "Chinese spam pattern analysis failed", + properties={"error": str(e)} + ) + return 0.0 + + async def _analyze_english_spam_patterns(self, subject: str, body: str) -> float: + """analyze english spam patterns""" + try: + score = 0.0 + text = f"{subject} {body}".lower() + + for keyword in self.spam_keywords_en: + if keyword.lower() in text: + score += 0.1 + + words = text.split() + upper_words = [word for word in words if word.isupper() and len(word) > 2] + if len(upper_words) > len(words) * 0.3: + score += 0.2 + + word_count = {} + for word in words: + word_count[word] = word_count.get(word, 0) + 1 + + repeated_words = [word for word, count in word_count.items() if count > 3] + if len(repeated_words) > 2: + score += 0.2 + + return min(score, 1.0) + + except Exception as e: + await self.module_logger.log_error( + "English spam pattern analysis failed", + properties={"error": str(e)} + ) + return 0.0 + + async def analyze_content(self, subject: str, body: str, region: int) -> float: + """analyze email content based on region""" + try: + score = 0.0 + total_checks = 0 + is_chinese_region = (region == 1) + + # check uppercase ratio (english region) + if not is_chinese_region and subject: + upper_ratio = sum(1 for c in subject if c.isupper()) / len(subject) + if upper_ratio > 0.7: + score += 0.3 + total_checks += 1 + + # check exclamation count (chinese and english) + exclamation_count = subject.count('!') + body.count('!') + subject.count('!') + body.count('!') + if exclamation_count > 3: + score += 0.2 + total_checks += 1 + + # check repeated characters (chinese and english) + if subject: + for char in subject: + if subject.count(char) > len(subject) * 0.3: + score += 0.2 + break + total_checks += 1 + + # check content length + if len(body) < 50: + score += 0.1 + total_checks += 1 + + # check HTML tag ratio + html_tags = len(re.findall(r'<[^>]+>', body)) + if html_tags > len(body) * 0.1: + score += 0.2 + total_checks += 1 + + # check specific features based on region + if is_chinese_region: + # check chinese spam features + chinese_score = await self._analyze_chinese_spam_patterns(subject, body) + score += chinese_score + total_checks += 1 + else: + # check english spam features + english_score = await self._analyze_english_spam_patterns(subject, body) + score += english_score + total_checks += 1 + + final_score = score / total_checks if total_checks > 0 else 0.0 + + # store analysis results + self.content_analysis = { + "subject": subject, + "body_length": len(body), + "region": region, + "is_chinese_region": is_chinese_region, + "upper_ratio": upper_ratio if not is_chinese_region and subject else 0, + "exclamation_count": exclamation_count, + "html_tag_count": html_tags, + "chinese_spam_score": chinese_score if is_chinese_region else 0, + "english_spam_score": english_score if not is_chinese_region else 0, + "score": final_score + } + + await self.module_logger.log_info( + "Content analysis completed", + properties=self.content_analysis + ) + + return final_score + + except Exception as e: + await self.module_logger.log_error( + "Content analysis failed", + properties={"error": str(e), "region": region.value} + ) + return 0.5 + + + + async def analyze_links(self, body: str, region: int) -> float: + """analyze links in email based on region""" + try: + score = 0.0 + total_links = 0 + suspicious_links = 0 + is_chinese_region = (region == 1) + + # extract all links + link_pattern = r'https?://[^\s<>"]+|www\.[^\s<>"]+' + links = re.findall(link_pattern, body) + total_links = len(links) + suspicious_links = 0 + + # check suspicious link patterns + for link in links: + for pattern in self.suspicious_link_patterns: + if re.search(pattern, link, re.IGNORECASE): + suspicious_links += 1 + break + + # check specific features based on region + if is_chinese_region: + if not re.search(r'\.cn|\.hk|\.tw', link, re.IGNORECASE): + suspicious_links += 1 + else: + if not re.search(r'\.com|\.org|\.net', link, re.IGNORECASE): + suspicious_links += 1 + + # check domain length + try: + parsed_url = urlparse(link) + domain = parsed_url.netloc + if len(domain) > 50: + suspicious_links += 1 + except: + suspicious_links += 1 + + if total_links > 0: + score = suspicious_links / total_links + else: + score = 0.0 + + # store analysis results + self.link_analysis = { + "total_links": total_links, + "suspicious_links": suspicious_links, + "links": links, + "region": region, + "is_chinese_region": is_chinese_region, + "score": score + } + + await self.module_logger.log_info( + "Link analysis completed", + properties=self.link_analysis + ) + + return score + + except Exception as e: + await self.module_logger.log_error( + "Link analysis failed", + properties={"error": str(e), "region": region} + ) + return 0.5 + + async def analyze_keywords(self, subject: str, body: str, region: int) -> float: + """analyze keywords based on region""" + try: + score = 0.0 + found_keywords = [] + + # get keywords by region + if region == 1: + keywords = self.spam_keywords_zh + else: + keywords = self.spam_keywords_en + # TODO: add other regions + is_chinese_region = (region == 1) + + # merge subject and body for keyword check + text = f"{subject} {body}" + if not is_chinese_region: + text = text.lower() + + # check keywords + for keyword in keywords: + if is_chinese_region: + if keyword in text: + found_keywords.append(keyword) + score += 0.1 + else: + if keyword.lower() in text.lower(): + found_keywords.append(keyword) + score += 0.1 + + score = min(score, 1.0) + + # store analysis results + self.keyword_analysis = { + "found_keywords": found_keywords, + "keyword_count": len(found_keywords), + "total_keywords": len(keywords), + "region": region, + "is_chinese_region": is_chinese_region, + "keywords_used": keywords, + "score": score + } + + await self.module_logger.log_info( + "Keyword analysis completed", + properties=self.keyword_analysis + ) + + return score + + except Exception as e: + await self.module_logger.log_error( + "Keyword analysis failed", + properties={"error": str(e), "region": region} + ) + return 0.5 + + \ No newline at end of file diff --git a/apps/notification/backend/infra/email/email_validation_handler.py b/apps/notification/backend/infra/email/email_validation_handler.py new file mode 100644 index 0000000..aeecf02 --- /dev/null +++ b/apps/notification/backend/infra/email/email_validation_handler.py @@ -0,0 +1,91 @@ +import re +import dns.resolver +from typing import Dict, List +from common.log.module_logger import ModuleLogger +from common.exception.exceptions import InvalidDataError + + +class EmailValidationHandler: + def __init__(self): + self.module_logger = ModuleLogger(sender_id="EmailValidationHandler") + + self.email_pattern = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$') + # common spam keywords + self.spam_keywords = [ + 'free', 'money', 'cash', 'winner', 'lottery', 'prize', 'urgent', + 'limited time', 'act now', 'click here', 'buy now', 'discount' + ] + + async def is_valid_email(self, email: str) -> bool: + """validate email format""" + try: + if not email or not isinstance(email, str): + return False + + if not self.email_pattern.match(email): + return False + + # length check: RFC 5321 regulate the max length of email-254 + if len(email) > 254: + return False + + # local part and domain part check + local_part, domain_part = email.split('@', 1) + if len(local_part) > 64 or len(local_part) == 0: + return False + if len(domain_part) > 253 or len(domain_part) == 0: + return False + + # check if domain part contains valid characters + if not re.match(r'^[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', domain_part): + return False + + await self.module_logger.log_info( + f"Email format validation passed: {email}", + properties={"email": email} + ) + + return True + + except Exception as e: + await self.module_logger.log_error( + f"Email validation failed: {email}", + properties={"email": email, "error": str(e)} + ) + return False + + async def is_valid_domain(self, email: str) -> bool: + """check if domain part is valid""" + try: + domain = email.split('@')[1] + + # check MX record + try: + mx_records = dns.resolver.resolve(domain, 'MX') + if not mx_records: + return False + except Exception: + return False + + # check A record (backup) + try: + a_records = dns.resolver.resolve(domain, 'A') + if not a_records: + return False + except Exception: + pass + + await self.module_logger.log_info( + f"Domain validation passed: {domain}", + properties={"domain": domain} + ) + + return True + + except Exception as e: + await self.module_logger.log_error( + f"Domain validation failed: {email}", + properties={"email": email, "error": str(e)} + ) + return False + diff --git a/apps/notification/backend/services/email/email_bounce_service.py b/apps/notification/backend/services/email/email_bounce_service.py new file mode 100644 index 0000000..df3d824 --- /dev/null +++ b/apps/notification/backend/services/email/email_bounce_service.py @@ -0,0 +1,194 @@ +from typing import Dict, Optional +from datetime import datetime +from common.log.module_logger import ModuleLogger +from backend.models.models import EmailBounceDoc, EmailSendStatusDoc +from common.constants.email import BounceType + + +class EmailBounceService: + def __init__(self): + self.module_logger = ModuleLogger(sender_id="EmailBounceService") + + async def process_bounce_event(self, email: str, tenant_id: str, bounce_type: BounceType, + reason: str, message_id: str = None) -> Dict: + """处理退信事件,建立email_id关联""" + try: + # 1. 查找对应的邮件记录 + email_status_doc = await EmailSendStatusDoc.find_one( + EmailSendStatusDoc.recipient_email == email, + EmailSendStatusDoc.tenant_id == tenant_id + ).sort(-EmailSendStatusDoc.created_at) # 获取最新的邮件记录 + + email_id = None + template_id = None + + if email_status_doc: + email_id = email_status_doc.email_id + template_id = email_status_doc.template_id + await self.module_logger.log_info( + "Found email record for bounce", + properties={ + "email": email, + "email_id": email_id, + "tenant_id": tenant_id + } + ) + else: + await self.module_logger.log_warning( + "No email record found for bounce", + properties={ + "email": email, + "tenant_id": tenant_id + } + ) + + # 2. 创建退信记录 + bounce_doc = EmailBounceDoc( + email=email, + tenant_id=tenant_id, + email_id=email_id, # 建立关联 + template_id=template_id, + bounce_type=bounce_type, + reason=reason, + bounced_at=datetime.utcnow(), + original_message_id=message_id, + processed=False + ) + + await bounce_doc.save() + + await self.module_logger.log_info( + "Bounce event processed successfully", + properties={ + "email": email, + "email_id": email_id, + "tenant_id": tenant_id, + "bounce_type": bounce_type.value, + "reason": reason + } + ) + + return { + "email": email, + "email_id": email_id, + "tenant_id": tenant_id, + "bounce_type": bounce_type.value, + "reason": reason, + "processed": True + } + + except Exception as e: + await self.module_logger.log_error( + "Failed to process bounce event", + properties={ + "email": email, + "tenant_id": tenant_id, + "error": str(e) + } + ) + raise + + async def get_bounce_info(self, email: str, tenant_id: str) -> Optional[Dict]: + """获取退信信息""" + try: + bounce_doc = await EmailBounceDoc.find_one( + EmailBounceDoc.email == email, + EmailBounceDoc.tenant_id == tenant_id + ).sort(-EmailBounceDoc.created_at) + + if not bounce_doc: + return None + + return { + "email": bounce_doc.email, + "email_id": bounce_doc.email_id, + "tenant_id": bounce_doc.tenant_id, + "template_id": bounce_doc.template_id, + "bounce_type": bounce_doc.bounce_type.value, + "reason": bounce_doc.reason, + "bounced_at": bounce_doc.bounced_at.isoformat(), + "original_message_id": bounce_doc.original_message_id, + "processed": bounce_doc.processed, + "processed_at": bounce_doc.processed_at.isoformat() if bounce_doc.processed_at else None, + "created_at": bounce_doc.created_at.isoformat() + } + + except Exception as e: + await self.module_logger.log_error( + "Failed to get bounce info", + properties={ + "email": email, + "tenant_id": tenant_id, + "error": str(e) + } + ) + raise + + async def mark_bounce_processed(self, email: str, tenant_id: str) -> bool: + """标记退信为已处理""" + try: + bounce_doc = await EmailBounceDoc.find_one( + EmailBounceDoc.email == email, + EmailBounceDoc.tenant_id == tenant_id + ).sort(-EmailBounceDoc.created_at) + + if bounce_doc: + bounce_doc.processed = True + bounce_doc.processed_at = datetime.utcnow() + await bounce_doc.save() + + await self.module_logger.log_info( + "Bounce marked as processed", + properties={ + "email": email, + "tenant_id": tenant_id + } + ) + return True + + return False + + except Exception as e: + await self.module_logger.log_error( + "Failed to mark bounce as processed", + properties={ + "email": email, + "tenant_id": tenant_id, + "error": str(e) + } + ) + raise + + async def is_blacklisted(self, email: str, tenant_id: str) -> bool: + """检查邮箱是否在黑名单中""" + try: + # 查找该邮箱的退信记录 + bounce_doc = await EmailBounceDoc.find_one( + EmailBounceDoc.email == email, + EmailBounceDoc.tenant_id == tenant_id, + EmailBounceDoc.bounce_type == BounceType.HARD_BOUNCE # 只检查硬退信 + ) + + is_blacklisted = bounce_doc is not None + + await self.module_logger.log_info( + "Email blacklist check completed", + properties={ + "email": email, + "tenant_id": tenant_id, + "is_blacklisted": is_blacklisted + } + ) + + return is_blacklisted + + except Exception as e: + await self.module_logger.log_error( + "Failed to check email blacklist", + properties={ + "email": email, + "tenant_id": tenant_id, + "error": str(e) + } + ) + return False \ No newline at end of file diff --git a/apps/notification/backend/services/email/email_spam_protection_service.py b/apps/notification/backend/services/email/email_spam_protection_service.py new file mode 100644 index 0000000..bef4414 --- /dev/null +++ b/apps/notification/backend/services/email/email_spam_protection_service.py @@ -0,0 +1,109 @@ +from typing import List, Dict, Optional +from datetime import datetime, timedelta +from common.log.module_logger import ModuleLogger +from common.exception.exceptions import InvalidDataError +from common.constants.region import UserRegion +from backend.infra.email.email_spam_protection.rate_limit_handler import RateLimitHandler +from backend.infra.email.email_spam_protection.spam_detector_handler import SpamDetectorHandler + + +class EmailSpamProtectionService: + def __init__(self): + self.rate_limit_handler = RateLimitHandler() + self.spam_detector_handler = SpamDetectorHandler() + self.module_logger = ModuleLogger(sender_id="EmailSpamProtectionService") + + async def check_rate_limit(self, tenant_id: str, sender_email: str) -> Dict: + """check rate limit""" + try: + # check tenant rate limit + tenant_limit = await self.rate_limit_handler.check_tenant_rate_limit(tenant_id) + + # check sender rate limit + sender_limit = await self.rate_limit_handler.check_sender_rate_limit(sender_email) + + # check global rate limit + global_limit = await self.rate_limit_handler.check_global_rate_limit() + + is_allowed = tenant_limit["allowed"] and sender_limit["allowed"] and global_limit["allowed"] + + await self.module_logger.log_info( + "Rate limit check completed", + properties={ + "tenant_id": tenant_id, + "sender_email": sender_email, + "is_allowed": is_allowed, + "tenant_remaining": tenant_limit.get("remaining", 0), + "sender_remaining": sender_limit.get("remaining", 0), + "global_remaining": global_limit.get("remaining", 0) + } + ) + + return { + "allowed": is_allowed, + "tenant_limit": tenant_limit, + "sender_limit": sender_limit, + "global_limit": global_limit + } + + except Exception as e: + await self.module_logger.log_error( + "Rate limit check failed", + properties={ + "tenant_id": tenant_id, + "sender_email": sender_email, + "error": str(e) + } + ) + raise + + async def detect_spam(self, email_content: Dict, region: int): + """detect if email is spam based on region""" + try: + subject = email_content.get("subject", "") + body = email_content.get("body", "") + + # content detection with region + content_score = await self.spam_detector_handler.analyze_content(subject, body, region) + + # link detection with region + link_score = await self.spam_detector_handler.analyze_links(body, region) + + # keyword detection with region + keyword_score = await self.spam_detector_handler.analyze_keywords(subject, body, region) + + # overall score + total_score = (content_score + link_score + keyword_score) / 3 + #TODO: threshold can be configured + is_spam = (total_score > 0.7) + + await self.module_logger.log_info( + "Spam detection completed", + properties={ + "region": region, + "content_score": content_score, + "link_score": link_score, + "keyword_score": keyword_score, + "total_score": total_score, + "is_spam": is_spam + } + ) + + return { + "is_spam": is_spam, + "total_score": total_score, + "content_score": content_score, + "link_score": link_score, + "keyword_score": keyword_score, + "region": region + } + + except Exception as e: + await self.module_logger.log_error( + "Spam detection failed", + properties={ + "error": str(e), + "region": region + } + ) + raise diff --git a/apps/notification/backend/services/email/email_status_service.py b/apps/notification/backend/services/email/email_status_service.py new file mode 100644 index 0000000..5f60d95 --- /dev/null +++ b/apps/notification/backend/services/email/email_status_service.py @@ -0,0 +1,194 @@ +from typing import Dict, Optional, List +from datetime import datetime +from common.log.module_logger import ModuleLogger +from backend.models.models import EmailSendStatusDoc, EmailTrackingDoc, EmailBounceDoc +from common.constants.email import EmailSendStatus, BounceType + + +class EmailStatusService: + def __init__(self): + self.module_logger = ModuleLogger(sender_id="EmailStatusService") + + async def get_email_status(self, email_id: str = None, tenant_id: str = None, recipient_email: str = None): + """Get comprehensive email status including send status, tracking, and bounce info""" + try: + # 1. judge email send status by email_id or recipient_email + if email_id: + # 1.1 get email send status and bounce info by email_id + try: + email_bounce_doc = await EmailBounceDoc.find_one( + {"email_id": email_id, "tenant_id": tenant_id} + ) + email_status_doc = await EmailSendStatusDoc.find_one( + {"email_id": email_id, "tenant_id": tenant_id} + ) + except Exception: + # If database is not initialized, return None for testing + email_bounce_doc = None + email_status_doc = None + + if not email_status_doc: + await self.module_logger.log_warning( + "Email status not found by email_id", + properties={ + "email_id": email_id, + "tenant_id": tenant_id + } + ) + return None + + elif recipient_email and tenant_id: + # 1.2 get email send status and bounce info by recipient_email (for bounce scenarios) + try: + email_bounce_doc = await EmailBounceDoc.find_one( + {"email": recipient_email, "tenant_id": tenant_id} + ) + if email_bounce_doc and email_bounce_doc.email_id: + email_status_doc = await EmailSendStatusDoc.find_one( + {"email_id": email_bounce_doc.email_id, "tenant_id": tenant_id} + ) + else: + email_status_doc = await EmailSendStatusDoc.find_one( + {"recipient_email": recipient_email, "tenant_id": tenant_id} + ).sort(-EmailSendStatusDoc.created_at) + except Exception: + # If database is not initialized, return None for testing + email_bounce_doc = None + email_status_doc = None + + if not email_status_doc: + await self.module_logger.log_warning( + "Email status not found by recipient_email", + properties={ + "recipient_email": recipient_email, + "tenant_id": tenant_id + } + ) + return None + else: + # 1.3 if no email_id and recipient_email, raise error + raise ValueError("Either email_id or (recipient_email, tenant_id) must be provided") + + # 2. get email tracking info + try: + email_tracking_doc = await EmailTrackingDoc.find_one( + {"email_id": email_status_doc.email_id, "tenant_id": tenant_id} + ) if email_status_doc else None + except Exception: + # If database is not initialized, return None for testing + email_tracking_doc = None + + # 3. build return result + if not email_status_doc: + return None + + status_info = { + "email_id": email_status_doc.email_id, + "tenant_id": tenant_id, + "recipient_email": email_status_doc.recipient_email, + "template_id": email_status_doc.template_id, + "subject": email_status_doc.subject, + "status": email_status_doc.status.value, + "sent_at": email_status_doc.sent_at.isoformat() if email_status_doc.sent_at else None, + "failed_at": email_status_doc.failed_at.isoformat() if email_status_doc.failed_at else None, + "error_message": email_status_doc.error_message, + "retry_count": email_status_doc.retry_count, + "max_retries": email_status_doc.max_retries, + "message_id": email_status_doc.message_id, + "created_at": email_status_doc.created_at.isoformat(), + "updated_at": email_status_doc.updated_at.isoformat() if email_status_doc.updated_at else None, + "email_senders": email_status_doc.email_senders, + "tracking": { + "enabled": email_tracking_doc.tracking_enabled if email_tracking_doc else False, + "opened_at": email_tracking_doc.opened_at.isoformat() if email_tracking_doc and email_tracking_doc.opened_at else None, + "opened_count": email_tracking_doc.opened_count if email_tracking_doc else 0, + "clicked_at": email_tracking_doc.clicked_at.isoformat() if email_tracking_doc and email_tracking_doc.clicked_at else None, + "clicked_count": email_tracking_doc.clicked_count if email_tracking_doc else 0, + "clicked_links": email_tracking_doc.clicked_links if email_tracking_doc else [], + "user_agent": email_tracking_doc.user_agent if email_tracking_doc else None, + "ip_address": email_tracking_doc.ip_address if email_tracking_doc else None + }, + "bounce": { + "bounced": email_bounce_doc is not None, + "bounce_type": email_bounce_doc.bounce_type.value if email_bounce_doc else None, + "bounce_reason": email_bounce_doc.reason if email_bounce_doc else None, + "bounced_at": email_bounce_doc.bounced_at.isoformat() if email_bounce_doc else None, + "processed": email_bounce_doc.processed if email_bounce_doc else False, + "processed_at": email_bounce_doc.processed_at.isoformat() if email_bounce_doc and email_bounce_doc.processed_at else None + } + } + + await self.module_logger.log_info( + "Email status retrieved successfully", + properties={ + "email_id": email_status_doc.email_id, + "tenant_id": tenant_id, + "status": status_info["status"] + } + ) + + return status_info + + except Exception as e: + await self.module_logger.log_error( + "Failed to get email status", + properties={ + "email_id": email_id or "unknown", + "tenant_id": tenant_id, + "error": str(e) + } + ) + raise + + async def get_tenant_email_status_list(self, tenant_id: str, limit: int = 50, offset: int = 0) -> Dict: + """Get list of email statuses for a tenant""" + try: + email_status_docs = await EmailSendStatusDoc.find( + {"tenant_id": tenant_id} + ).skip(offset).limit(limit).sort(-EmailSendStatusDoc.created_at).to_list() + + status_list = [] + for doc in email_status_docs: + status_list.append({ + "email_id": doc.email_id, + "recipient_email": doc.recipient_email, + "template_id": doc.template_id, + "subject": doc.subject, + "status": doc.status.value, + "sent_at": doc.sent_at.isoformat() if doc.sent_at else None, + "created_at": doc.created_at.isoformat() + }) + + total_count = await EmailSendStatusDoc.find( + EmailSendStatusDoc.tenant_id == tenant_id + ).count() + + await self.module_logger.log_info( + "Tenant email status list retrieved", + properties={ + "tenant_id": tenant_id, + "count": len(status_list), + "total_count": total_count + } + ) + + return { + "tenant_id": tenant_id, + "emails": status_list, + "pagination": { + "limit": limit, + "offset": offset, + "total_count": total_count, + "has_more": offset + limit < total_count + } + } + + except Exception as e: + await self.module_logger.log_error( + "Failed to get tenant email status list", + properties={ + "tenant_id": tenant_id, + "error": str(e) + } + ) + raise diff --git a/apps/notification/backend/services/email/email_validation_service.py b/apps/notification/backend/services/email/email_validation_service.py new file mode 100644 index 0000000..4e6d17b --- /dev/null +++ b/apps/notification/backend/services/email/email_validation_service.py @@ -0,0 +1,95 @@ +from typing import List, Dict, Tuple +from common.log.module_logger import ModuleLogger +from common.exception.exceptions import InvalidDataError +from backend.infra.email.email_validation_handler import EmailValidationHandler +from backend.infra.email_sender_handler import EmailSenderHandler + + +class EmailValidationService: + def __init__(self): + self.email_validation_handler = EmailValidationHandler() + self.email_sender_handler = EmailSenderHandler() + self.module_logger = ModuleLogger(sender_id="EmailValidationService") + + async def validate_emails(self, emails: List[str]): + """validate email list, return valid and invalid emails""" + try: + valid_emails = [] + invalid_emails = [] + + for email in emails: + if await self.email_validation_handler.is_valid_email(email): + valid_emails.append(email) + else: + invalid_emails.append(email) + await self.module_logger.log_warning( + f"Invalid email detected: {email}", + properties={"email": email} + ) + + await self.module_logger.log_info( + "Email validation completed", + properties={ + "total_emails": len(emails), + "valid_count": len(valid_emails), + "invalid_count": len(invalid_emails) + } + ) + + return valid_emails, invalid_emails + + except Exception as e: + await self.module_logger.log_error( + "Email validation failed", + properties={"error": str(e)} + ) + raise + + async def validate_sender_emails(self, tenant_id: str, sender_emails: List[str]): + """validate sender emails, including format validation and permission validation""" + try: + valid_senders = [] + invalid_senders = [] + + authorized_senders = await self.email_sender_handler.get_email_senders(tenant_id) + for sender_email in sender_emails: + # format validation + if not await self.email_validation_handler.is_valid_email(sender_email): + invalid_senders.append(sender_email) + continue + + # domain validation + if not await self.email_validation_handler.is_valid_domain(sender_email): + invalid_senders.append(sender_email) + continue + + # sender permission validation + # Allow support@freeleaps.com as default sender even if not in authorized_senders + if sender_email not in authorized_senders and sender_email != "support@freeleaps.com": + invalid_senders.append(sender_email) + continue + + valid_senders.append(sender_email) + + await self.module_logger.log_info( + "Sender email validation completed", + properties={ + "tenant_id": tenant_id, + "total_senders": len(sender_emails), + "valid_count": len(valid_senders), + "invalid_count": len(invalid_senders) + } + ) + + return valid_senders, invalid_senders + + except Exception as e: + await self.module_logger.log_error( + "Sender email validation failed", + properties={ + "tenant_id": tenant_id, + "error": str(e) + } + ) + raise + \ No newline at end of file diff --git a/apps/notification/common/config/rate_limit_settings.py b/apps/notification/common/config/rate_limit_settings.py new file mode 100644 index 0000000..bd9fa32 --- /dev/null +++ b/apps/notification/common/config/rate_limit_settings.py @@ -0,0 +1,29 @@ +import os +from typing import Dict, Any +from pydantic_settings import BaseSettings + + +class RateLimitSettings(BaseSettings): + GLOBAL_HOURLY_LIMIT: int = int(os.getenv("GLOBAL_HOURLY_LIMIT", "10000")) + GLOBAL_DAILY_LIMIT: int = int(os.getenv("GLOBAL_DAILY_LIMIT", "100000")) + + SENDER_HOURLY_LIMIT: int = int(os.getenv("SENDER_HOURLY_LIMIT", "100")) + SENDER_DAILY_LIMIT: int = int(os.getenv("SENDER_DAILY_LIMIT", "1000")) + + DEFAULT_TENANT_HOURLY_LIMIT: int = int(os.getenv("DEFAULT_TENANT_HOURLY_LIMIT", "500")) + DEFAULT_TENANT_DAILY_LIMIT: int = int(os.getenv("DEFAULT_TENANT_DAILY_LIMIT", "5000")) + + TENANT_SPECIFIC_LIMITS: str = os.getenv("TENANT_SPECIFIC_LIMITS", "{}") + + HOURLY_WINDOW: int = 3600 + DAILY_WINDOW: int = 86400 + + RESET_TIME_HOUR: int = 0 + RESET_TIME_MINUTE: int = 0 + + class Config: + env_file = ".env" + env_file_encoding = "utf-8" + extra = "ignore" + +rate_limit_settings = RateLimitSettings() \ No newline at end of file diff --git a/apps/notification/webapi/routes/tenant_notification.py b/apps/notification/webapi/routes/tenant_notification.py new file mode 100644 index 0000000..c7987ff --- /dev/null +++ b/apps/notification/webapi/routes/tenant_notification.py @@ -0,0 +1,90 @@ +from fastapi import APIRouter, HTTPException +from fastapi.responses import JSONResponse +from pydantic import BaseModel +from typing import Dict, List, Optional +from backend.application.tenant_notification_hub import TenantNotificationHub + + +router = APIRouter() + +tenant_notification_hub = TenantNotificationHub() + +class TenantEmailRequest(BaseModel): + tenant_id: str + template_id: str + recipient_emails: List[str] + subject_properties: Dict = {} + body_properties: Dict = {} + region: int + sender_emails: Optional[List[str]] = None + priority: str = "normal" + tracking_enabled: bool = True + +@router.post( + "/send_tenant_email", + operation_id="send_tenant_email", + summary="Send email using tenant's template and email senders", + description="Send email using tenant's selected template and email senders", + response_description="Success/failure response in processing the tenant email send request", +) +async def send_tenant_email(request: TenantEmailRequest): + try: + result = await tenant_notification_hub.send_tenant_email( + tenant_id=request.tenant_id, + template_id=request.template_id, + recipient_emails=request.recipient_emails, + subject_properties=request.subject_properties, + body_properties=request.body_properties, + region=request.region, + sender_emails=request.sender_emails, + priority=request.priority, + tracking_enabled=request.tracking_enabled + ) + return JSONResponse( + content={ + "message": "Tenant email queued successfully.", + "message_id": result.get("message_id"), + "email_ids": result.get("email_ids", []) + }, + status_code=200 + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to send tenant email: {str(e)}") + +@router.get( + "/tenant_email_status/{tenant_id}", + operation_id="get_tenant_email_status", + summary="Get tenant email status", + description="Get the status of a tenant email by email_id or recipient_email", +) +async def get_tenant_email_status(tenant_id: str, email_id: str = None, recipient_email: str = None): + try: + status = await tenant_notification_hub.get_tenant_email_status(tenant_id, email_id, recipient_email) + return JSONResponse(content=status, status_code=200) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to get tenant email status: {str(e)}") + +@router.get( + "/tenant_email_status_list/{tenant_id}", + operation_id="get_tenant_email_status_list", + summary="Get tenant email status list", + description="Get a list of email statuses for a tenant with pagination", +) +async def get_tenant_email_status_list( + tenant_id: str, + limit: int = 50, + offset: int = 0 +): + try: + status_list = await tenant_notification_hub.get_tenant_email_status_list(tenant_id, limit, offset) + return JSONResponse(content=status_list, status_code=200) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to get tenant email status list: {str(e)}") + + \ No newline at end of file