feat(role_management): Update the JWT token generation logic to include new fields user_roles and user_permissions in the payload.

This commit is contained in:
icecheng 2025-07-21 13:09:33 +08:00
parent 419e58da0c
commit 4109018692
6 changed files with 120 additions and 34 deletions

View File

@ -1,4 +1,8 @@
from typing import Optional, Tuple
from typing import Optional, Tuple, List
from backend.services.permission.permission_service import PermissionService
from backend.services.permission.role_service import RoleService
from common.constants.region import UserRegion
from common.log.log_utils import log_entry_exit_async
from backend.business.signin_manager import SignInManager
from backend.models.user.constants import UserLoginAction
@ -13,10 +17,28 @@ class SignInHub:
@log_entry_exit_async
async def signin_with_email_and_code(
self, email: str, code: str, host: str, time_zone: Optional[str] = "UTC"
) -> Tuple[int, Optional[int], Optional[str], Optional[str]]:
self, email: str, code: str, host: str, time_zone: Optional[str] = "UTC"
) -> Tuple[UserLoginAction, Optional[int], Optional[str], Optional[str], Optional[UserRegion], Optional[List[str]],
Optional[List[str]]]:
"""
Interacts with the business layer to handle the sign-in process with email and code.
Try to signin with email and code.
create a new user account, if the email address has never been used before.
Args:
email (str): email address
code (str): auth code to be verified
host (str): the host address by which the client access the frontend service
time_zone (Optional[str]): time zone of the frontend service
Returns:
[int, Optional[int], Optional[str], Optional[str]]:
- int: UserLoginAction
- Optional[int]: user role
- Optional[str]: user_id
- Optional[str]: flid
- Optional[str]: region
- Optional[str]: user role names
- Optional[str]: user permission keys
"""
return await self.signin_manager.signin_with_email_and_code(
email=email, code=code, host=host, time_zone=time_zone
@ -25,7 +47,7 @@ class SignInHub:
@log_entry_exit_async
async def signin_with_email_and_password(
self, email: str, password: str
) -> Tuple[UserLoginAction, Optional[int], Optional[str], Optional[str]]:
) -> Tuple[UserLoginAction, Optional[int], Optional[str], Optional[str], Optional[List[str]], Optional[List[str]]]:
"""Try to signin with email and password.
Args:
@ -38,6 +60,8 @@ class SignInHub:
- Optional[int]: user role
- Optional[str]: user_id
- Optional[str]: flid
- Optional[List[str]]: user role names
- Optional[List[str]]: user permission keys
"""
return await self.signin_manager.signin_with_email_and_password(
email=email, password=password

View File

@ -1,9 +1,11 @@
import random
from typing import Tuple, Optional
from typing import Tuple, Optional, List
from backend.services.auth.user_auth_service import UserAuthService
from common.constants.region import UserRegion
from common.utils.region import RegionHandler
from backend.models.user.constants import (
UserLoginAction,
NewUserMethod,
)
from backend.models.user.constants import UserLoginAction
@ -36,7 +38,7 @@ class SignInManager:
async def signin_with_email_and_code(
self, email: str, code: str, host: str, time_zone: Optional[str] = "UTC"
) -> Tuple[int, Optional[int], Optional[str], Optional[str]]:
) -> Tuple[UserLoginAction, Optional[int], Optional[str], Optional[UserRegion], Optional[str], Optional[List[str]], Optional[List[str]]]:
"""Try to signin with email and code.
create a new user account, if the email address has never been used before.
@ -44,6 +46,7 @@ class SignInManager:
email (str): email address
code (str): auth code to be verified
host (str): the host address by which the client access the frontend service
time_zone (str, optional): timezone of the frontend service
Returns:
[int, Optional[int], Optional[str], Optional[str]]:
@ -51,6 +54,9 @@ class SignInManager:
- Optional[int]: user role
- Optional[str]: user_id
- Optional[str]: flid
- Optional[str]: region
- Optional[str]: user role names
- Optional[str]: user permission keys
"""
# check if the user account exist
user_id = await self.user_auth_service.get_user_id_by_email(email)
@ -67,7 +73,6 @@ class SignInManager:
method=NewUserMethod.EMAIL, region=preferred_region
)
)
user_id = str(user_account.id)
await self.user_management_service.initialize_new_user_data(
user_id=str(user_account.id),
@ -80,6 +85,9 @@ class SignInManager:
user_account = await self.user_management_service.get_account_by_id(
user_id=user_id
)
role_names, permission_keys = await self.user_management_service.get_role_and_permission_by_user_id(
user_id=user_id
)
if await self.user_auth_service.is_flid_reset_required(user_id):
return (
UserLoginAction.REVIEW_AND_REVISE_FLID,
@ -87,10 +95,11 @@ class SignInManager:
user_id,
email.split("@")[0],
preferred_region,
role_names,
permission_keys,
)
user_flid = await self.user_auth_service.get_user_flid(user_id)
if await self.user_auth_service.is_password_reset_required(user_id):
return (
UserLoginAction.NEW_USER_SET_PASSWORD,
@ -98,25 +107,28 @@ class SignInManager:
user_id,
user_flid,
preferred_region,
role_names,
permission_keys,
)
return (
UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED,
user_account.user_role,
user_id,
user_flid,
preferred_region,
role_names,
permission_keys,
)
else:
await self.module_logger.log_warning(
warning="The auth code is invalid.",
properties={"email": email, "code": code},
)
return UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE, None, None, None, None
return UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE, None, None, None, None, None, None
async def signin_with_email_and_password(
self, email: str, password: str
) -> Tuple[UserLoginAction, Optional[int], Optional[str], Optional[str]]:
) -> Tuple[UserLoginAction, Optional[int], Optional[str], Optional[str], Optional[List[str]], Optional[List[str]]]:
# check if the user account exist
user_id = await self.user_auth_service.get_user_id_by_email(email)
@ -126,16 +138,18 @@ class SignInManager:
if is_new_user:
# cannot find the email address
return [UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE, None, None, None]
return (UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE, None, None, None, None, None)
else:
if await self.user_auth_service.is_password_reset_required(user_id):
# password hasn't been set before, save password for the user
return [
return (
UserLoginAction.NEW_USER_SET_PASSWORD,
None,
None,
None,
]
None,
None
)
else:
if await self.user_auth_service.verify_user_with_password(
user_id, password
@ -143,33 +157,39 @@ class SignInManager:
user_account = await self.user_management_service.get_account_by_id(
user_id=user_id
)
role_names, permission_keys = await self.user_management_service.get_role_and_permission_by_user_id(user_id)
if await self.user_auth_service.is_flid_reset_required(user_id):
return [
return (
UserLoginAction.REVIEW_AND_REVISE_FLID,
user_account.user_role,
user_id,
email.split("@")[0],
]
role_names,
permission_keys,
)
user_flid = await self.user_auth_service.get_user_flid(user_id)
# password verification passed
return [
return (
UserLoginAction.USER_SIGNED_IN,
user_account.user_role,
user_id,
user_flid,
]
role_names,
permission_keys
)
else:
# ask user to input password again.
# TODO: we need to limit times of user to input the wrong password
return [
return (
UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED,
None,
None,
None,
]
None,
None
)
async def update_new_user_flid(
self, user_id: str, user_flid: str
@ -192,26 +212,26 @@ class SignInManager:
"code_depot_email": code_depot_email,
},
)
return [
return (
UserLoginAction.REVIEW_AND_REVISE_FLID,
"{}{}".format(user_flid, random.randint(100, 999)),
]
)
await self.user_auth_service.update_flid(user_id, user_flid)
if await self.user_auth_service.is_password_reset_required(user_id):
return [
return (
UserLoginAction.NEW_USER_SET_PASSWORD,
user_flid,
]
)
else:
return [
return (
UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED,
user_flid,
]
)
else:
return [
return (
UserLoginAction.REVIEW_AND_REVISE_FLID,
"{}{}".format(user_flid, random.randint(100, 999)),
]
)
async def try_signin_with_email(self, email: str, host: str) -> UserLoginAction:
"""try signin through email, generate auth code and send to the email address

View File

@ -1,6 +1,6 @@
from typing import Optional, List
from fastapi.exceptions import RequestValidationError
from backend.models.permission.models import RoleDoc, UserRoleDoc
from backend.models.permission.models import RoleDoc, UserRoleDoc, PermissionDoc
from beanie import PydanticObjectId
@ -38,3 +38,28 @@ class UserRoleHandler:
)
await user_role_doc.insert()
return user_role_doc
async def get_role_and_permission_by_user_id(self, user_id: str) -> tuple[list[str], list[str]]:
"""Get all role names and permission keys for a user by user_id"""
# Query user roles
user_role_doc = await UserRoleDoc.find_one(UserRoleDoc.user_id == user_id)
if not user_role_doc or not user_role_doc.role_ids:
# No roles assigned
return [], []
# Query all roles by role_ids
roles = await RoleDoc.find({"_id": {"$in": [PydanticObjectId(rid) for rid in user_role_doc.role_ids]}}).to_list()
role_names = [role.role_name for role in roles]
# Collect all permission_ids from all roles
all_permission_ids = []
for role in roles:
if role.permission_ids:
all_permission_ids.extend(role.permission_ids)
# Remove duplicates
unique_permission_ids = list(dict.fromkeys(all_permission_ids))
# Query all permissions by permission_ids
if unique_permission_ids:
permissions = await PermissionDoc.find({"_id": {"$in": [PydanticObjectId(pid) for pid in unique_permission_ids]}}).to_list()
permission_keys = [perm.permission_key for perm in permissions]
else:
permission_keys = []
return role_names, permission_keys

View File

@ -1,6 +1,6 @@
from backend.models.permission.models import UserRoleDoc
from common.log.module_logger import ModuleLogger
from typing import Optional, List
from typing import Optional, List, Tuple
from backend.models.user.constants import (
NewUserMethod,
@ -106,3 +106,12 @@ class UserManagementService:
async def assign_roles_to_user(self, user_id: str, role_ids: List[str]) -> UserRoleDoc:
"""Assign roles to a user by updating or creating the UserRoleDoc"""
return await self.user_role_handler.assign_roles_to_user(user_id, role_ids)
async def get_role_and_permission_by_user_id(self, user_id: str) -> Tuple[List[str], List[str]]:
"""Get user role names and permission keys by user id
Args:
user_id (str): user id
Returns:
Tuple[List[str], List[str]]: user role names and permission keys
"""
return await self.user_role_handler.get_role_and_permission_by_user_id(user_id)

View File

@ -58,6 +58,8 @@ async def signin_with_email_and_code(item: RequestIn) -> ResponseOut:
identity,
flid,
preferred_region,
user_role_names,
user_permission_keys
) = await SignInHub().signin_with_email_and_code(
item.email, item.code, item.host, item.time_zone
)
@ -67,7 +69,7 @@ async def signin_with_email_and_code(item: RequestIn) -> ResponseOut:
)
if signed_in and identity and adminstrative_role:
subject = {"id": identity, "role": adminstrative_role}
subject = {"id": identity, "role": adminstrative_role, "role_names": user_role_names, "user_permissions": user_permission_keys}
access_token = token_manager.create_access_token(subject=subject)
refresh_token = token_manager.create_refresh_token(subject=subject)
expires_in = datetime.now(timezone.utc) + timedelta(
@ -85,6 +87,8 @@ async def signin_with_email_and_code(item: RequestIn) -> ResponseOut:
"identity": identity,
"expires_in": expires_in,
"role": adminstrative_role,
"role_names": user_role_names,
"user_permissions": user_permission_keys,
"flid": flid,
"preferred_region": preferred_region,
}

View File

@ -57,6 +57,8 @@ async def signin_with_email_and_password(
adminstrative_role,
identity,
flid,
user_role_names,
user_permission_keys
) = await SignInHub().signin_with_email_and_password(item.email, item.password)
logging.debug(
@ -64,7 +66,7 @@ async def signin_with_email_and_password(
)
if signed_in and adminstrative_role and identity:
subject = {"id": identity, "role": adminstrative_role}
subject = {"id": identity, "role": adminstrative_role, "role_names": user_role_names, "user_permissions": user_permission_keys}
access_token = token_manager.create_access_token(subject=subject)
refresh_token = token_manager.create_refresh_token(subject=subject)
expires_in = datetime.now(timezone.utc) + timedelta(
@ -82,6 +84,8 @@ async def signin_with_email_and_password(
"identity": identity,
"expires_in": expires_in,
"role": adminstrative_role,
"role_names": user_role_names,
"user_permissions": user_permission_keys,
"flid": flid,
}
return JSONResponse(content=jsonable_encoder(result))