"""
Automation Celery Tasks
Background tasks for automation pipeline
"""
from celery import shared_task, chain
from celery.utils.log import get_task_logger
from datetime import datetime, timedelta
from django.utils import timezone

from igny8_core.business.automation.models import AutomationConfig, AutomationRun
from igny8_core.business.automation.services import AutomationService

logger = get_task_logger(__name__)

# Number of pipeline stages (service.run_stage_1 .. service.run_stage_7).
_STAGE_COUNT = 7
# Run statuses that block starting another run for the same site.
_IN_PROGRESS_STATUSES = ['running', 'paused']
# Run statuses that stop the pipeline between two stages.
_STOP_STATUSES = ['paused', 'cancelled']
# A scheduled config is not started again within this window of its last run.
_DUPLICATE_RUN_WINDOW = timedelta(hours=23)
# Lifetime of the per-site automation lock in the cache (6 hours).
_LOCK_TIMEOUT_SECONDS = 21600


def _is_due(frequency: str, scheduled_hour: int, now: datetime) -> bool:
    """Return True when a config with this frequency/hour should run at ``now``.

    daily   -> every day when the hour matches
    weekly  -> Mondays when the hour matches
    monthly -> the 1st of the month when the hour matches
    Any other frequency value never matches.
    """
    if scheduled_hour != now.hour:
        return False
    if frequency == 'daily':
        return True
    if frequency == 'weekly':
        return now.weekday() == 0
    if frequency == 'monthly':
        return now.day == 1
    return False


def _has_run_in_progress(site) -> bool:
    """Return True if the site already has a running or paused AutomationRun."""
    return AutomationRun.objects.filter(
        site=site, status__in=_IN_PROGRESS_STATUSES
    ).exists()


def _run_stages(service, config, first_stage: int, label: str) -> bool:
    """Run enabled stages ``first_stage``..7 in order.

    After every executed stage the run row is reloaded from the database so
    that a status change saved elsewhere (pause/cancel) is observed before the
    next stage starts.

    Args:
        service: AutomationService exposing ``run_stage_<n>()`` and ``run``.
        config: object exposing ``stage_<n>_enabled`` flags.
        first_stage: 1-based number of the first stage to consider.
        label: prefix used in the "... after stage N" log line
            ("Automation" or "Resumed automation").

    Returns:
        True if the loop reached the end, False if it stopped because the run
        became paused or cancelled.
    """
    for stage_number in range(first_stage, _STAGE_COUNT + 1):
        if not getattr(config, f'stage_{stage_number}_enabled'):
            logger.info(f"[AutomationTask] Stage {stage_number} is disabled, skipping")
            continue

        getattr(service, f'run_stage_{stage_number}')()

        service.run.refresh_from_db()
        if service.run.status in _STOP_STATUSES:
            logger.info(f"[AutomationTask] {label} {service.run.status} after stage {stage_number}")
            return False
    return True


def _fail_run_and_release_lock(run_id: str, error: Exception) -> None:
    """Mark the run as failed with ``error`` as message and delete its site lock.

    Raises whatever the lookup/save/cache calls raise; callers decide whether
    a cleanup failure may mask the original error.
    """
    from django.core.cache import cache

    run = AutomationRun.objects.get(run_id=run_id)
    run.status = 'failed'
    run.error_message = str(error)
    run.completed_at = timezone.now()
    run.save()
    cache.delete(f'automation_lock_{run.site.id}')


