# File: igny8/backend/igny8_core/modules/writer/views.py
# Commit: cbb32b1c9d "filter fixes" - IGNY8 VPS (Salman)
# Date: 2026-01-19 10:49:01 +00:00
# Size: 2492 lines, 102 KiB, Python

from rest_framework import viewsets, filters, status
from rest_framework.decorators import action
from rest_framework.response import Response
from django_filters.rest_framework import DjangoFilterBackend
import django_filters
from django.db import transaction, models
from django.db.models import Q
from drf_spectacular.utils import extend_schema, extend_schema_view
from igny8_core.api.base import SiteSectorModelViewSet
from igny8_core.api.pagination import CustomPageNumberPagination
from igny8_core.api.response import success_response, error_response
from igny8_core.api.throttles import DebugScopedRateThrottle
from igny8_core.api.permissions import IsAuthenticatedAndActive, IsViewerOrAbove, IsEditorOrAbove
from .models import Tasks, Images, Content
from .serializers import (
TasksSerializer,
ImagesSerializer,
ContentSerializer,
ContentTaxonomySerializer,
)
from igny8_core.business.content.models import ContentTaxonomy # ContentAttribute model exists but serializer removed in Stage 1
from igny8_core.business.content.services.content_generation_service import ContentGenerationService
from igny8_core.business.content.services.validation_service import ContentValidationService
from igny8_core.business.content.services.metadata_mapping_service import MetadataMappingService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
# Custom FilterSets with date range filtering support
class TasksFilter(django_filters.FilterSet):
    """FilterSet for Tasks that adds created_at range filtering.

    content_type and content_structure are declared as plain CharFilters so
    that any value is accepted (the database may hold legacy values that are
    not in the current model choices).
    """

    # Inclusive lower/upper bounds on created_at (ISO 8601 datetimes).
    created_at__gte = django_filters.IsoDateTimeFilter(
        field_name='created_at',
        lookup_expr='gte',
    )
    created_at__lte = django_filters.IsoDateTimeFilter(
        field_name='created_at',
        lookup_expr='lte',
    )
    # Free-text matches, deliberately not restricted to model choices.
    content_type = django_filters.CharFilter(field_name='content_type')
    content_structure = django_filters.CharFilter(field_name='content_structure')

    class Meta:
        model = Tasks
        fields = [
            'status',
            'cluster_id',
            'content_type',
            'content_structure',
            'created_at__gte',
            'created_at__lte',
        ]
class ImagesFilter(django_filters.FilterSet):
    """FilterSet for Images that adds created_at range filtering."""

    # Inclusive lower/upper bounds on created_at (ISO 8601 datetimes).
    created_at__gte = django_filters.IsoDateTimeFilter(
        field_name='created_at',
        lookup_expr='gte',
    )
    created_at__lte = django_filters.IsoDateTimeFilter(
        field_name='created_at',
        lookup_expr='lte',
    )

    class Meta:
        model = Images
        fields = [
            'task_id',
            'content_id',
            'image_type',
            'status',
            'created_at__gte',
            'created_at__lte',
        ]
class ContentFilter(django_filters.FilterSet):
    """FilterSet for Content that adds created_at range filtering.

    content_type and content_structure are declared as plain CharFilters so
    that any value is accepted (the database may hold legacy values that are
    not in the current model choices).
    """

    # Inclusive lower/upper bounds on created_at (ISO 8601 datetimes).
    created_at__gte = django_filters.IsoDateTimeFilter(
        field_name='created_at',
        lookup_expr='gte',
    )
    created_at__lte = django_filters.IsoDateTimeFilter(
        field_name='created_at',
        lookup_expr='lte',
    )
    # Free-text matches, deliberately not restricted to model choices.
    content_type = django_filters.CharFilter(field_name='content_type')
    content_structure = django_filters.CharFilter(field_name='content_structure')

    class Meta:
        model = Content
        fields = [
            'cluster_id',
            'site_id',
            'status',
            'site_status',
            'content_type',
            'content_structure',
            'source',
            'created_at__gte',
            'created_at__lte',
        ]
@extend_schema_view(
    list=extend_schema(tags=['Writer']),
    create=extend_schema(tags=['Writer']),
    retrieve=extend_schema(tags=['Writer']),
    update=extend_schema(tags=['Writer']),
    partial_update=extend_schema(tags=['Writer']),
    destroy=extend_schema(tags=['Writer']),
)
class TasksViewSet(SiteSectorModelViewSet):
    """
    ViewSet for managing tasks with CRUD operations
    Unified API Standard v1.0 compliant
    Stage 1 Refactored - removed deprecated filters

    Besides the standard CRUD routes this exposes four collection-level
    actions: bulk_delete, bulk_update, auto_generate_content and
    filter_options.

    NOTE(review): the mutating bulk actions run under the class-level
    permissions (IsViewerOrAbove) unless the base class overrides them per
    action - confirm whether they should require IsEditorOrAbove.
    """
    queryset = Tasks.objects.select_related('cluster', 'site', 'sector')
    serializer_class = TasksSerializer
    permission_classes = [IsAuthenticatedAndActive, IsViewerOrAbove]
    pagination_class = CustomPageNumberPagination  # Explicitly use custom pagination
    throttle_scope = 'writer'
    throttle_classes = [DebugScopedRateThrottle]
    # DRF filtering configuration
    filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
    # Search configuration
    search_fields = ['title', 'keywords']
    # Ordering configuration
    ordering_fields = ['title', 'created_at', 'status']
    ordering = ['-created_at']  # Default ordering (newest first)
    # Filter configuration - use custom filterset for date range filtering
    filterset_class = TasksFilter

    # Display labels used by filter_options; values missing from these maps
    # fall back to a title-cased version of the raw value.
    _STATUS_LABELS = {
        'queued': 'Queued',
        'processing': 'Processing',
        'completed': 'Completed',
        'failed': 'Failed',
    }
    _TYPE_LABELS = {
        'post': 'Post',
        'page': 'Page',
        'product': 'Product',
        'taxonomy': 'Taxonomy',
    }
    _STRUCTURE_LABELS = {
        'article': 'Article', 'guide': 'Guide', 'comparison': 'Comparison',
        'review': 'Review', 'listicle': 'Listicle', 'landing_page': 'Landing Page',
        'business_page': 'Business Page', 'service_page': 'Service Page',
        'general': 'General', 'cluster_hub': 'Cluster Hub',
        'product_page': 'Product Page', 'category_archive': 'Category Archive',
        'tag_archive': 'Tag Archive', 'attribute_archive': 'Attribute Archive',
    }

    @staticmethod
    def _requested_ids(request, key='ids'):
        """Return the ID list posted under *key*, or [] when absent or not a list."""
        ids = request.data.get(key, [])
        return list(ids) if isinstance(ids, (list, tuple)) else []

    @staticmethod
    def _build_options(all_label, values, labels, fallback):
        """Build the [{'value', 'label'}, ...] list for one filter dropdown."""
        return [{'value': '', 'label': all_label}] + [
            {'value': value, 'label': labels.get(value, fallback(value))}
            for value in values
        ]

    def perform_create(self, serializer):
        """Require explicit site_id and sector_id - no defaults.

        site_id / sector_id are read from the validated data first and from
        the query string second.

        Raises:
            ValidationError: when either id is missing, does not resolve to a
                row, or the sector does not belong to the site.
        """
        from igny8_core.auth.models import Site, Sector
        from rest_framework.exceptions import ValidationError

        request = self.request
        user = getattr(request, 'user', None)
        query_params = getattr(request, 'query_params', None)
        if query_params is None:
            query_params = getattr(request, 'GET', {})

        site_id = serializer.validated_data.get('site_id') or query_params.get('site_id')
        sector_id = serializer.validated_data.get('sector_id') or query_params.get('sector_id')

        # Site ID is REQUIRED
        if not site_id:
            raise ValidationError("site_id is required. Please select a site.")
        try:
            site = Site.objects.get(id=site_id)
        except (Site.DoesNotExist, ValueError, TypeError):
            # ValueError/TypeError: a non-numeric id from the query string.
            raise ValidationError(f"Site with id {site_id} does not exist")

        # Sector ID is REQUIRED
        if not sector_id:
            raise ValidationError("sector_id is required. Please select a sector.")
        try:
            sector = Sector.objects.get(id=sector_id)
        except (Sector.DoesNotExist, ValueError, TypeError):
            raise ValidationError(f"Sector with id {sector_id} does not exist")
        # Compare against the resolved site's primary key: the raw site_id is
        # a string when it comes from the query string and would never equal
        # the integer sector.site_id.
        if sector.site_id != site.id:
            raise ValidationError("Sector does not belong to the selected site")

        serializer.validated_data.pop('site_id', None)
        serializer.validated_data.pop('sector_id', None)

        # Account resolution order: request -> authenticated user -> site.
        account = getattr(request, 'account', None)
        if not account and user and user.is_authenticated:
            account = getattr(user, 'account', None)
        if not account:
            account = site.account
        serializer.save(account=account, site=site, sector=sector)

    @action(detail=False, methods=['POST'], url_path='bulk_delete', url_name='bulk_delete')
    def bulk_delete(self, request):
        """Bulk delete tasks whose ids are posted under 'ids'.

        Only rows visible through get_queryset() are affected. Returns the
        number of rows matched before deletion as 'deleted_count'.
        """
        ids = self._requested_ids(request)
        if not ids:
            return error_response(
                error='No IDs provided',
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request
            )
        items_to_delete = self.get_queryset().filter(id__in=ids)
        deleted_count = items_to_delete.count()
        items_to_delete.delete()  # Soft delete via SoftDeletableModel
        return success_response(data={'deleted_count': deleted_count}, request=request)

    @action(detail=False, methods=['post'], url_path='bulk_update', url_name='bulk_update')
    def bulk_update(self, request):
        """Bulk update task status for the ids posted under 'ids'.

        Uses QuerySet.update(), so the new status is written directly without
        per-row model validation.
        """
        ids = self._requested_ids(request)
        status_value = request.data.get('status')
        if not ids:
            return error_response(
                error='No IDs provided',
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request
            )
        if not status_value:
            return error_response(
                error='No status provided',
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request
            )
        updated_count = self.get_queryset().filter(id__in=ids).update(status=status_value)
        return success_response(data={'updated_count': updated_count}, request=request)

    @action(detail=False, methods=['post'], url_path='auto_generate_content', url_name='auto_generate_content')
    def auto_generate_content(self, request):
        """Auto-generate content for tasks using ContentGenerationService.

        Accepts at most 10 task ids under 'ids'. Only the ids that resolve to
        tasks in the scoped queryset for the request's account are forwarded
        to the service. Responds with 402 when the service raises
        InsufficientCreditsError.
        """
        import logging
        logger = logging.getLogger(__name__)
        try:
            ids = self._requested_ids(request)
            if not ids:
                return error_response(
                    error='No IDs provided',
                    status_code=status.HTTP_400_BAD_REQUEST,
                    request=request
                )
            if len(ids) > 10:
                return error_response(
                    error='Maximum 10 tasks allowed for content generation',
                    status_code=status.HTTP_400_BAD_REQUEST,
                    request=request
                )
            # Get account
            account = getattr(request, 'account', None)
            if not account:
                return error_response(
                    error='Account is required',
                    status_code=status.HTTP_400_BAD_REQUEST,
                    request=request
                )
            # Resolve the ids through the scoped queryset and hand only the
            # matching ones to the service, so ids outside the caller's scope
            # are never processed.
            task_ids = list(
                self.get_queryset()
                .filter(id__in=ids, account=account)
                .values_list('id', flat=True)
            )
            if not task_ids:
                return error_response(
                    error=f'No tasks found for the provided IDs: {ids}',
                    status_code=status.HTTP_404_NOT_FOUND,
                    request=request
                )
            # Use service to generate content
            service = ContentGenerationService()
            try:
                result = service.generate_content(task_ids, account)
                if not result.get('success'):
                    return error_response(
                        error=result.get('error', 'Content generation failed'),
                        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                        request=request
                    )
                if 'task_id' in result:
                    # Async task queued
                    return success_response(
                        data={'task_id': result['task_id']},
                        message=result.get('message', 'Content generation started'),
                        request=request
                    )
                # Synchronous execution
                return success_response(
                    data=result,
                    message='Content generated successfully',
                    request=request
                )
            except InsufficientCreditsError as e:
                return error_response(
                    error=str(e),
                    status_code=status.HTTP_402_PAYMENT_REQUIRED,
                    request=request
                )
            except Exception as e:
                logger.error(f"Error in auto_generate_content: {str(e)}", exc_info=True)
                return error_response(
                    error=f'Content generation failed: {str(e)}',
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    request=request
                )
        except Exception as e:
            logger.error(f"Unexpected error in auto_generate_content: {str(e)}", exc_info=True)
            return error_response(
                error=f'Unexpected error: {str(e)}',
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                request=request
            )

    @action(detail=False, methods=['get'], url_path='filter_options', url_name='filter_options')
    def filter_options(self, request):
        """
        Get distinct filter values from current data with cascading support.
        Returns only values that exist based on other active filters.

        The options for each dropdown are computed from the queryset narrowed
        by every *other* active filter (and by 'search'), never by the
        dropdown's own current value. The cluster filter is accepted as
        either 'cluster_id' (the name TasksFilter uses) or 'cluster'.
        """
        import logging
        logger = logging.getLogger(__name__)
        try:
            params = request.query_params
            # Active filters keyed by the model lookup they apply to.
            active = {
                'status': params.get('status', ''),
                'content_type': params.get('content_type', ''),
                'content_structure': params.get('content_structure', ''),
                'cluster_id': params.get('cluster_id', '') or params.get('cluster', ''),
            }
            search = params.get('search', '')

            # Apply search to base queryset
            base_qs = self.get_queryset()
            if search:
                base_qs = base_qs.filter(
                    Q(title__icontains=search) | Q(keywords__icontains=search)
                )

            def scoped(excluded):
                """Base queryset narrowed by every active filter except *excluded*."""
                lookups = {
                    field: value
                    for field, value in active.items()
                    if value and field != excluded
                }
                return base_qs.filter(**lookups)

            def distinct_values(field):
                """Sorted, non-empty distinct values of *field* under the other filters."""
                values = set(scoped(field).values_list(field, flat=True))
                return sorted(value for value in values if value)

            status_options = self._build_options(
                'All Status',
                distinct_values('status'),
                self._STATUS_LABELS,
                lambda value: value.title(),
            )
            content_type_options = self._build_options(
                'All Types',
                distinct_values('content_type'),
                self._TYPE_LABELS,
                lambda value: value.title(),
            )
            content_structure_options = self._build_options(
                'All Structures',
                distinct_values('content_structure'),
                self._STRUCTURE_LABELS,
                lambda value: value.replace('_', ' ').title(),
            )

            # Get clusters (filtered by other fields)
            from igny8_core.modules.planner.models import Clusters
            cluster_ids = list(set(
                scoped('cluster_id')
                .exclude(cluster_id__isnull=True)
                .values_list('cluster_id', flat=True)
            ))
            clusters = Clusters.objects.filter(id__in=cluster_ids).values('id', 'name').order_by('name')
            cluster_options = [
                {'value': '', 'label': 'All Clusters'},
            ] + [
                {'value': str(c['id']), 'label': c['name']}
                for c in clusters
            ]

            return success_response(
                data={
                    'statuses': status_options,
                    'content_types': content_type_options,
                    'content_structures': content_structure_options,
                    'clusters': cluster_options,
                },
                request=request
            )
        except Exception as e:
            logger.error(f"Error in filter_options: {str(e)}", exc_info=True)
            return error_response(
                error=f'Failed to fetch filter options: {str(e)}',
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                request=request
            )
