AI Automation Pipeline - Complete Implementation Plan
Version: 2.0
Date: December 3, 2025
Scope: Site-level automation orchestrating existing AI functions
🎯 CORE ARCHITECTURE DECISIONS
Decision 1: Site-Level Automation (NO Sector)
Rationale:
- User manages automation per website, not per topic/sector
- Simpler UX - single site selector at top of page
- Database queries filter by site_id only (no sector_id filtering)
- Content naturally spans multiple sectors within a site
- One automation schedule per site (not per site/sector combination)
Implementation:
- Remove sector dropdown from automation page UI
- AutomationRun model: Remove sector foreign key
- AutomationConfig model: One config per site (not per site+sector)
- All stage database queries use .filter(site=site) (no sector filter)
Decision 2: Single Global Automation Page
Why:
- Complete pipeline visibility in one place (Keywords → Draft Content)
- Configure one schedule for entire lifecycle
- See exactly where pipeline is stuck or running
- Cleaner UX - no jumping between module pages
Location: /automation (new route below Sites in sidebar)
Decision 3: Strictly Sequential Stages (Never Parallel)
Critical Principle:
- Stage N+1 ONLY starts when Stage N is 100% complete
- Within each stage: process items in batches sequentially
- Hard stop between stages to verify completion
- Only ONE stage active at a time per site
Example Flow:
Stage 1 starts → processes ALL batches → completes 100%
↓ (trigger next)
Stage 2 starts → processes ALL batches → completes 100%
↓ (trigger next)
Stage 3 starts → ...
Never:
- Run stages in parallel
- Start next stage while current stage has pending items
- Skip verification between stages
Decision 4: Automation Stops Before Publishing
Manual Review Gate (Stage 7):
- Automation ends when content reaches status='draft' with all images generated
- User manually reviews content quality, accuracy, brand voice
- User manually publishes via existing bulk actions on Content page
- No automated WordPress publishing (requires human oversight)
Rationale:
- Content quality control needed
- Publishing has real consequences (public-facing)
- Legal/compliance review may be required
- Brand voice verification essential
📊 EXISTING AI FUNCTIONS (Zero Duplication)
┌──────────────────────────────────────────────────────────────┐
│ 🤖 AI AUTOMATION PIPELINE │
│ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ │
│ │
│ ⏰ SCHEDULE │
│ Next Run: Tomorrow at 2:00 AM (in 16 hours) │
│ Frequency: [Daily ▼] at [02:00 ▼] │
│ Status: ● Scheduled │
│ │
│ [Run Now] [Pause Schedule] [Configure] │
│ │
├──────────────────────────────────────────────────────────────┤
│ 📊 PIPELINE OVERVIEW │
│ │
│ Keywords ──→ Clusters ──→ Ideas ──→ Tasks ──→ Content │
│ 47 8 42 20 generating │
│ pending new ready queued Stage 5 │
│ │
│ Overall Progress: ━━━━━━━╸ 62% (Stage 5/7) │
│ Estimated Completion: 2 hours 15 minutes │
│ │
└──────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────┐
│ STAGE 1: Keywords → Clusters (AI) │
│ Status: ✓ Completed │
│ • Processed: 60 keywords → 8 clusters │
│ • Time: 2m 30s | Credits: 12 │
│ [View Details] [Retry Failed] │
└──────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────┐
│ STAGE 2: Clusters → Ideas (AI) │
│ Status: ✓ Completed │
│ • Processed: 8 clusters → 56 ideas │
│ • Time: 8m 15s | Credits: 16 │
│ [View Details] │
└──────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────┐
│ STAGE 3: Ideas → Tasks (Local Queue) │
│ Status: ✓ Completed │
│ • Processed: 42 ideas → 42 tasks │
│ • Time: Instant | Credits: 0 │
│ [View Details] │
└──────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────┐
│ STAGE 4: Tasks → Content (AI) │
│ Status: ● Processing (Task 3/20) │
│ • Current: "Ultimate Coffee Bean Guide" ━━━━╸ 65% │
│ • Progress: 2 completed, 1 processing, 17 queued │
│ • Time: 45m elapsed | Credits: 38 used │
│ • ETA: 1h 30m remaining │
│ [View Details] [Pause Stage] │
└──────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────┐
│ STAGE 5: Content → Image Prompts (AI) │
│ Status: ⏸ Waiting (Stage 4 must complete) │
│ • Pending: 2 content pieces ready for prompts │
│ • Queue: Will process when Stage 4 completes │
│ [View Details] [Trigger Now] │
└──────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────┐
│ STAGE 6: Image Prompts → Generated Images (AI) │
│ Status: ⏸ Waiting │
│ • Pending: 0 prompts ready │
│ [View Details] │
└──────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────┐
│ STAGE 7: Content → Review (Manual Gate) 🚫 STOPS HERE │
│ Status: ⏸ Awaiting Manual Review │
│ • Ready for Review: 2 content pieces │
│ • Note: Automation stops here. User reviews manually. │
│ [Go to Review Page] │
└──────────────────────────────────────────────────────────────┘
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
📋 LIVE ACTIVITY LOG (Last 50 events)
├─ 14:24:12 - Stage 4: Writing sections (65% complete)
├─ 14:23:45 - Stage 4: Started content generation for Task 3
├─ 14:22:30 - Stage 4: Completed Task 2 → Content created
├─ 14:20:15 - Stage 4: Started content generation for Task 2
├─ 14:18:45 - Stage 4: Completed Task 1 → Content created
└─ [View Full Log]
💰 TOTAL CREDITS USED THIS RUN: 66 credits
All 6 AI Functions Already Exist and Work:
| Function | File Location | Input | Output | Credits | Status |
|---|---|---|---|---|---|
| auto_cluster | ai/functions/auto_cluster.py | Keyword IDs (max 20) | Clusters created | 1 per 5 keywords | ✅ Working |
| generate_ideas | ai/functions/generate_ideas.py | Cluster IDs (max 5) | Ideas created | 2 per cluster | ✅ Working |
| bulk_queue_to_writer | modules/planner/views.py (line 1014) | Idea IDs | Tasks created | 0 (local) | ✅ Working |
| generate_content | ai/functions/generate_content.py | Task IDs (1 at a time) | Content draft | 1 per 500 words | ✅ Working |
| generate_image_prompts | ai/functions/generate_image_prompts.py | Content IDs | Image prompts | 0.5 per prompt | ✅ Working |
| generate_images | ai/functions/generate_images.py | Image prompt IDs | Generated images | 1-4 per image | ✅ Working |
🚫 WHAT AI FUNCTIONS ALREADY DO (DO NOT DUPLICATE)
Credit Management (Fully Automated in ai/engine.py):
# Line 395 in AIEngine.execute():
CreditService.deduct_credits_for_operation(
account=account,
operation_type=self._get_operation_type(),
amount=self._get_actual_amount(),
...
)
- ✅ Credits are AUTOMATICALLY deducted after successful save
- ✅ Credit calculation happens in _get_actual_amount() and _get_operation_type()
- ❌ Automation does NOT need to call CreditService manually
- ❌ Automation does NOT need to calculate credit costs
Status Updates (Handled Inside AI Functions):
- ✅ Keywords: status='new' → status='mapped' (in auto_cluster save_output)
- ✅ Clusters: Created with status='new' (in auto_cluster save_output)
- ✅ Ideas: status='new' → status='queued' (in bulk_queue_to_writer)
- ✅ Tasks: Created with status='queued' → status='completed' (in generate_content)
- ✅ Content: Created with status='draft' → status='review' ONLY when all images complete (ai/tasks.py line 723)
- ✅ Images: status='pending' → status='generated' (in generate_images save_output)
- ❌ Automation does NOT update these statuses directly
Progress Tracking (Event-Based System Already Exists):
- ✅ StepTracker and ProgressTracker emit real-time events during AI execution
- ✅ Each AI function has 6 phases: INIT, PREP, AI_CALL, PARSE, SAVE, DONE
- ✅ Phase descriptions available in function metadata: get_metadata()
- ❌ Automation does NOT need to poll progress every 2 seconds
- ✅ Automation listens to existing phase events via Celery task status instead
Error Handling & Logging:
- ✅ AIEngine wraps execution in try/catch, logs to AIUsageLog
- ✅ Failed operations roll back database changes automatically
- ✅ Automation only needs to check the final task result (success/failure)
Automation Service ONLY Does:
- Batch Selection: Query database for items to process (by status and site)
- Function Calling: Call existing AI functions with selected item IDs
- Stage Sequencing: Wait for Stage N completion before starting Stage N+1
- Scheduling: Trigger automation runs on configurable schedules
- Aggregation: Collect results from all batches and log totals per stage
🏗️ 7-STAGE PIPELINE ARCHITECTURE
Sequential Stage Flow
| Stage | From | To | Function Used | Batch Size | Type |
|---|---|---|---|---|---|
| 1 | Keywords (status='new', cluster_id=null) | Clusters (status='new') | auto_cluster | 20 keywords | AI |
| 2 | Clusters (status='new', no ideas) | Ideas (status='new') | generate_ideas | 1 cluster | AI |
| 3 | Ideas (status='new') | Tasks (status='queued') | bulk_queue_to_writer | 20 ideas | Local |
| 4 | Tasks (status='queued') | Content (status='draft') | generate_content | 1 task | AI |
| 5 | Content (status='draft', no Images) | Images (status='pending' with prompts) | generate_image_prompts | 1 content | AI |
| 6 | Images (status='pending') | Images (status='generated' with URLs) | generate_images | 1 image | AI |
| 7 | Content (status='review') | Manual Review | None (gate) | N/A | Manual |
Stage 1: Keywords → Clusters (AI)
Purpose: Group semantically similar keywords into topic clusters
Database Query (Automation Orchestrator):
pending_keywords = Keywords.objects.filter(
site=site,
status='new',
cluster__isnull=True,
disabled=False
)
Orchestration Logic (What Automation Does):
- Select Batch: Count pending keywords
  - If 0 keywords → Skip stage, log "No keywords to process"
  - If 1-20 keywords → Select all (batch_size = count)
  - If >20 keywords → Select first 20 (configurable batch_size)
- Call AI Function:
  from igny8_core.ai.functions.auto_cluster import AutoCluster
  result = AutoCluster().execute(
      payload={'ids': keyword_ids},
      account=account
  )
  # Returns: {'task_id': 'celery_task_abc123'}
- Monitor Progress: Listen to Celery task status
  - Use existing StepTracker phase events (INIT → PREP → AI_CALL → PARSE → SAVE → DONE)
  - OR poll AsyncResult(task_id).state until SUCCESS/FAILURE
  - Log phase progress: "AI analyzing keywords (65% complete)"
- Collect Results: When task completes
  - AI function already updated Keywords.status → 'mapped'
  - AI function already created Cluster records with status='new'
  - AI function already deducted credits via AIEngine
  - Automation just logs: "Batch complete: N clusters created"
- Repeat: If more keywords remain, select the next batch and repeat from "Call AI Function"
Stage Completion Criteria:
- All keyword batches processed (pending_keywords.count() == 0)
- No critical errors
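The batch-selection and repeat logic above can be sketched as pure orchestration code. Here `run_batch` is a hypothetical stand-in for the real call to AutoCluster().execute() plus waiting on its Celery task; only the chunking and sequencing behavior is shown:

```python
from typing import Callable, List


def make_batches(ids: List[int], batch_size: int = 20) -> List[List[int]]:
    """Split pending keyword IDs into sequential batches (never parallel)."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [ids[i:i + batch_size] for i in range(0, len(ids), batch_size)]


def run_stage_1(keyword_ids: List[int],
                run_batch: Callable[[List[int]], int],
                batch_size: int = 20) -> dict:
    """Drive Stage 1: call run_batch (a stand-in for the AutoCluster call)
    once per batch, waiting for each to finish before starting the next."""
    if not keyword_ids:
        # Skip stage: "No keywords to process"
        return {"keywords_processed": 0, "clusters_created": 0, "batches_run": 0}
    batches = make_batches(keyword_ids, batch_size)
    clusters = 0
    for batch in batches:
        clusters += run_batch(batch)  # blocks until this batch's Celery task completes
    return {
        "keywords_processed": len(keyword_ids),
        "clusters_created": clusters,
        "batches_run": len(batches),
    }
```

With 47 keywords and a batch size of 20, this yields 3 sequential batches (20, 20, 7), matching the example later in this document.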
What AI Function Does (Already Implemented - DON'T DUPLICATE):
- ✅ Groups keywords semantically using AI
- ✅ Creates Cluster records with status='new'
- ✅ Updates Keywords: cluster_id=cluster.id, status='mapped'
- ✅ Deducts credits automatically (AIEngine line 395)
- ✅ Logs to AIUsageLog
- ✅ Emits progress events via StepTracker
Stage Result Logged:
{
"keywords_processed": 47,
"clusters_created": 8,
"batches_run": 3,
"credits_used": 10 // Read from AIUsageLog sum, not calculated
}
Stage 2: Clusters → Ideas (AI)
Purpose: Generate content ideas for each cluster
Database Query:
Clusters.objects.filter(
site=site,
status='new',
disabled=False
).exclude(
    ideas__isnull=False  # Exclude clusters that already have ideas
)
Process:
- Count clusters without ideas
- If 0 → Skip stage
- If > 0 → Process one cluster at a time (configurable batch size = 1)
- For each cluster:
  - Log: "Generating ideas for cluster: {cluster.name}"
  - Call IdeasService.generate_ideas(cluster_ids=[cluster.id], account)
  - Function returns {'task_id': 'xyz789'}
  - Monitor via Celery task status or StepTracker events
  - Wait for completion
  - Log: "Cluster '{name}' complete: N ideas created"
- Log stage summary
Stage Completion Criteria:
- All clusters processed
- Each cluster now has >=1 idea
- No errors
Updates:
- ContentIdeas: New records created with status='new', keyword_cluster_id=cluster.id
- Clusters: status='mapped'
- Stage result: {clusters_processed: 8, ideas_created: 56}
Credits: ~16 credits (2 per cluster)
Stage 3: Ideas → Tasks (Local Queue)
Purpose: Convert content ideas to writer tasks (local, no AI)
Database Query:
ContentIdeas.objects.filter(
site=site,
status='new'
)
Process:
- Count pending ideas
- If 0 → Skip stage
- If > 0 → Split into batches of 20
- For each batch:
  - Log: "Queueing batch X/Y (20 ideas)"
  - Call bulk_queue_to_writer view logic (NOT via HTTP, direct function call)
  - For each idea:
    - Create Tasks record with title=idea.idea_title, status='queued', cluster=idea.keyword_cluster
    - Update idea status to 'queued'
  - Log: "Batch X complete: 20 tasks created"
- Log stage summary
Stage Completion Criteria:
- All batches processed
- All ideas now have status='queued'
- Corresponding Tasks exist with status='queued'
- No errors
Updates:
- Tasks: New records created with status='queued'
- ContentIdeas: status changed 'new' → 'queued'
- Stage result: {ideas_processed: 56, tasks_created: 56, batches: 3}
Credits: 0 (local operation)
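Because Stage 3 is a local, zero-credit transformation, its core can be sketched as a plain function. The dict field names (idea_title, keyword_cluster, status) follow the Tasks-creation bullets above; this is an illustration of the mapping, not the actual bulk_queue_to_writer implementation:

```python
def queue_ideas_to_tasks(ideas: list) -> list:
    """Convert eligible ContentIdeas dicts into Tasks dicts, mirroring the
    bulk_queue_to_writer logic described above (local, 0 credits)."""
    tasks = []
    for idea in ideas:
        if idea.get("status") != "new":
            continue  # only ideas with status='new' are queued
        tasks.append({
            "title": idea["idea_title"],
            "status": "queued",
            "cluster": idea.get("keyword_cluster"),
        })
        idea["status"] = "queued"  # mirror the ContentIdeas status update
    return tasks
```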
Stage 4: Tasks → Content (AI)
Purpose: Generate full content drafts from tasks
Database Query (Automation Orchestrator):
pending_tasks = Tasks.objects.filter(
site=site,
status='queued',
content__isnull=True # No content generated yet
)
Orchestration Logic:
- Select Item: Count queued tasks
  - If 0 → Skip stage
  - If > 0 → Select ONE task at a time (sequential processing)
- Call AI Function:
  from igny8_core.ai.functions.generate_content import GenerateContent
  result = GenerateContent().execute(
      payload={'ids': [task.id]},
      account=account
  )
  # Returns: {'task_id': 'celery_task_xyz789'}
- Monitor Progress: Listen to Celery task status
  - Use StepTracker phase events for real-time updates
  - Log: "Writing sections (65% complete)" (from phase metadata)
  - Content generation takes 5-15 minutes per task
- Collect Results: When task completes
  - AI function already created Content with status='draft'
  - AI function already updated Task.status → 'completed'
  - AI function already updated Idea.status → 'completed'
  - AI function already deducted credits based on word count
  - Automation logs: "Content created (2500 words)"
- Repeat: Process next task sequentially
Stage Completion Criteria:
- All tasks processed (pending_tasks.count() == 0)
- Each task has linked Content record
What AI Function Does (Already Implemented):
- ✅ Generates article sections using AI
- ✅ Creates Content record with status='draft', task_id=task.id
- ✅ Updates Task: status='completed'
- ✅ Updates linked Idea: status='completed'
- ✅ Deducts credits: 1 credit per 500 words (automatic)
- ✅ Logs to AIUsageLog with word count
Stage Result Logged:
{
"tasks_processed": 56,
"content_created": 56,
"total_words": 140000,
"credits_used": 280 // From AIUsageLog, not calculated
}
Stage 5: Content → Image Prompts (AI)
Purpose: Extract image prompts from content and create Images records with prompts
CRITICAL: There is NO separate "ImagePrompts" model. Images records ARE the prompts (with status='pending') until images are generated.
Database Query (Automation Orchestrator):
# Content that has NO Images records at all
content_without_images = Content.objects.filter(
site=site,
status='draft'
).annotate(
images_count=Count('images')
).filter(
images_count=0 # No Images records exist yet
)
Orchestration Logic:
- Select Item: Count content without any Images records
  - If 0 → Skip stage
  - If > 0 → Select ONE content at a time (sequential)
- Call AI Function:
  from igny8_core.ai.functions.generate_image_prompts import GenerateImagePromptsFunction
  result = GenerateImagePromptsFunction().execute(
      payload={'ids': [content.id]},
      account=account
  )
  # Returns: {'task_id': 'celery_task_prompts456'}
- Monitor Progress: Wait for completion
- Collect Results: When task completes
  - AI function already created Images records with:
    - status='pending'
    - prompt='...' (AI-generated prompt text)
    - image_type='featured' or 'in_article'
    - content_id=content.id
  - Content.status stays 'draft' (unchanged)
  - Automation logs: "Content '{title}' complete: N prompts created"
- Repeat: Process next content sequentially
Stage Completion Criteria:
- All content processed (content_without_images.count() == 0)
- Each content has >=1 Images record with status='pending' and prompt text
What AI Function Does (Already Implemented):
- ✅ Extracts featured image prompt from title/intro
- ✅ Extracts in-article prompts from H2 headings
- ✅ Creates Images records with status='pending', prompt='...'
- ✅ Deducts credits automatically (0.5 per prompt)
- ✅ Logs to AIUsageLog
Stage Result Logged:
{
"content_processed": 56,
"prompts_created": 224,
"credits_used": 112 // From AIUsageLog
}
Stage 6: Images (Prompts) → Generated Images (AI)
Purpose: Generate actual image URLs from Images records that contain prompts
CRITICAL: Input is Images records with status='pending' (these contain the prompts). Output is same Images records updated with status='generated' and image_url='https://...'
Database Query (Automation Orchestrator):
# Images with prompts waiting to be generated
pending_images = Images.objects.filter(
site=site,
status='pending' # Has prompt text, needs image URL
)
Orchestration Logic:
- Select Item: Count pending Images
  - If 0 → Skip stage
  - If > 0 → Select ONE Image at a time (sequential)
- Call AI Function:
  from igny8_core.ai.functions.generate_images import GenerateImages
  result = GenerateImages().execute(
      payload={'image_ids': [image.id]},
      account=account
  )
  # Returns: {'task_id': 'celery_task_img789'}
- Monitor Progress: Wait for completion
- Collect Results: When task completes
  - AI function already called the image API using the prompt field
  - AI function already updated Images:
    - status='pending' → status='generated'
    - image_url='https://...' (populated with generated image URL)
  - AI function already deducted credits (1-4 per image)
  - Automation logs: "Image generated: {image_url}"
- Automatic Content Status Change (NOT done by automation):
  - After each image generation, a background task checks if ALL Images for that Content are now status='generated'
  - When the last image completes: Content.status changes 'draft' → 'review' (in ai/tasks.py line 723)
  - Automation does NOT trigger this - it happens automatically
- Repeat: Process next pending Image sequentially
Stage Completion Criteria:
- All pending Images processed (pending_images.count() == 0)
- All Images now have image_url != null, status='generated'
What AI Function Does (Already Implemented):
- ✅ Reads prompt field from Images record
- ✅ Calls image generation API (OpenAI/Runware) with prompt
- ✅ Updates Images: image_url=generated_url, status='generated'
- ✅ Deducts credits automatically (1-4 per image)
- ✅ Logs to AIUsageLog
What Happens Automatically (ai/tasks.py:723):
- ✅ Background task checks if all Images for a Content are status='generated'
- ✅ When complete: Content.status changes 'draft' → 'review'
- ✅ This happens OUTSIDE the automation orchestrator (in a Celery task)
Stage Result Logged:
{
"images_processed": 224,
"images_generated": 224,
"content_moved_to_review": 56, // Side effect (automatic)
"credits_used": 448 // From AIUsageLog
}
Stage 7: Manual Review Gate (STOP)
Purpose: Automation ends - content automatically moved to 'review' status ready for manual review
CRITICAL: Content with status='review' was automatically set in Stage 6 when ALL images completed. Automation just counts them.
Database Query (Automation Orchestrator):
# Content that has ALL images generated (status already changed to 'review')
ready_for_review = Content.objects.filter(
site=site,
status='review' # Automatically set when all images complete
)
Orchestration Logic:
- Count Only: Count content with status='review'
  - No processing, just counting
  - These Content records already have all Images with status='generated'
- Log Results:
  - Log: "Automation complete. X content pieces ready for review"
  - Log: "Content IDs ready: [123, 456, 789, ...]"
- Mark Run Complete:
  - AutomationRun.status = 'completed'
  - AutomationRun.completed_at = now()
- Send Notification (optional):
  - Email/notification: "Your automation run completed. X content pieces ready for review"
- STOP: No further automation stages
Stage Completion Criteria:
- Counting complete
- Automation run marked status='completed'
What AI Function Does:
- N/A - No AI function called in this stage
Stage Result Logged:
{
"ready_for_review": 56,
"content_ids": [123, 456, 789, ...]
}
What Happens Next (Manual - User Action):
- User navigates to the /writer/content page
- Content page shows filter: status='review'
- User sees 56 content pieces with all images generated
- User manually reviews:
- Content quality
- Image relevance
- Brand voice
- Accuracy
- User selects multiple content → "Bulk Publish" action
- Existing WordPress publishing workflow executes
Why Manual Review is Required:
- Quality control before public publishing
- Legal/compliance verification
- Brand voice consistency check
- Final accuracy confirmation
🔄 BATCH PROCESSING WITHIN STAGES
Critical Concepts
Batch vs Queue:
- Batch: Group of items processed together in ONE AI call
- Queue: Total pending items waiting to be processed
Example - Stage 1 with 47 keywords:
Total Queue: 47 keywords
Batch Size: 20
Execution:
Batch 1: Keywords 1-20 → Call auto_cluster → Wait for completion
Batch 2: Keywords 21-40 → Call auto_cluster → Wait for completion
Batch 3: Keywords 41-47 → Call auto_cluster → Wait for completion
Total Batches: 3
Processing: Sequential (never parallel)
UI Display:
Stage 1: Keywords → Clusters
Status: ● Processing
Queue: 47 keywords total
Progress: Batch 2/3 (40 processed, 7 remaining)
Current: Processing keywords 21-40
Time Elapsed: 4m 30s
Credits Used: 8
Batch Completion Triggers
Within Stage:
- Batch completes → Immediately start next batch
- Last batch completes → Stage complete
Between Stages:
- Stage N completes → Trigger Stage N+1 automatically
- Hard verification: Ensure queue is empty before proceeding
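The within-stage and between-stage triggers above reduce to a small decision function. A minimal sketch (stage numbers 1-7 per the pipeline; the action strings are illustrative, not an actual API):

```python
def next_action(stage: int, pending_in_stage: int, had_errors: bool) -> str:
    """Decide what the orchestrator does after a batch finishes."""
    if had_errors:
        return "halt"                      # never proceed past a failed stage
    if pending_in_stage > 0:
        return "next_batch"                # stay in the current stage
    if stage < 7:
        return f"start_stage_{stage + 1}"  # hard gate passed: queue is empty
    return "complete"                      # Stage 7 is the manual review gate
```

The hard verification is the `pending_in_stage > 0` check: Stage N+1 can only be returned when the current stage's queue is empty and no errors occurred.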
Detailed Stage Processing Queues (UI Elements):
Each stage card should show:
- Total Queue Count - How many items need processing in this stage
- Current Batch - Which batch is being processed (e.g., "Batch 2/5")
- Processed Count - How many items completed so far
- Remaining Count - How many items left in queue
- Current Item - What specific item is processing right now (for single-item batches)
Example UI for Stage 4:
┌──────────────────────────────────────────────────────────────┐
│ STAGE 4: Tasks → Content (AI) │
│ Status: ● Processing │
│ │
│ 📊 QUEUE OVERVIEW: │
│ ├─ Total Tasks: 56 │
│ ├─ Processed: 23 │
│ ├─ Remaining: 33 │
│ └─ Progress: ━━━━━━━╸━━━━━━━━━━━━ 41% │
│ │
│ 🔄 CURRENT PROCESSING: │
│ ├─ Item: Task 24/56 │
│ ├─ Title: "Ultimate Coffee Bean Buying Guide" │
│ ├─ Progress: Writing sections (65% complete) │
│ └─ Time: 2m 15s elapsed │
│ │
│ 💳 STAGE STATS: │
│ ├─ Credits Used: 46 │
│ ├─ Time Elapsed: 1h 23m │
│ └─ ETA: 1h 15m remaining │
│ │
│ [View Details] [Pause Stage] │
└──────────────────────────────────────────────────────────────┘
🗄️ DATABASE STRUCTURE
New Models to Create
AutomationRun (tracks each automation execution)
Table: igny8_automation_runs
Fields:
- id: Integer (PK)
- run_id: String (unique, indexed) - Format: run_20251203_140523_manual
- account_id: ForeignKey(Account)
- site_id: ForeignKey(Site)
- trigger_type: String - Choices: 'manual', 'scheduled'
- status: String - Choices: 'running', 'paused', 'completed', 'failed'
- current_stage: Integer - Current stage number (1-7)
- started_at: DateTime
- completed_at: DateTime (nullable)
- total_credits_used: Integer
- stage_1_result: JSON - {keywords_processed, clusters_created, batches}
- stage_2_result: JSON - {clusters_processed, ideas_created}
- stage_3_result: JSON - {ideas_processed, tasks_created}
- stage_4_result: JSON - {tasks_processed, content_created, total_words}
- stage_5_result: JSON - {content_processed, prompts_created}
- stage_6_result: JSON - {prompts_processed, images_generated}
- stage_7_result: JSON - {ready_for_review}
- error_message: Text (nullable)
Indexes:
- run_id (unique)
- site_id, started_at
- status, started_at
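The documented run_id format (run_20251203_140523_manual) can be produced with a small helper. A sketch:

```python
from datetime import datetime


def build_run_id(started_at: datetime, trigger_type: str) -> str:
    """Compose the unique run_id in the run_YYYYMMDD_HHMMSS_{trigger} format."""
    if trigger_type not in ("manual", "scheduled"):
        raise ValueError("trigger_type must be 'manual' or 'scheduled'")
    return f"run_{started_at:%Y%m%d_%H%M%S}_{trigger_type}"
```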
AutomationConfig (per-site configuration)
Table: igny8_automation_configs
Fields:
- id: Integer (PK)
- account_id: ForeignKey(Account)
- site_id: ForeignKey(Site, unique) - ONE config per site
- is_enabled: Boolean - Whether scheduled automation is active
- frequency: String - Choices: 'daily', 'weekly', 'monthly'
- scheduled_time: Time - When to run (e.g., 02:00)
- stage_1_batch_size: Integer - Default 20 (keywords per batch)
- stage_2_batch_size: Integer - Default 1 (clusters at a time)
- stage_3_batch_size: Integer - Default 20 (ideas per batch)
- stage_4_batch_size: Integer - Default 1 (tasks - sequential)
- stage_5_batch_size: Integer - Default 1 (content at a time)
- stage_6_batch_size: Integer - Default 1 (images - sequential)
- last_run_at: DateTime (nullable)
- next_run_at: DateTime (nullable) - Calculated based on frequency
Constraints:
- Unique: site_id (one config per site)
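The next_run_at field can be derived from frequency plus scheduled_time. A sketch, assuming simple fixed-length intervals (7 days for weekly, 30 for monthly - a simplification; a real implementation might use calendar months):

```python
from datetime import datetime, time, timedelta

# Fixed-length intervals are a simplifying assumption
STEP_DAYS = {"daily": 1, "weekly": 7, "monthly": 30}


def compute_next_run_at(last_run: datetime, frequency: str,
                        scheduled_time: time) -> datetime:
    """AutomationConfig.next_run_at: the frequency interval after the last
    run's date, at the configured time of day."""
    next_day = last_run.date() + timedelta(days=STEP_DAYS[frequency])
    return datetime.combine(next_day, scheduled_time)
```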
File-Based Logging Structure
Directory Structure:
logs/
└── automation/
└── {account_id}/
└── {site_id}/
└── {run_id}/
├── automation_run.log (main activity log)
├── stage_1.log (keywords → clusters)
├── stage_2.log (clusters → ideas)
├── stage_3.log (ideas → tasks)
├── stage_4.log (tasks → content)
├── stage_5.log (content → prompts)
├── stage_6.log (prompts → images)
└── stage_7.log (review gate)
Log File Format (automation_run.log):
========================================
AUTOMATION RUN: run_20251203_140523_manual
Started: 2025-12-03 14:05:23
Trigger: manual
Account: 5
Site: 12
========================================
14:05:23 - Automation started (trigger: manual)
14:05:23 - Credit check: Account has 1500 credits, estimated need: 866 credits
14:05:23 - Stage 1 starting: Keywords → Clusters
14:05:24 - Stage 1: Found 47 pending keywords
14:05:24 - Stage 1: Processing batch 1/3 (20 keywords)
14:05:25 - Stage 1: AI task queued: task_id=abc123
14:07:30 - Stage 1: Batch 1 complete - 3 clusters created
14:07:31 - Stage 1: Processing batch 2/3 (20 keywords)
[... continues ...]
Stage-Specific Log (stage_1.log):
========================================
STAGE 1: Keywords → Clusters (AI)
Started: 2025-12-03 14:05:23
========================================
14:05:24 - Query: Keywords.objects.filter(site=12, status='new', cluster__isnull=True)
14:05:24 - Found 47 pending keywords
14:05:24 - Batch size: 20 keywords
14:05:24 - Total batches: 3
--- Batch 1/3 ---
14:05:24 - Keyword IDs: [101, 102, 103, ..., 120]
14:05:25 - Calling ClusteringService.cluster_keywords(ids=[101..120], account=5, site_id=12)
14:05:25 - AI task queued: task_id=abc123
14:05:26 - Monitoring task status...
14:05:28 - Phase: INIT - Initializing (StepTracker event)
14:05:45 - Phase: AI_CALL - AI analyzing keywords (StepTracker event)
14:07:15 - Phase: SAVE - Creating clusters (StepTracker event)
14:07:30 - Phase: DONE - Complete
14:07:30 - Result: 3 clusters created
14:07:30 - Clusters: ["Coffee Beans", "Brewing Methods", "Coffee Equipment"]
14:07:30 - Credits used: 4 (from AIUsageLog)
--- Batch 2/3 ---
[... continues ...]
========================================
STAGE 1 COMPLETE
Total Time: 5m 30s
Processed: 47 keywords
Clusters Created: 8
Credits Used: 10
========================================
🔐 SAFETY MECHANISMS
1. Concurrency Control (Prevent Duplicate Runs)
Problem: User clicks "Run Now" while scheduled task is running
Solution: Distributed locking using Django cache
Implementation Logic:
When starting automation:
1. Try to acquire lock: cache.add(f'automation_lock_{site.id}', 'locked', timeout=21600)
2. If lock exists → Return error: "Automation already running for this site"
3. If lock acquired → Proceed with run
4. On completion/failure → Release lock: cache.delete(f'automation_lock_{site.id}')
Also check database:
- Query AutomationRun.objects.filter(site=site, status='running').exists()
- If exists → Error: "Another automation is running"
User sees:
- "Automation already in progress. Started at 02:00 AM, currently on Stage 4."
- Link to view current run progress
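The acquire/release sequence relies on cache.add semantics (atomic set-if-absent). In the sketch below a plain dict stands in for Django's cache backend, and the timeout is carried but not enforced - in production, cache.add(key, value, timeout=21600) handles expiry so a crashed run cannot hold the lock forever:

```python
class SiteLock:
    """Site-scoped automation lock with cache.add-style semantics."""

    def __init__(self, cache=None, timeout: int = 21600):
        self.cache = cache if cache is not None else {}
        self.timeout = timeout  # 6 hours, matching the plan above

    def acquire(self, site_id: int) -> bool:
        key = f"automation_lock_{site_id}"
        if key in self.cache:      # cache.add returns False when the key exists
            return False
        self.cache[key] = "locked"
        return True

    def release(self, site_id: int) -> None:
        self.cache.pop(f"automation_lock_{site_id}", None)
```

On acquire failure, the orchestrator returns the "Automation already running for this site" error; the database check on AutomationRun.status='running' remains the second line of defense.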
2. Credit Reservation (Prevent Mid-Run Failures)
Problem: Account runs out of credits during Stage 4
Solution: Reserve estimated credits at start, deduct as used
Implementation Logic:
Before starting:
1. Estimate total credits needed:
- Count keywords → estimate clustering credits
- Count clusters → estimate ideas credits
- Estimate content generation (assume avg word count)
- Estimate image generation (assume 4 images per content)
2. Check: account.credits_balance >= estimated_credits * 1.2 (20% buffer)
3. If insufficient → Error: "Need ~866 credits, you have 500"
4. Reserve credits: account.credits_reserved += estimated_credits
5. As each stage completes → Deduct actual: account.credits_balance -= actual_used
6. On completion → Release unused: account.credits_reserved -= unused
Database fields needed:
- Account.credits_reserved (new field)
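The pre-run estimate can combine the per-function costs from the functions table with assumed fan-out ratios (keywords per cluster, ideas per cluster, words per article, images per content - all adjustable assumptions, not measured values):

```python
import math


def estimate_credits(keyword_count: int,
                     avg_words: int = 2500,
                     images_per_content: int = 4,
                     buffer: float = 1.2) -> int:
    """Worst-case credit estimate for a full pipeline run, with buffer."""
    clusters = math.ceil(keyword_count / 6)        # assumed ~6 keywords/cluster
    ideas = clusters * 7                           # assumed ~7 ideas/cluster
    content = ideas                                # one content piece per idea
    credits = math.ceil(keyword_count / 5)         # auto_cluster: 1 per 5 keywords
    credits += clusters * 2                        # generate_ideas: 2 per cluster
    credits += content * math.ceil(avg_words / 500)  # content: 1 per 500 words
    prompts = content * images_per_content
    credits += math.ceil(prompts * 0.5)            # prompts: 0.5 each
    credits += prompts * 4                         # images: worst case 4 each
    return math.ceil(credits * buffer)             # default 20% safety buffer
```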
3. Stage Idempotency (Safe to Retry)
Problem: User resumes paused run, Stage 1 runs again creating duplicate clusters
Solution: Check if stage already completed before executing
Implementation Logic:
At start of each run_stage_N():
1. Check AutomationRun.stage_N_result
2. If result exists and has processed_count > 0:
- Log: "Stage N already completed - skipping"
- return (skip to next stage)
3. Else: Proceed with stage execution
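The idempotency check can be sketched as an inspection of the stored stage_N_result JSON: any *_processed count above zero means the stage already ran and should be skipped on resume (stage result key names follow the AutomationRun fields above):

```python
def stage_already_done(run_results: dict, stage: int) -> bool:
    """Skip a stage whose stored result shows work was already processed."""
    result = run_results.get(f"stage_{stage}_result") or {}
    processed = [v for k, v in result.items()
                 if k.endswith("_processed") and isinstance(v, int)]
    return any(v > 0 for v in processed)
```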
4. Celery Task Chaining (Non-Blocking Workers)
Problem: Synchronous execution blocks Celery worker for hours
Solution: Chain stages as separate Celery tasks
Implementation Logic:
Instead of:

def start_automation():
    run_stage_1()  # blocks a worker for 30 min
    run_stage_2()  # blocks for another 45 min
    ...

Do:

@shared_task
def run_stage_1_task(run_id):
    service = AutomationService.from_run_id(run_id)
    service.run_stage_1()
    # Trigger next stage
    run_stage_2_task.apply_async(args=[run_id], countdown=5)

@shared_task
def run_stage_2_task(run_id):
    service = AutomationService.from_run_id(run_id)
    service.run_stage_2()
    run_stage_3_task.apply_async(args=[run_id], countdown=5)
Benefits:
- Workers not blocked for hours
- Can retry individual stages
- Better monitoring in Celery Flower
- Horizontal scaling possible
5. Pause/Resume Capability
User Can:
- Pause automation at any point
- Resume from where it left off
Implementation Logic:
Pause:
- Update AutomationRun.status = 'paused'
- Current stage completes current batch then stops
- Celery task checks status before each batch
Resume:
- Update AutomationRun.status = 'running'
- Restart from current_stage
- Use idempotency check to skip completed work
6. Error Handling Per Stage
If Stage Fails:
try:
run_stage_1()
except Exception as e:
- Log error to stage_1.log
- Update AutomationRun:
- status = 'failed'
- error_message = str(e)
- current_stage = 1 (where it failed)
- Send notification: "Automation failed at Stage 1"
- Stop execution (don't proceed to Stage 2)
User can:
- View logs to see what went wrong
- Fix issue (e.g., add credits)
- Click "Resume" to retry from Stage 1
7. Log Cleanup (Prevent Disk Bloat)
Problem: After 1000 runs, logs occupy 80MB+ per site
Solution: Celery periodic task to delete old logs
Implementation Logic:
import shutil
from datetime import datetime, timedelta

@shared_task
def cleanup_old_automation_logs():
    cutoff = datetime.now() - timedelta(days=90)  # Keep last 90 days
    old_runs = AutomationRun.objects.filter(
        started_at__lt=cutoff,
        status__in=['completed', 'failed']
    )
    for run in old_runs:
        log_dir = f'logs/automation/{run.account_id}/{run.site_id}/{run.run_id}/'
        shutil.rmtree(log_dir, ignore_errors=True)  # Delete log directory (skip if already gone)
        run.delete()  # Remove DB record
Schedule: Weekly, Monday 3 AM
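The weekly schedule can be registered in celery.py the same way as the hourly check_scheduled_automations entry; the beat entry name here is an assumption:

```python
# Assumed beat entry name; registered alongside the hourly schedule check
app.conf.beat_schedule['cleanup-old-automation-logs'] = {
    'task': 'cleanup_old_automation_logs',
    'schedule': crontab(day_of_week='monday', hour=3, minute=0),  # Weekly, Monday 3 AM
}
```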
🎨 FRONTEND DESIGN
Page Structure: /automation
Layout:
┌─────────────────────────────────────────────────────────────┐
│ 🤖 AI AUTOMATION PIPELINE │
│ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ │
│ │
│ │
│ ⏰ SCHEDULE │
│ Next Run: Tomorrow at 2:00 AM (in 16 hours) │
│ Frequency: [Daily ▼] at [02:00 ▼] │
│ Status: ● Scheduled │
│ │
│ [Run Now] [Pause Schedule] [Configure] │
│ │
├─────────────────────────────────────────────────────────────┤
│ 📊 PIPELINE OVERVIEW │
│ │
│ Keywords ──→ Clusters ──→ Ideas ──→ Tasks ──→ Content │
│ 47 8 42 20 generating │
│ pending new ready queued Stage 5 │
│ │
│ Overall Progress: ━━━━━━━╸━━━━━━━━━ 62% (Stage 5/7) │
│ Estimated Completion: 2 hours 15 minutes │
│ │
└─────────────────────────────────────────────────────────────┘
[STAGE 1 CARD - completed state]
[STAGE 2 CARD - completed state]
[STAGE 3 CARD - completed state]
[STAGE 4 CARD - running state with queue details]
[STAGE 5 CARD - waiting state]
[STAGE 6 CARD - waiting state]
[STAGE 7 CARD - gate state]
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
📋 LIVE ACTIVITY LOG (Last 50 events)
├─ 14:24:12 - Stage 4: Writing sections (65% complete)
├─ 14:23:45 - Stage 4: Started content generation for Task 3
├─ 14:22:30 - Stage 4: Completed Task 2 → Content created
├─ 14:20:15 - Stage 4: Started content generation for Task 2
└─ [View Full Log]
💰 TOTAL CREDITS USED THIS RUN: 66 credits
Components:
StageCard.tsx - Individual stage display component
- Props: stageNumber, stageName, status, queueData, result
- Shows: Status badge, queue overview, progress bar, stats
- Actions: "View Details", "Pause", "Retry Failed"
ActivityLog.tsx - Live activity feed component
- Props: runId
- Fetches: /api/v1/automation/activity_log/{runId} every 3 seconds
- Shows: Timestamped log entries, color-coded by type (info/success/error)
ConfigModal.tsx - Schedule configuration modal
- Fields: Frequency dropdown, Time picker, Batch sizes (advanced)
- Saves to: AutomationConfig model via /api/v1/automation/config/
Sidebar Menu Addition:
Sites
├─ Site Management
└─ Site Settings
Automation ← NEW
└─ Pipeline Dashboard
Planner
├─ Keywords
├─ Clusters
└─ Ideas
Real-Time Progress Updates
UI Update Strategy:
- Frontend Polling: Poll automation status API every 3 seconds when run is active
- Backend Progress: Uses event-based StepTracker to capture AI function phases
- When automation is status='running' → Poll every 3 seconds
- When status='completed' or status='failed' → Stop polling
- When status='paused' → Poll every 10 seconds
How Progress Works:
- AI Function Execution: Each AI function emits phase events (INIT, PREP, AI_CALL, PARSE, SAVE, DONE)
- StepTracker Captures: Progress tracker records these events with metadata
- Automation Logs: Orchestrator reads from StepTracker and logs to file
- UI Polls: Frontend polls automation status API to read aggregated progress
- Display: UI shows current phase and completion percentage per stage
API Endpoint:
GET /api/v1/automation/current_run/?site_id=12
Response:
{
"run": {
"run_id": "run_20251203_140523_manual",
"status": "running",
"current_stage": 4,
"started_at": "2025-12-03T14:05:23Z",
"total_credits_used": 66,
"stage_1_result": {"keywords_processed": 47, "clusters_created": 8},
"stage_2_result": {"clusters_processed": 8, "ideas_created": 56},
"stage_3_result": {"ideas_processed": 56, "tasks_created": 56},
"stage_4_result": {"tasks_processed": 23, "tasks_total": 56},
...
},
"activity_log": [
"14:23:45 - Stage 4: Started content generation for Task 3",
"14:24:12 - Stage 4: Writing sections (65% complete)",
...
],
"queues": {
"stage_1": {"total": 0, "pending": 0},
"stage_2": {"total": 0, "pending": 0},
"stage_3": {"total": 0, "pending": 0},
"stage_4": {"total": 56, "pending": 33},
"stage_5": {"total": 23, "pending": 23},
"stage_6": {"total": 0, "pending": 0},
"stage_7": {"total": 0, "pending": 0}
}
}
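One possible way the UI could derive the overall percentage from this payload — the exact weighting across stages is an implementation choice, not something the API specifies:

```python
def overall_progress(queues, current_stage, total_stages=7):
    """Blend fully completed stages with the in-flight stage's batch progress."""
    stage = queues.get(f'stage_{current_stage}', {'total': 0, 'pending': 0})
    done_in_stage = 0.0
    if stage['total']:
        done_in_stage = (stage['total'] - stage['pending']) / stage['total']
    # Each completed stage counts as one full unit out of total_stages
    return round(100 * ((current_stage - 1) + done_in_stage) / total_stages)

# Stage 4 from the response above: 56 total, 33 pending
percent = overall_progress({'stage_4': {'total': 56, 'pending': 33}}, current_stage=4)
```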
🔄 BACKEND IMPLEMENTATION FLOW
Service Layer Architecture
AutomationService (core orchestrator)
- Location: backend/igny8_core/business/automation/services/automation_service.py
- Responsibility: Execute stages sequentially, manage run state
- Reuses: All existing AI function classes (NO duplication)
AutomationLogger (file logging)
- Location: backend/igny8_core/business/automation/services/automation_logger.py
- Responsibility: Write timestamped logs to files
- Methods: start_run(), log_stage_start(), log_stage_progress(), log_stage_complete()
Key Service Methods:
AutomationService:
- __init__(account, site) → Initialize with site context (NO sector)
- start_automation(trigger_type) → Main entry point
- run_stage_1() → Keywords → Clusters
- run_stage_2() → Clusters → Ideas
- run_stage_3() → Ideas → Tasks
- run_stage_4() → Tasks → Content
- run_stage_5() → Content → Prompts
- run_stage_6() → Prompts → Images
- run_stage_7() → Review gate
- pause_automation() → Pause current run
- resume_automation() → Resume from current_stage
- estimate_credits() → Calculate estimated credits needed
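A minimal sketch of estimate_credits(); the per-item costs below are placeholders — the real values come from the existing AI functions' credit pricing:

```python
# Hypothetical per-item credit costs per stage (placeholders, not real pricing)
STAGE_COSTS = {
    1: 0.1,   # clustering, per keyword
    2: 1.0,   # idea generation, per cluster
    3: 0.0,   # queueing tasks consumes no credits
    4: 2.0,   # content generation, per task
    5: 0.5,   # image prompts, per content item
    6: 1.5,   # image generation, per prompt
}

def estimate_credits(queue_counts):
    """Sum pending work across stages against per-item costs.

    queue_counts maps stage number -> pending item count,
    mirroring the 'queues' payload of the current_run endpoint.
    """
    return sum(STAGE_COSTS.get(stage, 0) * pending
               for stage, pending in queue_counts.items())
```

For example, 47 keywords awaiting clustering plus 20 queued tasks would estimate 0.1×47 + 2.0×20 = 44.7 credits.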
AutomationLogger:
- start_run(account_id, site_id, trigger_type) → Create log directory, return run_id
- log_stage_start(run_id, stage_number, stage_name, pending_count)
- log_stage_progress(run_id, stage_number, message)
- log_stage_complete(run_id, stage_number, processed_count, time_elapsed, credits_used)
- log_stage_error(run_id, stage_number, error_message)
- get_activity_log(run_id, last_n=50) → Return last N log lines
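A minimal sketch of get_activity_log(), assuming each run writes an activity.log file under its run directory (the file name and layout are assumptions):

```python
import os

def get_activity_log(log_root, run_id, last_n=50):
    """Return the last N lines of a run's activity log, or [] if missing."""
    path = os.path.join(log_root, run_id, 'activity.log')
    if not os.path.exists(path):
        return []
    with open(path, encoding='utf-8') as f:
        lines = [line.rstrip('\n') for line in f]
    return lines[-last_n:]
```

For very large log files, a reverse-seek tail would avoid reading the whole file, but a simple read keeps the sketch clear.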
API Endpoints to Implement
AutomationViewSet - Django REST Framework ViewSet
- Base URL: /api/v1/automation/
- Actions:
POST /api/v1/automation/run_now/
- Body: {"site_id": 12}
- Action: Trigger manual automation run
- Returns: {"run_id": "run_...", "message": "Automation started"}
GET /api/v1/automation/current_run/?site_id=12
- Returns: Current/latest run status, activity log, queue counts
POST /api/v1/automation/pause/
- Body: {"run_id": "run_..."}
- Action: Pause running automation
POST /api/v1/automation/resume/
- Body: {"run_id": "run_..."}
- Action: Resume paused automation
GET /api/v1/automation/config/?site_id=12
- Returns: AutomationConfig for site
PUT /api/v1/automation/config/
- Body: {"site_id": 12, "is_enabled": true, "frequency": "daily", "scheduled_time": "02:00"}
- Action: Update automation schedule
GET /api/v1/automation/history/?site_id=12&page=1
- Returns: Paginated list of past runs
GET /api/v1/automation/logs/{run_id}/
- Returns: Full logs for a specific run (all stage files)
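The two guards run_now/ must enforce (the concurrent-run check and the credit pre-check) reduce to logic like this; names are illustrative, and whether a paused run should also block a new one is a design choice assumed here:

```python
def can_start_run(active_statuses, estimated_credits, available_credits):
    """Preconditions for POST /api/v1/automation/run_now/.

    active_statuses: statuses of this site's existing AutomationRun rows.
    Assumes a paused run also blocks a new start (resume it instead).
    """
    if any(s in ('running', 'paused') for s in active_statuses):
        return False, 'An automation run is already active for this site'
    if estimated_credits > available_credits:
        return False, (f'Insufficient credits: need {estimated_credits}, '
                       f'have {available_credits}')
    return True, 'ok'
```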
Celery Tasks for Scheduling
Periodic Task (runs every hour)
    @shared_task(name='check_scheduled_automations')
    def check_scheduled_automations():
        """
        Runs every hour (via Celery Beat)
        Checks if any AutomationConfig needs to run
        """
        now = timezone.now()
        configs = AutomationConfig.objects.filter(
            is_enabled=True,
            next_run_at__lte=now
        )
        for config in configs:
            # Check for concurrent run
            if AutomationRun.objects.filter(site=config.site, status='running').exists():
                continue  # Skip if already running
            # Start automation
            run_automation_task.delay(
                account_id=config.account_id,
                site_id=config.site_id,
                trigger_type='scheduled'
            )
            # Calculate next run time
            if config.frequency == 'daily':
                config.next_run_at = now + timedelta(days=1)
            elif config.frequency == 'weekly':
                config.next_run_at = now + timedelta(weeks=1)
            elif config.frequency == 'monthly':
                config.next_run_at = now + timedelta(days=30)
            config.last_run_at = now
            config.save()
Schedule in celery.py:
    app.conf.beat_schedule['check-scheduled-automations'] = {
        'task': 'check_scheduled_automations',
        'schedule': crontab(minute=0),  # Every hour on the hour
    }
Stage Task Chain
    @shared_task
    def run_automation_task(account_id, site_id, trigger_type):
        """
        Main automation task - chains individual stage tasks
        """
        service = AutomationService(account_id, site_id)
        run_id = service.start_automation(trigger_type)
        # Chain stages as separate tasks for non-blocking execution
        chain(
            run_stage_1.si(run_id),
            run_stage_2.si(run_id),
            run_stage_3.si(run_id),
            run_stage_4.si(run_id),
            run_stage_5.si(run_id),
            run_stage_6.si(run_id),
            run_stage_7.si(run_id),
        ).apply_async()

    @shared_task
    def run_stage_1(run_id):
        service = AutomationService.from_run_id(run_id)
        service.run_stage_1()
        return run_id  # Pass to next task

    @shared_task
    def run_stage_2(run_id):
        service = AutomationService.from_run_id(run_id)
        service.run_stage_2()
        return run_id
[... similar for stages 3-7 ...]
🧪 TESTING STRATEGY
Unit Tests
Test AutomationService:
- test_estimate_credits_calculation()
- test_stage_1_processes_batches_correctly()
- test_stage_completion_triggers_next_stage()
- test_pause_stops_after_current_batch()
- test_resume_from_paused_state()
- test_idempotency_skips_completed_stages()
Test AutomationLogger:
- test_creates_log_directory_structure()
- test_writes_timestamped_log_entries()
- test_get_activity_log_returns_last_n_lines()
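test_writes_timestamped_log_entries() could assert the entry format seen in the activity-log mock-ups (HH:MM:SS - message); format_entry here is an illustrative stand-in for the logger's real formatter:

```python
import re
from datetime import datetime

def format_entry(message):
    """Illustrative formatter matching the 'HH:MM:SS - message' log lines."""
    return f"{datetime.now().strftime('%H:%M:%S')} - {message}"

def test_writes_timestamped_log_entries():
    entry = format_entry("Stage 1: Started clustering")
    assert re.match(r'^\d{2}:\d{2}:\d{2} - ', entry)
    assert entry.endswith("Stage 1: Started clustering")
```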
Integration Tests
Test Full Pipeline:
    def test_full_automation_pipeline():
        # Setup: Create 10 keywords
        keywords = KeywordFactory.create_batch(10, site=site)

        # Execute
        service = AutomationService(account, site)
        result = service.start_automation(trigger_type='manual')

        # Assert Stage 1
        assert result['stage_1_result']['keywords_processed'] == 10
        assert result['stage_1_result']['clusters_created'] > 0
        # Assert Stage 2
        assert result['stage_2_result']['ideas_created'] > 0
        # Assert Stage 3
        assert result['stage_3_result']['tasks_created'] > 0
        # Assert Stage 4
        assert result['stage_4_result']['content_created'] > 0
        # Assert Stage 5
        assert result['stage_5_result']['prompts_created'] > 0
        # Assert Stage 6
        assert result['stage_6_result']['images_generated'] > 0

        # Assert final state
        assert result['status'] == 'completed'
        assert AutomationRun.objects.get(run_id=result['run_id']).status == 'completed'
Test Error Scenarios:
- test_insufficient_credits_prevents_start()
- test_concurrent_run_prevented()
- test_stage_failure_stops_pipeline()
- test_rollback_on_error()
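test_concurrent_run_prevented() can be expressed against an in-memory stand-in for the AutomationRun per-site query (the real test would exercise the database and distributed lock):

```python
class RunRegistry:
    """In-memory stand-in for AutomationRun's per-site 'running' query."""
    def __init__(self):
        self.runs = []

    def has_active(self, site_id):
        return any(r['site_id'] == site_id and r['status'] == 'running'
                   for r in self.runs)

    def start(self, site_id):
        if self.has_active(site_id):
            raise RuntimeError('Automation already running for this site')
        self.runs.append({'site_id': site_id, 'status': 'running'})

def test_concurrent_run_prevented():
    registry = RunRegistry()
    registry.start(site_id=12)
    try:
        registry.start(site_id=12)   # second run on the same site must fail
        assert False, 'second run should have been rejected'
    except RuntimeError:
        pass
    registry.start(site_id=13)       # other sites are unaffected
```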
📋 IMPLEMENTATION CHECKLIST
Phase 1: Database & Models (Week 1)
- Create automation app directory structure
- Define AutomationRun model with all stage_result JSON fields
- Define AutomationConfig model (one per site, NO sector)
- Create migrations
- Test model creation and queries
Phase 2: Logging Service (Week 1)
- Create AutomationLogger class
- Implement start_run() with log directory creation
- Implement log_stage_start(), log_stage_progress(), log_stage_complete()
- Implement get_activity_log()
- Test file logging manually
Phase 3: Core Automation Service (Week 2)
- Create AutomationService class
- Implement estimate_credits()
- Implement start_automation() with credit check
- Implement run_stage_1() calling ClusteringService
- Test Stage 1 in isolation with real keywords
- Implement run_stage_2() calling IdeasService
- Test Stage 2 in isolation
- Implement run_stage_3() calling bulk_queue_to_writer logic
- Implement run_stage_4() calling GenerateContentFunction
- Implement run_stage_5() calling GenerateImagePromptsFunction
- Implement run_stage_6() calling GenerateImagesFunction
- Implement run_stage_7() review gate (count only)
- Implement pause_automation() and resume_automation()
Phase 4: API Endpoints (Week 3)
- Create AutomationViewSet
- Implement run_now() action
- Implement current_run() action
- Implement pause() and resume() actions
- Implement config GET/PUT actions
- Implement history() action
- Implement logs() action
- Add URL routing in api_urls.py
- Test all endpoints with Postman/curl
Phase 5: Celery Tasks & Scheduling (Week 3)
- Create check_scheduled_automations periodic task
- Create run_automation_task
- Create stage task chain (run_stage_1, run_stage_2, etc.)
- Register tasks in celery.py
- Add Celery Beat schedule
- Test scheduled execution
Phase 6: Frontend Components (Week 4)
- Create /automation route in React Router
- Create Dashboard.tsx page component
- Create StageCard.tsx with queue display
- Create ActivityLog.tsx with 3-second polling
- Create ConfigModal.tsx for schedule settings
- Add "Automation" to sidebar menu (below Sites)
- Implement "Run Now" button
- Implement "Pause" and "Resume" buttons
- Test full UI flow
Phase 7: Safety & Polish (Week 5)
- Implement distributed locking (prevent concurrent runs)
- Implement credit reservation system
- Implement stage idempotency checks
- Implement error handling and rollback
- Create cleanup_old_automation_logs task
- Add email/notification on completion/failure
- Load testing with 100+ keywords
- UI polish and responsiveness
- Documentation update
🚀 POST-LAUNCH ENHANCEMENTS
Future Features (Phase 8+)
- Conditional Stages: Skip stages if no data (e.g., skip Stage 1 if no keywords)
- Parallel Task Processing: Process multiple tasks simultaneously in Stage 4 (with worker limits)
- Smart Scheduling: Avoid peak hours, optimize for cost
- A/B Testing: Test different prompts, compare results
- Content Quality Scoring: Auto-reject low-quality AI content
- WordPress Auto-Publish: With approval workflow and staging
- Analytics Integration: Track content performance post-publish
- Social Media Auto-Post: Share published content to social channels
📖 USER DOCUMENTATION
How to Use Automation
1. Configure Schedule:
- Navigate to Automation page
- Click "Configure" button
- Set frequency (Daily/Weekly/Monthly)
- Set time (e.g., 2:00 AM)
- Optionally adjust batch sizes (advanced)
- Click "Save"
2. Manual Run:
- Click "Run Now" button
- Monitor progress in real-time
- View activity log for details
3. Review Content:
- Wait for automation to complete (or check next morning if scheduled)
- Navigate to Writer → Content page
- Filter by "Draft" status with images generated
- Review content quality
- Select multiple → Bulk Publish
4. Monitor History:
- View past runs in History tab
- Click run to view detailed logs
- See credits used per run
✅ SUCCESS CRITERIA
Automation is successful if:
- ✅ Runs without manual intervention from Keywords → Draft Content
- ✅ Processes 100+ keywords without errors
- ✅ Respects credit limits (pre-check + reservation)
- ✅ Stops at review gate (doesn't auto-publish)
- ✅ Completes within estimated time (6-12 hours for 100 keywords)
- ✅ UI shows real-time progress accurately
- ✅ Logs are detailed enough to troubleshoot failures
- ✅ Can pause/resume without data loss
- ✅ Scheduled runs trigger correctly
- ✅ No duplicate runs occur
- ✅ Reuses ALL existing AI functions (zero duplication)
END OF COMPLETE IMPLEMENTATION PLAN
This plan ensures a safe, modular, observable, and maintainable automation system that orchestrates the existing IGNY8 AI functions into a fully automated content pipeline.