This commit is contained in:
Desktop
2025-11-10 22:05:35 +05:00
parent c21ce01cd2
commit 46f5bb4d62
10 changed files with 2193 additions and 0 deletions

View File

@@ -0,0 +1,347 @@
"""
Progress and Step Tracking utilities for AI framework
"""
import time
import logging
from typing import List, Dict, Any, Optional, Callable
from datetime import datetime
from igny8_core.ai.types import StepLog, ProgressState
from igny8_core.ai.constants import DEBUG_MODE
logger = logging.getLogger(__name__)
class StepTracker:
"""Tracks detailed request and response steps for debugging"""
def __init__(self, function_name: str):
self.function_name = function_name
self.request_steps: List[Dict] = []
self.response_steps: List[Dict] = []
self.step_counter = 0
def add_request_step(
self,
step_name: str,
status: str = 'success',
message: str = '',
error: str = None,
duration: int = None
) -> Dict:
"""Add a request step with automatic timing"""
self.step_counter += 1
step = {
'stepNumber': self.step_counter,
'stepName': step_name,
'functionName': self.function_name,
'status': status,
'message': message,
'duration': duration
}
if error:
step['error'] = error
self.request_steps.append(step)
return step
def add_response_step(
self,
step_name: str,
status: str = 'success',
message: str = '',
error: str = None,
duration: int = None
) -> Dict:
"""Add a response step with automatic timing"""
self.step_counter += 1
step = {
'stepNumber': self.step_counter,
'stepName': step_name,
'functionName': self.function_name,
'status': status,
'message': message,
'duration': duration
}
if error:
step['error'] = error
self.response_steps.append(step)
return step
def get_meta(self) -> Dict:
"""Get metadata for progress callback"""
return {
'request_steps': self.request_steps,
'response_steps': self.response_steps
}
class ProgressTracker:
"""Tracks progress updates for AI tasks"""
def __init__(self, celery_task=None):
self.task = celery_task
self.current_phase = 'INIT'
self.current_message = 'Initializing...'
self.current_percentage = 0
self.start_time = time.time()
self.current = 0
self.total = 0
def update(
self,
phase: str,
percentage: int,
message: str,
current: int = None,
total: int = None,
current_item: str = None,
meta: Dict = None
):
"""Update progress with consistent format"""
self.current_phase = phase
self.current_message = message
self.current_percentage = percentage
if current is not None:
self.current = current
if total is not None:
self.total = total
progress_meta = {
'phase': phase,
'percentage': percentage,
'message': message,
'current': self.current,
'total': self.total,
}
if current_item:
progress_meta['current_item'] = current_item
if meta:
progress_meta.update(meta)
# Update Celery task state if available
if self.task:
try:
self.task.update_state(
state='PROGRESS',
meta=progress_meta
)
except Exception as e:
logger.warning(f"Failed to update Celery task state: {e}")
logger.info(f"[{phase}] {percentage}%: {message}")
def set_phase(self, phase: str, percentage: int, message: str, meta: Dict = None):
"""Set progress phase"""
self.update(phase, percentage, message, meta=meta)
def complete(self, message: str = "Task complete!", meta: Dict = None):
"""Mark task as complete"""
final_meta = {
'phase': 'DONE',
'percentage': 100,
'message': message,
'status': 'success'
}
if meta:
final_meta.update(meta)
if self.task:
try:
self.task.update_state(
state='SUCCESS',
meta=final_meta
)
except Exception as e:
logger.warning(f"Failed to update Celery task state: {e}")
def error(self, error_message: str, meta: Dict = None):
"""Mark task as failed"""
error_meta = {
'phase': 'ERROR',
'percentage': 0,
'message': f'Error: {error_message}',
'status': 'error',
'error': error_message
}
if meta:
error_meta.update(meta)
if self.task:
try:
self.task.update_state(
state='FAILURE',
meta=error_meta
)
except Exception as e:
logger.warning(f"Failed to update Celery task state: {e}")
def get_duration(self) -> int:
"""Get elapsed time in milliseconds"""
return int((time.time() - self.start_time) * 1000)
def update_ai_progress(self, state: str, meta: Dict):
"""Callback for AI processor progress updates"""
if isinstance(meta, dict):
percentage = meta.get('percentage', self.current_percentage)
message = meta.get('message', self.current_message)
phase = meta.get('phase', self.current_phase)
self.update(phase, percentage, message, meta=meta)
class CostTracker:
"""Tracks API costs and token usage"""
def __init__(self):
self.total_cost = 0.0
self.total_tokens = 0
self.operations = []
def record(self, function_name: str, cost: float, tokens: int, model: str = None):
"""Record an API call cost"""
self.total_cost += cost
self.total_tokens += tokens
self.operations.append({
'function': function_name,
'cost': cost,
'tokens': tokens,
'model': model
})
def get_total(self) -> float:
"""Get total cost"""
return self.total_cost
def get_total_tokens(self) -> int:
"""Get total tokens"""
return self.total_tokens
def get_operations(self) -> List[Dict]:
"""Get all operations"""
return self.operations
class ConsoleStepTracker:
"""
Lightweight console-based step tracker for AI functions.
Logs each step to console with timestamps and clear labels.
Only logs if DEBUG_MODE is True.
"""
def __init__(self, function_name: str):
self.function_name = function_name
self.start_time = time.time()
self.steps = []
self.current_phase = None
# Debug: Verify DEBUG_MODE is enabled
import sys
if DEBUG_MODE:
init_msg = f"[DEBUG] ConsoleStepTracker initialized for '{function_name}' - DEBUG_MODE is ENABLED"
logger.info(init_msg)
print(init_msg, flush=True, file=sys.stdout)
else:
init_msg = f"[WARNING] ConsoleStepTracker initialized for '{function_name}' - DEBUG_MODE is DISABLED"
logger.warning(init_msg)
print(init_msg, flush=True, file=sys.stdout)
def _log(self, phase: str, message: str, status: str = 'info'):
"""Internal logging method that checks DEBUG_MODE"""
if not DEBUG_MODE:
return
import sys
timestamp = datetime.now().strftime('%H:%M:%S')
phase_label = phase.upper()
if status == 'error':
log_msg = f"[{timestamp}] [{self.function_name}] [{phase_label}] [ERROR] {message}"
# Use logger.error for errors so they're always visible
logger.error(log_msg)
elif status == 'success':
log_msg = f"[{timestamp}] [{self.function_name}] [{phase_label}] ✅ {message}"
logger.info(log_msg)
else:
log_msg = f"[{timestamp}] [{self.function_name}] [{phase_label}] {message}"
logger.info(log_msg)
# Also print to stdout for immediate visibility (works in Celery worker logs)
print(log_msg, flush=True, file=sys.stdout)
self.steps.append({
'timestamp': timestamp,
'phase': phase,
'message': message,
'status': status
})
self.current_phase = phase
def init(self, message: str = "Task started"):
"""Log initialization phase"""
self._log('INIT', message)
def prep(self, message: str):
"""Log preparation phase"""
self._log('PREP', message)
def ai_call(self, message: str):
"""Log AI call phase"""
self._log('AI_CALL', message)
def parse(self, message: str):
"""Log parsing phase"""
self._log('PARSE', message)
def save(self, message: str):
"""Log save phase"""
self._log('SAVE', message)
def done(self, message: str = "Execution completed"):
"""Log completion"""
duration = time.time() - self.start_time
self._log('DONE', f"{message} (Duration: {duration:.2f}s)", status='success')
if DEBUG_MODE:
import sys
complete_msg = f"[{self.function_name}] === AI Task Complete ==="
logger.info(complete_msg)
print(complete_msg, flush=True, file=sys.stdout)
def error(self, error_type: str, message: str, exception: Exception = None):
"""Log error with standardized format"""
error_msg = f"{error_type} {message}"
if exception:
error_msg += f" ({type(exception).__name__})"
self._log(self.current_phase or 'ERROR', error_msg, status='error')
if DEBUG_MODE and exception:
import sys
import traceback
error_trace_msg = f"[{self.function_name}] [ERROR] Stack trace:"
logger.error(error_trace_msg, exc_info=exception)
print(error_trace_msg, flush=True, file=sys.stdout)
traceback.print_exc(file=sys.stdout)
def retry(self, attempt: int, max_attempts: int, reason: str = ""):
"""Log retry attempt"""
msg = f"Retry attempt {attempt}/{max_attempts}"
if reason:
msg += f" {reason}"
self._log('AI_CALL', msg, status='info')
def timeout(self, timeout_seconds: int):
"""Log timeout"""
self.error('Timeout', f"Request timeout after {timeout_seconds}s")
def rate_limit(self, retry_after: str):
"""Log rate limit"""
self.error('RateLimit', f"OpenAI rate limit hit, retry in {retry_after}s")
def malformed_json(self, details: str = ""):
"""Log JSON parsing error"""
msg = "Failed to parse model response: Unexpected JSON"
if details:
msg += f" {details}"
self.error('MalformedJSON', msg)