Files
igny8/backend/igny8_core/ai/tasks.py
2026-01-10 09:39:17 +00:00

903 lines
45 KiB
Python

"""
Unified Celery task entrypoint for all AI functions
"""
import logging
from celery import shared_task
from igny8_core.ai.engine import AIEngine
from igny8_core.ai.registry import get_function_instance
logger = logging.getLogger(__name__)
def _set_failure_state(task, error_meta: dict) -> None:
    """Best-effort update of a Celery task's state to FAILURE.

    Any exception raised while persisting the state is logged and swallowed
    so the caller can still return a serializable error dict to Celery.
    """
    try:
        task.update_state(state='FAILURE', meta=error_meta)
    except Exception as update_err:
        logger.warning(f"Failed to update task state: {update_err}")


@shared_task(bind=True, max_retries=3)
def run_ai_task(self, function_name: str, payload: dict, account_id: int):
    """
    Single Celery entrypoint for all AI functions.

    Dynamically loads the requested function from the registry and executes
    it through AIEngine, scoped to the given account.

    Args:
        function_name: Name of the AI function (e.g., 'auto_cluster')
        payload: Function-specific payload
        account_id: Account ID for account isolation (required)

    Returns:
        The engine's result dict on success. On any failure, an error dict
        containing at least 'success': False, 'error', and 'error_type'.
        Errors are returned (with the task state explicitly set to FAILURE)
        instead of raised, to avoid Celery result-serialization issues.
    """
    logger.info("=" * 80)
    logger.info(f"run_ai_task STARTED: {function_name}")
    logger.info(f" - Task ID: {self.request.id}")
    logger.info(f" - Function: {function_name}")
    logger.info(f" - Account ID: {account_id}")
    logger.info(f" - Payload keys: {list(payload.keys())}")
    logger.info("=" * 80)
    try:
        # Validate account_id is provided
        if not account_id:
            error_msg = "account_id is required for AI task execution"
            logger.error(f"[run_ai_task] {error_msg}")
            return {
                'success': False,
                'error': error_msg,
                'error_type': 'ConfigurationError'
            }
        # Get account and validate it exists (local import keeps Django
        # model loading out of module import time)
        from igny8_core.auth.models import Account
        try:
            account = Account.objects.get(id=account_id)
        except Account.DoesNotExist:
            error_msg = f"Account {account_id} not found"
            logger.error(f"[run_ai_task] {error_msg}")
            return {
                'success': False,
                'error': error_msg,
                'error_type': 'AccountNotFound'
            }
        # Get function from registry
        fn = get_function_instance(function_name)
        if not fn:
            error_msg = f'Function {function_name} not found in registry'
            logger.error(error_msg)
            return {
                'success': False,
                'error': error_msg
            }
        # Create engine and execute
        engine = AIEngine(celery_task=self, account=account)
        result = engine.execute(fn, payload)
        success = result.get('success')
        logger.info("=" * 80)
        logger.info(f"run_ai_task COMPLETED: {function_name}")
        logger.info(f" - Success: {success}")
        if not success:
            logger.error(f" - Error: {result.get('error')}")
        logger.info("=" * 80)
        if not success:
            # Execution failed: record FAILURE state and return the error
            # payload (don't raise, to avoid serialization issues).
            error_msg = result.get('error', 'Task execution failed')
            error_type = result.get('error_type', 'ExecutionError')
            error_meta = {
                'error': error_msg,
                'error_type': error_type,
                'function_name': function_name,
                'phase': result.get('phase', 'ERROR'),
                'percentage': 0,
                'message': f'Error: {error_msg}',
                'request_steps': result.get('request_steps', []),
                'response_steps': result.get('response_steps', [])
            }
            _set_failure_state(self, error_meta)
            return {'success': False, **error_meta}
        return result
    except Exception as e:
        # Unexpected exception anywhere in the pipeline: log with traceback,
        # mark the task FAILED, and return (never raise) an error dict.
        error_type = type(e).__name__
        error_msg = str(e)
        logger.error("=" * 80)
        logger.error(f"run_ai_task FAILED: {function_name}")
        logger.error(f" - Error: {error_type}: {error_msg}")
        logger.error("=" * 80, exc_info=True)
        error_meta = {
            'error': error_msg,
            'error_type': error_type,
            'function_name': function_name,
            'phase': 'ERROR',
            'percentage': 0,
            'message': f'Error: {error_msg}'
        }
        _set_failure_state(self, error_meta)
        return {'success': False, **error_meta}
