Initial commit: igny8 project

This commit is contained in:
igny8
2025-11-09 10:27:02 +00:00
commit 60b8188111
27265 changed files with 4360521 additions and 0 deletions

View File

@@ -0,0 +1,4 @@
"""
IGNY8 Utilities Module
"""

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,272 @@
"""
Content Normalizer - Normalizes AI-generated content from different formats
Handles full HTML documents, HTML fragments, and plain text
Preserves HTML structure for future use on blogs/pages
"""
import re
import logging
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)
# Try to import BeautifulSoup, fallback to regex if not available
try:
from bs4 import BeautifulSoup
HAS_BEAUTIFULSOUP = True
except ImportError:
HAS_BEAUTIFULSOUP = False
logger.warning("BeautifulSoup4 not available, using regex fallback for HTML parsing")
def normalize_content(content: str, content_format: Optional[str] = None) -> Dict[str, Any]:
    """
    Turn a raw AI response into clean HTML plus some metadata.

    Args:
        content: Raw AI output; may be a full HTML document, an HTML
            fragment, or plain text.
        content_format: Optional format hint ('html_document',
            'html_fragment', 'plain_text'). When falsy the format is
            auto-detected. Any value other than the two HTML formats is
            handled as plain text.

    Returns:
        Dict with:
        - 'normalized_content': cleaned HTML (HTML-escaped input on error,
          empty string for empty/non-string input)
        - 'content_type': 'html' on success, otherwise 'text'
        - 'has_structure': True when headings, lists or tables are present
        - 'original_format': the hint/detected format, or 'empty' / 'error'
    """
    def _result(body, kind, structured, source_format):
        # Single place that defines the shape of the returned mapping.
        return {
            'normalized_content': body,
            'content_type': kind,
            'has_structure': structured,
            'original_format': source_format,
        }

    # Guard clause: nothing to do for empty or non-string input.
    if not (content and isinstance(content, str)):
        return _result('', 'text', False, 'empty')

    content = content.strip()

    # An explicit hint wins over auto-detection.
    detected_format = content_format or _detect_content_format(content)
    logger.info(f"Normalizing content: format={detected_format}, length={len(content)}")

    try:
        if detected_format == 'html_document':
            # Full document: keep only the article/main/body part.
            html = _extract_body_content(content)
        elif detected_format == 'html_fragment':
            # Fragment: tidy up and wrap if needed.
            html = _normalize_html_fragment(content)
        else:
            # Everything else is treated as plain text.
            html = _convert_text_to_html(content)

        html = _final_cleanup(html)
        return _result(html, 'html', _has_structure(html), detected_format)
    except Exception as e:
        logger.error(f"Error normalizing content: {e}", exc_info=True)
        # Best-effort fallback: hand back the input with HTML escaped.
        return _result(_escape_html(content), 'text', False, 'error')
def _detect_content_format(content: str) -> str:
"""Detect the format of the content"""
content_lower = content.lower().strip()
# Check for full HTML document
if (content_lower.startswith('<!doctype') or
content_lower.startswith('<html') or
('<html' in content_lower and '<body' in content_lower)):
return 'html_document'
# Check for HTML fragments (has HTML tags but not full document)
if re.search(r'<[h1-6p]|</[h1-6p]|<ul|<ol|<li|<table|<div|<article|<section', content_lower):
return 'html_fragment'
# Plain text
return 'plain_text'
def _extract_body_content(html_content: str) -> str:
    """
    Pull the main content out of a full HTML document.

    With BeautifulSoup available, the first <article> (or else <main>) is
    returned including its own tag; failing that, the children of <body> are
    returned. If parsing raises, or BeautifulSoup is missing, a regex
    fallback returns the inner content of the first <body>, <article> or
    <main> match, in that order. When nothing matches, the input is returned
    unchanged.
    """
    if HAS_BEAUTIFULSOUP:
        try:
            soup = BeautifulSoup(html_content, 'html.parser')

            # Prefer the semantic containers, wrapper tag included.
            for container_name in ('article', 'main'):
                container = soup.find(container_name)
                if container:
                    return str(container)

            body = soup.find('body')
            if body:
                # Only what is inside <body>, not the tag itself.
                return ''.join(map(str, body.children))

            # No known container at all (shouldn't happen).
            return html_content
        except Exception as e:
            logger.warning(f"Error parsing HTML with BeautifulSoup: {e}")

    # Regex fallback; note the order differs from the parser path above.
    fallback_patterns = (
        r'<body[^>]*>(.*?)</body>',
        r'<article[^>]*>(.*?)</article>',
        r'<main[^>]*>(.*?)</main>',
    )
    for pattern in fallback_patterns:
        found = re.search(pattern, html_content, re.DOTALL | re.IGNORECASE)
        if found:
            return found.group(1)

    return html_content