@extend_schema_view(
    list=extend_schema(tags=['Writer']),
    create=extend_schema(tags=['Writer']),
    retrieve=extend_schema(tags=['Writer']),
    update=extend_schema(tags=['Writer']),
    partial_update=extend_schema(tags=['Writer']),
    destroy=extend_schema(tags=['Writer']),
)
class ImagesViewSet(SiteSectorModelViewSet):
    """
    ViewSet for managing content images
    Unified API Standard v1.0 compliant

    NOTE(review): the validation / validate actions and their two helpers at
    the end of this class are documented as content endpoints
    (/writer/content/{id}/...) yet are defined here, where get_object()
    resolves against the Images queryset - confirm whether they belong on
    ContentViewSet instead.
    """
    queryset = Images.objects.all()
    serializer_class = ImagesSerializer
    permission_classes = [IsAuthenticatedAndActive, IsViewerOrAbove]
    pagination_class = CustomPageNumberPagination
    throttle_scope = 'writer'
    throttle_classes = [DebugScopedRateThrottle]
    filter_backends = [DjangoFilterBackend, filters.OrderingFilter]
    ordering_fields = ['created_at', 'position', 'id']
    ordering = ['-id']  # Sort by ID descending (newest first)
    filterset_class = ImagesFilter

    @staticmethod
    def _requested_ids(request, key='ids'):
        """Return the ID list posted under *key*, or [] when absent or not a list."""
        ids = request.data.get(key, [])
        return list(ids) if isinstance(ids, (list, tuple)) else []

    @staticmethod
    def _overall_status(images):
        """Collapse per-image statuses into one of complete/failed/partial/pending."""
        if not images:
            return 'pending'
        if all(img.status == 'generated' for img in images):
            return 'complete'
        if any(img.status == 'failed' for img in images):
            return 'failed'
        if any(img.status == 'generated' for img in images):
            return 'partial'
        return 'pending'

    def perform_create(self, serializer):
        """Override to automatically set account, site, and sector.

        Site comes from the request, falling back to the user's active_site;
        sector comes from the request, falling back to the site's default
        sector.

        Raises:
            ValidationError: when no site or no sector can be determined.
        """
        from rest_framework.exceptions import ValidationError

        # Get site and sector from request (set by middleware) or user's active context
        site = getattr(self.request, 'site', None)
        sector = getattr(self.request, 'sector', None)
        if not site:
            # Fallback to user's active site if not set by middleware
            user = getattr(self.request, 'user', None)
            if user and user.is_authenticated and hasattr(user, 'active_site'):
                site = user.active_site
        if not sector and site:
            # Fallback to default sector for the site if not set by middleware
            sector = site.sectors.filter(is_default=True).first()

        # Site and sector are required - raise ValidationError if not available
        # Use dict format for ValidationError to ensure proper error structure
        if not site:
            raise ValidationError({"site": ["Site is required for image creation. Please select a site."]})
        if not sector:
            raise ValidationError({"sector": ["Sector is required for image creation. Please select a sector."]})

        # Add site and sector to validated_data so base class can validate access
        serializer.validated_data['site'] = site
        serializer.validated_data['sector'] = sector
        # Call parent to set account and validate access
        super().perform_create(serializer)

    @action(detail=True, methods=['get'], url_path='file', url_name='image_file')
    def serve_image_file(self, request, pk=None):
        """
        Serve image file from local path via URL
        GET /api/v1/writer/images/{id}/file/

        The image row is looked up directly on Images.objects (not through
        get_queryset()), so no account/site scoping is applied to the lookup.
        Responds 404 when the row, its image_path, or the file is missing and
        403 when the file is not readable.
        """
        import logging
        import mimetypes
        import os
        from django.http import FileResponse

        # Bound before first use: the logger is needed on the missing-file
        # path as well as in the outer exception handler.
        logger = logging.getLogger(__name__)
        try:
            # Get image directly without account filtering for file serving
            # This allows public access to image files
            try:
                image = Images.objects.get(pk=pk)
            except (Images.DoesNotExist, ValueError, TypeError):
                # ValueError/TypeError: pk in the URL is not a valid key.
                return error_response(
                    error='Image not found',
                    status_code=status.HTTP_404_NOT_FOUND,
                    request=request
                )
            # Check if image has a local path
            file_path = image.image_path
            if not file_path:
                return error_response(
                    error='No local file path available for this image',
                    status_code=status.HTTP_404_NOT_FOUND,
                    request=request
                )
            # Verify a regular file exists at the saved path
            if not os.path.isfile(file_path):
                logger.error(f"[serve_image_file] Image {pk} - File not found at saved path: {file_path}")
                # The filesystem path is logged, not returned, so server
                # layout is not disclosed to the client.
                return error_response(
                    error='Image file not found',
                    status_code=status.HTTP_404_NOT_FOUND,
                    request=request
                )
            # Check if file is readable
            if not os.access(file_path, os.R_OK):
                return error_response(
                    error='Image file is not readable',
                    status_code=status.HTTP_403_FORBIDDEN,
                    request=request
                )
            # Determine content type from file extension (default to PNG)
            content_type = mimetypes.guess_type(file_path)[0] or 'image/png'
            # Serve the file; FileResponse takes ownership of the open handle.
            try:
                return FileResponse(
                    open(file_path, 'rb'),
                    content_type=content_type,
                    filename=os.path.basename(file_path)
                )
            except Exception as e:
                return error_response(
                    error=f'Failed to serve file: {str(e)}',
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    request=request
                )
        except Exception as e:
            logger.error(f"Error serving image file: {str(e)}", exc_info=True)
            return error_response(
                error=f'Failed to serve image: {str(e)}',
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                request=request
            )

    @action(detail=False, methods=['post'], url_path='auto_generate', url_name='auto_generate_images')
    def auto_generate_images(self, request):
        """Auto-generate images for tasks using AI.

        Accepts at most 10 ids under 'task_ids'. Queues run_ai_task via
        .delay() when available, otherwise runs it synchronously. Responds
        503 when the task module cannot be imported or the queue is
        unreachable.
        """
        import logging
        logger = logging.getLogger(__name__)

        task_ids = self._requested_ids(request, 'task_ids')
        if not task_ids:
            return error_response(
                error='No task IDs provided',
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request
            )
        if len(task_ids) > 10:
            return error_response(
                error='Maximum 10 tasks allowed for image generation',
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request
            )
        # Get account
        account = getattr(request, 'account', None)
        account_id = account.id if account else None

        # Import separately from the execution try-block: the except clauses
        # below name KombuOperationalError, which must already be bound when
        # they are evaluated.
        try:
            from igny8_core.ai.tasks import run_ai_task
            from kombu.exceptions import OperationalError as KombuOperationalError
        except ImportError:
            # Tasks module not available
            return error_response(
                error='Image generation task not available',
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                request=request
            )

        # Try to queue Celery task, fall back to synchronous if Celery not available
        try:
            if hasattr(run_ai_task, 'delay'):
                # Celery is available - queue async task
                task = run_ai_task.delay(
                    function_name='generate_images',
                    payload={'ids': task_ids},
                    account_id=account_id
                )
                return success_response(
                    data={'task_id': str(task.id)},
                    message='Image generation started',
                    request=request
                )
            # Celery not available - execute synchronously
            result = run_ai_task(
                function_name='generate_images',
                payload={'ids': task_ids},
                account_id=account_id
            )
            if result.get('success'):
                return success_response(
                    data={'images_created': result.get('count', 0)},
                    message=result.get('message', 'Image generation completed'),
                    request=request
                )
            return error_response(
                error=result.get('error', 'Image generation failed'),
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                request=request
            )
        except KombuOperationalError:
            return error_response(
                error='Task queue unavailable. Please try again.',
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                request=request
            )
        except ImportError:
            # An import failed while the task itself was running
            return error_response(
                error='Image generation task not available',
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                request=request
            )
        except Exception as e:
            logger.error(f"Error queuing image generation task: {str(e)}", exc_info=True)
            return error_response(
                error=f'Failed to start image generation: {str(e)}',
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                request=request
            )

    @action(detail=False, methods=['post'], url_path='bulk_update', url_name='bulk_update')
    def bulk_update(self, request):
        """Bulk update image status by content_id or image IDs
        Updates all images for a content record (featured + 1-6 in-article images)

        'content_id' takes precedence over 'ids' when both are posted. Only
        images visible through get_queryset() are updated.
        """
        status_value = request.data.get('status')
        if not status_value:
            return error_response(
                error='No status provided',
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request
            )
        content_id = request.data.get('content_id')
        image_ids = self._requested_ids(request)
        queryset = self.get_queryset()

        # Update by content_id if provided, otherwise by image IDs
        if content_id:
            try:
                content = Content.objects.get(id=content_id)
            except (Content.DoesNotExist, ValueError, TypeError):
                # ValueError/TypeError: content_id is not a valid key.
                return error_response(
                    error='Content not found',
                    status_code=status.HTTP_404_NOT_FOUND,
                    request=request
                )
            # Update images linked directly to content (all images: featured + in-article)
            # Note: task field was removed in refactor - images now link directly to content
            updated_count = queryset.filter(content=content).update(status=status_value)
        elif image_ids:
            updated_count = queryset.filter(id__in=image_ids).update(status=status_value)
        else:
            return error_response(
                error='Either content_id or ids must be provided',
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request
            )
        return success_response(data={'updated_count': updated_count}, request=request)

    @action(detail=False, methods=['get'], url_path='content_images', url_name='content_images')
    def content_images(self, request):
        """Get images grouped by content - one row per content with featured and in-article images.

        Optional 'site_id' / 'sector_id' query parameters narrow the content
        set; values that are not integers are ignored. Rows are sorted by
        content title.
        """
        from collections import defaultdict
        from .serializers import ContentImageSerializer

        account = getattr(request, 'account', None)
        # Get site_id and sector_id from query parameters
        site_id = request.query_params.get('site_id')
        sector_id = request.query_params.get('sector_id')

        # Content that has at least one image linked directly to it
        # (Task field removed in Stage 1 - images link to content only)
        queryset = Content.objects.filter(images__isnull=False)
        if account:
            queryset = queryset.filter(account=account)
        # Apply site/sector filtering if provided; best-effort, a malformed
        # value leaves the result unfiltered on that dimension.
        if site_id:
            try:
                queryset = queryset.filter(site_id=int(site_id))
            except (ValueError, TypeError):
                pass
        if sector_id:
            try:
                queryset = queryset.filter(sector_id=int(sector_id))
            except (ValueError, TypeError):
                pass

        # Load each matching content row once. The pk subquery removes the
        # duplicates the images join produces.
        contents = list(Content.objects.filter(pk__in=queryset.values('pk')))

        # Load every image for those rows in a single query, grouped by
        # content and already ordered by position.
        images_by_content = defaultdict(list)
        image_rows = Images.objects.filter(
            content_id__in=[content.id for content in contents]
        ).order_by('position')
        for image in image_rows:
            images_by_content[image.content_id].append(image)

        # Request context is passed to the serializers for proper URL generation
        context = {'request': request}
        grouped_data = []
        for content in contents:
            all_images = images_by_content.get(content.id, [])
            featured_image = next(
                (img for img in all_images if img.image_type == 'featured'),
                None,
            )
            in_article_images = [img for img in all_images if img.image_type == 'in_article']
            grouped_data.append({
                'content_id': content.id,
                'content_title': content.title or content.meta_title or f"Content #{content.id}",
                'content_status': content.status,
                'featured_image': (
                    ContentImageSerializer(featured_image, context=context).data
                    if featured_image else None
                ),
                'in_article_images': [
                    ContentImageSerializer(img, context=context).data
                    for img in in_article_images
                ],
                'overall_status': self._overall_status(all_images),
            })

        # Sort by content title
        grouped_data.sort(key=lambda row: row['content_title'])
        return success_response(
            data={
                'count': len(grouped_data),
                'results': grouped_data
            },
            request=request
        )

    @action(detail=False, methods=['post'], url_path='generate_images', url_name='generate_images')
    def generate_images(self, request):
        """Generate images from prompts - queues Celery task for sequential processing.

        Requires image ids under 'ids'; 'content_id' is optional and passed
        through to the task unchanged.
        """
        import logging
        logger = logging.getLogger(__name__)

        account = getattr(request, 'account', None)
        image_ids = self._requested_ids(request)
        content_id = request.data.get('content_id')
        if not image_ids:
            return error_response(
                error='No image IDs provided',
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request
            )
        account_id = account.id if account else None
        # Queue Celery task
        try:
            # Imported inside the try so an import failure yields a
            # structured error response like any other failure here.
            from igny8_core.ai.tasks import process_image_generation_queue
            if hasattr(process_image_generation_queue, 'delay'):
                task = process_image_generation_queue.delay(
                    image_ids=image_ids,
                    account_id=account_id,
                    content_id=content_id
                )
                return success_response(
                    data={'task_id': str(task.id)},
                    message='Image generation started',
                    request=request
                )
            # Fallback to synchronous execution (for testing)
            result = process_image_generation_queue(
                image_ids=image_ids,
                account_id=account_id,
                content_id=content_id
            )
            return success_response(data=result, request=request)
        except Exception as e:
            logger.error(f"[generate_images] Error: {str(e)}", exc_info=True)
            return error_response(
                error=str(e),
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                request=request
            )

    @action(detail=True, methods=['get'], url_path='validation', url_name='validation')
    def validation(self, request, pk=None):
        """
        Stage 3: Get validation checklist for content.
        GET /api/v1/writer/content/{id}/validation/
        Returns aggregated validation checklist for Writer UI.
        """
        content = self.get_object()
        validation_service = ContentValidationService()
        errors = validation_service.validate_content(content)
        publish_errors = validation_service.validate_for_publish(content)
        return success_response(
            data={
                'content_id': content.id,
                'is_valid': len(errors) == 0,
                'ready_to_publish': len(publish_errors) == 0,
                'validation_errors': errors,
                'publish_errors': publish_errors,
                'metadata': {
                    'has_entity_type': bool(content.content_type),
                    'entity_type': content.content_type,
                    'has_cluster_mapping': self._has_cluster_mapping(content),
                    'has_taxonomy_mapping': self._has_taxonomy_mapping(content),
                }
            },
            request=request
        )

    @action(detail=True, methods=['post'], url_path='validate', url_name='validate')
    def validate(self, request, pk=None):
        """
        Stage 3: Re-run validators and return actionable errors.
        POST /api/v1/writer/content/{id}/validate/
        Re-validates content and returns structured errors.
        """
        content = self.get_object()
        validation_service = ContentValidationService()
        # Metadata is now persisted directly on content - the former
        # MetadataMappingService task-to-content step is deprecated and no
        # longer run here.
        errors = validation_service.validate_for_publish(content)
        return success_response(
            data={
                'content_id': content.id,
                'is_valid': len(errors) == 0,
                'errors': errors,
            },
            request=request
        )

    def _has_cluster_mapping(self, content):
        """Helper to check if content has cluster mapping"""
        from igny8_core.business.content.models import ContentClusterMap
        return ContentClusterMap.objects.filter(content=content).exists()

    def _has_taxonomy_mapping(self, content):
        """Helper to check if content has taxonomy mapping"""
        from igny8_core.business.content.models import ContentTaxonomyMap
        return ContentTaxonomyMap.objects.filter(content=content).exists()
