freeleaps-service-hub/app/authentication/backend/infra/auth/user_auth_handler.py

350 lines
10 KiB
Python

import bcrypt
from datetime import datetime, timedelta, timezone
from typing import Optional
from infra.utils.string import generate_auth_code
from app.authentication.backend.infra.code_management.depot_handler import (
CodeDepotHandler,
)
from app.authentication.backend.models.user.constants import (
AuthType,
)
from app.authentication.backend.models.user.models import (
AuthCodeDoc,
UserEmailDoc,
UserMobileDoc,
UserPasswordDoc,
)
from app.authentication.backend.models.user_profile.models import BasicProfileDoc
class UserAuthHandler:
    """Persistence-level helpers for user authentication.

    Covers password verification and storage, e-mail / mobile lookups,
    one-time auth code issuing and checking, and FLID management.
    """

    # How long a freshly issued one-time auth code stays valid.
    AUTH_CODE_TTL = timedelta(minutes=5)

    def __init__(self):
        self.code_depot_manager = CodeDepotHandler()

    async def verify_user_with_password(self, user_id: str, password: str) -> bool:
        """Verify a user's password against the stored bcrypt hash.

        Args:
            user_id (str): user identity, _id in UserAccountDoc
            password (str): password user provided, clear text

        Returns:
            bool: True if the password matches; False if it does not match,
                no password record exists, or the password has been reset.
        """
        user_password = await UserPasswordDoc.find(
            UserPasswordDoc.user_id == user_id
        ).first_or_none()
        if not user_password:
            return False
        # A password that was reset to the empty string cannot be verified.
        if user_password.password == "":
            return False
        return bool(
            bcrypt.checkpw(
                password.encode("utf-8"), user_password.password.encode("utf-8")
            )
        )

    async def get_user_password(self, user_id: str) -> Optional[str]:
        """Get the stored password hash for a user.

        Args:
            user_id (str): user identity, _id in UserAccountDoc

        Returns:
            Optional[str]: password hash, or None if no record exists
        """
        user_password = await UserPasswordDoc.find(
            UserPasswordDoc.user_id == user_id
        ).first_or_none()
        if user_password is None:
            return None
        return user_password.password

    async def get_user_email(self, user_id: str) -> Optional[str]:
        """Get the e-mail address registered for a user.

        Args:
            user_id (str): user identity, _id in UserAccountDoc

        Returns:
            Optional[str]: email address, or None if no record exists
        """
        user_email = await UserEmailDoc.find(
            UserEmailDoc.user_id == user_id
        ).first_or_none()
        if user_email is None:
            return None
        return user_email.email

    async def get_user_id_by_email(self, email: str) -> Optional[str]:
        """Look up a user id by e-mail address.

        Args:
            email (str): email address; it is lower-cased before the lookup

        Returns:
            Optional[str]: user_id, or None if the address is unknown
        """
        user_email = await UserEmailDoc.find(
            UserEmailDoc.email == email.lower()
        ).first_or_none()
        if user_email is None:
            return None
        return user_email.user_id

    def user_sign_out(self, token):
        """Sign a user out. Currently a no-op placeholder.

        Args:
            token: the token being signed out (unused)
        """
        pass

    async def generate_auth_code_for_email(self, email: str) -> str:
        """Create and store an auth code for an e-mail address.

        The code is only persisted and returned here; this method does not
        deliver it.

        Args:
            email (str): email address

        Returns:
            str: the generated auth code
        """
        return await self.generate_auth_code(email, AuthType.EMAIL)

    async def verify_email_code(self, email: str, code: str) -> bool:
        """Check an auth code issued for an e-mail address.

        Args:
            email (str): email address
            code (str): auth code to be verified

        Returns:
            bool: True if code is valid, False otherwise
        """
        return await self._is_auth_code_valid(email, code, AuthType.EMAIL)

    async def get_user_mobile(self, user_id: str) -> Optional[str]:
        """Get the mobile number registered for a user.

        Args:
            user_id (str): user identity, _id in UserAccountDoc

        Returns:
            Optional[str]: mobile number, or None if no record exists
        """
        user_mobile = await UserMobileDoc.find(
            UserMobileDoc.user_id == user_id
        ).first_or_none()
        if user_mobile is None:
            return None
        return user_mobile.mobile

    async def generate_auth_code_for_mobile(self, mobile: str) -> str:
        """Create and store an auth code for a mobile number.

        The code is only persisted and returned here; this method does not
        deliver it.

        Args:
            mobile (str): mobile number

        Returns:
            str: the generated auth code
        """
        return await self.generate_auth_code(mobile, AuthType.MOBILE)

    async def verify_mobile_with_code(self, mobile: str, code: str) -> bool:
        """Check an auth code issued for a mobile number.

        Args:
            mobile (str): mobile number
            code (str): auth code to be verified

        Returns:
            bool: True if code is valid, False otherwise
        """
        return await self._is_auth_code_valid(mobile, code, AuthType.MOBILE)

    async def _is_auth_code_valid(
        self, deliver_object: str, code: str, auth_type: AuthType
    ) -> bool:
        """Return True if an unexpired matching auth code document exists.

        Args:
            deliver_object (str): email address, mobile, etc.; lower-cased
                to match how codes are stored
            code (str): auth code to be verified
            auth_type (AuthType): authentication type the code was issued for
        """
        # NOTE(review): a matching code is not deleted or marked as used
        # here, so it stays valid until it expires -- confirm this is intended.
        match = await AuthCodeDoc.find(
            AuthCodeDoc.method == deliver_object.lower(),
            AuthCodeDoc.auth_code == code,
            AuthCodeDoc.expiry > datetime.now(timezone.utc),
            AuthCodeDoc.method_type == auth_type,
        ).first_or_none()
        return bool(match)

    async def save_email_auth_method(self, user_id: str, email: str):
        """Create or update the e-mail address stored for a user.

        Args:
            user_id (str): user id
            email (str): email address; stored lower-cased
        """
        normalized_email = email.lower()
        user_email = await UserEmailDoc.find(
            UserEmailDoc.user_id == user_id
        ).first_or_none()
        if user_email is None:
            await UserEmailDoc(user_id=user_id, email=normalized_email).create()
            return
        user_email.email = normalized_email
        await user_email.save()

    async def save_password_auth_method(self, user_id: str, user_flid, password: str):
        """Create or update a user's password and sync it to the code depot.

        Args:
            user_id (str): user id
            user_flid: user FLID, passed to the code depot password update
            password (str): user password, clear text

        Raises:
            Exception: if the code depot reports that the update failed
        """
        # bcrypt.hashpw returns bytes; store it as text because the password
        # field is compared with "" and re-encoded when it is verified.
        password_hashed = bcrypt.hashpw(
            password.encode("utf-8"), bcrypt.gensalt()
        ).decode("utf-8")

        user_password = await UserPasswordDoc.find(
            UserPasswordDoc.user_id == user_id
        ).first_or_none()
        if user_password is None:
            new_user_password = UserPasswordDoc(
                user_id=user_id, password=password_hashed
            )
            await new_user_password.create()
        else:
            user_password.password = password_hashed
            await user_password.save()

        # The depot receives the clear-text password, not the hash.
        result = await self.code_depot_manager.update_depot_user_password(
            user_flid, password
        )
        if not result:
            raise Exception("Failed to update user password in code depot")

    async def reset_password(self, user_id: str):
        """Blank out the stored password so it can no longer be verified.

        Args:
            user_id (str): user id

        Raises:
            Exception: if the user has no password record
        """
        user_password = await UserPasswordDoc.find(
            UserPasswordDoc.user_id == user_id
        ).first_or_none()
        if not user_password:
            raise Exception("User password was not set before.")
        user_password.password = ""
        await user_password.save()

    async def is_password_reset_required(self, user_id: str) -> bool:
        """Check whether the user still has to set a password.

        Args:
            user_id (str): user id

        Returns:
            bool: True if there is no password record or the stored
                password is empty, False otherwise
        """
        user_password = await UserPasswordDoc.find(
            UserPasswordDoc.user_id == user_id
        ).first_or_none()
        if not user_password:
            return True
        return user_password.password == ""

    async def is_flid_reset_required(self, user_id: str) -> bool:
        """Check whether the user's FLID has the same update and create time.

        Args:
            user_id (str): user id

        Returns:
            bool: True if FLID.update_time equals FLID.create_time
                (presumably meaning the FLID was never changed -- confirm);
                False otherwise, including when no profile exists
        """
        basic_profile = await BasicProfileDoc.find_one(
            BasicProfileDoc.user_id == user_id
        )
        if not basic_profile:
            # Explicit False instead of an implicit None, to honor the
            # declared bool return type (truthiness is unchanged).
            return False
        return basic_profile.FLID.update_time == basic_profile.FLID.create_time

    async def is_flid_available(self, user_flid: str) -> bool:
        """Check whether an FLID is not yet used by any profile.

        Args:
            user_flid (str): FLID identity to look up

        Returns:
            bool: True if no profile uses this FLID, False otherwise
        """
        basic_profile = await BasicProfileDoc.find_one(
            BasicProfileDoc.FLID.identity == user_flid
        )
        return not basic_profile

    async def get_flid(self, user_id: str) -> Optional[str]:
        """Get the FLID identity of a user.

        Args:
            user_id (str): user id

        Returns:
            Optional[str]: FLID identity, or None if no profile exists
        """
        basic_profile = await BasicProfileDoc.find_one(
            BasicProfileDoc.user_id == user_id
        )
        if not basic_profile:
            return None
        return basic_profile.FLID.identity

    async def update_flid(self, user_id: str, flid: str) -> bool:
        """Set a new FLID identity on the user's profile.

        Also stamps FLID.update_time with the current UTC time and records
        the user as the one who set it.

        Args:
            user_id (str): user id
            flid (str): new FLID identity

        Returns:
            bool: True if the profile was updated, False if none exists
        """
        basic_profile = await BasicProfileDoc.find_one(
            BasicProfileDoc.user_id == user_id
        )
        if not basic_profile:
            return False
        basic_profile.FLID.identity = flid
        basic_profile.FLID.update_time = datetime.now(timezone.utc)
        basic_profile.FLID.set_by = user_id
        await basic_profile.save()
        return True

    async def generate_auth_code(self, deliver_object: str, auth_type: AuthType) -> str:
        """Create and store an auth code for a delivery target.

        The code is only persisted and returned here; this method does not
        deliver it.

        Args:
            deliver_object (str): email address, mobile, etc; stored lower-cased
            auth_type (AuthType): authentication type

        Returns:
            str: the generated auth code
        """
        # The bare name resolves to the module-level generate_auth_code
        # helper, not to this method (class scope is not searched here).
        auth_code = generate_auth_code()
        expiry = datetime.now(timezone.utc) + self.AUTH_CODE_TTL
        auth_code_doc = AuthCodeDoc(
            auth_code=auth_code,
            method=deliver_object.lower(),
            method_type=auth_type,
            expiry=expiry,
        )
        await auth_code_doc.create()
        return auth_code