# IGNY8 AI Functions - Complete Technical Reference

**Date:** December 3, 2025

**Version:** 2.0 - Corrected after automation audit

**100% based on the actual codebase (backend + frontend + automation integration)**

---

## Table of Contents

1. [Overview](#overview)
2. [AI Architecture](#ai-architecture)
3. [AI Function Registry](#ai-function-registry)
4. [Planner Module AI Functions](#planner-module-ai-functions)
5. [Writer Module AI Functions](#writer-module-ai-functions)
6. [AI Function Base Class](#ai-function-base-class)
7. [AI Engine & Execution](#ai-engine--execution)
8. [Credit System Integration](#credit-system-integration)
9. [Progress Tracking](#progress-tracking)
10. [Summary](#summary)
11. [Automation Integration](#automation-integration)

---

## Overview

IGNY8 uses a centralized AI function architecture in which every AI operation inherits from `BaseAIFunction` and executes through `AIEngine`. This guarantees consistent:

- Credit management
- Progress tracking
- Error handling
- Logging
- Response parsing

**Total AI Functions: 6**

| Function | Module | Purpose | Input | Output | Credits |
|----------|--------|---------|-------|--------|---------|
| `auto_cluster` | Planner | Group keywords into semantic clusters | Keyword IDs | Clusters created | ~1 per 5 keywords |
| `generate_ideas` | Planner | Generate content ideas from clusters | Cluster IDs | Ideas created | 2 per cluster |
| `generate_content` | Writer | Generate article content from tasks | Task IDs | Content drafts | ~5 per 2500 words |
| `generate_image_prompts` | Writer | Extract image prompts from content | Content IDs | Image records with prompts | ~2 per content |
| `generate_images` | Writer | Generate actual images from prompts | Image IDs | Image URLs | 1-4 per image |
| `optimize_content` | Writer | SEO optimization of content | Content IDs | Updated content | ~1 per content |
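
Read literally, these per-item rates allow a rough pre-run estimate. A minimal sketch (the `estimate_credits` helper and its rate table are illustrative, not part of the codebase; the variable-rate functions `generate_content` and `generate_images` are omitted):

```python
import math

# Illustrative rate table derived from the credits column above - not a real module.
CREDIT_RATES = {
    'auto_cluster': lambda n: math.ceil(n / 5),   # ~1 credit per 5 keywords
    'generate_ideas': lambda n: 2 * n,            # 2 credits per cluster
    'generate_image_prompts': lambda n: 2 * n,    # ~2 credits per content record
    'optimize_content': lambda n: n,              # ~1 credit per content record
}

def estimate_credits(function_name: str, item_count: int) -> int:
    """Rough credit estimate for a batch of item_count items."""
    return CREDIT_RATES[function_name](item_count)

# 12 keywords at ~1 credit per 5 keywords rounds up to 3 credits
assert estimate_credits('auto_cluster', 12) == 3
assert estimate_credits('generate_ideas', 4) == 8
```

Actual charges are computed by `AIEngine` after the fact (see Credit System Integration), so estimates like this are only a planning aid.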
---

## AI Architecture

### Directory Structure

```
backend/igny8_core/ai/
├── __init__.py
├── base.py          # BaseAIFunction (abstract class)
├── engine.py        # AIEngine (execution orchestrator)
├── registry.py      # Function registration & lazy loading
├── ai_core.py       # Core AI API interactions
├── prompts.py       # PromptRegistry
├── tasks.py         # Celery tasks for async execution
├── models.py        # AITaskLog, AIUsageLog
├── validators.py    # Input validation helpers
├── settings.py      # AI configuration
├── tracker.py       # ProgressTracker, StepTracker
└── functions/
    ├── __init__.py
    ├── auto_cluster.py            # AutoClusterFunction
    ├── generate_ideas.py          # GenerateIdeasFunction
    ├── generate_content.py        # GenerateContentFunction
    ├── generate_image_prompts.py  # GenerateImagePromptsFunction
    ├── generate_images.py         # GenerateImagesFunction
    └── optimize_content.py        # OptimizeContentFunction
```

### Execution Flow

```
User Action → API Endpoint → Service Layer → AIEngine.execute()
                                                    ↓
                                             BaseAIFunction
                                                    ↓
                        ┌───────────────────────────┴───────────────────────────┐
                        ↓                                                       ↓
            Synchronous (small ops)                                  Async (Celery task)
                        ↓                                                       ↓
            Direct function execution                               run_ai_task.delay()
                        └───────────────────────────┬───────────────────────────┘
                                                    ↓
                         INIT → PREP → AI_CALL → PARSE → SAVE → DONE
                                                    ↓
                                     Credit deduction (automatic)
                                                    ↓
                                    Progress tracking (StepTracker)
                                                    ↓
                                         AIUsageLog created
```

---

## AI Function Registry

**File:** `backend/igny8_core/ai/registry.py`

### Lazy Loading System

Functions are registered with lazy loaders and are only imported when first called:

```python
_FUNCTION_REGISTRY: Dict[str, Type[BaseAIFunction]] = {}
_FUNCTION_LOADERS: Dict[str, callable] = {}

def get_function_instance(name: str) -> Optional[BaseAIFunction]:
    """Get function instance by name - lazy loads if needed"""
    actual_name = FUNCTION_ALIASES.get(name, name)
    fn_class = get_function(actual_name)
    if fn_class:
        return fn_class()
    return None
```

### Registered Functions

```python
# Lazy loaders
register_lazy_function('auto_cluster', _load_auto_cluster)
register_lazy_function('generate_ideas', _load_generate_ideas)
register_lazy_function('generate_content', _load_generate_content)
register_lazy_function('generate_images', _load_generate_images)
register_lazy_function('generate_image_prompts', _load_generate_image_prompts)
register_lazy_function('optimize_content', _load_optimize_content)
```
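
The lazy-loading pattern can be sketched in plain Python. This is a simplified stand-in (no aliases, no thread safety; the names mirror the registry, but the bodies and the `FakeAutoCluster` class are illustrative):

```python
from typing import Callable, Dict, Optional

_FUNCTION_REGISTRY: Dict[str, type] = {}
_FUNCTION_LOADERS: Dict[str, Callable[[], type]] = {}

def register_lazy_function(name: str, loader: Callable[[], type]) -> None:
    """Store a zero-arg loader; the module is imported only on first use."""
    _FUNCTION_LOADERS[name] = loader

def get_function(name: str) -> Optional[type]:
    """Resolve a function class, invoking its loader at most once."""
    if name not in _FUNCTION_REGISTRY and name in _FUNCTION_LOADERS:
        _FUNCTION_REGISTRY[name] = _FUNCTION_LOADERS[name]()  # import happens here
    return _FUNCTION_REGISTRY.get(name)

# Usage with a stand-in class instead of a real import:
class FakeAutoCluster:
    pass

register_lazy_function('auto_cluster', lambda: FakeAutoCluster)
assert get_function('auto_cluster') is FakeAutoCluster
assert get_function('missing') is None
```

The payoff is startup cost: Django, BeautifulSoup, and provider SDKs are only imported for functions that actually run.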
---

## Planner Module AI Functions

### 1. AutoClusterFunction

**File:** `backend/igny8_core/ai/functions/auto_cluster.py`

**Purpose:** Groups semantically related keywords into topic clusters using AI

**Class Definition:**
```python
class AutoClusterFunction(BaseAIFunction):
    def get_name(self) -> str:
        return 'auto_cluster'
```

**Metadata:**
```python
{
    'display_name': 'Auto Cluster Keywords',
    'description': 'Group related keywords into semantic clusters using AI',
    'phases': {
        'INIT': 'Initializing clustering...',
        'PREP': 'Loading keywords...',
        'AI_CALL': 'Analyzing keyword relationships...',
        'PARSE': 'Parsing cluster data...',
        'SAVE': 'Creating clusters...',
        'DONE': 'Clustering complete!'
    }
}
```

**Method: validate()**
```python
def validate(self, payload: dict, account=None) -> Dict:
    # Validates:
    # - IDs were provided
    # - Keywords exist in the database
    # - Account ownership
    # NO MAX LIMIT - processes any count
    return {'valid': True}
```

**Method: prepare()**
```python
def prepare(self, payload: dict, account=None) -> Dict:
    ids = payload.get('ids', [])
    sector_id = payload.get('sector_id')

    keywords = Keywords.objects.filter(id__in=ids, account=account).select_related(
        'account', 'site', 'sector', 'seed_keyword'
    )

    return {
        'keywords': keywords,    # Keyword objects
        'keyword_data': [        # Data for the AI
            {
                'id': kw.id,
                'keyword': kw.keyword,  # From the seed_keyword relationship
                'volume': kw.volume,
                'difficulty': kw.difficulty,
                'intent': kw.intent,
            }
            for kw in keywords
        ],
        'sector_id': sector_id
    }
```

**Method: build_prompt()**
```python
def build_prompt(self, data: Dict, account=None) -> str:
    keyword_data = data['keyword_data']

    # Format keywords for the prompt
    keywords_text = '\n'.join([
        f"- {kw['keyword']} (Volume: {kw['volume']}, Difficulty: {kw['difficulty']}, Intent: {kw['intent']})"
        for kw in keyword_data
    ])

    # Get the prompt template from the registry
    prompt = PromptRegistry.get_prompt(
        function_name='auto_cluster',
        account=account,
        context={'KEYWORDS': keywords_text}
    )

    # Ensure JSON mode compatibility
    if 'json' not in prompt.lower():
        prompt += "\n\nIMPORTANT: You must respond with valid JSON only."

    return prompt
```

**Method: parse_response()**
```python
def parse_response(self, response: str, step_tracker=None) -> List[Dict]:
    # Try a direct JSON parse
    try:
        json_data = json.loads(response.strip())
    except json.JSONDecodeError:
        # Fallback to extract_json (handles markdown code blocks)
        ai_core = AICore(account=self.account)
        json_data = ai_core.extract_json(response)

    # Extract the clusters array
    if isinstance(json_data, dict):
        clusters = json_data.get('clusters', [])
    elif isinstance(json_data, list):
        clusters = json_data
    else:
        clusters = []  # unexpected shape - avoid a NameError below

    return clusters  # [{name, keywords: [], description}]
```
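
The two-step parse (direct `json.loads`, then a fence-stripping fallback) can be sketched without the `AICore` dependency. `extract_json`'s real implementation may differ; this regex-based fallback is an assumption:

```python
import json
import re
from typing import Any, Optional

def extract_json_lenient(response: str) -> Optional[Any]:
    """Parse a model response as JSON, tolerating ```json ... ``` fences."""
    text = response.strip()
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    # Fallback: pull the body out of a fenced code block, if present.
    match = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL)
    if match:
        try:
            return json.loads(match.group(1))
        except json.JSONDecodeError:
            return None
    return None

fenced = '```json\n{"clusters": [{"name": "seo basics", "keywords": []}]}\n```'
data = extract_json_lenient(fenced)
assert data["clusters"][0]["name"] == "seo basics"
```

Trying the strict parse first keeps the happy path cheap; the fallback only pays the regex cost when the model wrapped its output in markdown.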
**Method: save_output()**
```python
def save_output(self, parsed: List[Dict], original_data: Dict, account=None,
                progress_tracker=None, step_tracker=None) -> Dict:
    keywords = original_data['keywords']
    account = account or keywords[0].account
    site = keywords[0].site
    sector = keywords[0].sector

    clusters_created = 0
    keywords_updated = 0

    with transaction.atomic():
        for cluster_data in parsed:
            cluster_name = cluster_data.get('name', '')
            cluster_keywords = cluster_data.get('keywords', [])

            # Get or create the cluster
            cluster, created = Clusters.objects.get_or_create(
                name=cluster_name,
                account=account,
                site=site,
                sector=sector,
                defaults={
                    'description': cluster_data.get('description', ''),
                    'status': 'active',
                }
            )

            if created:
                clusters_created += 1

            # Match keywords (case-insensitive)
            for keyword_obj in keywords:
                if keyword_obj.keyword.lower() in [k.lower() for k in cluster_keywords]:
                    keyword_obj.cluster = cluster
                    keyword_obj.status = 'mapped'
                    keyword_obj.save()
                    keywords_updated += 1

        # Recalculate cluster metrics
        for cluster in Clusters.objects.filter(account=account, site=site, sector=sector):
            cluster.keywords_count = Keywords.objects.filter(cluster=cluster).count()
            cluster.volume = Keywords.objects.filter(cluster=cluster).aggregate(
                total=Sum(Case(
                    When(volume_override__isnull=False, then=F('volume_override')),
                    default=F('seed_keyword__volume'),
                    output_field=IntegerField()
                ))
            )['total'] or 0
            cluster.save()

    return {
        'count': clusters_created,
        'clusters_created': clusters_created,
        'keywords_updated': keywords_updated
    }
```
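
One design note on the matching loop: the list comprehension `[k.lower() for k in cluster_keywords]` is rebuilt for every keyword checked. For large batches, a set built once per cluster keeps the match case-insensitive at O(1) per lookup. A standalone sketch with plain strings in place of the Django objects:

```python
from typing import List

def match_keywords_to_cluster(keywords: List[str], cluster_keywords: List[str]) -> List[str]:
    """Return the subset of `keywords` present in `cluster_keywords`, case-insensitively."""
    wanted = {k.lower() for k in cluster_keywords}  # built once per cluster
    return [kw for kw in keywords if kw.lower() in wanted]

matched = match_keywords_to_cluster(
    ['SEO Tools', 'link building', 'Email Tips'],
    ['seo tools', 'Link Building'],
)
assert matched == ['SEO Tools', 'link building']
```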
**Service Integration:**
```python
# backend/igny8_core/business/planning/services/clustering_service.py
class ClusteringService:
    def cluster_keywords(self, keyword_ids, account, sector_id=None):
        from igny8_core.ai.tasks import run_ai_task

        payload = {'ids': keyword_ids, 'sector_id': sector_id}

        if hasattr(run_ai_task, 'delay'):
            # Async via Celery
            task = run_ai_task.delay(
                function_name='auto_cluster',
                payload=payload,
                account_id=account.id
            )
            return {'success': True, 'task_id': str(task.id)}
        else:
            # Sync execution
            result = run_ai_task(
                function_name='auto_cluster',
                payload=payload,
                account_id=account.id
            )
            return result
```

---

### 2. GenerateIdeasFunction

**File:** `backend/igny8_core/ai/functions/generate_ideas.py`

**Purpose:** Generate SEO-optimized content ideas from keyword clusters

**Class Definition:**
```python
class GenerateIdeasFunction(BaseAIFunction):
    def get_name(self) -> str:
        return 'generate_ideas'

    def get_max_items(self) -> int:
        return 10  # Max clusters per batch
```

**Metadata:**
```python
{
    'display_name': 'Generate Ideas',
    'description': 'Generate SEO-optimized content ideas from keyword clusters',
    'phases': {
        'INIT': 'Initializing idea generation...',
        'PREP': 'Loading clusters...',
        'AI_CALL': 'Generating ideas with AI...',
        'PARSE': 'Parsing idea data...',
        'SAVE': 'Saving ideas...',
        'DONE': 'Ideas generated!'
    }
}
```

**Method: prepare()**
```python
def prepare(self, payload: dict, account=None) -> Dict:
    cluster_ids = payload.get('ids', [])

    clusters = Clusters.objects.filter(id__in=cluster_ids, account=account).select_related(
        'sector', 'account', 'site'
    ).prefetch_related('keywords')

    cluster_data = []
    for cluster in clusters:
        # Get keywords from the Keywords model (via the seed_keyword relationship)
        keyword_objects = Keywords.objects.filter(cluster=cluster).select_related('seed_keyword')
        keywords = [kw.seed_keyword.keyword for kw in keyword_objects if kw.seed_keyword]

        cluster_data.append({
            'id': cluster.id,
            'name': cluster.name,
            'description': cluster.description or '',
            'keywords': keywords,
        })

    return {
        'clusters': clusters,
        'cluster_data': cluster_data,
        'account': account or clusters[0].account
    }
```

**Method: build_prompt()**
```python
def build_prompt(self, data: Dict, account=None) -> str:
    cluster_data = data['cluster_data']

    clusters_text = '\n'.join([
        f"Cluster ID: {c['id']} | Name: {c['name']} | Description: {c.get('description', '')}"
        for c in cluster_data
    ])

    cluster_keywords_text = '\n'.join([
        f"Cluster ID: {c['id']} | Name: {c['name']} | Keywords: {', '.join(c.get('keywords', []))}"
        for c in cluster_data
    ])

    prompt = PromptRegistry.get_prompt(
        function_name='generate_ideas',
        account=account or data['account'],
        context={
            'CLUSTERS': clusters_text,
            'CLUSTER_KEYWORDS': cluster_keywords_text,
        }
    )

    return prompt
```

**Method: parse_response()**
```python
def parse_response(self, response: str, step_tracker=None) -> List[Dict]:
    ai_core = AICore(account=self.account)
    json_data = ai_core.extract_json(response)

    if not json_data or 'ideas' not in json_data:
        raise ValueError("Failed to parse ideas response")

    return json_data.get('ideas', [])
    # Expected format: [{title, description, cluster_id, content_type, content_structure, ...}]
```

**Method: save_output()**
```python
def save_output(self, parsed: List[Dict], original_data: Dict, account=None,
                progress_tracker=None, step_tracker=None) -> Dict:
    clusters = original_data['clusters']
    account = account or original_data['account']

    ideas_created = 0

    with transaction.atomic():
        for idea_data in parsed:
            # Match the cluster by ID or name
            cluster = None
            cluster_id_from_ai = idea_data.get('cluster_id')
            cluster_name = idea_data.get('cluster_name', '')

            if cluster_id_from_ai:
                cluster = next((c for c in clusters if c.id == cluster_id_from_ai), None)

            if not cluster and cluster_name:
                cluster = next((c for c in clusters if c.name == cluster_name), None)

            if not cluster:
                continue

            site = cluster.site or (cluster.sector.site if cluster.sector else None)

            # Handle description (might be a dict or a string)
            description = idea_data.get('description', '')
            if isinstance(description, dict):
                description = json.dumps(description)

            # Create the ContentIdeas record
            ContentIdeas.objects.create(
                idea_title=idea_data.get('title', 'Untitled Idea'),
                description=description,
                content_type=idea_data.get('content_type', 'post'),
                content_structure=idea_data.get('content_structure', 'article'),
                target_keywords=idea_data.get('covered_keywords', '') or idea_data.get('target_keywords', ''),
                keyword_cluster=cluster,
                estimated_word_count=idea_data.get('estimated_word_count', 1500),
                status='new',
                account=account,
                site=site,
                sector=cluster.sector,
            )
            ideas_created += 1

            # Update the cluster status
            if cluster.status == 'new':
                cluster.status = 'mapped'
                cluster.save()

    return {
        'count': ideas_created,
        'ideas_created': ideas_created
    }
```

**Service Integration:**
```python
# backend/igny8_core/business/planning/services/ideas_service.py
class IdeasService:
    def generate_ideas(self, cluster_ids, account):
        from igny8_core.ai.tasks import run_ai_task

        payload = {'ids': cluster_ids}

        if hasattr(run_ai_task, 'delay'):
            task = run_ai_task.delay(
                # 'auto_generate_ideas' resolves to 'generate_ideas'
                # through FUNCTION_ALIASES in the registry
                function_name='auto_generate_ideas',
                payload=payload,
                account_id=account.id
            )
            return {'success': True, 'task_id': str(task.id)}
        else:
            result = run_ai_task(
                function_name='auto_generate_ideas',
                payload=payload,
                account_id=account.id
            )
            return result
```

---

## Writer Module AI Functions

### 3. GenerateContentFunction

**File:** `backend/igny8_core/ai/functions/generate_content.py`

**Purpose:** Generate complete article content from task requirements

**Class Definition:**
```python
class GenerateContentFunction(BaseAIFunction):
    def get_name(self) -> str:
        return 'generate_content'

    def get_max_items(self) -> int:
        return 50  # Max tasks per batch
```

**Key Implementation Details:**

**Method: prepare()**
```python
def prepare(self, payload: dict, account=None) -> List:
    task_ids = payload.get('ids', [])

    tasks = Tasks.objects.filter(id__in=task_ids, account=account).select_related(
        'account', 'site', 'sector', 'cluster', 'taxonomy_term'
    )

    return list(tasks)
```

**Method: build_prompt()**
```python
def build_prompt(self, data: Any, account=None) -> str:
    task = data[0] if isinstance(data, list) else data

    # Build idea data
    idea_data = f"Title: {task.title or 'Untitled'}\n"
    if task.description:
        idea_data += f"Description: {task.description}\n"
    idea_data += f"Content Type: {task.content_type or 'post'}\n"
    idea_data += f"Content Structure: {task.content_structure or 'article'}\n"

    # Build cluster context
    cluster_data = ''
    if task.cluster:
        cluster_data = f"Cluster Name: {task.cluster.name}\n"
        if task.cluster.description:
            cluster_data += f"Description: {task.cluster.description}\n"

    # Build taxonomy context
    taxonomy_data = ''
    if task.taxonomy_term:
        taxonomy_data = f"Taxonomy: {task.taxonomy_term.name}\n"
        if task.taxonomy_term.taxonomy_type:
            taxonomy_data += f"Type: {task.taxonomy_term.get_taxonomy_type_display()}\n"

    # Build keywords
    keywords_data = ''
    if task.keywords:
        keywords_data = f"Keywords: {task.keywords}\n"

    prompt = PromptRegistry.get_prompt(
        function_name='generate_content',
        account=account or task.account,
        task=task,
        context={
            'IDEA': idea_data,
            'CLUSTER': cluster_data,
            'TAXONOMY': taxonomy_data,
            'KEYWORDS': keywords_data,
        }
    )

    return prompt
```

**Method: parse_response()**
```python
def parse_response(self, response: str, step_tracker=None) -> Dict:
    # Try a JSON parse first
    try:
        parsed_json = json.loads(response.strip())
        if isinstance(parsed_json, dict):
            return parsed_json
    except (json.JSONDecodeError, ValueError):
        pass

    # Fallback: normalize plain HTML content
    try:
        from igny8_core.utils.content_normalizer import normalize_content
        normalized = normalize_content(response)
        return {'content': normalized['normalized_content']}
    except Exception:
        return {'content': response}
```

**Method: save_output() - CRITICAL WITH TAGS/CATEGORIES**
```python
def save_output(self, parsed: Any, original_data: Any, account=None,
                progress_tracker=None, step_tracker=None) -> Dict:
    task = original_data[0] if isinstance(original_data, list) else original_data

    # Extract fields from the parsed response
    if isinstance(parsed, dict):
        content_html = parsed.get('content', '')
        title = parsed.get('title') or task.title
        meta_title = parsed.get('meta_title') or parsed.get('seo_title') or title
        meta_description = parsed.get('meta_description') or parsed.get('seo_description')
        primary_keyword = parsed.get('primary_keyword') or parsed.get('focus_keyword')
        secondary_keywords = parsed.get('secondary_keywords') or parsed.get('keywords', [])
        tags_from_response = parsed.get('tags', [])
        categories_from_response = parsed.get('categories', [])
    else:
        content_html = str(parsed)
        title = task.title
        # ... defaults
        tags_from_response = []
        categories_from_response = []

    # Calculate the word count
    word_count = 0
    if content_html:
        text_for_counting = re.sub(r'<[^>]+>', '', content_html)
        word_count = len(text_for_counting.split())

    # Create the Content record (independent, NOT OneToOne with Task)
    content_record = Content.objects.create(
        title=title,
        content_html=content_html,
        word_count=word_count,
        meta_title=meta_title,
        meta_description=meta_description,
        primary_keyword=primary_keyword,
        secondary_keywords=secondary_keywords if isinstance(secondary_keywords, list) else [],
        cluster=task.cluster,
        content_type=task.content_type,
        content_structure=task.content_structure,
        source='igny8',
        status='draft',
        account=task.account,
        site=task.site,
        sector=task.sector,
    )

    # Link the taxonomy term from the task
    if task.taxonomy_term:
        content_record.taxonomy_terms.add(task.taxonomy_term)

    # Process tags from the AI response
    if tags_from_response and isinstance(tags_from_response, list):
        from django.utils.text import slugify
        for tag_name in tags_from_response:
            if tag_name and isinstance(tag_name, str):
                tag_name = tag_name.strip()
                if tag_name:
                    tag_slug = slugify(tag_name)
                    tag_obj, created = ContentTaxonomy.objects.get_or_create(
                        site=task.site,
                        slug=tag_slug,
                        taxonomy_type='tag',
                        defaults={
                            'name': tag_name,
                            'sector': task.sector,
                            'account': task.account,
                        }
                    )
                    content_record.taxonomy_terms.add(tag_obj)

    # Process categories from the AI response
    if categories_from_response and isinstance(categories_from_response, list):
        from django.utils.text import slugify
        for category_name in categories_from_response:
            if category_name and isinstance(category_name, str):
                category_name = category_name.strip()
                if category_name:
                    category_slug = slugify(category_name)
                    category_obj, created = ContentTaxonomy.objects.get_or_create(
                        site=task.site,
                        slug=category_slug,
                        taxonomy_type='category',
                        defaults={
                            'name': category_name,
                            'sector': task.sector,
                            'account': task.account,
                        }
                    )
                    content_record.taxonomy_terms.add(category_obj)

    # Update the task status
    task.status = 'completed'
    task.save(update_fields=['status', 'updated_at'])

    # Auto-sync the idea status
    if hasattr(task, 'idea') and task.idea:
        task.idea.status = 'completed'
        task.idea.save(update_fields=['status', 'updated_at'])

    return {
        'count': 1,
        'content_id': content_record.id,
        'task_id': task.id,
        'word_count': word_count,
    }
```
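
The word count strips tags with a regex before splitting. The counting step in isolation, with one tweak worth noting: replacing tags with a space (rather than the empty string) so boundaries like `</p><p>` do not glue adjacent words together:

```python
import re

def count_words(content_html: str) -> int:
    """Approximate word count: strip tags, then split on whitespace."""
    text = re.sub(r'<[^>]+>', ' ', content_html)  # tags become spaces, not ''
    return len(text.split())

html = '<h2>Intro</h2><p>Two short <strong>bold</strong> words.</p>'
assert count_words(html) == 5  # Intro, Two, short, bold, words.
```

Note the regex does not decode HTML entities (`&amp;` counts as one word), which is usually acceptable for an approximate metric.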
---

### 4. GenerateImagePromptsFunction

**File:** `backend/igny8_core/ai/functions/generate_image_prompts.py`

**Purpose:** Extract detailed image generation prompts from content HTML

**Class Definition:**
```python
class GenerateImagePromptsFunction(BaseAIFunction):
    def get_name(self) -> str:
        return 'generate_image_prompts'

    def get_max_items(self) -> int:
        return 50  # Max content records per batch
```

**Method: prepare()**
```python
def prepare(self, payload: dict, account=None) -> List:
    content_ids = payload.get('ids', [])

    contents = Content.objects.filter(id__in=content_ids, account=account).select_related(
        'account', 'site', 'sector', 'cluster'
    )

    max_images = self._get_max_in_article_images(account)

    extracted_data = []
    for content in contents:
        extracted = self._extract_content_elements(content, max_images)
        extracted_data.append({
            'content': content,
            'extracted': extracted,
            'max_images': max_images,
        })

    return extracted_data
```

**Helper: _extract_content_elements()**
```python
def _extract_content_elements(self, content: Content, max_images: int) -> Dict:
    from bs4 import BeautifulSoup

    html_content = content.content_html or ''
    soup = BeautifulSoup(html_content, 'html.parser')

    # Extract the title
    title = content.title or ''

    # Extract intro paragraphs (skipping the short italic hook)
    paragraphs = soup.find_all('p')
    intro_paragraphs = []
    for p in paragraphs[:3]:
        text = p.get_text(strip=True)
        if len(text.split()) > 50:  # A real paragraph
            intro_paragraphs.append(text)
            if len(intro_paragraphs) >= 2:
                break

    # Extract H2 headings
    h2_tags = soup.find_all('h2')
    h2_headings = [h2.get_text(strip=True) for h2 in h2_tags[:max_images]]

    return {
        'title': title,
        'intro_paragraphs': intro_paragraphs,
        'h2_headings': h2_headings,
    }
```

**Method: save_output()**
```python
def save_output(self, parsed: Dict, original_data: Any, account=None,
                progress_tracker=None, step_tracker=None) -> Dict:
    data = original_data[0] if isinstance(original_data, list) else original_data
    content = data['content']
    max_images = data['max_images']

    prompts_created = 0

    with transaction.atomic():
        # Save the featured image prompt
        Images.objects.update_or_create(
            content=content,
            image_type='featured',
            defaults={
                'prompt': parsed['featured_prompt'],
                'status': 'pending',
                'position': 0,
            }
        )
        prompts_created += 1

        # Save in-article image prompts
        in_article_prompts = parsed.get('in_article_prompts', [])
        for idx, prompt_text in enumerate(in_article_prompts[:max_images]):
            Images.objects.update_or_create(
                content=content,
                image_type='in_article',
                position=idx + 1,
                defaults={
                    'prompt': prompt_text,
                    'status': 'pending',
                }
            )
            prompts_created += 1

    return {
        'count': prompts_created,
        'prompts_created': prompts_created,
    }
```

---

### 5. GenerateImagesFunction

**File:** `backend/igny8_core/ai/functions/generate_images.py`

**Purpose:** Generate actual image URLs from Image records with prompts

**Note:** This function is only partially implemented; the actual image generation happens via provider APIs. Known issues are listed under "Current Gaps vs. Code" in the Summary.

### 6. OptimizeContentFunction

**File:** `backend/igny8_core/ai/functions/optimize_content.py`

**Purpose:** SEO optimization of existing content (see the function table in the Overview; not detailed further in this reference)

---

## AI Function Base Class

**File:** `backend/igny8_core/ai/base.py`

All AI functions inherit from this abstract base:

```python
class BaseAIFunction(ABC):
    """Base class for all AI functions"""

    @abstractmethod
    def get_name(self) -> str:
        """Return the function name (e.g., 'auto_cluster')"""
        pass

    def get_metadata(self) -> Dict:
        """Return function metadata (display name, description, phases)"""
        return {
            'display_name': self.get_name().replace('_', ' ').title(),
            'description': f'{self.get_name()} AI function',
            'phases': {
                'INIT': 'Initializing...',
                'PREP': 'Preparing data...',
                'AI_CALL': 'Processing with AI...',
                'PARSE': 'Parsing response...',
                'SAVE': 'Saving results...',
                'DONE': 'Complete!'
            }
        }

    def validate(self, payload: dict, account=None) -> Dict[str, Any]:
        """Validate the input payload"""
        ids = payload.get('ids', [])
        if not ids:
            return {'valid': False, 'error': 'No IDs provided'}
        return {'valid': True}

    def get_max_items(self) -> Optional[int]:
        """Override to set a max-items limit"""
        return None

    @abstractmethod
    def prepare(self, payload: dict, account=None) -> Any:
        """Load and prepare data for AI processing"""
        pass

    @abstractmethod
    def build_prompt(self, data: Any, account=None) -> str:
        """Build the AI prompt from prepared data"""
        pass

    def get_model(self, account=None) -> Optional[str]:
        """Override to specify a model (defaults to the account's default model)"""
        return None

    @abstractmethod
    def parse_response(self, response: str, step_tracker=None) -> Any:
        """Parse the AI response into structured data"""
        pass

    @abstractmethod
    def save_output(self, parsed: Any, original_data: Any, account=None,
                    progress_tracker=None, step_tracker=None) -> Dict:
        """Save parsed results to the database"""
        pass
```
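
Subclassing is the extension point: implement the abstract methods and register the class. A minimal, self-contained sketch (the base class is re-stubbed here so the example runs standalone; `SummarizeFunction` is hypothetical and not in the registry — real functions would hit the ORM and PromptRegistry):

```python
import json
from abc import ABC, abstractmethod
from typing import Any, Dict

class BaseAIFunction(ABC):  # stand-in for igny8_core.ai.base.BaseAIFunction
    @abstractmethod
    def get_name(self) -> str: ...
    @abstractmethod
    def prepare(self, payload: dict, account=None) -> Any: ...
    @abstractmethod
    def build_prompt(self, data: Any, account=None) -> str: ...
    @abstractmethod
    def parse_response(self, response: str, step_tracker=None) -> Any: ...
    @abstractmethod
    def save_output(self, parsed: Any, original_data: Any, account=None,
                    progress_tracker=None, step_tracker=None) -> Dict: ...

class SummarizeFunction(BaseAIFunction):
    """Hypothetical example function - not part of the registry."""
    def get_name(self) -> str:
        return 'summarize'
    def prepare(self, payload: dict, account=None) -> Any:
        return {'texts': payload.get('ids', [])}
    def build_prompt(self, data: Any, account=None) -> str:
        return 'Summarize as JSON: ' + json.dumps(data['texts'])
    def parse_response(self, response: str, step_tracker=None) -> Any:
        return json.loads(response)
    def save_output(self, parsed: Any, original_data: Any, account=None,
                    progress_tracker=None, step_tracker=None) -> Dict:
        return {'count': len(parsed)}  # AIEngine charges credits from 'count'

fn = SummarizeFunction()
assert fn.get_name() == 'summarize'
assert fn.save_output(['a', 'b'], None) == {'count': 2}
```

Because the engine owns credits, logging, and progress, a new function only needs these five methods plus a `register_lazy_function` entry.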
---

## AI Engine & Execution

**File:** `backend/igny8_core/ai/engine.py`

The `AIEngine` orchestrates all AI function execution:

```python
class AIEngine:
    def __init__(self, account: Account):
        self.account = account
        self.ai_core = AICore(account=account)

    def execute(self, fn: BaseAIFunction, payload: dict) -> Dict:
        """
        Execute an AI function with full orchestration:
        1. Validation
        2. Preparation
        3. AI call
        4. Response parsing
        5. Output saving
        6. Credit deduction (automatic)
        7. Progress tracking
        8. Logging
        """

        # Step 1: Validate
        validation = fn.validate(payload, self.account)
        if not validation['valid']:
            return {'success': False, 'error': validation['error']}

        # Step 2: Prepare data
        prepared_data = fn.prepare(payload, self.account)

        # Step 3: Build the prompt
        prompt = fn.build_prompt(prepared_data, self.account)

        # Step 4: Call the AI (via AICore)
        model = fn.get_model(self.account) or self._get_default_model()
        response = self.ai_core.run_ai_request(
            prompt=prompt,
            model=model,
            function_name=fn.get_name()
        )

        # Step 5: Parse the response
        parsed = fn.parse_response(response['content'])

        # Step 6: Save the output
        result = fn.save_output(parsed, prepared_data, self.account)

        # Step 7: Deduct credits (AUTOMATIC - engine.py line 395)
        credits_deducted = CreditService.deduct_credits_for_operation(
            account=self.account,
            operation_type=self._get_operation_type(),
            amount=self._get_actual_amount(),
        )

        # Step 8: Log to AIUsageLog
        AIUsageLog.objects.create(
            account=self.account,
            function_name=fn.get_name(),
            credits_used=credits_deducted,
            # ... other fields
        )

        return {
            'success': True,
            **result
        }
```

**Key Point:** Credits are deducted AUTOMATICALLY by AIEngine. AI functions do NOT handle credits themselves.

---

## Credit System Integration

**Automatic Credit Deduction:**

All credit management happens in `AIEngine.execute()` at line 395:

```python
# backend/igny8_core/ai/engine.py, line 395
CreditService.deduct_credits_for_operation(
    account=account,
    operation_type=self._get_operation_type(),
    amount=self._get_actual_amount(),
)
```

**AI functions do NOT:**

- Calculate credit costs
- Call `CreditService` manually
- Handle credit errors (AIEngine does)

**AI functions ONLY:**

- Focus on their specific logic
- Return `{'count': N}` from `save_output()` - AIEngine uses `count` to calculate the charge

---

## Progress Tracking

**StepTracker & ProgressTracker:**

All AI functions emit progress events through trackers:

```python
# Phases emitted automatically by AIEngine
phases = {
    'INIT': 'Initializing...',           # 0-10%
    'PREP': 'Preparing data...',         # 10-20%
    'AI_CALL': 'Processing with AI...',  # 20-80%
    'PARSE': 'Parsing response...',      # 80-90%
    'SAVE': 'Saving results...',         # 90-100%
    'DONE': 'Complete!'                  # 100%
}
```

Frontends can listen for these events via:

- Celery task status polling
- WebSocket connections
- the REST API `/task-progress/:task_id/` endpoint
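
For frontends that render a single progress bar, the percentage bands in the phase comments can be folded into a small helper. The band table mirrors those comments; the helper itself is illustrative, not part of the codebase:

```python
# Phase → (start%, end%) bands, per the comments above.
PHASE_BANDS = {
    'INIT': (0, 10), 'PREP': (10, 20), 'AI_CALL': (20, 80),
    'PARSE': (80, 90), 'SAVE': (90, 100), 'DONE': (100, 100),
}

def overall_percent(phase: str, phase_fraction: float = 0.0) -> float:
    """Map a phase plus its internal completion (0..1) onto the 0-100 bar."""
    start, end = PHASE_BANDS[phase]
    fraction = min(max(phase_fraction, 0.0), 1.0)  # clamp out-of-range input
    return start + (end - start) * fraction

assert overall_percent('INIT') == 0
assert overall_percent('AI_CALL', 0.5) == 50.0
assert overall_percent('DONE') == 100
```

The wide `AI_CALL` band reflects where most wall-clock time is spent, so the bar moves smoothly even when only the phase name is known.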
---

## Summary

**6 AI functions (5 production-ready, 1 partial):**

| Function | Lines of Code | Status | Used By |
|----------|---------------|--------|---------|
| `auto_cluster` | ~380 | ✅ Complete | Planner, Automation Stage 1 |
| `generate_ideas` | ~250 | ✅ Complete | Planner, Automation Stage 2 |
| `generate_content` | ~400 | ✅ Complete | Writer, Automation Stage 4 |
| `generate_image_prompts` | ~280 | ✅ Complete | Writer, Automation Stage 5 |
| `generate_images` | ~300 | ⚠️ Partial | Writer, Automation Stage 6 |
| `optimize_content` | ~200 | ✅ Complete | Writer (Manual) |

**Architecture Benefits:**

- Single source of truth for AI operations
- Consistent credit management
- Unified error handling
- Centralized progress tracking
- Easy to add new AI functions (inherit from `BaseAIFunction`)

### Current Gaps vs. Code (December 2025)

- AIEngine now performs a credit pre-check before the AI call (it still deducts after SAVE); earlier notes do not reflect this.
- The `generate_images` implementation is partially broken: it expects task IDs (not image IDs), tries to read `task.content` (a field that does not exist), and uses the `extract_image_prompts` prompt path; credit estimation also looks for `image_ids`. Treat it as partial/needs fix.
- AIEngine includes messaging/cost maps for `generate_site_structure` (an extra function beyond the documented six) that is not documented above.

---

## Automation Integration

**VERIFIED:** All AI functions are integrated into the IGNY8 Automation Pipeline.

### 7-Stage Automation Pipeline

The automation system (`backend/igny8_core/business/automation/services/automation_service.py`) uses 5 of the 6 AI functions:

```
Stage 1: Keywords → Clusters
   ↓ Uses: AutoClusterFunction
   ↓ Credits: ~0.2 per keyword

Stage 2: Clusters → Ideas
   ↓ Uses: GenerateIdeasFunction
   ↓ Credits: 2 per cluster

Stage 3: Ideas → Tasks
   ↓ Uses: None (local operation)
   ↓ Credits: 0

Stage 4: Tasks → Content
   ↓ Uses: GenerateContentFunction
   ↓ Credits: ~5 per task (2500 words)

Stage 5: Content → Image Prompts
   ↓ Uses: GenerateImagePromptsFunction
   ↓ Credits: ~2 per content

Stage 6: Image Prompts → Images
   ↓ Uses: GenerateImagesFunction ⚠️
   ↓ Credits: 1-4 per image

Stage 7: Manual Review Gate
   ↓ Uses: None (manual intervention)
   ↓ Credits: 0
```

### Automation Execution Flow

```python
# AutomationService.run_stage_1() example
def run_stage_1(self):
    keywords = Keywords.objects.filter(site=self.site, status='new')[:batch_size]

    # Call the AI function (directly here; run_ai_task.delay() when Celery is available)
    from igny8_core.ai.tasks import run_ai_task
    result = run_ai_task(
        function_name='auto_cluster',
        payload={'ids': [k.id for k in keywords]},
        account_id=self.account.id
    )

    # Credits are automatically deducted by AIEngine
    return {
        'keywords_processed': len(keywords),
        'clusters_created': result.get('count', 0),
        'credits_used': result.get('credits_used', 0)
    }
```

### Frontend Access

**Automation Page:** `/automation` (fully functional)

- Real-time pipeline overview
- Manual trigger ("Run Now" button)
- Pause/Resume controls
- Live progress tracking
- Activity logs
- Run history

**Planner & Writer Pages:** individual AI function triggers

- Cluster keywords (Stage 1)
- Generate ideas (Stage 2)
- Generate content (Stage 4)
- Extract image prompts (Stage 5)
- Generate images (Stage 6)

---

**End of AI Functions Reference**