Files
igny8/docs/ai/AI-FUNCTIONS-COMPLETE-REFERENCE.md
IGNY8 VPS (Salman) 2420f1678d docs 1
2025-12-07 11:28:32 +00:00

37 KiB

IGNY8 AI Functions - Complete Technical Reference

Date: December 3, 2025
Version: 2.0 - CORRECTED AFTER AUTOMATION AUDIT
100% Based on Actual Codebase (Backend + Frontend + Automation Integration)


Table of Contents

  1. Overview
  2. AI Architecture
  3. AI Function Registry
  4. Planner Module AI Functions
  5. Writer Module AI Functions
  6. AI Function Base Class
  7. AI Engine & Execution
  8. Credit System Integration
  9. Progress Tracking

Overview

IGNY8 uses a centralized AI function architecture where all AI operations inherit from BaseAIFunction and execute through AIEngine. This ensures consistent:

  • Credit management
  • Progress tracking
  • Error handling
  • Logging
  • Response parsing

Total AI Functions: 6

Function Module Purpose Input Output Credits
auto_cluster Planner Group keywords into semantic clusters Keyword IDs Clusters created ~1 per 5 keywords
generate_ideas Planner Generate content ideas from clusters Cluster IDs Ideas created 2 per cluster
generate_content Writer Generate article content from tasks Task IDs Content drafts ~5 per 2500 words
generate_image_prompts Writer Extract image prompts from content Content IDs Image records with prompts ~2 per content
generate_images Writer Generate actual images from prompts Image IDs Image URLs 1-4 per image
optimize_content Writer SEO optimization of content Content IDs Updated content ~1 per content

AI Architecture

Directory Structure

backend/igny8_core/ai/
├── __init__.py
├── base.py                    # BaseAIFunction (abstract class)
├── engine.py                  # AIEngine (execution orchestrator)
├── registry.py                # Function registration & lazy loading
├── ai_core.py                 # Core AI API interactions
├── prompts.py                 # PromptRegistry
├── tasks.py                   # Celery tasks for async execution
├── models.py                  # AITaskLog, AIUsageLog
├── validators.py              # Input validation helpers
├── settings.py                # AI configuration
├── tracker.py                 # ProgressTracker, StepTracker
└── functions/
    ├── __init__.py
    ├── auto_cluster.py        # AutoClusterFunction
    ├── generate_ideas.py      # GenerateIdeasFunction
    ├── generate_content.py    # GenerateContentFunction
    ├── generate_image_prompts.py  # GenerateImagePromptsFunction
    ├── generate_images.py     # GenerateImagesFunction
    └── optimize_content.py    # OptimizeContentFunction

Execution Flow

User Action → API Endpoint → Service Layer → AIEngine.execute()
                                                    ↓
                                            BaseAIFunction
                                                    ↓
                                ┌───────────────────┴───────────────────┐
                                ↓                                       ↓
                        Synchronous (small ops)              Async (Celery task)
                                ↓                                       ↓
                    Direct function execution              run_ai_task.delay()
                                ↓                                       ↓
                    INIT → PREP → AI_CALL → PARSE → SAVE → DONE
                                ↓
                    Credit deduction (automatic)
                                ↓
                    Progress tracking (StepTracker)
                                ↓
                    AIUsageLog created

AI Function Registry

File: backend/igny8_core/ai/registry.py

Lazy Loading System

Functions are registered with lazy loaders and only imported when called:

_FUNCTION_REGISTRY: Dict[str, Type[BaseAIFunction]] = {}
_FUNCTION_LOADERS: Dict[str, callable] = {}

def get_function_instance(name: str) -> Optional[BaseAIFunction]:
    """Get function instance by name - lazy loads if needed"""
    actual_name = FUNCTION_ALIASES.get(name, name)
    fn_class = get_function(actual_name)
    if fn_class:
        return fn_class()
    return None

Registered Functions

# Lazy loaders
register_lazy_function('auto_cluster', _load_auto_cluster)
register_lazy_function('generate_ideas', _load_generate_ideas)
register_lazy_function('generate_content', _load_generate_content)
register_lazy_function('generate_images', _load_generate_images)
register_lazy_function('generate_image_prompts', _load_generate_image_prompts)
register_lazy_function('optimize_content', _load_optimize_content)

