""" Celery tasks for Planner module - AI clustering and idea generation """ import logging import time from typing import List from django.db import transaction from igny8_core.modules.planner.models import Keywords, Clusters, ContentIdeas from igny8_core.utils.ai_processor import ai_processor from igny8_core.ai.functions.generate_ideas import generate_ideas_core from igny8_core.ai.tracker import ConsoleStepTracker logger = logging.getLogger(__name__) # Try to import Celery, fall back to synchronous execution if not available try: from celery import shared_task CELERY_AVAILABLE = True except ImportError: CELERY_AVAILABLE = False # Create a mock decorator for synchronous execution def shared_task(*args, **kwargs): def decorator(func): return func return decorator # ============================================================================ # DEPRECATED: This function is deprecated. Use the new AI framework instead. # New path: views.py -> run_ai_task -> AIEngine -> AutoClusterFunction # This function is kept for backward compatibility but should not be used. # ============================================================================ def _auto_cluster_keywords_core(keyword_ids: List[int], sector_id: int = None, account_id: int = None, progress_callback=None): """ [DEPRECATED] Core logic for clustering keywords. Can be called with or without Celery. ⚠️ WARNING: This function is deprecated. Use the new AI framework instead: - New path: views.py -> run_ai_task -> AIEngine -> AutoClusterFunction - This function uses the old AIProcessor and does not use PromptRegistry - Console logging may not work correctly in this path Args: keyword_ids: List of keyword IDs to cluster sector_id: Sector ID for the keywords account_id: Account ID for account isolation progress_callback: Optional function to call for progress updates (for Celery tasks) """ # Initialize console step tracker for logging tracker = ConsoleStepTracker('auto_cluster') tracker.init(f"Starting keyword clustering for {len(keyword_ids)} keywords") # Track request and response steps (for Celery progress callbacks) request_steps = [] response_steps = [] try: from igny8_core.auth.models import Sector # Initialize progress if callback provided if progress_callback: progress_callback( state='PROGRESS', meta={ 'current': 0, 'total': len(keyword_ids), 'percentage': 0, 'message': 'Initializing keyword clustering...', 'phase': 'initializing', 'request_steps': request_steps, 'response_steps': response_steps } ) # Step 4: Keyword Loading & Validation tracker.prep(f"Loading {len(keyword_ids)} keywords from database") step_start = time.time() keywords_queryset = Keywords.objects.filter(id__in=keyword_ids) if account_id: keywords_queryset = keywords_queryset.filter(account_id=account_id) if sector_id: keywords_queryset = keywords_queryset.filter(sector_id=sector_id) keywords = list(keywords_queryset.select_related('account', 'site', 'site__account', 'sector', 'sector__site')) if not keywords: error_msg = f"No keywords found for clustering: {keyword_ids}" logger.warning(error_msg) tracker.error('Validation', error_msg) request_steps.append({ 'stepNumber': 4, 'stepName': 'Keyword Loading & Validation', 'functionName': '_auto_cluster_keywords_core', 'status': 'error', 'message': 'No keywords found', 'error': 'No keywords found', 'duration': int((time.time() - step_start) * 1000) }) if progress_callback: progress_callback( state='PROGRESS', meta={'request_steps': request_steps, 'response_steps': response_steps} ) return {'success': False, 'error': 'No keywords found', 'request_steps': request_steps, 'response_steps': response_steps} tracker.prep(f"Loaded {len(keywords)} keywords successfully") request_steps.append({ 'stepNumber': 4, 'stepName': 'Keyword Loading & Validation', 'functionName': '_auto_cluster_keywords_core', 'status': 'success', 'message': f'Loaded {len(keywords)} keywords', 'duration': int((time.time() - step_start) * 1000) }) total_keywords = len(keywords) # Step 5: Relationship Validation step_start = time.time() try: first_keyword = keywords[0] account = getattr(first_keyword, 'account', None) site = getattr(first_keyword, 'site', None) # If account is None, try to get it from site if not account and site: try: account = getattr(site, 'account', None) except Exception: pass sector = getattr(first_keyword, 'sector', None) # If site is None, try to get it from sector if not site and sector: try: site = getattr(sector, 'site', None) except Exception: pass except Exception as e: logger.error(f"Error accessing keyword relationships: {str(e)}") request_steps.append({ 'stepNumber': 5, 'stepName': 'Relationship Validation', 'functionName': '_auto_cluster_keywords_core', 'status': 'error', 'message': f'Error accessing relationships: {str(e)}', 'error': str(e), 'duration': int((time.time() - step_start) * 1000) }) if progress_callback: progress_callback( state='PROGRESS', meta={'request_steps': request_steps, 'response_steps': response_steps} ) return {'success': False, 'error': f'Invalid keyword data: {str(e)}', 'request_steps': request_steps, 'response_steps': response_steps} if not account: logger.error(f"No account found for keywords: {keyword_ids}. Keyword site: {getattr(first_keyword, 'site', None)}, Keyword account: {getattr(first_keyword, 'account', None)}") request_steps.append({ 'stepNumber': 5, 'stepName': 'Relationship Validation', 'functionName': '_auto_cluster_keywords_core', 'status': 'error', 'message': 'No account found', 'error': 'No account found for keywords', 'duration': int((time.time() - step_start) * 1000) }) if progress_callback: progress_callback( state='PROGRESS', meta={'request_steps': request_steps, 'response_steps': response_steps} ) return {'success': False, 'error': 'No account found for keywords. Please ensure keywords are properly associated with a site and account.', 'request_steps': request_steps, 'response_steps': response_steps} if not site: logger.error(f"No site found for keywords: {keyword_ids}. Keyword site: {getattr(first_keyword, 'site', None)}, Sector site: {getattr(sector, 'site', None) if sector else None}") request_steps.append({ 'stepNumber': 5, 'stepName': 'Relationship Validation', 'functionName': '_auto_cluster_keywords_core', 'status': 'error', 'message': 'No site found', 'error': 'No site found for keywords', 'duration': int((time.time() - step_start) * 1000) }) if progress_callback: progress_callback( state='PROGRESS', meta={'request_steps': request_steps, 'response_steps': response_steps} ) return {'success': False, 'error': 'No site found for keywords. Please ensure keywords are properly associated with a site.', 'request_steps': request_steps, 'response_steps': response_steps} request_steps.append({ 'stepNumber': 5, 'stepName': 'Relationship Validation', 'functionName': '_auto_cluster_keywords_core', 'status': 'success', 'message': f'Account: {account.id if account else None}, Site: {site.id if site else None}, Sector: {sector.id if sector else None}', 'duration': int((time.time() - step_start) * 1000) }) # Update progress: Analyzing keywords (0-40%) if progress_callback: progress_callback( state='PROGRESS', meta={ 'current': 0, 'total': total_keywords, 'percentage': 5, 'message': f'Preparing to analyze {total_keywords} keywords...', 'phase': 'preparing', 'request_steps': request_steps, 'response_steps': response_steps } ) # Get sector name if available sector_name = sector.name if sector else None # Format keywords for AI keyword_data = [ { 'keyword': kw.keyword, 'volume': kw.volume, 'difficulty': kw.difficulty, 'intent': kw.intent, } for kw in keywords ] # Update progress: Sending to AI (10-40%) if progress_callback: progress_callback( state='PROGRESS', meta={ 'current': 0, 'total': total_keywords, 'percentage': 10, 'message': 'Analyzing keyword relationships with AI...', 'phase': 'analyzing', 'request_steps': request_steps, 'response_steps': response_steps } ) # Step 6: AIProcessor Creation step_start = time.time() from igny8_core.utils.ai_processor import AIProcessor try: # Log account info for debugging account_id = account.id if account else None account_name = account.name if account else None logger.info(f"Creating AIProcessor with account: id={account_id}, name={account_name}") processor = AIProcessor(account=account) # Log API key status has_api_key = bool(processor.openai_api_key) api_key_preview = processor.openai_api_key[:10] + "..." if processor.openai_api_key else "None" logger.info(f"AIProcessor created. Has API key: {has_api_key}, Preview: {api_key_preview}, Model: {processor.default_model}") request_steps.append({ 'stepNumber': 6, 'stepName': 'AIProcessor Creation', 'functionName': '_auto_cluster_keywords_core', 'status': 'success', 'message': f'AIProcessor created with account context (Account ID: {account_id}, Has API Key: {has_api_key})', 'duration': int((time.time() - step_start) * 1000) }) except Exception as e: logger.error(f"Error creating AIProcessor: {type(e).__name__}: {str(e)}", exc_info=True) request_steps.append({ 'stepNumber': 6, 'stepName': 'AIProcessor Creation', 'functionName': '_auto_cluster_keywords_core', 'status': 'error', 'message': f'Error creating AIProcessor: {str(e)}', 'error': str(e), 'duration': int((time.time() - step_start) * 1000) }) if progress_callback: progress_callback( state='PROGRESS', meta={'request_steps': request_steps, 'response_steps': response_steps} ) return {'success': False, 'error': f'Error creating AIProcessor: {str(e)}', 'request_steps': request_steps, 'response_steps': response_steps} # Step 7: AI Call Preparation step_start = time.time() try: # Check if API key is available if not processor.openai_api_key: # Try to debug why API key is missing logger.error(f"OpenAI API key not found for account {account.id if account else None}") # Check IntegrationSettings directly try: from igny8_core.modules.system.models import IntegrationSettings settings_obj = IntegrationSettings.objects.filter( integration_type='openai', account=account, is_active=True ).first() if settings_obj: logger.error(f"IntegrationSettings found but API key missing. Config keys: {list(settings_obj.config.keys()) if settings_obj.config else 'None'}") else: logger.error(f"No IntegrationSettings found for account {account.id if account else None}, integration_type='openai', is_active=True") except Exception as debug_error: logger.error(f"Error checking IntegrationSettings: {str(debug_error)}", exc_info=True) request_steps.append({ 'stepNumber': 7, 'stepName': 'AI Call Preparation', 'functionName': '_auto_cluster_keywords_core', 'status': 'error', 'message': 'OpenAI API key not configured', 'error': 'OpenAI API key not configured', 'duration': int((time.time() - step_start) * 1000) }) if progress_callback: progress_callback( state='PROGRESS', meta={'request_steps': request_steps, 'response_steps': response_steps} ) return {'success': False, 'error': 'OpenAI API key not configured', 'request_steps': request_steps, 'response_steps': response_steps} request_steps.append({ 'stepNumber': 7, 'stepName': 'AI Call Preparation', 'functionName': '_auto_cluster_keywords_core', 'status': 'success', 'message': f'Prepared {len(keyword_data)} keywords for AI analysis', 'duration': int((time.time() - step_start) * 1000) }) except Exception as e: request_steps.append({ 'stepNumber': 7, 'stepName': 'AI Call Preparation', 'functionName': '_auto_cluster_keywords_core', 'status': 'error', 'message': f'Error preparing AI call: {str(e)}', 'error': str(e), 'duration': int((time.time() - step_start) * 1000) }) if progress_callback: progress_callback( state='PROGRESS', meta={'request_steps': request_steps, 'response_steps': response_steps} ) return {'success': False, 'error': f'Error preparing AI call: {str(e)}', 'request_steps': request_steps, 'response_steps': response_steps} # Call AI with step tracking tracker.ai_call(f"Sending {len(keyword_data)} keywords to AI for clustering") result = processor.cluster_keywords( keyword_data, sector_name=sector_name, account=account, response_steps=response_steps, progress_callback=progress_callback, tracker=tracker # Pass tracker for console logging ) if result.get('error'): error_msg = f"AI clustering error: {result['error']}" logger.error(error_msg) tracker.error('AI_CALL', error_msg) if progress_callback: progress_callback( state='FAILURE', meta={ 'error': result['error'], 'message': f"Error: {result['error']}", 'request_steps': request_steps, 'response_steps': response_steps } ) return {'success': False, 'error': result['error'], 'request_steps': request_steps, 'response_steps': response_steps} # Parse response tracker.parse("Parsing AI response into cluster data") # Update response_steps from result if available if result.get('response_steps'): response_steps.extend(result.get('response_steps', [])) # Update progress: Creating clusters (40-90%) clusters_data = result.get('clusters', []) if progress_callback: progress_callback( state='PROGRESS', meta={ 'current': 0, 'total': total_keywords, 'percentage': 40, 'message': f'Creating {len(clusters_data)} clusters...', 'phase': 'creating_clusters', 'request_steps': request_steps, 'response_steps': response_steps } ) clusters_created = 0 keywords_updated = 0 # Step 13: Database Transaction Start tracker.save(f"Creating {len(clusters_data)} clusters in database") step_start = time.time() # Create/update clusters and assign keywords # Note: account and sector are already extracted above to avoid database queries inside transaction with transaction.atomic(): if response_steps is not None: response_steps.append({ 'stepNumber': 13, 'stepName': 'Database Transaction Start', 'functionName': '_auto_cluster_keywords_core', 'status': 'success', 'message': 'Transaction started', 'duration': int((time.time() - step_start) * 1000) }) # Step 14: Cluster Creation/Update cluster_step_start = time.time() for idx, cluster_data in enumerate(clusters_data): cluster_name = cluster_data.get('name', '') cluster_keywords = cluster_data.get('keywords', []) if not cluster_name or not cluster_keywords: continue # Update progress for each cluster if progress_callback: progress_pct = 40 + int((idx / len(clusters_data)) * 50) progress_callback( state='PROGRESS', meta={ 'current': idx + 1, 'total': len(clusters_data), 'percentage': progress_pct, 'message': f"Creating cluster '{cluster_name}' ({idx + 1} of {len(clusters_data)})...", 'phase': 'creating_clusters', 'current_item': cluster_name, 'request_steps': request_steps, 'response_steps': response_steps } ) # Get or create cluster # Note: Clusters model (SiteSectorBaseModel) requires both site and sector # Ensure site is always set (can be from sector.site if sector exists) cluster_site = site if site else (sector.site if sector and hasattr(sector, 'site') else None) if not cluster_site: logger.error(f"Cannot create cluster '{cluster_name}': No site available. Keywords: {keyword_ids}") continue if sector: cluster, created = Clusters.objects.get_or_create( name=cluster_name, account=account, site=cluster_site, sector=sector, defaults={ 'description': cluster_data.get('description', ''), 'status': 'active', } ) else: # If no sector, create cluster without sector filter but still require site cluster, created = Clusters.objects.get_or_create( name=cluster_name, account=account, site=cluster_site, sector__isnull=True, defaults={ 'description': cluster_data.get('description', ''), 'status': 'active', 'sector': None, } ) if created: clusters_created += 1 # Step 15: Keyword Matching & Assignment kw_step_start = time.time() # Assign keywords to cluster # Match keywords by keyword string (case-insensitive) from the already-loaded keywords list # Also create a mapping for fuzzy matching (handles minor variations) matched_keyword_objects = [] unmatched_keywords = [] # Create normalized versions for exact matching cluster_keywords_normalized = {} for kw in cluster_keywords: normalized = kw.strip().lower() cluster_keywords_normalized[normalized] = kw.strip() # Keep original for logging # Create a mapping of all available keywords (normalized) available_keywords_normalized = { kw_obj.keyword.strip().lower(): kw_obj for kw_obj in keywords } # First pass: exact matches (case-insensitive) for cluster_kw_normalized, cluster_kw_original in cluster_keywords_normalized.items(): if cluster_kw_normalized in available_keywords_normalized: matched_keyword_objects.append(available_keywords_normalized[cluster_kw_normalized]) else: unmatched_keywords.append(cluster_kw_original) # Log unmatched keywords for debugging if unmatched_keywords: logger.warning( f"Some keywords in cluster '{cluster_name}' were not matched: {unmatched_keywords}. " f"Available keywords: {[kw.keyword for kw in keywords]}" ) # Update matched keywords if matched_keyword_objects: matched_ids = [kw.id for kw in matched_keyword_objects] # Rebuild queryset inside transaction to avoid database connection issues # Handle sector=None case keyword_filter = Keywords.objects.filter( id__in=matched_ids, account=account ) if sector: keyword_filter = keyword_filter.filter(sector=sector) else: keyword_filter = keyword_filter.filter(sector__isnull=True) updated_count = keyword_filter.update( cluster=cluster, status='mapped' # Update status from pending to mapped ) keywords_updated += updated_count # Log steps 14 and 15 after all clusters are processed if response_steps is not None: response_steps.append({ 'stepNumber': 14, 'stepName': 'Cluster Creation/Update', 'functionName': '_auto_cluster_keywords_core', 'status': 'success', 'message': f'Created/updated {clusters_created} clusters', 'duration': int((time.time() - cluster_step_start) * 1000) }) response_steps.append({ 'stepNumber': 15, 'stepName': 'Keyword Matching & Assignment', 'functionName': '_auto_cluster_keywords_core', 'status': 'success', 'message': f'Assigned {keywords_updated} keywords to clusters', 'duration': 0 # Duration already included in step 14 }) # Step 16: Metrics Recalculation & Commit step_start = time.time() # Update progress: Recalculating metrics (90-95%) if progress_callback: progress_callback( state='PROGRESS', meta={ 'current': clusters_created, 'total': clusters_created, 'percentage': 90, 'message': 'Recalculating cluster metrics...', 'phase': 'finalizing', 'request_steps': request_steps, 'response_steps': response_steps } ) # Recalculate cluster metrics from django.db.models import Sum cluster_filter = Clusters.objects.filter(account=account) if sector: cluster_filter = cluster_filter.filter(sector=sector) else: cluster_filter = cluster_filter.filter(sector__isnull=True) for cluster in cluster_filter: cluster.keywords_count = Keywords.objects.filter(cluster=cluster).count() volume_sum = Keywords.objects.filter(cluster=cluster).aggregate( total=Sum('volume') )['total'] cluster.volume = volume_sum or 0 cluster.save() # Transaction commits here automatically if response_steps is not None: response_steps.append({ 'stepNumber': 16, 'stepName': 'Metrics Recalculation & Commit', 'functionName': '_auto_cluster_keywords_core', 'status': 'success', 'message': f'Recalculated metrics for {cluster_filter.count()} clusters, transaction committed', 'duration': int((time.time() - step_start) * 1000) }) # Final progress update final_message = f"Clustering complete: {clusters_created} clusters created, {keywords_updated} keywords updated" logger.info(final_message) tracker.done(final_message) if progress_callback: progress_callback( state='SUCCESS', meta={ 'message': final_message, 'request_steps': request_steps, 'response_steps': response_steps } ) return { 'success': True, 'clusters_created': clusters_created, 'keywords_updated': keywords_updated, 'message': final_message, 'request_steps': request_steps, 'response_steps': response_steps, } except Exception as e: error_msg = f"Error in auto_cluster_keywords_core: {str(e)}" logger.error(error_msg, exc_info=True) tracker.error('Exception', error_msg, exception=e) if progress_callback: progress_callback( state='FAILURE', meta={ 'error': str(e), 'message': f'Error: {str(e)}', 'request_steps': request_steps, 'response_steps': response_steps } ) return { 'success': False, 'error': str(e), 'request_steps': request_steps, 'response_steps': response_steps } @shared_task(bind=True, max_retries=3) # ============================================================================ # DEPRECATED: This Celery task is deprecated. Use run_ai_task instead. # New path: views.py -> run_ai_task -> AIEngine -> AutoClusterFunction # ============================================================================ def auto_cluster_keywords_task(self, keyword_ids: List[int], sector_id: int = None, account_id: int = None): """ [DEPRECATED] Celery task wrapper for clustering keywords using AI. ⚠️ WARNING: This task is deprecated. Use the new AI framework instead: - New path: views.py -> run_ai_task -> AIEngine -> AutoClusterFunction - This task uses the old _auto_cluster_keywords_core function - Console logging may not work correctly in this path Args: keyword_ids: List of keyword IDs to cluster sector_id: Sector ID for the keywords account_id: Account ID for account isolation """ logger.info("=" * 80) logger.info("auto_cluster_keywords_task STARTED") logger.info(f" - Task ID: {self.request.id}") logger.info(f" - keyword_ids: {keyword_ids}") logger.info(f" - sector_id: {sector_id}") logger.info(f" - account_id: {account_id}") logger.info("=" * 80) # Initialize request_steps and response_steps for error reporting request_steps = [] response_steps = [] def progress_callback(state, meta): # Capture request_steps and response_steps from meta if available nonlocal request_steps, response_steps if isinstance(meta, dict): if 'request_steps' in meta: request_steps = meta['request_steps'] if 'response_steps' in meta: response_steps = meta['response_steps'] self.update_state(state=state, meta=meta) try: result = _auto_cluster_keywords_core(keyword_ids, sector_id, account_id, progress_callback) logger.info(f"auto_cluster_keywords_task COMPLETED: {result}") return result except Exception as e: error_type = type(e).__name__ error_msg = str(e) # Log full error details logger.error("=" * 80) logger.error(f"auto_cluster_keywords_task FAILED: {error_type}: {error_msg}") logger.error(f" - Task ID: {self.request.id}") logger.error(f" - keyword_ids: {keyword_ids}") logger.error(f" - sector_id: {sector_id}") logger.error(f" - account_id: {account_id}") logger.error("=" * 80, exc_info=True) # Create detailed error dict that Celery can serialize error_dict = { 'error': error_msg, 'error_type': error_type, 'error_class': error_type, 'message': f'{error_type}: {error_msg}', 'request_steps': request_steps, 'response_steps': response_steps, 'task_id': str(self.request.id), 'keyword_ids': keyword_ids, 'sector_id': sector_id, 'account_id': account_id } # Update task state with detailed error try: self.update_state( state='FAILURE', meta=error_dict ) except Exception as update_error: # If update_state fails, log it but continue logger.error(f"Failed to update task state: {str(update_error)}") # Return error result return error_dict @shared_task(bind=True, max_retries=3) def auto_generate_ideas_task(self, cluster_ids: List[int], account_id: int = None): """ Celery task to generate content ideas for clusters using AI. Args: cluster_ids: List of cluster IDs account_id: Account ID for account isolation """ import sys print("=" * 80, flush=True, file=sys.stdout) print("[CELERY TASK] auto_generate_ideas_task STARTED", flush=True, file=sys.stdout) print(f"[CELERY TASK] Task ID: {self.request.id}", flush=True, file=sys.stdout) print(f"[CELERY TASK] cluster_ids: {cluster_ids}", flush=True, file=sys.stdout) print(f"[CELERY TASK] account_id: {account_id}", flush=True, file=sys.stdout) print("=" * 80, flush=True, file=sys.stdout) account_id = account_id logger.info("=" * 80) logger.info("auto_generate_ideas_task STARTED") logger.info(f" - Task ID: {self.request.id}") logger.info(f" - cluster_ids: {cluster_ids}") logger.info(f" - account_id: {account_id}") logger.info("=" * 80) try: from django.db import models from django.db import connection # Log database connection status try: connection.ensure_connection() logger.info("Database connection: OK") except Exception as db_error: logger.error(f"Database connection error: {type(db_error).__name__}: {str(db_error)}") raise # Initialize progress logger.info("Initializing task progress state...") self.update_state( state='PROGRESS', meta={ 'current': 0, 'total': len(cluster_ids), 'percentage': 0, 'message': 'Initializing content ideas generation...', 'phase': 'initializing' } ) # Get clusters with keywords and relationships (including site) logger.info(f"Querying clusters with IDs: {cluster_ids}") try: clusters_queryset = Clusters.objects.filter(id__in=cluster_ids) logger.info(f"Initial queryset count: {clusters_queryset.count()}") if account_id: clusters_queryset = clusters_queryset.filter(account_id=account_id) logger.info(f"After account filter count: {clusters_queryset.count()}") logger.info("Loading clusters with select_related...") clusters = list(clusters_queryset.select_related('sector', 'account', 'site', 'sector__site')) logger.info(f"Successfully loaded {len(clusters)} clusters") # Log each cluster's details for c in clusters: account = getattr(c, 'account', None) logger.info(f" Cluster {c.id}: name='{c.name}', account_id={account.id if account else 'None'}, site_id={c.site_id if c.site else 'None'}, sector_id={c.sector_id if c.sector else 'None'}") except Exception as query_error: logger.error(f"Error querying clusters: {type(query_error).__name__}: {str(query_error)}", exc_info=True) raise if not clusters: logger.warning(f"No clusters found: {cluster_ids}") return {'success': False, 'error': 'No clusters found'} total_clusters = len(clusters) # Update progress: Preparing clusters (0-10%) self.update_state( state='PROGRESS', meta={ 'current': 0, 'total': total_clusters, 'percentage': 5, 'message': f'Preparing {total_clusters} clusters for idea generation...', 'phase': 'preparing' } ) # Format cluster data for AI cluster_data = [] for idx, cluster in enumerate(clusters): # Get keywords for this cluster keywords = Keywords.objects.filter(cluster=cluster).values_list('keyword', flat=True) keywords_list = list(keywords) cluster_item = { 'id': cluster.id, 'name': cluster.name, 'description': cluster.description or '', 'keywords': keywords_list, } cluster_data.append(cluster_item) # Log cluster data being sent to AI logger.info(f"Cluster {idx + 1}/{total_clusters} data for AI:") logger.info(f" - ID: {cluster_item['id']}") logger.info(f" - Name: {cluster_item['name']}") logger.info(f" - Description: {cluster_item['description'][:100] if cluster_item['description'] else '(empty)'}...") logger.info(f" - Keywords count: {len(keywords_list)}") logger.info(f" - Keywords: {keywords_list[:5]}{'...' if len(keywords_list) > 5 else ''}") account = getattr(cluster, 'account', None) logger.info(f" - Cluster account: {account.id if account else 'None'}") logger.info(f" - Cluster site: {cluster.site_id if cluster.site else 'None'}") logger.info(f" - Cluster sector: {cluster.sector_id if cluster.sector else 'None'}") # Update progress for each cluster preparation progress_pct = 5 + int((idx / total_clusters) * 5) self.update_state( state='PROGRESS', meta={ 'current': idx + 1, 'total': total_clusters, 'percentage': progress_pct, 'message': f"Preparing cluster '{cluster.name}' ({idx + 1} of {total_clusters})...", 'phase': 'preparing', 'current_item': cluster.name } ) # Log clean request data before sending to AI logger.info("=" * 80) logger.info("CLEAN REQUEST DATA FOR AI (before sending request):") logger.info("=" * 80) import json clean_data = { 'total_clusters': len(cluster_data), 'clusters': [ { 'id': c['id'], 'name': c['name'], 'description': c['description'][:200] if c['description'] else '(empty)', 'keywords_count': len(c['keywords']), 'keywords': c['keywords'], } for c in cluster_data ] } logger.info(json.dumps(clean_data, indent=2)) logger.info("=" * 80) # Update progress: Generating ideas with AI (10-80%) self.update_state( state='PROGRESS', meta={ 'current': 0, 'total': total_clusters, 'percentage': 10, 'message': 'Generating content ideas with AI...', 'phase': 'generating' } ) # Use new AI framework with proper logging account = clusters[0].account if clusters else None account_id = account.id if account else None # Process each cluster using the new framework from igny8_core.ai.functions.generate_ideas import generate_ideas_core from igny8_core.ai.tasks import run_ai_task ideas_created = 0 all_ideas = [] # Process each cluster individually using the new framework for idx, cluster in enumerate(clusters): cluster_id = cluster.id logger.info(f"Processing cluster {idx + 1}/{total_clusters}: {cluster_id}") # Update progress progress_pct = 10 + int((idx / total_clusters) * 70) self.update_state( state='PROGRESS', meta={ 'current': idx + 1, 'total': total_clusters, 'percentage': progress_pct, 'message': f'Generating idea for cluster "{cluster.name}" ({idx + 1} of {total_clusters})...', 'phase': 'generating', 'current_item': cluster.name } ) # Use new framework - always use generate_ideas_core for proper console logging try: import sys print(f"[CELERY TASK] Calling generate_ideas_core for cluster {cluster_id}...", flush=True, file=sys.stdout) # Use generate_ideas_core which has ConsoleStepTracker built in result = generate_ideas_core(cluster_id, account_id=account_id) print(f"[CELERY TASK] generate_ideas_core returned: success={result.get('success')}, error={result.get('error')}", flush=True, file=sys.stdout) if result.get('success'): ideas_created += result.get('idea_created', 0) logger.info(f"✓ Successfully generated idea for cluster {cluster_id}") print(f"[CELERY TASK] ✓ Successfully generated idea for cluster {cluster_id}", flush=True, file=sys.stdout) else: error_msg = result.get('error', 'Unknown error') logger.error(f"✗ Failed to generate idea for cluster {cluster_id}: {error_msg}") print(f"[CELERY TASK] ✗ Failed to generate idea for cluster {cluster_id}: {error_msg}", flush=True, file=sys.stdout) # Update task state with error for this cluster self.update_state( state='PROGRESS', meta={ 'current': idx + 1, 'total': total_clusters, 'percentage': progress_pct, 'message': f'Error generating idea for cluster "{cluster.name}": {error_msg}', 'phase': 'error', 'current_item': cluster.name, 'error': error_msg, 'error_type': result.get('error_type', 'GenerationError') } ) except Exception as e: import sys import traceback error_msg = str(e) error_type = type(e).__name__ print("=" * 80, flush=True, file=sys.stdout) print(f"[CELERY TASK] EXCEPTION in generate_ideas_core call: {error_type}: {error_msg}", flush=True, file=sys.stdout) print("[CELERY TASK] Full traceback:", flush=True, file=sys.stdout) traceback.print_exc(file=sys.stdout) print("=" * 80, flush=True, file=sys.stdout) logger.error(f"✗ Error generating idea for cluster {cluster_id}: {error_msg}", exc_info=True) # Update task state with exception self.update_state( state='PROGRESS', meta={ 'current': idx + 1, 'total': total_clusters, 'percentage': progress_pct, 'message': f'Exception generating idea for cluster "{cluster.name}": {error_msg}', 'phase': 'error', 'current_item': cluster.name, 'error': error_msg, 'error_type': error_type } ) # Ideas are already saved by the new framework, just log results logger.info("=" * 80) logger.info(f"IDEAS GENERATION COMPLETE: {ideas_created} ideas created") logger.info("=" * 80) if ideas_created == 0: logger.warning("No ideas were created") self.update_state( state='FAILURE', meta={ 'error': 'No ideas created', 'message': 'No ideas were created' } ) return {'success': False, 'error': 'No ideas created'} # Final progress update final_message = f"Ideas generation complete: {ideas_created} ideas created for {total_clusters} clusters" logger.info(final_message) return { 'success': True, 'ideas_created': ideas_created, 'message': final_message, } except Exception as e: logger.error(f"Error in auto_generate_ideas_task: {str(e)}", exc_info=True) self.update_state( state='FAILURE', meta={ 'error': str(e), 'message': f'Error: {str(e)}' } ) raise def _generate_single_idea_core(cluster_id: int, account_id: int = None, progress_callback=None): """ Core logic for generating a single content idea for a cluster. Can be called with or without Celery. Args: cluster_id: Cluster ID to generate idea for account_id: Account ID for account isolation progress_callback: Optional function to call for progress updates (for Celery tasks) """ account_id = account_id try: # Initialize progress if callback provided if progress_callback: progress_callback( state='PROGRESS', meta={ 'current': 0, 'total': 1, 'percentage': 0, 'message': 'Initializing single idea generation...', 'phase': 'initializing' } ) # Get cluster with keywords and relationships clusters_queryset = Clusters.objects.filter(id=cluster_id) if account_id: clusters_queryset = clusters_queryset.filter(account_id=account_id) clusters = list(clusters_queryset.select_related('sector', 'account', 'site').prefetch_related('keywords')) if not clusters: logger.warning(f"Cluster not found: {cluster_id}") return {'success': False, 'error': 'Cluster not found'} cluster = clusters[0] # Update progress: Preparing cluster (0-10%) if progress_callback: progress_callback( state='PROGRESS', meta={ 'current': 0, 'total': 1, 'percentage': 5, 'message': f'Preparing cluster "{cluster.name}"...', 'phase': 'preparing', 'current_item': cluster.name } ) # Get keywords for this cluster keywords = Keywords.objects.filter(cluster=cluster).values_list('keyword', flat=True) # Format cluster data for AI cluster_data = [{ 'id': cluster.id, 'name': cluster.name, 'description': cluster.description or '', 'keywords': list(keywords), }] # Update progress: Generating idea with AI (10-80%) if progress_callback: progress_callback( state='PROGRESS', meta={ 'current': 0, 'total': 1, 'percentage': 10, 'message': 'Generating content idea with AI...', 'phase': 'generating' } ) # Use new AI framework with proper logging account = getattr(cluster, 'account', None) account_id = account.id if account else None # Use new framework with proper console logging from igny8_core.ai.functions.generate_ideas import generate_ideas_core # Use generate_ideas_core which has ConsoleStepTracker built in try: result = generate_ideas_core(cluster_id, account_id=account_id, progress_callback=progress_callback) except Exception as e: logger.error(f"Error generating idea: {str(e)}", exc_info=True) return {'success': False, 'error': str(e)} if not result.get('success'): error_msg = result.get('error', 'Unknown error') logger.error(f"AI idea generation error: {error_msg}") return {'success': False, 'error': error_msg} idea_created = result.get('idea_created', 0) or result.get('ideas_created', 0) if progress_callback: progress_callback( state='PROGRESS', meta={ 'current': 1, 'total': 1, 'percentage': 95, 'message': 'Idea generation complete', 'phase': 'complete' } ) # Ideas are already saved by the new framework (generate_ideas_core or run_ai_task) # No need to save again here # Final progress update final_message = f"Idea generation complete: {idea_created} idea(s) created" logger.info(final_message) if progress_callback: progress_callback( state='SUCCESS', meta={ 'current': 1, 'total': 1, 'percentage': 100, 'message': final_message, 'phase': 'completed' } ) return { 'success': True, 'idea_created': idea_created, 'message': final_message, } except Exception as e: logger.error(f"Error in _generate_single_idea_core: {str(e)}", exc_info=True) if progress_callback: progress_callback( state='FAILURE', meta={ 'error': str(e), 'message': f'Error: {str(e)}' } ) return {'success': False, 'error': str(e)} @shared_task(bind=True, max_retries=3) def generate_single_idea_task(self, cluster_id: int, account_id: int = None): """ Celery task to generate a single content idea for a cluster using AI. Args: cluster_id: Cluster ID account_id: Account ID for account isolation """ def progress_callback(state, meta): self.update_state(state=state, meta=meta) return _generate_single_idea_core(cluster_id, account_id, progress_callback)