def _normalize_html_fragment(html_content: str) -> str:
    """
    Normalize an HTML fragment: wrap it in <article> when it has headings
    and is not already wrapped, and drop empty elements.

    Args:
        html_content: HTML fragment (no <html>/<body> shell expected).

    Returns:
        The cleaned fragment as a string. If BeautifulSoup is unavailable or
        raises, a regex-based cleanup is used instead.
    """
    # Elements that are legitimately empty and must survive the cleanup.
    keep_when_empty = ['img', 'br', 'hr']

    if HAS_BEAUTIFULSOUP:
        try:
            soup = BeautifulSoup(html_content, 'html.parser')

            # Wrap in <article> only when not already wrapped and the
            # fragment has at least one heading.
            already_wrapped = (
                soup.find('article')
                or soup.find('div', class_='content-wrapper')
            )
            if not already_wrapped and soup.find(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']):
                article = soup.new_tag('article')
                # Snapshot the children first: extract() mutates soup.
                for element in list(soup.children):
                    if hasattr(element, 'extract'):
                        article.append(element.extract())
                    else:
                        article.append(str(element))
                soup.append(article)

            # Remove empty tags. <img>/<br>/<hr> are skipped explicitly:
            # they have no text and no descendants, so the emptiness test
            # below would otherwise delete every image and line break.
            for tag in soup.find_all():
                if tag.name in keep_when_empty:
                    continue
                if not tag.get_text(strip=True) and not tag.find_all(keep_when_empty):
                    tag.decompose()

            return str(soup)
        except Exception as e:
            logger.warning(f"Error normalizing HTML fragment with BeautifulSoup: {e}")

    # Fallback: basic regex cleanup.
    # Remove empty paired tags; void tags have no closing tag and so never match.
    html_content = re.sub(r'<(\w+)[^>]*>\s*</\1>', '', html_content)

    # Wrap in <article> if there are headings and no article yet.
    if not re.search(r'<article', html_content, re.IGNORECASE):
        if re.search(r'<h[1-6]', html_content, re.IGNORECASE):
            html_content = f'<article>{html_content}</article>'

    return html_content
def _convert_text_to_html(text: str) -> str:
    """
    Render plain (lightly markdown-flavoured) text as HTML inside <article>.

    Line rules, applied to each stripped line:
    - blank line           -> <br>
    - '# ' .. '#### '      -> <h1> .. <h4>
    - '- ' or '* '         -> <li> inside a <ul> (consecutive bullets share one list)
    - anything else        -> <p>
    All text is HTML-escaped.
    """
    heading_markers = (('# ', 'h1'), ('## ', 'h2'), ('### ', 'h3'), ('#### ', 'h4'))
    pieces = []
    list_open = False

    for raw_line in text.split('\n'):
        stripped = raw_line.strip()
        is_bullet = stripped.startswith(('- ', '* '))

        # Any non-bullet line (blank, heading or paragraph) ends an open list.
        if list_open and not is_bullet:
            pieces.append('</ul>')
            list_open = False

        if not stripped:
            pieces.append('<br>')
            continue

        heading = next(
            (pair for pair in heading_markers if stripped.startswith(pair[0])),
            None,
        )
        if heading is not None:
            marker, tag = heading
            pieces.append(f'<{tag}>{_escape_html(stripped[len(marker):])}</{tag}>')
        elif is_bullet:
            if not list_open:
                pieces.append('<ul>')
                list_open = True
            pieces.append(f'<li>{_escape_html(stripped[2:].strip())}</li>')
        else:
            pieces.append(f'<p>{_escape_html(stripped)}</p>')

    # Input ended while still inside a list.
    if list_open:
        pieces.append('</ul>')

    return '<article>' + ''.join(pieces) + '</article>'
def _final_cleanup(html_content: str) -> str:
"""Final cleanup of HTML content"""
# Remove extra whitespace between tags
html_content = re.sub(r'>\s+<', '><', html_content)
# Fix multiple br tags (max 2 consecutive)
html_content = re.sub(r'(<br\s*/?>){3,}', '<br><br>', html_content)
# Remove leading/trailing whitespace
html_content = html_content.strip()
return html_content
def _has_structure(html_content: str) -> bool:
"""Check if content has structural elements"""
structure_tags = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'ul', 'ol', 'table']
return any(f'<{tag}' in html_content.lower() for tag in structure_tags)
def _escape_html(text: str) -> str:
"""Escape HTML special characters"""
return (text
.replace('&', '&amp;')
.replace('<', '&lt;')
.replace('>', '&gt;')
.replace('"', '&quot;')
.replace("'", '&#x27;'))

