120 lines
4.1 KiB
Python
120 lines
4.1 KiB
Python
"""
|
|
Unified Celery task entrypoint for all AI functions
|
|
"""
|
|
import logging
|
|
from celery import shared_task
|
|
from igny8_core.ai.engine import AIEngine
|
|
from igny8_core.ai.registry import get_function_instance
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@shared_task(bind=True, max_retries=3)
|
|
def run_ai_task(self, function_name: str, payload: dict, account_id: int = None):
|
|
"""
|
|
Single Celery entrypoint for all AI functions.
|
|
Dynamically loads and executes the requested function.
|
|
|
|
Args:
|
|
function_name: Name of the AI function (e.g., 'auto_cluster')
|
|
payload: Function-specific payload
|
|
account_id: Account ID for account isolation
|
|
"""
|
|
logger.info("=" * 80)
|
|
logger.info(f"run_ai_task STARTED: {function_name}")
|
|
logger.info(f" - Task ID: {self.request.id}")
|
|
logger.info(f" - Function: {function_name}")
|
|
logger.info(f" - Account ID: {account_id}")
|
|
logger.info(f" - Payload keys: {list(payload.keys())}")
|
|
logger.info("=" * 80)
|
|
|
|
try:
|
|
# Get account
|
|
account = None
|
|
if account_id:
|
|
from igny8_core.auth.models import Account
|
|
try:
|
|
account = Account.objects.get(id=account_id)
|
|
except Account.DoesNotExist:
|
|
logger.warning(f"Account {account_id} not found")
|
|
|
|
# Get function from registry
|
|
fn = get_function_instance(function_name)
|
|
if not fn:
|
|
error_msg = f'Function {function_name} not found in registry'
|
|
logger.error(error_msg)
|
|
return {
|
|
'success': False,
|
|
'error': error_msg
|
|
}
|
|
|
|
# Create engine and execute
|
|
engine = AIEngine(celery_task=self, account=account)
|
|
result = engine.execute(fn, payload)
|
|
|
|
logger.info("=" * 80)
|
|
logger.info(f"run_ai_task COMPLETED: {function_name}")
|
|
logger.info(f" - Success: {result.get('success')}")
|
|
if not result.get('success'):
|
|
logger.error(f" - Error: {result.get('error')}")
|
|
logger.info("=" * 80)
|
|
|
|
# If execution failed, raise exception so Celery marks it as FAILURE
|
|
if not result.get('success'):
|
|
error_msg = result.get('error', 'Task execution failed')
|
|
error_type = result.get('error_type', 'ExecutionError')
|
|
# Update task state before raising
|
|
try:
|
|
self.update_state(
|
|
state='FAILURE',
|
|
meta={
|
|
'error': error_msg,
|
|
'error_type': error_type,
|
|
'function_name': function_name,
|
|
'phase': result.get('phase', 'ERROR'),
|
|
'percentage': 0,
|
|
'message': f'Error: {error_msg}',
|
|
'request_steps': result.get('request_steps', []),
|
|
'response_steps': result.get('response_steps', [])
|
|
}
|
|
)
|
|
except Exception:
|
|
pass
|
|
# Raise exception so Celery properly tracks failure
|
|
raise Exception(f"{error_type}: {error_msg}")
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
error_type = type(e).__name__
|
|
error_msg = str(e)
|
|
|
|
logger.error("=" * 80)
|
|
logger.error(f"run_ai_task FAILED: {function_name}")
|
|
logger.error(f" - Error: {error_type}: {error_msg}")
|
|
logger.error("=" * 80, exc_info=True)
|
|
|
|
# Update task state with error details
|
|
try:
|
|
self.update_state(
|
|
state='FAILURE',
|
|
meta={
|
|
'error': error_msg,
|
|
'error_type': error_type,
|
|
'function_name': function_name,
|
|
'phase': 'ERROR',
|
|
'percentage': 0,
|
|
'message': f'Error: {error_msg}'
|
|
}
|
|
)
|
|
except Exception:
|
|
pass # Don't fail if state update fails
|
|
|
|
return {
|
|
'success': False,
|
|
'error': error_msg,
|
|
'error_type': error_type,
|
|
'function_name': function_name
|
|
}
|
|
|