PHASE 2: AUTOMATION SYSTEM
Detailed Implementation Plan
Goal: Implement automation rules and scheduled tasks.
Timeline: 2-3 weeks
Priority: HIGH
Dependencies: Phase 1
TABLE OF CONTENTS
- Overview
- Automation Models
- Automation Service
- Celery Beat Tasks
- Automation API
- Automation UI
- Testing & Validation
- Implementation Checklist
OVERVIEW
Objectives
- ✅ Create AutomationRule and ScheduledTask models
- ✅ Build AutomationService with rule execution engine
- ✅ Implement Celery Beat scheduled tasks
- ✅ Create automation API endpoints
- ✅ Build automation UI (Dashboard, Rules, History)
Key Principles
- Rule-Based: Users create rules with triggers, conditions, actions
- Scheduled Execution: Rules can run on schedule or event triggers
- Credit-Aware: Automation respects credit limits
- Audit Trail: All automation executions logged
AUTOMATION MODELS
2.1 Automation Models
Purpose: Store automation rules and scheduled task records.
AutomationRule Model
| Task | File | Dependencies | Implementation |
|---|---|---|---|
| AutomationRule Model | business/automation/models.py | Phase 1 | Create model with trigger, conditions, actions, schedule |
AutomationRule Model:
# business/automation/models.py
from django.db import models

# SiteSectorBaseModel is assumed to come from Phase 1; import it from its Phase 1 location.


class AutomationRule(SiteSectorBaseModel):
    name = models.CharField(max_length=255)
    description = models.TextField(blank=True)

    # Trigger configuration
    trigger = models.CharField(
        max_length=50,
        choices=[
            ('schedule', 'Scheduled'),
            ('keyword_added', 'Keyword Added'),
            ('cluster_created', 'Cluster Created'),
            ('idea_created', 'Idea Created'),
            ('content_generated', 'Content Generated'),
            ('task_created', 'Task Created'),
        ]
    )

    # Condition evaluation
    conditions = models.JSONField(default=dict)
    # Example: {'field': 'status', 'operator': 'eq', 'value': 'draft'}

    # Actions to execute
    actions = models.JSONField(default=list)
    # Example: [{'type': 'generate_ideas', 'params': {'cluster_ids': [1, 2]}}]

    # Schedule configuration (for scheduled triggers)
    schedule = models.JSONField(default=dict)
    # Example: {'cron': '0 9 * * *', 'timezone': 'UTC'}

    # Execution limits
    is_active = models.BooleanField(default=True)
    max_executions_per_day = models.IntegerField(default=10)
    credit_limit_per_execution = models.IntegerField(default=100)

    # Tracking
    last_executed_at = models.DateTimeField(null=True, blank=True)
    execution_count_today = models.IntegerField(default=0)
    last_reset_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        ordering = ['-created_at']
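Because conditions, actions, and schedule are free-form JSON, a malformed rule can be saved without any error. The following is a minimal sketch of a shape check based on the example payloads in the model comments. The helper name `validate_rule_payload` is hypothetical and not part of this plan, and the operator list is taken from the ConditionEvaluator section.

```python
# Hypothetical helper: checks the JSON shapes shown in the AutomationRule comments.
ALLOWED_OPERATORS = {"eq", "ne", "gt", "gte", "lt", "lte", "in", "contains"}


def validate_rule_payload(trigger, conditions, actions, schedule):
    """Return a list of problems; an empty list means the payload looks valid."""
    errors = []

    # An empty conditions dict is treated as "no conditions".
    if conditions:
        missing = {"field", "operator", "value"} - set(conditions)
        if missing:
            errors.append(f"conditions missing keys: {sorted(missing)}")
        elif conditions["operator"] not in ALLOWED_OPERATORS:
            errors.append(f"unknown operator: {conditions['operator']}")

    if not isinstance(actions, list) or not actions:
        errors.append("actions must be a non-empty list")
    else:
        for index, action in enumerate(actions):
            if not isinstance(action, dict) or "type" not in action:
                errors.append(f"action {index} has no 'type'")

    # Only scheduled rules need a cron expression (five space-separated fields).
    if trigger == "schedule":
        cron = schedule.get("cron", "") if isinstance(schedule, dict) else ""
        if len(cron.split()) != 5:
            errors.append("schedule.cron must have five fields")

    return errors
```

A check like this could run in the serializer before a rule is saved, so that invalid rules are rejected at creation time rather than failing during execution.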
ScheduledTask Model
| Task | File | Dependencies | Implementation |
|---|---|---|---|
| ScheduledTask Model | business/automation/models.py | Phase 1 | Create model to track scheduled executions |
ScheduledTask Model:
# business/automation/models.py
class ScheduledTask(SiteSectorBaseModel):
    automation_rule = models.ForeignKey(AutomationRule, on_delete=models.CASCADE)
    scheduled_at = models.DateTimeField()
    executed_at = models.DateTimeField(null=True, blank=True)
    status = models.CharField(
        max_length=20,
        choices=[
            ('pending', 'Pending'),
            ('running', 'Running'),
            ('completed', 'Completed'),
            ('failed', 'Failed'),
            ('skipped', 'Skipped'),
        ],
        default='pending'
    )
    result = models.JSONField(default=dict, blank=True)
    error_message = models.TextField(blank=True)
    credits_used = models.IntegerField(default=0)

    class Meta:
        ordering = ['-scheduled_at']
Automation Migrations
| Task | File | Dependencies | Implementation |
|---|---|---|---|
| Automation Migrations | business/automation/migrations/ | Phase 1 | Create initial migrations |
AUTOMATION SERVICE
2.2 Automation Service
Purpose: Execute automation rules with condition evaluation and action execution.
AutomationService
| Task | File | Dependencies | Implementation |
|---|---|---|---|
| AutomationService | business/automation/services/automation_service.py | Phase 1 services | Main service for rule execution |
AutomationService Methods:
# business/automation/services/automation_service.py
from django.utils import timezone

from business.billing.services.credit_service import CreditService

from .action_executor import ActionExecutor
from .condition_evaluator import ConditionEvaluator
from .rule_engine import RuleEngine


class AutomationService:
    def __init__(self):
        self.rule_engine = RuleEngine()
        self.condition_evaluator = ConditionEvaluator()
        self.action_executor = ActionExecutor()
        self.credit_service = CreditService()

    def execute_rule(self, rule, context=None):
        """Execute an automation rule"""
        # Check if rule is active
        if not rule.is_active:
            return {'status': 'skipped', 'reason': 'Rule is inactive'}

        # Check execution limits
        if not self._check_execution_limits(rule):
            return {'status': 'skipped', 'reason': 'Execution limit reached'}

        # Check credits
        if not self.credit_service.check_credits(rule.account, 'automation', rule.credit_limit_per_execution):
            return {'status': 'skipped', 'reason': 'Insufficient credits'}

        # Evaluate conditions
        if not self.condition_evaluator.evaluate(rule.conditions, context):
            return {'status': 'skipped', 'reason': 'Conditions not met'}

        # Execute actions
        results = self.action_executor.execute(rule.actions, context)

        # Update rule tracking
        rule.last_executed_at = timezone.now()
        rule.execution_count_today += 1
        rule.save()

        return {'status': 'completed', 'results': results}

    def _check_execution_limits(self, rule):
        """Check if rule can execute (daily limit)"""
        # Reset counter if new day
        if rule.last_reset_at.date() < timezone.now().date():
            rule.execution_count_today = 0
            rule.last_reset_at = timezone.now()
            rule.save()

        return rule.execution_count_today < rule.max_executions_per_day
Rule Execution Engine
| Task | File | Dependencies | Implementation |
|---|---|---|---|
| Rule Execution Engine | business/automation/services/rule_engine.py | Phase 1 services | Orchestrates rule execution |
RuleEngine Methods:
# business/automation/services/rule_engine.py
class RuleEngine:
    def execute_rule(self, rule, context):
        """Orchestrate rule execution"""
        # Validate rule
        # Check conditions
        # Execute actions
        # Handle errors
        pass
Condition Evaluator
| Task | File | Dependencies | Implementation |
|---|---|---|---|
| Condition Evaluator | business/automation/services/condition_evaluator.py | None | Evaluates rule conditions |
ConditionEvaluator Methods:
# business/automation/services/condition_evaluator.py
class ConditionEvaluator:
    def evaluate(self, conditions, context):
        """Evaluate rule conditions"""
        # Support operators: eq, ne, gt, gte, lt, lte, in, contains
        # Example: {'field': 'status', 'operator': 'eq', 'value': 'draft'}
        pass
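The stub above leaves the evaluation logic open. One possible shape, as a minimal sketch: it assumes conditions are looked up as top-level keys of the context dict, that an empty condition set always passes, and that a list of conditions is combined with AND. None of these choices is fixed by this plan.

```python
import operator

# Operator names follow the list in the ConditionEvaluator stub.
OPERATORS = {
    "eq": operator.eq,
    "ne": operator.ne,
    "gt": operator.gt,
    "gte": operator.ge,
    "lt": operator.lt,
    "lte": operator.le,
    "in": lambda actual, expected: actual in expected,
    "contains": lambda actual, expected: expected in actual,
}


class ConditionEvaluator:
    def evaluate(self, conditions, context):
        """Return True when the context satisfies the condition(s)."""
        # Assumption: an empty dict or list means "no conditions".
        if not conditions:
            return True
        # Assumption: a list of conditions is combined with AND.
        if isinstance(conditions, list):
            return all(self.evaluate(c, context) for c in conditions)

        compare = OPERATORS.get(conditions["operator"])
        if compare is None:
            raise ValueError(f"Unsupported operator: {conditions['operator']}")

        context = context or {}
        if conditions["field"] not in context:
            return False  # a missing field never matches
        try:
            return bool(compare(context[conditions["field"]], conditions["value"]))
        except TypeError:
            return False  # e.g. comparing a string with an int
```

Treating a missing field or a type mismatch as "not met" keeps a badly configured rule from raising inside a scheduled run; the rule is simply skipped with "Conditions not met".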
Action Executor
| Task | File | Dependencies | Implementation |
|---|---|---|---|
| Action Executor | business/automation/services/action_executor.py | Phase 1 services | Executes rule actions |
ActionExecutor Methods:
# business/automation/services/action_executor.py
class ActionExecutor:
    def __init__(self):
        self.clustering_service = ClusteringService()
        self.ideas_service = IdeasService()
        self.content_service = ContentGenerationService()

    def execute(self, actions, context):
        """Execute rule actions"""
        results = []
        for action in actions:
            action_type = action['type']
            params = action.get('params', {})

            if action_type == 'generate_ideas':
                result = self.ideas_service.generate_ideas(params['cluster_ids'], context['account'])
            elif action_type == 'generate_content':
                result = self.content_service.generate_content(params['task_id'], context['account'])
            # ... other action types
            else:
                # Without this branch, `result` would be undefined for an unknown type
                raise ValueError(f"Unknown action type: {action_type}")

            results.append(result)

        return results
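As the number of action types grows, the if/elif chain can be replaced by a dispatch table. This is a sketch of that alternative, not the planned implementation: the constructor-injected services and the per-action result dicts are assumptions made so the example can run with stand-in services.

```python
class ActionExecutor:
    """Sketch: dispatch table instead of an if/elif chain."""

    def __init__(self, ideas_service, content_service):
        # Services are injected so the executor can be tested with stand-ins.
        self.handlers = {
            "generate_ideas": lambda params, ctx: ideas_service.generate_ideas(
                params["cluster_ids"], ctx["account"]
            ),
            "generate_content": lambda params, ctx: content_service.generate_content(
                params["task_id"], ctx["account"]
            ),
        }

    def execute(self, actions, context):
        results = []
        for action in actions:
            handler = self.handlers.get(action["type"])
            if handler is None:
                # Record the failure instead of aborting the remaining actions.
                results.append(
                    {"type": action["type"], "status": "failed", "error": "unknown action type"}
                )
                continue
            output = handler(action.get("params", {}), context)
            results.append({"type": action["type"], "status": "completed", "result": output})
        return results
```

Adding a new action type then means adding one entry to `handlers`, and the per-action status gives the audit trail something concrete to store.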
CELERY BEAT TASKS
2.3 Celery Beat Tasks
Purpose: Schedule automation rules and monthly credit replenishment.
Scheduled Automation Task
| Task | File | Dependencies | Implementation |
|---|---|---|---|
| Scheduled Automation Task | infrastructure/messaging/automation_tasks.py | AutomationService | Periodic task to execute scheduled rules |
Scheduled Automation Task:
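# infrastructure/messaging/automation_tasks.py
from celery import shared_task


@shared_task
def execute_scheduled_automation_rules():
    """Execute all scheduled automation rules"""
    # Imported inside the task so models are loaded only when the task runs
    from business.automation.models import AutomationRule
    from business.automation.services.automation_service import AutomationService

    service = AutomationService()
    rules = AutomationRule.objects.filter(
        trigger='schedule',
        is_active=True
    )

    for rule in rules:
        # Check if rule should execute based on schedule.
        # should_execute_now is a helper that still has to be defined in this module.
        if should_execute_now(rule.schedule):
            # Pass the account explicitly: the action executor reads context['account']
            service.execute_rule(rule, {'account': rule.account})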
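The task above calls `should_execute_now`, which this plan does not define. A minimal sketch follows, assuming the schedule shape from the model comment (`{'cron': ..., 'timezone': ...}`) and a deliberately simplified cron matcher that supports only `*`, `*/n`, single numbers, and comma lists. Ranges such as `1-5` and the usual cron day-of-month/day-of-week OR rule are not handled here.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def _field_matches(expr, value, minimum=0):
    """Match one cron field. Supports '*', '*/n', single numbers and comma lists."""
    for part in expr.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            # Steps count from the field's minimum (0 for minutes, 1 for days).
            if (value - minimum) % int(part[2:]) == 0:
                return True
        elif int(part) == value:
            return True
    return False


def should_execute_now(schedule, now=None):
    """Return True when `now` falls on the minute described by schedule['cron']."""
    cron = schedule.get("cron")
    if not cron:
        return False
    name = schedule.get("timezone", "UTC")
    tz = timezone.utc if name == "UTC" else ZoneInfo(name)
    now = (now or datetime.now(tz)).astimezone(tz)

    minute, hour, day, month, weekday = cron.split()
    cron_weekday = (now.weekday() + 1) % 7  # cron counts Sunday as 0
    return (
        _field_matches(minute, now.minute)
        and _field_matches(hour, now.hour)
        and _field_matches(day, now.day, minimum=1)
        and _field_matches(month, now.month, minimum=1)
        and _field_matches(weekday, cron_weekday)
    )
```

Because this check is exact to the minute and Celery Beat triggers the task every 15 minutes, a rule only fires if its cron minute lands on one of those runs (0, 15, 30, or 45). A production version would more likely compare the schedule against `last_executed_at` so that a delayed worker does not miss a run.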
Monthly Credit Replenishment
| Task | File | Dependencies | Implementation |
|---|---|---|---|
| Monthly Credit Replenishment | infrastructure/messaging/automation_tasks.py | CreditService | Add credits monthly to accounts |
Monthly Credit Replenishment Task:
# infrastructure/messaging/automation_tasks.py
@shared_task
def replenish_monthly_credits():
    """Replenish monthly credits for all active accounts"""
    from business.billing.services.credit_service import CreditService
    # Account must also be imported here from the module that defines it

    service = CreditService()
    accounts = Account.objects.filter(status='active')

    for account in accounts:
        if account.plan:
            monthly_credits = account.plan.monthly_credits
            if monthly_credits > 0:
                service.add_credits(account, monthly_credits, 'monthly_replenishment')
Celery Beat Configuration
| Task | File | Dependencies | Implementation |
|---|---|---|---|
| Celery Beat Configuration | backend/igny8_core/celery.py | None | Configure periodic tasks |
Celery Beat Configuration:
# backend/igny8_core/celery.py
from celery.schedules import crontab

app.conf.beat_schedule = {
    'execute-scheduled-automation-rules': {
        'task': 'infrastructure.messaging.automation_tasks.execute_scheduled_automation_rules',
        'schedule': crontab(minute='*/15'),  # Every 15 minutes
    },
    'replenish-monthly-credits': {
        'task': 'infrastructure.messaging.automation_tasks.replenish_monthly_credits',
        'schedule': crontab(hour=0, minute=0, day_of_month=1),  # First day of month
    },
}
AUTOMATION API
2.4 Automation API
Purpose: CRUD API for automation rules and scheduled tasks.
AutomationRule ViewSet
| Task | File | Dependencies | Implementation |
|---|---|---|---|
| AutomationRule ViewSet | modules/automation/views.py | AutomationService | CRUD operations for rules |
AutomationRule ViewSet:
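# modules/automation/views.py
from rest_framework.decorators import action
from rest_framework.response import Response

from business.automation.models import AutomationRule
from business.automation.services.automation_service import AutomationService

from .serializers import AutomationRuleSerializer

# AccountModelViewSet is assumed to be defined elsewhere in the project; import it from there.


class AutomationRuleViewSet(AccountModelViewSet):
    queryset = AutomationRule.objects.all()
    serializer_class = AutomationRuleSerializer

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.automation_service = AutomationService()

    @action(detail=True, methods=['post'])
    def execute(self, request, pk=None):
        """Manually execute a rule"""
        rule = self.get_object()
        result = self.automation_service.execute_rule(rule, {'account': request.account})
        return Response(result)

    @action(detail=True, methods=['post'])
    def test(self, request, pk=None):
        """Test rule conditions without executing"""
        rule = self.get_object()
        # Test condition evaluation (stub: must return a Response once implemented)
        pass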
ScheduledTask ViewSet
| Task | File | Dependencies | Implementation |
|---|---|---|---|
| ScheduledTask ViewSet | modules/automation/views.py | AutomationService | View scheduled task history |
ScheduledTask ViewSet:
# modules/automation/views.py
class ScheduledTaskViewSet(AccountModelViewSet):
    queryset = ScheduledTask.objects.all()
    serializer_class = ScheduledTaskSerializer
    filterset_fields = ['status', 'automation_rule']
    ordering = ['-scheduled_at']
Automation URLs
| Task | File | Dependencies | Implementation |
|---|---|---|---|
| Automation URLs | modules/automation/urls.py | None | Register automation routes |
Automation URLs:
# modules/automation/urls.py
from rest_framework.routers import DefaultRouter
from .views import AutomationRuleViewSet, ScheduledTaskViewSet
router = DefaultRouter()
router.register(r'rules', AutomationRuleViewSet, basename='automation-rule')
router.register(r'scheduled-tasks', ScheduledTaskViewSet, basename='scheduled-task')
urlpatterns = router.urls
AUTOMATION UI
2.5 Automation UI
Purpose: User interface for managing automation rules and viewing history.
Automation Dashboard
| Task | File | Dependencies | Implementation |
|---|---|---|---|
| Automation Dashboard | frontend/src/pages/Automation/Dashboard.tsx | EXISTING (placeholder) | Overview of automation status |
Dashboard Features:
- Active rules count
- Recent executions
- Success/failure rates
- Credit usage from automation
- Quick actions (create rule, view history)
Rules Management
| Task | File | Dependencies | Implementation |
|---|---|---|---|
| Rules Management | frontend/src/pages/Automation/Rules.tsx | NEW | CRUD interface for rules |
Rules Management Features:
- List all rules
- Create new rule (wizard)
- Edit existing rule
- Enable/disable rule
- Delete rule
- Test rule
- Manual execution
Schedules (Part of Automation Menu)
Note: Schedules functionality will be integrated into the Automation menu group, not as a separate page.
Schedules Features (within Automation Dashboard):
- List scheduled tasks
- Filter by status, rule, date
- View execution results
- View error messages
- Retry failed tasks
Automation API Client
| Task | File | Dependencies | Implementation |
|---|---|---|---|
| Automation API Client | frontend/src/services/automation.api.ts | NEW | API client for automation endpoints |
Automation API Client:
// frontend/src/services/automation.api.ts
export const automationApi = {
  getRules: () => fetchAPI('/automation/rules/'),
  createRule: (data) => fetchAPI('/automation/rules/', { method: 'POST', body: data }),
  updateRule: (id, data) => fetchAPI(`/automation/rules/${id}/`, { method: 'PUT', body: data }),
  deleteRule: (id) => fetchAPI(`/automation/rules/${id}/`, { method: 'DELETE' }),
  executeRule: (id) => fetchAPI(`/automation/rules/${id}/execute/`, { method: 'POST' }),
  getScheduledTasks: (filters) => fetchAPI('/automation/scheduled-tasks/', { params: filters }),
};
TESTING & VALIDATION
2.6 Testing
Test Cases:
- Automation Service Tests:
  - ✅ Rules execute correctly
  - ✅ Conditions evaluate correctly
  - ✅ Actions execute correctly
  - ✅ Execution limits enforced
  - ✅ Credit checks work
- Scheduled Tasks Tests:
  - ✅ Scheduled tasks run on time
  - ✅ Credit replenishment works monthly
  - ✅ Task status tracking works
- API Tests:
  - ✅ CRUD operations work
  - ✅ Rule execution endpoint works
  - ✅ Scheduled task history works
- UI Tests:
  - ✅ Dashboard displays correctly
  - ✅ Rules management works
  - ✅ Schedule history displays correctly
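As an illustration of the "Execution limits enforced" case, here is a sketch of a unit test that needs no database. `FakeRule` is a hypothetical stand-in for AutomationRule, and `check_execution_limits` copies the logic of `AutomationService._check_execution_limits` with the clock passed in as a parameter so the day rollover can be tested deterministically.

```python
import unittest
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class FakeRule:
    """Stand-in for AutomationRule with only the fields the limit check reads."""
    max_executions_per_day: int = 10
    execution_count_today: int = 0
    last_reset_at: datetime = datetime(2024, 1, 1, tzinfo=timezone.utc)


def check_execution_limits(rule, now):
    """Same logic as AutomationService._check_execution_limits, minus the save()."""
    if rule.last_reset_at.date() < now.date():
        rule.execution_count_today = 0
        rule.last_reset_at = now
    return rule.execution_count_today < rule.max_executions_per_day


class ExecutionLimitTests(unittest.TestCase):
    def test_blocks_when_daily_limit_reached(self):
        rule = FakeRule(execution_count_today=10)
        now = datetime(2024, 1, 1, 12, tzinfo=timezone.utc)
        self.assertFalse(check_execution_limits(rule, now))

    def test_counter_resets_on_new_day(self):
        rule = FakeRule(execution_count_today=10)
        now = rule.last_reset_at + timedelta(days=1)
        self.assertTrue(check_execution_limits(rule, now))
        self.assertEqual(rule.execution_count_today, 0)
```

The same pattern (inject the clock, use a stand-in rule) should carry over to the credit check and condition tests.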
IMPLEMENTATION CHECKLIST
Backend Tasks
- Create business/automation/models.py
- Create AutomationRule model
- Create ScheduledTask model
- Create automation migrations
- Create business/automation/services/automation_service.py
- Create business/automation/services/rule_engine.py
- Create business/automation/services/condition_evaluator.py
- Create business/automation/services/action_executor.py
- Create infrastructure/messaging/automation_tasks.py
- Add scheduled automation task
- Add monthly credit replenishment task
- Configure Celery Beat
- Create modules/automation/views.py
- Create AutomationRule ViewSet
- Create ScheduledTask ViewSet
- Create modules/automation/serializers.py
- Create modules/automation/urls.py
- Register automation URLs in main urls.py
Frontend Tasks
- Implement frontend/src/pages/Automation/Dashboard.tsx
- Create frontend/src/pages/Automation/Rules.tsx
- Integrate schedules functionality into Automation Dashboard (not as separate page)
- Create frontend/src/services/automation.api.ts
- Create rule creation wizard
- Create rule editor
- Create schedule history table (within Automation Dashboard)
Testing Tasks
- Test automation rule execution
- Test scheduled tasks
- Test credit replenishment
- Test API endpoints
- Test UI components
RISK ASSESSMENT
| Risk | Level | Mitigation |
|---|---|---|
| Rule execution errors | MEDIUM | Comprehensive error handling, logging |
| Credit limit violations | MEDIUM | Credit checks before execution |
| Scheduled task failures | MEDIUM | Retry mechanism, error logging |
| Performance issues | LOW | Background processing, rate limiting |
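The "retry mechanism, error logging" mitigation could look roughly like the following. This is a generic sketch with a hypothetical helper name, `run_with_retries`; the attempt count and delays are placeholder values. Celery tasks also offer built-in retry options, which may be a better fit than a hand-rolled loop once the tasks exist.

```python
import logging
import time

logger = logging.getLogger(__name__)


def run_with_retries(func, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call func(); on failure, log the error and retry with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as exc:
            logger.warning("attempt %d/%d failed: %s", attempt, max_attempts, exc)
            if attempt == max_attempts:
                raise  # out of attempts: let the caller mark the task as failed
            # Delay doubles each time: base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** (attempt - 1))
```

The `sleep` parameter exists so tests can substitute a recorder instead of actually waiting.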
SUCCESS CRITERIA
- ✅ Automation rules execute correctly
- ✅ Scheduled tasks run on time
- ✅ Credit replenishment works monthly
- ✅ UI shows automation status
- ✅ Rules can be created, edited, deleted
- ✅ Execution history is tracked
- ✅ All automation respects credit limits
END OF PHASE 2 DOCUMENT