fix
This commit is contained in:
57
backend/force_cancel_automation.py
Normal file
57
backend/force_cancel_automation.py
Normal file
@@ -0,0 +1,57 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Force cancel stuck automation runs and clear cache locks"""
|
||||
import os
|
||||
import sys
|
||||
import django
|
||||
|
||||
# Setup Django
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'igny8_core.settings')
|
||||
django.setup()
|
||||
|
||||
from igny8_core.business.automation.models import AutomationRun
|
||||
from django.core.cache import cache
|
||||
from django.utils import timezone
|
||||
|
||||
print("=" * 80)
|
||||
print("AUTOMATION RUN FORCE CANCEL & CLEANUP")
|
||||
print("=" * 80)
|
||||
|
||||
# Check and cancel active runs
|
||||
runs = AutomationRun.objects.filter(status__in=['running', 'paused']).order_by('-started_at')
|
||||
print(f"\nFound {runs.count()} active run(s)")
|
||||
|
||||
if runs.count() == 0:
|
||||
print(" No runs to cancel\n")
|
||||
else:
|
||||
for r in runs:
|
||||
duration = (timezone.now() - r.started_at).total_seconds() / 60
|
||||
print(f"\nRun ID: {r.run_id}")
|
||||
print(f" Site: {r.site_id}")
|
||||
print(f" Status: {r.status}")
|
||||
print(f" Stage: {r.current_stage}")
|
||||
print(f" Started: {r.started_at} ({duration:.1f}m ago)")
|
||||
print(f" Credits: {r.total_credits_used}")
|
||||
|
||||
# Force cancel
|
||||
print(f" >>> FORCE CANCELLING...")
|
||||
r.status = 'cancelled'
|
||||
r.save()
|
||||
print(f" >>> Status: {r.status}")
|
||||
|
||||
# Clear cache lock
|
||||
lock_key = f'automation_lock_{r.site_id}'
|
||||
cache.delete(lock_key)
|
||||
print(f" >>> Lock cleared: {lock_key}")
|
||||
|
||||
print("\n" + "=" * 40)
|
||||
print("Cache lock status:")
|
||||
for site_id in [5, 16]:
|
||||
lock_key = f'automation_lock_{site_id}'
|
||||
lock_val = cache.get(lock_key)
|
||||
status = lock_val or 'UNLOCKED ✓'
|
||||
print(f" Site {site_id}: {status}")
|
||||
|
||||
print("\n" + "=" * 80)
|
||||
print("✓ CLEANUP COMPLETE - You can now start a new automation run")
|
||||
print("=" * 80)
|
||||
@@ -89,6 +89,9 @@ class Igny8AdminSite(admin.AdminSite):
|
||||
('system', 'UserSettings'),
|
||||
('system', 'ModuleSettings'),
|
||||
('system', 'AISettings'),
|
||||
# Automation config lives under the automation app - include here
|
||||
('automation', 'AutomationConfig'),
|
||||
('automation', 'AutomationRun'),
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
20
backend/igny8_core/business/automation/admin.py
Normal file
20
backend/igny8_core/business/automation/admin.py
Normal file
@@ -0,0 +1,20 @@
|
||||
"""
|
||||
Admin registration for Automation models
|
||||
"""
|
||||
from django.contrib import admin
|
||||
from igny8_core.admin.base import AccountAdminMixin
|
||||
from .models import AutomationConfig, AutomationRun
|
||||
|
||||
|
||||
@admin.register(AutomationConfig)
|
||||
class AutomationConfigAdmin(AccountAdminMixin, admin.ModelAdmin):
|
||||
list_display = ('site', 'is_enabled', 'frequency', 'scheduled_time', 'within_stage_delay', 'between_stage_delay', 'last_run_at')
|
||||
list_filter = ('is_enabled', 'frequency')
|
||||
search_fields = ('site__domain',)
|
||||
|
||||
|
||||
@admin.register(AutomationRun)
|
||||
class AutomationRunAdmin(AccountAdminMixin, admin.ModelAdmin):
|
||||
list_display = ('run_id', 'site', 'status', 'current_stage', 'started_at', 'completed_at')
|
||||
list_filter = ('status', 'current_stage')
|
||||
search_fields = ('run_id', 'site__domain')
|
||||
@@ -7,15 +7,22 @@ import logging
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
import json
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AutomationLogger:
|
||||
"""File-based logging for automation runs"""
|
||||
|
||||
def __init__(self, base_log_dir: str = 'logs/automation'):
|
||||
"""File-based logging for automation runs
|
||||
|
||||
Writes logs under a per-account/per-site/run directory by default.
|
||||
Optionally a shared_log_dir can be provided to mirror logs into a consolidated folder.
|
||||
"""
|
||||
|
||||
def __init__(self, base_log_dir: str = '/data/app/logs/automation', shared_log_dir: str | None = None):
|
||||
# Use absolute path by default to avoid surprises from current working directory
|
||||
self.base_log_dir = base_log_dir
|
||||
self.shared_log_dir = shared_log_dir
|
||||
|
||||
def start_run(self, account_id: int, site_id: int, trigger_type: str) -> str:
|
||||
"""
|
||||
@@ -28,11 +35,17 @@ class AutomationLogger:
|
||||
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
||||
run_id = f"run_{timestamp}_{trigger_type}"
|
||||
|
||||
# Create directory structure
|
||||
# Create directory structure (primary)
|
||||
run_dir = self._get_run_dir(account_id, site_id, run_id)
|
||||
os.makedirs(run_dir, exist_ok=True)
|
||||
|
||||
# Create mirrored directory in shared log dir if configured
|
||||
shared_run_dir = None
|
||||
if self.shared_log_dir:
|
||||
shared_run_dir = os.path.join(self.shared_log_dir, run_id)
|
||||
os.makedirs(shared_run_dir, exist_ok=True)
|
||||
|
||||
# Create main log file
|
||||
# Create main log file in primary run dir
|
||||
log_file = os.path.join(run_dir, 'automation_run.log')
|
||||
with open(log_file, 'w') as f:
|
||||
f.write("=" * 80 + "\n")
|
||||
@@ -42,6 +55,41 @@ class AutomationLogger:
|
||||
f.write(f"Account: {account_id}\n")
|
||||
f.write(f"Site: {site_id}\n")
|
||||
f.write("=" * 80 + "\n\n")
|
||||
|
||||
# Also create a main log in the shared run dir (if configured)
|
||||
if shared_run_dir:
|
||||
shared_log_file = os.path.join(shared_run_dir, 'automation_run.log')
|
||||
with open(shared_log_file, 'w') as f:
|
||||
f.write("=" * 80 + "\n")
|
||||
f.write(f"AUTOMATION RUN (SHARED): {run_id}\n")
|
||||
f.write(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||||
f.write(f"Trigger: {trigger_type}\n")
|
||||
f.write(f"Account: {account_id}\n")
|
||||
f.write(f"Site: {site_id}\n")
|
||||
f.write("=" * 80 + "\n\n")
|
||||
# Structured trace event for run start
|
||||
try:
|
||||
trace_event = {
|
||||
'event': 'run_started',
|
||||
'run_id': run_id,
|
||||
'trigger': trigger_type,
|
||||
'account_id': account_id,
|
||||
'site_id': site_id,
|
||||
'timestamp': datetime.now().isoformat(),
|
||||
}
|
||||
# best-effort append
|
||||
run_dir = self._get_run_dir(account_id, site_id, run_id)
|
||||
os.makedirs(run_dir, exist_ok=True)
|
||||
trace_file = os.path.join(run_dir, 'run_trace.jsonl')
|
||||
with open(trace_file, 'a') as tf:
|
||||
tf.write(json.dumps(trace_event) + "\n")
|
||||
if self.shared_log_dir:
|
||||
shared_trace = os.path.join(self.shared_log_dir, run_id, 'run_trace.jsonl')
|
||||
os.makedirs(os.path.dirname(shared_trace), exist_ok=True)
|
||||
with open(shared_trace, 'a') as stf:
|
||||
stf.write(json.dumps(trace_event) + "\n")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
logger.info(f"[AutomationLogger] Created run: {run_id}")
|
||||
return run_id
|
||||
@@ -56,14 +104,48 @@ class AutomationLogger:
|
||||
self._append_to_main_log(account_id, site_id, run_id,
|
||||
f"{timestamp} - Stage {stage_number}: Found {pending_count} pending items")
|
||||
|
||||
# Stage-specific log
|
||||
# Stage-specific log (primary)
|
||||
stage_log = self._get_stage_log_path(account_id, site_id, run_id, stage_number)
|
||||
os.makedirs(os.path.dirname(stage_log), exist_ok=True)
|
||||
with open(stage_log, 'w') as f:
|
||||
f.write("=" * 80 + "\n")
|
||||
f.write(f"STAGE {stage_number}: {stage_name}\n")
|
||||
f.write(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||||
f.write("=" * 80 + "\n\n")
|
||||
f.write(f"{timestamp} - Found {pending_count} pending items\n")
|
||||
|
||||
# Mirror stage log into shared dir if configured
|
||||
if self.shared_log_dir:
|
||||
shared_stage_log = os.path.join(self.shared_log_dir, run_id, f'stage_{str(stage_number)}.log')
|
||||
os.makedirs(os.path.dirname(shared_stage_log), exist_ok=True)
|
||||
with open(shared_stage_log, 'w') as f:
|
||||
f.write("=" * 80 + "\n")
|
||||
f.write(f"STAGE {stage_number}: {stage_name} (SHARED)\n")
|
||||
f.write(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||||
f.write("=" * 80 + "\n\n")
|
||||
f.write(f"{timestamp} - Found {pending_count} pending items\n")
|
||||
# Structured stage start trace
|
||||
try:
|
||||
trace_event = {
|
||||
'event': 'stage_start',
|
||||
'run_id': run_id,
|
||||
'stage': stage_number,
|
||||
'stage_name': stage_name,
|
||||
'pending_count': pending_count,
|
||||
'timestamp': datetime.now().isoformat(),
|
||||
}
|
||||
run_dir = self._get_run_dir(account_id, site_id, run_id)
|
||||
os.makedirs(run_dir, exist_ok=True)
|
||||
trace_file = os.path.join(run_dir, 'run_trace.jsonl')
|
||||
with open(trace_file, 'a') as tf:
|
||||
tf.write(json.dumps(trace_event) + "\n")
|
||||
if self.shared_log_dir:
|
||||
shared_trace = os.path.join(self.shared_log_dir, run_id, 'run_trace.jsonl')
|
||||
os.makedirs(os.path.dirname(shared_trace), exist_ok=True)
|
||||
with open(shared_trace, 'a') as stf:
|
||||
stf.write(json.dumps(trace_event) + "\n")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def log_stage_progress(self, run_id: str, account_id: int, site_id: int, stage_number: int, message: str):
|
||||
"""Log stage progress"""
|
||||
@@ -73,10 +155,39 @@ class AutomationLogger:
|
||||
# Main log
|
||||
self._append_to_main_log(account_id, site_id, run_id, log_message)
|
||||
|
||||
# Stage-specific log
|
||||
# Stage-specific log (primary)
|
||||
stage_log = self._get_stage_log_path(account_id, site_id, run_id, stage_number)
|
||||
os.makedirs(os.path.dirname(stage_log), exist_ok=True)
|
||||
with open(stage_log, 'a') as f:
|
||||
f.write(f"{log_message}\n")
|
||||
|
||||
# Mirror progress into shared dir if configured
|
||||
if self.shared_log_dir:
|
||||
shared_stage_log = os.path.join(self.shared_log_dir, run_id, f'stage_{str(stage_number)}.log')
|
||||
os.makedirs(os.path.dirname(shared_stage_log), exist_ok=True)
|
||||
with open(shared_stage_log, 'a') as f:
|
||||
f.write(f"{log_message}\n")
|
||||
# Structured progress trace
|
||||
try:
|
||||
trace_event = {
|
||||
'event': 'stage_progress',
|
||||
'run_id': run_id,
|
||||
'stage': stage_number,
|
||||
'message': message,
|
||||
'timestamp': datetime.now().isoformat(),
|
||||
}
|
||||
run_dir = self._get_run_dir(account_id, site_id, run_id)
|
||||
os.makedirs(run_dir, exist_ok=True)
|
||||
trace_file = os.path.join(run_dir, 'run_trace.jsonl')
|
||||
with open(trace_file, 'a') as tf:
|
||||
tf.write(json.dumps(trace_event) + "\n")
|
||||
if self.shared_log_dir:
|
||||
shared_trace = os.path.join(self.shared_log_dir, run_id, 'run_trace.jsonl')
|
||||
os.makedirs(os.path.dirname(shared_trace), exist_ok=True)
|
||||
with open(shared_trace, 'a') as stf:
|
||||
stf.write(json.dumps(trace_event) + "\n")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def log_stage_complete(self, run_id: str, account_id: int, site_id: int, stage_number: int,
|
||||
processed_count: int, time_elapsed: str, credits_used: int):
|
||||
@@ -87,8 +198,9 @@ class AutomationLogger:
|
||||
self._append_to_main_log(account_id, site_id, run_id,
|
||||
f"{timestamp} - Stage {stage_number} complete: {processed_count} items processed")
|
||||
|
||||
# Stage-specific log
|
||||
# Stage-specific log (primary)
|
||||
stage_log = self._get_stage_log_path(account_id, site_id, run_id, stage_number)
|
||||
os.makedirs(os.path.dirname(stage_log), exist_ok=True)
|
||||
with open(stage_log, 'a') as f:
|
||||
f.write("\n" + "=" * 80 + "\n")
|
||||
f.write(f"STAGE {stage_number} COMPLETE\n")
|
||||
@@ -96,6 +208,41 @@ class AutomationLogger:
|
||||
f.write(f"Processed: {processed_count} items\n")
|
||||
f.write(f"Credits Used: {credits_used}\n")
|
||||
f.write("=" * 80 + "\n")
|
||||
|
||||
# Mirror completion into shared dir if configured
|
||||
if self.shared_log_dir:
|
||||
shared_stage_log = os.path.join(self.shared_log_dir, run_id, f'stage_{str(stage_number)}.log')
|
||||
os.makedirs(os.path.dirname(shared_stage_log), exist_ok=True)
|
||||
with open(shared_stage_log, 'a') as f:
|
||||
f.write("\n" + "=" * 80 + "\n")
|
||||
f.write(f"STAGE {stage_number} COMPLETE (SHARED)\n")
|
||||
f.write(f"Total Time: {time_elapsed}\n")
|
||||
f.write(f"Processed: {processed_count} items\n")
|
||||
f.write(f"Credits Used: {credits_used}\n")
|
||||
f.write("=" * 80 + "\n")
|
||||
# Structured completion trace
|
||||
try:
|
||||
trace_event = {
|
||||
'event': 'stage_complete',
|
||||
'run_id': run_id,
|
||||
'stage': stage_number,
|
||||
'processed_count': processed_count,
|
||||
'time_elapsed': time_elapsed,
|
||||
'credits_used': credits_used,
|
||||
'timestamp': datetime.now().isoformat(),
|
||||
}
|
||||
run_dir = self._get_run_dir(account_id, site_id, run_id)
|
||||
os.makedirs(run_dir, exist_ok=True)
|
||||
trace_file = os.path.join(run_dir, 'run_trace.jsonl')
|
||||
with open(trace_file, 'a') as tf:
|
||||
tf.write(json.dumps(trace_event) + "\n")
|
||||
if self.shared_log_dir:
|
||||
shared_trace = os.path.join(self.shared_log_dir, run_id, 'run_trace.jsonl')
|
||||
os.makedirs(os.path.dirname(shared_trace), exist_ok=True)
|
||||
with open(shared_trace, 'a') as stf:
|
||||
stf.write(json.dumps(trace_event) + "\n")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def log_stage_error(self, run_id: str, account_id: int, site_id: int, stage_number: int, error_message: str):
|
||||
"""Log stage error"""
|
||||
@@ -105,10 +252,39 @@ class AutomationLogger:
|
||||
# Main log
|
||||
self._append_to_main_log(account_id, site_id, run_id, log_message)
|
||||
|
||||
# Stage-specific log
|
||||
# Stage-specific log (primary)
|
||||
stage_log = self._get_stage_log_path(account_id, site_id, run_id, stage_number)
|
||||
os.makedirs(os.path.dirname(stage_log), exist_ok=True)
|
||||
with open(stage_log, 'a') as f:
|
||||
f.write(f"\n{log_message}\n")
|
||||
|
||||
# Mirror error into shared dir if configured
|
||||
if self.shared_log_dir:
|
||||
shared_stage_log = os.path.join(self.shared_log_dir, run_id, f'stage_{str(stage_number)}.log')
|
||||
os.makedirs(os.path.dirname(shared_stage_log), exist_ok=True)
|
||||
with open(shared_stage_log, 'a') as f:
|
||||
f.write(f"\n{log_message}\n")
|
||||
# Structured error trace
|
||||
try:
|
||||
trace_event = {
|
||||
'event': 'stage_error',
|
||||
'run_id': run_id,
|
||||
'stage': stage_number,
|
||||
'error': error_message,
|
||||
'timestamp': datetime.now().isoformat(),
|
||||
}
|
||||
run_dir = self._get_run_dir(account_id, site_id, run_id)
|
||||
os.makedirs(run_dir, exist_ok=True)
|
||||
trace_file = os.path.join(run_dir, 'run_trace.jsonl')
|
||||
with open(trace_file, 'a') as tf:
|
||||
tf.write(json.dumps(trace_event) + "\n")
|
||||
if self.shared_log_dir:
|
||||
shared_trace = os.path.join(self.shared_log_dir, run_id, 'run_trace.jsonl')
|
||||
os.makedirs(os.path.dirname(shared_trace), exist_ok=True)
|
||||
with open(shared_trace, 'a') as stf:
|
||||
stf.write(json.dumps(trace_event) + "\n")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def get_activity_log(self, account_id: int, site_id: int, run_id: str, last_n: int = 50) -> List[str]:
|
||||
"""
|
||||
@@ -144,9 +320,48 @@ class AutomationLogger:
|
||||
|
||||
def _append_to_main_log(self, account_id: int, site_id: int, run_id: str, message: str):
|
||||
"""Append message to main log file"""
|
||||
# Ensure base log dir exists
|
||||
try:
|
||||
os.makedirs(self.base_log_dir, exist_ok=True)
|
||||
except Exception:
|
||||
# Best-effort: if directory creation fails, still attempt to write to run dir
|
||||
pass
|
||||
|
||||
log_file = os.path.join(self._get_run_dir(account_id, site_id, run_id), 'automation_run.log')
|
||||
os.makedirs(os.path.dirname(log_file), exist_ok=True)
|
||||
with open(log_file, 'a') as f:
|
||||
f.write(f"{message}\n")
|
||||
|
||||
# Also append to a diagnostic file so we can trace logger calls across runs
|
||||
try:
|
||||
diag_file = os.path.join(self.base_log_dir, 'automation_diagnostic.log')
|
||||
with open(diag_file, 'a') as df:
|
||||
df.write(f"{self._timestamp()} - {account_id}/{site_id}/{run_id} - {message}\n")
|
||||
except Exception:
|
||||
# Never fail the main logging flow because of diagnostics
|
||||
pass
|
||||
|
||||
def append_trace(self, account_id: int, site_id: int, run_id: str, event: dict):
|
||||
"""Public helper to append a structured trace event (JSONL) for a run and mirror to shared dir."""
|
||||
try:
|
||||
run_dir = self._get_run_dir(account_id, site_id, run_id)
|
||||
os.makedirs(run_dir, exist_ok=True)
|
||||
trace_file = os.path.join(run_dir, 'run_trace.jsonl')
|
||||
with open(trace_file, 'a') as tf:
|
||||
tf.write(json.dumps(event) + "\n")
|
||||
except Exception:
|
||||
# Best-effort: ignore trace write failures
|
||||
pass
|
||||
|
||||
if self.shared_log_dir:
|
||||
try:
|
||||
shared_run_dir = os.path.join(self.shared_log_dir, run_id)
|
||||
os.makedirs(shared_run_dir, exist_ok=True)
|
||||
shared_trace = os.path.join(shared_run_dir, 'run_trace.jsonl')
|
||||
with open(shared_trace, 'a') as stf:
|
||||
stf.write(json.dumps(event) + "\n")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _timestamp(self) -> str:
|
||||
"""Get formatted timestamp"""
|
||||
|
||||
@@ -13,6 +13,7 @@ from celery.result import AsyncResult
|
||||
|
||||
from igny8_core.business.automation.models import AutomationRun, AutomationConfig
|
||||
from igny8_core.business.automation.services.automation_logger import AutomationLogger
|
||||
from django.conf import settings
|
||||
from igny8_core.auth.models import Account, Site
|
||||
from igny8_core.modules.planner.models import Keywords, Clusters, ContentIdeas
|
||||
from igny8_core.modules.writer.models import Tasks, Content, Images
|
||||
@@ -35,7 +36,9 @@ class AutomationService:
|
||||
def __init__(self, account: Account, site: Site):
|
||||
self.account = account
|
||||
self.site = site
|
||||
self.logger = AutomationLogger()
|
||||
# Initialize AutomationLogger; allow optional shared log dir from settings
|
||||
shared_dir = getattr(settings, 'AUTOMATION_SHARED_LOG_DIR', None)
|
||||
self.logger = AutomationLogger(shared_log_dir=shared_dir)
|
||||
self.run = None
|
||||
self.config = None
|
||||
|
||||
@@ -272,6 +275,20 @@ class AutomationService:
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Batch {batch_num} complete"
|
||||
)
|
||||
|
||||
# Emit per-item trace event for UI progress tracking
|
||||
try:
|
||||
self.logger.append_trace(self.account.id, self.site.id, self.run.run_id, {
|
||||
'event': 'stage_item_processed',
|
||||
'run_id': self.run.run_id,
|
||||
'stage': stage_number,
|
||||
'processed': keywords_processed,
|
||||
'total': len(keyword_ids),
|
||||
'batch_num': batch_num,
|
||||
'timestamp': datetime.now().isoformat()
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
# FIXED: Log error but continue processing remaining batches
|
||||
error_msg = f"Failed to process batch {batch_num}: {str(e)}"
|
||||
@@ -782,6 +799,20 @@ class AutomationService:
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Task '{task.title}' complete ({tasks_processed}/{total_tasks})"
|
||||
)
|
||||
|
||||
# Emit per-item trace event for UI progress tracking
|
||||
try:
|
||||
self.logger.append_trace(self.account.id, self.site.id, self.run.run_id, {
|
||||
'event': 'stage_item_processed',
|
||||
'run_id': self.run.run_id,
|
||||
'stage': stage_number,
|
||||
'processed': tasks_processed,
|
||||
'total': total_tasks,
|
||||
'item': {'id': task.id, 'title': task.title},
|
||||
'timestamp': datetime.now().isoformat()
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
# FIXED: Log error but continue processing remaining tasks
|
||||
error_msg = f"Failed to process task '{task.title}': {str(e)}"
|
||||
@@ -1419,28 +1450,68 @@ class AutomationService:
|
||||
Get real-time processing state for current automation run
|
||||
Returns detailed info about what's currently being processed
|
||||
"""
|
||||
if not self.run or self.run.status != 'running':
|
||||
if not self.run:
|
||||
return None
|
||||
|
||||
# Allow paused runs to show state (UI needs this to avoid blanking)
|
||||
if self.run.status not in ('running', 'paused'):
|
||||
return None
|
||||
|
||||
stage = self.run.current_stage
|
||||
|
||||
|
||||
# Get stage-specific data based on current stage
|
||||
state = None
|
||||
if stage == 1: # Keywords → Clusters
|
||||
return self._get_stage_1_state()
|
||||
state = self._get_stage_1_state()
|
||||
elif stage == 2: # Clusters → Ideas
|
||||
return self._get_stage_2_state()
|
||||
state = self._get_stage_2_state()
|
||||
elif stage == 3: # Ideas → Tasks
|
||||
return self._get_stage_3_state()
|
||||
state = self._get_stage_3_state()
|
||||
elif stage == 4: # Tasks → Content
|
||||
return self._get_stage_4_state()
|
||||
state = self._get_stage_4_state()
|
||||
elif stage == 5: # Content → Image Prompts
|
||||
return self._get_stage_5_state()
|
||||
state = self._get_stage_5_state()
|
||||
elif stage == 6: # Image Prompts → Images
|
||||
return self._get_stage_6_state()
|
||||
state = self._get_stage_6_state()
|
||||
elif stage == 7: # Manual Review Gate
|
||||
return self._get_stage_7_state()
|
||||
|
||||
return None
|
||||
state = self._get_stage_7_state()
|
||||
|
||||
# Trace what we return so UI mapping can be diagnosed
|
||||
try:
|
||||
if state is not None and self.logger:
|
||||
# Avoid heavy payloads for very large queues — only include keys we care about
|
||||
lightweight = {
|
||||
'stage_number': state.get('stage_number'),
|
||||
'stage_name': state.get('stage_name'),
|
||||
'stage_type': state.get('stage_type'),
|
||||
'total_items': state.get('total_items'),
|
||||
'processed_items': state.get('processed_items'),
|
||||
'percentage': state.get('percentage'),
|
||||
'currently_processing_count': len(state.get('currently_processing') or []),
|
||||
'up_next_count': len(state.get('up_next') or []),
|
||||
'remaining_count': state.get('remaining_count'),
|
||||
}
|
||||
trace = {
|
||||
'event': 'get_current_processing',
|
||||
'run_id': getattr(self.run, 'run_id', None),
|
||||
'site_id': getattr(self.site, 'id', None),
|
||||
'account_id': getattr(self.account, 'id', None),
|
||||
'payload': lightweight,
|
||||
'timestamp': datetime.now().isoformat(),
|
||||
}
|
||||
# Best-effort append
|
||||
try:
|
||||
self.logger.append_trace(self.account.id, self.site.id, self.run.run_id, trace)
|
||||
except Exception:
|
||||
# fallback: try writing to diagnostic
|
||||
try:
|
||||
self.logger._append_to_main_log(self.account.id, self.site.id, self.run.run_id, f"TRACE: {trace}")
|
||||
except Exception:
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return state
|
||||
|
||||
def _get_stage_1_state(self) -> dict:
|
||||
"""Get processing state for Stage 1: Keywords → Clusters"""
|
||||
@@ -1594,29 +1665,49 @@ class AutomationService:
|
||||
}
|
||||
|
||||
def _get_processed_count(self, stage: int) -> int:
|
||||
"""Get count of items processed in current stage"""
|
||||
"""Get count of items processed in current stage during this run"""
|
||||
if not self.run:
|
||||
return 0
|
||||
|
||||
result_key = f'stage_{stage}_result'
|
||||
result = getattr(self.run, result_key, {})
|
||||
|
||||
if not result:
|
||||
return 0
|
||||
|
||||
# Extract appropriate count from result
|
||||
# Count items that were updated during this run and changed status from pending
|
||||
if stage == 1:
|
||||
return result.get('keywords_processed', 0)
|
||||
# Keywords that changed status from 'new' during this run
|
||||
return Keywords.objects.filter(
|
||||
site=self.site,
|
||||
updated_at__gte=self.run.started_at
|
||||
).exclude(status='new').count()
|
||||
elif stage == 2:
|
||||
return result.get('clusters_processed', 0)
|
||||
# Clusters that changed status from 'new' during this run
|
||||
return Clusters.objects.filter(
|
||||
site=self.site,
|
||||
updated_at__gte=self.run.started_at
|
||||
).exclude(status='new').count()
|
||||
elif stage == 3:
|
||||
return result.get('ideas_processed', 0)
|
||||
# Ideas that changed status from 'approved' during this run
|
||||
return ContentIdeas.objects.filter(
|
||||
site=self.site,
|
||||
updated_at__gte=self.run.started_at
|
||||
).exclude(status='approved').count()
|
||||
elif stage == 4:
|
||||
return result.get('tasks_processed', 0)
|
||||
# Tasks that changed status from 'ready'/'queued' during this run
|
||||
return Tasks.objects.filter(
|
||||
site=self.site,
|
||||
updated_at__gte=self.run.started_at
|
||||
).exclude(status__in=['ready', 'queued']).count()
|
||||
elif stage == 5:
|
||||
return result.get('content_processed', 0)
|
||||
# Content processed for image prompts during this run
|
||||
return Content.objects.filter(
|
||||
site=self.site,
|
||||
updated_at__gte=self.run.started_at,
|
||||
images__isnull=False
|
||||
).distinct().count()
|
||||
elif stage == 6:
|
||||
return result.get('images_processed', 0)
|
||||
# Images completed during this run
|
||||
return Images.objects.filter(
|
||||
site=self.site,
|
||||
updated_at__gte=self.run.started_at,
|
||||
status='completed'
|
||||
).count()
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
@@ -47,6 +47,8 @@ class AutomationViewSet(viewsets.ViewSet):
|
||||
'is_enabled': False,
|
||||
'frequency': 'daily',
|
||||
'scheduled_time': '02:00',
|
||||
'within_stage_delay': 3,
|
||||
'between_stage_delay': 5,
|
||||
}
|
||||
)
|
||||
|
||||
@@ -60,6 +62,8 @@ class AutomationViewSet(viewsets.ViewSet):
|
||||
'stage_4_batch_size': config.stage_4_batch_size,
|
||||
'stage_5_batch_size': config.stage_5_batch_size,
|
||||
'stage_6_batch_size': config.stage_6_batch_size,
|
||||
'within_stage_delay': config.within_stage_delay,
|
||||
'between_stage_delay': config.between_stage_delay,
|
||||
'last_run_at': config.last_run_at,
|
||||
'next_run_at': config.next_run_at,
|
||||
})
|
||||
@@ -107,10 +111,36 @@ class AutomationViewSet(viewsets.ViewSet):
|
||||
config.stage_5_batch_size = request.data['stage_5_batch_size']
|
||||
if 'stage_6_batch_size' in request.data:
|
||||
config.stage_6_batch_size = request.data['stage_6_batch_size']
|
||||
# Delay settings
|
||||
if 'within_stage_delay' in request.data:
|
||||
try:
|
||||
config.within_stage_delay = int(request.data['within_stage_delay'])
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
if 'between_stage_delay' in request.data:
|
||||
try:
|
||||
config.between_stage_delay = int(request.data['between_stage_delay'])
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
|
||||
config.save()
|
||||
|
||||
return Response({'message': 'Config updated'})
|
||||
return Response({
|
||||
'message': 'Config updated',
|
||||
'is_enabled': config.is_enabled,
|
||||
'frequency': config.frequency,
|
||||
'scheduled_time': str(config.scheduled_time),
|
||||
'stage_1_batch_size': config.stage_1_batch_size,
|
||||
'stage_2_batch_size': config.stage_2_batch_size,
|
||||
'stage_3_batch_size': config.stage_3_batch_size,
|
||||
'stage_4_batch_size': config.stage_4_batch_size,
|
||||
'stage_5_batch_size': config.stage_5_batch_size,
|
||||
'stage_6_batch_size': config.stage_6_batch_size,
|
||||
'within_stage_delay': config.within_stage_delay,
|
||||
'between_stage_delay': config.between_stage_delay,
|
||||
'last_run_at': config.last_run_at,
|
||||
'next_run_at': config.next_run_at,
|
||||
})
|
||||
|
||||
@action(detail=False, methods=['post'])
|
||||
def run_now(self, request):
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
from django.core.management.base import BaseCommand
|
||||
import json
|
||||
from django.utils import timezone
|
||||
|
||||
from igny8_core.business.automation.models import AutomationRun
|
||||
from igny8_core.business.automation.services import AutomationService
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = 'Dump current processing state for all running automation runs to the logs (and stdout)'
|
||||
|
||||
def handle(self, *args, **options):
|
||||
runs = AutomationRun.objects.filter(status='running')
|
||||
if not runs.exists():
|
||||
self.stdout.write('No running automation runs found')
|
||||
return
|
||||
|
||||
for run in runs:
|
||||
try:
|
||||
svc = AutomationService.from_run_id(run.run_id)
|
||||
state = svc.get_current_processing_state()
|
||||
snapshot = {
|
||||
'timestamp': timezone.now().isoformat(),
|
||||
'run_id': run.run_id,
|
||||
'site_id': run.site.id,
|
||||
'account_id': run.account.id,
|
||||
'state': state,
|
||||
}
|
||||
# Append to a global processing snapshot file
|
||||
out_path = '/data/app/logs/automation/processing_snapshots.jsonl'
|
||||
try:
|
||||
with open(out_path, 'a') as f:
|
||||
f.write(json.dumps(snapshot) + "\n")
|
||||
except Exception as e:
|
||||
self.stderr.write(f'Failed to write snapshot to {out_path}: {e}')
|
||||
|
||||
# Also use the run-specific trace via logger
|
||||
try:
|
||||
svc.logger.append_trace(run.account.id, run.site.id, run.run_id, {
|
||||
'event': 'processing_snapshot',
|
||||
'snapshot': snapshot,
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
self.stdout.write(f'Wrote snapshot for run {run.run_id}')
|
||||
except Exception as e:
|
||||
self.stderr.write(f'Error processing run {run.run_id}: {e}')
|
||||
@@ -222,6 +222,30 @@ class KeywordViewSet(SiteSectorModelViewSet):
|
||||
deleted_count, _ = queryset.filter(id__in=ids).delete()
|
||||
|
||||
return success_response(data={'deleted_count': deleted_count}, request=request)
|
||||
|
||||
@action(detail=False, methods=['post'], url_path='bulk_update', url_name='bulk_update')
|
||||
def bulk_update(self, request):
|
||||
"""Bulk update cluster status"""
|
||||
ids = request.data.get('ids', [])
|
||||
status_value = request.data.get('status')
|
||||
|
||||
if not ids:
|
||||
return error_response(
|
||||
error='No IDs provided',
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
request=request
|
||||
)
|
||||
if not status_value:
|
||||
return error_response(
|
||||
error='No status provided',
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
request=request
|
||||
)
|
||||
|
||||
queryset = self.get_queryset()
|
||||
updated_count = queryset.filter(id__in=ids).update(status=status_value)
|
||||
|
||||
return success_response(data={'updated_count': updated_count}, request=request)
|
||||
|
||||
@action(detail=False, methods=['post'], url_path='bulk_update', url_name='bulk_update')
|
||||
def bulk_update(self, request):
|
||||
|
||||
Reference in New Issue
Block a user