35 Commits

Author SHA1 Message Date
alorig
a0f3e3a778 Create __init__.py 2025-11-18 05:03:34 +05:00
alorig
40d379dd7e Pahse 5 2025-11-18 05:03:27 +05:00
alorig
342d9eab17 rearrange 2025-11-18 04:16:37 +05:00
alorig
8040d983de asd 2025-11-18 03:32:36 +05:00
alorig
abcbf687ae Phase 4 pending 2025-11-18 03:27:56 +05:00
IGNY8 VPS (Salman)
ee56f9bbac Refactor frontend components to use new icon imports and improve default values
- Updated `EnhancedMetricCard` to set a default accent color to blue.
- Replaced `lucide-react` icons with custom icons in `LinkResults`, `OptimizationScores`, and various pages in the Automation and Optimizer sections.
- Enhanced button layouts in `AutomationRules`, `Tasks`, and `ContentSelector` for better alignment and user experience.
- Improved loading indicators across components for a more consistent UI experience.
2025-11-17 21:38:08 +00:00
IGNY8 VPS (Salman)
0818dfe385 Add automation routes and enhance header metrics management
- Introduced new routes for Automation Rules and Automation Tasks in the frontend.
- Updated the AppSidebar to include sub-items for Automation navigation.
- Enhanced HeaderMetricsContext to manage credit and page metrics more effectively, ensuring proper merging and clearing of metrics.
- Adjusted AppLayout and TablePageTemplate to maintain credit balance while managing page metrics.
2025-11-17 21:04:46 +00:00
IGNY8 VPS (Salman)
aa74fb0d65 1 2025-11-17 20:50:51 +00:00
IGNY8 VPS (Salman)
a7d432500f docs 2025-11-17 20:35:04 +00:00
IGNY8 VPS (Salman)
b6b1aecdce Update site builder configurations and enhance migration dependencies
- Added `OptimizationConfig` to `INSTALLED_APPS` in `settings.py`.
- Updated migration dependencies in `0001_initial.py` to include `writer` for content source fields.
- Modified the `account` ForeignKey in `PageBlueprint` and `SiteBlueprint` models to use `tenant_id` for better clarity.
- Deleted obsolete implementation plan documents for phases 0-4 to streamline project documentation.
- Improved overall project structure by removing outdated files and enhancing migration clarity.
2025-11-17 20:22:29 +00:00
alorig
f7115190dc Add Linker and Optimizer modules with API integration and frontend components
- Added Linker and Optimizer apps to `INSTALLED_APPS` in `settings.py`.
- Configured API endpoints for Linker and Optimizer in `urls.py`.
- Implemented `OptimizeContentFunction` for content optimization in the AI module.
- Created prompts for content optimization and site structure generation.
- Updated `OptimizerService` to utilize the new AI function for content optimization.
- Developed frontend components including dashboards and content lists for Linker and Optimizer.
- Integrated new routes and sidebar navigation for Linker and Optimizer in the frontend.
- Enhanced content management with source and sync status filters in the Writer module.
- Comprehensive test coverage added for new features and components.
2025-11-18 00:41:00 +05:00
IGNY8 VPS (Salman)
4b9e1a49a9 Remove obsolete scripts and files, update site builder configurations
- Deleted the `import_plans.py`, `run_tests.py`, and `test_run.py` scripts as they are no longer needed.
- Updated the initial migration dependency in `0001_initial.py` to reflect recent changes in the `igny8_core_auth` app.
- Enhanced the implementation plan documentation to include new phases and updates on the site builder project.
- Updated the `vite.config.ts` and `package.json` to integrate testing configurations and dependencies for the site builder.
2025-11-17 17:48:15 +00:00
IGNY8 VPS (Salman)
5a36686844 Add site builder service to Docker Compose and remove obsolete scripts
- Introduced a new service `igny8_site_builder` in `docker-compose.app.yml` for site building functionality, including environment variables and volume mappings.
- Deleted several outdated scripts: `create_test_users.py`, `test_image_write_access.py`, `update_free_plan.py`, and the database file `db.sqlite3` to clean up the backend.
- Updated Django settings and URL configurations to integrate the new site builder module.
2025-11-17 16:08:51 +00:00
IGNY8 VPS (Salman)
e3d4ba2c02 Remove unused files and enhance error handling in serializers.py
- Deleted the outdated backend dependency file for drf-spectacular.
- Removed an unused image from the frontend assets.
- Improved error handling in TasksSerializer by catching ObjectDoesNotExist in addition to AttributeError.
2025-11-17 13:21:32 +00:00
alorig
2605c62eec Revert "Update serializers.py"
This reverts commit 41c1501764.
2025-11-17 17:34:18 +05:00
alorig
41c1501764 Update serializers.py 2025-11-17 17:32:46 +05:00
alorig
fe7af3c81c Revert "Enhance dashboard data fetching by adding active site checks"
This reverts commit 75ba407df5.
2025-11-17 17:28:30 +05:00
alorig
ea9ffedc01 Revert "Update Usage.tsx"
This reverts commit bf6589449f.
2025-11-17 17:28:24 +05:00
alorig
bf6589449f Update Usage.tsx 2025-11-17 17:24:38 +05:00
alorig
75ba407df5 Enhance dashboard data fetching by adding active site checks
- Implemented checks for active site in Home, Planner, and Writer dashboards to prevent data fetching when no site is selected.
- Updated API calls to include site_id in requests for better data accuracy.
- Modified user messages to guide users in selecting an active site for insights.
2025-11-17 17:22:15 +05:00
alorig
4b21009cf8 Update README.md 2025-11-17 16:59:48 +05:00
IGNY8 VPS (Salman)
8a9dd8ed2f aaaa 2025-11-17 11:58:45 +00:00
IGNY8 VPS (Salman)
9930728e8a Add source tracking and sync status fields to Content model; update services module
- Introduced new fields in the Content model for source tracking and sync status, including external references and optimization fields.
- Updated the services module to include new content generation and pipeline services for better organization and clarity.
2025-11-17 11:15:15 +00:00
IGNY8 VPS (Salman)
fe95d09bbe phase 0 to 2 completed 2025-11-16 23:02:22 +00:00
IGNY8 VPS (Salman)
4ecc1706bc celery 2025-11-16 22:57:36 +00:00
IGNY8 VPS (Salman)
0f02bd6409 celery 2025-11-16 22:52:43 +00:00
IGNY8 VPS (Salman)
1134285a12 Update app labels for billing, writer, and planner models; fix foreign key references in automation migrations
- Set app labels for CreditTransaction and CreditUsageLog models to 'billing'.
- Updated app labels for Tasks, Content, and Images models to 'writer'.
- Changed foreign key references in automation migrations from 'account' to 'tenant' for consistency.
2025-11-16 22:37:16 +00:00
IGNY8 VPS (Salman)
1c2c9354ba Add automation module to settings and update app labels
- Registered the new AutomationConfig in the INSTALLED_APPS of settings.py.
- Set the app_label for AutomationRule and ScheduledTask models to 'automation' for better organization and clarity in the database schema.
2025-11-16 22:23:39 +00:00
IGNY8 VPS (Salman)
92f51859fe reaminign phase 1-2 tasks 2025-11-16 22:17:33 +00:00
IGNY8 VPS (Salman)
7f8982a0ab Add scheduled automation task and update URL routing
- Introduced a new scheduled task for executing automation rules every 5 minutes in the Celery beat schedule.
- Updated URL routing to include a new endpoint for automation-related functionalities.
- Refactored imports in various modules to align with the new business layer structure, ensuring backward compatibility for billing models, exceptions, and services.
2025-11-16 22:11:05 +00:00
IGNY8 VPS (Salman)
455358ecfc Refactor domain structure to business layer
- Renamed `domain/` to `business/` to better reflect the organization of code by business logic.
- Updated all relevant file paths and references throughout the project to align with the new structure.
- Ensured that all models and services are now located under the `business/` directory, maintaining existing functionality while improving clarity.
2025-11-16 21:47:51 +00:00
IGNY8 VPS (Salman)
cb0e42bb8d dd 2025-11-16 21:33:55 +00:00
IGNY8 VPS (Salman)
9ab87416d8 Merge branch 'feature/phase-0-credit-system' 2025-11-16 21:29:55 +00:00
IGNY8 VPS (Salman)
b2e60b749a 1 2025-11-16 20:02:45 +00:00
IGNY8 VPS (Salman)
9f3c4a6cdd Fix middleware: Don't set request.user, only request.account
- Middleware should only set request.account, not request.user
- Let DRF authentication handle request.user setting
- This prevents conflicts between middleware and DRF authentication
- Fixes /me endpoint returning wrong user issue
2025-11-16 19:49:55 +00:00
284 changed files with 24521 additions and 5464 deletions

View File

@@ -6,7 +6,7 @@ Full-stack SaaS platform for SEO keyword management and AI-driven content genera
---
## 🏗️ Architecture
## 🏗️ Architectures
- **Backend**: Django 5.2+ with Django REST Framework (Port 8010/8011)
- **Frontend**: React 19 with TypeScript and Vite (Port 5173/8021)

View File

@@ -1,37 +0,0 @@
Collecting drf-spectacular
Downloading drf_spectacular-0.29.0-py3-none-any.whl.metadata (14 kB)
Requirement already satisfied: Django>=2.2 in /usr/local/lib/python3.11/site-packages (from drf-spectacular) (5.2.8)
Requirement already satisfied: djangorestframework>=3.10.3 in /usr/local/lib/python3.11/site-packages (from drf-spectacular) (3.16.1)
Collecting uritemplate>=2.0.0 (from drf-spectacular)
Downloading uritemplate-4.2.0-py3-none-any.whl.metadata (2.6 kB)
Collecting PyYAML>=5.1 (from drf-spectacular)
Downloading pyyaml-6.0.3-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (2.4 kB)
Collecting jsonschema>=2.6.0 (from drf-spectacular)
Downloading jsonschema-4.25.1-py3-none-any.whl.metadata (7.6 kB)
Collecting inflection>=0.3.1 (from drf-spectacular)
Downloading inflection-0.5.1-py2.py3-none-any.whl.metadata (1.7 kB)
Requirement already satisfied: asgiref>=3.8.1 in /usr/local/lib/python3.11/site-packages (from Django>=2.2->drf-spectacular) (3.10.0)
Requirement already satisfied: sqlparse>=0.3.1 in /usr/local/lib/python3.11/site-packages (from Django>=2.2->drf-spectacular) (0.5.3)
Collecting attrs>=22.2.0 (from jsonschema>=2.6.0->drf-spectacular)
Downloading attrs-25.4.0-py3-none-any.whl.metadata (10 kB)
Collecting jsonschema-specifications>=2023.03.6 (from jsonschema>=2.6.0->drf-spectacular)
Downloading jsonschema_specifications-2025.9.1-py3-none-any.whl.metadata (2.9 kB)
Collecting referencing>=0.28.4 (from jsonschema>=2.6.0->drf-spectacular)
Downloading referencing-0.37.0-py3-none-any.whl.metadata (2.8 kB)
Collecting rpds-py>=0.7.1 (from jsonschema>=2.6.0->drf-spectacular)
Downloading rpds_py-0.28.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.1 kB)
Requirement already satisfied: typing-extensions>=4.4.0 in /usr/local/lib/python3.11/site-packages (from referencing>=0.28.4->jsonschema>=2.6.0->drf-spectacular) (4.15.0)
Downloading drf_spectacular-0.29.0-py3-none-any.whl (105 kB)
Downloading inflection-0.5.1-py2.py3-none-any.whl (9.5 kB)
Downloading jsonschema-4.25.1-py3-none-any.whl (90 kB)
Downloading attrs-25.4.0-py3-none-any.whl (67 kB)
Downloading jsonschema_specifications-2025.9.1-py3-none-any.whl (18 kB)
Downloading pyyaml-6.0.3-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl (806 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 806.6/806.6 kB 36.0 MB/s 0:00:00
Downloading referencing-0.37.0-py3-none-any.whl (26 kB)
Downloading rpds_py-0.28.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (382 kB)
Downloading uritemplate-4.2.0-py3-none-any.whl (11 kB)
Installing collected packages: uritemplate, rpds-py, PyYAML, inflection, attrs, referencing, jsonschema-specifications, jsonschema, drf-spectacular
Successfully installed PyYAML-6.0.3 attrs-25.4.0 drf-spectacular-0.29.0 inflection-0.5.1 jsonschema-4.25.1 jsonschema-specifications-2025.9.1 referencing-0.37.0 rpds-py-0.28.0 uritemplate-4.2.0
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager, possibly rendering your system unusable. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv. Use the --root-user-action option if you know what you are doing and want to suppress this warning.

Binary file not shown.

View File

@@ -1,187 +0,0 @@
#!/usr/bin/env python
"""
Script to create 3 real users with 3 paid packages (Starter, Growth, Scale)
All accounts will be active and properly configured.
Email format: plan-name@igny8.com
"""
import os
import django
import sys
from decimal import Decimal
# Setup Django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'igny8_core.settings')
django.setup()
from django.db import transaction
from igny8_core.auth.models import Plan, Account, User
from django.utils.text import slugify
# User data - 3 users with 3 different paid plans
# Email format: plan-name@igny8.com
USERS_DATA = [
{
"email": "starter@igny8.com",
"username": "starter",
"first_name": "Starter",
"last_name": "Account",
"password": "SecurePass123!@#",
"plan_slug": "starter", # $89/month
"account_name": "Starter Account",
},
{
"email": "growth@igny8.com",
"username": "growth",
"first_name": "Growth",
"last_name": "Account",
"password": "SecurePass123!@#",
"plan_slug": "growth", # $139/month
"account_name": "Growth Account",
},
{
"email": "scale@igny8.com",
"username": "scale",
"first_name": "Scale",
"last_name": "Account",
"password": "SecurePass123!@#",
"plan_slug": "scale", # $229/month
"account_name": "Scale Account",
},
]
def create_user_with_plan(user_data):
"""Create a user with account and assigned plan."""
try:
with transaction.atomic():
# Get the plan
try:
plan = Plan.objects.get(slug=user_data['plan_slug'], is_active=True)
except Plan.DoesNotExist:
print(f"❌ ERROR: Plan '{user_data['plan_slug']}' not found or inactive!")
return None
# Check if user already exists
if User.objects.filter(email=user_data['email']).exists():
print(f"⚠️ User {user_data['email']} already exists. Updating...")
existing_user = User.objects.get(email=user_data['email'])
if existing_user.account:
existing_user.account.plan = plan
existing_user.account.status = 'active'
existing_user.account.save()
print(f" ✅ Updated account plan to {plan.name} and set status to active")
return existing_user
# Generate unique account slug
base_slug = slugify(user_data['account_name'])
account_slug = base_slug
counter = 1
while Account.objects.filter(slug=account_slug).exists():
account_slug = f"{base_slug}-{counter}"
counter += 1
# Create user first (without account)
user = User.objects.create_user(
username=user_data['username'],
email=user_data['email'],
password=user_data['password'],
first_name=user_data['first_name'],
last_name=user_data['last_name'],
account=None, # Will be set after account creation
role='owner'
)
# Create account with user as owner and assigned plan
account = Account.objects.create(
name=user_data['account_name'],
slug=account_slug,
owner=user,
plan=plan,
status='active', # Set to active
credits=plan.included_credits or 0, # Set initial credits from plan
)
# Update user to reference the new account
user.account = account
user.save()
print(f"✅ Created user: {user.email}")
print(f" - Name: {user.get_full_name()}")
print(f" - Username: {user.username}")
print(f" - Account: {account.name} (slug: {account.slug})")
print(f" - Plan: {plan.name} (${plan.price}/month)")
print(f" - Status: {account.status}")
print(f" - Credits: {account.credits}")
print(f" - Max Sites: {plan.max_sites}")
print(f" - Max Users: {plan.max_users}")
print()
return user
except Exception as e:
print(f"❌ ERROR creating user {user_data['email']}: {e}")
import traceback
traceback.print_exc()
return None
def main():
"""Main function to create all users."""
print("=" * 80)
print("Creating 3 Users with Paid Plans")
print("=" * 80)
print()
# Verify plans exist
print("Checking available plans...")
plans = Plan.objects.filter(is_active=True).order_by('price')
if plans.count() < 3:
print(f"⚠️ WARNING: Only {plans.count()} active plan(s) found. Need at least 3.")
print("Available plans:")
for p in plans:
print(f" - {p.slug} (${p.price})")
print()
print("Please run import_plans.py first to create the plans.")
return
print("✅ Found plans:")
for p in plans:
print(f" - {p.name} ({p.slug}): ${p.price}/month")
print()
# Create users
created_users = []
for user_data in USERS_DATA:
user = create_user_with_plan(user_data)
if user:
created_users.append(user)
# Summary
print("=" * 80)
print("SUMMARY")
print("=" * 80)
print(f"Total users created/updated: {len(created_users)}")
print()
print("User Login Credentials:")
print("-" * 80)
for user_data in USERS_DATA:
print(f"Email: {user_data['email']}")
print(f"Password: {user_data['password']}")
print(f"Plan: {user_data['plan_slug'].title()}")
print()
print("✅ All users created successfully!")
print()
print("You can now log in with any of these accounts at:")
print("https://app.igny8.com/login")
if __name__ == '__main__':
try:
main()
except Exception as e:
print(f"❌ Fatal error: {e}", file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 164 KiB

View File

@@ -34,6 +34,8 @@ class AIEngine:
return f"{count} task{'s' if count != 1 else ''}"
elif function_name == 'generate_images':
return f"{count} task{'s' if count != 1 else ''}"
elif function_name == 'generate_site_structure':
return "1 site blueprint"
return f"{count} item{'s' if count != 1 else ''}"
def _build_validation_message(self, function_name: str, payload: dict, count: int, input_description: str) -> str:
@@ -80,6 +82,12 @@ class AIEngine:
total_images = 1 + max_images
return f"Mapping Content for {total_images} Image Prompts"
return f"Mapping Content for Image Prompts"
elif function_name == 'generate_site_structure':
blueprint_name = ''
if isinstance(data, dict):
blueprint = data.get('blueprint')
blueprint_name = f"{getattr(blueprint, 'name', '')}" if blueprint and getattr(blueprint, 'name', None) else ''
return f"Preparing site blueprint {blueprint_name}".strip()
return f"Preparing {count} item{'s' if count != 1 else ''}"
def _get_ai_call_message(self, function_name: str, count: int) -> str:
@@ -92,6 +100,8 @@ class AIEngine:
return f"Writing article{'s' if count != 1 else ''} with AI"
elif function_name == 'generate_images':
return f"Creating image{'s' if count != 1 else ''} with AI"
elif function_name == 'generate_site_structure':
return "Designing complete site architecture"
return f"Processing with AI"
def _get_parse_message(self, function_name: str) -> str:
@@ -104,6 +114,8 @@ class AIEngine:
return "Formatting content"
elif function_name == 'generate_images':
return "Processing images"
elif function_name == 'generate_site_structure':
return "Compiling site map"
return "Processing results"
def _get_parse_message_with_count(self, function_name: str, count: int) -> str:
@@ -122,6 +134,8 @@ class AIEngine:
if in_article_count > 0:
return f"Writing {in_article_count} Inarticle Image Prompts"
return "Writing Inarticle Image Prompts"
elif function_name == 'generate_site_structure':
return f"{count} page blueprint{'s' if count != 1 else ''} mapped"
return f"{count} item{'s' if count != 1 else ''} processed"
def _get_save_message(self, function_name: str, count: int) -> str:
@@ -137,6 +151,8 @@ class AIEngine:
elif function_name == 'generate_image_prompts':
# Count is total prompts created
return f"Assigning {count} Prompts to Dedicated Slots"
elif function_name == 'generate_site_structure':
return f"Publishing {count} page blueprint{'s' if count != 1 else ''}"
return f"Saving {count} item{'s' if count != 1 else ''}"
def execute(self, fn: BaseAIFunction, payload: dict) -> dict:
@@ -195,8 +211,8 @@ class AIEngine:
# Phase 2.5: CREDIT CHECK - Check credits before AI call (25%)
if self.account:
try:
from igny8_core.modules.billing.services import CreditService
from igny8_core.modules.billing.exceptions import InsufficientCreditsError
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
# Map function name to operation type
operation_type = self._get_operation_type(function_name)
@@ -353,8 +369,8 @@ class AIEngine:
# Phase 5.5: DEDUCT CREDITS - Deduct credits after successful save
if self.account and raw_response:
try:
from igny8_core.modules.billing.services import CreditService
from igny8_core.modules.billing.exceptions import InsufficientCreditsError
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
# Map function name to operation type
operation_type = self._get_operation_type(function_name)
@@ -494,6 +510,7 @@ class AIEngine:
'generate_content': 'content_generation',
'generate_image_prompts': 'image_prompt_extraction',
'generate_images': 'image_generation',
'generate_site_structure': 'site_structure_generation',
}
return mapping.get(function_name, function_name)
@@ -554,6 +571,7 @@ class AIEngine:
'generate_content': 'content',
'generate_image_prompts': 'image',
'generate_images': 'image',
'generate_site_structure': 'site_blueprint',
}
return mapping.get(function_name, 'unknown')

View File

