feat(refactor): remove duplicate get_current_user

This commit is contained in:
icecheng 2025-07-25 10:00:08 +08:00
parent 0f7d63f4a2
commit 30e6ca72a7
15 changed files with 81 additions and 120 deletions

View File

@ -1,11 +1,34 @@
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
import uuid import uuid
from typing import Dict from typing import Dict, List
from jose import jwt, JWTError from jose import jwt, JWTError
from common.config.app_settings import app_settings from common.config.app_settings import app_settings
from fastapi import Depends, HTTPException from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from starlette.status import HTTP_401_UNAUTHORIZED
from common.constants.jwt_constants import USER_ROLE_NAMES, USER_PERMISSIONS
class CurrentUser:
def __init__(self, user_id: str, user_role_names: List[str], user_permission_keys: List[str]):
self.user_id = user_id
self.user_role_names = user_role_names
self.user_permission_keys = user_permission_keys
def has_all_permissions(self, permissions: List[str]) -> bool:
"""Check if the user has all the specified permissions"""
if not permissions:
return True
return all(p in self.user_permission_keys for p in permissions)
def has_any_permissions(self, permissions: List[str]) -> bool:
"""Check if the user has at least one of the specified permissions"""
if not permissions:
return True
return any(p in self.user_permission_keys for p in permissions)
security = HTTPBearer()
class TokenManager: class TokenManager:
@ -73,16 +96,35 @@ class TokenManager:
else: else:
raise ValueError("Invalid refresh token") raise ValueError("Invalid refresh token")
async def get_current_user( async def get_current_user(self, credentials: HTTPAuthorizationCredentials = Depends(security)) -> CurrentUser:
self, token: str = Depends(OAuth2PasswordBearer(tokenUrl="token"))
) -> Dict:
""" """
Extract and validate user information from the JWT token. Returns the current user object for the given credentials.
""" """
try: try:
payload = self.decode_token(token) # Decode JWT token payload = self.decode_token(credentials.credentials)
return payload user = payload.get("subject")
except ValueError: if not user or "id" not in user:
raise HTTPException( raise HTTPException(status_code=401, detail="Invalid authentication token")
status_code=HTTP_401_UNAUTHORIZED, detail="Invalid or expired token" return CurrentUser(user.get("id"), user.get(USER_ROLE_NAMES), user.get(USER_PERMISSIONS))
) except JWTError:
raise HTTPException(status_code=401, detail="Invalid authentication token")
def has_all_permissions(self, permissions: List[str]):
"""Check if the user has all the specified permissions"""
def inner_dependency(current_user: CurrentUser = Depends(self.get_current_user)):
if not current_user.has_all_permissions(permissions):
raise HTTPException(status_code=403, detail="Not allowed")
return True
return inner_dependency
def has_any_permissions(self, permissions: List[str]):
"""Check if the user has at least one of the specified permissions"""
def inner_dependency(current_user: CurrentUser = Depends(self.get_current_user)):
if not current_user.has_any_permissions(permissions):
raise HTTPException(status_code=403, detail="Not allowed")
return True
return inner_dependency

View File

@ -1,77 +0,0 @@
from typing import List
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from common.config.app_settings import app_settings
from common.constants.jwt_constants import USER_ROLE_NAMES, USER_PERMISSIONS
security = HTTPBearer()
class CurrentUser:
def __init__(self, user_id: str, user_role_names: List[str], user_permission_keys: List[str]):
self.user_id = user_id
self.user_role_names = user_role_names
self.user_permission_keys = user_permission_keys
def has_all_permissions(self, permissions: List[str]) -> bool:
"""Check if the user has all the specified permissions"""
if not permissions:
return True
return all(p in self.user_permission_keys for p in permissions)
def has_any_permissions(self, permissions: List[str]) -> bool:
"""Check if the user has at least one of the specified permissions"""
if not permissions:
return True
return any(p in self.user_permission_keys for p in permissions)
def decode_jwt_token(token: str):
payload = jwt.decode(
token,
app_settings.JWT_SECRET_KEY,
algorithms=[app_settings.JWT_ALGORITHM],
)
return payload
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> CurrentUser:
try:
payload = decode_jwt_token(credentials.credentials)
user = payload.get("subject")
if not user or "id" not in user:
raise HTTPException(status_code=401, detail="Invalid authentication token")
return CurrentUser(user.get("id"), user.get(USER_ROLE_NAMES), user.get(USER_PERMISSIONS))
except JWTError:
raise HTTPException(status_code=401, detail="Invalid authentication token")
def has_all_permissions(
permissions: List[str],
):
"""Check if the user has all the specified permissions"""
def inner_dependency(current_user: CurrentUser = Depends(get_current_user)):
if not current_user.has_all_permissions(permissions):
raise HTTPException(status_code=403, detail="Not allowed")
return True
return inner_dependency
def has_any_permissions(
permissions: List[str]
):
"""Check if the user has at least one of the specified permissions"""
def inner_dependency(current_user: CurrentUser = Depends(get_current_user)):
if not current_user.has_any_permissions(permissions):
raise HTTPException(status_code=403, detail="Not allowed")
return True
return inner_dependency

