Files
igny8/backend/igny8_core/business/billing/views.py
2025-12-09 00:11:35 +00:00

720 lines
30 KiB
Python

"""
Billing Views - Payment confirmation and management
"""
import logging
from datetime import timedelta
from decimal import Decimal, InvalidOperation

from django.db import transaction
from django.db.models import Q
from django.http import HttpResponse
from django.utils import timezone
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.permissions import AllowAny
from rest_framework.response import Response

from igny8_core.api.response import success_response, error_response, paginated_response
from igny8_core.api.permissions import IsAdminOrOwner, IsAuthenticatedAndActive, HasTenantAccess
from igny8_core.api.base import AccountModelViewSet
from igny8_core.api.pagination import CustomPageNumberPagination
from igny8_core.auth.models import Account, Subscription
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.business.billing.services.invoice_service import InvoiceService
from igny8_core.business.billing.models import (
    CreditTransaction, Invoice, Payment, CreditPackage,
    AccountPaymentMethod, PaymentMethodConfig
)
from igny8_core.modules.billing.serializers import PaymentMethodConfigSerializer, PaymentConfirmationSerializer
logger = logging.getLogger(__name__)
class BillingViewSet(viewsets.GenericViewSet):
    """
    ViewSet for billing operations.

    The class-level permission is admin/owner only. Individual actions
    override it where declared: ``payment-methods`` is public and
    ``payments/confirm`` is open to any authenticated, active user.
    """
    permission_classes = [IsAdminOrOwner]

    @staticmethod
    def _parse_positive_amount(raw_value):
        """Return ``raw_value`` as a positive, finite Decimal, or None if it is not one."""
        try:
            parsed = Decimal(str(raw_value))
        except (InvalidOperation, ValueError, TypeError):
            return None
        # is_finite() must be checked first: ordering comparisons on NaN raise.
        if not parsed.is_finite() or parsed <= 0:
            return None
        return parsed

    @staticmethod
    def _parse_period_count(raw_value):
        """Return ``raw_value`` as an integer >= 1, or None if it is not one."""
        try:
            count = int(raw_value)
        except (TypeError, ValueError, OverflowError):
            return None
        return count if count >= 1 else None

    @action(detail=False, methods=['post'], url_path='confirm-bank-transfer')
    def confirm_bank_transfer(self, request):
        """
        Confirm a bank transfer payment and activate/renew subscription.

        Request body:
        {
            "account_id": 123,            // or "subscription_id"
            "external_payment_id": "BT-2025-001",
            "amount": "29.99",
            "payer_name": "John Doe",
            "proof_url": "https://...",
            "period_months": 1
        }

        Returns 400 when required fields are missing or when ``amount`` /
        ``period_months`` are not positive numbers, 404 when the account,
        subscription or plan cannot be found, and 500 on unexpected errors.
        """
        payload = request.data
        account_id = payload.get('account_id')
        subscription_id = payload.get('subscription_id')
        external_payment_id = payload.get('external_payment_id')
        amount = payload.get('amount')
        payer_name = payload.get('payer_name')
        proof_url = payload.get('proof_url')

        if not all([external_payment_id, amount, payer_name]):
            return error_response(
                error='external_payment_id, amount, and payer_name are required',
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request
            )
        if not account_id and not subscription_id:
            return error_response(
                error='Either account_id or subscription_id is required',
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request
            )
        # The raw amount is only recorded in metadata/logs below, but a value
        # that is not a positive number should never be confirmed.
        if self._parse_positive_amount(amount) is None:
            return error_response(
                error='amount must be a positive number',
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request
            )
        # Validated up front so malformed input yields a 400 instead of an
        # unhandled exception, and so the period can never end in the past.
        period_months = self._parse_period_count(payload.get('period_months', 1))
        if period_months is None:
            return error_response(
                error='period_months must be a positive integer',
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request
            )

        try:
            with transaction.atomic():
                # Resolve the account either directly or through the subscription.
                if account_id:
                    account = Account.objects.select_related('plan').get(id=account_id)
                    subscription = getattr(account, 'subscription', None)
                else:
                    subscription = Subscription.objects.select_related(
                        'account', 'account__plan'
                    ).get(id=subscription_id)
                    account = subscription.account
                if not account or not account.plan:
                    # Nothing has been written yet, so returning here is safe.
                    return error_response(
                        error='Account or plan not found',
                        status_code=status.HTTP_404_NOT_FOUND,
                        request=request
                    )

                # One period is 365 days for annual plans and 30 days for every
                # other billing cycle; period_months counts periods.
                now = timezone.now()
                days_per_period = 365 if account.plan.billing_cycle == 'annual' else 30
                period_end = now + timedelta(days=days_per_period * period_months)

                # Create or update subscription
                if not subscription:
                    subscription = Subscription.objects.create(
                        account=account,
                        payment_method='bank_transfer',
                        external_payment_id=external_payment_id,
                        status='active',
                        current_period_start=now,
                        current_period_end=period_end,
                        cancel_at_period_end=False
                    )
                else:
                    subscription.payment_method = 'bank_transfer'
                    subscription.external_payment_id = external_payment_id
                    subscription.status = 'active'
                    subscription.current_period_start = now
                    subscription.current_period_end = period_end
                    subscription.cancel_at_period_end = False
                    subscription.save()

                # Activate the account and reset its balance to the plan allowance.
                monthly_credits = account.plan.get_effective_credits_per_month()
                account.payment_method = 'bank_transfer'
                account.status = 'active'
                account.credits = monthly_credits
                account.save()

                # Audit trail of who confirmed what.
                CreditTransaction.objects.create(
                    account=account,
                    transaction_type='subscription',
                    amount=monthly_credits,
                    balance_after=monthly_credits,
                    description=f'Bank transfer payment confirmed: {external_payment_id}',
                    metadata={
                        'external_payment_id': external_payment_id,
                        'amount': str(amount),
                        'payer_name': payer_name,
                        'proof_url': proof_url if proof_url else '',
                        'period_months': period_months,
                        'confirmed_by': request.user.email
                    }
                )

            logger.info(
                'Bank transfer confirmed for account %s: %s, %s, %s credits added',
                account.id, external_payment_id, amount, monthly_credits
            )
            return success_response(
                data={
                    'account_id': account.id,
                    'subscription_id': subscription.id,
                    'status': 'active',
                    'credits': account.credits,
                    'period_start': subscription.current_period_start.isoformat(),
                    'period_end': subscription.current_period_end.isoformat()
                },
                message='Bank transfer confirmed successfully',
                request=request
            )
        except Account.DoesNotExist:
            return error_response(
                error='Account not found',
                status_code=status.HTTP_404_NOT_FOUND,
                request=request
            )
        except Subscription.DoesNotExist:
            return error_response(
                error='Subscription not found',
                status_code=status.HTTP_404_NOT_FOUND,
                request=request
            )
        except Exception as e:
            logger.exception('Error confirming bank transfer: %s', e)
            return error_response(
                error=f'Failed to confirm payment: {str(e)}',
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                request=request
            )

    @action(detail=False, methods=['get'], url_path='payment-methods', permission_classes=[AllowAny])
    def list_payment_methods(self, request):
        """
        Get available payment methods for a specific country.

        Query params:
            country: ISO 2-letter country code (default: '*' for global)

        Returns enabled payment methods for that country plus the global
        ('*') ones, ordered by ``sort_order``.
        """
        # A missing, empty or whitespace-only value falls back to global only.
        country = (request.query_params.get('country') or '*').strip().upper() or '*'
        methods = PaymentMethodConfig.objects.filter(
            Q(country_code=country) | Q(country_code='*'),
            is_enabled=True
        ).order_by('sort_order')
        serializer = PaymentMethodConfigSerializer(methods, many=True)
        return Response(serializer.data, status=status.HTTP_200_OK)

    @action(detail=False, methods=['post'], url_path='payments/confirm', permission_classes=[IsAuthenticatedAndActive])
    def confirm_payment(self, request):
        """
        User confirms manual payment (bank transfer or local wallet).

        Creates a Payment record with status='pending_approval' for admin review.

        Request body:
        {
            "invoice_id": 123,
            "payment_method": "bank_transfer",
            "manual_reference": "BT-20251208-12345",
            "manual_notes": "Transferred via ABC Bank",
            "amount": "29.00",
            "proof_url": "https://..."  // optional
        }

        Returns 400 on validation errors, an amount mismatch or an invoice
        that is already paid; 404 when the invoice is not found on the
        caller's account; 500 on unexpected errors.
        """
        serializer = PaymentConfirmationSerializer(data=request.data)
        if not serializer.is_valid():
            return error_response(
                error=serializer.errors,
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request
            )
        fields = serializer.validated_data
        invoice_id = fields['invoice_id']
        payment_method = fields['payment_method']
        manual_reference = fields['manual_reference']
        manual_notes = fields.get('manual_notes', '')
        amount = fields['amount']
        proof_url = fields.get('proof_url')

        try:
            # Scoping by account prevents confirming someone else's invoice.
            invoice = Invoice.objects.select_related('account').get(
                id=invoice_id,
                account=request.account
            )
            # 'paid' is the status set on approval; a settled invoice must not
            # collect further pending payments.
            if invoice.status == 'paid':
                return error_response(
                    error='Invoice has already been paid',
                    status_code=status.HTTP_400_BAD_REQUEST,
                    request=request
                )
            if amount != invoice.total:
                return error_response(
                    error=f'Amount mismatch. Invoice total is {invoice.total} {invoice.currency}',
                    status_code=status.HTTP_400_BAD_REQUEST,
                    request=request
                )

            metadata = {'submitted_by': request.user.email}
            if proof_url:
                metadata['proof_url'] = proof_url
            payment = Payment.objects.create(
                account=request.account,
                invoice=invoice,
                amount=amount,
                currency=invoice.currency,
                status='pending_approval',
                payment_method=payment_method,
                manual_reference=manual_reference,
                manual_notes=manual_notes,
                metadata=metadata
            )
            logger.info(
                'Payment confirmation submitted: Payment %s, Invoice %s, Account %s, Reference: %s',
                payment.id, invoice.invoice_number, request.account.id, manual_reference
            )
            # TODO: Send notification to admin
            # send_payment_confirmation_notification(payment)
            return success_response(
                data={
                    'payment_id': payment.id,
                    'invoice_id': invoice.id,
                    'invoice_number': invoice.invoice_number,
                    'status': 'pending_approval',
                    'amount': str(amount),
                    'currency': invoice.currency,
                    'manual_reference': manual_reference
                },
                message='Payment confirmation submitted for review. You will be notified once approved.',
                request=request
            )
        except Invoice.DoesNotExist:
            return error_response(
                error='Invoice not found or does not belong to your account',
                status_code=status.HTTP_404_NOT_FOUND,
                request=request
            )
        except Exception as e:
            logger.exception('Error confirming payment: %s', e)
            # This endpoint is open to ordinary users, so internal exception
            # text is logged but not returned to the client.
            return error_response(
                error='Failed to submit payment confirmation',
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                request=request
            )

    @action(detail=True, methods=['post'], url_path='approve', permission_classes=[IsAdminOrOwner])
    def approve_payment(self, request, pk=None):
        """
        Admin approves a manual payment.

        Atomically updates: payment status -> invoice paid -> subscription
        active -> account active -> add credits. Payments that have no
        invoice are approved without the invoice/subscription/credit steps.

        Request body:
        {
            "admin_notes": "Verified payment in bank statement"
        }

        Returns 404 when the payment does not exist, 400 when it is not
        pending approval, and 500 on unexpected errors (everything is rolled
        back in that case).
        """
        admin_notes = request.data.get('admin_notes') or ''
        try:
            with transaction.atomic():
                # Lock the payment row so two concurrent approvals (or an
                # approve racing a reject) cannot both pass the status check
                # and grant credits twice. Related rows are loaded lazily
                # afterwards rather than joined here, because row locks and
                # joins on nullable relations do not combine on all backends.
                try:
                    payment = Payment.objects.select_for_update().get(id=pk)
                except (Payment.DoesNotExist, ValueError, TypeError):
                    return error_response(
                        error='Payment not found',
                        status_code=status.HTTP_404_NOT_FOUND,
                        request=request
                    )
                if payment.status != 'pending_approval':
                    return error_response(
                        error=f'Payment is not pending approval (current status: {payment.status})',
                        status_code=status.HTTP_400_BAD_REQUEST,
                        request=request
                    )

                # Payments can be created without an invoice, so every
                # invoice-derived step below has to tolerate None.
                invoice = payment.invoice
                subscription = invoice.subscription if invoice is not None else None
                account = payment.account
                approved_at = timezone.now()

                # 1. Update Payment
                payment.status = 'succeeded'
                payment.approved_by = request.user
                payment.approved_at = approved_at
                payment.processed_at = approved_at
                payment.admin_notes = admin_notes
                payment.save(update_fields=['status', 'approved_by', 'approved_at', 'processed_at', 'admin_notes'])

                # 2. Update Invoice
                if invoice is not None:
                    invoice.status = 'paid'
                    invoice.paid_at = approved_at
                    invoice.save(update_fields=['status', 'paid_at'])

                # 3. Update Subscription
                if subscription:
                    subscription.status = 'active'
                    subscription.external_payment_id = payment.manual_reference
                    subscription.save(update_fields=['status', 'external_payment_id'])

                # 4. Update Account
                account.status = 'active'
                account.save(update_fields=['status'])

                # 5. Add Credits (if subscription has plan)
                # NOTE(review): this grants plan.included_credits while
                # confirm_bank_transfer uses get_effective_credits_per_month();
                # confirm which one is intended.
                credits_added = 0
                if subscription and subscription.plan:
                    credits_added = subscription.plan.included_credits
                    CreditService.add_credits(
                        account=account,
                        amount=credits_added,
                        transaction_type='subscription',
                        description=f'{subscription.plan.name} plan credits - Invoice {invoice.invoice_number}',
                        metadata={
                            'subscription_id': subscription.id,
                            'invoice_id': invoice.id,
                            'payment_id': payment.id,
                            'plan_id': subscription.plan.id,
                            'approved_by': request.user.email
                        }
                    )
                    # Re-read the balance in case add_credits updated the row
                    # through a different instance or a database expression.
                    account.refresh_from_db(fields=['credits'])

            invoice_number = invoice.invoice_number if invoice is not None else None
            logger.info(
                'Payment approved: Payment %s, Invoice %s, Account %s activated, %s credits added',
                payment.id, invoice_number, account.id, credits_added
            )
            # TODO: Send activation email to user
            # send_account_activated_email(account, subscription)
            return success_response(
                data={
                    'payment_id': payment.id,
                    'invoice_id': invoice.id if invoice is not None else None,
                    'invoice_number': invoice_number,
                    'account_id': account.id,
                    'account_status': account.status,
                    'subscription_status': subscription.status if subscription else None,
                    'credits_added': credits_added,
                    'total_credits': account.credits,
                    'approved_by': request.user.email,
                    'approved_at': payment.approved_at.isoformat()
                },
                message='Payment approved successfully. Account activated.',
                request=request
            )
        except Exception as e:
            logger.exception('Error approving payment: %s', e)
            return error_response(
                error=f'Failed to approve payment: {str(e)}',
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                request=request
            )

    @action(detail=True, methods=['post'], url_path='reject', permission_classes=[IsAdminOrOwner])
    def reject_payment(self, request, pk=None):
        """
        Admin rejects a manual payment.

        Request body:
        {
            "admin_notes": "Transaction reference not found in bank statement"
        }

        The reviewer and review time are stored in ``approved_by`` /
        ``approved_at``; ``admin_notes`` doubles as the failure reason.
        Returns 404 when the payment does not exist and 400 when it is not
        pending approval.
        """
        # An empty or null note falls back to the default reason as well.
        admin_notes = request.data.get('admin_notes') or 'Payment rejected by admin'
        try:
            with transaction.atomic():
                # Row lock: keeps a reject from interleaving with an approve.
                try:
                    payment = Payment.objects.select_for_update().get(id=pk)
                except (Payment.DoesNotExist, ValueError, TypeError):
                    return error_response(
                        error='Payment not found',
                        status_code=status.HTTP_404_NOT_FOUND,
                        request=request
                    )
                if payment.status != 'pending_approval':
                    return error_response(
                        error=f'Payment is not pending approval (current status: {payment.status})',
                        status_code=status.HTTP_400_BAD_REQUEST,
                        request=request
                    )
                rejected_at = timezone.now()
                payment.status = 'failed'
                payment.approved_by = request.user
                payment.approved_at = rejected_at
                payment.failed_at = rejected_at
                payment.admin_notes = admin_notes
                payment.failure_reason = admin_notes
                payment.save(update_fields=[
                    'status', 'approved_by', 'approved_at', 'failed_at', 'admin_notes', 'failure_reason'
                ])

            logger.info('Payment rejected: Payment %s, Reason: %s', payment.id, admin_notes)
            # TODO: Send rejection email to user
            # send_payment_rejected_email(payment)
            return success_response(
                data={
                    'payment_id': payment.id,
                    'status': 'failed',
                    'rejected_by': request.user.email,
                    'rejected_at': payment.approved_at.isoformat()
                },
                message='Payment rejected.',
                request=request
            )
        except Exception as e:
            logger.exception('Error rejecting payment: %s', e)
            return error_response(
                error=f'Failed to reject payment: {str(e)}',
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                request=request
            )
