Implement Stage 3: Enhance content metadata and validation features

- Added entity metadata fields to the Tasks model, including entity_type, taxonomy, and cluster_role.
- Updated CandidateEngine to prioritize content relevance based on cluster mappings.
- Introduced metadata completeness scoring in ContentAnalyzer.
- Enhanced validation services to check for entity type and mapping completeness.
- Updated frontend components to display and validate new metadata fields.
- Implemented API endpoints for content validation and metadata persistence.
- Migrated existing data to populate new metadata fields for Tasks and Content.
This commit is contained in:
IGNY8 VPS (Salman)
2025-11-19 19:21:30 +00:00
parent 38f6026e73
commit bae9ea47d8
33 changed files with 2388 additions and 73 deletions

Binary file not shown.

View File

@@ -53,6 +53,46 @@ class Tasks(SiteSectorBaseModel):
content_type = models.CharField(max_length=50, choices=CONTENT_TYPE_CHOICES, default='blog_post')
status = models.CharField(max_length=50, choices=STATUS_CHOICES, default='queued')
# Stage 3: Entity metadata fields
ENTITY_TYPE_CHOICES = [
('blog_post', 'Blog Post'),
('article', 'Article'),
('product', 'Product'),
('service', 'Service Page'),
('taxonomy', 'Taxonomy Page'),
('page', 'Page'),
]
CLUSTER_ROLE_CHOICES = [
('hub', 'Hub Page'),
('supporting', 'Supporting Page'),
('attribute', 'Attribute Page'),
]
entity_type = models.CharField(
max_length=50,
choices=ENTITY_TYPE_CHOICES,
default='blog_post',
db_index=True,
blank=True,
null=True,
help_text="Type of content entity (inherited from idea/blueprint)"
)
taxonomy = models.ForeignKey(
'site_building.SiteBlueprintTaxonomy',
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='tasks',
help_text="Taxonomy association when derived from blueprint planning"
)
cluster_role = models.CharField(
max_length=50,
choices=CLUSTER_ROLE_CHOICES,
default='hub',
blank=True,
null=True,
help_text="Role within the cluster-driven sitemap"
)
# Content fields
content = models.TextField(blank=True, null=True) # Generated content
word_count = models.IntegerField(default=0)
@@ -78,6 +118,8 @@ class Tasks(SiteSectorBaseModel):
models.Index(fields=['status']),
models.Index(fields=['cluster']),
models.Index(fields=['content_type']),
models.Index(fields=['entity_type']),
models.Index(fields=['cluster_role']),
models.Index(fields=['site', 'sector']),
]

View File

@@ -0,0 +1,116 @@
"""
Metadata Mapping Service
Stage 3: Persists cluster/taxonomy/attribute mappings from Tasks to Content
"""
import logging
from typing import Optional
from django.db import transaction
from igny8_core.business.content.models import (
Tasks,
Content,
ContentClusterMap,
ContentTaxonomyMap,
ContentAttributeMap,
)
logger = logging.getLogger(__name__)
class MetadataMappingService:
    """Service for persisting metadata mappings from Tasks to Content.

    Both public methods run inside ``transaction.atomic`` so the mapping rows
    and the ``entity_type`` update for one content item are written together.
    """

    @staticmethod
    def _scope_defaults(content: Content) -> dict:
        """Return the account/site/sector values copied onto every new mapping row."""
        return {
            'account': content.account,
            'site': content.site,
            'sector': content.sector,
        }

    @transaction.atomic
    def persist_task_metadata_to_content(self, content: Content) -> None:
        """
        Persist cluster/taxonomy mappings and entity_type from Task to Content.

        Existing mapping rows are left untouched (``get_or_create``), and an
        ``entity_type`` already set on the content is never overwritten.

        Args:
            content: Content instance with an associated task
        """
        task = content.task
        if not task:
            logger.warning(f"Content {content.id} has no associated task, skipping metadata mapping")
            return

        # Stage 3: Persist cluster mapping if task has cluster
        if task.cluster:
            _, created = ContentClusterMap.objects.get_or_create(
                content=content,
                cluster=task.cluster,
                role=task.cluster_role or 'hub',
                defaults={
                    **self._scope_defaults(content),
                    'source': 'blueprint' if task.idea else 'manual',
                    'metadata': {},
                },
            )
            # Only claim "Created" when a row was actually inserted.
            if created:
                logger.info(f"Created cluster mapping for content {content.id} -> cluster {task.cluster.id}")
            else:
                logger.debug(
                    "Cluster mapping already exists for content %s -> cluster %s",
                    content.id,
                    task.cluster.id,
                )

        # Stage 3: Persist taxonomy mapping if task has taxonomy
        if task.taxonomy:
            _, created = ContentTaxonomyMap.objects.get_or_create(
                content=content,
                taxonomy=task.taxonomy,
                defaults={
                    **self._scope_defaults(content),
                    'source': 'blueprint',
                    'metadata': {},
                },
            )
            if created:
                logger.info(f"Created taxonomy mapping for content {content.id} -> taxonomy {task.taxonomy.id}")
            else:
                logger.debug(
                    "Taxonomy mapping already exists for content %s -> taxonomy %s",
                    content.id,
                    task.taxonomy.id,
                )

        # Stage 3: Inherit entity_type from task
        if task.entity_type and not content.entity_type:
            content.entity_type = task.entity_type
            content.save(update_fields=['entity_type'])
            logger.info(f"Set entity_type {task.entity_type} for content {content.id}")

        # Stage 3: Extract attributes from task metadata if available
        # This can be extended to parse task.description or task.metadata for attributes
        # For now, we'll rely on explicit attribute data in future enhancements

    @transaction.atomic
    def backfill_content_metadata(self, content: Content) -> None:
        """
        Backfill metadata mappings for existing content that may be missing mappings.

        Resolution order: skip when any cluster mapping already exists, else
        copy from the associated task, else fall back to a ``cluster_id`` key
        in ``content.metadata``.

        Args:
            content: Content instance to backfill
        """
        # If content already has mappings, skip
        if ContentClusterMap.objects.filter(content=content).exists():
            return

        # Try to infer from task
        if content.task:
            self.persist_task_metadata_to_content(content)
            return

        # Try to infer from content metadata; ignore anything that is not a mapping
        metadata = content.metadata
        if not isinstance(metadata, dict):
            return
        cluster_id = metadata.get('cluster_id')
        if not cluster_id:
            return

        from igny8_core.business.planning.models import Clusters

        try:
            cluster = Clusters.objects.get(id=cluster_id)
        except (Clusters.DoesNotExist, ValueError, TypeError):
            # ValueError/TypeError: the stored cluster_id is not a usable primary key
            logger.warning(f"Cluster {cluster_id} not found for content {content.id}")
            return

        ContentClusterMap.objects.get_or_create(
            content=content,
            cluster=cluster,
            role='hub',  # Default
            defaults={
                **self._scope_defaults(content),
                'source': 'manual',
                'metadata': {},
            },
        )

View File

