# PHASE 2: AUTOMATION SYSTEM

**Detailed Implementation Plan**

**Goal**: Implement automation rules and scheduled tasks.

**Timeline**: 2-3 weeks

**Priority**: HIGH

**Dependencies**: Phase 1

---

## TABLE OF CONTENTS

1. [Overview](#overview)
2. [Automation Models](#automation-models)
3. [Automation Service](#automation-service)
4. [Celery Beat Tasks](#celery-beat-tasks)
5. [Automation API](#automation-api)
6. [Automation UI](#automation-ui)
7. [Testing & Validation](#testing--validation)
8. [Implementation Checklist](#implementation-checklist)

---

## OVERVIEW

### Objectives

- ✅ Create AutomationRule and ScheduledTask models
- ✅ Build AutomationService with rule execution engine
- ✅ Implement Celery Beat scheduled tasks
- ✅ Create automation API endpoints
- ✅ Build automation UI (Dashboard, Rules, History)

### Key Principles

- **Rule-Based**: Users create rules with triggers, conditions, actions
- **Scheduled Execution**: Rules can run on schedule or event triggers
- **Credit-Aware**: Automation respects credit limits
- **Audit Trail**: All automation executions logged

---

## AUTOMATION MODELS

### 2.1 Automation Models

**Purpose**: Store automation rules and scheduled task records.
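To make the rule-based principle concrete, the following is a minimal sketch of how a single condition in the `{'field', 'operator', 'value'}` shape used throughout this plan might be evaluated against an event context. The operator names are the ones this plan lists for `ConditionEvaluator`; the function name `evaluate_condition`, the treatment of an empty condition as "always passes", and the treatment of a missing field as "not met" are assumptions made for illustration, not decisions taken by this plan.

```python
import operator
from typing import Any, Callable, Dict

# Operator names follow the list this plan gives for ConditionEvaluator.
OPERATORS: Dict[str, Callable[[Any, Any], bool]] = {
    "eq": operator.eq,
    "ne": operator.ne,
    "gt": operator.gt,
    "gte": operator.ge,
    "lt": operator.lt,
    "lte": operator.le,
    "in": lambda actual, expected: actual in expected,
    "contains": lambda actual, expected: expected in actual,
}


def evaluate_condition(condition: Dict[str, Any], context: Dict[str, Any]) -> bool:
    """Evaluate one {'field', 'operator', 'value'} condition against a context dict."""
    if not condition:
        # Assumption: an empty condition (the JSONField default) always passes.
        return True
    compare = OPERATORS.get(condition["operator"])
    if compare is None:
        raise ValueError(f"Unsupported operator: {condition['operator']}")
    field = condition["field"]
    if field not in context:
        # Assumption: a missing field means the condition is not met.
        return False
    return compare(context[field], condition["value"])


draft_only = {"field": "status", "operator": "eq", "value": "draft"}
print(evaluate_condition(draft_only, {"status": "draft"}))      # True
print(evaluate_condition(draft_only, {"status": "published"}))  # False
```

A lookup table keeps the supported operators in one place, so adding an operator later is a one-line change and an unknown operator fails loudly instead of silently passing.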
#### AutomationRule Model

| Task | File | Dependencies | Implementation |
|------|------|--------------|----------------|
| **AutomationRule Model** | `domain/automation/models.py` | Phase 1 | Create model with trigger, conditions, actions, schedule |

**AutomationRule Model**:

```python
# domain/automation/models.py
from django.db import models

# SiteSectorBaseModel is the Phase 1 base model; import it from its Phase 1 module.


class AutomationRule(SiteSectorBaseModel):
    name = models.CharField(max_length=255)
    description = models.TextField(blank=True)

    # Trigger configuration
    trigger = models.CharField(
        max_length=50,
        choices=[
            ('schedule', 'Scheduled'),
            ('keyword_added', 'Keyword Added'),
            ('cluster_created', 'Cluster Created'),
            ('idea_created', 'Idea Created'),
            ('content_generated', 'Content Generated'),
            ('task_created', 'Task Created'),
        ]
    )

    # Condition evaluation
    conditions = models.JSONField(default=dict)
    # Example: {'field': 'status', 'operator': 'eq', 'value': 'draft'}

    # Actions to execute
    actions = models.JSONField(default=list)
    # Example: [{'type': 'generate_ideas', 'params': {'cluster_ids': [1, 2]}}]

    # Schedule configuration (for scheduled triggers)
    schedule = models.JSONField(default=dict)
    # Example: {'cron': '0 9 * * *', 'timezone': 'UTC'}

    # Execution limits
    is_active = models.BooleanField(default=True)
    max_executions_per_day = models.IntegerField(default=10)
    credit_limit_per_execution = models.IntegerField(default=100)

    # Tracking
    last_executed_at = models.DateTimeField(null=True, blank=True)
    execution_count_today = models.IntegerField(default=0)
    last_reset_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        # Relies on a created_at field inherited from the base model
        ordering = ['-created_at']
```

#### ScheduledTask Model

| Task | File | Dependencies | Implementation |
|------|------|--------------|----------------|
| **ScheduledTask Model** | `domain/automation/models.py` | Phase 1 | Create model to track scheduled executions |

**ScheduledTask Model**:

```python
# domain/automation/models.py
class ScheduledTask(SiteSectorBaseModel):
    automation_rule = models.ForeignKey(AutomationRule, on_delete=models.CASCADE)
    scheduled_at = models.DateTimeField()
    executed_at = models.DateTimeField(null=True, blank=True)
    status = models.CharField(
        max_length=20,
        choices=[
            ('pending', 'Pending'),
            ('running', 'Running'),
            ('completed', 'Completed'),
            ('failed', 'Failed'),
            ('skipped', 'Skipped'),
        ],
        default='pending'
    )
    result = models.JSONField(default=dict, blank=True)
    error_message = models.TextField(blank=True)
    credits_used = models.IntegerField(default=0)

    class Meta:
        ordering = ['-scheduled_at']
```

#### Automation Migrations

| Task | File | Dependencies | Implementation |
|------|------|--------------|----------------|
| **Automation Migrations** | `domain/automation/migrations/` | Phase 1 | Create initial migrations |

---

## AUTOMATION SERVICE

### 2.2 Automation Service

**Purpose**: Execute automation rules with condition evaluation and action execution.

#### AutomationService

| Task | File | Dependencies | Implementation |
|------|------|--------------|----------------|
| **AutomationService** | `domain/automation/services/automation_service.py` | Phase 1 services | Main service for rule execution |

**AutomationService Methods**:

```python
# domain/automation/services/automation_service.py
from django.utils import timezone

from domain.billing.services.credit_service import CreditService

from .action_executor import ActionExecutor
from .condition_evaluator import ConditionEvaluator
from .rule_engine import RuleEngine


class AutomationService:
    def __init__(self):
        self.rule_engine = RuleEngine()
        self.condition_evaluator = ConditionEvaluator()
        self.action_executor = ActionExecutor()
        self.credit_service = CreditService()

    def execute_rule(self, rule, context=None):
        """Execute an automation rule"""
        # Scheduled runs pass no context, and the action executor reads
        # context['account'], so fall back to the rule's own account
        if context is None:
            context = {'account': rule.account}

        # Check if rule is active
        if not rule.is_active:
            return {'status': 'skipped', 'reason': 'Rule is inactive'}

        # Check execution limits
        if not self._check_execution_limits(rule):
            return {'status': 'skipped', 'reason': 'Execution limit reached'}

        # Check credits
        if not self.credit_service.check_credits(rule.account, 'automation', rule.credit_limit_per_execution):
            return {'status': 'skipped', 'reason': 'Insufficient credits'}

        # Evaluate conditions
        if not self.condition_evaluator.evaluate(rule.conditions, context):
            return {'status': 'skipped', 'reason': 'Conditions not met'}

        # Execute actions
        results = self.action_executor.execute(rule.actions, context)

        # Update rule tracking
        rule.last_executed_at = timezone.now()
        rule.execution_count_today += 1
        rule.save()

        return {'status': 'completed', 'results': results}

    def _check_execution_limits(self, rule):
        """Check if rule can execute (daily limit)"""
        # Reset counter if new day
        if rule.last_reset_at.date() < timezone.now().date():
            rule.execution_count_today = 0
            rule.last_reset_at = timezone.now()
            rule.save()

        return rule.execution_count_today < rule.max_executions_per_day
```

#### Rule Execution Engine

| Task | File | Dependencies | Implementation |
|------|------|--------------|----------------|
| **Rule Execution Engine** | `domain/automation/services/rule_engine.py` | Phase 1 services | Orchestrates rule execution |

**RuleEngine Methods**:

```python
# domain/automation/services/rule_engine.py
class RuleEngine:
    def execute_rule(self, rule, context):
        """Orchestrate rule execution"""
        # Validate rule
        # Check conditions
        # Execute actions
        # Handle errors
        pass
```

#### Condition Evaluator

| Task | File | Dependencies | Implementation |
|------|------|--------------|----------------|
| **Condition Evaluator** | `domain/automation/services/condition_evaluator.py` | None | Evaluates rule conditions |

**ConditionEvaluator Methods**:

```python
# domain/automation/services/condition_evaluator.py
class ConditionEvaluator:
    def evaluate(self, conditions, context):
        """Evaluate rule conditions"""
        # Support operators: eq, ne, gt, gte, lt, lte, in, contains
        # Example: {'field': 'status', 'operator': 'eq', 'value': 'draft'}
        pass
```

#### Action Executor

| Task | File | Dependencies | Implementation |
|------|------|--------------|----------------|
| **Action Executor** | `domain/automation/services/action_executor.py` | Phase 1 services | Executes rule actions |

**ActionExecutor Methods**:

```python
# domain/automation/services/action_executor.py
# ClusteringService, IdeasService and ContentGenerationService are Phase 1
# services; import them from their Phase 1 modules.


class ActionExecutor:
    def __init__(self):
        self.clustering_service = ClusteringService()
        self.ideas_service = IdeasService()
        self.content_service = ContentGenerationService()

    def execute(self, actions, context):
        """Execute rule actions"""
        results = []
        for action in actions:
            action_type = action['type']
            params = action.get('params', {})

            if action_type == 'generate_ideas':
                result = self.ideas_service.generate_ideas(params['cluster_ids'], context['account'])
            elif action_type == 'generate_content':
                result = self.content_service.generate_content(params['task_id'], context['account'])
            # ... other action types
            else:
                # Fail loudly rather than appending an undefined or stale result
                raise ValueError(f'Unknown action type: {action_type}')

            results.append(result)

        return results
```

---

## CELERY BEAT TASKS

### 2.3 Celery Beat Tasks

**Purpose**: Schedule automation rules and monthly credit replenishment.

#### Scheduled Automation Task

| Task | File | Dependencies | Implementation |
|------|------|--------------|----------------|
| **Scheduled Automation Task** | `infrastructure/messaging/automation_tasks.py` | AutomationService | Periodic task to execute scheduled rules |

**Scheduled Automation Task**:

```python
# infrastructure/messaging/automation_tasks.py
from celery import shared_task


@shared_task
def execute_scheduled_automation_rules():
    """Execute all scheduled automation rules"""
    from domain.automation.models import AutomationRule
    from domain.automation.services.automation_service import AutomationService

    service = AutomationService()
    rules = AutomationRule.objects.filter(
        trigger='schedule',
        is_active=True
    )

    for rule in rules:
        # Check if rule should execute based on schedule.
        # should_execute_now is a helper that still has to be defined; it
        # decides whether rule.schedule (cron + timezone) is due right now.
        if should_execute_now(rule.schedule):
            service.execute_rule(rule)
```

#### Monthly Credit Replenishment

| Task | File | Dependencies | Implementation |
|------|------|--------------|----------------|
| **Monthly Credit Replenishment** | `infrastructure/messaging/automation_tasks.py` | CreditService | Add credits monthly to accounts |

**Monthly Credit Replenishment Task**:

```python
# infrastructure/messaging/automation_tasks.py
@shared_task
def replenish_monthly_credits():
    """Replenish monthly credits for all active accounts"""
    from domain.billing.services.credit_service import CreditService
    # Account is the Phase 1 account model; import it from its Phase 1 module.

    service = CreditService()
    accounts = Account.objects.filter(status='active')

    for account in accounts:
        if account.plan:
            monthly_credits = account.plan.monthly_credits
            if monthly_credits > 0:
                service.add_credits(account, monthly_credits, 'monthly_replenishment')
```

#### Celery Beat Configuration

| Task | File | Dependencies | Implementation |
|------|------|--------------|----------------|
| **Celery Beat Configuration** | `backend/igny8_core/celery.py` | None | Configure periodic tasks |

**Celery Beat Configuration**:

```python
# backend/igny8_core/celery.py
from celery.schedules import crontab

# `app` is the Celery application defined earlier in this module
app.conf.beat_schedule = {
    'execute-scheduled-automation-rules': {
        'task': 'infrastructure.messaging.automation_tasks.execute_scheduled_automation_rules',
        'schedule': crontab(minute='*/15'),  # Every 15 minutes
    },
    'replenish-monthly-credits': {
        'task': 'infrastructure.messaging.automation_tasks.replenish_monthly_credits',
        'schedule': crontab(hour=0, minute=0, day_of_month=1),  # 00:00 on the first day of each month
    },
}
```

---

## AUTOMATION API

### 2.4 Automation API

**Purpose**: CRUD API for automation rules and scheduled tasks.
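One gap from section 2.3 is worth closing before the API details: the periodic task calls `should_execute_now(rule.schedule)`, which this plan does not define. The sketch below shows one way it might work, assuming the `{'cron': ..., 'timezone': ...}` shape from the `AutomationRule.schedule` example and a look-back window equal to the 15-minute Beat interval. The helper names `_field_matches`, `cron_matches` and `_resolve_tz`, and the `now` and `window_minutes` parameters, are hypothetical. The cron parser is deliberately simplified: five numeric fields only, all fields must match (standard cron treats day-of-month and day-of-week as alternatives when both are restricted), and daylight-saving transitions are not handled. A dedicated library such as `croniter` may be a better fit in production.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional
from zoneinfo import ZoneInfo  # Python 3.9+


def _field_matches(expr: str, value: int, low: int, high: int) -> bool:
    """Return True if one cron field (e.g. '*/15', '1-5', '0,30') matches value."""
    for part in expr.split(","):
        rng, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if rng == "*":
            start, end = low, high
        elif "-" in rng:
            start_text, end_text = rng.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = int(rng)
            end = high if step_text else start
        if start <= value <= end and (value - start) % step == 0:
            return True
    return False


def cron_matches(cron: str, moment: datetime) -> bool:
    """Check a five-field cron expression against a single minute."""
    minute, hour, day, month, weekday = cron.split()
    cron_weekday = (moment.weekday() + 1) % 7  # cron counts Sunday as 0
    return (
        _field_matches(minute, moment.minute, 0, 59)
        and _field_matches(hour, moment.hour, 0, 23)
        and _field_matches(day, moment.day, 1, 31)
        and _field_matches(month, moment.month, 1, 12)
        and _field_matches(weekday, cron_weekday, 0, 6)
    )


def _resolve_tz(name: str):
    """Map a timezone name to a tzinfo; UTC needs no tz database lookup."""
    return timezone.utc if name.upper() == "UTC" else ZoneInfo(name)


def should_execute_now(schedule: dict, now: Optional[datetime] = None,
                       window_minutes: int = 15) -> bool:
    """Return True if the schedule's cron fired within the last polling window."""
    cron = schedule.get("cron")
    if not cron:
        return False
    tz = _resolve_tz(schedule.get("timezone", "UTC"))
    local_now = (now or datetime.now(timezone.utc)).astimezone(tz)
    current = local_now.replace(second=0, microsecond=0)
    # Look back over every minute since the previous poll so that a cron
    # time between two polls (e.g. 09:05) is still picked up exactly once.
    return any(
        cron_matches(cron, current - timedelta(minutes=offset))
        for offset in range(window_minutes)
    )


daily_nine = {"cron": "0 9 * * *", "timezone": "UTC"}
poll_time = datetime(2024, 1, 1, 9, 0, 5, tzinfo=timezone.utc)
print(should_execute_now(daily_nine, now=poll_time))  # True
```

Matching against the whole polling window, rather than only the current minute, is what keeps a rule scheduled for 09:05 from being skipped when Beat only polls at 09:00 and 09:15. It also means the window must stay equal to the polling interval, otherwise rules would be skipped or run twice.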
#### AutomationRule ViewSet

| Task | File | Dependencies | Implementation |
|------|------|--------------|----------------|
| **AutomationRule ViewSet** | `modules/automation/views.py` | AutomationService | CRUD operations for rules |

**AutomationRule ViewSet**:

```python
# modules/automation/views.py
from rest_framework.decorators import action
from rest_framework.response import Response

from domain.automation.models import AutomationRule, ScheduledTask
from domain.automation.services.automation_service import AutomationService

from .serializers import AutomationRuleSerializer, ScheduledTaskSerializer

# AccountModelViewSet is the account-scoped base viewset from Phase 1;
# import it from its Phase 1 module.


class AutomationRuleViewSet(AccountModelViewSet):
    queryset = AutomationRule.objects.all()
    serializer_class = AutomationRuleSerializer

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.automation_service = AutomationService()

    @action(detail=True, methods=['post'])
    def execute(self, request, pk=None):
        """Manually execute a rule"""
        rule = self.get_object()
        result = self.automation_service.execute_rule(rule, {'account': request.account})
        return Response(result)

    @action(detail=True, methods=['post'])
    def test(self, request, pk=None):
        """Test rule conditions without executing"""
        rule = self.get_object()
        # Evaluate conditions only; no actions run and no counters change.
        # The response shape is a suggestion, not a fixed contract.
        conditions_met = self.automation_service.condition_evaluator.evaluate(
            rule.conditions, {'account': request.account}
        )
        return Response({'conditions_met': conditions_met})
```

#### ScheduledTask ViewSet

| Task | File | Dependencies | Implementation |
|------|------|--------------|----------------|
| **ScheduledTask ViewSet** | `modules/automation/views.py` | AutomationService | View scheduled task history |

**ScheduledTask ViewSet**:

```python
# modules/automation/views.py
class ScheduledTaskViewSet(AccountModelViewSet):
    queryset = ScheduledTask.objects.all()
    serializer_class = ScheduledTaskSerializer
    filterset_fields = ['status', 'automation_rule']
    ordering = ['-scheduled_at']
```

#### Automation URLs

| Task | File | Dependencies | Implementation |
|------|------|--------------|----------------|
| **Automation URLs** | `modules/automation/urls.py` | None | Register automation routes |

**Automation URLs**:

```python
# modules/automation/urls.py
from rest_framework.routers import DefaultRouter

from .views import AutomationRuleViewSet, ScheduledTaskViewSet

router = DefaultRouter()
router.register(r'rules', AutomationRuleViewSet, basename='automation-rule')
router.register(r'scheduled-tasks', ScheduledTaskViewSet, basename='scheduled-task')

urlpatterns = router.urls
```

---

## AUTOMATION UI

### 2.5 Automation UI

**Purpose**: User interface for managing automation rules and viewing history.

#### Automation Dashboard

| Task | File | Dependencies | Implementation |
|------|------|--------------|----------------|
| **Automation Dashboard** | `frontend/src/pages/Automation/Dashboard.tsx` | EXISTING (placeholder) | Overview of automation status |

**Dashboard Features**:

- Active rules count
- Recent executions
- Success/failure rates
- Credit usage from automation
- Quick actions (create rule, view history)

#### Rules Management

| Task | File | Dependencies | Implementation |
|------|------|--------------|----------------|
| **Rules Management** | `frontend/src/pages/Automation/Rules.tsx` | NEW | CRUD interface for rules |

**Rules Management Features**:

- List all rules
- Create new rule (wizard)
- Edit existing rule
- Enable/disable rule
- Delete rule
- Test rule
- Manual execution

#### Schedules (Part of Automation Menu)

**Note**: Schedules functionality will be integrated into the Automation menu group, not as a separate page.
**Schedules Features** (within Automation Dashboard):

- List scheduled tasks
- Filter by status, rule, date
- View execution results
- View error messages
- Retry failed tasks

#### Automation API Client

| Task | File | Dependencies | Implementation |
|------|------|--------------|----------------|
| **Automation API Client** | `frontend/src/services/automation.api.ts` | NEW | API client for automation endpoints |

**Automation API Client**:

```typescript
// frontend/src/services/automation.api.ts
export const automationApi = {
  getRules: () => fetchAPI('/automation/rules/'),
  createRule: (data) => fetchAPI('/automation/rules/', { method: 'POST', body: data }),
  updateRule: (id, data) => fetchAPI(`/automation/rules/${id}/`, { method: 'PUT', body: data }),
  deleteRule: (id) => fetchAPI(`/automation/rules/${id}/`, { method: 'DELETE' }),
  executeRule: (id) => fetchAPI(`/automation/rules/${id}/execute/`, { method: 'POST' }),
  getScheduledTasks: (filters) => fetchAPI('/automation/scheduled-tasks/', { params: filters }),
};
```

---

## TESTING & VALIDATION

### 2.6 Testing

**Test Cases**:

1. **Automation Service Tests**:
   - ✅ Rules execute correctly
   - ✅ Conditions evaluate correctly
   - ✅ Actions execute correctly
   - ✅ Execution limits enforced
   - ✅ Credit checks work

2. **Scheduled Tasks Tests**:
   - ✅ Scheduled tasks run on time
   - ✅ Credit replenishment works monthly
   - ✅ Task status tracking works

3. **API Tests**:
   - ✅ CRUD operations work
   - ✅ Rule execution endpoint works
   - ✅ Scheduled task history works

4. **UI Tests**:
   - ✅ Dashboard displays correctly
   - ✅ Rules management works
   - ✅ Schedule history displays correctly

---

## IMPLEMENTATION CHECKLIST

### Backend Tasks

- [ ] Create `domain/automation/models.py`
- [ ] Create AutomationRule model
- [ ] Create ScheduledTask model
- [ ] Create automation migrations
- [ ] Create `domain/automation/services/automation_service.py`
- [ ] Create `domain/automation/services/rule_engine.py`
- [ ] Create `domain/automation/services/condition_evaluator.py`
- [ ] Create `domain/automation/services/action_executor.py`
- [ ] Create `infrastructure/messaging/automation_tasks.py`
- [ ] Add scheduled automation task
- [ ] Add monthly credit replenishment task
- [ ] Configure Celery Beat
- [ ] Create `modules/automation/views.py`
- [ ] Create AutomationRule ViewSet
- [ ] Create ScheduledTask ViewSet
- [ ] Create `modules/automation/serializers.py`
- [ ] Create `modules/automation/urls.py`
- [ ] Register automation URLs in main urls.py

### Frontend Tasks

- [ ] Implement `frontend/src/pages/Automation/Dashboard.tsx`
- [ ] Create `frontend/src/pages/Automation/Rules.tsx`
- [ ] Integrate schedules functionality into Automation Dashboard (not as separate page)
- [ ] Create `frontend/src/services/automation.api.ts`
- [ ] Create rule creation wizard
- [ ] Create rule editor
- [ ] Create schedule history table (within Automation Dashboard)

### Testing Tasks

- [ ] Test automation rule execution
- [ ] Test scheduled tasks
- [ ] Test credit replenishment
- [ ] Test API endpoints
- [ ] Test UI components

---

## RISK ASSESSMENT

| Risk | Level | Mitigation |
|------|-------|------------|
| **Rule execution errors** | MEDIUM | Comprehensive error handling, logging |
| **Credit limit violations** | MEDIUM | Credit checks before execution |
| **Scheduled task failures** | MEDIUM | Retry mechanism, error logging |
| **Performance issues** | LOW | Background processing, rate limiting |

---

## SUCCESS CRITERIA

- ✅ Automation rules execute correctly
- ✅ Scheduled tasks run on time
- ✅ Credit replenishment works monthly
- ✅ UI shows automation status
- ✅ Rules can be created, edited, deleted
- ✅ Execution history is tracked
- ✅ All automation respects credit limits

---

**END OF PHASE 2 DOCUMENT**