This commit is contained in:
alorig
2025-11-18 06:50:35 +05:00
parent ef16ad760f
commit 873f97ea3f
27 changed files with 1717 additions and 4 deletions

View File

@@ -0,0 +1,5 @@
"""
Integration Tests
Phase 6: Site Integration & Multi-Destination Publishing
"""

View File

@@ -0,0 +1,114 @@
"""
Tests for ContentSyncService
Phase 6: Site Integration & Multi-Destination Publishing
"""
from django.test import TestCase
from unittest.mock import patch, Mock
from igny8_core.auth.models import Account, Site, Sector
from igny8_core.business.integration.models import SiteIntegration
from igny8_core.business.integration.services.content_sync_service import ContentSyncService
from igny8_core.business.content.models import Content
class ContentSyncServiceTestCase(TestCase):
"""Test cases for ContentSyncService"""
def setUp(self):
"""Set up test data"""
self.account = Account.objects.create(name="Test Account")
self.site = Site.objects.create(
account=self.account,
name="Test Site",
slug="test-site"
)
self.sector = Sector.objects.create(
account=self.account,
site=self.site,
name="Test Sector",
slug="test-sector"
)
self.integration = SiteIntegration.objects.create(
account=self.account,
site=self.site,
platform='wordpress',
platform_type='cms',
sync_enabled=True
)
self.service = ContentSyncService()
def test_sync_content_from_wordpress_creates_content(self):
"""Test: WordPress sync works (when plugin connected)"""
mock_posts = [
{
'id': 1,
'title': 'Test Post',
'content': '<p>Test content</p>',
'status': 'publish',
}
]
with patch.object(self.service, '_fetch_wordpress_posts') as mock_fetch:
mock_fetch.return_value = mock_posts
result = self.service.sync_from_wordpress(self.integration)
self.assertTrue(result.get('success'))
self.assertEqual(result.get('synced_count'), 1)
# Verify content was created
content = Content.objects.filter(site=self.site).first()
self.assertIsNotNone(content)
self.assertEqual(content.title, 'Test Post')
self.assertEqual(content.source, 'wordpress')
def test_sync_content_from_shopify_creates_content(self):
"""Test: Content sync works"""
mock_products = [
{
'id': 1,
'title': 'Test Product',
'body_html': '<p>Product description</p>',
}
]
with patch.object(self.service, '_fetch_shopify_products') as mock_fetch:
mock_fetch.return_value = mock_products
result = self.service.sync_from_shopify(self.integration)
self.assertTrue(result.get('success'))
self.assertEqual(result.get('synced_count'), 1)
def test_sync_handles_duplicate_content(self):
"""Test: Content sync works"""
# Create existing content
Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Test Post",
html_content="<p>Existing</p>",
source='wordpress'
)
mock_posts = [
{
'id': 1,
'title': 'Test Post',
'content': '<p>Updated content</p>',
}
]
with patch.object(self.service, '_fetch_wordpress_posts') as mock_fetch:
mock_fetch.return_value = mock_posts
result = self.service.sync_from_wordpress(self.integration)
# Should update existing, not create duplicate
content_count = Content.objects.filter(
site=self.site,
title='Test Post'
).count()
self.assertEqual(content_count, 1)

View File

