Files
igny8/backend/igny8_core/auth/admin.py
IGNY8 VPS (Salman) eb88a0e12d django phase2.3.4.
2025-12-14 15:10:41 +00:00

582 lines
23 KiB
Python

"""
Admin interface for auth models
"""
from django import forms
from django.contrib import admin
from django.contrib.auth.admin import UserAdmin as BaseUserAdmin
from igny8_core.admin.base import AccountAdminMixin
from .models import User, Account, Plan, Subscription, Site, Sector, SiteUserAccess, Industry, IndustrySector, SeedKeyword, PasswordResetToken
class AccountAdminForm(forms.ModelForm):
    """Account admin form with payment-method choices drawn from PaymentMethodConfig.

    For an account that already exists, the ``payment_method`` widget lists the
    enabled ``PaymentMethodConfig`` rows for the account's billing country plus
    the global ``'*'`` rows.  Saving the form also makes the selected method
    the account's default ``AccountPaymentMethod``.
    """

    class Meta:
        model = Account
        fields = '__all__'

    def __init__(self, *args, **kwargs):
        """Build the form and, for a saved account, customise ``payment_method``."""
        super().__init__(*args, **kwargs)
        # Function-scoped import; presumably avoids a circular import with the
        # billing app -- TODO confirm.
        from igny8_core.business.billing.models import PaymentMethodConfig, AccountPaymentMethod

        # On the "add" form there is no saved account to look anything up for.
        if not (self.instance and self.instance.pk):
            return

        # Get country from billing_country, fallback to wildcard '*' for global
        country = self.instance.billing_country or '*'

        # Get enabled payment methods for this country OR global (*)
        available_methods = PaymentMethodConfig.objects.filter(
            country_code__in=[country, '*'],
            is_enabled=True
        ).order_by('country_code', 'sort_order').values_list('payment_method', 'display_name')

        if available_methods:
            # Build choices from PaymentMethodConfig, keeping the first row
            # seen for each method type.
            choices = []
            seen = set()
            for method_type, display_name in available_methods:
                if method_type not in seen:
                    choices.append((method_type, display_name or method_type.replace('_', ' ').title()))
                    seen.add(method_type)
        else:
            # Fallback to model choices if no configs
            choices = Account.PAYMENT_METHOD_CHOICES

        # NOTE(review): only the widget is replaced; the field's own validation
        # is untouched -- confirm every configured method type is a valid
        # value for Account.payment_method.
        self.fields['payment_method'].widget = forms.Select(choices=choices)

        # Get current default from AccountPaymentMethod
        default_method = AccountPaymentMethod.objects.filter(
            account=self.instance,
            is_default=True,
            is_enabled=True
        ).first()
        if default_method:
            self.fields['payment_method'].initial = default_method.type
            self.fields['payment_method'].help_text = f'✓ Current: {default_method.display_name} ({default_method.get_type_display()})'
        else:
            self.fields['payment_method'].help_text = 'Select from available payment methods based on country'

    def save(self, commit=True):
        """Save the account and make the selected payment method its default.

        Args:
            commit: Passed through to ``ModelForm.save``.

        Returns:
            The ``Account`` instance (unsaved when ``commit`` is false).

        When the account has not been written to the database yet (a new
        account saved with ``commit=False``, which is how the Django admin
        saves forms), the ``AccountPaymentMethod`` sync cannot run because it
        needs a saved account to point at.  In that case the sync is chained
        onto ``save_m2m()``, which the caller invokes after saving the
        instance.
        """
        instance = super().save(commit=commit)

        if instance._state.adding:
            # Still unsaved: run the sync after the caller's save_m2m().
            base_save_m2m = self.save_m2m

            def save_m2m_then_sync():
                base_save_m2m()
                self._sync_default_payment_method(instance)

            self.save_m2m = save_m2m_then_sync
        else:
            self._sync_default_payment_method(instance)
        return instance

    def _sync_default_payment_method(self, instance):
        """Create/update the AccountPaymentMethod matching the selected type.

        Does nothing when no ``payment_method`` was submitted.  Otherwise the
        matching ``AccountPaymentMethod`` is created or re-enabled, marked as
        default, and every other method of the account loses its default flag.
        """
        from igny8_core.business.billing.models import AccountPaymentMethod, PaymentMethodConfig

        # Get selected payment method
        selected_type = self.cleaned_data.get('payment_method')
        if not selected_type:
            return

        # Get config for display name and instructions
        country = instance.billing_country or '*'
        config = PaymentMethodConfig.objects.filter(
            country_code__in=[country, '*'],
            payment_method=selected_type,
            is_enabled=True
        ).first()

        # Create or update AccountPaymentMethod
        account_method, created = AccountPaymentMethod.objects.get_or_create(
            account=instance,
            type=selected_type,
            defaults={
                'display_name': config.display_name if config else selected_type.replace('_', ' ').title(),
                'is_default': True,
                'is_enabled': True,
                'instructions': config.instructions if config else '',
                'country_code': instance.billing_country or '',
            }
        )
        if not created:
            # Update existing and set as default
            account_method.is_default = True
            account_method.is_enabled = True
            if config:
                account_method.display_name = config.display_name
                account_method.instructions = config.instructions
            account_method.save()

        # Unset other methods as default
        AccountPaymentMethod.objects.filter(
            account=instance
        ).exclude(id=account_method.id).update(is_default=False)
