Files
igny8/backend/igny8_core/auth/admin.py
2025-12-17 05:58:13 +00:00

623 lines
25 KiB
Python

"""
Admin interface for auth models
"""
from django import forms
from django.contrib import admin
from django.contrib.auth.admin import UserAdmin as BaseUserAdmin
from unfold.admin import ModelAdmin, TabularInline
from simple_history.admin import SimpleHistoryAdmin
from igny8_core.admin.base import AccountAdminMixin, Igny8ModelAdmin
from .models import User, Account, Plan, Subscription, Site, Sector, SiteUserAccess, Industry, IndustrySector, SeedKeyword, PasswordResetToken
from import_export.admin import ExportMixin
from import_export import resources
class AccountAdminForm(forms.ModelForm):
    """Account admin form with payment-method choices drawn from PaymentMethodConfig.

    For an existing account the ``payment_method`` widget is rebuilt from the
    enabled ``PaymentMethodConfig`` rows for the account's billing country
    plus the global ``'*'`` rows. On save, the selected method is mirrored
    into ``AccountPaymentMethod`` and made the account's only default.
    """

    class Meta:
        model = Account
        fields = '__all__'

    def __init__(self, *args, **kwargs):
        """Build the form and, for saved accounts, customise ``payment_method``."""
        super().__init__(*args, **kwargs)
        from igny8_core.business.billing.models import PaymentMethodConfig, AccountPaymentMethod

        # Nothing to customise on the "add" form: there is no account yet.
        if not (self.instance and self.instance.pk):
            return

        # Get country from billing_country, fallback to wildcard '*' for global
        country = self.instance.billing_country or '*'

        # Get enabled payment methods for this country OR global (*)
        available_methods = PaymentMethodConfig.objects.filter(
            country_code__in=[country, '*'],
            is_enabled=True
        ).order_by('country_code', 'sort_order').values_list('payment_method', 'display_name')

        if available_methods:
            # Build choices from PaymentMethodConfig, keeping only the first
            # row seen for each method type.
            choices = []
            seen = set()
            for method_type, display_name in available_methods:
                if method_type not in seen:
                    choices.append((method_type, display_name or method_type.replace('_', ' ').title()))
                    seen.add(method_type)
        else:
            # Fallback to model choices if no configs
            choices = Account.PAYMENT_METHOD_CHOICES

        self.fields['payment_method'].widget = forms.Select(choices=choices)

        # Get current default from AccountPaymentMethod
        default_method = AccountPaymentMethod.objects.filter(
            account=self.instance,
            is_default=True,
            is_enabled=True
        ).first()

        if default_method:
            # NOTE(review): on a ModelForm the instance's own value in
            # ``self.initial`` may take precedence over ``field.initial`` --
            # confirm this assignment has the intended effect.
            self.fields['payment_method'].initial = default_method.type
            self.fields['payment_method'].help_text = f'✓ Current: {default_method.display_name} ({default_method.get_type_display()})'
        else:
            self.fields['payment_method'].help_text = 'Select from available payment methods based on country'

    def save(self, commit=True):
        """Save the account and keep its default AccountPaymentMethod in sync.

        The payment-method sync needs a saved account. When the instance has
        not been written to the database yet (``commit=False`` on a new
        account, which is how the Django admin calls ``save``), the sync is
        deferred to ``save_m2m()``, which the caller runs after saving the
        instance. Otherwise the sync happens immediately.

        Returns:
            The (possibly unsaved, when ``commit=False``) Account instance.
        """
        # Delegating ``commit`` to the parent also runs the form's own
        # many-to-many save when commit=True.
        instance = super().save(commit=commit)

        if instance._state.adding:
            # Still unsaved: chain the sync after the regular save_m2m hook.
            run_m2m = self.save_m2m

            def save_m2m_then_sync():
                run_m2m()
                self._sync_default_payment_method(instance)

            self.save_m2m = save_m2m_then_sync
        else:
            self._sync_default_payment_method(instance)

        return instance

    def _sync_default_payment_method(self, instance):
        """Create/update the AccountPaymentMethod for the selected type and make it the sole default."""
        from igny8_core.business.billing.models import AccountPaymentMethod, PaymentMethodConfig

        # Get selected payment method
        selected_type = self.cleaned_data.get('payment_method')
        if not selected_type:
            return

        # Get config for display name and instructions
        country = instance.billing_country or '*'
        config = PaymentMethodConfig.objects.filter(
            country_code__in=[country, '*'],
            payment_method=selected_type,
            is_enabled=True
        ).first()

        # Create or update AccountPaymentMethod
        account_method, created = AccountPaymentMethod.objects.get_or_create(
            account=instance,
            type=selected_type,
            defaults={
                'display_name': config.display_name if config else selected_type.replace('_', ' ').title(),
                'is_default': True,
                'is_enabled': True,
                'instructions': config.instructions if config else '',
                'country_code': instance.billing_country or '',
            }
        )

        if not created:
            # Update existing and set as default
            account_method.is_default = True
            account_method.is_enabled = True
            if config:
                account_method.display_name = config.display_name
                account_method.instructions = config.instructions
            account_method.save()

        # Unset other methods as default
        AccountPaymentMethod.objects.filter(
            account=instance
        ).exclude(id=account_method.id).update(is_default=False)
