"""
IGNY8 Publishing Scheduler Tasks

Celery tasks for scheduling and processing automated content publishing:
1. schedule_approved_content: Runs hourly - schedules approved content for publishing
2. process_scheduled_publications: Runs every 5 minutes - processes scheduled content
3. update_content_site_status: Records the outcome of a publishing attempt
"""
from celery import shared_task
from django.utils import timezone
from collections import Counter
from datetime import datetime, timedelta
import logging
from typing import Any, Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)

# site_status values that count against the daily/weekly/monthly publish limits.
_COUNTED_STATUSES = ('scheduled', 'publishing', 'published')

# Maps the day abbreviations stored in PublishingSettings.publish_days to
# date.weekday() indices (Monday == 0).
_DAY_INDEX = {
    'mon': 0, 'tue': 1, 'wed': 2, 'thu': 3,
    'fri': 4, 'sat': 5, 'sun': 6,
}

# Fallbacks used when the settings do not provide usable values.
_DEFAULT_PUBLISH_DAYS = ['mon', 'tue', 'wed', 'thu', 'fri']
_DEFAULT_PUBLISH_TIMES = ['09:00', '14:00', '18:00']
_DEFAULT_TIME_SLOTS = [(9, 0), (14, 0), (18, 0)]

# How far ahead slots are generated, and the hard cap on slots per call.
_SCHEDULING_HORIZON_DAYS = 30
_MAX_SLOTS = 100


@shared_task(name='publishing.schedule_approved_content')
def schedule_approved_content() -> Dict[str, Any]:
    """
    Hourly task that schedules approved content for publishing based on PublishingSettings.

    For each site with PublishingSettings.auto_publish_enabled:
    1. Gets all content with status='approved', site_status='not_published'
       and no scheduled_publish_at yet, oldest first
    2. Calculates the next available publish slots (see
       _calculate_available_slots) based on:
       - publish_days (which days are allowed)
       - publish_time_slots (which times are allowed)
       - daily/weekly/monthly limits
    3. Sets scheduled_publish_at and site_status='scheduled'

    A failure while processing one site is recorded in ``errors`` and does not
    stop the remaining sites.

    Returns:
        Dict with keys 'sites_processed', 'content_scheduled', 'errors'
        (list of messages) and 'details' (one dict per processed site).
    """
    from igny8_core.business.content.models import Content
    from igny8_core.business.integration.models import PublishingSettings

    results = {
        'sites_processed': 0,
        'content_scheduled': 0,
        'errors': [],
        'details': []
    }

    try:
        # Get all sites with auto_publish enabled
        sites_with_settings = PublishingSettings.objects.filter(auto_publish_enabled=True)

        for publishing_settings in sites_with_settings:
            site = publishing_settings.site
            site_result = {
                'site_id': site.id,
                'site_name': site.name,
                'scheduled_count': 0
            }

            try:
                # Get approved content that's not yet scheduled
                pending_content = Content.objects.filter(
                    site=site,
                    status='approved',
                    site_status='not_published',
                    scheduled_publish_at__isnull=True
                ).order_by('created_at')

                pending_total = pending_content.count()
                if not pending_total:
                    logger.debug(f"Site {site.id}: No pending content to schedule")
                    results['details'].append(site_result)
                    results['sites_processed'] += 1
                    continue

                # Calculate available slots
                available_slots = _calculate_available_slots(publishing_settings, site)

                # Only fetch as many rows as there are slots to hand out; the
                # rest stay pending until a later run.
                if available_slots:
                    to_schedule = pending_content[:len(available_slots)]
                    for content, scheduled_time in zip(to_schedule, available_slots):
                        content.scheduled_publish_at = scheduled_time
                        content.site_status = 'scheduled'
                        content.site_status_updated_at = timezone.now()
                        content.save(update_fields=[
                            'scheduled_publish_at', 'site_status', 'site_status_updated_at'
                        ])

                        site_result['scheduled_count'] += 1
                        results['content_scheduled'] += 1

                        logger.info(f"Scheduled content {content.id} for {scheduled_time}")

                if pending_total > len(available_slots):
                    logger.info(f"Site {site.id}: No more slots available (limit reached)")

                results['details'].append(site_result)
                results['sites_processed'] += 1

            except Exception as e:
                error_msg = f"Error processing site {site.id}: {str(e)}"
                logger.error(error_msg, exc_info=True)
                results['errors'].append(error_msg)

        logger.info(
            f"Publishing scheduler completed: {results['content_scheduled']} "
            f"content items scheduled across {results['sites_processed']} sites"
        )
        return results

    except Exception as e:
        error_msg = f"Fatal error in schedule_approved_content: {str(e)}"
        logger.error(error_msg, exc_info=True)
        results['errors'].append(error_msg)
        return results


