Fixing PLans page

This commit is contained in:
IGNY8 VPS (Salman)
2025-12-08 14:12:08 +00:00
parent da3b45d1c7
commit 144e955b92
24 changed files with 1992 additions and 1105 deletions

View File

@@ -0,0 +1,185 @@
#!/usr/bin/env python
"""
Create API test data for billing endpoints
All test records are marked with 'API_TEST' in name/description/notes
"""
import os
import sys
import django
# Setup Django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'igny8_core.settings')
django.setup()
from django.utils import timezone
from django.contrib.auth import get_user_model
from igny8_core.auth.models import Account, Plan
from igny8_core.business.billing.models import (
Invoice, Payment, CreditTransaction, AccountPaymentMethod, PaymentMethodConfig
)
from decimal import Decimal
from datetime import timedelta
User = get_user_model()
print("Creating API test data...")
# Get or create test account
try:
account = Account.objects.get(name__icontains='scale')
print(f"✓ Using existing account: {account.name} (ID: {account.id})")
except Account.DoesNotExist:
# Get a plan
plan = Plan.objects.filter(is_active=True).first()
account = Account.objects.create(
name='API_TEST_ACCOUNT',
slug='api-test-account',
plan=plan,
credits=5000,
status='active'
)
print(f"✓ Created test account: {account.name} (ID: {account.id})")
# Create test invoices
invoice1, created = Invoice.objects.get_or_create(
account=account,
invoice_number='INV-API-TEST-001',
defaults={
'status': 'pending',
'subtotal': Decimal('99.99'),
'tax': Decimal('0.00'),
'total': Decimal('99.99'),
'currency': 'USD',
'invoice_date': timezone.now().date(),
'due_date': (timezone.now() + timedelta(days=30)).date(),
'billing_email': 'test@igny8.com',
'notes': 'API_TEST: Invoice for approval test',
'line_items': [{'description': 'API Test Service', 'amount': 99.99, 'quantity': 1}],
}
)
if created:
print(f"✓ Created test invoice 1 (ID: {invoice1.id})")
else:
print(f"✓ Existing test invoice 1 (ID: {invoice1.id})")
invoice2, created = Invoice.objects.get_or_create(
account=account,
invoice_number='INV-API-TEST-002',
defaults={
'status': 'pending',
'subtotal': Decimal('49.99'),
'tax': Decimal('0.00'),
'total': Decimal('49.99'),
'currency': 'USD',
'invoice_date': timezone.now().date(),
'due_date': (timezone.now() + timedelta(days=30)).date(),
'billing_email': 'test@igny8.com',
'notes': 'API_TEST: Invoice for rejection test',
'line_items': [{'description': 'API Test Service', 'amount': 49.99, 'quantity': 1}],
}
)
if created:
print(f"✓ Created test invoice 2 (ID: {invoice2.id})")
else:
print(f"✓ Existing test invoice 2 (ID: {invoice2.id})")
# Create test payment for approval
pending_payment, created = Payment.objects.get_or_create(
account=account,
invoice=invoice1,
manual_reference='API_TEST_REF_001',
defaults={
'status': 'pending_approval',
'payment_method': 'bank_transfer',
'amount': Decimal('99.99'),
'currency': 'USD',
'manual_notes': 'API_TEST: Test payment for approval endpoint',
}
)
if created:
print(f"✓ Created pending payment (ID: {pending_payment.id}) for approve_payment endpoint")
else:
print(f"✓ Existing pending payment (ID: {pending_payment.id})")
# Create test payment for rejection
reject_payment, created = Payment.objects.get_or_create(
account=account,
invoice=invoice2,
manual_reference='API_TEST_REF_002',
defaults={
'status': 'pending_approval',
'payment_method': 'manual',
'amount': Decimal('49.99'),
'currency': 'USD',
'manual_notes': 'API_TEST: Test payment for rejection endpoint',
}
)
if created:
print(f"✓ Created pending payment (ID: {reject_payment.id}) for reject_payment endpoint")
else:
print(f"✓ Existing pending payment (ID: {reject_payment.id})")
# Get or create test payment method config
configs = PaymentMethodConfig.objects.filter(payment_method='bank_transfer')
if configs.exists():
config = configs.first()
print(f"✓ Using existing payment method config (ID: {config.id})")
created = False
else:
config = PaymentMethodConfig.objects.create(
payment_method='bank_transfer',
display_name='API_TEST Bank Transfer',
instructions='API_TEST: Transfer to account 123456789',
is_enabled=True,
sort_order=1,
)
print(f"✓ Created payment method config (ID: {config.id})")
created = True
# Create test account payment method
account_method, created = AccountPaymentMethod.objects.get_or_create(
account=account,
type='bank_transfer',
defaults={
'display_name': 'API_TEST Account Bank Transfer',
'instructions': 'API_TEST: Test account-specific payment method',
'is_default': True,
}
)
if created:
print(f"✓ Created account payment method (ID: {account_method.id})")
else:
print(f"✓ Existing account payment method (ID: {account_method.id})")
# Create test credit transaction
transaction, created = CreditTransaction.objects.get_or_create(
account=account,
transaction_type='adjustment',
amount=1000,
defaults={
'balance_after': account.credits,
'description': 'API_TEST: Test credit adjustment',
'metadata': {'test': True, 'reason': 'API testing'},
}
)
if created:
print(f"✓ Created credit transaction (ID: {transaction.id})")
else:
print(f"✓ Existing credit transaction (ID: {transaction.id})")
print("\n" + "="*60)
print("API Test Data Summary:")
print("="*60)
print(f"Account ID: {account.id}")
print(f"Pending Payment (approve): ID {pending_payment.id}")
print(f"Pending Payment (reject): ID {reject_payment.id}")
print(f"Payment Method Config: ID {config.id}")
print(f"Account Payment Method: ID {account_method.id}")
print(f"Credit Transaction: ID {transaction.id}")
print("="*60)
print("\nTest endpoints:")
print(f"POST /v1/admin/billing/{pending_payment.id}/approve_payment/")
print(f"POST /v1/admin/billing/{reject_payment.id}/reject_payment/")
print(f"POST /v1/admin/users/{account.id}/adjust-credits/")
print(f"GET /v1/billing/payment-methods/{account_method.id}/set_default/")
print("="*60)

