feat(billing): add missing payment methods and configurations

- Added migration to include global payment method configurations for Stripe and PayPal (both disabled).
- Ensured existing payment methods like bank transfer and manual payment are correctly configured.
- Added database constraints and indexes for improved data integrity in billing models.
- Introduced foreign key relationship between CreditTransaction and Payment models.
- Added webhook configuration fields to PaymentMethodConfig for future payment gateway integrations.
- Updated SignUpFormUnified component to handle payment method selection based on user country and plan.
- Implemented PaymentHistory component to display user's payment history with status indicators.
This commit is contained in:
IGNY8 VPS (Salman)
2025-12-09 06:14:44 +00:00
parent 72d0b6b0fd
commit 4d13a57068
36 changed files with 4159 additions and 253 deletions

View File

@@ -0,0 +1,47 @@
# Generated migration to fix subscription constraints
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('igny8_core_auth', '0011_remove_subscription_payment_method'),
]
operations = [
# Add unique constraint on tenant_id at database level
migrations.RunSQL(
sql="""
CREATE UNIQUE INDEX IF NOT EXISTS igny8_subscriptions_tenant_id_unique
ON igny8_subscriptions(tenant_id);
""",
reverse_sql="""
DROP INDEX IF EXISTS igny8_subscriptions_tenant_id_unique;
"""
),
# Make plan field required (non-nullable)
# First set default plan (ID 1 - Free Plan) for any null values
migrations.RunSQL(
sql="""
UPDATE igny8_subscriptions
SET plan_id = 1
WHERE plan_id IS NULL;
""",
reverse_sql=migrations.RunSQL.noop
),
# Now alter the field to be non-nullable
migrations.AlterField(
model_name='subscription',
name='plan',
field=models.ForeignKey(
on_delete=django.db.models.deletion.PROTECT,
related_name='subscriptions',
to='igny8_core_auth.plan',
help_text='Subscription plan (tracks historical plan even if account changes plan)'
),
),
]

View File

@@ -249,8 +249,6 @@ class Subscription(models.Model):
'igny8_core_auth.Plan',
on_delete=models.PROTECT,
related_name='subscriptions',
null=True,
blank=True,
help_text='Subscription plan (tracks historical plan even if account changes plan)'
)
stripe_subscription_id = models.CharField(

View File

@@ -235,6 +235,9 @@ class SiteUserAccessSerializer(serializers.ModelSerializer):
read_only_fields = ['granted_at']
from igny8_core.business.billing.models import PAYMENT_METHOD_CHOICES
class UserSerializer(serializers.ModelSerializer):
account = AccountSerializer(read_only=True)
accessible_sites = serializers.SerializerMethodField()
@@ -267,7 +270,7 @@ class RegisterSerializer(serializers.Serializer):
)
plan_slug = serializers.CharField(max_length=50, required=False)
payment_method = serializers.ChoiceField(
choices=['stripe', 'paypal', 'bank_transfer', 'local_wallet'],
choices=[choice[0] for choice in PAYMENT_METHOD_CHOICES],
default='bank_transfer',
required=False
)
@@ -291,6 +294,21 @@ class RegisterSerializer(serializers.Serializer):
if 'plan_id' in attrs and attrs.get('plan_id') == '':
attrs['plan_id'] = None
# Validate billing fields for paid plans
plan_slug = attrs.get('plan_slug')
paid_plans = ['starter', 'growth', 'scale']
if plan_slug and plan_slug in paid_plans:
# Require billing_country for paid plans
if not attrs.get('billing_country'):
raise serializers.ValidationError({
"billing_country": "Billing country is required for paid plans."
})
# Require payment_method for paid plans
if not attrs.get('payment_method'):
raise serializers.ValidationError({
"payment_method": "Payment method is required for paid plans."
})
return attrs
def create(self, validated_data):

View File

