# AI Automation Pipeline - Implementation Blueprint

**Date:** December 3, 2025
**Purpose:** Site-level automation orchestrating existing AI functions into a sequential 7-stage pipeline

---

## 🎯 EXECUTIVE SUMMARY

### What We're Building

A **site-level automation page** (`/automation`) that orchestrates 5 existing AI functions and 1 local function into a **strictly sequential** 7-stage pipeline (Stage 7 is a manual review gate), providing hands-free content generation from keywords to draft content ready for review.

### Core Principles

✅ **Zero Duplication** - Reuse all existing AI functions (auto_cluster, generate_ideas, generate_content, generate_image_prompts, generate_images)
✅ **Strictly Sequential** - Stage N+1 starts ONLY when Stage N is 100% complete
✅ **Batch Processing** - Within each stage, process items in configurable batches with visible queues
✅ **Site-Level Scope** - NO sector filtering; operates on the entire site's data
✅ **Observable** - Real-time batch progress, detailed queue counts, stage-by-stage logs
✅ **Safe Execution** - Distributed locks, credit reservations, idempotent stages, Celery task chaining

### Sequential Stage Execution

- **Stage completes** → Trigger next stage automatically
- **Within stage** → Process batches sequentially until the queue is empty
- **Between stages** → Hard stop, verify completion, then proceed
- **Never parallel** - Only 1 stage active at a time per site

### Automation Stops Before Publishing

- **Stage 7 (Manual Review Gate)** - Automation ends when content is in draft with images ready
- User manually reviews and publishes via existing bulk actions
- No automated publishing to WordPress (human oversight required)

---

## 🏗️ SCOPE & DATA MODEL

### Site-Level Operation (NO Sector)

```
┌─────────────────────────────────────┐
│ Site: "example.com"                 │
│ ├─ Keywords (ALL sectors combined)  │
│ ├─ Clusters (ALL sectors combined)  │
│ ├─ Ideas (ALL sectors combined)     │
│ └─ Tasks (ALL sectors combined)     │
└─────────────────────────────────────┘

UI: Only Site selector at top
Database queries: Filter by site_id only
No sector dropdown in automation page
```

### Why Site-Level?

- **Simplicity** - User manages automation per website, not per topic
- **Unified Progress** - See the total content pipeline for the entire site
- **Flexible Sectors** - Content can span multiple sectors naturally
- **Easier Scheduling** - One automation config per site

---

## 📋 EXISTING AI FUNCTIONS (Reused, Not Duplicated)

| Function | File | Input | Output | Credits | Already Works |
|----------|------|-------|--------|---------|---------------|
| **auto_cluster** | `ai/functions/auto_cluster.py` | Keyword IDs (max 20) | Clusters created | 1 per 5 keywords | ✅ Yes |
| **generate_ideas** | `ai/functions/generate_ideas.py` | Cluster IDs (max 5) | Ideas created | 2 per cluster | ✅ Yes |
| **generate_content** | `ai/functions/generate_content.py` | Task IDs (1 at a time) | Content draft | 1 per 500 words | ✅ Yes |
| **generate_image_prompts** | `ai/functions/generate_image_prompts.py` | Content IDs | Image prompts | 0.5 per prompt | ✅ Yes |
| **generate_images** | `ai/functions/generate_images.py` | Image prompt IDs | Generated images | 1-4 per image | ✅ Yes |
| **bulk_queue_to_writer** | `modules/planner/views.py#L1084` | Idea IDs | Tasks created | 0 (local) | ✅ Yes |

**All AI functions already:**
- Have async Celery tasks
- Return task_id for progress tracking
- Deduct credits automatically
- Update model statuses (new → mapped → queued → completed)
- Handle errors gracefully

---

## 🏗️ NEW COMPONENTS TO BUILD

### Phase 1: Backend Infrastructure

#### 1.1 Database Models

**File:** `backend/igny8_core/business/automation/models.py`

