from typing import Optional, Tuple

from app.authentication.backend.models.constants import (
    NewUserMethod,
    UserAccountProperty,
    UserLoginAction,
)
from infra.i18n.region_handler import RegionHandler
from infra.models.constants import UserRegion
from infra.log.log_utils import log_entry_exit_async
from app.authentication.backend.infra.user_management.user_auth_handler import (
    UserAuthManager,
)
from app.authentication.backend.business.signin_manager import SignInManager

# NOTE(review): BasicProfileStore, ProviderProfileStore, CodeDepotManager,
# NotificationCenter, UserEventDispatcher, ModuleLogger, UserManager, settings,
# InvalidAuthCodeException, UserAccountDoc, Capability, AdministrativeRole and
# UserAchievement are referenced below but no import for them is visible in
# this file - confirm where they come from and import them explicitly.


class SignInHub:
    """Sign-in entry point that wires together the managers and stores it uses."""

    def __init__(self) -> None:
        self.user_auth_manager = UserAuthManager()
        self.signin_manager = SignInManager()
        self.basic_profile_store = BasicProfileStore()
        self.provider_profile_store = ProviderProfileStore()
        self.code_depot_manager = CodeDepotManager()
        self.notification_center = NotificationCenter(sender_id=settings.SYSTEM_USER_ID)
        self.event_dispatcher = UserEventDispatcher(owner_id=settings.SYSTEM_USER_ID)
        self.module_logger = ModuleLogger(sender_id=UserManager)

    # NOTE(review): a second method named ``signin_with_email_and_code`` is
    # defined further down in this class. In Python the later ``def`` rebinds
    # the name, so this delegating version is never the one callers reach.
    # The original file layout was lost, so confirm whether the later method
    # was meant to live in a different class, then remove or rename one.
    async def signin_with_email_and_code(
        self, email: str, code: str, host: str, time_zone: Optional[str] = "UTC"
    ) -> Tuple[int, Optional[int], Optional[str], Optional[str]]:
        """Delegate the email + auth-code sign-in to ``self.signin_manager``.

        Args:
            email: email address used to sign in.
            code: auth code to be verified.
            host: host address forwarded to the sign-in manager.
            time_zone: time zone forwarded to the sign-in manager.

        Returns:
            Whatever ``SignInManager.signin_with_email_and_code`` returns.
        """
        return await self.signin_manager.signin_with_email_and_code(
            email=email, code=code, host=host, time_zone=time_zone
        )

    @log_entry_exit_async
    async def signin_with_email_and_code(
        self, email: str, code: str, host: str, time_zone: Optional[str] = "UTC"
    ) -> list:
        """Sign in with an email address and auth code.

        A new user account is created when the verification step reports that
        the email address belongs to a new user.

        Args:
            email: email address.
            code: auth code to be verified.
            host: the host address by which the client accesses the frontend
                service.
            time_zone: time zone passed along when a new user is created.

        Returns:
            list: always five items, in this order:
                - UserLoginAction: the next action for the client
                - user role, or None when the auth code is invalid
                - user_id, or None when the auth code is invalid
                - flid (the email local part when a FLID reset is required),
                  or None when the auth code is invalid
                - preferred region, or None when the auth code is invalid
        """
        # NOTE(review): self.logger, self.user_service, self.event_service and
        # self.auth_service are not assigned in the __init__ shown above -
        # confirm where these attributes are set before relying on this path.

        # Step 1: Verify the email and code
        try:
            user_id, is_new_user, preferred_region = (
                await self.signin_manager.verify_email_with_code(email, code, host)
            )
        except InvalidAuthCodeException:
            await self.logger.log_info(
                info="The auth code is invalid.",
                properties={"email": email, "code": code},
            )
            return [UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE, None, None, None, None]

        # Step 2: Handle new user creation if necessary
        # This will be moved to the Freeleaps
        if is_new_user:
            user_id = await self.user_service.create_new_user(
                NewUserMethod.EMAIL, preferred_region, email, time_zone
            )
            await self.event_service.log_signup_event(
                user_id, email, preferred_region, time_zone
            )

        # Step 3: Fetch user account and handle login actions
        user_account = await self.user_service.get_user_account(user_id)
        await self.event_service.log_user_login_event(user_id)

        # Step 4: Handle special actions (FLID reset, password reset, etc.)
        if await self.user_service.is_flid_reset_required(user_id):
            # The email local part is offered as the FLID; this branch reports
            # the region returned by verification, not the stored one.
            return [
                UserLoginAction.REVIEW_AND_REVISE_FLID,
                user_account.user_role,
                user_id,
                email.split("@")[0],
                preferred_region,
            ]

        user_flid = await self.user_service.get_flid(user_id)
        if await self.auth_service.is_password_reset_required(user_id):
            login_action = UserLoginAction.NEW_USER_SET_PASSWORD
        else:
            login_action = UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED

        return [
            login_action,
            user_account.user_role,
            user_id,
            user_flid,
            user_account.preferred_region,
        ]

    @log_entry_exit_async
    async def __create_new_user_account(
        self, method: NewUserMethod, region: UserRegion
    ) -> str:
        """Create a new user account document in the DB.

        Args:
            method: the method the new user came from.
            region: preferred user region detected via the user log-in website.

        Returns:
            str: id of the created user account.

        Raises:
            ValueError: if ``method`` is neither ``NewUserMethod.EMAIL`` nor
                ``NewUserMethod.MOBILE``.
        """
        # Only the "verified" property differs between the supported methods.
        if method == NewUserMethod.EMAIL:
            verified_property = UserAccountProperty.EMAIL_VERIFIED
        elif method == NewUserMethod.MOBILE:
            verified_property = UserAccountProperty.MOBILE_VERIFIED
        else:
            # Previously this fell through and failed later with an
            # UnboundLocalError on ``user_account``; fail early and clearly.
            raise ValueError(f"Unsupported new user method: {method!r}")

        account_doc = UserAccountDoc(
            profile_id=None,
            account_id=None,
            service_plan_id=None,
            properties=int(verified_property),
            capabilities=int(Capability.VISITOR),
            user_role=int(AdministrativeRole.PERSONAL),
            region=region,
        )
        user_account = await account_doc.create()

        # Create other doc in collections for the new user
        await UserAchievement(str(user_account.id)).create_activeness_achievement()
        return str(user_account.id)