class InvoiceViewSet(AccountModelViewSet):
    """
    ViewSet for user-facing invoices.

    NOTE(review): if AccountModelViewSet exposes create/update/destroy,
    those actions are available on invoices too - confirm that is intended.
    """
    queryset = Invoice.objects.all().select_related('account')
    permission_classes = [IsAuthenticatedAndActive, HasTenantAccess]
    pagination_class = CustomPageNumberPagination

    @staticmethod
    def _serialize_invoice(invoice):
        """Build the JSON-ready dict shared by the list and detail endpoints."""
        return {
            'id': invoice.id,
            'invoice_number': invoice.invoice_number,
            'status': invoice.status,
            'total_amount': str(invoice.total),
            'subtotal': str(invoice.subtotal),
            'tax_amount': str(invoice.tax),
            'currency': invoice.currency,
            'invoice_date': invoice.invoice_date.isoformat(),
            'due_date': invoice.due_date.isoformat(),
            'paid_at': invoice.paid_at.isoformat() if invoice.paid_at else None,
            'line_items': invoice.line_items,
            'billing_email': invoice.billing_email,
            'notes': invoice.notes,
            'created_at': invoice.created_at.isoformat(),
        }

    def _find_invoice(self, pk):
        """Return the caller-visible invoice with this pk, or None.

        A pk that cannot be converted to the key type (e.g. a non-numeric
        string in the URL) is treated as "not found" instead of surfacing
        as an unhandled error.
        """
        try:
            return self.get_queryset().get(pk=pk)
        except (Invoice.DoesNotExist, ValueError, TypeError):
            return None

    def get_queryset(self):
        """Filter invoices by account, newest first."""
        queryset = super().get_queryset()
        # NOTE(review): when the request carries no account, only the base
        # class's own filtering applies - verify that is restrictive enough.
        if hasattr(self.request, 'account') and self.request.account:
            queryset = queryset.filter(account=self.request.account)
        return queryset.order_by('-invoice_date', '-created_at')

    def list(self, request):
        """List invoices for current account, optionally filtered by ``status``."""
        queryset = self.get_queryset()
        status_param = request.query_params.get('status')
        if status_param:
            queryset = queryset.filter(status=status_param)

        paginator = self.pagination_class()
        page = paginator.paginate_queryset(queryset, request)
        if page is None:
            # Pagination is disabled: the paginator has no page object, so
            # serialize the whole queryset and report it as a single page.
            results = [self._serialize_invoice(invoice) for invoice in queryset]
            payload = {'count': len(results), 'next': None, 'previous': None, 'results': results}
        else:
            payload = {
                'count': paginator.page.paginator.count,
                'next': paginator.get_next_link(),
                'previous': paginator.get_previous_link(),
                'results': [self._serialize_invoice(invoice) for invoice in page],
            }
        return paginated_response(payload, request=request)

    def retrieve(self, request, pk=None):
        """Get invoice detail; 404 when it is not visible to the caller."""
        invoice = self._find_invoice(pk)
        if invoice is None:
            return error_response(error='Invoice not found', status_code=404, request=request)
        return success_response(data=self._serialize_invoice(invoice), request=request)

    @action(detail=True, methods=['get'])
    def download_pdf(self, request, pk=None):
        """Download invoice PDF as an attachment; 404 when the invoice is not found."""
        invoice = self._find_invoice(pk)
        if invoice is None:
            return error_response(error='Invoice not found', status_code=404, request=request)
        pdf_bytes = InvoiceService.generate_pdf(invoice)
        response = HttpResponse(pdf_bytes, content_type='application/pdf')
        response['Content-Disposition'] = f'attachment; filename="invoice-{invoice.invoice_number}.pdf"'
        return response
