stage 4-2
This commit is contained in:
@@ -0,0 +1,445 @@
|
||||
"""
|
||||
Sync Health Service
|
||||
Stage 4: Track sync health, mismatches, and logs
|
||||
|
||||
Provides health monitoring for site integrations.
|
||||
"""
|
||||
import logging
|
||||
from typing import Dict, Any, Optional, List
|
||||
from datetime import datetime, timedelta
|
||||
from django.utils import timezone
|
||||
|
||||
from igny8_core.business.integration.models import SiteIntegration
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SyncHealthService:
|
||||
"""
|
||||
Service for tracking sync health and detecting mismatches.
|
||||
"""
|
||||
|
||||
def get_sync_status(
|
||||
self,
|
||||
site_id: int,
|
||||
integration_id: Optional[int] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Get sync status for a site or specific integration.
|
||||
|
||||
Args:
|
||||
site_id: Site ID
|
||||
integration_id: Optional integration ID (if None, returns all integrations)
|
||||
|
||||
Returns:
|
||||
dict: {
|
||||
'site_id': int,
|
||||
'integrations': [
|
||||
{
|
||||
'id': int,
|
||||
'platform': str,
|
||||
'status': str,
|
||||
'last_sync_at': datetime,
|
||||
'sync_enabled': bool,
|
||||
'is_healthy': bool,
|
||||
'error': str,
|
||||
'mismatch_count': int
|
||||
}
|
||||
],
|
||||
'overall_status': str, # 'healthy', 'warning', 'error'
|
||||
'last_sync_at': datetime
|
||||
}
|
||||
"""
|
||||
try:
|
||||
integrations_query = SiteIntegration.objects.filter(
|
||||
site_id=site_id,
|
||||
is_active=True
|
||||
)
|
||||
|
||||
if integration_id:
|
||||
integrations_query = integrations_query.filter(id=integration_id)
|
||||
|
||||
integrations = []
|
||||
overall_healthy = True
|
||||
last_sync = None
|
||||
|
||||
for integration in integrations_query:
|
||||
mismatch_count = self._count_mismatches(integration)
|
||||
is_healthy = (
|
||||
integration.sync_status == 'success' and
|
||||
mismatch_count == 0 and
|
||||
(not integration.sync_error or integration.sync_error == '')
|
||||
)
|
||||
|
||||
if not is_healthy:
|
||||
overall_healthy = False
|
||||
|
||||
if integration.last_sync_at:
|
||||
if last_sync is None or integration.last_sync_at > last_sync:
|
||||
last_sync = integration.last_sync_at
|
||||
|
||||
integrations.append({
|
||||
'id': integration.id,
|
||||
'platform': integration.platform,
|
||||
'status': integration.sync_status,
|
||||
'last_sync_at': integration.last_sync_at.isoformat() if integration.last_sync_at else None,
|
||||
'sync_enabled': integration.sync_enabled,
|
||||
'is_healthy': is_healthy,
|
||||
'error': integration.sync_error,
|
||||
'mismatch_count': mismatch_count
|
||||
})
|
||||
|
||||
# Determine overall status
|
||||
if overall_healthy:
|
||||
overall_status = 'healthy'
|
||||
elif any(i['status'] == 'failed' for i in integrations):
|
||||
overall_status = 'error'
|
||||
else:
|
||||
overall_status = 'warning'
|
||||
|
||||
return {
|
||||
'site_id': site_id,
|
||||
'integrations': integrations,
|
||||
'overall_status': overall_status,
|
||||
'last_sync_at': last_sync.isoformat() if last_sync else None
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting sync status: {e}", exc_info=True)
|
||||
return {
|
||||
'site_id': site_id,
|
||||
'integrations': [],
|
||||
'overall_status': 'error',
|
||||
'last_sync_at': None,
|
||||
'error': str(e)
|
||||
}
|
||||
|
||||
def get_mismatches(
|
||||
self,
|
||||
site_id: int,
|
||||
integration_id: Optional[int] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Get detailed mismatch information.
|
||||
|
||||
Args:
|
||||
site_id: Site ID
|
||||
integration_id: Optional integration ID
|
||||
|
||||
Returns:
|
||||
dict: {
|
||||
'taxonomies': {
|
||||
'missing_in_wordpress': List[Dict],
|
||||
'missing_in_igny8': List[Dict],
|
||||
'mismatched': List[Dict]
|
||||
},
|
||||
'products': {
|
||||
'missing_in_wordpress': List[Dict],
|
||||
'missing_in_igny8': List[Dict]
|
||||
},
|
||||
'posts': {
|
||||
'missing_in_wordpress': List[Dict],
|
||||
'missing_in_igny8': List[Dict]
|
||||
}
|
||||
}
|
||||
"""
|
||||
try:
|
||||
integrations_query = SiteIntegration.objects.filter(
|
||||
site_id=site_id,
|
||||
is_active=True
|
||||
)
|
||||
|
||||
if integration_id:
|
||||
integrations_query = integrations_query.filter(id=integration_id)
|
||||
|
||||
all_mismatches = {
|
||||
'taxonomies': {
|
||||
'missing_in_wordpress': [],
|
||||
'missing_in_igny8': [],
|
||||
'mismatched': []
|
||||
},
|
||||
'products': {
|
||||
'missing_in_wordpress': [],
|
||||
'missing_in_igny8': []
|
||||
},
|
||||
'posts': {
|
||||
'missing_in_wordpress': [],
|
||||
'missing_in_igny8': []
|
||||
}
|
||||
}
|
||||
|
||||
for integration in integrations_query:
|
||||
if integration.platform == 'wordpress':
|
||||
mismatches = self._detect_wordpress_mismatches(integration)
|
||||
# Merge mismatches
|
||||
for key in all_mismatches:
|
||||
if key in mismatches:
|
||||
all_mismatches[key]['missing_in_wordpress'].extend(
|
||||
mismatches[key].get('missing_in_wordpress', [])
|
||||
)
|
||||
all_mismatches[key]['missing_in_igny8'].extend(
|
||||
mismatches[key].get('missing_in_igny8', [])
|
||||
)
|
||||
if 'mismatched' in mismatches[key]:
|
||||
all_mismatches[key]['mismatched'].extend(
|
||||
mismatches[key]['mismatched']
|
||||
)
|
||||
|
||||
return all_mismatches
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting mismatches: {e}", exc_info=True)
|
||||
return {
|
||||
'taxonomies': {'missing_in_wordpress': [], 'missing_in_igny8': [], 'mismatched': []},
|
||||
'products': {'missing_in_wordpress': [], 'missing_in_igny8': []},
|
||||
'posts': {'missing_in_wordpress': [], 'missing_in_igny8': []},
|
||||
'error': str(e)
|
||||
}
|
||||
|
||||
def get_sync_logs(
|
||||
self,
|
||||
site_id: int,
|
||||
integration_id: Optional[int] = None,
|
||||
limit: int = 100
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Get sync logs for a site or integration.
|
||||
|
||||
Args:
|
||||
site_id: Site ID
|
||||
integration_id: Optional integration ID
|
||||
limit: Maximum number of logs to return
|
||||
|
||||
Returns:
|
||||
List of log dictionaries
|
||||
"""
|
||||
try:
|
||||
integrations_query = SiteIntegration.objects.filter(
|
||||
site_id=site_id,
|
||||
is_active=True
|
||||
)
|
||||
|
||||
if integration_id:
|
||||
integrations_query = integrations_query.filter(id=integration_id)
|
||||
|
||||
logs = []
|
||||
for integration in integrations_query:
|
||||
# Use SiteIntegration fields as log entries
|
||||
if integration.last_sync_at:
|
||||
logs.append({
|
||||
'integration_id': integration.id,
|
||||
'platform': integration.platform,
|
||||
'timestamp': integration.last_sync_at.isoformat(),
|
||||
'status': integration.sync_status,
|
||||
'error': integration.sync_error,
|
||||
'duration': None, # Not tracked in current model
|
||||
'items_processed': None # Not tracked in current model
|
||||
})
|
||||
|
||||
# Sort by timestamp descending
|
||||
logs.sort(key=lambda x: x['timestamp'] or '', reverse=True)
|
||||
|
||||
return logs[:limit]
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting sync logs: {e}", exc_info=True)
|
||||
return []
|
||||
|
||||
def record_sync_run(
|
||||
self,
|
||||
integration_id: int,
|
||||
result: Dict[str, Any]
|
||||
) -> None:
|
||||
"""
|
||||
Record a sync run result.
|
||||
|
||||
Args:
|
||||
integration_id: Integration ID
|
||||
result: Sync result dict from ContentSyncService
|
||||
"""
|
||||
try:
|
||||
integration = SiteIntegration.objects.get(id=integration_id)
|
||||
|
||||
if result.get('success'):
|
||||
integration.sync_status = 'success'
|
||||
integration.last_sync_at = timezone.now()
|
||||
integration.sync_error = None
|
||||
else:
|
||||
integration.sync_status = 'failed'
|
||||
integration.sync_error = result.get('error', 'Unknown error')
|
||||
|
||||
integration.save(update_fields=['sync_status', 'last_sync_at', 'sync_error', 'updated_at'])
|
||||
|
||||
logger.info(
|
||||
f"[SyncHealthService] Recorded sync run for integration {integration_id}: "
|
||||
f"status={integration.sync_status}, synced_count={result.get('synced_count', 0)}"
|
||||
)
|
||||
except SiteIntegration.DoesNotExist:
|
||||
logger.warning(f"Integration {integration_id} not found for sync recording")
|
||||
except Exception as e:
|
||||
logger.error(f"Error recording sync run: {e}", exc_info=True)
|
||||
|
||||
def _count_mismatches(self, integration: SiteIntegration) -> int:
|
||||
"""
|
||||
Count total mismatches for an integration.
|
||||
|
||||
Args:
|
||||
integration: SiteIntegration instance
|
||||
|
||||
Returns:
|
||||
int: Total mismatch count
|
||||
"""
|
||||
try:
|
||||
if integration.platform != 'wordpress':
|
||||
return 0
|
||||
|
||||
mismatches = self._detect_wordpress_mismatches(integration)
|
||||
count = 0
|
||||
for category in mismatches.values():
|
||||
count += len(category.get('missing_in_wordpress', []))
|
||||
count += len(category.get('missing_in_igny8', []))
|
||||
count += len(category.get('mismatched', []))
|
||||
return count
|
||||
except Exception as e:
|
||||
logger.warning(f"Error counting mismatches: {e}")
|
||||
return 0
|
||||
|
||||
def _detect_wordpress_mismatches(
|
||||
self,
|
||||
integration: SiteIntegration
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Detect mismatches between IGNY8 and WordPress.
|
||||
|
||||
Args:
|
||||
integration: SiteIntegration instance
|
||||
|
||||
Returns:
|
||||
dict: Mismatch details
|
||||
"""
|
||||
mismatches = {
|
||||
'taxonomies': {
|
||||
'missing_in_wordpress': [],
|
||||
'missing_in_igny8': [],
|
||||
'mismatched': []
|
||||
},
|
||||
'products': {
|
||||
'missing_in_wordpress': [],
|
||||
'missing_in_igny8': []
|
||||
},
|
||||
'posts': {
|
||||
'missing_in_wordpress': [],
|
||||
'missing_in_igny8': []
|
||||
}
|
||||
}
|
||||
|
||||
try:
|
||||
from igny8_core.utils.wordpress import WordPressClient
|
||||
from igny8_core.business.site_building.models import SiteBlueprint, SiteBlueprintTaxonomy
|
||||
from igny8_core.business.content.models import Content
|
||||
|
||||
credentials = integration.get_credentials()
|
||||
client = WordPressClient(
|
||||
site_url=integration.config_json.get('site_url', ''),
|
||||
username=credentials.get('username'),
|
||||
app_password=credentials.get('app_password')
|
||||
)
|
||||
|
||||
# Get site blueprint
|
||||
blueprint = SiteBlueprint.objects.filter(
|
||||
account=integration.account,
|
||||
site=integration.site
|
||||
).first()
|
||||
|
||||
if not blueprint:
|
||||
return mismatches
|
||||
|
||||
# Check taxonomy mismatches
|
||||
# Get IGNY8 taxonomies
|
||||
igny8_taxonomies = SiteBlueprintTaxonomy.objects.filter(
|
||||
site_blueprint=blueprint
|
||||
)
|
||||
|
||||
# Get WordPress categories
|
||||
wp_categories = client.get_categories(per_page=100)
|
||||
wp_category_ids = {str(cat['id']): cat for cat in wp_categories}
|
||||
|
||||
# Get WordPress tags
|
||||
wp_tags = client.get_tags(per_page=100)
|
||||
wp_tag_ids = {str(tag['id']): tag for tag in wp_tags}
|
||||
|
||||
for taxonomy in igny8_taxonomies:
|
||||
if taxonomy.external_reference:
|
||||
# Check if still exists in WordPress
|
||||
if taxonomy.taxonomy_type in ['blog_category', 'product_category']:
|
||||
if taxonomy.external_reference not in wp_category_ids:
|
||||
mismatches['taxonomies']['missing_in_wordpress'].append({
|
||||
'id': taxonomy.id,
|
||||
'name': taxonomy.name,
|
||||
'type': taxonomy.taxonomy_type,
|
||||
'external_reference': taxonomy.external_reference
|
||||
})
|
||||
elif taxonomy.taxonomy_type in ['blog_tag', 'product_tag']:
|
||||
if taxonomy.external_reference not in wp_tag_ids:
|
||||
mismatches['taxonomies']['missing_in_wordpress'].append({
|
||||
'id': taxonomy.id,
|
||||
'name': taxonomy.name,
|
||||
'type': taxonomy.taxonomy_type,
|
||||
'external_reference': taxonomy.external_reference
|
||||
})
|
||||
else:
|
||||
# Taxonomy exists in IGNY8 but not synced to WordPress
|
||||
mismatches['taxonomies']['missing_in_wordpress'].append({
|
||||
'id': taxonomy.id,
|
||||
'name': taxonomy.name,
|
||||
'type': taxonomy.taxonomy_type
|
||||
})
|
||||
|
||||
# Check for WordPress taxonomies not in IGNY8
|
||||
for cat in wp_categories:
|
||||
if not SiteBlueprintTaxonomy.objects.filter(
|
||||
site_blueprint=blueprint,
|
||||
external_reference=str(cat['id'])
|
||||
).exists():
|
||||
mismatches['taxonomies']['missing_in_igny8'].append({
|
||||
'name': cat['name'],
|
||||
'slug': cat['slug'],
|
||||
'type': 'blog_category',
|
||||
'external_reference': str(cat['id'])
|
||||
})
|
||||
|
||||
for tag in wp_tags:
|
||||
if not SiteBlueprintTaxonomy.objects.filter(
|
||||
site_blueprint=blueprint,
|
||||
external_reference=str(tag['id'])
|
||||
).exists():
|
||||
mismatches['taxonomies']['missing_in_igny8'].append({
|
||||
'name': tag['name'],
|
||||
'slug': tag['slug'],
|
||||
'type': 'blog_tag',
|
||||
'external_reference': str(tag['id'])
|
||||
})
|
||||
|
||||
# Check content mismatches (basic check)
|
||||
igny8_content = Content.objects.filter(
|
||||
account=integration.account,
|
||||
site=integration.site,
|
||||
source='igny8',
|
||||
status='publish'
|
||||
)
|
||||
|
||||
for content in igny8_content[:50]: # Limit check
|
||||
if content.metadata and content.metadata.get('wordpress_id'):
|
||||
# Content should exist in WordPress (would need to check)
|
||||
# For now, just note if metadata exists
|
||||
pass
|
||||
else:
|
||||
# Content not synced to WordPress
|
||||
mismatches['posts']['missing_in_wordpress'].append({
|
||||
'id': content.id,
|
||||
'title': content.title,
|
||||
'type': content.content_type
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Error detecting WordPress mismatches: {e}")
|
||||
|
||||
return mismatches
|
||||
|
||||
@@ -26,6 +26,7 @@ class SyncService:
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Perform synchronization.
|
||||
Stage 4: Enhanced to record sync runs for health tracking.
|
||||
|
||||
Args:
|
||||
integration: SiteIntegration instance
|
||||
@@ -74,13 +75,23 @@ class SyncService:
|
||||
|
||||
total_synced = to_result.get('synced_count', 0) + from_result.get('synced_count', 0)
|
||||
|
||||
return {
|
||||
result = {
|
||||
'success': to_result.get('success') and from_result.get('success'),
|
||||
'synced_count': total_synced,
|
||||
'to_external': to_result,
|
||||
'from_external': from_result
|
||||
}
|
||||
|
||||
# Stage 4: Record sync run for health tracking
|
||||
try:
|
||||
from igny8_core.business.integration.services.sync_health_service import SyncHealthService
|
||||
sync_health_service = SyncHealthService()
|
||||
sync_health_service.record_sync_run(integration.id, result)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to record sync run: {e}")
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"[SyncService] Error syncing integration {integration.id}: {str(e)}",
|
||||
@@ -91,11 +102,21 @@ class SyncService:
|
||||
integration.sync_error = str(e)
|
||||
integration.save(update_fields=['sync_status', 'sync_error', 'updated_at'])
|
||||
|
||||
return {
|
||||
error_result = {
|
||||
'success': False,
|
||||
'error': str(e),
|
||||
'synced_count': 0
|
||||
}
|
||||
|
||||
# Stage 4: Record failed sync run
|
||||
try:
|
||||
from igny8_core.business.integration.services.sync_health_service import SyncHealthService
|
||||
sync_health_service = SyncHealthService()
|
||||
sync_health_service.record_sync_run(integration.id, error_result)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to record sync run: {e}")
|
||||
|
||||
return error_result
|
||||
|
||||
def _sync_to_external(
|
||||
self,
|
||||
|
||||
Reference in New Issue
Block a user