```python
import datetime

from django.db import models

# SiteSectorBaseModel: existing project base model (import path not shown in this plan);
# assumed to provide the account/site/sector fields used throughout.


class AutomationRun(SiteSectorBaseModel):
    """Track each automation run"""
    run_id = models.CharField(max_length=100, unique=True, db_index=True)
    # Format: run_20251203_140523_manual or run_20251204_020000_scheduled
    trigger_type = models.CharField(max_length=20, choices=[
        ('manual', 'Manual'),
        ('scheduled', 'Scheduled')
    ])
    status = models.CharField(max_length=20, choices=[
        ('running', 'Running'),
        ('paused', 'Paused'),
        ('completed', 'Completed'),
        ('failed', 'Failed')
    ], default='running')
    current_stage = models.IntegerField(default=1)  # 1-7
    started_at = models.DateTimeField(auto_now_add=True)
    completed_at = models.DateTimeField(null=True, blank=True)
    total_credits_used = models.IntegerField(default=0)

    # Stage results (JSON)
    stage_1_result = models.JSONField(default=dict, blank=True)  # {clusters_created: 8, keywords_processed: 47}
    stage_2_result = models.JSONField(default=dict, blank=True)  # {ideas_created: 56}
    stage_3_result = models.JSONField(default=dict, blank=True)  # {tasks_created: 56}
    stage_4_result = models.JSONField(default=dict, blank=True)  # {content_created: 56}
    stage_5_result = models.JSONField(default=dict, blank=True)  # {prompts_created: 224}
    stage_6_result = models.JSONField(default=dict, blank=True)  # {images_created: 224}
    stage_7_result = models.JSONField(default=dict, blank=True)  # {ready_for_review: 56}

    error_message = models.TextField(blank=True, null=True)

    class Meta:
        ordering = ['-started_at']
        indexes = [
            models.Index(fields=['run_id']),
            models.Index(fields=['site', 'sector', '-started_at']),
        ]


class AutomationConfig(SiteSectorBaseModel):
    """Store automation schedule and settings per site/sector"""
    is_enabled = models.BooleanField(default=False)

    # Schedule
    frequency = models.CharField(max_length=20, choices=[
        ('daily', 'Daily'),
        ('weekly', 'Weekly'),
        ('monthly', 'Monthly')
    ], default='daily')
    # A time object (not the string '02:00') so unsaved instances expose .hour/.minute
    scheduled_time = models.TimeField(default=datetime.time(2, 0))  # 2:00 AM

    # Batch sizes (sensible defaults from plan)
    stage_1_batch_size = models.IntegerField(default=20)  # Keywords per batch
    stage_2_batch_size = models.IntegerField(default=1)  # Clusters at a time
    stage_3_batch_size = models.IntegerField(default=20)  # Ideas per batch
    stage_4_batch_size = models.IntegerField(default=1)  # Tasks (sequential)
    stage_5_batch_size = models.IntegerField(default=1)  # Content at a time
    stage_6_batch_size = models.IntegerField(default=1)  # Images (auto-handled)

    last_run_at = models.DateTimeField(null=True, blank=True)
    next_run_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        unique_together = [['site', 'sector']]
```

#### 1.2 Logging Service

**File:** `backend/igny8_core/business/automation/services/automation_logger.py`