@@ -6,6 +6,7 @@ from igny8_core.ai.functions.generate_ideas import GenerateIdeasFunction
from igny8_core.ai.functions.generate_content import GenerateContentFunction
from igny8_core.ai.functions.generate_images import GenerateImagesFunction, generate_images_core
from igny8_core.ai.functions.generate_image_prompts import GenerateImagePromptsFunction
from igny8_core.ai.functions.generate_site_structure import GenerateSiteStructureFunction
__all__ = [
'AutoClusterFunction',
@@ -14,4 +15,5 @@ __all__ = [
'GenerateImagesFunction',
'generate_images_core',
'GenerateImagePromptsFunction',
'GenerateSiteStructureFunction',
]

View File

@@ -0,0 +1,214 @@
"""
Generate Site Structure AI Function
Phase 3 Site Builder
"""
import json
import logging
from typing import Any, Dict, List, Tuple
from django.utils.text import slugify
from igny8_core.ai.base import BaseAIFunction
from igny8_core.ai.prompts import PromptRegistry
from igny8_core.business.site_building.models import SiteBlueprint, PageBlueprint
logger = logging.getLogger(__name__)
class GenerateSiteStructureFunction(BaseAIFunction):
"""AI function that turns a business brief into a full site blueprint."""
def get_name(self) -> str:
return 'generate_site_structure'
def get_metadata(self) -> Dict:
metadata = super().get_metadata()
metadata.update({
'display_name': 'Generate Site Structure',
'description': 'Create site/page architecture from business brief, objectives, and style guides.',
'phases': {
'INIT': 'Validating blueprint data…',
'PREP': 'Preparing site context…',
'AI_CALL': 'Generating site structure with AI…',
'PARSE': 'Parsing generated blueprint…',
'SAVE': 'Saving pages and blocks…',
'DONE': 'Site structure ready!'
}
})
return metadata
def validate(self, payload: dict, account=None) -> Dict[str, Any]:
if not payload.get('ids'):
return {'valid': False, 'error': 'Site blueprint ID is required'}
return {'valid': True}
def prepare(self, payload: dict, account=None) -> Dict[str, Any]:
blueprint_ids = payload.get('ids', [])
queryset = SiteBlueprint.objects.filter(id__in=blueprint_ids)
if account:
queryset = queryset.filter(account=account)
blueprint = queryset.select_related('account', 'site').prefetch_related('pages').first()
if not blueprint:
raise ValueError("Site blueprint not found")
config = blueprint.config_json or {}
business_brief = payload.get('business_brief') or config.get('business_brief') or ''
objectives = payload.get('objectives') or config.get('objectives') or []
style = payload.get('style') or config.get('style') or {}
return {
'blueprint': blueprint,
'business_brief': business_brief,
'objectives': objectives,
'style': style,
}
def build_prompt(self, data: Dict[str, Any], account=None) -> str:
blueprint: SiteBlueprint = data['blueprint']
objectives = data.get('objectives') or []
objectives_text = '\n'.join(f"- {obj}" for obj in objectives) if isinstance(objectives, list) else objectives
style = data.get('style') or {}
style_text = json.dumps(style, indent=2) if isinstance(style, dict) and style else str(style)
existing_pages = [
{
'title': page.title,
'slug': page.slug,
'type': page.type,
'status': page.status,
}
for page in blueprint.pages.all()
]
context = {
'BUSINESS_BRIEF': data.get('business_brief', ''),
'OBJECTIVES': objectives_text or 'Create a full marketing site with clear navigation.',
'STYLE': style_text or 'Modern, responsive, accessible web design.',
'SITE_INFO': json.dumps({
'site_name': blueprint.name,
'site_description': blueprint.description,
'hosting_type': blueprint.hosting_type,
'existing_pages': existing_pages,
'existing_structure': blueprint.structure_json or {},
}, indent=2)
}
return PromptRegistry.get_prompt(
'generate_site_structure',
account=account or blueprint.account,
context=context
)
def parse_response(self, response: str, step_tracker=None) -> Dict[str, Any]:
if not response:
raise ValueError("AI response is empty")
response = response.strip()
try:
return self._ensure_dict(json.loads(response))
except json.JSONDecodeError:
logger.warning("Response not valid JSON, attempting to extract JSON object")
cleaned = self._extract_json_object(response)
if cleaned:
return self._ensure_dict(json.loads(cleaned))
raise ValueError("Unable to parse AI response into JSON")
def save_output(
self,
parsed: Dict[str, Any],
original_data: Dict[str, Any],
account=None,
progress_tracker=None,
step_tracker=None
) -> Dict[str, Any]:
blueprint: SiteBlueprint = original_data['blueprint']
structure = self._ensure_dict(parsed)
pages = structure.get('pages', [])
blueprint.structure_json = structure
blueprint.status = 'ready'
blueprint.save(update_fields=['structure_json', 'status', 'updated_at'])
created, updated, deleted = self._sync_page_blueprints(blueprint, pages)
message = f"Pages synced (created: {created}, updated: {updated}, deleted: {deleted})"
logger.info("[GenerateSiteStructure] %s for blueprint %s", message, blueprint.id)
return {
'success': True,
'count': created + updated,
'site_blueprint_id': blueprint.id,
'pages_created': created,
'pages_updated': updated,
'pages_deleted': deleted,
}
# Helpers -----------------------------------------------------------------
def _ensure_dict(self, data: Any) -> Dict[str, Any]:
if isinstance(data, dict):
return data
raise ValueError("AI response must be a JSON object with site metadata")
def _extract_json_object(self, text: str) -> str:
start = text.find('{')
end = text.rfind('}')
if start != -1 and end != -1 and end > start:
return text[start:end + 1]
return ''
def _sync_page_blueprints(self, blueprint: SiteBlueprint, pages: List[Dict[str, Any]]) -> Tuple[int, int, int]:
existing = {page.slug: page for page in blueprint.pages.all()}
seen_slugs = set()
created = updated = 0
for order, page_data in enumerate(pages or []):
slug = page_data.get('slug') or page_data.get('id') or page_data.get('title') or f"page-{order + 1}"
slug = slugify(slug) or f"page-{order + 1}"
seen_slugs.add(slug)
defaults = {
'title': page_data.get('title') or page_data.get('name') or slug.replace('-', ' ').title(),
'type': self._map_page_type(page_data.get('type')),
'blocks_json': page_data.get('blocks') or page_data.get('sections') or [],
'status': page_data.get('status') or 'draft',
'order': order,
}
page_obj, created_flag = PageBlueprint.objects.update_or_create(
site_blueprint=blueprint,
slug=slug,
defaults=defaults
)
if created_flag:
created += 1
else:
updated += 1
# Delete pages not present in new structure
deleted = 0
for slug, page in existing.items():
if slug not in seen_slugs:
page.delete()
deleted += 1
return created, updated, deleted
def _map_page_type(self, page_type: Any) -> str:
allowed = {choice[0] for choice in PageBlueprint._meta.get_field('type').choices}
if isinstance(page_type, str):
normalized = page_type.lower()
if normalized in allowed:
return normalized
# Map friendly names
mapping = {
'homepage': 'home',
'landing': 'home',
'service': 'services',
'product': 'products',
}
mapped = mapping.get(normalized)
if mapped in allowed:
return mapped
return 'custom'

View File

@@ -0,0 +1,167 @@
"""
Optimize Content AI Function
Phase 4 Linker & Optimizer
"""
import json
import logging
from typing import Any, Dict
from igny8_core.ai.base import BaseAIFunction
from igny8_core.ai.prompts import PromptRegistry
from igny8_core.business.content.models import Content
logger = logging.getLogger(__name__)
class OptimizeContentFunction(BaseAIFunction):
"""AI function that optimizes content for SEO, readability, and engagement."""
def get_name(self) -> str:
return 'optimize_content'
def get_metadata(self) -> Dict:
metadata = super().get_metadata()
metadata.update({
'display_name': 'Optimize Content',
'description': 'Optimize content for SEO, readability, and engagement.',
'phases': {
'INIT': 'Validating content data…',
'PREP': 'Preparing content context…',
'AI_CALL': 'Optimizing content with AI…',
'PARSE': 'Parsing optimized content…',
'SAVE': 'Saving optimized content…',
'DONE': 'Content optimized!'
}
})
return metadata
def validate(self, payload: dict, account=None) -> Dict[str, Any]:
if not payload.get('ids'):
return {'valid': False, 'error': 'Content ID is required'}
return {'valid': True}
def prepare(self, payload: dict, account=None) -> Dict[str, Any]:
content_ids = payload.get('ids', [])
queryset = Content.objects.filter(id__in=content_ids)
if account:
queryset = queryset.filter(account=account)
content = queryset.select_related('account', 'site', 'sector').first()
if not content:
raise ValueError("Content not found")
# Get current scores from analyzer
from igny8_core.business.optimization.services.analyzer import ContentAnalyzer
analyzer = ContentAnalyzer()
scores_before = analyzer.analyze(content)
return {
'content': content,
'scores_before': scores_before,
'html_content': content.html_content or '',
'meta_title': content.meta_title or '',
'meta_description': content.meta_description or '',
'primary_keyword': content.primary_keyword or '',
}
def build_prompt(self, data: Dict[str, Any], account=None) -> str:
content: Content = data['content']
scores_before = data.get('scores_before', {})
context = {
'CONTENT_TITLE': content.title or 'Untitled',
'HTML_CONTENT': data.get('html_content', ''),
'META_TITLE': data.get('meta_title', ''),
'META_DESCRIPTION': data.get('meta_description', ''),
'PRIMARY_KEYWORD': data.get('primary_keyword', ''),
'WORD_COUNT': str(content.word_count or 0),
'CURRENT_SCORES': json.dumps(scores_before, indent=2),
'SOURCE': content.source,
'INTERNAL_LINKS_COUNT': str(len(content.internal_links) if content.internal_links else 0),
}
return PromptRegistry.get_prompt(
'optimize_content',
account=account or content.account,
context=context
)
def parse_response(self, response: str, step_tracker=None) -> Dict[str, Any]:
if not response:
raise ValueError("AI response is empty")
response = response.strip()
try:
return self._ensure_dict(json.loads(response))
except json.JSONDecodeError:
logger.warning("Response not valid JSON, attempting to extract JSON object")
cleaned = self._extract_json_object(response)
if cleaned:
return self._ensure_dict(json.loads(cleaned))
raise ValueError("Unable to parse AI response into JSON")
def save_output(
self,
parsed: Dict[str, Any],
original_data: Dict[str, Any],
account=None,
progress_tracker=None,
step_tracker=None
) -> Dict[str, Any]:
content: Content = original_data['content']
# Extract optimized content
optimized_html = parsed.get('html_content') or parsed.get('content') or content.html_content
optimized_meta_title = parsed.get('meta_title') or content.meta_title
optimized_meta_description = parsed.get('meta_description') or content.meta_description
# Update content
content.html_content = optimized_html
if optimized_meta_title:
content.meta_title = optimized_meta_title
if optimized_meta_description:
content.meta_description = optimized_meta_description
# Recalculate word count
from igny8_core.business.content.services.content_generation_service import ContentGenerationService
content_service = ContentGenerationService()
content.word_count = content_service._count_words(optimized_html)
# Increment optimizer version
content.optimizer_version += 1
# Get scores after optimization
from igny8_core.business.optimization.services.analyzer import ContentAnalyzer
analyzer = ContentAnalyzer()
scores_after = analyzer.analyze(content)
content.optimization_scores = scores_after
content.save(update_fields=[
'html_content', 'meta_title', 'meta_description',
'word_count', 'optimizer_version', 'optimization_scores', 'updated_at'
])
return {
'success': True,
'content_id': content.id,
'scores_before': original_data.get('scores_before', {}),
'scores_after': scores_after,
'word_count_before': original_data.get('word_count', 0),
'word_count_after': content.word_count,
'html_content': optimized_html,
'meta_title': optimized_meta_title,
'meta_description': optimized_meta_description,
}
# Helper methods
def _ensure_dict(self, data: Any) -> Dict[str, Any]:
if isinstance(data, dict):
return data
raise ValueError("AI response must be a JSON object")
def _extract_json_object(self, text: str) -> str:
start = text.find('{')
end = text.rfind('}')
if start != -1 and end != -1 and end > start:
return text[start:end + 1]
return ''

View File

@@ -0,0 +1,2 @@
# AI functions tests

View File

@@ -0,0 +1,179 @@
"""
Tests for OptimizeContentFunction
"""
from unittest.mock import Mock, patch, MagicMock
from django.test import TestCase
from igny8_core.business.content.models import Content
from igny8_core.ai.functions.optimize_content import OptimizeContentFunction
from igny8_core.api.tests.test_integration_base import IntegrationTestBase
class OptimizeContentFunctionTests(IntegrationTestBase):
"""Tests for OptimizeContentFunction"""
def setUp(self):
super().setUp()
self.function = OptimizeContentFunction()
# Create test content
self.content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Test Content",
html_content="<p>This is test content.</p>",
meta_title="Test Title",
meta_description="Test description",
primary_keyword="test keyword",
word_count=500,
status='draft'
)
def test_function_validation_phase(self):
"""Test validation phase"""
# Valid payload
result = self.function.validate({'ids': [self.content.id]}, self.account)
self.assertTrue(result['valid'])
# Invalid payload - missing ids
result = self.function.validate({}, self.account)
self.assertFalse(result['valid'])
self.assertIn('error', result)
def test_function_prep_phase(self):
"""Test prep phase"""
payload = {'ids': [self.content.id]}
data = self.function.prepare(payload, self.account)
self.assertIn('content', data)
self.assertIn('scores_before', data)
self.assertIn('html_content', data)
self.assertEqual(data['content'].id, self.content.id)
def test_function_prep_phase_content_not_found(self):
"""Test prep phase with non-existent content"""
payload = {'ids': [99999]}
with self.assertRaises(ValueError):
self.function.prepare(payload, self.account)
@patch('igny8_core.ai.functions.optimize_content.PromptRegistry.get_prompt')
def test_function_build_prompt(self, mock_get_prompt):
"""Test prompt building"""
mock_get_prompt.return_value = "Test prompt"
data = {
'content': self.content,
'html_content': '<p>Test</p>',
'meta_title': 'Title',
'meta_description': 'Description',
'primary_keyword': 'keyword',
'scores_before': {'overall_score': 50.0}
}
prompt = self.function.build_prompt(data, self.account)
self.assertEqual(prompt, "Test prompt")
mock_get_prompt.assert_called_once()
# Check that context was passed
call_args = mock_get_prompt.call_args
self.assertIn('context', call_args.kwargs)
def test_function_parse_response_valid_json(self):
"""Test parsing valid JSON response"""
response = '{"html_content": "<p>Optimized</p>", "meta_title": "New Title"}'
parsed = self.function.parse_response(response)
self.assertIn('html_content', parsed)
self.assertEqual(parsed['html_content'], "<p>Optimized</p>")
self.assertEqual(parsed['meta_title'], "New Title")
def test_function_parse_response_invalid_json(self):
"""Test parsing invalid JSON response"""
response = "This is not JSON"
with self.assertRaises(ValueError):
self.function.parse_response(response)
def test_function_parse_response_extracts_json_object(self):
"""Test that JSON object is extracted from text"""
response = 'Some text {"html_content": "<p>Optimized</p>"} more text'
parsed = self.function.parse_response(response)
self.assertIn('html_content', parsed)
self.assertEqual(parsed['html_content'], "<p>Optimized</p>")
@patch('igny8_core.business.optimization.services.analyzer.ContentAnalyzer.analyze')
@patch('igny8_core.business.content.services.content_generation_service.ContentGenerationService._count_words')
def test_function_save_phase(self, mock_count_words, mock_analyze):
"""Test save phase updates content"""
mock_count_words.return_value = 600
mock_analyze.return_value = {
'seo_score': 75.0,
'readability_score': 80.0,
'engagement_score': 70.0,
'overall_score': 75.0
}
parsed = {
'html_content': '<p>Optimized content.</p>',
'meta_title': 'Optimized Title',
'meta_description': 'Optimized Description'
}
original_data = {
'content': self.content,
'scores_before': {'overall_score': 50.0},
'word_count': 500
}
result = self.function.save_output(parsed, original_data, self.account)
self.assertTrue(result['success'])
self.assertEqual(result['content_id'], self.content.id)
# Refresh content from DB
self.content.refresh_from_db()
self.assertEqual(self.content.html_content, '<p>Optimized content.</p>')
self.assertEqual(self.content.optimizer_version, 1)
self.assertIsNotNone(self.content.optimization_scores)
def test_function_handles_invalid_content_id(self):
"""Test that function handles invalid content ID"""
payload = {'ids': [99999]}
with self.assertRaises(ValueError):
self.function.prepare(payload, self.account)
def test_function_respects_account_isolation(self):
"""Test that function respects account isolation"""
from igny8_core.auth.models import Account
other_account = Account.objects.create(
name="Other Account",
slug="other",
plan=self.plan,
owner=self.user
)
payload = {'ids': [self.content.id]}
# Should not find content from different account
with self.assertRaises(ValueError):
self.function.prepare(payload, other_account)
def test_get_name(self):
"""Test get_name method"""
self.assertEqual(self.function.get_name(), 'optimize_content')
def test_get_metadata(self):
"""Test get_metadata method"""
metadata = self.function.get_metadata()
self.assertIn('display_name', metadata)
self.assertIn('description', metadata)
self.assertIn('phases', metadata)
self.assertEqual(metadata['display_name'], 'Optimize Content')

View File

@@ -239,6 +239,73 @@ OUTPUT FORMAT
Return ONLY the final JSON object.
Do NOT include any comments, formatting, or explanations.""",
'site_structure_generation': """You are a senior UX architect and information designer. Use the business brief, objectives, style references, and existing site info to propose a complete multi-page marketing website structure.
INPUT CONTEXT
==============
BUSINESS BRIEF:
[IGNY8_BUSINESS_BRIEF]
PRIMARY OBJECTIVES:
[IGNY8_OBJECTIVES]
STYLE & BRAND NOTES:
[IGNY8_STYLE]
SITE INFO / CURRENT STRUCTURE:
[IGNY8_SITE_INFO]
OUTPUT REQUIREMENTS
====================
Return ONE JSON object with the following keys:
{
"site": {
"name": "...",
"primary_navigation": ["home", "services", "about", "contact"],
"secondary_navigation": ["blog", "faq"],
"hero_message": "High level value statement",
"tone": "voice + tone summary"
},
"pages": [
{
"slug": "home",
"title": "Home",
"type": "home | about | services | products | blog | contact | custom",
"status": "draft",
"objective": "Explain the core brand promise and primary CTA",
"primary_cta": "Book a strategy call",
"seo": {
"meta_title": "...",
"meta_description": "..."
},
"blocks": [
{
"type": "hero | features | services | stats | testimonials | faq | contact | custom",
"heading": "Section headline",
"subheading": "Support copy",
"layout": "full-width | two-column | cards | carousel",
"content": [
"Bullet or short paragraph describing what to render in this block"
]
}
]
}
]
}
RULES
=====
- Include 58 pages covering the complete buyer journey (awareness → evaluation → conversion → trust).
- Every page must have at least 3 blocks with concrete guidance (no placeholders like "Lorem ipsum").
- Use consistent slug naming, all lowercase with hyphens.
- Type must match the allowed enum and reflect page intent.
- Ensure the navigation arrays align with the page list.
- Focus on practical descriptions that an engineering team can hand off directly to the Site Builder.
Return ONLY valid JSON. No commentary, explanations, or Markdown.
""",
'image_prompt_extraction': """Extract image prompts from the following article content.
ARTICLE TITLE: {title}
@@ -265,6 +332,62 @@ Make sure each prompt is detailed enough for image generation, describing the vi
'image_prompt_template': 'Create a high-quality {image_type} image to use as a featured photo for a blog post titled "{post_title}". The image should visually represent the theme, mood, and subject implied by the image prompt: {image_prompt}. Focus on a realistic, well-composed scene that naturally communicates the topic without text or logos. Use balanced lighting, pleasing composition, and photographic detail suitable for lifestyle or editorial web content. Avoid adding any visible or readable text, brand names, or illustrative effects. **And make sure image is not blurry.**',
'negative_prompt': 'text, watermark, logo, overlay, title, caption, writing on walls, writing on objects, UI, infographic elements, post title',
'optimize_content': """You are an expert content optimizer specializing in SEO, readability, and engagement.
Your task is to optimize the provided content to improve its SEO score, readability, and engagement metrics.
CURRENT CONTENT:
Title: {CONTENT_TITLE}
Word Count: {WORD_COUNT}
Source: {SOURCE}
Primary Keyword: {PRIMARY_KEYWORD}
Internal Links: {INTERNAL_LINKS_COUNT}
CURRENT META DATA:
Meta Title: {META_TITLE}
Meta Description: {META_DESCRIPTION}
CURRENT SCORES:
{CURRENT_SCORES}
HTML CONTENT:
{HTML_CONTENT}
OPTIMIZATION REQUIREMENTS:
1. SEO Optimization:
- Ensure meta title is 30-60 characters (if provided)
- Ensure meta description is 120-160 characters (if provided)
- Optimize primary keyword usage (natural, not keyword stuffing)
- Improve heading structure (H1, H2, H3 hierarchy)
- Add internal links where relevant (maintain existing links)
2. Readability:
- Average sentence length: 15-20 words
- Use clear, concise language
- Break up long paragraphs
- Use bullet points and lists where appropriate
- Ensure proper paragraph structure
3. Engagement:
- Add compelling headings
- Include relevant images placeholders (alt text)
- Use engaging language
- Create clear call-to-action sections
- Improve content flow and structure
OUTPUT FORMAT:
Return ONLY a JSON object in this format:
{{
"html_content": "[Optimized HTML content]",
"meta_title": "[Optimized meta title, 30-60 chars]",
"meta_description": "[Optimized meta description, 120-160 chars]",
"optimization_notes": "[Brief notes on what was optimized]"
}}
Do not include any explanations, text, or commentary outside the JSON output.
""",
}
# Mapping from function names to prompt types
@@ -275,6 +398,8 @@ Make sure each prompt is detailed enough for image generation, describing the vi
'generate_images': 'image_prompt_extraction',
'extract_image_prompts': 'image_prompt_extraction',
'generate_image_prompts': 'image_prompt_extraction',
'generate_site_structure': 'site_structure_generation',
'optimize_content': 'optimize_content',
}
@classmethod

View File

@@ -94,9 +94,21 @@ def _load_generate_image_prompts():
from igny8_core.ai.functions.generate_image_prompts import GenerateImagePromptsFunction
return GenerateImagePromptsFunction
def _load_generate_site_structure():
"""Lazy loader for generate_site_structure function"""
from igny8_core.ai.functions.generate_site_structure import GenerateSiteStructureFunction
return GenerateSiteStructureFunction
def _load_optimize_content():
"""Lazy loader for optimize_content function"""
from igny8_core.ai.functions.optimize_content import OptimizeContentFunction
return OptimizeContentFunction
register_lazy_function('auto_cluster', _load_auto_cluster)
register_lazy_function('generate_ideas', _load_generate_ideas)
register_lazy_function('generate_content', _load_generate_content)
register_lazy_function('generate_images', _load_generate_images)
register_lazy_function('generate_image_prompts', _load_generate_image_prompts)
register_lazy_function('generate_site_structure', _load_generate_site_structure)
register_lazy_function('optimize_content', _load_optimize_content)

View File

@@ -0,0 +1,86 @@
from __future__ import annotations
from igny8_core.ai.functions.generate_site_structure import GenerateSiteStructureFunction
from igny8_core.business.site_building.models import PageBlueprint
from igny8_core.business.site_building.tests.base import SiteBuilderTestBase
class GenerateSiteStructureFunctionTests(SiteBuilderTestBase):
"""Covers parsing + persistence logic for the Site Builder AI function."""
def setUp(self):
super().setUp()
self.function = GenerateSiteStructureFunction()
def test_parse_response_extracts_json_object(self):
noisy_response = """
Thoughts about the request…
{
"site": {"name": "Acme Robotics"},
"pages": [{"slug": "home", "title": "Home"}]
}
Extra commentary that should be ignored.
"""
parsed = self.function.parse_response(noisy_response)
self.assertEqual(parsed['site']['name'], 'Acme Robotics')
self.assertEqual(parsed['pages'][0]['slug'], 'home')
def test_save_output_updates_structure_and_syncs_pages(self):
# Existing page to prove update/delete flows.
legacy_page = PageBlueprint.objects.create(
site_blueprint=self.blueprint,
slug='legacy',
title='Legacy Page',
type='custom',
blocks_json=[],
order=5,
)
parsed = {
'site': {'name': 'Future Robotics'},
'pages': [
{
'slug': 'home',
'title': 'Homepage',
'type': 'home',
'status': 'ready',
'blocks': [{'type': 'hero', 'heading': 'Build faster'}],
},
{
'slug': 'about',
'title': 'About Us',
'type': 'about',
'blocks': [],
},
],
}
result = self.function.save_output(parsed, {'blueprint': self.blueprint})
self.blueprint.refresh_from_db()
self.assertEqual(self.blueprint.status, 'ready')
self.assertEqual(self.blueprint.structure_json['site']['name'], 'Future Robotics')
self.assertEqual(result['pages_created'], 1)
self.assertEqual(result['pages_updated'], 1)
self.assertEqual(result['pages_deleted'], 1)
slugs = set(self.blueprint.pages.values_list('slug', flat=True))
self.assertIn('home', slugs)
self.assertIn('about', slugs)
self.assertNotIn(legacy_page.slug, slugs)
def test_build_prompt_includes_existing_pages(self):
# Convert structure to JSON to ensure template rendering stays stable.
data = self.function.prepare(
payload={'ids': [self.blueprint.id]},
account=self.account,
)
prompt = self.function.build_prompt(data, account=self.account)
self.assertIn(self.blueprint.name, prompt)
self.assertIn('Home', prompt)
# The prompt should mention hosting type and objectives in JSON context.
self.assertIn(self.blueprint.hosting_type, prompt)
for objective in self.blueprint.config_json.get('objectives', []):
self.assertIn(objective, prompt)

View File

@@ -1,116 +0,0 @@
"""
Test script for AI functions
Run this to verify all AI functions work with console logging
"""
import os
import sys
import django
# Setup Django
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../../../'))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'igny8.settings')
django.setup()
from igny8_core.ai.functions.auto_cluster import AutoClusterFunction
from igny8_core.ai.functions.generate_images import generate_images_core
from igny8_core.ai.ai_core import AICore
def test_ai_core():
"""Test AICore.run_ai_request() directly"""
print("\n" + "="*80)
print("TEST 1: AICore.run_ai_request() - Direct API Call")
print("="*80)
ai_core = AICore()
result = ai_core.run_ai_request(
prompt="Say 'Hello, World!' in JSON format: {\"message\": \"your message\"}",
max_tokens=100,
function_name='test_ai_core'
)
if result.get('error'):
print(f"❌ Error: {result['error']}")
else:
print(f"✅ Success! Content: {result.get('content', '')[:100]}")
print(f" Tokens: {result.get('total_tokens')}, Cost: ${result.get('cost', 0):.6f}")
def test_auto_cluster():
"""Test auto cluster function"""
print("\n" + "="*80)
print("TEST 2: Auto Cluster Function")
print("="*80)
print("Note: This requires actual keyword IDs in the database")
print("Skipping - requires database setup")
# Uncomment to test with real data:
# fn = AutoClusterFunction()
# result = fn.validate({'ids': [1, 2, 3]})
# print(f"Validation result: {result}")
def test_generate_content():
"""Test generate content function"""
print("\n" + "="*80)
print("TEST 3: Generate Content Function")
print("="*80)
print("Note: This requires actual task IDs in the database")
print("Skipping - requires database setup")
def test_generate_images():
"""Test generate images function"""
print("\n" + "="*80)
print("TEST 4: Generate Images Function")
print("="*80)
print("Note: This requires actual task IDs in the database")
print("Skipping - requires database setup")
# Uncomment to test with real data:
# result = generate_images_core(task_ids=[1], account_id=1)
# print(f"Result: {result}")
def test_json_extraction():
"""Test JSON extraction"""
print("\n" + "="*80)
print("TEST 5: JSON Extraction")
print("="*80)
ai_core = AICore()
# Test 1: Direct JSON
json_text = '{"clusters": [{"name": "Test", "keywords": ["test"]}]}'
result = ai_core.extract_json(json_text)
print(f"✅ Direct JSON: {result is not None}")
# Test 2: JSON in markdown
json_markdown = '```json\n{"clusters": [{"name": "Test"}]}\n```'
result = ai_core.extract_json(json_markdown)
print(f"✅ JSON in markdown: {result is not None}")
# Test 3: Invalid JSON
invalid_json = "This is not JSON"
result = ai_core.extract_json(invalid_json)
print(f"✅ Invalid JSON handled: {result is None}")
if __name__ == '__main__':
print("\n" + "="*80)
print("AI FUNCTIONS TEST SUITE")
print("="*80)
print("Testing all AI functions with console logging enabled")
print("="*80)
# Run tests
test_ai_core()
test_json_extraction()
test_auto_cluster()
test_generate_content()
test_generate_images()
print("\n" + "="*80)
print("TEST SUITE COMPLETE")
print("="*80)
print("\nAll console logging should be visible above.")
print("Check for [AI][function_name] Step X: messages")

View File

@@ -67,16 +67,10 @@ class JWTAuthentication(BaseAuthentication):
try:
account = Account.objects.get(id=account_id)
except Account.DoesNotExist:
pass
if not account:
try:
account = getattr(user, 'account', None)
except (AttributeError, Exception):
# If account access fails, set to None
# Account from token doesn't exist - don't fallback, set to None
account = None
# Set account on request
# Set account on request (only if account_id was in token and account exists)
request.account = account
return (user, token)

View File

@@ -1,25 +0,0 @@
#!/usr/bin/env python
"""
Test runner script for API tests
Run all tests: python manage.py test igny8_core.api.tests
Run specific test: python manage.py test igny8_core.api.tests.test_response
"""
import os
import sys
import django
# Setup Django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'igny8_core.settings')
django.setup()
from django.core.management import execute_from_command_line
if __name__ == '__main__':
# Run all API tests
if len(sys.argv) > 1:
# Custom test specified
execute_from_command_line(['manage.py', 'test'] + sys.argv[1:])
else:
# Run all API tests
execute_from_command_line(['manage.py', 'test', 'igny8_core.api.tests', '--verbosity=2'])

View File

@@ -8,7 +8,7 @@ from django.db.models import Q
from igny8_core.auth.models import Account, User, Site, Sector
from igny8_core.modules.planner.models import Keywords, Clusters, ContentIdeas
from igny8_core.modules.writer.models import Tasks, Images, Content
from igny8_core.modules.billing.models import CreditTransaction, CreditUsageLog
from igny8_core.business.billing.models import CreditTransaction, CreditUsageLog
from igny8_core.modules.system.models import AIPrompt, IntegrationSettings, AuthorProfile, Strategy
from igny8_core.modules.system.settings_models import AccountSettings, UserSettings, ModuleSettings, AISettings

View File

@@ -76,7 +76,6 @@ class AccountContextMiddleware(MiddlewareMixin):
if not JWT_AVAILABLE:
# JWT library not installed yet - skip for now
request.account = None
request.user = None
return None
# Decode JWT token with signature verification
@@ -94,42 +93,30 @@ class AccountContextMiddleware(MiddlewareMixin):
if user_id:
from .models import User, Account
try:
# Refresh user from DB with account and plan relationships to get latest data
# This ensures changes to account/plan are reflected immediately without re-login
# Get user from DB (but don't set request.user - let DRF authentication handle that)
# Only set request.account for account context
user = User.objects.select_related('account', 'account__plan').get(id=user_id)
request.user = user
if account_id:
# Verify account still exists and matches user
account = Account.objects.get(id=account_id)
# If user's account changed, use the new one from user object
if user.account and user.account.id != account_id:
request.account = user.account
else:
request.account = account
else:
# Verify account still exists
try:
user_account = getattr(user, 'account', None)
if user_account:
request.account = user_account
else:
request.account = None
except (AttributeError, Exception):
# If account access fails (e.g., column mismatch), set to None
account = Account.objects.get(id=account_id)
request.account = account
except Account.DoesNotExist:
# Account from token doesn't exist - don't fallback, set to None
request.account = None
else:
# No account_id in token - set to None (don't fallback to user.account)
request.account = None
except (User.DoesNotExist, Account.DoesNotExist):
request.account = None
request.user = None
else:
request.account = None
request.user = None
except jwt.InvalidTokenError:
request.account = None
request.user = None
except Exception:
# Fail silently for now - allow unauthenticated access
request.account = None
request.user = None
return None

