Files
igny8/backend/igny8_core/business/automation/tasks.py
2026-01-17 00:10:26 +00:00

257 lines
9.4 KiB
Python

"""
Automation Celery Tasks
Background tasks for automation pipeline
"""
from celery import shared_task, chain
from celery.utils.log import get_task_logger
from datetime import datetime, timedelta
from django.utils import timezone
from igny8_core.business.automation.models import AutomationConfig, AutomationRun
from igny8_core.business.automation.services import AutomationService
# Module-level logger obtained from Celery's get_task_logger; shared by every task below.
logger = get_task_logger(__name__)
def _is_scheduled_now(config, now):
    """Return True when ``config``'s schedule matches the day and hour of ``now``.

    Only the hour of ``config.scheduled_time`` is compared; the minute is
    ignored, so any invocation inside the scheduled hour matches.

    * ``daily``   - every day
    * ``weekly``  - Mondays only (``weekday() == 0``)
    * ``monthly`` - the 1st of the month only
    * any other frequency never matches
    """
    frequency = config.frequency
    if frequency == 'daily':
        day_matches = True
    elif frequency == 'weekly':
        day_matches = now.weekday() == 0
    elif frequency == 'monthly':
        day_matches = now.day == 1
    else:
        return False
    # scheduled_time is only read once the day matches, mirroring the
    # short-circuit order of the original conditions.
    return day_matches and now.hour == config.scheduled_time.hour


@shared_task(name='automation.check_scheduled_automations')
def check_scheduled_automations():
    """
    Check for scheduled automation runs (runs every hour)

    For every enabled ``AutomationConfig`` whose schedule matches the current
    hour, start a run through ``AutomationService.start_automation``, store
    ``last_run_at`` / ``next_run_at`` on the config and enqueue
    ``run_automation_task`` for the new run.

    A config is skipped when it ran less than 23 hours ago or when its site
    already has an ``AutomationRun`` with status ``'running'``.  A failure to
    start one site is logged and does not stop the remaining sites.
    """
    logger.info("[AutomationTask] Checking scheduled automations")
    # NOTE(review): `now` comes from django.utils.timezone.now(); confirm that
    # scheduled_time is stored in the same timezone.
    now = timezone.now()

    for config in AutomationConfig.objects.filter(is_enabled=True):
        if not _is_scheduled_now(config, now):
            continue

        # 23h (not 24h) leaves slack for an hourly check that fires slightly
        # earlier in the hour than it did on the previous run.
        if config.last_run_at and now - config.last_run_at < timedelta(hours=23):
            logger.info(f"[AutomationTask] Skipping site {config.site.id} - already ran today")
            continue

        # Never start a second run for a site that is still processing one.
        if AutomationRun.objects.filter(site=config.site, status='running').exists():
            logger.info(f"[AutomationTask] Skipping site {config.site.id} - already running")
            continue

        logger.info(f"[AutomationTask] Starting scheduled automation for site {config.site.id}")
        try:
            service = AutomationService(config.account, config.site)
            run_id = service.start_automation(trigger_type='scheduled')

            # Update config
            config.last_run_at = now
            config.next_run_at = _calculate_next_run(config, now)
            config.save()

            # Start async processing
            run_automation_task.delay(run_id)
        except Exception as e:
            # Log with traceback and move on to the next config.
            logger.error(
                f"[AutomationTask] Failed to start automation for site {config.site.id}: {e}",
                exc_info=True,
            )
@shared_task(name='automation.run_automation_task', bind=True, max_retries=0)
def run_automation_task(self, run_id: str):
    """
    Run automation pipeline (chains all stages)

    Stages 1-7 are executed in order on the ``AutomationService`` loaded for
    ``run_id``.  A stage whose ``config.stage_<n>_enabled`` flag is false is
    skipped.  After each executed stage the run status is inspected and the
    task returns early when it is ``'paused'`` or ``'cancelled'``.

    On any exception the run is marked ``'failed'`` (with the error message
    and completion time), the ``automation_lock_<site id>`` cache key is
    deleted, and the original exception is re-raised.

    Args:
        run_id: Identifier passed to ``AutomationService.from_run_id`` and
            used to look up the ``AutomationRun`` on failure.
    """
    logger.info(f"[AutomationTask] Starting automation run: {run_id}")
    try:
        service = AutomationService.from_run_id(run_id)
        config = service.config

        # Run all stages sequentially, checking for enabled status, pause/cancel between stages
        for stage_number in range(1, 8):
            if not getattr(config, f'stage_{stage_number}_enabled'):
                logger.info(f"[AutomationTask] Stage {stage_number} is disabled, skipping")
                continue

            getattr(service, f'run_stage_{stage_number}')()

            if service.run.status in ['paused', 'cancelled']:
                logger.info(f"[AutomationTask] Automation {service.run.status} after stage {stage_number}")
                return

        logger.info(f"[AutomationTask] Completed automation run: {run_id}")
    except Exception as e:
        logger.error(f"[AutomationTask] Failed automation run {run_id}: {e}", exc_info=True)
        # The bookkeeping below is guarded so that a secondary error (for
        # example the run row not existing) cannot replace the original
        # exception that is re-raised at the end.
        try:
            # Mark as failed
            run = AutomationRun.objects.get(run_id=run_id)
            run.status = 'failed'
            run.error_message = str(e)
            run.completed_at = timezone.now()
            run.save()

            # Release lock
            from django.core.cache import cache
            cache.delete(f'automation_lock_{run.site.id}')
        except Exception as cleanup_error:
            logger.error(
                f"[AutomationTask] Could not record failure for automation run {run_id}: {cleanup_error}"
            )
        raise