View File

@ -1,5 +1,5 @@
# user with admin role # user with admin role
USER_EMAIL = "XXXXX" USER_EMAIL = "XXXX"
USER_PASSWORD = "XXXXX" USER_PASSWORD = "XXXX"
# authentication base url # authentication base url
BASE_URL = "http://localhost:8103" BASE_URL = "http://localhost:8103"

View File

@ -1,7 +1,6 @@
from webapi.bootstrap.application import create_app from webapi.bootstrap.application import create_app
from webapi.config.site_settings import site_settings from webapi.config.site_settings import site_settings
from fastapi.responses import RedirectResponse from fastapi.responses import RedirectResponse
from fastapi.middleware.cors import CORSMiddleware
import uvicorn import uvicorn
from typing import Any from typing import Any

View File

@ -1,6 +1,6 @@
from backend.application.signin_hub import SignInHub from backend.application.signin_hub import SignInHub
from pydantic import BaseModel from pydantic import BaseModel
from common.token.token_manager import TokenManager from common.token.token_manager import TokenManager, CurrentUser
from fastapi.encoders import jsonable_encoder from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException
@ -28,14 +28,14 @@ class RequestIn(BaseModel):
) )
async def send_email_code( async def send_email_code(
item: RequestIn, item: RequestIn,
current_user: dict = Depends(token_manager.get_current_user), current_user: CurrentUser = Depends(token_manager.get_current_user),
): ):
user_id = current_user.get("id") user_id = current_user.user_id
if not user_id: if not user_id:
raise HTTPException( raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED, detail="Could not validate credentials" status_code=HTTP_401_UNAUTHORIZED, detail="Could not validate credentials"
) )
result = await SignInHub(user_id).send_email_code(item.sender_id, item.email) result = await SignInHub().send_email_code(item.sender_id, item.email)
return JSONResponse(content=jsonable_encoder(result)) return JSONResponse(content=jsonable_encoder(result))

View File

@ -1,6 +1,6 @@
from backend.application.signin_hub import SignInHub from backend.application.signin_hub import SignInHub
from pydantic import BaseModel from pydantic import BaseModel
from common.token.token_manager import TokenManager from common.token.token_manager import TokenManager, CurrentUser
from fastapi.encoders import jsonable_encoder from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException
@ -27,14 +27,14 @@ class RequestIn(BaseModel):
) )
async def send_email_code( async def send_email_code(
item: RequestIn, item: RequestIn,
current_user: dict = Depends(token_manager.get_current_user), current_user: CurrentUser = Depends(token_manager.get_current_user),
): ):
user_id = current_user.get("id") user_id = current_user.user_id
if not user_id: if not user_id:
raise HTTPException( raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED, detail="Could not validate credentials" status_code=HTTP_401_UNAUTHORIZED, detail="Could not validate credentials"
) )
result = await SignInHub(user_id).send_mobile_code(item.email) result = await SignInHub().send_mobile_code(item.email)
return JSONResponse(content=jsonable_encoder(result)) return JSONResponse(content=jsonable_encoder(result))

View File

@ -7,7 +7,6 @@ from typing import Optional
from backend.models.permission.constants import DefaultPermissionEnum from backend.models.permission.constants import DefaultPermissionEnum
from backend.services.permission.permission_service import PermissionService from backend.services.permission.permission_service import PermissionService
from common.token.token_manager import TokenManager from common.token.token_manager import TokenManager
from common.utils.jwt_token import has_all_permissions
router = APIRouter() router = APIRouter()
token_manager = TokenManager() token_manager = TokenManager()
@ -38,7 +37,7 @@ class PermissionResponse(BaseModel):
) )
async def create_permission( async def create_permission(
req: CreatePermissionRequest, req: CreatePermissionRequest,
_: bool = Depends(has_all_permissions([DefaultPermissionEnum.CHANGE_PERMISSIONS.value.permission_key])) _: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_PERMISSIONS.value.permission_key]))
) -> PermissionResponse: ) -> PermissionResponse:
doc = await permission_service.create_permission(req.permission_key, req.permission_name, req.description) doc = await permission_service.create_permission(req.permission_key, req.permission_name, req.description)