@@ -0,0 +1,170 @@
"""
Content Validation Service
Stage 3: Validates content metadata before publish
"""
import logging
from typing import List, Dict, Optional
from django.core.exceptions import ValidationError
from igny8_core.business.content.models import Tasks, Content
logger = logging.getLogger(__name__)
class ContentValidationService:
    """Service for validating content metadata requirements.

    Every public method returns a list of error dicts with the keys
    ``field``, ``code`` and ``message``; an empty list means "valid".
    """

    # Entity types that must carry a taxonomy association / mapping.
    TAXONOMY_REQUIRED_ENTITY_TYPES = ('product', 'service')
    # Attribute names every product content item must define (also the report order).
    REQUIRED_PRODUCT_ATTRIBUTES = ('price', 'sku', 'category')
    # Minimum length of the stripped html_content accepted for publishing.
    MIN_PUBLISH_CONTENT_LENGTH = 100

    @staticmethod
    def _error(field: str, code: str, message: str) -> Dict[str, str]:
        """Build one validation error entry."""
        return {'field': field, 'code': code, 'message': message}

    @staticmethod
    def _missing_required(required, existing) -> List[str]:
        """Return the items of *required* absent from *existing*, in *required* order."""
        present = set(existing)
        return [name for name in required if name not in present]

    def validate_task(self, task: Tasks) -> List[Dict[str, str]]:
        """
        Validate a task has required metadata.

        Args:
            task: Task instance to validate

        Returns:
            List of validation errors (empty if valid)
        """
        from django.conf import settings

        errors = []

        # Stage 3: Enforce "no cluster, no task" rule when feature flag enabled
        if getattr(settings, 'USE_SITE_BUILDER_REFACTOR', False) and not task.cluster:
            errors.append(self._error(
                'cluster',
                'missing_cluster',
                'Task must be associated with a cluster before content generation',
            ))

        # Stage 3: Validate entity_type is set
        if not task.entity_type:
            errors.append(self._error(
                'entity_type',
                'missing_entity_type',
                'Task must have an entity type specified',
            ))

        # Stage 3: Validate taxonomy for product/service entities
        if task.entity_type in self.TAXONOMY_REQUIRED_ENTITY_TYPES and not task.taxonomy:
            errors.append(self._error(
                'taxonomy',
                'missing_taxonomy',
                f'{task.entity_type.title()} tasks require a taxonomy association',
            ))

        return errors

    def validate_content(self, content: Content) -> List[Dict[str, str]]:
        """
        Validate content has required metadata before publish.

        Args:
            content: Content instance to validate

        Returns:
            List of validation errors (empty if valid)
        """
        errors = []

        # Stage 3: Validate entity_type
        if not content.entity_type:
            errors.append(self._error(
                'entity_type',
                'missing_entity_type',
                'Content must have an entity type specified',
            ))

        # Stage 3: Validate cluster mapping exists for IGNY8 content
        if content.source == 'igny8':
            from igny8_core.business.content.models import ContentClusterMap
            if not ContentClusterMap.objects.filter(content=content).exists():
                errors.append(self._error(
                    'cluster_mapping',
                    'missing_cluster_mapping',
                    'Content must be mapped to at least one cluster',
                ))

        # Stage 3: Validate taxonomy for product/service content
        if content.entity_type in self.TAXONOMY_REQUIRED_ENTITY_TYPES:
            from igny8_core.business.content.models import ContentTaxonomyMap
            if not ContentTaxonomyMap.objects.filter(content=content).exists():
                errors.append(self._error(
                    'taxonomy_mapping',
                    'missing_taxonomy_mapping',
                    f'{content.entity_type.title()} content requires a taxonomy mapping',
                ))

        # Stage 3: Validate required attributes for products
        if content.entity_type == 'product':
            from igny8_core.business.content.models import ContentAttributeMap
            existing_attrs = ContentAttributeMap.objects.filter(
                content=content,
                name__in=self.REQUIRED_PRODUCT_ATTRIBUTES,
            ).values_list('name', flat=True)
            # Keep the declared order so the message is stable between runs
            # (joining a set produced an arbitrary order).
            missing_attrs = self._missing_required(self.REQUIRED_PRODUCT_ATTRIBUTES, existing_attrs)
            if missing_attrs:
                errors.append(self._error(
                    'attributes',
                    'missing_attributes',
                    f'Product content requires attributes: {", ".join(missing_attrs)}',
                ))

        return errors

    def validate_for_publish(self, content: Content) -> List[Dict[str, str]]:
        """
        Comprehensive validation before publishing content.

        Runs ``validate_content`` first, then checks title and body length.

        Args:
            content: Content instance to validate

        Returns:
            List of validation errors (empty if ready to publish)
        """
        # Basic content validation
        errors = list(self.validate_content(content))

        # Additional publish requirements
        if not content.title:
            errors.append(self._error(
                'title',
                'missing_title',
                'Content must have a title before publishing',
            ))

        html = content.html_content
        if not html or len(html.strip()) < self.MIN_PUBLISH_CONTENT_LENGTH:
            errors.append(self._error(
                'html_content',
                'insufficient_content',
                f'Content must have at least {self.MIN_PUBLISH_CONTENT_LENGTH} characters before publishing',
            ))

        return errors

    def ensure_required_attributes(self, task: Tasks) -> List[Dict[str, str]]:
        """
        Check if task has required attributes based on entity type.

        Only product tasks are checked, and only for a taxonomy association.

        Args:
            task: Task instance to check

        Returns:
            List of missing attribute errors
        """
        errors = []

        if task.entity_type == 'product' and not task.taxonomy:
            errors.append(self._error(
                'taxonomy',
                'missing_taxonomy',
                'Product tasks require a taxonomy (product category)',
            ))

        return errors

View File