@extend_schema_view(
list=extend_schema(tags=['Writer']),
create=extend_schema(tags=['Writer']),
retrieve=extend_schema(tags=['Writer']),
update=extend_schema(tags=['Writer']),
partial_update=extend_schema(tags=['Writer']),
destroy=extend_schema(tags=['Writer']),
)
class ContentViewSet(SiteSectorModelViewSet):
"""
ViewSet for managing content with new unified structure
Unified API Standard v1.0 compliant
Stage 1 Refactored - removed deprecated fields
"""
queryset = Content.objects.select_related('cluster', 'site', 'sector').prefetch_related('taxonomy_terms')
serializer_class = ContentSerializer
permission_classes = [IsAuthenticatedAndActive, IsViewerOrAbove]
pagination_class = CustomPageNumberPagination
throttle_scope = 'writer'
throttle_classes = [DebugScopedRateThrottle]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
search_fields = ['title', 'content_html', 'external_url']
ordering_fields = ['created_at', 'updated_at', 'status']
ordering = ['-created_at']
# Stage 1: removed task_id, entity_type, content_format, cluster_role, sync_status, external_type
# Use custom filterset for date range filtering
filterset_class = ContentFilter
def get_queryset(self):
    """Return the parent queryset, optionally narrowed by 'status__in'.

    'status__in' is a comma-separated list of statuses; blank entries are
    dropped, and the filter is skipped when nothing usable remains.
    """
    base = super().get_queryset()
    raw_statuses = self.request.query_params.get('status__in', None)
    if not raw_statuses:
        return base
    wanted = [part.strip() for part in raw_statuses.split(',') if part.strip()]
    if not wanted:
        return base
    return base.filter(status__in=wanted)
