Add source tracking and sync status fields to Content model; update services module

- Introduced new fields in the Content model for source tracking and sync status, including external references and optimization fields.
- Updated the services module to include new content generation and pipeline services for better organization and clarity.
This commit is contained in:
IGNY8 VPS (Salman)
2025-11-17 11:15:15 +00:00
parent fe95d09bbe
commit 9930728e8a
19 changed files with 2281 additions and 1 deletions

Binary file not shown.

View File

@@ -115,6 +115,47 @@ class Content(SiteSectorBaseModel):
generated_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
# Phase 4: Source tracking
SOURCE_CHOICES = [
('igny8', 'IGNY8 Generated'),
('wordpress', 'WordPress Synced'),
('shopify', 'Shopify Synced'),
('custom', 'Custom API Synced'),
]
source = models.CharField(
max_length=50,
choices=SOURCE_CHOICES,
default='igny8',
db_index=True,
help_text="Source of the content"
)
SYNC_STATUS_CHOICES = [
('native', 'Native IGNY8 Content'),
('imported', 'Imported from External'),
('synced', 'Synced from External'),
]
sync_status = models.CharField(
max_length=50,
choices=SYNC_STATUS_CHOICES,
default='native',
db_index=True,
help_text="Sync status of the content"
)
# External reference fields
external_id = models.CharField(max_length=255, blank=True, null=True, help_text="External platform ID")
external_url = models.URLField(blank=True, null=True, help_text="External platform URL")
sync_metadata = models.JSONField(default=dict, blank=True, help_text="Platform-specific sync metadata")
# Phase 4: Linking fields
internal_links = models.JSONField(default=list, blank=True, help_text="Internal links added by linker")
linker_version = models.IntegerField(default=0, help_text="Version of linker processing")
# Phase 4: Optimization fields
optimizer_version = models.IntegerField(default=0, help_text="Version of optimizer processing")
optimization_scores = models.JSONField(default=dict, blank=True, help_text="Optimization scores (SEO, readability, engagement)")
class Meta:
app_label = 'writer'
db_table = 'igny8_content'
@@ -124,6 +165,9 @@ class Content(SiteSectorBaseModel):
indexes = [
models.Index(fields=['task']),
models.Index(fields=['generated_at']),
models.Index(fields=['source']),
models.Index(fields=['sync_status']),
models.Index(fields=['source', 'sync_status']),
]
def save(self, *args, **kwargs):

View File

@@ -1,4 +1,8 @@
"""
Content services
Content Services
"""
from igny8_core.business.content.services.content_generation_service import ContentGenerationService
from igny8_core.business.content.services.content_pipeline_service import ContentPipelineService
__all__ = ['ContentGenerationService', 'ContentPipelineService']

View File

@@ -0,0 +1,133 @@
"""
Content Pipeline Service
Orchestrates content processing pipeline: Writer → Linker → Optimizer
"""
import logging
from typing import List, Optional
from igny8_core.business.content.models import Content
from igny8_core.business.linking.services.linker_service import LinkerService
from igny8_core.business.optimization.services.optimizer_service import OptimizerService
logger = logging.getLogger(__name__)
class ContentPipelineService:
"""Orchestrates content processing pipeline"""
def __init__(self):
self.linker_service = LinkerService()
self.optimizer_service = OptimizerService()
def process_writer_content(
self,
content_id: int,
stages: Optional[List[str]] = None
) -> Content:
"""
Writer → Linker → Optimizer pipeline.
Args:
content_id: Content ID from Writer
stages: List of stages to run: ['linking', 'optimization'] (default: both)
Returns:
Processed Content instance
"""
if stages is None:
stages = ['linking', 'optimization']
try:
content = Content.objects.get(id=content_id, source='igny8')
except Content.DoesNotExist:
raise ValueError(f"IGNY8 content with id {content_id} does not exist")
# Stage 1: Linking
if 'linking' in stages:
try:
content = self.linker_service.process(content.id)
logger.info(f"Linked content {content_id}")
except Exception as e:
logger.error(f"Error in linking stage for content {content_id}: {str(e)}", exc_info=True)
# Continue to next stage even if linking fails
pass
# Stage 2: Optimization
if 'optimization' in stages:
try:
content = self.optimizer_service.optimize_from_writer(content.id)
logger.info(f"Optimized content {content_id}")
except Exception as e:
logger.error(f"Error in optimization stage for content {content_id}: {str(e)}", exc_info=True)
# Don't fail the whole pipeline
pass
return content
def process_synced_content(
self,
content_id: int,
stages: Optional[List[str]] = None
) -> Content:
"""
Synced Content → Optimizer pipeline (skip linking if needed).
Args:
content_id: Content ID from sync (WordPress, Shopify, etc.)
stages: List of stages to run: ['optimization'] (default: optimization only)
Returns:
Processed Content instance
"""
if stages is None:
stages = ['optimization']
try:
content = Content.objects.get(id=content_id)
except Content.DoesNotExist:
raise ValueError(f"Content with id {content_id} does not exist")
# Stage: Optimization (skip linking for synced content by default)
if 'optimization' in stages:
try:
if content.source == 'wordpress':
content = self.optimizer_service.optimize_from_wordpress_sync(content.id)
elif content.source in ['shopify', 'custom']:
content = self.optimizer_service.optimize_from_external_sync(content.id)
else:
content = self.optimizer_service.optimize_manual(content.id)
logger.info(f"Optimized synced content {content_id}")
except Exception as e:
logger.error(f"Error in optimization stage for content {content_id}: {str(e)}", exc_info=True)
raise
return content
def batch_process_writer_content(
self,
content_ids: List[int],
stages: Optional[List[str]] = None
) -> List[Content]:
"""
Batch process multiple Writer content items.
Args:
content_ids: List of content IDs
stages: List of stages to run
Returns:
List of processed Content instances
"""
results = []
for content_id in content_ids:
try:
result = self.process_writer_content(content_id, stages)
results.append(result)
except Exception as e:
logger.error(f"Error processing content {content_id}: {str(e)}", exc_info=True)
# Continue with other items
continue
return results

