Files
igny8/backend/igny8_core/modules/billing/views.py

405 lines
16 KiB
Python

"""
ViewSets for Billing API
Unified API Standard v1.0 compliant
"""
from rest_framework import viewsets, status, permissions
from rest_framework.decorators import action
from rest_framework.response import Response
from django.db.models import Sum, Count, Q
from django.utils import timezone
from datetime import timedelta
from decimal import Decimal
from drf_spectacular.utils import extend_schema, extend_schema_view
from igny8_core.api.base import AccountModelViewSet
from igny8_core.api.pagination import CustomPageNumberPagination
from igny8_core.api.response import success_response, error_response
from igny8_core.api.throttles import DebugScopedRateThrottle
from igny8_core.api.authentication import JWTAuthentication, CSRFExemptSessionAuthentication
from igny8_core.api.permissions import IsAuthenticatedAndActive, HasTenantAccess, IsAdminOrOwner
from .models import CreditTransaction, CreditUsageLog
from .serializers import (
CreditTransactionSerializer, CreditUsageLogSerializer,
CreditBalanceSerializer, UsageSummarySerializer, UsageLimitsSerializer
)
from .services import CreditService
from .exceptions import InsufficientCreditsError
@extend_schema_view(
list=extend_schema(tags=['Billing']),
)
class CreditBalanceViewSet(viewsets.ViewSet):
"""
ViewSet for credit balance operations
Unified API Standard v1.0 compliant
"""
permission_classes = [IsAuthenticatedAndActive, HasTenantAccess]
authentication_classes = [JWTAuthentication, CSRFExemptSessionAuthentication]
throttle_scope = 'billing'
throttle_classes = [DebugScopedRateThrottle]
@action(detail=False, methods=['get'])
def balance(self, request):
"""Get current credit balance and usage"""
account = getattr(request, 'account', None)
if not account:
user = getattr(request, 'user', None)
if user and user.is_authenticated:
from igny8_core.auth.models import User as UserModel
user = UserModel.objects.select_related('account', 'account__plan').get(id=user.id)
account = user.account
request.account = account
if not account:
return success_response(data={
'credits': 0,
'plan_credits_per_month': 0,
'credits_used_this_month': 0,
'credits_remaining': 0,
}, request=request)
# Get plan credits - plan is already associated
plan_credits_per_month = 0
if account.plan:
plan_credits_per_month = account.plan.get_effective_credits_per_month()
# Calculate credits used this month
now = timezone.now()
start_of_month = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
credits_used_this_month = CreditUsageLog.objects.filter(
account=account,
created_at__gte=start_of_month
).aggregate(total=Sum('credits_used'))['total'] or 0
credits = account.credits or 0
credits_remaining = credits
data = {
'credits': credits,
'plan_credits_per_month': plan_credits_per_month,
'credits_used_this_month': credits_used_this_month,
'credits_remaining': credits_remaining,
}
serializer = CreditBalanceSerializer(data)
return success_response(data=serializer.data, request=request)
@extend_schema_view(
list=extend_schema(tags=['Billing']),
retrieve=extend_schema(tags=['Billing']),
)
class CreditUsageViewSet(AccountModelViewSet):
"""
ViewSet for credit usage logs
Unified API Standard v1.0 compliant
"""
queryset = CreditUsageLog.objects.all()
serializer_class = CreditUsageLogSerializer
permission_classes = [IsAuthenticatedAndActive, HasTenantAccess]
authentication_classes = [JWTAuthentication, CSRFExemptSessionAuthentication]
pagination_class = CustomPageNumberPagination
throttle_scope = 'billing'
throttle_classes = [DebugScopedRateThrottle]
filter_backends = []
def get_queryset(self):
"""Get usage logs for current account - base class handles account filtering"""
queryset = super().get_queryset()
# Filter by operation type
operation_type = self.request.query_params.get('operation_type')
if operation_type:
queryset = queryset.filter(operation_type=operation_type)
# Filter by date range
start_date = self.request.query_params.get('start_date')
end_date = self.request.query_params.get('end_date')
if start_date:
queryset = queryset.filter(created_at__gte=start_date)
if end_date:
queryset = queryset.filter(created_at__lte=end_date)
return queryset.order_by('-created_at')
@action(detail=False, methods=['get'])
def summary(self, request):
"""Get usage summary for date range"""
account = getattr(request, 'account', None)
if not account:
user = getattr(request, 'user', None)
if user:
account = getattr(user, 'account', None)
if not account:
return error_response(
error='Account not found',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
# Get date range from query params
start_date = request.query_params.get('start_date')
end_date = request.query_params.get('end_date')
# Default to current month if not provided
now = timezone.now()
if not start_date:
start_date = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
else:
from django.utils.dateparse import parse_datetime
start_date = parse_datetime(start_date) or start_date
if not end_date:
end_date = now
else:
from django.utils.dateparse import parse_datetime
end_date = parse_datetime(end_date) or end_date
# Get usage logs in date range
usage_logs = CreditUsageLog.objects.filter(
account=account,
created_at__gte=start_date,
created_at__lte=end_date
)
# Calculate totals
total_credits_used = usage_logs.aggregate(total=Sum('credits_used'))['total'] or 0
total_cost_usd = usage_logs.aggregate(total=Sum('cost_usd'))['total'] or Decimal('0.00')
# Group by operation type
by_operation = {}
for operation_type, _ in CreditUsageLog.OPERATION_TYPE_CHOICES:
operation_logs = usage_logs.filter(operation_type=operation_type)
credits = operation_logs.aggregate(total=Sum('credits_used'))['total'] or 0
cost = operation_logs.aggregate(total=Sum('cost_usd'))['total'] or Decimal('0.00')
count = operation_logs.count()
if credits > 0 or count > 0:
by_operation[operation_type] = {
'credits': credits,
'cost': float(cost),
'count': count
}
# Group by model
by_model = {}
model_stats = usage_logs.values('model_used').annotate(
credits=Sum('credits_used'),
cost=Sum('cost_usd'),
count=Count('id')
).filter(model_used__isnull=False).exclude(model_used='')
for stat in model_stats:
model = stat['model_used']
by_model[model] = {
'credits': stat['credits'] or 0,
'cost': float(stat['cost'] or Decimal('0.00'))
}
data = {
'period': {
'start': start_date.isoformat() if hasattr(start_date, 'isoformat') else str(start_date),
'end': end_date.isoformat() if hasattr(end_date, 'isoformat') else str(end_date),
},
'total_credits_used': total_credits_used,
'total_cost_usd': float(total_cost_usd),
'by_operation': by_operation,
'by_model': by_model,
}
serializer = UsageSummarySerializer(data)
return success_response(data=serializer.data, request=request)
@action(detail=False, methods=['get'], url_path='limits', url_name='limits')
def limits(self, request):
"""
Get account limits and credit usage statistics (Phase 0: Credit-only system).
Returns account management limits and credit usage only.
"""
# Try multiple ways to get account
account = getattr(request, 'account', None)
if not account:
user = getattr(request, 'user', None)
if user and user.is_authenticated:
# Try to get account from user - refresh from DB to ensure we have latest
try:
from igny8_core.auth.models import User as UserModel
# Refresh user from DB to get account relationship
user = UserModel.objects.select_related('account', 'account__plan').get(id=user.id)
account = user.account
# Also set it on request for future use
request.account = account
except (AttributeError, UserModel.DoesNotExist, Exception) as e:
account = None
if not account:
# Return empty limits instead of error - frontend will show "no data" message
return success_response(data={'limits': []}, request=request)
plan = account.plan
if not plan:
# Return empty limits instead of error - allows frontend to show "no plan" message
return success_response(data={'limits': []}, request=request)
# Import models
from igny8_core.auth.models import User, Site
# Get current month boundaries
now = timezone.now()
start_of_month = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
# Calculate usage statistics
limits_data = []
# Credit Usage (Phase 0: Credit-only system)
credits_used_month = CreditUsageLog.objects.filter(
account=account,
created_at__gte=start_of_month
).aggregate(total=Sum('credits_used'))['total'] or 0
# Get credits by operation type
cluster_credits = CreditUsageLog.objects.filter(
account=account,
operation_type__in=['clustering'],
created_at__gte=start_of_month
).aggregate(total=Sum('credits_used'))['total'] or 0
content_credits = CreditUsageLog.objects.filter(
account=account,
operation_type__in=['content', 'content_generation'],
created_at__gte=start_of_month
).aggregate(total=Sum('credits_used'))['total'] or 0
image_credits = CreditUsageLog.objects.filter(
account=account,
operation_type__in=['images', 'image_generation', 'image_prompt_extraction'],
created_at__gte=start_of_month
).aggregate(total=Sum('credits_used'))['total'] or 0
idea_credits = CreditUsageLog.objects.filter(
account=account,
operation_type__in=['ideas', 'idea_generation'],
created_at__gte=start_of_month
).aggregate(total=Sum('credits_used'))['total'] or 0
# Use included_credits from plan (Phase 0: Credit-only)
plan_credits = plan.included_credits or plan.credits_per_month or 0
limits_data.extend([
{
'title': 'Monthly Credits',
'limit': plan_credits,
'used': credits_used_month,
'available': max(0, plan_credits - credits_used_month),
'unit': 'credits',
'category': 'credits',
'percentage': (credits_used_month / plan_credits * 100) if plan_credits else 0
},
{
'title': 'Current Balance',
'limit': None, # No limit - shows current balance
'used': None,
'available': account.credits,
'unit': 'credits',
'category': 'credits',
'percentage': None
},
{
'title': 'Clustering Credits',
'limit': None,
'used': cluster_credits,
'available': None,
'unit': 'credits',
'category': 'credits',
'percentage': None
},
{
'title': 'Content Generation Credits',
'limit': None,
'used': content_credits,
'available': None,
'unit': 'credits',
'category': 'credits',
'percentage': None
},
{
'title': 'Image Generation Credits',
'limit': None,
'used': image_credits,
'available': None,
'unit': 'credits',
'category': 'credits',
'percentage': None
},
{
'title': 'Idea Generation Credits',
'limit': None,
'used': idea_credits,
'available': None,
'unit': 'credits',
'category': 'credits',
'percentage': None
},
])
# Account Management Limits (kept - not operation limits)
users_count = User.objects.filter(account=account).count()
sites_count = Site.objects.filter(account=account).count()
limits_data.extend([
{
'title': 'Users',
'limit': plan.max_users or 0,
'used': users_count,
'available': max(0, (plan.max_users or 0) - users_count),
'unit': 'users',
'category': 'account',
'percentage': (users_count / (plan.max_users or 1)) * 100 if plan.max_users else 0
},
{
'title': 'Sites',
'limit': plan.max_sites or 0,
'used': sites_count,
'available': max(0, (plan.max_sites or 0) - sites_count),
'unit': 'sites',
'category': 'account',
'percentage': (sites_count / (plan.max_sites or 1)) * 100 if plan.max_sites else 0
},
])
# Return data directly - serializer validation not needed for read-only endpoint
return success_response(data={'limits': limits_data}, request=request)
@extend_schema_view(
list=extend_schema(tags=['Billing']),
retrieve=extend_schema(tags=['Billing']),
)
class CreditTransactionViewSet(AccountModelViewSet):
"""
ViewSet for credit transaction history
Unified API Standard v1.0 compliant
"""
queryset = CreditTransaction.objects.all()
serializer_class = CreditTransactionSerializer
permission_classes = [IsAuthenticatedAndActive, HasTenantAccess, IsAdminOrOwner]
authentication_classes = [JWTAuthentication, CSRFExemptSessionAuthentication]
pagination_class = CustomPageNumberPagination
throttle_scope = 'billing'
throttle_classes = [DebugScopedRateThrottle]
def get_queryset(self):
"""Get transactions for current account - base class handles account filtering"""
queryset = super().get_queryset()
# Filter by transaction type
transaction_type = self.request.query_params.get('transaction_type')
if transaction_type:
queryset = queryset.filter(transaction_type=transaction_type)
return queryset.order_by('-created_at')