385
backend/igny8_core/tasks/wordpress_publishing.py
Normal file
385
backend/igny8_core/tasks/wordpress_publishing.py
Normal file
@@ -0,0 +1,385 @@
|
||||
"""
|
||||
IGNY8 Content Publishing Celery Tasks
|
||||
|
||||
Handles automated publishing of content from IGNY8 to WordPress sites.
|
||||
"""
|
||||
from celery import shared_task
|
||||
from django.conf import settings
|
||||
from django.utils import timezone
|
||||
from datetime import timedelta
|
||||
import requests
|
||||
import logging
|
||||
from typing import Dict, List, Any, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@shared_task(bind=True, max_retries=3)
|
||||
def publish_content_to_wordpress(self, content_id: int, site_integration_id: int, task_id: Optional[int] = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Publish a single content item to WordPress
|
||||
|
||||
Args:
|
||||
content_id: IGNY8 content ID
|
||||
site_integration_id: WordPress site integration ID
|
||||
task_id: Optional IGNY8 task ID
|
||||
|
||||
Returns:
|
||||
Dict with success status and details
|
||||
"""
|
||||
try:
|
||||
from igny8_core.models import ContentPost, SiteIntegration
|
||||
|
||||
# Get content and site integration
|
||||
try:
|
||||
content = ContentPost.objects.get(id=content_id)
|
||||
site_integration = SiteIntegration.objects.get(id=site_integration_id)
|
||||
except (ContentPost.DoesNotExist, SiteIntegration.DoesNotExist) as e:
|
||||
logger.error(f"Content or site integration not found: {e}")
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
# Check if content is ready for publishing
|
||||
if content.wordpress_sync_status == 'success':
|
||||
logger.info(f"Content {content_id} already published to WordPress")
|
||||
return {"success": True, "message": "Already published", "wordpress_post_id": content.wordpress_post_id}
|
||||
|
||||
if content.wordpress_sync_status == 'syncing':
|
||||
logger.info(f"Content {content_id} is currently syncing")
|
||||
return {"success": False, "error": "Content is currently syncing"}
|
||||
|
||||
# Update status to syncing
|
||||
content.wordpress_sync_status = 'syncing'
|
||||
content.save(update_fields=['wordpress_sync_status'])
|
||||
|
||||
# Prepare content data for WordPress
|
||||
content_data = {
|
||||
'content_id': content.id,
|
||||
'task_id': task_id,
|
||||
'title': content.title,
|
||||
'content_html': content.content_html or content.content,
|
||||
'excerpt': content.brief or '',
|
||||
'status': 'publish',
|
||||
'author_email': content.author.email if content.author else None,
|
||||
'author_name': content.author.get_full_name() if content.author else None,
|
||||
'published_at': content.published_at.isoformat() if content.published_at else None,
|
||||
'seo_title': getattr(content, 'seo_title', ''),
|
||||
'seo_description': getattr(content, 'seo_description', ''),
|
||||
'featured_image_url': content.featured_image.url if content.featured_image else None,
|
||||
'sectors': [{'id': s.id, 'name': s.name} for s in content.sectors.all()],
|
||||
'clusters': [{'id': c.id, 'name': c.name} for c in content.clusters.all()],
|
||||
'tags': getattr(content, 'tags', []),
|
||||
'focus_keywords': getattr(content, 'focus_keywords', [])
|
||||
}
|
||||
|
||||
# Call WordPress REST API
|
||||
wordpress_url = f"{site_integration.site_url}/wp-json/igny8/v1/publish-content/"
|
||||
headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'X-IGNY8-API-KEY': site_integration.api_key,
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
wordpress_url,
|
||||
json=content_data,
|
||||
headers=headers,
|
||||
timeout=30
|
||||
)
|
||||
|
||||
if response.status_code == 201:
|
||||
# Success
|
||||
wp_data = response.json().get('data', {})
|
||||
content.wordpress_sync_status = 'success'
|
||||
content.wordpress_post_id = wp_data.get('post_id')
|
||||
content.wordpress_post_url = wp_data.get('post_url')
|
||||
content.last_wordpress_sync = timezone.now()
|
||||
content.save(update_fields=[
|
||||
'wordpress_sync_status', 'wordpress_post_id',
|
||||
'wordpress_post_url', 'last_wordpress_sync'
|
||||
])
|
||||
|
||||
logger.info(f"Successfully published content {content_id} to WordPress post {content.wordpress_post_id}")
|
||||
return {
|
||||
"success": True,
|
||||
"wordpress_post_id": content.wordpress_post_id,
|
||||
"wordpress_post_url": content.wordpress_post_url
|
||||
}
|
||||
|
||||
elif response.status_code == 409:
|
||||
# Content already exists
|
||||
wp_data = response.json().get('data', {})
|
||||
content.wordpress_sync_status = 'success'
|
||||
content.wordpress_post_id = wp_data.get('post_id')
|
||||
content.last_wordpress_sync = timezone.now()
|
||||
content.save(update_fields=[
|
||||
'wordpress_sync_status', 'wordpress_post_id', 'last_wordpress_sync'
|
||||
])
|
||||
|
||||
logger.info(f"Content {content_id} already exists on WordPress")
|
||||
return {"success": True, "message": "Content already exists", "wordpress_post_id": content.wordpress_post_id}
|
||||
|
||||
else:
|
||||
# Error
|
||||
error_msg = f"WordPress API error: {response.status_code} - {response.text}"
|
||||
logger.error(error_msg)
|
||||
|
||||
# Retry logic
|
||||
if self.request.retries < self.max_retries:
|
||||
content.wordpress_sync_attempts = (content.wordpress_sync_attempts or 0) + 1
|
||||
content.save(update_fields=['wordpress_sync_attempts'])
|
||||
|
||||
# Exponential backoff: 1min, 5min, 15min
|
||||
countdown = 60 * (5 ** self.request.retries)
|
||||
raise self.retry(countdown=countdown, exc=Exception(error_msg))
|
||||
else:
|
||||
# Max retries reached
|
||||
content.wordpress_sync_status = 'failed'
|
||||
content.last_wordpress_sync = timezone.now()
|
||||
content.save(update_fields=['wordpress_sync_status', 'last_wordpress_sync'])
|
||||
|
||||
return {"success": False, "error": error_msg}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error publishing content {content_id}: {str(e)}")
|
||||
|
||||
# Update content status on error
|
||||
try:
|
||||
content = ContentPost.objects.get(id=content_id)
|
||||
content.wordpress_sync_status = 'failed'
|
||||
content.last_wordpress_sync = timezone.now()
|
||||
content.save(update_fields=['wordpress_sync_status', 'last_wordpress_sync'])
|
||||
except:
|
||||
pass
|
||||
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
|
||||
@shared_task
|
||||
def process_pending_wordpress_publications() -> Dict[str, Any]:
|
||||
"""
|
||||
Process all content items pending WordPress publication
|
||||
Runs every 5 minutes
|
||||
"""
|
||||
try:
|
||||
from igny8_core.models import ContentPost, SiteIntegration
|
||||
|
||||
# Find content marked for WordPress publishing
|
||||
pending_content = ContentPost.objects.filter(
|
||||
wordpress_sync_status='pending',
|
||||
published_at__isnull=False # Only published content
|
||||
).select_related('author').prefetch_related('sectors', 'clusters')
|
||||
|
||||
if not pending_content.exists():
|
||||
logger.info("No content pending WordPress publication")
|
||||
return {"success": True, "processed": 0}
|
||||
|
||||
# Get active WordPress integrations
|
||||
active_integrations = SiteIntegration.objects.filter(
|
||||
platform='wordpress',
|
||||
is_active=True,
|
||||
api_key__isnull=False
|
||||
)
|
||||
|
||||
if not active_integrations.exists():
|
||||
logger.warning("No active WordPress integrations found")
|
||||
return {"success": False, "error": "No active WordPress integrations"}
|
||||
|
||||
processed = 0
|
||||
failed = 0
|
||||
|
||||
for content in pending_content[:50]: # Process max 50 at a time
|
||||
for integration in active_integrations:
|
||||
# Get task_id if content is associated with a task
|
||||
task_id = None
|
||||
if hasattr(content, 'writer_task'):
|
||||
task_id = content.writer_task.id
|
||||
|
||||
# Queue individual publish task
|
||||
publish_content_to_wordpress.delay(
|
||||
content.id,
|
||||
integration.id,
|
||||
task_id
|
||||
)
|
||||
processed += 1
|
||||
|
||||
logger.info(f"Queued {processed} content items for WordPress publication")
|
||||
return {"success": True, "processed": processed, "failed": failed}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing pending WordPress publications: {str(e)}")
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
|
||||
@shared_task
|
||||
def bulk_publish_content_to_wordpress(content_ids: List[int], site_integration_id: int) -> Dict[str, Any]:
|
||||
"""
|
||||
Bulk publish multiple content items to WordPress
|
||||
Used for manual bulk operations from Content Manager
|
||||
"""
|
||||
try:
|
||||
from igny8_core.models import ContentPost, SiteIntegration
|
||||
|
||||
site_integration = SiteIntegration.objects.get(id=site_integration_id)
|
||||
content_items = ContentPost.objects.filter(id__in=content_ids)
|
||||
|
||||
results = {
|
||||
"success": True,
|
||||
"total": len(content_ids),
|
||||
"queued": 0,
|
||||
"skipped": 0,
|
||||
"errors": []
|
||||
}
|
||||
|
||||
for content in content_items:
|
||||
try:
|
||||
# Skip if already published or syncing
|
||||
if content.wordpress_sync_status in ['success', 'syncing']:
|
||||
results["skipped"] += 1
|
||||
continue
|
||||
|
||||
# Mark as pending and queue
|
||||
content.wordpress_sync_status = 'pending'
|
||||
content.save(update_fields=['wordpress_sync_status'])
|
||||
|
||||
# Get task_id if available
|
||||
task_id = None
|
||||
if hasattr(content, 'writer_task'):
|
||||
task_id = content.writer_task.id
|
||||
|
||||
# Queue individual publish task
|
||||
publish_content_to_wordpress.delay(
|
||||
content.id,
|
||||
site_integration.id,
|
||||
task_id
|
||||
)
|
||||
results["queued"] += 1
|
||||
|
||||
except Exception as e:
|
||||
results["errors"].append(f"Content {content.id}: {str(e)}")
|
||||
|
||||
if results["errors"]:
|
||||
results["success"] = len(results["errors"]) < results["total"] / 2 # Success if < 50% errors
|
||||
|
||||
logger.info(f"Bulk publish: {results['queued']} queued, {results['skipped']} skipped, {len(results['errors'])} errors")
|
||||
return results
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in bulk publish: {str(e)}")
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
|
||||
@shared_task
|
||||
def wordpress_status_reconciliation() -> Dict[str, Any]:
|
||||
"""
|
||||
Daily task to reconcile status between IGNY8 and WordPress
|
||||
Checks for discrepancies and fixes them
|
||||
"""
|
||||
try:
|
||||
from igny8_core.models import ContentPost, SiteIntegration
|
||||
|
||||
# Get content marked as published to WordPress
|
||||
wp_content = ContentPost.objects.filter(
|
||||
wordpress_sync_status='success',
|
||||
wordpress_post_id__isnull=False
|
||||
)
|
||||
|
||||
active_integrations = SiteIntegration.objects.filter(
|
||||
platform='wordpress',
|
||||
is_active=True
|
||||
)
|
||||
|
||||
reconciled = 0
|
||||
errors = []
|
||||
|
||||
for integration in active_integrations:
|
||||
integration_content = wp_content.filter(
|
||||
# Assuming there's a way to link content to integration
|
||||
# This would depend on your data model
|
||||
)
|
||||
|
||||
for content in integration_content[:100]: # Limit to prevent timeouts
|
||||
try:
|
||||
# Check WordPress post status
|
||||
wp_url = f"{integration.site_url}/wp-json/igny8/v1/post-status/{content.id}/"
|
||||
headers = {'X-IGNY8-API-KEY': integration.api_key}
|
||||
|
||||
response = requests.get(wp_url, headers=headers, timeout=10)
|
||||
|
||||
if response.status_code == 200:
|
||||
wp_data = response.json().get('data', {})
|
||||
wp_status = wp_data.get('wordpress_status')
|
||||
|
||||
# Update if status changed
|
||||
if wp_status == 'trash' and content.wordpress_sync_status == 'success':
|
||||
content.wordpress_sync_status = 'failed'
|
||||
content.save(update_fields=['wordpress_sync_status'])
|
||||
reconciled += 1
|
||||
|
||||
elif response.status_code == 404:
|
||||
# Post not found on WordPress
|
||||
content.wordpress_sync_status = 'failed'
|
||||
content.wordpress_post_id = None
|
||||
content.wordpress_post_url = None
|
||||
content.save(update_fields=[
|
||||
'wordpress_sync_status', 'wordpress_post_id', 'wordpress_post_url'
|
||||
])
|
||||
reconciled += 1
|
||||
|
||||
except Exception as e:
|
||||
errors.append(f"Content {content.id}: {str(e)}")
|
||||
|
||||
logger.info(f"Status reconciliation: {reconciled} reconciled, {len(errors)} errors")
|
||||
return {"success": True, "reconciled": reconciled, "errors": errors}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in status reconciliation: {str(e)}")
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
|
||||
@shared_task
|
||||
def retry_failed_wordpress_publications() -> Dict[str, Any]:
|
||||
"""
|
||||
Retry failed WordPress publications (runs daily)
|
||||
Only retries items that failed more than 1 hour ago
|
||||
"""
|
||||
try:
|
||||
from igny8_core.models import ContentPost, SiteIntegration
|
||||
|
||||
# Find failed publications older than 1 hour
|
||||
one_hour_ago = timezone.now() - timedelta(hours=1)
|
||||
failed_content = ContentPost.objects.filter(
|
||||
wordpress_sync_status='failed',
|
||||
last_wordpress_sync__lt=one_hour_ago,
|
||||
wordpress_sync_attempts__lt=5 # Max 5 total attempts
|
||||
)
|
||||
|
||||
active_integrations = SiteIntegration.objects.filter(
|
||||
platform='wordpress',
|
||||
is_active=True
|
||||
)
|
||||
|
||||
retried = 0
|
||||
|
||||
for content in failed_content[:20]: # Limit retries per run
|
||||
for integration in active_integrations:
|
||||
# Reset status and retry
|
||||
content.wordpress_sync_status = 'pending'
|
||||
content.save(update_fields=['wordpress_sync_status'])
|
||||
|
||||
task_id = None
|
||||
if hasattr(content, 'writer_task'):
|
||||
task_id = content.writer_task.id
|
||||
|
||||
publish_content_to_wordpress.delay(
|
||||
content.id,
|
||||
integration.id,
|
||||
task_id
|
||||
)
|
||||
retried += 1
|
||||
break # Only retry with first active integration
|
||||
|
||||
logger.info(f"Retried {retried} failed WordPress publications")
|
||||
return {"success": True, "retried": retried}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error retrying failed publications: {str(e)}")
|
||||
return {"success": False, "error": str(e)}
|
||||
Reference in New Issue
Block a user