@shared_task(name='automation.check_scheduled_automations')
def check_scheduled_automations():
    """
    Check for scheduled automation runs (runs every hour at :05)

    SIMPLIFIED LOGIC:
    - User selects hour only (0-23), stored as HH:00
    - Celery runs at :05 of every hour
    - Match: scheduled_hour == current_hour

    A matching config is skipped when it ran less than 23 hours ago or when
    the site already has a running/paused run. A failure for one config is
    logged and does not stop the remaining configs.
    """
    logger.info("[AutomationTask] Checking scheduled automations (hourly at :05)")

    # NOTE(review): hour/weekday/day are read from timezone.now() as-is;
    # presumably scheduled_time is stored in the same timezone - confirm.
    now = timezone.now()
    current_hour = now.hour

    logger.info(f"[AutomationTask] Current hour: {current_hour}:05, checking for configs scheduled at hour {current_hour}")

    # Find configs that should run now (matching hour)
    for config in AutomationConfig.objects.filter(is_enabled=True):
        scheduled_hour = config.scheduled_time.hour
        should_run = _is_due(config.frequency, scheduled_hour, now)

        logger.debug(f"[AutomationTask] Site {config.site_id}: freq={config.frequency}, scheduled_hour={scheduled_hour}, current_hour={current_hour}, should_run={should_run}")

        if not should_run:
            continue

        # Check if already ran within the last 23 hours (prevents duplicate runs)
        if config.last_run_at:
            time_since_last_run = now - config.last_run_at
            if time_since_last_run < _DUPLICATE_RUN_WINDOW:
                logger.info(f"[AutomationTask] Skipping site {config.site_id} - already ran {time_since_last_run} ago")
                continue

        # Check if already running OR paused (don't start new if existing in progress)
        if _has_run_in_progress(config.site):
            logger.info(f"[AutomationTask] Skipping site {config.site_id} - automation in progress (running/paused)")
            continue

        logger.info(f"[AutomationTask] Starting scheduled automation for site {config.site_id}")

        try:
            service = AutomationService(config.account, config.site)
            run_id = service.start_automation(trigger_type='scheduled')

            # Update config
            config.last_run_at = now
            config.next_run_at = _calculate_next_run(config, now)
            config.save()

            # Start async processing
            run_automation_task.delay(run_id)
        except Exception as e:
            # site_id (not site.id) so the handler itself cannot trigger a
            # query; logger.exception keeps the traceback of the swallowed error.
            logger.exception(f"[AutomationTask] Failed to start automation for site {config.site_id}: {e}")


@shared_task(name='automation.check_test_triggers')
def check_test_triggers():
    """
    Check for test triggers (runs every minute, but only processes if test_mode_enabled)

    This allows admins to test automation without waiting for hourly schedule.
    - Set test_mode_enabled=True and test_trigger_at=datetime on AutomationConfig
    - When test_trigger_at <= now, automation triggers immediately
    - Bypasses: is_enabled check, 23hr block, frequency rules
    - Run is marked as trigger_type='test'
    - test_trigger_at is cleared after trigger
    """
    # Quick check - if no configs have test mode enabled, exit immediately
    if not AutomationConfig.objects.filter(test_mode_enabled=True).exists():
        return  # No logging to avoid spam

    logger.info("[AutomationTask] Checking test triggers")

    now = timezone.now()

    # Find configs with test mode enabled and trigger time passed
    test_configs = AutomationConfig.objects.filter(
        test_mode_enabled=True,
        test_trigger_at__isnull=False,
        test_trigger_at__lte=now,
    )

    for config in test_configs:
        logger.info(f"[AutomationTask] Test trigger found for site {config.site_id}, trigger_at={config.test_trigger_at}")

        # Check if already running (still respect this to avoid conflicts)
        if _has_run_in_progress(config.site):
            logger.info(f"[AutomationTask] Skipping test trigger for site {config.site_id} - automation in progress")
            continue

        try:
            service = AutomationService(config.account, config.site)
            run_id = service.start_automation(trigger_type='test')

            # Clear test_trigger_at (don't update last_run_at - test runs don't affect production schedule)
            config.test_trigger_at = None
            config.save(update_fields=['test_trigger_at'])

            logger.info(f"[AutomationTask] Started test automation for site {config.site_id}, run_id={run_id}")

            # Start async processing
            run_automation_task.delay(run_id)
        except Exception as e:
            logger.exception(f"[AutomationTask] Failed to start test automation for site {config.site_id}: {e}")
            # Clear trigger to prevent infinite retry
            config.test_trigger_at = None
            config.save(update_fields=['test_trigger_at'])


@shared_task(name='automation.run_automation_task', bind=True, max_retries=0)
def run_automation_task(self, run_id: str):
    """
    Run automation pipeline (chains all stages)

    Runs stages 1-7 sequentially, skipping disabled stages and stopping as
    soon as the run is found paused or cancelled after a stage. On any
    exception the run is marked failed, the site lock is released and the
    original exception is re-raised.
    """
    logger.info(f"[AutomationTask] Starting automation run: {run_id}")

    try:
        service = AutomationService.from_run_id(run_id)
        config = service.config

        # Run all stages sequentially, checking for enabled status, pause/cancel between stages
        if not _run_stages(service, config, 1, 'Automation'):
            return

        logger.info(f"[AutomationTask] Completed automation run: {run_id}")
    except Exception as e:
        logger.error(f"[AutomationTask] Failed automation run {run_id}: {e}")

        # Mark as failed and release lock. Guarded so that a cleanup problem
        # (e.g. the run row cannot be loaded) does not replace the original
        # exception that is re-raised below.
        try:
            _fail_run_and_release_lock(run_id, e)
        except Exception as cleanup_err:
            logger.exception(f"[AutomationTask] Failed to cleanup after run failure: {cleanup_err}")

        raise


