01E: Blueprint-Aware Content Pipeline
Version: 1.1 (codebase-verified)
Source of Truth: Codebase at /data/app/igny8/backend/
Last Verified: 2025-07-14
IGNY8 Phase 1: Content Automation with SAG Blueprint Enhancement
1. CURRENT STATE
Existing Pipeline Architecture
IGNY8's content pipeline operates as a 7-stage automated system, orchestrated via Celery with scheduled execution (daily/weekly/monthly via Celery Beat):
| Stage | Function | Automation | Output |
|---|---|---|---|
| 1 | Keywords | Import CSV/seed lists | Keyword list per site |
| 2 | Clusters | AutoClusterKeywords (GPT-4) | Semantic keyword groups |
| 3 | Ideas | GenerateIdeas | Content brief queue |
| 4 | Tasks | Queue creation | Writer task list |
| 5 | Content | GenerateContent (AI) | Draft articles |
| 6 | Images | GenerateImages | Featured + in-article images |
| 7 | Review | Editorial queue | Published content |
Current Limitations
- Generic clustering: All keywords grouped by semantic similarity, no business-specific structure
- One-size-fits-all content: All articles follow same template regardless of content type
- No hierarchy: No distinction between hub pages, blog posts, product pages, term pages, or service pages
- No priority: All content treated equally; foundational content (hubs) may not be written first
- No taxonomy integration: Generated content not automatically assigned to custom taxonomies
- No blueprint context: Writers receive keywords but not strategic framework
Celery Automation Context
- Celery Beat: Manages recurring schedule (daily, weekly, monthly per site)
- Task Queue: Each stage enqueued as separate Celery task
- State Tracking: Uses the Django ORM to track pipeline records across the Keywords, Clusters, ContentIdeas, Tasks, Content, and Images models (plural names per codebase convention)
- Failure Handling: Retry logic, dead-letter queue for failed tasks
- Logging: Structured logging to track execution per site per stage
2. WHAT TO BUILD
Vision: Blueprint-Driven Pipeline
When a site has an active SAG Blueprint, every pipeline stage becomes context-aware:
- Content priorities driven by blueprint's execution phases
- Content types (hub, blog, product, term, service) determined at ideation
- Prompt templates matched to content structure and type
- Output taxonomy-tagged and cluster-assigned automatically
When no blueprint exists, the pipeline reverts to legacy mode—no breaking changes.
New/Enhanced Stages
Stage 0: Blueprint Check (NEW)
Execute before pipeline stages 1–7.
Responsibility: Determine execution mode and load context.
Logic:
IF Site.sag_blueprint EXISTS AND sag_blueprint.status == 'active':
    LOAD blueprint
    IDENTIFY unfulfilled content needs from blueprint.content_plan
    DETERMINE execution_phases from blueprint.execution_priority
    SET pipeline_mode = 'blueprint_aware'
ELSE:
    SET pipeline_mode = 'legacy'
    PROCEED to Stage 1 with no blueprint context
Outputs:
- `pipeline_mode`: 'blueprint_aware' | 'legacy'
- `blueprint_context`: SAGBlueprint instance (if active)
- `execution_phases`: List of priority phases for content queue
Stage 1: Keyword Processing (ENHANCED)
Legacy behavior (no blueprint): Pass keywords to Stage 2 unchanged.
Blueprint-aware (active blueprint):
- For each new/imported keyword, query blueprint's SAGClusters
- Match keyword to existing clusters based on:
  - Attribute overlap (e.g., keyword "sustainable farming" matches cluster with attribute "sustainability")
  - Semantic proximity to cluster topic
  - Sector alignment
- Assign matched keyword to cluster's `keywords` list
- Flag unmatched keywords:
  - Gap: No cluster exists for this topic
  - Outlier: Keyword semantic distance > threshold from all clusters
  - Frontier: Keyword extends cluster into new subtopic (possible new cluster)
- Update `SAGCluster.keywords`, `SAGCluster.updated_at`
Outputs:
- Updated cluster keyword lists
- Gap/outlier report for content strategy review
- Flagged keywords for potential new cluster formation
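The flagging rules above can be sketched as a small classifier. This is a minimal sketch, assuming each keyword already has a similarity score in [0, 1] against every cluster. The function name `classify_keyword`, the three thresholds, and the choice to treat the middle band as a gap are all hypothetical and not taken from the codebase.

```python
from typing import Dict, Optional, Tuple

# Hypothetical thresholds; tune them against real keyword data.
MATCH_THRESHOLD = 0.75     # at or above: assign to the cluster
FRONTIER_THRESHOLD = 0.55  # close to a cluster: possible new subtopic
OUTLIER_THRESHOLD = 0.30   # below: far from every cluster


def classify_keyword(scores: Dict[int, float]) -> Tuple[str, Optional[int]]:
    """Return (flag, cluster_id) for one keyword.

    scores maps a SAGCluster PK to the keyword's similarity with that cluster.
    """
    if not scores:
        return ("gap", None)  # nothing to compare against
    best_id = max(scores, key=scores.get)
    best = scores[best_id]
    if best >= MATCH_THRESHOLD:
        return ("matched", best_id)
    if best >= FRONTIER_THRESHOLD:
        return ("frontier", best_id)  # extends the nearest cluster
    if best < OUTLIER_THRESHOLD:
        return ("outlier", None)
    return ("gap", None)  # on-topic for the site, but no cluster covers it
```

Returning the nearest cluster for frontier keywords keeps enough context for the strategy review to decide whether a new cluster is warranted.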
Stage 2: AI Cluster Keywords (ENHANCED)
Legacy behavior (no blueprint): Run existing AutoClusterKeywords via GPT-4 grouping.
Blueprint-aware (active blueprint):
- SKIP `AutoClusterKeywords` entirely
- Clusters already defined by SAG framework (Stage 0 loaded blueprint)
- For new keywords from Stage 1:
  - Map to existing clusters (already done in Stage 1)
  - Create mapping record linking keyword → SAGCluster
  - Flag unmatched keywords (from Stage 1) for manual review
- No new clusters created (cluster formation is Phase 1C process, not pipeline)
Outputs:
- Keyword-to-cluster mapping
- Unmatched keyword report
Stage 3: Generate Content Ideas (ENHANCED)
Legacy behavior (no blueprint): Run existing GenerateIdeas function.
Blueprint-aware (active blueprint):
- Call `sag/ai_functions/content_planning.py::GenerateIdeasWithBlueprint`
- For each idea generated, enrich with:
  - Sector: From SAGCluster.sector
  - Structure: From blueprint.content_plan[cluster].structure (e.g., 'guide_tutorial', 'comparison', 'review', 'how_to', 'question')
  - Type: From blueprint.content_plan[cluster].type (e.g., 'cluster_hub', 'blog_post', 'product_page', 'term_page', 'service_page')
  - SAGCluster ID: Link idea to blueprint cluster
  - idea_source: Set to 'sag_blueprint'
- Respect execution phases:
  - Phase 1: Generate ideas for `category_pages`, `top_cluster_hubs`
  - Phase 2: Generate ideas for `remaining_hubs`, `first_blogs_per_cluster`
  - Phase 3: Generate ideas for `attribute_term_pages`, `product_enrichment`
  - Phase 4: Generate ideas for `additional_blogs`, `brand_comparisons`
- Prioritize queuing by phase
Outputs:
- Idea records with type, structure, sector, cluster assignment
- Execution phase assignments
- Queue prioritized by phase
Stage 4: Create Writer Tasks (ENHANCED)
Legacy behavior (no blueprint): Create basic task with keyword/idea reference.
Blueprint-aware (active blueprint):
- For each idea, create Task with:
  - Standard fields: title, keyword, site, status, assigned_to
  - New fields:
    - `sag_cluster_id`: Reference to blueprint cluster
    - `blueprint_context`: JSON blob containing execution context
- `blueprint_context` structure:

      {
        "cluster_id": "integer",
        "cluster_name": "string",
        "cluster_type": "string (topical|product|service)",
        "cluster_sector": "string",
        "hub_title": "string (cluster's main hub page title)",
        "hub_url": "string (blueprint.site.domain/cluster_slug)",
        "cluster_attributes": ["list of attribute terms"],
        "related_clusters": ["list of related cluster integer ids"],
        "cluster_products": ["list of product integer ids if product cluster"],
        "content_structure": "string (guide_tutorial|comparison|review|how_to|question|listicle)",
        "content_type": "string (cluster_hub|blog_post|product_page|term_page|service_page)",
        "execution_phase": "integer (1-4)",
        "seo_strategy": "object (primary_keyword, related_keywords, intent)"
      }

- If no blueprint: Create task without blueprint_context (legacy)
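Because `blueprint_context` is stored as free-form JSON, a small validator can catch incomplete blobs before they reach Stage 5. The following is a sketch only: `validate_blueprint_context` is a hypothetical helper, and treating `cluster_products` as optional is an assumption based on the "if product cluster" note in the structure.

```python
from typing import Any, Dict, List

# Keys assumed mandatory for every task; cluster_products is treated as
# optional because the structure lists it only for product clusters.
REQUIRED_KEYS = {
    "cluster_id", "cluster_name", "cluster_type", "cluster_sector",
    "hub_title", "hub_url", "cluster_attributes", "related_clusters",
    "content_structure", "content_type", "execution_phase", "seo_strategy",
}
CLUSTER_TYPES = {"topical", "product", "service"}
CONTENT_TYPES = {"cluster_hub", "blog_post", "product_page", "term_page", "service_page"}


def validate_blueprint_context(ctx: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the blob looks valid."""
    errors = [f"missing key: {key}" for key in sorted(REQUIRED_KEYS - set(ctx))]
    if "cluster_type" in ctx and ctx["cluster_type"] not in CLUSTER_TYPES:
        errors.append(f"unknown cluster_type: {ctx['cluster_type']}")
    if "content_type" in ctx and ctx["content_type"] not in CONTENT_TYPES:
        errors.append(f"unknown content_type: {ctx['content_type']}")
    if "execution_phase" in ctx:
        phase = ctx["execution_phase"]
        if not isinstance(phase, int) or not 1 <= phase <= 4:
            errors.append(f"execution_phase must be an integer 1-4, got {phase!r}")
    return errors
```

A task whose context fails validation could be logged and left for manual review instead of being sent to content generation.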
Outputs:
- Task records with sag_cluster_id and blueprint_context
Stage 5: Generate Article Content (ENHANCED)
Legacy behavior (no blueprint): Run existing GenerateContent with generic prompt.
Blueprint-aware (has blueprint_context):
- Load prompt template by content_type + content_structure combination:

| Content Type | Structure | Template Key |
|---|---|---|
| Cluster Hub | Guide Tutorial | sag_hub_guide |
| Cluster Hub | Top Listicle | sag_hub_listicle |
| Blog Post | Comparison | sag_blog_comparison |
| Blog Post | Review | sag_blog_review |
| Blog Post | How To | sag_blog_howto |
| Blog Post | Question | sag_blog_question |
| Term Page | Guide Tutorial | sag_term_page |
| Product Page | Review | sag_product_page |
| Service Page | Guide Tutorial | sag_service_page |
| Landing Page | Guide Tutorial | sag_landing_guide |
| Landing Page | Comparison | sag_landing_comparison |
| Business Page | Guide Tutorial | sag_business_guide |

- Inject blueprint context variables into prompt template:
  - {cluster_name} → From SAGCluster.name
  - {cluster_type} → From SAGCluster.cluster_type
  - {cluster_sector} → From SAGCluster.sector
  - {hub_title} → From blueprint_context.hub_title
  - {hub_url} → From blueprint_context.hub_url
  - {attribute_terms} → Comma-separated list from cluster attributes
  - {cluster_products} → Product list if product cluster
  - {related_clusters} → Related cluster names for internal linking
  - {content_structure} → Structure type for consistency
  - {content_type} → Content type for tone/depth
- Call GPT-4 with enriched prompt template
- Post-process output:
  - Add internal links to related cluster hubs
  - Add cross-references to attribute term pages
  - Inject CTA appropriate to content type (e.g., product link for product cluster)
- If no blueprint_context: Run legacy `GenerateContent` unchanged
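Variable injection can fail when a template references a placeholder that a given task's context does not carry (for example `{cluster_products}` on a non-product cluster). One way to handle this, sketched here with a hypothetical `render_prompt` helper, is to render missing placeholders as empty strings and flatten lists into comma-separated text. Mapping `cluster_attributes` onto `{attribute_terms}` is an assumption drawn from the variable list above.

```python
from typing import Any, Dict


class _SafeContext(dict):
    """Dict that renders unknown placeholders as empty strings."""

    def __missing__(self, key: str) -> str:
        return ""


def render_prompt(template: str, ctx: Dict[str, Any]) -> str:
    """Fill a prompt template from blueprint_context without raising KeyError."""
    values = _SafeContext()
    for key, value in ctx.items():
        if value is None:
            values[key] = ""
        elif isinstance(value, (list, tuple)):
            values[key] = ", ".join(str(item) for item in value)
        else:
            values[key] = value
    # Assumed alias: {attribute_terms} is the flattened cluster_attributes list.
    values.setdefault("attribute_terms", values.get("cluster_attributes", ""))
    return template.format_map(values)
```

`str.format_map` is used instead of `str.format(**ctx)` so that the `__missing__` hook on the dict subclass is honored.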
Outputs:
- Content record with body, title, sag_cluster_id, content_type, content_structure
Stage 6: Taxonomy Assignment (NEW)
Execute after content generation, only if blueprint exists.
Responsibility: Auto-assign content to custom WP taxonomies derived from blueprint.
Logic:
- Load site's custom taxonomies from blueprint (`SAGBlueprint.wp_taxonomy_mapping`, keyed by cluster ID)
- For generated content:
  - Match content to cluster's attributes and taxonomy terms
  - Assign custom taxonomy values from blueprint mapping
- Set `content.sag_cluster_id` (links to blueprint structure)
- Update cluster status:
  - If first content in cluster: set `SAGCluster.status = 'partial'`
  - If all planned content exists: set `SAGCluster.status = 'complete'`
- Store taxonomy assignments in `Content.taxonomies` JSON field
Outputs:
- Content records tagged with custom taxonomies
- Cluster status updated to reflect content completion
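The cluster status rule can be expressed as a pure function of two counts. This is a minimal sketch: `next_cluster_status` is a hypothetical helper, and how the planned count is derived from `blueprint.content_plan` is left open because that structure is not specified here.

```python
def next_cluster_status(existing_count: int, planned_count: int) -> str:
    """Map content counts to a SAGCluster.status value.

    existing_count: content items already generated for the cluster
    planned_count:  items the blueprint plans for it (0 if unknown)
    """
    if existing_count <= 0:
        return "draft"
    if planned_count > 0 and existing_count >= planned_count:
        return "complete"
    return "partial"
```

Keeping the rule free of ORM calls makes it easy to unit test and to reuse when content is deleted or regenerated.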
Stage 7: Image Generation (ENHANCED)
Legacy behavior (no blueprint): Generate generic featured + in-article images.
Blueprint-aware (blueprint exists):
- Match image style to content type:
  - Hub page: Hero/authority style (professional, comprehensive)
  - Blog post: Supporting/educational (friendly, illustrative)
  - Product page: E-commerce standard (product-focused, clean)
  - Term page: Category representation (taxonomy icon or concept illustration)
  - Service page: Service illustration (professional, trustworthy)
  - Landing page: Conversion-focused (compelling, aspirational)
- Use cluster theme/color palette from blueprint for style consistency
- Generate alt text leveraging content_structure + cluster context
- If no blueprint: Generate images with default style
Outputs:
- Image records with style type, alt text, sag_cluster_id
Execution Priority (Blueprint-Driven)
Pipeline processes content by SAGBlueprint.execution_priority phases:
execution_priority = {
    "phase_1": ["category_pages", "top_cluster_hubs"],
    "phase_2": ["remaining_hubs", "first_blogs_per_cluster"],
    "phase_3": ["attribute_term_pages", "product_enrichment"],
    "phase_4": ["additional_blogs", "brand_comparisons"]
}
Queue behavior:
- Stage 3 filters ideas by phase
- Stage 4 prioritizes tasks by phase
- Celery task enqueuing respects phase order
- Rationale: Foundational content (hubs) published before supporting content (blogs)
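The queue behavior above amounts to inverting the `execution_priority` mapping and sorting by the resulting phase number. The sketch below assumes each queued item carries a `group` key naming its content group (for example `top_cluster_hubs`); that key and both function names are hypothetical.

```python
from typing import Any, Dict, List


def build_phase_lookup(execution_priority: Dict[str, List[str]]) -> Dict[str, int]:
    """Invert {"phase_1": [groups...]} into {group: 1}."""
    lookup: Dict[str, int] = {}
    for phase_key, groups in execution_priority.items():
        phase = int(phase_key.split("_")[1])  # "phase_3" -> 3
        for group in groups:
            lookup[group] = phase
    return lookup


def order_by_phase(items: List[Dict[str, Any]], lookup: Dict[str, int],
                   default_phase: int = 99) -> List[Dict[str, Any]]:
    """Sort items by phase; unknown groups go last, ties keep input order."""
    return sorted(items, key=lambda item: lookup.get(item["group"], default_phase))
```

Because `sorted` is stable, items within the same phase keep their original relative order, so any earlier prioritization inside a phase is preserved.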
3. DATA MODELS / APIs
Related Models (from 01A, 01C, 01D)
# igny8_core/sag/models.py — SAG Blueprint Structure
# DEFAULT_AUTO_FIELD = BigAutoField (integer PKs)
from django.db import models
from django.db.models import CharField, DateTimeField, ForeignKey, JSONField
from igny8_core.auth.models import AccountBaseModel


class SAGBlueprint(AccountBaseModel):
    site = ForeignKey('igny8_core_auth.Site', on_delete=models.CASCADE)
    name = CharField(max_length=255)
    # Django expects choices as (value, label) pairs; max_length=20 is an assumed value
    status = CharField(max_length=20, choices=[('draft', 'Draft'), ('active', 'Active'), ('archived', 'Archived')])
    created_at = DateTimeField(auto_now_add=True)
    updated_at = DateTimeField(auto_now=True)
    # Phase-based execution plan
    execution_priority = JSONField(default=dict)  # phases 1-4
    content_plan = JSONField()  # cluster_id → content specs
    # Taxonomy mapping to WordPress custom taxonomies
    wp_taxonomy_mapping = JSONField()  # cluster_id → tax values


class SAGCluster(AccountBaseModel):
    blueprint = ForeignKey('sag.SAGBlueprint', on_delete=models.CASCADE)
    name = CharField(max_length=255)
    cluster_type = CharField(max_length=20, choices=[('topical', 'Topical'), ('product', 'Product'), ('service', 'Service')])
    sector = CharField(max_length=255)
    keywords = JSONField(default=list)
    attributes = JSONField(default=list)
    status = CharField(max_length=20, choices=[('draft', 'Draft'), ('partial', 'Partial'), ('complete', 'Complete')])
    updated_at = DateTimeField(auto_now=True)
Pipeline Models (existing — names are PLURAL per codebase convention)
# igny8_core/business/planning/models.py — Planning Pipeline (app_label: planner)
# DEFAULT_AUTO_FIELD = BigAutoField (integer PKs, NOT UUIDs)
class Keywords(SoftDeletableModel, SiteSectorBaseModel):
    """Site-specific keyword instances referencing global SeedKeywords."""
    seed_keyword = ForeignKey(SeedKeyword, on_delete=models.CASCADE)
    volume_override = IntegerField(null=True, blank=True)
    difficulty_override = IntegerField(null=True, blank=True)
    attribute_values = JSONField(default=list, blank=True)
    cluster = ForeignKey('Clusters', on_delete=models.SET_NULL, null=True, blank=True)
    status = CharField(max_length=50, choices=[('new','New'),('mapped','Mapped')], default='new')
    disabled = BooleanField(default=False)
    # NEW: optional SAG cluster link
    sag_cluster_id = IntegerField(null=True, blank=True)  # Links to sag.SAGCluster PK
    created_at = DateTimeField(auto_now_add=True)

    class Meta:
        app_label = 'planner'


class Clusters(SoftDeletableModel, SiteSectorBaseModel):
    """Keyword clusters: pure topic clusters."""
    name = CharField(max_length=255, db_index=True)
    description = TextField(blank=True, null=True)
    keywords_count = IntegerField(default=0)
    volume = IntegerField(default=0)
    mapped_pages = IntegerField(default=0)
    status = CharField(max_length=50, choices=[('new','New'),('mapped','Mapped')], default='new')
    disabled = BooleanField(default=False)
    created_at = DateTimeField(auto_now_add=True)
    updated_at = DateTimeField(auto_now=True)

    class Meta:
        app_label = 'planner'


class ContentIdeas(SoftDeletableModel, SiteSectorBaseModel):
    """Content ideas generated from keyword clusters."""
    idea_title = CharField(max_length=255, db_index=True)
    description = TextField(blank=True, null=True)
    primary_focus_keywords = CharField(max_length=500, blank=True)
    target_keywords = CharField(max_length=500, blank=True)
    keyword_objects = ManyToManyField('Keywords', blank=True, related_name='content_ideas')
    keyword_cluster = ForeignKey('Clusters', on_delete=models.SET_NULL, null=True, blank=True)
    status = CharField(max_length=50, choices=[('new','New'),('queued','Queued'),('completed','Completed')], default='new')
    disabled = BooleanField(default=False)
    estimated_word_count = IntegerField(default=1000)
    content_type = CharField(max_length=50, choices=[('post','Post'),('page','Page'),('product','Product'),('taxonomy','Taxonomy')], default='post')
    content_structure = CharField(max_length=50, choices=[
        ('article','Article'),('guide','Guide'),('comparison','Comparison'),
        ('review','Review'),('listicle','Listicle'),('landing_page','Landing Page'),
        ('business_page','Business Page'),('service_page','Service Page'),
        ('general','General'),('cluster_hub','Cluster Hub'),('product_page','Product Page'),
        ('category_archive','Category Archive'),('tag_archive','Tag Archive'),
        ('attribute_archive','Attribute Archive'),
    ], default='article')
    # NEW: SAG fields
    sag_cluster_id = IntegerField(null=True, blank=True)  # Links to sag.SAGCluster PK
    # NEW; choices must be (value, label) pairs, and max_length=50 is an assumed value
    idea_source = CharField(max_length=50, choices=[('auto_generate','Auto Generate'),('sag_blueprint','SAG Blueprint')], null=True, blank=True)
    execution_phase = IntegerField(null=True)  # NEW: 1-4 from blueprint
    created_at = DateTimeField(auto_now_add=True)

    class Meta:
        app_label = 'planner'
# igny8_core/business/content/models.py — Content Pipeline (app_label: writer)
class Tasks(SoftDeletableModel, SiteSectorBaseModel):
    """Tasks model for content generation queue."""
    title = CharField(max_length=255, db_index=True)
    description = TextField(blank=True, null=True)
    cluster = ForeignKey('planner.Clusters', on_delete=models.SET_NULL, null=True, blank=False)
    idea = ForeignKey('planner.ContentIdeas', on_delete=models.SET_NULL, null=True, blank=True)
    content_type = CharField(max_length=100, choices=[('post','Post'),('page','Page'),('product','Product'),('taxonomy','Taxonomy')], default='post')
    content_structure = CharField(max_length=100, choices=[...same as ContentIdeas...], default='article')
    taxonomy_term = ForeignKey('ContentTaxonomy', on_delete=models.SET_NULL, null=True, blank=True)
    keywords = TextField(blank=True, null=True, help_text='Comma-separated keywords')
    word_count = IntegerField(default=1000)
    status = CharField(max_length=50, choices=[('queued','Queued'),('completed','Completed')], default='queued')
    # NEW: SAG fields
    sag_cluster_id = IntegerField(null=True, blank=True)  # Links to sag.SAGCluster PK
    blueprint_context = JSONField(null=True, blank=True)  # NEW: execution context
    created_at = DateTimeField(auto_now_add=True)
    updated_at = DateTimeField(auto_now=True)

    class Meta:
        app_label = 'writer'


class Content(SoftDeletableModel, SiteSectorBaseModel):
    """Content model for AI-generated or WordPress-imported content."""
    title = CharField(max_length=255, db_index=True)
    content_html = TextField(help_text='Final HTML content')  # NOTE: field is content_html, NOT body
    word_count = IntegerField(default=0)
    meta_title = CharField(max_length=255, blank=True, null=True)
    meta_description = TextField(blank=True, null=True)
    primary_keyword = CharField(max_length=255, blank=True, null=True)
    secondary_keywords = JSONField(default=list, blank=True)
    cluster = ForeignKey('planner.Clusters', on_delete=models.SET_NULL, null=True, blank=False)
    content_type = CharField(max_length=50, choices=[('post','Post'),('page','Page'),('product','Product'),('taxonomy','Taxonomy')], default='post')
    content_structure = CharField(max_length=50, choices=[...same as Tasks...], default='article')
    taxonomy_terms = ManyToManyField('ContentTaxonomy', through='ContentTaxonomyRelation', blank=True)
    external_id = CharField(max_length=255, blank=True, null=True)
    external_url = URLField(blank=True, null=True)
    source = CharField(max_length=50, choices=[('igny8','IGNY8 Generated'),('wordpress','WordPress Imported')], default='igny8')
    status = CharField(max_length=50, choices=[('draft','Draft'),('review','Review'),('approved','Approved'),('published','Published')], default='draft')
    # NEW: SAG fields
    sag_cluster_id = IntegerField(null=True, blank=True)  # Links to sag.SAGCluster PK
    created_at = DateTimeField(auto_now_add=True)
    updated_at = DateTimeField(auto_now=True)

    class Meta:
        app_label = 'writer'


class Images(SoftDeletableModel, SiteSectorBaseModel):
    """Images model (note: class is Images, plural)."""
    content = ForeignKey(Content, on_delete=models.CASCADE, null=True, blank=True)
    task = ForeignKey(Tasks, on_delete=models.CASCADE, null=True, blank=True)
    image_type = CharField(max_length=50, choices=[('featured','Featured'),('desktop','Desktop'),('mobile','Mobile'),('in_article','In-Article')], default='featured')
    image_url = CharField(max_length=500, blank=True, null=True)  # NOTE: field is image_url, NOT url
    image_path = CharField(max_length=500, blank=True, null=True)
    prompt = TextField(blank=True, null=True)  # Generation prompt
    caption = TextField(blank=True, null=True)  # NOTE: field is caption, NOT alt_text
    status = CharField(max_length=50, default='pending')
    position = IntegerField(default=0)
    # NEW: SAG fields
    sag_cluster_id = IntegerField(null=True, blank=True)  # Links to sag.SAGCluster PK
    style_type = CharField(max_length=50, choices=[('hero','Hero'),('supporting','Supporting'),('ecommerce','Ecommerce'),('category','Category'),('service','Service'),('conversion','Conversion')], null=True, blank=True)  # NEW

    created_at = DateTimeField(auto_now_add=True)

    class Meta:
        app_label = 'writer'
class Job(models.Model):
    """Pipeline execution tracking (NEW model; does not yet exist in codebase)."""
    site = ForeignKey('igny8_core_auth.Site', on_delete=models.CASCADE)
    # choices must be (value, label) pairs; max_length values are assumed
    status = CharField(max_length=20, choices=[('pending', 'Pending'), ('running', 'Running'), ('completed', 'Completed'), ('failed', 'Failed')])
    stage = IntegerField(choices=[(0, 'Blueprint Check'), (1, 'Keywords'), (2, 'Cluster'), (3, 'Ideas'), (4, 'Tasks'), (5, 'Content'), (6, 'Taxonomy'), (7, 'Images')])
    blueprint_mode = CharField(max_length=20, choices=[('legacy', 'Legacy'), ('blueprint_aware', 'Blueprint Aware')])  # NEW
    log = TextField(default='')
    created_at = DateTimeField(auto_now_add=True)
    completed_at = DateTimeField(null=True)
API Endpoints (Celery Task Functions)
Stage 0: Blueprint Check
# igny8_core/tasks.py (Celery app: celery -A igny8_core)
# Model import paths follow the module comments in Section 3; the locations of
# `app`, `Site`, and `Job` are not specified in this document.
from igny8_core.sag.models import SAGBlueprint, SAGCluster
from igny8_core.business.planning.models import Keywords, ContentIdeas
from igny8_core.business.content.models import Tasks, Content, Images


@app.task(bind=True, max_retries=3)
def check_blueprint(self, site_id):
    """
    Stage 0: Determine execution mode and load blueprint context.
    Args:
        site_id: integer PK (BigAutoField)
    Returns:
        {
            'status': 'success',
            'pipeline_mode': 'blueprint_aware' | 'legacy',
            'blueprint_id': integer (if active),
            'execution_phases': list,
            'next_stage': 1
        }
    """
    try:
        site = Site.objects.get(id=site_id)  # integer PK lookup
        job = Job.objects.create(site=site, stage=0, status='running')
        blueprint = SAGBlueprint.objects.filter(
            site=site,
            status='active'
        ).first()
        if blueprint:
            result = {
                'status': 'success',
                'pipeline_mode': 'blueprint_aware',
                'blueprint_id': blueprint.id,
                'execution_phases': blueprint.execution_priority,
                'next_stage': 1,
            }
            job.blueprint_mode = 'blueprint_aware'
        else:
            result = {
                'status': 'success',
                'pipeline_mode': 'legacy',
                'blueprint_id': None,
                'execution_phases': None,
                'next_stage': 1,
            }
            job.blueprint_mode = 'legacy'
        job.status = 'completed'
        job.save()
        # Chain to Stage 1
        process_keywords.delay(site_id, result)
        return result
    except Exception as e:
        # retry() raises celery.exceptions.Retry; `raise` makes the control flow explicit
        raise self.retry(exc=e, countdown=60)
Stage 1: Keyword Processing
@app.task(bind=True, max_retries=3)
def process_keywords(self, site_id, blueprint_context):
    """
    Stage 1: Process keywords and optionally map to SAGClusters.
    If blueprint_context['pipeline_mode'] == 'blueprint_aware':
    - Map keywords to existing SAGClusters
    - Flag unmatched keywords
    Else:
    - Pass keywords to next stage unchanged
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=1,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )
        # Only keywords not yet linked to a SAG cluster
        keywords = Keywords.objects.filter(site=site, sag_cluster_id__isnull=True)
        if blueprint_context['pipeline_mode'] == 'blueprint_aware':
            blueprint = SAGBlueprint.objects.get(id=blueprint_context['blueprint_id'])
            clusters = SAGCluster.objects.filter(blueprint=blueprint)
            matched_count = 0
            unmatched_keywords = []
            for keyword in keywords:
                # Semantic matching: find best cluster
                cluster = _match_keyword_to_cluster(keyword, clusters)
                # NOTE: keyword.keyword assumes Keywords exposes the keyword text
                # (via seed_keyword); the model listing above does not show it.
                if cluster:
                    keyword.sag_cluster_id = cluster.id
                    keyword.save()
                    cluster.keywords.append(keyword.keyword)
                    cluster.save()
                    matched_count += 1
                else:
                    unmatched_keywords.append(keyword.keyword)
            job.log = f"Matched {matched_count} keywords. Unmatched: {unmatched_keywords}"
        else:
            job.log = "Legacy mode: keywords passed unchanged"
        job.status = 'completed'
        job.save()
        # Chain to Stage 2
        cluster_keywords.delay(site_id, blueprint_context)
        return {'status': 'success', 'keywords_processed': keywords.count()}
    except Exception as e:
        raise self.retry(exc=e, countdown=60)


def _match_keyword_to_cluster(keyword, clusters):
    """Find best-matching SAGCluster for keyword via embedding similarity."""
    # Uses semantic search (embeddings) to find best cluster match
    # Returns SAGCluster or None
    pass
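The stub above leaves the matching strategy open. As one possible starting point, the sketch below picks the cluster whose embedding has the highest cosine similarity to the keyword embedding. How embeddings are produced is out of scope here; `best_cluster`, the plain-list vectors, and the 0.75 cutoff are assumptions for illustration, not the project's implementation.

```python
import math
from typing import Dict, Optional, Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity of two equal-length vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def best_cluster(keyword_vec: Sequence[float],
                 cluster_vecs: Dict[int, Sequence[float]],
                 min_score: float = 0.75) -> Optional[int]:
    """Return the PK of the most similar cluster, or None if none reaches min_score."""
    best_id, best_score = None, min_score
    for cluster_id, vec in cluster_vecs.items():
        score = cosine_similarity(keyword_vec, vec)
        if score >= best_score:
            best_id, best_score = cluster_id, score
    return best_id
```

Returning `None` below the cutoff lines up with the stub's contract ("Returns SAGCluster or None"), so unmatched keywords fall through to the flagging path in `process_keywords`.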
Stage 2: AI Cluster Keywords
@app.task(bind=True, max_retries=3)
def cluster_keywords(self, site_id, blueprint_context):
    """
    Stage 2: Cluster keywords.
    If blueprint_aware:
    - SKIP AutoClusterKeywords
    - Use blueprint clusters from Stage 0
    Else:
    - Run AutoClusterKeywords (existing function)
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=2,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )
        if blueprint_context['pipeline_mode'] == 'blueprint_aware':
            # Clusters already exist from blueprint
            clusters = SAGCluster.objects.filter(
                blueprint_id=blueprint_context['blueprint_id']
            )
            job.log = f"Using {clusters.count()} blueprint clusters"
        else:
            # Run existing AutoClusterKeywords
            clusters = AutoClusterKeywords(site_id)
            job.log = f"AutoClusterKeywords created {clusters.count()} clusters"
        job.status = 'completed'
        job.save()
        # Chain to Stage 3
        generate_ideas.delay(site_id, blueprint_context)
        return {'status': 'success', 'clusters': clusters.count()}
    except Exception as e:
        raise self.retry(exc=e, countdown=60)
Stage 3: Generate Content Ideas
@app.task(bind=True, max_retries=3)
def generate_ideas(self, site_id, blueprint_context):
    """
    Stage 3: Generate content ideas.
    If blueprint_aware:
    - Call GenerateIdeasWithBlueprint
    - Enrich ideas with type, structure, sector
    - Respect execution phases
    Else:
    - Call existing GenerateIdeas
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=3,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )
        if blueprint_context['pipeline_mode'] == 'blueprint_aware':
            blueprint = SAGBlueprint.objects.get(id=blueprint_context['blueprint_id'])
            ideas = GenerateIdeasWithBlueprint(site, blueprint)
            job.log = f"Generated {len(ideas)} blueprint-aware ideas across {len(blueprint_context['execution_phases'])} phases"
        else:
            ideas = GenerateIdeas(site)
            job.log = f"Generated {len(ideas)} legacy ideas"
        job.status = 'completed'
        job.save()
        # Chain to Stage 4
        create_tasks.delay(site_id, blueprint_context)
        return {'status': 'success', 'ideas': len(ideas)}
    except Exception as e:
        raise self.retry(exc=e, countdown=60)
Stage 4: Create Writer Tasks
@app.task(bind=True, max_retries=3)
def create_tasks(self, site_id, blueprint_context):
    """
    Stage 4: Create writer tasks.
    If blueprint_aware:
    - Enrich task with sag_cluster_id and blueprint_context JSON
    - Respect execution phase priority
    Else:
    - Create basic tasks
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=4,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )
        # Ideas with no task yet. The reverse lookup for Tasks.idea defaults to
        # `tasks` (lowercased model name) because no related_name is declared.
        ideas = ContentIdeas.objects.filter(site=site, tasks__isnull=True)
        task_count = 0
        for idea in ideas:
            task = Tasks.objects.create(
                site=site,
                title=idea.idea_title,
                idea=idea,
                status='queued'  # Tasks.STATUS_CHOICES: queued/completed
            )
            if blueprint_context['pipeline_mode'] == 'blueprint_aware' and idea.sag_cluster_id:
                cluster = SAGCluster.objects.get(id=idea.sag_cluster_id)
                blueprint = cluster.blueprint
                task.sag_cluster_id = idea.sag_cluster_id
                task.blueprint_context = {
                    'cluster_id': cluster.id,
                    'cluster_name': cluster.name,
                    'cluster_type': cluster.cluster_type,
                    'cluster_sector': cluster.sector,
                    # JSONField keys are strings, hence str(cluster.id)
                    'hub_title': blueprint.content_plan.get(str(cluster.id), {}).get('hub_title'),
                    'hub_url': f"{site.domain}/hubs/{cluster.name.lower().replace(' ', '-')}",
                    'cluster_attributes': cluster.attributes,
                    'content_structure': idea.content_structure,
                    'content_type': idea.content_type,
                    'execution_phase': idea.execution_phase,
                }
                task.save()
            task_count += 1
        job.log = f"Created {task_count} tasks"
        job.status = 'completed'
        job.save()
        # Chain to Stage 5
        generate_content.delay(site_id, blueprint_context)
        return {'status': 'success', 'tasks': task_count}
    except Exception as e:
        raise self.retry(exc=e, countdown=60)
Stage 5: Generate Article Content
@app.task(bind=True, max_retries=3)
def generate_content(self, site_id, blueprint_context):
    """
    Stage 5: Generate article content.
    If task has blueprint_context:
    - Load prompt template by content_type + structure
    - Inject blueprint context variables
    - Call GPT-4 with enriched prompt
    - Post-process for internal links
    Else:
    - Call existing GenerateContent
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=5,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )
        # Tasks still waiting for content (Tasks.status: queued/completed).
        # Content has no task FK in the Section 3 listing, so status drives the queue.
        tasks = Tasks.objects.filter(site=site, status='queued')
        content_count = 0
        for task in tasks:
            if task.blueprint_context:
                # Blueprint-aware content generation
                prompt_key = _get_prompt_key(
                    task.blueprint_context['content_type'],
                    task.blueprint_context['content_structure']
                )
                template = PROMPT_TEMPLATES.get(prompt_key)
                # Inject variables
                prompt = template.format(**task.blueprint_context)
                # Call GPT-4
                article = gpt4_call(prompt)
                # Post-process
                article = _add_internal_links(article, task.blueprint_context)
            else:
                # Legacy content generation
                article = GenerateContent(task.idea.keyword)
            ctx = task.blueprint_context or {}
            Content.objects.create(
                site=site,
                title=task.title,
                content_html=article,  # field is content_html, not body
                sag_cluster_id=task.sag_cluster_id,
                # Legacy tasks fall back to the model defaults ('post' / 'article')
                content_type=ctx.get('content_type', 'post'),
                content_structure=ctx.get('content_structure', 'article'),
            )
            # Mark the task done so the next run does not pick it up again
            task.status = 'completed'
            task.save(update_fields=['status'])
            content_count += 1
        job.log = f"Generated {content_count} articles"
        job.status = 'completed'
        job.save()
        # Chain to Stage 6
        assign_taxonomy.delay(site_id, blueprint_context)
        return {'status': 'success', 'content': content_count}
    except Exception as e:
        raise self.retry(exc=e, countdown=60)


def _get_prompt_key(content_type, structure):
    """Map content_type + structure to prompt template key."""
    mapping = {
        ('cluster_hub', 'guide_tutorial'): 'sag_hub_guide',
        ('cluster_hub', 'listicle'): 'sag_hub_listicle',
        ('blog_post', 'comparison'): 'sag_blog_comparison',
        ('blog_post', 'review'): 'sag_blog_review',
        ('blog_post', 'how_to'): 'sag_blog_howto',
        ('blog_post', 'question'): 'sag_blog_question',
        ('term_page', 'guide_tutorial'): 'sag_term_page',
        ('product_page', 'review'): 'sag_product_page',
        ('service_page', 'guide_tutorial'): 'sag_service_page',
        ('landing_page', 'guide_tutorial'): 'sag_landing_guide',
        ('landing_page', 'comparison'): 'sag_landing_comparison',
        ('business_page', 'guide_tutorial'): 'sag_business_guide',
    }
    return mapping.get((content_type, structure), 'sag_default')


def _add_internal_links(article, blueprint_context):
    """Add internal links to related cluster hubs and attribute term pages."""
    # Parse article, identify linking opportunities
    # Inject markdown links to related content
    pass
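The link-injection stub can start from something very simple. The sketch below does not parse the article for in-text linking opportunities; it only appends a related-links list, and it emits HTML instead of markdown on the assumption that the result is stored in `Content.content_html`. The function name `append_related_links`, the "Related Topics" heading, and the `url`/`title` keys are hypothetical.

```python
from html import escape
from typing import Dict, List


def append_related_links(article_html: str, links: List[Dict[str, str]]) -> str:
    """Append a 'Related Topics' list of hub links to the end of an HTML article.

    links: [{"url": "...", "title": "..."}], e.g. the cluster hub plus related hubs.
    """
    if not links:
        return article_html  # nothing to add; leave the article untouched
    items = "".join(
        f'<li><a href="{escape(link["url"], quote=True)}">{escape(link["title"])}</a></li>'
        for link in links
    )
    return f"{article_html}\n<h2>Related Topics</h2>\n<ul>{items}</ul>"
```

Escaping both the URL and the title guards against characters such as `&` or quotes in cluster names breaking the generated markup.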
Stage 6: Taxonomy Assignment
@app.task(bind=True, max_retries=3)
def assign_taxonomy(self, site_id, blueprint_context):
    """
    Stage 6: Assign content to custom WP taxonomies (blueprint mode only).
    If blueprint_aware:
    - Match content to cluster attributes
    - Assign custom taxonomy values
    - Update cluster status
    Else:
    - Skip stage
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=6,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )
        if blueprint_context['pipeline_mode'] != 'blueprint_aware':
            job.log = "Legacy mode: taxonomy assignment skipped"
            job.status = 'completed'
            job.save()
            generate_images.delay(site_id, blueprint_context)
            return {'status': 'success', 'skipped': True}
        blueprint = SAGBlueprint.objects.get(id=blueprint_context['blueprint_id'])
        content_items = Content.objects.filter(site=site, sag_cluster_id__isnull=False, taxonomies__isnull=True)
        assigned_count = 0
        for content in content_items:
            cluster = SAGCluster.objects.get(id=content.sag_cluster_id)
            # Load taxonomy mapping from blueprint.
            # JSONField keys are strings, so look up by str(cluster.id).
            tax_mapping = blueprint.wp_taxonomy_mapping.get(str(cluster.id), {})
            # Assign taxonomies
            content.taxonomies = tax_mapping
            content.save()
            # Update cluster status: first content moves the cluster from draft to partial
            if cluster.status == 'draft' and Content.objects.filter(sag_cluster_id=cluster.id).exists():
                cluster.status = 'partial'
                cluster.save()
            assigned_count += 1
        job.log = f"Assigned {assigned_count} content items to taxonomies"
        job.status = 'completed'
        job.save()
        # Chain to Stage 7
        generate_images.delay(site_id, blueprint_context)
        return {'status': 'success', 'assigned': assigned_count}
    except Exception as e:
        raise self.retry(exc=e, countdown=60)
Stage 7: Image Generation
@app.task(bind=True, max_retries=3)
def generate_images(self, site_id, blueprint_context):
    """
    Stage 7: Generate featured and in-article images.
    If blueprint_aware:
    - Match image style to content type
    - Use cluster theme/color palette
    Else:
    - Generate default style images
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=7,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )
        # Content with no image yet. The reverse lookup for Images.content defaults
        # to `images` (lowercased model name) because no related_name is declared.
        content_items = Content.objects.filter(site=site, images__isnull=True)
        image_count = 0
        for content in content_items:
            if blueprint_context['pipeline_mode'] == 'blueprint_aware' and content.content_type:
                # Match style to content type
                style_mapping = {
                    'cluster_hub': 'hero',
                    'blog_post': 'supporting',
                    'product_page': 'ecommerce',
                    'term_page': 'category',
                    'service_page': 'service',
                    'landing_page': 'conversion',
                }
                style = style_mapping.get(content.content_type, 'supporting')
            else:
                style = 'supporting'
            # Generate featured image
            featured_image = GenerateImage(content.title, style)
            image = Images.objects.create(
                content=content,
                image_url=featured_image['url'],      # model field is image_url, not url
                caption=featured_image['alt_text'],   # model field is caption, not alt_text
                style_type=style,
                sag_cluster_id=content.sag_cluster_id,
            )
            image_count += 1
        job.log = f"Generated {image_count} images"
        job.status = 'completed'
        job.save()
        return {'status': 'success', 'images': image_count}
    except Exception as e:
        raise self.retry(exc=e, countdown=60)
4. IMPLEMENTATION STEPS
Phase A: Data Model Extensions (Week 1)
- Add fields to Keyword, Idea, Task, Content, Image models (see Section 3)
- Create SAGBlueprint, SAGCluster models (reference 01A)
- Create database migrations
- Test model relationships and queries
Phase B: Stage 0 Implementation (Week 1)
- Implement check_blueprint Celery task
- Add blueprint loading and caching logic
- Create execution_priority parsing
- Test with sample blueprints (active and inactive)
- Add logging and error handling
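As a rough illustration of the Stage 0 logic listed above, the sketch below derives the pipeline mode and an ordered phase list from a blueprint. It uses a plain dict as a stand-in for the SAGBlueprint model. The function name `build_blueprint_context` and the `phases` key are assumptions made for illustration; only `pipeline_mode`, `blueprint_id`, and the `phase_N` key format come from this document.

```python
from typing import Any, Dict, Optional


def build_blueprint_context(blueprint: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Decide the pipeline mode and order the execution phases.

    `blueprint` is a plain-dict stand-in for a SAGBlueprint row (hypothetical shape).
    """
    if not blueprint or blueprint.get("status") != "active":
        # No active blueprint: run the unchanged legacy pipeline.
        return {"pipeline_mode": "legacy", "blueprint_id": None, "phases": []}

    priority = blueprint.get("execution_priority") or {}

    def phase_number(key: str) -> int:
        # "phase_10" -> 10, so phase_10 sorts after phase_2.
        return int(key.rsplit("_", 1)[1])

    phases = [(key, list(priority[key])) for key in sorted(priority, key=phase_number)]
    return {
        "pipeline_mode": "blueprint_aware",
        "blueprint_id": blueprint["id"],
        "phases": phases,
    }
```

Sorting on the numeric suffix rather than the raw key avoids the lexicographic trap where `phase_10` would otherwise sort before `phase_2`.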
Phase C: Stage 1–2 Enhancement (Week 2)
- Implement _match_keyword_to_cluster function (embedding-based matching)
- Extend process_keywords task for blueprint mode
- Modify cluster_keywords to skip AutoClusterKeywords when blueprint active
- Add unmatched keyword flagging and reporting
- Test with mixed keyword sets
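The embedding-based matching named in Phase C could look roughly like the sketch below. It assumes keyword and cluster embeddings are already available as plain float lists; the function names, the 0.75 threshold, and the toy vectors are illustrative assumptions, not values taken from the codebase.

```python
import math
from typing import Dict, List, Optional


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine similarity of two equal-length vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def match_keyword_to_cluster(
    keyword_vec: List[float],
    cluster_vecs: Dict[int, List[float]],
    threshold: float = 0.75,  # assumed cut-off; tune against real data
) -> Optional[int]:
    """Return the ID of the most similar cluster, or None if nothing clears the threshold."""
    best_id, best_score = None, -1.0
    for cluster_id, vec in cluster_vecs.items():
        score = cosine_similarity(keyword_vec, vec)
        if score > best_score:
            best_id, best_score = cluster_id, score
    return best_id if best_score >= threshold else None
```

A `None` result corresponds to the "unmatched" case, which the pipeline would flag for reporting rather than force into the nearest cluster.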
Phase D: Stage 3 Enhancement (Week 2)
- Create sag/ai_functions/content_planning.py module
- Implement GenerateIdeasWithBlueprint function
- Add phase-based filtering and prioritization
- Integrate structure/type/sector enrichment
- Test idea generation for each content type
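One possible shape for the phase-based filtering and prioritization step is sketched below. It assumes each idea is a dict carrying an integer `execution_phase` plus `content_type` and `title`; that shape, the function name `prioritize_ideas`, and the hub-first tie-break are assumptions for illustration rather than the implemented behavior.

```python
from typing import Any, Dict, List


def prioritize_ideas(ideas: List[Dict[str, Any]], current_phase: int) -> List[Dict[str, Any]]:
    """Keep ideas whose phase has been reached, ordered by phase, hubs first, then title."""
    eligible = [idea for idea in ideas if idea["execution_phase"] <= current_phase]
    return sorted(
        eligible,
        key=lambda idea: (
            idea["execution_phase"],
            idea["content_type"] != "cluster_hub",  # False sorts first, so hubs lead
            idea["title"],
        ),
    )
```

Ideas from later phases are simply held back until `current_phase` advances, which keeps foundational hub content ahead of supporting posts.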
Phase E: Stage 4 Enhancement (Week 3)
- Extend create_tasks task with blueprint_context JSON assembly
- Add execution_phase assignment
- Test blueprint_context structure completeness
- Verify sag_cluster_id linking
Phase F: Stage 5 Enhancement (Week 3)
- Create PROMPT_TEMPLATES dictionary with all template keys
- Implement _get_prompt_key function
- Extend generate_content task to use templates
- Implement _add_internal_links post-processing
- Test content generation for each content_type + structure combination
- Validate prompt variable injection
Phase G: Stage 6 Implementation (Week 4)
- Implement assign_taxonomy task
- Add taxonomy mapping logic from blueprint.wp_taxonomy_mapping
- Implement cluster status updates
- Test taxonomy assignment with sample blueprints
Phase H: Stage 7 Enhancement (Week 4)
- Extend generate_images task for blueprint mode
- Add style_type mapping by content_type
- Implement color palette usage from blueprint
- Test image generation for each content type
Phase I: Integration & Testing (Week 5)
- Test full pipeline execution with active blueprint
- Test full pipeline execution without blueprint (legacy mode)
- Add integration tests for each stage transition
- Test error handling and retries
- Load testing with multiple concurrent sites
Phase J: Deployment & Monitoring (Week 6)
- Deploy models and migrations to staging
- Deploy Celery tasks to staging
- Validate with staging data
- Set up pipeline execution monitoring (01G)
- Deploy to production with feature flag (blueprint mode off by default)
5. ACCEPTANCE CRITERIA
Functional Requirements
- Stage 0: Blueprint check completes successfully; mode determination accurate
- Stage 1: Keywords matched to clusters with 85%+ accuracy; unmatched flagged
- Stage 2: Legacy mode skipped when blueprint active; clusters pre-loaded
- Stage 3: Ideas generated with correct type/structure/sector/cluster assignment
- Stage 4: Tasks enriched with complete blueprint_context JSON
- Stage 5: Content generated using template-specific prompts; blueprint variables injected
- Stage 6: Content assigned to custom taxonomies; cluster status updated
- Stage 7: Images generated with correct style matching content type
Quality Criteria
- No breaking changes: Legacy mode works identically to pre-blueprint pipeline
- Error handling: All Celery tasks handle failures gracefully; retry logic functional
- Performance: Pipeline completes within baseline timing (per site, per stage)
- Logging: All stages log execution details and decisions
- Data integrity: sag_cluster_id and blueprint_context consistently populated
Testing Coverage
- Unit tests: Each function and task (>80% coverage)
- Integration tests: Full pipeline execution with/without blueprint
- Scenario tests:
- Active blueprint (all phases)
- Inactive blueprint (legacy mode)
- Mixed keywords (matched + unmatched)
- Multiple sites with different blueprints
- Failed tasks (retry logic)
Documentation
- Docstrings: All functions documented with inputs/outputs
- README: Setup and execution instructions
- Troubleshooting guide: Common issues and solutions
Monitoring (01G Health Monitoring)
- Pipeline execution time per stage per site
- Content generation success rate by content_type
- Taxonomy assignment accuracy
- Cluster completion status tracking
- Unmatched keyword trending
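As an illustration of the "success rate by content_type" metric, the sketch below aggregates outcome records into per-type rates. The `(content_type, succeeded)` record shape and the function name are assumptions; in practice the records would be derived from whatever status fields the Job and Content models expose.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def success_rate_by_type(records: Iterable[Tuple[str, bool]]) -> Dict[str, float]:
    """Map each content type to the fraction of its generation attempts that succeeded."""
    totals: Dict[str, int] = defaultdict(int)
    successes: Dict[str, int] = defaultdict(int)
    for content_type, succeeded in records:
        totals[content_type] += 1
        if succeeded:
            successes[content_type] += 1
    # Every key in totals has a count of at least 1, so the division is safe.
    return {ct: successes[ct] / totals[ct] for ct in totals}
```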
6. CLAUDE CODE INSTRUCTIONS
Running the Pipeline Locally
Prerequisites
# Install dependencies
pip install -r requirements.txt
pip install "celery[redis]" pytest pytest-django
# Set up local database
python manage.py migrate
# Start Redis (for Celery)
redis-server
Initialize Test Data
# Create sample site and blueprint
python manage.py shell << EOF
from igny8_core.auth.models import Site
from sag.models import SAGBlueprint, SAGCluster
site = Site.objects.create(name="Test Site", domain="test.local")
blueprint = SAGBlueprint.objects.create(
    site=site,
    name="Test Blueprint",
    status="active",
    execution_priority={
        "phase_1": ["category_pages", "top_cluster_hubs"],
        "phase_2": ["remaining_hubs"],
        "phase_3": ["attribute_term_pages"],
        "phase_4": ["additional_blogs"],
    },
    content_plan={},
    wp_taxonomy_mapping={}
)
cluster = SAGCluster.objects.create(
    blueprint=blueprint,
    name="Test Cluster",
    cluster_type="topical",
    sector="Tech",
    keywords=["python", "django"],
    attributes=["web development", "open source"],
    status="draft"
)
print(f"Created site {site.id}, blueprint {blueprint.id}, cluster {cluster.id}")
EOF
Execute Pipeline Stages
# Start Celery worker (in separate terminal)
celery -A igny8_core worker --loglevel=info
# Run Stage 0: Blueprint Check
python manage.py shell << EOF
from igny8_core.tasks import check_blueprint
result = check_blueprint.delay(site_id="<site-id>")
print(result.get())
EOF
# Run full pipeline
python manage.py shell << EOF
from igny8_core.tasks import check_blueprint
site_id = 1 # integer PK (BigAutoField)
check_blueprint.delay(site_id)
# Each stage automatically chains to the next
EOF
# Monitor pipeline execution
celery -A igny8_core events
# or view logs: tail -f celery.log
Testing the Pipeline
Unit Tests
pytest igny8_core/business/content/tests/test_pipeline.py -v
pytest igny8_core/sag/tests/test_blueprint.py -v
pytest igny8_core/tests/test_tasks.py -v
Integration Test
pytest igny8_core/business/content/tests/test_pipeline_integration.py::test_full_blueprint_pipeline -v
# Test legacy mode
pytest igny8_core/business/content/tests/test_pipeline_integration.py::test_full_legacy_pipeline -v
# Test mixed mode (some sites with blueprint, some without)
pytest igny8_core/business/content/tests/test_pipeline_integration.py::test_mixed_mode_execution -v
Manual Test Scenario
# 1. Create test site and blueprint
python manage.py shell < scripts/setup_test_data.py
# 2. Import sample keywords
python manage.py shell << EOF
from igny8_core.business.content.models import Keywords
from igny8_core.auth.models import Site
site = Site.objects.get(name="Test Site")
keywords = ["python tutorial", "django rest", "web scraping"]
for kw in keywords:
    Keywords.objects.create(site=site, term=kw, source='csv_import')
EOF
# 3. Run pipeline
celery -A igny8_core worker --loglevel=debug &
python manage.py shell << EOF
from igny8_core.tasks import check_blueprint
from igny8_core.auth.models import Site
site = Site.objects.get(name="Test Site")
check_blueprint.delay(site.id)
EOF
# 4. Inspect results
python manage.py shell << EOF
from igny8_core.business.content.models import Keywords, ContentIdeas, Tasks, Content, Images
from igny8_core.auth.models import Site
site = Site.objects.get(name="Test Site")
print("Keywords:", Keywords.objects.filter(site=site).count())
print("Ideas:", ContentIdeas.objects.filter(site=site).count())
print("Tasks:", Tasks.objects.filter(site=site).count())
print("Content:", Content.objects.filter(site=site).count())
print("Images:", Images.objects.filter(site=site).count())
# Check blueprint context
task = Tasks.objects.filter(site=site, blueprint_context__isnull=False).first()
if task:
    print("Blueprint context:", task.blueprint_context)
EOF
Debugging Common Issues
Blueprint Not Detected
# Check if blueprint exists and is active
python manage.py shell << EOF
from sag.models import SAGBlueprint
from igny8_core.auth.models import Site
site = Site.objects.get(id="<site-id>")
blueprint = SAGBlueprint.objects.filter(site=site, status='active').first()
print(f"Blueprint: {blueprint}")
if blueprint:
    print(f"Status: {blueprint.status}")
    print(f"Content plan: {blueprint.content_plan}")
EOF
Keywords Not Matching
# Check keyword-cluster mapping
python manage.py shell << EOF
from igny8_core.business.content.models import Keywords
from sag.models import SAGCluster
keywords = Keywords.objects.filter(sag_cluster_id__isnull=True)
print(f"Unmatched keywords: {[kw.term for kw in keywords]}")
# Check available clusters
clusters = SAGCluster.objects.all()
for cluster in clusters:
    print(f"Cluster '{cluster.name}': {cluster.attributes}")
EOF
Content Not Generated
# Check task status
python manage.py shell << EOF
from igny8_core.business.content.models import Tasks
tasks = Tasks.objects.all()
for task in tasks:
    print(f"Task {task.id}: status={task.status}, blueprint_context={bool(task.blueprint_context)}")
EOF
# Check Celery task logs
celery -A igny8_core inspect active
celery -A igny8_core inspect reserved
celery -A igny8_core purge # WARNING: clears queue
Extending with Custom Prompt Templates
Add New Template
# In sag/prompt_templates.py
PROMPT_TEMPLATES = {
    'sag_hub_guide': """
You are writing a comprehensive guide for {cluster_name}, a {cluster_type} in the {cluster_sector} sector.
Topic: {cluster_name}
Related terms: {attribute_terms}
Hub page: {hub_url}
Structure: Guide/Tutorial format
- Introduction: What is {cluster_name}?
- Key concepts: {attribute_terms}
- Step-by-step guide
- Common pitfalls
- Conclusion with links to {hub_title}
Write a comprehensive, SEO-optimized guide.
""",
    # Add more templates here...
}
# Usage in generate_content task:
# template = PROMPT_TEMPLATES['sag_hub_guide']
# prompt = template.format(**blueprint_context)
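Because `template.format(**blueprint_context)` raises a bare `KeyError` on the first missing placeholder, it can help to check the context against the template before rendering. The sketch below does this with the standard library's `string.Formatter`. The helper names `template_fields` and `render_prompt` are hypothetical, and it assumes templates use only simple `{name}` placeholders.

```python
from string import Formatter
from typing import Any, Dict, List


def template_fields(template: str) -> List[str]:
    """Return the distinct placeholder names in a str.format-style template, in order."""
    seen: List[str] = []
    for _literal, field_name, _spec, _conversion in Formatter().parse(template):
        # field_name is None for trailing literal text with no placeholder.
        if field_name and field_name not in seen:
            seen.append(field_name)
    return seen


def render_prompt(template: str, context: Dict[str, Any]) -> str:
    """Render the template, reporting every missing context key at once."""
    missing = [name for name in template_fields(template) if name not in context]
    if missing:
        raise KeyError(f"blueprint_context is missing: {', '.join(missing)}")
    # Extra keys in context are ignored by str.format.
    return template.format(**context)
```

Reporting all missing keys in one error makes it easier to spot an incomplete `blueprint_context` produced in Stage 4 than fixing one key per failed run.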
Monitoring Pipeline Health (Integration with 01G)
# View pipeline execution history
python manage.py shell << EOF
from igny8_core.business.content.models import Job
jobs = Job.objects.filter(stage=5).order_by('-created_at')[:10]
for job in jobs:
    duration = (job.completed_at - job.created_at).total_seconds() if job.completed_at else None
    print(f"Stage {job.stage}: {job.status} ({duration}s) - {job.blueprint_mode}")
EOF
# Check cluster completion status
python manage.py shell << EOF
from sag.models import SAGCluster
clusters = SAGCluster.objects.all()
for cluster in clusters:
    content_count = cluster.content_set.count()
    print(f"Cluster '{cluster.name}': {cluster.status} ({content_count} content items)")
EOF
Cross-References
| Document | Reference Purpose |
|---|---|
| 01A: SAG Blueprint Model | SAGBlueprint, SAGCluster models used at Stage 0 |
| 01C: Cluster Formation | Clusters created by SAG framework; used by pipeline |
| 01D: Setup Wizard | Creates blueprint that drives pipeline execution |
| 01F: Case 1 Analysis | Produces blueprints that feed this pipeline |
| 01G: Health Monitoring | Tracks pipeline output per cluster and stage |
| Content_Types_Writing_Plan.md | Content type definitions; prompt template structure |
Summary
The Blueprint-Aware Content Pipeline enhances IGNY8's 7-stage automation with SAG framework context at every step. When a site has an active blueprint, content generation becomes strategic: keywords map to clusters, ideas inherit type/structure/sector assignments, prompts draw on cluster context, and generated content is assigned to taxonomies automatically. When no blueprint exists, the pipeline runs in legacy mode, unchanged.
Key innovation: Two-mode execution (blueprint-aware + legacy) enables gradual adoption, so teams can opt in to blueprint-driven content without disrupting existing sites. Execution priority phases ensure foundational content (hubs) publishes before supporting content (blogs), building authority tier by tier.