feat: Implement WordPress publishing and unpublishing actions

- Added conditional visibility for table actions based on content state (published/draft).
- Introduced `publishContent` and `unpublishContent` API functions for handling WordPress integration.
- Updated `Content` component to manage publish/unpublish actions with appropriate error handling and success notifications.
- Refactored `PostEditor` to remove deprecated SEO fields and consolidate taxonomy management.
- Enhanced `TablePageTemplate` to filter row actions based on visibility conditions.
- Updated backend API to support publishing and unpublishing content with proper status updates and external references.
This commit is contained in:
alorig
2025-11-26 01:24:58 +05:00
parent ba842d8332
commit 53ea0c34ce
13 changed files with 1249 additions and 417 deletions

View File

@@ -1,13 +1,13 @@
"""
Generate Content AI Function
Extracted from modules/writer/tasks.py
STAGE 3: Updated to use final Stage 1 Content schema
"""
import logging
import re
from typing import Dict, List, Any
from django.db import transaction
from igny8_core.ai.base import BaseAIFunction
from igny8_core.modules.writer.models import Tasks, Content as TaskContent
from igny8_core.modules.writer.models import Tasks, Content
from igny8_core.ai.ai_core import AICore
from igny8_core.ai.validators import validate_tasks_exist
from igny8_core.ai.prompts import PromptRegistry
@@ -62,11 +62,10 @@ class GenerateContentFunction(BaseAIFunction):
if account:
queryset = queryset.filter(account=account)
# Preload all relationships to avoid N+1 queries
# Stage 3: Include taxonomy and keyword_objects for metadata
# STAGE 3: Preload relationships - taxonomy_term instead of taxonomy
tasks = list(queryset.select_related(
'account', 'site', 'sector', 'cluster', 'idea', 'taxonomy'
).prefetch_related('keyword_objects'))
'account', 'site', 'sector', 'cluster', 'taxonomy_term'
).prefetch_related('keywords'))
if not tasks:
raise ValueError("No tasks found")
@@ -74,9 +73,8 @@ class GenerateContentFunction(BaseAIFunction):
return tasks
def build_prompt(self, data: Any, account=None) -> str:
"""Build content generation prompt for a single task using registry"""
"""STAGE 3: Build content generation prompt using final Task schema"""
if isinstance(data, list):
# For now, handle single task (will be called per task)
if not data:
raise ValueError("No tasks provided")
task = data[0]
@@ -90,33 +88,9 @@ class GenerateContentFunction(BaseAIFunction):
if task.description:
idea_data += f"Description: {task.description}\n"
# Handle idea description (might be JSON or plain text)
if task.idea and task.idea.description:
description = task.idea.description
try:
import json
parsed_desc = json.loads(description)
if isinstance(parsed_desc, dict):
formatted_desc = "Content Outline:\n\n"
if 'H2' in parsed_desc:
for h2_section in parsed_desc['H2']:
formatted_desc += f"## {h2_section.get('heading', '')}\n"
if 'subsections' in h2_section:
for h3_section in h2_section['subsections']:
formatted_desc += f"### {h3_section.get('subheading', '')}\n"
formatted_desc += f"Content Type: {h3_section.get('content_type', '')}\n"
formatted_desc += f"Details: {h3_section.get('details', '')}\n\n"
description = formatted_desc
except (json.JSONDecodeError, TypeError):
pass # Use as plain text
idea_data += f"Outline: {description}\n"
if task.idea:
idea_data += f"Structure: {task.idea.content_structure or task.content_structure or 'blog_post'}\n"
idea_data += f"Type: {task.idea.content_type or task.content_type or 'blog_post'}\n"
if task.idea.estimated_word_count:
idea_data += f"Estimated Word Count: {task.idea.estimated_word_count}\n"
# Add content type and structure from task
idea_data += f"Content Type: {task.content_type or 'post'}\n"
idea_data += f"Content Structure: {task.content_structure or 'article'}\n"
# Build cluster data string
cluster_data = ''
@@ -124,56 +98,21 @@ class GenerateContentFunction(BaseAIFunction):
cluster_data = f"Cluster Name: {task.cluster.name or ''}\n"
if task.cluster.description:
cluster_data += f"Description: {task.cluster.description}\n"
cluster_data += f"Status: {task.cluster.status or 'active'}\n"
# Stage 3: Build cluster role context
cluster_role_data = ''
if hasattr(task, 'cluster_role') and task.cluster_role:
role_descriptions = {
'hub': 'Hub Page - Main authoritative resource for this topic cluster. Should be comprehensive, overview-focused, and link to supporting content.',
'supporting': 'Supporting Page - Detailed content that supports the hub page. Focus on specific aspects, use cases, or subtopics.',
'attribute': 'Attribute Page - Content focused on specific attributes, features, or specifications. Include detailed comparisons and specifications.',
}
role_desc = role_descriptions.get(task.cluster_role, f'Role: {task.cluster_role}')
cluster_role_data = f"Cluster Role: {role_desc}\n"
# Stage 3: Build taxonomy context
# STAGE 3: Build taxonomy context (from taxonomy_term FK)
taxonomy_data = ''
if hasattr(task, 'taxonomy') and task.taxonomy:
taxonomy_data = f"Taxonomy: {task.taxonomy.name or ''}\n"
if task.taxonomy.taxonomy_type:
taxonomy_data += f"Taxonomy Type: {task.taxonomy.get_taxonomy_type_display() or task.taxonomy.taxonomy_type}\n"
if task.taxonomy.description:
taxonomy_data += f"Description: {task.taxonomy.description}\n"
if task.taxonomy_term:
taxonomy_data = f"Taxonomy: {task.taxonomy_term.name or ''}\n"
if task.taxonomy_term.taxonomy_type:
taxonomy_data += f"Type: {task.taxonomy_term.get_taxonomy_type_display()}\n"
# Stage 3: Build attributes context from keywords
attributes_data = ''
if hasattr(task, 'keyword_objects') and task.keyword_objects.exists():
attribute_list = []
for keyword in task.keyword_objects.all():
if hasattr(keyword, 'attribute_values') and keyword.attribute_values:
if isinstance(keyword.attribute_values, dict):
for attr_name, attr_value in keyword.attribute_values.items():
attribute_list.append(f"{attr_name}: {attr_value}")
elif isinstance(keyword.attribute_values, list):
for attr_item in keyword.attribute_values:
if isinstance(attr_item, dict):
for attr_name, attr_value in attr_item.items():
attribute_list.append(f"{attr_name}: {attr_value}")
else:
attribute_list.append(str(attr_item))
if attribute_list:
attributes_data = "Product/Service Attributes:\n"
attributes_data += "\n".join(f"- {attr}" for attr in attribute_list) + "\n"
# Build keywords string
keywords_data = task.keywords or ''
if not keywords_data and task.idea:
keywords_data = task.idea.target_keywords or ''
# STAGE 3: Build keywords context (from keywords M2M)
keywords_data = ''
if task.keywords.exists():
keyword_list = [kw.keyword for kw in task.keywords.all()]
keywords_data = "Keywords: " + ", ".join(keyword_list) + "\n"
# Get prompt from registry with context
# Stage 3: Include cluster_role, taxonomy, and attributes in context
prompt = PromptRegistry.get_prompt(
function_name='generate_content',
account=account,
@@ -181,9 +120,7 @@ class GenerateContentFunction(BaseAIFunction):
context={
'IDEA': idea_data,
'CLUSTER': cluster_data,
'CLUSTER_ROLE': cluster_role_data,
'TAXONOMY': taxonomy_data,
'ATTRIBUTES': attributes_data,
'KEYWORDS': keywords_data,
}
)
@@ -222,7 +159,10 @@ class GenerateContentFunction(BaseAIFunction):
progress_tracker=None,
step_tracker=None
) -> Dict:
"""Save content to task - handles both JSON and plain text responses"""
"""
STAGE 3: Save content using final Stage 1 Content model schema.
Creates independent Content record (no OneToOne to Task).
"""
if isinstance(original_data, list):
task = original_data[0] if original_data else None
else:
@@ -236,113 +176,50 @@ class GenerateContentFunction(BaseAIFunction):
# JSON response with structured fields
content_html = parsed.get('content', '')
title = parsed.get('title') or task.title
meta_title = parsed.get('meta_title') or title or task.title
meta_description = parsed.get('meta_description', '')
word_count = parsed.get('word_count', 0)
primary_keyword = parsed.get('primary_keyword', '')
secondary_keywords = parsed.get('secondary_keywords', [])
tags = parsed.get('tags', [])
categories = parsed.get('categories', [])
# Content status should always be 'draft' for newly generated content
# Status can only be changed manually to 'review' or 'publish'
content_status = 'draft'
else:
# Plain text response (legacy)
# Plain text response
content_html = str(parsed)
title = task.title
meta_title = task.meta_title or task.title
meta_description = task.meta_description or (task.description or '')[:160] if task.description else ''
word_count = 0
primary_keyword = ''
secondary_keywords = []
tags = []
categories = []
content_status = 'draft'
# Calculate word count if not provided
if not word_count and content_html:
# Calculate word count
word_count = 0
if content_html:
text_for_counting = re.sub(r'<[^>]+>', '', content_html)
word_count = len(text_for_counting.split())
# Ensure related content record exists
content_record, _created = TaskContent.objects.get_or_create(
task=task,
defaults={
'account': task.account,
'site': task.site,
'sector': task.sector,
'html_content': content_html or '',
'word_count': word_count or 0,
'status': 'draft',
},
# STAGE 3: Create independent Content record using final schema
content_record = Content.objects.create(
# Core fields
title=title,
content_html=content_html or '',
cluster=task.cluster,
content_type=task.content_type,
content_structure=task.content_structure,
# Source and status
source='igny8',
status='draft',
# Site/Sector/Account
account=task.account,
site=task.site,
sector=task.sector,
)
# Update content fields
if content_html:
content_record.html_content = content_html
content_record.word_count = word_count or content_record.word_count or 0
content_record.title = title
content_record.meta_title = meta_title
content_record.meta_description = meta_description
content_record.primary_keyword = primary_keyword or ''
if isinstance(secondary_keywords, list):
content_record.secondary_keywords = secondary_keywords
elif secondary_keywords:
content_record.secondary_keywords = [secondary_keywords]
else:
content_record.secondary_keywords = []
if isinstance(tags, list):
content_record.tags = tags
elif tags:
content_record.tags = [tags]
else:
content_record.tags = []
if isinstance(categories, list):
content_record.categories = categories
elif categories:
content_record.categories = [categories]
else:
content_record.categories = []
# Always set status to 'draft' for newly generated content
# Status can only be: draft, review, published (changed manually)
content_record.status = 'draft'
# Merge any extra fields into metadata (non-standard keys)
if isinstance(parsed, dict):
excluded_keys = {
'content',
'title',
'meta_title',
'meta_description',
'primary_keyword',
'secondary_keywords',
'tags',
'categories',
'word_count',
'status',
}
extra_meta = {k: v for k, v in parsed.items() if k not in excluded_keys}
existing_meta = content_record.metadata or {}
existing_meta.update(extra_meta)
content_record.metadata = existing_meta
# Align foreign keys to ensure consistency
content_record.account = task.account
content_record.site = task.site
content_record.sector = task.sector
content_record.task = task
content_record.save()
# Update task status - keep task data intact but mark as completed
# Link taxonomy terms from task if available
if task.taxonomy_term:
content_record.taxonomy_terms.add(task.taxonomy_term)
# Link all keywords from task as taxonomy terms (if they have taxonomy mappings)
# This is optional - keywords are M2M on Task, not directly on Content
# STAGE 3: Update task status to completed
task.status = 'completed'
task.save(update_fields=['status', 'updated_at'])
return {
'count': 1,
'tasks_updated': 1,
'word_count': content_record.word_count,
'content_id': content_record.id,
'task_id': task.id,
'word_count': word_count,
}

View File

@@ -176,7 +176,7 @@ class ContentSyncService:
integration: SiteIntegration
) -> Dict[str, Any]:
"""
Sync content from WordPress to IGNY8.
STAGE 3: Sync content from WordPress to IGNY8 using final Stage 1 schema.
Args:
integration: SiteIntegration instance
@@ -188,31 +188,54 @@ class ContentSyncService:
posts = self._fetch_wordpress_posts(integration)
synced_count = 0
from igny8_core.business.content.models import Content
from igny8_core.business.content.models import Content, ContentTaxonomy
# Get default cluster if available
from igny8_core.business.planning.models import Clusters
default_cluster = Clusters.objects.filter(
site=integration.site,
name__icontains='imported'
).first()
for post in posts:
# Check if content already exists
content, created = Content.objects.get_or_create(
account=integration.account,
site=integration.site,
sector=integration.site.sectors.first() if hasattr(integration.site, 'sectors') else None,
title=post.get('title', ''),
source='wordpress',
defaults={
'html_content': post.get('content', ''),
'status': 'published' if post.get('status') == 'publish' else 'draft',
'metadata': {'wordpress_id': post.get('id')}
}
)
# Map WP post type to content_type
wp_type = post.get('type', 'post')
content_type = self._map_wp_post_type(wp_type)
if not created:
# Check if content already exists by external_id
existing = Content.objects.filter(
site=integration.site,
external_id=str(post.get('id')),
source='wordpress'
).first()
if existing:
# Update existing content
content.html_content = post.get('content', '')
content.status = 'published' if post.get('status') == 'publish' else 'draft'
if not content.metadata:
content.metadata = {}
content.metadata['wordpress_id'] = post.get('id')
content.save()
existing.title = post.get('title', {}).get('rendered', '') or post.get('title', '')
existing.content_html = post.get('content', {}).get('rendered', '') or post.get('content', '')
existing.external_url = post.get('link', '')
existing.status = 'published' if post.get('status') == 'publish' else 'draft'
existing.save()
content = existing
else:
# Create new content
content = Content.objects.create(
account=integration.account,
site=integration.site,
sector=integration.site.sectors.first() if hasattr(integration.site, 'sectors') else None,
title=post.get('title', {}).get('rendered', '') or post.get('title', ''),
content_html=post.get('content', {}).get('rendered', '') or post.get('content', ''),
cluster=default_cluster,
content_type=content_type,
content_structure='article', # Default, can be refined
source='wordpress',
status='published' if post.get('status') == 'publish' else 'draft',
external_id=str(post.get('id')),
external_url=post.get('link', ''),
)
# Sync taxonomies (categories and tags)
self._sync_post_taxonomies(content, post, integration)
synced_count += 1
@@ -678,6 +701,73 @@ class ContentSyncService:
'synced_count': 0
}
def _map_wp_post_type(self, wp_type: str) -> str:
"""
STAGE 3: Map WordPress post type to IGNY8 content_type.
Args:
wp_type: WordPress post type (post, page, product, etc.)
Returns:
str: Mapped content_type
"""
mapping = {
'post': 'post',
'page': 'page',
'product': 'product',
'service': 'service',
# Add more mappings as needed
}
return mapping.get(wp_type, 'post')
def _sync_post_taxonomies(
self,
content,
post: Dict[str, Any],
integration: SiteIntegration
) -> None:
"""
STAGE 3: Sync taxonomies (categories, tags) for a WordPress post.
Args:
content: Content instance
post: WordPress post data
integration: SiteIntegration instance
"""
from igny8_core.business.content.models import ContentTaxonomy
# Sync categories
for cat_id in post.get('categories', []):
taxonomy, _ = ContentTaxonomy.objects.get_or_create(
site=integration.site,
external_id=cat_id,
external_taxonomy='category',
defaults={
'name': f'Category {cat_id}', # Will be updated later
'slug': f'category-{cat_id}',
'taxonomy_type': 'category',
'account': integration.account,
'sector': integration.site.sectors.first() if hasattr(integration.site, 'sectors') else None,
}
)
content.taxonomy_terms.add(taxonomy)
# Sync tags
for tag_id in post.get('tags', []):
taxonomy, _ = ContentTaxonomy.objects.get_or_create(
site=integration.site,
external_id=tag_id,
external_taxonomy='post_tag',
defaults={
'name': f'Tag {tag_id}', # Will be updated later
'slug': f'tag-{tag_id}',
'taxonomy_type': 'tag',
'account': integration.account,
'sector': integration.site.sectors.first() if hasattr(integration.site, 'sectors') else None,
}
)
content.taxonomy_terms.add(taxonomy)
def _sync_from_shopify(
self,
integration: SiteIntegration,

View File

@@ -56,12 +56,12 @@ class WordPressAdapter(BaseAdapter):
if hasattr(content, 'title'):
# Content model instance
title = content.title
# Try different possible attribute names for content
content_html = getattr(content, 'html_content', None) or getattr(content, 'content', None) or ''
# Stage 1 schema: content_html is the primary field
content_html = getattr(content, 'content_html', '') or getattr(content, 'html_content', '') or getattr(content, 'content', '')
elif isinstance(content, dict):
# Dict with content data
title = content.get('title', '')
content_html = content.get('html_content') or content.get('content', '')
content_html = content.get('content_html') or content.get('html_content') or content.get('content', '')
else:
raise ValueError(f"Unsupported content type: {type(content)}")

View File

@@ -1011,23 +1011,39 @@ class ContentIdeasViewSet(SiteSectorModelViewSet):
created_tasks = []
for idea in ideas:
# Stage 3: Inherit metadata from idea
# STAGE 3: Map idea fields to final Task schema
# Map site_entity_type → content_type
content_type = idea.site_entity_type or 'post'
# Map cluster_role → content_structure
# hub → article, supporting → guide, attribute → comparison
role_to_structure = {
'hub': 'article',
'supporting': 'guide',
'attribute': 'comparison',
}
content_structure = role_to_structure.get(idea.cluster_role, 'article')
# Create task with Stage 1 final fields
task = Tasks.objects.create(
title=idea.idea_title,
description=idea.description or '',
keywords=idea.target_keywords or '',
cluster=idea.keyword_cluster,
idea=idea,
content_type=content_type,
content_structure=content_structure,
taxonomy_term=None, # Can be set later if taxonomy is available
status='queued',
account=idea.account,
site=idea.site,
sector=idea.sector,
# Stage 3: Inherit entity metadata (use standardized fields)
entity_type=(idea.site_entity_type or 'post'),
taxonomy=idea.taxonomy,
cluster_role=(idea.cluster_role or 'hub'),
)
# Link keywords from idea to task
if idea.keyword_objects.exists():
task.keywords.set(idea.keyword_objects.all())
created_tasks.append(task.id)
# Update idea status
idea.status = 'scheduled'
idea.save()

View File

@@ -761,22 +761,34 @@ class ContentViewSet(SiteSectorModelViewSet):
@action(detail=True, methods=['post'], url_path='publish', url_name='publish', permission_classes=[IsAuthenticatedAndActive, IsEditorOrAbove])
def publish(self, request, pk=None):
"""
Stage 1: Publish content to WordPress site.
STAGE 3: Publish content to WordPress site.
Prevents duplicate publishing and updates external_id/external_url.
POST /api/v1/writer/content/{id}/publish/
{
"site_id": 1 // WordPress site to publish to
"site_id": 1, // Optional - defaults to content's site
"status": "publish" // Optional - draft or publish
}
"""
import requests
from igny8_core.auth.models import Site
from igny8_core.business.publishing.services.adapters.wordpress_adapter import WordPressAdapter
content = self.get_object()
site_id = request.data.get('site_id')
# STAGE 3: Prevent duplicate publishing
if content.external_id:
return error_response(
error='Content already published. Use WordPress to update or unpublish first.',
status_code=status.HTTP_400_BAD_REQUEST,
request=request,
errors={'external_id': [f'Already published with ID: {content.external_id}']}
)
# Get site (use content's site if not specified)
site_id = request.data.get('site_id') or content.site_id
if not site_id:
return error_response(
error='site_id is required',
error='site_id is required or content must have a site',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
@@ -790,50 +802,40 @@ class ContentViewSet(SiteSectorModelViewSet):
request=request
)
# Build WordPress API payload
wp_payload = {
'title': content.title,
'content': content.content_html,
'status': 'publish',
'meta': {
'_igny8_content_id': str(content.id),
'_igny8_cluster_id': str(content.cluster_id) if content.cluster_id else '',
'_igny8_content_type': content.content_type,
'_igny8_content_structure': content.content_structure,
},
}
# Get WordPress credentials from site metadata
wp_credentials = site.metadata.get('wordpress', {}) if site.metadata else {}
wp_url = wp_credentials.get('url') or site.url
wp_username = wp_credentials.get('username')
wp_app_password = wp_credentials.get('app_password')
# Add taxonomy terms if present
if content.taxonomy_terms.exists():
wp_categories = []
wp_tags = []
for term in content.taxonomy_terms.all():
if term.taxonomy_type == 'category' and term.external_id:
wp_categories.append(int(term.external_id))
elif term.taxonomy_type == 'post_tag' and term.external_id:
wp_tags.append(int(term.external_id))
if wp_categories:
wp_payload['categories'] = wp_categories
if wp_tags:
wp_payload['tags'] = wp_tags
if not wp_username or not wp_app_password:
return error_response(
error='WordPress credentials not configured for this site',
status_code=status.HTTP_400_BAD_REQUEST,
request=request,
errors={'credentials': ['Missing WordPress username or app password in site settings']}
)
# Call WordPress REST API (using site's WP credentials)
try:
# TODO: Get WP credentials from site.metadata or environment
wp_url = site.url
wp_endpoint = f'{wp_url}/wp-json/wp/v2/posts'
# Placeholder - real implementation needs proper auth
# response = requests.post(wp_endpoint, json=wp_payload, auth=(wp_user, wp_password))
# response.raise_for_status()
# wp_post_data = response.json()
# For now, mark as published and return success
# Use WordPress adapter to publish
adapter = WordPressAdapter()
wp_status = request.data.get('status', 'publish') # draft or publish
result = adapter.publish(
content=content,
destination_config={
'site_url': wp_url,
'username': wp_username,
'app_password': wp_app_password,
'status': wp_status,
}
)
if result.get('success'):
# STAGE 3: Update content with external references
content.external_id = result.get('external_id')
content.external_url = result.get('url')
content.status = 'published'
content.external_id = '12345'
content.external_url = f'{wp_url}/?p=12345'
content.save()
content.save(update_fields=['external_id', 'external_url', 'status', 'updated_at'])
return success_response(
data={
@@ -841,18 +843,55 @@ class ContentViewSet(SiteSectorModelViewSet):
'status': content.status,
'external_id': content.external_id,
'external_url': content.external_url,
'message': 'Content published to WordPress (placeholder implementation)',
},
message='Content published successfully',
message='Content published to WordPress successfully',
request=request
)
except Exception as e:
else:
return error_response(
error=f'Failed to publish to WordPress: {str(e)}',
error=f"Failed to publish to WordPress: {result.get('metadata', {}).get('error', 'Unknown error')}",
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
@action(detail=True, methods=['post'], url_path='unpublish', url_name='unpublish', permission_classes=[IsAuthenticatedAndActive, IsEditorOrAbove])
def unpublish(self, request, pk=None):
"""
STAGE 3: Unpublish content - clear external references and revert to draft.
Note: This does NOT delete the WordPress post, only clears the link.
POST /api/v1/writer/content/{id}/unpublish/
"""
content = self.get_object()
if not content.external_id:
return error_response(
error='Content is not published',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
# Store the old values for response
old_external_id = content.external_id
old_external_url = content.external_url
# Clear external references and revert status
content.external_id = None
content.external_url = None
content.status = 'draft'
content.save(update_fields=['external_id', 'external_url', 'status', 'updated_at'])
return success_response(
data={
'content_id': content.id,
'status': content.status,
'was_external_id': old_external_id,
'was_external_url': old_external_url,
},
message='Content unpublished successfully. WordPress post was not deleted.',
request=request
)
@action(detail=False, methods=['post'], url_path='generate_image_prompts', url_name='generate_image_prompts')
def generate_image_prompts(self, request):
"""Generate image prompts for content records - same pattern as other AI functions"""