freeleaps-service-hub/apps/authentication/backend/services/permission/role_service.py
2025-09-26 17:18:44 +08:00

56 lines
2.9 KiB
Python

from typing import Optional, Dict, Any, List
from fastapi.exceptions import RequestValidationError
from backend.infra.permission.role_handler import RoleHandler
from backend.models.permission.models import RoleDoc
from beanie import PydanticObjectId
class RoleService:
def __init__(self):
self.role_handler = RoleHandler()
async def create_role(self, role_key: str, role_name: str, role_description: Optional[str], role_level: int) -> RoleDoc:
"""Create a new role, ensuring role_key and role_name are unique and not empty"""
doc = await self.role_handler.create_role(role_key, role_name, role_description, role_level)
return doc
async def update_role(self, role_id: str, role_key: str, role_name: str, role_description: Optional[str], role_level: int) -> RoleDoc:
"""Update an existing role, ensuring role_key and role_name are unique and not empty"""
doc = await self.role_handler.update_role(PydanticObjectId(role_id), role_key, role_name, role_description, role_level)
return doc
async def create_or_update_role(self, role_key: str, role_name: str, role_level: int, custom_role_id: Optional[str], role_description: Optional[str] = None) -> RoleDoc:
"""Create or update a role document"""
return await self.role_handler.create_or_update_role(role_key, role_name, role_level, custom_role_id, role_description)
async def query_roles(self, role_key: Optional[str], role_name: Optional[str], page: int = 1, page_size: int = 10) -> Dict[str, Any]:
"""Query roles with pagination and fuzzy search by role_key and role_name"""
if page < 1 or page_size < 1:
raise RequestValidationError("page and page_size must be positive integers.")
skip = (page - 1) * page_size
docs, total = await self.role_handler.query_roles(role_key, role_name, skip, page_size)
return {
"items": [doc.dict() for doc in docs],
"total": total,
"page": page,
"page_size": page_size
}
async def query_roles_no_pagination(self, role_id: Optional[str] = None, role_key: Optional[str] = None, role_name: Optional[str] = None) -> Dict[str, Any]:
"""Query roles fuzzy search without pagination"""
docs, total = await self.role_handler.query_roles_no_pagination(role_id, role_key, role_name)
return {
"items": [doc.dict() for doc in docs],
"total": total
}
async def assign_permissions_to_role(self, role_id: str, permission_ids: List[str]) -> RoleDoc:
"""Assign permissions to a role by updating the permission_ids field"""
return await self.role_handler.assign_permissions_to_role(PydanticObjectId(role_id), permission_ids)
async def delete_role(self, role_id: str) -> None:
"""Delete a role document after checking if it is referenced by any user"""
return await self.role_handler.delete_role(PydanticObjectId(role_id))