This commit is contained in:
dongli 2024-12-01 19:28:55 -08:00
parent 99983b85ab
commit ae3792e25c
21 changed files with 13 additions and 616 deletions

3
.gitignore vendored
View File

@ -4,4 +4,5 @@
/deploy/.*
*.log
*.pyc
freedev.code-workspace
freedev.code-workspace
.idea/

View File

@ -1,7 +1,6 @@
from webapi.bootstrap.application import create_app
from webapi.config.site_settings import site_settings
from fastapi.responses import RedirectResponse
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from typing import Any

View File

@ -1,10 +1,6 @@
from fastapi import APIRouter
from .signin import router as signin_router
from .tokens import router as token_router
from .auth import router as auth_router
from .payment import router as payment_router
api_router = APIRouter()
api_router.include_router(signin_router, tags=["user"])
api_router.include_router(token_router, tags=["token"])
api_router.include_router(auth_router, tags=["auth"])
api_router.include_router(payment_router, tags=["payment"])
websocket_router = APIRouter()

View File

@ -1,9 +0,0 @@
from fastapi import APIRouter
from .send_email_code import router as sec_router
from .send_mobile_code import router as smc_router
router = APIRouter(prefix="/auth")
router.include_router(sec_router, tags=["authentication"])
router.include_router(smc_router, tags=["authentication"])

View File

@ -1,41 +0,0 @@
from backend.application.signin_hub import SignInHub
from pydantic import BaseModel
from common.token.token_manager import TokenManager
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from fastapi import APIRouter, Depends, HTTPException
from starlette.status import HTTP_401_UNAUTHORIZED
router = APIRouter()
token_manager = TokenManager()
# Web API
# send_email_code
#
class RequestIn(BaseModel):
email: str
sender_id: str
@router.post(
"/send-email-code",
operation_id="send-email-code",
summary="Send a verification code to the specified email address",
description="This API requires an authenticated user and will send a code to the specified email. \
The code can be used later to verify the email address.",
response_description="Indicates success or failure of the email code send operation",
)
async def send_email_code(
item: RequestIn,
current_user: dict = Depends(token_manager.get_current_user),
):
user_id = current_user.get("id")
if not user_id:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED, detail="Could not validate credentials"
)
result = await SignInHub(user_id).send_email_code(item.sender_id, item.email)
return JSONResponse(content=jsonable_encoder(result))

View File

@ -1,40 +0,0 @@
from backend.application.signin_hub import SignInHub
from pydantic import BaseModel
from common.token.token_manager import TokenManager
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from fastapi import APIRouter, Depends, HTTPException
from starlette.status import HTTP_401_UNAUTHORIZED
router = APIRouter()
token_manager = TokenManager()
# Web API
# send_email_code
#
class RequestIn(BaseModel):
email: str
@router.post(
"/send-mobile-code",
operation_id="send-mobile-code",
summary="Send a verification code to the specified mobile number",
description="This API requires an authenticated user and will send a code to the specified mobile. \
The code can be used later to verify the mobile.",
response_description="Indicates success or failure of the mobile code send operation",
)
async def send_email_code(
item: RequestIn,
current_user: dict = Depends(token_manager.get_current_user),
):
user_id = current_user.get("id")
if not user_id:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED, detail="Could not validate credentials"
)
result = await SignInHub(user_id).send_mobile_code(item.email)
return JSONResponse(content=jsonable_encoder(result))

View File

@ -0,0 +1,9 @@
from fastapi import APIRouter
from .payment_manager_controller import router as pmc
from .stripe_manager_controller import router as smc
router = APIRouter(prefix="/payment")
router.include_router(pmc, tags=["payment"])
router.include_router(smc, tags=["payment"])

View File

@ -1,19 +0,0 @@
from fastapi import APIRouter
from .try_signin_with_email import router as ts_router
from .signin_with_email_and_password import router as se_router
from .signin_with_email_and_code import router as sw_router
from .update_user_password import router as up_router
from .update_new_user_flid import router as uu_router
from .reset_password_through_email import router as rp_router
from .sign_out import router as so_router
router = APIRouter(prefix="/signin")
router.include_router(ts_router, tags=["signin"])
router.include_router(sw_router, tags=["signin"])
router.include_router(up_router, tags=["signin"])
router.include_router(se_router, tags=["signin"])
router.include_router(so_router, tags=["signin"])
router.include_router(rp_router, tags=["signin"])
router.include_router(uu_router, tags=["signin"])

