fina autoamtiona adn billing and credits
This commit is contained in:
@@ -24,7 +24,7 @@ from igny8_core.ai.functions.auto_cluster import AutoClusterFunction
|
||||
from igny8_core.ai.functions.generate_ideas import GenerateIdeasFunction
|
||||
from igny8_core.ai.functions.generate_content import GenerateContentFunction
|
||||
from igny8_core.ai.functions.generate_image_prompts import GenerateImagePromptsFunction
|
||||
from igny8_core.ai.functions.generate_images import GenerateImagesFunction
|
||||
from igny8_core.ai.tasks import process_image_generation_queue
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -58,6 +58,26 @@ class AutomationService:
|
||||
service.run = run
|
||||
return service
|
||||
|
||||
def _check_should_stop(self) -> tuple[bool, str]:
|
||||
"""
|
||||
Check if automation should stop (paused or cancelled)
|
||||
|
||||
Returns:
|
||||
(should_stop, reason)
|
||||
"""
|
||||
if not self.run:
|
||||
return False, ""
|
||||
|
||||
# Refresh run from database
|
||||
self.run.refresh_from_db()
|
||||
|
||||
if self.run.status == 'paused':
|
||||
return True, "paused"
|
||||
elif self.run.status == 'cancelled':
|
||||
return True, "cancelled"
|
||||
|
||||
return False, ""
|
||||
|
||||
def start_automation(self, trigger_type: str = 'manual') -> str:
|
||||
"""
|
||||
Start automation run
|
||||
@@ -130,6 +150,45 @@ class AutomationService:
|
||||
|
||||
total_count = pending_keywords.count()
|
||||
|
||||
# NEW: Pre-stage validation for minimum keywords
|
||||
from igny8_core.ai.validators.cluster_validators import validate_minimum_keywords
|
||||
|
||||
keyword_ids_for_validation = list(pending_keywords.values_list('id', flat=True))
|
||||
|
||||
min_validation = validate_minimum_keywords(
|
||||
keyword_ids=keyword_ids_for_validation,
|
||||
account=self.account,
|
||||
min_required=5
|
||||
)
|
||||
|
||||
if not min_validation['valid']:
|
||||
# Log validation failure
|
||||
self.logger.log_stage_start(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, stage_name, total_count
|
||||
)
|
||||
|
||||
error_msg = min_validation['error']
|
||||
self.logger.log_stage_error(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, error_msg
|
||||
)
|
||||
|
||||
# Skip stage with proper result
|
||||
self.run.stage_1_result = {
|
||||
'keywords_processed': 0,
|
||||
'clusters_created': 0,
|
||||
'batches_run': 0,
|
||||
'skipped': True,
|
||||
'skip_reason': error_msg,
|
||||
'credits_used': 0
|
||||
}
|
||||
self.run.current_stage = 2
|
||||
self.run.save()
|
||||
|
||||
logger.warning(f"[AutomationService] Stage 1 skipped: {error_msg}")
|
||||
return
|
||||
|
||||
# Log stage start
|
||||
self.logger.log_stage_start(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
@@ -929,15 +988,25 @@ class AutomationService:
|
||||
stage_number, f"Generating image {idx}/{total_images}: {image.image_type} for '{content_title}'"
|
||||
)
|
||||
|
||||
# Call AI function via AIEngine
|
||||
engine = AIEngine(account=self.account)
|
||||
result = engine.execute(
|
||||
fn=GenerateImagesFunction(),
|
||||
payload={'image_ids': [image.id]}
|
||||
)
|
||||
# Call process_image_generation_queue directly (same as Writer/Images page)
|
||||
# Queue the task
|
||||
if hasattr(process_image_generation_queue, 'delay'):
|
||||
task = process_image_generation_queue.delay(
|
||||
image_ids=[image.id],
|
||||
account_id=self.account.id,
|
||||
content_id=image.content.id if image.content else None
|
||||
)
|
||||
task_id = str(task.id)
|
||||
else:
|
||||
# Fallback for testing (synchronous)
|
||||
result = process_image_generation_queue(
|
||||
image_ids=[image.id],
|
||||
account_id=self.account.id,
|
||||
content_id=image.content.id if image.content else None
|
||||
)
|
||||
task_id = None
|
||||
|
||||
# Monitor task
|
||||
task_id = result.get('task_id')
|
||||
# Monitor task (if async)
|
||||
if task_id:
|
||||
# FIXED: Pass continue_on_error=True to keep processing other images on failure
|
||||
self._wait_for_task(task_id, stage_number, f"Image for '{content_title}'", continue_on_error=True)
|
||||
@@ -1185,3 +1254,250 @@ class AutomationService:
|
||||
minutes = int(elapsed // 60)
|
||||
seconds = int(elapsed % 60)
|
||||
return f"{minutes}m {seconds}s"
|
||||
|
||||
def get_current_processing_state(self) -> dict:
|
||||
"""
|
||||
Get real-time processing state for current automation run
|
||||
Returns detailed info about what's currently being processed
|
||||
"""
|
||||
if not self.run or self.run.status != 'running':
|
||||
return None
|
||||
|
||||
stage = self.run.current_stage
|
||||
|
||||
# Get stage-specific data based on current stage
|
||||
if stage == 1: # Keywords → Clusters
|
||||
return self._get_stage_1_state()
|
||||
elif stage == 2: # Clusters → Ideas
|
||||
return self._get_stage_2_state()
|
||||
elif stage == 3: # Ideas → Tasks
|
||||
return self._get_stage_3_state()
|
||||
elif stage == 4: # Tasks → Content
|
||||
return self._get_stage_4_state()
|
||||
elif stage == 5: # Content → Image Prompts
|
||||
return self._get_stage_5_state()
|
||||
elif stage == 6: # Image Prompts → Images
|
||||
return self._get_stage_6_state()
|
||||
elif stage == 7: # Manual Review Gate
|
||||
return self._get_stage_7_state()
|
||||
|
||||
return None
|
||||
|
||||
def _get_stage_1_state(self) -> dict:
|
||||
"""Get processing state for Stage 1: Keywords → Clusters"""
|
||||
queue = Keywords.objects.filter(
|
||||
site=self.site, status='new'
|
||||
).order_by('id')
|
||||
|
||||
processed = self._get_processed_count(1)
|
||||
total = queue.count() + processed
|
||||
|
||||
return {
|
||||
'stage_number': 1,
|
||||
'stage_name': 'Keywords → Clusters',
|
||||
'stage_type': 'AI',
|
||||
'total_items': total,
|
||||
'processed_items': processed,
|
||||
'percentage': round((processed / total * 100) if total > 0 else 0),
|
||||
'currently_processing': self._get_current_items(queue, 3),
|
||||
'up_next': self._get_next_items(queue, 2, skip=3),
|
||||
'remaining_count': queue.count()
|
||||
}
|
||||
|
||||
def _get_stage_2_state(self) -> dict:
|
||||
"""Get processing state for Stage 2: Clusters → Ideas"""
|
||||
queue = Clusters.objects.filter(
|
||||
site=self.site, status='new', disabled=False
|
||||
).order_by('id')
|
||||
|
||||
processed = self._get_processed_count(2)
|
||||
total = queue.count() + processed
|
||||
|
||||
return {
|
||||
'stage_number': 2,
|
||||
'stage_name': 'Clusters → Ideas',
|
||||
'stage_type': 'AI',
|
||||
'total_items': total,
|
||||
'processed_items': processed,
|
||||
'percentage': round((processed / total * 100) if total > 0 else 0),
|
||||
'currently_processing': self._get_current_items(queue, 1),
|
||||
'up_next': self._get_next_items(queue, 2, skip=1),
|
||||
'remaining_count': queue.count()
|
||||
}
|
||||
|
||||
def _get_stage_3_state(self) -> dict:
|
||||
"""Get processing state for Stage 3: Ideas → Tasks"""
|
||||
queue = ContentIdeas.objects.filter(
|
||||
site=self.site, status='approved'
|
||||
).order_by('id')
|
||||
|
||||
processed = self._get_processed_count(3)
|
||||
total = queue.count() + processed
|
||||
|
||||
return {
|
||||
'stage_number': 3,
|
||||
'stage_name': 'Ideas → Tasks',
|
||||
'stage_type': 'Local',
|
||||
'total_items': total,
|
||||
'processed_items': processed,
|
||||
'percentage': round((processed / total * 100) if total > 0 else 0),
|
||||
'currently_processing': self._get_current_items(queue, 1),
|
||||
'up_next': self._get_next_items(queue, 2, skip=1),
|
||||
'remaining_count': queue.count()
|
||||
}
|
||||
|
||||
def _get_stage_4_state(self) -> dict:
|
||||
"""Get processing state for Stage 4: Tasks → Content"""
|
||||
queue = Tasks.objects.filter(
|
||||
site=self.site, status='ready'
|
||||
).order_by('id')
|
||||
|
||||
processed = self._get_processed_count(4)
|
||||
total = queue.count() + processed
|
||||
|
||||
return {
|
||||
'stage_number': 4,
|
||||
'stage_name': 'Tasks → Content',
|
||||
'stage_type': 'AI',
|
||||
'total_items': total,
|
||||
'processed_items': processed,
|
||||
'percentage': round((processed / total * 100) if total > 0 else 0),
|
||||
'currently_processing': self._get_current_items(queue, 1),
|
||||
'up_next': self._get_next_items(queue, 2, skip=1),
|
||||
'remaining_count': queue.count()
|
||||
}
|
||||
|
||||
def _get_stage_5_state(self) -> dict:
|
||||
"""Get processing state for Stage 5: Content → Image Prompts"""
|
||||
queue = Content.objects.filter(
|
||||
site=self.site,
|
||||
status='draft'
|
||||
).annotate(
|
||||
images_count=Count('images')
|
||||
).filter(
|
||||
images_count=0
|
||||
).order_by('id')
|
||||
|
||||
processed = self._get_processed_count(5)
|
||||
total = queue.count() + processed
|
||||
|
||||
return {
|
||||
'stage_number': 5,
|
||||
'stage_name': 'Content → Image Prompts',
|
||||
'stage_type': 'AI',
|
||||
'total_items': total,
|
||||
'processed_items': processed,
|
||||
'percentage': round((processed / total * 100) if total > 0 else 0),
|
||||
'currently_processing': self._get_current_items(queue, 1),
|
||||
'up_next': self._get_next_items(queue, 2, skip=1),
|
||||
'remaining_count': queue.count()
|
||||
}
|
||||
|
||||
def _get_stage_6_state(self) -> dict:
|
||||
"""Get processing state for Stage 6: Image Prompts → Images"""
|
||||
queue = Images.objects.filter(
|
||||
site=self.site, status='pending'
|
||||
).order_by('id')
|
||||
|
||||
processed = self._get_processed_count(6)
|
||||
total = queue.count() + processed
|
||||
|
||||
return {
|
||||
'stage_number': 6,
|
||||
'stage_name': 'Image Prompts → Images',
|
||||
'stage_type': 'AI',
|
||||
'total_items': total,
|
||||
'processed_items': processed,
|
||||
'percentage': round((processed / total * 100) if total > 0 else 0),
|
||||
'currently_processing': self._get_current_items(queue, 1),
|
||||
'up_next': self._get_next_items(queue, 2, skip=1),
|
||||
'remaining_count': queue.count()
|
||||
}
|
||||
|
||||
def _get_stage_7_state(self) -> dict:
|
||||
"""Get processing state for Stage 7: Manual Review Gate"""
|
||||
queue = Content.objects.filter(
|
||||
site=self.site, status='review'
|
||||
).order_by('id')
|
||||
|
||||
total = queue.count()
|
||||
|
||||
return {
|
||||
'stage_number': 7,
|
||||
'stage_name': 'Manual Review Gate',
|
||||
'stage_type': 'Manual',
|
||||
'total_items': total,
|
||||
'processed_items': total,
|
||||
'percentage': 100,
|
||||
'currently_processing': [],
|
||||
'up_next': self._get_current_items(queue, 3),
|
||||
'remaining_count': total
|
||||
}
|
||||
|
||||
def _get_processed_count(self, stage: int) -> int:
|
||||
"""Get count of items processed in current stage"""
|
||||
if not self.run:
|
||||
return 0
|
||||
|
||||
result_key = f'stage_{stage}_result'
|
||||
result = getattr(self.run, result_key, {})
|
||||
|
||||
if not result:
|
||||
return 0
|
||||
|
||||
# Extract appropriate count from result
|
||||
if stage == 1:
|
||||
return result.get('keywords_processed', 0)
|
||||
elif stage == 2:
|
||||
return result.get('clusters_processed', 0)
|
||||
elif stage == 3:
|
||||
return result.get('ideas_processed', 0)
|
||||
elif stage == 4:
|
||||
return result.get('tasks_processed', 0)
|
||||
elif stage == 5:
|
||||
return result.get('content_processed', 0)
|
||||
elif stage == 6:
|
||||
return result.get('images_processed', 0)
|
||||
|
||||
return 0
|
||||
|
||||
def _get_current_items(self, queryset, count: int) -> list:
|
||||
"""Get currently processing items"""
|
||||
items = queryset[:count]
|
||||
return [
|
||||
{
|
||||
'id': item.id,
|
||||
'title': self._get_item_title(item),
|
||||
'type': queryset.model.__name__.lower()
|
||||
}
|
||||
for item in items
|
||||
]
|
||||
|
||||
def _get_next_items(self, queryset, count: int, skip: int = 0) -> list:
|
||||
"""Get next items in queue"""
|
||||
items = queryset[skip:skip + count]
|
||||
return [
|
||||
{
|
||||
'id': item.id,
|
||||
'title': self._get_item_title(item),
|
||||
'type': queryset.model.__name__.lower()
|
||||
}
|
||||
for item in items
|
||||
]
|
||||
|
||||
def _get_item_title(self, item) -> str:
|
||||
"""Extract title from various model types"""
|
||||
# Try different title fields based on model type
|
||||
if hasattr(item, 'keyword'):
|
||||
return item.keyword
|
||||
elif hasattr(item, 'cluster_name'):
|
||||
return item.cluster_name
|
||||
elif hasattr(item, 'idea_title'):
|
||||
return item.idea_title
|
||||
elif hasattr(item, 'title'):
|
||||
return item.title
|
||||
elif hasattr(item, 'image_type') and hasattr(item, 'content'):
|
||||
content_title = item.content.title if item.content else 'Unknown'
|
||||
return f"{item.image_type} for '{content_title}'"
|
||||
|
||||
return 'Unknown'
|
||||
|
||||
Reference in New Issue
Block a user