Files
igny8/backend/igny8_core/modules/automation/views.py
IGNY8 VPS (Salman) 7f8982a0ab Add scheduled automation task and update URL routing
- Introduced a new scheduled task for executing automation rules every 5 minutes in the Celery beat schedule.
- Updated URL routing to include a new endpoint for automation-related functionalities.
- Refactored imports in various modules to align with the new business layer structure, ensuring backward compatibility for billing models, exceptions, and services.
2025-11-16 22:11:05 +00:00

93 lines
3.7 KiB
Python

"""
ViewSets for Automation Models
Unified API Standard v1.0 compliant
"""
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework import filters
from drf_spectacular.utils import extend_schema, extend_schema_view
from igny8_core.api.base import SiteSectorModelViewSet, AccountModelViewSet
from igny8_core.api.pagination import CustomPageNumberPagination
from igny8_core.api.response import success_response, error_response
from igny8_core.api.throttles import DebugScopedRateThrottle
from igny8_core.api.permissions import IsAuthenticatedAndActive, IsViewerOrAbove
from igny8_core.business.automation.models import AutomationRule, ScheduledTask
from igny8_core.business.automation.services.automation_service import AutomationService
from .serializers import AutomationRuleSerializer, ScheduledTaskSerializer
@extend_schema_view(
list=extend_schema(tags=['Automation']),
create=extend_schema(tags=['Automation']),
retrieve=extend_schema(tags=['Automation']),
update=extend_schema(tags=['Automation']),
partial_update=extend_schema(tags=['Automation']),
destroy=extend_schema(tags=['Automation']),
)
class AutomationRuleViewSet(SiteSectorModelViewSet):
"""
ViewSet for managing automation rules
Unified API Standard v1.0 compliant
"""
queryset = AutomationRule.objects.all()
serializer_class = AutomationRuleSerializer
permission_classes = [IsAuthenticatedAndActive, IsViewerOrAbove]
pagination_class = CustomPageNumberPagination
throttle_scope = 'automation'
throttle_classes = [DebugScopedRateThrottle]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
search_fields = ['name', 'description']
ordering_fields = ['name', 'created_at', 'last_executed_at', 'execution_count']
ordering = ['-created_at']
filterset_fields = ['trigger', 'is_active', 'status']
@action(detail=True, methods=['post'], url_path='execute', url_name='execute')
def execute(self, request, pk=None):
"""Manually execute an automation rule"""
rule = self.get_object()
service = AutomationService()
try:
result = service.execute_rule(rule, context=request.data.get('context', {}))
return success_response(
data=result,
message='Rule executed successfully',
request=request
)
except Exception as e:
return error_response(
error=str(e),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
@extend_schema_view(
list=extend_schema(tags=['Automation']),
create=extend_schema(tags=['Automation']),
retrieve=extend_schema(tags=['Automation']),
update=extend_schema(tags=['Automation']),
partial_update=extend_schema(tags=['Automation']),
destroy=extend_schema(tags=['Automation']),
)
class ScheduledTaskViewSet(AccountModelViewSet):
"""
ViewSet for managing scheduled tasks
Unified API Standard v1.0 compliant
"""
queryset = ScheduledTask.objects.select_related('automation_rule')
serializer_class = ScheduledTaskSerializer
permission_classes = [IsAuthenticatedAndActive, IsViewerOrAbove]
pagination_class = CustomPageNumberPagination
throttle_scope = 'automation'
throttle_classes = [DebugScopedRateThrottle]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
ordering_fields = ['scheduled_at', 'executed_at', 'status', 'created_at']
ordering = ['-scheduled_at']
filterset_fields = ['automation_rule', 'status']