# Source: freeleaps-service-hub/apps/notification/backend/models/models.py
# Repository viewer metadata: 127 lines, 3.5 KiB, Python

from datetime import datetime
from typing import Optional, List
from common.constants.region import UserRegion
from common.constants.email import EmailSendStatus, BounceType
from backend.models.base_doc import BaseDoc
class MessageTemplateDoc(BaseDoc):
    """Document model for a message template.

    Carries a ``template_id``, an optional ``tenant_id``, a ``region`` and
    the template's ``subject`` / ``body`` text, plus an activity flag and
    created/updated timestamps.
    """

    template_id: str
    tenant_id: Optional[str] = None  # may be omitted; defaults to None
    region: UserRegion
    subject: str
    body: str
    is_active: bool = True
    # NOTE(review): `datetime.utcnow()` is evaluated once, when this class
    # body runs, so the default is the import-time timestamp rather than the
    # moment each document is created. If BaseDoc is pydantic-based, a
    # `Field(default_factory=datetime.utcnow)` default would fix it - confirm.
    created_at: datetime = datetime.utcnow()
    updated_at: Optional[datetime] = None

    class Settings:
        # Storage name and indexed fields; presumably read by the ODM layer
        # behind BaseDoc - confirm against base_doc.
        name = "message_templates_doc"
        indexes = ["template_id", "tenant_id", "region"]
class EmailSenderDoc(BaseDoc):
    """Document model pairing a ``tenant_id`` with an optional sender address.

    ``is_active`` defaults to True.
    """

    tenant_id: str
    email_sender: Optional[str] = None  # may be omitted; defaults to None
    is_active: bool = True

    class Settings:
        # Storage name and indexed fields; presumably read by the ODM layer
        # behind BaseDoc - confirm against base_doc.
        name = "email_sender_doc"
        indexes = [
            "tenant_id",
        ]
class EmailSendStatusDoc(BaseDoc):
    """Document model holding the send status of one email.

    Judging by its fields, each record keeps the email's identity and
    content, its current ``status`` (initially ``EmailSendStatus.PENDING``),
    sent/failed timestamps, an error message and retry counters.
    """

    # Identity and content of the email.
    email_id: str
    tenant_id: str
    email_sender: Optional[str] = None
    recipient_email: str
    template_id: Optional[str] = None
    subject: str
    body: str

    # Status fields; a fresh record starts as PENDING with no timestamps.
    status: EmailSendStatus = EmailSendStatus.PENDING
    sent_at: Optional[datetime] = None
    failed_at: Optional[datetime] = None
    error_message: Optional[str] = None

    # Retry counters: starts at 0, with a default ceiling of 3.
    retry_count: int = 0
    max_retries: int = 3
    message_id: Optional[str] = None

    # NOTE(review): `datetime.utcnow()` is evaluated once, when this class
    # body runs, so the default is the import-time timestamp rather than the
    # moment each document is created. If BaseDoc is pydantic-based, a
    # `Field(default_factory=datetime.utcnow)` default would fix it - confirm.
    created_at: datetime = datetime.utcnow()
    updated_at: Optional[datetime] = None

    class Settings:
        # Storage name and indexed fields; presumably read by the ODM layer
        # behind BaseDoc - confirm against base_doc.
        name = "email_send_status_doc"
        indexes = ["email_id", "tenant_id"]
class EmailTrackingDoc(BaseDoc):
    """Document model holding open/click tracking data for one email.

    Counters default to 0, the timestamps of opens and clicks default to
    None, and ``tracking_enabled`` defaults to True.
    """

    # Identity of the tracked email.
    email_id: str
    tenant_id: str
    recipient_email: str
    template_id: Optional[str] = None
    sent_at: datetime  # required; no default
    message_id: Optional[str] = None

    # Open statistics.
    opened_at: Optional[datetime] = None
    opened_count: int = 0

    # Click statistics.
    clicked_at: Optional[datetime] = None
    clicked_count: int = 0
    # NOTE(review): mutable default; only safe if BaseDoc copies defaults per
    # instance (pydantic models do) - confirm.
    clicked_links: List[str] = []

    # Client details.
    user_agent: Optional[str] = None
    ip_address: Optional[str] = None
    tracking_enabled: bool = True

    # NOTE(review): `datetime.utcnow()` is evaluated once, when this class
    # body runs, so the default is the import-time timestamp rather than the
    # moment each document is created. If BaseDoc is pydantic-based, a
    # `Field(default_factory=datetime.utcnow)` default would fix it - confirm.
    created_at: datetime = datetime.utcnow()
    updated_at: Optional[datetime] = None

    class Settings:
        # Storage name and indexed fields; presumably read by the ODM layer
        # behind BaseDoc - confirm against base_doc.
        name = "email_tracking_doc"
        indexes = ["email_id", "tenant_id"]
class EmailBounceDoc(BaseDoc):
    """Document model holding one bounce record for an email address.

    Requires the bounced ``email``, ``tenant_id``, a ``bounce_type``, a
    ``reason`` and the ``bounced_at`` time; the remaining fields are
    optional references and processing state.
    """

    email: str
    tenant_id: str
    email_id: Optional[str] = None
    template_id: Optional[str] = None
    bounce_type: BounceType
    reason: str
    bounced_at: datetime  # required; no default

    # Optional references to the original email / message.
    original_email_id: Optional[str] = None
    original_message_id: Optional[str] = None

    # Processing state; a new record starts unprocessed.
    processed: bool = False
    processed_at: Optional[datetime] = None

    # NOTE(review): `datetime.utcnow()` is evaluated once, when this class
    # body runs, so the default is the import-time timestamp rather than the
    # moment each document is created. If BaseDoc is pydantic-based, a
    # `Field(default_factory=datetime.utcnow)` default would fix it - confirm.
    created_at: datetime = datetime.utcnow()

    class Settings:
        # Storage name and indexed fields; presumably read by the ODM layer
        # behind BaseDoc - confirm against base_doc.
        name = "email_bounce_doc"
        indexes = ["email", "tenant_id"]
class UsageLogDoc(BaseDoc):
    """Document model for one usage-log entry.

    Each entry carries the tenant, operation type, request id, status,
    latency and byte counts, with an optional API key id and a free-form
    ``extra`` dict.
    """

    # NOTE(review): `datetime.utcnow()` is evaluated once, when this class
    # body runs, so the default is the import-time timestamp rather than the
    # time of each log entry. If BaseDoc is pydantic-based, a
    # `Field(default_factory=datetime.utcnow)` default would fix it - confirm.
    timestamp: datetime = datetime.utcnow()  # entry timestamp
    tenant_id: str  # tenant id
    operation: str  # operation type
    request_id: str  # request id; TODO: use the real one
    status: str  # operation status
    latency_ms: int  # latency in milliseconds
    bytes_in: int  # input bytes
    bytes_out: int  # output bytes
    key_id: Optional[str] = None  # API key id
    # NOTE(review): mutable default; only safe if BaseDoc copies defaults per
    # instance (pydantic models do) - confirm.
    extra: dict = {}  # extra information

    class Settings:
        # Storage name and indexed fields; presumably read by the ODM layer
        # behind BaseDoc - confirm against base_doc.
        name = "usage_log_doc"
        indexes = ["tenant_id", "request_id", "key_id"]