View File

@@ -0,0 +1,6 @@
"""
Linking Business Logic
Phase 4: Linker & Optimizer
"""

View File

@@ -0,0 +1,5 @@
"""
Linking Services
"""

View File

@@ -0,0 +1,117 @@
"""
Link Candidate Engine
Finds relevant content for internal linking
"""
import logging
from typing import List, Dict
from django.db import models
from igny8_core.business.content.models import Content
logger = logging.getLogger(__name__)
class CandidateEngine:
"""Finds link candidates for content"""
def find_candidates(self, content: Content, max_candidates: int = 10) -> List[Dict]:
"""
Find link candidates for a piece of content.
Args:
content: Content instance to find links for
max_candidates: Maximum number of candidates to return
Returns:
List of candidate dicts with: {'content_id', 'title', 'url', 'relevance_score', 'anchor_text'}
"""
if not content or not content.html_content:
return []
# Find relevant content from same account/site/sector
relevant_content = self._find_relevant_content(content)
# Score candidates based on relevance
candidates = self._score_candidates(content, relevant_content)
# Sort by score and return top candidates
candidates.sort(key=lambda x: x.get('relevance_score', 0), reverse=True)
return candidates[:max_candidates]
def _find_relevant_content(self, content: Content) -> List[Content]:
"""Find relevant content from same account/site/sector"""
# Get content from same account, site, and sector
queryset = Content.objects.filter(
account=content.account,
site=content.site,
sector=content.sector,
status__in=['draft', 'review', 'publish']
).exclude(id=content.id)
# Filter by keywords if available
if content.primary_keyword:
queryset = queryset.filter(
models.Q(primary_keyword__icontains=content.primary_keyword) |
models.Q(secondary_keywords__icontains=content.primary_keyword)
)
return list(queryset[:50]) # Limit initial query
def _score_candidates(self, content: Content, candidates: List[Content]) -> List[Dict]:
"""Score candidates based on relevance"""
scored = []
for candidate in candidates:
score = 0
# Keyword overlap (higher weight)
if content.primary_keyword and candidate.primary_keyword:
if content.primary_keyword.lower() in candidate.primary_keyword.lower():
score += 30
if candidate.primary_keyword.lower() in content.primary_keyword.lower():
score += 30
# Secondary keywords overlap
if content.secondary_keywords and candidate.secondary_keywords:
overlap = set(content.secondary_keywords) & set(candidate.secondary_keywords)
score += len(overlap) * 10
# Category overlap
if content.categories and candidate.categories:
overlap = set(content.categories) & set(candidate.categories)
score += len(overlap) * 5
# Tag overlap
if content.tags and candidate.tags:
overlap = set(content.tags) & set(candidate.tags)
score += len(overlap) * 3
# Recency bonus (newer content gets slight boost)
if candidate.generated_at:
days_old = (content.generated_at - candidate.generated_at).days
if days_old < 30:
score += 5
if score > 0:
scored.append({
'content_id': candidate.id,
'title': candidate.title or candidate.task.title if candidate.task else 'Untitled',
'url': f"/content/{candidate.id}/", # Placeholder - actual URL depends on routing
'relevance_score': score,
'anchor_text': self._generate_anchor_text(candidate, content)
})
return scored
def _generate_anchor_text(self, candidate: Content, source_content: Content) -> str:
"""Generate anchor text for link"""
# Use primary keyword if available, otherwise use title
if candidate.primary_keyword:
return candidate.primary_keyword
elif candidate.title:
return candidate.title
elif candidate.task and candidate.task.title:
return candidate.task.title
else:
return "Learn more"

