398 lines
16 KiB
Python
398 lines
16 KiB
Python
import random
|
|
from typing import Tuple, Optional, List
|
|
|
|
|
|
from backend.services.auth.user_auth_service import UserAuthService
|
|
from common.constants.region import UserRegion
|
|
from common.utils.region import RegionHandler
|
|
from backend.models.user.constants import (
|
|
NewUserMethod,
|
|
)
|
|
from backend.models.user.constants import UserLoginAction
|
|
from backend.services.user.user_management_service import (
|
|
UserManagementService,
|
|
)
|
|
from backend.services.code_depot.code_depot_service import (
|
|
CodeDepotService,
|
|
)
|
|
from common.log.module_logger import ModuleLogger
|
|
from common.utils.string import check_password_complexity
|
|
from common.exception.exceptions import InvalidDataError
|
|
from backend.services.notification.notification_service import (
|
|
NotificationService,
|
|
)
|
|
from backend.models.user.constants import (
|
|
AuthType,
|
|
)
|
|
from common.config.app_settings import app_settings
|
|
|
|
|
|
class SignInManager:
|
|
def __init__(self):
|
|
self.user_auth_service = UserAuthService()
|
|
self.region_handler = RegionHandler()
|
|
self.user_management_service = UserManagementService()
|
|
self.module_logger = ModuleLogger(sender_id=SignInManager)
|
|
self.code_depot_service = CodeDepotService()
|
|
self.notification_service = NotificationService()
|
|
|
|
async def signin_with_email_and_code(
|
|
self, email: str, code: str, host: str, time_zone: Optional[str] = "UTC"
|
|
) -> Tuple[UserLoginAction, Optional[int], Optional[str], Optional[UserRegion], Optional[str], Optional[List[str]], Optional[List[str]]]:
|
|
"""Try to signin with email and code.
|
|
create a new user account, if the email address has never been used before.
|
|
|
|
Args:
|
|
email (str): email address
|
|
code (str): auth code to be verified
|
|
host (str): the host address by which the client access the frontend service, for detecting UserRegion
|
|
time_zone (str, optional): timezone of the frontend service
|
|
|
|
Returns:
|
|
[int, Optional[int], Optional[str], Optional[str]]:
|
|
- int: UserLoginAction
|
|
- Optional[int]: user role
|
|
- Optional[str]: user_id
|
|
- Optional[str]: flid
|
|
- Optional[str]: region
|
|
- Optional[str]: user role names
|
|
- Optional[str]: user permission keys
|
|
"""
|
|
# check if the user account exist
|
|
user_id = await self.user_auth_service.get_user_id_by_email(email)
|
|
|
|
# if it cannot find user account according to the email address, new user
|
|
is_new_user = user_id is None
|
|
preferred_region = self.region_handler.detect_from_host(host)
|
|
|
|
# verify the email through auth code
|
|
if await self.user_auth_service.verify_email_with_code(email, code):
|
|
if is_new_user:
|
|
user_account = (
|
|
await self.user_management_service.create_new_user_account(
|
|
method=NewUserMethod.EMAIL, region=preferred_region
|
|
)
|
|
)
|
|
user_id = str(user_account.id)
|
|
await self.user_management_service.initialize_new_user_data(
|
|
user_id=str(user_account.id),
|
|
method=NewUserMethod.EMAIL,
|
|
email_address=email,
|
|
region=preferred_region,
|
|
time_zone=time_zone,
|
|
)
|
|
|
|
user_account = await self.user_management_service.get_account_by_id(
|
|
user_id=user_id
|
|
)
|
|
role_names, permission_keys = await self.user_management_service.get_role_and_permission_by_user_id(
|
|
user_id=user_id
|
|
)
|
|
if await self.user_auth_service.is_flid_reset_required(user_id):
|
|
return (
|
|
UserLoginAction.REVIEW_AND_REVISE_FLID,
|
|
user_account.user_role,
|
|
user_id,
|
|
email.split("@")[0],
|
|
preferred_region,
|
|
role_names,
|
|
permission_keys,
|
|
)
|
|
|
|
user_flid = await self.user_auth_service.get_user_flid(user_id)
|
|
if await self.user_auth_service.is_password_reset_required(user_id):
|
|
return (
|
|
UserLoginAction.NEW_USER_SET_PASSWORD,
|
|
user_account.user_role,
|
|
user_id,
|
|
user_flid,
|
|
preferred_region,
|
|
role_names,
|
|
permission_keys,
|
|
)
|
|
return (
|
|
UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED,
|
|
user_account.user_role,
|
|
user_id,
|
|
user_flid,
|
|
preferred_region,
|
|
role_names,
|
|
permission_keys,
|
|
)
|
|
else:
|
|
await self.module_logger.log_warning(
|
|
warning="The auth code is invalid.",
|
|
properties={"email": email, "code": code},
|
|
)
|
|
# TODO refactor this to reduce None
|
|
return UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE, None, None, None, None, None, None
|
|
|
|
async def signin_with_email_and_password(
|
|
self, email: str, password: str
|
|
) -> Tuple[UserLoginAction, Optional[int], Optional[str], Optional[str], Optional[List[str]], Optional[List[str]]]:
|
|
|
|
# check if the user account exist
|
|
user_id = await self.user_auth_service.get_user_id_by_email(email)
|
|
|
|
# if it cannot find user account according to the email address, new user
|
|
is_new_user = user_id is None
|
|
|
|
if is_new_user:
|
|
# cannot find the email address
|
|
# TODO refactor this to reduce None
|
|
return (UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE, None, None, None, None, None)
|
|
else:
|
|
if await self.user_auth_service.is_password_reset_required(user_id):
|
|
# password hasn't been set before, save password for the user
|
|
return (
|
|
UserLoginAction.NEW_USER_SET_PASSWORD,
|
|
None,
|
|
None,
|
|
None,
|
|
None,
|
|
None
|
|
)
|
|
else:
|
|
if await self.user_auth_service.verify_user_with_password(
|
|
user_id, password
|
|
):
|
|
user_account = await self.user_management_service.get_account_by_id(
|
|
user_id=user_id
|
|
)
|
|
role_names, permission_keys = await self.user_management_service.get_role_and_permission_by_user_id(user_id)
|
|
if await self.user_auth_service.is_flid_reset_required(user_id):
|
|
return (
|
|
UserLoginAction.REVIEW_AND_REVISE_FLID,
|
|
user_account.user_role,
|
|
user_id,
|
|
email.split("@")[0],
|
|
role_names,
|
|
permission_keys,
|
|
)
|
|
|
|
user_flid = await self.user_auth_service.get_user_flid(user_id)
|
|
|
|
# password verification passed
|
|
return (
|
|
UserLoginAction.USER_SIGNED_IN,
|
|
user_account.user_role,
|
|
user_id,
|
|
user_flid,
|
|
role_names,
|
|
permission_keys
|
|
)
|
|
else:
|
|
# ask user to input password again.
|
|
# TODO: we need to limit times of user to input the wrong password
|
|
# TODO refactor this to reduce None
|
|
return (
|
|
UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED,
|
|
None,
|
|
None,
|
|
None,
|
|
None,
|
|
None
|
|
)
|
|
|
|
async def update_new_user_flid(
|
|
self, user_id: str, user_flid: str
|
|
) -> Tuple[UserLoginAction, Optional[str]]:
|
|
if await self.user_auth_service.is_flid_available(user_flid):
|
|
|
|
code_depot_email = "{}@freeleaps.com".format(user_flid)
|
|
result = await self.code_depot_service.create_depot_user(
|
|
user_flid, user_id, code_depot_email
|
|
)
|
|
|
|
if not result:
|
|
await self.module_logger.log_error(
|
|
error="Failed to create depot user for {} with flid {} and email {}".format(
|
|
user_id, user_flid, code_depot_email
|
|
),
|
|
properties={
|
|
"user_id": user_id,
|
|
"user_flid": user_flid,
|
|
"code_depot_email": code_depot_email,
|
|
},
|
|
)
|
|
return (
|
|
UserLoginAction.REVIEW_AND_REVISE_FLID,
|
|
"{}{}".format(user_flid, random.randint(100, 999)),
|
|
)
|
|
await self.user_auth_service.update_flid(user_id, user_flid)
|
|
if await self.user_auth_service.is_password_reset_required(user_id):
|
|
return (
|
|
UserLoginAction.NEW_USER_SET_PASSWORD,
|
|
user_flid,
|
|
)
|
|
else:
|
|
return (
|
|
UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED,
|
|
user_flid,
|
|
)
|
|
else:
|
|
return (
|
|
UserLoginAction.REVIEW_AND_REVISE_FLID,
|
|
"{}{}".format(user_flid, random.randint(100, 999)),
|
|
)
|
|
|
|
async def try_signin_with_email(self, email: str, host: str) -> UserLoginAction:
|
|
"""try signin through email, generate auth code and send to the email address
|
|
|
|
Args:
|
|
email (str): email address
|
|
host (str): host url that user tried to sign in
|
|
|
|
Returns:
|
|
int: UserLoginAction
|
|
"""
|
|
user_id = await self.user_auth_service.get_user_id_by_email(email)
|
|
|
|
is_password_reset_required = False
|
|
if user_id:
|
|
is_password_reset_required = (
|
|
await self.user_auth_service.is_password_reset_required(user_id)
|
|
)
|
|
|
|
if user_id is None or is_password_reset_required:
|
|
# send auth code through email if the email address
|
|
# hasn't been associated with any account.
|
|
# Or if the user's password is empty, which means the user's pasword hasn't been set.
|
|
|
|
mail_code = await self.user_auth_service.generate_auth_code_for_object(
|
|
email, AuthType.EMAIL
|
|
)
|
|
await self.notification_service.send_notification(
|
|
sender_id=app_settings.SYSTEM_USER_ID,
|
|
channels=["2"], # 2 maps to email in NotificationChannel
|
|
receiver_id=email,
|
|
subject="email",
|
|
event="authentication",
|
|
properties={"auth_code": mail_code},
|
|
# TODO: reconsider necessity of adding region info here
|
|
# region=RegionHandler().detect_from_host(host),
|
|
)
|
|
return UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE
|
|
else:
|
|
return UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED
|
|
|
|
async def try_magicleaps_signin_with_email(self, email: str, host: str) -> UserLoginAction:
|
|
"""try signin through email using MagicLeaps branding, generate auth code and send to the email address
|
|
|
|
Args:
|
|
email (str): email address
|
|
host (str): host url that user tried to sign in
|
|
|
|
Returns:
|
|
int: UserLoginAction
|
|
"""
|
|
user_id = await self.user_auth_service.get_user_id_by_email(email)
|
|
|
|
is_password_reset_required = False
|
|
if user_id:
|
|
is_password_reset_required = (
|
|
await self.user_auth_service.is_password_reset_required(user_id)
|
|
)
|
|
|
|
if user_id is None or is_password_reset_required:
|
|
# send auth code through email if the email address
|
|
# hasn't been associated with any account.
|
|
# Or if the user's password is empty, which means the user's pasword hasn't been set.
|
|
|
|
mail_code = await self.user_auth_service.generate_auth_code_for_object(
|
|
email, AuthType.EMAIL
|
|
)
|
|
await self.notification_service.send_notification(
|
|
sender_id=app_settings.SYSTEM_USER_ID,
|
|
channels=["2"], # 2 maps to email in NotificationChannel
|
|
receiver_id=email,
|
|
subject="email",
|
|
event="magicleaps_authentication", # Use the new event type
|
|
properties={"auth_code": mail_code},
|
|
# TODO: reconsider necessity of adding region info here
|
|
# region=RegionHandler().detect_from_host(host),
|
|
)
|
|
return UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE
|
|
else:
|
|
return UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED
|
|
|
|
async def reset_password_through_email(self, email: str, host: str) -> int:
|
|
"""verify the email is exisitng, clear the existing password,
|
|
generate auth code and send to the email address
|
|
so in the following steps, the user can reset their password.
|
|
|
|
Args:
|
|
email (str): email address
|
|
host (str): host that user will perform the reset on
|
|
|
|
Returns:
|
|
int: UserLoginAction
|
|
"""
|
|
|
|
user_id = await self.user_auth_service.get_user_id_by_email(email)
|
|
if user_id is not None:
|
|
# send auth code through email if the email address
|
|
# hasn been associated with any account.
|
|
mail_code = await self.user_auth_service.generate_auth_code_for_object(
|
|
email, AuthType.EMAIL
|
|
)
|
|
await self.notification_service.send_notification(
|
|
sender_id=app_settings.SYSTEM_USER_ID,
|
|
channels=["2"], # 2 maps to email in NotificationChannel
|
|
receiver_id=email,
|
|
subject="email",
|
|
event="authentication",
|
|
properties={"auth_code": mail_code},
|
|
)
|
|
|
|
await self.user_auth_service.reset_password(user_id)
|
|
|
|
return UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE
|
|
else:
|
|
return UserLoginAction.EMAIL_NOT_ASSOCIATED_WITH_USER
|
|
|
|
async def update_user_password(self, user_id: str, password: str) -> dict[str, any]:
|
|
error_message = """
|
|
Password does not pass complexity requirements:
|
|
- At least one lowercase character
|
|
- At least one uppercase character
|
|
- At least one digit
|
|
- At least one special character (punctuation, brackets, quotes, etc.)
|
|
"""
|
|
if not check_password_complexity(password):
|
|
raise InvalidDataError(error_message)
|
|
|
|
user_flid = await self.user_auth_service.get_user_flid(user_id)
|
|
await self.user_auth_service.save_password_auth_method(
|
|
user_id, user_flid, password
|
|
)
|
|
return {"succeeded": True}
|
|
|
|
async def send_email_code(self, sender_id: str, email: str) -> bool:
|
|
mail_code = await self.user_auth_service.generate_auth_code_for_object(
|
|
email, AuthType.EMAIL
|
|
)
|
|
success = await self.notification_service.send_notification(
|
|
sender_id=sender_id,
|
|
channels=["email"],
|
|
receiver_id=email,
|
|
subject="email",
|
|
event="authentication",
|
|
properties={"auth_code": mail_code},
|
|
)
|
|
return success
|
|
|
|
async def send_mobile_code(self, sender_id: str, mobile: str) -> bool:
|
|
mail_code = await self.user_auth_service.generate_auth_code_for_object(
|
|
mobile, AuthType.MOBILE
|
|
)
|
|
success = await self.notification_service.send_notification(
|
|
sender_id=sender_id,
|
|
channels=["email"],
|
|
receiver_id=mobile,
|
|
subject="mobile",
|
|
event="authentication",
|
|
properties={"auth_code": mail_code},
|
|
)
|
|
return success
|