@shared_task(name='automation.resume_automation_task', bind=True, max_retries=0)
def resume_automation_task(self, run_id: str):
    """
    Resume paused automation run from current stage.

    CRITICAL FIXES:
    - Verifies run status is 'running' before processing
    - Reacquires lock in case it expired during long pause
    - Checks pause/cancel status after each stage
    - Releases lock on failure
    """
    logger.info(f"[AutomationTask] Resuming automation run: {run_id}")

    try:
        from django.core.cache import cache

        # Load run and verify status
        run = AutomationRun.objects.get(run_id=run_id)

        # CRITICAL FIX: Verify run is actually in 'running' status
        # (status is set to 'running' by views.resume before calling this task)
        if run.status != 'running':
            logger.warning(f"[AutomationTask] Run {run_id} status is '{run.status}', not 'running'. Aborting resume.")
            return

        # CRITICAL FIX: Reacquire lock in case it expired during long pause (6hr timeout)
        lock_key = f'automation_lock_{run.site.id}'
        if cache.add(lock_key, run_id, timeout=_LOCK_TIMEOUT_SECONDS):
            # We acquired a new lock (old one expired)
            logger.info(f"[AutomationTask] Reacquired lock after expiry for run {run_id}")
        else:
            # Lock exists - check if it's ours (from original run start)
            existing_lock = cache.get(lock_key)
            # If lock exists but isn't our run_id, another run may have started
            if existing_lock and existing_lock != run_id and existing_lock != 'locked':
                logger.warning(f"[AutomationTask] Lock held by different run ({existing_lock}). Aborting resume for {run_id}")
                run.status = 'failed'
                run.error_message = f'Lock acquired by another run ({existing_lock}) during pause'
                run.completed_at = timezone.now()
                run.save()
                return
            # Lock exists and is either 'locked' (our old format) or our run_id - proceed
            logger.info(f"[AutomationTask] Existing lock found, proceeding with resume")

        service = AutomationService.from_run_id(run_id)
        config = service.config

        # Continue from current stage. Clamp to stage 1: a current_stage of 0
        # (or below) would otherwise become a negative index, which runs the
        # last stage first and then every stage again.
        first_stage = max(run.current_stage, 1)

        # Run from current_stage to end, only if stage is enabled; pause/cancel
        # is checked AFTER each stage (same as run_automation_task)
        if not _run_stages(service, config, first_stage, 'Resumed automation'):
            return

        logger.info(f"[AutomationTask] Resumed automation completed: {run_id}")
    except Exception as e:
        logger.error(f"[AutomationTask] Failed to resume automation run {run_id}: {e}")

        # Mark as failed and release lock
        try:
            _fail_run_and_release_lock(run_id, e)
        except Exception as cleanup_err:
            logger.error(f"[AutomationTask] Failed to cleanup after resume failure: {cleanup_err}")

        raise


# Alias for continue_automation_task (same as resume)
continue_automation_task = resume_automation_task


def _calculate_next_run(config: AutomationConfig, now: datetime) -> datetime:
    """Calculate next run time based on frequency

    daily   -> tomorrow at the scheduled time
    weekly  -> the next Monday strictly after ``now`` at the scheduled time
    monthly -> the 1st of the next month at the scheduled time
    other   -> exactly 24 hours after ``now`` (time of day is not adjusted)
    """

    def at_scheduled_time(moment: datetime) -> datetime:
        # Keep the date of ``moment`` but use the config's hour/minute.
        return moment.replace(
            hour=config.scheduled_time.hour,
            minute=config.scheduled_time.minute,
            second=0,
            microsecond=0,
        )

    if config.frequency == 'daily':
        return at_scheduled_time(now + timedelta(days=1))

    if config.frequency == 'weekly':
        # Next Monday; when today is Monday, jump a full week ahead
        days_until_monday = (7 - now.weekday()) % 7 or 7
        return at_scheduled_time(now + timedelta(days=days_until_monday))

    if config.frequency == 'monthly':
        # Next 1st of month (December rolls over into January of next year)
        if now.month == 12:
            first_of_next_month = now.replace(year=now.year + 1, month=1, day=1)
        else:
            first_of_next_month = now.replace(month=now.month + 1, day=1)
        return at_scheduled_time(first_of_next_month)

    return now + timedelta(days=1)