Files
igny8/backend/igny8_core/admin/reports.py

622 lines
24 KiB
Python

"""
Analytics & Reporting Views for IGNY8 Admin
"""
from django.contrib.admin.views.decorators import staff_member_required
from django.shortcuts import render
from django.db.models import Count, Sum, Avg, Q
from django.utils import timezone
from datetime import timedelta
import json
@staff_member_required
def revenue_report(request):
    """Render the revenue and billing analytics page.

    Collects, for payments whose status is ``'succeeded'``:

    * revenue per calendar month for the current month and the five
      months before it (oldest first, JSON-encoded for charting),
    * the number of accounts attached to each plan,
    * payment count and summed amount per payment method,
    * the all-time revenue total.

    Args:
        request: The incoming HTTP request.

    Returns:
        The rendered ``admin/reports/revenue.html`` response.
    """
    from igny8_core.business.billing.models import Payment
    from igny8_core.auth.models import Plan

    succeeded_payments = Payment.objects.filter(status='succeeded')

    # Anchor on midnight of the first day of the current month and step back
    # in whole calendar months. Subtracting multiples of 30 days drifts off
    # month boundaries (and can skip February), which produced overlapping or
    # missing buckets.
    current_month_start = timezone.now().replace(
        day=1, hour=0, minute=0, second=0, microsecond=0
    )
    current_month_index = (
        current_month_start.year * 12 + current_month_start.month - 1
    )

    months = []
    monthly_revenue = []
    for months_back in range(5, -1, -1):  # oldest month first
        start_year, start_month0 = divmod(current_month_index - months_back, 12)
        end_year, end_month0 = divmod(current_month_index - months_back + 1, 12)
        month_start = current_month_start.replace(
            year=start_year, month=start_month0 + 1
        )
        # Exclusive upper bound: first instant of the following month.
        month_end = current_month_start.replace(
            year=end_year, month=end_month0 + 1
        )

        revenue = succeeded_payments.filter(
            processed_at__gte=month_start,
            processed_at__lt=month_end,
        ).aggregate(total=Sum('amount'))['total'] or 0

        months.append(month_start.strftime('%b %Y'))
        monthly_revenue.append(float(revenue))

    # Plan distribution
    plan_distribution = Plan.objects.annotate(
        account_count=Count('accounts')
    ).values('name', 'account_count')

    # Payment method breakdown
    payment_methods = succeeded_payments.values('payment_method').annotate(
        count=Count('id'),
        total=Sum('amount'),
    ).order_by('-total')

    # Total revenue all time (Sum yields None when there are no rows)
    total_revenue = succeeded_payments.aggregate(
        total=Sum('amount')
    )['total'] or 0

    context = {
        'title': 'Revenue Report',
        'months': json.dumps(months),
        'monthly_revenue': json.dumps(monthly_revenue),
        'plan_distribution': list(plan_distribution),
        'payment_methods': list(payment_methods),
        'total_revenue': float(total_revenue),
    }

    # Merge with admin context
    from igny8_core.admin.site import admin_site
    admin_context = admin_site.each_context(request)
    context.update(admin_context)

    return render(request, 'admin/reports/revenue.html', context)
@staff_member_required
def usage_report(request):
    """Render the credit usage and AI operations analytics page.

    Summarises ``CreditUsageLog`` rows by operation type, by account (ten
    largest credit consumers), and by model, plus the overall credit total.

    Args:
        request: The incoming HTTP request.

    Returns:
        The rendered ``admin/reports/usage.html`` response.
    """
    from igny8_core.business.billing.models import CreditUsageLog

    usage_logs = CreditUsageLog.objects

    # Credits, cost and call count per operation type, heaviest first.
    per_operation = (
        usage_logs.values('operation_type')
        .annotate(
            total_credits=Sum('credits_used'),
            total_cost=Sum('cost_usd'),
            operation_count=Count('id'),
        )
        .order_by('-total_credits')
    )

    # The ten accounts that consumed the most credits.
    per_account = (
        usage_logs.values('account__name')
        .annotate(
            total_credits=Sum('credits_used'),
            operation_count=Count('id'),
        )
        .order_by('-total_credits')
    )
    heaviest_accounts = per_account[:10]

    # How often each model was used, most used first.
    per_model = (
        usage_logs.values('model_used')
        .annotate(usage_count=Count('id'))
        .order_by('-usage_count')
    )

    # Sum() yields None on an empty table, so fall back to zero.
    credit_sum = usage_logs.aggregate(total=Sum('credits_used'))['total']
    if not credit_sum:
        credit_sum = 0

    context = {
        'title': 'Usage Report',
        'usage_by_operation': list(per_operation),
        'top_consumers': list(heaviest_accounts),
        'model_usage': list(per_model),
        'total_credits': int(credit_sum),
    }

    # Layer the admin site's shared template variables on top.
    from igny8_core.admin.site import admin_site
    context.update(admin_site.each_context(request))

    return render(request, 'admin/reports/usage.html', context)
