- Improved progress modal messages in ai/engine.py (Section 4) - Added keywords_count and has_integration to SiteSerializer (Section 6) - Added notificationStore.ts for frontend notifications (Section 8) - Added NotificationDropdownNew component (Section 8) - Added SiteSetupChecklist to SiteCard in compact mode (Section 6) - Updated api.ts Site interface with new fields
655 lines
33 KiB
Python
655 lines
33 KiB
Python
"""
|
|
AI Engine - Central orchestrator for all AI functions
|
|
"""
|
|
import logging
|
|
from typing import Dict, Any, Optional
|
|
from igny8_core.ai.base import BaseAIFunction
|
|
from igny8_core.ai.tracker import StepTracker, ProgressTracker, CostTracker
|
|
from igny8_core.ai.ai_core import AICore
|
|
from igny8_core.ai.settings import get_model_config
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AIEngine:
    """
    Central orchestrator for all AI functions.

    Manages lifecycle, progress, logging, retries, cost tracking.

    ``execute()`` drives a ``BaseAIFunction`` through its phases
    (validate -> prepare -> build_prompt -> AI request -> parse_response ->
    save_output), reports progress through ``ProgressTracker`` / ``StepTracker``
    and records spend in ``CostTracker``. The ``_get_*_message`` helpers only
    build the user-facing progress strings for each phase.

    NOTE(review): no retry logic is visible in this class itself; presumably
    retries are handled by ``AICore`` or by Celery - confirm.
    """
|
|
|
|
def __init__(self, celery_task=None, account=None):
    """Bind the engine to an optional Celery task and account and create its trackers.

    Args:
        celery_task: Task object handed to ``ProgressTracker``; may be ``None``.
        account: Account the AI work is executed (and billed) for; may be ``None``.
    """
    self.task, self.account = celery_task, account
    # Progress updates are reported through the (optional) Celery task.
    self.tracker = ProgressTracker(celery_task)
    # Step log whose metadata accompanies the Celery progress callbacks.
    self.step_tracker = StepTracker('ai_engine')
    self.cost_tracker = CostTracker()
