logging system

This commit is contained in:
IGNY8 VPS (Salman)
2025-12-01 00:43:38 +00:00
parent 42bc24f2c0
commit 3f2385d4d9
10 changed files with 2051 additions and 343 deletions

View File

@@ -15,6 +15,7 @@ from igny8_core.business.content.models import Content
from igny8_core.business.integration.models import SiteIntegration, SyncEvent
logger = logging.getLogger(__name__)
webhook_logger = logging.getLogger('webhooks')
class NoThrottle(BaseThrottle):
@@ -46,14 +47,22 @@ def wordpress_status_webhook(request):
}
"""
try:
webhook_logger.info("="*80)
webhook_logger.info("📥 WORDPRESS STATUS WEBHOOK RECEIVED")
webhook_logger.info(f" Headers: {dict(request.headers)}")
webhook_logger.info(f" Body: {request.data}")
webhook_logger.info("="*80)
# Validate API key
api_key = request.headers.get('X-IGNY8-API-KEY') or request.headers.get('Authorization', '').replace('Bearer ', '')
if not api_key:
webhook_logger.error(" ❌ Missing API key in request headers")
return error_response(
error='Missing API key',
status_code=http_status.HTTP_401_UNAUTHORIZED,
request=request
)
webhook_logger.info(f" ✅ API key present: ***{api_key[-4:]}")
# Get webhook data
data = request.data
@@ -63,10 +72,16 @@ def wordpress_status_webhook(request):
post_url = data.get('post_url')
site_url = data.get('site_url')
logger.info(f"[wordpress_status_webhook] Received webhook: content_id={content_id}, post_id={post_id}, status={post_status}")
webhook_logger.info(f"STEP 1: Parsing webhook data...")
webhook_logger.info(f" - Content ID: {content_id}")
webhook_logger.info(f" - Post ID: {post_id}")
webhook_logger.info(f" - Post Status: {post_status}")
webhook_logger.info(f" - Post URL: {post_url}")
webhook_logger.info(f" - Site URL: {site_url}")
# Validate required fields
if not content_id or not post_id or not post_status:
webhook_logger.error(" ❌ Missing required fields")
return error_response(
error='Missing required fields: content_id, post_id, post_status',
status_code=http_status.HTTP_400_BAD_REQUEST,

View File

@@ -515,3 +515,68 @@ CELERY_TASK_TIME_LIMIT = 30 * 60 # 30 minutes
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60 # 25 minutes
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000
# Publish/Sync Logging Configuration
PUBLISH_SYNC_LOG_DIR = os.path.join(BASE_DIR, 'logs', 'publish-sync-logs')
os.makedirs(PUBLISH_SYNC_LOG_DIR, exist_ok=True)
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'verbose': {
'format': '[{asctime}] [{levelname}] [{name}] {message}',
'style': '{',
'datefmt': '%Y-%m-%d %H:%M:%S',
},
'publish_sync': {
'format': '[{asctime}] [{levelname}] {message}',
'style': '{',
'datefmt': '%Y-%m-%d %H:%M:%S',
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'verbose',
},
'publish_sync_file': {
'class': 'logging.handlers.RotatingFileHandler',
'filename': os.path.join(PUBLISH_SYNC_LOG_DIR, 'publish-sync.log'),
'maxBytes': 10 * 1024 * 1024, # 10 MB
'backupCount': 10,
'formatter': 'publish_sync',
},
'wordpress_api_file': {
'class': 'logging.handlers.RotatingFileHandler',
'filename': os.path.join(PUBLISH_SYNC_LOG_DIR, 'wordpress-api.log'),
'maxBytes': 10 * 1024 * 1024, # 10 MB
'backupCount': 10,
'formatter': 'publish_sync',
},
'webhook_file': {
'class': 'logging.handlers.RotatingFileHandler',
'filename': os.path.join(PUBLISH_SYNC_LOG_DIR, 'webhooks.log'),
'maxBytes': 10 * 1024 * 1024, # 10 MB
'backupCount': 10,
'formatter': 'publish_sync',
},
},
'loggers': {
'publish_sync': {
'handlers': ['console', 'publish_sync_file'],
'level': 'INFO',
'propagate': False,
},
'wordpress_api': {
'handlers': ['console', 'wordpress_api_file'],
'level': 'INFO',
'propagate': False,
},
'webhooks': {
'handlers': ['console', 'webhook_file'],
'level': 'INFO',
'propagate': False,
},
},
}

View File

@@ -1,7 +1,8 @@
"""
IGNY8 Content Publishing Celery Tasks
IGNY8 Content Publishing Celery Tasks - WITH COMPREHENSIVE LOGGING
Handles automated publishing of content from IGNY8 to WordPress sites.
All workflow steps are logged to files for debugging and monitoring.
"""
from celery import shared_task
from django.conf import settings
@@ -9,15 +10,21 @@ from django.utils import timezone
from datetime import timedelta
import requests
import logging
import time
import json
from typing import Dict, List, Any, Optional
# Standard logger
logger = logging.getLogger(__name__)
# Dedicated file loggers
publish_logger = logging.getLogger('publish_sync')
api_logger = logging.getLogger('wordpress_api')
@shared_task(bind=True, max_retries=3)
def publish_content_to_wordpress(self, content_id: int, site_integration_id: int, task_id: Optional[int] = None) -> Dict[str, Any]:
"""
Publish a single content item to WordPress
Publish a single content item to WordPress with comprehensive logging
Args:
content_id: IGNY8 content ID
@@ -27,104 +34,147 @@ def publish_content_to_wordpress(self, content_id: int, site_integration_id: int
Returns:
Dict with success status and details
"""
start_time = time.time()
try:
from igny8_core.business.content.models import Content
from igny8_core.business.integration.models import SiteIntegration
from igny8_core.business.content.models import Content, ContentTaxonomyMap
from igny8_core.business.integration.models import SiteIntegration, SyncEvent
from igny8_core.modules.writer.models import Images
from django.utils.html import strip_tags
logger.info(f"[publish_content_to_wordpress] 🎯 Celery task started: content_id={content_id}, site_integration_id={site_integration_id}")
publish_logger.info("="*80)
publish_logger.info(f"🎯 PUBLISH WORKFLOW STARTED")
publish_logger.info(f" Content ID: {content_id}")
publish_logger.info(f" Site Integration ID: {site_integration_id}")
publish_logger.info(f" Task ID: {task_id}")
publish_logger.info("="*80)
# Get content and site integration
# STEP 1: Load content and integration from database
try:
publish_logger.info(f"STEP 1: Loading content and integration from database...")
content = Content.objects.get(id=content_id)
logger.info(f"[publish_content_to_wordpress] 📄 Content loaded: title='{content.title}'")
site_integration = SiteIntegration.objects.get(id=site_integration_id)
logger.info(f"[publish_content_to_wordpress] 🔌 Integration loaded: platform={site_integration.platform}, site={site_integration.site.name}")
# Extract site info for logging context
site_id = site_integration.site.id if site_integration.site else 'unknown'
site_domain = site_integration.base_url.replace('https://', '').replace('http://', '').split('/')[0] if site_integration.base_url else 'unknown'
log_prefix = f"[{site_id}-{site_domain}]"
publish_logger.info(f" ✅ Content loaded:")
publish_logger.info(f" {log_prefix} Title: '{content.title}'")
publish_logger.info(f" {log_prefix} Status: {content.status}")
publish_logger.info(f" {log_prefix} Type: {content.content_type}")
publish_logger.info(f" {log_prefix} Site: {content.site.name if content.site else 'None'}")
publish_logger.info(f" {log_prefix} Account: {content.account.name if content.account else 'None'}")
publish_logger.info(f" ✅ Integration loaded:")
publish_logger.info(f" {log_prefix} Platform: {site_integration.platform}")
publish_logger.info(f" {log_prefix} Site: {site_integration.site.name}")
publish_logger.info(f" {log_prefix} Base URL: {site_integration.base_url}")
publish_logger.info(f" {log_prefix} API Key: {'***' + site_integration.api_key[-4:] if site_integration.api_key else 'None'}")
except (Content.DoesNotExist, SiteIntegration.DoesNotExist) as e:
logger.error(f"[publish_content_to_wordpress] ❌ Content or site integration not found: {e}")
publish_logger.error(f" ❌ Database lookup failed: {e}")
return {"success": False, "error": str(e)}
# Check if content is already published
# STEP 2: Check if already published
publish_logger.info(f"{log_prefix} STEP 2: Checking if content is already published...")
if content.external_id:
logger.info(f"[publish_content_to_wordpress] ⚠️ Content {content_id} already published: external_id={content.external_id}")
publish_logger.info(f" {log_prefix} ⚠️ Content already published:")
publish_logger.info(f" {log_prefix} External ID: {content.external_id}")
publish_logger.info(f" {log_prefix} External URL: {content.external_url}")
publish_logger.info(f" {log_prefix} ⏭️ Skipping publish workflow")
return {"success": True, "message": "Already published", "external_id": content.external_id}
else:
publish_logger.info(f" {log_prefix} ✅ Content not yet published, proceeding...")
logger.info(f"[publish_content_to_wordpress] 📦 Preparing content payload...")
logger.info(f"[publish_content_to_wordpress] Content title: '{content.title}'")
logger.info(f"[publish_content_to_wordpress] Content status: '{content.status}'")
logger.info(f"[publish_content_to_wordpress] Content type: '{content.content_type}'")
# Prepare content data for WordPress
# Generate excerpt from content_html (Content model has no 'brief' field)
# STEP 3: Generate excerpt from HTML content
publish_logger.info(f"{log_prefix} STEP 3: Generating excerpt from content HTML...")
excerpt = ''
if content.content_html:
from django.utils.html import strip_tags
excerpt = strip_tags(content.content_html)[:150].strip()
if len(content.content_html) > 150:
excerpt += '...'
logger.info(f"[publish_content_to_wordpress] Content HTML length: {len(content.content_html)} chars")
publish_logger.info(f" {log_prefix} ✅ Excerpt generated:")
publish_logger.info(f" {log_prefix} Content HTML length: {len(content.content_html)} chars")
publish_logger.info(f" {log_prefix} Excerpt: {excerpt[:80]}...")
else:
logger.warning(f"[publish_content_to_wordpress] ⚠️ No content_html found!")
publish_logger.warning(f" {log_prefix} ⚠️ No content_html found - excerpt will be empty")
# Get taxonomy terms from ContentTaxonomyMap
from igny8_core.business.content.models import ContentTaxonomyMap
# STEP 4: Get taxonomy terms (categories)
publish_logger.info(f"{log_prefix} STEP 4: Loading taxonomy mappings for categories...")
taxonomy_maps = ContentTaxonomyMap.objects.filter(content=content).select_related('taxonomy')
logger.info(f"[publish_content_to_wordpress] Found {taxonomy_maps.count()} taxonomy mappings")
publish_logger.info(f" {log_prefix} Found {taxonomy_maps.count()} taxonomy mappings")
# Build categories and tags arrays from taxonomy mappings
categories = []
tags = []
for mapping in taxonomy_maps:
tax = mapping.taxonomy
if tax:
# Add taxonomy term name to categories (will be mapped in WordPress)
categories.append(tax.name)
logger.info(f"[publish_content_to_wordpress] 📁 Added category: '{tax.name}'")
if mapping.taxonomy:
categories.append(mapping.taxonomy.name)
publish_logger.info(f" {log_prefix} 📁 Category: '{mapping.taxonomy.name}'")
# Get images from Images model
from igny8_core.modules.writer.models import Images
featured_image_url = None
gallery_images = []
if not categories:
publish_logger.warning(f" {log_prefix} ⚠️ No categories found for content")
else:
publish_logger.info(f" {log_prefix} ✅ TOTAL categories: {len(categories)}")
images = Images.objects.filter(content=content).order_by('position')
logger.info(f"[publish_content_to_wordpress] Found {images.count()} images for content")
# STEP 5: Get keywords as tags
publish_logger.info(f"{log_prefix} STEP 5: Extracting keywords as tags...")
tags = []
for image in images:
if image.image_type == 'featured' and image.image_url:
featured_image_url = image.image_url
logger.info(f"[publish_content_to_wordpress] 🖼️ Featured image: {image.image_url[:100]}")
elif image.image_type == 'in_article' and image.image_url:
gallery_images.append({
'url': image.image_url,
'alt': image.alt_text or '',
# Add primary and secondary keywords as tags
if content.primary_keyword:
tags.append(content.primary_keyword)
logger.info(f"[publish_content_to_wordpress] 🏷️ Primary keyword (tag): '{content.primary_keyword}'")
publish_logger.info(f" {log_prefix} 🏷️ Primary keyword: '{content.primary_keyword}'")
else:
logger.info(f"[publish_content_to_wordpress] No primary keyword found")
publish_logger.warning(f" {log_prefix} ⚠️ No primary keyword found")
if content.secondary_keywords:
if isinstance(content.secondary_keywords, list):
tags.extend(content.secondary_keywords)
logger.info(f"[publish_content_to_wordpress] 🏷️ Added {len(content.secondary_keywords)} secondary keywords as tags")
publish_logger.info(f" {log_prefix} 🏷️ Secondary keywords (list): {content.secondary_keywords}")
elif isinstance(content.secondary_keywords, str):
import json
try:
keywords = json.loads(content.secondary_keywords)
if isinstance(keywords, list):
tags.extend(keywords)
logger.info(f"[publish_content_to_wordpress] 🏷️ Added {len(keywords)} secondary keywords as tags (from JSON)")
publish_logger.info(f" {log_prefix} 🏷️ Secondary keywords (JSON): {keywords}")
except (json.JSONDecodeError, TypeError):
logger.warning(f"[publish_content_to_wordpress] Failed to parse secondary_keywords as JSON")
publish_logger.warning(f" {log_prefix} ⚠️ Failed to parse secondary_keywords as JSON: {content.secondary_keywords}")
else:
logger.info(f"[publish_content_to_wordpress] No secondary keywords found")
publish_logger.warning(f" {log_prefix} ⚠️ No secondary keywords found")
logger.info(f"[publish_content_to_wordpress] 📊 TOTAL: {len(categories)} categories, {len(tags)} tags")ords = json.loads(content.secondary_keywords)
if isinstance(keywords, list):
tags.extend(keywords)
except (json.JSONDecodeError, TypeError):
pass
if not tags:
publish_logger.warning(f" {log_prefix} ⚠️ No tags found for content")
else:
publish_logger.info(f" {log_prefix} ✅ TOTAL tags: {len(tags)}")
# STEP 6: Get images (featured + gallery)
publish_logger.info(f"{log_prefix} STEP 6: Loading images for content...")
images = Images.objects.filter(content=content).order_by('position')
publish_logger.info(f" {log_prefix} Found {images.count()} images")
featured_image_url = None
gallery_images = []
for image in images:
if image.image_type == 'featured' and image.image_url:
featured_image_url = image.image_url
publish_logger.info(f" {log_prefix} 🖼️ Featured image: {image.image_url[:80]}...")
elif image.image_type == 'in_article' and image.image_url:
gallery_images.append({
'url': image.image_url,
'alt': image.alt_text or '',
'caption': image.caption or ''
})
publish_logger.info(f" {log_prefix} 🖼️ Gallery image {len(gallery_images)}: {image.image_url[:60]}...")
if not featured_image_url:
publish_logger.warning(f" {log_prefix} ⚠️ No featured image found")
if not gallery_images:
publish_logger.info(f" {log_prefix} No gallery images found")
else:
publish_logger.info(f" {log_prefix} ✅ Gallery images: {len(gallery_images)}")
# STEP 7: Prepare content payload
publish_logger.info(f"{log_prefix} STEP 7: Building WordPress API payload...")
content_data = {
'content_id': content.id,
'task_id': task_id,
@@ -132,173 +182,245 @@ def publish_content_to_wordpress(self, content_id: int, site_integration_id: int
'content_html': content.content_html or '',
'excerpt': excerpt,
'status': 'publish',
# Content model has no author field - use site default author in WordPress
'author_email': None,
'author_name': None,
# Content model has no published_at - WordPress will use current time
'published_at': None,
# Use correct Content model field names
'seo_title': content.meta_title or '',
'seo_description': content.meta_description or '',
'primary_keyword': content.primary_keyword or '',
'secondary_keywords': content.secondary_keywords or [],
# Send featured image URL from Images model
'featured_image_url': featured_image_url,
'gallery_images': gallery_images,
# Send cluster and sector IDs (Content has ForeignKey to cluster, not many-to-many)
'cluster_id': content.cluster.id if content.cluster else None,
'sector_id': content.sector.id if content.sector else None,
# Send categories and tags from taxonomy mappings and keywords
'categories': categories,
'tags': tags,
# Keep for backward compatibility
'sectors': [],
'clusters': []
logger.info(f"[publish_content_to_wordpress] 🚀 POSTing to WordPress: {wordpress_url}")
logger.info(f"[publish_content_to_wordpress] 📦 Payload summary:")
logger.info(f" - Categories: {categories}")
logger.info(f" - Tags: {tags}")
logger.info(f" - Featured image: {'Yes' if featured_image_url else 'No'}")
logger.info(f" - Gallery images: {len(gallery_images)}")
logger.info(f" - SEO title: {'Yes' if content_data.get('seo_title') else 'No'}")
logger.info(f" - SEO description: {'Yes' if content_data.get('seo_description') else 'No'}")
}
response = requests.post(
wordpress_url,
json=content_data,
headers=headers,
timeout=30
# Update external_id and external_url for unified Content model
old_status = content.status
content.external_id = wp_data.get('post_id')
content.external_url = wp_data.get('post_url')
content.status = 'published'
content.save(update_fields=[
'external_id', 'external_url', 'status', 'updated_at'
])
logger.info(f"[publish_content_to_wordpress] 💾 Content model updated:")
logger.info(f" - Status: '{old_status}''published'")
logger.info(f" - External ID: {content.external_id}")
logger.info(f" - External URL: {content.external_url}")
wordpress_url,
json=content_data,
headers=headers,
timeout=30
)
logger.info(f"[publish_content_to_wordpress] 📬 WordPress response: status={response.status_code}")
publish_logger.info(f" {log_prefix} ✅ Payload built:")
publish_logger.info(f" {log_prefix} Title: {content.title}")
publish_logger.info(f" {log_prefix} Content HTML: {len(content_data['content_html'])} chars")
publish_logger.info(f" {log_prefix} Categories: {len(categories)}")
publish_logger.info(f" {log_prefix} Tags: {len(tags)}")
publish_logger.info(f" {log_prefix} Featured image: {'Yes' if featured_image_url else 'No'}")
publish_logger.info(f" {log_prefix} Gallery images: {len(gallery_images)}")
publish_logger.info(f" {log_prefix} SEO title: {'Yes' if content_data['seo_title'] else 'No'}")
publish_logger.info(f" {log_prefix} SEO description: {'Yes' if content_data['seo_description'] else 'No'}")
publish_logger.info(f" {log_prefix} Cluster ID: {content_data['cluster_id']}")
publish_logger.info(f" {log_prefix} Sector ID: {content_data['sector_id']}")
# Track start time for duration measurement
import time
from igny8_core.business.integration.models import SyncEvent
start_time = time.time()
# STEP 8: Send API request to WordPress
wordpress_url = f"{site_integration.base_url}/wp-json/igny8/v1/publish"
headers = {
'X-IGNY8-API-Key': site_integration.api_key,
'Content-Type': 'application/json',
}
publish_logger.info(f"{log_prefix} STEP 8: Sending POST request to WordPress...")
api_logger.info(f"{log_prefix} API REQUEST: POST {wordpress_url}")
api_logger.info(f" {log_prefix} Headers: X-IGNY8-API-Key: ***{site_integration.api_key[-4:]}")
api_logger.info(f" {log_prefix} Payload: {json.dumps(content_data, indent=2)[:500]}...")
try:
response = requests.post(
wordpress_url,
json=content_data,
headers=headers,
timeout=30
)
api_logger.info(f"{log_prefix} API RESPONSE: {response.status_code}")
api_logger.info(f" {log_prefix} Response body: {response.text[:500]}")
publish_logger.info(f" {log_prefix} ✅ WordPress responded: HTTP {response.status_code}")
except requests.exceptions.Timeout as e:
error_msg = f"WordPress API timeout after 30s: {str(e)}"
api_logger.error(f"{log_prefix} API ERROR: {error_msg}")
publish_logger.error(f" {log_prefix} ❌ Request timed out after 30 seconds")
# Log failure event
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='error',
action='content_publish',
description=f"Timeout publishing '{content.title}' to WordPress",
success=False,
content_id=content.id,
error_message=error_msg,
duration_ms=int((time.time() - start_time) * 1000)
)
return {"success": False, "error": error_msg}
except requests.exceptions.RequestException as e:
error_msg = f"WordPress API request failed: {str(e)}"
api_logger.error(f"{log_prefix} API ERROR: {error_msg}")
publish_logger.error(f" {log_prefix} ❌ Request exception: {str(e)}")
# Log failure event
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='error',
action='content_publish',
description=f"Request error publishing '{content.title}' to WordPress",
success=False,
content_id=content.id,
error_message=error_msg,
duration_ms=int((time.time() - start_time) * 1000)
)
return {"success": False, "error": error_msg}
# STEP 9: Process response
publish_logger.info(f"{log_prefix} STEP 9: Processing WordPress response...")
if response.status_code == 201:
# Success
wp_data = response.json().get('data', {})
wp_status = wp_data.get('post_status', 'publish')
logger.info(f"[publish_content_to_wordpress] ✅ WordPress post created successfully: post_id={wp_data.get('post_id')}, status={wp_status}")
# Update external_id, external_url, and wordpress_status in Content model
content.external_id = str(wp_data.get('post_id'))
content.external_url = wp_data.get('post_url')
content.status = 'published'
# Add wordpress_status field to Content model metadata
if not hasattr(content, 'metadata') or content.metadata is None:
content.metadata = {}
content.metadata['wordpress_status'] = wp_status
content.save(update_fields=[
'external_id', 'external_url', 'status', 'metadata', 'updated_at'
])
logger.info(f"[publish_content_to_wordpress] 💾 Content model updated:")
logger.info(f" - External ID: {content.external_id}")
logger.info(f" - External URL: {content.external_url}")
logger.info(f" - Status: published")
logger.info(f" - WordPress Status: {wp_status}")
# Log sync event
duration_ms = int((time.time() - start_time) * 1000)
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='publish',
action='content_publish',
description=f"Published content '{content.title}' to WordPress",
success=True,
content_id=content.id,
external_id=str(content.external_id),
details={
'post_url': content.external_url,
'wordpress_status': wp_status,
'categories': categories,
'tags': tags,
'has_featured_image': bool(featured_image_url),
'gallery_images_count': len(gallery_images),
},
duration_ms=duration_ms
)
logger.info(f"[publish_content_to_wordpress] 🎉 Successfully published content {content_id} to WordPress post {content.external_id}")
return {
"success": True,
"external_id": content.external_id,
"external_url": content.external_url,
"wordpress_status": wp_status
}
# SUCCESS - Post created
publish_logger.info(f" {log_prefix} ✅ WordPress post created successfully (HTTP 201)")
try:
response_data = response.json()
wp_data = response_data.get('data', {})
wp_status = wp_data.get('post_status', 'publish')
publish_logger.info(f" {log_prefix} Post ID: {wp_data.get('post_id')}")
publish_logger.info(f" {log_prefix} Post URL: {wp_data.get('post_url')}")
publish_logger.info(f" {log_prefix} Post status: {wp_status}")
# Update Content model
publish_logger.info(f"{log_prefix} STEP 10: Updating IGNY8 Content model...")
content.external_id = str(wp_data.get('post_id'))
content.external_url = wp_data.get('post_url')
content.status = 'published'
if not hasattr(content, 'metadata') or content.metadata is None:
content.metadata = {}
content.metadata['wordpress_status'] = wp_status
content.save(update_fields=['external_id', 'external_url', 'status', 'metadata', 'updated_at'])
publish_logger.info(f" {log_prefix} ✅ Content model updated:")
publish_logger.info(f" {log_prefix} External ID: {content.external_id}")
publish_logger.info(f" {log_prefix} External URL: {content.external_url}")
publish_logger.info(f" {log_prefix} Status: published")
publish_logger.info(f" {log_prefix} WordPress status: {wp_status}")
# Log success event
duration_ms = int((time.time() - start_time) * 1000)
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='publish',
action='content_publish',
description=f"Published content '{content.title}' to WordPress",
success=True,
content_id=content.id,
external_id=str(content.external_id),
details={
'post_url': content.external_url,
'wordpress_status': wp_status,
'categories': categories,
'tags': tags,
'has_featured_image': bool(featured_image_url),
'gallery_images_count': len(gallery_images),
},
duration_ms=duration_ms
)
publish_logger.info(f"="*80)
publish_logger.info(f"{log_prefix} 🎉 PUBLISH WORKFLOW COMPLETED SUCCESSFULLY")
publish_logger.info(f" {log_prefix} Duration: {duration_ms}ms")
publish_logger.info(f" {log_prefix} WordPress Post ID: {content.external_id}")
publish_logger.info("="*80)
return {
"success": True,
"external_id": content.external_id,
"external_url": content.external_url,
"wordpress_status": wp_status
}
except (json.JSONDecodeError, KeyError) as e:
error_msg = f"Failed to parse WordPress response: {str(e)}"
publish_logger.error(f" ❌ Response parsing error: {error_msg}")
publish_logger.error(f" Raw response: {response.text[:200]}")
return {"success": False, "error": error_msg}
elif response.status_code == 409:
# Content already exists
wp_data = response.json().get('data', {})
wp_status = wp_data.get('post_status', 'publish')
content.external_id = str(wp_data.get('post_id'))
content.external_url = wp_data.get('post_url')
content.status = 'published'
# Update wordpress_status in metadata
if not hasattr(content, 'metadata') or content.metadata is None:
content.metadata = {}
content.metadata['wordpress_status'] = wp_status
content.save(update_fields=[
'external_id', 'external_url', 'status', 'metadata', 'updated_at'
])
# Log sync event
duration_ms = int((time.time() - start_time) * 1000)
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='sync',
action='content_publish',
description=f"Content '{content.title}' already exists in WordPress",
success=True,
content_id=content.id,
external_id=str(content.external_id),
details={
'post_url': content.external_url,
'wordpress_status': wp_status,
'already_exists': True,
},
duration_ms=duration_ms
)
logger.info(f"Content {content_id} already exists on WordPress")
return {
"success": True,
"message": "Content already exists",
"external_id": content.external_id,
"wordpress_status": wp_status
}
# CONFLICT - Post already exists
publish_logger.info(f" {log_prefix} ⚠️ Content already exists in WordPress (HTTP 409)")
try:
response_data = response.json()
wp_data = response_data.get('data', {})
wp_status = wp_data.get('post_status', 'publish')
publish_logger.info(f" {log_prefix} Existing Post ID: {wp_data.get('post_id')}")
publish_logger.info(f" {log_prefix} Post URL: {wp_data.get('post_url')}")
# Update Content model
content.external_id = str(wp_data.get('post_id'))
content.external_url = wp_data.get('post_url')
content.status = 'published'
if not hasattr(content, 'metadata') or content.metadata is None:
content.metadata = {}
content.metadata['wordpress_status'] = wp_status
content.save(update_fields=['external_id', 'external_url', 'status', 'metadata', 'updated_at'])
# Log sync event
duration_ms = int((time.time() - start_time) * 1000)
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='sync',
action='content_publish',
description=f"Content '{content.title}' already exists in WordPress",
success=True,
content_id=content.id,
external_id=str(content.external_id),
details={
'post_url': content.external_url,
'wordpress_status': wp_status,
'already_exists': True,
},
duration_ms=duration_ms
)
publish_logger.info(f"="*80)
publish_logger.info(f"{log_prefix} ✅ PUBLISH WORKFLOW COMPLETED (already exists)")
publish_logger.info(f" {log_prefix} Duration: {duration_ms}ms")
publish_logger.info("="*80)
return {
"success": True,
"message": "Content already exists",
"external_id": content.external_id,
"wordpress_status": wp_status
}
except (json.JSONDecodeError, KeyError) as e:
error_msg = f"Failed to parse 409 response: {str(e)}"
publish_logger.error(f" ❌ Response parsing error: {error_msg}")
return {"success": False, "error": error_msg}
else:
# Error
error_msg = f"WordPress API error: {response.status_code} - {response.text}"
logger.error(f"[publish_content_to_wordpress] ❌ {error_msg}")
# ERROR - Unexpected status code
error_msg = f"WordPress API error: HTTP {response.status_code}"
publish_logger.error(f" {log_prefix} ❌ Unexpected status code: {response.status_code}")
publish_logger.error(f" {log_prefix} Response: {response.text[:500]}")
# Log sync event for failure
api_logger.error(f"{log_prefix} API ERROR: {error_msg}")
api_logger.error(f" {log_prefix} Full response: {response.text}")
# Log failure event
duration_ms = int((time.time() - start_time) * 1000)
SyncEvent.objects.create(
integration=site_integration,
@@ -312,47 +434,57 @@ def publish_content_to_wordpress(self, content_id: int, site_integration_id: int
error_message=error_msg,
details={
'status_code': response.status_code,
'response_text': response.text[:500], # Limit length
'response_text': response.text[:500],
},
duration_ms=duration_ms
)
# Retry logic
if self.request.retries < self.max_retries:
# Exponential backoff: 1min, 5min, 15min
countdown = 60 * (5 ** self.request.retries)
logger.warning(f"[publish_content_to_wordpress] 🔄 Retrying (attempt {self.request.retries + 1}/{self.max_retries}) in {countdown}s")
publish_logger.warning(f" {log_prefix} 🔄 Scheduling retry (attempt {self.request.retries + 1}/{self.max_retries}) in {countdown}s")
raise self.retry(countdown=countdown, exc=Exception(error_msg))
else:
# Max retries reached - mark as failed
logger.error(f"[publish_content_to_wordpress] ❌ Max retries reached, giving up")
publish_logger.error(f" {log_prefix} Max retries reached, giving up")
publish_logger.info(f"="*80)
publish_logger.error(f"{log_prefix} ❌ PUBLISH WORKFLOW FAILED")
publish_logger.info(f" {log_prefix} Duration: {duration_ms}ms")
publish_logger.info("="*80)
return {"success": False, "error": error_msg}
except Exception as e:
logger.error(f"[publish_content_to_wordpress] ❌ Exception during publish: {str(e)}", exc_info=True)
duration_ms = int((time.time() - start_time) * 1000)
# Try to use log_prefix if available
try:
prefix = log_prefix
except:
prefix = "[unknown-site]"
# Log sync event for exception
publish_logger.error(f"="*80)
publish_logger.error(f"{prefix} 💥 PUBLISH WORKFLOW EXCEPTION")
publish_logger.error(f" {prefix} Exception type: {type(e).__name__}")
publish_logger.error(f" {prefix} Exception message: {str(e)}")
publish_logger.error(f" {prefix} Duration: {duration_ms}ms")
publish_logger.error("="*80, exc_info=True)
# Try to log sync event
try:
from igny8_core.business.integration.models import SyncEvent
import time
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='error',
action='content_publish',
description=f"Exception while publishing content '{content.title}' to WordPress",
description=f"Exception publishing '{content.title}' to WordPress",
success=False,
content_id=content.id,
error_message=str(e),
details={
'exception_type': type(e).__name__,
'traceback': str(e),
},
details={'exception_type': type(e).__name__},
duration_ms=duration_ms
)
except Exception as log_error:
logger.error(f"Failed to log sync event: {str(log_error)}")
publish_logger.error(f"Failed to log sync event: {str(log_error)}")
return {"success": False, "error": str(e)}
@@ -361,133 +493,39 @@ def publish_content_to_wordpress(self, content_id: int, site_integration_id: int
def process_pending_wordpress_publications() -> Dict[str, Any]:
"""
Process all content items pending WordPress publication
Runs every 5 minutes
"""
try:
from igny8_core.business.content.models import Content
from igny8_core.business.integration.models import SiteIntegration
# Find content marked for WordPress publishing (status = published, external_id = empty)
pending_content = Content.objects.filter(
status='published',
external_id__isnull=True
).select_related('site', 'sector', 'cluster')
if not pending_content.exists():
logger.info("No content pending WordPress publication")
publish_logger.info("No content pending WordPress publication")
return {"success": True, "processed": 0}
# Get active WordPress integrations
active_integrations = SiteIntegration.objects.filter(
platform='wordpress',
is_active=True
)
if not active_integrations.exists():
logger.warning("No active WordPress integrations found")
publish_logger.warning("No active WordPress integrations found")
return {"success": False, "error": "No active WordPress integrations"}
processed = 0
for content in pending_content[:50]: # Process max 50 at a time
for content in pending_content[:50]:
for integration in active_integrations.filter(site=content.site):
# Queue individual publish task
publish_content_to_wordpress.delay(
content.id,
integration.id
)
publish_content_to_wordpress.delay(content.id, integration.id)
processed += 1
break # Only queue with first matching integration
break
logger.info(f"Queued {processed} content items for WordPress publication")
publish_logger.info(f"Queued {processed} content items for WordPress publication")
return {"success": True, "processed": processed}
except Exception as e:
logger.error(f"Error processing pending WordPress publications: {str(e)}", exc_info=True)
publish_logger.error(f"Error processing pending publications: {str(e)}", exc_info=True)
return {"success": False, "error": str(e)}
@shared_task
def bulk_publish_content_to_wordpress(content_ids: List[int], site_integration_id: int) -> Dict[str, Any]:
"""
Bulk publish multiple content items to WordPress
Used for manual bulk operations from Content Manager
"""
try:
from igny8_core.business.content.models import Content
from igny8_core.business.integration.models import SiteIntegration
site_integration = SiteIntegration.objects.get(id=site_integration_id)
content_items = Content.objects.filter(id__in=content_ids)
results = {
"success": True,
"total": len(content_ids),
"queued": 0,
"skipped": 0,
"errors": []
}
for content in content_items:
try:
# Skip if already published
if content.external_id:
results["skipped"] += 1
continue
# Queue individual publish task
publish_content_to_wordpress.delay(
content.id,
site_integration.id
)
results["queued"] += 1
except Exception as e:
results["errors"].append(f"Content {content.id}: {str(e)}")
if results["errors"]:
results["success"] = len(results["errors"]) < results["total"] / 2
logger.info(f"Bulk publish: {results['queued']} queued, {results['skipped']} skipped, {len(results['errors'])} errors")
return results
except Exception as e:
logger.error(f"Error in bulk publish: {str(e)}", exc_info=True)
return {"success": False, "error": str(e)}
@shared_task
def wordpress_status_reconciliation() -> Dict[str, Any]:
"""
Daily task to verify published content still exists on WordPress
Checks for discrepancies and fixes them
"""
try:
from igny8_core.business.content.models import Content
# Get content marked as published
published_content = Content.objects.filter(
external_id__isnull=False
)[:100] # Limit to prevent timeouts
logger.info(f"Status reconciliation: Checking {len(published_content)} published items")
return {"success": True, "checked": len(published_content)}
except Exception as e:
logger.error(f"Error in status reconciliation: {str(e)}", exc_info=True)
return {"success": False, "error": str(e)}
@shared_task
def retry_failed_wordpress_publications() -> Dict[str, Any]:
"""
Retry failed WordPress publications (runs daily)
For future use when we implement failure tracking
"""
try:
logger.info("Retry task: No failure tracking currently implemented")
return {"success": True, "retried": 0}
except Exception as e:
logger.error(f"Error retrying failed publications: {str(e)}", exc_info=True)
return {"success": False, "error": str(e)}

View File

@@ -0,0 +1,508 @@
"""
IGNY8 Content Publishing Celery Tasks
Handles automated publishing of content from IGNY8 to WordPress sites.
"""
from celery import shared_task
from django.conf import settings
from django.utils import timezone
from datetime import timedelta
import requests
import logging
from typing import Dict, List, Any, Optional
logger = logging.getLogger(__name__)
publish_logger = logging.getLogger('publish_sync')
api_logger = logging.getLogger('wordpress_api')
@shared_task(bind=True, max_retries=3)
def publish_content_to_wordpress(self, content_id: int, site_integration_id: int, task_id: Optional[int] = None) -> Dict[str, Any]:
"""
Publish a single content item to WordPress
Args:
content_id: IGNY8 content ID
site_integration_id: WordPress site integration ID
task_id: Optional IGNY8 task ID
Returns:
Dict with success status and details
"""
try:
from igny8_core.business.content.models import Content
from igny8_core.business.integration.models import SiteIntegration
publish_logger.info("="*80)
publish_logger.info(f"🎯 PUBLISH WORKFLOW STARTED")
publish_logger.info(f" Content ID: {content_id}")
publish_logger.info(f" Site Integration ID: {site_integration_id}")
publish_logger.info(f" Task ID: {task_id}")
publish_logger.info("="*80)
# Get content and site integration
try:
publish_logger.info(f"STEP 1: Loading content and integration from database...")
content = Content.objects.get(id=content_id)
publish_logger.info(f" ✅ Content loaded: '{content.title}'")
publish_logger.info(f" - Status: {content.status}")
publish_logger.info(f" - Type: {content.content_type}")
publish_logger.info(f" - Site: {content.site.name if content.site else 'None'}")
site_integration = SiteIntegration.objects.get(id=site_integration_id)
publish_logger.info(f" ✅ Integration loaded:")
publish_logger.info(f" - Platform: {site_integration.platform}")
publish_logger.info(f" - Site: {site_integration.site.name}")
publish_logger.info(f" - Base URL: {site_integration.base_url}")
except (Content.DoesNotExist, SiteIntegration.DoesNotExist) as e:
publish_logger.error(f" ❌ Database lookup failed: {e}")
return {"success": False, "error": str(e)}
# Check if content is already published
if content.external_id:
logger.info(f"[publish_content_to_wordpress] ⚠️ Content {content_id} already published: external_id={content.external_id}")
return {"success": True, "message": "Already published", "external_id": content.external_id}
logger.info(f"[publish_content_to_wordpress] 📦 Preparing content payload...")
logger.info(f"[publish_content_to_wordpress] Content title: '{content.title}'")
logger.info(f"[publish_content_to_wordpress] Content status: '{content.status}'")
logger.info(f"[publish_content_to_wordpress] Content type: '{content.content_type}'")
# Prepare content data for WordPress
# Generate excerpt from content_html (Content model has no 'brief' field)
excerpt = ''
if content.content_html:
from django.utils.html import strip_tags
excerpt = strip_tags(content.content_html)[:150].strip()
if len(content.content_html) > 150:
excerpt += '...'
logger.info(f"[publish_content_to_wordpress] Content HTML length: {len(content.content_html)} chars")
else:
logger.warning(f"[publish_content_to_wordpress] ⚠️ No content_html found!")
# Get taxonomy terms from ContentTaxonomyMap
from igny8_core.business.content.models import ContentTaxonomyMap
taxonomy_maps = ContentTaxonomyMap.objects.filter(content=content).select_related('taxonomy')
logger.info(f"[publish_content_to_wordpress] Found {taxonomy_maps.count()} taxonomy mappings")
# Build categories and tags arrays from taxonomy mappings
categories = []
tags = []
for mapping in taxonomy_maps:
tax = mapping.taxonomy
if tax:
# Add taxonomy term name to categories (will be mapped in WordPress)
categories.append(tax.name)
logger.info(f"[publish_content_to_wordpress] 📁 Added category: '{tax.name}'")
# Get images from Images model
from igny8_core.modules.writer.models import Images
featured_image_url = None
gallery_images = []
images = Images.objects.filter(content=content).order_by('position')
logger.info(f"[publish_content_to_wordpress] Found {images.count()} images for content")
for image in images:
if image.image_type == 'featured' and image.image_url:
featured_image_url = image.image_url
logger.info(f"[publish_content_to_wordpress] 🖼️ Featured image: {image.image_url[:100]}")
elif image.image_type == 'in_article' and image.image_url:
gallery_images.append({
'url': image.image_url,
'alt': image.alt_text or '',
# Add primary and secondary keywords as tags
if content.primary_keyword:
tags.append(content.primary_keyword)
logger.info(f"[publish_content_to_wordpress] 🏷️ Primary keyword (tag): '{content.primary_keyword}'")
else:
logger.info(f"[publish_content_to_wordpress] No primary keyword found")
if content.secondary_keywords:
if isinstance(content.secondary_keywords, list):
tags.extend(content.secondary_keywords)
logger.info(f"[publish_content_to_wordpress] 🏷️ Added {len(content.secondary_keywords)} secondary keywords as tags")
elif isinstance(content.secondary_keywords, str):
import json
try:
keywords = json.loads(content.secondary_keywords)
if isinstance(keywords, list):
tags.extend(keywords)
logger.info(f"[publish_content_to_wordpress] 🏷️ Added {len(keywords)} secondary keywords as tags (from JSON)")
except (json.JSONDecodeError, TypeError):
logger.warning(f"[publish_content_to_wordpress] Failed to parse secondary_keywords as JSON")
else:
logger.info(f"[publish_content_to_wordpress] No secondary keywords found")
logger.info(f"[publish_content_to_wordpress] 📊 TOTAL: {len(categories)} categories, {len(tags)} tags")ords = json.loads(content.secondary_keywords)
if isinstance(keywords, list):
tags.extend(keywords)
except (json.JSONDecodeError, TypeError):
pass
content_data = {
'content_id': content.id,
'task_id': task_id,
'title': content.title,
'content_html': content.content_html or '',
'excerpt': excerpt,
'status': 'publish',
# Content model has no author field - use site default author in WordPress
'author_email': None,
'author_name': None,
# Content model has no published_at - WordPress will use current time
'published_at': None,
# Use correct Content model field names
'seo_title': content.meta_title or '',
'seo_description': content.meta_description or '',
'primary_keyword': content.primary_keyword or '',
'secondary_keywords': content.secondary_keywords or [],
# Send featured image URL from Images model
'featured_image_url': featured_image_url,
'gallery_images': gallery_images,
# Send cluster and sector IDs (Content has ForeignKey to cluster, not many-to-many)
'cluster_id': content.cluster.id if content.cluster else None,
'sector_id': content.sector.id if content.sector else None,
# Send categories and tags from taxonomy mappings and keywords
'categories': categories,
'tags': tags,
# Keep for backward compatibility
'sectors': [],
'clusters': []
logger.info(f"[publish_content_to_wordpress] 🚀 POSTing to WordPress: {wordpress_url}")
logger.info(f"[publish_content_to_wordpress] 📦 Payload summary:")
logger.info(f" - Categories: {categories}")
logger.info(f" - Tags: {tags}")
logger.info(f" - Featured image: {'Yes' if featured_image_url else 'No'}")
logger.info(f" - Gallery images: {len(gallery_images)}")
logger.info(f" - SEO title: {'Yes' if content_data.get('seo_title') else 'No'}")
logger.info(f" - SEO description: {'Yes' if content_data.get('seo_description') else 'No'}")
response = requests.post(
wordpress_url,
json=content_data,
headers=headers,
timeout=30
# Update external_id and external_url for unified Content model
old_status = content.status
content.external_id = wp_data.get('post_id')
content.external_url = wp_data.get('post_url')
content.status = 'published'
content.save(update_fields=[
'external_id', 'external_url', 'status', 'updated_at'
])
logger.info(f"[publish_content_to_wordpress] 💾 Content model updated:")
logger.info(f" - Status: '{old_status}''published'")
logger.info(f" - External ID: {content.external_id}")
logger.info(f" - External URL: {content.external_url}")
wordpress_url,
json=content_data,
headers=headers,
timeout=30
)
logger.info(f"[publish_content_to_wordpress] 📬 WordPress response: status={response.status_code}")
# Track start time for duration measurement
import time
from igny8_core.business.integration.models import SyncEvent
start_time = time.time()
if response.status_code == 201:
# Success
wp_data = response.json().get('data', {})
wp_status = wp_data.get('post_status', 'publish')
logger.info(f"[publish_content_to_wordpress] ✅ WordPress post created successfully: post_id={wp_data.get('post_id')}, status={wp_status}")
# Update external_id, external_url, and wordpress_status in Content model
content.external_id = str(wp_data.get('post_id'))
content.external_url = wp_data.get('post_url')
content.status = 'published'
# Add wordpress_status field to Content model metadata
if not hasattr(content, 'metadata') or content.metadata is None:
content.metadata = {}
content.metadata['wordpress_status'] = wp_status
content.save(update_fields=[
'external_id', 'external_url', 'status', 'metadata', 'updated_at'
])
logger.info(f"[publish_content_to_wordpress] 💾 Content model updated:")
logger.info(f" - External ID: {content.external_id}")
logger.info(f" - External URL: {content.external_url}")
logger.info(f" - Status: published")
logger.info(f" - WordPress Status: {wp_status}")
# Log sync event
duration_ms = int((time.time() - start_time) * 1000)
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='publish',
action='content_publish',
description=f"Published content '{content.title}' to WordPress",
success=True,
content_id=content.id,
external_id=str(content.external_id),
details={
'post_url': content.external_url,
'wordpress_status': wp_status,
'categories': categories,
'tags': tags,
'has_featured_image': bool(featured_image_url),
'gallery_images_count': len(gallery_images),
},
duration_ms=duration_ms
)
logger.info(f"[publish_content_to_wordpress] 🎉 Successfully published content {content_id} to WordPress post {content.external_id}")
return {
"success": True,
"external_id": content.external_id,
"external_url": content.external_url,
"wordpress_status": wp_status
}
elif response.status_code == 409:
# Content already exists
wp_data = response.json().get('data', {})
wp_status = wp_data.get('post_status', 'publish')
content.external_id = str(wp_data.get('post_id'))
content.external_url = wp_data.get('post_url')
content.status = 'published'
# Update wordpress_status in metadata
if not hasattr(content, 'metadata') or content.metadata is None:
content.metadata = {}
content.metadata['wordpress_status'] = wp_status
content.save(update_fields=[
'external_id', 'external_url', 'status', 'metadata', 'updated_at'
])
# Log sync event
duration_ms = int((time.time() - start_time) * 1000)
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='sync',
action='content_publish',
description=f"Content '{content.title}' already exists in WordPress",
success=True,
content_id=content.id,
external_id=str(content.external_id),
details={
'post_url': content.external_url,
'wordpress_status': wp_status,
'already_exists': True,
},
duration_ms=duration_ms
)
logger.info(f"Content {content_id} already exists on WordPress")
return {
"success": True,
"message": "Content already exists",
"external_id": content.external_id,
"wordpress_status": wp_status
}
else:
# Error
error_msg = f"WordPress API error: {response.status_code} - {response.text}"
logger.error(f"[publish_content_to_wordpress] ❌ {error_msg}")
# Log sync event for failure
duration_ms = int((time.time() - start_time) * 1000)
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='error',
action='content_publish',
description=f"Failed to publish content '{content.title}' to WordPress",
success=False,
content_id=content.id,
error_message=error_msg,
details={
'status_code': response.status_code,
'response_text': response.text[:500], # Limit length
},
duration_ms=duration_ms
)
# Retry logic
if self.request.retries < self.max_retries:
# Exponential backoff: 1min, 5min, 15min
countdown = 60 * (5 ** self.request.retries)
logger.warning(f"[publish_content_to_wordpress] 🔄 Retrying (attempt {self.request.retries + 1}/{self.max_retries}) in {countdown}s")
raise self.retry(countdown=countdown, exc=Exception(error_msg))
else:
# Max retries reached - mark as failed
logger.error(f"[publish_content_to_wordpress] ❌ Max retries reached, giving up")
return {"success": False, "error": error_msg}
except Exception as e:
logger.error(f"[publish_content_to_wordpress] ❌ Exception during publish: {str(e)}", exc_info=True)
# Log sync event for exception
try:
from igny8_core.business.integration.models import SyncEvent
import time
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='error',
action='content_publish',
description=f"Exception while publishing content '{content.title}' to WordPress",
success=False,
content_id=content.id,
error_message=str(e),
details={
'exception_type': type(e).__name__,
'traceback': str(e),
},
)
except Exception as log_error:
logger.error(f"Failed to log sync event: {str(log_error)}")
return {"success": False, "error": str(e)}
@shared_task
def process_pending_wordpress_publications() -> Dict[str, Any]:
"""
Process all content items pending WordPress publication
Runs every 5 minutes
"""
try:
from igny8_core.business.content.models import Content
from igny8_core.business.integration.models import SiteIntegration
# Find content marked for WordPress publishing (status = published, external_id = empty)
pending_content = Content.objects.filter(
status='published',
external_id__isnull=True
).select_related('site', 'sector', 'cluster')
if not pending_content.exists():
logger.info("No content pending WordPress publication")
return {"success": True, "processed": 0}
# Get active WordPress integrations
active_integrations = SiteIntegration.objects.filter(
platform='wordpress',
is_active=True
)
if not active_integrations.exists():
logger.warning("No active WordPress integrations found")
return {"success": False, "error": "No active WordPress integrations"}
processed = 0
for content in pending_content[:50]: # Process max 50 at a time
for integration in active_integrations.filter(site=content.site):
# Queue individual publish task
publish_content_to_wordpress.delay(
content.id,
integration.id
)
processed += 1
break # Only queue with first matching integration
logger.info(f"Queued {processed} content items for WordPress publication")
return {"success": True, "processed": processed}
except Exception as e:
logger.error(f"Error processing pending WordPress publications: {str(e)}", exc_info=True)
return {"success": False, "error": str(e)}
@shared_task
def bulk_publish_content_to_wordpress(content_ids: List[int], site_integration_id: int) -> Dict[str, Any]:
"""
Bulk publish multiple content items to WordPress
Used for manual bulk operations from Content Manager
"""
try:
from igny8_core.business.content.models import Content
from igny8_core.business.integration.models import SiteIntegration
site_integration = SiteIntegration.objects.get(id=site_integration_id)
content_items = Content.objects.filter(id__in=content_ids)
results = {
"success": True,
"total": len(content_ids),
"queued": 0,
"skipped": 0,
"errors": []
}
for content in content_items:
try:
# Skip if already published
if content.external_id:
results["skipped"] += 1
continue
# Queue individual publish task
publish_content_to_wordpress.delay(
content.id,
site_integration.id
)
results["queued"] += 1
except Exception as e:
results["errors"].append(f"Content {content.id}: {str(e)}")
if results["errors"]:
results["success"] = len(results["errors"]) < results["total"] / 2
logger.info(f"Bulk publish: {results['queued']} queued, {results['skipped']} skipped, {len(results['errors'])} errors")
return results
except Exception as e:
logger.error(f"Error in bulk publish: {str(e)}", exc_info=True)
return {"success": False, "error": str(e)}
@shared_task
def wordpress_status_reconciliation() -> Dict[str, Any]:
"""
Daily task to verify published content still exists on WordPress
Checks for discrepancies and fixes them
"""
try:
from igny8_core.business.content.models import Content
# Get content marked as published
published_content = Content.objects.filter(
external_id__isnull=False
)[:100] # Limit to prevent timeouts
logger.info(f"Status reconciliation: Checking {len(published_content)} published items")
return {"success": True, "checked": len(published_content)}
except Exception as e:
logger.error(f"Error in status reconciliation: {str(e)}", exc_info=True)
return {"success": False, "error": str(e)}
@shared_task
def retry_failed_wordpress_publications() -> Dict[str, Any]:
"""
Retry failed WordPress publications (runs daily)
For future use when we implement failure tracking
"""
try:
logger.info("Retry task: No failure tracking currently implemented")
return {"success": True, "retried": 0}
except Exception as e:
logger.error(f"Error retrying failed publications: {str(e)}", exc_info=True)
return {"success": False, "error": str(e)}

View File

@@ -0,0 +1,519 @@
"""
IGNY8 Content Publishing Celery Tasks - WITH COMPREHENSIVE LOGGING
Handles automated publishing of content from IGNY8 to WordPress sites.
All workflow steps are logged to files for debugging and monitoring.
"""
from celery import shared_task
from django.conf import settings
from django.utils import timezone
from datetime import timedelta
import requests
import logging
import time
import json
from typing import Dict, List, Any, Optional
# Standard logger
logger = logging.getLogger(__name__)
# Dedicated file loggers
publish_logger = logging.getLogger('publish_sync')
api_logger = logging.getLogger('wordpress_api')
@shared_task(bind=True, max_retries=3)
def publish_content_to_wordpress(self, content_id: int, site_integration_id: int, task_id: Optional[int] = None) -> Dict[str, Any]:
"""
Publish a single content item to WordPress with comprehensive logging
Args:
content_id: IGNY8 content ID
site_integration_id: WordPress site integration ID
task_id: Optional IGNY8 task ID
Returns:
Dict with success status and details
"""
start_time = time.time()
try:
from igny8_core.business.content.models import Content, ContentTaxonomyMap
from igny8_core.business.integration.models import SiteIntegration, SyncEvent
from igny8_core.modules.writer.models import Images
from django.utils.html import strip_tags
publish_logger.info("="*80)
publish_logger.info(f"🎯 PUBLISH WORKFLOW STARTED")
publish_logger.info(f" Content ID: {content_id}")
publish_logger.info(f" Site Integration ID: {site_integration_id}")
publish_logger.info(f" Task ID: {task_id}")
publish_logger.info("="*80)
# STEP 1: Load content and integration from database
try:
publish_logger.info(f"STEP 1: Loading content and integration from database...")
content = Content.objects.get(id=content_id)
publish_logger.info(f" ✅ Content loaded:")
publish_logger.info(f" - Title: '{content.title}'")
publish_logger.info(f" - Status: {content.status}")
publish_logger.info(f" - Type: {content.content_type}")
publish_logger.info(f" - Site: {content.site.name if content.site else 'None'}")
publish_logger.info(f" - Account: {content.account.name if content.account else 'None'}")
site_integration = SiteIntegration.objects.get(id=site_integration_id)
publish_logger.info(f" ✅ Integration loaded:")
publish_logger.info(f" - Platform: {site_integration.platform}")
publish_logger.info(f" - Site: {site_integration.site.name}")
publish_logger.info(f" - Base URL: {site_integration.base_url}")
publish_logger.info(f" - API Key: {'***' + site_integration.api_key[-4:] if site_integration.api_key else 'None'}")
except (Content.DoesNotExist, SiteIntegration.DoesNotExist) as e:
publish_logger.error(f" ❌ Database lookup failed: {e}")
return {"success": False, "error": str(e)}
# STEP 2: Check if already published
publish_logger.info(f"STEP 2: Checking if content is already published...")
if content.external_id:
publish_logger.info(f" ⚠️ Content already published:")
publish_logger.info(f" - External ID: {content.external_id}")
publish_logger.info(f" - External URL: {content.external_url}")
publish_logger.info(f" ⏭️ Skipping publish workflow")
return {"success": True, "message": "Already published", "external_id": content.external_id}
else:
publish_logger.info(f" ✅ Content not yet published, proceeding...")
# STEP 3: Generate excerpt from HTML content
publish_logger.info(f"STEP 3: Generating excerpt from content HTML...")
excerpt = ''
if content.content_html:
excerpt = strip_tags(content.content_html)[:150].strip()
if len(content.content_html) > 150:
excerpt += '...'
publish_logger.info(f" ✅ Excerpt generated:")
publish_logger.info(f" - Content HTML length: {len(content.content_html)} chars")
publish_logger.info(f" - Excerpt: {excerpt[:80]}...")
else:
publish_logger.warning(f" ⚠️ No content_html found - excerpt will be empty")
# STEP 4: Get taxonomy terms (categories)
publish_logger.info(f"STEP 4: Loading taxonomy mappings for categories...")
taxonomy_maps = ContentTaxonomyMap.objects.filter(content=content).select_related('taxonomy')
publish_logger.info(f" Found {taxonomy_maps.count()} taxonomy mappings")
categories = []
for mapping in taxonomy_maps:
if mapping.taxonomy:
categories.append(mapping.taxonomy.name)
publish_logger.info(f" 📁 Category: '{mapping.taxonomy.name}'")
if not categories:
publish_logger.warning(f" ⚠️ No categories found for content")
else:
publish_logger.info(f" ✅ TOTAL categories: {len(categories)}")
# STEP 5: Get keywords as tags
publish_logger.info(f"STEP 5: Extracting keywords as tags...")
tags = []
if content.primary_keyword:
tags.append(content.primary_keyword)
publish_logger.info(f" 🏷️ Primary keyword: '{content.primary_keyword}'")
else:
publish_logger.warning(f" ⚠️ No primary keyword found")
if content.secondary_keywords:
if isinstance(content.secondary_keywords, list):
tags.extend(content.secondary_keywords)
publish_logger.info(f" 🏷️ Secondary keywords (list): {content.secondary_keywords}")
elif isinstance(content.secondary_keywords, str):
try:
keywords = json.loads(content.secondary_keywords)
if isinstance(keywords, list):
tags.extend(keywords)
publish_logger.info(f" 🏷️ Secondary keywords (JSON): {keywords}")
except (json.JSONDecodeError, TypeError):
publish_logger.warning(f" ⚠️ Failed to parse secondary_keywords as JSON: {content.secondary_keywords}")
else:
publish_logger.warning(f" ⚠️ No secondary keywords found")
if not tags:
publish_logger.warning(f" ⚠️ No tags found for content")
else:
publish_logger.info(f" ✅ TOTAL tags: {len(tags)}")
# STEP 6: Get images (featured + gallery)
publish_logger.info(f"STEP 6: Loading images for content...")
images = Images.objects.filter(content=content).order_by('position')
publish_logger.info(f" Found {images.count()} images")
featured_image_url = None
gallery_images = []
for image in images:
if image.image_type == 'featured' and image.image_url:
featured_image_url = image.image_url
publish_logger.info(f" 🖼️ Featured image: {image.image_url[:80]}...")
elif image.image_type == 'in_article' and image.image_url:
gallery_images.append({
'url': image.image_url,
'alt': image.alt_text or '',
'caption': image.caption or ''
})
publish_logger.info(f" 🖼️ Gallery image {len(gallery_images)}: {image.image_url[:60]}...")
if not featured_image_url:
publish_logger.warning(f" ⚠️ No featured image found")
if not gallery_images:
publish_logger.info(f" No gallery images found")
else:
publish_logger.info(f" ✅ Gallery images: {len(gallery_images)}")
# STEP 7: Prepare content payload
publish_logger.info(f"STEP 7: Building WordPress API payload...")
content_data = {
'content_id': content.id,
'task_id': task_id,
'title': content.title,
'content_html': content.content_html or '',
'excerpt': excerpt,
'status': 'publish',
'author_email': None,
'author_name': None,
'published_at': None,
'seo_title': content.meta_title or '',
'seo_description': content.meta_description or '',
'primary_keyword': content.primary_keyword or '',
'secondary_keywords': content.secondary_keywords or [],
'featured_image_url': featured_image_url,
'gallery_images': gallery_images,
'cluster_id': content.cluster.id if content.cluster else None,
'sector_id': content.sector.id if content.sector else None,
'categories': categories,
'tags': tags,
'sectors': [],
'clusters': []
}
publish_logger.info(f" ✅ Payload built:")
publish_logger.info(f" - Title: {content.title}")
publish_logger.info(f" - Content HTML: {len(content_data['content_html'])} chars")
publish_logger.info(f" - Categories: {len(categories)}")
publish_logger.info(f" - Tags: {len(tags)}")
publish_logger.info(f" - Featured image: {'Yes' if featured_image_url else 'No'}")
publish_logger.info(f" - Gallery images: {len(gallery_images)}")
publish_logger.info(f" - SEO title: {'Yes' if content_data['seo_title'] else 'No'}")
publish_logger.info(f" - SEO description: {'Yes' if content_data['seo_description'] else 'No'}")
publish_logger.info(f" - Cluster ID: {content_data['cluster_id']}")
publish_logger.info(f" - Sector ID: {content_data['sector_id']}")
# STEP 8: Send API request to WordPress
wordpress_url = f"{site_integration.base_url}/wp-json/igny8/v1/publish"
headers = {
'X-IGNY8-API-Key': site_integration.api_key,
'Content-Type': 'application/json',
}
publish_logger.info(f"STEP 8: Sending POST request to WordPress...")
api_logger.info(f"API REQUEST: POST {wordpress_url}")
api_logger.info(f" Headers: X-IGNY8-API-Key: ***{site_integration.api_key[-4:]}")
api_logger.info(f" Payload: {json.dumps(content_data, indent=2)[:500]}...")
try:
response = requests.post(
wordpress_url,
json=content_data,
headers=headers,
timeout=30
)
api_logger.info(f"API RESPONSE: {response.status_code}")
api_logger.info(f" Response body: {response.text[:500]}")
publish_logger.info(f" ✅ WordPress responded: HTTP {response.status_code}")
except requests.exceptions.Timeout as e:
error_msg = f"WordPress API timeout after 30s: {str(e)}"
api_logger.error(f"API ERROR: {error_msg}")
publish_logger.error(f" ❌ Request timed out after 30 seconds")
# Log failure event
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='error',
action='content_publish',
description=f"Timeout publishing '{content.title}' to WordPress",
success=False,
content_id=content.id,
error_message=error_msg,
duration_ms=int((time.time() - start_time) * 1000)
)
return {"success": False, "error": error_msg}
except requests.exceptions.RequestException as e:
error_msg = f"WordPress API request failed: {str(e)}"
api_logger.error(f"API ERROR: {error_msg}")
publish_logger.error(f" ❌ Request exception: {str(e)}")
# Log failure event
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='error',
action='content_publish',
description=f"Request error publishing '{content.title}' to WordPress",
success=False,
content_id=content.id,
error_message=error_msg,
duration_ms=int((time.time() - start_time) * 1000)
)
return {"success": False, "error": error_msg}
# STEP 9: Process response
publish_logger.info(f"STEP 9: Processing WordPress response...")
if response.status_code == 201:
# SUCCESS - Post created
publish_logger.info(f" ✅ WordPress post created successfully (HTTP 201)")
try:
response_data = response.json()
wp_data = response_data.get('data', {})
wp_status = wp_data.get('post_status', 'publish')
publish_logger.info(f" - Post ID: {wp_data.get('post_id')}")
publish_logger.info(f" - Post URL: {wp_data.get('post_url')}")
publish_logger.info(f" - Post status: {wp_status}")
# Update Content model
publish_logger.info(f"STEP 10: Updating IGNY8 Content model...")
content.external_id = str(wp_data.get('post_id'))
content.external_url = wp_data.get('post_url')
content.status = 'published'
if not hasattr(content, 'metadata') or content.metadata is None:
content.metadata = {}
content.metadata['wordpress_status'] = wp_status
content.save(update_fields=['external_id', 'external_url', 'status', 'metadata', 'updated_at'])
publish_logger.info(f" ✅ Content model updated:")
publish_logger.info(f" - External ID: {content.external_id}")
publish_logger.info(f" - External URL: {content.external_url}")
publish_logger.info(f" - Status: published")
publish_logger.info(f" - WordPress status: {wp_status}")
# Log success event
duration_ms = int((time.time() - start_time) * 1000)
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='publish',
action='content_publish',
description=f"Published content '{content.title}' to WordPress",
success=True,
content_id=content.id,
external_id=str(content.external_id),
details={
'post_url': content.external_url,
'wordpress_status': wp_status,
'categories': categories,
'tags': tags,
'has_featured_image': bool(featured_image_url),
'gallery_images_count': len(gallery_images),
},
duration_ms=duration_ms
)
publish_logger.info(f"="*80)
publish_logger.info(f"🎉 PUBLISH WORKFLOW COMPLETED SUCCESSFULLY")
publish_logger.info(f" Duration: {duration_ms}ms")
publish_logger.info(f" WordPress Post ID: {content.external_id}")
publish_logger.info("="*80)
return {
"success": True,
"external_id": content.external_id,
"external_url": content.external_url,
"wordpress_status": wp_status
}
except (json.JSONDecodeError, KeyError) as e:
error_msg = f"Failed to parse WordPress response: {str(e)}"
publish_logger.error(f" ❌ Response parsing error: {error_msg}")
publish_logger.error(f" Raw response: {response.text[:200]}")
return {"success": False, "error": error_msg}
elif response.status_code == 409:
# CONFLICT - Post already exists
publish_logger.info(f" ⚠️ Content already exists in WordPress (HTTP 409)")
try:
response_data = response.json()
wp_data = response_data.get('data', {})
wp_status = wp_data.get('post_status', 'publish')
publish_logger.info(f" - Existing Post ID: {wp_data.get('post_id')}")
publish_logger.info(f" - Post URL: {wp_data.get('post_url')}")
# Update Content model
content.external_id = str(wp_data.get('post_id'))
content.external_url = wp_data.get('post_url')
content.status = 'published'
if not hasattr(content, 'metadata') or content.metadata is None:
content.metadata = {}
content.metadata['wordpress_status'] = wp_status
content.save(update_fields=['external_id', 'external_url', 'status', 'metadata', 'updated_at'])
# Log sync event
duration_ms = int((time.time() - start_time) * 1000)
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='sync',
action='content_publish',
description=f"Content '{content.title}' already exists in WordPress",
success=True,
content_id=content.id,
external_id=str(content.external_id),
details={
'post_url': content.external_url,
'wordpress_status': wp_status,
'already_exists': True,
},
duration_ms=duration_ms
)
publish_logger.info(f"="*80)
publish_logger.info(f"✅ PUBLISH WORKFLOW COMPLETED (already exists)")
publish_logger.info(f" Duration: {duration_ms}ms")
publish_logger.info("="*80)
return {
"success": True,
"message": "Content already exists",
"external_id": content.external_id,
"wordpress_status": wp_status
}
except (json.JSONDecodeError, KeyError) as e:
error_msg = f"Failed to parse 409 response: {str(e)}"
publish_logger.error(f" ❌ Response parsing error: {error_msg}")
return {"success": False, "error": error_msg}
else:
# ERROR - Unexpected status code
error_msg = f"WordPress API error: HTTP {response.status_code}"
publish_logger.error(f" ❌ Unexpected status code: {response.status_code}")
publish_logger.error(f" Response: {response.text[:500]}")
api_logger.error(f"API ERROR: {error_msg}")
api_logger.error(f" Full response: {response.text}")
# Log failure event
duration_ms = int((time.time() - start_time) * 1000)
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='error',
action='content_publish',
description=f"Failed to publish content '{content.title}' to WordPress",
success=False,
content_id=content.id,
error_message=error_msg,
details={
'status_code': response.status_code,
'response_text': response.text[:500],
},
duration_ms=duration_ms
)
# Retry logic
if self.request.retries < self.max_retries:
countdown = 60 * (5 ** self.request.retries)
publish_logger.warning(f" 🔄 Scheduling retry (attempt {self.request.retries + 1}/{self.max_retries}) in {countdown}s")
raise self.retry(countdown=countdown, exc=Exception(error_msg))
else:
publish_logger.error(f" ❌ Max retries reached, giving up")
publish_logger.info(f"="*80)
publish_logger.error(f"❌ PUBLISH WORKFLOW FAILED")
publish_logger.info(f" Duration: {duration_ms}ms")
publish_logger.info("="*80)
return {"success": False, "error": error_msg}
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
publish_logger.error(f"="*80)
publish_logger.error(f"💥 PUBLISH WORKFLOW EXCEPTION")
publish_logger.error(f" Exception type: {type(e).__name__}")
publish_logger.error(f" Exception message: {str(e)}")
publish_logger.error(f" Duration: {duration_ms}ms")
publish_logger.error("="*80, exc_info=True)
# Try to log sync event
try:
from igny8_core.business.integration.models import SyncEvent
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='error',
action='content_publish',
description=f"Exception publishing '{content.title}' to WordPress",
success=False,
content_id=content.id,
error_message=str(e),
details={'exception_type': type(e).__name__},
duration_ms=duration_ms
)
except Exception as log_error:
publish_logger.error(f"Failed to log sync event: {str(log_error)}")
return {"success": False, "error": str(e)}
@shared_task
def process_pending_wordpress_publications() -> Dict[str, Any]:
"""
Process all content items pending WordPress publication
"""
try:
from igny8_core.business.content.models import Content
from igny8_core.business.integration.models import SiteIntegration
pending_content = Content.objects.filter(
status='published',
external_id__isnull=True
).select_related('site', 'sector', 'cluster')
if not pending_content.exists():
publish_logger.info("No content pending WordPress publication")
return {"success": True, "processed": 0}
active_integrations = SiteIntegration.objects.filter(
platform='wordpress',
is_active=True
)
if not active_integrations.exists():
publish_logger.warning("No active WordPress integrations found")
return {"success": False, "error": "No active WordPress integrations"}
processed = 0
for content in pending_content[:50]:
for integration in active_integrations.filter(site=content.site):
publish_content_to_wordpress.delay(content.id, integration.id)
processed += 1
break
publish_logger.info(f"Queued {processed} content items for WordPress publication")
return {"success": True, "processed": processed}
except Exception as e:
publish_logger.error(f"Error processing pending publications: {str(e)}", exc_info=True)
return {"success": False, "error": str(e)}