freeleaps-service-hub/apps/notification/common/token/token_manager.py
2024-10-30 20:49:50 -07:00

81 lines
2.8 KiB
Python

from datetime import datetime, timedelta, timezone
from typing import Dict
from jose import jwt, JWTError
from common.config.app_settings import app_settings
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from starlette.status import HTTP_401_UNAUTHORIZED
class TokenManager:
def __init__(self):
self.secret_key = app_settings.JWT_SECRET_KEY
self.algorithm = "HS256"
self.access_token_expire_minutes = app_settings.ACCESS_TOKEN_EXPIRE_MINUTES
self.refresh_token_expire_days = app_settings.REFRESH_TOKEN_EXPIRE_DAYS
def create_access_token(self, subject: Dict[str, str]) -> str:
"""
Generates an access token with a short expiration time.
"""
expire = datetime.now(timezone.utc) + timedelta(
minutes=self.access_token_expire_minutes
)
to_encode = subject.copy()
to_encode.update({"exp": expire})
return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
def create_refresh_token(self, subject: Dict[str, str]) -> str:
"""
Generates a refresh token with a longer expiration time.
"""
expire = datetime.now(timezone.utc) + timedelta(
days=self.refresh_token_expire_days
)
to_encode = subject.copy()
to_encode.update({"exp": expire})
return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
def decode_token(self, token: str) -> Dict:
"""
Decodes a JWT token and returns the payload.
"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
return payload
except JWTError:
raise ValueError("Invalid token")
def verify_refresh_token(self, token: str) -> bool:
"""
Verifies a refresh token to ensure it is valid and not expired.
"""
try:
payload = self.decode_token(token)
return True
except ValueError:
return False
def refresh_access_token(self, refresh_token: str, subject: Dict[str, str]) -> str:
"""
Verifies the refresh token and creates a new access token.
"""
if self.verify_refresh_token(refresh_token):
return self.create_access_token(subject)
else:
raise ValueError("Invalid refresh token")
async def get_current_user(
self, token: str = Depends(OAuth2PasswordBearer(tokenUrl="token"))
) -> Dict:
"""
Extract and validate user information from the JWT token.
"""
try:
payload = self.decode_token(token) # Decode JWT token
return payload
except ValueError:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED, detail="Invalid or expired token"
)