freeleaps-service-hub/apps/notification/backend/infra/template_message_handler.py
YuehuCao 2213fa59b5 feat(templates): enforce uniqueness and improve safety
- Raise TemplateExistsError when duplicate detected
- Migrate placeholder syntax from {} to {{}}
- Add validation for:
  * Reserved keyword collisions
  * Injection attempt patterns
- Update all test cases
2025-08-18 22:15:43 +08:00

533 lines
20 KiB
Python

import re
from backend.models.models import MessageTemplateDoc
from common.log.module_logger import ModuleLogger
from datetime import datetime
from typing import List, Optional
from common.constants.region import UserRegion
class TemplateMessageHandler:
def __init__(self):
self.module_logger = ModuleLogger(sender_id="TemplateMessageHandler")
@staticmethod
def _normalize_placeholders(text: str) -> str:
"""Convert Handlebars-like placeholders {{name}} into Python str.format style {name}.
Does not touch CSS double braces like `.class {{ ... }}` because those won't match \w+.
"""
if not isinstance(text, str):
return text
return re.sub(r"\{\{(\w+)\}\}", r"{\1}", text)
async def verify_tenant_access(self, template_id: str, tenant_id: str, region: int) -> Optional[MessageTemplateDoc]:
"""get template by tenant and template ids with region"""
try:
template = await MessageTemplateDoc.find_one({
"template_id": template_id,
"tenant_id": tenant_id,
"region": region
})
await self.module_logger.log_info(
info="Tenant template retrieved from database",
properties={
"template_id": template_id,
"tenant_id": tenant_id,
"region": region
}
)
return template
except Exception as e:
await self.module_logger.log_error(
error="Failed to get tenant template from database",
properties={
"template_id": template_id,
"tenant_id": tenant_id,
"region": region,
"error": str(e)
}
)
raise
# ==================== global templates ====================
async def create_global_template(self, template: MessageTemplateDoc) -> MessageTemplateDoc:
"""create global template with upsert support"""
try:
# check if template already exists
existing_template = await MessageTemplateDoc.find_one({
"template_id": template.template_id,
"tenant_id": template.tenant_id,
"region": template.region
})
if existing_template:
await self.module_logger.log_info(
info="Global template already exists",
properties={
"template_id": template.template_id,
"tenant_id": template.tenant_id,
"region": template.region
}
)
raise ValueError("Global template already exists")
else:
await template.create()
await self.module_logger.log_info(
info="Template created in database",
properties={
"template_id": template.template_id,
"tenant_id": template.tenant_id,
"region": template.region
}
)
return template
except Exception as e:
await self.module_logger.log_error(
error="Failed to create/update template in database",
properties={
"template_id": template.template_id,
"error": str(e)
}
)
raise
async def update_global_template(self, template_id: str, data: dict, region: int) -> dict:
"""update global template"""
try:
template = await MessageTemplateDoc.find_one({
"template_id": template_id,
"tenant_id": None,
"region": region
})
if not template:
raise ValueError(f"Template not found for template_id: {template_id}, region: {data.get('region')}")
data["updated_at"] = datetime.utcnow()
# remove fields that are not allowed to be updated
if "id" in data:
del data["id"]
if "template_id" in data:
del data["template_id"]
if "tenant_id" in data:
del data["tenant_id"]
if "region" in data:
del data["region"]
await template.set(data)
await self.module_logger.log_info(
info="Template updated in database",
properties={
"template_id": template_id,
"tenant_id": data.get("tenant_id"),
"region": data.get("region"),
"updated_fields": list(data.keys())
}
)
return {"success": True}
except Exception as e:
await self.module_logger.log_error(
error="Failed to update template in database",
properties={
"template_id": template_id,
"region": data.get("region"),
"error": str(e)
}
)
raise
async def delete_global_template(self, template_id: str) -> dict:
"""delete global template"""
try:
# find all global templates named as template_id
templates = await MessageTemplateDoc.find({
"template_id": template_id,
"tenant_id": None
}).to_list()
if not templates:
raise ValueError("Global template not found")
# delete all found templates
deleted_count = 0
for template in templates:
await template.delete()
deleted_count += 1
await self.module_logger.log_info(
info="Global templates deleted from database",
properties={
"template_id": template_id,
"deleted_count": deleted_count,
"total_found": len(templates)
}
)
return {"success": True, "deleted_count": deleted_count}
except Exception as e:
await self.module_logger.log_error(
error="Failed to delete global templates from database",
properties={
"template_id": template_id,
"error": str(e)
}
)
raise
async def list_global_templates(self, region: int) -> List[MessageTemplateDoc]:
"""list global templates"""
try:
templates = await MessageTemplateDoc.find({
"tenant_id": None,
"region": region
}).to_list()
await self.module_logger.log_info(
info="Global templates retrieved from database",
properties={"region": region, "count": len(templates)}
)
return templates
except Exception as e:
await self.module_logger.log_error(
error="Failed to get global templates from database",
properties={
"region": region,
"error": str(e)
}
)
raise
async def find_global_template(self, template_id: str, region: int) -> Optional[MessageTemplateDoc]:
"""find global template"""
try:
template = await MessageTemplateDoc.find_one({
"template_id": template_id,
"tenant_id": None,
"region": region
})
await self.module_logger.log_info(
info="Global template retrieved from database",
properties={
"template_id": template_id,
"region": region
}
)
return template
except Exception as e:
await self.module_logger.log_error(
error="Failed to get global template from database",
properties={
"template_id": template_id,
"region": region,
"error": str(e)
}
)
raise
# ==================== tenant templates ====================
async def list_tenant_templates(self, tenant_id: str, region: int) -> List[MessageTemplateDoc]:
"""list tenant templates"""
try:
templates = await MessageTemplateDoc.find({
"tenant_id": tenant_id,
"region": region
}).to_list()
await self.module_logger.log_info(
info="Tenant templates retrieved from database",
properties={
"tenant_id": tenant_id,
"region": region,
"count": len(templates)
}
)
return templates
except Exception as e:
await self.module_logger.log_error(
error="Failed to get tenant templates from database",
properties={
"tenant_id": tenant_id,
"region": region,
"error": str(e)
}
)
raise
async def find_tenant_template(self, tenant_id: str, template_id: str, region: int) -> Optional[MessageTemplateDoc]:
"""find tenant template"""
try:
template = await MessageTemplateDoc.find_one({
"template_id": template_id,
"tenant_id": tenant_id,
"region": region
})
await self.module_logger.log_info(
info="Tenant template retrieved from database",
properties={
"template_id": template_id,
"tenant_id": tenant_id,
"region": region
}
)
return template
except Exception as e:
await self.module_logger.log_error(
error="Failed to get tenant template from database",
properties={
"template_id": template_id,
"region": region,
"error": str(e)
}
)
raise
async def create_tenant_template(self, tenant_id: str, template: MessageTemplateDoc) -> MessageTemplateDoc:
"""create tenant template with upsert support"""
try:
# check if template already exists
existing_template = await MessageTemplateDoc.find_one({
"template_id": template.template_id,
"tenant_id": template.tenant_id,
"region": template.region
})
if existing_template:
await self.module_logger.log_info(
info="Tenant Template already exists",
properties={
"template_id": template.template_id,
"tenant_id": template.tenant_id,
"region": template.region
}
)
raise ValueError("Template already exists")
else:
await template.create()
await self.module_logger.log_info(
info="Template created in database",
properties={
"template_id": template.template_id,
"tenant_id": template.tenant_id,
"region": template.region
}
)
return template
except Exception as e:
await self.module_logger.log_error(
error="Failed to create/update tenant template in database",
properties={
"template_id": template.template_id,
"tenant_id": template.tenant_id,
"error": str(e)
}
)
raise
async def update_tenant_template(self, tenant_id: str, template_id: str, data: dict, region: int) -> dict:
"""update template"""
try:
template = await MessageTemplateDoc.find_one({
"template_id": template_id,
"tenant_id": tenant_id,
"region": region
})
if not template:
raise ValueError(f"Template not found for template_id: {template_id}, tenant_id: {data.get('tenant_id')}, region: {data.get('region')}")
data["updated_at"] = datetime.utcnow()
# remove fields that are not allowed to be updated
if "id" in data:
del data["id"]
if "template_id" in data:
del data["template_id"]
if "tenant_id" in data:
del data["tenant_id"]
if "region" in data:
del data["region"]
await template.set(data)
await self.module_logger.log_info(
info="Template updated in database",
properties={
"template_id": template_id,
"tenant_id": data.get("tenant_id"),
"region": data.get("region"),
"updated_fields": list(data.keys())
}
)
return {"success": True}
except Exception as e:
await self.module_logger.log_error(
error="Failed to update template in database",
properties={
"template_id": template_id,
"tenant_id": data.get("tenant_id"),
"region": data.get("region"),
"error": str(e)
}
)
raise
async def delete_tenant_template(self, tenant_id: str, template_id: str, region: int) -> dict:
"""delete tenant template"""
try:
template = await MessageTemplateDoc.find_one({
"template_id": template_id,
"tenant_id": tenant_id,
"region": region
})
if not template:
raise ValueError("Template not found")
await template.delete()
await self.module_logger.log_info(
info="Template deleted from database",
properties={"template_id": template_id}
)
return {"success": True}
except Exception as e:
await self.module_logger.log_error(
error="Failed to delete template from database",
properties={
"template_id": template_id,
"error": str(e)
}
)
raise
async def validate_template_parameters(self, template: MessageTemplateDoc, properties: dict) -> list:
"""validate template parameters"""
try:
# Normalize double-curly placeholders to single-curly for extraction
normalized_subject = self._normalize_placeholders(template.subject)
normalized_body = self._normalize_placeholders(template.body)
subject_placeholders = re.findall(r'\{(\w+)\}', normalized_subject)
body_placeholders = re.findall(r'\{(\w+)\}', normalized_body)
all_placeholders = list(set(subject_placeholders + body_placeholders))
missing_params = set(all_placeholders) - set(properties.keys())
if missing_params:
raise ValueError(f"Missing required parameters: {missing_params}. "
f"Template requires: {all_placeholders}")
extra_params = set(properties.keys()) - set(all_placeholders)
if extra_params:
await self.module_logger.log_warning(
f"Extra parameters provided: {extra_params}",
properties={
"template_id": template.template_id,
"tenant_id": template.tenant_id,
"extra_params": list(extra_params)
}
)
for param, value in properties.items():
if param in all_placeholders:
if value is None:
raise ValueError(f"Parameter '{param}' cannot be None")
if isinstance(value, str) and not value.strip():
await self.module_logger.log_warning(
f"Parameter '{param}' is empty string",
properties={
"template_id": template.template_id,
"parameter": param
}
)
return all_placeholders
except Exception as e:
await self.module_logger.log_error(
error="Template parameter validation failed",
properties={
"template_id": template.template_id,
"tenant_id": template.tenant_id,
"error": str(e)
}
)
raise
async def render_template(self, template: MessageTemplateDoc, properties: dict) -> dict:
"""render template"""
try:
# Build normalized copies for rendering
normalized_subject = self._normalize_placeholders(template.subject)
normalized_body = self._normalize_placeholders(template.body)
# Validate using normalized content
temp_for_validation = MessageTemplateDoc(
template_id=template.template_id,
tenant_id=template.tenant_id,
region=template.region,
subject=normalized_subject,
body=normalized_body,
is_active=template.is_active,
)
required_params = await self.validate_template_parameters(temp_for_validation, properties)
subject = normalized_subject.format(**properties)
body = normalized_body.format(**properties)
await self.module_logger.log_info(
info="Template rendered",
properties={
"template_id": template.template_id,
"tenant_id": template.tenant_id,
"region": template.region,
"required_params": required_params,
"provided_params": list(properties.keys())
}
)
return {
"subject": subject,
"body": body,
"required_params": required_params,
"template_id": template.template_id,
"region": template.region
}
except ValueError as e:
await self.module_logger.log_error(
error="Template validation failed",
properties={
"template_id": template.template_id,
"tenant_id": template.tenant_id,
"region": template.region,
"error": str(e)
}
)
raise
except KeyError as e:
await self.module_logger.log_error(
error="Missing template parameter",
properties={
"template_id": template.template_id,
"tenant_id": template.tenant_id,
"region": template.region,
"missing_parameter": str(e)
}
)
raise ValueError(f"Missing template parameter: {str(e)}")
except Exception as e:
await self.module_logger.log_error(
error="Template rendering error",
properties={
"template_id": template.template_id,
"tenant_id": template.tenant_id,
"region": template.region,
"error": str(e)
}
)
raise ValueError(f"Template rendering error: {str(e)}")