class PaymentViewSet(AccountModelViewSet):
    """ViewSet for user-facing payments"""
    queryset = Payment.objects.all().select_related('account', 'invoice')
    permission_classes = [IsAuthenticatedAndActive, HasTenantAccess]
    pagination_class = CustomPageNumberPagination

    @staticmethod
    def _serialize_payment(payment):
        """Build the JSON-ready dict for one payment row."""
        return {
            'id': payment.id,
            'invoice_id': payment.invoice_id,
            'invoice_number': payment.invoice.invoice_number if payment.invoice else None,
            'amount': str(payment.amount),
            'currency': payment.currency,
            'status': payment.status,
            'payment_method': payment.payment_method,
            'created_at': payment.created_at.isoformat(),
            'processed_at': payment.processed_at.isoformat() if payment.processed_at else None,
            'manual_reference': payment.manual_reference,
            'manual_notes': payment.manual_notes,
        }

    def get_queryset(self):
        """Filter payments by account, newest first."""
        queryset = super().get_queryset()
        if hasattr(self.request, 'account') and self.request.account:
            queryset = queryset.filter(account=self.request.account)
        return queryset.order_by('-created_at')

    def list(self, request):
        """List payments for current account.

        Optional query params: ``status`` and ``invoice_id``. A malformed
        ``invoice_id`` yields a 400.
        """
        queryset = self.get_queryset()
        status_param = request.query_params.get('status')
        if status_param:
            queryset = queryset.filter(status=status_param)
        invoice_id = request.query_params.get('invoice_id')
        if invoice_id:
            # The ORM rejects values it cannot convert to the key type when
            # the filter is built; report that as a client error.
            try:
                queryset = queryset.filter(invoice_id=invoice_id)
            except (ValueError, TypeError):
                return error_response(error='Invalid invoice_id', status_code=400, request=request)

        paginator = self.pagination_class()
        page = paginator.paginate_queryset(queryset, request)
        if page is None:
            # Pagination is disabled: the paginator has no page object, so
            # serialize the whole queryset and report it as a single page.
            results = [self._serialize_payment(payment) for payment in queryset]
            payload = {'count': len(results), 'next': None, 'previous': None, 'results': results}
        else:
            payload = {
                'count': paginator.page.paginator.count,
                'next': paginator.get_next_link(),
                'previous': paginator.get_previous_link(),
                'results': [self._serialize_payment(payment) for payment in page],
            }
        return paginated_response(payload, request=request)

    @action(detail=False, methods=['post'])
    def manual(self, request):
        """Submit manual payment for approval.

        Request body: ``amount`` (required, positive number), optional
        ``invoice_id``, ``payment_method`` (default 'bank_transfer'),
        ``reference`` and ``notes``. Returns 400 for a missing or invalid
        amount, 404 when the invoice is not on the caller's account, and
        201 with the new payment id on success.
        """
        invoice_id = request.data.get('invoice_id')
        raw_amount = request.data.get('amount')
        payment_method = request.data.get('payment_method', 'bank_transfer')
        reference = request.data.get('reference', '')
        notes = request.data.get('notes', '')

        if not raw_amount:
            return error_response(error='Amount is required', status_code=400, request=request)
        # Validate before touching the ORM so a malformed value is a 400
        # rather than an unhandled error while saving.
        try:
            amount = Decimal(str(raw_amount))
        except (InvalidOperation, ValueError, TypeError):
            amount = None
        if amount is None or not amount.is_finite() or amount <= 0:
            return error_response(error='Amount must be a positive number', status_code=400, request=request)

        account = request.account
        invoice = None
        if invoice_id:
            try:
                invoice = Invoice.objects.get(id=invoice_id, account=account)
            except (Invoice.DoesNotExist, ValueError, TypeError):
                return error_response(error='Invoice not found', status_code=404, request=request)

        payment = Payment.objects.create(
            account=account,
            invoice=invoice,
            amount=amount,
            # Follow the invoice's currency when there is one; 'USD' remains
            # the default for payments not tied to an invoice.
            currency=invoice.currency if invoice is not None else 'USD',
            payment_method=payment_method,
            status='pending_approval',
            manual_reference=reference,
            manual_notes=notes,
        )
        return success_response(
            data={'id': payment.id, 'status': payment.status},
            message='Manual payment submitted for approval',
            status_code=201,
            request=request
        )
