- Introduced a new scheduled task for executing automation rules every 5 minutes in the Celery beat schedule. - Updated URL routing to include a new endpoint for automation-related functionalities. - Refactored imports in various modules to align with the new business layer structure, ensuring backward compatibility for billing models, exceptions, and services.
93 lines
3.7 KiB
Python
93 lines
3.7 KiB
Python
"""
ViewSets for Automation Models
Unified API Standard v1.0 compliant
"""
|
|
import logging

from django_filters.rest_framework import DjangoFilterBackend
from drf_spectacular.utils import extend_schema, extend_schema_view
from rest_framework import filters
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response

from igny8_core.api.base import SiteSectorModelViewSet, AccountModelViewSet
from igny8_core.api.pagination import CustomPageNumberPagination
from igny8_core.api.permissions import IsAuthenticatedAndActive, IsViewerOrAbove
from igny8_core.api.response import success_response, error_response
from igny8_core.api.throttles import DebugScopedRateThrottle
from igny8_core.business.automation.models import AutomationRule, ScheduledTask
from igny8_core.business.automation.services.automation_service import AutomationService

from .serializers import AutomationRuleSerializer, ScheduledTaskSerializer
|
|
|
|
|
|
@extend_schema_view(
    list=extend_schema(tags=['Automation']),
    create=extend_schema(tags=['Automation']),
    retrieve=extend_schema(tags=['Automation']),
    update=extend_schema(tags=['Automation']),
    partial_update=extend_schema(tags=['Automation']),
    destroy=extend_schema(tags=['Automation']),
)
class AutomationRuleViewSet(SiteSectorModelViewSet):
    """
    ViewSet for managing automation rules.

    Unified API Standard v1.0 compliant.

    In addition to the actions tagged in the schema decorator above, this
    viewset provides a detail-level ``execute`` action that runs a single
    rule on demand through ``AutomationService``.
    """
    queryset = AutomationRule.objects.all()
    serializer_class = AutomationRuleSerializer
    # NOTE(review): these classes also gate the ``execute`` action below, so
    # any role accepted by IsViewerOrAbove can trigger a rule - confirm that
    # this is intended.
    permission_classes = [IsAuthenticatedAndActive, IsViewerOrAbove]
    pagination_class = CustomPageNumberPagination
    throttle_scope = 'automation'
    throttle_classes = [DebugScopedRateThrottle]

    filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
    search_fields = ['name', 'description']
    ordering_fields = ['name', 'created_at', 'last_executed_at', 'execution_count']
    # Default ordering: descending ``created_at``.
    ordering = ['-created_at']
    filterset_fields = ['trigger', 'is_active', 'status']

    @action(detail=True, methods=['post'], url_path='execute', url_name='execute')
    def execute(self, request, pk=None):
        """
        Manually execute an automation rule.

        The rule is resolved with ``get_object()``. An optional ``context``
        entry in the request body is forwarded unchanged to
        ``AutomationService.execute_rule``; when absent an empty dict is used.

        Args:
            request: The incoming request; ``request.data`` is read for the
                ``context`` key.
            pk: Identifier captured from the URL (used here only for logging;
                the lookup itself is done by ``get_object()``).

        Returns:
            ``success_response`` wrapping the service result, or
            ``error_response`` with HTTP 400 when the body is not a key/value
            mapping, or HTTP 500 when the service raises.
        """
        rule = self.get_object()
        service = AutomationService()

        # A body such as a JSON array has no ``.get``; report that as a
        # client error instead of letting it surface as an internal 500.
        payload = request.data
        if not hasattr(payload, 'get'):
            return error_response(
                error='Request body must be a JSON object',
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request
            )
        context = payload.get('context', {})

        try:
            result = service.execute_rule(rule, context=context)
        except Exception as e:
            # Top-level boundary for a manual run: keep the traceback in the
            # logs so the failure is diagnosable, then answer with the
            # unified error envelope.
            logging.getLogger(__name__).exception(
                'Manual execution of automation rule %s failed', pk
            )
            # NOTE(review): str(e) is sent to the client - confirm service
            # exceptions never carry sensitive detail.
            return error_response(
                error=str(e),
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                request=request
            )

        return success_response(
            data=result,
            message='Rule executed successfully',
            request=request
        )
|
|
|
|
|
|
@extend_schema_view(**{
    action_name: extend_schema(tags=['Automation'])
    for action_name in (
        'list',
        'create',
        'retrieve',
        'update',
        'partial_update',
        'destroy',
    )
})
class ScheduledTaskViewSet(AccountModelViewSet):
    """
    ViewSet for managing scheduled tasks.

    Unified API Standard v1.0 compliant.

    Built on ``AccountModelViewSet``; every schema entry for the actions
    named in the decorator is tagged ``Automation``.
    """

    # --- Data source -------------------------------------------------------
    # ``select_related`` fetches the linked automation rule with each task.
    queryset = ScheduledTask.objects.select_related('automation_rule')
    serializer_class = ScheduledTaskSerializer

    # --- Access control and rate limiting ----------------------------------
    permission_classes = [IsAuthenticatedAndActive, IsViewerOrAbove]
    throttle_classes = [DebugScopedRateThrottle]
    throttle_scope = 'automation'

    # --- Listing behaviour -------------------------------------------------
    pagination_class = CustomPageNumberPagination
    filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
    filterset_fields = ['automation_rule', 'status']
    ordering_fields = ['scheduled_at', 'executed_at', 'status', 'created_at']
    # Default ordering: descending ``scheduled_at``.
    ordering = ['-scheduled_at']
|
|
|