@admin.register(Plan)
class PlanAdmin(Igny8ModelAdmin):
    """Admin for Plan records; plans are global, so no per-account filtering is applied."""

    # Changelist configuration.
    list_display = [
        'name',
        'slug',
        'price',
        'billing_cycle',
        'max_sites',
        'max_users',
        'max_keywords',
        'max_content_words',
        'included_credits',
        'is_active',
        'is_featured',
    ]
    list_filter = ['is_active', 'billing_cycle', 'is_internal', 'is_featured']
    search_fields = ['name', 'slug']
    readonly_fields = ['created_at']

    # Edit-form layout, grouped by the kind of limit each field controls.
    fieldsets = (
        (
            'Plan Info',
            {
                'fields': (
                    'name',
                    'slug',
                    'price',
                    'original_price',
                    'annual_discount_percent',
                    'billing_cycle',
                    'features',
                    'is_active',
                    'is_featured',
                    'is_internal',
                ),
                'description': 'Price: Current price | Original Price: Crossed-out price (optional) | Annual Discount %: For annual billing | Is Featured: Show as popular/recommended plan',
            },
        ),
        (
            'Account Management Limits',
            {
                'fields': ('max_users', 'max_sites', 'max_industries', 'max_author_profiles'),
                'description': 'Persistent limits for account-level resources',
            },
        ),
        (
            'Hard Limits (Persistent)',
            {
                'fields': ('max_keywords', 'max_clusters'),
                'description': 'Total allowed - never reset',
            },
        ),
        (
            'Monthly Limits (Reset on Billing Cycle)',
            {
                'fields': (
                    'max_content_ideas',
                    'max_content_words',
                    'max_images_basic',
                    'max_images_premium',
                    'max_image_prompts',
                ),
                'description': 'Monthly allowances - reset at billing cycle',
            },
        ),
        (
            'Billing & Credits',
            {
                'fields': (
                    'included_credits',
                    'extra_credit_price',
                    'allow_credit_topup',
                    'auto_credit_topup_threshold',
                    'auto_credit_topup_amount',
                    'credits_per_month',
                ),
            },
        ),
        (
            'Stripe Integration',
            {
                'fields': ('stripe_product_id', 'stripe_price_id'),
            },
        ),
    )
class AccountResource(resources.ModelResource):
    """import_export resource describing which Account columns are exported."""

    class Meta:
        model = Account
        # Columns to export; the owner and plan are followed via "__" lookups.
        fields = (
            'id',
            'name',
            'slug',
            'owner__email',
            'plan__name',
            'status',
            'credits',
            'billing_country',
            'created_at',
            'updated_at',
        )
        # Export columns in the same order they are declared above.
        export_order = fields