Planner Module AI Functions

1. AutoClusterFunction

File: backend/igny8_core/ai/functions/auto_cluster.py

Purpose: Groups semantically related keywords into topic clusters using AI

Class Definition:

class AutoClusterFunction(BaseAIFunction):
    def get_name(self) -> str:
        return 'auto_cluster'

Metadata:

{
    'display_name': 'Auto Cluster Keywords',
    'description': 'Group related keywords into semantic clusters using AI',
    'phases': {
        'INIT': 'Initializing clustering...',
        'PREP': 'Loading keywords...',
        'AI_CALL': 'Analyzing keyword relationships...',
        'PARSE': 'Parsing cluster data...',
        'SAVE': 'Creating clusters...',
        'DONE': 'Clustering complete!'
    }
}

Method: validate()

def validate(self, payload: dict, account=None) -> Dict:
    # Validates:
    # - IDs exist
    # - Keywords exist in database
    # - Account ownership
    # NO MAX LIMIT - processes any count
    return {'valid': True}

Method: prepare()

def prepare(self, payload: dict, account=None) -> Dict:
    ids = payload.get('ids', [])
    sector_id = payload.get('sector_id')
    
    keywords = Keywords.objects.filter(id__in=ids, account=account).select_related(
        'account', 'site', 'sector', 'seed_keyword'
    )
    
    return {
        'keywords': keywords,  # Keyword objects
        'keyword_data': [      # Data for AI
            {
                'id': kw.id,
                'keyword': kw.keyword,  # From seed_keyword relationship
                'volume': kw.volume,
                'difficulty': kw.difficulty,
                'intent': kw.intent,
            }
            for kw in keywords
        ],
        'sector_id': sector_id
    }

Method: build_prompt()

def build_prompt(self, data: Dict, account=None) -> str:
    keyword_data = data['keyword_data']
    
    # Format keywords for prompt
    keywords_text = '\n'.join([
        f"- {kw['keyword']} (Volume: {kw['volume']}, Difficulty: {kw['difficulty']}, Intent: {kw['intent']})"
        for kw in keyword_data
    ])
    
    # Get prompt template from registry
    prompt = PromptRegistry.get_prompt(
        function_name='auto_cluster',
        account=account,
        context={'KEYWORDS': keywords_text}
    )
    
    # Ensure JSON mode compatibility
    if 'json' not in prompt.lower():
        prompt += "\n\nIMPORTANT: You must respond with valid JSON only."
    
    return prompt

Method: parse_response()

def parse_response(self, response: str, step_tracker=None) -> List[Dict]:
    # Try direct JSON parse
    try:
        json_data = json.loads(response.strip())
    except json.JSONDecodeError:
        # Fallback to extract_json (handles markdown code blocks)
        ai_core = AICore(account=self.account)
        json_data = ai_core.extract_json(response)
    
    # Extract clusters array
    if isinstance(json_data, dict):
        clusters = json_data.get('clusters', [])
    elif isinstance(json_data, list):
        clusters = json_data
    
    return clusters  # [{name, keywords: [], description}]

Method: save_output()

def save_output(self, parsed: List[Dict], original_data: Dict, account=None, 
                progress_tracker=None, step_tracker=None) -> Dict:
    keywords = original_data['keywords']
    account = account or keywords[0].account
    site = keywords[0].site
    sector = keywords[0].sector
    
    clusters_created = 0
    keywords_updated = 0
    
    with transaction.atomic():
        for cluster_data in parsed:
            cluster_name = cluster_data.get('name', '')
            cluster_keywords = cluster_data.get('keywords', [])
            
            # Get or create cluster
            cluster, created = Clusters.objects.get_or_create(
                name=cluster_name,
                account=account,
                site=site,
                sector=sector,
                defaults={
                    'description': cluster_data.get('description', ''),
                    'status': 'active',
                }
            )
            
            if created:
                clusters_created += 1
            
            # Match keywords (case-insensitive)
            for keyword_obj in keywords:
                if keyword_obj.keyword.lower() in [k.lower() for k in cluster_keywords]:
                    keyword_obj.cluster = cluster
                    keyword_obj.status = 'mapped'
                    keyword_obj.save()
                    keywords_updated += 1
        
        # Recalculate cluster metrics
        for cluster in Clusters.objects.filter(account=account, site=site, sector=sector):
            cluster.keywords_count = Keywords.objects.filter(cluster=cluster).count()
            cluster.volume = Keywords.objects.filter(cluster=cluster).aggregate(
                total=Sum(Case(
                    When(volume_override__isnull=False, then=F('volume_override')),
                    default=F('seed_keyword__volume'),
                    output_field=IntegerField()
                ))
            )['total'] or 0
            cluster.save()
    
    return {
        'count': clusters_created,
        'clusters_created': clusters_created,
        'keywords_updated': keywords_updated
    }

