Files
2026-01-19 21:08:25 +00:00

736 lines
29 KiB
Python

"""
Authentication URL Configuration
"""
from django.urls import path, include
from django.views.decorators.csrf import csrf_exempt
from rest_framework.routers import DefaultRouter
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status, permissions
from drf_spectacular.utils import extend_schema
from igny8_core.api.response import success_response, error_response
from django.conf import settings
from .views import (
GroupsViewSet, UsersViewSet, AccountsViewSet, SubscriptionsViewSet,
SiteUserAccessViewSet, PlanViewSet, SiteViewSet, SectorViewSet,
IndustryViewSet, SeedKeywordViewSet
)
from .serializers import RegisterSerializer, LoginSerializer, ChangePasswordSerializer, UserSerializer, RefreshTokenSerializer
from .models import User
from .utils import generate_access_token, get_token_expiry, decode_token
import jwt
router = DefaultRouter()
# Main structure: Groups, Users, Accounts, Subscriptions, Site User Access
router.register(r'groups', GroupsViewSet, basename='group')
router.register(r'users', UsersViewSet, basename='user')
router.register(r'accounts', AccountsViewSet, basename='account')
router.register(r'subscriptions', SubscriptionsViewSet, basename='subscription')
router.register(r'site-access', SiteUserAccessViewSet, basename='site-access')
# Supporting viewsets
router.register(r'plans', PlanViewSet, basename='plan')
router.register(r'sites', SiteViewSet, basename='site')
router.register(r'sectors', SectorViewSet, basename='sector')
router.register(r'industries', IndustryViewSet, basename='industry')
router.register(r'keywords-library', SeedKeywordViewSet, basename='keywords-library')
# Note: AuthViewSet removed - using direct APIView endpoints instead (login, register, etc.)
@extend_schema(
tags=['Authentication'],
summary='User Registration',
description='Register a new user account'
)
class RegisterView(APIView):
"""Registration endpoint."""
permission_classes = [permissions.AllowAny]
def post(self, request):
from .utils import generate_access_token, generate_refresh_token, get_access_token_expiry, get_refresh_token_expiry
from django.contrib.auth import login, logout
from django.utils import timezone
force_logout = request.data.get('force_logout', False)
serializer = RegisterSerializer(data=request.data)
if serializer.is_valid():
user = serializer.save()
# SECURITY: Check for session contamination before login
# If there's an existing session from a different user, handle it
if request.session.session_key:
existing_user_id = request.session.get('_auth_user_id')
if existing_user_id and str(existing_user_id) != str(user.id):
# Get existing user details
try:
existing_user = User.objects.get(id=existing_user_id)
existing_email = existing_user.email
existing_username = existing_user.username or existing_email.split('@')[0]
except User.DoesNotExist:
existing_email = 'Unknown user'
existing_username = 'Unknown'
# If not forcing logout, return conflict info
if not force_logout:
return Response(
{
'status': 'error',
'error': 'session_conflict',
'message': f'You have an active session for another account ({existing_email}). Please logout first or choose to continue.',
'existing_user': {
'email': existing_email,
'username': existing_username,
'id': existing_user_id
},
'requested_user': {
'email': user.email,
'username': user.username or user.email.split('@')[0],
'id': user.id
}
},
status=status.HTTP_409_CONFLICT
)
# Force logout - clean existing session completely
logout(request)
# Clear all session data
request.session.flush()
# Log the user in (create session for session authentication)
login(request, user)
# Get account from user
account = getattr(user, 'account', None)
# Generate JWT tokens
access_token = generate_access_token(user, account)
refresh_token = generate_refresh_token(user, account)
access_expires_at = timezone.now() + get_access_token_expiry()
refresh_expires_at = timezone.now() + get_refresh_token_expiry()
user_serializer = UserSerializer(user)
# Build response data
response_data = {
'user': user_serializer.data,
'tokens': {
'access': access_token,
'refresh': refresh_token,
'access_expires_at': access_expires_at.isoformat(),
'refresh_expires_at': refresh_expires_at.isoformat(),
}
}
# NOTE: Payment checkout is NO LONGER created at registration
# User will complete payment on /account/plans after signup
# This simplifies the signup flow and consolidates all payment handling
# Send welcome email (if enabled in settings)
try:
from igny8_core.modules.system.email_models import EmailSettings
from igny8_core.business.billing.services.email_service import send_welcome_email
email_settings = EmailSettings.get_settings()
if email_settings.send_welcome_emails and account:
send_welcome_email(user, account)
except Exception as e:
# Don't fail registration if email fails
import logging
logger = logging.getLogger(__name__)
logger.error(f"Failed to send welcome email for user {user.id}: {e}")
return success_response(
data=response_data,
message='Registration successful',
status_code=status.HTTP_201_CREATED,
request=request
)
return error_response(
error='Validation failed',
errors=serializer.errors,
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
@extend_schema(
tags=['Authentication'],
summary='User Login',
description='Authenticate user and receive JWT tokens'
)
class LoginView(APIView):
"""Login endpoint."""
permission_classes = [permissions.AllowAny]
def post(self, request):
serializer = LoginSerializer(data=request.data)
if serializer.is_valid():
email = serializer.validated_data['email']
password = serializer.validated_data['password']
remember_me = serializer.validated_data.get('remember_me', False)
force_logout = request.data.get('force_logout', False)
try:
user = User.objects.select_related('account', 'account__plan').get(email=email)
except User.DoesNotExist:
return error_response(
error='Invalid credentials',
status_code=status.HTTP_401_UNAUTHORIZED,
request=request
)
if user.check_password(password):
# SECURITY: Check for session contamination before login
# If user has a session cookie from a different user, handle it
if request.session.session_key:
existing_user_id = request.session.get('_auth_user_id')
if existing_user_id and str(existing_user_id) != str(user.id):
# Get existing user details
try:
existing_user = User.objects.get(id=existing_user_id)
existing_email = existing_user.email
existing_username = existing_user.username or existing_email.split('@')[0]
except User.DoesNotExist:
existing_email = 'Unknown user'
existing_username = 'Unknown'
# If not forcing logout, return conflict info
if not force_logout:
return Response(
{
'status': 'error',
'error': 'session_conflict',
'message': f'You have an active session for another account ({existing_email}). Please logout first or choose to continue.',
'existing_user': {
'email': existing_email,
'username': existing_username,
'id': existing_user_id
},
'requested_user': {
'email': user.email,
'username': user.username or user.email.split('@')[0],
'id': user.id
}
},
status=status.HTTP_409_CONFLICT
)
# Force logout - clean existing session completely
from django.contrib.auth import logout
logout(request)
# Clear all session data
request.session.flush()
# Log the user in (create session for session authentication)
from django.contrib.auth import login
login(request, user)
# Get account from user
account = getattr(user, 'account', None)
# Generate JWT tokens
from .utils import generate_access_token, generate_refresh_token, get_access_token_expiry, get_refresh_token_expiry
from django.utils import timezone
access_token = generate_access_token(user, account, remember_me=remember_me)
refresh_token = generate_refresh_token(user, account)
access_expires_at = timezone.now() + get_access_token_expiry(remember_me=remember_me)
refresh_expires_at = timezone.now() + get_refresh_token_expiry()
# Serialize user data safely, handling missing account relationship
try:
user_serializer = UserSerializer(user)
user_data = user_serializer.data
except Exception as e:
# Fallback if serializer fails (e.g., missing account_id column)
# Log the error for debugging but don't fail the login
import logging
logger = logging.getLogger(__name__)
logger.warning(f"UserSerializer failed for user {user.id}: {e}", exc_info=True)
# Ensure username is properly set (use email prefix if username is empty/default)
username = user.username if user.username and user.username != 'user' else user.email.split('@')[0]
user_data = {
'id': user.id,
'username': username,
'email': user.email,
'role': user.role,
'account': None,
'accessible_sites': [],
}
return success_response(
data={
'user': user_data,
'access': access_token,
'refresh': refresh_token,
'access_expires_at': access_expires_at.isoformat(),
'refresh_expires_at': refresh_expires_at.isoformat(),
},
message='Login successful',
request=request
)
return error_response(
error='Invalid credentials',
status_code=status.HTTP_401_UNAUTHORIZED,
request=request
)
return error_response(
error='Validation failed',
errors=serializer.errors,
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
@extend_schema(
tags=['Authentication'],
summary='User Logout',
description='Clear session and logout user'
)
class LogoutView(APIView):
"""Logout endpoint."""
permission_classes = [permissions.AllowAny]
def post(self, request):
from django.contrib.auth import logout as django_logout
# Clear Django auth session
django_logout(request)
try:
request.session.flush()
except Exception:
# If session is unavailable or already cleared, ignore
pass
response = success_response(
message='Logged out successfully',
request=request
)
# Explicitly expire session cookie across domain/path
try:
response.delete_cookie(
settings.SESSION_COOKIE_NAME,
path=getattr(settings, 'SESSION_COOKIE_PATH', '/'),
domain=getattr(settings, 'SESSION_COOKIE_DOMAIN', None)
)
except Exception:
# If settings are misconfigured, still return success
pass
return response
@extend_schema(
tags=['Authentication'],
summary='Request Password Reset',
description='Request password reset email'
)
class PasswordResetRequestView(APIView):
"""Request password reset endpoint - sends email with reset token."""
permission_classes = [permissions.AllowAny]
def post(self, request):
from .serializers import RequestPasswordResetSerializer
from .models import PasswordResetToken
serializer = RequestPasswordResetSerializer(data=request.data)
if not serializer.is_valid():
return error_response(
error='Validation failed',
errors=serializer.errors,
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
email = serializer.validated_data['email']
try:
user = User.objects.get(email=email)
except User.DoesNotExist:
# Don't reveal if email exists - return success anyway
return success_response(
message='If an account with that email exists, a password reset link has been sent.',
request=request
)
# Generate secure token
import secrets
token = secrets.token_urlsafe(32)
# Create reset token (expires in 1 hour)
from django.utils import timezone
from datetime import timedelta
expires_at = timezone.now() + timedelta(hours=1)
PasswordResetToken.objects.create(
user=user,
token=token,
expires_at=expires_at
)
# Send password reset email
import logging
logger = logging.getLogger(__name__)
logger.info(f"[PASSWORD_RESET] Attempting to send reset email to: {email}")
try:
from igny8_core.business.billing.services.email_service import send_password_reset_email
result = send_password_reset_email(user, token)
logger.info(f"[PASSWORD_RESET] Email send result: {result}")
print(f"[PASSWORD_RESET] Email send result: {result}") # Console output
except Exception as e:
logger.error(f"[PASSWORD_RESET] Failed to send password reset email: {e}", exc_info=True)
print(f"[PASSWORD_RESET] ERROR: {e}") # Console output
return success_response(
message='If an account with that email exists, a password reset link has been sent.',
request=request
)
@extend_schema(
tags=['Authentication'],
summary='Reset Password',
description='Reset password using token from email'
)
class PasswordResetConfirmView(APIView):
"""Confirm password reset with token."""
permission_classes = [permissions.AllowAny]
def post(self, request):
from .serializers import ResetPasswordSerializer
from .models import PasswordResetToken
from django.utils import timezone
serializer = ResetPasswordSerializer(data=request.data)
if not serializer.is_valid():
return error_response(
error='Validation failed',
errors=serializer.errors,
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
token = serializer.validated_data['token']
new_password = serializer.validated_data['new_password']
try:
reset_token = PasswordResetToken.objects.get(
token=token,
used=False,
expires_at__gt=timezone.now()
)
except PasswordResetToken.DoesNotExist:
return error_response(
error='Invalid or expired reset token',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
# Reset password
user = reset_token.user
user.set_password(new_password)
user.save()
# Mark token as used
reset_token.used = True
reset_token.save()
return success_response(
message='Password reset successfully. You can now log in with your new password.',
request=request
)
@extend_schema(
tags=['Authentication'],
summary='Change Password',
description='Change user password'
)
class ChangePasswordView(APIView):
"""Change password endpoint."""
permission_classes = [permissions.IsAuthenticated]
def post(self, request):
serializer = ChangePasswordSerializer(data=request.data, context={'request': request})
if serializer.is_valid():
user = request.user
if not user.check_password(serializer.validated_data['old_password']):
return error_response(
error='Current password is incorrect',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
user.set_password(serializer.validated_data['new_password'])
user.save()
return success_response(
message='Password changed successfully',
request=request
)
return error_response(
error='Validation failed',
errors=serializer.errors,
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
@extend_schema(
tags=['Authentication'],
summary='Refresh Token',
description='Refresh access token using refresh token'
)
class RefreshTokenView(APIView):
"""Refresh access token endpoint."""
permission_classes = [permissions.AllowAny]
def post(self, request):
serializer = RefreshTokenSerializer(data=request.data)
if not serializer.is_valid():
return error_response(
error='Validation failed',
errors=serializer.errors,
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
refresh_token = serializer.validated_data['refresh']
try:
# Decode and validate refresh token
payload = decode_token(refresh_token)
# Verify it's a refresh token
if payload.get('type') != 'refresh':
return error_response(
error='Invalid token type',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
# Get user
user_id = payload.get('user_id')
account_id = payload.get('account_id')
try:
user = User.objects.select_related('account', 'account__plan').get(id=user_id)
except User.DoesNotExist:
return error_response(
error='User not found',
status_code=status.HTTP_404_NOT_FOUND,
request=request
)
# Get account
account = None
if account_id:
try:
from .models import Account
account = Account.objects.get(id=account_id)
except Exception:
pass
if not account:
account = getattr(user, 'account', None)
# Generate new access token
from .utils import get_token_expiry
access_token = generate_access_token(user, account)
access_expires_at = get_token_expiry('access')
return success_response(
data={
'access': access_token,
'access_expires_at': access_expires_at.isoformat()
},
request=request
)
except jwt.InvalidTokenError:
return error_response(
error='Invalid or expired refresh token',
status_code=status.HTTP_401_UNAUTHORIZED,
request=request
)
@extend_schema(
tags=['Authentication'],
summary='Get Country List',
description='Returns list of countries for registration country selection'
)
class CountryListView(APIView):
"""Returns list of countries for signup dropdown"""
permission_classes = [permissions.AllowAny] # Public endpoint
def get(self, request):
"""Get list of countries with codes and names"""
# Comprehensive list of countries for billing purposes
countries = [
{'code': 'US', 'name': 'United States'},
{'code': 'GB', 'name': 'United Kingdom'},
{'code': 'CA', 'name': 'Canada'},
{'code': 'AU', 'name': 'Australia'},
{'code': 'DE', 'name': 'Germany'},
{'code': 'FR', 'name': 'France'},
{'code': 'ES', 'name': 'Spain'},
{'code': 'IT', 'name': 'Italy'},
{'code': 'NL', 'name': 'Netherlands'},
{'code': 'BE', 'name': 'Belgium'},
{'code': 'CH', 'name': 'Switzerland'},
{'code': 'AT', 'name': 'Austria'},
{'code': 'SE', 'name': 'Sweden'},
{'code': 'NO', 'name': 'Norway'},
{'code': 'DK', 'name': 'Denmark'},
{'code': 'FI', 'name': 'Finland'},
{'code': 'IE', 'name': 'Ireland'},
{'code': 'PT', 'name': 'Portugal'},
{'code': 'PL', 'name': 'Poland'},
{'code': 'CZ', 'name': 'Czech Republic'},
{'code': 'NZ', 'name': 'New Zealand'},
{'code': 'SG', 'name': 'Singapore'},
{'code': 'HK', 'name': 'Hong Kong'},
{'code': 'JP', 'name': 'Japan'},
{'code': 'KR', 'name': 'South Korea'},
{'code': 'IN', 'name': 'India'},
{'code': 'PK', 'name': 'Pakistan'},
{'code': 'BD', 'name': 'Bangladesh'},
{'code': 'AE', 'name': 'United Arab Emirates'},
{'code': 'SA', 'name': 'Saudi Arabia'},
{'code': 'ZA', 'name': 'South Africa'},
{'code': 'NG', 'name': 'Nigeria'},
{'code': 'EG', 'name': 'Egypt'},
{'code': 'KE', 'name': 'Kenya'},
{'code': 'BR', 'name': 'Brazil'},
{'code': 'MX', 'name': 'Mexico'},
{'code': 'AR', 'name': 'Argentina'},
{'code': 'CL', 'name': 'Chile'},
{'code': 'CO', 'name': 'Colombia'},
{'code': 'PE', 'name': 'Peru'},
{'code': 'MY', 'name': 'Malaysia'},
{'code': 'TH', 'name': 'Thailand'},
{'code': 'VN', 'name': 'Vietnam'},
{'code': 'PH', 'name': 'Philippines'},
{'code': 'ID', 'name': 'Indonesia'},
{'code': 'TR', 'name': 'Turkey'},
{'code': 'RU', 'name': 'Russia'},
{'code': 'UA', 'name': 'Ukraine'},
{'code': 'RO', 'name': 'Romania'},
{'code': 'GR', 'name': 'Greece'},
{'code': 'IL', 'name': 'Israel'},
{'code': 'TW', 'name': 'Taiwan'},
]
# Sort alphabetically by name
countries.sort(key=lambda x: x['name'])
return Response({'countries': countries})
@extend_schema(exclude=True) # Exclude from public API documentation - internal authenticated endpoint
class MeView(APIView):
"""Get current user information."""
permission_classes = [permissions.IsAuthenticated]
def get(self, request):
# Refresh user from DB to get latest account/plan data
# This ensures account/plan changes are reflected immediately
from .models import User as UserModel
user = UserModel.objects.select_related('account', 'account__plan').get(id=request.user.id)
serializer = UserSerializer(user)
return success_response(
data={'user': serializer.data},
request=request
)
@extend_schema(
tags=['Authentication'],
summary='Unsubscribe from Emails',
description='Unsubscribe a user from marketing, billing, or all email notifications'
)
class UnsubscribeView(APIView):
"""Handle email unsubscribe requests with signed URLs."""
permission_classes = [permissions.AllowAny]
def post(self, request):
"""
Process unsubscribe request.
Expected payload:
- email: The email address to unsubscribe
- type: Type of emails to unsubscribe from (marketing, billing, all)
- ts: Timestamp from signed URL
- sig: HMAC signature from signed URL
"""
from igny8_core.business.billing.services.email_service import verify_unsubscribe_signature
import logging
logger = logging.getLogger(__name__)
email = request.data.get('email')
email_type = request.data.get('type', 'all')
timestamp = request.data.get('ts')
signature = request.data.get('sig')
# Validate required fields
if not email or not timestamp or not signature:
return error_response(
error='Missing required parameters',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
try:
timestamp = int(timestamp)
except (ValueError, TypeError):
return error_response(
error='Invalid timestamp',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
# Verify signature
if not verify_unsubscribe_signature(email, email_type, timestamp, signature):
return error_response(
error='Invalid or expired unsubscribe link',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
# Log the unsubscribe request
# In production, update user preferences or use email provider's suppression list
logger.info(f'Unsubscribe request processed: email={email}, type={email_type}')
# TODO: Implement preference storage
# Options:
# 1. Add email preference fields to User model
# 2. Use Resend's suppression list API
# 3. Create EmailPreferences model
return success_response(
message=f'Successfully unsubscribed from {email_type} emails',
request=request
)
urlpatterns = [
path('', include(router.urls)),
path('register/', csrf_exempt(RegisterView.as_view()), name='auth-register'),
path('login/', csrf_exempt(LoginView.as_view()), name='auth-login'),
path('logout/', csrf_exempt(LogoutView.as_view()), name='auth-logout'),
path('refresh/', csrf_exempt(RefreshTokenView.as_view()), name='auth-refresh'),
path('change-password/', ChangePasswordView.as_view(), name='auth-change-password'),
path('password-reset/', csrf_exempt(PasswordResetRequestView.as_view()), name='auth-password-reset-request'),
path('password-reset/confirm/', csrf_exempt(PasswordResetConfirmView.as_view()), name='auth-password-reset-confirm'),
path('me/', MeView.as_view(), name='auth-me'),
path('countries/', CountryListView.as_view(), name='auth-countries'),
path('unsubscribe/', csrf_exempt(UnsubscribeView.as_view()), name='auth-unsubscribe'),
]