diff --git a/apps/authentication/backend/infra/permission/permission_handler.py b/apps/authentication/backend/infra/permission/permission_handler.py index 9983396..a0b482b 100644 --- a/apps/authentication/backend/infra/permission/permission_handler.py +++ b/apps/authentication/backend/infra/permission/permission_handler.py @@ -12,7 +12,7 @@ class PermissionHandler: pass async def create_permission(self, permission_key: str, permission_name: str, - description: Optional[str] = None, custom_permission_id: Optional[str] = None) -> Optional[PermissionDoc]: + description: Optional[str] = None) -> Optional[PermissionDoc]: """Create a new permission document""" if not permission_key or not permission_name: raise RequestValidationError("permission_key and permission_name are required.") @@ -21,14 +21,6 @@ class PermissionHandler: {str(PermissionDoc.permission_key): permission_key}) or await PermissionDoc.find_one( {str(PermissionDoc.permission_name): permission_name}): raise RequestValidationError("permission has already been created.") - if custom_permission_id: - try: - custom_id = PydanticObjectId(custom_permission_id) - if await PermissionDoc.get(custom_id): - raise RequestValidationError("Permission with the provided custom_permission_id already exists.") - except Exception: - raise RequestValidationError("Invalid custom_permission_id format. Must be a valid ObjectId.") - doc = PermissionDoc( permission_key=permission_key, permission_name=permission_name, @@ -36,15 +28,11 @@ class PermissionHandler: created_at=datetime.now(), updated_at=datetime.now() ) - - if custom_permission_id: - doc.id = PydanticObjectId(custom_permission_id) - await doc.insert() return doc async def update_permission(self, permission_id: PydanticObjectId, permission_key: Optional[str] = None, - permission_name: Optional[str] = None, description: Optional[str] = None, custom_permission_id: Optional[str] = None) -> Optional[ + permission_name: Optional[str] = None, description: Optional[str] = None) -> Optional[ PermissionDoc]: """Update an existing permission document by id, ensuring permission_key is unique""" if not permission_id or not permission_key or not permission_name: @@ -52,8 +40,8 @@ class PermissionHandler: doc = await PermissionDoc.get(permission_id) if not doc: raise RequestValidationError("Permission not found.") - #if doc.is_default: - # raise RequestValidationError("Default permission cannot be updated.") + if doc.is_default: + raise RequestValidationError("Default permission cannot be updated.") # Check for uniqueness (exclude self) conflict = await PermissionDoc.find_one({ "$and": [ @@ -70,26 +58,70 @@ class PermissionHandler: doc.permission_name = permission_name doc.description = description doc.updated_at = datetime.now() - - if custom_permission_id: - # Store the old ID for cleanup - old_id = doc.id - doc.id = PydanticObjectId(custom_permission_id) - await doc.save() - - # Delete the old document with the original ID - try: - old_doc = await PermissionDoc.get(old_id) - if (str(old_id) != custom_permission_id) and old_doc: - await old_doc.delete() - except Exception as e: - # Log the error but don't fail the operation - print(f"Warning: Failed to delete old permission document {old_id}: {e}") - else: - await doc.save() - + + await doc.save() return doc + async def create_or_update_permission(self, permission_key: str, permission_name: str, custom_permission_id: Optional[str], description: Optional[str] = None) -> Optional[PermissionDoc]: + """Create or update a permission document""" + # Input validation + if not permission_key or not permission_name: + raise RequestValidationError("permission_key and permission_name are required.") + + def create_new_doc(): + return PermissionDoc( + permission_key=permission_key, + permission_name=permission_name, + description=description, + created_at=datetime.now(), + updated_at=datetime.now() + ) + + def update_doc_fields(doc): + doc.permission_key = permission_key + doc.permission_name = permission_name + doc.description = description + doc.updated_at = datetime.now() + + try: + # Check if permission with this key already exists + existing_doc = await PermissionDoc.find_one( + {str(PermissionDoc.permission_key): permission_key} + ) + except Exception as e: + existing_doc = None + + if existing_doc: + # If permission with this key already exists + if custom_permission_id and str(custom_permission_id) != str(existing_doc.id): + # Different ID provided - replace the document + id_conflict = await PermissionDoc.get(custom_permission_id) + if id_conflict: + raise RequestValidationError("Permission with the provided ID already exists.") + + new_doc = create_new_doc() + new_doc.id = PydanticObjectId(custom_permission_id) + await new_doc.insert() + await existing_doc.delete() + return new_doc + else: + # Same ID or no ID provided - update existing document + update_doc_fields(existing_doc) + await existing_doc.save() + return existing_doc + else: + # If no existing document with this key, create new document + new_doc = create_new_doc() + + if custom_permission_id: + id_conflict = await PermissionDoc.get(custom_permission_id) + if id_conflict: + raise RequestValidationError("Permission with the provided ID already exists.") + new_doc.id = PydanticObjectId(custom_permission_id) + + await new_doc.insert() + return new_doc + async def query_permissions( self, permission_key: Optional[str] = None, @@ -108,6 +140,28 @@ class PermissionHandler: docs = await cursor.skip(skip).limit(limit).to_list() return docs, total + async def query_permissions_no_pagination( + self, + permission_id: Optional[str] = None, + permission_key: Optional[str] = None, + permission_name: Optional[str] = None + ) -> Tuple[List[PermissionDoc], int]: + """Query permissions fuzzy search""" + query = {} + if permission_id: + try: + query[str(PermissionDoc.id)] = PydanticObjectId(permission_id) + except Exception: + raise RequestValidationError("Invalid permission_id format. Must be a valid ObjectId.") + if permission_key: + query[str(PermissionDoc.permission_key)] = {"$regex": permission_key, "$options": "i"} + if permission_name: + query[str(PermissionDoc.permission_name)] = {"$regex": permission_name, "$options": "i"} + cursor = PermissionDoc.find(query) + total = await cursor.count() + docs = await cursor.to_list() + return docs, total + async def delete_permission(self, permission_id: PydanticObjectId) -> None: """Delete a permission document after checking if it is referenced by any role and is not default""" if not permission_id: @@ -120,6 +174,6 @@ class PermissionHandler: if not doc: raise RequestValidationError("Permission not found.") # Check if the permission is default - #if doc.is_default: - # raise RequestValidationError("Default permission cannot be deleted.") + if doc.is_default: + raise RequestValidationError("Default permission cannot be deleted.") await doc.delete() diff --git a/apps/authentication/backend/infra/permission/role_handler.py b/apps/authentication/backend/infra/permission/role_handler.py index c1dd796..4b77d75 100644 --- a/apps/authentication/backend/infra/permission/role_handler.py +++ b/apps/authentication/backend/infra/permission/role_handler.py @@ -11,23 +11,13 @@ class RoleHandler: def __init__(self): pass - async def create_role(self, role_key: str, role_name: str, role_description: Optional[str], role_level: int, custom_role_id: Optional[str] = None) -> Optional[RoleDoc]: + async def create_role(self, role_key: str, role_name: str, role_description: Optional[str], role_level: int) -> Optional[RoleDoc]: """Create a new role, ensuring role_key and role_name are unique and not empty""" if not role_key or not role_name: raise RequestValidationError("role_key and role_name are required.") if await RoleDoc.find_one({str(RoleDoc.role_key): role_key}) or await RoleDoc.find_one( {str(RoleDoc.role_name): role_name}): raise RequestValidationError("role_key or role_name has already been created.") - - # Check if custom_role_id is provided and if it already exists - if custom_role_id: - try: - custom_id = PydanticObjectId(custom_role_id) - if await RoleDoc.get(custom_id): - raise RequestValidationError("Role with the provided custom_role_id already exists.") - except Exception: - raise RequestValidationError("Invalid custom_role_id format. Must be a valid ObjectId.") - doc = RoleDoc( role_key=role_key, role_name=role_name, @@ -37,16 +27,11 @@ class RoleHandler: created_at=datetime.now(), updated_at=datetime.now() ) - - # Set custom ID if provided - if custom_role_id: - doc.id = PydanticObjectId(custom_role_id) - await doc.insert() return doc async def update_role(self, role_id: PydanticObjectId, role_key: str, role_name: str, - role_description: Optional[str], role_level: int, custom_role_id: Optional[str] = None) -> Optional[ + role_description: Optional[str], role_level: int) -> Optional[ RoleDoc]: """Update an existing role, ensuring role_key and role_name are unique and not empty""" if not role_id or not role_key or not role_name: @@ -54,8 +39,8 @@ class RoleHandler: doc = await RoleDoc.get(role_id) if not doc: raise RequestValidationError("role not found.") - #if doc.is_default: - # raise RequestValidationError("Default role cannot be updated.") + if doc.is_default: + raise RequestValidationError("Default role cannot be updated.") # Check for uniqueness (exclude self) conflict = await RoleDoc.find_one({ "$and": [ @@ -73,27 +58,69 @@ class RoleHandler: doc.role_description = role_description doc.role_level = role_level doc.updated_at = datetime.now() - - # Set custom role ID if provided - if custom_role_id: - # Store the old ID for cleanup - old_id = doc.id - doc.id = PydanticObjectId(custom_role_id) - await doc.save() - - # Delete the old document with the original ID - try: - old_doc = await RoleDoc.get(old_id) - if (str(old_id) != custom_role_id) and old_doc: - await old_doc.delete() - except Exception as e: - # Log the error but don't fail the operation - print(f"Warning: Failed to delete old role document {old_id}: {e}") - else: - await doc.save() - + await doc.save() return doc + async def create_or_update_role(self, role_key: str, role_name: str, role_level: int, custom_role_id: Optional[str], role_description: Optional[str] = None) -> Optional[RoleDoc]: + """Create or update a role document""" + # Input validation + if not role_key or not role_name: + raise RequestValidationError("role_key and role_name are required.") + + def create_new_doc(): + return RoleDoc( + role_key=role_key, + role_name=role_name, + role_description=role_description, + role_level=role_level, + permission_ids=[], + created_at=datetime.now(), + updated_at=datetime.now() + ) + def update_doc_fields(doc): + doc.role_key = role_key + doc.role_name = role_name + doc.role_description = role_description + doc.role_level = role_level + doc.updated_at = datetime.now() + + # Check if role with this key already exists + existing_doc = await RoleDoc.find_one( + {str(RoleDoc.role_key): role_key} + ) + + if existing_doc: + # If role with this key already exists + if custom_role_id and str(custom_role_id) != str(existing_doc.id): + # Different ID provided - replace the document + id_conflict = await RoleDoc.get(custom_role_id) + if id_conflict: + raise RequestValidationError("Role with the provided ID already exists.") + + new_doc = create_new_doc() + new_doc.id = PydanticObjectId(custom_role_id) + await new_doc.insert() + await existing_doc.delete() + return new_doc + + else: + # Same ID or no ID provided - update existing document + update_doc_fields(existing_doc) + await existing_doc.save() + return existing_doc + else: + # If no existing document with this key, create new document + new_doc = create_new_doc() + + if custom_role_id: + id_conflict = await RoleDoc.get(custom_role_id) + if id_conflict: + raise RequestValidationError("Role with the provided ID already exists.") + new_doc.id = PydanticObjectId(custom_role_id) + + await new_doc.insert() + return new_doc + async def query_roles(self, role_key: Optional[str], role_name: Optional[str], skip: int = 0, limit: int = 10) -> \ Tuple[List[RoleDoc], int]: """Query roles with pagination and fuzzy search by role_key and role_name""" @@ -107,14 +134,27 @@ class RoleHandler: docs = await cursor.skip(skip).limit(limit).to_list() return docs, total - async def query_roles_by_id(self, role_id: PydanticObjectId) -> Optional[RoleDoc]: - """Query a role by its ID""" - if not role_id: - raise RequestValidationError("role_id is required.") - doc = await RoleDoc.get(role_id) - if not doc: - raise RequestValidationError("Role not found.") - return doc + async def query_roles_no_pagination( + self, + role_id: Optional[str] = None, + role_key: Optional[str] = None, + role_name: Optional[str] = None + ) -> Tuple[List[RoleDoc], int]: + """Query roles fuzzy search without pagination""" + query = {} + if role_id: + try: + query[str(RoleDoc.id)] = PydanticObjectId(role_id) + except Exception: + raise RequestValidationError("Invalid role_id format. Must be a valid ObjectId.") + if role_key: + query[str(RoleDoc.role_key)] = {"$regex": role_key, "$options": "i"} + if role_name: + query[str(RoleDoc.role_name)] = {"$regex": role_name, "$options": "i"} + cursor = RoleDoc.find(query) + total = await cursor.count() + docs = await cursor.to_list() + return docs, total async def assign_permissions_to_role(self, role_id: PydanticObjectId, permission_ids: List[str]) -> Optional[RoleDoc]: """Assign permissions to a role by updating the permission_ids field""" @@ -122,7 +162,7 @@ class RoleHandler: raise RequestValidationError("role_id and permission_ids are required.") doc = await RoleDoc.get(role_id) if not doc: - raise RequestValidationError(f"Role not found. {role_id}") + raise RequestValidationError("Role not found.") # Validate that all permission_ids exist in the permission collection for permission_id in permission_ids: @@ -150,6 +190,6 @@ class RoleHandler: if not doc: raise RequestValidationError("Role not found.") # Check if the role is default - #if doc.is_default: - # raise RequestValidationError("Default role cannot be deleted.") + if doc.is_default: + raise RequestValidationError("Default role cannot be deleted.") await doc.delete() diff --git a/apps/authentication/backend/services/permission/permission_service.py b/apps/authentication/backend/services/permission/permission_service.py index 1feb8be..304b58a 100644 --- a/apps/authentication/backend/services/permission/permission_service.py +++ b/apps/authentication/backend/services/permission/permission_service.py @@ -10,14 +10,18 @@ class PermissionService: def __init__(self): self.permission_handler = PermissionHandler() - async def create_permission(self, permission_key: str, permission_name: str, description: Optional[str] = None, custom_permission_id: Optional[str] = None) -> PermissionDoc: + async def create_permission(self, permission_key: str, permission_name: str, description: Optional[str] = None) -> PermissionDoc: """Create a new permission document""" - return await self.permission_handler.create_permission(permission_key, permission_name, description, custom_permission_id) + return await self.permission_handler.create_permission(permission_key, permission_name, description) - async def update_permission(self, permission_id: str, permission_key: Optional[str] = None, permission_name: Optional[str] = None, description: Optional[str] = None, custom_permission_id: Optional[str] = None) -> PermissionDoc: + async def update_permission(self, permission_id: str, permission_key: Optional[str] = None, permission_name: Optional[str] = None, description: Optional[str] = None) -> PermissionDoc: """Update an existing permission document by id""" - return await self.permission_handler.update_permission(PydanticObjectId(permission_id), permission_key, permission_name, description, custom_permission_id) + return await self.permission_handler.update_permission(PydanticObjectId(permission_id), permission_key, permission_name, description) + async def create_or_update_permission(self, permission_key: str, permission_name: str, custom_permission_id: Optional[str], description: Optional[str] = None) -> PermissionDoc: + """Create or update a permission document""" + return await self.permission_handler.create_or_update_permission(permission_key, permission_name, custom_permission_id, description) + async def query_permissions(self, permission_key: Optional[str] = None, permission_name: Optional[str] = None, page: int = 1, page_size: int = 10) -> Dict[str, Any]: """Query permissions with pagination and fuzzy search""" if page < 1 or page_size < 1: @@ -30,6 +34,13 @@ class PermissionService: "page": page, "page_size": page_size } + async def query_permissions_no_pagination(self, permission_id: Optional[str] = None, permission_key: Optional[str] = None, permission_name: Optional[str] = None) -> Dict[str, Any]: + """Query permissions fuzzy search""" + docs, total = await self.permission_handler.query_permissions_no_pagination(permission_id, permission_key, permission_name) + return { + "items": [doc.dict() for doc in docs], + "total": total + } async def delete_permission(self, permission_id: str) -> None: """Delete a permission document after checking if it is referenced by any role""" diff --git a/apps/authentication/backend/services/permission/role_service.py b/apps/authentication/backend/services/permission/role_service.py index 143b381..97f84e2 100644 --- a/apps/authentication/backend/services/permission/role_service.py +++ b/apps/authentication/backend/services/permission/role_service.py @@ -10,18 +10,22 @@ class RoleService: def __init__(self): self.role_handler = RoleHandler() - async def create_role(self, role_key: str, role_name: str, role_description: Optional[str], role_level: int, custom_role_id: Optional[str] = None) -> RoleDoc: + async def create_role(self, role_key: str, role_name: str, role_description: Optional[str], role_level: int) -> RoleDoc: """Create a new role, ensuring role_key and role_name are unique and not empty""" - doc = await self.role_handler.create_role(role_key, role_name, role_description, role_level, custom_role_id) + doc = await self.role_handler.create_role(role_key, role_name, role_description, role_level) return doc - async def update_role(self, role_id: str, role_key: str, role_name: str, role_description: Optional[str], role_level: int, custom_role_id: Optional[str] = None) -> RoleDoc: + async def update_role(self, role_id: str, role_key: str, role_name: str, role_description: Optional[str], role_level: int) -> RoleDoc: """Update an existing role, ensuring role_key and role_name are unique and not empty""" - doc = await self.role_handler.update_role(PydanticObjectId(role_id), role_key, role_name, role_description, role_level, custom_role_id) + doc = await self.role_handler.update_role(PydanticObjectId(role_id), role_key, role_name, role_description, role_level) return doc + async def create_or_update_role(self, role_key: str, role_name: str, role_level: int, custom_role_id: Optional[str], role_description: Optional[str] = None) -> RoleDoc: + """Create or update a role document""" + return await self.role_handler.create_or_update_role(role_key, role_name, role_level, custom_role_id, role_description) + async def query_roles(self, role_key: Optional[str], role_name: Optional[str], page: int = 1, page_size: int = 10) -> Dict[str, Any]: """Query roles with pagination and fuzzy search by role_key and role_name""" if page < 1 or page_size < 1: @@ -35,14 +39,18 @@ class RoleService: "page_size": page_size } + async def query_roles_no_pagination(self, role_id: Optional[str] = None, role_key: Optional[str] = None, role_name: Optional[str] = None) -> Dict[str, Any]: + """Query roles fuzzy search without pagination""" + docs, total = await self.role_handler.query_roles_no_pagination(role_id, role_key, role_name) + return { + "items": [doc.dict() for doc in docs], + "total": total + } + async def assign_permissions_to_role(self, role_id: str, permission_ids: List[str]) -> RoleDoc: """Assign permissions to a role by updating the permission_ids field""" return await self.role_handler.assign_permissions_to_role(PydanticObjectId(role_id), permission_ids) - async def query_roles_by_id(self, role_id: str) -> RoleDoc: - """Query a single role by ID""" - return await self.role_handler.query_roles_by_id(PydanticObjectId(role_id)) - async def delete_role(self, role_id: str) -> None: """Delete a role document after checking if it is referenced by any user""" return await self.role_handler.delete_role(PydanticObjectId(role_id)) \ No newline at end of file diff --git a/apps/authentication/webapi/routes/permission/__init__.py b/apps/authentication/webapi/routes/permission/__init__.py index cf1aafb..49fb7b2 100644 --- a/apps/authentication/webapi/routes/permission/__init__.py +++ b/apps/authentication/webapi/routes/permission/__init__.py @@ -1,13 +1,18 @@ from fastapi import APIRouter +from .create_or_update_permission import router as cup_router from .create_permission import router as cp_router from .query_permission import router as qp_router from .update_permission import router as up_router from .delete_permission import router as delp_router +from .query_permission_no_pagination import router as qpno_router + router = APIRouter() +router.include_router(cup_router, prefix="/permission", tags=["permission"]) router.include_router(cp_router, prefix="/permission", tags=["permission"]) router.include_router(qp_router, prefix="/permission", tags=["permission"]) router.include_router(up_router, prefix="/permission", tags=["permission"]) -router.include_router(delp_router, prefix="/permission", tags=["permission"]) \ No newline at end of file +router.include_router(delp_router, prefix="/permission", tags=["permission"]) +router.include_router(qpno_router, prefix="/permission", tags=["permission"]) \ No newline at end of file diff --git a/apps/authentication/webapi/routes/role/__init__.py b/apps/authentication/webapi/routes/role/__init__.py index f82d8b7..c71537c 100644 --- a/apps/authentication/webapi/routes/role/__init__.py +++ b/apps/authentication/webapi/routes/role/__init__.py @@ -1,16 +1,18 @@ from fastapi import APIRouter +from .create_or_update_role import router as create_or_update_role_router from .create_role import router as create_role_router from .update_role import router as update_role_router from .query_role import router as query_role_router -from .query_role_by_id import router as query_role_by_id_router +from .query_role_no_pagination import router as query_role_no_pagination_router from .assign_permissions import router as assign_permissions_router from .delete_role import router as delete_role_router router = APIRouter() +router.include_router(create_or_update_role_router, prefix="/role", tags=["role"]) router.include_router(create_role_router, prefix="/role", tags=["role"]) router.include_router(update_role_router, prefix="/role", tags=["role"]) router.include_router(query_role_router, prefix="/role", tags=["role"]) -router.include_router(query_role_by_id_router, prefix="/role", tags=["role"]) +router.include_router(query_role_no_pagination_router, prefix="/role", tags=["role"]) router.include_router(assign_permissions_router, prefix="/role", tags=["role"]) router.include_router(delete_role_router, prefix="/role", tags=["role"]) \ No newline at end of file