Add SEO fields to Tasks model, improve content generation response handling, and enhance progress bar animation

- Added primary_keyword, secondary_keywords, tags, and categories fields to Tasks model
- Updated generate_content function to handle full JSON response with all SEO fields
- Improved progress bar animation: smooth 1% increments every 300ms
- Enhanced step detection for content generation vs clustering vs ideas
- Fixed progress modal to show correct messages for each function type
- Added comprehensive logging to Keywords and Tasks pages for AI functions
- Fixed error handling to show meaningful error messages instead of generic failures
This commit is contained in:
Gitea Deploy
2025-11-09 21:22:34 +00:00
parent 09d22ab0e2
commit 961362e088
17340 changed files with 10636 additions and 2248776 deletions

View File

@@ -1,4 +1,17 @@
"""
AI Function implementations
"""
from igny8_core.ai.functions.auto_cluster import AutoClusterFunction
from igny8_core.ai.functions.generate_ideas import GenerateIdeasFunction, generate_ideas_core
from igny8_core.ai.functions.generate_content import GenerateContentFunction, generate_content_core
from igny8_core.ai.functions.generate_images import GenerateImagesFunction, generate_images_core
__all__ = [
'AutoClusterFunction',
'GenerateIdeasFunction',
'generate_ideas_core',
'GenerateContentFunction',
'generate_content_core',
'GenerateImagesFunction',
'generate_images_core',
]

View File