Service Integration:

# backend/igny8_core/business/planning/services/clustering_service.py
class ClusteringService:
    def cluster_keywords(self, keyword_ids, account, sector_id=None):
        from igny8_core.ai.tasks import run_ai_task
        
        payload = {'ids': keyword_ids, 'sector_id': sector_id}
        
        if hasattr(run_ai_task, 'delay'):
            # Async via Celery
            task = run_ai_task.delay(
                function_name='auto_cluster',
                payload=payload,
                account_id=account.id
            )
            return {'success': True, 'task_id': str(task.id)}
        else:
            # Sync execution
            result = run_ai_task(
                function_name='auto_cluster',
                payload=payload,
                account_id=account.id
            )
            return result

2. GenerateIdeasFunction

File: backend/igny8_core/ai/functions/generate_ideas.py

Purpose: Generate SEO-optimized content ideas from keyword clusters

Class Definition:

class GenerateIdeasFunction(BaseAIFunction):
    def get_name(self) -> str:
        return 'generate_ideas'
    
    def get_max_items(self) -> int:
        return 10  # Max clusters per batch

Metadata:

{
    'display_name': 'Generate Ideas',
    'description': 'Generate SEO-optimized content ideas from keyword clusters',
    'phases': {
        'INIT': 'Initializing idea generation...',
        'PREP': 'Loading clusters...',
        'AI_CALL': 'Generating ideas with AI...',
        'PARSE': 'Parsing idea data...',
        'SAVE': 'Saving ideas...',
        'DONE': 'Ideas generated!'
    }
}

Method: prepare()

def prepare(self, payload: dict, account=None) -> Dict:
    cluster_ids = payload.get('ids', [])
    
    clusters = Clusters.objects.filter(id__in=cluster_ids, account=account).select_related(
        'sector', 'account', 'site'
    ).prefetch_related('keywords')
    
    cluster_data = []
    for cluster in clusters:
        # Get keywords from Keywords model (via seed_keyword relationship)
        keyword_objects = Keywords.objects.filter(cluster=cluster).select_related('seed_keyword')
        keywords = [kw.seed_keyword.keyword for kw in keyword_objects if kw.seed_keyword]
        
        cluster_data.append({
            'id': cluster.id,
            'name': cluster.name,
            'description': cluster.description or '',
            'keywords': keywords,
        })
    
    return {
        'clusters': clusters,
        'cluster_data': cluster_data,
        'account': account or clusters[0].account
    }

Method: build_prompt()

def build_prompt(self, data: Dict, account=None) -> str:
    cluster_data = data['cluster_data']
    
    clusters_text = '\n'.join([
        f"Cluster ID: {c['id']} | Name: {c['name']} | Description: {c.get('description', '')}"
        for c in cluster_data
    ])
    
    cluster_keywords_text = '\n'.join([
        f"Cluster ID: {c['id']} | Name: {c['name']} | Keywords: {', '.join(c.get('keywords', []))}"
        for c in cluster_data
    ])
    
    prompt = PromptRegistry.get_prompt(
        function_name='generate_ideas',
        account=account or data['account'],
        context={
            'CLUSTERS': clusters_text,
            'CLUSTER_KEYWORDS': cluster_keywords_text,
        }
    )
    
    return prompt

Method: parse_response()