@admin.register(Plan)
class PlanAdmin(admin.ModelAdmin):
    """Admin for Plan records; registered without the account mixin (plans are global)."""

    # Changelist columns.
    list_display = [
        'name',
        'slug',
        'price',
        'billing_cycle',
        'max_sites',
        'max_users',
        'max_keywords',
        'max_content_words',
        'included_credits',
        'is_active',
        'is_featured',
    ]
    list_filter = [
        'is_active',
        'billing_cycle',
        'is_internal',
        'is_featured',
    ]
    search_fields = ['name', 'slug']
    readonly_fields = ['created_at']

    # Edit-form layout, grouped by the kind of limit each field expresses.
    fieldsets = (
        (
            'Plan Info',
            {
                'fields': (
                    'name',
                    'slug',
                    'price',
                    'original_price',
                    'annual_discount_percent',
                    'billing_cycle',
                    'features',
                    'is_active',
                    'is_featured',
                    'is_internal',
                ),
                'description': 'Price: Current price | Original Price: Crossed-out price (optional) | Annual Discount %: For annual billing | Is Featured: Show as popular/recommended plan',
            },
        ),
        (
            'Account Management Limits',
            {
                'fields': (
                    'max_users',
                    'max_sites',
                    'max_industries',
                    'max_author_profiles',
                ),
                'description': 'Persistent limits for account-level resources',
            },
        ),
        (
            'Hard Limits (Persistent)',
            {
                'fields': ('max_keywords', 'max_clusters'),
                'description': 'Total allowed - never reset',
            },
        ),
        (
            'Monthly Limits (Reset on Billing Cycle)',
            {
                'fields': (
                    'max_content_ideas',
                    'max_content_words',
                    'max_images_basic',
                    'max_images_premium',
                    'max_image_prompts',
                ),
                'description': 'Monthly allowances - reset at billing cycle',
            },
        ),
        (
            'Billing & Credits',
            {
                'fields': (
                    'included_credits',
                    'extra_credit_price',
                    'allow_credit_topup',
                    'auto_credit_topup_threshold',
                    'auto_credit_topup_amount',
                    'credits_per_month',
                ),
            },
        ),
        (
            'Stripe Integration',
            {
                'fields': ('stripe_product_id', 'stripe_price_id'),
            },
        ),
    )
