fix fix fix

This commit is contained in:
IGNY8 VPS (Salman)
2025-12-01 00:13:46 +00:00
parent 90b532d13b
commit 42bc24f2c0
11 changed files with 1592 additions and 26 deletions

View File

@@ -0,0 +1,17 @@
# Generated by Django 5.2.8 on 2025-12-01 00:05
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('igny8_core_auth', '0002_add_wp_api_key_to_site'),
]
operations = [
migrations.AlterModelOptions(
name='seedkeyword',
options={'ordering': ['keyword'], 'verbose_name': 'Seed Keyword', 'verbose_name_plural': 'Global Keywords Database'},
),
]

View File

@@ -0,0 +1,42 @@
# Generated by Django 5.2.8 on 2025-12-01 00:05
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('igny8_core_auth', '0003_add_sync_event_model'),
('integration', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='SyncEvent',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('updated_at', models.DateTimeField(auto_now=True)),
('event_type', models.CharField(choices=[('publish', 'Content Published'), ('sync', 'Status Synced'), ('metadata_sync', 'Metadata Synced'), ('error', 'Error'), ('webhook', 'Webhook Received'), ('test', 'Connection Test')], db_index=True, help_text='Type of sync event', max_length=50)),
('action', models.CharField(choices=[('content_publish', 'Content Publish'), ('status_update', 'Status Update'), ('metadata_update', 'Metadata Update'), ('test_connection', 'Test Connection'), ('webhook_received', 'Webhook Received')], db_index=True, help_text='Specific action performed', max_length=100)),
('description', models.TextField(help_text='Human-readable description of the event')),
('success', models.BooleanField(db_index=True, default=True, help_text='Whether the event was successful')),
('content_id', models.IntegerField(blank=True, db_index=True, help_text='IGNY8 content ID if applicable', null=True)),
('external_id', models.CharField(blank=True, db_index=True, help_text='External platform ID (e.g., WordPress post ID)', max_length=255, null=True)),
('details', models.JSONField(default=dict, help_text='Additional event details (request/response data, errors, etc.)')),
('error_message', models.TextField(blank=True, help_text='Error message if event failed', null=True)),
('duration_ms', models.IntegerField(blank=True, help_text='Event duration in milliseconds', null=True)),
('created_at', models.DateTimeField(auto_now_add=True, db_index=True)),
('account', models.ForeignKey(db_column='tenant_id', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)s_set', to='igny8_core_auth.account')),
('integration', models.ForeignKey(help_text='Integration this event belongs to', on_delete=django.db.models.deletion.CASCADE, related_name='sync_events', to='integration.siteintegration')),
('site', models.ForeignKey(help_text='Site this event belongs to', on_delete=django.db.models.deletion.CASCADE, related_name='sync_events', to='igny8_core_auth.site')),
],
options={
'verbose_name': 'Sync Event',
'verbose_name_plural': 'Sync Events',
'db_table': 'igny8_sync_events',
'ordering': ['-created_at'],
'indexes': [models.Index(fields=['integration', '-created_at'], name='idx_integration_events'), models.Index(fields=['site', '-created_at'], name='idx_site_events'), models.Index(fields=['content_id'], name='idx_content_events'), models.Index(fields=['event_type', '-created_at'], name='idx_event_type_time')],
},
),
]

View File

