693 lines
35 KiB
Python
693 lines
35 KiB
Python
"""
|
|
Unified Celery task entrypoint for all AI functions
|
|
"""
|
|
import logging
|
|
from celery import shared_task
|
|
from igny8_core.ai.engine import AIEngine
|
|
from igny8_core.ai.registry import get_function_instance
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _fail_ai_task(task, error_meta):
    """Publish ``error_meta`` as the task state and build the error result dict.

    The state update is best-effort: a failure to write it is logged and
    otherwise ignored, so the caller always gets a plain dict back.

    Args:
        task: Bound Celery task instance (the ``self`` of ``run_ai_task``).
        error_meta: Dict describing the failure; it is used both as the state
            meta and as the body of the returned result.

    Returns:
        ``{'success': False, **error_meta}``.
    """
    try:
        task.update_state(state='FAILURE', meta=error_meta)
    except Exception as update_err:
        logger.warning(f"Failed to update task state: {update_err}")
    # NOTE(review): the task still returns normally after this, so the result
    # backend will presumably record the returned dict as the final result --
    # confirm that consumers rely on result['success'] rather than task state.
    return {'success': False, **error_meta}


@shared_task(bind=True, max_retries=3)
def run_ai_task(self, function_name: str, payload: dict, account_id: int = None):
    """
    Single Celery entrypoint for all AI functions.
    Looks the requested function up in the registry and runs it through AIEngine.

    Errors are never raised to Celery (to avoid serialization issues); they are
    reported through the returned dict instead.

    Args:
        function_name: Name of the AI function (e.g., 'auto_cluster')
        payload: Function-specific payload
        account_id: Account ID for account isolation

    Returns:
        The engine result dict on success. On failure, a dict with
        ``success=False`` plus ``error``, ``error_type``, ``function_name``,
        ``phase``, ``percentage`` and ``message`` (and the request/response
        steps when the engine provided them). If the function is not in the
        registry, only ``success`` and ``error`` are returned.
    """
    # Guarded so that a malformed payload cannot raise before the try block.
    if hasattr(payload, 'keys'):
        payload_keys = list(payload.keys())
    else:
        payload_keys = f'<{type(payload).__name__}>'

    logger.info("=" * 80)
    logger.info(f"run_ai_task STARTED: {function_name}")
    logger.info(f" - Task ID: {self.request.id}")
    logger.info(f" - Function: {function_name}")
    logger.info(f" - Account ID: {account_id}")
    logger.info(f" - Payload keys: {payload_keys}")
    logger.info("=" * 80)

    try:
        # Resolve the account; a missing account is only a warning here.
        account = None
        if account_id:
            from igny8_core.auth.models import Account
            try:
                account = Account.objects.get(id=account_id)
            except Account.DoesNotExist:
                logger.warning(f"Account {account_id} not found")

        # Get function from registry
        fn = get_function_instance(function_name)
        if not fn:
            error_msg = f'Function {function_name} not found in registry'
            logger.error(error_msg)
            return {
                'success': False,
                'error': error_msg
            }

        # Create engine and execute
        engine = AIEngine(celery_task=self, account=account)
        result = engine.execute(fn, payload)

        succeeded = result.get('success')
        logger.info("=" * 80)
        logger.info(f"run_ai_task COMPLETED: {function_name}")
        logger.info(f" - Success: {succeeded}")
        if not succeeded:
            logger.error(f" - Error: {result.get('error')}")
        logger.info("=" * 80)

        if succeeded:
            return result

        # Execution failed: report the error details without raising.
        error_msg = result.get('error', 'Task execution failed')
        return _fail_ai_task(self, {
            'error': error_msg,
            'error_type': result.get('error_type', 'ExecutionError'),
            'function_name': function_name,
            'phase': result.get('phase', 'ERROR'),
            'percentage': 0,
            'message': f'Error: {error_msg}',
            'request_steps': result.get('request_steps', []),
            'response_steps': result.get('response_steps', [])
        })

    except Exception as e:
        error_type = type(e).__name__
        error_msg = str(e)

        logger.error("=" * 80)
        logger.error(f"run_ai_task FAILED: {function_name}")
        # Attach the traceback to the line that names the error.
        logger.error(f" - Error: {error_type}: {error_msg}", exc_info=True)
        logger.error("=" * 80)

        return _fail_ai_task(self, {
            'error': error_msg,
            'error_type': error_type,
            'function_name': function_name,
            'phase': 'ERROR',
            'percentage': 0,
            'message': f'Error: {error_msg}'
        })
