147 lines
6.3 KiB
Python
147 lines
6.3 KiB
Python
from typing import Optional, List, Tuple
|
|
|
|
from fastapi.exceptions import RequestValidationError
|
|
|
|
from backend.models.permission.models import RoleDoc, PermissionDoc, UserRoleDoc
|
|
from beanie import PydanticObjectId
|
|
from datetime import datetime
|
|
|
|
|
|
class RoleHandler:
|
|
def __init__(self):
|
|
pass
|
|
|
|
async def create_role(self, role_key: str, role_name: str, role_description: Optional[str], role_level: int, custom_role_id: Optional[str] = None) -> Optional[RoleDoc]:
|
|
"""Create a new role, ensuring role_key and role_name are unique and not empty"""
|
|
if not role_key or not role_name:
|
|
raise RequestValidationError("role_key and role_name are required.")
|
|
if await RoleDoc.find_one({str(RoleDoc.role_key): role_key}) or await RoleDoc.find_one(
|
|
{str(RoleDoc.role_name): role_name}):
|
|
raise RequestValidationError("role_key or role_name has already been created.")
|
|
|
|
# Check if custom_role_id is provided and if it already exists
|
|
if custom_role_id:
|
|
try:
|
|
custom_id = PydanticObjectId(custom_role_id)
|
|
if await RoleDoc.get(custom_id):
|
|
raise RequestValidationError("Role with the provided custom_role_id already exists.")
|
|
except Exception:
|
|
raise RequestValidationError("Invalid custom_role_id format. Must be a valid ObjectId.")
|
|
|
|
doc = RoleDoc(
|
|
role_key=role_key,
|
|
role_name=role_name,
|
|
role_description=role_description,
|
|
permission_ids=[],
|
|
role_level=role_level,
|
|
created_at=datetime.now(),
|
|
updated_at=datetime.now()
|
|
)
|
|
|
|
# Set custom ID if provided
|
|
if custom_role_id:
|
|
doc.id = PydanticObjectId(custom_role_id)
|
|
|
|
await doc.insert()
|
|
return doc
|
|
|
|
async def update_role(self, role_id: PydanticObjectId, role_key: str, role_name: str,
|
|
role_description: Optional[str], role_level: int, custom_role_id: Optional[str] = None) -> Optional[
|
|
RoleDoc]:
|
|
"""Update an existing role, ensuring role_key and role_name are unique and not empty"""
|
|
if not role_id or not role_key or not role_name:
|
|
raise RequestValidationError("role_id, role_key and role_name are required.")
|
|
doc = await RoleDoc.get(role_id)
|
|
if not doc:
|
|
raise RequestValidationError("role not found.")
|
|
#if doc.is_default:
|
|
# raise RequestValidationError("Default role cannot be updated.")
|
|
# Check for uniqueness (exclude self)
|
|
conflict = await RoleDoc.find_one({
|
|
"$and": [
|
|
{"_id": {"$ne": role_id}},
|
|
{"$or": [
|
|
{str(RoleDoc.role_key): role_key},
|
|
{str(RoleDoc.role_name): role_name}
|
|
]}
|
|
]
|
|
})
|
|
if conflict:
|
|
raise RequestValidationError("role_key or role_name already exists.")
|
|
doc.role_key = role_key
|
|
doc.role_name = role_name
|
|
doc.role_description = role_description
|
|
doc.role_level = role_level
|
|
doc.updated_at = datetime.now()
|
|
|
|
# Set custom role ID if provided
|
|
if custom_role_id:
|
|
# Store the old ID for cleanup
|
|
old_id = doc.id
|
|
doc.id = PydanticObjectId(custom_role_id)
|
|
await doc.save()
|
|
|
|
# Delete the old document with the original ID
|
|
try:
|
|
old_doc = await RoleDoc.get(old_id)
|
|
if old_doc:
|
|
await old_doc.delete()
|
|
except Exception as e:
|
|
# Log the error but don't fail the operation
|
|
print(f"Warning: Failed to delete old role document {old_id}: {e}")
|
|
else:
|
|
await doc.save()
|
|
|
|
return doc
|
|
|
|
async def query_roles(self, role_key: Optional[str], role_name: Optional[str], skip: int = 0, limit: int = 10) -> \
|
|
Tuple[List[RoleDoc], int]:
|
|
"""Query roles with pagination and fuzzy search by role_key and role_name"""
|
|
query = {}
|
|
if role_key:
|
|
query[str(RoleDoc.role_key)] = {"$regex": role_key, "$options": "i"}
|
|
if role_name:
|
|
query[str(RoleDoc.role_name)] = {"$regex": role_name, "$options": "i"}
|
|
cursor = RoleDoc.find(query)
|
|
total = await cursor.count()
|
|
docs = await cursor.skip(skip).limit(limit).to_list()
|
|
return docs, total
|
|
|
|
async def assign_permissions_to_role(self, role_id: PydanticObjectId, permission_ids: List[str]) -> Optional[RoleDoc]:
|
|
"""Assign permissions to a role by updating the permission_ids field"""
|
|
if not role_id or not permission_ids:
|
|
raise RequestValidationError("role_id and permission_ids are required.")
|
|
doc = await RoleDoc.get(role_id)
|
|
if not doc:
|
|
raise RequestValidationError("Role not found.")
|
|
|
|
# Validate that all permission_ids exist in the permission collection
|
|
for permission_id in permission_ids:
|
|
permission_doc = await PermissionDoc.get(PydanticObjectId(permission_id))
|
|
if not permission_doc:
|
|
raise RequestValidationError(f"Permission with id {permission_id} not found.")
|
|
|
|
# Remove duplicates from permission_ids
|
|
unique_permission_ids = list(dict.fromkeys(permission_ids))
|
|
|
|
doc.permission_ids = unique_permission_ids
|
|
doc.updated_at = datetime.now()
|
|
await doc.save()
|
|
return doc
|
|
|
|
async def delete_role(self, role_id: PydanticObjectId) -> None:
|
|
"""Delete a role document after checking if it is referenced by any user and is not default"""
|
|
if not role_id:
|
|
raise RequestValidationError("role_id is required.")
|
|
# Check if any user references this role
|
|
user_role = await UserRoleDoc.find_one({"role_ids": str(role_id)})
|
|
if user_role:
|
|
raise RequestValidationError("Role is referenced by a user and cannot be deleted.")
|
|
doc = await RoleDoc.get(role_id)
|
|
if not doc:
|
|
raise RequestValidationError("Role not found.")
|
|
# Check if the role is default
|
|
#if doc.is_default:
|
|
# raise RequestValidationError("Default role cannot be deleted.")
|
|
await doc.delete()
|