""" ViewSets for Billing API Unified API Standard v1.0 compliant """ from rest_framework import viewsets, status, permissions from rest_framework.decorators import action from rest_framework.response import Response from django.db.models import Sum, Count, Q from django.utils import timezone from datetime import timedelta from decimal import Decimal from drf_spectacular.utils import extend_schema, extend_schema_view from igny8_core.api.base import AccountModelViewSet from igny8_core.api.pagination import CustomPageNumberPagination from igny8_core.api.response import success_response, error_response from igny8_core.api.throttles import DebugScopedRateThrottle from igny8_core.api.authentication import JWTAuthentication, CSRFExemptSessionAuthentication from igny8_core.api.permissions import IsAuthenticatedAndActive, HasTenantAccess, IsAdminOrOwner from .models import CreditTransaction, CreditUsageLog from .serializers import ( CreditTransactionSerializer, CreditUsageLogSerializer, CreditBalanceSerializer, UsageSummarySerializer, UsageLimitsSerializer ) from .services import CreditService from .exceptions import InsufficientCreditsError @extend_schema_view( list=extend_schema(tags=['Billing'], summary='Get credit balance'), ) class CreditBalanceViewSet(viewsets.ViewSet): """ ViewSet for credit balance operations Unified API Standard v1.0 compliant """ permission_classes = [IsAuthenticatedAndActive, HasTenantAccess] authentication_classes = [JWTAuthentication] throttle_scope = 'billing' throttle_classes = [DebugScopedRateThrottle] def list(self, request): """Get current credit balance and usage""" account = getattr(request, 'account', None) if not account: user = getattr(request, 'user', None) if user and user.is_authenticated: from igny8_core.auth.models import User as UserModel user = UserModel.objects.select_related('account', 'account__plan').get(id=user.id) account = user.account request.account = account if not account: return success_response(data={ 'credits': 0, 'plan_credits_per_month': 0, 'credits_used_this_month': 0, 'credits_remaining': 0, }, request=request) # Get plan credits - plan is already associated plan_credits_per_month = 0 if account.plan: plan_credits_per_month = account.plan.get_effective_credits_per_month() # Calculate credits used this month now = timezone.now() start_of_month = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) credits_used_this_month = CreditUsageLog.objects.filter( account=account, created_at__gte=start_of_month ).aggregate(total=Sum('credits_used'))['total'] or 0 credits = account.credits or 0 credits_remaining = credits data = { 'credits': credits, 'plan_credits_per_month': plan_credits_per_month, 'credits_used_this_month': credits_used_this_month, 'credits_remaining': credits_remaining, } # Validate and serialize data serializer = CreditBalanceSerializer(data=data) serializer.is_valid(raise_exception=True) return success_response(data=serializer.validated_data, request=request) @extend_schema_view( list=extend_schema(tags=['Billing']), retrieve=extend_schema(tags=['Billing']), ) class CreditUsageViewSet(AccountModelViewSet): """ ViewSet for credit usage logs Unified API Standard v1.0 compliant """ queryset = CreditUsageLog.objects.all() serializer_class = CreditUsageLogSerializer permission_classes = [IsAuthenticatedAndActive, HasTenantAccess] authentication_classes = [JWTAuthentication] pagination_class = CustomPageNumberPagination throttle_scope = 'billing' throttle_classes = [DebugScopedRateThrottle] filter_backends = [] def get_queryset(self): """Get usage logs for current account - base class handles account filtering""" queryset = super().get_queryset() # Filter by operation type operation_type = self.request.query_params.get('operation_type') if operation_type: queryset = queryset.filter(operation_type=operation_type) # Filter by date range start_date = self.request.query_params.get('start_date') end_date = self.request.query_params.get('end_date') if start_date: queryset = queryset.filter(created_at__gte=start_date) if end_date: queryset = queryset.filter(created_at__lte=end_date) return queryset.order_by('-created_at') @extend_schema(tags=['Billing'], summary='Get usage summary') @action(detail=False, methods=['get']) def summary(self, request): """Get usage summary for date range""" account = getattr(request, 'account', None) if not account: user = getattr(request, 'user', None) if user: account = getattr(user, 'account', None) if not account: return error_response( error='Account not found', status_code=status.HTTP_400_BAD_REQUEST, request=request ) # Get date range from query params start_date = request.query_params.get('start_date') end_date = request.query_params.get('end_date') # Default to current month if not provided now = timezone.now() def parse_iso_datetime(dt_str): """Parse ISO datetime string, handling Z suffix for UTC""" if not dt_str: return None # Handle Z suffix (UTC indicator) which Django's parse_datetime doesn't support if dt_str.endswith('Z'): dt_str = dt_str[:-1] + '+00:00' from django.utils.dateparse import parse_datetime return parse_datetime(dt_str) if not start_date: start_date = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) else: start_date = parse_iso_datetime(start_date) or start_date if not end_date: end_date = now else: end_date = parse_iso_datetime(end_date) or end_date # Get usage logs in date range usage_logs = CreditUsageLog.objects.filter( account=account, created_at__gte=start_date, created_at__lte=end_date ) # Filter by site if provided site_id = request.query_params.get('site_id') if site_id: try: usage_logs = usage_logs.filter(site_id=int(site_id)) except (ValueError, TypeError): pass # Calculate totals total_credits_used = usage_logs.aggregate(total=Sum('credits_used'))['total'] or 0 total_cost_usd = usage_logs.aggregate(total=Sum('cost_usd'))['total'] or Decimal('0.00') # Group by operation type by_operation = {} for operation_type, _ in CreditUsageLog.OPERATION_TYPE_CHOICES: operation_logs = usage_logs.filter(operation_type=operation_type) credits = operation_logs.aggregate(total=Sum('credits_used'))['total'] or 0 cost = operation_logs.aggregate(total=Sum('cost_usd'))['total'] or Decimal('0.00') count = operation_logs.count() if credits > 0 or count > 0: by_operation[operation_type] = { 'credits': credits, 'cost': float(cost), 'count': count } # Group by model by_model = {} model_stats = usage_logs.values('model_used').annotate( credits=Sum('credits_used'), cost=Sum('cost_usd'), count=Count('id') ).filter(model_used__isnull=False).exclude(model_used='') for stat in model_stats: model = stat['model_used'] by_model[model] = { 'credits': stat['credits'] or 0, 'cost': float(stat['cost'] or Decimal('0.00')) } data = { 'period': { 'start': start_date.isoformat() if hasattr(start_date, 'isoformat') else str(start_date), 'end': end_date.isoformat() if hasattr(end_date, 'isoformat') else str(end_date), }, 'total_credits_used': total_credits_used, 'total_cost_usd': float(total_cost_usd), 'by_operation': by_operation, 'by_model': by_model, } serializer = UsageSummarySerializer(data) return success_response(data=serializer.data, request=request) @extend_schema(tags=['Billing'], summary='Get usage limits') @action(detail=False, methods=['get'], url_path='limits', url_name='limits') def limits(self, request): """ Get account limits and credit usage statistics (Phase 0: Credit-only system). Returns account management limits and credit usage only. """ # Try multiple ways to get account account = getattr(request, 'account', None) if not account: user = getattr(request, 'user', None) if user and user.is_authenticated: # Try to get account from user - refresh from DB to ensure we have latest try: from igny8_core.auth.models import User as UserModel # Refresh user from DB to get account relationship user = UserModel.objects.select_related('account', 'account__plan').get(id=user.id) account = user.account # Also set it on request for future use request.account = account except (AttributeError, UserModel.DoesNotExist, Exception) as e: account = None if not account: # Return empty limits instead of error - frontend will show "no data" message return success_response(data={'limits': []}, request=request) plan = account.plan if not plan: # Return empty limits instead of error - allows frontend to show "no plan" message return success_response(data={'limits': []}, request=request) # Import models from igny8_core.auth.models import User, Site # Get current month boundaries now = timezone.now() start_of_month = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) # Calculate usage statistics limits_data = [] # Credit Usage (Phase 0: Credit-only system) credits_used_month = CreditUsageLog.objects.filter( account=account, created_at__gte=start_of_month ).aggregate(total=Sum('credits_used'))['total'] or 0 # Get credits by operation type cluster_credits = CreditUsageLog.objects.filter( account=account, operation_type__in=['clustering'], created_at__gte=start_of_month ).aggregate(total=Sum('credits_used'))['total'] or 0 content_credits = CreditUsageLog.objects.filter( account=account, operation_type__in=['content', 'content_generation'], created_at__gte=start_of_month ).aggregate(total=Sum('credits_used'))['total'] or 0 image_credits = CreditUsageLog.objects.filter( account=account, operation_type__in=['images', 'image_generation', 'image_prompt_extraction'], created_at__gte=start_of_month ).aggregate(total=Sum('credits_used'))['total'] or 0 idea_credits = CreditUsageLog.objects.filter( account=account, operation_type__in=['ideas', 'idea_generation'], created_at__gte=start_of_month ).aggregate(total=Sum('credits_used'))['total'] or 0 # Use included_credits from plan (Phase 0: Credit-only) plan_credits = plan.included_credits or plan.credits_per_month or 0 limits_data.extend([ { 'title': 'Monthly Credits', 'limit': plan_credits, 'used': credits_used_month, 'available': max(0, plan_credits - credits_used_month), 'unit': 'credits', 'category': 'credits', 'percentage': (credits_used_month / plan_credits * 100) if plan_credits else 0 }, { 'title': 'Current Balance', 'limit': None, # No limit - shows current balance 'used': None, 'available': account.credits, 'unit': 'credits', 'category': 'credits', 'percentage': None }, { 'title': 'Clustering Credits', 'limit': None, 'used': cluster_credits, 'available': None, 'unit': 'credits', 'category': 'credits', 'percentage': None }, { 'title': 'Content Generation Credits', 'limit': None, 'used': content_credits, 'available': None, 'unit': 'credits', 'category': 'credits', 'percentage': None }, { 'title': 'Image Generation Credits', 'limit': None, 'used': image_credits, 'available': None, 'unit': 'credits', 'category': 'credits', 'percentage': None }, { 'title': 'Idea Generation Credits', 'limit': None, 'used': idea_credits, 'available': None, 'unit': 'credits', 'category': 'credits', 'percentage': None }, ]) # Account Management Limits (kept - not operation limits) users_count = User.objects.filter(account=account).count() sites_count = Site.objects.filter(account=account).count() limits_data.extend([ { 'title': 'Users', 'limit': plan.max_users or 0, 'used': users_count, 'available': max(0, (plan.max_users or 0) - users_count), 'unit': 'users', 'category': 'account', 'percentage': (users_count / (plan.max_users or 1)) * 100 if plan.max_users else 0 }, { 'title': 'Sites', 'limit': plan.max_sites or 0, 'used': sites_count, 'available': max(0, (plan.max_sites or 0) - sites_count), 'unit': 'sites', 'category': 'account', 'percentage': (sites_count / (plan.max_sites or 1)) * 100 if plan.max_sites else 0 }, ]) # Return data directly - serializer validation not needed for read-only endpoint return success_response(data={'limits': limits_data}, request=request) @extend_schema_view( list=extend_schema(tags=['Billing']), retrieve=extend_schema(tags=['Billing']), ) class CreditTransactionViewSet(AccountModelViewSet): """ ViewSet for credit transaction history Unified API Standard v1.0 compliant """ queryset = CreditTransaction.objects.all() serializer_class = CreditTransactionSerializer permission_classes = [IsAuthenticatedAndActive, HasTenantAccess, IsAdminOrOwner] authentication_classes = [JWTAuthentication] pagination_class = CustomPageNumberPagination throttle_scope = 'billing' throttle_classes = [DebugScopedRateThrottle] def get_queryset(self): """Get transactions for current account - base class handles account filtering""" queryset = super().get_queryset() # Filter by transaction type transaction_type = self.request.query_params.get('transaction_type') if transaction_type: queryset = queryset.filter(transaction_type=transaction_type) return queryset.order_by('-created_at') class BillingOverviewViewSet(viewsets.ViewSet): """User-facing billing overview API""" permission_classes = [IsAuthenticatedAndActive] authentication_classes = [JWTAuthentication] def account_balance(self, request): """Get account balance with subscription info""" account = getattr(request, 'account', None) or request.user.account # Get subscription plan subscription_plan = 'Free' monthly_credits_included = 0 if account.plan: subscription_plan = account.plan.name monthly_credits_included = account.plan.get_effective_credits_per_month() # Calculate bonus credits (credits beyond monthly allowance) bonus_credits = max(0, account.credits - monthly_credits_included) data = { 'credits': account.credits or 0, 'subscription_plan': subscription_plan, 'monthly_credits_included': monthly_credits_included, 'bonus_credits': bonus_credits, } return Response(data) @extend_schema_view( stats=extend_schema(tags=['Admin Billing'], summary='Admin billing stats'), list_users=extend_schema(tags=['Admin Billing'], summary='List users with credit info'), adjust_credits=extend_schema(tags=['Admin Billing'], summary='Adjust user credits'), list_credit_costs=extend_schema(tags=['Admin Billing'], summary='List credit cost configurations'), update_credit_costs=extend_schema(tags=['Admin Billing'], summary='Update credit cost configurations'), ) class AdminBillingViewSet(viewsets.ViewSet): """Admin-only billing management API""" permission_classes = [IsAuthenticatedAndActive, permissions.IsAdminUser] authentication_classes = [JWTAuthentication] def stats(self, request): """Get system-wide billing statistics""" from igny8_core.auth.models import Account total_users = Account.objects.filter(status='active').count() active_users = Account.objects.filter(status='active').exclude(users__last_login__isnull=True).count() total_credits_issued = Account.objects.aggregate( total=Sum('credits') )['total'] or 0 total_credits_used = CreditUsageLog.objects.aggregate( total=Sum('credits_used') )['total'] or 0 return Response({ 'total_users': total_users, 'active_users': active_users, 'total_credits_issued': total_credits_issued, 'total_credits_used': total_credits_used, }) def list_users(self, request): """List all users with credit information""" from igny8_core.auth.models import Account from django.db.models import Q # Get search query from request search = request.query_params.get('search', '') queryset = Account.objects.filter(status='active').prefetch_related('users') # Apply search filter if search: queryset = queryset.filter( Q(user__username__icontains=search) | Q(user__email__icontains=search) ) accounts = queryset[:100] data = [] for acc in accounts: user = acc.users.first() if acc.users.exists() else None data.append({ 'id': acc.id, 'username': user.username if user else 'N/A', 'email': user.email if user else 'N/A', 'credits': acc.credits or 0, 'subscription_plan': acc.plan.name if acc.plan else 'Free', 'is_active': acc.status == 'active', 'date_joined': acc.created_at }) return Response({'results': data}) def adjust_credits(self, request, user_id): """Adjust credits for a specific user""" from igny8_core.auth.models import Account try: account = Account.objects.get(id=user_id) except Account.DoesNotExist: return Response({'error': 'User not found'}, status=404) amount = request.data.get('amount', 0) reason = request.data.get('reason', 'Admin adjustment') try: amount = int(amount) except (ValueError, TypeError): return Response({'error': 'Invalid amount'}, status=400) # Adjust credits old_balance = account.credits account.credits = (account.credits or 0) + amount account.save() # Log the adjustment CreditUsageLog.objects.create( account=account, operation_type='admin_adjustment', credits_used=-amount, # Negative for additions credits_balance_after=account.credits, details={'reason': reason, 'old_balance': old_balance, 'adjusted_by': request.user.id} ) return Response({ 'success': True, 'new_balance': account.credits, 'old_balance': old_balance, 'adjustment': amount }) def list_credit_costs(self, request): """List credit cost configurations""" from igny8_core.business.billing.models import CreditCostConfig configs = CreditCostConfig.objects.filter(is_active=True) data = [{ 'id': c.id, 'operation_type': c.operation_type, 'display_name': c.display_name, 'credits_cost': c.credits_cost, 'unit': c.unit, 'is_active': c.is_active, 'created_at': c.created_at } for c in configs] return Response({'results': data}) def update_credit_costs(self, request): """Update credit cost configurations""" from igny8_core.business.billing.models import CreditCostConfig updates = request.data.get('updates', []) for update in updates: config_id = update.get('id') new_cost = update.get('cost') if config_id and new_cost is not None: try: config = CreditCostConfig.objects.get(id=config_id) config.cost = new_cost config.save() except CreditCostConfig.DoesNotExist: continue return Response({'success': True}) def invoices(self, request): """List all invoices (admin view)""" from igny8_core.business.billing.models import Invoice invoices = Invoice.objects.all().select_related('account').order_by('-created_at')[:100] data = [{ 'id': inv.id, 'invoice_number': inv.invoice_number, 'account_name': inv.account.name if inv.account else 'N/A', 'status': inv.status, 'total_amount': str(inv.total), 'created_at': inv.created_at.isoformat() } for inv in invoices] return Response({'results': data}) def payments(self, request): """List all payments (admin view)""" from igny8_core.business.billing.models import Payment payments = Payment.objects.all().select_related('account', 'invoice').order_by('-created_at')[:100] data = [{ 'id': pay.id, 'account_name': pay.account.name if pay.account else 'N/A', 'invoice_number': pay.invoice.invoice_number if pay.invoice else 'N/A', 'amount': str(pay.amount), 'status': pay.status, 'payment_method': pay.payment_method, 'created_at': pay.created_at.isoformat() } for pay in payments] return Response({'results': data}) def pending_payments(self, request): """List pending payments awaiting approval""" from igny8_core.business.billing.models import Payment payments = Payment.objects.filter(status='pending_approval').select_related('account', 'invoice').order_by('-created_at') data = [{ 'id': pay.id, 'account_name': pay.account.name if pay.account else 'N/A', 'invoice_number': pay.invoice.invoice_number if pay.invoice else 'N/A', 'amount': str(pay.amount), 'payment_method': pay.payment_method, 'manual_reference': pay.manual_reference, 'manual_notes': pay.manual_notes, 'created_at': pay.created_at.isoformat() } for pay in payments] return Response({'results': data}) def approve_payment(self, request, pk): """Approve a pending payment - activates account, subscription, and adds credits""" from django.db import transaction from igny8_core.business.billing.models import Payment from igny8_core.modules.billing.services import CreditService import logging logger = logging.getLogger(__name__) try: with transaction.atomic(): # Get payment with related objects payment = Payment.objects.select_related( 'invoice', 'invoice__subscription', 'invoice__subscription__plan', 'account', 'account__subscription', 'account__subscription__plan', 'account__plan' ).get(pk=pk, status='pending_approval') admin_notes = request.data.get('notes', '') # 1. Update Payment status payment.status = 'succeeded' payment.processed_at = timezone.now() payment.approved_by = request.user payment.approved_at = timezone.now() if admin_notes: payment.admin_notes = admin_notes payment.save() invoice = payment.invoice account = payment.account # 2. Mark invoice as paid if invoice: invoice.status = 'paid' invoice.paid_at = timezone.now() invoice.save() # 3. Get and activate subscription subscription = None if invoice and hasattr(invoice, 'subscription') and invoice.subscription: subscription = invoice.subscription elif account and hasattr(account, 'subscription'): try: subscription = account.subscription except Exception: pass if subscription: subscription.status = 'active' subscription.external_payment_id = payment.manual_reference subscription.save(update_fields=['status', 'external_payment_id']) # 4. CRITICAL: Set account status to active account.status = 'active' account.save(update_fields=['status']) # 5. Add credits if plan has included credits credits_added = 0 try: plan = None if subscription and subscription.plan: plan = subscription.plan elif account and account.plan: plan = account.plan if plan and plan.included_credits > 0: credits_added = plan.included_credits CreditService.add_credits( account=account, amount=credits_added, transaction_type='subscription', description=f'{plan.name} plan credits - Invoice {invoice.invoice_number if invoice else "N/A"}', metadata={ 'subscription_id': subscription.id if subscription else None, 'invoice_id': invoice.id if invoice else None, 'payment_id': payment.id, 'plan_id': plan.id, 'approved_by': request.user.email } ) except Exception as credit_error: logger.error(f'Credit addition failed for payment {payment.id}: {credit_error}', exc_info=True) # Don't fail the approval if credits fail - account is still activated logger.info( f'Payment approved: Payment {payment.id}, Account {account.id} set to active, ' f'{credits_added} credits added' ) # 6. Send approval email try: from igny8_core.business.billing.services.email_service import BillingEmailService BillingEmailService.send_payment_approved_email(payment, account, subscription) except Exception as e: logger.error(f'Failed to send payment approved email: {str(e)}') return Response({ 'success': True, 'message': 'Payment approved. Account activated.', 'payment': { 'id': payment.id, 'status': payment.status, 'account_status': account.status, 'credits_added': credits_added } }) except Payment.DoesNotExist: return Response({'error': 'Payment not found or not pending'}, status=404) except Exception as e: logger.error(f'Error approving payment {pk}: {str(e)}', exc_info=True) return Response({'error': f'Failed to approve payment: {str(e)}'}, status=500) def reject_payment(self, request, pk): """Reject a pending payment""" from igny8_core.business.billing.models import Payment import logging logger = logging.getLogger(__name__) try: payment = Payment.objects.select_related('account').get(pk=pk, status='pending_approval') rejection_reason = request.data.get('reason', 'Rejected by admin') payment.status = 'failed' payment.failed_at = timezone.now() payment.failure_reason = rejection_reason payment.approved_by = request.user payment.approved_at = timezone.now() payment.admin_notes = rejection_reason payment.save() # Update account status to allow retry (if not already active) account = payment.account if account and account.status != 'active': account.status = 'pending_payment' account.save(update_fields=['status']) logger.info(f'Payment rejected: Payment {payment.id}, Reason: {rejection_reason}') # Send rejection email try: from igny8_core.business.billing.services.email_service import BillingEmailService BillingEmailService.send_payment_rejected_email(payment, account, rejection_reason) except Exception as e: logger.error(f'Failed to send payment rejected email: {str(e)}') return Response({'success': True, 'message': 'Payment rejected'}) except Payment.DoesNotExist: return Response({'error': 'Payment not found or not pending'}, status=404) def payment_method_configs(self, request): """List or create payment method configurations""" from igny8_core.business.billing.models import PaymentMethodConfig if request.method == 'GET': configs = PaymentMethodConfig.objects.all() data = [{ 'id': c.id, 'type': c.type, 'name': c.name, 'description': c.description, 'is_enabled': c.is_enabled, 'sort_order': c.sort_order } for c in configs] return Response({'results': data}) else: # Handle POST for creating new config return Response({'error': 'Not implemented'}, status=501) def payment_method_config(self, request, pk): """Get, update, or delete a payment method config""" from igny8_core.business.billing.models import PaymentMethodConfig try: config = PaymentMethodConfig.objects.get(pk=pk) if request.method == 'GET': return Response({ 'id': config.id, 'type': config.type, 'name': config.name, 'description': config.description, 'is_enabled': config.is_enabled, 'sort_order': config.sort_order }) elif request.method in ['PATCH', 'PUT']: # Update config return Response({'error': 'Not implemented'}, status=501) elif request.method == 'DELETE': # Delete config config.delete() return Response({'success': True}) except PaymentMethodConfig.DoesNotExist: return Response({'error': 'Config not found'}, status=404) def account_payment_methods(self, request): """List or create account payment methods""" from igny8_core.business.billing.models import AccountPaymentMethod if request.method == 'GET': methods = AccountPaymentMethod.objects.all().select_related('account')[:100] data = [{ 'id': str(m.id), 'account_name': m.account.name if m.account else 'N/A', 'type': m.type, 'display_name': m.display_name, 'is_default': m.is_default } for m in methods] return Response({'results': data}) else: return Response({'error': 'Not implemented'}, status=501) def account_payment_method(self, request, pk): """Get, update, or delete an account payment method""" from igny8_core.business.billing.models import AccountPaymentMethod try: method = AccountPaymentMethod.objects.get(pk=pk) if request.method == 'GET': return Response({ 'id': str(method.id), 'account_name': method.account.name if method.account else 'N/A', 'type': method.type, 'display_name': method.display_name, 'is_default': method.is_default }) elif request.method in ['PATCH', 'PUT']: return Response({'error': 'Not implemented'}, status=501) elif request.method == 'DELETE': method.delete() return Response({'success': True}) except AccountPaymentMethod.DoesNotExist: return Response({'error': 'Method not found'}, status=404) def set_default_account_payment_method(self, request, pk): """Set an account payment method as default""" from igny8_core.business.billing.models import AccountPaymentMethod try: method = AccountPaymentMethod.objects.get(pk=pk) # Unset other defaults for this account AccountPaymentMethod.objects.filter(account=method.account, is_default=True).update(is_default=False) # Set this as default method.is_default = True method.save() return Response({'success': True, 'message': 'Payment method set as default'}) except AccountPaymentMethod.DoesNotExist: return Response({'error': 'Method not found'}, status=404) @extend_schema_view( list=extend_schema(tags=['AI Models'], summary='List available AI models'), retrieve=extend_schema(tags=['AI Models'], summary='Get AI model details'), ) class AIModelConfigViewSet(viewsets.ReadOnlyModelViewSet): """ ViewSet for AI Model Configuration (Read-Only) Provides model information for frontend dropdowns and displays """ permission_classes = [IsAuthenticatedAndActive] authentication_classes = [JWTAuthentication, CSRFExemptSessionAuthentication] throttle_scope = 'billing' throttle_classes = [DebugScopedRateThrottle] pagination_class = None # No pagination for model lists lookup_field = 'model_name' def get_queryset(self): """Get AIModelConfig queryset with filters""" from igny8_core.business.billing.models import AIModelConfig queryset = AIModelConfig.objects.filter(is_active=True) # Filter by model type model_type = self.request.query_params.get('type', None) if model_type: queryset = queryset.filter(model_type=model_type) # Filter by provider provider = self.request.query_params.get('provider', None) if provider: queryset = queryset.filter(provider=provider) # Filter by default is_default = self.request.query_params.get('default', None) if is_default is not None: is_default_bool = is_default.lower() in ['true', '1', 'yes'] queryset = queryset.filter(is_default=is_default_bool) return queryset.order_by('model_type', 'model_name') def get_serializer_class(self): """Return serializer class""" from .serializers import AIModelConfigSerializer return AIModelConfigSerializer def list(self, request, *args, **kwargs): """List all available models with filters""" queryset = self.get_queryset() serializer = self.get_serializer(queryset, many=True) return success_response( data=serializer.data, message='AI models retrieved successfully' ) def retrieve(self, request, *args, **kwargs): """Get details for a specific model""" try: instance = self.get_queryset().get(model_name=kwargs.get('model_name')) serializer = self.get_serializer(instance) return success_response( data=serializer.data, message='AI model details retrieved successfully' ) except Exception as e: return error_response( message='Model not found', errors={'model_name': [str(e)]}, status_code=status.HTTP_404_NOT_FOUND ) # ============================================================================== # Site AI Budget Allocation ViewSet # ============================================================================== from rest_framework import serializers as drf_serializers class SiteAIBudgetAllocationSerializer(drf_serializers.Serializer): """Serializer for SiteAIBudgetAllocation model""" id = drf_serializers.IntegerField(read_only=True) ai_function = drf_serializers.CharField() ai_function_display = drf_serializers.SerializerMethodField() allocation_percentage = drf_serializers.IntegerField(min_value=0, max_value=100) is_enabled = drf_serializers.BooleanField() def get_ai_function_display(self, obj): display_map = { 'clustering': 'Keyword Clustering (Stage 1)', 'idea_generation': 'Ideas Generation (Stage 2)', 'content_generation': 'Content Generation (Stage 4)', 'image_prompt': 'Image Prompt Extraction (Stage 5)', 'image_generation': 'Image Generation (Stage 6)', } if hasattr(obj, 'ai_function'): return display_map.get(obj.ai_function, obj.ai_function) return display_map.get(obj.get('ai_function', ''), '') @extend_schema_view( list=extend_schema(tags=['Billing'], summary='Get AI budget allocations for a site'), create=extend_schema(tags=['Billing'], summary='Update AI budget allocations for a site'), ) class SiteAIBudgetAllocationViewSet(viewsets.ViewSet): """ ViewSet for managing Site AI Budget Allocations. GET /api/v1/billing/sites/{site_id}/ai-budget/ POST /api/v1/billing/sites/{site_id}/ai-budget/ Allows configuring what percentage of the site's credit budget can be used for each AI function during automation runs. """ permission_classes = [IsAuthenticatedAndActive, HasTenantAccess] authentication_classes = [JWTAuthentication] throttle_scope = 'billing' throttle_classes = [DebugScopedRateThrottle] def _get_site(self, site_id, request): """Get site and verify user has access""" from igny8_core.auth.models import Site try: site = Site.objects.get(id=int(site_id)) account = getattr(request, 'account', None) if account and site.account != account: return None return site except (Site.DoesNotExist, ValueError, TypeError): return None def list(self, request, site_id=None): """ Get AI budget allocations for a site. Creates default allocations if they don't exist. """ from igny8_core.business.billing.models import SiteAIBudgetAllocation site = self._get_site(site_id, request) if not site: return error_response( message='Site not found or access denied', errors=None, status_code=status.HTTP_404_NOT_FOUND ) account = getattr(request, 'account', None) or site.account # Get or create default allocations allocations = SiteAIBudgetAllocation.get_or_create_defaults_for_site(site, account) # Calculate total allocation total_percentage = sum(a.allocation_percentage for a in allocations if a.is_enabled) serializer = SiteAIBudgetAllocationSerializer(allocations, many=True) return success_response( data={ 'allocations': serializer.data, 'total_percentage': total_percentage, 'is_valid': total_percentage <= 100, }, message='AI budget allocations retrieved' ) def create(self, request, site_id=None): """ Update AI budget allocations for a site. Body: { "allocations": [ {"ai_function": "clustering", "allocation_percentage": 15, "is_enabled": true}, {"ai_function": "idea_generation", "allocation_percentage": 10, "is_enabled": true}, {"ai_function": "content_generation", "allocation_percentage": 40, "is_enabled": true}, {"ai_function": "image_prompt", "allocation_percentage": 5, "is_enabled": true}, {"ai_function": "image_generation", "allocation_percentage": 30, "is_enabled": true} ] } """ from igny8_core.business.billing.models import SiteAIBudgetAllocation site = self._get_site(site_id, request) if not site: return error_response( message='Site not found or access denied', errors=None, status_code=status.HTTP_404_NOT_FOUND ) account = getattr(request, 'account', None) or site.account allocations_data = request.data.get('allocations', []) if not allocations_data: return error_response( message='No allocations provided', errors={'allocations': ['This field is required']}, status_code=status.HTTP_400_BAD_REQUEST ) # Validate total percentage total_percentage = sum( a.get('allocation_percentage', 0) for a in allocations_data if a.get('is_enabled', True) ) if total_percentage > 100: return error_response( message='Total allocation exceeds 100%', errors={'total_percentage': [f'Total is {total_percentage}%, must be <= 100%']}, status_code=status.HTTP_400_BAD_REQUEST ) # Update or create allocations valid_functions = ['clustering', 'idea_generation', 'content_generation', 'image_prompt', 'image_generation'] updated = [] for alloc_data in allocations_data: ai_function = alloc_data.get('ai_function') if ai_function not in valid_functions: continue allocation, _ = SiteAIBudgetAllocation.objects.update_or_create( account=account, site=site, ai_function=ai_function, defaults={ 'allocation_percentage': alloc_data.get('allocation_percentage', 20), 'is_enabled': alloc_data.get('is_enabled', True), } ) updated.append(allocation) serializer = SiteAIBudgetAllocationSerializer(updated, many=True) return success_response( data={ 'allocations': serializer.data, 'total_percentage': total_percentage, }, message='AI budget allocations updated successfully' )