Files
igny8/backend/igny8_core/admin/alerts.py
IGNY8 VPS (Salman) eb88a0e12d django phase2.3.4.
2025-12-14 15:10:41 +00:00

123 lines
4.4 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Admin Alert System
"""
from django.utils import timezone
from datetime import timedelta
class AdminAlerts:
    """System for admin alerts and notifications.

    Exposes a single entry point, ``get_alerts()``, which runs a series of
    count queries against billing, account, automation, integration, Celery
    result and writer-task models and turns any non-trivial count into an
    alert dict for display.
    """

    @staticmethod
    def _alert(level, icon, message, url, action) -> dict:
        """Build one alert dict in the shape returned by ``get_alerts``."""
        return {
            'level': level,
            'icon': icon,
            'message': message,
            'url': url,
            'action': action,
        }

    @staticmethod
    def get_alerts() -> list:
        """Get all active alerts.

        Returns:
            A list of dicts, each with the keys ``level`` (``'info'``,
            ``'warning'`` or ``'error'``), ``icon``, ``message``, ``url``
            and ``action``. The list is empty when no check fires. Alerts
            appear in the order the checks run below.
        """
        # Function-scope import, matching the model imports below.
        import logging

        logger = logging.getLogger(__name__)
        make_alert = AdminAlerts._alert

        alerts = []
        # ``__date`` lookups are evaluated in the current time zone when
        # USE_TZ is enabled, so "today" must be the local date as well;
        # ``timezone.now().date()`` would be the UTC date and can be off by
        # one day around midnight.
        today = timezone.localdate()

        # Check for pending payments
        from igny8_core.business.billing.models import Payment
        pending_payments = Payment.objects.filter(
            status='pending_approval',
        ).count()
        if pending_payments > 0:
            alerts.append(make_alert(
                'warning',
                '⚠️',
                f'{pending_payments} payment(s) awaiting approval',
                '/admin/billing/payment/?status=pending_approval',
                'Review Payments',
            ))

        # Check for low credit accounts
        # (this count also includes the "critical" accounts reported below)
        from igny8_core.auth.models import Account
        low_credit_accounts = Account.objects.filter(
            status='active',
            credits__lt=100,
        ).count()
        if low_credit_accounts > 0:
            alerts.append(make_alert(
                'info',
                # NOTE(review): the icon is empty in the visible source while
                # the other levels carry an emoji -- confirm whether an info
                # glyph was lost somewhere upstream.
                '',
                f'{low_credit_accounts} account(s) with low credits',
                '/admin/igny8_core_auth/account/?credits__lt=100',
                'View Accounts',
            ))

        # Check for very low credits (critical)
        critical_credit_accounts = Account.objects.filter(
            status='active',
            credits__lt=10,
        ).count()
        if critical_credit_accounts > 0:
            alerts.append(make_alert(
                'error',
                '🔴',
                f'{critical_credit_accounts} account(s) with critical low credits (< 10)',
                '/admin/igny8_core_auth/account/?credits__lt=10',
                'Urgent Review',
            ))

        # Check for failed automations
        from igny8_core.business.automation.models import AutomationRun
        failed_today = AutomationRun.objects.filter(
            status='failed',
            started_at__date=today,
        ).count()
        if failed_today > 0:
            alerts.append(make_alert(
                'error',
                '🔴',
                f'{failed_today} automation(s) failed today',
                '/admin/automation/automationrun/?status=failed',
                'Review Failures',
            ))

        # Check for failed syncs
        from igny8_core.business.integration.models import SyncEvent
        failed_syncs = SyncEvent.objects.filter(
            success=False,
            created_at__date=today,
        ).count()
        if failed_syncs > 5:  # Only alert if more than 5
            alerts.append(make_alert(
                'warning',
                '⚠️',
                f'{failed_syncs} WordPress sync failures today',
                '/admin/integration/syncevent/?success=False',
                'Review Syncs',
            ))

        # Check for failed Celery tasks. This check is best-effort: it must
        # never break the other alerts, but it also must not hide real
        # problems or swallow KeyboardInterrupt/SystemExit the way a bare
        # ``except:`` does.
        try:
            from django_celery_results.models import TaskResult
            celery_failed = TaskResult.objects.filter(
                status='FAILURE',
                date_created__date=today,
            ).count()
        except ImportError:
            # django_celery_results is not installed; nothing to report.
            celery_failed = 0
        except Exception:
            # Any other failure is logged and the check is skipped.
            logger.warning(
                'Could not count failed Celery tasks for admin alerts',
                exc_info=True,
            )
            celery_failed = 0
        if celery_failed > 0:
            alerts.append(make_alert(
                'error',
                '🔴',
                f'{celery_failed} Celery task(s) failed today',
                '/admin/django_celery_results/taskresult/?status=FAILURE',
                'Review Tasks',
            ))

        # Check for stale pending tasks (older than 24 hours). Compare
        # against an actual 24-hour datetime cutoff: a calendar-date
        # comparison would flag a task created just before midnight as
        # stale a few minutes later.
        from igny8_core.modules.writer.models import Tasks
        stale_cutoff = timezone.now() - timedelta(hours=24)
        stale_tasks = Tasks.objects.filter(
            status='pending',
            created_at__lte=stale_cutoff,
        ).count()
        if stale_tasks > 10:  # Only alert if more than 10
            alerts.append(make_alert(
                'info',
                # NOTE(review): empty icon, same as the low-credit alert.
                '',
                f'{stale_tasks} tasks pending for more than 24 hours',
                '/admin/writer/tasks/?status=pending',
                'Review Tasks',
            ))

        return alerts