Files
igny8/backend/igny8_core/business/automation/views.py
IGNY8 VPS (Salman) 1521f3ff8c fixes
2025-12-04 17:58:41 +00:00

672 lines
24 KiB
Python

"""
Automation API Views
REST API endpoints for automation management
"""
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from django.shortcuts import get_object_or_404
from django.utils import timezone
from igny8_core.business.automation.models import AutomationConfig, AutomationRun
from igny8_core.business.automation.services import AutomationService
from igny8_core.auth.models import Account, Site
class AutomationViewSet(viewsets.ViewSet):
"""
API endpoints for automation.

Exposes the automation configuration, run control (start, pause, resume,
cancel), run history, logs, credit estimation and pipeline overview as
non-detail ``@action`` routes. Inputs are read from query parameters
(``site_id`` and/or ``run_id``); only ``update_config`` reads a request body.
"""
# Every action on this viewset requires an authenticated user.
permission_classes = [IsAuthenticated]
def _get_site(self, request):
    """
    Resolve the site named by the ``site_id`` query parameter.

    The lookup is restricted to sites whose ``account`` is
    ``request.user.account``, so a site of another account is treated as
    not found.

    Returns:
        ``(site, None)`` on success, or ``(None, response)`` where
        ``response`` is a 400 ``Response`` when ``site_id`` is missing or
        cannot be used as a lookup value.

    Raises:
        Http404: from ``get_object_or_404`` when no matching site exists.
    """
    site_id = request.query_params.get('site_id')
    if not site_id:
        return None, Response(
            {'error': 'site_id required'},
            status=status.HTTP_400_BAD_REQUEST
        )
    try:
        site = get_object_or_404(Site, id=site_id, account=request.user.account)
    except (TypeError, ValueError):
        # The ORM raises these when the value cannot be coerced to the
        # primary-key type (e.g. ``?site_id=abc`` against a numeric id —
        # assumes Site.id is numeric, as the endpoint examples suggest).
        # That is a client error, not a server fault.
        return None, Response(
            {'error': 'site_id is invalid'},
            status=status.HTTP_400_BAD_REQUEST
        )
    return site, None
@action(detail=False, methods=['get'])
def config(self, request):
    """
    GET /api/v1/automation/config/?site_id=123
    Return the automation configuration of a site.

    If the site has no configuration row yet, one is created with
    automation disabled, a daily frequency and a 02:00 schedule.
    """
    site, error_response = self._get_site(request)
    if error_response:
        return error_response

    initial_values = {
        'is_enabled': False,
        'frequency': 'daily',
        'scheduled_time': '02:00',
    }
    automation_config, _created = AutomationConfig.objects.get_or_create(
        account=site.account,
        site=site,
        defaults=initial_values,
    )

    payload = {
        'is_enabled': automation_config.is_enabled,
        'frequency': automation_config.frequency,
        # Rendered as text rather than passed through as a time object.
        'scheduled_time': str(automation_config.scheduled_time),
    }
    # Batch sizes for stages 1 through 6, in stage order.
    for stage_number in range(1, 7):
        field_name = f'stage_{stage_number}_batch_size'
        payload[field_name] = getattr(automation_config, field_name)
    payload['last_run_at'] = automation_config.last_run_at
    payload['next_run_at'] = automation_config.next_run_at
    return Response(payload)
