Add scheduled automation task and update URL routing

- Introduced a new scheduled task for executing automation rules every 5 minutes in the Celery beat schedule.
- Updated URL routing to include a new endpoint for automation-related functionalities.
- Refactored imports in various modules to align with the new business layer structure, ensuring backward compatibility for billing models, exceptions, and services.
This commit is contained in:
IGNY8 VPS (Salman)
2025-11-16 22:11:05 +00:00
parent 455358ecfc
commit 7f8982a0ab
40 changed files with 1822 additions and 778 deletions

View File

@@ -195,8 +195,8 @@ class AIEngine:
# Phase 2.5: CREDIT CHECK - Check credits before AI call (25%)
if self.account:
try:
from igny8_core.modules.billing.services import CreditService
from igny8_core.modules.billing.exceptions import InsufficientCreditsError
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
# Map function name to operation type
operation_type = self._get_operation_type(function_name)
@@ -353,8 +353,8 @@ class AIEngine:
# Phase 5.5: DEDUCT CREDITS - Deduct credits after successful save
if self.account and raw_response:
try:
from igny8_core.modules.billing.services import CreditService
from igny8_core.modules.billing.exceptions import InsufficientCreditsError
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
# Map function name to operation type
operation_type = self._get_operation_type(function_name)

View File

@@ -8,7 +8,7 @@ from django.db.models import Q
from igny8_core.auth.models import Account, User, Site, Sector
from igny8_core.modules.planner.models import Keywords, Clusters, ContentIdeas
from igny8_core.modules.writer.models import Tasks, Images, Content
from igny8_core.modules.billing.models import CreditTransaction, CreditUsageLog
from igny8_core.business.billing.models import CreditTransaction, CreditUsageLog
from igny8_core.modules.system.models import AIPrompt, IntegrationSettings, AuthorProfile, Strategy
from igny8_core.modules.system.settings_models import AccountSettings, UserSettings, ModuleSettings, AISettings

View File

@@ -0,0 +1,5 @@
"""
Business logic layer - Models and Services
Separated from API layer (modules/) for clean architecture
"""

View File

@@ -0,0 +1,4 @@
"""
Automation business logic - AutomationRule, ScheduledTask models and services
"""

View File

@@ -0,0 +1,141 @@
"""
Automation Models
Phase 2: Automation System
"""
from django.db import models
from django.core.validators import MinValueValidator
from igny8_core.auth.models import SiteSectorBaseModel, AccountBaseModel
import json
class AutomationRule(SiteSectorBaseModel):
"""
Automation Rule model for defining automated workflows.
Rules can be triggered by:
- schedule: Time-based triggers (cron-like)
- event: Event-based triggers (content created, keyword added, etc.)
- manual: Manual execution only
"""
TRIGGER_CHOICES = [
('schedule', 'Schedule'),
('event', 'Event'),
('manual', 'Manual'),
]
STATUS_CHOICES = [
('active', 'Active'),
('inactive', 'Inactive'),
('paused', 'Paused'),
]
name = models.CharField(max_length=255, help_text="Rule name")
description = models.TextField(blank=True, null=True, help_text="Rule description")
# Trigger configuration
trigger = models.CharField(max_length=50, choices=TRIGGER_CHOICES, default='manual')
# Schedule configuration (for schedule triggers)
# Stored as cron-like string: "0 0 * * *" (daily at midnight)
schedule = models.CharField(
max_length=100,
blank=True,
null=True,
help_text="Cron-like schedule string (e.g., '0 0 * * *' for daily at midnight)"
)
# Conditions (JSON field)
# Format: [{"field": "content.status", "operator": "equals", "value": "draft"}, ...]
conditions = models.JSONField(
default=list,
help_text="List of conditions that must be met for rule to execute"
)
# Actions (JSON field)
# Format: [{"type": "generate_content", "params": {...}}, ...]
actions = models.JSONField(
default=list,
help_text="List of actions to execute when rule triggers"
)
# Status
is_active = models.BooleanField(default=True, help_text="Whether rule is active")
status = models.CharField(max_length=50, choices=STATUS_CHOICES, default='active')
# Execution tracking
last_executed_at = models.DateTimeField(null=True, blank=True)
execution_count = models.IntegerField(default=0, validators=[MinValueValidator(0)])
# Metadata
metadata = models.JSONField(default=dict, help_text="Additional metadata")
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'igny8_automation_rules'
ordering = ['-created_at']
verbose_name = 'Automation Rule'
verbose_name_plural = 'Automation Rules'
indexes = [
models.Index(fields=['trigger', 'is_active']),
models.Index(fields=['status']),
models.Index(fields=['site', 'sector']),
models.Index(fields=['trigger', 'is_active', 'status']),
]
def __str__(self):
return f"{self.name} ({self.get_trigger_display()})"
class ScheduledTask(AccountBaseModel):
"""
Scheduled Task model for tracking scheduled automation rule executions.
"""
STATUS_CHOICES = [
('pending', 'Pending'),
('running', 'Running'),
('completed', 'Completed'),
('failed', 'Failed'),
('cancelled', 'Cancelled'),
]
automation_rule = models.ForeignKey(
AutomationRule,
on_delete=models.CASCADE,
related_name='scheduled_tasks',
help_text="The automation rule this task belongs to"
)
scheduled_at = models.DateTimeField(help_text="When the task is scheduled to run")
executed_at = models.DateTimeField(null=True, blank=True, help_text="When the task was actually executed")
status = models.CharField(max_length=50, choices=STATUS_CHOICES, default='pending')
# Execution results
result = models.JSONField(default=dict, help_text="Execution result data")
error_message = models.TextField(blank=True, null=True, help_text="Error message if execution failed")
# Metadata
metadata = models.JSONField(default=dict, help_text="Additional metadata")
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'igny8_scheduled_tasks'
ordering = ['-scheduled_at']
verbose_name = 'Scheduled Task'
verbose_name_plural = 'Scheduled Tasks'
indexes = [
models.Index(fields=['automation_rule', 'status']),
models.Index(fields=['scheduled_at', 'status']),
models.Index(fields=['account', 'status']),
models.Index(fields=['status', 'scheduled_at']),
]
def __str__(self):
return f"Scheduled task for {self.automation_rule.name} at {self.scheduled_at}"

View File

@@ -0,0 +1,4 @@
"""
Automation services
"""

View File

@@ -0,0 +1,101 @@
"""
Action Executor
Executes rule actions
"""
import logging
from igny8_core.business.planning.services.clustering_service import ClusteringService
from igny8_core.business.planning.services.ideas_service import IdeasService
from igny8_core.business.content.services.content_generation_service import ContentGenerationService
logger = logging.getLogger(__name__)
class ActionExecutor:
"""Executes rule actions"""
def __init__(self):
self.clustering_service = ClusteringService()
self.ideas_service = IdeasService()
self.content_service = ContentGenerationService()
def execute(self, action, context, rule):
"""
Execute a single action.
Args:
action: Action dict with 'type' and 'params'
context: Context dict
rule: AutomationRule instance
Returns:
dict: Action execution result
"""
action_type = action.get('type')
params = action.get('params', {})
if action_type == 'cluster_keywords':
return self._execute_cluster_keywords(params, rule)
elif action_type == 'generate_ideas':
return self._execute_generate_ideas(params, rule)
elif action_type == 'generate_content':
return self._execute_generate_content(params, rule)
else:
logger.warning(f"Unknown action type: {action_type}")
return {
'success': False,
'error': f'Unknown action type: {action_type}'
}
def _execute_cluster_keywords(self, params, rule):
"""Execute cluster keywords action"""
keyword_ids = params.get('keyword_ids', [])
sector_id = params.get('sector_id') or (rule.sector.id if rule.sector else None)
try:
result = self.clustering_service.cluster_keywords(
keyword_ids=keyword_ids,
account=rule.account,
sector_id=sector_id
)
return result
except Exception as e:
logger.error(f"Error clustering keywords: {str(e)}", exc_info=True)
return {
'success': False,
'error': str(e)
}
def _execute_generate_ideas(self, params, rule):
"""Execute generate ideas action"""
cluster_ids = params.get('cluster_ids', [])
try:
result = self.ideas_service.generate_ideas(
cluster_ids=cluster_ids,
account=rule.account
)
return result
except Exception as e:
logger.error(f"Error generating ideas: {str(e)}", exc_info=True)
return {
'success': False,
'error': str(e)
}
def _execute_generate_content(self, params, rule):
"""Execute generate content action"""
task_ids = params.get('task_ids', [])
try:
result = self.content_service.generate_content(
task_ids=task_ids,
account=rule.account
)
return result
except Exception as e:
logger.error(f"Error generating content: {str(e)}", exc_info=True)
return {
'success': False,
'error': str(e)
}

View File

