freeleaps-service-hub/apps/authentication/backend/infra/permission/role_handler.py

156 lines
6.7 KiB
Python

from typing import Optional, List, Tuple
from fastapi.exceptions import RequestValidationError
from backend.models.permission.models import RoleDoc, PermissionDoc, UserRoleDoc
from beanie import PydanticObjectId
from datetime import datetime
class RoleHandler:
def __init__(self):
pass
async def create_role(self, role_key: str, role_name: str, role_description: Optional[str], role_level: int, custom_role_id: Optional[str] = None) -> Optional[RoleDoc]:
"""Create a new role, ensuring role_key and role_name are unique and not empty"""
if not role_key or not role_name:
raise RequestValidationError("role_key and role_name are required.")
if await RoleDoc.find_one({str(RoleDoc.role_key): role_key}) or await RoleDoc.find_one(
{str(RoleDoc.role_name): role_name}):
raise RequestValidationError("role_key or role_name has already been created.")
# Check if custom_role_id is provided and if it already exists
if custom_role_id:
try:
custom_id = PydanticObjectId(custom_role_id)
if await RoleDoc.get(custom_id):
raise RequestValidationError("Role with the provided custom_role_id already exists.")
except Exception:
raise RequestValidationError("Invalid custom_role_id format. Must be a valid ObjectId.")
doc = RoleDoc(
role_key=role_key,
role_name=role_name,
role_description=role_description,
permission_ids=[],
role_level=role_level,
created_at=datetime.now(),
updated_at=datetime.now()
)
# Set custom ID if provided
if custom_role_id:
doc.id = PydanticObjectId(custom_role_id)
await doc.insert()
return doc
async def update_role(self, role_id: PydanticObjectId, role_key: str, role_name: str,
role_description: Optional[str], role_level: int, custom_role_id: Optional[str] = None) -> Optional[
RoleDoc]:
"""Update an existing role, ensuring role_key and role_name are unique and not empty"""
if not role_id or not role_key or not role_name:
raise RequestValidationError("role_id, role_key and role_name are required.")
doc = await RoleDoc.get(role_id)
if not doc:
raise RequestValidationError("role not found.")
#if doc.is_default:
# raise RequestValidationError("Default role cannot be updated.")
# Check for uniqueness (exclude self)
conflict = await RoleDoc.find_one({
"$and": [
{"_id": {"$ne": role_id}},
{"$or": [
{str(RoleDoc.role_key): role_key},
{str(RoleDoc.role_name): role_name}
]}
]
})
if conflict:
raise RequestValidationError("role_key or role_name already exists.")
doc.role_key = role_key
doc.role_name = role_name
doc.role_description = role_description
doc.role_level = role_level
doc.updated_at = datetime.now()
# Set custom role ID if provided
if custom_role_id:
# Store the old ID for cleanup
old_id = doc.id
doc.id = PydanticObjectId(custom_role_id)
await doc.save()
# Delete the old document with the original ID
try:
old_doc = await RoleDoc.get(old_id)
if (str(old_id) != custom_role_id) and old_doc:
await old_doc.delete()
except Exception as e:
# Log the error but don't fail the operation
print(f"Warning: Failed to delete old role document {old_id}: {e}")
else:
await doc.save()
return doc
async def query_roles(self, role_key: Optional[str], role_name: Optional[str], skip: int = 0, limit: int = 10) -> \
Tuple[List[RoleDoc], int]:
"""Query roles with pagination and fuzzy search by role_key and role_name"""
query = {}
if role_key:
query[str(RoleDoc.role_key)] = {"$regex": role_key, "$options": "i"}
if role_name:
query[str(RoleDoc.role_name)] = {"$regex": role_name, "$options": "i"}
cursor = RoleDoc.find(query)
total = await cursor.count()
docs = await cursor.skip(skip).limit(limit).to_list()
return docs, total
async def query_roles_by_id(self, role_id: PydanticObjectId) -> Optional[RoleDoc]:
"""Query a role by its ID"""
if not role_id:
raise RequestValidationError("role_id is required.")
doc = await RoleDoc.get(role_id)
if not doc:
raise RequestValidationError("Role not found.")
return doc
async def assign_permissions_to_role(self, role_id: PydanticObjectId, permission_ids: List[str]) -> Optional[RoleDoc]:
"""Assign permissions to a role by updating the permission_ids field"""
if not role_id or not permission_ids:
raise RequestValidationError("role_id and permission_ids are required.")
doc = await RoleDoc.get(role_id)
if not doc:
raise RequestValidationError(f"Role not found. {role_id}")
# Validate that all permission_ids exist in the permission collection
for permission_id in permission_ids:
permission_doc = await PermissionDoc.get(PydanticObjectId(permission_id))
if not permission_doc:
raise RequestValidationError(f"Permission with id {permission_id} not found.")
# Remove duplicates from permission_ids
unique_permission_ids = list(dict.fromkeys(permission_ids))
doc.permission_ids = unique_permission_ids
doc.updated_at = datetime.now()
await doc.save()
return doc
async def delete_role(self, role_id: PydanticObjectId) -> None:
"""Delete a role document after checking if it is referenced by any user and is not default"""
if not role_id:
raise RequestValidationError("role_id is required.")
# Check if any user references this role
user_role = await UserRoleDoc.find_one({"role_ids": str(role_id)})
if user_role:
raise RequestValidationError("Role is referenced by a user and cannot be deleted.")
doc = await RoleDoc.get(role_id)
if not doc:
raise RequestValidationError("Role not found.")
# Check if the role is default
#if doc.is_default:
# raise RequestValidationError("Default role cannot be deleted.")
await doc.delete()