@@ -46,12 +46,36 @@ class RegisterView(APIView):
permission_classes = [permissions.AllowAny]
def post(self, request):
from .utils import generate_access_token, generate_refresh_token, get_token_expiry
from django.contrib.auth import login
serializer = RegisterSerializer(data=request.data)
if serializer.is_valid():
user = serializer.save()
# Log the user in (create session for session authentication)
login(request, user)
# Get account from user
account = getattr(user, 'account', None)
# Generate JWT tokens
access_token = generate_access_token(user, account)
refresh_token = generate_refresh_token(user, account)
access_expires_at = get_token_expiry('access')
refresh_expires_at = get_token_expiry('refresh')
user_serializer = UserSerializer(user)
return success_response(
data={'user': user_serializer.data},
data={
'user': user_serializer.data,
'tokens': {
'access': access_token,
'refresh': refresh_token,
'access_expires_at': access_expires_at.isoformat(),
'refresh_expires_at': refresh_expires_at.isoformat(),
}
},
message='Registration successful',
status_code=status.HTTP_201_CREATED,
request=request

View File

@@ -139,13 +139,8 @@ class CreditPackageAdmin(admin.ModelAdmin):
readonly_fields = ['created_at', 'updated_at']
@admin.register(PaymentMethodConfig)
class PaymentMethodConfigAdmin(admin.ModelAdmin):
list_display = ['country_code', 'payment_method', 'is_enabled', 'display_name', 'sort_order']
list_filter = ['payment_method', 'is_enabled', 'country_code']
search_fields = ['country_code', 'display_name', 'payment_method']
readonly_fields = ['created_at', 'updated_at']
# PaymentMethodConfig admin is in modules/billing/admin.py - do not duplicate
# @admin.register(PaymentMethodConfig)
@admin.register(AccountPaymentMethod)
class AccountPaymentMethodAdmin(admin.ModelAdmin):

View File

@@ -0,0 +1,37 @@
"""
Billing configuration settings
"""
from django.conf import settings
# Payment Gateway Mode
PAYMENT_GATEWAY_MODE = getattr(settings, 'PAYMENT_GATEWAY_MODE', 'sandbox') # 'sandbox' or 'production'
# Auto-approve payments (development only)
AUTO_APPROVE_PAYMENTS = getattr(settings, 'AUTO_APPROVE_PAYMENTS', False)
# Invoice due date offset (days)
INVOICE_DUE_DATE_OFFSET = getattr(settings, 'INVOICE_DUE_DATE_OFFSET', 7)
# Grace period for payment (days)
PAYMENT_GRACE_PERIOD = getattr(settings, 'PAYMENT_GRACE_PERIOD', 7)
# Maximum payment retry attempts
MAX_PAYMENT_RETRIES = getattr(settings, 'MAX_PAYMENT_RETRIES', 3)
# Subscription renewal advance notice (days)
SUBSCRIPTION_RENEWAL_NOTICE_DAYS = getattr(settings, 'SUBSCRIPTION_RENEWAL_NOTICE_DAYS', 7)
# Default subscription plan slugs
DEFAULT_PLAN_SLUGS = {
'free': getattr(settings, 'FREE_PLAN_SLUG', 'basic-free'),
'starter': getattr(settings, 'STARTER_PLAN_SLUG', 'starter-10'),
'professional': getattr(settings, 'PROFESSIONAL_PLAN_SLUG', 'professional-100'),
'enterprise': getattr(settings, 'ENTERPRISE_PLAN_SLUG', 'enterprise-unlimited'),
}
# Credit package slugs
DEFAULT_CREDIT_PACKAGES = {
'small': getattr(settings, 'SMALL_CREDIT_PACKAGE_SLUG', 'credits-100'),
'medium': getattr(settings, 'MEDIUM_CREDIT_PACKAGE_SLUG', 'credits-500'),
'large': getattr(settings, 'LARGE_CREDIT_PACKAGE_SLUG', 'credits-1000'),
}

View File

@@ -8,6 +8,16 @@ from django.conf import settings
from igny8_core.auth.models import AccountBaseModel
# Centralized payment method choices - single source of truth
PAYMENT_METHOD_CHOICES = [
('stripe', 'Stripe (Credit/Debit Card)'),
('paypal', 'PayPal'),
('bank_transfer', 'Bank Transfer (Manual)'),
('local_wallet', 'Local Wallet (Manual)'),
('manual', 'Manual Payment'),
]
class CreditTransaction(AccountBaseModel):
"""Track all credit transactions (additions, deductions)"""
TRANSACTION_TYPE_CHOICES = [
@@ -23,11 +33,24 @@ class CreditTransaction(AccountBaseModel):
balance_after = models.IntegerField(help_text="Credit balance after this transaction")
description = models.CharField(max_length=255)
metadata = models.JSONField(default=dict, help_text="Additional context (AI call details, etc.)")
# Payment FK - preferred over reference_id string
payment = models.ForeignKey(
'billing.Payment',
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='credit_transactions',
help_text='Payment that triggered this credit transaction'
)
# Deprecated: Use payment FK instead
reference_id = models.CharField(
max_length=255,
blank=True,
help_text="Optional reference (e.g., payment id, invoice id)"
help_text="DEPRECATED: Use payment FK. Legacy reference (e.g., payment id, invoice id)"
)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
@@ -181,6 +204,16 @@ class Invoice(AccountBaseModel):
invoice_number = models.CharField(max_length=50, unique=True, db_index=True)
# Subscription relationship
subscription = models.ForeignKey(
'igny8_core_auth.Subscription',
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='invoices',
help_text='Subscription this invoice is for (if subscription-based)'
)
# Amounts
subtotal = models.DecimalField(max_digits=10, decimal_places=2, default=0)
tax = models.DecimalField(max_digits=10, decimal_places=2, default=0)
@@ -295,13 +328,8 @@ class Payment(AccountBaseModel):
('refunded', 'Refunded'), # Payment refunded (rare)
]
PAYMENT_METHOD_CHOICES = [
('stripe', 'Stripe (Credit/Debit Card)'),
('paypal', 'PayPal'),
('bank_transfer', 'Bank Transfer (Manual)'),
('local_wallet', 'Local Wallet (Manual)'),
('manual', 'Manual Payment'),
]
# Use centralized payment method choices
PAYMENT_METHOD_CHOICES = PAYMENT_METHOD_CHOICES
invoice = models.ForeignKey(Invoice, on_delete=models.CASCADE, related_name='payments')
@@ -310,7 +338,7 @@ class Payment(AccountBaseModel):
currency = models.CharField(max_length=3, default='USD')
# Status
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending', db_index=True)
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending_approval', db_index=True)
# Payment method
payment_method = models.CharField(max_length=50, choices=PAYMENT_METHOD_CHOICES, db_index=True)
@@ -366,85 +394,6 @@ class Payment(AccountBaseModel):
def __str__(self):
return f"Payment {self.id} - {self.get_payment_method_display()} - {self.amount} {self.currency}"
def save(self, *args, **kwargs):
"""
Override save to automatically update related objects when payment is approved.
When status changes to 'succeeded', automatically:
1. Mark invoice as paid
2. Activate subscription
3. Activate account
4. Add credits
"""
# Check if status is changing to succeeded
is_new = self.pk is None
old_status = None
if not is_new:
try:
old_payment = Payment.objects.get(pk=self.pk)
old_status = old_payment.status
except Payment.DoesNotExist:
pass
# If status is changing to succeeded, trigger approval workflow
if self.status == 'succeeded' and old_status != 'succeeded':
from django.utils import timezone
from django.db import transaction
from igny8_core.business.billing.services.credit_service import CreditService
# Set approval timestamp if not set
if not self.processed_at:
self.processed_at = timezone.now()
if not self.approved_at:
self.approved_at = timezone.now()
# Save payment first
super().save(*args, **kwargs)
# Then update related objects in transaction
with transaction.atomic():
# 1. Update Invoice
if self.invoice:
self.invoice.status = 'paid'
self.invoice.paid_at = timezone.now()
self.invoice.save(update_fields=['status', 'paid_at'])
# 2. Update Account (MUST be before subscription check)
if self.account:
self.account.status = 'active'
self.account.save(update_fields=['status'])
# 3. Update Subscription via account.subscription (one-to-one relationship)
try:
if hasattr(self.account, 'subscription'):
subscription = self.account.subscription
subscription.status = 'active'
subscription.external_payment_id = self.manual_reference or f'payment-{self.id}'
subscription.save(update_fields=['status', 'external_payment_id'])
# 4. Add Credits from subscription plan
if subscription.plan and subscription.plan.included_credits > 0:
CreditService.add_credits(
account=self.account,
amount=subscription.plan.included_credits,
transaction_type='subscription',
description=f'{subscription.plan.name} - Invoice {self.invoice.invoice_number}',
metadata={
'subscription_id': subscription.id,
'invoice_id': self.invoice.id,
'payment_id': self.id,
'auto_approved': True
}
)
except Exception as e:
# Log error but don't fail payment save
import logging
logger = logging.getLogger(__name__)
logger.error(f'Error updating subscription/credits for payment {self.id}: {e}', exc_info=True)
else:
# Normal save
super().save(*args, **kwargs)
class CreditPackage(models.Model):
@@ -497,12 +446,8 @@ class PaymentMethodConfig(models.Model):
Configure payment methods availability per country
Allows enabling/disabling manual payments by region
"""
PAYMENT_METHOD_CHOICES = [
('stripe', 'Stripe'),
('paypal', 'PayPal'),
('bank_transfer', 'Bank Transfer'),
('local_wallet', 'Local Wallet'),
]
# Use centralized choices
PAYMENT_METHOD_CHOICES = PAYMENT_METHOD_CHOICES
country_code = models.CharField(
max_length=2,
@@ -526,6 +471,12 @@ class PaymentMethodConfig(models.Model):
wallet_type = models.CharField(max_length=100, blank=True, help_text="E.g., PayTM, PhonePe, etc.")
wallet_id = models.CharField(max_length=255, blank=True)
# Webhook configuration (Stripe/PayPal)
webhook_url = models.URLField(blank=True, help_text="Webhook URL for payment gateway callbacks")
webhook_secret = models.CharField(max_length=255, blank=True, help_text="Webhook secret for signature verification")
api_key = models.CharField(max_length=255, blank=True, help_text="API key for payment gateway integration")
api_secret = models.CharField(max_length=255, blank=True, help_text="API secret for payment gateway integration")
# Order/priority
sort_order = models.IntegerField(default=0)
@@ -549,12 +500,8 @@ class AccountPaymentMethod(AccountBaseModel):
Account-scoped payment methods (Stripe/PayPal/manual bank/wallet).
Only metadata/refs are stored here; no secrets.
"""
PAYMENT_METHOD_CHOICES = [
('stripe', 'Stripe'),
('paypal', 'PayPal'),
('bank_transfer', 'Bank Transfer'),
('local_wallet', 'Local Wallet'),
]
# Use centralized choices
PAYMENT_METHOD_CHOICES = PAYMENT_METHOD_CHOICES
type = models.CharField(max_length=50, choices=PAYMENT_METHOD_CHOICES, db_index=True)
display_name = models.CharField(max_length=100, help_text="User-visible label", default='')

View File

@@ -0,0 +1,228 @@
"""
Email service for billing notifications
"""
from django.core.mail import send_mail
from django.template.loader import render_to_string
from django.conf import settings
import logging
logger = logging.getLogger(__name__)
class BillingEmailService:
"""Service for sending billing-related emails"""
@staticmethod
def send_payment_confirmation_email(payment, account):
"""
Send email when user submits manual payment for approval
"""
subject = f'Payment Confirmation Received - Invoice #{payment.invoice.invoice_number}'
context = {
'account_name': account.name,
'invoice_number': payment.invoice.invoice_number,
'amount': payment.amount,
'currency': payment.currency,
'payment_method': payment.get_payment_method_display(),
'manual_reference': payment.manual_reference,
'created_at': payment.created_at,
}
# Plain text message
message = f"""
Hi {account.name},
We have received your payment confirmation for Invoice #{payment.invoice.invoice_number}.
Payment Details:
- Amount: {payment.currency} {payment.amount}
- Payment Method: {payment.get_payment_method_display()}
- Reference: {payment.manual_reference}
- Submitted: {payment.created_at.strftime('%Y-%m-%d %H:%M')}
Your payment is currently under review. You will receive another email once it has been approved.
Thank you,
The Igny8 Team
"""
try:
send_mail(
subject=subject,
message=message.strip(),
from_email=settings.DEFAULT_FROM_EMAIL,
recipient_list=[account.billing_email or account.owner.email],
fail_silently=False,
)
logger.info(f'Payment confirmation email sent for Payment {payment.id}')
except Exception as e:
logger.error(f'Failed to send payment confirmation email: {str(e)}')
@staticmethod
def send_payment_approved_email(payment, account, subscription):
"""
Send email when payment is approved and account activated
"""
subject = f'Payment Approved - Account Activated'
context = {
'account_name': account.name,
'invoice_number': payment.invoice.invoice_number,
'amount': payment.amount,
'currency': payment.currency,
'plan_name': subscription.plan.name if subscription else 'N/A',
'approved_at': payment.approved_at,
}
message = f"""
Hi {account.name},
Great news! Your payment has been approved and your account is now active.
Payment Details:
- Invoice: #{payment.invoice.invoice_number}
- Amount: {payment.currency} {payment.amount}
- Plan: {subscription.plan.name if subscription else 'N/A'}
- Approved: {payment.approved_at.strftime('%Y-%m-%d %H:%M')}
You can now access all features of your plan. Log in to get started!
Dashboard: {settings.FRONTEND_URL}/dashboard
Thank you,
The Igny8 Team
"""
try:
send_mail(
subject=subject,
message=message.strip(),
from_email=settings.DEFAULT_FROM_EMAIL,
recipient_list=[account.billing_email or account.owner.email],
fail_silently=False,
)
logger.info(f'Payment approved email sent for Payment {payment.id}')
except Exception as e:
logger.error(f'Failed to send payment approved email: {str(e)}')
@staticmethod
def send_payment_rejected_email(payment, account, reason):
"""
Send email when payment is rejected
"""
subject = f'Payment Declined - Action Required'
message = f"""
Hi {account.name},
Unfortunately, we were unable to approve your payment for Invoice #{payment.invoice.invoice_number}.
Reason: {reason}
Payment Details:
- Invoice: #{payment.invoice.invoice_number}
- Amount: {payment.currency} {payment.amount}
- Reference: {payment.manual_reference}
You can retry your payment by logging into your account:
{settings.FRONTEND_URL}/billing
If you have questions, please contact our support team.
Thank you,
The Igny8 Team
"""
try:
send_mail(
subject=subject,
message=message.strip(),
from_email=settings.DEFAULT_FROM_EMAIL,
recipient_list=[account.billing_email or account.owner.email],
fail_silently=False,
)
logger.info(f'Payment rejected email sent for Payment {payment.id}')
except Exception as e:
logger.error(f'Failed to send payment rejected email: {str(e)}')
@staticmethod
def send_refund_notification(user, payment, refund_amount, reason):
"""
Send email when refund is processed
"""
subject = f'Refund Processed - Invoice #{payment.invoice.invoice_number}'
message = f"""
Hi {user.first_name or user.email},
Your refund has been processed successfully.
Refund Details:
- Invoice: #{payment.invoice.invoice_number}
- Original Amount: {payment.currency} {payment.amount}
- Refund Amount: {payment.currency} {refund_amount}
- Reason: {reason}
- Processed: {payment.refunded_at.strftime('%Y-%m-%d %H:%M')}
The refund will appear in your original payment method within 5-10 business days.
If you have any questions, please contact our support team.
Thank you,
The Igny8 Team
"""
try:
send_mail(
subject=subject,
message=message.strip(),
from_email=settings.DEFAULT_FROM_EMAIL,
recipient_list=[user.email],
fail_silently=False,
)
logger.info(f'Refund notification email sent for Payment {payment.id}')
except Exception as e:
logger.error(f'Failed to send refund notification email: {str(e)}')
@staticmethod
def send_subscription_renewal_notice(subscription, days_until_renewal):
"""
Send email reminder before subscription renewal
"""
subject = f'Subscription Renewal Reminder - {days_until_renewal} Days'
account = subscription.account
user = account.owner
message = f"""
Hi {account.name},
Your subscription will be renewed in {days_until_renewal} days.
Subscription Details:
- Plan: {subscription.plan.name}
- Renewal Date: {subscription.current_period_end.strftime('%Y-%m-%d')}
- Amount: {subscription.plan.currency} {subscription.plan.price}
Your payment method will be charged automatically on the renewal date.
To manage your subscription or update payment details:
{settings.FRONTEND_URL}/billing/subscription
Thank you,
The Igny8 Team
"""
try:
send_mail(
subject=subject,
message=message.strip(),
from_email=settings.DEFAULT_FROM_EMAIL,
recipient_list=[account.billing_email or user.email],
fail_silently=False,
)
logger.info(f'Renewal notice sent for Subscription {subscription.id}')
except Exception as e:
logger.error(f'Failed to send renewal notice: {str(e)}')

View File

@@ -17,20 +17,31 @@ class InvoiceService:
@staticmethod
def generate_invoice_number(account: Account) -> str:
"""
Generate unique invoice number
Generate unique invoice number with atomic locking to prevent duplicates
Format: INV-{ACCOUNT_ID}-{YEAR}{MONTH}-{COUNTER}
"""
from django.db import transaction
now = timezone.now()
prefix = f"INV-{account.id}-{now.year}{now.month:02d}"
# Get count of invoices for this account this month
count = Invoice.objects.filter(
account=account,
created_at__year=now.year,
created_at__month=now.month
).count()
return f"{prefix}-{count + 1:04d}"
# Use atomic transaction with SELECT FOR UPDATE to prevent race conditions
with transaction.atomic():
# Lock the invoice table for this account/month to get accurate count
count = Invoice.objects.select_for_update().filter(
account=account,
created_at__year=now.year,
created_at__month=now.month
).count()
invoice_number = f"{prefix}-{count + 1:04d}"
# Double-check uniqueness (should not happen with lock, but safety check)
while Invoice.objects.filter(invoice_number=invoice_number).exists():
count += 1
invoice_number = f"{prefix}-{count + 1:04d}"
return invoice_number
@staticmethod
@transaction.atomic
@@ -58,27 +69,42 @@ class InvoiceService:
'snapshot_date': timezone.now().isoformat()
}
# For manual payments, use configurable grace period instead of billing_period_end
from igny8_core.business.billing.config import INVOICE_DUE_DATE_OFFSET
invoice_date = timezone.now().date()
due_date = invoice_date + timedelta(days=INVOICE_DUE_DATE_OFFSET)
# Get currency based on billing country
from igny8_core.business.billing.utils.currency import get_currency_for_country, convert_usd_to_local
currency = get_currency_for_country(account.billing_country)
# Convert plan price to local currency
local_price = convert_usd_to_local(float(plan.price), account.billing_country)
invoice = Invoice.objects.create(
account=account,
subscription=subscription, # Set FK directly
invoice_number=InvoiceService.generate_invoice_number(account),
status='pending',
currency='USD',
invoice_date=timezone.now().date(),
due_date=billing_period_end.date(),
currency=currency,
invoice_date=invoice_date,
due_date=due_date,
metadata={
'billing_snapshot': billing_snapshot,
'billing_period_start': billing_period_start.isoformat(),
'billing_period_end': billing_period_end.isoformat(),
'subscription_id': subscription.id
'subscription_id': subscription.id, # Keep in metadata for backward compatibility
'usd_price': str(plan.price), # Store original USD price
'exchange_rate': str(local_price / float(plan.price) if plan.price > 0 else 1.0)
}
)
# Add line item for subscription
# Add line item for subscription with converted price
invoice.add_line_item(
description=f"{plan.name} Plan - {billing_period_start.strftime('%b %Y')}",
quantity=1,
unit_price=plan.price,
amount=plan.price
unit_price=Decimal(str(local_price)),
amount=Decimal(str(local_price))
)
invoice.calculate_totals()
@@ -95,26 +121,38 @@ class InvoiceService:
"""
Create invoice for credit package purchase
"""
from igny8_core.business.billing.config import INVOICE_DUE_DATE_OFFSET
invoice_date = timezone.now().date()
# Get currency based on billing country
from igny8_core.business.billing.utils.currency import get_currency_for_country, convert_usd_to_local
currency = get_currency_for_country(account.billing_country)
# Convert credit package price to local currency
local_price = convert_usd_to_local(float(credit_package.price), account.billing_country)
invoice = Invoice.objects.create(
account=account,
invoice_number=InvoiceService.generate_invoice_number(account),
billing_email=account.billing_email or account.users.filter(role='owner').first().email,
status='pending',
currency='USD',
invoice_date=timezone.now().date(),
due_date=timezone.now().date(),
currency=currency,
invoice_date=invoice_date,
due_date=invoice_date + timedelta(days=INVOICE_DUE_DATE_OFFSET),
metadata={
'credit_package_id': credit_package.id,
'credit_amount': credit_package.credits,
'usd_price': str(credit_package.price), # Store original USD price
'exchange_rate': str(local_price / float(credit_package.price) if credit_package.price > 0 else 1.0)
},
)
# Add line item for credit package
# Add line item for credit package with converted price
invoice.add_line_item(
description=f"{credit_package.name} - {credit_package.credits:,} Credits",
quantity=1,
unit_price=credit_package.price,
amount=credit_package.price
unit_price=Decimal(str(local_price)),
amount=Decimal(str(local_price))
)
invoice.calculate_totals()

View File