|
|
|
|
def _get_input_description(self, function_name: str, payload: dict, count: int) -> str:
|
|
"""Get user-friendly input description"""
|
|
if function_name == 'auto_cluster':
|
|
return f"{count} keyword{'s' if count != 1 else ''}"
|
|
elif function_name == 'generate_ideas':
|
|
return f"{count} cluster{'s' if count != 1 else ''}"
|
|
elif function_name == 'generate_content':
|
|
return f"{count} article{'s' if count != 1 else ''}"
|
|
elif function_name == 'generate_images':
|
|
return f"{count} image{'s' if count != 1 else ''}"
|
|
elif function_name == 'generate_image_prompts':
|
|
return f"{count} image prompt{'s' if count != 1 else ''}"
|
|
elif function_name == 'optimize_content':
|
|
return f"{count} article{'s' if count != 1 else ''}"
|
|
elif function_name == 'generate_site_structure':
|
|
return "site blueprint"
|
|
return f"{count} item{'s' if count != 1 else ''}"
|
|
|
|
def _build_validation_message(self, function_name: str, payload: dict, count: int, input_description: str) -> str:
|
|
"""Build validation message with item names for better UX"""
|
|
if function_name == 'auto_cluster' and count > 0:
|
|
try:
|
|
from igny8_core.modules.planner.models import Keywords
|
|
ids = payload.get('ids', [])
|
|
keywords = Keywords.objects.filter(id__in=ids, account=self.account).values_list('keyword', flat=True)[:3]
|
|
keyword_list = list(keywords)
|
|
|
|
if len(keyword_list) > 0:
|
|
remaining = count - len(keyword_list)
|
|
if remaining > 0:
|
|
keywords_text = ', '.join(keyword_list)
|
|
return f"Validating {count} keywords for clustering"
|
|
else:
|
|
keywords_text = ', '.join(keyword_list)
|
|
return f"Validating {keywords_text}"
|
|
except Exception as e:
|
|
logger.warning(f"Failed to load keyword names for validation message: {e}")
|
|
elif function_name == 'generate_ideas':
|
|
return f"Analyzing {count} clusters for content opportunities"
|
|
elif function_name == 'generate_content':
|
|
return f"Preparing {count} article{'s' if count != 1 else ''} for generation"
|
|
elif function_name == 'generate_image_prompts':
|
|
return f"Analyzing content for image opportunities"
|
|
elif function_name == 'generate_images':
|
|
return f"Queuing {count} image{'s' if count != 1 else ''} for generation"
|
|
elif function_name == 'optimize_content':
|
|
return f"Analyzing {count} article{'s' if count != 1 else ''} for optimization"
|
|
|
|
# Fallback to simple count message
|
|
return f"Validating {input_description}"
|
|
|
|
def _get_prep_message(self, function_name: str, count: int, data: Any) -> str:
|
|
"""Get user-friendly prep message"""
|
|
if function_name == 'auto_cluster':
|
|
return f"Analyzing keyword relationships for {count} keyword{'s' if count != 1 else ''}"
|
|
elif function_name == 'generate_ideas':
|
|
# Count keywords in clusters if available
|
|
keyword_count = 0
|
|
if isinstance(data, dict) and 'cluster_data' in data:
|
|
for cluster in data['cluster_data']:
|
|
keyword_count += len(cluster.get('keywords', []))
|
|
if keyword_count > 0:
|
|
return f"Mapping {keyword_count} keywords to topic briefs"
|
|
return f"Mapping keywords to topic briefs for {count} cluster{'s' if count != 1 else ''}"
|
|
elif function_name == 'generate_content':
|
|
return f"Building content brief{'s' if count != 1 else ''} with target keywords"
|
|
elif function_name == 'generate_images':
|
|
return f"Preparing AI image generation ({count} image{'s' if count != 1 else ''})"
|
|
elif function_name == 'generate_image_prompts':
|
|
# Extract max_images from data if available
|
|
if isinstance(data, list) and len(data) > 0:
|
|
max_images = data[0].get('max_images')
|
|
total_images = 1 + max_images # 1 featured + max_images in-article
|
|
return f"Identifying 1 featured + {max_images} in-article image slots"
|
|
elif isinstance(data, dict) and 'max_images' in data:
|
|
max_images = data.get('max_images')
|
|
total_images = 1 + max_images
|
|
return f"Identifying 1 featured + {max_images} in-article image slots"
|
|
return f"Identifying featured and in-article image slots"
|
|
elif function_name == 'optimize_content':
|
|
return f"Analyzing SEO factors for {count} article{'s' if count != 1 else ''}"
|
|
elif function_name == 'generate_site_structure':
|
|
blueprint_name = ''
|
|
if isinstance(data, dict):
|
|
blueprint = data.get('blueprint')
|
|
if blueprint and getattr(blueprint, 'name', None):
|
|
blueprint_name = f'"{blueprint.name}"'
|
|
return f"Preparing site blueprint {blueprint_name}".strip()
|
|
return f"Preparing {count} item{'s' if count != 1 else ''}"
|
|
|
|
def _get_ai_call_message(self, function_name: str, count: int) -> str:
|
|
"""Get user-friendly AI call message"""
|
|
if function_name == 'auto_cluster':
|
|
return f"Grouping {count} keywords by search intent"
|
|
elif function_name == 'generate_ideas':
|
|
return f"Generating content ideas for {count} cluster{'s' if count != 1 else ''}"
|
|
elif function_name == 'generate_content':
|
|
return f"Writing {count} article{'s' if count != 1 else ''} with AI"
|
|
elif function_name == 'generate_images':
|
|
return f"Generating image{'s' if count != 1 else ''} with AI"
|
|
elif function_name == 'generate_image_prompts':
|
|
return f"Creating optimized prompts for {count} image{'s' if count != 1 else ''}"
|
|
elif function_name == 'optimize_content':
|
|
return f"Optimizing {count} article{'s' if count != 1 else ''} for SEO"
|
|
elif function_name == 'generate_site_structure':
|
|
return "Designing complete site architecture"
|
|
return f"Processing with AI"
|
|
|
|
def _get_parse_message(self, function_name: str) -> str:
|
|
"""Get user-friendly parse message"""
|
|
if function_name == 'auto_cluster':
|
|
return "Organizing semantic clusters"
|
|
elif function_name == 'generate_ideas':
|
|
return "Structuring article outlines"
|
|
elif function_name == 'generate_content':
|
|
return "Formatting HTML content and metadata"
|
|
elif function_name == 'generate_images':
|
|
return "Processing generated images"
|
|
elif function_name == 'generate_image_prompts':
|
|
return "Refining contextual image descriptions"
|
|
elif function_name == 'optimize_content':
|
|
return "Compiling optimization scores"
|
|
elif function_name == 'generate_site_structure':
|
|
return "Compiling site map"
|
|
return "Processing results"
|
|
|
|
def _get_parse_message_with_count(self, function_name: str, count: int) -> str:
|
|
"""Get user-friendly parse message with count"""
|
|
if function_name == 'auto_cluster':
|
|
return f"Organizing {count} semantic cluster{'s' if count != 1 else ''}"
|
|
elif function_name == 'generate_ideas':
|
|
return f"Structuring {count} article outline{'s' if count != 1 else ''}"
|
|
elif function_name == 'generate_content':
|
|
return f"Formatting {count} article{'s' if count != 1 else ''}"
|
|
elif function_name == 'generate_images':
|
|
return f"Processing {count} generated image{'s' if count != 1 else ''}"
|
|
elif function_name == 'generate_image_prompts':
|
|
# Count is total prompts, in-article is count - 1 (subtract featured)
|
|
in_article_count = max(0, count - 1)
|
|
if in_article_count > 0:
|
|
return f"Refining {in_article_count} in-article image description{'s' if in_article_count != 1 else ''}"
|
|
return "Refining image descriptions"
|
|
elif function_name == 'optimize_content':
|
|
return f"Compiling scores for {count} article{'s' if count != 1 else ''}"
|
|
elif function_name == 'generate_site_structure':
|
|
return f"{count} page blueprint{'s' if count != 1 else ''} mapped"
|
|
return f"{count} item{'s' if count != 1 else ''} processed"
|
|
|
|
def _get_save_message(self, function_name: str, count: int) -> str:
|
|
"""Get user-friendly save message"""
|
|
if function_name == 'auto_cluster':
|
|
return f"Saving {count} cluster{'s' if count != 1 else ''} with keywords"
|
|
elif function_name == 'generate_ideas':
|
|
return f"Saving {count} idea{'s' if count != 1 else ''} with outlines"
|
|
elif function_name == 'generate_content':
|
|
return f"Saving {count} article{'s' if count != 1 else ''}"
|
|
elif function_name == 'generate_images':
|
|
return f"Uploading {count} image{'s' if count != 1 else ''} to media library"
|
|
elif function_name == 'generate_image_prompts':
|
|
in_article = max(0, count - 1)
|
|
return f"Assigning {count} prompts (1 featured + {in_article} in-article)"
|
|
elif function_name == 'optimize_content':
|
|
return f"Saving optimization scores for {count} article{'s' if count != 1 else ''}"
|
|
elif function_name == 'generate_site_structure':
|
|
return f"Publishing {count} page blueprint{'s' if count != 1 else ''}"
|
|
return f"Saving {count} item{'s' if count != 1 else ''}"
|
|
|
|
def _get_done_message(self, function_name: str, result: dict) -> str:
|
|
"""Get user-friendly completion message with counts"""
|
|
count = result.get('count', 0)
|
|
|
|
if function_name == 'auto_cluster':
|
|
keyword_count = result.get('keywords_clustered', 0)
|
|
return f"✓ Organized {keyword_count} keywords into {count} semantic cluster{'s' if count != 1 else ''}"
|
|
elif function_name == 'generate_ideas':
|
|
return f"✓ Created {count} content idea{'s' if count != 1 else ''} with detailed outlines"
|
|
elif function_name == 'generate_content':
|
|
total_words = result.get('total_words', 0)
|
|
if total_words > 0:
|
|
return f"✓ Generated {count} article{'s' if count != 1 else ''} ({total_words:,} words)"
|
|
return f"✓ Generated {count} article{'s' if count != 1 else ''}"
|
|
elif function_name == 'generate_images':
|
|
return f"✓ Generated and saved {count} AI image{'s' if count != 1 else ''}"
|
|
elif function_name == 'generate_image_prompts':
|
|
in_article = max(0, count - 1)
|
|
return f"✓ Created {count} image prompt{'s' if count != 1 else ''} (1 featured + {in_article} in-article)"
|
|
elif function_name == 'optimize_content':
|
|
avg_score = result.get('average_score', 0)
|
|
if avg_score > 0:
|
|
return f"✓ Optimized {count} article{'s' if count != 1 else ''} (avg score: {avg_score}%)"
|
|
return f"✓ Optimized {count} article{'s' if count != 1 else ''}"
|
|
elif function_name == 'generate_site_structure':
|
|
return f"✓ Created {count} page blueprint{'s' if count != 1 else ''}"
|
|
return f"✓ {count} item{'s' if count != 1 else ''} completed"
|
|
|
|
def execute(self, fn: BaseAIFunction, payload: dict) -> dict:
    """
    Unified execution pipeline for all AI functions.

    Phases with improved percentage mapping:
    - INIT (0-10%): Validation & preparation
    - PREP (10-25%): Data loading & prompt building
    - AI_CALL (25-70%): API call to provider (longest phase)
    - PARSE (70-85%): Response parsing
    - SAVE (85-98%): Database operations
    - DONE (98-100%): Finalization

    Credits are checked before the AI call and deducted after a successful
    save; failures of the credit service itself are logged and do not abort
    the run (only ``InsufficientCreditsError`` during the check does).

    Args:
        fn: AI function to run. The pipeline calls its ``get_name``,
            ``validate``, ``prepare``, ``build_prompt``, ``parse_response``
            and ``save_output`` methods.
        payload: Request payload. Only the optional ``'ids'`` list is read
            here; the payload is otherwise passed through to ``fn``.

    Returns:
        On success ``{'success': True, **save_result, 'request_steps': ...,
        'response_steps': ..., 'cost': ..., 'tokens': ...}``; on any failure
        the dict built by ``_handle_error`` (``'success': False``). Exceptions
        raised inside the pipeline are caught and converted into that dict;
        only ``fn.get_name()`` is called outside the guard.
    """
    function_name = fn.get_name()
    self.step_tracker.function_name = function_name

    try:
        # Phase 1: INIT - Validation & Setup (0-10%)
        # Extract input data for user-friendly messages
        ids = payload.get('ids', [])
        input_count = len(ids) if ids else 0
        input_description = self._get_input_description(function_name, payload, input_count)

        validated = fn.validate(payload, self.account)
        if not validated['valid']:
            return self._handle_error(validated['error'], fn)

        # Build validation message with keyword names for auto_cluster
        validation_message = self._build_validation_message(function_name, payload, input_count, input_description)
        self.step_tracker.add_request_step("INIT", "success", validation_message)
        self.tracker.update("INIT", 10, validation_message, meta=self.step_tracker.get_meta())

        # Phase 2: PREP - Data Loading & Prompt Building (10-25%)
        data = fn.prepare(payload, self.account)
        if isinstance(data, (list, tuple)):
            data_count = len(data)
        elif isinstance(data, dict):
            # Check for cluster_data (for generate_ideas) or keywords (for auto_cluster)
            if 'cluster_data' in data:
                data_count = len(data['cluster_data'])
            elif 'keywords' in data:
                data_count = len(data['keywords'])
            else:
                data_count = data.get('count', input_count)
        else:
            data_count = input_count

        prep_message = self._get_prep_message(function_name, data_count, data)

        prompt = fn.build_prompt(data, self.account)

        self.step_tracker.add_request_step("PREP", "success", prep_message)
        self.tracker.update("PREP", 25, prep_message, meta=self.step_tracker.get_meta())

        # Phase 2.5: CREDIT CHECK - Check credits before AI call (25%)
        if self.account:
            try:
                from igny8_core.business.billing.services.credit_service import CreditService
                from igny8_core.business.billing.exceptions import InsufficientCreditsError

                # Map function name to operation type
                operation_type = self._get_operation_type(function_name)

                # Calculate estimated cost
                estimated_amount = self._get_estimated_amount(function_name, data, payload)

                # Check credits BEFORE AI call
                CreditService.check_credits(self.account, operation_type, estimated_amount)

                logger.info(f"[AIEngine] Credit check passed: {operation_type}, estimated amount: {estimated_amount}")
            except InsufficientCreditsError as e:
                error_msg = str(e)
                error_type = 'InsufficientCreditsError'
                logger.error(f"[AIEngine] {error_msg}")
                return self._handle_error(error_msg, fn, error_type=error_type)
            except Exception as e:
                logger.warning(f"[AIEngine] Failed to check credits: {e}", exc_info=True)
                # Don't fail the operation if credit check fails (for backward compatibility)

        # Phase 3: AI_CALL - Provider API Call (25-70%)
        # Validate account exists before proceeding
        if not self.account:
            error_msg = "Account is required for AI function execution"
            logger.error(f"[AIEngine] {error_msg}")
            return self._handle_error(error_msg, fn)

        ai_core = AICore(account=self.account)

        # Generate function_id for tracking (ai-{function_name}-01)
        # Normalize underscores to hyphens to match frontend tracking IDs
        function_id_base = function_name.replace('_', '-')
        function_id = f"ai-{function_id_base}-01-desktop"

        # Get model config from settings (requires account)
        # This will raise ValueError if IntegrationSettings not configured
        try:
            model_config = get_model_config(function_name, account=self.account)
            model = model_config.get('model')
        except ValueError as e:
            # IntegrationSettings not configured or model missing
            error_msg = str(e)
            error_type = 'ConfigurationError'
            logger.error(f"[AIEngine] {error_msg}")
            return self._handle_error(error_msg, fn, error_type=error_type)
        except Exception as e:
            # Other unexpected errors
            error_msg = f"Failed to get model configuration: {str(e)}"
            error_type = type(e).__name__
            logger.error(f"[AIEngine] {error_msg}", exc_info=True)
            return self._handle_error(error_msg, fn, error_type=error_type)

        # Debug logging: Show model configuration (console only, not in step tracker)
        logger.info(f"[AIEngine] Model Configuration for {function_name}:")
        logger.info(f" - Model from get_model_config: {model}")
        logger.info(f" - Full model_config: {model_config}")

        # Track AI call start with user-friendly message
        ai_call_message = self._get_ai_call_message(function_name, data_count)
        self.step_tracker.add_response_step("AI_CALL", "success", ai_call_message)
        self.tracker.update("AI_CALL", 50, ai_call_message, meta=self.step_tracker.get_meta())

        try:
            # Use centralized run_ai_request()
            raw_response = ai_core.run_ai_request(
                prompt=prompt,
                model=model,
                max_tokens=model_config.get('max_tokens'),
                temperature=model_config.get('temperature'),
                response_format=model_config.get('response_format'),
                function_name=function_name,
                function_id=function_id  # Pass function_id for tracking
            )
        except Exception as e:
            error_msg = f"AI call failed: {str(e)}"
            logger.error(f"Exception during AI call: {error_msg}", exc_info=True)
            return self._handle_error(error_msg, fn)

        if raw_response.get('error'):
            error_msg = raw_response.get('error', 'Unknown AI error')
            logger.error(f"AI call returned error: {error_msg}")
            return self._handle_error(error_msg, fn)

        if not raw_response.get('content'):
            error_msg = "AI call returned no content"
            logger.error(error_msg)
            return self._handle_error(error_msg, fn)

        # Track cost
        self.cost_tracker.record(
            function_name=function_name,
            cost=raw_response.get('cost', 0),
            tokens=raw_response.get('total_tokens', 0),
            model=raw_response.get('model')
        )

        # Update AI_CALL step with results
        self.step_tracker.response_steps[-1] = {
            **self.step_tracker.response_steps[-1],
            'message': f"Received {raw_response.get('total_tokens', 0)} tokens, Cost: ${raw_response.get('cost', 0):.6f}",
            'duration': raw_response.get('duration')
        }
        self.tracker.update("AI_CALL", 70, f"AI response received ({raw_response.get('total_tokens', 0)} tokens)", meta=self.step_tracker.get_meta())

        # Phase 4: PARSE - Response Parsing (70-85%)
        # Bound before the try so the except handler can always log it.
        response_content = raw_response.get('content', '')
        try:
            parsed = fn.parse_response(response_content, self.step_tracker)

            if isinstance(parsed, (list, tuple)):
                parsed_count = len(parsed)
            elif isinstance(parsed, dict):
                # Check if it's a content dict (has 'content' field) or a result dict (has 'count')
                if 'content' in parsed:
                    parsed_count = 1  # Single content item
                else:
                    parsed_count = parsed.get('count', 1)
            else:
                parsed_count = 1

            # Parse message includes the count for better UX
            parse_message = self._get_parse_message_with_count(function_name, parsed_count)

            self.step_tracker.add_response_step("PARSE", "success", parse_message)
            self.tracker.update("PARSE", 85, parse_message, meta=self.step_tracker.get_meta())
        except Exception as parse_error:
            error_msg = f"Failed to parse AI response: {str(parse_error)}"
            logger.error(f"AIEngine: {error_msg}", exc_info=True)
            logger.error(f"AIEngine: Response content was: {response_content[:500] if response_content else 'None'}...")
            return self._handle_error(error_msg, fn)

        # Phase 5: SAVE - Database Operations (85-98%)
        save_result = fn.save_output(parsed, data, self.account, self.tracker, step_tracker=self.step_tracker)
        clusters_created = save_result.get('clusters_created', 0)
        keywords_updated = save_result.get('keywords_updated', 0)
        count = save_result.get('count', 0)

        # Use user-friendly save message based on function type
        if clusters_created:
            save_msg = f"Saving {clusters_created} cluster{'s' if clusters_created != 1 else ''}"
        elif count:
            save_msg = self._get_save_message(function_name, count)
        else:
            save_msg = self._get_save_message(function_name, data_count)

        self.step_tracker.add_request_step("SAVE", "success", save_msg)
        self.tracker.update("SAVE", 98, save_msg, meta=self.step_tracker.get_meta())

        # Phase 5.5: DEDUCT CREDITS - Deduct credits after successful save
        if self.account and raw_response:
            try:
                from igny8_core.business.billing.services.credit_service import CreditService
                from igny8_core.business.billing.exceptions import InsufficientCreditsError

                # Map function name to operation type
                operation_type = self._get_operation_type(function_name)

                # Get actual token usage from response (AI returns 'input_tokens' and 'output_tokens')
                tokens_input = raw_response.get('input_tokens', 0)
                tokens_output = raw_response.get('output_tokens', 0)

                # Deduct credits based on actual token usage
                CreditService.deduct_credits_for_operation(
                    account=self.account,
                    operation_type=operation_type,
                    tokens_input=tokens_input,
                    tokens_output=tokens_output,
                    cost_usd=raw_response.get('cost'),
                    model_used=raw_response.get('model', ''),
                    related_object_type=self._get_related_object_type(function_name),
                    related_object_id=save_result.get('id') or save_result.get('cluster_id') or save_result.get('task_id'),
                    metadata={
                        'function_name': function_name,
                        'clusters_created': clusters_created,
                        'keywords_updated': keywords_updated,
                        'count': count,
                        # Unpacked last, so save_result values win on key clashes.
                        **save_result
                    }
                )

                logger.info(
                    f"[AIEngine] Credits deducted: {operation_type}, "
                    f"tokens: {tokens_input + tokens_output} ({tokens_input} in, {tokens_output} out)"
                )
            except InsufficientCreditsError as e:
                # This shouldn't happen since we checked before, but log it
                logger.error(f"[AIEngine] Insufficient credits during deduction: {e}")
            except Exception as e:
                logger.warning(f"[AIEngine] Failed to deduct credits: {e}", exc_info=True)
                # Don't fail the operation if credit deduction fails (for backward compatibility)

        # Phase 6: DONE - Finalization (98-100%)
        done_msg = self._get_done_message(function_name, save_result)
        self.step_tracker.add_request_step("DONE", "success", done_msg)
        self.tracker.update("DONE", 100, done_msg, meta=self.step_tracker.get_meta())

        # Log to database
        self._log_to_database(fn, payload, parsed, save_result)

        return {
            'success': True,
            **save_result,
            'request_steps': self.step_tracker.request_steps,
            'response_steps': self.step_tracker.response_steps,
            'cost': self.cost_tracker.get_total(),
            'tokens': self.cost_tracker.get_total_tokens()
        }

    except Exception as e:
        # Last-resort guard: any unexpected failure becomes an error result.
        error_msg = str(e)
        error_type = type(e).__name__
        logger.error(f"Error in AIEngine.execute for {function_name}: {error_msg}", exc_info=True)
        return self._handle_error(error_msg, fn, exc_info=True, error_type=error_type)
