freeleaps-service-hub/apps/authentication/backend/infra/permission/role_handler.py
2025-09-26 17:18:44 +08:00

196 lines
8.2 KiB
Python

import re
from datetime import datetime
from typing import Optional, List, Tuple

from beanie import PydanticObjectId
from fastapi.exceptions import RequestValidationError

from backend.models.permission.models import RoleDoc, PermissionDoc, UserRoleDoc
class RoleHandler:
    """Persistence helper for role documents.

    Wraps the ``RoleDoc`` collection with create / update / query / delete
    operations and the validation that goes with them. Every validation
    failure is reported by raising ``RequestValidationError``.
    """

    def __init__(self):
        pass

    @staticmethod
    def _contains_filter(text: str) -> dict:
        """Return a case-insensitive ``$regex`` filter matching ``text`` literally.

        The text is escaped so that regex metacharacters supplied by the
        caller are searched for as plain characters instead of being
        interpreted as a pattern (an unbalanced ``(`` would otherwise be an
        invalid pattern).
        """
        return {"$regex": re.escape(text), "$options": "i"}

    @staticmethod
    def _to_object_id(value: str, field_name: str) -> PydanticObjectId:
        """Convert ``value`` to a ``PydanticObjectId``.

        Raises:
            RequestValidationError: if ``value`` cannot be converted.
        """
        try:
            return PydanticObjectId(value)
        except Exception:
            # Broad on purpose: the conversion error type belongs to the
            # underlying ObjectId implementation, which this module does not
            # import. Same pattern the file already used for role_id.
            raise RequestValidationError(
                f"Invalid {field_name} format. Must be a valid ObjectId."
            ) from None

    async def create_role(self, role_key: str, role_name: str, role_description: Optional[str],
                          role_level: int) -> Optional[RoleDoc]:
        """Create a new role, ensuring role_key and role_name are unique and not empty.

        Returns:
            The inserted ``RoleDoc`` (created with an empty ``permission_ids`` list).

        Raises:
            RequestValidationError: if ``role_key`` or ``role_name`` is empty,
                or if another role already uses either value.
        """
        if not role_key or not role_name:
            raise RequestValidationError("role_key and role_name are required.")

        # One lookup covers both uniqueness constraints (same form as update_role).
        duplicate = await RoleDoc.find_one({
            "$or": [
                {str(RoleDoc.role_key): role_key},
                {str(RoleDoc.role_name): role_name},
            ]
        })
        if duplicate:
            raise RequestValidationError("role_key or role_name has already been created.")

        # Single timestamp so created_at and updated_at are equal on a new doc.
        now = datetime.now()
        doc = RoleDoc(
            role_key=role_key,
            role_name=role_name,
            role_description=role_description,
            permission_ids=[],
            role_level=role_level,
            created_at=now,
            updated_at=now,
        )
        await doc.insert()
        return doc

    async def update_role(self, role_id: PydanticObjectId, role_key: str, role_name: str,
                          role_description: Optional[str], role_level: int) -> Optional[RoleDoc]:
        """Update an existing role, ensuring role_key and role_name are unique and not empty.

        Returns:
            The saved ``RoleDoc`` with the new field values.

        Raises:
            RequestValidationError: if a required argument is empty, the role
                does not exist, the role has ``is_default`` set, or another
                role already uses ``role_key`` or ``role_name``.
        """
        if not role_id or not role_key or not role_name:
            raise RequestValidationError("role_id, role_key and role_name are required.")

        doc = await RoleDoc.get(role_id)
        if not doc:
            raise RequestValidationError("role not found.")
        if doc.is_default:
            raise RequestValidationError("Default role cannot be updated.")

        # Check for uniqueness (exclude self)
        conflict = await RoleDoc.find_one({
            "$and": [
                {"_id": {"$ne": role_id}},
                {"$or": [
                    {str(RoleDoc.role_key): role_key},
                    {str(RoleDoc.role_name): role_name},
                ]},
            ]
        })
        if conflict:
            raise RequestValidationError("role_key or role_name already exists.")

        doc.role_key = role_key
        doc.role_name = role_name
        doc.role_description = role_description
        doc.role_level = role_level
        doc.updated_at = datetime.now()
        await doc.save()
        return doc

    async def create_or_update_role(self, role_key: str, role_name: str, role_level: int,
                                    custom_role_id: Optional[str],
                                    role_description: Optional[str] = None) -> Optional[RoleDoc]:
        """Create or update a role document, looked up by ``role_key``.

        * A role with this key exists and ``custom_role_id`` is empty or equal
          to its id: the existing document is updated in place.
        * A role with this key exists and ``custom_role_id`` is a different
          id: a new document is inserted under that id and the old one is
          deleted.
        * No role with this key exists: a new document is inserted, under
          ``custom_role_id`` when one is given.

        Returns:
            The document that was saved or inserted.

        Raises:
            RequestValidationError: if ``role_key`` or ``role_name`` is empty,
                ``custom_role_id`` is not a valid ObjectId, or a document with
                ``custom_role_id`` already exists.
        """
        # Input validation
        if not role_key or not role_name:
            raise RequestValidationError("role_key and role_name are required.")

        # Parse the id once, up front, so a malformed value is reported as a
        # validation error rather than escaping from a later lookup.
        target_id = self._to_object_id(custom_role_id, "custom_role_id") if custom_role_id else None

        # Check if role with this key already exists
        existing_doc = await RoleDoc.find_one({str(RoleDoc.role_key): role_key})
        now = datetime.now()

        # Same ID or no ID provided - update existing document.
        # Ids are compared as ObjectIds, so the textual case of the hex
        # string supplied by the caller does not matter.
        if existing_doc and (target_id is None or target_id == existing_doc.id):
            existing_doc.role_key = role_key
            existing_doc.role_name = role_name
            existing_doc.role_description = role_description
            existing_doc.role_level = role_level
            existing_doc.updated_at = now
            await existing_doc.save()
            return existing_doc

        # Either no document has this key, or it must move to a different id.
        new_doc = RoleDoc(
            role_key=role_key,
            role_name=role_name,
            role_description=role_description,
            role_level=role_level,
            permission_ids=[],
            created_at=now,
            updated_at=now,
        )
        if target_id is not None:
            if await RoleDoc.get(target_id):
                raise RequestValidationError("Role with the provided ID already exists.")
            new_doc.id = target_id
        await new_doc.insert()

        if existing_doc:
            # NOTE(review): the replacement starts with empty permission_ids,
            # so permissions held by the old document are not carried over,
            # and the insert happens while the old document still holds the
            # same role_key (would fail under a unique index) - confirm both
            # are intended.
            await existing_doc.delete()
        return new_doc

    async def query_roles(self, role_key: Optional[str], role_name: Optional[str], skip: int = 0,
                          limit: int = 10) -> Tuple[List[RoleDoc], int]:
        """Query roles with pagination and fuzzy search by role_key and role_name.

        Each non-empty filter is a case-insensitive substring match; the
        text is matched literally.

        Returns:
            ``(docs, total)`` where ``docs`` is the requested page and
            ``total`` is the number of matching roles before ``skip`` and
            ``limit`` are applied.
        """
        query = {}
        if role_key:
            query[str(RoleDoc.role_key)] = self._contains_filter(role_key)
        if role_name:
            query[str(RoleDoc.role_name)] = self._contains_filter(role_name)

        cursor = RoleDoc.find(query)
        # Count first, while no skip/limit has been applied to the query.
        total = await cursor.count()
        docs = await cursor.skip(skip).limit(limit).to_list()
        return docs, total

    async def query_roles_no_pagination(
            self,
            role_id: Optional[str] = None,
            role_key: Optional[str] = None,
            role_name: Optional[str] = None
    ) -> Tuple[List[RoleDoc], int]:
        """Query roles fuzzy search without pagination.

        ``role_id`` is an exact match; ``role_key`` and ``role_name`` are
        case-insensitive substring matches on the literal text.

        Returns:
            ``(docs, total)`` where ``total`` is ``len(docs)``.

        Raises:
            RequestValidationError: if ``role_id`` is not a valid ObjectId.
        """
        query = {}
        if role_id:
            query[str(RoleDoc.id)] = self._to_object_id(role_id, "role_id")
        if role_key:
            query[str(RoleDoc.role_key)] = self._contains_filter(role_key)
        if role_name:
            query[str(RoleDoc.role_name)] = self._contains_filter(role_name)

        # Everything is fetched, so the total is the list length; a separate
        # count query would only add a round trip.
        docs = await RoleDoc.find(query).to_list()
        return docs, len(docs)

    async def assign_permissions_to_role(self, role_id: PydanticObjectId,
                                         permission_ids: List[str]) -> Optional[RoleDoc]:
        """Assign permissions to a role by updating the permission_ids field.

        The role's ``permission_ids`` is replaced by the given ids with
        duplicates removed (first occurrence kept, order preserved).

        Returns:
            The saved ``RoleDoc``.

        Raises:
            RequestValidationError: if ``role_id`` or ``permission_ids`` is
                empty, the role does not exist, an id is not a valid
                ObjectId, or an id has no matching ``PermissionDoc``.
        """
        if not role_id or not permission_ids:
            raise RequestValidationError("role_id and permission_ids are required.")

        doc = await RoleDoc.get(role_id)
        if not doc:
            raise RequestValidationError("Role not found.")

        # Remove duplicates first so each permission is looked up only once.
        unique_permission_ids = list(dict.fromkeys(permission_ids))

        # Validate that all permission_ids exist in the permission collection
        for permission_id in unique_permission_ids:
            object_id = self._to_object_id(permission_id, "permission_id")
            if not await PermissionDoc.get(object_id):
                raise RequestValidationError(f"Permission with id {permission_id} not found.")

        doc.permission_ids = unique_permission_ids
        doc.updated_at = datetime.now()
        await doc.save()
        return doc

    async def delete_role(self, role_id: PydanticObjectId) -> None:
        """Delete a role document after checking if it is referenced by any user and is not default.

        Raises:
            RequestValidationError: if ``role_id`` is empty, a ``UserRoleDoc``
                lists the role in ``role_ids``, the role does not exist, or
                the role has ``is_default`` set.
        """
        if not role_id:
            raise RequestValidationError("role_id is required.")

        # Check if any user references this role (looked up by its string form)
        user_role = await UserRoleDoc.find_one({"role_ids": str(role_id)})
        if user_role:
            raise RequestValidationError("Role is referenced by a user and cannot be deleted.")

        doc = await RoleDoc.get(role_id)
        if not doc:
            raise RequestValidationError("Role not found.")

        # Check if the role is default
        if doc.is_default:
            raise RequestValidationError("Default role cannot be deleted.")

        await doc.delete()