View File

@@ -0,0 +1,5 @@
"""
Business logic layer - Models and Services
Separated from API layer (modules/) for clean architecture
"""

View File

@@ -0,0 +1,4 @@
"""
Automation business logic - AutomationRule, ScheduledTask models and services
"""

View File

@@ -0,0 +1,100 @@
# Generated manually for Phase 2: Automation System
import django.core.validators
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
('igny8_core_auth', '0008_passwordresettoken_alter_industry_options_and_more'),
]
operations = [
migrations.CreateModel(
name='AutomationRule',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True, db_index=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('name', models.CharField(help_text='Rule name', max_length=255)),
('description', models.TextField(blank=True, help_text='Rule description', null=True)),
('trigger', models.CharField(choices=[('schedule', 'Schedule'), ('event', 'Event'), ('manual', 'Manual')], default='manual', max_length=50)),
('schedule', models.CharField(blank=True, help_text="Cron-like schedule string (e.g., '0 0 * * *' for daily at midnight)", max_length=100, null=True)),
('conditions', models.JSONField(default=list, help_text='List of conditions that must be met for rule to execute')),
('actions', models.JSONField(default=list, help_text='List of actions to execute when rule triggers')),
('is_active', models.BooleanField(default=True, help_text='Whether rule is active')),
('status', models.CharField(choices=[('active', 'Active'), ('inactive', 'Inactive'), ('paused', 'Paused')], default='active', max_length=50)),
('last_executed_at', models.DateTimeField(blank=True, null=True)),
('execution_count', models.IntegerField(default=0, validators=[django.core.validators.MinValueValidator(0)])),
('metadata', models.JSONField(default=dict, help_text='Additional metadata')),
('account', models.ForeignKey(db_column='tenant_id', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)s_set', to='igny8_core_auth.account')),
('site', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='igny8_core_auth.site')),
('sector', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='igny8_core_auth.sector')),
],
options={
'db_table': 'igny8_automation_rules',
'ordering': ['-created_at'],
'verbose_name': 'Automation Rule',
'verbose_name_plural': 'Automation Rules',
},
),
migrations.CreateModel(
name='ScheduledTask',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True, db_index=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('scheduled_at', models.DateTimeField(help_text='When the task is scheduled to run')),
('executed_at', models.DateTimeField(blank=True, help_text='When the task was actually executed', null=True)),
('status', models.CharField(choices=[('pending', 'Pending'), ('running', 'Running'), ('completed', 'Completed'), ('failed', 'Failed'), ('cancelled', 'Cancelled')], default='pending', max_length=50)),
('result', models.JSONField(default=dict, help_text='Execution result data')),
('error_message', models.TextField(blank=True, help_text='Error message if execution failed', null=True)),
('metadata', models.JSONField(default=dict, help_text='Additional metadata')),
('account', models.ForeignKey(db_column='tenant_id', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)s_set', to='igny8_core_auth.account')),
('automation_rule', models.ForeignKey(help_text='The automation rule this task belongs to', on_delete=django.db.models.deletion.CASCADE, related_name='scheduled_tasks', to='automation.automationrule')),
],
options={
'db_table': 'igny8_scheduled_tasks',
'ordering': ['-scheduled_at'],
'verbose_name': 'Scheduled Task',
'verbose_name_plural': 'Scheduled Tasks',
},
),
migrations.AddIndex(
model_name='automationrule',
index=models.Index(fields=['trigger', 'is_active'], name='igny8_autom_trigger_123abc_idx'),
),
migrations.AddIndex(
model_name='automationrule',
index=models.Index(fields=['status'], name='igny8_autom_status_456def_idx'),
),
migrations.AddIndex(
model_name='automationrule',
index=models.Index(fields=['site', 'sector'], name='igny8_autom_site_id_789ghi_idx'),
),
migrations.AddIndex(
model_name='automationrule',
index=models.Index(fields=['trigger', 'is_active', 'status'], name='igny8_autom_trigger_0abjkl_idx'),
),
migrations.AddIndex(
model_name='scheduledtask',
index=models.Index(fields=['automation_rule', 'status'], name='igny8_sched_automation_123abc_idx'),
),
migrations.AddIndex(
model_name='scheduledtask',
index=models.Index(fields=['scheduled_at', 'status'], name='igny8_sched_scheduled_456def_idx'),
),
migrations.AddIndex(
model_name='scheduledtask',
index=models.Index(fields=['account', 'status'], name='igny8_sched_account_789ghi_idx'),
),
migrations.AddIndex(
model_name='scheduledtask',
index=models.Index(fields=['status', 'scheduled_at'], name='igny8_sched_status_0abjkl_idx'),
),
]

View File

@@ -0,0 +1,143 @@
"""
Automation Models
Phase 2: Automation System
"""
from django.db import models
from django.core.validators import MinValueValidator
from igny8_core.auth.models import SiteSectorBaseModel, AccountBaseModel
import json
class AutomationRule(SiteSectorBaseModel):
"""
Automation Rule model for defining automated workflows.
Rules can be triggered by:
- schedule: Time-based triggers (cron-like)
- event: Event-based triggers (content created, keyword added, etc.)
- manual: Manual execution only
"""
TRIGGER_CHOICES = [
('schedule', 'Schedule'),
('event', 'Event'),
('manual', 'Manual'),
]
STATUS_CHOICES = [
('active', 'Active'),
('inactive', 'Inactive'),
('paused', 'Paused'),
]
name = models.CharField(max_length=255, help_text="Rule name")
description = models.TextField(blank=True, null=True, help_text="Rule description")
# Trigger configuration
trigger = models.CharField(max_length=50, choices=TRIGGER_CHOICES, default='manual')
# Schedule configuration (for schedule triggers)
# Stored as cron-like string: "0 0 * * *" (daily at midnight)
schedule = models.CharField(
max_length=100,
blank=True,
null=True,
help_text="Cron-like schedule string (e.g., '0 0 * * *' for daily at midnight)"
)
# Conditions (JSON field)
# Format: [{"field": "content.status", "operator": "equals", "value": "draft"}, ...]
conditions = models.JSONField(
default=list,
help_text="List of conditions that must be met for rule to execute"
)
# Actions (JSON field)
# Format: [{"type": "generate_content", "params": {...}}, ...]
actions = models.JSONField(
default=list,
help_text="List of actions to execute when rule triggers"
)
# Status
is_active = models.BooleanField(default=True, help_text="Whether rule is active")
status = models.CharField(max_length=50, choices=STATUS_CHOICES, default='active')
# Execution tracking
last_executed_at = models.DateTimeField(null=True, blank=True)
execution_count = models.IntegerField(default=0, validators=[MinValueValidator(0)])
# Metadata
metadata = models.JSONField(default=dict, help_text="Additional metadata")
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
app_label = 'automation'
db_table = 'igny8_automation_rules'
ordering = ['-created_at']
verbose_name = 'Automation Rule'
verbose_name_plural = 'Automation Rules'
indexes = [
models.Index(fields=['trigger', 'is_active']),
models.Index(fields=['status']),
models.Index(fields=['site', 'sector']),
models.Index(fields=['trigger', 'is_active', 'status']),
]
def __str__(self):
return f"{self.name} ({self.get_trigger_display()})"
class ScheduledTask(AccountBaseModel):
"""
Scheduled Task model for tracking scheduled automation rule executions.
"""
STATUS_CHOICES = [
('pending', 'Pending'),
('running', 'Running'),
('completed', 'Completed'),
('failed', 'Failed'),
('cancelled', 'Cancelled'),
]
automation_rule = models.ForeignKey(
AutomationRule,
on_delete=models.CASCADE,
related_name='scheduled_tasks',
help_text="The automation rule this task belongs to"
)
scheduled_at = models.DateTimeField(help_text="When the task is scheduled to run")
executed_at = models.DateTimeField(null=True, blank=True, help_text="When the task was actually executed")
status = models.CharField(max_length=50, choices=STATUS_CHOICES, default='pending')
# Execution results
result = models.JSONField(default=dict, help_text="Execution result data")
error_message = models.TextField(blank=True, null=True, help_text="Error message if execution failed")
# Metadata
metadata = models.JSONField(default=dict, help_text="Additional metadata")
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
app_label = 'automation'
db_table = 'igny8_scheduled_tasks'
ordering = ['-scheduled_at']
verbose_name = 'Scheduled Task'
verbose_name_plural = 'Scheduled Tasks'
indexes = [
models.Index(fields=['automation_rule', 'status']),
models.Index(fields=['scheduled_at', 'status']),
models.Index(fields=['account', 'status']),
models.Index(fields=['status', 'scheduled_at']),
]
def __str__(self):
return f"Scheduled task for {self.automation_rule.name} at {self.scheduled_at}"

View File

@@ -0,0 +1,4 @@
"""
Automation services
"""

View File

@@ -0,0 +1,101 @@
"""
Action Executor
Executes rule actions
"""
import logging
from igny8_core.business.planning.services.clustering_service import ClusteringService
from igny8_core.business.planning.services.ideas_service import IdeasService
from igny8_core.business.content.services.content_generation_service import ContentGenerationService
logger = logging.getLogger(__name__)
class ActionExecutor:
"""Executes rule actions"""
def __init__(self):
self.clustering_service = ClusteringService()
self.ideas_service = IdeasService()
self.content_service = ContentGenerationService()
def execute(self, action, context, rule):
"""
Execute a single action.
Args:
action: Action dict with 'type' and 'params'
context: Context dict
rule: AutomationRule instance
Returns:
dict: Action execution result
"""
action_type = action.get('type')
params = action.get('params', {})
if action_type == 'cluster_keywords':
return self._execute_cluster_keywords(params, rule)
elif action_type == 'generate_ideas':
return self._execute_generate_ideas(params, rule)
elif action_type == 'generate_content':
return self._execute_generate_content(params, rule)
else:
logger.warning(f"Unknown action type: {action_type}")
return {
'success': False,
'error': f'Unknown action type: {action_type}'
}
def _execute_cluster_keywords(self, params, rule):
"""Execute cluster keywords action"""
keyword_ids = params.get('keyword_ids', [])
sector_id = params.get('sector_id') or (rule.sector.id if rule.sector else None)
try:
result = self.clustering_service.cluster_keywords(
keyword_ids=keyword_ids,
account=rule.account,
sector_id=sector_id
)
return result
except Exception as e:
logger.error(f"Error clustering keywords: {str(e)}", exc_info=True)
return {
'success': False,
'error': str(e)
}
def _execute_generate_ideas(self, params, rule):
"""Execute generate ideas action"""
cluster_ids = params.get('cluster_ids', [])
try:
result = self.ideas_service.generate_ideas(
cluster_ids=cluster_ids,
account=rule.account
)
return result
except Exception as e:
logger.error(f"Error generating ideas: {str(e)}", exc_info=True)
return {
'success': False,
'error': str(e)
}
def _execute_generate_content(self, params, rule):
"""Execute generate content action"""
task_ids = params.get('task_ids', [])
try:
result = self.content_service.generate_content(
task_ids=task_ids,
account=rule.account
)
return result
except Exception as e:
logger.error(f"Error generating content: {str(e)}", exc_info=True)
return {
'success': False,
'error': str(e)
}

View File

@@ -0,0 +1,141 @@
"""
Automation Service
Main service for executing automation rules
"""
import logging
from django.utils import timezone
from igny8_core.business.automation.models import AutomationRule, ScheduledTask
from igny8_core.business.automation.services.rule_engine import RuleEngine
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
logger = logging.getLogger(__name__)
class AutomationService:
"""Service for executing automation rules"""
def __init__(self):
self.rule_engine = RuleEngine()
self.credit_service = CreditService()
def execute_rule(self, rule, context=None):
"""
Execute an automation rule.
Args:
rule: AutomationRule instance
context: Optional context dict for condition evaluation
Returns:
dict: Execution result with status and data
"""
if not rule.is_active or rule.status != 'active':
return {
'status': 'skipped',
'reason': 'Rule is inactive',
'rule_id': rule.id
}
# Check credits (estimate based on actions)
estimated_credits = self._estimate_credits(rule)
try:
self.credit_service.check_credits_legacy(rule.account, estimated_credits)
except InsufficientCreditsError as e:
logger.warning(f"Rule {rule.id} skipped: {str(e)}")
return {
'status': 'skipped',
'reason': f'Insufficient credits: {str(e)}',
'rule_id': rule.id
}
# Execute via rule engine
try:
result = self.rule_engine.execute(rule, context or {})
# Update rule tracking
rule.last_executed_at = timezone.now()
rule.execution_count += 1
rule.save(update_fields=['last_executed_at', 'execution_count'])
return {
'status': 'completed',
'rule_id': rule.id,
'result': result
}
except Exception as e:
logger.error(f"Error executing rule {rule.id}: {str(e)}", exc_info=True)
return {
'status': 'failed',
'reason': str(e),
'rule_id': rule.id
}
def _estimate_credits(self, rule):
"""Estimate credits needed for rule execution"""
# Simple estimation based on action types
estimated = 0
for action in rule.actions:
action_type = action.get('type', '')
if 'cluster' in action_type:
estimated += 10
elif 'idea' in action_type:
estimated += 15
elif 'content' in action_type:
estimated += 50 # Conservative estimate
else:
estimated += 5 # Default
return max(estimated, 10) # Minimum 10 credits
def execute_scheduled_rules(self):
"""
Execute all scheduled rules that are due.
Called by Celery Beat task.
Returns:
dict: Summary of executions
"""
from django.utils import timezone
now = timezone.now()
# Get active scheduled rules
rules = AutomationRule.objects.filter(
trigger='schedule',
is_active=True,
status='active'
)
executed = 0
skipped = 0
failed = 0
for rule in rules:
# Check if rule should execute based on schedule
if self._should_execute_schedule(rule, now):
result = self.execute_rule(rule)
if result['status'] == 'completed':
executed += 1
elif result['status'] == 'skipped':
skipped += 1
else:
failed += 1
return {
'executed': executed,
'skipped': skipped,
'failed': failed,
'total': len(rules)
}
def _should_execute_schedule(self, rule, now):
"""
Check if a scheduled rule should execute now.
Simple implementation - can be enhanced with proper cron parsing.
"""
if not rule.schedule:
return False
# For now, simple check - can be enhanced with cron parser
# This is a placeholder - proper implementation would parse cron string
return True # Simplified for now

View File

@@ -0,0 +1,104 @@
"""
Condition Evaluator
Evaluates rule conditions
"""
import logging
logger = logging.getLogger(__name__)
class ConditionEvaluator:
"""Evaluates rule conditions"""
OPERATORS = {
'equals': lambda a, b: a == b,
'not_equals': lambda a, b: a != b,
'greater_than': lambda a, b: a > b,
'greater_than_or_equal': lambda a, b: a >= b,
'less_than': lambda a, b: a < b,
'less_than_or_equal': lambda a, b: a <= b,
'in': lambda a, b: a in b,
'contains': lambda a, b: b in a if isinstance(a, str) else a in b,
'is_empty': lambda a, b: not a or (isinstance(a, str) and not a.strip()),
'is_not_empty': lambda a, b: a and (not isinstance(a, str) or a.strip()),
}
def evaluate(self, conditions, context):
"""
Evaluate a list of conditions.
Args:
conditions: List of condition dicts
context: Context dict for field resolution
Returns:
bool: True if all conditions are met
"""
if not conditions:
return True
for condition in conditions:
if not self._evaluate_condition(condition, context):
return False
return True
def _evaluate_condition(self, condition, context):
"""
Evaluate a single condition.
Condition format:
{
"field": "content.status",
"operator": "equals",
"value": "draft"
}
"""
field_path = condition.get('field')
operator = condition.get('operator', 'equals')
expected_value = condition.get('value')
if not field_path:
logger.warning("Condition missing 'field'")
return False
# Resolve field value from context
actual_value = self._resolve_field(field_path, context)
# Get operator function
op_func = self.OPERATORS.get(operator)
if not op_func:
logger.warning(f"Unknown operator: {operator}")
return False
# Evaluate
try:
return op_func(actual_value, expected_value)
except Exception as e:
logger.error(f"Error evaluating condition: {str(e)}", exc_info=True)
return False
def _resolve_field(self, field_path, context):
"""
Resolve a field path from context.
Examples:
- "content.status" -> context['content']['status']
- "count" -> context['count']
"""
parts = field_path.split('.')
value = context
for part in parts:
if isinstance(value, dict):
value = value.get(part)
elif hasattr(value, part):
value = getattr(value, part)
else:
return None
if value is None:
return None
return value

View File

@@ -0,0 +1,61 @@
"""
Rule Engine
Orchestrates rule execution
"""
import logging
from igny8_core.business.automation.services.condition_evaluator import ConditionEvaluator
from igny8_core.business.automation.services.action_executor import ActionExecutor
logger = logging.getLogger(__name__)
class RuleEngine:
"""Orchestrates rule execution"""
def __init__(self):
self.condition_evaluator = ConditionEvaluator()
self.action_executor = ActionExecutor()
def execute(self, rule, context):
"""
Execute a rule by evaluating conditions and executing actions.
Args:
rule: AutomationRule instance
context: Context dict for evaluation
Returns:
dict: Execution results
"""
# Evaluate conditions
if rule.conditions:
conditions_met = self.condition_evaluator.evaluate(rule.conditions, context)
if not conditions_met:
return {
'success': False,
'reason': 'Conditions not met'
}
# Execute actions
action_results = []
for action in rule.actions:
try:
result = self.action_executor.execute(action, context, rule)
action_results.append({
'action': action,
'success': True,
'result': result
})
except Exception as e:
logger.error(f"Action execution failed: {str(e)}", exc_info=True)
action_results.append({
'action': action,
'success': False,
'error': str(e)
})
return {
'success': True,
'actions': action_results
}

View File

@@ -0,0 +1,28 @@
"""
Automation Celery Tasks
"""
from celery import shared_task
import logging
from igny8_core.business.automation.services.automation_service import AutomationService
logger = logging.getLogger(__name__)
@shared_task(name='igny8_core.business.automation.tasks.execute_scheduled_automation_rules')
def execute_scheduled_automation_rules():
"""
Execute all scheduled automation rules.
Called by Celery Beat.
"""
try:
service = AutomationService()
result = service.execute_scheduled_rules()
logger.info(f"Executed scheduled automation rules: {result}")
return result
except Exception as e:
logger.error(f"Error executing scheduled automation rules: {str(e)}", exc_info=True)
return {
'success': False,
'error': str(e)
}

View File

@@ -0,0 +1,4 @@
"""
Billing business logic - CreditTransaction, CreditUsageLog models and services
"""

View File

@@ -0,0 +1,21 @@
"""
Credit Cost Constants
Phase 0: Credit-only system costs per operation
"""
CREDIT_COSTS = {
'clustering': 10, # Per clustering request
'idea_generation': 15, # Per cluster → ideas request
'content_generation': 1, # Per 100 words
'image_prompt_extraction': 2, # Per content piece
'image_generation': 5, # Per image
'linking': 8, # Per content piece (NEW)
'optimization': 1, # Per 200 words (NEW)
'site_structure_generation': 50, # Per site blueprint (NEW)
'site_page_generation': 20, # Per page (NEW)
# Legacy operation types (for backward compatibility)
'ideas': 15, # Alias for idea_generation
'content': 3, # Legacy: 3 credits per content piece
'images': 5, # Alias for image_generation
'reparse': 1, # Per reparse
}

View File

@@ -0,0 +1,14 @@
"""
Billing Exceptions
"""
class InsufficientCreditsError(Exception):
"""Raised when account doesn't have enough credits"""
pass
class CreditCalculationError(Exception):
"""Raised when credit calculation fails"""
pass

View File

@@ -0,0 +1,77 @@
"""
Billing Models for Credit System
"""
from django.db import models
from django.core.validators import MinValueValidator
from igny8_core.auth.models import AccountBaseModel
class CreditTransaction(AccountBaseModel):
"""Track all credit transactions (additions, deductions)"""
TRANSACTION_TYPE_CHOICES = [
('purchase', 'Purchase'),
('subscription', 'Subscription Renewal'),
('refund', 'Refund'),
('deduction', 'Usage Deduction'),
('adjustment', 'Manual Adjustment'),
]
transaction_type = models.CharField(max_length=20, choices=TRANSACTION_TYPE_CHOICES, db_index=True)
amount = models.IntegerField(help_text="Positive for additions, negative for deductions")
balance_after = models.IntegerField(help_text="Credit balance after this transaction")
description = models.CharField(max_length=255)
metadata = models.JSONField(default=dict, help_text="Additional context (AI call details, etc.)")
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
app_label = 'billing'
db_table = 'igny8_credit_transactions'
ordering = ['-created_at']
indexes = [
models.Index(fields=['account', 'transaction_type']),
models.Index(fields=['account', 'created_at']),
]
def __str__(self):
account = getattr(self, 'account', None)
return f"{self.get_transaction_type_display()} - {self.amount} credits - {account.name if account else 'No Account'}"
class CreditUsageLog(AccountBaseModel):
"""Detailed log of credit usage per AI operation"""
OPERATION_TYPE_CHOICES = [
('clustering', 'Keyword Clustering'),
('idea_generation', 'Content Ideas Generation'),
('content_generation', 'Content Generation'),
('image_generation', 'Image Generation'),
('reparse', 'Content Reparse'),
('ideas', 'Content Ideas Generation'), # Legacy
('content', 'Content Generation'), # Legacy
('images', 'Image Generation'), # Legacy
]
operation_type = models.CharField(max_length=50, choices=OPERATION_TYPE_CHOICES, db_index=True)
credits_used = models.IntegerField(validators=[MinValueValidator(0)])
cost_usd = models.DecimalField(max_digits=10, decimal_places=4, null=True, blank=True)
model_used = models.CharField(max_length=100, blank=True)
tokens_input = models.IntegerField(null=True, blank=True, validators=[MinValueValidator(0)])
tokens_output = models.IntegerField(null=True, blank=True, validators=[MinValueValidator(0)])
related_object_type = models.CharField(max_length=50, blank=True) # 'keyword', 'cluster', 'task'
related_object_id = models.IntegerField(null=True, blank=True)
metadata = models.JSONField(default=dict)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
app_label = 'billing'
db_table = 'igny8_credit_usage_logs'
ordering = ['-created_at']
indexes = [
models.Index(fields=['account', 'operation_type']),
models.Index(fields=['account', 'created_at']),
models.Index(fields=['account', 'operation_type', 'created_at']),
]
def __str__(self):
account = getattr(self, 'account', None)
return f"{self.get_operation_type_display()} - {self.credits_used} credits - {account.name if account else 'No Account'}"

View File

@@ -0,0 +1,4 @@
"""
Billing services
"""

View File

@@ -0,0 +1,264 @@
"""
Credit Service for managing credit transactions and deductions
"""
from django.db import transaction
from django.utils import timezone
from igny8_core.business.billing.models import CreditTransaction, CreditUsageLog
from igny8_core.business.billing.constants import CREDIT_COSTS
from igny8_core.business.billing.exceptions import InsufficientCreditsError, CreditCalculationError
from igny8_core.auth.models import Account
class CreditService:
"""Service for managing credits"""
@staticmethod
def get_credit_cost(operation_type, amount=None):
"""
Get credit cost for operation.
Args:
operation_type: Type of operation (from CREDIT_COSTS)
amount: Optional amount (word count, image count, etc.)
Returns:
int: Number of credits required
Raises:
CreditCalculationError: If operation type is unknown
"""
base_cost = CREDIT_COSTS.get(operation_type, 0)
if base_cost == 0:
raise CreditCalculationError(f"Unknown operation type: {operation_type}")
# Variable cost operations
if operation_type == 'content_generation' and amount:
# Per 100 words
return max(1, int(base_cost * (amount / 100)))
elif operation_type == 'optimization' and amount:
# Per 200 words
return max(1, int(base_cost * (amount / 200)))
elif operation_type == 'image_generation' and amount:
# Per image
return base_cost * amount
elif operation_type == 'idea_generation' and amount:
# Per idea
return base_cost * amount
# Fixed cost operations
return base_cost
@staticmethod
def check_credits(account, operation_type, amount=None):
"""
Check if account has sufficient credits for an operation.
Args:
account: Account instance
operation_type: Type of operation
amount: Optional amount (word count, image count, etc.)
Raises:
InsufficientCreditsError: If account doesn't have enough credits
"""
required = CreditService.get_credit_cost(operation_type, amount)
if account.credits < required:
raise InsufficientCreditsError(
f"Insufficient credits. Required: {required}, Available: {account.credits}"
)
return True
@staticmethod
def check_credits_legacy(account, required_credits):
"""
Legacy method: Check if account has enough credits (for backward compatibility).
Args:
account: Account instance
required_credits: Number of credits required
Raises:
InsufficientCreditsError: If account doesn't have enough credits
"""
if account.credits < required_credits:
raise InsufficientCreditsError(
f"Insufficient credits. Required: {required_credits}, Available: {account.credits}"
)
@staticmethod
@transaction.atomic
def deduct_credits(account, amount, operation_type, description, metadata=None, cost_usd=None, model_used=None, tokens_input=None, tokens_output=None, related_object_type=None, related_object_id=None):
"""
Deduct credits and log transaction.
Args:
account: Account instance
amount: Number of credits to deduct
operation_type: Type of operation (from CreditUsageLog.OPERATION_TYPE_CHOICES)
description: Description of the transaction
metadata: Optional metadata dict
cost_usd: Optional cost in USD
model_used: Optional AI model used
tokens_input: Optional input tokens
tokens_output: Optional output tokens
related_object_type: Optional related object type
related_object_id: Optional related object ID
Returns:
int: New credit balance
"""
# Check sufficient credits (legacy: amount is already calculated)
CreditService.check_credits_legacy(account, amount)
# Deduct from account.credits
account.credits -= amount
account.save(update_fields=['credits'])
# Create CreditTransaction
CreditTransaction.objects.create(
account=account,
transaction_type='deduction',
amount=-amount, # Negative for deduction
balance_after=account.credits,
description=description,
metadata=metadata or {}
)
# Create CreditUsageLog
CreditUsageLog.objects.create(
account=account,
operation_type=operation_type,
credits_used=amount,
cost_usd=cost_usd,
model_used=model_used or '',
tokens_input=tokens_input,
tokens_output=tokens_output,
related_object_type=related_object_type or '',
related_object_id=related_object_id,
metadata=metadata or {}
)
return account.credits
@staticmethod
@transaction.atomic
def deduct_credits_for_operation(account, operation_type, amount=None, description=None, metadata=None, cost_usd=None, model_used=None, tokens_input=None, tokens_output=None, related_object_type=None, related_object_id=None):
"""
Deduct credits for an operation (convenience method that calculates cost automatically).
Args:
account: Account instance
operation_type: Type of operation
amount: Optional amount (word count, image count, etc.)
description: Optional description (auto-generated if not provided)
metadata: Optional metadata dict
cost_usd: Optional cost in USD
model_used: Optional AI model used
tokens_input: Optional input tokens
tokens_output: Optional output tokens
related_object_type: Optional related object type
related_object_id: Optional related object ID
Returns:
int: New credit balance
"""
# Calculate credit cost
credits_required = CreditService.get_credit_cost(operation_type, amount)
# Check sufficient credits
CreditService.check_credits(account, operation_type, amount)
# Auto-generate description if not provided
if not description:
if operation_type == 'clustering':
description = f"Clustering operation"
elif operation_type == 'idea_generation':
description = f"Generated {amount or 1} idea(s)"
elif operation_type == 'content_generation':
description = f"Generated content ({amount or 0} words)"
elif operation_type == 'image_generation':
description = f"Generated {amount or 1} image(s)"
else:
description = f"{operation_type} operation"
return CreditService.deduct_credits(
account=account,
amount=credits_required,
operation_type=operation_type,
description=description,
metadata=metadata,
cost_usd=cost_usd,
model_used=model_used,
tokens_input=tokens_input,
tokens_output=tokens_output,
related_object_type=related_object_type,
related_object_id=related_object_id
)
@staticmethod
@transaction.atomic
def add_credits(account, amount, transaction_type, description, metadata=None):
"""
Add credits (purchase, subscription, etc.).
Args:
account: Account instance
amount: Number of credits to add
transaction_type: Type of transaction (from CreditTransaction.TRANSACTION_TYPE_CHOICES)
description: Description of the transaction
metadata: Optional metadata dict
Returns:
int: New credit balance
"""
# Add to account.credits
account.credits += amount
account.save(update_fields=['credits'])
# Create CreditTransaction
CreditTransaction.objects.create(
account=account,
transaction_type=transaction_type,
amount=amount, # Positive for addition
balance_after=account.credits,
description=description,
metadata=metadata or {}
)
return account.credits
@staticmethod
def calculate_credits_for_operation(operation_type, **kwargs):
"""
Calculate credits needed for an operation.
Legacy method - use get_credit_cost() instead.
Args:
operation_type: Type of operation
**kwargs: Operation-specific parameters
Returns:
int: Number of credits required
Raises:
CreditCalculationError: If calculation fails
"""
# Map legacy operation types
if operation_type == 'ideas':
operation_type = 'idea_generation'
elif operation_type == 'content':
operation_type = 'content_generation'
elif operation_type == 'images':
operation_type = 'image_generation'
# Extract amount from kwargs
amount = None
if 'word_count' in kwargs:
amount = kwargs.get('word_count')
elif 'image_count' in kwargs:
amount = kwargs.get('image_count')
elif 'idea_count' in kwargs:
amount = kwargs.get('idea_count')
return CreditService.get_credit_cost(operation_type, amount)

