Files
igny8/backend/igny8_core/auth/utils.py
IGNY8 VPS (Salman) 4fb3a144d7 messy logout fixing
2025-12-15 12:01:41 +00:00

231 lines
6.8 KiB
Python

"""
JWT Token Utilities for Authentication
"""
import jwt
from datetime import datetime, timedelta
from django.conf import settings
from django.utils import timezone
def get_jwt_secret_key():
    """
    Return the key used to sign and verify JWTs.

    Reads ``settings.JWT_SECRET_KEY``; when that setting is not defined,
    Django's ``settings.SECRET_KEY`` is returned instead.
    """
    fallback_key = settings.SECRET_KEY
    return getattr(settings, 'JWT_SECRET_KEY', fallback_key)
def get_jwt_algorithm():
    """
    Return the JWT signing algorithm name.

    Reads ``settings.JWT_ALGORITHM`` and defaults to ``'HS256'`` when the
    setting is not defined.
    """
    default_algorithm = 'HS256'
    return getattr(settings, 'JWT_ALGORITHM', default_algorithm)
def get_access_token_expiry():
    """
    Return the lifetime of an access token.

    Reads ``settings.JWT_ACCESS_TOKEN_EXPIRY`` and defaults to a 15-minute
    ``timedelta`` when the setting is not defined.
    """
    default_lifetime = timedelta(minutes=15)
    return getattr(settings, 'JWT_ACCESS_TOKEN_EXPIRY', default_lifetime)
def get_refresh_token_expiry():
    """
    Return the lifetime of a refresh token.

    Reads ``settings.JWT_REFRESH_TOKEN_EXPIRY`` and defaults to a 7-day
    ``timedelta`` when the setting is not defined.
    """
    default_lifetime = timedelta(days=7)
    return getattr(settings, 'JWT_REFRESH_TOKEN_EXPIRY', default_lifetime)
