""" JWT Token Utilities for Authentication """ import jwt from datetime import datetime, timedelta from django.conf import settings from django.utils import timezone def get_jwt_secret_key(): """Get JWT secret key from settings or fallback to Django SECRET_KEY""" return getattr(settings, 'JWT_SECRET_KEY', settings.SECRET_KEY) def get_jwt_algorithm(): """Get JWT algorithm from settings""" return getattr(settings, 'JWT_ALGORITHM', 'HS256') def get_access_token_expiry(remember_me=False): """Get access token expiry time from settings""" if remember_me: return getattr(settings, 'JWT_ACCESS_TOKEN_EXPIRY_REMEMBER_ME', timedelta(days=20)) return getattr(settings, 'JWT_ACCESS_TOKEN_EXPIRY', timedelta(hours=1)) def get_refresh_token_expiry(): """Get refresh token expiry time from settings""" return getattr(settings, 'JWT_REFRESH_TOKEN_EXPIRY', timedelta(days=30)) def generate_access_token(user, account=None, remember_me=False): """ Generate JWT access token for user Args: user: User instance account: Account instance (optional, will use user.account if not provided) remember_me: bool - If True, use extended expiry (20 days) Returns: str: JWT access token """ if account is None: account = getattr(user, 'account', None) now = timezone.now() expiry = now + get_access_token_expiry(remember_me=remember_me) payload = { 'user_id': user.id, 'account_id': account.id if account else None, 'email': user.email, 'exp': int(expiry.timestamp()), 'iat': int(now.timestamp()), 'type': 'access', 'remember_me': remember_me, } token = jwt.encode(payload, get_jwt_secret_key(), algorithm=get_jwt_algorithm()) return token def generate_refresh_token(user, account=None): """ Generate JWT refresh token for user Args: user: User instance account: Account instance (optional, will use user.account if not provided) Returns: str: JWT refresh token """ if account is None: account = getattr(user, 'account', None) now = timezone.now() expiry = now + get_refresh_token_expiry() payload = { 'user_id': user.id, 'account_id': account.id if account else None, 'exp': int(expiry.timestamp()), 'iat': int(now.timestamp()), 'type': 'refresh', } token = jwt.encode(payload, get_jwt_secret_key(), algorithm=get_jwt_algorithm()) return token def decode_token(token): """ Decode and validate JWT token Args: token: JWT token string Returns: dict: Decoded token payload Raises: jwt.InvalidTokenError: If token is invalid or expired """ try: payload = jwt.decode( token, get_jwt_secret_key(), algorithms=[get_jwt_algorithm()], options={"verify_signature": True, "verify_exp": True} ) return payload except jwt.ExpiredSignatureError: raise jwt.InvalidTokenError("Token has expired") except jwt.InvalidTokenError: raise def get_token_expiry(token_type='access'): """ Get token expiry datetime Args: token_type: 'access' or 'refresh' Returns: datetime: Expiry datetime """ now = timezone.now() if token_type == 'refresh': return now + get_refresh_token_expiry() return now + get_access_token_expiry() def validate_account_and_plan(user_or_account): """ Validate account exists and has active plan. Allows trial, active, and pending_payment statuses. Bypasses validation for superusers, developers, and system accounts. Args: user_or_account: User or Account instance Returns: tuple: (is_valid: bool, error_msg: str or None, http_status: int or None) """ from rest_framework import status from .models import User, Account # Extract account from user or use directly if isinstance(user_or_account, User): try: account = getattr(user_or_account, 'account', None) except Exception: account = None elif isinstance(user_or_account, Account): account = user_or_account # Check if account is a system account try: if hasattr(account, 'is_system_account') and account.is_system_account(): return (True, None, None) except Exception: pass else: return (False, 'Invalid object type', status.HTTP_400_BAD_REQUEST) # Check account exists if not account: return ( False, 'Account not configured for this user. Please contact support.', status.HTTP_403_FORBIDDEN ) # Check account status - allow trial, active, pending_payment # Block only suspended and cancelled if hasattr(account, 'status') and account.status in ['suspended', 'cancelled']: return ( False, f'Account is {account.status}. Please contact support.', status.HTTP_403_FORBIDDEN ) # Check plan exists and is active plan = getattr(account, 'plan', None) if not plan: return ( False, 'No subscription plan assigned. Visit igny8.com/pricing to subscribe.', status.HTTP_402_PAYMENT_REQUIRED ) if hasattr(plan, 'is_active') and not plan.is_active: return ( False, 'Active subscription required. Visit igny8.com/pricing to subscribe.', status.HTTP_402_PAYMENT_REQUIRED ) return (True, None, None)