|
|
|
|
def _handle_error(self, error: str, fn: BaseAIFunction = None, exc_info=False, error_type: str = None):
    """Centralized error handling.

    Records the error as a request step, reports it through the progress
    tracker, logs it, writes a task-log row and builds the failure result.

    Args:
        error: Error message (an exception instance is also tolerated when
            deriving the error type).
        fn: AI function that failed, used for its name; may be ``None``.
        exc_info: When truthy, the log entry includes the current traceback.
        error_type: Explicit error type label; takes precedence when given.

    Returns:
        ``{'success': False, 'error', 'error_type', 'request_steps', 'response_steps'}``.
    """
    function_name = 'unknown' if not fn else fn.get_name()

    # Determine error type: explicit label, then exception class, then generic.
    if error_type:
        final_error_type = error_type
    else:
        final_error_type = type(error).__name__ if isinstance(error, Exception) else 'Error'

    self.step_tracker.add_request_step("Error", "error", error, error=error)

    error_meta = {'error': error, 'error_type': final_error_type}
    # Tracker metadata is merged last, so it wins on key clashes.
    error_meta.update(self.step_tracker.get_meta())
    self.tracker.error(error, meta=error_meta)

    log_line = f"Error in {function_name}: {error}"
    if exc_info:
        logger.error(log_line, exc_info=True)
    else:
        logger.error(log_line)

    self._log_to_database(fn, None, None, None, error=error)

    return {
        'success': False,
        'error': error,
        'error_type': final_error_type,
        'request_steps': self.step_tracker.request_steps,
        'response_steps': self.step_tracker.response_steps
    }
