freeleaps-service-hub/apps/notification/backend/application/template_message_hub.py
YuehuCao 11c1cc811d Refactor(application): Extract frequently used value into class-level constant
email_sender_hub.py: extract email_sender_manager = EmailSenderManager()
notification_hub.py: extract notification_manager = NotificationManager()
template_message_hub.py: extract template_message_manager = TemplateMessageManager()
template_message_hub.py: add more details in render_template function
2025-08-11 15:21:17 +08:00

140 lines
5.2 KiB
Python

from typing import Dict, List, Optional
from backend.business.template_message_manager import TemplateMessageManager
from common.constants.region import UserRegion
class TemplateMessageHub:
    """Application-layer entry point for template message operations.

    Every public method validates its required arguments, raising
    ``ValueError`` on failure, and then delegates to the
    ``TemplateMessageManager`` held in ``self.template_message_manager``,
    returning whatever that manager call returns.
    """

    # The only fields callers are allowed to change through the update_* methods.
    _ALLOWED_UPDATE_FIELDS = frozenset({"subject", "body", "is_active"})

    def __init__(self):
        self.template_message_manager = TemplateMessageManager()

    @classmethod
    def _validate_update_data(cls, data: dict) -> None:
        """Validate an update payload shared by the global and tenant update paths.

        Raises:
            ValueError: if ``data`` is empty, or if it contains any key outside
                ``_ALLOWED_UPDATE_FIELDS``.
        """
        if not data:
            raise ValueError("Update data cannot be empty")
        invalid_fields = set(data) - cls._ALLOWED_UPDATE_FIELDS
        if invalid_fields:
            # Sort so the message is stable across runs; iteration order of a
            # set of strings varies with hash randomization.
            raise ValueError(
                f"Invalid update fields: {sorted(str(field) for field in invalid_fields)}"
            )

    async def verify_tenant_access(self, template_id: str, tenant_id: str, region: int):
        """Delegate a tenant-access check for a template to the manager.

        Raises:
            ValueError: if ``template_id`` or ``tenant_id`` is empty.
        """
        if not template_id or not tenant_id:
            raise ValueError("template_id and tenant_id are required")
        return await self.template_message_manager.verify_tenant_access(
            template_id, tenant_id, region
        )

    # ==================== global templates ====================
    async def create_global_template(
        self,
        template_id: str,
        region: int,
        subject: str,
        body: str,
        is_active: bool = True
    ):
        """Create a global template from the given fields.

        Raises:
            ValueError: if ``template_id``, ``subject`` or ``body`` is empty.
        """
        if not template_id or not subject or not body:
            raise ValueError("template_id, subject, and body are required")
        return await self.template_message_manager.create_global_template_from_data(
            template_id=template_id,
            region=region,
            subject=subject,
            body=body,
            is_active=is_active
        )

    async def update_global_template(self, template_id: str, data: dict, region: int):
        """Update a global template with the fields in ``data``.

        Raises:
            ValueError: if ``template_id`` is empty, ``data`` is empty, or
                ``data`` contains a key other than ``subject``, ``body`` or
                ``is_active``.
        """
        if not template_id:
            raise ValueError("template_id is required")
        self._validate_update_data(data)
        return await self.template_message_manager.update_global_template(
            template_id, data, region
        )

    async def delete_global_template(self, template_id: str):
        """Delete a global template.

        Raises:
            ValueError: if ``template_id`` is empty.
        """
        if not template_id:
            raise ValueError("template_id is required")
        return await self.template_message_manager.delete_global_template(template_id)

    async def list_global_templates(self, region: int):
        """List global templates for ``region`` via the manager."""
        return await self.template_message_manager.list_global_templates(region)

    # ==================== TENANT templates ====================
    async def list_tenant_templates(self, tenant_id: str, region: int):
        """List the templates of a tenant for ``region`` via the manager.

        Raises:
            ValueError: if ``tenant_id`` is empty.
        """
        # Reject an empty tenant id here, as every other tenant-scoped method does.
        if not tenant_id:
            raise ValueError("tenant_id is required")
        return await self.template_message_manager.list_tenant_templates(tenant_id, region)

    async def assign_templates_to_tenant(self, tenant_id: str, template_ids: List[str], region: int):
        """Assign the given templates to a tenant.

        Raises:
            ValueError: if ``template_ids`` is empty or ``tenant_id`` is empty
                (checked in that order).
        """
        if not template_ids:
            raise ValueError("template_ids cannot be empty")
        if not tenant_id:
            raise ValueError("tenant_id is required")
        return await self.template_message_manager.assign_templates_to_tenant(
            tenant_id, template_ids, region
        )

    async def create_tenant_template(
        self,
        template_id: str,
        region: int,
        subject: str,
        body: str,
        tenant_id: str,
        is_active: bool = True
    ):
        """Create a tenant-owned template from the given fields.

        Raises:
            ValueError: if ``template_id``, ``subject``, ``body`` or
                ``tenant_id`` is empty.
        """
        if not template_id or not subject or not body or not tenant_id:
            raise ValueError("template_id, subject, body, and tenant_id are required")
        return await self.template_message_manager.create_tenant_template_from_data(
            template_id=template_id,
            region=region,
            subject=subject,
            body=body,
            tenant_id=tenant_id,
            is_active=is_active
        )

    async def update_tenant_template(self, tenant_id: str, template_id: str, data: dict, region: int):
        """Update a tenant template with the fields in ``data``.

        Raises:
            ValueError: if ``template_id`` or ``tenant_id`` is empty, ``data``
                is empty, or ``data`` contains a key other than ``subject``,
                ``body`` or ``is_active``.
        """
        if not template_id or not tenant_id:
            raise ValueError("template_id and tenant_id are required")
        self._validate_update_data(data)
        return await self.template_message_manager.update_tenant_template(
            tenant_id, template_id, data, region
        )

    async def delete_tenant_template(self, tenant_id: str, template_id: str, region: int):
        """Delete a tenant template.

        Raises:
            ValueError: if ``template_id`` or ``tenant_id`` is empty.
        """
        if not template_id or not tenant_id:
            raise ValueError("template_id and tenant_id are required")
        return await self.template_message_manager.delete_tenant_template(
            tenant_id, template_id, region
        )

    async def render_template(
        self,
        tenant_id: str,
        template_id: str,
        properties: dict,
        region: int
    ):
        """Render a tenant's template with the supplied ``properties``.

        Raises:
            ValueError: if ``template_id`` or ``tenant_id`` is empty, or if
                ``properties`` is empty.
        """
        if not template_id or not tenant_id:
            raise ValueError("template_id and tenant_id are required")
        if not properties:
            raise ValueError("properties cannot be empty")
        return await self.template_message_manager.render_template(
            tenant_id, template_id, properties, region
        )