View File

@@ -0,0 +1,73 @@
"""
Link Injection Engine
Injects internal links into content HTML
"""
import logging
import re
from typing import List, Dict
from igny8_core.business.content.models import Content
logger = logging.getLogger(__name__)
class InjectionEngine:
"""Injects links into content HTML"""
def inject_links(self, content: Content, candidates: List[Dict], max_links: int = 5) -> Dict:
"""
Inject links into content HTML.
Args:
content: Content instance
candidates: List of link candidates from CandidateEngine
max_links: Maximum number of links to inject
Returns:
Dict with: {'html_content', 'links', 'links_added'}
"""
if not content.html_content or not candidates:
return {
'html_content': content.html_content,
'links': [],
'links_added': 0
}
html = content.html_content
links_added = []
links_used = set() # Track which candidates we've used
# Sort candidates by relevance score
sorted_candidates = sorted(candidates, key=lambda x: x.get('relevance_score', 0), reverse=True)
# Inject links (limit to max_links)
for candidate in sorted_candidates[:max_links]:
if candidate['content_id'] in links_used:
continue
anchor_text = candidate.get('anchor_text', 'Learn more')
url = candidate.get('url', f"/content/{candidate['content_id']}/")
# Find first occurrence of anchor text in HTML (case-insensitive)
pattern = re.compile(re.escape(anchor_text), re.IGNORECASE)
match = pattern.search(html)
if match:
# Replace with link
link_html = f'<a href="{url}" class="internal-link">{anchor_text}</a>'
html = html[:match.start()] + link_html + html[match.end():]
links_added.append({
'content_id': candidate['content_id'],
'anchor_text': anchor_text,
'url': url,
'position': match.start()
})
links_used.add(candidate['content_id'])
return {
'html_content': html,
'links': links_added,
'links_added': len(links_added)
}

View File

@@ -0,0 +1,101 @@
"""
Linker Service
Main service for processing content for internal linking
"""
import logging
from typing import List
from igny8_core.business.content.models import Content
from igny8_core.business.linking.services.candidate_engine import CandidateEngine
from igny8_core.business.linking.services.injection_engine import InjectionEngine
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
logger = logging.getLogger(__name__)
class LinkerService:
"""Service for processing content for internal linking"""
def __init__(self):
self.candidate_engine = CandidateEngine()
self.injection_engine = InjectionEngine()
self.credit_service = CreditService()
def process(self, content_id: int) -> Content:
"""
Process content for linking.
Args:
content_id: Content ID to process
Returns:
Updated Content instance
Raises:
InsufficientCreditsError: If account doesn't have enough credits
"""
try:
content = Content.objects.get(id=content_id)
except Content.DoesNotExist:
raise ValueError(f"Content with id {content_id} does not exist")
account = content.account
# Check credits
try:
self.credit_service.check_credits(account, 'linking')
except InsufficientCreditsError:
raise
# Find link candidates
candidates = self.candidate_engine.find_candidates(content)
if not candidates:
logger.info(f"No link candidates found for content {content_id}")
return content
# Inject links
result = self.injection_engine.inject_links(content, candidates)
# Update content
content.html_content = result['html_content']
content.internal_links = result['links']
content.linker_version += 1
content.save(update_fields=['html_content', 'internal_links', 'linker_version'])
# Deduct credits
self.credit_service.deduct_credits_for_operation(
account=account,
operation_type='linking',
description=f"Internal linking for content: {content.title or 'Untitled'}",
related_object_type='content',
related_object_id=content.id
)
logger.info(f"Linked content {content_id}: {result['links_added']} links added")
return content
def batch_process(self, content_ids: List[int]) -> List[Content]:
"""
Process multiple content items for linking.
Args:
content_ids: List of content IDs to process
Returns:
List of updated Content instances
"""
results = []
for content_id in content_ids:
try:
result = self.process(content_id)
results.append(result)
except Exception as e:
logger.error(f"Error processing content {content_id}: {str(e)}", exc_info=True)
# Continue with other items
continue
return results

View File

@@ -0,0 +1,6 @@
"""
Optimization Business Logic
Phase 4: Linker & Optimizer
"""

