Files
igny8/backend/igny8_core/auth/middleware.py
2025-12-15 07:14:06 +00:00

207 lines
9.3 KiB
Python

"""
Multi-Account Middleware
Extracts account from JWT token and injects into request context
"""
import logging

from django.conf import settings
from django.contrib.auth import logout
from django.http import JsonResponse
from django.utils.deprecation import MiddlewareMixin
from rest_framework import status

# The ``jwt`` package is an optional dependency: when it cannot be imported
# the module still loads and JWT_AVAILABLE records that it is missing.
try:
    import jwt
except ImportError:
    JWT_AVAILABLE = False
else:
    JWT_AVAILABLE = True

logger = logging.getLogger('auth.middleware')
class AccountContextMiddleware(MiddlewareMixin):
    """
    Middleware that extracts account information from JWT token
    and adds it to request context for account isolation.

    For every request outside ``/admin/`` and ``/api/v1/auth/`` it sets
    ``request.account`` either from the session-authenticated user or from
    the ``account_id`` claim of a Bearer JWT. It never assigns
    ``request.user``. Requests whose user fails account/plan validation, or
    whose session is bound to a different account/user, are answered with a
    JSON error instead of being passed on.
    """

    def process_request(self, request):
        """
        Extract account from JWT token in Authorization header or session.

        Returns:
            None to let the request continue, or a JsonResponse that
            short-circuits the request (validation failure or session
            integrity violation).
        """
        # Skip for admin and auth endpoints
        if request.path.startswith(('/admin/', '/api/v1/auth/')):
            return None

        # First, try the user from the Django session (cookie-based auth).
        # This handles frontends using credentials: 'include' with session cookies.
        user = getattr(request, 'user', None)
        if user and user.is_authenticated:
            return self._apply_session_account(request)

        # Otherwise look for a Bearer token in the Authorization header.
        auth_header = request.META.get('HTTP_AUTHORIZATION', '')
        # startswith('Bearer ') guarantees at least two space-separated parts;
        # the token is the second one (empty for a bare "Bearer ").
        token = auth_header.split(' ')[1] if auth_header.startswith('Bearer ') else None
        if not token:
            # No JWT token - default the account to None, but keep any value
            # that is already present and never touch request.user.
            if not hasattr(request, 'account'):
                request.account = None
            return None

        return self._apply_jwt_account(request, token)

    def _apply_session_account(self, request):
        """
        Set ``request.account`` from the already-authenticated session user.

        The user object loaded upstream is used as is: it is neither
        re-queried nor replaced. When the user has an account, the account
        and user IDs previously stored in the session are compared with the
        current ones; a mismatch logs the user out and returns a 401
        response. Only afterwards is the session (re)bound to the current
        IDs.

        Returns:
            None to continue, or a JsonResponse to short-circuit.
        """
        try:
            # Validate account/plan using the user object already on the request
            validation_error = self._validate_account_and_plan(request, request.user)
            if validation_error:
                return validation_error

            # Set request.account from the user's account relationship
            request.account = getattr(request.user, 'account', None)
            if not request.account:
                return None

            # Read the IDs this session was bound to BEFORE writing the
            # current ones; comparing after the write could never detect a
            # mismatch because the stored values would always be the new ones.
            stored_account_id = request.session.get('_account_id')
            stored_user_id = request.session.get('_user_id')

            # Compare as strings so a serializer round-trip that changes the
            # ID's type cannot trigger a false logout.
            if stored_account_id and str(stored_account_id) != str(request.account.id):
                # Session contamination detected - force logout
                logger.warning(
                    f"[AUTO-LOGOUT] Session contamination: account_id mismatch. "
                    f"Session={stored_account_id}, Current={request.account.id}, "
                    f"User={request.user.id}, Path={request.path}, IP={request.META.get('REMOTE_ADDR')}"
                )
                logout(request)
                return self._session_violation_response()

            if stored_user_id and str(stored_user_id) != str(request.user.id):
                # Session contamination detected - force logout
                logger.warning(
                    f"[AUTO-LOGOUT] Session contamination: user_id mismatch. "
                    f"Session={stored_user_id}, Current={request.user.id}, "
                    f"Account={request.account.id if request.account else None}, "
                    f"Path={request.path}, IP={request.META.get('REMOTE_ADDR')}"
                )
                logout(request)
                return self._session_violation_response()

            # Tie this session to the current account and user. The write is
            # unconditional (as before) so session-save behaviour is unchanged.
            request.session['_account_id'] = request.account.id
            request.session['_user_id'] = request.user.id
            return None
        except Exception:
            # Best effort: if anything fails, continue without an account
            # context, but leave a trace instead of failing silently.
            logger.exception(
                "[ACCOUNT-CONTEXT] Failed to resolve account from session user; "
                "continuing with request.account=None"
            )
            request.account = None
            return None

    def _apply_jwt_account(self, request, token):
        """
        Set ``request.account`` from the claims of a Bearer JWT.

        The token is decoded with signature verification. The user named by
        ``user_id`` is loaded only to run account/plan validation;
        ``request.user`` is not assigned here. ``request.account`` becomes
        the Account named by the ``account_id`` claim, or None when the
        claim is missing, the account or user does not exist, the token is
        invalid, or the ``jwt`` package is unavailable. There is no fallback
        to ``user.account``.

        Returns:
            None to continue, or the JsonResponse produced by a failed
            account/plan validation.
        """
        if not JWT_AVAILABLE:
            # JWT library not installed - skip
            request.account = None
            return None

        try:
            # Use JWT_SECRET_KEY from settings (SECRET_KEY when that setting is absent)
            jwt_secret = getattr(settings, 'JWT_SECRET_KEY', getattr(settings, 'SECRET_KEY', None))
            if not jwt_secret:
                raise ValueError("JWT_SECRET_KEY or SECRET_KEY must be set in settings")
            decoded = jwt.decode(token, jwt_secret, algorithms=[getattr(settings, 'JWT_ALGORITHM', 'HS256')])

            # Extract user and account info from token
            user_id = decoded.get('user_id')
            account_id = decoded.get('account_id')
            if not user_id:
                request.account = None
                return None

            # Imported lazily, as in the original code path.
            from .models import User, Account

            try:
                user = User.objects.select_related('account', 'account__plan').get(id=user_id)
            except User.DoesNotExist:
                request.account = None
                return None

            validation_error = self._validate_account_and_plan(request, user)
            if validation_error:
                return validation_error

            account = None
            if account_id:
                # Verify the account from the token still exists
                try:
                    account = Account.objects.get(id=account_id)
                except Account.DoesNotExist:
                    account = None
            request.account = account
        except jwt.InvalidTokenError:
            # Bad, expired or tampered token: continue without account context
            request.account = None
        except Exception:
            # Still fail open (unauthenticated access is allowed to continue),
            # but log so misconfiguration such as a missing key is visible.
            logger.exception(
                "[ACCOUNT-CONTEXT] Failed to resolve account from JWT; "
                "continuing with request.account=None"
            )
            request.account = None
        return None

    @staticmethod
    def _session_violation_response():
        """Build the 401 JSON response used when a session mismatch is detected."""
        return JsonResponse(
            {'success': False, 'error': 'Session integrity violation detected. Please login again.'},
            status=status.HTTP_401_UNAUTHORIZED
        )

    def _validate_account_and_plan(self, request, user):
        """
        Ensure the authenticated user has an account and an active plan.
        Uses shared validation helper for consistency.
        Bypasses validation for superusers, developers, and system accounts.

        Returns:
            None when the user passes (or bypasses) validation, otherwise
            the JsonResponse built by ``_deny_request``.
        """
        # Bypass validation for superusers
        if getattr(user, 'is_superuser', False):
            return None

        # Bypass validation for developers
        if getattr(user, 'role', None) == 'developer':
            return None

        # Bypass validation for system account users
        try:
            if hasattr(user, 'is_system_account_user') and user.is_system_account_user():
                return None
        except Exception:
            # A failing bypass check must not block the request; fall through
            # to the regular validation below.
            logger.debug(
                "[ACCOUNT-CONTEXT] is_system_account_user() check failed; "
                "falling back to regular validation",
                exc_info=True,
            )

        from .utils import validate_account_and_plan

        is_valid, error_message, http_status = validate_account_and_plan(user)
        if not is_valid:
            return self._deny_request(request, error_message, http_status)
        return None

    def _deny_request(self, request, error, status_code):
        """
        Logout session users (if any) and return a consistent JSON error.

        Args:
            request: the current request; logged out only when its user is
                session-authenticated.
            error: message placed in the response's ``error`` field.
            status_code: HTTP status of the response.

        Returns:
            JsonResponse with ``{'success': False, 'error': error}``.
        """
        try:
            if hasattr(request, 'user') and request.user and request.user.is_authenticated:
                logger.warning(
                    f"[AUTO-LOGOUT] Account/plan validation failed: {error}. "
                    f"User={request.user.id}, Account={getattr(request, 'account', None)}, "
                    f"Path={request.path}, IP={request.META.get('REMOTE_ADDR')}"
                )
                logout(request)
        except Exception as e:
            # A failed logout must not prevent the denial from being returned.
            logger.error(f"[AUTO-LOGOUT] Error during logout: {e}")

        return JsonResponse(
            {
                'success': False,
                'error': error,
            },
            status=status_code,
        )