"""
Automation Celery Tasks
Background tasks for automation pipeline
"""
from celery import shared_task, chain
from celery.utils.log import get_task_logger
from datetime import datetime, timedelta
from django.utils import timezone

from igny8_core.business.automation.models import AutomationConfig, AutomationRun
from igny8_core.business.automation.services import AutomationService

logger = get_task_logger(__name__)

# Highest stage number; AutomationService is called as run_stage_1 .. run_stage_7.
_STAGE_COUNT = 7

# Run statuses that stop the pipeline between two stages.
_HALT_STATUSES = ('paused', 'cancelled')

# Lifetime (seconds) of the per-site cache lock re-acquired on resume: 6 hours.
_LOCK_TIMEOUT_SECONDS = 21600


def _is_due(config, now) -> bool:
    """Return True when ``config``'s schedule matches the current hour.

    Only the hour of ``scheduled_time`` is compared, so a config is "due"
    for the whole matching hour. Weekly configs are due on Mondays
    (``weekday() == 0``), monthly configs on the 1st. Any other frequency
    value is never due.
    """
    frequency = config.frequency
    if frequency == 'daily':
        on_scheduled_day = True
    elif frequency == 'weekly':
        on_scheduled_day = now.weekday() == 0
    elif frequency == 'monthly':
        on_scheduled_day = now.day == 1
    else:
        return False
    # The day test comes first so scheduled_time is only read when needed.
    return on_scheduled_day and now.hour == config.scheduled_time.hour


def _run_stages(service, config, first_stage: int, halted_label: str) -> bool:
    """Run the enabled stages ``first_stage``..7 in order.

    After every executed stage the run row is re-read from the database so
    that a pause/cancel written by another process is noticed before the
    next stage starts.

    Args:
        service: AutomationService bound to the run being processed.
        config: Object exposing the ``stage_<n>_enabled`` flags.
        first_stage: 1-based number of the first stage to consider.
        halted_label: Prefix for the "paused/cancelled" log line.

    Returns:
        True when the loop reached the end, False when the run was found
        paused or cancelled after a stage.
    """
    for stage_number in range(first_stage, _STAGE_COUNT + 1):
        if not getattr(config, f'stage_{stage_number}_enabled'):
            logger.info(f"[AutomationTask] Stage {stage_number} is disabled, skipping")
            continue

        getattr(service, f'run_stage_{stage_number}')()

        # Pick up status changes made outside this worker (pause / cancel).
        service.run.refresh_from_db()
        if service.run.status in _HALT_STATUSES:
            logger.info(
                f"[AutomationTask] {halted_label} {service.run.status} after stage {stage_number}"
            )
            return False
    return True


def _fail_run_and_release_lock(run_id: str, error: Exception) -> None:
    """Mark the run as failed with ``error`` and delete its site lock.

    Raises whatever the ORM or cache raises; callers decide whether a
    cleanup failure may mask the original error.
    """
    run = AutomationRun.objects.get(run_id=run_id)
    run.status = 'failed'
    run.error_message = str(error)
    run.completed_at = timezone.now()
    run.save()

    # Imported here (not at module level) to match the original lazy lookup.
    from django.core.cache import cache
    cache.delete(f'automation_lock_{run.site.id}')


@shared_task(name='automation.check_scheduled_automations')
def check_scheduled_automations():
    """
    Check for scheduled automation runs (runs every hour)

    For every enabled AutomationConfig whose schedule matches the current
    hour, start a run unless the config already ran within the last 23
    hours or the site has a run in 'running'/'paused' status. A failure to
    start one site's automation is logged and does not stop the loop.
    """
    logger.info("[AutomationTask] Checking scheduled automations")

    now = timezone.now()

    for config in AutomationConfig.objects.filter(is_enabled=True):
        if not _is_due(config, now):
            continue

        # Check if already ran today
        if config.last_run_at and now - config.last_run_at < timedelta(hours=23):
            logger.info(f"[AutomationTask] Skipping site {config.site.id} - already ran today")
            continue

        # Check if already running OR paused (don't start new if existing in progress)
        in_progress = AutomationRun.objects.filter(
            site=config.site,
            status__in=['running', 'paused'],
        )
        if in_progress.exists():
            logger.info(
                f"[AutomationTask] Skipping site {config.site.id} - automation in progress (running/paused)"
            )
            continue

        logger.info(f"[AutomationTask] Starting scheduled automation for site {config.site.id}")

        try:
            service = AutomationService(config.account, config.site)
            run_id = service.start_automation(trigger_type='scheduled')

            # Update config
            config.last_run_at = now
            config.next_run_at = _calculate_next_run(config, now)
            config.save()

            # Start async processing
            run_automation_task.delay(run_id)
        except Exception as e:
            # Best effort per site: the error is swallowed on purpose, so keep
            # the traceback in the log.
            logger.exception(
                f"[AutomationTask] Failed to start automation for site {config.site.id}: {e}"
            )