@@ -40,6 +40,9 @@ class CandidateEngine:
def _find_relevant_content(self, content: Content) -> List[Content]:
"""Find relevant content from same account/site/sector"""
# Stage 3: Use cluster mappings for better relevance
from igny8_core.business.content.models import ContentClusterMap
# Get content from same account, site, and sector
queryset = Content.objects.filter(
account=content.account,
@@ -48,7 +51,25 @@ class CandidateEngine:
status__in=['draft', 'review', 'publish']
).exclude(id=content.id)
# Filter by keywords if available
# Stage 3: Prioritize content from same cluster
content_clusters = ContentClusterMap.objects.filter(
content=content
).values_list('cluster_id', flat=True)
if content_clusters:
# Find content mapped to same clusters
cluster_content_ids = ContentClusterMap.objects.filter(
cluster_id__in=content_clusters
).exclude(content=content).values_list('content_id', flat=True).distinct()
# Prioritize cluster-matched content
cluster_matched = queryset.filter(id__in=cluster_content_ids)
other_content = queryset.exclude(id__in=cluster_content_ids)
# Combine: cluster-matched first, then others
return list(cluster_matched[:30]) + list(other_content[:20])
# Fallback to keyword-based filtering
if content.primary_keyword:
queryset = queryset.filter(
models.Q(primary_keyword__icontains=content.primary_keyword) |
@@ -59,38 +80,72 @@ class CandidateEngine:
def _score_candidates(self, content: Content, candidates: List[Content]) -> List[Dict]:
"""Score candidates based on relevance"""
from igny8_core.business.content.models import ContentClusterMap, ContentTaxonomyMap
# Stage 3: Get cluster mappings for content
content_clusters = set(
ContentClusterMap.objects.filter(content=content)
.values_list('cluster_id', flat=True)
)
content_taxonomies = set(
ContentTaxonomyMap.objects.filter(content=content)
.values_list('taxonomy_id', flat=True)
)
scored = []
for candidate in candidates:
score = 0
# Keyword overlap (higher weight)
# Stage 3: Cluster matching (highest priority)
candidate_clusters = set(
ContentClusterMap.objects.filter(content=candidate)
.values_list('cluster_id', flat=True)
)
cluster_overlap = content_clusters & candidate_clusters
if cluster_overlap:
score += 50 * len(cluster_overlap) # High weight for cluster matches
# Stage 3: Taxonomy matching
candidate_taxonomies = set(
ContentTaxonomyMap.objects.filter(content=candidate)
.values_list('taxonomy_id', flat=True)
)
taxonomy_overlap = content_taxonomies & candidate_taxonomies
if taxonomy_overlap:
score += 20 * len(taxonomy_overlap)
# Stage 3: Entity type matching
if content.entity_type == candidate.entity_type:
score += 15
# Keyword overlap (medium weight)
if content.primary_keyword and candidate.primary_keyword:
if content.primary_keyword.lower() in candidate.primary_keyword.lower():
score += 30
score += 20
if candidate.primary_keyword.lower() in content.primary_keyword.lower():
score += 30
score += 20
# Secondary keywords overlap
if content.secondary_keywords and candidate.secondary_keywords:
overlap = set(content.secondary_keywords) & set(candidate.secondary_keywords)
score += len(overlap) * 10
score += len(overlap) * 5
# Category overlap
if content.categories and candidate.categories:
overlap = set(content.categories) & set(candidate.categories)
score += len(overlap) * 5
score += len(overlap) * 3
# Tag overlap
if content.tags and candidate.tags:
overlap = set(content.tags) & set(candidate.tags)
score += len(overlap) * 3
score += len(overlap) * 2
# Recency bonus (newer content gets slight boost)
if candidate.generated_at:
days_old = (content.generated_at - candidate.generated_at).days
if days_old < 30:
score += 5
score += 3
if score > 0:
scored.append({
@@ -98,6 +153,8 @@ class CandidateEngine:
'title': candidate.title or candidate.task.title if candidate.task else 'Untitled',
'url': f"/content/{candidate.id}/", # Placeholder - actual URL depends on routing
'relevance_score': score,
'cluster_match': len(cluster_overlap) > 0, # Stage 3: Flag cluster matches
'taxonomy_match': len(taxonomy_overlap) > 0, # Stage 3: Flag taxonomy matches
'anchor_text': self._generate_anchor_text(candidate, content)
})

View File

@@ -35,25 +35,77 @@ class ContentAnalyzer:
readability_score = self._calculate_readability_score(content)
engagement_score = self._calculate_engagement_score(content)
# Overall score is weighted average
# Stage 3: Calculate metadata completeness score
metadata_score = self._calculate_metadata_score(content)
# Overall score is weighted average (includes metadata)
overall_score = (
seo_score * 0.4 +
readability_score * 0.3 +
engagement_score * 0.3
seo_score * 0.35 +
readability_score * 0.25 +
engagement_score * 0.25 +
metadata_score * 0.15
)
return {
'seo_score': round(seo_score, 2),
'readability_score': round(readability_score, 2),
'engagement_score': round(engagement_score, 2),
'metadata_score': round(metadata_score, 2), # Stage 3: Add metadata score
'overall_score': round(overall_score, 2),
'word_count': content.word_count or 0,
'has_meta_title': bool(content.meta_title),
'has_meta_description': bool(content.meta_description),
'has_primary_keyword': bool(content.primary_keyword),
'internal_links_count': len(content.internal_links) if content.internal_links else 0
'internal_links_count': len(content.internal_links) if content.internal_links else 0,
# Stage 3: Metadata completeness indicators
'has_entity_type': bool(content.entity_type),
'has_cluster_mapping': self._has_cluster_mapping(content),
'has_taxonomy_mapping': self._has_taxonomy_mapping(content),
}
def _calculate_metadata_score(self, content: Content) -> float:
    """Stage 3: Calculate metadata completeness score (0-100).

    Points: entity type 20, cluster mapping 30, taxonomy mapping 30
    (15 when absent on anything other than a product/service), and for
    products 20 with three or more attribute rows or 10 with at least one.
    """
    entity_type = content.entity_type

    # Entity type (20 points)
    points = 20 if entity_type else 0

    # Cluster mapping (30 points)
    if self._has_cluster_mapping(content):
        points += 30

    # Taxonomy mapping (30 points); products/services without one earn
    # nothing, every other type gets partial credit.
    if self._has_taxonomy_mapping(content):
        points += 30
    elif entity_type not in ['product', 'service']:
        points += 15

    # Attributes (up to 20 points) - products only
    if entity_type == 'product':
        from igny8_core.business.content.models import ContentAttributeMap

        attribute_rows = ContentAttributeMap.objects.filter(content=content).count()
        if attribute_rows >= 3:
            points += 20
        elif attribute_rows >= 1:
            points += 10

    return min(points, 100)
def _has_cluster_mapping(self, content: Content) -> bool:
    """Stage 3: Report whether any ContentClusterMap row references *content*."""
    from igny8_core.business.content.models import ContentClusterMap

    cluster_maps = ContentClusterMap.objects.filter(content=content)
    return cluster_maps.exists()
def _has_taxonomy_mapping(self, content: Content) -> bool:
    """Stage 3: Report whether any ContentTaxonomyMap row references *content*."""
    from igny8_core.business.content.models import ContentTaxonomyMap

    taxonomy_maps = ContentTaxonomyMap.objects.filter(content=content)
    return taxonomy_maps.exists()
def _calculate_seo_score(self, content: Content) -> float:
"""Calculate SEO score (0-100)"""
score = 0

View File

@@ -225,6 +225,38 @@ class PageGenerationService:
keywords = self._build_keywords_hint(page_blueprint)
# Stage 3: Map page type to entity_type
entity_type_map = {
'home': 'page',
'about': 'page',
'services': 'service',
'products': 'product',
'blog': 'blog_post',
'contact': 'page',
'custom': 'page',
}
entity_type = entity_type_map.get(page_blueprint.type, 'page')
# Stage 3: Try to find related cluster and taxonomy from blueprint
cluster_role = 'hub' # Default
taxonomy = None
# Find cluster link for this blueprint to infer role
from igny8_core.business.site_building.models import SiteBlueprintCluster
cluster_link = SiteBlueprintCluster.objects.filter(
site_blueprint=page_blueprint.site_blueprint
).first()
if cluster_link:
cluster_role = cluster_link.role
# Find taxonomy if page type suggests it (products/services)
if page_blueprint.type in ['products', 'services']:
from igny8_core.business.site_building.models import SiteBlueprintTaxonomy
taxonomy = SiteBlueprintTaxonomy.objects.filter(
site_blueprint=page_blueprint.site_blueprint,
taxonomy_type__in=['product_category', 'service_category']
).first()
task = Tasks.objects.create(
account=page_blueprint.account,
site=page_blueprint.site,
@@ -235,6 +267,10 @@ class PageGenerationService:
content_structure=self._map_content_structure(page_blueprint.type),
content_type='article',
status='queued',
# Stage 3: Set entity metadata
entity_type=entity_type,
taxonomy=taxonomy,
cluster_role=cluster_role,
)
logger.info(

View File

@@ -0,0 +1,22 @@
# Generated migration to fix tenant_id column name
from django.db import migrations
class Migration(migrations.Migration):
    """Rename ``account_id`` back to ``tenant_id`` on the two billing tables."""

    dependencies = [
        ('billing', '0002_rename_tenant_to_account'),
    ]

    # Rename the database column from account_id to tenant_id to match model's db_column.
    # One RunSQL per table; each pair is (forward statement, reverse statement).
    operations = [
        migrations.RunSQL(sql=forward_sql, reverse_sql=backward_sql)
        for forward_sql, backward_sql in (
            (
                "ALTER TABLE igny8_credit_transactions RENAME COLUMN account_id TO tenant_id;",
                "ALTER TABLE igny8_credit_transactions RENAME COLUMN tenant_id TO account_id;",
            ),
            (
                "ALTER TABLE igny8_credit_usage_logs RENAME COLUMN account_id TO tenant_id;",
                "ALTER TABLE igny8_credit_usage_logs RENAME COLUMN tenant_id TO account_id;",
            ),
        )
    ]

View File

@@ -1011,6 +1011,7 @@ class ContentIdeasViewSet(SiteSectorModelViewSet):
created_tasks = []
for idea in ideas:
# Stage 3: Inherit metadata from idea
task = Tasks.objects.create(
title=idea.idea_title,
description=idea.description or '',
@@ -1023,6 +1024,10 @@ class ContentIdeasViewSet(SiteSectorModelViewSet):
account=idea.account,
site=idea.site,
sector=idea.sector,
# Stage 3: Inherit entity metadata
entity_type=idea.site_entity_type or 'blog_post',
taxonomy=idea.taxonomy,
cluster_role=idea.cluster_role or 'hub',
)
created_tasks.append(task.id)
# Update idea status

View File

@@ -218,6 +218,91 @@ class SiteBlueprintViewSet(SiteSectorModelViewSet):
except Exception as e:
return error_response(str(e), status.HTTP_400_BAD_REQUEST, request)
@action(detail=True, methods=['get'], url_path='progress', url_name='progress')
def progress(self, request, pk=None):
    """
    Stage 3: Get cluster-level completion + validation status for site.

    GET /api/v1/site-builder/blueprints/{id}/progress/

    Returns progress summary with cluster coverage, validation flags.
    Task and content figures are scoped to the blueprint's site.
    """
    from django.db.models import Count, Q
    from igny8_core.business.content.models import (
        Tasks,
        Content,
        ContentClusterMap,
    )

    blueprint = self.get_object()

    # Get clusters attached to blueprint; join the cluster row up front so the
    # loop below does not issue one extra query per link.
    blueprint_clusters = blueprint.cluster_links.select_related('cluster')

    # Get tasks and content for this blueprint's site
    tasks = Tasks.objects.filter(site=blueprint.site)
    content = Content.objects.filter(site=blueprint.site)

    # Cluster coverage analysis
    cluster_progress = []
    for cluster_link in blueprint_clusters:
        cluster = cluster_link.cluster

        # Total and per-role task counts in a single aggregate query
        task_counts = tasks.filter(cluster=cluster).aggregate(
            total=Count('id'),
            hub=Count('id', filter=Q(cluster_role='hub')),
            supporting=Count('id', filter=Q(cluster_role='supporting')),
            attribute=Count('id', filter=Q(cluster_role='attribute')),
        )

        cluster_content_ids = ContentClusterMap.objects.filter(
            cluster=cluster
        ).values_list('content_id', flat=True).distinct()
        content_count = content.filter(id__in=cluster_content_ids).count()

        cluster_progress.append({
            'cluster_id': cluster.id,
            'cluster_name': cluster.name,
            'role': cluster_link.role,
            'coverage_status': cluster_link.coverage_status,
            'tasks_count': task_counts['total'],
            'content_count': content_count,
            'hub_pages': task_counts['hub'],
            'supporting_pages': task_counts['supporting'],
            'attribute_pages': task_counts['attribute'],
            'is_complete': cluster_link.coverage_status == 'complete',
        })

    # Overall stats
    total_tasks = tasks.count()
    total_content = content.count()
    tasks_with_cluster = tasks.filter(cluster__isnull=False).count()
    content_with_cluster_map = ContentClusterMap.objects.filter(
        content__site=blueprint.site
    ).values('content').distinct().count()

    return success_response(
        data={
            'blueprint_id': blueprint.id,
            'blueprint_name': blueprint.name,
            'overall_progress': {
                'total_tasks': total_tasks,
                'total_content': total_content,
                'tasks_with_cluster': tasks_with_cluster,
                'content_with_cluster_mapping': content_with_cluster_map,
                'completion_percentage': (
                    (content_with_cluster_map / total_content * 100) if total_content > 0 else 0
                ),
            },
            'cluster_progress': cluster_progress,
            'validation_flags': {
                # One progress entry is appended per cluster link, so a
                # non-empty list means the blueprint has clusters.
                'has_clusters': bool(cluster_progress),
                'has_taxonomies': blueprint.taxonomies.exists(),
                'has_pages': blueprint.pages.exists(),
            }
        },
        request=request
    )
@action(detail=True, methods=['get'], url_path='workflow/context')
def workflow_context(self, request, pk=None):
"""Return aggregated wizard context (steps, clusters, taxonomies, coverage)."""

View File

@@ -0,0 +1,2 @@
# Writer management commands

View File

@@ -0,0 +1,2 @@
# Writer management commands

View File

@@ -0,0 +1,114 @@
"""
Management command to audit site metadata gaps
Stage 3: Summarizes metadata completeness per site
Usage: python manage.py audit_site_metadata [--site {id}] [--detailed]
"""
from django.core.management.base import BaseCommand
from django.db.models import Count, Q
from igny8_core.auth.models import Site
from igny8_core.business.content.models import (
Tasks,
Content,
ContentClusterMap,
ContentTaxonomyMap,
ContentAttributeMap,
)
class Command(BaseCommand):
    """Print a metadata-completeness report for one site or for every site."""

    help = 'Audit metadata completeness for a site (Stage 3)'

    def add_arguments(self, parser):
        """Register the optional ``--site`` filter and the ``--detailed`` flag."""
        parser.add_argument(
            '--site',
            type=int,
            help='Site ID to audit (if not provided, audits all sites)',
        )
        parser.add_argument(
            '--detailed',
            action='store_true',
            help='Show detailed breakdown by entity type',
        )

    def handle(self, *args, **options):
        """Select the sites to audit and write one report per site to stdout."""
        site_id = options.get('site')
        detailed = options.get('detailed', False)

        # Compare against None so an explicit "--site 0" is treated as a
        # (non-matching) filter instead of silently auditing every site.
        if site_id is not None:
            sites = Site.objects.filter(id=site_id)
        else:
            sites = Site.objects.all()

        if not sites.exists():
            # Without --site there is no id to report; avoid "Site None not found".
            if site_id is not None:
                message = f'Site {site_id} not found'
            else:
                message = 'No sites found'
            self.stdout.write(self.style.ERROR(message))
            return

        for site in sites:
            self._audit_site(site, detailed)

    @staticmethod
    def _ratio(count, total):
        """Format ``count/total (N%)`` with an integer percentage, 0% when total is 0."""
        percent = count * 100 // total if total else 0
        return f'{count}/{total} ({percent}%)'

    def _audit_site(self, site, detailed):
        """Write the task, content and gap summaries for a single site."""
        write = self.stdout.write

        write(self.style.SUCCESS(f'\n{"="*80}'))
        write(self.style.SUCCESS(f'Auditing Site: {site.name} (ID: {site.id})'))
        write(self.style.SUCCESS(f'{"="*80}\n'))

        # Tasks audit
        tasks = Tasks.objects.filter(site=site)
        total_tasks = tasks.count()
        tasks_with_cluster = tasks.filter(cluster__isnull=False).count()
        tasks_with_entity_type = tasks.filter(entity_type__isnull=False).count()
        tasks_with_taxonomy = tasks.filter(taxonomy__isnull=False).count()
        tasks_with_cluster_role = tasks.filter(cluster_role__isnull=False).count()

        write(f'\n📋 Tasks Summary:')
        write(f' Total Tasks: {total_tasks}')
        write(f' With Cluster: {self._ratio(tasks_with_cluster, total_tasks)}')
        write(f' With Entity Type: {self._ratio(tasks_with_entity_type, total_tasks)}')
        write(f' With Taxonomy: {self._ratio(tasks_with_taxonomy, total_tasks)}')
        write(f' With Cluster Role: {self._ratio(tasks_with_cluster_role, total_tasks)}')

        # Content audit
        content = Content.objects.filter(site=site)
        total_content = content.count()
        content_with_entity_type = content.filter(entity_type__isnull=False).count()
        content_with_cluster_map = ContentClusterMap.objects.filter(
            content__site=site
        ).values('content').distinct().count()
        content_with_taxonomy_map = ContentTaxonomyMap.objects.filter(
            content__site=site
        ).values('content').distinct().count()
        content_with_attributes = ContentAttributeMap.objects.filter(
            content__site=site
        ).values('content').distinct().count()

        write(f'\n📄 Content Summary:')
        write(f' Total Content: {total_content}')
        write(f' With Entity Type: {self._ratio(content_with_entity_type, total_content)}')
        write(f' With Cluster Mapping: {self._ratio(content_with_cluster_map, total_content)}')
        write(f' With Taxonomy Mapping: {self._ratio(content_with_taxonomy_map, total_content)}')
        write(f' With Attributes: {self._ratio(content_with_attributes, total_content)}')

        # Gap analysis: the complements of the counts gathered above
        tasks_missing_cluster = total_tasks - tasks_with_cluster
        tasks_missing_entity_type = total_tasks - tasks_with_entity_type
        content_missing_cluster_map = total_content - content_with_cluster_map

        write(f'\n⚠️ Gaps:')
        write(f' Tasks missing cluster: {tasks_missing_cluster}')
        write(f' Tasks missing entity_type: {tasks_missing_entity_type}')
        write(f' Content missing cluster mapping: {content_missing_cluster_map}')

        if detailed:
            # Entity type breakdown
            write(f'\n📊 Entity Type Breakdown:')
            entity_types = tasks.values('entity_type').annotate(count=Count('id')).order_by('-count')
            for et in entity_types:
                write(f' {et["entity_type"] or "NULL"}: {et["count"]} tasks')

            # Cluster role breakdown
            write(f'\n🎯 Cluster Role Breakdown:')
            roles = tasks.values('cluster_role').annotate(count=Count('id')).order_by('-count')
            for role in roles:
                write(f' {role["cluster_role"] or "NULL"}: {role["count"]} tasks')

        write('')

View File

@@ -4,17 +4,121 @@ import django.db.models.deletion
def _resolve_scope_ids(content, task):
    """Return (account_id, site_id, sector_id), preferring content values over task values.

    ``tenant_id`` is tried as a fallback attribute name for the account key.
    """
    account_id = (
        getattr(content, 'account_id', None)
        or getattr(content, 'tenant_id', None)
        or getattr(task, 'account_id', None)
        or getattr(task, 'tenant_id', None)
    )
    site_id = getattr(content, 'site_id', None) or getattr(task, 'site_id', None)
    sector_id = getattr(content, 'sector_id', None) or getattr(task, 'sector_id', None)
    return account_id, site_id, sector_id


def backfill_metadata_mappings_stub(apps, schema_editor):
    """
    Stage 3: Backfill metadata mappings for existing Content/Task records.

    This function backfills:
    - entity_type on Tasks from content_type (only if the field exists)
    - entity_type on Content from its task (only if the field exists)
    - ContentClusterMap records from existing Content/Task -> Cluster relationships
    - ContentTaxonomyMap records from existing Task -> Taxonomy associations

    Mappings that already exist are left alone, so re-running is safe. Rows
    for which no account, site or sector id can be resolved are skipped.
    """
    Tasks = apps.get_model('writer', 'Tasks')
    Content = apps.get_model('writer', 'Content')
    ContentClusterMap = apps.get_model('writer', 'ContentClusterMap')
    ContentTaxonomyMap = apps.get_model('writer', 'ContentTaxonomyMap')

    # The Stage 3 fields are added in migration 0013, so at this point in the
    # migration history they may not be part of the model state yet.
    task_fields = [f.name for f in Tasks._meta.get_fields()]
    content_fields = [f.name for f in Content._meta.get_fields()]
    has_entity_type = 'entity_type' in task_fields
    has_cluster_role = 'cluster_role' in task_fields
    has_taxonomy = 'taxonomy' in task_fields

    # Map content_type to entity_type; anything unlisted becomes 'blog_post'
    entity_type_map = {
        'blog_post': 'blog_post',
        'article': 'article',
        'guide': 'article',
        'tutorial': 'article',
    }

    # Backfill Tasks: Set entity_type from content_type if field exists and not set
    tasks_updated = 0
    if has_entity_type:
        for task in Tasks.objects.filter(entity_type__isnull=True):
            task.entity_type = entity_type_map.get(task.content_type, 'blog_post')
            task.save(update_fields=['entity_type'])
            tasks_updated += 1

    # Backfill Content: Set entity_type from task if not set
    content_updated = 0
    if 'entity_type' in content_fields:
        for content in Content.objects.filter(entity_type__isnull=True):
            task = content.task
            if task and has_entity_type and getattr(task, 'entity_type', None):
                content.entity_type = task.entity_type
                content.save(update_fields=['entity_type'])
                content_updated += 1

    # Backfill ContentClusterMap: Create mappings from Task->Cluster relationships
    cluster_maps_created = 0
    for task in Tasks.objects.filter(cluster__isnull=False):
        role = 'hub'  # Default
        if has_cluster_role and getattr(task, 'cluster_role', None):
            role = task.cluster_role

        for content in Content.objects.filter(task=task):
            # Skip content that is already mapped to this cluster
            if ContentClusterMap.objects.filter(content=content, cluster=task.cluster).exists():
                continue

            account_id, site_id, sector_id = _resolve_scope_ids(content, task)
            if not (account_id and site_id and sector_id):
                continue

            ContentClusterMap.objects.create(
                content=content,
                task=task,
                cluster=task.cluster,
                role=role,
                account_id=account_id,
                site_id=site_id,
                sector_id=sector_id,
                source='blueprint' if task.idea else 'manual',
                metadata={},
            )
            cluster_maps_created += 1

    # Backfill ContentTaxonomyMap: Create mappings from Task->Taxonomy relationships
    taxonomy_maps_created = 0
    if has_taxonomy:
        for task in Tasks.objects.filter(taxonomy__isnull=False):
            for content in Content.objects.filter(task=task):
                if ContentTaxonomyMap.objects.filter(content=content, taxonomy=task.taxonomy).exists():
                    continue

                account_id, site_id, sector_id = _resolve_scope_ids(content, task)
                if not (account_id and site_id and sector_id):
                    continue

                ContentTaxonomyMap.objects.create(
                    content=content,
                    task=task,
                    taxonomy=task.taxonomy,
                    account_id=account_id,
                    site_id=site_id,
                    sector_id=sector_id,
                    source='blueprint',
                    metadata={},
                )
                taxonomy_maps_created += 1

    print("Backfill complete:")
    print(f" - Tasks entity_type updated: {tasks_updated}")
    print(f" - Content entity_type updated: {content_updated}")
    print(f" - Cluster mappings created: {cluster_maps_created}")
    print(f" - Taxonomy mappings created: {taxonomy_maps_created}")
def reverse_backfill_metadata_mappings_stub(apps, schema_editor):

View File

@@ -0,0 +1,70 @@
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
    """Add the Stage 3 entity metadata columns to the ``tasks`` model.

    Adds three nullable fields (``entity_type``, ``taxonomy``,
    ``cluster_role``) and two named indexes on ``tasks``.
    """

    # Runs after the writer mapping-table migration; the site_building
    # dependency is required because the ``taxonomy`` FK below targets
    # ``site_building.SiteBlueprintTaxonomy``.
    dependencies = [
        ('writer', '0012_metadata_mapping_tables'),
        ('site_building', '0003_workflow_and_taxonomies'),
    ]
    operations = [
        # entity_type: optional choice field, defaulting to 'blog_post'.
        # NOTE(review): db_index=True already requests an index on this
        # column, and an explicit AddIndex on ['entity_type'] follows below --
        # this looks like a redundant second index; confirm against the
        # model's Meta.indexes before changing either side.
        migrations.AddField(
            model_name='tasks',
            name='entity_type',
            field=models.CharField(
                blank=True,
                choices=[
                    ('blog_post', 'Blog Post'),
                    ('article', 'Article'),
                    ('product', 'Product'),
                    ('service', 'Service Page'),
                    ('taxonomy', 'Taxonomy Page'),
                    ('page', 'Page'),
                ],
                db_index=True,
                default='blog_post',
                help_text='Type of content entity (inherited from idea/blueprint)',
                max_length=50,
                null=True,
            ),
        ),
        # taxonomy: optional FK; SET_NULL keeps the task row when the
        # referenced taxonomy is deleted.
        migrations.AddField(
            model_name='tasks',
            name='taxonomy',
            field=models.ForeignKey(
                blank=True,
                help_text='Taxonomy association when derived from blueprint planning',
                null=True,
                on_delete=django.db.models.deletion.SET_NULL,
                related_name='tasks',
                to='site_building.SiteBlueprintTaxonomy',
            ),
        ),
        # cluster_role: optional choice field, defaulting to 'hub'.
        migrations.AddField(
            model_name='tasks',
            name='cluster_role',
            field=models.CharField(
                blank=True,
                choices=[
                    ('hub', 'Hub Page'),
                    ('supporting', 'Supporting Page'),
                    ('attribute', 'Attribute Page'),
                ],
                default='hub',
                help_text='Role within the cluster-driven sitemap',
                max_length=50,
                null=True,
            ),
        ),
        # Explicitly named single-column indexes for the two choice fields.
        migrations.AddIndex(
            model_name='tasks',
            index=models.Index(fields=['entity_type'], name='writer_tasks_entity_type_idx'),
        ),
        migrations.AddIndex(
            model_name='tasks',
            index=models.Index(fields=['cluster_role'], name='writer_tasks_cluster_role_idx'),
        ),
    ]

View File

@@ -13,6 +13,8 @@ from igny8_core.api.permissions import IsAuthenticatedAndActive, IsViewerOrAbove
from .models import Tasks, Images, Content
from .serializers import TasksSerializer, ImagesSerializer, ContentSerializer
from igny8_core.business.content.services.content_generation_service import ContentGenerationService
from igny8_core.business.content.services.validation_service import ContentValidationService
from igny8_core.business.content.services.metadata_mapping_service import MetadataMappingService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
@@ -668,6 +670,74 @@ class ImagesViewSet(SiteSectorModelViewSet):
request=request
)
@action(detail=True, methods=['get'], url_path='validation', url_name='validation')
def validation(self, request, pk=None):
    """
    Stage 3: Return the validation checklist for a single record.

    GET /api/v1/writer/content/{id}/validation/

    Runs the general validator and the publish validator, then responds
    with both error lists plus flags describing which metadata is present.
    """
    # NOTE(review): the diff hunk header places this method inside
    # ImagesViewSet, yet the route text and the `entity_type` access are
    # content-oriented -- confirm get_object() yields a Content here.
    record = self.get_object()
    checker = ContentValidationService()
    general_errors = checker.validate_content(record)
    blocking_errors = checker.validate_for_publish(record)

    # Presence flags for the Stage 3 metadata attached to the record.
    metadata_summary = {
        'has_entity_type': bool(record.entity_type),
        'entity_type': record.entity_type,
        'has_cluster_mapping': self._has_cluster_mapping(record),
        'has_taxonomy_mapping': self._has_taxonomy_mapping(record),
    }
    payload = {
        'content_id': record.id,
        'is_valid': len(general_errors) == 0,
        'ready_to_publish': len(blocking_errors) == 0,
        'validation_errors': general_errors,
        'publish_errors': blocking_errors,
        'metadata': metadata_summary,
    }
    return success_response(data=payload, request=request)
@action(detail=True, methods=['post'], url_path='validate', url_name='validate')
def validate(self, request, pk=None):
    """
    Stage 3: Re-run the publish validators and return their errors.

    POST /api/v1/writer/content/{id}/validate/

    When the record is linked to a task, the task metadata mappings are
    persisted first; the publish validation runs afterwards.
    """
    # NOTE(review): the diff hunk header places this method inside
    # ImagesViewSet, yet it reads `task` and validates as content --
    # confirm get_object() yields a Content here.
    record = self.get_object()
    checker = ContentValidationService()

    # Persist metadata mappings only when a task is attached.
    if record.task:
        MetadataMappingService().persist_task_metadata_to_content(record)

    publish_errors = checker.validate_for_publish(record)
    payload = {
        'content_id': record.id,
        'is_valid': len(publish_errors) == 0,
        'errors': publish_errors,
    }
    return success_response(data=payload, request=request)
def _has_cluster_mapping(self, content):
    """Return True when any ContentClusterMap row references *content*."""
    # Imported at call time, as in the original implementation.
    from igny8_core.business.content.models import ContentClusterMap

    matching_rows = ContentClusterMap.objects.filter(content=content)
    return matching_rows.exists()
def _has_taxonomy_mapping(self, content):
    """Return True when any ContentTaxonomyMap row references *content*."""
    # Imported at call time, as in the original implementation.
    from igny8_core.business.content.models import ContentTaxonomyMap

    matching_rows = ContentTaxonomyMap.objects.filter(content=content)
    return matching_rows.exists()
@extend_schema_view(
list=extend_schema(tags=['Writer']),
create=extend_schema(tags=['Writer']),
@@ -758,6 +828,74 @@ class ContentViewSet(SiteSectorModelViewSet):
request=request
)
@action(detail=True, methods=['get'], url_path='validation', url_name='validation')
def validation(self, request, pk=None):
    """
    Stage 3: Return the validation checklist for a content record.

    GET /api/v1/writer/content/{id}/validation/

    Runs the general validator and the publish validator, then responds
    with both error lists plus flags describing which metadata is present.
    """
    # NOTE(review): this diff shows the same method added in several
    # ContentViewSet hunks; only the last definition in a class body takes
    # effect -- confirm and deduplicate.
    record = self.get_object()
    checker = ContentValidationService()
    general_errors = checker.validate_content(record)
    blocking_errors = checker.validate_for_publish(record)

    # Presence flags for the Stage 3 metadata attached to the record.
    metadata_summary = {
        'has_entity_type': bool(record.entity_type),
        'entity_type': record.entity_type,
        'has_cluster_mapping': self._has_cluster_mapping(record),
        'has_taxonomy_mapping': self._has_taxonomy_mapping(record),
    }
    payload = {
        'content_id': record.id,
        'is_valid': len(general_errors) == 0,
        'ready_to_publish': len(blocking_errors) == 0,
        'validation_errors': general_errors,
        'publish_errors': blocking_errors,
        'metadata': metadata_summary,
    }
    return success_response(data=payload, request=request)
@action(detail=True, methods=['post'], url_path='validate', url_name='validate')
def validate(self, request, pk=None):
    """
    Stage 3: Re-run the publish validators and return their errors.

    POST /api/v1/writer/content/{id}/validate/

    When the content is linked to a task, the task metadata mappings are
    persisted first; the publish validation runs afterwards.
    """
    # NOTE(review): this diff shows the same method added in several
    # ContentViewSet hunks; only the last definition in a class body takes
    # effect -- confirm and deduplicate.
    record = self.get_object()
    checker = ContentValidationService()

    # Persist metadata mappings only when a task is attached.
    if record.task:
        MetadataMappingService().persist_task_metadata_to_content(record)

    publish_errors = checker.validate_for_publish(record)
    payload = {
        'content_id': record.id,
        'is_valid': len(publish_errors) == 0,
        'errors': publish_errors,
    }
    return success_response(data=payload, request=request)
def _has_cluster_mapping(self, content):
    """Return True when any ContentClusterMap row references *content*."""
    # Imported at call time, as in the original implementation.
    from igny8_core.business.content.models import ContentClusterMap

    matching_rows = ContentClusterMap.objects.filter(content=content)
    return matching_rows.exists()
def _has_taxonomy_mapping(self, content):
    """Return True when any ContentTaxonomyMap row references *content*."""
    # Imported at call time, as in the original implementation.
    from igny8_core.business.content.models import ContentTaxonomyMap

    matching_rows = ContentTaxonomyMap.objects.filter(content=content)
    return matching_rows.exists()
@action(detail=False, methods=['post'], url_path='generate_product', url_name='generate_product')
def generate_product(self, request):
"""
@@ -841,6 +979,74 @@ class ContentViewSet(SiteSectorModelViewSet):
request=request
)
@action(detail=True, methods=['get'], url_path='validation', url_name='validation')
def validation(self, request, pk=None):
    """
    Stage 3: Return the validation checklist for a content record.

    GET /api/v1/writer/content/{id}/validation/

    Runs the general validator and the publish validator, then responds
    with both error lists plus flags describing which metadata is present.
    """
    # NOTE(review): this diff shows the same method added in several
    # ContentViewSet hunks; only the last definition in a class body takes
    # effect -- confirm and deduplicate.
    record = self.get_object()
    checker = ContentValidationService()
    general_errors = checker.validate_content(record)
    blocking_errors = checker.validate_for_publish(record)

    # Presence flags for the Stage 3 metadata attached to the record.
    metadata_summary = {
        'has_entity_type': bool(record.entity_type),
        'entity_type': record.entity_type,
        'has_cluster_mapping': self._has_cluster_mapping(record),
        'has_taxonomy_mapping': self._has_taxonomy_mapping(record),
    }
    payload = {
        'content_id': record.id,
        'is_valid': len(general_errors) == 0,
        'ready_to_publish': len(blocking_errors) == 0,
        'validation_errors': general_errors,
        'publish_errors': blocking_errors,
        'metadata': metadata_summary,
    }
    return success_response(data=payload, request=request)
@action(detail=True, methods=['post'], url_path='validate', url_name='validate')
def validate(self, request, pk=None):
    """
    Stage 3: Re-run the publish validators and return their errors.

    POST /api/v1/writer/content/{id}/validate/

    When the content is linked to a task, the task metadata mappings are
    persisted first; the publish validation runs afterwards.
    """
    # NOTE(review): this diff shows the same method added in several
    # ContentViewSet hunks; only the last definition in a class body takes
    # effect -- confirm and deduplicate.
    record = self.get_object()
    checker = ContentValidationService()

    # Persist metadata mappings only when a task is attached.
    if record.task:
        MetadataMappingService().persist_task_metadata_to_content(record)

    publish_errors = checker.validate_for_publish(record)
    payload = {
        'content_id': record.id,
        'is_valid': len(publish_errors) == 0,
        'errors': publish_errors,
    }
    return success_response(data=payload, request=request)
def _has_cluster_mapping(self, content):
    """Return True when any ContentClusterMap row references *content*."""
    # Imported at call time, as in the original implementation.
    from igny8_core.business.content.models import ContentClusterMap

    matching_rows = ContentClusterMap.objects.filter(content=content)
    return matching_rows.exists()
def _has_taxonomy_mapping(self, content):
    """Return True when any ContentTaxonomyMap row references *content*."""
    # Imported at call time, as in the original implementation.
    from igny8_core.business.content.models import ContentTaxonomyMap

    matching_rows = ContentTaxonomyMap.objects.filter(content=content)
    return matching_rows.exists()
@action(detail=False, methods=['post'], url_path='generate_service', url_name='generate_service')
def generate_service(self, request):
"""
@@ -924,6 +1130,74 @@ class ContentViewSet(SiteSectorModelViewSet):
request=request
)
@action(detail=True, methods=['get'], url_path='validation', url_name='validation')
def validation(self, request, pk=None):
    """
    Stage 3: Return the validation checklist for a content record.

    GET /api/v1/writer/content/{id}/validation/

    Runs the general validator and the publish validator, then responds
    with both error lists plus flags describing which metadata is present.
    """
    # NOTE(review): this diff shows the same method added in several
    # ContentViewSet hunks; only the last definition in a class body takes
    # effect -- confirm and deduplicate.
    record = self.get_object()
    checker = ContentValidationService()
    general_errors = checker.validate_content(record)
    blocking_errors = checker.validate_for_publish(record)

    # Presence flags for the Stage 3 metadata attached to the record.
    metadata_summary = {
        'has_entity_type': bool(record.entity_type),
        'entity_type': record.entity_type,
        'has_cluster_mapping': self._has_cluster_mapping(record),
        'has_taxonomy_mapping': self._has_taxonomy_mapping(record),
    }
    payload = {
        'content_id': record.id,
        'is_valid': len(general_errors) == 0,
        'ready_to_publish': len(blocking_errors) == 0,
        'validation_errors': general_errors,
        'publish_errors': blocking_errors,
        'metadata': metadata_summary,
    }
    return success_response(data=payload, request=request)