@admin.register(Account)
class AccountAdmin(ExportMixin, AccountAdminMixin, SimpleHistoryAdmin, Igny8ModelAdmin):
    """Admin for Account with export support, history and health read-outs.

    ``health_indicator`` renders a coloured badge for the changelist and
    ``health_details`` renders a multi-line summary; both are read-only.
    """

    resource_class = AccountResource
    form = AccountAdminForm
    list_display = ['name', 'slug', 'owner', 'plan', 'status', 'health_indicator', 'credits', 'created_at']
    list_filter = ['status', 'plan']
    search_fields = ['name', 'slug']
    readonly_fields = ['created_at', 'updated_at', 'health_indicator', 'health_details']

    # Colours used by both health renderers, keyed by severity.
    _HEALTH_COLORS = {
        'good': '#0bbf87',
        'warning': '#ff7a00',
        'critical': '#ef4444',
    }

    def get_queryset(self, request):
        """Override to filter by account for non-superusers.

        Superusers and developers see everything, owners see the accounts
        they own, other users see only their own account, and anyone else
        gets an empty queryset.
        """
        qs = super().get_queryset(request)
        if request.user.is_superuser or (hasattr(request.user, 'is_developer') and request.user.is_developer()):
            return qs
        # Owners can see their own accounts
        if hasattr(request.user, 'role') and request.user.role == 'owner':
            return qs.filter(owner=request.user)
        # Admins can see their account
        try:
            user_account = getattr(request.user, 'account', None)
            if user_account:
                return qs.filter(id=user_account.id)
        except Exception:
            # If account access fails (e.g., column mismatch), return empty
            pass
        return qs.none()

    @staticmethod
    def _count_failed_automation_runs(obj):
        """Return how many AutomationRun rows for ``obj`` failed in the last 7 days."""
        from django.utils import timezone
        from datetime import timedelta
        from igny8_core.business.automation.models import AutomationRun

        week_ago = timezone.now() - timedelta(days=7)
        return AutomationRun.objects.filter(
            account=obj,
            status='failed',
            created_at__gte=week_ago
        ).count()

    def health_indicator(self, obj):
        """Display health status with visual indicator.

        Severity is derived from the credit balance, then from failed
        automation runs in the last week, and finally forced to critical
        when the account status is anything other than ``'active'``.
        """
        from django.utils.html import format_html

        # Check credits
        if obj.credits < 10:
            status = 'critical'
            message = 'Critical: Very low credits'
        elif obj.credits < 100:
            status = 'warning'
            message = 'Warning: Low credits'
        else:
            status = 'good'
            message = 'Good'

        # Check for recent failed automations. Best effort: the badge must
        # still render if the automation app or its table is unavailable.
        try:
            failed_runs = self._count_failed_automation_runs(obj)
        except Exception:
            failed_runs = 0

        if failed_runs > 5:
            status = 'critical'
            message = f'Critical: {failed_runs} automation failures'
        elif failed_runs > 0:
            # A few failures only escalate an otherwise healthy account;
            # an existing credit warning/critical message is kept.
            if status == 'good':
                status = 'warning'
                message = f'Warning: {failed_runs} automation failures'

        # Check account status
        if obj.status != 'active':
            status = 'critical'
            message = f'Critical: Account {obj.status}'

        return format_html(
            '<span style="display: inline-flex; align-items: center; padding: 4px 12px; border-radius: 6px; background-color: {}; color: white; font-weight: 500; font-size: 13px;">{}</span>',
            self._HEALTH_COLORS[status], message
        )
    health_indicator.short_description = 'Health'

    def health_details(self, obj):
        """Detailed health information.

        Returns HTML with one ``<br>``-separated line each for credits,
        recent content, automation failures, sync failures and account
        status. Lines whose data source cannot be queried are omitted.
        """
        from django.utils.html import format_html
        from django.utils.safestring import mark_safe
        from django.utils import timezone
        from datetime import timedelta

        colors = self._HEALTH_COLORS
        # Every entry is built with format_html()/mark_safe() so that dynamic
        # values are escaped before the pieces are joined at the end.
        details = []

        # Credits status
        if obj.credits < 10:
            details.append(format_html(
                '<span style="color: {}; font-weight: 600;">Critical:</span> Only {} credits remaining',
                colors['critical'], obj.credits
            ))
        elif obj.credits < 100:
            details.append(format_html(
                '<span style="color: {}; font-weight: 600;">Warning:</span> Only {} credits remaining',
                colors['warning'], obj.credits
            ))
        else:
            details.append(format_html(
                '<span style="color: {}; font-weight: 600;">Credits:</span> {} available',
                colors['good'], obj.credits
            ))

        # Recent activity (best effort)
        try:
            from igny8_core.modules.writer.models import Content
            week_ago = timezone.now() - timedelta(days=7)
            recent_content = Content.objects.filter(
                site__account=obj,
                created_at__gte=week_ago
            ).count()
        except Exception:
            pass
        else:
            details.append(format_html(
                '📚 <b>Activity:</b> {} content pieces created this week',
                recent_content
            ))

        # Failed automations (best effort)
        try:
            failed_runs = self._count_failed_automation_runs(obj)
        except Exception:
            pass
        else:
            if failed_runs > 0:
                details.append(format_html('🔴 <b>Automations:</b> {} failures this week', failed_runs))
            else:
                details.append(mark_safe('✅ <b>Automations:</b> No failures this week'))

        # Failed syncs (best effort)
        try:
            from igny8_core.business.integration.models import SyncEvent
            today = timezone.now().date()
            failed_syncs = SyncEvent.objects.filter(
                site__account=obj,
                success=False,
                created_at__date=today
            ).count()
        except Exception:
            pass
        else:
            if failed_syncs > 0:
                details.append(format_html('⚠️ <b>Syncs:</b> {} failures today', failed_syncs))
            else:
                details.append(mark_safe('✅ <b>Syncs:</b> No failures today'))

        # Account status
        if obj.status == 'active':
            details.append(mark_safe('✅ <b>Status:</b> Active'))
        else:
            details.append(format_html('🔴 <b>Status:</b> {}', obj.status.title()))

        # Safe to mark: each piece above is already escaped/safe HTML.
        return mark_safe('<br>'.join(details))
    health_details.short_description = 'Health Details'

    def has_delete_permission(self, request, obj=None):
        """Forbid deleting the account whose slug is ``'aws-admin'``; defer otherwise."""
        if obj and getattr(obj, 'slug', '') == 'aws-admin':
            return False
        return super().has_delete_permission(request, obj)