@admin.register(Account)
class AccountAdmin(AccountAdminMixin, admin.ModelAdmin):
    """Admin for Account with per-user row filtering and computed health columns.

    ``health_indicator`` (changelist) and ``health_details`` (detail page) are
    read-only values derived from the account's credits, status and recent
    automation / sync / content activity.
    """

    form = AccountAdminForm
    list_display = ['name', 'slug', 'owner', 'plan', 'status', 'health_indicator', 'credits', 'created_at']
    list_filter = ['status', 'plan']
    search_fields = ['name', 'slug']
    readonly_fields = ['created_at', 'updated_at', 'health_indicator', 'health_details']

    def get_queryset(self, request):
        """Return the accounts the requesting user may see.

        Superusers and developers see everything, users with role ``'owner'``
        see the accounts they own, anyone else sees only their own account,
        and users without an account see nothing.
        """
        qs = super().get_queryset(request)
        if request.user.is_superuser or (hasattr(request.user, 'is_developer') and request.user.is_developer()):
            return qs
        # Owners can see their own accounts
        if hasattr(request.user, 'role') and request.user.role == 'owner':
            return qs.filter(owner=request.user)
        # Admins can see their account
        try:
            user_account = getattr(request.user, 'account', None)
            if user_account:
                return qs.filter(id=user_account.id)
        except Exception:
            # If account access fails (e.g., column mismatch), return empty
            pass
        return qs.none()

    @staticmethod
    def _failed_automation_runs(obj):
        """Count failed AutomationRun rows for ``obj`` created in the last 7 days.

        Returns ``None`` when the count cannot be obtained (import or query
        error) so callers can skip the check instead of failing the page.
        """
        from datetime import timedelta
        from django.utils import timezone

        try:
            from igny8_core.business.automation.models import AutomationRun
            week_ago = timezone.now() - timedelta(days=7)
            return AutomationRun.objects.filter(
                account=obj,
                status='failed',
                created_at__gte=week_ago
            ).count()
        except Exception:
            # Best-effort: health display must not break the admin page.
            return None

    def health_indicator(self, obj):
        """Display health status with visual indicator.

        Severity is decided in order: credits (<10 critical, <100 warning),
        then failed automations in the last week (>5 critical, >0 warning),
        then account status (anything other than ``'active'`` is critical).
        Later checks override earlier ones.
        """
        from django.utils.html import format_html

        # Check credits
        if obj.credits < 10:
            status = 'critical'
            icon = '🔴'
            message = 'Critical: Very low credits'
        elif obj.credits < 100:
            status = 'warning'
            icon = '⚠️'
            message = 'Warning: Low credits'
        else:
            status = 'good'
            icon = ''
            message = 'Good'

        # Check for recent failed automations
        failed_runs = self._failed_automation_runs(obj)
        if failed_runs is not None:
            if failed_runs > 5:
                status = 'critical'
                icon = '🔴'
                message = f'Critical: {failed_runs} automation failures'
            elif failed_runs > 0:
                # NOTE(review): nesting reconstructed -- a few failures only
                # raise a 'good' status to 'warning' and never replace a
                # credit warning/critical. Confirm against the intended logic.
                if status == 'good':
                    status = 'warning'
                    icon = '⚠️'
                    message = f'Warning: {failed_runs} automation failures'

        # Check account status
        if obj.status != 'active':
            status = 'critical'
            icon = '🔴'
            message = f'Critical: Account {obj.status}'

        colors = {
            'good': '#0bbf87',
            'warning': '#ff7a00',
            'critical': '#ef4444'
        }
        return format_html(
            '<span style="font-size: 16px;">{}</span> <span style="color: {}; font-weight: 600;">{}</span>',
            icon, colors[status], message
        )
    health_indicator.short_description = 'Health'

    def health_details(self, obj):
        """Detailed health information, one ``<br>``-separated line per check.

        Each line is built with ``format_html`` so interpolated values are
        HTML-escaped and never interpreted as format placeholders.  Checks
        whose backing model cannot be imported or queried are skipped.
        """
        from datetime import timedelta
        from django.utils import timezone
        from django.utils.html import format_html, format_html_join
        from django.utils.safestring import mark_safe

        details = []

        # Credits status
        if obj.credits < 10:
            details.append(format_html('🔴 <b>Critical:</b> Only {} credits remaining', obj.credits))
        elif obj.credits < 100:
            details.append(format_html('⚠️ <b>Warning:</b> Only {} credits remaining', obj.credits))
        else:
            details.append(format_html('✅ <b>Credits:</b> {} available', obj.credits))

        # Recent activity
        try:
            from igny8_core.modules.writer.models import Content
            week_ago = timezone.now() - timedelta(days=7)
            recent_content = Content.objects.filter(
                site__account=obj,
                created_at__gte=week_ago
            ).count()
            details.append(format_html('📚 <b>Activity:</b> {} content pieces created this week', recent_content))
        except Exception:
            # Best-effort: omit the line when the lookup is unavailable.
            pass

        # Failed automations
        failed_runs = self._failed_automation_runs(obj)
        if failed_runs is not None:
            if failed_runs > 0:
                details.append(format_html('🔴 <b>Automations:</b> {} failures this week', failed_runs))
            else:
                details.append(mark_safe('✅ <b>Automations:</b> No failures this week'))

        # Failed syncs
        try:
            from igny8_core.business.integration.models import SyncEvent
            today = timezone.now().date()
            failed_syncs = SyncEvent.objects.filter(
                site__account=obj,
                success=False,
                created_at__date=today
            ).count()
            if failed_syncs > 0:
                details.append(format_html('⚠️ <b>Syncs:</b> {} failures today', failed_syncs))
            else:
                details.append(mark_safe('✅ <b>Syncs:</b> No failures today'))
        except Exception:
            # Best-effort: omit the line when the lookup is unavailable.
            pass

        # Account status
        if obj.status == 'active':
            details.append(mark_safe('✅ <b>Status:</b> Active'))
        else:
            details.append(format_html('🔴 <b>Status:</b> {}', obj.status.title()))

        return format_html_join(mark_safe('<br>'), '{}', ((line,) for line in details))
    health_details.short_description = 'Health Details'

    def has_delete_permission(self, request, obj=None):
        """Forbid deleting the account whose slug is ``'aws-admin'``; defer otherwise."""
        if obj and getattr(obj, 'slug', '') == 'aws-admin':
            return False
        return super().has_delete_permission(request, obj)