@action(detail=True, methods=['post'], url_path='validate', url_name='validate')
def validate(self, request, pk=None):
    """
    Stage 3: Re-run the publish validators and return their errors.

    POST /api/v1/writer/content/{id}/validate/

    When the content is linked to a task, the task metadata mappings are
    persisted first; the publish validation runs afterwards.
    """
    # NOTE(review): this diff shows the same method added in several
    # ContentViewSet hunks; only the last definition in a class body takes
    # effect -- confirm and deduplicate.
    record = self.get_object()
    checker = ContentValidationService()

    # Persist metadata mappings only when a task is attached.
    if record.task:
        MetadataMappingService().persist_task_metadata_to_content(record)

    publish_errors = checker.validate_for_publish(record)
    payload = {
        'content_id': record.id,
        'is_valid': len(publish_errors) == 0,
        'errors': publish_errors,
    }
    return success_response(data=payload, request=request)
def _has_cluster_mapping(self, content):
    """Return True when any ContentClusterMap row references *content*."""
    # Imported at call time, as in the original implementation.
    from igny8_core.business.content.models import ContentClusterMap

    matching_rows = ContentClusterMap.objects.filter(content=content)
    return matching_rows.exists()
def _has_taxonomy_mapping(self, content):
    """Return True when any ContentTaxonomyMap row references *content*."""
    # Imported at call time, as in the original implementation.
    from igny8_core.business.content.models import ContentTaxonomyMap

    matching_rows = ContentTaxonomyMap.objects.filter(content=content)
    return matching_rows.exists()
