This commit is contained in:
IGNY8 VPS (Salman)
2025-11-26 19:14:30 +00:00
parent f88aae78b1
commit 94a8aee0e2
12 changed files with 889 additions and 53 deletions

View File

@@ -0,0 +1,393 @@
#!/usr/bin/env python
"""
Diagnostic script for generate_content function issues
Tests each layer of the content generation pipeline to identify where it's failing
"""
import os
import sys
import django
import logging
# Setup Django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'igny8_core.settings')
django.setup()
from igny8_core.auth.models import Account
from igny8_core.modules.writer.models import Tasks, Content
from igny8_core.modules.system.models import IntegrationSettings
from igny8_core.ai.registry import get_function_instance
from igny8_core.ai.engine import AIEngine
from igny8_core.business.content.services.content_generation_service import ContentGenerationService
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s'
)
logger = logging.getLogger(__name__)
def print_section(title):
"""Print a section header"""
print("\n" + "=" * 80)
print(f" {title}")
print("=" * 80 + "\n")
def test_prerequisites():
"""Test that prerequisites are met"""
print_section("1. TESTING PREREQUISITES")
# Check if account exists
try:
account = Account.objects.first()
if not account:
print("❌ FAIL: No account found in database")
return None
print(f"✅ PASS: Found account: {account.id} ({account.email})")
except Exception as e:
print(f"❌ FAIL: Error getting account: {e}")
return None
# Check OpenAI integration settings
try:
openai_settings = IntegrationSettings.objects.filter(
integration_type='openai',
account=account,
is_active=True
).first()
if not openai_settings:
print("❌ FAIL: No active OpenAI integration settings found")
return None
if not openai_settings.config or not openai_settings.config.get('apiKey'):
print("❌ FAIL: OpenAI API key not configured in IntegrationSettings")
return None
api_key_preview = openai_settings.config['apiKey'][:10] + "..." if openai_settings.config.get('apiKey') else "None"
model = openai_settings.config.get('model', 'Not set')
print(f"✅ PASS: OpenAI settings found (API key: {api_key_preview}, Model: {model})")
except Exception as e:
print(f"❌ FAIL: Error checking OpenAI settings: {e}")
return None
# Check if tasks exist
try:
tasks = Tasks.objects.filter(account=account, status='pending')[:5]
task_count = tasks.count()
if task_count == 0:
print("⚠️ WARNING: No pending tasks found, will try to use any task")
tasks = Tasks.objects.filter(account=account)[:5]
task_count = tasks.count()
if task_count == 0:
print("❌ FAIL: No tasks found at all")
return None
print(f"✅ PASS: Found {task_count} task(s)")
for task in tasks:
print(f" - Task {task.id}: {task.title or 'Untitled'} (status: {task.status})")
except Exception as e:
print(f"❌ FAIL: Error getting tasks: {e}")
return None
return {
'account': account,
'tasks': list(tasks),
'openai_settings': openai_settings
}
def test_function_registry():
"""Test that the generate_content function is registered"""
print_section("2. TESTING FUNCTION REGISTRY")
try:
fn = get_function_instance('generate_content')
if not fn:
print("❌ FAIL: generate_content function not found in registry")
return False
print(f"✅ PASS: Function registered: {fn.get_name()}")
metadata = fn.get_metadata()
print(f" - Display name: {metadata.get('display_name')}")
print(f" - Description: {metadata.get('description')}")
return True
except Exception as e:
print(f"❌ FAIL: Error loading function: {e}")
import traceback
traceback.print_exc()
return False
def test_function_validation(context):
"""Test function validation"""
print_section("3. TESTING FUNCTION VALIDATION")
try:
fn = get_function_instance('generate_content')
account = context['account']
task = context['tasks'][0]
payload = {'ids': [task.id]}
print(f"Testing with payload: {payload}")
result = fn.validate(payload, account)
if result['valid']:
print(f"✅ PASS: Validation succeeded")
else:
print(f"❌ FAIL: Validation failed: {result.get('error')}")
return False
return True
except Exception as e:
print(f"❌ FAIL: Error during validation: {e}")
import traceback
traceback.print_exc()
return False
def test_function_prepare(context):
"""Test function prepare phase"""
print_section("4. TESTING FUNCTION PREPARE")
try:
fn = get_function_instance('generate_content')
account = context['account']
task = context['tasks'][0]
payload = {'ids': [task.id]}
print(f"Preparing task {task.id}: {task.title or 'Untitled'}")
data = fn.prepare(payload, account)
if not data:
print("❌ FAIL: Prepare returned no data")
return False
if isinstance(data, list):
print(f"✅ PASS: Prepared {len(data)} task(s)")
for t in data:
print(f" - Task {t.id}: {t.title or 'Untitled'}")
print(f" Cluster: {t.cluster.name if t.cluster else 'None'}")
print(f" Taxonomy: {t.taxonomy_term.name if t.taxonomy_term else 'None'}")
print(f" Keywords: {t.keywords.count()} keyword(s)")
else:
print(f"✅ PASS: Prepared data: {type(data)}")
context['prepared_data'] = data
return True
except Exception as e:
print(f"❌ FAIL: Error during prepare: {e}")
import traceback
traceback.print_exc()
return False
def test_function_build_prompt(context):
"""Test prompt building"""
print_section("5. TESTING PROMPT BUILDING")
try:
fn = get_function_instance('generate_content')
account = context['account']
data = context['prepared_data']
prompt = fn.build_prompt(data, account)
if not prompt:
print("❌ FAIL: No prompt generated")
return False
print(f"✅ PASS: Prompt generated ({len(prompt)} characters)")
print("\nPrompt preview (first 500 chars):")
print("-" * 80)
print(prompt[:500])
if len(prompt) > 500:
print(f"\n... ({len(prompt) - 500} more characters)")
print("-" * 80)
context['prompt'] = prompt
return True
except Exception as e:
print(f"❌ FAIL: Error building prompt: {e}")
import traceback
traceback.print_exc()
return False
def test_model_config(context):
"""Test model configuration"""
print_section("6. TESTING MODEL CONFIGURATION")
try:
from igny8_core.ai.settings import get_model_config
account = context['account']
model_config = get_model_config('generate_content', account=account)
if not model_config:
print("❌ FAIL: No model config returned")
return False
print(f"✅ PASS: Model configuration loaded")
print(f" - Model: {model_config.get('model')}")
print(f" - Max tokens: {model_config.get('max_tokens')}")
print(f" - Temperature: {model_config.get('temperature')}")
print(f" - Response format: {model_config.get('response_format')}")
context['model_config'] = model_config
return True
except Exception as e:
print(f"❌ FAIL: Error getting model config: {e}")
import traceback
traceback.print_exc()
return False
def test_ai_core_request(context):
"""Test AI core request (actual API call)"""
print_section("7. TESTING AI CORE REQUEST (ACTUAL API CALL)")
# Ask user for confirmation
print("⚠️ WARNING: This will make an actual API call to OpenAI and cost money!")
print("Do you want to proceed? (yes/no): ", end='')
response = input().strip().lower()
if response != 'yes':
print("Skipping API call test")
return True
try:
from igny8_core.ai.ai_core import AICore
account = context['account']
prompt = context['prompt']
model_config = context['model_config']
# Use a shorter test prompt to save costs
test_prompt = prompt[:1000] + "\n\n[TEST MODE - Generate only title and first paragraph]"
print(f"Making test API call with shortened prompt ({len(test_prompt)} chars)...")
ai_core = AICore(account=account)
result = ai_core.run_ai_request(
prompt=test_prompt,
model=model_config['model'],
max_tokens=500, # Limit tokens for testing
temperature=model_config.get('temperature', 0.7),
response_format=model_config.get('response_format'),
function_name='generate_content_test'
)
if result.get('error'):
print(f"❌ FAIL: API call returned error: {result['error']}")
return False
if not result.get('content'):
print(f"❌ FAIL: API call returned no content")
return False
print(f"✅ PASS: API call successful")
print(f" - Tokens: {result.get('total_tokens', 0)}")
print(f" - Cost: ${result.get('cost', 0):.6f}")
print(f" - Model: {result.get('model')}")
print(f"\nContent preview (first 300 chars):")
print("-" * 80)
print(result['content'][:300])
print("-" * 80)
context['ai_response'] = result
return True
except Exception as e:
print(f"❌ FAIL: Error during API call: {e}")
import traceback
traceback.print_exc()
return False
def test_service_layer(context):
"""Test the content generation service"""
print_section("8. TESTING CONTENT GENERATION SERVICE")
print("⚠️ WARNING: This will make a full API call and create content!")
print("Do you want to proceed? (yes/no): ", end='')
response = input().strip().lower()
if response != 'yes':
print("Skipping service test")
return True
try:
account = context['account']
task = context['tasks'][0]
service = ContentGenerationService()
print(f"Calling generate_content with task {task.id}...")
result = service.generate_content([task.id], account)
if not result:
print("❌ FAIL: Service returned None")
return False
if not result.get('success'):
print(f"❌ FAIL: Service failed: {result.get('error')}")
return False
print(f"✅ PASS: Service call successful")
if 'task_id' in result:
print(f" - Celery task ID: {result['task_id']}")
print(f" - Message: {result.get('message')}")
print("\n⚠️ Note: Content generation is running in background (Celery)")
print(" Check Celery logs for actual execution status")
else:
print(f" - Content created: {result.get('content_id')}")
print(f" - Word count: {result.get('word_count')}")
return True
except Exception as e:
print(f"❌ FAIL: Error in service layer: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""Run all diagnostic tests"""
print("\n" + "=" * 80)
print(" GENERATE_CONTENT DIAGNOSTIC TOOL")
print("=" * 80)
print("\nThis tool will test each layer of the content generation pipeline")
print("to identify where the function is failing.")
# Run tests
context = test_prerequisites()
if not context:
print("\n❌ FATAL: Prerequisites test failed. Cannot continue.")
return
if not test_function_registry():
print("\n❌ FATAL: Function registry test failed. Cannot continue.")
return
if not test_function_validation(context):
print("\n❌ FATAL: Validation test failed. Cannot continue.")
return
if not test_function_prepare(context):
print("\n❌ FATAL: Prepare test failed. Cannot continue.")
return
if not test_function_build_prompt(context):
print("\n❌ FATAL: Prompt building test failed. Cannot continue.")
return
if not test_model_config(context):
print("\n❌ FATAL: Model config test failed. Cannot continue.")
return
# Optional tests (require API calls)
test_ai_core_request(context)
test_service_layer(context)
print_section("DIAGNOSTIC COMPLETE")
print("Review the results above to identify where the generate_content")
print("function is failing.\n")
if __name__ == '__main__':
main()

View File

@@ -524,16 +524,20 @@ class AIEngine:
'generate_image_prompts': 'image_prompt_extraction',
'generate_images': 'image_generation',
'generate_site_structure': 'site_structure_generation',
'generate_page_content': 'content_generation', # Site Builder page content
}
return mapping.get(function_name, function_name)
def _get_estimated_amount(self, function_name, data, payload):
"""Get estimated amount for credit calculation (before operation)"""
if function_name == 'generate_content':
# Estimate word count from task or default
if isinstance(data, dict):
return data.get('estimated_word_count', 1000)
return 1000 # Default estimate
if function_name == 'generate_content' or function_name == 'generate_page_content':
# Estimate word count - tasks don't have word_count field, use default
# For generate_content, data is a list of Task objects
# For generate_page_content, data is a PageBlueprint object
if isinstance(data, list) and len(data) > 0:
# Multiple tasks - estimate 1000 words per task
return len(data) * 1000
return 1000 # Default estimate for single item
elif function_name == 'generate_images':
# Count images to generate
if isinstance(payload, dict):
@@ -550,16 +554,24 @@ class AIEngine:
def _get_actual_amount(self, function_name, save_result, parsed, data):
"""Get actual amount for credit calculation (after operation)"""
if function_name == 'generate_content':
if function_name == 'generate_content' or function_name == 'generate_page_content':
# Get actual word count from saved content
if isinstance(save_result, dict):
word_count = save_result.get('word_count')
if word_count:
if word_count and word_count > 0:
return word_count
# Fallback: estimate from parsed content
if isinstance(parsed, dict) and 'content' in parsed:
content = parsed['content']
return len(content.split()) if isinstance(content, str) else 1000
# Fallback: estimate from html_content if available
if isinstance(parsed, dict) and 'html_content' in parsed:
html_content = parsed['html_content']
if isinstance(html_content, str):
# Strip HTML tags for word count
import re
text = re.sub(r'<[^>]+>', '', html_content)
return len(text.split())
return 1000
elif function_name == 'generate_images':
# Count successfully generated images

