feat: add new apis for Magicleaps product use

This commit is contained in:
sunhaolou 2025-08-25 17:31:16 +08:00
parent 4b8be6fd43
commit c200e70970
7 changed files with 199 additions and 1 deletions

View File

@ -79,6 +79,10 @@ class SignInHub:
async def try_signin_with_email(self, email: str, host: str) -> UserLoginAction: async def try_signin_with_email(self, email: str, host: str) -> UserLoginAction:
return await self.signin_manager.try_signin_with_email(email=email, host=host) return await self.signin_manager.try_signin_with_email(email=email, host=host)
@log_entry_exit_async
async def try_magicleaps_signin_with_email(self, email: str, host: str) -> UserLoginAction:
return await self.signin_manager.try_magicleaps_signin_with_email(email=email, host=host)
@log_entry_exit_async @log_entry_exit_async
async def reset_password_through_email(self, email: str, host: str) -> int: async def reset_password_through_email(self, email: str, host: str) -> int:
return await self.signin_manager.reset_password_through_email( return await self.signin_manager.reset_password_through_email(

View File

@ -276,6 +276,46 @@ class SignInManager:
else: else:
return UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED return UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED
async def try_magicleaps_signin_with_email(self, email: str, host: str) -> UserLoginAction:
"""try signin through email using MagicLeaps branding, generate auth code and send to the email address
Args:
email (str): email address
host (str): host url that user tried to sign in
Returns:
int: UserLoginAction
"""
user_id = await self.user_auth_service.get_user_id_by_email(email)
is_password_reset_required = False
if user_id:
is_password_reset_required = (
await self.user_auth_service.is_password_reset_required(user_id)
)
if user_id is None or is_password_reset_required:
# send auth code through email if the email address
# hasn't been associated with any account.
# Or if the user's password is empty, which means the user's pasword hasn't been set.
mail_code = await self.user_auth_service.generate_auth_code_for_object(
email, AuthType.EMAIL
)
await self.notification_service.send_notification(
sender_id=app_settings.SYSTEM_USER_ID,
channels=["2"], # 2 maps to email in NotificationChannel
receiver_id=email,
subject="email",
event="magicleaps_authentication", # Use the new event type
properties={"auth_code": mail_code},
# TODO: reconsider necessity of adding region info here
# region=RegionHandler().detect_from_host(host),
)
return UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE
else:
return UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED
async def reset_password_through_email(self, email: str, host: str) -> int: async def reset_password_through_email(self, email: str, host: str) -> int:
"""verify the email is exisitng, clear the existing password, """verify the email is exisitng, clear the existing password,
generate auth code and send to the email address generate auth code and send to the email address

View File

@ -1,7 +1,9 @@
from fastapi import APIRouter from fastapi import APIRouter
from .try_signin_with_email import router as ts_router from .try_signin_with_email import router as ts_router
from .try_magicleaps_signin_with_email import router as tms_router
from .signin_with_email_and_password import router as se_router from .signin_with_email_and_password import router as se_router
from .signin_with_email_and_code import router as sw_router from .signin_with_email_and_code import router as sw_router
from .signin_with_magicleaps_email_and_code import router as swm_router
from .update_user_password import router as up_router from .update_user_password import router as up_router
from .update_new_user_flid import router as uu_router from .update_new_user_flid import router as uu_router
from .reset_password_through_email import router as rp_router from .reset_password_through_email import router as rp_router
@ -11,7 +13,9 @@ from .sign_out import router as so_router
router = APIRouter() router = APIRouter()
router.include_router(ts_router, prefix="/signin", tags=["signin"]) router.include_router(ts_router, prefix="/signin", tags=["signin"])
router.include_router(tms_router, prefix="/signin", tags=["signin"])
router.include_router(sw_router, prefix="/signin", tags=["signin"]) router.include_router(sw_router, prefix="/signin", tags=["signin"])
router.include_router(swm_router, prefix="/signin", tags=["signin"])
router.include_router(up_router, prefix="/signin", tags=["signin"]) router.include_router(up_router, prefix="/signin", tags=["signin"])
router.include_router(se_router, prefix="/signin", tags=["signin"]) router.include_router(se_router, prefix="/signin", tags=["signin"])
router.include_router(so_router, prefix="/signin", tags=["signin"]) router.include_router(so_router, prefix="/signin", tags=["signin"])

View File

@ -0,0 +1,97 @@
import logging
from datetime import datetime, timezone, timedelta
from typing import Optional
from fastapi import APIRouter
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from backend.application.signin_hub import SignInHub
from common.constants.jwt_constants import USER_ROLE_NAMES, USER_PERMISSIONS
from common.token.token_manager import TokenManager
router = APIRouter()
token_manager = TokenManager()
# Web API
# signin_with_magicleaps_email_and_code
#
class RequestIn(BaseModel):
email: str
code: str
host: str
time_zone: Optional[str] = "UTC"
class ResponseOut(BaseModel):
# 1: succeeded
signin_result: int
# the access token for futhur communication with server
access_token: Optional[str] = None
# the refresh token for new access token generation
refresh_token: Optional[str] = None
# the identity of the signed in user
identity: Optional[str] = None
# the date time when the access toke will be expired
expires_in: Optional[datetime] = None
# the system assigned role of the user.
role: Optional[int] = None
# preferred region for user
preferred_region: Optional[str] = None
@router.post(
"/signin-with-magicleaps-email-and-code",
operation_id="user-signin-with-magicleaps-email-and-code",
summary="try to signin with email and authentication code using MagicLeaps branding",
description="client user is trying to sign in with their email and the authenication code \
the system sent to the email in previous step using MagicLeaps branding.",
response_model=ResponseOut,
)
async def signin_with_magicleaps_email_and_code(item: RequestIn) -> ResponseOut:
(
signed_in,
adminstrative_role,
identity,
flid,
preferred_region,
user_role_names,
user_permission_keys
) = await SignInHub().signin_with_email_and_code(
item.email, item.code, item.host, item.time_zone
)
logging.debug(
f"signedin={signed_in}, adminstrative_role={adminstrative_role}, identity={identity}"
)
if signed_in and identity and adminstrative_role:
subject = {"id": identity, "role": adminstrative_role, USER_ROLE_NAMES: user_role_names, USER_PERMISSIONS: user_permission_keys}
access_token = token_manager.create_access_token(subject=subject)
refresh_token = token_manager.create_refresh_token(subject=subject)
expires_in = datetime.now(timezone.utc) + timedelta(
minutes=token_manager.access_token_expire_minutes
)
else:
access_token = None
refresh_token = None
expires_in = None
result = {
"signin_result": signed_in,
"access_token": access_token,
"refresh_token": refresh_token,
"identity": identity,
"expires_in": expires_in,
"role": adminstrative_role,
USER_ROLE_NAMES: user_role_names,
USER_PERMISSIONS: user_permission_keys,
"flid": flid,
"preferred_region": preferred_region,
}
return JSONResponse(content=jsonable_encoder(result))

View File

@ -0,0 +1,37 @@
from backend.application.signin_hub import SignInHub
from pydantic import BaseModel
from fastapi import APIRouter
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
router = APIRouter()
# Web API
# try_magicleaps_signin_with_email
#
class UserSignWithEmailBody(BaseModel):
email: str
host: str
class UserSignWithEmailResponse(BaseModel):
signin_type: int
@router.post(
"/try-magicleaps-signin-with-email",
operation_id="user-try-magicleaps-signin-with-email",
summary="try to signin with email using MagicLeaps branding",
description="A client user is trying to sign in with their email using MagicLeaps branding. \
The system will determine to send an authentication code to the email \
or let the user use their FLID and password to sign in",
response_description="signin_type:0 meaning simplified(using email) signin, \
1 meaning standard(using FLID and password) signin",
)
async def try_magicleaps_signin_with_email(
item: UserSignWithEmailBody,
):
result = await SignInHub().try_magicleaps_signin_with_email(item.email, item.host)
return JSONResponse(content=jsonable_encoder(result))

View File

@ -100,7 +100,7 @@ class NotificationManager:
message_subject = notification_config["message_subject"] message_subject = notification_config["message_subject"]
# Handle authentication specific formatting # Handle authentication specific formatting
if event_lower == "authentication" and "auth_code" in properties: if (event_lower == "authentication" or event_lower == "magicleaps_authentication") and "auth_code" in properties:
message = message.format(properties["auth_code"]) message = message.format(properties["auth_code"])
# Append content_text if it exists in properties # Append content_text if it exists in properties

View File

@ -58,12 +58,20 @@ SystemNotifications = {
"authentication": { # Event "authentication": { # Event
"message_subject": "Freeleaps Support", "message_subject": "Freeleaps Support",
"message": "The auth code is: {} . \r\nDo not share this to anyone.", "message": "The auth code is: {} . \r\nDo not share this to anyone.",
},
"magicleaps_authentication": { # Event
"message_subject": "MagicLeaps Support",
"message": "The auth code is: {} . \r\nDo not share this to anyone.",
} }
}, },
"mobile": { # Subject "mobile": { # Subject
"authentication": { # Event "authentication": { # Event
"message_subject": "Freeleaps Support", "message_subject": "Freeleaps Support",
"message": "The auth code is: {} . \r\nDo not share this to anyone.", "message": "The auth code is: {} . \r\nDo not share this to anyone.",
},
"magicleaps_authentication": { # Event
"message_subject": "MagicLeaps Support",
"message": "The auth code is: {} . \r\nDo not share this to anyone.",
} }
}, },
"message": { # Subject "message": { # Subject
@ -112,12 +120,20 @@ SystemNotifications = {
"authentication": { # Event "authentication": { # Event
"message_subject": "自由跳跃技术支持", "message_subject": "自由跳跃技术支持",
"message": "安全认证码为: {} . \r\n切勿分享给他人", "message": "安全认证码为: {} . \r\n切勿分享给他人",
},
"magicleaps_authentication": { # Event
"message_subject": "MagicLeaps 技术支持",
"message": "安全认证码为: {} . \r\n切勿分享给他人",
} }
}, },
"mobile": { # Subject "mobile": { # Subject
"authentication": { # Event "authentication": { # Event
"message_subject": "自由跳跃技术支持", "message_subject": "自由跳跃技术支持",
"message": "安全认证码为: {} . \r\n切勿分享给他人", "message": "安全认证码为: {} . \r\n切勿分享给他人",
},
"magicleaps_authentication": { # Event
"message_subject": "MagicLeaps 技术支持",
"message": "安全认证码为: {} . \r\n切勿分享给他人",
} }
}, },
"message": { # Subject "message": { # Subject