@admin.register(Subscription)
class SubscriptionAdmin(AccountAdminMixin, Igny8ModelAdmin):
    """Admin for Subscription records, listed with their status and billing period."""

    list_display = [
        'account',
        'status',
        'current_period_start',
        'current_period_end',
    ]
    list_filter = ['status']
    # Searchable by the owning account's name or the Stripe subscription id.
    search_fields = [
        'account__name',
        'stripe_subscription_id',
    ]
    readonly_fields = ['created_at', 'updated_at']
@admin.register(PasswordResetToken)
class PasswordResetTokenAdmin(Igny8ModelAdmin):
    """Admin for password reset tokens, scoped to the viewer's account."""

    list_display = [
        'user',
        'token',
        'used',
        'expires_at',
        'created_at',
    ]
    list_filter = ['used', 'expires_at', 'created_at']
    search_fields = ['user__email', 'token']
    readonly_fields = ['created_at', 'token']

    def get_queryset(self, request):
        """Return all tokens for superusers/developers, else only the viewer's account's tokens."""
        base_qs = super().get_queryset(request)
        viewer = request.user

        # Privileged users are not restricted.
        if viewer.is_superuser:
            return base_qs
        if hasattr(viewer, 'is_developer') and viewer.is_developer():
            return base_qs

        # Everyone else is limited to tokens of users in their own account.
        viewer_account = getattr(viewer, 'account', None)
        if not viewer_account:
            return base_qs.none()
        return base_qs.filter(user__account=viewer_account)