def perform_create(self, serializer):
    """Fill in word_count when missing and save with the resolved account.

    The account is taken from the request, falling back to the
    authenticated user's account. When no word_count was supplied but
    content_html was, the count is computed from the HTML with
    calculate_word_count and stored in the validated data. The account is
    passed to save() only when one was found.
    """
    request = self.request
    account = getattr(request, 'account', None)
    user = getattr(request, 'user', None)
    if not account and user and user.is_authenticated:
        account = getattr(user, 'account', None)

    data = serializer.validated_data
    word_count = data.get('word_count', 0)
    if not word_count and 'content_html' in data:
        # Derive the count with the shared word-counting utility.
        from igny8_core.utils.word_counter import calculate_word_count
        word_count = calculate_word_count(data.get('content_html', ''))
        data['word_count'] = word_count

    save_kwargs = {'account': account} if account else {}
    serializer.save(**save_kwargs)
@action(detail=False, methods=['get'], url_path='filter_options', url_name='filter_options')
def filter_options(self, request):
    """
    Get distinct filter values from current data with cascading support.

    For each filterable field (``status``, ``site_status``, ``content_type``,
    ``content_structure``, ``source``) the options returned are the distinct
    non-empty values that remain after applying every *other* active filter,
    so picking a value in one dropdown narrows what the others offer.
    ``status__in`` (comma-separated) and ``search`` (matched against title and
    summary) restrict all facets.

    Returns a success response whose ``data`` maps ``statuses``,
    ``site_statuses``, ``content_types``, ``content_structures`` and
    ``sources`` to lists of ``{'value', 'label'}`` dicts, each list starting
    with an empty-valued "All ..." entry. Any exception is logged and reported
    as a 500 error response.
    """
    import logging
    from django.db.models import Q
    logger = logging.getLogger(__name__)

    # One row per facet:
    # (response key, model field == query param, "all" label, known labels,
    #  whether unknown values get underscores turned into spaces before title-casing)
    facets = (
        ('statuses', 'status', 'All Content Status', {
            'draft': 'Draft',
            'review': 'Review',
            'approved': 'Approved',
            'published': 'Published',
        }, False),
        ('site_statuses', 'site_status', 'All Site Status', {
            'not_published': 'Not Published',
            'scheduled': 'Scheduled',
            'publishing': 'Publishing',
            'published': 'Published',
            'failed': 'Failed',
        }, True),
        ('content_types', 'content_type', 'All Types', {
            'post': 'Post',
            'page': 'Page',
            'product': 'Product',
            'taxonomy': 'Taxonomy',
        }, False),
        ('content_structures', 'content_structure', 'All Structures', {
            'article': 'Article', 'guide': 'Guide', 'comparison': 'Comparison',
            'review': 'Review', 'listicle': 'Listicle', 'landing_page': 'Landing Page',
            'business_page': 'Business Page', 'service_page': 'Service Page',
            'general': 'General', 'cluster_hub': 'Cluster Hub',
            'product_page': 'Product Page', 'category_archive': 'Category Archive',
            'tag_archive': 'Tag Archive', 'attribute_archive': 'Attribute Archive',
        }, True),
        ('sources', 'source', 'All Sources', {
            'igny8': 'IGNY8',
            'wordpress': 'WordPress',
        }, False),
    )

    try:
        params = request.query_params
        base_qs = self.get_queryset()

        # Apply base status__in filter to restrict entire result set (e.g., for Approved page)
        status_in_filter = params.get('status__in', '')
        if status_in_filter:
            status_list = [s.strip() for s in status_in_filter.split(',') if s.strip()]
            if status_list:
                base_qs = base_qs.filter(status__in=status_list)

        # Apply search to base queryset
        search = params.get('search', '')
        if search:
            base_qs = base_qs.filter(
                Q(title__icontains=search) | Q(summary__icontains=search)
            )

        # Currently selected value per facet field (empty selections are dropped).
        active_filters = {}
        for _key, field, _all_label, _labels, _humanize in facets:
            selected = params.get(field, '')
            if selected:
                active_filters[field] = selected

        data = {}
        for key, field, all_label, labels, humanize in facets:
            # Cascade: a facet is constrained by every active filter except its own.
            other_filters = {f: v for f, v in active_filters.items() if f != field}
            facet_qs = base_qs.filter(**other_filters) if other_filters else base_qs

            # Clear the default ordering so DISTINCT applies to this column only,
            # letting the database de-duplicate instead of loading every row.
            raw_values = facet_qs.order_by().values_list(field, flat=True).distinct()
            present = sorted({value for value in raw_values if value})

            options = [{'value': '', 'label': all_label}]
            for value in present:
                fallback = (value.replace('_', ' ') if humanize else value).title()
                options.append({'value': value, 'label': labels.get(value, fallback)})
            data[key] = options

        return success_response(data=data, request=request)
    except Exception as e:
        logger.error(f"Error in filter_options: {str(e)}", exc_info=True)
        return error_response(
            error=f'Failed to fetch filter options: {str(e)}',
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            request=request
        )
@action(detail=False, methods=['POST'], url_path='bulk_delete', url_name='bulk_delete')
def bulk_delete(self, request):
    """Delete every content row in this viewset's queryset whose id is in ``ids``.

    Responds with 400 when ``ids`` is missing or empty; otherwise returns the
    number of rows that matched before deletion as ``deleted_count``.
    """
    requested_ids = request.data.get('ids', [])
    if requested_ids:
        matching = self.get_queryset().filter(id__in=requested_ids)
        # Count first: the figure reported is what matched prior to deletion.
        removed = matching.count()
        matching.delete()  # Soft delete via SoftDeletableModel
        return success_response(data={'deleted_count': removed}, request=request)

    return error_response(
        error='No IDs provided',
        status_code=status.HTTP_400_BAD_REQUEST,
        request=request
    )
@action(detail=True, methods=['post'], url_path='publish', url_name='publish', permission_classes=[IsAuthenticatedAndActive, IsEditorOrAbove])
def publish(self, request, pk=None):
    """
    STAGE 3: Publish content to WordPress site via Celery task.
    Mirrors the automated publishing flow for manual publishing from Review page.

    POST /api/v1/writer/content/{id}/publish/
    {
        "site_integration_id": 1  // Optional - defaults to finding WordPress integration for content's site
    }

    Responses:
        202 - status optimistically set to 'published' and the Celery task queued.
        400 - content already has an ``external_id``, or no active WordPress
              integration exists for the content's site.
        404 - the requested ``site_integration_id`` does not match a WordPress
              integration of the content's site.
        500 - queuing the task failed; the content's previous status is restored.
    """
    from igny8_core.business.integration.models import SiteIntegration
    from igny8_core.tasks.wordpress_publishing import publish_content_to_wordpress
    import logging
    logger = logging.getLogger(__name__)

    content = self.get_object()

    # STAGE 3: Prevent duplicate publishing
    if content.external_id:
        return error_response(
            error='Content already published. Use WordPress to update or unpublish first.',
            status_code=status.HTTP_400_BAD_REQUEST,
            request=request,
            errors={'external_id': [f'Already published with ID: {content.external_id}']}
        )

    # Get site integration (use content's site if not specified)
    site_integration_id = request.data.get('site_integration_id')
    if not site_integration_id:
        # Find WordPress integration for this site (one query instead of exists() + first()).
        site_integration = SiteIntegration.objects.filter(
            site=content.site,
            platform='wordpress',
            is_active=True
        ).first()
        if site_integration is None:
            return error_response(
                error='No active WordPress integration found for this site',
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request,
                errors={'site_integration': ['WordPress integration is required to publish']}
            )
    else:
        # NOTE(review): unlike the default path, an explicitly requested
        # integration is not required to be active - confirm this is intended.
        try:
            site_integration = SiteIntegration.objects.get(
                id=site_integration_id,
                site=content.site,
                platform='wordpress'
            )
        except (SiteIntegration.DoesNotExist, ValueError, TypeError):
            # ValueError/TypeError: the supplied id is not a valid primary key value.
            return error_response(
                error=f'WordPress integration with id {site_integration_id} not found for this site',
                status_code=status.HTTP_404_NOT_FOUND,
                request=request
            )

    # OPTIMISTIC UPDATE: Set status to published immediately for better UX
    # The Celery task will update external_id and external_url when WordPress responds
    previous_status = content.status
    content.status = 'published'
    content.save(update_fields=['status', 'updated_at'])

    # Queue publishing task (same as automated flow). Only the queuing call is
    # guarded, so a successfully queued task is never followed by a status revert.
    try:
        result = publish_content_to_wordpress.delay(
            content_id=content.id,
            site_integration_id=site_integration.id
        )
    except Exception as e:
        logger.error(f"[ContentViewSet.publish] Error queuing publish task: {str(e)}", exc_info=True)
        # Restore whatever status the content had before the optimistic update.
        content.status = previous_status
        content.save(update_fields=['status', 'updated_at'])
        return error_response(
            error=f"Failed to queue publishing task: {str(e)}",
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            request=request
        )

    logger.info(f"[ContentViewSet.publish] Queued Celery task {result.id} for content {content.id}, status set to 'published'")
    return success_response(
        data={
            'content_id': content.id,
            'task_id': result.id,
            'status': 'published',
            'message': 'Publishing queued - content will be published to WordPress shortly'
        },
        message='Content status updated to published and queued for WordPress',
        request=request,
        status_code=status.HTTP_202_ACCEPTED
    )
