igny8/v2/V2-Execution-Docs/01E-blueprint-aware-pipeline.md
IGNY8 VPS (Salman) 128b186865 temproary docs uplaoded
2026-03-23 09:02:49 +00:00

# 01E: Blueprint-Aware Content Pipeline
**IGNY8 Phase 1: Content Automation with SAG Blueprint Enhancement**

---
## 1. CURRENT STATE
### Existing Pipeline Architecture
IGNY8's content pipeline operates as a 7-stage automated system, orchestrated via Celery with scheduled execution (daily/weekly/monthly via Celery Beat):
| Stage | Function | Automation | Output |
|-------|----------|-----------|--------|
| 1 | Keywords | Import CSV/seed lists | Keyword list per site |
| 2 | Clusters | AutoClusterKeywords (GPT-4) | Semantic keyword groups |
| 3 | Ideas | GenerateIdeas | Content brief queue |
| 4 | Tasks | Queue creation | Writer task list |
| 5 | Content | GenerateContent (AI) | Draft articles |
| 6 | Images | GenerateImages | Featured + in-article images |
| 7 | Review | Editorial queue | Published content |
### Current Limitations
- **Generic clustering**: All keywords grouped by semantic similarity, no business-specific structure
- **One-size-fits-all content**: All articles follow same template regardless of content type
- **No hierarchy**: No distinction between hub pages, blog posts, product pages, term pages, or service pages
- **No priority**: All content treated equally; foundational content (hubs) may not be written first
- **No taxonomy integration**: Generated content not automatically assigned to custom taxonomies
- **No blueprint context**: Writers receive keywords but not strategic framework
### Celery Automation Context
- **Celery Beat**: Manages recurring schedule (daily, weekly, monthly per site)
- **Task Queue**: Each stage enqueued as separate Celery task
- **State Tracking**: Uses Django ORM to track Job, Stage, Keyword, Cluster, Idea, Task, Content, Image models
- **Failure Handling**: Retry logic, dead-letter queue for failed tasks
- **Logging**: Structured logging to track execution per site per stage
---
## 2. WHAT TO BUILD
### Vision: Blueprint-Driven Pipeline
When a site has an **active SAG Blueprint**, every pipeline stage becomes context-aware:
- Content priorities driven by blueprint's execution phases
- Content types (hub, blog, product, term, service) determined at ideation
- Prompt templates matched to content structure and type
- Output taxonomy-tagged and cluster-assigned automatically
When **no blueprint exists**, the pipeline falls back to legacy mode, so existing behavior is unchanged.
### New/Enhanced Stages
#### Stage 0: Blueprint Check (NEW)
Execute before pipeline stages 1-7.
**Responsibility**: Determine execution mode and load context.
**Logic**:
```python
IF Site.sag_blueprint EXISTS AND sag_blueprint.status == 'active':
    LOAD blueprint
    IDENTIFY unfulfilled content needs from blueprint.content_plan
    DETERMINE execution_phases from blueprint.execution_priority
    SET pipeline_mode = 'blueprint_aware'
ELSE:
    SET pipeline_mode = 'legacy'
    PROCEED to Stage 1 with no blueprint context
```
**Outputs**:
- `pipeline_mode`: 'blueprint_aware' | 'legacy'
- `blueprint_context`: SAGBlueprint instance (if active)
- `execution_phases`: List of priority phases for content queue
---
#### Stage 1: Keyword Processing (ENHANCED)
**Legacy behavior** (no blueprint): Pass keywords to Stage 2 unchanged.
**Blueprint-aware** (active blueprint):
1. For each new/imported keyword, query blueprint's SAGClusters
2. Match keyword to existing clusters based on:
   - Attribute overlap (e.g., keyword "sustainable farming" matches cluster with attribute "sustainability")
   - Semantic proximity to cluster topic
   - Sector alignment
3. Assign matched keyword to cluster's `keywords` list
4. Flag unmatched keywords:
   - **Gap**: No cluster exists for this topic
   - **Outlier**: Keyword semantic distance > threshold from all clusters
   - **Frontier**: Keyword extends cluster into new subtopic (possible new cluster)
5. Update `SAGCluster.keywords`, `SAGCluster.updated_at`
**Outputs**:
- Updated cluster keyword lists
- Gap/outlier report for content strategy review
- Flagged keywords for potential new cluster formation
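
The matching and flagging rules above can be sketched without any ML dependency. This is a minimal illustration only: it uses token-set overlap (Jaccard) as a crude stand-in for the embedding similarity the pipeline would actually use, and the thresholds, `classify_keyword`, and the cluster dictionary shape are all hypothetical.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical thresholds; real values would need tuning against labeled data.
MATCH_THRESHOLD = 0.5
FRONTIER_THRESHOLD = 0.2


def jaccard(a: set, b: set) -> float:
    """Token-set overlap in [0, 1]; a stand-in for embedding similarity."""
    if not a or not b:
        return 0.0
    return len(a & b) / len(a | b)


def classify_keyword(
    keyword: str, clusters: Dict[str, List[str]]
) -> Tuple[str, Optional[str]]:
    """Return (label, cluster_name); label is matched/frontier/outlier/gap."""
    tokens = set(keyword.lower().split())
    best_name, best_score = None, 0.0
    for name, terms in clusters.items():
        # Cluster vocabulary: tokens from its name plus its keywords/attributes.
        vocab = set(" ".join([name, *terms]).lower().split())
        score = jaccard(tokens, vocab)
        if score > best_score:
            best_name, best_score = name, score
    if best_score >= MATCH_THRESHOLD:
        return "matched", best_name
    if best_score >= FRONTIER_THRESHOLD:
        return "frontier", best_name  # close enough to suggest a new subtopic
    if best_score > 0:
        return "outlier", None  # weak overlap with every cluster
    return "gap", None  # no overlap with any cluster at all
```

Only `matched` keywords would be appended to a cluster's keyword list; the other three labels feed the gap/outlier report for manual review.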
---
#### Stage 2: AI Cluster Keywords (ENHANCED)
**Legacy behavior** (no blueprint): Run existing `AutoClusterKeywords` via GPT-4 grouping.
**Blueprint-aware** (active blueprint):
1. **SKIP** `AutoClusterKeywords` entirely
2. Clusters already defined by SAG framework (Stage 0 loaded blueprint)
3. For new keywords from Stage 1:
   - Map to existing clusters (already done in Stage 1)
   - Create mapping record linking keyword → SAGCluster
4. Flag unmatched keywords (from Stage 1) for manual review
5. No new clusters created (cluster formation is Phase 1C process, not pipeline)
**Outputs**:
- Keyword-to-cluster mapping
- Unmatched keyword report
---
#### Stage 3: Generate Content Ideas (ENHANCED)
**Legacy behavior** (no blueprint): Run existing `GenerateIdeas` function.
**Blueprint-aware** (active blueprint):
1. Call `sag/ai_functions/content_planning.py::GenerateIdeasWithBlueprint`
2. For each idea generated, enrich with:
   - **Sector**: From SAGCluster.sector
   - **Structure**: From blueprint.content_plan[cluster].structure (e.g., 'guide_tutorial', 'comparison', 'review', 'how_to', 'question')
   - **Type**: From blueprint.content_plan[cluster].type (e.g., 'cluster_hub', 'blog_post', 'product_page', 'term_page', 'service_page')
   - **SAGCluster ID**: Link idea to blueprint cluster
   - **idea_source**: Set to 'sag_blueprint'
3. Respect execution phases:
   - Phase 1: Generate ideas for `category_pages`, `top_cluster_hubs`
   - Phase 2: Generate ideas for `remaining_hubs`, `first_blogs_per_cluster`
   - Phase 3: Generate ideas for `attribute_term_pages`, `product_enrichment`
   - Phase 4: Generate ideas for `additional_blogs`, `brand_comparisons`
4. Prioritize queuing by phase
**Outputs**:
- Idea records with type, structure, sector, cluster assignment
- Execution phase assignments
- Queue prioritized by phase
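
One way to derive an idea's `execution_phase` is to invert the blueprint's phase map into a group-to-phase lookup. The sketch below assumes the `phase_N` key format shown in this document; `build_phase_lookup` is a hypothetical helper, not an existing function.

```python
from typing import Dict, List

EXECUTION_PRIORITY: Dict[str, List[str]] = {
    "phase_1": ["category_pages", "top_cluster_hubs"],
    "phase_2": ["remaining_hubs", "first_blogs_per_cluster"],
    "phase_3": ["attribute_term_pages", "product_enrichment"],
    "phase_4": ["additional_blogs", "brand_comparisons"],
}


def build_phase_lookup(priority: Dict[str, List[str]]) -> Dict[str, int]:
    """Invert {"phase_N": [groups]} into {group: N}."""
    lookup: Dict[str, int] = {}
    for phase_key, groups in priority.items():
        phase = int(phase_key.rsplit("_", 1)[1])  # "phase_2" -> 2
        for group in groups:
            # If a group is listed under two phases, keep the earliest one.
            lookup[group] = min(phase, lookup.get(group, phase))
    return lookup
```

An idea whose content group is missing from the lookup has no phase; how such ideas are queued is a policy decision this document leaves open.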
---
#### Stage 4: Create Writer Tasks (ENHANCED)
**Legacy behavior** (no blueprint): Create basic task with keyword/idea reference.
**Blueprint-aware** (active blueprint):
1. For each idea, create Task with:
   - Standard fields: title, keyword, site, status, assigned_to
   - **New fields**:
     - `sag_cluster_id`: Reference to blueprint cluster
     - `blueprint_context`: JSON blob containing execution context
2. `blueprint_context` structure:
```json
{
  "cluster_id": "uuid",
  "cluster_name": "string",
  "cluster_type": "string (topical|product|service)",
  "cluster_sector": "string",
  "hub_title": "string (cluster's main hub page title)",
  "hub_url": "string (blueprint.site.domain/cluster_slug)",
  "cluster_attributes": ["list of attribute terms"],
  "related_clusters": ["list of related cluster ids"],
  "cluster_products": ["list of product ids if product cluster"],
  "content_structure": "string (guide_tutorial|comparison|review|how_to|question|listicle)",
  "content_type": "string (cluster_hub|blog_post|product_page|term_page|service_page)",
  "execution_phase": "integer (1-4)",
  "seo_strategy": "object (primary_keyword, related_keywords, intent)"
}
```
3. If no blueprint: Create task without blueprint_context (legacy)
**Outputs**:
- Task records with sag_cluster_id and blueprint_context
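
A small completeness check can catch tasks whose `blueprint_context` is missing fields from the structure above. This is a sketch under the assumption that every key in that structure is required; `missing_context_keys` is a hypothetical helper.

```python
from typing import Any, Dict, List

# Keys taken from the blueprint_context structure in this section.
REQUIRED_CONTEXT_KEYS = (
    "cluster_id",
    "cluster_name",
    "cluster_type",
    "cluster_sector",
    "hub_title",
    "hub_url",
    "cluster_attributes",
    "related_clusters",
    "cluster_products",
    "content_structure",
    "content_type",
    "execution_phase",
    "seo_strategy",
)


def missing_context_keys(context: Dict[str, Any]) -> List[str]:
    """Return the required keys that are absent from a blueprint_context dict."""
    return [key for key in REQUIRED_CONTEXT_KEYS if key not in context]
```

Run against the context assembled by `create_tasks` in Section 3, this check would report `related_clusters`, `cluster_products`, and `seo_strategy` as missing, which suggests either the task code or the structure needs to be reconciled.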
---
#### Stage 5: Generate Article Content (ENHANCED)
**Legacy behavior** (no blueprint): Run existing `GenerateContent` with generic prompt.
**Blueprint-aware** (has blueprint_context):
1. **Load prompt template** by content_type + content_structure combination:
| Content Type | Structure | Template Key |
|---|---|---|
| Cluster Hub | Guide Tutorial | `sag_hub_guide` |
| Cluster Hub | Top Listicle | `sag_hub_listicle` |
| Blog Post | Comparison | `sag_blog_comparison` |
| Blog Post | Review | `sag_blog_review` |
| Blog Post | How To | `sag_blog_howto` |
| Blog Post | Question | `sag_blog_question` |
| Term Page | Guide Tutorial | `sag_term_page` |
| Product Page | Review | `sag_product_page` |
| Service Page | Guide Tutorial | `sag_service_page` |
| Landing Page | Guide Tutorial | `sag_landing_guide` |
| Landing Page | Comparison | `sag_landing_comparison` |
| Business Page | Guide Tutorial | `sag_business_guide` |
2. **Inject blueprint context variables** into prompt template:
```
{cluster_name} → From SAGCluster.name
{cluster_type} → From SAGCluster.cluster_type
{cluster_sector} → From SAGCluster.sector
{hub_title} → From blueprint_context.hub_title
{hub_url} → From blueprint_context.hub_url
{attribute_terms} → Comma-separated list from cluster attributes
{cluster_products} → Product list if product cluster
{related_clusters} → Related cluster names for internal linking
{content_structure} → Structure type for consistency
{content_type} → Content type for tone/depth
```
3. Call GPT-4 with enriched prompt template
4. Post-process output:
   - Add internal links to related cluster hubs
   - Add cross-references to attribute term pages
   - Inject CTA appropriate to content type (e.g., product link for product cluster)
5. If no blueprint_context: Run legacy `GenerateContent` unchanged
**Outputs**:
- Content record with body, title, sag_cluster_id, content_type, content_structure
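
The template variables listed in step 2 do not all share names with `blueprint_context` keys (for example `{attribute_terms}` comes from `cluster_attributes`), so a direct `str.format(**context)` would raise `KeyError`. The sketch below shows one way to bridge that gap; `build_prompt_variables` and `render_prompt` are hypothetical names, and rendering unknown placeholders as empty strings is an assumed policy.

```python
from typing import Any, Dict


class _BlankMissing(dict):
    """Dict that renders unknown placeholders as empty strings."""

    def __missing__(self, key: str) -> str:
        return ""


def _join(values: Any) -> str:
    """Comma-join a list of terms, tolerating None and non-string items."""
    return ", ".join(str(v) for v in (values or []))


def build_prompt_variables(context: Dict[str, Any]) -> Dict[str, str]:
    """Map blueprint_context keys onto the template variable names."""
    # Scalar fields (cluster_name, hub_url, content_type, ...) pass through.
    variables = {
        key: str(value)
        for key, value in context.items()
        if isinstance(value, (str, int))
    }
    # List fields are flattened to comma-separated strings.
    variables["attribute_terms"] = _join(context.get("cluster_attributes"))
    variables["cluster_products"] = _join(context.get("cluster_products"))
    variables["related_clusters"] = _join(context.get("related_clusters"))
    return variables


def render_prompt(template: str, context: Dict[str, Any]) -> str:
    """Fill a prompt template from a blueprint_context dict."""
    return template.format_map(_BlankMissing(build_prompt_variables(context)))
```

Silently blanking a missing variable keeps generation running but can hide template typos; raising instead is a reasonable alternative during development.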
---
#### Stage 6: Taxonomy Assignment (NEW)
Execute after content generation, **only if blueprint exists**.
**Responsibility**: Auto-assign content to custom WP taxonomies derived from blueprint.
**Logic**:
1. Load site's custom taxonomies from blueprint (`SAGBlueprint.wp_taxonomy_mapping`, keyed by cluster ID)
2. For generated content:
   - Match content to cluster's attributes and taxonomy terms
   - Assign custom taxonomy values from blueprint mapping
   - Set `content.sag_cluster_id` (links to blueprint structure)
   - Update cluster status:
     - If first content in cluster: set `SAGCluster.status = 'partial'`
     - If all planned content exists: set `SAGCluster.status = 'complete'`
3. Store taxonomy assignments in `Content.taxonomies` JSON field
**Outputs**:
- Content records tagged with custom taxonomies
- Cluster status updated to reflect content completion
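
The cluster status rule can be expressed as a pure function of two counts. This is a sketch that assumes the number of planned items per cluster can be read from the blueprint's content plan; `next_cluster_status` and its parameters are hypothetical.

```python
def next_cluster_status(existing_count: int, planned_count: int) -> str:
    """Derive SAGCluster.status from existing vs. planned content counts."""
    if existing_count <= 0:
        return "draft"  # nothing generated yet
    if planned_count > 0 and existing_count >= planned_count:
        return "complete"  # every planned item exists
    # Some content exists, or the plan size is unknown (planned_count == 0).
    return "partial"
```

Treating an unknown plan size as `partial` avoids marking a cluster complete by accident when the content plan has no entry for it.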
---
#### Stage 7: Image Generation (ENHANCED)
**Legacy behavior** (no blueprint): Generate generic featured + in-article images.
**Blueprint-aware** (blueprint exists):
1. Match image style to content type:
   - **Hub page**: Hero/authority style (professional, comprehensive)
   - **Blog post**: Supporting/educational (friendly, illustrative)
   - **Product page**: E-commerce standard (product-focused, clean)
   - **Term page**: Category representation (taxonomy icon or concept illustration)
   - **Service page**: Service illustration (professional, trustworthy)
   - **Landing page**: Conversion-focused (compelling, aspirational)
2. Use cluster theme/color palette from blueprint for style consistency
3. Generate alt text leveraging content_structure + cluster context
4. If no blueprint: Generate images with default style
**Outputs**:
- Image records with style type, alt text, sag_cluster_id
---
### Execution Priority (Blueprint-Driven)
Pipeline processes content by `SAGBlueprint.execution_priority` phases:
```python
execution_priority = {
    "phase_1": ["category_pages", "top_cluster_hubs"],
    "phase_2": ["remaining_hubs", "first_blogs_per_cluster"],
    "phase_3": ["attribute_term_pages", "product_enrichment"],
    "phase_4": ["additional_blogs", "brand_comparisons"],
}
```
**Queue behavior**:
- Stage 3 filters ideas by phase
- Stage 4 prioritizes tasks by phase
- Celery task enqueuing respects phase order
- **Rationale**: Foundational content (hubs) published before supporting content (blogs)
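
Phase-ordered enqueuing can be sketched as a stable sort on `execution_phase`. The example assumes tasks without a phase (legacy tasks) should run last, which this document does not specify; `order_by_phase` is a hypothetical helper operating on `(title, phase)` pairs.

```python
from typing import List, Optional, Tuple


def order_by_phase(tasks: List[Tuple[str, Optional[int]]]) -> List[str]:
    """Return task titles ordered by phase; tasks without a phase go last."""

    def sort_key(item: Tuple[str, Optional[int]]) -> Tuple[bool, int]:
        _, phase = item
        # (False, n) sorts before (True, 0), so phased tasks come first.
        return (phase is None, phase if phase is not None else 0)

    # sorted() is stable, so tasks within a phase keep their original order.
    return [title for title, _ in sorted(tasks, key=sort_key)]
```

Because the sort is stable, creation order is preserved inside each phase, so hubs queued first within phase 1 are still written first.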
---
## 3. DATA MODELS / APIs
### Related Models (from 01A, 01C, 01D)
```python
# sag/models.py — SAG Blueprint Structure
class SAGBlueprint(models.Model):
    site = ForeignKey(Site)
    name = CharField(max_length=255)
    status = CharField(choices=['draft', 'active', 'archived'])
    created_at = DateTimeField(auto_now_add=True)
    updated_at = DateTimeField(auto_now=True)
    # Phase-based execution plan
    execution_priority = JSONField(default=dict)  # phases 1-4
    content_plan = JSONField()  # cluster_id → content specs
    # Taxonomy mapping to WordPress custom taxonomies
    wp_taxonomy_mapping = JSONField()  # cluster_id → tax values


class SAGCluster(models.Model):
    blueprint = ForeignKey(SAGBlueprint)
    name = CharField(max_length=255)
    cluster_type = CharField(choices=['topical', 'product', 'service'])
    sector = CharField(max_length=255)
    keywords = JSONField(default=list)
    attributes = JSONField(default=list)
    status = CharField(choices=['draft', 'partial', 'complete'])
    updated_at = DateTimeField(auto_now=True)
```
### Pipeline Models (existing)
```python
# content/models.py — Content Pipeline
class Keyword(models.Model):
    site = ForeignKey(Site)
    term = CharField(max_length=255)
    source = CharField(choices=['csv_import', 'seed_list', 'user', 'sag_blueprint'])
    sag_cluster_id = UUIDField(null=True, blank=True)  # NEW: links to blueprint cluster
    created_at = DateTimeField(auto_now_add=True)


class Cluster(models.Model):
    site = ForeignKey(Site)
    name = CharField(max_length=255)
    keywords = JSONField(default=list)
    created_by = CharField(choices=['auto_cluster', 'sag_blueprint'])


class Idea(models.Model):
    site = ForeignKey(Site)
    title = CharField(max_length=255)
    keyword = ForeignKey(Keyword)
    cluster = ForeignKey(Cluster, null=True)
    sector = CharField(max_length=255)  # NEW
    structure = CharField(choices=['guide_tutorial', 'comparison', 'review', 'how_to', 'question', 'listicle'])  # NEW
    content_type = CharField(choices=['cluster_hub', 'blog_post', 'product_page', 'term_page', 'service_page', 'landing_page', 'business_page'])  # NEW
    sag_cluster_id = UUIDField(null=True, blank=True)  # NEW
    idea_source = CharField(choices=['auto_generate', 'sag_blueprint'])  # NEW
    execution_phase = IntegerField(null=True)  # NEW: 1-4 from blueprint
    created_at = DateTimeField(auto_now_add=True)


class Task(models.Model):
    site = ForeignKey(Site)
    title = CharField(max_length=255)
    idea = ForeignKey(Idea)
    status = CharField(choices=['pending', 'assigned', 'in_progress', 'review', 'completed'])
    assigned_to = ForeignKey(User, null=True)
    sag_cluster_id = UUIDField(null=True, blank=True)  # NEW
    blueprint_context = JSONField(null=True, blank=True)  # NEW: execution context
    created_at = DateTimeField(auto_now_add=True)


class Content(models.Model):
    site = ForeignKey(Site)
    title = CharField(max_length=255)
    body = TextField()
    task = ForeignKey(Task, null=True)
    content_type = CharField(choices=['cluster_hub', 'blog_post', 'product_page', 'term_page', 'service_page', 'landing_page', 'business_page'])  # NEW
    content_structure = CharField(choices=['guide_tutorial', 'comparison', 'review', 'how_to', 'question', 'listicle'])  # NEW
    sag_cluster_id = UUIDField(null=True, blank=True)  # NEW
    taxonomies = JSONField(default=dict, null=True, blank=True)  # NEW: custom WP taxonomies
    status = CharField(choices=['draft', 'review', 'published'])
    created_at = DateTimeField(auto_now_add=True)


class Image(models.Model):
    content = ForeignKey(Content)
    url = URLField()
    alt_text = CharField(max_length=255)
    style_type = CharField(choices=['hero', 'supporting', 'ecommerce', 'category', 'service', 'conversion'])  # NEW
    sag_cluster_id = UUIDField(null=True, blank=True)  # NEW
    created_at = DateTimeField(auto_now_add=True)


class Job(models.Model):
    """Pipeline execution tracking"""
    site = ForeignKey(Site)
    status = CharField(choices=['pending', 'running', 'completed', 'failed'])
    stage = IntegerField(choices=[(0, 'Blueprint Check'), (1, 'Keywords'), (2, 'Cluster'), (3, 'Ideas'), (4, 'Tasks'), (5, 'Content'), (6, 'Taxonomy'), (7, 'Images')])
    blueprint_mode = CharField(choices=['legacy', 'blueprint_aware'])  # NEW
    log = TextField(default='')
    created_at = DateTimeField(auto_now_add=True)
    completed_at = DateTimeField(null=True)
```
### API Endpoints (Celery Task Functions)
#### Stage 0: Blueprint Check
```python
# celery_app/tasks.py
# Import paths follow the module comments in this section; `app` is the
# project's Celery application instance (its import path is not shown here).
from content.models import Content, Idea, Image, Job, Keyword, Task
from sag.models import SAGBlueprint, SAGCluster
from sites.models import Site


@app.task(bind=True, max_retries=3)
def check_blueprint(self, site_id):
    """
    Stage 0: Determine execution mode and load blueprint context.

    Returns:
        {
            'status': 'success',
            'pipeline_mode': 'blueprint_aware' | 'legacy',
            'blueprint_id': 'uuid' (if active, else None),
            'execution_phases': dict of phases (if active, else None),
        }
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(site=site, stage=0, status='running')
        blueprint = SAGBlueprint.objects.filter(
            site=site,
            status='active'
        ).first()
        if blueprint:
            result = {
                'status': 'success',
                'pipeline_mode': 'blueprint_aware',
                'blueprint_id': str(blueprint.id),
                'execution_phases': blueprint.execution_priority,
            }
            job.blueprint_mode = 'blueprint_aware'
        else:
            result = {
                'status': 'success',
                'pipeline_mode': 'legacy',
                'blueprint_id': None,
                'execution_phases': None,
            }
            job.blueprint_mode = 'legacy'
        job.status = 'completed'
        job.save()
        # Chain to Stage 1
        process_keywords.delay(site_id, result)
        return result
    except Exception as e:
        # self.retry() raises; `raise` makes the control flow explicit
        raise self.retry(exc=e, countdown=60)
```
#### Stage 1: Keyword Processing
```python
@app.task(bind=True, max_retries=3)
def process_keywords(self, site_id, blueprint_context):
    """
    Stage 1: Process keywords and optionally map to SAGClusters.

    If blueprint_context['pipeline_mode'] == 'blueprint_aware':
        - Map keywords to existing SAGClusters
        - Flag unmatched keywords
    Else:
        - Pass keywords to next stage unchanged
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=1,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )
        # Evaluate once so the processed count is unaffected by later updates
        keywords = list(Keyword.objects.filter(site=site, sag_cluster_id__isnull=True))
        if blueprint_context['pipeline_mode'] == 'blueprint_aware':
            blueprint = SAGBlueprint.objects.get(id=blueprint_context['blueprint_id'])
            clusters = SAGCluster.objects.filter(blueprint=blueprint)
            matched_count = 0
            unmatched_keywords = []
            for keyword in keywords:
                # Semantic matching: find best cluster
                cluster = _match_keyword_to_cluster(keyword, clusters)
                if cluster:
                    keyword.sag_cluster_id = cluster.id
                    keyword.save()
                    cluster.keywords.append(keyword.term)
                    cluster.save()
                    matched_count += 1
                else:
                    unmatched_keywords.append(keyword.term)
            job.log = f"Matched {matched_count} keywords. Unmatched: {unmatched_keywords}"
        else:
            job.log = "Legacy mode: keywords passed unchanged"
        job.status = 'completed'
        job.save()
        # Chain to Stage 2
        cluster_keywords.delay(site_id, blueprint_context)
        return {'status': 'success', 'keywords_processed': len(keywords)}
    except Exception as e:
        raise self.retry(exc=e, countdown=60)


def _match_keyword_to_cluster(keyword, clusters):
    """Find best-matching SAGCluster for keyword via embedding similarity."""
    # Uses semantic search (embeddings) to find best cluster match
    # Placeholder: returns None (no match) until the matcher is implemented
    return None
```
#### Stage 2: AI Cluster Keywords
```python
@app.task(bind=True, max_retries=3)
def cluster_keywords(self, site_id, blueprint_context):
    """
    Stage 2: Cluster keywords.

    If blueprint_aware:
        - SKIP AutoClusterKeywords
        - Use blueprint clusters from Stage 0
    Else:
        - Run AutoClusterKeywords (existing function)
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=2,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )
        if blueprint_context['pipeline_mode'] == 'blueprint_aware':
            # Clusters already exist from blueprint
            clusters = SAGCluster.objects.filter(
                blueprint_id=blueprint_context['blueprint_id']
            )
            job.log = f"Using {clusters.count()} blueprint clusters"
        else:
            # Run existing AutoClusterKeywords
            clusters = AutoClusterKeywords(site_id)
            job.log = f"AutoClusterKeywords created {clusters.count()} clusters"
        job.status = 'completed'
        job.save()
        # Chain to Stage 3
        generate_ideas.delay(site_id, blueprint_context)
        return {'status': 'success', 'clusters': clusters.count()}
    except Exception as e:
        raise self.retry(exc=e, countdown=60)
```
#### Stage 3: Generate Content Ideas
```python
@app.task(bind=True, max_retries=3)
def generate_ideas(self, site_id, blueprint_context):
    """
    Stage 3: Generate content ideas.

    If blueprint_aware:
        - Call GenerateIdeasWithBlueprint
        - Enrich ideas with type, structure, sector
        - Respect execution phases
    Else:
        - Call existing GenerateIdeas
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=3,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )
        if blueprint_context['pipeline_mode'] == 'blueprint_aware':
            blueprint = SAGBlueprint.objects.get(id=blueprint_context['blueprint_id'])
            ideas = GenerateIdeasWithBlueprint(site, blueprint)
            job.log = f"Generated {len(ideas)} blueprint-aware ideas across {len(blueprint_context['execution_phases'])} phases"
        else:
            ideas = GenerateIdeas(site)
            job.log = f"Generated {len(ideas)} legacy ideas"
        job.status = 'completed'
        job.save()
        # Chain to Stage 4
        create_tasks.delay(site_id, blueprint_context)
        return {'status': 'success', 'ideas': len(ideas)}
    except Exception as e:
        raise self.retry(exc=e, countdown=60)
```
#### Stage 4: Create Writer Tasks
```python
@app.task(bind=True, max_retries=3)
def create_tasks(self, site_id, blueprint_context):
    """
    Stage 4: Create writer tasks.

    If blueprint_aware:
        - Enrich task with sag_cluster_id and blueprint_context JSON
        - Respect execution phase priority
    Else:
        - Create basic tasks
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=4,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )
        # Order by phase so earlier-phase ideas become tasks first
        ideas = Idea.objects.filter(site=site, task__isnull=True).order_by('execution_phase')
        task_count = 0
        for idea in ideas:
            task = Task.objects.create(
                site=site,
                title=idea.title,
                idea=idea,
                status='pending'
            )
            if blueprint_context['pipeline_mode'] == 'blueprint_aware' and idea.sag_cluster_id:
                cluster = SAGCluster.objects.get(id=idea.sag_cluster_id)
                blueprint = cluster.blueprint
                task.sag_cluster_id = idea.sag_cluster_id
                task.blueprint_context = {
                    'cluster_id': str(cluster.id),
                    'cluster_name': cluster.name,
                    'cluster_type': cluster.cluster_type,
                    'cluster_sector': cluster.sector,
                    'hub_title': blueprint.content_plan.get(str(cluster.id), {}).get('hub_title'),
                    'hub_url': f"{site.domain}/hubs/{cluster.name.lower().replace(' ', '-')}",
                    'cluster_attributes': cluster.attributes,
                    'content_structure': idea.structure,
                    'content_type': idea.content_type,
                    'execution_phase': idea.execution_phase,
                }
                task.save()
            task_count += 1
        job.log = f"Created {task_count} tasks"
        job.status = 'completed'
        job.save()
        # Chain to Stage 5
        generate_content.delay(site_id, blueprint_context)
        return {'status': 'success', 'tasks': task_count}
    except Exception as e:
        raise self.retry(exc=e, countdown=60)
```
#### Stage 5: Generate Article Content
```python
@app.task(bind=True, max_retries=3)
def generate_content(self, site_id, blueprint_context):
    """
    Stage 5: Generate article content.

    If task has blueprint_context:
        - Load prompt template by content_type + structure
        - Inject blueprint context variables
        - Call GPT-4 with enriched prompt
        - Post-process for internal links
    Else:
        - Call existing GenerateContent
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=5,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )
        # Stage 4 creates tasks as 'pending'; pick up those without content yet
        tasks = Task.objects.filter(site=site, status='pending', content__isnull=True)
        content_count = 0
        for task in tasks:
            if task.blueprint_context:
                # Blueprint-aware content generation
                prompt_key = _get_prompt_key(
                    task.blueprint_context['content_type'],
                    task.blueprint_context['content_structure']
                )
                template = PROMPT_TEMPLATES.get(prompt_key)
                # Inject variables (template placeholders must match context keys)
                prompt = template.format(**task.blueprint_context)
                # Call GPT-4
                article = gpt4_call(prompt)
                # Post-process
                article = _add_internal_links(article, task.blueprint_context)
            else:
                # Legacy content generation
                article = GenerateContent(task.idea.keyword)
            content = Content.objects.create(
                site=site,
                title=task.title,
                body=article,
                task=task,
                sag_cluster_id=task.sag_cluster_id,
                content_type=task.blueprint_context.get('content_type') if task.blueprint_context else 'blog_post',
                content_structure=task.blueprint_context.get('content_structure') if task.blueprint_context else None,
            )
            content_count += 1
        job.log = f"Generated {content_count} articles"
        job.status = 'completed'
        job.save()
        # Chain to Stage 6
        assign_taxonomy.delay(site_id, blueprint_context)
        return {'status': 'success', 'content': content_count}
    except Exception as e:
        raise self.retry(exc=e, countdown=60)


def _get_prompt_key(content_type, structure):
    """Map content_type + structure to prompt template key."""
    mapping = {
        ('cluster_hub', 'guide_tutorial'): 'sag_hub_guide',
        ('cluster_hub', 'listicle'): 'sag_hub_listicle',
        ('blog_post', 'comparison'): 'sag_blog_comparison',
        ('blog_post', 'review'): 'sag_blog_review',
        ('blog_post', 'how_to'): 'sag_blog_howto',
        ('blog_post', 'question'): 'sag_blog_question',
        ('term_page', 'guide_tutorial'): 'sag_term_page',
        ('product_page', 'review'): 'sag_product_page',
        ('service_page', 'guide_tutorial'): 'sag_service_page',
        ('landing_page', 'guide_tutorial'): 'sag_landing_guide',
        ('landing_page', 'comparison'): 'sag_landing_comparison',
        ('business_page', 'guide_tutorial'): 'sag_business_guide',
    }
    return mapping.get((content_type, structure), 'sag_default')


def _add_internal_links(article, blueprint_context):
    """Add internal links to related cluster hubs and attribute term pages."""
    # Parse article, identify linking opportunities
    # Inject markdown links to related content
    # Placeholder: return the article unchanged so the caller never stores None
    return article
```
#### Stage 6: Taxonomy Assignment
```python
from django.db.models import Q


@app.task(bind=True, max_retries=3)
def assign_taxonomy(self, site_id, blueprint_context):
    """
    Stage 6: Assign content to custom WP taxonomies (blueprint mode only).

    If blueprint_aware:
        - Match content to cluster attributes
        - Assign custom taxonomy values
        - Update cluster status
    Else:
        - Skip stage
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=6,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )
        if blueprint_context['pipeline_mode'] != 'blueprint_aware':
            job.log = "Legacy mode: taxonomy assignment skipped"
            job.status = 'completed'
            job.save()
            generate_images.delay(site_id, blueprint_context)
            return {'status': 'success', 'skipped': True}
        blueprint = SAGBlueprint.objects.get(id=blueprint_context['blueprint_id'])
        # Content.taxonomies defaults to {}, so match empty mappings as well as NULL
        content_items = Content.objects.filter(
            site=site,
            sag_cluster_id__isnull=False,
        ).filter(Q(taxonomies__isnull=True) | Q(taxonomies={}))
        assigned_count = 0
        for content in content_items:
            cluster = SAGCluster.objects.get(id=content.sag_cluster_id)
            # Load taxonomy mapping from blueprint
            tax_mapping = blueprint.wp_taxonomy_mapping.get(str(cluster.id), {})
            # Assign taxonomies
            content.taxonomies = tax_mapping
            content.save()
            # Update cluster status
            if Content.objects.filter(sag_cluster_id=cluster.id).count() > 0:
                if cluster.status == 'draft':
                    cluster.status = 'partial'
                    cluster.save()
            assigned_count += 1
        job.log = f"Assigned {assigned_count} content items to taxonomies"
        job.status = 'completed'
        job.save()
        # Chain to Stage 7
        generate_images.delay(site_id, blueprint_context)
        return {'status': 'success', 'assigned': assigned_count}
    except Exception as e:
        raise self.retry(exc=e, countdown=60)
```
#### Stage 7: Image Generation
```python
@app.task(bind=True, max_retries=3)
def generate_images(self, site_id, blueprint_context):
    """
    Stage 7: Generate featured and in-article images.

    If blueprint_aware:
        - Match image style to content type
        - Use cluster theme/color palette
    Else:
        - Generate default style images
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=7,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )
        content_items = Content.objects.filter(site=site, image__isnull=True)
        image_count = 0
        for content in content_items:
            if blueprint_context['pipeline_mode'] == 'blueprint_aware' and content.content_type:
                # Match style to content type
                style_mapping = {
                    'cluster_hub': 'hero',
                    'blog_post': 'supporting',
                    'product_page': 'ecommerce',
                    'term_page': 'category',
                    'service_page': 'service',
                    'landing_page': 'conversion',
                }
                style = style_mapping.get(content.content_type, 'supporting')
            else:
                style = 'supporting'
            # Generate featured image
            featured_image = GenerateImage(content.title, style)
            image = Image.objects.create(
                content=content,
                url=featured_image['url'],
                alt_text=featured_image['alt_text'],
                style_type=style,
                sag_cluster_id=content.sag_cluster_id,
            )
            image_count += 1
        job.log = f"Generated {image_count} images"
        job.status = 'completed'
        job.save()
        return {'status': 'success', 'images': image_count}
    except Exception as e:
        raise self.retry(exc=e, countdown=60)
```
---
## 4. IMPLEMENTATION STEPS
### Phase A: Data Model Extensions (Week 1)
1. Add fields to Keyword, Idea, Task, Content, Image models (see Section 3)
2. Create SAGBlueprint, SAGCluster models (reference 01A)
3. Create database migrations
4. Test model relationships and queries
### Phase B: Stage 0 Implementation (Week 1)
1. Implement `check_blueprint` Celery task
2. Add blueprint loading and caching logic
3. Create execution_priority parsing
4. Test with sample blueprints (active and inactive)
5. Add logging and error handling
### Phase C: Stage 1-2 Enhancement (Week 2)
1. Implement `_match_keyword_to_cluster` function (embedding-based matching)
2. Extend `process_keywords` task for blueprint mode
3. Modify `cluster_keywords` to skip AutoClusterKeywords when blueprint active
4. Add unmatched keyword flagging and reporting
5. Test with mixed keyword sets
### Phase D: Stage 3 Enhancement (Week 2)
1. Create `sag/ai_functions/content_planning.py` module
2. Implement `GenerateIdeasWithBlueprint` function
3. Add phase-based filtering and prioritization
4. Integrate structure/type/sector enrichment
5. Test idea generation for each content type
### Phase E: Stage 4 Enhancement (Week 3)
1. Extend `create_tasks` task with blueprint_context JSON assembly
2. Add execution_phase assignment
3. Test blueprint_context structure completeness
4. Verify sag_cluster_id linking
### Phase F: Stage 5 Enhancement (Week 3)
1. Create PROMPT_TEMPLATES dictionary with all template keys
2. Implement `_get_prompt_key` function
3. Extend `generate_content` task to use templates
4. Implement `_add_internal_links` post-processing
5. Test content generation for each content_type + structure combination
6. Validate prompt variable injection
### Phase G: Stage 6 Implementation (Week 4)
1. Implement `assign_taxonomy` task
2. Add taxonomy mapping logic from blueprint.wp_taxonomy_mapping
3. Implement cluster status updates
4. Test taxonomy assignment with sample blueprints
### Phase H: Stage 7 Enhancement (Week 4)
1. Extend `generate_images` task for blueprint mode
2. Add style_type mapping by content_type
3. Implement color palette usage from blueprint
4. Test image generation for each content type
### Phase I: Integration & Testing (Week 5)
1. Test full pipeline execution with active blueprint
2. Test full pipeline execution without blueprint (legacy mode)
3. Add integration tests for each stage transition
4. Test error handling and retries
5. Load testing with multiple concurrent sites
### Phase J: Deployment & Monitoring (Week 6)
1. Deploy models and migrations to staging
2. Deploy Celery tasks to staging
3. Validate with staging data
4. Set up pipeline execution monitoring (01G)
5. Deploy to production with feature flag (blueprint mode off by default)
---
## 5. ACCEPTANCE CRITERIA
### Functional Requirements
- **Stage 0**: Blueprint check completes successfully; mode determination accurate
- **Stage 1**: Keywords matched to clusters with 85%+ accuracy; unmatched flagged
- **Stage 2**: `AutoClusterKeywords` skipped when blueprint active; blueprint clusters used instead
- **Stage 3**: Ideas generated with correct type/structure/sector/cluster assignment
- **Stage 4**: Tasks enriched with complete blueprint_context JSON
- **Stage 5**: Content generated using template-specific prompts; blueprint variables injected
- **Stage 6**: Content assigned to custom taxonomies; cluster status updated
- **Stage 7**: Images generated with correct style matching content type
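
The 85% matching target for Stage 1 can only be verified against a hand-labeled keyword set. A minimal sketch of that measurement follows; `match_accuracy` and the dictionary shapes (keyword to cluster name, with `None` meaning "should stay unmatched") are assumptions for illustration.

```python
from typing import Dict, Optional


def match_accuracy(
    predicted: Dict[str, Optional[str]],
    expected: Dict[str, Optional[str]],
) -> float:
    """Fraction of labeled keywords whose predicted cluster equals the label."""
    if not expected:
        raise ValueError("expected labels must not be empty")
    correct = sum(
        1 for keyword, cluster in expected.items()
        if keyword in predicted and predicted[keyword] == cluster
    )
    return correct / len(expected)
```

A keyword that is absent from `predicted` counts as wrong, so a matcher cannot raise its score by skipping hard cases.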
### Quality Criteria
- **No breaking changes**: Legacy mode works identically to pre-blueprint pipeline
- **Error handling**: All Celery tasks handle failures gracefully; retry logic functional
- **Performance**: Pipeline completes within baseline timing (per site, per stage)
- **Logging**: All stages log execution details and decisions
- **Data integrity**: sag_cluster_id and blueprint_context consistently populated
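
The data integrity criterion could be verified with a check like the one below. It assumes a rule the source does not spell out: in blueprint mode every task carries both `sag_cluster_id` and `blueprint_context`, and in legacy mode neither is set. The plain-dict records and the `integrity_errors` helper are hypothetical stand-ins for model rows.

```python
def integrity_errors(records, blueprint_mode: bool) -> list:
    """Return (record_id, problem) pairs for rows that break the assumed rule."""
    errors = []
    for rec in records:
        has_cluster = rec.get("sag_cluster_id") is not None
        has_context = bool(rec.get("blueprint_context"))
        if blueprint_mode and not (has_cluster and has_context):
            errors.append((rec["id"], "missing blueprint fields"))
        elif not blueprint_mode and (has_cluster or has_context):
            errors.append((rec["id"], "unexpected blueprint fields in legacy mode"))
    return errors
```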
### Testing Coverage
- Unit tests: Each function and task (>80% coverage)
- Integration tests: Full pipeline execution with/without blueprint
- Scenario tests:
  - Active blueprint (all phases)
  - Inactive blueprint (legacy mode)
  - Mixed keywords (matched + unmatched)
  - Multiple sites with different blueprints
  - Failed tasks (retry logic)
### Documentation
- Docstrings: All functions documented with inputs/outputs
- README: Setup and execution instructions
- Troubleshooting guide: Common issues and solutions
### Monitoring (01G Health Monitoring)
- Pipeline execution time per stage per site
- Content generation success rate by content_type
- Taxonomy assignment accuracy
- Cluster completion status tracking
- Unmatched keyword trending
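
As an illustration of the second metric, success rate by content type can be derived from job records with a single pass. The record shape (dicts with `content_type` and `status` keys) and the `"success"` status value are assumptions; 01G defines the actual monitoring model.

```python
from collections import defaultdict


def success_rate_by_content_type(jobs) -> dict:
    """Map each content type to the fraction of its jobs that succeeded."""
    totals = defaultdict(int)
    successes = defaultdict(int)
    for job in jobs:
        totals[job["content_type"]] += 1
        if job["status"] == "success":
            successes[job["content_type"]] += 1
    return {ctype: successes[ctype] / totals[ctype] for ctype in totals}
```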
---
## 6. CLAUDE CODE INSTRUCTIONS
### Running the Pipeline Locally
#### Prerequisites
```bash
# Install dependencies
pip install -r requirements.txt
pip install "celery[redis]" pytest pytest-django
# Set up local database
python manage.py migrate
# Start Redis (for Celery); it runs in the foreground, so use a separate terminal
redis-server
```
#### Initialize Test Data
```bash
# Create sample site and blueprint
python manage.py shell << EOF
from django.contrib.auth.models import User
from sites.models import Site
from sag.models import SAGBlueprint, SAGCluster
site = Site.objects.create(name="Test Site", domain="test.local")
blueprint = SAGBlueprint.objects.create(
    site=site,
    name="Test Blueprint",
    status="active",
    execution_priority={
        "phase_1": ["category_pages", "top_cluster_hubs"],
        "phase_2": ["remaining_hubs"],
        "phase_3": ["attribute_term_pages"],
        "phase_4": ["additional_blogs"],
    },
    content_plan={},
    wp_taxonomy_mapping={}
)
cluster = SAGCluster.objects.create(
    blueprint=blueprint,
    name="Test Cluster",
    cluster_type="topical",
    sector="Tech",
    keywords=["python", "django"],
    attributes=["web development", "open source"],
    status="draft"
)
print(f"Created site {site.id}, blueprint {blueprint.id}, cluster {cluster.id}")
EOF
```
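
The `execution_priority` mapping in the test blueprint could drive work ordering along these lines. This is a sketch, not the pipeline's actual scheduler: it assumes each queued item carries a `group` key matching one of the names in the mapping, and that items with an unknown group run last.

```python
EXECUTION_PRIORITY = {
    "phase_1": ["category_pages", "top_cluster_hubs"],
    "phase_2": ["remaining_hubs"],
    "phase_3": ["attribute_term_pages"],
    "phase_4": ["additional_blogs"],
}


def phase_rank(execution_priority: dict) -> dict:
    """Map each content group to its phase number (phase_1 -> 1, and so on)."""
    rank = {}
    for phase, groups in execution_priority.items():
        number = int(phase.split("_")[1])
        for group in groups:
            rank[group] = number
    return rank


def order_by_phase(items, execution_priority: dict) -> list:
    """Sort items so earlier phases come first; unknown groups go last."""
    rank = phase_rank(execution_priority)
    last = max(rank.values(), default=0) + 1
    return sorted(items, key=lambda item: rank.get(item["group"], last))
```

Because `sorted` is stable, items within the same phase keep their original queue order.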
#### Execute Pipeline Stages
```bash
# Start Celery worker (in separate terminal)
celery -A igny8.celery_app worker --loglevel=info
# Run Stage 0: Blueprint Check
python manage.py shell << EOF
from celery_app.tasks import check_blueprint
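result = check_blueprint.delay(site_id="<site-uuid>")
# .get() needs a configured result backend; the timeout avoids blocking forever if no worker is running
print(result.get(timeout=60))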
EOF
# Run full pipeline
python manage.py shell << EOF
from celery_app.tasks import check_blueprint
from uuid import UUID
site_id = UUID("<site-uuid>")
check_blueprint.delay(site_id)
# Each stage automatically chains to the next
EOF
# Monitor pipeline execution
celery -A igny8.celery_app events
# or view logs: tail -f celery.log
```
### Testing the Pipeline
#### Unit Tests
```bash
pytest content/tests/test_pipeline.py -v
pytest sag/tests/test_blueprint.py -v
pytest celery_app/tests/test_tasks.py -v
```
#### Integration Test
```bash
pytest content/tests/test_pipeline_integration.py::test_full_blueprint_pipeline -v
# Test legacy mode
pytest content/tests/test_pipeline_integration.py::test_full_legacy_pipeline -v
# Test mixed mode (some sites with blueprint, some without)
pytest content/tests/test_pipeline_integration.py::test_mixed_mode_execution -v
```
#### Manual Test Scenario
```bash
# 1. Create test site and blueprint
python manage.py shell < scripts/setup_test_data.py
# 2. Import sample keywords
python manage.py shell << EOF
from content.models import Keyword
from sites.models import Site
site = Site.objects.get(name="Test Site")
keywords = ["python tutorial", "django rest", "web scraping"]
for kw in keywords:
    Keyword.objects.create(site=site, term=kw, source='csv_import')
EOF
# 3. Run pipeline
celery -A igny8.celery_app worker --loglevel=debug &
python manage.py shell << EOF
from celery_app.tasks import check_blueprint
from sites.models import Site
site = Site.objects.get(name="Test Site")
check_blueprint.delay(site.id)
EOF
# 4. Inspect results
python manage.py shell << EOF
from content.models import Keyword, Idea, Task, Content, Image
from sites.models import Site
site = Site.objects.get(name="Test Site")
print("Keywords:", Keyword.objects.filter(site=site).count())
print("Ideas:", Idea.objects.filter(site=site).count())
print("Tasks:", Task.objects.filter(site=site).count())
print("Content:", Content.objects.filter(site=site).count())
print("Images:", Image.objects.filter(site=site).count())
# Check blueprint context
task = Task.objects.filter(site=site, blueprint_context__isnull=False).first()
if task:
    print("Blueprint context:", task.blueprint_context)
EOF
```
### Debugging Common Issues
#### Blueprint Not Detected
```bash
# Check if blueprint exists and is active
python manage.py shell << EOF
from sag.models import SAGBlueprint
from sites.models import Site
site = Site.objects.get(id="<site-id>")
blueprint = SAGBlueprint.objects.filter(site=site, status='active').first()
print(f"Blueprint: {blueprint}")
if blueprint:
    print(f"Status: {blueprint.status}")
    print(f"Content plan: {blueprint.content_plan}")
EOF
```
#### Keywords Not Matching
```bash
# Check keyword-cluster mapping
python manage.py shell << EOF
from content.models import Keyword
from sag.models import SAGCluster
keywords = Keyword.objects.filter(sag_cluster_id__isnull=True)
print(f"Unmatched keywords: {[kw.term for kw in keywords]}")
# Check available clusters
clusters = SAGCluster.objects.all()
for cluster in clusters:
    print(f"Cluster '{cluster.name}': {cluster.attributes}")
EOF
```
#### Content Not Generated
```bash
# Check task status
python manage.py shell << EOF
from content.models import Task
tasks = Task.objects.all()
for task in tasks:
    print(f"Task {task.id}: status={task.status}, blueprint_context={bool(task.blueprint_context)}")
EOF
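# Inspect tasks that workers are currently executing or have prefetched
celery -A igny8.celery_app inspect active
celery -A igny8.celery_app inspect reserved
# WARNING: purge deletes every pending message from the queue; use only as a last resort
celery -A igny8.celery_app purge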
```
### Extending with Custom Prompt Templates
#### Add New Template
```python
# In sag/prompt_templates.py
PROMPT_TEMPLATES = {
    'sag_hub_guide': """
You are writing a comprehensive guide for {cluster_name}, a {cluster_type} in the {cluster_sector} sector.
Topic: {cluster_name}
Related terms: {attribute_terms}
Hub page: {hub_url}
Structure: Guide/Tutorial format
- Introduction: What is {cluster_name}?
- Key concepts: {attribute_terms}
- Step-by-step guide
- Common pitfalls
- Conclusion with links to {hub_title}
Write a comprehensive, SEO-optimized guide.
""",
    # Add more templates here...
}
# Usage in generate_content task:
# template = PROMPT_TEMPLATES['sag_hub_guide']
# prompt = template.format(**blueprint_context)
```
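
Because `template.format(**blueprint_context)` raises a bare `KeyError` on the first missing placeholder, it can help to validate the context up front. The sketch below uses the standard library's `string.Formatter` to list every missing variable at once; `missing_variables` and `render_prompt` are hypothetical helper names, not part of the existing task code.

```python
from string import Formatter


def missing_variables(template: str, context: dict) -> list:
    """List placeholder names used in the template but absent from the context."""
    names = {field for _, field, _, _ in Formatter().parse(template) if field}
    return sorted(names - set(context))


def render_prompt(template: str, context: dict) -> str:
    """Format the template, failing with one clear error if any variable is missing."""
    missing = missing_variables(template, context)
    if missing:
        raise KeyError(f"missing prompt variables: {', '.join(missing)}")
    return template.format(**context)
```

Extra keys in the context are ignored by `str.format`, so the full `blueprint_context` can be passed to every template.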
### Monitoring Pipeline Health (Integration with 01G)
```bash
# View pipeline execution history
python manage.py shell << EOF
from content.models import Job
jobs = Job.objects.filter(stage=5).order_by('-created_at')[:10]
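for job in jobs:
    # completed_at is empty until the job finishes, so guard the subtraction
    duration = (job.completed_at - job.created_at).total_seconds() if job.completed_at else None
    elapsed = f"{duration:.0f}s" if duration is not None else "n/a"
    print(f"Stage {job.stage}: {job.status} ({elapsed}) - {job.blueprint_mode}")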
EOF
# Check cluster completion status
python manage.py shell << EOF
from sag.models import SAGCluster
clusters = SAGCluster.objects.all()
for cluster in clusters:
    content_count = cluster.content_set.count()
    print(f"Cluster '{cluster.name}': {cluster.status} ({content_count} content items)")
EOF
```
---
## Cross-References
| Document | Reference Purpose |
|----------|-------------------|
| **01A**: SAG Blueprint Model | SAGBlueprint, SAGCluster models used at Stage 0 |
| **01C**: Cluster Formation | Clusters created by SAG framework; used by pipeline |
| **01D**: Setup Wizard | Creates blueprint that drives pipeline execution |
| **01F**: Case 1 Analysis | Produces blueprints that feed this pipeline |
| **01G**: Health Monitoring | Tracks pipeline output per cluster and stage |
| **Content_Types_Writing_Plan.md** | Content type definitions; prompt template structure |
---
## Summary
The Blueprint-Aware Content Pipeline enhances IGNY8's 7-stage automation with SAG framework context at every step. When a site has an active blueprint, content generation becomes strategic: keywords map to clusters, ideas inherit type/structure/sector assignments, prompts draw on cluster context, and generated content is assigned to taxonomies automatically. When no blueprint exists, the pipeline runs in legacy mode, unchanged.
**Key innovation**: Two-mode execution (blueprint-aware + legacy) enables gradual adoption: teams can opt in to blueprint-driven content without disrupting existing sites. **Execution priority phases** ensure foundational content (hubs) publishes before supporting content (blogs), building authority tier by tier.