This commit is contained in:
IGNY8 VPS (Salman)
2025-12-08 06:02:04 +00:00
parent 191287829f
commit 156742d679
23 changed files with 442 additions and 0 deletions

View File

@@ -0,0 +1,868 @@
"""
AI Core - Centralized execution and logging layer for all AI requests
Handles API calls, model selection, response parsing, and console logging
"""
import logging
import json
import re
import requests
import time
from typing import Dict, Any, Optional, List
from django.conf import settings
from .constants import (
DEFAULT_AI_MODEL,
JSON_MODE_MODELS,
MODEL_RATES,
IMAGE_MODEL_RATES,
VALID_OPENAI_IMAGE_MODELS,
VALID_SIZES_BY_MODEL,
DEBUG_MODE,
)
from .tracker import ConsoleStepTracker
logger = logging.getLogger(__name__)
class AICore:
"""
Centralized AI operations handler with console logging.
All AI requests go through run_ai_request() for consistent execution and logging.
"""
def __init__(self, account=None):
    """Create an AICore bound to an (optional) account and resolve its API keys.

    Args:
        account: Optional account object used to look up integration settings.
    """
    self.account = account
    # Both keys start unset; the loader below fills them in via the
    # account -> system account -> Django settings fallback chain.
    self._openai_api_key = self._runware_api_key = None
    self._load_account_settings()
def _load_account_settings(self):
    """Resolve API keys via fallback chain: account -> system account -> Django settings."""

    def _system_account():
        # Shared tenant-wide account, probed by a list of well-known slugs.
        try:
            from igny8_core.auth.models import Account
            for candidate_slug in ('aws-admin', 'default-account', 'default'):
                match = Account.objects.filter(slug=candidate_slug).first()
                if match:
                    return match
        except Exception:
            return None
        return None

    def _key_for(integration_type: str, account):
        # Read the 'apiKey' entry from the active IntegrationSettings row, if any.
        if not account:
            return None
        try:
            from igny8_core.modules.system.models import IntegrationSettings
            record = IntegrationSettings.objects.filter(
                integration_type=integration_type,
                account=account,
                is_active=True
            ).first()
            if record and record.config:
                return record.config.get('apiKey')
        except Exception as exc:
            logger.warning(f"Could not load {integration_type} settings for account {getattr(account, 'id', None)}: {exc}", exc_info=True)
        return None

    # 1) Account-specific keys
    if self.account:
        self._openai_api_key = _key_for('openai', self.account)
        self._runware_api_key = _key_for('runware', self.account)
    # 2) Fallback to system account keys (shared across tenants)
    if not (self._openai_api_key and self._runware_api_key):
        shared_account = _system_account()
        if not self._openai_api_key:
            self._openai_api_key = _key_for('openai', shared_account)
        if not self._runware_api_key:
            self._runware_api_key = _key_for('runware', shared_account)
    # 3) Fallback to Django settings
    if not self._openai_api_key:
        self._openai_api_key = getattr(settings, 'OPENAI_API_KEY', None)
    if not self._runware_api_key:
        self._runware_api_key = getattr(settings, 'RUNWARE_API_KEY', None)
def get_api_key(self, integration_type: str = 'openai') -> Optional[str]:
"""Get API key for integration type"""
if integration_type == 'openai':
return self._openai_api_key
elif integration_type == 'runware':
return self._runware_api_key
return None
def get_model(self, integration_type: str = 'openai') -> str:
    """
    Get model for integration type.
    DEPRECATED: Model should be passed directly to run_ai_request().
    This method is kept for backward compatibility but raises an error.
    """
    # Deliberately always raises: the signature survives so old call sites
    # fail loudly with migration guidance instead of a confusing AttributeError.
    message = (
        "get_model() is deprecated. Model must be passed directly to run_ai_request(). "
        "Use get_model_config() from settings.py to get model from IntegrationSettings."
    )
    raise ValueError(message)