View File

@@ -0,0 +1,77 @@
"""
Optimization Models
Phase 4: Linker & Optimizer
"""
from django.db import models
from django.core.validators import MinValueValidator
from igny8_core.auth.models import AccountBaseModel
from igny8_core.business.content.models import Content
class OptimizationTask(AccountBaseModel):
"""
Optimization Task model for tracking content optimization runs.
"""
STATUS_CHOICES = [
('pending', 'Pending'),
('running', 'Running'),
('completed', 'Completed'),
('failed', 'Failed'),
]
content = models.ForeignKey(
Content,
on_delete=models.CASCADE,
related_name='optimization_tasks',
help_text="The content being optimized"
)
# Scores before and after optimization
scores_before = models.JSONField(default=dict, help_text="Optimization scores before")
scores_after = models.JSONField(default=dict, help_text="Optimization scores after")
# Content before and after (for comparison)
html_before = models.TextField(blank=True, help_text="HTML content before optimization")
html_after = models.TextField(blank=True, help_text="HTML content after optimization")
# Status
status = models.CharField(
max_length=20,
choices=STATUS_CHOICES,
default='pending',
db_index=True,
help_text="Optimization task status"
)
# Credits used
credits_used = models.IntegerField(default=0, validators=[MinValueValidator(0)], help_text="Credits used for optimization")
# Metadata
metadata = models.JSONField(default=dict, blank=True, help_text="Additional metadata")
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
app_label = 'optimization'
db_table = 'igny8_optimization_tasks'
ordering = ['-created_at']
verbose_name = 'Optimization Task'
verbose_name_plural = 'Optimization Tasks'
indexes = [
models.Index(fields=['content', 'status']),
models.Index(fields=['account', 'status']),
models.Index(fields=['status', 'created_at']),
]
def save(self, *args, **kwargs):
"""Automatically set account from content"""
if self.content:
self.account = self.content.account
super().save(*args, **kwargs)
def __str__(self):
return f"Optimization for {self.content.title or 'Content'} ({self.get_status_display()})"

View File

@@ -0,0 +1,5 @@
"""
Optimization Services
"""

View File

@@ -0,0 +1,184 @@
"""
Content Analyzer
Analyzes content quality and calculates optimization scores
"""
import logging
import re
from typing import Dict
from igny8_core.business.content.models import Content
logger = logging.getLogger(__name__)
class ContentAnalyzer:
"""Analyzes content quality"""
def analyze(self, content: Content) -> Dict:
"""
Analyze content and return scores.
Args:
content: Content instance to analyze
Returns:
Dict with scores: {'seo_score', 'readability_score', 'engagement_score', 'overall_score'}
"""
if not content or not content.html_content:
return {
'seo_score': 0,
'readability_score': 0,
'engagement_score': 0,
'overall_score': 0
}
seo_score = self._calculate_seo_score(content)
readability_score = self._calculate_readability_score(content)
engagement_score = self._calculate_engagement_score(content)
# Overall score is weighted average
overall_score = (
seo_score * 0.4 +
readability_score * 0.3 +
engagement_score * 0.3
)
return {
'seo_score': round(seo_score, 2),
'readability_score': round(readability_score, 2),
'engagement_score': round(engagement_score, 2),
'overall_score': round(overall_score, 2),
'word_count': content.word_count or 0,
'has_meta_title': bool(content.meta_title),
'has_meta_description': bool(content.meta_description),
'has_primary_keyword': bool(content.primary_keyword),
'internal_links_count': len(content.internal_links) if content.internal_links else 0
}
def _calculate_seo_score(self, content: Content) -> float:
"""Calculate SEO score (0-100)"""
score = 0
# Meta title (20 points)
if content.meta_title:
if len(content.meta_title) >= 30 and len(content.meta_title) <= 60:
score += 20
elif len(content.meta_title) > 0:
score += 10
# Meta description (20 points)
if content.meta_description:
if len(content.meta_description) >= 120 and len(content.meta_description) <= 160:
score += 20
elif len(content.meta_description) > 0:
score += 10
# Primary keyword (20 points)
if content.primary_keyword:
score += 20
# Word count (20 points) - optimal range 1000-2500 words
word_count = content.word_count or 0
if 1000 <= word_count <= 2500:
score += 20
elif 500 <= word_count < 1000 or 2500 < word_count <= 3000:
score += 15
elif word_count > 0:
score += 10
# Internal links (20 points)
internal_links = content.internal_links or []
if len(internal_links) >= 3:
score += 20
elif len(internal_links) >= 1:
score += 10
return min(score, 100)
def _calculate_readability_score(self, content: Content) -> float:
"""Calculate readability score (0-100)"""
if not content.html_content:
return 0
# Simple readability metrics
html = content.html_content
# Remove HTML tags for text analysis
text = re.sub(r'<[^>]+>', '', html)
sentences = re.split(r'[.!?]+', text)
words = text.split()
if not words:
return 0
# Average sentence length (optimal: 15-20 words)
avg_sentence_length = len(words) / max(len(sentences), 1)
if 15 <= avg_sentence_length <= 20:
sentence_score = 40
elif 10 <= avg_sentence_length < 15 or 20 < avg_sentence_length <= 25:
sentence_score = 30
else:
sentence_score = 20
# Average word length (optimal: 4-5 characters)
avg_word_length = sum(len(word) for word in words) / len(words)
if 4 <= avg_word_length <= 5:
word_score = 30
elif 3 <= avg_word_length < 4 or 5 < avg_word_length <= 6:
word_score = 20
else:
word_score = 10
# Paragraph structure (30 points)
paragraphs = html.count('<p>') + html.count('<div>')
if paragraphs >= 3:
paragraph_score = 30
elif paragraphs >= 1:
paragraph_score = 20
else:
paragraph_score = 10
return min(sentence_score + word_score + paragraph_score, 100)
def _calculate_engagement_score(self, content: Content) -> float:
"""Calculate engagement score (0-100)"""
score = 0
# Headings (30 points)
if content.html_content:
h1_count = content.html_content.count('<h1>')
h2_count = content.html_content.count('<h2>')
h3_count = content.html_content.count('<h3>')
if h1_count >= 1 and h2_count >= 2:
score += 30
elif h1_count >= 1 or h2_count >= 1:
score += 20
elif h3_count >= 1:
score += 10
# Images (30 points)
if hasattr(content, 'images'):
image_count = content.images.count()
if image_count >= 3:
score += 30
elif image_count >= 1:
score += 20
# Lists (20 points)
if content.html_content:
list_count = content.html_content.count('<ul>') + content.html_content.count('<ol>')
if list_count >= 2:
score += 20
elif list_count >= 1:
score += 10
# Internal links (20 points)
internal_links = content.internal_links or []
if len(internal_links) >= 3:
score += 20
elif len(internal_links) >= 1:
score += 10
return min(score, 100)

