Files
igny8/backend/igny8_core/business/billing/views.py
IGNY8 VPS (Salman) 6e2101d019 feat: add Usage Limits Panel component with usage tracking and visual indicators for limits
style: implement custom color schemes and gradients for account section, enhancing visual hierarchy
2025-12-12 13:15:15 +00:00

910 lines
39 KiB
Python

"""
Billing Views - Payment confirmation and management
"""
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import AllowAny
from django.db import transaction
from django.db.models import Q
from django.utils import timezone
from django.http import HttpResponse
from datetime import timedelta
from igny8_core.api.response import success_response, error_response, paginated_response
from igny8_core.api.permissions import IsAdminOrOwner, IsAuthenticatedAndActive, HasTenantAccess
from igny8_core.api.base import AccountModelViewSet
from igny8_core.api.pagination import CustomPageNumberPagination
from igny8_core.auth.models import Account, Subscription
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.business.billing.services.invoice_service import InvoiceService
from igny8_core.business.billing.models import (
CreditTransaction, Invoice, Payment, CreditPackage,
AccountPaymentMethod, PaymentMethodConfig
)
from igny8_core.modules.billing.serializers import PaymentMethodConfigSerializer, PaymentConfirmationSerializer
import logging
logger = logging.getLogger(__name__)
class BillingViewSet(viewsets.GenericViewSet):
"""
ViewSet for billing operations (admin-only).
"""
permission_classes = [IsAdminOrOwner]
def get_permissions(self):
"""
Allow action-level permissions to override class-level permissions.
"""
# Try to get permission_classes from the action
try:
# DRF stores action permission_classes in the view method
return [permission() for permission in self.permission_classes]
except Exception:
return super().get_permissions()
@action(detail=False, methods=['post'], url_path='confirm-bank-transfer')
def confirm_bank_transfer(self, request):
"""
Confirm a bank transfer payment and activate/renew subscription.
Request body:
{
"account_id": 123,
"external_payment_id": "BT-2025-001",
"amount": "29.99",
"payer_name": "John Doe",
"proof_url": "https://...",
"period_months": 1
}
"""
account_id = request.data.get('account_id')
subscription_id = request.data.get('subscription_id')
external_payment_id = request.data.get('external_payment_id')
amount = request.data.get('amount')
payer_name = request.data.get('payer_name')
proof_url = request.data.get('proof_url')
period_months = int(request.data.get('period_months', 1))
if not all([external_payment_id, amount, payer_name]):
return error_response(
error='external_payment_id, amount, and payer_name are required',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
if not account_id and not subscription_id:
return error_response(
error='Either account_id or subscription_id is required',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
try:
with transaction.atomic():
# Get account
if account_id:
account = Account.objects.select_related('plan').get(id=account_id)
subscription = getattr(account, 'subscription', None)
else:
subscription = Subscription.objects.select_related('account', 'account__plan').get(id=subscription_id)
account = subscription.account
if not account or not account.plan:
return error_response(
error='Account or plan not found',
status_code=status.HTTP_404_NOT_FOUND,
request=request
)
# Calculate period dates based on billing cycle
now = timezone.now()
if account.plan.billing_cycle == 'monthly':
period_end = now + timedelta(days=30 * period_months)
elif account.plan.billing_cycle == 'annual':
period_end = now + timedelta(days=365 * period_months)
else:
period_end = now + timedelta(days=30 * period_months)
# Create or update subscription
if not subscription:
subscription = Subscription.objects.create(
account=account,
payment_method='bank_transfer',
external_payment_id=external_payment_id,
status='active',
current_period_start=now,
current_period_end=period_end,
cancel_at_period_end=False
)
else:
subscription.payment_method = 'bank_transfer'
subscription.external_payment_id = external_payment_id
subscription.status = 'active'
subscription.current_period_start = now
subscription.current_period_end = period_end
subscription.cancel_at_period_end = False
subscription.save()
# Update account
account.payment_method = 'bank_transfer'
account.status = 'active'
monthly_credits = account.plan.get_effective_credits_per_month()
account.credits = monthly_credits
account.save()
# Log transaction
CreditTransaction.objects.create(
account=account,
transaction_type='subscription',
amount=monthly_credits,
balance_after=monthly_credits,
description=f'Bank transfer payment confirmed: {external_payment_id}',
metadata={
'external_payment_id': external_payment_id,
'amount': str(amount),
'payer_name': payer_name,
'proof_url': proof_url if proof_url else '',
'period_months': period_months,
'confirmed_by': request.user.email
}
)
logger.info(
f'Bank transfer confirmed for account {account.id}: '
f'{external_payment_id}, {amount}, {monthly_credits} credits added'
)
return success_response(
data={
'account_id': account.id,
'subscription_id': subscription.id,
'status': 'active',
'credits': account.credits,
'period_start': subscription.current_period_start.isoformat(),
'period_end': subscription.current_period_end.isoformat()
},
message='Bank transfer confirmed successfully',
request=request
)
except Account.DoesNotExist:
return error_response(
error='Account not found',
status_code=status.HTTP_404_NOT_FOUND,
request=request
)
except Subscription.DoesNotExist:
return error_response(
error='Subscription not found',
status_code=status.HTTP_404_NOT_FOUND,
request=request
)
except Exception as e:
logger.error(f'Error confirming bank transfer: {str(e)}', exc_info=True)
return error_response(
error=f'Failed to confirm payment: {str(e)}',
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
@action(detail=False, methods=['get'], url_path='payment-methods', permission_classes=[AllowAny])
def list_payment_methods(self, request):
"""
Get available payment methods for a specific country.
Public endpoint - only returns enabled payment methods.
Does not expose sensitive configuration details.
Query params:
country: ISO 2-letter country code (default: 'US')
Returns payment methods filtered by country.
"""
country = request.GET.get('country', 'US').upper()
# Get country-specific methods
methods = PaymentMethodConfig.objects.filter(
country_code=country,
is_enabled=True
).order_by('sort_order')
# Serialize using the proper serializer
serializer = PaymentMethodConfigSerializer(methods, many=True)
# Return in consistent format
return Response({
'success': True,
'results': serializer.data
}, status=status.HTTP_200_OK)
@action(detail=False, methods=['post'], url_path='payments/confirm', permission_classes=[IsAuthenticatedAndActive])
def confirm_payment(self, request):
"""
User confirms manual payment (bank transfer or local wallet).
Creates Payment record with status='pending_approval' for admin review.
Request body:
{
"invoice_id": 123,
"payment_method": "bank_transfer",
"manual_reference": "BT-20251208-12345",
"manual_notes": "Transferred via ABC Bank",
"amount": "29.00",
"proof_url": "https://..." // optional
}
"""
serializer = PaymentConfirmationSerializer(data=request.data)
if not serializer.is_valid():
return error_response(
error=serializer.errors,
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
invoice_id = serializer.validated_data['invoice_id']
payment_method = serializer.validated_data['payment_method']
manual_reference = serializer.validated_data['manual_reference']
manual_notes = serializer.validated_data.get('manual_notes', '')
amount = serializer.validated_data['amount']
proof_url = serializer.validated_data.get('proof_url')
try:
# Get invoice - must belong to user's account
invoice = Invoice.objects.select_related('account').get(
id=invoice_id,
account=request.account
)
# Check if payment already exists for this invoice
existing_payment = Payment.objects.filter(
invoice=invoice,
status__in=['pending_approval', 'succeeded']
).first()
if existing_payment:
if existing_payment.status == 'succeeded':
return error_response(
error='This invoice has already been paid and approved.',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
else:
return error_response(
error=f'A payment confirmation is already pending approval for this invoice (Payment ID: {existing_payment.id}).',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
# Validate amount matches invoice
if amount != invoice.total:
return error_response(
error=f'Amount mismatch. Invoice total is {invoice.total} {invoice.currency}',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
# Create payment record with pending approval status
payment = Payment.objects.create(
account=request.account,
invoice=invoice,
amount=amount,
currency=invoice.currency,
status='pending_approval',
payment_method=payment_method,
manual_reference=manual_reference,
manual_notes=manual_notes,
metadata={'proof_url': proof_url, 'submitted_by': request.user.email} if proof_url else {'submitted_by': request.user.email}
)
logger.info(
f'Payment confirmation submitted: Payment {payment.id}, '
f'Invoice {invoice.invoice_number}, Account {request.account.id}, '
f'Reference: {manual_reference}'
)
# Send email notification to user
try:
from igny8_core.business.billing.services.email_service import BillingEmailService
BillingEmailService.send_payment_confirmation_email(payment, request.account)
except Exception as e:
logger.error(f'Failed to send payment confirmation email: {str(e)}')
return success_response(
data={
'payment_id': payment.id,
'invoice_id': invoice.id,
'invoice_number': invoice.invoice_number,
'status': 'pending_approval',
'amount': str(amount),
'currency': invoice.currency,
'manual_reference': manual_reference
},
message='Payment confirmation submitted for review. You will be notified once approved.',
request=request
)
except Invoice.DoesNotExist:
return error_response(
error='Invoice not found. Please check the invoice ID or contact support.',
status_code=status.HTTP_404_NOT_FOUND,
request=request
)
except ValueError as ve:
return error_response(
error=f'Invalid amount format: {str(ve)}',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
except Exception as e:
logger.error(f'Error confirming payment: {str(e)}', exc_info=True)
return error_response(
error='An unexpected error occurred while processing your payment confirmation. Please try again or contact support.',
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
@action(detail=True, methods=['post'], url_path='approve', permission_classes=[IsAdminOrOwner])
def approve_payment(self, request, pk=None):
"""
Admin approves a manual payment.
Atomically updates: payment status → invoice paid → subscription active → account active → add credits.
Request body:
{
"admin_notes": "Verified payment in bank statement"
}
"""
admin_notes = request.data.get('admin_notes', '')
try:
with transaction.atomic():
# Get payment with all related objects to prevent N+1 queries
payment = Payment.objects.select_related(
'invoice',
'invoice__subscription',
'invoice__subscription__plan',
'account',
'account__subscription',
'account__subscription__plan',
'account__plan'
).get(id=pk)
if payment.status != 'pending_approval':
status_msg = {
'succeeded': 'This payment has already been approved and processed',
'failed': 'This payment was previously rejected and cannot be approved',
'refunded': 'This payment was refunded and cannot be re-approved'
}.get(payment.status, f'Payment has invalid status: {payment.status}')
return error_response(
error=status_msg,
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
invoice = payment.invoice
account = payment.account
# Validate invoice is still pending
if invoice.status == 'paid':
return error_response(
error='Invoice is already marked as paid. Payment cannot be approved again.',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
# Validate invoice is not void
if invoice.status == 'void':
return error_response(
error='Invoice has been voided. Payment cannot be approved for a void invoice.',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
# Validate amount matches
if payment.amount != invoice.total:
return error_response(
error=f'Payment amount ({payment.currency} {payment.amount}) does not match invoice total ({invoice.currency} {invoice.total}). Please verify the payment.',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
# Get subscription from invoice first, fallback to account.subscription
subscription = None
if invoice and hasattr(invoice, 'subscription') and invoice.subscription:
subscription = invoice.subscription
elif account and hasattr(account, 'subscription'):
try:
subscription = account.subscription
except Exception:
pass
# 1. Update Payment
payment.status = 'succeeded'
payment.approved_by = request.user
payment.approved_at = timezone.now()
payment.processed_at = timezone.now()
payment.admin_notes = admin_notes
payment.save(update_fields=['status', 'approved_by', 'approved_at', 'processed_at', 'admin_notes'])
# 2. Update Invoice
invoice.status = 'paid'
invoice.paid_at = timezone.now()
invoice.save(update_fields=['status', 'paid_at'])
# 3. Update Subscription
if subscription:
subscription.status = 'active'
subscription.external_payment_id = payment.manual_reference
subscription.save(update_fields=['status', 'external_payment_id'])
# 4. Update Account
account.status = 'active'
account.save(update_fields=['status'])
# 5. Add Credits (if subscription has plan)
credits_added = 0
try:
if subscription and subscription.plan and subscription.plan.included_credits > 0:
credits_added = subscription.plan.included_credits
# Use CreditService to add credits
CreditService.add_credits(
account=account,
amount=credits_added,
transaction_type='subscription',
description=f'{subscription.plan.name} plan credits - Invoice {invoice.invoice_number}',
metadata={
'subscription_id': subscription.id,
'invoice_id': invoice.id,
'payment_id': payment.id,
'plan_id': subscription.plan.id,
'approved_by': request.user.email
}
)
elif account and account.plan and account.plan.included_credits > 0:
# Fallback: use account plan if subscription not found
credits_added = account.plan.included_credits
CreditService.add_credits(
account=account,
amount=credits_added,
transaction_type='subscription',
description=f'{account.plan.name} plan credits - Invoice {invoice.invoice_number}',
metadata={
'invoice_id': invoice.id,
'payment_id': payment.id,
'plan_id': account.plan.id,
'approved_by': request.user.email,
'fallback': 'account_plan'
}
)
except Exception as credit_error:
# Rollback payment approval if credit addition fails
logger.error(f'Credit addition failed for payment {payment.id}: {credit_error}', exc_info=True)
raise Exception(f'Failed to add credits: {str(credit_error)}') from credit_error
logger.info(
f'Payment approved: Payment {payment.id}, Invoice {invoice.invoice_number}, '
f'Account {account.id} activated, {credits_added} credits added'
)
# Send activation email to user
try:
from igny8_core.business.billing.services.email_service import BillingEmailService
BillingEmailService.send_payment_approved_email(payment, account, subscription)
except Exception as e:
logger.error(f'Failed to send payment approved email: {str(e)}')
return success_response(
data={
'payment_id': payment.id,
'invoice_id': invoice.id,
'invoice_number': invoice.invoice_number,
'account_id': account.id,
'account_status': account.status,
'subscription_status': subscription.status if subscription else None,
'credits_added': credits_added,
'total_credits': account.credits,
'approved_by': request.user.email,
'approved_at': payment.approved_at.isoformat()
},
message='Payment approved successfully. Account activated.',
request=request
)
except Payment.DoesNotExist:
return error_response(
error='Payment not found. The payment may have been deleted or the ID is incorrect.',
status_code=status.HTTP_404_NOT_FOUND,
request=request
)
except Exception as e:
logger.error(f'Error approving payment {pk}: {str(e)}', exc_info=True)
# Provide specific error messages
error_msg = str(e)
if 'credit' in error_msg.lower():
error_msg = 'Failed to add credits to account. Payment not approved. Please check the plan configuration.'
elif 'subscription' in error_msg.lower():
error_msg = 'Failed to activate subscription. Payment not approved. Please verify subscription exists.'
else:
error_msg = f'Payment approval failed: {error_msg}'
return error_response(
error=error_msg,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
@action(detail=True, methods=['post'], url_path='reject', permission_classes=[IsAdminOrOwner])
def reject_payment(self, request, pk=None):
"""
Admin rejects a manual payment.
Request body:
{
"admin_notes": "Transaction reference not found in bank statement"
}
"""
admin_notes = request.data.get('admin_notes', 'Payment rejected by admin')
try:
payment = Payment.objects.get(id=pk)
if payment.status != 'pending_approval':
return error_response(
error=f'Payment is not pending approval (current status: {payment.status})',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
payment.status = 'failed'
payment.approved_by = request.user
payment.approved_at = timezone.now()
payment.failed_at = timezone.now()
payment.admin_notes = admin_notes
payment.failure_reason = admin_notes
payment.save(update_fields=['status', 'approved_by', 'approved_at', 'failed_at', 'admin_notes', 'failure_reason'])
# Update account status to allow retry
account = payment.account
if account.status != 'active':
account.status = 'pending_payment'
account.save(update_fields=['status'])
logger.info(f'Payment rejected: Payment {payment.id}, Reason: {admin_notes}')
# Send rejection email to user
try:
from igny8_core.business.billing.services.email_service import BillingEmailService
BillingEmailService.send_payment_rejected_email(payment, account, admin_notes)
except Exception as e:
logger.error(f'Failed to send payment rejected email: {str(e)}')
return success_response(
data={
'payment_id': payment.id,
'status': 'failed',
'rejected_by': request.user.email,
'rejected_at': payment.approved_at.isoformat()
},
message='Payment rejected.',
request=request
)
except Payment.DoesNotExist:
return error_response(
error='Payment not found. The payment may have been deleted or the ID is incorrect.',
status_code=status.HTTP_404_NOT_FOUND,
request=request
)
except Exception as e:
logger.error(f'Error rejecting payment {pk}: {str(e)}', exc_info=True)
return error_response(
error=f'Failed to reject payment. Please try again or contact technical support.',
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
class InvoiceViewSet(AccountModelViewSet):
"""ViewSet for user-facing invoices"""
queryset = Invoice.objects.all().select_related('account')
permission_classes = [IsAuthenticatedAndActive, HasTenantAccess]
pagination_class = CustomPageNumberPagination
def get_queryset(self):
"""Filter invoices by account"""
queryset = super().get_queryset()
if hasattr(self.request, 'account') and self.request.account:
queryset = queryset.filter(account=self.request.account)
return queryset.order_by('-invoice_date', '-created_at')
def list(self, request):
"""List invoices for current account"""
queryset = self.get_queryset()
# Filter by status if provided
status_param = request.query_params.get('status')
if status_param:
queryset = queryset.filter(status=status_param)
paginator = self.pagination_class()
page = paginator.paginate_queryset(queryset, request)
# Serialize invoice data
results = []
for invoice in (page if page is not None else []):
results.append({
'id': invoice.id,
'invoice_number': invoice.invoice_number,
'status': invoice.status,
'total': str(invoice.total), # Alias for compatibility
'total_amount': str(invoice.total),
'subtotal': str(invoice.subtotal),
'tax_amount': str(invoice.tax),
'currency': invoice.currency,
'invoice_date': invoice.invoice_date.isoformat(),
'due_date': invoice.due_date.isoformat(),
'paid_at': invoice.paid_at.isoformat() if invoice.paid_at else None,
'line_items': invoice.line_items,
'billing_email': invoice.billing_email,
'notes': invoice.notes,
'created_at': invoice.created_at.isoformat(),
})
return paginated_response(
{'count': paginator.page.paginator.count, 'next': paginator.get_next_link(), 'previous': paginator.get_previous_link(), 'results': results},
request=request
)
def retrieve(self, request, pk=None):
"""Get invoice detail"""
try:
invoice = self.get_queryset().get(pk=pk)
data = {
'id': invoice.id,
'invoice_number': invoice.invoice_number,
'status': invoice.status,
'total': str(invoice.total), # Alias for compatibility
'total_amount': str(invoice.total),
'subtotal': str(invoice.subtotal),
'tax_amount': str(invoice.tax),
'currency': invoice.currency,
'invoice_date': invoice.invoice_date.isoformat(),
'due_date': invoice.due_date.isoformat(),
'paid_at': invoice.paid_at.isoformat() if invoice.paid_at else None,
'line_items': invoice.line_items,
'billing_email': invoice.billing_email,
'notes': invoice.notes,
'created_at': invoice.created_at.isoformat(),
}
return success_response(data=data, request=request)
except Invoice.DoesNotExist:
return error_response(error='Invoice not found', status_code=404, request=request)
@action(detail=True, methods=['get'])
def download_pdf(self, request, pk=None):
"""Download invoice PDF"""
try:
invoice = self.get_queryset().get(pk=pk)
pdf_bytes = InvoiceService.generate_pdf(invoice)
response = HttpResponse(pdf_bytes, content_type='application/pdf')
response['Content-Disposition'] = f'attachment; filename="invoice-{invoice.invoice_number}.pdf"'
return response
except Invoice.DoesNotExist:
return error_response(error='Invoice not found', status_code=404, request=request)
class PaymentViewSet(AccountModelViewSet):
"""ViewSet for user-facing payments"""
queryset = Payment.objects.all().select_related('account', 'invoice')
permission_classes = [IsAuthenticatedAndActive, HasTenantAccess]
pagination_class = CustomPageNumberPagination
throttle_scope = 'payment_confirmation'
def get_throttles(self):
"""Apply stricter throttling to manual payment submission"""
from rest_framework.throttling import UserRateThrottle
if self.action == 'manual':
# 5 payment submissions per hour per user
class PaymentSubmissionThrottle(UserRateThrottle):
rate = '5/hour'
return [PaymentSubmissionThrottle()]
return super().get_throttles()
def get_queryset(self):
"""Filter payments by account"""
queryset = super().get_queryset()
if hasattr(self.request, 'account') and self.request.account:
queryset = queryset.filter(account=self.request.account)
return queryset.order_by('-created_at')
def list(self, request):
"""List payments for current account"""
queryset = self.get_queryset()
# Filter by status if provided
status_param = request.query_params.get('status')
if status_param:
queryset = queryset.filter(status=status_param)
# Filter by invoice if provided
invoice_id = request.query_params.get('invoice_id')
if invoice_id:
queryset = queryset.filter(invoice_id=invoice_id)
paginator = self.pagination_class()
page = paginator.paginate_queryset(queryset, request)
# Serialize payment data
results = []
for payment in (page if page is not None else []):
results.append({
'id': payment.id,
'invoice_id': payment.invoice_id,
'invoice_number': payment.invoice.invoice_number if payment.invoice else None,
'amount': str(payment.amount),
'currency': payment.currency,
'status': payment.status,
'payment_method': payment.payment_method,
'created_at': payment.created_at.isoformat(),
'processed_at': payment.processed_at.isoformat() if payment.processed_at else None,
'manual_reference': payment.manual_reference,
'manual_notes': payment.manual_notes,
# admin_notes intentionally excluded - internal only
})
return paginated_response(
{'count': paginator.page.paginator.count, 'next': paginator.get_next_link(), 'previous': paginator.get_previous_link(), 'results': results},
request=request
)
@action(detail=False, methods=['post'])
def manual(self, request):
"""Submit manual payment for approval"""
invoice_id = request.data.get('invoice_id')
amount = request.data.get('amount')
payment_method = request.data.get('payment_method', 'bank_transfer')
reference = request.data.get('reference', '')
notes = request.data.get('notes', '')
if not amount:
return error_response(error='Amount is required', status_code=400, request=request)
try:
account = request.account
invoice = None
if invoice_id:
invoice = Invoice.objects.get(id=invoice_id, account=account)
payment = Payment.objects.create(
account=account,
invoice=invoice,
amount=amount,
currency='USD',
payment_method=payment_method,
status='pending_approval',
manual_reference=reference,
manual_notes=notes,
)
return success_response(
data={'id': payment.id, 'status': payment.status},
message='Manual payment submitted for approval',
status_code=201,
request=request
)
except Invoice.DoesNotExist:
return error_response(error='Invoice not found', status_code=404, request=request)
class CreditPackageViewSet(viewsets.ReadOnlyModelViewSet):
"""ViewSet for credit packages (read-only for users)"""
queryset = CreditPackage.objects.filter(is_active=True).order_by('sort_order')
permission_classes = [IsAuthenticatedAndActive]
pagination_class = CustomPageNumberPagination
def list(self, request):
"""List available credit packages"""
queryset = self.get_queryset()
paginator = self.pagination_class()
page = paginator.paginate_queryset(queryset, request)
results = []
for package in (page if page is not None else []):
results.append({
'id': package.id,
'name': package.name,
'slug': package.slug,
'credits': package.credits,
'price': str(package.price),
'discount_percentage': package.discount_percentage,
'is_featured': package.is_featured,
'description': package.description,
'display_order': package.sort_order,
})
return paginated_response(
{'count': paginator.page.paginator.count, 'next': paginator.get_next_link(), 'previous': paginator.get_previous_link(), 'results': results},
request=request
)
class AccountPaymentMethodViewSet(AccountModelViewSet):
"""ViewSet for account payment methods"""
queryset = AccountPaymentMethod.objects.all()
permission_classes = [IsAuthenticatedAndActive, HasTenantAccess]
pagination_class = CustomPageNumberPagination
def get_queryset(self):
"""Filter payment methods by account"""
queryset = super().get_queryset()
if hasattr(self.request, 'account') and self.request.account:
queryset = queryset.filter(account=self.request.account)
return queryset.order_by('-is_default', 'type')
def list(self, request):
"""List payment methods for current account"""
queryset = self.get_queryset()
paginator = self.pagination_class()
page = paginator.paginate_queryset(queryset, request)
results = []
for method in (page if page is not None else []):
results.append({
'id': str(method.id),
'type': method.type,
'display_name': method.display_name,
'is_default': method.is_default,
'is_enabled': method.is_enabled if hasattr(method, 'is_enabled') else True,
'instructions': method.instructions,
})
return paginated_response(
{'count': paginator.page.paginator.count, 'next': paginator.get_next_link(), 'previous': paginator.get_previous_link(), 'results': results},
request=request
)
# ============================================================================
# USAGE SUMMARY (Plan Limits) - User-facing endpoint
# ============================================================================
from rest_framework.decorators import api_view, permission_classes
@api_view(['GET'])
@permission_classes([IsAuthenticatedAndActive, HasTenantAccess])
def get_usage_summary(request):
"""
Get comprehensive usage summary for current account.
Includes hard limits (sites, users, keywords, clusters) and monthly limits (ideas, words, images).
GET /api/v1/billing/usage-summary/
"""
try:
account = getattr(request, 'account', None)
if not account:
return error_response(
error='Account not found.',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
from igny8_core.business.billing.services.limit_service import LimitService
summary = LimitService.get_usage_summary(account)
return success_response(
data=summary,
message='Usage summary retrieved successfully.',
request=request
)
except Exception as e:
logger.error(f'Error getting usage summary: {str(e)}', exc_info=True)
return error_response(
error=f'Failed to retrieve usage summary: {str(e)}',
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)