590 lines
24 KiB
Python
590 lines
24 KiB
Python
"""
|
|
IGNY8 Publishing Scheduler Tasks
|
|
|
|
Celery tasks for scheduling and processing automated content publishing:
|
|
1. schedule_approved_content: Runs hourly - schedules approved content for publishing
|
|
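2. process_scheduled_publications: Runs every 5 minutes - processes scheduled content
3. update_content_site_status: Called after WordPress publishing - records the resulting site_status and post ID/URL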
|
|
"""
|
|
from celery import shared_task
|
|
from django.utils import timezone
|
|
from datetime import datetime, timedelta
|
|
import logging
|
|
from typing import Dict, Any
|
|
|
|
# One logger per module, named after the module, so these task logs can be filtered by name.
logger = logging.getLogger(name=__name__)
|
|
|
|
|
|
@shared_task(name='publishing.schedule_approved_content')
def schedule_approved_content() -> Dict[str, Any]:
    """
    Hourly task that schedules approved content for publishing based on PublishingSettings.

    For each PublishingSettings row with ``auto_publish_enabled=True``:

    1. Collects the site's content with ``status='approved'``,
       ``site_status='not_published'`` and no ``scheduled_publish_at``,
       oldest ``created_at`` first.
    2. In ``'immediate'`` scheduling mode, stamps every item with the current
       time so ``process_scheduled_publications`` picks it up on its next run.
    3. Otherwise asks ``_calculate_available_slots`` for future slots
       (publish days, time slots, daily/weekly/monthly limits) and assigns
       them to the items in order; items beyond the last slot stay unscheduled.
    4. Every scheduled item gets ``site_status='scheduled'``.

    A failure for one site is logged (with traceback) and recorded in
    ``errors``; the remaining sites are still processed.

    Returns:
        Dict with ``sites_processed``, ``content_scheduled``, ``errors`` (list
        of messages) and ``details`` (one ``{site_id, site_name,
        scheduled_count}`` dict per successfully processed site).
    """
    from igny8_core.business.content.models import Content
    from igny8_core.business.integration.models import PublishingSettings

    results = {
        'sites_processed': 0,
        'content_scheduled': 0,
        'errors': [],
        'details': []
    }

    def mark_scheduled(content, publish_at, site_result):
        """Persist *publish_at* as the item's schedule and bump both counters."""
        content.scheduled_publish_at = publish_at
        content.site_status = 'scheduled'
        content.site_status_updated_at = timezone.now()
        content.save(update_fields=['scheduled_publish_at', 'site_status', 'site_status_updated_at'])
        site_result['scheduled_count'] += 1
        results['content_scheduled'] += 1

    try:
        # Get all sites with auto_publish enabled
        sites_with_settings = PublishingSettings.objects.filter(auto_publish_enabled=True)

        for settings in sites_with_settings:
            site = None
            try:
                # Resolved inside the per-site try so that a broken settings row
                # (e.g. a dangling site reference) cannot abort the whole run.
                site = settings.site
                site_result = {
                    'site_id': site.id,
                    'site_name': site.name,
                    'scheduled_count': 0
                }

                # Approved content that has not been scheduled yet
                pending_content = Content.objects.filter(
                    site=site,
                    status='approved',
                    site_status='not_published',
                    scheduled_publish_at__isnull=True
                ).order_by('created_at')

                if not pending_content.exists():
                    logger.debug(f"Site {site.id}: No pending content to schedule")
                elif settings.scheduling_mode == 'immediate':
                    # "Now" makes the item due for process_scheduled_publications.
                    for content in pending_content:
                        mark_scheduled(content, timezone.now(), site_result)
                        logger.info(f"Scheduled content {content.id} for immediate publishing")
                else:
                    available_slots = _calculate_available_slots(settings, site)
                    for i, content in enumerate(pending_content):
                        if i >= len(available_slots):
                            logger.info(f"Site {site.id}: No more slots available (limit reached)")
                            break

                        scheduled_time = available_slots[i]
                        mark_scheduled(content, scheduled_time, site_result)
                        logger.info(f"Scheduled content {content.id} for {scheduled_time}")

                results['details'].append(site_result)
                results['sites_processed'] += 1

            except Exception as e:
                site_ref = site.id if site is not None else getattr(settings, 'site_id', None)
                error_msg = f"Error processing site {site_ref}: {str(e)}"
                logger.error(error_msg, exc_info=True)
                results['errors'].append(error_msg)

        logger.info(f"Publishing scheduler completed: {results['content_scheduled']} content items scheduled across {results['sites_processed']} sites")
        return results

    except Exception as e:
        error_msg = f"Fatal error in schedule_approved_content: {str(e)}"
        logger.error(error_msg, exc_info=True)
        results['errors'].append(error_msg)
        return results
|
|
|
|
|
|
def _calculate_available_slots(settings: 'PublishingSettings', site: 'Site') -> list:
    """
    Calculate available publishing time slots based on settings and limits.

    ``'immediate'`` scheduling mode returns an empty list; immediate content is
    handled directly by ``schedule_approved_content``. Every other mode,
    including ``'stagger'``, is currently routed to
    ``_generate_time_slot_slots``, so content is published at the configured
    times of day.

    Args:
        settings: PublishingSettings instance. Reads ``scheduling_mode``,
            ``publish_days`` (three-letter day names, default Mon-Fri),
            ``daily_capacity``, ``weekly_capacity``, ``monthly_capacity`` and,
            if present, ``queue_limit`` (default 100).
        site: Site instance. Its ``account.account_timezone`` (default
            ``'UTC'``) is the timezone the slots are generated in.

    Returns:
        List of datetime objects representing available slots.
    """
    now = timezone.now()

    # Immediate mode has no slots (content is published as soon as possible)
    if settings.scheduling_mode == 'immediate':
        return []

    publish_days = settings.publish_days or ['mon', 'tue', 'wed', 'thu', 'fri']
    day_map = {
        'mon': 0, 'tue': 1, 'wed': 2, 'thu': 3,
        'fri': 4, 'sat': 5, 'sun': 6
    }
    # Unknown names and non-string entries are ignored instead of raising.
    allowed_days = sorted({
        day_map[day.lower()]
        for day in publish_days
        if isinstance(day, str) and day.lower() in day_map
    })

    daily_limit = settings.daily_capacity
    weekly_limit = settings.weekly_capacity
    monthly_limit = settings.monthly_capacity
    queue_limit = getattr(settings, 'queue_limit', 100) or 100

    # A missing account or an empty timezone both mean UTC.
    account = getattr(site, 'account', None)
    account_timezone = getattr(account, 'account_timezone', None) or 'UTC'

    # Always use time_slots mode for scheduling
    return _generate_time_slot_slots(
        settings, site, now, allowed_days,
        daily_limit, weekly_limit, monthly_limit, queue_limit,
        account_timezone
    )
|
|
|
|
|
|
def _parse_publish_times(publish_times) -> list:
    """
    Parse ``'HH:MM'`` strings into sorted, de-duplicated ``(hour, minute)`` tuples.

    Skips entries that are not strings, do not have exactly two
    ``:``-separated integer parts, or fall outside 00:00-23:59. This way a
    single bad entry cannot make slot generation raise for the whole site.
    """
    parsed = set()
    for time_str in publish_times:
        try:
            hour, minute = map(int, time_str.split(':'))
        except (ValueError, AttributeError):
            continue
        if 0 <= hour <= 23 and 0 <= minute <= 59:
            parsed.add((hour, minute))
    return sorted(parsed)


def _generate_time_slot_slots(
    settings, site, now, allowed_days,
    daily_limit, weekly_limit, monthly_limit, queue_limit,
    account_timezone: str
) -> list:
    """
    Generate publish slots at the configured times of day (time_slots mode).

    Walks up to 90 days ahead, starting from the account's local "today". For
    every allowed weekday it proposes each configured ``HH:MM`` time in the
    account timezone. A candidate is dropped when:

    - it is not after ``now``;
    - an active item of the site is already scheduled at that exact instant; or
    - it would exceed the daily/weekly/monthly limit (a falsy limit means
      unlimited).

    Day, week and month boundaries are computed in the account timezone so
    that they agree with the slots themselves.

    Args:
        settings: PublishingSettings-like object; ``publish_time_slots`` is read
            (default 09:00, 14:00, 18:00).
        site: Site whose scheduled/publishing/published content counts against
            the limits and blocks identical instants.
        now: Current aware datetime.
        allowed_days: Weekday numbers (Monday=0) on which publishing is allowed.
        daily_limit, weekly_limit, monthly_limit: Capacity caps; falsy = unlimited.
        queue_limit: Maximum number of slots to return.
        account_timezone: IANA timezone name; empty or unknown falls back to UTC.

    Returns:
        Chronologically ordered list of aware datetimes, at most ``queue_limit`` long.
    """
    from zoneinfo import ZoneInfo, ZoneInfoNotFoundError

    from igny8_core.business.content.models import Content

    active_statuses = ['scheduled', 'publishing', 'published']

    time_slots = _parse_publish_times(
        settings.publish_time_slots or ['09:00', '14:00', '18:00']
    )
    if not time_slots:
        time_slots = [(9, 0), (14, 0), (18, 0)]

    try:
        tzinfo = ZoneInfo(account_timezone or 'UTC')
    except (ZoneInfoNotFoundError, ValueError, TypeError):
        logger.warning(f"Site {site.id}: unknown timezone {account_timezone!r}, falling back to UTC")
        tzinfo = ZoneInfo('UTC')

    slots = []
    # Nothing is written while slots are generated, so the existing count of
    # each [start, end) window only has to be queried once.
    window_counts = {}

    def existing_count(start, end):
        """Number of the site's active items scheduled in [start, end)."""
        key = (start, end)
        if key not in window_counts:
            window_counts[key] = Content.objects.filter(
                site=site,
                site_status__in=active_statuses,
                scheduled_publish_at__gte=start,
                scheduled_publish_at__lt=end
            ).count()
        return window_counts[key]

    def new_slots_between(start, end):
        """Number of slots generated so far that fall in [start, end)."""
        return sum(1 for slot in slots if start <= slot < end)

    # "Today" must be the account's local date, otherwise accounts west of UTC
    # lose their remaining same-day slots in the evening.
    local_today = now.astimezone(tzinfo).date()

    for day_offset in range(90):  # Look 90 days ahead
        check_date = local_today + timedelta(days=day_offset)
        if check_date.weekday() not in allowed_days:
            continue

        day_start = datetime(check_date.year, check_date.month, check_date.day, tzinfo=tzinfo)
        day_end = day_start + timedelta(days=1)
        week_start = day_start - timedelta(days=check_date.weekday())
        week_end = week_start + timedelta(days=7)
        month_start = day_start.replace(day=1)
        if month_start.month == 12:
            next_month_start = month_start.replace(year=month_start.year + 1, month=1)
        else:
            next_month_start = month_start.replace(month=month_start.month + 1)

        # Instants already taken on this local day
        occupied = set(
            Content.objects.filter(
                site=site,
                site_status__in=active_statuses,
                scheduled_publish_at__gte=day_start,
                scheduled_publish_at__lt=day_end
            ).values_list('scheduled_publish_at', flat=True)
        )

        new_today = 0
        for hour, minute in time_slots:
            slot_time = day_start.replace(hour=hour, minute=minute)
            if slot_time <= now or slot_time in occupied:
                continue

            # Counts only grow as slots are added, so once a limit is hit no
            # later slot of this day can fit either.
            if daily_limit and existing_count(day_start, day_end) + new_today >= daily_limit:
                break
            if weekly_limit and (
                existing_count(week_start, week_end) + new_slots_between(week_start, week_end)
            ) >= weekly_limit:
                break
            if monthly_limit and (
                existing_count(month_start, next_month_start)
                + new_slots_between(month_start, next_month_start)
            ) >= monthly_limit:
                break

            slots.append(slot_time)
            new_today += 1

            # Respect queue limit
            if len(slots) >= queue_limit:
                return slots

    return slots
|
|
|
|
|
|
def _generate_stagger_slots(
    settings, site, now, allowed_days,
    daily_limit, weekly_limit, monthly_limit, queue_limit,
    account_timezone: str = None
) -> list:
    """
    Generate slots spread evenly throughout each day's publishing window.

    On every allowed weekday within the next 90 days, a slot is proposed every
    ``stagger_interval_minutes`` (default 30). Slots start at
    ``stagger_start_time`` (default 09:00) and run up to and including
    ``stagger_end_time`` (default 18:00). For today, slots start at the first
    interval strictly after ``now``.

    Instants already taken by the site's active content are skipped. The
    daily/weekly/monthly limits (falsy = unlimited) apply on calendar
    boundaries in the scheduling timezone.

    NOTE: ``_calculate_available_slots`` currently always uses
    ``_generate_time_slot_slots``, so nothing visible here calls this function.

    Args:
        settings: PublishingSettings-like object with the optional ``stagger_*``
            attributes.
        site: Site whose scheduled/publishing/published content counts against
            the limits.
        now: Current aware datetime.
        allowed_days: Weekday numbers (Monday=0) on which publishing is allowed.
        daily_limit, weekly_limit, monthly_limit: Capacity caps; falsy = unlimited.
        queue_limit: Maximum number of slots to return.
        account_timezone: Optional IANA timezone name for the window and the
            day boundaries. When omitted, Django's current timezone is used;
            an unknown name falls back to UTC.

    Returns:
        Chronologically ordered list of aware datetimes.
    """
    from igny8_core.business.content.models import Content

    active_statuses = ['scheduled', 'publishing', 'published']

    window_open = getattr(settings, 'stagger_start_time', None)
    window_close = getattr(settings, 'stagger_end_time', None)
    start_hour, start_minute = (window_open.hour, window_open.minute) if window_open else (9, 0)
    end_hour, end_minute = (window_close.hour, window_close.minute) if window_close else (18, 0)

    interval_minutes = getattr(settings, 'stagger_interval_minutes', 30) or 30
    if interval_minutes < 0:
        # A negative step would walk backwards and never leave the window.
        interval_minutes = 30
    interval = timedelta(minutes=interval_minutes)

    if account_timezone:
        from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
        try:
            tzinfo = ZoneInfo(account_timezone)
        except (ZoneInfoNotFoundError, ValueError, TypeError):
            logger.warning(f"Site {site.id}: unknown timezone {account_timezone!r}, falling back to UTC")
            tzinfo = ZoneInfo('UTC')
    else:
        tzinfo = timezone.get_current_timezone()

    def local_dt(day, hour=0, minute=0):
        """Aware datetime for *day* at hour:minute wall-clock time in ``tzinfo``."""
        return timezone.make_aware(
            datetime.combine(day, datetime.min.time().replace(hour=hour, minute=minute)),
            tzinfo
        )

    slots = []
    # Nothing is written while slots are generated, so the existing count of
    # each [start, end) window only has to be queried once.
    window_counts = {}

    def existing_count(start, end):
        """Number of the site's active items scheduled in [start, end)."""
        key = (start, end)
        if key not in window_counts:
            window_counts[key] = Content.objects.filter(
                site=site,
                site_status__in=active_statuses,
                scheduled_publish_at__gte=start,
                scheduled_publish_at__lt=end
            ).count()
        return window_counts[key]

    def new_slots_between(start, end):
        """Number of slots generated so far that fall in [start, end)."""
        return sum(1 for slot in slots if start <= slot < end)

    local_today = now.astimezone(tzinfo).date()

    for day_offset in range(90):  # Look 90 days ahead
        check_date = local_today + timedelta(days=day_offset)
        if check_date.weekday() not in allowed_days:
            continue

        # Calendar boundaries used for the capacity limits
        day_begin = local_dt(check_date)
        day_finish = local_dt(check_date + timedelta(days=1))
        week_monday = check_date - timedelta(days=check_date.weekday())
        week_start = local_dt(week_monday)
        week_end = local_dt(week_monday + timedelta(days=7))
        first_of_month = check_date.replace(day=1)
        if first_of_month.month == 12:
            first_of_next = first_of_month.replace(year=first_of_month.year + 1, month=1)
        else:
            first_of_next = first_of_month.replace(month=first_of_month.month + 1)
        month_start = local_dt(first_of_month)
        next_month_start = local_dt(first_of_next)

        # The publishing window is kept separate from the calendar-day bounds,
        # so the configured start/end times actually bound the slots.
        window_start = local_dt(check_date, start_hour, start_minute)
        window_end = local_dt(check_date, end_hour, end_minute)

        occupied = set(
            Content.objects.filter(
                site=site,
                site_status__in=active_statuses,
                scheduled_publish_at__gte=day_begin,
                scheduled_publish_at__lt=day_finish
            ).values_list('scheduled_publish_at', flat=True)
        )

        current_slot = window_start
        if check_date == local_today and now > window_start:
            # Start from next interval after now
            minutes_since_start = (now - window_start).total_seconds() / 60
            intervals_passed = int(minutes_since_start / interval_minutes) + 1
            current_slot = window_start + timedelta(minutes=intervals_passed * interval_minutes)

        new_today = 0
        while current_slot <= window_end:
            # Counts only grow as slots are added, so once a limit is hit no
            # later slot of this day can fit either.
            if daily_limit and existing_count(day_begin, day_finish) + new_today >= daily_limit:
                break
            if weekly_limit and (
                existing_count(week_start, week_end) + new_slots_between(week_start, week_end)
            ) >= weekly_limit:
                break
            if monthly_limit and (
                existing_count(month_start, next_month_start)
                + new_slots_between(month_start, next_month_start)
            ) >= monthly_limit:
                break

            if current_slot not in occupied:
                slots.append(current_slot)
                new_today += 1
                # Respect queue limit
                if len(slots) >= queue_limit:
                    return slots

            current_slot += interval

    return slots
|
|
|
|
|
|
def _mark_publication_failed(content, error_msg: str, results: Dict[str, Any]) -> None:
    """
    Persist ``site_status='failed'`` on *content* and record *error_msg* in *results*.

    The row is saved before the counters change. If the save raises,
    *results* is left untouched so the caller's handler can account for it.
    """
    content.site_status = 'failed'
    content.site_status_updated_at = timezone.now()
    content.save(update_fields=['site_status', 'site_status_updated_at'])
    results['failed'] += 1
    results['errors'].append(error_msg)


@shared_task(name='publishing.process_scheduled_publications')
def process_scheduled_publications() -> Dict[str, Any]:
    """
    Every 5 minutes: Process content scheduled for publishing.

    Finds all content where:
    - site_status = 'scheduled'
    - scheduled_publish_at <= now
    and handles it oldest scheduled_publish_at first.

    Each item is validated first: it must have a site, and Site.wp_api_key and
    Site.domain must be set. These are read directly from the Site; no
    SiteIntegration is needed. The item is then published to WordPress via
    PublisherService.

    Before publishing, the row is re-read under a row lock. It is moved to
    site_status='publishing' only if it is still 'scheduled' and still due.
    This keeps overlapping runs of this task, or a reschedule made in the
    meantime, from publishing the same content twice.

    An item is marked site_status='failed' if it fails validation, if its
    publish result is not successful, or if it raises. On success this task
    only counts the item. NOTE(review): presumably PublisherService or
    update_content_site_status records the final 'published' status; confirm.

    Returns:
        Dict with 'processed', 'published' and 'failed' counts and an 'errors' list.
    """
    from django.db import transaction

    from igny8_core.business.content.models import Content
    from igny8_core.business.publishing.services.publisher_service import PublisherService

    results = {
        'processed': 0,
        'published': 0,
        'failed': 0,
        'errors': []
    }

    now = timezone.now()
    publisher_service = PublisherService()

    try:
        # Materialized once, so the logged count and the processed items agree
        due_content = list(
            Content.objects.filter(
                site_status='scheduled',
                scheduled_publish_at__lte=now
            )
            .select_related('site', 'sector', 'cluster', 'account')
            .order_by('scheduled_publish_at')
        )

        logger.info(f"[process_scheduled_publications] Found {len(due_content)} content items due for publishing")

        for content in due_content:
            results['processed'] += 1

            try:
                # Validate prerequisites (site + WordPress configuration on Site)
                site = content.site
                if not site:
                    error_msg = f"Content {content.id} has no site assigned"
                elif not site.wp_api_key:
                    error_msg = f"Site '{site.name}' (ID: {site.id}) has no WordPress API key configured"
                elif not site.domain:
                    error_msg = f"Site '{site.name}' (ID: {site.id}) has no domain configured"
                else:
                    error_msg = None

                if error_msg is not None:
                    logger.error(f"[process_scheduled_publications] {error_msg}")
                    _mark_publication_failed(content, error_msg, results)
                    continue

                # Claim the row: only one run may move it from 'scheduled' to
                # 'publishing'. Another run (or a reschedule) that got there
                # first makes this filter match nothing.
                with transaction.atomic():
                    claimed = (
                        Content.objects.select_for_update()
                        .filter(pk=content.pk, site_status='scheduled', scheduled_publish_at__lte=now)
                        .values_list('pk', flat=True)
                        .first()
                    ) is not None
                    if claimed:
                        content.site_status = 'publishing'
                        content.site_status_updated_at = timezone.now()
                        content.save(update_fields=['site_status', 'site_status_updated_at'])

                if not claimed:
                    logger.info(f"[process_scheduled_publications] Skipping content {content.id}: no longer scheduled and due")
                    continue

                # Publish via PublisherService (current system)
                logger.info(f"[process_scheduled_publications] Publishing content {content.id} '{content.title}' to {site.domain}")
                publish_result = publisher_service.publish_content(
                    content_id=content.id,
                    destinations=['wordpress'],
                    account=content.account
                )

                if publish_result.get('success'):
                    logger.info(f"[process_scheduled_publications] ✅ Successfully published content {content.id}")
                    results['published'] += 1
                else:
                    error_msg = f"Publishing failed for content {content.id}: {publish_result.get('error', 'Unknown error')}"
                    logger.error(f"[process_scheduled_publications] ❌ {error_msg}")
                    _mark_publication_failed(content, error_msg, results)

            except Exception as e:
                error_msg = f"Error processing content {content.id}: {str(e)}"
                logger.error(f"[process_scheduled_publications] ❌ {error_msg}", exc_info=True)
                try:
                    _mark_publication_failed(content, error_msg, results)
                except Exception:
                    # The failure could not be persisted. Still report it and
                    # keep going, so one bad row does not abort the batch.
                    logger.exception(f"[process_scheduled_publications] Could not mark content {content.id} as failed")
                    results['failed'] += 1
                    results['errors'].append(error_msg)

        logger.info(f"[process_scheduled_publications] ✅ Completed: {results['published']}/{results['processed']} published successfully, {results['failed']} failed")
        return results

    except Exception as e:
        error_msg = f"Fatal error in process_scheduled_publications: {str(e)}"
        logger.error(f"[process_scheduled_publications] ❌ {error_msg}", exc_info=True)
        results['errors'].append(error_msg)
        return results
|
|
|
|
|
|
@shared_task(name='publishing.update_content_site_status')
def update_content_site_status(content_id: int, new_status: str, external_id: str = None, external_url: str = None) -> Dict[str, Any]:
    """
    Update content site_status after WordPress publishing completes.

    Called by the WordPress publishing task upon success or failure. Always
    writes ``site_status`` and ``site_status_updated_at``. ``external_id`` and
    ``external_url`` are written only when a truthy value is passed, so a
    failure callback does not clear a previously stored post ID/URL.
    ``new_status`` is stored as given and is not validated here.

    Args:
        content_id: Content ID to update
        new_status: New site_status ('published' or 'failed')
        external_id: WordPress post ID (on success)
        external_url: WordPress post URL (on success)

    Returns:
        Dict with ``success`` plus either ``content_id``/``new_status`` or ``error``.
    """
    from igny8_core.business.content.models import Content

    try:
        content = Content.objects.get(id=content_id)
        content.site_status = new_status
        content.site_status_updated_at = timezone.now()
        update_fields = ['site_status', 'site_status_updated_at']

        if external_id:
            content.external_id = external_id
            update_fields.append('external_id')
        if external_url:
            content.external_url = external_url
            update_fields.append('external_url')

        content.save(update_fields=update_fields)

        logger.info(f"Updated content {content_id} site_status to {new_status}")
        return {'success': True, 'content_id': content_id, 'new_status': new_status}

    except Content.DoesNotExist:
        error_msg = f"Content {content_id} not found"
        logger.error(error_msg)
        return {'success': False, 'error': error_msg}
    except Exception as e:
        error_msg = f"Error updating content {content_id}: {str(e)}"
        # Unexpected failure: keep the traceback for debugging.
        logger.error(error_msg, exc_info=True)
        return {'success': False, 'error': error_msg}
|