# IGNY8 AI Functions - Complete Technical Reference
**Date:** December 3, 2025
**Version:** 2.0 - CORRECTED AFTER AUTOMATION AUDIT
**100% Based on Actual Codebase (Backend + Frontend + Automation Integration)**
---
## Table of Contents
1. [Overview](#overview)
2. [AI Architecture](#ai-architecture)
3. [AI Function Registry](#ai-function-registry)
4. [Planner Module AI Functions](#planner-module-ai-functions)
5. [Writer Module AI Functions](#writer-module-ai-functions)
6. [AI Function Base Class](#ai-function-base-class)
7. [AI Engine & Execution](#ai-engine--execution)
8. [Credit System Integration](#credit-system-integration)
9. [Progress Tracking](#progress-tracking)
---
## Overview
IGNY8 uses a centralized AI function architecture where all AI operations inherit from `BaseAIFunction` and execute through `AIEngine`. This ensures consistent:
- Credit management
- Progress tracking
- Error handling
- Logging
- Response parsing
**Total AI Functions: 6**
| Function | Module | Purpose | Input | Output | Credits |
|----------|--------|---------|-------|--------|---------|
| `auto_cluster` | Planner | Group keywords into semantic clusters | Keyword IDs | Clusters created | ~1 per 5 keywords |
| `generate_ideas` | Planner | Generate content ideas from clusters | Cluster IDs | Ideas created | 2 per cluster |
| `generate_content` | Writer | Generate article content from tasks | Task IDs | Content drafts | ~5 per 2500 words |
| `generate_image_prompts` | Writer | Extract image prompts from content | Content IDs | Image records with prompts | ~2 per content |
| `generate_images` | Writer | Generate actual images from prompts | Image IDs | Image URLs | 1-4 per image |
| `optimize_content` | Writer | SEO optimization of content | Content IDs | Updated content | ~1 per content |
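
The approximate rates in the table can be turned into a rough estimator. The sketch below is illustrative only: `CREDIT_RATES`, `estimate_credits`, and the round-up rule are assumptions, not the actual `CreditService` logic. `generate_images` is left out because its cost is a range (1-4 per image).

```python
import math

# Approximate rates copied from the table above: (credits, units per charge).
# The dictionary name and the round-up rule are illustrative assumptions.
CREDIT_RATES = {
    "auto_cluster": (1, 5),            # ~1 credit per 5 keywords
    "generate_ideas": (2, 1),          # 2 credits per cluster
    "generate_content": (5, 2500),     # ~5 credits per 2500 words
    "generate_image_prompts": (2, 1),  # ~2 credits per content record
    "optimize_content": (1, 1),        # ~1 credit per content record
}


def estimate_credits(function_name: str, units: int) -> int:
    """Return a rough credit estimate, rounding partial charges up."""
    if units < 0:
        raise ValueError("units must be non-negative")
    credits, per_units = CREDIT_RATES[function_name]
    return math.ceil(units * credits / per_units)
```

For example, `estimate_credits("auto_cluster", 12)` charges for three blocks of five keywords under this rounding assumption.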
---
## AI Architecture
### Directory Structure
```
backend/igny8_core/ai/
├── __init__.py
├── base.py                  # BaseAIFunction (abstract class)
├── engine.py                # AIEngine (execution orchestrator)
├── registry.py              # Function registration & lazy loading
├── ai_core.py               # Core AI API interactions
├── prompts.py               # PromptRegistry
├── tasks.py                 # Celery tasks for async execution
├── models.py                # AITaskLog, AIUsageLog
├── validators.py            # Input validation helpers
├── settings.py              # AI configuration
├── tracker.py               # ProgressTracker, StepTracker
└── functions/
    ├── __init__.py
    ├── auto_cluster.py              # AutoClusterFunction
    ├── generate_ideas.py            # GenerateIdeasFunction
    ├── generate_content.py          # GenerateContentFunction
    ├── generate_image_prompts.py    # GenerateImagePromptsFunction
    ├── generate_images.py           # GenerateImagesFunction
    └── optimize_content.py          # OptimizeContentFunction
```
### Execution Flow
```
User Action → API Endpoint → Service Layer → AIEngine.execute()
                                              BaseAIFunction
                                 ┌───────────────────┴───────────────────┐
                                 ↓                                       ↓
                      Synchronous (small ops)                   Async (Celery task)
                                 ↓                                       ↓
                     Direct function execution                  run_ai_task.delay()
                                 ↓                                       ↓
                                INIT → PREP → AI_CALL → PARSE → SAVE → DONE
                                       Credit deduction (automatic)
                                       Progress tracking (StepTracker)
                                       AIUsageLog created
```
---
## AI Function Registry
**File:** `backend/igny8_core/ai/registry.py`
### Lazy Loading System
Functions are registered with lazy loaders and only imported when called:
```python
_FUNCTION_REGISTRY: Dict[str, Type[BaseAIFunction]] = {}
_FUNCTION_LOADERS: Dict[str, callable] = {}

def get_function_instance(name: str) -> Optional[BaseAIFunction]:
    """Get function instance by name - lazy loads if needed"""
    actual_name = FUNCTION_ALIASES.get(name, name)
    fn_class = get_function(actual_name)
    if fn_class:
        return fn_class()
    return None
```
### Registered Functions
```python
# Lazy loaders
register_lazy_function('auto_cluster', _load_auto_cluster)
register_lazy_function('generate_ideas', _load_generate_ideas)
register_lazy_function('generate_content', _load_generate_content)
register_lazy_function('generate_images', _load_generate_images)
register_lazy_function('generate_image_prompts', _load_generate_image_prompts)
register_lazy_function('optimize_content', _load_optimize_content)
```
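
To make the lazy-loading idea concrete, here is a minimal, self-contained sketch of such a registry. It is not the contents of `registry.py`: the bodies of `register_lazy_function` and `get_function`, the `_ALIASES` entry, and `DemoFunction` are assumptions made for illustration. The point is that the loader runs only on first lookup and its result is cached.

```python
from typing import Callable, Dict, List, Optional

_REGISTRY: Dict[str, type] = {}                 # classes that have been loaded
_LOADERS: Dict[str, Callable[[], type]] = {}    # deferred import callables
_ALIASES: Dict[str, str] = {"demo_alias": "demo"}  # hypothetical alias

load_calls: List[str] = []  # records loader invocations, for demonstration only


def register_lazy_function(name: str, loader: Callable[[], type]) -> None:
    """Store a loader; nothing is imported or instantiated yet."""
    _LOADERS[name] = loader


def get_function(name: str) -> Optional[type]:
    """Return the class for `name`, running its loader on first access."""
    if name not in _REGISTRY and name in _LOADERS:
        _REGISTRY[name] = _LOADERS[name]()
    return _REGISTRY.get(name)


def get_function_instance(name: str):
    """Resolve aliases, then instantiate the class if it is registered."""
    fn_class = get_function(_ALIASES.get(name, name))
    return fn_class() if fn_class else None


class DemoFunction:
    """Hypothetical stand-in for an AI function class."""

    def get_name(self) -> str:
        return "demo"


def _load_demo() -> type:
    # In a real registry this is where the module import would happen.
    load_calls.append("demo")
    return DemoFunction


register_lazy_function("demo", _load_demo)
```

Calling `get_function_instance("demo")` twice triggers `_load_demo` once; unknown names return `None`.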
---
## Planner Module AI Functions
### 1. AutoClusterFunction
**File:** `backend/igny8_core/ai/functions/auto_cluster.py`
**Purpose:** Groups semantically related keywords into topic clusters using AI
**Class Definition:**
```python
class AutoClusterFunction(BaseAIFunction):
    def get_name(self) -> str:
        return 'auto_cluster'
```
**Metadata:**
```python
{
    'display_name': 'Auto Cluster Keywords',
    'description': 'Group related keywords into semantic clusters using AI',
    'phases': {
        'INIT': 'Initializing clustering...',
        'PREP': 'Loading keywords...',
        'AI_CALL': 'Analyzing keyword relationships...',
        'PARSE': 'Parsing cluster data...',
        'SAVE': 'Creating clusters...',
        'DONE': 'Clustering complete!'
    }
}
```
**Method: validate()**
```python
def validate(self, payload: dict, account=None) -> Dict:
    # Validates:
    # - IDs exist
    # - Keywords exist in database
    # - Account ownership
    # NO MAX LIMIT - processes any count
    return {'valid': True}
```
**Method: prepare()**
```python
def prepare(self, payload: dict, account=None) -> Dict:
    ids = payload.get('ids', [])
    sector_id = payload.get('sector_id')
    keywords = Keywords.objects.filter(id__in=ids, account=account).select_related(
        'account', 'site', 'sector', 'seed_keyword'
    )
    return {
        'keywords': keywords,  # Keyword objects
        'keyword_data': [  # Data for AI
            {
                'id': kw.id,
                'keyword': kw.keyword,  # From seed_keyword relationship
                'volume': kw.volume,
                'difficulty': kw.difficulty,
                'intent': kw.intent,
            }
            for kw in keywords
        ],
        'sector_id': sector_id
    }
```
**Method: build_prompt()**
```python
def build_prompt(self, data: Dict, account=None) -> str:
    keyword_data = data['keyword_data']
    # Format keywords for prompt
    keywords_text = '\n'.join([
        f"- {kw['keyword']} (Volume: {kw['volume']}, Difficulty: {kw['difficulty']}, Intent: {kw['intent']})"
        for kw in keyword_data
    ])
    # Get prompt template from registry
    prompt = PromptRegistry.get_prompt(
        function_name='auto_cluster',
        account=account,
        context={'KEYWORDS': keywords_text}
    )
    # Ensure JSON mode compatibility
    if 'json' not in prompt.lower():
        prompt += "\n\nIMPORTANT: You must respond with valid JSON only."
    return prompt
```
**Method: parse_response()**
```python
def parse_response(self, response: str, step_tracker=None) -> List[Dict]:
    # Try direct JSON parse
    try:
        json_data = json.loads(response.strip())
    except json.JSONDecodeError:
        # Fallback to extract_json (handles markdown code blocks)
        ai_core = AICore(account=self.account)
        json_data = ai_core.extract_json(response)
    # Extract clusters array
    if isinstance(json_data, dict):
        clusters = json_data.get('clusters', [])
    elif isinstance(json_data, list):
        clusters = json_data
    else:
        # Neither a dict nor a list (e.g. extraction failed): avoid an unbound name
        clusters = []
    return clusters  # [{name, keywords: [], description}]
```
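
The fallback path relies on `AICore.extract_json`, whose implementation is not shown in this reference. As a rough idea of what such a helper might do, the sketch below tries the raw text, then any fenced code block, then the outermost braces or brackets. The function name `extract_json_sketch` and the exact strategy are assumptions, not the real method.

```python
import json
import re
from typing import Any, List, Optional

FENCE = "`" * 3  # a markdown code fence, built indirectly to keep this block readable
_FENCE_RE = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL | re.IGNORECASE)


def extract_json_sketch(text: str) -> Optional[Any]:
    """Best-effort JSON extraction from a model reply; returns None on failure."""
    candidates: List[str] = [text.strip()]
    # Contents of fenced blocks, with or without a "json" language tag
    candidates += [match.strip() for match in _FENCE_RE.findall(text)]
    # Last resort: the widest {...} or [...] span in the reply
    for open_ch, close_ch in (("{", "}"), ("[", "]")):
        start, end = text.find(open_ch), text.rfind(close_ch)
        if 0 <= start < end:
            candidates.append(text[start:end + 1])
    for candidate in candidates:
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            continue
    return None
```

A reply that wraps its JSON in a fenced block, or surrounds it with prose, still parses; a reply with no JSON yields `None`, which callers need to handle explicitly.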
**Method: save_output()**
```python
def save_output(self, parsed: List[Dict], original_data: Dict, account=None,
                progress_tracker=None, step_tracker=None) -> Dict:
    keywords = original_data['keywords']
    account = account or keywords[0].account
    site = keywords[0].site
    sector = keywords[0].sector
    clusters_created = 0
    keywords_updated = 0
    with transaction.atomic():
        for cluster_data in parsed:
            cluster_name = cluster_data.get('name', '')
            cluster_keywords = cluster_data.get('keywords', [])
            # Get or create cluster
            cluster, created = Clusters.objects.get_or_create(
                name=cluster_name,
                account=account,
                site=site,
                sector=sector,
                defaults={
                    'description': cluster_data.get('description', ''),
                    'status': 'active',
                }
            )
            if created:
                clusters_created += 1
            # Match keywords (case-insensitive)
            for keyword_obj in keywords:
                if keyword_obj.keyword.lower() in [k.lower() for k in cluster_keywords]:
                    keyword_obj.cluster = cluster
                    keyword_obj.status = 'mapped'
                    keyword_obj.save()
                    keywords_updated += 1
        # Recalculate cluster metrics
        for cluster in Clusters.objects.filter(account=account, site=site, sector=sector):
            cluster.keywords_count = Keywords.objects.filter(cluster=cluster).count()
            cluster.volume = Keywords.objects.filter(cluster=cluster).aggregate(
                total=Sum(Case(
                    When(volume_override__isnull=False, then=F('volume_override')),
                    default=F('seed_keyword__volume'),
                    output_field=IntegerField()
                ))
            )['total'] or 0
            cluster.save()
    return {
        'count': clusters_created,
        'clusters_created': clusters_created,
        'keywords_updated': keywords_updated
    }
```
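
The keyword-matching step is easy to reason about in isolation. The sketch below reproduces it with plain strings and dicts instead of Django models; `assign_keywords_to_clusters` is a hypothetical helper, and lowering each cluster's keywords into a set once (rather than per keyword) is a small optimisation not present in the code above.

```python
from typing import Dict, List


def assign_keywords_to_clusters(keywords: List[str], clusters: List[Dict]) -> Dict[str, str]:
    """Map each known keyword to the name of the cluster that lists it.

    Matching is case-insensitive. If several clusters list the same keyword,
    the last cluster wins, mirroring the overwrite behaviour of the loop above.
    """
    assignments: Dict[str, str] = {}
    for cluster in clusters:
        wanted = {k.lower() for k in cluster.get("keywords", [])}
        for keyword in keywords:
            if keyword.lower() in wanted:
                assignments[keyword] = cluster.get("name", "")
    return assignments
```

Keywords the AI invents that are not in the input list are simply ignored, and input keywords that no cluster mentions stay unassigned.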
**Service Integration:**
```python
# backend/igny8_core/business/planning/services/clustering_service.py
class ClusteringService:
    def cluster_keywords(self, keyword_ids, account, sector_id=None):
        from igny8_core.ai.tasks import run_ai_task
        payload = {'ids': keyword_ids, 'sector_id': sector_id}
        if hasattr(run_ai_task, 'delay'):
            # Async via Celery
            task = run_ai_task.delay(
                function_name='auto_cluster',
                payload=payload,
                account_id=account.id
            )
            return {'success': True, 'task_id': str(task.id)}
        else:
            # Sync execution
            result = run_ai_task(
                function_name='auto_cluster',
                payload=payload,
                account_id=account.id
            )
            return result
```
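
The `hasattr(run_ai_task, 'delay')` check is what lets the same service code run with or without Celery. The following sketch isolates that dispatch pattern using stand-in objects; `dispatch`, `FakeCeleryTask`, `FakeAsyncResult`, and `plain_task` are all hypothetical names, and no Celery import is needed to run it.

```python
from typing import Any, Callable, Dict


def dispatch(task: Callable[..., Dict[str, Any]], **kwargs: Any) -> Dict[str, Any]:
    """Queue the task if it exposes .delay(), otherwise call it inline."""
    if hasattr(task, "delay"):
        async_result = task.delay(**kwargs)
        return {"success": True, "task_id": str(async_result.id)}
    return task(**kwargs)


class FakeAsyncResult:
    """Stand-in for the object returned by a Celery task's .delay()."""

    def __init__(self, task_id: str) -> None:
        self.id = task_id


class FakeCeleryTask:
    """Stand-in for a Celery-decorated task: callable, and has .delay()."""

    def __call__(self, **kwargs: Any) -> Dict[str, Any]:
        return {"success": True, "count": 0}

    def delay(self, **kwargs: Any) -> FakeAsyncResult:
        return FakeAsyncResult("abc-123")


def plain_task(**kwargs: Any) -> Dict[str, Any]:
    """An undecorated function: no .delay(), so it runs synchronously."""
    return {"success": True, "count": len(kwargs["payload"]["ids"])}
```

The two branches return different shapes: the async path yields only a `task_id` to poll, while the sync path returns the function result directly, so callers have to handle both.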
---
### 2. GenerateIdeasFunction
**File:** `backend/igny8_core/ai/functions/generate_ideas.py`
**Purpose:** Generate SEO-optimized content ideas from keyword clusters
**Class Definition:**
```python
class GenerateIdeasFunction(BaseAIFunction):
    def get_name(self) -> str:
        return 'generate_ideas'

    def get_max_items(self) -> int:
        return 10  # Max clusters per batch
```
**Metadata:**
```python
{
    'display_name': 'Generate Ideas',
    'description': 'Generate SEO-optimized content ideas from keyword clusters',
    'phases': {
        'INIT': 'Initializing idea generation...',
        'PREP': 'Loading clusters...',
        'AI_CALL': 'Generating ideas with AI...',
        'PARSE': 'Parsing idea data...',
        'SAVE': 'Saving ideas...',
        'DONE': 'Ideas generated!'
    }
}
```
**Method: prepare()**
```python
def prepare(self, payload: dict, account=None) -> Dict:
    cluster_ids = payload.get('ids', [])
    clusters = Clusters.objects.filter(id__in=cluster_ids, account=account).select_related(
        'sector', 'account', 'site'
    ).prefetch_related('keywords')
    cluster_data = []
    for cluster in clusters:
        # Get keywords from Keywords model (via seed_keyword relationship)
        keyword_objects = Keywords.objects.filter(cluster=cluster).select_related('seed_keyword')
        keywords = [kw.seed_keyword.keyword for kw in keyword_objects if kw.seed_keyword]
        cluster_data.append({
            'id': cluster.id,
            'name': cluster.name,
            'description': cluster.description or '',
            'keywords': keywords,
        })
    return {
        'clusters': clusters,
        'cluster_data': cluster_data,
        'account': account or clusters[0].account
    }
```
**Method: build_prompt()**
```python
def build_prompt(self, data: Dict, account=None) -> str:
    cluster_data = data['cluster_data']
    clusters_text = '\n'.join([
        f"Cluster ID: {c['id']} | Name: {c['name']} | Description: {c.get('description', '')}"
        for c in cluster_data
    ])
    cluster_keywords_text = '\n'.join([
        f"Cluster ID: {c['id']} | Name: {c['name']} | Keywords: {', '.join(c.get('keywords', []))}"
        for c in cluster_data
    ])
    prompt = PromptRegistry.get_prompt(
        function_name='generate_ideas',
        account=account or data['account'],
        context={
            'CLUSTERS': clusters_text,
            'CLUSTER_KEYWORDS': cluster_keywords_text,
        }
    )
    return prompt
```
**Method: parse_response()**
```python
def parse_response(self, response: str, step_tracker=None) -> List[Dict]:
    ai_core = AICore(account=self.account)
    json_data = ai_core.extract_json(response)
    if not json_data or 'ideas' not in json_data:
        raise ValueError("Failed to parse ideas response")
    # Expected format: [{title, description, cluster_id, content_type, content_structure, ...}]
    return json_data.get('ideas', [])
```
**Method: save_output()**
```python
def save_output(self, parsed: List[Dict], original_data: Dict, account=None,
                progress_tracker=None, step_tracker=None) -> Dict:
    clusters = original_data['clusters']
    account = account or original_data['account']
    ideas_created = 0
    with transaction.atomic():
        for idea_data in parsed:
            # Match cluster by ID or name
            cluster = None
            cluster_id_from_ai = idea_data.get('cluster_id')
            cluster_name = idea_data.get('cluster_name', '')
            if cluster_id_from_ai:
                cluster = next((c for c in clusters if c.id == cluster_id_from_ai), None)
            if not cluster and cluster_name:
                cluster = next((c for c in clusters if c.name == cluster_name), None)
            if not cluster:
                continue
            site = cluster.site or (cluster.sector.site if cluster.sector else None)
            # Handle description (might be dict or string)
            description = idea_data.get('description', '')
            if isinstance(description, dict):
                description = json.dumps(description)
            # Create ContentIdeas record
            ContentIdeas.objects.create(
                idea_title=idea_data.get('title', 'Untitled Idea'),
                description=description,
                content_type=idea_data.get('content_type', 'post'),
                content_structure=idea_data.get('content_structure', 'article'),
                target_keywords=idea_data.get('covered_keywords', '') or idea_data.get('target_keywords', ''),
                keyword_cluster=cluster,
                estimated_word_count=idea_data.get('estimated_word_count', 1500),
                status='new',
                account=account,
                site=site,
                sector=cluster.sector,
            )
            ideas_created += 1
            # Update cluster status
            if cluster.status == 'new':
                cluster.status = 'mapped'
                cluster.save()
    return {
        'count': ideas_created,
        'ideas_created': ideas_created
    }
```
**Service Integration:**
```python
# backend/igny8_core/business/planning/services/ideas_service.py
class IdeasService:
    def generate_ideas(self, cluster_ids, account):
        from igny8_core.ai.tasks import run_ai_task
        payload = {'ids': cluster_ids}
        if hasattr(run_ai_task, 'delay'):
            task = run_ai_task.delay(
                function_name='auto_generate_ideas',
                payload=payload,
                account_id=account.id
            )
            return {'success': True, 'task_id': str(task.id)}
        else:
            result = run_ai_task(
                function_name='auto_generate_ideas',
                payload=payload,
                account_id=account.id
            )
            return result
```
---
## Writer Module AI Functions
### 3. GenerateContentFunction
**File:** `backend/igny8_core/ai/functions/generate_content.py`
**Purpose:** Generate complete article content from task requirements
**Class Definition:**
```python
class GenerateContentFunction(BaseAIFunction):
    def get_name(self) -> str:
        return 'generate_content'

    def get_max_items(self) -> int:
        return 50  # Max tasks per batch
```
**Key Implementation Details:**
**Method: prepare()**
```python
def prepare(self, payload: dict, account=None) -> List:
    task_ids = payload.get('ids', [])
    tasks = Tasks.objects.filter(id__in=task_ids, account=account).select_related(
        'account', 'site', 'sector', 'cluster', 'taxonomy_term'
    )
    return list(tasks)
```
**Method: build_prompt()**
```python
def build_prompt(self, data: Any, account=None) -> str:
    task = data[0] if isinstance(data, list) else data
    # Build idea data
    idea_data = f"Title: {task.title or 'Untitled'}\n"
    if task.description:
        idea_data += f"Description: {task.description}\n"
    idea_data += f"Content Type: {task.content_type or 'post'}\n"
    idea_data += f"Content Structure: {task.content_structure or 'article'}\n"
    # Build cluster context
    cluster_data = ''
    if task.cluster:
        cluster_data = f"Cluster Name: {task.cluster.name}\n"
        if task.cluster.description:
            cluster_data += f"Description: {task.cluster.description}\n"
    # Build taxonomy context
    taxonomy_data = ''
    if task.taxonomy_term:
        taxonomy_data = f"Taxonomy: {task.taxonomy_term.name}\n"
        if task.taxonomy_term.taxonomy_type:
            taxonomy_data += f"Type: {task.taxonomy_term.get_taxonomy_type_display()}\n"
    # Build keywords
    keywords_data = ''
    if task.keywords:
        keywords_data = f"Keywords: {task.keywords}\n"
    prompt = PromptRegistry.get_prompt(
        function_name='generate_content',
        account=account or task.account,
        task=task,
        context={
            'IDEA': idea_data,
            'CLUSTER': cluster_data,
            'TAXONOMY': taxonomy_data,
            'KEYWORDS': keywords_data,
        }
    )
    return prompt
```
**Method: parse_response()**
```python
def parse_response(self, response: str, step_tracker=None) -> Dict:
    # Try JSON parse first
    try:
        parsed_json = json.loads(response.strip())
        if isinstance(parsed_json, dict):
            return parsed_json
    except (json.JSONDecodeError, ValueError):
        pass
    # Fallback: normalize plain HTML content
    try:
        from igny8_core.utils.content_normalizer import normalize_content
        normalized = normalize_content(response)
        return {'content': normalized['normalized_content']}
    except Exception:
        return {'content': response}
```
**Method: save_output() - CRITICAL WITH TAGS/CATEGORIES**
```python
def save_output(self, parsed: Any, original_data: Any, account=None,
                progress_tracker=None, step_tracker=None) -> Dict:
    task = original_data[0] if isinstance(original_data, list) else original_data
    # Extract fields from parsed response
    if isinstance(parsed, dict):
        content_html = parsed.get('content', '')
        title = parsed.get('title') or task.title
        meta_title = parsed.get('meta_title') or parsed.get('seo_title') or title
        meta_description = parsed.get('meta_description') or parsed.get('seo_description')
        primary_keyword = parsed.get('primary_keyword') or parsed.get('focus_keyword')
        secondary_keywords = parsed.get('secondary_keywords') or parsed.get('keywords', [])
        tags_from_response = parsed.get('tags', [])
        categories_from_response = parsed.get('categories', [])
    else:
        content_html = str(parsed)
        title = task.title
        # ... defaults
        tags_from_response = []
        categories_from_response = []
    # Calculate word count
    word_count = 0
    if content_html:
        text_for_counting = re.sub(r'<[^>]+>', '', content_html)
        word_count = len(text_for_counting.split())
    # Create Content record (independent, NOT OneToOne with Task)
    content_record = Content.objects.create(
        title=title,
        content_html=content_html,
        word_count=word_count,
        meta_title=meta_title,
        meta_description=meta_description,
        primary_keyword=primary_keyword,
        secondary_keywords=secondary_keywords if isinstance(secondary_keywords, list) else [],
        cluster=task.cluster,
        content_type=task.content_type,
        content_structure=task.content_structure,
        source='igny8',
        status='draft',
        account=task.account,
        site=task.site,
        sector=task.sector,
    )
    # Link taxonomy term from task
    if task.taxonomy_term:
        content_record.taxonomy_terms.add(task.taxonomy_term)
    # Process tags from AI response
    if tags_from_response and isinstance(tags_from_response, list):
        from django.utils.text import slugify
        for tag_name in tags_from_response:
            if tag_name and isinstance(tag_name, str):
                tag_name = tag_name.strip()
                if tag_name:
                    tag_slug = slugify(tag_name)
                    tag_obj, created = ContentTaxonomy.objects.get_or_create(
                        site=task.site,
                        slug=tag_slug,
                        taxonomy_type='tag',
                        defaults={
                            'name': tag_name,
                            'sector': task.sector,
                            'account': task.account,
                        }
                    )
                    content_record.taxonomy_terms.add(tag_obj)
    # Process categories from AI response
    if categories_from_response and isinstance(categories_from_response, list):
        from django.utils.text import slugify
        for category_name in categories_from_response:
            if category_name and isinstance(category_name, str):
                category_name = category_name.strip()
                if category_name:
                    category_slug = slugify(category_name)
                    category_obj, created = ContentTaxonomy.objects.get_or_create(
                        site=task.site,
                        slug=category_slug,
                        taxonomy_type='category',
                        defaults={
                            'name': category_name,
                            'sector': task.sector,
                            'account': task.account,
                        }
                    )
                    content_record.taxonomy_terms.add(category_obj)
    # Update task status
    task.status = 'completed'
    task.save(update_fields=['status', 'updated_at'])
    # Auto-sync idea status
    if hasattr(task, 'idea') and task.idea:
        task.idea.status = 'completed'
        task.idea.save(update_fields=['status', 'updated_at'])
    return {
        'count': 1,
        'content_id': content_record.id,
        'task_id': task.id,
        'word_count': word_count,
    }
```
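
The word-count step uses a tag-stripping regex, which has one behaviour worth knowing. The sketch below wraps the same regex in a hypothetical `count_words` helper; the `tag_replacement` parameter is an addition for illustration and is not part of the code above.

```python
import re

_TAG_RE = re.compile(r"<[^>]+>")


def count_words(content_html: str, tag_replacement: str = "") -> int:
    """Count whitespace-separated words after removing HTML tags.

    With the default empty replacement (as in save_output above), words in
    adjacent elements with no whitespace between them are fused together.
    Passing tag_replacement=" " keeps them separate.
    """
    if not content_html:
        return 0
    return len(_TAG_RE.sub(tag_replacement, content_html).split())
```

For HTML such as `<p>One</p><p>Two</p>` with no whitespace between the paragraphs, the default counts one word while `tag_replacement=" "` counts two, so stored `word_count` values may slightly undercount tightly packed markup.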
---
### 4. GenerateImagePromptsFunction
**File:** `backend/igny8_core/ai/functions/generate_image_prompts.py`
**Purpose:** Extract detailed image generation prompts from content HTML
**Class Definition:**
```python
class GenerateImagePromptsFunction(BaseAIFunction):
    def get_name(self) -> str:
        return 'generate_image_prompts'

    def get_max_items(self) -> int:
        return 50  # Max content records per batch
```
**Method: prepare()**
```python
def prepare(self, payload: dict, account=None) -> List:
    content_ids = payload.get('ids', [])
    contents = Content.objects.filter(id__in=content_ids, account=account).select_related(
        'account', 'site', 'sector', 'cluster'
    )
    max_images = self._get_max_in_article_images(account)
    extracted_data = []
    for content in contents:
        extracted = self._extract_content_elements(content, max_images)
        extracted_data.append({
            'content': content,
            'extracted': extracted,
            'max_images': max_images,
        })
    return extracted_data
```
**Helper: _extract_content_elements()**
```python
def _extract_content_elements(self, content: Content, max_images: int) -> Dict:
    from bs4 import BeautifulSoup
    html_content = content.content_html or ''
    soup = BeautifulSoup(html_content, 'html.parser')
    # Extract title
    title = content.title or ''
    # Extract intro paragraphs (skip italic hook)
    paragraphs = soup.find_all('p')
    intro_paragraphs = []
    for p in paragraphs[:3]:
        text = p.get_text(strip=True)
        if len(text.split()) > 50:  # Real paragraph
            intro_paragraphs.append(text)
            if len(intro_paragraphs) >= 2:
                break
    # Extract H2 headings
    h2_tags = soup.find_all('h2')
    h2_headings = [h2.get_text(strip=True) for h2 in h2_tags[:max_images]]
    return {
        'title': title,
        'intro_paragraphs': intro_paragraphs,
        'h2_headings': h2_headings,
    }
```
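
To experiment with the extraction rules without installing BeautifulSoup, the same selection logic can be approximated with the standard library. This is a sketch under assumptions: `_ElementCollector` and `extract_elements` are hypothetical names, the parser assumes `<p>` and `<h2>` elements are not nested inside each other, and its whitespace handling differs slightly from `get_text(strip=True)`.

```python
from html.parser import HTMLParser
from typing import Dict, List, Optional


class _ElementCollector(HTMLParser):
    """Collects the text content of <p> and <h2> elements in document order."""

    def __init__(self) -> None:
        super().__init__()
        self.paragraphs: List[str] = []
        self.h2_headings: List[str] = []
        self._current: Optional[str] = None
        self._buffer: List[str] = []

    def handle_starttag(self, tag, attrs):
        if tag in ("p", "h2"):
            self._current = tag
            self._buffer = []

    def handle_data(self, data):
        if self._current:
            self._buffer.append(data)

    def handle_endtag(self, tag):
        if tag == self._current:
            text = "".join(self._buffer).strip()
            target = self.paragraphs if tag == "p" else self.h2_headings
            target.append(text)
            self._current = None


def extract_elements(html: str, max_images: int, min_words: int = 50) -> Dict[str, List[str]]:
    """Pick up to 2 long paragraphs from the first 3, and up to max_images H2s."""
    collector = _ElementCollector()
    collector.feed(html)
    collector.close()
    long_intro = [p for p in collector.paragraphs[:3] if len(p.split()) > min_words]
    return {
        "intro_paragraphs": long_intro[:2],
        "h2_headings": collector.h2_headings[:max_images],
    }
```

The 50-word threshold is what filters out a short italic hook paragraph: it is still among the first three `<p>` elements inspected, but it is too short to be kept.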
**Method: save_output()**
```python
def save_output(self, parsed: Dict, original_data: Any, account=None,
                progress_tracker=None, step_tracker=None) -> Dict:
    data = original_data[0] if isinstance(original_data, list) else original_data
    content = data['content']
    max_images = data['max_images']
    prompts_created = 0
    with transaction.atomic():
        # Save featured image prompt
        Images.objects.update_or_create(
            content=content,
            image_type='featured',
            defaults={
                'prompt': parsed['featured_prompt'],
                'status': 'pending',
                'position': 0,
            }
        )
        prompts_created += 1
        # Save in-article image prompts
        in_article_prompts = parsed.get('in_article_prompts', [])
        for idx, prompt_text in enumerate(in_article_prompts[:max_images]):
            Images.objects.update_or_create(
                content=content,
                image_type='in_article',
                position=idx + 1,
                defaults={
                    'prompt': prompt_text,
                    'status': 'pending',
                }
            )
            prompts_created += 1
    return {
        'count': prompts_created,
        'prompts_created': prompts_created,
    }
```
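
The position scheme (featured image at position 0, in-article images numbered from 1 and capped at `max_images`) can be checked without a database. The helper below, `plan_image_records`, is a hypothetical pure-Python restatement of that logic; it returns dicts rather than creating `Images` rows.

```python
from typing import Dict, List


def plan_image_records(parsed: Dict, max_images: int) -> List[Dict]:
    """List the image records save_output would create or update, in order."""
    records = [{
        "image_type": "featured",
        "position": 0,
        "prompt": parsed["featured_prompt"],  # required key, as in the code above
        "status": "pending",
    }]
    in_article = parsed.get("in_article_prompts", [])
    for idx, prompt_text in enumerate(in_article[:max_images]):
        records.append({
            "image_type": "in_article",
            "position": idx + 1,
            "prompt": prompt_text,
            "status": "pending",
        })
    return records
```

Prompts beyond `max_images` are dropped silently, and a response without `featured_prompt` raises `KeyError`, which matches the direct `parsed['featured_prompt']` lookup in `save_output()`.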
---
### 5. GenerateImagesFunction
**File:** `backend/igny8_core/ai/functions/generate_images.py`
**Purpose:** Generate actual image URLs from Image records with prompts
**Note:** This function is partially implemented. The actual image generation happens via provider APIs.
---
## AI Function Base Class
**File:** `backend/igny8_core/ai/base.py`
All AI functions inherit from this abstract base:
```python
class BaseAIFunction(ABC):
    """Base class for all AI functions"""

    @abstractmethod
    def get_name(self) -> str:
        """Return function name (e.g., 'auto_cluster')"""
        pass

    def get_metadata(self) -> Dict:
        """Return function metadata (display name, description, phases)"""
        return {
            'display_name': self.get_name().replace('_', ' ').title(),
            'description': f'{self.get_name()} AI function',
            'phases': {
                'INIT': 'Initializing...',
                'PREP': 'Preparing data...',
                'AI_CALL': 'Processing with AI...',
                'PARSE': 'Parsing response...',
                'SAVE': 'Saving results...',
                'DONE': 'Complete!'
            }
        }

    def validate(self, payload: dict, account=None) -> Dict[str, Any]:
        """Validate input payload"""
        ids = payload.get('ids', [])
        if not ids:
            return {'valid': False, 'error': 'No IDs provided'}
        return {'valid': True}

    def get_max_items(self) -> Optional[int]:
        """Override to set max items limit"""
        return None

    @abstractmethod
    def prepare(self, payload: dict, account=None) -> Any:
        """Load and prepare data for AI processing"""
        pass

    @abstractmethod
    def build_prompt(self, data: Any, account=None) -> str:
        """Build AI prompt from prepared data"""
        pass

    def get_model(self, account=None) -> Optional[str]:
        """Override to specify model (defaults to account's default model)"""
        return None

    @abstractmethod
    def parse_response(self, response: str, step_tracker=None) -> Any:
        """Parse AI response into structured data"""
        pass

    @abstractmethod
    def save_output(self, parsed: Any, original_data: Any, account=None,
                    progress_tracker=None, step_tracker=None) -> Dict:
        """Save parsed results to database"""
        pass
```
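
Adding a new function means subclassing the base and implementing the five abstract methods. The sketch below shows the shape of such a subclass. To keep it runnable on its own it redefines a trimmed stand-in for `BaseAIFunction` (abstract methods only); `SummarizeTitlesFunction` and its payload format are hypothetical and do not exist in the codebase.

```python
import json
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class BaseAIFunction(ABC):
    """Trimmed stand-in for the real base class, so this example runs alone."""

    @abstractmethod
    def get_name(self) -> str: ...

    @abstractmethod
    def prepare(self, payload: dict, account=None) -> Any: ...

    @abstractmethod
    def build_prompt(self, data: Any, account=None) -> str: ...

    @abstractmethod
    def parse_response(self, response: str, step_tracker=None) -> Any: ...

    @abstractmethod
    def save_output(self, parsed: Any, original_data: Any, account=None,
                    progress_tracker=None, step_tracker=None) -> Dict: ...


class SummarizeTitlesFunction(BaseAIFunction):
    """Hypothetical function: asks the model for one-line title summaries."""

    def get_name(self) -> str:
        return "summarize_titles"

    def prepare(self, payload: dict, account=None) -> Dict[str, List[str]]:
        return {"titles": payload.get("titles", [])}

    def build_prompt(self, data: Any, account=None) -> str:
        lines = "\n".join(f"- {title}" for title in data["titles"])
        return f"Summarize each title in one line. Respond with JSON only.\n{lines}"

    def parse_response(self, response: str, step_tracker=None) -> List[str]:
        return json.loads(response).get("summaries", [])

    def save_output(self, parsed: Any, original_data: Any, account=None,
                    progress_tracker=None, step_tracker=None) -> Dict:
        # A real function would write to the database here.
        return {"count": len(parsed)}
```

Note that the subclass never touches credits or progress events; it only returns `count` from `save_output()`, leaving the rest to the engine.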
---
## AI Engine & Execution
**File:** `backend/igny8_core/ai/engine.py`
The `AIEngine` orchestrates all AI function execution:
```python
class AIEngine:
    def __init__(self, account: Account):
        self.account = account
        self.ai_core = AICore(account=account)

    def execute(self, fn: BaseAIFunction, payload: dict) -> Dict:
        """
        Execute AI function with full orchestration:
        1. Validation
        2. Preparation
        3. Prompt building
        4. AI call
        5. Response parsing
        6. Output saving
        7. Credit deduction (automatic)
        8. Usage logging
        Progress tracking runs alongside every step.
        """
        # Step 1: Validate
        validation = fn.validate(payload, self.account)
        if not validation['valid']:
            return {'success': False, 'error': validation['error']}
        # Step 2: Prepare data
        prepared_data = fn.prepare(payload, self.account)
        # Step 3: Build prompt
        prompt = fn.build_prompt(prepared_data, self.account)
        # Step 4: Call AI (via AICore)
        model = fn.get_model(self.account) or self._get_default_model()
        response = self.ai_core.run_ai_request(
            prompt=prompt,
            model=model,
            function_name=fn.get_name()
        )
        # Step 5: Parse response
        parsed = fn.parse_response(response['content'])
        # Step 6: Save output
        result = fn.save_output(parsed, prepared_data, self.account)
        # Step 7: Deduct credits (AUTOMATIC - line 395)
        CreditService.deduct_credits_for_operation(
            account=self.account,
            operation_type=self._get_operation_type(),
            amount=self._get_actual_amount(),
        )
        # Step 8: Log to AIUsageLog
        AIUsageLog.objects.create(
            account=self.account,
            function_name=fn.get_name(),
            credits_used=credits_deducted,  # amount charged in step 7 (assignment elided in this excerpt)
            # ... other fields
        )
        return {
            'success': True,
            **result
        }
```
**Key Point:** Credits are AUTOMATICALLY deducted by AIEngine. AI functions do NOT handle credits themselves.
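
The orchestration order can be exercised without Django or a model provider. The sketch below is a stripped-down stand-in for `AIEngine.execute()`: `run_pipeline`, `EchoIdsFunction`, and the injected `call_model` callable are hypothetical, credits and logging are left out, and the blanket `try/except` is an assumption about how errors are surfaced rather than a description of the real engine.

```python
from typing import Any, Callable, Dict


def run_pipeline(fn: Any, payload: Dict, call_model: Callable[[str], str]) -> Dict:
    """Run validate -> prepare -> build_prompt -> model -> parse -> save."""
    validation = fn.validate(payload)
    if not validation["valid"]:
        return {"success": False, "error": validation["error"]}
    try:
        data = fn.prepare(payload)
        prompt = fn.build_prompt(data)
        response = call_model(prompt)
        parsed = fn.parse_response(response)
        result = fn.save_output(parsed, data)
    except Exception as exc:  # assumption: failures become error results
        return {"success": False, "error": str(exc)}
    return {"success": True, **result}


class EchoIdsFunction:
    """Hypothetical function that round-trips its IDs through the 'model'."""

    def validate(self, payload: Dict) -> Dict:
        if not payload.get("ids"):
            return {"valid": False, "error": "No IDs provided"}
        return {"valid": True}

    def prepare(self, payload: Dict) -> list:
        return payload["ids"]

    def build_prompt(self, data: list) -> str:
        return ",".join(str(item) for item in data)

    def parse_response(self, response: str) -> list:
        return response.split(",")

    def save_output(self, parsed: list, data: list) -> Dict:
        return {"count": len(parsed)}
```

Injecting `call_model` makes the pipeline testable with a plain lambda in place of the AI provider.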
---
## Credit System Integration
**Automatic Credit Deduction:**
All credit management happens in `AIEngine.execute()` at line 395:
```python
# backend/igny8_core/ai/engine.py line 395
CreditService.deduct_credits_for_operation(
    account=self.account,
    operation_type=self._get_operation_type(),
    amount=self._get_actual_amount(),
)
```
**AI Functions DO NOT:**
- Calculate credit costs
- Call `CreditService` manually
- Handle credit errors (handled by AIEngine)
**AI Functions ONLY:**
- Focus on their specific logic
- Return `{'count': N}` from `save_output()`; AIEngine uses `count` to calculate credits
---
## Progress Tracking
**StepTracker & ProgressTracker:**
All AI functions emit progress events through trackers:
```python
# Phases emitted automatically by AIEngine
phases = {
    'INIT': 'Initializing...',           # 0-10%
    'PREP': 'Preparing data...',         # 10-20%
    'AI_CALL': 'Processing with AI...',  # 20-80%
    'PARSE': 'Parsing response...',      # 80-90%
    'SAVE': 'Saving results...',         # 90-100%
    'DONE': 'Complete!'                  # 100%
}
```
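
The percentage ranges in the comments above imply a simple mapping from a phase plus the progress within that phase to an overall percentage. The sketch below shows one way to compute it; `PHASE_RANGES` and `overall_progress` are hypothetical names, and linear interpolation within a phase is an assumption about how the trackers behave.

```python
from typing import Dict, Tuple

# (start %, end %) for each phase, taken from the comments above
PHASE_RANGES: Dict[str, Tuple[int, int]] = {
    "INIT": (0, 10),
    "PREP": (10, 20),
    "AI_CALL": (20, 80),
    "PARSE": (80, 90),
    "SAVE": (90, 100),
    "DONE": (100, 100),
}


def overall_progress(phase: str, fraction: float = 0.0) -> int:
    """Map progress within a phase (0.0 to 1.0) to an overall percentage."""
    start, end = PHASE_RANGES[phase]
    fraction = min(max(fraction, 0.0), 1.0)  # clamp out-of-range input
    return round(start + (end - start) * fraction)
```

Because `AI_CALL` spans 60 of the 100 points, a progress bar driven by this mapping spends most of its visible range on the model call.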
Frontend can listen to these events via:
- Celery task status polling
- WebSocket connections
- REST API `/task-progress/:task_id/` endpoint
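
As an illustration of the polling option, the sketch below repeatedly fetches a task's status until it reaches a terminal state. Everything specific here is an assumption: `poll_task` is a hypothetical helper, `fetch_status` stands in for whatever client call hits the progress endpoint, and the response is assumed to be a dict with a `state` key using Celery's standard state names.

```python
import time
from typing import Any, Callable, Dict

# Celery's built-in state names for finished tasks
TERMINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def poll_task(
    fetch_status: Callable[[str], Dict[str, Any]],
    task_id: str,
    interval: float = 1.0,
    max_attempts: int = 60,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Poll until the task reports a terminal state or attempts run out."""
    for _ in range(max_attempts):
        status = fetch_status(task_id)
        if status.get("state") in TERMINAL_STATES:
            return status
        sleep(interval)
    raise TimeoutError(f"task {task_id} did not finish after {max_attempts} polls")
```

Passing `sleep` as a parameter keeps the helper testable without real delays; a production client would likely also back off between attempts.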
---
## Summary
**6 AI Functions - 5 Complete, 1 Partial:**
| Function | Lines of Code | Status | Used By |
|----------|---------------|--------|---------|
| `auto_cluster` | ~380 | ✅ Complete | Planner, Automation Stage 1 |
| `generate_ideas` | ~250 | ✅ Complete | Planner, Automation Stage 2 |
| `generate_content` | ~400 | ✅ Complete | Writer, Automation Stage 4 |
| `generate_image_prompts` | ~280 | ✅ Complete | Writer, Automation Stage 5 |
| `generate_images` | ~300 | ⚠️ Partial | Writer, Automation Stage 6 |
| `optimize_content` | ~200 | ✅ Complete | Writer (Manual) |
**Architecture Benefits:**
- Single source of truth for AI operations
- Consistent credit management
- Unified error handling
- Centralized progress tracking
- Easy to add new AI functions (inherit from BaseAIFunction)
### Current gaps vs code (Dec 2025)
- AIEngine now performs a credit pre-check before the AI call (still deducts after SAVE); this is not reflected in earlier notes.
- `generate_images` implementation is partially broken: it expects task IDs (not image IDs), tries to read `task.content` (field does not exist), and uses the `extract_image_prompts` prompt path; credit estimation also looks for `image_ids`. Treat it as partial/needs fix.
- AIEngine includes messaging/cost maps for `generate_site_structure` (extra function beyond the documented six); not presently documented above.
---
## Automation Integration
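**VERIFIED:** Five of the six AI functions are integrated into the IGNY8 Automation Pipeline. `optimize_content` is triggered manually from the Writer, and `generate_images` (Stage 6) is only partially working.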
### 7-Stage Automation Pipeline
The automation system (`backend/igny8_core/business/automation/services/automation_service.py`) uses 5 of the 6 AI functions:
```
Stage 1: Keywords → Clusters
↓ Uses: AutoClusterFunction
↓ Credits: ~0.2 per keyword
Stage 2: Clusters → Ideas
↓ Uses: GenerateIdeasFunction
↓ Credits: 2 per cluster
Stage 3: Ideas → Tasks
↓ Uses: None (Local operation)
↓ Credits: 0
Stage 4: Tasks → Content
↓ Uses: GenerateContentFunction
↓ Credits: ~5 per task (2500 words)
Stage 5: Content → Image Prompts
↓ Uses: GenerateImagePromptsFunction
↓ Credits: ~2 per content
Stage 6: Image Prompts → Images
↓ Uses: GenerateImagesFunction ⚠️
↓ Credits: 1-4 per image
Stage 7: Manual Review Gate
↓ Uses: None (Manual intervention)
↓ Credits: 0
```
### Automation Execution Flow
```python
# AutomationService.run_stage_1() example
def run_stage_1(self):
    # batch_size is defined elsewhere in the service (not shown in this excerpt)
    keywords = Keywords.objects.filter(site=self.site, status='new')[:batch_size]
    # Call the AI task directly (synchronous); run_ai_task.delay() would queue it on Celery instead
    from igny8_core.ai.tasks import run_ai_task
    result = run_ai_task(
        function_name='auto_cluster',
        payload={'ids': [k.id for k in keywords]},
        account_id=self.account.id
    )
    # Credits automatically deducted by AIEngine
    return {
        'keywords_processed': len(keywords),
        'clusters_created': result.get('count', 0),
        'credits_used': result.get('credits_used', 0)
    }
```
### Frontend Access
**Automation Page:** `/automation` (Fully functional)
- Real-time pipeline overview
- Manual trigger ("Run Now" button)
- Pause/Resume controls
- Live progress tracking
- Activity logs
- Run history
**Planner & Writer Pages:** Individual AI function triggers
- Cluster keywords (Stage 1)
- Generate ideas (Stage 2)
- Generate content (Stage 4)
- Extract image prompts (Stage 5)
- Generate images (Stage 6)
---
**End of AI Functions Reference**