View File

@@ -0,0 +1,216 @@
"""
Optimizer Service
Main service for content optimization with multiple entry points
"""
import logging
from typing import Optional
from igny8_core.business.content.models import Content
from igny8_core.business.optimization.models import OptimizationTask
from igny8_core.business.optimization.services.analyzer import ContentAnalyzer
from igny8_core.business.billing.services.credit_service import CreditService
from igny8_core.business.billing.exceptions import InsufficientCreditsError
logger = logging.getLogger(__name__)
class OptimizerService:
"""Service for content optimization with multiple entry points"""
def __init__(self):
self.analyzer = ContentAnalyzer()
self.credit_service = CreditService()
def optimize_from_writer(self, content_id: int) -> Content:
"""
Entry Point 1: Writer → Optimizer
Args:
content_id: Content ID from Writer module
Returns:
Optimized Content instance
"""
try:
content = Content.objects.get(id=content_id, source='igny8')
except Content.DoesNotExist:
raise ValueError(f"IGNY8 content with id {content_id} does not exist")
return self.optimize(content)
def optimize_from_wordpress_sync(self, content_id: int) -> Content:
"""
Entry Point 2: WordPress Sync → Optimizer
Args:
content_id: Content ID synced from WordPress
Returns:
Optimized Content instance
"""
try:
content = Content.objects.get(id=content_id, source='wordpress')
except Content.DoesNotExist:
raise ValueError(f"WordPress content with id {content_id} does not exist")
return self.optimize(content)
def optimize_from_external_sync(self, content_id: int) -> Content:
"""
Entry Point 3: External Sync → Optimizer (Shopify, custom APIs)
Args:
content_id: Content ID synced from external source
Returns:
Optimized Content instance
"""
try:
content = Content.objects.get(id=content_id, source__in=['shopify', 'custom'])
except Content.DoesNotExist:
raise ValueError(f"External content with id {content_id} does not exist")
return self.optimize(content)
def optimize_manual(self, content_id: int) -> Content:
"""
Entry Point 4: Manual Selection → Optimizer
Args:
content_id: Content ID selected manually
Returns:
Optimized Content instance
"""
try:
content = Content.objects.get(id=content_id)
except Content.DoesNotExist:
raise ValueError(f"Content with id {content_id} does not exist")
return self.optimize(content)
def optimize(self, content: Content) -> Content:
"""
Unified optimization logic (used by all entry points).
Args:
content: Content instance to optimize
Returns:
Optimized Content instance
Raises:
InsufficientCreditsError: If account doesn't have enough credits
"""
account = content.account
word_count = content.word_count or 0
# Check credits
try:
self.credit_service.check_credits(account, 'optimization', word_count)
except InsufficientCreditsError:
raise
# Analyze content before optimization
scores_before = self.analyzer.analyze(content)
html_before = content.html_content
# Create optimization task
task = OptimizationTask.objects.create(
content=content,
scores_before=scores_before,
status='running',
html_before=html_before,
account=account
)
try:
# Delegate to AI function (actual optimization happens in Celery/AI task)
# For now, we'll do a simple optimization pass
# In production, this would call the AI function
optimized_content = self._optimize_content(content, scores_before)
# Analyze optimized content
scores_after = self.analyzer.analyze(optimized_content)
# Calculate credits used
credits_used = self.credit_service.get_credit_cost('optimization', word_count)
# Update optimization task
task.scores_after = scores_after
task.html_after = optimized_content.html_content
task.status = 'completed'
task.credits_used = credits_used
task.save()
# Update content
content.html_content = optimized_content.html_content
content.optimizer_version += 1
content.optimization_scores = scores_after
content.save(update_fields=['html_content', 'optimizer_version', 'optimization_scores'])
# Deduct credits
self.credit_service.deduct_credits_for_operation(
account=account,
operation_type='optimization',
amount=word_count,
description=f"Content optimization: {content.title or 'Untitled'}",
related_object_type='content',
related_object_id=content.id,
metadata={
'scores_before': scores_before,
'scores_after': scores_after,
'improvement': scores_after.get('overall_score', 0) - scores_before.get('overall_score', 0)
}
)
logger.info(f"Optimized content {content.id}: {scores_before.get('overall_score', 0)}{scores_after.get('overall_score', 0)}")
return content
except Exception as e:
logger.error(f"Error optimizing content {content.id}: {str(e)}", exc_info=True)
task.status = 'failed'
task.metadata = {'error': str(e)}
task.save()
raise
def _optimize_content(self, content: Content, scores_before: dict) -> Content:
"""
Internal method to optimize content.
This is a placeholder - in production, this would call the AI function.
Args:
content: Content to optimize
scores_before: Scores before optimization
Returns:
Optimized Content instance
"""
# For now, return content as-is
# In production, this would:
# 1. Call OptimizeContentFunction AI function
# 2. Get optimized HTML
# 3. Update content
# Placeholder: We'll implement AI function call later
# For now, just return the content
return content
def analyze_only(self, content_id: int) -> dict:
"""
Analyze content without optimizing (for preview).
Args:
content_id: Content ID to analyze
Returns:
Analysis scores dict
"""
try:
content = Content.objects.get(id=content_id)
except Content.DoesNotExist:
raise ValueError(f"Content with id {content_id} does not exist")
return self.analyzer.analyze(content)

