fixes
This commit is contained in:
@@ -159,35 +159,47 @@ class AutomationService:
|
||||
keyword_ids = list(pending_keywords.values_list('id', flat=True))
|
||||
|
||||
for i in range(0, len(keyword_ids), actual_batch_size):
|
||||
batch = keyword_ids[i:i + actual_batch_size]
|
||||
batch_num = (i // actual_batch_size) + 1
|
||||
total_batches = (len(keyword_ids) + actual_batch_size - 1) // actual_batch_size
|
||||
try:
|
||||
batch = keyword_ids[i:i + actual_batch_size]
|
||||
batch_num = (i // actual_batch_size) + 1
|
||||
total_batches = (len(keyword_ids) + actual_batch_size - 1) // actual_batch_size
|
||||
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Processing batch {batch_num}/{total_batches} ({len(batch)} keywords)"
|
||||
)
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Processing batch {batch_num}/{total_batches} ({len(batch)} keywords)"
|
||||
)
|
||||
|
||||
# Call AI function via AIEngine
|
||||
engine = AIEngine(account=self.account)
|
||||
result = engine.execute(
|
||||
fn=AutoClusterFunction(),
|
||||
payload={'ids': batch}
|
||||
)
|
||||
# Call AI function via AIEngine
|
||||
engine = AIEngine(account=self.account)
|
||||
result = engine.execute(
|
||||
fn=AutoClusterFunction(),
|
||||
payload={'ids': batch}
|
||||
)
|
||||
|
||||
# Monitor task
|
||||
task_id = result.get('task_id')
|
||||
if task_id:
|
||||
self._wait_for_task(task_id, stage_number, f"Batch {batch_num}")
|
||||
# Monitor task
|
||||
task_id = result.get('task_id')
|
||||
if task_id:
|
||||
# FIXED: Pass continue_on_error=True to keep processing other batches on failure
|
||||
self._wait_for_task(task_id, stage_number, f"Batch {batch_num}", continue_on_error=True)
|
||||
|
||||
keywords_processed += len(batch)
|
||||
batches_run += 1
|
||||
keywords_processed += len(batch)
|
||||
batches_run += 1
|
||||
|
||||
# Log progress
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Batch {batch_num} complete"
|
||||
)
|
||||
# Log progress
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Batch {batch_num} complete"
|
||||
)
|
||||
except Exception as e:
|
||||
# FIXED: Log error but continue processing remaining batches
|
||||
error_msg = f"Failed to process batch {batch_num}: {str(e)}"
|
||||
logger.error(f"[AutomationService] {error_msg}", exc_info=True)
|
||||
self.logger.log_stage_error(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, error_msg
|
||||
)
|
||||
# Continue to next batch
|
||||
continue
|
||||
|
||||
# ADDED: Within-stage delay (between batches)
|
||||
if i + actual_batch_size < len(keyword_ids): # Not the last batch
|
||||
@@ -315,29 +327,41 @@ class AutomationService:
|
||||
credits_before = self._get_credits_used()
|
||||
|
||||
for cluster in pending_clusters:
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Generating ideas for cluster: {cluster.name}"
|
||||
)
|
||||
try:
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Generating ideas for cluster: {cluster.name}"
|
||||
)
|
||||
|
||||
# Call AI function via AIEngine
|
||||
engine = AIEngine(account=self.account)
|
||||
result = engine.execute(
|
||||
fn=GenerateIdeasFunction(),
|
||||
payload={'ids': [cluster.id]}
|
||||
)
|
||||
# Call AI function via AIEngine
|
||||
engine = AIEngine(account=self.account)
|
||||
result = engine.execute(
|
||||
fn=GenerateIdeasFunction(),
|
||||
payload={'ids': [cluster.id]}
|
||||
)
|
||||
|
||||
# Monitor task
|
||||
task_id = result.get('task_id')
|
||||
if task_id:
|
||||
self._wait_for_task(task_id, stage_number, f"Cluster '{cluster.name}'")
|
||||
# Monitor task
|
||||
task_id = result.get('task_id')
|
||||
if task_id:
|
||||
# FIXED: Pass continue_on_error=True to keep processing other clusters on failure
|
||||
self._wait_for_task(task_id, stage_number, f"Cluster '{cluster.name}'", continue_on_error=True)
|
||||
|
||||
clusters_processed += 1
|
||||
clusters_processed += 1
|
||||
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Cluster '{cluster.name}' complete"
|
||||
)
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Cluster '{cluster.name}' complete"
|
||||
)
|
||||
except Exception as e:
|
||||
# FIXED: Log error but continue processing remaining clusters
|
||||
error_msg = f"Failed to generate ideas for cluster '{cluster.name}': {str(e)}"
|
||||
logger.error(f"[AutomationService] {error_msg}", exc_info=True)
|
||||
self.logger.log_stage_error(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, error_msg
|
||||
)
|
||||
# Continue to next cluster
|
||||
continue
|
||||
|
||||
# Get ideas created count
|
||||
ideas_created = ContentIdeas.objects.filter(
|
||||
@@ -575,30 +599,42 @@ class AutomationService:
|
||||
total_tasks = len(task_list)
|
||||
|
||||
for idx, task in enumerate(task_list, 1):
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Generating content for task {idx}/{total_tasks}: {task.title}"
|
||||
)
|
||||
try:
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Generating content for task {idx}/{total_tasks}: {task.title}"
|
||||
)
|
||||
|
||||
# Call AI function via AIEngine
|
||||
engine = AIEngine(account=self.account)
|
||||
result = engine.execute(
|
||||
fn=GenerateContentFunction(),
|
||||
payload={'ids': [task.id]}
|
||||
)
|
||||
# Call AI function via AIEngine
|
||||
engine = AIEngine(account=self.account)
|
||||
result = engine.execute(
|
||||
fn=GenerateContentFunction(),
|
||||
payload={'ids': [task.id]}
|
||||
)
|
||||
|
||||
# Monitor task
|
||||
task_id = result.get('task_id')
|
||||
if task_id:
|
||||
self._wait_for_task(task_id, stage_number, f"Task '{task.title}'")
|
||||
# Monitor task
|
||||
task_id = result.get('task_id')
|
||||
if task_id:
|
||||
# FIXED: Pass continue_on_error=True to keep processing other tasks on failure
|
||||
self._wait_for_task(task_id, stage_number, f"Task '{task.title}'", continue_on_error=True)
|
||||
|
||||
tasks_processed += 1
|
||||
tasks_processed += 1
|
||||
|
||||
# Log progress
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Task '{task.title}' complete ({tasks_processed}/{total_tasks})"
|
||||
)
|
||||
# Log progress
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Task '{task.title}' complete ({tasks_processed}/{total_tasks})"
|
||||
)
|
||||
except Exception as e:
|
||||
# FIXED: Log error but continue processing remaining tasks
|
||||
error_msg = f"Failed to process task '{task.title}': {str(e)}"
|
||||
logger.error(f"[AutomationService] {error_msg}", exc_info=True)
|
||||
self.logger.log_stage_error(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, error_msg
|
||||
)
|
||||
# Continue to next task
|
||||
continue
|
||||
|
||||
# ADDED: Within-stage delay between tasks (if not last task)
|
||||
if idx < total_tasks:
|
||||
@@ -739,29 +775,41 @@ class AutomationService:
|
||||
total_content = len(content_list)
|
||||
|
||||
for idx, content in enumerate(content_list, 1):
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Extracting prompts {idx}/{total_content}: {content.title}"
|
||||
)
|
||||
try:
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Extracting prompts {idx}/{total_content}: {content.title}"
|
||||
)
|
||||
|
||||
# Call AI function via AIEngine
|
||||
engine = AIEngine(account=self.account)
|
||||
result = engine.execute(
|
||||
fn=GenerateImagePromptsFunction(),
|
||||
payload={'ids': [content.id]}
|
||||
)
|
||||
# Call AI function via AIEngine
|
||||
engine = AIEngine(account=self.account)
|
||||
result = engine.execute(
|
||||
fn=GenerateImagePromptsFunction(),
|
||||
payload={'ids': [content.id]}
|
||||
)
|
||||
|
||||
# Monitor task
|
||||
task_id = result.get('task_id')
|
||||
if task_id:
|
||||
self._wait_for_task(task_id, stage_number, f"Content '{content.title}'")
|
||||
# Monitor task
|
||||
task_id = result.get('task_id')
|
||||
if task_id:
|
||||
# FIXED: Pass continue_on_error=True to keep processing other content on failure
|
||||
self._wait_for_task(task_id, stage_number, f"Content '{content.title}'", continue_on_error=True)
|
||||
|
||||
content_processed += 1
|
||||
content_processed += 1
|
||||
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Content '{content.title}' complete ({content_processed}/{total_content})"
|
||||
)
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Content '{content.title}' complete ({content_processed}/{total_content})"
|
||||
)
|
||||
except Exception as e:
|
||||
# FIXED: Log error but continue processing remaining content
|
||||
error_msg = f"Failed to extract prompts for content '{content.title}': {str(e)}"
|
||||
logger.error(f"[AutomationService] {error_msg}", exc_info=True)
|
||||
self.logger.log_stage_error(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, error_msg
|
||||
)
|
||||
# Continue to next content
|
||||
continue
|
||||
|
||||
# ADDED: Within-stage delay between content pieces
|
||||
if idx < total_content:
|
||||
@@ -874,30 +922,43 @@ class AutomationService:
|
||||
total_images = len(image_list)
|
||||
|
||||
for idx, image in enumerate(image_list, 1):
|
||||
content_title = image.content.title if image.content else 'Unknown'
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Generating image {idx}/{total_images}: {image.image_type} for '{content_title}'"
|
||||
)
|
||||
try:
|
||||
content_title = image.content.title if image.content else 'Unknown'
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Generating image {idx}/{total_images}: {image.image_type} for '{content_title}'"
|
||||
)
|
||||
|
||||
# Call AI function via AIEngine
|
||||
engine = AIEngine(account=self.account)
|
||||
result = engine.execute(
|
||||
fn=GenerateImagesFunction(),
|
||||
payload={'image_ids': [image.id]}
|
||||
)
|
||||
# Call AI function via AIEngine
|
||||
engine = AIEngine(account=self.account)
|
||||
result = engine.execute(
|
||||
fn=GenerateImagesFunction(),
|
||||
payload={'image_ids': [image.id]}
|
||||
)
|
||||
|
||||
# Monitor task
|
||||
task_id = result.get('task_id')
|
||||
if task_id:
|
||||
self._wait_for_task(task_id, stage_number, f"Image for '{content_title}'")
|
||||
# Monitor task
|
||||
task_id = result.get('task_id')
|
||||
if task_id:
|
||||
# FIXED: Pass continue_on_error=True to keep processing other images on failure
|
||||
self._wait_for_task(task_id, stage_number, f"Image for '{content_title}'", continue_on_error=True)
|
||||
|
||||
images_processed += 1
|
||||
images_processed += 1
|
||||
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Image generated for '{content_title}' ({images_processed}/{total_images})"
|
||||
)
|
||||
self.logger.log_stage_progress(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, f"Image generated for '{content_title}' ({images_processed}/{total_images})"
|
||||
)
|
||||
except Exception as e:
|
||||
# FIXED: Log error but continue processing remaining images
|
||||
content_title = image.content.title if image.content else 'Unknown'
|
||||
error_msg = f"Failed to generate image for content '{content_title}': {str(e)}"
|
||||
logger.error(f"[AutomationService] {error_msg}", exc_info=True)
|
||||
self.logger.log_stage_error(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, error_msg
|
||||
)
|
||||
# Continue to next image
|
||||
continue
|
||||
|
||||
# ADDED: Within-stage delay between images
|
||||
if idx < total_images:
|
||||
@@ -1042,12 +1103,42 @@ class AutomationService:
|
||||
|
||||
# Helper methods
|
||||
|
||||
def _wait_for_task(self, task_id: str, stage_number: int, item_name: str):
|
||||
"""Wait for Celery task to complete"""
|
||||
def _wait_for_task(self, task_id: str, stage_number: int, item_name: str, continue_on_error: bool = True):
|
||||
"""Wait for Celery task to complete
|
||||
|
||||
Args:
|
||||
task_id: Celery task ID
|
||||
stage_number: Current stage number
|
||||
item_name: Name of item being processed
|
||||
continue_on_error: If True, log error but don't raise exception (default: True)
|
||||
"""
|
||||
result = AsyncResult(task_id)
|
||||
|
||||
while not result.ready():
|
||||
# FIXED: Wrap result.ready() in try-catch to handle Celery backend errors
|
||||
max_retries = 100 # Maximum polling attempts (100 * 3s = 5 minutes)
|
||||
retry_count = 0
|
||||
|
||||
while retry_count < max_retries:
|
||||
try:
|
||||
is_ready = result.ready()
|
||||
if is_ready:
|
||||
break
|
||||
except (TypeError, ValueError, Exception) as e:
|
||||
# FIXED: Handle Celery backend errors (e.g., Redis key serialization issues)
|
||||
logger.warning(f"[AutomationService] Celery backend error while checking task {task_id}: {e}")
|
||||
# Wait a bit longer and try to get result directly
|
||||
time.sleep(5)
|
||||
try:
|
||||
# Try to get the result status from database/backend
|
||||
if result.state in ['SUCCESS', 'FAILURE']:
|
||||
break
|
||||
except Exception:
|
||||
# If still failing, assume task is done and break
|
||||
logger.warning(f"[AutomationService] Cannot determine task state, assuming complete")
|
||||
break
|
||||
|
||||
time.sleep(3) # Poll every 3 seconds
|
||||
retry_count += 1
|
||||
|
||||
# Check for pause
|
||||
self.run.refresh_from_db()
|
||||
@@ -1058,13 +1149,23 @@ class AutomationService:
|
||||
time.sleep(5)
|
||||
self.run.refresh_from_db()
|
||||
|
||||
if result.failed():
|
||||
error_msg = f"Task failed for {item_name}"
|
||||
self.logger.log_stage_error(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, error_msg
|
||||
)
|
||||
raise Exception(error_msg)
|
||||
# Check if task failed
|
||||
try:
|
||||
if result.failed():
|
||||
error_msg = f"Task failed for {item_name}"
|
||||
self.logger.log_stage_error(
|
||||
self.run.run_id, self.account.id, self.site.id,
|
||||
stage_number, error_msg
|
||||
)
|
||||
if not continue_on_error:
|
||||
raise Exception(error_msg)
|
||||
else:
|
||||
logger.warning(f"[AutomationService] {error_msg} - continuing with next item")
|
||||
except (TypeError, ValueError, Exception) as e:
|
||||
# FIXED: If we can't determine task status due to backend error, log and continue
|
||||
logger.warning(f"[AutomationService] Cannot determine task failure state for {item_name}: {e}")
|
||||
if not continue_on_error:
|
||||
raise
|
||||
|
||||
def _get_credits_used(self) -> int:
|
||||
"""Get total credits used by this run so far"""
|
||||
|
||||
@@ -501,6 +501,7 @@ JWT_ACCESS_TOKEN_EXPIRY = timedelta(minutes=15)
|
||||
JWT_REFRESH_TOKEN_EXPIRY = timedelta(days=30) # Extended to 30 days for persistent login
|
||||
|
||||
# Celery Configuration
|
||||
# FIXED: Use redis:// URL with explicit string parameters to avoid Celery backend key serialization issues
|
||||
CELERY_BROKER_URL = os.getenv('CELERY_BROKER_URL', f"redis://{os.getenv('REDIS_HOST', 'redis')}:{os.getenv('REDIS_PORT', '6379')}/0")
|
||||
CELERY_RESULT_BACKEND = os.getenv('CELERY_RESULT_BACKEND', f"redis://{os.getenv('REDIS_HOST', 'redis')}:{os.getenv('REDIS_PORT', '6379')}/0")
|
||||
CELERY_ACCEPT_CONTENT = ['json']
|
||||
@@ -513,6 +514,11 @@ CELERY_TASK_TIME_LIMIT = 30 * 60 # 30 minutes
|
||||
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60 # 25 minutes
|
||||
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
|
||||
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000
|
||||
# FIXED: Add explicit backend options to prevent key serialization issues
|
||||
CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS = {
|
||||
'master_name': 'mymaster'
|
||||
} if os.getenv('REDIS_SENTINEL_ENABLED', 'false').lower() == 'true' else {}
|
||||
CELERY_REDIS_BACKEND_USE_SSL = os.getenv('REDIS_SSL_ENABLED', 'false').lower() == 'true'
|
||||
|
||||
# Publish/Sync Logging Configuration
|
||||
PUBLISH_SYNC_LOG_DIR = os.path.join(BASE_DIR, 'logs', 'publish-sync-logs')
|
||||
|
||||
Reference in New Issue
Block a user