|
|
|
|
|
|
# Maximum prompt length (characters) per image model; unknown models use the default.
_IMAGE_PROMPT_LIMITS = {'dall-e-2': 1000, 'dall-e-3': 4000}
_DEFAULT_IMAGE_PROMPT_LIMIT = 1000

# Directory where generated images are stored (served as /images/ai-images/).
_AI_IMAGES_DIR = '/data/app/igny8/frontend/public/images/ai-images'

# Template used when the account's image prompt template cannot be loaded.
_FALLBACK_IMAGE_PROMPT_TEMPLATE = (
    'Create a high-quality {image_type} image for a blog post titled '
    '"{post_title}". Image prompt: {image_prompt}'
)

# Error strings are capped to stay below possible 200-char database fields.
_MAX_ERROR_LENGTH = 180
# Maximum length of the stored image URL.
_MAX_IMAGE_URL_LENGTH = 500


def _truncate_prompt(text, max_length, word_ratio=0.9, ellipsis_on_hard_cut=False):
    """Return ``text`` shortened to at most ``max_length`` characters.

    The text is cut at the last space and suffixed with "..." when that space
    lies beyond ``max_length * word_ratio``; otherwise it is cut hard (with a
    trailing "..." only if ``ellipsis_on_hard_cut`` is true).
    """
    if len(text) <= max_length:
        return text
    head = text[:max_length - 3]
    last_space = head.rfind(' ')
    if last_space > max_length * word_ratio:
        return head[:last_space] + "..."
    if ellipsis_on_hard_cut:
        return head + "..."
    return text[:max_length]


def _build_image_prompt(image_prompt, content, model, max_prompt_length, image_type, template):
    """Build the final prompt for one image, never longer than ``max_prompt_length``.

    DALL-E 2 gets the raw image prompt; every other model gets ``template``
    filled with the image type, the (shortened) post title and the image
    prompt. If the template cannot be formatted, the raw image prompt is used.
    """
    if model == 'dall-e-2':
        logger.info(f"[process_image_generation_queue] Using DALL-E 2 - skipping template, using image prompt directly")
        if len(image_prompt) > max_prompt_length:
            logger.warning(f"DALL-E 2 prompt too long ({len(image_prompt)} chars), truncating to {max_prompt_length}")
        return _truncate_prompt(image_prompt, max_prompt_length)

    try:
        # Keep the title short so most of the budget goes to the image prompt.
        post_title = content.title or content.meta_title or f"Content #{content.id}"
        if len(post_title) > 200:
            post_title = post_title[:197] + "..."

        # Length of the template with everything except the image prompt filled in.
        template_overhead = len(template.format(
            image_type=image_type,
            post_title=post_title,
            image_prompt=''
        ))

        # Budget for the image prompt: limit - overhead - safety margin (50).
        max_image_prompt_length = max_prompt_length - template_overhead - 50
        if max_image_prompt_length < 100:
            max_image_prompt_length = 100
            logger.warning(f"Template is very long ({template_overhead} chars), limiting image_prompt to {max_image_prompt_length}")

        logger.info(f"[process_image_generation_queue] Template overhead: {template_overhead} chars, max image_prompt: {max_image_prompt_length} chars")

        if len(image_prompt) > max_image_prompt_length:
            logger.warning(f"Image prompt too long ({len(image_prompt)} chars), truncating to {max_image_prompt_length}")
        shortened_prompt = _truncate_prompt(
            image_prompt,
            max_image_prompt_length,
            word_ratio=0.8,
            ellipsis_on_hard_cut=True,
        )

        formatted_prompt = template.format(
            image_type=image_type,
            post_title=post_title,
            image_prompt=shortened_prompt
        )
    except Exception as e:
        logger.warning(f"Prompt template formatting failed: {e}, using image prompt directly")
        formatted_prompt = image_prompt

    # Final safety net: the result must respect the model-specific limit.
    if len(formatted_prompt) > max_prompt_length:
        logger.warning(f"Formatted prompt too long ({len(formatted_prompt)} chars), truncating to {max_prompt_length} for {model}")
    return _truncate_prompt(formatted_prompt, max_prompt_length)


