freeleaps-service-hub/apps/authentication/backend/business/signin_manager.py
2024-10-30 07:22:26 -07:00

357 lines
14 KiB
Python

import random
from typing import Tuple, Optional
from app.authentication.backend.services.auth.user_auth_service import UserAuthService
from infra.i18n.region_handler import RegionHandler
from app.authentication.backend.models.user.constants import (
UserLoginAction,
NewUserMethod,
)
from app.authentication.backend.models.user.constants import UserLoginAction
from app.authentication.backend.services.user.user_management_service import (
UserManagementService,
)
from app.authentication.backend.services.code_depot.code_depot_service import (
CodeDepotService,
)
from infra.log.module_logger import ModuleLogger
from infra.utils.string import check_password_complexity
from infra.exception.exceptions import InvalidDataError
from app.authentication.backend.services.notification.notification_service import (
NotificationService,
)
from app.authentication.backend.models.user.constants import (
AuthType,
)
class SignInManager:
def __init__(self):
self.user_auth_service = UserAuthService()
self.region_handler = RegionHandler()
self.user_management_service = UserManagementService()
self.module_logger = ModuleLogger(sender_id=SignInManager)
self.code_depot_service = CodeDepotService()
self.notification_service = NotificationService()
# TODO: Dax: notification service
# self.event_service = EventService()
async def signin_with_email_and_code(
self, email: str, code: str, host: str, time_zone: Optional[str] = "UTC"
) -> Tuple[int, Optional[int], Optional[str], Optional[str]]:
"""Try to signin with email and code.
create a new user account, if the email address has never been used before.
Args:
email (str): email address
code (str): auth code to be verified
host (str): the host address by which the client access the frontend service
Returns:
[int, Optional[int], Optional[str], Optional[str]]:
- int: UserLoginAction
- Optional[int]: user role
- Optional[str]: user_id
- Optional[str]: flid
"""
# check if the user account exist
user_id = await self.user_auth_service.get_user_id_by_email(email)
# if it cannot find user account according to the email address, new user
is_new_user = user_id is None
preferred_region = self.region_handler.detect_from_host(host)
# verify the email through auth code
if await self.user_auth_service.verify_email_with_code(email, code):
if is_new_user:
user_account = (
await self.user_management_service.create_new_user_account(
method=NewUserMethod.EMAIL, region=preferred_region
)
)
user_id = str(user_account.id)
await self.user_management_service.initialize_new_user_data(
user_id=str(user_account.id),
method=NewUserMethod.EMAIL,
email_address=email,
region=preferred_region,
time_zone=time_zone,
)
# TODO: Dax - Add notification for log_signup_event
# await self.event_service.log_signup_event(
# user_id=user_id,
# email=email,
# region=preferred_region,
# time_zone=time_zone,
# )
# TODO: Dax - Add notification for log_signup_event
# This will be done by sending the notification back to Freeleaps App
# await self.achievement_service.record_login_event(user_id)
if await self.user_auth_service.is_flid_reset_required(user_id):
return (
UserLoginAction.REVIEW_AND_REVISE_FLID,
user_account.user_role,
user_id,
email.split("@")[0],
preferred_region,
)
user_flid = await self.user_auth_service.get_user_flid(user_id)
if await self.user_auth_service.is_password_reset_required(user_id):
return (
UserLoginAction.NEW_USER_SET_PASSWORD,
user_account.user_role,
user_id,
user_flid,
preferred_region,
)
return (
UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED,
user_account.user_role,
user_id,
user_flid,
preferred_region,
)
else:
await self.module_logger.log_warning(
info="The auth code is invalid.",
properties={"email": email, "code": code},
)
return UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE, None, None, None, None
async def signin_with_email_and_password(
self, email: str, password: str
) -> Tuple[UserLoginAction, Optional[int], Optional[str], Optional[str]]:
# check if the user account exist
user_id = await self.user_auth_service.get_user_id_by_email(email)
# if it cannot find user account according to the email address, new user
is_new_user = user_id is None
if is_new_user:
# cannot find the email address
return [UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE, None, None, None]
else:
if await self.user_auth_service.is_password_reset_required(user_id):
# password hasn't been set before, save password for the user
return [
UserLoginAction.NEW_USER_SET_PASSWORD,
None,
None,
None,
]
else:
if await self.user_auth_service.verify_user_with_password(
user_id, password
):
# TODO: Add user achievement logic in Freeleaps
# TODO: Add notification system
# user logins count + 1
# await UserAchievement(user_id).record_user_login_event()
# for new users, log the sign up event
# await self.event_dispatcher.dispatch_event(
# receiver_id=settings.SYSTEM_USER_ID,
# subject="login",
# event="signin",
# properties={
# "user_id": user_id,
# },
# )
user_account = await self.user_management_service.get_account_by_id(
user_id=user_id
)
if await self.user_auth_service.is_flid_reset_required(user_id):
return [
UserLoginAction.REVIEW_AND_REVISE_FLID,
user_account.user_role,
user_id,
email.split("@")[0],
]
user_flid = await self.user_auth_service.get_user_flid(user_id)
# password verification passed
return [
UserLoginAction.USER_SIGNED_IN,
user_account.user_role,
user_id,
user_flid,
]
else:
# ask user to input password again.
# TODO: we need to limit times of user to input the wrong password
return [
UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED,
None,
None,
None,
]
async def update_new_user_flid(
self, user_id: str, user_flid: str
) -> Tuple[UserLoginAction, Optional[str]]:
if await self.user_auth_service.is_flid_available(user_flid):
code_depot_email = "{}@freeleaps.com".format(user_flid)
result = await self.code_depot_service.create_depot_user(
user_flid, user_id, code_depot_email
)
if not result:
await self.module_logger.log_error(
error="Failed to create depot user for {} with flid {} and email {}".format(
user_id, user_flid, code_depot_email
),
properties={
"user_id": user_id,
"user_flid": user_flid,
"code_depot_email": code_depot_email,
},
)
return [
UserLoginAction.REVIEW_AND_REVISE_FLID,
"{}{}".format(user_flid, random.randint(100, 999)),
]
await self.user_auth_service.update_flid(user_id, user_flid)
if await self.user_auth_service.is_password_reset_required(user_id):
return [
UserLoginAction.NEW_USER_SET_PASSWORD,
user_flid,
]
else:
return [
UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED,
user_flid,
]
else:
return [
UserLoginAction.REVIEW_AND_REVISE_FLID,
"{}{}".format(user_flid, random.randint(100, 999)),
]
async def try_signin_with_email(self, email: str, host: str) -> UserLoginAction:
"""try signin through email, generate auth code and send to the email address
Args:
email (str): email address
host (str): host url that user tried to sign in
Returns:
int: UserLoginAction
"""
user_id = await self.user_auth_service.get_user_id_by_email(email)
is_password_reset_required = False
if user_id:
is_password_reset_required = (
await self.user_auth_service.is_password_reset_required(user_id)
)
if user_id is None or is_password_reset_required:
# send auth code through email if the email address
# hasn't been associated with any account.
# Or if the user's password is empty, which means the user's pasword hasn't been set.
mail_code = await self.user_auth_service.generate_auth_code_for_email(email)
region = RegionHandler().detect_from_host(host)
# TODO: Add notification
# async with self.notification_center:
# await self.notification_center.send_email_notification(
# receiver_id=email,
# subject="email",
# event="authentication",
# properties={"auth_code": mail_code},
# region=region,
# )
return UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE
else:
return UserLoginAction.EXISTING_USER_PASSWORD_REQUIRED
async def reset_password_through_email(self, email: str, host: str) -> int:
"""verify the email is exisitng, clear the existing password,
generate auth code and send to the email address
so in the following steps, the user can reset their password.
Args:
email (str): email address
host (str): host that user will perform the reset on
Returns:
int: UserLoginAction
"""
user_id = await self.user_auth_service.get_user_id_by_email(email)
if user_id is not None:
# send auth code through email if the email address
# hasn been associated with any account.
mail_code = await self.user_auth_service.generate_auth_code_for_email(email)
region = RegionHandler().detect_from_host(host)
# TODO: Add notification support
# async with self.notification_center:
# await self.notification_center.send_email_notification(
# receiver_id=email,
# subject="email",
# event="authentication",
# properties={"auth_code": mail_code},
# region=region,
# )
await self.user_auth_service.reset_password(user_id)
return UserLoginAction.VERIFY_EMAIL_WITH_AUTH_CODE
else:
return UserLoginAction.EMAIL_NOT_ASSOCIATED_WITH_USER
async def update_user_password(self, user_id: str, password: str) -> dict[str, any]:
error_message = """
Password does not pass complexity requirements:
- At least one lowercase character
- At least one uppercase character
- At least one digit
- At least one special character (punctuation, brackets, quotes, etc.)
"""
if not check_password_complexity(password):
raise InvalidDataError(error_message)
user_flid = await self.user_auth_service.get_user_flid(user_id)
await self.user_auth_service.save_password_auth_method(
user_id, user_flid, password
)
return {"succeeded": True}
async def send_email_code(self, sender_id: str, email: str) -> bool:
mail_code = await self.user_auth_service.generate_auth_code_for_object(
email, AuthType.EMAIL
)
success = await self.notification_service.send_notification(
sender_id=sender_id,
channels=["email"],
receiver_id=email,
subject="email",
event="authentication",
properties={"auth_code": mail_code},
)
return success
async def send_mobile_code(self, sender_id: str, mobile: str) -> bool:
mail_code = await self.user_auth_service.generate_auth_code_for_object(
mobile, AuthType.MOBILE
)
success = await self.notification_service.send_notification(
sender_id=sender_id,
channels=["email"],
receiver_id=mobile,
subject="mobile",
event="authentication",
properties={"auth_code": mail_code},
)
return success