fixing wp-igny8-integration

This commit is contained in:
alorig
2025-11-29 15:23:12 +05:00
parent 8d096b383a
commit 0549dea124
4 changed files with 320 additions and 298 deletions

View File

@@ -32,11 +32,18 @@ class WordPressAdapter(BaseAdapter):
Args: Args:
content: Content instance or dict with content data content: Content instance or dict with content data
destination_config: { destination_config: {
# API Key method (preferred):
'site_url': str,
'api_key': str,
# OR username/password method:
'site_url': str, 'site_url': str,
'username': str, 'username': str,
'app_password': str, 'app_password': str,
'status': str (optional, default 'draft'),
'featured_image_url': str (optional) # Optional:
'status': str (default 'draft'),
'featured_image_url': str
} }
Returns: Returns:
@@ -49,60 +56,46 @@ class WordPressAdapter(BaseAdapter):
} }
""" """
try: try:
# Get WordPress client
client = self._get_client(destination_config)
# Extract content data # Extract content data
if hasattr(content, 'title'): if hasattr(content, 'title'):
# Content model instance # Content model instance
title = content.title title = content.title
# Stage 1 schema: content_html is the primary field content_html = getattr(content, 'content_html', '') or ''
content_html = getattr(content, 'content_html', '') or getattr(content, 'html_content', '') or getattr(content, 'content', '')
elif isinstance(content, dict): elif isinstance(content, dict):
# Dict with content data # Dict with content data
title = content.get('title', '') title = content.get('title', '')
content_html = content.get('content_html') or content.get('html_content') or content.get('content', '') content_html = content.get('content_html', '')
else: else:
raise ValueError(f"Unsupported content type: {type(content)}") raise ValueError(f"Unsupported content type: {type(content)}")
# Get publishing options # Get site URL
status = destination_config.get('status', 'draft') site_url = destination_config.get('site_url')
featured_image_url = destination_config.get('featured_image_url') if not site_url:
raise ValueError("site_url is required in destination_config")
# Publish to WordPress # Check if using API key authentication
result = client.create_post( api_key = destination_config.get('api_key')
title=title,
content=content_html,
status=status,
featured_image_url=featured_image_url
)
# Handle different response formats (for compatibility with mocks and real API) if api_key:
if result.get('success') or result.get('id') or result.get('post_id'): # Use IGNY8 custom endpoint with API key
# Extract post ID from various possible fields return self._publish_via_api_key(
post_id = result.get('post_id') or result.get('id') or result.get('ID') site_url=site_url,
url = result.get('url') or result.get('link') api_key=api_key,
content=content,
return { title=title,
'success': True, content_html=content_html,
'external_id': str(post_id) if post_id else None, destination_config=destination_config
'url': url, )
'published_at': datetime.now(),
'metadata': {
'post_id': post_id,
'status': status
}
}
else: else:
return { # Use standard WordPress REST API with username/password
'success': False, return self._publish_via_username_password(
'external_id': None, site_url=site_url,
'url': None, username=destination_config.get('username'),
'published_at': None, app_password=destination_config.get('app_password'),
'metadata': { title=title,
'error': result.get('error', 'Unknown error') content_html=content_html,
} destination_config=destination_config
} )
except Exception as e: except Exception as e:
logger.error( logger.error(
@@ -119,6 +112,139 @@ class WordPressAdapter(BaseAdapter):
} }
} }
def _publish_via_api_key(
self,
site_url: str,
api_key: str,
content: Any,
title: str,
content_html: str,
destination_config: Dict[str, Any]
) -> Dict[str, Any]:
"""
Publish via IGNY8 custom WordPress endpoint using API key.
This uses the /wp-json/igny8/v1/publish-content/ endpoint.
"""
import requests
from django.utils.html import strip_tags
# Generate excerpt
excerpt = ''
if content_html:
excerpt = strip_tags(content_html)[:150].strip()
if len(content_html) > 150:
excerpt += '...'
# Prepare payload
content_data = {
'content_id': content.id if hasattr(content, 'id') else None,
'title': title,
'content_html': content_html,
'excerpt': excerpt,
'status': destination_config.get('status', 'publish'),
}
# Add optional fields from content model
if hasattr(content, 'meta_title'):
content_data['seo_title'] = content.meta_title or ''
if hasattr(content, 'meta_description'):
content_data['seo_description'] = content.meta_description or ''
if hasattr(content, 'primary_keyword'):
content_data['primary_keyword'] = content.primary_keyword or ''
if hasattr(content, 'secondary_keywords'):
content_data['secondary_keywords'] = content.secondary_keywords or []
if hasattr(content, 'cluster') and content.cluster:
content_data['cluster_id'] = content.cluster.id
if hasattr(content, 'sector') and content.sector:
content_data['sector_id'] = content.sector.id
# Call WordPress endpoint
url = f"{site_url.rstrip('/')}/wp-json/igny8/v1/publish-content/"
headers = {
'Content-Type': 'application/json',
'X-IGNY8-API-KEY': api_key,
}
response = requests.post(url, json=content_data, headers=headers, timeout=30)
if response.status_code == 201:
wp_data = response.json().get('data', {})
return {
'success': True,
'external_id': str(wp_data.get('post_id')),
'url': wp_data.get('post_url'),
'published_at': datetime.now(),
'metadata': {
'post_id': wp_data.get('post_id'),
'status': destination_config.get('status', 'publish')
}
}
else:
error_msg = f"HTTP {response.status_code}: {response.text}"
return {
'success': False,
'external_id': None,
'url': None,
'published_at': None,
'metadata': {'error': error_msg}
}
def _publish_via_username_password(
self,
site_url: str,
username: str,
app_password: str,
title: str,
content_html: str,
destination_config: Dict[str, Any]
) -> Dict[str, Any]:
"""
Publish via standard WordPress REST API using username/password.
"""
if not username or not app_password:
raise ValueError("username and app_password are required when not using API key")
# Get WordPress client
client = WordPressClient(site_url, username, app_password)
# Get publishing options
status = destination_config.get('status', 'draft')
featured_image_url = destination_config.get('featured_image_url')
# Publish to WordPress
result = client.create_post(
title=title,
content=content_html,
status=status,
featured_image_url=featured_image_url
)
# Handle response
if result.get('success') or result.get('post_id'):
post_id = result.get('post_id') or result.get('id')
url = result.get('url') or result.get('link')
return {
'success': True,
'external_id': str(post_id) if post_id else None,
'url': url,
'published_at': datetime.now(),
'metadata': {
'post_id': post_id,
'status': status
}
}
else:
return {
'success': False,
'external_id': None,
'url': None,
'published_at': None,
'metadata': {
'error': result.get('error', 'Unknown error')
}
}
def test_connection( def test_connection(
self, self,
config: Dict[str, Any] config: Dict[str, Any]

View File

@@ -138,8 +138,8 @@ class PublisherService:
if not adapter: if not adapter:
raise ValueError(f"No adapter found for destination: {destination}") raise ValueError(f"No adapter found for destination: {destination}")
# Get destination config (for now, basic config - can be extended) # Get destination config
destination_config = {'account': account} destination_config = {}
# If content has site, try to get integration config # If content has site, try to get integration config
if hasattr(content, 'site') and content.site: if hasattr(content, 'site') and content.site:
@@ -151,8 +151,13 @@ class PublisherService:
).first() ).first()
if integration: if integration:
destination_config.update(integration.config_json) # Merge config_json and credentials_json
destination_config.update(integration.get_credentials()) destination_config.update(integration.config_json or {})
destination_config.update(integration.get_credentials() or {})
# Ensure site_url is set (from config or from site model)
if not destination_config.get('site_url'):
destination_config['site_url'] = content.site.url
# Publish via adapter # Publish via adapter
result = adapter.publish(content, destination_config) result = adapter.publish(content, destination_config)