View File

@@ -65,7 +65,7 @@ class GenerateContentFunction(BaseAIFunction):
# STAGE 3: Preload relationships - taxonomy_term instead of taxonomy
tasks = list(queryset.select_related(
'account', 'site', 'sector', 'cluster', 'taxonomy_term'
).prefetch_related('keywords'))
))
if not tasks:
raise ValueError("No tasks found")
@@ -106,11 +106,10 @@ class GenerateContentFunction(BaseAIFunction):
if task.taxonomy_term.taxonomy_type:
taxonomy_data += f"Type: {task.taxonomy_term.get_taxonomy_type_display()}\n"
# STAGE 3: Build keywords context (from keywords M2M)
# STAGE 3: Build keywords context (from keywords TextField)
keywords_data = ''
if task.keywords.exists():
keyword_list = [kw.keyword for kw in task.keywords.all()]
keywords_data = "Keywords: " + ", ".join(keyword_list) + "\n"
if task.keywords:
keywords_data = f"Keywords: {task.keywords}\n"
# Get prompt from registry with context
prompt = PromptRegistry.get_prompt(
@@ -192,6 +191,7 @@ class GenerateContentFunction(BaseAIFunction):
# Core fields
title=title,
content_html=content_html or '',
word_count=word_count,
cluster=task.cluster,
content_type=task.content_type,
content_structure=task.content_structure,

View File

@@ -63,7 +63,7 @@ class GenerateImagePromptsFunction(BaseAIFunction):
if account:
queryset = queryset.filter(account=account)
contents = list(queryset.select_related('task', 'account', 'site', 'sector'))
contents = list(queryset.select_related('account', 'site', 'sector', 'cluster'))
if not contents:
raise ValueError("No content records found")
@@ -203,7 +203,7 @@ class GenerateImagePromptsFunction(BaseAIFunction):
"""Extract title, intro paragraphs, and H2 headings from content HTML"""
from bs4 import BeautifulSoup
html_content = content.html_content or ''
html_content = content.content_html or ''
soup = BeautifulSoup(html_content, 'html.parser')
# Extract title

View File

@@ -544,42 +544,42 @@ OUTPUT FORMAT:
--------------
Return ONLY a JSON object in this exact structure:
{
{{
"title": "[Page title - SEO optimized, natural]",
"html_content": "[Full HTML content for fallback/SEO - complete article]",
"word_count": [Integer - word count of HTML content],
"blocks": [
{
{{
"type": "hero",
"data": {
"data": {{
"heading": "[Compelling hero headline]",
"subheading": "[Supporting subheadline]",
"content": "[Brief hero description - 1-2 sentences]",
"buttonText": "[CTA button text]",
"buttonLink": "[CTA link URL]"
}
},
{
}}
}},
{{
"type": "text",
"data": {
"data": {{
"heading": "[Section heading]",
"content": "[Rich text content with paragraphs, lists, etc.]"
}
},
{
}}
}},
{{
"type": "features",
"data": {
"data": {{
"heading": "[Features section heading]",
"content": [
"[Feature 1: Description]",
"[Feature 2: Description]",
"[Feature 3: Description]"
]
}
},
{
}}
}},
{{
"type": "testimonials",
"data": {
"data": {{
"heading": "[Testimonials heading]",
"subheading": "[Optional subheading]",
"content": [
@@ -587,20 +587,20 @@ Return ONLY a JSON object in this exact structure:
"[Testimonial quote 2]",
"[Testimonial quote 3]"
]
}
},
{
}}
}},
{{
"type": "cta",
"data": {
"data": {{
"heading": "[CTA heading]",
"subheading": "[CTA subheading]",
"content": "[CTA description]",
"buttonText": "[Button text]",
"buttonLink": "[Button link]"
}
}
}}
}}
]
}
}}
BLOCK TYPE GUIDELINES:
----------------------
@@ -699,7 +699,7 @@ Primary Keyword: [IGNY8_PRIMARY_KEYWORD]
OUTPUT FORMAT:
Return ONLY a JSON object in this format:
{
{{
"title": "[Taxonomy name and purpose]",
"meta_title": "[SEO-optimized meta title, 30-60 chars]",
"meta_description": "[Compelling meta description, 120-160 chars]",
@@ -710,39 +710,39 @@ Return ONLY a JSON object in this format:
"tags": ["tag1", "tag2", "tag3"],
"categories": ["Category > Subcategory"],
"json_blocks": [
{
{{
"type": "taxonomy_overview",
"heading": "Taxonomy Overview",
"content": "Detailed taxonomy description"
},
{
}},
{{
"type": "categories",
"heading": "Categories",
"items": [
{
{{
"name": "Category 1",
"description": "Category description",
"subcategories": ["Subcat 1", "Subcat 2"]
}
}}
]
},
{
}},
{{
"type": "tags",
"heading": "Tags",
"items": ["Tag 1", "Tag 2", "Tag 3"]
},
{
}},
{{
"type": "hierarchy",
"heading": "Taxonomy Hierarchy",
"structure": {"Level 1": {"Level 2": ["Level 3"]}}
}
"structure": {{"Level 1": {{"Level 2": ["Level 3"]}}}}
}}
],
"structure_data": {
"structure_data": {{
"taxonomy_type": "[Taxonomy type]",
"item_count": [Integer],
"hierarchy_levels": [Integer]
}
}
}}
}}
CONTENT REQUIREMENTS:
- Clear taxonomy overview and purpose
@@ -865,7 +865,7 @@ CONTENT REQUIREMENTS:
if '{' in rendered and '}' in rendered:
try:
rendered = rendered.format(**normalized_context)
except (KeyError, ValueError) as e:
except (KeyError, ValueError, IndexError) as e:
# If .format() fails, log warning but keep the [IGNY8_*] replacements
logger.warning(f"Failed to format prompt with .format(): {e}. Using [IGNY8_*] replacements only.")

View File

@@ -91,6 +91,11 @@ class Tasks(SiteSectorBaseModel):
null=True,
help_text="Comma-separated keywords for this task"
)
word_count = models.IntegerField(
default=1000,
validators=[MinValueValidator(100)],
help_text="Target word count for content generation"
)
status = models.CharField(max_length=50, choices=STATUS_CHOICES, default='queued')
created_at = models.DateTimeField(auto_now_add=True)
@@ -165,6 +170,10 @@ class Content(SiteSectorBaseModel):
# Core content fields
title = models.CharField(max_length=255, db_index=True)
content_html = models.TextField(help_text="Final HTML content")
word_count = models.IntegerField(
default=0,
help_text="Actual word count of content (calculated from HTML)"
)
cluster = models.ForeignKey(
'planner.Clusters',
on_delete=models.SET_NULL,

View File

@@ -33,7 +33,7 @@ class ContentGenerationService:
# Get tasks
tasks = Tasks.objects.filter(id__in=task_ids, account=account)
# Calculate estimated credits needed
# Calculate estimated credits needed based on word count
total_word_count = sum(task.word_count or 1000 for task in tasks)
# Check credits

View File

@@ -0,0 +1,23 @@
# Generated manually on 2025-11-26
from django.db import migrations, models
import django.core.validators
class Migration(migrations.Migration):
dependencies = [
('writer', '0008_field_rename_implementation'),
]
operations = [
migrations.AddField(
model_name='tasks',
name='word_count',
field=models.IntegerField(
default=1000,
validators=[django.core.validators.MinValueValidator(100)],
help_text='Target word count for content generation'
),
),
]

View File

@@ -25,6 +25,7 @@ class TasksSerializer(serializers.ModelSerializer):
'content_type',
'content_structure',
'taxonomy_term_id',
'word_count',
'status',
'sector_name',
'site_id',
@@ -171,6 +172,7 @@ class ContentSerializer(serializers.ModelSerializer):
'external_url',
'source',
'status',
'word_count',
'sector_name',
'site_id',
'sector_id',

View File

@@ -0,0 +1,174 @@
#!/usr/bin/env python
"""
Test script to run inside Docker container to diagnose generate_content issues
"""
import os
import sys
import django
# Setup Django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'igny8_core.settings')
django.setup()
import logging
from django.contrib.auth import get_user_model
from igny8_core.auth.models import Account
from igny8_core.business.site_building.models import PageBlueprint, SiteBlueprint
from igny8_core.business.site_building.services.page_generation_service import PageGenerationService
from igny8_core.modules.system.models import IntegrationSettings
from igny8_core.ai.settings import get_model_config
# Set up logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def main():
print("=" * 80)
print("GENERATE_CONTENT DIAGNOSTIC TEST")
print("=" * 80)
# 1. Test User Authentication and get Account
print("\n1. Testing User Authentication...")
User = get_user_model()
user = User.objects.filter(email='dev@igny8.com').first()
if not user:
print("❌ ERROR: User 'dev@igny8.com' not found!")
return
print(f"✓ User found: {user.email} (ID: {user.id})")
# Get the associated account
account = user.account if hasattr(user, 'account') else None
if not account:
print("❌ ERROR: User has no associated Account!")
return
print(f"✓ Account found: {account.id}")
# 2. Check Integration Settings
print("\n2. Checking Integration Settings...")
try:
integration = IntegrationSettings.objects.filter(account=account, is_active=True).first()
if integration:
print(f"✓ Integration found: {integration.integration_type}")
print(f" - Config keys: {list(integration.config.keys()) if integration.config else 'None'}")
print(f" - Active: {integration.is_active}")
else:
print("❌ WARNING: No active IntegrationSettings found!")
print(" This will cause AI requests to fail!")
except Exception as e:
print(f"❌ ERROR checking integration: {e}")
# 3. Test Model Configuration
print("\n3. Testing Model Configuration...")
try:
model_config = get_model_config('generate_page_content', account=account)
print(f"✓ Model config loaded:")
print(f" - Model: {model_config.get('model')}")
print(f" - Max tokens: {model_config.get('max_tokens')}")
print(f" - Temperature: {model_config.get('temperature')}")
except Exception as e:
print(f"❌ ERROR: {e}")
import traceback
traceback.print_exc()
# 4. Check for Site Blueprints
print("\n4. Checking for Site Blueprints...")
site_blueprints = SiteBlueprint.objects.filter(account=account)
print(f" - Found {site_blueprints.count()} site blueprints")
if site_blueprints.exists():
sb = site_blueprints.first()
print(f" - First blueprint: {sb.name} (ID: {sb.id})")
print(f" - Pages: {sb.pages.count()}")
# 5. Check for Page Blueprints
print("\n5. Checking for Page Blueprints...")
pages = PageBlueprint.objects.filter(account=account)
print(f" - Found {pages.count()} page blueprints")
if not pages.exists():
print("❌ WARNING: No page blueprints found! Creating a test page...")
# Create a test site blueprint if needed
if not site_blueprints.exists():
from igny8_core.modules.planner.models import Sector
sector = Sector.objects.filter(account=account).first()
if not sector:
print("❌ ERROR: No sector found for account. Cannot create test blueprint.")
return
sb = SiteBlueprint.objects.create(
account=account,
sector=sector,
name="Test Site Blueprint",
site_type="business",
status="draft"
)
print(f"✓ Created test site blueprint: {sb.id}")
else:
sb = site_blueprints.first()
# Create a test page
page = PageBlueprint.objects.create(
account=account,
site_blueprint=sb,
sector=sb.sector,
title="Test Home Page",
slug="home",
type="home",
status="draft",
blocks_json=[
{
"type": "hero",
"heading": "Welcome to Our Test Site",
"subheading": "This is a test page for content generation"
},
{
"type": "features",
"heading": "Our Features"
}
]
)
print(f"✓ Created test page blueprint: {page.id}")
else:
page = pages.first()
print(f" - Using existing page: {page.title} (ID: {page.id})")
# 6. Test generate_page_content
print("\n6. Testing generate_page_content...")
print(f" - Page ID: {page.id}")
print(f" - Page title: {page.title}")
print(f" - Page type: {page.type}")
print(f" - Page status: {page.status}")
print(f" - Blocks count: {len(page.blocks_json) if page.blocks_json else 0}")
try:
service = PageGenerationService()
print("\n Calling service.generate_page_content()...")
result = service.generate_page_content(page, force_regenerate=False)
print(f"\n✓ SUCCESS!")
print(f" Result: {result}")
except ValueError as e:
print(f"\n❌ ValueError: {e}")
import traceback
traceback.print_exc()
except Exception as e:
print(f"\n❌ Exception: {e}")
import traceback
traceback.print_exc()
print("\n" + "=" * 80)
print("TEST COMPLETE")
print("=" * 80)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""
Simple test to check generate_content function execution
Run this to see exact error messages
"""
import os
import sys
import django
import logging
import json
# Setup Django
sys.path.insert(0, '/data/app/igny8/backend')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'igny8_core.settings')
django.setup()
# Setup logging to see all messages
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s'
)
from igny8_core.auth.models import Account
from igny8_core.modules.writer.models import Tasks
from igny8_core.ai.registry import get_function_instance
from igny8_core.ai.engine import AIEngine
print("\n" + "="*80)
print("SIMPLE GENERATE_CONTENT TEST")
print("="*80 + "\n")
# Get first account and task
try:
account = Account.objects.first()
print(f"✓ Account: {account.id} - {account.email}")
except Exception as e:
print(f"✗ Failed to get account: {e}")
sys.exit(1)
try:
task = Tasks.objects.filter(account=account).first()
if not task:
print("✗ No tasks found")
sys.exit(1)
print(f"✓ Task: {task.id} - {task.title or 'Untitled'}")
print(f" Status: {task.status}")
print(f" Cluster: {task.cluster.name if task.cluster else 'None'}")
print(f" Taxonomy: {task.taxonomy_term.name if task.taxonomy_term else 'None'}")
except Exception as e:
print(f"✗ Failed to get task: {e}")
sys.exit(1)
# Get function
try:
fn = get_function_instance('generate_content')
print(f"✓ Function loaded: {fn.get_name()}")
except Exception as e:
print(f"✗ Failed to load function: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
# Test validation
print("\nTesting validation...")
try:
payload = {'ids': [task.id]}
result = fn.validate(payload, account)
if result['valid']:
print(f"✓ Validation passed")
else:
print(f"✗ Validation failed: {result.get('error')}")
sys.exit(1)
except Exception as e:
print(f"✗ Validation error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
# Test prepare
print("\nTesting prepare...")
try:
data = fn.prepare(payload, account)
print(f"✓ Prepare succeeded: {len(data) if isinstance(data, list) else 1} task(s)")
except Exception as e:
print(f"✗ Prepare error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
# Test build_prompt
print("\nTesting build_prompt...")
try:
prompt = fn.build_prompt(data, account)
print(f"✓ Prompt built: {len(prompt)} characters")
print(f"\nPrompt preview (first 500 chars):")
print("-" * 80)
print(prompt[:500])
print("-" * 80)
except Exception as e:
print(f"✗ Build prompt error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
# Test model config
print("\nTesting model config...")
try:
from igny8_core.ai.settings import get_model_config
model_config = get_model_config('generate_content', account=account)
print(f"✓ Model config loaded:")
print(f" Model: {model_config.get('model')}")
print(f" Max tokens: {model_config.get('max_tokens')}")
print(f" Temperature: {model_config.get('temperature')}")
except Exception as e:
print(f"✗ Model config error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
print("\n" + "="*80)
print("All tests passed! The function structure is correct.")
print("If content generation still fails, the issue is likely:")
print("1. API key is invalid or missing")
print("2. OpenAI API error (rate limit, quota, etc.)")
print("3. Prompt is too long or has invalid format")
print("4. Celery worker is not running or has errors")
print("\nCheck Celery worker logs with:")
print(" journalctl -u celery-worker -f")
print("="*80 + "\n")

View File

@@ -0,0 +1,94 @@
#!/usr/bin/env python
"""Test generate_content for Writer tasks"""
import os
import sys
import django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'igny8_core.settings')
django.setup()
from django.contrib.auth import get_user_model
from igny8_core.business.content.models import Tasks
from igny8_core.business.content.services.content_generation_service import ContentGenerationService
print("=" * 80)
print("TESTING WRITER GENERATE_CONTENT")
print("=" * 80)
User = get_user_model()
user = User.objects.filter(email='dev@igny8.com').first()
if not user:
print("❌ User not found")
sys.exit(1)
account = user.account if hasattr(user, 'account') else None
if not account:
print("❌ No account found for user")
sys.exit(1)
print(f"✓ User: {user.email}")
print(f"✓ Account: {account.id}")
# Get tasks
tasks = Tasks.objects.filter(account=account, status='queued')[:5]
print(f"\n✓ Found {tasks.count()} queued tasks")
if tasks.exists():
task = tasks.first()
print(f"\nTesting with task:")
print(f" - ID: {task.id}")
print(f" - Title: {task.title}")
print(f" - Status: {task.status}")
print(f" - Cluster: {task.cluster.name if task.cluster else 'None'}")
print("\nCalling generate_content service...")
service = ContentGenerationService()
try:
result = service.generate_content([task.id], account)
print(f"\n✓ SUCCESS!")
print(f"Result: {result}")
except Exception as e:
print(f"\n❌ ERROR: {e}")
import traceback
traceback.print_exc()
else:
print("\n⚠️ No queued tasks found. Creating a test task...")
from igny8_core.modules.planner.models import Clusters
cluster = Clusters.objects.filter(account=account).first()
if not cluster:
print("❌ No clusters found. Cannot create test task.")
sys.exit(1)
task = Tasks.objects.create(
account=account,
site=cluster.site,
sector=cluster.sector,
cluster=cluster,
title="Test Article: Benefits of Organic Cotton Bedding",
description="Comprehensive guide covering health benefits, environmental impact, and buying guide",
content_type='post',
content_structure='article',
status='queued'
)
print(f"✓ Created test task: {task.id} - {task.title}")
print("\nCalling generate_content service...")
service = ContentGenerationService()
try:
result = service.generate_content([task.id], account)
print(f"\n✓ SUCCESS!")
print(f"Result: {result}")
except Exception as e:
print(f"\n❌ ERROR: {e}")
import traceback
traceback.print_exc()
print("\n" + "=" * 80)
print("TEST COMPLETE")
print("=" * 80)