@@ -0,0 +1,141 @@
"""
Automation Service
Main service for executing automation rules
"""
import logging
from django.utils import timezone
from igny8_core.business.automation.models import AutomationRule, ScheduledTask
from igny8_core.business.automation.services.rule_engine import RuleEngine
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
logger = logging.getLogger(__name__)
class AutomationService:
"""Service for executing automation rules"""
def __init__(self):
self.rule_engine = RuleEngine()
self.credit_service = CreditService()
def execute_rule(self, rule, context=None):
"""
Execute an automation rule.
Args:
rule: AutomationRule instance
context: Optional context dict for condition evaluation
Returns:
dict: Execution result with status and data
"""
if not rule.is_active or rule.status != 'active':
return {
'status': 'skipped',
'reason': 'Rule is inactive',
'rule_id': rule.id
}
# Check credits (estimate based on actions)
estimated_credits = self._estimate_credits(rule)
try:
self.credit_service.check_credits_legacy(rule.account, estimated_credits)
except InsufficientCreditsError as e:
logger.warning(f"Rule {rule.id} skipped: {str(e)}")
return {
'status': 'skipped',
'reason': f'Insufficient credits: {str(e)}',
'rule_id': rule.id
}
# Execute via rule engine
try:
result = self.rule_engine.execute(rule, context or {})
# Update rule tracking
rule.last_executed_at = timezone.now()
rule.execution_count += 1
rule.save(update_fields=['last_executed_at', 'execution_count'])
return {
'status': 'completed',
'rule_id': rule.id,
'result': result
}
except Exception as e:
logger.error(f"Error executing rule {rule.id}: {str(e)}", exc_info=True)
return {
'status': 'failed',
'reason': str(e),
'rule_id': rule.id
}
def _estimate_credits(self, rule):
"""Estimate credits needed for rule execution"""
# Simple estimation based on action types
estimated = 0
for action in rule.actions:
action_type = action.get('type', '')
if 'cluster' in action_type:
estimated += 10
elif 'idea' in action_type:
estimated += 15
elif 'content' in action_type:
estimated += 50 # Conservative estimate
else:
estimated += 5 # Default
return max(estimated, 10) # Minimum 10 credits
def execute_scheduled_rules(self):
"""
Execute all scheduled rules that are due.
Called by Celery Beat task.
Returns:
dict: Summary of executions
"""
from django.utils import timezone
now = timezone.now()
# Get active scheduled rules
rules = AutomationRule.objects.filter(
trigger='schedule',
is_active=True,
status='active'
)
executed = 0
skipped = 0
failed = 0
for rule in rules:
# Check if rule should execute based on schedule
if self._should_execute_schedule(rule, now):
result = self.execute_rule(rule)
if result['status'] == 'completed':
executed += 1
elif result['status'] == 'skipped':
skipped += 1
else:
failed += 1
return {
'executed': executed,
'skipped': skipped,
'failed': failed,
'total': len(rules)
}
def _should_execute_schedule(self, rule, now):
"""
Check if a scheduled rule should execute now.
Simple implementation - can be enhanced with proper cron parsing.
"""
if not rule.schedule:
return False
# For now, simple check - can be enhanced with cron parser
# This is a placeholder - proper implementation would parse cron string
return True # Simplified for now

View File

@@ -0,0 +1,104 @@
"""
Condition Evaluator
Evaluates rule conditions
"""
import logging
logger = logging.getLogger(__name__)
class ConditionEvaluator:
"""Evaluates rule conditions"""
OPERATORS = {
'equals': lambda a, b: a == b,
'not_equals': lambda a, b: a != b,
'greater_than': lambda a, b: a > b,
'greater_than_or_equal': lambda a, b: a >= b,
'less_than': lambda a, b: a < b,
'less_than_or_equal': lambda a, b: a <= b,
'in': lambda a, b: a in b,
'contains': lambda a, b: b in a if isinstance(a, str) else a in b,
'is_empty': lambda a, b: not a or (isinstance(a, str) and not a.strip()),
'is_not_empty': lambda a, b: a and (not isinstance(a, str) or a.strip()),
}
def evaluate(self, conditions, context):
"""
Evaluate a list of conditions.
Args:
conditions: List of condition dicts
context: Context dict for field resolution
Returns:
bool: True if all conditions are met
"""
if not conditions:
return True
for condition in conditions:
if not self._evaluate_condition(condition, context):
return False
return True
def _evaluate_condition(self, condition, context):
"""
Evaluate a single condition.
Condition format:
{
"field": "content.status",
"operator": "equals",
"value": "draft"
}
"""
field_path = condition.get('field')
operator = condition.get('operator', 'equals')
expected_value = condition.get('value')
if not field_path:
logger.warning("Condition missing 'field'")
return False
# Resolve field value from context
actual_value = self._resolve_field(field_path, context)
# Get operator function
op_func = self.OPERATORS.get(operator)
if not op_func:
logger.warning(f"Unknown operator: {operator}")
return False
# Evaluate
try:
return op_func(actual_value, expected_value)
except Exception as e:
logger.error(f"Error evaluating condition: {str(e)}", exc_info=True)
return False
def _resolve_field(self, field_path, context):
"""
Resolve a field path from context.
Examples:
- "content.status" -> context['content']['status']
- "count" -> context['count']
"""
parts = field_path.split('.')
value = context
for part in parts:
if isinstance(value, dict):
value = value.get(part)
elif hasattr(value, part):
value = getattr(value, part)
else:
return None
if value is None:
return None
return value

View File

@@ -0,0 +1,61 @@
"""
Rule Engine
Orchestrates rule execution
"""
import logging
from igny8_core.business.automation.services.condition_evaluator import ConditionEvaluator
from igny8_core.business.automation.services.action_executor import ActionExecutor
logger = logging.getLogger(__name__)
class RuleEngine:
"""Orchestrates rule execution"""
def __init__(self):
self.condition_evaluator = ConditionEvaluator()
self.action_executor = ActionExecutor()
def execute(self, rule, context):
"""
Execute a rule by evaluating conditions and executing actions.
Args:
rule: AutomationRule instance
context: Context dict for evaluation
Returns:
dict: Execution results
"""
# Evaluate conditions
if rule.conditions:
conditions_met = self.condition_evaluator.evaluate(rule.conditions, context)
if not conditions_met:
return {
'success': False,
'reason': 'Conditions not met'
}
# Execute actions
action_results = []
for action in rule.actions:
try:
result = self.action_executor.execute(action, context, rule)
action_results.append({
'action': action,
'success': True,
'result': result
})
except Exception as e:
logger.error(f"Action execution failed: {str(e)}", exc_info=True)
action_results.append({
'action': action,
'success': False,
'error': str(e)
})
return {
'success': True,
'actions': action_results
}

View File

@@ -0,0 +1,28 @@
"""
Automation Celery Tasks
"""
from celery import shared_task
import logging
from igny8_core.business.automation.services.automation_service import AutomationService
logger = logging.getLogger(__name__)
@shared_task(name='igny8_core.business.automation.tasks.execute_scheduled_automation_rules')
def execute_scheduled_automation_rules():
"""
Execute all scheduled automation rules.
Called by Celery Beat.
"""
try:
service = AutomationService()
result = service.execute_scheduled_rules()
logger.info(f"Executed scheduled automation rules: {result}")
return result
except Exception as e:
logger.error(f"Error executing scheduled automation rules: {str(e)}", exc_info=True)
return {
'success': False,
'error': str(e)
}

View File

@@ -0,0 +1,4 @@
"""
Billing business logic - CreditTransaction, CreditUsageLog models and services
"""

View File

@@ -0,0 +1,21 @@
"""
Credit Cost Constants
Phase 0: Credit-only system costs per operation
"""
CREDIT_COSTS = {
'clustering': 10, # Per clustering request
'idea_generation': 15, # Per cluster → ideas request
'content_generation': 1, # Per 100 words
'image_prompt_extraction': 2, # Per content piece
'image_generation': 5, # Per image
'linking': 8, # Per content piece (NEW)
'optimization': 1, # Per 200 words (NEW)
'site_structure_generation': 50, # Per site blueprint (NEW)
'site_page_generation': 20, # Per page (NEW)
# Legacy operation types (for backward compatibility)
'ideas': 15, # Alias for idea_generation
'content': 3, # Legacy: 3 credits per content piece
'images': 5, # Alias for image_generation
'reparse': 1, # Per reparse
}

View File

@@ -0,0 +1,14 @@
"""
Billing Exceptions
"""
class InsufficientCreditsError(Exception):
"""Raised when account doesn't have enough credits"""
pass
class CreditCalculationError(Exception):
"""Raised when credit calculation fails"""
pass

View File

@@ -0,0 +1,75 @@
"""
Billing Models for Credit System
"""
from django.db import models
from django.core.validators import MinValueValidator
from igny8_core.auth.models import AccountBaseModel
class CreditTransaction(AccountBaseModel):
"""Track all credit transactions (additions, deductions)"""
TRANSACTION_TYPE_CHOICES = [
('purchase', 'Purchase'),
('subscription', 'Subscription Renewal'),
('refund', 'Refund'),
('deduction', 'Usage Deduction'),
('adjustment', 'Manual Adjustment'),
]
transaction_type = models.CharField(max_length=20, choices=TRANSACTION_TYPE_CHOICES, db_index=True)
amount = models.IntegerField(help_text="Positive for additions, negative for deductions")
balance_after = models.IntegerField(help_text="Credit balance after this transaction")
description = models.CharField(max_length=255)
metadata = models.JSONField(default=dict, help_text="Additional context (AI call details, etc.)")
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = 'igny8_credit_transactions'
ordering = ['-created_at']
indexes = [
models.Index(fields=['account', 'transaction_type']),
models.Index(fields=['account', 'created_at']),
]
def __str__(self):
account = getattr(self, 'account', None)
return f"{self.get_transaction_type_display()} - {self.amount} credits - {account.name if account else 'No Account'}"
class CreditUsageLog(AccountBaseModel):
"""Detailed log of credit usage per AI operation"""
OPERATION_TYPE_CHOICES = [
('clustering', 'Keyword Clustering'),
('idea_generation', 'Content Ideas Generation'),
('content_generation', 'Content Generation'),
('image_generation', 'Image Generation'),
('reparse', 'Content Reparse'),
('ideas', 'Content Ideas Generation'), # Legacy
('content', 'Content Generation'), # Legacy
('images', 'Image Generation'), # Legacy
]
operation_type = models.CharField(max_length=50, choices=OPERATION_TYPE_CHOICES, db_index=True)
credits_used = models.IntegerField(validators=[MinValueValidator(0)])
cost_usd = models.DecimalField(max_digits=10, decimal_places=4, null=True, blank=True)
model_used = models.CharField(max_length=100, blank=True)
tokens_input = models.IntegerField(null=True, blank=True, validators=[MinValueValidator(0)])
tokens_output = models.IntegerField(null=True, blank=True, validators=[MinValueValidator(0)])
related_object_type = models.CharField(max_length=50, blank=True) # 'keyword', 'cluster', 'task'
related_object_id = models.IntegerField(null=True, blank=True)
metadata = models.JSONField(default=dict)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = 'igny8_credit_usage_logs'
ordering = ['-created_at']
indexes = [
models.Index(fields=['account', 'operation_type']),
models.Index(fields=['account', 'created_at']),
models.Index(fields=['account', 'operation_type', 'created_at']),
]
def __str__(self):
account = getattr(self, 'account', None)
return f"{self.get_operation_type_display()} - {self.credits_used} credits - {account.name if account else 'No Account'}"