def _parse_time_slots(publish_times) -> List[Tuple[int, int]]:
    """
    Convert 'HH:MM' strings into sorted, de-duplicated (hour, minute) tuples.

    Entries that are not strings, are not exactly 'HH:MM', or are outside
    00:00-23:59 are ignored. Falls back to the default slots when nothing
    usable remains.
    """
    parsed = set()
    for time_str in publish_times:
        try:
            hour, minute = map(int, time_str.split(':'))
        except (ValueError, AttributeError):
            continue
        # Reject values such as '25:00' here; otherwise building the datetime
        # later raises and the whole site fails to schedule.
        if 0 <= hour <= 23 and 0 <= minute <= 59:
            parsed.add((hour, minute))

    # Sorted so that earlier content is always assigned the earlier slot.
    return sorted(parsed) or list(_DEFAULT_TIME_SLOTS)


def _calculate_available_slots(settings: 'PublishingSettings', site: 'Site') -> list:
    """
    Calculate available publishing time slots based on settings and limits.

    Slots are generated for the next 30 days on the allowed weekdays and
    times. Content already in a scheduled/publishing/published state is
    counted per calendar day, per week (Monday start) and per calendar month,
    and each limit is enforced for the period a slot falls in. A limit that is
    None or 0 is treated as "no limit".

    Calendar boundaries are evaluated in the current Django time zone, which
    is also the zone the configured 'HH:MM' times are interpreted in.

    NOTE(review): slot times already occupied by scheduled content are not
    excluded, so two items can share a timestamp when the limits allow it.

    Args:
        settings: PublishingSettings instance
        site: Site instance

    Returns:
        List of timezone-aware datetime objects in chronological order,
        capped at 100 entries.
    """
    from igny8_core.business.content.models import Content

    now = timezone.now()
    today = timezone.localtime(now).date()

    # Get configured days and times
    publish_days = settings.publish_days or _DEFAULT_PUBLISH_DAYS
    allowed_days = {
        _DAY_INDEX[day.lower()]
        for day in publish_days
        if isinstance(day, str) and day.lower() in _DAY_INDEX
    }
    time_slots = _parse_time_slots(settings.publish_time_slots or _DEFAULT_PUBLISH_TIMES)

    daily_limit = settings.daily_publish_limit
    weekly_limit = settings.weekly_publish_limit
    monthly_limit = settings.monthly_publish_limit

    # Fetch existing timestamps from the start of the current week or month
    # (whichever is earlier). The extra day of margin makes the lower bound
    # safely inclusive; older rows land in buckets that are never consulted.
    lookback_days = max(today.weekday(), today.day - 1) + 1
    existing_times = Content.objects.filter(
        site=site,
        site_status__in=_COUNTED_STATUSES,
        scheduled_publish_at__gte=now - timedelta(days=lookback_days)
    ).values_list('scheduled_publish_at', flat=True)

    # Usage per period, keyed by local date / Monday of the week / (year, month).
    used_per_day = Counter()
    used_per_week = Counter()
    used_per_month = Counter()
    for scheduled_at in existing_times:
        local_date = timezone.localtime(scheduled_at).date()
        used_per_day[local_date] += 1
        used_per_week[local_date - timedelta(days=local_date.weekday())] += 1
        used_per_month[(local_date.year, local_date.month)] += 1

    slots = []
    for day_offset in range(_SCHEDULING_HORIZON_DAYS):
        check_date = today + timedelta(days=day_offset)

        # Check if day is allowed
        if check_date.weekday() not in allowed_days:
            continue

        week_key = check_date - timedelta(days=check_date.weekday())
        month_key = (check_date.year, check_date.month)

        for hour, minute in time_slots:
            slot_time = timezone.make_aware(
                datetime.combine(check_date, datetime.min.time().replace(hour=hour, minute=minute))
            )

            # Skip if in the past
            if slot_time <= now:
                continue

            # Every remaining slot today shares the same day/week/month, so
            # once any limit is hit the rest of the day can be skipped.
            if daily_limit and used_per_day[check_date] >= daily_limit:
                break
            if weekly_limit and used_per_week[week_key] >= weekly_limit:
                break
            if monthly_limit and used_per_month[month_key] >= monthly_limit:
                break

            slots.append(slot_time)
            used_per_day[check_date] += 1
            used_per_week[week_key] += 1
            used_per_month[month_key] += 1

            # Limit total slots to prevent memory issues
            if len(slots) >= _MAX_SLOTS:
                return slots

    return slots


def _set_site_status(content, status: str) -> None:
    """Persist a new site_status (and its timestamp) on a Content row."""
    content.site_status = status
    content.site_status_updated_at = timezone.now()
    content.save(update_fields=['site_status', 'site_status_updated_at'])


def _record_failure(content, error_msg: str, results: Dict[str, Any]) -> None:
    """Mark content as failed and add the failure to the task results."""
    _set_site_status(content, 'failed')
    results['failed'] += 1
    results['errors'].append(error_msg)


def _publish_blocker(content) -> Optional[str]:
    """Return why content cannot be published to WordPress, or None if it can."""
    site = content.site
    if not site:
        return f"Content {content.id} has no site assigned"
    # Check WordPress configuration on Site
    if not site.wp_api_key:
        return f"Site '{site.name}' (ID: {site.id}) has no WordPress API key configured"
    if not site.domain:
        return f"Site '{site.name}' (ID: {site.id}) has no domain configured"
    return None


