feat(role_management): Add a universal Depends for permission verification.

This commit is contained in:
icecheng 2025-07-21 14:51:00 +08:00
parent 4109018692
commit b8be65615b
16 changed files with 172 additions and 29 deletions

View File

@ -26,6 +26,7 @@ class DefaultPermission:
class DefaultPermissionEnum(Enum): class DefaultPermissionEnum(Enum):
CHANGE_ROLES = DefaultPermission("change:roles", "Change roles", "Add/Update/Delete roles") CHANGE_ROLES = DefaultPermission("change:roles", "Change roles", "Add/Update/Delete roles")
CHANGE_PERMISSIONS = DefaultPermission("change:permissions", "Change permissions", "Add/Update/Remove permissions") CHANGE_PERMISSIONS = DefaultPermission("change:permissions", "Change permissions", "Add/Update/Remove permissions")
ASSIGN_ROLES = DefaultPermission("assign:roles", "Assign roles", "Assign roles to user")
class AdministrativeRole(IntEnum): class AdministrativeRole(IntEnum):

View File

@ -0,0 +1,2 @@
USER_ROLE_NAMES = "role_names"
USER_PERMISSIONS = "user_permissions"

View File

@ -0,0 +1,77 @@
from typing import List
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from common.config.app_settings import app_settings
from common.constants.jwt_constants import USER_ROLE_NAMES, USER_PERMISSIONS
security = HTTPBearer()
class CurrentUser:
def __init__(self, user_id: str, user_role_names: List[str], user_permission_keys: List[str]):
self.user_id = user_id
self.user_role_names = user_role_names
self.user_permission_keys = user_permission_keys
def has_all_permissions(self, permissions: List[str]) -> bool:
"""Check if the user has all the specified permissions"""
if not permissions:
return True
return all(p in self.user_permission_keys for p in permissions)
def has_any_permissions(self, permissions: List[str]) -> bool:
"""Check if the user has at least one of the specified permissions"""
if not permissions:
return True
return any(p in self.user_permission_keys for p in permissions)
def decode_jwt_token(token: str):
payload = jwt.decode(
token,
app_settings.JWT_SECRET_KEY,
algorithms=[app_settings.JWT_ALGORITHM],
)
return payload
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> CurrentUser:
try:
payload = decode_jwt_token(credentials.credentials)
user = payload.get("subject")
if not user or "id" not in user:
raise HTTPException(status_code=401, detail="Invalid authentication token")
return CurrentUser(user.get("id"), user.get(USER_ROLE_NAMES), user.get(USER_PERMISSIONS))
except JWTError:
raise HTTPException(status_code=401, detail="Invalid authentication token")
def has_all_permissions(
permissions: List[str],
):
"""Check if the user has all the specified permissions"""
def inner_dependency(current_user: CurrentUser = Depends(get_current_user)):
if not current_user.has_all_permissions(permissions):
raise HTTPException(status_code=403, detail="Not allowed")
return True
return inner_dependency
def has_any_permissions(
permissions: List[str]
):
"""Check if the user has at least one of the specified permissions"""
def inner_dependency(current_user: CurrentUser = Depends(get_current_user)):
if not current_user.has_any_permissions(permissions):
raise HTTPException(status_code=403, detail="Not allowed")
return True
return inner_dependency

View File

