ai updaet
This commit is contained in:
@@ -4,7 +4,7 @@ AI Engine - Central orchestrator for all AI functions
|
||||
import logging
|
||||
from typing import Dict, Any, Optional
|
||||
from igny8_core.ai.base import BaseAIFunction
|
||||
from igny8_core.ai.tracker import StepTracker, ProgressTracker, CostTracker
|
||||
from igny8_core.ai.tracker import StepTracker, ProgressTracker, CostTracker, ConsoleStepTracker
|
||||
from igny8_core.ai.ai_core import AICore
|
||||
from igny8_core.ai.settings import get_model_config
|
||||
|
||||
@@ -21,7 +21,8 @@ class AIEngine:
|
||||
self.task = celery_task
|
||||
self.account = account
|
||||
self.tracker = ProgressTracker(celery_task)
|
||||
self.step_tracker = StepTracker('ai_engine')
|
||||
self.step_tracker = StepTracker('ai_engine') # For Celery progress callbacks
|
||||
self.console_tracker = None # Will be initialized per function
|
||||
self.cost_tracker = CostTracker()
|
||||
|
||||
def execute(self, fn: BaseAIFunction, payload: dict) -> dict:
|
||||
@@ -39,16 +40,24 @@ class AIEngine:
|
||||
function_name = fn.get_name()
|
||||
self.step_tracker.function_name = function_name
|
||||
|
||||
# Initialize console tracker for logging (Stage 3 requirement)
|
||||
self.console_tracker = ConsoleStepTracker(function_name)
|
||||
self.console_tracker.init(f"Starting {function_name} execution")
|
||||
|
||||
try:
|
||||
# Phase 1: INIT - Validation & Setup (0-10%)
|
||||
self.console_tracker.prep("Validating input payload")
|
||||
validated = fn.validate(payload, self.account)
|
||||
if not validated['valid']:
|
||||
self.console_tracker.error('ValidationError', validated['error'])
|
||||
return self._handle_error(validated['error'], fn)
|
||||
|
||||
self.console_tracker.prep("Validation complete")
|
||||
self.step_tracker.add_request_step("INIT", "success", "Validation complete")
|
||||
self.tracker.update("INIT", 10, "Validation complete", meta=self.step_tracker.get_meta())
|
||||
|
||||
# Phase 2: PREP - Data Loading & Prompt Building (10-25%)
|
||||
self.console_tracker.prep("Loading data from database")
|
||||
data = fn.prepare(payload, self.account)
|
||||
if isinstance(data, (list, tuple)):
|
||||
data_count = len(data)
|
||||
@@ -57,7 +66,9 @@ class AIEngine:
|
||||
else:
|
||||
data_count = 1
|
||||
|
||||
self.console_tracker.prep(f"Building prompt from {data_count} items")
|
||||
prompt = fn.build_prompt(data, self.account)
|
||||
self.console_tracker.prep(f"Prompt built: {len(prompt)} characters")
|
||||
self.step_tracker.add_request_step("PREP", "success", f"Loaded {data_count} items, built prompt ({len(prompt)} chars)")
|
||||
self.tracker.update("PREP", 25, f"Data prepared: {data_count} items", meta=self.step_tracker.get_meta())
|
||||
|
||||
@@ -65,23 +76,27 @@ class AIEngine:
|
||||
ai_core = AICore(account=self.account)
|
||||
function_name = fn.get_name()
|
||||
|
||||
# Get model config from settings
|
||||
# Get model config from settings (Stage 4 requirement)
|
||||
model_config = get_model_config(function_name)
|
||||
model = model_config.get('model')
|
||||
|
||||
self.console_tracker.ai_call(f"Calling {model or 'default'} model with {len(prompt)} char prompt")
|
||||
|
||||
# Track AI call start
|
||||
self.step_tracker.add_response_step("AI_CALL", "success", f"Calling {model or 'default'} model...")
|
||||
self.tracker.update("AI_CALL", 30, f"Sending to {model or 'default'}...", meta=self.step_tracker.get_meta())
|
||||
|
||||
try:
|
||||
# Use centralized run_ai_request() with console logging
|
||||
# Use centralized run_ai_request() with console logging (Stage 2 & 3 requirement)
|
||||
# Pass console_tracker for unified logging
|
||||
raw_response = ai_core.run_ai_request(
|
||||
prompt=prompt,
|
||||
model=model,
|
||||
max_tokens=model_config.get('max_tokens'),
|
||||
temperature=model_config.get('temperature'),
|
||||
response_format=model_config.get('response_format'),
|
||||
function_name=function_name
|
||||
function_name=function_name,
|
||||
tracker=self.console_tracker # Pass console tracker for logging
|
||||
)
|
||||
except Exception as e:
|
||||
error_msg = f"AI call failed: {str(e)}"
|
||||
@@ -116,6 +131,7 @@ class AIEngine:
|
||||
|
||||
# Phase 4: PARSE - Response Parsing (70-85%)
|
||||
try:
|
||||
self.console_tracker.parse("Parsing AI response")
|
||||
response_content = raw_response.get('content', '')
|
||||
parsed = fn.parse_response(response_content, self.step_tracker)
|
||||
|
||||
@@ -126,6 +142,7 @@ class AIEngine:
|
||||
else:
|
||||
parsed_count = 1
|
||||
|
||||
self.console_tracker.parse(f"Successfully parsed {parsed_count} items from response")
|
||||
self.step_tracker.add_response_step("PARSE", "success", f"Parsed {parsed_count} items from AI response")
|
||||
self.tracker.update("PARSE", 85, f"Parsed {parsed_count} items", meta=self.step_tracker.get_meta())
|
||||
except Exception as parse_error:
|
||||
@@ -135,12 +152,27 @@ class AIEngine:
|
||||
return self._handle_error(error_msg, fn)
|
||||
|
||||
# Phase 5: SAVE - Database Operations (85-98%)
|
||||
self.console_tracker.save("Saving results to database")
|
||||
# Pass step_tracker to save_output so it can add validation steps
|
||||
save_result = fn.save_output(parsed, data, self.account, self.tracker, step_tracker=self.step_tracker)
|
||||
clusters_created = save_result.get('clusters_created', 0)
|
||||
keywords_updated = save_result.get('keywords_updated', 0)
|
||||
self.step_tracker.add_request_step("SAVE", "success", f"Created {clusters_created} clusters, updated {keywords_updated} keywords")
|
||||
self.tracker.update("SAVE", 98, f"Saved: {clusters_created} clusters, {keywords_updated} keywords", meta=self.step_tracker.get_meta())
|
||||
count = save_result.get('count', 0)
|
||||
|
||||
# Build success message based on function type
|
||||
if clusters_created:
|
||||
save_msg = f"Created {clusters_created} clusters, updated {keywords_updated} keywords"
|
||||
elif count:
|
||||
save_msg = f"Saved {count} items"
|
||||
else:
|
||||
save_msg = "Results saved successfully"
|
||||
|
||||
self.console_tracker.save(save_msg)
|
||||
self.step_tracker.add_request_step("SAVE", "success", save_msg)
|
||||
self.tracker.update("SAVE", 98, save_msg, meta=self.step_tracker.get_meta())
|
||||
|
||||
# Store save_msg for use in DONE phase
|
||||
final_save_msg = save_msg
|
||||
|
||||
# Track credit usage after successful save
|
||||
if self.account and raw_response:
|
||||
@@ -175,6 +207,8 @@ class AIEngine:
|
||||
logger.warning(f"Failed to log credit usage: {e}", exc_info=True)
|
||||
|
||||
# Phase 6: DONE - Finalization (98-100%)
|
||||
success_msg = f"Task completed: {final_save_msg}" if 'final_save_msg' in locals() else "Task completed successfully"
|
||||
self.console_tracker.done(success_msg)
|
||||
self.step_tracker.add_request_step("DONE", "success", "Task completed successfully")
|
||||
self.tracker.update("DONE", 100, "Task complete!", meta=self.step_tracker.get_meta())
|
||||
|
||||
@@ -197,6 +231,12 @@ class AIEngine:
|
||||
def _handle_error(self, error: str, fn: BaseAIFunction = None, exc_info=False):
|
||||
"""Centralized error handling"""
|
||||
function_name = fn.get_name() if fn else 'unknown'
|
||||
|
||||
# Log to console tracker if available (Stage 3 requirement)
|
||||
if self.console_tracker:
|
||||
error_type = type(error).__name__ if isinstance(error, Exception) else 'Error'
|
||||
self.console_tracker.error(error_type, str(error), exception=error if isinstance(error, Exception) else None)
|
||||
|
||||
self.step_tracker.add_request_step("Error", "error", error, error=error)
|
||||
|
||||
error_meta = {
|
||||
|
||||
Reference in New Issue
Block a user