View File

@@ -0,0 +1,2 @@
# Billing tests

View File

@@ -0,0 +1,133 @@
"""
Tests for Phase 4 credit deduction
"""
from unittest.mock import patch
from django.test import TestCase
from igny8_core.business.content.models import Content
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.business.billing.constants import CREDIT_COSTS
from igny8_core.business.billing.exceptions import InsufficientCreditsError
from igny8_core.api.tests.test_integration_base import IntegrationTestBase
class Phase4CreditTests(IntegrationTestBase):
"""Tests for Phase 4 credit deduction"""
def setUp(self):
super().setUp()
# Set initial credits
self.account.credits = 1000
self.account.save()
def test_linking_deducts_correct_credits(self):
"""Test that linking deducts correct credits"""
cost = CreditService.get_credit_cost('linking')
expected_cost = CREDIT_COSTS.get('linking', 0)
self.assertEqual(cost, expected_cost)
self.assertEqual(cost, 8) # From constants
def test_optimization_deducts_correct_credits(self):
"""Test that optimization deducts correct credits based on word count"""
word_count = 500
cost = CreditService.get_credit_cost('optimization', word_count)
# Should be 1 credit per 200 words, so 500 words = 3 credits (max(1, 1 * 500/200) = 3)
expected = max(1, int(CREDIT_COSTS.get('optimization', 1) * (word_count / 200)))
self.assertEqual(cost, expected)
def test_optimization_credits_per_entry_point(self):
"""Test that optimization credits are same regardless of entry point"""
word_count = 400
# All entry points should use same credit calculation
cost = CreditService.get_credit_cost('optimization', word_count)
# 400 words = 2 credits (1 * 400/200)
self.assertEqual(cost, 2)
@patch('igny8_core.business.billing.services.credit_service.CreditService.deduct_credits')
def test_pipeline_deducts_credits_at_each_stage(self, mock_deduct):
"""Test that pipeline deducts credits at each stage"""
from igny8_core.business.content.services.content_pipeline_service import ContentPipelineService
from igny8_core.business.linking.services.linker_service import LinkerService
from igny8_core.business.optimization.services.optimizer_service import OptimizerService
content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Test",
word_count=400,
source='igny8'
)
# Mock the services
with patch.object(LinkerService, 'process') as mock_link, \
patch.object(OptimizerService, 'optimize_from_writer') as mock_optimize:
mock_link.return_value = content
mock_optimize.return_value = content
service = ContentPipelineService()
service.process_writer_content(content.id)
# Should deduct credits for both linking and optimization
self.assertGreater(mock_deduct.call_count, 0)
def test_insufficient_credits_blocks_linking(self):
"""Test that insufficient credits blocks linking"""
self.account.credits = 5 # Less than linking cost (8)
self.account.save()
with self.assertRaises(InsufficientCreditsError):
CreditService.check_credits(self.account, 'linking')
def test_insufficient_credits_blocks_optimization(self):
"""Test that insufficient credits blocks optimization"""
self.account.credits = 1 # Less than optimization cost for 500 words
self.account.save()
with self.assertRaises(InsufficientCreditsError):
CreditService.check_credits(self.account, 'optimization', 500)
def test_credit_deduction_logged(self):
"""Test that credit deduction is logged"""
from igny8_core.business.billing.models import CreditUsageLog
initial_credits = self.account.credits
cost = CreditService.get_credit_cost('linking')
CreditService.deduct_credits_for_operation(
account=self.account,
operation_type='linking',
description="Test linking"
)
self.account.refresh_from_db()
self.assertEqual(self.account.credits, initial_credits - cost)
# Check that usage log was created
log = CreditUsageLog.objects.filter(
account=self.account,
operation_type='linking'
).first()
self.assertIsNotNone(log)
def test_batch_operations_deduct_multiple_credits(self):
"""Test that batch operations deduct multiple credits"""
initial_credits = self.account.credits
linking_cost = CreditService.get_credit_cost('linking')
# Deduct for 3 linking operations
for i in range(3):
CreditService.deduct_credits_for_operation(
account=self.account,
operation_type='linking',
description=f"Linking {i}"
)
self.account.refresh_from_db()
expected_credits = initial_credits - (linking_cost * 3)
self.assertEqual(self.account.credits, expected_credits)

View File

@@ -0,0 +1,4 @@
"""
Content business logic - Content, Tasks, Images models and services
"""

View File

@@ -0,0 +1,257 @@
from django.db import models
from django.core.validators import MinValueValidator
from igny8_core.auth.models import SiteSectorBaseModel
class Tasks(SiteSectorBaseModel):
"""Tasks model for content generation queue"""
STATUS_CHOICES = [
('queued', 'Queued'),
('completed', 'Completed'),
]
CONTENT_STRUCTURE_CHOICES = [
('cluster_hub', 'Cluster Hub'),
('landing_page', 'Landing Page'),
('pillar_page', 'Pillar Page'),
('supporting_page', 'Supporting Page'),
]
CONTENT_TYPE_CHOICES = [
('blog_post', 'Blog Post'),
('article', 'Article'),
('guide', 'Guide'),
('tutorial', 'Tutorial'),
]
title = models.CharField(max_length=255, db_index=True)
description = models.TextField(blank=True, null=True)
keywords = models.CharField(max_length=500, blank=True) # Comma-separated keywords (legacy)
cluster = models.ForeignKey(
'planner.Clusters',
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='tasks',
limit_choices_to={'sector': models.F('sector')}
)
keyword_objects = models.ManyToManyField(
'planner.Keywords',
blank=True,
related_name='tasks',
help_text="Individual keywords linked to this task"
)
idea = models.ForeignKey(
'planner.ContentIdeas',
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='tasks'
)
content_structure = models.CharField(max_length=50, choices=CONTENT_STRUCTURE_CHOICES, default='blog_post')
content_type = models.CharField(max_length=50, choices=CONTENT_TYPE_CHOICES, default='blog_post')
status = models.CharField(max_length=50, choices=STATUS_CHOICES, default='queued')
# Content fields
content = models.TextField(blank=True, null=True) # Generated content
word_count = models.IntegerField(default=0)
# SEO fields
meta_title = models.CharField(max_length=255, blank=True, null=True)
meta_description = models.TextField(blank=True, null=True)
# WordPress integration
assigned_post_id = models.IntegerField(null=True, blank=True) # WordPress post ID if published
post_url = models.URLField(blank=True, null=True) # WordPress post URL
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
app_label = 'writer'
db_table = 'igny8_tasks'
ordering = ['-created_at']
verbose_name = 'Task'
verbose_name_plural = 'Tasks'
indexes = [
models.Index(fields=['title']),
models.Index(fields=['status']),
models.Index(fields=['cluster']),
models.Index(fields=['content_type']),
models.Index(fields=['site', 'sector']),
]
def __str__(self):
return self.title
class Content(SiteSectorBaseModel):
"""
Content model for storing final AI-generated article content.
Separated from Task for content versioning and storage optimization.
"""
task = models.OneToOneField(
Tasks,
on_delete=models.CASCADE,
null=True,
blank=True,
related_name='content_record',
help_text="The task this content belongs to"
)
html_content = models.TextField(help_text="Final AI-generated HTML content")
word_count = models.IntegerField(default=0, validators=[MinValueValidator(0)])
metadata = models.JSONField(default=dict, help_text="Additional metadata (SEO, structure, etc.)")
title = models.CharField(max_length=255, blank=True, null=True)
meta_title = models.CharField(max_length=255, blank=True, null=True)
meta_description = models.TextField(blank=True, null=True)
primary_keyword = models.CharField(max_length=255, blank=True, null=True)
secondary_keywords = models.JSONField(default=list, blank=True, help_text="List of secondary keywords")
tags = models.JSONField(default=list, blank=True, help_text="List of tags")
categories = models.JSONField(default=list, blank=True, help_text="List of categories")
STATUS_CHOICES = [
('draft', 'Draft'),
('review', 'Review'),
('publish', 'Publish'),
]
status = models.CharField(max_length=50, choices=STATUS_CHOICES, default='draft', help_text="Content workflow status (draft, review, publish)")
generated_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
# Phase 4: Source tracking
SOURCE_CHOICES = [
('igny8', 'IGNY8 Generated'),
('wordpress', 'WordPress Synced'),
('shopify', 'Shopify Synced'),
('custom', 'Custom API Synced'),
]
source = models.CharField(
max_length=50,
choices=SOURCE_CHOICES,
default='igny8',
db_index=True,
help_text="Source of the content"
)
SYNC_STATUS_CHOICES = [
('native', 'Native IGNY8 Content'),
('imported', 'Imported from External'),
('synced', 'Synced from External'),
]
sync_status = models.CharField(
max_length=50,
choices=SYNC_STATUS_CHOICES,
default='native',
db_index=True,
help_text="Sync status of the content"
)
# External reference fields
external_id = models.CharField(max_length=255, blank=True, null=True, help_text="External platform ID")
external_url = models.URLField(blank=True, null=True, help_text="External platform URL")
sync_metadata = models.JSONField(default=dict, blank=True, help_text="Platform-specific sync metadata")
# Phase 4: Linking fields
internal_links = models.JSONField(default=list, blank=True, help_text="Internal links added by linker")
linker_version = models.IntegerField(default=0, help_text="Version of linker processing")
# Phase 4: Optimization fields
optimizer_version = models.IntegerField(default=0, help_text="Version of optimizer processing")
optimization_scores = models.JSONField(default=dict, blank=True, help_text="Optimization scores (SEO, readability, engagement)")
class Meta:
app_label = 'writer'
db_table = 'igny8_content'
ordering = ['-generated_at']
verbose_name = 'Content'
verbose_name_plural = 'Contents'
indexes = [
models.Index(fields=['task']),
models.Index(fields=['generated_at']),
models.Index(fields=['source']),
models.Index(fields=['sync_status']),
models.Index(fields=['source', 'sync_status']),
]
def save(self, *args, **kwargs):
"""Automatically set account, site, and sector from task"""
if self.task_id: # Check task_id instead of accessing task to avoid RelatedObjectDoesNotExist
try:
self.account = self.task.account
self.site = self.task.site
self.sector = self.task.sector
except self.task.RelatedObjectDoesNotExist:
pass # Task doesn't exist, skip
super().save(*args, **kwargs)
def __str__(self):
return f"Content for {self.task.title}"
class Images(SiteSectorBaseModel):
"""Images model for content-related images (featured, desktop, mobile, in-article)"""
IMAGE_TYPE_CHOICES = [
('featured', 'Featured Image'),
('desktop', 'Desktop Image'),
('mobile', 'Mobile Image'),
('in_article', 'In-Article Image'),
]
content = models.ForeignKey(
Content,
on_delete=models.CASCADE,
related_name='images',
null=True,
blank=True,
help_text="The content this image belongs to (preferred)"
)
task = models.ForeignKey(
Tasks,
on_delete=models.CASCADE,
related_name='images',
null=True,
blank=True,
help_text="The task this image belongs to (legacy, use content instead)"
)
image_type = models.CharField(max_length=50, choices=IMAGE_TYPE_CHOICES, default='featured')
image_url = models.CharField(max_length=500, blank=True, null=True, help_text="URL of the generated/stored image")
image_path = models.CharField(max_length=500, blank=True, null=True, help_text="Local path if stored locally")
prompt = models.TextField(blank=True, null=True, help_text="Image generation prompt used")
status = models.CharField(max_length=50, default='pending', help_text="Status: pending, generated, failed")
position = models.IntegerField(default=0, help_text="Position for in-article images ordering")
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
app_label = 'writer'
db_table = 'igny8_images'
ordering = ['content', 'position', '-created_at']
verbose_name = 'Image'
verbose_name_plural = 'Images'
indexes = [
models.Index(fields=['content', 'image_type']),
models.Index(fields=['task', 'image_type']),
models.Index(fields=['status']),
models.Index(fields=['content', 'position']),
models.Index(fields=['task', 'position']),
]
def save(self, *args, **kwargs):
"""Automatically set account, site, and sector from content or task"""
# Prefer content over task
if self.content:
self.account = self.content.account
self.site = self.content.site
self.sector = self.content.sector
elif self.task:
self.account = self.task.account
self.site = self.task.site
self.sector = self.task.sector
super().save(*args, **kwargs)
def __str__(self):
content_title = self.content.title if self.content else None
task_title = self.task.title if self.task else None
title = content_title or task_title or 'Unknown'
return f"{title} - {self.image_type}"

View File

@@ -0,0 +1,8 @@
"""
Content Services
"""
from igny8_core.business.content.services.content_generation_service import ContentGenerationService
from igny8_core.business.content.services.content_pipeline_service import ContentPipelineService
__all__ = ['ContentGenerationService', 'ContentPipelineService']

View File

@@ -0,0 +1,75 @@
"""
Content Generation Service
Handles content generation business logic
"""
import logging
from igny8_core.business.content.models import Tasks
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
logger = logging.getLogger(__name__)
class ContentGenerationService:
"""Service for content generation operations"""
def __init__(self):
self.credit_service = CreditService()
def generate_content(self, task_ids, account):
"""
Generate content for tasks.
Args:
task_ids: List of task IDs
account: Account instance
Returns:
dict: Result with success status and data
Raises:
InsufficientCreditsError: If account doesn't have enough credits
"""
# Get tasks
tasks = Tasks.objects.filter(id__in=task_ids, account=account)
# Calculate estimated credits needed
total_word_count = sum(task.word_count or 1000 for task in tasks)
# Check credits
try:
self.credit_service.check_credits(account, 'content_generation', total_word_count)
except InsufficientCreditsError:
raise
# Delegate to AI task (actual generation happens in Celery)
from igny8_core.ai.tasks import run_ai_task
try:
if hasattr(run_ai_task, 'delay'):
# Celery available - queue async
task = run_ai_task.delay(
function_name='generate_content',
payload={'ids': task_ids},
account_id=account.id
)
return {
'success': True,
'task_id': str(task.id),
'message': 'Content generation started'
}
else:
# Celery not available - execute synchronously
result = run_ai_task(
function_name='generate_content',
payload={'ids': task_ids},
account_id=account.id
)
return result
except Exception as e:
logger.error(f"Error in generate_content: {str(e)}", exc_info=True)
return {
'success': False,
'error': str(e)
}

View File

@@ -0,0 +1,133 @@
"""
Content Pipeline Service
Orchestrates content processing pipeline: Writer → Linker → Optimizer
"""
import logging
from typing import List, Optional
from igny8_core.business.content.models import Content
from igny8_core.business.linking.services.linker_service import LinkerService
from igny8_core.business.optimization.services.optimizer_service import OptimizerService
logger = logging.getLogger(__name__)
class ContentPipelineService:
"""Orchestrates content processing pipeline"""
def __init__(self):
self.linker_service = LinkerService()
self.optimizer_service = OptimizerService()
def process_writer_content(
self,
content_id: int,
stages: Optional[List[str]] = None
) -> Content:
"""
Writer → Linker → Optimizer pipeline.
Args:
content_id: Content ID from Writer
stages: List of stages to run: ['linking', 'optimization'] (default: both)
Returns:
Processed Content instance
"""
if stages is None:
stages = ['linking', 'optimization']
try:
content = Content.objects.get(id=content_id, source='igny8')
except Content.DoesNotExist:
raise ValueError(f"IGNY8 content with id {content_id} does not exist")
# Stage 1: Linking
if 'linking' in stages:
try:
content = self.linker_service.process(content.id)
logger.info(f"Linked content {content_id}")
except Exception as e:
logger.error(f"Error in linking stage for content {content_id}: {str(e)}", exc_info=True)
# Continue to next stage even if linking fails
pass
# Stage 2: Optimization
if 'optimization' in stages:
try:
content = self.optimizer_service.optimize_from_writer(content.id)
logger.info(f"Optimized content {content_id}")
except Exception as e:
logger.error(f"Error in optimization stage for content {content_id}: {str(e)}", exc_info=True)
# Don't fail the whole pipeline
pass
return content
def process_synced_content(
self,
content_id: int,
stages: Optional[List[str]] = None
) -> Content:
"""
Synced Content → Optimizer pipeline (skip linking if needed).
Args:
content_id: Content ID from sync (WordPress, Shopify, etc.)
stages: List of stages to run: ['optimization'] (default: optimization only)
Returns:
Processed Content instance
"""
if stages is None:
stages = ['optimization']
try:
content = Content.objects.get(id=content_id)
except Content.DoesNotExist:
raise ValueError(f"Content with id {content_id} does not exist")
# Stage: Optimization (skip linking for synced content by default)
if 'optimization' in stages:
try:
if content.source == 'wordpress':
content = self.optimizer_service.optimize_from_wordpress_sync(content.id)
elif content.source in ['shopify', 'custom']:
content = self.optimizer_service.optimize_from_external_sync(content.id)
else:
content = self.optimizer_service.optimize_manual(content.id)
logger.info(f"Optimized synced content {content_id}")
except Exception as e:
logger.error(f"Error in optimization stage for content {content_id}: {str(e)}", exc_info=True)
raise
return content
def batch_process_writer_content(
self,
content_ids: List[int],
stages: Optional[List[str]] = None
) -> List[Content]:
"""
Batch process multiple Writer content items.
Args:
content_ids: List of content IDs
stages: List of stages to run
Returns:
List of processed Content instances
"""
results = []
for content_id in content_ids:
try:
result = self.process_writer_content(content_id, stages)
results.append(result)
except Exception as e:
logger.error(f"Error processing content {content_id}: {str(e)}", exc_info=True)
# Continue with other items
continue
return results

View File

@@ -0,0 +1,2 @@
# Content tests

View File

@@ -0,0 +1,185 @@
"""
Tests for ContentPipelineService
"""
from unittest.mock import patch, MagicMock
from django.test import TestCase
from igny8_core.business.content.models import Content
from igny8_core.business.content.services.content_pipeline_service import ContentPipelineService
from igny8_core.api.tests.test_integration_base import IntegrationTestBase
class ContentPipelineServiceTests(IntegrationTestBase):
"""Tests for ContentPipelineService"""
def setUp(self):
super().setUp()
self.service = ContentPipelineService()
# Create writer content
self.writer_content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Writer Content",
html_content="<p>Writer content.</p>",
word_count=500,
status='draft',
source='igny8'
)
# Create synced content
self.synced_content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="WordPress Content",
html_content="<p>WordPress content.</p>",
word_count=500,
status='draft',
source='wordpress'
)
@patch('igny8_core.business.content.services.content_pipeline_service.LinkerService.process')
@patch('igny8_core.business.content.services.content_pipeline_service.OptimizerService.optimize_from_writer')
def test_process_writer_content_full_pipeline(self, mock_optimize, mock_link):
"""Test full pipeline for writer content (linking + optimization)"""
mock_link.return_value = self.writer_content
mock_optimize.return_value = self.writer_content
result = self.service.process_writer_content(self.writer_content.id)
self.assertEqual(result.id, self.writer_content.id)
mock_link.assert_called_once()
mock_optimize.assert_called_once()
@patch('igny8_core.business.content.services.content_pipeline_service.OptimizerService.optimize_from_writer')
def test_process_writer_content_optimization_only(self, mock_optimize):
"""Test writer content with optimization only"""
mock_optimize.return_value = self.writer_content
result = self.service.process_writer_content(
self.writer_content.id,
stages=['optimization']
)
self.assertEqual(result.id, self.writer_content.id)
mock_optimize.assert_called_once()
@patch('igny8_core.business.content.services.content_pipeline_service.LinkerService.process')
def test_process_writer_content_linking_only(self, mock_link):
"""Test writer content with linking only"""
mock_link.return_value = self.writer_content
result = self.service.process_writer_content(
self.writer_content.id,
stages=['linking']
)
self.assertEqual(result.id, self.writer_content.id)
mock_link.assert_called_once()
@patch('igny8_core.business.content.services.content_pipeline_service.LinkerService.process')
@patch('igny8_core.business.content.services.content_pipeline_service.OptimizerService.optimize_from_writer')
def test_process_writer_content_handles_linker_failure(self, mock_optimize, mock_link):
"""Test that pipeline continues when linking fails"""
mock_link.side_effect = Exception("Linking failed")
mock_optimize.return_value = self.writer_content
# Should not raise exception, should continue to optimization
result = self.service.process_writer_content(self.writer_content.id)
self.assertEqual(result.id, self.writer_content.id)
mock_optimize.assert_called_once()
@patch('igny8_core.business.content.services.content_pipeline_service.OptimizerService.optimize_from_wordpress_sync')
def test_process_synced_content_wordpress(self, mock_optimize):
"""Test synced content pipeline for WordPress"""
mock_optimize.return_value = self.synced_content
result = self.service.process_synced_content(self.synced_content.id)
self.assertEqual(result.id, self.synced_content.id)
mock_optimize.assert_called_once()
@patch('igny8_core.business.content.services.content_pipeline_service.OptimizerService.optimize_from_external_sync')
def test_process_synced_content_shopify(self, mock_optimize):
"""Test synced content pipeline for Shopify"""
shopify_content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Shopify Content",
word_count=100,
source='shopify'
)
mock_optimize.return_value = shopify_content
result = self.service.process_synced_content(shopify_content.id)
self.assertEqual(result.id, shopify_content.id)
mock_optimize.assert_called_once()
@patch('igny8_core.business.content.services.content_pipeline_service.OptimizerService.optimize_manual')
def test_process_synced_content_custom(self, mock_optimize):
"""Test synced content pipeline for custom source"""
custom_content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Custom Content",
word_count=100,
source='custom'
)
mock_optimize.return_value = custom_content
result = self.service.process_synced_content(custom_content.id)
self.assertEqual(result.id, custom_content.id)
mock_optimize.assert_called_once()
@patch('igny8_core.business.content.services.content_pipeline_service.ContentPipelineService.process_writer_content')
def test_batch_process_writer_content(self, mock_process):
"""Test batch processing writer content"""
content2 = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Content 2",
word_count=100,
source='igny8'
)
mock_process.side_effect = [self.writer_content, content2]
results = self.service.batch_process_writer_content([
self.writer_content.id,
content2.id
])
self.assertEqual(len(results), 2)
self.assertEqual(mock_process.call_count, 2)
@patch('igny8_core.business.content.services.content_pipeline_service.ContentPipelineService.process_writer_content')
def test_batch_process_handles_partial_failure(self, mock_process):
"""Test batch processing handles partial failures"""
mock_process.side_effect = [self.writer_content, Exception("Failed")]
results = self.service.batch_process_writer_content([
self.writer_content.id,
99999
])
# Should continue processing and return successful results
self.assertEqual(len(results), 1)
self.assertEqual(results[0].id, self.writer_content.id)
def test_process_writer_content_invalid_content(self):
"""Test that ValueError is raised for invalid content"""
with self.assertRaises(ValueError):
self.service.process_writer_content(99999)
def test_process_synced_content_invalid_content(self):
"""Test that ValueError is raised for invalid synced content"""
with self.assertRaises(ValueError):
self.service.process_synced_content(99999)

View File

@@ -0,0 +1,5 @@
"""
Integration Domain
Phase 6: Site Integration & Multi-Destination Publishing
"""

View File

@@ -0,0 +1,6 @@
"""
Linking Business Logic
Phase 4: Linker & Optimizer
"""

View File

@@ -0,0 +1,5 @@
"""
Linking Services
"""

View File