@action(detail=True, methods=['get'], url_path='wordpress_status', url_name='wordpress_status')
def wordpress_status(self, request, pk=None):
    """
    Report the live WordPress post status for a piece of content.

    GET /api/v1/writer/content/{id}/wordpress_status/

    Content without an ``external_id`` yields a success response with null
    fields. Otherwise the site's active WordPress integration is queried over
    HTTP (10 second timeout) and the response carries:
        'wordpress_status', 'external_id', 'external_url',
        'post_title', 'post_modified', 'last_checked'

    Errors: 404 when no active integration exists, 502 when WordPress answers
    with a non-200 status or the request itself fails, 500 for anything else.
    """
    import logging
    import requests
    from django.utils import timezone
    from igny8_core.business.integration.models import SiteIntegration
    log = logging.getLogger(__name__)

    content = self.get_object()

    # Nothing to look up if the content was never pushed to WordPress.
    if not content.external_id:
        return success_response(
            data={
                'wordpress_status': None,
                'external_id': None,
                'external_url': None,
                'message': 'Content not published to WordPress yet'
            },
            request=request
        )

    try:
        integration = SiteIntegration.objects.filter(
            site=content.site,
            platform='wordpress',
            is_active=True
        ).first()
        if not integration:
            return error_response(
                error='No active WordPress integration found',
                status_code=status.HTTP_404_NOT_FOUND,
                request=request
            )

        # Ask the WordPress-side endpoint for the current post status.
        endpoint = f"{integration.site_url}/wp-json/igny8/v1/post-status/{content.external_id}/"
        wp_response = requests.get(
            endpoint,
            headers={'X-IGNY8-API-KEY': integration.api_key},
            timeout=10,
        )

        if wp_response.status_code != 200:
            log.error(f"WordPress API error: {wp_response.status_code} - {wp_response.text}")
            return error_response(
                error=f'Failed to get WordPress status: {wp_response.status_code}',
                status_code=status.HTTP_502_BAD_GATEWAY,
                request=request
            )

        post_info = wp_response.json().get('data', {})
        return success_response(
            data={
                'wordpress_status': post_info.get('post_status'),
                'external_id': content.external_id,
                'external_url': content.external_url,
                'post_title': post_info.get('post_title'),
                'post_modified': post_info.get('post_modified'),
                'last_checked': timezone.now().isoformat()
            },
            request=request
        )
    except requests.RequestException as e:
        log.error(f"Request to WordPress failed: {str(e)}")
        return error_response(
            error=f'Connection to WordPress failed: {str(e)}',
            status_code=status.HTTP_502_BAD_GATEWAY,
            request=request
        )
    except Exception as e:
        log.error(f"Error getting WordPress status: {str(e)}", exc_info=True)
        return error_response(
            error=str(e),
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            request=request
        )
@action(detail=True, methods=['post'], url_path='unpublish', url_name='unpublish', permission_classes=[IsAuthenticatedAndActive, IsEditorOrAbove])
def unpublish(self, request, pk=None):
    """
    STAGE 3: Detach content from its WordPress post and set it back to draft.

    Only the local link (``external_id`` / ``external_url``) is cleared; no
    call is made to WordPress, so the remote post is left in place.

    POST /api/v1/writer/content/{id}/unpublish/

    Responds with 400 when the content has no ``external_id``. On success the
    previous link values are echoed back as ``was_external_id`` and
    ``was_external_url``.
    """
    content = self.get_object()

    # Remember the link being removed so it can be reported to the caller.
    previous_id = content.external_id
    previous_url = content.external_url

    if not previous_id:
        return error_response(
            error='Content is not published',
            status_code=status.HTTP_400_BAD_REQUEST,
            request=request
        )

    content.external_id = None
    content.external_url = None
    content.status = 'draft'
    content.save(update_fields=['external_id', 'external_url', 'status', 'updated_at'])

    payload = {
        'content_id': content.id,
        'status': content.status,
        'was_external_id': previous_id,
        'was_external_url': previous_url,
    }
    return success_response(
        data=payload,
        message='Content unpublished successfully. WordPress post was not deleted.',
        request=request
    )
@action(detail=True, methods=['post'], url_path='schedule', url_name='schedule', permission_classes=[IsAuthenticatedAndActive, IsEditorOrAbove])
def schedule(self, request, pk=None):
    """
    Schedule content for publishing at a specific date/time.
    Sets site_status to 'scheduled' and scheduled_publish_at to the provided datetime.

    POST /api/v1/writer/content/{id}/schedule/
    {
        "scheduled_publish_at": "2026-01-15T09:00:00Z"  // Required: ISO 8601 datetime
    }

    A datetime without a UTC offset is interpreted in Django's current time
    zone (same handling as ``reschedule``).

    Responds with 400 when the content is not 'approved', is already
    published on the site, the datetime is missing/unparseable, or the
    datetime is not in the future.
    """
    from django.utils import timezone
    from django.utils.dateparse import parse_datetime
    import logging
    logger = logging.getLogger(__name__)

    content = self.get_object()

    # Validate content status - must be approved to schedule
    if content.status != 'approved':
        return error_response(
            error=f'Only approved content can be scheduled. Current status: {content.status}',
            status_code=status.HTTP_400_BAD_REQUEST,
            request=request
        )

    # Check if already published
    if content.site_status == 'published':
        return error_response(
            error='Content is already published',
            status_code=status.HTTP_400_BAD_REQUEST,
            request=request
        )

    # Get scheduled_publish_at from request
    scheduled_at_str = request.data.get('scheduled_publish_at')
    if not scheduled_at_str:
        return error_response(
            error='scheduled_publish_at is required',
            status_code=status.HTTP_400_BAD_REQUEST,
            request=request
        )

    scheduled_at = None
    if isinstance(scheduled_at_str, str):  # non-string payloads fall through to the 400 below
        # Normalise a trailing 'Z' (UTC designator) to an explicit offset before parsing.
        if scheduled_at_str.endswith('Z'):
            scheduled_at_str = scheduled_at_str[:-1] + '+00:00'
        try:
            scheduled_at = parse_datetime(scheduled_at_str)
        except ValueError:
            # parse_datetime raises for well-formed but impossible values (e.g. month 13).
            scheduled_at = None

    if not scheduled_at:
        return error_response(
            error='Invalid datetime format. Use ISO 8601 format (e.g., 2026-01-15T09:00:00Z)',
            status_code=status.HTTP_400_BAD_REQUEST,
            request=request
        )

    # A naive value cannot be compared with the aware timezone.now() below.
    if timezone.is_naive(scheduled_at):
        scheduled_at = timezone.make_aware(scheduled_at)

    # Ensure datetime is in the future
    if scheduled_at <= timezone.now():
        return error_response(
            error='Scheduled time must be in the future',
            status_code=status.HTTP_400_BAD_REQUEST,
            request=request
        )

    # Update content
    content.site_status = 'scheduled'
    content.scheduled_publish_at = scheduled_at
    content.site_status_updated_at = timezone.now()
    content.save(update_fields=['site_status', 'scheduled_publish_at', 'site_status_updated_at', 'updated_at'])

    logger.info(f"[ContentViewSet.schedule] Content {content.id} scheduled for {scheduled_at}")

    return success_response(
        data={
            'content_id': content.id,
            'site_status': content.site_status,
            'scheduled_publish_at': content.scheduled_publish_at.isoformat(),
        },
        message=f'Content scheduled for {scheduled_at.strftime("%Y-%m-%d %H:%M")}',
        request=request
    )
@action(detail=True, methods=['post'], url_path='unschedule', url_name='unschedule', permission_classes=[IsAuthenticatedAndActive, IsEditorOrAbove])
def unschedule(self, request, pk=None):
    """
    Take content off the publishing schedule.

    Resets ``site_status`` to 'not_published' and clears
    ``scheduled_publish_at``. Allowed only while ``site_status`` is
    'scheduled' or 'publishing'; any other state yields a 400.

    POST /api/v1/writer/content/{id}/unschedule/
    """
    import logging
    from django.utils import timezone
    log = logging.getLogger(__name__)

    content = self.get_object()

    if content.site_status not in ('scheduled', 'publishing'):
        return error_response(
            error=f'Content is not scheduled. Current site_status: {content.site_status}',
            status_code=status.HTTP_400_BAD_REQUEST,
            request=request
        )

    # Keep the previous slot so it can be logged and reported back.
    previous_slot = content.scheduled_publish_at

    content.site_status = 'not_published'
    content.scheduled_publish_at = None
    content.site_status_updated_at = timezone.now()
    content.save(update_fields=['site_status', 'scheduled_publish_at', 'site_status_updated_at', 'updated_at'])

    log.info(f"[ContentViewSet.unschedule] Content {content.id} removed from schedule (was {previous_slot})")

    previous_slot_iso = previous_slot.isoformat() if previous_slot else None
    return success_response(
        data={
            'content_id': content.id,
            'site_status': content.site_status,
            'was_scheduled_for': previous_slot_iso,
        },
        message='Content removed from publishing schedule',
        request=request
    )
@action(detail=True, methods=['post'], url_path='reschedule', url_name='reschedule', permission_classes=[IsAuthenticatedAndActive, IsEditorOrAbove])
def reschedule(self, request, pk=None):
    """
    Put content (back) on the publishing schedule at a new time.

    Works from any current ``site_status``: the status becomes 'scheduled'
    and ``scheduled_publish_at`` is replaced with the supplied datetime.
    A datetime without an offset is made aware via ``timezone.make_aware``.

    POST /api/v1/writer/content/{id}/reschedule/
    {
        "scheduled_at": "2026-01-20T14:00:00Z"  // ISO 8601 datetime
    }

    Responds with 400 when ``scheduled_at`` is missing, unparseable, or not in
    the future.
    """
    import logging
    from dateutil import parser
    from django.utils import timezone
    log = logging.getLogger(__name__)

    content = self.get_object()

    raw_when = request.data.get('scheduled_at')
    if not raw_when:
        return error_response(
            error='scheduled_at is required (ISO 8601 datetime)',
            status_code=status.HTTP_400_BAD_REQUEST,
            request=request
        )

    try:
        new_slot = parser.isoparse(raw_when)
        if new_slot.tzinfo is None:
            new_slot = timezone.make_aware(new_slot)
    except (ValueError, TypeError) as e:
        return error_response(
            error=f'Invalid datetime format: {str(e)}. Use ISO 8601 format.',
            status_code=status.HTTP_400_BAD_REQUEST,
            request=request
        )

    if new_slot <= timezone.now():
        return error_response(
            error='Scheduled time must be in the future',
            status_code=status.HTTP_400_BAD_REQUEST,
            request=request
        )

    # Snapshot the state being replaced for the log line and the response.
    prior_status = content.site_status
    prior_slot = content.scheduled_publish_at

    content.site_status = 'scheduled'
    content.scheduled_publish_at = new_slot
    content.site_status_updated_at = timezone.now()
    content.save(update_fields=['site_status', 'scheduled_publish_at', 'site_status_updated_at', 'updated_at'])

    log.info(f"[ContentViewSet.reschedule] Content {content.id} rescheduled from {prior_status} (was {prior_slot}) to {new_slot}")

    prior_slot_iso = prior_slot.isoformat() if prior_slot else None
    return success_response(
        data={
            'content_id': content.id,
            'site_status': content.site_status,
            'scheduled_publish_at': content.scheduled_publish_at.isoformat(),
            'previous_status': prior_status,
            'was_scheduled_for': prior_slot_iso,
        },
        message=f'Content rescheduled for {new_slot.strftime("%Y-%m-%d %H:%M")}',
        request=request
    )