```python
import os
from datetime import datetime
from django.conf import settings


class AutomationLogger:
    """File-based logging for automation runs"""

    def __init__(self):
        self.base_path = os.path.join(settings.BASE_DIR, 'logs', 'automation')

    def start_run(self, account_id, site_id, sector_id, trigger_type):
        """Create run_id and log directory"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        run_id = f"run_{timestamp}_{trigger_type}"

        # Create directory: logs/automation/{account_id}/{site_id}/{sector_id}/{run_id}/
        log_dir = os.path.join(
            self.base_path, str(account_id), str(site_id), str(sector_id), run_id
        )
        os.makedirs(log_dir, exist_ok=True)

        # Create main log file
        main_log = os.path.join(log_dir, 'automation_run.log')
        with open(main_log, 'w') as f:
            f.write(f"{'='*60}\n")
            f.write(f"AUTOMATION RUN: {run_id}\n")
            f.write(f"Started: {datetime.now()}\n")
            f.write(f"Trigger: {trigger_type}\n")
            f.write(f"{'='*60}\n\n")

        return run_id

    def log_stage_start(self, run_id, stage_number, stage_name, pending_count,
                        account_id, site_id, sector_id):
        """Log start of a stage"""
        stage_file = self._get_stage_file(run_id, stage_number, account_id, site_id, sector_id)
        timestamp = datetime.now().strftime('%H:%M:%S')

        with open(stage_file, 'a') as f:
            f.write(f"\n{'='*60}\n")
            f.write(f"STAGE {stage_number}: {stage_name}\n")
            f.write(f"Started: {datetime.now()}\n")
            f.write(f"{'='*60}\n\n")
            f.write(f"{timestamp} - Found {pending_count} items to process\n")

    def log_stage_progress(self, run_id, stage_number, message,
                           account_id, site_id, sector_id):
        """Log progress within a stage"""
        stage_file = self._get_stage_file(run_id, stage_number,
                                          account_id, site_id, sector_id)
        timestamp = datetime.now().strftime('%H:%M:%S')

        with open(stage_file, 'a') as f:
            f.write(f"{timestamp} - {message}\n")

        # Also append to main log
        main_file = self._get_main_log(run_id, account_id, site_id, sector_id)
        with open(main_file, 'a') as f:
            f.write(f"{timestamp} - Stage {stage_number}: {message}\n")

    def log_stage_complete(self, run_id, stage_number, stage_name, processed_count,
                           time_elapsed, credits_used, account_id, site_id, sector_id):
        """Log completion of a stage"""
        stage_file = self._get_stage_file(run_id, stage_number, account_id, site_id, sector_id)

        with open(stage_file, 'a') as f:
            f.write(f"\n{'='*60}\n")
            f.write(f"STAGE {stage_number} COMPLETE\n")
            f.write(f"Total Time: {time_elapsed}\n")
            f.write(f"Processed: {processed_count} items\n")
            f.write(f"Credits Used: {credits_used}\n")
            f.write(f"{'='*60}\n")

    def log_stage_error(self, run_id, stage_number, error_message,
                        account_id, site_id, sector_id):
        """Log error in a stage"""
        stage_file = self._get_stage_file(run_id, stage_number, account_id, site_id, sector_id)
        timestamp = datetime.now().strftime('%H:%M:%S')

        with open(stage_file, 'a') as f:
            f.write(f"\n{timestamp} - ERROR: {error_message}\n")

        # Also append to main log so the error shows up in get_activity_log()
        main_file = self._get_main_log(run_id, account_id, site_id, sector_id)
        with open(main_file, 'a') as f:
            f.write(f"{timestamp} - Stage {stage_number} ERROR: {error_message}\n")

    def get_activity_log(self, run_id, account_id, site_id, sector_id, last_n=50):
        """Get last N lines from main log"""
        main_file = self._get_main_log(run_id, account_id, site_id, sector_id)

        if not os.path.exists(main_file):
            return []

        with open(main_file, 'r') as f:
            lines = f.readlines()

        return lines[-last_n:]  # Last N lines (default 50)

    def _get_stage_file(self, run_id, stage_number, account_id, site_id, sector_id):
        """Get path to stage log file"""
        log_dir = os.path.join(
            self.base_path, str(account_id), str(site_id), str(sector_id), run_id
        )
        return os.path.join(log_dir, f"stage_{stage_number}.log")

    def _get_main_log(self, run_id, account_id, site_id, sector_id):
        """Get path to main log file"""
        log_dir = os.path.join(
            self.base_path, str(account_id), str(site_id), str(sector_id), run_id
        )
        return os.path.join(log_dir, 'automation_run.log')
```

#### 1.3 Automation Service (Core Orchestrator)

**File:** `backend/igny8_core/business/automation/services/automation_service.py`

