""" Unified Celery task entrypoint for all AI functions """ import logging from celery import shared_task from igny8_core.ai.engine import AIEngine from igny8_core.ai.registry import get_function_instance logger = logging.getLogger(__name__) @shared_task(bind=True, max_retries=3) def run_ai_task(self, function_name: str, payload: dict, account_id: int = None): """ Single Celery entrypoint for all AI functions. Dynamically loads and executes the requested function. Args: function_name: Name of the AI function (e.g., 'auto_cluster') payload: Function-specific payload account_id: Account ID for account isolation """ logger.info("=" * 80) logger.info(f"run_ai_task STARTED: {function_name}") logger.info(f" - Task ID: {self.request.id}") logger.info(f" - Function: {function_name}") logger.info(f" - Account ID: {account_id}") logger.info(f" - Payload keys: {list(payload.keys())}") logger.info("=" * 80) try: # Get account account = None if account_id: from igny8_core.auth.models import Account try: account = Account.objects.get(id=account_id) except Account.DoesNotExist: logger.warning(f"Account {account_id} not found") # Get function from registry fn = get_function_instance(function_name) if not fn: error_msg = f'Function {function_name} not found in registry' logger.error(error_msg) return { 'success': False, 'error': error_msg } # Create engine and execute engine = AIEngine(celery_task=self, account=account) result = engine.execute(fn, payload) logger.info("=" * 80) logger.info(f"run_ai_task COMPLETED: {function_name}") logger.info(f" - Success: {result.get('success')}") if not result.get('success'): logger.error(f" - Error: {result.get('error')}") logger.info("=" * 80) # If execution failed, update state and return error (don't raise to avoid serialization issues) if not result.get('success'): error_msg = result.get('error', 'Task execution failed') error_type = result.get('error_type', 'ExecutionError') # Update task state with error details error_meta = { 'error': error_msg, 'error_type': error_type, 'function_name': function_name, 'phase': result.get('phase', 'ERROR'), 'percentage': 0, 'message': f'Error: {error_msg}', 'request_steps': result.get('request_steps', []), 'response_steps': result.get('response_steps', []) } try: self.update_state( state='FAILURE', meta=error_meta ) except Exception as update_err: logger.warning(f"Failed to update task state: {update_err}") # Return error result - Celery will mark as FAILURE based on state # Don't raise exception to avoid serialization issues return { 'success': False, 'error': error_msg, 'error_type': error_type, **error_meta } return result except Exception as e: error_type = type(e).__name__ error_msg = str(e) logger.error("=" * 80) logger.error(f"run_ai_task FAILED: {function_name}") logger.error(f" - Error: {error_type}: {error_msg}") logger.error("=" * 80, exc_info=True) # Update task state with error details (don't raise to avoid serialization issues) error_meta = { 'error': error_msg, 'error_type': error_type, 'function_name': function_name, 'phase': 'ERROR', 'percentage': 0, 'message': f'Error: {error_msg}' } try: self.update_state( state='FAILURE', meta=error_meta ) except Exception as update_err: logger.warning(f"Failed to update task state: {update_err}") # Return error result - don't raise to avoid Celery serialization issues return { 'success': False, 'error': error_msg, 'error_type': error_type, 'function_name': function_name, **error_meta } @shared_task(bind=True, name='igny8_core.ai.tasks.process_image_generation_queue') def process_image_generation_queue(self, image_ids: list, account_id: int = None, content_id: int = None): """ Process image generation queue sequentially (one image at a time) Updates Celery task meta with progress for each image """ from typing import List from igny8_core.modules.writer.models import Images, Content from igny8_core.modules.system.models import IntegrationSettings from igny8_core.ai.ai_core import AICore from igny8_core.ai.prompts import PromptRegistry logger.info("=" * 80) logger.info(f"process_image_generation_queue STARTED") logger.info(f" - Task ID: {self.request.id}") logger.info(f" - Image IDs: {image_ids}") logger.info(f" - Account ID: {account_id}") logger.info(f" - Content ID: {content_id}") logger.info("=" * 80) account = None if account_id: from igny8_core.auth.models import Account try: account = Account.objects.get(id=account_id) except Account.DoesNotExist: logger.error(f"Account {account_id} not found") return {'success': False, 'error': 'Account not found'} # Initialize progress tracking total_images = len(image_ids) completed = 0 failed = 0 results = [] # Get image generation settings from IntegrationSettings try: image_settings = IntegrationSettings.objects.get( account=account, integration_type='image_generation', is_active=True ) config = image_settings.config or {} provider = config.get('provider', 'openai') model = config.get('model', 'dall-e-3') image_type = config.get('image_type', 'realistic') image_format = config.get('image_format', 'webp') desktop_enabled = config.get('desktop_enabled', True) mobile_enabled = config.get('mobile_enabled', True) except IntegrationSettings.DoesNotExist: logger.error("Image generation settings not found") return {'success': False, 'error': 'Image generation settings not found'} # Get provider API key (using same approach as test image generation) # Note: API key is stored as 'apiKey' (camelCase) in IntegrationSettings.config try: provider_settings = IntegrationSettings.objects.get( account=account, integration_type=provider, # 'openai' or 'runware' is_active=True ) api_key = provider_settings.config.get('apiKey') if provider_settings.config else None if not api_key: logger.error(f"[process_image_generation_queue] {provider.upper()} API key not found in config") return {'success': False, 'error': f'{provider.upper()} API key not configured'} logger.info(f"[process_image_generation_queue] {provider.upper()} API key retrieved successfully") except IntegrationSettings.DoesNotExist: logger.error(f"[process_image_generation_queue] {provider.upper()} integration settings not found") return {'success': False, 'error': f'{provider.upper()} integration not found or not active'} except Exception as e: logger.error(f"[process_image_generation_queue] Error getting {provider} API key: {e}", exc_info=True) return {'success': False, 'error': f'Error retrieving {provider} API key: {str(e)}'} # Get image prompt template (has placeholders: {image_type}, {post_title}, {image_prompt}) try: image_prompt_template = PromptRegistry.get_image_prompt_template(account) except Exception as e: logger.warning(f"Failed to get image prompt template: {e}, using fallback") image_prompt_template = 'Create a high-quality {image_type} image for a blog post titled "{post_title}". Image prompt: {image_prompt}' # Get negative prompt for Runware (only needed for Runware provider) negative_prompt = None if provider == 'runware': try: negative_prompt = PromptRegistry.get_negative_prompt(account) except Exception as e: logger.warning(f"Failed to get negative prompt: {e}") negative_prompt = None # Initialize AICore ai_core = AICore(account=account) # Process each image sequentially for index, image_id in enumerate(image_ids, 1): try: # Update task meta: current image processing self.update_state( state='PROGRESS', meta={ 'current_image': index, 'total_images': total_images, 'completed': completed, 'failed': failed, 'status': 'processing', 'current_image_id': image_id, 'results': results } ) # Load image record try: image = Images.objects.get(id=image_id, account=account) except Images.DoesNotExist: logger.error(f"Image {image_id} not found") results.append({ 'image_id': image_id, 'status': 'failed', 'error': 'Image record not found' }) failed += 1 continue # Check if prompt exists if not image.prompt: logger.warning(f"Image {image_id} has no prompt") results.append({ 'image_id': image_id, 'status': 'failed', 'error': 'No prompt found' }) failed += 1 continue # Get content for template formatting content = image.content if not content: logger.warning(f"Image {image_id} has no content") results.append({ 'image_id': image_id, 'status': 'failed', 'error': 'No content associated' }) failed += 1 continue # Format template with image prompt from database # Template has placeholders: {image_type}, {post_title}, {image_prompt} try: formatted_prompt = image_prompt_template.format( image_type=image_type, post_title=content.title or content.meta_title or f"Content #{content.id}", image_prompt=image.prompt # Read directly from database field ) except Exception as e: # Fallback if template formatting fails logger.warning(f"Prompt template formatting failed: {e}, using image prompt directly") formatted_prompt = image.prompt # Generate image (using same approach as test image generation) logger.info(f"[process_image_generation_queue] Generating image {index}/{total_images} (ID: {image_id})") logger.info(f"[process_image_generation_queue] Provider: {provider}, Model: {model}") logger.info(f"[process_image_generation_queue] Prompt length: {len(formatted_prompt)}") logger.info(f"[process_image_generation_queue] Image type: {image_type}") result = ai_core.generate_image( prompt=formatted_prompt, provider=provider, model=model, size='1024x1024', api_key=api_key, negative_prompt=negative_prompt, function_name='generate_images_from_prompts' ) logger.info(f"[process_image_generation_queue] Image generation result: has_url={bool(result.get('url'))}, has_error={bool(result.get('error'))}") # Check for errors if result.get('error'): logger.error(f"Image generation failed for {image_id}: {result.get('error')}") # Update image record: failed image.status = 'failed' image.save(update_fields=['status']) results.append({ 'image_id': image_id, 'status': 'failed', 'error': result.get('error') }) failed += 1 else: logger.info(f"Image generation successful for {image_id}") # Update image record: success image.image_url = result.get('url') image.status = 'generated' image.save(update_fields=['image_url', 'status']) results.append({ 'image_id': image_id, 'status': 'completed', 'image_url': result.get('url'), 'revised_prompt': result.get('revised_prompt') }) completed += 1 except Exception as e: logger.error(f"Error processing image {image_id}: {str(e)}", exc_info=True) results.append({ 'image_id': image_id, 'status': 'failed', 'error': str(e) }) failed += 1 # Final state logger.info("=" * 80) logger.info(f"process_image_generation_queue COMPLETED") logger.info(f" - Total: {total_images}") logger.info(f" - Completed: {completed}") logger.info(f" - Failed: {failed}") logger.info("=" * 80) return { 'success': True, 'total_images': total_images, 'completed': completed, 'failed': failed, 'results': results }