def _bulk_schedule_plan(self, request):
    """Validate a bulk-scheduling payload and work out the first publish slot.

    Shared by ``bulk_schedule_preview`` and ``bulk_schedule`` so that the
    preview always reflects what the real operation will do.

    Returns ``(plan, error)``. On failure ``plan`` is ``None`` and ``error``
    is a ready-made error response (400 for a missing field, 404 for an
    unknown site). On success ``error`` is ``None`` and ``plan`` is a dict
    with ``content_ids``, ``base_time``, ``stagger_interval``, ``timezone``,
    ``now`` and ``start`` (the datetime of the first slot).
    """
    from datetime import datetime, timedelta
    from django.utils import timezone
    from igny8_core.business.integration.models import Site, SitePublishingSettings

    content_ids = request.data.get('content_ids', [])
    use_site_defaults = request.data.get('use_site_defaults', True)
    site_id = request.data.get('site_id')

    if not content_ids:
        return None, error_response(
            error='content_ids is required',
            status_code=status.HTTP_400_BAD_REQUEST,
            request=request
        )
    if not site_id:
        return None, error_response(
            error='site_id is required',
            status_code=status.HTTP_400_BAD_REQUEST,
            request=request
        )

    # Get site and publishing settings
    # NOTE(review): the site is looked up by id only, without the account
    # filter that generate_product applies - confirm that is acceptable.
    try:
        site = Site.objects.get(id=site_id)
    except (Site.DoesNotExist, ValueError, TypeError):
        # ValueError/TypeError: site_id is not a valid primary key value.
        return None, error_response(
            error=f'Site {site_id} not found',
            status_code=status.HTTP_404_NOT_FOUND,
            request=request
        )
    pub_settings = SitePublishingSettings.objects.filter(site=site).first()

    # Default settings if none exist
    base_time_str = '09:00 AM'
    stagger_interval = 15  # minutes
    timezone_str = 'America/New_York'
    if pub_settings and use_site_defaults:
        base_time_str = pub_settings.auto_publish_time or base_time_str
        stagger_interval = pub_settings.stagger_interval_minutes or stagger_interval
        timezone_str = pub_settings.timezone or timezone_str

    # Parse base time (format: "09:00 AM" or "14:30"); fall back to 09:00 if malformed.
    time_format = '%I:%M %p' if ('AM' in base_time_str or 'PM' in base_time_str) else '%H:%M'
    try:
        time_obj = datetime.strptime(base_time_str, time_format).time()
    except ValueError:
        time_obj = datetime.strptime('09:00', '%H:%M').time()

    # First slot: today at the base time, or tomorrow if that moment has passed.
    # NOTE(review): the base time is applied to timezone.now() as-is; the
    # configured timezone string is only echoed back to the client, never used
    # to localise the slot - confirm that is the intended behaviour.
    now = timezone.now()
    start_date = now.replace(hour=time_obj.hour, minute=time_obj.minute, second=0, microsecond=0)
    if start_date <= now:
        start_date += timedelta(days=1)

    return {
        'content_ids': content_ids,
        'base_time': base_time_str,
        'stagger_interval': stagger_interval,
        'timezone': timezone_str,
        'now': now,
        'start': start_date,
    }, None

@action(detail=False, methods=['post'], url_path='bulk_schedule_preview', url_name='bulk_schedule_preview', permission_classes=[IsAuthenticatedAndActive, IsEditorOrAbove])
def bulk_schedule_preview(self, request):
    """
    Preview bulk scheduling with site default settings.
    Shows what the schedule would look like before confirming; nothing is saved.

    POST /api/v1/writer/content/bulk_schedule_preview/
    {
        "content_ids": [123, 124, 125],
        "use_site_defaults": true,  // Optional, defaults to true (same as bulk_schedule)
        "site_id": 45
    }

    Returns ``scheduled_count``, a ``schedule_preview`` list of
    ``{content_id, title, scheduled_at}`` entries spaced by the stagger
    interval, and the ``site_settings`` used. 400/404 as for bulk_schedule.
    """
    from datetime import timedelta
    import logging
    logger = logging.getLogger(__name__)

    plan, error = self._bulk_schedule_plan(request)
    if error is not None:
        return error

    # Only content visible through this viewset's queryset is included.
    content_qs = self.get_queryset().filter(id__in=plan['content_ids'])
    schedule_preview = [
        {
            'content_id': content.id,
            'title': content.title,
            'scheduled_at': (
                plan['start'] + timedelta(minutes=plan['stagger_interval'] * index)
            ).isoformat(),
        }
        for index, content in enumerate(content_qs)
    ]

    logger.info(f"[bulk_schedule_preview] Generated preview for {len(schedule_preview)} items")

    return success_response(
        data={
            'scheduled_count': len(schedule_preview),
            'schedule_preview': schedule_preview,
            'site_settings': {
                'base_time': plan['base_time'],
                'stagger_interval': plan['stagger_interval'],
                'timezone': plan['timezone'],
            },
        },
        message=f'Preview generated for {len(schedule_preview)} items',
        request=request
    )

@action(detail=False, methods=['post'], url_path='bulk_schedule', url_name='bulk_schedule', permission_classes=[IsAuthenticatedAndActive, IsEditorOrAbove])
def bulk_schedule(self, request):
    """
    Bulk schedule multiple content items using site default settings.

    POST /api/v1/writer/content/bulk_schedule/
    {
        "content_ids": [123, 124, 125],
        "use_site_defaults": true,
        "site_id": 45
    }

    Each matching item gets ``site_status='scheduled'`` and a slot spaced by
    the stagger interval from the first slot. All items are saved inside one
    transaction, so a failure part-way through leaves no item rescheduled.

    Responds with 400 when ``content_ids`` or ``site_id`` is missing and 404
    when the site does not exist.
    """
    from datetime import timedelta
    import logging
    logger = logging.getLogger(__name__)

    plan, error = self._bulk_schedule_plan(request)
    if error is not None:
        return error

    # Only content visible through this viewset's queryset is scheduled.
    content_qs = self.get_queryset().filter(id__in=plan['content_ids'])

    scheduled_count = 0
    with transaction.atomic():
        for index, content in enumerate(content_qs):
            content.site_status = 'scheduled'
            content.scheduled_publish_at = plan['start'] + timedelta(
                minutes=plan['stagger_interval'] * index
            )
            content.site_status_updated_at = plan['now']
            content.save(update_fields=['site_status', 'scheduled_publish_at', 'site_status_updated_at', 'updated_at'])
            scheduled_count += 1

    logger.info(f"[bulk_schedule] Scheduled {scheduled_count} content items")

    return success_response(
        data={
            'scheduled_count': scheduled_count,
        },
        message=f'Successfully scheduled {scheduled_count} items',
        request=request
    )
@action(detail=False, methods=['post'], url_path='generate_image_prompts', url_name='generate_image_prompts')
def generate_image_prompts(self, request):
    """Start image-prompt generation for the content records listed in ``ids``.

    The work is handed to ``run_ai_task``: queued via ``.delay`` when that
    attribute exists (response carries ``task_id``), otherwise run inline
    (response carries ``prompts_created``). Responds with 400 when ``ids`` is
    missing or empty and 500 when the task reports failure or raises.
    """
    from igny8_core.ai.tasks import run_ai_task

    account = getattr(request, 'account', None)
    ids = request.data.get('ids', [])
    if not ids:
        return error_response(
            error='No IDs provided',
            status_code=status.HTTP_400_BAD_REQUEST,
            request=request
        )

    # Identical arguments are used for both the queued and the inline path.
    task_kwargs = {
        'function_name': 'generate_image_prompts',
        'payload': {'ids': ids},
        'account_id': account.id if account else None,
    }

    try:
        if hasattr(run_ai_task, 'delay'):
            queued = run_ai_task.delay(**task_kwargs)
            return success_response(
                data={'task_id': str(queued.id)},
                message='Image prompt generation started',
                request=request
            )

        # Fallback to synchronous execution
        outcome = run_ai_task(**task_kwargs)
        if not outcome.get('success'):
            return error_response(
                error=outcome.get('error', 'Image prompt generation failed'),
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                request=request
            )
        return success_response(
            data={'prompts_created': outcome.get('count', 0)},
            message='Image prompts generated successfully',
            request=request
        )
    except Exception as e:
        return error_response(
            error=str(e),
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            request=request
        )
@action(detail=True, methods=['get'], url_path='validation', url_name='validation')
def validation(self, request, pk=None):
    """
    Stage 3: Return the validation checklist for one piece of content.

    GET /api/v1/writer/content/{id}/validation/

    Runs both the general and the publish-readiness validators and reports
    their errors together with a few metadata flags (content type present,
    cluster mapping present, taxonomy mapping present) for the Writer UI.
    """
    content = self.get_object()
    validator = ContentValidationService()

    general_errors = validator.validate_content(content)
    publish_blockers = validator.validate_for_publish(content)

    metadata = {
        'has_entity_type': bool(content.content_type),
        'entity_type': content.content_type,
        'has_cluster_mapping': self._has_cluster_mapping(content),
        'has_taxonomy_mapping': self._has_taxonomy_mapping(content),
    }
    checklist = {
        'content_id': content.id,
        'is_valid': len(general_errors) == 0,
        'ready_to_publish': len(publish_blockers) == 0,
        'validation_errors': general_errors,
        'publish_errors': publish_blockers,
        'metadata': metadata,
    }
    return success_response(data=checklist, request=request)
@action(detail=True, methods=['post'], url_path='validate', url_name='validate')
def validate(self, request, pk=None):
    """
    Stage 3: Re-run the publish validators and return their errors.

    POST /api/v1/writer/content/{id}/validate/

    Returns ``content_id``, ``is_valid`` and the list of ``errors``.
    """
    content = self.get_object()

    # Metadata is persisted directly on content, so no task-metadata mapping
    # step (the deprecated MetadataMappingService) runs before validating.
    publish_errors = ContentValidationService().validate_for_publish(content)

    result = {
        'content_id': content.id,
        'is_valid': len(publish_errors) == 0,
        'errors': publish_errors,
    }
    return success_response(data=result, request=request)
def _has_cluster_mapping(self, content):
    """Return True when at least one ContentClusterMap row references ``content``."""
    from igny8_core.business.content.models import ContentClusterMap
    cluster_links = ContentClusterMap.objects.filter(content=content)
    return cluster_links.exists()