View File

@@ -0,0 +1,4 @@
"""
Billing services
"""

View File

@@ -0,0 +1,264 @@
"""
Credit Service for managing credit transactions and deductions
"""
from django.db import transaction
from django.utils import timezone
from igny8_core.business.billing.models import CreditTransaction, CreditUsageLog
from igny8_core.business.billing.constants import CREDIT_COSTS
from igny8_core.business.billing.exceptions import InsufficientCreditsError, CreditCalculationError
from igny8_core.auth.models import Account
class CreditService:
"""Service for managing credits"""
@staticmethod
def get_credit_cost(operation_type, amount=None):
"""
Get credit cost for operation.
Args:
operation_type: Type of operation (from CREDIT_COSTS)
amount: Optional amount (word count, image count, etc.)
Returns:
int: Number of credits required
Raises:
CreditCalculationError: If operation type is unknown
"""
base_cost = CREDIT_COSTS.get(operation_type, 0)
if base_cost == 0:
raise CreditCalculationError(f"Unknown operation type: {operation_type}")
# Variable cost operations
if operation_type == 'content_generation' and amount:
# Per 100 words
return max(1, int(base_cost * (amount / 100)))
elif operation_type == 'optimization' and amount:
# Per 200 words
return max(1, int(base_cost * (amount / 200)))
elif operation_type == 'image_generation' and amount:
# Per image
return base_cost * amount
elif operation_type == 'idea_generation' and amount:
# Per idea
return base_cost * amount
# Fixed cost operations
return base_cost
@staticmethod
def check_credits(account, operation_type, amount=None):
"""
Check if account has sufficient credits for an operation.
Args:
account: Account instance
operation_type: Type of operation
amount: Optional amount (word count, image count, etc.)
Raises:
InsufficientCreditsError: If account doesn't have enough credits
"""
required = CreditService.get_credit_cost(operation_type, amount)
if account.credits < required:
raise InsufficientCreditsError(
f"Insufficient credits. Required: {required}, Available: {account.credits}"
)
return True
@staticmethod
def check_credits_legacy(account, required_credits):
"""
Legacy method: Check if account has enough credits (for backward compatibility).
Args:
account: Account instance
required_credits: Number of credits required
Raises:
InsufficientCreditsError: If account doesn't have enough credits
"""
if account.credits < required_credits:
raise InsufficientCreditsError(
f"Insufficient credits. Required: {required_credits}, Available: {account.credits}"
)
@staticmethod
@transaction.atomic
def deduct_credits(account, amount, operation_type, description, metadata=None, cost_usd=None, model_used=None, tokens_input=None, tokens_output=None, related_object_type=None, related_object_id=None):
"""
Deduct credits and log transaction.
Args:
account: Account instance
amount: Number of credits to deduct
operation_type: Type of operation (from CreditUsageLog.OPERATION_TYPE_CHOICES)
description: Description of the transaction
metadata: Optional metadata dict
cost_usd: Optional cost in USD
model_used: Optional AI model used
tokens_input: Optional input tokens
tokens_output: Optional output tokens
related_object_type: Optional related object type
related_object_id: Optional related object ID
Returns:
int: New credit balance
"""
# Check sufficient credits (legacy: amount is already calculated)
CreditService.check_credits_legacy(account, amount)
# Deduct from account.credits
account.credits -= amount
account.save(update_fields=['credits'])
# Create CreditTransaction
CreditTransaction.objects.create(
account=account,
transaction_type='deduction',
amount=-amount, # Negative for deduction
balance_after=account.credits,
description=description,
metadata=metadata or {}
)
# Create CreditUsageLog
CreditUsageLog.objects.create(
account=account,
operation_type=operation_type,
credits_used=amount,
cost_usd=cost_usd,
model_used=model_used or '',
tokens_input=tokens_input,
tokens_output=tokens_output,
related_object_type=related_object_type or '',
related_object_id=related_object_id,
metadata=metadata or {}
)
return account.credits
@staticmethod
@transaction.atomic
def deduct_credits_for_operation(account, operation_type, amount=None, description=None, metadata=None, cost_usd=None, model_used=None, tokens_input=None, tokens_output=None, related_object_type=None, related_object_id=None):
"""
Deduct credits for an operation (convenience method that calculates cost automatically).
Args:
account: Account instance
operation_type: Type of operation
amount: Optional amount (word count, image count, etc.)
description: Optional description (auto-generated if not provided)
metadata: Optional metadata dict
cost_usd: Optional cost in USD
model_used: Optional AI model used
tokens_input: Optional input tokens
tokens_output: Optional output tokens
related_object_type: Optional related object type
related_object_id: Optional related object ID
Returns:
int: New credit balance
"""
# Calculate credit cost
credits_required = CreditService.get_credit_cost(operation_type, amount)
# Check sufficient credits
CreditService.check_credits(account, operation_type, amount)
# Auto-generate description if not provided
if not description:
if operation_type == 'clustering':
description = f"Clustering operation"
elif operation_type == 'idea_generation':
description = f"Generated {amount or 1} idea(s)"
elif operation_type == 'content_generation':
description = f"Generated content ({amount or 0} words)"
elif operation_type == 'image_generation':
description = f"Generated {amount or 1} image(s)"
else:
description = f"{operation_type} operation"
return CreditService.deduct_credits(
account=account,
amount=credits_required,
operation_type=operation_type,
description=description,
metadata=metadata,
cost_usd=cost_usd,
model_used=model_used,
tokens_input=tokens_input,
tokens_output=tokens_output,
related_object_type=related_object_type,
related_object_id=related_object_id
)
@staticmethod
@transaction.atomic
def add_credits(account, amount, transaction_type, description, metadata=None):
"""
Add credits (purchase, subscription, etc.).
Args:
account: Account instance
amount: Number of credits to add
transaction_type: Type of transaction (from CreditTransaction.TRANSACTION_TYPE_CHOICES)
description: Description of the transaction
metadata: Optional metadata dict
Returns:
int: New credit balance
"""
# Add to account.credits
account.credits += amount
account.save(update_fields=['credits'])
# Create CreditTransaction
CreditTransaction.objects.create(
account=account,
transaction_type=transaction_type,
amount=amount, # Positive for addition
balance_after=account.credits,
description=description,
metadata=metadata or {}
)
return account.credits
@staticmethod
def calculate_credits_for_operation(operation_type, **kwargs):
"""
Calculate credits needed for an operation.
Legacy method - use get_credit_cost() instead.
Args:
operation_type: Type of operation
**kwargs: Operation-specific parameters
Returns:
int: Number of credits required
Raises:
CreditCalculationError: If calculation fails
"""
# Map legacy operation types
if operation_type == 'ideas':
operation_type = 'idea_generation'
elif operation_type == 'content':
operation_type = 'content_generation'
elif operation_type == 'images':
operation_type = 'image_generation'
# Extract amount from kwargs
amount = None
if 'word_count' in kwargs:
amount = kwargs.get('word_count')
elif 'image_count' in kwargs:
amount = kwargs.get('image_count')
elif 'idea_count' in kwargs:
amount = kwargs.get('idea_count')
return CreditService.get_credit_cost(operation_type, amount)

View File

@@ -0,0 +1,4 @@
"""
Content business logic - Content, Tasks, Images models and services
"""

View File