@action(detail=False, methods=['put'])
def update_config(self, request):
    """
    PUT /api/v1/automation/update_config/?site_id=123
    Update automation configuration

    Only the keys present in the body are changed; anything else in the
    body is ignored.

    Body:
    {
        "is_enabled": true,
        "frequency": "daily",
        "scheduled_time": "02:00",
        "stage_1_batch_size": 20,
        ...
    }

    Returns 400 when a supplied value is rejected while saving, instead of
    letting the error surface as a server error.
    """
    from django.core.exceptions import ValidationError

    site, error_response = self._get_site(request)
    if error_response:
        return error_response
    config, _ = AutomationConfig.objects.get_or_create(
        account=site.account,
        site=site
    )

    # Whitelist of attributes a client may set through this endpoint.
    updatable_fields = (
        'is_enabled',
        'frequency',
        'scheduled_time',
    ) + tuple(f'stage_{stage}_batch_size' for stage in range(1, 7))

    for field_name in updatable_fields:
        if field_name in request.data:
            setattr(config, field_name, request.data[field_name])

    try:
        config.save()
    except (TypeError, ValueError, ValidationError) as exc:
        # Values are assigned unvalidated, so a value the model field cannot
        # convert (e.g. a non-numeric batch size) fails here. Report it as a
        # client error.
        return Response(
            {'error': f'Invalid config value: {exc}'},
            status=status.HTTP_400_BAD_REQUEST
        )
    return Response({'message': 'Config updated'})
@action(detail=False, methods=['post'])
def run_now(self, request):
    """
    POST /api/v1/automation/run_now/?site_id=123
    Start an automation run for the site right away.

    The run is registered with trigger type ``manual`` and the processing
    task is queued with the new run id. A ``ValueError`` from the service is
    reported as 400; any other failure as 500.
    """
    site, error_response = self._get_site(request)
    if error_response:
        return error_response

    try:
        automation = AutomationService(site.account, site)
        new_run_id = automation.start_automation(trigger_type='manual')

        # Hand the actual work to the background task.
        from igny8_core.business.automation.tasks import run_automation_task
        run_automation_task.delay(new_run_id)

        started_payload = {
            'run_id': new_run_id,
            'message': 'Automation started'
        }
        return Response(started_payload)
    except ValueError as exc:
        return Response(
            {'error': str(exc)},
            status=status.HTTP_400_BAD_REQUEST
        )
    except Exception as exc:
        failure_message = f'Failed to start automation: {str(exc)}'
        return Response(
            {'error': failure_message},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )
@action(detail=False, methods=['get'])
def current_run(self, request):
    """
    GET /api/v1/automation/current_run/?site_id=123
    Return the most recently started run of the site that is still
    ``running`` or ``paused``, or ``{'run': None}`` when there is none.
    """
    site, error_response = self._get_site(request)
    if error_response:
        return error_response

    active_run = (
        AutomationRun.objects
        .filter(site=site, status__in=['running', 'paused'])
        .order_by('-started_at')
        .first()
    )
    if active_run is None:
        return Response({'run': None})

    header_fields = (
        'run_id',
        'status',
        'current_stage',
        'trigger_type',
        'started_at',
        'total_credits_used',
    )
    run_payload = {name: getattr(active_run, name) for name in header_fields}
    # Per-stage results for stages 1 through 7, in stage order.
    for stage_number in range(1, 8):
        result_field = f'stage_{stage_number}_result'
        run_payload[result_field] = getattr(active_run, result_field)
    return Response({'run': run_payload})
@action(detail=False, methods=['post'])
def pause(self, request):
    """
    POST /api/v1/automation/pause/?run_id=abc123
    Pause automation run

    The run must belong to a site of the caller's account; a run of any
    other account is reported as not found.

    NOTE(review): ``pause_automation`` on this viewset also declares
    ``url_path='pause'`` — confirm which handler the router dispatches to.
    """
    run_id = request.query_params.get('run_id')
    if not run_id:
        return Response(
            {'error': 'run_id required'},
            status=status.HTTP_400_BAD_REQUEST
        )

    # Authorisation: run_id alone identifies a run, so without this check
    # any authenticated user could pause another account's automation.
    owned_by_caller = AutomationRun.objects.filter(
        run_id=run_id,
        site__account=request.user.account
    ).exists()
    if not owned_by_caller:
        return Response(
            {'error': 'Run not found'},
            status=status.HTTP_404_NOT_FOUND
        )

    try:
        service = AutomationService.from_run_id(run_id)
        service.pause_automation()
        return Response({'message': 'Automation paused'})
    except AutomationRun.DoesNotExist:
        return Response(
            {'error': 'Run not found'},
            status=status.HTTP_404_NOT_FOUND
        )