View File

@ -3,8 +3,9 @@ from pydantic import BaseModel
from backend.models.permission.constants import DefaultPermissionEnum from backend.models.permission.constants import DefaultPermissionEnum
from backend.services.permission.permission_service import PermissionService from backend.services.permission.permission_service import PermissionService
from common.utils.jwt_token import has_all_permissions from common.token.token_manager import TokenManager
token_manager = TokenManager()
router = APIRouter() router = APIRouter()
permission_service = PermissionService() permission_service = PermissionService()
@ -26,7 +27,7 @@ class DeletePermissionResponse(BaseModel):
) )
async def delete_permission( async def delete_permission(
req: DeletePermissionRequest, req: DeletePermissionRequest,
_: bool = Depends(has_all_permissions([DefaultPermissionEnum.CHANGE_PERMISSIONS.value.permission_key])) _: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_PERMISSIONS.value.permission_key]))
) -> DeletePermissionResponse: ) -> DeletePermissionResponse:
await permission_service.delete_permission(req.permission_id) await permission_service.delete_permission(req.permission_id)
return DeletePermissionResponse(success=True) return DeletePermissionResponse(success=True)

View File

@ -7,7 +7,7 @@ from typing import Optional
from backend.models.permission.constants import DefaultPermissionEnum from backend.models.permission.constants import DefaultPermissionEnum
from backend.services.permission.permission_service import PermissionService from backend.services.permission.permission_service import PermissionService
from common.token.token_manager import TokenManager from common.token.token_manager import TokenManager
from common.utils.jwt_token import has_all_permissions
router = APIRouter() router = APIRouter()
token_manager = TokenManager() token_manager = TokenManager()
@ -39,7 +39,7 @@ class PermissionResponse(BaseModel):
) )
async def update_permission( async def update_permission(
req: UpdatePermissionRequest, req: UpdatePermissionRequest,
_: bool = Depends(has_all_permissions([DefaultPermissionEnum.CHANGE_PERMISSIONS.value.permission_key])) _: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_PERMISSIONS.value.permission_key]))
) -> PermissionResponse: ) -> PermissionResponse:
doc = await permission_service.update_permission(req.permission_id, req.permission_key, req.permission_name, doc = await permission_service.update_permission(req.permission_id, req.permission_key, req.permission_name,
req.description) req.description)

View File

@ -7,7 +7,6 @@ from typing import List
from backend.models.permission.constants import DefaultPermissionEnum from backend.models.permission.constants import DefaultPermissionEnum
from backend.services.permission.role_service import RoleService from backend.services.permission.role_service import RoleService
from common.token.token_manager import TokenManager from common.token.token_manager import TokenManager
from common.utils.jwt_token import has_all_permissions
router = APIRouter() router = APIRouter()
token_manager = TokenManager() token_manager = TokenManager()
@ -36,7 +35,7 @@ class RoleResponse(BaseModel):
) )
async def assign_permissions_to_role( async def assign_permissions_to_role(
req: AssignPermissionsRequest, req: AssignPermissionsRequest,
_: bool = Depends(has_all_permissions([DefaultPermissionEnum.CHANGE_ROLES.value.permission_key])) _: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_ROLES.value.permission_key]))
) -> RoleResponse: ) -> RoleResponse:
doc = await role_service.assign_permissions_to_role(req.role_id, req.permission_ids) doc = await role_service.assign_permissions_to_role(req.role_id, req.permission_ids)
return RoleResponse(**doc.dict()) return RoleResponse(**doc.dict())

View File

@ -7,7 +7,6 @@ from typing import Optional, List
from backend.models.permission.constants import DefaultPermissionEnum from backend.models.permission.constants import DefaultPermissionEnum
from backend.services.permission.role_service import RoleService from backend.services.permission.role_service import RoleService
from common.token.token_manager import TokenManager from common.token.token_manager import TokenManager
from common.utils.jwt_token import has_all_permissions
router = APIRouter() router = APIRouter()
token_manager = TokenManager() token_manager = TokenManager()
@ -41,7 +40,7 @@ class RoleResponse(BaseModel):
) )
async def create_role( async def create_role(
req: CreateRoleRequest, req: CreateRoleRequest,
_: bool = Depends(has_all_permissions([DefaultPermissionEnum.CHANGE_ROLES.value.permission_key])) _: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_ROLES.value.permission_key]))
) -> RoleResponse: ) -> RoleResponse:
doc = await role_service.create_role(req.role_key, req.role_name, req.role_description, req.role_level) doc = await role_service.create_role(req.role_key, req.role_name, req.role_description, req.role_level)
return RoleResponse(**doc.dict()) return RoleResponse(**doc.dict())

