"""
Celery Task Monitoring Admin - Unfold Style
"""
import ast
import json
from datetime import timedelta

from celery import current_app
from django.contrib import admin
from django.contrib import messages
from django.utils import timezone
from django.utils.html import format_html
from django_celery_results.models import TaskResult, GroupResult
from unfold.admin import ModelAdmin
from unfold.contrib.filters.admin import RangeDateFilter

# Colour used to render each Celery task state in the changelist.
_STATUS_COLORS = {
    'SUCCESS': '#0bbf87',  # IGNY8 success green
    'FAILURE': '#ef4444',  # IGNY8 danger red
    'PENDING': '#ff7a00',  # IGNY8 warning orange
    'STARTED': '#0693e3',  # IGNY8 primary blue
    'RETRY': '#5d4ae3',    # IGNY8 purple
}
_DEFAULT_STATUS_COLOR = '#64748b'  # Gray, for any state not listed above

# Finished tasks older than this many days are removed by "Clear Old Tasks".
_OLD_TASK_RETENTION_DAYS = 30

# Upper bound on individual error messages shown after a retry action.
_MAX_REPORTED_ERRORS = 5


def _parse_stored_arguments(raw, expected_types, default):
    """Turn a stored ``task_args`` / ``task_kwargs`` string back into a value.

    Args:
        raw: The text stored on the task result row (may be empty or None).
        expected_types: Tuple of types the parsed value must be an instance of.
        default: Value returned when ``raw`` is empty.

    Returns:
        The parsed Python literal, or ``default`` when nothing was stored.

    Raises:
        ValueError: If the text is not a Python literal of an expected type
            (for example because the stored repr was truncated).
    """
    if not raw:
        return default

    try:
        value = ast.literal_eval(raw)
        # NOTE(review): the result backend appears to be able to store the
        # repr wrapped in one extra layer of string quoting; unwrap it once
        # so the inner tuple/dict is recovered - confirm against stored rows.
        if isinstance(value, str):
            value = ast.literal_eval(value)
    except (ValueError, SyntaxError, TypeError) as exc:
        raise ValueError(f'could not parse stored arguments {raw!r}') from exc

    if not isinstance(value, expected_types):
        raise ValueError(
            f'stored arguments {raw!r} have unexpected type '
            f'{type(value).__name__}'
        )
    return value