@@ -0,0 +1,246 @@
"""
Invoice PDF generation service
Generates PDF invoices for billing records
"""
from decimal import Decimal
from datetime import datetime
from io import BytesIO
from reportlab.lib import colors
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer, Image
from reportlab.lib.enums import TA_LEFT, TA_RIGHT, TA_CENTER
from django.conf import settings
import logging
logger = logging.getLogger(__name__)
class InvoicePDFGenerator:
"""Generate PDF invoices"""
@staticmethod
def generate_invoice_pdf(invoice):
"""
Generate PDF for an invoice
Args:
invoice: Invoice model instance
Returns:
BytesIO: PDF file buffer
"""
buffer = BytesIO()
# Create PDF document
doc = SimpleDocTemplate(
buffer,
pagesize=letter,
rightMargin=0.75*inch,
leftMargin=0.75*inch,
topMargin=0.75*inch,
bottomMargin=0.75*inch
)
# Container for PDF elements
elements = []
styles = getSampleStyleSheet()
# Custom styles
title_style = ParagraphStyle(
'CustomTitle',
parent=styles['Heading1'],
fontSize=24,
textColor=colors.HexColor('#1f2937'),
spaceAfter=30,
)
heading_style = ParagraphStyle(
'CustomHeading',
parent=styles['Heading2'],
fontSize=14,
textColor=colors.HexColor('#374151'),
spaceAfter=12,
)
normal_style = ParagraphStyle(
'CustomNormal',
parent=styles['Normal'],
fontSize=10,
textColor=colors.HexColor('#4b5563'),
)
# Header
elements.append(Paragraph('INVOICE', title_style))
elements.append(Spacer(1, 0.2*inch))
# Company info and invoice details side by side
company_data = [
['<b>From:</b>', f'<b>Invoice #:</b> {invoice.invoice_number}'],
[getattr(settings, 'COMPANY_NAME', 'Igny8'), f'<b>Date:</b> {invoice.created_at.strftime("%B %d, %Y")}'],
[getattr(settings, 'COMPANY_ADDRESS', ''), f'<b>Due Date:</b> {invoice.due_date.strftime("%B %d, %Y")}'],
[getattr(settings, 'COMPANY_EMAIL', settings.DEFAULT_FROM_EMAIL), f'<b>Status:</b> {invoice.status.upper()}'],
]
company_table = Table(company_data, colWidths=[3.5*inch, 3*inch])
company_table.setStyle(TableStyle([
('FONTNAME', (0, 0), (-1, -1), 'Helvetica'),
('FONTSIZE', (0, 0), (-1, -1), 10),
('TEXTCOLOR', (0, 0), (-1, -1), colors.HexColor('#4b5563')),
('VALIGN', (0, 0), (-1, -1), 'TOP'),
('ALIGN', (1, 0), (1, -1), 'RIGHT'),
]))
elements.append(company_table)
elements.append(Spacer(1, 0.3*inch))
# Bill to section
elements.append(Paragraph('<b>Bill To:</b>', heading_style))
bill_to_data = [
[invoice.account.name],
[invoice.account.owner.email],
]
if hasattr(invoice.account, 'billing_email') and invoice.account.billing_email:
bill_to_data.append([f'Billing: {invoice.account.billing_email}'])
for line in bill_to_data:
elements.append(Paragraph(line[0], normal_style))
elements.append(Spacer(1, 0.3*inch))
# Line items table
elements.append(Paragraph('<b>Items:</b>', heading_style))
# Table header
line_items_data = [
['Description', 'Quantity', 'Unit Price', 'Amount']
]
# Get line items
for item in invoice.line_items.all():
line_items_data.append([
item.description,
str(item.quantity),
f'{invoice.currency} {item.unit_price:.2f}',
f'{invoice.currency} {item.total_price:.2f}'
])
# Add subtotal, tax, total rows
line_items_data.append(['', '', '<b>Subtotal:</b>', f'<b>{invoice.currency} {invoice.subtotal:.2f}</b>'])
if invoice.tax_amount and invoice.tax_amount > 0:
line_items_data.append(['', '', f'Tax ({invoice.tax_rate}%):', f'{invoice.currency} {invoice.tax_amount:.2f}'])
if invoice.discount_amount and invoice.discount_amount > 0:
line_items_data.append(['', '', 'Discount:', f'-{invoice.currency} {invoice.discount_amount:.2f}'])
line_items_data.append(['', '', '<b>Total:</b>', f'<b>{invoice.currency} {invoice.total_amount:.2f}</b>'])
# Create table
line_items_table = Table(
line_items_data,
colWidths=[3*inch, 1*inch, 1.25*inch, 1.25*inch]
)
line_items_table.setStyle(TableStyle([
# Header row
('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#f3f4f6')),
('TEXTCOLOR', (0, 0), (-1, 0), colors.HexColor('#1f2937')),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, 0), 10),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
# Body rows
('FONTNAME', (0, 1), (-1, -4), 'Helvetica'),
('FONTSIZE', (0, 1), (-1, -4), 9),
('TEXTCOLOR', (0, 1), (-1, -4), colors.HexColor('#4b5563')),
('ROWBACKGROUNDS', (0, 1), (-1, -4), [colors.white, colors.HexColor('#f9fafb')]),
# Summary rows (last 3-4 rows)
('FONTNAME', (0, -4), (-1, -1), 'Helvetica'),
('FONTSIZE', (0, -4), (-1, -1), 9),
('ALIGN', (2, 0), (2, -1), 'RIGHT'),
('ALIGN', (3, 0), (3, -1), 'RIGHT'),
# Grid
('GRID', (0, 0), (-1, -4), 0.5, colors.HexColor('#e5e7eb')),
('LINEABOVE', (2, -4), (-1, -4), 1, colors.HexColor('#d1d5db')),
('LINEABOVE', (2, -1), (-1, -1), 2, colors.HexColor('#1f2937')),
# Padding
('TOPPADDING', (0, 0), (-1, -1), 8),
('BOTTOMPADDING', (0, 0), (-1, -1), 8),
('LEFTPADDING', (0, 0), (-1, -1), 10),
('RIGHTPADDING', (0, 0), (-1, -1), 10),
]))
elements.append(line_items_table)
elements.append(Spacer(1, 0.4*inch))
# Payment information
if invoice.status == 'paid':
elements.append(Paragraph('<b>Payment Information:</b>', heading_style))
payment = invoice.payments.filter(status='succeeded').first()
if payment:
payment_info = [
f'Payment Method: {payment.get_payment_method_display()}',
f'Paid On: {payment.processed_at.strftime("%B %d, %Y")}',
]
if payment.manual_reference:
payment_info.append(f'Reference: {payment.manual_reference}')
for line in payment_info:
elements.append(Paragraph(line, normal_style))
elements.append(Spacer(1, 0.2*inch))
# Footer / Notes
if invoice.notes:
elements.append(Spacer(1, 0.2*inch))
elements.append(Paragraph('<b>Notes:</b>', heading_style))
elements.append(Paragraph(invoice.notes, normal_style))
# Terms
elements.append(Spacer(1, 0.3*inch))
elements.append(Paragraph('<b>Terms & Conditions:</b>', heading_style))
terms = getattr(settings, 'INVOICE_TERMS', 'Payment is due within 7 days of invoice date.')
elements.append(Paragraph(terms, normal_style))
# Build PDF
doc.build(elements)
# Get PDF content
buffer.seek(0)
return buffer
@staticmethod
def save_invoice_pdf(invoice, file_path=None):
"""
Generate and save invoice PDF to file
Args:
invoice: Invoice model instance
file_path: Optional file path, defaults to media/invoices/
Returns:
str: File path where PDF was saved
"""
import os
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
# Generate PDF
pdf_buffer = InvoicePDFGenerator.generate_invoice_pdf(invoice)
# Determine file path
if not file_path:
file_path = f'invoices/{invoice.invoice_number}.pdf'
# Save to storage
saved_path = default_storage.save(file_path, ContentFile(pdf_buffer.read()))
logger.info(f'Invoice PDF saved: {saved_path}')
return saved_path

View File

@@ -0,0 +1,178 @@
"""
Payment retry mechanism for failed payments
Implements automatic retry logic with exponential backoff
"""
from datetime import timedelta
from django.utils import timezone
from celery import shared_task
from igny8_core.business.billing.models import Payment
from igny8_core.business.billing.config import MAX_PAYMENT_RETRIES
import logging
logger = logging.getLogger(__name__)
@shared_task(name='billing.retry_failed_payment')
def retry_failed_payment(payment_id: int):
"""
Retry a failed payment with exponential backoff
Args:
payment_id: Payment ID to retry
"""
try:
payment = Payment.objects.get(id=payment_id)
# Only retry failed payments
if payment.status != 'failed':
logger.info(f"Payment {payment_id} status is {payment.status}, skipping retry")
return
# Check retry count
retry_count = payment.metadata.get('retry_count', 0)
if retry_count >= MAX_PAYMENT_RETRIES:
logger.warning(f"Payment {payment_id} exceeded max retries ({MAX_PAYMENT_RETRIES})")
payment.metadata['retry_exhausted'] = True
payment.save(update_fields=['metadata'])
return
# Process retry based on payment method
if payment.payment_method == 'stripe':
success = _retry_stripe_payment(payment)
elif payment.payment_method == 'paypal':
success = _retry_paypal_payment(payment)
else:
# Manual payments cannot be automatically retried
logger.info(f"Payment {payment_id} is manual, cannot auto-retry")
return
# Update retry count
retry_count += 1
payment.metadata['retry_count'] = retry_count
payment.metadata['last_retry_at'] = timezone.now().isoformat()
if success:
payment.status = 'succeeded'
payment.processed_at = timezone.now()
payment.failure_reason = ''
logger.info(f"Payment {payment_id} retry succeeded")
else:
# Schedule next retry with exponential backoff
if retry_count < MAX_PAYMENT_RETRIES:
delay_minutes = 5 * (2 ** retry_count) # 5, 10, 20 minutes
retry_failed_payment.apply_async(
args=[payment_id],
countdown=delay_minutes * 60
)
payment.metadata['next_retry_at'] = (
timezone.now() + timedelta(minutes=delay_minutes)
).isoformat()
logger.info(f"Payment {payment_id} retry {retry_count} failed, next retry in {delay_minutes}m")
payment.save(update_fields=['status', 'processed_at', 'failure_reason', 'metadata'])
except Payment.DoesNotExist:
logger.error(f"Payment {payment_id} not found for retry")
except Exception as e:
logger.exception(f"Error retrying payment {payment_id}: {str(e)}")
def _retry_stripe_payment(payment: Payment) -> bool:
"""
Retry Stripe payment
Args:
payment: Payment instance
Returns:
True if retry succeeded, False otherwise
"""
try:
import stripe
from igny8_core.business.billing.utils.payment_gateways import get_stripe_client
stripe_client = get_stripe_client()
# Retrieve payment intent
intent = stripe_client.PaymentIntent.retrieve(payment.stripe_payment_intent_id)
# Attempt to confirm the payment intent
if intent.status == 'requires_payment_method':
# Cannot retry without new payment method
payment.failure_reason = 'Requires new payment method'
return False
elif intent.status == 'requires_action':
# Requires customer action (3D Secure)
payment.failure_reason = 'Requires customer authentication'
return False
elif intent.status == 'succeeded':
return True
return False
except Exception as e:
logger.exception(f"Stripe retry error for payment {payment.id}: {str(e)}")
payment.failure_reason = str(e)
return False
def _retry_paypal_payment(payment: Payment) -> bool:
"""
Retry PayPal payment
Args:
payment: Payment instance
Returns:
True if retry succeeded, False otherwise
"""
try:
from igny8_core.business.billing.utils.payment_gateways import get_paypal_client
paypal_client = get_paypal_client()
# Check order status
order = paypal_client.orders.get(payment.paypal_order_id)
if order.status == 'APPROVED':
# Attempt to capture
capture_response = paypal_client.orders.capture(payment.paypal_order_id)
if capture_response.status == 'COMPLETED':
payment.paypal_capture_id = capture_response.purchase_units[0].payments.captures[0].id
return True
elif order.status == 'COMPLETED':
return True
payment.failure_reason = f'PayPal order status: {order.status}'
return False
except Exception as e:
logger.exception(f"PayPal retry error for payment {payment.id}: {str(e)}")
payment.failure_reason = str(e)
return False
@shared_task(name='billing.schedule_payment_retries')
def schedule_payment_retries():
"""
Periodic task to identify failed payments and schedule retries
Should be run every 5 minutes via Celery Beat
"""
# Get failed payments from last 24 hours that haven't exhausted retries
cutoff_time = timezone.now() - timedelta(hours=24)
failed_payments = Payment.objects.filter(
status='failed',
failed_at__gte=cutoff_time,
payment_method__in=['stripe', 'paypal'] # Only auto-retry gateway payments
).exclude(
metadata__has_key='retry_exhausted'
)
for payment in failed_payments:
retry_count = payment.metadata.get('retry_count', 0)
if retry_count < MAX_PAYMENT_RETRIES:
# Schedule immediate retry if not already scheduled
if 'next_retry_at' not in payment.metadata:
retry_failed_payment.delay(payment.id)
logger.info(f"Scheduled retry for payment {payment.id}")

View File