def run_ai_request(
    self,
    prompt: str,
    model: str,
    max_tokens: int = 4000,
    temperature: float = 0.7,
    response_format: Optional[Dict] = None,
    api_key: Optional[str] = None,
    function_name: str = 'ai_request',
    function_id: Optional[str] = None,
    tracker: Optional[ConsoleStepTracker] = None
) -> Dict[str, Any]:
    """
    Centralized AI request handler with console logging.
    All AI text generation requests go through this method.
    Args:
        prompt: Prompt text
        model: Model name (required - must be provided from IntegrationSettings)
        max_tokens: Maximum tokens
        temperature: Temperature (0-1)
        response_format: Optional response format dict (for JSON mode)
        api_key: Optional API key override
        function_name: Function name for logging (e.g., 'cluster_keywords')
        function_id: Optional identifier prepended to the prompt for request tracking
        tracker: Optional ConsoleStepTracker instance for logging
    Returns:
        Dict with 'content', 'input_tokens', 'output_tokens', 'total_tokens',
        'model', 'cost', 'error', 'api_id' ('duration' is added on success).
        Configuration and transport failures are reported through the 'error'
        key rather than raised, so callers always get the same dict schema.
    """

    def _failure(error_msg: str, model_value: Optional[str], api_id: Optional[str] = None) -> Dict[str, Any]:
        # Single place to build the error envelope so every failure path
        # returns exactly the same schema.
        return {
            'content': None,
            'error': error_msg,
            'input_tokens': 0,
            'output_tokens': 0,
            'total_tokens': 0,
            'model': model_value,
            'cost': 0.0,
            'api_id': api_id,
        }

    # Use provided tracker or create a new one (the "Preparing" event is only
    # emitted for a fresh tracker, matching previous behavior).
    if tracker is None:
        tracker = ConsoleStepTracker(function_name)
        tracker.ai_call("Preparing request...")
    # Step 1: Validate model is provided
    if not model:
        error_msg = "Model is required. Ensure IntegrationSettings is configured for the account."
        tracker.error('ConfigurationError', error_msg)
        logger.error(f"[AICore] {error_msg}")
        return _failure(error_msg, None)
    # Step 2: Validate API key
    api_key = api_key or self._openai_api_key
    if not api_key:
        error_msg = 'OpenAI API key not configured'
        tracker.error('ConfigurationError', error_msg)
        return _failure(error_msg, model)
    # Step 3: Use provided model (no fallback) and verify it is supported
    active_model = model
    logger.info("[AICore] Model Configuration:")
    logger.info(f" - Model parameter passed: {model}")
    logger.info(f" - Model used in request: {active_model}")
    tracker.ai_call(f"Using model: {active_model}")
    if active_model not in MODEL_RATES:
        error_msg = f"Model '{active_model}' is not supported. Supported models: {list(MODEL_RATES.keys())}"
        logger.error(f"[AICore] {error_msg}")
        tracker.error('ConfigurationError', error_msg)
        return _failure(error_msg, active_model)
    # (fixed: removed a duplicated tracker.ai_call("Using model: ...") that
    # logged the same event twice)
    # Step 4: Auto-enable JSON mode for supported models
    if response_format is None and active_model in JSON_MODE_MODELS:
        response_format = {'type': 'json_object'}
        tracker.ai_call(f"Auto-enabled JSON mode for {active_model}")
    elif response_format:
        tracker.ai_call(f"Using custom response format: {response_format}")
    else:
        tracker.ai_call("Using text response format")
    # Step 5: Report prompt length and prepend function_id when provided
    prompt_length = len(prompt)
    tracker.ai_call(f"Prompt length: {prompt_length} characters")
    final_prompt = prompt
    if function_id:
        function_id_prefix = f'function_id: "{function_id}"\n\n'
        final_prompt = function_id_prefix + prompt
        tracker.ai_call(f"Added function_id to prompt: {function_id}")
    # Step 6: Build request payload
    url = 'https://api.openai.com/v1/chat/completions'
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json',
    }
    body_data = {
        'model': active_model,
        'messages': [{'role': 'user', 'content': final_prompt}],
        'temperature': temperature,
    }
    if max_tokens:
        body_data['max_tokens'] = max_tokens
    if response_format:
        body_data['response_format'] = response_format
    tracker.ai_call(f"Request payload prepared (model={active_model}, max_tokens={max_tokens}, temp={temperature})")
    # Step 7: Send request
    tracker.ai_call("Sending request to OpenAI API...")
    request_start = time.time()
    try:
        response = requests.post(url, headers=headers, json=body_data, timeout=60)
        request_duration = time.time() - request_start
        tracker.ai_call(f"Received response in {request_duration:.2f}s (status={response.status_code})")
        # Step 8: Validate HTTP response
        if response.status_code != 200:
            error_data = response.json() if response.headers.get('content-type', '').startswith('application/json') else {}
            error_message = f"HTTP {response.status_code} error"
            if isinstance(error_data, dict) and 'error' in error_data:
                if isinstance(error_data['error'], dict) and 'message' in error_data['error']:
                    error_message += f": {error_data['error']['message']}"
            # Rate limits get a dedicated tracker event including the server's retry hint
            if response.status_code == 429:
                retry_after = response.headers.get('retry-after', '60')
                tracker.rate_limit(retry_after)
                error_message += f" (Rate limit - retry after {retry_after}s)"
            else:
                tracker.error('HTTPError', error_message)
            logger.error(f"OpenAI API HTTP error {response.status_code}: {error_message}")
            return _failure(error_message, active_model)
        # Step 9: Parse response JSON
        try:
            data = response.json()
        except json.JSONDecodeError as e:
            error_msg = f'Failed to parse JSON response: {str(e)}'
            tracker.malformed_json(str(e))
            logger.error(error_msg)
            return _failure(error_msg, active_model)
        api_id = data.get('id')
        # Step 10: Extract content and usage counters
        if 'choices' in data and len(data['choices']) > 0:
            content = data['choices'][0]['message']['content']
            usage = data.get('usage', {})
            input_tokens = usage.get('prompt_tokens', 0)
            output_tokens = usage.get('completion_tokens', 0)
            total_tokens = usage.get('total_tokens', 0)
            tracker.parse(f"Received {total_tokens} tokens (input: {input_tokens}, output: {output_tokens})")
            tracker.parse(f"Content length: {len(content)} characters")
            # Step 11: Calculate cost (MODEL_RATES values are USD per 1M tokens)
            rates = MODEL_RATES.get(active_model, {'input': 2.00, 'output': 8.00})
            cost = (input_tokens * rates['input'] + output_tokens * rates['output']) / 1_000_000
            tracker.parse(f"Cost calculated: ${cost:.6f}")
            tracker.done("Request completed successfully")
            return {
                'content': content,
                'input_tokens': input_tokens,
                'output_tokens': output_tokens,
                'total_tokens': total_tokens,
                'model': active_model,
                'cost': cost,
                'error': None,
                'api_id': api_id,
                'duration': request_duration,  # Only present on success
            }
        error_msg = 'No content in OpenAI response'
        tracker.error('EmptyResponse', error_msg)
        logger.error(error_msg)
        # Keep api_id even on empty responses - useful for support tickets
        return _failure(error_msg, active_model, api_id)
    except requests.exceptions.Timeout:
        error_msg = 'Request timeout (60s exceeded)'
        tracker.timeout(60)
        logger.error(error_msg)
        return _failure(error_msg, active_model)
    except requests.exceptions.RequestException as e:
        error_msg = f'Request exception: {str(e)}'
        tracker.error('RequestException', error_msg, e)
        logger.error(f"OpenAI API error: {error_msg}", exc_info=True)
        return _failure(error_msg, active_model)
    except Exception as e:
        error_msg = f'Unexpected error: {str(e)}'
        logger.error(f"[AI][{function_name}][Error] {error_msg}", exc_info=True)
        # tracker is always bound by this point (created above if None)
        tracker.error('UnexpectedError', error_msg, e)
        return _failure(error_msg, active_model)