@action(detail=False, methods=['post'], url_path='generate_taxonomy', url_name='generate_taxonomy')
def generate_taxonomy(self, request):
"""
@@ -1005,4 +1279,72 @@ class ContentViewSet(SiteSectorModelViewSet):
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
@action(detail=True, methods=['get'], url_path='validation', url_name='validation')
def validation(self, request, pk=None):
    """
    Stage 3: Return the validation checklist for a content record.

    GET /api/v1/writer/content/{id}/validation/

    Runs the general validator and the publish validator, then responds
    with both error lists plus flags describing which metadata is present.
    """
    # NOTE(review): this diff shows the same method added in several
    # ContentViewSet hunks; only the last definition in a class body takes
    # effect -- confirm and deduplicate.
    record = self.get_object()
    checker = ContentValidationService()
    general_errors = checker.validate_content(record)
    blocking_errors = checker.validate_for_publish(record)

    # Presence flags for the Stage 3 metadata attached to the record.
    metadata_summary = {
        'has_entity_type': bool(record.entity_type),
        'entity_type': record.entity_type,
        'has_cluster_mapping': self._has_cluster_mapping(record),
        'has_taxonomy_mapping': self._has_taxonomy_mapping(record),
    }
    payload = {
        'content_id': record.id,
        'is_valid': len(general_errors) == 0,
        'ready_to_publish': len(blocking_errors) == 0,
        'validation_errors': general_errors,
        'publish_errors': blocking_errors,
        'metadata': metadata_summary,
    }
    return success_response(data=payload, request=request)
@action(detail=True, methods=['post'], url_path='validate', url_name='validate')
def validate(self, request, pk=None):
    """
    Stage 3: Re-run the publish validators and return their errors.

    POST /api/v1/writer/content/{id}/validate/

    When the content is linked to a task, the task metadata mappings are
    persisted first; the publish validation runs afterwards.
    """
    # NOTE(review): this diff shows the same method added in several
    # ContentViewSet hunks; only the last definition in a class body takes
    # effect -- confirm and deduplicate.
    record = self.get_object()
    checker = ContentValidationService()

    # Persist metadata mappings only when a task is attached.
    if record.task:
        MetadataMappingService().persist_task_metadata_to_content(record)

    publish_errors = checker.validate_for_publish(record)
    payload = {
        'content_id': record.id,
        'is_valid': len(publish_errors) == 0,
        'errors': publish_errors,
    }
    return success_response(data=payload, request=request)
def _has_cluster_mapping(self, content):
    """Return True when any ContentClusterMap row references *content*."""
    # Imported at call time, as in the original implementation.
    from igny8_core.business.content.models import ContentClusterMap

    matching_rows = ContentClusterMap.objects.filter(content=content)
    return matching_rows.exists()
def _has_taxonomy_mapping(self, content):
    """Return True when any ContentTaxonomyMap row references *content*."""
    # Imported at call time, as in the original implementation.
    from igny8_core.business.content.models import ContentTaxonomyMap

    matching_rows = ContentTaxonomyMap.objects.filter(content=content)
    return matching_rows.exists()