@staff_member_required
def content_report(request):
    """Render the content production analytics page.

    Gathers content counts by type, a 30-day daily production timeline,
    average word count per content type, the task completion rate, and the
    total number of content items.

    Args:
        request: The incoming HTTP request.

    Returns:
        The rendered ``admin/reports/content.html`` response.
    """
    from django.db.models.functions import TruncDate
    from igny8_core.modules.writer.models import Content, Tasks

    # Content by type
    content_by_type = Content.objects.values(
        'content_type'
    ).annotate(count=Count('id')).order_by('-count')

    # Production timeline (last 30 days, oldest first).
    # "Today" is computed once so the window cannot shift if midnight passes
    # while the report is being built, and the per-day counts come from a
    # single grouped query instead of one COUNT query per day.
    timeline_length = 30
    today = timezone.now().date()
    timeline_days = [
        today - timedelta(days=offset)
        for offset in range(timeline_length - 1, -1, -1)
    ]
    produced_per_day = {
        row['production_day']: row['produced']
        for row in Content.objects.filter(
            created_at__date__gte=timeline_days[0]
        ).annotate(
            production_day=TruncDate('created_at')
        ).values('production_day').annotate(
            produced=Count('id')
        ).order_by('production_day')
    }
    days = [day.strftime('%m/%d') for day in timeline_days]
    daily_counts = [produced_per_day.get(day, 0) for day in timeline_days]

    # Average word count by content type
    avg_words = Content.objects.values('content_type').annotate(
        avg_words=Avg('word_count')
    ).order_by('-avg_words')

    # Task completion rate (guard against division by zero when no tasks)
    total_tasks = Tasks.objects.count()
    completed_tasks = Tasks.objects.filter(status='completed').count()
    completion_rate = (completed_tasks / total_tasks * 100) if total_tasks > 0 else 0

    # Total content produced
    total_content = Content.objects.count()

    context = {
        'title': 'Content Production Report',
        'content_by_type': list(content_by_type),
        'days': json.dumps(days),
        'daily_counts': json.dumps(daily_counts),
        'avg_words': list(avg_words),
        'completion_rate': round(completion_rate, 1),
        'total_content': total_content,
        'total_tasks': total_tasks,
        'completed_tasks': completed_tasks,
    }

    # Merge with admin context
    from igny8_core.admin.site import admin_site
    admin_context = admin_site.each_context(request)
    context.update(admin_context)

    return render(request, 'admin/reports/content.html', context)