@shared_task(name='automation.run_automation_task', bind=True, max_retries=0)
def run_automation_task(self, run_id: str):
    """
    Run automation pipeline (chains all stages)

    Runs stages 1-7 sequentially, skipping disabled stages and stopping as
    soon as the run is found 'paused' or 'cancelled' after a stage.

    Raises:
        Exception: any error from the service is re-raised after the run
            has been marked 'failed' and the site lock released. A failure
            of that cleanup is only logged so it cannot mask the original
            error.
    """
    logger.info(f"[AutomationTask] Starting automation run: {run_id}")

    try:
        service = AutomationService.from_run_id(run_id)
        if not _run_stages(service, service.config, 1, "Automation"):
            return

        logger.info(f"[AutomationTask] Completed automation run: {run_id}")
    except Exception as e:
        logger.error(f"[AutomationTask] Failed automation run {run_id}: {e}")

        # Mark as failed and release lock
        try:
            _fail_run_and_release_lock(run_id, e)
        except Exception as cleanup_err:
            logger.error(f"[AutomationTask] Failed to cleanup after run failure: {cleanup_err}")

        raise


@shared_task(name='automation.resume_automation_task', bind=True, max_retries=0)
def resume_automation_task(self, run_id: str):
    """
    Resume paused automation run from current stage.

    Behaviour:
    - Aborts unless the run status is 'running' (per the original note,
      views.resume sets it before queuing this task).
    - Re-acquires the per-site cache lock if it expired during the pause;
      if the lock is held by a different run id, the run is marked
      'failed' and the task returns.
    - Runs the enabled stages from ``run.current_stage`` onwards, checking
      for pause/cancel after each stage.
    - On error, marks the run 'failed', releases the lock and re-raises.
    """
    logger.info(f"[AutomationTask] Resuming automation run: {run_id}")

    try:
        from django.core.cache import cache

        # Load run and verify status
        run = AutomationRun.objects.get(run_id=run_id)
        if run.status != 'running':
            logger.warning(
                f"[AutomationTask] Run {run_id} status is '{run.status}', not 'running'. Aborting resume."
            )
            return

        lock_key = f'automation_lock_{run.site.id}'
        if cache.add(lock_key, run_id, timeout=_LOCK_TIMEOUT_SECONDS):
            # cache.add only succeeds when no lock existed (old one expired).
            logger.info(f"[AutomationTask] Reacquired lock after expiry for run {run_id}")
        else:
            existing_lock = cache.get(lock_key)
            # 'locked' is the legacy lock value; anything else that is not
            # our run id means another run took the lock during the pause.
            if existing_lock and existing_lock != run_id and existing_lock != 'locked':
                logger.warning(
                    f"[AutomationTask] Lock held by different run ({existing_lock}). Aborting resume for {run_id}"
                )
                run.status = 'failed'
                run.error_message = f'Lock acquired by another run ({existing_lock}) during pause'
                run.completed_at = timezone.now()
                run.save()
                return
            logger.info("[AutomationTask] Existing lock found, proceeding with resume")

        service = AutomationService.from_run_id(run_id)

        # Clamp to stage 1: a current_stage of 0 would otherwise index the
        # stage list at -1 and execute the last stage first.
        first_stage = max(run.current_stage, 1)
        if not _run_stages(service, service.config, first_stage, "Resumed automation"):
            return

        logger.info(f"[AutomationTask] Resumed automation completed: {run_id}")
    except Exception as e:
        logger.error(f"[AutomationTask] Failed to resume automation run {run_id}: {e}")

        # Mark as failed and release lock
        try:
            _fail_run_and_release_lock(run_id, e)
        except Exception as cleanup_err:
            logger.error(f"[AutomationTask] Failed to cleanup after resume failure: {cleanup_err}")

        raise


# Alias for continue_automation_task (same as resume)
continue_automation_task = resume_automation_task


def _calculate_next_run(config: AutomationConfig, now: datetime) -> datetime:
    """Calculate next run time based on frequency

    daily   -> tomorrow at ``config.scheduled_time`` (hour/minute)
    weekly  -> the next Monday strictly after ``now`` at that time
    monthly -> the 1st of the following month at that time
    other   -> exactly 24 hours after ``now`` (time of day unchanged)
    """
    frequency = config.frequency

    if frequency == 'daily':
        target_day = now + timedelta(days=1)
    elif frequency == 'weekly':
        # weekday() is 0..6 with Monday == 0, so this is 7 on a Monday and
        # 1..6 otherwise: always the *next* Monday, never today.
        target_day = now + timedelta(days=7 - now.weekday())
    elif frequency == 'monthly':
        if now.month == 12:
            target_day = now.replace(year=now.year + 1, month=1, day=1)
        else:
            target_day = now.replace(month=now.month + 1, day=1)
    else:
        return now + timedelta(days=1)

    return target_day.replace(
        hour=config.scheduled_time.hour,
        minute=config.scheduled_time.minute,
        second=0,
        microsecond=0,
    )