def _download_generated_image(image_url, image_id, provider):
    """Download ``image_url`` into the AI images directory.

    Returns:
        The local file path, or ``None`` if anything went wrong (the error is
        logged; callers then fall back to storing the URL only).
    """
    try:
        # Imported here so that an import problem only disables the download.
        import os
        import requests
        import time

        os.makedirs(_AI_IMAGES_DIR, exist_ok=True)
        logger.info(f"[process_image_generation_queue] Image {image_id} - Using directory: {_AI_IMAGES_DIR}")

        # Filename: image_{image_id}_{timestamp}.webp for Runware, .png otherwise.
        file_ext = 'webp' if provider == 'runware' else 'png'
        filename = f"image_{image_id}_{int(time.time())}.{file_ext}"
        file_path = os.path.join(_AI_IMAGES_DIR, filename)

        logger.info(f"[process_image_generation_queue] Image {image_id} - Downloading from: {image_url[:100]}...")
        response = requests.get(image_url, timeout=60)
        response.raise_for_status()

        with open(file_path, 'wb') as f:
            f.write(response.content)

        # Verify the file really exists and is not empty.
        if not os.path.exists(file_path) or os.path.getsize(file_path) == 0:
            logger.error(f"[process_image_generation_queue] Image {image_id} - File write appeared to succeed but file not found or empty: {file_path}")
            return None

        logger.info(f"[process_image_generation_queue] Image {image_id} - Saved to: {file_path} ({len(response.content)} bytes, verified: {os.path.getsize(file_path)} bytes on disk)")
        return file_path
    except Exception as download_error:
        logger.error(f"[process_image_generation_queue] Image {image_id} - Failed to download/save image: {download_error}", exc_info=True)
        return None