class SectorInline(TabularInline):
    """Tabular inline listing a site's sectors on the Site admin page."""

    model = Sector
    extra = 0
    fields = [
        'industry_sector',
        'name',
        'slug',
        'status',
        'is_active',
        'get_keywords_count',
        'get_clusters_count',
    ]
    # The two counters are computed, so they can only be shown read-only.
    readonly_fields = ['get_keywords_count', 'get_clusters_count']

    def get_keywords_count(self, obj):
        """Return the number of related keywords, or 0 for unsaved rows or on error."""
        if not obj.pk:
            return 0
        try:
            total = obj.keywords_set.count()
        except Exception:
            return 0
        return total
    get_keywords_count.short_description = 'Keywords'

    def get_clusters_count(self, obj):
        """Return the number of related clusters, or 0 for unsaved rows or on error."""
        if not obj.pk:
            return 0
        try:
            total = obj.clusters_set.count()
        except Exception:
            return 0
        return total
    get_clusters_count.short_description = 'Clusters'
class SiteResource(resources.ModelResource):
    """import_export resource describing which Site columns are exported."""

    class Meta:
        model = Site
        # Columns to export; account and industry are followed via "__" lookups.
        fields = (
            'id',
            'name',
            'slug',
            'account__name',
            'industry__name',
            'domain',
            'status',
            'is_active',
            'site_type',
            'hosting_type',
            'created_at',
        )
        # Export columns in the same order they are declared above.
        export_order = fields
@admin.register(Site)
class SiteAdmin(ExportMixin, AccountAdminMixin, Igny8ModelAdmin):
    """Admin for Site with export support, inline sectors and WordPress API key tools."""

    resource_class = SiteResource
    list_display = ['name', 'slug', 'account', 'industry', 'domain', 'status', 'is_active', 'get_api_key_status', 'get_sectors_count']
    list_filter = ['status', 'is_active', 'account', 'industry', 'hosting_type']
    search_fields = ['name', 'slug', 'domain', 'industry__name']
    readonly_fields = ['created_at', 'updated_at', 'get_api_key_display']
    inlines = [SectorInline]
    actions = ['generate_api_keys']
    fieldsets = (
        ('Site Info', {
            'fields': ('name', 'slug', 'account', 'domain', 'description', 'industry', 'site_type', 'hosting_type', 'status', 'is_active')
        }),
        ('WordPress Integration', {
            'fields': ('get_api_key_display',),
            'description': 'WordPress integration API key. Use SiteIntegration model for full integration settings.'
        }),
        ('SEO Metadata', {
            'fields': ('seo_metadata',),
            'classes': ('collapse',)
        }),
        ('Timestamps', {
            'fields': ('created_at', 'updated_at'),
            'classes': ('collapse',)
        }),
    )

    def get_api_key_display(self, obj):
        """Display API key with copy button, or a placeholder when no key is set."""
        from django.utils.html import format_html
        from django.utils.safestring import mark_safe

        if obj.wp_api_key:
            # The key is interpolated twice: once as visible text and once
            # as the string handed to the clipboard by the button.
            return format_html(
                '<div style="display:flex; align-items:center; gap:10px;">'
                '<code style="background:#f0f0f0; padding:5px 10px; border-radius:3px;">{}</code>'
                '<button type="button" onclick="navigator.clipboard.writeText(\'{}\'); alert(\'API Key copied to clipboard!\');" '
                'style="padding:5px 10px; cursor:pointer;">Copy</button>'
                '</div>',
                obj.wp_api_key,
                obj.wp_api_key
            )
        # Static markup with nothing to interpolate: mark it safe directly
        # rather than calling format_html() without arguments.
        return mark_safe('<em>No API key generated</em>')
    get_api_key_display.short_description = 'WordPress API Key'

    def get_api_key_status(self, obj):
        """Show API key status in list view."""
        from django.utils.safestring import mark_safe

        if obj.wp_api_key:
            return mark_safe('<span style="color:green;">●</span> Active')
        return mark_safe('<span style="color:gray;">○</span> None')
    get_api_key_status.short_description = 'API Key'

    def generate_api_keys(self, request, queryset):
        """Generate API keys for selected sites.

        Sites that already have a key are skipped. Each new key is
        ``igny8_`` followed by 40 random ASCII letters/digits.
        """
        import secrets
        import string

        # Same character set as before (a-z, A-Z, 0-9), built once.
        alphabet = string.ascii_letters + string.digits
        updated_count = 0
        for site in queryset:
            if site.wp_api_key:
                continue
            token = ''.join(secrets.choice(alphabet) for _ in range(40))
            site.wp_api_key = f"igny8_{token}"
            site.save()
            updated_count += 1
        self.message_user(request, f'Generated API keys for {updated_count} site(s). Sites with existing keys were skipped.')
    generate_api_keys.short_description = 'Generate WordPress API Keys'

    def get_sectors_count(self, obj):
        """Return the site's active sector count, or 0 if it cannot be computed."""
        try:
            return obj.get_active_sectors_count()
        except Exception:
            # Best effort: a failing count must not break the changelist.
            return 0
    get_sectors_count.short_description = 'Active Sectors'

    def get_industry_display(self, obj):
        """Safely get industry name, or '-' when missing or unreadable."""
        try:
            return obj.industry.name if obj.industry else '-'
        except Exception:
            return '-'
    get_industry_display.short_description = 'Industry'