@shared_task(name='publishing.process_scheduled_publications')
def process_scheduled_publications() -> Dict[str, Any]:
    """
    Every 5 minutes: Process content scheduled for publishing.

    Finds all content where:
    - site_status = 'scheduled'
    - scheduled_publish_at <= now

    For each, triggers publishing via PublisherService (current system).
    Uses Site.wp_api_key directly (no SiteIntegration needed).

    Content that cannot be published (missing site, API key or domain, a
    failed publish result, or an unexpected exception) is set to
    site_status='failed'. One item failing never stops the remaining items.

    Returns:
        Dict with keys 'processed', 'published', 'failed' (counts) and
        'errors' (list of messages).
    """
    from igny8_core.business.content.models import Content
    from igny8_core.business.publishing.services.publisher_service import PublisherService

    results = {
        'processed': 0,
        'published': 0,
        'failed': 0,
        'errors': []
    }

    now = timezone.now()
    publisher_service = PublisherService()

    try:
        # Get all scheduled content that's due. Materialized once so the
        # logged count and the processed set cannot disagree.
        due_content = list(
            Content.objects.filter(
                site_status='scheduled',
                scheduled_publish_at__lte=now
            ).select_related('site', 'sector', 'cluster', 'account')
        )

        logger.info(f"[process_scheduled_publications] Found {len(due_content)} content items due for publishing")

        for content in due_content:
            results['processed'] += 1

            try:
                # Validate prerequisites
                blocker = _publish_blocker(content)
                if blocker:
                    logger.error(f"[process_scheduled_publications] {blocker}")
                    _record_failure(content, blocker, results)
                    continue

                # Update status to publishing
                _set_site_status(content, 'publishing')

                # Publish via PublisherService (current system)
                logger.info(f"[process_scheduled_publications] Publishing content {content.id} '{content.title}' to {content.site.domain}")
                publish_result = publisher_service.publish_content(
                    content_id=content.id,
                    destinations=['wordpress'],
                    account=content.account
                )

                if publish_result.get('success'):
                    logger.info(f"[process_scheduled_publications] ✅ Successfully published content {content.id}")
                    results['published'] += 1
                else:
                    error_msg = f"Publishing failed for content {content.id}: {publish_result.get('error', 'Unknown error')}"
                    logger.error(f"[process_scheduled_publications] ❌ {error_msg}")
                    _record_failure(content, error_msg, results)

            except Exception as e:
                error_msg = f"Error processing content {content.id}: {str(e)}"
                logger.error(f"[process_scheduled_publications] ❌ {error_msg}", exc_info=True)
                # The save itself may fail (e.g. the database error that got
                # us here); do not let that abort the remaining items.
                try:
                    _set_site_status(content, 'failed')
                except Exception:
                    logger.error(
                        f"[process_scheduled_publications] ❌ Could not mark content {content.id} as failed",
                        exc_info=True
                    )
                results['failed'] += 1
                results['errors'].append(error_msg)

        logger.info(f"[process_scheduled_publications] ✅ Completed: {results['published']}/{results['processed']} published successfully, {results['failed']} failed")
        return results

    except Exception as e:
        error_msg = f"Fatal error in process_scheduled_publications: {str(e)}"
        logger.error(f"[process_scheduled_publications] ❌ {error_msg}", exc_info=True)
        results['errors'].append(error_msg)
        return results


@shared_task(name='publishing.update_content_site_status')
def update_content_site_status(
    content_id: int,
    new_status: str,
    external_id: Optional[str] = None,
    external_url: Optional[str] = None
) -> Dict[str, Any]:
    """
    Update content site_status after WordPress publishing completes.

    Called by the WordPress publishing task upon success or failure.
    external_id / external_url are only written when a truthy value is given,
    so existing values are never cleared by this task.

    Args:
        content_id: Content ID to update
        new_status: New site_status ('published' or 'failed')
        external_id: WordPress post ID (on success)
        external_url: WordPress post URL (on success)

    Returns:
        {'success': True, 'content_id': ..., 'new_status': ...} on success,
        otherwise {'success': False, 'error': <message>}.
    """
    from igny8_core.business.content.models import Content

    try:
        content = Content.objects.get(id=content_id)
        content.site_status = new_status
        content.site_status_updated_at = timezone.now()
        update_fields = ['site_status', 'site_status_updated_at']

        if external_id:
            content.external_id = external_id
            update_fields.append('external_id')
        if external_url:
            content.external_url = external_url
            update_fields.append('external_url')

        content.save(update_fields=update_fields)

        logger.info(f"Updated content {content_id} site_status to {new_status}")
        return {'success': True, 'content_id': content_id, 'new_status': new_status}

    except Content.DoesNotExist:
        error_msg = f"Content {content_id} not found"
        logger.error(error_msg)
        return {'success': False, 'error': error_msg}
    except Exception as e:
        error_msg = f"Error updating content {content_id}: {str(e)}"
        logger.error(error_msg, exc_info=True)
        return {'success': False, 'error': error_msg}