def generate_access_token(user, account=None):
    """
    Build a signed JWT access token for ``user``.

    Args:
        user: User instance; its ``id`` and ``email`` are embedded as claims.
        account: Account instance (optional). When omitted, ``user.account``
            is used if the user has one; otherwise ``account_id`` is None.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    if account is None:
        account = getattr(user, 'account', None)

    issued_at = timezone.now()
    expires_at = issued_at + get_access_token_expiry()

    # Claim order is kept stable so the serialized payload does not change.
    claims = {
        'user_id': user.id,
        'account_id': account.id if account else None,
        'email': user.email,
        'exp': int(expires_at.timestamp()),
        'iat': int(issued_at.timestamp()),
        'type': 'access',
    }

    return jwt.encode(
        claims,
        get_jwt_secret_key(),
        algorithm=get_jwt_algorithm(),
    )
def generate_refresh_token_pair(user, account=None, remember_me=False, device_id='', user_agent='', ip_address=None):
    """
    Create a server-side refresh token record and a JWT that references it.

    The record is created through ``RefreshToken.create_token`` so the token
    can later be rotated or revoked; the JWT carries the record's
    ``token_id`` and expires at the record's ``expires_at``.

    Args:
        user: User instance
        account: Account instance (optional, falls back to ``user.account``).
            Only used for the ``account_id`` claim; it is not passed to
            ``RefreshToken.create_token``.
        remember_me: Forwarded to ``RefreshToken.create_token``.
            NOTE(review): previously documented as 20 days when True,
            otherwise 7 days - confirm against the model.
        device_id: Client device identifier
        user_agent: Browser user agent
        ip_address: Client IP address

    Returns:
        tuple: (refresh_token_string, refresh_token_id, expiry_datetime)
    """
    # Imported lazily, inside the function rather than at module level.
    from .models_refresh_token import RefreshToken

    if account is None:
        account = getattr(user, 'account', None)

    # Persist the refresh token first; its id and expiry drive the JWT claims.
    record = RefreshToken.create_token(
        user=user,
        remember_me=remember_me,
        device_id=device_id,
        user_agent=user_agent,
        ip_address=ip_address,
    )

    issued_at = timezone.now()
    claims = {
        'user_id': user.id,
        'account_id': account.id if account else None,
        'token_id': record.token_id,
        'exp': int(record.expires_at.timestamp()),
        'iat': int(issued_at.timestamp()),
        'type': 'refresh',
    }

    encoded = jwt.encode(
        claims,
        get_jwt_secret_key(),
        algorithm=get_jwt_algorithm(),
    )
    return encoded, record.token_id, record.expires_at
def decode_token(token):
    """
    Decode and validate a JWT token.

    The signature and the ``exp`` claim are both verified, using the
    configured secret key and algorithm.

    Args:
        token: JWT token string

    Returns:
        dict: Decoded token payload

    Raises:
        jwt.InvalidTokenError: If the token is invalid or expired. An expired
            token is reported with the message "Token has expired", and the
            original ``jwt.ExpiredSignatureError`` is attached as
            ``__cause__``.
    """
    try:
        return jwt.decode(
            token,
            get_jwt_secret_key(),
            algorithms=[get_jwt_algorithm()],
            options={"verify_signature": True, "verify_exp": True},
        )
    except jwt.ExpiredSignatureError as exc:
        # Normalize expiry to the generic error type callers already handle,
        # but chain the original so the root cause stays in the traceback.
        raise jwt.InvalidTokenError("Token has expired") from exc
    # Every other jwt.InvalidTokenError propagates unchanged.
def get_token_expiry(token_type='access'):
    """
    Compute the expiry datetime for a token issued right now.

    Args:
        token_type: 'refresh' selects the refresh-token lifetime; any other
            value (including the default 'access') selects the access-token
            lifetime.

    Returns:
        datetime: Current time plus the selected lifetime
    """
    issued_at = timezone.now()
    if token_type == 'refresh':
        lifetime = get_refresh_token_expiry()
    else:
        lifetime = get_access_token_expiry()
    return issued_at + lifetime
def validate_account_and_plan(user_or_account):
    """
    Validate that an account exists, is not blocked, and has an active plan.

    Only the 'suspended' and 'cancelled' account statuses are rejected; any
    other status (e.g. trial, active, pending_payment) passes the status
    check. Validation is bypassed for superusers, users whose role is
    'developer', system-account users, and system accounts.

    Errors raised by the system-account checks or by the account lookup are
    logged and treated as "no bypass" / "no account", so a failing helper
    never grants access and never crashes the request.

    Args:
        user_or_account: User or Account instance

    Returns:
        tuple: (is_valid: bool, error_msg: str or None, http_status: int or None)
    """
    import logging

    from rest_framework import status
    from .models import User, Account

    logger = logging.getLogger(__name__)

    # NOTE(review): the developer and system-account-user bypasses are assumed
    # to apply only to User instances - confirm against the original layout.
    if isinstance(user_or_account, User):
        # Bypass validation for superusers
        if getattr(user_or_account, 'is_superuser', False):
            return (True, None, None)

        # Bypass validation for developers
        if hasattr(user_or_account, 'role') and user_or_account.role == 'developer':
            return (True, None, None)

        # Bypass validation for system account users
        try:
            if hasattr(user_or_account, 'is_system_account_user') and user_or_account.is_system_account_user():
                return (True, None, None)
        except Exception:
            # Best-effort check: fall through to normal validation.
            logger.warning(
                "System-account user check failed; continuing with normal validation",
                exc_info=True,
            )

    # Extract account from user or use directly
    if isinstance(user_or_account, User):
        try:
            account = getattr(user_or_account, 'account', None)
        except Exception:
            logger.warning(
                "Could not load account for user; treating as missing",
                exc_info=True,
            )
            account = None
    elif isinstance(user_or_account, Account):
        account = user_or_account
        # Check if account is a system account
        try:
            if hasattr(account, 'is_system_account') and account.is_system_account():
                return (True, None, None)
        except Exception:
            # Best-effort check: fall through to normal validation.
            logger.warning(
                "System-account check failed; continuing with normal validation",
                exc_info=True,
            )
    else:
        return (False, 'Invalid object type', status.HTTP_400_BAD_REQUEST)

    # Check account exists
    if not account:
        return (
            False,
            'Account not configured for this user. Please contact support.',
            status.HTTP_403_FORBIDDEN,
        )

    # Check account status - block only suspended and cancelled
    if hasattr(account, 'status') and account.status in ('suspended', 'cancelled'):
        return (
            False,
            f'Account is {account.status}. Please contact support.',
            status.HTTP_403_FORBIDDEN,
        )

    # Check plan exists and is active
    plan = getattr(account, 'plan', None)
    if not plan:
        return (
            False,
            'No subscription plan assigned. Visit igny8.com/pricing to subscribe.',
            status.HTTP_402_PAYMENT_REQUIRED,
        )

    if hasattr(plan, 'is_active') and not plan.is_active:
        return (
            False,
            'Active subscription required. Visit igny8.com/pricing to subscribe.',
            status.HTTP_402_PAYMENT_REQUIRED,
        )

    return (True, None, None)