Files
igny8/backend/igny8_core/api/authentication.py
2025-11-21 20:59:50 +05:00

143 lines
5.1 KiB
Python

"""
Custom authentication classes for DRF
"""
from rest_framework.authentication import SessionAuthentication, BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
import jwt
class CSRFExemptSessionAuthentication(SessionAuthentication):
    """
    Session-based authentication with the CSRF check turned off.

    Behaves like DRF's ``SessionAuthentication`` except that
    ``enforce_csrf`` is replaced by a no-op, so session-authenticated API
    requests are accepted without a CSRF token.

    NOTE(review): the original rationale was that session cookies combined
    with proper CORS settings make this safe for API usage. That depends on
    deployment configuration not visible in this file - confirm.
    """

    def enforce_csrf(self, request):
        """
        Do nothing, so that no CSRF validation happens for API endpoints.

        The parent implementation is intentionally not called.
        """
        # Intentionally empty: returning without raising means "CSRF OK".
        return None
class JWTAuthentication(BaseAuthentication):
    """
    JWT token authentication for DRF.

    Extracts a ``Bearer`` token from the ``Authorization`` header, decodes it
    with ``igny8_core.auth.utils.decode_token`` and resolves the user (and,
    when present, the account) referenced by the token payload.
    """

    def authenticate(self, request):
        """
        Authenticate the request and return a two-tuple of (user, token).

        Returns ``None`` for every failure case (no bearer header, empty
        token, undecodable/expired token, wrong token type, missing or
        unknown user, unexpected error) so that the remaining authentication
        classes, e.g. session authentication, still get a chance to run.

        Side effect: on success ``request.account`` is set to the account
        named by the token's ``account_id``, or to ``None`` when the token
        carries no ``account_id`` or that account does not exist.
        """
        auth_header = request.META.get('HTTP_AUTHORIZATION', '')
        if not auth_header.startswith('Bearer '):
            return None  # No JWT token, let other auth classes handle it

        # The 'Bearer ' prefix guarantees at least two space-separated parts,
        # so index 1 always exists; it is '' when nothing follows the prefix.
        token = auth_header.split(' ')[1]
        if not token:
            return None

        try:
            # Function-local imports, kept as in the original code.
            # NOTE(review): presumably avoids import cycles / app-loading
            # order problems - confirm before moving to module level.
            from igny8_core.auth.utils import decode_token
            from igny8_core.auth.models import User, Account

            # Decode and validate token
            payload = decode_token(token)

            # Only access tokens may authenticate a request.
            if payload.get('type') != 'access':
                return None

            user_id = payload.get('user_id')
            if not user_id:
                return None

            try:
                user = User.objects.get(id=user_id)
            except User.DoesNotExist:
                return None

            # NOTE(review): unlike APIKeyAuthentication, user.is_active is
            # not checked here - confirm whether inactive users should be
            # able to authenticate with a still-valid token.

            # Resolve the account named in the token. There is deliberately
            # no fallback: an unknown account_id leaves the account as None.
            account = None
            account_id = payload.get('account_id')
            if account_id:
                try:
                    account = Account.objects.get(id=account_id)
                except Account.DoesNotExist:
                    account = None

            request.account = account
            return (user, token)
        except jwt.InvalidTokenError:
            # Invalid or expired token - allow other auth classes to try
            return None
        except Exception as exc:
            # Unexpected failure: still fall through to the other auth
            # classes, but leave a trace instead of swallowing it silently
            # (same approach as APIKeyAuthentication).
            import logging
            logging.getLogger(__name__).debug('JWTAuthentication error: %s', exc)
            return None
class APIKeyAuthentication(BaseAuthentication):
    """
    API Key authentication for WordPress integration.

    Validates API keys stored in the ``Site.wp_api_key`` field and
    authenticates the request as the owner of the site's account.
    """

    def authenticate(self, request):
        """
        Authenticate using WordPress API key.

        Returns a ``(user, api_key)`` tuple if the key matches an active
        site. Returns ``None`` when the request does not look like an API
        key request (no bearer header, key shorter than 20 characters, or a
        value starting with ``'ey'`` which is treated as a JWT), when no
        active site has that key, or when an unexpected error occurs.

        Raises:
            AuthenticationFailed: the key is valid but the account owner is
                not active.

        Side effects: on success sets ``request.account`` (tenant isolation)
        and ``request.site`` (WordPress integration context).
        """
        auth_header = request.META.get('HTTP_AUTHORIZATION', '')
        if not auth_header.startswith('Bearer '):
            return None  # Not an API key request

        # The 'Bearer ' prefix guarantees index 1 exists (possibly '').
        api_key = auth_header.split(' ')[1]
        if not api_key or len(api_key) < 20:  # API keys should be at least 20 chars
            return None

        # Don't try to authenticate JWT tokens (they start with 'ey')
        if api_key.startswith('ey'):
            return None  # Let JWTAuthentication handle it

        try:
            from igny8_core.auth.models import Site, User

            # Find site by API key
            site = Site.objects.select_related('account', 'account__owner').filter(
                wp_api_key=api_key,
                is_active=True
            ).first()
            if not site:
                return None  # API key not found or site inactive

            # Use account owner as the authenticated user
            account = site.account
            user = account.owner if account is not None else None
            if user is None:
                # Previously this surfaced as a swallowed AttributeError;
                # the outcome (no authentication) is unchanged.
                return None

            if not user.is_active:
                raise AuthenticationFailed('User account is disabled.')

            # Set account on request for tenant isolation
            request.account = account
            # Set site on request for WordPress integration context
            request.site = site
            return (user, api_key)
        except AuthenticationFailed:
            # Must propagate: the generic handler below would otherwise turn
            # the explicit "account disabled" rejection into a silent None.
            raise
        except Exception as exc:
            # Log the error but return None to allow other auth classes to try
            import logging
            logging.getLogger(__name__).debug('APIKeyAuthentication error: %s', exc)
            return None