@@ -0,0 +1,205 @@
from django.db import models
from django.core.validators import MinValueValidator
from igny8_core.auth.models import SiteSectorBaseModel
class Tasks(SiteSectorBaseModel):
"""Tasks model for content generation queue"""
STATUS_CHOICES = [
('queued', 'Queued'),
('completed', 'Completed'),
]
CONTENT_STRUCTURE_CHOICES = [
('cluster_hub', 'Cluster Hub'),
('landing_page', 'Landing Page'),
('pillar_page', 'Pillar Page'),
('supporting_page', 'Supporting Page'),
]
CONTENT_TYPE_CHOICES = [
('blog_post', 'Blog Post'),
('article', 'Article'),
('guide', 'Guide'),
('tutorial', 'Tutorial'),
]
title = models.CharField(max_length=255, db_index=True)
description = models.TextField(blank=True, null=True)
keywords = models.CharField(max_length=500, blank=True) # Comma-separated keywords (legacy)
cluster = models.ForeignKey(
'planning.Clusters',
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='tasks',
limit_choices_to={'sector': models.F('sector')}
)
keyword_objects = models.ManyToManyField(
'planning.Keywords',
blank=True,
related_name='tasks',
help_text="Individual keywords linked to this task"
)
idea = models.ForeignKey(
'planning.ContentIdeas',
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='tasks'
)
content_structure = models.CharField(max_length=50, choices=CONTENT_STRUCTURE_CHOICES, default='blog_post')
content_type = models.CharField(max_length=50, choices=CONTENT_TYPE_CHOICES, default='blog_post')
status = models.CharField(max_length=50, choices=STATUS_CHOICES, default='queued')
# Content fields
content = models.TextField(blank=True, null=True) # Generated content
word_count = models.IntegerField(default=0)
# SEO fields
meta_title = models.CharField(max_length=255, blank=True, null=True)
meta_description = models.TextField(blank=True, null=True)
# WordPress integration
assigned_post_id = models.IntegerField(null=True, blank=True) # WordPress post ID if published
post_url = models.URLField(blank=True, null=True) # WordPress post URL
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'igny8_tasks'
ordering = ['-created_at']
verbose_name = 'Task'
verbose_name_plural = 'Tasks'
indexes = [
models.Index(fields=['title']),
models.Index(fields=['status']),
models.Index(fields=['cluster']),
models.Index(fields=['content_type']),
models.Index(fields=['site', 'sector']),
]
def __str__(self):
return self.title
class Content(SiteSectorBaseModel):
"""
Content model for storing final AI-generated article content.
Separated from Task for content versioning and storage optimization.
"""
task = models.OneToOneField(
Tasks,
on_delete=models.CASCADE,
related_name='content_record',
help_text="The task this content belongs to"
)
html_content = models.TextField(help_text="Final AI-generated HTML content")
word_count = models.IntegerField(default=0, validators=[MinValueValidator(0)])
metadata = models.JSONField(default=dict, help_text="Additional metadata (SEO, structure, etc.)")
title = models.CharField(max_length=255, blank=True, null=True)
meta_title = models.CharField(max_length=255, blank=True, null=True)
meta_description = models.TextField(blank=True, null=True)
primary_keyword = models.CharField(max_length=255, blank=True, null=True)
secondary_keywords = models.JSONField(default=list, blank=True, help_text="List of secondary keywords")
tags = models.JSONField(default=list, blank=True, help_text="List of tags")
categories = models.JSONField(default=list, blank=True, help_text="List of categories")
STATUS_CHOICES = [
('draft', 'Draft'),
('review', 'Review'),
('publish', 'Publish'),
]
status = models.CharField(max_length=50, choices=STATUS_CHOICES, default='draft', help_text="Content workflow status (draft, review, publish)")
generated_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'igny8_content'
ordering = ['-generated_at']
verbose_name = 'Content'
verbose_name_plural = 'Contents'
indexes = [
models.Index(fields=['task']),
models.Index(fields=['generated_at']),
]
def save(self, *args, **kwargs):
"""Automatically set account, site, and sector from task"""
if self.task:
self.account = self.task.account
self.site = self.task.site
self.sector = self.task.sector
super().save(*args, **kwargs)
def __str__(self):
return f"Content for {self.task.title}"
class Images(SiteSectorBaseModel):
"""Images model for content-related images (featured, desktop, mobile, in-article)"""
IMAGE_TYPE_CHOICES = [
('featured', 'Featured Image'),
('desktop', 'Desktop Image'),
('mobile', 'Mobile Image'),
('in_article', 'In-Article Image'),
]
content = models.ForeignKey(
Content,
on_delete=models.CASCADE,
related_name='images',
null=True,
blank=True,
help_text="The content this image belongs to (preferred)"
)
task = models.ForeignKey(
Tasks,
on_delete=models.CASCADE,
related_name='images',
null=True,
blank=True,
help_text="The task this image belongs to (legacy, use content instead)"
)
image_type = models.CharField(max_length=50, choices=IMAGE_TYPE_CHOICES, default='featured')
image_url = models.CharField(max_length=500, blank=True, null=True, help_text="URL of the generated/stored image")
image_path = models.CharField(max_length=500, blank=True, null=True, help_text="Local path if stored locally")
prompt = models.TextField(blank=True, null=True, help_text="Image generation prompt used")
status = models.CharField(max_length=50, default='pending', help_text="Status: pending, generated, failed")
position = models.IntegerField(default=0, help_text="Position for in-article images ordering")
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'igny8_images'
ordering = ['content', 'position', '-created_at']
verbose_name = 'Image'
verbose_name_plural = 'Images'
indexes = [
models.Index(fields=['content', 'image_type']),
models.Index(fields=['task', 'image_type']),
models.Index(fields=['status']),
models.Index(fields=['content', 'position']),
models.Index(fields=['task', 'position']),
]
def save(self, *args, **kwargs):
"""Automatically set account, site, and sector from content or task"""
# Prefer content over task
if self.content:
self.account = self.content.account
self.site = self.content.site
self.sector = self.content.sector
elif self.task:
self.account = self.task.account
self.site = self.task.site
self.sector = self.task.sector
super().save(*args, **kwargs)
def __str__(self):
content_title = self.content.title if self.content else None
task_title = self.task.title if self.task else None
title = content_title or task_title or 'Unknown'
return f"{title} - {self.image_type}"

View File

@@ -0,0 +1,4 @@
"""
Content services
"""

View File

@@ -0,0 +1,75 @@
"""
Content Generation Service
Handles content generation business logic
"""
import logging
from igny8_core.business.content.models import Tasks
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
logger = logging.getLogger(__name__)
class ContentGenerationService:
"""Service for content generation operations"""
def __init__(self):
self.credit_service = CreditService()
def generate_content(self, task_ids, account):
"""
Generate content for tasks.
Args:
task_ids: List of task IDs
account: Account instance
Returns:
dict: Result with success status and data
Raises:
InsufficientCreditsError: If account doesn't have enough credits
"""
# Get tasks
tasks = Tasks.objects.filter(id__in=task_ids, account=account)
# Calculate estimated credits needed
total_word_count = sum(task.word_count or 1000 for task in tasks)
# Check credits
try:
self.credit_service.check_credits(account, 'content_generation', total_word_count)
except InsufficientCreditsError:
raise
# Delegate to AI task (actual generation happens in Celery)
from igny8_core.ai.tasks import run_ai_task
try:
if hasattr(run_ai_task, 'delay'):
# Celery available - queue async
task = run_ai_task.delay(
function_name='generate_content',
payload={'ids': task_ids},
account_id=account.id
)
return {
'success': True,
'task_id': str(task.id),
'message': 'Content generation started'
}
else:
# Celery not available - execute synchronously
result = run_ai_task(
function_name='generate_content',
payload={'ids': task_ids},
account_id=account.id
)
return result
except Exception as e:
logger.error(f"Error in generate_content: {str(e)}", exc_info=True)
return {
'success': False,
'error': str(e)
}

View File

@@ -0,0 +1,4 @@
"""
Planning business logic - Keywords, Clusters, ContentIdeas models and services
"""

View File

