#!/usr/bin/env python
"""
Diagnostic script for generate_content function issues.

Tests each layer of the content generation pipeline, in order, to identify
where it is failing:

1. prerequisites (account, OpenAI integration settings, tasks)
2. function registry lookup
3. payload validation
4. prepare phase
5. prompt building
6. model configuration
7. (optional, asks first) a shortened real AI request
8. (optional, asks first) the full content generation service

Steps 1-6 are mandatory: the first failure stops the run. Steps 7 and 8
require an explicit "yes" on stdin; when stdin is closed or not interactive
they are skipped instead of crashing.

The process exit status is 0 when every executed step passed and 1 otherwise.
"""

import logging
import os
import sys
import traceback

import django

# Setup Django. This must happen before any project model is imported.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'igny8_core.settings')
django.setup()

from igny8_core.auth.models import Account
from igny8_core.modules.writer.models import Tasks, Content
from igny8_core.modules.system.models import IntegrationSettings
from igny8_core.ai.registry import get_function_instance
from igny8_core.ai.engine import AIEngine
from igny8_core.business.content.services.content_generation_service import ContentGenerationService

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s'
)
logger = logging.getLogger(__name__)


def print_section(title):
    """Print a section header framed by two 80-character rules."""
    print("\n" + "=" * 80)
    print(f" {title}")
    print("=" * 80 + "\n")


def _report_error(label, exc):
    """Print a FAIL line for *exc* followed by the active traceback.

    Must be called from inside an ``except`` block so that
    ``traceback.print_exc()`` has an exception to print.
    """
    print(f"❌ FAIL: {label}: {exc}")
    traceback.print_exc()


def _confirm(question):
    """Ask *question* on stdout and return True only for the answer "yes".

    A closed or non-interactive stdin (``EOFError``) counts as "no", so the
    costly optional steps are skipped rather than aborting the whole script.
    """
    print(question, end='')
    try:
        answer = input()
    except EOFError:
        # Terminate the prompt line that was left open by end=''.
        print()
        return False
    return answer.strip().lower() == 'yes'


def test_prerequisites():
    """Test that prerequisites are met.

    Checks, in order: an account exists, that account has active OpenAI
    integration settings with an API key, and at least one task exists
    (pending tasks are preferred, any task is the fallback).

    Returns:
        A dict with keys ``account``, ``tasks`` (list of up to 5 tasks) and
        ``openai_settings``, or ``None`` when any check fails.
    """
    print_section("1. TESTING PREREQUISITES")

    # Check if account exists
    try:
        account = Account.objects.first()
        if not account:
            print("❌ FAIL: No account found in database")
            return None
        print(f"✅ PASS: Found account: {account.id} ({account.email})")
    except Exception as e:
        print(f"❌ FAIL: Error getting account: {e}")
        return None

    # Check OpenAI integration settings
    try:
        openai_settings = IntegrationSettings.objects.filter(
            integration_type='openai',
            account=account,
            is_active=True
        ).first()

        if not openai_settings:
            print("❌ FAIL: No active OpenAI integration settings found")
            return None

        if not openai_settings.config or not openai_settings.config.get('apiKey'):
            print("❌ FAIL: OpenAI API key not configured in IntegrationSettings")
            return None

        # The key is known to be present and truthy here, so no fallback
        # branch is needed. Only a short prefix is shown.
        api_key_preview = openai_settings.config['apiKey'][:10] + "..."
        model = openai_settings.config.get('model', 'Not set')
        print(f"✅ PASS: OpenAI settings found (API key: {api_key_preview}, Model: {model})")
    except Exception as e:
        print(f"❌ FAIL: Error checking OpenAI settings: {e}")
        return None

    # Check if tasks exist
    try:
        # Materialise each sliced queryset once instead of hitting the
        # database again for count(), iteration and list().
        tasks = list(Tasks.objects.filter(account=account, status='pending')[:5])
        if not tasks:
            print("⚠️ WARNING: No pending tasks found, will try to use any task")
            tasks = list(Tasks.objects.filter(account=account)[:5])

        if not tasks:
            print("❌ FAIL: No tasks found at all")
            return None

        print(f"✅ PASS: Found {len(tasks)} task(s)")
        for task in tasks:
            print(f" - Task {task.id}: {task.title or 'Untitled'} (status: {task.status})")
    except Exception as e:
        print(f"❌ FAIL: Error getting tasks: {e}")
        return None

    return {
        'account': account,
        'tasks': tasks,
        'openai_settings': openai_settings
    }