class CreditPackageViewSet(viewsets.ReadOnlyModelViewSet):
    """ViewSet for credit packages (read-only for users)"""
    queryset = CreditPackage.objects.filter(is_active=True).order_by('sort_order')
    permission_classes = [IsAuthenticatedAndActive]
    pagination_class = CustomPageNumberPagination

    @staticmethod
    def _serialize_package(package):
        """Build the JSON-ready dict for one credit package."""
        return {
            'id': package.id,
            'name': package.name,
            'slug': package.slug,
            'credits': package.credits,
            'price': str(package.price),
            'discount_percentage': package.discount_percentage,
            'is_featured': package.is_featured,
            'description': package.description,
            # Exposed under a different name than the model field.
            'display_order': package.sort_order,
        }

    def list(self, request):
        """List available (active) credit packages in ``sort_order``."""
        queryset = self.get_queryset()
        paginator = self.pagination_class()
        page = paginator.paginate_queryset(queryset, request)
        if page is None:
            # Pagination is disabled: the paginator has no page object, so
            # serialize the whole queryset and report it as a single page.
            results = [self._serialize_package(package) for package in queryset]
            payload = {'count': len(results), 'next': None, 'previous': None, 'results': results}
        else:
            payload = {
                'count': paginator.page.paginator.count,
                'next': paginator.get_next_link(),
                'previous': paginator.get_previous_link(),
                'results': [self._serialize_package(package) for package in page],
            }
        return paginated_response(payload, request=request)