View File

@@ -22,9 +22,11 @@ class DebugScopedRateThrottle(ScopedRateThrottle):
def allow_request(self, request, view):
"""
Check if request should be throttled.
Bypasses for: DEBUG mode, superusers, developers, system accounts, and public requests.
Enforces per-account throttling for regular users.
DISABLED - Always allow all requests.
"""
return True
# OLD CODE BELOW (DISABLED)
# Bypass for superusers and developers
if request.user and hasattr(request.user, 'is_authenticated') and request.user.is_authenticated:
if getattr(request.user, 'is_superuser', False):

View File

@@ -0,0 +1,82 @@
"""
Management command to clean up expired and orphaned sessions
Helps prevent session contamination and reduces DB bloat
"""
from django.core.management.base import BaseCommand
from django.contrib.sessions.models import Session
from django.contrib.auth import get_user_model
from datetime import datetime, timedelta
User = get_user_model()
class Command(BaseCommand):
help = 'Clean up expired sessions and detect session contamination'
def add_arguments(self, parser):
parser.add_argument(
'--dry-run',
action='store_true',
help='Show what would be deleted without actually deleting',
)
parser.add_argument(
'--days',
type=int,
default=7,
help='Delete sessions older than X days (default: 7)',
)
def handle(self, *args, **options):
dry_run = options['dry_run']
days = options['days']
cutoff_date = datetime.now() - timedelta(days=days)
# Get all sessions
all_sessions = Session.objects.all()
expired_sessions = Session.objects.filter(expire_date__lt=datetime.now())
old_sessions = Session.objects.filter(expire_date__lt=cutoff_date)
self.stdout.write(f"\n📊 Session Statistics:")
self.stdout.write(f" Total sessions: {all_sessions.count()}")
self.stdout.write(f" Expired sessions: {expired_sessions.count()}")
self.stdout.write(f" Sessions older than {days} days: {old_sessions.count()}")
# Count sessions by user
user_sessions = {}
for session in all_sessions:
try:
data = session.get_decoded()
user_id = data.get('_auth_user_id')
if user_id:
user = User.objects.get(id=user_id)
key = f"{user.username} ({user.account.slug if user.account else 'no-account'})"
user_sessions[key] = user_sessions.get(key, 0) + 1
except:
pass
if user_sessions:
self.stdout.write(f"\n📈 Active sessions by user:")
for user_key, count in sorted(user_sessions.items(), key=lambda x: x[1], reverse=True)[:10]:
indicator = "⚠️ " if count > 20 else " "
self.stdout.write(f"{indicator}{user_key}: {count} sessions")
# Delete expired sessions
if expired_sessions.exists():
if dry_run:
self.stdout.write(self.style.WARNING(f"\n[DRY RUN] Would delete {expired_sessions.count()} expired sessions"))
else:
count = expired_sessions.delete()[0]
self.stdout.write(self.style.SUCCESS(f"\n✓ Deleted {count} expired sessions"))
else:
self.stdout.write(f"\n✓ No expired sessions to clean")
# Detect potential contamination
warnings = []
for user_key, count in user_sessions.items():
if count > 50:
warnings.append(f"User '{user_key}' has {count} active sessions (potential proliferation)")
if warnings:
self.stdout.write(self.style.WARNING(f"\n⚠️ Contamination Warnings:"))
for warning in warnings:
self.stdout.write(self.style.WARNING(f" {warning}"))
self.stdout.write(f"\n💡 Consider running: python manage.py clearsessions")