@staff_member_required
def data_quality_report(request):
    """Render the data quality and integrity report.

    Runs a fixed series of count queries and lists every check whose count
    is greater than zero, in the order the checks are executed.

    Args:
        request: The incoming HTTP request.

    Returns:
        The rendered ``admin/reports/data_quality.html`` response.
    """
    from igny8_core.modules.writer.models import Content
    from igny8_core.modules.writer.models import Tasks
    from igny8_core.auth.models import Account
    from igny8_core.modules.planner.models import Keywords

    issues = []

    def flag(found, severity, issue_type, description, action_url):
        """Record an issue entry when the check found at least one row."""
        if found > 0:
            issues.append({
                'severity': severity,
                'type': issue_type,
                'count': found,
                'description': description,
                'action_url': action_url,
            })

    # Content rows that have no site.
    flag(
        Content.objects.filter(site__isnull=True).count(),
        'warning',
        'Orphaned Records',
        'Content items without assigned site',
        '/admin/writer/content/?site__isnull=True',
    )

    # Tasks that have no cluster.
    flag(
        Tasks.objects.filter(cluster__isnull=True).count(),
        'info',
        'Missing Relationships',
        'Tasks without assigned cluster',
        '/admin/writer/tasks/?cluster__isnull=True',
    )

    # Accounts whose credit balance is below zero.
    flag(
        Account.objects.filter(credits__lt=0).count(),
        'error',
        'Data Integrity',
        'Accounts with negative credit balance',
        '/admin/igny8_core_auth/account/?credits__lt=0',
    )

    # (seed_keyword, site, sector) combinations that occur more than once.
    keyword_groups = Keywords.objects.values(
        'seed_keyword', 'site', 'sector'
    ).annotate(count=Count('id'))
    flag(
        keyword_groups.filter(count__gt=1).count(),
        'warning',
        'Duplicates',
        'Duplicate keywords for same site/sector',
        '/admin/planner/keywords/',
    )

    # Content whose meta title or meta description is NULL or empty.
    lacks_seo = (
        Q(meta_title__isnull=True) | Q(meta_title='') |
        Q(meta_description__isnull=True) | Q(meta_description='')
    )
    flag(
        Content.objects.filter(lacks_seo).count(),
        'info',
        'Incomplete Data',
        'Content missing SEO metadata',
        '/admin/writer/content/',
    )

    context = {
        'title': 'Data Quality Report',
        'issues': issues,
        'total_issues': len(issues),
    }

    # Layer the admin site's shared template variables on top.
    from igny8_core.admin.site import admin_site
    context.update(admin_site.each_context(request))

    return render(request, 'admin/reports/data_quality.html', context)