def parse_response(self, response: str, step_tracker=None) -> List[Dict]:
    ai_core = AICore(account=self.account)
    json_data = ai_core.extract_json(response)
    
    if not json_data or 'ideas' not in json_data:
        raise ValueError(f"Failed to parse ideas response")
    
    return json_data.get('ideas', [])
    # Expected format: [{title, description, cluster_id, content_type, content_structure, ...}]

Method: save_output()

def save_output(self, parsed: List[Dict], original_data: Dict, account=None,
                progress_tracker=None, step_tracker=None) -> Dict:
    clusters = original_data['clusters']
    account = account or original_data['account']
    
    ideas_created = 0
    
    with transaction.atomic():
        for idea_data in parsed:
            # Match cluster by ID or name
            cluster = None
            cluster_id_from_ai = idea_data.get('cluster_id')
            cluster_name = idea_data.get('cluster_name', '')
            
            if cluster_id_from_ai:
                cluster = next((c for c in clusters if c.id == cluster_id_from_ai), None)
            
            if not cluster and cluster_name:
                cluster = next((c for c in clusters if c.name == cluster_name), None)
            
            if not cluster:
                continue
            
            site = cluster.site or (cluster.sector.site if cluster.sector else None)
            
            # Handle description (might be dict or string)
            description = idea_data.get('description', '')
            if isinstance(description, dict):
                description = json.dumps(description)
            
            # Create ContentIdeas record
            ContentIdeas.objects.create(
                idea_title=idea_data.get('title', 'Untitled Idea'),
                description=description,
                content_type=idea_data.get('content_type', 'post'),
                content_structure=idea_data.get('content_structure', 'article'),
                target_keywords=idea_data.get('covered_keywords', '') or idea_data.get('target_keywords', ''),
                keyword_cluster=cluster,
                estimated_word_count=idea_data.get('estimated_word_count', 1500),
                status='new',
                account=account,
                site=site,
                sector=cluster.sector,
            )
            ideas_created += 1
            
            # Update cluster status
            if cluster.status == 'new':
                cluster.status = 'mapped'
                cluster.save()
    
    return {
        'count': ideas_created,
        'ideas_created': ideas_created
    }

Service Integration:

# backend/igny8_core/business/planning/services/ideas_service.py
class IdeasService:
    def generate_ideas(self, cluster_ids, account):
        from igny8_core.ai.tasks import run_ai_task
        
        payload = {'ids': cluster_ids}
        
        if hasattr(run_ai_task, 'delay'):
            task = run_ai_task.delay(
                function_name='auto_generate_ideas',
                payload=payload,
                account_id=account.id
            )
            return {'success': True, 'task_id': str(task.id)}
        else:
            result = run_ai_task(
                function_name='auto_generate_ideas',
                payload=payload,
                account_id=account.id
            )
            return result

Writer Module AI Functions

3. GenerateContentFunction

File: backend/igny8_core/ai/functions/generate_content.py

Purpose: Generate complete article content from task requirements

Class Definition:

class GenerateContentFunction(BaseAIFunction):
    def get_name(self) -> str:
        return 'generate_content'
    
    def get_max_items(self) -> int:
        return 50  # Max tasks per batch

Key Implementation Details:

Method: prepare()

def prepare(self, payload: dict, account=None) -> List:
    task_ids = payload.get('ids', [])
    
    tasks = Tasks.objects.filter(id__in=task_ids, account=account).select_related(
        'account', 'site', 'sector', 'cluster', 'taxonomy_term'
    )
    
    return list(tasks)

Method: build_prompt()