def extract_json(self, response_text: str) -> Optional[Dict]:
    """
    Extract JSON from response text.
    Handles markdown code blocks, multiline JSON, and JSON embedded in
    surrounding prose.
    Args:
        response_text: Raw response text from AI
    Returns:
        Parsed JSON dict (or list) or None if nothing parseable is found
    """
    if not response_text or not response_text.strip():
        return None
    # Try direct JSON parse first (fast path for clean responses)
    try:
        return json.loads(response_text.strip())
    except json.JSONDecodeError:
        pass
    # Try to extract JSON from markdown code blocks.
    # (fixed: every fenced block is attempted, not just the first match)
    json_block_pattern = r'```(?:json)?\s*(\{.*?\}|\[.*?\])\s*```'
    for block in re.findall(json_block_pattern, response_text, re.DOTALL):
        try:
            return json.loads(block)
        except json.JSONDecodeError:
            continue
    # Try to find a JSON object/array spanning the text (greedy: widest span)
    json_pattern = r'(\{.*\}|\[.*\])'
    for candidate in re.findall(json_pattern, response_text, re.DOTALL):
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            continue
    # Fixed fallback: the greedy pattern above spans from the first '{' to the
    # last '}', so text containing several separate JSON fragments produced no
    # parseable match and returned None. Scan each candidate start position
    # and let the decoder find the first complete, valid JSON value.
    decoder = json.JSONDecoder()
    for idx, char in enumerate(response_text):
        if char in '{[':
            try:
                parsed, _ = decoder.raw_decode(response_text, idx)
            except json.JSONDecodeError:
                continue
            return parsed
    return None