def test_function_registry():
    """Test that the generate_content function is registered.

    Returns:
        True when the registry returns a function instance and its name and
        metadata can be read, False otherwise.
    """
    print_section("2. TESTING FUNCTION REGISTRY")

    try:
        fn = get_function_instance('generate_content')
        if not fn:
            print("❌ FAIL: generate_content function not found in registry")
            return False

        print(f"✅ PASS: Function registered: {fn.get_name()}")
        metadata = fn.get_metadata()
        print(f" - Display name: {metadata.get('display_name')}")
        print(f" - Description: {metadata.get('description')}")
        return True
    except Exception as e:
        _report_error("Error loading function", e)
        return False


def test_function_validation(context):
    """Test function validation.

    Validates a payload holding the id of the first task in
    ``context['tasks']`` against ``context['account']``.

    Returns:
        True when the validation result reports ``valid``, False otherwise.
    """
    print_section("3. TESTING FUNCTION VALIDATION")

    try:
        fn = get_function_instance('generate_content')
        account = context['account']
        task = context['tasks'][0]

        payload = {'ids': [task.id]}
        print(f"Testing with payload: {payload}")

        result = fn.validate(payload, account)

        if not result['valid']:
            print(f"❌ FAIL: Validation failed: {result.get('error')}")
            return False

        print("✅ PASS: Validation succeeded")
        return True
    except Exception as e:
        _report_error("Error during validation", e)
        return False


def test_function_prepare(context):
    """Test function prepare phase.

    Runs ``prepare`` for the first task and stores the result in
    ``context['prepared_data']`` for the prompt-building step.

    Returns:
        True when prepare returned truthy data, False otherwise.
    """
    print_section("4. TESTING FUNCTION PREPARE")

    try:
        fn = get_function_instance('generate_content')
        account = context['account']
        task = context['tasks'][0]

        payload = {'ids': [task.id]}
        print(f"Preparing task {task.id}: {task.title or 'Untitled'}")

        data = fn.prepare(payload, account)

        if not data:
            print("❌ FAIL: Prepare returned no data")
            return False

        if isinstance(data, list):
            print(f"✅ PASS: Prepared {len(data)} task(s)")
            for t in data:
                print(f" - Task {t.id}: {t.title or 'Untitled'}")
                print(f" Cluster: {t.cluster.name if t.cluster else 'None'}")
                print(f" Taxonomy: {t.taxonomy_term.name if t.taxonomy_term else 'None'}")
                print(f" Keywords: {t.keywords.count()} keyword(s)")
        else:
            print(f"✅ PASS: Prepared data: {type(data)}")

        context['prepared_data'] = data
        return True
    except Exception as e:
        _report_error("Error during prepare", e)
        return False


def test_function_build_prompt(context):
    """Test prompt building.

    Builds the prompt from ``context['prepared_data']``, prints a preview of
    its first 500 characters and stores it in ``context['prompt']``.

    Returns:
        True when a non-empty prompt was produced, False otherwise.
    """
    print_section("5. TESTING PROMPT BUILDING")

    try:
        fn = get_function_instance('generate_content')
        account = context['account']
        data = context['prepared_data']

        prompt = fn.build_prompt(data, account)

        if not prompt:
            print("❌ FAIL: No prompt generated")
            return False

        print(f"✅ PASS: Prompt generated ({len(prompt)} characters)")
        print("\nPrompt preview (first 500 chars):")
        print("-" * 80)
        print(prompt[:500])
        if len(prompt) > 500:
            print(f"\n... ({len(prompt) - 500} more characters)")
        print("-" * 80)

        context['prompt'] = prompt
        return True
    except Exception as e:
        _report_error("Error building prompt", e)
        return False


def test_model_config(context):
    """Test model configuration.

    Loads the model config for ``generate_content`` and stores it in
    ``context['model_config']``.

    Returns:
        True when a truthy config was returned, False otherwise.
    """
    print_section("6. TESTING MODEL CONFIGURATION")

    try:
        # Imported here so that an import failure is reported as a failure
        # of this step rather than aborting the script at start-up.
        from igny8_core.ai.settings import get_model_config

        account = context['account']
        model_config = get_model_config('generate_content', account=account)

        if not model_config:
            print("❌ FAIL: No model config returned")
            return False

        print("✅ PASS: Model configuration loaded")
        print(f" - Model: {model_config.get('model')}")
        print(f" - Max tokens: {model_config.get('max_tokens')}")
        print(f" - Temperature: {model_config.get('temperature')}")
        print(f" - Response format: {model_config.get('response_format')}")

        context['model_config'] = model_config
        return True
    except Exception as e:
        _report_error("Error getting model config", e)
        return False