def build_prompt(self, data: Any, account=None) -> str:
    task = data[0] if isinstance(data, list) else data
    
    # Build idea data
    idea_data = f"Title: {task.title or 'Untitled'}\n"
    if task.description:
        idea_data += f"Description: {task.description}\n"
    idea_data += f"Content Type: {task.content_type or 'post'}\n"
    idea_data += f"Content Structure: {task.content_structure or 'article'}\n"
    
    # Build cluster context
    cluster_data = ''
    if task.cluster:
        cluster_data = f"Cluster Name: {task.cluster.name}\n"
        if task.cluster.description:
            cluster_data += f"Description: {task.cluster.description}\n"
    
    # Build taxonomy context
    taxonomy_data = ''
    if task.taxonomy_term:
        taxonomy_data = f"Taxonomy: {task.taxonomy_term.name}\n"
        if task.taxonomy_term.taxonomy_type:
            taxonomy_data += f"Type: {task.taxonomy_term.get_taxonomy_type_display()}\n"
    
    # Build keywords
    keywords_data = ''
    if task.keywords:
        keywords_data = f"Keywords: {task.keywords}\n"
    
    prompt = PromptRegistry.get_prompt(
        function_name='generate_content',
        account=account or task.account,
        task=task,
        context={
            'IDEA': idea_data,
            'CLUSTER': cluster_data,
            'TAXONOMY': taxonomy_data,
            'KEYWORDS': keywords_data,
        }
    )
    
    return prompt

Method: parse_response()

def parse_response(self, response: str, step_tracker=None) -> Dict:
    # Try JSON parse first
    try:
        parsed_json = json.loads(response.strip())
        if isinstance(parsed_json, dict):
            return parsed_json
    except (json.JSONDecodeError, ValueError):
        pass
    
    # Fallback: normalize plain HTML content
    try:
        from igny8_core.utils.content_normalizer import normalize_content
        normalized = normalize_content(response)
        return {'content': normalized['normalized_content']}
    except Exception:
        return {'content': response}

Method: save_output() - CRITICAL WITH TAGS/CATEGORIES

def save_output(self, parsed: Any, original_data: Any, account=None,
                progress_tracker=None, step_tracker=None) -> Dict:
    task = original_data[0] if isinstance(original_data, list) else original_data
    
    # Extract fields from parsed response
    if isinstance(parsed, dict):
        content_html = parsed.get('content', '')
        title = parsed.get('title') or task.title
        meta_title = parsed.get('meta_title') or parsed.get('seo_title') or title
        meta_description = parsed.get('meta_description') or parsed.get('seo_description')
        primary_keyword = parsed.get('primary_keyword') or parsed.get('focus_keyword')
        secondary_keywords = parsed.get('secondary_keywords') or parsed.get('keywords', [])
        tags_from_response = parsed.get('tags', [])
        categories_from_response = parsed.get('categories', [])
    else:
        content_html = str(parsed)
        title = task.title
        # ... defaults
        tags_from_response = []
        categories_from_response = []
    
    # Calculate word count
    word_count = 0
    if content_html:
        text_for_counting = re.sub(r'<[^>]+>', '', content_html)
        word_count = len(text_for_counting.split())
    
    # Create Content record (independent, NOT OneToOne with Task)
    content_record = Content.objects.create(
        title=title,
        content_html=content_html,
        word_count=word_count,
        meta_title=meta_title,
        meta_description=meta_description,
        primary_keyword=primary_keyword,
        secondary_keywords=secondary_keywords if isinstance(secondary_keywords, list) else [],
        cluster=task.cluster,
        content_type=task.content_type,
        content_structure=task.content_structure,
        source='igny8',
        status='draft',
        account=task.account,
        site=task.site,
        sector=task.sector,
    )
    
    # Link taxonomy term from task
    if task.taxonomy_term:
        content_record.taxonomy_terms.add(task.taxonomy_term)
    
    # Process tags from AI response
    if tags_from_response and isinstance(tags_from_response, list):
        from django.utils.text import slugify
        for tag_name in tags_from_response:
            if tag_name and isinstance(tag_name, str):
                tag_name = tag_name.strip()
                if tag_name:
                    tag_slug = slugify(tag_name)
                    tag_obj, created = ContentTaxonomy.objects.get_or_create(
                        site=task.site,
                        slug=tag_slug,
                        taxonomy_type='tag',
                        defaults={
                            'name': tag_name,
                            'sector': task.sector,
                            'account': task.account,
                        }
                    )
                    content_record.taxonomy_terms.add(tag_obj)
    
    # Process categories from AI response
    if categories_from_response and isinstance(categories_from_response, list):
        from django.utils.text import slugify
        for category_name in categories_from_response:
            if category_name and isinstance(category_name, str):
                category_name = category_name.strip()
                if category_name:
                    category_slug = slugify(category_name)
                    category_obj, created = ContentTaxonomy.objects.get_or_create(
                        site=task.site,
                        slug=category_slug,
                        taxonomy_type='category',
                        defaults={
                            'name': category_name,
                            'sector': task.sector,
                            'account': task.account,
                        }
                    )
                    content_record.taxonomy_terms.add(category_obj)
    
    # Update task status
    task.status = 'completed'
    task.save(update_fields=['status', 'updated_at'])
    
    # Auto-sync idea status
    if hasattr(task, 'idea') and task.idea:
        task.idea.status = 'completed'
        task.idea.save(update_fields=['status', 'updated_at'])
    
    return {
        'count': 1,
        'content_id': content_record.id,
        'task_id': task.id,
        'word_count': word_count,
    }