def generate_image(
self,
prompt: str,
provider: str = 'openai',
model: Optional[str] = None,
size: str = '1024x1024',
n: int = 1,
api_key: Optional[str] = None,
negative_prompt: Optional[str] = None,
function_name: str = 'generate_image'
) -> Dict[str, Any]:
"""
Generate image using AI with console logging.
Args:
prompt: Image prompt
provider: 'openai' or 'runware'
model: Model name
size: Image size
n: Number of images
api_key: Optional API key override
negative_prompt: Optional negative prompt
function_name: Function name for logging
Returns:
Dict with 'url', 'revised_prompt', 'cost', 'error', etc.
"""
print(f"[AI][{function_name}] Step 1: Preparing image generation request...")
if provider == 'openai':
return self._generate_image_openai(prompt, model, size, n, api_key, negative_prompt, function_name)
elif provider == 'runware':
return self._generate_image_runware(prompt, model, size, n, api_key, negative_prompt, function_name)
else:
error_msg = f'Unknown provider: {provider}'
print(f"[AI][{function_name}][Error] {error_msg}")
return {
'url': None,
'revised_prompt': None,
'provider': provider,
'cost': 0.0,
'error': error_msg,
}
def _generate_image_openai(
self,
prompt: str,
model: Optional[str],
size: str,
n: int,
api_key: Optional[str],
negative_prompt: Optional[str],
function_name: str
) -> Dict[str, Any]:
"""Generate image using OpenAI DALL-E"""
print(f"[AI][{function_name}] Provider: OpenAI")
# Determine character limit based on model
# DALL-E 2: 1000 chars, DALL-E 3: 4000 chars
model = model or 'dall-e-3'
if model == 'dall-e-2':
max_length = 1000
elif model == 'dall-e-3':
max_length = 4000
else:
# Default to 1000 for safety
max_length = 1000
# CRITICAL: Truncate prompt to model-specific limit BEFORE any processing
if len(prompt) > max_length:
print(f"[AI][{function_name}][Warning] Prompt too long ({len(prompt)} chars), truncating to {max_length} for {model}")
# Try word-aware truncation, but fallback to hard truncate if no space found
truncated = prompt[:max_length - 3]
last_space = truncated.rfind(' ')
if last_space > max_length * 0.9: # Only use word-aware if we have a reasonable space
prompt = truncated[:last_space] + "..."
else:
prompt = prompt[:max_length] # Hard truncate if no good space found
print(f"[AI][{function_name}] Truncated prompt length: {len(prompt)}")
# Final safety check
if len(prompt) > max_length:
prompt = prompt[:max_length]
print(f"[AI][{function_name}][Error] Had to hard truncate to exactly {max_length} chars")
api_key = api_key or self._openai_api_key
if not api_key:
error_msg = 'OpenAI API key not configured'
print(f"[AI][{function_name}][Error] {error_msg}")
return {
'url': None,
'revised_prompt': None,
'provider': 'openai',
'cost': 0.0,
'error': error_msg,
}
model = model or 'dall-e-3'
print(f"[AI][{function_name}] Step 2: Using model: {model}, size: {size}")
# Validate model
if model not in VALID_OPENAI_IMAGE_MODELS:
error_msg = f"Model '{model}' is not valid for OpenAI image generation. Only {', '.join(VALID_OPENAI_IMAGE_MODELS)} are supported."
print(f"[AI][{function_name}][Error] {error_msg}")
return {
'url': None,
'revised_prompt': None,
'provider': 'openai',
'cost': 0.0,
'error': error_msg,
}
# Validate size
valid_sizes = VALID_SIZES_BY_MODEL.get(model, [])
if size not in valid_sizes:
error_msg = f"Image size '{size}' is not valid for model '{model}'. Valid sizes: {', '.join(valid_sizes)}"
print(f"[AI][{function_name}][Error] {error_msg}")
return {
'url': None,
'revised_prompt': None,
'provider': 'openai',
'cost': 0.0,
'error': error_msg,
}
url = 'https://api.openai.com/v1/images/generations'
print(f"[AI][{function_name}] Step 3: Sending request to OpenAI Images API...")
headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json',
}
data = {
'model': model,
'prompt': prompt,
'n': n,
'size': size
}
if negative_prompt:
# Note: OpenAI DALL-E doesn't support negative_prompt in API, but we log it
print(f"[AI][{function_name}] Note: Negative prompt provided but OpenAI DALL-E doesn't support it")
request_start = time.time()
try:
response = requests.post(url, headers=headers, json=data, timeout=150)
request_duration = time.time() - request_start
print(f"[AI][{function_name}] Step 4: Received response in {request_duration:.2f}s (status={response.status_code})")
if response.status_code != 200:
error_data = response.json() if response.headers.get('content-type', '').startswith('application/json') else {}
error_message = f"HTTP {response.status_code} error"
if isinstance(error_data, dict) and 'error' in error_data:
if isinstance(error_data['error'], dict) and 'message' in error_data['error']:
error_message += f": {error_data['error']['message']}"
print(f"[AI][{function_name}][Error] {error_message}")
return {
'url': None,
'revised_prompt': None,
'provider': 'openai',
'cost': 0.0,
'error': error_message,
}
body = response.json()
if 'data' in body and len(body['data']) > 0:
image_data = body['data'][0]
image_url = image_data.get('url')
revised_prompt = image_data.get('revised_prompt')
cost = IMAGE_MODEL_RATES.get(model, 0.040) * n
print(f"[AI][{function_name}] Step 5: Image generated successfully")
print(f"[AI][{function_name}] Step 6: Cost: ${cost:.4f}")
print(f"[AI][{function_name}][Success] Image generation completed")
return {
'url': image_url,
'revised_prompt': revised_prompt,
'provider': 'openai',
'cost': cost,
'error': None,
}
else:
error_msg = 'No image data in response'
print(f"[AI][{function_name}][Error] {error_msg}")
return {
'url': None,
'revised_prompt': None,
'provider': 'openai',
'cost': 0.0,
'error': error_msg,
}
except requests.exceptions.Timeout:
error_msg = 'Request timeout (150s exceeded)'
print(f"[AI][{function_name}][Error] {error_msg}")
return {
'url': None,
'revised_prompt': None,
'provider': 'openai',
'cost': 0.0,
'error': error_msg,
}
except Exception as e:
error_msg = f'Unexpected error: {str(e)}'
print(f"[AI][{function_name}][Error] {error_msg}")
logger.error(error_msg, exc_info=True)
return {
'url': None,
'revised_prompt': None,
'provider': 'openai',
'cost': 0.0,
'error': error_msg,
}
def _generate_image_runware(
    self,
    prompt: str,
    model: Optional[str],
    size: str,
    n: int,
    api_key: Optional[str],
    negative_prompt: Optional[str],
    function_name: str
) -> Dict[str, Any]:
    """Generate image using Runware.

    Posts an array payload (an authentication task followed by an
    imageInference task) to the Runware REST endpoint, then scans the
    response - which may arrive as a list or a dict - for the first
    image URL or error message.

    Args:
        prompt: Positive prompt text sent as 'positivePrompt'.
        model: Runware model id; defaults to 'runware:97@1' when None.
        size: 'WIDTHxHEIGHT' string, e.g. '1024x1024'.
        n: Used only in the cost figure below.
        api_key: Optional key override; falls back to the account key.
        negative_prompt: Optional negative prompt ('' when None).
        function_name: Label used in console log lines.

    Returns:
        Dict with 'url', 'provider', 'cost', 'error' keys
        (no 'revised_prompt' key, unlike the OpenAI path).
    """
    print(f"[AI][{function_name}] Provider: Runware")
    # Explicit override wins, otherwise the account-level key.
    api_key = api_key or self._runware_api_key
    if not api_key:
        error_msg = 'Runware API key not configured'
        print(f"[AI][{function_name}][Error] {error_msg}")
        return {
            'url': None,
            'provider': 'runware',
            'cost': 0.0,
            'error': error_msg,
        }
    runware_model = model or 'runware:97@1'
    print(f"[AI][{function_name}] Step 2: Using model: {runware_model}, size: {size}")
    # Parse size string into integer width/height for the API payload.
    try:
        width, height = map(int, size.split('x'))
    except ValueError:
        error_msg = f"Invalid size format: {size}. Expected format: WIDTHxHEIGHT"
        print(f"[AI][{function_name}][Error] {error_msg}")
        return {
            'url': None,
            'provider': 'runware',
            'cost': 0.0,
            'error': error_msg,
        }
    url = 'https://api.runware.ai/v1'
    print(f"[AI][{function_name}] Step 3: Sending request to Runware API...")
    print(f"[AI][{function_name}] Runware API key check: has_key={bool(api_key)}, key_length={len(api_key) if api_key else 0}")
    # Runware uses array payload with authentication task first, then imageInference
    # Reference: image-generation.php lines 79-97
    import uuid
    payload = [
        {
            'taskType': 'authentication',
            'apiKey': api_key
        },
        {
            'taskType': 'imageInference',
            # Random UUID identifies this inference task in the response.
            'taskUUID': str(uuid.uuid4()),
            'positivePrompt': prompt,
            'negativePrompt': negative_prompt or '',
            'model': runware_model,
            'width': width,
            'height': height,
            'steps': 30,
            'CFGScale': 7.5,
            # NOTE(review): numberResults is hard-coded to 1 while the cost
            # below scales with n - confirm whether n should be passed here.
            'numberResults': 1,
            'outputFormat': 'webp'
        }
    ]
    request_start = time.time()
    try:
        response = requests.post(url, json=payload, timeout=150)
        request_duration = time.time() - request_start
        print(f"[AI][{function_name}] Step 4: Received response in {request_duration:.2f}s (status={response.status_code})")
        if response.status_code != 200:
            error_msg = f"HTTP {response.status_code} error"
            print(f"[AI][{function_name}][Error] {error_msg}")
            return {
                'url': None,
                'provider': 'runware',
                'cost': 0.0,
                'error': error_msg,
            }
        body = response.json()
        print(f"[AI][{function_name}] Runware response type: {type(body)}, length: {len(body) if isinstance(body, list) else 'N/A'}")
        logger.info(f"[AI][{function_name}] Runware response body (first 1000 chars): {str(body)[:1000]}")
        # Runware returns array: [auth_result, image_result]
        # image_result has 'data' array with image objects containing 'imageURL'
        # Reference: AIProcessor has more robust parsing - match that logic
        image_url = None
        error_msg = None
        if isinstance(body, list):
            # Case 1: Array response - find the imageInference result.
            # First hit (image URL or error) wins; subsequent items are ignored.
            print(f"[AI][{function_name}] Response is array with {len(body)} elements")
            for idx, item in enumerate(body):
                print(f"[AI][{function_name}] Array element {idx}: {type(item)}, keys: {list(item.keys()) if isinstance(item, dict) else 'N/A'}")
                if isinstance(item, dict):
                    # Check if this is the image result with 'data' key
                    if 'data' in item:
                        data = item['data']
                        print(f"[AI][{function_name}] Found 'data' key, type: {type(data)}")
                        if isinstance(data, list) and len(data) > 0:
                            first_item = data[0]
                            print(f"[AI][{function_name}] First data item keys: {list(first_item.keys()) if isinstance(first_item, dict) else 'N/A'}")
                            # Accept either key spelling for the URL.
                            image_url = first_item.get('imageURL') or first_item.get('image_url')
                            if image_url:
                                print(f"[AI][{function_name}] Found imageURL: {image_url[:50]}...")
                                break
                    # Check for errors (list of error objects)
                    if 'errors' in item:
                        errors = item['errors']
                        print(f"[AI][{function_name}] Found 'errors' key, type: {type(errors)}")
                        if isinstance(errors, list) and len(errors) > 0:
                            error_obj = errors[0]
                            error_msg = error_obj.get('message') or error_obj.get('error') or str(error_obj)
                            print(f"[AI][{function_name}][Error] Error in response: {error_msg}")
                            break
                    # Check for error at root level (single error string/value)
                    if 'error' in item:
                        error_msg = item['error']
                        print(f"[AI][{function_name}][Error] Error at root level: {error_msg}")
                        break
        elif isinstance(body, dict):
            # Case 2: Direct dict response (same keys, but not wrapped in a list)
            print(f"[AI][{function_name}] Response is dict with keys: {list(body.keys())}")
            if 'data' in body:
                data = body['data']
                print(f"[AI][{function_name}] Found 'data' key, type: {type(data)}")
                if isinstance(data, list) and len(data) > 0:
                    first_item = data[0]
                    print(f"[AI][{function_name}] First data item keys: {list(first_item.keys()) if isinstance(first_item, dict) else 'N/A'}")
                    image_url = first_item.get('imageURL') or first_item.get('image_url')
            elif 'errors' in body:
                errors = body['errors']
                print(f"[AI][{function_name}] Found 'errors' key, type: {type(errors)}")
                if isinstance(errors, list) and len(errors) > 0:
                    error_obj = errors[0]
                    error_msg = error_obj.get('message') or error_obj.get('error') or str(error_obj)
                    print(f"[AI][{function_name}][Error] Error in response: {error_msg}")
            elif 'error' in body:
                error_msg = body['error']
                print(f"[AI][{function_name}][Error] Error at root level: {error_msg}")
        # An explicit error from the API takes precedence over a missing URL.
        if error_msg:
            print(f"[AI][{function_name}][Error] Runware API error: {error_msg}")
            return {
                'url': None,
                'provider': 'runware',
                'cost': 0.0,
                'error': error_msg,
            }
        if image_url:
            cost = 0.009 * n  # Runware pricing
            print(f"[AI][{function_name}] Step 5: Image generated successfully")
            print(f"[AI][{function_name}] Step 6: Cost: ${cost:.4f}")
            print(f"[AI][{function_name}][Success] Image generation completed")
            return {
                'url': image_url,
                'provider': 'runware',
                'cost': cost,
                'error': None,
            }
        else:
            # If we get here, we couldn't parse the response
            error_msg = f'No image data in Runware response. Response type: {type(body).__name__}'
            print(f"[AI][{function_name}][Error] {error_msg}")
            logger.error(f"[AI][{function_name}] Full Runware response: {json.dumps(body, indent=2) if isinstance(body, (dict, list)) else str(body)}")
            return {
                'url': None,
                'provider': 'runware',
                'cost': 0.0,
                'error': error_msg,
            }
    except Exception as e:
        error_msg = f'Unexpected error: {str(e)}'
        print(f"[AI][{function_name}][Error] {error_msg}")
        logger.error(error_msg, exc_info=True)
        return {
            'url': None,
            'provider': 'runware',
            'cost': 0.0,
            'error': error_msg,
        }