```python
import time
from datetime import datetime, timedelta
from django.utils import timezone
from igny8_core.business.automation.models import AutomationRun, AutomationConfig
from igny8_core.business.automation.services.automation_logger import AutomationLogger
from igny8_core.business.planning.models import Keywords, Clusters, ContentIdeas
from igny8_core.business.content.models import Tasks, Content, Images
from igny8_core.business.billing.services.credit_service import CreditService

# Import existing services (NO DUPLICATION)
from igny8_core.business.planning.services.clustering_service import ClusteringService
from igny8_core.business.planning.services.ideas_service import IdeasService
from igny8_core.ai.functions.generate_content import GenerateContentFunction
from igny8_core.ai.functions.generate_image_prompts import GenerateImagePromptsFunction
from igny8_core.ai.functions.generate_images import GenerateImagesFunction


class AutomationService:
    """
    Orchestrates the 7-stage automation pipeline.
    Reuses all existing AI functions - zero duplication.
    """

    def __init__(self, account, site, sector):
        self.account = account
        self.site = site
        self.sector = sector
        self.logger = AutomationLogger()
        self.credit_service = CreditService()

        # Existing services
        self.clustering_service = ClusteringService()
        self.ideas_service = IdeasService()
        self.content_function = GenerateContentFunction()
        self.prompts_function = GenerateImagePromptsFunction()
        self.images_function = GenerateImagesFunction()

        self.run = None
        self.config = None

    def start_automation(self, trigger_type='manual'):
        """
        Main entry point for automation.
        Creates run record, executes stages sequentially.
        """
        try:
            # Create run record
            run_id = self.logger.start_run(
                self.account.id, self.site.id, self.sector.id, trigger_type
            )
            self.run = AutomationRun.objects.create(
                run_id=run_id,
                trigger_type=trigger_type,
                account=self.account,
                site=self.site,
                sector=self.sector,
                status='running',
                current_stage=1
            )

            # Load config (for batch sizes), creating a default one if none exists
            self.config, _ = AutomationConfig.objects.get_or_create(
                site=self.site,
                sector=self.sector,
                defaults={'account': self.account}
            )

            # Execute stages sequentially
            self.run_stage_1()  # Keywords → Clusters
            self.run_stage_2()  # Clusters → Ideas
            self.run_stage_3()  # Ideas → Tasks
            self.run_stage_4()  # Tasks → Content
            self.run_stage_5()  # Content → Image Prompts
            self.run_stage_6()  # Image Prompts → Images
            self.run_stage_7()  # Manual Review Gate

            # Mark complete
            self.run.status = 'completed'
            self.run.completed_at = timezone.now()
            self.run.save()

            return {
                'success': True,
                'run_id': run_id,
                'message': 'Automation completed successfully'
            }

        except Exception as e:
            if self.run:
                self.run.status = 'failed'
                self.run.error_message = str(e)
                self.run.save()
                # Record the failure in the stage log and the activity feed
                self.logger.log_stage_error(
                    self.run.run_id, self.run.current_stage, str(e),
                    self.account.id, self.site.id, self.sector.id
                )
            return {
                'success': False,
                'error': str(e)
            }

    def run_stage_1(self):
        """Stage 1: Keywords (status='new') → Clusters (AI)"""
        stage_start = time.time()
        stage_number = 1
        stage_name = "Keywords → Clusters (AI)"

        # Find pending keywords (status='new', cluster_id=null)
        pending_keywords = Keywords.objects.filter(
            site=self.site,
            sector=self.sector,
            status='new',
            cluster__isnull=True,
            disabled=False
        )

        total_count = pending_keywords.count()

        if total_count == 0:
            self.logger.log_stage_progress(
                self.run.run_id, stage_number,
                "No pending keywords found - skipping stage",
                self.account.id, self.site.id, self.sector.id
            )
            self.run.current_stage = 2
            self.run.save()
            return

        self.logger.log_stage_start(
            self.run.run_id, stage_number, stage_name, total_count,
            self.account.id, self.site.id, self.sector.id
        )

        # Process in batches (default 20 per batch)
        batch_size = self.config.stage_1_batch_size
        keyword_ids = list(pending_keywords.values_list('id', flat=True))
        total_batches = (len(keyword_ids) + batch_size - 1) // batch_size  # ceiling division

        clusters_created = 0
        keywords_processed = 0
        credits_used = 0

        for i in range(0, len(keyword_ids), batch_size):
            batch = keyword_ids[i:i + batch_size]
            batch_num = (i // batch_size) + 1

            self.logger.log_stage_progress(
                self.run.run_id, stage_number,
                f"Processing batch {batch_num}/{total_batches} ({len(batch)} keywords)",
                self.account.id, self.site.id, self.sector.id
            )

            # Call existing ClusteringService (REUSE - NO DUPLICATION)
            result = self.clustering_service.cluster_keywords(
                keyword_ids=batch,
                account=self.account,
                sector_id=self.sector.id
            )

            if result.get('success'):
                clusters_created += result.get('clusters_created', 0)
                keywords_processed += len(batch)
                credits_used += result.get('credits_used', 0)

                self.logger.log_stage_progress(
                    self.run.run_id, stage_number,
                    f"Batch {batch_num} complete: {result.get('clusters_created', 0)} clusters created",
                    self.account.id, self.site.id, self.sector.id
                )
            else:
                # Log failed batches instead of silently skipping them
                self.logger.log_stage_error(
                    self.run.run_id, stage_number,
                    f"Batch {batch_num} failed: {result.get('error', 'unknown error')}",
                    self.account.id, self.site.id, self.sector.id
                )

        # Save stage result
        elapsed = time.time() - stage_start
        self.run.stage_1_result = {
            'clusters_created': clusters_created,
            'keywords_processed': keywords_processed
        }
        self.run.total_credits_used += credits_used
        self.run.current_stage = 2
        self.run.save()

        self.logger.log_stage_complete(
            self.run.run_id, stage_number, stage_name,
            keywords_processed, f"{elapsed:.0f}s", credits_used,
            self.account.id, self.site.id, self.sector.id
        )

    def run_stage_2(self):
        """Stage 2: Clusters (status='new', no ideas) → Ideas (AI)"""
        # Similar structure to stage_1
        # Calls existing IdeasService.generate_ideas()
        pass

    def run_stage_3(self):
        """Stage 3: Ideas (status='new') → Tasks (Local queue)"""
        # Calls existing bulk_queue_to_writer endpoint logic
        pass

    def run_stage_4(self):
        """Stage 4: Tasks (status='queued') → Content (AI)"""
        # Calls existing GenerateContentFunction
        # Process one task at a time (sequential)
        pass

    def run_stage_5(self):
        """Stage 5: Content (draft) → Image Prompts (AI)"""
        # Calls existing GenerateImagePromptsFunction
        pass

    def run_stage_6(self):
        """Stage 6: Image Prompts (pending) → Images (AI)"""
        # Calls existing GenerateImagesFunction
        # Handles batching automatically
        pass

    def run_stage_7(self):
        """Stage 7: Manual Review Gate (STOP)"""
        # Just count content ready for review
        # Log final status
        # Automation ends here
        pass
```