View File

@@ -0,0 +1,89 @@
"""
Queue Manager - Manages Celery tasks for async operations
"""
import logging
from typing import Dict, Any, Optional
from django.conf import settings
logger = logging.getLogger(__name__)
# TODO: When Celery is set up, import and use actual task decorators
# from celery import shared_task
class QueueManager:
    """
    Facade for running work asynchronously.

    Meant as an abstraction over Celery/django-q. The backend is not wired
    up yet, so the methods below are placeholders that only log and return
    fixed values.
    """

    def __init__(self):
        # Queueing is off unless the CELERY_ENABLED setting says otherwise.
        self.queue_enabled = getattr(settings, 'CELERY_ENABLED', False)

    def enqueue_task(
        self,
        task_name: str,
        args: tuple = (),
        kwargs: Dict[str, Any] = None,
        account_id: Optional[int] = None,
        priority: int = 5,
        delay_seconds: int = 0
    ) -> str:
        """
        Submit a task for asynchronous execution.

        Args:
            task_name: Name of the task function
            args: Positional arguments for the task
            kwargs: Keyword arguments for the task
            account_id: Account ID for account-specific queues
            priority: Task priority (1-10, higher = more priority)
            delay_seconds: Delay before execution (seconds)

        Returns:
            Task ID. Currently "no-queue" when queueing is disabled and
            "task-id-placeholder" otherwise; nothing is actually enqueued.
        """
        if self.queue_enabled:
            # TODO: Implement actual Celery task enqueueing
            # Example: task.delay(*args, **kwargs)
            logger.info(f"Enqueueing task {task_name} for account {account_id}")
            return "task-id-placeholder"

        logger.warning(f"Queue not enabled, task {task_name} would be queued")
        return "no-queue"

    def get_task_status(self, task_id: str) -> Dict[str, Any]:
        """
        Look up the status of a previously enqueued task.

        Args:
            task_id: Task ID returned from enqueue_task

        Returns:
            Dict with 'status', 'result', 'error'. Currently always reports
            'PENDING' with no result and no error.
        """
        # TODO: Implement task status checking
        return dict(status='PENDING', result=None, error=None)

    def cancel_task(self, task_id: str) -> bool:
        """
        Cancel a queued or running task.

        Args:
            task_id: Task ID to cancel

        Returns:
            True if cancelled, False otherwise. Currently always False.
        """
        # TODO: Implement task cancellation
        logger.info(f"Cancelling task {task_id}")
        return False
# Shared module-level instance. It is created when this module is imported,
# so the CELERY_ENABLED setting is read once at that point.
queue_manager = QueueManager()

View File

