"""
Celery Task Monitoring Admin - Unfold Style
"""
from django.contrib import admin
from django.utils.html import format_html
from django.contrib import messages
from django_celery_results.models import TaskResult, GroupResult
from unfold.admin import ModelAdmin
from unfold.contrib.filters.admin import RangeDateFilter
from celery import current_app
class CeleryTaskResultAdmin(ModelAdmin):
"""Admin interface for monitoring Celery tasks with Unfold styling"""
list_display = [
'task_id',
'task_name',
'colored_status',
'date_created',
'date_done',
'execution_time',
]
list_filter = [
'status',
'task_name',
('date_created', RangeDateFilter),
('date_done', RangeDateFilter),
]
search_fields = ['task_id', 'task_name', 'task_args']
readonly_fields = [
'task_id', 'task_name', 'task_args', 'task_kwargs',
'result', 'traceback', 'date_created', 'date_done',
'colored_status', 'execution_time'
]
date_hierarchy = 'date_created'
ordering = ['-date_created']
actions = ['retry_failed_tasks', 'clear_old_tasks']
fieldsets = (
('Task Information', {
'fields': ('task_id', 'task_name', 'colored_status')
}),
('Execution Details', {
'fields': ('date_created', 'date_done', 'execution_time')
}),
('Task Arguments', {
'fields': ('task_args', 'task_kwargs'),
'classes': ('collapse',)
}),
('Result & Errors', {
'fields': ('result', 'traceback'),
'classes': ('collapse',)
}),
)
def colored_status(self, obj):
"""Display status with color coding"""
colors = {
'SUCCESS': '#0bbf87', # IGNY8 success green
'FAILURE': '#ef4444', # IGNY8 danger red
'PENDING': '#ff7a00', # IGNY8 warning orange
'STARTED': '#0693e3', # IGNY8 primary blue
'RETRY': '#5d4ae3', # IGNY8 purple
}
color = colors.get(obj.status, '#64748b') # Default gray
return format_html(
'{}',
color,
obj.status
)
colored_status.short_description = 'Status'
def execution_time(self, obj):
"""Calculate and display execution time"""
if obj.date_done and obj.date_created:
duration = obj.date_done - obj.date_created
seconds = duration.total_seconds()
if seconds < 1:
time_str = f'{seconds * 1000:.2f}ms'
return format_html('{}', time_str)
elif seconds < 60:
time_str = f'{seconds:.2f}s'
return format_html('{}', time_str)
else:
minutes = seconds / 60
time_str = f'{minutes:.1f}m'
return format_html('{}', time_str)
return '-'
execution_time.short_description = 'Duration'
def retry_failed_tasks(self, request, queryset):
"""Retry failed celery tasks by re-queuing them"""
from igny8_core.celery import app
import json
failed_tasks = queryset.filter(status='FAILURE')
count = 0
errors = []
for task in failed_tasks:
try:
# Get task function
task_func = current_app.tasks.get(task.task_name)
if task_func:
# Parse task args and kwargs
import ast
try:
args = ast.literal_eval(task.task_args) if task.task_args else []
kwargs = ast.literal_eval(task.task_kwargs) if task.task_kwargs else {}
except:
args = []
kwargs = {}
# Retry the task
task_func.apply_async(args=args, kwargs=kwargs)
count += 1
else:
errors.append(f'Task function not found: {task.task_name}')
except Exception as e:
errors.append(f'Error retrying {task.task_id}: {str(e)}')
if count > 0:
self.message_user(request, f'Successfully queued {count} task(s) for retry.', 'SUCCESS')
if errors:
for error in errors[:5]: # Show max 5 errors
self.message_user(request, f'Error: {error}', 'WARNING')
retry_failed_tasks.short_description = 'Retry Failed Tasks'
def clear_old_tasks(self, request, queryset):
"""Clear old completed tasks"""
from datetime import timedelta
from django.utils import timezone
# Delete tasks older than 30 days
cutoff_date = timezone.now() - timedelta(days=30)
old_tasks = queryset.filter(
date_created__lt=cutoff_date,
status__in=['SUCCESS', 'FAILURE']
)
count = old_tasks.count()
old_tasks.delete()
self.message_user(request, f'Cleared {count} old task(s)', messages.SUCCESS)
clear_old_tasks.short_description = 'Clear Old Tasks (30+ days)'
def has_add_permission(self, request):
"""Disable manual task creation"""
return False
def has_change_permission(self, request, obj=None):
"""Make read-only"""
return False
class CeleryGroupResultAdmin(ModelAdmin):
"""Admin interface for monitoring Celery group results with Unfold styling"""
list_display = [
'group_id',
'date_created',
'date_done',
'result_count',
]
list_filter = [
('date_created', RangeDateFilter),
('date_done', RangeDateFilter),
]
search_fields = ['group_id', 'result']
readonly_fields = [
'group_id', 'date_created', 'date_done', 'content_type',
'content_encoding', 'result'
]
date_hierarchy = 'date_created'
ordering = ['-date_created']
fieldsets = (
('Group Information', {
'fields': ('group_id', 'date_created', 'date_done')
}),
('Result Details', {
'fields': ('content_type', 'content_encoding', 'result'),
'classes': ('collapse',)
}),
)
def result_count(self, obj):
"""Count tasks in the group"""
if obj.result:
try:
import json
result_data = json.loads(obj.result) if isinstance(obj.result, str) else obj.result
if isinstance(result_data, list):
return len(result_data)
except:
pass
return '-'
result_count.short_description = 'Task Count'
def has_add_permission(self, request):
"""Disable manual group result creation"""
return False
def has_change_permission(self, request, obj=None):
"""Make read-only"""
return False