# 01E: Blueprint-Aware Content Pipeline

**IGNY8 Phase 1: Content Automation with SAG Blueprint Enhancement**

---

## 1. CURRENT STATE

### Existing Pipeline Architecture

IGNY8's content pipeline operates as a 7-stage automated system, orchestrated via Celery with scheduled execution (daily/weekly/monthly via Celery Beat):

| Stage | Function | Automation | Output |
|-------|----------|-----------|--------|
| 1 | Keywords | Import CSV/seed lists | Keyword list per site |
| 2 | Clusters | AutoClusterKeywords (GPT-4) | Semantic keyword groups |
| 3 | Ideas | GenerateIdeas | Content brief queue |
| 4 | Tasks | Queue creation | Writer task list |
| 5 | Content | GenerateContent (AI) | Draft articles |
| 6 | Images | GenerateImages | Featured + in-article images |
| 7 | Review | Editorial queue | Published content |

### Current Limitations

- **Generic clustering**: All keywords are grouped by semantic similarity, with no business-specific structure
- **One-size-fits-all content**: All articles follow the same template regardless of content type
- **No hierarchy**: No distinction between hub pages, blog posts, product pages, term pages, or service pages
- **No priority**: All content is treated equally; foundational content (hubs) may not be written first
- **No taxonomy integration**: Generated content is not automatically assigned to custom taxonomies
- **No blueprint context**: Writers receive keywords but not the strategic framework

### Celery Automation Context

- **Celery Beat**: Manages the recurring schedule (daily, weekly, monthly per site)
- **Task Queue**: Each stage is enqueued as a separate Celery task
- **State Tracking**: Uses the Django ORM to track Job, Stage, Keyword, Cluster, Idea, Task, Content, Image models
- **Failure Handling**: Retry logic, dead-letter queue for failed tasks
- **Logging**: Structured logging to track execution per site per stage

---

## 2. WHAT TO BUILD

### Vision: Blueprint-Driven Pipeline

When a site has an **active SAG Blueprint**, every pipeline stage becomes context-aware:

- Content priorities driven by the blueprint's execution phases
- Content types (hub, blog, product, term, service) determined at ideation
- Prompt templates matched to content structure and type
- Output taxonomy-tagged and cluster-assigned automatically

When **no blueprint exists**, the pipeline reverts to legacy mode, with no breaking changes.

### New/Enhanced Stages

#### Stage 0: Blueprint Check (NEW)

Execute before pipeline stages 1–7.

**Responsibility**: Determine execution mode and load context.

**Logic**:

```text
IF Site.sag_blueprint EXISTS AND sag_blueprint.status == 'active':
    LOAD blueprint
    IDENTIFY unfulfilled content needs from blueprint.content_plan
    DETERMINE execution_phases from blueprint.execution_priority
    SET pipeline_mode = 'blueprint_aware'
ELSE:
    SET pipeline_mode = 'legacy'
    PROCEED to Stage 1 with no blueprint context
```

**Outputs**:

- `pipeline_mode`: 'blueprint_aware' | 'legacy'
- `blueprint_context`: SAGBlueprint instance (if active)
- `execution_phases`: Priority phases for the content queue (from `SAGBlueprint.execution_priority`)

---

#### Stage 1: Keyword Processing (ENHANCED)

**Legacy behavior** (no blueprint): Pass keywords to Stage 2 unchanged.

**Blueprint-aware** (active blueprint):

1. For each new/imported keyword, query the blueprint's SAGClusters
2. Match the keyword to existing clusters based on:
   - Attribute overlap (e.g., keyword "sustainable farming" matches a cluster with attribute "sustainability")
   - Semantic proximity to the cluster topic
   - Sector alignment
3. Assign the matched keyword to the cluster's `keywords` list
4. Flag unmatched keywords:
   - **Gap**: No cluster exists for this topic
   - **Outlier**: Keyword semantic distance > threshold from all clusters
   - **Frontier**: Keyword extends a cluster into a new subtopic (possible new cluster)
5. Update `SAGCluster.keywords`, `SAGCluster.updated_at`

**Outputs**:

- Updated cluster keyword lists
- Gap/outlier report for content strategy review
- Flagged keywords for potential new cluster formation

---

#### Stage 2: AI Cluster Keywords (ENHANCED)

**Legacy behavior** (no blueprint): Run existing `AutoClusterKeywords` via GPT-4 grouping.

**Blueprint-aware** (active blueprint):

1. **SKIP** `AutoClusterKeywords` entirely
2. Clusters are already defined by the SAG framework (Stage 0 loaded the blueprint)
3. For new keywords from Stage 1:
   - Map to existing clusters (already done in Stage 1)
   - Create a mapping record linking keyword → SAGCluster
4. Flag unmatched keywords (from Stage 1) for manual review
5. No new clusters are created (cluster formation is a Phase 1C process, not part of the pipeline)

**Outputs**:

- Keyword-to-cluster mapping
- Unmatched keyword report

---

#### Stage 3: Generate Content Ideas (ENHANCED)

**Legacy behavior** (no blueprint): Run existing `GenerateIdeas` function.

**Blueprint-aware** (active blueprint):

1. Call `sag/ai_functions/content_planning.py::GenerateIdeasWithBlueprint`
2. For each idea generated, enrich with:
   - **Sector**: From SAGCluster.sector
   - **Structure**: From blueprint.content_plan[cluster].structure (e.g., 'guide_tutorial', 'comparison', 'review', 'how_to', 'question')
   - **Type**: From blueprint.content_plan[cluster].type (e.g., 'cluster_hub', 'blog_post', 'product_page', 'term_page', 'service_page')
   - **SAGCluster ID**: Link idea to blueprint cluster
   - **idea_source**: Set to 'sag_blueprint'
3. Respect execution phases:
   - Phase 1: Generate ideas for `category_pages`, `top_cluster_hubs`
   - Phase 2: Generate ideas for `remaining_hubs`, `first_blogs_per_cluster`
   - Phase 3: Generate ideas for `attribute_term_pages`, `product_enrichment`
   - Phase 4: Generate ideas for `additional_blogs`, `brand_comparisons`
4. Prioritize queuing by phase

**Outputs**:

- Idea records with type, structure, sector, cluster assignment
- Execution phase assignments
- Queue prioritized by phase

---

#### Stage 4: Create Writer Tasks (ENHANCED)

**Legacy behavior** (no blueprint): Create basic task with keyword/idea reference.

**Blueprint-aware** (active blueprint):

1. For each idea, create Task with:
   - Standard fields: title, keyword, site, status, assigned_to
   - **New fields**:
     - `sag_cluster_id`: Reference to blueprint cluster
     - `blueprint_context`: JSON blob containing execution context
2. `blueprint_context` structure:

   ```json
   {
     "cluster_id": "uuid",
     "cluster_name": "string",
     "cluster_type": "string (topical|product|service)",
     "cluster_sector": "string",
     "hub_title": "string (cluster's main hub page title)",
     "hub_url": "string (blueprint.site.domain/cluster_slug)",
     "cluster_attributes": ["list of attribute terms"],
     "related_clusters": ["list of related cluster ids"],
     "cluster_products": ["list of product ids if product cluster"],
     "content_structure": "string (guide_tutorial|comparison|review|how_to|question|listicle)",
     "content_type": "string (cluster_hub|blog_post|product_page|term_page|service_page)",
     "execution_phase": "integer (1-4)",
     "seo_strategy": "object (primary_keyword, related_keywords, intent)"
   }
   ```

3. If no blueprint: Create task without blueprint_context (legacy)

**Outputs**:

- Task records with sag_cluster_id and blueprint_context

---

#### Stage 5: Generate Article Content (ENHANCED)

**Legacy behavior** (no blueprint): Run existing `GenerateContent` with generic prompt.

**Blueprint-aware** (has blueprint_context):

1. **Load prompt template** by content_type + content_structure combination:

   | Content Type | Structure | Template Key |
   |---|---|---|
   | Cluster Hub | Guide Tutorial | `sag_hub_guide` |
   | Cluster Hub | Top Listicle | `sag_hub_listicle` |
   | Blog Post | Comparison | `sag_blog_comparison` |
   | Blog Post | Review | `sag_blog_review` |
   | Blog Post | How To | `sag_blog_howto` |
   | Blog Post | Question | `sag_blog_question` |
   | Term Page | Guide Tutorial | `sag_term_page` |
   | Product Page | Review | `sag_product_page` |
   | Service Page | Guide Tutorial | `sag_service_page` |
   | Landing Page | Guide Tutorial | `sag_landing_guide` |
   | Landing Page | Comparison | `sag_landing_comparison` |
   | Business Page | Guide Tutorial | `sag_business_guide` |

2. **Inject blueprint context variables** into prompt template:

   ```
   {cluster_name}      → From SAGCluster.name
   {cluster_type}      → From SAGCluster.cluster_type
   {cluster_sector}    → From SAGCluster.sector
   {hub_title}         → From blueprint_context.hub_title
   {hub_url}           → From blueprint_context.hub_url
   {attribute_terms}   → Comma-separated list from cluster attributes
   {cluster_products}  → Product list if product cluster
   {related_clusters}  → Related cluster names for internal linking
   {content_structure} → Structure type for consistency
   {content_type}      → Content type for tone/depth
   ```

3. Call GPT-4 with enriched prompt template
4. Post-process output:
   - Add internal links to related cluster hubs
   - Add cross-references to attribute term pages
   - Inject CTA appropriate to content type (e.g., product link for product cluster)
5. If no blueprint_context: Run legacy `GenerateContent` unchanged

**Outputs**:

- Content record with body, title, sag_cluster_id, content_type, content_structure

---

#### Stage 6: Taxonomy Assignment (NEW)

Execute after content generation, **only if blueprint exists**.

**Responsibility**: Auto-assign content to custom WP taxonomies derived from blueprint.

**Logic**:

1. Load the site's custom taxonomies from the blueprint (`SAGBlueprint.wp_taxonomy_mapping`, keyed by cluster id)
2. For generated content:
   - Match content to the cluster's attributes and taxonomy terms
   - Assign custom taxonomy values from the blueprint mapping
   - Set `content.sag_cluster_id` (links to blueprint structure)
   - Update cluster status:
     - If first content in cluster: set `SAGCluster.status = 'partial'`
     - If all planned content exists: set `SAGCluster.status = 'complete'`
3. Store taxonomy assignments in the `Content.taxonomies` JSON field

**Outputs**:

- Content records tagged with custom taxonomies
- Cluster status updated to reflect content completion

---

#### Stage 7: Image Generation (ENHANCED)

**Legacy behavior** (no blueprint): Generate generic featured + in-article images.

**Blueprint-aware** (blueprint exists):

1. Match image style to content type:
   - **Hub page**: Hero/authority style (professional, comprehensive)
   - **Blog post**: Supporting/educational (friendly, illustrative)
   - **Product page**: E-commerce standard (product-focused, clean)
   - **Term page**: Category representation (taxonomy icon or concept illustration)
   - **Service page**: Service illustration (professional, trustworthy)
   - **Landing page**: Conversion-focused (compelling, aspirational)
2. Use cluster theme/color palette from blueprint for style consistency
3. Generate alt text leveraging content_structure + cluster context
4. If no blueprint: Generate images with default style

**Outputs**:

- Image records with style type, alt text, sag_cluster_id

---

### Execution Priority (Blueprint-Driven)

Pipeline processes content by `SAGBlueprint.execution_priority` phases:

```python
execution_priority = {
    "phase_1": ["category_pages", "top_cluster_hubs"],
    "phase_2": ["remaining_hubs", "first_blogs_per_cluster"],
    "phase_3": ["attribute_term_pages", "product_enrichment"],
    "phase_4": ["additional_blogs", "brand_comparisons"]
}
```

**Queue behavior**:

- Stage 3 filters ideas by phase
- Stage 4 prioritizes tasks by phase
- Celery task enqueuing respects phase order
- **Rationale**: Foundational content (hubs) is published before supporting content (blogs)

---

## 3. DATA MODELS / APIs

### Related Models (from 01A, 01C, 01D)

```python
# sag/models.py: SAG Blueprint Structure
# Field arguments are abbreviated throughout this section: on_delete,
# max_length on choice fields, and (value, label) choice pairs are omitted.

class SAGBlueprint(models.Model):
    site = ForeignKey(Site)
    name = CharField(max_length=255)
    status = CharField(choices=['draft', 'active', 'archived'])
    created_at = DateTimeField(auto_now_add=True)
    updated_at = DateTimeField(auto_now=True)

    # Phase-based execution plan
    execution_priority = JSONField(default=dict)  # phases 1-4
    content_plan = JSONField()  # cluster_id → content specs

    # Taxonomy mapping to WordPress custom taxonomies
    wp_taxonomy_mapping = JSONField()  # cluster_id → tax values


class SAGCluster(models.Model):
    blueprint = ForeignKey(SAGBlueprint)
    name = CharField(max_length=255)
    cluster_type = CharField(choices=['topical', 'product', 'service'])
    sector = CharField(max_length=255)
    keywords = JSONField(default=list)
    attributes = JSONField(default=list)
    status = CharField(choices=['draft', 'partial', 'complete'])
    updated_at = DateTimeField(auto_now=True)
```

### Pipeline Models (existing)

```python
# content/models.py: Content Pipeline (same abbreviated field notation)

class Keyword(models.Model):
    site = ForeignKey(Site)
    term = CharField(max_length=255)
    source = CharField(choices=['csv_import', 'seed_list', 'user', 'sag_blueprint'])
    sag_cluster_id = UUIDField(null=True, blank=True)  # NEW: links to blueprint cluster
    created_at = DateTimeField(auto_now_add=True)


class Cluster(models.Model):
    site = ForeignKey(Site)
    name = CharField(max_length=255)
    keywords = JSONField(default=list)
    created_by = CharField(choices=['auto_cluster', 'sag_blueprint'])


class Idea(models.Model):
    site = ForeignKey(Site)
    title = CharField(max_length=255)
    keyword = ForeignKey(Keyword)
    cluster = ForeignKey(Cluster, null=True)
    sector = CharField(max_length=255)  # NEW
    structure = CharField(choices=['guide_tutorial', 'comparison', 'review',
                                   'how_to', 'question', 'listicle'])  # NEW
    content_type = CharField(choices=['cluster_hub', 'blog_post', 'product_page',
                                      'term_page', 'service_page', 'landing_page',
                                      'business_page'])  # NEW
    sag_cluster_id = UUIDField(null=True, blank=True)  # NEW
    idea_source = CharField(choices=['auto_generate', 'sag_blueprint'])  # NEW
    execution_phase = IntegerField(null=True)  # NEW: 1-4 from blueprint
    created_at = DateTimeField(auto_now_add=True)


class Task(models.Model):
    site = ForeignKey(Site)
    title = CharField(max_length=255)
    idea = ForeignKey(Idea)
    status = CharField(choices=['pending', 'assigned', 'in_progress', 'review', 'completed'])
    assigned_to = ForeignKey(User, null=True)
    sag_cluster_id = UUIDField(null=True, blank=True)  # NEW
    blueprint_context = JSONField(null=True, blank=True)  # NEW: execution context
    created_at = DateTimeField(auto_now_add=True)


class Content(models.Model):
    site = ForeignKey(Site)
    title = CharField(max_length=255)
    body = TextField()
    task = ForeignKey(Task, null=True)
    content_type = CharField(choices=['cluster_hub', 'blog_post', 'product_page',
                                      'term_page', 'service_page', 'landing_page',
                                      'business_page'])  # NEW
    content_structure = CharField(choices=['guide_tutorial', 'comparison', 'review',
                                           'how_to', 'question', 'listicle'])  # NEW
    sag_cluster_id = UUIDField(null=True, blank=True)  # NEW
    taxonomies = JSONField(default=dict, null=True, blank=True)  # NEW: custom WP taxonomies
    status = CharField(choices=['draft', 'review', 'published'])
    created_at = DateTimeField(auto_now_add=True)


class Image(models.Model):
    content = ForeignKey(Content)
    url = URLField()
    alt_text = CharField(max_length=255)
    style_type = CharField(choices=['hero', 'supporting', 'ecommerce',
                                    'category', 'service', 'conversion'])  # NEW
    sag_cluster_id = UUIDField(null=True, blank=True)  # NEW
    created_at = DateTimeField(auto_now_add=True)


class Job(models.Model):
    """Pipeline execution tracking"""
    site = ForeignKey(Site)
    status = CharField(choices=['pending', 'running', 'completed', 'failed'])
    stage = IntegerField(choices=[(0, 'Blueprint Check'), (1, 'Keywords'), (2, 'Cluster'),
                                  (3, 'Ideas'), (4, 'Tasks'), (5, 'Content'),
                                  (6, 'Taxonomy'), (7, 'Images')])
    blueprint_mode = CharField(choices=['legacy', 'blueprint_aware'])  # NEW
    log = TextField(default='')
    created_at = DateTimeField(auto_now_add=True)
    completed_at = DateTimeField(null=True)
```

### API Endpoints (Celery Task Functions)

#### Stage 0: Blueprint Check

```python
# celery_app/tasks.py
@app.task(bind=True, max_retries=3)
def check_blueprint(self, site_id):
    """
    Stage 0: Determine execution mode and load blueprint context.
    Returns:
        {
            'status': 'success',
            'pipeline_mode': 'blueprint_aware' | 'legacy',
            'blueprint_id': 'uuid' (None in legacy mode),
            'execution_phases': dict of phase -> content groups (None in legacy mode)
        }
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(site=site, stage=0, status='running')

        blueprint = SAGBlueprint.objects.filter(
            site=site, status='active'
        ).first()

        if blueprint:
            result = {
                'status': 'success',
                'pipeline_mode': 'blueprint_aware',
                'blueprint_id': str(blueprint.id),
                'execution_phases': blueprint.execution_priority,
            }
            job.blueprint_mode = 'blueprint_aware'
        else:
            result = {
                'status': 'success',
                'pipeline_mode': 'legacy',
                'blueprint_id': None,
                'execution_phases': None,
            }
            job.blueprint_mode = 'legacy'

        job.status = 'completed'
        job.save()

        # Chain to Stage 1
        process_keywords.delay(site_id, result)
        return result
    except Exception as e:
        # retry() raises Retry; re-raise it so the worker reschedules the task
        raise self.retry(exc=e, countdown=60)
```

#### Stage 1: Keyword Processing

```python
@app.task(bind=True, max_retries=3)
def process_keywords(self, site_id, blueprint_context):
    """
    Stage 1: Process keywords and optionally map to SAGClusters.
    If blueprint_context['pipeline_mode'] == 'blueprint_aware':
        - Map keywords to existing SAGClusters
        - Flag unmatched keywords
    Else:
        - Pass keywords to next stage unchanged
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site, stage=1, status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )

        # Evaluate the queryset once: matched keywords drop out of this filter
        # after save(), so a later .count() would only see the unmatched ones.
        keywords = list(Keyword.objects.filter(site=site, sag_cluster_id__isnull=True))

        if blueprint_context['pipeline_mode'] == 'blueprint_aware':
            blueprint = SAGBlueprint.objects.get(id=blueprint_context['blueprint_id'])
            clusters = SAGCluster.objects.filter(blueprint=blueprint)

            matched_count = 0
            unmatched_keywords = []

            for keyword in keywords:
                # Semantic matching: find best cluster
                cluster = _match_keyword_to_cluster(keyword, clusters)
                if cluster:
                    keyword.sag_cluster_id = cluster.id
                    keyword.save()
                    cluster.keywords.append(keyword.term)
                    cluster.save()
                    matched_count += 1
                else:
                    unmatched_keywords.append(keyword.term)

            job.log = f"Matched {matched_count} keywords. Unmatched: {unmatched_keywords}"
        else:
            job.log = "Legacy mode: keywords passed unchanged"

        job.status = 'completed'
        job.save()

        # Chain to Stage 2
        cluster_keywords.delay(site_id, blueprint_context)
        return {'status': 'success', 'keywords_processed': len(keywords)}
    except Exception as e:
        # retry() raises Retry; re-raise it so the worker reschedules the task
        raise self.retry(exc=e, countdown=60)


def _match_keyword_to_cluster(keyword, clusters):
    """Find best-matching SAGCluster for keyword via embedding similarity."""
    # Uses semantic search (embeddings) to find best cluster match
    # Returns SAGCluster or None
    pass
```

#### Stage 2: AI Cluster Keywords

```python
@app.task(bind=True, max_retries=3)
def cluster_keywords(self, site_id, blueprint_context):
    """
    Stage 2: Cluster keywords.
    If blueprint_aware:
        - SKIP AutoClusterKeywords
        - Use blueprint clusters from Stage 0
    Else:
        - Run AutoClusterKeywords (existing function)
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site, stage=2, status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )

        if blueprint_context['pipeline_mode'] == 'blueprint_aware':
            # Clusters already exist from blueprint
            clusters = SAGCluster.objects.filter(
                blueprint_id=blueprint_context['blueprint_id']
            )
            job.log = f"Using {clusters.count()} blueprint clusters"
        else:
            # Run existing AutoClusterKeywords
            clusters = AutoClusterKeywords(site_id)
            job.log = f"AutoClusterKeywords created {clusters.count()} clusters"

        job.status = 'completed'
        job.save()

        # Chain to Stage 3
        generate_ideas.delay(site_id, blueprint_context)
        return {'status': 'success', 'clusters': clusters.count()}
    except Exception as e:
        # retry() raises Retry; re-raise it so the worker reschedules the task
        raise self.retry(exc=e, countdown=60)
```

#### Stage 3: Generate Content Ideas

```python
@app.task(bind=True, max_retries=3)
def generate_ideas(self, site_id, blueprint_context):
    """
    Stage 3: Generate content ideas.
    If blueprint_aware:
        - Call GenerateIdeasWithBlueprint
        - Enrich ideas with type, structure, sector
        - Respect execution phases
    Else:
        - Call existing GenerateIdeas
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site, stage=3, status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )

        if blueprint_context['pipeline_mode'] == 'blueprint_aware':
            blueprint = SAGBlueprint.objects.get(id=blueprint_context['blueprint_id'])
            ideas = GenerateIdeasWithBlueprint(site, blueprint)
            phase_count = len(blueprint_context['execution_phases'])
            job.log = f"Generated {len(ideas)} blueprint-aware ideas across {phase_count} phases"
        else:
            ideas = GenerateIdeas(site)
            job.log = f"Generated {len(ideas)} legacy ideas"

        job.status = 'completed'
        job.save()

        # Chain to Stage 4
        create_tasks.delay(site_id, blueprint_context)
        return {'status': 'success', 'ideas': len(ideas)}
    except Exception as e:
        # retry() raises Retry; re-raise it so the worker reschedules the task
        raise self.retry(exc=e, countdown=60)
```

#### Stage 4: Create Writer Tasks

```python
@app.task(bind=True, max_retries=3)
def create_tasks(self, site_id, blueprint_context):
    """
    Stage 4: Create writer tasks.
    If blueprint_aware:
        - Enrich task with sag_cluster_id and blueprint_context JSON
        - Respect execution phase priority
    Else:
        - Create basic tasks
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site, stage=4, status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )

        # Order by phase so foundational content is queued first
        ideas = Idea.objects.filter(
            site=site, task__isnull=True
        ).order_by('execution_phase', 'created_at')
        task_count = 0

        for idea in ideas:
            task = Task.objects.create(
                site=site,
                title=idea.title,
                idea=idea,
                status='pending'
            )

            if blueprint_context['pipeline_mode'] == 'blueprint_aware' and idea.sag_cluster_id:
                cluster = SAGCluster.objects.get(id=idea.sag_cluster_id)
                blueprint = cluster.blueprint

                task.sag_cluster_id = idea.sag_cluster_id
                task.blueprint_context = {
                    'cluster_id': str(cluster.id),
                    'cluster_name': cluster.name,
                    'cluster_type': cluster.cluster_type,
                    'cluster_sector': cluster.sector,
                    'hub_title': blueprint.content_plan.get(str(cluster.id), {}).get('hub_title'),
                    'hub_url': f"{site.domain}/hubs/{cluster.name.lower().replace(' ', '-')}",
                    'cluster_attributes': cluster.attributes,
                    'content_structure': idea.structure,
                    'content_type': idea.content_type,
                    'execution_phase': idea.execution_phase,
                }
                task.save()

            task_count += 1

        job.log = f"Created {task_count} tasks"
        job.status = 'completed'
        job.save()

        # Chain to Stage 5
        generate_content.delay(site_id, blueprint_context)
        return {'status': 'success', 'tasks': task_count}
    except Exception as e:
        # retry() raises Retry; re-raise it so the worker reschedules the task
        raise self.retry(exc=e, countdown=60)
```

#### Stage 5: Generate Article Content

```python
@app.task(bind=True, max_retries=3)
def generate_content(self, site_id, blueprint_context):
    """
    Stage 5: Generate article content.
    If task has blueprint_context:
        - Load prompt template by content_type + structure
        - Inject blueprint context variables
        - Call GPT-4 with enriched prompt
        - Post-process for internal links
    Else:
        - Call existing GenerateContent
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site, stage=5, status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )

        # Stage 4 creates tasks with status='pending', so select those that
        # do not have content yet
        tasks = Task.objects.filter(site=site, status='pending', content__isnull=True)
        content_count = 0

        for task in tasks:
            if task.blueprint_context:
                # Blueprint-aware content generation
                prompt_key = _get_prompt_key(
                    task.blueprint_context['content_type'],
                    task.blueprint_context['content_structure']
                )
                template = PROMPT_TEMPLATES.get(prompt_key)

                # Inject variables
                prompt = template.format(**task.blueprint_context)

                # Call GPT-4
                article = gpt4_call(prompt)

                # Post-process
                article = _add_internal_links(article, task.blueprint_context)
            else:
                # Legacy content generation
                article = GenerateContent(task.idea.keyword)

            content = Content.objects.create(
                site=site,
                title=task.title,
                body=article,
                task=task,
                sag_cluster_id=task.sag_cluster_id,
                content_type=(task.blueprint_context.get('content_type')
                              if task.blueprint_context else 'blog_post'),
                content_structure=(task.blueprint_context.get('content_structure')
                                   if task.blueprint_context else None),
            )
            content_count += 1

        job.log = f"Generated {content_count} articles"
        job.status = 'completed'
        job.save()

        # Chain to Stage 6
        assign_taxonomy.delay(site_id, blueprint_context)
        return {'status': 'success', 'content': content_count}
    except Exception as e:
        # retry() raises Retry; re-raise it so the worker reschedules the task
        raise self.retry(exc=e, countdown=60)


def _get_prompt_key(content_type, structure):
    """Map content_type + structure to prompt template key."""
    mapping = {
        ('cluster_hub', 'guide_tutorial'): 'sag_hub_guide',
        ('cluster_hub', 'listicle'): 'sag_hub_listicle',
        ('blog_post', 'comparison'): 'sag_blog_comparison',
        ('blog_post', 'review'): 'sag_blog_review',
        ('blog_post', 'how_to'): 'sag_blog_howto',
        ('blog_post', 'question'):
            'sag_blog_question',
        ('term_page', 'guide_tutorial'): 'sag_term_page',
        ('product_page', 'review'): 'sag_product_page',
        ('service_page', 'guide_tutorial'): 'sag_service_page',
        ('landing_page', 'guide_tutorial'): 'sag_landing_guide',
        ('landing_page', 'comparison'): 'sag_landing_comparison',
        ('business_page', 'guide_tutorial'): 'sag_business_guide',
    }
    return mapping.get((content_type, structure), 'sag_default')


def _add_internal_links(article, blueprint_context):
    """Add internal links to related cluster hubs and attribute term pages."""
    # Parse article, identify linking opportunities
    # Inject markdown links to related content
    pass
```

#### Stage 6: Taxonomy Assignment

```python
from django.db.models import Q


@app.task(bind=True, max_retries=3)
def assign_taxonomy(self, site_id, blueprint_context):
    """
    Stage 6: Assign content to custom WP taxonomies (blueprint mode only).

    If blueprint_aware:
        - Match content to cluster attributes
        - Assign custom taxonomy values
        - Update cluster status
    Else:
        - Skip stage
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site, stage=6, status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )

        if blueprint_context['pipeline_mode'] != 'blueprint_aware':
            job.log = "Legacy mode: taxonomy assignment skipped"
            job.status = 'completed'
            job.save()
            generate_images.delay(site_id, blueprint_context)
            return {'status': 'success', 'skipped': True}

        blueprint = SAGBlueprint.objects.get(id=blueprint_context['blueprint_id'])
        # Content.taxonomies defaults to {}, so match empty dicts as well as NULL
        content_items = Content.objects.filter(
            Q(taxonomies__isnull=True) | Q(taxonomies={}),
            site=site,
            sag_cluster_id__isnull=False,
        )

        assigned_count = 0
        for content in content_items:
            cluster = SAGCluster.objects.get(id=content.sag_cluster_id)

            # Load taxonomy mapping from blueprint
            tax_mapping = blueprint.wp_taxonomy_mapping.get(str(cluster.id), {})

            # Assign taxonomies
            content.taxonomies = tax_mapping
            content.save()

            # Update cluster status
            if Content.objects.filter(sag_cluster_id=cluster.id).exists():
                if cluster.status == 'draft':
                    cluster.status = 'partial'
                    cluster.save()

            assigned_count += 1

        job.log = f"Assigned {assigned_count} content items to taxonomies"
        job.status = 'completed'
        job.save()

        # Chain to Stage 7
        generate_images.delay(site_id, blueprint_context)
        return {'status': 'success', 'assigned': assigned_count}
    except Exception as e:
        # retry() raises Retry; re-raise it so the worker reschedules the task
        raise self.retry(exc=e, countdown=60)
```

#### Stage 7: Image Generation

```python
@app.task(bind=True, max_retries=3)
def generate_images(self, site_id, blueprint_context):
    """
    Stage 7: Generate featured and in-article images.

    If blueprint_aware:
        - Match image style to content type
        - Use cluster theme/color palette
    Else:
        - Generate default style images
    """
    try:
        site = Site.objects.get(id=site_id)
        job = Job.objects.create(
            site=site, stage=7, status='running',
            blueprint_mode=blueprint_context['pipeline_mode']
        )

        content_items = Content.objects.filter(site=site, image__isnull=True)
        image_count = 0

        for content in content_items:
            if blueprint_context['pipeline_mode'] == 'blueprint_aware' and content.content_type:
                # Match style to content type
                style_mapping = {
                    'cluster_hub': 'hero',
                    'blog_post': 'supporting',
                    'product_page': 'ecommerce',
                    'term_page': 'category',
                    'service_page': 'service',
                    'landing_page': 'conversion',
                }
                style = style_mapping.get(content.content_type, 'supporting')
            else:
                style = 'supporting'

            # Generate featured image
            featured_image = GenerateImage(content.title, style)

            image = Image.objects.create(
                content=content,
                url=featured_image['url'],
                alt_text=featured_image['alt_text'],
                style_type=style,
                sag_cluster_id=content.sag_cluster_id,
            )
            image_count += 1

        job.log = f"Generated {image_count} images"
        job.status = 'completed'
        job.save()
        return {'status': 'success', 'images': image_count}
    except Exception as e:
        # retry() raises Retry; re-raise it so the worker reschedules the task
        raise self.retry(exc=e, countdown=60)
```

---

## 4. IMPLEMENTATION STEPS

### Phase A: Data Model Extensions (Week 1)

1. Add fields to Keyword, Idea, Task, Content, Image models (see Section 3)
2. Create SAGBlueprint, SAGCluster models (reference 01A)
3. Create database migrations
4. Test model relationships and queries

### Phase B: Stage 0 Implementation (Week 1)

1. Implement `check_blueprint` Celery task
2. Add blueprint loading and caching logic
3. Create execution_priority parsing
4. Test with sample blueprints (active and inactive)
5. Add logging and error handling

### Phase C: Stage 1–2 Enhancement (Week 2)

1. Implement `_match_keyword_to_cluster` function (embedding-based matching)
2. Extend `process_keywords` task for blueprint mode
3. Modify `cluster_keywords` to skip AutoClusterKeywords when blueprint active
4. Add unmatched keyword flagging and reporting
5. Test with mixed keyword sets

### Phase D: Stage 3 Enhancement (Week 2)

1. Create `sag/ai_functions/content_planning.py` module
2. Implement `GenerateIdeasWithBlueprint` function
3. Add phase-based filtering and prioritization
4. Integrate structure/type/sector enrichment
5. Test idea generation for each content type

### Phase E: Stage 4 Enhancement (Week 3)

1. Extend `create_tasks` task with blueprint_context JSON assembly
2. Add execution_phase assignment
3. Test blueprint_context structure completeness
4. Verify sag_cluster_id linking

### Phase F: Stage 5 Enhancement (Week 3)

1. Create PROMPT_TEMPLATES dictionary with all template keys
2. Implement `_get_prompt_key` function
3. Extend `generate_content` task to use templates
4. Implement `_add_internal_links` post-processing
5. Test content generation for each content_type + structure combination
6. Validate prompt variable injection

### Phase G: Stage 6 Implementation (Week 4)

1. Implement `assign_taxonomy` task
2. Add taxonomy mapping logic from blueprint.wp_taxonomy_mapping
3. Implement cluster status updates
4. Test taxonomy assignment with sample blueprints

### Phase H: Stage 7 Enhancement (Week 4)

1. Extend `generate_images` task for blueprint mode
2. Add style_type mapping by content_type
3. Implement color palette usage from blueprint
4. Test image generation for each content type

### Phase I: Integration & Testing (Week 5)

1. Test full pipeline execution with active blueprint
2. Test full pipeline execution without blueprint (legacy mode)
3. Add integration tests for each stage transition
4. Test error handling and retries
5. Load testing with multiple concurrent sites

### Phase J: Deployment & Monitoring (Week 6)

1. Deploy models and migrations to staging
2. Deploy Celery tasks to staging
3. Validate with staging data
4. Set up pipeline execution monitoring (01G)
5. Deploy to production with feature flag (blueprint mode off by default)

---

## 5. ACCEPTANCE CRITERIA

### Functional Requirements

- **Stage 0**: Blueprint check completes successfully; mode determination accurate
- **Stage 1**: Keywords matched to clusters with 85%+ accuracy; unmatched flagged
- **Stage 2**: `AutoClusterKeywords` skipped when blueprint active; blueprint clusters pre-loaded
- **Stage 3**: Ideas generated with correct type/structure/sector/cluster assignment
- **Stage 4**: Tasks enriched with complete blueprint_context JSON
- **Stage 5**: Content generated using template-specific prompts; blueprint variables injected
- **Stage 6**: Content assigned to custom taxonomies; cluster status updated
- **Stage 7**: Images generated with correct style matching content type

### Quality Criteria

- **No breaking changes**: Legacy mode works identically to pre-blueprint pipeline
- **Error handling**: All Celery tasks handle failures gracefully; retry logic functional
- **Performance**: Pipeline completes within baseline timing (per site, per stage)
- **Logging**: All stages log execution details and decisions
- **Data integrity**: sag_cluster_id and blueprint_context consistently populated

### Testing Coverage

- Unit tests: Each function and task (>80% coverage)
- Integration tests: Full pipeline execution with/without blueprint
- Scenario tests:
  - Active blueprint (all phases)
  - Inactive blueprint (legacy mode)
  - Mixed keywords (matched + unmatched)
  - Multiple sites with different blueprints
  - Failed tasks (retry logic)

### Documentation

- Docstrings: All functions documented with inputs/outputs
- README: Setup and execution instructions
- Troubleshooting guide: Common issues and solutions

### Monitoring (01G Health Monitoring)

- Pipeline execution time per stage per site
- Content generation success rate by content_type
- Taxonomy assignment accuracy
- Cluster completion status tracking
- Unmatched keyword trending

---

## 6. CLAUDE CODE INSTRUCTIONS

### Running the Pipeline Locally

#### Prerequisites

```bash
# Install dependencies (quote the extra so the shell does not glob the brackets)
pip install -r requirements.txt "celery[redis]" pytest pytest-django

# Set up local database
python manage.py migrate

# Start Redis (for Celery); runs in the foreground, so use a separate terminal
redis-server
```

#### Initialize Test Data

```bash
# Create sample site and blueprint
python manage.py shell << EOF
from django.contrib.auth.models import User
from sites.models import Site
from sag.models import SAGBlueprint, SAGCluster

site = Site.objects.create(name="Test Site", domain="test.local")
blueprint = SAGBlueprint.objects.create(
    site=site,
    name="Test Blueprint",
    status="active",
    execution_priority={
        "phase_1": ["category_pages", "top_cluster_hubs"],
        "phase_2": ["remaining_hubs"],
        "phase_3": ["attribute_term_pages"],
        "phase_4": ["additional_blogs"],
    },
    content_plan={},
    wp_taxonomy_mapping={}
)
cluster = SAGCluster.objects.create(
    blueprint=blueprint,
    name="Test Cluster",
    cluster_type="topical",
    sector="Tech",
    keywords=["python", "django"],
    attributes=["web development", "open source"],
    status="draft"
)
print(f"Created site {site.id}, blueprint {blueprint.id}, cluster {cluster.id}")
EOF
```

#### Execute Pipeline Stages

```bash
# Start Celery worker (in separate terminal)
celery -A igny8.celery_app worker --loglevel=info

# Run Stage 0: Blueprint Check
python manage.py shell << EOF
from celery_app.tasks import check_blueprint
result = check_blueprint.delay(site_id="")
print(result.get())
EOF

# Run full pipeline
python manage.py shell
<< EOF from celery_app.tasks import check_blueprint from uuid import UUID site_id = UUID("") check_blueprint.delay(site_id) # Each stage automatically chains to the next EOF # Monitor pipeline execution celery -A igny8.celery_app events # or view logs: tail -f celery.log ``` ### Testing the Pipeline #### Unit Tests ```bash pytest content/tests/test_pipeline.py -v pytest sag/tests/test_blueprint.py -v pytest celery_app/tests/test_tasks.py -v ``` #### Integration Test ```bash pytest content/tests/test_pipeline_integration.py::test_full_blueprint_pipeline -v # Test legacy mode pytest content/tests/test_pipeline_integration.py::test_full_legacy_pipeline -v # Test mixed mode (some sites with blueprint, some without) pytest content/tests/test_pipeline_integration.py::test_mixed_mode_execution -v ``` #### Manual Test Scenario ```bash # 1. Create test site and blueprint python manage.py shell < scripts/setup_test_data.py # 2. Import sample keywords python manage.py shell << EOF from content.models import Keyword from sites.models import Site site = Site.objects.get(name="Test Site") keywords = ["python tutorial", "django rest", "web scraping"] for kw in keywords: Keyword.objects.create(site=site, term=kw, source='csv_import') EOF # 3. Run pipeline celery -A igny8.celery_app worker --loglevel=debug & python manage.py shell << EOF from celery_app.tasks import check_blueprint from sites.models import Site site = Site.objects.get(name="Test Site") check_blueprint.delay(site.id) EOF # 4. 
Inspect results python manage.py shell << EOF from content.models import Keyword, Idea, Task, Content, Image from sites.models import Site site = Site.objects.get(name="Test Site") print("Keywords:", Keyword.objects.filter(site=site).count()) print("Ideas:", Idea.objects.filter(site=site).count()) print("Tasks:", Task.objects.filter(site=site).count()) print("Content:", Content.objects.filter(site=site).count()) print("Images:", Image.objects.filter(site=site).count()) # Check blueprint context task = Task.objects.filter(site=site, blueprint_context__isnull=False).first() if task: print("Blueprint context:", task.blueprint_context) EOF ``` ### Debugging Common Issues #### Blueprint Not Detected ```bash # Check if blueprint exists and is active python manage.py shell << EOF from sag.models import SAGBlueprint from sites.models import Site site = Site.objects.get(id="") blueprint = SAGBlueprint.objects.filter(site=site, status='active').first() print(f"Blueprint: {blueprint}") if blueprint: print(f"Status: {blueprint.status}") print(f"Content plan: {blueprint.content_plan}") EOF ``` #### Keywords Not Matching ```bash # Check keyword-cluster mapping python manage.py shell << EOF from content.models import Keyword from sag.models import SAGCluster keywords = Keyword.objects.filter(sag_cluster_id__isnull=True) print(f"Unmatched keywords: {[kw.term for kw in keywords]}") # Check available clusters clusters = SAGCluster.objects.all() for cluster in clusters: print(f"Cluster '{cluster.name}': {cluster.attributes}") EOF ``` #### Content Not Generated ```bash # Check task status python manage.py shell << EOF from content.models import Task tasks = Task.objects.all() for task in tasks: print(f"Task {task.id}: status={task.status}, blueprint_context={bool(task.blueprint_context)}") EOF # Check Celery task logs celery -A igny8.celery_app inspect active celery -A igny8.celery_app inspect reserved celery -A igny8.celery_app purge # WARNING: clears queue ``` ### Extending with 
Custom Prompt Templates #### Add New Template ```python # In sag/prompt_templates.py PROMPT_TEMPLATES = { 'sag_hub_guide': """ You are writing a comprehensive guide for {cluster_name}, a {cluster_type} in the {cluster_sector} sector. Topic: {cluster_name} Related terms: {attribute_terms} Hub page: {hub_url} Structure: Guide/Tutorial format - Introduction: What is {cluster_name}? - Key concepts: {attribute_terms} - Step-by-step guide - Common pitfalls - Conclusion with links to {hub_title} Write a comprehensive, SEO-optimized guide. """, # Add more templates here... } # Usage in generate_content task: # template = PROMPT_TEMPLATES['sag_hub_guide'] # prompt = template.format(**blueprint_context) ``` ### Monitoring Pipeline Health (Integration with 01G) ```bash # View pipeline execution history python manage.py shell << EOF from content.models import Job jobs = Job.objects.filter(stage=5).order_by('-created_at')[:10] for job in jobs: duration = (job.completed_at - job.created_at).total_seconds() if job.completed_at else None print(f"Stage {job.stage}: {job.status} ({duration}s) - {job.blueprint_mode}") EOF # Check cluster completion status python manage.py shell << EOF from sag.models import SAGCluster clusters = SAGCluster.objects.all() for cluster in clusters: content_count = cluster.content_set.count() print(f"Cluster '{cluster.name}': {cluster.status} ({content_count} content items)") EOF ``` --- ## Cross-References | Document | Reference Purpose | |----------|-------------------| | **01A**: SAG Blueprint Model | SAGBlueprint, SAGCluster models used at Stage 0 | | **01C**: Cluster Formation | Clusters created by SAG framework; used by pipeline | | **01D**: Setup Wizard | Creates blueprint that drives pipeline execution | | **01F**: Case 1 Analysis | Produces blueprints that feed this pipeline | | **01G**: Health Monitoring | Tracks pipeline output per cluster and stage | | **Content_Types_Writing_Plan.md** | Content type definitions; prompt template structure | 
---

## Summary

The Blueprint-Aware Content Pipeline enhances IGNY8's 7-stage automation with SAG framework context at every step. When a site has an active blueprint, content generation becomes strategic: keywords map to clusters, ideas inherit type/structure/sector assignments, prompts draw on cluster context, and output is assigned to taxonomies automatically. When no blueprint exists, the pipeline falls back to legacy mode, unchanged.

**Key innovation**: Two-mode execution (blueprint-aware + legacy) enables gradual adoption: teams can opt in to blueprint-driven content without disrupting existing sites.

**Execution priority phases** ensure foundational content (hubs) is published before supporting content (blogs), building authority tier by tier.
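
The two-mode selection and phase ordering summarized here can be sketched in a few lines of plain Python. This is a minimal illustration under stated assumptions, not the pipeline's actual code: `Blueprint`, `select_pipeline_mode`, and `order_by_phase` are hypothetical names, the dataclass stands in for the real `SAGBlueprint` model, and phase keys are assumed to follow the `phase_N` pattern used in the test-data example.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Blueprint:
    """Hypothetical stand-in for SAGBlueprint; only the fields used below."""
    status: str
    execution_priority: dict[str, list[str]] = field(default_factory=dict)


def select_pipeline_mode(blueprint: Optional[Blueprint]) -> str:
    """Return 'blueprint_aware' only when an active blueprint is present."""
    if blueprint is not None and blueprint.status == "active":
        return "blueprint_aware"
    return "legacy"


def order_by_phase(blueprint: Blueprint, pending: list[str]) -> list[str]:
    """Order pending content categories by the phase that lists them.

    Assumes phase keys look like 'phase_<number>'. Categories that no phase
    names are placed last, keeping their original relative order.
    """
    phases = sorted(
        blueprint.execution_priority,
        key=lambda name: int(name.rsplit("_", 1)[1]),
    )
    rank: dict[str, int] = {}
    for position, phase in enumerate(phases):
        for category in blueprint.execution_priority[phase]:
            rank.setdefault(category, position)
    # sorted() is stable, so ties keep their incoming order
    return sorted(pending, key=lambda category: rank.get(category, len(phases)))


if __name__ == "__main__":
    bp = Blueprint(
        status="active",
        execution_priority={
            "phase_1": ["category_pages", "top_cluster_hubs"],
            "phase_2": ["remaining_hubs"],
            "phase_3": ["attribute_term_pages"],
            "phase_4": ["additional_blogs"],
        },
    )
    print(select_pipeline_mode(bp))
    print(order_by_phase(bp, ["additional_blogs", "remaining_hubs", "category_pages"]))
```

Parsing the numeric suffix rather than sorting the keys as strings keeps `phase_10` after `phase_2` if more phases are ever added.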