#!/usr/bin/env python
"""
Diagnostic script for generate_content function issues

Tests each layer of the content generation pipeline to identify where it's failing
"""
import os
import sys
import django
import logging

# Setup Django
# NOTE: the settings module must be configured and django.setup() called
# BEFORE any project model imports below, otherwise those imports raise
# AppRegistryNotReady.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'igny8_core.settings')
django.setup()

from igny8_core.auth.models import Account
from igny8_core.modules.writer.models import Tasks, Content
from igny8_core.modules.system.models import IntegrationSettings
from igny8_core.ai.registry import get_function_instance
from igny8_core.ai.engine import AIEngine
from igny8_core.business.content.services.content_generation_service import ContentGenerationService

# Setup logging (INFO level so the pipeline's own log output is visible
# alongside the diagnostic prints)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s'
)
logger = logging.getLogger(__name__)
def print_section(title):
    """Print *title* framed by 80-character '=' rules, with blank-line padding."""
    rule = "=" * 80
    print("\n" + rule)
    print(f" {title}")
    print(rule + "\n")
def test_prerequisites():
    """Verify the data the later pipeline tests depend on exists.

    Checks, in order: an Account row, active OpenAI IntegrationSettings
    carrying an API key, and at least one Tasks row for that account
    (preferring status='pending').

    Returns:
        dict with keys 'account', 'tasks' (list of Tasks), and
        'openai_settings' on success; None if any prerequisite is missing
        or a database error occurs.
    """
    print_section("1. TESTING PREREQUISITES")

    # Check if account exists
    try:
        account = Account.objects.first()
        if not account:
            print("❌ FAIL: No account found in database")
            return None
        print(f"✅ PASS: Found account: {account.id} ({account.email})")
    except Exception as e:
        print(f"❌ FAIL: Error getting account: {e}")
        return None

    # Check OpenAI integration settings
    try:
        openai_settings = IntegrationSettings.objects.filter(
            integration_type='openai',
            account=account,
            is_active=True
        ).first()

        if not openai_settings:
            print("❌ FAIL: No active OpenAI integration settings found")
            return None

        if not openai_settings.config or not openai_settings.config.get('apiKey'):
            print("❌ FAIL: OpenAI API key not configured in IntegrationSettings")
            return None

        # Show only a 10-char prefix of the key so the diagnostic output is safe to share
        api_key_preview = openai_settings.config['apiKey'][:10] + "..." if openai_settings.config.get('apiKey') else "None"
        model = openai_settings.config.get('model', 'Not set')
        print(f"✅ PASS: OpenAI settings found (API key: {api_key_preview}, Model: {model})")
    except Exception as e:
        print(f"❌ FAIL: Error checking OpenAI settings: {e}")
        return None

    # Check if tasks exist.
    # FIX: materialize the sliced queryset once with list(). The original kept
    # the lazy sliced queryset and called .count() on it, which issued a
    # separate COUNT query and then re-ran the SELECT for the print loop and
    # again for list(tasks) at the end — three queries per branch, with a race
    # window in which the count and the rows could disagree.
    try:
        tasks = list(Tasks.objects.filter(account=account, status='pending')[:5])
        task_count = len(tasks)

        if task_count == 0:
            print("⚠️ WARNING: No pending tasks found, will try to use any task")
            tasks = list(Tasks.objects.filter(account=account)[:5])
            task_count = len(tasks)

        if task_count == 0:
            print("❌ FAIL: No tasks found at all")
            return None

        print(f"✅ PASS: Found {task_count} task(s)")
        for task in tasks:
            print(f" - Task {task.id}: {task.title or 'Untitled'} (status: {task.status})")
    except Exception as e:
        print(f"❌ FAIL: Error getting tasks: {e}")
        return None

    return {
        'account': account,
        'tasks': tasks,
        'openai_settings': openai_settings
    }
def test_function_registry():
    """Verify that 'generate_content' is resolvable from the AI function registry.

    Returns True when an instance is returned and its metadata is readable,
    False otherwise.
    """
    print_section("2. TESTING FUNCTION REGISTRY")

    try:
        registered_fn = get_function_instance('generate_content')
        if not registered_fn:
            print("❌ FAIL: generate_content function not found in registry")
            return False

        print(f"✅ PASS: Function registered: {registered_fn.get_name()}")
        meta = registered_fn.get_metadata()
        # Same two lines as before, driven by a label/key table
        for label, key in (("Display name", "display_name"), ("Description", "description")):
            print(f" - {label}: {meta.get(key)}")
        return True
    except Exception as exc:
        print(f"❌ FAIL: Error loading function: {exc}")
        import traceback
        traceback.print_exc()
        return False