@admin.register(Subscription)
class SubscriptionAdmin(AccountAdminMixin, admin.ModelAdmin):
    """Admin for Subscription records (uses AccountAdminMixin from igny8_core.admin.base)."""

    list_display = [
        'account',
        'status',
        'current_period_start',
        'current_period_end',
    ]
    list_filter = ['status']
    # Searchable by the owning account's name or the stored Stripe id.
    search_fields = [
        'account__name',
        'stripe_subscription_id',
    ]
    readonly_fields = [
        'created_at',
        'updated_at',
    ]
@admin.register(PasswordResetToken)
class PasswordResetTokenAdmin(admin.ModelAdmin):
    """Admin for password reset tokens, restricted to the viewer's account."""

    list_display = [
        'user',
        'token',
        'used',
        'expires_at',
        'created_at',
    ]
    list_filter = [
        'used',
        'expires_at',
        'created_at',
    ]
    search_fields = [
        'user__email',
        'token',
    ]
    readonly_fields = [
        'created_at',
        'token',
    ]

    def get_queryset(self, request):
        """Return every token for superusers/developers, else only the viewer's account's tokens.

        Viewers with no account get an empty queryset.
        """
        tokens = super().get_queryset(request)
        viewer = request.user

        unrestricted = viewer.is_superuser or (
            hasattr(viewer, 'is_developer') and viewer.is_developer()
        )
        if unrestricted:
            return tokens

        viewer_account = getattr(viewer, 'account', None)
        if not viewer_account:
            return tokens.none()
        return tokens.filter(user__account=viewer_account)