@shared_task(bind=True, name='igny8_core.ai.tasks.process_image_generation_queue')
def process_image_generation_queue(self, image_ids: list, account_id: int = None, content_id: int = None):
    """
    Process image generation queue sequentially (one image at a time).

    Updates Celery task meta with progress for each image. For each image
    record: formats a prompt (model-aware length limits), calls the
    configured provider via AICore, downloads the result to local disk,
    saves URL/path/status to the database, and deducts credits on success.

    Args:
        image_ids: IDs of Images records to generate, processed in order.
        account_id: Optional account ID for isolation, credit checks and
            deduction, AITaskLog logging, and notifications.
        content_id: Optional Content ID; when provided and at least one image
            succeeded, the content is moved from 'draft' to 'review' once no
            pending images remain.

    Returns:
        dict with 'success', 'total_images', 'completed', 'failed', and a
        per-image 'results' list ({'image_id', 'status', ...} entries).
    """
    # NOTE(review): List is imported but never used in this function.
    from typing import List
    from igny8_core.modules.writer.models import Images, Content
    from igny8_core.modules.system.models import IntegrationSettings
    from igny8_core.ai.ai_core import AICore
    from igny8_core.ai.prompts import PromptRegistry
    from igny8_core.business.billing.services.credit_service import CreditService
    from igny8_core.business.billing.exceptions import InsufficientCreditsError, CreditCalculationError
    logger.info("=" * 80)
    logger.info(f"process_image_generation_queue STARTED")
    logger.info(f" - Task ID: {self.request.id}")
    logger.info(f" - Image IDs: {image_ids}")
    logger.info(f" - Account ID: {account_id}")
    logger.info(f" - Content ID: {content_id}")
    logger.info("=" * 80)
    # Resolve the account (None is tolerated: credit checks and logging are
    # simply skipped later when account is None).
    account = None
    if account_id:
        from igny8_core.auth.models import Account
        try:
            account = Account.objects.get(id=account_id)
        except Account.DoesNotExist:
            logger.error(f"Account {account_id} not found")
            return {'success': False, 'error': 'Account not found'}
    # Initialize progress tracking
    total_images = len(image_ids)
    completed = 0
    failed = 0
    results = []
    # Get image generation settings from AISettings (with account overrides)
    logger.info("[process_image_generation_queue] Step 1: Loading image generation settings")
    from igny8_core.modules.system.ai_settings import AISettings
    from igny8_core.ai.model_registry import ModelRegistry
    # Get effective settings
    image_type = AISettings.get_effective_image_style(account)
    image_format = 'webp'  # Default format
    # Get default image model from database; fall back to OpenAI DALL-E 3
    # when no default is configured.
    default_model = ModelRegistry.get_default_model('image')
    if default_model:
        model_config = ModelRegistry.get_model(default_model)
        provider = model_config.provider if model_config else 'openai'
        model = default_model
    else:
        provider = 'openai'
        model = 'dall-e-3'
    logger.info(f"[process_image_generation_queue] Using PROVIDER: {provider}, MODEL: {model} from settings")
    # Style to prompt enhancement mapping
    # These style descriptors are added to the image prompt for better results
    STYLE_PROMPT_MAP = {
        # Runware styles
        'photorealistic': 'ultra realistic photography, natural lighting, real world look, photorealistic',
        'illustration': 'digital illustration, clean lines, artistic style, modern illustration',
        '3d_render': 'computer generated 3D render, modern polished 3D style, depth and dramatic lighting',
        'minimal_flat': 'minimal flat design, simple shapes, flat colors, modern graphic design aesthetic',
        'artistic': 'artistic painterly style, expressive brushstrokes, hand painted aesthetic',
        'cartoon': 'cartoon stylized illustration, playful exaggerated forms, animated character style',
        # DALL-E styles (mapped from OpenAI API style parameter)
        'natural': 'natural realistic style',
        'vivid': 'vivid dramatic hyper-realistic style',
        # Legacy fallbacks
        'realistic': 'ultra realistic photography, natural lighting, photorealistic',
    }
    # Get the style description for prompt enhancement
    style_description = STYLE_PROMPT_MAP.get(image_type, STYLE_PROMPT_MAP.get('photorealistic'))
    logger.info(f"[process_image_generation_queue] Style: {image_type} -> prompt enhancement: {style_description[:50]}...")
    # Model-specific landscape sizes (square is always 1024x1024)
    # For Runware models - based on Runware documentation for optimal results per model
    # For OpenAI DALL-E 3 - uses 1792x1024 for landscape
    MODEL_LANDSCAPE_SIZES = {
        'runware:97@1': '1280x768',  # Hi Dream Full landscape
        'bria:10@1': '1344x768',  # Bria 3.2 landscape (16:9)
        'google:4@2': '1376x768',  # Nano Banana landscape (16:9)
        'dall-e-3': '1792x1024',  # DALL-E 3 landscape
        'dall-e-2': '1024x1024',  # DALL-E 2 only supports square
    }
    DEFAULT_SQUARE_SIZE = '1024x1024'
    # Get model-specific landscape size for featured images
    model_landscape_size = MODEL_LANDSCAPE_SIZES.get(model, '1792x1024' if provider == 'openai' else '1280x768')
    # Featured image always uses model-specific landscape size
    featured_image_size = model_landscape_size
    # In-article images: alternating square/landscape based on position (handled in image loop)
    in_article_square_size = DEFAULT_SQUARE_SIZE
    in_article_landscape_size = model_landscape_size
    logger.info(f"[process_image_generation_queue] Settings loaded:")
    logger.info(f" - Provider: {provider}")
    logger.info(f" - Model: {model}")
    logger.info(f" - Image type: {image_type}")
    logger.info(f" - Image format: {image_format}")
    logger.info(f" - Featured image size: {featured_image_size}")
    logger.info(f" - In-article square: {in_article_square_size}, landscape: {in_article_landscape_size}")
    # Get provider API key from IntegrationProvider (centralized)
    logger.info(f"[process_image_generation_queue] Step 2: Loading {provider.upper()} API key from IntegrationProvider")
    # Get API key from IntegrationProvider (centralized)
    api_key = ModelRegistry.get_api_key(provider)
    if not api_key:
        logger.error(f"[process_image_generation_queue] {provider.upper()} API key not configured in IntegrationProvider")
        return {'success': False, 'error': f'{provider.upper()} API key not configured'}
    # Log API key presence (but not the actual key for security)
    api_key_preview = f"{api_key[:10]}...{api_key[-4:]}" if len(api_key) > 14 else "***"
    logger.info(f"[process_image_generation_queue] {provider.upper()} API key retrieved successfully (length: {len(api_key)}, preview: {api_key_preview})")
    # Get image prompt template (has placeholders: {image_type}, {post_title}, {image_prompt})
    try:
        image_prompt_template = PromptRegistry.get_image_prompt_template(account)
    except Exception as e:
        logger.warning(f"Failed to get image prompt template: {e}, using fallback")
        image_prompt_template = '{image_type} image for blog post titled "{post_title}": {image_prompt}'
    # Get negative prompt for Runware (only needed for Runware provider)
    negative_prompt = None
    if provider == 'runware':
        try:
            negative_prompt = PromptRegistry.get_negative_prompt(account)
        except Exception as e:
            logger.warning(f"Failed to get negative prompt: {e}")
            negative_prompt = None
    # Initialize AICore
    ai_core = AICore(account=account)
    # Credit check before processing images. Only InsufficientCreditsError
    # aborts the task; configuration/unexpected errors are logged and the
    # task proceeds (backward compatibility for unconfigured models).
    if account:
        logger.info(f"[process_image_generation_queue] Step 3: Checking credits for {total_images} images")
        try:
            required_credits = CreditService.check_credits_for_image(
                account=account,
                model_name=model,
                num_images=total_images
            )
            logger.info(f"[process_image_generation_queue] Credit check passed: {required_credits} credits required, {account.credits} available")
        except InsufficientCreditsError as e:
            error_msg = str(e)
            logger.error(f"[process_image_generation_queue] Insufficient credits: {error_msg}")
            return {
                'success': False,
                'error': error_msg,
                'error_type': 'InsufficientCreditsError',
                'total_images': total_images,
                'completed': 0,
                'failed': total_images,
                'results': []
            }
        except CreditCalculationError as e:
            # Model not found or no credits_per_image configured - log warning but continue
            # This allows backward compatibility if model not configured
            logger.warning(f"[process_image_generation_queue] Credit calculation warning: {e}")
            logger.warning(f"[process_image_generation_queue] Proceeding without credit check (model may not be configured)")
        except Exception as e:
            # Don't fail for unexpected credit check errors - log and continue
            logger.warning(f"[process_image_generation_queue] Unexpected credit check error: {e}")
    # Process each image sequentially. Each iteration is fully isolated:
    # any per-image exception is caught at the bottom of the loop and
    # recorded as a failed result, never aborting the whole queue.
    for index, image_id in enumerate(image_ids, 1):
        try:
            # Update task meta: current image processing (starting at 0%)
            self.update_state(
                state='PROGRESS',
                meta={
                    'current_image': index,
                    'total_images': total_images,
                    'completed': completed,
                    'failed': failed,
                    'status': 'processing',
                    'current_image_id': image_id,
                    'current_image_progress': 0,
                    'results': results
                }
            )
            # Load image record
            logger.info(f"[process_image_generation_queue] Image {index}/{total_images} (ID: {image_id}): Loading from database")
            try:
                image = Images.objects.get(id=image_id, account=account)
                logger.info(f"[process_image_generation_queue] Image {image_id} loaded:")
                logger.info(f" - Type: {image.image_type}")
                logger.info(f" - Status: {image.status}")
                logger.info(f" - Prompt length: {len(image.prompt) if image.prompt else 0} chars")
                logger.info(f" - Prompt preview: {image.prompt[:100] if image.prompt else 'None'}...")
                logger.info(f" - Content ID: {image.content.id if image.content else 'None'}")
            except Images.DoesNotExist:
                logger.error(f"[process_image_generation_queue] Image {image_id} not found in database")
                logger.error(f"[process_image_generation_queue] Account: {account.id if account else 'None'}")
                results.append({
                    'image_id': image_id,
                    'status': 'failed',
                    'error': 'Image record not found'
                })
                failed += 1
                continue
            except Exception as e:
                logger.error(f"[process_image_generation_queue] ERROR loading image {image_id}: {e}", exc_info=True)
                results.append({
                    'image_id': image_id,
                    'status': 'failed',
                    'error': f'Error loading image: {str(e)[:180]}'
                })
                failed += 1
                continue
            # Check if prompt exists
            if not image.prompt:
                logger.warning(f"Image {image_id} has no prompt")
                results.append({
                    'image_id': image_id,
                    'status': 'failed',
                    'error': 'No prompt found'
                })
                failed += 1
                continue
            # Get content for template formatting
            content = image.content
            if not content:
                logger.warning(f"Image {image_id} has no content")
                results.append({
                    'image_id': image_id,
                    'status': 'failed',
                    'error': 'No content associated'
                })
                failed += 1
                continue
            # Format template with image prompt from database
            # For DALL-E 2: Use image prompt directly (no template), 1000 char limit
            # For DALL-E 3: Use template with placeholders, 4000 char limit
            # CRITICAL: DALL-E 2 has 1000 char limit, DALL-E 3 has 4000 char limit
            image_prompt = image.prompt or ""
            # Determine character limit based on model
            if model == 'dall-e-2':
                max_prompt_length = 1000
            elif model == 'dall-e-3':
                max_prompt_length = 4000
            else:
                # Default to 1000 for safety
                max_prompt_length = 1000
                logger.warning(f"Unknown model '{model}', using 1000 char limit")
            logger.info(f"[process_image_generation_queue] Model: {model}, Max prompt length: {max_prompt_length} chars")
            if model == 'dall-e-2':
                # DALL-E 2: Use image prompt directly, no template
                logger.info(f"[process_image_generation_queue] Using DALL-E 2 - skipping template, using image prompt directly")
                formatted_prompt = image_prompt
                # Truncate to 1000 chars if needed (word-aware when a space
                # falls within the last 10% of the limit)
                if len(formatted_prompt) > max_prompt_length:
                    logger.warning(f"DALL-E 2 prompt too long ({len(formatted_prompt)} chars), truncating to {max_prompt_length}")
                    truncated = formatted_prompt[:max_prompt_length - 3]
                    last_space = truncated.rfind(' ')
                    if last_space > max_prompt_length * 0.9:
                        formatted_prompt = truncated[:last_space] + "..."
                    else:
                        formatted_prompt = formatted_prompt[:max_prompt_length]
            else:
                # DALL-E 3 and others: Use template
                try:
                    # Truncate post_title (max 200 chars for DALL-E 3 to leave room for image_prompt)
                    post_title = content.title or content.meta_title or f"Content #{content.id}"
                    if len(post_title) > 200:
                        post_title = post_title[:197] + "..."
                    # Calculate actual template length with placeholders filled
                    # Format template with dummy values to measure actual length
                    template_with_dummies = image_prompt_template.format(
                        image_type=style_description,  # Use actual style description length
                        post_title='X' * len(post_title),  # Use same length as actual post_title
                        image_prompt=''  # Empty to measure template overhead
                    )
                    template_overhead = len(template_with_dummies)
                    # Calculate max image_prompt length: max_prompt_length - template_overhead - safety margin (50)
                    max_image_prompt_length = max_prompt_length - template_overhead - 50
                    if max_image_prompt_length < 100:
                        # If template is too long, use minimum 100 chars for image_prompt
                        max_image_prompt_length = 100
                        logger.warning(f"Template is very long ({template_overhead} chars), limiting image_prompt to {max_image_prompt_length}")
                    logger.info(f"[process_image_generation_queue] Template overhead: {template_overhead} chars, max image_prompt: {max_image_prompt_length} chars")
                    # Truncate image_prompt to calculated max
                    if len(image_prompt) > max_image_prompt_length:
                        logger.warning(f"Image prompt too long ({len(image_prompt)} chars), truncating to {max_image_prompt_length}")
                        # Word-aware truncation
                        truncated = image_prompt[:max_image_prompt_length - 3]
                        last_space = truncated.rfind(' ')
                        if last_space > max_image_prompt_length * 0.8:  # Only if we have a reasonable space
                            image_prompt = truncated[:last_space] + "..."
                        else:
                            image_prompt = image_prompt[:max_image_prompt_length - 3] + "..."
                    formatted_prompt = image_prompt_template.format(
                        image_type=style_description,  # Use full style description instead of raw value
                        post_title=post_title,
                        image_prompt=image_prompt
                    )
                    # CRITICAL: Final safety check - truncate to model-specific limit
                    if len(formatted_prompt) > max_prompt_length:
                        logger.warning(f"Formatted prompt too long ({len(formatted_prompt)} chars), truncating to {max_prompt_length} for {model}")
                        # Try word-aware truncation
                        truncated = formatted_prompt[:max_prompt_length - 3]
                        last_space = truncated.rfind(' ')
                        if last_space > max_prompt_length * 0.9:  # Only use word-aware if we have a reasonable space
                            formatted_prompt = truncated[:last_space] + "..."
                        else:
                            formatted_prompt = formatted_prompt[:max_prompt_length]  # Hard truncate
                    # Double-check after truncation - MUST be <= max_prompt_length
                    if len(formatted_prompt) > max_prompt_length:
                        logger.error(f"Prompt still too long after truncation ({len(formatted_prompt)} chars), forcing hard truncate to {max_prompt_length}")
                        formatted_prompt = formatted_prompt[:max_prompt_length]
                except Exception as e:
                    # Fallback if template formatting fails
                    logger.warning(f"Prompt template formatting failed: {e}, using image prompt directly")
                    formatted_prompt = image_prompt
                    # CRITICAL: Truncate to model-specific limit even in fallback
                    if len(formatted_prompt) > max_prompt_length:
                        logger.warning(f"Fallback prompt too long ({len(formatted_prompt)} chars), truncating to {max_prompt_length} for {model}")
                        # Try word-aware truncation
                        truncated = formatted_prompt[:max_prompt_length - 3]
                        last_space = truncated.rfind(' ')
                        if last_space > max_prompt_length * 0.9:
                            formatted_prompt = truncated[:last_space] + "..."
                        else:
                            formatted_prompt = formatted_prompt[:max_prompt_length]  # Hard truncate
                    # Final hard truncate if still too long - MUST be <= max_prompt_length
                    if len(formatted_prompt) > max_prompt_length:
                        logger.error(f"Fallback prompt still too long ({len(formatted_prompt)} chars), forcing hard truncate to {max_prompt_length}")
                        formatted_prompt = formatted_prompt[:max_prompt_length]
            # Generate image (using same approach as test image generation)
            logger.info(f"[process_image_generation_queue] Generating image {index}/{total_images} (ID: {image_id})")
            logger.info(f"[process_image_generation_queue] Provider: {provider}, Model: {model}")
            logger.info(f"[process_image_generation_queue] Prompt length: {len(formatted_prompt)} (MUST be <= {max_prompt_length} for {model})")
            if len(formatted_prompt) > max_prompt_length:
                logger.error(f"[process_image_generation_queue] ERROR: Prompt is {len(formatted_prompt)} chars, truncating NOW to {max_prompt_length}!")
                formatted_prompt = formatted_prompt[:max_prompt_length]
            logger.info(f"[process_image_generation_queue] Final prompt length: {len(formatted_prompt)}")
            logger.info(f"[process_image_generation_queue] Image type: {image_type}")
            # Update progress: Starting image generation (0%)
            self.update_state(
                state='PROGRESS',
                meta={
                    'current_image': index,
                    'total_images': total_images,
                    'completed': completed,
                    'failed': failed,
                    'status': 'processing',
                    'current_image_id': image_id,
                    'current_image_progress': 0,
                    'results': results
                }
            )
            # Use appropriate size based on image type and position
            # Featured: Always landscape (model-specific)
            # In-article: Alternating square/landscape based on position
            # Position 0: Square (1024x1024)
            # Position 1: Landscape (model-specific)
            # Position 2: Square (1024x1024)
            # Position 3: Landscape (model-specific)
            if image.image_type == 'featured':
                image_size = featured_image_size  # Model-specific landscape
            elif image.image_type == 'in_article':
                # Alternate based on position: even=square, odd=landscape
                position = image.position or 0
                if position % 2 == 0:  # Position 0, 2: Square
                    image_size = in_article_square_size
                else:  # Position 1, 3: Landscape
                    image_size = in_article_landscape_size
                logger.info(f"[process_image_generation_queue] In-article image position {position}: using {'square' if position % 2 == 0 else 'landscape'} size {image_size}")
            else:  # desktop or other (legacy)
                image_size = in_article_square_size  # Default to square
            # For DALL-E, convert image_type to style parameter
            # image_type is from user settings (e.g., 'vivid', 'natural', 'realistic')
            # DALL-E accepts 'vivid' or 'natural' - map accordingly
            dalle_style = None
            if provider == 'openai':
                # Map image_type to DALL-E style
                # 'natural' = more realistic photos (default)
                # 'vivid' = hyper-real, dramatic images
                if image_type in ['vivid']:
                    dalle_style = 'vivid'
                else:
                    # Default to 'natural' for realistic photos
                    dalle_style = 'natural'
                logger.info(f"[process_image_generation_queue] DALL-E style: {dalle_style} (from image_type: {image_type})")
            result = ai_core.generate_image(
                prompt=formatted_prompt,
                provider=provider,
                model=model,
                size=image_size,
                api_key=api_key,
                negative_prompt=negative_prompt,
                function_name='generate_images_from_prompts',
                style=dalle_style
            )
            # Update progress: Image generation complete (50%)
            self.update_state(
                state='PROGRESS',
                meta={
                    'current_image': index,
                    'total_images': total_images,
                    'completed': completed,
                    'failed': failed,
                    'status': 'processing',
                    'current_image_id': image_id,
                    'current_image_progress': 50,
                    'results': results
                }
            )
            logger.info(f"[process_image_generation_queue] Image generation result: has_url={bool(result.get('url'))}, has_error={bool(result.get('error'))}")
            # Check for errors
            if result.get('error'):
                error_message = result.get('error', 'Unknown error')
                logger.error(f"Image generation failed for {image_id}: {error_message}")
                # Truncate error message to avoid database field length issues
                # Some database fields may have 200 char limit, so truncate to 180 to be safe
                truncated_error = error_message[:180] if len(error_message) > 180 else error_message
                # Update image record: failed
                try:
                    image.status = 'failed'
                    image.save(update_fields=['status'])
                except Exception as save_error:
                    logger.error(f"Failed to save image status to database: {save_error}", exc_info=True)
                    # Continue even if save fails
                results.append({
                    'image_id': image_id,
                    'status': 'failed',
                    'error': truncated_error
                })
                failed += 1
            else:
                logger.info(f"Image generation successful for {image_id}")
                # Update image record: success
                image_url = result.get('url')
                logger.info(f"[process_image_generation_queue] Image {image_id} - URL received: {image_url[:100] if image_url else 'None'}...")
                logger.info(f"[process_image_generation_queue] Image {image_id} - URL length: {len(image_url) if image_url else 0} characters")
                # Update progress: Downloading image (75%)
                self.update_state(
                    state='PROGRESS',
                    meta={
                        'current_image': index,
                        'total_images': total_images,
                        'completed': completed,
                        'failed': failed,
                        'status': 'processing',
                        'current_image_id': image_id,
                        'current_image_progress': 75,
                        'results': results
                    }
                )
                # Download and save image to /data/app/igny8/frontend/public/images/ai-images
                saved_file_path = None
                if image_url:
                    try:
                        import os
                        import requests
                        import time
                        # Use the correct path: /data/app/igny8/frontend/public/images/ai-images
                        # This is web-accessible via /images/ai-images/ (Vite serves from public/)
                        images_dir = '/data/app/igny8/frontend/public/images/ai-images'
                        # Create directory if it doesn't exist
                        os.makedirs(images_dir, exist_ok=True)
                        logger.info(f"[process_image_generation_queue] Image {image_id} - Using directory: {images_dir}")
                        # Generate filename: image_{image_id}_{timestamp}.png (or .webp for Runware)
                        timestamp = int(time.time())
                        # Use webp extension if provider is Runware, otherwise png
                        file_ext = 'webp' if provider == 'runware' else 'png'
                        filename = f"image_{image_id}_{timestamp}.{file_ext}"
                        file_path = os.path.join(images_dir, filename)
                        # Download image
                        logger.info(f"[process_image_generation_queue] Image {image_id} - Downloading from: {image_url[:100]}...")
                        response = requests.get(image_url, timeout=60)
                        response.raise_for_status()
                        # Save to file
                        with open(file_path, 'wb') as f:
                            f.write(response.content)
                        # Verify file was actually saved and exists
                        if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
                            saved_file_path = file_path
                            logger.info(f"[process_image_generation_queue] Image {image_id} - Saved to: {file_path} ({len(response.content)} bytes, verified: {os.path.getsize(file_path)} bytes on disk)")
                        else:
                            logger.error(f"[process_image_generation_queue] Image {image_id} - File write appeared to succeed but file not found or empty: {file_path}")
                            saved_file_path = None
                            raise Exception(f"File was not saved successfully to {file_path}")
                    except Exception as download_error:
                        logger.error(f"[process_image_generation_queue] Image {image_id} - Failed to download/save image: {download_error}", exc_info=True)
                        # Continue with URL only if download fails
                # Update progress: Saving to database (90%)
                self.update_state(
                    state='PROGRESS',
                    meta={
                        'current_image': index,
                        'total_images': total_images,
                        'completed': completed,
                        'failed': failed,
                        'status': 'processing',
                        'current_image_id': image_id,
                        'current_image_progress': 90,
                        'results': results
                    }
                )
                # Log URL length for debugging (model field now supports up to 500 chars)
                if image_url and len(image_url) > 500:
                    logger.error(f"[process_image_generation_queue] Image {image_id} - URL TOO LONG: {len(image_url)} chars (max 500). URL: {image_url[:150]}...")
                    logger.error(f"[process_image_generation_queue] Image {image_id} - CharField max_length=500 is too short! URL will be truncated.")
                    # Truncate to 500 chars if somehow longer (shouldn't happen, but safety check)
                    image_url = image_url[:500]
                    logger.warning(f"[process_image_generation_queue] Image {image_id} - Truncated URL length: {len(image_url)} chars")
                elif image_url and len(image_url) > 200:
                    logger.info(f"[process_image_generation_queue] Image {image_id} - URL length {len(image_url)} chars (was limited to 200, now supports 500)")
                # try/except/else: credits are only deducted and the result
                # only counted as completed when the DB save succeeds.
                try:
                    # Save file path and URL appropriately
                    if saved_file_path:
                        # Store local file path in image_path field
                        image.image_path = saved_file_path
                        # Also keep the original URL in image_url field for reference
                        if image_url:
                            image.image_url = image_url
                        logger.info(f"[process_image_generation_queue] Image {image_id} - Saved local path: {saved_file_path}")
                    else:
                        # Only URL available, save to image_url
                        image.image_url = image_url
                        logger.info(f"[process_image_generation_queue] Image {image_id} - Saved URL only: {image_url[:100] if image_url else 'None'}...")
                    image.status = 'generated'
                    # Determine which fields to update
                    update_fields = ['status']
                    if saved_file_path:
                        update_fields.append('image_path')
                    if image_url:
                        update_fields.append('image_url')
                    logger.info(f"[process_image_generation_queue] Image {image_id} - Attempting to save to database (fields: {update_fields})")
                    image.save(update_fields=update_fields)
                    logger.info(f"[process_image_generation_queue] Image {image_id} - Successfully saved to database")
                except Exception as save_error:
                    error_str = str(save_error)
                    logger.error(f"[process_image_generation_queue] Image {image_id} - Database save FAILED: {error_str}", exc_info=True)
                    logger.error(f"[process_image_generation_queue] Image {image_id} - Error type: {type(save_error).__name__}")
                    # Continue even if save fails, but mark as failed in results
                    # Truncate error message to 180 chars to avoid same issue when saving error
                    truncated_error = error_str[:180] if len(error_str) > 180 else error_str
                    results.append({
                        'image_id': image_id,
                        'status': 'failed',
                        'error': f'Database save error: {truncated_error}'
                    })
                    failed += 1
                else:
                    # Deduct credits for successful image generation
                    credits_deducted = 0
                    cost_usd = result.get('cost', 0) or result.get('cost_usd', 0)  # generate_image returns 'cost'
                    if account:
                        try:
                            credits_deducted = CreditService.deduct_credits_for_image(
                                account=account,
                                model_name=model,
                                num_images=1,
                                description=f"Image generation: {content.title[:50] if content else 'Image'}" if content else f"Image {image_id}",
                                metadata={
                                    'image_id': image_id,
                                    'content_id': content_id,
                                    'provider': provider,
                                    'model': model,
                                    'image_type': image.image_type if image else 'unknown',
                                    'size': image_size,
                                },
                                cost_usd=cost_usd,
                                related_object_type='image',
                                related_object_id=image_id
                            )
                            # NOTE(review): the message labels credits_deducted as a
                            # balance, but it is the return value of
                            # deduct_credits_for_image — confirm which it represents.
                            logger.info(f"[process_image_generation_queue] Credits deducted for image {image_id}: account balance now {credits_deducted}")
                        except Exception as credit_error:
                            logger.error(f"[process_image_generation_queue] Failed to deduct credits for image {image_id}: {credit_error}")
                            # Don't fail the image generation if credit deduction fails
                    # Update progress: Complete (100%)
                    self.update_state(
                        state='PROGRESS',
                        meta={
                            'current_image': index,
                            'total_images': total_images,
                            'completed': completed + 1,
                            'failed': failed,
                            'status': 'processing',
                            'current_image_id': image_id,
                            'current_image_progress': 100,
                            'results': results + [{
                                'image_id': image_id,
                                'status': 'completed',
                                'image_url': image_url,  # Original URL from API
                                'image_path': saved_file_path,  # Local file path if saved
                                'revised_prompt': result.get('revised_prompt')
                            }]
                        }
                    )
                    results.append({
                        'image_id': image_id,
                        'status': 'completed',
                        'image_url': image_url,  # Original URL from API
                        'image_path': saved_file_path,  # Local file path if saved
                        'revised_prompt': result.get('revised_prompt')
                    })
                    completed += 1
        except Exception as e:
            # Catch-all per image: record the failure and move to the next ID.
            logger.error(f"Error processing image {image_id}: {str(e)}", exc_info=True)
            results.append({
                'image_id': image_id,
                'status': 'failed',
                'error': str(e)
            })
            failed += 1
    # Check if all images for the content are generated and update status to 'review'
    if content_id and completed > 0:
        try:
            # NOTE(review): this imports Content/Images from
            # business.content.models while the top of the function imports
            # them from modules.writer.models — confirm these are the same
            # models (shadowing the earlier names here is intentional?).
            from igny8_core.business.content.models import Content, Images
            content = Content.objects.get(id=content_id)
            # Check if all images for this content are now generated
            all_images = Images.objects.filter(content=content)
            pending_images = all_images.filter(status='pending').count()
            # If no pending images and content is still in draft, move to review
            if pending_images == 0 and content.status == 'draft':
                content.status = 'review'
                content.save(update_fields=['status'])
                logger.info(f"[process_image_generation_queue] Content #{content_id} status updated to 'review' (all images generated)")
        except Exception as e:
            logger.error(f"[process_image_generation_queue] Error updating content status: {str(e)}", exc_info=True)
    # Final state
    logger.info("=" * 80)
    logger.info(f"process_image_generation_queue COMPLETED")
    logger.info(f" - Total: {total_images}")
    logger.info(f" - Completed: {completed}")
    logger.info(f" - Failed: {failed}")
    logger.info("=" * 80)
    # Log to AITaskLog for consistency with other AI functions
    if account:
        try:
            from igny8_core.ai.models import AITaskLog
            import time
            # Calculate total cost from results
            # NOTE(review): result entries appended above never include a
            # 'cost' key, so this sum is always 0 — confirm whether per-image
            # cost should be recorded in results.
            total_cost = sum(r.get('cost', 0) for r in results if r.get('status') == 'completed')
            AITaskLog.objects.create(
                task_id=self.request.id,
                function_name='generate_images',
                account=account,
                phase='DONE' if completed > 0 else 'ERROR',
                message=f'Generated {completed} images ({failed} failed)' if completed > 0 else f'All {total_images} images failed',
                status='success' if completed > 0 else 'error',
                duration=0,  # Could track actual duration if needed
                cost=total_cost,
                tokens=0,  # Image generation doesn't use tokens
                request_steps=[{
                    'phase': 'IMAGE_GENERATION',
                    'status': 'success' if completed > 0 else 'error',
                    'message': f'Processed {total_images} images: {completed} completed, {failed} failed'
                }],
                response_steps=[],
                error=None if completed > 0 else f'All {total_images} images failed',
                payload={'image_ids': image_ids, 'content_id': content_id},
                result={
                    'total_images': total_images,
                    'completed': completed,
                    'failed': failed,
                    'model': model,
                    'provider': provider
                }
            )
            logger.info(f"[process_image_generation_queue] AITaskLog entry created")
        except Exception as log_error:
            logger.warning(f"[process_image_generation_queue] Failed to create AITaskLog: {log_error}")
    # Create notification for image generation completion
    if account:
        try:
            from igny8_core.business.notifications.services import NotificationService
            if completed > 0:
                NotificationService.notify_images_complete(
                    account=account,
                    image_count=completed
                )
            elif failed > 0:
                NotificationService.notify_images_failed(
                    account=account,
                    error=f'{failed} images failed to generate',
                    image_count=failed
                )
            logger.info(f"[process_image_generation_queue] Notification created")
        except Exception as notif_error:
            logger.warning(f"[process_image_generation_queue] Failed to create notification: {notif_error}")
    return {
        'success': True,
        'total_images': total_images,
        'completed': completed,
        'failed': failed,
        'results': results
    }