193 lines
5.1 KiB
Python
193 lines
5.1 KiB
Python
"""
JWT Token Utilities for Authentication
"""
|
|
import jwt
|
|
from datetime import datetime, timedelta
|
|
from django.conf import settings
|
|
from django.utils import timezone
|
|
|
|
|
|
def get_jwt_secret_key():
    """Return the key used to sign and verify JWTs.

    Uses ``settings.JWT_SECRET_KEY`` when it is defined, otherwise falls
    back to Django's ``settings.SECRET_KEY``.
    """
    # SECRET_KEY is read unconditionally, exactly as an eagerly evaluated
    # getattr() default would be.
    fallback_key = settings.SECRET_KEY
    return getattr(settings, 'JWT_SECRET_KEY', fallback_key)
|
|
|
|
|
|
def get_jwt_algorithm():
    """Return the JWT signing algorithm name.

    Reads ``settings.JWT_ALGORITHM`` and defaults to ``'HS256'`` when the
    setting is not defined.
    """
    default_algorithm = 'HS256'
    return getattr(settings, 'JWT_ALGORITHM', default_algorithm)
|
|
|
|
|
|
def get_access_token_expiry():
    """Return the lifetime of an access token.

    Reads ``settings.JWT_ACCESS_TOKEN_EXPIRY`` and defaults to a
    15-minute ``timedelta`` when the setting is not defined.
    """
    default_lifetime = timedelta(minutes=15)
    return getattr(settings, 'JWT_ACCESS_TOKEN_EXPIRY', default_lifetime)
|
|
|
|
|
|
def get_refresh_token_expiry():
    """Return the lifetime of a refresh token.

    Reads ``settings.JWT_REFRESH_TOKEN_EXPIRY`` and defaults to a
    7-day ``timedelta`` when the setting is not defined.
    """
    default_lifetime = timedelta(days=7)
    return getattr(settings, 'JWT_REFRESH_TOKEN_EXPIRY', default_lifetime)
|
|
|
|
|
|
def generate_access_token(user, account=None):
    """
    Build a signed JWT access token for ``user``.

    Args:
        user: User instance; its ``id`` and ``email`` are embedded in the token.
        account: Account instance (optional). When omitted, ``user.account``
            is used if the user has one.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    # Fall back to the user's own account when none was passed in.
    if account is None:
        account = getattr(user, 'account', None)

    issued_at = timezone.now()
    expires_at = issued_at + get_access_token_expiry()

    # Claim order is kept stable so the serialized token does not change.
    claims = {
        'user_id': user.id,
        'account_id': None if not account else account.id,
        'email': user.email,
        'exp': int(expires_at.timestamp()),
        'iat': int(issued_at.timestamp()),
        'type': 'access',
    }

    signing_key = get_jwt_secret_key()
    return jwt.encode(claims, signing_key, algorithm=get_jwt_algorithm())
|
|
|
|
|
|
def generate_refresh_token(user, account=None):
    """
    Build a signed JWT refresh token for ``user``.

    Args:
        user: User instance; its ``id`` is embedded in the token.
        account: Account instance (optional). When omitted, ``user.account``
            is used if the user has one.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    # Fall back to the user's own account when none was passed in.
    if account is None:
        account = getattr(user, 'account', None)

    issued_at = timezone.now()
    expires_at = issued_at + get_refresh_token_expiry()

    # Unlike the access token, no email claim is included here.
    claims = {
        'user_id': user.id,
        'account_id': None if not account else account.id,
        'exp': int(expires_at.timestamp()),
        'iat': int(issued_at.timestamp()),
        'type': 'refresh',
    }

    signing_key = get_jwt_secret_key()
    return jwt.encode(claims, signing_key, algorithm=get_jwt_algorithm())
|
|
|
|
|
|
def decode_token(token):
    """
    Decode and validate a JWT token.

    The signature and the ``exp`` claim are both verified, using the key
    from ``get_jwt_secret_key()`` and the single algorithm returned by
    ``get_jwt_algorithm()``.

    Args:
        token: JWT token string

    Returns:
        dict: Decoded token payload

    Raises:
        jwt.InvalidTokenError: If the token is invalid or expired. An expired
            token is reported with the message "Token has expired" and the
            original ``jwt.ExpiredSignatureError`` attached as ``__cause__``.
    """
    try:
        return jwt.decode(
            token,
            get_jwt_secret_key(),
            algorithms=[get_jwt_algorithm()],
            options={"verify_signature": True, "verify_exp": True},
        )
    except jwt.ExpiredSignatureError as exc:
        # Keep the uniform message callers already see, but chain the
        # original error explicitly so the real cause stays in the traceback.
        raise jwt.InvalidTokenError("Token has expired") from exc
    # Every other jwt.InvalidTokenError propagates unchanged; a handler that
    # only re-raises it adds nothing.
|
|
|
|
|
|
def get_token_expiry(token_type='access'):
    """
    Compute the expiry datetime for a token issued right now.

    Args:
        token_type: 'access' or 'refresh'. Any value other than 'refresh'
            is treated as 'access'.

    Returns:
        datetime: ``timezone.now()`` plus the matching token lifetime.
    """
    issued_at = timezone.now()
    lifetime = (
        get_refresh_token_expiry()
        if token_type == 'refresh'
        else get_access_token_expiry()
    )
    return issued_at + lifetime
|
|
|
|
|
|
|
|
|
|
def validate_account_and_plan(user_or_account):
    """
    Check that an account exists, is not blocked, and has an active plan.

    Only the 'suspended' and 'cancelled' account statuses are rejected;
    any other status passes the status check.

    Args:
        user_or_account: User or Account instance

    Returns:
        tuple: (is_valid: bool, error_msg: str or None, http_status: int or None)
    """
    # Imported inside the function, as in the original (presumably to avoid
    # a circular import at module load time -- confirm before moving).
    from rest_framework import status
    from .models import User, Account

    # Reject anything that is neither a User nor an Account up front.
    if not isinstance(user_or_account, (User, Account)):
        return (False, 'Invalid object type', status.HTTP_400_BAD_REQUEST)

    if isinstance(user_or_account, User):
        # Best-effort lookup: any failure while resolving the relation is
        # treated the same as "no account".
        try:
            account = getattr(user_or_account, 'account', None)
        except Exception:
            account = None
    else:
        account = user_or_account

    if not account:
        message = 'Account not configured for this user. Please contact support.'
        return (False, message, status.HTTP_403_FORBIDDEN)

    # Trial, active and pending_payment accounts pass; only these are blocked.
    blocked_statuses = ['suspended', 'cancelled']
    if hasattr(account, 'status') and account.status in blocked_statuses:
        message = f'Account is {account.status}. Please contact support.'
        return (False, message, status.HTTP_403_FORBIDDEN)

    plan = getattr(account, 'plan', None)
    if not plan:
        message = 'No subscription plan assigned. Visit igny8.com/pricing to subscribe.'
        return (False, message, status.HTTP_402_PAYMENT_REQUIRED)

    # A plan without an ``is_active`` attribute is accepted as-is.
    if hasattr(plan, 'is_active') and not plan.is_active:
        message = 'Active subscription required. Visit igny8.com/pricing to subscribe.'
        return (False, message, status.HTTP_402_PAYMENT_REQUIRED)

    return (True, None, None)
|