@admin.register(Sector)
class SectorAdmin(AccountAdminMixin, Igny8ModelAdmin):
    """Admin for Sector with derived industry, keyword-count and cluster-count columns."""

    list_display = ['name', 'slug', 'site', 'industry_sector', 'get_industry', 'status', 'is_active', 'get_keywords_count', 'get_clusters_count']
    list_filter = ['status', 'is_active', 'site', 'industry_sector__industry']
    search_fields = ['name', 'slug', 'site__name', 'industry_sector__name']
    readonly_fields = ['created_at', 'updated_at']

    def get_industry(self, obj):
        """Safely get industry name via the sector's industry_sector, or '-'."""
        try:
            if obj.industry_sector and obj.industry_sector.industry:
                return obj.industry_sector.industry.name
        except Exception:
            # Best effort: a broken relation must not break the changelist.
            pass
        return '-'
    get_industry.short_description = 'Industry'

    def get_keywords_count(self, obj):
        """Safely get keywords count; 0 for unsaved objects or on any error."""
        try:
            if obj.pk:
                # Direct access: a getattr() fallback to the same attribute
                # would be evaluated eagerly and add nothing.
                return obj.keywords_set.count()
        except Exception:
            pass
        return 0
    get_keywords_count.short_description = 'Keywords'

    def get_clusters_count(self, obj):
        """Safely get clusters count; 0 for unsaved objects or on any error."""
        try:
            if obj.pk:
                return obj.clusters_set.count()
        except Exception:
            pass
        return 0
    get_clusters_count.short_description = 'Clusters'
@admin.register(SiteUserAccess)
class SiteUserAccessAdmin(Igny8ModelAdmin):
    """Admin for SiteUserAccess rows linking a user to a site."""

    list_display = [
        'user',
        'site',
        'granted_at',
        'granted_by',
    ]
    list_filter = ['granted_at']
    # Searchable by the user's email or the site's name.
    search_fields = [
        'user__email',
        'site__name',
    ]
    readonly_fields = ['granted_at']
class IndustrySectorInline(TabularInline):
    """Tabular inline listing an industry's sectors on the Industry admin page."""

    model = IndustrySector
    extra = 0
    fields = [
        'name',
        'slug',
        'description',
        'is_active',
    ]
    # No read-only columns on this inline.
    readonly_fields = []
@admin.register(Industry)
class IndustryAdmin(Igny8ModelAdmin):
    """Admin for Industry with inline sectors and an active-sector counter."""

    list_display = [
        'name',
        'slug',
        'is_active',
        'get_sectors_count',
        'created_at',
    ]
    list_filter = ['is_active']
    search_fields = ['name', 'slug', 'description']
    readonly_fields = ['created_at', 'updated_at']
    inlines = [IndustrySectorInline]
    actions = ['delete_selected']  # Enable bulk delete

    def get_sectors_count(self, obj):
        """Return how many of the industry's sectors are flagged active."""
        active_sectors = obj.sectors.filter(is_active=True)
        return active_sectors.count()
    get_sectors_count.short_description = 'Active Sectors'

    def has_delete_permission(self, request, obj=None):
        """Allow deletion for superusers and developers."""
        viewer = request.user
        return viewer.is_superuser or (hasattr(viewer, 'is_developer') and viewer.is_developer())