@shared_task(name='automation.resume_automation_task', bind=True, max_retries=0)
def resume_automation_task(self, run_id: str):
    """
    Resume paused automation run from current stage

    Executes stages ``run.current_stage`` through 7 on the
    ``AutomationService`` loaded for ``run_id``, skipping any stage whose
    ``config.stage_<n>_enabled`` flag is false.  As in
    ``run_automation_task``, the task returns early when the run status is
    ``'paused'`` or ``'cancelled'`` after a stage.

    On any exception the run is marked ``'failed'`` and the
    ``automation_lock_<site id>`` cache key is deleted.  The exception is
    logged but not re-raised.

    Args:
        run_id: Identifier passed to ``AutomationService.from_run_id`` and
            used to look up the ``AutomationRun`` on failure.
    """
    logger.info(f"[AutomationTask] Resuming automation run: {run_id}")
    try:
        service = AutomationService.from_run_id(run_id)
        run = service.run
        config = service.config

        # Clamp to stage 1: a current_stage of 0 or less would otherwise turn
        # into a negative index and run the last stage first.
        first_stage = max(run.current_stage, 1)

        # Run from current_stage to end, only if stage is enabled
        for stage_number in range(first_stage, 8):
            if not getattr(config, f'stage_{stage_number}_enabled'):
                logger.info(f"[AutomationTask] Stage {stage_number} is disabled, skipping")
                continue

            getattr(service, f'run_stage_{stage_number}')()

            # Honour a pause/cancel requested while the resumed run is in
            # progress, the same way run_automation_task does.
            # NOTE(review): assumes the run is no longer 'paused' by the time
            # this task starts - confirm against the code that dispatches it.
            if service.run.status in ['paused', 'cancelled']:
                logger.info(f"[AutomationTask] Automation {service.run.status} after stage {stage_number}")
                return

        logger.info(f"[AutomationTask] Resumed automation run: {run_id}")
    except Exception as e:
        logger.error(f"[AutomationTask] Failed to resume automation run {run_id}: {e}", exc_info=True)
        # Guarded so a secondary error while recording the failure is logged
        # instead of escaping from the handler.
        try:
            # Mark as failed
            run = AutomationRun.objects.get(run_id=run_id)
            run.status = 'failed'
            run.error_message = str(e)
            run.completed_at = timezone.now()
            run.save()

            # Release lock (same key run_automation_task clears on failure)
            from django.core.cache import cache
            cache.delete(f'automation_lock_{run.site.id}')
        except Exception as cleanup_error:
            logger.error(
                f"[AutomationTask] Could not record failure for automation run {run_id}: {cleanup_error}"
            )
# Alias: continue_automation_task is bound to the very same task object as
# resume_automation_task, so both names behave identically.
continue_automation_task = resume_automation_task
def _calculate_next_run(config: "AutomationConfig", now: datetime) -> datetime:
    """Calculate next run time based on frequency

    Args:
        config: Object exposing ``frequency`` and ``scheduled_time`` (only
            the ``hour`` and ``minute`` of ``scheduled_time`` are read).
        now: Reference moment; the result is derived from it, so tzinfo is
            carried over unchanged.

    Returns:
        * ``daily``   - tomorrow at the scheduled hour/minute
        * ``weekly``  - the next Monday strictly after ``now``'s date, at the
          scheduled hour/minute
        * ``monthly`` - the 1st of the following month at the scheduled
          hour/minute
        * anything else - exactly 24 hours after ``now`` (not aligned to the
          scheduled time)
    """
    frequency = config.frequency

    if frequency == 'daily':
        target_day = now + timedelta(days=1)
    elif frequency == 'weekly':
        # weekday() is 0 for Monday, so this yields 7 on a Monday and 6..1 on
        # Tuesday..Sunday: always the *next* Monday, never today.
        target_day = now + timedelta(days=7 - now.weekday())
    elif frequency == 'monthly':
        # day=1 is set in the same replace() call as the month, so a late-month
        # date never produces an invalid intermediate such as "Feb 31".
        if now.month == 12:
            target_day = now.replace(year=now.year + 1, month=1, day=1)
        else:
            target_day = now.replace(month=now.month + 1, day=1)
    else:
        # Unknown frequency: fall back to a plain 24-hour offset.
        return now + timedelta(days=1)

    # Pin the chosen day to the configured time of day.
    return target_day.replace(
        hour=config.scheduled_time.hour,
        minute=config.scheduled_time.minute,
        second=0,
        microsecond=0,
    )