@@ -0,0 +1,117 @@
"""
Link Candidate Engine
Finds relevant content for internal linking
"""
import logging
from typing import List, Dict
from django.db import models
from igny8_core.business.content.models import Content
logger = logging.getLogger(__name__)
class CandidateEngine:
"""Finds link candidates for content"""
def find_candidates(self, content: Content, max_candidates: int = 10) -> List[Dict]:
"""
Find link candidates for a piece of content.
Args:
content: Content instance to find links for
max_candidates: Maximum number of candidates to return
Returns:
List of candidate dicts with: {'content_id', 'title', 'url', 'relevance_score', 'anchor_text'}
"""
if not content or not content.html_content:
return []
# Find relevant content from same account/site/sector
relevant_content = self._find_relevant_content(content)
# Score candidates based on relevance
candidates = self._score_candidates(content, relevant_content)
# Sort by score and return top candidates
candidates.sort(key=lambda x: x.get('relevance_score', 0), reverse=True)
return candidates[:max_candidates]
def _find_relevant_content(self, content: Content) -> List[Content]:
"""Find relevant content from same account/site/sector"""
# Get content from same account, site, and sector
queryset = Content.objects.filter(
account=content.account,
site=content.site,
sector=content.sector,
status__in=['draft', 'review', 'publish']
).exclude(id=content.id)
# Filter by keywords if available
if content.primary_keyword:
queryset = queryset.filter(
models.Q(primary_keyword__icontains=content.primary_keyword) |
models.Q(secondary_keywords__icontains=content.primary_keyword)
)
return list(queryset[:50]) # Limit initial query
def _score_candidates(self, content: Content, candidates: List[Content]) -> List[Dict]:
"""Score candidates based on relevance"""
scored = []
for candidate in candidates:
score = 0
# Keyword overlap (higher weight)
if content.primary_keyword and candidate.primary_keyword:
if content.primary_keyword.lower() in candidate.primary_keyword.lower():
score += 30
if candidate.primary_keyword.lower() in content.primary_keyword.lower():
score += 30
# Secondary keywords overlap
if content.secondary_keywords and candidate.secondary_keywords:
overlap = set(content.secondary_keywords) & set(candidate.secondary_keywords)
score += len(overlap) * 10
# Category overlap
if content.categories and candidate.categories:
overlap = set(content.categories) & set(candidate.categories)
score += len(overlap) * 5
# Tag overlap
if content.tags and candidate.tags:
overlap = set(content.tags) & set(candidate.tags)
score += len(overlap) * 3
# Recency bonus (newer content gets slight boost)
if candidate.generated_at:
days_old = (content.generated_at - candidate.generated_at).days
if days_old < 30:
score += 5
if score > 0:
scored.append({
'content_id': candidate.id,
'title': candidate.title or candidate.task.title if candidate.task else 'Untitled',
'url': f"/content/{candidate.id}/", # Placeholder - actual URL depends on routing
'relevance_score': score,
'anchor_text': self._generate_anchor_text(candidate, content)
})
return scored
def _generate_anchor_text(self, candidate: Content, source_content: Content) -> str:
"""Generate anchor text for link"""
# Use primary keyword if available, otherwise use title
if candidate.primary_keyword:
return candidate.primary_keyword
elif candidate.title:
return candidate.title
elif candidate.task and candidate.task.title:
return candidate.task.title
else:
return "Learn more"

View File

@@ -0,0 +1,73 @@
"""
Link Injection Engine
Injects internal links into content HTML
"""
import logging
import re
from typing import List, Dict
from igny8_core.business.content.models import Content
logger = logging.getLogger(__name__)
class InjectionEngine:
"""Injects links into content HTML"""
def inject_links(self, content: Content, candidates: List[Dict], max_links: int = 5) -> Dict:
"""
Inject links into content HTML.
Args:
content: Content instance
candidates: List of link candidates from CandidateEngine
max_links: Maximum number of links to inject
Returns:
Dict with: {'html_content', 'links', 'links_added'}
"""
if not content.html_content or not candidates:
return {
'html_content': content.html_content,
'links': [],
'links_added': 0
}
html = content.html_content
links_added = []
links_used = set() # Track which candidates we've used
# Sort candidates by relevance score
sorted_candidates = sorted(candidates, key=lambda x: x.get('relevance_score', 0), reverse=True)
# Inject links (limit to max_links)
for candidate in sorted_candidates[:max_links]:
if candidate['content_id'] in links_used:
continue
anchor_text = candidate.get('anchor_text', 'Learn more')
url = candidate.get('url', f"/content/{candidate['content_id']}/")
# Find first occurrence of anchor text in HTML (case-insensitive)
pattern = re.compile(re.escape(anchor_text), re.IGNORECASE)
match = pattern.search(html)
if match:
# Replace with link
link_html = f'<a href="{url}" class="internal-link">{anchor_text}</a>'
html = html[:match.start()] + link_html + html[match.end():]
links_added.append({
'content_id': candidate['content_id'],
'anchor_text': anchor_text,
'url': url,
'position': match.start()
})
links_used.add(candidate['content_id'])
return {
'html_content': html,
'links': links_added,
'links_added': len(links_added)
}

View File

@@ -0,0 +1,101 @@
"""
Linker Service
Main service for processing content for internal linking
"""
import logging
from typing import List
from igny8_core.business.content.models import Content
from igny8_core.business.linking.services.candidate_engine import CandidateEngine
from igny8_core.business.linking.services.injection_engine import InjectionEngine
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
logger = logging.getLogger(__name__)
class LinkerService:
"""Service for processing content for internal linking"""
def __init__(self):
self.candidate_engine = CandidateEngine()
self.injection_engine = InjectionEngine()
self.credit_service = CreditService()
def process(self, content_id: int) -> Content:
"""
Process content for linking.
Args:
content_id: Content ID to process
Returns:
Updated Content instance
Raises:
InsufficientCreditsError: If account doesn't have enough credits
"""
try:
content = Content.objects.get(id=content_id)
except Content.DoesNotExist:
raise ValueError(f"Content with id {content_id} does not exist")
account = content.account
# Check credits
try:
self.credit_service.check_credits(account, 'linking')
except InsufficientCreditsError:
raise
# Find link candidates
candidates = self.candidate_engine.find_candidates(content)
if not candidates:
logger.info(f"No link candidates found for content {content_id}")
return content
# Inject links
result = self.injection_engine.inject_links(content, candidates)
# Update content
content.html_content = result['html_content']
content.internal_links = result['links']
content.linker_version += 1
content.save(update_fields=['html_content', 'internal_links', 'linker_version'])
# Deduct credits
self.credit_service.deduct_credits_for_operation(
account=account,
operation_type='linking',
description=f"Internal linking for content: {content.title or 'Untitled'}",
related_object_type='content',
related_object_id=content.id
)
logger.info(f"Linked content {content_id}: {result['links_added']} links added")
return content
def batch_process(self, content_ids: List[int]) -> List[Content]:
"""
Process multiple content items for linking.
Args:
content_ids: List of content IDs to process
Returns:
List of updated Content instances
"""
results = []
for content_id in content_ids:
try:
result = self.process(content_id)
results.append(result)
except Exception as e:
logger.error(f"Error processing content {content_id}: {str(e)}", exc_info=True)
# Continue with other items
continue
return results

View File

@@ -0,0 +1,2 @@
# Linking tests

View File

@@ -0,0 +1,139 @@
"""
Tests for CandidateEngine
"""
from django.test import TestCase
from igny8_core.business.content.models import Content
from igny8_core.business.linking.services.candidate_engine import CandidateEngine
from igny8_core.api.tests.test_integration_base import IntegrationTestBase
class CandidateEngineTests(IntegrationTestBase):
"""Tests for CandidateEngine"""
def setUp(self):
super().setUp()
self.engine = CandidateEngine()
# Create source content
self.source_content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Source Content",
html_content="<p>Source content about test keyword.</p>",
primary_keyword="test keyword",
secondary_keywords=["keyword1", "keyword2"],
categories=["category1"],
tags=["tag1", "tag2"],
word_count=100,
status='draft'
)
# Create relevant content (same keyword)
self.relevant_content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Relevant Content",
html_content="<p>Relevant content about test keyword.</p>",
primary_keyword="test keyword",
secondary_keywords=["keyword1"],
categories=["category1"],
tags=["tag1"],
word_count=150,
status='draft'
)
# Create less relevant content (different keyword)
self.less_relevant = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Less Relevant",
html_content="<p>Different content.</p>",
primary_keyword="different keyword",
word_count=100,
status='draft'
)
def test_find_candidates_returns_relevant_content(self):
"""Test that find_candidates returns relevant content"""
candidates = self.engine.find_candidates(self.source_content, max_candidates=10)
# Should find relevant content
candidate_ids = [c['content_id'] for c in candidates]
self.assertIn(self.relevant_content.id, candidate_ids)
def test_find_candidates_scores_by_relevance(self):
"""Test that candidates are scored by relevance"""
candidates = self.engine.find_candidates(self.source_content, max_candidates=10)
# Relevant content should have higher score
relevant_candidate = next((c for c in candidates if c['content_id'] == self.relevant_content.id), None)
self.assertIsNotNone(relevant_candidate)
self.assertGreater(relevant_candidate['relevance_score'], 0)
def test_find_candidates_excludes_self(self):
"""Test that source content is excluded from candidates"""
candidates = self.engine.find_candidates(self.source_content, max_candidates=10)
candidate_ids = [c['content_id'] for c in candidates]
self.assertNotIn(self.source_content.id, candidate_ids)
def test_find_candidates_respects_account_isolation(self):
"""Test that candidates are only from same account"""
# Create content from different account
from igny8_core.auth.models import Account
other_account = Account.objects.create(
name="Other Account",
slug="other-account",
plan=self.plan,
owner=self.user
)
other_content = Content.objects.create(
account=other_account,
site=self.site,
sector=self.sector,
title="Other Account Content",
primary_keyword="test keyword",
word_count=100,
status='draft'
)
candidates = self.engine.find_candidates(self.source_content, max_candidates=10)
candidate_ids = [c['content_id'] for c in candidates]
self.assertNotIn(other_content.id, candidate_ids)
def test_find_candidates_returns_empty_for_no_content(self):
"""Test that empty list is returned when no content"""
empty_content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Empty",
html_content="",
word_count=0,
status='draft'
)
candidates = self.engine.find_candidates(empty_content, max_candidates=10)
self.assertEqual(len(candidates), 0)
def test_find_candidates_respects_max_candidates(self):
"""Test that max_candidates limit is respected"""
# Create multiple relevant content items
for i in range(15):
Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title=f"Content {i}",
primary_keyword="test keyword",
word_count=100,
status='draft'
)
candidates = self.engine.find_candidates(self.source_content, max_candidates=5)
self.assertLessEqual(len(candidates), 5)

View File

@@ -0,0 +1,136 @@
"""
Tests for InjectionEngine
"""
from django.test import TestCase
from igny8_core.business.content.models import Content
from igny8_core.business.linking.services.injection_engine import InjectionEngine
from igny8_core.api.tests.test_integration_base import IntegrationTestBase
class InjectionEngineTests(IntegrationTestBase):
"""Tests for InjectionEngine"""
def setUp(self):
super().setUp()
self.engine = InjectionEngine()
# Create content with HTML
self.content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Test Content",
html_content="<p>This is test content with some keywords and text.</p>",
word_count=100,
status='draft'
)
def test_inject_links_adds_links_to_html(self):
"""Test that links are injected into HTML content"""
candidates = [{
'content_id': 1,
'title': 'Target Content',
'url': '/content/1/',
'relevance_score': 50,
'anchor_text': 'keywords'
}]
result = self.engine.inject_links(self.content, candidates, max_links=5)
# Check that link was added
self.assertIn('<a href="/content/1/" class="internal-link">keywords</a>', result['html_content'])
self.assertEqual(result['links_added'], 1)
self.assertEqual(len(result['links']), 1)
def test_inject_links_respects_max_links(self):
"""Test that max_links limit is respected"""
candidates = [
{'content_id': i, 'title': f'Content {i}', 'url': f'/content/{i}/',
'relevance_score': 50, 'anchor_text': f'keyword{i}'}
for i in range(10)
]
# Update HTML to include all anchor texts
self.content.html_content = "<p>" + " ".join([f'keyword{i}' for i in range(10)]) + "</p>"
self.content.save()
result = self.engine.inject_links(self.content, candidates, max_links=3)
self.assertLessEqual(result['links_added'], 3)
self.assertLessEqual(len(result['links']), 3)
def test_inject_links_returns_unchanged_when_no_candidates(self):
"""Test that content is unchanged when no candidates"""
original_html = self.content.html_content
result = self.engine.inject_links(self.content, [], max_links=5)
self.assertEqual(result['html_content'], original_html)
self.assertEqual(result['links_added'], 0)
self.assertEqual(len(result['links']), 0)
def test_inject_links_returns_unchanged_when_no_html(self):
"""Test that empty HTML returns unchanged"""
self.content.html_content = ""
self.content.save()
candidates = [{
'content_id': 1,
'title': 'Target',
'url': '/content/1/',
'relevance_score': 50,
'anchor_text': 'test'
}]
result = self.engine.inject_links(self.content, candidates, max_links=5)
self.assertEqual(result['html_content'], "")
self.assertEqual(result['links_added'], 0)
def test_inject_links_case_insensitive_matching(self):
"""Test that anchor text matching is case-insensitive"""
self.content.html_content = "<p>This is TEST content.</p>"
self.content.save()
candidates = [{
'content_id': 1,
'title': 'Target',
'url': '/content/1/',
'relevance_score': 50,
'anchor_text': 'test'
}]
result = self.engine.inject_links(self.content, candidates, max_links=5)
# Should find and replace despite case difference
self.assertIn('internal-link', result['html_content'])
self.assertEqual(result['links_added'], 1)
def test_inject_links_prevents_duplicate_links(self):
"""Test that same candidate is not linked twice"""
candidates = [
{
'content_id': 1,
'title': 'Target',
'url': '/content/1/',
'relevance_score': 50,
'anchor_text': 'test'
},
{
'content_id': 1, # Same content_id
'title': 'Target',
'url': '/content/1/',
'relevance_score': 40,
'anchor_text': 'test'
}
]
self.content.html_content = "<p>This is test content with test keywords.</p>"
self.content.save()
result = self.engine.inject_links(self.content, candidates, max_links=5)
# Should only add one link despite two candidates
self.assertEqual(result['links_added'], 1)
self.assertEqual(result['html_content'].count('internal-link'), 1)

View File

@@ -0,0 +1,141 @@
"""
Tests for LinkerService
"""
from unittest.mock import Mock, patch, MagicMock
from django.test import TestCase
from igny8_core.business.content.models import Content
from igny8_core.business.linking.services.linker_service import LinkerService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
from igny8_core.api.tests.test_integration_base import IntegrationTestBase
class LinkerServiceTests(IntegrationTestBase):
"""Tests for LinkerService"""
def setUp(self):
super().setUp()
self.service = LinkerService()
# Create test content
self.content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Test Content",
html_content="<p>This is test content with some keywords.</p>",
primary_keyword="test keyword",
word_count=100,
status='draft'
)
# Create another content for linking
self.target_content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Target Content",
html_content="<p>Target content for linking.</p>",
primary_keyword="test keyword",
word_count=150,
status='draft'
)
@patch('igny8_core.business.linking.services.linker_service.CreditService.check_credits')
@patch('igny8_core.business.linking.services.linker_service.CandidateEngine.find_candidates')
@patch('igny8_core.business.linking.services.linker_service.InjectionEngine.inject_links')
@patch('igny8_core.business.linking.services.linker_service.CreditService.deduct_credits_for_operation')
def test_process_single_content(self, mock_deduct, mock_inject, mock_find, mock_check):
"""Test processing single content for linking"""
# Setup mocks
mock_check.return_value = True
mock_find.return_value = [{
'content_id': self.target_content.id,
'title': 'Target Content',
'url': '/content/2/',
'relevance_score': 50,
'anchor_text': 'test keyword'
}]
mock_inject.return_value = {
'html_content': '<p>This is test content with <a href="/content/2/">test keyword</a>.</p>',
'links': [{
'content_id': self.target_content.id,
'anchor_text': 'test keyword',
'url': '/content/2/'
}],
'links_added': 1
}
# Execute
result = self.service.process(self.content.id)
# Assertions
self.assertEqual(result.id, self.content.id)
self.assertEqual(result.linker_version, 1)
self.assertEqual(len(result.internal_links), 1)
mock_check.assert_called_once_with(self.account, 'linking')
mock_deduct.assert_called_once()
@patch('igny8_core.business.linking.services.linker_service.CreditService.check_credits')
def test_process_insufficient_credits(self, mock_check):
"""Test that InsufficientCreditsError is raised when credits are insufficient"""
mock_check.side_effect = InsufficientCreditsError("Insufficient credits")
with self.assertRaises(InsufficientCreditsError):
self.service.process(self.content.id)
def test_process_content_not_found(self):
"""Test that ValueError is raised when content doesn't exist"""
with self.assertRaises(ValueError):
self.service.process(99999)
@patch('igny8_core.business.linking.services.linker_service.LinkerService.process')
def test_batch_process_multiple_content(self, mock_process):
"""Test batch processing multiple content items"""
# Create additional content
content2 = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Content 2",
html_content="<p>Content 2</p>",
word_count=100,
status='draft'
)
# Setup mock
mock_process.side_effect = [self.content, content2]
# Execute
results = self.service.batch_process([self.content.id, content2.id])
# Assertions
self.assertEqual(len(results), 2)
self.assertEqual(mock_process.call_count, 2)
@patch('igny8_core.business.linking.services.linker_service.LinkerService.process')
def test_batch_process_handles_partial_failure(self, mock_process):
"""Test batch processing handles partial failures"""
# Setup mock to fail on second item
mock_process.side_effect = [self.content, Exception("Processing failed")]
# Execute
results = self.service.batch_process([self.content.id, 99999])
# Assertions - should continue processing other items
self.assertEqual(len(results), 1)
self.assertEqual(results[0].id, self.content.id)
@patch('igny8_core.business.linking.services.linker_service.CreditService.check_credits')
@patch('igny8_core.business.linking.services.linker_service.CandidateEngine.find_candidates')
def test_process_no_candidates_found(self, mock_find, mock_check):
"""Test processing when no candidates are found"""
mock_check.return_value = True
mock_find.return_value = []
# Execute
result = self.service.process(self.content.id)
# Assertions - should return content unchanged
self.assertEqual(result.id, self.content.id)
self.assertEqual(result.linker_version, 0) # Not incremented

View File

@@ -0,0 +1,6 @@
"""
Optimization Business Logic
Phase 4: Linker & Optimizer
"""

View File

@@ -0,0 +1,8 @@
from django.apps import AppConfig
class OptimizationConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'igny8_core.business.optimization'
verbose_name = 'Optimization'

View File

@@ -0,0 +1,54 @@
# Generated manually for Phase 4: Optimization System
import django.core.validators
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
('igny8_core_auth', '0013_remove_ai_cost_per_request'),
('writer', '0009_add_content_site_source_fields'),
]
operations = [
migrations.CreateModel(
name='OptimizationTask',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True, db_index=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('scores_before', models.JSONField(default=dict, help_text='Optimization scores before')),
('scores_after', models.JSONField(default=dict, help_text='Optimization scores after')),
('html_before', models.TextField(blank=True, help_text='HTML content before optimization')),
('html_after', models.TextField(blank=True, help_text='HTML content after optimization')),
('status', models.CharField(choices=[('pending', 'Pending'), ('running', 'Running'), ('completed', 'Completed'), ('failed', 'Failed')], db_index=True, default='pending', help_text='Optimization task status', max_length=20)),
('credits_used', models.IntegerField(default=0, help_text='Credits used for optimization', validators=[django.core.validators.MinValueValidator(0)])),
('metadata', models.JSONField(blank=True, default=dict, help_text='Additional metadata')),
('account', models.ForeignKey(db_column='tenant_id', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)s_set', to='igny8_core_auth.tenant')),
('content', models.ForeignKey(help_text='The content being optimized', on_delete=django.db.models.deletion.CASCADE, related_name='optimization_tasks', to='writer.content')),
],
options={
'db_table': 'igny8_optimization_tasks',
'ordering': ['-created_at'],
'verbose_name': 'Optimization Task',
'verbose_name_plural': 'Optimization Tasks',
},
),
migrations.AddIndex(
model_name='optimizationtask',
index=models.Index(fields=['content', 'status'], name='igny8_optim_content_status_idx'),
),
migrations.AddIndex(
model_name='optimizationtask',
index=models.Index(fields=['account', 'status'], name='igny8_optim_account_status_idx'),
),
migrations.AddIndex(
model_name='optimizationtask',
index=models.Index(fields=['status', 'created_at'], name='igny8_optim_status_created_idx'),
),
]

View File

@@ -0,0 +1,77 @@
"""
Optimization Models
Phase 4: Linker & Optimizer
"""
from django.db import models
from django.core.validators import MinValueValidator
from igny8_core.auth.models import AccountBaseModel
from igny8_core.business.content.models import Content
class OptimizationTask(AccountBaseModel):
"""
Optimization Task model for tracking content optimization runs.
"""
STATUS_CHOICES = [
('pending', 'Pending'),
('running', 'Running'),
('completed', 'Completed'),
('failed', 'Failed'),
]
content = models.ForeignKey(
Content,
on_delete=models.CASCADE,
related_name='optimization_tasks',
help_text="The content being optimized"
)
# Scores before and after optimization
scores_before = models.JSONField(default=dict, help_text="Optimization scores before")
scores_after = models.JSONField(default=dict, help_text="Optimization scores after")
# Content before and after (for comparison)
html_before = models.TextField(blank=True, help_text="HTML content before optimization")
html_after = models.TextField(blank=True, help_text="HTML content after optimization")
# Status
status = models.CharField(
max_length=20,
choices=STATUS_CHOICES,
default='pending',
db_index=True,
help_text="Optimization task status"
)
# Credits used
credits_used = models.IntegerField(default=0, validators=[MinValueValidator(0)], help_text="Credits used for optimization")
# Metadata
metadata = models.JSONField(default=dict, blank=True, help_text="Additional metadata")
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
app_label = 'optimization'
db_table = 'igny8_optimization_tasks'
ordering = ['-created_at']
verbose_name = 'Optimization Task'
verbose_name_plural = 'Optimization Tasks'
indexes = [
models.Index(fields=['content', 'status']),
models.Index(fields=['account', 'status']),
models.Index(fields=['status', 'created_at']),
]
def save(self, *args, **kwargs):
"""Automatically set account from content"""
if self.content:
self.account = self.content.account
super().save(*args, **kwargs)
def __str__(self):
return f"Optimization for {self.content.title or 'Content'} ({self.get_status_display()})"

View File

@@ -0,0 +1,5 @@
"""
Optimization Services
"""

View File

@@ -0,0 +1,184 @@
"""
Content Analyzer
Analyzes content quality and calculates optimization scores
"""
import logging
import re
from typing import Dict
from igny8_core.business.content.models import Content
logger = logging.getLogger(__name__)
class ContentAnalyzer:
"""Analyzes content quality"""
def analyze(self, content: Content) -> Dict:
"""
Analyze content and return scores.
Args:
content: Content instance to analyze
Returns:
Dict with scores: {'seo_score', 'readability_score', 'engagement_score', 'overall_score'}
"""
if not content or not content.html_content:
return {
'seo_score': 0,
'readability_score': 0,
'engagement_score': 0,
'overall_score': 0
}
seo_score = self._calculate_seo_score(content)
readability_score = self._calculate_readability_score(content)
engagement_score = self._calculate_engagement_score(content)
# Overall score is weighted average
overall_score = (
seo_score * 0.4 +
readability_score * 0.3 +
engagement_score * 0.3
)
return {
'seo_score': round(seo_score, 2),
'readability_score': round(readability_score, 2),
'engagement_score': round(engagement_score, 2),
'overall_score': round(overall_score, 2),
'word_count': content.word_count or 0,
'has_meta_title': bool(content.meta_title),
'has_meta_description': bool(content.meta_description),
'has_primary_keyword': bool(content.primary_keyword),
'internal_links_count': len(content.internal_links) if content.internal_links else 0
}
def _calculate_seo_score(self, content: Content) -> float:
"""Calculate SEO score (0-100)"""
score = 0
# Meta title (20 points)
if content.meta_title:
if len(content.meta_title) >= 30 and len(content.meta_title) <= 60:
score += 20
elif len(content.meta_title) > 0:
score += 10
# Meta description (20 points)
if content.meta_description:
if len(content.meta_description) >= 120 and len(content.meta_description) <= 160:
score += 20
elif len(content.meta_description) > 0:
score += 10
# Primary keyword (20 points)
if content.primary_keyword:
score += 20
# Word count (20 points) - optimal range 1000-2500 words
word_count = content.word_count or 0
if 1000 <= word_count <= 2500:
score += 20
elif 500 <= word_count < 1000 or 2500 < word_count <= 3000:
score += 15
elif word_count > 0:
score += 10
# Internal links (20 points)
internal_links = content.internal_links or []
if len(internal_links) >= 3:
score += 20
elif len(internal_links) >= 1:
score += 10
return min(score, 100)
def _calculate_readability_score(self, content: Content) -> float:
"""Calculate readability score (0-100)"""
if not content.html_content:
return 0
# Simple readability metrics
html = content.html_content
# Remove HTML tags for text analysis
text = re.sub(r'<[^>]+>', '', html)
sentences = re.split(r'[.!?]+', text)
words = text.split()
if not words:
return 0
# Average sentence length (optimal: 15-20 words)
avg_sentence_length = len(words) / max(len(sentences), 1)
if 15 <= avg_sentence_length <= 20:
sentence_score = 40
elif 10 <= avg_sentence_length < 15 or 20 < avg_sentence_length <= 25:
sentence_score = 30
else:
sentence_score = 20
# Average word length (optimal: 4-5 characters)
avg_word_length = sum(len(word) for word in words) / len(words)
if 4 <= avg_word_length <= 5:
word_score = 30
elif 3 <= avg_word_length < 4 or 5 < avg_word_length <= 6:
word_score = 20
else:
word_score = 10
# Paragraph structure (30 points)
paragraphs = html.count('<p>') + html.count('<div>')
if paragraphs >= 3:
paragraph_score = 30
elif paragraphs >= 1:
paragraph_score = 20
else:
paragraph_score = 10
return min(sentence_score + word_score + paragraph_score, 100)
def _calculate_engagement_score(self, content: Content) -> float:
"""Calculate engagement score (0-100)"""
score = 0
# Headings (30 points)
if content.html_content:
h1_count = content.html_content.count('<h1>')
h2_count = content.html_content.count('<h2>')
h3_count = content.html_content.count('<h3>')
if h1_count >= 1 and h2_count >= 2:
score += 30
elif h1_count >= 1 or h2_count >= 1:
score += 20
elif h3_count >= 1:
score += 10
# Images (30 points)
if hasattr(content, 'images'):
image_count = content.images.count()
if image_count >= 3:
score += 30
elif image_count >= 1:
score += 20
# Lists (20 points)
if content.html_content:
list_count = content.html_content.count('<ul>') + content.html_content.count('<ol>')
if list_count >= 2:
score += 20
elif list_count >= 1:
score += 10
# Internal links (20 points)
internal_links = content.internal_links or []
if len(internal_links) >= 3:
score += 20
elif len(internal_links) >= 1:
score += 10
return min(score, 100)