@admin.register(IndustrySector)
class IndustrySectorAdmin(Igny8ModelAdmin):
    """Admin for IndustrySector records."""

    list_display = [
        'name',
        'slug',
        'industry',
        'is_active',
    ]
    list_filter = ['is_active', 'industry']
    search_fields = ['name', 'slug', 'description']
    readonly_fields = ['created_at', 'updated_at']
    actions = ['delete_selected']  # Enable bulk delete

    def has_delete_permission(self, request, obj=None):
        """Allow deletion for superusers and developers."""
        viewer = request.user
        return viewer.is_superuser or (hasattr(viewer, 'is_developer') and viewer.is_developer())
@admin.register(SeedKeyword)
class SeedKeywordAdmin(Igny8ModelAdmin):
    """Admin for SeedKeyword: global reference data, so no account filtering is applied."""

    list_display = [
        'keyword',
        'industry',
        'sector',
        'volume',
        'difficulty',
        'intent',
        'is_active',
        'created_at',
    ]
    list_filter = ['is_active', 'industry', 'sector', 'intent']
    search_fields = ['keyword']
    readonly_fields = ['created_at', 'updated_at']
    actions = ['delete_selected']  # Enable bulk delete

    # Edit-form layout: identity, SEO metrics, then timestamps.
    fieldsets = (
        (
            'Keyword Info',
            {
                'fields': ('keyword', 'industry', 'sector', 'is_active'),
            },
        ),
        (
            'SEO Metrics',
            {
                'fields': ('volume', 'difficulty', 'intent'),
            },
        ),
        (
            'Timestamps',
            {
                'fields': ('created_at', 'updated_at'),
            },
        ),
    )

    def has_delete_permission(self, request, obj=None):
        """Allow deletion for superusers and developers."""
        viewer = request.user
        return viewer.is_superuser or (hasattr(viewer, 'is_developer') and viewer.is_developer())
class UserResource(resources.ModelResource):
    """import_export resource describing which User columns are exported."""

    class Meta:
        model = User
        # Columns to export; the account name is followed via a "__" lookup.
        fields = (
            'id',
            'email',
            'username',
            'account__name',
            'role',
            'is_active',
            'is_staff',
            'created_at',
            'last_login',
        )
        # Export columns in the same order they are declared above.
        export_order = fields
@admin.register(User)
class UserAdmin(ExportMixin, BaseUserAdmin, Igny8ModelAdmin):
    """
    User admin using both Django's BaseUserAdmin (for user-specific functionality)
    and Unfold's ModelAdmin (for modern UI and styling including popups)

    Adds export support and the IGNY8 ``account``/``role`` fields, and
    restricts non-privileged viewers to users of their own account.
    """

    resource_class = UserResource
    list_display = ['email', 'username', 'account', 'role', 'is_active', 'is_staff', 'created_at']
    list_filter = ['role', 'account', 'is_active', 'is_staff']
    search_fields = ['email', 'username']
    readonly_fields = ['created_at', 'updated_at']

    # Extend the stock user fieldsets rather than replacing them.
    fieldsets = BaseUserAdmin.fieldsets + (
        ('IGNY8 Info', {'fields': ('account', 'role')}),
        ('Timestamps', {'fields': ('created_at', 'updated_at')}),
    )
    add_fieldsets = BaseUserAdmin.add_fieldsets + (
        ('IGNY8 Info', {'fields': ('account', 'role')}),
    )

    def get_queryset(self, request):
        """Filter users by account for non-superusers.

        Superusers and developers see every user; other viewers see only
        users in their own account, or nothing if they have no account.
        """
        qs = super().get_queryset(request)
        if request.user.is_superuser or (hasattr(request.user, 'is_developer') and request.user.is_developer()):
            return qs
        user_account = getattr(request.user, 'account', None)
        if user_account:
            return qs.filter(account=user_account)
        return qs.none()

    def get_account_display(self, obj):
        """Safely get account name, or '-' when missing or unreadable."""
        try:
            account = getattr(obj, 'account', None)
            return account.name if account else '-'
        except Exception:
            # Best effort: a broken relation must not break the page, but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            return '-'
    get_account_display.short_description = 'Account'