@@ -6,7 +6,9 @@ from typing import Dict, List, Any
from django.db import transaction
from igny8_core.ai.base import BaseAIFunction
from igny8_core.modules.planner.models import Keywords, Clusters
from igny8_core.modules.system.utils import get_prompt_value
from igny8_core.ai.ai_core import AICore
from igny8_core.ai.prompts import PromptRegistry
from igny8_core.ai.settings import get_model_config
logger = logging.getLogger(__name__)
@@ -36,49 +38,23 @@ class AutoClusterFunction(BaseAIFunction):
def validate(self, payload: dict, account=None) -> Dict:
"""Custom validation for clustering with plan limit checks"""
result = super().validate(payload, account)
from igny8_core.ai.validators import validate_ids, validate_keywords_exist, validate_cluster_limits
# Base validation
result = validate_ids(payload, max_items=self.get_max_items())
if not result['valid']:
return result
# Additional validation: check keywords exist
# Check keywords exist
ids = payload.get('ids', [])
queryset = Keywords.objects.filter(id__in=ids)
if account:
queryset = queryset.filter(account=account)
keywords_result = validate_keywords_exist(ids, account)
if not keywords_result['valid']:
return keywords_result
if queryset.count() == 0:
return {'valid': False, 'error': 'No keywords found'}
# Plan limit validation
if account:
plan = getattr(account, 'plan', None)
if plan:
from django.utils import timezone
from igny8_core.modules.planner.models import Clusters
# Check daily cluster limit
now = timezone.now()
start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0)
clusters_today = Clusters.objects.filter(
account=account,
created_at__gte=start_of_day
).count()
if plan.daily_cluster_limit and clusters_today >= plan.daily_cluster_limit:
return {
'valid': False,
'error': f'Daily cluster limit reached ({plan.daily_cluster_limit} clusters per day). Please try again tomorrow.'
}
# Check max clusters limit
total_clusters = Clusters.objects.filter(account=account).count()
if plan.max_clusters and total_clusters >= plan.max_clusters:
return {
'valid': False,
'error': f'Maximum cluster limit reached ({plan.max_clusters} clusters). Please upgrade your plan or delete existing clusters.'
}
else:
return {'valid': False, 'error': 'Account does not have an active plan'}
# Check plan limits
limit_result = validate_cluster_limits(account, operation_type='cluster')
if not limit_result['valid']:
return limit_result
return {'valid': True}
@@ -115,20 +91,18 @@ class AutoClusterFunction(BaseAIFunction):
}
def build_prompt(self, data: Dict, account=None) -> str:
"""Build clustering prompt"""
"""Build clustering prompt using registry"""
keyword_data = data['keyword_data']
sector_id = data.get('sector_id')
# Get prompt template
prompt_template = get_prompt_value(account, 'clustering')
# Format keywords
keywords_text = '\n'.join([
f"- {kw['keyword']} (Volume: {kw['volume']}, Difficulty: {kw['difficulty']}, Intent: {kw['intent']})"
for kw in keyword_data
])
prompt = prompt_template.replace('[IGNY8_KEYWORDS]', keywords_text)
# Build context
context = {'KEYWORDS': keywords_text}
# Add sector context if available
if sector_id:
@@ -136,14 +110,26 @@ class AutoClusterFunction(BaseAIFunction):
from igny8_core.auth.models import Sector
sector = Sector.objects.get(id=sector_id)
if sector:
prompt += f"\n\nNote: These keywords are for the '{sector.name}' sector."
context['SECTOR'] = sector.name
except Exception:
pass
# Get prompt from registry
prompt = PromptRegistry.get_prompt(
function_name='auto_cluster',
account=account,
context=context
)
# Verify placeholder replacement
if '[IGNY8_KEYWORDS]' in prompt:
logger.error(f"[IGNY8_KEYWORDS] placeholder NOT replaced! Prompt length: {len(prompt)}")
else:
logger.info(f"Prompt placeholder replaced successfully. Prompt length: {len(prompt)}, Keywords text length: {len(keywords_text)}")
# IMPORTANT: When using JSON mode, OpenAI requires explicit JSON instruction
# The prompt template already includes "Format the output as a JSON object"
# but we need to ensure it's explicit for JSON mode compliance
# Check if prompt already explicitly requests JSON (case-insensitive)
prompt_lower = prompt.lower()
has_json_request = (
'json' in prompt_lower and
@@ -158,7 +144,7 @@ class AutoClusterFunction(BaseAIFunction):
def parse_response(self, response: str, step_tracker=None) -> List[Dict]:
"""Parse AI response into cluster data"""
import json
from igny8_core.ai.processor import AIProcessor
from igny8_core.ai.ai_core import AICore
if not response or not response.strip():
error_msg = "Empty response from AI"
@@ -172,8 +158,8 @@ class AutoClusterFunction(BaseAIFunction):
except json.JSONDecodeError as e:
logger.warning(f"parse_response: Direct JSON parse failed: {e}, trying extract_json method")
# Fall back to extract_json method which handles markdown code blocks
processor = AIProcessor()
json_data = processor.extract_json(response)
ai_core = AICore(account=getattr(self, 'account', None))
json_data = ai_core.extract_json(response)
if not json_data:
error_msg = f"Failed to parse clustering response. Response: {response[:200]}..."

View File

@@ -0,0 +1,337 @@
"""
Generate Content AI Function
Extracted from modules/writer/tasks.py
"""
import logging
import re
from typing import Dict, List, Any
from django.db import transaction
from igny8_core.ai.base import BaseAIFunction
from igny8_core.modules.writer.models import Tasks
from igny8_core.ai.ai_core import AICore
from igny8_core.ai.validators import validate_tasks_exist
from igny8_core.ai.prompts import PromptRegistry
from igny8_core.ai.settings import get_model_config
logger = logging.getLogger(__name__)
class GenerateContentFunction(BaseAIFunction):
    """Generate article content for writer Tasks using the AI pipeline.

    Lifecycle (as driven by callers such as ``generate_content_core``):
    ``validate()`` -> ``prepare()`` -> ``build_prompt()`` (per task) ->
    ``parse_response()`` -> ``save_output()``.  The AI may reply with a
    structured JSON object (content plus SEO fields) or with plain text;
    both forms are accepted.
    """

    def get_name(self) -> str:
        # Registry key used for prompt lookup, model config and logging.
        return 'generate_content'

    def get_metadata(self) -> Dict:
        """Return display metadata; phase labels feed the UI progress modal."""
        return {
            'display_name': 'Generate Content',
            'description': 'Generate article content from task ideas',
            'phases': {
                'INIT': 'Initializing content generation...',
                'PREP': 'Loading tasks and building prompts...',
                'AI_CALL': 'Generating content with AI...',
                'PARSE': 'Processing content...',
                'SAVE': 'Saving content...',
                'DONE': 'Content generated!'
            }
        }

    def get_max_items(self) -> int:
        return 50  # Max tasks per batch

    def validate(self, payload: dict, account=None) -> Dict:
        """Validate task IDs.

        Runs base validation, then confirms the requested tasks exist
        (scoped to ``account`` when given).  Returns ``{'valid': bool}``
        with an ``'error'`` message on failure.
        """
        result = super().validate(payload, account)
        if not result['valid']:
            return result
        # Check tasks exist
        task_ids = payload.get('ids', [])
        if task_ids:
            task_result = validate_tasks_exist(task_ids, account)
            if not task_result['valid']:
                return task_result
        return {'valid': True}

    def prepare(self, payload: dict, account=None) -> List:
        """Load tasks with all relationships.

        Raises:
            ValueError: if no matching tasks are found.
        """
        task_ids = payload.get('ids', [])
        queryset = Tasks.objects.filter(id__in=task_ids)
        if account:
            queryset = queryset.filter(account=account)
        # Preload all relationships to avoid N+1 queries
        tasks = list(queryset.select_related(
            'account', 'site', 'sector', 'cluster', 'idea'
        ))
        if not tasks:
            raise ValueError("No tasks found")
        return tasks

    def build_prompt(self, data: Any, account=None) -> str:
        """Build the content-generation prompt for a single task via the registry.

        Accepts either a single task or a list (only the first element is
        used — callers invoke this once per task).
        """
        if isinstance(data, list):
            # For now, handle single task (will be called per task)
            if not data:
                raise ValueError("No tasks provided")
            task = data[0]
        else:
            task = data
        account = account or task.account
        # Build idea data string
        idea_data = f"Title: {task.title or 'Untitled'}\n"
        if task.description:
            idea_data += f"Description: {task.description}\n"
        # Handle idea description (might be JSON or plain text).  A JSON
        # outline ({"H2": [...]}) is flattened into a readable markdown-ish
        # outline; anything unparseable is passed through verbatim.
        if task.idea and task.idea.description:
            description = task.idea.description
            try:
                import json
                parsed_desc = json.loads(description)
                if isinstance(parsed_desc, dict):
                    formatted_desc = "Content Outline:\n\n"
                    if 'H2' in parsed_desc:
                        for h2_section in parsed_desc['H2']:
                            formatted_desc += f"## {h2_section.get('heading', '')}\n"
                            if 'subsections' in h2_section:
                                for h3_section in h2_section['subsections']:
                                    formatted_desc += f"### {h3_section.get('subheading', '')}\n"
                                    formatted_desc += f"Content Type: {h3_section.get('content_type', '')}\n"
                                    formatted_desc += f"Details: {h3_section.get('details', '')}\n\n"
                    description = formatted_desc
            except (json.JSONDecodeError, TypeError):
                pass  # Use as plain text
            idea_data += f"Outline: {description}\n"
        if task.idea:
            # Idea-level values take precedence over task-level ones.
            idea_data += f"Structure: {task.idea.content_structure or task.content_structure or 'blog_post'}\n"
            idea_data += f"Type: {task.idea.content_type or task.content_type or 'blog_post'}\n"
            if task.idea.estimated_word_count:
                idea_data += f"Estimated Word Count: {task.idea.estimated_word_count}\n"
        # Build cluster data string
        cluster_data = ''
        if task.cluster:
            cluster_data = f"Cluster Name: {task.cluster.name or ''}\n"
            if task.cluster.description:
                cluster_data += f"Description: {task.cluster.description}\n"
            cluster_data += f"Status: {task.cluster.status or 'active'}\n"
        # Build keywords string; fall back to the idea's target keywords.
        keywords_data = task.keywords or ''
        if not keywords_data and task.idea:
            keywords_data = task.idea.target_keywords or ''
        # Get prompt from registry with context
        prompt = PromptRegistry.get_prompt(
            function_name='generate_content',
            account=account,
            task=task,
            context={
                'IDEA': idea_data,
                'CLUSTER': cluster_data,
                'KEYWORDS': keywords_data,
            }
        )
        return prompt

    def parse_response(self, response: str, step_tracker=None) -> Dict:
        """Parse the AI response — structured JSON or plain text.

        Returns a dict: the parsed JSON object when the response is a JSON
        dict, otherwise ``{'content': <normalized text>}``.
        """
        import json
        # Try to parse as JSON first
        try:
            parsed_json = json.loads(response.strip())
            if isinstance(parsed_json, dict):
                # It's a JSON object with structured data
                return parsed_json
        except (json.JSONDecodeError, ValueError):
            pass
        # If not JSON, treat as plain content and normalize
        try:
            from igny8_core.utils.content_normalizer import normalize_content
            normalized = normalize_content(response)
            content_text = normalized['normalized_content']
            # Return as dict with content field for consistency
            return {'content': content_text}
        except Exception as e:
            # Best-effort: normalization failure must not lose the content.
            logger.warning(f"Content normalization failed: {e}, using original")
            return {'content': response}

    def save_output(
        self,
        parsed: Any,
        original_data: Any,
        account=None,
        progress_tracker=None,
        step_tracker=None
    ) -> Dict:
        """Save generated content (and SEO fields) onto the task.

        ``parsed`` is either a dict (structured JSON response) or a string
        (legacy plain text).  Fallback ordering is deliberate: AI-provided
        values win; otherwise existing task fields are preserved, and
        meta_title/meta_description fall back to title/description.
        Sets the task status to ``'draft'`` and saves it.
        """
        if isinstance(original_data, list):
            task = original_data[0] if original_data else None
        else:
            task = original_data
        if not task:
            raise ValueError("No task provided for saving")
        # Handle parsed response - can be dict (JSON) or string (plain text)
        if isinstance(parsed, dict):
            # JSON response with structured fields
            content = parsed.get('content', '')
            title = parsed.get('title', task.title)
            meta_title = parsed.get('meta_title', title or task.title)
            meta_description = parsed.get('meta_description', '')
            word_count = parsed.get('word_count', 0)
            primary_keyword = parsed.get('primary_keyword', '')
            secondary_keywords = parsed.get('secondary_keywords', [])
            tags = parsed.get('tags', [])
            categories = parsed.get('categories', [])
        else:
            # Plain text response (legacy)
            content = str(parsed)
            title = task.title
            meta_title = task.title
            meta_description = (task.description or '')[:160] if task.description else ''
            word_count = 0
            primary_keyword = ''
            secondary_keywords = []
            tags = []
            categories = []
        # Calculate word count if not provided (strip HTML tags first)
        if not word_count and content:
            text_for_counting = re.sub(r'<[^>]+>', '', content)
            word_count = len(text_for_counting.split())
        # Update task with all fields
        if content:
            task.content = content
        if title and title != task.title:
            task.title = title
        task.word_count = word_count
        # SEO fields
        if meta_title:
            task.meta_title = meta_title
        elif not task.meta_title:
            task.meta_title = task.title  # Fallback to title
        if meta_description:
            task.meta_description = meta_description
        elif not task.meta_description and task.description:
            task.meta_description = (task.description or '')[:160]  # Fallback to description
        if primary_keyword:
            task.primary_keyword = primary_keyword
        # List-typed SEO fields: non-list values are discarded rather than
        # stored, to keep the JSON columns well-formed.
        if secondary_keywords:
            task.secondary_keywords = secondary_keywords if isinstance(secondary_keywords, list) else []
        if tags:
            task.tags = tags if isinstance(tags, list) else []
        if categories:
            task.categories = categories if isinstance(categories, list) else []
        task.status = 'draft'
        task.save()
        return {
            'count': 1,
            'tasks_updated': 1,
            'word_count': word_count
        }
def generate_content_core(task_ids: List[int], account_id: int = None, progress_callback=None):
    """
    Core logic for generating content (legacy function signature for backward compatibility).
    Can be called with or without Celery.

    Processes each task independently: a failure on one task is logged and
    skipped so the rest of the batch still completes.

    Args:
        task_ids: List of task IDs
        account_id: Account ID for account isolation
        progress_callback: Optional function to call for progress updates
    Returns:
        Dict with 'success', 'tasks_updated', 'message', etc.
    """
    try:
        from igny8_core.auth.models import Account
        account = None
        if account_id:
            account = Account.objects.get(id=account_id)
        # Use the new function class
        fn = GenerateContentFunction()
        fn.account = account
        # Prepare payload and validate it
        payload = {'ids': task_ids}
        validated = fn.validate(payload, account)
        if not validated['valid']:
            return {'success': False, 'error': validated['error']}
        # Prepare data
        tasks = fn.prepare(payload, account)
        # Loop-invariant: the model config and AI client are identical for
        # every task, so build them once instead of per iteration.
        model_config = get_model_config('generate_content')
        ai_core = AICore(account=account)
        tasks_updated = 0
        # Process each task
        for task in tasks:
            # Build prompt for this task
            prompt = fn.build_prompt([task], account)
            # Call AI using centralized request handler
            result = ai_core.run_ai_request(
                prompt=prompt,
                model=model_config.get('model'),
                max_tokens=model_config.get('max_tokens'),
                temperature=model_config.get('temperature'),
                response_format=model_config.get('response_format'),
                function_name='generate_content'
            )
            if result.get('error'):
                logger.error(f"AI error for task {task.id}: {result['error']}")
                continue
            # Fix: previously `result['content']` raised KeyError when the
            # key was missing, aborting the WHOLE batch via the outer
            # except.  Guard it and skip just this task instead.
            raw_content = result.get('content')
            if not raw_content:
                logger.warning(f"Empty AI response for task {task.id}")
                continue
            # Parse response
            content = fn.parse_response(raw_content)
            if not content:
                logger.warning(f"No content generated for task {task.id}")
                continue
            # Save output
            save_result = fn.save_output(content, [task], account)
            tasks_updated += save_result.get('tasks_updated', 0)
        return {
            'success': True,
            'tasks_updated': tasks_updated,
            'message': f'Content generation complete: {tasks_updated} articles generated'
        }
    except Exception as e:
        logger.error(f"Error in generate_content_core: {str(e)}", exc_info=True)
        return {'success': False, 'error': str(e)}

View File

@@ -0,0 +1,330 @@
"""
Generate Ideas AI Function
Extracted from modules/planner/tasks.py
"""
import logging
import json
from typing import Dict, List, Any
from django.db import transaction
from igny8_core.ai.base import BaseAIFunction
from igny8_core.modules.planner.models import Clusters, ContentIdeas
from igny8_core.ai.ai_core import AICore
from igny8_core.ai.validators import validate_cluster_exists, validate_cluster_limits
from igny8_core.ai.tracker import ConsoleStepTracker
from igny8_core.ai.prompts import PromptRegistry
from igny8_core.ai.settings import get_model_config
logger = logging.getLogger(__name__)
class GenerateIdeasFunction(BaseAIFunction):
    """Generate SEO content ideas from keyword clusters using AI.

    Lifecycle: ``validate()`` -> ``prepare()`` -> ``build_prompt()`` ->
    ``parse_response()`` -> ``save_output()``.  The AI returns a JSON
    object with an ``'ideas'`` list; each idea is matched back to its
    source cluster by id, then name, then position.
    """

    def get_name(self) -> str:
        # Registry key used for prompt lookup, model config and logging.
        return 'generate_ideas'

    def get_metadata(self) -> Dict:
        """Return display metadata; phase labels feed the UI progress modal."""
        return {
            'display_name': 'Generate Ideas',
            'description': 'Generate SEO-optimized content ideas from keyword clusters',
            'phases': {
                'INIT': 'Initializing idea generation...',
                'PREP': 'Loading clusters...',
                'AI_CALL': 'Generating ideas with AI...',
                'PARSE': 'Parsing idea data...',
                'SAVE': 'Saving ideas...',
                'DONE': 'Ideas generated!'
            }
        }

    def get_max_items(self) -> int:
        return 10  # Max clusters per idea generation

    def validate(self, payload: dict, account=None) -> Dict:
        """Validate cluster IDs and plan limits.

        NOTE: only the FIRST cluster id is existence-checked (single-cluster
        idea generation); plan limits use operation_type='idea'.
        """
        result = super().validate(payload, account)
        if not result['valid']:
            return result
        # Check cluster exists
        cluster_ids = payload.get('ids', [])
        if cluster_ids:
            cluster_id = cluster_ids[0]  # For single cluster idea generation
            cluster_result = validate_cluster_exists(cluster_id, account)
            if not cluster_result['valid']:
                return cluster_result
        # Check plan limits
        limit_result = validate_cluster_limits(account, operation_type='idea')
        if not limit_result['valid']:
            return limit_result
        return {'valid': True}

    def prepare(self, payload: dict, account=None) -> Dict:
        """Load clusters with their keywords.

        Returns a dict with 'clusters' (ORM objects), 'cluster_data'
        (plain dicts formatted for the AI prompt) and 'account'.

        Raises:
            ValueError: if no cluster IDs are given or none are found.
        """
        cluster_ids = payload.get('ids', [])
        if not cluster_ids:
            raise ValueError("No cluster IDs provided")
        # Support multiple clusters (up to get_max_items())
        queryset = Clusters.objects.filter(id__in=cluster_ids)
        if account:
            queryset = queryset.filter(account=account)
        clusters = list(queryset.select_related('sector', 'account', 'site', 'sector__site').prefetch_related('keywords'))
        if not clusters:
            raise ValueError("No clusters found")
        # Get keywords for each cluster
        from igny8_core.modules.planner.models import Keywords
        cluster_data = []
        for cluster in clusters:
            # Get keywords and extract the keyword text from seed_keyword relationship
            keyword_objects = Keywords.objects.filter(cluster=cluster).select_related('seed_keyword')
            keywords = [kw.seed_keyword.keyword for kw in keyword_objects if kw.seed_keyword]
            cluster_data.append({
                'id': cluster.id,
                'name': cluster.name,
                'description': cluster.description or '',
                'keywords': keywords,
            })
        # Get account from first cluster if not provided
        account = account or (clusters[0].account if clusters else None)
        return {
            'clusters': clusters,  # List of cluster objects
            'cluster_data': cluster_data,  # Formatted data for AI
            'account': account
        }

    def build_prompt(self, data: Dict, account=None) -> str:
        """Build the ideas-generation prompt using the prompt registry."""
        cluster_data = data['cluster_data']
        account = account or data.get('account')
        # Format clusters text (one line per cluster)
        clusters_text = '\n'.join([
            f"Cluster ID: {c.get('id', '')} | Name: {c.get('name', '')} | Description: {c.get('description', '')}"
            for c in cluster_data
        ])
        # Format cluster keywords (one line per cluster)
        cluster_keywords_text = '\n'.join([
            f"Cluster ID: {c.get('id', '')} | Name: {c.get('name', '')} | Keywords: {', '.join(c.get('keywords', []))}"
            for c in cluster_data
        ])
        # Get prompt from registry with context
        prompt = PromptRegistry.get_prompt(
            function_name='generate_ideas',
            account=account,
            context={
                'CLUSTERS': clusters_text,
                'CLUSTER_KEYWORDS': cluster_keywords_text,
            }
        )
        return prompt

    def parse_response(self, response: str, step_tracker=None) -> List[Dict]:
        """Parse the AI response into a list of idea dicts.

        Raises:
            ValueError: if the response cannot be parsed or lacks 'ideas'.
        """
        ai_core = AICore(account=self.account if hasattr(self, 'account') else None)
        json_data = ai_core.extract_json(response)
        if not json_data or 'ideas' not in json_data:
            error_msg = f"Failed to parse ideas response: {response[:200]}..."
            logger.error(error_msg)
            raise ValueError(error_msg)
        return json_data.get('ideas', [])

    def save_output(
        self,
        parsed: List[Dict],
        original_data: Dict,
        account=None,
        progress_tracker=None,
        step_tracker=None
    ) -> Dict:
        """Persist ideas as ContentIdeas rows inside one transaction.

        Each idea is matched to a cluster by AI-provided id, then by name,
        then positionally (idx % len(clusters)) as a last resort; ideas
        with no resolvable cluster or site are skipped with a log entry.
        """
        clusters = original_data['clusters']  # List of cluster objects
        cluster_data = original_data['cluster_data']  # Formatted data for matching
        account = account or original_data.get('account')
        if not account:
            raise ValueError("Account is required for idea creation")
        ideas_created = 0
        with transaction.atomic():
            for idx, idea_data in enumerate(parsed):
                # Find matching cluster by ID or name
                cluster = None
                cluster_id_from_ai = idea_data.get('cluster_id')
                cluster_name = idea_data.get('cluster_name', '')
                # Try to match by ID first
                if cluster_id_from_ai:
                    for c in clusters:
                        if c.id == cluster_id_from_ai:
                            cluster = c
                            break
                # Fallback to name matching
                if not cluster and cluster_name:
                    for c in clusters:
                        if c.name == cluster_name:
                            cluster = c
                            break
                # If still no match, use position-based matching (first idea -> first cluster, etc.)
                if not cluster and len(clusters) > 0:
                    cluster_index = idx % len(clusters)
                    cluster = clusters[cluster_index]
                    logger.warning(f"Cluster not found by ID/name for idea '{idea_data.get('title', 'Untitled')}', using cluster at index {cluster_index}")
                if not cluster:
                    logger.warning(f"Cluster not found for idea '{idea_data.get('title', 'Untitled')}', skipping")
                    continue
                # Ensure site is available (fall back to the sector's site)
                site = cluster.site
                if not site and cluster.sector:
                    site = cluster.sector.site
                if not site:
                    logger.error(f"Site not found for cluster {cluster.id}, cannot create ContentIdeas")
                    continue
                # Handle description - might be dict or string
                description = idea_data.get('description', '')
                if isinstance(description, dict):
                    description = json.dumps(description)
                elif not isinstance(description, str):
                    description = str(description)
                # Handle target_keywords (AI may use either field name)
                target_keywords = idea_data.get('covered_keywords', '') or idea_data.get('target_keywords', '')
                # Create ContentIdeas record
                ContentIdeas.objects.create(
                    idea_title=idea_data.get('title', 'Untitled Idea'),
                    description=description,
                    content_type=idea_data.get('content_type', 'blog_post'),
                    content_structure=idea_data.get('content_structure', 'supporting_page'),
                    target_keywords=target_keywords,
                    keyword_cluster=cluster,
                    estimated_word_count=idea_data.get('estimated_word_count', 1500),
                    status='new',
                    account=account,
                    site=site,
                    sector=cluster.sector,
                )
                ideas_created += 1
        return {
            'count': ideas_created,
            'ideas_created': ideas_created
        }
def generate_ideas_core(cluster_id: int, account_id: int = None, progress_callback=None):
    """
    Core logic for generating ideas (legacy function signature for backward compatibility).
    Can be called with or without Celery.

    Args:
        cluster_id: Cluster ID to generate idea for
        account_id: Account ID for account isolation
        progress_callback: Optional function to call for progress updates
    Returns:
        Dict with 'success', 'idea_created', 'message', etc.
    """
    tracker = ConsoleStepTracker('generate_ideas')
    tracker.init("Task started")
    try:
        from igny8_core.auth.models import Account
        account = None
        if account_id:
            account = Account.objects.get(id=account_id)
        tracker.prep("Loading account and cluster data...")
        # Use the new function class; store account for use in its methods
        fn = GenerateIdeasFunction()
        fn.account = account
        # Prepare payload
        payload = {'ids': [cluster_id]}
        # Validate
        tracker.prep("Validating input...")
        validated = fn.validate(payload, account)
        if not validated['valid']:
            tracker.error('ValidationError', validated['error'])
            return {'success': False, 'error': validated['error']}
        # Prepare data
        tracker.prep("Loading cluster with keywords...")
        data = fn.prepare(payload, account)
        # Build prompt
        tracker.prep("Building prompt...")
        prompt = fn.build_prompt(data, account)
        # Get model config from settings
        model_config = get_model_config('generate_ideas')
        # Call AI using centralized request handler
        ai_core = AICore(account=account)
        result = ai_core.run_ai_request(
            prompt=prompt,
            model=model_config.get('model'),
            max_tokens=model_config.get('max_tokens'),
            temperature=model_config.get('temperature'),
            response_format=model_config.get('response_format'),
            function_name='generate_ideas',
            tracker=tracker
        )
        if result.get('error'):
            # Fix: this failure path previously returned without notifying
            # the tracker, unlike every other failure path in this function.
            tracker.error('AIError', result['error'])
            return {'success': False, 'error': result['error']}
        # Parse response (use .get to avoid KeyError on a malformed result)
        tracker.parse("Parsing AI response...")
        ideas_data = fn.parse_response(result.get('content', ''))
        if not ideas_data:
            tracker.error('ParseError', 'No ideas generated by AI')
            return {'success': False, 'error': 'No ideas generated by AI'}
        tracker.parse(f"Parsed {len(ideas_data)} idea(s)")
        # Take first idea (only used for the human-readable message)
        idea_data = ideas_data[0]
        # Save output
        tracker.save("Saving idea to database...")
        save_result = fn.save_output(ideas_data, data, account)
        tracker.save(f"Saved {save_result['ideas_created']} idea(s)")
        tracker.done(f"Idea '{idea_data.get('title', 'Untitled')}' created successfully")
        return {
            'success': True,
            'idea_created': save_result['ideas_created'],
            'message': f"Idea '{idea_data.get('title', 'Untitled')}' created"
        }
    except Exception as e:
        tracker.error('Exception', str(e), e)
        logger.error(f"Error in generate_ideas_core: {str(e)}", exc_info=True)
        return {'success': False, 'error': str(e)}

View File

@@ -0,0 +1,277 @@
"""
Generate Images AI Function
Extracted from modules/writer/tasks.py
"""
import logging
from typing import Dict, List, Any
from django.db import transaction
from igny8_core.ai.base import BaseAIFunction
from igny8_core.modules.writer.models import Tasks, Images
from igny8_core.ai.ai_core import AICore
from igny8_core.ai.validators import validate_tasks_exist
from igny8_core.ai.prompts import PromptRegistry
from igny8_core.ai.settings import get_model_config
logger = logging.getLogger(__name__)
class GenerateImagesFunction(BaseAIFunction):
    """Generate featured and in-article images for writer Tasks.

    NOTE(contract): unlike sibling functions, ``build_prompt`` here does an
    AI round-trip itself — it asks the model to EXTRACT image prompts from
    the task content and returns a dict of prompts, not a prompt string.
    ``parse_response`` is then a pass-through.
    """

    def get_name(self) -> str:
        # Registry key used for prompt lookup, model config and logging.
        return 'generate_images'

    def get_metadata(self) -> Dict:
        """Return display metadata; phase labels feed the UI progress modal."""
        return {
            'display_name': 'Generate Images',
            'description': 'Generate featured and in-article images for tasks',
            'phases': {
                'INIT': 'Initializing image generation...',
                'PREP': 'Extracting image prompts...',
                'AI_CALL': 'Generating images with AI...',
                'PARSE': 'Processing image URLs...',
                'SAVE': 'Saving images...',
                'DONE': 'Images generated!'
            }
        }

    def get_max_items(self) -> int:
        return 20  # Max tasks per batch

    def validate(self, payload: dict, account=None) -> Dict:
        """Validate task IDs (base validation plus existence check)."""
        result = super().validate(payload, account)
        if not result['valid']:
            return result
        # Check tasks exist
        task_ids = payload.get('ids', [])
        if task_ids:
            task_result = validate_tasks_exist(task_ids, account)
            if not task_result['valid']:
                return task_result
        return {'valid': True}

    def prepare(self, payload: dict, account=None) -> Dict:
        """Load tasks and the account's image-generation settings.

        Settings come from an active IntegrationSettings row of type
        'image_generation'; any lookup failure falls back to defaults
        (provider 'openai', model 'dall-e-3').

        Raises:
            ValueError: if no matching tasks are found.
        """
        task_ids = payload.get('ids', [])
        queryset = Tasks.objects.filter(id__in=task_ids)
        if account:
            queryset = queryset.filter(account=account)
        tasks = list(queryset.select_related('account', 'sector', 'site'))
        if not tasks:
            raise ValueError("No tasks found")
        # Get image generation settings (best-effort; defaults below)
        image_settings = {}
        if account:
            try:
                from igny8_core.modules.system.models import IntegrationSettings
                integration = IntegrationSettings.objects.get(
                    account=account,
                    integration_type='image_generation',
                    is_active=True
                )
                image_settings = integration.config or {}
            except Exception:
                pass
        # Extract settings with defaults; both legacy ('service',
        # 'runwareModel') and current key names are honored.
        provider = image_settings.get('provider') or image_settings.get('service', 'openai')
        if provider == 'runware':
            model = image_settings.get('model') or image_settings.get('runwareModel', 'runware:97@1')
        else:
            model = image_settings.get('model', 'dall-e-3')
        return {
            'tasks': tasks,
            'account': account,
            'provider': provider,
            'model': model,
            'image_type': image_settings.get('image_type', 'realistic'),
            'max_in_article_images': int(image_settings.get('max_in_article_images', 2)),
            'desktop_enabled': image_settings.get('desktop_enabled', True),
            'mobile_enabled': image_settings.get('mobile_enabled', True),
        }

    def build_prompt(self, data: Dict, account=None) -> Dict:
        """Extract image prompts from task content via an AI call.

        Returns a dict with 'featured_prompt' and 'in_article_prompts'.

        Raises:
            ValueError: on missing content, AI error, or unparseable reply.
        """
        task = data.get('task')
        max_images = data.get('max_in_article_images', 2)
        if not task or not task.content:
            raise ValueError("Task has no content")
        # Use AI to extract image prompts
        ai_core = AICore(account=account or data.get('account'))
        account_obj = account or data.get('account')
        # Get prompt from registry
        prompt = PromptRegistry.get_prompt(
            function_name='extract_image_prompts',
            account=account_obj,
            context={
                'title': task.title,
                'content': task.content[:5000],  # Limit content length
                'max_images': max_images
            }
        )
        # Get model config
        model_config = get_model_config('extract_image_prompts')
        # Call AI to extract prompts using centralized request handler
        result = ai_core.run_ai_request(
            prompt=prompt,
            model=model_config.get('model'),
            max_tokens=model_config.get('max_tokens'),
            temperature=model_config.get('temperature'),
            response_format=model_config.get('response_format'),
            function_name='extract_image_prompts'
        )
        if result.get('error'):
            raise ValueError(f"Failed to extract image prompts: {result['error']}")
        # Parse JSON response
        json_data = ai_core.extract_json(result['content'])
        if not json_data:
            raise ValueError("Failed to parse image prompts response")
        return {
            'featured_prompt': json_data.get('featured_prompt', ''),
            'in_article_prompts': json_data.get('in_article_prompts', [])
        }

    def parse_response(self, response: Dict, step_tracker=None) -> Dict:
        """Parse image generation response (already parsed, just return)"""
        return response

    def save_output(
        self,
        parsed: Dict,
        original_data: Dict,
        account=None,
        progress_tracker=None,
        step_tracker=None
    ) -> Dict:
        """Persist one generated image as an Images row.

        ``parsed`` carries 'url' and 'image_type' (one of 'featured',
        'desktop', 'mobile'); ``original_data`` carries the 'task'.

        Raises:
            ValueError: if the task or image URL is missing.
        """
        task = original_data.get('task')
        image_url = parsed.get('url')
        image_type = parsed.get('image_type')  # 'featured', 'desktop', 'mobile'
        if not task or not image_url:
            raise ValueError("Missing task or image URL")
        # Create Images record
        image = Images.objects.create(
            task=task,
            image_url=image_url,
            image_type=image_type,
            account=account or task.account,
            site=task.site,
            sector=task.sector,
        )
        return {
            'count': 1,
            'images_created': 1,
            'image_id': image.id
        }
def generate_images_core(task_ids: List[int], account_id: int = None, progress_callback=None):
    """
    Core logic for generating images (legacy function signature for backward compatibility).
    Can be called with or without Celery.

    Processes each task independently: a failure on one task is logged and
    skipped so the rest of the batch still completes (matching the
    behavior of generate_content_core).

    Args:
        task_ids: List of task IDs
        account_id: Account ID for account isolation
        progress_callback: Optional function to call for progress updates
    Returns:
        Dict with 'success', 'images_created', 'message', etc.
    """
    try:
        from igny8_core.auth.models import Account
        account = None
        if account_id:
            account = Account.objects.get(id=account_id)
        # Use the new function class
        fn = GenerateImagesFunction()
        fn.account = account
        # Prepare payload and validate it
        payload = {'ids': task_ids}
        validated = fn.validate(payload, account)
        if not validated['valid']:
            return {'success': False, 'error': validated['error']}
        # Prepare data
        data = fn.prepare(payload, account)
        tasks = data['tasks']
        # Get prompts from registry (invariant across tasks)
        image_prompt_template = PromptRegistry.get_image_prompt_template(account)
        negative_prompt = PromptRegistry.get_negative_prompt(account)
        ai_core = AICore(account=account)
        images_created = 0
        # Process each task
        for task in tasks:
            if not task.content:
                continue
            # Extract image prompts.  Fix: build_prompt raises ValueError on
            # extraction/parsing failures, which previously aborted the
            # ENTIRE batch via the outer except; skip just this task.
            try:
                prompts_data = fn.build_prompt({'task': task, **data}, account)
            except ValueError as e:
                logger.error(f"Image prompt extraction failed for task {task.id}: {e}")
                continue
            featured_prompt = prompts_data['featured_prompt']
            in_article_prompts = prompts_data['in_article_prompts']
            # Format featured prompt
            formatted_featured = image_prompt_template.format(
                image_type=data['image_type'],
                post_title=task.title,
                image_prompt=featured_prompt
            )
            # Generate featured image using centralized handler
            featured_result = ai_core.generate_image(
                prompt=formatted_featured,
                provider=data['provider'],
                model=data['model'],
                negative_prompt=negative_prompt,
                function_name='generate_images'
            )
            if not featured_result.get('error') and featured_result.get('url'):
                fn.save_output(
                    {'url': featured_result['url'], 'image_type': 'featured'},
                    {'task': task, **data},
                    account
                )
                images_created += 1
            # Generate in-article images (desktop/mobile if enabled)
            # ... (simplified for now, full logic in tasks.py)
        return {
            'success': True,
            'images_created': images_created,
            'message': f'Image generation complete: {images_created} images created'
        }
    except Exception as e:
        logger.error(f"Error in generate_images_core: {str(e)}", exc_info=True)
        return {'success': False, 'error': str(e)}