Files

1114 lines
42 KiB
Python

"""
Integration ViewSet
Phase 6: Site Integration & Multi-Destination Publishing
"""
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.response import Response
from django.utils import timezone
from drf_spectacular.utils import extend_schema, extend_schema_view
from igny8_core.api.base import SiteSectorModelViewSet
from igny8_core.api.permissions import IsAuthenticatedAndActive, IsEditorOrAbove, IsViewerOrAbove
from igny8_core.api.response import success_response, error_response
from igny8_core.api.throttles import DebugScopedRateThrottle
from igny8_core.auth.models import Site
from igny8_core.business.integration.models import SiteIntegration
from igny8_core.business.integration.services.integration_service import IntegrationService
from igny8_core.business.integration.services.sync_service import SyncService
from igny8_core.business.integration.services.sync_health_service import SyncHealthService
from igny8_core.business.integration.services.content_sync_service import ContentSyncService
import logging
logger = logging.getLogger(__name__)
@extend_schema_view(
list=extend_schema(tags=['Integration']),
create=extend_schema(tags=['Integration']),
retrieve=extend_schema(tags=['Integration']),
update=extend_schema(tags=['Integration']),
partial_update=extend_schema(tags=['Integration']),
destroy=extend_schema(tags=['Integration']),
)
class IntegrationViewSet(SiteSectorModelViewSet):
"""
ViewSet for SiteIntegration model.
"""
queryset = SiteIntegration.objects.select_related('site')
permission_classes = [IsAuthenticatedAndActive, IsEditorOrAbove]
throttle_scope = 'integration'
throttle_classes = [DebugScopedRateThrottle]
def get_permissions(self):
if self.action in ['list', 'retrieve']:
return [IsAuthenticatedAndActive(), IsViewerOrAbove()]
return [IsAuthenticatedAndActive(), IsEditorOrAbove()]
def get_queryset(self):
"""
Override to filter integrations by site.
SiteIntegration only has 'site' field (no 'sector'), so SiteSectorModelViewSet's
filtering doesn't apply. We manually filter by site here.
"""
queryset = super().get_queryset()
# Get site parameter from query params
site_id = self.request.query_params.get('site_id') or self.request.query_params.get('site')
if site_id:
try:
site_id_int = int(site_id)
queryset = queryset.filter(site_id=site_id_int)
except (ValueError, TypeError):
# Invalid site_id, return empty queryset
queryset = queryset.none()
return queryset
def get_serializer_class(self):
from rest_framework import serializers
class SiteIntegrationSerializer(serializers.ModelSerializer):
api_key = serializers.SerializerMethodField()
def get_api_key(self, obj):
"""Return the API key from Site.wp_api_key (SINGLE source of truth)"""
# API key is stored on Site model, not in SiteIntegration credentials
return obj.site.wp_api_key or ''
def validate(self, data):
"""
Custom validation for WordPress integrations.
API key is stored on Site model, not in SiteIntegration.
"""
validated_data = super().validate(data)
# For WordPress platform, check API key exists on Site (not in credentials_json)
if validated_data.get('platform') == 'wordpress':
site = validated_data.get('site') or getattr(self.instance, 'site', None)
if site and not site.wp_api_key:
raise serializers.ValidationError({
'site': 'Site must have an API key generated before creating WordPress integration.'
})
return validated_data
class Meta:
model = SiteIntegration
fields = '__all__'
read_only_fields = ['created_at', 'updated_at', 'last_sync_at']
return SiteIntegrationSerializer
@extend_schema(tags=['Integration'])
@action(detail=True, methods=['post'])
def test_connection(self, request, pk=None):
"""
Test connection to integrated platform.
POST /api/v1/integration/integrations/{id}/test_connection/
"""
integration = self.get_object()
service = IntegrationService()
result = service.test_connection(integration)
if result.get('success'):
return success_response(result, request=request)
else:
return error_response(
result.get('message', 'Connection test failed'),
None,
status.HTTP_400_BAD_REQUEST,
request
)
from rest_framework.permissions import AllowAny
from rest_framework.throttling import BaseThrottle
class NoThrottle(BaseThrottle):
"""Temporary throttle class that allows all requests"""
def allow_request(self, request, view):
return True
@extend_schema(tags=['Integration'])
@action(detail=False, methods=['post'], url_path='test-connection',
permission_classes=[AllowAny], throttle_classes=[NoThrottle])
def test_connection_collection(self, request):
"""
Test WordPress connection using Site.wp_api_key (single source of truth).
POST /api/v1/integration/integrations/test-connection/
Body:
{
"site_id": 123
}
Tests:
1. WordPress site is reachable
2. IGNY8 plugin is installed
3. Plugin has API key configured (matching Site.wp_api_key)
"""
import requests as http_requests
site_id = request.data.get('site_id')
if not site_id:
return error_response('site_id is required', None, status.HTTP_400_BAD_REQUEST, request)
# Verify site exists
from igny8_core.auth.models import Site
try:
site = Site.objects.get(id=int(site_id))
except (Site.DoesNotExist, ValueError, TypeError):
return error_response('Site not found or invalid', None, status.HTTP_404_NOT_FOUND, request)
# Authentication: user must be authenticated and belong to same account
if not hasattr(request, 'user') or not getattr(request.user, 'is_authenticated', False):
return error_response('Authentication required', None, status.HTTP_403_FORBIDDEN, request)
try:
if site.account != request.user.account:
return error_response('Site does not belong to your account', None, status.HTTP_403_FORBIDDEN, request)
except Exception:
return error_response('Authentication failed', None, status.HTTP_403_FORBIDDEN, request)
# Get stored API key from Site model (single source of truth)
stored_api_key = site.wp_api_key
if not stored_api_key:
return error_response(
'API key not configured. Please generate an API key first.',
None,
status.HTTP_400_BAD_REQUEST,
request
)
# Get site URL
site_url = site.domain or site.url
if not site_url:
return error_response(
'Site URL not configured',
None,
status.HTTP_400_BAD_REQUEST,
request
)
# Health check results
health_checks = {
'site_url_configured': True,
'api_key_configured': True,
'wp_rest_api_reachable': False,
'plugin_installed': False,
'plugin_has_api_key': False,
'api_key_verified': False, # NEW: Verifies IGNY8 and WordPress have SAME key
}
issues = []
try:
# Check 1: WordPress REST API reachable
try:
rest_response = http_requests.get(
f"{site_url.rstrip('/')}/wp-json/",
timeout=10
)
if rest_response.status_code == 200:
health_checks['wp_rest_api_reachable'] = True
else:
issues.append(f"WordPress REST API not reachable: HTTP {rest_response.status_code}")
except Exception as e:
issues.append(f"WordPress REST API unreachable: {str(e)}")
# Check 2: IGNY8 Plugin installed (public status endpoint)
try:
status_response = http_requests.get(
f"{site_url.rstrip('/')}/wp-json/igny8/v1/status",
timeout=10
)
if status_response.status_code == 200:
health_checks['plugin_installed'] = True
status_data = status_response.json()
plugin_data = status_data.get('data', status_data)
if plugin_data.get('connected') or plugin_data.get('has_api_key'):
health_checks['plugin_has_api_key'] = True
else:
issues.append("Plugin installed but no API key configured in WordPress")
else:
issues.append(f"IGNY8 plugin not found: HTTP {status_response.status_code}")
except Exception as e:
issues.append(f"Cannot detect IGNY8 plugin: {str(e)}")
# Check 3: Verify API keys MATCH by making authenticated request
# This is the CRITICAL and AUTHORITATIVE check - WordPress must accept our API key
# We always try this if plugin is installed, regardless of what /status says about has_api_key
# because the /verify-key endpoint is the source of truth
if health_checks['plugin_installed']:
try:
# Make authenticated request using Site.wp_api_key to dedicated verify endpoint
verify_response = http_requests.get(
f"{site_url.rstrip('/')}/wp-json/igny8/v1/verify-key",
headers={
'X-IGNY8-API-KEY': stored_api_key,
'Content-Type': 'application/json'
},
timeout=10
)
if verify_response.status_code == 200:
health_checks['api_key_verified'] = True
# If verify succeeds, plugin definitely has the matching key
health_checks['plugin_has_api_key'] = True
elif verify_response.status_code in [401, 403]:
issues.append("API key mismatch - WordPress has different key than IGNY8. Please copy the API key from IGNY8 to WordPress plugin settings.")
else:
issues.append(f"API key verification failed: HTTP {verify_response.status_code}")
except Exception as e:
issues.append(f"API key verification request failed: {str(e)}")
# Determine overall status - MUST include api_key_verified for true connection
is_healthy = (
health_checks['api_key_configured'] and
health_checks['wp_rest_api_reachable'] and
health_checks['plugin_installed'] and
health_checks['plugin_has_api_key'] and
health_checks['api_key_verified'] # CRITICAL: keys must match
)
# Build message with clear guidance
if is_healthy:
message = "✅ WordPress integration is fully connected and verified"
elif not health_checks['wp_rest_api_reachable']:
message = "❌ Cannot reach WordPress site"
elif not health_checks['plugin_installed']:
message = "⚠️ WordPress is reachable but IGNY8 plugin not installed"
elif not health_checks['plugin_has_api_key']:
message = "⚠️ Plugin installed but no API key configured in WordPress"
elif not health_checks['api_key_verified']:
message = "⚠️ API key mismatch - copy the API key from IGNY8 to WordPress plugin"
else:
message = "❌ WordPress connection failed"
return success_response({
'success': is_healthy,
'message': message,
'site_id': site.id,
'site_name': site.name,
'site_url': site_url,
'api_key_configured': bool(stored_api_key),
'health_checks': health_checks,
'issues': issues if issues else None,
}, request=request)
except Exception as e:
logger.error(f"WordPress connection test failed: {e}")
return error_response(
f'Connection test failed: {str(e)}',
None,
status.HTTP_500_INTERNAL_SERVER_ERROR,
request
)
@extend_schema(tags=['Integration'])
@action(detail=True, methods=['post'])
def sync(self, request, pk=None):
"""
Trigger synchronization with integrated platform.
POST /api/v1/integration/integrations/{id}/sync/
Request body:
{
"direction": "metadata", # 'metadata', 'both', 'to_external', 'from_external'
"content_types": ["blog_post", "page"] # Optional
}
"""
integration = self.get_object()
direction = request.data.get('direction', 'metadata')
content_types = request.data.get('content_types')
# Handle metadata-only sync (for "Sync Now" button)
if direction == 'metadata':
from igny8_core.business.integration.services.sync_metadata_service import SyncMetadataService
metadata_service = SyncMetadataService()
result = metadata_service.sync_wordpress_structure(integration)
else:
# Full content sync (legacy behavior)
sync_service = SyncService()
result = sync_service.sync(integration, direction=direction, content_types=content_types)
response_status = status.HTTP_200_OK if result.get('success') else status.HTTP_400_BAD_REQUEST
return success_response(result, request=request, status_code=response_status)
@extend_schema(tags=['Integration'])
@action(detail=True, methods=['get'])
def sync_status(self, request, pk=None):
"""
Get sync status for integration.
GET /api/v1/integration/integrations/{id}/sync_status/
"""
integration = self.get_object()
sync_service = SyncService()
status_data = sync_service.get_sync_status(integration)
return success_response(status_data, request=request)
@action(detail=True, methods=['post'], url_path='update-structure')
def update_site_structure(self, request, pk=None):
"""
Update WordPress site structure (post types, taxonomies, counts).
Called by WordPress plugin to push site configuration to backend.
POST /api/v1/integration/integrations/{id}/update-structure/
Request body:
{
"post_types": {
"post": {"label": "Posts", "count": 123, "enabled": true, "fetch_limit": 100},
"page": {"label": "Pages", "count": 12, "enabled": true, "fetch_limit": 100},
"product": {"label": "Products", "count": 456, "enabled": false, "fetch_limit": 50}
},
"taxonomies": {
"category": {"label": "Categories", "count": 25, "enabled": true, "fetch_limit": 100},
"post_tag": {"label": "Tags", "count": 102, "enabled": true, "fetch_limit": 100},
"product_cat": {"label": "Product Categories", "count": 15, "enabled": false, "fetch_limit": 50}
},
"plugin_connection_enabled": true,
"two_way_sync_enabled": true
}
"""
integration = self.get_object()
# Update config with new structure
config = integration.config_json or {}
post_types = request.data.get('post_types', {})
taxonomies = request.data.get('taxonomies', {})
if post_types or taxonomies:
config['content_types'] = {
'post_types': post_types,
'taxonomies': taxonomies,
'last_structure_fetch': request.data.get('timestamp') or str(timezone.now().isoformat())
}
config['plugin_connection_enabled'] = request.data.get('plugin_connection_enabled', True)
config['two_way_sync_enabled'] = request.data.get('two_way_sync_enabled', True)
integration.config_json = config
integration.save()
return success_response({
'message': 'Site structure updated successfully',
'post_types_count': len(post_types),
'taxonomies_count': len(taxonomies),
'last_structure_fetch': config['content_types']['last_structure_fetch']
}, request=request)
return error_response(
'No post types or taxonomies provided',
None,
status.HTTP_400_BAD_REQUEST,
request
)
@action(detail=True, methods=['get'], url_path='content-types')
def content_types_summary(self, request, pk=None):
"""
Get content types summary with counts from synced data.
GET /api/v1/integration/integrations/{id}/content-types/
Returns:
{
"success": true,
"data": {
"post_types": {
"post": {"label": "Posts", "count": 123, "synced_count": 50},
"page": {"label": "Pages", "count": 12, "synced_count": 12},
"product": {"label": "Products", "count": 456, "synced_count": 200}
},
"taxonomies": {
"category": {"label": "Categories", "count": 25, "synced_count": 25},
"post_tag": {"label": "Tags", "count": 102, "synced_count": 80},
"product_cat": {"label": "Product Categories", "count": 15, "synced_count": 15}
},
"last_structure_fetch": "2025-11-22T10:00:00Z"
}
}
"""
integration = self.get_object()
site = integration.site
# Get config from integration
config = integration.config_json or {}
content_types = config.get('content_types', {})
# Get synced counts from Content and ContentTaxonomy models
from igny8_core.business.content.models import Content, ContentTaxonomy
# Build response with synced counts
post_types_data = {}
for wp_type, type_config in content_types.get('post_types', {}).items():
# Map WP type to content_type
content_type_map = {
'post': 'post',
'page': 'page',
'product': 'product',
'service': 'service',
}
content_type = content_type_map.get(wp_type, 'post')
# Count synced content
synced_count = Content.objects.filter(
site=site,
content_type=content_type,
external_type=wp_type,
sync_status__in=['imported', 'synced']
).count()
post_types_data[wp_type] = {
'label': type_config.get('label', wp_type.title()),
'count': type_config.get('count', 0),
'synced_count': synced_count,
'enabled': type_config.get('enabled', False),
'fetch_limit': type_config.get('fetch_limit', 100),
'last_synced': type_config.get('last_synced'),
}
taxonomies_data = {}
for wp_tax, tax_config in content_types.get('taxonomies', {}).items():
# Count synced taxonomies
synced_count = ContentTaxonomy.objects.filter(
site=site,
external_taxonomy=wp_tax
).count()
taxonomies_data[wp_tax] = {
'label': tax_config.get('label', wp_tax.title()),
'count': tax_config.get('count', 0),
'synced_count': synced_count,
'enabled': tax_config.get('enabled', False),
'fetch_limit': tax_config.get('fetch_limit', 100),
'last_synced': tax_config.get('last_synced'),
}
summary = {
'post_types': post_types_data,
'taxonomies': taxonomies_data,
'last_structure_fetch': content_types.get('last_structure_fetch'),
'plugin_connection_enabled': config.get('plugin_connection_enabled', True),
'two_way_sync_enabled': config.get('two_way_sync_enabled', True),
}
return success_response(summary, request=request)
# Stage 4: Site-level sync endpoints
@action(detail=False, methods=['get'], url_path='sites/(?P<site_id>[^/.]+)/sync/status')
def sync_status_by_site(self, request, site_id=None):
"""
Get sync status for all integrations on a site.
Stage 4: Site-level sync health endpoint.
GET /api/v1/integration/integrations/sites/{site_id}/sync/status/
"""
try:
site_id_int = int(site_id)
except (ValueError, TypeError):
return error_response(
'Invalid site_id',
None,
status.HTTP_400_BAD_REQUEST,
request
)
# Verify site belongs to user's account
from igny8_core.auth.models import Site
try:
site = Site.objects.get(id=site_id_int, account=request.user.account)
except Site.DoesNotExist:
return error_response(
'Site not found',
None,
status.HTTP_404_NOT_FOUND,
request
)
sync_health_service = SyncHealthService()
status_data = sync_health_service.get_sync_status(site_id_int)
return success_response(status_data, request=request)
@action(detail=False, methods=['post'], url_path='sites/(?P<site_id>[^/.]+)/sync/run')
def run_sync(self, request, site_id=None):
"""
Trigger sync for all integrations on a site.
Stage 4: Site-level sync trigger endpoint.
POST /api/v1/integration/integrations/sites/{site_id}/sync/run/
Request body:
{
"direction": "both", # Optional: 'both', 'to_external', 'from_external'
"content_types": ["blog_post", "product"] # Optional
}
"""
try:
site_id_int = int(site_id)
except (ValueError, TypeError):
return error_response(
'Invalid site_id',
None,
status.HTTP_400_BAD_REQUEST,
request
)
# Verify site belongs to user's account
from igny8_core.auth.models import Site
try:
site = Site.objects.get(id=site_id_int, account=request.user.account)
except Site.DoesNotExist:
return error_response(
'Site not found',
None,
status.HTTP_404_NOT_FOUND,
request
)
direction = request.data.get('direction', 'both')
content_types = request.data.get('content_types')
# Get all active integrations for this site
integrations = SiteIntegration.objects.filter(
site_id=site_id_int,
is_active=True,
sync_enabled=True
)
if not integrations.exists():
return error_response(
'No active integrations found for this site',
None,
status.HTTP_400_BAD_REQUEST,
request
)
sync_service = SyncService()
sync_health_service = SyncHealthService()
results = []
for integration in integrations:
result = sync_service.sync(integration, direction=direction, content_types=content_types)
# Record sync run
sync_health_service.record_sync_run(integration.id, result)
results.append({
'integration_id': integration.id,
'platform': integration.platform,
'result': result
})
return success_response({
'site_id': site_id_int,
'sync_results': results,
'total_integrations': len(results)
}, request=request)
@action(detail=False, methods=['get'], url_path='sites/(?P<site_id>[^/.]+)/sync/mismatches')
def get_mismatches(self, request, site_id=None):
"""
Get sync mismatches for a site.
Stage 4: Detailed mismatch information.
GET /api/v1/integration/integrations/sites/{site_id}/sync/mismatches/
"""
try:
site_id_int = int(site_id)
except (ValueError, TypeError):
return error_response(
'Invalid site_id',
None,
status.HTTP_400_BAD_REQUEST,
request
)
# Verify site belongs to user's account
from igny8_core.auth.models import Site
try:
site = Site.objects.get(id=site_id_int, account=request.user.account)
except Site.DoesNotExist:
return error_response(
'Site not found',
None,
status.HTTP_404_NOT_FOUND,
request
)
sync_health_service = SyncHealthService()
mismatches = sync_health_service.get_mismatches(site_id_int)
return success_response(mismatches, request=request)
@action(detail=False, methods=['get'], url_path='sites/(?P<site_id>[^/.]+)/sync/logs')
def get_sync_logs(self, request, site_id=None):
"""
Get sync logs for a site.
Stage 4: Sync history and logs.
GET /api/v1/integration/integrations/sites/{site_id}/sync/logs/
Query params:
- limit: Number of logs to return (default: 100)
- integration_id: Filter by specific integration
"""
try:
site_id_int = int(site_id)
except (ValueError, TypeError):
return error_response(
'Invalid site_id',
None,
status.HTTP_400_BAD_REQUEST,
request
)
# Verify site belongs to user's account
from igny8_core.auth.models import Site
try:
site = Site.objects.get(id=site_id_int, account=request.user.account)
except Site.DoesNotExist:
return error_response(
'Site not found',
None,
status.HTTP_404_NOT_FOUND,
request
)
limit = int(request.query_params.get('limit', 100))
integration_id = request.query_params.get('integration_id')
sync_health_service = SyncHealthService()
logs = sync_health_service.get_sync_logs(
site_id_int,
integration_id=int(integration_id) if integration_id else None,
limit=limit
)
return success_response({
'site_id': site_id_int,
'logs': logs,
'count': len(logs)
}, request=request)
@action(detail=True, methods=['get'], url_path='debug-status')
def debug_status(self, request, pk=None):
"""
Get comprehensive debug status for WordPress integration.
Includes health, recent events, and data validation.
GET /api/v1/integration/integrations/{id}/debug-status/
Query params:
- include_events: Include recent sync events (default: true)
- include_validation: Include data validation matrix (default: false)
- event_limit: Number of events to return (default: 50)
"""
from igny8_core.business.integration.models import SyncEvent
integration = self.get_object()
include_events = request.query_params.get('include_events', 'true').lower() == 'true'
include_validation = request.query_params.get('include_validation', 'false').lower() == 'true'
event_limit = int(request.query_params.get('event_limit', 50))
# Get integration health
health_data = {
'api_status': 'healthy' if integration.is_active else 'error',
'api_message': 'Integration is active' if integration.is_active else 'Integration is inactive',
'last_api_check': integration.updated_at.isoformat() if integration.updated_at else None,
'plugin_active': integration.is_active,
'plugin_version': integration.credentials_json.get('plugin_version', 'Unknown') if integration.credentials_json else 'Unknown',
'debug_mode': integration.credentials_json.get('debug_enabled', False) if integration.credentials_json else False,
'sync_healthy': integration.last_sync_at is not None,
'pending_syncs': 0, # TODO: Calculate from actual sync queue
'last_sync': integration.last_sync_at.isoformat() if integration.last_sync_at else None,
}
response_data = {
'health': health_data,
}
# Include sync events if requested
if include_events:
# Get real-time sync events from database
events_qs = SyncEvent.objects.filter(
integration=integration
).order_by('-created_at')[:event_limit]
# Format events for frontend
events = []
for event in events_qs:
events.append({
'id': event.id,
'type': event.event_type,
'action': event.action,
'description': event.description,
'timestamp': event.created_at.isoformat(),
'success': event.success,
'content_id': event.content_id,
'external_id': event.external_id,
'error_message': event.error_message,
'duration_ms': event.duration_ms,
'details': event.details,
})
response_data['events'] = events
response_data['events_count'] = len(events)
# Include data validation if requested
if include_validation:
# TODO: Implement actual data validation check
# For now, return placeholder data
response_data['validation'] = [
{
'field_name': 'content_title',
'igny8_value': 'Sample Title',
'wp_value': 'Sample Title',
'matches': True,
},
{
'field_name': 'content_html',
'igny8_value': '<p>Content</p>',
'wp_value': '<p>Content</p>',
'matches': True,
},
]
return success_response(response_data, request=request)
@action(detail=True, methods=['post'], url_path='trigger-debug')
def trigger_debug(self, request, pk=None):
"""
Enable/disable debug mode for WordPress integration.
POST /api/v1/integration/integrations/{id}/trigger-debug/
Body:
{
"debug_enabled": true
}
"""
integration = self.get_object()
debug_enabled = request.data.get('debug_enabled', False)
# Update credentials with debug flag
credentials = integration.credentials_json or {}
credentials['debug_enabled'] = debug_enabled
integration.credentials_json = credentials
integration.save()
logger.info(
f"WordPress debug mode {'enabled' if debug_enabled else 'disabled'} "
f"for integration {integration.id} (site: {integration.site.name})"
)
return success_response({
'debug_enabled': debug_enabled,
'message': f"Debug mode {'enabled' if debug_enabled else 'disabled'} successfully",
}, request=request)
@action(detail=True, methods=['post'], url_path='test-connection')
def test_connection_detail(self, request, pk=None):
"""
Test connection to WordPress site.
POST /api/v1/integration/integrations/{id}/test-connection/
"""
integration = self.get_object()
service = IntegrationService()
result = service.test_connection(integration)
if result.get('success'):
return success_response(result, request=request)
else:
return error_response(
result.get('message', 'Connection test failed'),
result.get('details'),
status.HTTP_400_BAD_REQUEST,
request
)
@action(detail=False, methods=['post'], url_path='generate-api-key')
def generate_api_key(self, request):
"""
Generate a new API key for a site's WordPress integration.
POST /api/v1/integration/integrations/generate-api-key/
Body:
{
"site_id": 5
}
"""
site_id = request.data.get('site_id')
if not site_id:
return error_response(
'Site ID is required',
None,
status.HTTP_400_BAD_REQUEST,
request
)
try:
site = Site.objects.get(id=site_id)
except Site.DoesNotExist:
return error_response(
f'Site with ID {site_id} not found',
None,
status.HTTP_404_NOT_FOUND,
request
)
# Generate API key with format: igny8_site_{site_id}_{timestamp}_{random}
import time
import random
import string
timestamp = int(time.time() * 1000)
random_suffix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))
api_key = f"igny8_site_{site_id}_{timestamp}_{random_suffix}"
# SINGLE SOURCE OF TRUTH: Store API key ONLY in Site.wp_api_key
# This is where APIKeyAuthentication validates against
site.wp_api_key = api_key
site.save(update_fields=['wp_api_key'])
logger.info(
f"Generated new API key for site {site.name} (ID: {site_id}), "
f"stored in Site.wp_api_key (single source of truth)"
)
return success_response({
'api_key': api_key,
'site_id': site.id,
'site_name': site.name,
'site_url': site.domain or site.url,
'message': 'API key generated successfully. WordPress integration is ready.',
}, request=request)
@action(detail=False, methods=['post'], url_path='revoke-api-key')
def revoke_api_key(self, request):
"""
Revoke (delete) the API key for a site's WordPress integration.
POST /api/v1/integration/integrations/revoke-api-key/
Body:
{
"site_id": 5
}
"""
site_id = request.data.get('site_id')
if not site_id:
return error_response(
'Site ID is required',
None,
status.HTTP_400_BAD_REQUEST,
request
)
try:
site = Site.objects.get(id=site_id)
except Site.DoesNotExist:
return error_response(
f'Site with ID {site_id} not found',
None,
status.HTTP_404_NOT_FOUND,
request
)
# Verify user has access to this site
if site.account != request.user.account:
return error_response(
'You do not have permission to modify this site',
None,
status.HTTP_403_FORBIDDEN,
request
)
# SINGLE SOURCE OF TRUTH: Remove API key from Site.wp_api_key
site.wp_api_key = None
site.save(update_fields=['wp_api_key'])
logger.info(
f"Revoked API key for site {site.name} (ID: {site_id})"
)
return success_response({
'site_id': site.id,
'site_name': site.name,
'message': 'API key revoked successfully. WordPress integration is now disconnected.',
}, request=request)
# PublishingSettings ViewSet
from rest_framework import serializers, viewsets
from igny8_core.business.integration.models import PublishingSettings
class PublishingSettingsSerializer(serializers.ModelSerializer):
"""Serializer for PublishingSettings model"""
class Meta:
model = PublishingSettings
fields = [
'id',
'site',
'auto_approval_enabled',
'auto_publish_enabled',
'scheduling_mode',
'daily_publish_limit',
'weekly_publish_limit',
'monthly_publish_limit',
'publish_days',
'publish_time_slots',
'stagger_start_time',
'stagger_end_time',
'stagger_interval_minutes',
'queue_limit',
'created_at',
'updated_at',
]
read_only_fields = ['id', 'site', 'created_at', 'updated_at']
@extend_schema_view(
retrieve=extend_schema(tags=['Integration']),
update=extend_schema(tags=['Integration']),
partial_update=extend_schema(tags=['Integration']),
)
class PublishingSettingsViewSet(viewsets.ViewSet):
"""
ViewSet for managing site-level publishing settings.
GET /api/v1/integration/sites/{site_id}/publishing-settings/
PUT /api/v1/integration/sites/{site_id}/publishing-settings/
PATCH /api/v1/integration/sites/{site_id}/publishing-settings/
"""
permission_classes = [IsAuthenticatedAndActive, IsEditorOrAbove]
throttle_scope = 'integration'
throttle_classes = [DebugScopedRateThrottle]
def get_permissions(self):
if self.action == 'retrieve':
return [IsAuthenticatedAndActive(), IsViewerOrAbove()]
return [IsAuthenticatedAndActive(), IsEditorOrAbove()]
def _get_site(self, site_id, request):
"""Get site and verify user has access"""
from igny8_core.auth.models import Site
try:
site = Site.objects.get(id=int(site_id))
# Check if user has access to this site (same account)
if hasattr(request, 'account') and site.account != request.account:
return None
return site
except (Site.DoesNotExist, ValueError, TypeError):
return None
@extend_schema(tags=['Integration'])
def retrieve(self, request, site_id=None):
"""
Get publishing settings for a site.
Creates default settings if they don't exist.
"""
site = self._get_site(site_id, request)
if not site:
return error_response(
'Site not found or access denied',
None,
status.HTTP_404_NOT_FOUND,
request
)
# Get or create settings with defaults
settings, created = PublishingSettings.get_or_create_for_site(site)
serializer = PublishingSettingsSerializer(settings)
return success_response(
data=serializer.data,
message='Publishing settings retrieved' + (' (created with defaults)' if created else ''),
request=request
)
@extend_schema(tags=['Integration'])
def update(self, request, site_id=None):
"""
Update publishing settings for a site (full update).
"""
site = self._get_site(site_id, request)
if not site:
return error_response(
'Site not found or access denied',
None,
status.HTTP_404_NOT_FOUND,
request
)
# Get or create settings
settings, _ = PublishingSettings.get_or_create_for_site(site)
serializer = PublishingSettingsSerializer(settings, data=request.data)
if serializer.is_valid():
serializer.save()
return success_response(
data=serializer.data,
message='Publishing settings updated',
request=request
)
return error_response(
'Validation failed',
serializer.errors,
status.HTTP_400_BAD_REQUEST,
request
)
@extend_schema(tags=['Integration'])
def partial_update(self, request, site_id=None):
"""
Partially update publishing settings for a site.
"""
site = self._get_site(site_id, request)
if not site:
return error_response(
'Site not found or access denied',
None,
status.HTTP_404_NOT_FOUND,
request
)
# Get or create settings
settings, _ = PublishingSettings.get_or_create_for_site(site)
serializer = PublishingSettingsSerializer(settings, data=request.data, partial=True)
if serializer.is_valid():
serializer.save()
return success_response(
data=serializer.data,
message='Publishing settings updated',
request=request
)
return error_response(
'Validation failed',
serializer.errors,
status.HTTP_400_BAD_REQUEST,
request
)