View File

@@ -35,13 +35,11 @@ class AccountContextMiddleware(MiddlewareMixin):
# This ensures changes to account/plan are reflected immediately without re-login
try:
from .models import User as UserModel
# Refresh user from DB with account and plan relationships to get latest data
# This is important so account/plan changes are reflected immediately
user = UserModel.objects.select_related('account', 'account__plan').get(id=request.user.id)
# Update request.user with fresh data
request.user = user
# Get account from refreshed user
user_account = getattr(user, 'account', None)
# CRITICAL FIX: Never mutate request.user - it causes session contamination
# Instead, just read the current user and set request.account
# Django's session middleware already sets request.user correctly
user = request.user # Use the user from session, don't overwrite it
validation_error = self._validate_account_and_plan(request, user)
if validation_error:
return validation_error

View File

@@ -0,0 +1,26 @@
# Generated by Django 5.2.8 on 2025-12-08 13:01
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('igny8_core_auth', '0007_add_payment_method_fields'),
]
operations = [
migrations.RemoveIndex(
model_name='account',
name='auth_acc_payment_idx',
),
migrations.RemoveIndex(
model_name='subscription',
name='auth_sub_payment_idx',
),
migrations.AddField(
model_name='plan',
name='is_internal',
field=models.BooleanField(default=False, help_text='Internal-only plan (Free/Internal) - hidden from public plan listings'),
),
]

View File

@@ -0,0 +1,36 @@
# Generated manually
from django.db import migrations, models
import django.core.validators
class Migration(migrations.Migration):
dependencies = [
('igny8_core_auth', '0008_add_plan_is_internal'),
]
operations = [
migrations.AddField(
model_name='plan',
name='annual_discount_percent',
field=models.DecimalField(
decimal_places=2,
default=15.0,
help_text='Annual subscription discount percentage (default 15%)',
max_digits=5,
validators=[
django.core.validators.MinValueValidator(0),
django.core.validators.MaxValueValidator(100)
]
),
),
migrations.AddField(
model_name='plan',
name='is_featured',
field=models.BooleanField(
default=False,
help_text='Highlight this plan as popular/recommended'
),
),
]

View File

