reaminign phase 1-2 tasks

This commit is contained in:
IGNY8 VPS (Salman)
2025-11-16 22:17:33 +00:00
parent 7f8982a0ab
commit 92f51859fe
2 changed files with 109 additions and 335 deletions

View File

@@ -17,6 +17,9 @@ from igny8_core.api.permissions import IsAuthenticatedAndActive, IsViewerOrAbove
from .models import Keywords, Clusters, ContentIdeas
from .serializers import KeywordSerializer, ContentIdeasSerializer
from .cluster_serializers import ClusterSerializer
from igny8_core.business.planning.services.clustering_service import ClusteringService
from igny8_core.business.planning.services.ideas_service import IdeasService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
@extend_schema_view(
@@ -568,64 +571,39 @@ class KeywordViewSet(SiteSectorModelViewSet):
@action(detail=False, methods=['post'], url_path='auto_cluster', url_name='auto_cluster')
def auto_cluster(self, request):
"""Auto-cluster keywords using AI - New unified framework"""
"""Auto-cluster keywords using ClusteringService"""
import logging
from igny8_core.ai.tasks import run_ai_task
from kombu.exceptions import OperationalError as KombuOperationalError
logger = logging.getLogger(__name__)
try:
keyword_ids = request.data.get('ids', [])
sector_id = request.data.get('sector_id')
# Get account
account = getattr(request, 'account', None)
account_id = account.id if account else None
# Prepare payload
payload = {
'ids': request.data.get('ids', []),
'sector_id': request.data.get('sector_id')
}
logger.info(f"auto_cluster called with ids={payload['ids']}, sector_id={payload.get('sector_id')}")
# Validate basic input
if not payload['ids']:
if not account:
return error_response(
error='No IDs provided',
error='Account is required',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
if len(payload['ids']) > 20:
return error_response(
error='Maximum 20 keywords allowed for clustering',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
# Try to queue Celery task
# Use service to cluster keywords
service = ClusteringService()
try:
if hasattr(run_ai_task, 'delay'):
task = run_ai_task.delay(
function_name='auto_cluster',
payload=payload,
account_id=account_id
)
logger.info(f"Task queued: {task.id}")
result = service.cluster_keywords(keyword_ids, account, sector_id)
if result.get('success'):
if 'task_id' in result:
# Async task queued
return success_response(
data={'task_id': str(task.id)},
message='Clustering started',
data={'task_id': result['task_id']},
message=result.get('message', 'Clustering started'),
request=request
)
else:
# Celery not available - execute synchronously
logger.warning("Celery not available, executing synchronously")
result = run_ai_task(
function_name='auto_cluster',
payload=payload,
account_id=account_id
)
if result.get('success'):
# Synchronous execution
return success_response(
data=result,
request=request
@@ -636,23 +614,10 @@ class KeywordViewSet(SiteSectorModelViewSet):
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
except (KombuOperationalError, ConnectionError) as e:
# Broker connection failed - fall back to synchronous execution
logger.warning(f"Celery broker unavailable, falling back to synchronous execution: {str(e)}")
result = run_ai_task(
function_name='auto_cluster',
payload=payload,
account_id=account_id
)
if result.get('success'):
return success_response(
data=result,
request=request
)
else:
except InsufficientCreditsError as e:
return error_response(
error=result.get('error', 'Clustering failed'),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
error=str(e),
status_code=status.HTTP_402_PAYMENT_REQUIRED,
request=request
)
except Exception as e:
@@ -843,63 +808,38 @@ class ClusterViewSet(SiteSectorModelViewSet):
@action(detail=False, methods=['post'], url_path='auto_generate_ideas', url_name='auto_generate_ideas')
def auto_generate_ideas(self, request):
"""Auto-generate ideas for clusters using AI - New unified framework"""
"""Auto-generate ideas for clusters using IdeasService"""
import logging
from igny8_core.ai.tasks import run_ai_task
from kombu.exceptions import OperationalError as KombuOperationalError
logger = logging.getLogger(__name__)
try:
cluster_ids = request.data.get('ids', [])
# Get account
account = getattr(request, 'account', None)
account_id = account.id if account else None
# Prepare payload
payload = {
'ids': request.data.get('ids', [])
}
logger.info(f"auto_generate_ideas called with ids={payload['ids']}")
# Validate basic input
if not payload['ids']:
if not account:
return error_response(
error='No IDs provided',
error='Account is required',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
if len(payload['ids']) > 10:
return error_response(
error='Maximum 10 clusters allowed for idea generation',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
# Try to queue Celery task
# Use service to generate ideas
service = IdeasService()
try:
if hasattr(run_ai_task, 'delay'):
task = run_ai_task.delay(
function_name='auto_generate_ideas',
payload=payload,
account_id=account_id
)
logger.info(f"Task queued: {task.id}")
result = service.generate_ideas(cluster_ids, account)
if result.get('success'):
if 'task_id' in result:
# Async task queued
return success_response(
data={'task_id': str(task.id)},
message='Idea generation started',
data={'task_id': result['task_id']},
message=result.get('message', 'Idea generation started'),
request=request
)
else:
# Celery not available - execute synchronously
logger.warning("Celery not available, executing synchronously")
result = run_ai_task(
function_name='auto_generate_ideas',
payload=payload,
account_id=account_id
)
if result.get('success'):
# Synchronous execution
return success_response(
data=result,
request=request
@@ -910,23 +850,10 @@ class ClusterViewSet(SiteSectorModelViewSet):
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
except (KombuOperationalError, ConnectionError) as e:
# Broker connection failed - fall back to synchronous execution
logger.warning(f"Celery broker unavailable, falling back to synchronous execution: {str(e)}")
result = run_ai_task(
function_name='auto_generate_ideas',
payload=payload,
account_id=account_id
)
if result.get('success'):
return success_response(
data=result,
request=request
)
else:
except InsufficientCreditsError as e:
return error_response(
error=result.get('error', 'Idea generation failed'),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
error=str(e),
status_code=status.HTTP_402_PAYMENT_REQUIRED,
request=request
)
except Exception as e:

View File

@@ -12,6 +12,8 @@ from igny8_core.api.throttles import DebugScopedRateThrottle
from igny8_core.api.permissions import IsAuthenticatedAndActive, IsViewerOrAbove, IsEditorOrAbove
from .models import Tasks, Images, Content
from .serializers import TasksSerializer, ImagesSerializer, ContentSerializer
from igny8_core.business.content.services.content_generation_service import ContentGenerationService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
@extend_schema_view(
@@ -137,17 +139,14 @@ class TasksViewSet(SiteSectorModelViewSet):
@action(detail=False, methods=['post'], url_path='auto_generate_content', url_name='auto_generate_content')
def auto_generate_content(self, request):
"""Auto-generate content for tasks using AI"""
"""Auto-generate content for tasks using ContentGenerationService"""
import logging
from django.db import OperationalError, DatabaseError, IntegrityError
from django.core.exceptions import ValidationError
logger = logging.getLogger(__name__)
try:
ids = request.data.get('ids', [])
if not ids:
logger.warning("auto_generate_content: No IDs provided")
return error_response(
error='No IDs provided',
status_code=status.HTTP_400_BAD_REQUEST,
@@ -155,233 +154,81 @@ class TasksViewSet(SiteSectorModelViewSet):
)
if len(ids) > 10:
logger.warning(f"auto_generate_content: Too many IDs provided: {len(ids)}")
return error_response(
error='Maximum 10 tasks allowed for content generation',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
logger.info(f"auto_generate_content: Processing {len(ids)} task IDs: {ids}")
# Get account
account = getattr(request, 'account', None)
account_id = account.id if account else None
logger.info(f"auto_generate_content: Account ID: {account_id}")
if not account:
return error_response(
error='Account is required',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
# Validate task IDs exist in database before proceeding
try:
# Validate task IDs exist
queryset = self.get_queryset()
existing_tasks = queryset.filter(id__in=ids)
existing_tasks = queryset.filter(id__in=ids, account=account)
existing_count = existing_tasks.count()
existing_ids = list(existing_tasks.values_list('id', flat=True))
logger.info(f"auto_generate_content: Found {existing_count} existing tasks out of {len(ids)} requested")
logger.info(f"auto_generate_content: Existing task IDs: {existing_ids}")
if existing_count == 0:
logger.error(f"auto_generate_content: No tasks found for IDs: {ids}")
return error_response(
error=f'No tasks found for the provided IDs: {ids}',
status_code=status.HTTP_404_NOT_FOUND,
request=request
)
if existing_count < len(ids):
missing_ids = set(ids) - set(existing_ids)
logger.warning(f"auto_generate_content: Some task IDs not found: {missing_ids}")
# Continue with existing tasks, but log warning
except (OperationalError, DatabaseError) as db_error:
logger.error("=" * 80)
logger.error("DATABASE ERROR: Failed to query tasks")
logger.error(f" - Error type: {type(db_error).__name__}")
logger.error(f" - Error message: {str(db_error)}")
logger.error(f" - Requested IDs: {ids}")
logger.error(f" - Account ID: {account_id}")
logger.error("=" * 80, exc_info=True)
return error_response(
error=f'Database error while querying tasks: {str(db_error)}',
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
# Try to queue Celery task, fall back to synchronous if Celery not available
# Use service to generate content
service = ContentGenerationService()
try:
from igny8_core.ai.tasks import run_ai_task
from kombu.exceptions import OperationalError as KombuOperationalError
result = service.generate_content(ids, account)
if hasattr(run_ai_task, 'delay'):
# Celery is available - queue async task
logger.info(f"auto_generate_content: Queuing Celery task for {len(ids)} tasks")
try:
task = run_ai_task.delay(
function_name='generate_content',
payload={'ids': ids},
account_id=account_id
)
logger.info(f"auto_generate_content: Celery task queued successfully: {task.id}")
return success_response(
data={'task_id': str(task.id)},
message='Content generation started',
request=request
)
except KombuOperationalError as celery_error:
logger.error("=" * 80)
logger.error("CELERY ERROR: Failed to queue task")
logger.error(f" - Error type: {type(celery_error).__name__}")
logger.error(f" - Error message: {str(celery_error)}")
logger.error(f" - Task IDs: {ids}")
logger.error(f" - Account ID: {account_id}")
logger.error("=" * 80, exc_info=True)
return error_response(
error='Task queue unavailable. Please try again.',
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
request=request
)
except Exception as celery_error:
logger.error("=" * 80)
logger.error("CELERY ERROR: Failed to queue task")
logger.error(f" - Error type: {type(celery_error).__name__}")
logger.error(f" - Error message: {str(celery_error)}")
logger.error(f" - Task IDs: {ids}")
logger.error(f" - Account ID: {account_id}")
logger.error("=" * 80, exc_info=True)
# Fall back to synchronous execution
logger.info("auto_generate_content: Falling back to synchronous execution")
result = run_ai_task(
function_name='generate_content',
payload={'ids': ids},
account_id=account_id
)
if result.get('success'):
if 'task_id' in result:
# Async task queued
return success_response(
data={'tasks_updated': result.get('count', 0)},
message='Content generated successfully (synchronous)',
data={'task_id': result['task_id']},
message=result.get('message', 'Content generation started'),
request=request
)
else:
return error_response(
error=result.get('error', 'Content generation failed'),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
else:
# Celery not available - execute synchronously
logger.info(f"auto_generate_content: Executing synchronously (Celery not available)")
result = run_ai_task(
function_name='generate_content',
payload={'ids': ids},
account_id=account_id
)
if result.get('success'):
logger.info(f"auto_generate_content: Synchronous execution successful: {result.get('count', 0)} tasks updated")
# Synchronous execution
return success_response(
data={'tasks_updated': result.get('count', 0)},
data=result,
message='Content generated successfully',
request=request
)
else:
logger.error(f"auto_generate_content: Synchronous execution failed: {result.get('error', 'Unknown error')}")
return error_response(
error=result.get('error', 'Content generation failed'),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
except ImportError as import_error:
logger.error(f"auto_generate_content: ImportError - tasks module not available: {str(import_error)}")
# Tasks module not available - update status only
try:
queryset = self.get_queryset()
tasks = queryset.filter(id__in=ids, status='queued')
updated_count = tasks.update(status='completed', content='[AI content generation not available]')
logger.info(f"auto_generate_content: Updated {updated_count} tasks (AI generation not available)")
return success_response(
data={'updated_count': updated_count},
message='Tasks updated (AI generation not available)',
except InsufficientCreditsError as e:
return error_response(
error=str(e),
status_code=status.HTTP_402_PAYMENT_REQUIRED,
request=request
)
except (OperationalError, DatabaseError) as db_error:
logger.error("=" * 80)
logger.error("DATABASE ERROR: Failed to update tasks")
logger.error(f" - Error type: {type(db_error).__name__}")
logger.error(f" - Error message: {str(db_error)}")
logger.error("=" * 80, exc_info=True)
except Exception as e:
logger.error(f"Error in auto_generate_content: {str(e)}", exc_info=True)
return error_response(
error=f'Database error while updating tasks: {str(db_error)}',
error=f'Content generation failed: {str(e)}',
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
except (OperationalError, DatabaseError) as db_error:
logger.error("=" * 80)
logger.error("DATABASE ERROR: Failed during task execution")
logger.error(f" - Error type: {type(db_error).__name__}")
logger.error(f" - Error message: {str(db_error)}")
logger.error(f" - Task IDs: {ids}")
logger.error(f" - Account ID: {account_id}")
logger.error("=" * 80, exc_info=True)
return error_response(
error=f'Database error during content generation: {str(db_error)}',
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
except IntegrityError as integrity_error:
logger.error("=" * 80)
logger.error("INTEGRITY ERROR: Data integrity violation")
logger.error(f" - Error message: {str(integrity_error)}")
logger.error(f" - Task IDs: {ids}")
logger.error("=" * 80, exc_info=True)
return error_response(
error=f'Data integrity error: {str(integrity_error)}',
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
except ValidationError as validation_error:
logger.error(f"auto_generate_content: ValidationError: {str(validation_error)}")
return error_response(
error=f'Validation error: {str(validation_error)}',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
except Exception as e:
logger.error("=" * 80)
logger.error("UNEXPECTED ERROR in auto_generate_content")
logger.error(f" - Error type: {type(e).__name__}")
logger.error(f" - Error message: {str(e)}")
logger.error(f" - Task IDs: {ids}")
logger.error(f" - Account ID: {account_id}")
logger.error("=" * 80, exc_info=True)
logger.error(f"Unexpected error in auto_generate_content: {str(e)}", exc_info=True)
return error_response(
error=f'Unexpected error: {str(e)}',
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
except Exception as outer_error:
logger.error("=" * 80)
logger.error("CRITICAL ERROR: Outer exception handler")
logger.error(f" - Error type: {type(outer_error).__name__}")
logger.error(f" - Error message: {str(outer_error)}")
logger.error("=" * 80, exc_info=True)
return error_response(
error=f'Critical error: {str(outer_error)}',
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
@extend_schema_view(
list=extend_schema(tags=['Writer']),