View File

@ -1,37 +0,0 @@
from backend.application.signin_hub import SignInHub
from pydantic import BaseModel
from fastapi import APIRouter
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
router = APIRouter()
# Web API
# reset_password_through_email
#
class UserSignWithEmailBody(BaseModel):
email: str
host: str
class UserSignWithEmailResponse(BaseModel):
signin_type: int
@router.post(
"/reset-password-through-email",
operation_id="user-reset-password-through-email",
summary="user reset password through email",
description="A client user forgets the password. \
The system will send auth code the their email\
to let the user reset the password",
response_description="action: UserLoginAction",
)
async def reset_password_through_email(
item: UserSignWithEmailBody,
):
result = await SignInHub().reset_password_through_email(item.email, item.host)
result = {"action": result}
return JSONResponse(content=jsonable_encoder(result))

View File

@ -1,40 +0,0 @@
from backend.application.signin_hub import SignInHub
from pydantic import BaseModel
from fastapi import APIRouter
from common.token.token_manager import TokenManager
from fastapi import APIRouter, Depends
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from fastapi import Depends, HTTPException
from starlette.status import HTTP_401_UNAUTHORIZED
router = APIRouter()
token_manager = TokenManager()
# Web API
# sign_out
#
class RequestIn(BaseModel):
identity: str
@router.post(
"/sign-out",
operation_id="sign-out",
summary="sign out a logged in user",
description="sign out a logged in user",
response_description="none",
)
async def sign_out(
item: RequestIn,
current_user: dict = Depends(token_manager.get_current_user),
):
user_id = current_user.get("id")
if not user_id:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED, detail="Could not validate credentials"
)
result = await SignInHub().sign_out(user_id)
return JSONResponse(content=jsonable_encoder(result))

View File

@ -1,91 +0,0 @@
import logging
from datetime import datetime, timezone
from typing import Optional
from fastapi import APIRouter
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from backend.application.signin_hub import SignInHub
from common.token.token_manager import TokenManager
router = APIRouter()
token_manager = TokenManager()
# Web API
# signin-with-email-n-code
#
class RequestIn(BaseModel):
email: str
code: str
host: str
time_zone: Optional[str] = "UTC"
class ResponseOut(BaseModel):
# 1: succeeded
signin_result: int
# the access token for futhur communication with server
access_token: Optional[str] = None
# the refresh token for new access token generation
refresh_token: Optional[str] = None
# the identity of the signed in user
identity: Optional[str] = None
# the date time when the access toke will be expired
expires_in: Optional[datetime] = None
# the system assigned role of the user.
role: Optional[int] = None
# preferred region for user
preferred_region: Optional[str] = None
@router.post(
"/signin-with-email-and-code",
operation_id="user-signin-with-email-and-code",
summary="try to signin with email and authentication code",
description="client user is trying to sign in with their email and the authenication code \
the system sent to the email in previous step.",
response_model=ResponseOut,
)
async def signin_with_email_and_code(item: RequestIn) -> ResponseOut:
(
signed_in,
adminstrative_role,
identity,
flid,
preferred_region,
) = await SignInHub().signin_with_email_and_code(
item.email, item.code, item.host, item.time_zone
)
logging.debug(
f"signedin={signed_in}, adminstrative_role={adminstrative_role}, identity={identity}"
)
if signed_in and identity and adminstrative_role:
subject = {"id": identity, "role": adminstrative_role}
access_token = token_manager.create_access_token(subject=subject)
refresh_token = token_manager.create_refresh_token(subject=subject)
expires_in = (
datetime.now(timezone.utc) + token_manager.access_token_expire_minutes
)
else:
access_token = None
refresh_token = None
expires_in = None
result = {
"signin_result": signed_in,
"access_token": access_token,
"refresh_token": refresh_token,
"identity": identity,
"expires_in": expires_in,
"role": adminstrative_role,
"flid": flid,
"preferred_region": preferred_region,
}
return JSONResponse(content=jsonable_encoder(result))

