Files
igny8/backend/igny8_core/modules/integration/views.py
alorig 93923f25aa Require API key for WordPress integration auth
Updated the WordPress integration validation to require an API key as the sole authentication method, removing support for username and application password authentication.
2025-11-24 07:45:45 +05:00

603 lines
23 KiB
Python

"""
Integration ViewSet
Phase 6: Site Integration & Multi-Destination Publishing
"""
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.response import Response
from django.utils import timezone
from igny8_core.api.base import SiteSectorModelViewSet
from igny8_core.api.permissions import IsAuthenticatedAndActive, IsEditorOrAbove
from igny8_core.api.response import success_response, error_response
from igny8_core.api.throttles import DebugScopedRateThrottle
from igny8_core.business.integration.models import SiteIntegration
from igny8_core.business.integration.services.integration_service import IntegrationService
from igny8_core.business.integration.services.sync_service import SyncService
from igny8_core.business.integration.services.sync_health_service import SyncHealthService
from igny8_core.business.integration.services.content_sync_service import ContentSyncService
import logging
# Module-level logger, named after this module's import path via __name__.
logger = logging.getLogger(__name__)
class IntegrationViewSet(SiteSectorModelViewSet):
    """
    ViewSet for SiteIntegration model.

    On top of the routes inherited from SiteSectorModelViewSet, this class
    defines per-integration actions (connection test, sync, sync status,
    structure update, content-type summary), a collection-level connection
    test that can be authorised by API key, and site-level sync endpoints
    (status, run, mismatches, logs).
    """
    queryset = SiteIntegration.objects.select_related('site')
    permission_classes = [IsAuthenticatedAndActive, IsEditorOrAbove]
    throttle_scope = 'integration'
    throttle_classes = [DebugScopedRateThrottle]

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _as_mapping(value):
        """Return ``value`` if it is a dict, otherwise an empty dict.

        Stored JSON config may hold ``[]``/``None`` where an object is
        expected; callers use this before calling ``.get()``/``.items()``.
        """
        return value if isinstance(value, dict) else {}

    def _get_account_site(self, request, site_id):
        """Resolve ``site_id`` to a Site owned by the requesting user's account.

        Args:
            request: The current request; ``request.user.account`` is read.
            site_id: Raw site id captured from the URL.

        Returns:
            A ``(site, error)`` tuple. Exactly one element is ``None``:
            ``error`` is a 400 response for a non-integer id or a 404 response
            when no matching site exists for the user's account.
        """
        try:
            site_id_int = int(site_id)
        except (ValueError, TypeError):
            return None, error_response(
                'Invalid site_id',
                None,
                status.HTTP_400_BAD_REQUEST,
                request
            )
        # Imported locally, as elsewhere in this module.
        from igny8_core.auth.models import Site
        try:
            site = Site.objects.get(id=site_id_int, account=request.user.account)
        except Site.DoesNotExist:
            return None, error_response(
                'Site not found',
                None,
                status.HTTP_404_NOT_FOUND,
                request
            )
        return site, None

    def _authorize_connection_test(self, request, site, api_key):
        """Decide whether a collection-level connection test may proceed.

        Access is granted when the request user is authenticated and belongs
        to the same (non-null) account as ``site``, or when ``api_key`` equals
        the site's stored ``wp_api_key``.

        Args:
            request: The current request.
            site: The Site the caller wants to test.
            api_key: API key supplied in the request body (may be ``None``).

        Returns:
            ``None`` when access is granted, otherwise a 403 error response.
        """
        import hmac  # stdlib; function-scope import like the other local imports here

        user = getattr(request, 'user', None)
        if getattr(user, 'is_authenticated', False):
            # getattr defaults only absorb a missing attribute, instead of the
            # previous blanket ``except Exception``.
            user_account = getattr(user, 'account', None)
            site_account = getattr(site, 'account', None)
            # Require a real account: None == None must not authenticate.
            if user_account is not None and site_account == user_account:
                return None

        stored_key = getattr(site, 'wp_api_key', None)
        if not stored_key:
            # API key not set on site - provide helpful error message
            return error_response(
                'API key not configured for this site. Please generate an API key in the IGNY8 app and ensure it is saved to the site.',
                None,
                status.HTTP_403_FORBIDDEN,
                request
            )
        if not api_key:
            return error_response(
                'Authentication credentials were not provided.',
                None,
                status.HTTP_403_FORBIDDEN,
                request
            )
        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str. 'surrogatepass' keeps odd input from raising.
        supplied = str(api_key).encode('utf-8', 'surrogatepass')
        expected = str(stored_key).encode('utf-8', 'surrogatepass')
        if hmac.compare_digest(supplied, expected):
            return None
        # API key provided but doesn't match
        return error_response(
            'Invalid API key. The provided API key does not match the one stored for this site.',
            None,
            status.HTTP_403_FORBIDDEN,
            request
        )

    # ------------------------------------------------------------------
    # Standard ViewSet hooks
    # ------------------------------------------------------------------
    def get_queryset(self):
        """
        Override to filter integrations by site.

        SiteIntegration only has 'site' field (no 'sector'), so SiteSectorModelViewSet's
        filtering doesn't apply. We manually filter by site here, using the
        ``site_id`` (or ``site``) query parameter when present. A value that is
        not an integer yields an empty queryset.
        """
        queryset = super().get_queryset()
        params = self.request.query_params
        site_id = params.get('site_id') or params.get('site')
        if not site_id:
            return queryset
        try:
            site_id_int = int(site_id)
        except (ValueError, TypeError):
            # Invalid site_id, return empty queryset
            return queryset.none()
        return queryset.filter(site_id=site_id_int)

    def get_serializer_class(self):
        """Return the ModelSerializer used for SiteIntegration.

        The serializer is defined here (with a local ``serializers`` import)
        and enforces that WordPress integrations carry an API key.
        """
        from rest_framework import serializers

        class SiteIntegrationSerializer(serializers.ModelSerializer):
            def validate(self, data):
                """
                Custom validation for WordPress integrations.
                API key is the only required authentication method.

                Raises:
                    serializers.ValidationError: if the platform being written
                        is 'wordpress' and no ``api_key`` is available in
                        ``credentials_json``.
                """
                validated_data = super().validate(data)
                # For WordPress platform, require API key only
                if validated_data.get('platform') == 'wordpress':
                    if 'credentials_json' in validated_data:
                        credentials = validated_data['credentials_json']
                    else:
                        # Partial update that does not touch credentials:
                        # fall back to what is already stored on the instance.
                        credentials = getattr(self.instance, 'credentials_json', None)
                    # Null / non-object credentials previously raised
                    # AttributeError (HTTP 500); treat them as "no API key".
                    if not isinstance(credentials, dict) or not credentials.get('api_key'):
                        raise serializers.ValidationError({
                            'credentials_json': 'API key is required for WordPress integration.'
                        })
                return validated_data

            class Meta:
                model = SiteIntegration
                # NOTE(review): '__all__' includes credentials_json, so stored
                # credentials are serialized in responses - confirm intended.
                fields = '__all__'
                read_only_fields = ['created_at', 'updated_at', 'last_sync_at']

        return SiteIntegrationSerializer

    # ------------------------------------------------------------------
    # Connection tests
    # ------------------------------------------------------------------
    @action(detail=True, methods=['post'])
    def test_connection(self, request, pk=None):
        """
        Test connection to integrated platform.

        POST /api/v1/integration/integrations/{id}/test_connection/

        Returns the service result in a success response when it reports
        success, otherwise a 400 error response carrying the result message.
        """
        integration = self.get_object()
        result = IntegrationService().test_connection(integration)
        if result.get('success'):
            return success_response(result, request=request)
        return error_response(
            result.get('message', 'Connection test failed'),
            None,
            status.HTTP_400_BAD_REQUEST,
            request
        )

    # Imported in the class body so the names exist when the decorator on
    # test_connection_collection below is evaluated.
    from rest_framework.permissions import AllowAny
    from rest_framework.throttling import BaseThrottle

    class NoThrottle(BaseThrottle):
        """Temporary throttle class that allows all requests"""

        def allow_request(self, request, view):
            return True

    # NOTE(review): this route is AllowAny and unthrottled, so API-key guesses
    # are not rate-limited at this layer - confirm that is acceptable.
    @action(detail=False, methods=['post'], url_path='test-connection',
            permission_classes=[AllowAny], throttle_classes=[NoThrottle])
    def test_connection_collection(self, request):
        """
        Collection-level test connection endpoint for frontend convenience.

        POST /api/v1/integration/integrations/test-connection/

        Body:
        {
            "site_id": 123,
            "api_key": "...",
            "site_url": "https://example.com"
        }

        Authentication: accepts either an authenticated user on the site's
        account OR an ``api_key`` matching the site's stored ``wp_api_key``.
        If the site has no WordPress integration yet, one is created before the
        test and deleted again if the test fails.
        """
        site_id = request.data.get('site_id')
        api_key = request.data.get('api_key')
        site_url = request.data.get('site_url')
        if not site_id:
            return error_response('site_id is required', None, status.HTTP_400_BAD_REQUEST, request)

        # Verify site exists
        from igny8_core.auth.models import Site
        try:
            site = Site.objects.get(id=int(site_id))
        except (Site.DoesNotExist, ValueError, TypeError, OverflowError):
            return error_response('Site not found or invalid', None, status.HTTP_404_NOT_FOUND, request)

        denial = self._authorize_connection_test(request, site, api_key)
        if denial is not None:
            return denial

        # Try to find an existing integration for this site+platform
        integration = SiteIntegration.objects.filter(site=site, platform='wordpress').first()
        # If not found, create and save the integration to database
        integration_created = False
        if not integration:
            integration = SiteIntegration.objects.create(
                account=site.account,
                site=site,
                platform='wordpress',
                platform_type='cms',
                config_json={'site_url': site_url} if site_url else {},
                credentials_json={'api_key': api_key} if api_key else {},
                is_active=True,
                sync_enabled=True
            )
            integration_created = True
            logger.info(
                "[IntegrationViewSet] Created WordPress integration %s for site %s",
                integration.id, site.id
            )

        service = IntegrationService()
        # Mark this as initial connection test since API key was provided in request body
        # This allows the test to pass even if WordPress plugin hasn't stored the key yet
        is_initial_connection = bool(api_key)
        result = service._test_wordpress_connection(integration, is_initial_connection=is_initial_connection)
        if result.get('success'):
            # Include integration_id in response so plugin can store it
            result['integration_id'] = integration.id
            result['integration_created'] = integration_created
            return success_response(result, request=request)

        # If test failed and we just created integration, delete it
        if integration_created:
            # Capture the id first: delete() clears the instance's primary key,
            # so logging integration.id afterwards would print None.
            deleted_id = integration.id
            integration.delete()
            logger.info(
                "[IntegrationViewSet] Deleted integration %s due to failed connection test",
                deleted_id
            )
        return error_response(result.get('message', 'Connection test failed'), None, status.HTTP_400_BAD_REQUEST, request)

    # ------------------------------------------------------------------
    # Per-integration sync endpoints
    # ------------------------------------------------------------------
    @action(detail=True, methods=['post'])
    def sync(self, request, pk=None):
        """
        Trigger synchronization with integrated platform.

        POST /api/v1/integration/integrations/{id}/sync/

        Request body:
        {
            "direction": "both",  # 'both', 'to_external', 'from_external'
            "content_types": ["blog_post", "page"]  # Optional
        }

        Responds with the sync result; HTTP 200 when the result reports
        success, HTTP 400 otherwise.
        """
        integration = self.get_object()
        direction = request.data.get('direction', 'both')
        content_types = request.data.get('content_types')
        result = SyncService().sync(integration, direction=direction, content_types=content_types)
        response_status = status.HTTP_200_OK if result.get('success') else status.HTTP_400_BAD_REQUEST
        # NOTE(review): a failed sync is still wrapped by success_response (with
        # a 400 status), unlike test_connection which uses error_response.
        # Left as-is so clients keep receiving the full result payload.
        return success_response(result, request=request, status_code=response_status)

    @action(detail=True, methods=['get'])
    def sync_status(self, request, pk=None):
        """
        Get sync status for integration.

        GET /api/v1/integration/integrations/{id}/sync_status/
        """
        integration = self.get_object()
        status_data = SyncService().get_sync_status(integration)
        return success_response(status_data, request=request)

    @action(detail=True, methods=['post'], url_path='update-structure')
    def update_site_structure(self, request, pk=None):
        """
        Update WordPress site structure (post types, taxonomies, counts).
        Called by WordPress plugin to push site configuration to backend.

        POST /api/v1/integration/integrations/{id}/update-structure/

        Request body:
        {
            "post_types": {
                "post": {"label": "Posts", "count": 123, "enabled": true, "fetch_limit": 100},
                "page": {"label": "Pages", "count": 12, "enabled": true, "fetch_limit": 100},
                "product": {"label": "Products", "count": 456, "enabled": false, "fetch_limit": 50}
            },
            "taxonomies": {
                "category": {"label": "Categories", "count": 25, "enabled": true, "fetch_limit": 100},
                "post_tag": {"label": "Tags", "count": 102, "enabled": true, "fetch_limit": 100},
                "product_cat": {"label": "Product Categories", "count": 15, "enabled": false, "fetch_limit": 50}
            },
            "plugin_connection_enabled": true,
            "two_way_sync_enabled": true
        }

        Returns 400 when neither post types nor taxonomies are supplied, or
        when either one is supplied as something other than a JSON object.
        """
        integration = self.get_object()
        # Normalise empty values ([] / null) to {} so an empty JSON array is
        # never stored where an object is later iterated with .items().
        post_types = request.data.get('post_types') or {}
        taxonomies = request.data.get('taxonomies') or {}
        if not post_types and not taxonomies:
            return error_response(
                'No post types or taxonomies provided',
                None,
                status.HTTP_400_BAD_REQUEST,
                request
            )
        if not isinstance(post_types, dict) or not isinstance(taxonomies, dict):
            return error_response(
                'post_types and taxonomies must be JSON objects',
                None,
                status.HTTP_400_BAD_REQUEST,
                request
            )

        # Update config with new structure
        config = integration.config_json or {}
        last_structure_fetch = request.data.get('timestamp') or timezone.now().isoformat()
        config['content_types'] = {
            'post_types': post_types,
            'taxonomies': taxonomies,
            'last_structure_fetch': last_structure_fetch
        }
        config['plugin_connection_enabled'] = request.data.get('plugin_connection_enabled', True)
        config['two_way_sync_enabled'] = request.data.get('two_way_sync_enabled', True)
        integration.config_json = config
        integration.save()
        return success_response({
            'message': 'Site structure updated successfully',
            'post_types_count': len(post_types),
            'taxonomies_count': len(taxonomies),
            'last_structure_fetch': last_structure_fetch
        }, request=request)

    @action(detail=True, methods=['get'], url_path='content-types')
    def content_types_summary(self, request, pk=None):
        """
        Get content types summary with counts from synced data.

        GET /api/v1/integration/integrations/{id}/content-types/

        Returns:
        {
            "success": true,
            "data": {
                "post_types": {
                    "post": {"label": "Posts", "count": 123, "synced_count": 50},
                    "page": {"label": "Pages", "count": 12, "synced_count": 12},
                    "product": {"label": "Products", "count": 456, "synced_count": 200}
                },
                "taxonomies": {
                    "category": {"label": "Categories", "count": 25, "synced_count": 25},
                    "post_tag": {"label": "Tags", "count": 102, "synced_count": 80},
                    "product_cat": {"label": "Product Categories", "count": 15, "synced_count": 15}
                },
                "last_structure_fetch": "2025-11-22T10:00:00Z"
            }
        }

        Each entry also carries ``enabled``, ``fetch_limit`` and
        ``last_synced``; the payload additionally includes
        ``plugin_connection_enabled`` and ``two_way_sync_enabled``.
        """
        integration = self.get_object()
        site = integration.site
        # Get config from integration
        config = integration.config_json or {}
        content_types = self._as_mapping(config.get('content_types'))

        # Get synced counts from Content and ContentTaxonomy models
        from igny8_core.business.content.models import Content, ContentTaxonomy

        # Map WP type to entity_type; unknown types fall back to 'post'.
        entity_type_map = {
            'post': 'post',
            'page': 'page',
            'product': 'product',
            'service': 'service',
        }
        synced_states = ['imported', 'synced']

        # Build response with synced counts
        post_types_data = {}
        for wp_type, type_config in self._as_mapping(content_types.get('post_types')).items():
            type_config = self._as_mapping(type_config)
            # Count synced content
            synced_count = Content.objects.filter(
                site=site,
                entity_type=entity_type_map.get(wp_type, 'post'),
                external_type=wp_type,
                sync_status__in=synced_states
            ).count()
            post_types_data[wp_type] = {
                'label': type_config.get('label', wp_type.title()),
                'count': type_config.get('count', 0),
                'synced_count': synced_count,
                'enabled': type_config.get('enabled', False),
                'fetch_limit': type_config.get('fetch_limit', 100),
                'last_synced': type_config.get('last_synced'),
            }

        taxonomies_data = {}
        for wp_tax, tax_config in self._as_mapping(content_types.get('taxonomies')).items():
            tax_config = self._as_mapping(tax_config)
            # Count synced taxonomies
            synced_count = ContentTaxonomy.objects.filter(
                site=site,
                external_taxonomy=wp_tax,
                sync_status__in=synced_states
            ).count()
            taxonomies_data[wp_tax] = {
                'label': tax_config.get('label', wp_tax.title()),
                'count': tax_config.get('count', 0),
                'synced_count': synced_count,
                'enabled': tax_config.get('enabled', False),
                'fetch_limit': tax_config.get('fetch_limit', 100),
                'last_synced': tax_config.get('last_synced'),
            }

        summary = {
            'post_types': post_types_data,
            'taxonomies': taxonomies_data,
            'last_structure_fetch': content_types.get('last_structure_fetch'),
            'plugin_connection_enabled': config.get('plugin_connection_enabled', True),
            'two_way_sync_enabled': config.get('two_way_sync_enabled', True),
        }
        return success_response(summary, request=request)

    # ------------------------------------------------------------------
    # Stage 4: Site-level sync endpoints
    # ------------------------------------------------------------------
    @action(detail=False, methods=['get'], url_path='sites/(?P<site_id>[^/.]+)/sync/status')
    def sync_status_by_site(self, request, site_id=None):
        """
        Get sync status for all integrations on a site.
        Stage 4: Site-level sync health endpoint.

        GET /api/v1/integration/integrations/sites/{site_id}/sync/status/
        """
        # Verify site belongs to user's account
        site, failure = self._get_account_site(request, site_id)
        if failure is not None:
            return failure
        status_data = SyncHealthService().get_sync_status(site.id)
        return success_response(status_data, request=request)

    @action(detail=False, methods=['post'], url_path='sites/(?P<site_id>[^/.]+)/sync/run')
    def run_sync(self, request, site_id=None):
        """
        Trigger sync for all integrations on a site.
        Stage 4: Site-level sync trigger endpoint.

        POST /api/v1/integration/integrations/sites/{site_id}/sync/run/

        Request body:
        {
            "direction": "both",  # Optional: 'both', 'to_external', 'from_external'
            "content_types": ["blog_post", "product"]  # Optional
        }

        Returns 400 when the site has no integration that is both active and
        sync-enabled.
        """
        # Verify site belongs to user's account
        site, failure = self._get_account_site(request, site_id)
        if failure is not None:
            return failure

        direction = request.data.get('direction', 'both')
        content_types = request.data.get('content_types')

        # Get all active integrations for this site. Materialised once so the
        # emptiness check and the loop share a single query.
        integrations = list(SiteIntegration.objects.filter(
            site_id=site.id,
            is_active=True,
            sync_enabled=True
        ))
        if not integrations:
            return error_response(
                'No active integrations found for this site',
                None,
                status.HTTP_400_BAD_REQUEST,
                request
            )

        sync_service = SyncService()
        sync_health_service = SyncHealthService()
        results = []
        for integration in integrations:
            result = sync_service.sync(integration, direction=direction, content_types=content_types)
            # Record sync run
            sync_health_service.record_sync_run(integration.id, result)
            results.append({
                'integration_id': integration.id,
                'platform': integration.platform,
                'result': result
            })
        return success_response({
            'site_id': site.id,
            'sync_results': results,
            'total_integrations': len(results)
        }, request=request)

    @action(detail=False, methods=['get'], url_path='sites/(?P<site_id>[^/.]+)/sync/mismatches')
    def get_mismatches(self, request, site_id=None):
        """
        Get sync mismatches for a site.
        Stage 4: Detailed mismatch information.

        GET /api/v1/integration/integrations/sites/{site_id}/sync/mismatches/
        """
        # Verify site belongs to user's account
        site, failure = self._get_account_site(request, site_id)
        if failure is not None:
            return failure
        mismatches = SyncHealthService().get_mismatches(site.id)
        return success_response(mismatches, request=request)

    @action(detail=False, methods=['get'], url_path='sites/(?P<site_id>[^/.]+)/sync/logs')
    def get_sync_logs(self, request, site_id=None):
        """
        Get sync logs for a site.
        Stage 4: Sync history and logs.

        GET /api/v1/integration/integrations/sites/{site_id}/sync/logs/

        Query params:
        - limit: Number of logs to return (default: 100)
        - integration_id: Filter by specific integration

        Returns 400 when ``limit`` or ``integration_id`` is not an integer.
        """
        # Verify site belongs to user's account
        site, failure = self._get_account_site(request, site_id)
        if failure is not None:
            return failure

        raw_integration_id = request.query_params.get('integration_id')
        try:
            limit = int(request.query_params.get('limit', 100))
            integration_id = int(raw_integration_id) if raw_integration_id else None
        except (ValueError, TypeError):
            # Malformed query params are a client error, not a server crash.
            return error_response(
                "'limit' and 'integration_id' must be integers",
                None,
                status.HTTP_400_BAD_REQUEST,
                request
            )

        logs = SyncHealthService().get_sync_logs(
            site.id,
            integration_id=integration_id,
            limit=limit
        )
        return success_response({
            'site_id': site.id,
            'logs': logs,
            'count': len(logs)
        }, request=request)