from typing import Optional, List, Tuple from fastapi.exceptions import RequestValidationError from backend.models.permission.models import RoleDoc, PermissionDoc, UserRoleDoc from beanie import PydanticObjectId from datetime import datetime class RoleHandler: def __init__(self): pass async def create_role(self, role_key: str, role_name: str, role_description: Optional[str], role_level: int, custom_role_id: Optional[str] = None) -> Optional[RoleDoc]: """Create a new role, ensuring role_key and role_name are unique and not empty""" if not role_key or not role_name: raise RequestValidationError("role_key and role_name are required.") if await RoleDoc.find_one({str(RoleDoc.role_key): role_key}) or await RoleDoc.find_one( {str(RoleDoc.role_name): role_name}): raise RequestValidationError("role_key or role_name has already been created.") # Check if custom_role_id is provided and if it already exists if custom_role_id: try: custom_id = PydanticObjectId(custom_role_id) if await RoleDoc.get(custom_id): raise RequestValidationError("Role with the provided custom_role_id already exists.") except Exception: raise RequestValidationError("Invalid custom_role_id format. Must be a valid ObjectId.") doc = RoleDoc( role_key=role_key, role_name=role_name, role_description=role_description, permission_ids=[], role_level=role_level, created_at=datetime.now(), updated_at=datetime.now() ) # Set custom ID if provided if custom_role_id: doc.id = PydanticObjectId(custom_role_id) await doc.insert() return doc async def update_role(self, role_id: PydanticObjectId, role_key: str, role_name: str, role_description: Optional[str], role_level: int, custom_role_id: Optional[str] = None) -> Optional[ RoleDoc]: """Update an existing role, ensuring role_key and role_name are unique and not empty""" if not role_id or not role_key or not role_name: raise RequestValidationError("role_id, role_key and role_name are required.") doc = await RoleDoc.get(role_id) if not doc: raise RequestValidationError("role not found.") #if doc.is_default: # raise RequestValidationError("Default role cannot be updated.") # Check for uniqueness (exclude self) conflict = await RoleDoc.find_one({ "$and": [ {"_id": {"$ne": role_id}}, {"$or": [ {str(RoleDoc.role_key): role_key}, {str(RoleDoc.role_name): role_name} ]} ] }) if conflict: raise RequestValidationError("role_key or role_name already exists.") doc.role_key = role_key doc.role_name = role_name doc.role_description = role_description doc.role_level = role_level doc.updated_at = datetime.now() # Set custom role ID if provided if custom_role_id: # Store the old ID for cleanup old_id = doc.id doc.id = PydanticObjectId(custom_role_id) await doc.save() # Delete the old document with the original ID try: old_doc = await RoleDoc.get(old_id) if old_doc: await old_doc.delete() except Exception as e: # Log the error but don't fail the operation print(f"Warning: Failed to delete old role document {old_id}: {e}") else: await doc.save() return doc async def query_roles(self, role_key: Optional[str], role_name: Optional[str], skip: int = 0, limit: int = 10) -> \ Tuple[List[RoleDoc], int]: """Query roles with pagination and fuzzy search by role_key and role_name""" query = {} if role_key: query[str(RoleDoc.role_key)] = {"$regex": role_key, "$options": "i"} if role_name: query[str(RoleDoc.role_name)] = {"$regex": role_name, "$options": "i"} cursor = RoleDoc.find(query) total = await cursor.count() docs = await cursor.skip(skip).limit(limit).to_list() return docs, total async def assign_permissions_to_role(self, role_id: PydanticObjectId, permission_ids: List[str]) -> Optional[RoleDoc]: """Assign permissions to a role by updating the permission_ids field""" if not role_id or not permission_ids: raise RequestValidationError("role_id and permission_ids are required.") doc = await RoleDoc.get(role_id) if not doc: raise RequestValidationError("Role not found.") # Validate that all permission_ids exist in the permission collection for permission_id in permission_ids: permission_doc = await PermissionDoc.get(PydanticObjectId(permission_id)) if not permission_doc: raise RequestValidationError(f"Permission with id {permission_id} not found.") # Remove duplicates from permission_ids unique_permission_ids = list(dict.fromkeys(permission_ids)) doc.permission_ids = unique_permission_ids doc.updated_at = datetime.now() await doc.save() return doc async def delete_role(self, role_id: PydanticObjectId) -> None: """Delete a role document after checking if it is referenced by any user and is not default""" if not role_id: raise RequestValidationError("role_id is required.") # Check if any user references this role user_role = await UserRoleDoc.find_one({"role_ids": str(role_id)}) if user_role: raise RequestValidationError("Role is referenced by a user and cannot be deleted.") doc = await RoleDoc.get(role_id) if not doc: raise RequestValidationError("Role not found.") # Check if the role is default #if doc.is_default: # raise RequestValidationError("Default role cannot be deleted.") await doc.delete()