@@ -0,0 +1,99 @@
"""
Tests for IntegrationService
Phase 6: Site Integration & Multi-Destination Publishing
"""
from django.test import TestCase
from igny8_core.auth.models import Account, Site, Sector
from igny8_core.business.integration.models import SiteIntegration
from igny8_core.business.integration.services.integration_service import IntegrationService
class IntegrationServiceTestCase(TestCase):
"""Test cases for IntegrationService"""
def setUp(self):
"""Set up test data"""
self.account = Account.objects.create(name="Test Account")
self.site = Site.objects.create(
account=self.account,
name="Test Site",
slug="test-site"
)
self.sector = Sector.objects.create(
account=self.account,
site=self.site,
name="Test Sector",
slug="test-sector"
)
self.service = IntegrationService()
def test_create_integration_stores_config(self):
"""Test: Site integrations work correctly"""
integration = self.service.create_integration(
site=self.site,
platform='wordpress',
config={'url': 'https://example.com'},
credentials={'api_key': 'test-key'},
platform_type='cms'
)
self.assertIsNotNone(integration)
self.assertEqual(integration.platform, 'wordpress')
self.assertEqual(integration.platform_type, 'cms')
self.assertEqual(integration.config_json.get('url'), 'https://example.com')
self.assertTrue(integration.is_active)
def test_get_integrations_for_site_returns_all(self):
"""Test: Site integrations work correctly"""
self.service.create_integration(
site=self.site,
platform='wordpress',
config={},
credentials={}
)
self.service.create_integration(
site=self.site,
platform='shopify',
config={},
credentials={}
)
integrations = self.service.get_integrations_for_site(self.site)
self.assertEqual(integrations.count(), 2)
platforms = [i.platform for i in integrations]
self.assertIn('wordpress', platforms)
self.assertIn('shopify', platforms)
def test_test_connection_validates_credentials(self):
"""Test: Site integrations work correctly"""
integration = self.service.create_integration(
site=self.site,
platform='wordpress',
config={'url': 'https://example.com'},
credentials={'api_key': 'test-key'}
)
with self.assertRaises(NotImplementedError):
# Connection testing to be implemented per platform
self.service.test_connection(integration)
def test_update_integration_updates_fields(self):
"""Test: Site integrations work correctly"""
integration = self.service.create_integration(
site=self.site,
platform='wordpress',
config={'url': 'https://old.com'},
credentials={}
)
updated = self.service.update_integration(
integration,
config={'url': 'https://new.com'},
is_active=False
)
self.assertEqual(updated.config_json.get('url'), 'https://new.com')
self.assertFalse(updated.is_active)

View File

@@ -0,0 +1,86 @@
"""
Tests for SyncService
Phase 6: Site Integration & Multi-Destination Publishing
"""
from django.test import TestCase
from django.utils import timezone
from unittest.mock import patch, Mock
from igny8_core.auth.models import Account, Site, Sector
from igny8_core.business.integration.models import SiteIntegration
from igny8_core.business.integration.services.sync_service import SyncService
class SyncServiceTestCase(TestCase):
"""Test cases for SyncService"""
def setUp(self):
"""Set up test data"""
self.account = Account.objects.create(name="Test Account")
self.site = Site.objects.create(
account=self.account,
name="Test Site",
slug="test-site"
)
self.sector = Sector.objects.create(
account=self.account,
site=self.site,
name="Test Sector",
slug="test-sector"
)
self.integration = SiteIntegration.objects.create(
account=self.account,
site=self.site,
platform='wordpress',
platform_type='cms',
sync_enabled=True,
sync_status='pending'
)
self.service = SyncService()
def test_sync_updates_status(self):
"""Test: Two-way sync functions properly"""
with patch.object(self.service, '_sync_to_external') as mock_sync_to, \
patch.object(self.service, '_sync_from_external') as mock_sync_from:
mock_sync_to.return_value = {'success': True, 'synced': 5}
mock_sync_from.return_value = {'success': True, 'synced': 3}
result = self.service.sync(self.integration, direction='both')
self.assertTrue(result.get('success'))
self.integration.refresh_from_db()
self.assertEqual(self.integration.sync_status, 'success')
self.assertIsNotNone(self.integration.last_sync_at)
def test_sync_to_external_only(self):
"""Test: Two-way sync functions properly"""
with patch.object(self.service, '_sync_to_external') as mock_sync_to:
mock_sync_to.return_value = {'success': True, 'synced': 5}
result = self.service.sync(self.integration, direction='to_external')
self.assertTrue(result.get('success'))
mock_sync_to.assert_called_once()
def test_sync_from_external_only(self):
"""Test: WordPress sync works (when plugin connected)"""
with patch.object(self.service, '_sync_from_external') as mock_sync_from:
mock_sync_from.return_value = {'success': True, 'synced': 3}
result = self.service.sync(self.integration, direction='from_external')
self.assertTrue(result.get('success'))
mock_sync_from.assert_called_once()
def test_sync_handles_errors(self):
"""Test: Two-way sync functions properly"""
with patch.object(self.service, '_sync_to_external') as mock_sync_to:
mock_sync_to.side_effect = Exception("Sync failed")
result = self.service.sync(self.integration, direction='to_external')
self.assertFalse(result.get('success'))
self.integration.refresh_from_db()
self.assertEqual(self.integration.sync_status, 'failed')
self.assertIsNotNone(self.integration.sync_error)