def calculate_cost(self, model: str, input_tokens: int, output_tokens: int, model_type: str = 'text') -> float:
    """Return the USD cost of an API call for the given model and token counts.

    Text rates are per 1M tokens; image rates are per image.
    Unknown model_type values cost 0.0.
    """
    if model_type == 'text':
        rates = MODEL_RATES.get(model, {'input': 2.00, 'output': 8.00})
        input_cost = (input_tokens / 1_000_000) * rates['input']
        output_cost = (output_tokens / 1_000_000) * rates['output']
        return input_cost + output_cost
    if model_type == 'image':
        return IMAGE_MODEL_RATES.get(model, 0.040)
    return 0.0
# Legacy method names for backward compatibility
def call_openai(self, prompt: str, model: Optional[str] = None, max_tokens: int = 4000,
                temperature: float = 0.7, response_format: Optional[Dict] = None,
                api_key: Optional[str] = None) -> Dict[str, Any]:
    """Legacy entry point kept for old callers; delegates to run_ai_request()."""
    request_kwargs = {
        'prompt': prompt,
        'model': model,
        'max_tokens': max_tokens,
        'temperature': temperature,
        'response_format': response_format,
        'api_key': api_key,
        'function_name': 'call_openai',
    }
    return self.run_ai_request(**request_kwargs)