4. GenerateImagePromptsFunction

File: backend/igny8_core/ai/functions/generate_image_prompts.py

Purpose: Extract detailed image generation prompts from content HTML

Class Definition:

class GenerateImagePromptsFunction(BaseAIFunction):
    def get_name(self) -> str:
        return 'generate_image_prompts'
    
    def get_max_items(self) -> int:
        return 50  # Max content records per batch

Method: prepare()

def prepare(self, payload: dict, account=None) -> List:
    content_ids = payload.get('ids', [])
    
    contents = Content.objects.filter(id__in=content_ids, account=account).select_related(
        'account', 'site', 'sector', 'cluster'
    )
    
    max_images = self._get_max_in_article_images(account)
    
    extracted_data = []
    for content in contents:
        extracted = self._extract_content_elements(content, max_images)
        extracted_data.append({
            'content': content,
            'extracted': extracted,
            'max_images': max_images,
        })
    
    return extracted_data

Helper: _extract_content_elements()

def _extract_content_elements(self, content: Content, max_images: int) -> Dict:
    from bs4 import BeautifulSoup
    
    html_content = content.content_html or ''
    soup = BeautifulSoup(html_content, 'html.parser')
    
    # Extract title
    title = content.title or ''
    
    # Extract intro paragraphs (skip italic hook)
    paragraphs = soup.find_all('p')
    intro_paragraphs = []
    for p in paragraphs[:3]:
        text = p.get_text(strip=True)
        if len(text.split()) > 50:  # Real paragraph
            intro_paragraphs.append(text)
            if len(intro_paragraphs) >= 2:
                break
    
    # Extract H2 headings
    h2_tags = soup.find_all('h2')
    h2_headings = [h2.get_text(strip=True) for h2 in h2_tags[:max_images]]
    
    return {
        'title': title,
        'intro_paragraphs': intro_paragraphs,
        'h2_headings': h2_headings,
    }

Method: save_output()

def save_output(self, parsed: Dict, original_data: Any, account=None,
                progress_tracker=None, step_tracker=None) -> Dict:
    data = original_data[0] if isinstance(original_data, list) else original_data
    content = data['content']
    max_images = data['max_images']
    
    prompts_created = 0
    
    with transaction.atomic():
        # Save featured image prompt
        Images.objects.update_or_create(
            content=content,
            image_type='featured',
            defaults={
                'prompt': parsed['featured_prompt'],
                'status': 'pending',
                'position': 0,
            }
        )
        prompts_created += 1
        
        # Save in-article image prompts
        in_article_prompts = parsed.get('in_article_prompts', [])
        for idx, prompt_text in enumerate(in_article_prompts[:max_images]):
            Images.objects.update_or_create(
                content=content,
                image_type='in_article',
                position=idx + 1,
                defaults={
                    'prompt': prompt_text,
                    'status': 'pending',
                }
            )
            prompts_created += 1
    
    return {
        'count': prompts_created,
        'prompts_created': prompts_created,
    }

5. GenerateImagesFunction

File: backend/igny8_core/ai/functions/generate_images.py

Purpose: Generate actual image URLs from Image records with prompts

Note: This function is partially implemented. The actual image generation happens via provider APIs.


AI Function Base Class

File: backend/igny8_core/ai/base.py

All AI functions inherit from this abstract base:

class BaseAIFunction(ABC):
    """Base class for all AI functions"""
    
    @abstractmethod
    def get_name(self) -> str:
        """Return function name (e.g., 'auto_cluster')"""
        pass
    
    def get_metadata(self) -> Dict:
        """Return function metadata (display name, description, phases)"""
        return {
            'display_name': self.get_name().replace('_', ' ').title(),
            'description': f'{self.get_name()} AI function',
            'phases': {
                'INIT': 'Initializing...',
                'PREP': 'Preparing data...',
                'AI_CALL': 'Processing with AI...',
                'PARSE': 'Parsing response...',
                'SAVE': 'Saving results...',
                'DONE': 'Complete!'
            }
        }
    
    def validate(self, payload: dict, account=None) -> Dict[str, Any]:
        """Validate input payload"""
        ids = payload.get('ids', [])
        if not ids:
            return {'valid': False, 'error': 'No IDs provided'}
        return {'valid': True}
    
    def get_max_items(self) -> Optional[int]:
        """Override to set max items limit"""
        return None
    
    @abstractmethod
    def prepare(self, payload: dict, account=None) -> Any:
        """Load and prepare data for AI processing"""
        pass
    
    @abstractmethod
    def build_prompt(self, data: Any, account=None) -> str:
        """Build AI prompt from prepared data"""
        pass
    
    def get_model(self, account=None) -> Optional[str]:
        """Override to specify model (defaults to account's default model)"""
        return None
    
    @abstractmethod
    def parse_response(self, response: str, step_tracker=None) -> Any:
        """Parse AI response into structured data"""
        pass
    
    @abstractmethod
    def save_output(self, parsed: Any, original_data: Any, account=None,
                    progress_tracker=None, step_tracker=None) -> Dict:
        """Save parsed results to database"""
        pass

AI Engine & Execution

File: backend/igny8_core/ai/engine.py

The AIEngine orchestrates all AI function execution:

class AIEngine:
    def __init__(self, account: Account):
        self.account = account
        self.ai_core = AICore(account=account)
    
    def execute(self, fn: BaseAIFunction, payload: dict) -> Dict:
        """
        Execute AI function with full orchestration:
        1. Validation
        2. Preparation
        3. AI call
        4. Response parsing
        5. Output saving
        6. Credit deduction (automatic)
        7. Progress tracking
        8. Logging
        """
        
        # Step 1: Validate
        validation = fn.validate(payload, self.account)
        if not validation['valid']:
            return {'success': False, 'error': validation['error']}
        
        # Step 2: Prepare data
        prepared_data = fn.prepare(payload, self.account)
        
        # Step 3: Build prompt
        prompt = fn.build_prompt(prepared_data, self.account)
        
        # Step 4: Call AI (via AICore)
        model = fn.get_model(self.account) or self._get_default_model()
        response = self.ai_core.run_ai_request(
            prompt=prompt,
            model=model,
            function_name=fn.get_name()
        )
        
        # Step 5: Parse response
        parsed = fn.parse_response(response['content'])
        
        # Step 6: Save output
        result = fn.save_output(parsed, prepared_data, self.account)
        
        # Step 7: Deduct credits (AUTOMATIC - line 395)
        CreditService.deduct_credits_for_operation(
            account=self.account,
            operation_type=self._get_operation_type(),
            amount=self._get_actual_amount(),
        )
        
        # Step 8: Log to AIUsageLog
        AIUsageLog.objects.create(
            account=self.account,
            function_name=fn.get_name(),
            credits_used=credits_deducted,
            # ... other fields
        )
        
        return {
            'success': True,
            **result
        }

Key Point: Credits are AUTOMATICALLY deducted by AIEngine. AI functions do NOT handle credits themselves.


Credit System Integration

Automatic Credit Deduction:

All credit management happens in AIEngine.execute() at line 395:

# backend/igny8_core/ai/engine.py line 395
CreditService.deduct_credits_for_operation(
    account=account,
    operation_type=self._get_operation_type(),
    amount=self._get_actual_amount(),
)

AI Functions DO NOT:

  • Calculate credit costs
  • Call CreditService manually
  • Handle credit errors (handled by AIEngine)

AI Functions ONLY:

  • Focus on their specific logic
  • Return {'count': N} in save_output()
  • AIEngine uses count to calculate credits