---

### Phase 2: API Endpoints

#### 2.1 Automation ViewSet

**File:** `backend/igny8_core/modules/automation/views.py` (NEW MODULE)

```python
from datetime import timedelta

from django.utils import timezone
from rest_framework import viewsets
from rest_framework.decorators import action
from igny8_core.api.base import SiteSectorModelViewSet
from igny8_core.api.response import success_response, error_response
from igny8_core.business.automation.models import AutomationRun, AutomationConfig
from igny8_core.business.automation.services.automation_service import AutomationService
from igny8_core.business.automation.services.automation_logger import AutomationLogger


class AutomationViewSet(SiteSectorModelViewSet):
    """API endpoints for automation"""

    @action(detail=False, methods=['post'])
    def run_now(self, request):
        """Trigger manual automation run"""
        account = request.account
        site = request.site
        sector = request.sector

        # Start automation. start_automation() executes every stage synchronously,
        # so this request only returns once the run has completed or failed.
        service = AutomationService(account, site, sector)
        result = service.start_automation(trigger_type='manual')

        if result['success']:
            return success_response(data={'run_id': result['run_id']})
        else:
            return error_response(message=result['error'])

    @action(detail=False, methods=['get'])
    def current_run(self, request):
        """Get current/latest automation run status"""
        site = request.site
        sector = request.sector

        run = AutomationRun.objects.filter(
            site=site,
            sector=sector
        ).order_by('-started_at').first()

        if not run:
            return success_response(data={'run': None})

        # Get activity log
        logger = AutomationLogger()
        activity = logger.get_activity_log(
            run.run_id, run.account_id, run.site_id, run.sector_id, last_n=50
        )

        return success_response(data={
            'run': {
                'run_id': run.run_id,
                'status': run.status,
                'current_stage': run.current_stage,
                'trigger_type': run.trigger_type,
                'started_at': run.started_at,
                'completed_at': run.completed_at,
                'error_message': run.error_message,
                'total_credits_used': run.total_credits_used,
                'stage_1_result': run.stage_1_result,
                'stage_2_result': run.stage_2_result,
                'stage_3_result': run.stage_3_result,
                'stage_4_result': run.stage_4_result,
                'stage_5_result': run.stage_5_result,
                'stage_6_result': run.stage_6_result,
                'stage_7_result': run.stage_7_result,
            },
            'activity_log': activity
        })

    @action(detail=False, methods=['get', 'put'])
    def config(self, request):
        """Get/Update automation configuration"""
        site = request.site
        sector = request.sector
        account = request.account

        config, _ = AutomationConfig.objects.get_or_create(
            site=site,
            sector=sector,
            defaults={'account': account}
        )

        if request.method == 'GET':
            return success_response(data={
                'is_enabled': config.is_enabled,
                'frequency': config.frequency,
                'scheduled_time': config.scheduled_time,
                'next_run_at': config.next_run_at,
                'stage_1_batch_size': config.stage_1_batch_size,
                'stage_2_batch_size': config.stage_2_batch_size,
                'stage_3_batch_size': config.stage_3_batch_size,
                'stage_4_batch_size': config.stage_4_batch_size,
                'stage_5_batch_size': config.stage_5_batch_size,
                'stage_6_batch_size': config.stage_6_batch_size,
            })

        # PUT: update fields from request (omitted fields keep their current values)
        config.is_enabled = request.data.get('is_enabled', config.is_enabled)
        config.frequency = request.data.get('frequency', config.frequency)
        config.scheduled_time = request.data.get('scheduled_time', config.scheduled_time)
        for n in range(1, 7):
            field = f'stage_{n}_batch_size'
            if field in request.data:
                setattr(config, field, int(request.data[field]))
        config.save()
        config.refresh_from_db()  # scheduled_time may arrive as a string such as "02:00"

        # run_scheduled_automation only matches next_run_at <= now, so a NULL
        # next_run_at would never fire. Schedule the next occurrence of
        # scheduled_time, using the same timezone.now() convention as the beat task.
        if config.is_enabled:
            now = timezone.now()
            next_run = now.replace(
                hour=config.scheduled_time.hour,
                minute=config.scheduled_time.minute,
                second=0,
                microsecond=0
            )
            if next_run <= now:
                next_run += timedelta(days=1)
            config.next_run_at = next_run
        else:
            config.next_run_at = None
        config.save(update_fields=['next_run_at'])

        return success_response(message='Configuration updated')
```