@@ -129,3 +129,118 @@ class SiteIntegration(AccountBaseModel):
"""
self.credentials_json = credentials
class SyncEvent(AccountBaseModel):
"""
Track sync events for debugging and monitoring.
Stores real-time events for the debug status page.
"""
EVENT_TYPE_CHOICES = [
('publish', 'Content Published'),
('sync', 'Status Synced'),
('metadata_sync', 'Metadata Synced'),
('error', 'Error'),
('webhook', 'Webhook Received'),
('test', 'Connection Test'),
]
ACTION_CHOICES = [
('content_publish', 'Content Publish'),
('status_update', 'Status Update'),
('metadata_update', 'Metadata Update'),
('test_connection', 'Test Connection'),
('webhook_received', 'Webhook Received'),
]
integration = models.ForeignKey(
SiteIntegration,
on_delete=models.CASCADE,
related_name='sync_events',
help_text="Integration this event belongs to"
)
site = models.ForeignKey(
'igny8_core_auth.Site',
on_delete=models.CASCADE,
related_name='sync_events',
help_text="Site this event belongs to"
)
event_type = models.CharField(
max_length=50,
choices=EVENT_TYPE_CHOICES,
db_index=True,
help_text="Type of sync event"
)
action = models.CharField(
max_length=100,
choices=ACTION_CHOICES,
db_index=True,
help_text="Specific action performed"
)
description = models.TextField(
help_text="Human-readable description of the event"
)
success = models.BooleanField(
default=True,
db_index=True,
help_text="Whether the event was successful"
)
# Related object references
content_id = models.IntegerField(
null=True,
blank=True,
db_index=True,
help_text="IGNY8 content ID if applicable"
)
external_id = models.CharField(
max_length=255,
null=True,
blank=True,
db_index=True,
help_text="External platform ID (e.g., WordPress post ID)"
)
# Event details (JSON for flexibility)
details = models.JSONField(
default=dict,
help_text="Additional event details (request/response data, errors, etc.)"
)
error_message = models.TextField(
null=True,
blank=True,
help_text="Error message if event failed"
)
# Duration tracking
duration_ms = models.IntegerField(
null=True,
blank=True,
help_text="Event duration in milliseconds"
)
created_at = models.DateTimeField(auto_now_add=True, db_index=True)
class Meta:
app_label = 'integration'
db_table = 'igny8_sync_events'
verbose_name = 'Sync Event'
verbose_name_plural = 'Sync Events'
ordering = ['-created_at']
indexes = [
models.Index(fields=['integration', '-created_at'], name='idx_integration_events'),
models.Index(fields=['site', '-created_at'], name='idx_site_events'),
models.Index(fields=['content_id'], name='idx_content_events'),
models.Index(fields=['event_type', '-created_at'], name='idx_event_type_time'),
]
def __str__(self):
return f"{self.get_event_type_display()} - {self.description[:50]}"

View File

@@ -6,11 +6,19 @@ from django.urls import path, include
from rest_framework.routers import DefaultRouter
from igny8_core.modules.integration.views import IntegrationViewSet
from igny8_core.modules.integration.webhooks import (
wordpress_status_webhook,
wordpress_metadata_webhook,
)
router = DefaultRouter()
router.register(r'integrations', IntegrationViewSet, basename='integration')
urlpatterns = [
path('', include(router.urls)),
# Webhook endpoints
path('webhooks/wordpress/status/', wordpress_status_webhook, name='wordpress-status-webhook'),
path('webhooks/wordpress/metadata/', wordpress_metadata_webhook, name='wordpress-metadata-webhook'),
]

View File

@@ -608,15 +608,17 @@ class IntegrationViewSet(SiteSectorModelViewSet):
GET /api/v1/integration/integrations/{id}/debug-status/
Query params:
- include_events: Include recent sync events (default: false)
- include_events: Include recent sync events (default: true)
- include_validation: Include data validation matrix (default: false)
- event_limit: Number of events to return (default: 20)
- event_limit: Number of events to return (default: 50)
"""
from igny8_core.business.integration.models import SyncEvent
integration = self.get_object()
include_events = request.query_params.get('include_events', 'false').lower() == 'true'
include_events = request.query_params.get('include_events', 'true').lower() == 'true'
include_validation = request.query_params.get('include_validation', 'false').lower() == 'true'
event_limit = int(request.query_params.get('event_limit', 20))
event_limit = int(request.query_params.get('event_limit', 50))
# Get integration health
health_data = {
@@ -637,25 +639,30 @@ class IntegrationViewSet(SiteSectorModelViewSet):
# Include sync events if requested
if include_events:
sync_health_service = SyncHealthService()
logs = sync_health_service.get_sync_logs(
integration.site_id,
integration_id=integration.id,
limit=event_limit
)
# Get real-time sync events from database
events_qs = SyncEvent.objects.filter(
integration=integration
).order_by('-created_at')[:event_limit]
# Format logs as events
# Format events for frontend
events = []
for log in logs:
for event in events_qs:
events.append({
'type': 'sync' if log.get('success') else 'error',
'action': log.get('operation', 'sync'),
'description': log.get('message', 'Sync operation'),
'timestamp': log.get('timestamp', timezone.now().isoformat()),
'details': log.get('details', {}),
'id': event.id,
'type': event.event_type,
'action': event.action,
'description': event.description,
'timestamp': event.created_at.isoformat(),
'success': event.success,
'content_id': event.content_id,
'external_id': event.external_id,
'error_message': event.error_message,
'duration_ms': event.duration_ms,
'details': event.details,
})
response_data['events'] = events
response_data['events_count'] = len(events)
# Include data validation if requested
if include_validation:

View File

@@ -0,0 +1,328 @@
"""
WordPress Webhook Handlers
Receives status updates and sync events from WordPress
"""
from rest_framework.decorators import api_view, permission_classes, throttle_classes
from rest_framework.permissions import AllowAny
from rest_framework.throttling import BaseThrottle
from rest_framework.response import Response
from rest_framework import status as http_status
from django.utils import timezone
import logging
from igny8_core.api.response import success_response, error_response
from igny8_core.business.content.models import Content
from igny8_core.business.integration.models import SiteIntegration, SyncEvent
logger = logging.getLogger(__name__)
class NoThrottle(BaseThrottle):
"""No throttle for webhooks"""
def allow_request(self, request, view):
return True
@api_view(['POST'])
@permission_classes([AllowAny]) # Webhook authentication handled inside
@throttle_classes([NoThrottle])
def wordpress_status_webhook(request):
"""
Receive WordPress post status updates via webhook
POST /api/v1/integration/webhooks/wordpress/status/
Headers:
- X-IGNY8-API-KEY: <api_key>
Body:
{
"post_id": 123, # WordPress post ID
"content_id": 456, # IGNY8 content ID
"post_status": "publish", # WordPress status
"post_url": "https://example.com/post/...",
"post_title": "Post Title",
"site_url": "https://example.com"
}
"""
try:
# Validate API key
api_key = request.headers.get('X-IGNY8-API-KEY') or request.headers.get('Authorization', '').replace('Bearer ', '')
if not api_key:
return error_response(
error='Missing API key',
status_code=http_status.HTTP_401_UNAUTHORIZED,
request=request
)
# Get webhook data
data = request.data
content_id = data.get('content_id')
post_id = data.get('post_id')
post_status = data.get('post_status')
post_url = data.get('post_url')
site_url = data.get('site_url')
logger.info(f"[wordpress_status_webhook] Received webhook: content_id={content_id}, post_id={post_id}, status={post_status}")
# Validate required fields
if not content_id or not post_id or not post_status:
return error_response(
error='Missing required fields: content_id, post_id, post_status',
status_code=http_status.HTTP_400_BAD_REQUEST,
request=request
)
# Find content
try:
content = Content.objects.get(id=content_id)
except Content.DoesNotExist:
logger.error(f"[wordpress_status_webhook] Content {content_id} not found")
return error_response(
error=f'Content {content_id} not found',
status_code=http_status.HTTP_404_NOT_FOUND,
request=request
)
# Find site integration by site_url and verify API key
if site_url:
integration = SiteIntegration.objects.filter(
site=content.site,
platform='wordpress',
config_json__site_url=site_url
).first()
else:
# Fallback: find any active WordPress integration for this site
integration = SiteIntegration.objects.filter(
site=content.site,
platform='wordpress',
is_active=True
).first()
if not integration:
logger.error(f"[wordpress_status_webhook] No WordPress integration found for site {content.site.name}")
return error_response(
error='WordPress integration not found for this site',
status_code=http_status.HTTP_404_NOT_FOUND,
request=request
)
# Verify API key matches integration
stored_api_key = integration.credentials_json.get('api_key')
if not stored_api_key or stored_api_key != api_key:
logger.error(f"[wordpress_status_webhook] Invalid API key for integration {integration.id}")
return error_response(
error='Invalid API key',
status_code=http_status.HTTP_401_UNAUTHORIZED,
request=request
)
# Map WordPress status to IGNY8 status
status_map = {
'publish': 'published',
'draft': 'draft',
'pending': 'review',
'private': 'published',
'trash': 'draft',
'future': 'review',
}
igny8_status = status_map.get(post_status, 'review')
# Update content
old_status = content.status
old_wp_status = content.metadata.get('wordpress_status') if content.metadata else None
content.external_id = str(post_id)
if post_url:
content.external_url = post_url
# Only update IGNY8 status if WordPress status changed from draft to publish
if post_status == 'publish' and old_status != 'published':
content.status = 'published'
# Update WordPress status in metadata
if not content.metadata:
content.metadata = {}
content.metadata['wordpress_status'] = post_status
content.metadata['last_wp_sync'] = timezone.now().isoformat()
content.save(update_fields=['external_id', 'external_url', 'status', 'metadata', 'updated_at'])
logger.info(f"[wordpress_status_webhook] Updated content {content_id}:")
logger.info(f" - Status: {old_status}{content.status}")
logger.info(f" - WP Status: {old_wp_status}{post_status}")
logger.info(f" - External ID: {content.external_id}")
# Log sync event
SyncEvent.objects.create(
integration=integration,
site=content.site,
account=content.account,
event_type='webhook',
action='status_update',
description=f"WordPress status updated: '{content.title}' is now {post_status}",
success=True,
content_id=content.id,
external_id=str(post_id),
details={
'old_status': old_status,
'new_status': content.status,
'old_wp_status': old_wp_status,
'new_wp_status': post_status,
'post_url': post_url,
}
)
return success_response(
data={
'content_id': content.id,
'status': content.status,
'wordpress_status': post_status,
'external_id': content.external_id,
'external_url': content.external_url,
},
message='Content status updated successfully',
request=request
)
except Exception as e:
logger.error(f"[wordpress_status_webhook] Error processing webhook: {str(e)}", exc_info=True)
return error_response(
error=f'Failed to process webhook: {str(e)}',
status_code=http_status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)
@api_view(['POST'])
@permission_classes([AllowAny])
@throttle_classes([NoThrottle])
def wordpress_metadata_webhook(request):
"""
Receive WordPress metadata updates via webhook
POST /api/v1/integration/webhooks/wordpress/metadata/
Headers:
- X-IGNY8-API-KEY: <api_key>
Body:
{
"post_id": 123,
"content_id": 456,
"site_url": "https://example.com",
"metadata": {
"categories": [...],
"tags": [...],
"author": {...},
"modified_date": "2025-11-30T..."
}
}
"""
try:
# Validate API key
api_key = request.headers.get('X-IGNY8-API-KEY') or request.headers.get('Authorization', '').replace('Bearer ', '')
if not api_key:
return error_response(
error='Missing API key',
status_code=http_status.HTTP_401_UNAUTHORIZED,
request=request
)
# Get webhook data
data = request.data
content_id = data.get('content_id')
post_id = data.get('post_id')
site_url = data.get('site_url')
metadata = data.get('metadata', {})
logger.info(f"[wordpress_metadata_webhook] Received webhook: content_id={content_id}, post_id={post_id}")
# Validate required fields
if not content_id or not post_id:
return error_response(
error='Missing required fields: content_id, post_id',
status_code=http_status.HTTP_400_BAD_REQUEST,
request=request
)
# Find content
try:
content = Content.objects.get(id=content_id)
except Content.DoesNotExist:
return error_response(
error=f'Content {content_id} not found',
status_code=http_status.HTTP_404_NOT_FOUND,
request=request
)
# Find integration and verify API key
if site_url:
integration = SiteIntegration.objects.filter(
site=content.site,
platform='wordpress',
config_json__site_url=site_url
).first()
else:
integration = SiteIntegration.objects.filter(
site=content.site,
platform='wordpress',
is_active=True
).first()
if not integration:
return error_response(
error='WordPress integration not found',
status_code=http_status.HTTP_404_NOT_FOUND,
request=request
)
# Verify API key
stored_api_key = integration.credentials_json.get('api_key')
if not stored_api_key or stored_api_key != api_key:
return error_response(
error='Invalid API key',
status_code=http_status.HTTP_401_UNAUTHORIZED,
request=request
)
# Update content metadata
if not content.metadata:
content.metadata = {}
content.metadata['wp_metadata'] = metadata
content.metadata['last_metadata_sync'] = timezone.now().isoformat()
content.save(update_fields=['metadata', 'updated_at'])
# Log sync event
SyncEvent.objects.create(
integration=integration,
site=content.site,
account=content.account,
event_type='metadata_sync',
action='metadata_update',
description=f"WordPress metadata synced for '{content.title}'",
success=True,
content_id=content.id,
external_id=str(post_id),
details=metadata
)
logger.info(f"[wordpress_metadata_webhook] Updated metadata for content {content_id}")
return success_response(
data={
'content_id': content.id,
'metadata_updated': True,
},
message='Metadata updated successfully',
request=request
)
except Exception as e:
logger.error(f"[wordpress_metadata_webhook] Error: {str(e)}", exc_info=True)
return error_response(
error=f'Failed to process webhook: {str(e)}',
status_code=http_status.HTTP_500_INTERNAL_SERVER_ERROR,
request=request
)

View File

@@ -187,45 +187,136 @@ def publish_content_to_wordpress(self, content_id: int, site_integration_id: int
)
logger.info(f"[publish_content_to_wordpress] 📬 WordPress response: status={response.status_code}")
# Track start time for duration measurement
import time
from igny8_core.business.integration.models import SyncEvent
start_time = time.time()
if response.status_code == 201:
# Success
wp_data = response.json().get('data', {})
logger.info(f"[publish_content_to_wordpress] ✅ WordPress post created successfully: post_id={wp_data.get('post_id')}")
wp_status = wp_data.get('post_status', 'publish')
logger.info(f"[publish_content_to_wordpress] ✅ WordPress post created successfully: post_id={wp_data.get('post_id')}, status={wp_status}")
# Update external_id and external_url for unified Content model
content.external_id = wp_data.get('post_id')
# Update external_id, external_url, and wordpress_status in Content model
content.external_id = str(wp_data.get('post_id'))
content.external_url = wp_data.get('post_url')
content.status = 'published'
# Add wordpress_status field to Content model metadata
if not hasattr(content, 'metadata') or content.metadata is None:
content.metadata = {}
content.metadata['wordpress_status'] = wp_status
content.save(update_fields=[
'external_id', 'external_url', 'status', 'updated_at'
'external_id', 'external_url', 'status', 'metadata', 'updated_at'
])
logger.info(f"[publish_content_to_wordpress] 💾 Content model updated: external_id={content.external_id}, status=published")
logger.info(f"[publish_content_to_wordpress] 💾 Content model updated:")
logger.info(f" - External ID: {content.external_id}")
logger.info(f" - External URL: {content.external_url}")
logger.info(f" - Status: published")
logger.info(f" - WordPress Status: {wp_status}")
# Log sync event
duration_ms = int((time.time() - start_time) * 1000)
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='publish',
action='content_publish',
description=f"Published content '{content.title}' to WordPress",
success=True,
content_id=content.id,
external_id=str(content.external_id),
details={
'post_url': content.external_url,
'wordpress_status': wp_status,
'categories': categories,
'tags': tags,
'has_featured_image': bool(featured_image_url),
'gallery_images_count': len(gallery_images),
},
duration_ms=duration_ms
)
logger.info(f"[publish_content_to_wordpress] 🎉 Successfully published content {content_id} to WordPress post {content.external_id}")
return {
"success": True,
"external_id": content.external_id,
"external_url": content.external_url
"external_url": content.external_url,
"wordpress_status": wp_status
}
elif response.status_code == 409:
# Content already exists
wp_data = response.json().get('data', {})
content.external_id = wp_data.get('post_id')
wp_status = wp_data.get('post_status', 'publish')
content.external_id = str(wp_data.get('post_id'))
content.external_url = wp_data.get('post_url')
content.status = 'published'
# Update wordpress_status in metadata
if not hasattr(content, 'metadata') or content.metadata is None:
content.metadata = {}
content.metadata['wordpress_status'] = wp_status
content.save(update_fields=[
'external_id', 'external_url', 'status', 'updated_at'
'external_id', 'external_url', 'status', 'metadata', 'updated_at'
])
# Log sync event
duration_ms = int((time.time() - start_time) * 1000)
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='sync',
action='content_publish',
description=f"Content '{content.title}' already exists in WordPress",
success=True,
content_id=content.id,
external_id=str(content.external_id),
details={
'post_url': content.external_url,
'wordpress_status': wp_status,
'already_exists': True,
},
duration_ms=duration_ms
)
logger.info(f"Content {content_id} already exists on WordPress")
return {"success": True, "message": "Content already exists", "external_id": content.external_id}
return {
"success": True,
"message": "Content already exists",
"external_id": content.external_id,
"wordpress_status": wp_status
}
else:
# Error
error_msg = f"WordPress API error: {response.status_code} - {response.text}"
logger.error(f"[publish_content_to_wordpress] ❌ {error_msg}")
# Log sync event for failure
duration_ms = int((time.time() - start_time) * 1000)
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='error',
action='content_publish',
description=f"Failed to publish content '{content.title}' to WordPress",
success=False,
content_id=content.id,
error_message=error_msg,
details={
'status_code': response.status_code,
'response_text': response.text[:500], # Limit length
},
duration_ms=duration_ms
)
# Retry logic
if self.request.retries < self.max_retries:
# Exponential backoff: 1min, 5min, 15min
@@ -239,6 +330,30 @@ def publish_content_to_wordpress(self, content_id: int, site_integration_id: int
except Exception as e:
logger.error(f"[publish_content_to_wordpress] ❌ Exception during publish: {str(e)}", exc_info=True)
# Log sync event for exception
try:
from igny8_core.business.integration.models import SyncEvent
import time
SyncEvent.objects.create(
integration=site_integration,
site=content.site,
account=content.account,
event_type='error',
action='content_publish',
description=f"Exception while publishing content '{content.title}' to WordPress",
success=False,
content_id=content.id,
error_message=str(e),
details={
'exception_type': type(e).__name__,
'traceback': str(e),
},
)
except Exception as log_error:
logger.error(f"Failed to log sync event: {str(log_error)}")
return {"success": False, "error": str(e)}