@ -12,7 +12,10 @@ def register(app):
async def init_admin_permission(): async def init_admin_permission():
# Initialize permissions if not exist # Initialize permissions if not exist
default_permission_ids = [] default_permission_ids = []
for default_permission in [DefaultPermissionEnum.CHANGE_PERMISSIONS, DefaultPermissionEnum.CHANGE_ROLES]: for default_permission in \
[DefaultPermissionEnum.CHANGE_PERMISSIONS,
DefaultPermissionEnum.CHANGE_ROLES,
DefaultPermissionEnum.ASSIGN_ROLES]:
if not await PermissionDoc.find_one( if not await PermissionDoc.find_one(
{str(PermissionDoc.permission_key): default_permission.value.permission_key}): {str(PermissionDoc.permission_key): default_permission.value.permission_key}):
doc = await PermissionDoc( doc = await PermissionDoc(

View File

@ -1,20 +1,25 @@
from datetime import datetime from datetime import datetime
from fastapi import APIRouter from fastapi import APIRouter, Depends
from pydantic import BaseModel from pydantic import BaseModel
from typing import Optional from typing import Optional
from backend.models.permission.constants import DefaultPermissionEnum
from backend.services.permission.permission_service import PermissionService from backend.services.permission.permission_service import PermissionService
from common.token.token_manager import TokenManager from common.token.token_manager import TokenManager
from common.utils.jwt_token import has_all_permissions
router = APIRouter() router = APIRouter()
token_manager = TokenManager() token_manager = TokenManager()
permission_service = PermissionService() permission_service = PermissionService()
class CreatePermissionRequest(BaseModel): class CreatePermissionRequest(BaseModel):
permission_key: str permission_key: str
permission_name: str permission_name: str
description: Optional[str] = None description: Optional[str] = None
class PermissionResponse(BaseModel): class PermissionResponse(BaseModel):
id: str id: str
permission_key: str permission_key: str
@ -23,6 +28,7 @@ class PermissionResponse(BaseModel):
created_at: datetime created_at: datetime
updated_at: datetime updated_at: datetime
@router.post( @router.post(
"/create", "/create",
response_model=PermissionResponse, response_model=PermissionResponse,
@ -32,6 +38,7 @@ class PermissionResponse(BaseModel):
) )
async def create_permission( async def create_permission(
req: CreatePermissionRequest, req: CreatePermissionRequest,
_: bool = Depends(has_all_permissions([DefaultPermissionEnum.CHANGE_PERMISSIONS.value.permission_key]))
) -> PermissionResponse: ) -> PermissionResponse:
doc = await permission_service.create_permission(req.permission_key, req.permission_name, req.description) doc = await permission_service.create_permission(req.permission_key, req.permission_name, req.description)

View File

@ -1,16 +1,22 @@
from fastapi import APIRouter from fastapi import APIRouter, Depends
from pydantic import BaseModel from pydantic import BaseModel
from backend.models.permission.constants import DefaultPermissionEnum
from backend.services.permission.permission_service import PermissionService from backend.services.permission.permission_service import PermissionService
from common.utils.jwt_token import has_all_permissions
router = APIRouter() router = APIRouter()
permission_service = PermissionService() permission_service = PermissionService()
class DeletePermissionRequest(BaseModel): class DeletePermissionRequest(BaseModel):
permission_id: str permission_id: str
class DeletePermissionResponse(BaseModel): class DeletePermissionResponse(BaseModel):
success: bool success: bool
@router.post( @router.post(
"/delete", "/delete",
response_model=DeletePermissionResponse, response_model=DeletePermissionResponse,
@ -18,6 +24,9 @@ class DeletePermissionResponse(BaseModel):
summary="Delete Permission", summary="Delete Permission",
description="Delete a permission after checking if it is referenced by any role." description="Delete a permission after checking if it is referenced by any role."
) )
async def delete_permission(req: DeletePermissionRequest) -> DeletePermissionResponse: async def delete_permission(
req: DeletePermissionRequest,
_: bool = Depends(has_all_permissions([DefaultPermissionEnum.CHANGE_PERMISSIONS.value.permission_key]))
) -> DeletePermissionResponse:
await permission_service.delete_permission(req.permission_id) await permission_service.delete_permission(req.permission_id)
return DeletePermissionResponse(success=True) return DeletePermissionResponse(success=True)

View File

@ -1,21 +1,26 @@
from datetime import datetime from datetime import datetime
from fastapi import APIRouter from fastapi import APIRouter, Depends
from pydantic import BaseModel from pydantic import BaseModel
from typing import Optional from typing import Optional
from backend.models.permission.constants import DefaultPermissionEnum
from backend.services.permission.permission_service import PermissionService from backend.services.permission.permission_service import PermissionService
from common.token.token_manager import TokenManager from common.token.token_manager import TokenManager
from common.utils.jwt_token import has_all_permissions
router = APIRouter() router = APIRouter()
token_manager = TokenManager() token_manager = TokenManager()
permission_service = PermissionService() permission_service = PermissionService()
class UpdatePermissionRequest(BaseModel): class UpdatePermissionRequest(BaseModel):
permission_id: str permission_id: str
permission_key: str permission_key: str
permission_name: str permission_name: str
description: Optional[str] = None description: Optional[str] = None
class PermissionResponse(BaseModel): class PermissionResponse(BaseModel):
id: str id: str
permission_key: str permission_key: str
@ -24,6 +29,7 @@ class PermissionResponse(BaseModel):
created_at: datetime created_at: datetime
updated_at: datetime updated_at: datetime
@router.post( @router.post(
"/update", "/update",
response_model=PermissionResponse, response_model=PermissionResponse,
@ -33,6 +39,8 @@ class PermissionResponse(BaseModel):
) )
async def update_permission( async def update_permission(
req: UpdatePermissionRequest, req: UpdatePermissionRequest,
_: bool = Depends(has_all_permissions([DefaultPermissionEnum.CHANGE_PERMISSIONS.value.permission_key]))
) -> PermissionResponse: ) -> PermissionResponse:
doc = await permission_service.update_permission(req.permission_id, req.permission_key, req.permission_name, req.description) doc = await permission_service.update_permission(req.permission_id, req.permission_key, req.permission_name,
req.description)
return PermissionResponse(**doc.dict()) return PermissionResponse(**doc.dict())

View File

@ -1,10 +1,13 @@
from datetime import datetime from datetime import datetime
from fastapi import APIRouter from fastapi import APIRouter, Depends
from pydantic import BaseModel from pydantic import BaseModel
from typing import List from typing import List
from backend.models.permission.constants import DefaultPermissionEnum
from backend.services.permission.role_service import RoleService from backend.services.permission.role_service import RoleService
from common.token.token_manager import TokenManager from common.token.token_manager import TokenManager
from common.utils.jwt_token import has_all_permissions
router = APIRouter() router = APIRouter()
token_manager = TokenManager() token_manager = TokenManager()
@ -33,6 +36,7 @@ class RoleResponse(BaseModel):
) )
async def assign_permissions_to_role( async def assign_permissions_to_role(
req: AssignPermissionsRequest, req: AssignPermissionsRequest,
_: bool = Depends(has_all_permissions([DefaultPermissionEnum.CHANGE_ROLES.value.permission_key]))
) -> RoleResponse: ) -> RoleResponse:
doc = await role_service.assign_permissions_to_role(req.role_id, req.permission_ids) doc = await role_service.assign_permissions_to_role(req.role_id, req.permission_ids)
return RoleResponse(**doc.dict()) return RoleResponse(**doc.dict())

