from datetime import datetime, timedelta, timezone
from typing import Optional

import bcrypt

from common.utils.string import generate_auth_code
from backend.infra.code_management.depot_handler import (
    CodeDepotHandler,
)
from backend.models.user.constants import (
    AuthType,
)
from backend.models.user.models import (
    AuthCodeDoc,
    UserEmailDoc,
    UserMobileDoc,
    UserPasswordDoc,
)
from backend.models.user_profile.models import BasicProfileDoc


class UserAuthHandler:
    """Password, e-mail/mobile auth-code and FLID operations for a user.

    All persistence goes through the document classes imported above;
    password changes are additionally pushed to the code depot through
    ``CodeDepotHandler``.
    """

    # Lifetime of a freshly issued auth code (shared by every delivery type).
    AUTH_CODE_TTL = timedelta(minutes=5)

    def __init__(self):
        self.code_depot_manager = CodeDepotHandler()

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    @staticmethod
    async def _get_password_doc(user_id: str):
        """Return the UserPasswordDoc of ``user_id`` or None if absent."""
        return await UserPasswordDoc.find(
            UserPasswordDoc.user_id == user_id
        ).first_or_none()

    @staticmethod
    async def _get_basic_profile(user_id: str):
        """Return the BasicProfileDoc of ``user_id`` or None if absent."""
        return await BasicProfileDoc.find_one(BasicProfileDoc.user_id == user_id)

    @staticmethod
    async def _verify_auth_code(
        deliver_object: str, code: str, auth_type: AuthType
    ) -> bool:
        """Return True if an unexpired matching auth code document exists."""
        # NOTE(review): a matching code is not deleted or marked as used here,
        # so it stays valid until it expires -- confirm this is intended.
        match = await AuthCodeDoc.find(
            AuthCodeDoc.method == deliver_object.lower(),
            AuthCodeDoc.auth_code == code,
            AuthCodeDoc.expiry > datetime.now(timezone.utc),
            AuthCodeDoc.method_type == auth_type,
        ).first_or_none()
        return bool(match)

    # ------------------------------------------------------------------
    # password
    # ------------------------------------------------------------------
    async def verify_user_with_password(self, user_id: str, password: str) -> bool:
        """Verify user's password

        Args:
            user_id (str): user identity, _id in UserAccountDoc
            password (str): password user provided, clear text

        Returns:
            bool: True if password is correct, else return False
        """
        user_password = await self._get_password_doc(user_id)
        if not user_password:
            return False

        stored_hash = user_password.password
        # An empty hash means the password was reset (see reset_password);
        # nothing can be verified against it.
        if not stored_hash:
            return False
        # bcrypt works on bytes; tolerate a hash that is already bytes.
        if isinstance(stored_hash, str):
            stored_hash = stored_hash.encode("utf-8")
        return bcrypt.checkpw(password.encode("utf-8"), stored_hash)

    async def get_user_password(self, user_id: str) -> Optional[str]:
        """Get user password through the user_id

        Args:
            user_id (str): user identity, _id in UserAccountDoc

        Returns:
            Optional[str]: stored password hash, or None if the user has no
                password document
        """
        user_password = await self._get_password_doc(user_id)
        return None if user_password is None else user_password.password

    async def save_password_auth_method(self, user_id: str, user_flid, password: str):
        """save password auth method to user_password doc

        The bcrypt hash is stored locally first, then the clear-text password
        is forwarded to the code depot.

        Args:
            user_id (str): user id
            user_flid: identifier passed to the code depot together with the
                password
            password (str): user password, clear text

        Raises:
            Exception: if the code depot reports a failed password update
        """
        # hashpw returns bytes; store text so the value matches how
        # verify_user_with_password reads it back (compared with "" and
        # encoded as UTF-8).
        password_hashed = bcrypt.hashpw(
            password.encode("utf-8"), bcrypt.gensalt()
        ).decode("utf-8")

        user_password = await self._get_password_doc(user_id)
        if user_password is None:
            await UserPasswordDoc(user_id=user_id, password=password_hashed).create()
        else:
            user_password.password = password_hashed
            await user_password.save()

        # NOTE(review): the local hash is already saved at this point, so a
        # depot failure leaves the two stores out of sync -- confirm callers
        # handle that.
        result = await self.code_depot_manager.update_depot_user_password(
            user_flid, password
        )
        if not result:
            raise Exception("Failed to update user password in code depot")

    async def reset_password(self, user_id: str):
        """clean password auth method from user_password doc

        The stored hash is replaced by an empty string, which makes every
        later verification fail until a new password is saved.

        Args:
            user_id (str): user id

        Raises:
            Exception: if the user has no password document
        """
        user_password = await self._get_password_doc(user_id)
        if not user_password:
            raise Exception("User password was not set before.")
        user_password.password = ""
        await user_password.save()

    async def is_password_reset_required(self, user_id: str) -> bool:
        """check if the user still has to set a password

        Args:
            user_id (str): user id

        Returns:
            bool: True if the user has no password document or its password
                is the empty string, False otherwise
        """
        user_password = await self._get_password_doc(user_id)
        if not user_password:
            return True
        return user_password.password == ""

    # ------------------------------------------------------------------
    # e-mail
    # ------------------------------------------------------------------
    async def get_user_email(self, user_id: str) -> Optional[str]:
        """get user email through the user_id

        Args:
            user_id (str): user identity, _id in UserAccountDoc

        Returns:
            Optional[str]: email address, or None if no email is stored
        """
        user_email = await UserEmailDoc.find(
            UserEmailDoc.user_id == user_id
        ).first_or_none()
        return None if user_email is None else user_email.email

    async def get_user_id_by_email(self, email: str) -> Optional[str]:
        """get user id through email from user_email doc

        Args:
            email (str): email address, compare email address in lowercase

        Returns:
            Optional[str]: user_id or None
        """
        user_email = await UserEmailDoc.find(
            UserEmailDoc.email == email.lower()
        ).first_or_none()
        return None if user_email is None else user_email.user_id

    async def save_email_auth_method(self, user_id: str, email: str):
        """save email auth method to user_email doc

        Creates the document if the user has none, otherwise overwrites the
        stored address. The address is always stored in lowercase.

        Args:
            user_id (str): user id
            email (str): email address
        """
        normalized = email.lower()
        user_email = await UserEmailDoc.find(
            UserEmailDoc.user_id == user_id
        ).first_or_none()
        if user_email is None:
            await UserEmailDoc(user_id=user_id, email=normalized).create()
        else:
            user_email.email = normalized
            await user_email.save()

    async def generate_auth_code_for_email(self, email: str) -> str:
        """create and store an auth code for an email address

        Args:
            email (str): email address

        Returns:
            str: the generated auth code
        """
        return await self.generate_auth_code(email, AuthType.EMAIL)

    async def verify_email_code(self, email: str, code: str) -> bool:
        """check an auth code issued for an email address

        Args:
            email (str): email address
            code (str): auth code to be verified

        Returns:
            bool: True if code is valid, False otherwise
        """
        return await self._verify_auth_code(email, code, AuthType.EMAIL)

    # ------------------------------------------------------------------
    # mobile
    # ------------------------------------------------------------------
    async def get_user_mobile(self, user_id: str) -> Optional[str]:
        """get user mobile number through the user_id

        Args:
            user_id (str): user identity, _id in UserAccountDoc

        Returns:
            Optional[str]: mobile number, or None if no number is stored
        """
        user_mobile = await UserMobileDoc.find(
            UserMobileDoc.user_id == user_id
        ).first_or_none()
        return None if user_mobile is None else user_mobile.mobile

    async def generate_auth_code_for_mobile(self, mobile: str) -> str:
        """create and store an auth code for a mobile number

        Args:
            mobile (str): mobile number

        Returns:
            str: the generated auth code
        """
        return await self.generate_auth_code(mobile, AuthType.MOBILE)

    async def verify_mobile_with_code(self, mobile: str, code: str) -> bool:
        """check an auth code issued for a mobile number

        Args:
            mobile (str): mobile number
            code (str): auth code to be verified

        Returns:
            bool: True if code is valid, False otherwise
        """
        return await self._verify_auth_code(mobile, code, AuthType.MOBILE)

    # ------------------------------------------------------------------
    # session
    # ------------------------------------------------------------------
    def user_sign_out(self, token):
        """Placeholder for sign-out; currently does nothing."""
        pass

    # ------------------------------------------------------------------
    # FLID
    # ------------------------------------------------------------------
    async def is_flid_reset_required(self, user_id: str) -> bool:
        """check whether the user's FLID has the same update and create time

        Args:
            user_id (str): user id

        Returns:
            bool: True if the profile's FLID update_time equals its
                create_time; False otherwise, including when the user has no
                basic profile
        """
        basic_profile = await self._get_basic_profile(user_id)
        if not basic_profile:
            # Explicit False instead of an implicit None for a missing profile.
            return False
        return basic_profile.FLID.update_time == basic_profile.FLID.create_time

    async def is_flid_available(self, user_flid: str) -> bool:
        """check that no basic profile already uses the given FLID

        Args:
            user_flid (str): FLID identity to look up

        Returns:
            bool: True if no profile has this FLID identity, False otherwise
        """
        basic_profile = await BasicProfileDoc.find_one(
            BasicProfileDoc.FLID.identity == user_flid
        )
        return not basic_profile

    async def get_flid(self, user_id: str) -> Optional[str]:
        """get the FLID identity of a user

        Args:
            user_id (str): user id

        Returns:
            Optional[str]: FLID identity, or None if the user has no basic
                profile
        """
        basic_profile = await self._get_basic_profile(user_id)
        if not basic_profile:
            return None
        return basic_profile.FLID.identity

    async def update_flid(self, user_id: str, flid: str) -> bool:
        """set a new FLID identity on the user's basic profile

        Also stamps the FLID's update_time with the current UTC time and
        records the user as the one who set it.

        Args:
            user_id (str): user id
            flid (str): new FLID identity

        Returns:
            bool: True if the profile was found and saved, False if the user
                has no basic profile
        """
        basic_profile = await self._get_basic_profile(user_id)
        if not basic_profile:
            return False
        basic_profile.FLID.identity = flid
        basic_profile.FLID.update_time = datetime.now(timezone.utc)
        basic_profile.FLID.set_by = user_id
        await basic_profile.save()
        return True

    # ------------------------------------------------------------------
    # generic auth code
    # ------------------------------------------------------------------
    async def generate_auth_code(self, deliver_object: str, auth_type: AuthType) -> str:
        """create and store an auth code for a delivery target

        The code is persisted as an AuthCodeDoc that expires AUTH_CODE_TTL
        after creation; delivering it to the user is left to the caller.

        Args:
            deliver_object (str): email address, mobile, etc; stored lowercased
            auth_type (AuthType): authentication type

        Returns:
            str: the generated auth code
        """
        # Module-level helper of the same name; the method is only reachable
        # through ``self``.
        auth_code = generate_auth_code()
        auth_code_doc = AuthCodeDoc(
            auth_code=auth_code,
            method=deliver_object.lower(),
            method_type=auth_type,
            expiry=datetime.now(timezone.utc) + self.AUTH_CODE_TTL,
        )
        await auth_code_doc.create()
        return auth_code