View File

@@ -0,0 +1,231 @@
"""
Optimizer Service
Main service for content optimization with multiple entry points
"""
import logging
from typing import Optional
from igny8_core.business.content.models import Content
from igny8_core.business.optimization.models import OptimizationTask
from igny8_core.business.optimization.services.analyzer import ContentAnalyzer
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
logger = logging.getLogger(__name__)
class OptimizerService:
"""Service for content optimization with multiple entry points"""
def __init__(self):
self.analyzer = ContentAnalyzer()
self.credit_service = CreditService()
def optimize_from_writer(self, content_id: int) -> Content:
"""
Entry Point 1: Writer → Optimizer
Args:
content_id: Content ID from Writer module
Returns:
Optimized Content instance
"""
try:
content = Content.objects.get(id=content_id, source='igny8')
except Content.DoesNotExist:
raise ValueError(f"IGNY8 content with id {content_id} does not exist")
return self.optimize(content)
def optimize_from_wordpress_sync(self, content_id: int) -> Content:
"""
Entry Point 2: WordPress Sync → Optimizer
Args:
content_id: Content ID synced from WordPress
Returns:
Optimized Content instance
"""
try:
content = Content.objects.get(id=content_id, source='wordpress')
except Content.DoesNotExist:
raise ValueError(f"WordPress content with id {content_id} does not exist")
return self.optimize(content)
def optimize_from_external_sync(self, content_id: int) -> Content:
"""
Entry Point 3: External Sync → Optimizer (Shopify, custom APIs)
Args:
content_id: Content ID synced from external source
Returns:
Optimized Content instance
"""
try:
content = Content.objects.get(id=content_id, source__in=['shopify', 'custom'])
except Content.DoesNotExist:
raise ValueError(f"External content with id {content_id} does not exist")
return self.optimize(content)
def optimize_manual(self, content_id: int) -> Content:
"""
Entry Point 4: Manual Selection → Optimizer
Args:
content_id: Content ID selected manually
Returns:
Optimized Content instance
"""
try:
content = Content.objects.get(id=content_id)
except Content.DoesNotExist:
raise ValueError(f"Content with id {content_id} does not exist")
return self.optimize(content)
def optimize(self, content: Content) -> Content:
"""
Unified optimization logic (used by all entry points).
Args:
content: Content instance to optimize
Returns:
Optimized Content instance
Raises:
InsufficientCreditsError: If account doesn't have enough credits
"""
account = content.account
word_count = content.word_count or 0
# Check credits
try:
self.credit_service.check_credits(account, 'optimization', word_count)
except InsufficientCreditsError:
raise
# Analyze content before optimization
scores_before = self.analyzer.analyze(content)
html_before = content.html_content
# Create optimization task
task = OptimizationTask.objects.create(
content=content,
scores_before=scores_before,
status='running',
html_before=html_before,
account=account
)
try:
# Delegate to AI function (actual optimization happens in Celery/AI task)
# For now, we'll do a simple optimization pass
# In production, this would call the AI function
optimized_content = self._optimize_content(content, scores_before)
# Analyze optimized content
scores_after = self.analyzer.analyze(optimized_content)
# Calculate credits used
credits_used = self.credit_service.get_credit_cost('optimization', word_count)
# Update optimization task
task.scores_after = scores_after
task.html_after = optimized_content.html_content
task.status = 'completed'
task.credits_used = credits_used
task.save()
# Update content
content.html_content = optimized_content.html_content
content.optimizer_version += 1
content.optimization_scores = scores_after
content.save(update_fields=['html_content', 'optimizer_version', 'optimization_scores'])
# Deduct credits
self.credit_service.deduct_credits_for_operation(
account=account,
operation_type='optimization',
amount=word_count,
description=f"Content optimization: {content.title or 'Untitled'}",
related_object_type='content',
related_object_id=content.id,
metadata={
'scores_before': scores_before,
'scores_after': scores_after,
'improvement': scores_after.get('overall_score', 0) - scores_before.get('overall_score', 0)
}
)
logger.info(f"Optimized content {content.id}: {scores_before.get('overall_score', 0)}{scores_after.get('overall_score', 0)}")
return content
except Exception as e:
logger.error(f"Error optimizing content {content.id}: {str(e)}", exc_info=True)
task.status = 'failed'
task.metadata = {'error': str(e)}
task.save()
raise
def _optimize_content(self, content: Content, scores_before: dict) -> Content:
"""
Internal method to optimize content using AI function.
Args:
content: Content to optimize
scores_before: Scores before optimization
Returns:
Optimized Content instance
"""
from igny8_core.ai.engine import AIEngine
from igny8_core.ai.registry import get_function_instance
# Prepare payload for AI function
payload = {
'ids': [content.id],
}
# Get function from registry
fn = get_function_instance('optimize_content')
if not fn:
raise ValueError("OptimizeContentFunction not found in registry")
# Execute AI function
ai_engine = AIEngine(account=content.account)
result = ai_engine.execute(fn, payload)
if not result.get('success'):
raise ValueError(f"Optimization failed: {result.get('error', 'Unknown error')}")
# The AI function's save_output method already updates the content
# We just need to refresh from database to get the updated content
content.refresh_from_db()
return content
def analyze_only(self, content_id: int) -> dict:
"""
Analyze content without optimizing (for preview).
Args:
content_id: Content ID to analyze
Returns:
Analysis scores dict
"""
try:
content = Content.objects.get(id=content_id)
except Content.DoesNotExist:
raise ValueError(f"Content with id {content_id} does not exist")
return self.analyzer.analyze(content)

View File

@@ -0,0 +1,2 @@
# Optimization tests

View File

@@ -0,0 +1,177 @@
"""
Tests for ContentAnalyzer
"""
from django.test import TestCase
from igny8_core.business.content.models import Content
from igny8_core.business.optimization.services.analyzer import ContentAnalyzer
from igny8_core.api.tests.test_integration_base import IntegrationTestBase
class ContentAnalyzerTests(IntegrationTestBase):
"""Tests for ContentAnalyzer"""
def setUp(self):
super().setUp()
self.analyzer = ContentAnalyzer()
def test_analyze_returns_all_scores(self):
"""Test that analyze returns all required scores"""
content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Test Content",
html_content="<p>This is test content.</p>",
meta_title="Test Title",
meta_description="Test description",
primary_keyword="test keyword",
word_count=1500,
status='draft'
)
scores = self.analyzer.analyze(content)
self.assertIn('seo_score', scores)
self.assertIn('readability_score', scores)
self.assertIn('engagement_score', scores)
self.assertIn('overall_score', scores)
self.assertIn('word_count', scores)
self.assertIn('has_meta_title', scores)
self.assertIn('has_meta_description', scores)
self.assertIn('has_primary_keyword', scores)
self.assertIn('internal_links_count', scores)
def test_analyze_returns_zero_scores_for_empty_content(self):
"""Test that empty content returns zero scores"""
content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Empty",
html_content="",
word_count=0,
status='draft'
)
scores = self.analyzer.analyze(content)
self.assertEqual(scores['seo_score'], 0)
self.assertEqual(scores['readability_score'], 0)
self.assertEqual(scores['engagement_score'], 0)
self.assertEqual(scores['overall_score'], 0)
def test_calculate_seo_score_with_meta_title(self):
"""Test SEO score calculation with meta title"""
content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Test",
meta_title="Test Title" * 5, # 50 chars - optimal length
word_count=1500,
status='draft'
)
scores = self.analyzer.analyze(content)
self.assertGreater(scores['seo_score'], 0)
def test_calculate_seo_score_with_primary_keyword(self):
"""Test SEO score calculation with primary keyword"""
content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Test",
primary_keyword="test keyword",
word_count=1500,
status='draft'
)
scores = self.analyzer.analyze(content)
self.assertGreater(scores['seo_score'], 0)
def test_calculate_readability_score(self):
"""Test readability score calculation"""
# Create content with good readability (short sentences, paragraphs)
html = "<p>This is a sentence.</p><p>This is another sentence.</p><p>And one more.</p>"
content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Test",
html_content=html,
word_count=20,
status='draft'
)
scores = self.analyzer.analyze(content)
self.assertGreater(scores['readability_score'], 0)
def test_calculate_engagement_score_with_headings(self):
"""Test engagement score calculation with headings"""
html = "<h1>Main Heading</h1><h2>Subheading 1</h2><h2>Subheading 2</h2>"
content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Test",
html_content=html,
word_count=100,
status='draft'
)
scores = self.analyzer.analyze(content)
self.assertGreater(scores['engagement_score'], 0)
def test_calculate_engagement_score_with_internal_links(self):
"""Test engagement score calculation with internal links"""
content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Test",
html_content="<p>Test content.</p>",
internal_links=[
{'content_id': 1, 'anchor_text': 'link1'},
{'content_id': 2, 'anchor_text': 'link2'},
{'content_id': 3, 'anchor_text': 'link3'}
],
word_count=100,
status='draft'
)
scores = self.analyzer.analyze(content)
self.assertGreater(scores['engagement_score'], 0)
self.assertEqual(scores['internal_links_count'], 3)
def test_overall_score_is_weighted_average(self):
"""Test that overall score is weighted average"""
content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Test",
html_content="<p>Test content.</p>",
meta_title="Test Title",
meta_description="Test description",
primary_keyword="test",
word_count=1500,
status='draft'
)
scores = self.analyzer.analyze(content)
# Overall should be weighted: SEO (40%) + Readability (30%) + Engagement (30%)
expected = (
scores['seo_score'] * 0.4 +
scores['readability_score'] * 0.3 +
scores['engagement_score'] * 0.3
)
self.assertAlmostEqual(scores['overall_score'], expected, places=1)

View File

@@ -0,0 +1,189 @@
"""
Tests for OptimizerService
"""
from unittest.mock import Mock, patch, MagicMock
from django.test import TestCase
from igny8_core.business.content.models import Content
from igny8_core.business.optimization.models import OptimizationTask
from igny8_core.business.optimization.services.optimizer_service import OptimizerService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
from igny8_core.api.tests.test_integration_base import IntegrationTestBase
class OptimizerServiceTests(IntegrationTestBase):
"""Tests for OptimizerService"""
def setUp(self):
super().setUp()
self.service = OptimizerService()
# Create test content
self.content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Test Content",
html_content="<p>This is test content.</p>",
meta_title="Test Title",
meta_description="Test description",
primary_keyword="test keyword",
word_count=500,
status='draft',
source='igny8'
)
@patch('igny8_core.business.optimization.services.optimizer_service.CreditService.check_credits')
@patch('igny8_core.business.optimization.services.optimizer_service.ContentAnalyzer.analyze')
@patch('igny8_core.business.optimization.services.optimizer_service.OptimizerService._optimize_content')
@patch('igny8_core.business.optimization.services.optimizer_service.CreditService.deduct_credits_for_operation')
def test_optimize_from_writer(self, mock_deduct, mock_optimize, mock_analyze, mock_check):
"""Test optimize_from_writer entry point"""
mock_check.return_value = True
mock_analyze.return_value = {
'seo_score': 50.0,
'readability_score': 60.0,
'engagement_score': 55.0,
'overall_score': 55.0
}
optimized_content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Optimized Content",
html_content="<p>Optimized content.</p>",
word_count=500,
status='draft',
source='igny8'
)
mock_optimize.return_value = optimized_content
result = self.service.optimize_from_writer(self.content.id)
self.assertEqual(result.id, self.content.id)
mock_check.assert_called_once()
mock_deduct.assert_called_once()
def test_optimize_from_writer_invalid_content(self):
"""Test that ValueError is raised for invalid content"""
with self.assertRaises(ValueError):
self.service.optimize_from_writer(99999)
def test_optimize_from_writer_wrong_source(self):
"""Test that ValueError is raised for wrong source"""
content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="WordPress Content",
word_count=100,
source='wordpress'
)
with self.assertRaises(ValueError):
self.service.optimize_from_writer(content.id)
@patch('igny8_core.business.optimization.services.optimizer_service.CreditService.check_credits')
def test_optimize_insufficient_credits(self, mock_check):
"""Test that InsufficientCreditsError is raised when credits are insufficient"""
mock_check.side_effect = InsufficientCreditsError("Insufficient credits")
with self.assertRaises(InsufficientCreditsError):
self.service.optimize(self.content)
@patch('igny8_core.business.optimization.services.optimizer_service.CreditService.check_credits')
@patch('igny8_core.business.optimization.services.optimizer_service.ContentAnalyzer.analyze')
@patch('igny8_core.business.optimization.services.optimizer_service.OptimizerService._optimize_content')
@patch('igny8_core.business.optimization.services.optimizer_service.CreditService.deduct_credits_for_operation')
def test_optimize_creates_optimization_task(self, mock_deduct, mock_optimize, mock_analyze, mock_check):
"""Test that optimization creates OptimizationTask"""
mock_check.return_value = True
scores = {
'seo_score': 50.0,
'readability_score': 60.0,
'engagement_score': 55.0,
'overall_score': 55.0
}
mock_analyze.return_value = scores
optimized_content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Optimized",
html_content="<p>Optimized.</p>",
word_count=500,
status='draft'
)
mock_optimize.return_value = optimized_content
result = self.service.optimize(self.content)
# Check that task was created
task = OptimizationTask.objects.filter(content=self.content).first()
self.assertIsNotNone(task)
self.assertEqual(task.status, 'completed')
self.assertEqual(task.scores_before, scores)
@patch('igny8_core.business.optimization.services.optimizer_service.CreditService.check_credits')
@patch('igny8_core.business.optimization.services.optimizer_service.ContentAnalyzer.analyze')
def test_analyze_only_returns_scores(self, mock_analyze, mock_check):
"""Test analyze_only method returns scores without optimizing"""
scores = {
'seo_score': 50.0,
'readability_score': 60.0,
'engagement_score': 55.0,
'overall_score': 55.0
}
mock_analyze.return_value = scores
result = self.service.analyze_only(self.content.id)
self.assertEqual(result, scores)
mock_analyze.assert_called_once()
def test_optimize_from_wordpress_sync(self):
"""Test optimize_from_wordpress_sync entry point"""
content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="WordPress Content",
word_count=100,
source='wordpress'
)
with patch.object(self.service, 'optimize') as mock_optimize:
mock_optimize.return_value = content
result = self.service.optimize_from_wordpress_sync(content.id)
self.assertEqual(result.id, content.id)
mock_optimize.assert_called_once()
def test_optimize_from_external_sync(self):
"""Test optimize_from_external_sync entry point"""
content = Content.objects.create(
account=self.account,
site=self.site,
sector=self.sector,
title="Shopify Content",
word_count=100,
source='shopify'
)
with patch.object(self.service, 'optimize') as mock_optimize:
mock_optimize.return_value = content
result = self.service.optimize_from_external_sync(content.id)
self.assertEqual(result.id, content.id)
mock_optimize.assert_called_once()
def test_optimize_manual(self):
"""Test optimize_manual entry point"""
with patch.object(self.service, 'optimize') as mock_optimize:
mock_optimize.return_value = self.content
result = self.service.optimize_manual(self.content.id)
self.assertEqual(result.id, self.content.id)
mock_optimize.assert_called_once()

View File

@@ -0,0 +1,4 @@
"""
Planning business logic - Keywords, Clusters, ContentIdeas models and services
"""

View File

@@ -0,0 +1,198 @@
from django.db import models
from igny8_core.auth.models import SiteSectorBaseModel, SeedKeyword
class Clusters(SiteSectorBaseModel):
"""Clusters model for keyword grouping"""
name = models.CharField(max_length=255, unique=True, db_index=True)
description = models.TextField(blank=True, null=True)
keywords_count = models.IntegerField(default=0)
volume = models.IntegerField(default=0)
mapped_pages = models.IntegerField(default=0)
status = models.CharField(max_length=50, default='active')
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
app_label = 'planner'
db_table = 'igny8_clusters'
ordering = ['name']
verbose_name = 'Cluster'
verbose_name_plural = 'Clusters'
indexes = [
models.Index(fields=['name']),
models.Index(fields=['status']),
models.Index(fields=['site', 'sector']),
]
def __str__(self):
return self.name
class Keywords(SiteSectorBaseModel):
"""
Keywords model for SEO keyword management.
Site-specific instances that reference global SeedKeywords.
"""
STATUS_CHOICES = [
('active', 'Active'),
('pending', 'Pending'),
('archived', 'Archived'),
]
# Required: Link to global SeedKeyword
seed_keyword = models.ForeignKey(
SeedKeyword,
on_delete=models.PROTECT, # Prevent deletion if Keywords reference it
related_name='site_keywords',
help_text="Reference to the global seed keyword"
)
# Site-specific overrides (optional)
volume_override = models.IntegerField(
null=True,
blank=True,
help_text="Site-specific volume override (uses seed_keyword.volume if not set)"
)
difficulty_override = models.IntegerField(
null=True,
blank=True,
help_text="Site-specific difficulty override (uses seed_keyword.difficulty if not set)"
)
cluster = models.ForeignKey(
'Clusters',
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='keywords',
limit_choices_to={'sector': models.F('sector')} # Cluster must be in same sector
)
status = models.CharField(max_length=50, choices=STATUS_CHOICES, default='pending')
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
app_label = 'planner'
db_table = 'igny8_keywords'
ordering = ['-created_at']
verbose_name = 'Keyword'
verbose_name_plural = 'Keywords'
unique_together = [['seed_keyword', 'site', 'sector']] # One keyword per site/sector
indexes = [
models.Index(fields=['seed_keyword']),
models.Index(fields=['status']),
models.Index(fields=['cluster']),
models.Index(fields=['site', 'sector']),
models.Index(fields=['seed_keyword', 'site', 'sector']),
]
@property
def keyword(self):
"""Get keyword text from seed_keyword"""
return self.seed_keyword.keyword if self.seed_keyword else ''
@property
def volume(self):
"""Get volume from override or seed_keyword"""
return self.volume_override if self.volume_override is not None else (self.seed_keyword.volume if self.seed_keyword else 0)
@property
def difficulty(self):
"""Get difficulty from override or seed_keyword"""
return self.difficulty_override if self.difficulty_override is not None else (self.seed_keyword.difficulty if self.seed_keyword else 0)
@property
def intent(self):
"""Get intent from seed_keyword"""
return self.seed_keyword.intent if self.seed_keyword else 'informational'
def save(self, *args, **kwargs):
"""Validate that seed_keyword's industry/sector matches site's industry/sector"""
if self.seed_keyword and self.site and self.sector:
# Validate industry match
if self.site.industry != self.seed_keyword.industry:
from django.core.exceptions import ValidationError
raise ValidationError(
f"SeedKeyword industry ({self.seed_keyword.industry.name}) must match site industry ({self.site.industry.name})"
)
# Validate sector match (site sector's industry_sector must match seed_keyword's sector)
if self.sector.industry_sector != self.seed_keyword.sector:
from django.core.exceptions import ValidationError
raise ValidationError(
f"SeedKeyword sector ({self.seed_keyword.sector.name}) must match site sector's industry sector ({self.sector.industry_sector.name if self.sector.industry_sector else 'None'})"
)
super().save(*args, **kwargs)
def __str__(self):
return self.keyword
class ContentIdeas(SiteSectorBaseModel):
"""Content Ideas model for planning content based on keyword clusters"""
STATUS_CHOICES = [
('new', 'New'),
('scheduled', 'Scheduled'),
('published', 'Published'),
]
CONTENT_STRUCTURE_CHOICES = [
('cluster_hub', 'Cluster Hub'),
('landing_page', 'Landing Page'),
('pillar_page', 'Pillar Page'),
('supporting_page', 'Supporting Page'),
]
CONTENT_TYPE_CHOICES = [
('blog_post', 'Blog Post'),
('article', 'Article'),
('guide', 'Guide'),
('tutorial', 'Tutorial'),
]
idea_title = models.CharField(max_length=255, db_index=True)
description = models.TextField(blank=True, null=True)
content_structure = models.CharField(max_length=50, choices=CONTENT_STRUCTURE_CHOICES, default='blog_post')
content_type = models.CharField(max_length=50, choices=CONTENT_TYPE_CHOICES, default='blog_post')
target_keywords = models.CharField(max_length=500, blank=True) # Comma-separated keywords (legacy)
keyword_objects = models.ManyToManyField(
'Keywords',
blank=True,
related_name='content_ideas',
help_text="Individual keywords linked to this content idea"
)
keyword_cluster = models.ForeignKey(
Clusters,
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name='ideas',
limit_choices_to={'sector': models.F('sector')}
)
status = models.CharField(max_length=50, choices=STATUS_CHOICES, default='new')
estimated_word_count = models.IntegerField(default=1000)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
app_label = 'planner'
db_table = 'igny8_content_ideas'
ordering = ['-created_at']
verbose_name = 'Content Idea'
verbose_name_plural = 'Content Ideas'
indexes = [
models.Index(fields=['idea_title']),
models.Index(fields=['status']),
models.Index(fields=['keyword_cluster']),
models.Index(fields=['content_structure']),
models.Index(fields=['site', 'sector']),
]
def __str__(self):
return self.idea_title

View File

@@ -0,0 +1,4 @@
"""
Planning services
"""

View File

@@ -0,0 +1,88 @@
"""
Clustering Service
Handles keyword clustering business logic
"""
import logging
from igny8_core.business.planning.models import Keywords
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
logger = logging.getLogger(__name__)
class ClusteringService:
"""Service for keyword clustering operations"""
def __init__(self):
self.credit_service = CreditService()
def cluster_keywords(self, keyword_ids, account, sector_id=None):
"""
Cluster keywords using AI.
Args:
keyword_ids: List of keyword IDs
account: Account instance
sector_id: Optional sector ID
Returns:
dict: Result with success status and data
Raises:
InsufficientCreditsError: If account doesn't have enough credits
"""
# Validate input
if not keyword_ids:
return {
'success': False,
'error': 'No keyword IDs provided'
}
if len(keyword_ids) > 20:
return {
'success': False,
'error': 'Maximum 20 keywords allowed for clustering'
}
# Check credits (fixed cost per clustering operation)
try:
self.credit_service.check_credits(account, 'clustering')
except InsufficientCreditsError:
raise
# Delegate to AI task
from igny8_core.ai.tasks import run_ai_task
payload = {
'ids': keyword_ids,
'sector_id': sector_id
}
try:
if hasattr(run_ai_task, 'delay'):
# Celery available - queue async
task = run_ai_task.delay(
function_name='auto_cluster',
payload=payload,
account_id=account.id
)
return {
'success': True,
'task_id': str(task.id),
'message': 'Clustering started'
}
else:
# Celery not available - execute synchronously
result = run_ai_task(
function_name='auto_cluster',
payload=payload,
account_id=account.id
)
return result
except Exception as e:
logger.error(f"Error in cluster_keywords: {str(e)}", exc_info=True)
return {
'success': False,
'error': str(e)
}

View File

@@ -0,0 +1,90 @@
"""
Ideas Service
Handles content ideas generation business logic
"""
import logging
from igny8_core.business.planning.models import Clusters
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
logger = logging.getLogger(__name__)
class IdeasService:
"""Service for content ideas generation operations"""
def __init__(self):
self.credit_service = CreditService()
def generate_ideas(self, cluster_ids, account):
"""
Generate content ideas from clusters.
Args:
cluster_ids: List of cluster IDs
account: Account instance
Returns:
dict: Result with success status and data
Raises:
InsufficientCreditsError: If account doesn't have enough credits
"""
# Validate input
if not cluster_ids:
return {
'success': False,
'error': 'No cluster IDs provided'
}
if len(cluster_ids) > 10:
return {
'success': False,
'error': 'Maximum 10 clusters allowed for idea generation'
}
# Get clusters to count ideas
clusters = Clusters.objects.filter(id__in=cluster_ids, account=account)
idea_count = len(cluster_ids)
# Check credits
try:
self.credit_service.check_credits(account, 'idea_generation', idea_count)
except InsufficientCreditsError:
raise
# Delegate to AI task
from igny8_core.ai.tasks import run_ai_task
payload = {
'ids': cluster_ids
}
try:
if hasattr(run_ai_task, 'delay'):
# Celery available - queue async
task = run_ai_task.delay(
function_name='auto_generate_ideas',
payload=payload,
account_id=account.id
)
return {
'success': True,
'task_id': str(task.id),
'message': 'Idea generation started'
}
else:
# Celery not available - execute synchronously
result = run_ai_task(
function_name='auto_generate_ideas',
payload=payload,
account_id=account.id
)
return result
except Exception as e:
logger.error(f"Error in generate_ideas: {str(e)}", exc_info=True)
return {
'success': False,
'error': str(e)
}

View File

@@ -0,0 +1,5 @@
"""
Publishing Domain
Phase 5: Sites Renderer & Publishing
"""

View File

@@ -0,0 +1,12 @@
"""
Publishing App Configuration
"""
from django.apps import AppConfig
class PublishingConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'igny8_core.business.publishing'
label = 'publishing'
verbose_name = 'Publishing'

View File