class AccountPaymentMethodViewSet(AccountModelViewSet):
    """ViewSet for account payment methods"""
    queryset = AccountPaymentMethod.objects.all()
    permission_classes = [IsAuthenticatedAndActive, HasTenantAccess]
    pagination_class = CustomPageNumberPagination

    @staticmethod
    def _serialize_method(method):
        """Build the JSON-ready dict for one account payment method."""
        return {
            'id': str(method.id),
            'type': method.type,
            'display_name': method.display_name,
            'is_default': method.is_default,
            # Objects without an is_enabled attribute are reported as enabled.
            'is_enabled': getattr(method, 'is_enabled', True),
            'instructions': method.instructions,
        }

    def get_queryset(self):
        """Filter payment methods by account, default method first."""
        queryset = super().get_queryset()
        if hasattr(self.request, 'account') and self.request.account:
            queryset = queryset.filter(account=self.request.account)
        return queryset.order_by('-is_default', 'type')

    def list(self, request):
        """List payment methods for current account."""
        queryset = self.get_queryset()
        paginator = self.pagination_class()
        page = paginator.paginate_queryset(queryset, request)
        if page is None:
            # Pagination is disabled: the paginator has no page object, so
            # serialize the whole queryset and report it as a single page.
            results = [self._serialize_method(method) for method in queryset]
            payload = {'count': len(results), 'next': None, 'previous': None, 'results': results}
        else:
            payload = {
                'count': paginator.page.paginator.count,
                'next': paginator.get_next_link(),
                'previous': paginator.get_previous_link(),
                'results': [self._serialize_method(method) for method in page],
            }
        return paginated_response(payload, request=request)