|
|
|
|
def _log_to_database(
    self,
    fn: BaseAIFunction = None,
    payload: dict = None,
    parsed: Any = None,
    save_result: dict = None,
    error: str = None
):
    """Write one ``AITaskLog`` row describing the current run.

    Best effort: nothing is written without an account, and any failure is
    logged as a warning instead of being raised.

    Args:
        fn: AI function that ran, used for its name; may be ``None``.
        payload: Request payload stored with the row.
        parsed: Parsed AI response (accepted for signature parity; not stored).
        save_result: Result of the save phase, stored as ``result``.
        error: Error message; its presence sets the row status to ``'error'``.
    """
    try:
        from igny8_core.ai.models import AITaskLog

        # Only log if account exists (AITaskLog requires account)
        if not self.account:
            logger.warning("Cannot log AI task - no account available")
            return

        log_fields = dict(
            task_id=self.task.request.id if self.task else None,
            function_name=fn.get_name() if fn else None,
            account=self.account,
            phase=self.tracker.current_phase,
            message=self.tracker.current_message,
            status='error' if error else 'success',
            duration=self.tracker.get_duration(),
            cost=self.cost_tracker.get_total(),
            tokens=self.cost_tracker.get_total_tokens(),
            request_steps=self.step_tracker.request_steps,
            response_steps=self.step_tracker.response_steps,
            error=error,
            payload=payload,
            result=save_result,
        )
        AITaskLog.objects.create(**log_fields)
    except Exception as exc:
        # Don't fail the task if logging fails
        logger.warning(f"Failed to log to database: {exc}")
