ai fucntiosn adn otehr atuoamtion fixes
This commit is contained in:
@@ -172,6 +172,22 @@ class AutomationService:
|
||||
|
||||
total_count = pending_keywords.count()
|
||||
|
||||
# IMPORTANT: Group keywords by sector to avoid mixing sectors in clustering
|
||||
# Each sector's keywords must be processed separately
|
||||
from collections import defaultdict
|
||||
keywords_by_sector = defaultdict(list)
|
||||
for kw_id, sector_id in pending_keywords.values_list('id', 'sector_id'):
|
||||
# Use sector_id or 'no_sector' for keywords without a sector
|
||||
key = sector_id if sector_id else 'no_sector'
|
||||
keywords_by_sector[key].append(kw_id)
|
||||
|
||||
sector_count = len(keywords_by_sector)
|
||||
if sector_count > 1:
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Keywords span {sector_count} sectors - will process each sector separately"
|
||||
)
|
||||
|
||||
# NEW: Pre-stage validation for minimum keywords
|
||||
from igny8_core.ai.validators.cluster_validators import validate_minimum_keywords
|
||||
|
||||
@@ -229,20 +245,19 @@ class AutomationService:
|
||||
|
||||
# Process in batches with dynamic sizing
|
||||
batch_size = self.config.stage_1_batch_size
|
||||
# FIXED: Use min() for dynamic batch sizing
|
||||
actual_batch_size = min(total_count, batch_size)
|
||||
|
||||
keywords_processed = 0
|
||||
clusters_created = 0
|
||||
batches_run = 0
|
||||
credits_before = self._get_credits_used()
|
||||
|
||||
keyword_ids = list(pending_keywords.values_list('id', flat=True))
|
||||
# Get total keyword count for progress tracking
|
||||
total_keyword_count = sum(len(ids) for ids in keywords_by_sector.values())
|
||||
|
||||
# INITIAL SAVE: Set keywords_total immediately so frontend shows accurate counts from start
|
||||
self.run.stage_1_result = {
|
||||
'keywords_processed': 0,
|
||||
'keywords_total': len(keyword_ids),
|
||||
'keywords_total': total_keyword_count,
|
||||
'clusters_created': 0,
|
||||
'batches_run': 0,
|
||||
'credits_used': 0,
|
||||
@@ -251,17 +266,28 @@ class AutomationService:
|
||||
}
|
||||
self.run.save(update_fields=['stage_1_result'])
|
||||
|
||||
for i in range(0, len(keyword_ids), actual_batch_size):
|
||||
# Check if automation should stop (paused or cancelled)
|
||||
should_stop, reason = self._check_should_stop()
|
||||
if should_stop:
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Stage {reason} - saving progress ({keywords_processed} keywords processed)"
|
||||
)
|
||||
# Save current progress
|
||||
credits_used = self._get_credits_used() - credits_before
|
||||
time_elapsed = self._format_time_elapsed(start_time)
|
||||
# Process each sector's keywords separately to avoid mixing sectors
|
||||
for sector_idx, (sector_key, sector_keyword_ids) in enumerate(keywords_by_sector.items()):
|
||||
sector_name = f"Sector {sector_key}" if sector_key != 'no_sector' else "No Sector"
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Processing {sector_name} ({len(sector_keyword_ids)} keywords) [{sector_idx + 1}/{len(keywords_by_sector)}]"
|
||||
)
|
||||
|
||||
# Dynamic batch sizing per sector
|
||||
actual_batch_size = min(len(sector_keyword_ids), batch_size)
|
||||
|
||||
for i in range(0, len(sector_keyword_ids), actual_batch_size):
|
||||
# Check if automation should stop (paused or cancelled)
|
||||
should_stop, reason = self._check_should_stop()
|
||||
if should_stop:
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Stage {reason} - saving progress ({keywords_processed} keywords processed)"
|
||||
)
|
||||
# Save current progress
|
||||
credits_used = self._get_credits_used() - credits_before
|
||||
time_elapsed = self._format_time_elapsed(start_time)
|
||||
self.run.stage_1_result = {
|
||||
'keywords_processed': keywords_processed,
|
||||
'clusters_created': clusters_created,
|
||||
@@ -275,92 +301,92 @@ class AutomationService:
|
||||
self.run.save()
|
||||
return
|
||||
|
||||
try:
|
||||
batch = keyword_ids[i:i + actual_batch_size]
|
||||
batch_num = (i // actual_batch_size) + 1
|
||||
total_batches = (len(keyword_ids) + actual_batch_size - 1) // actual_batch_size
|
||||
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Processing batch {batch_num}/{total_batches} ({len(batch)} keywords)"
|
||||
)
|
||||
|
||||
# Call AI function via AIEngine (runs synchronously - no Celery subtask)
|
||||
engine = AIEngine(account=self.account)
|
||||
result = engine.execute(
|
||||
fn=AutoClusterFunction(),
|
||||
payload={'ids': batch}
|
||||
)
|
||||
|
||||
# NOTE: AIEngine.execute() runs synchronously and returns immediately
|
||||
# No Celery task polling needed
|
||||
|
||||
if not result.get('success'):
|
||||
error_msg = result.get('error', 'Unknown error')
|
||||
logger.warning(f"[AutomationService] Clustering failed for batch {batch_num}: {error_msg}")
|
||||
# Continue to next batch
|
||||
|
||||
keywords_processed += len(batch)
|
||||
batches_run += 1
|
||||
|
||||
# Log progress
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Batch {batch_num} complete"
|
||||
)
|
||||
|
||||
# INCREMENTAL SAVE: Update stage result after each batch for real-time UI progress
|
||||
clusters_so_far = Clusters.objects.filter(
|
||||
site=self.site,
|
||||
created_at__gte=self.run.started_at
|
||||
).count()
|
||||
self.run.stage_1_result = {
|
||||
'keywords_processed': keywords_processed,
|
||||
'keywords_total': len(keyword_ids),
|
||||
'clusters_created': clusters_so_far,
|
||||
'batches_run': batches_run,
|
||||
'credits_used': self._get_credits_used() - credits_before,
|
||||
'time_elapsed': self._format_time_elapsed(start_time),
|
||||
'in_progress': True
|
||||
}
|
||||
self.run.save(update_fields=['stage_1_result'])
|
||||
|
||||
# Emit per-item trace event for UI progress tracking
|
||||
try:
|
||||
self.logger.append_trace(self.account.id, self.site.id, self.run.run_id, {
|
||||
'event': 'stage_item_processed',
|
||||
'run_id': self.run.run_id,
|
||||
'stage': stage_number,
|
||||
'processed': keywords_processed,
|
||||
'total': len(keyword_ids),
|
||||
'batch_num': batch_num,
|
||||
'timestamp': datetime.now().isoformat()
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
# FIXED: Log error but continue processing remaining batches
|
||||
error_msg = f"Failed to process batch {batch_num}: {str(e)}"
|
||||
logger.error(f"[AutomationService] {error_msg}", exc_info=True)
|
||||
self.logger.log_stage_error(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, error_msg
|
||||
)
|
||||
# Continue to next batch
|
||||
continue
|
||||
|
||||
# ADDED: Within-stage delay (between batches)
|
||||
if i + actual_batch_size < len(keyword_ids): # Not the last batch
|
||||
delay = self.config.within_stage_delay
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Waiting {delay} seconds before next batch..."
|
||||
)
|
||||
time.sleep(delay)
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, "Delay complete, resuming processing"
|
||||
)
|
||||
batch = sector_keyword_ids[i:i + actual_batch_size]
|
||||
batch_num = (i // actual_batch_size) + 1
|
||||
total_batches = (len(sector_keyword_ids) + actual_batch_size - 1) // actual_batch_size
|
||||
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Processing {sector_name} batch {batch_num}/{total_batches} ({len(batch)} keywords)"
|
||||
)
|
||||
|
||||
# Call AI function via AIEngine (runs synchronously - no Celery subtask)
|
||||
engine = AIEngine(account=self.account)
|
||||
result = engine.execute(
|
||||
fn=AutoClusterFunction(),
|
||||
payload={'ids': batch}
|
||||
)
|
||||
|
||||
# NOTE: AIEngine.execute() runs synchronously and returns immediately
|
||||
# No Celery task polling needed
|
||||
|
||||
if not result.get('success'):
|
||||
error_msg = result.get('error', 'Unknown error')
|
||||
logger.warning(f"[AutomationService] Clustering failed for {sector_name} batch {batch_num}: {error_msg}")
|
||||
# Continue to next batch
|
||||
|
||||
keywords_processed += len(batch)
|
||||
batches_run += 1
|
||||
|
||||
# Log progress
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"{sector_name} batch {batch_num} complete"
|
||||
)
|
||||
|
||||
# INCREMENTAL SAVE: Update stage result after each batch for real-time UI progress
|
||||
clusters_so_far = Clusters.objects.filter(
|
||||
site=self.site,
|
||||
created_at__gte=self.run.started_at
|
||||
).count()
|
||||
self.run.stage_1_result = {
|
||||
'keywords_processed': keywords_processed,
|
||||
'keywords_total': total_keyword_count,
|
||||
'clusters_created': clusters_so_far,
|
||||
'batches_run': batches_run,
|
||||
'credits_used': self._get_credits_used() - credits_before,
|
||||
'time_elapsed': self._format_time_elapsed(start_time),
|
||||
'in_progress': True
|
||||
}
|
||||
self.run.save(update_fields=['stage_1_result'])
|
||||
|
||||
# Emit per-item trace event for UI progress tracking
|
||||
try:
|
||||
self.logger.append_trace(self.account.id, self.site.id, self.run.run_id, {
|
||||
'event': 'stage_item_processed',
|
||||
'run_id': self.run.run_id,
|
||||
'stage': stage_number,
|
||||
'processed': keywords_processed,
|
||||
'total': total_keyword_count,
|
||||
'batch_num': batch_num,
|
||||
'timestamp': datetime.now().isoformat()
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
# FIXED: Log error but continue processing remaining batches
|
||||
error_msg = f"Failed to process {sector_name} batch {batch_num}: {str(e)}"
|
||||
logger.error(f"[AutomationService] {error_msg}", exc_info=True)
|
||||
self.logger.log_stage_error(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, error_msg
|
||||
)
|
||||
# Continue to next batch
|
||||
continue
|
||||
|
||||
# ADDED: Within-stage delay (between batches)
|
||||
if i + actual_batch_size < len(sector_keyword_ids): # Not the last batch in this sector
|
||||
delay = self.config.within_stage_delay
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Waiting {delay} seconds before next batch..."
|
||||
)
|
||||
time.sleep(delay)
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, "Delay complete, resuming processing"
|
||||
)
|
||||
|
||||
# Get clusters created count
|
||||
clusters_created = Clusters.objects.filter(
|
||||
|
||||
@@ -156,7 +156,7 @@ class CreditService:
|
||||
raise CreditCalculationError(f"Error calculating credits: {e}")
|
||||
|
||||
@staticmethod
|
||||
def calculate_credits_from_tokens(operation_type, tokens_input, tokens_output):
|
||||
def calculate_credits_from_tokens(operation_type, tokens_input, tokens_output, model_name=None):
|
||||
"""
|
||||
Calculate credits from actual token usage using configured ratio.
|
||||
This is the ONLY way credits are calculated in the system.
|
||||
@@ -165,6 +165,7 @@ class CreditService:
|
||||
operation_type: Type of operation
|
||||
tokens_input: Input tokens used
|
||||
tokens_output: Output tokens used
|
||||
model_name: Optional AI model name (e.g., 'gpt-4o') for model-specific tokens_per_credit
|
||||
|
||||
Returns:
|
||||
int: Credits to deduct
|
||||
@@ -174,7 +175,7 @@ class CreditService:
|
||||
"""
|
||||
import logging
|
||||
import math
|
||||
from igny8_core.business.billing.models import CreditCostConfig, BillingConfiguration
|
||||
from igny8_core.business.billing.models import CreditCostConfig, BillingConfiguration, AIModelConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -184,15 +185,32 @@ class CreditService:
|
||||
is_active=True
|
||||
).first()
|
||||
|
||||
if not config:
|
||||
# Use global billing config as fallback
|
||||
billing_config = BillingConfiguration.get_config()
|
||||
# Get tokens_per_credit from AIModelConfig if model_name provided
|
||||
billing_config = BillingConfiguration.get_config()
|
||||
tokens_per_credit = None
|
||||
|
||||
if model_name:
|
||||
# Try to get model-specific tokens_per_credit from AIModelConfig
|
||||
model_config = AIModelConfig.objects.filter(
|
||||
model_name=model_name,
|
||||
is_active=True
|
||||
).first()
|
||||
if model_config and model_config.tokens_per_credit:
|
||||
tokens_per_credit = model_config.tokens_per_credit
|
||||
logger.info(f"Using model-specific tokens_per_credit: {tokens_per_credit} for {model_name}")
|
||||
|
||||
# Fallback to global billing config
|
||||
if tokens_per_credit is None:
|
||||
tokens_per_credit = billing_config.default_tokens_per_credit
|
||||
logger.info(f"Using global default tokens_per_credit: {tokens_per_credit}")
|
||||
|
||||
if not config:
|
||||
min_credits = 1
|
||||
logger.info(f"No config for {operation_type}, using default: {tokens_per_credit} tokens/credit")
|
||||
logger.info(f"No config for {operation_type}, using default: {tokens_per_credit} tokens/credit, min 1 credit")
|
||||
else:
|
||||
tokens_per_credit = config.tokens_per_credit
|
||||
min_credits = config.min_credits
|
||||
# Use base_credits as minimum for this operation
|
||||
min_credits = config.base_credits
|
||||
logger.info(f"Config for {operation_type}: {tokens_per_credit} tokens/credit, min {min_credits} credits")
|
||||
|
||||
# Calculate total tokens
|
||||
total_tokens = (tokens_input or 0) + (tokens_output or 0)
|
||||
@@ -250,8 +268,8 @@ class CreditService:
|
||||
).first()
|
||||
|
||||
if config:
|
||||
# Use minimum credits as estimate for token-based operations
|
||||
required = config.min_credits
|
||||
# Use base_credits as estimate for token-based operations
|
||||
required = config.base_credits
|
||||
else:
|
||||
# Fallback to constants
|
||||
required = CREDIT_COSTS.get(operation_type, 1)
|
||||
@@ -377,10 +395,22 @@ class CreditService:
|
||||
metadata=metadata or {}
|
||||
)
|
||||
|
||||
# Convert site_id to Site instance if needed
|
||||
site_instance = None
|
||||
if site is not None:
|
||||
from igny8_core.auth.models import Site
|
||||
if isinstance(site, int):
|
||||
try:
|
||||
site_instance = Site.objects.get(id=site)
|
||||
except Site.DoesNotExist:
|
||||
logger.warning(f"Site with id {site} not found for credit usage log")
|
||||
else:
|
||||
site_instance = site
|
||||
|
||||
# Create CreditUsageLog
|
||||
CreditUsageLog.objects.create(
|
||||
account=account,
|
||||
site=site,
|
||||
site=site_instance,
|
||||
operation_type=operation_type,
|
||||
credits_used=amount,
|
||||
cost_usd=cost_usd,
|
||||
@@ -442,9 +472,9 @@ class CreditService:
|
||||
f"Got: tokens_input={tokens_input}, tokens_output={tokens_output}"
|
||||
)
|
||||
|
||||
# Calculate credits from actual token usage
|
||||
# Calculate credits from actual token usage (pass model_used for model-specific rate)
|
||||
credits_required = CreditService.calculate_credits_from_tokens(
|
||||
operation_type, tokens_input, tokens_output
|
||||
operation_type, tokens_input, tokens_output, model_name=model_used
|
||||
)
|
||||
|
||||
# Check sufficient credits
|
||||
|
||||
Reference in New Issue
Block a user