Files
igny8/backend/igny8_core/modules/writer/tasks.py
IGNY8 VPS (Salman) 8bb4c5d016 Add new fields to TasksSerializer and enhance auto_generate_content_task with detailed step tracking
- Updated TasksSerializer to include 'primary_keyword', 'secondary_keywords', 'tags', and 'categories'.
- Enhanced auto_generate_content_task to track progress with detailed steps, including initialization, preparation, AI call, parsing, and saving.
- Updated progress modal to reflect new phases and improved animation for smoother user experience.
- Adjusted routing and configuration for content and drafts pages in the frontend.
2025-11-10 13:17:48 +00:00

1138 lines
59 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Celery tasks for Writer module - AI content generation
"""
import logging
import re
import time
from typing import List
from django.db import transaction
from igny8_core.modules.writer.models import Tasks, Images, Content
from igny8_core.utils.ai_processor import ai_processor
from igny8_core.modules.system.utils import get_prompt_value, get_default_prompt
from igny8_core.ai.functions.generate_content import generate_content_core
from igny8_core.ai.functions.generate_images import generate_images_core
logger = logging.getLogger(__name__)
# Try to import Celery, fall back to synchronous execution if not available
try:
    from celery import shared_task
    CELERY_AVAILABLE = True
except ImportError:
    CELERY_AVAILABLE = False

    def shared_task(*args, **kwargs):
        """
        No-op stand-in for ``celery.shared_task`` used when Celery is missing.

        Supports both decorator forms that the real decorator accepts:

        * ``@shared_task`` (bare) - the decorated function is returned as-is.
        * ``@shared_task(bind=True, ...)`` - the options are ignored and a
          pass-through decorator is returned.

        In both cases the decorated function is left completely unchanged.
        """
        # Bare usage: the only positional argument is the function itself.
        # Without this check the function would be replaced by ``decorator``.
        if len(args) == 1 and not kwargs and callable(args[0]):
            return args[0]

        def decorator(func):
            return func
        return decorator
@shared_task(bind=True, max_retries=3)
def auto_generate_content_task(self, task_ids: List[int], account_id: int = None):
    """
    Celery task to generate content for tasks using AI.

    For every matching task the function builds a prompt from the task, its
    idea and its cluster, calls the AI processor, normalizes the returned
    content and stores it on the task (status becomes ``'draft'``).
    Progress is reported through ``self.update_state`` with the phases
    INIT, PREP, AI_CALL, PARSE, SAVE and DONE, together with the accumulated
    ``request_steps`` / ``response_steps`` lists.

    A task that fails validation, prompt loading, the AI call or saving is
    logged and skipped; the remaining tasks are still processed.

    Args:
        task_ids: List of task IDs
        account_id: Account ID for account isolation

    Returns:
        dict: ``{'success': True, 'tasks_updated': int, 'message': str}`` on
        completion, ``{'success': False, 'error': 'No tasks found'}`` when the
        query matches nothing, or ``{'success': False, 'error': str,
        'error_type': str, 'tasks_updated': 0}`` when an unexpected error
        aborts the run. Exceptions are not propagated to the caller.
    """
    try:
        # Step tracking for progress modal
        step_counter = 0
        request_steps = []
        response_steps = []

        def add_step(step_name, status='success', message='', step_type='request'):
            """Append a numbered, timestamped step to the request or response list."""
            nonlocal step_counter
            step_counter += 1
            step = {
                'stepNumber': step_counter,
                'stepName': step_name,
                'status': status,
                'message': message,
                'timestamp': time.time()
            }
            if step_type == 'request':
                request_steps.append(step)
            else:
                response_steps.append(step)
            return step

        # Initialize progress
        add_step('INIT', 'success', 'Initializing content generation...', 'request')
        self.update_state(
            state='PROGRESS',
            meta={
                'current': 0,
                'total': len(task_ids),
                'percentage': 0,
                'message': 'Initializing content generation...',
                'phase': 'INIT',
                'request_steps': request_steps,
                'response_steps': response_steps
            }
        )

        # ========================================================================
        # DATABASE QUERY PHASE - Detailed logging
        # ========================================================================
        logger.info("=" * 80)
        logger.info("DATABASE QUERY: Starting task retrieval")
        logger.info(f" - task_ids: {task_ids}")
        logger.info(f" - account_id: {account_id}")
        logger.info("=" * 80)

        # Get tasks with all relationships preloaded to avoid N+1 queries
        try:
            from django.db import connection

            # Check database connection first
            try:
                with connection.cursor() as cursor:
                    cursor.execute("SELECT 1")
                    logger.info(" - ✓ Database connection verified")
            except Exception as conn_error:
                logger.error(f" - ✗ Database connection failed: {type(conn_error).__name__}: {str(conn_error)}")
                raise

            # Build queryset step by step with error handling
            logger.info(" - Building queryset...")
            tasks_queryset = Tasks.objects.filter(id__in=task_ids)
            logger.info(f" - Initial queryset created for {len(task_ids)} task IDs")

            # Count before account filter (may fail if database issue)
            try:
                initial_count = tasks_queryset.count()
                logger.info(f" - Initial queryset count (before account filter): {initial_count}")
            except Exception as count_error:
                logger.error(f" - ✗ Failed to count initial queryset: {type(count_error).__name__}: {str(count_error)}")
                raise

            if account_id:
                logger.info(f" - Applying account filter (account_id={account_id})...")
                tasks_queryset = tasks_queryset.filter(account_id=account_id)
                try:
                    filtered_count = tasks_queryset.count()
                    logger.info(f" - After account filter: {filtered_count} tasks")
                except Exception as count_error:
                    logger.error(f" - ✗ Failed to count after account filter: {type(count_error).__name__}: {str(count_error)}")
                    raise

            # Log queryset SQL for debugging (only if query is valid)
            try:
                sql_query = str(tasks_queryset.query)
                logger.info(f" - Queryset SQL: {sql_query[:500]}...")  # Truncate if too long
            except Exception as sql_error:
                logger.warning(f" - ⚠️ Could not generate SQL string: {type(sql_error).__name__}: {str(sql_error)}")

            # Optimize queries: preload all related objects to avoid N+1 queries
            # Only select relationships that definitely exist (nullable FKs handled safely)
            logger.info(" - Applying select_related for: cluster, idea, sector, account, site")
            try:
                tasks_queryset = tasks_queryset.select_related(
                    'cluster',  # Cluster FK (nullable)
                    'idea',     # ContentIdeas FK (nullable)
                    'sector',   # Sector FK (required - SiteSectorBaseModel)
                    'account',  # Account FK (required - AccountBaseModel)
                    'site',     # Site FK (required - SiteSectorBaseModel)
                )
                logger.info(" - ✓ select_related applied successfully")
            except Exception as select_error:
                logger.error(f" - ✗ Failed to apply select_related: {type(select_error).__name__}: {str(select_error)}")
                # Try without select_related as fallback
                logger.warning(" - ⚠️ Falling back to queryset without select_related")
                tasks_queryset = Tasks.objects.filter(id__in=task_ids)
                if account_id:
                    tasks_queryset = tasks_queryset.filter(account_id=account_id)

            # Convert to list to execute query and log results
            logger.info(" - Executing database query...")
            try:
                tasks = list(tasks_queryset)
                logger.info(f" - ✓ Query executed successfully: {len(tasks)} tasks retrieved")
            except Exception as query_error:
                logger.error(f" - ✗ Query execution failed: {type(query_error).__name__}: {str(query_error)}")
                logger.error(" - This may indicate a database schema issue or missing relationships")
                raise
        except Exception as db_error:
            # Log the full context once, then let the outer handler build the
            # failure result.
            logger.error("=" * 80)
            logger.error("DATABASE QUERY ERROR")
            logger.error(f" - Error type: {type(db_error).__name__}")
            logger.error(f" - Error message: {str(db_error)}")
            logger.error(f" - task_ids: {task_ids}")
            logger.error(f" - account_id: {account_id}")
            logger.error("=" * 80, exc_info=True)
            raise

        # Log detailed information about each retrieved task
        logger.info("=" * 80)
        logger.info("DATABASE RESULTS: Task data collected")
        logger.info(f" - Total tasks retrieved: {len(tasks)}")
        logger.info("=" * 80)
        for idx, task in enumerate(tasks):
            logger.info(f" Task #{idx + 1} (ID: {task.id}):")
            logger.info(f" - title: {task.title}")
            logger.info(f" - status: {task.status}")
            logger.info(f" - account_id: {task.account_id if hasattr(task, 'account_id') else 'N/A'}")
            logger.info(f" - site_id: {task.site_id if hasattr(task, 'site_id') else 'N/A'}")
            logger.info(f" - sector_id: {task.sector_id if hasattr(task, 'sector_id') else 'N/A'}")
            logger.info(f" - cluster_id: {task.cluster_id if hasattr(task, 'cluster_id') else 'None'}")
            logger.info(f" - idea_id: {task.idea_id if hasattr(task, 'idea_id') else 'None'}")

            # Check if relationships are loaded
            try:
                account_loaded = task.account is not None
                site_loaded = task.site is not None
                sector_loaded = task.sector is not None
                cluster_loaded = task.cluster is not None
                idea_loaded = task.idea is not None
                logger.info(" - Relationships loaded:")
                logger.info(f" * account: {account_loaded} (ID: {task.account.id if account_loaded else 'N/A'})")
                logger.info(f" * site: {site_loaded} (ID: {task.site.id if site_loaded else 'N/A'}, Name: {task.site.name if site_loaded else 'N/A'})")
                logger.info(f" * sector: {sector_loaded} (ID: {task.sector.id if sector_loaded else 'N/A'}, Name: {task.sector.name if sector_loaded else 'N/A'})")
                logger.info(f" * cluster: {cluster_loaded} (ID: {task.cluster.id if cluster_loaded else 'None'}, Name: {task.cluster.name if cluster_loaded else 'N/A'})")
                logger.info(f" * idea: {idea_loaded} (ID: {task.idea.id if idea_loaded else 'None'}, Title: {task.idea.idea_title if idea_loaded else 'N/A'})")

                # Check for potential data issues
                if not account_loaded:
                    logger.error(f" - ⚠️ WARNING: Task {task.id} has no account loaded!")
                if not site_loaded:
                    logger.error(f" - ⚠️ WARNING: Task {task.id} has no site loaded!")
                if not sector_loaded:
                    logger.error(f" - ⚠️ WARNING: Task {task.id} has no sector loaded!")
            except Exception as rel_error:
                logger.error(f" - ⚠️ ERROR accessing relationships: {type(rel_error).__name__}: {str(rel_error)}")
                logger.error(" - This may indicate a database relationship issue", exc_info=True)
        logger.info("=" * 80)

        if not tasks:
            logger.warning(f"No tasks found: {task_ids}")
            return {'success': False, 'error': 'No tasks found'}

        total_tasks = len(tasks)

        # Update progress: Preparing tasks
        add_step('PREP', 'success', f'Preparing {total_tasks} tasks for content generation...', 'request')
        self.update_state(
            state='PROGRESS',
            meta={
                'current': 0,
                'total': total_tasks,
                'percentage': 10,
                'message': f'Preparing {total_tasks} tasks for content generation...',
                'phase': 'PREP',
                'request_steps': request_steps,
                'response_steps': response_steps
            }
        )

        tasks_updated = 0

        # Generate content for each task
        with transaction.atomic():
            for idx, task in enumerate(tasks):
                # ========================================================================
                # TASK VALIDATION PHASE - Detailed logging
                # ========================================================================
                logger.info("=" * 80)
                logger.info(f"PROCESSING TASK #{idx + 1}/{total_tasks} (ID: {task.id})")
                logger.info("=" * 80)

                # Validate task has required data before processing
                logger.info(" - Validating task data...")
                if not task.title:
                    logger.warning(f" - ⚠️ Task {task.id} has no title, skipping")
                    continue
                logger.info(f" - ✓ Title: {task.title}")

                # Get account - ensure it's loaded (already in select_related)
                logger.info(" - Checking account relationship...")
                try:
                    account = task.account
                    if not account:
                        logger.error(f" - ✗ Task {task.id} has no account object (account_id={task.account_id}), skipping")
                        continue
                    logger.info(f" - ✓ Account loaded: ID={account.id}, Name={account.name if hasattr(account, 'name') else 'N/A'}")
                except Exception as account_error:
                    logger.error(f" - ✗ ERROR accessing account: {type(account_error).__name__}: {str(account_error)}")
                    logger.error(f" - Task account_id: {task.account_id}", exc_info=True)
                    continue

                # Validate site relationship
                logger.info(" - Checking site relationship...")
                try:
                    site = task.site
                    if not site:
                        logger.error(f" - ✗ Task {task.id} has no site object (site_id={task.site_id}), skipping")
                        continue
                    logger.info(f" - ✓ Site loaded: ID={site.id}, Name={site.name if hasattr(site, 'name') else 'N/A'}")
                except Exception as site_error:
                    logger.error(f" - ✗ ERROR accessing site: {type(site_error).__name__}: {str(site_error)}")
                    logger.error(f" - Task site_id: {task.site_id}", exc_info=True)
                    continue

                # Validate sector relationship
                logger.info(" - Checking sector relationship...")
                try:
                    sector = task.sector
                    if not sector:
                        logger.error(f" - ✗ Task {task.id} has no sector object (sector_id={task.sector_id}), skipping")
                        continue
                    logger.info(f" - ✓ Sector loaded: ID={sector.id}, Name={sector.name if hasattr(sector, 'name') else 'N/A'}")
                except Exception as sector_error:
                    logger.error(f" - ✗ ERROR accessing sector: {type(sector_error).__name__}: {str(sector_error)}")
                    logger.error(f" - Task sector_id: {task.sector_id}", exc_info=True)
                    continue

                # Check cluster relationship (nullable)
                logger.info(" - Checking cluster relationship (nullable)...")
                try:
                    cluster = task.cluster
                    if cluster:
                        logger.info(f" - ✓ Cluster loaded: ID={cluster.id}, Name={cluster.name if hasattr(cluster, 'name') else 'N/A'}")
                    else:
                        logger.info(" - Cluster is None (nullable field)")
                except Exception as cluster_error:
                    logger.warning(f" - ⚠️ ERROR accessing cluster (nullable): {type(cluster_error).__name__}: {str(cluster_error)}")
                    logger.warning(f" - Task cluster_id: {task.cluster_id}")
                    # Don't skip - cluster is nullable

                # Check idea relationship (nullable)
                logger.info(" - Checking idea relationship (nullable)...")
                try:
                    idea = task.idea
                    if idea:
                        logger.info(f" - ✓ Idea loaded: ID={idea.id}, Title={idea.idea_title if hasattr(idea, 'idea_title') else 'N/A'}")
                    else:
                        logger.info(" - Idea is None (nullable field)")
                except Exception as idea_error:
                    logger.warning(f" - ⚠️ ERROR accessing idea (nullable): {type(idea_error).__name__}: {str(idea_error)}")
                    logger.warning(f" - Task idea_id: {task.idea_id}")
                    # Don't skip - idea is nullable

                # Update progress: Processing task
                # Calculate base percentage: 10% (PREP) + progress through tasks (10-50%)
                base_pct = 10
                task_progress_pct = base_pct + int((idx / total_tasks) * 40)  # 10-50% for task prep
                self.update_state(
                    state='PROGRESS',
                    meta={
                        'current': idx + 1,
                        'total': total_tasks,
                        'percentage': task_progress_pct,
                        'message': f"Preparing content generation for '{task.title}' ({idx + 1} of {total_tasks})...",
                        'phase': 'PREP',
                        'current_item': task.title,
                        'request_steps': request_steps,
                        'response_steps': response_steps
                    }
                )

                # ========================================================================
                # PROMPT LOADING PHASE - Detailed logging
                # ========================================================================
                logger.info(" - Loading prompt template...")
                try:
                    # Get prompt template from database or default (account-aware)
                    # Use utility function to ensure proper loading
                    logger.info(f" * Attempting to load prompt from database for account {account.id}...")
                    prompt_template = get_prompt_value(account, 'content_generation')
                    if not prompt_template:
                        # Fallback to default if not found
                        logger.warning(" * No custom prompt found in database, using default...")
                        prompt_template = get_default_prompt('content_generation')
                        logger.warning(f" * Using default prompt for account {account.id}")
                    else:
                        logger.info(f" * ✓ Custom prompt loaded from database (length: {len(prompt_template)} chars)")
                    logger.info(f" * Prompt template length: {len(prompt_template)} characters")
                    logger.info(f" * Prompt template preview (first 200 chars): {prompt_template[:200]}...")
                except Exception as prompt_error:
                    logger.error(f" * ✗ ERROR loading prompt: {type(prompt_error).__name__}: {str(prompt_error)}")
                    logger.error(f" * Account ID: {account.id}", exc_info=True)
                    # Fallback to default
                    try:
                        prompt_template = get_default_prompt('content_generation')
                        logger.warning(" * Using default prompt as fallback")
                    except Exception as default_error:
                        logger.error(f" * ✗ CRITICAL: Cannot load default prompt either: {str(default_error)}")
                        continue

                # Validate prompt template has required placeholders
                if '[IGNY8_IDEA]' not in prompt_template:
                    logger.warning(f"Prompt template missing [IGNY8_IDEA] placeholder for task {task.id}")
                if '[IGNY8_CLUSTER]' not in prompt_template:
                    logger.warning(f"Prompt template missing [IGNY8_CLUSTER] placeholder for task {task.id}")
                if '[IGNY8_KEYWORDS]' not in prompt_template:
                    logger.warning(f"Prompt template missing [IGNY8_KEYWORDS] placeholder for task {task.id}")

                # ========================================================================
                # DATA FORMATTING PHASE - Detailed logging
                # ========================================================================
                logger.info(" - Formatting data for AI prompt...")

                # Build idea data string (format similar to WordPress plugin)
                logger.info(" * Building idea data string...")
                idea_data = f"Title: {task.title or 'Untitled'}\n"
                logger.info(f" - Title: {task.title or 'Untitled'}")
                if task.description:
                    idea_data += f"Description: {task.description}\n"
                    logger.info(f" - Description: {task.description[:100]}...")
                else:
                    logger.info(" - Description: None")

                if task.idea and task.idea.description:
                    logger.info(f" * Processing idea description (ID: {task.idea.id})...")
                    # Handle structured description (JSON) vs plain text
                    description = task.idea.description
                    logger.info(f" - Idea description type: {type(description).__name__}, length: {len(str(description)) if description else 0}")
                    import json
                    try:
                        # Try to parse as JSON (structured outline)
                        logger.info(" - Attempting to parse as JSON...")
                        parsed_desc = json.loads(description)
                        if isinstance(parsed_desc, dict):
                            logger.info(" - ✓ Successfully parsed as JSON dict")
                            logger.info(f" - JSON keys: {list(parsed_desc.keys())}")
                            # Format structured description
                            formatted_desc = "Content Outline:\n\n"
                            if 'H2' in parsed_desc:
                                h2_count = len(parsed_desc['H2'])
                                logger.info(f" - Found {h2_count} H2 sections")
                                for h2_idx, h2_section in enumerate(parsed_desc['H2']):
                                    formatted_desc += f"## {h2_section.get('heading', '')}\n"
                                    if 'subsections' in h2_section:
                                        h3_count = len(h2_section['subsections'])
                                        logger.info(f" - H2 #{h2_idx + 1}: {h2_section.get('heading', '')} ({h3_count} subsections)")
                                        for h3_section in h2_section['subsections']:
                                            formatted_desc += f"### {h3_section.get('subheading', '')}\n"
                                            formatted_desc += f"Content Type: {h3_section.get('content_type', '')}\n"
                                            formatted_desc += f"Details: {h3_section.get('details', '')}\n\n"
                            # Only replace the raw description once formatting
                            # finished without raising.
                            description = formatted_desc
                            logger.info(f" - ✓ Formatted structured description (length: {len(description)} chars)")
                    except json.JSONDecodeError as json_error:
                        # Not JSON, use as plain text
                        logger.info(f" - Not JSON format (JSONDecodeError: {str(json_error)}), using as plain text")
                    except TypeError as type_error:
                        logger.warning(f" - Type error parsing description: {str(type_error)}, using as plain text")
                    except Exception as parse_error:
                        # Continue with plain text
                        logger.error(f" - ✗ Unexpected error parsing description: {type(parse_error).__name__}: {str(parse_error)}")
                        logger.error(f" - Description value: {str(description)[:200]}...", exc_info=True)
                    idea_data += f"Outline: {description}\n"
                    logger.info(" - ✓ Added outline to idea_data")

                if task.idea:
                    idea_data += f"Structure: {task.idea.content_structure or task.content_structure or 'blog_post'}\n"
                    idea_data += f"Type: {task.idea.content_type or task.content_type or 'blog_post'}\n"
                    if task.idea.estimated_word_count:
                        idea_data += f"Estimated Word Count: {task.idea.estimated_word_count}\n"

                # Build cluster data string (format similar to WordPress plugin)
                logger.info(" * Building cluster data string...")
                cluster_data = ''
                if task.cluster:
                    try:
                        cluster_data = f"Cluster Name: {task.cluster.name or ''}\n"
                        logger.info(f" - Cluster name: {task.cluster.name or 'N/A'}")
                        if task.cluster.description:
                            cluster_data += f"Description: {task.cluster.description}\n"
                            logger.info(f" - Cluster description: {task.cluster.description[:100]}...")
                        cluster_data += f"Status: {task.cluster.status or 'active'}\n"
                        logger.info(f" - Cluster status: {task.cluster.status or 'active'}")
                        # Get keyword count from cluster (if available)
                        if hasattr(task.cluster, 'keywords_count'):
                            keywords_count = task.cluster.keywords_count or 0
                            cluster_data += f"Keyword Count: {keywords_count}\n"
                            logger.info(f" - Cluster keywords_count: {keywords_count}")
                        if hasattr(task.cluster, 'volume'):
                            volume = task.cluster.volume or 0
                            cluster_data += f"Total Volume: {volume}\n"
                            logger.info(f" - Cluster volume: {volume}")
                        logger.info(f" - ✓ Cluster data formatted (length: {len(cluster_data)} chars)")
                    except Exception as cluster_data_error:
                        logger.error(f" - ✗ ERROR building cluster data: {type(cluster_data_error).__name__}: {str(cluster_data_error)}")
                        logger.error(f" - Cluster object: {task.cluster}", exc_info=True)
                        cluster_data = f"Cluster Name: {task.cluster.name or 'Unknown'}\n"
                else:
                    logger.info(" - No cluster associated with task")

                # Build keywords string
                logger.info(" * Building keywords data string...")
                # Prefer task.keywords, fallback to idea.target_keywords, then cluster keywords
                keywords_data = task.keywords or ''
                logger.info(f" - Task keywords: {keywords_data or 'None'}")
                if not keywords_data and task.idea:
                    try:
                        if task.idea.target_keywords:
                            keywords_data = task.idea.target_keywords
                            logger.info(f" - Using idea.target_keywords: {keywords_data[:100]}...")
                    except Exception as idea_keywords_error:
                        logger.warning(f" - ⚠️ ERROR accessing idea.target_keywords: {type(idea_keywords_error).__name__}: {str(idea_keywords_error)}")
                if not keywords_data and task.cluster:
                    # Cluster keywords are intentionally not fetched here, so
                    # keywords_data simply stays empty.
                    logger.info(" - No keywords from task or idea, cluster available but not fetching keywords")
                logger.info(f" - ✓ Final keywords_data: {keywords_data[:100]}...")

                # Replace placeholders in prompt template
                logger.info(" * Replacing placeholders in prompt template...")
                try:
                    prompt = prompt_template.replace('[IGNY8_IDEA]', idea_data)
                    logger.info(f" - ✓ Replaced [IGNY8_IDEA] (idea_data length: {len(idea_data)} chars)")
                    prompt = prompt.replace('[IGNY8_CLUSTER]', cluster_data)
                    logger.info(f" - ✓ Replaced [IGNY8_CLUSTER] (cluster_data length: {len(cluster_data)} chars)")
                    prompt = prompt.replace('[IGNY8_KEYWORDS]', keywords_data)
                    logger.info(f" - ✓ Replaced [IGNY8_KEYWORDS] (keywords_data length: {len(keywords_data)} chars)")
                    logger.info(f" - ✓ Final prompt length: {len(prompt)} characters")
                    logger.info(f" - Final prompt preview (first 500 chars): {prompt[:500]}...")
                except Exception as prompt_replace_error:
                    logger.error(f" - ✗ ERROR replacing placeholders: {type(prompt_replace_error).__name__}: {str(prompt_replace_error)}")
                    logger.error(f" - Prompt template length: {len(prompt_template)}")
                    logger.error(f" - Idea data length: {len(idea_data)}")
                    logger.error(f" - Cluster data length: {len(cluster_data)}")
                    logger.error(f" - Keywords data length: {len(keywords_data)}", exc_info=True)
                    continue

                # Log prompt preparation summary
                logger.info("=" * 80)
                logger.info(f"PROMPT PREPARATION SUMMARY for Task {task.id}:")
                logger.info(f" - Prompt length: {len(prompt)} characters")
                logger.info(f" - Has idea: {bool(task.idea)}")
                logger.info(f" - Has cluster: {bool(task.cluster)}")
                logger.info(f" - Idea data length: {len(idea_data)} chars")
                logger.info(f" - Cluster data length: {len(cluster_data)} chars")
                logger.info(f" - Keywords data length: {len(keywords_data)} chars")
                logger.info("=" * 80)

                # Update progress: Generating with AI
                add_step('AI_CALL', 'success', f"Generating article content for '{task.title}'...", 'request')
                ai_call_pct = 50 + int((idx / total_tasks) * 20)  # 50-70% for AI call
                self.update_state(
                    state='PROGRESS',
                    meta={
                        'current': idx + 1,
                        'total': total_tasks,
                        'percentage': ai_call_pct,
                        'message': f"Generating article content for '{task.title}'...",
                        'phase': 'AI_CALL',
                        'current_item': task.title,
                        'request_steps': request_steps,
                        'response_steps': response_steps
                    }
                )

                # ========================================================================
                # AI PROCESSOR INITIALIZATION PHASE - Detailed logging
                # ========================================================================
                logger.info(" - Initializing AIProcessor...")
                try:
                    # Create AIProcessor instance with account to load API keys from IntegrationSettings
                    # This ensures API keys and model are loaded from IntegrationSettings
                    from igny8_core.utils.ai_processor import AIProcessor
                    logger.info(f" * Creating AIProcessor instance for account {account.id}...")
                    processor = AIProcessor(account=account)
                    logger.info(" * ✓ AIProcessor created successfully")

                    # Validate processor has API key
                    logger.info(" * Checking OpenAI API key...")
                    if not processor.openai_api_key:
                        logger.error(f" * ✗ OpenAI API key not configured for account {account.id}")
                        logger.error(" * This will cause the AI request to fail")
                        continue
                    # Deliberately log no part of the key: even a short prefix
                    # is secret material that would end up in log storage.
                    logger.info(" * ✓ OpenAI API key configured")

                    # Log model information
                    logger.info(f" * Default model: {processor.default_model}")
                    logger.info(f" * Model rates available: {list(processor.model_rates.keys())}")
                except Exception as processor_error:
                    logger.error(f" * ✗ ERROR initializing AIProcessor: {type(processor_error).__name__}: {str(processor_error)}")
                    logger.error(f" * Account ID: {account.id}", exc_info=True)
                    continue

                # Log AI request details (without exposing sensitive data)
                logger.info("=" * 80)
                logger.info(f"AI REQUEST PREPARATION for Task {task.id}:")
                logger.info(f" - Model: {processor.default_model}")
                logger.info(f" - Prompt length: {len(prompt)} characters")
                logger.info(" - Max tokens: 4000")
                logger.info("=" * 80)

                # ========================================================================
                # AI API CALL PHASE - Detailed logging
                # ========================================================================
                logger.info(" - Calling AI API...")
                try:
                    # Call AI processor
                    result = processor.generate_content(prompt, max_tokens=4000)
                    logger.info(" * ✓ AI API call completed")

                    # Log response details
                    if result.get('error'):
                        logger.error(f" * ✗ AI returned error: {result['error']}")
                        logger.error(f" * Error details: {result}")
                        continue

                    content = result.get('content', '')
                    if not content:
                        logger.warning(" * ⚠️ No content in AI response")
                        logger.warning(f" * Response keys: {list(result.keys())}")
                        logger.warning(f" * Full response: {result}")
                        continue

                    # Log raw response
                    logger.info(f" * ✓ Raw content received: {len(content)} characters")
                    logger.info(f" * Response keys: {list(result.keys())}")
                    logger.info(f" * Input tokens: {result.get('input_tokens', 'N/A')}")
                    logger.info(f" * Output tokens: {result.get('output_tokens', 'N/A')}")
                    logger.info(f" * Total tokens: {result.get('tokens_used', result.get('total_tokens', 'N/A'))}")
                    logger.info(f" * Cost: ${result.get('cost', 'N/A')}")
                    logger.info(f" * Raw content preview (first 200 chars): {content[:200]}...")

                    # Update progress: Parsing content
                    add_step('PARSE', 'success', f"Processing content for '{task.title}'...", 'response')
                    parse_pct = 70 + int((idx / total_tasks) * 10)  # 70-80% for parsing
                    self.update_state(
                        state='PROGRESS',
                        meta={
                            'current': idx + 1,
                            'total': total_tasks,
                            'percentage': parse_pct,
                            'message': f"Processing content for '{task.title}'...",
                            'phase': 'PARSE',
                            'current_item': task.title,
                            'request_steps': request_steps,
                            'response_steps': response_steps
                        }
                    )

                    # Normalize content from different AI response formats
                    logger.info(f" * Normalizing content (length: {len(content)} chars)...")
                    try:
                        from igny8_core.utils.content_normalizer import normalize_content
                        normalized = normalize_content(content)
                        normalized_content = normalized['normalized_content']
                        content_type = normalized['content_type']
                        has_structure = normalized['has_structure']
                        original_format = normalized['original_format']
                        logger.info(" * ✓ Content normalized:")
                        logger.info(f" - Original format: {original_format}")
                        logger.info(f" - Content type: {content_type}")
                        logger.info(f" - Has structure: {has_structure}")
                        logger.info(f" - Normalized length: {len(normalized_content)} chars")
                        logger.info(f" - Normalized preview (first 200 chars): {normalized_content[:200]}...")
                        # Use normalized content
                        content = normalized_content
                    except Exception as norm_error:
                        logger.warning(f" * ⚠️ Content normalization failed: {type(norm_error).__name__}: {str(norm_error)}")
                        logger.warning(" * Using original content as-is")
                        # Continue with original content
                except Exception as ai_error:
                    logger.error(f" * ✗ EXCEPTION during AI API call: {type(ai_error).__name__}: {str(ai_error)}")
                    logger.error(f" * Task ID: {task.id}", exc_info=True)
                    continue

                # Calculate word count from normalized content
                # Remove HTML tags for word count
                text_for_counting = re.sub(r'<[^>]+>', '', content)
                word_count = len(text_for_counting.split())
                logger.info(f" * ✓ Word count calculated: {word_count} words (from normalized HTML)")

                # Update progress: Saving content
                add_step('SAVE', 'success', f"Saving content for '{task.title}' ({word_count} words)...", 'request')
                save_pct = 85 + int((idx / total_tasks) * 10)  # 85-95% for saving
                self.update_state(
                    state='PROGRESS',
                    meta={
                        'current': idx + 1,
                        'total': total_tasks,
                        'percentage': save_pct,
                        'message': f"Saving content for '{task.title}' ({word_count} words)...",
                        'phase': 'SAVE',
                        'current_item': task.title,
                        'request_steps': request_steps,
                        'response_steps': response_steps
                    }
                )

                # ========================================================================
                # DATABASE SAVE PHASE - Detailed logging
                # ========================================================================
                logger.info(" - Saving content to database...")
                try:
                    # Update task
                    logger.info(f" * Updating task {task.id} fields...")
                    task.content = content
                    logger.info(f" - content: {len(content)} chars")
                    task.word_count = word_count
                    logger.info(f" - word_count: {word_count}")
                    task.meta_title = task.title  # Use title as meta title for now
                    logger.info(f" - meta_title: {task.title}")
                    task.meta_description = (task.description or '')[:160]  # Truncate to 160 chars
                    logger.info(f" - meta_description: {len(task.meta_description)} chars")
                    old_status = task.status
                    task.status = 'draft'  # Update status from queued to draft
                    logger.info(f" - status: {old_status} -> {task.status}")

                    # Log all fields being saved
                    logger.info(" * Task fields to save:")
                    logger.info(f" - id: {task.id}")
                    logger.info(f" - title: {task.title}")
                    logger.info(f" - account_id: {task.account_id}")
                    logger.info(f" - site_id: {task.site_id}")
                    logger.info(f" - sector_id: {task.sector_id}")
                    logger.info(f" - cluster_id: {task.cluster_id}")
                    logger.info(f" - idea_id: {task.idea_id}")
                    logger.info(f" - content length: {len(task.content)}")
                    logger.info(f" - word_count: {task.word_count}")

                    # Save to database
                    logger.info(" * Executing task.save()...")
                    # Nested atomic() creates a savepoint. The exception is
                    # caught below while the outer transaction is still open;
                    # without a savepoint a database error here would leave
                    # that transaction unusable for every remaining task.
                    with transaction.atomic():
                        task.save()
                    logger.info(" * ✓ Task saved successfully to database")

                    # Mark save step as complete
                    add_step('SAVE', 'success', f"Content saved for '{task.title}'", 'response')
                    tasks_updated += 1
                    logger.info(f" * ✓ Task {task.id} content generation completed successfully")
                except Exception as save_error:
                    logger.error("=" * 80)
                    logger.error(f"DATABASE SAVE ERROR for Task {task.id}")
                    logger.error(f" - Error type: {type(save_error).__name__}")
                    logger.error(f" - Error message: {str(save_error)}")
                    logger.error(f" - Task ID: {task.id}")
                    logger.error(f" - Task title: {task.title}")
                    logger.error(f" - Content length: {len(content) if content else 0}")
                    logger.error(f" - Word count: {word_count}")
                    logger.error("=" * 80, exc_info=True)
                    continue

                logger.info("=" * 80)
                logger.info(f"✓ TASK {task.id} PROCESSING COMPLETE")
                logger.info("=" * 80)

        # Final progress update - mark as DONE
        final_message = f"Content generation complete: {tasks_updated} articles generated"
        add_step('DONE', 'success', final_message, 'response')
        logger.info("=" * 80)
        logger.info("TASK COMPLETION SUMMARY")
        logger.info(f" - Total tasks processed: {total_tasks}")
        logger.info(f" - Tasks successfully updated: {tasks_updated}")
        logger.info(f" - Tasks failed/skipped: {total_tasks - tasks_updated}")
        logger.info("=" * 80)

        # Update final state before returning
        self.update_state(
            state='SUCCESS',
            meta={
                'current': total_tasks,
                'total': total_tasks,
                'percentage': 100,
                'message': final_message,
                'phase': 'DONE',
                'request_steps': request_steps,
                'response_steps': response_steps,
                'tasks_updated': tasks_updated
            }
        )
        return {
            'success': True,
            'tasks_updated': tasks_updated,
            'message': final_message,
        }
    except Exception as e:
        error_type = type(e).__name__
        error_message = str(e)
        logger.error("=" * 80)
        logger.error("CRITICAL ERROR in auto_generate_content_task")
        logger.error(f" - Error type: {error_type}")
        logger.error(f" - Error message: {error_message}")
        logger.error(f" - Task IDs: {task_ids}")
        logger.error(f" - Account ID: {account_id}")
        logger.error("=" * 80, exc_info=True)

        # Update Celery task state with detailed error information
        # NOTE(review): the function returns normally right after this, so the
        # worker presumably records the returned dict as the final result and
        # this FAILURE state is only visible briefly - confirm this is intended.
        self.update_state(
            state='FAILURE',
            meta={
                'error': error_message,
                'error_type': error_type,
                'message': f'Error: {error_message}',
                'task_ids': task_ids,
                'account_id': account_id
            }
        )
        # Return error result instead of raising (for synchronous execution)
        return {
            'success': False,
            'error': error_message,
            'error_type': error_type,
            'tasks_updated': 0
        }
def _coerce_image_count(raw_value, default=2):
    """Return ``raw_value`` as a non-negative int, or ``default`` if it is missing or not numeric."""
    try:
        return max(0, int(raw_value))
    except (TypeError, ValueError):
        return default


def _batch_progress_pct(task_idx, total_tasks, done_in_task, planned_in_task, ceiling=98):
    """Map a position inside the batch to a percentage that never moves backwards.

    Each task owns an equal slice of ``0..ceiling``; within a slice the value
    advances with the share of that task's images already finished.
    """
    if total_tasks <= 0:
        return 0
    share = done_in_task / planned_in_task if planned_in_task > 0 else 0.0
    share = min(max(share, 0.0), 1.0)
    return min(ceiling, int(((task_idx + share) / total_tasks) * ceiling))


def _generate_and_store_image(processor, task, *, slot, prompt, size, provider,
                              model, negative_prompt, position=None):
    """Generate one image and persist it as an ``Images`` row.

    Args:
        processor: Object exposing ``generate_image(...)``.
        task: Task the image belongs to; its account/site/sector are copied to the row.
        slot: Value stored in ``Images.image_type`` ('featured', 'desktop' or 'mobile').
        prompt: Fully formatted prompt sent to the provider.
        size: Size string passed to the provider, e.g. '1024x1024'.
        provider: Image provider name.
        model: Model identifier for the provider.
        negative_prompt: Negative prompt; only forwarded when the provider is 'runware'.
        position: Optional position stored on the row (omitted when None).

    Returns:
        True when a row was created, False when the provider returned an error or no URL.
    """
    # `model` must be passed exactly once: supplying it both as an explicit
    # keyword and inside the unpacked dict raises TypeError at the call site.
    extra_kwargs = {}
    if provider == 'runware':
        extra_kwargs['negative_prompt'] = negative_prompt
    result = processor.generate_image(
        prompt=prompt,
        provider=provider,
        model=model,
        size=size,
        **extra_kwargs
    )
    if not result.get('url') or result.get('error'):
        logger.warning(
            f"Image generation failed for task {task.id} ({slot}): "
            f"{result.get('error') or 'no image URL returned'}"
        )
        return False

    fields = {
        'task': task,
        'image_type': slot,
        'image_url': result['url'],
        # Prefer the provider's rewritten prompt when it reports one.
        'prompt': result.get('revised_prompt') or prompt,
        'status': 'generated',
        'account': task.account,
        'site': task.site,
        'sector': task.sector,
    }
    if position is not None:
        fields['position'] = position
    Images.objects.create(**fields)
    return True


@shared_task(bind=True, max_retries=3)
def auto_generate_images_task(self, task_ids: List[int], account_id: int = None):
    """
    Celery task to generate images for tasks using AI.

    Sequential processing: Featured → Desktop → Mobile images

    Args:
        task_ids: List of task IDs
        account_id: Account ID for account isolation

    Returns:
        On success: ``{'success': True, 'images_created': int, 'message': str}``.
        When the account or the tasks cannot be found:
        ``{'success': False, 'error': str}``.

    Raises:
        Exception: Any unexpected error is logged, reported through
        ``update_state(state='FAILURE')`` and re-raised.
    """
    def report(current, total, percentage, message, phase, **extra):
        # Single place that publishes PROGRESS so every update has the same meta shape.
        meta = {
            'current': current,
            'total': total,
            'percentage': percentage,
            'message': message,
            'phase': phase,
        }
        meta.update(extra)
        self.update_state(state='PROGRESS', meta=meta)

    try:
        from igny8_core.auth.models import Account

        # Initialize progress
        report(0, len(task_ids), 0, 'Initializing image generation...', 'initializing')

        # Get account (backward compatibility: account_id parameter)
        account = None
        if account_id:
            try:
                account = Account.objects.get(id=account_id)
            except Account.DoesNotExist:
                # Falling through with account=None would drop the account
                # filter below and process tasks of *any* account.
                logger.warning(f"Account {account_id} not found, aborting image generation")
                return {'success': False, 'error': 'Account not found'}

        # Get tasks
        tasks_queryset = Tasks.objects.filter(id__in=task_ids)
        if account:
            tasks_queryset = tasks_queryset.filter(account=account)
        tasks = list(tasks_queryset.select_related('account', 'sector', 'site'))
        if not tasks:
            logger.warning(f"No tasks found: {task_ids}")
            return {'success': False, 'error': 'No tasks found'}

        # Get image generation settings from IntegrationSettings
        image_settings = {}
        if account:
            try:
                from igny8_core.modules.system.models import IntegrationSettings
                integration = IntegrationSettings.objects.get(
                    account=account,
                    integration_type='image_generation',
                    is_active=True
                )
                image_settings = integration.config or {}
            except IntegrationSettings.DoesNotExist:
                logger.warning("Image generation settings not found, using defaults")

        # Extract settings (null-safe: a stored None falls back to the default)
        provider = image_settings.get('provider') or image_settings.get('service') or 'openai'
        # Get model based on provider
        if provider == 'runware':
            model = image_settings.get('model') or image_settings.get('runwareModel') or 'runware:97@1'
        else:
            model = image_settings.get('model') or 'dall-e-3'
        image_type = image_settings.get('image_type', 'realistic')
        max_in_article_images = _coerce_image_count(image_settings.get('max_in_article_images', 2))
        desktop_enabled = image_settings.get('desktop_enabled', True)
        mobile_enabled = image_settings.get('mobile_enabled', True)

        # In-article variants rendered per prompt, in generation order:
        # (Images.image_type, label used in messages, progress phase, fixed size)
        variants = []
        if desktop_enabled:
            variants.append(('desktop', 'Desktop', 'generating_desktop', '1024x1024'))
        if mobile_enabled:
            variants.append(('mobile', 'Mobile', 'generating_mobile', '960x1280'))

        total_tasks = len(tasks)
        # Upper bound of images: 1 featured + max_in_article_images per enabled variant, per task
        images_per_task = 1 + max_in_article_images * len(variants)
        total_images = total_tasks * images_per_task
        current_image = 0
        images_created = 0

        # Prompt templates depend only on the account, so resolve them once.
        image_prompt_template = get_prompt_value(account, 'image_prompt_template') if account else None
        if not image_prompt_template:
            image_prompt_template = get_default_prompt('image_prompt_template')
        negative_prompt = get_prompt_value(account, 'negative_prompt') if account else None
        if not negative_prompt:
            negative_prompt = get_default_prompt('negative_prompt')

        # Create AIProcessor instance
        from igny8_core.utils.ai_processor import AIProcessor
        processor = AIProcessor(account=account)

        # Process each task sequentially
        for task_idx, task in enumerate(tasks):
            # Check if task has content
            if not task.content:
                logger.warning(f"Task {task.id} has no content, skipping image generation")
                continue

            report(
                task_idx + 1,
                total_tasks,
                _batch_progress_pct(task_idx, total_tasks, 0, 1),
                f"Extracting image prompts from '{task.title}' ({task_idx + 1} of {total_tasks})...",
                'extracting_prompts',
                current_item=task.title,
            )

            # Extract image prompts from content
            prompts_result = processor.extract_image_prompts(
                content=task.content,
                title=task.title,
                max_images=max_in_article_images,
                account=account
            )
            if prompts_result.get('error'):
                logger.error(f"Error extracting prompts for task {task.id}: {prompts_result['error']}")
                continue

            featured_prompt = prompts_result.get('featured_prompt', '')
            # Enforce the configured maximum so the image counter can never
            # run past total_images, and tolerate a missing/None prompt list.
            in_article_prompts = (prompts_result.get('in_article_prompts') or [])[:max_in_article_images]

            planned_in_task = 1 + len(variants) * len(in_article_prompts)
            done_in_task = 0

            # Generate Featured Image (always)
            current_image += 1
            report(
                current_image,
                total_images,
                _batch_progress_pct(task_idx, total_tasks, done_in_task, planned_in_task),
                f"Generating Featured Image for '{task.title}' ({task_idx + 1} of {total_tasks})...",
                'generating_featured',
                current_item=task.title,
            )
            formatted_featured_prompt = image_prompt_template.format(
                image_type=image_type,
                post_title=task.title,
                image_prompt=featured_prompt
            )
            if _generate_and_store_image(
                processor,
                task,
                slot='featured',
                prompt=formatted_featured_prompt,
                size='1280x832',  # Featured image size (fixed)
                provider=provider,
                model=model,
                negative_prompt=negative_prompt,
            ):
                images_created += 1
            done_in_task += 1

            # Generate Desktop then Mobile images (whichever are enabled)
            for slot, label, phase, size in variants:
                for img_idx, img_prompt in enumerate(in_article_prompts):
                    current_image += 1
                    report(
                        current_image,
                        total_images,
                        _batch_progress_pct(task_idx, total_tasks, done_in_task, planned_in_task),
                        f"Generating {label} Image {img_idx + 1} of {len(in_article_prompts)} for '{task.title}'...",
                        phase,
                        current_item=task.title,
                    )
                    formatted_img_prompt = image_prompt_template.format(
                        image_type=image_type,
                        post_title=task.title,
                        image_prompt=img_prompt
                    )
                    if _generate_and_store_image(
                        processor,
                        task,
                        slot=slot,
                        prompt=formatted_img_prompt,
                        size=size,
                        provider=provider,
                        model=model,
                        negative_prompt=negative_prompt,
                        position=img_idx + 1,
                    ):
                        images_created += 1
                    done_in_task += 1

            # Update progress: this task's slice of the bar is now complete
            report(
                task_idx + 1,
                total_tasks,
                _batch_progress_pct(task_idx, total_tasks, planned_in_task, planned_in_task),
                f"Saving images for '{task.title}'...",
                'saving',
                current_item=task.title,
            )

        # Final progress update
        final_message = f"Image generation complete: {images_created} images generated for {total_tasks} tasks"
        logger.info(final_message)
        return {
            'success': True,
            'images_created': images_created,
            'message': final_message,
        }
    except Exception as e:
        logger.error(f"Error in auto_generate_images_task: {str(e)}", exc_info=True)
        self.update_state(
            state='FAILURE',
            meta={
                'error': str(e),
                'message': f'Error: {str(e)}'
            }
        )
        raise