import random
from typing import Any, Optional, Tuple

from backend.services.auth.user_auth_service import UserAuthService
from common.utils.region import RegionHandler
from backend.models.user.constants import (
    UserLoginAction,
    NewUserMethod,
)
from backend.models.user.constants import UserLoginAction
from backend.services.user.user_management_service import (
    UserManagementService,
)
from backend.services.code_depot.code_depot_service import (
    CodeDepotService,
)
from common.log.module_logger import ModuleLogger
from common.utils.string import check_password_complexity
from common.exception.exceptions import InvalidDataError
from backend.services.notification.notification_service import (
    NotificationService,
)
from backend.models.user.constants import (
    AuthType,
)
from common.config.app_settings import app_settings


class SignInManager:
    """Drive the email sign-in, FLID selection and password flows.

    Every method delegates to the auth, user-management, code-depot and
    notification services created in ``__init__`` and reports the next step
    the client should take as a ``UserLoginAction``.
    """

    def __init__(self):
        self.user_auth_service = UserAuthService()
        self.region_handler = RegionHandler()
        self.user_management_service = UserManagementService()
        self.module_logger = ModuleLogger(sender_id=SignInManager)
        self.code_depot_service = CodeDepotService()
        self.notification_service = NotificationService()

    @staticmethod
    def _suggest_alternative_flid(user_flid: str) -> str:
        """Return ``user_flid`` with a random three-digit suffix (100-999)."""
        return f"{user_flid}{random.randint(100, 999)}"

    async def signin_with_email_and_code(
        self, email: str, code: str, host: str, time_zone: Optional[str] = "UTC"
    ) -> Tuple[int, Optional[int], Optional[str], Optional[str], Any]:
        """Try to sign in with an email address and an auth code.

        A new user account is created when the code is valid and the email
        address is not yet associated with any account.

        Args:
            email (str): email address
            code (str): auth code to be verified
            host (str): the host address by which the client accesses the
                frontend service; used to detect the preferred region
            time_zone (Optional[str]): time zone passed on when a new
                user's data is initialized. Defaults to "UTC".

        Returns:
            A 5-tuple:
                - int: UserLoginAction
                - Optional[int]: user role
                - Optional[str]: user_id
                - Optional[str]: flid (the local part of the email address
                  while the flid still has to be reviewed)
                - preferred region as returned by
                  ``RegionHandler.detect_from_host``

            When the auth code is rejected, the action is
            ``VERIFY_EMAIL_WITH_AUTH_CODE`` and all other items are None.
        """
        # A missing user id means the email address belongs to a new user.
        user_id = await self.user_auth_service.get_user_id_by_email(email)
        is_new_user = user_id is None
        preferred_region = self.region_handler.detect_from_host(host)

        # Verify the email through the auth code before touching any account.
        if not await self.user_auth_service.verify_email_with_code(email, code):
            # NOTE(review): the rejected code is logged verbatim; confirm
            # that this is acceptable for the log sink in use.
            await self.module_logger.log_warning(
                warning="The auth code is invalid.",
                properties={"email": email, "code": code},
            )
            return UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE, None, None, None, None

        if is_new_user:
            user_account = await self.user_management_service.create_new_user_account(
                method=NewUserMethod.EMAIL, region=preferred_region
            )
            user_id = str(user_account.id)
            await self.user_management_service.initialize_new_user_data(
                user_id=user_id,
                method=NewUserMethod.EMAIL,
                email_address=email,
                region=preferred_region,
                time_zone=time_zone,
            )

        user_account = await self.user_management_service.get_account_by_id(
            user_id=user_id
        )

        if await self.user_auth_service.is_flid_reset_required(user_id):
            # Propose the local part of the email address as the flid.
            return (
                UserLoginAction.REVIEW_AND_REVISE_FLID,
                user_account.user_role,
                user_id,
                email.split("@")[0],
                preferred_region,
            )

        user_flid = await self.user_auth_service.get_user_flid(user_id)

        if await self.user_auth_service.is_password_reset_required(user_id):
            next_action = UserLoginAction.NEW_USER_SET_PASSWORD
        else:
            next_action = UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED

        return (
            next_action,
            user_account.user_role,
            user_id,
            user_flid,
            preferred_region,
        )

    async def signin_with_email_and_password(
        self, email: str, password: str
    ) -> Tuple[UserLoginAction, Optional[int], Optional[str], Optional[str]]:
        """Try to sign in with an email address and a password.

        Args:
            email (str): email address
            password (str): password to be verified

        Returns:
            Four items, in order:
                - UserLoginAction: the next step for the client
                - Optional[int]: user role
                - Optional[str]: user_id
                - Optional[str]: flid (the local part of the email address
                  while the flid still has to be reviewed)

            Role, user_id and flid are only filled in once the password has
            been verified; otherwise they are None.
        """
        # NOTE(review): the values are returned as lists although the
        # annotation says Tuple; kept as lists because callers are not
        # visible from this file.
        user_id = await self.user_auth_service.get_user_id_by_email(email)

        if user_id is None:
            # Cannot find the email address: treat as a new user.
            return [UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE, None, None, None]

        if await self.user_auth_service.is_password_reset_required(user_id):
            # Password hasn't been set before; the user must set one first.
            return [UserLoginAction.NEW_USER_SET_PASSWORD, None, None, None]

        if not await self.user_auth_service.verify_user_with_password(
            user_id, password
        ):
            # Ask the user to input the password again.
            # TODO: we need to limit times of user to input the wrong password
            return [
                UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED,
                None,
                None,
                None,
            ]

        user_account = await self.user_management_service.get_account_by_id(
            user_id=user_id
        )

        if await self.user_auth_service.is_flid_reset_required(user_id):
            return [
                UserLoginAction.REVIEW_AND_REVISE_FLID,
                user_account.user_role,
                user_id,
                email.split("@")[0],
            ]

        # Password verification passed.
        user_flid = await self.user_auth_service.get_user_flid(user_id)
        return [
            UserLoginAction.USER_SIGNED_IN,
            user_account.user_role,
            user_id,
            user_flid,
        ]

    async def update_new_user_flid(
        self, user_id: str, user_flid: str
    ) -> Tuple[UserLoginAction, Optional[str]]:
        """Assign ``user_flid`` to the user and create the matching depot user.

        Args:
            user_id (str): id of the user whose flid is being set
            user_flid (str): the requested flid

        Returns:
            Two items, in order:
                - UserLoginAction: the next step for the client
                - Optional[str]: the accepted flid, or, when the flid is
                  unavailable or the depot user could not be created, a
                  suggestion made of the requested flid plus a random
                  three-digit suffix
        """
        # NOTE(review): returned as lists although the annotation says Tuple;
        # kept as lists because callers are not visible from this file.
        if not await self.user_auth_service.is_flid_available(user_flid):
            return [
                UserLoginAction.REVIEW_AND_REVISE_FLID,
                self._suggest_alternative_flid(user_flid),
            ]

        code_depot_email = f"{user_flid}@freeleaps.com"
        depot_user_created = await self.code_depot_service.create_depot_user(
            user_flid, user_id, code_depot_email
        )

        if not depot_user_created:
            await self.module_logger.log_error(
                error=(
                    f"Failed to create depot user for {user_id} "
                    f"with flid {user_flid} and email {code_depot_email}"
                ),
                properties={
                    "user_id": user_id,
                    "user_flid": user_flid,
                    "code_depot_email": code_depot_email,
                },
            )
            return [
                UserLoginAction.REVIEW_AND_REVISE_FLID,
                self._suggest_alternative_flid(user_flid),
            ]

        # Only persist the flid once the depot user exists.
        await self.user_auth_service.update_flid(user_id, user_flid)

        if await self.user_auth_service.is_password_reset_required(user_id):
            return [UserLoginAction.NEW_USER_SET_PASSWORD, user_flid]
        return [UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED, user_flid]

    async def try_signin_with_email(self, email: str, host: str) -> UserLoginAction:
        """Start a sign-in by email, sending an auth code when one is needed.

        Args:
            email (str): email address
            host (str): host url that user tried to sign in (currently
                unused by this method)

        Returns:
            UserLoginAction: ``VERIFY_EMAIL_WITH_AUTH_CODE`` when an auth
            code was sent, otherwise ``EXISTING_USER_PASSWORD_REQUIRED``.
        """
        user_id = await self.user_auth_service.get_user_id_by_email(email)

        is_password_reset_required = False
        if user_id:
            is_password_reset_required = (
                await self.user_auth_service.is_password_reset_required(user_id)
            )

        if user_id is not None and not is_password_reset_required:
            return UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED

        # Send an auth code through email if the email address hasn't been
        # associated with any account, or if the user's password is empty,
        # which means the user's password hasn't been set.
        mail_code = await self.user_auth_service.generate_auth_code_for_object(
            email, AuthType.EMAIL
        )
        await self.notification_service.send_notification(
            sender_id=app_settings.SYSTEM_USER_ID,
            channels=["2"],  # 2 maps to email in NotificationChannel
            receiver_id=email,
            subject="email",
            event="authentication",
            properties={"auth_code": mail_code},
            # TODO: reconsider necessity of adding region info here
            # region=RegionHandler().detect_from_host(host),
        )
        return UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE

    async def reset_password_through_email(self, email: str, host: str) -> int:
        """Start a password reset for the account that owns ``email``.

        Verifies that the email exists, generates an auth code and sends it
        to the email address, then clears the existing password so that in
        the following steps the user can set a new one.

        Args:
            email (str): email address
            host (str): host that user will perform the reset on (currently
                unused by this method)

        Returns:
            int: UserLoginAction - ``VERIFY_EMAIL_WITH_AUTH_CODE`` when the
            code was sent, ``EMAIL_NOT_ASSOCIATED_WITH_USER`` otherwise.
        """
        user_id = await self.user_auth_service.get_user_id_by_email(email)
        if user_id is None:
            return UserLoginAction.EMAIL_NOT_ASSOCIATED_WITH_USER

        # The email address is associated with an account: generate the auth
        # code for this address (not for a constant key) so that it is bound
        # to the address being reset, as in try_signin_with_email.
        mail_code = await self.user_auth_service.generate_auth_code_for_object(
            email, AuthType.EMAIL
        )
        await self.notification_service.send_notification(
            sender_id=app_settings.SYSTEM_USER_ID,
            channels=["2"],  # 2 maps to email in NotificationChannel
            receiver_id=email,
            subject="email",
            event="authentication",
            properties={"auth_code": mail_code},
        )
        await self.user_auth_service.reset_password(user_id)
        return UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE

    async def update_user_password(self, user_id: str, password: str) -> dict[str, Any]:
        """Validate and store a new password for the user.

        Args:
            user_id (str): id of the user whose password is being set
            password (str): the new password

        Returns:
            dict[str, Any]: ``{"succeeded": True}`` once the password has
            been saved.

        Raises:
            InvalidDataError: if ``check_password_complexity`` rejects the
                password.
        """
        error_message = """
        Password does not pass complexity requirements:
        - At least one lowercase character
        - At least one uppercase character
        - At least one digit
        - At least one special character (punctuation, brackets, quotes, etc.)
        """
        if not check_password_complexity(password):
            raise InvalidDataError(error_message)

        user_flid = await self.user_auth_service.get_user_flid(user_id)
        await self.user_auth_service.save_password_auth_method(
            user_id, user_flid, password
        )
        return {"succeeded": True}

    async def send_email_code(self, sender_id: str, email: str) -> bool:
        """Generate an auth code for ``email`` and send it as a notification.

        Args:
            sender_id (str): id used as the sender of the notification
            email (str): email address that receives the code

        Returns:
            bool: the result reported by the notification service.
        """
        mail_code = await self.user_auth_service.generate_auth_code_for_object(
            email, AuthType.EMAIL
        )
        # NOTE(review): other methods of this class address the email channel
        # as "2"; confirm which channel identifier the service expects.
        return await self.notification_service.send_notification(
            sender_id=sender_id,
            channels=["email"],
            receiver_id=email,
            subject="email",
            event="authentication",
            properties={"auth_code": mail_code},
        )

    async def send_mobile_code(self, sender_id: str, mobile: str) -> bool:
        """Generate an auth code for ``mobile`` and send it as a notification.

        Args:
            sender_id (str): id used as the sender of the notification
            mobile (str): mobile number that receives the code

        Returns:
            bool: the result reported by the notification service.
        """
        auth_code = await self.user_auth_service.generate_auth_code_for_object(
            mobile, AuthType.MOBILE
        )
        # NOTE(review): the code is sent over the "email" channel although
        # the receiver is a mobile number - confirm this is intended.
        return await self.notification_service.send_notification(
            sender_id=sender_id,
            channels=["email"],
            receiver_id=mobile,
            subject="mobile",
            event="authentication",
            properties={"auth_code": auth_code},
        )