@staff_member_required
def token_usage_report(request):
    """Render token usage analytics for a configurable look-back window.

    The window length comes from the ``days`` query parameter. It defaults
    to 30; non-numeric or non-positive values fall back to 30 and the value
    is capped at 365 so a single request cannot ask for an unbounded window.

    The page receives overall token totals, breakdowns by model, operation
    type and account, a per-day token series, per-hour usage and cost per
    1k tokens.

    Args:
        request: The incoming HTTP request (reads ``request.GET['days']``).

    Returns:
        The rendered ``admin/reports/token_usage.html`` response.
    """
    from decimal import Decimal
    from django.db.models.functions import TruncDate
    from igny8_core.business.billing.models import CreditUsageLog

    # Date filter setup
    default_days = 30
    max_days = 365
    try:
        days = int(request.GET.get('days', str(default_days)))
    except ValueError:
        days = default_days
    if days < 1:
        days = default_days
    days = min(days, max_days)

    now = timezone.now()
    today = now.date()
    start_date = now - timedelta(days=days)

    # Base queryset - include all records (tokens may be 0 for historical data)
    logs = CreditUsageLog.objects.filter(
        created_at__gte=start_date
    )

    # Total statistics (one aggregate query; Sum yields None on no rows)
    overall = logs.aggregate(
        sum_tokens_input=Sum('tokens_input'),
        sum_tokens_output=Sum('tokens_output'),
        sum_cost=Sum('cost_usd_total'),
    )
    total_tokens_input = overall['sum_tokens_input'] or 0
    total_tokens_output = overall['sum_tokens_output'] or 0
    total_tokens = total_tokens_input + total_tokens_output
    total_calls = logs.count()
    avg_tokens_per_call = total_tokens / total_calls if total_calls > 0 else 0

    # Token usage by model (using model_config FK)
    # NOTE(review): the top 10 are selected by input tokens and only then
    # re-sorted by total tokens; kept as-is to preserve existing output.
    token_by_model = logs.filter(model_config__isnull=False).values(
        'model_config__model_name',
        'model_config__display_name'
    ).annotate(
        total_tokens_input=Sum('tokens_input'),
        total_tokens_output=Sum('tokens_output'),
        call_count=Count('id'),
        total_cost_input=Sum('cost_usd_input'),
        total_cost_output=Sum('cost_usd_output')
    ).order_by('-total_tokens_input')[:10]

    # Add total_tokens and total_cost to each model
    for model in token_by_model:
        model['total_tokens'] = (model['total_tokens_input'] or 0) + (model['total_tokens_output'] or 0)
        model['total_cost'] = (model['total_cost_input'] or 0) + (model['total_cost_output'] or 0)
        model['avg_tokens'] = model['total_tokens'] / model['call_count'] if model['call_count'] > 0 else 0
        model['model'] = model['model_config__display_name'] or model['model_config__model_name']
    token_by_model = sorted(token_by_model, key=lambda x: x['total_tokens'], reverse=True)

    # Token usage by function/operation
    token_by_function = logs.values('operation_type').annotate(
        total_tokens_input=Sum('tokens_input'),
        total_tokens_output=Sum('tokens_output'),
        call_count=Count('id'),
        total_cost=Sum('cost_usd_total')
    ).order_by('-total_tokens_input')[:10]

    # Add total_tokens to each function
    for func in token_by_function:
        func['total_tokens'] = (func['total_tokens_input'] or 0) + (func['total_tokens_output'] or 0)
        func['avg_tokens'] = func['total_tokens'] / func['call_count'] if func['call_count'] > 0 else 0
        func['function'] = func['operation_type'].replace('_', ' ').title() if func['operation_type'] else 'Unknown'
    token_by_function = sorted(token_by_function, key=lambda x: x['total_tokens'], reverse=True)

    # Token usage by account (top consumers)
    token_by_account = logs.values('account__name', 'account_id').annotate(
        total_tokens_input=Sum('tokens_input'),
        total_tokens_output=Sum('tokens_output'),
        call_count=Count('id'),
        total_cost=Sum('cost_usd_total')
    ).order_by('-total_tokens_input')[:15]

    # Add total_tokens to each account
    for account in token_by_account:
        account['total_tokens'] = (account['total_tokens_input'] or 0) + (account['total_tokens_output'] or 0)
    token_by_account = sorted(token_by_account, key=lambda x: x['total_tokens'], reverse=True)[:15]

    # Daily token trends (time series, oldest day first).
    # One grouped query replaces two aggregate queries per day.
    tokens_per_day = {
        row['trend_day']: (row['day_tokens_input'] or 0) + (row['day_tokens_output'] or 0)
        for row in logs.annotate(
            trend_day=TruncDate('created_at')
        ).values('trend_day').annotate(
            day_tokens_input=Sum('tokens_input'),
            day_tokens_output=Sum('tokens_output'),
        ).order_by('trend_day')
    }
    trend_days = [
        today - timedelta(days=offset) for offset in range(days - 1, -1, -1)
    ]
    daily_labels = [day.strftime('%m/%d') for day in trend_days]
    daily_data = [int(tokens_per_day.get(day, 0)) for day in trend_days]

    # Token efficiency metrics (fixed placeholder values: every call is
    # reported as successful and no tokens as wasted)
    success_rate = 100.0
    successful_tokens = total_tokens
    wasted_tokens = 0
    tokens_by_status = [{
        'error': None,
        'total_tokens': total_tokens,
        'call_count': total_calls,
        'avg_tokens': avg_tokens_per_call
    }]

    # Peak usage times (hour of day)
    # NOTE(review): raw SQL via .extra(); the EXTRACT syntax is
    # backend-specific - confirm it matches the deployed database.
    hourly_usage = logs.extra(
        select={'hour': "EXTRACT(hour FROM created_at)"}
    ).values('hour').annotate(
        token_input=Sum('tokens_input'),
        token_output=Sum('tokens_output'),
        call_count=Count('id')
    ).order_by('hour')

    # Add total token_count for each hour
    for hour_data in hourly_usage:
        hour_data['token_count'] = (hour_data['token_input'] or 0) + (hour_data['token_output'] or 0)

    # Cost efficiency
    total_cost = overall['sum_cost'] or Decimal('0.00')
    cost_per_1k_tokens = float(total_cost) / (total_tokens / 1000) if total_tokens > 0 else 0.0

    context = {
        'title': 'Token Usage Report',
        'days_filter': days,
        'total_tokens': int(total_tokens),
        'total_calls': total_calls,
        'avg_tokens_per_call': round(avg_tokens_per_call, 2),
        'token_by_model': list(token_by_model),
        'token_by_function': list(token_by_function),
        'token_by_account': list(token_by_account),
        'daily_labels': json.dumps(daily_labels),
        'daily_data': json.dumps(daily_data),
        'tokens_by_status': list(tokens_by_status),
        'success_rate': round(success_rate, 2),
        'successful_tokens': int(successful_tokens),
        'wasted_tokens': int(wasted_tokens),
        'hourly_usage': list(hourly_usage),
        'total_cost': float(total_cost),
        'cost_per_1k_tokens': float(cost_per_1k_tokens),
        'current_app': '_reports',
    }

    # Merge with admin context
    from igny8_core.admin.site import admin_site
    admin_context = admin_site.each_context(request)
    context.update(admin_context)

    return render(request, 'admin/reports/token_usage.html', context)