View File

@@ -0,0 +1,6 @@
"""
Site Building Business Logic
Phase 3: Site Builder
"""

View File

@@ -0,0 +1,168 @@
"""
Site Builder Models
Phase 3: Site Builder
"""
from django.db import models
from django.core.validators import MinValueValidator
from igny8_core.auth.models import SiteSectorBaseModel
class SiteBlueprint(SiteSectorBaseModel):
"""
Site Blueprint model for storing AI-generated site structures.
"""
STATUS_CHOICES = [
('draft', 'Draft'),
('generating', 'Generating'),
('ready', 'Ready'),
('deployed', 'Deployed'),
]
HOSTING_TYPE_CHOICES = [
('igny8_sites', 'IGNY8 Sites'),
('wordpress', 'WordPress'),
('shopify', 'Shopify'),
('multi', 'Multiple Destinations'),
]
name = models.CharField(max_length=255, help_text="Site name")
description = models.TextField(blank=True, null=True, help_text="Site description")
# Site configuration (from wizard)
config_json = models.JSONField(
default=dict,
help_text="Wizard configuration: business_type, style, objectives, etc."
)
# Generated structure (from AI)
structure_json = models.JSONField(
default=dict,
help_text="AI-generated structure: pages, layout, theme, etc."
)
# Status tracking
status = models.CharField(
max_length=20,
choices=STATUS_CHOICES,
default='draft',
db_index=True,
help_text="Blueprint status"
)
# Hosting configuration
hosting_type = models.CharField(
max_length=50,
choices=HOSTING_TYPE_CHOICES,
default='igny8_sites',
help_text="Target hosting platform"
)
# Version tracking
version = models.IntegerField(default=1, validators=[MinValueValidator(1)], help_text="Blueprint version")
deployed_version = models.IntegerField(null=True, blank=True, help_text="Currently deployed version")
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
app_label = 'site_building'
db_table = 'igny8_site_blueprints'
ordering = ['-created_at']
verbose_name = 'Site Blueprint'
verbose_name_plural = 'Site Blueprints'
indexes = [
models.Index(fields=['status']),
models.Index(fields=['hosting_type']),
models.Index(fields=['site', 'sector']),
models.Index(fields=['account', 'status']),
]
def __str__(self):
return f"{self.name} ({self.get_status_display()})"
class PageBlueprint(SiteSectorBaseModel):
"""
Page Blueprint model for storing individual page definitions.
"""
PAGE_TYPE_CHOICES = [
('home', 'Home'),
('about', 'About'),
('services', 'Services'),
('products', 'Products'),
('blog', 'Blog'),
('contact', 'Contact'),
('custom', 'Custom'),
]
STATUS_CHOICES = [
('draft', 'Draft'),
('generating', 'Generating'),
('ready', 'Ready'),
]
site_blueprint = models.ForeignKey(
SiteBlueprint,
on_delete=models.CASCADE,
related_name='pages',
help_text="The site blueprint this page belongs to"
)
slug = models.SlugField(max_length=255, help_text="Page URL slug")
title = models.CharField(max_length=255, help_text="Page title")
# Page type
type = models.CharField(
max_length=50,
choices=PAGE_TYPE_CHOICES,
default='custom',
help_text="Page type"
)
# Page content (blocks)
blocks_json = models.JSONField(
default=list,
help_text="Page content blocks: [{'type': 'hero', 'data': {...}}, ...]"
)
# Status
status = models.CharField(
max_length=20,
choices=STATUS_CHOICES,
default='draft',
db_index=True,
help_text="Page status"
)
# Order
order = models.IntegerField(default=0, help_text="Page order in navigation")
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
app_label = 'site_building'
db_table = 'igny8_page_blueprints'
ordering = ['order', 'created_at']
verbose_name = 'Page Blueprint'
verbose_name_plural = 'Page Blueprints'
unique_together = [['site_blueprint', 'slug']]
indexes = [
models.Index(fields=['site_blueprint', 'status']),
models.Index(fields=['type']),
models.Index(fields=['site_blueprint', 'order']),
]
def save(self, *args, **kwargs):
"""Automatically set account, site, and sector from site_blueprint"""
if self.site_blueprint:
self.account = self.site_blueprint.account
self.site = self.site_blueprint.site
self.sector = self.site_blueprint.sector
super().save(*args, **kwargs)
def __str__(self):
return f"{self.title} ({self.site_blueprint.name})"

