123 lines
4.4 KiB
Python
123 lines
4.4 KiB
Python
"""
Admin Alert System
"""
|
||
import logging
from datetime import timedelta

from django.utils import timezone
|
||
|
||
|
||
class AdminAlerts:
    """System for admin alerts and notifications.

    Every alert is a plain dict with the keys ``level`` (``'info'``,
    ``'warning'`` or ``'error'``), ``icon``, ``message``, ``url`` and
    ``action``.
    """

    # Active accounts with fewer credits than this raise an "info" alert.
    LOW_CREDIT_THRESHOLD = 100
    # Active accounts with fewer credits than this raise an "error" alert.
    CRITICAL_CREDIT_THRESHOLD = 10
    # Failed syncs today must exceed this count before a warning is raised.
    FAILED_SYNC_THRESHOLD = 5
    # Stale pending tasks must exceed this count before an alert is raised.
    STALE_TASK_THRESHOLD = 10
    # A pending task is considered stale once it is older than this.
    STALE_TASK_AGE = timedelta(hours=24)

    @staticmethod
    def get_alerts():
        """Get all active alerts.

        Runs one COUNT query per check (pending payments, low / critical
        credit accounts, failed automations, failed syncs, failed Celery
        tasks, stale pending tasks) and builds an alert for every check whose
        count crosses its threshold.

        Returns:
            list[dict]: Alerts in the order the checks are run; empty when
            nothing needs attention.
        """
        alerts = []
        now = timezone.now()
        # ``__date`` lookups are evaluated in the current time zone when
        # USE_TZ is enabled, so "today" has to be the local date rather than
        # the UTC date that ``timezone.now().date()`` would give.
        today = timezone.localdate()

        # Model imports are kept function-local, as in the original code
        # (presumably to avoid import cycles at module load -- TODO confirm).

        # Check for pending payments
        from igny8_core.business.billing.models import Payment
        pending_payments = Payment.objects.filter(
            status='pending_approval',
        ).count()
        if pending_payments > 0:
            alerts.append({
                'level': 'warning',
                'icon': '⚠️',
                'message': f'{pending_payments} payment(s) awaiting approval',
                'url': '/admin/billing/payment/?status=pending_approval',
                'action': 'Review Payments',
            })

        # Check for low credit accounts
        from igny8_core.auth.models import Account
        low_limit = AdminAlerts.LOW_CREDIT_THRESHOLD
        low_credit_accounts = Account.objects.filter(
            status='active',
            credits__lt=low_limit,
        ).count()
        if low_credit_accounts > 0:
            alerts.append({
                'level': 'info',
                'icon': 'ℹ️',
                'message': f'{low_credit_accounts} account(s) with low credits',
                'url': f'/admin/igny8_core_auth/account/?credits__lt={low_limit}',
                'action': 'View Accounts',
            })

        # Check for very low credits (critical). These accounts are also
        # included in the low-credit count above.
        critical_limit = AdminAlerts.CRITICAL_CREDIT_THRESHOLD
        critical_credit_accounts = Account.objects.filter(
            status='active',
            credits__lt=critical_limit,
        ).count()
        if critical_credit_accounts > 0:
            alerts.append({
                'level': 'error',
                'icon': '🔴',
                'message': (
                    f'{critical_credit_accounts} account(s) with critical '
                    f'low credits (< {critical_limit})'
                ),
                'url': f'/admin/igny8_core_auth/account/?credits__lt={critical_limit}',
                'action': 'Urgent Review',
            })

        # Check for failed automations
        from igny8_core.business.automation.models import AutomationRun
        failed_today = AutomationRun.objects.filter(
            status='failed',
            started_at__date=today,
        ).count()
        if failed_today > 0:
            alerts.append({
                'level': 'error',
                'icon': '🔴',
                'message': f'{failed_today} automation(s) failed today',
                'url': '/admin/automation/automationrun/?status=failed',
                'action': 'Review Failures',
            })

        # Check for failed syncs
        from igny8_core.business.integration.models import SyncEvent
        failed_syncs = SyncEvent.objects.filter(
            success=False,
            created_at__date=today,
        ).count()
        # A handful of failures is tolerated; only alert above the threshold.
        if failed_syncs > AdminAlerts.FAILED_SYNC_THRESHOLD:
            alerts.append({
                'level': 'warning',
                'icon': '⚠️',
                'message': f'{failed_syncs} WordPress sync failures today',
                'url': '/admin/integration/syncevent/?success=False',
                'action': 'Review Syncs',
            })

        # Check for failed Celery tasks. This check is best-effort: a problem
        # here must never prevent the other alerts from being returned.
        celery_failed = 0
        try:
            from django_celery_results.models import TaskResult
            celery_failed = TaskResult.objects.filter(
                status='FAILURE',
                date_created__date=today,
            ).count()
        except ImportError:
            # django_celery_results is not installed: nothing to report.
            pass
        except Exception:
            # Any other failure is skipped too, but logged so it is visible
            # instead of being silently swallowed.
            logging.getLogger(__name__).warning(
                'Could not check failed Celery tasks for admin alerts',
                exc_info=True,
            )
        if celery_failed > 0:
            alerts.append({
                'level': 'error',
                'icon': '🔴',
                'message': f'{celery_failed} Celery task(s) failed today',
                'url': '/admin/django_celery_results/taskresult/?status=FAILURE',
                'action': 'Review Tasks',
            })

        # Check for stale pending tasks (older than 24 hours). Compare against
        # an exact timestamp: a date-based comparison would flag a task
        # created late yesterday just minutes after it was created.
        from igny8_core.modules.writer.models import Tasks
        stale_cutoff = now - AdminAlerts.STALE_TASK_AGE
        stale_tasks = Tasks.objects.filter(
            status='pending',
            created_at__lte=stale_cutoff,
        ).count()
        if stale_tasks > AdminAlerts.STALE_TASK_THRESHOLD:
            alerts.append({
                'level': 'info',
                'icon': 'ℹ️',
                'message': f'{stale_tasks} tasks pending for more than 24 hours',
                'url': '/admin/writer/tasks/?status=pending',
                'action': 'Review Tasks',
            })

        return alerts
|