@action(detail=False, methods=['post'])
def resume(self, request):
    """
    POST /api/v1/automation/resume/?run_id=abc123
    Resume paused automation run

    The run must belong to a site of the caller's account; a run of any
    other account is reported as not found.

    NOTE(review): ``resume_automation`` on this viewset also declares
    ``url_path='resume'`` — confirm which handler the router dispatches to.
    """
    run_id = request.query_params.get('run_id')
    if not run_id:
        return Response(
            {'error': 'run_id required'},
            status=status.HTTP_400_BAD_REQUEST
        )

    # Authorisation: run_id alone identifies a run, so without this check
    # any authenticated user could resume another account's automation.
    owned_by_caller = AutomationRun.objects.filter(
        run_id=run_id,
        site__account=request.user.account
    ).exists()
    if not owned_by_caller:
        return Response(
            {'error': 'Run not found'},
            status=status.HTTP_404_NOT_FOUND
        )

    try:
        service = AutomationService.from_run_id(run_id)
        service.resume_automation()
        # Resume async processing
        from igny8_core.business.automation.tasks import resume_automation_task
        resume_automation_task.delay(run_id)
        return Response({'message': 'Automation resumed'})
    except AutomationRun.DoesNotExist:
        return Response(
            {'error': 'Run not found'},
            status=status.HTTP_404_NOT_FOUND
        )
@action(detail=False, methods=['get'])
def history(self, request):
    """
    GET /api/v1/automation/history/?site_id=123
    Return a summary of the site's 20 most recently started runs,
    newest first.
    """
    site, error_response = self._get_site(request)
    if error_response:
        return error_response

    recent_runs = AutomationRun.objects.filter(site=site).order_by('-started_at')[:20]

    summary_fields = (
        'run_id',
        'status',
        'trigger_type',
        'started_at',
        'completed_at',
        'total_credits_used',
        'current_stage',
    )
    serialized_runs = []
    for past_run in recent_runs:
        serialized_runs.append(
            {name: getattr(past_run, name) for name in summary_fields}
        )
    return Response({'runs': serialized_runs})
@action(detail=False, methods=['get'])
def logs(self, request):
    """
    GET /api/v1/automation/logs/?run_id=abc123&lines=100
    Get automation run logs

    ``lines`` is optional and defaults to 100. The run must belong to a site
    of the caller's account; a run of any other account is reported as not
    found. A non-integer ``lines`` value yields 400.
    """
    run_id = request.query_params.get('run_id')
    if not run_id:
        return Response(
            {'error': 'run_id required'},
            status=status.HTTP_400_BAD_REQUEST
        )
    try:
        # Scope the lookup to the caller's account; run_id alone would let
        # any authenticated user read another account's activity log.
        run = AutomationRun.objects.get(
            run_id=run_id,
            site__account=request.user.account
        )
        try:
            lines = int(request.query_params.get('lines', 100))
        except (TypeError, ValueError):
            # e.g. ``?lines=abc`` — a client error, not a server fault.
            return Response(
                {'error': 'lines must be an integer'},
                status=status.HTTP_400_BAD_REQUEST
            )
        service = AutomationService(run.account, run.site)
        log_text = service.logger.get_activity_log(
            run.account.id, run.site.id, run_id, lines
        )
        return Response({
            'run_id': run_id,
            'log': log_text
        })
    except AutomationRun.DoesNotExist:
        return Response(
            {'error': 'Run not found'},
            status=status.HTTP_404_NOT_FOUND
        )