@@ -0,0 +1,222 @@
"""
Subscription renewal tasks
Handles automatic subscription renewals with Celery
"""
from datetime import timedelta
from django.db import transaction
from django.utils import timezone
from celery import shared_task
from igny8_core.business.billing.models import Subscription, Invoice, Payment
from igny8_core.business.billing.services.invoice_service import InvoiceService
from igny8_core.business.billing.services.email_service import BillingEmailService
from igny8_core.business.billing.config import SUBSCRIPTION_RENEWAL_NOTICE_DAYS
import logging
logger = logging.getLogger(__name__)
@shared_task(name='billing.send_renewal_notices')
def send_renewal_notices():
"""
Send renewal notice emails to subscribers
Run daily to check subscriptions expiring soon
"""
notice_date = timezone.now().date() + timedelta(days=SUBSCRIPTION_RENEWAL_NOTICE_DAYS)
# Get active subscriptions expiring on notice_date
subscriptions = Subscription.objects.filter(
status='active',
current_period_end__date=notice_date
).select_related('account', 'plan', 'account__owner')
for subscription in subscriptions:
# Check if notice already sent
if subscription.metadata.get('renewal_notice_sent'):
continue
try:
BillingEmailService.send_subscription_renewal_notice(
subscription=subscription,
days_until_renewal=SUBSCRIPTION_RENEWAL_NOTICE_DAYS
)
# Mark notice as sent
subscription.metadata['renewal_notice_sent'] = True
subscription.metadata['renewal_notice_sent_at'] = timezone.now().isoformat()
subscription.save(update_fields=['metadata'])
logger.info(f"Renewal notice sent for subscription {subscription.id}")
except Exception as e:
logger.exception(f"Failed to send renewal notice for subscription {subscription.id}: {str(e)}")
@shared_task(name='billing.process_subscription_renewals')
def process_subscription_renewals():
"""
Process subscription renewals for subscriptions ending today
Run daily at midnight to renew subscriptions
"""
today = timezone.now().date()
# Get active subscriptions ending today
subscriptions = Subscription.objects.filter(
status='active',
current_period_end__date=today
).select_related('account', 'plan')
logger.info(f"Processing {subscriptions.count()} subscription renewals for {today}")
for subscription in subscriptions:
try:
renew_subscription(subscription.id)
except Exception as e:
logger.exception(f"Failed to renew subscription {subscription.id}: {str(e)}")
@shared_task(name='billing.renew_subscription')
def renew_subscription(subscription_id: int):
"""
Renew a specific subscription
Args:
subscription_id: Subscription ID to renew
"""
try:
subscription = Subscription.objects.select_related('account', 'plan').get(id=subscription_id)
if subscription.status != 'active':
logger.warning(f"Subscription {subscription_id} is not active, skipping renewal")
return
with transaction.atomic():
# Create renewal invoice
invoice = InvoiceService.create_subscription_invoice(
account=subscription.account,
subscription=subscription
)
# Attempt automatic payment if payment method on file
payment_attempted = False
# Check if account has saved payment method
if subscription.metadata.get('stripe_subscription_id'):
payment_attempted = _attempt_stripe_renewal(subscription, invoice)
elif subscription.metadata.get('paypal_subscription_id'):
payment_attempted = _attempt_paypal_renewal(subscription, invoice)
if payment_attempted:
# Payment processing will handle subscription renewal
logger.info(f"Automatic payment initiated for subscription {subscription_id}")
else:
# No automatic payment - send invoice for manual payment
logger.info(f"Manual payment required for subscription {subscription_id}")
# Mark subscription as pending renewal
subscription.status = 'pending_renewal'
subscription.metadata['renewal_invoice_id'] = invoice.id
subscription.metadata['renewal_required_at'] = timezone.now().isoformat()
subscription.save(update_fields=['status', 'metadata'])
# TODO: Send invoice email
# Clear renewal notice flag
if 'renewal_notice_sent' in subscription.metadata:
del subscription.metadata['renewal_notice_sent']
subscription.save(update_fields=['metadata'])
except Subscription.DoesNotExist:
logger.error(f"Subscription {subscription_id} not found for renewal")
except Exception as e:
logger.exception(f"Error renewing subscription {subscription_id}: {str(e)}")
def _attempt_stripe_renewal(subscription: Subscription, invoice: Invoice) -> bool:
"""
Attempt to charge Stripe subscription
Returns:
True if payment initiated, False otherwise
"""
try:
import stripe
from igny8_core.business.billing.utils.payment_gateways import get_stripe_client
stripe_client = get_stripe_client()
# Retrieve Stripe subscription
stripe_sub = stripe_client.Subscription.retrieve(
subscription.metadata['stripe_subscription_id']
)
# Create payment intent for renewal
intent = stripe_client.PaymentIntent.create(
amount=int(float(invoice.total_amount) * 100),
currency=invoice.currency.lower(),
customer=stripe_sub.customer,
payment_method=stripe_sub.default_payment_method,
off_session=True,
confirm=True,
metadata={
'invoice_id': invoice.id,
'subscription_id': subscription.id
}
)
# Create payment record
Payment.objects.create(
account=subscription.account,
invoice=invoice,
amount=invoice.total_amount,
currency=invoice.currency,
payment_method='stripe',
status='processing',
stripe_payment_intent_id=intent.id,
metadata={'renewal': True}
)
return True
except Exception as e:
logger.exception(f"Stripe renewal payment failed for subscription {subscription.id}: {str(e)}")
return False
def _attempt_paypal_renewal(subscription: Subscription, invoice: Invoice) -> bool:
"""
Attempt to charge PayPal subscription
Returns:
True if payment initiated, False otherwise
"""
try:
from igny8_core.business.billing.utils.payment_gateways import get_paypal_client
paypal_client = get_paypal_client()
# PayPal subscriptions bill automatically
# We just need to verify the subscription is still active
paypal_sub = paypal_client.subscriptions.get(
subscription.metadata['paypal_subscription_id']
)
if paypal_sub.status == 'ACTIVE':
# PayPal will charge automatically, create payment record as processing
Payment.objects.create(
account=subscription.account,
invoice=invoice,
amount=invoice.total_amount,
currency=invoice.currency,
payment_method='paypal',
status='processing',
paypal_order_id=subscription.metadata['paypal_subscription_id'],
metadata={'renewal': True}
)
return True
else:
logger.warning(f"PayPal subscription {paypal_sub.id} status: {paypal_sub.status}")
return False
except Exception as e:
logger.exception(f"PayPal renewal check failed for subscription {subscription.id}: {str(e)}")
return False

View File

@@ -0,0 +1,299 @@
"""
Concurrency tests for payment approval
Tests race conditions and concurrent approval attempts
"""
import pytest
from django.test import TestCase, TransactionTestCase
from django.contrib.auth import get_user_model
from django.db import transaction
from concurrent.futures import ThreadPoolExecutor, as_completed
from decimal import Decimal
from igny8_core.business.billing.models import (
Invoice, Payment, Subscription, Plan, Account
)
from igny8_core.business.billing.views import approve_payment
from unittest.mock import Mock
import threading
User = get_user_model()
class PaymentApprovalConcurrencyTest(TransactionTestCase):
"""Test concurrent payment approval scenarios"""
def setUp(self):
"""Set up test data"""
# Create admin user
self.admin = User.objects.create_user(
email='admin@test.com',
password='testpass123',
is_staff=True
)
# Create account
self.account = Account.objects.create(
name='Test Account',
owner=self.admin,
credit_balance=0
)
# Create plan
self.plan = Plan.objects.create(
name='Test Plan',
slug='test-plan',
price=Decimal('100.00'),
currency='USD',
billing_period='monthly',
included_credits=1000
)
# Create subscription
self.subscription = Subscription.objects.create(
account=self.account,
plan=self.plan,
status='pending_payment'
)
# Create invoice
self.invoice = Invoice.objects.create(
account=self.account,
invoice_number='INV-TEST-001',
status='pending',
subtotal=Decimal('100.00'),
total_amount=Decimal('100.00'),
currency='USD',
invoice_type='subscription'
)
# Create payment
self.payment = Payment.objects.create(
account=self.account,
invoice=self.invoice,
amount=Decimal('100.00'),
currency='USD',
payment_method='bank_transfer',
status='pending_approval',
manual_reference='TEST-REF-001'
)
def test_concurrent_approval_attempts(self):
"""
Test that only one concurrent approval succeeds
Multiple admins trying to approve same payment simultaneously
"""
num_threads = 5
success_count = 0
failure_count = 0
results = []
def approve_payment_thread(payment_id, admin_user):
"""Thread worker to approve payment"""
try:
# Simulate approval logic with transaction
with transaction.atomic():
payment = Payment.objects.select_for_update().get(id=payment_id)
# Check if already approved
if payment.status == 'succeeded':
return {'success': False, 'reason': 'already_approved'}
# Approve payment
payment.status = 'succeeded'
payment.approved_by = admin_user
payment.save()
# Update invoice
invoice = payment.invoice
invoice.status = 'paid'
invoice.save()
return {'success': True}
except Exception as e:
return {'success': False, 'error': str(e)}
# Create multiple threads attempting approval
with ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = []
for i in range(num_threads):
future = executor.submit(approve_payment_thread, self.payment.id, self.admin)
futures.append(future)
# Collect results
for future in as_completed(futures):
result = future.result()
results.append(result)
if result.get('success'):
success_count += 1
else:
failure_count += 1
# Verify only one approval succeeded
self.assertEqual(success_count, 1, "Only one approval should succeed")
self.assertEqual(failure_count, num_threads - 1, "Other attempts should fail")
# Verify final state
payment = Payment.objects.get(id=self.payment.id)
self.assertEqual(payment.status, 'succeeded')
invoice = Invoice.objects.get(id=self.invoice.id)
self.assertEqual(invoice.status, 'paid')
def test_payment_and_invoice_consistency(self):
"""
Test that payment and invoice remain consistent under concurrent operations
"""
def read_payment_invoice(payment_id):
"""Read payment and invoice status"""
payment = Payment.objects.get(id=payment_id)
invoice = Invoice.objects.get(id=payment.invoice_id)
return {
'payment_status': payment.status,
'invoice_status': invoice.status,
'consistent': (
(payment.status == 'succeeded' and invoice.status == 'paid') or
(payment.status == 'pending_approval' and invoice.status == 'pending')
)
}
# Approve payment in one thread
def approve():
with transaction.atomic():
payment = Payment.objects.select_for_update().get(id=self.payment.id)
payment.status = 'succeeded'
payment.save()
invoice = Invoice.objects.select_for_update().get(id=self.invoice.id)
invoice.status = 'paid'
invoice.save()
# Read state in parallel threads
results = []
with ThreadPoolExecutor(max_workers=10) as executor:
# Start approval
approval_future = executor.submit(approve)
# Multiple concurrent reads
read_futures = [
executor.submit(read_payment_invoice, self.payment.id)
for _ in range(20)
]
# Wait for approval
approval_future.result()
# Collect read results
for future in as_completed(read_futures):
results.append(future.result())
# All reads should show consistent state
for result in results:
self.assertTrue(
result['consistent'],
f"Inconsistent state: payment={result['payment_status']}, invoice={result['invoice_status']}"
)
def test_double_approval_prevention(self):
"""
Test that payment cannot be approved twice
"""
# First approval
with transaction.atomic():
payment = Payment.objects.select_for_update().get(id=self.payment.id)
payment.status = 'succeeded'
payment.approved_by = self.admin
payment.save()
invoice = payment.invoice
invoice.status = 'paid'
invoice.save()
# Attempt second approval
result = None
try:
with transaction.atomic():
payment = Payment.objects.select_for_update().get(id=self.payment.id)
# Should detect already approved
if payment.status == 'succeeded':
result = 'already_approved'
else:
payment.status = 'succeeded'
payment.save()
result = 'approved'
except Exception as e:
result = f'error: {str(e)}'
self.assertEqual(result, 'already_approved', "Second approval should be prevented")
class CreditTransactionConcurrencyTest(TransactionTestCase):
"""Test concurrent credit additions/deductions"""
def setUp(self):
self.admin = User.objects.create_user(
email='admin@test.com',
password='testpass123'
)
self.account = Account.objects.create(
name='Test Account',
owner=self.admin,
credit_balance=1000
)
def test_concurrent_credit_deductions(self):
"""
Test that concurrent credit deductions maintain correct balance
"""
initial_balance = self.account.credit_balance
deduction_amount = 10
num_operations = 20
def deduct_credits(account_id, amount):
"""Deduct credits atomically"""
from igny8_core.business.billing.models import CreditTransaction
with transaction.atomic():
account = Account.objects.select_for_update().get(id=account_id)
# Check sufficient balance
if account.credit_balance < amount:
return {'success': False, 'reason': 'insufficient_credits'}
# Deduct credits
account.credit_balance -= amount
new_balance = account.credit_balance
account.save()
# Record transaction
CreditTransaction.objects.create(
account=account,
transaction_type='deduction',
amount=-amount,
balance_after=new_balance,
description='Test deduction'
)
return {'success': True, 'new_balance': new_balance}
# Concurrent deductions
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [
executor.submit(deduct_credits, self.account.id, deduction_amount)
for _ in range(num_operations)
]
results = [future.result() for future in as_completed(futures)]
# Verify all succeeded
success_count = sum(1 for r in results if r.get('success'))
self.assertEqual(success_count, num_operations, "All deductions should succeed")
# Verify final balance
self.account.refresh_from_db()
expected_balance = initial_balance - (deduction_amount * num_operations)
self.assertEqual(
self.account.credit_balance,
expected_balance,
f"Final balance should be {expected_balance}"
)

View File

