This commit is contained in:
alorig
2025-11-18 05:21:27 +05:00
parent a0f3e3a778
commit 9a6d47b91b
34 changed files with 3258 additions and 9 deletions

View File

@@ -0,0 +1,4 @@
"""
Integration Services
"""

View File

@@ -0,0 +1,191 @@
"""
Content Sync Service
Phase 6: Site Integration & Multi-Destination Publishing
Syncs content between IGNY8 and external platforms.
"""
import logging
from typing import Dict, Any, Optional, List
from igny8_core.business.integration.models import SiteIntegration
logger = logging.getLogger(__name__)
class ContentSyncService:
"""
Service for syncing content to/from external platforms.
"""
def sync_to_external(
self,
integration: SiteIntegration,
content_types: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Sync content from IGNY8 to external platform.
Args:
integration: SiteIntegration instance
content_types: List of content types to sync (optional)
Returns:
dict: Sync result
"""
try:
if integration.platform == 'wordpress':
return self._sync_to_wordpress(integration, content_types)
elif integration.platform == 'shopify':
return self._sync_to_shopify(integration, content_types)
else:
return {
'success': False,
'error': f'Sync to {integration.platform} not implemented',
'synced_count': 0
}
except Exception as e:
logger.error(
f"[ContentSyncService] Error syncing to {integration.platform}: {str(e)}",
exc_info=True
)
return {
'success': False,
'error': str(e),
'synced_count': 0
}
def sync_from_external(
self,
integration: SiteIntegration,
content_types: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Sync content from external platform to IGNY8.
Args:
integration: SiteIntegration instance
content_types: List of content types to sync (optional)
Returns:
dict: Sync result
"""
try:
if integration.platform == 'wordpress':
return self._sync_from_wordpress(integration, content_types)
elif integration.platform == 'shopify':
return self._sync_from_shopify(integration, content_types)
else:
return {
'success': False,
'error': f'Sync from {integration.platform} not implemented',
'synced_count': 0
}
except Exception as e:
logger.error(
f"[ContentSyncService] Error syncing from {integration.platform}: {str(e)}",
exc_info=True
)
return {
'success': False,
'error': str(e),
'synced_count': 0
}
def _sync_to_wordpress(
self,
integration: SiteIntegration,
content_types: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Sync content from IGNY8 to WordPress.
Args:
integration: SiteIntegration instance
content_types: List of content types to sync
Returns:
dict: Sync result
"""
# TODO: Implement WordPress sync
# This will use the WordPress adapter to publish content
logger.info(f"[ContentSyncService] Syncing to WordPress for integration {integration.id}")
return {
'success': True,
'synced_count': 0,
'message': 'WordPress sync to external not yet fully implemented'
}
def _sync_from_wordpress(
self,
integration: SiteIntegration,
content_types: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Sync content from WordPress to IGNY8.
Args:
integration: SiteIntegration instance
content_types: List of content types to sync
Returns:
dict: Sync result
"""
# TODO: Implement WordPress import
# This will fetch posts/pages from WordPress and create Content records
logger.info(f"[ContentSyncService] Syncing from WordPress for integration {integration.id}")
return {
'success': True,
'synced_count': 0,
'message': 'WordPress sync from external not yet fully implemented'
}
def _sync_to_shopify(
self,
integration: SiteIntegration,
content_types: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Sync content from IGNY8 to Shopify.
Args:
integration: SiteIntegration instance
content_types: List of content types to sync
Returns:
dict: Sync result
"""
# TODO: Implement Shopify sync
logger.info(f"[ContentSyncService] Syncing to Shopify for integration {integration.id}")
return {
'success': True,
'synced_count': 0,
'message': 'Shopify sync not yet implemented'
}
def _sync_from_shopify(
self,
integration: SiteIntegration,
content_types: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Sync content from Shopify to IGNY8.
Args:
integration: SiteIntegration instance
content_types: List of content types to sync
Returns:
dict: Sync result
"""
# TODO: Implement Shopify import
logger.info(f"[ContentSyncService] Syncing from Shopify for integration {integration.id}")
return {
'success': True,
'synced_count': 0,
'message': 'Shopify sync not yet implemented'
}

View File

@@ -0,0 +1,245 @@
"""
Integration Service
Phase 6: Site Integration & Multi-Destination Publishing
Manages site integrations (WordPress, Shopify, etc.).
"""
import logging
from typing import Optional, Dict, Any
from igny8_core.business.integration.models import SiteIntegration
from igny8_core.auth.models import Site
logger = logging.getLogger(__name__)
class IntegrationService:
"""
Service for managing site integrations.
"""
def create_integration(
self,
site: Site,
platform: str,
config: Dict[str, Any],
credentials: Dict[str, Any],
platform_type: str = 'cms'
) -> SiteIntegration:
"""
Create a new site integration.
Args:
site: Site instance
platform: Platform name ('wordpress', 'shopify', 'custom')
config: Platform-specific configuration
credentials: Platform credentials (will be encrypted)
platform_type: Platform type ('cms', 'ecommerce', 'custom_api')
Returns:
SiteIntegration instance
"""
integration = SiteIntegration.objects.create(
account=site.account,
site=site,
platform=platform,
platform_type=platform_type,
config_json=config,
credentials_json=credentials,
is_active=True
)
logger.info(
f"[IntegrationService] Created integration {integration.id} for site {site.id}, platform {platform}"
)
return integration
def update_integration(
self,
integration: SiteIntegration,
config: Optional[Dict[str, Any]] = None,
credentials: Optional[Dict[str, Any]] = None,
is_active: Optional[bool] = None
) -> SiteIntegration:
"""
Update an existing integration.
Args:
integration: SiteIntegration instance
config: Updated configuration (optional)
credentials: Updated credentials (optional)
is_active: Active status (optional)
Returns:
Updated SiteIntegration instance
"""
if config is not None:
integration.config_json = config
if credentials is not None:
integration.set_credentials(credentials)
if is_active is not None:
integration.is_active = is_active
integration.save()
logger.info(f"[IntegrationService] Updated integration {integration.id}")
return integration
def delete_integration(self, integration: SiteIntegration):
"""
Delete an integration.
Args:
integration: SiteIntegration instance
"""
integration_id = integration.id
integration.delete()
logger.info(f"[IntegrationService] Deleted integration {integration_id}")
def get_integration(
self,
site: Site,
platform: str
) -> Optional[SiteIntegration]:
"""
Get integration for a site and platform.
Args:
site: Site instance
platform: Platform name
Returns:
SiteIntegration or None
"""
return SiteIntegration.objects.filter(
site=site,
platform=platform,
is_active=True
).first()
def list_integrations(
self,
site: Site,
active_only: bool = True
) -> list:
"""
List all integrations for a site.
Args:
site: Site instance
active_only: Only return active integrations
Returns:
List of SiteIntegration instances
"""
queryset = SiteIntegration.objects.filter(site=site)
if active_only:
queryset = queryset.filter(is_active=True)
return list(queryset.order_by('-created_at'))
def test_connection(
self,
integration: SiteIntegration
) -> Dict[str, Any]:
"""
Test connection to the integrated platform.
Args:
integration: SiteIntegration instance
Returns:
dict: {
'success': bool,
'message': str,
'details': dict
}
"""
try:
if integration.platform == 'wordpress':
return self._test_wordpress_connection(integration)
elif integration.platform == 'shopify':
return self._test_shopify_connection(integration)
else:
return {
'success': False,
'message': f'Connection testing not implemented for platform: {integration.platform}',
'details': {}
}
except Exception as e:
logger.error(
f"[IntegrationService] Error testing connection for integration {integration.id}: {str(e)}",
exc_info=True
)
return {
'success': False,
'message': str(e),
'details': {}
}
def _test_wordpress_connection(
self,
integration: SiteIntegration
) -> Dict[str, Any]:
"""
Test WordPress connection.
Args:
integration: SiteIntegration instance
Returns:
dict: Connection test result
"""
from igny8_core.utils.wordpress import WordPressClient
config = integration.config_json
credentials = integration.get_credentials()
site_url = config.get('site_url')
username = credentials.get('username')
app_password = credentials.get('app_password')
if not site_url:
return {
'success': False,
'message': 'WordPress site URL not configured',
'details': {}
}
try:
client = WordPressClient(site_url, username, app_password)
result = client.test_connection()
return result
except Exception as e:
return {
'success': False,
'message': f'WordPress connection failed: {str(e)}',
'details': {}
}
def _test_shopify_connection(
self,
integration: SiteIntegration
) -> Dict[str, Any]:
"""
Test Shopify connection.
Args:
integration: SiteIntegration instance
Returns:
dict: Connection test result
"""
# TODO: Implement Shopify connection testing
return {
'success': False,
'message': 'Shopify connection testing not yet implemented',
'details': {}
}

View File

@@ -0,0 +1,161 @@
"""
Sync Service
Phase 6: Site Integration & Multi-Destination Publishing
Handles two-way synchronization between IGNY8 and external platforms.
"""
import logging
from typing import Dict, Any, Optional
from datetime import datetime
from igny8_core.business.integration.models import SiteIntegration
logger = logging.getLogger(__name__)
class SyncService:
"""
Service for handling two-way sync between IGNY8 and external platforms.
"""
def sync(
self,
integration: SiteIntegration,
direction: str = 'both',
content_types: Optional[list] = None
) -> Dict[str, Any]:
"""
Perform synchronization.
Args:
integration: SiteIntegration instance
direction: 'both', 'to_external', 'from_external'
content_types: List of content types to sync (optional, syncs all if None)
Returns:
dict: Sync result
"""
if not integration.sync_enabled:
return {
'success': False,
'message': 'Sync is not enabled for this integration',
'synced_count': 0
}
# Update sync status
integration.sync_status = 'syncing'
integration.save(update_fields=['sync_status', 'updated_at'])
try:
if direction in ('both', 'to_external'):
# Sync from IGNY8 to external platform
to_result = self._sync_to_external(integration, content_types)
else:
to_result = {'success': True, 'synced_count': 0}
if direction in ('both', 'from_external'):
# Sync from external platform to IGNY8
from_result = self._sync_from_external(integration, content_types)
else:
from_result = {'success': True, 'synced_count': 0}
# Update sync status
if to_result.get('success') and from_result.get('success'):
integration.sync_status = 'success'
integration.sync_error = None
else:
integration.sync_status = 'failed'
integration.sync_error = (
to_result.get('error', '') + ' ' + from_result.get('error', '')
).strip()
integration.last_sync_at = datetime.now()
integration.save(update_fields=['sync_status', 'sync_error', 'last_sync_at', 'updated_at'])
total_synced = to_result.get('synced_count', 0) + from_result.get('synced_count', 0)
return {
'success': to_result.get('success') and from_result.get('success'),
'synced_count': total_synced,
'to_external': to_result,
'from_external': from_result
}
except Exception as e:
logger.error(
f"[SyncService] Error syncing integration {integration.id}: {str(e)}",
exc_info=True
)
integration.sync_status = 'failed'
integration.sync_error = str(e)
integration.save(update_fields=['sync_status', 'sync_error', 'updated_at'])
return {
'success': False,
'error': str(e),
'synced_count': 0
}
def _sync_to_external(
self,
integration: SiteIntegration,
content_types: Optional[list] = None
) -> Dict[str, Any]:
"""
Sync content from IGNY8 to external platform.
Args:
integration: SiteIntegration instance
content_types: List of content types to sync
Returns:
dict: Sync result
"""
# This will be implemented by ContentSyncService
from igny8_core.business.integration.services.content_sync_service import ContentSyncService
sync_service = ContentSyncService()
return sync_service.sync_to_external(integration, content_types)
def _sync_from_external(
self,
integration: SiteIntegration,
content_types: Optional[list] = None
) -> Dict[str, Any]:
"""
Sync content from external platform to IGNY8.
Args:
integration: SiteIntegration instance
content_types: List of content types to sync
Returns:
dict: Sync result
"""
# This will be implemented by ContentSyncService
from igny8_core.business.integration.services.content_sync_service import ContentSyncService
sync_service = ContentSyncService()
return sync_service.sync_from_external(integration, content_types)
def get_sync_status(
self,
integration: SiteIntegration
) -> Dict[str, Any]:
"""
Get current sync status for an integration.
Args:
integration: SiteIntegration instance
Returns:
dict: Sync status information
"""
return {
'sync_enabled': integration.sync_enabled,
'sync_status': integration.sync_status,
'last_sync_at': integration.last_sync_at.isoformat() if integration.last_sync_at else None,
'sync_error': integration.sync_error
}