def test_ai_core_request(context):
    """Test AI core request (actual API call).

    Asks for confirmation first. When confirmed, sends the first 1000
    characters of ``context['prompt']`` with a 500-token limit and stores the
    response in ``context['ai_response']``.

    Returns:
        True when the step was skipped or the call returned content without
        an error, False otherwise.
    """
    print_section("7. TESTING AI CORE REQUEST (ACTUAL API CALL)")

    # Ask user for confirmation
    print("⚠️ WARNING: This will make an actual API call to OpenAI and cost money!")
    if not _confirm("Do you want to proceed? (yes/no): "):
        print("Skipping API call test")
        return True

    try:
        from igny8_core.ai.ai_core import AICore

        account = context['account']
        prompt = context['prompt']
        model_config = context['model_config']

        # Use a shorter test prompt to save costs
        test_prompt = prompt[:1000] + "\n\n[TEST MODE - Generate only title and first paragraph]"

        print(f"Making test API call with shortened prompt ({len(test_prompt)} chars)...")

        ai_core = AICore(account=account)
        result = ai_core.run_ai_request(
            prompt=test_prompt,
            model=model_config['model'],
            max_tokens=500,  # Limit tokens for testing
            temperature=model_config.get('temperature', 0.7),
            response_format=model_config.get('response_format'),
            function_name='generate_content_test'
        )

        if result.get('error'):
            print(f"❌ FAIL: API call returned error: {result['error']}")
            return False

        if not result.get('content'):
            print("❌ FAIL: API call returned no content")
            return False

        # A present-but-None cost must not break the float format below and
        # turn a successful call into a reported failure.
        cost = result.get('cost') or 0

        print("✅ PASS: API call successful")
        print(f" - Tokens: {result.get('total_tokens', 0)}")
        print(f" - Cost: ${cost:.6f}")
        print(f" - Model: {result.get('model')}")
        print("\nContent preview (first 300 chars):")
        print("-" * 80)
        print(result['content'][:300])
        print("-" * 80)

        context['ai_response'] = result
        return True
    except Exception as e:
        _report_error("Error during API call", e)
        return False


def test_service_layer(context):
    """Test the content generation service.

    Asks for confirmation first. When confirmed, calls
    ``ContentGenerationService.generate_content`` for the first task.

    Returns:
        True when the step was skipped or the service reported success,
        False otherwise.
    """
    print_section("8. TESTING CONTENT GENERATION SERVICE")

    print("⚠️ WARNING: This will make a full API call and create content!")
    if not _confirm("Do you want to proceed? (yes/no): "):
        print("Skipping service test")
        return True

    try:
        account = context['account']
        task = context['tasks'][0]

        service = ContentGenerationService()

        print(f"Calling generate_content with task {task.id}...")
        result = service.generate_content([task.id], account)

        if not result:
            print("❌ FAIL: Service returned None")
            return False

        if not result.get('success'):
            print(f"❌ FAIL: Service failed: {result.get('error')}")
            return False

        print("✅ PASS: Service call successful")

        if 'task_id' in result:
            # A task_id in the result is reported as a queued background job.
            print(f" - Celery task ID: {result['task_id']}")
            print(f" - Message: {result.get('message')}")
            print("\n⚠️ Note: Content generation is running in background (Celery)")
            print(" Check Celery logs for actual execution status")
        else:
            print(f" - Content created: {result.get('content_id')}")
            print(f" - Word count: {result.get('word_count')}")

        return True
    except Exception as e:
        _report_error("Error in service layer", e)
        return False


def main():
    """Run all diagnostic tests.

    Returns:
        0 when every executed step passed, 1 when a mandatory step failed
        (the run stops there) or an optional step failed.
    """
    print("\n" + "=" * 80)
    print(" GENERATE_CONTENT DIAGNOSTIC TOOL")
    print("=" * 80)
    print("\nThis tool will test each layer of the content generation pipeline")
    print("to identify where the function is failing.")

    # Run tests
    context = test_prerequisites()
    if not context:
        print("\n❌ FATAL: Prerequisites test failed. Cannot continue.")
        return 1

    # Mandatory steps, in pipeline order. Each later step reads what the
    # earlier ones stored in `context`, so the first failure ends the run.
    mandatory_steps = (
        ("Function registry", test_function_registry),
        ("Validation", lambda: test_function_validation(context)),
        ("Prepare", lambda: test_function_prepare(context)),
        ("Prompt building", lambda: test_function_build_prompt(context)),
        ("Model config", lambda: test_model_config(context)),
    )
    for label, step in mandatory_steps:
        if not step():
            print(f"\n❌ FATAL: {label} test failed. Cannot continue.")
            return 1

    # Optional tests (require API calls). Both always run; their outcome
    # only affects the exit status.
    api_ok = test_ai_core_request(context)
    service_ok = test_service_layer(context)

    print_section("DIAGNOSTIC COMPLETE")
    print("Review the results above to identify where the generate_content")
    print("function is failing.\n")

    return 0 if api_ok and service_ok else 1


if __name__ == '__main__':
    sys.exit(main())