View File

@@ -0,0 +1,5 @@
"""
Site Building Services
"""

View File

@@ -0,0 +1,264 @@
"""
Site File Management Service
Manages file uploads, deletions, and access control for site assets
"""
import logging
import os
from pathlib import Path
from typing import List, Dict, Optional
from django.core.exceptions import PermissionDenied, ValidationError
from igny8_core.auth.models import User, Site
logger = logging.getLogger(__name__)
# Base path for site files
SITES_DATA_BASE = Path('/data/app/sites-data/clients')
class SiteBuilderFileService:
"""Service for managing site files and assets"""
def __init__(self):
self.base_path = SITES_DATA_BASE
self.max_file_size = 10 * 1024 * 1024 # 10MB per file
self.max_storage_per_site = 100 * 1024 * 1024 # 100MB per site
def get_user_accessible_sites(self, user: User) -> List[Site]:
"""
Get sites user can access for file management.
Args:
user: User instance
Returns:
List of Site instances user can access
"""
# Owner/Admin: Full access to all account sites
if user.is_owner_or_admin():
return Site.objects.filter(account=user.account, is_active=True)
# Editor/Viewer: Access to granted sites (via SiteUserAccess)
# TODO: Implement SiteUserAccess check when available
return Site.objects.filter(account=user.account, is_active=True)
def check_file_access(self, user: User, site_id: int) -> bool:
"""
Check if user can access site's files.
Args:
user: User instance
site_id: Site ID
Returns:
True if user has access, False otherwise
"""
accessible_sites = self.get_user_accessible_sites(user)
return any(site.id == site_id for site in accessible_sites)
def get_site_files_path(self, site_id: int, version: int = 1) -> Path:
"""
Get site's files directory path.
Args:
site_id: Site ID
version: Site version (default: 1)
Returns:
Path object for site files directory
"""
return self.base_path / str(site_id) / f"v{version}" / "assets"
def check_storage_quota(self, site_id: int, file_size: int) -> bool:
"""
Check if site has enough storage quota.
Args:
site_id: Site ID
file_size: Size of file to upload in bytes
Returns:
True if quota available, False otherwise
"""
site_path = self.get_site_files_path(site_id)
# Calculate current storage usage
current_usage = self._calculate_storage_usage(site_path)
# Check if adding file would exceed quota
return (current_usage + file_size) <= self.max_storage_per_site
def _calculate_storage_usage(self, site_path: Path) -> int:
"""Calculate current storage usage for a site"""
if not site_path.exists():
return 0
total_size = 0
for file_path in site_path.rglob('*'):
if file_path.is_file():
total_size += file_path.stat().st_size
return total_size
def upload_file(
self,
user: User,
site_id: int,
file,
folder: str = 'images',
version: int = 1
) -> Dict:
"""
Upload file to site's assets folder.
Args:
user: User instance
site_id: Site ID
file: Django UploadedFile instance
folder: Subfolder name (images, documents, media)
version: Site version
Returns:
Dict with file_path, file_url, file_size
Raises:
PermissionDenied: If user doesn't have access
ValidationError: If file size exceeds limit or quota exceeded
"""
# Check access
if not self.check_file_access(user, site_id):
raise PermissionDenied("No access to this site")
# Check file size
if file.size > self.max_file_size:
raise ValidationError(f"File size exceeds maximum of {self.max_file_size / 1024 / 1024}MB")
# Check storage quota
if not self.check_storage_quota(site_id, file.size):
raise ValidationError("Storage quota exceeded")
# Get target directory
site_path = self.get_site_files_path(site_id, version)
target_dir = site_path / folder
target_dir.mkdir(parents=True, exist_ok=True)
# Save file
file_path = target_dir / file.name
with open(file_path, 'wb') as f:
for chunk in file.chunks():
f.write(chunk)
# Generate file URL (relative to site assets)
file_url = f"/sites/{site_id}/v{version}/assets/{folder}/{file.name}"
logger.info(f"Uploaded file {file.name} to site {site_id}/{folder}")
return {
'file_path': str(file_path),
'file_url': file_url,
'file_size': file.size,
'folder': folder
}
def delete_file(
self,
user: User,
site_id: int,
file_path: str,
version: int = 1
) -> bool:
"""
Delete file from site's assets.
Args:
user: User instance
site_id: Site ID
file_path: Relative file path (e.g., 'images/photo.jpg')
version: Site version
Returns:
True if deleted, False otherwise
Raises:
PermissionDenied: If user doesn't have access
"""
# Check access
if not self.check_file_access(user, site_id):
raise PermissionDenied("No access to this site")
# Get full file path
site_path = self.get_site_files_path(site_id, version)
full_path = site_path / file_path
# Check if file exists and is within site directory
if not full_path.exists() or not str(full_path).startswith(str(site_path)):
return False
# Delete file
full_path.unlink()
logger.info(f"Deleted file {file_path} from site {site_id}")
return True
def list_files(
self,
user: User,
site_id: int,
folder: Optional[str] = None,
version: int = 1
) -> List[Dict]:
"""
List files in site's assets.
Args:
user: User instance
site_id: Site ID
folder: Optional folder to list (None = all folders)
version: Site version
Returns:
List of file dicts with: name, path, size, folder, url
Raises:
PermissionDenied: If user doesn't have access
"""
# Check access
if not self.check_file_access(user, site_id):
raise PermissionDenied("No access to this site")
site_path = self.get_site_files_path(site_id, version)
if not site_path.exists():
return []
files = []
# List files in specified folder or all folders
if folder:
folder_path = site_path / folder
if folder_path.exists():
files.extend(self._list_directory(folder_path, folder, site_id, version))
else:
# List all folders
for folder_dir in site_path.iterdir():
if folder_dir.is_dir():
files.extend(self._list_directory(folder_dir, folder_dir.name, site_id, version))
return files
def _list_directory(self, directory: Path, folder_name: str, site_id: int, version: int) -> List[Dict]:
"""List files in a directory"""
files = []
for file_path in directory.iterdir():
if file_path.is_file():
file_url = f"/sites/{site_id}/v{version}/assets/{folder_name}/{file_path.name}"
files.append({
'name': file_path.name,
'path': f"{folder_name}/{file_path.name}",
'size': file_path.stat().st_size,
'folder': folder_name,
'url': file_url
})
return files