66 lines
2.8 KiB
Python
66 lines
2.8 KiB
Python
from typing import Optional, List
|
|
from fastapi.exceptions import RequestValidationError
|
|
from backend.models.permission.models import RoleDoc, UserRoleDoc, PermissionDoc
|
|
from beanie import PydanticObjectId
|
|
|
|
|
|
class UserRoleHandler:
    """Assign roles to users and resolve a user's roles and permissions.

    Role assignments are stored in ``UserRoleDoc`` (one document per
    ``user_id`` holding a list of role ids); roles and permissions are read
    from ``RoleDoc`` and ``PermissionDoc``.
    """

    def __init__(self):
        # The handler is stateless; all state lives in the document models.
        pass

    async def assign_roles_to_user(
        self, user_id: str, role_ids: List[str]
    ) -> Optional[UserRoleDoc]:
        """Replace the roles assigned to a user, creating the record if needed.

        Args:
            user_id: Identifier of the user whose roles are being set.
            role_ids: Ids of the roles to assign. Duplicates are dropped while
                keeping the first-seen order.

        Returns:
            The updated or newly inserted ``UserRoleDoc``.

        Raises:
            RequestValidationError: If ``user_id`` or ``role_ids`` is empty,
                if a role id is not a well-formed ObjectId, or if a role id
                does not match any ``RoleDoc``.
        """
        if not user_id or not role_ids:
            raise RequestValidationError("user_id and role_ids are required.")

        # Deduplicate before validating so each role is checked only once.
        unique_role_ids = list(dict.fromkeys(role_ids))

        # Reject malformed ids explicitly: building a PydanticObjectId from a
        # bad string raises bson's InvalidId, which would otherwise escape as
        # an unhandled error rather than a validation error.
        for role_id in unique_role_ids:
            if not PydanticObjectId.is_valid(role_id):
                raise RequestValidationError(
                    f"Role with id {role_id} is not a valid ObjectId."
                )

        # Validate existence with a single $in query instead of one query per
        # role (same lookup style as get_role_and_permission_by_user_id).
        role_object_ids = [PydanticObjectId(role_id) for role_id in unique_role_ids]
        existing_roles = await RoleDoc.find(
            {"_id": {"$in": role_object_ids}}
        ).to_list()
        existing_object_ids = {role.id for role in existing_roles}

        # Report the first missing role in the caller's original order.
        for role_id, object_id in zip(unique_role_ids, role_object_ids):
            if object_id not in existing_object_ids:
                raise RequestValidationError(f"Role with id {role_id} not found.")

        # Update the user's existing assignment if there is one ...
        existing_user_role = await UserRoleDoc.find_one(UserRoleDoc.user_id == user_id)
        if existing_user_role:
            existing_user_role.role_ids = unique_role_ids
            await existing_user_role.save()
            return existing_user_role

        # ... otherwise create a fresh assignment document.
        user_role_doc = UserRoleDoc(
            user_id=user_id,
            role_ids=unique_role_ids,
        )
        await user_role_doc.insert()
        return user_role_doc

    async def get_role_and_permission_by_user_id(
        self, user_id: str
    ) -> tuple[list[str], list[str]]:
        """Return the role names and permission keys granted to a user.

        Args:
            user_id: Identifier of the user to look up.

        Returns:
            A ``(role_names, permission_keys)`` tuple. Both lists are empty
            when the user has no ``UserRoleDoc`` or no role ids assigned.
        """
        user_role_doc = await UserRoleDoc.find_one(UserRoleDoc.user_id == user_id)
        if not user_role_doc or not user_role_doc.role_ids:
            # No roles assigned.
            return [], []

        # Fetch every assigned role in one query.
        role_object_ids = [PydanticObjectId(rid) for rid in user_role_doc.role_ids]
        roles = await RoleDoc.find({"_id": {"$in": role_object_ids}}).to_list()
        role_names = [role.role_name for role in roles]

        # Gather permission ids across all roles; roles with no permissions
        # (empty or None) contribute nothing.
        all_permission_ids = []
        for role in roles:
            if role.permission_ids:
                all_permission_ids.extend(role.permission_ids)

        # Order-preserving dedup so a permission shared by several roles is
        # only queried once.
        unique_permission_ids = list(dict.fromkeys(all_permission_ids))
        if not unique_permission_ids:
            return role_names, []

        permission_object_ids = [PydanticObjectId(pid) for pid in unique_permission_ids]
        permissions = await PermissionDoc.find(
            {"_id": {"$in": permission_object_ids}}
        ).to_list()
        permission_keys = [perm.permission_key for perm in permissions]
        return role_names, permission_keys
|