Files
igny8/backend/igny8_core/api/account_views.py
IGNY8 VPS (Salman) 1e718105f2 billing admin account 1
2025-12-05 08:01:55 +00:00

232 lines
8.0 KiB
Python

"""
Account Management API Views
Handles account settings, team management, and usage analytics
"""
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from django.contrib.auth import get_user_model
from django.db.models import Q, Count, Sum
from django.utils import timezone
from datetime import timedelta
from igny8_core.auth.models import Account
from igny8_core.business.billing.models import CreditTransaction
User = get_user_model()
class AccountSettingsViewSet(viewsets.ViewSet):
"""Account settings management"""
permission_classes = [IsAuthenticated]
def retrieve(self, request):
"""Get account settings"""
account = request.user.account
return Response({
'id': account.id,
'name': account.name,
'slug': account.slug,
'billing_address_line1': account.billing_address_line1 or '',
'billing_address_line2': account.billing_address_line2 or '',
'billing_city': account.billing_city or '',
'billing_state': account.billing_state or '',
'billing_postal_code': account.billing_postal_code or '',
'billing_country': account.billing_country or '',
'tax_id': account.tax_id or '',
'billing_email': account.billing_email or '',
'credits': account.credits,
'created_at': account.created_at.isoformat(),
'updated_at': account.updated_at.isoformat(),
})
def partial_update(self, request):
"""Update account settings"""
account = request.user.account
# Update allowed fields
allowed_fields = [
'name', 'billing_address_line1', 'billing_address_line2',
'billing_city', 'billing_state', 'billing_postal_code',
'billing_country', 'tax_id', 'billing_email'
]
for field in allowed_fields:
if field in request.data:
setattr(account, field, request.data[field])
account.save()
return Response({
'message': 'Account settings updated successfully',
'account': {
'id': account.id,
'name': account.name,
'slug': account.slug,
'billing_address_line1': account.billing_address_line1,
'billing_address_line2': account.billing_address_line2,
'billing_city': account.billing_city,
'billing_state': account.billing_state,
'billing_postal_code': account.billing_postal_code,
'billing_country': account.billing_country,
'tax_id': account.tax_id,
'billing_email': account.billing_email,
}
})
class TeamManagementViewSet(viewsets.ViewSet):
"""Team members management"""
permission_classes = [IsAuthenticated]
def list(self, request):
"""List team members"""
account = request.user.account
users = User.objects.filter(account=account)
return Response({
'results': [
{
'id': user.id,
'email': user.email,
'first_name': user.first_name,
'last_name': user.last_name,
'is_active': user.is_active,
'is_staff': user.is_staff,
'date_joined': user.date_joined.isoformat(),
'last_login': user.last_login.isoformat() if user.last_login else None,
}
for user in users
],
'count': users.count()
})
def create(self, request):
"""Invite new team member"""
account = request.user.account
email = request.data.get('email')
if not email:
return Response(
{'error': 'Email is required'},
status=status.HTTP_400_BAD_REQUEST
)
# Check if user already exists
if User.objects.filter(email=email).exists():
return Response(
{'error': 'User with this email already exists'},
status=status.HTTP_400_BAD_REQUEST
)
# Create user (simplified - in production, send invitation email)
user = User.objects.create_user(
email=email,
first_name=request.data.get('first_name', ''),
last_name=request.data.get('last_name', ''),
account=account
)
return Response({
'message': 'Team member invited successfully',
'user': {
'id': user.id,
'email': user.email,
'first_name': user.first_name,
'last_name': user.last_name,
}
}, status=status.HTTP_201_CREATED)
def destroy(self, request, pk=None):
"""Remove team member"""
account = request.user.account
try:
user = User.objects.get(id=pk, account=account)
# Prevent removing yourself
if user.id == request.user.id:
return Response(
{'error': 'Cannot remove yourself'},
status=status.HTTP_400_BAD_REQUEST
)
user.is_active = False
user.save()
return Response({
'message': 'Team member removed successfully'
})
except User.DoesNotExist:
return Response(
{'error': 'User not found'},
status=status.HTTP_404_NOT_FOUND
)
class UsageAnalyticsViewSet(viewsets.ViewSet):
"""Usage analytics and statistics"""
permission_classes = [IsAuthenticated]
@action(detail=False, methods=['get'])
def overview(self, request):
"""Get usage analytics overview"""
account = request.user.account
# Get date range (default: last 30 days)
days = int(request.query_params.get('days', 30))
start_date = timezone.now() - timedelta(days=days)
# Get transactions in period
transactions = CreditTransaction.objects.filter(
account=account,
created_at__gte=start_date
)
# Calculate totals by type
usage_by_type = transactions.filter(
amount__lt=0
).values('transaction_type').annotate(
total=Sum('amount'),
count=Count('id')
)
purchases_by_type = transactions.filter(
amount__gt=0
).values('transaction_type').annotate(
total=Sum('amount'),
count=Count('id')
)
# Daily usage
daily_usage = []
for i in range(days):
date = start_date + timedelta(days=i)
day_txns = transactions.filter(
created_at__date=date.date()
)
usage = day_txns.filter(amount__lt=0).aggregate(Sum('amount'))['amount__sum'] or 0
purchases = day_txns.filter(amount__gt=0).aggregate(Sum('amount'))['amount__sum'] or 0
daily_usage.append({
'date': date.date().isoformat(),
'usage': abs(usage),
'purchases': purchases,
'net': purchases + usage
})
return Response({
'period_days': days,
'start_date': start_date.isoformat(),
'end_date': timezone.now().isoformat(),
'current_balance': account.credits,
'usage_by_type': list(usage_by_type),
'purchases_by_type': list(purchases_by_type),
'daily_usage': daily_usage,
'total_usage': abs(transactions.filter(amount__lt=0).aggregate(Sum('amount'))['amount__sum'] or 0),
'total_purchases': transactions.filter(amount__gt=0).aggregate(Sum('amount'))['amount__sum'] or 0,
})