""" Progress and Step Tracking utilities for AI framework """ import time import logging from typing import List, Dict, Any, Optional, Callable from datetime import datetime from decimal import Decimal from igny8_core.ai.constants import DEBUG_MODE logger = logging.getLogger(__name__) class StepTracker: """Tracks detailed request and response steps for debugging""" def __init__(self, function_name: str): self.function_name = function_name self.request_steps: List[Dict] = [] self.response_steps: List[Dict] = [] self.step_counter = 0 def add_request_step( self, step_name: str, status: str = 'success', message: str = '', error: str = None, duration: int = None ) -> Dict: """Add a request step with automatic timing""" self.step_counter += 1 step = { 'stepNumber': self.step_counter, 'stepName': step_name, 'functionName': self.function_name, 'status': status, 'message': message, 'duration': duration } if error: step['error'] = error self.request_steps.append(step) return step def add_response_step( self, step_name: str, status: str = 'success', message: str = '', error: str = None, duration: int = None ) -> Dict: """Add a response step with automatic timing""" self.step_counter += 1 step = { 'stepNumber': self.step_counter, 'stepName': step_name, 'functionName': self.function_name, 'status': status, 'message': message, 'duration': duration } if error: step['error'] = error self.response_steps.append(step) return step def get_meta(self) -> Dict: """Get metadata for progress callback""" return { 'request_steps': self.request_steps, 'response_steps': self.response_steps } class ProgressTracker: """Tracks progress updates for AI tasks""" def __init__(self, celery_task=None): self.task = celery_task self.current_phase = 'INIT' self.current_message = 'Initializing...' self.current_percentage = 0 self.start_time = time.time() self.current = 0 self.total = 0 def update( self, phase: str, percentage: int, message: str, current: int = None, total: int = None, current_item: str = None, meta: Dict = None ): """Update progress with consistent format""" self.current_phase = phase self.current_message = message self.current_percentage = percentage if current is not None: self.current = current if total is not None: self.total = total progress_meta = { 'phase': phase, 'percentage': percentage, 'message': message, 'current': self.current, 'total': self.total, } if current_item: progress_meta['current_item'] = current_item if meta: progress_meta.update(meta) # Update Celery task state if available if self.task: try: self.task.update_state( state='PROGRESS', meta=progress_meta ) except Exception as e: logger.warning(f"Failed to update Celery task state: {e}") logger.info(f"[{phase}] {percentage}%: {message}") def set_phase(self, phase: str, percentage: int, message: str, meta: Dict = None): """Set progress phase""" self.update(phase, percentage, message, meta=meta) def complete(self, message: str = "Task complete!", meta: Dict = None): """Mark task as complete""" final_meta = { 'phase': 'DONE', 'percentage': 100, 'message': message, 'status': 'success' } if meta: final_meta.update(meta) if self.task: try: self.task.update_state( state='SUCCESS', meta=final_meta ) except Exception as e: logger.warning(f"Failed to update Celery task state: {e}") def error(self, error_message: str, meta: Dict = None): """Mark task as failed""" error_meta = { 'phase': 'ERROR', 'percentage': 0, 'message': f'Error: {error_message}', 'status': 'error', 'error': error_message } if meta: error_meta.update(meta) if self.task: try: self.task.update_state( state='FAILURE', meta=error_meta ) except Exception as e: logger.warning(f"Failed to update Celery task state: {e}") def get_duration(self) -> int: """Get elapsed time in milliseconds""" return int((time.time() - self.start_time) * 1000) def update_ai_progress(self, state: str, meta: Dict): """Callback for AI processor progress updates""" if isinstance(meta, dict): percentage = meta.get('percentage', self.current_percentage) message = meta.get('message', self.current_message) phase = meta.get('phase', self.current_phase) self.update(phase, percentage, message, meta=meta) class CostTracker: """Tracks API costs and token usage""" def __init__(self): self.total_cost = Decimal('0.0') self.total_tokens = 0 self.operations = [] def record(self, function_name: str, cost, tokens: int, model: str = None): """Record an API call cost Args: function_name: Name of the AI function cost: Cost value (can be float or Decimal) tokens: Number of tokens used model: Model name """ # Convert cost to Decimal if it's a float to avoid type mixing if not isinstance(cost, Decimal): cost = Decimal(str(cost)) self.total_cost += cost self.total_tokens += tokens self.operations.append({ 'function': function_name, 'cost': float(cost), # Store as float for JSON serialization 'tokens': tokens, 'model': model }) def get_total(self): """Get total cost (returns float for JSON serialization)""" return float(self.total_cost) def get_total_tokens(self) -> int: """Get total tokens""" return self.total_tokens def get_operations(self) -> List[Dict]: """Get all operations""" return self.operations class ConsoleStepTracker: """ Lightweight console-based step tracker for AI functions. Logs each step to console with timestamps and clear labels. Only logs if DEBUG_MODE is True. """ def __init__(self, function_name: str): self.function_name = function_name self.start_time = time.time() self.steps = [] self.current_phase = None # Debug: Verify DEBUG_MODE is enabled import sys if DEBUG_MODE: init_msg = f"[DEBUG] ConsoleStepTracker initialized for '{function_name}' - DEBUG_MODE is ENABLED" logger.info(init_msg) print(init_msg, flush=True, file=sys.stdout) else: init_msg = f"[WARNING] ConsoleStepTracker initialized for '{function_name}' - DEBUG_MODE is DISABLED" logger.warning(init_msg) print(init_msg, flush=True, file=sys.stdout) def _log(self, phase: str, message: str, status: str = 'info'): """Internal logging method that checks DEBUG_MODE""" if not DEBUG_MODE: return import sys timestamp = datetime.now().strftime('%H:%M:%S') phase_label = phase.upper() if status == 'error': log_msg = f"[{timestamp}] [{self.function_name}] [{phase_label}] [ERROR] {message}" # Use logger.error for errors so they're always visible logger.error(log_msg) elif status == 'success': log_msg = f"[{timestamp}] [{self.function_name}] [{phase_label}] ✅ {message}" logger.info(log_msg) else: log_msg = f"[{timestamp}] [{self.function_name}] [{phase_label}] {message}" logger.info(log_msg) # Also print to stdout for immediate visibility (works in Celery worker logs) print(log_msg, flush=True, file=sys.stdout) self.steps.append({ 'timestamp': timestamp, 'phase': phase, 'message': message, 'status': status }) self.current_phase = phase def init(self, message: str = "Task started"): """Log initialization phase""" self._log('INIT', message) def prep(self, message: str): """Log preparation phase""" self._log('PREP', message) def ai_call(self, message: str): """Log AI call phase""" self._log('AI_CALL', message) def parse(self, message: str): """Log parsing phase""" self._log('PARSE', message) def save(self, message: str): """Log save phase""" self._log('SAVE', message) def done(self, message: str = "Execution completed"): """Log completion""" duration = time.time() - self.start_time self._log('DONE', f"{message} (Duration: {duration:.2f}s)", status='success') if DEBUG_MODE: import sys complete_msg = f"[{self.function_name}] === AI Task Complete ===" logger.info(complete_msg) print(complete_msg, flush=True, file=sys.stdout) def error(self, error_type: str, message: str, exception: Exception = None): """Log error with standardized format""" error_msg = f"{error_type} – {message}" if exception: error_msg += f" ({type(exception).__name__})" self._log(self.current_phase or 'ERROR', error_msg, status='error') if DEBUG_MODE and exception: import sys import traceback error_trace_msg = f"[{self.function_name}] [ERROR] Stack trace:" logger.error(error_trace_msg, exc_info=exception) print(error_trace_msg, flush=True, file=sys.stdout) traceback.print_exc(file=sys.stdout) def retry(self, attempt: int, max_attempts: int, reason: str = ""): """Log retry attempt""" msg = f"Retry attempt {attempt}/{max_attempts}" if reason: msg += f" – {reason}" self._log('AI_CALL', msg, status='info') def timeout(self, timeout_seconds: int): """Log timeout""" self.error('Timeout', f"Request timeout after {timeout_seconds}s") def rate_limit(self, retry_after: str): """Log rate limit""" self.error('RateLimit', f"OpenAI rate limit hit, retry in {retry_after}s") def malformed_json(self, details: str = ""): """Log JSON parsing error""" msg = "Failed to parse model response: Unexpected JSON" if details: msg += f" – {details}" self.error('MalformedJSON', msg)