From 14beeed75ce5a9c473fe261135dbc3a37522e69a Mon Sep 17 00:00:00 2001 From: Desktop Date: Mon, 10 Nov 2025 23:51:59 +0500 Subject: [PATCH] Stage 3 & stage 4 --- backend/igny8_core/ai/processor.py | 75 -- backend/igny8_core/modules/planner/tasks.py | 735 ------------ backend/igny8_core/modules/writer/tasks.py | 1156 ------------------- backend/igny8_core/modules/writer/views.py | 75 +- backend/igny8_core/utils/ai_processor.py | 36 +- frontend/src/services/api.ts | 132 +-- frontend/src/store/aiRequestLogsStore.ts | 122 -- 7 files changed, 72 insertions(+), 2259 deletions(-) delete mode 100644 backend/igny8_core/ai/processor.py delete mode 100644 backend/igny8_core/modules/planner/tasks.py delete mode 100644 backend/igny8_core/modules/writer/tasks.py delete mode 100644 frontend/src/store/aiRequestLogsStore.ts diff --git a/backend/igny8_core/ai/processor.py b/backend/igny8_core/ai/processor.py deleted file mode 100644 index 671a569a..00000000 --- a/backend/igny8_core/ai/processor.py +++ /dev/null @@ -1,75 +0,0 @@ -""" -AI Processor wrapper for the framework -DEPRECATED: Use AICore.run_ai_request() instead for all new code. -This file is kept for backward compatibility only. -""" -from typing import Dict, Any, Optional, List -from igny8_core.utils.ai_processor import AIProcessor as BaseAIProcessor -from igny8_core.ai.ai_core import AICore - - -class AIProcessor: - """ - Framework-compatible wrapper around existing AIProcessor. - DEPRECATED: Use AICore.run_ai_request() instead. - This class redirects to AICore for consistency. - """ - - def __init__(self, account=None): - # Use AICore internally for all requests - self.ai_core = AICore(account=account) - self.account = account - # Keep old processor for backward compatibility only - self.processor = BaseAIProcessor(account=account) - - def call( - self, - prompt: str, - model: Optional[str] = None, - max_tokens: int = 4000, - temperature: float = 0.7, - response_format: Optional[Dict] = None, - response_steps: Optional[List] = None, - progress_callback=None - ) -> Dict[str, Any]: - """ - Call AI provider with prompt. - DEPRECATED: Use AICore.run_ai_request() instead. - - Returns: - Dict with 'content', 'error', 'input_tokens', 'output_tokens', - 'total_tokens', 'model', 'cost', 'api_id' - """ - # Redirect to AICore for centralized execution - return self.ai_core.run_ai_request( - prompt=prompt, - model=model, - max_tokens=max_tokens, - temperature=temperature, - response_format=response_format, - function_name='AIProcessor.call' - ) - - def extract_json(self, response_text: str) -> Optional[Dict]: - """Extract JSON from response text""" - return self.ai_core.extract_json(response_text) - - def generate_image( - self, - prompt: str, - model: str = 'dall-e-3', - size: str = '1024x1024', - n: int = 1, - account=None - ) -> Dict[str, Any]: - """Generate image using AI""" - return self.ai_core.generate_image( - prompt=prompt, - provider='openai', - model=model, - size=size, - n=n, - account=account or self.account, - function_name='AIProcessor.generate_image' - ) - diff --git a/backend/igny8_core/modules/planner/tasks.py b/backend/igny8_core/modules/planner/tasks.py deleted file mode 100644 index 8d6184d1..00000000 --- a/backend/igny8_core/modules/planner/tasks.py +++ /dev/null @@ -1,735 +0,0 @@ -""" -Celery tasks for Planner module - AI clustering and idea generation -""" -import logging -import time -from typing import List -from django.db import transaction -from igny8_core.modules.planner.models import Keywords, Clusters, ContentIdeas -from igny8_core.utils.ai_processor import ai_processor -from igny8_core.ai.tracker import ConsoleStepTracker - -logger = logging.getLogger(__name__) - -# Try to import Celery, fall back to synchronous execution if not available -try: - from celery import shared_task - CELERY_AVAILABLE = True -except ImportError: - CELERY_AVAILABLE = False - # Create a mock decorator for synchronous execution - def shared_task(*args, **kwargs): - def decorator(func): - return func - return decorator - - -# ============================================================================ -# DEPRECATED: This function is deprecated. Use the new AI framework instead. -# New path: views.py -> run_ai_task -> AIEngine -> AutoClusterFunction -# This function is kept for backward compatibility but should not be used. -# ============================================================================ -def _auto_cluster_keywords_core(keyword_ids: List[int], sector_id: int = None, account_id: int = None, progress_callback=None): - """ - [DEPRECATED] Core logic for clustering keywords. Can be called with or without Celery. - - ⚠️ WARNING: This function is deprecated. Use the new AI framework instead: - - New path: views.py -> run_ai_task -> AIEngine -> AutoClusterFunction - - This function uses the old AIProcessor and does not use PromptRegistry - - Console logging may not work correctly in this path - - Args: - keyword_ids: List of keyword IDs to cluster - sector_id: Sector ID for the keywords - account_id: Account ID for account isolation - progress_callback: Optional function to call for progress updates (for Celery tasks) - """ - # Initialize console step tracker for logging - tracker = ConsoleStepTracker('auto_cluster') - tracker.init(f"Starting keyword clustering for {len(keyword_ids)} keywords") - - # Track request and response steps (for Celery progress callbacks) - request_steps = [] - response_steps = [] - - try: - from igny8_core.auth.models import Sector - - # Initialize progress if callback provided - if progress_callback: - progress_callback( - state='PROGRESS', - meta={ - 'current': 0, - 'total': len(keyword_ids), - 'percentage': 0, - 'message': 'Initializing keyword clustering...', - 'phase': 'initializing', - 'request_steps': request_steps, - 'response_steps': response_steps - } - ) - - # Step 4: Keyword Loading & Validation - tracker.prep(f"Loading {len(keyword_ids)} keywords from database") - step_start = time.time() - keywords_queryset = Keywords.objects.filter(id__in=keyword_ids) - if account_id: - keywords_queryset = keywords_queryset.filter(account_id=account_id) - if sector_id: - keywords_queryset = keywords_queryset.filter(sector_id=sector_id) - - keywords = list(keywords_queryset.select_related('account', 'site', 'site__account', 'sector', 'sector__site')) - - if not keywords: - error_msg = f"No keywords found for clustering: {keyword_ids}" - logger.warning(error_msg) - tracker.error('Validation', error_msg) - request_steps.append({ - 'stepNumber': 4, - 'stepName': 'Keyword Loading & Validation', - 'functionName': '_auto_cluster_keywords_core', - 'status': 'error', - 'message': 'No keywords found', - 'error': 'No keywords found', - 'duration': int((time.time() - step_start) * 1000) - }) - if progress_callback: - progress_callback( - state='PROGRESS', - meta={'request_steps': request_steps, 'response_steps': response_steps} - ) - return {'success': False, 'error': 'No keywords found', 'request_steps': request_steps, 'response_steps': response_steps} - - tracker.prep(f"Loaded {len(keywords)} keywords successfully") - request_steps.append({ - 'stepNumber': 4, - 'stepName': 'Keyword Loading & Validation', - 'functionName': '_auto_cluster_keywords_core', - 'status': 'success', - 'message': f'Loaded {len(keywords)} keywords', - 'duration': int((time.time() - step_start) * 1000) - }) - - total_keywords = len(keywords) - - # Step 5: Relationship Validation - step_start = time.time() - try: - first_keyword = keywords[0] - account = getattr(first_keyword, 'account', None) - site = getattr(first_keyword, 'site', None) - - # If account is None, try to get it from site - if not account and site: - try: - account = getattr(site, 'account', None) - except Exception: - pass - - sector = getattr(first_keyword, 'sector', None) - - # If site is None, try to get it from sector - if not site and sector: - try: - site = getattr(sector, 'site', None) - except Exception: - pass - - except Exception as e: - logger.error(f"Error accessing keyword relationships: {str(e)}") - request_steps.append({ - 'stepNumber': 5, - 'stepName': 'Relationship Validation', - 'functionName': '_auto_cluster_keywords_core', - 'status': 'error', - 'message': f'Error accessing relationships: {str(e)}', - 'error': str(e), - 'duration': int((time.time() - step_start) * 1000) - }) - if progress_callback: - progress_callback( - state='PROGRESS', - meta={'request_steps': request_steps, 'response_steps': response_steps} - ) - return {'success': False, 'error': f'Invalid keyword data: {str(e)}', 'request_steps': request_steps, 'response_steps': response_steps} - - if not account: - logger.error(f"No account found for keywords: {keyword_ids}. Keyword site: {getattr(first_keyword, 'site', None)}, Keyword account: {getattr(first_keyword, 'account', None)}") - request_steps.append({ - 'stepNumber': 5, - 'stepName': 'Relationship Validation', - 'functionName': '_auto_cluster_keywords_core', - 'status': 'error', - 'message': 'No account found', - 'error': 'No account found for keywords', - 'duration': int((time.time() - step_start) * 1000) - }) - if progress_callback: - progress_callback( - state='PROGRESS', - meta={'request_steps': request_steps, 'response_steps': response_steps} - ) - return {'success': False, 'error': 'No account found for keywords. Please ensure keywords are properly associated with a site and account.', 'request_steps': request_steps, 'response_steps': response_steps} - - if not site: - logger.error(f"No site found for keywords: {keyword_ids}. Keyword site: {getattr(first_keyword, 'site', None)}, Sector site: {getattr(sector, 'site', None) if sector else None}") - request_steps.append({ - 'stepNumber': 5, - 'stepName': 'Relationship Validation', - 'functionName': '_auto_cluster_keywords_core', - 'status': 'error', - 'message': 'No site found', - 'error': 'No site found for keywords', - 'duration': int((time.time() - step_start) * 1000) - }) - if progress_callback: - progress_callback( - state='PROGRESS', - meta={'request_steps': request_steps, 'response_steps': response_steps} - ) - return {'success': False, 'error': 'No site found for keywords. Please ensure keywords are properly associated with a site.', 'request_steps': request_steps, 'response_steps': response_steps} - - request_steps.append({ - 'stepNumber': 5, - 'stepName': 'Relationship Validation', - 'functionName': '_auto_cluster_keywords_core', - 'status': 'success', - 'message': f'Account: {account.id if account else None}, Site: {site.id if site else None}, Sector: {sector.id if sector else None}', - 'duration': int((time.time() - step_start) * 1000) - }) - - # Update progress: Analyzing keywords (0-40%) - if progress_callback: - progress_callback( - state='PROGRESS', - meta={ - 'current': 0, - 'total': total_keywords, - 'percentage': 5, - 'message': f'Preparing to analyze {total_keywords} keywords...', - 'phase': 'preparing', - 'request_steps': request_steps, - 'response_steps': response_steps - } - ) - - # Get sector name if available - sector_name = sector.name if sector else None - - # Format keywords for AI - keyword_data = [ - { - 'keyword': kw.keyword, - 'volume': kw.volume, - 'difficulty': kw.difficulty, - 'intent': kw.intent, - } - for kw in keywords - ] - - # Update progress: Sending to AI (10-40%) - if progress_callback: - progress_callback( - state='PROGRESS', - meta={ - 'current': 0, - 'total': total_keywords, - 'percentage': 10, - 'message': 'Analyzing keyword relationships with AI...', - 'phase': 'analyzing', - 'request_steps': request_steps, - 'response_steps': response_steps - } - ) - - # Step 6: AIProcessor Creation - step_start = time.time() - from igny8_core.utils.ai_processor import AIProcessor - try: - # Log account info for debugging - account_id = account.id if account else None - account_name = account.name if account else None - logger.info(f"Creating AIProcessor with account: id={account_id}, name={account_name}") - - processor = AIProcessor(account=account) - - # Log API key status - has_api_key = bool(processor.openai_api_key) - api_key_preview = processor.openai_api_key[:10] + "..." if processor.openai_api_key else "None" - logger.info(f"AIProcessor created. Has API key: {has_api_key}, Preview: {api_key_preview}, Model: {processor.default_model}") - - request_steps.append({ - 'stepNumber': 6, - 'stepName': 'AIProcessor Creation', - 'functionName': '_auto_cluster_keywords_core', - 'status': 'success', - 'message': f'AIProcessor created with account context (Account ID: {account_id}, Has API Key: {has_api_key})', - 'duration': int((time.time() - step_start) * 1000) - }) - except Exception as e: - logger.error(f"Error creating AIProcessor: {type(e).__name__}: {str(e)}", exc_info=True) - request_steps.append({ - 'stepNumber': 6, - 'stepName': 'AIProcessor Creation', - 'functionName': '_auto_cluster_keywords_core', - 'status': 'error', - 'message': f'Error creating AIProcessor: {str(e)}', - 'error': str(e), - 'duration': int((time.time() - step_start) * 1000) - }) - if progress_callback: - progress_callback( - state='PROGRESS', - meta={'request_steps': request_steps, 'response_steps': response_steps} - ) - return {'success': False, 'error': f'Error creating AIProcessor: {str(e)}', 'request_steps': request_steps, 'response_steps': response_steps} - - # Step 7: AI Call Preparation - step_start = time.time() - try: - # Check if API key is available - if not processor.openai_api_key: - # Try to debug why API key is missing - logger.error(f"OpenAI API key not found for account {account.id if account else None}") - # Check IntegrationSettings directly - try: - from igny8_core.modules.system.models import IntegrationSettings - settings_obj = IntegrationSettings.objects.filter( - integration_type='openai', - account=account, - is_active=True - ).first() - if settings_obj: - logger.error(f"IntegrationSettings found but API key missing. Config keys: {list(settings_obj.config.keys()) if settings_obj.config else 'None'}") - else: - logger.error(f"No IntegrationSettings found for account {account.id if account else None}, integration_type='openai', is_active=True") - except Exception as debug_error: - logger.error(f"Error checking IntegrationSettings: {str(debug_error)}", exc_info=True) - request_steps.append({ - 'stepNumber': 7, - 'stepName': 'AI Call Preparation', - 'functionName': '_auto_cluster_keywords_core', - 'status': 'error', - 'message': 'OpenAI API key not configured', - 'error': 'OpenAI API key not configured', - 'duration': int((time.time() - step_start) * 1000) - }) - if progress_callback: - progress_callback( - state='PROGRESS', - meta={'request_steps': request_steps, 'response_steps': response_steps} - ) - return {'success': False, 'error': 'OpenAI API key not configured', 'request_steps': request_steps, 'response_steps': response_steps} - - request_steps.append({ - 'stepNumber': 7, - 'stepName': 'AI Call Preparation', - 'functionName': '_auto_cluster_keywords_core', - 'status': 'success', - 'message': f'Prepared {len(keyword_data)} keywords for AI analysis', - 'duration': int((time.time() - step_start) * 1000) - }) - except Exception as e: - request_steps.append({ - 'stepNumber': 7, - 'stepName': 'AI Call Preparation', - 'functionName': '_auto_cluster_keywords_core', - 'status': 'error', - 'message': f'Error preparing AI call: {str(e)}', - 'error': str(e), - 'duration': int((time.time() - step_start) * 1000) - }) - if progress_callback: - progress_callback( - state='PROGRESS', - meta={'request_steps': request_steps, 'response_steps': response_steps} - ) - return {'success': False, 'error': f'Error preparing AI call: {str(e)}', 'request_steps': request_steps, 'response_steps': response_steps} - - # Call AI with step tracking - tracker.ai_call(f"Sending {len(keyword_data)} keywords to AI for clustering") - result = processor.cluster_keywords( - keyword_data, - sector_name=sector_name, - account=account, - response_steps=response_steps, - progress_callback=progress_callback, - tracker=tracker # Pass tracker for console logging - ) - - if result.get('error'): - error_msg = f"AI clustering error: {result['error']}" - logger.error(error_msg) - tracker.error('AI_CALL', error_msg) - if progress_callback: - progress_callback( - state='FAILURE', - meta={ - 'error': result['error'], - 'message': f"Error: {result['error']}", - 'request_steps': request_steps, - 'response_steps': response_steps - } - ) - return {'success': False, 'error': result['error'], 'request_steps': request_steps, 'response_steps': response_steps} - - # Parse response - tracker.parse("Parsing AI response into cluster data") - - # Update response_steps from result if available - if result.get('response_steps'): - response_steps.extend(result.get('response_steps', [])) - - # Update progress: Creating clusters (40-90%) - clusters_data = result.get('clusters', []) - if progress_callback: - progress_callback( - state='PROGRESS', - meta={ - 'current': 0, - 'total': total_keywords, - 'percentage': 40, - 'message': f'Creating {len(clusters_data)} clusters...', - 'phase': 'creating_clusters', - 'request_steps': request_steps, - 'response_steps': response_steps - } - ) - - clusters_created = 0 - keywords_updated = 0 - - # Step 13: Database Transaction Start - tracker.save(f"Creating {len(clusters_data)} clusters in database") - step_start = time.time() - # Create/update clusters and assign keywords - # Note: account and sector are already extracted above to avoid database queries inside transaction - with transaction.atomic(): - if response_steps is not None: - response_steps.append({ - 'stepNumber': 13, - 'stepName': 'Database Transaction Start', - 'functionName': '_auto_cluster_keywords_core', - 'status': 'success', - 'message': 'Transaction started', - 'duration': int((time.time() - step_start) * 1000) - }) - - # Step 14: Cluster Creation/Update - cluster_step_start = time.time() - for idx, cluster_data in enumerate(clusters_data): - cluster_name = cluster_data.get('name', '') - cluster_keywords = cluster_data.get('keywords', []) - - if not cluster_name or not cluster_keywords: - continue - - # Update progress for each cluster - if progress_callback: - progress_pct = 40 + int((idx / len(clusters_data)) * 50) - progress_callback( - state='PROGRESS', - meta={ - 'current': idx + 1, - 'total': len(clusters_data), - 'percentage': progress_pct, - 'message': f"Creating cluster '{cluster_name}' ({idx + 1} of {len(clusters_data)})...", - 'phase': 'creating_clusters', - 'current_item': cluster_name, - 'request_steps': request_steps, - 'response_steps': response_steps - } - ) - - # Get or create cluster - # Note: Clusters model (SiteSectorBaseModel) requires both site and sector - # Ensure site is always set (can be from sector.site if sector exists) - cluster_site = site if site else (sector.site if sector and hasattr(sector, 'site') else None) - - if not cluster_site: - logger.error(f"Cannot create cluster '{cluster_name}': No site available. Keywords: {keyword_ids}") - continue - - if sector: - cluster, created = Clusters.objects.get_or_create( - name=cluster_name, - account=account, - site=cluster_site, - sector=sector, - defaults={ - 'description': cluster_data.get('description', ''), - 'status': 'active', - } - ) - else: - # If no sector, create cluster without sector filter but still require site - cluster, created = Clusters.objects.get_or_create( - name=cluster_name, - account=account, - site=cluster_site, - sector__isnull=True, - defaults={ - 'description': cluster_data.get('description', ''), - 'status': 'active', - 'sector': None, - } - ) - - if created: - clusters_created += 1 - - # Step 15: Keyword Matching & Assignment - kw_step_start = time.time() - # Assign keywords to cluster - # Match keywords by keyword string (case-insensitive) from the already-loaded keywords list - # Also create a mapping for fuzzy matching (handles minor variations) - matched_keyword_objects = [] - unmatched_keywords = [] - - # Create normalized versions for exact matching - cluster_keywords_normalized = {} - for kw in cluster_keywords: - normalized = kw.strip().lower() - cluster_keywords_normalized[normalized] = kw.strip() # Keep original for logging - - # Create a mapping of all available keywords (normalized) - available_keywords_normalized = { - kw_obj.keyword.strip().lower(): kw_obj - for kw_obj in keywords - } - - # First pass: exact matches (case-insensitive) - for cluster_kw_normalized, cluster_kw_original in cluster_keywords_normalized.items(): - if cluster_kw_normalized in available_keywords_normalized: - matched_keyword_objects.append(available_keywords_normalized[cluster_kw_normalized]) - else: - unmatched_keywords.append(cluster_kw_original) - - # Log unmatched keywords for debugging - if unmatched_keywords: - logger.warning( - f"Some keywords in cluster '{cluster_name}' were not matched: {unmatched_keywords}. " - f"Available keywords: {[kw.keyword for kw in keywords]}" - ) - - # Update matched keywords - if matched_keyword_objects: - matched_ids = [kw.id for kw in matched_keyword_objects] - # Rebuild queryset inside transaction to avoid database connection issues - # Handle sector=None case - keyword_filter = Keywords.objects.filter( - id__in=matched_ids, - account=account - ) - if sector: - keyword_filter = keyword_filter.filter(sector=sector) - else: - keyword_filter = keyword_filter.filter(sector__isnull=True) - - updated_count = keyword_filter.update( - cluster=cluster, - status='mapped' # Update status from pending to mapped - ) - keywords_updated += updated_count - - # Log steps 14 and 15 after all clusters are processed - if response_steps is not None: - response_steps.append({ - 'stepNumber': 14, - 'stepName': 'Cluster Creation/Update', - 'functionName': '_auto_cluster_keywords_core', - 'status': 'success', - 'message': f'Created/updated {clusters_created} clusters', - 'duration': int((time.time() - cluster_step_start) * 1000) - }) - response_steps.append({ - 'stepNumber': 15, - 'stepName': 'Keyword Matching & Assignment', - 'functionName': '_auto_cluster_keywords_core', - 'status': 'success', - 'message': f'Assigned {keywords_updated} keywords to clusters', - 'duration': 0 # Duration already included in step 14 - }) - - # Step 16: Metrics Recalculation & Commit - step_start = time.time() - # Update progress: Recalculating metrics (90-95%) - if progress_callback: - progress_callback( - state='PROGRESS', - meta={ - 'current': clusters_created, - 'total': clusters_created, - 'percentage': 90, - 'message': 'Recalculating cluster metrics...', - 'phase': 'finalizing', - 'request_steps': request_steps, - 'response_steps': response_steps - } - ) - - # Recalculate cluster metrics - from django.db.models import Sum - cluster_filter = Clusters.objects.filter(account=account) - if sector: - cluster_filter = cluster_filter.filter(sector=sector) - else: - cluster_filter = cluster_filter.filter(sector__isnull=True) - - for cluster in cluster_filter: - cluster.keywords_count = Keywords.objects.filter(cluster=cluster).count() - volume_sum = Keywords.objects.filter(cluster=cluster).aggregate( - total=Sum('volume') - )['total'] - cluster.volume = volume_sum or 0 - cluster.save() - - # Transaction commits here automatically - if response_steps is not None: - response_steps.append({ - 'stepNumber': 16, - 'stepName': 'Metrics Recalculation & Commit', - 'functionName': '_auto_cluster_keywords_core', - 'status': 'success', - 'message': f'Recalculated metrics for {cluster_filter.count()} clusters, transaction committed', - 'duration': int((time.time() - step_start) * 1000) - }) - - # Final progress update - final_message = f"Clustering complete: {clusters_created} clusters created, {keywords_updated} keywords updated" - logger.info(final_message) - tracker.done(final_message) - - if progress_callback: - progress_callback( - state='SUCCESS', - meta={ - 'message': final_message, - 'request_steps': request_steps, - 'response_steps': response_steps - } - ) - - return { - 'success': True, - 'clusters_created': clusters_created, - 'keywords_updated': keywords_updated, - 'message': final_message, - 'request_steps': request_steps, - 'response_steps': response_steps, - } - - except Exception as e: - error_msg = f"Error in auto_cluster_keywords_core: {str(e)}" - logger.error(error_msg, exc_info=True) - tracker.error('Exception', error_msg, exception=e) - if progress_callback: - progress_callback( - state='FAILURE', - meta={ - 'error': str(e), - 'message': f'Error: {str(e)}', - 'request_steps': request_steps, - 'response_steps': response_steps - } - ) - return { - 'success': False, - 'error': str(e), - 'request_steps': request_steps, - 'response_steps': response_steps - } - - -@shared_task(bind=True, max_retries=3) -# ============================================================================ -# DEPRECATED: This Celery task is deprecated. Use run_ai_task instead. -# New path: views.py -> run_ai_task -> AIEngine -> AutoClusterFunction -# ============================================================================ -def auto_cluster_keywords_task(self, keyword_ids: List[int], sector_id: int = None, account_id: int = None): - """ - [DEPRECATED] Celery task wrapper for clustering keywords using AI. - - ⚠️ WARNING: This task is deprecated. Use the new AI framework instead: - - New path: views.py -> run_ai_task -> AIEngine -> AutoClusterFunction - - This task uses the old _auto_cluster_keywords_core function - - Console logging may not work correctly in this path - - Args: - keyword_ids: List of keyword IDs to cluster - sector_id: Sector ID for the keywords - account_id: Account ID for account isolation - """ - logger.info("=" * 80) - logger.info("auto_cluster_keywords_task STARTED") - logger.info(f" - Task ID: {self.request.id}") - logger.info(f" - keyword_ids: {keyword_ids}") - logger.info(f" - sector_id: {sector_id}") - logger.info(f" - account_id: {account_id}") - logger.info("=" * 80) - - # Initialize request_steps and response_steps for error reporting - request_steps = [] - response_steps = [] - - def progress_callback(state, meta): - # Capture request_steps and response_steps from meta if available - nonlocal request_steps, response_steps - if isinstance(meta, dict): - if 'request_steps' in meta: - request_steps = meta['request_steps'] - if 'response_steps' in meta: - response_steps = meta['response_steps'] - self.update_state(state=state, meta=meta) - - try: - result = _auto_cluster_keywords_core(keyword_ids, sector_id, account_id, progress_callback) - logger.info(f"auto_cluster_keywords_task COMPLETED: {result}") - return result - except Exception as e: - error_type = type(e).__name__ - error_msg = str(e) - - # Log full error details - logger.error("=" * 80) - logger.error(f"auto_cluster_keywords_task FAILED: {error_type}: {error_msg}") - logger.error(f" - Task ID: {self.request.id}") - logger.error(f" - keyword_ids: {keyword_ids}") - logger.error(f" - sector_id: {sector_id}") - logger.error(f" - account_id: {account_id}") - logger.error("=" * 80, exc_info=True) - - # Create detailed error dict that Celery can serialize - error_dict = { - 'error': error_msg, - 'error_type': error_type, - 'error_class': error_type, - 'message': f'{error_type}: {error_msg}', - 'request_steps': request_steps, - 'response_steps': response_steps, - 'task_id': str(self.request.id), - 'keyword_ids': keyword_ids, - 'sector_id': sector_id, - 'account_id': account_id - } - - # Update task state with detailed error - try: - self.update_state( - state='FAILURE', - meta=error_dict - ) - except Exception as update_error: - # If update_state fails, log it but continue - logger.error(f"Failed to update task state: {str(update_error)}") - - # Return error result - return error_dict - - -# REMOVED: All idea generation functions removed -# - auto_generate_ideas_task -# - _generate_single_idea_core -# - generate_single_idea_task - diff --git a/backend/igny8_core/modules/writer/tasks.py b/backend/igny8_core/modules/writer/tasks.py deleted file mode 100644 index fdb5c219..00000000 --- a/backend/igny8_core/modules/writer/tasks.py +++ /dev/null @@ -1,1156 +0,0 @@ -""" -Celery tasks for Writer module - AI content generation -""" -import logging -import re -import time -from typing import List -from django.db import transaction -from igny8_core.modules.writer.models import Tasks, Images, Content -from igny8_core.utils.ai_processor import ai_processor -from igny8_core.modules.system.utils import get_prompt_value, get_default_prompt -from igny8_core.ai.functions.generate_content import generate_content_core -from igny8_core.ai.functions.generate_images import generate_images_core - -logger = logging.getLogger(__name__) - -# Try to import Celery, fall back to synchronous execution if not available -try: - from celery import shared_task - CELERY_AVAILABLE = True -except ImportError: - CELERY_AVAILABLE = False - def shared_task(*args, **kwargs): - def decorator(func): - return func - return decorator - - -@shared_task(bind=True, max_retries=3) -def auto_generate_content_task(self, task_ids: List[int], account_id: int = None): - """ - Celery task to generate content for tasks using AI. - - Args: - task_ids: List of task IDs - account_id: Account ID for account isolation - """ - try: - # Step tracking for progress modal - step_counter = 0 - request_steps = [] - response_steps = [] - - def add_step(step_name, status='success', message='', step_type='request'): - nonlocal step_counter - step_counter += 1 - step = { - 'stepNumber': step_counter, - 'stepName': step_name, - 'status': status, - 'message': message, - 'timestamp': time.time() - } - if step_type == 'request': - request_steps.append(step) - else: - response_steps.append(step) - return step - - # Initialize progress - add_step('INIT', 'success', 'Initializing content generation...', 'request') - self.update_state( - state='PROGRESS', - meta={ - 'current': 0, - 'total': len(task_ids), - 'percentage': 0, - 'message': 'Initializing content generation...', - 'phase': 'INIT', - 'request_steps': request_steps, - 'response_steps': response_steps - } - ) - - # ======================================================================== - # DATABASE QUERY PHASE - Detailed logging - # ======================================================================== - logger.info("=" * 80) - logger.info("DATABASE QUERY: Starting task retrieval") - logger.info(f" - task_ids: {task_ids}") - logger.info(f" - account_id: {account_id}") - logger.info("=" * 80) - - # Get tasks with all relationships preloaded to avoid N+1 queries - try: - from django.db import connection - - # Check database connection first - try: - with connection.cursor() as cursor: - cursor.execute("SELECT 1") - logger.info(" - ✓ Database connection verified") - except Exception as conn_error: - logger.error(f" - ✗ Database connection failed: {type(conn_error).__name__}: {str(conn_error)}") - raise - - # Build queryset step by step with error handling - logger.info(" - Building queryset...") - tasks_queryset = Tasks.objects.filter(id__in=task_ids) - logger.info(f" - Initial queryset created for {len(task_ids)} task IDs") - - # Count before account filter (may fail if database issue) - try: - initial_count = tasks_queryset.count() - logger.info(f" - Initial queryset count (before account filter): {initial_count}") - except Exception as count_error: - logger.error(f" - ✗ Failed to count initial queryset: {type(count_error).__name__}: {str(count_error)}") - raise - - if account_id: - logger.info(f" - Applying account filter (account_id={account_id})...") - tasks_queryset = tasks_queryset.filter(account_id=account_id) - try: - filtered_count = tasks_queryset.count() - logger.info(f" - After account filter: {filtered_count} tasks") - except Exception as count_error: - logger.error(f" - ✗ Failed to count after account filter: {type(count_error).__name__}: {str(count_error)}") - raise - - # Log queryset SQL for debugging (only if query is valid) - try: - sql_query = str(tasks_queryset.query) - logger.info(f" - Queryset SQL: {sql_query[:500]}...") # Truncate if too long - except Exception as sql_error: - logger.warning(f" - ⚠️ Could not generate SQL string: {type(sql_error).__name__}: {str(sql_error)}") - - # Optimize queries: preload all related objects to avoid N+1 queries - # Only select relationships that definitely exist (nullable FKs handled safely) - logger.info(" - Applying select_related for: cluster, idea, sector, account, site") - try: - tasks_queryset = tasks_queryset.select_related( - 'cluster', # Cluster FK (nullable) - 'idea', # ContentIdeas FK (nullable) - 'sector', # Sector FK (required - SiteSectorBaseModel) - 'account', # Account FK (required - AccountBaseModel) - 'site', # Site FK (required - SiteSectorBaseModel) - ) - logger.info(" - ✓ select_related applied successfully") - except Exception as select_error: - logger.error(f" - ✗ Failed to apply select_related: {type(select_error).__name__}: {str(select_error)}") - # Try without select_related as fallback - logger.warning(" - ⚠️ Falling back to queryset without select_related") - tasks_queryset = Tasks.objects.filter(id__in=task_ids) - if account_id: - tasks_queryset = tasks_queryset.filter(account_id=account_id) - - # Convert to list to execute query and log results - logger.info(" - Executing database query...") - try: - tasks = list(tasks_queryset) - logger.info(f" - ✓ Query executed successfully: {len(tasks)} tasks retrieved") - except Exception as query_error: - logger.error(f" - ✗ Query execution failed: {type(query_error).__name__}: {str(query_error)}") - logger.error(f" - This may indicate a database schema issue or missing relationships") - raise - - except Exception as db_error: - logger.error("=" * 80) - logger.error("DATABASE QUERY ERROR") - logger.error(f" - Error type: {type(db_error).__name__}") - logger.error(f" - Error message: {str(db_error)}") - logger.error(f" - task_ids: {task_ids}") - logger.error(f" - account_id: {account_id}") - logger.error("=" * 80, exc_info=True) - raise - - # Log detailed information about each retrieved task - logger.info("=" * 80) - logger.info("DATABASE RESULTS: Task data collected") - logger.info(f" - Total tasks retrieved: {len(tasks)}") - logger.info("=" * 80) - - for idx, task in enumerate(tasks): - logger.info(f" Task #{idx + 1} (ID: {task.id}):") - logger.info(f" - title: {task.title}") - logger.info(f" - status: {task.status}") - logger.info(f" - account_id: {task.account_id if hasattr(task, 'account_id') else 'N/A'}") - logger.info(f" - site_id: {task.site_id if hasattr(task, 'site_id') else 'N/A'}") - logger.info(f" - sector_id: {task.sector_id if hasattr(task, 'sector_id') else 'N/A'}") - logger.info(f" - cluster_id: {task.cluster_id if hasattr(task, 'cluster_id') else 'None'}") - logger.info(f" - idea_id: {task.idea_id if hasattr(task, 'idea_id') else 'None'}") - - # Check if relationships are loaded - try: - account_loaded = task.account is not None - site_loaded = task.site is not None - sector_loaded = task.sector is not None - cluster_loaded = task.cluster is not None - idea_loaded = task.idea is not None - - logger.info(f" - Relationships loaded:") - logger.info(f" * account: {account_loaded} (ID: {task.account.id if account_loaded else 'N/A'})") - logger.info(f" * site: {site_loaded} (ID: {task.site.id if site_loaded else 'N/A'}, Name: {task.site.name if site_loaded else 'N/A'})") - logger.info(f" * sector: {sector_loaded} (ID: {task.sector.id if sector_loaded else 'N/A'}, Name: {task.sector.name if sector_loaded else 'N/A'})") - logger.info(f" * cluster: {cluster_loaded} (ID: {task.cluster.id if cluster_loaded else 'None'}, Name: {task.cluster.name if cluster_loaded else 'N/A'})") - logger.info(f" * idea: {idea_loaded} (ID: {task.idea.id if idea_loaded else 'None'}, Title: {task.idea.idea_title if idea_loaded else 'N/A'})") - - # Check for potential data issues - if not account_loaded: - logger.error(f" - ⚠️ WARNING: Task {task.id} has no account loaded!") - if not site_loaded: - logger.error(f" - ⚠️ WARNING: Task {task.id} has no site loaded!") - if not sector_loaded: - logger.error(f" - ⚠️ WARNING: Task {task.id} has no sector loaded!") - - except Exception as rel_error: - logger.error(f" - ⚠️ ERROR accessing relationships: {type(rel_error).__name__}: {str(rel_error)}") - logger.error(f" - This may indicate a database relationship issue", exc_info=True) - - logger.info("=" * 80) - - if not tasks: - logger.warning(f"No tasks found: {task_ids}") - return {'success': False, 'error': 'No tasks found'} - - total_tasks = len(tasks) - - # Update progress: Preparing tasks - add_step('PREP', 'success', f'Preparing {total_tasks} tasks for content generation...', 'request') - self.update_state( - state='PROGRESS', - meta={ - 'current': 0, - 'total': total_tasks, - 'percentage': 10, - 'message': f'Preparing {total_tasks} tasks for content generation...', - 'phase': 'PREP', - 'request_steps': request_steps, - 'response_steps': response_steps - } - ) - - tasks_updated = 0 - - # Generate content for each task - with transaction.atomic(): - for idx, task in enumerate(tasks): - # ======================================================================== - # TASK VALIDATION PHASE - Detailed logging - # ======================================================================== - logger.info("=" * 80) - logger.info(f"PROCESSING TASK #{idx + 1}/{total_tasks} (ID: {task.id})") - logger.info("=" * 80) - - # Validate task has required data before processing - logger.info(" - Validating task data...") - if not task.title: - logger.warning(f" - ⚠️ Task {task.id} has no title, skipping") - continue - logger.info(f" - ✓ Title: {task.title}") - - # Get account - ensure it's loaded (already in select_related) - logger.info(" - Checking account relationship...") - try: - account = task.account - if not account: - logger.error(f" - ✗ Task {task.id} has no account object (account_id={task.account_id}), skipping") - continue - logger.info(f" - ✓ Account loaded: ID={account.id}, Name={account.name if hasattr(account, 'name') else 'N/A'}") - except Exception as account_error: - logger.error(f" - ✗ ERROR accessing account: {type(account_error).__name__}: {str(account_error)}") - logger.error(f" - Task account_id: {task.account_id}", exc_info=True) - continue - - # Validate site relationship - logger.info(" - Checking site relationship...") - try: - site = task.site - if not site: - logger.error(f" - ✗ Task {task.id} has no site object (site_id={task.site_id}), skipping") - continue - logger.info(f" - ✓ Site loaded: ID={site.id}, Name={site.name if hasattr(site, 'name') else 'N/A'}") - except Exception as site_error: - logger.error(f" - ✗ ERROR accessing site: {type(site_error).__name__}: {str(site_error)}") - logger.error(f" - Task site_id: {task.site_id}", exc_info=True) - continue - - # Validate sector relationship - logger.info(" - Checking sector relationship...") - try: - sector = task.sector - if not sector: - logger.error(f" - ✗ Task {task.id} has no sector object (sector_id={task.sector_id}), skipping") - continue - logger.info(f" - ✓ Sector loaded: ID={sector.id}, Name={sector.name if hasattr(sector, 'name') else 'N/A'}") - except Exception as sector_error: - logger.error(f" - ✗ ERROR accessing sector: {type(sector_error).__name__}: {str(sector_error)}") - logger.error(f" - Task sector_id: {task.sector_id}", exc_info=True) - continue - - # Check cluster relationship (nullable) - logger.info(" - Checking cluster relationship (nullable)...") - try: - cluster = task.cluster - if cluster: - logger.info(f" - ✓ Cluster loaded: ID={cluster.id}, Name={cluster.name if hasattr(cluster, 'name') else 'N/A'}") - else: - logger.info(f" - ℹ Cluster is None (nullable field)") - except Exception as cluster_error: - logger.warning(f" - ⚠️ ERROR accessing cluster (nullable): {type(cluster_error).__name__}: {str(cluster_error)}") - logger.warning(f" - Task cluster_id: {task.cluster_id}") - # Don't skip - cluster is nullable - - # Check idea relationship (nullable) - logger.info(" - Checking idea relationship (nullable)...") - try: - idea = task.idea - if idea: - logger.info(f" - ✓ Idea loaded: ID={idea.id}, Title={idea.idea_title if hasattr(idea, 'idea_title') else 'N/A'}") - else: - logger.info(f" - ℹ Idea is None (nullable field)") - except Exception as idea_error: - logger.warning(f" - ⚠️ ERROR accessing idea (nullable): {type(idea_error).__name__}: {str(idea_error)}") - logger.warning(f" - Task idea_id: {task.idea_id}") - # Don't skip - idea is nullable - - # Update progress: Processing task - # Calculate base percentage: 10% (PREP) + progress through tasks (10-50%) - base_pct = 10 - task_progress_pct = base_pct + int((idx / total_tasks) * 40) # 10-50% for task prep - self.update_state( - state='PROGRESS', - meta={ - 'current': idx + 1, - 'total': total_tasks, - 'percentage': task_progress_pct, - 'message': f"Preparing content generation for '{task.title}' ({idx + 1} of {total_tasks})...", - 'phase': 'PREP', - 'current_item': task.title, - 'request_steps': request_steps, - 'response_steps': response_steps - } - ) - - # ======================================================================== - # PROMPT LOADING PHASE - Detailed logging - # ======================================================================== - logger.info(" - Loading prompt template...") - try: - # Get prompt template from database or default (account-aware) - # Use utility function to ensure proper loading - logger.info(f" * Attempting to load prompt from database for account {account.id}...") - prompt_template = get_prompt_value(account, 'content_generation') - - if not prompt_template: - # Fallback to default if not found - logger.warning(f" * No custom prompt found in database, using default...") - prompt_template = get_default_prompt('content_generation') - logger.warning(f" * Using default prompt for account {account.id}") - else: - logger.info(f" * ✓ Custom prompt loaded from database (length: {len(prompt_template)} chars)") - - logger.info(f" * Prompt template length: {len(prompt_template)} characters") - logger.info(f" * Prompt template preview (first 200 chars): {prompt_template[:200]}...") - - except Exception as prompt_error: - logger.error(f" * ✗ ERROR loading prompt: {type(prompt_error).__name__}: {str(prompt_error)}") - logger.error(f" * Account ID: {account.id}", exc_info=True) - # Fallback to default - try: - prompt_template = get_default_prompt('content_generation') - logger.warning(f" * Using default prompt as fallback") - except Exception as default_error: - logger.error(f" * ✗ CRITICAL: Cannot load default prompt either: {str(default_error)}") - continue - - # Validate prompt template has required placeholders - if '[IGNY8_IDEA]' not in prompt_template: - logger.warning(f"Prompt template missing [IGNY8_IDEA] placeholder for task {task.id}") - if '[IGNY8_CLUSTER]' not in prompt_template: - logger.warning(f"Prompt template missing [IGNY8_CLUSTER] placeholder for task {task.id}") - if '[IGNY8_KEYWORDS]' not in prompt_template: - logger.warning(f"Prompt template missing [IGNY8_KEYWORDS] placeholder for task {task.id}") - - # ======================================================================== - # DATA FORMATTING PHASE - Detailed logging - # ======================================================================== - logger.info(" - Formatting data for AI prompt...") - - # Build idea data string (format similar to WordPress plugin) - logger.info(" * Building idea data string...") - idea_data = f"Title: {task.title or 'Untitled'}\n" - logger.info(f" - Title: {task.title or 'Untitled'}") - - if task.description: - idea_data += f"Description: {task.description}\n" - logger.info(f" - Description: {task.description[:100] if len(task.description) > 100 else task.description}...") - else: - logger.info(f" - Description: None") - - if task.idea and task.idea.description: - logger.info(f" * Processing idea description (ID: {task.idea.id})...") - # Handle structured description (JSON) vs plain text - description = task.idea.description - logger.info(f" - Idea description type: {type(description).__name__}, length: {len(str(description)) if description else 0}") - - try: - import json - # Try to parse as JSON (structured outline) - logger.info(f" - Attempting to parse as JSON...") - parsed_desc = json.loads(description) - if isinstance(parsed_desc, dict): - logger.info(f" - ✓ Successfully parsed as JSON dict") - logger.info(f" - JSON keys: {list(parsed_desc.keys())}") - # Format structured description - formatted_desc = "Content Outline:\n\n" - if 'H2' in parsed_desc: - h2_count = len(parsed_desc['H2']) - logger.info(f" - Found {h2_count} H2 sections") - for h2_idx, h2_section in enumerate(parsed_desc['H2']): - formatted_desc += f"## {h2_section.get('heading', '')}\n" - if 'subsections' in h2_section: - h3_count = len(h2_section['subsections']) - logger.info(f" - H2 #{h2_idx + 1}: {h2_section.get('heading', '')} ({h3_count} subsections)") - for h3_section in h2_section['subsections']: - formatted_desc += f"### {h3_section.get('subheading', '')}\n" - formatted_desc += f"Content Type: {h3_section.get('content_type', '')}\n" - formatted_desc += f"Details: {h3_section.get('details', '')}\n\n" - description = formatted_desc - logger.info(f" - ✓ Formatted structured description (length: {len(description)} chars)") - except json.JSONDecodeError as json_error: - # Not JSON, use as plain text - logger.info(f" - Not JSON format (JSONDecodeError: {str(json_error)}), using as plain text") - pass - except TypeError as type_error: - logger.warning(f" - Type error parsing description: {str(type_error)}, using as plain text") - pass - except Exception as parse_error: - logger.error(f" - ✗ Unexpected error parsing description: {type(parse_error).__name__}: {str(parse_error)}") - logger.error(f" - Description value: {str(description)[:200]}...", exc_info=True) - # Continue with plain text - pass - - idea_data += f"Outline: {description}\n" - logger.info(f" - ✓ Added outline to idea_data") - if task.idea: - idea_data += f"Structure: {task.idea.content_structure or task.content_structure or 'blog_post'}\n" - idea_data += f"Type: {task.idea.content_type or task.content_type or 'blog_post'}\n" - if task.idea.estimated_word_count: - idea_data += f"Estimated Word Count: {task.idea.estimated_word_count}\n" - - # Build cluster data string (format similar to WordPress plugin) - logger.info(" * Building cluster data string...") - cluster_data = '' - if task.cluster: - try: - cluster_data = f"Cluster Name: {task.cluster.name or ''}\n" - logger.info(f" - Cluster name: {task.cluster.name or 'N/A'}") - - if task.cluster.description: - cluster_data += f"Description: {task.cluster.description}\n" - logger.info(f" - Cluster description: {task.cluster.description[:100] if len(task.cluster.description) > 100 else task.cluster.description}...") - - cluster_data += f"Status: {task.cluster.status or 'active'}\n" - logger.info(f" - Cluster status: {task.cluster.status or 'active'}") - - # Get keyword count from cluster (if available) - if hasattr(task.cluster, 'keywords_count'): - keywords_count = task.cluster.keywords_count or 0 - cluster_data += f"Keyword Count: {keywords_count}\n" - logger.info(f" - Cluster keywords_count: {keywords_count}") - - if hasattr(task.cluster, 'volume'): - volume = task.cluster.volume or 0 - cluster_data += f"Total Volume: {volume}\n" - logger.info(f" - Cluster volume: {volume}") - - logger.info(f" - ✓ Cluster data formatted (length: {len(cluster_data)} chars)") - except Exception as cluster_data_error: - logger.error(f" - ✗ ERROR building cluster data: {type(cluster_data_error).__name__}: {str(cluster_data_error)}") - logger.error(f" - Cluster object: {task.cluster}", exc_info=True) - cluster_data = f"Cluster Name: {task.cluster.name or 'Unknown'}\n" - else: - logger.info(f" - No cluster associated with task") - - # Build keywords string - logger.info(" * Building keywords data string...") - # Prefer task.keywords, fallback to idea.target_keywords, then cluster keywords - keywords_data = task.keywords or '' - logger.info(f" - Task keywords: {keywords_data or 'None'}") - - if not keywords_data and task.idea: - try: - if task.idea.target_keywords: - keywords_data = task.idea.target_keywords - logger.info(f" - Using idea.target_keywords: {keywords_data[:100] if len(keywords_data) > 100 else keywords_data}...") - except Exception as idea_keywords_error: - logger.warning(f" - ⚠️ ERROR accessing idea.target_keywords: {type(idea_keywords_error).__name__}: {str(idea_keywords_error)}") - - if not keywords_data and task.cluster: - logger.info(f" - No keywords from task or idea, cluster available but not fetching keywords") - keywords_data = '' - - logger.info(f" - ✓ Final keywords_data: {keywords_data[:100] if len(keywords_data) > 100 else keywords_data}...") - - # Replace placeholders in prompt template - logger.info(" * Replacing placeholders in prompt template...") - try: - prompt = prompt_template.replace('[IGNY8_IDEA]', idea_data) - logger.info(f" - ✓ Replaced [IGNY8_IDEA] (idea_data length: {len(idea_data)} chars)") - - prompt = prompt.replace('[IGNY8_CLUSTER]', cluster_data) - logger.info(f" - ✓ Replaced [IGNY8_CLUSTER] (cluster_data length: {len(cluster_data)} chars)") - - prompt = prompt.replace('[IGNY8_KEYWORDS]', keywords_data) - logger.info(f" - ✓ Replaced [IGNY8_KEYWORDS] (keywords_data length: {len(keywords_data)} chars)") - - logger.info(f" - ✓ Final prompt length: {len(prompt)} characters") - logger.info(f" - Final prompt preview (first 500 chars): {prompt[:500]}...") - - except Exception as prompt_replace_error: - logger.error(f" - ✗ ERROR replacing placeholders: {type(prompt_replace_error).__name__}: {str(prompt_replace_error)}") - logger.error(f" - Prompt template length: {len(prompt_template)}") - logger.error(f" - Idea data length: {len(idea_data)}") - logger.error(f" - Cluster data length: {len(cluster_data)}") - logger.error(f" - Keywords data length: {len(keywords_data)}", exc_info=True) - continue - - # Log prompt preparation summary - logger.info("=" * 80) - logger.info(f"PROMPT PREPARATION SUMMARY for Task {task.id}:") - logger.info(f" - Prompt length: {len(prompt)} characters") - logger.info(f" - Has idea: {bool(task.idea)}") - logger.info(f" - Has cluster: {bool(task.cluster)}") - logger.info(f" - Idea data length: {len(idea_data)} chars") - logger.info(f" - Cluster data length: {len(cluster_data)} chars") - logger.info(f" - Keywords data length: {len(keywords_data)} chars") - logger.info("=" * 80) - - # Update progress: Generating with AI - add_step('AI_CALL', 'success', f"Generating article content for '{task.title}'...", 'request') - ai_call_pct = 50 + int((idx / total_tasks) * 20) # 50-70% for AI call - self.update_state( - state='PROGRESS', - meta={ - 'current': idx + 1, - 'total': total_tasks, - 'percentage': ai_call_pct, - 'message': f"Generating article content for '{task.title}'...", - 'phase': 'AI_CALL', - 'current_item': task.title, - 'request_steps': request_steps, - 'response_steps': response_steps - } - ) - - # ======================================================================== - # AI PROCESSOR INITIALIZATION PHASE - Detailed logging - # ======================================================================== - logger.info(" - Initializing AIProcessor...") - try: - # Create AIProcessor instance with account to load API keys from IntegrationSettings - # This ensures API keys and model are loaded from IntegrationSettings - from igny8_core.utils.ai_processor import AIProcessor - logger.info(f" * Creating AIProcessor instance for account {account.id}...") - processor = AIProcessor(account=account) - logger.info(f" * ✓ AIProcessor created successfully") - - # Validate processor has API key - logger.info(f" * Checking OpenAI API key...") - if not processor.openai_api_key: - logger.error(f" * ✗ OpenAI API key not configured for account {account.id}") - logger.error(f" * This will cause the AI request to fail") - continue - else: - # Log partial key for verification (first 10 chars + ...) - api_key_preview = processor.openai_api_key[:10] + "..." if len(processor.openai_api_key) > 10 else "***" - logger.info(f" * ✓ OpenAI API key configured (preview: {api_key_preview})") - - # Log model information - logger.info(f" * Default model: {processor.default_model}") - logger.info(f" * Model rates available: {list(processor.model_rates.keys())}") - - except Exception as processor_error: - logger.error(f" * ✗ ERROR initializing AIProcessor: {type(processor_error).__name__}: {str(processor_error)}") - logger.error(f" * Account ID: {account.id}", exc_info=True) - continue - - # Log AI request details (without exposing sensitive data) - logger.info("=" * 80) - logger.info(f"AI REQUEST PREPARATION for Task {task.id}:") - logger.info(f" - Model: {processor.default_model}") - logger.info(f" - Prompt length: {len(prompt)} characters") - logger.info(f" - Max tokens: 4000") - logger.info("=" * 80) - - # ======================================================================== - # AI API CALL PHASE - Detailed logging - # ======================================================================== - logger.info(" - Calling AI API...") - try: - # Call AI processor - result = processor.generate_content(prompt, max_tokens=4000) - logger.info(f" * ✓ AI API call completed") - - # Log response details - if result.get('error'): - logger.error(f" * ✗ AI returned error: {result['error']}") - logger.error(f" * Error details: {result}") - continue - - content = result.get('content', '') - if not content: - logger.warning(f" * ⚠️ No content in AI response") - logger.warning(f" * Response keys: {list(result.keys())}") - logger.warning(f" * Full response: {result}") - continue - - # Log raw response - logger.info(f" * ✓ Raw content received: {len(content)} characters") - logger.info(f" * Response keys: {list(result.keys())}") - logger.info(f" * Input tokens: {result.get('input_tokens', 'N/A')}") - logger.info(f" * Output tokens: {result.get('output_tokens', 'N/A')}") - logger.info(f" * Total tokens: {result.get('tokens_used', result.get('total_tokens', 'N/A'))}") - logger.info(f" * Cost: ${result.get('cost', 'N/A')}") - logger.info(f" * Raw content preview (first 200 chars): {content[:200]}...") - - # Update progress: Parsing content - add_step('PARSE', 'success', f"Processing content for '{task.title}'...", 'response') - parse_pct = 70 + int((idx / total_tasks) * 10) # 70-80% for parsing - self.update_state( - state='PROGRESS', - meta={ - 'current': idx + 1, - 'total': total_tasks, - 'percentage': parse_pct, - 'message': f"Processing content for '{task.title}'...", - 'phase': 'PARSE', - 'current_item': task.title, - 'request_steps': request_steps, - 'response_steps': response_steps - } - ) - - # Parse JSON response using GenerateContentFunction's parse_response method - logger.info(f" * Parsing AI response (length: {len(content)} chars)...") - try: - from igny8_core.ai.functions.generate_content import GenerateContentFunction - fn = GenerateContentFunction() - parsed_response = fn.parse_response(content) - - logger.info(f" * ✓ Response parsed:") - logger.info(f" - Type: {type(parsed_response).__name__}") - if isinstance(parsed_response, dict): - logger.info(f" - Keys: {list(parsed_response.keys())}") - logger.info(f" - Has title: {bool(parsed_response.get('title'))}") - logger.info(f" - Has meta_title: {bool(parsed_response.get('meta_title'))}") - logger.info(f" - Has primary_keyword: {bool(parsed_response.get('primary_keyword'))}") - logger.info(f" - Has secondary_keywords: {bool(parsed_response.get('secondary_keywords'))}") - logger.info(f" - Has tags: {bool(parsed_response.get('tags'))}") - logger.info(f" - Has categories: {bool(parsed_response.get('categories'))}") - logger.info(f" - Content length: {len(parsed_response.get('content', ''))} chars") - else: - logger.info(f" - Content length: {len(str(parsed_response))} chars") - - # Use parsed response for saving - parsed_data = parsed_response - - except Exception as parse_error: - logger.warning(f" * ⚠️ JSON parsing failed: {type(parse_error).__name__}: {str(parse_error)}") - logger.warning(f" * Treating as plain text content") - # Fallback to plain text - parsed_data = {'content': content} - - # Normalize content from parsed response - content_to_normalize = parsed_data.get('content', '') if isinstance(parsed_data, dict) else str(parsed_data) - if content_to_normalize: - logger.info(f" * Normalizing content (length: {len(content_to_normalize)} chars)...") - try: - from igny8_core.utils.content_normalizer import normalize_content - normalized = normalize_content(content_to_normalize) - normalized_content = normalized['normalized_content'] - content_type = normalized['content_type'] - has_structure = normalized['has_structure'] - original_format = normalized['original_format'] - - logger.info(f" * ✓ Content normalized:") - logger.info(f" - Original format: {original_format}") - logger.info(f" - Content type: {content_type}") - logger.info(f" - Has structure: {has_structure}") - logger.info(f" - Normalized length: {len(normalized_content)} chars") - logger.info(f" - Normalized preview (first 200 chars): {normalized_content[:200]}...") - - # Update parsed_data with normalized content - if isinstance(parsed_data, dict): - parsed_data['content'] = normalized_content - else: - parsed_data = {'content': normalized_content} - - except Exception as norm_error: - logger.warning(f" * ⚠️ Content normalization failed: {type(norm_error).__name__}: {str(norm_error)}") - logger.warning(f" * Using original content as-is") - # Continue with original content - - except Exception as ai_error: - logger.error(f" * ✗ EXCEPTION during AI API call: {type(ai_error).__name__}: {str(ai_error)}") - logger.error(f" * Task ID: {task.id}", exc_info=True) - continue - - # Use GenerateContentFunction's save_output method to properly save all fields - logger.info(" - Saving content to database using GenerateContentFunction.save_output()...") - try: - from igny8_core.ai.functions.generate_content import GenerateContentFunction - fn = GenerateContentFunction() - - # Save using the proper save_output method which handles all fields - save_result = fn.save_output(parsed_data, [task], account) - - # Get word count from save result or calculate - word_count = save_result.get('word_count', 0) - if not word_count and isinstance(parsed_data, dict): - content_for_count = parsed_data.get('content', '') - if content_for_count: - text_for_counting = re.sub(r'<[^>]+>', '', content_for_count) - word_count = len(text_for_counting.split()) - - logger.info(f" * ✓ Task saved successfully using save_output()") - logger.info(f" - tasks_updated: {save_result.get('tasks_updated', 0)}") - logger.info(f" - word_count: {word_count}") - - # Log all fields that were saved - logger.info(f" * Saved fields:") - logger.info(f" - task_id: {task.id}") - logger.info(f" - task_status: {task.status}") - if isinstance(parsed_data, dict): - logger.info(f" - content_title: {parsed_data.get('title') or task.title}") - logger.info(f" - content_primary_keyword: {parsed_data.get('primary_keyword') or 'N/A'}") - logger.info(f" - content_secondary_keywords: {len(parsed_data.get('secondary_keywords') or [])} items") - logger.info(f" - content_tags: {len(parsed_data.get('tags') or [])} items") - logger.info(f" - content_categories: {len(parsed_data.get('categories') or [])} items") - logger.info(f" - content_word_count: {word_count}") - - # Update progress: Saving content - add_step('SAVE', 'success', f"Content saved for '{task.title}' ({word_count} words)...", 'response') - save_pct = 85 + int((idx / total_tasks) * 10) # 85-95% for saving - self.update_state( - state='PROGRESS', - meta={ - 'current': idx + 1, - 'total': total_tasks, - 'percentage': save_pct, - 'message': f"Content saved for '{task.title}' ({word_count} words)...", - 'phase': 'SAVE', - 'current_item': task.title, - 'request_steps': request_steps, - 'response_steps': response_steps - } - ) - - tasks_updated += save_result.get('tasks_updated', 0) - logger.info(f" * ✓ Task {task.id} content generation completed successfully") - - except Exception as save_error: - logger.error("=" * 80) - logger.error(f"DATABASE SAVE ERROR for Task {task.id}") - logger.error(f" - Error type: {type(save_error).__name__}") - logger.error(f" - Error message: {str(save_error)}") - logger.error(f" - Task ID: {task.id}") - logger.error(f" - Task title: {task.title}") - logger.error(f" - Content length: {len(content) if content else 0}") - logger.error(f" - Word count: {word_count}") - logger.error("=" * 80, exc_info=True) - continue - - logger.info("=" * 80) - logger.info(f"✓ TASK {task.id} PROCESSING COMPLETE") - logger.info("=" * 80) - - # Final progress update - mark as DONE - final_message = f"Content generation complete: {tasks_updated} articles generated" - add_step('DONE', 'success', final_message, 'response') - logger.info("=" * 80) - logger.info(f"TASK COMPLETION SUMMARY") - logger.info(f" - Total tasks processed: {total_tasks}") - logger.info(f" - Tasks successfully updated: {tasks_updated}") - logger.info(f" - Tasks failed/skipped: {total_tasks - tasks_updated}") - logger.info("=" * 80) - - # Update final state before returning - self.update_state( - state='SUCCESS', - meta={ - 'current': total_tasks, - 'total': total_tasks, - 'percentage': 100, - 'message': final_message, - 'phase': 'DONE', - 'request_steps': request_steps, - 'response_steps': response_steps, - 'tasks_updated': tasks_updated - } - ) - - return { - 'success': True, - 'tasks_updated': tasks_updated, - 'message': final_message, - } - - except Exception as e: - # Import database error types for better error handling - from django.db import OperationalError, DatabaseError, IntegrityError - from django.core.exceptions import ValidationError - - error_type = type(e).__name__ - error_message = str(e) - - logger.error("=" * 80) - logger.error("CRITICAL ERROR in auto_generate_content_task") - logger.error(f" - Error type: {error_type}") - logger.error(f" - Error message: {error_message}") - logger.error(f" - Task IDs: {task_ids}") - logger.error(f" - Account ID: {account_id}") - logger.error("=" * 80, exc_info=True) - - # Update Celery task state with detailed error information - self.update_state( - state='FAILURE', - meta={ - 'error': error_message, - 'error_type': error_type, - 'message': f'Error: {error_message}', - 'task_ids': task_ids, - 'account_id': account_id - } - ) - - # Return error result instead of raising (for synchronous execution) - return { - 'success': False, - 'error': error_message, - 'error_type': error_type, - 'tasks_updated': 0 - } - - -@shared_task(bind=True, max_retries=3) -def auto_generate_images_task(self, task_ids: List[int], account_id: int = None): - """ - Celery task to generate images for tasks using AI. - Sequential processing: Featured → Desktop → Mobile images - - Args: - task_ids: List of task IDs - account_id: Account ID for account isolation - """ - try: - from igny8_core.auth.models import Account - - # Initialize progress - self.update_state( - state='PROGRESS', - meta={ - 'current': 0, - 'total': len(task_ids), - 'percentage': 0, - 'message': 'Initializing image generation...', - 'phase': 'initializing' - } - ) - - # Get account (backward compatibility: account_id parameter) - account = None - if account_id: - try: - account = Account.objects.get(id=account_id) - except Account.DoesNotExist: - pass - - # Get tasks - tasks_queryset = Tasks.objects.filter(id__in=task_ids) - if account: - tasks_queryset = tasks_queryset.filter(account=account) - - tasks = list(tasks_queryset.select_related('account', 'sector', 'site')) - - if not tasks: - logger.warning(f"No tasks found: {task_ids}") - return {'success': False, 'error': 'No tasks found'} - - # Get image generation settings from IntegrationSettings - image_settings = {} - if account: - try: - from igny8_core.modules.system.models import IntegrationSettings - integration = IntegrationSettings.objects.get( - account=account, - integration_type='image_generation', - is_active=True - ) - image_settings = integration.config or {} - except IntegrationSettings.DoesNotExist: - logger.warning("Image generation settings not found, using defaults") - - # Extract settings - provider = image_settings.get('provider') or image_settings.get('service', 'openai') - # Get model based on provider - if provider == 'runware': - model = image_settings.get('model') or image_settings.get('runwareModel', 'runware:97@1') - else: - model = image_settings.get('model', 'dall-e-3') - image_type = image_settings.get('image_type', 'realistic') - max_in_article_images = int(image_settings.get('max_in_article_images', 2)) - image_format = image_settings.get('image_format', 'webp') - desktop_enabled = image_settings.get('desktop_enabled', True) - mobile_enabled = image_settings.get('mobile_enabled', True) - - total_tasks = len(tasks) - - # Calculate total images to generate - # Each task: 1 featured + (desktop_enabled ? max_in_article_images : 0) + (mobile_enabled ? max_in_article_images : 0) - images_per_task = 1 + (max_in_article_images if desktop_enabled else 0) + (max_in_article_images if mobile_enabled else 0) - total_images = total_tasks * images_per_task - - current_image = 0 - images_created = 0 - - # Create AIProcessor instance - from igny8_core.utils.ai_processor import AIProcessor - processor = AIProcessor(account=account) - - # Process each task sequentially - for task_idx, task in enumerate(tasks): - # Check if task has content - if not task.content: - logger.warning(f"Task {task.id} has no content, skipping image generation") - continue - - # Update progress: Extracting image prompts (0-10% per task) - progress_pct = int((task_idx / total_tasks) * 10) - self.update_state( - state='PROGRESS', - meta={ - 'current': task_idx + 1, - 'total': total_tasks, - 'percentage': progress_pct, - 'message': f"Extracting image prompts from '{task.title}' ({task_idx + 1} of {total_tasks})...", - 'phase': 'extracting_prompts', - 'current_item': task.title - } - ) - - # Extract image prompts from content - prompts_result = processor.extract_image_prompts( - content=task.content, - title=task.title, - max_images=max_in_article_images, - account=account - ) - - if prompts_result.get('error'): - logger.error(f"Error extracting prompts for task {task.id}: {prompts_result['error']}") - continue - - featured_prompt = prompts_result.get('featured_prompt', '') - in_article_prompts = prompts_result.get('in_article_prompts', []) - - # Get image prompt template - image_prompt_template = get_prompt_value(account, 'image_prompt_template') if account else None - if not image_prompt_template: - image_prompt_template = get_default_prompt('image_prompt_template') - - # Get negative prompt - negative_prompt = get_prompt_value(account, 'negative_prompt') if account else None - if not negative_prompt: - negative_prompt = get_default_prompt('negative_prompt') - - # Generate Featured Image (always) - current_image += 1 - progress_pct = 10 + int((current_image / total_images) * 30) - self.update_state( - state='PROGRESS', - meta={ - 'current': current_image, - 'total': total_images, - 'percentage': progress_pct, - 'message': f"Generating Featured Image for '{task.title}' ({task_idx + 1} of {total_tasks})...", - 'phase': 'generating_featured', - 'current_item': task.title - } - ) - - # Format featured image prompt - formatted_featured_prompt = image_prompt_template.format( - image_type=image_type, - post_title=task.title, - image_prompt=featured_prompt - ) - - # Generate featured image - # For Runware, pass model in kwargs - featured_kwargs = {} - if provider == 'runware': - featured_kwargs['model'] = model - featured_kwargs['negative_prompt'] = negative_prompt - - featured_result = processor.generate_image( - prompt=formatted_featured_prompt, - provider=provider, - model=model if provider != 'runware' else None, # Model param for OpenAI, kwargs for Runware - size='1280x832', # Featured image size (fixed) - **featured_kwargs - ) - - if featured_result.get('url') and not featured_result.get('error'): - Images.objects.create( - task=task, - image_type='featured', - image_url=featured_result['url'], - prompt=featured_result.get('revised_prompt') or formatted_featured_prompt, - status='generated', - account=task.account, - site=task.site, - sector=task.sector, - ) - images_created += 1 - - # Generate Desktop Images (if enabled) - if desktop_enabled and in_article_prompts: - for img_idx, img_prompt in enumerate(in_article_prompts): - current_image += 1 - progress_pct = 40 + int((current_image / total_images) * 30) - self.update_state( - state='PROGRESS', - meta={ - 'current': current_image, - 'total': total_images, - 'percentage': progress_pct, - 'message': f"Generating Desktop Image {img_idx + 1} of {len(in_article_prompts)} for '{task.title}'...", - 'phase': 'generating_desktop', - 'current_item': task.title - } - ) - - # Format desktop image prompt - formatted_img_prompt = image_prompt_template.format( - image_type=image_type, - post_title=task.title, - image_prompt=img_prompt - ) - - # Generate desktop image - desktop_kwargs = {} - if provider == 'runware': - desktop_kwargs['model'] = model - desktop_kwargs['negative_prompt'] = negative_prompt - - desktop_result = processor.generate_image( - prompt=formatted_img_prompt, - provider=provider, - model=model if provider != 'runware' else None, - size='1024x1024', # Desktop image size (fixed) - **desktop_kwargs - ) - - if desktop_result.get('url') and not desktop_result.get('error'): - Images.objects.create( - task=task, - image_type='desktop', - image_url=desktop_result['url'], - prompt=desktop_result.get('revised_prompt') or formatted_img_prompt, - status='generated', - position=img_idx + 1, - account=task.account, - site=task.site, - sector=task.sector, - ) - images_created += 1 - - # Generate Mobile Images (if enabled) - if mobile_enabled and in_article_prompts: - for img_idx, img_prompt in enumerate(in_article_prompts): - current_image += 1 - progress_pct = 70 + int((current_image / total_images) * 25) - self.update_state( - state='PROGRESS', - meta={ - 'current': current_image, - 'total': total_images, - 'percentage': progress_pct, - 'message': f"Generating Mobile Image {img_idx + 1} of {len(in_article_prompts)} for '{task.title}'...", - 'phase': 'generating_mobile', - 'current_item': task.title - } - ) - - # Format mobile image prompt - formatted_img_prompt = image_prompt_template.format( - image_type=image_type, - post_title=task.title, - image_prompt=img_prompt - ) - - # Generate mobile image - mobile_kwargs = {} - if provider == 'runware': - mobile_kwargs['model'] = model - mobile_kwargs['negative_prompt'] = negative_prompt - - mobile_result = processor.generate_image( - prompt=formatted_img_prompt, - provider=provider, - model=model if provider != 'runware' else None, - size='960x1280', # Mobile image size (fixed) - **mobile_kwargs - ) - - if mobile_result.get('url') and not mobile_result.get('error'): - Images.objects.create( - task=task, - image_type='mobile', - image_url=mobile_result['url'], - prompt=mobile_result.get('revised_prompt') or formatted_img_prompt, - status='generated', - position=img_idx + 1, - account=task.account, - site=task.site, - sector=task.sector, - ) - images_created += 1 - - # Update progress: Saving images (95-98%) - progress_pct = 95 + int((task_idx / total_tasks) * 3) - self.update_state( - state='PROGRESS', - meta={ - 'current': task_idx + 1, - 'total': total_tasks, - 'percentage': progress_pct, - 'message': f"Saving images for '{task.title}'...", - 'phase': 'saving', - 'current_item': task.title - } - ) - - # Final progress update - final_message = f"Image generation complete: {images_created} images generated for {total_tasks} tasks" - logger.info(final_message) - - return { - 'success': True, - 'images_created': images_created, - 'message': final_message, - } - - except Exception as e: - logger.error(f"Error in auto_generate_images_task: {str(e)}", exc_info=True) - self.update_state( - state='FAILURE', - meta={ - 'error': str(e), - 'message': f'Error: {str(e)}' - } - ) - raise - diff --git a/backend/igny8_core/modules/writer/views.py b/backend/igny8_core/modules/writer/views.py index 216b5f55..8691eeb3 100644 --- a/backend/igny8_core/modules/writer/views.py +++ b/backend/igny8_core/modules/writer/views.py @@ -178,19 +178,37 @@ class TasksViewSet(SiteSectorModelViewSet): # Try to queue Celery task, fall back to synchronous if Celery not available try: - from .tasks import auto_generate_content_task + from igny8_core.ai.tasks import run_ai_task + from kombu.exceptions import OperationalError as KombuOperationalError - if hasattr(auto_generate_content_task, 'delay'): + if hasattr(run_ai_task, 'delay'): # Celery is available - queue async task logger.info(f"auto_generate_content: Queuing Celery task for {len(ids)} tasks") try: - task = auto_generate_content_task.delay(ids, account_id=account_id) + task = run_ai_task.delay( + function_name='generate_content', + payload={'ids': ids}, + account_id=account_id + ) logger.info(f"auto_generate_content: Celery task queued successfully: {task.id}") return Response({ 'success': True, 'task_id': str(task.id), 'message': 'Content generation started' }, status=status.HTTP_200_OK) + except KombuOperationalError as celery_error: + logger.error("=" * 80) + logger.error("CELERY ERROR: Failed to queue task") + logger.error(f" - Error type: {type(celery_error).__name__}") + logger.error(f" - Error message: {str(celery_error)}") + logger.error(f" - Task IDs: {ids}") + logger.error(f" - Account ID: {account_id}") + logger.error("=" * 80, exc_info=True) + + return Response({ + 'error': 'Task queue unavailable. Please try again.', + 'type': 'QueueError' + }, status=status.HTTP_503_SERVICE_UNAVAILABLE) except Exception as celery_error: logger.error("=" * 80) logger.error("CELERY ERROR: Failed to queue task") @@ -202,11 +220,15 @@ class TasksViewSet(SiteSectorModelViewSet): # Fall back to synchronous execution logger.info("auto_generate_content: Falling back to synchronous execution") - result = auto_generate_content_task(ids, account_id=account_id) + result = run_ai_task( + function_name='generate_content', + payload={'ids': ids}, + account_id=account_id + ) if result.get('success'): return Response({ 'success': True, - 'tasks_updated': result.get('tasks_updated', 0), + 'tasks_updated': result.get('count', 0), 'message': 'Content generated successfully (synchronous)' }, status=status.HTTP_200_OK) else: @@ -217,12 +239,16 @@ class TasksViewSet(SiteSectorModelViewSet): else: # Celery not available - execute synchronously logger.info(f"auto_generate_content: Executing synchronously (Celery not available)") - result = auto_generate_content_task(ids, account_id=account_id) + result = run_ai_task( + function_name='generate_content', + payload={'ids': ids}, + account_id=account_id + ) if result.get('success'): - logger.info(f"auto_generate_content: Synchronous execution successful: {result.get('tasks_updated', 0)} tasks updated") + logger.info(f"auto_generate_content: Synchronous execution successful: {result.get('count', 0)} tasks updated") return Response({ 'success': True, - 'tasks_updated': result.get('tasks_updated', 0), + 'tasks_updated': result.get('count', 0), 'message': 'Content generated successfully' }, status=status.HTTP_200_OK) else: @@ -356,10 +382,16 @@ class ImagesViewSet(SiteSectorModelViewSet): # Try to queue Celery task, fall back to synchronous if Celery not available try: - from .tasks import auto_generate_images_task - if hasattr(auto_generate_images_task, 'delay'): + from igny8_core.ai.tasks import run_ai_task + from kombu.exceptions import OperationalError as KombuOperationalError + + if hasattr(run_ai_task, 'delay'): # Celery is available - queue async task - task = auto_generate_images_task.delay(task_ids, account_id=account_id) + task = run_ai_task.delay( + function_name='generate_images', + payload={'ids': task_ids}, + account_id=account_id + ) return Response({ 'success': True, 'task_id': str(task.id), @@ -367,22 +399,39 @@ class ImagesViewSet(SiteSectorModelViewSet): }, status=status.HTTP_200_OK) else: # Celery not available - execute synchronously - result = auto_generate_images_task(task_ids, account_id=account_id) + result = run_ai_task( + function_name='generate_images', + payload={'ids': task_ids}, + account_id=account_id + ) if result.get('success'): return Response({ 'success': True, - 'images_created': result.get('images_created', 0), + 'images_created': result.get('count', 0), 'message': result.get('message', 'Image generation completed') }, status=status.HTTP_200_OK) else: return Response({ 'error': result.get('error', 'Image generation failed') }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + except KombuOperationalError as e: + return Response({ + 'error': 'Task queue unavailable. Please try again.', + 'type': 'QueueError' + }, status=status.HTTP_503_SERVICE_UNAVAILABLE) except ImportError: # Tasks module not available return Response({ 'error': 'Image generation task not available' }, status=status.HTTP_503_SERVICE_UNAVAILABLE) + except Exception as e: + import logging + logger = logging.getLogger(__name__) + logger.error(f"Error queuing image generation task: {str(e)}", exc_info=True) + return Response({ + 'error': f'Failed to start image generation: {str(e)}', + 'type': 'TaskError' + }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) class ContentViewSet(SiteSectorModelViewSet): diff --git a/backend/igny8_core/utils/ai_processor.py b/backend/igny8_core/utils/ai_processor.py index 582743c3..0befc71e 100644 --- a/backend/igny8_core/utils/ai_processor.py +++ b/backend/igny8_core/utils/ai_processor.py @@ -15,33 +15,15 @@ from django.core.cache import cache logger = logging.getLogger(__name__) -# Model pricing (per 1M tokens) - EXACT from reference plugin model-rates-config.php -MODEL_RATES = { - 'gpt-4.1': {'input': 2.00, 'output': 8.00}, - 'gpt-4o-mini': {'input': 0.15, 'output': 0.60}, - 'gpt-4o': {'input': 2.50, 'output': 10.00}, -} - -# Image model pricing (per image) - EXACT from reference plugin -IMAGE_MODEL_RATES = { - 'dall-e-3': 0.040, - 'dall-e-2': 0.020, - 'gpt-image-1': 0.042, - 'gpt-image-1-mini': 0.011, -} - -# Valid OpenAI image generation models (only these work with /v1/images/generations endpoint) -VALID_OPENAI_IMAGE_MODELS = { - 'dall-e-3', - 'dall-e-2', - # Note: gpt-image-1 and gpt-image-1-mini are NOT valid for OpenAI's /v1/images/generations endpoint -} - -# Valid image sizes per model (from OpenAI official documentation) -VALID_SIZES_BY_MODEL = { - 'dall-e-3': ['1024x1024', '1024x1792', '1792x1024'], - 'dall-e-2': ['256x256', '512x512', '1024x1024'], -} +# Import constants from unified location +from igny8_core.ai.constants import ( + MODEL_RATES, + IMAGE_MODEL_RATES, + VALID_OPENAI_IMAGE_MODELS, + VALID_SIZES_BY_MODEL, + DEFAULT_AI_MODEL, + JSON_MODE_MODELS, +) class AIProcessor: diff --git a/frontend/src/services/api.ts b/frontend/src/services/api.ts index 6f04a92f..5d343913 100644 --- a/frontend/src/services/api.ts +++ b/frontend/src/services/api.ts @@ -1,7 +1,6 @@ // Centralized API configuration and functions // Auto-detect API URL based on current origin (supports both IP and subdomain access) import { useAuthStore } from '../store/authStore'; -import { useAIRequestLogsStore } from '../store/aiRequestLogsStore'; function getApiBaseUrl(): string { // First check environment variables @@ -575,43 +574,15 @@ export async function bulkUpdateClustersStatus(ids: number[], status: string): P } export async function autoClusterKeywords(keywordIds: number[], sectorId?: number): Promise<{ success: boolean; task_id?: string; clusters_created?: number; keywords_updated?: number; message?: string; error?: string }> { - const startTime = Date.now(); - const addLog = useAIRequestLogsStore.getState().addLog; - const endpoint = `/v1/planner/keywords/auto_cluster/`; const requestBody = { ids: keywordIds, sector_id: sectorId }; - const pendingLogId = addLog({ - function: 'autoClusterKeywords', - endpoint, - request: { - method: 'POST', - body: requestBody, - }, - status: 'pending', - }); - try { const response = await fetchAPI(endpoint, { method: 'POST', body: JSON.stringify(requestBody), }); - const duration = Date.now() - startTime; - const updateLog = useAIRequestLogsStore.getState().updateLog; - - // Update log with response data (including task_id for progress tracking) - if (pendingLogId && response) { - updateLog(pendingLogId, { - response: { - status: 200, - data: response, - }, - status: response.success === false ? 'error' : 'success', - duration, - }); - } - // Check if response indicates an error (success: false) if (response && response.success === false) { // Return error response as-is so caller can check result.success @@ -620,108 +591,7 @@ export async function autoClusterKeywords(keywordIds: number[], sectorId?: numbe return response; } catch (error: any) { - const duration = Date.now() - startTime; - - // Try to extract error response data if available - let errorResponseData = null; - let errorRequestSteps = null; - - // Check if error has response data (from fetchAPI) - if (error.response || error.data) { - errorResponseData = error.response || error.data; - errorRequestSteps = errorResponseData?.request_steps; - } else if ((error as any).response) { - // Error object from fetchAPI has response attached - errorResponseData = (error as any).response; - errorRequestSteps = errorResponseData?.request_steps; - } - - // Parse error message to extract error type - let errorType = 'UNKNOWN_ERROR'; - let errorMessage = error.message || 'Unknown error'; - - // Check if error response contains JSON with error field - if (error.message && error.message.includes('API Error')) { - // Try to extract structured error from API response - const apiErrorMatch = error.message.match(/API Error \(\d+\): ([^-]+) - (.+)/); - if (apiErrorMatch) { - errorType = apiErrorMatch[1].trim(); - errorMessage = apiErrorMatch[2].trim(); - } - } - - if (errorMessage.includes('OperationalError')) { - errorType = 'DATABASE_ERROR'; - errorMessage = errorMessage.replace(/API Error \(\d+\): /, '').replace(/ - .*OperationalError.*/, ' - Database operation failed'); - } else if (errorMessage.includes('ValidationError')) { - errorType = 'VALIDATION_ERROR'; - } else if (errorMessage.includes('PermissionDenied')) { - errorType = 'PERMISSION_ERROR'; - } else if (errorMessage.includes('NotFound')) { - errorType = 'NOT_FOUND_ERROR'; - } else if (errorMessage.includes('IntegrityError')) { - errorType = 'DATABASE_ERROR'; - } else if (errorMessage.includes('RelatedObjectDoesNotExist')) { - errorType = 'RELATED_OBJECT_ERROR'; - // Extract clean error message - errorMessage = errorMessage.replace(/API Error \(\d+\): [^-]+ - /, '').trim(); - } - - // Update existing log or create new one - const updateLog = useAIRequestLogsStore.getState().updateLog; - const addRequestStep = useAIRequestLogsStore.getState().addRequestStep; - - if (pendingLogId) { - updateLog(pendingLogId, { - response: { - status: errorResponseData?.status || 500, - error: errorMessage, - errorType, - data: errorResponseData, - }, - status: 'error', - duration, - }); - - // Add request steps from error response if available - if (errorRequestSteps && Array.isArray(errorRequestSteps)) { - errorRequestSteps.forEach((step: any) => { - addRequestStep(pendingLogId, step); - }); - } - } else { - // Create new log if pendingLogId doesn't exist - const errorLogId = addLog({ - function: 'autoClusterKeywords', - endpoint, - request: { - method: 'POST', - body: requestBody, - }, - response: { - status: errorResponseData?.status || 500, - error: errorMessage, - errorType, - data: errorResponseData, - }, - status: 'error', - duration, - }); - - if (errorLogId && errorRequestSteps && Array.isArray(errorRequestSteps)) { - errorRequestSteps.forEach((step: any) => { - addRequestStep(errorLogId, step); - }); - } - } - - // Return error response in same format as successful response - // This allows the caller to check result.success === false - return { - success: false, - error: errorMessage, - errorType, - }; + throw error; } } diff --git a/frontend/src/store/aiRequestLogsStore.ts b/frontend/src/store/aiRequestLogsStore.ts deleted file mode 100644 index 6d14b0f7..00000000 --- a/frontend/src/store/aiRequestLogsStore.ts +++ /dev/null @@ -1,122 +0,0 @@ -import { create } from 'zustand'; - -export interface AIStepLog { - stepNumber: number; - stepName: string; - functionName: string; - status: 'pending' | 'success' | 'error'; - timestamp: Date; - message?: string; - error?: string; - duration?: number; // milliseconds -} - -export interface AIRequestLog { - id: string; - timestamp: Date; - function: string; // e.g., 'autoClusterKeywords', 'autoGenerateIdeas', 'autoGenerateContent', 'autoGenerateImages' - endpoint: string; - request: { - method: string; - body?: any; - params?: any; - }; - response?: { - status: number; - data?: any; - error?: string; - errorType?: string; // e.g., 'DATABASE_ERROR', 'VALIDATION_ERROR', 'PERMISSION_ERROR' - }; - status: 'pending' | 'success' | 'error'; - duration?: number; // milliseconds - requestSteps: AIStepLog[]; // Request steps (INIT, PREP, SAVE, DONE) - responseSteps: AIStepLog[]; // Response steps (AI_CALL, PARSE) -} - -interface AIRequestLogsStore { - logs: AIRequestLog[]; - addLog: (log: Omit) => string; - updateLog: (logId: string, updates: Partial) => void; - addRequestStep: (logId: string, step: Omit) => void; - addResponseStep: (logId: string, step: Omit) => void; - clearLogs: () => void; - maxLogs: number; -} - -export const useAIRequestLogsStore = create((set, get) => ({ - logs: [], - maxLogs: 20, // Keep last 20 logs - - addLog: (log) => { - const newLog: AIRequestLog = { - ...log, - id: `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`, - timestamp: new Date(), - requestSteps: [], - responseSteps: [], - }; - - set((state) => { - const updatedLogs = [newLog, ...state.logs].slice(0, state.maxLogs); - return { logs: updatedLogs }; - }); - - // Return the log ID so callers can add steps - return newLog.id; - }, - - updateLog: (logId, updates) => { - set((state) => { - const updatedLogs = state.logs.map((log) => { - if (log.id === logId) { - return { ...log, ...updates }; - } - return log; - }); - return { logs: updatedLogs }; - }); - }, - - addRequestStep: (logId, step) => { - set((state) => { - const updatedLogs = state.logs.map((log) => { - if (log.id === logId) { - const stepWithTimestamp: AIStepLog = { - ...step, - timestamp: new Date(), - }; - return { - ...log, - requestSteps: [...log.requestSteps, stepWithTimestamp], - }; - } - return log; - }); - return { logs: updatedLogs }; - }); - }, - - addResponseStep: (logId, step) => { - set((state) => { - const updatedLogs = state.logs.map((log) => { - if (log.id === logId) { - const stepWithTimestamp: AIStepLog = { - ...step, - timestamp: new Date(), - }; - return { - ...log, - responseSteps: [...log.responseSteps, stepWithTimestamp], - }; - } - return log; - }); - return { logs: updatedLogs }; - }); - }, - - clearLogs: () => { - set({ logs: [] }); - }, -})); -