@shared_task(bind=True, name='igny8_core.ai.tasks.process_image_generation_queue')
def process_image_generation_queue(self, image_ids: list, account_id: int = None, content_id: int = None):
    """
    Process image generation queue sequentially (one image at a time)
    Updates Celery task meta with progress for each image

    Args:
        image_ids: IDs of the Images records to generate.
        account_id: Account used to look up settings, API key and images.
        content_id: Only logged by this task.

    Returns:
        ``{'success': False, 'error': ...}`` if the account, settings or API
        key cannot be loaded. Otherwise ``success=True`` with ``total_images``,
        ``completed``, ``failed`` and a per-image ``results`` list; individual
        image failures are reported in ``results``, not raised.
    """
    from igny8_core.modules.writer.models import Images
    from igny8_core.modules.system.models import IntegrationSettings
    from igny8_core.ai.ai_core import AICore
    from igny8_core.ai.prompts import PromptRegistry

    logger.info("=" * 80)
    logger.info(f"process_image_generation_queue STARTED")
    logger.info(f" - Task ID: {self.request.id}")
    logger.info(f" - Image IDs: {image_ids}")
    logger.info(f" - Account ID: {account_id}")
    logger.info(f" - Content ID: {content_id}")
    logger.info("=" * 80)

    account = None
    if account_id:
        from igny8_core.auth.models import Account
        try:
            account = Account.objects.get(id=account_id)
        except Account.DoesNotExist:
            logger.error(f"Account {account_id} not found")
            return {'success': False, 'error': 'Account not found'}

    # Progress tracking
    total_images = len(image_ids)
    completed = 0
    failed = 0
    results = []

    # Step 1: image generation settings
    logger.info("[process_image_generation_queue] Step 1: Loading image generation settings")
    try:
        image_settings = IntegrationSettings.objects.get(
            account=account,
            integration_type='image_generation',
            is_active=True
        )
        config = image_settings.config or {}
        logger.info(f"[process_image_generation_queue] Image generation settings found. Config keys: {list(config.keys())}")
        logger.info(f"[process_image_generation_queue] Full config: {config}")

        # `or` (not a .get default) so an explicit null provider also falls back.
        provider = config.get('provider') or 'openai'
        # Get model - try 'model' first, then 'imageModel' as fallback
        model = config.get('model') or config.get('imageModel') or 'dall-e-3'
        logger.info(f"[process_image_generation_queue] Using PROVIDER: {provider}, MODEL: {model} from settings")
        image_type = config.get('image_type', 'realistic')
        image_format = config.get('image_format', 'webp')
        desktop_enabled = config.get('desktop_enabled', True)
        mobile_enabled = config.get('mobile_enabled', True)

        logger.info(f"[process_image_generation_queue] Settings loaded:")
        logger.info(f" - Provider: {provider}")
        logger.info(f" - Model: {model}")
        logger.info(f" - Image type: {image_type}")
        logger.info(f" - Image format: {image_format}")
        logger.info(f" - Desktop enabled: {desktop_enabled}")
        logger.info(f" - Mobile enabled: {mobile_enabled}")
    except IntegrationSettings.DoesNotExist:
        logger.error("[process_image_generation_queue] ERROR: Image generation settings not found")
        logger.error(f"[process_image_generation_queue] Account: {account.id if account else 'None'}, integration_type: 'image_generation'")
        return {'success': False, 'error': 'Image generation settings not found'}
    except Exception as e:
        logger.error(f"[process_image_generation_queue] ERROR loading image generation settings: {e}", exc_info=True)
        return {'success': False, 'error': f'Error loading image generation settings: {str(e)}'}

    # Step 2: provider API key.
    # Note: API key is stored as 'apiKey' (camelCase) in IntegrationSettings.config
    logger.info(f"[process_image_generation_queue] Step 2: Loading {provider.upper()} API key")
    try:
        provider_settings = IntegrationSettings.objects.get(
            account=account,
            integration_type=provider,  # Use the provider from settings
            is_active=True
        )
        provider_config = provider_settings.config or {}
        provider_config_keys = list(provider_config.keys()) if provider_settings.config else 'None'
        logger.info(f"[process_image_generation_queue] {provider.upper()} integration settings found")
        logger.info(f"[process_image_generation_queue] {provider.upper()} config keys: {provider_config_keys}")

        api_key = provider_config.get('apiKey')
        if not api_key:
            # Only key names are logged: the config may hold other credentials.
            logger.error(f"[process_image_generation_queue] {provider.upper()} API key not found in config")
            logger.error(f"[process_image_generation_queue] {provider.upper()} config keys: {provider_config_keys}")
            return {'success': False, 'error': f'{provider.upper()} API key not configured'}

        # Log API key presence only -- never any part of the key itself.
        logger.info(f"[process_image_generation_queue] {provider.upper()} API key retrieved successfully (length: {len(api_key)})")
    except IntegrationSettings.DoesNotExist:
        logger.error(f"[process_image_generation_queue] ERROR: {provider.upper()} integration settings not found")
        logger.error(f"[process_image_generation_queue] Account: {account.id if account else 'None'}, integration_type: '{provider}'")
        return {'success': False, 'error': f'{provider.upper()} integration not found or not active'}
    except Exception as e:
        logger.error(f"[process_image_generation_queue] ERROR getting {provider.upper()} API key: {e}", exc_info=True)
        return {'success': False, 'error': f'Error retrieving {provider.upper()} API key: {str(e)}'}

    # Image prompt template (placeholders: {image_type}, {post_title}, {image_prompt})
    try:
        image_prompt_template = PromptRegistry.get_image_prompt_template(account)
    except Exception as e:
        logger.warning(f"Failed to get image prompt template: {e}, using fallback")
        image_prompt_template = _FALLBACK_IMAGE_PROMPT_TEMPLATE

    # Negative prompt is only needed for the Runware provider
    negative_prompt = None
    if provider == 'runware':
        try:
            negative_prompt = PromptRegistry.get_negative_prompt(account)
        except Exception as e:
            logger.warning(f"Failed to get negative prompt: {e}")
            negative_prompt = None

    # The prompt limit depends only on the model, so resolve it once.
    max_prompt_length = _IMAGE_PROMPT_LIMITS.get(model)
    if max_prompt_length is None:
        max_prompt_length = _DEFAULT_IMAGE_PROMPT_LIMIT
        logger.warning(f"Unknown model '{model}', using 1000 char limit")
    logger.info(f"[process_image_generation_queue] Model: {model}, Max prompt length: {max_prompt_length} chars")

    ai_core = AICore(account=account)

    def report_progress(position, current_id, percent):
        """Publish the current counters and per-image progress as task meta."""
        self.update_state(
            state='PROGRESS',
            meta={
                'current_image': position,
                'total_images': total_images,
                'completed': completed,
                'failed': failed,
                'status': 'processing',
                'current_image_id': current_id,
                'current_image_progress': percent,
                'results': results
            }
        )

    def record_failure(current_id, error):
        """Append a failed entry for ``current_id`` and bump the failure counter."""
        nonlocal failed
        results.append({
            'image_id': current_id,
            'status': 'failed',
            'error': error
        })
        failed += 1

    # Process each image sequentially
    for index, image_id in enumerate(image_ids, 1):
        try:
            report_progress(index, image_id, 0)

            # Load image record
            logger.info(f"[process_image_generation_queue] Image {index}/{total_images} (ID: {image_id}): Loading from database")
            try:
                image = Images.objects.get(id=image_id, account=account)
                logger.info(f"[process_image_generation_queue] Image {image_id} loaded:")
                logger.info(f" - Type: {image.image_type}")
                logger.info(f" - Status: {image.status}")
                logger.info(f" - Prompt length: {len(image.prompt) if image.prompt else 0} chars")
                logger.info(f" - Prompt preview: {image.prompt[:100] if image.prompt else 'None'}...")
                logger.info(f" - Content ID: {image.content.id if image.content else 'None'}")
            except Images.DoesNotExist:
                logger.error(f"[process_image_generation_queue] Image {image_id} not found in database")
                logger.error(f"[process_image_generation_queue] Account: {account.id if account else 'None'}")
                record_failure(image_id, 'Image record not found')
                continue
            except Exception as e:
                logger.error(f"[process_image_generation_queue] ERROR loading image {image_id}: {e}", exc_info=True)
                record_failure(image_id, f'Error loading image: {str(e)[:_MAX_ERROR_LENGTH]}')
                continue

            if not image.prompt:
                logger.warning(f"Image {image_id} has no prompt")
                record_failure(image_id, 'No prompt found')
                continue

            # Content supplies the post title for the template
            content = image.content
            if not content:
                logger.warning(f"Image {image_id} has no content")
                record_failure(image_id, 'No content associated')
                continue

            formatted_prompt = _build_image_prompt(
                image.prompt,
                content,
                model,
                max_prompt_length,
                image_type,
                image_prompt_template,
            )

            # Generate image (using same approach as test image generation)
            logger.info(f"[process_image_generation_queue] Generating image {index}/{total_images} (ID: {image_id})")
            logger.info(f"[process_image_generation_queue] Provider: {provider}, Model: {model}")
            logger.info(f"[process_image_generation_queue] Final prompt length: {len(formatted_prompt)}")
            logger.info(f"[process_image_generation_queue] Image type: {image_type}")

            report_progress(index, image_id, 0)

            result = ai_core.generate_image(
                prompt=formatted_prompt,
                provider=provider,
                model=model,
                size='1024x1024',
                api_key=api_key,
                negative_prompt=negative_prompt,
                function_name='generate_images_from_prompts'
            )

            # Image generation call finished (50%)
            report_progress(index, image_id, 50)

            image_url = result.get('url')
            error_message = result.get('error')
            logger.info(f"[process_image_generation_queue] Image generation result: has_url={bool(image_url)}, has_error={bool(error_message)}")

            # A result with neither an error nor a URL contains no image at
            # all, so it must not be stored as 'generated'.
            if not error_message and not image_url:
                error_message = 'No image URL returned by provider'

            if error_message:
                logger.error(f"Image generation failed for {image_id}: {error_message}")
                try:
                    image.status = 'failed'
                    image.save(update_fields=['status'])
                except Exception as save_error:
                    # Continue even if the status cannot be saved
                    logger.error(f"Failed to save image status to database: {save_error}", exc_info=True)
                record_failure(image_id, str(error_message)[:_MAX_ERROR_LENGTH])
                continue

            logger.info(f"Image generation successful for {image_id}")
            logger.info(f"[process_image_generation_queue] Image {image_id} - URL received: {image_url[:100]}...")
            logger.info(f"[process_image_generation_queue] Image {image_id} - URL length: {len(image_url)} characters")

            # Downloading image (75%)
            report_progress(index, image_id, 75)
            saved_file_path = _download_generated_image(image_url, image_id, provider)

            # Saving to database (90%)
            report_progress(index, image_id, 90)

            # The URL is shortened only after the download, for storage.
            if len(image_url) > _MAX_IMAGE_URL_LENGTH:
                logger.error(f"[process_image_generation_queue] Image {image_id} - URL TOO LONG: {len(image_url)} chars (max 500). URL: {image_url[:150]}...")
                logger.error(f"[process_image_generation_queue] Image {image_id} - CharField max_length=500 is too short! URL will be truncated.")
                image_url = image_url[:_MAX_IMAGE_URL_LENGTH]
                logger.warning(f"[process_image_generation_queue] Image {image_id} - Truncated URL length: {len(image_url)} chars")
            elif len(image_url) > 200:
                logger.info(f"[process_image_generation_queue] Image {image_id} - URL length {len(image_url)} chars (was limited to 200, now supports 500)")

            try:
                update_fields = ['status', 'image_url']
                image.image_url = image_url
                if saved_file_path:
                    # Local file path goes to image_path; the URL is kept for reference
                    image.image_path = saved_file_path
                    update_fields.append('image_path')
                    logger.info(f"[process_image_generation_queue] Image {image_id} - Saved local path: {saved_file_path}")
                else:
                    logger.info(f"[process_image_generation_queue] Image {image_id} - Saved URL only: {image_url[:100]}...")
                image.status = 'generated'

                logger.info(f"[process_image_generation_queue] Image {image_id} - Attempting to save to database (fields: {update_fields})")
                image.save(update_fields=update_fields)
                logger.info(f"[process_image_generation_queue] Image {image_id} - Successfully saved to database")
            except Exception as save_error:
                error_str = str(save_error)
                logger.error(f"[process_image_generation_queue] Image {image_id} - Database save FAILED: {error_str}", exc_info=True)
                logger.error(f"[process_image_generation_queue] Image {image_id} - Error type: {type(save_error).__name__}")
                record_failure(image_id, f'Database save error: {error_str[:_MAX_ERROR_LENGTH]}')
                continue

            results.append({
                'image_id': image_id,
                'status': 'completed',
                'image_url': image_url,  # Original URL from API
                'image_path': saved_file_path,  # Local file path if saved
                'revised_prompt': result.get('revised_prompt')
            })
            completed += 1

            # Complete (100%) -- counters and results already include this image
            report_progress(index, image_id, 100)

        except Exception as e:
            logger.error(f"Error processing image {image_id}: {str(e)}", exc_info=True)
            record_failure(image_id, str(e))

    # Final state
    logger.info("=" * 80)
    logger.info(f"process_image_generation_queue COMPLETED")
    logger.info(f" - Total: {total_images}")
    logger.info(f" - Completed: {completed}")
    logger.info(f" - Failed: {failed}")
    logger.info("=" * 80)

    return {
        'success': True,
        'total_images': total_images,
        'completed': completed,
        'failed': failed,
        'results': results
    }