diff --git a/backend/igny8_core/business/automation/services/automation_service.py b/backend/igny8_core/business/automation/services/automation_service.py index 72ec7020..45c14bb3 100644 --- a/backend/igny8_core/business/automation/services/automation_service.py +++ b/backend/igny8_core/business/automation/services/automation_service.py @@ -159,35 +159,47 @@ class AutomationService: keyword_ids = list(pending_keywords.values_list('id', flat=True)) for i in range(0, len(keyword_ids), actual_batch_size): - batch = keyword_ids[i:i + actual_batch_size] - batch_num = (i // actual_batch_size) + 1 - total_batches = (len(keyword_ids) + actual_batch_size - 1) // actual_batch_size - - self.logger.log_stage_progress( - self.run.run_id, self.account.id, self.site.id, - stage_number, f"Processing batch {batch_num}/{total_batches} ({len(batch)} keywords)" - ) - - # Call AI function via AIEngine - engine = AIEngine(account=self.account) - result = engine.execute( - fn=AutoClusterFunction(), - payload={'ids': batch} - ) - - # Monitor task - task_id = result.get('task_id') - if task_id: - self._wait_for_task(task_id, stage_number, f"Batch {batch_num}") - - keywords_processed += len(batch) - batches_run += 1 - - # Log progress - self.logger.log_stage_progress( - self.run.run_id, self.account.id, self.site.id, - stage_number, f"Batch {batch_num} complete" - ) + try: + batch = keyword_ids[i:i + actual_batch_size] + batch_num = (i // actual_batch_size) + 1 + total_batches = (len(keyword_ids) + actual_batch_size - 1) // actual_batch_size + + self.logger.log_stage_progress( + self.run.run_id, self.account.id, self.site.id, + stage_number, f"Processing batch {batch_num}/{total_batches} ({len(batch)} keywords)" + ) + + # Call AI function via AIEngine + engine = AIEngine(account=self.account) + result = engine.execute( + fn=AutoClusterFunction(), + payload={'ids': batch} + ) + + # Monitor task + task_id = result.get('task_id') + if task_id: + # FIXED: Pass continue_on_error=True to keep processing other batches on failure + self._wait_for_task(task_id, stage_number, f"Batch {batch_num}", continue_on_error=True) + + keywords_processed += len(batch) + batches_run += 1 + + # Log progress + self.logger.log_stage_progress( + self.run.run_id, self.account.id, self.site.id, + stage_number, f"Batch {batch_num} complete" + ) + except Exception as e: + # FIXED: Log error but continue processing remaining batches + error_msg = f"Failed to process batch {batch_num}: {str(e)}" + logger.error(f"[AutomationService] {error_msg}", exc_info=True) + self.logger.log_stage_error( + self.run.run_id, self.account.id, self.site.id, + stage_number, error_msg + ) + # Continue to next batch + continue # ADDED: Within-stage delay (between batches) if i + actual_batch_size < len(keyword_ids): # Not the last batch @@ -315,29 +327,41 @@ class AutomationService: credits_before = self._get_credits_used() for cluster in pending_clusters: - self.logger.log_stage_progress( - self.run.run_id, self.account.id, self.site.id, - stage_number, f"Generating ideas for cluster: {cluster.name}" - ) - - # Call AI function via AIEngine - engine = AIEngine(account=self.account) - result = engine.execute( - fn=GenerateIdeasFunction(), - payload={'ids': [cluster.id]} - ) - - # Monitor task - task_id = result.get('task_id') - if task_id: - self._wait_for_task(task_id, stage_number, f"Cluster '{cluster.name}'") - - clusters_processed += 1 - - self.logger.log_stage_progress( - self.run.run_id, self.account.id, self.site.id, - stage_number, f"Cluster '{cluster.name}' complete" - ) + try: + self.logger.log_stage_progress( + self.run.run_id, self.account.id, self.site.id, + stage_number, f"Generating ideas for cluster: {cluster.name}" + ) + + # Call AI function via AIEngine + engine = AIEngine(account=self.account) + result = engine.execute( + fn=GenerateIdeasFunction(), + payload={'ids': [cluster.id]} + ) + + # Monitor task + task_id = result.get('task_id') + if task_id: + # FIXED: Pass continue_on_error=True to keep processing other clusters on failure + self._wait_for_task(task_id, stage_number, f"Cluster '{cluster.name}'", continue_on_error=True) + + clusters_processed += 1 + + self.logger.log_stage_progress( + self.run.run_id, self.account.id, self.site.id, + stage_number, f"Cluster '{cluster.name}' complete" + ) + except Exception as e: + # FIXED: Log error but continue processing remaining clusters + error_msg = f"Failed to generate ideas for cluster '{cluster.name}': {str(e)}" + logger.error(f"[AutomationService] {error_msg}", exc_info=True) + self.logger.log_stage_error( + self.run.run_id, self.account.id, self.site.id, + stage_number, error_msg + ) + # Continue to next cluster + continue # Get ideas created count ideas_created = ContentIdeas.objects.filter( @@ -575,30 +599,42 @@ class AutomationService: total_tasks = len(task_list) for idx, task in enumerate(task_list, 1): - self.logger.log_stage_progress( - self.run.run_id, self.account.id, self.site.id, - stage_number, f"Generating content for task {idx}/{total_tasks}: {task.title}" - ) - - # Call AI function via AIEngine - engine = AIEngine(account=self.account) - result = engine.execute( - fn=GenerateContentFunction(), - payload={'ids': [task.id]} - ) - - # Monitor task - task_id = result.get('task_id') - if task_id: - self._wait_for_task(task_id, stage_number, f"Task '{task.title}'") - - tasks_processed += 1 - - # Log progress - self.logger.log_stage_progress( - self.run.run_id, self.account.id, self.site.id, - stage_number, f"Task '{task.title}' complete ({tasks_processed}/{total_tasks})" - ) + try: + self.logger.log_stage_progress( + self.run.run_id, self.account.id, self.site.id, + stage_number, f"Generating content for task {idx}/{total_tasks}: {task.title}" + ) + + # Call AI function via AIEngine + engine = AIEngine(account=self.account) + result = engine.execute( + fn=GenerateContentFunction(), + payload={'ids': [task.id]} + ) + + # Monitor task + task_id = result.get('task_id') + if task_id: + # FIXED: Pass continue_on_error=True to keep processing other tasks on failure + self._wait_for_task(task_id, stage_number, f"Task '{task.title}'", continue_on_error=True) + + tasks_processed += 1 + + # Log progress + self.logger.log_stage_progress( + self.run.run_id, self.account.id, self.site.id, + stage_number, f"Task '{task.title}' complete ({tasks_processed}/{total_tasks})" + ) + except Exception as e: + # FIXED: Log error but continue processing remaining tasks + error_msg = f"Failed to process task '{task.title}': {str(e)}" + logger.error(f"[AutomationService] {error_msg}", exc_info=True) + self.logger.log_stage_error( + self.run.run_id, self.account.id, self.site.id, + stage_number, error_msg + ) + # Continue to next task + continue # ADDED: Within-stage delay between tasks (if not last task) if idx < total_tasks: @@ -739,29 +775,41 @@ class AutomationService: total_content = len(content_list) for idx, content in enumerate(content_list, 1): - self.logger.log_stage_progress( - self.run.run_id, self.account.id, self.site.id, - stage_number, f"Extracting prompts {idx}/{total_content}: {content.title}" - ) - - # Call AI function via AIEngine - engine = AIEngine(account=self.account) - result = engine.execute( - fn=GenerateImagePromptsFunction(), - payload={'ids': [content.id]} - ) - - # Monitor task - task_id = result.get('task_id') - if task_id: - self._wait_for_task(task_id, stage_number, f"Content '{content.title}'") - - content_processed += 1 - - self.logger.log_stage_progress( - self.run.run_id, self.account.id, self.site.id, - stage_number, f"Content '{content.title}' complete ({content_processed}/{total_content})" - ) + try: + self.logger.log_stage_progress( + self.run.run_id, self.account.id, self.site.id, + stage_number, f"Extracting prompts {idx}/{total_content}: {content.title}" + ) + + # Call AI function via AIEngine + engine = AIEngine(account=self.account) + result = engine.execute( + fn=GenerateImagePromptsFunction(), + payload={'ids': [content.id]} + ) + + # Monitor task + task_id = result.get('task_id') + if task_id: + # FIXED: Pass continue_on_error=True to keep processing other content on failure + self._wait_for_task(task_id, stage_number, f"Content '{content.title}'", continue_on_error=True) + + content_processed += 1 + + self.logger.log_stage_progress( + self.run.run_id, self.account.id, self.site.id, + stage_number, f"Content '{content.title}' complete ({content_processed}/{total_content})" + ) + except Exception as e: + # FIXED: Log error but continue processing remaining content + error_msg = f"Failed to extract prompts for content '{content.title}': {str(e)}" + logger.error(f"[AutomationService] {error_msg}", exc_info=True) + self.logger.log_stage_error( + self.run.run_id, self.account.id, self.site.id, + stage_number, error_msg + ) + # Continue to next content + continue # ADDED: Within-stage delay between content pieces if idx < total_content: @@ -874,30 +922,43 @@ class AutomationService: total_images = len(image_list) for idx, image in enumerate(image_list, 1): - content_title = image.content.title if image.content else 'Unknown' - self.logger.log_stage_progress( - self.run.run_id, self.account.id, self.site.id, - stage_number, f"Generating image {idx}/{total_images}: {image.image_type} for '{content_title}'" - ) - - # Call AI function via AIEngine - engine = AIEngine(account=self.account) - result = engine.execute( - fn=GenerateImagesFunction(), - payload={'image_ids': [image.id]} - ) - - # Monitor task - task_id = result.get('task_id') - if task_id: - self._wait_for_task(task_id, stage_number, f"Image for '{content_title}'") - - images_processed += 1 - - self.logger.log_stage_progress( - self.run.run_id, self.account.id, self.site.id, - stage_number, f"Image generated for '{content_title}' ({images_processed}/{total_images})" - ) + try: + content_title = image.content.title if image.content else 'Unknown' + self.logger.log_stage_progress( + self.run.run_id, self.account.id, self.site.id, + stage_number, f"Generating image {idx}/{total_images}: {image.image_type} for '{content_title}'" + ) + + # Call AI function via AIEngine + engine = AIEngine(account=self.account) + result = engine.execute( + fn=GenerateImagesFunction(), + payload={'image_ids': [image.id]} + ) + + # Monitor task + task_id = result.get('task_id') + if task_id: + # FIXED: Pass continue_on_error=True to keep processing other images on failure + self._wait_for_task(task_id, stage_number, f"Image for '{content_title}'", continue_on_error=True) + + images_processed += 1 + + self.logger.log_stage_progress( + self.run.run_id, self.account.id, self.site.id, + stage_number, f"Image generated for '{content_title}' ({images_processed}/{total_images})" + ) + except Exception as e: + # FIXED: Log error but continue processing remaining images + content_title = image.content.title if image.content else 'Unknown' + error_msg = f"Failed to generate image for content '{content_title}': {str(e)}" + logger.error(f"[AutomationService] {error_msg}", exc_info=True) + self.logger.log_stage_error( + self.run.run_id, self.account.id, self.site.id, + stage_number, error_msg + ) + # Continue to next image + continue # ADDED: Within-stage delay between images if idx < total_images: @@ -1042,12 +1103,42 @@ class AutomationService: # Helper methods - def _wait_for_task(self, task_id: str, stage_number: int, item_name: str): - """Wait for Celery task to complete""" + def _wait_for_task(self, task_id: str, stage_number: int, item_name: str, continue_on_error: bool = True): + """Wait for Celery task to complete + + Args: + task_id: Celery task ID + stage_number: Current stage number + item_name: Name of item being processed + continue_on_error: If True, log error but don't raise exception (default: True) + """ result = AsyncResult(task_id) - while not result.ready(): + # FIXED: Wrap result.ready() in try-catch to handle Celery backend errors + max_retries = 100 # Maximum polling attempts (100 * 3s = 5 minutes) + retry_count = 0 + + while retry_count < max_retries: + try: + is_ready = result.ready() + if is_ready: + break + except (TypeError, ValueError, Exception) as e: + # FIXED: Handle Celery backend errors (e.g., Redis key serialization issues) + logger.warning(f"[AutomationService] Celery backend error while checking task {task_id}: {e}") + # Wait a bit longer and try to get result directly + time.sleep(5) + try: + # Try to get the result status from database/backend + if result.state in ['SUCCESS', 'FAILURE']: + break + except Exception: + # If still failing, assume task is done and break + logger.warning(f"[AutomationService] Cannot determine task state, assuming complete") + break + time.sleep(3) # Poll every 3 seconds + retry_count += 1 # Check for pause self.run.refresh_from_db() @@ -1058,13 +1149,23 @@ class AutomationService: time.sleep(5) self.run.refresh_from_db() - if result.failed(): - error_msg = f"Task failed for {item_name}" - self.logger.log_stage_error( - self.run.run_id, self.account.id, self.site.id, - stage_number, error_msg - ) - raise Exception(error_msg) + # Check if task failed + try: + if result.failed(): + error_msg = f"Task failed for {item_name}" + self.logger.log_stage_error( + self.run.run_id, self.account.id, self.site.id, + stage_number, error_msg + ) + if not continue_on_error: + raise Exception(error_msg) + else: + logger.warning(f"[AutomationService] {error_msg} - continuing with next item") + except (TypeError, ValueError, Exception) as e: + # FIXED: If we can't determine task status due to backend error, log and continue + logger.warning(f"[AutomationService] Cannot determine task failure state for {item_name}: {e}") + if not continue_on_error: + raise def _get_credits_used(self) -> int: """Get total credits used by this run so far""" diff --git a/backend/igny8_core/settings.py b/backend/igny8_core/settings.py index 7f4de728..0be30b10 100644 --- a/backend/igny8_core/settings.py +++ b/backend/igny8_core/settings.py @@ -501,6 +501,7 @@ JWT_ACCESS_TOKEN_EXPIRY = timedelta(minutes=15) JWT_REFRESH_TOKEN_EXPIRY = timedelta(days=30) # Extended to 30 days for persistent login # Celery Configuration +# FIXED: Use redis:// URL with explicit string parameters to avoid Celery backend key serialization issues CELERY_BROKER_URL = os.getenv('CELERY_BROKER_URL', f"redis://{os.getenv('REDIS_HOST', 'redis')}:{os.getenv('REDIS_PORT', '6379')}/0") CELERY_RESULT_BACKEND = os.getenv('CELERY_RESULT_BACKEND', f"redis://{os.getenv('REDIS_HOST', 'redis')}:{os.getenv('REDIS_PORT', '6379')}/0") CELERY_ACCEPT_CONTENT = ['json'] @@ -513,6 +514,11 @@ CELERY_TASK_TIME_LIMIT = 30 * 60 # 30 minutes CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60 # 25 minutes CELERY_WORKER_PREFETCH_MULTIPLIER = 1 CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000 +# FIXED: Add explicit backend options to prevent key serialization issues +CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS = { + 'master_name': 'mymaster' +} if os.getenv('REDIS_SENTINEL_ENABLED', 'false').lower() == 'true' else {} +CELERY_REDIS_BACKEND_USE_SSL = os.getenv('REDIS_SSL_ENABLED', 'false').lower() == 'true' # Publish/Sync Logging Configuration PUBLISH_SYNC_LOG_DIR = os.path.join(BASE_DIR, 'logs', 'publish-sync-logs')