156 lines
5.4 KiB
Python
156 lines
5.4 KiB
Python
"""
|
|
Celery Task Monitoring Admin - Unfold Style
|
|
"""
|
|
from django.contrib import admin
|
|
from django.utils.html import format_html
|
|
from django.contrib import messages
|
|
from django_celery_results.models import TaskResult
|
|
from unfold.admin import ModelAdmin
|
|
from unfold.contrib.filters.admin import RangeDateFilter
|
|
|
|
|
|
class CeleryTaskResultAdmin(ModelAdmin):
|
|
"""Admin interface for monitoring Celery tasks with Unfold styling"""
|
|
|
|
list_display = [
|
|
'task_id',
|
|
'task_name',
|
|
'colored_status',
|
|
'date_created',
|
|
'date_done',
|
|
'execution_time',
|
|
]
|
|
list_filter = [
|
|
'status',
|
|
'task_name',
|
|
('date_created', RangeDateFilter),
|
|
('date_done', RangeDateFilter),
|
|
]
|
|
search_fields = ['task_id', 'task_name', 'task_args']
|
|
readonly_fields = [
|
|
'task_id', 'task_name', 'task_args', 'task_kwargs',
|
|
'result', 'traceback', 'date_created', 'date_done',
|
|
'colored_status', 'execution_time'
|
|
]
|
|
date_hierarchy = 'date_created'
|
|
ordering = ['-date_created']
|
|
|
|
actions = ['retry_failed_tasks', 'clear_old_tasks']
|
|
|
|
fieldsets = (
|
|
('Task Information', {
|
|
'fields': ('task_id', 'task_name', 'colored_status')
|
|
}),
|
|
('Execution Details', {
|
|
'fields': ('date_created', 'date_done', 'execution_time')
|
|
}),
|
|
('Task Arguments', {
|
|
'fields': ('task_args', 'task_kwargs'),
|
|
'classes': ('collapse',)
|
|
}),
|
|
('Result & Errors', {
|
|
'fields': ('result', 'traceback'),
|
|
'classes': ('collapse',)
|
|
}),
|
|
)
|
|
|
|
def colored_status(self, obj):
|
|
"""Display status with color coding"""
|
|
colors = {
|
|
'SUCCESS': '#0bbf87', # IGNY8 success green
|
|
'FAILURE': '#ef4444', # IGNY8 danger red
|
|
'PENDING': '#ff7a00', # IGNY8 warning orange
|
|
'STARTED': '#0693e3', # IGNY8 primary blue
|
|
'RETRY': '#5d4ae3', # IGNY8 purple
|
|
}
|
|
color = colors.get(obj.status, '#64748b') # Default gray
|
|
|
|
return format_html(
|
|
'<span style="color: {}; font-weight: bold; font-size: 14px;">{}</span>',
|
|
color,
|
|
obj.status
|
|
)
|
|
colored_status.short_description = 'Status'
|
|
|
|
def execution_time(self, obj):
|
|
"""Calculate and display execution time"""
|
|
if obj.date_done and obj.date_created:
|
|
duration = obj.date_done - obj.date_created
|
|
seconds = duration.total_seconds()
|
|
|
|
if seconds < 1:
|
|
return format_html('<span style="color: #0bbf87;">{:.2f}ms</span>', seconds * 1000)
|
|
elif seconds < 60:
|
|
return format_html('<span style="color: #0693e3;">{:.2f}s</span>', seconds)
|
|
else:
|
|
minutes = seconds / 60
|
|
return format_html('<span style="color: #ff7a00;">{:.1f}m</span>', minutes)
|
|
return '-'
|
|
execution_time.short_description = 'Duration'
|
|
|
|
def retry_failed_tasks(self, request, queryset):
|
|
"""Retry failed celery tasks"""
|
|
from celery import current_app
|
|
|
|
failed_tasks = queryset.filter(status='FAILURE')
|
|
count = 0
|
|
errors = []
|
|
|
|
for task in failed_tasks:
|
|
try:
|
|
# Get task function
|
|
task_func = current_app.tasks.get(task.task_name)
|
|
if task_func:
|
|
# Parse task args and kwargs
|
|
import ast
|
|
try:
|
|
args = ast.literal_eval(task.task_args) if task.task_args else []
|
|
kwargs = ast.literal_eval(task.task_kwargs) if task.task_kwargs else {}
|
|
except:
|
|
args = []
|
|
kwargs = {}
|
|
|
|
# Retry the task
|
|
task_func.apply_async(args=args, kwargs=kwargs)
|
|
count += 1
|
|
else:
|
|
errors.append(f'Task function not found: {task.task_name}')
|
|
except Exception as e:
|
|
errors.append(f'Error retrying {task.task_id}: {str(e)}')
|
|
|
|
if count > 0:
|
|
self.message_user(request, f'✅ Retried {count} failed task(s)', messages.SUCCESS)
|
|
|
|
if errors:
|
|
for error in errors[:5]: # Show max 5 errors
|
|
self.message_user(request, f'⚠️ {error}', messages.WARNING)
|
|
|
|
retry_failed_tasks.short_description = '🔄 Retry Failed Tasks'
|
|
|
|
def clear_old_tasks(self, request, queryset):
|
|
"""Clear old completed tasks"""
|
|
from datetime import timedelta
|
|
from django.utils import timezone
|
|
|
|
# Delete tasks older than 30 days
|
|
cutoff_date = timezone.now() - timedelta(days=30)
|
|
old_tasks = queryset.filter(
|
|
date_created__lt=cutoff_date,
|
|
status__in=['SUCCESS', 'FAILURE']
|
|
)
|
|
|
|
count = old_tasks.count()
|
|
old_tasks.delete()
|
|
|
|
self.message_user(request, f'🗑️ Cleared {count} old task(s)', messages.SUCCESS)
|
|
|
|
clear_old_tasks.short_description = '🗑️ Clear Old Tasks (30+ days)'
|
|
|
|
def has_add_permission(self, request):
|
|
"""Disable manual task creation"""
|
|
return False
|
|
|
|
def has_change_permission(self, request, obj=None):
|
|
"""Make read-only"""
|
|
return False
|