@action(detail=False, methods=['get'])
def estimate(self, request):
    """
    GET /api/v1/automation/estimate/?site_id=123
    Report the estimated credit cost of an automation run next to the
    account balance.

    ``sufficient`` is true when the balance covers the estimate multiplied
    by 1.2.
    """
    site, error_response = self._get_site(request)
    if error_response:
        return error_response

    credits_needed = AutomationService(site.account, site).estimate_credits()

    account = site.account
    estimate_payload = {'estimated_credits': credits_needed}
    estimate_payload['current_balance'] = account.credits
    estimate_payload['sufficient'] = account.credits >= (credits_needed * 1.2)
    return Response(estimate_payload)
@action(detail=False, methods=['get'])
def pipeline_overview(self, request):
    """
    GET /api/v1/automation/pipeline_overview/?site_id=123
    Get pipeline overview with pending counts for all stages

    Each of the seven stages is reported with:
      - ``pending``: number of items waiting at that stage,
      - ``counts``: per-status breakdown of the stage's model for the site,
      - ``total``: sum of that breakdown.
    """
    site, error_response = self._get_site(request)
    if error_response:
        return error_response
    from igny8_core.business.planning.models import Keywords, Clusters, ContentIdeas
    from igny8_core.business.content.models import Tasks, Content, Images
    from django.db.models import Count

    def _counts_by_status(model, extra_filter=None, exclude_filter=None):
        """Return a dict of counts keyed by status and the total for a given model and site."""
        qs = model.objects.filter(site=site)
        if extra_filter:
            qs = qs.filter(**extra_filter)
        if exclude_filter:
            qs = qs.exclude(**exclude_filter)
        # Group by status when available
        try:
            # Clear any ordering first so it cannot leak into the GROUP BY
            # and split one status across several rows.
            rows = qs.order_by().values('status').annotate(count=Count('id'))
            counts = {r['status']: r['count'] for r in rows}
            total = sum(counts.values())
        except Exception:
            # Fallback: count all (deliberate best effort, kept as is)
            total = qs.count()
            counts = {'total': total}
        return counts, total

    # Stage 1: Keywords pending clustering. "pending" keeps the definition
    # the UI used previously (new & not clustered).
    keyword_counts, keyword_total = _counts_by_status(
        Keywords,
        extra_filter={'disabled': False}
    )
    keywords_pending = Keywords.objects.filter(
        site=site,
        status='new',
        cluster__isnull=True,
        disabled=False
    ).count()

    # Stage 2: Clusters needing ideas (new clusters without any idea yet)
    cluster_counts, cluster_total = _counts_by_status(
        Clusters,
        extra_filter={'disabled': False}
    )
    clusters_pending = Clusters.objects.filter(
        site=site,
        status='new',
        disabled=False
    ).exclude(
        ideas__isnull=False
    ).count()

    # Stage 3: Ideas ready to queue
    idea_counts, idea_total = _counts_by_status(ContentIdeas)
    ideas_pending = ContentIdeas.objects.filter(
        site=site,
        status='new'
    ).count()

    # Stage 4: Tasks ready for content generation
    task_counts, task_total = _counts_by_status(Tasks)
    tasks_pending = Tasks.objects.filter(
        site=site,
        status='queued'
    ).count()

    # Stages 5 and 7 both report the Content status breakdown. The query is
    # identical for both, so it runs once and the result is shared.
    content_counts, content_total = _counts_by_status(Content)

    # Stage 5: Content ready for image prompts (draft with 0 images)
    content_pending_prompts = Content.objects.filter(
        site=site,
        status='draft'
    ).annotate(
        images_count=Count('images')
    ).filter(
        images_count=0
    ).count()

    # Stage 6: Image prompts ready for generation
    image_counts, image_total = _counts_by_status(Images)
    images_pending = Images.objects.filter(
        site=site,
        status='pending'
    ).count()

    # Stage 7: Content ready for review
    content_ready_for_review = Content.objects.filter(
        site=site,
        status='review'
    ).count()

    # (number, name, type, pending, counts, total) per stage, in order.
    stage_rows = [
        (1, 'Keywords → Clusters', 'AI', keywords_pending, keyword_counts, keyword_total),
        (2, 'Clusters → Ideas', 'AI', clusters_pending, cluster_counts, cluster_total),
        (3, 'Ideas → Tasks', 'Local', ideas_pending, idea_counts, idea_total),
        (4, 'Tasks → Content', 'AI', tasks_pending, task_counts, task_total),
        (5, 'Content → Image Prompts', 'AI', content_pending_prompts, content_counts, content_total),
        (6, 'Image Prompts → Images', 'AI', images_pending, image_counts, image_total),
        # Copy so stages 5 and 7 do not alias the same dict object.
        (7, 'Manual Review Gate', 'Manual', content_ready_for_review, dict(content_counts), content_total),
    ]
    return Response({
        'stages': [
            {
                'number': number,
                'name': name,
                'pending': pending,
                'type': stage_type,
                'counts': counts,
                'total': total
            }
            for number, name, stage_type, pending, counts, total in stage_rows
        ]
    })