def _has_taxonomy_mapping(self, content):
    """Return True when at least one ContentTaxonomyMap row references ``content``."""
    from igny8_core.business.content.models import ContentTaxonomyMap
    taxonomy_links = ContentTaxonomyMap.objects.filter(content=content)
    return taxonomy_links.exists()
@action(detail=False, methods=['post'], url_path='generate_product', url_name='generate_product')
def generate_product(self, request):
"""
Generate product content (Phase 8).
POST /api/v1/writer/content/generate_product/
{
"name": "Product Name",
"description": "Product description",
"features": ["Feature 1", "Feature 2"],
"target_audience": "Target audience",
"primary_keyword": "Primary keyword",
"site_id": 1, // optional
"sector_id": 1 // optional
}
"""
from igny8_core.business.content.services.content_generation_service import ContentGenerationService
from igny8_core.auth.models import Site, Sector
account = getattr(request, 'account', None)
if not account:
return error_response(
error='Account not found',
status_code=status.HTTP_400_BAD_REQUEST,
request=request
)
product_data = request.data
site_id = product_data.get('site_id')
sector_id = product_data.get('sector_id')
site = None
sector = None
if site_id:
try:
site = Site.objects.get(id=site_id, account=account)
except Site.DoesNotExist:
return error_response(
error='Site not found',
status_code=status.HTTP_404_NOT_FOUND,
request=request
)
if sector_id:
try:
sector = Sector.objects.get(id=sector_id, account=account)
except Sector.DoesNotExist:
return error_response(
error='Sector not found',
status_code=status.HTTP_404_NOT_FOUND,
request=request
)
service = ContentGenerationService()
try:
result = service.generate_product_content(
product_data=product_data,
account=account,
site=site,
sector=sector
)
if result.get('success'):
return success_response(
data={'task_id': result.get('task_id')},
message=result.get('message', 'Product content generation started'),
request=request
)
else:
return error_response(
error=result.get('error', 'Product content generation failed'),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
except Exception as e:
return error_response(
error=str(e),
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
@action(detail=True, methods=['get'], url_path='validation', url_name='validation')
def validation(self, request, pk=None):
"""
Stage 3: Get validation checklist for content.
GET /api/v1/writer/content/{id}/validation/
Returns aggregated validation checklist for Writer UI.
"""
content = self.get_object()
validation_service = ContentValidationService()
errors = validation_service.validate_content(content)
publish_errors = validation_service.validate_for_publish(content)
return success_response(
data={
'content_id': content.id,
'is_valid': len(errors) == 0,
'ready_to_publish': len(publish_errors) == 0,
'validation_errors': errors,
'publish_errors': publish_errors,
'metadata': {
'has_entity_type': bool(content.content_type),
'entity_type': content.content_type,
'has_cluster_mapping': self._has_cluster_mapping(content),
'has_taxonomy_mapping': self._has_taxonomy_mapping(content),
}
},
request=request
)
@action(detail=True, methods=['post'], url_path='validate', url_name='validate')
def validate(self, request, pk=None):
"""
Stage 3: Re-run validators and return actionable errors.
POST /api/v1/writer/content/{id}/validate/
Re-validates content and returns structured errors.
"""
content = self.get_object()
validation_service = ContentValidationService()
# Persist metadata mappings if task exists
# Metadata is now persisted directly on content - no task linkage needed
# mapping_service = MetadataMappingService() # DEPRECATED
# mapping_service.persist_task_metadata_to_content(content) # DEPRECATED
errors = validation_service.validate_for_publish(content)
return success_response(
data={
'content_id': content.id,
'is_valid': len(errors) == 0,
'errors': errors,
},
request=request
)
def _has_cluster_mapping(self, content):
    """Report whether any ContentClusterMap row is linked to ``content``."""
    # Imported lazily, as in the rest of this module's helpers.
    from igny8_core.business.content.models import ContentClusterMap

    cluster_links = ContentClusterMap.objects.filter(content=content)
    return cluster_links.exists()
def _has_taxonomy_mapping(self, content):
    """Report whether ``content`` has at least one taxonomy term attached.

    Uses the ``taxonomy_terms`` relation on the content object, matching the
    later definition of this helper in the same file, instead of querying the
    ``ContentTaxonomyMap`` model.
    """
    # NOTE(review): this helper is defined more than once in this part of the
    # file; the final copy already checks the M2M relation - confirm the
    # duplicates can be removed.
    return content.taxonomy_terms.exists()
@action(detail=False, methods=['post'], url_path='generate_service', url_name='generate_service')
def generate_service(self, request):
    """
    Generate service page content (Phase 8).

    POST /api/v1/writer/content/generate_service/
    {
        "name": "Service Name",
        "description": "Service description",
        "benefits": ["Benefit 1", "Benefit 2"],
        "target_audience": "Target audience",
        "primary_keyword": "Primary keyword",
        "site_id": 1,  // optional
        "sector_id": 1  // optional
    }

    Responses:
        - success: ``task_id`` and message taken from the result of
          ``ContentGenerationService.generate_service_page``.
        - 400: no account on the request, or a malformed ``site_id`` /
          ``sector_id``.
        - 404: the site or sector does not exist for the request's account.
        - 500: the service reported failure or raised an exception.
    """
    from igny8_core.business.content.services.content_generation_service import ContentGenerationService
    from igny8_core.auth.models import Site, Sector

    account = getattr(request, 'account', None)
    if not account:
        return error_response(
            error='Account not found',
            status_code=status.HTTP_400_BAD_REQUEST,
            request=request
        )

    service_data = request.data
    site_id = service_data.get('site_id')
    sector_id = service_data.get('sector_id')
    site = None
    sector = None

    if site_id:
        try:
            site = Site.objects.get(id=site_id, account=account)
        except Site.DoesNotExist:
            return error_response(
                error='Site not found',
                status_code=status.HTTP_404_NOT_FOUND,
                request=request
            )
        except (TypeError, ValueError):
            # A value that cannot be coerced to the primary-key type makes the
            # ORM raise instead of DoesNotExist; report it as a client error.
            return error_response(
                error='Invalid site_id',
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request
            )

    if sector_id:
        try:
            sector = Sector.objects.get(id=sector_id, account=account)
        except Sector.DoesNotExist:
            return error_response(
                error='Sector not found',
                status_code=status.HTTP_404_NOT_FOUND,
                request=request
            )
        except (TypeError, ValueError):
            return error_response(
                error='Invalid sector_id',
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request
            )

    service = ContentGenerationService()
    try:
        result = service.generate_service_page(
            service_data=service_data,
            account=account,
            site=site,
            sector=sector
        )
    except Exception as e:
        # API boundary: any failure inside the service becomes a 500 response.
        return error_response(
            error=str(e),
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            request=request
        )

    if result.get('success'):
        return success_response(
            data={'task_id': result.get('task_id')},
            message=result.get('message', 'Service page generation started'),
            request=request
        )
    return error_response(
        error=result.get('error', 'Service page generation failed'),
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        request=request
    )
@action(detail=True, methods=['get'], url_path='validation', url_name='validation')
def validation(self, request, pk=None):
    """
    Stage 3: Validation checklist for one content item.

    GET /api/v1/writer/content/{id}/validation/

    Combines the results of ``validate_content`` and ``validate_for_publish``
    (ContentValidationService) with the cluster/taxonomy mapping indicators
    for the object resolved by ``get_object()``.
    """
    # NOTE(review): a method named `validation` appears more than once in this
    # part of the file; if every copy belongs to the same class only the last
    # definition is bound - confirm and remove the duplicates.
    content = self.get_object()
    validator = ContentValidationService()
    general_errors = validator.validate_content(content)
    publishing_errors = validator.validate_for_publish(content)

    response_data = {
        'content_id': content.id,
        'is_valid': len(general_errors) == 0,
        'ready_to_publish': len(publishing_errors) == 0,
        'validation_errors': general_errors,
        'publish_errors': publishing_errors,
    }
    # Added last so the payload keeps its original key order.
    response_data['metadata'] = {
        'has_entity_type': bool(content.content_type),
        'entity_type': content.content_type,
        'has_cluster_mapping': self._has_cluster_mapping(content),
        'has_taxonomy_mapping': self._has_taxonomy_mapping(content),
    }
    return success_response(data=response_data, request=request)
@action(detail=True, methods=['post'], url_path='validate', url_name='validate')
def validate(self, request, pk=None):
    """
    Stage 3: Re-run the publish validators for one content item.

    POST /api/v1/writer/content/{id}/validate/

    Responds with the content id, an ``is_valid`` flag and the errors
    returned by ``ContentValidationService.validate_for_publish``.
    """
    # NOTE(review): a method named `validate` appears more than once in this
    # part of the file; if every copy belongs to the same class only the last
    # definition is bound - confirm and remove the duplicates.
    content = self.get_object()
    # The MetadataMappingService task-to-content persistence step is
    # deprecated: metadata is now persisted directly on content.
    found_errors = ContentValidationService().validate_for_publish(content)
    body = {
        'content_id': content.id,
        'is_valid': len(found_errors) == 0,
        'errors': found_errors,
    }
    return success_response(data=body, request=request)
def _has_cluster_mapping(self, content):
    """Tell whether a ContentClusterMap entry exists for ``content``."""
    from igny8_core.business.content.models import ContentClusterMap

    mappings_for_content = ContentClusterMap.objects.filter(content=content)
    return mappings_for_content.exists()
def _has_taxonomy_mapping(self, content):
    """Tell whether ``content`` has at least one taxonomy term attached.

    Checks the ``taxonomy_terms`` relation on the content object so this copy
    agrees with the later definition of the helper in the same file, rather
    than querying the ``ContentTaxonomyMap`` model.
    """
    # NOTE(review): this helper is defined more than once in this part of the
    # file; the final copy already checks the M2M relation - confirm the
    # duplicates can be removed.
    return content.taxonomy_terms.exists()
@action(detail=False, methods=['post'], url_path='generate_taxonomy', url_name='generate_taxonomy')
def generate_taxonomy(self, request):
    """
    Generate taxonomy page content (Phase 8).

    POST /api/v1/writer/content/generate_taxonomy/
    {
        "name": "Taxonomy Name",
        "description": "Taxonomy description",
        "items": ["Item 1", "Item 2"],
        "primary_keyword": "Primary keyword",
        "site_id": 1,  // optional
        "sector_id": 1  // optional
    }

    Responses:
        - success: ``task_id`` and message taken from the result of
          ``ContentGenerationService.generate_taxonomy``.
        - 400: no account on the request, or a malformed ``site_id`` /
          ``sector_id``.
        - 404: the site or sector does not exist for the request's account.
        - 500: the service reported failure or raised an exception.
    """
    from igny8_core.business.content.services.content_generation_service import ContentGenerationService
    from igny8_core.auth.models import Site, Sector

    account = getattr(request, 'account', None)
    if not account:
        return error_response(
            error='Account not found',
            status_code=status.HTTP_400_BAD_REQUEST,
            request=request
        )

    taxonomy_data = request.data
    site_id = taxonomy_data.get('site_id')
    sector_id = taxonomy_data.get('sector_id')
    site = None
    sector = None

    if site_id:
        try:
            site = Site.objects.get(id=site_id, account=account)
        except Site.DoesNotExist:
            return error_response(
                error='Site not found',
                status_code=status.HTTP_404_NOT_FOUND,
                request=request
            )
        except (TypeError, ValueError):
            # A value that cannot be coerced to the primary-key type makes the
            # ORM raise instead of DoesNotExist; report it as a client error.
            return error_response(
                error='Invalid site_id',
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request
            )

    if sector_id:
        try:
            sector = Sector.objects.get(id=sector_id, account=account)
        except Sector.DoesNotExist:
            return error_response(
                error='Sector not found',
                status_code=status.HTTP_404_NOT_FOUND,
                request=request
            )
        except (TypeError, ValueError):
            return error_response(
                error='Invalid sector_id',
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request
            )

    service = ContentGenerationService()
    try:
        result = service.generate_taxonomy(
            taxonomy_data=taxonomy_data,
            account=account,
            site=site,
            sector=sector
        )
    except Exception as e:
        # API boundary: any failure inside the service becomes a 500 response.
        return error_response(
            error=str(e),
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            request=request
        )

    if result.get('success'):
        return success_response(
            data={'task_id': result.get('task_id')},
            message=result.get('message', 'Taxonomy generation started'),
            request=request
        )
    return error_response(
        error=result.get('error', 'Taxonomy generation failed'),
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        request=request
    )
@action(detail=True, methods=['get'], url_path='validation', url_name='validation')
def validation(self, request, pk=None):
    """
    Stage 3: Validation checklist for one content item.

    GET /api/v1/writer/content/{id}/validation/

    Returns the errors from ``validate_content`` and ``validate_for_publish``
    (ContentValidationService), the flags derived from them, and the
    cluster/taxonomy mapping indicators for the object from ``get_object()``.
    """
    # NOTE(review): a method named `validation` appears more than once in this
    # part of the file; if every copy belongs to the same class only the last
    # definition is bound - confirm and remove the duplicates.
    content = self.get_object()
    service = ContentValidationService()
    base_errors = service.validate_content(content)
    publish_blockers = service.validate_for_publish(content)

    summary = {
        'content_id': content.id,
        'is_valid': len(base_errors) == 0,
        'ready_to_publish': len(publish_blockers) == 0,
        'validation_errors': base_errors,
        'publish_errors': publish_blockers,
    }
    # Added last so the payload keeps its original key order.
    summary['metadata'] = {
        'has_entity_type': bool(content.content_type),
        'entity_type': content.content_type,
        'has_cluster_mapping': self._has_cluster_mapping(content),
        'has_taxonomy_mapping': self._has_taxonomy_mapping(content),
    }
    return success_response(data=summary, request=request)
@action(detail=True, methods=['post'], url_path='validate', url_name='validate')
def validate(self, request, pk=None):
    """
    Stage 3: Re-run the publish validators for one content item.

    POST /api/v1/writer/content/{id}/validate/

    The response carries the content id, an ``is_valid`` flag and the errors
    produced by ``ContentValidationService.validate_for_publish``.
    """
    # NOTE(review): a method named `validate` appears more than once in this
    # part of the file; if every copy belongs to the same class only the last
    # definition is bound - confirm and remove the duplicates.
    content = self.get_object()
    # Metadata now lives directly on content, so the deprecated
    # MetadataMappingService persistence step is not performed here.
    blocking_errors = ContentValidationService().validate_for_publish(content)
    result = {
        'content_id': content.id,
        'is_valid': len(blocking_errors) == 0,
        'errors': blocking_errors,
    }
    return success_response(data=result, request=request)
def _has_cluster_mapping(self, content):
    """Check for the existence of a ContentClusterMap row tied to ``content``."""
    from igny8_core.business.content.models import ContentClusterMap

    linked = ContentClusterMap.objects.filter(content=content)
    return linked.exists()
def _has_taxonomy_mapping(self, content):
    """Report whether ``content`` has any entry in its ``taxonomy_terms`` relation."""
    # The M2M relation on the content object is the source of truth here.
    terms = content.taxonomy_terms
    return terms.exists()
@extend_schema_view(
    list=extend_schema(tags=['Writer']),
    create=extend_schema(tags=['Writer']),
    retrieve=extend_schema(tags=['Writer']),
    update=extend_schema(tags=['Writer']),
    partial_update=extend_schema(tags=['Writer']),
    destroy=extend_schema(tags=['Writer']),
)
class ContentTaxonomyViewSet(SiteSectorModelViewSet):
    """
    ViewSet for managing content taxonomies (categories, tags, product attributes)
    Unified API Standard v1.0 compliant
    """
    queryset = ContentTaxonomy.objects.select_related('parent', 'site', 'sector').prefetch_related('clusters', 'contents')
    serializer_class = ContentTaxonomySerializer
    permission_classes = [IsAuthenticatedAndActive, IsViewerOrAbove]
    pagination_class = CustomPageNumberPagination
    throttle_scope = 'writer'
    throttle_classes = [DebugScopedRateThrottle]

    # DRF filtering configuration
    filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]

    # Search configuration
    search_fields = ['name', 'slug', 'description', 'external_taxonomy']

    # Ordering configuration
    ordering_fields = ['name', 'taxonomy_type', 'count', 'created_at']
    ordering = ['taxonomy_type', 'name']

    # Filter configuration
    # Removed "parent" to avoid non-model field in filterset (breaks drf-spectacular)
    filterset_fields = ['taxonomy_type', 'sync_status', 'external_id', 'external_taxonomy']

    def perform_create(self, serializer):
        """Create a taxonomy bound to an explicit site and sector.

        ``site_id`` and ``sector_id`` are read from the validated serializer
        data first and from the request query parameters otherwise. Both are
        required, both must reference existing rows, and the sector must
        belong to the chosen site.

        The account saved on the taxonomy is the request's ``account`` if
        present, else the authenticated user's account, else the site's
        account.

        Raises:
            ValidationError: an id is missing or malformed, the referenced
                site/sector does not exist, or the sector belongs to a
                different site.
        """
        from igny8_core.auth.models import Site, Sector
        from rest_framework.exceptions import ValidationError

        user = getattr(self.request, 'user', None)

        # Prefer DRF's query_params; fall back to the plain GET mapping.
        try:
            query_params = getattr(self.request, 'query_params', None)
            if query_params is None:
                query_params = getattr(self.request, 'GET', {})
        except AttributeError:
            query_params = {}

        site_id = serializer.validated_data.get('site_id') or query_params.get('site_id')
        sector_id = serializer.validated_data.get('sector_id') or query_params.get('sector_id')

        if not site_id:
            raise ValidationError("site_id is required")
        # NOTE(review): the site and sector are looked up by id only, without
        # an account filter - confirm tenant isolation is enforced elsewhere.
        try:
            site = Site.objects.get(id=site_id)
        except Site.DoesNotExist:
            raise ValidationError(f"Site with id {site_id} does not exist")
        except (TypeError, ValueError):
            # Values that cannot be coerced to the primary-key type would
            # otherwise surface as an unhandled server error.
            raise ValidationError(f"Invalid site_id: {site_id}")

        if not sector_id:
            raise ValidationError("sector_id is required")
        try:
            sector = Sector.objects.get(id=sector_id)
        except Sector.DoesNotExist:
            raise ValidationError(f"Sector with id {sector_id} does not exist")
        except (TypeError, ValueError):
            raise ValidationError(f"Invalid sector_id: {sector_id}")

        # Compare with the loaded site's key, not the raw request value:
        # site_id is a string when it comes from the query string, and a
        # string never equals the integer sector.site_id.
        if sector.site_id != site.id:
            raise ValidationError("Sector does not belong to the selected site")

        # The ids are replaced by the resolved instances passed to save().
        serializer.validated_data.pop('site_id', None)
        serializer.validated_data.pop('sector_id', None)

        account = getattr(self.request, 'account', None)
        if not account and user and user.is_authenticated and user.account:
            account = user.account
        if not account:
            account = site.account

        serializer.save(account=account, site=site, sector=sector)

    @action(detail=True, methods=['post'], permission_classes=[IsAuthenticatedAndActive, IsEditorOrAbove])
    def map_to_cluster(self, request, pk=None):
        """Map taxonomy to semantic cluster.

        Expects ``cluster_id`` in the request body. The cluster must belong
        to the same site as the taxonomy. Responds 400 when ``cluster_id`` is
        missing or malformed and 404 when no matching cluster exists.
        """
        taxonomy = self.get_object()
        cluster_id = request.data.get('cluster_id')

        if not cluster_id:
            return error_response(
                error="cluster_id is required",
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request
            )

        from igny8_core.business.planning.models import Clusters

        try:
            cluster = Clusters.objects.get(id=cluster_id, site=taxonomy.site)
        except Clusters.DoesNotExist:
            return error_response(
                error=f"Cluster with id {cluster_id} not found",
                status_code=status.HTTP_404_NOT_FOUND,
                request=request
            )
        except (TypeError, ValueError):
            # A malformed id is a client error, not a server failure.
            return error_response(
                error=f"Invalid cluster_id: {cluster_id}",
                status_code=status.HTTP_400_BAD_REQUEST,
                request=request
            )

        taxonomy.clusters.add(cluster)
        return success_response(
            data={'message': f'Taxonomy "{taxonomy.name}" mapped to cluster "{cluster.name}"'},
            message="Taxonomy mapped to cluster successfully",
            request=request
        )

    @action(detail=True, methods=['get'])
    def contents(self, request, pk=None):
        """Get all content associated with this taxonomy.

        Serializes every item in the taxonomy's ``contents`` relation with
        ContentSerializer and returns the list in a single response.
        """
        taxonomy = self.get_object()
        contents = taxonomy.contents.all()
        serializer = ContentSerializer(contents, many=True, context={'request': request})
        return success_response(
            data=serializer.data,
            message=f"Found {contents.count()} content items for taxonomy '{taxonomy.name}'",
            request=request
        )
# ContentAttributeViewSet temporarily disabled - ContentAttributeSerializer was removed in Stage 1
# TODO: Re-implement or remove completely based on Stage 1 architecture decisions