automation fixes (part2)

This commit is contained in:
IGNY8 VPS (Salman)
2025-12-28 03:15:39 +00:00
parent d4b9c8693a
commit f92b3fba6e
4 changed files with 543 additions and 37 deletions

View File

@@ -284,6 +284,22 @@ class AutomationService:
stage_number, f"Batch {batch_num} complete"
)
# INCREMENTAL SAVE: Update stage result after each batch for real-time UI progress
clusters_so_far = Clusters.objects.filter(
site=self.site,
created_at__gte=self.run.started_at
).count()
self.run.stage_1_result = {
'keywords_processed': keywords_processed,
'keywords_total': len(keyword_ids),
'clusters_created': clusters_so_far,
'batches_run': batches_run,
'credits_used': self._get_credits_used() - credits_before,
'time_elapsed': self._format_time_elapsed(start_time),
'in_progress': True
}
self.run.save(update_fields=['stage_1_result'])
# Emit per-item trace event for UI progress tracking
try:
self.logger.append_trace(self.account.id, self.site.id, self.run.run_id, {
@@ -358,6 +374,10 @@ class AutomationService:
}
self.run.current_stage = 2
self.run.total_credits_used += credits_used
# UPDATE SNAPSHOT: Record new items created for Stage 2
self._update_snapshot_after_stage(1, {'stage_2_initial': clusters_created})
self.run.save()
logger.info(f"[AutomationService] Stage 1 complete: {keywords_processed} keywords → {clusters_created} clusters")
@@ -484,6 +504,21 @@ class AutomationService:
self.run.run_id, self.account.id, self.site.id,
stage_number, f"Cluster '{cluster.name}' complete"
)
# INCREMENTAL SAVE: Update stage result after each cluster for real-time UI progress
ideas_so_far = ContentIdeas.objects.filter(
site=self.site,
created_at__gte=self.run.started_at
).count()
self.run.stage_2_result = {
'clusters_processed': clusters_processed,
'clusters_total': total_count,
'ideas_created': ideas_so_far,
'credits_used': self._get_credits_used() - credits_before,
'time_elapsed': self._format_time_elapsed(start_time),
'in_progress': True
}
self.run.save(update_fields=['stage_2_result'])
except Exception as e:
# FIXED: Log error but continue processing remaining clusters
error_msg = f"Failed to generate ideas for cluster '{cluster.name}': {str(e)}"
@@ -525,6 +560,10 @@ class AutomationService:
}
self.run.current_stage = 3
self.run.total_credits_used += credits_used
# UPDATE SNAPSHOT: Record new items created for Stage 3
self._update_snapshot_after_stage(2, {'stage_3_initial': ideas_created})
self.run.save()
logger.info(f"[AutomationService] Stage 2 complete: {clusters_processed} clusters → {ideas_created} ideas")
@@ -687,6 +726,10 @@ class AutomationService:
'time_elapsed': time_elapsed
}
self.run.current_stage = 4
# UPDATE SNAPSHOT: Record new items created for Stage 4
self._update_snapshot_after_stage(3, {'stage_4_initial': tasks_created})
self.run.save()
logger.info(f"[AutomationService] Stage 3 complete: {ideas_processed} ideas → {tasks_created} tasks")
@@ -809,6 +852,21 @@ class AutomationService:
stage_number, f"Task '{task.title}' complete ({tasks_processed}/{total_tasks})"
)
# INCREMENTAL SAVE: Update stage result after each item for real-time UI progress
content_created_so_far = Content.objects.filter(
site=self.site,
created_at__gte=self.run.started_at
).count()
self.run.stage_4_result = {
'tasks_processed': tasks_processed,
'tasks_total': total_tasks,
'content_created': content_created_so_far,
'credits_used': self._get_credits_used() - credits_before,
'time_elapsed': self._format_time_elapsed(start_time),
'in_progress': True
}
self.run.save(update_fields=['stage_4_result'])
# Emit per-item trace event for UI progress tracking
try:
self.logger.append_trace(self.account.id, self.site.id, self.run.run_id, {
@@ -1424,6 +1482,32 @@ class AutomationService:
logger.info(f"[AutomationService] Initial snapshot captured: {snapshot}")
return snapshot
def _update_snapshot_after_stage(self, completed_stage: int, updates: dict):
"""
Update snapshot after a stage completes with new items created.
This ensures accurate counts for cascading stages.
Args:
completed_stage: The stage number that just completed
updates: Dict of snapshot keys to update, e.g., {'stage_4_initial': 12}
"""
if not self.run or not self.run.initial_snapshot:
return
snapshot = self.run.initial_snapshot.copy()
old_total = snapshot.get('total_initial_items', 0)
for key, value in updates.items():
old_value = snapshot.get(key, 0)
snapshot[key] = value
# Adjust total
old_total = old_total - old_value + value
snapshot['total_initial_items'] = old_total
self.run.initial_snapshot = snapshot
logger.info(f"[AutomationService] Snapshot updated after Stage {completed_stage}: {updates}, new total: {old_total}")
# Helper methods
def _wait_for_task(self, task_id: str, stage_number: int, item_name: str, continue_on_error: bool = True):
@@ -1584,7 +1668,16 @@ class AutomationService:
).order_by('id')
processed = self._get_processed_count(1)
total = queue.count() + processed
remaining = queue.count()
# Use keywords_total from incremental result if available
result = getattr(self.run, 'stage_1_result', None)
if result and result.get('keywords_total'):
total = result.get('keywords_total')
elif self.run and self.run.initial_snapshot:
total = self.run.initial_snapshot.get('stage_1_initial', remaining + processed)
else:
total = remaining + processed
return {
'stage_number': 1,
@@ -1595,7 +1688,7 @@ class AutomationService:
'percentage': round((processed / total * 100) if total > 0 else 0),
'currently_processing': self._get_current_items(queue, 3),
'up_next': self._get_next_items(queue, 2, skip=3),
'remaining_count': queue.count()
'remaining_count': remaining
}
def _get_stage_2_state(self) -> dict:
@@ -1605,7 +1698,16 @@ class AutomationService:
).order_by('id')
processed = self._get_processed_count(2)
total = queue.count() + processed
remaining = queue.count()
# Use clusters_total from incremental result if available
result = getattr(self.run, 'stage_2_result', None)
if result and result.get('clusters_total'):
total = result.get('clusters_total')
elif self.run and self.run.initial_snapshot:
total = self.run.initial_snapshot.get('stage_2_initial', remaining + processed)
else:
total = remaining + processed
return {
'stage_number': 2,
@@ -1616,7 +1718,7 @@ class AutomationService:
'percentage': round((processed / total * 100) if total > 0 else 0),
'currently_processing': self._get_current_items(queue, 1),
'up_next': self._get_next_items(queue, 2, skip=1),
'remaining_count': queue.count()
'remaining_count': remaining
}
def _get_stage_3_state(self) -> dict:
@@ -1647,7 +1749,17 @@ class AutomationService:
).order_by('id')
processed = self._get_processed_count(4)
total = queue.count() + processed
remaining = queue.count()
# Use tasks_total from incremental result if available (during active processing)
result = getattr(self.run, 'stage_4_result', None)
if result and result.get('tasks_total'):
total = result.get('tasks_total')
elif self.run and self.run.initial_snapshot:
# Fall back to snapshot (may be updated after Stage 3)
total = self.run.initial_snapshot.get('stage_4_initial', remaining + processed)
else:
total = remaining + processed
return {
'stage_number': 4,
@@ -1658,7 +1770,7 @@ class AutomationService:
'percentage': round((processed / total * 100) if total > 0 else 0),
'currently_processing': self._get_current_items(queue, 1),
'up_next': self._get_next_items(queue, 2, skip=1),
'remaining_count': queue.count()
'remaining_count': remaining
}
def _get_stage_5_state(self) -> dict: