# freeleaps-service-hub/apps/authentication/backend/infra/permission/user_role_handler.py

# 66 lines
# 2.8 KiB
# Python

from typing import Optional, List
from fastapi.exceptions import RequestValidationError
from backend.models.permission.models import RoleDoc, UserRoleDoc, PermissionDoc
from beanie import PydanticObjectId
class UserRoleHandler:
    """Data-access helper for user/role assignments.

    Wraps the ``UserRoleDoc``, ``RoleDoc`` and ``PermissionDoc`` documents to
    assign roles to a user and to resolve a user's role names and permission
    keys.
    """

    def __init__(self):
        pass

    async def assign_roles_to_user(self, user_id: str, role_ids: List[str]) -> Optional[UserRoleDoc]:
        """Assign roles to a user by updating or creating the UserRoleDoc.

        The user's stored ``role_ids`` are replaced (not merged) with the
        de-duplicated ``role_ids`` given here, keeping first-seen order.

        Args:
            user_id: Identifier of the user receiving the roles.
            role_ids: String ids of the roles to assign; each must be a
                well-formed ObjectId that resolves to an existing ``RoleDoc``.

        Returns:
            The updated or newly inserted ``UserRoleDoc``.

        Raises:
            RequestValidationError: If ``user_id`` or ``role_ids`` is empty,
                if a role id is malformed, or if a role id matches no role.
        """
        # NOTE(review): RequestValidationError is raised with a plain string
        # here, as the original code did; confirm the registered exception
        # handler copes with a non-list payload.
        if not user_id or not role_ids:
            raise RequestValidationError("user_id and role_ids are required.")

        # De-duplicate before validating so each distinct role costs at most
        # one database lookup; dict.fromkeys keeps first-seen order.
        unique_role_ids = list(dict.fromkeys(role_ids))

        # Validate that every requested role exists in the role collection.
        for role_id in unique_role_ids:
            # Reject malformed ids explicitly: constructing an ObjectId from a
            # bad string raises bson's InvalidId, which would escape as an
            # unhandled error instead of a validation error.
            if not PydanticObjectId.is_valid(role_id):
                raise RequestValidationError(f"Role id {role_id} is not a valid ObjectId.")
            role_doc = await RoleDoc.get(PydanticObjectId(role_id))
            if not role_doc:
                raise RequestValidationError(f"Role with id {role_id} not found.")

        existing_user_role = await UserRoleDoc.find_one(UserRoleDoc.user_id == user_id)
        if existing_user_role:
            # The user already has an assignment document: overwrite its roles.
            existing_user_role.role_ids = unique_role_ids
            await existing_user_role.save()
            return existing_user_role

        # First assignment for this user: create the document.
        user_role_doc = UserRoleDoc(
            user_id=user_id,
            role_ids=unique_role_ids,
        )
        await user_role_doc.insert()
        return user_role_doc

    async def get_role_and_permission_by_user_id(self, user_id: str) -> tuple[list[str], list[str]]:
        """Get all role names and permission keys for a user by user_id.

        Args:
            user_id: Identifier of the user to look up.

        Returns:
            A ``(role_names, permission_keys)`` tuple. Both lists are empty
            when the user has no ``UserRoleDoc`` or no role ids assigned;
            ``permission_keys`` is empty when none of the user's roles
            reference any permission.
        """
        user_role_doc = await UserRoleDoc.find_one(UserRoleDoc.user_id == user_id)
        if not user_role_doc or not user_role_doc.role_ids:
            # No roles assigned.
            return [], []

        # Fetch every assigned role in a single query.
        role_object_ids = [PydanticObjectId(rid) for rid in user_role_doc.role_ids]
        roles = await RoleDoc.find({"_id": {"$in": role_object_ids}}).to_list()
        role_names = [role.role_name for role in roles]

        # Gather permission ids across all roles; roles may share permissions,
        # so de-duplicate while keeping first-seen order.
        all_permission_ids = []
        for role in roles:
            if role.permission_ids:
                all_permission_ids.extend(role.permission_ids)
        unique_permission_ids = list(dict.fromkeys(all_permission_ids))

        if not unique_permission_ids:
            # Nothing to resolve; skip the permission query entirely.
            return role_names, []

        permission_object_ids = [PydanticObjectId(pid) for pid in unique_permission_ids]
        permissions = await PermissionDoc.find({"_id": {"$in": permission_object_ids}}).to_list()
        permission_keys = [perm.permission_key for perm in permissions]
        return role_names, permission_keys