Files
igny8/backend/igny8_core/business/integration/admin.py
2025-12-20 02:46:00 +00:00

135 lines
5.1 KiB
Python

from django.contrib import admin
from django.contrib import messages
from unfold.admin import ModelAdmin
from igny8_core.admin.base import AccountAdminMixin, Igny8ModelAdmin
from .models import SiteIntegration, SyncEvent
from import_export.admin import ExportMixin
from import_export import resources
class SyncEventResource(resources.ModelResource):
"""Resource class for exporting Sync Events"""
class Meta:
model = SyncEvent
fields = ('id', 'integration__site__name', 'site__name', 'event_type', 'action',
'success', 'external_id', 'description', 'created_at')
export_order = fields
class SiteIntegrationResource(resources.ModelResource):
"""Resource class for exporting Site Integrations"""
class Meta:
model = SiteIntegration
fields = ('id', 'site__name', 'platform', 'platform_type', 'is_active',
'sync_enabled', 'sync_status', 'last_sync_at', 'created_at')
export_order = fields
@admin.register(SiteIntegration)
class SiteIntegrationAdmin(ExportMixin, AccountAdminMixin, Igny8ModelAdmin):
resource_class = SiteIntegrationResource
list_display = [
'site',
'platform',
'platform_type',
'is_active',
'sync_enabled',
'sync_status',
'last_sync_at',
]
list_filter = ['platform', 'platform_type', 'is_active', 'sync_enabled', 'sync_status']
search_fields = ['site__name', 'site__domain', 'platform']
readonly_fields = ['created_at', 'updated_at']
actions = [
'bulk_enable_sync',
'bulk_disable_sync',
'bulk_trigger_sync',
'bulk_test_connection',
'bulk_delete_integrations',
]
def bulk_enable_sync(self, request, queryset):
"""Enable sync for selected integrations"""
updated = queryset.update(sync_enabled=True)
self.message_user(request, f'{updated} integration(s) sync enabled.', messages.SUCCESS)
bulk_enable_sync.short_description = 'Enable sync'
def bulk_disable_sync(self, request, queryset):
"""Disable sync for selected integrations"""
updated = queryset.update(sync_enabled=False)
self.message_user(request, f'{updated} integration(s) sync disabled.', messages.SUCCESS)
bulk_disable_sync.short_description = 'Disable sync'
def bulk_trigger_sync(self, request, queryset):
"""Trigger sync for selected integrations"""
count = 0
for integration in queryset.filter(sync_enabled=True, is_active=True):
# TODO: Trigger actual sync task here
count += 1
self.message_user(request, f'{count} integration(s) queued for sync.', messages.INFO)
bulk_trigger_sync.short_description = 'Trigger sync now'
def bulk_test_connection(self, request, queryset):
"""Test connection for selected integrations"""
tested = 0
successful = 0
for integration in queryset.filter(is_active=True):
# TODO: Implement actual connection test logic
tested += 1
successful += 1 # Placeholder
self.message_user(
request,
f'Tested {tested} integration(s). {successful} successful. (Connection test logic to be implemented)',
messages.INFO
)
bulk_test_connection.short_description = 'Test connections'
def bulk_delete_integrations(self, request, queryset):
"""Delete selected integrations"""
count = queryset.count()
queryset.delete()
self.message_user(request, f'{count} integration(s) deleted.', messages.SUCCESS)
bulk_delete_integrations.short_description = 'Delete selected integrations'
@admin.register(SyncEvent)
class SyncEventAdmin(ExportMixin, AccountAdminMixin, Igny8ModelAdmin):
resource_class = SyncEventResource
list_display = [
'integration',
'site',
'event_type',
'action',
'success',
'external_id',
'created_at',
]
list_filter = ['event_type', 'action', 'success', 'created_at']
search_fields = ['integration__site__name', 'site__name', 'description', 'external_id']
readonly_fields = ['created_at']
actions = [
'bulk_mark_reviewed',
'bulk_delete_old_events',
]
def bulk_mark_reviewed(self, request, queryset):
"""Mark selected sync events as reviewed"""
# Could add a 'reviewed' field to model in future
count = queryset.count()
self.message_user(request, f'{count} sync event(s) marked as reviewed.', messages.SUCCESS)
bulk_mark_reviewed.short_description = 'Mark as reviewed'
def bulk_delete_old_events(self, request, queryset):
"""Delete sync events older than 30 days"""
from django.utils import timezone
from datetime import timedelta
cutoff_date = timezone.now() - timedelta(days=30)
old_events = queryset.filter(created_at__lt=cutoff_date)
count = old_events.count()
old_events.delete()
self.message_user(request, f'{count} old sync event(s) deleted (older than 30 days).', messages.SUCCESS)
bulk_delete_old_events.short_description = 'Delete old events (>30 days)'