@@ -0,0 +1,98 @@
# Generated manually for Phase 5: Publishing System
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
initial = True
dependencies = [
('igny8_core_auth', '0014_remove_plan_operation_limits_phase0'),
('site_building', '0001_initial'),
('writer', '0009_add_content_site_source_fields'),
]
operations = [
migrations.CreateModel(
name='PublishingRecord',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True, db_index=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('destination', models.CharField(db_index=True, help_text="Destination platform: 'wordpress', 'sites', 'shopify'", max_length=50)),
('destination_id', models.CharField(blank=True, help_text='External ID in destination platform', max_length=255, null=True)),
('destination_url', models.URLField(blank=True, help_text='URL of published content/site', null=True)),
('status', models.CharField(choices=[('pending', 'Pending'), ('publishing', 'Publishing'), ('published', 'Published'), ('failed', 'Failed')], db_index=True, default='pending', max_length=20)),
('published_at', models.DateTimeField(blank=True, null=True)),
('error_message', models.TextField(blank=True, null=True)),
('metadata', models.JSONField(default=dict, help_text='Platform-specific metadata')),
('account', models.ForeignKey(db_column='tenant_id', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)s_set', to='igny8_core_auth.tenant')),
('content', models.ForeignKey(blank=True, help_text='Content being published (if publishing content)', null=True, on_delete=django.db.models.deletion.CASCADE, related_name='publishing_records', to='writer.content')),
('site', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='%(class)s_set', to='igny8_core_auth.site')),
('sector', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='%(class)s_set', to='igny8_core_auth.sector')),
('site_blueprint', models.ForeignKey(blank=True, help_text='Site blueprint being published (if publishing site)', null=True, on_delete=django.db.models.deletion.CASCADE, related_name='publishing_records', to='site_building.siteblueprint')),
],
options={
'db_table': 'igny8_publishing_records',
'ordering': ['-created_at'],
},
),
migrations.CreateModel(
name='DeploymentRecord',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True, db_index=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('version', models.IntegerField(help_text='Blueprint version being deployed')),
('deployed_version', models.IntegerField(blank=True, help_text='Currently deployed version (after successful deployment)', null=True)),
('status', models.CharField(choices=[('pending', 'Pending'), ('deploying', 'Deploying'), ('deployed', 'Deployed'), ('failed', 'Failed'), ('rolled_back', 'Rolled Back')], db_index=True, default='pending', max_length=20)),
('deployed_at', models.DateTimeField(blank=True, null=True)),
('deployment_url', models.URLField(blank=True, help_text='Public URL of deployed site', null=True)),
('error_message', models.TextField(blank=True, null=True)),
('metadata', models.JSONField(default=dict, help_text='Deployment metadata (build info, file paths, etc.)')),
('account', models.ForeignKey(db_column='tenant_id', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)s_set', to='igny8_core_auth.tenant')),
('site', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='%(class)s_set', to='igny8_core_auth.site')),
('sector', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='%(class)s_set', to='igny8_core_auth.sector')),
('site_blueprint', models.ForeignKey(help_text='Site blueprint being deployed', on_delete=django.db.models.deletion.CASCADE, related_name='deployments', to='site_building.siteblueprint')),
],
options={
'db_table': 'igny8_deployment_records',
'ordering': ['-created_at'],
},
),
migrations.AddIndex(
model_name='publishingrecord',
index=models.Index(fields=['destination', 'status'], name='igny8_publi_destina_123abc_idx'),
),
migrations.AddIndex(
model_name='publishingrecord',
index=models.Index(fields=['content', 'destination'], name='igny8_publi_content_456def_idx'),
),
migrations.AddIndex(
model_name='publishingrecord',
index=models.Index(fields=['site_blueprint', 'destination'], name='igny8_publi_site_bl_789ghi_idx'),
),
migrations.AddIndex(
model_name='publishingrecord',
index=models.Index(fields=['account', 'status'], name='igny8_publi_account_012jkl_idx'),
),
migrations.AddIndex(
model_name='deploymentrecord',
index=models.Index(fields=['site_blueprint', 'status'], name='igny8_deplo_site_bl_345mno_idx'),
),
migrations.AddIndex(
model_name='deploymentrecord',
index=models.Index(fields=['site_blueprint', 'version'], name='igny8_deplo_site_bl_678pqr_idx'),
),
migrations.AddIndex(
model_name='deploymentrecord',
index=models.Index(fields=['status'], name='igny8_deplo_status_901stu_idx'),
),
migrations.AddIndex(
model_name='deploymentrecord',
index=models.Index(fields=['account', 'status'], name='igny8_deplo_account_234vwx_idx'),
),
]

View File

@@ -0,0 +1,4 @@
"""
Publishing Migrations
"""

View File

@@ -0,0 +1,159 @@
"""
Publishing Models
Phase 5: Sites Renderer & Publishing
"""
from django.db import models
from igny8_core.auth.models import SiteSectorBaseModel
class PublishingRecord(SiteSectorBaseModel):
"""
Track content publishing to various destinations.
"""
STATUS_CHOICES = [
('pending', 'Pending'),
('publishing', 'Publishing'),
('published', 'Published'),
('failed', 'Failed'),
]
# Content or SiteBlueprint reference (one must be set)
content = models.ForeignKey(
'content.Content',
on_delete=models.CASCADE,
null=True,
blank=True,
related_name='publishing_records',
help_text="Content being published (if publishing content)"
)
site_blueprint = models.ForeignKey(
'site_building.SiteBlueprint',
on_delete=models.CASCADE,
null=True,
blank=True,
related_name='publishing_records',
help_text="Site blueprint being published (if publishing site)"
)
# Destination information
destination = models.CharField(
max_length=50,
db_index=True,
help_text="Destination platform: 'wordpress', 'sites', 'shopify'"
)
destination_id = models.CharField(
max_length=255,
blank=True,
null=True,
help_text="External ID in destination platform"
)
destination_url = models.URLField(
blank=True,
null=True,
help_text="URL of published content/site"
)
# Status tracking
status = models.CharField(
max_length=20,
choices=STATUS_CHOICES,
default='pending',
db_index=True
)
published_at = models.DateTimeField(null=True, blank=True)
error_message = models.TextField(blank=True, null=True)
# Metadata
metadata = models.JSONField(
default=dict,
help_text="Platform-specific metadata"
)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
app_label = 'publishing'
db_table = 'igny8_publishing_records'
ordering = ['-created_at']
indexes = [
models.Index(fields=['destination', 'status']),
models.Index(fields=['content', 'destination']),
models.Index(fields=['site_blueprint', 'destination']),
models.Index(fields=['account', 'status']),
]
def __str__(self):
target = self.content or self.site_blueprint
return f"{target}{self.destination} ({self.get_status_display()})"
class DeploymentRecord(SiteSectorBaseModel):
"""
Track site deployments to Sites renderer.
"""
STATUS_CHOICES = [
('pending', 'Pending'),
('deploying', 'Deploying'),
('deployed', 'Deployed'),
('failed', 'Failed'),
('rolled_back', 'Rolled Back'),
]
site_blueprint = models.ForeignKey(
'site_building.SiteBlueprint',
on_delete=models.CASCADE,
related_name='deployments',
help_text="Site blueprint being deployed"
)
# Version tracking
version = models.IntegerField(
help_text="Blueprint version being deployed"
)
deployed_version = models.IntegerField(
null=True,
blank=True,
help_text="Currently deployed version (after successful deployment)"
)
# Status tracking
status = models.CharField(
max_length=20,
choices=STATUS_CHOICES,
default='pending',
db_index=True
)
deployed_at = models.DateTimeField(null=True, blank=True)
deployment_url = models.URLField(
blank=True,
null=True,
help_text="Public URL of deployed site"
)
error_message = models.TextField(blank=True, null=True)
# Metadata
metadata = models.JSONField(
default=dict,
help_text="Deployment metadata (build info, file paths, etc.)"
)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
app_label = 'publishing'
db_table = 'igny8_deployment_records'
ordering = ['-created_at']
indexes = [
models.Index(fields=['site_blueprint', 'status']),
models.Index(fields=['site_blueprint', 'version']),
models.Index(fields=['status']),
models.Index(fields=['account', 'status']),
]
def __str__(self):
return f"{self.site_blueprint.name} v{self.version} ({self.get_status_display()})"

View File

@@ -0,0 +1,4 @@
"""
Publishing Services
"""

View File

@@ -0,0 +1,4 @@
"""
Publishing Adapters
"""

View File

@@ -0,0 +1,203 @@
"""
Sites Renderer Adapter
Phase 5: Sites Renderer & Publishing
Adapter for deploying sites to IGNY8 Sites renderer.
"""
import logging
import json
import os
from typing import Dict, Any
from pathlib import Path
from igny8_core.business.site_building.models import SiteBlueprint
from igny8_core.business.publishing.models import DeploymentRecord
logger = logging.getLogger(__name__)
class SitesRendererAdapter:
"""
Adapter for deploying sites to IGNY8 Sites renderer.
Writes site definitions to filesystem for Sites container to serve.
"""
def __init__(self):
self.sites_data_path = os.getenv('SITES_DATA_PATH', '/data/app/sites-data')
def deploy(self, site_blueprint: SiteBlueprint) -> Dict[str, Any]:
"""
Deploy site blueprint to Sites renderer.
Args:
site_blueprint: SiteBlueprint instance to deploy
Returns:
dict: Deployment result with status and deployment record
"""
try:
# Create deployment record
deployment = DeploymentRecord.objects.create(
account=site_blueprint.account,
site=site_blueprint.site,
sector=site_blueprint.sector,
site_blueprint=site_blueprint,
version=site_blueprint.version,
status='deploying'
)
# Build site definition
site_definition = self._build_site_definition(site_blueprint)
# Write to filesystem
deployment_path = self._write_site_definition(
site_blueprint,
site_definition,
deployment.version
)
# Update deployment record
deployment.status = 'deployed'
deployment.deployed_version = site_blueprint.version
deployment.deployment_url = self._get_deployment_url(site_blueprint)
deployment.metadata = {
'deployment_path': str(deployment_path),
'site_definition': site_definition
}
deployment.save()
# Update blueprint
site_blueprint.deployed_version = site_blueprint.version
site_blueprint.status = 'deployed'
site_blueprint.save(update_fields=['deployed_version', 'status', 'updated_at'])
logger.info(
f"[SitesRendererAdapter] Successfully deployed site {site_blueprint.id} v{deployment.version}"
)
return {
'success': True,
'deployment_id': deployment.id,
'version': deployment.version,
'deployment_url': deployment.deployment_url,
'deployment_path': str(deployment_path)
}
except Exception as e:
logger.error(
f"[SitesRendererAdapter] Error deploying site {site_blueprint.id}: {str(e)}",
exc_info=True
)
# Update deployment record with error
if 'deployment' in locals():
deployment.status = 'failed'
deployment.error_message = str(e)
deployment.save()
return {
'success': False,
'error': str(e)
}
def _build_site_definition(self, site_blueprint: SiteBlueprint) -> Dict[str, Any]:
"""
Build site definition JSON from blueprint.
Args:
site_blueprint: SiteBlueprint instance
Returns:
dict: Site definition structure
"""
# Get all pages
pages = []
for page in site_blueprint.pages.all().order_by('order'):
pages.append({
'id': page.id,
'slug': page.slug,
'title': page.title,
'type': page.type,
'blocks': page.blocks_json,
'status': page.status,
})
# Build site definition
definition = {
'id': site_blueprint.id,
'name': site_blueprint.name,
'description': site_blueprint.description,
'version': site_blueprint.version,
'layout': site_blueprint.structure_json.get('layout', 'default'),
'theme': site_blueprint.structure_json.get('theme', {}),
'navigation': site_blueprint.structure_json.get('navigation', []),
'pages': pages,
'config': site_blueprint.config_json,
'created_at': site_blueprint.created_at.isoformat(),
'updated_at': site_blueprint.updated_at.isoformat(),
}
return definition
def _write_site_definition(
self,
site_blueprint: SiteBlueprint,
site_definition: Dict[str, Any],
version: int
) -> Path:
"""
Write site definition to filesystem.
Args:
site_blueprint: SiteBlueprint instance
site_definition: Site definition dict
version: Version number
Returns:
Path: Deployment path
"""
# Build path: /data/app/sites-data/clients/{site_id}/v{version}/
site_id = site_blueprint.site.id
deployment_dir = Path(self.sites_data_path) / 'clients' / str(site_id) / f'v{version}'
deployment_dir.mkdir(parents=True, exist_ok=True)
# Write site.json
site_json_path = deployment_dir / 'site.json'
with open(site_json_path, 'w', encoding='utf-8') as f:
json.dump(site_definition, f, indent=2, ensure_ascii=False)
# Write pages
pages_dir = deployment_dir / 'pages'
pages_dir.mkdir(exist_ok=True)
for page in site_definition.get('pages', []):
page_json_path = pages_dir / f"{page['slug']}.json"
with open(page_json_path, 'w', encoding='utf-8') as f:
json.dump(page, f, indent=2, ensure_ascii=False)
# Ensure assets directory exists
assets_dir = deployment_dir / 'assets'
assets_dir.mkdir(exist_ok=True)
(assets_dir / 'images').mkdir(exist_ok=True)
(assets_dir / 'documents').mkdir(exist_ok=True)
(assets_dir / 'media').mkdir(exist_ok=True)
logger.info(f"[SitesRendererAdapter] Wrote site definition to {deployment_dir}")
return deployment_dir
def _get_deployment_url(self, site_blueprint: SiteBlueprint) -> str:
"""
Get deployment URL for site.
Args:
site_blueprint: SiteBlueprint instance
Returns:
str: Deployment URL
"""
# TODO: Implement URL generation based on site configuration
# For now, return placeholder
site_id = site_blueprint.site.id
return f"https://{site_id}.igny8.com" # Placeholder

View File

@@ -0,0 +1,140 @@
"""
Deployment Service
Phase 5: Sites Renderer & Publishing
Manages deployment lifecycle for sites.
"""
import logging
from typing import Optional
from igny8_core.business.site_building.models import SiteBlueprint
from igny8_core.business.publishing.models import DeploymentRecord
logger = logging.getLogger(__name__)
class DeploymentService:
"""
Service for managing site deployment lifecycle.
"""
def get_status(self, site_blueprint: SiteBlueprint) -> Optional[DeploymentRecord]:
"""
Get current deployment status for a site.
Args:
site_blueprint: SiteBlueprint instance
Returns:
DeploymentRecord or None
"""
return DeploymentRecord.objects.filter(
site_blueprint=site_blueprint,
status='deployed'
).order_by('-deployed_at').first()
def get_latest_deployment(
self,
site_blueprint: SiteBlueprint
) -> Optional[DeploymentRecord]:
"""
Get latest deployment record (any status).
Args:
site_blueprint: SiteBlueprint instance
Returns:
DeploymentRecord or None
"""
return DeploymentRecord.objects.filter(
site_blueprint=site_blueprint
).order_by('-created_at').first()
def rollback(
self,
site_blueprint: SiteBlueprint,
target_version: int
) -> dict:
"""
Rollback site to a previous version.
Args:
site_blueprint: SiteBlueprint instance
target_version: Version to rollback to
Returns:
dict: Rollback result
"""
try:
# Find deployment record for target version
target_deployment = DeploymentRecord.objects.filter(
site_blueprint=site_blueprint,
version=target_version,
status='deployed'
).first()
if not target_deployment:
return {
'success': False,
'error': f'Deployment for version {target_version} not found'
}
# Create new deployment record for rollback
rollback_deployment = DeploymentRecord.objects.create(
account=site_blueprint.account,
site=site_blueprint.site,
sector=site_blueprint.sector,
site_blueprint=site_blueprint,
version=target_version,
status='deployed',
deployed_version=target_version,
deployment_url=target_deployment.deployment_url,
metadata={
'rollback_from': site_blueprint.version,
'rollback_to': target_version
}
)
# Update blueprint
site_blueprint.deployed_version = target_version
site_blueprint.save(update_fields=['deployed_version', 'updated_at'])
logger.info(
f"[DeploymentService] Rolled back site {site_blueprint.id} to version {target_version}"
)
return {
'success': True,
'deployment_id': rollback_deployment.id,
'version': target_version
}
except Exception as e:
logger.error(
f"[DeploymentService] Error rolling back site {site_blueprint.id}: {str(e)}",
exc_info=True
)
return {
'success': False,
'error': str(e)
}
def list_deployments(
self,
site_blueprint: SiteBlueprint
) -> list:
"""
List all deployments for a site.
Args:
site_blueprint: SiteBlueprint instance
Returns:
list: List of DeploymentRecord instances
"""
return list(
DeploymentRecord.objects.filter(
site_blueprint=site_blueprint
).order_by('-created_at')
)

View File

@@ -0,0 +1,187 @@
"""
Publisher Service
Phase 5: Sites Renderer & Publishing
Main publishing orchestrator for content and sites.
"""
import logging
from typing import Optional, List, Dict, Any
from igny8_core.business.publishing.models import PublishingRecord, DeploymentRecord
from igny8_core.business.site_building.models import SiteBlueprint
logger = logging.getLogger(__name__)
class PublisherService:
"""
Main publishing service for content and sites.
Routes to appropriate adapters based on destination.
"""
def __init__(self):
self.adapters = {}
self._load_adapters()
def _load_adapters(self):
"""Lazy load adapters to avoid circular imports"""
pass # Will be implemented when adapters are created
def publish_to_sites(self, site_blueprint: SiteBlueprint) -> Dict[str, Any]:
"""
Publish site to Sites renderer.
Args:
site_blueprint: SiteBlueprint instance to deploy
Returns:
dict: Deployment result with status and deployment record
"""
from igny8_core.business.publishing.services.adapters.sites_renderer_adapter import SitesRendererAdapter
adapter = SitesRendererAdapter()
return adapter.deploy(site_blueprint)
def get_deployment_status(self, site_blueprint: SiteBlueprint) -> Optional[DeploymentRecord]:
"""
Get deployment status for a site.
Args:
site_blueprint: SiteBlueprint instance
Returns:
DeploymentRecord or None
"""
from igny8_core.business.publishing.services.deployment_service import DeploymentService
service = DeploymentService()
return service.get_status(site_blueprint)
def publish_content(
self,
content_id: int,
destinations: List[str],
account
) -> Dict[str, Any]:
"""
Publish content to multiple destinations.
Args:
content_id: Content ID to publish
destinations: List of destination platforms ('wordpress', 'sites', 'shopify')
account: Account instance
Returns:
dict: Publishing results per destination
"""
from igny8_core.business.content.models import Content
try:
content = Content.objects.get(id=content_id, account=account)
except Content.DoesNotExist:
return {
'success': False,
'error': f'Content {content_id} not found'
}
results = []
for destination in destinations:
try:
result = self._publish_to_destination(content, destination, account)
results.append(result)
except Exception as e:
logger.error(
f"Error publishing content {content_id} to {destination}: {str(e)}",
exc_info=True
)
results.append({
'destination': destination,
'success': False,
'error': str(e)
})
return {
'success': all(r.get('success', False) for r in results),
'results': results
}
def _publish_to_destination(
self,
content,
destination: str,
account
) -> Dict[str, Any]:
"""
Publish content to a specific destination.
Args:
content: Content instance
destination: Destination platform name
account: Account instance
Returns:
dict: Publishing result
"""
# Create publishing record
record = PublishingRecord.objects.create(
account=account,
site=content.site,
sector=content.sector,
content=content,
destination=destination,
status='pending'
)
try:
# Get adapter for destination
adapter = self._get_adapter(destination)
if not adapter:
raise ValueError(f"No adapter found for destination: {destination}")
# Publish via adapter
result = adapter.publish(content, {'account': account})
# Update record
record.status = 'published' if result.get('success') else 'failed'
record.destination_id = result.get('external_id')
record.destination_url = result.get('url')
record.published_at = result.get('published_at')
record.error_message = result.get('error')
record.metadata = result.get('metadata', {})
record.save()
return {
'destination': destination,
'success': result.get('success', False),
'publishing_record_id': record.id,
'external_id': result.get('external_id'),
'url': result.get('url')
}
except Exception as e:
record.status = 'failed'
record.error_message = str(e)
record.save()
raise
def _get_adapter(self, destination: str):
"""
Get adapter for destination platform.
Args:
destination: Platform name ('wordpress', 'sites', 'shopify')
Returns:
Adapter instance or None
"""
# Lazy import to avoid circular dependencies
if destination == 'sites':
from igny8_core.business.publishing.services.adapters.sites_renderer_adapter import SitesRendererAdapter
return SitesRendererAdapter()
elif destination == 'wordpress':
# Will be implemented in Phase 6
return None
elif destination == 'shopify':
# Will be implemented in Phase 6
return None
return None

View File

@@ -0,0 +1,6 @@
"""
Site Building Business Logic
Phase 3: Site Builder
"""
default_app_config = 'igny8_core.business.site_building.apps.SiteBuildingConfig'

View File

@@ -0,0 +1,9 @@
from django.apps import AppConfig
class SiteBuildingConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'igny8_core.business.site_building'
verbose_name = 'Site Building'

View File

@@ -0,0 +1,95 @@
from django.db import migrations, models
import django.db.models.deletion
import django.core.validators
class Migration(migrations.Migration):
initial = True
dependencies = [
('igny8_core_auth', '0014_remove_plan_operation_limits_phase0'),
('writer', '0009_add_content_site_source_fields'),
]
operations = [
migrations.CreateModel(
name='SiteBlueprint',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True, db_index=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('name', models.CharField(help_text='Site name', max_length=255)),
('description', models.TextField(blank=True, help_text='Site description', null=True)),
('config_json', models.JSONField(default=dict, help_text='Wizard configuration: business_type, style, objectives, etc.')),
('structure_json', models.JSONField(default=dict, help_text='AI-generated structure: pages, layout, theme, etc.')),
('status', models.CharField(choices=[('draft', 'Draft'), ('generating', 'Generating'), ('ready', 'Ready'), ('deployed', 'Deployed')], db_index=True, default='draft', help_text='Blueprint status', max_length=20)),
('hosting_type', models.CharField(choices=[('igny8_sites', 'IGNY8 Sites'), ('wordpress', 'WordPress'), ('shopify', 'Shopify'), ('multi', 'Multiple Destinations')], default='igny8_sites', help_text='Target hosting platform', max_length=50)),
('version', models.IntegerField(default=1, help_text='Blueprint version', validators=[django.core.validators.MinValueValidator(1)])),
('deployed_version', models.IntegerField(blank=True, help_text='Currently deployed version', null=True)),
('account', models.ForeignKey(db_column='tenant_id', on_delete=django.db.models.deletion.CASCADE, related_name='siteblueprint_set', to='igny8_core_auth.tenant')),
('sector', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='siteblueprint_set', to='igny8_core_auth.sector')),
('site', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='siteblueprint_set', to='igny8_core_auth.site')),
],
options={
'verbose_name': 'Site Blueprint',
'verbose_name_plural': 'Site Blueprints',
'db_table': 'igny8_site_blueprints',
'ordering': ['-created_at'],
},
),
migrations.CreateModel(
name='PageBlueprint',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True, db_index=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('slug', models.SlugField(help_text='Page URL slug', max_length=255)),
('title', models.CharField(help_text='Page title', max_length=255)),
('type', models.CharField(choices=[('home', 'Home'), ('about', 'About'), ('services', 'Services'), ('products', 'Products'), ('blog', 'Blog'), ('contact', 'Contact'), ('custom', 'Custom')], default='custom', help_text='Page type', max_length=50)),
('blocks_json', models.JSONField(default=list, help_text="Page content blocks: [{'type': 'hero', 'data': {...}}, ...]")),
('status', models.CharField(choices=[('draft', 'Draft'), ('generating', 'Generating'), ('ready', 'Ready')], db_index=True, default='draft', help_text='Page status', max_length=20)),
('order', models.IntegerField(default=0, help_text='Page order in navigation')),
('account', models.ForeignKey(db_column='tenant_id', on_delete=django.db.models.deletion.CASCADE, related_name='pageblueprint_set', to='igny8_core_auth.tenant')),
('sector', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='pageblueprint_set', to='igny8_core_auth.sector')),
('site', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='pageblueprint_set', to='igny8_core_auth.site')),
('site_blueprint', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='pages', to='site_building.siteblueprint')),
],
options={
'verbose_name': 'Page Blueprint',
'verbose_name_plural': 'Page Blueprints',
'db_table': 'igny8_page_blueprints',
'ordering': ['order', 'created_at'],
'unique_together': {('site_blueprint', 'slug')},
},
),
migrations.AddIndex(
model_name='siteblueprint',
index=models.Index(fields=['status'], name='igny8_site__status_247ddc_idx'),
),
migrations.AddIndex(
model_name='siteblueprint',
index=models.Index(fields=['hosting_type'], name='igny8_site__hosting_c4bb41_idx'),
),
migrations.AddIndex(
model_name='siteblueprint',
index=models.Index(fields=['site', 'sector'], name='igny8_site__site_id__5f0a4e_idx'),
),
migrations.AddIndex(
model_name='siteblueprint',
index=models.Index(fields=['account', 'status'], name='igny8_site__account__38f18a_idx'),
),
migrations.AddIndex(
model_name='pageblueprint',
index=models.Index(fields=['site_blueprint', 'status'], name='igny8_page__site_bl_1b5d8b_idx'),
),
migrations.AddIndex(
model_name='pageblueprint',
index=models.Index(fields=['type'], name='igny8_page__type_b11552_idx'),
),
migrations.AddIndex(
model_name='pageblueprint',
index=models.Index(fields=['site_blueprint', 'order'], name='igny8_page__site_bl_7a77d7_idx'),
),
]

View File

@@ -0,0 +1,2 @@

View File

@@ -0,0 +1,168 @@
"""
Site Builder Models
Phase 3: Site Builder
"""
from django.db import models
from django.core.validators import MinValueValidator
from igny8_core.auth.models import SiteSectorBaseModel
class SiteBlueprint(SiteSectorBaseModel):
"""
Site Blueprint model for storing AI-generated site structures.
"""
STATUS_CHOICES = [
('draft', 'Draft'),
('generating', 'Generating'),
('ready', 'Ready'),
('deployed', 'Deployed'),
]
HOSTING_TYPE_CHOICES = [
('igny8_sites', 'IGNY8 Sites'),
('wordpress', 'WordPress'),
('shopify', 'Shopify'),
('multi', 'Multiple Destinations'),
]
name = models.CharField(max_length=255, help_text="Site name")
description = models.TextField(blank=True, null=True, help_text="Site description")
# Site configuration (from wizard)
config_json = models.JSONField(
default=dict,
help_text="Wizard configuration: business_type, style, objectives, etc."
)
# Generated structure (from AI)
structure_json = models.JSONField(
default=dict,
help_text="AI-generated structure: pages, layout, theme, etc."
)
# Status tracking
status = models.CharField(
max_length=20,
choices=STATUS_CHOICES,
default='draft',
db_index=True,
help_text="Blueprint status"
)
# Hosting configuration
hosting_type = models.CharField(
max_length=50,
choices=HOSTING_TYPE_CHOICES,
default='igny8_sites',
help_text="Target hosting platform"
)
# Version tracking
version = models.IntegerField(default=1, validators=[MinValueValidator(1)], help_text="Blueprint version")
deployed_version = models.IntegerField(null=True, blank=True, help_text="Currently deployed version")
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
app_label = 'site_building'
db_table = 'igny8_site_blueprints'
ordering = ['-created_at']
verbose_name = 'Site Blueprint'
verbose_name_plural = 'Site Blueprints'
indexes = [
models.Index(fields=['status']),
models.Index(fields=['hosting_type']),
models.Index(fields=['site', 'sector']),
models.Index(fields=['account', 'status']),
]
def __str__(self):
return f"{self.name} ({self.get_status_display()})"
class PageBlueprint(SiteSectorBaseModel):
"""
Page Blueprint model for storing individual page definitions.
"""
PAGE_TYPE_CHOICES = [
('home', 'Home'),
('about', 'About'),
('services', 'Services'),
('products', 'Products'),
('blog', 'Blog'),
('contact', 'Contact'),
('custom', 'Custom'),
]
STATUS_CHOICES = [
('draft', 'Draft'),
('generating', 'Generating'),
('ready', 'Ready'),
]
site_blueprint = models.ForeignKey(
SiteBlueprint,
on_delete=models.CASCADE,
related_name='pages',
help_text="The site blueprint this page belongs to"
)
slug = models.SlugField(max_length=255, help_text="Page URL slug")
title = models.CharField(max_length=255, help_text="Page title")
# Page type
type = models.CharField(
max_length=50,
choices=PAGE_TYPE_CHOICES,
default='custom',
help_text="Page type"
)
# Page content (blocks)
blocks_json = models.JSONField(
default=list,
help_text="Page content blocks: [{'type': 'hero', 'data': {...}}, ...]"
)
# Status
status = models.CharField(
max_length=20,
choices=STATUS_CHOICES,
default='draft',
db_index=True,
help_text="Page status"
)
# Order
order = models.IntegerField(default=0, help_text="Page order in navigation")
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
app_label = 'site_building'
db_table = 'igny8_page_blueprints'
ordering = ['order', 'created_at']
verbose_name = 'Page Blueprint'
verbose_name_plural = 'Page Blueprints'
unique_together = [['site_blueprint', 'slug']]
indexes = [
models.Index(fields=['site_blueprint', 'status']),
models.Index(fields=['type']),
models.Index(fields=['site_blueprint', 'order']),
]
def save(self, *args, **kwargs):
"""Automatically set account, site, and sector from site_blueprint"""
if self.site_blueprint:
self.account = self.site_blueprint.account
self.site = self.site_blueprint.site
self.sector = self.site_blueprint.sector
super().save(*args, **kwargs)
def __str__(self):
return f"{self.title} ({self.site_blueprint.name})"