@@ -0,0 +1,218 @@
"""
WordPress Integration Service
Handles communication with WordPress sites via REST API
"""
import logging
import requests
from typing import Dict, Any, Optional
from django.conf import settings
logger = logging.getLogger(__name__)
class WordPressClient:
    """
    WordPress REST API client for content publishing and sync.

    Every public method returns a result dict with a 'success' flag instead
    of raising; failures are logged and reported through the dict.
    """

    # Seconds to wait for a response when the caller gives no timeout.
    # Without a timeout the HTTP library may block indefinitely on a stalled host.
    DEFAULT_TIMEOUT = 30

    def __init__(
        self,
        site_url: str,
        username: Optional[str] = None,
        app_password: Optional[str] = None,
        timeout: float = DEFAULT_TIMEOUT,
    ):
        """
        Initialize WordPress client.

        Args:
            site_url: WordPress site URL (e.g., https://example.com)
            username: WordPress username or application password username
            app_password: WordPress application password
            timeout: Per-request timeout in seconds, applied to every HTTP
                call made by this client
        """
        self.site_url = site_url.rstrip('/')
        self.api_base = f"{self.site_url}/wp-json/wp/v2"
        self.igny8_api_base = f"{self.site_url}/wp-json/igny8/v1"  # Custom IGNY8 endpoints
        self.username = username
        self.app_password = app_password
        self.timeout = timeout
        self.session = requests.Session()

        # Authentication is only configured when both parts are provided.
        if username and app_password:
            self.session.auth = (username, app_password)

    def test_connection(self) -> Dict[str, Any]:
        """
        Test connection to WordPress site.

        Returns:
            Dict with 'success', 'message', 'wp_version'
        """
        try:
            response = self.session.get(f"{self.api_base}/", timeout=self.timeout)
            if response.status_code == 200:
                data = response.json()
                return {
                    'success': True,
                    'message': 'Connection successful',
                    'wp_version': data.get('version', 'Unknown'),
                }
            return {
                'success': False,
                'message': f"HTTP {response.status_code}",
                'wp_version': None,
            }
        except Exception as e:
            logger.error(f"WordPress connection test failed: {e}")
            return {
                'success': False,
                'message': str(e),
                'wp_version': None,
            }

    def create_post(
        self,
        title: str,
        content: str,
        status: str = 'draft',
        featured_image_url: Optional[str] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Create a new WordPress post.

        Args:
            title: Post title
            content: Post content (HTML or blocks)
            status: Post status ('draft', 'publish', 'pending')
            featured_image_url: URL of featured image (must be uploaded first)
            **kwargs: Additional post fields (excerpt, categories, etc.);
                these are merged last and so override title/content/status

        Returns:
            Dict with 'success', 'post_id', 'url', 'error'
        """
        try:
            post_data = {
                'title': title,
                'content': content,
                'status': status,
                **kwargs
            }

            if featured_image_url:
                # Only attached when the URL resolves to a known media ID.
                media_id = self._get_media_id_from_url(featured_image_url)
                if media_id:
                    post_data['featured_media'] = media_id

            response = self.session.post(
                f"{self.api_base}/posts",
                json=post_data,
                timeout=self.timeout,
            )
            if response.status_code in [200, 201]:
                data = response.json()
                return {
                    'success': True,
                    'post_id': data.get('id'),
                    'url': data.get('link'),
                    'error': None,
                }
            return {
                'success': False,
                'post_id': None,
                'url': None,
                'error': f"HTTP {response.status_code}: {response.text}",
            }
        except Exception as e:
            logger.error(f"WordPress post creation failed: {e}")
            return {
                'success': False,
                'post_id': None,
                'url': None,
                'error': str(e),
            }

    def upload_image(self, image_url: str, filename: Optional[str] = None) -> Dict[str, Any]:
        """
        Upload an image to WordPress media library.

        Args:
            image_url: URL of image to upload
            filename: Optional filename ('image.jpg' is used when omitted)

        Returns:
            Dict with 'success', 'media_id', 'url', 'error'
        """
        try:
            # Download with a plain request rather than self.session so the
            # WordPress credentials are not sent to the image host.
            img_response = requests.get(image_url, timeout=self.timeout)
            if img_response.status_code != 200:
                return {
                    'success': False,
                    'media_id': None,
                    'url': None,
                    'error': f"Failed to download image: HTTP {img_response.status_code}",
                }

            # Upload to WordPress as a multipart file.
            files = {
                'file': (
                    filename or 'image.jpg',
                    img_response.content,
                    img_response.headers.get('content-type', 'image/jpeg'),
                )
            }
            response = self.session.post(
                f"{self.api_base}/media",
                files=files,
                timeout=self.timeout,
            )
            if response.status_code in [200, 201]:
                data = response.json()
                return {
                    'success': True,
                    'media_id': data.get('id'),
                    'url': data.get('source_url'),
                    'error': None,
                }
            return {
                'success': False,
                'media_id': None,
                'url': None,
                'error': f"HTTP {response.status_code}: {response.text}",
            }
        except Exception as e:
            logger.error(f"WordPress image upload failed: {e}")
            return {
                'success': False,
                'media_id': None,
                'url': None,
                'error': str(e),
            }

    def _get_media_id_from_url(self, url: str) -> Optional[int]:
        """Helper to get media ID from URL (if already uploaded). Not implemented; always None."""
        # TODO: Implement media lookup by URL
        return None

    def sync_settings(self, settings_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Sync settings to WordPress via custom IGNY8 endpoint.

        Args:
            settings_data: Settings dictionary to sync

        Returns:
            Dict with 'success', 'message', 'error'
        """
        try:
            response = self.session.post(
                f"{self.igny8_api_base}/sync-settings",
                json=settings_data,
                timeout=self.timeout,
            )
            if response.status_code == 200:
                return {
                    'success': True,
                    'message': 'Settings synced successfully',
                    'error': None,
                }
            return {
                'success': False,
                'message': None,
                'error': f"HTTP {response.status_code}: {response.text}",
            }
        except Exception as e:
            logger.error(f"WordPress settings sync failed: {e}")
            return {
                'success': False,
                'message': None,
                'error': str(e),
            }