"""
Automation Celery Tasks

Background tasks for the automation pipeline: a periodic scheduler check,
a full pipeline run, and a resume-from-current-stage run.
"""
from datetime import datetime, timedelta

from celery import shared_task, chain
from celery.utils.log import get_task_logger
from django.core.cache import cache
from django.utils import timezone

from igny8_core.business.automation.models import AutomationConfig, AutomationRun
from igny8_core.business.automation.services import AutomationService

logger = get_task_logger(__name__)


def _is_due(config: AutomationConfig, now: datetime) -> bool:
    """Return True when *config* is scheduled to start in the current hour.

    Matching is done at hour granularity only (minutes are ignored):
    'daily' matches every day, 'weekly' matches Mondays, 'monthly' matches
    the 1st of the month. Any other frequency never matches.
    """
    if config.frequency == 'daily':
        day_matches = True
    elif config.frequency == 'weekly':
        day_matches = now.weekday() == 0  # Monday
    elif config.frequency == 'monthly':
        day_matches = now.day == 1
    else:
        return False
    # scheduled_time is only read once the day matches a known frequency.
    return day_matches and now.hour == config.scheduled_time.hour


def _stage_methods(service):
    """Return the seven pipeline stage callables of *service*, in order."""
    return [
        service.run_stage_1,
        service.run_stage_2,
        service.run_stage_3,
        service.run_stage_4,
        service.run_stage_5,
        service.run_stage_6,
        service.run_stage_7,
    ]


def _mark_run_failed(run_id: str, error: Exception) -> None:
    """Record *error* on the run and release the site's automation lock.

    This is called from inside ``except`` blocks, so it must never raise:
    a secondary failure here would otherwise replace the original error
    that the task is about to re-raise. Each step is attempted
    independently and its failure is only logged, so a failed ``save()``
    does not prevent the lock from being released.
    """
    try:
        run = AutomationRun.objects.get(run_id=run_id)
    except Exception:
        # Without the run we do not know which site lock to release.
        logger.exception(
            "[AutomationTask] Could not load run %s to mark it as failed", run_id
        )
        return

    try:
        run.status = 'failed'
        run.error_message = str(error)
        run.completed_at = timezone.now()
        run.save()
    except Exception:
        logger.exception(
            "[AutomationTask] Could not persist failed status for run %s", run_id
        )

    try:
        cache.delete(f'automation_lock_{run.site.id}')
    except Exception:
        logger.exception(
            "[AutomationTask] Could not release automation lock for run %s", run_id
        )


@shared_task(name='automation.check_scheduled_automations')
def check_scheduled_automations():
    """
    Check for scheduled automation runs (runs every hour).

    For every enabled AutomationConfig that is due in the current hour,
    start a scheduled run unless one was started less than 23 hours ago or
    a run for the same site is still in 'running' status. A failure to
    start one site's automation is logged and does not stop the loop.
    """
    logger.info("[AutomationTask] Checking scheduled automations")

    now = timezone.now()

    for config in AutomationConfig.objects.filter(is_enabled=True):
        if not _is_due(config, now):
            continue

        # Guard against starting the same schedule twice in one day.
        if config.last_run_at and now - config.last_run_at < timedelta(hours=23):
            logger.info(
                "[AutomationTask] Skipping site %s - already ran today",
                config.site.id,
            )
            continue

        if AutomationRun.objects.filter(site=config.site, status='running').exists():
            logger.info(
                "[AutomationTask] Skipping site %s - already running",
                config.site.id,
            )
            continue

        logger.info(
            "[AutomationTask] Starting scheduled automation for site %s",
            config.site.id,
        )

        try:
            service = AutomationService(config.account, config.site)
            run_id = service.start_automation(trigger_type='scheduled')

            # Update schedule bookkeeping before handing off to the worker.
            config.last_run_at = now
            config.next_run_at = _calculate_next_run(config, now)
            config.save()

            # Start async processing
            run_automation_task.delay(run_id)
        except Exception as exc:
            # Best-effort per site: log with traceback and move on.
            logger.exception(
                "[AutomationTask] Failed to start automation for site %s: %s",
                config.site.id,
                exc,
            )


@shared_task(name='automation.run_automation_task', bind=True, max_retries=0)
def run_automation_task(self, run_id: str):
    """
    Run the automation pipeline (all seven stages, sequentially).

    On any exception the run is marked 'failed', the site's automation
    lock is released, and the original exception is re-raised.
    """
    logger.info("[AutomationTask] Starting automation run: %s", run_id)

    try:
        service = AutomationService.from_run_id(run_id)

        for run_stage in _stage_methods(service):
            run_stage()

        logger.info("[AutomationTask] Completed automation run: %s", run_id)
    except Exception as exc:
        logger.exception(
            "[AutomationTask] Failed automation run %s: %s", run_id, exc
        )
        _mark_run_failed(run_id, exc)
        raise


@shared_task(name='automation.resume_automation_task', bind=True, max_retries=0)
def resume_automation_task(self, run_id: str):
    """
    Resume a paused automation run from its current stage.

    Stages are 1-based; execution starts at ``run.current_stage`` and
    continues through stage 7. On any exception the run is marked
    'failed', the site's automation lock is released, and the original
    exception is re-raised.
    """
    logger.info("[AutomationTask] Resuming automation run: %s", run_id)

    try:
        service = AutomationService.from_run_id(run_id)
        run = service.run

        # Clamp to 0 so a current_stage below 1 starts from stage 1 instead
        # of producing a negative index (which would select stage 7 first).
        first_index = max(run.current_stage - 1, 0)
        for run_stage in _stage_methods(service)[first_index:]:
            run_stage()

        logger.info("[AutomationTask] Resumed automation run: %s", run_id)
    except Exception as exc:
        logger.exception(
            "[AutomationTask] Failed to resume automation run %s: %s", run_id, exc
        )
        _mark_run_failed(run_id, exc)
        raise


def _calculate_next_run(config: AutomationConfig, now: datetime) -> datetime:
    """Calculate the next run time based on the config's frequency.

    'daily' -> tomorrow, 'weekly' -> the next Monday strictly after *now*,
    'monthly' -> the 1st of next month; each at ``config.scheduled_time``
    (hour and minute, seconds zeroed). Any other frequency returns
    ``now + 1 day`` with the time of day left unchanged.
    """
    if config.frequency == 'daily':
        next_day = now + timedelta(days=1)
    elif config.frequency == 'weekly':
        # weekday() is 0 for Monday, so this yields 7 on a Monday and
        # 6..1 for Tuesday..Sunday: always the *following* Monday.
        next_day = now + timedelta(days=7 - now.weekday())
    elif config.frequency == 'monthly':
        if now.month == 12:
            next_day = now.replace(year=now.year + 1, month=1, day=1)
        else:
            next_day = now.replace(month=now.month + 1, day=1)
    else:
        return now + timedelta(days=1)

    return next_day.replace(
        hour=config.scheduled_time.hour,
        minute=config.scheduled_time.minute,
        second=0,
        microsecond=0,
    )