131 lines
3.2 KiB
Python
131 lines
3.2 KiB
Python
"""
|
|
JWT Token Utilities for Authentication
|
|
"""
|
|
import jwt
|
|
from datetime import datetime, timedelta
|
|
from django.conf import settings
|
|
from django.utils import timezone
|
|
|
|
|
|
def get_jwt_secret_key():
    """
    Return the key used to sign and verify JWTs.

    Returns:
        ``settings.JWT_SECRET_KEY`` when that setting is defined, otherwise
        Django's ``settings.SECRET_KEY``.
    """
    # The fallback is read up front, exactly as a getattr() default would be.
    django_secret = settings.SECRET_KEY
    return getattr(settings, 'JWT_SECRET_KEY', django_secret)
|
|
|
|
|
|
def get_jwt_algorithm():
    """
    Return the JWT signing algorithm name.

    Returns:
        ``settings.JWT_ALGORITHM`` when that setting is defined, otherwise
        the string 'HS256'.
    """
    fallback_algorithm = 'HS256'
    return getattr(settings, 'JWT_ALGORITHM', fallback_algorithm)
|
|
|
|
|
|
def get_access_token_expiry():
    """
    Return how long an access token stays valid.

    Returns:
        ``settings.JWT_ACCESS_TOKEN_EXPIRY`` when that setting is defined,
        otherwise a timedelta of 15 minutes.
    """
    fallback_lifetime = timedelta(minutes=15)
    return getattr(settings, 'JWT_ACCESS_TOKEN_EXPIRY', fallback_lifetime)
|
|
|
|
|
|
def get_refresh_token_expiry():
    """
    Return how long a refresh token stays valid.

    Returns:
        ``settings.JWT_REFRESH_TOKEN_EXPIRY`` when that setting is defined,
        otherwise a timedelta of 7 days.
    """
    fallback_lifetime = timedelta(days=7)
    return getattr(settings, 'JWT_REFRESH_TOKEN_EXPIRY', fallback_lifetime)
|
|
|
|
|
|
def generate_access_token(user, account=None):
    """
    Build a signed JWT access token for a user.

    The payload carries ``user_id``, ``account_id``, ``email``, ``exp``,
    ``iat`` and ``type`` (always 'access'). ``exp`` and ``iat`` are whole
    seconds taken from ``timezone.now()`` and the configured access-token
    lifetime.

    Args:
        user: User instance; its ``id`` and ``email`` are read.
        account: Account instance (optional). When None, ``user.account``
            is used if the user has that attribute.

    Returns:
        The value returned by ``jwt.encode`` for the payload.
    """
    linked_account = getattr(user, 'account', None) if account is None else account

    issued_at = timezone.now()
    expires_at = issued_at + get_access_token_expiry()

    # Claim order is kept stable because it determines the serialized token.
    claims = {
        'user_id': user.id,
        'account_id': None if not linked_account else linked_account.id,
        'email': user.email,
        'exp': int(expires_at.timestamp()),
        'iat': int(issued_at.timestamp()),
        'type': 'access',
    }

    return jwt.encode(claims, get_jwt_secret_key(), algorithm=get_jwt_algorithm())
|
|
|
|
|
|
def generate_refresh_token(user, account=None):
    """
    Build a signed JWT refresh token for a user.

    The payload carries ``user_id``, ``account_id``, ``exp``, ``iat`` and
    ``type`` (always 'refresh'). ``exp`` and ``iat`` are whole seconds taken
    from ``timezone.now()`` and the configured refresh-token lifetime.

    Args:
        user: User instance; its ``id`` is read.
        account: Account instance (optional). When None, ``user.account``
            is used if the user has that attribute.

    Returns:
        The value returned by ``jwt.encode`` for the payload.
    """
    linked_account = getattr(user, 'account', None) if account is None else account

    issued_at = timezone.now()
    expires_at = issued_at + get_refresh_token_expiry()

    # Claim order is kept stable because it determines the serialized token.
    claims = {
        'user_id': user.id,
        'account_id': None if not linked_account else linked_account.id,
        'exp': int(expires_at.timestamp()),
        'iat': int(issued_at.timestamp()),
        'type': 'refresh',
    }

    return jwt.encode(claims, get_jwt_secret_key(), algorithm=get_jwt_algorithm())
|
|
|
|
|
|
def decode_token(token, expected_type=None):
    """
    Decode and validate JWT token

    The signature and the ``exp`` claim are verified with the configured
    secret key and algorithm.

    Args:
        token: JWT token string
        expected_type: Optional value the payload's ``type`` claim must
            equal (for example 'access' or 'refresh'). When None (the
            default) the claim is not inspected, so existing callers are
            unaffected.

    Returns:
        dict: Decoded token payload

    Raises:
        jwt.InvalidTokenError: If token is invalid or expired, or if
            ``expected_type`` is given and the payload's ``type`` claim
            does not match it
    """
    try:
        payload = jwt.decode(
            token,
            get_jwt_secret_key(),
            algorithms=[get_jwt_algorithm()],
            options={"verify_signature": True, "verify_exp": True},
        )
    except jwt.ExpiredSignatureError as exc:
        # Expiry is reported as the generic error type with a fixed message;
        # chaining keeps the original exception visible as the cause.
        raise jwt.InvalidTokenError("Token has expired") from exc

    # Any other jwt.InvalidTokenError raised by jwt.decode propagates as-is.

    # Opt-in guard so a token issued for one purpose is not accepted for
    # another (e.g. a refresh token presented where an access token is due).
    if expected_type is not None and payload.get('type') != expected_type:
        raise jwt.InvalidTokenError("Invalid token type")

    return payload
|
|
|
|
|
|
def get_token_expiry(token_type='access'):
    """
    Compute when a token issued right now would expire.

    Args:
        token_type: 'refresh' selects the refresh-token lifetime; any other
            value selects the access-token lifetime.

    Returns:
        datetime: ``timezone.now()`` plus the selected lifetime
    """
    issued_at = timezone.now()
    if token_type == 'refresh':
        lifetime = get_refresh_token_expiry()
    else:
        lifetime = get_access_token_expiry()
    return issued_at + lifetime
|
|
|