View File

@ -1,87 +0,0 @@
import logging
from datetime import datetime, timezone
from typing import Optional
from pydantic import BaseModel
from fastapi import APIRouter
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from fastapi import Depends, HTTPException
from starlette.status import HTTP_401_UNAUTHORIZED
from backend.application.signin_hub import SignInHub
from common.token.token_manager import TokenManager
router = APIRouter()
token_manager = TokenManager()
# Web API
# signin-with-email-n-password
#
class RequestIn(BaseModel):
email: str
password: str
class ResponseOut(BaseModel):
# 1: succeeded
signin_result: int
# the access token for futhur communication with server
access_token: Optional[str] = None
# the refresh token for new access token generation
refresh_token: Optional[str] = None
# the identity of the signed in user
identity: Optional[str] = None
# the date time when the access toke will be expired
expires_in: Optional[datetime] = None
# the system assigned role of the user.
role: Optional[int] = None
# the flid of the user
flid: Optional[str] = None
@router.post(
"/signin-with-email-and-password",
operation_id="user-signin-with-email-and-password",
summary="try to signin with email and password",
description="client user is trying to sign in with their email and the password .",
response_model=ResponseOut,
)
async def signin_with_email_and_password(
item: RequestIn,
) -> ResponseOut:
(
signed_in,
adminstrative_role,
identity,
flid,
) = await SignInHub().signin_with_email_and_password(item.email, item.password)
logging.debug(
f"signedin={signed_in}, adminstrative_role={adminstrative_role}, identity={identity}"
)
if signed_in and adminstrative_role and identity:
subject = {"id": identity, "role": adminstrative_role}
access_token = token_manager.create_access_token(subject=subject)
refresh_token = token_manager.create_refresh_token(subject=subject)
expires_in = (
datetime.now(timezone.utc) + token_manager.access_token_expire_minutes
)
else:
access_token = None
refresh_token = None
expires_in = None
result = {
"signin_result": signed_in,
"access_token": access_token,
"refresh_token": refresh_token,
"identity": identity,
"expires_in": expires_in,
"role": adminstrative_role,
"flid": flid,
}
return JSONResponse(content=jsonable_encoder(result))

View File

@ -1,38 +0,0 @@
from backend.application.signin_hub import SignInHub
from pydantic import BaseModel
from fastapi import APIRouter
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
router = APIRouter()
# Web API
# try_signin_with_email
#
class UserSignWithEmailBody(BaseModel):
email: str
host: str
class UserSignWithEmailResponse(BaseModel):
signin_type: int
@router.post(
"/try-signin-with-email",
operation_id="user-try-signin-with-email",
summary="try to signin with email",
description="A client user is trying to sign in with their email. \
The system will determine to send an authentication code to the email \
or let the uesr use their FLID and passward to sign in",
response_description="signin_type:0 meaning simplified(using email) signin, \
1 meaning standard(using FLID and passward) signin",
)
async def try_signin_with_email(
item: UserSignWithEmailBody,
):
result = await SignInHub().try_signin_with_email(item.email, item.host)
result = {"signin_type": result}
return JSONResponse(content=jsonable_encoder(result))

View File

@ -1,50 +0,0 @@
from backend.application.signin_hub import SignInHub
from pydantic import BaseModel
from fastapi import APIRouter
from common.token.token_manager import TokenManager
from fastapi import APIRouter, Depends
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from fastapi import Depends, HTTPException
from starlette.status import HTTP_401_UNAUTHORIZED
router = APIRouter()
token_manager = TokenManager()
# Web API
# update_user_flid
#
class RequestIn(BaseModel):
flid: str
@router.post(
"/update-new-user-flid",
operation_id="user_update_new_user_password",
summary="update the new user's freeleaps user id",
description="Freeleaps user ID(FLID) is a unique identifier to be used in git and other service across the platform",
response_description="signin result to indicate the next step for the client",
)
async def update_new_user_flid(
item: RequestIn,
current_user: dict = Depends(token_manager.get_current_user),
):
user_id = current_user.get("id")
if not user_id:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED, detail="Could not validate credentials"
)
(
signed_in,
flid,
) = await SignInHub().update_new_user_flid(user_id, item.flid)
result = {
"signin_result": signed_in,
"flid": flid,
}
return JSONResponse(content=jsonable_encoder(result))