View File

@@ -0,0 +1,13 @@
"""
Site Building Services
"""
from igny8_core.business.site_building.services.file_management_service import SiteBuilderFileService
from igny8_core.business.site_building.services.structure_generation_service import StructureGenerationService
from igny8_core.business.site_building.services.page_generation_service import PageGenerationService
__all__ = [
'SiteBuilderFileService',
'StructureGenerationService',
'PageGenerationService',
]

View File

@@ -0,0 +1,264 @@
"""
Site File Management Service
Manages file uploads, deletions, and access control for site assets
"""
import logging
import os
from pathlib import Path
from typing import List, Dict, Optional
from django.core.exceptions import PermissionDenied, ValidationError
from igny8_core.auth.models import User, Site
logger = logging.getLogger(__name__)
# Base path for site files
SITES_DATA_BASE = Path('/data/app/sites-data/clients')
class SiteBuilderFileService:
"""Service for managing site files and assets"""
def __init__(self):
self.base_path = SITES_DATA_BASE
self.max_file_size = 10 * 1024 * 1024 # 10MB per file
self.max_storage_per_site = 100 * 1024 * 1024 # 100MB per site
def get_user_accessible_sites(self, user: User) -> List[Site]:
"""
Get sites user can access for file management.
Args:
user: User instance
Returns:
List of Site instances user can access
"""
# Owner/Admin: Full access to all account sites
if user.is_owner_or_admin():
return Site.objects.filter(account=user.account, is_active=True)
# Editor/Viewer: Access to granted sites (via SiteUserAccess)
# TODO: Implement SiteUserAccess check when available
return Site.objects.filter(account=user.account, is_active=True)
def check_file_access(self, user: User, site_id: int) -> bool:
"""
Check if user can access site's files.
Args:
user: User instance
site_id: Site ID
Returns:
True if user has access, False otherwise
"""
accessible_sites = self.get_user_accessible_sites(user)
return any(site.id == site_id for site in accessible_sites)
def get_site_files_path(self, site_id: int, version: int = 1) -> Path:
"""
Get site's files directory path.
Args:
site_id: Site ID
version: Site version (default: 1)
Returns:
Path object for site files directory
"""
return self.base_path / str(site_id) / f"v{version}" / "assets"
def check_storage_quota(self, site_id: int, file_size: int) -> bool:
"""
Check if site has enough storage quota.
Args:
site_id: Site ID
file_size: Size of file to upload in bytes
Returns:
True if quota available, False otherwise
"""
site_path = self.get_site_files_path(site_id)
# Calculate current storage usage
current_usage = self._calculate_storage_usage(site_path)
# Check if adding file would exceed quota
return (current_usage + file_size) <= self.max_storage_per_site
def _calculate_storage_usage(self, site_path: Path) -> int:
"""Calculate current storage usage for a site"""
if not site_path.exists():
return 0
total_size = 0
for file_path in site_path.rglob('*'):
if file_path.is_file():
total_size += file_path.stat().st_size
return total_size
def upload_file(
self,
user: User,
site_id: int,
file,
folder: str = 'images',
version: int = 1
) -> Dict:
"""
Upload file to site's assets folder.
Args:
user: User instance
site_id: Site ID
file: Django UploadedFile instance
folder: Subfolder name (images, documents, media)
version: Site version
Returns:
Dict with file_path, file_url, file_size
Raises:
PermissionDenied: If user doesn't have access
ValidationError: If file size exceeds limit or quota exceeded
"""
# Check access
if not self.check_file_access(user, site_id):
raise PermissionDenied("No access to this site")
# Check file size
if file.size > self.max_file_size:
raise ValidationError(f"File size exceeds maximum of {self.max_file_size / 1024 / 1024}MB")
# Check storage quota
if not self.check_storage_quota(site_id, file.size):
raise ValidationError("Storage quota exceeded")
# Get target directory
site_path = self.get_site_files_path(site_id, version)
target_dir = site_path / folder
target_dir.mkdir(parents=True, exist_ok=True)
# Save file
file_path = target_dir / file.name
with open(file_path, 'wb') as f:
for chunk in file.chunks():
f.write(chunk)
# Generate file URL (relative to site assets)
file_url = f"/sites/{site_id}/v{version}/assets/{folder}/{file.name}"
logger.info(f"Uploaded file {file.name} to site {site_id}/{folder}")
return {
'file_path': str(file_path),
'file_url': file_url,
'file_size': file.size,
'folder': folder
}
def delete_file(
self,
user: User,
site_id: int,
file_path: str,
version: int = 1
) -> bool:
"""
Delete file from site's assets.
Args:
user: User instance
site_id: Site ID
file_path: Relative file path (e.g., 'images/photo.jpg')
version: Site version
Returns:
True if deleted, False otherwise
Raises:
PermissionDenied: If user doesn't have access
"""
# Check access
if not self.check_file_access(user, site_id):
raise PermissionDenied("No access to this site")
# Get full file path
site_path = self.get_site_files_path(site_id, version)
full_path = site_path / file_path
# Check if file exists and is within site directory
if not full_path.exists() or not str(full_path).startswith(str(site_path)):
return False
# Delete file
full_path.unlink()
logger.info(f"Deleted file {file_path} from site {site_id}")
return True
def list_files(
self,
user: User,
site_id: int,
folder: Optional[str] = None,
version: int = 1
) -> List[Dict]:
"""
List files in site's assets.
Args:
user: User instance
site_id: Site ID
folder: Optional folder to list (None = all folders)
version: Site version
Returns:
List of file dicts with: name, path, size, folder, url
Raises:
PermissionDenied: If user doesn't have access
"""
# Check access
if not self.check_file_access(user, site_id):
raise PermissionDenied("No access to this site")
site_path = self.get_site_files_path(site_id, version)
if not site_path.exists():
return []
files = []
# List files in specified folder or all folders
if folder:
folder_path = site_path / folder
if folder_path.exists():
files.extend(self._list_directory(folder_path, folder, site_id, version))
else:
# List all folders
for folder_dir in site_path.iterdir():
if folder_dir.is_dir():
files.extend(self._list_directory(folder_dir, folder_dir.name, site_id, version))
return files
def _list_directory(self, directory: Path, folder_name: str, site_id: int, version: int) -> List[Dict]:
"""List files in a directory"""
files = []
for file_path in directory.iterdir():
if file_path.is_file():
file_url = f"/sites/{site_id}/v{version}/assets/{folder_name}/{file_path.name}"
files.append({
'name': file_path.name,
'path': f"{folder_name}/{file_path.name}",
'size': file_path.stat().st_size,
'folder': folder_name,
'url': file_url
})
return files

View File

@@ -0,0 +1,149 @@
"""
Page Generation Service
Leverages the Writer ContentGenerationService to draft page copy for Site Builder blueprints.
"""
import logging
from typing import Optional
from django.db import transaction
from igny8_core.business.content.models import Tasks
from igny8_core.business.content.services.content_generation_service import ContentGenerationService
from igny8_core.business.site_building.models import PageBlueprint
logger = logging.getLogger(__name__)
class PageGenerationService:
"""
Thin wrapper that converts Site Builder pages into writer tasks and reuses the
existing content generation pipeline. This keeps content authoring logic
inside the Writer module while Site Builder focuses on structure.
"""
def __init__(self):
self.content_service = ContentGenerationService()
def generate_page_content(self, page_blueprint: PageBlueprint, force_regenerate: bool = False) -> dict:
"""
Generate (or regenerate) content for a single Site Builder page.
Args:
page_blueprint: Target PageBlueprint instance.
force_regenerate: If True, resets any temporary task data.
"""
if not page_blueprint:
raise ValueError("Page blueprint is required")
task = self._ensure_task(page_blueprint, force_regenerate=force_regenerate)
# Mark page as generating before handing off to Writer pipeline
page_blueprint.status = 'generating'
page_blueprint.save(update_fields=['status', 'updated_at'])
account = page_blueprint.account
logger.info(
"[PageGenerationService] Triggering content generation for page %s (task %s)",
page_blueprint.id,
task.id,
)
return self.content_service.generate_content([task.id], account)
def regenerate_page(self, page_blueprint: PageBlueprint) -> dict:
"""Force regeneration by dropping the cached task metadata."""
return self.generate_page_content(page_blueprint, force_regenerate=True)
# Internal helpers --------------------------------------------------------
def _ensure_task(self, page_blueprint: PageBlueprint, force_regenerate: bool = False) -> Tasks:
"""
Create or reuse a Writer task that mirrors the given page blueprint.
We rely on a deterministic title pattern to keep the mapping lightweight
without introducing new relations/migrations.
"""
title = self._build_task_title(page_blueprint)
task_qs = Tasks.objects.filter(
account=page_blueprint.account,
site=page_blueprint.site,
sector=page_blueprint.sector,
title=title,
)
if force_regenerate:
task_qs.delete()
else:
existing = task_qs.first()
if existing:
return existing
return self._create_task_from_page(page_blueprint, title)
@transaction.atomic
def _create_task_from_page(self, page_blueprint: PageBlueprint, title: str) -> Tasks:
"""Translate blueprint metadata into a Writer task."""
description_parts = [
f"Site Blueprint: {page_blueprint.site_blueprint.name}",
f"Page Type: {page_blueprint.type}",
]
hero_block = self._first_block_heading(page_blueprint)
if hero_block:
description_parts.append(f"Hero/Primary Heading: {hero_block}")
keywords = self._build_keywords_hint(page_blueprint)
task = Tasks.objects.create(
account=page_blueprint.account,
site=page_blueprint.site,
sector=page_blueprint.sector,
title=title,
description="\n".join(filter(None, description_parts)),
keywords=keywords,
content_structure=self._map_content_structure(page_blueprint.type),
content_type='article',
status='queued',
)
logger.info(
"[PageGenerationService] Created writer task %s for page blueprint %s",
task.id,
page_blueprint.id,
)
return task
def _build_task_title(self, page_blueprint: PageBlueprint) -> str:
base = page_blueprint.title or page_blueprint.slug.replace('-', ' ').title()
return f"[Site Builder] {base}"
def _build_keywords_hint(self, page_blueprint: PageBlueprint) -> str:
keywords = []
if page_blueprint.blocks_json:
for block in page_blueprint.blocks_json:
heading = block.get('heading') if isinstance(block, dict) else None
if heading:
keywords.append(heading)
keywords.append(page_blueprint.slug.replace('-', ' '))
return ", ".join(dict.fromkeys(filter(None, keywords)))
def _map_content_structure(self, page_type: Optional[str]) -> str:
if not page_type:
return 'landing_page'
mapping = {
'home': 'landing_page',
'about': 'supporting_page',
'services': 'pillar_page',
'products': 'pillar_page',
'blog': 'cluster_hub',
'contact': 'supporting_page',
}
return mapping.get(page_type.lower(), 'landing_page')
def _first_block_heading(self, page_blueprint: PageBlueprint) -> Optional[str]:
if not page_blueprint.blocks_json:
return None
for block in page_blueprint.blocks_json:
if isinstance(block, dict):
heading = block.get('heading') or block.get('title')
if heading:
return heading
return None

View File

@@ -0,0 +1,122 @@
"""
Structure Generation Service
Triggers the AI workflow that maps business briefs to page blueprints.
"""
import logging
from typing import Any, Dict, List, Optional
from django.utils import timezone
from igny8_core.business.billing.exceptions import InsufficientCreditsError
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.business.site_building.models import SiteBlueprint
logger = logging.getLogger(__name__)
class StructureGenerationService:
"""Orchestrates AI-powered site structure generation."""
def __init__(self):
self.credit_service = CreditService()
def generate_structure(
self,
site_blueprint: SiteBlueprint,
business_brief: str,
objectives: Optional[List[str]] = None,
style_preferences: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""
Kick off AI structure generation for a single blueprint.
Args:
site_blueprint: Target blueprint instance.
business_brief: Business description / positioning statement.
objectives: Optional list of goals for the new site.
style_preferences: Optional design/style hints.
metadata: Additional free-form context.
"""
if not site_blueprint:
raise ValueError("Site blueprint is required")
account = site_blueprint.account
objectives = objectives or []
style_preferences = style_preferences or {}
metadata = metadata or {}
logger.info(
"[StructureGenerationService] Starting generation for blueprint %s (account %s)",
site_blueprint.id,
getattr(account, 'id', None),
)
# Ensure the account can afford the request
try:
self.credit_service.check_credits(account, 'site_structure_generation')
except InsufficientCreditsError:
site_blueprint.status = 'draft'
site_blueprint.save(update_fields=['status', 'updated_at'])
raise
# Persist the latest inputs for future regenerations
config = site_blueprint.config_json or {}
config.update({
'business_brief': business_brief,
'objectives': objectives,
'style': style_preferences,
'last_requested_at': timezone.now().isoformat(),
'metadata': metadata,
})
site_blueprint.config_json = config
site_blueprint.status = 'generating'
site_blueprint.save(update_fields=['config_json', 'status', 'updated_at'])
payload = {
'ids': [site_blueprint.id],
'business_brief': business_brief,
'objectives': objectives,
'style': style_preferences,
'metadata': metadata,
}
return self._dispatch_ai_task(payload, account_id=account.id)
# Internal helpers --------------------------------------------------------
def _dispatch_ai_task(self, payload: Dict[str, Any], account_id: int) -> Dict[str, Any]:
from igny8_core.ai.tasks import run_ai_task
try:
if hasattr(run_ai_task, 'delay'):
async_result = run_ai_task.delay(
function_name='generate_site_structure',
payload=payload,
account_id=account_id
)
logger.info(
"[StructureGenerationService] Queued AI task %s for account %s",
async_result.id,
account_id,
)
return {
'success': True,
'task_id': str(async_result.id),
'message': 'Site structure generation queued',
}
# Celery not available run synchronously
logger.warning("[StructureGenerationService] Celery unavailable, running synchronously")
return run_ai_task(
function_name='generate_site_structure',
payload=payload,
account_id=account_id
)
except Exception as exc:
logger.error("Failed to dispatch structure generation: %s", exc, exc_info=True)
return {
'success': False,
'error': str(exc),
}

View File

@@ -0,0 +1,2 @@

View File

@@ -0,0 +1,90 @@
from __future__ import annotations
from decimal import Decimal
from django.test import TestCase
from igny8_core.auth.models import (
Account,
Industry,
IndustrySector,
Plan,
Sector,
Site,
User,
)
from igny8_core.business.site_building.models import PageBlueprint, SiteBlueprint
class SiteBuilderTestBase(TestCase):
"""
Provides a lightweight set of fixtures (account/site/sector/blueprint)
so Site Builder tests can focus on service logic instead of boilerplate.
"""
def setUp(self):
super().setUp()
self.plan = Plan.objects.create(
name='Test Plan',
slug='test-plan',
price=Decimal('0.00'),
included_credits=1000,
)
self.user = User.objects.create_user(
username='blueprint-owner',
email='owner@example.com',
password='testpass123',
role='owner',
)
self.account = Account.objects.create(
name='Site Builder Account',
slug='site-builder-account',
owner=self.user,
plan=self.plan,
)
self.user.account = self.account
self.user.save()
self.industry = Industry.objects.create(name='Automation', slug='automation')
self.industry_sector = IndustrySector.objects.create(
industry=self.industry,
name='Robotics',
slug='robotics',
)
self.site = Site.objects.create(
name='Acme Robotics',
slug='acme-robotics',
account=self.account,
industry=self.industry,
)
self.sector = Sector.objects.create(
site=self.site,
industry_sector=self.industry_sector,
name='Warehouse Automation',
slug='warehouse-automation',
account=self.account,
)
self.blueprint = SiteBlueprint.objects.create(
site=self.site,
sector=self.sector,
name='Core Blueprint',
description='Initial blueprint used for tests',
hosting_type='igny8_sites',
config_json={
'business_brief': 'Default brief',
'objectives': ['Drive demos'],
'style': {'palette': 'bold'},
},
)
self.page_blueprint = PageBlueprint.objects.create(
site_blueprint=self.blueprint,
slug='home',
title='Home',
type='home',
blocks_json=[{'type': 'hero', 'heading': 'Welcome'}],
status='draft',
order=0,
)

View File

@@ -0,0 +1,97 @@
from __future__ import annotations
from unittest.mock import MagicMock, patch
from igny8_core.business.billing.exceptions import InsufficientCreditsError
from igny8_core.business.content.models import Tasks
from igny8_core.business.site_building.services.page_generation_service import PageGenerationService
from igny8_core.business.site_building.services.structure_generation_service import (
StructureGenerationService,
)
from .base import SiteBuilderTestBase
class StructureGenerationServiceTests(SiteBuilderTestBase):
"""Covers the orchestration path for generating site structures."""
@patch('igny8_core.ai.tasks.run_ai_task')
@patch('igny8_core.business.site_building.services.structure_generation_service.CreditService.check_credits')
def test_generate_structure_updates_config_and_dispatches_task(self, mock_check, mock_run_ai):
mock_async_result = MagicMock()
mock_async_result.id = 'celery-123'
mock_run_ai.delay.return_value = mock_async_result
service = StructureGenerationService()
payload = {
'business_brief': 'We build autonomous fulfillment robots.',
'objectives': ['Book more demos'],
'style_preferences': {'palette': 'cool', 'personality': 'optimistic'},
'metadata': {'requested_by': 'integration-test'},
}
result = service.generate_structure(self.blueprint, **payload)
self.assertTrue(result['success'])
self.assertEqual(result['task_id'], 'celery-123')
mock_check.assert_called_once_with(self.account, 'site_structure_generation')
mock_run_ai.delay.assert_called_once()
self.blueprint.refresh_from_db()
self.assertEqual(self.blueprint.status, 'generating')
self.assertEqual(self.blueprint.config_json['business_brief'], payload['business_brief'])
self.assertEqual(self.blueprint.config_json['objectives'], payload['objectives'])
self.assertEqual(self.blueprint.config_json['style'], payload['style_preferences'])
self.assertIn('last_requested_at', self.blueprint.config_json)
self.assertEqual(self.blueprint.config_json['metadata'], payload['metadata'])
@patch('igny8_core.business.site_building.services.structure_generation_service.CreditService.check_credits')
def test_generate_structure_rolls_back_when_insufficient_credits(self, mock_check):
mock_check.side_effect = InsufficientCreditsError('No credits remaining')
service = StructureGenerationService()
with self.assertRaises(InsufficientCreditsError):
service.generate_structure(
self.blueprint,
business_brief='Too expensive request',
)
self.blueprint.refresh_from_db()
self.assertEqual(self.blueprint.status, 'draft')
class PageGenerationServiceTests(SiteBuilderTestBase):
"""Ensures Site Builder pages correctly leverage the Writer pipeline."""
@patch('igny8_core.business.site_building.services.page_generation_service.ContentGenerationService.generate_content')
def test_generate_page_content_creates_writer_task(self, mock_generate_content):
mock_generate_content.return_value = {'success': True}
service = PageGenerationService()
result = service.generate_page_content(self.page_blueprint)
created_task = Tasks.objects.get()
expected_title = '[Site Builder] Home'
self.assertEqual(created_task.title, expected_title)
mock_generate_content.assert_called_once_with([created_task.id], self.account)
self.page_blueprint.refresh_from_db()
self.assertEqual(self.page_blueprint.status, 'generating')
self.assertEqual(result, {'success': True})
@patch('igny8_core.business.site_building.services.page_generation_service.ContentGenerationService.generate_content')
def test_regenerate_page_replaces_writer_task(self, mock_generate_content):
mock_generate_content.return_value = {'success': True}
service = PageGenerationService()
first_result = service.generate_page_content(self.page_blueprint)
first_task_id = Tasks.objects.get().id
self.assertEqual(first_result, {'success': True})
second_result = service.regenerate_page(self.page_blueprint)
second_task = Tasks.objects.get()
self.assertEqual(second_result, {'success': True})
self.assertNotEqual(first_task_id, second_task.id)
self.assertEqual(Tasks.objects.count(), 1)
self.assertEqual(mock_generate_content.call_count, 2)

View File

@@ -25,6 +25,10 @@ app.conf.beat_schedule = {
'task': 'igny8_core.modules.billing.tasks.replenish_monthly_credits',
'schedule': crontab(hour=0, minute=0, day_of_month=1), # First day of month at midnight
},
'execute-scheduled-automation-rules': {
'task': 'igny8_core.business.automation.tasks.execute_scheduled_automation_rules',
'schedule': crontab(minute='*/5'), # Every 5 minutes
},
}
@app.task(bind=True, ignore_result=True)

View File

@@ -0,0 +1,5 @@
"""
Automation Module - API Layer
Business logic is in business/automation/
"""

View File

@@ -0,0 +1,13 @@
"""
Automation App Configuration
"""
from django.apps import AppConfig
class AutomationConfig(AppConfig):
"""Configuration for automation module"""
default_auto_field = 'django.db.models.BigAutoField'
name = 'igny8_core.modules.automation'
label = 'automation'
verbose_name = 'Automation'

View File

@@ -0,0 +1,100 @@
# Generated manually for Phase 2: Automation System
import django.core.validators
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
('igny8_core_auth', '0008_passwordresettoken_alter_industry_options_and_more'),
]
operations = [
migrations.CreateModel(
name='AutomationRule',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True, db_index=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('name', models.CharField(help_text='Rule name', max_length=255)),
('description', models.TextField(blank=True, help_text='Rule description', null=True)),
('trigger', models.CharField(choices=[('schedule', 'Schedule'), ('event', 'Event'), ('manual', 'Manual')], default='manual', max_length=50)),
('schedule', models.CharField(blank=True, help_text="Cron-like schedule string (e.g., '0 0 * * *' for daily at midnight)", max_length=100, null=True)),
('conditions', models.JSONField(default=list, help_text='List of conditions that must be met for rule to execute')),
('actions', models.JSONField(default=list, help_text='List of actions to execute when rule triggers')),
('is_active', models.BooleanField(default=True, help_text='Whether rule is active')),
('status', models.CharField(choices=[('active', 'Active'), ('inactive', 'Inactive'), ('paused', 'Paused')], default='active', max_length=50)),
('last_executed_at', models.DateTimeField(blank=True, null=True)),
('execution_count', models.IntegerField(default=0, validators=[django.core.validators.MinValueValidator(0)])),
('metadata', models.JSONField(default=dict, help_text='Additional metadata')),
('account', models.ForeignKey(db_column='tenant_id', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)s_set', to='igny8_core_auth.tenant')),
('site', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='igny8_core_auth.site')),
('sector', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='igny8_core_auth.sector')),
],
options={
'db_table': 'igny8_automation_rules',
'ordering': ['-created_at'],
'verbose_name': 'Automation Rule',
'verbose_name_plural': 'Automation Rules',
},
),
migrations.CreateModel(
name='ScheduledTask',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_at', models.DateTimeField(auto_now_add=True, db_index=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('scheduled_at', models.DateTimeField(help_text='When the task is scheduled to run')),
('executed_at', models.DateTimeField(blank=True, help_text='When the task was actually executed', null=True)),
('status', models.CharField(choices=[('pending', 'Pending'), ('running', 'Running'), ('completed', 'Completed'), ('failed', 'Failed'), ('cancelled', 'Cancelled')], default='pending', max_length=50)),
('result', models.JSONField(default=dict, help_text='Execution result data')),
('error_message', models.TextField(blank=True, help_text='Error message if execution failed', null=True)),
('metadata', models.JSONField(default=dict, help_text='Additional metadata')),
('account', models.ForeignKey(db_column='tenant_id', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)s_set', to='igny8_core_auth.tenant')),
('automation_rule', models.ForeignKey(help_text='The automation rule this task belongs to', on_delete=django.db.models.deletion.CASCADE, related_name='scheduled_tasks', to='automation.automationrule')),
],
options={
'db_table': 'igny8_scheduled_tasks',
'ordering': ['-scheduled_at'],
'verbose_name': 'Scheduled Task',
'verbose_name_plural': 'Scheduled Tasks',
},
),
migrations.AddIndex(
model_name='automationrule',
index=models.Index(fields=['trigger', 'is_active'], name='igny8_autom_trigger_123abc_idx'),
),
migrations.AddIndex(
model_name='automationrule',
index=models.Index(fields=['status'], name='igny8_autom_status_456def_idx'),
),
migrations.AddIndex(
model_name='automationrule',
index=models.Index(fields=['site', 'sector'], name='igny8_autom_site_id_789ghi_idx'),
),
migrations.AddIndex(
model_name='automationrule',
index=models.Index(fields=['trigger', 'is_active', 'status'], name='igny8_autom_trigger_0abjkl_idx'),
),
migrations.AddIndex(
model_name='scheduledtask',
index=models.Index(fields=['automation_rule', 'status'], name='igny8_sched_automation_123abc_idx'),
),
migrations.AddIndex(
model_name='scheduledtask',
index=models.Index(fields=['scheduled_at', 'status'], name='igny8_sched_scheduled_456def_idx'),
),
migrations.AddIndex(
model_name='scheduledtask',
index=models.Index(fields=['account', 'status'], name='igny8_sched_account_789ghi_idx'),
),
migrations.AddIndex(
model_name='scheduledtask',
index=models.Index(fields=['status', 'scheduled_at'], name='igny8_sched_status_0abjkl_idx'),
),
]

View File

@@ -0,0 +1,5 @@
# Backward compatibility alias - models moved to business/automation/
from igny8_core.business.automation.models import AutomationRule, ScheduledTask
__all__ = ['AutomationRule', 'ScheduledTask']

Some files were not shown because too many files have changed in this diff Show More