# freeleaps-service-hub/apps/authentication/backend/business/signin_manager.py

# 415 lines
# 17 KiB
# Python

import random
from typing import Tuple, Optional, List
from backend.services.auth.user_auth_service import UserAuthService
from common.constants.region import UserRegion
from common.utils.region import RegionHandler
from backend.models.user.constants import (
NewUserMethod,
)
from backend.models.user.constants import UserLoginAction
from backend.services.user.user_management_service import (
UserManagementService,
)
from backend.services.code_depot.code_depot_service import (
CodeDepotService,
)
from common.log.module_logger import ModuleLogger
from common.utils.string import check_password_complexity
from common.exception.exceptions import InvalidDataError
from backend.services.notification.notification_service import (
NotificationService,
)
from backend.models.user.constants import (
AuthType,
)
from common.config.app_settings import app_settings
class SignInManager:
    """Business-layer manager for email sign-in, FLID and password flows.

    The coroutines here delegate to the service objects created in
    ``__init__``. Most of them return a ``UserLoginAction`` (alone or as the
    first element of a tuple) describing the outcome of the step.
    """

    # Message carried by InvalidDataError when check_password_complexity
    # rejects a password. The leading and trailing newlines are part of the
    # message.
    _PASSWORD_COMPLEXITY_ERROR = (
        "\n"
        "Password does not pass complexity requirements:\n"
        "- At least one lowercase character\n"
        "- At least one uppercase character\n"
        "- At least one digit\n"
        "- At least one special character (punctuation, brackets, quotes, etc.)\n"
    )

    def __init__(self):
        """Instantiate the services and logger this manager delegates to."""
        self.user_auth_service = UserAuthService()
        self.region_handler = RegionHandler()
        self.user_management_service = UserManagementService()
        self.module_logger = ModuleLogger(sender_id=SignInManager)
        self.code_depot_service = CodeDepotService()
        self.notification_service = NotificationService()

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _suggest_alternative_flid(user_flid: str) -> str:
        """Return ``user_flid`` with a random three-digit suffix appended."""
        return "{}{}".format(user_flid, random.randint(100, 999))

    def _ensure_password_complexity(self, password: str) -> None:
        """Raise ``InvalidDataError`` if ``password`` fails the complexity check."""
        if not check_password_complexity(password):
            raise InvalidDataError(self._PASSWORD_COMPLEXITY_ERROR)

    async def _email_system_auth_code(self, email: str, event: str) -> None:
        """Generate an email auth code and send it to ``email`` as the system user.

        Args:
            email (str): address the code is generated for and sent to
            event (str): notification event name passed to the notification service
        """
        mail_code = await self.user_auth_service.generate_auth_code_for_object(
            email, AuthType.EMAIL
        )
        # TODO: reconsider necessity of adding region info (detected from the
        # request host) to this notification.
        await self.notification_service.send_notification(
            sender_id=app_settings.SYSTEM_USER_ID,
            channels=["2"],  # 2 maps to email in NotificationChannel
            receiver_id=email,
            subject="email",
            event=event,
            properties={"auth_code": mail_code},
        )

    async def _request_email_signin(self, email: str, event: str) -> UserLoginAction:
        """Shared implementation of the ``try_*_signin_with_email`` entry points.

        An auth code is emailed when the address has no account yet, or when
        the account still requires a password reset. Otherwise the caller is
        told to ask for the password.
        """
        user_id = await self.user_auth_service.get_user_id_by_email(email)

        is_password_reset_required = False
        if user_id:
            is_password_reset_required = (
                await self.user_auth_service.is_password_reset_required(user_id)
            )

        if user_id is not None and not is_password_reset_required:
            return UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED

        await self._email_system_auth_code(email, event)
        return UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE

    # ------------------------------------------------------------------
    # sign-in
    # ------------------------------------------------------------------

    async def signin_with_email_and_code(
        self, email: str, code: str, host: str, time_zone: Optional[str] = "UTC"
    ) -> Tuple[
        UserLoginAction,
        Optional[int],
        Optional[str],
        Optional[str],
        Optional[UserRegion],
        Optional[List[str]],
        Optional[List[str]],
    ]:
        """Try to sign in with an email address and an auth code.

        A new user account is created when the code is valid and the email
        address is not yet associated with an account.

        Args:
            email (str): email address
            code (str): auth code to be verified
            host (str): the host address by which the client accesses the
                frontend service, used for detecting the UserRegion
            time_zone (str, optional): timezone of the frontend service

        Returns:
            A 7-tuple. Every element after the first is None when the auth
            code is invalid.
                - UserLoginAction: the resulting login action
                - Optional[int]: user role
                - Optional[str]: user_id
                - Optional[str]: flid (the local part of the email address
                  when the FLID still has to be reviewed)
                - Optional[UserRegion]: region detected from ``host``
                - Optional[List[str]]: user role names
                - Optional[List[str]]: user permission keys
        """
        # No user id for this email address means this is a new user.
        user_id = await self.user_auth_service.get_user_id_by_email(email)
        is_new_user = user_id is None
        preferred_region = self.region_handler.detect_from_host(host)

        # Verify the email through the auth code before touching any account.
        if not await self.user_auth_service.verify_email_with_code(email, code):
            await self.module_logger.log_warning(
                warning="The auth code is invalid.",
                properties={"email": email, "code": code},
            )
            # TODO refactor this to reduce None
            return (
                UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE,
                None,
                None,
                None,
                None,
                None,
                None,
            )

        if is_new_user:
            new_account = await self.user_management_service.create_new_user_account(
                method=NewUserMethod.EMAIL, region=preferred_region
            )
            user_id = str(new_account.id)
            await self.user_management_service.initialize_new_user_data(
                user_id=user_id,
                method=NewUserMethod.EMAIL,
                email_address=email,
                region=preferred_region,
                time_zone=time_zone,
            )

        user_account = await self.user_management_service.get_account_by_id(
            user_id=user_id
        )
        role_names, permission_keys = (
            await self.user_management_service.get_role_and_permission_by_user_id(
                user_id=user_id
            )
        )

        if await self.user_auth_service.is_flid_reset_required(user_id):
            # Propose the local part of the email address as the FLID.
            action = UserLoginAction.REVIEW_AND_REVISE_FLID
            flid = email.split("@")[0]
        else:
            flid = await self.user_auth_service.get_user_flid(user_id)
            if await self.user_auth_service.is_password_reset_required(user_id):
                action = UserLoginAction.NEW_USER_SET_PASSWORD
            else:
                action = UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED

        return (
            action,
            user_account.user_role,
            user_id,
            flid,
            preferred_region,
            role_names,
            permission_keys,
        )

    async def signin_with_email_and_password(
        self, email: str, password: str
    ) -> Tuple[
        UserLoginAction,
        Optional[int],
        Optional[str],
        Optional[str],
        Optional[List[str]],
        Optional[List[str]],
    ]:
        """Try to sign in with an email address and a password.

        Args:
            email (str): email address
            password (str): password to be verified

        Returns:
            A 6-tuple. Every element after the first is None unless the
            password was verified.
                - UserLoginAction: the resulting login action
                - Optional[int]: user role
                - Optional[str]: user_id
                - Optional[str]: flid (the local part of the email address
                  when the FLID still has to be reviewed)
                - Optional[List[str]]: user role names
                - Optional[List[str]]: user permission keys
        """
        user_id = await self.user_auth_service.get_user_id_by_email(email)

        # TODO refactor the early returns below to reduce None
        if user_id is None:
            # Cannot find the email address.
            return (
                UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE,
                None,
                None,
                None,
                None,
                None,
            )

        if await self.user_auth_service.is_password_reset_required(user_id):
            # Password hasn't been set before; the user has to set one.
            return (
                UserLoginAction.NEW_USER_SET_PASSWORD,
                None,
                None,
                None,
                None,
                None,
            )

        if not await self.user_auth_service.verify_user_with_password(
            user_id, password
        ):
            # Ask the user to input the password again.
            # TODO: we need to limit times of user to input the wrong password
            return (
                UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED,
                None,
                None,
                None,
                None,
                None,
            )

        # Password verification passed.
        user_account = await self.user_management_service.get_account_by_id(
            user_id=user_id
        )
        role_names, permission_keys = (
            await self.user_management_service.get_role_and_permission_by_user_id(
                user_id
            )
        )

        if await self.user_auth_service.is_flid_reset_required(user_id):
            action = UserLoginAction.REVIEW_AND_REVISE_FLID
            flid = email.split("@")[0]
        else:
            action = UserLoginAction.USER_SIGNED_IN
            flid = await self.user_auth_service.get_user_flid(user_id)

        return (
            action,
            user_account.user_role,
            user_id,
            flid,
            role_names,
            permission_keys,
        )

    async def update_new_user_flid(
        self, user_id: str, user_flid: str
    ) -> Tuple[UserLoginAction, Optional[str]]:
        """Assign ``user_flid`` to the user if it is available.

        Args:
            user_id (str): id of the user whose FLID is being set
            user_flid (str): the requested FLID

        Returns:
            A 2-tuple of the resulting UserLoginAction and an FLID. When the
            FLID is unavailable, or the depot user could not be created, the
            action is REVIEW_AND_REVISE_FLID and the FLID is a suggestion made
            of the requested FLID plus a random three-digit suffix. Otherwise
            the FLID is the one that was saved.
        """
        if not await self.user_auth_service.is_flid_available(user_flid):
            return (
                UserLoginAction.REVIEW_AND_REVISE_FLID,
                self._suggest_alternative_flid(user_flid),
            )

        code_depot_email = "{}@freeleaps.com".format(user_flid)
        depot_user_created = await self.code_depot_service.create_depot_user(
            user_flid, user_id, code_depot_email
        )
        if not depot_user_created:
            await self.module_logger.log_error(
                error="Failed to create depot user for {} with flid {} and email {}".format(
                    user_id, user_flid, code_depot_email
                ),
                properties={
                    "user_id": user_id,
                    "user_flid": user_flid,
                    "code_depot_email": code_depot_email,
                },
            )
            return (
                UserLoginAction.REVIEW_AND_REVISE_FLID,
                self._suggest_alternative_flid(user_flid),
            )

        await self.user_auth_service.update_flid(user_id, user_flid)

        if await self.user_auth_service.is_password_reset_required(user_id):
            return (UserLoginAction.NEW_USER_SET_PASSWORD, user_flid)
        return (UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED, user_flid)

    async def try_signin_with_email(self, email: str, host: str) -> UserLoginAction:
        """Try to sign in through email, emailing an auth code when needed.

        Args:
            email (str): email address
            host (str): host url that user tried to sign in (currently unused)

        Returns:
            UserLoginAction: VERIFY_EMAIL_WITH_AUTH_CODE when a code was sent,
            EXISTING_USER_PASSWORD_REQUIRED otherwise
        """
        return await self._request_email_signin(email, event="authentication")

    async def try_magicleaps_signin_with_email(
        self, email: str, host: str
    ) -> UserLoginAction:
        """Try to sign in through email using the MagicLeaps notification event.

        Same flow as ``try_signin_with_email``; only the notification event
        name differs.

        Args:
            email (str): email address
            host (str): host url that user tried to sign in (currently unused)

        Returns:
            UserLoginAction: VERIFY_EMAIL_WITH_AUTH_CODE when a code was sent,
            EXISTING_USER_PASSWORD_REQUIRED otherwise
        """
        return await self._request_email_signin(
            email, event="magicleaps_authentication"
        )

    # ------------------------------------------------------------------
    # password management
    # ------------------------------------------------------------------

    async def reset_password_through_email(
        self, email: str, host: str
    ) -> UserLoginAction:
        """Start a password reset for the account that owns ``email``.

        When the email address is associated with an account, an auth code is
        generated and emailed first, then the password is reset, so that the
        user can set a new password in the following steps.

        Args:
            email (str): email address
            host (str): host that user will perform the reset on (currently unused)

        Returns:
            UserLoginAction: VERIFY_EMAIL_WITH_AUTH_CODE when the account
            exists, EMAIL_NOT_ASSOCIATED_WITH_USER otherwise
        """
        user_id = await self.user_auth_service.get_user_id_by_email(email)
        if user_id is None:
            return UserLoginAction.EMAIL_NOT_ASSOCIATED_WITH_USER

        # Send the auth code through email since the email address
        # has been associated with an account.
        await self._email_system_auth_code(email, event="authentication")
        await self.user_auth_service.reset_password(user_id)
        return UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE

    async def update_user_password(self, user_id: str, password: str) -> dict[str, bool]:
        """Validate ``password`` and save it as the user's password auth method.

        Args:
            user_id (str): id of the user
            password (str): the new password

        Returns:
            dict[str, bool]: ``{"succeeded": True}``

        Raises:
            InvalidDataError: if the password fails the complexity check
        """
        self._ensure_password_complexity(password)
        user_flid = await self.user_auth_service.get_user_flid(user_id)
        await self.user_auth_service.save_password_auth_method(
            user_id, user_flid, password
        )
        return {"succeeded": True}

    async def update_magicleaps_user_password(
        self, user_id: str, password: str
    ) -> dict[str, bool]:
        """Validate ``password`` and save it via the MagicLeaps auth method.

        Args:
            user_id (str): id of the user
            password (str): the new password

        Returns:
            dict[str, bool]: ``{"succeeded": True}``

        Raises:
            InvalidDataError: if the password fails the complexity check
        """
        self._ensure_password_complexity(password)
        user_flid = await self.user_auth_service.get_user_flid(user_id)
        await self.user_auth_service.save_magicleaps_password_auth_method(
            user_id, user_flid, password
        )
        return {"succeeded": True}

    # ------------------------------------------------------------------
    # standalone auth code delivery
    # ------------------------------------------------------------------

    async def send_email_code(self, sender_id: str, email: str) -> bool:
        """Generate an email auth code and send it on behalf of ``sender_id``.

        Args:
            sender_id (str): id used as the notification sender
            email (str): address the code is generated for and sent to

        Returns:
            bool: the result reported by the notification service
        """
        auth_code = await self.user_auth_service.generate_auth_code_for_object(
            email, AuthType.EMAIL
        )
        # NOTE(review): the channel here is "email" while the system-sent
        # notifications in this class use "2" — confirm both are accepted.
        return await self.notification_service.send_notification(
            sender_id=sender_id,
            channels=["email"],
            receiver_id=email,
            subject="email",
            event="authentication",
            properties={"auth_code": auth_code},
        )

    async def send_mobile_code(self, sender_id: str, mobile: str) -> bool:
        """Generate a mobile auth code and send it on behalf of ``sender_id``.

        Args:
            sender_id (str): id used as the notification sender
            mobile (str): mobile identifier the code is generated for and sent to

        Returns:
            bool: the result reported by the notification service
        """
        auth_code = await self.user_auth_service.generate_auth_code_for_object(
            mobile, AuthType.MOBILE
        )
        # NOTE(review): the channel is "email" although the receiver is a
        # mobile identifier — looks like a copy of send_email_code; confirm
        # the intended channel before changing it.
        return await self.notification_service.send_notification(
            sender_id=sender_id,
            channels=["email"],
            receiver_id=mobile,
            subject="mobile",
            event="authentication",
            properties={"auth_code": auth_code},
        )