from typing import Optional, List, Tuple from fastapi.exceptions import RequestValidationError from backend.models.permission.models import RoleDoc, PermissionDoc, UserRoleDoc from bson import ObjectId from datetime import datetime class RoleHandler: def __init__(self): pass async def create_role(self, role_key: str, role_name: str, role_description: Optional[str], role_level: int) -> Optional[RoleDoc]: """Create a new role, ensuring role_key and role_name are unique and not empty""" if not role_key or not role_name: raise RequestValidationError("role_key and role_name are required.") if await RoleDoc.find_one({str(RoleDoc.role_key): role_key}) or await RoleDoc.find_one( {str(RoleDoc.role_name): role_name}): raise RequestValidationError("role_key or role_name has already been created.") doc = RoleDoc( role_key=role_key, role_name=role_name, role_description=role_description, permission_ids=[], role_level=role_level, created_at=datetime.now(), updated_at=datetime.now() ) await doc.create() return doc async def update_role(self, role_id: str, role_key: str, role_name: str, role_description: Optional[str], role_level: int) -> Optional[ RoleDoc]: """Update an existing role, ensuring role_key and role_name are unique and not empty""" if not role_id or not role_key or not role_name: raise RequestValidationError("role_id, role_key and role_name are required.") doc = await RoleDoc.get(role_id) if not doc: raise RequestValidationError("role not found.") if doc.is_default: raise RequestValidationError("Default role cannot be updated.") # Check for uniqueness (exclude self) conflict = await RoleDoc.find_one({ "$and": [ {"_id": {"$ne": role_id}}, {"$or": [ {str(RoleDoc.role_key): role_key}, {str(RoleDoc.role_name): role_name} ]} ] }) if conflict: raise RequestValidationError("role_key or role_name already exists.") doc.role_key = role_key doc.role_name = role_name doc.role_description = role_description doc.role_level = role_level doc.updated_at = datetime.now() await doc.save() return doc async def create_or_update_role(self, role_key: str, role_name: str, role_level: int, custom_role_id: Optional[str], role_description: Optional[str] = None) -> Optional[RoleDoc]: """Create or update a role document""" # Input validation if not role_key or not role_name: raise RequestValidationError("role_key and role_name are required.") def create_new_doc(): return RoleDoc( role_key=role_key, role_name=role_name, role_description=role_description, role_level=role_level, permission_ids=[], created_at=datetime.now(), updated_at=datetime.now() ) def update_doc_fields(doc): doc.role_key = role_key doc.role_name = role_name doc.role_description = role_description doc.role_level = role_level doc.updated_at = datetime.now() # Check if role with this key already exists existing_doc = await RoleDoc.find_one( {str(RoleDoc.role_key): role_key} ) if existing_doc: # If role with this key already exists if custom_role_id and str(custom_role_id) != str(existing_doc.id): # Different ID provided - replace the document id_conflict = await RoleDoc.get(custom_role_id) if id_conflict: raise RequestValidationError("Role with the provided ID already exists.") new_doc = create_new_doc() new_doc.id = custom_role_id await new_doc.create() await existing_doc.delete() return new_doc else: # Same ID or no ID provided - update existing document update_doc_fields(existing_doc) await existing_doc.save() return existing_doc else: # If no existing document with this key, create new document new_doc = create_new_doc() if custom_role_id: id_conflict = await RoleDoc.get(custom_role_id) if id_conflict: raise RequestValidationError("Role with the provided ID already exists.") new_doc.id = custom_role_id await new_doc.insert() return new_doc async def query_roles(self, role_key: Optional[str], role_name: Optional[str], skip: int = 0, limit: int = 10) -> \ Tuple[List[RoleDoc], int]: """Query roles with pagination and fuzzy search by role_key and role_name""" query = {} if role_key: query["role_key"] = {"$regex": role_key, "$options": "i"} if role_name: query["role_name"] = {"$regex": role_name, "$options": "i"} cursor = RoleDoc.find(query) total = await cursor.count() docs = await cursor.skip(skip).limit(limit).to_list() return docs, total async def query_roles_no_pagination( self, role_id: Optional[str] = None, role_key: Optional[str] = None, role_name: Optional[str] = None ) -> Tuple[List[RoleDoc], int]: """Query roles fuzzy search without pagination""" query = {} if role_id: try: ObjectId(role_id) # Validate ObjectId format query["_id"] = role_id # Use MongoDB's _id field directly except Exception: raise RequestValidationError("Invalid role_id format. Must be a valid ObjectId.") if role_key: query["role_key"] = {"$regex": role_key, "$options": "i"} if role_name: query["role_name"] = {"$regex": role_name, "$options": "i"} cursor = RoleDoc.find(query) total = await cursor.count() docs = await cursor.to_list() return docs, total async def assign_permissions_to_role(self, role_id: str, permission_ids: List[str]) -> Optional[RoleDoc]: """Assign permissions to a role by updating the permission_ids field""" if not role_id or not permission_ids: raise RequestValidationError("role_id and permission_ids are required.") doc = await RoleDoc.get(role_id) if not doc: raise RequestValidationError("Role not found.") # Validate that all permission_ids exist in the permission collection for permission_id in permission_ids: permission_doc = await PermissionDoc.get(permission_id) if not permission_doc: raise RequestValidationError(f"Permission with id {permission_id} not found.") # Remove duplicates from permission_ids unique_permission_ids = list(dict.fromkeys(permission_ids)) doc.permission_ids = unique_permission_ids doc.updated_at = datetime.now() await doc.save() return doc async def delete_role(self, role_id: str) -> None: """Delete a role document after checking if it is referenced by any user and is not default""" if not role_id: raise RequestValidationError("role_id is required.") # Check if any user references this role user_role = await UserRoleDoc.find_one({"role_ids": str(role_id)}) if user_role: raise RequestValidationError("Role is referenced by a user and cannot be deleted.") doc = await RoleDoc.get(role_id) if not doc: raise RequestValidationError("Role not found.") # Check if the role is default if doc.is_default: raise RequestValidationError("Default role cannot be deleted.") await doc.delete()