def test_function_validation(context):
    """Run the function's validate() phase against the first prerequisite task.

    Returns True on a valid payload, False on a validation failure or error.
    """
    print_section("3. TESTING FUNCTION VALIDATION")

    try:
        validator = get_function_instance('generate_content')
        first_task = context['tasks'][0]

        payload = {'ids': [first_task.id]}
        print(f"Testing with payload: {payload}")

        outcome = validator.validate(payload, context['account'])

        # Guard-clause form: bail out on failure, fall through on success
        if not outcome['valid']:
            print(f"❌ FAIL: Validation failed: {outcome.get('error')}")
            return False

        print("✅ PASS: Validation succeeded")
        return True
    except Exception as exc:
        print(f"❌ FAIL: Error during validation: {exc}")
        import traceback
        traceback.print_exc()
        return False
def test_function_prepare(context):
    """Run the function's prepare() phase and stash the result in *context*.

    On success stores the prepared data under context['prepared_data'] and
    returns True; returns False when prepare yields nothing or raises.
    """
    print_section("4. TESTING FUNCTION PREPARE")

    try:
        generator = get_function_instance('generate_content')
        account = context['account']
        first_task = context['tasks'][0]

        print(f"Preparing task {first_task.id}: {first_task.title or 'Untitled'}")
        prepared = generator.prepare({'ids': [first_task.id]}, account)

        if not prepared:
            print("❌ FAIL: Prepare returned no data")
            return False

        if isinstance(prepared, list):
            print(f"✅ PASS: Prepared {len(prepared)} task(s)")
            for item in prepared:
                print(f" - Task {item.id}: {item.title or 'Untitled'}")
                print(f" Cluster: {item.cluster.name if item.cluster else 'None'}")
                print(f" Taxonomy: {item.taxonomy_term.name if item.taxonomy_term else 'None'}")
                print(f" Keywords: {item.keywords.count()} keyword(s)")
        else:
            print(f"✅ PASS: Prepared data: {type(prepared)}")

        context['prepared_data'] = prepared
        return True
    except Exception as exc:
        print(f"❌ FAIL: Error during prepare: {exc}")
        import traceback
        traceback.print_exc()
        return False
def test_function_build_prompt(context):
    """Build the AI prompt from context['prepared_data'] and preview it.

    On success stores the prompt under context['prompt'] and returns True;
    returns False when no prompt is produced or building raises.
    """
    print_section("5. TESTING PROMPT BUILDING")

    try:
        generator = get_function_instance('generate_content')
        built_prompt = generator.build_prompt(context['prepared_data'], context['account'])

        if not built_prompt:
            print("❌ FAIL: No prompt generated")
            return False

        total_len = len(built_prompt)
        print(f"✅ PASS: Prompt generated ({total_len} characters)")
        print("\nPrompt preview (first 500 chars):")
        print("-" * 80)
        print(built_prompt[:500])
        if total_len > 500:
            print(f"\n... ({total_len - 500} more characters)")
        print("-" * 80)

        context['prompt'] = built_prompt
        return True
    except Exception as exc:
        print(f"❌ FAIL: Error building prompt: {exc}")
        import traceback
        traceback.print_exc()
        return False
def test_model_config(context):
    """Resolve the model configuration for 'generate_content' and report it.

    On success stores it under context['model_config'] and returns True;
    returns False when no config resolves or lookup raises.
    """
    print_section("6. TESTING MODEL CONFIGURATION")

    try:
        from igny8_core.ai.settings import get_model_config

        cfg = get_model_config('generate_content', account=context['account'])

        if not cfg:
            print("❌ FAIL: No model config returned")
            return False

        print("✅ PASS: Model configuration loaded")
        print(f" - Model: {cfg.get('model')}")
        print(f" - Max tokens: {cfg.get('max_tokens')}")
        print(f" - Temperature: {cfg.get('temperature')}")
        print(f" - Response format: {cfg.get('response_format')}")

        context['model_config'] = cfg
        return True
    except Exception as exc:
        print(f"❌ FAIL: Error getting model config: {exc}")
        import traceback
        traceback.print_exc()
        return False