View File

@ -3,8 +3,9 @@ from pydantic import BaseModel
from backend.models.permission.constants import DefaultPermissionEnum from backend.models.permission.constants import DefaultPermissionEnum
from backend.services.permission.role_service import RoleService from backend.services.permission.role_service import RoleService
from common.utils.jwt_token import has_all_permissions from common.token.token_manager import TokenManager
token_manager = TokenManager()
router = APIRouter() router = APIRouter()
role_service = RoleService() role_service = RoleService()
@ -26,7 +27,7 @@ class DeleteRoleResponse(BaseModel):
) )
async def delete_role( async def delete_role(
req: DeleteRoleRequest, req: DeleteRoleRequest,
_: bool = Depends(has_all_permissions([DefaultPermissionEnum.CHANGE_ROLES.value.permission_key])) _: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_ROLES.value.permission_key]))
) -> DeleteRoleResponse: ) -> DeleteRoleResponse:
await role_service.delete_role(req.role_id) await role_service.delete_role(req.role_id)
return DeleteRoleResponse(success=True) return DeleteRoleResponse(success=True)

View File

@ -7,7 +7,6 @@ from typing import Optional, List
from backend.models.permission.constants import DefaultPermissionEnum from backend.models.permission.constants import DefaultPermissionEnum
from backend.services.permission.role_service import RoleService from backend.services.permission.role_service import RoleService
from common.token.token_manager import TokenManager from common.token.token_manager import TokenManager
from common.utils.jwt_token import has_all_permissions
router = APIRouter() router = APIRouter()
token_manager = TokenManager() token_manager = TokenManager()
@ -42,7 +41,7 @@ class RoleResponse(BaseModel):
) )
async def update_role( async def update_role(
req: UpdateRoleRequest, req: UpdateRoleRequest,
_: bool = Depends(has_all_permissions([DefaultPermissionEnum.CHANGE_ROLES.value.permission_key])) _: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.CHANGE_ROLES.value.permission_key]))
) -> RoleResponse: ) -> RoleResponse:
doc = await role_service.update_role(req.role_id, req.role_key, req.role_name, req.role_description, req.role_level) doc = await role_service.update_role(req.role_id, req.role_key, req.role_name, req.role_description, req.role_level)
return RoleResponse(**doc.dict()) return RoleResponse(**doc.dict())

View File

@ -1,7 +1,7 @@
from backend.application.signin_hub import SignInHub from backend.application.signin_hub import SignInHub
from pydantic import BaseModel from pydantic import BaseModel
from fastapi import APIRouter from fastapi import APIRouter
from common.token.token_manager import TokenManager from common.token.token_manager import TokenManager, CurrentUser
from fastapi import APIRouter, Depends from fastapi import APIRouter, Depends
from fastapi.encoders import jsonable_encoder from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
@ -28,9 +28,9 @@ class RequestIn(BaseModel):
) )
async def sign_out( async def sign_out(
item: RequestIn, item: RequestIn,
current_user: dict = Depends(token_manager.get_current_user), current_user: CurrentUser = Depends(token_manager.get_current_user),
): ):
user_id = current_user.get("id") user_id = current_user.user_id
if not user_id: if not user_id:
raise HTTPException( raise HTTPException(

View File

@ -6,7 +6,6 @@ from typing import List, Optional
from backend.models.permission.constants import DefaultPermissionEnum from backend.models.permission.constants import DefaultPermissionEnum
from backend.services.user.user_management_service import UserManagementService from backend.services.user.user_management_service import UserManagementService
from common.token.token_manager import TokenManager from common.token.token_manager import TokenManager
from common.utils.jwt_token import has_all_permissions
router = APIRouter() router = APIRouter()
token_manager = TokenManager() token_manager = TokenManager()
@ -32,7 +31,7 @@ class UserRoleResponse(BaseModel):
) )
async def assign_roles_to_user( async def assign_roles_to_user(
req: AssignRolesRequest, req: AssignRolesRequest,
_: bool = Depends(has_all_permissions([DefaultPermissionEnum.ASSIGN_ROLES.value.permission_key])), _: bool = Depends(token_manager.has_all_permissions([DefaultPermissionEnum.ASSIGN_ROLES.value.permission_key])),
) -> UserRoleResponse: ) -> UserRoleResponse:
doc = await user_management_service.assign_roles_to_user(req.user_id, req.role_ids) doc = await user_management_service.assign_roles_to_user(req.user_id, req.role_ids)
return UserRoleResponse(**doc.dict()) return UserRoleResponse(**doc.dict())