350 lines
10 KiB
Python
350 lines
10 KiB
Python
import bcrypt
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Optional
|
|
|
|
from infra.utils.string import generate_auth_code
|
|
from app.authentication.backend.infra.code_management.depot_handler import (
|
|
CodeDepotHandler,
|
|
)
|
|
from app.authentication.backend.models.user.constants import (
|
|
AuthType,
|
|
)
|
|
from app.authentication.backend.models.user.models import (
|
|
AuthCodeDoc,
|
|
UserEmailDoc,
|
|
UserMobileDoc,
|
|
UserPasswordDoc,
|
|
)
|
|
|
|
from app.authentication.backend.models.user_profile.models import BasicProfileDoc
|
|
|
|
|
|
class UserAuthHandler:
|
|
def __init__(self):
|
|
self.code_depot_manager = CodeDepotHandler()
|
|
|
|
async def verify_user_with_password(self, user_id: str, password: str) -> bool:
|
|
"""Verify user's password
|
|
Args:
|
|
user_id (str): user identity, _id in UserAccountDoc
|
|
password (str): password user provided, clear text
|
|
|
|
Returns:
|
|
bool: True if password is correct, else return False
|
|
"""
|
|
|
|
user_password = await UserPasswordDoc.find(
|
|
UserPasswordDoc.user_id == user_id
|
|
).first_or_none()
|
|
|
|
if user_password:
|
|
# password is reseted to empty string, cannot be verified
|
|
if user_password.password == "":
|
|
return False
|
|
|
|
if bcrypt.checkpw(
|
|
password.encode("utf-8"), user_password.password.encode("utf-8")
|
|
):
|
|
return True
|
|
else:
|
|
return False
|
|
else:
|
|
return False
|
|
|
|
async def get_user_password(self, user_id: str) -> Optional[str]:
|
|
"""Get user password through the user_id
|
|
|
|
Args:
|
|
user_id (str): user identity, _id in UserAccountDoc
|
|
|
|
Returns:
|
|
str: password hash
|
|
"""
|
|
|
|
user_password = await UserPasswordDoc.find(
|
|
UserPasswordDoc.user_id == user_id
|
|
).first_or_none()
|
|
|
|
if user_password is None:
|
|
return None
|
|
else:
|
|
return user_password.password
|
|
|
|
async def get_user_email(self, user_id: str) -> Optional[str]:
|
|
"""get user email through the user_id
|
|
|
|
Args:
|
|
user_id (str): user identity, _id in UserAccountDoc
|
|
|
|
Returns:
|
|
str: email address
|
|
"""
|
|
user_email = await UserEmailDoc.find(
|
|
UserEmailDoc.user_id == user_id
|
|
).first_or_none()
|
|
|
|
if user_email is None:
|
|
return None
|
|
else:
|
|
return user_email.email
|
|
|
|
async def get_user_id_by_email(self, email: str) -> Optional[str]:
|
|
"""get user id through email from user_email doc
|
|
|
|
Args:
|
|
email (str): email address, compare email address in lowercase
|
|
|
|
Returns:
|
|
Optional[str]: user_id or None
|
|
"""
|
|
user_email = await UserEmailDoc.find(
|
|
UserEmailDoc.email == email.lower()
|
|
).first_or_none()
|
|
|
|
if user_email is None:
|
|
return None
|
|
else:
|
|
return user_email.user_id
|
|
|
|
def user_sign_out(self, token):
|
|
pass
|
|
|
|
async def generate_auth_code_for_email(self, email: str) -> str:
|
|
"""send auth code to email address
|
|
|
|
Args:
|
|
email (str): email address
|
|
"""
|
|
auth_code = generate_auth_code()
|
|
expiry = datetime.now(timezone.utc) + timedelta(minutes=5)
|
|
auth_code_doc = AuthCodeDoc(
|
|
auth_code=auth_code,
|
|
method=email.lower(),
|
|
method_type=AuthType.EMAIL,
|
|
expiry=expiry,
|
|
)
|
|
|
|
await auth_code_doc.create()
|
|
return auth_code
|
|
|
|
async def verify_email_code(self, email: str, code: str) -> bool:
|
|
"""sign in with email and code
|
|
|
|
Args:
|
|
email (str): email address
|
|
code (str): auth code to be verified
|
|
|
|
Returns:
|
|
bool: True if code is valid, False otherwise
|
|
"""
|
|
result = await AuthCodeDoc.find(
|
|
AuthCodeDoc.method == email.lower(),
|
|
AuthCodeDoc.auth_code == code,
|
|
AuthCodeDoc.expiry > datetime.now(timezone.utc),
|
|
AuthCodeDoc.method_type == AuthType.EMAIL,
|
|
).first_or_none()
|
|
|
|
if result:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
async def get_user_mobile(self, user_id: str) -> Optional[str]:
|
|
"""get user mobile number through the user_id
|
|
|
|
Args:
|
|
user_id (str): user identity, _id in UserAccountDoc
|
|
|
|
Returns:
|
|
str: mobile number
|
|
"""
|
|
user_mobile = await UserMobileDoc.find(
|
|
UserMobileDoc.user_id == user_id
|
|
).first_or_none()
|
|
|
|
if user_mobile is None:
|
|
return None
|
|
else:
|
|
return user_mobile.mobile
|
|
|
|
async def generate_auth_code_for_mobile(self, mobile: str) -> str:
|
|
"""send auth code to mobile number
|
|
|
|
Args:
|
|
mobile (str): mobile number
|
|
"""
|
|
auth_code = generate_auth_code()
|
|
expiry = datetime.now(timezone.utc) + timedelta(minutes=5)
|
|
auth_code_doc = AuthCodeDoc(
|
|
auth_code=auth_code,
|
|
method=mobile.lower(),
|
|
method_type=AuthType.MOBILE,
|
|
expiry=expiry,
|
|
)
|
|
|
|
await auth_code_doc.create()
|
|
return auth_code
|
|
|
|
async def verify_mobile_with_code(self, mobile, code):
|
|
"""sign in with mobile and code
|
|
|
|
Args:
|
|
mobile (str): mobile number
|
|
code (str): auth code to be verified
|
|
|
|
Returns:
|
|
bool: True if code is valid, False otherwise
|
|
"""
|
|
result = await AuthCodeDoc.find(
|
|
AuthCodeDoc.method == mobile.lower(),
|
|
AuthCodeDoc.auth_code == code,
|
|
AuthCodeDoc.expiry > datetime.now(timezone.utc),
|
|
AuthCodeDoc.method_type == AuthType.MOBILE,
|
|
).first_or_none()
|
|
|
|
if result:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
async def save_email_auth_method(self, user_id: str, email: str):
|
|
"""save email auth method to user_email doc
|
|
|
|
Args:
|
|
user_id (str): user id
|
|
email (str): email address
|
|
"""
|
|
user_email = await UserEmailDoc.find(
|
|
UserEmailDoc.user_id == user_id
|
|
).first_or_none()
|
|
|
|
if user_email is None:
|
|
new_user_email = UserEmailDoc(user_id=user_id, email=email.lower())
|
|
await new_user_email.create()
|
|
else:
|
|
user_email.email = email.lower()
|
|
await user_email.save()
|
|
|
|
async def save_password_auth_method(self, user_id: str, user_flid, password: str):
|
|
"""save password auth method to user_password doc
|
|
|
|
Args:
|
|
user_id (str): user id
|
|
password (str): user password
|
|
"""
|
|
password_hashed = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
|
|
|
|
user_password = await UserPasswordDoc.find(
|
|
UserPasswordDoc.user_id == user_id
|
|
).first_or_none()
|
|
|
|
if user_password is None:
|
|
new_user_password = UserPasswordDoc(
|
|
user_id=user_id, password=password_hashed
|
|
)
|
|
await new_user_password.create()
|
|
else:
|
|
user_password.password = password_hashed
|
|
await user_password.save()
|
|
|
|
result = await self.code_depot_manager.update_depot_user_password(
|
|
user_flid, password
|
|
)
|
|
if not result:
|
|
raise Exception("Failed to update user password in code depot")
|
|
|
|
async def reset_password(self, user_id: str):
|
|
"""clean password auth method from user_password doc
|
|
|
|
Args:
|
|
user_id (str): user id
|
|
"""
|
|
user_password = await UserPasswordDoc.find(
|
|
UserPasswordDoc.user_id == user_id
|
|
).first_or_none()
|
|
|
|
if user_password:
|
|
user_password.password = ""
|
|
await user_password.save()
|
|
else:
|
|
raise Exception("User password was not set before.")
|
|
|
|
async def is_password_reset_required(self, user_id: str) -> bool:
|
|
"""check if password is required for the user
|
|
|
|
Args:
|
|
user_id (str): user id
|
|
|
|
Returns:
|
|
bool: True if password is required, False otherwise
|
|
"""
|
|
user_password = await UserPasswordDoc.find(
|
|
UserPasswordDoc.user_id == user_id
|
|
).first_or_none()
|
|
|
|
if user_password:
|
|
return user_password.password == ""
|
|
else:
|
|
return True
|
|
|
|
async def is_flid_reset_required(self, user_id: str) -> bool:
|
|
basic_profile = await BasicProfileDoc.find_one(
|
|
BasicProfileDoc.user_id == user_id
|
|
)
|
|
|
|
if basic_profile:
|
|
return basic_profile.FLID.update_time == basic_profile.FLID.create_time
|
|
|
|
async def is_flid_available(self, user_flid: str) -> bool:
|
|
basic_profile = await BasicProfileDoc.find_one(
|
|
BasicProfileDoc.FLID.identity == user_flid
|
|
)
|
|
|
|
if basic_profile:
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
async def get_flid(self, user_id: str) -> str:
|
|
basic_profile = await BasicProfileDoc.find_one(
|
|
BasicProfileDoc.user_id == user_id
|
|
)
|
|
|
|
if basic_profile:
|
|
return basic_profile.FLID.identity
|
|
else:
|
|
return None
|
|
|
|
async def update_flid(self, user_id: str, flid: str) -> bool:
|
|
basic_profile = await BasicProfileDoc.find_one(
|
|
BasicProfileDoc.user_id == user_id
|
|
)
|
|
|
|
if basic_profile:
|
|
basic_profile.FLID.identity = flid
|
|
basic_profile.FLID.update_time = datetime.now(timezone.utc)
|
|
basic_profile.FLID.set_by = user_id
|
|
await basic_profile.save()
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
async def generate_auth_code(self, deliver_object: str, auth_type: AuthType) -> str:
|
|
"""send auth code to email address
|
|
|
|
Args:
|
|
deliver_object (str): email address, mobile, etc
|
|
auth_type (str): authentication type
|
|
"""
|
|
auth_code = generate_auth_code()
|
|
expiry = datetime.now(timezone.utc) + timedelta(minutes=5)
|
|
auth_code_doc = AuthCodeDoc(
|
|
auth_code=auth_code,
|
|
method=deliver_object.lower(),
|
|
method_type=auth_type,
|
|
expiry=expiry,
|
|
)
|
|
|
|
await auth_code_doc.create()
|
|
return auth_code
|