# Source: freeleaps-service-hub/apps/authentication/backend/infra/permission/permission_handler.py (Python, 180 lines, 7.7 KiB)

from typing import Optional, List, Tuple
from fastapi.exceptions import RequestValidationError
from backend.models.permission.models import PermissionDoc, RoleDoc
from bson import ObjectId
from datetime import datetime
class PermissionHandler:
    """Data-access helper for permission documents.

    Wraps the create / update / query / delete operations performed on
    ``PermissionDoc`` and checks the uniqueness and "default permission"
    rules before each write. Every rule violation is reported by raising
    ``RequestValidationError``.
    """

    def __init__(self):
        """Create a handler; it keeps no per-instance state."""

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _fuzzy_filters(permission_key: Optional[str], permission_name: Optional[str]) -> dict:
        """Build case-insensitive ``$regex`` filters for the supplied terms.

        Terms that are empty or ``None`` contribute no filter.

        NOTE(review): the terms are handed to MongoDB as regex patterns
        without escaping, so regex metacharacters in a term are interpreted
        as pattern syntax. Confirm whether callers rely on that before
        escaping them.
        """
        filters = {}
        if permission_key:
            filters[str(PermissionDoc.permission_key)] = {"$regex": permission_key, "$options": "i"}
        if permission_name:
            filters[str(PermissionDoc.permission_name)] = {"$regex": permission_name, "$options": "i"}
        return filters

    @staticmethod
    def _id_filter(permission_id: str):
        """Return the filter value used to match a document id.

        The id arrives as a string, while the stored ``_id`` may be an
        ``ObjectId``. When the string parses as an ObjectId both
        representations are matched; otherwise the string is matched as-is.
        """
        if ObjectId.is_valid(permission_id):
            return {"$in": [permission_id, ObjectId(permission_id)]}
        return permission_id

    @staticmethod
    async def _ensure_id_unused(permission_id: str) -> None:
        """Raise ``RequestValidationError`` if a permission already uses this id."""
        if await PermissionDoc.get(permission_id):
            raise RequestValidationError("Permission with the provided ID already exists.")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    async def create_permission(self, permission_key: str, permission_name: str,
                                description: Optional[str] = None) -> Optional[PermissionDoc]:
        """Create a new permission document.

        Args:
            permission_key: Key of the permission; must be non-empty and unused.
            permission_name: Name of the permission; must be non-empty and unused.
            description: Optional free-text description.

        Returns:
            The newly created ``PermissionDoc``.

        Raises:
            RequestValidationError: If key or name is missing, or if a
                permission with the same key or the same name already exists.
        """
        if not permission_key or not permission_name:
            raise RequestValidationError("permission_key and permission_name are required.")

        # A single lookup covers both uniqueness rules (key OR name taken).
        duplicate = await PermissionDoc.find_one({
            "$or": [
                {str(PermissionDoc.permission_key): permission_key},
                {str(PermissionDoc.permission_name): permission_name},
            ]
        })
        if duplicate:
            raise RequestValidationError("permission has already been created.")

        # One timestamp so created_at and updated_at are identical on creation.
        now = datetime.now()
        doc = PermissionDoc(
            permission_key=permission_key,
            permission_name=permission_name,
            description=description,
            created_at=now,
            updated_at=now,
        )
        await doc.create()
        return doc

    async def update_permission(self, permission_id: str, permission_key: Optional[str] = None,
                                permission_name: Optional[str] = None,
                                description: Optional[str] = None) -> Optional[PermissionDoc]:
        """Update an existing permission by id, keeping key and name unique.

        Args:
            permission_id: Id of the permission to update.
            permission_key: New key; required despite the ``None`` default.
            permission_name: New name; required despite the ``None`` default.
            description: New description. It is always written, so omitting
                it overwrites the stored description with ``None``.

        Returns:
            The updated ``PermissionDoc``.

        Raises:
            RequestValidationError: If a required argument is missing, the
                permission does not exist, it is a default permission, or
                another permission already uses the key or the name.
        """
        if not permission_id or not permission_key or not permission_name:
            raise RequestValidationError("permission_id, permission_key and permission_name is required.")

        doc = await PermissionDoc.get(permission_id)
        if not doc:
            raise RequestValidationError("Permission not found.")
        if doc.is_default:
            raise RequestValidationError("Default permission cannot be updated.")

        # Check for uniqueness (exclude self). The exclusion uses doc.id, not
        # the raw string argument, so it carries the same type as the stored
        # _id; a type mismatch would make the document conflict with itself.
        conflict = await PermissionDoc.find_one({
            "$and": [
                {"_id": {"$ne": doc.id}},
                {"$or": [
                    {str(PermissionDoc.permission_key): permission_key},
                    {str(PermissionDoc.permission_name): permission_name},
                ]},
            ]
        })
        if conflict:
            raise RequestValidationError("Permission name or permission key already exists.")

        doc.permission_key = permission_key
        doc.permission_name = permission_name
        doc.description = description
        doc.updated_at = datetime.now()
        await doc.save()
        return doc

    async def create_or_update_permission(self, permission_key: str, permission_name: str,
                                          custom_permission_id: Optional[str],
                                          description: Optional[str] = None) -> Optional[PermissionDoc]:
        """Create a permission, or update/replace the one that owns the key.

        Behaviour, keyed on whether a permission with ``permission_key`` exists:

        * exists, and ``custom_permission_id`` is empty or equals its id:
          the existing document is updated in place;
        * exists, and ``custom_permission_id`` differs from its id: a new
          document is created under the custom id and the old one is deleted;
        * does not exist: a new document is created, under the custom id
          when one is given.

        Args:
            permission_key: Key of the permission; must be non-empty.
            permission_name: Name of the permission; must be non-empty.
            custom_permission_id: Optional id to assign to the document.
            description: Optional free-text description.

        Returns:
            The created or updated ``PermissionDoc``.

        Raises:
            RequestValidationError: If key or name is missing, or if the
                custom id is already used by another permission.
        """
        # Input validation
        if not permission_key or not permission_name:
            raise RequestValidationError("permission_key and permission_name are required.")

        try:
            # Check if permission with this key already exists
            existing_doc = await PermissionDoc.find_one(
                {str(PermissionDoc.permission_key): permission_key}
            )
        except Exception:
            # NOTE(review): a failed lookup is treated as "not found", so the
            # method falls through to the create path. Confirm this best-effort
            # behaviour is intended rather than surfacing the error.
            existing_doc = None

        wants_other_id = bool(
            existing_doc
            and custom_permission_id
            and str(custom_permission_id) != str(existing_doc.id)
        )

        if existing_doc and not wants_other_id:
            # Same ID or no ID provided - update existing document
            existing_doc.permission_key = permission_key
            existing_doc.permission_name = permission_name
            existing_doc.description = description
            existing_doc.updated_at = datetime.now()
            await existing_doc.save()
            return existing_doc

        # From here on a new document is written: either nothing owns the key
        # yet, or the owner is being replaced under a different id.
        if custom_permission_id:
            await self._ensure_id_unused(custom_permission_id)

        now = datetime.now()
        new_doc = PermissionDoc(
            permission_key=permission_key,
            permission_name=permission_name,
            description=description,
            created_at=now,
            updated_at=now,
        )
        if custom_permission_id:
            new_doc.id = custom_permission_id
        await new_doc.create()

        if existing_doc:
            # Replacement: the old document is removed only after the new one
            # was created, so a failed create does not lose the permission.
            # NOTE(review): anything still referencing the old id is not
            # rewritten here - verify against callers.
            await existing_doc.delete()
        return new_doc

    async def query_permissions(
        self,
        permission_key: Optional[str] = None,
        permission_name: Optional[str] = None,
        skip: int = 0,
        limit: int = 10
    ) -> Tuple[List[PermissionDoc], int]:
        """Query permissions with pagination and fuzzy search.

        Args:
            permission_key: Optional case-insensitive pattern for the key.
            permission_name: Optional case-insensitive pattern for the name.
            skip: Number of matching documents to skip.
            limit: Maximum number of documents to return.

        Returns:
            ``(docs, total)`` where ``docs`` is the requested page and
            ``total`` is the count obtained before skip/limit are applied.
        """
        cursor = PermissionDoc.find(self._fuzzy_filters(permission_key, permission_name))
        # Count before skip/limit are attached to the query.
        total = await cursor.count()
        docs = await cursor.skip(skip).limit(limit).to_list()
        return docs, total

    async def query_permissions_no_pagination(
        self,
        permission_id: Optional[str] = None,
        permission_key: Optional[str] = None,
        permission_name: Optional[str] = None
    ) -> Tuple[List[PermissionDoc], int]:
        """Query permissions with fuzzy search, returning every match.

        Args:
            permission_id: Optional id to match. It is compared both as the
                given string and, when it parses as one, as an ``ObjectId``.
            permission_key: Optional case-insensitive pattern for the key.
            permission_name: Optional case-insensitive pattern for the name.

        Returns:
            ``(docs, total)`` with all matching documents and their count.
        """
        query = {}
        if permission_id:
            query[str(PermissionDoc.id)] = self._id_filter(permission_id)
        query.update(self._fuzzy_filters(permission_key, permission_name))

        cursor = PermissionDoc.find(query)
        total = await cursor.count()
        docs = await cursor.to_list()
        return docs, total

    async def delete_permission(self, permission_id: str) -> None:
        """Delete a permission that is neither referenced by a role nor default.

        Args:
            permission_id: Id of the permission to delete.

        Raises:
            RequestValidationError: If the id is missing, a role references
                the permission, the permission does not exist, or it is a
                default permission.
        """
        if not permission_id:
            raise RequestValidationError("permission_id is required.")

        # Check if any role references this permission
        role = await RoleDoc.find_one({"permission_ids": str(permission_id)})
        if role:
            raise RequestValidationError("Permission is referenced by a role and cannot be deleted.")

        doc = await PermissionDoc.get(permission_id)
        if not doc:
            raise RequestValidationError("Permission not found.")
        # Check if the permission is default
        if doc.is_default:
            raise RequestValidationError("Default permission cannot be deleted.")
        await doc.delete()