class SectorInline(admin.TabularInline):
    """Tabular inline that lists a site's sectors with read-only related counts."""

    model = Sector
    extra = 0
    fields = [
        'industry_sector',
        'name',
        'slug',
        'status',
        'is_active',
        'get_keywords_count',
        'get_clusters_count',
    ]
    readonly_fields = [
        'get_keywords_count',
        'get_clusters_count',
    ]

    def get_keywords_count(self, obj):
        """Number of rows reachable through ``obj.keywords_set``; 0 for an unsaved row."""
        return obj.keywords_set.count() if obj.pk else 0
    get_keywords_count.short_description = 'Keywords'

    def get_clusters_count(self, obj):
        """Number of rows reachable through ``obj.clusters_set``; 0 for an unsaved row."""
        return obj.clusters_set.count() if obj.pk else 0
    get_clusters_count.short_description = 'Clusters'
@admin.register(Site)
class SiteAdmin(AccountAdminMixin, admin.ModelAdmin):
    """Admin for Site with inline sectors and WordPress API key helpers."""

    list_display = ['name', 'slug', 'account', 'industry', 'domain', 'status', 'is_active', 'get_api_key_status', 'get_sectors_count']
    list_filter = ['status', 'is_active', 'account', 'industry', 'hosting_type']
    search_fields = ['name', 'slug', 'domain', 'industry__name']
    readonly_fields = ['created_at', 'updated_at', 'get_api_key_display']
    inlines = [SectorInline]
    actions = ['generate_api_keys']
    fieldsets = (
        ('Site Info', {
            'fields': ('name', 'slug', 'account', 'domain', 'description', 'industry', 'site_type', 'hosting_type', 'status', 'is_active')
        }),
        ('WordPress Integration', {
            'fields': ('get_api_key_display',),
            'description': 'WordPress integration API key. Use SiteIntegration model for full integration settings.'
        }),
        ('SEO Metadata', {
            'fields': ('seo_metadata',),
            'classes': ('collapse',)
        }),
        ('Timestamps', {
            'fields': ('created_at', 'updated_at'),
            'classes': ('collapse',)
        }),
    )

    def get_api_key_display(self, obj):
        """Display API key with copy button, or a placeholder when no key is set."""
        # Imported before branching so the no-key path has its helper too.
        from django.utils.html import format_html
        from django.utils.safestring import mark_safe

        if not obj.wp_api_key:
            # Static markup with nothing to interpolate.
            return mark_safe('<em>No API key generated</em>')
        return format_html(
            '<div style="display:flex; align-items:center; gap:10px;">'
            '<code style="background:#f0f0f0; padding:5px 10px; border-radius:3px;">{}</code>'
            '<button type="button" onclick="navigator.clipboard.writeText(\'{}\'); alert(\'API Key copied to clipboard!\');" '
            'style="padding:5px 10px; cursor:pointer;">Copy</button>'
            '</div>',
            obj.wp_api_key,
            obj.wp_api_key
        )
    get_api_key_display.short_description = 'WordPress API Key'

    def get_api_key_status(self, obj):
        """Show API key status in list view"""
        from django.utils.safestring import mark_safe

        if obj.wp_api_key:
            return mark_safe('<span style="color:green;">●</span> Active')
        return mark_safe('<span style="color:gray;">○</span> None')
    get_api_key_status.short_description = 'API Key'

    def generate_api_keys(self, request, queryset):
        """Admin action: give every selected site without a key a new one.

        Keys are ``igny8_`` followed by 40 characters chosen with ``secrets``
        from ASCII letters and digits.  Sites that already have a key are
        left untouched.
        """
        import secrets
        import string

        alphabet = string.ascii_letters + string.digits
        updated_count = 0
        for site in queryset:
            if not site.wp_api_key:
                site.wp_api_key = f"igny8_{''.join(secrets.choice(alphabet) for _ in range(40))}"
                site.save()
                updated_count += 1
        self.message_user(request, f'Generated API keys for {updated_count} site(s). Sites with existing keys were skipped.')
    generate_api_keys.short_description = 'Generate WordPress API Keys'

    def get_sectors_count(self, obj):
        """Return ``obj.get_active_sectors_count()``, or 0 if that call fails."""
        try:
            return obj.get_active_sectors_count()
        except Exception:
            # Best-effort column: never break the changelist over a count.
            return 0
    get_sectors_count.short_description = 'Active Sectors'

    def get_industry_display(self, obj):
        """Safely get industry name"""
        try:
            return obj.industry.name if obj.industry else '-'
        except Exception:
            return '-'
    get_industry_display.short_description = 'Industry'