#### 2.2 URL Configuration

**File:** `backend/igny8_core/urls/api_urls.py` (ADD)

```python
from igny8_core.modules.automation.views import AutomationViewSet

# Add to router
router.register(r'automation', AutomationViewSet, basename='automation')
```

---

### Phase 3: Celery Scheduled Task

#### 3.1 Periodic Task for Scheduled Runs

**File:** `backend/igny8_core/tasks/automation_tasks.py` (NEW)

```python
import logging
from datetime import timedelta

from celery import shared_task
from django.utils import timezone

from igny8_core.business.automation.models import AutomationConfig
from igny8_core.business.automation.services.automation_service import AutomationService

logger = logging.getLogger(__name__)

# Interval between runs per frequency ('monthly' is approximated as 30 days)
FREQUENCY_INTERVALS = {
    'daily': timedelta(days=1),
    'weekly': timedelta(weeks=1),
    'monthly': timedelta(days=30),
}


@shared_task(name='run_scheduled_automation')
def run_scheduled_automation():
    """
    Celery beat task - runs every hour, checks if any configs need to run
    """
    now = timezone.now()

    # Find configs that:
    # 1. Are enabled
    # 2. Have next_run_at <= now
    configs = AutomationConfig.objects.filter(
        is_enabled=True,
        next_run_at__lte=now
    )

    for config in configs:
        try:
            # Load related objects
            account = config.account
            site = config.site
            sector = config.sector

            # Calculate next run time (unknown frequencies fall back to daily),
            # pinned to scheduled_time
            interval = FREQUENCY_INTERVALS.get(config.frequency, timedelta(days=1))
            next_run = (now + interval).replace(
                hour=config.scheduled_time.hour,
                minute=config.scheduled_time.minute,
                second=0,
                microsecond=0
            )

            # Persist the schedule BEFORE running: a run can outlast the hourly
            # beat interval, and a stale next_run_at could let the next tick
            # start a second run for this config
            config.last_run_at = now
            config.next_run_at = next_run
            config.save()

            # Start automation
            service = AutomationService(account, site, sector)
            service.start_automation(trigger_type='scheduled')

        except Exception:
            # Log error but continue with other configs
            logger.exception("Error running scheduled automation for config %s", config.id)
            continue
```

