# Source listing header (non-code): igny8/backend/igny8_core/modules/planner/tasks.py
# Exported 2025-11-10 00:18:50 +05:00 — 736 lines, 32 KiB, Python
"""
Celery tasks for Planner module - AI clustering and idea generation
"""
import logging
import time
from typing import List
from django.db import transaction
from igny8_core.modules.planner.models import Keywords, Clusters, ContentIdeas
from igny8_core.utils.ai_processor import ai_processor
from igny8_core.ai.tracker import ConsoleStepTracker
logger = logging.getLogger(__name__)
# Try to import Celery, fall back to synchronous execution if not available
try:
    from celery import shared_task
    CELERY_AVAILABLE = True
except ImportError:
    CELERY_AVAILABLE = False

    # Create a mock decorator for synchronous execution when Celery is not
    # installed. Unlike the previous version, this handles BOTH decorator
    # shapes: bare `@shared_task` (the function itself is the single
    # positional argument) and parameterized `@shared_task(bind=True, ...)`
    # (arguments are options and a decorator must be returned).
    def shared_task(*args, **kwargs):
        """No-op stand-in for celery.shared_task; returns the function unchanged."""
        if len(args) == 1 and callable(args[0]) and not kwargs:
            # Bare usage: @shared_task
            return args[0]

        # Parameterized usage: @shared_task(bind=True, max_retries=3)
        def decorator(func):
            return func
        return decorator