def test_ai_core_request(context):
    """Make a real (shortened, token-capped) OpenAI call through AICore.

    Asks for interactive confirmation first since the call costs money;
    answering anything but 'yes' skips the test and counts as success.
    On success stores the response under context['ai_response'].
    """
    print_section("7. TESTING AI CORE REQUEST (ACTUAL API CALL)")

    # Ask user for confirmation
    print("⚠️ WARNING: This will make an actual API call to OpenAI and cost money!")
    print("Do you want to proceed? (yes/no): ", end='')
    answer = input().strip().lower()
    if answer != 'yes':
        print("Skipping API call test")
        return True

    try:
        from igny8_core.ai.ai_core import AICore

        cfg = context['model_config']
        # Use a shorter test prompt to save costs
        shortened = context['prompt'][:1000] + "\n\n[TEST MODE - Generate only title and first paragraph]"
        print(f"Making test API call with shortened prompt ({len(shortened)} chars)...")

        core = AICore(account=context['account'])
        reply = core.run_ai_request(
            prompt=shortened,
            model=cfg['model'],
            max_tokens=500,  # Limit tokens for testing
            temperature=cfg.get('temperature', 0.7),
            response_format=cfg.get('response_format'),
            function_name='generate_content_test'
        )

        if reply.get('error'):
            print(f"❌ FAIL: API call returned error: {reply['error']}")
            return False
        if not reply.get('content'):
            print("❌ FAIL: API call returned no content")
            return False

        print("✅ PASS: API call successful")
        print(f" - Tokens: {reply.get('total_tokens', 0)}")
        print(f" - Cost: ${reply.get('cost', 0):.6f}")
        print(f" - Model: {reply.get('model')}")
        print("\nContent preview (first 300 chars):")
        print("-" * 80)
        print(reply['content'][:300])
        print("-" * 80)

        context['ai_response'] = reply
        return True
    except Exception as exc:
        print(f"❌ FAIL: Error during API call: {exc}")
        import traceback
        traceback.print_exc()
        return False
def test_service_layer(context):
    """Exercise ContentGenerationService.generate_content end to end.

    Asks for interactive confirmation first (this makes a full API call and
    persists content); answering anything but 'yes' skips and returns True.
    Handles both synchronous results and Celery-dispatched ones ('task_id'
    in the result means the work runs in the background).
    """
    print_section("8. TESTING CONTENT GENERATION SERVICE")

    print("⚠️ WARNING: This will make a full API call and create content!")
    print("Do you want to proceed? (yes/no): ", end='')
    answer = input().strip().lower()
    if answer != 'yes':
        print("Skipping service test")
        return True

    try:
        first_task = context['tasks'][0]

        svc = ContentGenerationService()
        print(f"Calling generate_content with task {first_task.id}...")
        outcome = svc.generate_content([first_task.id], context['account'])

        if not outcome:
            print("❌ FAIL: Service returned None")
            return False
        if not outcome.get('success'):
            print(f"❌ FAIL: Service failed: {outcome.get('error')}")
            return False

        print("✅ PASS: Service call successful")

        if 'task_id' in outcome:
            print(f" - Celery task ID: {outcome['task_id']}")
            print(f" - Message: {outcome.get('message')}")
            print("\n⚠️ Note: Content generation is running in background (Celery)")
            print(" Check Celery logs for actual execution status")
        else:
            print(f" - Content created: {outcome.get('content_id')}")
            print(f" - Word count: {outcome.get('word_count')}")

        return True
    except Exception as exc:
        print(f"❌ FAIL: Error in service layer: {exc}")
        import traceback
        traceback.print_exc()
        return False
def main():
    """Run every diagnostic layer in order, stopping at the first fatal failure."""
    print("\n" + "=" * 80)
    print(" GENERATE_CONTENT DIAGNOSTIC TOOL")
    print("=" * 80)
    print("\nThis tool will test each layer of the content generation pipeline")
    print("to identify where the function is failing.")

    # Prerequisites produce the shared context every later step reads
    context = test_prerequisites()
    if not context:
        print("\n❌ FATAL: Prerequisites test failed. Cannot continue.")
        return

    # Each gated step aborts the run with its own FATAL message on failure
    gated_steps = (
        (lambda: test_function_registry(), "Function registry"),
        (lambda: test_function_validation(context), "Validation"),
        (lambda: test_function_prepare(context), "Prepare"),
        (lambda: test_function_build_prompt(context), "Prompt building"),
        (lambda: test_model_config(context), "Model config"),
    )
    for run_step, label in gated_steps:
        if not run_step():
            print(f"\n❌ FATAL: {label} test failed. Cannot continue.")
            return

    # Optional tests (require API calls); their results don't gate anything
    test_ai_core_request(context)
    test_service_layer(context)

    print_section("DIAGNOSTIC COMPLETE")
    print("Review the results above to identify where the generate_content")
    print("function is failing.\n")
# Entry point: run the diagnostics only when executed as a script, not on import.
if __name__ == '__main__':
    main()