stage 4-1

This commit is contained in:
alorig
2025-11-20 03:30:39 +05:00
parent 6c05adc990
commit ec3ca2da5d
7 changed files with 1304 additions and 84 deletions

View File

@@ -1,6 +1,7 @@
"""
Content Sync Service
Phase 6: Site Integration & Multi-Destination Publishing
Stage 4: Enhanced with taxonomy and product sync
Syncs content between IGNY8 and external platforms.
"""
@@ -8,6 +9,7 @@ import logging
from typing import Dict, Any, Optional, List
from igny8_core.business.integration.models import SiteIntegration
from igny8_core.utils.wordpress import WordPressClient
logger = logging.getLogger(__name__)
@@ -98,6 +100,7 @@ class ContentSyncService:
) -> Dict[str, Any]:
"""
Sync content from IGNY8 to WordPress.
Stage 4: Enhanced to sync taxonomies before content.
Args:
integration: SiteIntegration instance
@@ -106,15 +109,67 @@ class ContentSyncService:
Returns:
dict: Sync result
"""
# TODO: Implement WordPress sync
# This will use the WordPress adapter to publish content
logger.info(f"[ContentSyncService] Syncing to WordPress for integration {integration.id}")
return {
'success': True,
'synced_count': 0,
'message': 'WordPress sync to external not yet fully implemented'
}
try:
# Get WordPress client
credentials = integration.get_credentials()
client = WordPressClient(
site_url=integration.config_json.get('site_url', ''),
username=credentials.get('username'),
app_password=credentials.get('app_password')
)
# Stage 4: Sync taxonomies first
taxonomy_result = self._sync_taxonomies_to_wordpress(integration, client)
# Sync content (posts/products)
from igny8_core.business.content.models import Content
from igny8_core.business.publishing.services.adapters.wordpress_adapter import WordPressAdapter
content_query = Content.objects.filter(
account=integration.account,
site=integration.site,
source='igny8',
status='publish'
)
if content_types:
content_query = content_query.filter(content_type__in=content_types)
synced_count = 0
adapter = WordPressAdapter()
destination_config = {
'site_url': integration.config_json.get('site_url', ''),
'username': credentials.get('username'),
'app_password': credentials.get('app_password'),
'status': 'publish'
}
for content in content_query[:100]: # Limit to 100 per sync
result = adapter.publish(content, destination_config)
if result.get('success'):
synced_count += 1
# Store external reference
if not content.metadata:
content.metadata = {}
content.metadata['wordpress_id'] = result.get('external_id')
content.save(update_fields=['metadata'])
return {
'success': True,
'synced_count': synced_count,
'taxonomies_synced': taxonomy_result.get('synced_count', 0),
'message': f'Synced {synced_count} content items and {taxonomy_result.get("synced_count", 0)} taxonomies'
}
except Exception as e:
logger.error(
f"[ContentSyncService] Error syncing to WordPress: {str(e)}",
exc_info=True
)
return {
'success': False,
'error': str(e),
'synced_count': 0
}
def sync_from_wordpress(
self,
@@ -183,6 +238,7 @@ class ContentSyncService:
) -> Dict[str, Any]:
"""
Internal method for syncing from WordPress (used by sync_from_external).
Stage 4: Enhanced to sync taxonomies and products.
Args:
integration: SiteIntegration instance
@@ -191,7 +247,46 @@ class ContentSyncService:
Returns:
dict: Sync result
"""
return self.sync_from_wordpress(integration)
try:
# Get WordPress client
credentials = integration.get_credentials()
client = WordPressClient(
site_url=integration.config_json.get('site_url', ''),
username=credentials.get('username'),
app_password=credentials.get('app_password')
)
# Stage 4: Sync taxonomies first
taxonomy_result = self._sync_taxonomies_from_wordpress(integration, client)
# Sync posts
posts_result = self.sync_from_wordpress(integration)
# Sync WooCommerce products if available
products_result = self._sync_products_from_wordpress(integration, client)
total_synced = (
posts_result.get('synced_count', 0) +
products_result.get('synced_count', 0)
)
return {
'success': True,
'synced_count': total_synced,
'taxonomies_synced': taxonomy_result.get('synced_count', 0),
'posts_synced': posts_result.get('synced_count', 0),
'products_synced': products_result.get('synced_count', 0),
}
except Exception as e:
logger.error(
f"[ContentSyncService] Error syncing from WordPress: {str(e)}",
exc_info=True
)
return {
'success': False,
'error': str(e),
'synced_count': 0
}
def _fetch_wordpress_posts(
self,
@@ -206,10 +301,299 @@ class ContentSyncService:
Returns:
List of post dictionaries
"""
# TODO: Implement actual WordPress API call
# For now, return empty list - tests will mock this
logger.info(f"[ContentSyncService] Fetching WordPress posts for integration {integration.id}")
return []
try:
credentials = integration.get_credentials()
client = WordPressClient(
site_url=integration.config_json.get('site_url', ''),
username=credentials.get('username'),
app_password=credentials.get('app_password')
)
# Fetch posts via WordPress REST API
import requests
response = client.session.get(
f"{client.api_base}/posts",
params={'per_page': 100, 'status': 'publish'}
)
if response.status_code == 200:
posts = response.json()
return [
{
'id': post.get('id'),
'title': post.get('title', {}).get('rendered', ''),
'content': post.get('content', {}).get('rendered', ''),
'status': post.get('status', 'publish'),
'categories': post.get('categories', []),
'tags': post.get('tags', [])
}
for post in posts
]
return []
except Exception as e:
logger.error(f"Error fetching WordPress posts: {e}")
return []
# Stage 4: Taxonomy Sync Methods
def _sync_taxonomies_from_wordpress(
self,
integration: SiteIntegration,
client: WordPressClient
) -> Dict[str, Any]:
"""
Sync taxonomies from WordPress to IGNY8.
Args:
integration: SiteIntegration instance
client: WordPressClient instance
Returns:
dict: Sync result with synced_count
"""
try:
from igny8_core.business.site_building.models import SiteBlueprint
from igny8_core.business.site_building.services.taxonomy_service import TaxonomyService
# Get or create site blueprint for this site
blueprint = SiteBlueprint.objects.filter(
account=integration.account,
site=integration.site
).first()
if not blueprint:
logger.warning(f"No blueprint found for site {integration.site.id}, skipping taxonomy sync")
return {'success': True, 'synced_count': 0}
taxonomy_service = TaxonomyService()
synced_count = 0
# Sync WordPress categories
categories = client.get_categories(per_page=100)
category_records = [
{
'name': cat['name'],
'slug': cat['slug'],
'description': cat.get('description', ''),
'taxonomy_type': 'blog_category',
'external_reference': str(cat['id']),
'metadata': {'parent': cat.get('parent', 0)}
}
for cat in categories
]
if category_records:
taxonomy_service.import_from_external(
blueprint,
category_records,
default_type='blog_category'
)
synced_count += len(category_records)
# Sync WordPress tags
tags = client.get_tags(per_page=100)
tag_records = [
{
'name': tag['name'],
'slug': tag['slug'],
'description': tag.get('description', ''),
'taxonomy_type': 'blog_tag',
'external_reference': str(tag['id'])
}
for tag in tags
]
if tag_records:
taxonomy_service.import_from_external(
blueprint,
tag_records,
default_type='blog_tag'
)
synced_count += len(tag_records)
# Sync WooCommerce product categories if available
try:
product_categories = client.get_product_categories(per_page=100)
product_category_records = [
{
'name': cat['name'],
'slug': cat['slug'],
'description': cat.get('description', ''),
'taxonomy_type': 'product_category',
'external_reference': f"wc_cat_{cat['id']}",
'metadata': {'parent': cat.get('parent', 0)}
}
for cat in product_categories
]
if product_category_records:
taxonomy_service.import_from_external(
blueprint,
product_category_records,
default_type='product_category'
)
synced_count += len(product_category_records)
except Exception as e:
logger.warning(f"WooCommerce not available or error fetching product categories: {e}")
return {
'success': True,
'synced_count': synced_count
}
except Exception as e:
logger.error(f"Error syncing taxonomies from WordPress: {e}", exc_info=True)
return {
'success': False,
'error': str(e),
'synced_count': 0
}
def _sync_taxonomies_to_wordpress(
self,
integration: SiteIntegration,
client: WordPressClient
) -> Dict[str, Any]:
"""
Ensure taxonomies exist in WordPress before publishing content.
Args:
integration: SiteIntegration instance
client: WordPressClient instance
Returns:
dict: Sync result with synced_count
"""
try:
from igny8_core.business.site_building.models import SiteBlueprint, SiteBlueprintTaxonomy
# Get site blueprint
blueprint = SiteBlueprint.objects.filter(
account=integration.account,
site=integration.site
).first()
if not blueprint:
return {'success': True, 'synced_count': 0}
synced_count = 0
# Get taxonomies that don't have external_reference (not yet synced)
taxonomies = SiteBlueprintTaxonomy.objects.filter(
site_blueprint=blueprint,
external_reference__isnull=True
)
for taxonomy in taxonomies:
try:
if taxonomy.taxonomy_type in ['blog_category', 'product_category']:
result = client.create_category(
name=taxonomy.name,
slug=taxonomy.slug,
description=taxonomy.description
)
if result.get('success'):
taxonomy.external_reference = str(result.get('category_id'))
taxonomy.save(update_fields=['external_reference'])
synced_count += 1
elif taxonomy.taxonomy_type in ['blog_tag', 'product_tag']:
result = client.create_tag(
name=taxonomy.name,
slug=taxonomy.slug,
description=taxonomy.description
)
if result.get('success'):
taxonomy.external_reference = str(result.get('tag_id'))
taxonomy.save(update_fields=['external_reference'])
synced_count += 1
except Exception as e:
logger.warning(f"Error syncing taxonomy {taxonomy.id} to WordPress: {e}")
continue
return {
'success': True,
'synced_count': synced_count
}
except Exception as e:
logger.error(f"Error syncing taxonomies to WordPress: {e}", exc_info=True)
return {
'success': False,
'error': str(e),
'synced_count': 0
}
def _sync_products_from_wordpress(
self,
integration: SiteIntegration,
client: WordPressClient
) -> Dict[str, Any]:
"""
Sync WooCommerce products from WordPress to IGNY8.
Args:
integration: SiteIntegration instance
client: WordPressClient instance
Returns:
dict: Sync result with synced_count
"""
try:
products = client.get_products(per_page=100)
synced_count = 0
from igny8_core.business.content.models import Content
for product in products:
content, created = Content.objects.get_or_create(
account=integration.account,
site=integration.site,
sector=integration.site.sectors.first() if hasattr(integration.site, 'sectors') else None,
title=product.get('name', ''),
source='wordpress',
defaults={
'html_content': product.get('description', ''),
'content_type': 'product',
'status': 'published' if product.get('status') == 'publish' else 'draft',
'metadata': {
'wordpress_id': product.get('id'),
'product_type': product.get('type'),
'sku': product.get('sku'),
'price': product.get('price'),
'regular_price': product.get('regular_price'),
'sale_price': product.get('sale_price'),
'categories': product.get('categories', []),
'tags': product.get('tags', []),
'attributes': product.get('attributes', [])
}
}
)
if not created:
content.html_content = product.get('description', '')
if not content.metadata:
content.metadata = {}
content.metadata.update({
'wordpress_id': product.get('id'),
'product_type': product.get('type'),
'sku': product.get('sku'),
'price': product.get('price'),
'regular_price': product.get('regular_price'),
'sale_price': product.get('sale_price'),
'categories': product.get('categories', []),
'tags': product.get('tags', []),
'attributes': product.get('attributes', [])
})
content.save()
synced_count += 1
return {
'success': True,
'synced_count': synced_count
}
except Exception as e:
logger.error(f"Error syncing products from WordPress: {e}", exc_info=True)
return {
'success': False,
'error': str(e),
'synced_count': 0
}
def _sync_to_shopify(
self,

View File

@@ -1,10 +1,11 @@
"""
WordPress Integration Service
Handles communication with WordPress sites via REST API
Stage 4: Enhanced with taxonomy and WooCommerce product support
"""
import logging
import requests
from typing import Dict, Any, Optional
from typing import Dict, Any, Optional, List
from django.conf import settings
logger = logging.getLogger(__name__)
@@ -26,6 +27,7 @@ class WordPressClient:
"""
self.site_url = site_url.rstrip('/')
self.api_base = f"{self.site_url}/wp-json/wp/v2"
self.woocommerce_api_base = f"{self.site_url}/wp-json/wc/v3" # WooCommerce API
self.igny8_api_base = f"{self.site_url}/wp-json/igny8/v1" # Custom IGNY8 endpoints
self.username = username
self.app_password = app_password
@@ -215,4 +217,377 @@ class WordPressClient:
'message': None,
'error': str(e),
}
# Stage 4: Taxonomy Methods
def get_categories(self, per_page: int = 100, page: int = 1) -> List[Dict[str, Any]]:
"""
Get WordPress categories.
Args:
per_page: Number of categories per page
page: Page number
Returns:
List of category dictionaries
"""
try:
response = self.session.get(
f"{self.api_base}/categories",
params={'per_page': per_page, 'page': page}
)
if response.status_code == 200:
categories = response.json()
return [
{
'id': cat.get('id'),
'name': cat.get('name', ''),
'slug': cat.get('slug', ''),
'description': cat.get('description', ''),
'count': cat.get('count', 0),
'parent': cat.get('parent', 0),
'link': cat.get('link', '')
}
for cat in categories
]
logger.warning(f"Failed to fetch categories: HTTP {response.status_code}")
return []
except Exception as e:
logger.error(f"Error fetching WordPress categories: {e}")
return []
def create_category(
self,
name: str,
slug: Optional[str] = None,
description: Optional[str] = None,
parent_id: Optional[int] = None
) -> Dict[str, Any]:
"""
Create a WordPress category.
Args:
name: Category name
slug: Category slug (auto-generated if not provided)
description: Category description
parent_id: Parent category ID
Returns:
Dict with 'success', 'category_id', 'error'
"""
try:
category_data = {'name': name}
if slug:
category_data['slug'] = slug
if description:
category_data['description'] = description
if parent_id:
category_data['parent'] = parent_id
response = self.session.post(
f"{self.api_base}/categories",
json=category_data
)
if response.status_code in [200, 201]:
data = response.json()
return {
'success': True,
'category_id': data.get('id'),
'slug': data.get('slug'),
'error': None,
}
return {
'success': False,
'category_id': None,
'slug': None,
'error': f"HTTP {response.status_code}: {response.text}",
}
except Exception as e:
logger.error(f"WordPress category creation failed: {e}")
return {
'success': False,
'category_id': None,
'slug': None,
'error': str(e),
}
def get_tags(self, per_page: int = 100, page: int = 1) -> List[Dict[str, Any]]:
"""
Get WordPress tags.
Args:
per_page: Number of tags per page
page: Page number
Returns:
List of tag dictionaries
"""
try:
response = self.session.get(
f"{self.api_base}/tags",
params={'per_page': per_page, 'page': page}
)
if response.status_code == 200:
tags = response.json()
return [
{
'id': tag.get('id'),
'name': tag.get('name', ''),
'slug': tag.get('slug', ''),
'description': tag.get('description', ''),
'count': tag.get('count', 0),
'link': tag.get('link', '')
}
for tag in tags
]
logger.warning(f"Failed to fetch tags: HTTP {response.status_code}")
return []
except Exception as e:
logger.error(f"Error fetching WordPress tags: {e}")
return []
def create_tag(
self,
name: str,
slug: Optional[str] = None,
description: Optional[str] = None
) -> Dict[str, Any]:
"""
Create a WordPress tag.
Args:
name: Tag name
slug: Tag slug (auto-generated if not provided)
description: Tag description
Returns:
Dict with 'success', 'tag_id', 'error'
"""
try:
tag_data = {'name': name}
if slug:
tag_data['slug'] = slug
if description:
tag_data['description'] = description
response = self.session.post(
f"{self.api_base}/tags",
json=tag_data
)
if response.status_code in [200, 201]:
data = response.json()
return {
'success': True,
'tag_id': data.get('id'),
'slug': data.get('slug'),
'error': None,
}
return {
'success': False,
'tag_id': None,
'slug': None,
'error': f"HTTP {response.status_code}: {response.text}",
}
except Exception as e:
logger.error(f"WordPress tag creation failed: {e}")
return {
'success': False,
'tag_id': None,
'slug': None,
'error': str(e),
}
# Stage 4: WooCommerce Product Methods
def get_products(
self,
per_page: int = 100,
page: int = 1,
status: str = 'publish'
) -> List[Dict[str, Any]]:
"""
Get WooCommerce products.
Args:
per_page: Number of products per page
page: Page number
status: Product status ('publish', 'draft', etc.)
Returns:
List of product dictionaries
"""
try:
response = self.session.get(
f"{self.woocommerce_api_base}/products",
params={
'per_page': per_page,
'page': page,
'status': status
}
)
if response.status_code == 200:
products = response.json()
return [
{
'id': prod.get('id'),
'name': prod.get('name', ''),
'slug': prod.get('slug', ''),
'sku': prod.get('sku', ''),
'type': prod.get('type', 'simple'),
'status': prod.get('status', 'publish'),
'description': prod.get('description', ''),
'short_description': prod.get('short_description', ''),
'price': prod.get('price', ''),
'regular_price': prod.get('regular_price', ''),
'sale_price': prod.get('sale_price', ''),
'on_sale': prod.get('on_sale', False),
'stock_status': prod.get('stock_status', 'instock'),
'stock_quantity': prod.get('stock_quantity'),
'categories': prod.get('categories', []),
'tags': prod.get('tags', []),
'images': prod.get('images', []),
'attributes': prod.get('attributes', []),
'variations': prod.get('variations', []),
'permalink': prod.get('permalink', '')
}
for prod in products
]
logger.warning(f"Failed to fetch products: HTTP {response.status_code}")
return []
except Exception as e:
logger.error(f"Error fetching WooCommerce products: {e}")
return []
def create_product(self, product_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Create a WooCommerce product.
Args:
product_data: Product data dictionary
Returns:
Dict with 'success', 'product_id', 'error'
"""
try:
response = self.session.post(
f"{self.woocommerce_api_base}/products",
json=product_data
)
if response.status_code in [200, 201]:
data = response.json()
return {
'success': True,
'product_id': data.get('id'),
'slug': data.get('slug'),
'error': None,
}
return {
'success': False,
'product_id': None,
'slug': None,
'error': f"HTTP {response.status_code}: {response.text}",
}
except Exception as e:
logger.error(f"WooCommerce product creation failed: {e}")
return {
'success': False,
'product_id': None,
'slug': None,
'error': str(e),
}
def get_product_categories(self, per_page: int = 100, page: int = 1) -> List[Dict[str, Any]]:
"""
Get WooCommerce product categories.
Args:
per_page: Number of categories per page
page: Page number
Returns:
List of product category dictionaries
"""
try:
response = self.session.get(
f"{self.woocommerce_api_base}/products/categories",
params={'per_page': per_page, 'page': page}
)
if response.status_code == 200:
categories = response.json()
return [
{
'id': cat.get('id'),
'name': cat.get('name', ''),
'slug': cat.get('slug', ''),
'description': cat.get('description', ''),
'count': cat.get('count', 0),
'parent': cat.get('parent', 0),
'image': cat.get('image', {}).get('src') if cat.get('image') else None
}
for cat in categories
]
logger.warning(f"Failed to fetch product categories: HTTP {response.status_code}")
return []
except Exception as e:
logger.error(f"Error fetching WooCommerce product categories: {e}")
return []
def get_product_attributes(self) -> List[Dict[str, Any]]:
"""
Get WooCommerce product attributes.
Returns:
List of product attribute dictionaries with terms
"""
try:
# Get attributes
response = self.session.get(f"{self.woocommerce_api_base}/products/attributes")
if response.status_code != 200:
logger.warning(f"Failed to fetch product attributes: HTTP {response.status_code}")
return []
attributes = response.json()
result = []
for attr in attributes:
attr_id = attr.get('id')
# Get terms for this attribute
terms_response = self.session.get(
f"{self.woocommerce_api_base}/products/attributes/{attr_id}/terms"
)
terms = []
if terms_response.status_code == 200:
terms_data = terms_response.json()
terms = [
{
'id': term.get('id'),
'name': term.get('name', ''),
'slug': term.get('slug', '')
}
for term in terms_data
]
result.append({
'id': attr_id,
'name': attr.get('name', ''),
'slug': attr.get('slug', ''),
'type': attr.get('type', 'select'),
'order_by': attr.get('order_by', 'menu_order'),
'has_archives': attr.get('has_archives', False),
'terms': terms
})
return result
except Exception as e:
logger.error(f"Error fetching WooCommerce product attributes: {e}")
return []