""" Automation API Views REST API endpoints for automation management """ from rest_framework import viewsets, status from rest_framework.decorators import action from rest_framework.response import Response from rest_framework.permissions import IsAuthenticated from django.shortcuts import get_object_or_404 from django.utils import timezone from django.db.models import Count, Sum, Avg, F from datetime import timedelta from drf_spectacular.utils import extend_schema from igny8_core.business.automation.models import AutomationConfig, AutomationRun from igny8_core.business.automation.services import AutomationService from igny8_core.auth.models import Account, Site from igny8_core.business.planning.models import Keywords, Clusters, ContentIdeas from igny8_core.business.content.models import Tasks, Content, Images class AutomationViewSet(viewsets.ViewSet): """API endpoints for automation""" permission_classes = [IsAuthenticated] def _get_site(self, request): """Get site from request""" site_id = request.query_params.get('site_id') if not site_id: return None, Response( {'error': 'site_id required'}, status=status.HTTP_400_BAD_REQUEST ) site = get_object_or_404(Site, id=site_id, account=request.user.account) return site, None @extend_schema(tags=['Automation']) @action(detail=False, methods=['get']) def config(self, request): """ GET /api/v1/automation/config/?site_id=123 Get automation configuration for site """ site, error_response = self._get_site(request) if error_response: return error_response config, _ = AutomationConfig.objects.get_or_create( account=site.account, site=site, defaults={ 'is_enabled': False, 'frequency': 'daily', 'scheduled_time': '02:00', 'within_stage_delay': 3, 'between_stage_delay': 5, } ) return Response({ 'is_enabled': config.is_enabled, 'frequency': config.frequency, 'scheduled_time': str(config.scheduled_time), 'stage_1_enabled': config.stage_1_enabled, 'stage_2_enabled': config.stage_2_enabled, 'stage_3_enabled': config.stage_3_enabled, 'stage_4_enabled': config.stage_4_enabled, 'stage_5_enabled': config.stage_5_enabled, 'stage_6_enabled': config.stage_6_enabled, 'stage_7_enabled': config.stage_7_enabled, 'stage_1_batch_size': config.stage_1_batch_size, 'stage_2_batch_size': config.stage_2_batch_size, 'stage_3_batch_size': config.stage_3_batch_size, 'stage_4_batch_size': config.stage_4_batch_size, 'stage_5_batch_size': config.stage_5_batch_size, 'stage_6_batch_size': config.stage_6_batch_size, 'within_stage_delay': config.within_stage_delay, 'between_stage_delay': config.between_stage_delay, 'last_run_at': config.last_run_at, 'next_run_at': config.next_run_at, }) @extend_schema(tags=['Automation']) @action(detail=False, methods=['put']) def update_config(self, request): """ PUT /api/v1/automation/update_config/?site_id=123 Update automation configuration Body: { "is_enabled": true, "frequency": "daily", "scheduled_time": "02:00", "stage_1_batch_size": 50, ... } """ site, error_response = self._get_site(request) if error_response: return error_response config, _ = AutomationConfig.objects.get_or_create( account=site.account, site=site ) # Update fields if 'is_enabled' in request.data: config.is_enabled = request.data['is_enabled'] if 'frequency' in request.data: config.frequency = request.data['frequency'] if 'scheduled_time' in request.data: config.scheduled_time = request.data['scheduled_time'] # Stage enabled toggles if 'stage_1_enabled' in request.data: config.stage_1_enabled = request.data['stage_1_enabled'] if 'stage_2_enabled' in request.data: config.stage_2_enabled = request.data['stage_2_enabled'] if 'stage_3_enabled' in request.data: config.stage_3_enabled = request.data['stage_3_enabled'] if 'stage_4_enabled' in request.data: config.stage_4_enabled = request.data['stage_4_enabled'] if 'stage_5_enabled' in request.data: config.stage_5_enabled = request.data['stage_5_enabled'] if 'stage_6_enabled' in request.data: config.stage_6_enabled = request.data['stage_6_enabled'] if 'stage_7_enabled' in request.data: config.stage_7_enabled = request.data['stage_7_enabled'] # Batch sizes if 'stage_1_batch_size' in request.data: config.stage_1_batch_size = request.data['stage_1_batch_size'] if 'stage_2_batch_size' in request.data: config.stage_2_batch_size = request.data['stage_2_batch_size'] if 'stage_3_batch_size' in request.data: config.stage_3_batch_size = request.data['stage_3_batch_size'] if 'stage_4_batch_size' in request.data: config.stage_4_batch_size = request.data['stage_4_batch_size'] if 'stage_5_batch_size' in request.data: config.stage_5_batch_size = request.data['stage_5_batch_size'] if 'stage_6_batch_size' in request.data: config.stage_6_batch_size = request.data['stage_6_batch_size'] # Delay settings if 'within_stage_delay' in request.data: try: config.within_stage_delay = int(request.data['within_stage_delay']) except (TypeError, ValueError): pass if 'between_stage_delay' in request.data: try: config.between_stage_delay = int(request.data['between_stage_delay']) except (TypeError, ValueError): pass config.save() return Response({ 'message': 'Config updated', 'is_enabled': config.is_enabled, 'frequency': config.frequency, 'scheduled_time': str(config.scheduled_time), 'stage_1_enabled': config.stage_1_enabled, 'stage_2_enabled': config.stage_2_enabled, 'stage_3_enabled': config.stage_3_enabled, 'stage_4_enabled': config.stage_4_enabled, 'stage_5_enabled': config.stage_5_enabled, 'stage_6_enabled': config.stage_6_enabled, 'stage_7_enabled': config.stage_7_enabled, 'stage_1_batch_size': config.stage_1_batch_size, 'stage_2_batch_size': config.stage_2_batch_size, 'stage_3_batch_size': config.stage_3_batch_size, 'stage_4_batch_size': config.stage_4_batch_size, 'stage_5_batch_size': config.stage_5_batch_size, 'stage_6_batch_size': config.stage_6_batch_size, 'within_stage_delay': config.within_stage_delay, 'between_stage_delay': config.between_stage_delay, 'last_run_at': config.last_run_at, 'next_run_at': config.next_run_at, }) @extend_schema(tags=['Automation']) @action(detail=False, methods=['post']) def run_now(self, request): """ POST /api/v1/automation/run_now/?site_id=123 Trigger automation run immediately """ site, error_response = self._get_site(request) if error_response: return error_response try: service = AutomationService(site.account, site) run_id = service.start_automation(trigger_type='manual') # Start async processing from igny8_core.business.automation.tasks import run_automation_task run_automation_task.delay(run_id) return Response({ 'run_id': run_id, 'message': 'Automation started' }) except ValueError as e: return Response( {'error': str(e)}, status=status.HTTP_400_BAD_REQUEST ) except Exception as e: return Response( {'error': f'Failed to start automation: {str(e)}'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) @extend_schema(tags=['Automation']) @action(detail=False, methods=['get']) def current_run(self, request): """ GET /api/v1/automation/current_run/?site_id=123 Get current automation run status """ site, error_response = self._get_site(request) if error_response: return error_response run = AutomationRun.objects.filter( site=site, status__in=['running', 'paused'] ).order_by('-started_at').first() if not run: return Response({'run': None}) return Response({ 'run': { 'run_id': run.run_id, 'status': run.status, 'current_stage': run.current_stage, 'trigger_type': run.trigger_type, 'started_at': run.started_at, 'total_credits_used': run.total_credits_used, 'stage_1_result': run.stage_1_result, 'stage_2_result': run.stage_2_result, 'stage_3_result': run.stage_3_result, 'stage_4_result': run.stage_4_result, 'stage_5_result': run.stage_5_result, 'stage_6_result': run.stage_6_result, 'stage_7_result': run.stage_7_result, } }) @extend_schema(tags=['Automation']) @action(detail=False, methods=['post']) def pause(self, request): """ POST /api/v1/automation/pause/?run_id=abc123 Pause automation run """ run_id = request.query_params.get('run_id') if not run_id: return Response( {'error': 'run_id required'}, status=status.HTTP_400_BAD_REQUEST ) try: service = AutomationService.from_run_id(run_id) service.pause_automation() return Response({'message': 'Automation paused'}) except AutomationRun.DoesNotExist: return Response( {'error': 'Run not found'}, status=status.HTTP_404_NOT_FOUND ) @extend_schema(tags=['Automation']) @action(detail=False, methods=['post']) def resume(self, request): """ POST /api/v1/automation/resume/?run_id=abc123 Resume paused automation run """ run_id = request.query_params.get('run_id') if not run_id: return Response( {'error': 'run_id required'}, status=status.HTTP_400_BAD_REQUEST ) try: service = AutomationService.from_run_id(run_id) service.resume_automation() # Resume async processing from igny8_core.business.automation.tasks import resume_automation_task resume_automation_task.delay(run_id) return Response({'message': 'Automation resumed'}) except AutomationRun.DoesNotExist: return Response( {'error': 'Run not found'}, status=status.HTTP_404_NOT_FOUND ) def _calculate_run_number(self, site, run): """Calculate sequential run number for a site""" return AutomationRun.objects.filter( site=site, started_at__lte=run.started_at ).count() def _calculate_historical_averages(self, site, completed_runs): """Calculate historical averages from completed runs""" if completed_runs.count() < 3: # Not enough data, return defaults return { 'period_days': 30, 'runs_analyzed': completed_runs.count(), 'avg_credits_stage_1': 0.2, 'avg_credits_stage_2': 2.0, 'avg_credits_stage_4': 5.0, 'avg_credits_stage_5': 2.0, 'avg_credits_stage_6': 2.0, 'avg_output_ratio_stage_1': 0.125, 'avg_output_ratio_stage_2': 8.7, 'avg_output_ratio_stage_5': 4.0, 'avg_output_ratio_stage_6': 1.0, } # Calculate per-stage averages stage_1_credits = [] stage_2_credits = [] stage_4_credits = [] stage_5_credits = [] stage_6_credits = [] output_ratios_1 = [] output_ratios_2 = [] output_ratios_5 = [] output_ratios_6 = [] for run in completed_runs[:10]: # Last 10 runs if run.stage_1_result: processed = run.stage_1_result.get('keywords_processed', 0) created = run.stage_1_result.get('clusters_created', 0) credits = run.stage_1_result.get('credits_used', 0) if processed > 0: stage_1_credits.append(credits / processed) if created > 0 and processed > 0: output_ratios_1.append(created / processed) if run.stage_2_result: processed = run.stage_2_result.get('clusters_processed', 0) created = run.stage_2_result.get('ideas_created', 0) credits = run.stage_2_result.get('credits_used', 0) if processed > 0: stage_2_credits.append(credits / processed) if created > 0 and processed > 0: output_ratios_2.append(created / processed) if run.stage_4_result: processed = run.stage_4_result.get('tasks_processed', 0) credits = run.stage_4_result.get('credits_used', 0) if processed > 0: stage_4_credits.append(credits / processed) if run.stage_5_result: processed = run.stage_5_result.get('content_processed', 0) created = run.stage_5_result.get('prompts_created', 0) credits = run.stage_5_result.get('credits_used', 0) if processed > 0: stage_5_credits.append(credits / processed) if created > 0 and processed > 0: output_ratios_5.append(created / processed) if run.stage_6_result: processed = run.stage_6_result.get('images_processed', 0) created = run.stage_6_result.get('images_generated', 0) credits = run.stage_6_result.get('credits_used', 0) if processed > 0: stage_6_credits.append(credits / processed) if created > 0 and processed > 0: output_ratios_6.append(created / processed) def avg(lst): return sum(lst) / len(lst) if lst else 0 return { 'period_days': 30, 'runs_analyzed': min(completed_runs.count(), 10), 'avg_credits_stage_1': round(avg(stage_1_credits), 2), 'avg_credits_stage_2': round(avg(stage_2_credits), 2), 'avg_credits_stage_4': round(avg(stage_4_credits), 2), 'avg_credits_stage_5': round(avg(stage_5_credits), 2), 'avg_credits_stage_6': round(avg(stage_6_credits), 2), 'avg_output_ratio_stage_1': round(avg(output_ratios_1), 3), 'avg_output_ratio_stage_2': round(avg(output_ratios_2), 1), 'avg_output_ratio_stage_5': round(avg(output_ratios_5), 1), 'avg_output_ratio_stage_6': round(avg(output_ratios_6), 1), } def _calculate_predictive_analysis(self, site, historical_averages): """Calculate predictive cost and output analysis""" # Get pending counts pending_keywords = Keywords.objects.filter(site=site, status='new', disabled=False).count() pending_clusters = Clusters.objects.filter(site=site, status='new', disabled=False).exclude(ideas__isnull=False).count() pending_ideas = ContentIdeas.objects.filter(site=site, status='new').count() pending_tasks = Tasks.objects.filter(site=site, status='queued').count() pending_content = Content.objects.filter(site=site, status='draft').annotate(images_count=Count('images')).filter(images_count=0).count() pending_images = Images.objects.filter(site=site, status='pending').count() pending_review = Content.objects.filter(site=site, status='review').count() # Calculate estimates using historical averages stage_1_credits = int(pending_keywords * historical_averages['avg_credits_stage_1']) stage_2_credits = int(pending_clusters * historical_averages['avg_credits_stage_2']) stage_4_credits = int(pending_tasks * historical_averages['avg_credits_stage_4']) stage_5_credits = int(pending_content * historical_averages['avg_credits_stage_5']) stage_6_credits = int(pending_images * historical_averages['avg_credits_stage_6']) total_estimated = stage_1_credits + stage_2_credits + stage_4_credits + stage_5_credits + stage_6_credits recommended_buffer = int(total_estimated * 1.2) # Calculate expected outputs expected_clusters = int(pending_keywords * historical_averages['avg_output_ratio_stage_1']) if historical_averages['avg_output_ratio_stage_1'] > 0 else 0 expected_ideas = int(pending_clusters * historical_averages['avg_output_ratio_stage_2']) if historical_averages['avg_output_ratio_stage_2'] > 0 else 0 expected_prompts = int(pending_content * historical_averages['avg_output_ratio_stage_5']) if historical_averages['avg_output_ratio_stage_5'] > 0 else 0 expected_images = int(pending_images * historical_averages['avg_output_ratio_stage_6']) if historical_averages['avg_output_ratio_stage_6'] > 0 else 0 return { 'stages': [ { 'stage': 1, 'name': 'Keywords → Clusters', 'pending_items': pending_keywords, 'avg_credits_per_item': historical_averages['avg_credits_stage_1'], 'estimated_credits': stage_1_credits, 'avg_output_ratio': historical_averages['avg_output_ratio_stage_1'], 'estimated_output': expected_clusters, 'output_type': 'clusters' }, { 'stage': 2, 'name': 'Clusters → Ideas', 'pending_items': pending_clusters, 'avg_credits_per_item': historical_averages['avg_credits_stage_2'], 'estimated_credits': stage_2_credits, 'avg_output_ratio': historical_averages['avg_output_ratio_stage_2'], 'estimated_output': expected_ideas, 'output_type': 'ideas' }, { 'stage': 3, 'name': 'Ideas → Tasks', 'pending_items': pending_ideas, 'avg_credits_per_item': 0, 'estimated_credits': 0, 'avg_output_ratio': 1.0, 'estimated_output': pending_ideas, 'output_type': 'tasks' }, { 'stage': 4, 'name': 'Tasks → Content', 'pending_items': pending_tasks, 'avg_credits_per_item': historical_averages['avg_credits_stage_4'], 'estimated_credits': stage_4_credits, 'avg_output_ratio': 1.0, 'estimated_output': pending_tasks, 'output_type': 'content' }, { 'stage': 5, 'name': 'Content → Image Prompts', 'pending_items': pending_content, 'avg_credits_per_item': historical_averages['avg_credits_stage_5'], 'estimated_credits': stage_5_credits, 'avg_output_ratio': historical_averages['avg_output_ratio_stage_5'], 'estimated_output': expected_prompts, 'output_type': 'prompts' }, { 'stage': 6, 'name': 'Image Prompts → Images', 'pending_items': pending_images, 'avg_credits_per_item': historical_averages['avg_credits_stage_6'], 'estimated_credits': stage_6_credits, 'avg_output_ratio': historical_averages['avg_output_ratio_stage_6'], 'estimated_output': expected_images, 'output_type': 'images' }, { 'stage': 7, 'name': 'Review → Approved', 'pending_items': pending_review, 'avg_credits_per_item': 0, 'estimated_credits': 0, 'avg_output_ratio': 1.0, 'estimated_output': pending_review, 'output_type': 'approved' }, ], 'total_estimated_credits': total_estimated, 'recommended_buffer': recommended_buffer, 'current_balance': site.account.credits, 'is_sufficient': site.account.credits >= recommended_buffer, 'expected_outputs': { 'clusters': expected_clusters, 'ideas': expected_ideas, 'content': pending_tasks, 'images': expected_images, } } def _get_attention_items(self, site): """Get items requiring attention""" # Count items with issues skipped_ideas = ContentIdeas.objects.filter(site=site, status='skipped').count() failed_content = Content.objects.filter(site=site, status='failed').count() failed_images = Images.objects.filter(site=site, status='failed').count() return { 'skipped_ideas': skipped_ideas, 'failed_content': failed_content, 'failed_images': failed_images, 'total_attention_needed': skipped_ideas + failed_content + failed_images, } @extend_schema(tags=['Automation']) @action(detail=False, methods=['get']) def overview_stats(self, request): """ GET /api/v1/automation/overview_stats/?site_id=123 Get comprehensive automation statistics for overview page """ site, error_response = self._get_site(request) if error_response: return error_response # Calculate run statistics from last 30 days thirty_days_ago = timezone.now() - timedelta(days=30) seven_days_ago = timezone.now() - timedelta(days=7) fourteen_days_ago = timezone.now() - timedelta(days=14) all_runs = AutomationRun.objects.filter(site=site) recent_runs = all_runs.filter(started_at__gte=thirty_days_ago) this_week_runs = all_runs.filter(started_at__gte=seven_days_ago) last_week_runs = all_runs.filter(started_at__gte=fourteen_days_ago, started_at__lt=seven_days_ago) completed_runs = recent_runs.filter(status='completed') failed_runs = recent_runs.filter(status='failed') # Calculate averages from completed runs avg_duration = completed_runs.annotate( duration=F('completed_at') - F('started_at') ).aggregate(avg=Avg('duration'))['avg'] avg_credits = completed_runs.aggregate(avg=Avg('total_credits_used'))['avg'] or 0 # Calculate historical averages per stage historical_averages = self._calculate_historical_averages(site, completed_runs) # Get pending items and calculate predictions predictive_analysis = self._calculate_predictive_analysis(site, historical_averages) # Get attention items (failed/skipped) attention_items = self._get_attention_items(site) # Calculate trends last_week_avg_credits = last_week_runs.filter(status='completed').aggregate(avg=Avg('total_credits_used'))['avg'] or 0 credits_trend = 0 if last_week_avg_credits > 0: this_week_avg = this_week_runs.filter(status='completed').aggregate(avg=Avg('total_credits_used'))['avg'] or 0 credits_trend = round(((this_week_avg - last_week_avg_credits) / last_week_avg_credits) * 100, 1) return Response({ 'run_statistics': { 'total_runs': all_runs.count(), 'completed_runs': completed_runs.count(), 'failed_runs': failed_runs.count(), 'success_rate': round(completed_runs.count() / recent_runs.count() * 100, 1) if recent_runs.count() > 0 else 0, 'avg_duration_seconds': int(avg_duration.total_seconds()) if avg_duration else 0, 'avg_credits_per_run': round(avg_credits, 1), 'runs_this_week': this_week_runs.count(), 'runs_last_week': last_week_runs.count(), 'credits_trend': credits_trend, }, 'predictive_analysis': predictive_analysis, 'attention_items': attention_items, 'historical_averages': historical_averages, }) @extend_schema(tags=['Automation']) @action(detail=False, methods=['get']) def history(self, request): """ GET /api/v1/automation/history/?site_id=123 Get automation run history """ site, error_response = self._get_site(request) if error_response: return error_response # Get pagination params page = int(request.query_params.get('page', 1)) page_size = int(request.query_params.get('page_size', 20)) runs_qs = AutomationRun.objects.filter(site=site).order_by('-started_at') total_count = runs_qs.count() # Paginate start = (page - 1) * page_size end = start + page_size runs = runs_qs[start:end] # Build response with enhanced data runs_data = [] for run in runs: # Calculate run number run_number = self._calculate_run_number(site, run) # Calculate duration duration_seconds = 0 if run.completed_at and run.started_at: duration_seconds = int((run.completed_at - run.started_at).total_seconds()) # Count completed and failed stages stages_completed = 0 stages_failed = 0 stage_statuses = [] for stage_num in range(1, 8): result = getattr(run, f'stage_{stage_num}_result', None) if result: if result.get('credits_used', 0) >= 0: # Stage ran stages_completed += 1 stage_statuses.append('completed') else: stages_failed += 1 stage_statuses.append('failed') else: if run.status == 'completed' and stage_num <= run.current_stage: stage_statuses.append('skipped') else: stage_statuses.append('pending') # Calculate summary stats from stage results items_processed = run.initial_snapshot.get('total_initial_items', 0) if run.initial_snapshot else 0 items_created = 0 content_created = 0 images_generated = 0 if run.stage_1_result: items_created += run.stage_1_result.get('clusters_created', 0) if run.stage_2_result: items_created += run.stage_2_result.get('ideas_created', 0) if run.stage_4_result: content_created = run.stage_4_result.get('content_created', 0) items_created += content_created if run.stage_6_result: images_generated = run.stage_6_result.get('images_generated', 0) items_created += images_generated runs_data.append({ 'run_id': run.run_id, 'run_number': run_number, 'run_title': f"{site.domain} #{run_number}", 'status': run.status, 'trigger_type': run.trigger_type, 'started_at': run.started_at, 'completed_at': run.completed_at, 'duration_seconds': duration_seconds, 'total_credits_used': run.total_credits_used, 'current_stage': run.current_stage, 'stages_completed': stages_completed, 'stages_failed': stages_failed, 'initial_snapshot': run.initial_snapshot or {}, 'summary': { 'items_processed': items_processed, 'items_created': items_created, 'content_created': content_created, 'images_generated': images_generated, }, 'stage_statuses': stage_statuses, }) return Response({ 'runs': runs_data, 'pagination': { 'page': page, 'page_size': page_size, 'total_count': total_count, 'total_pages': (total_count + page_size - 1) // page_size, } }) @extend_schema(tags=['Automation']) @action(detail=False, methods=['get']) def run_detail(self, request): """ GET /api/v1/automation/run_detail/?run_id=abc123 Get detailed information about a specific automation run """ site, error_response = self._get_site(request) if error_response: return error_response run_id = request.query_params.get('run_id') if not run_id: return Response( {'error': 'run_id parameter is required'}, status=status.HTTP_400_BAD_REQUEST ) try: run = AutomationRun.objects.get(run_id=run_id, site=site) except AutomationRun.DoesNotExist: return Response( {'error': 'Automation run not found'}, status=status.HTTP_404_NOT_FOUND ) # Basic run info run_number = self._calculate_run_number(site, run) duration_seconds = 0 if run.completed_at and run.started_at: duration_seconds = int((run.completed_at - run.started_at).total_seconds()) # Get historical averages for comparison completed_runs = AutomationRun.objects.filter( site=site, status='completed' ).order_by('-completed_at')[:10] historical_averages = self._calculate_historical_averages(site, completed_runs) # Build detailed stage analysis stages = [] total_credits = 0 total_items_processed = 0 total_items_created = 0 stage_names = [ 'Keyword Clustering', 'Idea Generation', 'Task Creation', 'Content Writing', 'Content SEO Optimization', 'Image Generation', 'Image SEO Optimization' ] for stage_num in range(1, 8): result = getattr(run, f'stage_{stage_num}_result', None) or {} credits_used = result.get('credits_used', 0) items_processed = result.get('items_processed', 0) items_created = result.get('items_created', 0) # Try alternative field names if items_created == 0: items_created = result.get('clusters_created', 0) items_created += result.get('ideas_created', 0) items_created += result.get('tasks_created', 0) items_created += result.get('content_created', 0) items_created += result.get('images_generated', 0) stage_status = 'pending' if result: if credits_used > 0 or items_created > 0: stage_status = 'completed' elif result.get('error'): stage_status = 'failed' elif run.status == 'completed' and stage_num <= run.current_stage: stage_status = 'skipped' # Compare to historical averages historical_credits = 0 historical_items = 0 if historical_averages['stages']: for hist_stage in historical_averages['stages']: if hist_stage['stage_number'] == stage_num: historical_credits = hist_stage['avg_credits'] historical_items = hist_stage['avg_items_created'] break credit_variance = 0 items_variance = 0 if historical_credits > 0: credit_variance = ((credits_used - historical_credits) / historical_credits) * 100 if historical_items > 0: items_variance = ((items_created - historical_items) / historical_items) * 100 stages.append({ 'stage_number': stage_num, 'stage_name': stage_names[stage_num - 1], 'status': stage_status, 'credits_used': credits_used, 'items_processed': items_processed, 'items_created': items_created, 'duration_seconds': result.get('duration', 0), 'error': result.get('error', ''), 'comparison': { 'historical_avg_credits': historical_credits, 'historical_avg_items': historical_items, 'credit_variance_pct': round(credit_variance, 1), 'items_variance_pct': round(items_variance, 1), } }) total_credits += credits_used total_items_processed += items_processed total_items_created += items_created # Calculate efficiency metrics efficiency = { 'credits_per_item': round(total_credits / total_items_created, 2) if total_items_created > 0 else 0, 'items_per_minute': round(total_items_created / (duration_seconds / 60), 2) if duration_seconds > 0 else 0, 'credits_per_minute': round(total_credits / (duration_seconds / 60), 2) if duration_seconds > 0 else 0, } # Generate insights insights = [] # Check for variance issues for stage in stages: comp = stage['comparison'] if abs(comp['credit_variance_pct']) > 20: direction = 'higher' if comp['credit_variance_pct'] > 0 else 'lower' insights.append({ 'type': 'variance', 'severity': 'warning' if abs(comp['credit_variance_pct']) > 50 else 'info', 'message': f"{stage['stage_name']} used {abs(comp['credit_variance_pct']):.0f}% {direction} credits than average" }) # Check for failures for stage in stages: if stage['status'] == 'failed': insights.append({ 'type': 'error', 'severity': 'error', 'message': f"{stage['stage_name']} failed: {stage['error']}" }) # Check efficiency if historical_averages['avg_credits_per_item'] > 0: efficiency_diff = ((efficiency['credits_per_item'] - historical_averages['avg_credits_per_item']) / historical_averages['avg_credits_per_item']) * 100 if efficiency_diff < -10: insights.append({ 'type': 'success', 'severity': 'info', 'message': f"This run was {abs(efficiency_diff):.0f}% more credit-efficient than average" }) elif efficiency_diff > 10: insights.append({ 'type': 'warning', 'severity': 'warning', 'message': f"This run used {efficiency_diff:.0f}% more credits per item than average" }) return Response({ 'run': { 'run_id': run.run_id, 'run_number': run_number, 'run_title': f"{site.domain} #{run_number}", 'status': run.status, 'trigger_type': run.trigger_type, 'started_at': run.started_at, 'completed_at': run.completed_at, 'duration_seconds': duration_seconds, 'current_stage': run.current_stage, 'total_credits_used': total_credits, 'initial_snapshot': run.initial_snapshot or {}, }, 'stages': stages, 'efficiency': efficiency, 'insights': insights, 'historical_comparison': { 'avg_credits': historical_averages['avg_total_credits'], 'avg_duration_seconds': historical_averages['avg_duration_seconds'], 'avg_credits_per_item': historical_averages['avg_credits_per_item'], } }) @extend_schema(tags=['Automation']) @action(detail=False, methods=['get']) def logs(self, request): """ GET /api/v1/automation/logs/?run_id=abc123&lines=100 Get automation run logs """ run_id = request.query_params.get('run_id') if not run_id: return Response( {'error': 'run_id required'}, status=status.HTTP_400_BAD_REQUEST ) try: run = AutomationRun.objects.get(run_id=run_id) service = AutomationService(run.account, run.site) lines = int(request.query_params.get('lines', 100)) log_text = service.logger.get_activity_log( run.account.id, run.site.id, run_id, lines ) return Response({ 'run_id': run_id, 'log': log_text }) except AutomationRun.DoesNotExist: return Response( {'error': 'Run not found'}, status=status.HTTP_404_NOT_FOUND ) @extend_schema(tags=['Automation']) @action(detail=False, methods=['get']) def estimate(self, request): """ GET /api/v1/automation/estimate/?site_id=123 Estimate credits needed for automation """ site, error_response = self._get_site(request) if error_response: return error_response service = AutomationService(site.account, site) estimated_credits = service.estimate_credits() return Response({ 'estimated_credits': estimated_credits, 'current_balance': site.account.credits, 'sufficient': site.account.credits >= (estimated_credits * 1.2) }) @extend_schema(tags=['Automation']) @action(detail=False, methods=['get']) def pipeline_overview(self, request): """ GET /api/v1/automation/pipeline_overview/?site_id=123 Get pipeline overview with pending counts for all stages """ site, error_response = self._get_site(request) if error_response: return error_response from igny8_core.business.planning.models import Keywords, Clusters, ContentIdeas from igny8_core.business.content.models import Tasks, Content, Images from django.db.models import Count def _counts_by_status(model, extra_filter=None, exclude_filter=None): """Return a dict of counts keyed by status and the total for a given model and site.""" qs = model.objects.filter(site=site) if extra_filter: qs = qs.filter(**extra_filter) if exclude_filter: qs = qs.exclude(**exclude_filter) # Group by status when available try: rows = qs.values('status').annotate(count=Count('id')) counts = {r['status']: r['count'] for r in rows} total = sum(counts.values()) except Exception: # Fallback: count all total = qs.count() counts = {'total': total} return counts, total # Stage 1: Keywords pending clustering stage_1_counts, stage_1_total = _counts_by_status( Keywords, extra_filter={'disabled': False} ) # FIXED: Stage 1 pending = all keywords with status='new' (ready for clustering) # This should match the "New" count shown in Keywords metric card # Previously filtered by cluster__isnull=True which caused mismatch stage_1_pending = Keywords.objects.filter( site=site, status='new', disabled=False ).count() # Stage 2: Clusters needing ideas stage_2_counts, stage_2_total = _counts_by_status( Clusters, extra_filter={'disabled': False} ) stage_2_pending = Clusters.objects.filter( site=site, status='new', disabled=False ).exclude( ideas__isnull=False ).count() # Stage 3: Ideas ready to queue stage_3_counts, stage_3_total = _counts_by_status(ContentIdeas) stage_3_pending = ContentIdeas.objects.filter( site=site, status='new' ).count() # Stage 4: Tasks ready for content generation stage_4_counts, stage_4_total = _counts_by_status(Tasks) stage_4_pending = Tasks.objects.filter( site=site, status='queued' ).count() # Stage 5: Content ready for image prompts # We will provide counts per content status and also compute pending as previous (draft with 0 images) stage_5_counts, stage_5_total = _counts_by_status(Content) stage_5_pending = Content.objects.filter( site=site, status='draft' ).annotate( images_count=Count('images') ).filter( images_count=0 ).count() # Stage 6: Image prompts ready for generation stage_6_counts, stage_6_total = _counts_by_status(Images) stage_6_pending = Images.objects.filter( site=site, status='pending' ).count() # Stage 7: Content ready for review # Provide counts per status for content and keep previous "review" pending count stage_7_counts, stage_7_total = _counts_by_status(Content) stage_7_ready = Content.objects.filter( site=site, status='review' ).count() return Response({ 'stages': [ { 'number': 1, 'name': 'Keywords → Clusters', 'pending': stage_1_pending, 'type': 'AI', 'counts': stage_1_counts, 'total': stage_1_total }, { 'number': 2, 'name': 'Clusters → Ideas', 'pending': stage_2_pending, 'type': 'AI', 'counts': stage_2_counts, 'total': stage_2_total }, { 'number': 3, 'name': 'Ideas → Tasks', 'pending': stage_3_pending, 'type': 'Local', 'counts': stage_3_counts, 'total': stage_3_total }, { 'number': 4, 'name': 'Tasks → Content', 'pending': stage_4_pending, 'type': 'AI', 'counts': stage_4_counts, 'total': stage_4_total }, { 'number': 5, 'name': 'Content → Image Prompts', 'pending': stage_5_pending, 'type': 'AI', 'counts': stage_5_counts, 'total': stage_5_total }, { 'number': 6, 'name': 'Image Prompts → Images', 'pending': stage_6_pending, 'type': 'AI', 'counts': stage_6_counts, 'total': stage_6_total }, { 'number': 7, 'name': 'Manual Review Gate', 'pending': stage_7_ready, 'type': 'Manual', 'counts': stage_7_counts, 'total': stage_7_total } ] }) @extend_schema(tags=['Automation']) @action(detail=False, methods=['get']) def eligibility(self, request): """ GET /api/v1/automation/eligibility/?site_id=123 Check if site is eligible for automation. A site is eligible if it has ANY data in the pipeline: - At least one keyword, OR - At least one cluster, OR - At least one idea, OR - At least one task, OR - At least one content item, OR - At least one image Sites with zero data across ALL entities are not eligible. """ site, error_response = self._get_site(request) if error_response: return error_response from igny8_core.business.planning.models import Keywords, Clusters, ContentIdeas from igny8_core.business.content.models import Tasks, Content, Images # Check total counts for each entity keywords_total = Keywords.objects.filter(site=site, disabled=False).count() clusters_total = Clusters.objects.filter(site=site, disabled=False).count() ideas_total = ContentIdeas.objects.filter(site=site).count() tasks_total = Tasks.objects.filter(site=site).count() content_total = Content.objects.filter(site=site).count() images_total = Images.objects.filter(site=site).count() # Site is eligible if ANY of these totals is > 0 total_items = keywords_total + clusters_total + ideas_total + tasks_total + content_total + images_total is_eligible = total_items > 0 # Provide details for the UI return Response({ 'is_eligible': is_eligible, 'totals': { 'keywords': keywords_total, 'clusters': clusters_total, 'ideas': ideas_total, 'tasks': tasks_total, 'content': content_total, 'images': images_total, }, 'total_items': total_items, 'message': None if is_eligible else 'This site has no data yet. Add keywords in the Planner module to get started with automation.' }) @extend_schema(tags=['Automation']) @action(detail=False, methods=['get'], url_path='current_processing') def current_processing(self, request): """ GET /api/v1/automation/current_processing/?site_id=123&run_id=abc Get current processing state for active automation run """ site_id = request.query_params.get('site_id') run_id = request.query_params.get('run_id') if not site_id or not run_id: return Response( {'error': 'site_id and run_id required'}, status=status.HTTP_400_BAD_REQUEST ) try: # Get the site site = get_object_or_404(Site, id=site_id, account=request.user.account) # Get the run run = AutomationRun.objects.get(run_id=run_id, site=site) # If not running or paused, return minimal state with updated credits if run.status not in ('running', 'paused'): return Response({'data': None}) # Get current processing state service = AutomationService.from_run_id(run_id) state = service.get_current_processing_state() # Refresh run to get latest total_credits_used run.refresh_from_db() # Add updated credits info to response response_data = { 'state': state, 'total_credits_used': run.total_credits_used, 'current_stage': run.current_stage, } return Response({'data': response_data}) except AutomationRun.DoesNotExist: return Response( {'error': 'Run not found'}, status=status.HTTP_404_NOT_FOUND ) except Exception as e: return Response( {'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) @extend_schema(tags=['Automation']) @action(detail=False, methods=['post'], url_path='pause') def pause_automation(self, request): """ POST /api/v1/automation/pause/?site_id=123&run_id=abc Pause current automation run Will complete current queue item then pause before next item """ site_id = request.query_params.get('site_id') run_id = request.query_params.get('run_id') if not site_id or not run_id: return Response( {'error': 'site_id and run_id required'}, status=status.HTTP_400_BAD_REQUEST ) try: site = get_object_or_404(Site, id=site_id, account=request.user.account) run = AutomationRun.objects.get(run_id=run_id, site=site) if run.status != 'running': return Response( {'error': f'Cannot pause automation with status: {run.status}'}, status=status.HTTP_400_BAD_REQUEST ) # Update status to paused run.status = 'paused' run.paused_at = timezone.now() run.save(update_fields=['status', 'paused_at']) return Response({ 'message': 'Automation paused', 'status': run.status, 'paused_at': run.paused_at }) except AutomationRun.DoesNotExist: return Response( {'error': 'Run not found'}, status=status.HTTP_404_NOT_FOUND ) except Exception as e: return Response( {'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) @extend_schema(tags=['Automation']) @action(detail=False, methods=['post'], url_path='resume') def resume_automation(self, request): """ POST /api/v1/automation/resume/?site_id=123&run_id=abc Resume paused automation run Will continue from next queue item in current stage """ site_id = request.query_params.get('site_id') run_id = request.query_params.get('run_id') if not site_id or not run_id: return Response( {'error': 'site_id and run_id required'}, status=status.HTTP_400_BAD_REQUEST ) try: site = get_object_or_404(Site, id=site_id, account=request.user.account) run = AutomationRun.objects.get(run_id=run_id, site=site) if run.status != 'paused': return Response( {'error': f'Cannot resume automation with status: {run.status}'}, status=status.HTTP_400_BAD_REQUEST ) # Update status to running run.status = 'running' run.resumed_at = timezone.now() run.save(update_fields=['status', 'resumed_at']) # Queue continuation task from igny8_core.business.automation.tasks import continue_automation_task continue_automation_task.delay(run_id) return Response({ 'message': 'Automation resumed', 'status': run.status, 'resumed_at': run.resumed_at }) except AutomationRun.DoesNotExist: return Response( {'error': 'Run not found'}, status=status.HTTP_404_NOT_FOUND ) except Exception as e: return Response( {'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) @extend_schema(tags=['Automation']) @action(detail=False, methods=['post'], url_path='cancel') def cancel_automation(self, request): """ POST /api/v1/automation/cancel/?site_id=123&run_id=abc Cancel current automation run Will complete current queue item then stop permanently """ site_id = request.query_params.get('site_id') run_id = request.query_params.get('run_id') if not site_id or not run_id: return Response( {'error': 'site_id and run_id required'}, status=status.HTTP_400_BAD_REQUEST ) try: site = get_object_or_404(Site, id=site_id, account=request.user.account) run = AutomationRun.objects.get(run_id=run_id, site=site) if run.status not in ['running', 'paused']: return Response( {'error': f'Cannot cancel automation with status: {run.status}'}, status=status.HTTP_400_BAD_REQUEST ) # Update status to cancelled run.status = 'cancelled' run.cancelled_at = timezone.now() run.completed_at = timezone.now() run.save(update_fields=['status', 'cancelled_at', 'completed_at']) return Response({ 'message': 'Automation cancelled', 'status': run.status, 'cancelled_at': run.cancelled_at }) except AutomationRun.DoesNotExist: return Response( {'error': 'Run not found'}, status=status.HTTP_404_NOT_FOUND ) except Exception as e: return Response( {'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) @extend_schema(tags=['Automation']) @action(detail=False, methods=['get'], url_path='run_progress') def run_progress(self, request): """ GET /api/v1/automation/run_progress/?site_id=123&run_id=abc Unified endpoint for ALL run progress data - global + per-stage. Replaces multiple separate API calls with single comprehensive response. Response includes: - run: Current run status and metadata - global_progress: Overall pipeline progress percentage - stages: Per-stage progress with input/output/processed counts - metrics: Credits used, duration, errors """ site_id = request.query_params.get('site_id') run_id = request.query_params.get('run_id') if not site_id: return Response( {'error': 'site_id required'}, status=status.HTTP_400_BAD_REQUEST ) try: site = get_object_or_404(Site, id=site_id, account=request.user.account) # If no run_id, get current run if run_id: run = AutomationRun.objects.get(run_id=run_id, site=site) else: run = AutomationRun.objects.filter( site=site, status__in=['running', 'paused'] ).order_by('-started_at').first() if not run: return Response({ 'run': None, 'global_progress': None, 'stages': [], 'metrics': None }) # Build unified response response = self._build_run_progress_response(site, run) return Response(response) except AutomationRun.DoesNotExist: return Response( {'error': 'Run not found'}, status=status.HTTP_404_NOT_FOUND ) except Exception as e: return Response( {'error': str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) def _build_run_progress_response(self, site, run): """Build comprehensive progress response for a run""" from igny8_core.business.planning.models import Keywords, Clusters, ContentIdeas from igny8_core.business.content.models import Tasks, Content, Images from django.db.models import Count from django.utils import timezone initial_snapshot = run.initial_snapshot or {} # Helper to get processed count from result def get_processed(result, key): if not result: return 0 return result.get(key, 0) # Helper to get output count from result def get_output(result, key): if not result: return 0 return result.get(key, 0) # Stage-specific key mapping for processed counts processed_keys = { 1: 'keywords_processed', 2: 'clusters_processed', 3: 'ideas_processed', 4: 'tasks_processed', 5: 'content_processed', 6: 'images_processed', 7: 'ready_for_review' } # Stage-specific key mapping for output counts output_keys = { 1: 'clusters_created', 2: 'ideas_created', 3: 'tasks_created', 4: 'content_created', 5: 'prompts_created', 6: 'images_generated', 7: 'ready_for_review' } # Build stages array stages = [] total_processed = 0 total_initial = initial_snapshot.get('total_initial_items', 0) stage_names = { 1: 'Keywords → Clusters', 2: 'Clusters → Ideas', 3: 'Ideas → Tasks', 4: 'Tasks → Content', 5: 'Content → Image Prompts', 6: 'Image Prompts → Images', 7: 'Manual Review Gate' } stage_types = { 1: 'AI', 2: 'AI', 3: 'Local', 4: 'AI', 5: 'AI', 6: 'AI', 7: 'Manual' } for stage_num in range(1, 8): result = getattr(run, f'stage_{stage_num}_result', None) initial_count = initial_snapshot.get(f'stage_{stage_num}_initial', 0) processed = get_processed(result, processed_keys[stage_num]) output = get_output(result, output_keys[stage_num]) total_processed += processed # Determine stage status if run.current_stage > stage_num: stage_status = 'completed' elif run.current_stage == stage_num: stage_status = 'active' else: stage_status = 'pending' # Calculate progress percentage for this stage progress = 0 if initial_count > 0: progress = round((processed / initial_count) * 100) elif run.current_stage > stage_num: progress = 100 stage_data = { 'number': stage_num, 'name': stage_names[stage_num], 'type': stage_types[stage_num], 'status': stage_status, 'input_count': initial_count, 'output_count': output, 'processed_count': processed, 'progress_percentage': min(progress, 100), 'credits_used': result.get('credits_used', 0) if result else 0, 'time_elapsed': result.get('time_elapsed', '') if result else '', } # Add currently_processing for active stage if stage_status == 'active': try: service = AutomationService.from_run_id(run.run_id) processing_state = service.get_current_processing_state() if processing_state: stage_data['currently_processing'] = processing_state.get('currently_processing', []) stage_data['up_next'] = processing_state.get('up_next', []) stage_data['remaining_count'] = processing_state.get('remaining_count', 0) except Exception: pass stages.append(stage_data) # Calculate global progress # Stages 1-6 are automation stages, Stage 7 is manual review (not counted) # Progress = weighted average of stages 1-6 completion global_percentage = 0 if run.status == 'completed': # If run is completed (after Stage 6), show 100% global_percentage = 100 elif run.status in ('cancelled', 'failed'): # Keep current progress for cancelled/failed if total_initial > 0: global_percentage = round((total_processed / total_initial) * 100) else: # Calculate based on completed stages (1-6 only) # Each of the 6 automation stages contributes ~16.67% to total completed_stages = min(max(run.current_stage - 1, 0), 6) stage_weight = 100 / 6 # Each stage is ~16.67% # Base progress from completed stages base_progress = completed_stages * stage_weight # Add partial progress from current stage current_stage_progress = 0 if run.current_stage <= 6: current_result = getattr(run, f'stage_{run.current_stage}_result', None) current_initial = initial_snapshot.get(f'stage_{run.current_stage}_initial', 0) if current_initial > 0 and current_result: processed_key = processed_keys.get(run.current_stage, '') current_processed = current_result.get(processed_key, 0) current_stage_progress = (current_processed / current_initial) * stage_weight global_percentage = round(base_progress + current_stage_progress) # Calculate duration duration_seconds = 0 if run.started_at: end_time = run.completed_at or timezone.now() duration_seconds = int((end_time - run.started_at).total_seconds()) return { 'run': { 'run_id': run.run_id, 'status': run.status, 'current_stage': run.current_stage, 'trigger_type': run.trigger_type, 'started_at': run.started_at, 'completed_at': run.completed_at, 'paused_at': run.paused_at, }, 'global_progress': { 'total_items': total_initial, 'completed_items': total_processed, 'percentage': min(global_percentage, 100), 'current_stage': run.current_stage, 'total_stages': 7 }, 'stages': stages, 'metrics': { 'credits_used': run.total_credits_used, 'duration_seconds': duration_seconds, 'errors': [] }, 'initial_snapshot': initial_snapshot }