#### 3.2 Register Celery Beat Schedule

**File:** `backend/igny8_core/celery.py` (UPDATE)

```python
from celery.schedules import crontab  # if not already imported in celery.py

app.conf.beat_schedule = {
    # ... existing schedules ...
    'run-scheduled-automation': {
        'task': 'run_scheduled_automation',
        'schedule': crontab(minute=0),  # Every hour on the hour
    },
}
```

---

### Phase 4: Frontend Components

#### 4.1 Automation Page Component

**File:** `frontend/src/pages/Automation/Dashboard.tsx` (NEW)

```tsx
import { useState, useEffect } from 'react';
import { useInterval } from '../../hooks/useInterval';
import { automationApi } from '../../services/api/automationApi';
import { useSiteStore } from '../../stores/siteStore';
import { useSectorStore } from '../../stores/sectorStore';
import Button from '../../components/ui/button/Button';
import { BoltIcon, PlayIcon, PauseIcon, SettingsIcon } from '../../icons';
import StageCard from './components/StageCard';
import ActivityLog from './components/ActivityLog';
import ConfigModal from './components/ConfigModal';

// Shape of `data.run` returned by the current_run endpoint (mirrors AutomationRun)
type AutomationRunStatus = {
  run_id: string;
  status: 'running' | 'paused' | 'completed' | 'failed';
  current_stage: number;
  trigger_type: 'manual' | 'scheduled';
  started_at: string;
  total_credits_used: number;
} & Record<`stage_${1 | 2 | 3 | 4 | 5 | 6 | 7}_result`, Record<string, number>>;

export default function AutomationDashboard() {
  const { activeSite } = useSiteStore();
  const { activeSector } = useSectorStore();
  const [currentRun, setCurrentRun] = useState<AutomationRunStatus | null>(null);
  const [activityLog, setActivityLog] = useState<string[]>([]);
  const [config, setConfig] = useState(null);
  const [showConfigModal, setShowConfigModal] = useState(false);
  const [isRunning, setIsRunning] = useState(false);

  // Poll current run status every 3 seconds while a run is in progress
  useInterval(() => {
    if (activeSite && activeSector) {
      loadCurrentRun();
    }
  }, currentRun?.status === 'running' ?
    3000 : null);

  useEffect(() => {
    if (activeSite && activeSector) {
      loadCurrentRun();
      loadConfig();
    }
  }, [activeSite?.id, activeSector?.id]);

  const loadCurrentRun = async () => {
    const response = await automationApi.getCurrentRun();
    if (response.success) {
      setCurrentRun(response.data.run);
      setActivityLog(response.data.activity_log || []);
    }
  };

  const loadConfig = async () => {
    const response = await automationApi.getConfig();
    if (response.success) {
      setConfig(response.data);
    }
  };

  const handleRunNow = async () => {
    setIsRunning(true);
    try {
      const response = await automationApi.runNow();
      if (response.success) {
        // Refresh status; polling continues while the run reports 'running'
        await loadCurrentRun();
      }
    } finally {
      // Always clear the flag, even if the request throws
      setIsRunning(false);
    }
  };

  return (
Automated content generation from keywords to review