@@ -0,0 +1,141 @@
"""
Test payment method filtering by country
"""
from django.test import TestCase, Client
from django.contrib.auth import get_user_model
from igny8_core.business.billing.models import PaymentMethodConfig
User = get_user_model()
class PaymentMethodFilteringTest(TestCase):
"""Test payment method filtering by billing country"""
def setUp(self):
"""Create test payment method configs"""
# Global methods (available everywhere)
PaymentMethodConfig.objects.create(
country_code='*',
payment_method='stripe',
display_name='Credit/Debit Card',
is_enabled=True,
sort_order=1,
)
PaymentMethodConfig.objects.create(
country_code='*',
payment_method='paypal',
display_name='PayPal',
is_enabled=True,
sort_order=2,
)
# Country-specific methods
PaymentMethodConfig.objects.create(
country_code='GB',
payment_method='bank_transfer',
display_name='Bank Transfer (UK)',
is_enabled=True,
sort_order=3,
)
PaymentMethodConfig.objects.create(
country_code='IN',
payment_method='local_wallet',
display_name='UPI/Wallets',
is_enabled=True,
sort_order=4,
)
PaymentMethodConfig.objects.create(
country_code='PK',
payment_method='bank_transfer',
display_name='Bank Transfer (Pakistan)',
is_enabled=True,
sort_order=5,
)
# Disabled method (should not appear)
PaymentMethodConfig.objects.create(
country_code='*',
payment_method='manual',
display_name='Manual',
is_enabled=False,
sort_order=99,
)
self.client = Client()
def test_filter_payment_methods_by_us(self):
"""Test filtering for US country - should get only global methods"""
response = self.client.get('/api/v1/billing/admin/payment-methods/?country=US')
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertTrue(data['success'])
self.assertEqual(len(data['results']), 2) # Only stripe and paypal
methods = [m['type'] for m in data['results']]
self.assertIn('stripe', methods)
self.assertIn('paypal', methods)
def test_filter_payment_methods_by_gb(self):
"""Test filtering for GB - should get global + GB-specific"""
response = self.client.get('/api/v1/billing/admin/payment-methods/?country=GB')
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertTrue(data['success'])
self.assertEqual(len(data['results']), 3) # stripe, paypal, bank_transfer(GB)
methods = [m['type'] for m in data['results']]
self.assertIn('stripe', methods)
self.assertIn('paypal', methods)
self.assertIn('bank_transfer', methods)
def test_filter_payment_methods_by_in(self):
"""Test filtering for IN - should get global + IN-specific"""
response = self.client.get('/api/v1/billing/admin/payment-methods/?country=IN')
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertTrue(data['success'])
self.assertEqual(len(data['results']), 3) # stripe, paypal, local_wallet(IN)
methods = [m['type'] for m in data['results']]
self.assertIn('stripe', methods)
self.assertIn('paypal', methods)
self.assertIn('local_wallet', methods)
def test_disabled_methods_not_returned(self):
"""Test that disabled payment methods are not included"""
response = self.client.get('/api/v1/billing/admin/payment-methods/?country=*')
self.assertEqual(response.status_code, 200)
data = response.json()
methods = [m['type'] for m in data['results']]
self.assertNotIn('manual', methods) # Disabled method should not appear
def test_sort_order_respected(self):
\"\"\"Test that payment methods are returned in sort_order\"\"\"
response = self.client.get('/api/v1/billing/admin/payment-methods/?country=GB')
self.assertEqual(response.status_code, 200)
data = response.json()
# Verify first method has lowest sort_order
self.assertEqual(data['results'][0]['type'], 'stripe')
self.assertEqual(data['results'][0]['sort_order'], 1)
def test_default_country_fallback(self):
"""Test that missing country parameter defaults to global (*)\"\"\"\n response = self.client.get('/api/v1/billing/admin/payment-methods/')
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertTrue(data['success'])
# Should get at least global methods
methods = [m['type'] for m in data['results']]
self.assertIn('stripe', methods)
self.assertIn('paypal', methods)

View File

@@ -0,0 +1,192 @@
"""
Integration tests for payment workflow
"""
from django.test import TestCase
from django.contrib.auth import get_user_model
from django.utils import timezone
from decimal import Decimal
from datetime import timedelta
from igny8_core.auth.models import Account, Plan, Subscription
from igny8_core.business.billing.models import Invoice, Payment
from igny8_core.business.billing.services.invoice_service import InvoiceService
User = get_user_model()
class PaymentWorkflowIntegrationTest(TestCase):
"""Test complete payment workflow including invoice.subscription FK"""
def setUp(self):
"""Create test data"""
# Create plan
self.plan = Plan.objects.create(
name='Test Plan',
slug='test-plan',
price=Decimal('29.00'),
included_credits=1000,
max_sites=5,
)
# Create account
self.account = Account.objects.create(
name='Test Account',
slug='test-account',
status='pending_payment',
billing_country='US',
billing_email='test@example.com',
)
# Create user
self.user = User.objects.create_user(
username='testuser',
email='testuser@example.com',
password='testpass123',
account=self.account,
)
# Create subscription
billing_period_start = timezone.now()
billing_period_end = billing_period_start + timedelta(days=30)
self.subscription = Subscription.objects.create(
account=self.account,
plan=self.plan,
status='pending_payment',
current_period_start=billing_period_start,
current_period_end=billing_period_end,
)
def test_invoice_subscription_fk_relationship(self):
"""Test that invoice.subscription FK works correctly"""
# Create invoice via service
billing_period_start = timezone.now()
billing_period_end = billing_period_start + timedelta(days=30)
invoice = InvoiceService.create_subscription_invoice(
subscription=self.subscription,
billing_period_start=billing_period_start,
billing_period_end=billing_period_end,
)
# Verify FK relationship
self.assertIsNotNone(invoice.subscription)
self.assertEqual(invoice.subscription.id, self.subscription.id)
self.assertEqual(invoice.subscription.plan.id, self.plan.id)
# Verify can access subscription from invoice
self.assertEqual(invoice.subscription.account, self.account)
self.assertEqual(invoice.subscription.plan.name, 'Test Plan')
def test_payment_approval_with_subscription(self):
"""Test payment approval workflow uses invoice.subscription"""
# Create invoice
billing_period_start = timezone.now()
billing_period_end = billing_period_start + timedelta(days=30)
invoice = InvoiceService.create_subscription_invoice(
subscription=self.subscription,
billing_period_start=billing_period_start,
billing_period_end=billing_period_end,
)
# Create payment
payment = Payment.objects.create(
account=self.account,
invoice=invoice,
amount=invoice.total,
currency='USD',
status='pending_approval',
payment_method='bank_transfer',
manual_reference='TEST-REF-001',
)
# Verify payment links to invoice which links to subscription
self.assertIsNotNone(payment.invoice)
self.assertIsNotNone(payment.invoice.subscription)
self.assertEqual(payment.invoice.subscription.id, self.subscription.id)
# Simulate approval workflow
payment.status = 'succeeded'
payment.approved_by = self.user
payment.approved_at = timezone.now()
payment.save()
# Update related records
invoice.status = 'paid'
invoice.paid_at = timezone.now()
invoice.save()
subscription = invoice.subscription
subscription.status = 'active'
subscription.save()
# Verify workflow completed successfully
self.assertEqual(payment.status, 'succeeded')
self.assertEqual(invoice.status, 'paid')
self.assertEqual(subscription.status, 'active')
self.assertEqual(subscription.plan.included_credits, 1000)
def test_subscription_dates_not_null_for_paid_plans(self):
"""Test that subscription dates are set for paid plans"""
self.assertIsNotNone(self.subscription.current_period_start)
self.assertIsNotNone(self.subscription.current_period_end)
# Verify dates are in future
self.assertGreater(self.subscription.current_period_end, self.subscription.current_period_start)
def test_invoice_currency_based_on_country(self):
"""Test that invoice currency is set based on billing country"""
# Test US -> USD
self.account.billing_country = 'US'
self.account.save()
billing_period_start = timezone.now()
billing_period_end = billing_period_start + timedelta(days=30)
invoice_us = InvoiceService.create_subscription_invoice(
subscription=self.subscription,
billing_period_start=billing_period_start,
billing_period_end=billing_period_end,
)
self.assertEqual(invoice_us.currency, 'USD')
# Test GB -> GBP
self.account.billing_country = 'GB'
self.account.save()
invoice_gb = InvoiceService.create_subscription_invoice(
subscription=self.subscription,
billing_period_start=billing_period_start,
billing_period_end=billing_period_end,
)
self.assertEqual(invoice_gb.currency, 'GBP')
# Test IN -> INR
self.account.billing_country = 'IN'
self.account.save()
invoice_in = InvoiceService.create_subscription_invoice(
subscription=self.subscription,
billing_period_start=billing_period_start,
billing_period_end=billing_period_end,
)
self.assertEqual(invoice_in.currency, 'INR')
def test_invoice_due_date_grace_period(self):
"""Test that invoice due date uses grace period instead of billing_period_end"""
billing_period_start = timezone.now()
billing_period_end = billing_period_start + timedelta(days=30)
invoice = InvoiceService.create_subscription_invoice(
subscription=self.subscription,
billing_period_start=billing_period_start,
billing_period_end=billing_period_end,
)
# Verify due date is invoice_date + 7 days (grace period)
expected_due_date = invoice.invoice_date + timedelta(days=7)
self.assertEqual(invoice.due_date, expected_due_date)
# Verify it's NOT billing_period_end
self.assertNotEqual(invoice.due_date, billing_period_end.date())

View File

@@ -0,0 +1,213 @@
"""
Currency utilities for billing
Maps countries to their currencies based on Stripe/PayPal standards
"""
# Country to currency mapping (Stripe/PayPal standard format)
COUNTRY_CURRENCY_MAP = {
# North America
'US': 'USD',
'CA': 'CAD',
'MX': 'MXN',
# Europe
'GB': 'GBP',
'DE': 'EUR',
'FR': 'EUR',
'IT': 'EUR',
'ES': 'EUR',
'NL': 'EUR',
'BE': 'EUR',
'AT': 'EUR',
'PT': 'EUR',
'IE': 'EUR',
'GR': 'EUR',
'FI': 'EUR',
'LU': 'EUR',
'CH': 'CHF',
'NO': 'NOK',
'SE': 'SEK',
'DK': 'DKK',
'PL': 'PLN',
'CZ': 'CZK',
'HU': 'HUF',
'RO': 'RON',
# Asia Pacific
'IN': 'INR',
'PK': 'PKR',
'BD': 'BDT',
'LK': 'LKR',
'JP': 'JPY',
'CN': 'CNY',
'HK': 'HKD',
'SG': 'SGD',
'MY': 'MYR',
'TH': 'THB',
'ID': 'IDR',
'PH': 'PHP',
'VN': 'VND',
'KR': 'KRW',
'TW': 'TWD',
'AU': 'AUD',
'NZ': 'NZD',
# Middle East
'AE': 'AED',
'SA': 'SAR',
'QA': 'QAR',
'KW': 'KWD',
'BH': 'BHD',
'OM': 'OMR',
'IL': 'ILS',
'TR': 'TRY',
# Africa
'ZA': 'ZAR',
'NG': 'NGN',
'KE': 'KES',
'EG': 'EGP',
'MA': 'MAD',
# South America
'BR': 'BRL',
'AR': 'ARS',
'CL': 'CLP',
'CO': 'COP',
'PE': 'PEN',
}
# Default currency fallback
DEFAULT_CURRENCY = 'USD'
def get_currency_for_country(country_code: str) -> str:
"""
Get currency code for a given country code.
Args:
country_code: ISO 2-letter country code (e.g., 'US', 'GB', 'IN')
Returns:
Currency code (e.g., 'USD', 'GBP', 'INR')
"""
if not country_code:
return DEFAULT_CURRENCY
country_code = country_code.upper().strip()
return COUNTRY_CURRENCY_MAP.get(country_code, DEFAULT_CURRENCY)
def get_currency_symbol(currency_code: str) -> str:
"""
Get currency symbol for a given currency code.
Args:
currency_code: Currency code (e.g., 'USD', 'GBP', 'INR')
Returns:
Currency symbol (e.g., '$', '£', '')
"""
CURRENCY_SYMBOLS = {
'USD': '$',
'EUR': '',
'GBP': '£',
'INR': '',
'JPY': '¥',
'CNY': '¥',
'AUD': 'A$',
'CAD': 'C$',
'CHF': 'Fr',
'SEK': 'kr',
'NOK': 'kr',
'DKK': 'kr',
'PLN': '',
'BRL': 'R$',
'ZAR': 'R',
'AED': 'د.إ',
'SAR': 'ر.س',
'PKR': '',
}
return CURRENCY_SYMBOLS.get(currency_code, currency_code + ' ')
# Currency multipliers for countries with payment methods configured
# These represent approximate exchange rates to USD
CURRENCY_MULTIPLIERS = {
'USD': 1.0, # United States
'GBP': 0.79, # United Kingdom
'INR': 83.0, # India
'PKR': 278.0, # Pakistan
'CAD': 1.35, # Canada
'AUD': 1.52, # Australia
'EUR': 0.92, # Germany, France (Eurozone)
}
# Map countries to their multipliers
COUNTRY_MULTIPLIERS = {
'US': CURRENCY_MULTIPLIERS['USD'],
'GB': CURRENCY_MULTIPLIERS['GBP'],
'IN': CURRENCY_MULTIPLIERS['INR'],
'PK': CURRENCY_MULTIPLIERS['PKR'],
'CA': CURRENCY_MULTIPLIERS['CAD'],
'AU': CURRENCY_MULTIPLIERS['AUD'],
'DE': CURRENCY_MULTIPLIERS['EUR'],
'FR': CURRENCY_MULTIPLIERS['EUR'],
}
def get_currency_multiplier(country_code: str) -> float:
"""
Get currency multiplier for a given country code.
Used to convert USD prices to local currency.
Args:
country_code: ISO 2-letter country code (e.g., 'US', 'GB', 'IN')
Returns:
Multiplier float (e.g., 1.0 for USD, 83.0 for INR)
"""
if not country_code:
return 1.0
country_code = country_code.upper().strip()
return COUNTRY_MULTIPLIERS.get(country_code, 1.0)
def convert_usd_to_local(usd_amount: float, country_code: str) -> float:
"""
Convert USD amount to local currency for given country.
Args:
usd_amount: Amount in USD
country_code: ISO 2-letter country code
Returns:
Amount in local currency
"""
multiplier = get_currency_multiplier(country_code)
return round(usd_amount * multiplier, 2)
def format_currency(amount: float, country_code: str = 'US') -> str:
"""
Format amount with appropriate currency symbol.
Args:
amount: Numeric amount
country_code: ISO 2-letter country code
Returns:
Formatted string (e.g., '$99.00', '₹8,300.00')
"""
currency = get_currency_for_country(country_code)
symbol = get_currency_symbol(currency)
# Format with commas for thousands
if amount >= 1000:
formatted = f"{amount:,.2f}"
else:
formatted = f"{amount:.2f}"
return f"{symbol}{formatted}"

View File

@@ -0,0 +1,186 @@
"""
Standardized Error Response Utilities
Ensures consistent error formats across the billing module
"""
from rest_framework import status
from rest_framework.response import Response
from typing import Dict, Any, Optional, List
class ErrorCode:
"""Standardized error codes for billing module"""
# Payment errors
PAYMENT_NOT_FOUND = 'payment_not_found'
PAYMENT_ALREADY_PROCESSED = 'payment_already_processed'
PAYMENT_AMOUNT_MISMATCH = 'payment_amount_mismatch'
PAYMENT_METHOD_NOT_AVAILABLE = 'payment_method_not_available'
# Invoice errors
INVOICE_NOT_FOUND = 'invoice_not_found'
INVOICE_ALREADY_PAID = 'invoice_already_paid'
INVOICE_VOIDED = 'invoice_voided'
INVOICE_EXPIRED = 'invoice_expired'
# Subscription errors
SUBSCRIPTION_NOT_FOUND = 'subscription_not_found'
SUBSCRIPTION_INACTIVE = 'subscription_inactive'
SUBSCRIPTION_ALREADY_EXISTS = 'subscription_already_exists'
# Credit errors
INSUFFICIENT_CREDITS = 'insufficient_credits'
INVALID_CREDIT_PACKAGE = 'invalid_credit_package'
# Validation errors
VALIDATION_ERROR = 'validation_error'
MISSING_REQUIRED_FIELD = 'missing_required_field'
INVALID_STATUS_TRANSITION = 'invalid_status_transition'
# Authorization errors
UNAUTHORIZED = 'unauthorized'
FORBIDDEN = 'forbidden'
# System errors
INTERNAL_ERROR = 'internal_error'
TIMEOUT = 'timeout'
RATE_LIMITED = 'rate_limited'
def error_response(
message: str,
code: str = ErrorCode.INTERNAL_ERROR,
details: Optional[Dict[str, Any]] = None,
field_errors: Optional[Dict[str, List[str]]] = None,
status_code: int = status.HTTP_400_BAD_REQUEST
) -> Response:
"""
Create standardized error response
Args:
message: Human-readable error message
code: Error code from ErrorCode class
details: Additional error context
field_errors: Field-specific validation errors
status_code: HTTP status code
Returns:
DRF Response with standardized error format
"""
response_data = {
'success': False,
'error': {
'code': code,
'message': message,
}
}
if details:
response_data['error']['details'] = details
if field_errors:
response_data['error']['field_errors'] = field_errors
return Response(response_data, status=status_code)
def success_response(
data: Any = None,
message: Optional[str] = None,
status_code: int = status.HTTP_200_OK
) -> Response:
"""
Create standardized success response
Args:
data: Response data
message: Optional success message
status_code: HTTP status code
Returns:
DRF Response with standardized success format
"""
response_data = {
'success': True,
}
if message:
response_data['message'] = message
if data is not None:
response_data['data'] = data
return Response(response_data, status=status_code)
def validation_error_response(
field_errors: Dict[str, List[str]],
message: str = 'Validation failed'
) -> Response:
"""
Create validation error response
Args:
field_errors: Dictionary mapping field names to error messages
message: General validation error message
Returns:
DRF Response with validation error format
"""
return error_response(
message=message,
code=ErrorCode.VALIDATION_ERROR,
field_errors=field_errors,
status_code=status.HTTP_400_BAD_REQUEST
)
def not_found_response(
resource: str,
identifier: Any = None
) -> Response:
"""
Create not found error response
Args:
resource: Resource type (e.g., 'Payment', 'Invoice')
identifier: Resource identifier (ID, slug, etc.)
Returns:
DRF Response with not found error
"""
message = f"{resource} not found"
if identifier:
message += f": {identifier}"
code_map = {
'Payment': ErrorCode.PAYMENT_NOT_FOUND,
'Invoice': ErrorCode.INVOICE_NOT_FOUND,
'Subscription': ErrorCode.SUBSCRIPTION_NOT_FOUND,
}
return error_response(
message=message,
code=code_map.get(resource, ErrorCode.INTERNAL_ERROR),
status_code=status.HTTP_404_NOT_FOUND
)
def unauthorized_response(
message: str = 'Authentication required'
) -> Response:
"""Create unauthorized error response"""
return error_response(
message=message,
code=ErrorCode.UNAUTHORIZED,
status_code=status.HTTP_401_UNAUTHORIZED
)
def forbidden_response(
message: str = 'You do not have permission to perform this action'
) -> Response:
"""Create forbidden error response"""
return error_response(
message=message,
code=ErrorCode.FORBIDDEN,
status_code=status.HTTP_403_FORBIDDEN
)

View File

@@ -33,6 +33,17 @@ class BillingViewSet(viewsets.GenericViewSet):
"""
permission_classes = [IsAdminOrOwner]
def get_permissions(self):
"""
Allow action-level permissions to override class-level permissions.
"""
# Try to get permission_classes from the action
try:
# DRF stores action permission_classes in the view method
return [permission() for permission in self.permission_classes]
except Exception:
return super().get_permissions()
@action(detail=False, methods=['post'], url_path='confirm-bank-transfer')
def confirm_bank_transfer(self, request):
"""
@@ -182,22 +193,30 @@ class BillingViewSet(viewsets.GenericViewSet):
def list_payment_methods(self, request):
"""
Get available payment methods for a specific country.
Public endpoint - only returns enabled payment methods.
Does not expose sensitive configuration details.
Query params:
country: ISO 2-letter country code (default: '*' for global)
country: ISO 2-letter country code (default: 'US')
Returns payment methods filtered by country (country-specific + global).
Returns payment methods filtered by country.
"""
country = request.GET.get('country', '*').upper()
country = request.GET.get('country', 'US').upper()
# Get country-specific + global methods
# Get country-specific methods
methods = PaymentMethodConfig.objects.filter(
Q(country_code=country) | Q(country_code='*'),
country_code=country,
is_enabled=True
).order_by('sort_order')
# Serialize using the proper serializer
serializer = PaymentMethodConfigSerializer(methods, many=True)
return Response(serializer.data, status=status.HTTP_200_OK)
# Return in consistent format
return Response({
'success': True,
'results': serializer.data
}, status=status.HTTP_200_OK)
@action(detail=False, methods=['post'], url_path='payments/confirm', permission_classes=[IsAuthenticatedAndActive])
def confirm_payment(self, request):
@@ -237,6 +256,26 @@ class BillingViewSet(viewsets.GenericViewSet):
account=request.account
)
# Check if payment already exists for this invoice
existing_payment = Payment.objects.filter(
invoice=invoice,
status__in=['pending_approval', 'succeeded']
).first()
if existing_payment:
if existing_payment.status == 'succeeded':
return error_response(
error='This invoice has already been paid and approved.',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
else:
return error_response(
error=f'A payment confirmation is already pending approval for this invoice (Payment ID: {existing_payment.id}).',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
# Validate amount matches invoice
if amount != invoice.total:
return error_response(
@@ -264,8 +303,12 @@ class BillingViewSet(viewsets.GenericViewSet):
f'Reference: {manual_reference}'
)
# TODO: Send notification to admin
# send_payment_confirmation_notification(payment)
# Send email notification to user
try:
from igny8_core.business.billing.services.email_service import BillingEmailService
BillingEmailService.send_payment_confirmation_email(payment, request.account)
except Exception as e:
logger.error(f'Failed to send payment confirmation email: {str(e)}')
return success_response(
data={
@@ -283,14 +326,20 @@ class BillingViewSet(viewsets.GenericViewSet):
except Invoice.DoesNotExist:
return error_response(
error='Invoice not found or does not belong to your account',
error='Invoice not found. Please check the invoice ID or contact support.',
status_code=status.HTTP_404_NOT_FOUND,
request=request
)
except ValueError as ve:
return error_response(
error=f'Invalid amount format: {str(ve)}',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
except Exception as e:
logger.error(f'Error confirming payment: {str(e)}', exc_info=True)
return error_response(
error=f'Failed to submit payment confirmation: {str(e)}',
error='An unexpected error occurred while processing your payment confirmation. Please try again or contact support.',
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
@@ -310,25 +359,66 @@ class BillingViewSet(viewsets.GenericViewSet):
try:
with transaction.atomic():
# Get payment with related objects
# Get payment with all related objects to prevent N+1 queries
payment = Payment.objects.select_related(
'invoice',
'invoice__subscription',
'invoice__subscription__plan',
'account'
'account',
'account__subscription',
'account__subscription__plan',
'account__plan'
).get(id=pk)
if payment.status != 'pending_approval':
status_msg = {
'succeeded': 'This payment has already been approved and processed',
'failed': 'This payment was previously rejected and cannot be approved',
'refunded': 'This payment was refunded and cannot be re-approved'
}.get(payment.status, f'Payment has invalid status: {payment.status}')
return error_response(
error=f'Payment is not pending approval (current status: {payment.status})',
error=status_msg,
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
invoice = payment.invoice
subscription = invoice.subscription
account = payment.account
# Validate invoice is still pending
if invoice.status == 'paid':
return error_response(
error='Invoice is already marked as paid. Payment cannot be approved again.',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
# Validate invoice is not void
if invoice.status == 'void':
return error_response(
error='Invoice has been voided. Payment cannot be approved for a void invoice.',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
# Validate amount matches
if payment.amount != invoice.total:
return error_response(
error=f'Payment amount ({payment.currency} {payment.amount}) does not match invoice total ({invoice.currency} {invoice.total}). Please verify the payment.',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
# Get subscription from invoice first, fallback to account.subscription
subscription = None
if invoice and hasattr(invoice, 'subscription') and invoice.subscription:
subscription = invoice.subscription
elif account and hasattr(account, 'subscription'):
try:
subscription = account.subscription
except Exception:
pass
# 1. Update Payment
payment.status = 'succeeded'
payment.approved_by = request.user
@@ -354,31 +444,56 @@ class BillingViewSet(viewsets.GenericViewSet):
# 5. Add Credits (if subscription has plan)
credits_added = 0
if subscription and subscription.plan:
credits_added = subscription.plan.included_credits
# Use CreditService to add credits
CreditService.add_credits(
account=account,
amount=credits_added,
transaction_type='subscription',
description=f'{subscription.plan.name} plan credits - Invoice {invoice.invoice_number}',
metadata={
'subscription_id': subscription.id,
'invoice_id': invoice.id,
'payment_id': payment.id,
'plan_id': subscription.plan.id,
'approved_by': request.user.email
}
)
try:
if subscription and subscription.plan and subscription.plan.included_credits > 0:
credits_added = subscription.plan.included_credits
# Use CreditService to add credits
CreditService.add_credits(
account=account,
amount=credits_added,
transaction_type='subscription',
description=f'{subscription.plan.name} plan credits - Invoice {invoice.invoice_number}',
metadata={
'subscription_id': subscription.id,
'invoice_id': invoice.id,
'payment_id': payment.id,
'plan_id': subscription.plan.id,
'approved_by': request.user.email
}
)
elif account and account.plan and account.plan.included_credits > 0:
# Fallback: use account plan if subscription not found
credits_added = account.plan.included_credits
CreditService.add_credits(
account=account,
amount=credits_added,
transaction_type='subscription',
description=f'{account.plan.name} plan credits - Invoice {invoice.invoice_number}',
metadata={
'invoice_id': invoice.id,
'payment_id': payment.id,
'plan_id': account.plan.id,
'approved_by': request.user.email,
'fallback': 'account_plan'
}
)
except Exception as credit_error:
# Rollback payment approval if credit addition fails
logger.error(f'Credit addition failed for payment {payment.id}: {credit_error}', exc_info=True)
raise Exception(f'Failed to add credits: {str(credit_error)}') from credit_error
logger.info(
f'Payment approved: Payment {payment.id}, Invoice {invoice.invoice_number}, '
f'Account {account.id} activated, {credits_added} credits added'
)
# TODO: Send activation email to user
# send_account_activated_email(account, subscription)
# Send activation email to user
try:
from igny8_core.business.billing.services.email_service import BillingEmailService
BillingEmailService.send_payment_approved_email(payment, account, subscription)
except Exception as e:
logger.error(f'Failed to send payment approved email: {str(e)}')
return success_response(
data={
@@ -399,14 +514,24 @@ class BillingViewSet(viewsets.GenericViewSet):
except Payment.DoesNotExist:
return error_response(
error='Payment not found',
error='Payment not found. The payment may have been deleted or the ID is incorrect.',
status_code=status.HTTP_404_NOT_FOUND,
request=request
)
except Exception as e:
logger.error(f'Error approving payment: {str(e)}', exc_info=True)
logger.error(f'Error approving payment {pk}: {str(e)}', exc_info=True)
# Provide specific error messages
error_msg = str(e)
if 'credit' in error_msg.lower():
error_msg = 'Failed to add credits to account. Payment not approved. Please check the plan configuration.'
elif 'subscription' in error_msg.lower():
error_msg = 'Failed to activate subscription. Payment not approved. Please verify subscription exists.'
else:
error_msg = f'Payment approval failed: {error_msg}'
return error_response(
error=f'Failed to approve payment: {str(e)}',
error=error_msg,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
@@ -441,10 +566,20 @@ class BillingViewSet(viewsets.GenericViewSet):
payment.failure_reason = admin_notes
payment.save(update_fields=['status', 'approved_by', 'approved_at', 'failed_at', 'admin_notes', 'failure_reason'])
# Update account status to allow retry
account = payment.account
if account.status != 'active':
account.status = 'pending_payment'
account.save(update_fields=['status'])
logger.info(f'Payment rejected: Payment {payment.id}, Reason: {admin_notes}')
# TODO: Send rejection email to user
# send_payment_rejected_email(payment)
# Send rejection email to user
try:
from igny8_core.business.billing.services.email_service import BillingEmailService
BillingEmailService.send_payment_rejected_email(payment, account, admin_notes)
except Exception as e:
logger.error(f'Failed to send payment rejected email: {str(e)}')
return success_response(
data={
@@ -459,14 +594,14 @@ class BillingViewSet(viewsets.GenericViewSet):
except Payment.DoesNotExist:
return error_response(
error='Payment not found',
error='Payment not found. The payment may have been deleted or the ID is incorrect.',
status_code=status.HTTP_404_NOT_FOUND,
request=request
)
except Exception as e:
logger.error(f'Error rejecting payment: {str(e)}', exc_info=True)
logger.error(f'Error rejecting payment {pk}: {str(e)}', exc_info=True)
return error_response(
error=f'Failed to reject payment: {str(e)}',
error=f'Failed to reject payment. Please try again or contact technical support.',
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
@@ -504,6 +639,7 @@ class InvoiceViewSet(AccountModelViewSet):
'id': invoice.id,
'invoice_number': invoice.invoice_number,
'status': invoice.status,
'total': str(invoice.total), # Alias for compatibility
'total_amount': str(invoice.total),
'subtotal': str(invoice.subtotal),
'tax_amount': str(invoice.tax),
@@ -530,6 +666,7 @@ class InvoiceViewSet(AccountModelViewSet):
'id': invoice.id,
'invoice_number': invoice.invoice_number,
'status': invoice.status,
'total': str(invoice.total), # Alias for compatibility
'total_amount': str(invoice.total),
'subtotal': str(invoice.subtotal),
'tax_amount': str(invoice.tax),
@@ -565,6 +702,17 @@ class PaymentViewSet(AccountModelViewSet):
queryset = Payment.objects.all().select_related('account', 'invoice')
permission_classes = [IsAuthenticatedAndActive, HasTenantAccess]
pagination_class = CustomPageNumberPagination
throttle_scope = 'payment_confirmation'
def get_throttles(self):
"""Apply stricter throttling to manual payment submission"""
from rest_framework.throttling import UserRateThrottle
if self.action == 'manual':
# 5 payment submissions per hour per user
class PaymentSubmissionThrottle(UserRateThrottle):
rate = '5/hour'
return [PaymentSubmissionThrottle()]
return super().get_throttles()
def get_queryset(self):
"""Filter payments by account"""
@@ -605,6 +753,7 @@ class PaymentViewSet(AccountModelViewSet):
'processed_at': payment.processed_at.isoformat() if payment.processed_at else None,
'manual_reference': payment.manual_reference,
'manual_notes': payment.manual_notes,
# admin_notes intentionally excluded - internal only
})
return paginated_response(

View File

@@ -0,0 +1,41 @@
"""
Invoice PDF views
API endpoints for generating and downloading invoice PDFs
"""
from django.http import HttpResponse
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from igny8_core.business.billing.models import Invoice
from igny8_core.business.billing.services.pdf_service import InvoicePDFGenerator
from igny8_core.business.billing.utils.errors import not_found_response
import logging
logger = logging.getLogger(__name__)
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def download_invoice_pdf(request, invoice_id):
"""
Download invoice as PDF
GET /api/v1/billing/invoices/<id>/pdf/
"""
try:
invoice = Invoice.objects.prefetch_related('line_items').get(
id=invoice_id,
account=request.user.account
)
except Invoice.DoesNotExist:
return not_found_response('Invoice', invoice_id)
# Generate PDF
pdf_buffer = InvoicePDFGenerator.generate_invoice_pdf(invoice)
# Return PDF response
response = HttpResponse(pdf_buffer.read(), content_type='application/pdf')
response['Content-Disposition'] = f'attachment; filename="invoice_{invoice.invoice_number}.pdf"'
logger.info(f'Invoice PDF downloaded: {invoice.invoice_number} by user {request.user.id}')
return response

View File

@@ -0,0 +1,208 @@
"""
Refund workflow for payments
Handles full and partial refunds with proper accounting
"""
from decimal import Decimal
from django.db import transaction
from django.utils import timezone
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from igny8_core.business.billing.models import Payment, CreditTransaction, Invoice
from igny8_core.business.billing.utils.errors import (
error_response, success_response, not_found_response, ErrorCode
)
from igny8_core.business.billing.services.email_service import BillingEmailService
import logging
logger = logging.getLogger(__name__)
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def initiate_refund(request, payment_id):
"""
Initiate a refund for a payment
Request body:
{
"amount": "50.00", # Optional, defaults to full refund
"reason": "Customer requested refund",
"refund_credits": true # Whether to deduct credits
}
"""
try:
payment = Payment.objects.select_related('invoice', 'account').get(
id=payment_id,
account=request.user.account
)
except Payment.DoesNotExist:
return not_found_response('Payment', payment_id)
# Validate payment can be refunded
if payment.status != 'succeeded':
return error_response(
message='Only successful payments can be refunded',
code=ErrorCode.INVALID_STATUS_TRANSITION,
status_code=status.HTTP_400_BAD_REQUEST
)
if payment.refunded_at:
return error_response(
message='This payment has already been refunded',
code=ErrorCode.PAYMENT_ALREADY_PROCESSED,
status_code=status.HTTP_400_BAD_REQUEST
)
# Parse refund amount
refund_amount = request.data.get('amount')
if refund_amount:
refund_amount = Decimal(str(refund_amount))
if refund_amount > Decimal(payment.amount):
return error_response(
message=f'Refund amount cannot exceed payment amount ({payment.amount})',
code=ErrorCode.VALIDATION_ERROR,
status_code=status.HTTP_400_BAD_REQUEST
)
else:
refund_amount = Decimal(payment.amount)
reason = request.data.get('reason', 'Refund requested')
refund_credits = request.data.get('refund_credits', True)
try:
with transaction.atomic():
# Process refund based on payment method
refund_successful = False
if payment.payment_method == 'stripe':
refund_successful = _process_stripe_refund(payment, refund_amount, reason)
elif payment.payment_method == 'paypal':
refund_successful = _process_paypal_refund(payment, refund_amount, reason)
else:
# Manual payment refund - mark as refunded
refund_successful = True
if not refund_successful:
return error_response(
message='Refund processing failed. Please contact support.',
code=ErrorCode.INTERNAL_ERROR,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
# Update payment record
payment.status = 'refunded'
payment.refunded_at = timezone.now()
payment.metadata['refund_amount'] = str(refund_amount)
payment.metadata['refund_reason'] = reason
payment.save(update_fields=['status', 'refunded_at', 'metadata'])
# Update invoice if full refund
if refund_amount == Decimal(payment.amount):
invoice = payment.invoice
if invoice.status == 'paid':
invoice.status = 'pending'
invoice.paid_at = None
invoice.save(update_fields=['status', 'paid_at'])
# Deduct credits if applicable
if refund_credits and payment.credit_transactions.exists():
for credit_tx in payment.credit_transactions.all():
if credit_tx.amount > 0: # Only deduct positive credits
# Get current balance
account = payment.account
current_balance = account.credit_balance
# Create deduction transaction
CreditTransaction.objects.create(
account=account,
transaction_type='refund',
amount=-credit_tx.amount,
balance_after=current_balance - credit_tx.amount,
description=f'Refund: {reason}',
payment=payment,
metadata={'original_transaction': credit_tx.id}
)
# Update account balance
account.credit_balance -= credit_tx.amount
account.save(update_fields=['credit_balance'])
# Send refund notification email
try:
BillingEmailService.send_refund_notification(
user=payment.account.owner,
payment=payment,
refund_amount=str(refund_amount),
reason=reason
)
except Exception as e:
logger.error(f"Failed to send refund email for payment {payment_id}: {str(e)}")
return success_response(
data={
'payment_id': payment.id,
'refund_amount': str(refund_amount),
'status': 'refunded'
},
message='Refund processed successfully'
)
except Exception as e:
logger.exception(f"Refund error for payment {payment_id}: {str(e)}")
return error_response(
message='An error occurred while processing the refund',
code=ErrorCode.INTERNAL_ERROR,
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def _process_stripe_refund(payment: Payment, amount: Decimal, reason: str) -> bool:
"""Process Stripe refund"""
try:
import stripe
from igny8_core.business.billing.utils.payment_gateways import get_stripe_client
stripe_client = get_stripe_client()
refund = stripe_client.Refund.create(
payment_intent=payment.stripe_payment_intent_id,
amount=int(amount * 100), # Convert to cents
reason='requested_by_customer',
metadata={'reason': reason}
)
payment.metadata['stripe_refund_id'] = refund.id
return refund.status == 'succeeded'
except Exception as e:
logger.exception(f"Stripe refund failed for payment {payment.id}: {str(e)}")
return False
def _process_paypal_refund(payment: Payment, amount: Decimal, reason: str) -> bool:
"""Process PayPal refund"""
try:
from igny8_core.business.billing.utils.payment_gateways import get_paypal_client
paypal_client = get_paypal_client()
refund_request = {
'amount': {
'value': str(amount),
'currency_code': payment.currency
},
'note_to_payer': reason
}
refund = paypal_client.payments.captures.refund(
payment.paypal_capture_id,
refund_request
)
payment.metadata['paypal_refund_id'] = refund.id
return refund.status == 'COMPLETED'
except Exception as e:
logger.exception(f"PayPal refund failed for payment {payment.id}: {str(e)}")
return False

View File

@@ -125,29 +125,136 @@ class PaymentAdmin(AccountAdminMixin, admin.ModelAdmin):
def save_model(self, request, obj, form, change):
"""
Override save_model to set approved_by when status changes to succeeded.
The Payment.save() method will handle all the cascade updates automatically.
Override save_model to trigger approval workflow when status changes to succeeded.
This ensures manual status changes in admin also activate accounts and add credits.
"""
from django.db import transaction
from django.utils import timezone
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.auth.models import Subscription
# Check if status changed to 'succeeded'
status_changed_to_succeeded = False
if change and 'status' in form.changed_data:
if obj.status == 'succeeded' and form.initial.get('status') != 'succeeded':
status_changed_to_succeeded = True
elif not change and obj.status == 'succeeded':
status_changed_to_succeeded = True
# Save the payment first
if obj.status == 'succeeded' and not obj.approved_by:
obj.approved_by = request.user
if not obj.approved_at:
obj.approved_at = timezone.now()
if not obj.processed_at:
obj.processed_at = timezone.now()
super().save_model(request, obj, form, change)
# If status changed to succeeded, trigger the full approval workflow
if status_changed_to_succeeded:
try:
with transaction.atomic():
invoice = obj.invoice
account = obj.account
# Get subscription from invoice or account
subscription = None
if invoice and hasattr(invoice, 'subscription') and invoice.subscription:
subscription = invoice.subscription
elif account and hasattr(account, 'subscription'):
try:
subscription = account.subscription
except Subscription.DoesNotExist:
pass
# Update Invoice
if invoice and invoice.status != 'paid':
invoice.status = 'paid'
invoice.paid_at = timezone.now()
invoice.save()
# Update Subscription
if subscription and subscription.status != 'active':
subscription.status = 'active'
subscription.external_payment_id = obj.manual_reference
subscription.save()
# Update Account
if account.status != 'active':
account.status = 'active'
account.save()
# Add Credits (check if not already added)
from igny8_core.business.billing.models import CreditTransaction
existing_credit = CreditTransaction.objects.filter(
account=account,
metadata__payment_id=obj.id
).exists()
if not existing_credit:
credits_to_add = 0
plan_name = ''
if subscription and subscription.plan:
credits_to_add = subscription.plan.included_credits
plan_name = subscription.plan.name
elif account and account.plan:
credits_to_add = account.plan.included_credits
plan_name = account.plan.name
if credits_to_add > 0:
CreditService.add_credits(
account=account,
amount=credits_to_add,
transaction_type='subscription',
description=f'{plan_name} - Invoice {invoice.invoice_number}',
metadata={
'subscription_id': subscription.id if subscription else None,
'invoice_id': invoice.id,
'payment_id': obj.id,
'approved_by': request.user.email
}
)
self.message_user(
request,
f'✓ Payment approved: Account activated, {credits_to_add} credits added',
level='SUCCESS'
)
except Exception as e:
self.message_user(
request,
f'✗ Payment saved but workflow failed: {str(e)}',
level='ERROR'
)
def approve_payments(self, request, queryset):
"""Approve selected manual payments"""
from django.db import transaction
from django.utils import timezone
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.auth.models import Subscription
count = 0
successful = []
errors = []
for payment in queryset.filter(status='pending_approval'):
try:
with transaction.atomic():
invoice = payment.invoice
subscription = invoice.subscription if hasattr(invoice, 'subscription') else None
account = payment.account
# Get subscription from invoice or account
subscription = None
if invoice and hasattr(invoice, 'subscription') and invoice.subscription:
subscription = invoice.subscription
elif account and hasattr(account, 'subscription'):
try:
subscription = account.subscription
except Subscription.DoesNotExist:
pass
# Update Payment
payment.status = 'succeeded'
payment.approved_by = request.user
@@ -172,10 +279,12 @@ class PaymentAdmin(AccountAdminMixin, admin.ModelAdmin):
account.save()
# Add Credits
if subscription and subscription.plan:
credits_added = 0
if subscription and subscription.plan and subscription.plan.included_credits > 0:
credits_added = subscription.plan.included_credits
CreditService.add_credits(
account=account,
amount=subscription.plan.included_credits,
amount=credits_added,
transaction_type='subscription',
description=f'{subscription.plan.name} - Invoice {invoice.invoice_number}',
metadata={
@@ -185,17 +294,38 @@ class PaymentAdmin(AccountAdminMixin, admin.ModelAdmin):
'approved_by': request.user.email
}
)
elif account and account.plan and account.plan.included_credits > 0:
credits_added = account.plan.included_credits
CreditService.add_credits(
account=account,
amount=credits_added,
transaction_type='subscription',
description=f'{account.plan.name} - Invoice {invoice.invoice_number}',
metadata={
'invoice_id': invoice.id,
'payment_id': payment.id,
'approved_by': request.user.email
}
)
count += 1
successful.append(f'Payment #{payment.id} - {account.name} - Invoice {invoice.invoice_number} - {credits_added} credits')
except Exception as e:
errors.append(f'Payment {payment.id}: {str(e)}')
errors.append(f'Payment #{payment.id}: {str(e)}')
if count:
self.message_user(request, f'Successfully approved {count} payment(s)')
# Detailed success message
if successful:
self.message_user(request, f'✓ Successfully approved {len(successful)} payment(s):', level='SUCCESS')
for msg in successful[:10]: # Show first 10
self.message_user(request, f'{msg}', level='SUCCESS')
if len(successful) > 10:
self.message_user(request, f' ... and {len(successful) - 10} more', level='SUCCESS')
# Detailed error messages
if errors:
self.message_user(request, f'✗ Failed to approve {len(errors)} payment(s):', level='ERROR')
for error in errors:
self.message_user(request, error, level='ERROR')
self.message_user(request, f'{error}', level='ERROR')
approve_payments.short_description = 'Approve selected manual payments'

View File

@@ -0,0 +1,84 @@
# Generated migration to add subscription FK to Invoice and fix Payment status default
from django.db import migrations, models
import django.db.models.deletion
def populate_subscription_from_metadata(apps, schema_editor):
"""Populate subscription FK from metadata for existing invoices"""
# Use raw SQL to avoid model field issues during migration
from django.db import connection
with connection.cursor() as cursor:
# Get invoices with subscription_id in metadata
cursor.execute("""
SELECT id, metadata
FROM igny8_invoices
WHERE metadata::text LIKE '%subscription_id%'
""")
updated_count = 0
for invoice_id, metadata in cursor.fetchall():
if metadata and isinstance(metadata, dict) and 'subscription_id' in metadata:
try:
sub_id = int(metadata['subscription_id'])
# Check if subscription exists
cursor.execute(
"SELECT id FROM igny8_subscriptions WHERE id = %s",
[sub_id]
)
if cursor.fetchone():
# Update invoice with subscription FK
cursor.execute(
"UPDATE igny8_invoices SET subscription_id = %s WHERE id = %s",
[sub_id, invoice_id]
)
updated_count += 1
except (ValueError, KeyError) as e:
print(f"Could not populate subscription for invoice {invoice_id}: {e}")
print(f"Populated subscription FK for {updated_count} invoices")
class Migration(migrations.Migration):
dependencies = [
('billing', '0007_simplify_payment_statuses'),
('igny8_core_auth', '0011_remove_subscription_payment_method'),
]
operations = [
# Add subscription FK to Invoice
migrations.AddField(
model_name='invoice',
name='subscription',
field=models.ForeignKey(
blank=True,
help_text='Subscription this invoice is for (if subscription-based)',
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name='invoices',
to='igny8_core_auth.subscription'
),
),
# Populate data
migrations.RunPython(
populate_subscription_from_metadata,
reverse_code=migrations.RunPython.noop
),
# Fix Payment status default
migrations.AlterField(
model_name='payment',
name='status',
field=models.CharField(
choices=[
('pending_approval', 'Pending Approval'),
('succeeded', 'Succeeded'),
('failed', 'Failed'),
('refunded', 'Refunded')
],
db_index=True,
default='pending_approval',
max_length=20
),
),
]

View File

@@ -0,0 +1,80 @@
# Migration to add missing payment method configurations
from django.db import migrations
def add_missing_payment_methods(apps, schema_editor):
"""Add stripe and paypal global configs (disabled) and ensure all configs exist"""
PaymentMethodConfig = apps.get_model('billing', 'PaymentMethodConfig')
# Add global Stripe (disabled - waiting for integration)
PaymentMethodConfig.objects.get_or_create(
country_code='*',
payment_method='stripe',
defaults={
'is_enabled': False,
'display_name': 'Credit/Debit Card (Stripe)',
'instructions': 'Stripe payment integration coming soon.',
'sort_order': 1
}
)
# Add global PayPal (disabled - waiting for integration)
PaymentMethodConfig.objects.get_or_create(
country_code='*',
payment_method='paypal',
defaults={
'is_enabled': False,
'display_name': 'PayPal',
'instructions': 'PayPal payment integration coming soon.',
'sort_order': 2
}
)
# Ensure global bank_transfer exists with good instructions
PaymentMethodConfig.objects.get_or_create(
country_code='*',
payment_method='bank_transfer',
defaults={
'is_enabled': True,
'display_name': 'Bank Transfer',
'instructions': 'Bank transfer details will be provided after registration.',
'sort_order': 3
}
)
# Add manual payment as global option
PaymentMethodConfig.objects.get_or_create(
country_code='*',
payment_method='manual',
defaults={
'is_enabled': True,
'display_name': 'Manual Payment (Contact Support)',
'instructions': 'Contact support@igny8.com for manual payment arrangements.',
'sort_order': 10
}
)
print("Added/updated payment method configurations")
def remove_added_payment_methods(apps, schema_editor):
"""Reverse migration"""
PaymentMethodConfig = apps.get_model('billing', 'PaymentMethodConfig')
PaymentMethodConfig.objects.filter(
country_code='*',
payment_method__in=['stripe', 'paypal', 'manual']
).delete()
class Migration(migrations.Migration):
dependencies = [
('billing', '0008_add_invoice_subscription_fk'),
]
operations = [
migrations.RunPython(
add_missing_payment_methods,
reverse_code=remove_added_payment_methods
),
]

View File

@@ -0,0 +1,58 @@
# Migration to add database constraints and indexes for data integrity
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('billing', '0009_add_missing_payment_methods'),
('igny8_core_auth', '0011_remove_subscription_payment_method'),
]
operations = [
# Add DB index on invoice_number for fast lookups
migrations.AlterField(
model_name='invoice',
name='invoice_number',
field=models.CharField(db_index=True, max_length=50, unique=True),
),
# Add index on Payment.status for filtering
migrations.AlterField(
model_name='payment',
name='status',
field=models.CharField(
choices=[
('pending_approval', 'Pending Approval'),
('succeeded', 'Succeeded'),
('failed', 'Failed'),
('refunded', 'Refunded')
],
db_index=True,
default='pending_approval',
max_length=20
),
),
# Add partial unique index on AccountPaymentMethod for single default per account
# This prevents multiple is_default=True per account
migrations.RunSQL(
sql="""
CREATE UNIQUE INDEX billing_account_payment_method_single_default
ON igny8_account_payment_methods (tenant_id)
WHERE is_default = true AND is_enabled = true;
""",
reverse_sql="""
DROP INDEX IF EXISTS billing_account_payment_method_single_default;
"""
),
# Add composite index on Payment for common queries
migrations.AddIndex(
model_name='payment',
index=models.Index(
fields=['account', 'status', '-created_at'],
name='payment_account_status_created_idx'
),
),
]

View File

@@ -0,0 +1,25 @@
# Generated migration to add payment constraints
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('billing', '0010_add_database_constraints'),
]
operations = [
# Add composite unique constraint on manual_reference + tenant_id
# This prevents duplicate payment submissions with same reference for an account
migrations.RunSQL(
sql="""
CREATE UNIQUE INDEX billing_payment_manual_ref_account_unique
ON igny8_payments(tenant_id, manual_reference)
WHERE manual_reference != '' AND manual_reference IS NOT NULL;
""",
reverse_sql="""
DROP INDEX IF EXISTS billing_payment_manual_ref_account_unique;
"""
),
]

View File

@@ -0,0 +1,44 @@
# Generated migration to add Payment FK to CreditTransaction
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('billing', '0011_add_manual_reference_constraint'),
]
operations = [
# Add payment FK field
migrations.AddField(
model_name='credittransaction',
name='payment',
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name='credit_transactions',
to='billing.payment',
help_text='Payment that triggered this credit transaction'
),
),
# Migrate existing reference_id data to payment FK
migrations.RunSQL(
sql="""
UPDATE igny8_credit_transactions ct
SET payment_id = (
SELECT id FROM igny8_payments p
WHERE p.id::text = ct.reference_id
AND ct.reference_id ~ '^[0-9]+$'
LIMIT 1
)
WHERE ct.reference_id IS NOT NULL
AND ct.reference_id != ''
AND ct.reference_id ~ '^[0-9]+$';
""",
reverse_sql=migrations.RunSQL.noop
),
]

View File

@@ -0,0 +1,49 @@
# Generated migration to add webhook fields to PaymentMethodConfig
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('billing', '0012_add_payment_fk_to_credit_transaction'),
]
operations = [
# Add webhook configuration fields
migrations.AddField(
model_name='paymentmethodconfig',
name='webhook_url',
field=models.URLField(
blank=True,
help_text='Webhook URL for payment gateway callbacks (Stripe/PayPal)'
),
),
migrations.AddField(
model_name='paymentmethodconfig',
name='webhook_secret',
field=models.CharField(
max_length=255,
blank=True,
help_text='Webhook secret for signature verification'
),
),
migrations.AddField(
model_name='paymentmethodconfig',
name='api_key',
field=models.CharField(
max_length=255,
blank=True,
help_text='API key for payment gateway integration'
),
),
migrations.AddField(
model_name='paymentmethodconfig',
name='api_secret',
field=models.CharField(
max_length=255,
blank=True,
help_text='API secret for payment gateway integration'
),
),
]

View File

@@ -1,6 +1,8 @@
"""
Serializers for Billing Models
"""
from typing import Any, Dict, Optional
from decimal import Decimal
from rest_framework import serializers
from .models import CreditTransaction, CreditUsageLog
from igny8_core.auth.models import Account
@@ -8,7 +10,11 @@ from igny8_core.business.billing.models import PaymentMethodConfig, Payment
class CreditTransactionSerializer(serializers.ModelSerializer):
transaction_type_display = serializers.CharField(source='get_transaction_type_display', read_only=True)
"""Serializer for credit transactions"""
transaction_type_display: serializers.CharField = serializers.CharField(
source='get_transaction_type_display',
read_only=True
)
class Meta:
model = CreditTransaction
@@ -20,7 +26,11 @@ class CreditTransactionSerializer(serializers.ModelSerializer):
class CreditUsageLogSerializer(serializers.ModelSerializer):
operation_type_display = serializers.CharField(source='get_operation_type_display', read_only=True)
"""Serializer for credit usage logs"""
operation_type_display: serializers.CharField = serializers.CharField(
source='get_operation_type_display',
read_only=True
)
class Meta:
model = CreditUsageLog
@@ -34,24 +44,27 @@ class CreditUsageLogSerializer(serializers.ModelSerializer):
class CreditBalanceSerializer(serializers.Serializer):
"""Serializer for credit balance response"""
credits = serializers.IntegerField()
plan_credits_per_month = serializers.IntegerField()
credits_used_this_month = serializers.IntegerField()
credits_remaining = serializers.IntegerField()
credits: serializers.IntegerField = serializers.IntegerField()
plan_credits_per_month: serializers.IntegerField = serializers.IntegerField()
credits_used_this_month: serializers.IntegerField = serializers.IntegerField()
credits_remaining: serializers.IntegerField = serializers.IntegerField()
class UsageSummarySerializer(serializers.Serializer):
"""Serializer for usage summary response"""
period = serializers.DictField()
total_credits_used = serializers.IntegerField()
total_cost_usd = serializers.DecimalField(max_digits=10, decimal_places=2)
by_operation = serializers.DictField()
by_model = serializers.DictField()
period: serializers.DictField = serializers.DictField()
total_credits_used: serializers.IntegerField = serializers.IntegerField()
total_cost_usd: serializers.DecimalField = serializers.DecimalField(max_digits=10, decimal_places=2)
by_operation: serializers.DictField = serializers.DictField()
by_model: serializers.DictField = serializers.DictField()
class PaymentMethodConfigSerializer(serializers.ModelSerializer):
"""Serializer for payment method configuration"""
payment_method_display = serializers.CharField(source='get_payment_method_display', read_only=True)
payment_method_display: serializers.CharField = serializers.CharField(
source='get_payment_method_display',
read_only=True
)
class Meta:
model = PaymentMethodConfig
@@ -66,43 +79,66 @@ class PaymentMethodConfigSerializer(serializers.ModelSerializer):
class PaymentConfirmationSerializer(serializers.Serializer):
"""Serializer for manual payment confirmation"""
invoice_id = serializers.IntegerField(required=True)
payment_method = serializers.ChoiceField(
invoice_id: serializers.IntegerField = serializers.IntegerField(required=True)
payment_method: serializers.ChoiceField = serializers.ChoiceField(
choices=['bank_transfer', 'local_wallet'],
required=True
)
manual_reference = serializers.CharField(
manual_reference: serializers.CharField = serializers.CharField(
required=True,
max_length=255,
help_text="Transaction reference number"
)
manual_notes = serializers.CharField(
manual_notes: serializers.CharField = serializers.CharField(
required=False,
allow_blank=True,
help_text="Additional notes about the payment"
)
amount = serializers.DecimalField(
amount: serializers.DecimalField = serializers.DecimalField(
max_digits=10,
decimal_places=2,
required=True
)
proof_url = serializers.URLField(
proof_url: serializers.URLField = serializers.URLField(
required=False,
allow_blank=True,
help_text="URL to receipt/proof of payment"
)
def validate_proof_url(self, value: Optional[str]) -> Optional[str]:
"""Validate proof_url is a valid URL format"""
if value and not value.strip():
raise serializers.ValidationError("Proof URL cannot be empty if provided")
if value:
# Additional validation: must be http or https
if not value.startswith(('http://', 'https://')):
raise serializers.ValidationError("Proof URL must start with http:// or https://")
return value
def validate_amount(self, value: Optional[Decimal]) -> Decimal:
"""Validate amount has max 2 decimal places"""
if value is None:
raise serializers.ValidationError("Amount is required")
if value <= 0:
raise serializers.ValidationError("Amount must be greater than 0")
# Check decimal precision (max 2 decimal places)
if value.as_tuple().exponent < -2:
raise serializers.ValidationError("Amount can have maximum 2 decimal places")
return value
class LimitCardSerializer(serializers.Serializer):
"""Serializer for individual limit card"""
title = serializers.CharField()
limit = serializers.IntegerField()
used = serializers.IntegerField()
available = serializers.IntegerField()
unit = serializers.CharField()
category = serializers.CharField()
percentage = serializers.FloatField()
title: serializers.CharField = serializers.CharField()
limit: serializers.IntegerField = serializers.IntegerField()
used: serializers.IntegerField = serializers.IntegerField()
available: serializers.IntegerField = serializers.IntegerField()
unit: serializers.CharField = serializers.CharField()
category: serializers.CharField = serializers.CharField()
percentage: serializers.FloatField = serializers.FloatField()
class UsageLimitsSerializer(serializers.Serializer):
"""Serializer for usage limits response"""
limits = LimitCardSerializer(many=True)
limits: LimitCardSerializer = LimitCardSerializer(many=True)