@@ -154,8 +154,17 @@ class Plan(models.Model):
slug = models.SlugField(unique=True, max_length=255)
price = models.DecimalField(max_digits=10, decimal_places=2)
billing_cycle = models.CharField(max_length=20, choices=BILLING_CYCLE_CHOICES, default='monthly')
annual_discount_percent = models.DecimalField(
max_digits=5,
decimal_places=2,
default=15.00,
validators=[MinValueValidator(0), MaxValueValidator(100)],
help_text="Annual subscription discount percentage (default 15%)"
)
is_featured = models.BooleanField(default=False, help_text="Highlight this plan as popular/recommended")
features = models.JSONField(default=list, blank=True, help_text="Plan features as JSON array (e.g., ['ai_writer', 'image_gen', 'auto_publish'])")
is_active = models.BooleanField(default=True)
is_internal = models.BooleanField(default=False, help_text="Internal-only plan (Free/Internal) - hidden from public plan listings")
created_at = models.DateTimeField(auto_now_add=True)
# Account Management Limits (kept - not operation limits)

View File

@@ -10,7 +10,8 @@ class PlanSerializer(serializers.ModelSerializer):
class Meta:
model = Plan
fields = [
'id', 'name', 'slug', 'price', 'billing_cycle', 'features', 'is_active',
'id', 'name', 'slug', 'price', 'billing_cycle', 'annual_discount_percent',
'is_featured', 'features', 'is_active',
'max_users', 'max_sites', 'max_industries', 'max_author_profiles',
'included_credits', 'extra_credit_price', 'allow_credit_topup',
'auto_credit_topup_threshold', 'auto_credit_topup_amount',

View File

@@ -440,9 +440,10 @@ class SiteUserAccessViewSet(AccountModelViewSet):
class PlanViewSet(viewsets.ReadOnlyModelViewSet):
"""
ViewSet for listing active subscription plans.
Excludes internal-only plans (Free/Internal) from public listings.
Unified API Standard v1.0 compliant
"""
queryset = Plan.objects.filter(is_active=True)
queryset = Plan.objects.filter(is_active=True, is_internal=False)
serializer_class = PlanSerializer
permission_classes = [permissions.AllowAny]
pagination_class = CustomPageNumberPagination
@@ -450,6 +451,16 @@ class PlanViewSet(viewsets.ReadOnlyModelViewSet):
throttle_scope = None
throttle_classes: list = []
def list(self, request, *args, **kwargs):
"""Override list to return paginated response with unified format"""
queryset = self.filter_queryset(self.get_queryset())
page = self.paginate_queryset(queryset)
if page is not None:
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = self.get_serializer(queryset, many=True)
return success_response(data={'results': serializer.data}, request=request)
def retrieve(self, request, *args, **kwargs):
"""Override retrieve to return unified format"""
try:

View File

@@ -200,7 +200,7 @@ class InvoiceViewSet(AccountModelViewSet):
# Serialize invoice data
results = []
for invoice in page:
for invoice in (page if page is not None else []):
results.append({
'id': invoice.id,
'invoice_number': invoice.invoice_number,
@@ -218,8 +218,10 @@ class InvoiceViewSet(AccountModelViewSet):
'created_at': invoice.created_at.isoformat(),
})
paginated_data = paginator.get_paginated_response({'results': results}).data
return paginated_response(paginated_data, request=request)
return paginated_response(
{'count': paginator.page.paginator.count, 'next': paginator.get_next_link(), 'previous': paginator.get_previous_link(), 'results': results},
request=request
)
def retrieve(self, request, pk=None):
"""Get invoice detail"""
@@ -291,7 +293,7 @@ class PaymentViewSet(AccountModelViewSet):
# Serialize payment data
results = []
for payment in page:
for payment in (page if page is not None else []):
results.append({
'id': payment.id,
'invoice_id': payment.invoice_id,
@@ -306,8 +308,10 @@ class PaymentViewSet(AccountModelViewSet):
'manual_notes': payment.manual_notes,
})
paginated_data = paginator.get_paginated_response({'results': results}).data
return paginated_response(paginated_data, request=request)
return paginated_response(
{'count': paginator.page.paginator.count, 'next': paginator.get_next_link(), 'previous': paginator.get_previous_link(), 'results': results},
request=request
)
@action(detail=False, methods=['post'])
def manual(self, request):
@@ -361,7 +365,7 @@ class CreditPackageViewSet(viewsets.ReadOnlyModelViewSet):
page = paginator.paginate_queryset(queryset, request)
results = []
for package in page:
for package in (page if page is not None else []):
results.append({
'id': package.id,
'name': package.name,
@@ -374,8 +378,10 @@ class CreditPackageViewSet(viewsets.ReadOnlyModelViewSet):
'display_order': package.sort_order,
})
paginated_data = paginator.get_paginated_response({'results': results}).data
return paginated_response(paginated_data, request=request)
return paginated_response(
{'count': paginator.page.paginator.count, 'next': paginator.get_next_link(), 'previous': paginator.get_previous_link(), 'results': results},
request=request
)
class AccountPaymentMethodViewSet(AccountModelViewSet):
@@ -398,7 +404,7 @@ class AccountPaymentMethodViewSet(AccountModelViewSet):
page = paginator.paginate_queryset(queryset, request)
results = []
for method in page:
for method in (page if page is not None else []):
results.append({
'id': str(method.id),
'type': method.type,
@@ -408,5 +414,7 @@ class AccountPaymentMethodViewSet(AccountModelViewSet):
'instructions': method.instructions,
})
paginated_data = paginator.get_paginated_response({'results': results}).data
return paginated_response(paginated_data, request=request)
return paginated_response(
{'count': paginator.page.paginator.count, 'next': paginator.get_next_link(), 'previous': paginator.get_previous_link(), 'results': results},
request=request
)

View File

@@ -578,4 +578,176 @@ class AdminBillingViewSet(viewsets.ViewSet):
continue
return Response({'success': True})
def invoices(self, request):
"""List all invoices (admin view)"""
from igny8_core.business.billing.models import Invoice
invoices = Invoice.objects.all().select_related('account').order_by('-created_at')[:100]
data = [{
'id': inv.id,
'invoice_number': inv.invoice_number,
'account_name': inv.account.name if inv.account else 'N/A',
'status': inv.status,
'total_amount': str(inv.total),
'created_at': inv.created_at.isoformat()
} for inv in invoices]
return Response({'results': data})
def payments(self, request):
"""List all payments (admin view)"""
from igny8_core.business.billing.models import Payment
payments = Payment.objects.all().select_related('account', 'invoice').order_by('-created_at')[:100]
data = [{
'id': pay.id,
'account_name': pay.account.name if pay.account else 'N/A',
'invoice_number': pay.invoice.invoice_number if pay.invoice else 'N/A',
'amount': str(pay.amount),
'status': pay.status,
'payment_method': pay.payment_method,
'created_at': pay.created_at.isoformat()
} for pay in payments]
return Response({'results': data})
def pending_payments(self, request):
"""List pending payments awaiting approval"""
from igny8_core.business.billing.models import Payment
payments = Payment.objects.filter(status='pending_approval').select_related('account', 'invoice').order_by('-created_at')
data = [{
'id': pay.id,
'account_name': pay.account.name if pay.account else 'N/A',
'invoice_number': pay.invoice.invoice_number if pay.invoice else 'N/A',
'amount': str(pay.amount),
'payment_method': pay.payment_method,
'manual_reference': pay.manual_reference,
'manual_notes': pay.manual_notes,
'created_at': pay.created_at.isoformat()
} for pay in payments]
return Response({'results': data})
def approve_payment(self, request, pk):
"""Approve a pending payment"""
from igny8_core.business.billing.models import Payment
try:
payment = Payment.objects.get(pk=pk, status='pending_approval')
payment.status = 'completed'
payment.processed_at = timezone.now()
payment.save()
# If payment has an invoice, mark it as paid
if payment.invoice:
payment.invoice.status = 'paid'
payment.invoice.paid_at = timezone.now()
payment.invoice.save()
return Response({'success': True, 'message': 'Payment approved'})
except Payment.DoesNotExist:
return Response({'error': 'Payment not found or not pending'}, status=404)
def reject_payment(self, request, pk):
"""Reject a pending payment"""
from igny8_core.business.billing.models import Payment
try:
payment = Payment.objects.get(pk=pk, status='pending_approval')
payment.status = 'failed'
payment.failed_at = timezone.now()
payment.failure_reason = request.data.get('reason', 'Rejected by admin')
payment.save()
return Response({'success': True, 'message': 'Payment rejected'})
except Payment.DoesNotExist:
return Response({'error': 'Payment not found or not pending'}, status=404)
def payment_method_configs(self, request):
"""List or create payment method configurations"""
from igny8_core.business.billing.models import PaymentMethodConfig
if request.method == 'GET':
configs = PaymentMethodConfig.objects.all()
data = [{
'id': c.id,
'type': c.type,
'name': c.name,
'description': c.description,
'is_enabled': c.is_enabled,
'sort_order': c.sort_order
} for c in configs]
return Response({'results': data})
else:
# Handle POST for creating new config
return Response({'error': 'Not implemented'}, status=501)
def payment_method_config(self, request, pk):
"""Get, update, or delete a payment method config"""
from igny8_core.business.billing.models import PaymentMethodConfig
try:
config = PaymentMethodConfig.objects.get(pk=pk)
if request.method == 'GET':
return Response({
'id': config.id,
'type': config.type,
'name': config.name,
'description': config.description,
'is_enabled': config.is_enabled,
'sort_order': config.sort_order
})
elif request.method in ['PATCH', 'PUT']:
# Update config
return Response({'error': 'Not implemented'}, status=501)
elif request.method == 'DELETE':
# Delete config
config.delete()
return Response({'success': True})
except PaymentMethodConfig.DoesNotExist:
return Response({'error': 'Config not found'}, status=404)
def account_payment_methods(self, request):
"""List or create account payment methods"""
from igny8_core.business.billing.models import AccountPaymentMethod
if request.method == 'GET':
methods = AccountPaymentMethod.objects.all().select_related('account')[:100]
data = [{
'id': str(m.id),
'account_name': m.account.name if m.account else 'N/A',
'type': m.type,
'display_name': m.display_name,
'is_default': m.is_default
} for m in methods]
return Response({'results': data})
else:
return Response({'error': 'Not implemented'}, status=501)
def account_payment_method(self, request, pk):
"""Get, update, or delete an account payment method"""
from igny8_core.business.billing.models import AccountPaymentMethod
try:
method = AccountPaymentMethod.objects.get(pk=pk)
if request.method == 'GET':
return Response({
'id': str(method.id),
'account_name': method.account.name if method.account else 'N/A',
'type': method.type,
'display_name': method.display_name,
'is_default': method.is_default
})
elif request.method in ['PATCH', 'PUT']:
return Response({'error': 'Not implemented'}, status=501)
elif request.method == 'DELETE':
method.delete()
return Response({'success': True})
except AccountPaymentMethod.DoesNotExist:
return Response({'error': 'Method not found'}, status=404)
def set_default_account_payment_method(self, request, pk):
"""Set an account payment method as default"""
from igny8_core.business.billing.models import AccountPaymentMethod
try:
method = AccountPaymentMethod.objects.get(pk=pk)
# Unset other defaults for this account
AccountPaymentMethod.objects.filter(account=method.account, is_default=True).update(is_default=False)
# Set this as default
method.is_default = True
method.save()
return Response({'success': True, 'message': 'Payment method set as default'})
except AccountPaymentMethod.DoesNotExist:
return Response({'error': 'Method not found'}, status=404)

View File

@@ -80,6 +80,15 @@ USE_SECURE_COOKIES = os.getenv('USE_SECURE_COOKIES', 'False').lower() == 'true'
SESSION_COOKIE_SECURE = USE_SECURE_COOKIES
CSRF_COOKIE_SECURE = USE_SECURE_COOKIES
# CRITICAL: Session isolation to prevent contamination
SESSION_COOKIE_NAME = 'igny8_sessionid' # Custom name to avoid conflicts
SESSION_COOKIE_HTTPONLY = True # Prevent JavaScript access
SESSION_COOKIE_SAMESITE = 'Strict' # Prevent cross-site cookie sharing
SESSION_COOKIE_AGE = 86400 # 24 hours
SESSION_SAVE_EVERY_REQUEST = False # Don't update session on every request (reduces DB load)
SESSION_COOKIE_PATH = '/' # Explicit path
# Don't set SESSION_COOKIE_DOMAIN - let it default to current domain for strict isolation
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'whitenoise.middleware.WhiteNoiseMiddleware',
@@ -228,39 +237,9 @@ REST_FRAMEWORK = {
# Unified API Standard v1.0: Exception handler enabled by default
# Set IGNY8_USE_UNIFIED_EXCEPTION_HANDLER=False to disable
'EXCEPTION_HANDLER': 'rest_framework.views.exception_handler' if os.getenv('IGNY8_USE_UNIFIED_EXCEPTION_HANDLER', 'True').lower() == 'false' else 'igny8_core.api.exception_handlers.custom_exception_handler',
# Rate limiting - configured but bypassed in DEBUG mode
'DEFAULT_THROTTLE_CLASSES': [
'igny8_core.api.throttles.DebugScopedRateThrottle',
],
'DEFAULT_THROTTLE_RATES': {
# AI Functions - Expensive operations (kept modest but higher to reduce false 429s)
'ai_function': '60/min',
'image_gen': '90/min',
# Content Operations
'content_write': '180/min',
'content_read': '600/min',
# Authentication
'auth': '300/min', # Login, register, password reset
'auth_strict': '120/min', # Sensitive auth operations
'auth_read': '600/min', # Read-only auth-adjacent endpoints (e.g., subscriptions, industries)
# Planner Operations
'planner': '300/min',
'planner_ai': '60/min',
# Writer Operations
'writer': '300/min',
'writer_ai': '60/min',
# System Operations
'system': '600/min',
'system_admin': '120/min',
# Billing Operations
'billing': '180/min',
'billing_admin': '60/min',
'linker': '180/min',
'optimizer': '60/min',
'integration': '600/min',
# Default fallback
'default': '600/min',
},
# Rate limiting - DISABLED
'DEFAULT_THROTTLE_CLASSES': [],
'DEFAULT_THROTTLE_RATES': {},
# OpenAPI Schema Generation (drf-spectacular)
'DEFAULT_SCHEMA_CLASS': 'drf_spectacular.openapi.AutoSchema',
}

View File

@@ -0,0 +1,18 @@
# Generated manually for adding is_internal flag to Plan model
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('igny8_core', '0012_creditpackage_alter_paymentmethodconfig_options_and_more'),
]
operations = [
migrations.AddField(
model_name='plan',
name='is_internal',
field=models.BooleanField(default=False, help_text='Internal-only plan (Free/Internal) - hidden from public plan listings'),
),
]