# ============================================================================
# DEPRECATED: This function is deprecated. Use the new AI framework instead.
# New path: views.py -> run_ai_task -> AIEngine -> AutoClusterFunction
# This function is kept for backward compatibility but should not be used.
# ============================================================================
def _auto_cluster_keywords_core(keyword_ids: List[int], sector_id: int = None, account_id: int = None, progress_callback=None):
    """
    [DEPRECATED] Core logic for clustering keywords. Can be called with or without Celery.

    ⚠️ WARNING: This function is deprecated. Use the new AI framework instead:
    - New path: views.py -> run_ai_task -> AIEngine -> AutoClusterFunction
    - This function uses the old AIProcessor and does not use PromptRegistry
    - Console logging may not work correctly in this path

    Args:
        keyword_ids: List of keyword IDs to cluster
        sector_id: Sector ID for the keywords
        account_id: Account ID for account isolation
        progress_callback: Optional function to call for progress updates (for Celery tasks).
            Called as progress_callback(state=..., meta=...) with Celery-style state names
            ('PROGRESS', 'SUCCESS', 'FAILURE') and a metadata dict.

    Returns:
        dict: Always a dict — never raises to the caller. On success:
        {'success': True, 'clusters_created', 'keywords_updated', 'message',
        'request_steps', 'response_steps'}. On failure: {'success': False,
        'error', 'request_steps', 'response_steps'}. Any exception is caught
        by the outermost handler and reported through the failure dict.
    """
    # Initialize console step tracker for logging
    tracker = ConsoleStepTracker('auto_cluster')
    tracker.init(f"Starting keyword clustering for {len(keyword_ids)} keywords")
    # Track request and response steps (for Celery progress callbacks).
    # These lists are shared by reference with every progress_callback meta dict.
    request_steps = []
    response_steps = []
    try:
        # NOTE(review): Sector appears unused in this function body — likely a
        # leftover import; verify before removing.
        from igny8_core.auth.models import Sector
        # Initialize progress if callback provided
        if progress_callback:
            progress_callback(
                state='PROGRESS',
                meta={
                    'current': 0,
                    'total': len(keyword_ids),
                    'percentage': 0,
                    'message': 'Initializing keyword clustering...',
                    'phase': 'initializing',
                    'request_steps': request_steps,
                    'response_steps': response_steps
                }
            )
        # Step 4: Keyword Loading & Validation
        tracker.prep(f"Loading {len(keyword_ids)} keywords from database")
        step_start = time.time()
        keywords_queryset = Keywords.objects.filter(id__in=keyword_ids)
        if account_id:
            keywords_queryset = keywords_queryset.filter(account_id=account_id)
        if sector_id:
            keywords_queryset = keywords_queryset.filter(sector_id=sector_id)
        # select_related pre-fetches the relationships inspected in Step 5 so the
        # fallback lookups below don't issue extra queries.
        keywords = list(keywords_queryset.select_related('account', 'site', 'site__account', 'sector', 'sector__site'))
        if not keywords:
            error_msg = f"No keywords found for clustering: {keyword_ids}"
            logger.warning(error_msg)
            tracker.error('Validation', error_msg)
            request_steps.append({
                'stepNumber': 4,
                'stepName': 'Keyword Loading & Validation',
                'functionName': '_auto_cluster_keywords_core',
                'status': 'error',
                'message': 'No keywords found',
                'error': 'No keywords found',
                'duration': int((time.time() - step_start) * 1000)
            })
            if progress_callback:
                progress_callback(
                    state='PROGRESS',
                    meta={'request_steps': request_steps, 'response_steps': response_steps}
                )
            return {'success': False, 'error': 'No keywords found', 'request_steps': request_steps, 'response_steps': response_steps}
        tracker.prep(f"Loaded {len(keywords)} keywords successfully")
        request_steps.append({
            'stepNumber': 4,
            'stepName': 'Keyword Loading & Validation',
            'functionName': '_auto_cluster_keywords_core',
            'status': 'success',
            'message': f'Loaded {len(keywords)} keywords',
            'duration': int((time.time() - step_start) * 1000)
        })
        total_keywords = len(keywords)
        # Step 5: Relationship Validation — resolve account/site/sector from the
        # FIRST keyword only; all keywords in the batch are assumed to share them.
        step_start = time.time()
        try:
            first_keyword = keywords[0]
            account = getattr(first_keyword, 'account', None)
            site = getattr(first_keyword, 'site', None)
            # If account is None, try to get it from site
            if not account and site:
                try:
                    account = getattr(site, 'account', None)
                except Exception:
                    pass
            sector = getattr(first_keyword, 'sector', None)
            # If site is None, try to get it from sector
            if not site and sector:
                try:
                    site = getattr(sector, 'site', None)
                except Exception:
                    pass
        except Exception as e:
            logger.error(f"Error accessing keyword relationships: {str(e)}")
            request_steps.append({
                'stepNumber': 5,
                'stepName': 'Relationship Validation',
                'functionName': '_auto_cluster_keywords_core',
                'status': 'error',
                'message': f'Error accessing relationships: {str(e)}',
                'error': str(e),
                'duration': int((time.time() - step_start) * 1000)
            })
            if progress_callback:
                progress_callback(
                    state='PROGRESS',
                    meta={'request_steps': request_steps, 'response_steps': response_steps}
                )
            return {'success': False, 'error': f'Invalid keyword data: {str(e)}', 'request_steps': request_steps, 'response_steps': response_steps}
        if not account:
            logger.error(f"No account found for keywords: {keyword_ids}. Keyword site: {getattr(first_keyword, 'site', None)}, Keyword account: {getattr(first_keyword, 'account', None)}")
            request_steps.append({
                'stepNumber': 5,
                'stepName': 'Relationship Validation',
                'functionName': '_auto_cluster_keywords_core',
                'status': 'error',
                'message': 'No account found',
                'error': 'No account found for keywords',
                'duration': int((time.time() - step_start) * 1000)
            })
            if progress_callback:
                progress_callback(
                    state='PROGRESS',
                    meta={'request_steps': request_steps, 'response_steps': response_steps}
                )
            return {'success': False, 'error': 'No account found for keywords. Please ensure keywords are properly associated with a site and account.', 'request_steps': request_steps, 'response_steps': response_steps}
        if not site:
            logger.error(f"No site found for keywords: {keyword_ids}. Keyword site: {getattr(first_keyword, 'site', None)}, Sector site: {getattr(sector, 'site', None) if sector else None}")
            request_steps.append({
                'stepNumber': 5,
                'stepName': 'Relationship Validation',
                'functionName': '_auto_cluster_keywords_core',
                'status': 'error',
                'message': 'No site found',
                'error': 'No site found for keywords',
                'duration': int((time.time() - step_start) * 1000)
            })
            if progress_callback:
                progress_callback(
                    state='PROGRESS',
                    meta={'request_steps': request_steps, 'response_steps': response_steps}
                )
            return {'success': False, 'error': 'No site found for keywords. Please ensure keywords are properly associated with a site.', 'request_steps': request_steps, 'response_steps': response_steps}
        request_steps.append({
            'stepNumber': 5,
            'stepName': 'Relationship Validation',
            'functionName': '_auto_cluster_keywords_core',
            'status': 'success',
            'message': f'Account: {account.id if account else None}, Site: {site.id if site else None}, Sector: {sector.id if sector else None}',
            'duration': int((time.time() - step_start) * 1000)
        })
        # Update progress: Analyzing keywords (0-40%)
        if progress_callback:
            progress_callback(
                state='PROGRESS',
                meta={
                    'current': 0,
                    'total': total_keywords,
                    'percentage': 5,
                    'message': f'Preparing to analyze {total_keywords} keywords...',
                    'phase': 'preparing',
                    'request_steps': request_steps,
                    'response_steps': response_steps
                }
            )
        # Get sector name if available
        sector_name = sector.name if sector else None
        # Format keywords for AI
        keyword_data = [
            {
                'keyword': kw.keyword,
                'volume': kw.volume,
                'difficulty': kw.difficulty,
                'intent': kw.intent,
            }
            for kw in keywords
        ]
        # Update progress: Sending to AI (10-40%)
        if progress_callback:
            progress_callback(
                state='PROGRESS',
                meta={
                    'current': 0,
                    'total': total_keywords,
                    'percentage': 10,
                    'message': 'Analyzing keyword relationships with AI...',
                    'phase': 'analyzing',
                    'request_steps': request_steps,
                    'response_steps': response_steps
                }
            )
        # Step 6: AIProcessor Creation
        step_start = time.time()
        from igny8_core.utils.ai_processor import AIProcessor
        try:
            # Log account info for debugging
            # NOTE(review): this rebinds (shadows) the `account_id` parameter;
            # harmless here because the parameter is only used above, but
            # confusing — consider a different local name.
            account_id = account.id if account else None
            account_name = account.name if account else None
            logger.info(f"Creating AIProcessor with account: id={account_id}, name={account_name}")
            processor = AIProcessor(account=account)
            # Log API key status (only a short prefix is ever logged)
            has_api_key = bool(processor.openai_api_key)
            api_key_preview = processor.openai_api_key[:10] + "..." if processor.openai_api_key else "None"
            logger.info(f"AIProcessor created. Has API key: {has_api_key}, Preview: {api_key_preview}, Model: {processor.default_model}")
            request_steps.append({
                'stepNumber': 6,
                'stepName': 'AIProcessor Creation',
                'functionName': '_auto_cluster_keywords_core',
                'status': 'success',
                'message': f'AIProcessor created with account context (Account ID: {account_id}, Has API Key: {has_api_key})',
                'duration': int((time.time() - step_start) * 1000)
            })
        except Exception as e:
            logger.error(f"Error creating AIProcessor: {type(e).__name__}: {str(e)}", exc_info=True)
            request_steps.append({
                'stepNumber': 6,
                'stepName': 'AIProcessor Creation',
                'functionName': '_auto_cluster_keywords_core',
                'status': 'error',
                'message': f'Error creating AIProcessor: {str(e)}',
                'error': str(e),
                'duration': int((time.time() - step_start) * 1000)
            })
            if progress_callback:
                progress_callback(
                    state='PROGRESS',
                    meta={'request_steps': request_steps, 'response_steps': response_steps}
                )
            return {'success': False, 'error': f'Error creating AIProcessor: {str(e)}', 'request_steps': request_steps, 'response_steps': response_steps}
        # Step 7: AI Call Preparation
        step_start = time.time()
        try:
            # Check if API key is available
            if not processor.openai_api_key:
                # Try to debug why API key is missing
                logger.error(f"OpenAI API key not found for account {account.id if account else None}")
                # Check IntegrationSettings directly (diagnostic only; failures
                # here are logged and do not change the outcome)
                try:
                    from igny8_core.modules.system.models import IntegrationSettings
                    settings_obj = IntegrationSettings.objects.filter(
                        integration_type='openai',
                        account=account,
                        is_active=True
                    ).first()
                    if settings_obj:
                        logger.error(f"IntegrationSettings found but API key missing. Config keys: {list(settings_obj.config.keys()) if settings_obj.config else 'None'}")
                    else:
                        logger.error(f"No IntegrationSettings found for account {account.id if account else None}, integration_type='openai', is_active=True")
                except Exception as debug_error:
                    logger.error(f"Error checking IntegrationSettings: {str(debug_error)}", exc_info=True)
                request_steps.append({
                    'stepNumber': 7,
                    'stepName': 'AI Call Preparation',
                    'functionName': '_auto_cluster_keywords_core',
                    'status': 'error',
                    'message': 'OpenAI API key not configured',
                    'error': 'OpenAI API key not configured',
                    'duration': int((time.time() - step_start) * 1000)
                })
                if progress_callback:
                    progress_callback(
                        state='PROGRESS',
                        meta={'request_steps': request_steps, 'response_steps': response_steps}
                    )
                return {'success': False, 'error': 'OpenAI API key not configured', 'request_steps': request_steps, 'response_steps': response_steps}
            request_steps.append({
                'stepNumber': 7,
                'stepName': 'AI Call Preparation',
                'functionName': '_auto_cluster_keywords_core',
                'status': 'success',
                'message': f'Prepared {len(keyword_data)} keywords for AI analysis',
                'duration': int((time.time() - step_start) * 1000)
            })
        except Exception as e:
            request_steps.append({
                'stepNumber': 7,
                'stepName': 'AI Call Preparation',
                'functionName': '_auto_cluster_keywords_core',
                'status': 'error',
                'message': f'Error preparing AI call: {str(e)}',
                'error': str(e),
                'duration': int((time.time() - step_start) * 1000)
            })
            if progress_callback:
                progress_callback(
                    state='PROGRESS',
                    meta={'request_steps': request_steps, 'response_steps': response_steps}
                )
            return {'success': False, 'error': f'Error preparing AI call: {str(e)}', 'request_steps': request_steps, 'response_steps': response_steps}
        # Call AI with step tracking
        tracker.ai_call(f"Sending {len(keyword_data)} keywords to AI for clustering")
        result = processor.cluster_keywords(
            keyword_data,
            sector_name=sector_name,
            account=account,
            response_steps=response_steps,
            progress_callback=progress_callback,
            tracker=tracker  # Pass tracker for console logging
        )
        if result.get('error'):
            error_msg = f"AI clustering error: {result['error']}"
            logger.error(error_msg)
            tracker.error('AI_CALL', error_msg)
            if progress_callback:
                progress_callback(
                    state='FAILURE',
                    meta={
                        'error': result['error'],
                        'message': f"Error: {result['error']}",
                        'request_steps': request_steps,
                        'response_steps': response_steps
                    }
                )
            return {'success': False, 'error': result['error'], 'request_steps': request_steps, 'response_steps': response_steps}
        # Parse response
        tracker.parse("Parsing AI response into cluster data")
        # Update response_steps from result if available
        # NOTE(review): cluster_keywords also receives response_steps by
        # reference above — if it appends in place AND returns the same steps,
        # this extend would duplicate entries. Verify against AIProcessor.
        if result.get('response_steps'):
            response_steps.extend(result.get('response_steps', []))
        # Update progress: Creating clusters (40-90%)
        clusters_data = result.get('clusters', [])
        if progress_callback:
            progress_callback(
                state='PROGRESS',
                meta={
                    'current': 0,
                    'total': total_keywords,
                    'percentage': 40,
                    'message': f'Creating {len(clusters_data)} clusters...',
                    'phase': 'creating_clusters',
                    'request_steps': request_steps,
                    'response_steps': response_steps
                }
            )
        clusters_created = 0
        keywords_updated = 0
        # Step 13: Database Transaction Start
        tracker.save(f"Creating {len(clusters_data)} clusters in database")
        step_start = time.time()
        # Create/update clusters and assign keywords
        # Note: account and sector are already extracted above to avoid database queries inside transaction
        with transaction.atomic():
            if response_steps is not None:
                response_steps.append({
                    'stepNumber': 13,
                    'stepName': 'Database Transaction Start',
                    'functionName': '_auto_cluster_keywords_core',
                    'status': 'success',
                    'message': 'Transaction started',
                    'duration': int((time.time() - step_start) * 1000)
                })
            # Step 14: Cluster Creation/Update
            cluster_step_start = time.time()
            for idx, cluster_data in enumerate(clusters_data):
                cluster_name = cluster_data.get('name', '')
                cluster_keywords = cluster_data.get('keywords', [])
                # Skip malformed AI entries (missing name or empty keyword list)
                if not cluster_name or not cluster_keywords:
                    continue
                # Update progress for each cluster
                if progress_callback:
                    progress_pct = 40 + int((idx / len(clusters_data)) * 50)
                    progress_callback(
                        state='PROGRESS',
                        meta={
                            'current': idx + 1,
                            'total': len(clusters_data),
                            'percentage': progress_pct,
                            'message': f"Creating cluster '{cluster_name}' ({idx + 1} of {len(clusters_data)})...",
                            'phase': 'creating_clusters',
                            'current_item': cluster_name,
                            'request_steps': request_steps,
                            'response_steps': response_steps
                        }
                    )
                # Get or create cluster
                # Note: Clusters model (SiteSectorBaseModel) requires both site and sector
                # Ensure site is always set (can be from sector.site if sector exists)
                cluster_site = site if site else (sector.site if sector and hasattr(sector, 'site') else None)
                if not cluster_site:
                    logger.error(f"Cannot create cluster '{cluster_name}': No site available. Keywords: {keyword_ids}")
                    continue
                if sector:
                    cluster, created = Clusters.objects.get_or_create(
                        name=cluster_name,
                        account=account,
                        site=cluster_site,
                        sector=sector,
                        defaults={
                            'description': cluster_data.get('description', ''),
                            'status': 'active',
                        }
                    )
                else:
                    # If no sector, create cluster without sector filter but still require site
                    cluster, created = Clusters.objects.get_or_create(
                        name=cluster_name,
                        account=account,
                        site=cluster_site,
                        sector__isnull=True,
                        defaults={
                            'description': cluster_data.get('description', ''),
                            'status': 'active',
                            'sector': None,
                        }
                    )
                if created:
                    clusters_created += 1
                # Step 15: Keyword Matching & Assignment
                # NOTE(review): kw_step_start is never read — step 15 reports
                # duration 0 below. Dead variable kept for reference.
                kw_step_start = time.time()
                # Assign keywords to cluster
                # Match keywords by keyword string (case-insensitive) from the already-loaded keywords list
                # Also create a mapping for fuzzy matching (handles minor variations)
                matched_keyword_objects = []
                unmatched_keywords = []
                # Create normalized versions for exact matching
                cluster_keywords_normalized = {}
                for kw in cluster_keywords:
                    normalized = kw.strip().lower()
                    cluster_keywords_normalized[normalized] = kw.strip()  # Keep original for logging
                # Create a mapping of all available keywords (normalized)
                available_keywords_normalized = {
                    kw_obj.keyword.strip().lower(): kw_obj
                    for kw_obj in keywords
                }
                # First pass: exact matches (case-insensitive)
                for cluster_kw_normalized, cluster_kw_original in cluster_keywords_normalized.items():
                    if cluster_kw_normalized in available_keywords_normalized:
                        matched_keyword_objects.append(available_keywords_normalized[cluster_kw_normalized])
                    else:
                        unmatched_keywords.append(cluster_kw_original)
                # Log unmatched keywords for debugging
                if unmatched_keywords:
                    logger.warning(
                        f"Some keywords in cluster '{cluster_name}' were not matched: {unmatched_keywords}. "
                        f"Available keywords: {[kw.keyword for kw in keywords]}"
                    )
                # Update matched keywords
                if matched_keyword_objects:
                    matched_ids = [kw.id for kw in matched_keyword_objects]
                    # Rebuild queryset inside transaction to avoid database connection issues
                    # Handle sector=None case
                    keyword_filter = Keywords.objects.filter(
                        id__in=matched_ids,
                        account=account
                    )
                    if sector:
                        keyword_filter = keyword_filter.filter(sector=sector)
                    else:
                        keyword_filter = keyword_filter.filter(sector__isnull=True)
                    updated_count = keyword_filter.update(
                        cluster=cluster,
                        status='mapped'  # Update status from pending to mapped
                    )
                    keywords_updated += updated_count
            # Log steps 14 and 15 after all clusters are processed
            if response_steps is not None:
                response_steps.append({
                    'stepNumber': 14,
                    'stepName': 'Cluster Creation/Update',
                    'functionName': '_auto_cluster_keywords_core',
                    'status': 'success',
                    'message': f'Created/updated {clusters_created} clusters',
                    'duration': int((time.time() - cluster_step_start) * 1000)
                })
                response_steps.append({
                    'stepNumber': 15,
                    'stepName': 'Keyword Matching & Assignment',
                    'functionName': '_auto_cluster_keywords_core',
                    'status': 'success',
                    'message': f'Assigned {keywords_updated} keywords to clusters',
                    'duration': 0  # Duration already included in step 14
                })
            # Step 16: Metrics Recalculation & Commit
            step_start = time.time()
            # Update progress: Recalculating metrics (90-95%)
            if progress_callback:
                progress_callback(
                    state='PROGRESS',
                    meta={
                        'current': clusters_created,
                        'total': clusters_created,
                        'percentage': 90,
                        'message': 'Recalculating cluster metrics...',
                        'phase': 'finalizing',
                        'request_steps': request_steps,
                        'response_steps': response_steps
                    }
                )
            # Recalculate cluster metrics for ALL clusters in this account/sector
            # scope, not just the ones touched above
            from django.db.models import Sum
            cluster_filter = Clusters.objects.filter(account=account)
            if sector:
                cluster_filter = cluster_filter.filter(sector=sector)
            else:
                cluster_filter = cluster_filter.filter(sector__isnull=True)
            for cluster in cluster_filter:
                cluster.keywords_count = Keywords.objects.filter(cluster=cluster).count()
                volume_sum = Keywords.objects.filter(cluster=cluster).aggregate(
                    total=Sum('volume')
                )['total']
                cluster.volume = volume_sum or 0
                cluster.save()
        # Transaction commits here automatically
        if response_steps is not None:
            response_steps.append({
                'stepNumber': 16,
                'stepName': 'Metrics Recalculation & Commit',
                'functionName': '_auto_cluster_keywords_core',
                'status': 'success',
                'message': f'Recalculated metrics for {cluster_filter.count()} clusters, transaction committed',
                'duration': int((time.time() - step_start) * 1000)
            })
        # Final progress update
        final_message = f"Clustering complete: {clusters_created} clusters created, {keywords_updated} keywords updated"
        logger.info(final_message)
        tracker.done(final_message)
        if progress_callback:
            progress_callback(
                state='SUCCESS',
                meta={
                    'message': final_message,
                    'request_steps': request_steps,
                    'response_steps': response_steps
                }
            )
        return {
            'success': True,
            'clusters_created': clusters_created,
            'keywords_updated': keywords_updated,
            'message': final_message,
            'request_steps': request_steps,
            'response_steps': response_steps,
        }
    except Exception as e:
        # Outermost safety net: convert any unexpected exception into a
        # failure dict so Celery callers always get a serializable result.
        error_msg = f"Error in auto_cluster_keywords_core: {str(e)}"
        logger.error(error_msg, exc_info=True)
        tracker.error('Exception', error_msg, exception=e)
        if progress_callback:
            progress_callback(
                state='FAILURE',
                meta={
                    'error': str(e),
                    'message': f'Error: {str(e)}',
                    'request_steps': request_steps,
                    'response_steps': response_steps
                }
            )
        return {
            'success': False,
            'error': str(e),
            'request_steps': request_steps,
            'response_steps': response_steps
        }