@admin.register(Sector)
class SectorAdmin(AccountAdminMixin, admin.ModelAdmin):
    """Admin for Sector with best-effort industry and related-count columns."""

    list_display = ['name', 'slug', 'site', 'industry_sector', 'get_industry', 'status', 'is_active', 'get_keywords_count', 'get_clusters_count']
    list_filter = ['status', 'is_active', 'site', 'industry_sector__industry']
    search_fields = ['name', 'slug', 'site__name', 'industry_sector__name']
    readonly_fields = ['created_at', 'updated_at']

    def get_industry(self, obj):
        """Safely get industry name; ``'-'`` when missing or on lookup failure."""
        try:
            if obj.industry_sector and obj.industry_sector.industry:
                return obj.industry_sector.industry.name
        except Exception:
            # Best-effort column: fall through to the placeholder.
            pass
        return '-'
    get_industry.short_description = 'Industry'

    def get_keywords_count(self, obj):
        """Safely get keywords count; 0 for unsaved rows or on lookup failure."""
        try:
            if obj.pk:
                return obj.keywords_set.count()
        except Exception:
            pass
        return 0
    get_keywords_count.short_description = 'Keywords'

    def get_clusters_count(self, obj):
        """Safely get clusters count; 0 for unsaved rows or on lookup failure."""
        try:
            if obj.pk:
                return obj.clusters_set.count()
        except Exception:
            pass
        return 0
    get_clusters_count.short_description = 'Clusters'
@admin.register(SiteUserAccess)
class SiteUserAccessAdmin(admin.ModelAdmin):
    """Admin for SiteUserAccess rows (which user was granted which site)."""

    list_display = [
        'user',
        'site',
        'granted_at',
        'granted_by',
    ]
    list_filter = ['granted_at']
    search_fields = [
        'user__email',
        'site__name',
    ]
    readonly_fields = ['granted_at']
class IndustrySectorInline(admin.TabularInline):
    """Tabular inline for editing an industry's sectors on the Industry page."""

    model = IndustrySector
    extra = 0  # no blank extra rows
    fields = [
        'name',
        'slug',
        'description',
        'is_active',
    ]
    readonly_fields = []
@admin.register(Industry)
class IndustryAdmin(admin.ModelAdmin):
    """Admin for Industry with inline sectors and superuser/developer-only deletion."""

    list_display = [
        'name',
        'slug',
        'is_active',
        'get_sectors_count',
        'created_at',
    ]
    list_filter = ['is_active']
    search_fields = [
        'name',
        'slug',
        'description',
    ]
    readonly_fields = [
        'created_at',
        'updated_at',
    ]
    inlines = [IndustrySectorInline]
    actions = ['delete_selected']  # Enable bulk delete
    change_list_template = 'admin/igny8_core_auth/industry/change_list.html'

    def get_sectors_count(self, obj):
        """Count the industry's sectors that have ``is_active=True``."""
        active_sectors = obj.sectors.filter(is_active=True)
        return active_sectors.count()
    get_sectors_count.short_description = 'Active Sectors'

    def has_delete_permission(self, request, obj=None):
        """Allow deletion for superusers and developers."""
        user = request.user
        superuser_flag = user.is_superuser
        if superuser_flag:
            return superuser_flag
        return hasattr(user, 'is_developer') and user.is_developer()