View File

@ -1,21 +1,26 @@
from datetime import datetime from datetime import datetime
from fastapi import APIRouter from fastapi import APIRouter, Depends
from pydantic import BaseModel from pydantic import BaseModel
from typing import Optional, List from typing import Optional, List
from backend.models.permission.constants import DefaultPermissionEnum
from backend.services.permission.role_service import RoleService from backend.services.permission.role_service import RoleService
from common.token.token_manager import TokenManager from common.token.token_manager import TokenManager
from common.utils.jwt_token import has_all_permissions
router = APIRouter() router = APIRouter()
token_manager = TokenManager() token_manager = TokenManager()
role_service = RoleService() role_service = RoleService()
class CreateRoleRequest(BaseModel): class CreateRoleRequest(BaseModel):
role_key: str role_key: str
role_name: str role_name: str
role_description: Optional[str] = None role_description: Optional[str] = None
role_level: int role_level: int
class RoleResponse(BaseModel): class RoleResponse(BaseModel):
id: str id: str
role_key: str role_key: str
@ -26,6 +31,7 @@ class RoleResponse(BaseModel):
created_at: datetime created_at: datetime
updated_at: datetime updated_at: datetime
@router.post( @router.post(
"/create", "/create",
response_model=RoleResponse, response_model=RoleResponse,
@ -35,6 +41,7 @@ class RoleResponse(BaseModel):
) )
async def create_role( async def create_role(
req: CreateRoleRequest, req: CreateRoleRequest,
_: bool = Depends(has_all_permissions([DefaultPermissionEnum.CHANGE_ROLES.value.permission_key]))
) -> RoleResponse: ) -> RoleResponse:
doc = await role_service.create_role(req.role_key, req.role_name, req.role_description, req.role_level) doc = await role_service.create_role(req.role_key, req.role_name, req.role_description, req.role_level)
return RoleResponse(**doc.dict()) return RoleResponse(**doc.dict())

View File

