# 01E: Blueprint-Aware Content Pipeline

**IGNY8 Phase 1: Content Automation with SAG Blueprint Enhancement**

---

## 1. CURRENT STATE

### Existing Pipeline Architecture

IGNY8's content pipeline operates as a 7-stage automated system, orchestrated via Celery with scheduled execution (daily/weekly/monthly via Celery Beat):

| Stage | Function | Automation | Output |
|-------|----------|------------|--------|
| 1 | Keywords | Import CSV/seed lists | Keyword list per site |
| 2 | Clusters | AutoClusterKeywords (GPT-4) | Semantic keyword groups |
| 3 | Ideas | GenerateIdeas | Content brief queue |
| 4 | Tasks | Queue creation | Writer task list |
| 5 | Content | GenerateContent (AI) | Draft articles |
| 6 | Images | GenerateImages | Featured + in-article images |
| 7 | Review | Editorial queue | Published content |

### Current Limitations

- **Generic clustering**: All keywords are grouped by semantic similarity, with no business-specific structure
- **One-size-fits-all content**: All articles follow the same template regardless of content type
- **No hierarchy**: No distinction between hub pages, blog posts, product pages, term pages, or service pages
- **No priority**: All content is treated equally; foundational content (hubs) may not be written first
- **No taxonomy integration**: Generated content is not automatically assigned to custom taxonomies
- **No blueprint context**: Writers receive keywords but not the strategic framework

### Celery Automation Context

- **Celery Beat**: Manages the recurring schedule (daily, weekly, monthly per site)
- **Task Queue**: Each stage is enqueued as a separate Celery task
- **State Tracking**: Uses the Django ORM to track Job, Stage, Keyword, Cluster, Idea, Task, Content, Image models
- **Failure Handling**: Retry logic, dead-letter queue for failed tasks
- **Logging**: Structured logging to track execution per site per stage

---

## 2. WHAT TO BUILD

### Vision: Blueprint-Driven Pipeline

When a site has an **active SAG Blueprint**, every pipeline stage becomes context-aware:

- Content priorities driven by the blueprint's execution phases
- Content types (hub, blog, product, term, service) determined at ideation
- Prompt templates matched to content structure and type
- Output taxonomy-tagged and cluster-assigned automatically

When **no blueprint exists**, the pipeline reverts to legacy mode, with no breaking changes.

### New/Enhanced Stages

#### Stage 0: Blueprint Check (NEW)

Execute before pipeline stages 1–7.

**Responsibility**: Determine execution mode and load context.

**Logic**:

```text
IF Site.sag_blueprint EXISTS AND sag_blueprint.status == 'active':
    LOAD blueprint
    IDENTIFY unfulfilled content needs from blueprint.content_plan
    DETERMINE execution phases from blueprint.execution_priority
    SET pipeline_mode = 'blueprint_aware'
ELSE:
    SET pipeline_mode = 'legacy'
    PROCEED to Stage 1 with no blueprint context
```

**Outputs**:

- `pipeline_mode`: 'blueprint_aware' | 'legacy'
- `blueprint_context`: SAGBlueprint instance (if active)
- `execution_phases`: Priority phases for the content queue, taken from `SAGBlueprint.execution_priority`

---

#### Stage 1: Keyword Processing (ENHANCED)

**Legacy behavior** (no blueprint): Pass keywords to Stage 2 unchanged.

**Blueprint-aware** (active blueprint):

1. For each new/imported keyword, query the blueprint's SAGClusters
2. Match the keyword to existing clusters based on:
   - Attribute overlap (e.g., keyword "sustainable farming" matches a cluster with attribute "sustainability")
   - Semantic proximity to cluster topic
   - Sector alignment
3. Assign the matched keyword to the cluster's `keywords` list
4. Flag unmatched keywords:
   - **Gap**: No cluster exists for this topic
   - **Outlier**: Keyword semantic distance > threshold from all clusters
   - **Frontier**: Keyword extends a cluster into a new subtopic (possible new cluster)
5. Update `SAGCluster.keywords`, `SAGCluster.updated_at`

**Outputs**:

- Updated cluster keyword lists
- Gap/outlier report for content strategy review
- Flagged keywords for potential new cluster formation

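
The flagging in step 4 could be driven by the best similarity score a keyword reaches against any cluster. The sketch below is one possible reading, not the pipeline's actual rule: the two thresholds, the function names, and the toy 2-D vectors are assumptions for illustration. Gap detection is left out because it needs a notion of topic coverage that a single distance does not capture.

```python
import math

# Assumed thresholds, chosen only for illustration.
MATCH_THRESHOLD = 0.80     # at or above: assign the keyword to the cluster
FRONTIER_THRESHOLD = 0.55  # between the two thresholds: flag as frontier


def cosine_similarity(a, b):
    """Cosine similarity of two equal-length vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def classify_keyword(keyword_vec, cluster_vecs):
    """Return (label, best_cluster_name) for one keyword.

    cluster_vecs maps a cluster name to its (hypothetical) embedding vector.
    """
    best_name, best_score = None, -1.0
    for name, vec in cluster_vecs.items():
        score = cosine_similarity(keyword_vec, vec)
        if score > best_score:
            best_name, best_score = name, score
    if best_score >= MATCH_THRESHOLD:
        return "matched", best_name
    if best_score >= FRONTIER_THRESHOLD:
        return "frontier", best_name
    # Too far from every cluster (or no clusters at all).
    return "outlier", None


clusters = {"sustainability": [1.0, 0.0], "irrigation": [0.0, 1.0]}
for vec in ([0.9, 0.1], [0.7, 0.7], [-1.0, -1.0]):
    print(vec, classify_keyword(vec, clusters))
```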
---

#### Stage 2: AI Cluster Keywords (ENHANCED)

**Legacy behavior** (no blueprint): Run existing `AutoClusterKeywords` via GPT-4 grouping.

**Blueprint-aware** (active blueprint):

1. **SKIP** `AutoClusterKeywords` entirely
2. Clusters are already defined by the SAG framework (Stage 0 loaded the blueprint)
3. For new keywords from Stage 1:
   - Map to existing clusters (already done in Stage 1)
   - Create a mapping record linking keyword → SAGCluster
4. Flag unmatched keywords (from Stage 1) for manual review
5. No new clusters are created (cluster formation is a Phase 1C process, not a pipeline step)

**Outputs**:

- Keyword-to-cluster mapping
- Unmatched keyword report

---

#### Stage 3: Generate Content Ideas (ENHANCED)

**Legacy behavior** (no blueprint): Run existing `GenerateIdeas` function.

**Blueprint-aware** (active blueprint):

1. Call `sag/ai_functions/content_planning.py::GenerateIdeasWithBlueprint`
2. For each idea generated, enrich with:
   - **Sector**: From SAGCluster.sector
   - **Structure**: From blueprint.content_plan[cluster].structure (e.g., 'guide_tutorial', 'comparison', 'review', 'how_to', 'question')
   - **Type**: From blueprint.content_plan[cluster].type (e.g., 'cluster_hub', 'blog_post', 'product_page', 'term_page', 'service_page')
   - **SAGCluster ID**: Link idea to blueprint cluster
   - **idea_source**: Set to 'sag_blueprint'
3. Respect execution phases:
   - Phase 1: Generate ideas for `category_pages`, `top_cluster_hubs`
   - Phase 2: Generate ideas for `remaining_hubs`, `first_blogs_per_cluster`
   - Phase 3: Generate ideas for `attribute_term_pages`, `product_enrichment`
   - Phase 4: Generate ideas for `additional_blogs`, `brand_comparisons`
4. Prioritize queuing by phase

**Outputs**:

- Idea records with type, structure, sector, cluster assignment
- Execution phase assignments
- Queue prioritized by phase

---

#### Stage 4: Create Writer Tasks (ENHANCED)

**Legacy behavior** (no blueprint): Create basic task with keyword/idea reference.

**Blueprint-aware** (active blueprint):

1. For each idea, create a Task with:
   - Standard fields: title, keyword, site, status, assigned_to
   - **New fields**:
     - `sag_cluster_id`: Reference to blueprint cluster
     - `blueprint_context`: JSON blob containing execution context
2. `blueprint_context` structure:

   ```json
   {
     "cluster_id": "uuid",
     "cluster_name": "string",
     "cluster_type": "string (topical|product|service)",
     "cluster_sector": "string",
     "hub_title": "string (cluster's main hub page title)",
     "hub_url": "string (blueprint.site.domain/cluster_slug)",
     "cluster_attributes": ["list of attribute terms"],
     "related_clusters": ["list of related cluster ids"],
     "cluster_products": ["list of product ids if product cluster"],
     "content_structure": "string (guide_tutorial|comparison|review|how_to|question|listicle)",
     "content_type": "string (cluster_hub|blog_post|product_page|term_page|service_page)",
     "execution_phase": "integer (1-4)",
     "seo_strategy": "object (primary_keyword, related_keywords, intent)"
   }
   ```

3. If no blueprint: Create the task without blueprint_context (legacy)

**Outputs**:

- Task records with sag_cluster_id and blueprint_context

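
Because later stages read `blueprint_context` by key, a small completeness check before saving the task may catch missing fields early. The following is a minimal sketch, not part of the existing pipeline: the function name and the choice of which keys count as required are assumptions (here, the keys that every content type would plausibly need).

```python
# Assumed set of keys that every blueprint_context must carry.
REQUIRED_KEYS = {
    "cluster_id", "cluster_name", "cluster_type", "cluster_sector",
    "hub_title", "hub_url", "cluster_attributes",
    "content_structure", "content_type", "execution_phase",
}
ALLOWED_CLUSTER_TYPES = {"topical", "product", "service"}


def validate_blueprint_context(context):
    """Return a list of problems; an empty list means the context looks usable."""
    problems = [f"missing key: {key}" for key in sorted(REQUIRED_KEYS - set(context))]

    cluster_type = context.get("cluster_type")
    if cluster_type is not None and cluster_type not in ALLOWED_CLUSTER_TYPES:
        problems.append(f"unknown cluster_type: {cluster_type}")

    phase = context.get("execution_phase")
    if phase is not None and phase not in (1, 2, 3, 4):
        problems.append(f"execution_phase out of range: {phase}")

    return problems


example = {
    "cluster_id": "00000000-0000-0000-0000-000000000001",
    "cluster_name": "Organic Farming",
    "cluster_type": "topical",
    "cluster_sector": "agriculture",
    "hub_title": "Organic Farming Guide",
    "hub_url": "example.com/organic-farming",
    "cluster_attributes": ["sustainability"],
    "content_structure": "guide_tutorial",
    "content_type": "cluster_hub",
    "execution_phase": 1,
}
print(validate_blueprint_context(example))
```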
---

#### Stage 5: Generate Article Content (ENHANCED)

**Legacy behavior** (no blueprint): Run existing `GenerateContent` with generic prompt.

**Blueprint-aware** (has blueprint_context):

1. **Load prompt template** by content_type + content_structure combination:

   | Content Type | Structure | Template Key |
   |---|---|---|
   | Cluster Hub | Guide Tutorial | `sag_hub_guide` |
   | Cluster Hub | Top Listicle | `sag_hub_listicle` |
   | Blog Post | Comparison | `sag_blog_comparison` |
   | Blog Post | Review | `sag_blog_review` |
   | Blog Post | How To | `sag_blog_howto` |
   | Blog Post | Question | `sag_blog_question` |
   | Term Page | Guide Tutorial | `sag_term_page` |
   | Product Page | Review | `sag_product_page` |
   | Service Page | Guide Tutorial | `sag_service_page` |
   | Landing Page | Guide Tutorial | `sag_landing_guide` |
   | Landing Page | Comparison | `sag_landing_comparison` |
   | Business Page | Guide Tutorial | `sag_business_guide` |

2. **Inject blueprint context variables** into prompt template:

   ```
   {cluster_name} → From SAGCluster.name
   {cluster_type} → From SAGCluster.cluster_type
   {cluster_sector} → From SAGCluster.sector
   {hub_title} → From blueprint_context.hub_title
   {hub_url} → From blueprint_context.hub_url
   {attribute_terms} → Comma-separated list from cluster attributes
   {cluster_products} → Product list if product cluster
   {related_clusters} → Related cluster names for internal linking
   {content_structure} → Structure type for consistency
   {content_type} → Content type for tone/depth
   ```

3. Call GPT-4 with enriched prompt template

4. Post-process output:
   - Add internal links to related cluster hubs
   - Add cross-references to attribute term pages
   - Inject CTA appropriate to content type (e.g., product link for product cluster)

5. If no blueprint_context: Run legacy `GenerateContent` unchanged

**Outputs**:

- Content record with body, title, sag_cluster_id, content_type, content_structure

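
Variable injection (step 2) can fail if a template names a variable that the task's context does not carry, for example `{cluster_products}` on a non-product cluster. The sketch below shows one way to render defensively with `str.format_map`; the helper names, the placeholder text, and the sample template are assumptions for illustration, not the project's actual prompt code.

```python
class _DefaultingDict(dict):
    """dict that renders a visible placeholder for unknown template variables."""

    def __missing__(self, key):
        return f"[{key} unavailable]"


def render_prompt(template, blueprint_context):
    """Fill a prompt template from blueprint_context without raising KeyError."""
    variables = _DefaultingDict(blueprint_context)
    # {attribute_terms} is the comma-separated form of cluster_attributes.
    variables["attribute_terms"] = ", ".join(
        blueprint_context.get("cluster_attributes", [])
    )
    return template.format_map(variables)


# Hypothetical template text, for demonstration only.
template = (
    "Write a {content_type} for {cluster_name}. "
    "Cover: {attribute_terms}. Link to {hub_url}. "
    "Products: {cluster_products}."
)
context = {
    "content_type": "blog_post",
    "cluster_name": "Organic Farming",
    "cluster_attributes": ["sustainability", "soil health"],
    "hub_url": "example.com/organic-farming",
}
print(render_prompt(template, context))
```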
---

#### Stage 6: Taxonomy Assignment (NEW)

Execute after content generation, **only if blueprint exists**.

**Responsibility**: Auto-assign content to custom WordPress (WP) taxonomies derived from the blueprint.

**Logic**:

1. Load the site's custom taxonomies from the blueprint (`SAGBlueprint.wp_taxonomy_mapping`, keyed by cluster ID)
2. For generated content:
   - Match content to the cluster's attributes and taxonomy terms
   - Assign custom taxonomy values from the blueprint mapping
   - Set `content.sag_cluster_id` (links to blueprint structure)
   - Update cluster status:
     - If first content in cluster: set `SAGCluster.status = 'partial'`
     - If all planned content exists: set `SAGCluster.status = 'complete'`
3. Store taxonomy assignments in the `Content.taxonomies` JSON field

**Outputs**:

- Content records tagged with custom taxonomies
- Cluster status updated to reflect content completion

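
The cluster status rule in step 2 reduces to a comparison of two counts. A minimal sketch follows, assuming the number of planned items can be derived from `blueprint.content_plan` (its exact shape is not specified here) and that the function name is hypothetical.

```python
def next_cluster_status(planned_count, existing_count):
    """Map content counts to a SAGCluster.status value.

    planned_count:  items the blueprint plans for this cluster (assumed to be
                    derivable from blueprint.content_plan).
    existing_count: Content rows that already reference the cluster.
    """
    if existing_count <= 0:
        return "draft"
    if planned_count > 0 and existing_count >= planned_count:
        return "complete"
    return "partial"


for planned, existing in [(5, 0), (5, 1), (5, 5), (0, 2)]:
    print(planned, existing, next_cluster_status(planned, existing))
```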
---

#### Stage 7: Image Generation (ENHANCED)

**Legacy behavior** (no blueprint): Generate generic featured + in-article images.

**Blueprint-aware** (blueprint exists):

1. Match image style to content type:
   - **Hub page**: Hero/authority style (professional, comprehensive)
   - **Blog post**: Supporting/educational (friendly, illustrative)
   - **Product page**: E-commerce standard (product-focused, clean)
   - **Term page**: Category representation (taxonomy icon or concept illustration)
   - **Service page**: Service illustration (professional, trustworthy)
   - **Landing page**: Conversion-focused (compelling, aspirational)
2. Use the cluster theme/color palette from the blueprint for style consistency
3. Generate alt text leveraging content_structure + cluster context
4. If no blueprint: Generate images with default style

**Outputs**:

- Image records with style type, alt text, sag_cluster_id

---

### Execution Priority (Blueprint-Driven)

Pipeline processes content by `SAGBlueprint.execution_priority` phases:

```python
execution_priority = {
    "phase_1": ["category_pages", "top_cluster_hubs"],
    "phase_2": ["remaining_hubs", "first_blogs_per_cluster"],
    "phase_3": ["attribute_term_pages", "product_enrichment"],
    "phase_4": ["additional_blogs", "brand_comparisons"]
}
```

**Queue behavior**:

- Stage 3 filters ideas by phase
- Stage 4 prioritizes tasks by phase
- Celery task enqueuing respects phase order
- **Rationale**: Foundational content (hubs) is published before supporting content (blogs)

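
The queue behavior above amounts to inverting the phase mapping and sorting by the result. The sketch below illustrates that idea under stated assumptions: the helper names are hypothetical, ideas are plain dicts with a `category` key (the real `Idea` model stores `execution_phase` directly), and unknown categories are pushed to the end.

```python
execution_priority = {
    "phase_1": ["category_pages", "top_cluster_hubs"],
    "phase_2": ["remaining_hubs", "first_blogs_per_cluster"],
    "phase_3": ["attribute_term_pages", "product_enrichment"],
    "phase_4": ["additional_blogs", "brand_comparisons"],
}


def build_phase_lookup(priority):
    """Invert {'phase_N': [categories]} into {category: N}."""
    lookup = {}
    for phase_key, categories in priority.items():
        phase_number = int(phase_key.split("_")[1])  # "phase_2" -> 2
        for category in categories:
            lookup[category] = phase_number
    return lookup


def sort_by_phase(ideas, lookup, default_phase=99):
    """Order ideas by phase; sorted() is stable, so ties keep their input order."""
    return sorted(ideas, key=lambda idea: lookup.get(idea["category"], default_phase))


lookup = build_phase_lookup(execution_priority)
ideas = [
    {"title": "Brand A vs Brand B", "category": "brand_comparisons"},
    {"title": "Organic Farming Hub", "category": "top_cluster_hubs"},
    {"title": "Soil Health Basics", "category": "first_blogs_per_cluster"},
]
for idea in sort_by_phase(ideas, lookup):
    print(lookup[idea["category"]], idea["title"])
```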
---

## 3. DATA MODELS / APIs

### Related Models (from 01A, 01C, 01D)

```python
# sag/models.py: SAG Blueprint structure
# Field definitions are abbreviated for readability: the `models.` prefix,
# `on_delete`, `max_length` on choice fields, and (value, label) choice
# pairs are omitted.

class SAGBlueprint(models.Model):
    site = ForeignKey(Site)
    name = CharField(max_length=255)
    status = CharField(choices=['draft', 'active', 'archived'])
    created_at = DateTimeField(auto_now_add=True)
    updated_at = DateTimeField(auto_now=True)

    # Phase-based execution plan
    execution_priority = JSONField(default=dict)  # phases 1-4
    content_plan = JSONField()  # cluster_id → content specs

    # Taxonomy mapping to WordPress custom taxonomies
    wp_taxonomy_mapping = JSONField()  # cluster_id → tax values


class SAGCluster(models.Model):
    blueprint = ForeignKey(SAGBlueprint)
    name = CharField(max_length=255)
    cluster_type = CharField(choices=['topical', 'product', 'service'])
    sector = CharField(max_length=255)
    keywords = JSONField(default=list)
    attributes = JSONField(default=list)
    status = CharField(choices=['draft', 'partial', 'complete'])
    updated_at = DateTimeField(auto_now=True)
```

### Pipeline Models (existing)

```python
# content/models.py: Content pipeline
# Field definitions use the same abbreviated notation as sag/models.py.

class Keyword(models.Model):
    site = ForeignKey(Site)
    term = CharField(max_length=255)
    source = CharField(choices=['csv_import', 'seed_list', 'user', 'sag_blueprint'])
    sag_cluster_id = UUIDField(null=True, blank=True)  # NEW: links to blueprint cluster
    created_at = DateTimeField(auto_now_add=True)


class Cluster(models.Model):
    site = ForeignKey(Site)
    name = CharField(max_length=255)
    keywords = JSONField(default=list)
    created_by = CharField(choices=['auto_cluster', 'sag_blueprint'])


class Idea(models.Model):
    site = ForeignKey(Site)
    title = CharField(max_length=255)
    keyword = ForeignKey(Keyword)
    cluster = ForeignKey(Cluster, null=True)
    sector = CharField(max_length=255)  # NEW
    structure = CharField(choices=['guide_tutorial', 'comparison', 'review', 'how_to', 'question', 'listicle'])  # NEW
    content_type = CharField(choices=['cluster_hub', 'blog_post', 'product_page', 'term_page', 'service_page', 'landing_page', 'business_page'])  # NEW
    sag_cluster_id = UUIDField(null=True, blank=True)  # NEW
    idea_source = CharField(choices=['auto_generate', 'sag_blueprint'])  # NEW
    execution_phase = IntegerField(null=True)  # NEW: 1-4 from blueprint
    created_at = DateTimeField(auto_now_add=True)


class Task(models.Model):
    site = ForeignKey(Site)
    title = CharField(max_length=255)
    idea = ForeignKey(Idea)
    status = CharField(choices=['pending', 'assigned', 'in_progress', 'review', 'completed'])
    assigned_to = ForeignKey(User, null=True)
    sag_cluster_id = UUIDField(null=True, blank=True)  # NEW
    blueprint_context = JSONField(null=True, blank=True)  # NEW: execution context
    created_at = DateTimeField(auto_now_add=True)


class Content(models.Model):
    site = ForeignKey(Site)
    title = CharField(max_length=255)
    body = TextField()
    task = ForeignKey(Task, null=True)
    content_type = CharField(choices=['cluster_hub', 'blog_post', 'product_page', 'term_page', 'service_page', 'landing_page', 'business_page'])  # NEW
    content_structure = CharField(choices=['guide_tutorial', 'comparison', 'review', 'how_to', 'question', 'listicle'])  # NEW
    sag_cluster_id = UUIDField(null=True, blank=True)  # NEW
    taxonomies = JSONField(default=dict, null=True, blank=True)  # NEW: custom WP taxonomies
    status = CharField(choices=['draft', 'review', 'published'])
    created_at = DateTimeField(auto_now_add=True)


class Image(models.Model):
    content = ForeignKey(Content)
    url = URLField()
    alt_text = CharField(max_length=255)
    style_type = CharField(choices=['hero', 'supporting', 'ecommerce', 'category', 'service', 'conversion'])  # NEW
    sag_cluster_id = UUIDField(null=True, blank=True)  # NEW
    created_at = DateTimeField(auto_now_add=True)


class Job(models.Model):
    """Pipeline execution tracking"""
    site = ForeignKey(Site)
    status = CharField(choices=['pending', 'running', 'completed', 'failed'])
    stage = IntegerField(choices=[(0, 'Blueprint Check'), (1, 'Keywords'), (2, 'Cluster'), (3, 'Ideas'), (4, 'Tasks'), (5, 'Content'), (6, 'Taxonomy'), (7, 'Images')])
    blueprint_mode = CharField(choices=['legacy', 'blueprint_aware'])  # NEW
    log = TextField(default='')
    created_at = DateTimeField(auto_now_add=True)
    completed_at = DateTimeField(null=True)
```

### API Endpoints (Celery Task Functions)

#### Stage 0: Blueprint Check

```python
# celery_app/tasks.py
# `app` (the Celery instance) and `Site` are assumed to be imported from
# project-specific modules not shown in this document.
from content.models import Content, Idea, Image, Job, Keyword, Task
from sag.models import SAGBlueprint, SAGCluster


@app.task(bind=True, max_retries=3)
def check_blueprint(self, site_id):
    """
    Stage 0: Determine execution mode and load blueprint context.

    Returns:
        {
            'status': 'success',
            'pipeline_mode': 'blueprint_aware' | 'legacy',
            'blueprint_id': 'uuid' (if active),
            'execution_phases': dict (SAGBlueprint.execution_priority) | None,
            'next_stage': 1
        }
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(site=site, stage=0, status='running')

        blueprint = SAGBlueprint.objects.filter(
            site=site,
            status='active'
        ).first()

        if blueprint:
            result = {
                'status': 'success',
                'pipeline_mode': 'blueprint_aware',
                'blueprint_id': str(blueprint.id),
                'execution_phases': blueprint.execution_priority,
                'next_stage': 1,
            }
            job.blueprint_mode = 'blueprint_aware'
        else:
            result = {
                'status': 'success',
                'pipeline_mode': 'legacy',
                'blueprint_id': None,
                'execution_phases': None,
                'next_stage': 1,
            }
            job.blueprint_mode = 'legacy'

        job.status = 'completed'
        job.save()

        # Chain to Stage 1
        process_keywords.delay(site_id, result)

        return result
    except Exception as exc:
        # Re-raise so Celery records the retry instead of a silent None result
        raise self.retry(exc=exc, countdown=60)
```

#### Stage 1: Keyword Processing

```python
@app.task(bind=True, max_retries=3)
def process_keywords(self, site_id, blueprint_context):
    """
    Stage 1: Process keywords and optionally map to SAGClusters.

    If blueprint_context['pipeline_mode'] == 'blueprint_aware':
        - Map keywords to existing SAGClusters
        - Flag unmatched keywords
    Else:
        - Pass keywords to next stage unchanged
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=1,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )

        # Evaluate once: matched keywords drop out of this filter after save(),
        # so a later .count() on the queryset would under-report.
        keywords = list(Keyword.objects.filter(site=site, sag_cluster_id__isnull=True))

        if blueprint_context['pipeline_mode'] == 'blueprint_aware':
            blueprint = SAGBlueprint.objects.get(id=blueprint_context['blueprint_id'])
            clusters = SAGCluster.objects.filter(blueprint=blueprint)

            matched_count = 0
            unmatched_keywords = []

            for keyword in keywords:
                # Semantic matching: find best cluster
                cluster = _match_keyword_to_cluster(keyword, clusters)
                if cluster:
                    keyword.sag_cluster_id = cluster.id
                    keyword.save()
                    cluster.keywords.append(keyword.term)
                    cluster.save()
                    matched_count += 1
                else:
                    unmatched_keywords.append(keyword.term)

            job.log = f"Matched {matched_count} keywords. Unmatched: {unmatched_keywords}"
        else:
            job.log = "Legacy mode: keywords passed unchanged"

        job.status = 'completed'
        job.save()

        # Chain to Stage 2
        cluster_keywords.delay(site_id, blueprint_context)

        return {'status': 'success', 'keywords_processed': len(keywords)}
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)


def _match_keyword_to_cluster(keyword, clusters):
    """Find best-matching SAGCluster for keyword via embedding similarity."""
    # Uses semantic search (embeddings) to find best cluster match
    # Returns SAGCluster or None
    return None  # placeholder until the matcher is implemented (Phase C)
```

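
Until the embedding-based matcher exists, the shape of `_match_keyword_to_cluster` can be illustrated with a dependency-free stand-in. The sketch below uses Jaccard token overlap rather than embeddings, so it is explicitly not the planned technique; `ClusterStub`, the public function name, and the `threshold` default are hypothetical and only mirror the `name`, `attributes`, and `keywords` fields of `SAGCluster`.

```python
from dataclasses import dataclass, field


@dataclass
class ClusterStub:
    """Stand-in for SAGCluster with only the fields the matcher reads."""
    name: str
    attributes: list = field(default_factory=list)
    keywords: list = field(default_factory=list)


def _tokens(text):
    return set(text.lower().split())


def jaccard(a, b):
    """Jaccard similarity of two token sets; 0.0 when either set is empty."""
    if not a or not b:
        return 0.0
    return len(a & b) / len(a | b)


def match_keyword_to_cluster(term, clusters, threshold=0.2):
    """Return the cluster whose vocabulary best overlaps the term, or None."""
    term_tokens = _tokens(term)
    best, best_score = None, 0.0
    for cluster in clusters:
        vocabulary = set()
        for phrase in [cluster.name, *cluster.attributes, *cluster.keywords]:
            vocabulary |= _tokens(phrase)
        score = jaccard(term_tokens, vocabulary)
        if score > best_score:
            best, best_score = cluster, score
    return best if best_score >= threshold else None


farming = ClusterStub("Organic Farming", attributes=["sustainable", "soil"])
irrigation = ClusterStub("Drip Irrigation", attributes=["water", "efficiency"])
match = match_keyword_to_cluster("sustainable farming", [farming, irrigation])
print(match.name if match else None)
```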
#### Stage 2: AI Cluster Keywords

```python
@app.task(bind=True, max_retries=3)
def cluster_keywords(self, site_id, blueprint_context):
    """
    Stage 2: Cluster keywords.

    If blueprint_aware:
        - SKIP AutoClusterKeywords
        - Use blueprint clusters from Stage 0
    Else:
        - Run AutoClusterKeywords (existing function)
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=2,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )

        if blueprint_context['pipeline_mode'] == 'blueprint_aware':
            # Clusters already exist from blueprint
            clusters = SAGCluster.objects.filter(
                blueprint_id=blueprint_context['blueprint_id']
            )
            job.log = f"Using {clusters.count()} blueprint clusters"
        else:
            # Run existing AutoClusterKeywords
            clusters = AutoClusterKeywords(site_id)
            job.log = f"AutoClusterKeywords created {clusters.count()} clusters"

        job.status = 'completed'
        job.save()

        # Chain to Stage 3
        generate_ideas.delay(site_id, blueprint_context)

        return {'status': 'success', 'clusters': clusters.count()}
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)
```

#### Stage 3: Generate Content Ideas

```python
@app.task(bind=True, max_retries=3)
def generate_ideas(self, site_id, blueprint_context):
    """
    Stage 3: Generate content ideas.

    If blueprint_aware:
        - Call GenerateIdeasWithBlueprint
        - Enrich ideas with type, structure, sector
        - Respect execution phases
    Else:
        - Call existing GenerateIdeas
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=3,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )

        if blueprint_context['pipeline_mode'] == 'blueprint_aware':
            blueprint = SAGBlueprint.objects.get(id=blueprint_context['blueprint_id'])
            ideas = GenerateIdeasWithBlueprint(site, blueprint)
            job.log = f"Generated {len(ideas)} blueprint-aware ideas across {len(blueprint_context['execution_phases'])} phases"
        else:
            ideas = GenerateIdeas(site)
            job.log = f"Generated {len(ideas)} legacy ideas"

        job.status = 'completed'
        job.save()

        # Chain to Stage 4
        create_tasks.delay(site_id, blueprint_context)

        return {'status': 'success', 'ideas': len(ideas)}
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)
```

#### Stage 4: Create Writer Tasks

```python
@app.task(bind=True, max_retries=3)
def create_tasks(self, site_id, blueprint_context):
    """
    Stage 4: Create writer tasks.

    If blueprint_aware:
        - Enrich task with sag_cluster_id and blueprint_context JSON
        - Respect execution phase priority
    Else:
        - Create basic tasks
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=4,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )

        # Lower phases first; where ideas without a phase (legacy) sort is
        # database-dependent.
        ideas = Idea.objects.filter(site=site, task__isnull=True).order_by('execution_phase')

        task_count = 0
        for idea in ideas:
            task = Task.objects.create(
                site=site,
                title=idea.title,
                idea=idea,
                status='pending'
            )

            if blueprint_context['pipeline_mode'] == 'blueprint_aware' and idea.sag_cluster_id:
                cluster = SAGCluster.objects.get(id=idea.sag_cluster_id)
                blueprint = cluster.blueprint

                task.sag_cluster_id = idea.sag_cluster_id
                task.blueprint_context = {
                    'cluster_id': str(cluster.id),
                    'cluster_name': cluster.name,
                    'cluster_type': cluster.cluster_type,
                    'cluster_sector': cluster.sector,
                    'hub_title': blueprint.content_plan.get(str(cluster.id), {}).get('hub_title'),
                    'hub_url': f"{site.domain}/hubs/{cluster.name.lower().replace(' ', '-')}",
                    'cluster_attributes': cluster.attributes,
                    'content_structure': idea.structure,
                    'content_type': idea.content_type,
                    'execution_phase': idea.execution_phase,
                }
                task.save()

            task_count += 1

        job.log = f"Created {task_count} tasks"
        job.status = 'completed'
        job.save()

        # Chain to Stage 5
        generate_content.delay(site_id, blueprint_context)

        return {'status': 'success', 'tasks': task_count}
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)
```

#### Stage 5: Generate Article Content

```python
@app.task(bind=True, max_retries=3)
def generate_content(self, site_id, blueprint_context):
    """
    Stage 5: Generate article content.

    If task has blueprint_context:
        - Load prompt template by content_type + structure
        - Inject blueprint context variables
        - Call GPT-4 with enriched prompt
        - Post-process for internal links
    Else:
        - Call existing GenerateContent
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=5,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )

        # Stage 4 creates tasks as 'pending'; content__isnull prevents reprocessing
        tasks = Task.objects.filter(site=site, status='pending', content__isnull=True)

        content_count = 0
        for task in tasks:
            if task.blueprint_context:
                # Blueprint-aware content generation
                prompt_key = _get_prompt_key(
                    task.blueprint_context['content_type'],
                    task.blueprint_context['content_structure']
                )
                template = PROMPT_TEMPLATES.get(prompt_key)

                # Inject variables; {attribute_terms} is the comma-separated
                # form of cluster_attributes
                variables = dict(task.blueprint_context)
                variables['attribute_terms'] = ', '.join(variables.get('cluster_attributes') or [])
                prompt = template.format(**variables)

                # Call GPT-4
                article = gpt4_call(prompt)

                # Post-process
                article = _add_internal_links(article, task.blueprint_context)
            else:
                # Legacy content generation
                article = GenerateContent(task.idea.keyword)

            Content.objects.create(
                site=site,
                title=task.title,
                body=article,
                task=task,
                sag_cluster_id=task.sag_cluster_id,
                content_type=task.blueprint_context.get('content_type') if task.blueprint_context else 'blog_post',
                content_structure=task.blueprint_context.get('content_structure') if task.blueprint_context else None,
            )
            content_count += 1

        job.log = f"Generated {content_count} articles"
        job.status = 'completed'
        job.save()

        # Chain to Stage 6
        assign_taxonomy.delay(site_id, blueprint_context)

        return {'status': 'success', 'content': content_count}
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)


def _get_prompt_key(content_type, structure):
    """Map content_type + structure to prompt template key."""
    mapping = {
        ('cluster_hub', 'guide_tutorial'): 'sag_hub_guide',
        ('cluster_hub', 'listicle'): 'sag_hub_listicle',
        ('blog_post', 'comparison'): 'sag_blog_comparison',
        ('blog_post', 'review'): 'sag_blog_review',
        ('blog_post', 'how_to'): 'sag_blog_howto',
        ('blog_post', 'question'): 'sag_blog_question',
        ('term_page', 'guide_tutorial'): 'sag_term_page',
        ('product_page', 'review'): 'sag_product_page',
        ('service_page', 'guide_tutorial'): 'sag_service_page',
        ('landing_page', 'guide_tutorial'): 'sag_landing_guide',
        ('landing_page', 'comparison'): 'sag_landing_comparison',
        ('business_page', 'guide_tutorial'): 'sag_business_guide',
    }
    # Unlisted combinations fall back to a generic blueprint template
    return mapping.get((content_type, structure), 'sag_default')


def _add_internal_links(article, blueprint_context):
    """Add internal links to related cluster hubs and attribute term pages."""
    # Parse article, identify linking opportunities
    # Inject markdown links to related content
    return article  # placeholder: returns the article unchanged until implemented
```

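
One conservative way to fill in `_add_internal_links` is to append a link section rather than rewrite sentences inline. The sketch below is an assumption-laden illustration, not the planned implementation: it assumes the article body is Markdown, that `hub_url` is a complete link target, and that `related_hubs` (a hypothetical parameter) supplies `(title, url)` pairs for related clusters.

```python
def add_internal_links(article, blueprint_context, related_hubs=()):
    """Append a 'Related reading' section linking to the hub and related hubs."""
    links = []
    hub_title = blueprint_context.get("hub_title")
    hub_url = blueprint_context.get("hub_url")
    if hub_title and hub_url:
        links.append((hub_title, hub_url))
    links.extend(related_hubs)

    # Skip targets the article already links to, so reruns do not duplicate.
    links = [(title, url) for title, url in links if f"]({url})" not in article]
    if not links:
        return article

    lines = [f"- [{title}]({url})" for title, url in links]
    return article.rstrip() + "\n\n## Related reading\n\n" + "\n".join(lines) + "\n"


context = {
    "hub_title": "Organic Farming Guide",
    "hub_url": "https://example.com/hubs/organic-farming",
}
linked = add_internal_links("Intro text.\n", context)
print(linked)
```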
#### Stage 6: Taxonomy Assignment

```python
from django.db.models import Q


@app.task(bind=True, max_retries=3)
def assign_taxonomy(self, site_id, blueprint_context):
    """
    Stage 6: Assign content to custom WP taxonomies (blueprint mode only).

    If blueprint_aware:
        - Match content to cluster attributes
        - Assign custom taxonomy values
        - Update cluster status
    Else:
        - Skip stage
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=6,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )

        if blueprint_context['pipeline_mode'] != 'blueprint_aware':
            job.log = "Legacy mode: taxonomy assignment skipped"
            job.status = 'completed'
            job.save()
            generate_images.delay(site_id, blueprint_context)
            return {'status': 'success', 'skipped': True}

        blueprint = SAGBlueprint.objects.get(id=blueprint_context['blueprint_id'])
        # Content.taxonomies defaults to {}, so match both NULL and empty values
        content_items = Content.objects.filter(
            Q(taxonomies__isnull=True) | Q(taxonomies={}),
            site=site,
            sag_cluster_id__isnull=False,
        )

        assigned_count = 0
        for content in content_items:
            cluster = SAGCluster.objects.get(id=content.sag_cluster_id)

            # Load taxonomy mapping from blueprint
            tax_mapping = blueprint.wp_taxonomy_mapping.get(str(cluster.id), {})

            # Assign taxonomies
            content.taxonomies = tax_mapping
            content.save()

            # Update cluster status: this cluster now has at least one item.
            # The 'complete' transition (all planned content exists) is not
            # handled here.
            if cluster.status == 'draft':
                cluster.status = 'partial'
                cluster.save()

            assigned_count += 1

        job.log = f"Assigned {assigned_count} content items to taxonomies"
        job.status = 'completed'
        job.save()

        # Chain to Stage 7
        generate_images.delay(site_id, blueprint_context)

        return {'status': 'success', 'assigned': assigned_count}
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)
```

#### Stage 7: Image Generation
```python
@app.task(bind=True, max_retries=3)
def generate_images(self, site_id, blueprint_context):
    """
    Stage 7: Generate featured and in-article images.

    If blueprint_aware:
    - Match image style to content type
    - Use cluster theme/color palette
    Else:
    - Generate default style images
    """
    # Image style per content type; unmapped types fall back to 'supporting'
    style_mapping = {
        'cluster_hub': 'hero',
        'blog_post': 'supporting',
        'product_page': 'ecommerce',
        'term_page': 'category',
        'service_page': 'service',
        'landing_page': 'conversion',
    }

    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site,
            stage=7,
            status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )

        # Only content that has no image yet
        content_items = Content.objects.filter(site=site, image__isnull=True)

        image_count = 0
        for content in content_items:
            if blueprint_context['pipeline_mode'] == 'blueprint_aware' and content.content_type:
                # Match style to content type
                style = style_mapping.get(content.content_type, 'supporting')
            else:
                style = 'supporting'

            # Generate featured image
            featured_image = GenerateImage(content.title, style)
            Image.objects.create(
                content=content,
                url=featured_image['url'],
                alt_text=featured_image['alt_text'],
                style_type=style,
                sag_cluster_id=content.sag_cluster_id,
            )
            image_count += 1

        job.log = f"Generated {image_count} images"
        job.status = 'completed'
        job.save()

        return {'status': 'success', 'images': image_count}
    except Exception as e:
        # Re-raise the Retry exception so Celery reschedules the task
        raise self.retry(exc=e, countdown=60)
```
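
The style decision in Stage 7 is a pure lookup, so it can be pulled out and unit-tested without Celery or the database. The sketch below is one way to do that; `resolve_image_style`, `IMAGE_STYLE_BY_CONTENT_TYPE`, and `DEFAULT_IMAGE_STYLE` are hypothetical names introduced for illustration, while the mapping values are taken from the task above.

```python
# Hypothetical helper; the mapping values mirror the Stage 7 task.
IMAGE_STYLE_BY_CONTENT_TYPE = {
    "cluster_hub": "hero",
    "blog_post": "supporting",
    "product_page": "ecommerce",
    "term_page": "category",
    "service_page": "service",
    "landing_page": "conversion",
}
DEFAULT_IMAGE_STYLE = "supporting"


def resolve_image_style(pipeline_mode, content_type):
    """Return the image style for one content item.

    Legacy mode, a missing content type, and an unmapped content type
    all fall back to the default style.
    """
    if pipeline_mode == "blueprint_aware" and content_type:
        return IMAGE_STYLE_BY_CONTENT_TYPE.get(content_type, DEFAULT_IMAGE_STYLE)
    return DEFAULT_IMAGE_STYLE
```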

---

## 4. IMPLEMENTATION STEPS

### Phase A: Data Model Extensions (Week 1)
1. Add fields to Keyword, Idea, Task, Content, Image models (see Section 3)
2. Create SAGBlueprint, SAGCluster models (reference 01A)
3. Create database migrations
4. Test model relationships and queries

### Phase B: Stage 0 Implementation (Week 1)
1. Implement `check_blueprint` Celery task
2. Add blueprint loading and caching logic
3. Implement `execution_priority` parsing
4. Test with sample blueprints (active and inactive)
5. Add logging and error handling

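
As a rough illustration of the `execution_priority` parsing step, the sketch below flattens a blueprint's phase dictionary into an ordered list of `(phase, content_group)` pairs. It assumes the `phase_N` key convention used in the sample blueprint in Section 6; `ordered_content_groups` is a hypothetical helper name, and skipping keys that do not match the convention is an assumption, not a documented rule.

```python
import re


def ordered_content_groups(execution_priority):
    """Flatten {"phase_N": [groups]} into [(N, group), ...] ordered by phase.

    Phases are sorted numerically (so phase_10 follows phase_2), the order
    of groups inside a phase is preserved, and a group listed in more than
    one phase is kept only in its earliest phase.
    """
    phases = []
    for key, groups in execution_priority.items():
        match = re.fullmatch(r"phase_(\d+)", key)
        if match is None:
            continue  # assumption: keys outside the phase_N convention are ignored
        phases.append((int(match.group(1)), list(groups)))
    phases.sort(key=lambda item: item[0])

    ordered, seen = [], set()
    for phase, groups in phases:
        for group in groups:
            if group not in seen:
                seen.add(group)
                ordered.append((phase, group))
    return ordered
```

Stage 3 could then walk this list to decide which content groups to generate ideas for first.
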
### Phase C: Stage 1–2 Enhancement (Week 2)
1. Implement `_match_keyword_to_cluster` function (embedding-based matching)
2. Extend `process_keywords` task for blueprint mode
3. Modify `cluster_keywords` to skip AutoClusterKeywords when blueprint active
4. Add unmatched keyword flagging and reporting
5. Test with mixed keyword sets

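
A minimal sketch of embedding-based matching, assuming keyword and cluster embeddings are already available as plain lists of floats. How the embeddings are produced is out of scope here, and the `0.75` threshold, the function names, and the `(cluster_id, score)` return shape are illustrative assumptions rather than the actual `_match_keyword_to_cluster` contract.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity of two equal-length vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def match_keyword_to_cluster(keyword_vector, cluster_vectors, threshold=0.75):
    """Return (cluster_id, score) for the most similar cluster.

    cluster_id is None when no cluster reaches the threshold, which is
    the case the pipeline would flag as an unmatched keyword.
    """
    best_id, best_score = None, -1.0
    for cluster_id, vector in cluster_vectors.items():
        score = cosine_similarity(keyword_vector, vector)
        if score > best_score:
            best_id, best_score = cluster_id, score
    if best_id is None or best_score < threshold:
        return None, best_score
    return best_id, best_score
```

Returning the score alongside the id makes it possible to report how close an unmatched keyword came to its nearest cluster.
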
### Phase D: Stage 3 Enhancement (Week 2)
1. Create `sag/ai_functions/content_planning.py` module
2. Implement `GenerateIdeasWithBlueprint` function
3. Add phase-based filtering and prioritization
4. Integrate structure/type/sector enrichment
5. Test idea generation for each content type

### Phase E: Stage 4 Enhancement (Week 3)
1. Extend `create_tasks` task with blueprint_context JSON assembly
2. Add execution_phase assignment
3. Test blueprint_context structure completeness
4. Verify sag_cluster_id linking

### Phase F: Stage 5 Enhancement (Week 3)
1. Create PROMPT_TEMPLATES dictionary with all template keys
2. Implement `_get_prompt_key` function
3. Extend `generate_content` task to use templates
4. Implement `_add_internal_links` post-processing
5. Test content generation for each content_type + structure combination
6. Validate prompt variable injection

### Phase G: Stage 6 Implementation (Week 4)
1. Implement `assign_taxonomy` task
2. Add taxonomy mapping logic from blueprint.wp_taxonomy_mapping
3. Implement cluster status updates
4. Test taxonomy assignment with sample blueprints

### Phase H: Stage 7 Enhancement (Week 4)
1. Extend `generate_images` task for blueprint mode
2. Add style_type mapping by content_type
3. Implement color palette usage from blueprint
4. Test image generation for each content type

### Phase I: Integration & Testing (Week 5)
1. Test full pipeline execution with active blueprint
2. Test full pipeline execution without blueprint (legacy mode)
3. Add integration tests for each stage transition
4. Test error handling and retries
5. Load testing with multiple concurrent sites

### Phase J: Deployment & Monitoring (Week 6)
1. Deploy models and migrations to staging
2. Deploy Celery tasks to staging
3. Validate with staging data
4. Set up pipeline execution monitoring (01G)
5. Deploy to production with feature flag (blueprint mode off by default)


---

## 5. ACCEPTANCE CRITERIA

### Functional Requirements
- **Stage 0**: Blueprint check completes successfully; pipeline mode (blueprint-aware or legacy) determined correctly
- **Stage 1**: Keywords matched to clusters with 85%+ accuracy; unmatched keywords flagged
- **Stage 2**: AutoClusterKeywords skipped when a blueprint is active; clusters pre-loaded from the blueprint
- **Stage 3**: Ideas generated with correct type/structure/sector/cluster assignment
- **Stage 4**: Tasks enriched with complete blueprint_context JSON
- **Stage 5**: Content generated using template-specific prompts; blueprint variables injected
- **Stage 6**: Content assigned to custom taxonomies; cluster status updated
- **Stage 7**: Images generated with the style that matches the content type

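
The 85% threshold for Stage 1 needs a concrete measurement. One possible reading, sketched below, scores the matcher against a hand-labelled sample and counts a keyword as correct when the predicted cluster equals the expected one, including keywords that are expected to stay unmatched (`None`). The function name and this definition of accuracy are assumptions; the spec does not define how accuracy is computed.

```python
def match_accuracy(predicted, expected):
    """Fraction of labelled keywords whose predicted cluster equals the expected one.

    Both arguments map keyword -> cluster id, with None meaning "unmatched".
    A keyword missing from `predicted` is treated as unmatched.
    """
    if not expected:
        raise ValueError("expected must contain at least one labelled keyword")
    correct = sum(
        1 for keyword, cluster_id in expected.items()
        if predicted.get(keyword) == cluster_id
    )
    return correct / len(expected)
```

An acceptance test could then assert `match_accuracy(predicted, expected) >= 0.85` on the labelled sample.
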
### Quality Criteria
- **No breaking changes**: Legacy mode works identically to pre-blueprint pipeline
- **Error handling**: All Celery tasks handle failures gracefully; retry logic functional
- **Performance**: Pipeline completes within baseline timing (per site, per stage)
- **Logging**: All stages log execution details and decisions
- **Data integrity**: sag_cluster_id and blueprint_context consistently populated

### Testing Coverage
- Unit tests: Each function and task (>80% coverage)
- Integration tests: Full pipeline execution with/without blueprint
- Scenario tests:
  - Active blueprint (all phases)
  - Inactive blueprint (legacy mode)
  - Mixed keywords (matched + unmatched)
  - Multiple sites with different blueprints
  - Failed tasks (retry logic)

### Documentation
- Docstrings: All functions documented with inputs/outputs
- README: Setup and execution instructions
- Troubleshooting guide: Common issues and solutions

### Monitoring (01G Health Monitoring)
- Pipeline execution time per stage per site
- Content generation success rate by content_type
- Taxonomy assignment accuracy
- Cluster completion status tracking
- Unmatched keyword trending


---

## 6. CLAUDE CODE INSTRUCTIONS

### Running the Pipeline Locally

#### Prerequisites
```bash
# Install dependencies
pip install -r requirements.txt
pip install "celery[redis]" pytest pytest-django

# Set up local database
python manage.py migrate

# Start Redis (for Celery); runs in the foreground, so use a separate terminal
redis-server
```

#### Initialize Test Data
```bash
# Create sample site and blueprint
python manage.py shell << EOF
from sites.models import Site
from sag.models import SAGBlueprint, SAGCluster

site = Site.objects.create(name="Test Site", domain="test.local")
blueprint = SAGBlueprint.objects.create(
    site=site,
    name="Test Blueprint",
    status="active",
    execution_priority={
        "phase_1": ["category_pages", "top_cluster_hubs"],
        "phase_2": ["remaining_hubs"],
        "phase_3": ["attribute_term_pages"],
        "phase_4": ["additional_blogs"],
    },
    content_plan={},
    wp_taxonomy_mapping={}
)
cluster = SAGCluster.objects.create(
    blueprint=blueprint,
    name="Test Cluster",
    cluster_type="topical",
    sector="Tech",
    keywords=["python", "django"],
    attributes=["web development", "open source"],
    status="draft"
)
print(f"Created site {site.id}, blueprint {blueprint.id}, cluster {cluster.id}")
EOF
```

#### Execute Pipeline Stages
```bash
# Start Celery worker (in separate terminal)
celery -A igny8.celery_app worker --loglevel=info

# Run Stage 0: Blueprint Check (result.get() blocks until Stage 0 finishes)
python manage.py shell << EOF
from celery_app.tasks import check_blueprint
result = check_blueprint.delay(site_id="<site-uuid>")
print(result.get())
EOF

# Run full pipeline
python manage.py shell << EOF
from celery_app.tasks import check_blueprint
from uuid import UUID

site_id = UUID("<site-uuid>")
check_blueprint.delay(site_id)
# Each stage automatically chains to the next
EOF

# Monitor pipeline execution
celery -A igny8.celery_app events
# or view logs: tail -f celery.log
```

### Testing the Pipeline

#### Unit Tests
```bash
pytest content/tests/test_pipeline.py -v
pytest sag/tests/test_blueprint.py -v
pytest celery_app/tests/test_tasks.py -v
```

#### Integration Test
```bash
# Test blueprint-aware mode
pytest content/tests/test_pipeline_integration.py::test_full_blueprint_pipeline -v

# Test legacy mode
pytest content/tests/test_pipeline_integration.py::test_full_legacy_pipeline -v

# Test mixed mode (some sites with blueprint, some without)
pytest content/tests/test_pipeline_integration.py::test_mixed_mode_execution -v
```

#### Manual Test Scenario
```bash
# 1. Create test site and blueprint
python manage.py shell < scripts/setup_test_data.py

# 2. Import sample keywords
python manage.py shell << EOF
from content.models import Keyword
from sites.models import Site
site = Site.objects.get(name="Test Site")
keywords = ["python tutorial", "django rest", "web scraping"]
for kw in keywords:
    Keyword.objects.create(site=site, term=kw, source='csv_import')
EOF

# 3. Run pipeline
celery -A igny8.celery_app worker --loglevel=debug &
python manage.py shell << EOF
from celery_app.tasks import check_blueprint
from sites.models import Site
site = Site.objects.get(name="Test Site")
check_blueprint.delay(site.id)
EOF

# 4. Inspect results
python manage.py shell << EOF
from content.models import Keyword, Idea, Task, Content, Image
from sites.models import Site
site = Site.objects.get(name="Test Site")

print("Keywords:", Keyword.objects.filter(site=site).count())
print("Ideas:", Idea.objects.filter(site=site).count())
print("Tasks:", Task.objects.filter(site=site).count())
print("Content:", Content.objects.filter(site=site).count())
print("Images:", Image.objects.filter(site=site).count())

# Check blueprint context
task = Task.objects.filter(site=site, blueprint_context__isnull=False).first()
if task:
    print("Blueprint context:", task.blueprint_context)
EOF
```

### Debugging Common Issues

#### Blueprint Not Detected
```bash
# Check if blueprint exists and is active
python manage.py shell << EOF
from sag.models import SAGBlueprint
from sites.models import Site
site = Site.objects.get(id="<site-id>")
blueprint = SAGBlueprint.objects.filter(site=site, status='active').first()
print(f"Blueprint: {blueprint}")
if blueprint:
    print(f"Status: {blueprint.status}")
    print(f"Content plan: {blueprint.content_plan}")
EOF
```

#### Keywords Not Matching
```bash
# Check keyword-cluster mapping
python manage.py shell << EOF
from content.models import Keyword
from sag.models import SAGCluster
keywords = Keyword.objects.filter(sag_cluster_id__isnull=True)
print(f"Unmatched keywords: {[kw.term for kw in keywords]}")

# Check available clusters
clusters = SAGCluster.objects.all()
for cluster in clusters:
    print(f"Cluster '{cluster.name}': {cluster.attributes}")
EOF
```

#### Content Not Generated
```bash
# Check task status
python manage.py shell << EOF
from content.models import Task
tasks = Task.objects.all()
for task in tasks:
    print(f"Task {task.id}: status={task.status}, blueprint_context={bool(task.blueprint_context)}")
EOF

# Inspect what the Celery workers are running or holding
celery -A igny8.celery_app inspect active
celery -A igny8.celery_app inspect reserved

# WARNING: destructive. Discards every message waiting in the queue.
celery -A igny8.celery_app purge
```

### Extending with Custom Prompt Templates

#### Add New Template
```python
# In sag/prompt_templates.py

PROMPT_TEMPLATES = {
    'sag_hub_guide': """
You are writing a comprehensive guide for {cluster_name}, a {cluster_type} in the {cluster_sector} sector.

Topic: {cluster_name}
Related terms: {attribute_terms}
Hub page: {hub_url}

Structure: Guide/Tutorial format
- Introduction: What is {cluster_name}?
- Key concepts: {attribute_terms}
- Step-by-step guide
- Common pitfalls
- Conclusion with links to {hub_title}

Write a comprehensive, SEO-optimized guide.
""",

    # Add more templates here...
}

# Usage in generate_content task:
# template = PROMPT_TEMPLATES['sag_hub_guide']
# prompt = template.format(**blueprint_context)
```

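
Because `template.format(**blueprint_context)` raises a bare `KeyError` on the first missing placeholder, it can help to check a template against its context before rendering. The sketch below uses the standard library's `string.Formatter().parse` to list every placeholder a template needs. `missing_template_fields` and `render_prompt` are hypothetical helpers, and the check assumes simple placeholder names such as `{cluster_name}` with no attribute or index access.

```python
from string import Formatter


def missing_template_fields(template, context):
    """Return the sorted placeholder names used in `template` but absent from `context`."""
    required = {
        field_name
        for _, field_name, _, _ in Formatter().parse(template)
        if field_name  # skips literal-only segments (None) and bare "{}" ('')
    }
    return sorted(required - set(context))


def render_prompt(template, context):
    """Format the template, reporting all missing variables in a single error."""
    missing = missing_template_fields(template, context)
    if missing:
        raise KeyError(f"Missing prompt variables: {', '.join(missing)}")
    return template.format(**context)
```

Running `missing_template_fields` over every entry in `PROMPT_TEMPLATES` with a representative `blueprint_context` is one way to cover the "validate prompt variable injection" step from Phase F.
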
### Monitoring Pipeline Health (Integration with 01G)

```bash
# View the ten most recent Stage 5 jobs
python manage.py shell << EOF
from content.models import Job
jobs = Job.objects.filter(stage=5).order_by('-created_at')[:10]
for job in jobs:
    # completed_at is empty for jobs that have not finished
    duration = (job.completed_at - job.created_at).total_seconds() if job.completed_at else None
    duration_label = f"{duration:.0f}s" if duration is not None else "n/a"
    print(f"Stage {job.stage}: {job.status} ({duration_label}) - {job.blueprint_mode}")
EOF

# Check cluster completion status
python manage.py shell << EOF
from sag.models import SAGCluster
clusters = SAGCluster.objects.all()
for cluster in clusters:
    content_count = cluster.content_set.count()
    print(f"Cluster '{cluster.name}': {cluster.status} ({content_count} content items)")
EOF
```

---

## Cross-References

| Document | Reference Purpose |
|----------|-------------------|
| **01A**: SAG Blueprint Model | SAGBlueprint, SAGCluster models used at Stage 0 |
| **01C**: Cluster Formation | Clusters created by SAG framework; used by pipeline |
| **01D**: Setup Wizard | Creates blueprint that drives pipeline execution |
| **01F**: Case 1 Analysis | Produces blueprints that feed this pipeline |
| **01G**: Health Monitoring | Tracks pipeline output per cluster and stage |
| **Content_Types_Writing_Plan.md** | Content type definitions; prompt template structure |

---

## Summary

The Blueprint-Aware Content Pipeline enhances IGNY8's 7-stage automation with SAG framework context at every step. When a site has an active blueprint, content generation becomes strategic: keywords map to clusters, ideas inherit type/structure/sector assignments, prompts draw on cluster context, and generated content is assigned to taxonomies automatically. When no blueprint exists, the pipeline runs in legacy mode, unchanged.

**Key innovation**: Two-mode execution (blueprint-aware + legacy) enables gradual adoption: teams can opt in to blueprint-driven content without disrupting existing sites. **Execution priority phases** ensure foundational content (hubs) publishes before supporting content (blogs), building authority tier by tier.