fixes related to automation and celery schedules
This commit is contained in:
@@ -186,13 +186,76 @@ class UnifiedSiteSettingsViewSet(viewsets.ViewSet):
|
||||
# Update automation settings
|
||||
if 'automation' in data:
|
||||
auto = data['automation']
|
||||
schedule_changed = False
|
||||
|
||||
if 'enabled' in auto:
|
||||
if automation_config.is_enabled != auto['enabled']:
|
||||
schedule_changed = True
|
||||
automation_config.is_enabled = auto['enabled']
|
||||
if 'frequency' in auto:
|
||||
if automation_config.frequency != auto['frequency']:
|
||||
schedule_changed = True
|
||||
automation_config.frequency = auto['frequency']
|
||||
if 'time' in auto:
|
||||
from datetime import datetime
|
||||
automation_config.scheduled_time = datetime.strptime(auto['time'], '%H:%M').time()
|
||||
new_time = datetime.strptime(auto['time'], '%H:%M').time()
|
||||
if automation_config.scheduled_time != new_time:
|
||||
schedule_changed = True
|
||||
automation_config.scheduled_time = new_time
|
||||
|
||||
# Reset last_run_at and recalculate next_run_at if any schedule setting changed
|
||||
if schedule_changed:
|
||||
automation_config.last_run_at = None
|
||||
|
||||
# Recalculate next_run_at based on new schedule
|
||||
from django.utils import timezone
|
||||
from datetime import timedelta
|
||||
|
||||
now = timezone.now()
|
||||
scheduled_time = automation_config.scheduled_time
|
||||
|
||||
# Calculate next run at the scheduled time
|
||||
next_run = now.replace(
|
||||
hour=scheduled_time.hour,
|
||||
minute=scheduled_time.minute,
|
||||
second=0,
|
||||
microsecond=0
|
||||
)
|
||||
|
||||
# If scheduled time has passed today, set to tomorrow (for daily)
|
||||
# or appropriate next occurrence for weekly/monthly
|
||||
if next_run <= now:
|
||||
if automation_config.frequency == 'daily':
|
||||
next_run = next_run + timedelta(days=1)
|
||||
elif automation_config.frequency == 'weekly':
|
||||
# Next Monday
|
||||
days_until_monday = (7 - now.weekday()) % 7
|
||||
if days_until_monday == 0:
|
||||
days_until_monday = 7
|
||||
next_run = now + timedelta(days=days_until_monday)
|
||||
next_run = next_run.replace(
|
||||
hour=scheduled_time.hour,
|
||||
minute=scheduled_time.minute,
|
||||
second=0,
|
||||
microsecond=0
|
||||
)
|
||||
elif automation_config.frequency == 'monthly':
|
||||
# Next 1st of month
|
||||
if now.month == 12:
|
||||
next_run = now.replace(year=now.year + 1, month=1, day=1)
|
||||
else:
|
||||
next_run = now.replace(month=now.month + 1, day=1)
|
||||
next_run = next_run.replace(
|
||||
hour=scheduled_time.hour,
|
||||
minute=scheduled_time.minute,
|
||||
second=0,
|
||||
microsecond=0
|
||||
)
|
||||
else:
|
||||
next_run = next_run + timedelta(days=1)
|
||||
|
||||
automation_config.next_run_at = next_run
|
||||
logger.info(f"[UnifiedSettings] Schedule changed for site {site_id}, reset last_run_at=None, next_run_at={next_run}")
|
||||
|
||||
# Update stage configuration
|
||||
if 'stages' in data:
|
||||
|
||||
@@ -17,14 +17,14 @@ class AutomationConfigResource(resources.ModelResource):
|
||||
class Meta:
|
||||
model = AutomationConfig
|
||||
fields = ('id', 'site__domain', 'is_enabled', 'frequency', 'scheduled_time',
|
||||
'within_stage_delay', 'between_stage_delay', 'last_run_at', 'created_at')
|
||||
'last_run_at', 'next_run_at', 'created_at')
|
||||
export_order = fields
|
||||
|
||||
|
||||
@admin.register(AutomationConfig)
|
||||
class AutomationConfigAdmin(ExportMixin, AccountAdminMixin, Igny8ModelAdmin):
|
||||
resource_class = AutomationConfigResource
|
||||
list_display = ('site', 'is_enabled', 'frequency', 'scheduled_time', 'within_stage_delay', 'between_stage_delay', 'last_run_at')
|
||||
list_display = ('site', 'is_enabled', 'frequency', 'scheduled_time', 'next_scheduled_run', 'last_run_at')
|
||||
list_filter = ('is_enabled', 'frequency')
|
||||
search_fields = ('site__domain',)
|
||||
actions = [
|
||||
@@ -34,6 +34,142 @@ class AutomationConfigAdmin(ExportMixin, AccountAdminMixin, Igny8ModelAdmin):
|
||||
'bulk_update_delays',
|
||||
]
|
||||
|
||||
def next_scheduled_run(self, obj):
|
||||
"""
|
||||
Calculate the next scheduled run based on:
|
||||
- Celery Beat schedule (every 15 minutes at :00, :15, :30, :45)
|
||||
- Frequency (daily, weekly, monthly)
|
||||
- Scheduled time
|
||||
- 23-hour block after last_run_at
|
||||
|
||||
Celery checks window at :00 for :00-:14, at :15 for :15-:29, etc.
|
||||
So scheduled_time 12:12 will be picked up at the 12:00 check.
|
||||
"""
|
||||
from django.utils import timezone
|
||||
from datetime import timedelta
|
||||
|
||||
if not obj.is_enabled:
|
||||
return 'Disabled'
|
||||
|
||||
now = timezone.now()
|
||||
scheduled_hour = obj.scheduled_time.hour
|
||||
scheduled_minute = obj.scheduled_time.minute
|
||||
|
||||
# Calculate the Celery window start time for this scheduled_time
|
||||
# If scheduled at :12, Celery checks at :00 (window :00-:14)
|
||||
# If scheduled at :35, Celery checks at :30 (window :30-:44)
|
||||
window_start_minute = (scheduled_minute // 15) * 15
|
||||
|
||||
# Calculate next occurrence based on frequency
|
||||
def get_next_celery_pickup():
|
||||
# Start with today at the Celery window start time
|
||||
candidate = now.replace(
|
||||
hour=scheduled_hour,
|
||||
minute=window_start_minute,
|
||||
second=0,
|
||||
microsecond=0
|
||||
)
|
||||
|
||||
if obj.frequency == 'daily':
|
||||
# If time has passed today (Celery already checked this window), next is tomorrow
|
||||
if candidate <= now:
|
||||
candidate += timedelta(days=1)
|
||||
elif obj.frequency == 'weekly':
|
||||
# Run on Mondays
|
||||
days_until_monday = (7 - now.weekday()) % 7
|
||||
if days_until_monday == 0:
|
||||
# Today is Monday - check if time passed
|
||||
candidate = now.replace(
|
||||
hour=scheduled_hour,
|
||||
minute=window_start_minute,
|
||||
second=0,
|
||||
microsecond=0
|
||||
)
|
||||
if candidate <= now:
|
||||
days_until_monday = 7
|
||||
candidate += timedelta(days=7)
|
||||
else:
|
||||
candidate = now + timedelta(days=days_until_monday)
|
||||
candidate = candidate.replace(
|
||||
hour=scheduled_hour,
|
||||
minute=window_start_minute,
|
||||
second=0,
|
||||
microsecond=0
|
||||
)
|
||||
elif obj.frequency == 'monthly':
|
||||
# Run on 1st of month
|
||||
candidate = now.replace(
|
||||
day=1,
|
||||
hour=scheduled_hour,
|
||||
minute=window_start_minute,
|
||||
second=0,
|
||||
microsecond=0
|
||||
)
|
||||
if candidate <= now:
|
||||
# Next month
|
||||
if now.month == 12:
|
||||
candidate = candidate.replace(year=now.year + 1, month=1)
|
||||
else:
|
||||
candidate = candidate.replace(month=now.month + 1)
|
||||
|
||||
return candidate
|
||||
|
||||
next_celery_pickup = get_next_celery_pickup()
|
||||
|
||||
# Check 23-hour block
|
||||
if obj.last_run_at:
|
||||
earliest_eligible = obj.last_run_at + timedelta(hours=23)
|
||||
if next_celery_pickup < earliest_eligible:
|
||||
# Blocked - need to skip to next cycle
|
||||
if obj.frequency == 'daily':
|
||||
# Move to next day's window
|
||||
next_celery_pickup = earliest_eligible.replace(
|
||||
hour=scheduled_hour,
|
||||
minute=window_start_minute,
|
||||
second=0,
|
||||
microsecond=0
|
||||
)
|
||||
if next_celery_pickup < earliest_eligible:
|
||||
next_celery_pickup += timedelta(days=1)
|
||||
elif obj.frequency == 'weekly':
|
||||
# Find next Monday after earliest_eligible
|
||||
days_until_monday = (7 - earliest_eligible.weekday()) % 7
|
||||
if days_until_monday == 0:
|
||||
test_candidate = earliest_eligible.replace(
|
||||
hour=scheduled_hour,
|
||||
minute=window_start_minute,
|
||||
second=0,
|
||||
microsecond=0
|
||||
)
|
||||
if test_candidate <= earliest_eligible:
|
||||
days_until_monday = 7
|
||||
next_celery_pickup = earliest_eligible + timedelta(days=days_until_monday)
|
||||
next_celery_pickup = next_celery_pickup.replace(
|
||||
hour=scheduled_hour,
|
||||
minute=window_start_minute,
|
||||
second=0,
|
||||
microsecond=0
|
||||
)
|
||||
elif obj.frequency == 'monthly':
|
||||
# Find next 1st of month after earliest_eligible
|
||||
next_celery_pickup = earliest_eligible.replace(
|
||||
day=1,
|
||||
hour=scheduled_hour,
|
||||
minute=window_start_minute,
|
||||
second=0,
|
||||
microsecond=0
|
||||
)
|
||||
if next_celery_pickup < earliest_eligible:
|
||||
if earliest_eligible.month == 12:
|
||||
next_celery_pickup = next_celery_pickup.replace(year=earliest_eligible.year + 1, month=1)
|
||||
else:
|
||||
next_celery_pickup = next_celery_pickup.replace(month=earliest_eligible.month + 1)
|
||||
|
||||
# Format nicely
|
||||
return next_celery_pickup.strftime('%b %d, %Y, %-I:%M %p')
|
||||
|
||||
next_scheduled_run.short_description = 'Next Scheduled Run'
|
||||
|
||||
def bulk_enable(self, request, queryset):
|
||||
"""Enable selected automation configs"""
|
||||
updated = queryset.update(is_enabled=True)
|
||||
|
||||
@@ -16,45 +16,63 @@ logger = get_task_logger(__name__)
|
||||
@shared_task(name='automation.check_scheduled_automations')
|
||||
def check_scheduled_automations():
|
||||
"""
|
||||
Check for scheduled automation runs (runs every hour)
|
||||
Check for scheduled automation runs (runs every 15 minutes)
|
||||
Matches automations scheduled within the current 15-minute window.
|
||||
"""
|
||||
logger.info("[AutomationTask] Checking scheduled automations")
|
||||
|
||||
now = timezone.now()
|
||||
current_time = now.time()
|
||||
|
||||
# Calculate 15-minute window boundaries
|
||||
# Window starts at current quarter hour (0, 15, 30, 45)
|
||||
window_start_minute = (current_time.minute // 15) * 15
|
||||
window_end_minute = window_start_minute + 14
|
||||
|
||||
logger.info(f"[AutomationTask] Current time: {current_time}, checking window {current_time.hour}:{window_start_minute:02d}-{current_time.hour}:{window_end_minute:02d}")
|
||||
|
||||
# Find configs that should run now
|
||||
for config in AutomationConfig.objects.filter(is_enabled=True):
|
||||
# Check if it's time to run
|
||||
should_run = False
|
||||
scheduled_hour = config.scheduled_time.hour
|
||||
scheduled_minute = config.scheduled_time.minute
|
||||
|
||||
# Check if scheduled time falls within current 15-minute window
|
||||
def is_in_window():
|
||||
if current_time.hour != scheduled_hour:
|
||||
return False
|
||||
return window_start_minute <= scheduled_minute <= window_end_minute
|
||||
|
||||
if config.frequency == 'daily':
|
||||
# Run if current time matches scheduled_time
|
||||
if current_time.hour == config.scheduled_time.hour and current_time.minute < 60:
|
||||
# Run if scheduled_time falls within current 15-minute window
|
||||
if is_in_window():
|
||||
should_run = True
|
||||
elif config.frequency == 'weekly':
|
||||
# Run on Mondays at scheduled_time
|
||||
if now.weekday() == 0 and current_time.hour == config.scheduled_time.hour and current_time.minute < 60:
|
||||
# Run on Mondays within scheduled window
|
||||
if now.weekday() == 0 and is_in_window():
|
||||
should_run = True
|
||||
elif config.frequency == 'monthly':
|
||||
# Run on 1st of month at scheduled_time
|
||||
if now.day == 1 and current_time.hour == config.scheduled_time.hour and current_time.minute < 60:
|
||||
# Run on 1st of month within scheduled window
|
||||
if now.day == 1 and is_in_window():
|
||||
should_run = True
|
||||
|
||||
logger.debug(f"[AutomationTask] Site {config.site_id}: freq={config.frequency}, scheduled={config.scheduled_time}, should_run={should_run}")
|
||||
|
||||
if should_run:
|
||||
# Check if already ran today
|
||||
# Check if already ran within the last 23 hours (prevents duplicate runs)
|
||||
if config.last_run_at:
|
||||
time_since_last_run = now - config.last_run_at
|
||||
if time_since_last_run < timedelta(hours=23):
|
||||
logger.info(f"[AutomationTask] Skipping site {config.site.id} - already ran today")
|
||||
logger.info(f"[AutomationTask] Skipping site {config.site_id} - already ran {time_since_last_run} ago")
|
||||
continue
|
||||
|
||||
# Check if already running OR paused (don't start new if existing in progress)
|
||||
if AutomationRun.objects.filter(site=config.site, status__in=['running', 'paused']).exists():
|
||||
logger.info(f"[AutomationTask] Skipping site {config.site.id} - automation in progress (running/paused)")
|
||||
logger.info(f"[AutomationTask] Skipping site {config.site_id} - automation in progress (running/paused)")
|
||||
continue
|
||||
|
||||
logger.info(f"[AutomationTask] Starting scheduled automation for site {config.site.id}")
|
||||
logger.info(f"[AutomationTask] Starting scheduled automation for site {config.site_id}")
|
||||
|
||||
try:
|
||||
service = AutomationService(config.account, config.site)
|
||||
|
||||
@@ -115,13 +115,30 @@ class AutomationViewSet(viewsets.ViewSet):
|
||||
site=site
|
||||
)
|
||||
|
||||
# Update fields
|
||||
# Update fields - track if schedule changed
|
||||
schedule_changed = False
|
||||
|
||||
if 'is_enabled' in request.data:
|
||||
if config.is_enabled != request.data['is_enabled']:
|
||||
schedule_changed = True
|
||||
config.is_enabled = request.data['is_enabled']
|
||||
if 'frequency' in request.data:
|
||||
if config.frequency != request.data['frequency']:
|
||||
schedule_changed = True
|
||||
config.frequency = request.data['frequency']
|
||||
if 'scheduled_time' in request.data:
|
||||
config.scheduled_time = request.data['scheduled_time']
|
||||
new_time = request.data['scheduled_time']
|
||||
if str(config.scheduled_time) != str(new_time):
|
||||
schedule_changed = True
|
||||
config.scheduled_time = new_time
|
||||
|
||||
# Reset last_run_at and recalculate next_run_at if any schedule setting changed
|
||||
if schedule_changed:
|
||||
config.last_run_at = None
|
||||
# Recalculate next_run_at based on new schedule
|
||||
from igny8_core.business.automation.tasks import _calculate_next_run
|
||||
from django.utils import timezone
|
||||
config.next_run_at = _calculate_next_run(config, timezone.now())
|
||||
# Stage enabled toggles
|
||||
if 'stage_1_enabled' in request.data:
|
||||
config.stage_1_enabled = request.data['stage_1_enabled']
|
||||
@@ -1921,3 +1938,35 @@ class AutomationViewSet(viewsets.ViewSet):
|
||||
},
|
||||
'initial_snapshot': initial_snapshot
|
||||
}
|
||||
|
||||
@extend_schema(tags=['Automation'])
|
||||
@action(detail=False, methods=['get'], url_path='server_time')
|
||||
def server_time(self, request):
|
||||
"""
|
||||
GET /api/v1/automation/server_time/
|
||||
Get current server time (UTC) used for all automation scheduling.
|
||||
|
||||
Returns:
|
||||
- server_time: Current UTC timestamp (ISO 8601 format)
|
||||
- server_time_formatted: Human-readable UTC time
|
||||
- timezone: Server timezone setting (always UTC)
|
||||
- celery_timezone: Celery task timezone setting
|
||||
- use_tz: Whether Django is timezone-aware
|
||||
|
||||
Note: All automation schedules (scheduled_time) are in UTC.
|
||||
When user sets "02:00", the automation runs at 02:00 UTC.
|
||||
"""
|
||||
from django.conf import settings
|
||||
|
||||
now = timezone.now()
|
||||
|
||||
return Response({
|
||||
'server_time': now.isoformat(),
|
||||
'server_time_formatted': now.strftime('%H:%M'),
|
||||
'server_time_date': now.strftime('%Y-%m-%d'),
|
||||
'server_time_time': now.strftime('%H:%M:%S'),
|
||||
'timezone': settings.TIME_ZONE,
|
||||
'celery_timezone': getattr(settings, 'CELERY_TIMEZONE', settings.TIME_ZONE),
|
||||
'use_tz': settings.USE_TZ,
|
||||
'note': 'All automation schedules are in UTC. When you set "02:00", the automation runs at 02:00 UTC.'
|
||||
})
|
||||
@@ -57,7 +57,7 @@ app.conf.beat_schedule = {
|
||||
# Automation Tasks
|
||||
'check-scheduled-automations': {
|
||||
'task': 'automation.check_scheduled_automations',
|
||||
'schedule': crontab(minute=0), # Every hour at :00
|
||||
'schedule': crontab(minute='0,15,30,45'), # Every 15 minutes
|
||||
},
|
||||
# Publishing Scheduler Tasks
|
||||
'schedule-approved-content': {
|
||||
|
||||
Reference in New Issue
Block a user