View File

@ -1,49 +0,0 @@
from backend.application.signin_hub import SignInHub
from pydantic import BaseModel
from fastapi import APIRouter
from common.token.token_manager import TokenManager
from fastapi import APIRouter, Depends
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from fastapi import Depends, HTTPException
from starlette.status import HTTP_401_UNAUTHORIZED
router = APIRouter()
token_manager = TokenManager()
# Web API
# update_user_password
#
class RequestIn(BaseModel):
password: str
password2: str
@router.post(
"/update-user-password",
operation_id="user_update_user_password",
summary="updathe user's sign-in password",
description="Update the user's sign-in password. If the password was not set yet, this will enable the user to log in using the password",
response_description="signin_type:0 meaning simplified(using email) signin, \
1 meaning standard(using FLID and passward) signin",
)
async def update_user_password(
item: RequestIn,
current_user: dict = Depends(token_manager.get_current_user),
):
user_id = current_user.get("id")
if not user_id:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED, detail="Could not validate credentials"
)
if item.password != item.password2:
return JSONResponse(
content=jsonable_encoder(
{"error": "password and password2 are not the same"}
)
)
else:
result = await SignInHub().update_user_password(user_id, item.password)
return JSONResponse(content=jsonable_encoder(result))

View File

@ -1,12 +0,0 @@
from fastapi import APIRouter
from .generate_tokens import router as generate_tokens_router
from .refresh_token import router as refresh_token_router
from .verify_token import router as verify_token_router
router = APIRouter()
router.include_router(
generate_tokens_router, prefix="/token", tags=["Token Management"]
)
router.include_router(refresh_token_router, prefix="/token", tags=["Token Management"])
router.include_router(verify_token_router, prefix="/token", tags=["Token Management"])

View File

@ -1,35 +0,0 @@
from fastapi import APIRouter
from pydantic import BaseModel
from datetime import datetime, timedelta, timezone
from common.token.token_manager import TokenManager
router = APIRouter()
token_manager = TokenManager() # Initialize TokenManager
class TokenRequest(BaseModel):
id: str
role: int
class TokenResponse(BaseModel):
access_token: str
refresh_token: str
expires_in: datetime
@router.post("/generate-tokens", response_model=TokenResponse)
async def generate_tokens(request: TokenRequest):
"""
Endpoint to generate access and refresh tokens.
"""
subject = {"id": request.id, "role": request.role}
access_token = token_manager.create_access_token(subject)
refresh_token = token_manager.create_refresh_token(subject)
expires_in = datetime.now(timezone.utc) + timedelta(
minutes=token_manager.access_token_expire_minutes
)
return TokenResponse(
access_token=access_token, refresh_token=refresh_token, expires_in=expires_in
)

View File

@ -1,33 +0,0 @@
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from common.token.token_manager import TokenManager
router = APIRouter()
token_manager = TokenManager() # Initialize TokenManager
class RefreshTokenRequest(BaseModel):
refresh_token: str
id: str
role: int
class RefreshTokenResponse(BaseModel):
access_token: str
@router.post("/refresh-token", response_model=RefreshTokenResponse)
async def refresh_token(request: RefreshTokenRequest):
"""
Endpoint to refresh the access token using a valid refresh token.
"""
subject = {"id": request.id, "role": request.role}
try:
access_token = token_manager.refresh_access_token(
request.refresh_token, subject
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
return RefreshTokenResponse(access_token=access_token)

View File

@ -1,27 +0,0 @@
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from common.token.token_manager import TokenManager
router = APIRouter()
token_manager = TokenManager() # Initialize TokenManager
class VerifyTokenRequest(BaseModel):
token: str
class VerifyTokenResponse(BaseModel):
valid: bool
payload: dict
@router.post("/verify-token", response_model=VerifyTokenResponse)
async def verify_token(request: VerifyTokenRequest):
"""
Endpoint to verify if a token is valid and return the payload.
"""
try:
payload = token_manager.decode_token(request.token)
return VerifyTokenResponse(valid=True, payload=payload)
except ValueError:
raise HTTPException(status_code=401, detail="Invalid or expired token")