Merge pull request 'tania_authentication_local' (#77) from tania_authentication_local into dev
Reviewed-on: freeleaps/freeleaps-service-hub#77
This commit is contained in:
commit
75813a3599
@ -62,6 +62,66 @@ class PermissionHandler:
|
|||||||
await doc.save()
|
await doc.save()
|
||||||
return doc
|
return doc
|
||||||
|
|
||||||
|
async def create_or_update_permission(self, permission_key: str, permission_name: str, custom_permission_id: Optional[str], description: Optional[str] = None) -> Optional[PermissionDoc]:
|
||||||
|
"""Create or update a permission document"""
|
||||||
|
# Input validation
|
||||||
|
if not permission_key or not permission_name:
|
||||||
|
raise RequestValidationError("permission_key and permission_name are required.")
|
||||||
|
|
||||||
|
def create_new_doc():
|
||||||
|
return PermissionDoc(
|
||||||
|
permission_key=permission_key,
|
||||||
|
permission_name=permission_name,
|
||||||
|
description=description,
|
||||||
|
created_at=datetime.now(),
|
||||||
|
updated_at=datetime.now()
|
||||||
|
)
|
||||||
|
|
||||||
|
def update_doc_fields(doc):
|
||||||
|
doc.permission_key = permission_key
|
||||||
|
doc.permission_name = permission_name
|
||||||
|
doc.description = description
|
||||||
|
doc.updated_at = datetime.now()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Check if permission with this key already exists
|
||||||
|
existing_doc = await PermissionDoc.find_one(
|
||||||
|
{str(PermissionDoc.permission_key): permission_key}
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
existing_doc = None
|
||||||
|
|
||||||
|
if existing_doc:
|
||||||
|
# If permission with this key already exists
|
||||||
|
if custom_permission_id and str(custom_permission_id) != str(existing_doc.id):
|
||||||
|
# Different ID provided - replace the document
|
||||||
|
id_conflict = await PermissionDoc.get(custom_permission_id)
|
||||||
|
if id_conflict:
|
||||||
|
raise RequestValidationError("Permission with the provided ID already exists.")
|
||||||
|
|
||||||
|
new_doc = create_new_doc()
|
||||||
|
new_doc.id = PydanticObjectId(custom_permission_id)
|
||||||
|
await new_doc.insert()
|
||||||
|
await existing_doc.delete()
|
||||||
|
return new_doc
|
||||||
|
else:
|
||||||
|
# Same ID or no ID provided - update existing document
|
||||||
|
update_doc_fields(existing_doc)
|
||||||
|
await existing_doc.save()
|
||||||
|
return existing_doc
|
||||||
|
else:
|
||||||
|
# If no existing document with this key, create new document
|
||||||
|
new_doc = create_new_doc()
|
||||||
|
|
||||||
|
if custom_permission_id:
|
||||||
|
id_conflict = await PermissionDoc.get(custom_permission_id)
|
||||||
|
if id_conflict:
|
||||||
|
raise RequestValidationError("Permission with the provided ID already exists.")
|
||||||
|
new_doc.id = PydanticObjectId(custom_permission_id)
|
||||||
|
|
||||||
|
await new_doc.insert()
|
||||||
|
return new_doc
|
||||||
|
|
||||||
async def query_permissions(
|
async def query_permissions(
|
||||||
self,
|
self,
|
||||||
permission_key: Optional[str] = None,
|
permission_key: Optional[str] = None,
|
||||||
@ -80,6 +140,28 @@ class PermissionHandler:
|
|||||||
docs = await cursor.skip(skip).limit(limit).to_list()
|
docs = await cursor.skip(skip).limit(limit).to_list()
|
||||||
return docs, total
|
return docs, total
|
||||||
|
|
||||||
|
async def query_permissions_no_pagination(
|
||||||
|
self,
|
||||||
|
permission_id: Optional[str] = None,
|
||||||
|
permission_key: Optional[str] = None,
|
||||||
|
permission_name: Optional[str] = None
|
||||||
|
) -> Tuple[List[PermissionDoc], int]:
|
||||||
|
"""Query permissions fuzzy search"""
|
||||||
|
query = {}
|
||||||
|
if permission_id:
|
||||||
|
try:
|
||||||
|
query[str(PermissionDoc.id)] = PydanticObjectId(permission_id)
|
||||||
|
except Exception:
|
||||||
|
raise RequestValidationError("Invalid permission_id format. Must be a valid ObjectId.")
|
||||||
|
if permission_key:
|
||||||
|
query[str(PermissionDoc.permission_key)] = {"$regex": permission_key, "$options": "i"}
|
||||||
|
if permission_name:
|
||||||
|
query[str(PermissionDoc.permission_name)] = {"$regex": permission_name, "$options": "i"}
|
||||||
|
cursor = PermissionDoc.find(query)
|
||||||
|
total = await cursor.count()
|
||||||
|
docs = await cursor.to_list()
|
||||||
|
return docs, total
|
||||||
|
|
||||||
async def delete_permission(self, permission_id: PydanticObjectId) -> None:
|
async def delete_permission(self, permission_id: PydanticObjectId) -> None:
|
||||||
"""Delete a permission document after checking if it is referenced by any role and is not default"""
|
"""Delete a permission document after checking if it is referenced by any role and is not default"""
|
||||||
if not permission_id:
|
if not permission_id:
|
||||||
|
|||||||
@ -61,6 +61,66 @@ class RoleHandler:
|
|||||||
await doc.save()
|
await doc.save()
|
||||||
return doc
|
return doc
|
||||||
|
|
||||||
|
async def create_or_update_role(self, role_key: str, role_name: str, role_level: int, custom_role_id: Optional[str], role_description: Optional[str] = None) -> Optional[RoleDoc]:
|
||||||
|
"""Create or update a role document"""
|
||||||
|
# Input validation
|
||||||
|
if not role_key or not role_name:
|
||||||
|
raise RequestValidationError("role_key and role_name are required.")
|
||||||
|
|
||||||
|
def create_new_doc():
|
||||||
|
return RoleDoc(
|
||||||
|
role_key=role_key,
|
||||||
|
role_name=role_name,
|
||||||
|
role_description=role_description,
|
||||||
|
role_level=role_level,
|
||||||
|
permission_ids=[],
|
||||||
|
created_at=datetime.now(),
|
||||||
|
updated_at=datetime.now()
|
||||||
|
)
|
||||||
|
def update_doc_fields(doc):
|
||||||
|
doc.role_key = role_key
|
||||||
|
doc.role_name = role_name
|
||||||
|
doc.role_description = role_description
|
||||||
|
doc.role_level = role_level
|
||||||
|
doc.updated_at = datetime.now()
|
||||||
|
|
||||||
|
# Check if role with this key already exists
|
||||||
|
existing_doc = await RoleDoc.find_one(
|
||||||
|
{str(RoleDoc.role_key): role_key}
|
||||||
|
)
|
||||||
|
|
||||||
|
if existing_doc:
|
||||||
|
# If role with this key already exists
|
||||||
|
if custom_role_id and str(custom_role_id) != str(existing_doc.id):
|
||||||
|
# Different ID provided - replace the document
|
||||||
|
id_conflict = await RoleDoc.get(custom_role_id)
|
||||||
|
if id_conflict:
|
||||||
|
raise RequestValidationError("Role with the provided ID already exists.")
|
||||||
|
|
||||||
|
new_doc = create_new_doc()
|
||||||
|
new_doc.id = PydanticObjectId(custom_role_id)
|
||||||
|
await new_doc.insert()
|
||||||
|
await existing_doc.delete()
|
||||||
|
return new_doc
|
||||||
|
|
||||||
|
else:
|
||||||
|
# Same ID or no ID provided - update existing document
|
||||||
|
update_doc_fields(existing_doc)
|
||||||
|
await existing_doc.save()
|
||||||
|
return existing_doc
|
||||||
|
else:
|
||||||
|
# If no existing document with this key, create new document
|
||||||
|
new_doc = create_new_doc()
|
||||||
|
|
||||||
|
if custom_role_id:
|
||||||
|
id_conflict = await RoleDoc.get(custom_role_id)
|
||||||
|
if id_conflict:
|
||||||
|
raise RequestValidationError("Role with the provided ID already exists.")
|
||||||
|
new_doc.id = PydanticObjectId(custom_role_id)
|
||||||
|
|
||||||
|
await new_doc.insert()
|
||||||
|
return new_doc
|
||||||
|
|
||||||
async def query_roles(self, role_key: Optional[str], role_name: Optional[str], skip: int = 0, limit: int = 10) -> \
|
async def query_roles(self, role_key: Optional[str], role_name: Optional[str], skip: int = 0, limit: int = 10) -> \
|
||||||
Tuple[List[RoleDoc], int]:
|
Tuple[List[RoleDoc], int]:
|
||||||
"""Query roles with pagination and fuzzy search by role_key and role_name"""
|
"""Query roles with pagination and fuzzy search by role_key and role_name"""
|
||||||
@ -74,6 +134,28 @@ class RoleHandler:
|
|||||||
docs = await cursor.skip(skip).limit(limit).to_list()
|
docs = await cursor.skip(skip).limit(limit).to_list()
|
||||||
return docs, total
|
return docs, total
|
||||||
|
|
||||||
|
async def query_roles_no_pagination(
|
||||||
|
self,
|
||||||
|
role_id: Optional[str] = None,
|
||||||
|
role_key: Optional[str] = None,
|
||||||
|
role_name: Optional[str] = None
|
||||||
|
) -> Tuple[List[RoleDoc], int]:
|
||||||
|
"""Query roles fuzzy search without pagination"""
|
||||||
|
query = {}
|
||||||
|
if role_id:
|
||||||
|
try:
|
||||||
|
query[str(RoleDoc.id)] = PydanticObjectId(role_id)
|
||||||
|
except Exception:
|
||||||
|
raise RequestValidationError("Invalid role_id format. Must be a valid ObjectId.")
|
||||||
|
if role_key:
|
||||||
|
query[str(RoleDoc.role_key)] = {"$regex": role_key, "$options": "i"}
|
||||||
|
if role_name:
|
||||||
|
query[str(RoleDoc.role_name)] = {"$regex": role_name, "$options": "i"}
|
||||||
|
cursor = RoleDoc.find(query)
|
||||||
|
total = await cursor.count()
|
||||||
|
docs = await cursor.to_list()
|
||||||
|
return docs, total
|
||||||
|
|
||||||
async def assign_permissions_to_role(self, role_id: PydanticObjectId, permission_ids: List[str]) -> Optional[RoleDoc]:
|
async def assign_permissions_to_role(self, role_id: PydanticObjectId, permission_ids: List[str]) -> Optional[RoleDoc]:
|
||||||
"""Assign permissions to a role by updating the permission_ids field"""
|
"""Assign permissions to a role by updating the permission_ids field"""
|
||||||
if not role_id or not permission_ids:
|
if not role_id or not permission_ids:
|
||||||
|
|||||||
@ -1,32 +1,4 @@
|
|||||||
from dataclasses import dataclass
|
from enum import IntEnum
|
||||||
from enum import IntEnum, Enum
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True) # frozen=True
|
|
||||||
class DefaultRole:
|
|
||||||
role_name: str
|
|
||||||
role_key: str
|
|
||||||
role_description: str
|
|
||||||
role_level: int
|
|
||||||
|
|
||||||
|
|
||||||
# Default roles, which all tenants will have, cannot be modified.
|
|
||||||
class DefaultRoleEnum(Enum):
|
|
||||||
ADMIN = DefaultRole("Administrator", "admin", "Have all permissions", 0)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass(frozen=True) # frozen=True
|
|
||||||
class DefaultPermission:
|
|
||||||
permission_key: str
|
|
||||||
permission_name: str
|
|
||||||
permission_description: str
|
|
||||||
|
|
||||||
|
|
||||||
# Default permissions, which all tenants will have, cannot be modified.
|
|
||||||
class DefaultPermissionEnum(Enum):
|
|
||||||
CHANGE_ROLES = DefaultPermission("change:roles", "Change roles", "Add/Update/Delete roles")
|
|
||||||
CHANGE_PERMISSIONS = DefaultPermission("change:permissions", "Change permissions", "Add/Update/Remove permissions")
|
|
||||||
ASSIGN_ROLES = DefaultPermission("assign:roles", "Assign roles", "Assign roles to user")
|
|
||||||
|
|
||||||
|
|
||||||
class AdministrativeRole(IntEnum):
|
class AdministrativeRole(IntEnum):
|
||||||
|
|||||||
@ -26,6 +26,7 @@ class RoleDoc(Document):
|
|||||||
role_description: Optional[str] = None
|
role_description: Optional[str] = None
|
||||||
permission_ids: list[str]
|
permission_ids: list[str]
|
||||||
role_level: int
|
role_level: int
|
||||||
|
revision_id: Optional[str] = None # Revision ID for version control
|
||||||
created_at: datetime = datetime.now() # Creation timestamp, auto-generated
|
created_at: datetime = datetime.now() # Creation timestamp, auto-generated
|
||||||
updated_at: datetime = datetime.now() # Last update timestamp, auto-updated
|
updated_at: datetime = datetime.now() # Last update timestamp, auto-updated
|
||||||
is_default: bool = False
|
is_default: bool = False
|
||||||
|
|||||||
@ -18,6 +18,10 @@ class PermissionService:
|
|||||||
"""Update an existing permission document by id"""
|
"""Update an existing permission document by id"""
|
||||||
return await self.permission_handler.update_permission(PydanticObjectId(permission_id), permission_key, permission_name, description)
|
return await self.permission_handler.update_permission(PydanticObjectId(permission_id), permission_key, permission_name, description)
|
||||||
|
|
||||||
|
async def create_or_update_permission(self, permission_key: str, permission_name: str, custom_permission_id: Optional[str], description: Optional[str] = None) -> PermissionDoc:
|
||||||
|
"""Create or update a permission document"""
|
||||||
|
return await self.permission_handler.create_or_update_permission(permission_key, permission_name, custom_permission_id, description)
|
||||||
|
|
||||||
async def query_permissions(self, permission_key: Optional[str] = None, permission_name: Optional[str] = None, page: int = 1, page_size: int = 10) -> Dict[str, Any]:
|
async def query_permissions(self, permission_key: Optional[str] = None, permission_name: Optional[str] = None, page: int = 1, page_size: int = 10) -> Dict[str, Any]:
|
||||||
"""Query permissions with pagination and fuzzy search"""
|
"""Query permissions with pagination and fuzzy search"""
|
||||||
if page < 1 or page_size < 1:
|
if page < 1 or page_size < 1:
|
||||||
@ -30,6 +34,13 @@ class PermissionService:
|
|||||||
"page": page,
|
"page": page,
|
||||||
"page_size": page_size
|
"page_size": page_size
|
||||||
}
|
}
|
||||||
|
async def query_permissions_no_pagination(self, permission_id: Optional[str] = None, permission_key: Optional[str] = None, permission_name: Optional[str] = None) -> Dict[str, Any]:
|
||||||
|
"""Query permissions fuzzy search"""
|
||||||
|
docs, total = await self.permission_handler.query_permissions_no_pagination(permission_id, permission_key, permission_name)
|
||||||
|
return {
|
||||||
|
"items": [doc.dict() for doc in docs],
|
||||||
|
"total": total
|
||||||
|
}
|
||||||
|
|
||||||
async def delete_permission(self, permission_id: str) -> None:
|
async def delete_permission(self, permission_id: str) -> None:
|
||||||
"""Delete a permission document after checking if it is referenced by any role"""
|
"""Delete a permission document after checking if it is referenced by any role"""
|
||||||
|
|||||||
@ -22,6 +22,10 @@ class RoleService:
|
|||||||
doc = await self.role_handler.update_role(PydanticObjectId(role_id), role_key, role_name, role_description, role_level)
|
doc = await self.role_handler.update_role(PydanticObjectId(role_id), role_key, role_name, role_description, role_level)
|
||||||
return doc
|
return doc
|
||||||
|
|
||||||
|
async def create_or_update_role(self, role_key: str, role_name: str, role_level: int, custom_role_id: Optional[str], role_description: Optional[str] = None) -> RoleDoc:
|
||||||
|
"""Create or update a role document"""
|
||||||
|
return await self.role_handler.create_or_update_role(role_key, role_name, role_level, custom_role_id, role_description)
|
||||||
|
|
||||||
async def query_roles(self, role_key: Optional[str], role_name: Optional[str], page: int = 1, page_size: int = 10) -> Dict[str, Any]:
|
async def query_roles(self, role_key: Optional[str], role_name: Optional[str], page: int = 1, page_size: int = 10) -> Dict[str, Any]:
|
||||||
"""Query roles with pagination and fuzzy search by role_key and role_name"""
|
"""Query roles with pagination and fuzzy search by role_key and role_name"""
|
||||||
if page < 1 or page_size < 1:
|
if page < 1 or page_size < 1:
|
||||||
@ -35,6 +39,14 @@ class RoleService:
|
|||||||
"page_size": page_size
|
"page_size": page_size
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async def query_roles_no_pagination(self, role_id: Optional[str] = None, role_key: Optional[str] = None, role_name: Optional[str] = None) -> Dict[str, Any]:
|
||||||
|
"""Query roles fuzzy search without pagination"""
|
||||||
|
docs, total = await self.role_handler.query_roles_no_pagination(role_id, role_key, role_name)
|
||||||
|
return {
|
||||||
|
"items": [doc.dict() for doc in docs],
|
||||||
|
"total": total
|
||||||
|
}
|
||||||
|
|
||||||
async def assign_permissions_to_role(self, role_id: str, permission_ids: List[str]) -> RoleDoc:
|
async def assign_permissions_to_role(self, role_id: str, permission_ids: List[str]) -> RoleDoc:
|
||||||
"""Assign permissions to a role by updating the permission_ids field"""
|
"""Assign permissions to a role by updating the permission_ids field"""
|
||||||
return await self.role_handler.assign_permissions_to_role(PydanticObjectId(role_id), permission_ids)
|
return await self.role_handler.assign_permissions_to_role(PydanticObjectId(role_id), permission_ids)
|
||||||
|
|||||||
@ -11,7 +11,6 @@ from webapi.providers import metrics
|
|||||||
|
|
||||||
# from webapi.providers import scheduler
|
# from webapi.providers import scheduler
|
||||||
from webapi.providers import exception_handler
|
from webapi.providers import exception_handler
|
||||||
from webapi.providers import permission_initialize
|
|
||||||
from .freeleaps_app import FreeleapsApp
|
from .freeleaps_app import FreeleapsApp
|
||||||
from common.config.app_settings import app_settings
|
from common.config.app_settings import app_settings
|
||||||
|
|
||||||
@ -24,7 +23,6 @@ def create_app() -> FastAPI:
|
|||||||
register(app, exception_handler)
|
register(app, exception_handler)
|
||||||
register(app, database)
|
register(app, database)
|
||||||
register(app, router)
|
register(app, router)
|
||||||
register(app, permission_initialize)
|
|
||||||
# register(app, scheduler)
|
# register(app, scheduler)
|
||||||
register(app, common)
|
register(app, common)
|
||||||
|
|
||||||
|
|||||||
@ -1,42 +0,0 @@
|
|||||||
import logging
|
|
||||||
|
|
||||||
from backend.models.permission import PermissionDoc, RoleDoc
|
|
||||||
from backend.models.permission.constants import DefaultPermissionEnum, DefaultRoleEnum
|
|
||||||
|
|
||||||
|
|
||||||
def register(app):
|
|
||||||
# Configure logging for pymongo
|
|
||||||
logging.getLogger("init_admin_permission").setLevel(logging.INFO) # Suppress DEBUG logs
|
|
||||||
|
|
||||||
@app.on_event("startup")
|
|
||||||
async def init_admin_permission():
|
|
||||||
# Initialize permissions if not exist
|
|
||||||
default_permission_ids = []
|
|
||||||
for default_permission in \
|
|
||||||
[DefaultPermissionEnum.CHANGE_PERMISSIONS,
|
|
||||||
DefaultPermissionEnum.CHANGE_ROLES,
|
|
||||||
DefaultPermissionEnum.ASSIGN_ROLES]:
|
|
||||||
if not await PermissionDoc.find_one(
|
|
||||||
{str(PermissionDoc.permission_key): default_permission.value.permission_key}):
|
|
||||||
doc = await PermissionDoc(
|
|
||||||
permission_key=default_permission.value.permission_key,
|
|
||||||
permission_name=default_permission.value.permission_name,
|
|
||||||
description=default_permission.value.permission_description,
|
|
||||||
is_default=True,
|
|
||||||
).insert()
|
|
||||||
default_permission_ids.append(str(doc.id))
|
|
||||||
logging.info(f"default permissions initialized {default_permission_ids}")
|
|
||||||
# Initialize roles if not exist
|
|
||||||
default_role_ids = []
|
|
||||||
for default_role in [DefaultRoleEnum.ADMIN]:
|
|
||||||
if not await RoleDoc.find_one({str(RoleDoc.role_key): default_role.value.role_key}):
|
|
||||||
doc = await RoleDoc(
|
|
||||||
role_key=default_role.value.role_key,
|
|
||||||
role_name=default_role.value.role_name,
|
|
||||||
role_description=default_role.value.role_description,
|
|
||||||
permission_ids=default_permission_ids,
|
|
||||||
role_level=default_role.value.role_level,
|
|
||||||
is_default=True,
|
|
||||||
).insert()
|
|
||||||
default_role_ids.append(str(doc.id))
|
|
||||||
logging.info(f"default roles initialized {default_role_ids}")
|
|
||||||
@ -1,13 +1,18 @@
|
|||||||
from fastapi import APIRouter
|
from fastapi import APIRouter
|
||||||
|
from .create_or_update_permission import router as cup_router
|
||||||
from .create_permission import router as cp_router
|
from .create_permission import router as cp_router
|
||||||
from .query_permission import router as qp_router
|
from .query_permission import router as qp_router
|
||||||
from .update_permission import router as up_router
|
from .update_permission import router as up_router
|
||||||
from .delete_permission import router as delp_router
|
from .delete_permission import router as delp_router
|
||||||
|
from .query_permission_no_pagination import router as qpno_router
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
|
router.include_router(cup_router, prefix="/permission", tags=["permission"])
|
||||||
router.include_router(cp_router, prefix="/permission", tags=["permission"])
|
router.include_router(cp_router, prefix="/permission", tags=["permission"])
|
||||||
router.include_router(qp_router, prefix="/permission", tags=["permission"])
|
router.include_router(qp_router, prefix="/permission", tags=["permission"])
|
||||||
router.include_router(up_router, prefix="/permission", tags=["permission"])
|
router.include_router(up_router, prefix="/permission", tags=["permission"])
|
||||||
router.include_router(delp_router, prefix="/permission", tags=["permission"])
|
router.include_router(delp_router, prefix="/permission", tags=["permission"])
|
||||||
|
router.include_router(qpno_router, prefix="/permission", tags=["permission"])
|
||||||
@ -0,0 +1,45 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from fastapi import APIRouter, Depends
|
||||||
|
from pydantic import BaseModel
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from backend.services.permission.permission_service import PermissionService
|
||||||
|
from common.token.token_manager import TokenManager
|
||||||
|
|
||||||
|
|
||||||
|
router = APIRouter()
|
||||||
|
token_manager = TokenManager()
|
||||||
|
permission_service = PermissionService()
|
||||||
|
|
||||||
|
|
||||||
|
class CreateOrUpdatePermissionRequest(BaseModel):
|
||||||
|
permission_key: str
|
||||||
|
permission_name: str
|
||||||
|
custom_permission_id: Optional[str] = None
|
||||||
|
description: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
|
class PermissionResponse(BaseModel):
|
||||||
|
id: str
|
||||||
|
permission_key: str
|
||||||
|
permission_name: str
|
||||||
|
description: Optional[str] = None
|
||||||
|
created_at: datetime
|
||||||
|
updated_at: datetime
|
||||||
|
|
||||||
|
|
||||||
|
@router.post(
|
||||||
|
"/create-or-update",
|
||||||
|
response_model=PermissionResponse,
|
||||||
|
operation_id="create-or-update-permission",
|
||||||
|
summary="Create or Update Permission",
|
||||||
|
description="Create or update a permission by id."
|
||||||
|
)
|
||||||
|
async def create_or_update_permission(
|
||||||
|
req: CreateOrUpdatePermissionRequest,
|
||||||
|
#_: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_PERMISSIONS.value.permission_key]))
|
||||||
|
) -> PermissionResponse:
|
||||||
|
doc = await permission_service.create_or_update_permission(req.permission_key, req.permission_name, req.custom_permission_id,
|
||||||
|
req.description)
|
||||||
|
return PermissionResponse(**doc.dict())
|
||||||
@ -4,7 +4,6 @@ from fastapi import APIRouter, Depends
|
|||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from backend.models.permission.constants import DefaultPermissionEnum
|
|
||||||
from backend.services.permission.permission_service import PermissionService
|
from backend.services.permission.permission_service import PermissionService
|
||||||
from common.token.token_manager import TokenManager
|
from common.token.token_manager import TokenManager
|
||||||
|
|
||||||
@ -37,7 +36,7 @@ class PermissionResponse(BaseModel):
|
|||||||
)
|
)
|
||||||
async def create_permission(
|
async def create_permission(
|
||||||
req: CreatePermissionRequest,
|
req: CreatePermissionRequest,
|
||||||
_: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_PERMISSIONS.value.permission_key]))
|
#_: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_PERMISSIONS.value.permission_key]))
|
||||||
) -> PermissionResponse:
|
) -> PermissionResponse:
|
||||||
doc = await permission_service.create_permission(req.permission_key, req.permission_name, req.description)
|
doc = await permission_service.create_permission(req.permission_key, req.permission_name, req.description)
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
from fastapi import APIRouter, Depends
|
from fastapi import APIRouter, Depends
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
from backend.models.permission.constants import DefaultPermissionEnum
|
|
||||||
from backend.services.permission.permission_service import PermissionService
|
from backend.services.permission.permission_service import PermissionService
|
||||||
from common.token.token_manager import TokenManager
|
from common.token.token_manager import TokenManager
|
||||||
|
|
||||||
@ -27,7 +26,7 @@ class DeletePermissionResponse(BaseModel):
|
|||||||
)
|
)
|
||||||
async def delete_permission(
|
async def delete_permission(
|
||||||
req: DeletePermissionRequest,
|
req: DeletePermissionRequest,
|
||||||
_: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_PERMISSIONS.value.permission_key]))
|
#_: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_PERMISSIONS.value.permission_key]))
|
||||||
) -> DeletePermissionResponse:
|
) -> DeletePermissionResponse:
|
||||||
await permission_service.delete_permission(req.permission_id)
|
await permission_service.delete_permission(req.permission_id)
|
||||||
return DeletePermissionResponse(success=True)
|
return DeletePermissionResponse(success=True)
|
||||||
|
|||||||
@ -0,0 +1,45 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from fastapi import APIRouter
|
||||||
|
from pydantic import BaseModel
|
||||||
|
from typing import Optional, List
|
||||||
|
from backend.services.permission.permission_service import PermissionService
|
||||||
|
from common.token.token_manager import TokenManager
|
||||||
|
|
||||||
|
router = APIRouter()
|
||||||
|
token_manager = TokenManager()
|
||||||
|
permission_service = PermissionService()
|
||||||
|
|
||||||
|
class QueryPermissionNoPaginationRequest(BaseModel):
|
||||||
|
permission_id: Optional[str] = None
|
||||||
|
permission_key: Optional[str] = None
|
||||||
|
permission_name: Optional[str] = None
|
||||||
|
|
||||||
|
class PermissionResponse(BaseModel):
|
||||||
|
id: str
|
||||||
|
permission_key: str
|
||||||
|
permission_name: str
|
||||||
|
description: Optional[str] = None
|
||||||
|
created_at: datetime
|
||||||
|
updated_at: datetime
|
||||||
|
|
||||||
|
class QueryPermissionNoPaginationResponse(BaseModel):
|
||||||
|
items: List[PermissionResponse]
|
||||||
|
total: int
|
||||||
|
|
||||||
|
@router.post(
|
||||||
|
"/query_permission_no_pagination",
|
||||||
|
response_model=QueryPermissionNoPaginationResponse,
|
||||||
|
operation_id="query-permission-no-pagination",
|
||||||
|
summary="Query Permission No Pagination",
|
||||||
|
description="Query permissions fuzzy search."
|
||||||
|
)
|
||||||
|
async def query_permission_no_pagination(
|
||||||
|
req: QueryPermissionNoPaginationRequest,
|
||||||
|
) -> QueryPermissionNoPaginationResponse:
|
||||||
|
result = await permission_service.query_permissions_no_pagination(req.permission_id, req.permission_key, req.permission_name)
|
||||||
|
items = [PermissionResponse(**item) for item in result["items"]]
|
||||||
|
return QueryPermissionNoPaginationResponse(
|
||||||
|
items=items,
|
||||||
|
total=result["total"]
|
||||||
|
)
|
||||||
@ -4,7 +4,6 @@ from fastapi import APIRouter, Depends
|
|||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from backend.models.permission.constants import DefaultPermissionEnum
|
|
||||||
from backend.services.permission.permission_service import PermissionService
|
from backend.services.permission.permission_service import PermissionService
|
||||||
from common.token.token_manager import TokenManager
|
from common.token.token_manager import TokenManager
|
||||||
|
|
||||||
@ -39,7 +38,7 @@ class PermissionResponse(BaseModel):
|
|||||||
)
|
)
|
||||||
async def update_permission(
|
async def update_permission(
|
||||||
req: UpdatePermissionRequest,
|
req: UpdatePermissionRequest,
|
||||||
_: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_PERMISSIONS.value.permission_key]))
|
#_: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_PERMISSIONS.value.permission_key]))
|
||||||
) -> PermissionResponse:
|
) -> PermissionResponse:
|
||||||
doc = await permission_service.update_permission(req.permission_id, req.permission_key, req.permission_name,
|
doc = await permission_service.update_permission(req.permission_id, req.permission_key, req.permission_name,
|
||||||
req.description)
|
req.description)
|
||||||
|
|||||||
@ -1,14 +1,18 @@
|
|||||||
from fastapi import APIRouter
|
from fastapi import APIRouter
|
||||||
|
from .create_or_update_role import router as create_or_update_role_router
|
||||||
from .create_role import router as create_role_router
|
from .create_role import router as create_role_router
|
||||||
from .update_role import router as update_role_router
|
from .update_role import router as update_role_router
|
||||||
from .query_role import router as query_role_router
|
from .query_role import router as query_role_router
|
||||||
|
from .query_role_no_pagination import router as query_role_no_pagination_router
|
||||||
from .assign_permissions import router as assign_permissions_router
|
from .assign_permissions import router as assign_permissions_router
|
||||||
from .delete_role import router as delete_role_router
|
from .delete_role import router as delete_role_router
|
||||||
|
|
||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
|
router.include_router(create_or_update_role_router, prefix="/role", tags=["role"])
|
||||||
router.include_router(create_role_router, prefix="/role", tags=["role"])
|
router.include_router(create_role_router, prefix="/role", tags=["role"])
|
||||||
router.include_router(update_role_router, prefix="/role", tags=["role"])
|
router.include_router(update_role_router, prefix="/role", tags=["role"])
|
||||||
router.include_router(query_role_router, prefix="/role", tags=["role"])
|
router.include_router(query_role_router, prefix="/role", tags=["role"])
|
||||||
|
router.include_router(query_role_no_pagination_router, prefix="/role", tags=["role"])
|
||||||
router.include_router(assign_permissions_router, prefix="/role", tags=["role"])
|
router.include_router(assign_permissions_router, prefix="/role", tags=["role"])
|
||||||
router.include_router(delete_role_router, prefix="/role", tags=["role"])
|
router.include_router(delete_role_router, prefix="/role", tags=["role"])
|
||||||
@ -4,7 +4,6 @@ from fastapi import APIRouter, Depends
|
|||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
from typing import List
|
from typing import List
|
||||||
|
|
||||||
from backend.models.permission.constants import DefaultPermissionEnum
|
|
||||||
from backend.services.permission.role_service import RoleService
|
from backend.services.permission.role_service import RoleService
|
||||||
from common.token.token_manager import TokenManager
|
from common.token.token_manager import TokenManager
|
||||||
|
|
||||||
@ -35,7 +34,7 @@ class RoleResponse(BaseModel):
|
|||||||
)
|
)
|
||||||
async def assign_permissions_to_role(
|
async def assign_permissions_to_role(
|
||||||
req: AssignPermissionsRequest,
|
req: AssignPermissionsRequest,
|
||||||
_: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_ROLES.value.permission_key]))
|
#_: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_ROLES.value.permission_key]))
|
||||||
) -> RoleResponse:
|
) -> RoleResponse:
|
||||||
doc = await role_service.assign_permissions_to_role(req.role_id, req.permission_ids)
|
doc = await role_service.assign_permissions_to_role(req.role_id, req.permission_ids)
|
||||||
return RoleResponse(**doc.dict())
|
return RoleResponse(**doc.dict())
|
||||||
@ -0,0 +1,49 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from fastapi import APIRouter, Depends
|
||||||
|
from pydantic import BaseModel
|
||||||
|
from typing import Optional, List
|
||||||
|
|
||||||
|
from backend.services.permission.role_service import RoleService
|
||||||
|
from common.token.token_manager import TokenManager
|
||||||
|
|
||||||
|
|
||||||
|
router = APIRouter()
|
||||||
|
token_manager = TokenManager()
|
||||||
|
role_service = RoleService()
|
||||||
|
|
||||||
|
|
||||||
|
class CreateOrUpdateRoleRequest(BaseModel):
|
||||||
|
role_key: str
|
||||||
|
role_name: str
|
||||||
|
role_level: int
|
||||||
|
custom_role_id: Optional[str] = None
|
||||||
|
role_description: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class RoleResponse(BaseModel):
|
||||||
|
id: str
|
||||||
|
role_key: str
|
||||||
|
role_name: str
|
||||||
|
role_description: Optional[str] = None
|
||||||
|
permission_ids: List[str]
|
||||||
|
role_level: int
|
||||||
|
created_at: datetime
|
||||||
|
updated_at: datetime
|
||||||
|
|
||||||
|
|
||||||
|
@router.post(
|
||||||
|
"/create-or-update",
|
||||||
|
response_model=RoleResponse,
|
||||||
|
operation_id="create-or-update-role",
|
||||||
|
summary="Create or Update Role",
|
||||||
|
description="Create or update a role by id."
|
||||||
|
)
|
||||||
|
async def create_or_update_permission(
|
||||||
|
req: CreateOrUpdateRoleRequest,
|
||||||
|
#_: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_PERMISSIONS.value.permission_key]))
|
||||||
|
) -> RoleResponse:
|
||||||
|
doc = await role_service.create_or_update_role(req.role_key, req.role_name, req.role_level, req.custom_role_id, req.role_description)
|
||||||
|
|
||||||
|
return RoleResponse(**doc.dict())
|
||||||
@ -4,7 +4,6 @@ from fastapi import APIRouter, Depends
|
|||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
from typing import Optional, List
|
from typing import Optional, List
|
||||||
|
|
||||||
from backend.models.permission.constants import DefaultPermissionEnum
|
|
||||||
from backend.services.permission.role_service import RoleService
|
from backend.services.permission.role_service import RoleService
|
||||||
from common.token.token_manager import TokenManager
|
from common.token.token_manager import TokenManager
|
||||||
|
|
||||||
@ -40,7 +39,7 @@ class RoleResponse(BaseModel):
|
|||||||
)
|
)
|
||||||
async def create_role(
|
async def create_role(
|
||||||
req: CreateRoleRequest,
|
req: CreateRoleRequest,
|
||||||
_: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_ROLES.value.permission_key]))
|
# _: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_ROLES.value.permission_key]))
|
||||||
) -> RoleResponse:
|
) -> RoleResponse:
|
||||||
doc = await role_service.create_role(req.role_key, req.role_name, req.role_description, req.role_level)
|
doc = await role_service.create_role(req.role_key, req.role_name, req.role_description, req.role_level)
|
||||||
return RoleResponse(**doc.dict())
|
return RoleResponse(**doc.dict())
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
from fastapi import APIRouter, Depends
|
from fastapi import APIRouter, Depends
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
from backend.models.permission.constants import DefaultPermissionEnum
|
|
||||||
from backend.services.permission.role_service import RoleService
|
from backend.services.permission.role_service import RoleService
|
||||||
from common.token.token_manager import TokenManager
|
from common.token.token_manager import TokenManager
|
||||||
|
|
||||||
@ -27,7 +26,7 @@ class DeleteRoleResponse(BaseModel):
|
|||||||
)
|
)
|
||||||
async def delete_role(
|
async def delete_role(
|
||||||
req: DeleteRoleRequest,
|
req: DeleteRoleRequest,
|
||||||
_: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_ROLES.value.permission_key]))
|
#_: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_ROLES.value.permission_key]))
|
||||||
) -> DeleteRoleResponse:
|
) -> DeleteRoleResponse:
|
||||||
await role_service.delete_role(req.role_id)
|
await role_service.delete_role(req.role_id)
|
||||||
return DeleteRoleResponse(success=True)
|
return DeleteRoleResponse(success=True)
|
||||||
|
|||||||
@ -0,0 +1,47 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from fastapi import APIRouter
|
||||||
|
from pydantic import BaseModel
|
||||||
|
from typing import Optional, List
|
||||||
|
from backend.services.permission.role_service import RoleService
|
||||||
|
from common.token.token_manager import TokenManager
|
||||||
|
|
||||||
|
router = APIRouter()
|
||||||
|
token_manager = TokenManager()
|
||||||
|
role_service = RoleService()
|
||||||
|
|
||||||
|
class QueryRoleNoPaginationRequest(BaseModel):
|
||||||
|
role_id: Optional[str] = None
|
||||||
|
role_key: Optional[str] = None
|
||||||
|
role_name: Optional[str] = None
|
||||||
|
|
||||||
|
class RoleResponse(BaseModel):
|
||||||
|
id: str
|
||||||
|
role_key: str
|
||||||
|
role_name: str
|
||||||
|
role_description: Optional[str] = None
|
||||||
|
permission_ids: List[str]
|
||||||
|
role_level: int
|
||||||
|
created_at: datetime
|
||||||
|
updated_at: datetime
|
||||||
|
|
||||||
|
class QueryRoleNoPaginationResponse(BaseModel):
|
||||||
|
items: List[RoleResponse]
|
||||||
|
total: int
|
||||||
|
|
||||||
|
@router.post(
|
||||||
|
"/query_role_no_pagination",
|
||||||
|
response_model=QueryRoleNoPaginationResponse,
|
||||||
|
operation_id="query-role-no-pagination",
|
||||||
|
summary="Query Role No Pagination",
|
||||||
|
description="Query roles fuzzy search without pagination."
|
||||||
|
)
|
||||||
|
async def query_role_no_pagination(
|
||||||
|
req: QueryRoleNoPaginationRequest,
|
||||||
|
) -> QueryRoleNoPaginationResponse:
|
||||||
|
result = await role_service.query_roles_no_pagination(req.role_id, req.role_key, req.role_name)
|
||||||
|
items = [RoleResponse(**item) for item in result["items"]]
|
||||||
|
return QueryRoleNoPaginationResponse(
|
||||||
|
items=items,
|
||||||
|
total=result["total"]
|
||||||
|
)
|
||||||
@ -4,7 +4,6 @@ from fastapi import APIRouter, Depends
|
|||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
from typing import Optional, List
|
from typing import Optional, List
|
||||||
|
|
||||||
from backend.models.permission.constants import DefaultPermissionEnum
|
|
||||||
from backend.services.permission.role_service import RoleService
|
from backend.services.permission.role_service import RoleService
|
||||||
from common.token.token_manager import TokenManager
|
from common.token.token_manager import TokenManager
|
||||||
|
|
||||||
@ -41,7 +40,7 @@ class RoleResponse(BaseModel):
|
|||||||
)
|
)
|
||||||
async def update_role(
|
async def update_role(
|
||||||
req: UpdateRoleRequest,
|
req: UpdateRoleRequest,
|
||||||
_: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_ROLES.value.permission_key]))
|
#_: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_ROLES.value.permission_key]))
|
||||||
) -> RoleResponse:
|
) -> RoleResponse:
|
||||||
doc = await role_service.update_role(req.role_id, req.role_key, req.role_name, req.role_description, req.role_level)
|
doc = await role_service.update_role(req.role_id, req.role_key, req.role_name, req.role_description, req.role_level)
|
||||||
return RoleResponse(**doc.dict())
|
return RoleResponse(**doc.dict())
|
||||||
|
|||||||
@ -3,7 +3,6 @@ from fastapi.params import Depends
|
|||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
from typing import List, Optional
|
from typing import List, Optional
|
||||||
|
|
||||||
from backend.models.permission.constants import DefaultPermissionEnum
|
|
||||||
from backend.services.user.user_management_service import UserManagementService
|
from backend.services.user.user_management_service import UserManagementService
|
||||||
from common.token.token_manager import TokenManager
|
from common.token.token_manager import TokenManager
|
||||||
|
|
||||||
@ -31,7 +30,7 @@ class UserRoleResponse(BaseModel):
|
|||||||
)
|
)
|
||||||
async def assign_roles_to_user(
|
async def assign_roles_to_user(
|
||||||
req: AssignRolesRequest,
|
req: AssignRolesRequest,
|
||||||
_: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.ASSIGN_ROLES.value.permission_key])),
|
#_: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.INVITE_COLLABORATOR.value.permission_key])),
|
||||||
) -> UserRoleResponse:
|
) -> UserRoleResponse:
|
||||||
doc = await user_management_service.assign_roles_to_user(req.user_id, req.role_ids)
|
doc = await user_management_service.assign_roles_to_user(req.user_id, req.role_ids)
|
||||||
return UserRoleResponse(**doc.dict())
|
return UserRoleResponse(**doc.dict())
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user