Files
igny8/backend/igny8_core/ai/tracker.py
alorig 98e900da73 fg
2025-11-09 23:44:39 +05:00

341 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Progress and Step Tracking utilities for AI framework
"""
import time
import logging
from typing import List, Dict, Any, Optional, Callable
from datetime import datetime
from igny8_core.ai.types import StepLog, ProgressState
from igny8_core.ai.constants import DEBUG_MODE
logger = logging.getLogger(__name__)
class StepTracker:
"""Tracks detailed request and response steps for debugging"""
def __init__(self, function_name: str):
self.function_name = function_name
self.request_steps: List[Dict] = []
self.response_steps: List[Dict] = []
self.step_counter = 0
def add_request_step(
self,
step_name: str,
status: str = 'success',
message: str = '',
error: str = None,
duration: int = None
) -> Dict:
"""Add a request step with automatic timing"""
self.step_counter += 1
step = {
'stepNumber': self.step_counter,
'stepName': step_name,
'functionName': self.function_name,
'status': status,
'message': message,
'duration': duration
}
if error:
step['error'] = error
self.request_steps.append(step)
return step
def add_response_step(
self,
step_name: str,
status: str = 'success',
message: str = '',
error: str = None,
duration: int = None
) -> Dict:
"""Add a response step with automatic timing"""
self.step_counter += 1
step = {
'stepNumber': self.step_counter,
'stepName': step_name,
'functionName': self.function_name,
'status': status,
'message': message,
'duration': duration
}
if error:
step['error'] = error
self.response_steps.append(step)
return step
def get_meta(self) -> Dict:
"""Get metadata for progress callback"""
return {
'request_steps': self.request_steps,
'response_steps': self.response_steps
}
class ProgressTracker:
"""Tracks progress updates for AI tasks"""
def __init__(self, celery_task=None):
self.task = celery_task
self.current_phase = 'INIT'
self.current_message = 'Initializing...'
self.current_percentage = 0
self.start_time = time.time()
self.current = 0
self.total = 0
def update(
self,
phase: str,
percentage: int,
message: str,
current: int = None,
total: int = None,
current_item: str = None,
meta: Dict = None
):
"""Update progress with consistent format"""
self.current_phase = phase
self.current_message = message
self.current_percentage = percentage
if current is not None:
self.current = current
if total is not None:
self.total = total
progress_meta = {
'phase': phase,
'percentage': percentage,
'message': message,
'current': self.current,
'total': self.total,
}
if current_item:
progress_meta['current_item'] = current_item
if meta:
progress_meta.update(meta)
# Update Celery task state if available
if self.task:
try:
self.task.update_state(
state='PROGRESS',
meta=progress_meta
)
except Exception as e:
logger.warning(f"Failed to update Celery task state: {e}")
logger.info(f"[{phase}] {percentage}%: {message}")
def set_phase(self, phase: str, percentage: int, message: str, meta: Dict = None):
"""Set progress phase"""
self.update(phase, percentage, message, meta=meta)
def complete(self, message: str = "Task complete!", meta: Dict = None):
"""Mark task as complete"""
final_meta = {
'phase': 'DONE',
'percentage': 100,
'message': message,
'status': 'success'
}
if meta:
final_meta.update(meta)
if self.task:
try:
self.task.update_state(
state='SUCCESS',
meta=final_meta
)
except Exception as e:
logger.warning(f"Failed to update Celery task state: {e}")
def error(self, error_message: str, meta: Dict = None):
"""Mark task as failed"""
error_meta = {
'phase': 'ERROR',
'percentage': 0,
'message': f'Error: {error_message}',
'status': 'error',
'error': error_message
}
if meta:
error_meta.update(meta)
if self.task:
try:
self.task.update_state(
state='FAILURE',
meta=error_meta
)
except Exception as e:
logger.warning(f"Failed to update Celery task state: {e}")
def get_duration(self) -> int:
"""Get elapsed time in milliseconds"""
return int((time.time() - self.start_time) * 1000)
def update_ai_progress(self, state: str, meta: Dict):
"""Callback for AI processor progress updates"""
if isinstance(meta, dict):
percentage = meta.get('percentage', self.current_percentage)
message = meta.get('message', self.current_message)
phase = meta.get('phase', self.current_phase)
self.update(phase, percentage, message, meta=meta)
class CostTracker:
"""Tracks API costs and token usage"""
def __init__(self):
self.total_cost = 0.0
self.total_tokens = 0
self.operations = []
def record(self, function_name: str, cost: float, tokens: int, model: str = None):
"""Record an API call cost"""
self.total_cost += cost
self.total_tokens += tokens
self.operations.append({
'function': function_name,
'cost': cost,
'tokens': tokens,
'model': model
})
def get_total(self) -> float:
"""Get total cost"""
return self.total_cost
def get_total_tokens(self) -> int:
"""Get total tokens"""
return self.total_tokens
def get_operations(self) -> List[Dict]:
"""Get all operations"""
return self.operations
class ConsoleStepTracker:
"""
Lightweight console-based step tracker for AI functions.
Logs each step to console with timestamps and clear labels.
Only logs if DEBUG_MODE is True.
"""
def __init__(self, function_name: str):
self.function_name = function_name
self.start_time = time.time()
self.steps = []
self.current_phase = None
# Debug: Verify DEBUG_MODE is enabled
if DEBUG_MODE:
print(f"[DEBUG] ConsoleStepTracker initialized for '{function_name}' - DEBUG_MODE is ENABLED", flush=True)
logger.info(f"ConsoleStepTracker initialized for '{function_name}' - DEBUG_MODE is ENABLED")
else:
print(f"[WARNING] ConsoleStepTracker initialized for '{function_name}' - DEBUG_MODE is DISABLED", flush=True)
logger.warning(f"ConsoleStepTracker initialized for '{function_name}' - DEBUG_MODE is DISABLED")
def _log(self, phase: str, message: str, status: str = 'info'):
"""Internal logging method that checks DEBUG_MODE"""
if not DEBUG_MODE:
return
import sys
timestamp = datetime.now().strftime('%H:%M:%S')
phase_label = phase.upper()
if status == 'error':
log_msg = f"[{timestamp}] [{self.function_name}] [{phase_label}] [ERROR] {message}"
elif status == 'success':
log_msg = f"[{timestamp}] [{self.function_name}] [{phase_label}] ✅ {message}"
else:
log_msg = f"[{timestamp}] [{self.function_name}] [{phase_label}] {message}"
# Print and flush immediately to ensure console output
print(log_msg, flush=True)
# Also log to Python logger for better visibility
logger.info(log_msg)
self.steps.append({
'timestamp': timestamp,
'phase': phase,
'message': message,
'status': status
})
self.current_phase = phase
def init(self, message: str = "Task started"):
"""Log initialization phase"""
self._log('INIT', message)
def prep(self, message: str):
"""Log preparation phase"""
self._log('PREP', message)
def ai_call(self, message: str):
"""Log AI call phase"""
self._log('AI_CALL', message)
def parse(self, message: str):
"""Log parsing phase"""
self._log('PARSE', message)
def save(self, message: str):
"""Log save phase"""
self._log('SAVE', message)
def done(self, message: str = "Execution completed"):
"""Log completion"""
duration = time.time() - self.start_time
self._log('DONE', f"{message} (Duration: {duration:.2f}s)", status='success')
if DEBUG_MODE:
import sys
print(f"[{self.function_name}] === AI Task Complete ===", flush=True)
logger.info(f"[{self.function_name}] === AI Task Complete ===")
def error(self, error_type: str, message: str, exception: Exception = None):
"""Log error with standardized format"""
error_msg = f"{error_type} {message}"
if exception:
error_msg += f" ({type(exception).__name__})"
self._log(self.current_phase or 'ERROR', error_msg, status='error')
if DEBUG_MODE and exception:
import sys
import traceback
print(f"[{self.function_name}] [ERROR] Stack trace:", flush=True)
traceback.print_exc()
logger.error(f"[{self.function_name}] [ERROR] Stack trace:", exc_info=exception)
def retry(self, attempt: int, max_attempts: int, reason: str = ""):
"""Log retry attempt"""
msg = f"Retry attempt {attempt}/{max_attempts}"
if reason:
msg += f" {reason}"
self._log('AI_CALL', msg, status='info')
def timeout(self, timeout_seconds: int):
"""Log timeout"""
self.error('Timeout', f"Request timeout after {timeout_seconds}s")
def rate_limit(self, retry_after: str):
"""Log rate limit"""
self.error('RateLimit', f"OpenAI rate limit hit, retry in {retry_after}s")
def malformed_json(self, details: str = ""):
"""Log JSON parsing error"""
msg = "Failed to parse model response: Unexpected JSON"
if details:
msg += f" {details}"
self.error('MalformedJSON', msg)