@staff_member_required
def ai_cost_analysis(request):
    """Render the AI cost and margin analysis page.

    The look-back window comes from the ``days`` query parameter. It
    defaults to 30; non-numeric or non-positive values fall back to 30 and
    the value is capped at 365 so a single request cannot ask for an
    unbounded window. Only log rows with a non-null ``cost_usd_total`` are
    considered.

    Revenue is estimated as credits used multiplied by a fixed price of
    $0.01 per credit; margins are derived from that estimate. The page also
    receives cost breakdowns by model, account and operation type, a daily
    cost/call series, a 30-day projection based on the last seven days,
    the ten most expensive calls above three times the average cost, and
    per-hour cost totals.

    Args:
        request: The incoming HTTP request (reads ``request.GET['days']``).

    Returns:
        The rendered ``admin/reports/ai_cost_analysis.html`` response.
    """
    from decimal import Decimal
    from django.db.models.functions import TruncDate
    from igny8_core.business.billing.models import CreditUsageLog

    # Date filter setup
    default_days = 30
    max_days = 365
    try:
        days = int(request.GET.get('days', str(default_days)))
    except ValueError:
        days = default_days
    if days < 1:
        days = default_days
    days = min(days, max_days)

    now = timezone.now()
    today = now.date()
    start_date = now - timedelta(days=days)

    # Base queryset - filter for records with cost data
    logs = CreditUsageLog.objects.filter(
        created_at__gte=start_date,
        cost_usd_total__isnull=False
    )

    # Overall cost metrics (one aggregate query; values are None on no rows)
    overall = logs.aggregate(
        sum_cost=Sum('cost_usd_total'),
        mean_cost=Avg('cost_usd_total'),
        sum_tokens_input=Sum('tokens_input'),
        sum_tokens_output=Sum('tokens_output'),
        sum_credits=Sum('credits_used'),
    )
    total_cost = overall['sum_cost'] or Decimal('0.00')
    total_calls = logs.count()
    avg_cost_per_call = overall['mean_cost'] or Decimal('0.00')
    total_tokens_input = overall['sum_tokens_input'] or 0
    total_tokens_output = overall['sum_tokens_output'] or 0
    total_tokens = total_tokens_input + total_tokens_output

    # Revenue & Margin calculation
    total_credits_charged = overall['sum_credits'] or 0
    # Average credit price (simplified - in reality would vary by plan)
    avg_credit_price = Decimal('0.01')  # $0.01 per credit default
    total_revenue = Decimal(total_credits_charged) * avg_credit_price
    total_margin = total_revenue - total_cost
    margin_percentage = float((total_margin / total_revenue * 100) if total_revenue > 0 else 0)

    # Per-unit margins
    margin_per_1m_tokens = float(total_margin) / (total_tokens / 1_000_000) if total_tokens > 0 else 0
    margin_per_1k_credits = float(total_margin) / (total_credits_charged / 1000) if total_credits_charged > 0 else 0

    # Cost by model with efficiency metrics (using model_config FK)
    cost_by_model = logs.filter(model_config__isnull=False).values(
        'model_config__model_name',
        'model_config__display_name'
    ).annotate(
        total_cost=Sum('cost_usd_total'),
        call_count=Count('id'),
        avg_cost=Avg('cost_usd_total'),
        total_tokens_input=Sum('tokens_input'),
        total_tokens_output=Sum('tokens_output'),
        total_credits=Sum('credits_used')
    ).order_by('-total_cost')

    # Add cost efficiency, margin and cost share for each model
    for model in cost_by_model:
        model['total_tokens'] = (model['total_tokens_input'] or 0) + (model['total_tokens_output'] or 0)
        model['avg_tokens'] = model['total_tokens'] / model['call_count'] if model['call_count'] > 0 else 0
        model['model'] = model['model_config__display_name'] or model['model_config__model_name']
        if model['total_tokens'] and model['total_tokens'] > 0:
            model['cost_per_1k_tokens'] = float(model['total_cost']) / (model['total_tokens'] / 1000)
        else:
            model['cost_per_1k_tokens'] = 0

        # Calculate margin for this model
        model_revenue = Decimal(model['total_credits'] or 0) * avg_credit_price
        model_margin = model_revenue - model['total_cost']
        model['revenue'] = float(model_revenue)
        model['margin'] = float(model_margin)
        model['margin_percentage'] = float((model_margin / model_revenue * 100) if model_revenue > 0 else 0)

        # Share of the overall cost; always set so the key is never missing
        # from the template context when the total cost is zero.
        model['cost_percentage'] = (
            float((model['total_cost'] / total_cost) * 100) if total_cost > 0 else 0.0
        )

    # Cost by account (top spenders)
    cost_by_account = logs.values('account__name', 'account_id').annotate(
        total_cost=Sum('cost_usd_total'),
        call_count=Count('id'),
        total_tokens_input=Sum('tokens_input'),
        total_tokens_output=Sum('tokens_output'),
        avg_cost=Avg('cost_usd_total')
    ).order_by('-total_cost')[:15]

    # Add total_tokens to each account
    for account in cost_by_account:
        account['total_tokens'] = (account['total_tokens_input'] or 0) + (account['total_tokens_output'] or 0)

    # Cost by function/operation
    cost_by_function = logs.values('operation_type').annotate(
        total_cost=Sum('cost_usd_total'),
        call_count=Count('id'),
        avg_cost=Avg('cost_usd_total'),
        total_tokens_input=Sum('tokens_input'),
        total_tokens_output=Sum('tokens_output'),
        total_credits=Sum('credits_used')
    ).order_by('-total_cost')[:10]

    # Add total_tokens, function alias, and margin
    for func in cost_by_function:
        func['total_tokens'] = (func['total_tokens_input'] or 0) + (func['total_tokens_output'] or 0)
        func['function'] = func['operation_type'].replace('_', ' ').title() if func['operation_type'] else 'Unknown'

        # Calculate margin for this operation
        func_revenue = Decimal(func['total_credits'] or 0) * avg_credit_price
        func_margin = func_revenue - func['total_cost']
        func['revenue'] = float(func_revenue)
        func['margin'] = float(func_margin)
        func['margin_percentage'] = float((func_margin / func_revenue * 100) if func_revenue > 0 else 0)

    # Daily cost trends (time series, oldest day first).
    # One grouped query replaces two queries per day.
    stats_per_day = {
        row['trend_day']: row
        for row in logs.annotate(
            trend_day=TruncDate('created_at')
        ).values('trend_day').annotate(
            day_cost=Sum('cost_usd_total'),
            day_calls=Count('id'),
        ).order_by('trend_day')
    }
    daily_cost_data = []
    daily_cost_labels = []
    daily_call_data = []
    for offset in range(days - 1, -1, -1):
        day = today - timedelta(days=offset)
        day_stats = stats_per_day.get(day)
        daily_cost_labels.append(day.strftime('%m/%d'))
        daily_cost_data.append(float(day_stats['day_cost'] or 0) if day_stats else 0.0)
        daily_call_data.append(day_stats['day_calls'] if day_stats else 0)

    # Cost prediction: average of the last 7 days scaled to 30 days.
    # A window of exactly 7 days has enough data, hence ">=".
    if len(daily_cost_data) >= 7:
        recent_avg_daily = sum(daily_cost_data[-7:]) / 7
        projected_monthly = recent_avg_daily * 30
    else:
        projected_monthly = 0

    # Cost anomalies (calls costing > 3x average)
    failed_cost = Decimal('0.00')  # fixed placeholder value: always zero here
    if avg_cost_per_call > 0:
        anomaly_threshold = float(avg_cost_per_call) * 3
        anomalies = logs.filter(cost_usd_total__gt=anomaly_threshold).values(
            'model_config__model_name',
            'model_config__display_name',
            'operation_type',
            'account__name',
            'cost_usd_total',
            'tokens_input',
            'tokens_output',
            'created_at'
        ).order_by('-cost_usd_total')[:10]

        # Add aliases for template
        for anomaly in anomalies:
            anomaly['model'] = anomaly['model_config__display_name'] or anomaly['model_config__model_name'] or 'Unknown'
            anomaly['function'] = anomaly['operation_type'].replace('_', ' ').title() if anomaly['operation_type'] else 'Unknown'
            anomaly['cost'] = anomaly['cost_usd_total']
            anomaly['tokens'] = (anomaly['tokens_input'] or 0) + (anomaly['tokens_output'] or 0)
    else:
        anomalies = []

    # Model comparison matrix
    model_comparison = [
        {
            'model': model_data['model'],
            'total_cost': float(model_data['total_cost']),
            'calls': model_data['call_count'],
            'avg_cost': float(model_data['avg_cost']),
            'total_tokens': model_data['total_tokens'],
            'cost_per_1k': model_data['cost_per_1k_tokens'],
        }
        for model_data in cost_by_model
    ]

    # Peak cost hours
    # NOTE(review): raw SQL via .extra(); the EXTRACT syntax is
    # backend-specific - confirm it matches the deployed database.
    hourly_cost = logs.extra(
        select={'hour': "EXTRACT(hour FROM created_at)"}
    ).values('hour').annotate(
        total_cost=Sum('cost_usd_total'),
        call_count=Count('id')
    ).order_by('hour')

    # Cost efficiency score (fixed placeholder values: all cost is reported
    # as successful)
    successful_cost = total_cost
    efficiency_score = 100.0

    context = {
        'title': 'AI Cost & Margin Analysis',
        'days_filter': days,
        'total_cost': float(total_cost),
        'total_revenue': float(total_revenue),
        'total_margin': float(total_margin),
        'margin_percentage': round(margin_percentage, 2),
        'margin_per_1m_tokens': round(margin_per_1m_tokens, 4),
        'margin_per_1k_credits': round(margin_per_1k_credits, 4),
        'total_credits_charged': total_credits_charged,
        'credit_price': float(avg_credit_price),
        'total_calls': total_calls,
        'avg_cost_per_call': float(avg_cost_per_call),
        'total_tokens': int(total_tokens),
        'cost_by_model': list(cost_by_model),
        'cost_by_account': list(cost_by_account),
        'cost_by_function': list(cost_by_function),
        'daily_cost_labels': json.dumps(daily_cost_labels),
        'daily_cost_data': json.dumps(daily_cost_data),
        'daily_call_data': json.dumps(daily_call_data),
        'projected_monthly': round(projected_monthly, 2),
        'failed_cost': float(failed_cost),
        'wasted_percentage': float((failed_cost / total_cost * 100) if total_cost > 0 else 0),
        'anomalies': list(anomalies),
        'model_comparison': model_comparison,
        'hourly_cost': list(hourly_cost),
        'efficiency_score': round(efficiency_score, 2),
        'successful_cost': float(successful_cost),
        'current_app': '_reports',
    }

    # Merge with admin context
    from igny8_core.admin.site import admin_site
    admin_context = admin_site.each_context(request)
    context.update(admin_context)

    return render(request, 'admin/reports/ai_cost_analysis.html', context)