1409 lines
70 KiB
Python
1409 lines
70 KiB
Python
"""
|
||
Integration settings views - for OpenAI, Runware, GSC integrations
|
||
Unified API Standard v1.0 compliant
|
||
"""
|
||
import logging
|
||
from rest_framework import viewsets, status
|
||
from rest_framework.decorators import action
|
||
from django.db import transaction
|
||
from drf_spectacular.utils import extend_schema, extend_schema_view
|
||
from igny8_core.api.base import AccountModelViewSet
|
||
from igny8_core.api.response import success_response, error_response
|
||
from igny8_core.api.throttles import DebugScopedRateThrottle
|
||
from igny8_core.api.permissions import IsAuthenticatedAndActive, HasTenantAccess, IsSystemAccountOrDeveloper
|
||
from django.conf import settings
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
@extend_schema_view(
|
||
list=extend_schema(tags=['System']),
|
||
retrieve=extend_schema(tags=['System']),
|
||
update=extend_schema(tags=['System']),
|
||
test_connection=extend_schema(tags=['System']),
|
||
task_progress=extend_schema(tags=['System']),
|
||
get_image_generation_settings=extend_schema(tags=['System']),
|
||
)
|
||
class IntegrationSettingsViewSet(viewsets.ViewSet):
|
||
"""
|
||
ViewSet for managing integration settings (OpenAI, Runware, GSC)
|
||
Following reference plugin pattern: WordPress uses update_option() for igny8_api_settings
|
||
We store in IntegrationSettings model with account isolation
|
||
|
||
IMPORTANT: Integration settings are system-wide (configured by super users/developers)
|
||
Normal users don't configure their own API keys - they use the system account settings via fallback
|
||
|
||
NOTE: Class-level permissions are [IsAuthenticatedAndActive, HasTenantAccess] only.
|
||
Individual actions override with IsSystemAccountOrDeveloper where needed (save, test).
|
||
task_progress and get_image_generation_settings need to be accessible to all authenticated users.
|
||
"""
|
||
permission_classes = [IsAuthenticatedAndActive, HasTenantAccess]
|
||
|
||
throttle_scope = 'system_admin'
|
||
throttle_classes = [DebugScopedRateThrottle]
|
||
|
||
def list(self, request):
|
||
"""List all integrations - for debugging URL patterns"""
|
||
logger.info("[IntegrationSettingsViewSet] list() called")
|
||
return success_response(
|
||
data={
|
||
'message': 'IntegrationSettingsViewSet is working',
|
||
'available_endpoints': [
|
||
'GET /api/v1/system/settings/integrations/<pk>/',
|
||
'POST /api/v1/system/settings/integrations/<pk>/save/',
|
||
'POST /api/v1/system/settings/integrations/<pk>/test/',
|
||
'POST /api/v1/system/settings/integrations/<pk>/generate/',
|
||
]
|
||
},
|
||
request=request
|
||
)
|
||
|
||
def retrieve(self, request, pk=None):
|
||
"""Get integration settings - GET /api/v1/system/settings/integrations/{pk}/"""
|
||
return self.get_settings(request, pk)
|
||
|
||
def update(self, request, pk=None):
|
||
"""Save integration settings (PUT) - PUT /api/v1/system/settings/integrations/{pk}/"""
|
||
return self.save_settings(request, pk)
|
||
|
||
def save_post(self, request, pk=None, **kwargs):
|
||
"""Save integration settings (POST) - POST /api/v1/system/settings/integrations/{pk}/save/
|
||
This matches the frontend endpoint call exactly.
|
||
Reference plugin: WordPress form submits to options.php which calls update_option() via register_setting callback.
|
||
We save to IntegrationSettings model instead.
|
||
"""
|
||
# Extract pk from kwargs if not passed as parameter (DRF passes via **kwargs)
|
||
if not pk:
|
||
pk = kwargs.get('pk')
|
||
return self.save_settings(request, pk)
|
||
|
||
@action(detail=True, methods=['post'], url_path='test', url_name='test',
|
||
permission_classes=[IsAuthenticatedAndActive, HasTenantAccess, IsSystemAccountOrDeveloper])
|
||
def test_connection(self, request, pk=None):
|
||
"""
|
||
Test API connection for OpenAI or Runware
|
||
Supports two modes:
|
||
- with_response=false: Simple connection test (GET /v1/models)
|
||
- with_response=true: Full response test with ping message
|
||
"""
|
||
integration_type = pk # 'openai', 'runware'
|
||
|
||
logger.info(f"[test_connection] Called for integration_type={integration_type}, user={getattr(request, 'user', None)}, account={getattr(request, 'account', None)}")
|
||
|
||
if not integration_type:
|
||
return error_response(
|
||
error='Integration type is required',
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
request=request
|
||
)
|
||
|
||
# Get API key and config from request or saved settings
|
||
config = request.data.get('config', {}) if isinstance(request.data.get('config'), dict) else {}
|
||
api_key = request.data.get('apiKey') or config.get('apiKey')
|
||
|
||
# Merge request.data with config if config is a dict
|
||
if not isinstance(config, dict):
|
||
config = {}
|
||
|
||
if not api_key:
|
||
# Try to get from saved settings
|
||
account = getattr(request, 'account', None)
|
||
logger.info(f"[test_connection] Account from request: {account.id if account else None}")
|
||
# Fallback to user's account
|
||
if not account:
|
||
user = getattr(request, 'user', None)
|
||
if user and hasattr(user, 'is_authenticated') and user.is_authenticated:
|
||
account = getattr(user, 'account', None)
|
||
# Fallback to default account
|
||
if not account:
|
||
from igny8_core.auth.models import Account
|
||
try:
|
||
account = Account.objects.first()
|
||
except Exception:
|
||
pass
|
||
|
||
if account:
|
||
try:
|
||
from .models import IntegrationSettings
|
||
logger.info(f"[test_connection] Looking for saved settings for account {account.id}")
|
||
saved_settings = IntegrationSettings.objects.get(
|
||
integration_type=integration_type,
|
||
account=account
|
||
)
|
||
api_key = saved_settings.config.get('apiKey')
|
||
logger.info(f"[test_connection] Found saved settings, has_apiKey={bool(api_key)}")
|
||
except IntegrationSettings.DoesNotExist:
|
||
logger.warning(f"[test_connection] No saved settings found for {integration_type} and account {account.id}")
|
||
pass
|
||
|
||
if not api_key:
|
||
logger.error(f"[test_connection] No API key found in request or saved settings")
|
||
return error_response(
|
||
error='API key is required',
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
request=request
|
||
)
|
||
|
||
logger.info(f"[test_connection] Testing {integration_type} connection with API key (length={len(api_key) if api_key else 0})")
|
||
try:
|
||
if integration_type == 'openai':
|
||
return self._test_openai(api_key, config, request)
|
||
elif integration_type == 'runware':
|
||
return self._test_runware(api_key, request)
|
||
else:
|
||
return error_response(
|
||
error=f'Validation not supported for {integration_type}',
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
request=request
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"Error testing {integration_type} connection: {str(e)}", exc_info=True)
|
||
import traceback
|
||
error_trace = traceback.format_exc()
|
||
logger.error(f"Full traceback: {error_trace}")
|
||
return error_response(
|
||
error=str(e),
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
request=request
|
||
)
|
||
|
||
def _test_openai(self, api_key: str, config: dict = None, request=None):
|
||
"""
|
||
Test OpenAI API connection.
|
||
EXACT match to reference plugin's igny8_test_connection() function.
|
||
Reference: ai/openai-api.php line 186-309
|
||
"""
|
||
import requests
|
||
|
||
# Get model from config or use default (reference plugin: get_option('igny8_model', 'gpt-4.1'))
|
||
model = (config or {}).get('model', 'gpt-4.1') if config else 'gpt-4.1'
|
||
|
||
# Check if test with response is requested (reference plugin: $with_response parameter)
|
||
with_response = (config or {}).get('with_response', False) if config else False
|
||
|
||
if with_response:
|
||
# Test with actual API call (reference plugin: test with chat completion)
|
||
request_body = {
|
||
'model': model,
|
||
'messages': [
|
||
{
|
||
'role': 'user',
|
||
'content': 'test ping, reply with: OK! Ping Received. Also tell me: what is your maximum token limit that I can use in 1 request?'
|
||
}
|
||
],
|
||
'temperature': 0.7,
|
||
}
|
||
|
||
try:
|
||
response = requests.post(
|
||
'https://api.openai.com/v1/chat/completions',
|
||
headers={
|
||
'Authorization': f'Bearer {api_key}',
|
||
'Content-Type': 'application/json',
|
||
},
|
||
json=request_body,
|
||
timeout=15
|
||
)
|
||
|
||
if response.status_code >= 200 and response.status_code < 300:
|
||
response_data = response.json()
|
||
|
||
if 'choices' in response_data and len(response_data['choices']) > 0:
|
||
response_text = response_data['choices'][0]['message']['content'].strip()
|
||
|
||
# Extract token usage information (reference plugin: line 269-271)
|
||
usage = response_data.get('usage', {})
|
||
input_tokens = usage.get('prompt_tokens', 0)
|
||
output_tokens = usage.get('completion_tokens', 0)
|
||
total_tokens = usage.get('total_tokens', 0)
|
||
|
||
# Calculate cost using model rates (reference plugin: line 274-275)
|
||
from igny8_core.utils.ai_processor import MODEL_RATES
|
||
rates = MODEL_RATES.get(model, {'input': 2.00, 'output': 8.00})
|
||
cost = (input_tokens * rates['input'] + output_tokens * rates['output']) / 1000000
|
||
|
||
return success_response(
|
||
data={
|
||
'message': 'API connection and response test successful!',
|
||
'model_used': model,
|
||
'response': response_text,
|
||
'tokens_used': f"{input_tokens} / {output_tokens}",
|
||
'total_tokens': total_tokens,
|
||
'cost': f'${cost:.4f}',
|
||
'full_response': response_data,
|
||
},
|
||
request=request
|
||
)
|
||
else:
|
||
return error_response(
|
||
error='API responded but no content received',
|
||
errors={'response': [response.text[:500]]},
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
request=request
|
||
)
|
||
else:
|
||
body = response.text
|
||
# Map OpenAI API errors to appropriate HTTP status codes
|
||
# OpenAI 401 (invalid API key) should be 400 (Bad Request) in our API
|
||
# OpenAI 4xx errors are client errors (invalid request) -> 400
|
||
# OpenAI 5xx errors are server errors -> 500
|
||
if response.status_code == 401:
|
||
# Invalid API key - this is a validation error, not an auth error
|
||
status_code = status.HTTP_400_BAD_REQUEST
|
||
elif 400 <= response.status_code < 500:
|
||
# Other client errors from OpenAI (invalid request, rate limit, etc.)
|
||
status_code = status.HTTP_400_BAD_REQUEST
|
||
elif response.status_code >= 500:
|
||
# Server errors from OpenAI
|
||
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
|
||
else:
|
||
status_code = response.status_code
|
||
|
||
return error_response(
|
||
error=f'HTTP {response.status_code} – {body[:200]}',
|
||
status_code=status_code,
|
||
request=request
|
||
)
|
||
except requests.exceptions.RequestException as e:
|
||
return error_response(
|
||
error=str(e),
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
request=request
|
||
)
|
||
else:
|
||
# Simple connection test without API call (reference plugin: GET /v1/models)
|
||
try:
|
||
response = requests.get(
|
||
'https://api.openai.com/v1/models',
|
||
headers={
|
||
'Authorization': f'Bearer {api_key}',
|
||
},
|
||
timeout=10
|
||
)
|
||
|
||
if response.status_code >= 200 and response.status_code < 300:
|
||
return success_response(
|
||
data={
|
||
'message': 'API connection successful!',
|
||
'model_used': model,
|
||
'response': 'Connection verified without API call'
|
||
},
|
||
request=request
|
||
)
|
||
else:
|
||
body = response.text
|
||
# Map OpenAI API errors to appropriate HTTP status codes
|
||
# OpenAI 401 (invalid API key) should be 400 (Bad Request) in our API
|
||
# OpenAI 4xx errors are client errors (invalid request) -> 400
|
||
# OpenAI 5xx errors are server errors -> 500
|
||
if response.status_code == 401:
|
||
# Invalid API key - this is a validation error, not an auth error
|
||
status_code = status.HTTP_400_BAD_REQUEST
|
||
elif 400 <= response.status_code < 500:
|
||
# Other client errors from OpenAI (invalid request, rate limit, etc.)
|
||
status_code = status.HTTP_400_BAD_REQUEST
|
||
elif response.status_code >= 500:
|
||
# Server errors from OpenAI
|
||
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
|
||
else:
|
||
status_code = response.status_code
|
||
|
||
return error_response(
|
||
error=f'HTTP {response.status_code} – {body[:200]}',
|
||
status_code=status_code,
|
||
request=request
|
||
)
|
||
except requests.exceptions.RequestException as e:
|
||
return error_response(
|
||
error=str(e),
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
request=request
|
||
)
|
||
|
||
def _test_runware(self, api_key: str, request):
|
||
"""
|
||
Test Runware API connection using 64x64 image generation (ping validation)
|
||
Reference: Uses same format as image generation but with minimal 64x64 size for fast validation
|
||
"""
|
||
from igny8_core.utils.ai_processor import AIProcessor
|
||
|
||
# Get account from request
|
||
account = getattr(request, 'account', None)
|
||
if not account:
|
||
user = getattr(request, 'user', None)
|
||
if user and hasattr(user, 'is_authenticated') and user.is_authenticated:
|
||
account = getattr(user, 'account', None)
|
||
# Fallback to default account
|
||
if not account:
|
||
from igny8_core.auth.models import Account
|
||
try:
|
||
account = Account.objects.first()
|
||
except Exception:
|
||
pass
|
||
|
||
try:
|
||
# EXACT match to reference plugin: core/admin/ajax.php line 4946-5003
|
||
# Reference plugin uses: 128x128, steps=2, CFGScale=5, prompt='test image connection'
|
||
import requests
|
||
import uuid
|
||
import json
|
||
|
||
test_prompt = 'test image connection'
|
||
|
||
# Prepare payload EXACTLY as reference plugin
|
||
payload = [
|
||
{
|
||
'taskType': 'authentication',
|
||
'apiKey': api_key
|
||
},
|
||
{
|
||
'taskType': 'imageInference',
|
||
'taskUUID': str(uuid.uuid4()),
|
||
'positivePrompt': test_prompt,
|
||
'model': 'runware:97@1',
|
||
'width': 128, # Reference plugin uses 128x128, not 64x64
|
||
'height': 128,
|
||
'negativePrompt': 'text, watermark, logo, overlay, title, caption, writing on walls, writing on objects, UI, infographic elements, post title',
|
||
'steps': 2, # Low steps for fast testing
|
||
'CFGScale': 5,
|
||
'numberResults': 1
|
||
}
|
||
]
|
||
|
||
logger.info("[_test_runware] Testing Runware API with 128x128 image generation (matching reference plugin)")
|
||
logger.info(f"[_test_runware] Payload: {json.dumps(payload, indent=2)}")
|
||
|
||
# Make API request
|
||
response = requests.post(
|
||
'https://api.runware.ai/v1',
|
||
headers={'Content-Type': 'application/json'},
|
||
json=payload,
|
||
timeout=30
|
||
)
|
||
|
||
logger.info(f"[_test_runware] Response status: {response.status_code}")
|
||
|
||
if response.status_code != 200:
|
||
error_text = response.text
|
||
logger.error(f"[_test_runware] HTTP error {response.status_code}: {error_text[:200]}")
|
||
return error_response(
|
||
error=f'HTTP {response.status_code}: {error_text[:200]}',
|
||
status_code=response.status_code,
|
||
request=request
|
||
)
|
||
|
||
# Parse response - Reference plugin checks: $body['data'][0]['imageURL']
|
||
body = response.json()
|
||
logger.info(f"[_test_runware] Response body type: {type(body)}")
|
||
logger.info(f"[_test_runware] Response body: {json.dumps(body, indent=2)[:1000]}")
|
||
|
||
# Reference plugin line 4996: if (isset($body['data'][0]['imageURL']))
|
||
if isinstance(body, dict) and 'data' in body:
|
||
data = body['data']
|
||
if isinstance(data, list) and len(data) > 0:
|
||
first_item = data[0]
|
||
image_url = first_item.get('imageURL') or first_item.get('image_url')
|
||
if image_url:
|
||
logger.info(f"[_test_runware] Success! Image URL: {image_url[:50]}...")
|
||
return success_response(
|
||
data={
|
||
'message': '✅ Runware API connected successfully!',
|
||
'image_url': image_url,
|
||
'cost': '$0.0090',
|
||
'provider': 'runware',
|
||
'model': 'runware:97@1',
|
||
'size': '128x128'
|
||
},
|
||
request=request
|
||
)
|
||
|
||
# Check for errors - Reference plugin line 4998: elseif (isset($body['errors'][0]['message']))
|
||
if isinstance(body, dict) and 'errors' in body:
|
||
errors = body['errors']
|
||
if isinstance(errors, list) and len(errors) > 0:
|
||
error_msg = errors[0].get('message', 'Unknown Runware API error')
|
||
logger.error(f"[_test_runware] Runware API error: {error_msg}")
|
||
return error_response(
|
||
error=f'❌ {error_msg}',
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
request=request
|
||
)
|
||
|
||
# Unknown response format
|
||
logger.error(f"[_test_runware] Unknown response format: {body}")
|
||
return error_response(
|
||
error='❌ Unknown response from Runware.',
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
request=request
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"[_test_runware] Exception in Runware API test: {str(e)}", exc_info=True)
|
||
return error_response(
|
||
error=f'Runware API test failed: {str(e)}',
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
request=request
|
||
)
|
||
|
||
def generate_image(self, request, pk=None, **kwargs):
|
||
"""
|
||
Generate image using the configured image generation service
|
||
POST /api/v1/system/settings/integrations/image_generation/generate/
|
||
Note: This method is called via custom URL pattern, not @action decorator
|
||
"""
|
||
# Extract pk from kwargs if not passed as parameter (DRF passes via **kwargs)
|
||
if not pk:
|
||
pk = kwargs.get('pk')
|
||
|
||
# Log detailed request info for debugging
|
||
logger.info("=" * 80)
|
||
logger.info("[generate_image] ENDPOINT CALLED - Image generation request received")
|
||
logger.info(f"[generate_image] pk parameter: {pk}")
|
||
logger.info(f"[generate_image] kwargs: {kwargs}")
|
||
logger.info(f"[generate_image] request.path: {request.path}")
|
||
logger.info(f"[generate_image] request.method: {request.method}")
|
||
logger.info(f"[generate_image] request.META.get('PATH_INFO'): {request.META.get('PATH_INFO')}")
|
||
logger.info(f"[generate_image] request.META.get('REQUEST_URI'): {request.META.get('REQUEST_URI', 'N/A')}")
|
||
logger.info(f"[generate_image] request.META.get('HTTP_HOST'): {request.META.get('HTTP_HOST', 'N/A')}")
|
||
logger.info(f"[generate_image] request.META.get('HTTP_REFERER'): {request.META.get('HTTP_REFERER', 'N/A')}")
|
||
logger.info(f"[generate_image] request.data: {request.data}")
|
||
|
||
if pk != 'image_generation':
|
||
logger.error(f"[generate_image] Invalid pk: {pk}, expected 'image_generation'")
|
||
return error_response(
|
||
error=f'Image generation endpoint only available for image_generation integration, got: {pk}',
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
request=request
|
||
)
|
||
|
||
# Get account
|
||
logger.info("[generate_image] Step 1: Getting account")
|
||
account = getattr(request, 'account', None)
|
||
if not account:
|
||
user = getattr(request, 'user', None)
|
||
logger.info(f"[generate_image] No account in request, checking user: {user}")
|
||
if user and hasattr(user, 'is_authenticated') and user.is_authenticated:
|
||
account = getattr(user, 'account', None)
|
||
logger.info(f"[generate_image] Got account from user: {account}")
|
||
if not account:
|
||
logger.info("[generate_image] No account found, trying to get first account from DB")
|
||
from igny8_core.auth.models import Account
|
||
try:
|
||
account = Account.objects.first()
|
||
logger.info(f"[generate_image] Got first account from DB: {account}")
|
||
except Exception as e:
|
||
logger.error(f"[generate_image] Error getting account from DB: {e}")
|
||
pass
|
||
|
||
if not account:
|
||
logger.error("[generate_image] ERROR: No account found, returning error response")
|
||
return error_response(
|
||
error='Account not found',
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
request=request
|
||
)
|
||
|
||
logger.info(f"[generate_image] Account resolved: {account.id if account else 'None'}")
|
||
|
||
# Get request parameters
|
||
logger.info("[generate_image] Step 2: Extracting request parameters")
|
||
prompt = request.data.get('prompt', '')
|
||
negative_prompt = request.data.get('negative_prompt', '')
|
||
image_type = request.data.get('image_type', 'realistic')
|
||
image_size = request.data.get('image_size', '1024x1024')
|
||
image_format = request.data.get('image_format', 'webp')
|
||
provider = request.data.get('provider', 'openai')
|
||
model = request.data.get('model', 'dall-e-3')
|
||
|
||
logger.info(f"[generate_image] Request parameters: provider={provider}, model={model}, image_type={image_type}, image_size={image_size}, prompt_length={len(prompt)}")
|
||
logger.info(f"[generate_image] IMPORTANT: Using ONLY {provider.upper()} provider for this request. NOT using both providers.")
|
||
|
||
if not prompt:
|
||
logger.error("[generate_image] ERROR: Prompt is empty")
|
||
return error_response(
|
||
error='Prompt is required',
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
request=request
|
||
)
|
||
|
||
# Get API key from saved settings for the specified provider only
|
||
logger.info(f"[generate_image] Step 3: Getting API key for provider: {provider}")
|
||
from .models import IntegrationSettings
|
||
|
||
# Only fetch settings for the specified provider
|
||
api_key = None
|
||
integration_enabled = False
|
||
integration_type = provider # 'openai' or 'runware'
|
||
|
||
try:
|
||
integration_settings = IntegrationSettings.objects.get(
|
||
integration_type=integration_type,
|
||
account=account
|
||
)
|
||
api_key = integration_settings.config.get('apiKey')
|
||
integration_enabled = integration_settings.is_active
|
||
logger.info(f"[generate_image] {integration_type.upper()} settings found: enabled={integration_enabled}, has_key={bool(api_key)}")
|
||
except IntegrationSettings.DoesNotExist:
|
||
logger.warning(f"[generate_image] {integration_type.upper()} settings not found in database")
|
||
api_key = None
|
||
integration_enabled = False
|
||
except Exception as e:
|
||
logger.error(f"[generate_image] Error getting {integration_type.upper()} settings: {e}")
|
||
api_key = None
|
||
integration_enabled = False
|
||
|
||
# Validate provider and API key
|
||
logger.info(f"[generate_image] Step 4: Validating {provider} provider and API key")
|
||
if provider not in ['openai', 'runware']:
|
||
logger.error(f"[generate_image] ERROR: Invalid provider: {provider}")
|
||
return error_response(
|
||
error=f'Invalid provider: {provider}. Must be "openai" or "runware"',
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
request=request
|
||
)
|
||
|
||
if not api_key or not integration_enabled:
|
||
logger.error(f"[generate_image] ERROR: {provider.upper()} API key not configured or integration not enabled")
|
||
return error_response(
|
||
error=f'{provider.upper()} API key not configured or integration not enabled',
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
request=request
|
||
)
|
||
|
||
logger.info(f"[generate_image] {provider.upper()} API key validated successfully")
|
||
|
||
# Generate image using AIProcessor
|
||
logger.info("[generate_image] Step 5: Creating AIProcessor and generating image")
|
||
try:
|
||
from igny8_core.utils.ai_processor import AIProcessor
|
||
processor = AIProcessor(account=account)
|
||
logger.info("[generate_image] AIProcessor created successfully")
|
||
|
||
# Parse size
|
||
width, height = map(int, image_size.split('x'))
|
||
size_str = f'{width}x{height}'
|
||
logger.info(f"[generate_image] Image size parsed: {size_str}")
|
||
|
||
logger.info(f"[generate_image] Calling processor.generate_image with: provider={provider}, model={model}, size={size_str}")
|
||
result = processor.generate_image(
|
||
prompt=prompt,
|
||
provider=provider,
|
||
model=model,
|
||
size=size_str,
|
||
n=1,
|
||
api_key=api_key,
|
||
negative_prompt=negative_prompt if provider == 'runware' else None, # OpenAI doesn't support negative prompts
|
||
)
|
||
|
||
logger.info(f"[generate_image] AIProcessor.generate_image returned: has_url={bool(result.get('url'))}, has_error={bool(result.get('error'))}")
|
||
|
||
if result.get('error'):
|
||
logger.error(f"[generate_image] ERROR from AIProcessor: {result.get('error')}")
|
||
return error_response(
|
||
error=result['error'],
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
request=request
|
||
)
|
||
|
||
logger.info("[generate_image] Image generation successful, returning response")
|
||
response_data = {
|
||
'image_url': result.get('url'),
|
||
'revised_prompt': result.get('revised_prompt'),
|
||
'model': model,
|
||
'provider': provider,
|
||
'cost': f"${result.get('cost', 0):.4f}" if result.get('cost') else None,
|
||
}
|
||
logger.info(f"[generate_image] Returning success response: {response_data}")
|
||
return success_response(
|
||
data=response_data,
|
||
request=request
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"[generate_image] EXCEPTION in image generation: {str(e)}", exc_info=True)
|
||
return error_response(
|
||
error=f'Failed to generate image: {str(e)}',
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
request=request
|
||
)
|
||
|
||
def create(self, request):
|
||
"""Create integration settings"""
|
||
integration_type = request.data.get('integration_type')
|
||
if not integration_type:
|
||
return error_response(
|
||
error='integration_type is required',
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
request=request
|
||
)
|
||
return self.save_settings(request, integration_type)
|
||
|
||
def save_settings(self, request, pk=None):
|
||
"""Save integration settings"""
|
||
integration_type = pk # 'openai', 'runware', 'gsc'
|
||
|
||
logger.info(f"[save_settings] Called for integration_type={integration_type}, user={getattr(request, 'user', None)}, account={getattr(request, 'account', None)}")
|
||
|
||
if not integration_type:
|
||
return error_response(
|
||
error='Integration type is required',
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
request=request
|
||
)
|
||
|
||
# Ensure config is a dict
|
||
config = dict(request.data) if hasattr(request.data, 'dict') else (request.data if isinstance(request.data, dict) else {})
|
||
logger.info(f"[save_settings] Config keys: {list(config.keys()) if isinstance(config, dict) else 'Not a dict'}")
|
||
|
||
try:
|
||
# Get account - try multiple methods
|
||
account = getattr(request, 'account', None)
|
||
logger.info(f"[save_settings] Account from request: {account.id if account else None}")
|
||
|
||
# Fallback 1: Get from authenticated user's account
|
||
if not account:
|
||
user = getattr(request, 'user', None)
|
||
if user and hasattr(user, 'is_authenticated') and user.is_authenticated:
|
||
try:
|
||
account = getattr(user, 'account', None)
|
||
except Exception as e:
|
||
logger.warning(f"Error getting account from user: {e}")
|
||
account = None
|
||
|
||
# Fallback 2: If still no account, get default account (for development)
|
||
if not account:
|
||
from igny8_core.auth.models import Account
|
||
try:
|
||
# Get the first account as fallback (development only)
|
||
account = Account.objects.first()
|
||
except Exception as e:
|
||
logger.warning(f"Error getting default account: {e}")
|
||
account = None
|
||
|
||
if not account:
|
||
logger.error(f"[save_settings] No account found after all fallbacks")
|
||
return error_response(
|
||
error='Account not found. Please ensure you are logged in.',
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
request=request
|
||
)
|
||
|
||
logger.info(f"[save_settings] Using account: {account.id} ({account.name}, slug={account.slug}, status={account.status})")
|
||
|
||
# Store integration settings in a simple model or settings table
|
||
# For now, we'll use a simple approach - store in IntegrationSettings model
|
||
# or use Django settings/database
|
||
|
||
# Import IntegrationSettings model
|
||
from .models import IntegrationSettings
|
||
|
||
# For image_generation, ensure provider is set correctly
|
||
if integration_type == 'image_generation':
|
||
# Map service to provider if service is provided
|
||
if 'service' in config and 'provider' not in config:
|
||
config['provider'] = config['service']
|
||
# Ensure provider is set
|
||
if 'provider' not in config:
|
||
config['provider'] = config.get('service', 'openai')
|
||
# Set model based on provider
|
||
if config.get('provider') == 'openai' and 'model' not in config:
|
||
config['model'] = config.get('imageModel', 'dall-e-3')
|
||
elif config.get('provider') == 'runware' and 'model' not in config:
|
||
config['model'] = config.get('runwareModel', 'runware:97@1')
|
||
# Ensure all image settings have defaults
|
||
config.setdefault('image_type', 'realistic')
|
||
config.setdefault('max_in_article_images', 2)
|
||
config.setdefault('image_format', 'webp')
|
||
config.setdefault('desktop_enabled', True)
|
||
config.setdefault('mobile_enabled', True)
|
||
|
||
# Set default image sizes based on provider/model
|
||
provider = config.get('provider', 'openai')
|
||
model = config.get('model', 'dall-e-3')
|
||
|
||
if not config.get('featured_image_size'):
|
||
if provider == 'runware':
|
||
config['featured_image_size'] = '1280x832'
|
||
else: # openai
|
||
config['featured_image_size'] = '1024x1024'
|
||
|
||
if not config.get('desktop_image_size'):
|
||
config['desktop_image_size'] = '1024x1024'
|
||
|
||
# Get or create integration settings
|
||
logger.info(f"[save_settings] Attempting get_or_create for {integration_type} with account {account.id}")
|
||
integration_settings, created = IntegrationSettings.objects.get_or_create(
|
||
integration_type=integration_type,
|
||
account=account,
|
||
defaults={'config': config, 'is_active': config.get('enabled', False)}
|
||
)
|
||
logger.info(f"[save_settings] get_or_create result: created={created}, id={integration_settings.id}")
|
||
|
||
if not created:
|
||
logger.info(f"[save_settings] Updating existing settings (id={integration_settings.id})")
|
||
integration_settings.config = config
|
||
integration_settings.is_active = config.get('enabled', False)
|
||
integration_settings.save()
|
||
logger.info(f"[save_settings] Settings updated successfully")
|
||
|
||
logger.info(f"[save_settings] Successfully saved settings for {integration_type}")
|
||
return success_response(
|
||
data={'config': config},
|
||
message=f'{integration_type.upper()} settings saved successfully',
|
||
request=request
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error saving integration settings for {integration_type}: {str(e)}", exc_info=True)
|
||
import traceback
|
||
error_trace = traceback.format_exc()
|
||
logger.error(f"Full traceback: {error_trace}")
|
||
return error_response(
|
||
error=f'Failed to save settings: {str(e)}',
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
request=request
|
||
)
|
||
|
||
def get_settings(self, request, pk=None):
|
||
"""Get integration settings - defaults to AWS-admin settings if account doesn't have its own"""
|
||
integration_type = pk
|
||
|
||
if not integration_type:
|
||
return error_response(
|
||
error='Integration type is required',
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
request=request
|
||
)
|
||
|
||
try:
|
||
# Get account - try multiple methods (same as save_settings)
|
||
account = getattr(request, 'account', None)
|
||
|
||
# Fallback 1: Get from authenticated user's account
|
||
if not account:
|
||
user = getattr(request, 'user', None)
|
||
if user and hasattr(user, 'is_authenticated') and user.is_authenticated:
|
||
try:
|
||
account = getattr(user, 'account', None)
|
||
except Exception as e:
|
||
logger.warning(f"Error getting account from user: {e}")
|
||
account = None
|
||
|
||
from .models import IntegrationSettings
|
||
|
||
# Get account-specific settings
|
||
if account:
|
||
try:
|
||
integration_settings = IntegrationSettings.objects.get(
|
||
integration_type=integration_type,
|
||
account=account
|
||
)
|
||
response_data = {
|
||
'id': integration_settings.integration_type,
|
||
'enabled': integration_settings.is_active,
|
||
**integration_settings.config
|
||
}
|
||
return success_response(
|
||
data=response_data,
|
||
request=request
|
||
)
|
||
except IntegrationSettings.DoesNotExist:
|
||
pass
|
||
except Exception as e:
|
||
logger.error(f"Error getting account-specific settings: {e}", exc_info=True)
|
||
|
||
# Return empty config if no settings found
|
||
return success_response(
|
||
data={},
|
||
request=request
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"Unexpected error in get_settings for {integration_type}: {e}", exc_info=True)
|
||
return error_response(
|
||
error=f'Failed to get settings: {str(e)}',
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
request=request
|
||
)
|
||
|
||
@action(detail=False, methods=['get'], url_path='image_generation', url_name='image_generation_settings')
|
||
def get_image_generation_settings(self, request):
|
||
"""Get image generation settings for current account"""
|
||
account = getattr(request, 'account', None)
|
||
|
||
if not account:
|
||
# Fallback to user's account
|
||
user = getattr(request, 'user', None)
|
||
if user and hasattr(user, 'is_authenticated') and user.is_authenticated:
|
||
account = getattr(user, 'account', None)
|
||
# Fallback to default account
|
||
if not account:
|
||
from igny8_core.auth.models import Account
|
||
try:
|
||
account = Account.objects.first()
|
||
except Exception:
|
||
pass
|
||
|
||
if not account:
|
||
return error_response(
|
||
error='Account not found',
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
request=request
|
||
)
|
||
|
||
try:
|
||
from .models import IntegrationSettings
|
||
integration = IntegrationSettings.objects.get(
|
||
account=account,
|
||
integration_type='image_generation',
|
||
is_active=True
|
||
)
|
||
|
||
config = integration.config or {}
|
||
|
||
# Debug: Log what's actually in the config
|
||
logger.info(f"[get_image_generation_settings] Full config: {config}")
|
||
logger.info(f"[get_image_generation_settings] Config keys: {list(config.keys())}")
|
||
logger.info(f"[get_image_generation_settings] model field: {config.get('model')}")
|
||
logger.info(f"[get_image_generation_settings] imageModel field: {config.get('imageModel')}")
|
||
|
||
# Get model - try 'model' first, then 'imageModel' as fallback
|
||
model = config.get('model') or config.get('imageModel') or 'dall-e-3'
|
||
|
||
# Set defaults for image sizes if not present
|
||
provider = config.get('provider', 'openai')
|
||
default_featured_size = '1280x832' if provider == 'runware' else '1024x1024'
|
||
|
||
return success_response(
|
||
data={
|
||
'config': {
|
||
'provider': config.get('provider', 'openai'),
|
||
'model': model,
|
||
'image_type': config.get('image_type', 'realistic'),
|
||
'max_in_article_images': config.get('max_in_article_images', 2),
|
||
'image_format': config.get('image_format', 'webp'),
|
||
'desktop_enabled': config.get('desktop_enabled', True),
|
||
'mobile_enabled': config.get('mobile_enabled', True),
|
||
'featured_image_size': config.get('featured_image_size', default_featured_size),
|
||
'desktop_image_size': config.get('desktop_image_size', '1024x1024'),
|
||
}
|
||
},
|
||
request=request
|
||
)
|
||
except IntegrationSettings.DoesNotExist:
|
||
return success_response(
|
||
data={
|
||
'config': {
|
||
'provider': 'openai',
|
||
'model': 'dall-e-3',
|
||
'image_type': 'realistic',
|
||
'max_in_article_images': 2,
|
||
'image_format': 'webp',
|
||
'desktop_enabled': True,
|
||
'mobile_enabled': True,
|
||
}
|
||
},
|
||
request=request
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"[get_image_generation_settings] Error: {str(e)}", exc_info=True)
|
||
return error_response(
|
||
error=str(e),
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
request=request
|
||
)
|
||
|
||
@action(detail=False, methods=['get'], url_path='task_progress/(?P<task_id>[^/.]+)', url_name='task-progress',
|
||
permission_classes=[IsAuthenticatedAndActive]) # Allow any authenticated user to check task progress
|
||
def task_progress(self, request, task_id=None):
|
||
"""
|
||
Get Celery task progress status
|
||
GET /api/v1/system/settings/task_progress/<task_id>/
|
||
|
||
Permission: Any authenticated user can check task progress (not restricted to system accounts)
|
||
"""
|
||
if not task_id:
|
||
return error_response(
|
||
error='Task ID is required',
|
||
status_code=status.HTTP_400_BAD_REQUEST,
|
||
request=request
|
||
)
|
||
|
||
import logging
|
||
logger = logging.getLogger(__name__)
|
||
|
||
try:
|
||
# Try to import Celery AsyncResult
|
||
try:
|
||
from celery.result import AsyncResult
|
||
from kombu.exceptions import OperationalError as KombuOperationalError
|
||
# Try to import redis ConnectionError, but it might not be available
|
||
try:
|
||
from redis.exceptions import ConnectionError as RedisConnectionError
|
||
except ImportError:
|
||
# Redis might not be installed or ConnectionError might not exist
|
||
RedisConnectionError = ConnectionError
|
||
except ImportError:
|
||
logger.warning("Celery not available - task progress cannot be retrieved")
|
||
return success_response(
|
||
data={
|
||
'state': 'PENDING',
|
||
'meta': {
|
||
'percentage': 0,
|
||
'message': 'Celery not available - cannot retrieve task status',
|
||
'error': 'Celery not configured'
|
||
}
|
||
},
|
||
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
|
||
request=request
|
||
)
|
||
|
||
try:
|
||
# Create AsyncResult - this should not raise an exception even if task doesn't exist
|
||
task = AsyncResult(task_id)
|
||
|
||
# Safely get task state - accessing task.state can raise ValueError if exception info is malformed
|
||
# or ConnectionError if backend is unavailable
|
||
try:
|
||
task_state = task.state
|
||
except (ValueError, KeyError) as state_exc:
|
||
# Task has malformed exception info - try to get error from multiple sources
|
||
logger.warning(f"Error accessing task.state (malformed exception info): {str(state_exc)}")
|
||
error_msg = 'Task failed - exception details unavailable'
|
||
error_type = 'UnknownError'
|
||
request_steps = []
|
||
response_steps = []
|
||
|
||
# First, try to get from backend's stored meta (most reliable for our update_state calls)
|
||
try:
|
||
backend = task.backend
|
||
if hasattr(backend, 'get_task_meta'):
|
||
stored_meta = backend.get_task_meta(task_id)
|
||
if stored_meta and isinstance(stored_meta, dict):
|
||
meta = stored_meta.get('meta', {})
|
||
if isinstance(meta, dict):
|
||
if 'error' in meta:
|
||
error_msg = meta.get('error')
|
||
if 'error_type' in meta:
|
||
error_type = meta.get('error_type', error_type)
|
||
if 'request_steps' in meta:
|
||
request_steps = meta.get('request_steps', [])
|
||
if 'response_steps' in meta:
|
||
response_steps = meta.get('response_steps', [])
|
||
except Exception as e:
|
||
logger.debug(f"Error getting from backend meta: {str(e)}")
|
||
|
||
# Try to get error from task.result
|
||
if error_msg == 'Task failed - exception details unavailable':
|
||
try:
|
||
if hasattr(task, 'result'):
|
||
result = task.result
|
||
if isinstance(result, dict):
|
||
error_msg = result.get('error', error_msg)
|
||
error_type = result.get('error_type', error_type)
|
||
request_steps = result.get('request_steps', request_steps)
|
||
response_steps = result.get('response_steps', response_steps)
|
||
elif isinstance(result, str):
|
||
error_msg = result
|
||
except Exception as e:
|
||
logger.debug(f"Error extracting error from task.result: {str(e)}")
|
||
|
||
# Also try to get error from task.info
|
||
if error_msg == 'Task failed - exception details unavailable':
|
||
try:
|
||
if hasattr(task, 'info') and task.info:
|
||
if isinstance(task.info, dict):
|
||
if 'error' in task.info:
|
||
error_msg = task.info['error']
|
||
if 'error_type' in task.info:
|
||
error_type = task.info['error_type']
|
||
if 'request_steps' in task.info:
|
||
request_steps = task.info.get('request_steps', request_steps)
|
||
if 'response_steps' in task.info:
|
||
response_steps = task.info.get('response_steps', response_steps)
|
||
except Exception as e:
|
||
logger.debug(f"Error extracting error from task.info: {str(e)}")
|
||
|
||
return success_response(
|
||
data={
|
||
'state': 'FAILURE',
|
||
'meta': {
|
||
'error': error_msg,
|
||
'error_type': error_type,
|
||
'percentage': 0,
|
||
'message': f'Error: {error_msg}',
|
||
'request_steps': request_steps,
|
||
'response_steps': response_steps,
|
||
}
|
||
},
|
||
request=request
|
||
)
|
||
except (KombuOperationalError, RedisConnectionError, ConnectionError) as conn_exc:
|
||
# Backend connection error - task might not be registered yet or backend is down
|
||
logger.warning(f"Backend connection error accessing task.state for {task_id}: {type(conn_exc).__name__}: {str(conn_exc)}")
|
||
return success_response(
|
||
data={
|
||
'state': 'PENDING',
|
||
'meta': {
|
||
'percentage': 0,
|
||
'message': 'Task is being queued...',
|
||
'phase': 'initializing',
|
||
'error': None # Don't show as error, just pending
|
||
}
|
||
},
|
||
request=request
|
||
)
|
||
except Exception as state_exc:
|
||
logger.error(f"Unexpected error accessing task.state: {type(state_exc).__name__}: {str(state_exc)}")
|
||
return success_response(
|
||
data={
|
||
'state': 'UNKNOWN',
|
||
'meta': {
|
||
'error': f'Error accessing task: {str(state_exc)}',
|
||
'percentage': 0,
|
||
'message': f'Error: {str(state_exc)}',
|
||
}
|
||
},
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
request=request
|
||
)
|
||
|
||
# Check if task exists and is accessible
|
||
if task_state is None:
|
||
# Task doesn't exist or hasn't been registered yet
|
||
return success_response(
|
||
data={
|
||
'state': 'PENDING',
|
||
'meta': {
|
||
'percentage': 0,
|
||
'message': 'Task not found or not yet registered',
|
||
'phase': 'initializing',
|
||
}
|
||
},
|
||
request=request
|
||
)
|
||
|
||
# Safely get task info/result
|
||
# Try to get error from multiple sources
|
||
task_result = None
|
||
task_info = None
|
||
error_message = None
|
||
error_type = None
|
||
|
||
# First, try to get from backend's stored meta (most reliable for our update_state calls)
|
||
try:
|
||
backend = task.backend
|
||
if hasattr(backend, 'get_task_meta'):
|
||
stored_meta = backend.get_task_meta(task_id)
|
||
if stored_meta and isinstance(stored_meta, dict):
|
||
meta = stored_meta.get('meta', {})
|
||
if isinstance(meta, dict):
|
||
if 'error' in meta:
|
||
error_message = meta.get('error')
|
||
error_type = meta.get('error_type', 'UnknownError')
|
||
except Exception as backend_err:
|
||
logger.debug(f"Could not get from backend meta: {backend_err}")
|
||
|
||
try:
|
||
# Try to get result first - this often has the actual error
|
||
if not error_message and hasattr(task, 'result'):
|
||
try:
|
||
task_result = task.result
|
||
# If result is a dict with error, extract it
|
||
if isinstance(task_result, dict):
|
||
if 'error' in task_result:
|
||
error_message = task_result.get('error')
|
||
error_type = task_result.get('error_type', 'UnknownError')
|
||
elif 'success' in task_result and not task_result.get('success'):
|
||
error_message = task_result.get('error', 'Task failed')
|
||
error_type = task_result.get('error_type', 'UnknownError')
|
||
except Exception:
|
||
pass # Will try task.info next
|
||
except Exception:
|
||
pass
|
||
|
||
# Then try task.info
|
||
if not error_message and hasattr(task, 'info'):
|
||
try:
|
||
task_info = task.info
|
||
if isinstance(task_info, dict):
|
||
if 'error' in task_info:
|
||
error_message = task_info.get('error')
|
||
error_type = task_info.get('error_type', 'UnknownError')
|
||
except (ValueError, KeyError, AttributeError) as info_exc:
|
||
# Log the actual exception that occurred
|
||
logger.error(f"Error accessing task.info for {task_id}: {type(info_exc).__name__}: {str(info_exc)}", exc_info=True)
|
||
# Try to get error from traceback if available
|
||
try:
|
||
if hasattr(task, 'traceback'):
|
||
error_message = f"Task failed: {str(task.traceback)}"
|
||
except:
|
||
pass
|
||
except (KombuOperationalError, RedisConnectionError, ConnectionError) as conn_exc:
|
||
# Backend connection error - task might not be registered yet
|
||
logger.warning(f"Backend connection error accessing task.info for {task_id}: {str(conn_exc)}")
|
||
task_info = None
|
||
except Exception as e:
|
||
logger.error(f"Unexpected error accessing task.info: {type(e).__name__}: {str(e)}", exc_info=True)
|
||
else:
|
||
if not hasattr(task, 'info'):
|
||
task_info = None
|
||
|
||
# If still no error message, try to get from task.result again
|
||
if not error_message and hasattr(task, 'result'):
|
||
try:
|
||
task_result = task.result
|
||
if isinstance(task_result, dict):
|
||
if 'error' in task_result:
|
||
error_message = task_result.get('error')
|
||
error_type = task_result.get('error_type', 'UnknownError')
|
||
elif 'success' in task_result and not task_result.get('success'):
|
||
error_message = task_result.get('error', 'Task failed')
|
||
error_type = task_result.get('error_type', 'UnknownError')
|
||
elif isinstance(task_result, str):
|
||
error_message = task_result
|
||
elif isinstance(task_result, Exception):
|
||
error_message = str(task_result)
|
||
error_type = type(task_result).__name__
|
||
except (ValueError, KeyError) as result_exc:
|
||
logger.warning(f"Error accessing task.result: {str(result_exc)}")
|
||
task_result = None
|
||
except (KombuOperationalError, RedisConnectionError, ConnectionError) as conn_exc:
|
||
# Backend connection error
|
||
logger.warning(f"Backend connection error accessing task.result for {task_id}: {str(conn_exc)}")
|
||
task_result = None
|
||
except Exception as info_error:
|
||
logger.warning(f"Unexpected error accessing task result: {str(info_error)}")
|
||
task_result = None
|
||
|
||
# Use extracted error or fallback - try traceback as last resort
|
||
if not error_message:
|
||
try:
|
||
if hasattr(task, 'traceback') and task.traceback:
|
||
error_message = f"Task failed: {str(task.traceback)}"
|
||
except Exception:
|
||
pass
|
||
|
||
if not error_message:
|
||
error_message = f"Task failed - check Celery worker logs for task {task_id}"
|
||
|
||
if task_state == 'PROGRESS':
|
||
meta = task_info or {}
|
||
response_meta = {
|
||
'current': meta.get('current', 0) if isinstance(meta, dict) else 0,
|
||
'total': meta.get('total', 0) if isinstance(meta, dict) else 0,
|
||
'percentage': meta.get('percentage', 0) if isinstance(meta, dict) else 0,
|
||
'message': meta.get('message', 'Processing...') if isinstance(meta, dict) else 'Processing...',
|
||
'phase': meta.get('phase', 'processing') if isinstance(meta, dict) else 'processing',
|
||
'current_item': meta.get('current_item') if isinstance(meta, dict) else None,
|
||
'completed': meta.get('completed', 0) if isinstance(meta, dict) else 0,
|
||
# Image generation progress fields
|
||
'current_image': meta.get('current_image') if isinstance(meta, dict) else None,
|
||
'current_image_id': meta.get('current_image_id') if isinstance(meta, dict) else None,
|
||
'current_image_progress': meta.get('current_image_progress') if isinstance(meta, dict) else None,
|
||
'total_images': meta.get('total_images') if isinstance(meta, dict) else None,
|
||
'failed': meta.get('failed', 0) if isinstance(meta, dict) else 0,
|
||
'results': meta.get('results', []) if isinstance(meta, dict) else [],
|
||
}
|
||
# Include step logs if available
|
||
if isinstance(meta, dict):
|
||
if 'request_steps' in meta:
|
||
response_meta['request_steps'] = meta['request_steps']
|
||
if 'response_steps' in meta:
|
||
response_meta['response_steps'] = meta['response_steps']
|
||
# Include image_queue if available (for image generation)
|
||
if 'image_queue' in meta:
|
||
response_meta['image_queue'] = meta['image_queue']
|
||
return success_response(
|
||
data={
|
||
'state': task_state,
|
||
'meta': response_meta
|
||
},
|
||
request=request
|
||
)
|
||
elif task_state == 'SUCCESS':
|
||
result = task_result or {}
|
||
meta = result if isinstance(result, dict) else {}
|
||
response_meta = {
|
||
'percentage': 100,
|
||
'message': meta.get('message', 'Task completed successfully') if isinstance(meta, dict) else 'Task completed successfully',
|
||
'result': result,
|
||
'details': meta if isinstance(meta, dict) else {},
|
||
}
|
||
# Include step logs if available
|
||
if isinstance(meta, dict):
|
||
if 'request_steps' in meta:
|
||
response_meta['request_steps'] = meta['request_steps']
|
||
if 'response_steps' in meta:
|
||
response_meta['response_steps'] = meta['response_steps']
|
||
return success_response(
|
||
data={
|
||
'state': task_state,
|
||
'meta': response_meta
|
||
},
|
||
request=request
|
||
)
|
||
elif task_state == 'FAILURE':
|
||
# Try to get error from task.info meta first (this is where run_ai_task sets it)
|
||
if not error_message and isinstance(task_info, dict):
|
||
error_message = task_info.get('error') or task_info.get('message', '')
|
||
error_type = task_info.get('error_type', 'UnknownError')
|
||
# Also check if message contains error info
|
||
if not error_message and 'message' in task_info:
|
||
msg = task_info.get('message', '')
|
||
if msg and 'Error:' in msg:
|
||
error_message = msg.replace('Error: ', '')
|
||
|
||
# Use extracted error_message if available, otherwise try to get from error_info
|
||
if not error_message:
|
||
error_info = task_info
|
||
if isinstance(error_info, Exception):
|
||
error_message = str(error_info)
|
||
elif isinstance(error_info, dict):
|
||
error_message = error_info.get('error') or error_info.get('message', '') or str(error_info)
|
||
elif error_info:
|
||
error_message = str(error_info)
|
||
|
||
# Final fallback - ensure we always have an error message
|
||
if not error_message or error_message.strip() == '':
|
||
error_message = f'Task execution failed - check Celery worker logs for task {task_id}'
|
||
error_type = 'ExecutionError'
|
||
|
||
# If still no error message, try to get from task backend directly
|
||
if not error_message:
|
||
try:
|
||
# Try to get from backend's stored result
|
||
backend = task.backend
|
||
if hasattr(backend, 'get'):
|
||
stored = backend.get(task_id)
|
||
if stored and isinstance(stored, dict):
|
||
if 'error' in stored:
|
||
error_message = stored['error']
|
||
elif isinstance(stored.get('result'), dict) and 'error' in stored['result']:
|
||
error_message = stored['result']['error']
|
||
except Exception as backend_err:
|
||
logger.warning(f"Error getting from backend: {backend_err}")
|
||
|
||
# Final fallback
|
||
if not error_message:
|
||
error_message = 'Task failed - check backend logs for details'
|
||
|
||
response_meta = {
|
||
'error': error_message,
|
||
'percentage': 0,
|
||
'message': f'Error: {error_message}',
|
||
}
|
||
|
||
# Include error_type if available
|
||
if error_type:
|
||
response_meta['error_type'] = error_type
|
||
|
||
# Include step logs if available (from task result or error_info)
|
||
result = task_result or {}
|
||
meta = result if isinstance(result, dict) else (task_info if isinstance(task_info, dict) else {})
|
||
if isinstance(meta, dict):
|
||
if 'request_steps' in meta:
|
||
response_meta['request_steps'] = meta['request_steps']
|
||
if 'response_steps' in meta:
|
||
response_meta['response_steps'] = meta['response_steps']
|
||
# Also include error_type if available in meta
|
||
if 'error_type' in meta and not error_type:
|
||
response_meta['error_type'] = meta['error_type']
|
||
# Also check for error in meta directly
|
||
if 'error' in meta and not error_message:
|
||
error_message = meta['error']
|
||
response_meta['error'] = error_message
|
||
if 'error_type' in meta and not error_type:
|
||
error_type = meta['error_type']
|
||
response_meta['error_type'] = error_type
|
||
|
||
return success_response(
|
||
data={
|
||
'state': task_state,
|
||
'meta': response_meta
|
||
},
|
||
request=request
|
||
)
|
||
else:
|
||
# PENDING, STARTED, or other states
|
||
return success_response(
|
||
data={
|
||
'state': task_state,
|
||
'meta': {
|
||
'percentage': 0,
|
||
'message': 'Task is starting...',
|
||
'phase': 'initializing',
|
||
}
|
||
},
|
||
request=request
|
||
)
|
||
except (KombuOperationalError, RedisConnectionError, ConnectionError) as conn_error:
|
||
# Backend connection error - task might not be registered yet or backend is down
|
||
logger.warning(f"Backend connection error for task {task_id}: {type(conn_error).__name__}: {str(conn_error)}")
|
||
return success_response(
|
||
data={
|
||
'state': 'PENDING',
|
||
'meta': {
|
||
'percentage': 0,
|
||
'message': 'Task is being queued...',
|
||
'phase': 'initializing',
|
||
'error': None # Don't show as error, just pending
|
||
}
|
||
},
|
||
request=request
|
||
)
|
||
except Exception as task_error:
|
||
logger.error(f"Error accessing Celery task {task_id}: {type(task_error).__name__}: {str(task_error)}", exc_info=True)
|
||
return success_response(
|
||
data={
|
||
'state': 'UNKNOWN',
|
||
'meta': {
|
||
'percentage': 0,
|
||
'message': f'Error accessing task: {str(task_error)}',
|
||
'error': str(task_error)
|
||
}
|
||
},
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
request=request
|
||
)
|
||
|
||
except Exception as e:
|
||
# Check if it's a connection-related error - treat as PENDING instead of error
|
||
error_type = type(e).__name__
|
||
error_str = str(e).lower()
|
||
is_connection_error = (
|
||
'connection' in error_str or
|
||
'connect' in error_str or
|
||
'timeout' in error_str or
|
||
'unavailable' in error_str or
|
||
'network' in error_str or
|
||
error_type in ('ConnectionError', 'TimeoutError', 'OperationalError')
|
||
)
|
||
|
||
if is_connection_error:
|
||
logger.warning(f"Connection error getting task progress for {task_id}: {error_type}: {str(e)}")
|
||
return success_response(
|
||
data={
|
||
'state': 'PENDING',
|
||
'meta': {
|
||
'percentage': 0,
|
||
'message': 'Task is being queued...',
|
||
'phase': 'initializing',
|
||
'error': None
|
||
}
|
||
},
|
||
request=request
|
||
)
|
||
else:
|
||
logger.error(f"Error getting task progress for {task_id}: {error_type}: {str(e)}", exc_info=True)
|
||
return success_response(
|
||
data={
|
||
'state': 'ERROR',
|
||
'meta': {
|
||
'error': f'Error getting task status: {str(e)}',
|
||
'percentage': 0,
|
||
'message': f'Error: {str(e)}'
|
||
}
|
||
},
|
||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||
request=request
|
||
)
|
||
|