# ============================================================================
# DEPRECATED: This Celery task is deprecated. Use run_ai_task instead.
# New path: views.py -> run_ai_task -> AIEngine -> AutoClusterFunction
# ============================================================================
@shared_task(bind=True, max_retries=3)
def auto_cluster_keywords_task(self, keyword_ids: List[int], sector_id: int = None, account_id: int = None):
    """
    [DEPRECATED] Celery task wrapper for clustering keywords using AI.

    ⚠️ WARNING: This task is deprecated. Use the new AI framework instead:
    - New path: views.py -> run_ai_task -> AIEngine -> AutoClusterFunction
    - This task uses the old _auto_cluster_keywords_core function
    - Console logging may not work correctly in this path

    Args:
        keyword_ids: List of keyword IDs to cluster
        sector_id: Sector ID for the keywords
        account_id: Account ID for account isolation

    Returns:
        dict: The result dict from _auto_cluster_keywords_core, or an error
        dict (with 'error', 'error_type', captured step lists, and the task
        arguments) if the core call raised.
    """
    banner = "=" * 80
    logger.info(banner)
    logger.info("auto_cluster_keywords_task STARTED")
    logger.info(f" - Task ID: {self.request.id}")
    logger.info(f" - keyword_ids: {keyword_ids}")
    logger.info(f" - sector_id: {sector_id}")
    logger.info(f" - account_id: {account_id}")
    logger.info(banner)

    # Latest step lists seen via progress updates, kept for error reporting.
    captured_request_steps = []
    captured_response_steps = []

    def report_progress(state, meta):
        # Mirror any step lists out of the meta dict, then forward the update
        # to Celery's task state.
        nonlocal captured_request_steps, captured_response_steps
        if isinstance(meta, dict):
            captured_request_steps = meta.get('request_steps', captured_request_steps)
            captured_response_steps = meta.get('response_steps', captured_response_steps)
        self.update_state(state=state, meta=meta)

    try:
        outcome = _auto_cluster_keywords_core(keyword_ids, sector_id, account_id, report_progress)
        logger.info(f"auto_cluster_keywords_task COMPLETED: {outcome}")
        return outcome
    except Exception as exc:
        failure_kind = type(exc).__name__
        failure_text = str(exc)
        # Log full error details
        logger.error(banner)
        logger.error(f"auto_cluster_keywords_task FAILED: {failure_kind}: {failure_text}")
        logger.error(f" - Task ID: {self.request.id}")
        logger.error(f" - keyword_ids: {keyword_ids}")
        logger.error(f" - sector_id: {sector_id}")
        logger.error(f" - account_id: {account_id}")
        logger.error(banner, exc_info=True)
        # Build a detailed, Celery-serializable error payload
        error_dict = {
            'error': failure_text,
            'error_type': failure_kind,
            'error_class': failure_kind,
            'message': f'{failure_kind}: {failure_text}',
            'request_steps': captured_request_steps,
            'response_steps': captured_response_steps,
            'task_id': str(self.request.id),
            'keyword_ids': keyword_ids,
            'sector_id': sector_id,
            'account_id': account_id
        }
        # Push the FAILURE state; tolerate backend failures here
        try:
            self.update_state(
                state='FAILURE',
                meta=error_dict
            )
        except Exception as update_error:
            # If update_state fails, log it but continue
            logger.error(f"Failed to update task state: {str(update_error)}")
        # Return the error payload instead of raising, matching the original
        # contract (callers inspect the returned dict)
        return error_dict
# REMOVED: All idea generation functions removed
# - auto_generate_ideas_task
# - _generate_single_idea_core
# - generate_single_idea_task