class CeleryTaskResultAdmin(ModelAdmin):
    """Admin interface for monitoring Celery tasks with Unfold styling"""

    list_display = [
        'task_id',
        'task_name',
        'colored_status',
        'date_created',
        'date_done',
        'execution_time',
    ]
    list_filter = [
        'status',
        'task_name',
        ('date_created', RangeDateFilter),
        ('date_done', RangeDateFilter),
    ]
    search_fields = ['task_id', 'task_name', 'task_args']
    readonly_fields = [
        'task_id', 'task_name', 'task_args', 'task_kwargs',
        'result', 'traceback', 'date_created', 'date_done',
        'colored_status', 'execution_time'
    ]
    date_hierarchy = 'date_created'
    ordering = ['-date_created']

    actions = ['retry_failed_tasks', 'clear_old_tasks']

    fieldsets = (
        ('Task Information', {
            'fields': ('task_id', 'task_name', 'colored_status')
        }),
        ('Execution Details', {
            'fields': ('date_created', 'date_done', 'execution_time')
        }),
        ('Task Arguments', {
            'fields': ('task_args', 'task_kwargs'),
            'classes': ('collapse',)
        }),
        ('Result & Errors', {
            'fields': ('result', 'traceback'),
            'classes': ('collapse',)
        }),
    )

    def colored_status(self, obj):
        """Display status with color coding.

        Returns the task's ``status`` wrapped in a span coloured according to
        the status-to-colour table, falling back to gray for unknown states.
        """
        color = _STATUS_COLORS.get(obj.status, _DEFAULT_STATUS_COLOR)
        # Both values go through format_html placeholders so they are escaped.
        return format_html(
            '<span style="color: {}; font-weight: bold;">{}</span>',
            color,
            obj.status,
        )
    colored_status.short_description = 'Status'

    def execution_time(self, obj):
        """Calculate and display execution time.

        The duration is ``date_done - date_created``; it is rendered in
        milliseconds below one second, seconds below one minute, and minutes
        otherwise. Returns ``'-'`` when either timestamp is missing.
        """
        if not (obj.date_done and obj.date_created):
            return '-'

        seconds = (obj.date_done - obj.date_created).total_seconds()
        if seconds < 1:
            time_str = f'{seconds * 1000:.2f}ms'
        elif seconds < 60:
            time_str = f'{seconds:.2f}s'
        else:
            time_str = f'{seconds / 60:.1f}m'
        return format_html('{}', time_str)
    execution_time.short_description = 'Duration'

    def retry_failed_tasks(self, request, queryset):
        """Retry failed celery tasks by re-queuing them.

        Only rows with status ``FAILURE`` in the selected queryset are
        considered. Each one is looked up by name in the current Celery app's
        task registry and re-sent with its stored args/kwargs. A task whose
        function is not registered, or whose stored arguments cannot be
        parsed, is skipped and reported instead of being queued with empty
        arguments.
        """
        # Kept from the original implementation although the name is unused.
        # NOTE(review): presumably importing the project's Celery module makes
        # sure the app is configured before current_app is used - confirm.
        from igny8_core.celery import app  # noqa: F401

        count = 0
        errors = []

        for task in queryset.filter(status='FAILURE'):
            try:
                task_func = current_app.tasks.get(task.task_name)
                if not task_func:
                    errors.append(f'Task function not found: {task.task_name}')
                    continue

                args = _parse_stored_arguments(task.task_args, (list, tuple), [])
                kwargs = _parse_stored_arguments(task.task_kwargs, (dict,), {})

                task_func.apply_async(args=args, kwargs=kwargs)
                count += 1
            except Exception as e:
                # Best-effort per task: one failure must not abort the batch.
                errors.append(f'Error retrying {task.task_id}: {str(e)}')

        if count > 0:
            self.message_user(
                request,
                f'Successfully queued {count} task(s) for retry.',
                messages.SUCCESS,
            )

        for error in errors[:_MAX_REPORTED_ERRORS]:
            self.message_user(request, f'Error: {error}', messages.WARNING)

        hidden = len(errors) - _MAX_REPORTED_ERRORS
        if hidden > 0:
            # Make it visible that the list above was truncated.
            self.message_user(
                request,
                f'{hidden} more error(s) not shown.',
                messages.WARNING,
            )
    retry_failed_tasks.short_description = 'Retry Failed Tasks'

    def clear_old_tasks(self, request, queryset):
        """Clear old completed tasks.

        Deletes the selected rows that are in state ``SUCCESS`` or ``FAILURE``
        and were created more than 30 days ago, then reports how many rows
        matched.
        """
        cutoff_date = timezone.now() - timedelta(days=_OLD_TASK_RETENTION_DAYS)
        old_tasks = queryset.filter(
            date_created__lt=cutoff_date,
            status__in=['SUCCESS', 'FAILURE']
        )

        # Count before deleting: the queryset is empty afterwards.
        count = old_tasks.count()
        old_tasks.delete()

        self.message_user(request, f'Cleared {count} old task(s)', messages.SUCCESS)
    clear_old_tasks.short_description = 'Clear Old Tasks (30+ days)'

    def has_add_permission(self, request):
        """Disable manual task creation"""
        return False

    def has_change_permission(self, request, obj=None):
        """Make read-only"""
        return False


class CeleryGroupResultAdmin(ModelAdmin):
    """Admin interface for monitoring Celery group results with Unfold styling"""

    list_display = [
        'group_id',
        'date_created',
        'date_done',
        'result_count',
    ]
    list_filter = [
        ('date_created', RangeDateFilter),
        ('date_done', RangeDateFilter),
    ]
    search_fields = ['group_id', 'result']
    readonly_fields = [
        'group_id', 'date_created', 'date_done', 'content_type',
        'content_encoding', 'result'
    ]
    date_hierarchy = 'date_created'
    ordering = ['-date_created']

    fieldsets = (
        ('Group Information', {
            'fields': ('group_id', 'date_created', 'date_done')
        }),
        ('Result Details', {
            'fields': ('content_type', 'content_encoding', 'result'),
            'classes': ('collapse',)
        }),
    )

    def result_count(self, obj):
        """Count tasks in the group.

        Returns the length of ``obj.result`` when it is (or JSON-decodes to)
        a list, and ``'-'`` when it is empty, undecodable, or not a list.
        """
        if not obj.result:
            return '-'

        try:
            result_data = (
                json.loads(obj.result)
                if isinstance(obj.result, str)
                else obj.result
            )
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError: stored text is not JSON.
            return '-'

        if isinstance(result_data, list):
            return len(result_data)
        return '-'
    result_count.short_description = 'Task Count'

    def has_add_permission(self, request):
        """Disable manual group result creation"""
        return False

    def has_change_permission(self, request, obj=None):
        """Make read-only"""
        return False