@admin.register(IndustrySector)
class IndustrySectorAdmin(admin.ModelAdmin):
    """Admin for IndustrySector; deletion limited to superusers and developers."""

    list_display = [
        'name',
        'slug',
        'industry',
        'is_active',
    ]
    list_filter = [
        'is_active',
        'industry',
    ]
    search_fields = [
        'name',
        'slug',
        'description',
    ]
    readonly_fields = [
        'created_at',
        'updated_at',
    ]
    actions = ['delete_selected']  # Enable bulk delete
    change_list_template = 'admin/igny8_core_auth/industrysector/change_list.html'

    def has_delete_permission(self, request, obj=None):
        """Allow deletion for superusers and developers."""
        user = request.user
        superuser_flag = user.is_superuser
        if superuser_flag:
            return superuser_flag
        return hasattr(user, 'is_developer') and user.is_developer()
@admin.register(SeedKeyword)
class SeedKeywordAdmin(admin.ModelAdmin):
    """Admin for SeedKeyword reference data; no per-account filtering is applied here."""

    list_display = [
        'keyword',
        'industry',
        'sector',
        'volume',
        'difficulty',
        'intent',
        'is_active',
        'created_at',
    ]
    list_filter = [
        'is_active',
        'industry',
        'sector',
        'intent',
    ]
    search_fields = ['keyword']
    readonly_fields = [
        'created_at',
        'updated_at',
    ]
    actions = ['delete_selected']  # Enable bulk delete
    change_list_template = 'admin/igny8_core_auth/seedkeyword/change_list.html'

    # Edit-form layout.
    fieldsets = (
        (
            'Keyword Info',
            {'fields': ('keyword', 'industry', 'sector', 'is_active')},
        ),
        (
            'SEO Metrics',
            {'fields': ('volume', 'difficulty', 'intent')},
        ),
        (
            'Timestamps',
            {'fields': ('created_at', 'updated_at')},
        ),
    )

    def has_delete_permission(self, request, obj=None):
        """Allow deletion for superusers and developers."""
        user = request.user
        superuser_flag = user.is_superuser
        if superuser_flag:
            return superuser_flag
        return hasattr(user, 'is_developer') and user.is_developer()
@admin.register(User)
class UserAdmin(BaseUserAdmin):
    """Admin for User: Django's stock user admin plus account/role fields.

    Non-superuser, non-developer staff only see users of their own account.
    """

    list_display = ['email', 'username', 'account', 'role', 'is_active', 'is_staff', 'created_at']
    list_filter = ['role', 'account', 'is_active', 'is_staff']
    search_fields = ['email', 'username']
    readonly_fields = ['created_at', 'updated_at']
    # Extend (rather than replace) the stock fieldsets.
    fieldsets = BaseUserAdmin.fieldsets + (
        ('IGNY8 Info', {'fields': ('account', 'role')}),
        ('Timestamps', {'fields': ('created_at', 'updated_at')}),
    )
    add_fieldsets = BaseUserAdmin.add_fieldsets + (
        ('IGNY8 Info', {'fields': ('account', 'role')}),
    )

    def get_queryset(self, request):
        """Filter users by account for non-superusers.

        Superusers and developers get the full queryset; other users get the
        users of their own account, or nothing if they have no account.
        """
        qs = super().get_queryset(request)
        if request.user.is_superuser or (hasattr(request.user, 'is_developer') and request.user.is_developer()):
            return qs
        user_account = getattr(request.user, 'account', None)
        if user_account:
            return qs.filter(account=user_account)
        return qs.none()

    def get_account_display(self, obj):
        """Safely get account name; ``'-'`` when absent or on lookup failure."""
        try:
            account = getattr(obj, 'account', None)
            return account.name if account else '-'
        except Exception:
            # Best-effort column: show the placeholder instead of failing.
            return '-'
    get_account_display.short_description = 'Account'