"""
Automation API Views
REST API endpoints for automation management
"""
import logging

from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from django.core.exceptions import ValidationError
from django.shortcuts import get_object_or_404
from django.utils import timezone
from drf_spectacular.utils import extend_schema

from igny8_core.business.automation.models import AutomationConfig, AutomationRun
from igny8_core.business.automation.services import AutomationService
from igny8_core.auth.models import Account, Site

logger = logging.getLogger(__name__)

# Config fields copied from the request body onto the model without conversion.
_CONFIG_PASSTHROUGH_FIELDS = (
    'is_enabled',
    'frequency',
    'scheduled_time',
    'stage_1_batch_size',
    'stage_2_batch_size',
    'stage_3_batch_size',
    'stage_4_batch_size',
    'stage_5_batch_size',
    'stage_6_batch_size',
)

# Config fields that are coerced with int(); values that fail to convert are ignored.
_CONFIG_DELAY_FIELDS = ('within_stage_delay', 'between_stage_delay')


def _error_response(message, http_status):
    """Build the ``{'error': message}`` response used by every endpoint in this module."""
    return Response({'error': message}, status=http_status)


class AutomationViewSet(viewsets.ViewSet):
    """API endpoints for automation"""

    permission_classes = [IsAuthenticated]

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _get_site(self, request):
        """
        Resolve the ``site_id`` query parameter to a Site of the caller's account.

        Returns:
            ``(site, None)`` on success, or ``(None, Response)`` with a 400
            response when ``site_id`` is missing or malformed.

        Raises:
            Http404: when no Site with that id belongs to ``request.user.account``
                (raised by ``get_object_or_404``).
        """
        site_id = request.query_params.get('site_id')
        if not site_id:
            return None, _error_response('site_id required', status.HTTP_400_BAD_REQUEST)

        try:
            site = get_object_or_404(Site, id=site_id, account=request.user.account)
        except (TypeError, ValueError, ValidationError):
            # A value that cannot be converted to the primary-key type is a
            # client error, not a server error.
            return None, _error_response('Invalid site_id', status.HTTP_400_BAD_REQUEST)
        return site, None

    def _get_site_and_run_id(self, request):
        """
        Resolve the ``site_id`` and ``run_id`` query parameters together.

        Returns:
            ``(site, run_id, None)`` on success, or ``(None, None, Response)``
            with a 400 response when either parameter is missing or ``site_id``
            is malformed.

        Raises:
            Http404: when the site does not belong to the caller's account.
        """
        site_id = request.query_params.get('site_id')
        run_id = request.query_params.get('run_id')
        if not site_id or not run_id:
            return None, None, _error_response(
                'site_id and run_id required', status.HTTP_400_BAD_REQUEST
            )

        site, error_response = self._get_site(request)
        if error_response:
            return None, None, error_response
        return site, run_id, None

    @staticmethod
    def _get_account_run(request, run_id):
        """
        Fetch a run by ``run_id``, restricted to sites of the caller's account.

        Raises:
            AutomationRun.DoesNotExist: when the run does not exist or belongs
                to a site of another account.
        """
        return AutomationRun.objects.get(
            run_id=run_id,
            site__account=request.user.account,
        )

    @staticmethod
    def _serialize_config(config):
        """Return the JSON-ready representation of an AutomationConfig."""
        return {
            'is_enabled': config.is_enabled,
            'frequency': config.frequency,
            'scheduled_time': str(config.scheduled_time),
            'stage_1_batch_size': config.stage_1_batch_size,
            'stage_2_batch_size': config.stage_2_batch_size,
            'stage_3_batch_size': config.stage_3_batch_size,
            'stage_4_batch_size': config.stage_4_batch_size,
            'stage_5_batch_size': config.stage_5_batch_size,
            'stage_6_batch_size': config.stage_6_batch_size,
            'within_stage_delay': config.within_stage_delay,
            'between_stage_delay': config.between_stage_delay,
            'last_run_at': config.last_run_at,
            'next_run_at': config.next_run_at,
        }

    # ------------------------------------------------------------------
    # Configuration
    # ------------------------------------------------------------------

    @extend_schema(tags=['Automation'])
    @action(detail=False, methods=['get'])
    def config(self, request):
        """
        GET /api/v1/automation/config/?site_id=123

        Get automation configuration for site.
        A configuration row with default values is created on first access.
        """
        site, error_response = self._get_site(request)
        if error_response:
            return error_response

        config, _ = AutomationConfig.objects.get_or_create(
            account=site.account,
            site=site,
            defaults={
                'is_enabled': False,
                'frequency': 'daily',
                'scheduled_time': '02:00',
                'within_stage_delay': 3,
                'between_stage_delay': 5,
            }
        )

        return Response(self._serialize_config(config))

    @extend_schema(tags=['Automation'])
    @action(detail=False, methods=['put'])
    def update_config(self, request):
        """
        PUT /api/v1/automation/update_config/?site_id=123

        Update automation configuration. Only keys present in the body are changed.

        Body:
        {
            "is_enabled": true,
            "frequency": "daily",
            "scheduled_time": "02:00",
            "stage_1_batch_size": 50,
            ...
        }
        """
        site, error_response = self._get_site(request)
        if error_response:
            return error_response

        config, _ = AutomationConfig.objects.get_or_create(
            account=site.account,
            site=site
        )

        # Update fields
        for field in _CONFIG_PASSTHROUGH_FIELDS:
            if field in request.data:
                setattr(config, field, request.data[field])

        # Delay settings: best-effort. A value that is not an integer keeps the
        # stored setting instead of failing the whole update.
        for field in _CONFIG_DELAY_FIELDS:
            if field not in request.data:
                continue
            try:
                setattr(config, field, int(request.data[field]))
            except (TypeError, ValueError):
                logger.warning(
                    "Ignoring non-integer %s=%r for site %s",
                    field, request.data[field], site.id,
                )

        config.save()

        return Response({
            'message': 'Config updated',
            **self._serialize_config(config),
        })

    # ------------------------------------------------------------------
    # Run lifecycle
    # ------------------------------------------------------------------

    @extend_schema(tags=['Automation'])
    @action(detail=False, methods=['post'])
    def run_now(self, request):
        """
        POST /api/v1/automation/run_now/?site_id=123

        Trigger automation run immediately.
        A ValueError raised while starting is reported as 400; any other
        failure is logged and reported as 500.
        """
        site, error_response = self._get_site(request)
        if error_response:
            return error_response

        try:
            service = AutomationService(site.account, site)
            run_id = service.start_automation(trigger_type='manual')

            # Start async processing
            from igny8_core.business.automation.tasks import run_automation_task
            run_automation_task.delay(run_id)

            return Response({
                'run_id': run_id,
                'message': 'Automation started'
            })
        except ValueError as e:
            return _error_response(str(e), status.HTTP_400_BAD_REQUEST)
        except Exception as e:
            logger.exception("Failed to start automation for site %s", site.id)
            return _error_response(
                f'Failed to start automation: {str(e)}',
                status.HTTP_500_INTERNAL_SERVER_ERROR,
            )

    @extend_schema(tags=['Automation'])
    @action(detail=False, methods=['get'])
    def current_run(self, request):
        """
        GET /api/v1/automation/current_run/?site_id=123

        Get current automation run status: the most recently started run of
        the site whose status is 'running' or 'paused', or ``{'run': None}``.
        """
        site, error_response = self._get_site(request)
        if error_response:
            return error_response

        run = AutomationRun.objects.filter(
            site=site,
            status__in=['running', 'paused']
        ).order_by('-started_at').first()

        if not run:
            return Response({'run': None})

        return Response({
            'run': {
                'run_id': run.run_id,
                'status': run.status,
                'current_stage': run.current_stage,
                'trigger_type': run.trigger_type,
                'started_at': run.started_at,
                'total_credits_used': run.total_credits_used,
                'stage_1_result': run.stage_1_result,
                'stage_2_result': run.stage_2_result,
                'stage_3_result': run.stage_3_result,
                'stage_4_result': run.stage_4_result,
                'stage_5_result': run.stage_5_result,
                'stage_6_result': run.stage_6_result,
                'stage_7_result': run.stage_7_result,
            }
        })

    @extend_schema(tags=['Automation'])
    @action(detail=False, methods=['post'])
    def pause(self, request):
        """
        POST /api/v1/automation/pause/?run_id=abc123

        Pause automation run. The run must belong to a site of the caller's
        account; otherwise 404 is returned.
        """
        run_id = request.query_params.get('run_id')
        if not run_id:
            return _error_response('run_id required', status.HTTP_400_BAD_REQUEST)

        try:
            # Ownership check: a run id alone must not grant access to
            # another account's run.
            self._get_account_run(request, run_id)
            service = AutomationService.from_run_id(run_id)
            service.pause_automation()
            return Response({'message': 'Automation paused'})
        except AutomationRun.DoesNotExist:
            return _error_response('Run not found', status.HTTP_404_NOT_FOUND)

    @extend_schema(tags=['Automation'])
    @action(detail=False, methods=['post'])
    def resume(self, request):
        """
        POST /api/v1/automation/resume/?run_id=abc123

        Resume paused automation run. The run must belong to a site of the
        caller's account; otherwise 404 is returned.
        """
        run_id = request.query_params.get('run_id')
        if not run_id:
            return _error_response('run_id required', status.HTTP_400_BAD_REQUEST)

        try:
            # Ownership check: a run id alone must not grant access to
            # another account's run.
            self._get_account_run(request, run_id)
            service = AutomationService.from_run_id(run_id)
            service.resume_automation()

            # Resume async processing
            from igny8_core.business.automation.tasks import resume_automation_task
            resume_automation_task.delay(run_id)

            return Response({'message': 'Automation resumed'})
        except AutomationRun.DoesNotExist:
            return _error_response('Run not found', status.HTTP_404_NOT_FOUND)

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    @extend_schema(tags=['Automation'])
    @action(detail=False, methods=['get'])
    def history(self, request):
        """
        GET /api/v1/automation/history/?site_id=123

        Get automation run history: the 20 most recently started runs of the site.
        """
        site, error_response = self._get_site(request)
        if error_response:
            return error_response

        runs = AutomationRun.objects.filter(
            site=site
        ).order_by('-started_at')[:20]

        return Response({
            'runs': [
                {
                    'run_id': run.run_id,
                    'status': run.status,
                    'trigger_type': run.trigger_type,
                    'started_at': run.started_at,
                    'completed_at': run.completed_at,
                    'total_credits_used': run.total_credits_used,
                    'current_stage': run.current_stage,
                }
                for run in runs
            ]
        })

    @extend_schema(tags=['Automation'])
    @action(detail=False, methods=['get'])
    def logs(self, request):
        """
        GET /api/v1/automation/logs/?run_id=abc123&lines=100

        Get automation run logs. ``lines`` defaults to 100 and must be an
        integer (400 otherwise). The run must belong to a site of the caller's
        account; otherwise 404 is returned.
        """
        run_id = request.query_params.get('run_id')
        if not run_id:
            return _error_response('run_id required', status.HTTP_400_BAD_REQUEST)

        try:
            lines = int(request.query_params.get('lines', 100))
        except (TypeError, ValueError):
            return _error_response('lines must be an integer', status.HTTP_400_BAD_REQUEST)

        try:
            run = self._get_account_run(request, run_id)
        except AutomationRun.DoesNotExist:
            return _error_response('Run not found', status.HTTP_404_NOT_FOUND)

        service = AutomationService(run.account, run.site)
        log_text = service.logger.get_activity_log(
            run.account.id, run.site.id, run_id, lines
        )

        return Response({
            'run_id': run_id,
            'log': log_text
        })

    @extend_schema(tags=['Automation'])
    @action(detail=False, methods=['get'])
    def estimate(self, request):
        """
        GET /api/v1/automation/estimate/?site_id=123

        Estimate credits needed for automation.
        ``sufficient`` is true when the account balance covers the estimate
        multiplied by 1.2.
        """
        site, error_response = self._get_site(request)
        if error_response:
            return error_response

        service = AutomationService(site.account, site)
        estimated_credits = service.estimate_credits()

        return Response({
            'estimated_credits': estimated_credits,
            'current_balance': site.account.credits,
            'sufficient': site.account.credits >= (estimated_credits * 1.2)
        })

    @extend_schema(tags=['Automation'])
    @action(detail=False, methods=['get'])
    def pipeline_overview(self, request):
        """
        GET /api/v1/automation/pipeline_overview/?site_id=123

        Get pipeline overview with pending counts for all stages.
        Each stage entry carries its pending count, a per-status breakdown
        (``counts``) and the overall ``total`` for the underlying model.
        """
        site, error_response = self._get_site(request)
        if error_response:
            return error_response

        # Kept as function-level imports, as in the original code.
        # NOTE(review): presumably this avoids an import cycle; confirm before hoisting.
        from igny8_core.business.planning.models import Keywords, Clusters, ContentIdeas
        from igny8_core.business.content.models import Tasks, Content, Images
        from django.db.models import Count

        def _counts_by_status(model, extra_filter=None, exclude_filter=None):
            """Return a dict of counts keyed by status and the total for a given model and site."""
            qs = model.objects.filter(site=site)
            if extra_filter:
                qs = qs.filter(**extra_filter)
            if exclude_filter:
                qs = qs.exclude(**exclude_filter)

            # Group by status when available
            try:
                rows = qs.values('status').annotate(count=Count('id'))
                counts = {r['status']: r['count'] for r in rows}
                total = sum(counts.values())
            except Exception:
                # Fallback: count all (deliberate best-effort when grouping fails)
                total = qs.count()
                counts = {'total': total}

            return counts, total

        # Stage 1: Keywords pending clustering (keep previous "pending" semantics
        # but also return status breakdown)
        stage_1_counts, stage_1_total = _counts_by_status(
            Keywords,
            extra_filter={'disabled': False}
        )
        # pending definition used by the UI previously (new & not clustered)
        stage_1_pending = Keywords.objects.filter(
            site=site,
            status='new',
            cluster__isnull=True,
            disabled=False
        ).count()

        # Stage 2: Clusters needing ideas
        stage_2_counts, stage_2_total = _counts_by_status(
            Clusters,
            extra_filter={'disabled': False}
        )
        stage_2_pending = Clusters.objects.filter(
            site=site,
            status='new',
            disabled=False
        ).exclude(
            ideas__isnull=False
        ).count()

        # Stage 3: Ideas ready to queue
        stage_3_counts, stage_3_total = _counts_by_status(ContentIdeas)
        stage_3_pending = ContentIdeas.objects.filter(
            site=site,
            status='new'
        ).count()

        # Stage 4: Tasks ready for content generation
        stage_4_counts, stage_4_total = _counts_by_status(Tasks)
        stage_4_pending = Tasks.objects.filter(
            site=site,
            status='queued'
        ).count()

        # Stage 5: Content ready for image prompts
        # Counts per content status, plus pending as before (draft with 0 images)
        stage_5_counts, stage_5_total = _counts_by_status(Content)
        stage_5_pending = Content.objects.filter(
            site=site,
            status='draft'
        ).annotate(
            images_count=Count('images')
        ).filter(
            images_count=0
        ).count()

        # Stage 6: Image prompts ready for generation
        stage_6_counts, stage_6_total = _counts_by_status(Images)
        stage_6_pending = Images.objects.filter(
            site=site,
            status='pending'
        ).count()

        # Stage 7: Content ready for review
        # Same Content breakdown as stage 5, so reuse it instead of repeating
        # the identical query; keep the previous "review" pending count.
        stage_7_counts, stage_7_total = dict(stage_5_counts), stage_5_total
        stage_7_ready = Content.objects.filter(
            site=site,
            status='review'
        ).count()

        stage_rows = [
            (1, 'Keywords → Clusters', stage_1_pending, 'AI', stage_1_counts, stage_1_total),
            (2, 'Clusters → Ideas', stage_2_pending, 'AI', stage_2_counts, stage_2_total),
            (3, 'Ideas → Tasks', stage_3_pending, 'Local', stage_3_counts, stage_3_total),
            (4, 'Tasks → Content', stage_4_pending, 'AI', stage_4_counts, stage_4_total),
            (5, 'Content → Image Prompts', stage_5_pending, 'AI', stage_5_counts, stage_5_total),
            (6, 'Image Prompts → Images', stage_6_pending, 'AI', stage_6_counts, stage_6_total),
            (7, 'Manual Review Gate', stage_7_ready, 'Manual', stage_7_counts, stage_7_total),
        ]

        return Response({
            'stages': [
                {
                    'number': number,
                    'name': name,
                    'pending': pending,
                    'type': stage_type,
                    'counts': counts,
                    'total': total
                }
                for number, name, pending, stage_type, counts, total in stage_rows
            ]
        })

    # ------------------------------------------------------------------
    # Site-scoped run control
    # ------------------------------------------------------------------

    @extend_schema(tags=['Automation'])
    @action(detail=False, methods=['get'], url_path='current_processing')
    def current_processing(self, request):
        """
        GET /api/v1/automation/current_processing/?site_id=123&run_id=abc

        Get current processing state for active automation run.
        Returns ``{'data': None}`` when the run is not in status 'running'.
        """
        # Site lookup stays outside the try block so that a missing site is
        # reported as 404 instead of being caught by the generic handler.
        site, run_id, error_response = self._get_site_and_run_id(request)
        if error_response:
            return error_response

        try:
            # Get the run
            run = AutomationRun.objects.get(run_id=run_id, site=site)

            # If not running, return None
            if run.status != 'running':
                return Response({'data': None})

            # Get current processing state
            service = AutomationService.from_run_id(run_id)
            state = service.get_current_processing_state()

            return Response({'data': state})
        except AutomationRun.DoesNotExist:
            return _error_response('Run not found', status.HTTP_404_NOT_FOUND)
        except Exception as e:
            logger.exception("Failed to read processing state for run %s", run_id)
            return _error_response(str(e), status.HTTP_500_INTERNAL_SERVER_ERROR)

    # NOTE(review): this action and `pause` above both register url_path
    # 'pause' for POST, so only one of them is likely reachable through a
    # router-generated URL conf; confirm which one and retire the other.
    @extend_schema(tags=['Automation'])
    @action(detail=False, methods=['post'], url_path='pause')
    def pause_automation(self, request):
        """
        POST /api/v1/automation/pause/?site_id=123&run_id=abc

        Pause current automation run.
        Will complete current queue item then pause before next item.
        Only a run in status 'running' can be paused (400 otherwise).
        """
        site, run_id, error_response = self._get_site_and_run_id(request)
        if error_response:
            return error_response

        try:
            run = AutomationRun.objects.get(run_id=run_id, site=site)

            if run.status != 'running':
                return _error_response(
                    f'Cannot pause automation with status: {run.status}',
                    status.HTTP_400_BAD_REQUEST,
                )

            # Update status to paused
            run.status = 'paused'
            run.paused_at = timezone.now()
            run.save(update_fields=['status', 'paused_at'])

            return Response({
                'message': 'Automation paused',
                'status': run.status,
                'paused_at': run.paused_at
            })
        except AutomationRun.DoesNotExist:
            return _error_response('Run not found', status.HTTP_404_NOT_FOUND)
        except Exception as e:
            logger.exception("Failed to pause run %s", run_id)
            return _error_response(str(e), status.HTTP_500_INTERNAL_SERVER_ERROR)

    # NOTE(review): this action and `resume` above both register url_path
    # 'resume' for POST, so only one of them is likely reachable through a
    # router-generated URL conf; confirm which one and retire the other.
    @extend_schema(tags=['Automation'])
    @action(detail=False, methods=['post'], url_path='resume')
    def resume_automation(self, request):
        """
        POST /api/v1/automation/resume/?site_id=123&run_id=abc

        Resume paused automation run.
        Will continue from next queue item in current stage.
        Only a run in status 'paused' can be resumed (400 otherwise).
        """
        site, run_id, error_response = self._get_site_and_run_id(request)
        if error_response:
            return error_response

        try:
            run = AutomationRun.objects.get(run_id=run_id, site=site)

            if run.status != 'paused':
                return _error_response(
                    f'Cannot resume automation with status: {run.status}',
                    status.HTTP_400_BAD_REQUEST,
                )

            # Update status to running
            run.status = 'running'
            run.resumed_at = timezone.now()
            run.save(update_fields=['status', 'resumed_at'])

            # Queue continuation task
            from igny8_core.business.automation.tasks import continue_automation_task
            continue_automation_task.delay(run_id)

            return Response({
                'message': 'Automation resumed',
                'status': run.status,
                'resumed_at': run.resumed_at
            })
        except AutomationRun.DoesNotExist:
            return _error_response('Run not found', status.HTTP_404_NOT_FOUND)
        except Exception as e:
            logger.exception("Failed to resume run %s", run_id)
            return _error_response(str(e), status.HTTP_500_INTERNAL_SERVER_ERROR)

    @extend_schema(tags=['Automation'])
    @action(detail=False, methods=['post'], url_path='cancel')
    def cancel_automation(self, request):
        """
        POST /api/v1/automation/cancel/?site_id=123&run_id=abc

        Cancel current automation run.
        Will complete current queue item then stop permanently.
        Only a run in status 'running' or 'paused' can be cancelled (400 otherwise).
        """
        site, run_id, error_response = self._get_site_and_run_id(request)
        if error_response:
            return error_response

        try:
            run = AutomationRun.objects.get(run_id=run_id, site=site)

            if run.status not in ['running', 'paused']:
                return _error_response(
                    f'Cannot cancel automation with status: {run.status}',
                    status.HTTP_400_BAD_REQUEST,
                )

            # Update status to cancelled; one timestamp for both fields so
            # they describe the same instant.
            now = timezone.now()
            run.status = 'cancelled'
            run.cancelled_at = now
            run.completed_at = now
            run.save(update_fields=['status', 'cancelled_at', 'completed_at'])

            return Response({
                'message': 'Automation cancelled',
                'status': run.status,
                'cancelled_at': run.cancelled_at
            })
        except AutomationRun.DoesNotExist:
            return _error_response('Run not found', status.HTTP_404_NOT_FOUND)
        except Exception as e:
            logger.exception("Failed to cancel run %s", run_id)
            return _error_response(str(e), status.HTTP_500_INTERNAL_SERVER_ERROR)