Updated the WordPress integration validation to require an API key as the sole authentication method, removing support for username and application password authentication.
603 lines
23 KiB
Python
603 lines
23 KiB
Python
"""
|
|
Integration ViewSet
|
|
Phase 6: Site Integration & Multi-Destination Publishing
|
|
"""
|
|
from rest_framework import status
|
|
from rest_framework.decorators import action
|
|
from rest_framework.response import Response
|
|
from django.utils import timezone
|
|
|
|
from igny8_core.api.base import SiteSectorModelViewSet
|
|
from igny8_core.api.permissions import IsAuthenticatedAndActive, IsEditorOrAbove
|
|
from igny8_core.api.response import success_response, error_response
|
|
from igny8_core.api.throttles import DebugScopedRateThrottle
|
|
from igny8_core.business.integration.models import SiteIntegration
|
|
from igny8_core.business.integration.services.integration_service import IntegrationService
|
|
from igny8_core.business.integration.services.sync_service import SyncService
|
|
from igny8_core.business.integration.services.sync_health_service import SyncHealthService
|
|
from igny8_core.business.integration.services.content_sync_service import ContentSyncService
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class IntegrationViewSet(SiteSectorModelViewSet):
|
|
"""
|
|
ViewSet for SiteIntegration model.
|
|
"""
|
|
queryset = SiteIntegration.objects.select_related('site')
|
|
permission_classes = [IsAuthenticatedAndActive, IsEditorOrAbove]
|
|
throttle_scope = 'integration'
|
|
throttle_classes = [DebugScopedRateThrottle]
|
|
|
|
def get_queryset(self):
|
|
"""
|
|
Override to filter integrations by site.
|
|
SiteIntegration only has 'site' field (no 'sector'), so SiteSectorModelViewSet's
|
|
filtering doesn't apply. We manually filter by site here.
|
|
"""
|
|
queryset = super().get_queryset()
|
|
|
|
# Get site parameter from query params
|
|
site_id = self.request.query_params.get('site_id') or self.request.query_params.get('site')
|
|
|
|
if site_id:
|
|
try:
|
|
site_id_int = int(site_id)
|
|
queryset = queryset.filter(site_id=site_id_int)
|
|
except (ValueError, TypeError):
|
|
# Invalid site_id, return empty queryset
|
|
queryset = queryset.none()
|
|
|
|
return queryset
|
|
|
|
def get_serializer_class(self):
|
|
from rest_framework import serializers
|
|
|
|
class SiteIntegrationSerializer(serializers.ModelSerializer):
|
|
def validate(self, data):
|
|
"""
|
|
Custom validation for WordPress integrations.
|
|
API key is the only required authentication method.
|
|
"""
|
|
validated_data = super().validate(data)
|
|
|
|
# For WordPress platform, require API key only
|
|
if validated_data.get('platform') == 'wordpress':
|
|
credentials = validated_data.get('credentials_json', {})
|
|
|
|
# API key is required for all WordPress integrations
|
|
if not credentials.get('api_key'):
|
|
raise serializers.ValidationError({
|
|
'credentials_json': 'API key is required for WordPress integration.'
|
|
})
|
|
|
|
return validated_data
|
|
|
|
class Meta:
|
|
model = SiteIntegration
|
|
fields = '__all__'
|
|
read_only_fields = ['created_at', 'updated_at', 'last_sync_at']
|
|
|
|
return SiteIntegrationSerializer
|
|
|
|
@action(detail=True, methods=['post'])
|
|
def test_connection(self, request, pk=None):
|
|
"""
|
|
Test connection to integrated platform.
|
|
|
|
POST /api/v1/integration/integrations/{id}/test_connection/
|
|
"""
|
|
integration = self.get_object()
|
|
|
|
service = IntegrationService()
|
|
result = service.test_connection(integration)
|
|
|
|
if result.get('success'):
|
|
return success_response(result, request=request)
|
|
else:
|
|
return error_response(
|
|
result.get('message', 'Connection test failed'),
|
|
None,
|
|
status.HTTP_400_BAD_REQUEST,
|
|
request
|
|
)
|
|
|
|
from rest_framework.permissions import AllowAny
|
|
from rest_framework.throttling import BaseThrottle
|
|
|
|
class NoThrottle(BaseThrottle):
|
|
"""Temporary throttle class that allows all requests"""
|
|
def allow_request(self, request, view):
|
|
return True
|
|
|
|
@action(detail=False, methods=['post'], url_path='test-connection',
|
|
permission_classes=[AllowAny], throttle_classes=[NoThrottle])
|
|
def test_connection_collection(self, request):
|
|
"""
|
|
Collection-level test connection endpoint for frontend convenience.
|
|
POST /api/v1/integration/integrations/test-connection/
|
|
|
|
Body:
|
|
{
|
|
"site_id": 123,
|
|
"api_key": "...",
|
|
"site_url": "https://example.com"
|
|
}
|
|
"""
|
|
site_id = request.data.get('site_id')
|
|
api_key = request.data.get('api_key')
|
|
site_url = request.data.get('site_url')
|
|
|
|
if not site_id:
|
|
return error_response('site_id is required', None, status.HTTP_400_BAD_REQUEST, request)
|
|
|
|
# Verify site exists
|
|
from igny8_core.auth.models import Site
|
|
try:
|
|
site = Site.objects.get(id=int(site_id))
|
|
except (Site.DoesNotExist, ValueError, TypeError):
|
|
return error_response('Site not found or invalid', None, status.HTTP_404_NOT_FOUND, request)
|
|
|
|
# Authentication: accept either authenticated user OR matching API key in body
|
|
api_key = request.data.get('api_key') or api_key
|
|
authenticated = False
|
|
# If request has a valid user and belongs to same account, allow
|
|
if hasattr(request, 'user') and getattr(request.user, 'is_authenticated', False):
|
|
try:
|
|
# If user has account, ensure site belongs to user's account
|
|
if site.account == request.user.account:
|
|
authenticated = True
|
|
except Exception:
|
|
# Ignore and fallback to api_key check
|
|
pass
|
|
|
|
# If not authenticated via session, allow if provided api_key matches site's stored wp_api_key
|
|
if not authenticated:
|
|
stored_key = getattr(site, 'wp_api_key', None)
|
|
if stored_key and api_key and str(api_key) == str(stored_key):
|
|
authenticated = True
|
|
elif not stored_key:
|
|
# API key not set on site - provide helpful error message
|
|
return error_response(
|
|
'API key not configured for this site. Please generate an API key in the IGNY8 app and ensure it is saved to the site.',
|
|
None,
|
|
status.HTTP_403_FORBIDDEN,
|
|
request
|
|
)
|
|
elif api_key and stored_key and str(api_key) != str(stored_key):
|
|
# API key provided but doesn't match
|
|
return error_response(
|
|
'Invalid API key. The provided API key does not match the one stored for this site.',
|
|
None,
|
|
status.HTTP_403_FORBIDDEN,
|
|
request
|
|
)
|
|
|
|
if not authenticated:
|
|
return error_response('Authentication credentials were not provided.', None, status.HTTP_403_FORBIDDEN, request)
|
|
|
|
# Try to find an existing integration for this site+platform
|
|
integration = SiteIntegration.objects.filter(site=site, platform='wordpress').first()
|
|
|
|
# If not found, create and save the integration to database
|
|
integration_created = False
|
|
if not integration:
|
|
integration = SiteIntegration.objects.create(
|
|
account=site.account,
|
|
site=site,
|
|
platform='wordpress',
|
|
platform_type='cms',
|
|
config_json={'site_url': site_url} if site_url else {},
|
|
credentials_json={'api_key': api_key} if api_key else {},
|
|
is_active=True,
|
|
sync_enabled=True
|
|
)
|
|
integration_created = True
|
|
logger.info(f"[IntegrationViewSet] Created WordPress integration {integration.id} for site {site.id}")
|
|
|
|
service = IntegrationService()
|
|
# Mark this as initial connection test since API key was provided in request body
|
|
# This allows the test to pass even if WordPress plugin hasn't stored the key yet
|
|
is_initial_connection = bool(api_key and request.data.get('api_key'))
|
|
result = service._test_wordpress_connection(integration, is_initial_connection=is_initial_connection)
|
|
|
|
if result.get('success'):
|
|
# Include integration_id in response so plugin can store it
|
|
result['integration_id'] = integration.id
|
|
result['integration_created'] = integration_created
|
|
return success_response(result, request=request)
|
|
else:
|
|
# If test failed and we just created integration, delete it
|
|
if integration_created:
|
|
integration.delete()
|
|
logger.info(f"[IntegrationViewSet] Deleted integration {integration.id} due to failed connection test")
|
|
return error_response(result.get('message', 'Connection test failed'), None, status.HTTP_400_BAD_REQUEST, request)
|
|
|
|
@action(detail=True, methods=['post'])
|
|
def sync(self, request, pk=None):
|
|
"""
|
|
Trigger synchronization with integrated platform.
|
|
|
|
POST /api/v1/integration/integrations/{id}/sync/
|
|
|
|
Request body:
|
|
{
|
|
"direction": "both", # 'both', 'to_external', 'from_external'
|
|
"content_types": ["blog_post", "page"] # Optional
|
|
}
|
|
"""
|
|
integration = self.get_object()
|
|
|
|
direction = request.data.get('direction', 'both')
|
|
content_types = request.data.get('content_types')
|
|
|
|
sync_service = SyncService()
|
|
result = sync_service.sync(integration, direction=direction, content_types=content_types)
|
|
|
|
response_status = status.HTTP_200_OK if result.get('success') else status.HTTP_400_BAD_REQUEST
|
|
return success_response(result, request=request, status_code=response_status)
|
|
|
|
@action(detail=True, methods=['get'])
|
|
def sync_status(self, request, pk=None):
|
|
"""
|
|
Get sync status for integration.
|
|
|
|
GET /api/v1/integration/integrations/{id}/sync_status/
|
|
"""
|
|
integration = self.get_object()
|
|
|
|
sync_service = SyncService()
|
|
status_data = sync_service.get_sync_status(integration)
|
|
|
|
return success_response(status_data, request=request)
|
|
|
|
@action(detail=True, methods=['post'], url_path='update-structure')
|
|
def update_site_structure(self, request, pk=None):
|
|
"""
|
|
Update WordPress site structure (post types, taxonomies, counts).
|
|
Called by WordPress plugin to push site configuration to backend.
|
|
|
|
POST /api/v1/integration/integrations/{id}/update-structure/
|
|
|
|
Request body:
|
|
{
|
|
"post_types": {
|
|
"post": {"label": "Posts", "count": 123, "enabled": true, "fetch_limit": 100},
|
|
"page": {"label": "Pages", "count": 12, "enabled": true, "fetch_limit": 100},
|
|
"product": {"label": "Products", "count": 456, "enabled": false, "fetch_limit": 50}
|
|
},
|
|
"taxonomies": {
|
|
"category": {"label": "Categories", "count": 25, "enabled": true, "fetch_limit": 100},
|
|
"post_tag": {"label": "Tags", "count": 102, "enabled": true, "fetch_limit": 100},
|
|
"product_cat": {"label": "Product Categories", "count": 15, "enabled": false, "fetch_limit": 50}
|
|
},
|
|
"plugin_connection_enabled": true,
|
|
"two_way_sync_enabled": true
|
|
}
|
|
"""
|
|
integration = self.get_object()
|
|
|
|
# Update config with new structure
|
|
config = integration.config_json or {}
|
|
|
|
post_types = request.data.get('post_types', {})
|
|
taxonomies = request.data.get('taxonomies', {})
|
|
|
|
if post_types or taxonomies:
|
|
config['content_types'] = {
|
|
'post_types': post_types,
|
|
'taxonomies': taxonomies,
|
|
'last_structure_fetch': request.data.get('timestamp') or str(timezone.now().isoformat())
|
|
}
|
|
config['plugin_connection_enabled'] = request.data.get('plugin_connection_enabled', True)
|
|
config['two_way_sync_enabled'] = request.data.get('two_way_sync_enabled', True)
|
|
|
|
integration.config_json = config
|
|
integration.save()
|
|
|
|
return success_response({
|
|
'message': 'Site structure updated successfully',
|
|
'post_types_count': len(post_types),
|
|
'taxonomies_count': len(taxonomies),
|
|
'last_structure_fetch': config['content_types']['last_structure_fetch']
|
|
}, request=request)
|
|
|
|
return error_response(
|
|
'No post types or taxonomies provided',
|
|
None,
|
|
status.HTTP_400_BAD_REQUEST,
|
|
request
|
|
)
|
|
|
|
@action(detail=True, methods=['get'], url_path='content-types')
|
|
def content_types_summary(self, request, pk=None):
|
|
"""
|
|
Get content types summary with counts from synced data.
|
|
|
|
GET /api/v1/integration/integrations/{id}/content-types/
|
|
|
|
Returns:
|
|
{
|
|
"success": true,
|
|
"data": {
|
|
"post_types": {
|
|
"post": {"label": "Posts", "count": 123, "synced_count": 50},
|
|
"page": {"label": "Pages", "count": 12, "synced_count": 12},
|
|
"product": {"label": "Products", "count": 456, "synced_count": 200}
|
|
},
|
|
"taxonomies": {
|
|
"category": {"label": "Categories", "count": 25, "synced_count": 25},
|
|
"post_tag": {"label": "Tags", "count": 102, "synced_count": 80},
|
|
"product_cat": {"label": "Product Categories", "count": 15, "synced_count": 15}
|
|
},
|
|
"last_structure_fetch": "2025-11-22T10:00:00Z"
|
|
}
|
|
}
|
|
"""
|
|
integration = self.get_object()
|
|
site = integration.site
|
|
|
|
# Get config from integration
|
|
config = integration.config_json or {}
|
|
content_types = config.get('content_types', {})
|
|
|
|
# Get synced counts from Content and ContentTaxonomy models
|
|
from igny8_core.business.content.models import Content, ContentTaxonomy
|
|
|
|
# Build response with synced counts
|
|
post_types_data = {}
|
|
for wp_type, type_config in content_types.get('post_types', {}).items():
|
|
# Map WP type to entity_type
|
|
entity_type_map = {
|
|
'post': 'post',
|
|
'page': 'page',
|
|
'product': 'product',
|
|
'service': 'service',
|
|
}
|
|
entity_type = entity_type_map.get(wp_type, 'post')
|
|
|
|
# Count synced content
|
|
synced_count = Content.objects.filter(
|
|
site=site,
|
|
entity_type=entity_type,
|
|
external_type=wp_type,
|
|
sync_status__in=['imported', 'synced']
|
|
).count()
|
|
|
|
post_types_data[wp_type] = {
|
|
'label': type_config.get('label', wp_type.title()),
|
|
'count': type_config.get('count', 0),
|
|
'synced_count': synced_count,
|
|
'enabled': type_config.get('enabled', False),
|
|
'fetch_limit': type_config.get('fetch_limit', 100),
|
|
'last_synced': type_config.get('last_synced'),
|
|
}
|
|
|
|
taxonomies_data = {}
|
|
for wp_tax, tax_config in content_types.get('taxonomies', {}).items():
|
|
# Count synced taxonomies
|
|
synced_count = ContentTaxonomy.objects.filter(
|
|
site=site,
|
|
external_taxonomy=wp_tax,
|
|
sync_status__in=['imported', 'synced']
|
|
).count()
|
|
|
|
taxonomies_data[wp_tax] = {
|
|
'label': tax_config.get('label', wp_tax.title()),
|
|
'count': tax_config.get('count', 0),
|
|
'synced_count': synced_count,
|
|
'enabled': tax_config.get('enabled', False),
|
|
'fetch_limit': tax_config.get('fetch_limit', 100),
|
|
'last_synced': tax_config.get('last_synced'),
|
|
}
|
|
|
|
summary = {
|
|
'post_types': post_types_data,
|
|
'taxonomies': taxonomies_data,
|
|
'last_structure_fetch': content_types.get('last_structure_fetch'),
|
|
'plugin_connection_enabled': config.get('plugin_connection_enabled', True),
|
|
'two_way_sync_enabled': config.get('two_way_sync_enabled', True),
|
|
}
|
|
|
|
return success_response(summary, request=request)
|
|
|
|
# Stage 4: Site-level sync endpoints
|
|
|
|
@action(detail=False, methods=['get'], url_path='sites/(?P<site_id>[^/.]+)/sync/status')
|
|
def sync_status_by_site(self, request, site_id=None):
|
|
"""
|
|
Get sync status for all integrations on a site.
|
|
Stage 4: Site-level sync health endpoint.
|
|
|
|
GET /api/v1/integration/integrations/sites/{site_id}/sync/status/
|
|
"""
|
|
try:
|
|
site_id_int = int(site_id)
|
|
except (ValueError, TypeError):
|
|
return error_response(
|
|
'Invalid site_id',
|
|
None,
|
|
status.HTTP_400_BAD_REQUEST,
|
|
request
|
|
)
|
|
|
|
# Verify site belongs to user's account
|
|
from igny8_core.auth.models import Site
|
|
try:
|
|
site = Site.objects.get(id=site_id_int, account=request.user.account)
|
|
except Site.DoesNotExist:
|
|
return error_response(
|
|
'Site not found',
|
|
None,
|
|
status.HTTP_404_NOT_FOUND,
|
|
request
|
|
)
|
|
|
|
sync_health_service = SyncHealthService()
|
|
status_data = sync_health_service.get_sync_status(site_id_int)
|
|
|
|
return success_response(status_data, request=request)
|
|
|
|
@action(detail=False, methods=['post'], url_path='sites/(?P<site_id>[^/.]+)/sync/run')
|
|
def run_sync(self, request, site_id=None):
|
|
"""
|
|
Trigger sync for all integrations on a site.
|
|
Stage 4: Site-level sync trigger endpoint.
|
|
|
|
POST /api/v1/integration/integrations/sites/{site_id}/sync/run/
|
|
|
|
Request body:
|
|
{
|
|
"direction": "both", # Optional: 'both', 'to_external', 'from_external'
|
|
"content_types": ["blog_post", "product"] # Optional
|
|
}
|
|
"""
|
|
try:
|
|
site_id_int = int(site_id)
|
|
except (ValueError, TypeError):
|
|
return error_response(
|
|
'Invalid site_id',
|
|
None,
|
|
status.HTTP_400_BAD_REQUEST,
|
|
request
|
|
)
|
|
|
|
# Verify site belongs to user's account
|
|
from igny8_core.auth.models import Site
|
|
try:
|
|
site = Site.objects.get(id=site_id_int, account=request.user.account)
|
|
except Site.DoesNotExist:
|
|
return error_response(
|
|
'Site not found',
|
|
None,
|
|
status.HTTP_404_NOT_FOUND,
|
|
request
|
|
)
|
|
|
|
direction = request.data.get('direction', 'both')
|
|
content_types = request.data.get('content_types')
|
|
|
|
# Get all active integrations for this site
|
|
integrations = SiteIntegration.objects.filter(
|
|
site_id=site_id_int,
|
|
is_active=True,
|
|
sync_enabled=True
|
|
)
|
|
|
|
if not integrations.exists():
|
|
return error_response(
|
|
'No active integrations found for this site',
|
|
None,
|
|
status.HTTP_400_BAD_REQUEST,
|
|
request
|
|
)
|
|
|
|
sync_service = SyncService()
|
|
sync_health_service = SyncHealthService()
|
|
results = []
|
|
|
|
for integration in integrations:
|
|
result = sync_service.sync(integration, direction=direction, content_types=content_types)
|
|
|
|
# Record sync run
|
|
sync_health_service.record_sync_run(integration.id, result)
|
|
|
|
results.append({
|
|
'integration_id': integration.id,
|
|
'platform': integration.platform,
|
|
'result': result
|
|
})
|
|
|
|
return success_response({
|
|
'site_id': site_id_int,
|
|
'sync_results': results,
|
|
'total_integrations': len(results)
|
|
}, request=request)
|
|
|
|
@action(detail=False, methods=['get'], url_path='sites/(?P<site_id>[^/.]+)/sync/mismatches')
|
|
def get_mismatches(self, request, site_id=None):
|
|
"""
|
|
Get sync mismatches for a site.
|
|
Stage 4: Detailed mismatch information.
|
|
|
|
GET /api/v1/integration/integrations/sites/{site_id}/sync/mismatches/
|
|
"""
|
|
try:
|
|
site_id_int = int(site_id)
|
|
except (ValueError, TypeError):
|
|
return error_response(
|
|
'Invalid site_id',
|
|
None,
|
|
status.HTTP_400_BAD_REQUEST,
|
|
request
|
|
)
|
|
|
|
# Verify site belongs to user's account
|
|
from igny8_core.auth.models import Site
|
|
try:
|
|
site = Site.objects.get(id=site_id_int, account=request.user.account)
|
|
except Site.DoesNotExist:
|
|
return error_response(
|
|
'Site not found',
|
|
None,
|
|
status.HTTP_404_NOT_FOUND,
|
|
request
|
|
)
|
|
|
|
sync_health_service = SyncHealthService()
|
|
mismatches = sync_health_service.get_mismatches(site_id_int)
|
|
|
|
return success_response(mismatches, request=request)
|
|
|
|
@action(detail=False, methods=['get'], url_path='sites/(?P<site_id>[^/.]+)/sync/logs')
|
|
def get_sync_logs(self, request, site_id=None):
|
|
"""
|
|
Get sync logs for a site.
|
|
Stage 4: Sync history and logs.
|
|
|
|
GET /api/v1/integration/integrations/sites/{site_id}/sync/logs/
|
|
|
|
Query params:
|
|
- limit: Number of logs to return (default: 100)
|
|
- integration_id: Filter by specific integration
|
|
"""
|
|
try:
|
|
site_id_int = int(site_id)
|
|
except (ValueError, TypeError):
|
|
return error_response(
|
|
'Invalid site_id',
|
|
None,
|
|
status.HTTP_400_BAD_REQUEST,
|
|
request
|
|
)
|
|
|
|
# Verify site belongs to user's account
|
|
from igny8_core.auth.models import Site
|
|
try:
|
|
site = Site.objects.get(id=site_id_int, account=request.user.account)
|
|
except Site.DoesNotExist:
|
|
return error_response(
|
|
'Site not found',
|
|
None,
|
|
status.HTTP_404_NOT_FOUND,
|
|
request
|
|
)
|
|
|
|
limit = int(request.query_params.get('limit', 100))
|
|
integration_id = request.query_params.get('integration_id')
|
|
|
|
sync_health_service = SyncHealthService()
|
|
logs = sync_health_service.get_sync_logs(
|
|
site_id_int,
|
|
integration_id=int(integration_id) if integration_id else None,
|
|
limit=limit
|
|
)
|
|
|
|
return success_response({
|
|
'site_id': site_id_int,
|
|
'logs': logs,
|
|
'count': len(logs)
|
|
}, request=request)
|
|
|