|
|
|
|
def _get_operation_type(self, function_name):
|
|
"""Map function name to operation type for credit system"""
|
|
mapping = {
|
|
'auto_cluster': 'clustering',
|
|
'generate_ideas': 'idea_generation',
|
|
'generate_content': 'content_generation',
|
|
'generate_image_prompts': 'image_prompt_extraction',
|
|
'generate_images': 'image_generation',
|
|
'generate_site_structure': 'site_structure_generation',
|
|
}
|
|
return mapping.get(function_name, function_name)
|
|
|
|
def _get_estimated_amount(self, function_name, data, payload):
|
|
"""Get estimated amount for credit calculation (before operation)"""
|
|
if function_name == 'generate_content':
|
|
# Estimate word count - tasks don't have word_count field, use default
|
|
# data is a list of Task objects
|
|
if isinstance(data, list) and len(data) > 0:
|
|
# Multiple tasks - estimate 1000 words per task
|
|
return len(data) * 1000
|
|
return 1000 # Default estimate for single item
|
|
elif function_name == 'generate_images':
|
|
# Count images to generate
|
|
if isinstance(payload, dict):
|
|
image_ids = payload.get('image_ids', [])
|
|
return len(image_ids) if image_ids else 1
|
|
return 1
|
|
elif function_name == 'generate_ideas':
|
|
# Count clusters
|
|
if isinstance(data, dict) and 'cluster_data' in data:
|
|
return len(data['cluster_data'])
|
|
return 1
|
|
# For fixed cost operations (clustering, image_prompt_extraction), return None
|
|
return None
|
|
|
|
def _get_actual_amount(self, function_name, save_result, parsed, data):
|
|
"""Get actual amount for credit calculation (after operation)"""
|
|
if function_name == 'generate_content':
|
|
# Get actual word count from saved content
|
|
if isinstance(save_result, dict):
|
|
word_count = save_result.get('word_count')
|
|
if word_count and word_count > 0:
|
|
return word_count
|
|
# Fallback: estimate from parsed content
|
|
if isinstance(parsed, dict) and 'content' in parsed:
|
|
content = parsed['content']
|
|
return len(content.split()) if isinstance(content, str) else 1000
|
|
# Fallback: estimate from html_content if available
|
|
if isinstance(parsed, dict) and 'html_content' in parsed:
|
|
html_content = parsed['html_content']
|
|
if isinstance(html_content, str):
|
|
# Strip HTML tags for word count
|
|
import re
|
|
text = re.sub(r'<[^>]+>', '', html_content)
|
|
return len(text.split())
|
|
return 1000
|
|
elif function_name == 'generate_images':
|
|
# Count successfully generated images
|
|
count = save_result.get('count', 0)
|
|
if count > 0:
|
|
return count
|
|
return 1
|
|
elif function_name == 'generate_ideas':
|
|
# Count ideas generated
|
|
count = save_result.get('count', 0)
|
|
if count > 0:
|
|
return count
|
|
return 1
|
|
# For fixed cost operations, return None
|
|
return None
|
|
|
|
def _get_related_object_type(self, function_name):
|
|
"""Get related object type for credit logging"""
|
|
mapping = {
|
|
'auto_cluster': 'cluster',
|
|
'generate_ideas': 'content_idea',
|
|
'generate_content': 'content',
|
|
'generate_image_prompts': 'image',
|
|
'generate_images': 'image',
|
|
'generate_site_structure': 'site_blueprint',
|
|
}
|
|
return mapping.get(function_name, 'unknown')
|
|
|