@@ -0,0 +1,195 @@
from django.db import models
from igny8_core.auth.models import SiteSectorBaseModel, SeedKeyword
class Clusters(SiteSectorBaseModel):
"""Clusters model for keyword grouping"""
name = models.CharField(max_length=255, unique=True, db_index=True)
description = models.TextField(blank=True, null=True)
keywords_count = models.IntegerField(default=0)
volume = models.IntegerField(default=0)
mapped_pages = models.IntegerField(default=0)
status = models.CharField(max_length=50, default='active')
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'igny8_clusters'
ordering = ['name']
verbose_name = 'Cluster'
verbose_name_plural = 'Clusters'
indexes = [
models.Index(fields=['name']),
models.Index(fields=['status']),
models.Index(fields=['site', 'sector']),
]
def __str__(self):
return self.name
class Keywords(SiteSectorBaseModel):
"""
Keywords model for SEO keyword management.
Site-specific instances that reference global SeedKeywords.
"""
STATUS_CHOICES = [
('active', 'Active'),
('pending', 'Pending'),
('archived', 'Archived'),
]
# Required: Link to global SeedKeyword
seed_keyword = models.ForeignKey(
SeedKeyword,
on_delete=models.PROTECT, # Prevent deletion if Keywords reference it
related_name='site_keywords',
help_text="Reference to the global seed keyword"
)
# Site-specific overrides (optional)
volume_override = models.IntegerField(
null=True,
blank=True,
help_text="Site-specific volume override (uses seed_keyword.volume if not set)"
)
difficulty_override = models.IntegerField(
null=True,
blank=True,
help_text="Site-specific difficulty override (uses seed_keyword.difficulty if not set)"
)
cluster = models.ForeignKey(
'Clusters',
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='keywords',
limit_choices_to={'sector': models.F('sector')} # Cluster must be in same sector
)
status = models.CharField(max_length=50, choices=STATUS_CHOICES, default='pending')
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'igny8_keywords'
ordering = ['-created_at']
verbose_name = 'Keyword'
verbose_name_plural = 'Keywords'
unique_together = [['seed_keyword', 'site', 'sector']] # One keyword per site/sector
indexes = [
models.Index(fields=['seed_keyword']),
models.Index(fields=['status']),
models.Index(fields=['cluster']),
models.Index(fields=['site', 'sector']),
models.Index(fields=['seed_keyword', 'site', 'sector']),
]
@property
def keyword(self):
"""Get keyword text from seed_keyword"""
return self.seed_keyword.keyword if self.seed_keyword else ''
@property
def volume(self):
"""Get volume from override or seed_keyword"""
return self.volume_override if self.volume_override is not None else (self.seed_keyword.volume if self.seed_keyword else 0)
@property
def difficulty(self):
"""Get difficulty from override or seed_keyword"""
return self.difficulty_override if self.difficulty_override is not None else (self.seed_keyword.difficulty if self.seed_keyword else 0)
@property
def intent(self):
"""Get intent from seed_keyword"""
return self.seed_keyword.intent if self.seed_keyword else 'informational'
def save(self, *args, **kwargs):
"""Validate that seed_keyword's industry/sector matches site's industry/sector"""
if self.seed_keyword and self.site and self.sector:
# Validate industry match
if self.site.industry != self.seed_keyword.industry:
from django.core.exceptions import ValidationError
raise ValidationError(
f"SeedKeyword industry ({self.seed_keyword.industry.name}) must match site industry ({self.site.industry.name})"
)
# Validate sector match (site sector's industry_sector must match seed_keyword's sector)
if self.sector.industry_sector != self.seed_keyword.sector:
from django.core.exceptions import ValidationError
raise ValidationError(
f"SeedKeyword sector ({self.seed_keyword.sector.name}) must match site sector's industry sector ({self.sector.industry_sector.name if self.sector.industry_sector else 'None'})"
)
super().save(*args, **kwargs)
def __str__(self):
return self.keyword
class ContentIdeas(SiteSectorBaseModel):
"""Content Ideas model for planning content based on keyword clusters"""
STATUS_CHOICES = [
('new', 'New'),
('scheduled', 'Scheduled'),
('published', 'Published'),
]
CONTENT_STRUCTURE_CHOICES = [
('cluster_hub', 'Cluster Hub'),
('landing_page', 'Landing Page'),
('pillar_page', 'Pillar Page'),
('supporting_page', 'Supporting Page'),
]
CONTENT_TYPE_CHOICES = [
('blog_post', 'Blog Post'),
('article', 'Article'),
('guide', 'Guide'),
('tutorial', 'Tutorial'),
]
idea_title = models.CharField(max_length=255, db_index=True)
description = models.TextField(blank=True, null=True)
content_structure = models.CharField(max_length=50, choices=CONTENT_STRUCTURE_CHOICES, default='blog_post')
content_type = models.CharField(max_length=50, choices=CONTENT_TYPE_CHOICES, default='blog_post')
target_keywords = models.CharField(max_length=500, blank=True) # Comma-separated keywords (legacy)
keyword_objects = models.ManyToManyField(
'Keywords',
blank=True,
related_name='content_ideas',
help_text="Individual keywords linked to this content idea"
)
keyword_cluster = models.ForeignKey(
Clusters,
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='ideas',
limit_choices_to={'sector': models.F('sector')}
)
status = models.CharField(max_length=50, choices=STATUS_CHOICES, default='new')
estimated_word_count = models.IntegerField(default=1000)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'igny8_content_ideas'
ordering = ['-created_at']
verbose_name = 'Content Idea'
verbose_name_plural = 'Content Ideas'
indexes = [
models.Index(fields=['idea_title']),
models.Index(fields=['status']),
models.Index(fields=['keyword_cluster']),
models.Index(fields=['content_structure']),
models.Index(fields=['site', 'sector']),
]
def __str__(self):
return self.idea_title

View File

@@ -0,0 +1,4 @@
"""
Planning services
"""

View File

@@ -0,0 +1,88 @@
"""
Clustering Service
Handles keyword clustering business logic
"""
import logging
from igny8_core.business.planning.models import Keywords
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
logger = logging.getLogger(__name__)
class ClusteringService:
"""Service for keyword clustering operations"""
def __init__(self):
self.credit_service = CreditService()
def cluster_keywords(self, keyword_ids, account, sector_id=None):
"""
Cluster keywords using AI.
Args:
keyword_ids: List of keyword IDs
account: Account instance
sector_id: Optional sector ID
Returns:
dict: Result with success status and data
Raises:
InsufficientCreditsError: If account doesn't have enough credits
"""
# Validate input
if not keyword_ids:
return {
'success': False,
'error': 'No keyword IDs provided'
}
if len(keyword_ids) > 20:
return {
'success': False,
'error': 'Maximum 20 keywords allowed for clustering'
}
# Check credits (fixed cost per clustering operation)
try:
self.credit_service.check_credits(account, 'clustering')
except InsufficientCreditsError:
raise
# Delegate to AI task
from igny8_core.ai.tasks import run_ai_task
payload = {
'ids': keyword_ids,
'sector_id': sector_id
}
try:
if hasattr(run_ai_task, 'delay'):
# Celery available - queue async
task = run_ai_task.delay(
function_name='auto_cluster',
payload=payload,
account_id=account.id
)
return {
'success': True,
'task_id': str(task.id),
'message': 'Clustering started'
}
else:
# Celery not available - execute synchronously
result = run_ai_task(
function_name='auto_cluster',
payload=payload,
account_id=account.id
)
return result
except Exception as e:
logger.error(f"Error in cluster_keywords: {str(e)}", exc_info=True)
return {
'success': False,
'error': str(e)
}

View File

@@ -0,0 +1,90 @@
"""
Ideas Service
Handles content ideas generation business logic
"""
import logging
from igny8_core.business.planning.models import Clusters
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
logger = logging.getLogger(__name__)
class IdeasService:
"""Service for content ideas generation operations"""
def __init__(self):
self.credit_service = CreditService()
def generate_ideas(self, cluster_ids, account):
"""
Generate content ideas from clusters.
Args:
cluster_ids: List of cluster IDs
account: Account instance
Returns:
dict: Result with success status and data
Raises:
InsufficientCreditsError: If account doesn't have enough credits
"""
# Validate input
if not cluster_ids:
return {
'success': False,
'error': 'No cluster IDs provided'
}
if len(cluster_ids) > 10:
return {
'success': False,
'error': 'Maximum 10 clusters allowed for idea generation'
}
# Get clusters to count ideas
clusters = Clusters.objects.filter(id__in=cluster_ids, account=account)
idea_count = len(cluster_ids)
# Check credits
try:
self.credit_service.check_credits(account, 'idea_generation', idea_count)
except InsufficientCreditsError:
raise
# Delegate to AI task
from igny8_core.ai.tasks import run_ai_task
payload = {
'ids': cluster_ids
}
try:
if hasattr(run_ai_task, 'delay'):
# Celery available - queue async
task = run_ai_task.delay(
function_name='auto_generate_ideas',
payload=payload,
account_id=account.id
)
return {
'success': True,
'task_id': str(task.id),
'message': 'Idea generation started'
}
else:
# Celery not available - execute synchronously
result = run_ai_task(
function_name='auto_generate_ideas',
payload=payload,
account_id=account.id
)
return result
except Exception as e:
logger.error(f"Error in generate_ideas: {str(e)}", exc_info=True)
return {
'success': False,
'error': str(e)
}

View File

@@ -25,6 +25,10 @@ app.conf.beat_schedule = {
'task': 'igny8_core.modules.billing.tasks.replenish_monthly_credits',
'schedule': crontab(hour=0, minute=0, day_of_month=1), # First day of month at midnight
},
'execute-scheduled-automation-rules': {
'task': 'igny8_core.business.automation.tasks.execute_scheduled_automation_rules',
'schedule': crontab(minute='*/5'), # Every 5 minutes
},
}
@app.task(bind=True, ignore_result=True)

View File

@@ -0,0 +1,5 @@
"""
Automation Module - API Layer
Business logic is in business/automation/
"""

View File

@@ -0,0 +1,36 @@
"""
Serializers for Automation Models
"""
from rest_framework import serializers
from igny8_core.business.automation.models import AutomationRule, ScheduledTask
class AutomationRuleSerializer(serializers.ModelSerializer):
"""Serializer for AutomationRule model"""
class Meta:
model = AutomationRule
fields = [
'id', 'name', 'description', 'trigger', 'schedule',
'conditions', 'actions', 'is_active', 'status',
'last_executed_at', 'execution_count',
'metadata', 'created_at', 'updated_at',
'account', 'site', 'sector'
]
read_only_fields = ['id', 'created_at', 'updated_at', 'last_executed_at', 'execution_count']
class ScheduledTaskSerializer(serializers.ModelSerializer):
"""Serializer for ScheduledTask model"""
automation_rule_name = serializers.CharField(source='automation_rule.name', read_only=True)
class Meta:
model = ScheduledTask
fields = [
'id', 'automation_rule', 'automation_rule_name',
'scheduled_at', 'executed_at', 'status',
'result', 'error_message', 'metadata',
'created_at', 'updated_at', 'account'
]
read_only_fields = ['id', 'created_at', 'updated_at', 'executed_at']

View File

@@ -0,0 +1,15 @@
"""
URL patterns for automation module.
"""
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from .views import AutomationRuleViewSet, ScheduledTaskViewSet
router = DefaultRouter()
router.register(r'rules', AutomationRuleViewSet, basename='automation-rule')
router.register(r'scheduled-tasks', ScheduledTaskViewSet, basename='scheduled-task')
urlpatterns = [
path('', include(router.urls)),
]