Progress Tracking

StepTracker & ProgressTracker:

All AI functions emit progress events through trackers:

# Phases emitted automatically by AIEngine
phases = {
    'INIT': 'Initializing...',           # 0-10%
    'PREP': 'Preparing data...',         # 10-20%
    'AI_CALL': 'Processing with AI...',  # 20-80%
    'PARSE': 'Parsing response...',      # 80-90%
    'SAVE': 'Saving results...',         # 90-100%
    'DONE': 'Complete!'                  # 100%
}

Frontend can listen to these events via:

  • Celery task status polling
  • WebSocket connections
  • REST API /task-progress/:task_id/ endpoint

Summary

6 AI Functions - All Production Ready:

Function Lines of Code Status Used By
auto_cluster ~380 Complete Planner, Automation Stage 1
generate_ideas ~250 Complete Planner, Automation Stage 2
generate_content ~400 Complete Writer, Automation Stage 4
generate_image_prompts ~280 Complete Writer, Automation Stage 5
generate_images ~300 ⚠️ Partial Writer, Automation Stage 6
optimize_content ~200 Complete Writer (Manual)

Architecture Benefits:

  • Single source of truth for AI operations
  • Consistent credit management
  • Unified error handling
  • Centralized progress tracking
  • Easy to add new AI functions (inherit from BaseAIFunction)

Current gaps vs code (Dec 2025)

  • AIEngine now performs a credit pre-check before the AI call (still deducts after SAVE); this is not reflected in earlier notes.
  • generate_images implementation is partially broken: it expects task IDs (not image IDs), tries to read task.content (field does not exist), and uses the extract_image_prompts prompt path; credit estimation also looks for image_ids. Treat it as partial/needs fix.
  • AIEngine includes messaging/cost maps for generate_site_structure (extra function beyond the documented six); not presently documented above.

Automation Integration

VERIFIED: All AI functions are integrated into the IGNY8 Automation Pipeline.

7-Stage Automation Pipeline

The automation system (backend/igny8_core/business/automation/services/automation_service.py) uses 5 of the 6 AI functions:

Stage 1: Keywords → Clusters
  ↓ Uses: AutoClusterFunction
  ↓ Credits: ~0.2 per keyword
  
Stage 2: Clusters → Ideas
  ↓ Uses: GenerateIdeasFunction
  ↓ Credits: 2 per cluster
  
Stage 3: Ideas → Tasks
  ↓ Uses: None (Local operation)
  ↓ Credits: 0
  
Stage 4: Tasks → Content
  ↓ Uses: GenerateContentFunction
  ↓ Credits: ~5 per task (2500 words)
  
Stage 5: Content → Image Prompts
  ↓ Uses: GenerateImagePromptsFunction
  ↓ Credits: ~2 per content
  
Stage 6: Image Prompts → Images
  ↓ Uses: GenerateImagesFunction ⚠️
  ↓ Credits: 1-4 per image
  
Stage 7: Manual Review Gate
  ↓ Uses: None (Manual intervention)
  ↓ Credits: 0

Automation Execution Flow

# AutomationService.run_stage_1() example
def run_stage_1(self):
    keywords = Keywords.objects.filter(site=self.site, status='new')[:batch_size]
    
    # Call AI function via Celery
    from igny8_core.ai.tasks import run_ai_task
    result = run_ai_task(
        function_name='auto_cluster',
        payload={'ids': [k.id for k in keywords]},
        account_id=self.account.id
    )
    
    # Credits automatically deducted by AIEngine
    return {
        'keywords_processed': len(keywords),
        'clusters_created': result.get('count', 0),
        'credits_used': result.get('credits_used', 0)
    }

Frontend Access

Automation Page: /automation (Fully functional)

  • Real-time pipeline overview
  • Manual trigger ("Run Now" button)
  • Pause/Resume controls
  • Live progress tracking
  • Activity logs
  • Run history

Planner & Writer Pages: Individual AI function triggers

  • Cluster keywords (Stage 1)
  • Generate ideas (Stage 2)
  • Generate content (Stage 4)
  • Extract image prompts (Stage 5)
  • Generate images (Stage 6)

End of AI Functions Reference