@action(detail=False, methods=['get'], url_path='current_processing')
def current_processing(self, request):
    """
    GET /api/v1/automation/current_processing/?site_id=123&run_id=abc
    Get current processing state for active automation run

    Returns ``{'data': None}`` when the run is not in status ``running``.
    Responds 404 when the site (within the caller's account) or the run
    (within that site) does not exist.
    """
    from django.http import Http404

    site_id = request.query_params.get('site_id')
    run_id = request.query_params.get('run_id')
    if not site_id or not run_id:
        return Response(
            {'error': 'site_id and run_id required'},
            status=status.HTTP_400_BAD_REQUEST
        )
    try:
        # Get the site
        site = get_object_or_404(Site, id=site_id, account=request.user.account)
        # Get the run
        run = AutomationRun.objects.get(run_id=run_id, site=site)
        # If not running, return None
        if run.status != 'running':
            return Response({'data': None})
        # Get current processing state
        service = AutomationService.from_run_id(run_id)
        state = service.get_current_processing_state()
        return Response({'data': state})
    except AutomationRun.DoesNotExist:
        return Response(
            {'error': 'Run not found'},
            status=status.HTTP_404_NOT_FOUND
        )
    except Http404:
        # Http404 is an Exception subclass; without this clause the generic
        # handler below would turn "site not found" into a 500.
        raise
    except Exception as e:
        return Response(
            {'error': str(e)},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )
@action(detail=False, methods=['post'], url_path='pause')
def pause_automation(self, request):
    """
    POST /api/v1/automation/pause/?site_id=123&run_id=abc
    Pause current automation run
    Will complete current queue item then pause before next item

    Only a run in status ``running`` can be paused; any other status
    yields 400. Responds 404 when the site (within the caller's account)
    or the run (within that site) does not exist.

    NOTE(review): the ``pause`` action on this viewset resolves to the same
    ``pause`` URL path — confirm which handler the router dispatches to.
    """
    from django.http import Http404

    site_id = request.query_params.get('site_id')
    run_id = request.query_params.get('run_id')
    if not site_id or not run_id:
        return Response(
            {'error': 'site_id and run_id required'},
            status=status.HTTP_400_BAD_REQUEST
        )
    try:
        site = get_object_or_404(Site, id=site_id, account=request.user.account)
        run = AutomationRun.objects.get(run_id=run_id, site=site)
        if run.status != 'running':
            return Response(
                {'error': f'Cannot pause automation with status: {run.status}'},
                status=status.HTTP_400_BAD_REQUEST
            )
        # Update status to paused
        run.status = 'paused'
        run.paused_at = timezone.now()
        run.save(update_fields=['status', 'paused_at'])
        return Response({
            'message': 'Automation paused',
            'status': run.status,
            'paused_at': run.paused_at
        })
    except AutomationRun.DoesNotExist:
        return Response(
            {'error': 'Run not found'},
            status=status.HTTP_404_NOT_FOUND
        )
    except Http404:
        # Http404 is an Exception subclass; without this clause the generic
        # handler below would turn "site not found" into a 500.
        raise
    except Exception as e:
        return Response(
            {'error': str(e)},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )
@action(detail=False, methods=['post'], url_path='resume')
def resume_automation(self, request):
    """
    POST /api/v1/automation/resume/?site_id=123&run_id=abc
    Resume paused automation run
    Will continue from next queue item in current stage

    Only a run in status ``paused`` can be resumed; any other status
    yields 400. Responds 404 when the site (within the caller's account)
    or the run (within that site) does not exist.

    NOTE(review): the ``resume`` action on this viewset resolves to the same
    ``resume`` URL path — confirm which handler the router dispatches to.
    """
    from django.http import Http404

    site_id = request.query_params.get('site_id')
    run_id = request.query_params.get('run_id')
    if not site_id or not run_id:
        return Response(
            {'error': 'site_id and run_id required'},
            status=status.HTTP_400_BAD_REQUEST
        )
    try:
        site = get_object_or_404(Site, id=site_id, account=request.user.account)
        run = AutomationRun.objects.get(run_id=run_id, site=site)
        if run.status != 'paused':
            return Response(
                {'error': f'Cannot resume automation with status: {run.status}'},
                status=status.HTTP_400_BAD_REQUEST
            )
        # Update status to running
        run.status = 'running'
        run.resumed_at = timezone.now()
        run.save(update_fields=['status', 'resumed_at'])
        # Queue continuation task
        from igny8_core.business.automation.tasks import continue_automation_task
        continue_automation_task.delay(run_id)
        return Response({
            'message': 'Automation resumed',
            'status': run.status,
            'resumed_at': run.resumed_at
        })
    except AutomationRun.DoesNotExist:
        return Response(
            {'error': 'Run not found'},
            status=status.HTTP_404_NOT_FOUND
        )
    except Http404:
        # Http404 is an Exception subclass; without this clause the generic
        # handler below would turn "site not found" into a 500.
        raise
    except Exception as e:
        return Response(
            {'error': str(e)},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )
@action(detail=False, methods=['post'], url_path='cancel')
def cancel_automation(self, request):
    """
    POST /api/v1/automation/cancel/?site_id=123&run_id=abc
    Cancel current automation run
    Will complete current queue item then stop permanently

    Only a run in status ``running`` or ``paused`` can be cancelled; any
    other status yields 400. Responds 404 when the site (within the
    caller's account) or the run (within that site) does not exist.
    """
    from django.http import Http404

    site_id = request.query_params.get('site_id')
    run_id = request.query_params.get('run_id')
    if not site_id or not run_id:
        return Response(
            {'error': 'site_id and run_id required'},
            status=status.HTTP_400_BAD_REQUEST
        )
    try:
        site = get_object_or_404(Site, id=site_id, account=request.user.account)
        run = AutomationRun.objects.get(run_id=run_id, site=site)
        if run.status not in ['running', 'paused']:
            return Response(
                {'error': f'Cannot cancel automation with status: {run.status}'},
                status=status.HTTP_400_BAD_REQUEST
            )
        # Update status to cancelled. One timestamp is used for both fields
        # so cancelled_at and completed_at are exactly equal.
        cancelled_at = timezone.now()
        run.status = 'cancelled'
        run.cancelled_at = cancelled_at
        run.completed_at = cancelled_at
        run.save(update_fields=['status', 'cancelled_at', 'completed_at'])
        return Response({
            'message': 'Automation cancelled',
            'status': run.status,
            'cancelled_at': run.cancelled_at
        })
    except AutomationRun.DoesNotExist:
        return Response(
            {'error': 'Run not found'},
            status=status.HTTP_404_NOT_FOUND
        )
    except Http404:
        # Http404 is an Exception subclass; without this clause the generic
        # handler below would turn "site not found" into a 500.
        raise
    except Exception as e:
        return Response(
            {'error': str(e)},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )