from typing import Optional, List

from bson import ObjectId
from fastapi.exceptions import RequestValidationError

from backend.models.permission.models import RoleDoc, UserRoleDoc, PermissionDoc


class UserRoleHandler:
    """Assign roles to users and resolve a user's role names and permission keys."""

    def __init__(self):
        pass

    async def assign_roles_to_user(self, user_id: str, role_ids: List[str]) -> Optional[UserRoleDoc]:
        """Assign roles to a user by updating or creating the UserRoleDoc.

        The user's role list is replaced (not merged) with the de-duplicated
        ``role_ids``, keeping first-seen order.

        Args:
            user_id: Identifier of the user receiving the roles.
            role_ids: Identifiers of the roles to assign; duplicates are dropped.

        Returns:
            The saved ``UserRoleDoc`` (the existing one when the user already
            had a document, otherwise a newly inserted one).

        Raises:
            RequestValidationError: If ``user_id`` or ``role_ids`` is empty, or
                if any role id cannot be found via ``RoleDoc.get``.
        """
        if not user_id or not role_ids:
            raise RequestValidationError("user_id and role_ids are required.")

        # Deduplicate before validating so each distinct role is looked up only
        # once; dict.fromkeys keeps first-seen order, so the first missing id
        # reported is the same one a full scan of role_ids would hit.
        unique_role_ids = list(dict.fromkeys(role_ids))

        # Validate that every distinct role id exists in the role collection.
        for role_id in unique_role_ids:
            role_doc = await RoleDoc.get(role_id)
            if not role_doc:
                raise RequestValidationError(f"Role with id {role_id} not found.")

        # Update the user's existing document when there is one.
        existing_user_role = await UserRoleDoc.find_one(UserRoleDoc.user_id == user_id)
        if existing_user_role:
            existing_user_role.role_ids = unique_role_ids
            await existing_user_role.save()
            return existing_user_role

        # Otherwise create a new document for this user.
        user_role_doc = UserRoleDoc(user_id=user_id, role_ids=unique_role_ids)
        await user_role_doc.insert()
        return user_role_doc

    async def get_role_and_permission_by_user_id(self, user_id: str) -> tuple[list[str], list[str]]:
        """Get all role names and permission keys for a user by user_id.

        Args:
            user_id: Identifier of the user to look up.

        Returns:
            A ``(role_names, permission_keys)`` pair. Both lists are empty when
            the user has no ``UserRoleDoc`` or no role ids; ``permission_keys``
            is empty when none of the user's roles carries permission ids.
        """
        user_role_doc = await UserRoleDoc.find_one(UserRoleDoc.user_id == user_id)
        if not user_role_doc or not user_role_doc.role_ids:
            # No roles assigned.
            return [], []

        # NOTE(review): role_ids are passed to the raw `_id` filter exactly as
        # stored. If they are str while RoleDoc `_id` values are ObjectId this
        # filter would match nothing -- confirm against the model definitions.
        roles = await RoleDoc.find({"_id": {"$in": user_role_doc.role_ids}}).to_list()
        role_names = [role.role_name for role in roles]

        # Gather permission ids across all roles, skipping roles without any.
        all_permission_ids = []
        for role in roles:
            if role.permission_ids:
                all_permission_ids.extend(role.permission_ids)

        # Drop duplicate ids (first-seen order) so the query list stays minimal.
        unique_permission_ids = list(dict.fromkeys(all_permission_ids))
        if not unique_permission_ids:
            return role_names, []

        # NOTE(review): same `_id` type caveat as the role query above -- confirm.
        permissions = await PermissionDoc.find({"_id": {"$in": unique_permission_ids}}).to_list()
        permission_keys = [perm.permission_key for perm in permissions]
        return role_names, permission_keys