@ -1,16 +1,22 @@
from fastapi import APIRouter from fastapi import APIRouter, Depends
from pydantic import BaseModel from pydantic import BaseModel
from backend.models.permission.constants import DefaultPermissionEnum
from backend.services.permission.role_service import RoleService from backend.services.permission.role_service import RoleService
from common.utils.jwt_token import has_all_permissions
router = APIRouter() router = APIRouter()
role_service = RoleService() role_service = RoleService()
class DeleteRoleRequest(BaseModel): class DeleteRoleRequest(BaseModel):
role_id: str role_id: str
class DeleteRoleResponse(BaseModel): class DeleteRoleResponse(BaseModel):
success: bool success: bool
@router.post( @router.post(
"/delete", "/delete",
response_model=DeleteRoleResponse, response_model=DeleteRoleResponse,
@ -18,6 +24,9 @@ class DeleteRoleResponse(BaseModel):
summary="Delete Role", summary="Delete Role",
description="Delete a role after checking if it is referenced by any user." description="Delete a role after checking if it is referenced by any user."
) )
async def delete_role(req: DeleteRoleRequest) -> DeleteRoleResponse: async def delete_role(
req: DeleteRoleRequest,
_: bool = Depends(has_all_permissions([DefaultPermissionEnum.CHANGE_ROLES.value.permission_key]))
) -> DeleteRoleResponse:
await role_service.delete_role(req.role_id) await role_service.delete_role(req.role_id)
return DeleteRoleResponse(success=True) return DeleteRoleResponse(success=True)

View File

@ -1,15 +1,19 @@
from datetime import datetime from datetime import datetime
from fastapi import APIRouter from fastapi import APIRouter, Depends
from pydantic import BaseModel from pydantic import BaseModel
from typing import Optional, List from typing import Optional, List
from backend.models.permission.constants import DefaultPermissionEnum
from backend.services.permission.role_service import RoleService from backend.services.permission.role_service import RoleService
from common.token.token_manager import TokenManager from common.token.token_manager import TokenManager
from common.utils.jwt_token import has_all_permissions
router = APIRouter() router = APIRouter()
token_manager = TokenManager() token_manager = TokenManager()
role_service = RoleService() role_service = RoleService()
class UpdateRoleRequest(BaseModel): class UpdateRoleRequest(BaseModel):
role_id: str role_id: str
role_key: str role_key: str
@ -17,6 +21,7 @@ class UpdateRoleRequest(BaseModel):
role_description: Optional[str] = None role_description: Optional[str] = None
role_level: int role_level: int
class RoleResponse(BaseModel): class RoleResponse(BaseModel):
id: str id: str
role_key: str role_key: str
@ -27,6 +32,7 @@ class RoleResponse(BaseModel):
created_at: datetime created_at: datetime
updated_at: datetime updated_at: datetime
@router.post( @router.post(
"/update", "/update",
response_model=RoleResponse, response_model=RoleResponse,
@ -36,6 +42,7 @@ class RoleResponse(BaseModel):
) )
async def update_role( async def update_role(
req: UpdateRoleRequest, req: UpdateRoleRequest,
_: bool = Depends(has_all_permissions([DefaultPermissionEnum.CHANGE_ROLES.value.permission_key]))
) -> RoleResponse: ) -> RoleResponse:
doc = await role_service.update_role(req.role_id, req.role_key, req.role_name, req.role_description, req.role_level) doc = await role_service.update_role(req.role_id, req.role_key, req.role_name, req.role_description, req.role_level)
return RoleResponse(**doc.dict()) return RoleResponse(**doc.dict())

View File

@ -8,6 +8,7 @@ from fastapi.responses import JSONResponse
from pydantic import BaseModel from pydantic import BaseModel
from backend.application.signin_hub import SignInHub from backend.application.signin_hub import SignInHub
from common.constants.jwt_constants import USER_ROLE_NAMES, USER_PERMISSIONS
from common.token.token_manager import TokenManager from common.token.token_manager import TokenManager
@ -69,7 +70,7 @@ async def signin_with_email_and_code(item: RequestIn) -> ResponseOut:
) )
if signed_in and identity and adminstrative_role: if signed_in and identity and adminstrative_role:
subject = {"id": identity, "role": adminstrative_role, "role_names": user_role_names, "user_permissions": user_permission_keys} subject = {"id": identity, "role": adminstrative_role, USER_ROLE_NAMES: user_role_names, USER_PERMISSIONS: user_permission_keys}
access_token = token_manager.create_access_token(subject=subject) access_token = token_manager.create_access_token(subject=subject)
refresh_token = token_manager.create_refresh_token(subject=subject) refresh_token = token_manager.create_refresh_token(subject=subject)
expires_in = datetime.now(timezone.utc) + timedelta( expires_in = datetime.now(timezone.utc) + timedelta(
@ -87,8 +88,8 @@ async def signin_with_email_and_code(item: RequestIn) -> ResponseOut:
"identity": identity, "identity": identity,
"expires_in": expires_in, "expires_in": expires_in,
"role": adminstrative_role, "role": adminstrative_role,
"role_names": user_role_names, USER_ROLE_NAMES: user_role_names,
"user_permissions": user_permission_keys, USER_PERMISSIONS: user_permission_keys,
"flid": flid, "flid": flid,
"preferred_region": preferred_region, "preferred_region": preferred_region,
} }