View File

@@ -0,0 +1,92 @@
"""
ViewSets for Automation Models
Unified API Standard v1.0 compliant
"""
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework import filters
from drf_spectacular.utils import extend_schema, extend_schema_view
from igny8_core.api.base import SiteSectorModelViewSet, AccountModelViewSet
from igny8_core.api.pagination import CustomPageNumberPagination
from igny8_core.api.response import success_response, error_response
from igny8_core.api.throttles import DebugScopedRateThrottle
from igny8_core.api.permissions import IsAuthenticatedAndActive, IsViewerOrAbove
from igny8_core.business.automation.models import AutomationRule, ScheduledTask
from igny8_core.business.automation.services.automation_service import AutomationService
from .serializers import AutomationRuleSerializer, ScheduledTaskSerializer
@extend_schema_view(
list=extend_schema(tags=['Automation']),
create=extend_schema(tags=['Automation']),
retrieve=extend_schema(tags=['Automation']),
update=extend_schema(tags=['Automation']),
partial_update=extend_schema(tags=['Automation']),
destroy=extend_schema(tags=['Automation']),
)
class AutomationRuleViewSet(SiteSectorModelViewSet):
"""
ViewSet for managing automation rules
Unified API Standard v1.0 compliant
"""
queryset = AutomationRule.objects.all()
serializer_class = AutomationRuleSerializer
permission_classes = [IsAuthenticatedAndActive, IsViewerOrAbove]
pagination_class = CustomPageNumberPagination
throttle_scope = 'automation'
throttle_classes = [DebugScopedRateThrottle]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
search_fields = ['name', 'description']
ordering_fields = ['name', 'created_at', 'last_executed_at', 'execution_count']
ordering = ['-created_at']
filterset_fields = ['trigger', 'is_active', 'status']
@action(detail=True, methods=['post'], url_path='execute', url_name='execute')
def execute(self, request, pk=None):
"""Manually execute an automation rule"""
rule = self.get_object()
service = AutomationService()
try:
result = service.execute_rule(rule, context=request.data.get('context', {}))
return success_response(
data=result,
message='Rule executed successfully',
request=request
)
except Exception as e:
return error_response(
error=str(e),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
@extend_schema_view(
list=extend_schema(tags=['Automation']),
create=extend_schema(tags=['Automation']),
retrieve=extend_schema(tags=['Automation']),
update=extend_schema(tags=['Automation']),
partial_update=extend_schema(tags=['Automation']),
destroy=extend_schema(tags=['Automation']),
)
class ScheduledTaskViewSet(AccountModelViewSet):
"""
ViewSet for managing scheduled tasks
Unified API Standard v1.0 compliant
"""
queryset = ScheduledTask.objects.select_related('automation_rule')
serializer_class = ScheduledTaskSerializer
permission_classes = [IsAuthenticatedAndActive, IsViewerOrAbove]
pagination_class = CustomPageNumberPagination
throttle_scope = 'automation'
throttle_classes = [DebugScopedRateThrottle]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
ordering_fields = ['scheduled_at', 'executed_at', 'status', 'created_at']
ordering = ['-scheduled_at']
filterset_fields = ['automation_rule', 'status']

View File

@@ -1,21 +1,4 @@
"""
Credit Cost Constants
Phase 0: Credit-only system costs per operation
"""
CREDIT_COSTS = {
'clustering': 10, # Per clustering request
'idea_generation': 15, # Per cluster → ideas request
'content_generation': 1, # Per 100 words
'image_prompt_extraction': 2, # Per content piece
'image_generation': 5, # Per image
'linking': 8, # Per content piece (NEW)
'optimization': 1, # Per 200 words (NEW)
'site_structure_generation': 50, # Per site blueprint (NEW)
'site_page_generation': 20, # Per page (NEW)
# Legacy operation types (for backward compatibility)
'ideas': 15, # Alias for idea_generation
'content': 3, # Legacy: 3 credits per content piece
'images': 5, # Alias for image_generation
'reparse': 1, # Per reparse
}
# Backward compatibility alias - constants moved to business/billing/
from igny8_core.business.billing.constants import CREDIT_COSTS
__all__ = ['CREDIT_COSTS']

View File

@@ -1,14 +1,4 @@
"""
Billing Exceptions
"""
class InsufficientCreditsError(Exception):
"""Raised when account doesn't have enough credits"""
pass
class CreditCalculationError(Exception):
"""Raised when credit calculation fails"""
pass
# Backward compatibility aliases - exceptions moved to business/billing/
from igny8_core.business.billing.exceptions import InsufficientCreditsError, CreditCalculationError
__all__ = ['InsufficientCreditsError', 'CreditCalculationError']

View File

@@ -1,72 +1,4 @@
"""
Billing Models for Credit System
"""
from django.db import models
from django.core.validators import MinValueValidator
from igny8_core.auth.models import AccountBaseModel
class CreditTransaction(AccountBaseModel):
"""Track all credit transactions (additions, deductions)"""
TRANSACTION_TYPE_CHOICES = [
('purchase', 'Purchase'),
('subscription', 'Subscription Renewal'),
('refund', 'Refund'),
('deduction', 'Usage Deduction'),
('adjustment', 'Manual Adjustment'),
]
transaction_type = models.CharField(max_length=20, choices=TRANSACTION_TYPE_CHOICES, db_index=True)
amount = models.IntegerField(help_text="Positive for additions, negative for deductions")
balance_after = models.IntegerField(help_text="Credit balance after this transaction")
description = models.CharField(max_length=255)
metadata = models.JSONField(default=dict, help_text="Additional context (AI call details, etc.)")
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = 'igny8_credit_transactions'
ordering = ['-created_at']
indexes = [
models.Index(fields=['account', 'transaction_type']),
models.Index(fields=['account', 'created_at']),
]
def __str__(self):
account = getattr(self, 'account', None)
return f"{self.get_transaction_type_display()} - {self.amount} credits - {account.name if account else 'No Account'}"
class CreditUsageLog(AccountBaseModel):
"""Detailed log of credit usage per AI operation"""
OPERATION_TYPE_CHOICES = [
('clustering', 'Keyword Clustering'),
('ideas', 'Content Ideas Generation'),
('content', 'Content Generation'),
('images', 'Image Generation'),
('reparse', 'Content Reparse'),
]
operation_type = models.CharField(max_length=50, choices=OPERATION_TYPE_CHOICES, db_index=True)
credits_used = models.IntegerField(validators=[MinValueValidator(0)])
cost_usd = models.DecimalField(max_digits=10, decimal_places=4, null=True, blank=True)
model_used = models.CharField(max_length=100, blank=True)
tokens_input = models.IntegerField(null=True, blank=True, validators=[MinValueValidator(0)])
tokens_output = models.IntegerField(null=True, blank=True, validators=[MinValueValidator(0)])
related_object_type = models.CharField(max_length=50, blank=True) # 'keyword', 'cluster', 'task'
related_object_id = models.IntegerField(null=True, blank=True)
metadata = models.JSONField(default=dict)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = 'igny8_credit_usage_logs'
ordering = ['-created_at']
indexes = [
models.Index(fields=['account', 'operation_type']),
models.Index(fields=['account', 'created_at']),
models.Index(fields=['account', 'operation_type', 'created_at']),
]
def __str__(self):
account = getattr(self, 'account', None)
return f"{self.get_operation_type_display()} - {self.credits_used} credits - {account.name if account else 'No Account'}"
# Backward compatibility aliases - models moved to business/billing/
from igny8_core.business.billing.models import CreditTransaction, CreditUsageLog
__all__ = ['CreditTransaction', 'CreditUsageLog']

View File

@@ -1,264 +1,4 @@
"""
Credit Service for managing credit transactions and deductions
"""
from django.db import transaction
from django.utils import timezone
from .models import CreditTransaction, CreditUsageLog
from .constants import CREDIT_COSTS
from .exceptions import InsufficientCreditsError, CreditCalculationError
from igny8_core.auth.models import Account
class CreditService:
"""Service for managing credits"""
@staticmethod
def get_credit_cost(operation_type, amount=None):
"""
Get credit cost for operation.
Args:
operation_type: Type of operation (from CREDIT_COSTS)
amount: Optional amount (word count, image count, etc.)
Returns:
int: Number of credits required
Raises:
CreditCalculationError: If operation type is unknown
"""
base_cost = CREDIT_COSTS.get(operation_type, 0)
if base_cost == 0:
raise CreditCalculationError(f"Unknown operation type: {operation_type}")
# Variable cost operations
if operation_type == 'content_generation' and amount:
# Per 100 words
return max(1, int(base_cost * (amount / 100)))
elif operation_type == 'optimization' and amount:
# Per 200 words
return max(1, int(base_cost * (amount / 200)))
elif operation_type == 'image_generation' and amount:
# Per image
return base_cost * amount
elif operation_type == 'idea_generation' and amount:
# Per idea
return base_cost * amount
# Fixed cost operations
return base_cost
@staticmethod
def check_credits(account, operation_type, amount=None):
"""
Check if account has sufficient credits for an operation.
Args:
account: Account instance
operation_type: Type of operation
amount: Optional amount (word count, image count, etc.)
Raises:
InsufficientCreditsError: If account doesn't have enough credits
"""
required = CreditService.get_credit_cost(operation_type, amount)
if account.credits < required:
raise InsufficientCreditsError(
f"Insufficient credits. Required: {required}, Available: {account.credits}"
)
return True
@staticmethod
def check_credits_legacy(account, required_credits):
"""
Legacy method: Check if account has enough credits (for backward compatibility).
Args:
account: Account instance
required_credits: Number of credits required
Raises:
InsufficientCreditsError: If account doesn't have enough credits
"""
if account.credits < required_credits:
raise InsufficientCreditsError(
f"Insufficient credits. Required: {required_credits}, Available: {account.credits}"
)
@staticmethod
@transaction.atomic
def deduct_credits(account, amount, operation_type, description, metadata=None, cost_usd=None, model_used=None, tokens_input=None, tokens_output=None, related_object_type=None, related_object_id=None):
"""
Deduct credits and log transaction.
Args:
account: Account instance
amount: Number of credits to deduct
operation_type: Type of operation (from CreditUsageLog.OPERATION_TYPE_CHOICES)
description: Description of the transaction
metadata: Optional metadata dict
cost_usd: Optional cost in USD
model_used: Optional AI model used
tokens_input: Optional input tokens
tokens_output: Optional output tokens
related_object_type: Optional related object type
related_object_id: Optional related object ID
Returns:
int: New credit balance
"""
# Check sufficient credits (legacy: amount is already calculated)
CreditService.check_credits_legacy(account, amount)
# Deduct from account.credits
account.credits -= amount
account.save(update_fields=['credits'])
# Create CreditTransaction
CreditTransaction.objects.create(
account=account,
transaction_type='deduction',
amount=-amount, # Negative for deduction
balance_after=account.credits,
description=description,
metadata=metadata or {}
)
# Create CreditUsageLog
CreditUsageLog.objects.create(
account=account,
operation_type=operation_type,
credits_used=amount,
cost_usd=cost_usd,
model_used=model_used or '',
tokens_input=tokens_input,
tokens_output=tokens_output,
related_object_type=related_object_type or '',
related_object_id=related_object_id,
metadata=metadata or {}
)
return account.credits
@staticmethod
@transaction.atomic
def deduct_credits_for_operation(account, operation_type, amount=None, description=None, metadata=None, cost_usd=None, model_used=None, tokens_input=None, tokens_output=None, related_object_type=None, related_object_id=None):
"""
Deduct credits for an operation (convenience method that calculates cost automatically).
Args:
account: Account instance
operation_type: Type of operation
amount: Optional amount (word count, image count, etc.)
description: Optional description (auto-generated if not provided)
metadata: Optional metadata dict
cost_usd: Optional cost in USD
model_used: Optional AI model used
tokens_input: Optional input tokens
tokens_output: Optional output tokens
related_object_type: Optional related object type
related_object_id: Optional related object ID
Returns:
int: New credit balance
"""
# Calculate credit cost
credits_required = CreditService.get_credit_cost(operation_type, amount)
# Check sufficient credits
CreditService.check_credits(account, operation_type, amount)
# Auto-generate description if not provided
if not description:
if operation_type == 'clustering':
description = f"Clustering operation"
elif operation_type == 'idea_generation':
description = f"Generated {amount or 1} idea(s)"
elif operation_type == 'content_generation':
description = f"Generated content ({amount or 0} words)"
elif operation_type == 'image_generation':
description = f"Generated {amount or 1} image(s)"
else:
description = f"{operation_type} operation"
return CreditService.deduct_credits(
account=account,
amount=credits_required,
operation_type=operation_type,
description=description,
metadata=metadata,
cost_usd=cost_usd,
model_used=model_used,
tokens_input=tokens_input,
tokens_output=tokens_output,
related_object_type=related_object_type,
related_object_id=related_object_id
)
@staticmethod
@transaction.atomic
def add_credits(account, amount, transaction_type, description, metadata=None):
"""
Add credits (purchase, subscription, etc.).
Args:
account: Account instance
amount: Number of credits to add
transaction_type: Type of transaction (from CreditTransaction.TRANSACTION_TYPE_CHOICES)
description: Description of the transaction
metadata: Optional metadata dict
Returns:
int: New credit balance
"""
# Add to account.credits
account.credits += amount
account.save(update_fields=['credits'])
# Create CreditTransaction
CreditTransaction.objects.create(
account=account,
transaction_type=transaction_type,
amount=amount, # Positive for addition
balance_after=account.credits,
description=description,
metadata=metadata or {}
)
return account.credits
@staticmethod
def calculate_credits_for_operation(operation_type, **kwargs):
"""
Calculate credits needed for an operation.
Legacy method - use get_credit_cost() instead.
Args:
operation_type: Type of operation
**kwargs: Operation-specific parameters
Returns:
int: Number of credits required
Raises:
CreditCalculationError: If calculation fails
"""
# Map legacy operation types
if operation_type == 'ideas':
operation_type = 'idea_generation'
elif operation_type == 'content':
operation_type = 'content_generation'
elif operation_type == 'images':
operation_type = 'image_generation'
# Extract amount from kwargs
amount = None
if 'word_count' in kwargs:
amount = kwargs.get('word_count')
elif 'image_count' in kwargs:
amount = kwargs.get('image_count')
elif 'idea_count' in kwargs:
amount = kwargs.get('idea_count')
return CreditService.get_credit_cost(operation_type, amount)
# Backward compatibility alias - service moved to business/billing/services/
from igny8_core.business.billing.services.credit_service import CreditService
__all__ = ['CreditService']

View File

@@ -1,194 +1,4 @@
from django.db import models
from igny8_core.auth.models import SiteSectorBaseModel, SeedKeyword
# Backward compatibility aliases - models moved to business/planning/
from igny8_core.business.planning.models import Keywords, Clusters, ContentIdeas
class Clusters(SiteSectorBaseModel):
"""Clusters model for keyword grouping"""
name = models.CharField(max_length=255, unique=True, db_index=True)
description = models.TextField(blank=True, null=True)
keywords_count = models.IntegerField(default=0)
volume = models.IntegerField(default=0)
mapped_pages = models.IntegerField(default=0)
status = models.CharField(max_length=50, default='active')
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'igny8_clusters'
ordering = ['name']
verbose_name = 'Cluster'
verbose_name_plural = 'Clusters'
indexes = [
models.Index(fields=['name']),
models.Index(fields=['status']),
models.Index(fields=['site', 'sector']),
]
def __str__(self):
return self.name
class Keywords(SiteSectorBaseModel):
"""
Keywords model for SEO keyword management.
Site-specific instances that reference global SeedKeywords.
"""
STATUS_CHOICES = [
('active', 'Active'),
('pending', 'Pending'),
('archived', 'Archived'),
]
# Required: Link to global SeedKeyword
seed_keyword = models.ForeignKey(
SeedKeyword,
on_delete=models.PROTECT, # Prevent deletion if Keywords reference it
related_name='site_keywords',
help_text="Reference to the global seed keyword"
)
# Site-specific overrides (optional)
volume_override = models.IntegerField(
null=True,
blank=True,
help_text="Site-specific volume override (uses seed_keyword.volume if not set)"
)
difficulty_override = models.IntegerField(
null=True,
blank=True,
help_text="Site-specific difficulty override (uses seed_keyword.difficulty if not set)"
)
cluster = models.ForeignKey(
Clusters,
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='keywords',
limit_choices_to={'sector': models.F('sector')} # Cluster must be in same sector
)
status = models.CharField(max_length=50, choices=STATUS_CHOICES, default='pending')
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'igny8_keywords'
ordering = ['-created_at']
verbose_name = 'Keyword'
verbose_name_plural = 'Keywords'
unique_together = [['seed_keyword', 'site', 'sector']] # One keyword per site/sector
indexes = [
models.Index(fields=['seed_keyword']),
models.Index(fields=['status']),
models.Index(fields=['cluster']),
models.Index(fields=['site', 'sector']),
models.Index(fields=['seed_keyword', 'site', 'sector']),
]
@property
def keyword(self):
"""Get keyword text from seed_keyword"""
return self.seed_keyword.keyword if self.seed_keyword else ''
@property
def volume(self):
"""Get volume from override or seed_keyword"""
return self.volume_override if self.volume_override is not None else (self.seed_keyword.volume if self.seed_keyword else 0)
@property
def difficulty(self):
"""Get difficulty from override or seed_keyword"""
return self.difficulty_override if self.difficulty_override is not None else (self.seed_keyword.difficulty if self.seed_keyword else 0)
@property
def intent(self):
"""Get intent from seed_keyword"""
return self.seed_keyword.intent if self.seed_keyword else 'informational'
def save(self, *args, **kwargs):
"""Validate that seed_keyword's industry/sector matches site's industry/sector"""
if self.seed_keyword and self.site and self.sector:
# Validate industry match
if self.site.industry != self.seed_keyword.industry:
from django.core.exceptions import ValidationError
raise ValidationError(
f"SeedKeyword industry ({self.seed_keyword.industry.name}) must match site industry ({self.site.industry.name})"
)
# Validate sector match (site sector's industry_sector must match seed_keyword's sector)
if self.sector.industry_sector != self.seed_keyword.sector:
from django.core.exceptions import ValidationError
raise ValidationError(
f"SeedKeyword sector ({self.seed_keyword.sector.name}) must match site sector's industry sector ({self.sector.industry_sector.name if self.sector.industry_sector else 'None'})"
)
super().save(*args, **kwargs)
def __str__(self):
return self.keyword
class ContentIdeas(SiteSectorBaseModel):
"""Content Ideas model for planning content based on keyword clusters"""
STATUS_CHOICES = [
('new', 'New'),
('scheduled', 'Scheduled'),
('published', 'Published'),
]
CONTENT_STRUCTURE_CHOICES = [
('cluster_hub', 'Cluster Hub'),
('landing_page', 'Landing Page'),
('pillar_page', 'Pillar Page'),
('supporting_page', 'Supporting Page'),
]
CONTENT_TYPE_CHOICES = [
('blog_post', 'Blog Post'),
('article', 'Article'),
('guide', 'Guide'),
('tutorial', 'Tutorial'),
]
idea_title = models.CharField(max_length=255, db_index=True)
description = models.TextField(blank=True, null=True)
content_structure = models.CharField(max_length=50, choices=CONTENT_STRUCTURE_CHOICES, default='blog_post')
content_type = models.CharField(max_length=50, choices=CONTENT_TYPE_CHOICES, default='blog_post')
target_keywords = models.CharField(max_length=500, blank=True) # Comma-separated keywords (legacy)
keyword_objects = models.ManyToManyField(
'Keywords',
blank=True,
related_name='content_ideas',
help_text="Individual keywords linked to this content idea"
)
keyword_cluster = models.ForeignKey(
Clusters,
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='ideas',
limit_choices_to={'sector': models.F('sector')}
)
status = models.CharField(max_length=50, choices=STATUS_CHOICES, default='new')
estimated_word_count = models.IntegerField(default=1000)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'igny8_content_ideas'
ordering = ['-created_at']
verbose_name = 'Content Idea'
verbose_name_plural = 'Content Ideas'
indexes = [
models.Index(fields=['idea_title']),
models.Index(fields=['status']),
models.Index(fields=['keyword_cluster']),
models.Index(fields=['content_structure']),
models.Index(fields=['site', 'sector']),
]
def __str__(self):
return self.idea_title
__all__ = ['Keywords', 'Clusters', 'ContentIdeas']

View File

@@ -1,206 +1,4 @@
from django.db import models
from django.core.validators import MinValueValidator
from igny8_core.auth.models import SiteSectorBaseModel
from igny8_core.modules.planner.models import Clusters, ContentIdeas, Keywords
class Tasks(SiteSectorBaseModel):
"""Tasks model for content generation queue"""
STATUS_CHOICES = [
('queued', 'Queued'),
('completed', 'Completed'),
]
CONTENT_STRUCTURE_CHOICES = [
('cluster_hub', 'Cluster Hub'),
('landing_page', 'Landing Page'),
('pillar_page', 'Pillar Page'),
('supporting_page', 'Supporting Page'),
]
CONTENT_TYPE_CHOICES = [
('blog_post', 'Blog Post'),
('article', 'Article'),
('guide', 'Guide'),
('tutorial', 'Tutorial'),
]
title = models.CharField(max_length=255, db_index=True)
description = models.TextField(blank=True, null=True)
keywords = models.CharField(max_length=500, blank=True) # Comma-separated keywords (legacy)
cluster = models.ForeignKey(
Clusters,
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='tasks',
limit_choices_to={'sector': models.F('sector')}
)
keyword_objects = models.ManyToManyField(
Keywords,
blank=True,
related_name='tasks',
help_text="Individual keywords linked to this task"
)
idea = models.ForeignKey(
ContentIdeas,
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='tasks'
)
content_structure = models.CharField(max_length=50, choices=CONTENT_STRUCTURE_CHOICES, default='blog_post')
content_type = models.CharField(max_length=50, choices=CONTENT_TYPE_CHOICES, default='blog_post')
status = models.CharField(max_length=50, choices=STATUS_CHOICES, default='queued')
# Content fields
content = models.TextField(blank=True, null=True) # Generated content
word_count = models.IntegerField(default=0)
# SEO fields
meta_title = models.CharField(max_length=255, blank=True, null=True)
meta_description = models.TextField(blank=True, null=True)
# WordPress integration
assigned_post_id = models.IntegerField(null=True, blank=True) # WordPress post ID if published
post_url = models.URLField(blank=True, null=True) # WordPress post URL
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'igny8_tasks'
ordering = ['-created_at']
verbose_name = 'Task'
verbose_name_plural = 'Tasks'
indexes = [
models.Index(fields=['title']),
models.Index(fields=['status']),
models.Index(fields=['cluster']),
models.Index(fields=['content_type']),
models.Index(fields=['site', 'sector']),
]
def __str__(self):
return self.title
class Content(SiteSectorBaseModel):
"""
Content model for storing final AI-generated article content.
Separated from Task for content versioning and storage optimization.
"""
task = models.OneToOneField(
Tasks,
on_delete=models.CASCADE,
related_name='content_record',
help_text="The task this content belongs to"
)
html_content = models.TextField(help_text="Final AI-generated HTML content")
word_count = models.IntegerField(default=0, validators=[MinValueValidator(0)])
metadata = models.JSONField(default=dict, help_text="Additional metadata (SEO, structure, etc.)")
title = models.CharField(max_length=255, blank=True, null=True)
meta_title = models.CharField(max_length=255, blank=True, null=True)
meta_description = models.TextField(blank=True, null=True)
primary_keyword = models.CharField(max_length=255, blank=True, null=True)
secondary_keywords = models.JSONField(default=list, blank=True, help_text="List of secondary keywords")
tags = models.JSONField(default=list, blank=True, help_text="List of tags")
categories = models.JSONField(default=list, blank=True, help_text="List of categories")
STATUS_CHOICES = [
('draft', 'Draft'),
('review', 'Review'),
('publish', 'Publish'),
]
status = models.CharField(max_length=50, choices=STATUS_CHOICES, default='draft', help_text="Content workflow status (draft, review, publish)")
generated_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'igny8_content'
ordering = ['-generated_at']
verbose_name = 'Content'
verbose_name_plural = 'Contents'
indexes = [
models.Index(fields=['task']),
models.Index(fields=['generated_at']),
]
def save(self, *args, **kwargs):
"""Automatically set account, site, and sector from task"""
if self.task:
self.account = self.task.account
self.site = self.task.site
self.sector = self.task.sector
super().save(*args, **kwargs)
def __str__(self):
return f"Content for {self.task.title}"
class Images(SiteSectorBaseModel):
"""Images model for content-related images (featured, desktop, mobile, in-article)"""
IMAGE_TYPE_CHOICES = [
('featured', 'Featured Image'),
('desktop', 'Desktop Image'),
('mobile', 'Mobile Image'),
('in_article', 'In-Article Image'),
]
content = models.ForeignKey(
Content,
on_delete=models.CASCADE,
related_name='images',
null=True,
blank=True,
help_text="The content this image belongs to (preferred)"
)
task = models.ForeignKey(
Tasks,
on_delete=models.CASCADE,
related_name='images',
null=True,
blank=True,
help_text="The task this image belongs to (legacy, use content instead)"
)
image_type = models.CharField(max_length=50, choices=IMAGE_TYPE_CHOICES, default='featured')
image_url = models.CharField(max_length=500, blank=True, null=True, help_text="URL of the generated/stored image")
image_path = models.CharField(max_length=500, blank=True, null=True, help_text="Local path if stored locally")
prompt = models.TextField(blank=True, null=True, help_text="Image generation prompt used")
status = models.CharField(max_length=50, default='pending', help_text="Status: pending, generated, failed")
position = models.IntegerField(default=0, help_text="Position for in-article images ordering")
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'igny8_images'
ordering = ['content', 'position', '-created_at']
verbose_name = 'Image'
verbose_name_plural = 'Images'
indexes = [
models.Index(fields=['content', 'image_type']),
models.Index(fields=['task', 'image_type']),
models.Index(fields=['status']),
models.Index(fields=['content', 'position']),
models.Index(fields=['task', 'position']),
]
def save(self, *args, **kwargs):
"""Automatically set account, site, and sector from content or task"""
# Prefer content over task
if self.content:
self.account = self.content.account
self.site = self.content.site
self.sector = self.content.sector
elif self.task:
self.account = self.task.account
self.site = self.task.site
self.sector = self.task.sector
super().save(*args, **kwargs)
def __str__(self):
content_title = self.content.title if self.content else None
task_title = self.task.title if self.task else None
title = content_title or task_title or 'Unknown'
return f"{title} - {self.image_type}"
# Backward compatibility aliases - models moved to business/content/
from igny8_core.business.content.models import Tasks, Content, Images
__all__ = ['Tasks', 'Content', 'Images']

View File

@@ -1,7 +1,7 @@
from rest_framework import serializers
from django.db import models
from .models import Tasks, Images, Content
from igny8_core.modules.planner.models import Clusters, ContentIdeas
from igny8_core.business.planning.models import Clusters, ContentIdeas
class TasksSerializer(serializers.ModelSerializer):

View File

@@ -29,6 +29,7 @@ urlpatterns = [
path('api/v1/writer/', include('igny8_core.modules.writer.urls')),
path('api/v1/system/', include('igny8_core.modules.system.urls')),
path('api/v1/billing/', include('igny8_core.modules.billing.urls')), # Billing endpoints
path('api/v1/automation/', include('igny8_core.modules.automation.urls')), # Automation endpoints
# OpenAPI Schema and Documentation
path('api/schema/', SpectacularAPIView.as_view(), name='schema'),
path('api/docs/', SpectacularSwaggerView.as_view(url_name='schema'), name='swagger-ui'),