161 lines
6.1 KiB
Python
161 lines
6.1 KiB
Python
from fastapi import APIRouter, HTTPException, Depends
|
|
import traceback
|
|
from fastapi.responses import JSONResponse
|
|
from starlette.status import HTTP_500_INTERNAL_SERVER_ERROR
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from backend.application.email_sender_hub import EmailSenderHub
|
|
from common.token.token_manager import TokenManager
|
|
from pydantic import BaseModel
|
|
from typing import List
|
|
|
|
# Router on which every endpoint in this module is registered.
router = APIRouter()

# Bearer-token security scheme; the auth dependencies below receive the
# parsed credentials from it.
security = HTTPBearer()

# Used by the auth dependencies to decode bearer tokens into payloads.
token_manager = TokenManager()

# Application-layer object that the endpoints delegate get/set/update/delete
# of a tenant's email sender to.
email_sender_hub = EmailSenderHub()
|
|
|
|
# Define the request body schema
|
|
class EmailSenderRequest(BaseModel):
    # Request body for the set/update endpoints.
    # NOTE: documented with comments rather than a docstring on purpose, so
    # the generated schema for this model is left untouched.

    # Sender address to store for the calling tenant. Only typed as ``str``
    # here; no format validation happens at this layer.
    email_sender: str
|
|
|
|
# check credentials for admin and tenant
|
|
def admin_only(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """FastAPI dependency that only lets administrator tokens through.

    Two token layouts are accepted:

    * tokens issued by the Authentication service, where the role lives at
      ``payload["subject"]["role"]``;
    * locally generated tokens, where the role lives at ``payload["role"]``.

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.

    Returns:
        The decoded token payload.

    Raises:
        HTTPException: 403 when the token decodes but the role is not an
            administrator role; 401 when the token cannot be decoded or its
            payload cannot be inspected.
    """
    try:
        payload = token_manager.decode_token(credentials.credentials)

        if "subject" in payload:
            # Authentication service format: payload.subject.role
            role = payload["subject"].get("role")
        else:
            # Locally generated format: payload.role
            role = payload.get("role")

        # According to the AdministrativeRole enum defined in the
        # authentication service, ADMINISTRATOR = 8.
        if role not in ("admin", 8):
            raise HTTPException(status_code=403, detail="Admin access required")
        return payload
    except HTTPException:
        # Let the deliberate 403 above reach the client; without this clause
        # the generic handler below would rewrite it into a 401.
        raise
    except Exception:
        # Any decoding failure or malformed payload is reported as a bad token.
        raise HTTPException(status_code=401, detail="Invalid token")
|
|
|
|
def tenant_only(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """FastAPI dependency that only lets tenant tokens through.

    Two token layouts are accepted:

    * tokens issued by the Authentication service, where ``id`` and ``role``
      live under ``payload["subject"]``;
    * locally generated tokens, where ``tenant_id``, ``id`` and ``role`` are
      top-level keys.

    On success the payload is normalised in place so downstream handlers can
    always read ``tenant_id``, ``user_id`` and ``is_admin`` (always ``False``).

    Args:
        credentials: Bearer credentials extracted by the ``security`` scheme.

    Returns:
        The decoded token payload, with the normalised keys added.

    Raises:
        HTTPException: 403 when the token decodes but carries no tenant id or
            a non-tenant role; 401 when the token cannot be decoded or its
            payload cannot be inspected.
    """
    try:
        payload = token_manager.decode_token(credentials.credentials)

        if "subject" in payload:
            # Authentication service format.
            subject = payload["subject"]
            # NOTE(review): tenant id and user id are both read from
            # subject.id, exactly as before -- confirm this is intended.
            tenant_id = subject.get("id")
            user_id = subject.get("id")
            role = subject.get("role")
        else:
            # Locally generated format.
            tenant_id = payload.get("tenant_id")
            user_id = payload.get("id")
            role = payload.get("role")

        if not tenant_id:
            raise HTTPException(status_code=403, detail="Tenant access required")

        # According to the AdministrativeRole enum defined in the
        # authentication service, BUSINESS = 2.
        if role not in (2, "tenant"):
            raise HTTPException(status_code=403, detail="Tenant access required")

        payload["tenant_id"] = tenant_id
        payload["user_id"] = user_id
        payload["is_admin"] = False
        return payload
    except HTTPException:
        # Let the deliberate 403s above reach the client; without this clause
        # the generic handler below would rewrite them into a 401.
        raise
    except Exception:
        # Any decoding failure or malformed payload is reported as a bad token.
        raise HTTPException(status_code=401, detail="Invalid token")
|
|
|
|
# Web API
|
|
# Get email sender for tenant
|
|
@router.get(
    "/email_sender/get",
    dependencies=[Depends(tenant_only)],
    operation_id="get_email_sender",
    summary="Get email sender for tenant",
    description="Retrieve the email sender configured for the current tenant",
    response_description="email sender address",
)
async def get_email_sender(payload: dict = Depends(tenant_only)):
    """Return the email sender stored for the calling tenant.

    The tenant is identified by the ``tenant_id`` key that ``tenant_only``
    places in the token payload.

    Responds with 200 and ``{"success": True, "email_sender": ...}``.
    A ``ValueError`` raised while handling the request becomes a 400 carrying
    its message; any other exception becomes a generic 500.
    """
    try:
        sender = await email_sender_hub.get_email_sender(payload.get("tenant_id"))
        body = {"success": True, "email_sender": sender}
        return JSONResponse(content=body, status_code=200)
    except ValueError as err:
        raise HTTPException(status_code=400, detail=str(err))
    except Exception:
        raise HTTPException(status_code=500, detail="Failed to get email sender")
|
|
|
|
# Set email sender for tenant
|
|
@router.post(
    "/email_sender/set",
    dependencies=[Depends(tenant_only)],
    operation_id="set_email_sender",
    summary="Set email sender for tenant",
    description="Set the email sender for the specified tenant (replaces existing)",
    response_description="Success/failure response in setting email sender",
)
async def set_email_sender(request: EmailSenderRequest, payload: dict = Depends(tenant_only)):
    """Set the email sender for the calling tenant.

    The tenant is identified by the ``tenant_id`` key that ``tenant_only``
    places in the token payload; the new value comes from the request body.

    Responds with 200 and whatever ``email_sender_hub.set_email_sender``
    returned as the JSON body.

    Raises:
        HTTPException: 400 with the error message when a ``ValueError`` is
            raised while handling the request; 500 with a generic message for
            any other failure.
    """
    try:
        tenant_id = payload.get("tenant_id")
        result = await email_sender_hub.set_email_sender(tenant_id, request.email_sender)
        return JSONResponse(content=result, status_code=200)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        # Keep the full details on the server side only: the traceback goes to
        # stderr, while the client gets a generic message so internal error
        # text is not exposed in the response.
        traceback.print_exc()
        raise HTTPException(status_code=500, detail="Failed to set email sender") from e
|
|
|
|
# Update email sender for tenant
|
|
@router.put(
    "/email_sender/update",
    dependencies=[Depends(tenant_only)],
    operation_id="update_email_sender",
    summary="Update email sender for tenant",
    description="Update the email sender for the specified tenant (only if exists)",
    response_description="Success/failure response in updating email sender",
)
async def update_email_sender(request: EmailSenderRequest, payload: dict = Depends(tenant_only)):
    """Update the email sender for the calling tenant.

    The tenant is identified by the ``tenant_id`` key that ``tenant_only``
    places in the token payload; the new value comes from the request body.

    Responds with 200 and whatever ``email_sender_hub.update_email_sender``
    returned as the JSON body.

    Raises:
        HTTPException: 400 with the error message when a ``ValueError`` is
            raised while handling the request; 500 with a generic message for
            any other failure.
    """
    try:
        tenant_id = payload.get("tenant_id")
        result = await email_sender_hub.update_email_sender(tenant_id, request.email_sender)
        return JSONResponse(content=result, status_code=200)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        # Keep the full details on the server side only: the traceback goes to
        # stderr, while the client gets a generic message so internal error
        # text is not exposed in the response.
        traceback.print_exc()
        raise HTTPException(status_code=500, detail="Failed to update email sender") from e
|
|
|
|
# Delete email sender configuration for tenant
|
|
@router.delete(
    "/email_sender/delete/{tenant_id}",
    dependencies=[Depends(admin_only)],
    operation_id="delete_email_sender",
    summary="Delete email sender configuration for tenant",
    description="Completely delete the email sender configuration for the specified tenant",
    response_description="Success/failure response in deleting email sender configuration",
)
async def delete_email_sender(tenant_id: str):
    """Delete the email sender configuration of the tenant named in the path.

    Access is gated by the ``admin_only`` dependency declared on the route.

    Responds with 200 and whatever ``email_sender_hub.delete_email_sender``
    returned as the JSON body; any exception becomes a generic 500.
    """
    try:
        outcome = await email_sender_hub.delete_email_sender(tenant_id)
        return JSONResponse(content=outcome, status_code=200)
    except Exception:
        raise HTTPException(status_code=500, detail="Failed to delete email sender")
|