# application/auth/token/token_manager.py
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional

from jose import JWTError, jwt

from infra.config.app_settings import app_settings
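class TokenManager:
    """Issue and verify HS256-signed JWT access and refresh tokens.

    Access and refresh tokens are signed with the same key and algorithm, so
    the signature alone cannot tell them apart. Every issued token therefore
    carries a ``token_type`` claim, and ``decode_token`` can be asked to
    enforce it so that a long-lived refresh token is not accepted where a
    short-lived access token is required (or the reverse).
    """

    TOKEN_TYPE_CLAIM = "token_type"
    ACCESS_TOKEN_TYPE = "access"
    REFRESH_TOKEN_TYPE = "refresh"

    def __init__(self):
        # HS256 is a symmetric MAC: anyone holding this key can mint tokens,
        # so the configured secret must be long, random and kept server-side.
        self.secret_key = app_settings.JWT_SECRET_KEY
        self.algorithm = "HS256"
        self.access_token_expire_minutes = app_settings.ACCESS_TOKEN_EXPIRE_MINUTES
        self.refresh_token_expire_days = app_settings.REFRESH_TOKEN_EXPIRE_DAYS

    def _encode(self, subject: Dict[str, str], lifetime: timedelta, token_type: str) -> str:
        """Sign a copy of *subject* with an expiry and a token-type claim."""
        claims = subject.copy()
        # Set after copying so caller-supplied keys cannot override either claim.
        claims["exp"] = datetime.now(timezone.utc) + lifetime
        claims[self.TOKEN_TYPE_CLAIM] = token_type
        return jwt.encode(claims, self.secret_key, algorithm=self.algorithm)

    def create_access_token(self, subject: Dict[str, str]) -> str:
        """
        Generates an access token with a short expiration time.

        Args:
            subject (Dict[str, str]): A dictionary containing user information like 'id' and 'role'.

        Returns:
            str: Encoded JWT access token, tagged with ``token_type="access"``.
        """
        return self._encode(
            subject,
            timedelta(minutes=self.access_token_expire_minutes),
            self.ACCESS_TOKEN_TYPE,
        )

    def create_refresh_token(self, subject: Dict[str, str]) -> str:
        """
        Generates a refresh token with a longer expiration time.

        Args:
            subject (Dict[str, str]): A dictionary containing user information like 'id' and 'role'.

        Returns:
            str: Encoded JWT refresh token, tagged with ``token_type="refresh"``.
        """
        return self._encode(
            subject,
            timedelta(days=self.refresh_token_expire_days),
            self.REFRESH_TOKEN_TYPE,
        )

    def decode_token(self, token: str, *, expected_type: Optional[str] = None) -> Dict:
        """
        Decodes a JWT token and returns the payload.

        Args:
            token (str): Encoded JWT token.
            expected_type (Optional[str]): When given (``ACCESS_TOKEN_TYPE`` or
                ``REFRESH_TOKEN_TYPE``), the payload's ``token_type`` claim must
                match it. Tokens without the claim are rejected in that case.
                The default ``None`` skips the check, as before.

        Returns:
            Dict: Decoded token payload.

        Raises:
            JWTError: If the token is invalid or expired, or if its type does
                not match ``expected_type``.
        """
        # The accepted algorithm list is pinned to the configured one, so the
        # token's own header cannot choose how it gets verified.
        payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        if expected_type is not None and payload.get(self.TOKEN_TYPE_CLAIM) != expected_type:
            raise JWTError("Unexpected token type")
        return payload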