View File

@@ -0,0 +1,141 @@
#!/usr/bin/env python
"""
Test script to detect and reproduce session contamination bugs
Usage: docker exec igny8_backend python test_session_contamination.py
"""
import os
import django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'igny8_core.settings')
django.setup()
from django.contrib.sessions.models import Session
from django.contrib.auth import get_user_model
from django.test import RequestFactory
from django.contrib.sessions.middleware import SessionMiddleware
from igny8_core.auth.middleware import AccountContextMiddleware
from datetime import datetime, timedelta
User = get_user_model()
def test_session_isolation():
"""Test that sessions are properly isolated between users"""
print("\n=== SESSION CONTAMINATION TEST ===\n")
# Get test users
try:
developer = User.objects.get(username='developer')
scale_user = User.objects.filter(account__slug='scale-account').first()
if not scale_user:
print("⚠️ No scale account user found, creating one...")
from igny8_core.auth.models import Account
scale_account = Account.objects.filter(slug='scale-account').first()
if scale_account:
scale_user = User.objects.create_user(
username='scale_test',
email='scale@test.com',
password='testpass123',
account=scale_account,
role='owner'
)
else:
print("❌ No scale account found")
return False
print(f"✓ Developer user: {developer.username} (account: {developer.account.slug})")
print(f"✓ Scale user: {scale_user.username} (account: {scale_user.account.slug if scale_user.account else 'None'})")
except Exception as e:
print(f"❌ Failed to get test users: {e}")
return False
# Check active sessions
active_sessions = Session.objects.filter(expire_date__gte=datetime.now())
print(f"\n📊 Total active sessions: {active_sessions.count()}")
# Count sessions by user
user_sessions = {}
for session in active_sessions:
try:
data = session.get_decoded()
user_id = data.get('_auth_user_id')
if user_id:
user = User.objects.get(id=user_id)
key = f"{user.username} ({user.account.slug if user.account else 'no-account'})"
user_sessions[key] = user_sessions.get(key, 0) + 1
except:
pass
print("\n📈 Sessions by user:")
for user_key, count in sorted(user_sessions.items(), key=lambda x: x[1], reverse=True):
print(f" {user_key}: {count} sessions")
# Check for session contamination patterns
contamination_found = False
# Pattern 1: Too many sessions for one user
for user_key, count in user_sessions.items():
if count > 20:
print(f"\n⚠️ WARNING: {user_key} has {count} sessions (possible proliferation)")
contamination_found = True
# Pattern 2: Check session cookie settings
from django.conf import settings
print(f"\n🔧 Session Configuration:")
print(f" SESSION_COOKIE_NAME: {settings.SESSION_COOKIE_NAME}")
print(f" SESSION_COOKIE_DOMAIN: {getattr(settings, 'SESSION_COOKIE_DOMAIN', 'Not set (good)')}")
print(f" SESSION_COOKIE_SAMESITE: {getattr(settings, 'SESSION_COOKIE_SAMESITE', 'Not set')}")
print(f" SESSION_COOKIE_HTTPONLY: {settings.SESSION_COOKIE_HTTPONLY}")
print(f" SESSION_ENGINE: {settings.SESSION_ENGINE}")
if getattr(settings, 'SESSION_COOKIE_SAMESITE', None) != 'Strict':
print(f"\n⚠️ WARNING: SESSION_COOKIE_SAMESITE should be 'Strict' (currently: {getattr(settings, 'SESSION_COOKIE_SAMESITE', 'Not set')})")
contamination_found = True
# Test middleware isolation
print(f"\n🧪 Testing Middleware Isolation...")
factory = RequestFactory()
# Simulate two requests from different users
request1 = factory.get('/api/v1/test/')
request1.user = developer
request1.session = {}
request2 = factory.get('/api/v1/test/')
request2.user = scale_user
request2.session = {}
middleware = AccountContextMiddleware(lambda x: None)
# Process requests
middleware.process_request(request1)
middleware.process_request(request2)
# Check isolation
account1 = getattr(request1, 'account', None)
account2 = getattr(request2, 'account', None)
print(f" Request 1 account: {account1.slug if account1 else 'None'}")
print(f" Request 2 account: {account2.slug if account2 else 'None'}")
if account1 and account2 and account1.id == account2.id:
print(f"\n❌ CONTAMINATION DETECTED: Both requests have same account!")
contamination_found = True
else:
print(f"\n✓ Middleware isolation working correctly")
# Final result
if contamination_found:
print(f"\n❌ SESSION CONTAMINATION DETECTED")
print(f"\nRecommended fixes:")
print(f"1. Set SESSION_COOKIE_SAMESITE='Strict' in settings.py")
print(f"2. Clear all existing sessions: Session.objects.all().delete()")
print(f"3. Ensure users logout and re-login with fresh cookies")
return False
else:
print(f"\n✅ No contamination detected - sessions appear isolated")
return True
if __name__ == '__main__':
result = test_session_isolation()
exit(0 if result else 1)