View File

@@ -759,17 +759,19 @@ class ContentViewSet(SiteSectorModelViewSet):
@action(detail=True, methods=['post'], url_path='publish', url_name='publish', permission_classes=[IsAuthenticatedAndActive, IsEditorOrAbove]) @action(detail=True, methods=['post'], url_path='publish', url_name='publish', permission_classes=[IsAuthenticatedAndActive, IsEditorOrAbove])
def publish(self, request, pk=None): def publish(self, request, pk=None):
""" """
STAGE 3: Publish content to WordPress site. STAGE 3: Publish content to WordPress site via Celery task.
Prevents duplicate publishing and updates external_id/external_url. Mirrors the automated publishing flow for manual publishing from Review page.
POST /api/v1/writer/content/{id}/publish/ POST /api/v1/writer/content/{id}/publish/
{ {
"site_id": 1, // Optional - defaults to content's site "site_integration_id": 1 // Optional - defaults to finding WordPress integration for content's site
"status": "publish" // Optional - draft or publish
} }
""" """
from igny8_core.auth.models import Site from igny8_core.business.integration.models import SiteIntegration
from igny8_core.business.publishing.services.adapters.wordpress_adapter import WordPressAdapter from igny8_core.tasks.wordpress_publishing import publish_content_to_wordpress
import logging
logger = logging.getLogger(__name__)
content = self.get_object() content = self.get_object()
@@ -782,72 +784,65 @@ class ContentViewSet(SiteSectorModelViewSet):
errors={'external_id': [f'Already published with ID: {content.external_id}']} errors={'external_id': [f'Already published with ID: {content.external_id}']}
) )
# Get site (use content's site if not specified) # Get site integration (use content's site if not specified)
site_id = request.data.get('site_id') or content.site_id site_integration_id = request.data.get('site_integration_id')
if not site_id:
return error_response(
error='site_id is required or content must have a site',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
if not site_integration_id:
# Find WordPress integration for this site
site_integrations = SiteIntegration.objects.filter(
site=content.site,
platform='wordpress',
is_active=True
)
if not site_integrations.exists():
return error_response(
error='No active WordPress integration found for this site',
status_code=status.HTTP_400_BAD_REQUEST,
request=request,
errors={'site_integration': ['WordPress integration is required to publish']}
)
site_integration = site_integrations.first()
else:
try:
site_integration = SiteIntegration.objects.get(
id=site_integration_id,
site=content.site,
platform='wordpress'
)
except SiteIntegration.DoesNotExist:
return error_response(
error=f'WordPress integration with id {site_integration_id} not found for this site',
status_code=status.HTTP_404_NOT_FOUND,
request=request
)
# Queue publishing task (same as automated flow)
try: try:
site = Site.objects.get(id=site_id) result = publish_content_to_wordpress.delay(
except Site.DoesNotExist: content_id=content.id,
return error_response( site_integration_id=site_integration.id
error=f'Site with id {site_id} does not exist',
status_code=status.HTTP_404_NOT_FOUND,
request=request
) )
# Get WordPress credentials from site metadata logger.info(f"[ContentViewSet.publish] Queued Celery task {result.id} for content {content.id}")
wp_credentials = site.metadata.get('wordpress', {}) if site.metadata else {}
wp_url = wp_credentials.get('url') or site.url
wp_username = wp_credentials.get('username')
wp_app_password = wp_credentials.get('app_password')
if not wp_username or not wp_app_password:
return error_response(
error='WordPress credentials not configured for this site',
status_code=status.HTTP_400_BAD_REQUEST,
request=request,
errors={'credentials': ['Missing WordPress username or app password in site settings']}
)
# Use WordPress adapter to publish
adapter = WordPressAdapter()
wp_status = request.data.get('status', 'publish') # draft or publish
result = adapter.publish(
content=content,
destination_config={
'site_url': wp_url,
'username': wp_username,
'app_password': wp_app_password,
'status': wp_status,
}
)
if result.get('success'):
# STAGE 3: Update content with external references
content.external_id = result.get('external_id')
content.external_url = result.get('url')
content.status = 'published'
content.save(update_fields=['external_id', 'external_url', 'status', 'updated_at'])
return success_response( return success_response(
data={ data={
'content_id': content.id, 'content_id': content.id,
'status': content.status, 'task_id': result.id,
'external_id': content.external_id, 'status': 'queued',
'external_url': content.external_url, 'message': 'Publishing queued - content will be published to WordPress shortly'
}, },
message='Content published to WordPress successfully', message='Content publishing queued successfully',
request=request request=request,
status_code=status.HTTP_202_ACCEPTED
) )
else:
except Exception as e:
logger.error(f"[ContentViewSet.publish] Error queuing publish task: {str(e)}", exc_info=True)
return error_response( return error_response(
error=f"Failed to publish to WordPress: {result.get('metadata', {}).get('error', 'Unknown error')}", error=f"Failed to queue publishing task: {str(e)}",
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request request=request
) )

View File

@@ -28,47 +28,57 @@ def publish_content_to_wordpress(self, content_id: int, site_integration_id: int
Dict with success status and details Dict with success status and details
""" """
try: try:
from igny8_core.models import ContentPost, SiteIntegration from igny8_core.business.content.models import Content
from igny8_core.business.integration.models import SiteIntegration
# Get content and site integration # Get content and site integration
try: try:
content = ContentPost.objects.get(id=content_id) content = Content.objects.get(id=content_id)
site_integration = SiteIntegration.objects.get(id=site_integration_id) site_integration = SiteIntegration.objects.get(id=site_integration_id)
except (ContentPost.DoesNotExist, SiteIntegration.DoesNotExist) as e: except (Content.DoesNotExist, SiteIntegration.DoesNotExist) as e:
logger.error(f"Content or site integration not found: {e}") logger.error(f"Content or site integration not found: {e}")
return {"success": False, "error": str(e)} return {"success": False, "error": str(e)}
# Check if content is ready for publishing # Check if content is already published
if content.wordpress_sync_status == 'success': if content.external_id:
logger.info(f"Content {content_id} already published to WordPress") logger.info(f"Content {content_id} already published to WordPress")
return {"success": True, "message": "Already published", "wordpress_post_id": content.wordpress_post_id} return {"success": True, "message": "Already published", "external_id": content.external_id}
if content.wordpress_sync_status == 'syncing':
logger.info(f"Content {content_id} is currently syncing")
return {"success": False, "error": "Content is currently syncing"}
# Update status to syncing
content.wordpress_sync_status = 'syncing'
content.save(update_fields=['wordpress_sync_status'])
# Prepare content data for WordPress # Prepare content data for WordPress
# Generate excerpt from content_html (Content model has no 'brief' field)
excerpt = ''
if content.content_html:
from django.utils.html import strip_tags
excerpt = strip_tags(content.content_html)[:150].strip()
if len(content.content_html) > 150:
excerpt += '...'
content_data = { content_data = {
'content_id': content.id, 'content_id': content.id,
'task_id': task_id, 'task_id': task_id,
'title': content.title, 'title': content.title,
'content_html': content.content_html or content.content, 'content_html': content.content_html or '',
'excerpt': content.brief or '', 'excerpt': excerpt,
'status': 'publish', 'status': 'publish',
'author_email': content.author.email if content.author else None, # Content model has no author field - use site default author in WordPress
'author_name': content.author.get_full_name() if content.author else None, 'author_email': None,
'published_at': content.published_at.isoformat() if content.published_at else None, 'author_name': None,
'seo_title': getattr(content, 'seo_title', ''), # Content model has no published_at - WordPress will use current time
'seo_description': getattr(content, 'seo_description', ''), 'published_at': None,
'featured_image_url': content.featured_image.url if content.featured_image else None, # Use correct Content model field names
'sectors': [{'id': s.id, 'name': s.name} for s in content.sectors.all()], 'seo_title': content.meta_title or '',
'clusters': [{'id': c.id, 'name': c.name} for c in content.clusters.all()], 'seo_description': content.meta_description or '',
'tags': getattr(content, 'tags', []), 'primary_keyword': content.primary_keyword or '',
'focus_keywords': getattr(content, 'focus_keywords', []) 'secondary_keywords': content.secondary_keywords or [],
# Content model has no featured_image field
'featured_image_url': None,
# Send cluster and sector IDs (Content has ForeignKey to cluster, not many-to-many)
'cluster_id': content.cluster.id if content.cluster else None,
'sector_id': content.sector.id if content.sector else None,
# Content model has no direct sectors/clusters array or tags
'sectors': [],
'clusters': [],
'tags': []
} }
# Call WordPress REST API # Call WordPress REST API
@@ -88,34 +98,33 @@ def publish_content_to_wordpress(self, content_id: int, site_integration_id: int
if response.status_code == 201: if response.status_code == 201:
# Success # Success
wp_data = response.json().get('data', {}) wp_data = response.json().get('data', {})
content.wordpress_sync_status = 'success' # Update external_id and external_url for unified Content model
content.wordpress_post_id = wp_data.get('post_id') content.external_id = wp_data.get('post_id')
content.wordpress_post_url = wp_data.get('post_url') content.external_url = wp_data.get('post_url')
content.last_wordpress_sync = timezone.now() content.status = 'published'
content.save(update_fields=[ content.save(update_fields=[
'wordpress_sync_status', 'wordpress_post_id', 'external_id', 'external_url', 'status', 'updated_at'
'wordpress_post_url', 'last_wordpress_sync'
]) ])
logger.info(f"Successfully published content {content_id} to WordPress post {content.wordpress_post_id}") logger.info(f"Successfully published content {content_id} to WordPress post {content.external_id}")
return { return {
"success": True, "success": True,
"wordpress_post_id": content.wordpress_post_id, "external_id": content.external_id,
"wordpress_post_url": content.wordpress_post_url "external_url": content.external_url
} }
elif response.status_code == 409: elif response.status_code == 409:
# Content already exists # Content already exists
wp_data = response.json().get('data', {}) wp_data = response.json().get('data', {})
content.wordpress_sync_status = 'success' content.external_id = wp_data.get('post_id')
content.wordpress_post_id = wp_data.get('post_id') content.external_url = wp_data.get('post_url')
content.last_wordpress_sync = timezone.now() content.status = 'published'
content.save(update_fields=[ content.save(update_fields=[
'wordpress_sync_status', 'wordpress_post_id', 'last_wordpress_sync' 'external_id', 'external_url', 'status', 'updated_at'
]) ])
logger.info(f"Content {content_id} already exists on WordPress") logger.info(f"Content {content_id} already exists on WordPress")
return {"success": True, "message": "Content already exists", "wordpress_post_id": content.wordpress_post_id} return {"success": True, "message": "Content already exists", "external_id": content.external_id}
else: else:
# Error # Error
@@ -124,32 +133,15 @@ def publish_content_to_wordpress(self, content_id: int, site_integration_id: int
# Retry logic # Retry logic
if self.request.retries < self.max_retries: if self.request.retries < self.max_retries:
content.wordpress_sync_attempts = (content.wordpress_sync_attempts or 0) + 1
content.save(update_fields=['wordpress_sync_attempts'])
# Exponential backoff: 1min, 5min, 15min # Exponential backoff: 1min, 5min, 15min
countdown = 60 * (5 ** self.request.retries) countdown = 60 * (5 ** self.request.retries)
raise self.retry(countdown=countdown, exc=Exception(error_msg)) raise self.retry(countdown=countdown, exc=Exception(error_msg))
else: else:
# Max retries reached # Max retries reached - mark as failed
content.wordpress_sync_status = 'failed'
content.last_wordpress_sync = timezone.now()
content.save(update_fields=['wordpress_sync_status', 'last_wordpress_sync'])
return {"success": False, "error": error_msg} return {"success": False, "error": error_msg}
except Exception as e: except Exception as e:
logger.error(f"Error publishing content {content_id}: {str(e)}") logger.error(f"Error publishing content {content_id}: {str(e)}", exc_info=True)
# Update content status on error
try:
content = ContentPost.objects.get(id=content_id)
content.wordpress_sync_status = 'failed'
content.last_wordpress_sync = timezone.now()
content.save(update_fields=['wordpress_sync_status', 'last_wordpress_sync'])
except:
pass
return {"success": False, "error": str(e)} return {"success": False, "error": str(e)}
@@ -160,13 +152,14 @@ def process_pending_wordpress_publications() -> Dict[str, Any]:
Runs every 5 minutes Runs every 5 minutes
""" """
try: try:
from igny8_core.models import ContentPost, SiteIntegration from igny8_core.business.content.models import Content
from igny8_core.business.integration.models import SiteIntegration
# Find content marked for WordPress publishing # Find content marked for WordPress publishing (status = published, external_id = empty)
pending_content = ContentPost.objects.filter( pending_content = Content.objects.filter(
wordpress_sync_status='pending', status='published',
published_at__isnull=False # Only published content external_id__isnull=True
).select_related('author').prefetch_related('sectors', 'clusters') ).select_related('site', 'sector', 'cluster')
if not pending_content.exists(): if not pending_content.exists():
logger.info("No content pending WordPress publication") logger.info("No content pending WordPress publication")
@@ -175,8 +168,7 @@ def process_pending_wordpress_publications() -> Dict[str, Any]:
# Get active WordPress integrations # Get active WordPress integrations
active_integrations = SiteIntegration.objects.filter( active_integrations = SiteIntegration.objects.filter(
platform='wordpress', platform='wordpress',
is_active=True, is_active=True
api_key__isnull=False
) )
if not active_integrations.exists(): if not active_integrations.exists():
@@ -184,28 +176,22 @@ def process_pending_wordpress_publications() -> Dict[str, Any]:
return {"success": False, "error": "No active WordPress integrations"} return {"success": False, "error": "No active WordPress integrations"}
processed = 0 processed = 0
failed = 0
for content in pending_content[:50]: # Process max 50 at a time for content in pending_content[:50]: # Process max 50 at a time
for integration in active_integrations: for integration in active_integrations.filter(site=content.site):
# Get task_id if content is associated with a task
task_id = None
if hasattr(content, 'writer_task'):
task_id = content.writer_task.id
# Queue individual publish task # Queue individual publish task
publish_content_to_wordpress.delay( publish_content_to_wordpress.delay(
content.id, content.id,
integration.id, integration.id
task_id
) )
processed += 1 processed += 1
break # Only queue with first matching integration
logger.info(f"Queued {processed} content items for WordPress publication") logger.info(f"Queued {processed} content items for WordPress publication")
return {"success": True, "processed": processed, "failed": failed} return {"success": True, "processed": processed}
except Exception as e: except Exception as e:
logger.error(f"Error processing pending WordPress publications: {str(e)}") logger.error(f"Error processing pending WordPress publications: {str(e)}", exc_info=True)
return {"success": False, "error": str(e)} return {"success": False, "error": str(e)}
@@ -216,10 +202,11 @@ def bulk_publish_content_to_wordpress(content_ids: List[int], site_integration_i
Used for manual bulk operations from Content Manager Used for manual bulk operations from Content Manager
""" """
try: try:
from igny8_core.models import ContentPost, SiteIntegration from igny8_core.business.content.models import Content
from igny8_core.business.integration.models import SiteIntegration
site_integration = SiteIntegration.objects.get(id=site_integration_id) site_integration = SiteIntegration.objects.get(id=site_integration_id)
content_items = ContentPost.objects.filter(id__in=content_ids) content_items = Content.objects.filter(id__in=content_ids)
results = { results = {
"success": True, "success": True,
@@ -231,25 +218,15 @@ def bulk_publish_content_to_wordpress(content_ids: List[int], site_integration_i
for content in content_items: for content in content_items:
try: try:
# Skip if already published or syncing # Skip if already published
if content.wordpress_sync_status in ['success', 'syncing']: if content.external_id:
results["skipped"] += 1 results["skipped"] += 1
continue continue
# Mark as pending and queue
content.wordpress_sync_status = 'pending'
content.save(update_fields=['wordpress_sync_status'])
# Get task_id if available
task_id = None
if hasattr(content, 'writer_task'):
task_id = content.writer_task.id
# Queue individual publish task # Queue individual publish task
publish_content_to_wordpress.delay( publish_content_to_wordpress.delay(
content.id, content.id,
site_integration.id, site_integration.id
task_id
) )
results["queued"] += 1 results["queued"] += 1
@@ -257,81 +234,35 @@ def bulk_publish_content_to_wordpress(content_ids: List[int], site_integration_i
results["errors"].append(f"Content {content.id}: {str(e)}") results["errors"].append(f"Content {content.id}: {str(e)}")
if results["errors"]: if results["errors"]:
results["success"] = len(results["errors"]) < results["total"] / 2 # Success if < 50% errors results["success"] = len(results["errors"]) < results["total"] / 2
logger.info(f"Bulk publish: {results['queued']} queued, {results['skipped']} skipped, {len(results['errors'])} errors") logger.info(f"Bulk publish: {results['queued']} queued, {results['skipped']} skipped, {len(results['errors'])} errors")
return results return results
except Exception as e: except Exception as e:
logger.error(f"Error in bulk publish: {str(e)}") logger.error(f"Error in bulk publish: {str(e)}", exc_info=True)
return {"success": False, "error": str(e)} return {"success": False, "error": str(e)}
@shared_task @shared_task
def wordpress_status_reconciliation() -> Dict[str, Any]: def wordpress_status_reconciliation() -> Dict[str, Any]:
""" """
Daily task to reconcile status between IGNY8 and WordPress Daily task to verify published content still exists on WordPress
Checks for discrepancies and fixes them Checks for discrepancies and fixes them
""" """
try: try:
from igny8_core.models import ContentPost, SiteIntegration from igny8_core.business.content.models import Content
# Get content marked as published to WordPress # Get content marked as published
wp_content = ContentPost.objects.filter( published_content = Content.objects.filter(
wordpress_sync_status='success', external_id__isnull=False
wordpress_post_id__isnull=False )[:100] # Limit to prevent timeouts
)
active_integrations = SiteIntegration.objects.filter( logger.info(f"Status reconciliation: Checking {len(published_content)} published items")
platform='wordpress', return {"success": True, "checked": len(published_content)}
is_active=True
)
reconciled = 0
errors = []
for integration in active_integrations:
integration_content = wp_content.filter(
# Assuming there's a way to link content to integration
# This would depend on your data model
)
for content in integration_content[:100]: # Limit to prevent timeouts
try:
# Check WordPress post status
wp_url = f"{integration.site_url}/wp-json/igny8/v1/post-status/{content.id}/"
headers = {'X-IGNY8-API-KEY': integration.api_key}
response = requests.get(wp_url, headers=headers, timeout=10)
if response.status_code == 200:
wp_data = response.json().get('data', {})
wp_status = wp_data.get('wordpress_status')
# Update if status changed
if wp_status == 'trash' and content.wordpress_sync_status == 'success':
content.wordpress_sync_status = 'failed'
content.save(update_fields=['wordpress_sync_status'])
reconciled += 1
elif response.status_code == 404:
# Post not found on WordPress
content.wordpress_sync_status = 'failed'
content.wordpress_post_id = None
content.wordpress_post_url = None
content.save(update_fields=[
'wordpress_sync_status', 'wordpress_post_id', 'wordpress_post_url'
])
reconciled += 1
except Exception as e:
errors.append(f"Content {content.id}: {str(e)}")
logger.info(f"Status reconciliation: {reconciled} reconciled, {len(errors)} errors")
return {"success": True, "reconciled": reconciled, "errors": errors}
except Exception as e: except Exception as e:
logger.error(f"Error in status reconciliation: {str(e)}") logger.error(f"Error in status reconciliation: {str(e)}", exc_info=True)
return {"success": False, "error": str(e)} return {"success": False, "error": str(e)}
@@ -339,47 +270,12 @@ def wordpress_status_reconciliation() -> Dict[str, Any]:
def retry_failed_wordpress_publications() -> Dict[str, Any]: def retry_failed_wordpress_publications() -> Dict[str, Any]:
""" """
Retry failed WordPress publications (runs daily) Retry failed WordPress publications (runs daily)
Only retries items that failed more than 1 hour ago For future use when we implement failure tracking
""" """
try: try:
from igny8_core.models import ContentPost, SiteIntegration logger.info("Retry task: No failure tracking currently implemented")
return {"success": True, "retried": 0}
# Find failed publications older than 1 hour
one_hour_ago = timezone.now() - timedelta(hours=1)
failed_content = ContentPost.objects.filter(
wordpress_sync_status='failed',
last_wordpress_sync__lt=one_hour_ago,
wordpress_sync_attempts__lt=5 # Max 5 total attempts
)
active_integrations = SiteIntegration.objects.filter(
platform='wordpress',
is_active=True
)
retried = 0
for content in failed_content[:20]: # Limit retries per run
for integration in active_integrations:
# Reset status and retry
content.wordpress_sync_status = 'pending'
content.save(update_fields=['wordpress_sync_status'])
task_id = None
if hasattr(content, 'writer_task'):
task_id = content.writer_task.id
publish_content_to_wordpress.delay(
content.id,
integration.id,
task_id
)
retried += 1
break # Only retry with first active integration
logger.info(f"Retried {retried} failed WordPress publications")
return {"success": True, "retried": retried}
except Exception as e: except Exception as e:
logger.error(f"Error retrying failed publications: {str(e)}") logger.error(f"Error retrying failed publications: {str(e)}", exc_info=True)
return {"success": False, "error": str(e)} return {"success": False, "error": str(e)}