View File

@ -10,6 +10,7 @@ from fastapi import Depends, HTTPException
from starlette.status import HTTP_401_UNAUTHORIZED from starlette.status import HTTP_401_UNAUTHORIZED
from backend.application.signin_hub import SignInHub from backend.application.signin_hub import SignInHub
from common.constants.jwt_constants import USER_ROLE_NAMES, USER_PERMISSIONS
from common.token.token_manager import TokenManager from common.token.token_manager import TokenManager
router = APIRouter() router = APIRouter()
@ -66,7 +67,7 @@ async def signin_with_email_and_password(
) )
if signed_in and adminstrative_role and identity: if signed_in and adminstrative_role and identity:
subject = {"id": identity, "role": adminstrative_role, "role_names": user_role_names, "user_permissions": user_permission_keys} subject = {"id": identity, "role": adminstrative_role, USER_ROLE_NAMES: user_role_names, USER_PERMISSIONS: user_permission_keys}
access_token = token_manager.create_access_token(subject=subject) access_token = token_manager.create_access_token(subject=subject)
refresh_token = token_manager.create_refresh_token(subject=subject) refresh_token = token_manager.create_refresh_token(subject=subject)
expires_in = datetime.now(timezone.utc) + timedelta( expires_in = datetime.now(timezone.utc) + timedelta(
@ -84,8 +85,8 @@ async def signin_with_email_and_password(
"identity": identity, "identity": identity,
"expires_in": expires_in, "expires_in": expires_in,
"role": adminstrative_role, "role": adminstrative_role,
"role_names": user_role_names, USER_ROLE_NAMES: user_role_names,
"user_permissions": user_permission_keys, USER_PERMISSIONS: user_permission_keys,
"flid": flid, "flid": flid,
} }
return JSONResponse(content=jsonable_encoder(result)) return JSONResponse(content=jsonable_encoder(result))

View File

@ -1,21 +1,28 @@
from fastapi import APIRouter from fastapi import APIRouter
from fastapi.params import Depends
from pydantic import BaseModel from pydantic import BaseModel
from typing import List, Optional from typing import List, Optional
from backend.models.permission.constants import DefaultPermissionEnum
from backend.services.user.user_management_service import UserManagementService from backend.services.user.user_management_service import UserManagementService
from common.token.token_manager import TokenManager from common.token.token_manager import TokenManager
from common.utils.jwt_token import has_all_permissions
router = APIRouter() router = APIRouter()
token_manager = TokenManager() token_manager = TokenManager()
user_management_service = UserManagementService() user_management_service = UserManagementService()
class AssignRolesRequest(BaseModel): class AssignRolesRequest(BaseModel):
user_id: str user_id: str
role_ids: List[str] role_ids: List[str]
class UserRoleResponse(BaseModel): class UserRoleResponse(BaseModel):
user_id: str user_id: str
role_ids: Optional[List[str]] role_ids: Optional[List[str]]
@router.post( @router.post(
"/assign-roles", "/assign-roles",
response_model=UserRoleResponse, response_model=UserRoleResponse,
@ -25,6 +32,7 @@ class UserRoleResponse(BaseModel):
) )
async def assign_roles_to_user( async def assign_roles_to_user(
req: AssignRolesRequest, req: AssignRolesRequest,
_: bool = Depends(has_all_permissions([DefaultPermissionEnum.ASSIGN_ROLES.value.permission_key])),
) -> UserRoleResponse: ) -> UserRoleResponse:
doc = await user_management_service.assign_roles_to_user(req.user_id, req.role_ids) doc = await user_management_service.assign_roles_to_user(req.user_id, req.role_ids)
return UserRoleResponse(**doc.dict()) return UserRoleResponse(**doc.dict())

View File

@ -1 +0,0 @@
export RABBITMQ_HOST=localhost