Automation revamp part 1

This commit is contained in:
IGNY8 VPS (Salman)
2025-12-28 01:46:27 +00:00
parent 0605f650b1
commit ea9125b805
9 changed files with 1237 additions and 58 deletions

View File

@@ -0,0 +1,22 @@
# Generated migration for adding initial_snapshot field to AutomationRun
from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the JSON ``initial_snapshot`` column to ``AutomationRun``.

    The snapshot records per-stage queue sizes captured when a run starts,
    so global progress can be computed against a fixed baseline.
    """

    dependencies = [('automation', '0005_add_default_image_service')]

    # Single additive, backward-compatible operation: a JSONField that
    # defaults to an empty dict for existing rows.
    operations = [
        migrations.AddField(
            model_name='automationrun',
            name='initial_snapshot',
            field=models.JSONField(
                blank=True,
                default=dict,
                help_text='Snapshot of initial queue sizes: {stage_1_initial, stage_2_initial, ..., total_initial_items}',
            ),
        ),
    ]

View File

@@ -88,6 +88,13 @@ class AutomationRun(models.Model):
# Total credits consumed over the whole run — presumably accumulated as
# stages complete; TODO confirm against the service code that updates it.
total_credits_used = models.IntegerField(default=0)
# Initial queue snapshot - captured at run start for accurate progress tracking
# Keys follow the pattern stage_N_initial plus total_initial_items (per the
# help_text below); defaults to an empty dict for runs created before the
# snapshot feature existed.
initial_snapshot = models.JSONField(
default=dict,
blank=True,
help_text="Snapshot of initial queue sizes: {stage_1_initial, stage_2_initial, ..., total_initial_items}"
)
# JSON results per stage
# Each stage_N_result holds that stage's summary payload (key names listed in
# its help_text); null until the stage has produced a result.
stage_1_result = models.JSONField(null=True, blank=True, help_text="{keywords_processed, clusters_created, batches}")
stage_2_result = models.JSONField(null=True, blank=True, help_text="{clusters_processed, ideas_created}")

View File

@@ -109,6 +109,9 @@ class AutomationService:
# Create run_id and log files
run_id = self.logger.start_run(self.account.id, self.site.id, trigger_type)
# Capture initial queue snapshot for accurate progress tracking
initial_snapshot = self._capture_initial_snapshot()
# Create AutomationRun record
self.run = AutomationRun.objects.create(
run_id=run_id,
@@ -117,6 +120,7 @@ class AutomationService:
trigger_type=trigger_type,
status='running',
current_stage=1,
initial_snapshot=initial_snapshot,
)
# Log start
@@ -124,6 +128,10 @@ class AutomationService:
run_id, self.account.id, self.site.id, 0,
f"Automation started (trigger: {trigger_type})"
)
self.logger.log_stage_progress(
run_id, self.account.id, self.site.id, 0,
f"Initial snapshot captured: {initial_snapshot['total_initial_items']} total items across all stages"
)
self.logger.log_stage_progress(
run_id, self.account.id, self.site.id, 0,
f"Credit check: Account has {self.account.credits} credits, estimated need: {estimated_credits} credits"
@@ -1361,6 +1369,61 @@ class AutomationService:
logger.info(f"[AutomationService] Estimated credits: {total}")
return total
def _capture_initial_snapshot(self) -> dict:
    """
    Capture initial queue sizes at run start for accurate progress tracking.

    Returns a dict with one ``stage_N_initial`` entry per pipeline stage
    (1-7) plus ``total_initial_items``, the sum of all stage counts. The
    snapshot is used to calculate the global progress percentage correctly.
    """
    site = self.site

    # One pending-work count per stage, evaluated in stage order. Each
    # queryset mirrors that stage's input queue definition.
    pending_counts = (
        # Stage 1: keywords pending clustering
        Keywords.objects.filter(
            site=site, status='new', cluster__isnull=True, disabled=False
        ).count(),
        # Stage 2: clusters that do not yet have any ideas
        Clusters.objects.filter(
            site=site, status='new', disabled=False
        ).exclude(ideas__isnull=False).count(),
        # Stage 3: ideas ready to be converted to tasks
        ContentIdeas.objects.filter(site=site, status='new').count(),
        # Stage 4: tasks ready for content generation
        Tasks.objects.filter(site=site, status='queued').count(),
        # Stage 5: draft content with zero attached images (needs prompts)
        Content.objects.filter(
            site=site, status='draft'
        ).annotate(images_count=Count('images')).filter(images_count=0).count(),
        # Stage 6: image prompts pending generation
        Images.objects.filter(site=site, status='pending').count(),
        # Stage 7: content ready for review
        Content.objects.filter(site=site, status='review').count(),
    )

    # Build stage_1_initial .. stage_7_initial in order, then the total.
    snapshot = {
        f'stage_{num}_initial': count
        for num, count in enumerate(pending_counts, start=1)
    }
    snapshot['total_initial_items'] = sum(pending_counts)

    logger.info(f"[AutomationService] Initial snapshot captured: {snapshot}")
    return snapshot
# Helper methods
def _wait_for_task(self, task_id: str, stage_number: int, item_name: str, continue_on_error: bool = True):
@@ -1559,7 +1622,7 @@ class AutomationService:
def _get_stage_3_state(self) -> dict:
"""Get processing state for Stage 3: Ideas → Tasks"""
queue = ContentIdeas.objects.filter(
site=self.site, status='approved'
site=self.site, status='new' # Fixed: Match pipeline_overview status
).order_by('id')
processed = self._get_processed_count(3)
@@ -1580,7 +1643,7 @@ class AutomationService:
def _get_stage_4_state(self) -> dict:
"""Get processing state for Stage 4: Tasks → Content"""
queue = Tasks.objects.filter(
site=self.site, status='ready'
site=self.site, status='queued' # Fixed: Match pipeline_overview status
).order_by('id')
processed = self._get_processed_count(4)
@@ -1666,51 +1729,30 @@ class AutomationService:
}
def _get_processed_count(self, stage: int) -> int:
"""Get count of items processed in current stage during this run"""
"""
Get accurate processed count from stage result.
Uses stage-specific keys for correct counting instead of DB queries.
"""
if not self.run:
return 0
# Count items that were updated during this run and changed status from pending
if stage == 1:
# Keywords that changed status from 'new' during this run
return Keywords.objects.filter(
site=self.site,
updated_at__gte=self.run.started_at
).exclude(status='new').count()
elif stage == 2:
# Clusters that changed status from 'new' during this run
return Clusters.objects.filter(
site=self.site,
updated_at__gte=self.run.started_at
).exclude(status='new').count()
elif stage == 3:
# Ideas that changed status from 'approved' during this run
return ContentIdeas.objects.filter(
site=self.site,
updated_at__gte=self.run.started_at
).exclude(status='approved').count()
elif stage == 4:
# Tasks that changed status from 'ready'/'queued' during this run
return Tasks.objects.filter(
site=self.site,
updated_at__gte=self.run.started_at
).exclude(status__in=['ready', 'queued']).count()
elif stage == 5:
# Content processed for image prompts during this run
return Content.objects.filter(
site=self.site,
updated_at__gte=self.run.started_at,
images__isnull=False
).distinct().count()
elif stage == 6:
# Images completed during this run
return Images.objects.filter(
site=self.site,
updated_at__gte=self.run.started_at,
status='completed'
).count()
# Get the stage result from the run
result = getattr(self.run, f'stage_{stage}_result', None)
if not result:
return 0
return 0
# Map stage to correct result key for processed count
key_map = {
1: 'keywords_processed',
2: 'clusters_processed',
3: 'ideas_processed',
4: 'tasks_processed',
5: 'content_processed',
6: 'images_processed',
7: 'ready_for_review'
}
return result.get(key_map.get(stage, ''), 0)
def _get_current_items(self, queryset, count: int) -> list:
"""Get currently processing items"""

View File

@@ -714,3 +714,210 @@ class AutomationViewSet(viewsets.ViewSet):
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
@extend_schema(tags=['Automation'])
@action(detail=False, methods=['get'], url_path='run_progress')
def run_progress(self, request):
    """
    GET /api/v1/automation/run_progress/?site_id=123&run_id=abc

    Unified endpoint for ALL run progress data — global and per-stage —
    replacing multiple separate API calls with one comprehensive response.

    The payload contains:
    - run: current run status and metadata
    - global_progress: overall pipeline progress percentage
    - stages: per-stage progress with input/output/processed counts
    - metrics: credits used, duration, errors
    """
    params = request.query_params
    site_id = params.get('site_id')
    run_id = params.get('run_id')

    # site_id is mandatory; run_id is optional and falls back to the
    # most recent in-flight run for the site.
    if not site_id:
        return Response(
            {'error': 'site_id required'},
            status=status.HTTP_400_BAD_REQUEST
        )

    try:
        site = get_object_or_404(Site, id=site_id, account=request.user.account)

        if run_id:
            # Explicit run requested — must exist and belong to this site.
            run = AutomationRun.objects.get(run_id=run_id, site=site)
        else:
            run = (
                AutomationRun.objects
                .filter(site=site, status__in=['running', 'paused'])
                .order_by('-started_at')
                .first()
            )

        if not run:
            # No active run: return an empty-but-well-formed payload so
            # clients don't need a separate 404 branch.
            return Response({
                'run': None,
                'global_progress': None,
                'stages': [],
                'metrics': None
            })

        return Response(self._build_run_progress_response(site, run))
    except AutomationRun.DoesNotExist:
        return Response(
            {'error': 'Run not found'},
            status=status.HTTP_404_NOT_FOUND
        )
    except Exception as e:
        return Response(
            {'error': str(e)},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )
def _build_run_progress_response(self, site, run):
"""Build comprehensive progress response for a run"""
from igny8_core.business.planning.models import Keywords, Clusters, ContentIdeas
from igny8_core.business.content.models import Tasks, Content, Images
from django.db.models import Count
from django.utils import timezone
initial_snapshot = run.initial_snapshot or {}
# Helper to get processed count from result
def get_processed(result, key):
if not result:
return 0
return result.get(key, 0)
# Helper to get output count from result
def get_output(result, key):
if not result:
return 0
return result.get(key, 0)
# Stage-specific key mapping for processed counts
processed_keys = {
1: 'keywords_processed',
2: 'clusters_processed',
3: 'ideas_processed',
4: 'tasks_processed',
5: 'content_processed',
6: 'images_processed',
7: 'ready_for_review'
}
# Stage-specific key mapping for output counts
output_keys = {
1: 'clusters_created',
2: 'ideas_created',
3: 'tasks_created',
4: 'content_created',
5: 'prompts_created',
6: 'images_generated',
7: 'ready_for_review'
}
# Build stages array
stages = []
total_processed = 0
total_initial = initial_snapshot.get('total_initial_items', 0)
stage_names = {
1: 'Keywords → Clusters',
2: 'Clusters → Ideas',
3: 'Ideas → Tasks',
4: 'Tasks → Content',
5: 'Content → Image Prompts',
6: 'Image Prompts → Images',
7: 'Manual Review Gate'
}
stage_types = {
1: 'AI', 2: 'AI', 3: 'Local', 4: 'AI', 5: 'AI', 6: 'AI', 7: 'Manual'
}
for stage_num in range(1, 8):
result = getattr(run, f'stage_{stage_num}_result', None)
initial_count = initial_snapshot.get(f'stage_{stage_num}_initial', 0)
processed = get_processed(result, processed_keys[stage_num])
output = get_output(result, output_keys[stage_num])
total_processed += processed
# Determine stage status
if run.current_stage > stage_num:
stage_status = 'completed'
elif run.current_stage == stage_num:
stage_status = 'active'
else:
stage_status = 'pending'
# Calculate progress percentage for this stage
progress = 0
if initial_count > 0:
progress = round((processed / initial_count) * 100)
elif run.current_stage > stage_num:
progress = 100
stage_data = {
'number': stage_num,
'name': stage_names[stage_num],
'type': stage_types[stage_num],
'status': stage_status,
'input_count': initial_count,
'output_count': output,
'processed_count': processed,
'progress_percentage': min(progress, 100),
'credits_used': result.get('credits_used', 0) if result else 0,
'time_elapsed': result.get('time_elapsed', '') if result else '',
}
# Add currently_processing for active stage
if stage_status == 'active':
try:
service = AutomationService.from_run_id(run.run_id)
processing_state = service.get_current_processing_state()
if processing_state:
stage_data['currently_processing'] = processing_state.get('currently_processing', [])
stage_data['up_next'] = processing_state.get('up_next', [])
stage_data['remaining_count'] = processing_state.get('remaining_count', 0)
except Exception:
pass
stages.append(stage_data)
# Calculate global progress
global_percentage = 0
if total_initial > 0:
global_percentage = round((total_processed / total_initial) * 100)
# Calculate duration
duration_seconds = 0
if run.started_at:
end_time = run.completed_at or timezone.now()
duration_seconds = int((end_time - run.started_at).total_seconds())
return {
'run': {
'run_id': run.run_id,
'status': run.status,
'current_stage': run.current_stage,
'trigger_type': run.trigger_type,
'started_at': run.started_at,
'completed_at': run.completed_at,
'paused_at': run.paused_at,
},
'global_progress': {
'total_items': total_initial,
'completed_items': total_processed,
'percentage': min(global_percentage, 100),
'current_stage': run.current_stage,
'total_stages': 7
},
'stages': stages,
'metrics': {
'credits_used': run.total_credits_used,
'duration_seconds': duration_seconds,
'errors': []
},
'initial_snapshot': initial_snapshot
}