Section 3-8 - #MIgration Runs -
Multiple Migfeat: Update publishing terminology and add publishing settings - Changed references from "WordPress" to "Site" across multiple components for consistency. - Introduced a new "Publishing" tab in Site Settings to manage automatic content approval and publishing behavior. - Added publishing settings model to the backend with fields for auto-approval, auto-publish, and publishing limits. - Implemented Celery tasks for scheduling and processing automated content publishing. - Enhanced Writer Dashboard to include metrics for content published to the site and scheduled for publishing.
This commit is contained in:
11
backend/igny8_core/tasks/__init__.py
Normal file
11
backend/igny8_core/tasks/__init__.py
Normal file
@@ -0,0 +1,11 @@
|
||||
"""
|
||||
IGNY8 Celery Tasks
|
||||
|
||||
This module contains all Celery background tasks for the application.
|
||||
"""
|
||||
|
||||
# Import all task modules to ensure they're registered with Celery
|
||||
from igny8_core.tasks.plan_limits import *
|
||||
from igny8_core.tasks.backup import *
|
||||
from igny8_core.tasks.wordpress_publishing import *
|
||||
from igny8_core.tasks.publishing_scheduler import *
|
||||
361
backend/igny8_core/tasks/publishing_scheduler.py
Normal file
361
backend/igny8_core/tasks/publishing_scheduler.py
Normal file
@@ -0,0 +1,361 @@
|
||||
"""
|
||||
IGNY8 Publishing Scheduler Tasks
|
||||
|
||||
Celery tasks for scheduling and processing automated content publishing:
|
||||
1. schedule_approved_content: Runs hourly - schedules approved content for publishing
|
||||
2. process_scheduled_publications: Runs every 5 minutes - processes scheduled content
|
||||
"""
|
||||
from celery import shared_task
|
||||
from django.utils import timezone
|
||||
from datetime import datetime, timedelta
|
||||
import logging
|
||||
from typing import Dict, Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@shared_task(name='publishing.schedule_approved_content')
|
||||
def schedule_approved_content() -> Dict[str, Any]:
|
||||
"""
|
||||
Hourly task that schedules approved content for publishing based on PublishingSettings.
|
||||
|
||||
For each site with PublishingSettings.auto_publish_enabled:
|
||||
1. Gets all content with status='approved' and site_status='not_published'
|
||||
2. Calculates next available publish slots based on:
|
||||
- publish_days (which days are allowed)
|
||||
- publish_time_slots (which times are allowed)
|
||||
- daily/weekly/monthly limits
|
||||
3. Sets scheduled_publish_at and site_status='scheduled'
|
||||
|
||||
Returns:
|
||||
Dict with scheduling results per site
|
||||
"""
|
||||
from igny8_core.business.content.models import Content
|
||||
from igny8_core.business.integration.models import PublishingSettings
|
||||
from igny8_core.auth.models import Site
|
||||
|
||||
results = {
|
||||
'sites_processed': 0,
|
||||
'content_scheduled': 0,
|
||||
'errors': [],
|
||||
'details': []
|
||||
}
|
||||
|
||||
try:
|
||||
# Get all sites with auto_publish enabled
|
||||
sites_with_settings = PublishingSettings.objects.filter(auto_publish_enabled=True)
|
||||
|
||||
for settings in sites_with_settings:
|
||||
site = settings.site
|
||||
site_result = {
|
||||
'site_id': site.id,
|
||||
'site_name': site.name,
|
||||
'scheduled_count': 0
|
||||
}
|
||||
|
||||
try:
|
||||
# Get approved content that's not yet scheduled
|
||||
pending_content = Content.objects.filter(
|
||||
site=site,
|
||||
status='approved',
|
||||
site_status='not_published',
|
||||
scheduled_publish_at__isnull=True
|
||||
).order_by('created_at')
|
||||
|
||||
if not pending_content.exists():
|
||||
logger.debug(f"Site {site.id}: No pending content to schedule")
|
||||
results['details'].append(site_result)
|
||||
results['sites_processed'] += 1
|
||||
continue
|
||||
|
||||
# Calculate available slots
|
||||
available_slots = _calculate_available_slots(settings, site)
|
||||
|
||||
# Assign slots to content
|
||||
for i, content in enumerate(pending_content):
|
||||
if i >= len(available_slots):
|
||||
logger.info(f"Site {site.id}: No more slots available (limit reached)")
|
||||
break
|
||||
|
||||
scheduled_time = available_slots[i]
|
||||
content.scheduled_publish_at = scheduled_time
|
||||
content.site_status = 'scheduled'
|
||||
content.site_status_updated_at = timezone.now()
|
||||
content.save(update_fields=['scheduled_publish_at', 'site_status', 'site_status_updated_at'])
|
||||
|
||||
site_result['scheduled_count'] += 1
|
||||
results['content_scheduled'] += 1
|
||||
|
||||
logger.info(f"Scheduled content {content.id} for {scheduled_time}")
|
||||
|
||||
results['details'].append(site_result)
|
||||
results['sites_processed'] += 1
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Error processing site {site.id}: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
results['errors'].append(error_msg)
|
||||
|
||||
logger.info(f"Publishing scheduler completed: {results['content_scheduled']} content items scheduled across {results['sites_processed']} sites")
|
||||
return results
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Fatal error in schedule_approved_content: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
results['errors'].append(error_msg)
|
||||
return results
|
||||
|
||||
|
||||
def _calculate_available_slots(settings: 'PublishingSettings', site: 'Site') -> list:
|
||||
"""
|
||||
Calculate available publishing time slots based on settings and limits.
|
||||
|
||||
Args:
|
||||
settings: PublishingSettings instance
|
||||
site: Site instance
|
||||
|
||||
Returns:
|
||||
List of datetime objects representing available slots
|
||||
"""
|
||||
from igny8_core.business.content.models import Content
|
||||
|
||||
now = timezone.now()
|
||||
slots = []
|
||||
|
||||
# Get configured days and times
|
||||
publish_days = settings.publish_days or ['mon', 'tue', 'wed', 'thu', 'fri']
|
||||
publish_times = settings.publish_time_slots or ['09:00', '14:00', '18:00']
|
||||
|
||||
# Day name mapping
|
||||
day_map = {
|
||||
'mon': 0, 'tue': 1, 'wed': 2, 'thu': 3,
|
||||
'fri': 4, 'sat': 5, 'sun': 6
|
||||
}
|
||||
allowed_days = [day_map.get(d.lower(), -1) for d in publish_days]
|
||||
allowed_days = [d for d in allowed_days if d >= 0]
|
||||
|
||||
# Parse time slots
|
||||
time_slots = []
|
||||
for time_str in publish_times:
|
||||
try:
|
||||
hour, minute = map(int, time_str.split(':'))
|
||||
time_slots.append((hour, minute))
|
||||
except (ValueError, AttributeError):
|
||||
continue
|
||||
|
||||
if not time_slots:
|
||||
time_slots = [(9, 0), (14, 0), (18, 0)]
|
||||
|
||||
# Calculate limits
|
||||
daily_limit = settings.daily_publish_limit
|
||||
weekly_limit = settings.weekly_publish_limit
|
||||
monthly_limit = settings.monthly_publish_limit
|
||||
|
||||
# Count existing scheduled/published content
|
||||
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
week_start = today_start - timedelta(days=now.weekday())
|
||||
month_start = today_start.replace(day=1)
|
||||
|
||||
daily_count = Content.objects.filter(
|
||||
site=site,
|
||||
site_status__in=['scheduled', 'publishing', 'published'],
|
||||
scheduled_publish_at__gte=today_start
|
||||
).count()
|
||||
|
||||
weekly_count = Content.objects.filter(
|
||||
site=site,
|
||||
site_status__in=['scheduled', 'publishing', 'published'],
|
||||
scheduled_publish_at__gte=week_start
|
||||
).count()
|
||||
|
||||
monthly_count = Content.objects.filter(
|
||||
site=site,
|
||||
site_status__in=['scheduled', 'publishing', 'published'],
|
||||
scheduled_publish_at__gte=month_start
|
||||
).count()
|
||||
|
||||
# Generate slots for next 30 days
|
||||
current_date = now.date()
|
||||
slots_per_day = {} # Track slots used per day
|
||||
|
||||
for day_offset in range(30):
|
||||
check_date = current_date + timedelta(days=day_offset)
|
||||
|
||||
# Check if day is allowed
|
||||
if check_date.weekday() not in allowed_days:
|
||||
continue
|
||||
|
||||
for hour, minute in time_slots:
|
||||
slot_time = timezone.make_aware(
|
||||
datetime.combine(check_date, datetime.min.time().replace(hour=hour, minute=minute))
|
||||
)
|
||||
|
||||
# Skip if in the past
|
||||
if slot_time <= now:
|
||||
continue
|
||||
|
||||
# Check daily limit
|
||||
day_key = check_date.isoformat()
|
||||
slots_this_day = slots_per_day.get(day_key, 0)
|
||||
if daily_limit and (daily_count + slots_this_day) >= daily_limit:
|
||||
continue
|
||||
|
||||
# Check weekly limit
|
||||
slot_week_start = slot_time - timedelta(days=slot_time.weekday())
|
||||
if slot_week_start.date() == week_start.date():
|
||||
scheduled_in_week = weekly_count + len([s for s in slots if s >= week_start])
|
||||
if weekly_limit and scheduled_in_week >= weekly_limit:
|
||||
continue
|
||||
|
||||
# Check monthly limit
|
||||
if slot_time.month == now.month and slot_time.year == now.year:
|
||||
scheduled_in_month = monthly_count + len([s for s in slots if s.month == now.month])
|
||||
if monthly_limit and scheduled_in_month >= monthly_limit:
|
||||
continue
|
||||
|
||||
slots.append(slot_time)
|
||||
slots_per_day[day_key] = slots_per_day.get(day_key, 0) + 1
|
||||
|
||||
# Limit total slots to prevent memory issues
|
||||
if len(slots) >= 100:
|
||||
return slots
|
||||
|
||||
return slots
|
||||
|
||||
|
||||
@shared_task(name='publishing.process_scheduled_publications')
|
||||
def process_scheduled_publications() -> Dict[str, Any]:
|
||||
"""
|
||||
Every 5 minutes: Process content scheduled for publishing.
|
||||
|
||||
Finds all content where:
|
||||
- site_status = 'scheduled'
|
||||
- scheduled_publish_at <= now
|
||||
|
||||
For each, triggers the WordPress publishing task.
|
||||
|
||||
Returns:
|
||||
Dict with processing results
|
||||
"""
|
||||
from igny8_core.business.content.models import Content
|
||||
from igny8_core.business.integration.models import SiteIntegration
|
||||
from igny8_core.tasks.wordpress_publishing import publish_content_to_wordpress
|
||||
|
||||
results = {
|
||||
'processed': 0,
|
||||
'published': 0,
|
||||
'failed': 0,
|
||||
'errors': []
|
||||
}
|
||||
|
||||
now = timezone.now()
|
||||
|
||||
try:
|
||||
# Get all scheduled content that's due
|
||||
due_content = Content.objects.filter(
|
||||
site_status='scheduled',
|
||||
scheduled_publish_at__lte=now
|
||||
).select_related('site', 'task')
|
||||
|
||||
for content in due_content:
|
||||
results['processed'] += 1
|
||||
|
||||
try:
|
||||
# Update status to publishing
|
||||
content.site_status = 'publishing'
|
||||
content.site_status_updated_at = timezone.now()
|
||||
content.save(update_fields=['site_status', 'site_status_updated_at'])
|
||||
|
||||
# Get site integration
|
||||
site_integration = SiteIntegration.objects.filter(
|
||||
site=content.site,
|
||||
platform='wordpress',
|
||||
is_active=True
|
||||
).first()
|
||||
|
||||
if not site_integration:
|
||||
error_msg = f"No active WordPress integration for site {content.site_id}"
|
||||
logger.error(error_msg)
|
||||
content.site_status = 'failed'
|
||||
content.site_status_updated_at = timezone.now()
|
||||
content.save(update_fields=['site_status', 'site_status_updated_at'])
|
||||
results['failed'] += 1
|
||||
results['errors'].append(error_msg)
|
||||
continue
|
||||
|
||||
# Queue the WordPress publishing task
|
||||
task_id = content.task_id if hasattr(content, 'task') and content.task else None
|
||||
publish_content_to_wordpress.delay(
|
||||
content_id=content.id,
|
||||
site_integration_id=site_integration.id,
|
||||
task_id=task_id
|
||||
)
|
||||
|
||||
logger.info(f"Queued content {content.id} for WordPress publishing")
|
||||
results['published'] += 1
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Error processing content {content.id}: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
content.site_status = 'failed'
|
||||
content.site_status_updated_at = timezone.now()
|
||||
content.save(update_fields=['site_status', 'site_status_updated_at'])
|
||||
results['failed'] += 1
|
||||
results['errors'].append(error_msg)
|
||||
|
||||
logger.info(f"Processing completed: {results['published']}/{results['processed']} published successfully")
|
||||
return results
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Fatal error in process_scheduled_publications: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
results['errors'].append(error_msg)
|
||||
return results
|
||||
|
||||
|
||||
@shared_task(name='publishing.update_content_site_status')
|
||||
def update_content_site_status(content_id: int, new_status: str, external_id: str = None, external_url: str = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Update content site_status after WordPress publishing completes.
|
||||
Called by the WordPress publishing task upon success or failure.
|
||||
|
||||
Args:
|
||||
content_id: Content ID to update
|
||||
new_status: New site_status ('published' or 'failed')
|
||||
external_id: WordPress post ID (on success)
|
||||
external_url: WordPress post URL (on success)
|
||||
|
||||
Returns:
|
||||
Dict with update result
|
||||
"""
|
||||
from igny8_core.business.content.models import Content
|
||||
|
||||
try:
|
||||
content = Content.objects.get(id=content_id)
|
||||
content.site_status = new_status
|
||||
content.site_status_updated_at = timezone.now()
|
||||
|
||||
if external_id:
|
||||
content.external_id = external_id
|
||||
if external_url:
|
||||
content.external_url = external_url
|
||||
|
||||
update_fields = ['site_status', 'site_status_updated_at']
|
||||
if external_id:
|
||||
update_fields.append('external_id')
|
||||
if external_url:
|
||||
update_fields.append('external_url')
|
||||
|
||||
content.save(update_fields=update_fields)
|
||||
|
||||
logger.info(f"Updated content {content_id} site_status to {new_status}")
|
||||
return {'success': True, 'content_id': content_id, 'new_status': new_status}
|
||||
|
||||
except Content.DoesNotExist:
|
||||
error_msg = f"Content {content_id} not found"
|
||||
logger.error(error_msg)
|
||||
return {'success': False, 'error': error_msg}
|
||||
except Exception as e:
|
||||
error_msg = f"Error updating content {content_id}: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
return {'success': False, 'error': error_msg}
|
||||
@@ -354,12 +354,14 @@ def publish_content_to_wordpress(self, content_id: int, site_integration_id: int
|
||||
content.external_id = str(wp_data.get('post_id'))
|
||||
content.external_url = wp_data.get('post_url')
|
||||
content.status = 'published'
|
||||
content.site_status = 'published'
|
||||
content.site_status_updated_at = timezone.now()
|
||||
|
||||
if not hasattr(content, 'metadata') or content.metadata is None:
|
||||
content.metadata = {}
|
||||
content.metadata['wordpress_status'] = wp_status
|
||||
|
||||
content.save(update_fields=['external_id', 'external_url', 'status', 'metadata', 'updated_at'])
|
||||
content.save(update_fields=['external_id', 'external_url', 'status', 'site_status', 'site_status_updated_at', 'metadata', 'updated_at'])
|
||||
|
||||
publish_logger.info(f" {log_prefix} ✅ Content model updated:")
|
||||
publish_logger.info(f" {log_prefix} External ID: {content.external_id}")
|
||||
@@ -425,12 +427,14 @@ def publish_content_to_wordpress(self, content_id: int, site_integration_id: int
|
||||
content.external_id = str(wp_data.get('post_id'))
|
||||
content.external_url = wp_data.get('post_url')
|
||||
content.status = 'published'
|
||||
content.site_status = 'published'
|
||||
content.site_status_updated_at = timezone.now()
|
||||
|
||||
if not hasattr(content, 'metadata') or content.metadata is None:
|
||||
content.metadata = {}
|
||||
content.metadata['wordpress_status'] = wp_status
|
||||
|
||||
content.save(update_fields=['external_id', 'external_url', 'status', 'metadata', 'updated_at'])
|
||||
content.save(update_fields=['external_id', 'external_url', 'status', 'site_status', 'site_status_updated_at', 'metadata', 'updated_at'])
|
||||
|
||||
# Log sync event
|
||||
duration_ms = int((time.time() - start_time) * 1000)
|
||||
@@ -525,6 +529,16 @@ def publish_content_to_wordpress(self, content_id: int, site_integration_id: int
|
||||
publish_logger.error(f" {prefix} Duration: {duration_ms}ms")
|
||||
publish_logger.error("="*80, exc_info=True)
|
||||
|
||||
# Update site_status to failed if content was loaded
|
||||
try:
|
||||
if content and content.id:
|
||||
content.site_status = 'failed'
|
||||
content.site_status_updated_at = timezone.now()
|
||||
content.save(update_fields=['site_status', 'site_status_updated_at'])
|
||||
publish_logger.info(f"Updated content {content.id} site_status to 'failed'")
|
||||
except Exception as update_error:
|
||||
publish_logger.error(f"Failed to update site_status: {str(update_error)}")
|
||||
|
||||
# Try to log sync event
|
||||
try:
|
||||
from igny8_core.business.integration.models import SyncEvent
|
||||
|
||||
Reference in New Issue
Block a user