""" System module views - for global settings and prompts """ import psutil import os import logging from rest_framework import viewsets, status as http_status, filters from rest_framework.decorators import action, api_view, permission_classes from rest_framework.response import Response from rest_framework.permissions import AllowAny from django.db import transaction, connection from django.core.cache import cache from django.utils import timezone from django_filters.rest_framework import DjangoFilterBackend from igny8_core.api.base import AccountModelViewSet from .models import AIPrompt, AuthorProfile, Strategy from .serializers import AIPromptSerializer, AuthorProfileSerializer, StrategySerializer logger = logging.getLogger(__name__) class AIPromptViewSet(AccountModelViewSet): """ ViewSet for managing AI prompts """ queryset = AIPrompt.objects.all() serializer_class = AIPromptSerializer permission_classes = [] # Allow any for now def get_queryset(self): """Get prompts for the current account""" return super().get_queryset().order_by('prompt_type') @action(detail=False, methods=['get'], url_path='by_type/(?P[^/.]+)', url_name='by_type') def get_by_type(self, request, prompt_type=None): """Get prompt by type""" try: prompt = self.get_queryset().get(prompt_type=prompt_type) serializer = self.get_serializer(prompt) return Response(serializer.data) except AIPrompt.DoesNotExist: # Return default if not found from .utils import get_default_prompt default_value = get_default_prompt(prompt_type) return Response({ 'prompt_type': prompt_type, 'prompt_value': default_value, 'default_prompt': default_value, 'is_active': True, }) @action(detail=False, methods=['post'], url_path='save', url_name='save') def save_prompt(self, request): """Save or update a prompt""" prompt_type = request.data.get('prompt_type') prompt_value = request.data.get('prompt_value') if not prompt_type: return Response({'error': 'prompt_type is required'}, status=http_status.HTTP_400_BAD_REQUEST) if prompt_value is None: return Response({'error': 'prompt_value is required'}, status=http_status.HTTP_400_BAD_REQUEST) # Get account - try multiple methods account = getattr(request, 'account', None) # Fallback 1: Get from authenticated user's account if not account: user = getattr(request, 'user', None) if user and hasattr(user, 'is_authenticated') and user.is_authenticated: account = getattr(user, 'account', None) # Fallback 2: If still no account, get default account (for development) if not account: from igny8_core.auth.models import Account try: account = Account.objects.first() except Exception: pass if not account: return Response({'error': 'Account not found. Please ensure you are logged in.'}, status=http_status.HTTP_400_BAD_REQUEST) # Get default prompt value if creating new from .utils import get_default_prompt default_value = get_default_prompt(prompt_type) # Get or create prompt prompt, created = AIPrompt.objects.get_or_create( prompt_type=prompt_type, account=account, defaults={ 'prompt_value': prompt_value, 'default_prompt': default_value, 'is_active': True, } ) if not created: prompt.prompt_value = prompt_value prompt.save() serializer = self.get_serializer(prompt) return Response({ 'success': True, 'data': serializer.data, 'message': f'{prompt.get_prompt_type_display()} saved successfully' }) @action(detail=False, methods=['post'], url_path='reset', url_name='reset') def reset_prompt(self, request): """Reset prompt to default""" prompt_type = request.data.get('prompt_type') if not prompt_type: return Response({'error': 'prompt_type is required'}, status=http_status.HTTP_400_BAD_REQUEST) # Get account - try multiple methods (same as integration_views) account = getattr(request, 'account', None) # Fallback 1: Get from authenticated user's account if not account: user = getattr(request, 'user', None) if user and hasattr(user, 'is_authenticated') and user.is_authenticated: account = getattr(user, 'account', None) # Fallback 2: If still no account, get default account (for development) if not account: from igny8_core.auth.models import Account try: account = Account.objects.first() except Exception: pass if not account: return Response({'error': 'Account not found. Please ensure you are logged in.'}, status=http_status.HTTP_400_BAD_REQUEST) # Get default prompt from .utils import get_default_prompt default_value = get_default_prompt(prompt_type) # Update or create prompt prompt, created = AIPrompt.objects.get_or_create( prompt_type=prompt_type, account=account, defaults={ 'prompt_value': default_value, 'default_prompt': default_value, 'is_active': True, } ) if not created: prompt.prompt_value = default_value prompt.save() serializer = self.get_serializer(prompt) return Response({ 'success': True, 'data': serializer.data, 'message': f'{prompt.get_prompt_type_display()} reset to default' }) class AuthorProfileViewSet(AccountModelViewSet): """ ViewSet for managing Author Profiles """ queryset = AuthorProfile.objects.all() serializer_class = AuthorProfileSerializer filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter] search_fields = ['name', 'description', 'tone'] ordering_fields = ['name', 'created_at', 'updated_at'] ordering = ['name'] filterset_fields = ['is_active', 'language'] class StrategyViewSet(AccountModelViewSet): """ ViewSet for managing Strategies """ queryset = Strategy.objects.all() serializer_class = StrategySerializer filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter] search_fields = ['name', 'description'] ordering_fields = ['name', 'created_at', 'updated_at'] ordering = ['name'] filterset_fields = ['is_active', 'sector'] @api_view(['GET']) @permission_classes([AllowAny]) # Adjust permissions as needed def system_status(request): """ Comprehensive system status endpoint for monitoring Returns CPU, memory, disk, database, Redis, Celery, and process information """ status_data = { 'timestamp': timezone.now().isoformat(), 'system': {}, 'database': {}, 'redis': {}, 'celery': {}, 'processes': {}, 'modules': {}, } try: # System Resources cpu_percent = psutil.cpu_percent(interval=1) cpu_count = psutil.cpu_count() memory = psutil.virtual_memory() disk = psutil.disk_usage('/') status_data['system'] = { 'cpu': { 'usage_percent': cpu_percent, 'cores': cpu_count, 'status': 'healthy' if cpu_percent < 80 else 'warning' if cpu_percent < 95 else 'critical' }, 'memory': { 'total_gb': round(memory.total / (1024**3), 2), 'used_gb': round(memory.used / (1024**3), 2), 'available_gb': round(memory.available / (1024**3), 2), 'usage_percent': memory.percent, 'status': 'healthy' if memory.percent < 80 else 'warning' if memory.percent < 95 else 'critical' }, 'disk': { 'total_gb': round(disk.total / (1024**3), 2), 'used_gb': round(disk.used / (1024**3), 2), 'free_gb': round(disk.free / (1024**3), 2), 'usage_percent': disk.percent, 'status': 'healthy' if disk.percent < 80 else 'warning' if disk.percent < 95 else 'critical' } } except Exception as e: logger.error(f"Error getting system resources: {str(e)}") status_data['system'] = {'error': str(e)} try: # Database Status with connection.cursor() as cursor: cursor.execute("SELECT 1") db_conn = True cursor.execute("SELECT version()") db_version = cursor.fetchone()[0] if cursor.rowcount > 0 else 'Unknown' # Get database size (PostgreSQL) try: cursor.execute(""" SELECT pg_size_pretty(pg_database_size(current_database())) """) db_size = cursor.fetchone()[0] if cursor.rowcount > 0 else 'Unknown' except: db_size = 'Unknown' # Count active connections try: cursor.execute("SELECT count(*) FROM pg_stat_activity WHERE state = 'active'") active_connections = cursor.fetchone()[0] if cursor.rowcount > 0 else 0 except: active_connections = 0 status_data['database'] = { 'connected': db_conn, 'version': db_version, 'size': db_size, 'active_connections': active_connections, 'status': 'healthy' if db_conn else 'critical' } except Exception as e: logger.error(f"Error getting database status: {str(e)}") status_data['database'] = {'connected': False, 'error': str(e), 'status': 'critical'} try: # Redis Status redis_conn = False redis_info = {} try: cache.set('status_check', 'ok', 10) test_value = cache.get('status_check') redis_conn = test_value == 'ok' # Try to get Redis info if available if hasattr(cache, 'client'): try: redis_client = cache.client.get_client() redis_info = redis_client.info() except: pass except Exception as e: redis_conn = False redis_info = {'error': str(e)} status_data['redis'] = { 'connected': redis_conn, 'status': 'healthy' if redis_conn else 'critical', 'info': redis_info if redis_info else {} } except Exception as e: logger.error(f"Error getting Redis status: {str(e)}") status_data['redis'] = {'connected': False, 'error': str(e), 'status': 'critical'} try: # Celery Status celery_workers = [] celery_tasks = { 'active': 0, 'scheduled': 0, 'reserved': 0, } try: from celery import current_app inspect = current_app.control.inspect() # Get active workers active_workers = inspect.active() or {} scheduled = inspect.scheduled() or {} reserved = inspect.reserved() or {} celery_workers = list(active_workers.keys()) celery_tasks['active'] = sum(len(tasks) for tasks in active_workers.values()) celery_tasks['scheduled'] = sum(len(tasks) for tasks in scheduled.values()) celery_tasks['reserved'] = sum(len(tasks) for tasks in reserved.values()) except Exception as e: logger.warning(f"Error getting Celery status: {str(e)}") celery_workers = [] celery_tasks = {'error': str(e)} status_data['celery'] = { 'workers': celery_workers, 'worker_count': len(celery_workers), 'tasks': celery_tasks, 'status': 'healthy' if len(celery_workers) > 0 else 'warning' } except Exception as e: logger.error(f"Error getting Celery status: {str(e)}") status_data['celery'] = {'error': str(e), 'status': 'warning'} try: # Process Monitoring by Stack/Component processes = { 'gunicorn': [], 'celery': [], 'postgres': [], 'redis': [], 'nginx': [], 'other': [] } process_stats = { 'gunicorn': {'count': 0, 'cpu': 0, 'memory_mb': 0}, 'celery': {'count': 0, 'cpu': 0, 'memory_mb': 0}, 'postgres': {'count': 0, 'cpu': 0, 'memory_mb': 0}, 'redis': {'count': 0, 'cpu': 0, 'memory_mb': 0}, 'nginx': {'count': 0, 'cpu': 0, 'memory_mb': 0}, } for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'cpu_percent', 'memory_info']): try: proc_info = proc.info name = proc_info['name'].lower() cmdline = ' '.join(proc_info['cmdline']) if proc_info['cmdline'] else '' cmdline_lower = cmdline.lower() cpu = proc_info.get('cpu_percent', 0) or 0 memory = proc_info.get('memory_info', None) memory_mb = (memory.rss / (1024**2)) if memory else 0 # Categorize processes if 'gunicorn' in cmdline_lower or 'gunicorn' in name: processes['gunicorn'].append({ 'pid': proc_info['pid'], 'name': name, 'cpu_percent': round(cpu, 2), 'memory_mb': round(memory_mb, 2) }) process_stats['gunicorn']['count'] += 1 process_stats['gunicorn']['cpu'] += cpu process_stats['gunicorn']['memory_mb'] += memory_mb elif 'celery' in cmdline_lower or 'celery' in name: processes['celery'].append({ 'pid': proc_info['pid'], 'name': name, 'cpu_percent': round(cpu, 2), 'memory_mb': round(memory_mb, 2) }) process_stats['celery']['count'] += 1 process_stats['celery']['cpu'] += cpu process_stats['celery']['memory_mb'] += memory_mb elif 'postgres' in name or 'postgresql' in name: processes['postgres'].append({ 'pid': proc_info['pid'], 'name': name, 'cpu_percent': round(cpu, 2), 'memory_mb': round(memory_mb, 2) }) process_stats['postgres']['count'] += 1 process_stats['postgres']['cpu'] += cpu process_stats['postgres']['memory_mb'] += memory_mb elif 'redis' in name or 'redis-server' in name: processes['redis'].append({ 'pid': proc_info['pid'], 'name': name, 'cpu_percent': round(cpu, 2), 'memory_mb': round(memory_mb, 2) }) process_stats['redis']['count'] += 1 process_stats['redis']['cpu'] += cpu process_stats['redis']['memory_mb'] += memory_mb elif 'nginx' in name or 'caddy' in name: processes['nginx'].append({ 'pid': proc_info['pid'], 'name': name, 'cpu_percent': round(cpu, 2), 'memory_mb': round(memory_mb, 2) }) process_stats['nginx']['count'] += 1 process_stats['nginx']['cpu'] += cpu process_stats['nginx']['memory_mb'] += memory_mb except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): continue # Round stats for key in process_stats: process_stats[key]['cpu'] = round(process_stats[key]['cpu'], 2) process_stats[key]['memory_mb'] = round(process_stats[key]['memory_mb'], 2) status_data['processes'] = { 'by_stack': process_stats, 'details': {k: v[:10] for k, v in processes.items()} # Limit details to 10 per type } except Exception as e: logger.error(f"Error getting process information: {str(e)}") status_data['processes'] = {'error': str(e)} try: # Module-specific task counts from igny8_core.modules.planner.models import Keywords, Clusters, ContentIdeas from igny8_core.modules.writer.models import Tasks, Images status_data['modules'] = { 'planner': { 'keywords': Keywords.objects.count(), 'clusters': Clusters.objects.count(), 'content_ideas': ContentIdeas.objects.count(), }, 'writer': { 'tasks': Tasks.objects.count(), 'images': Images.objects.count(), } } except Exception as e: logger.error(f"Error getting module statistics: {str(e)}") status_data['modules'] = {'error': str(e)} return Response(status_data) @api_view(['GET']) @permission_classes([AllowAny]) # Will check admin in view def get_request_metrics(request, request_id): """ Get resource metrics for a specific request. Only accessible to admins/developers. """ # Check if user is admin/developer if not request.user.is_authenticated: return Response({'error': 'Authentication required'}, status=http_status.HTTP_401_UNAUTHORIZED) if not (hasattr(request.user, 'is_admin_or_developer') and request.user.is_admin_or_developer()): return Response({'error': 'Admin access required'}, status=http_status.HTTP_403_FORBIDDEN) # Get metrics from cache from django.core.cache import cache metrics = cache.get(f"resource_tracking_{request_id}") if not metrics: return Response({'error': 'Metrics not found or expired'}, status=http_status.HTTP_404_NOT_FOUND) return Response(metrics) @api_view(['POST']) @permission_classes([AllowAny]) def gitea_webhook(request): """ Webhook endpoint to receive push events from Gitea. Handles automatic deployment when code is pushed to the repository. """ import json import subprocess import os try: # Parse webhook payload payload = json.loads(request.body) event_type = request.headers.get('X-Gitea-Event', 'push') logger.info(f"[Webhook] Received {event_type} event from Gitea") # Only process push events if event_type != 'push': return Response({ 'status': 'ignored', 'message': f'Event type {event_type} is not processed' }, status=http_status.HTTP_200_OK) # Extract repository information repository = payload.get('repository', {}) repo_name = repository.get('name', '') repo_full_name = repository.get('full_name', '') ref = payload.get('ref', '') # Only process pushes to main branch if ref != 'refs/heads/main': logger.info(f"[Webhook] Ignoring push to {ref}, only processing main branch") return Response({ 'status': 'ignored', 'message': f'Push to {ref} ignored, only main branch is processed' }, status=http_status.HTTP_200_OK) # Get commit information commits = payload.get('commits', []) commit_count = len(commits) pusher = payload.get('pusher', {}).get('username', 'unknown') logger.info(f"[Webhook] Processing push: {commit_count} commit(s) by {pusher} to {repo_full_name}") # Pull latest code - use docker Python library to exec into gitea container try: import docker as docker_lib logger.info(f"[Webhook] Pulling latest code...") client = docker_lib.DockerClient(base_url='unix://var/run/docker.sock') gitea_container = client.containers.get('gitea') exec_result = gitea_container.exec_run( 'bash -c "git config --global --add safe.directory /deploy/igny8 2>/dev/null || true && ' 'git -C /deploy/igny8 fetch origin main && ' 'git -C /deploy/igny8 reset --hard origin/main"', user='root' ) if exec_result.exit_code == 0: logger.info(f"[Webhook] Git pull successful") else: logger.error(f"[Webhook] Git pull failed: {exec_result.output.decode()}") except ImportError: # Fallback to subprocess if docker library not available try: import subprocess logger.info(f"[Webhook] Pulling latest code (subprocess fallback)...") result = subprocess.run( ['docker', 'exec', 'gitea', 'bash', '-c', 'git config --global --add safe.directory /deploy/igny8 2>/dev/null || true && ' 'git -C /deploy/igny8 fetch origin main && ' 'git -C /deploy/igny8 reset --hard origin/main'], capture_output=True, text=True, timeout=30 ) if result.returncode == 0: logger.info(f"[Webhook] Git pull successful") else: logger.error(f"[Webhook] Git pull failed: {result.stderr}") except Exception as e: logger.error(f"[Webhook] Git pull error: {e}") except Exception as e: logger.error(f"[Webhook] Git pull error: {e}") # Trigger deployment - restart containers deployment_success = False deployment_error = None try: # Try to use docker Python library first, fallback to subprocess try: import docker as docker_lib client = docker_lib.DockerClient(base_url='unix://var/run/docker.sock') # Restart frontend container (don't restart backend from within itself) logger.info(f"[Webhook] Restarting frontend container...") frontend_container = client.containers.get("igny8_frontend") frontend_container.restart(timeout=30) logger.info(f"[Webhook] Frontend container restarted successfully") # Schedule backend restart via subprocess in background (non-blocking) # This avoids deadlock from restarting the container we're running in logger.info(f"[Webhook] Scheduling backend container restart...") import threading def restart_backend(): import time time.sleep(2) # Give webhook time to respond try: backend_container = client.containers.get("igny8_backend") backend_container.restart(timeout=30) logger.info(f"[Webhook] Backend container restarted successfully (delayed)") except Exception as e: logger.error(f"[Webhook] Delayed backend restart failed: {e}") restart_thread = threading.Thread(target=restart_backend, daemon=True) restart_thread.start() deployment_success = True except ImportError: # Fallback to subprocess with docker command logger.info(f"[Webhook] Docker library not available, using subprocess...") # Try /usr/bin/docker or docker in PATH docker_cmd = "/usr/bin/docker" import shutil if not os.path.exists(docker_cmd): docker_cmd = shutil.which("docker") or "docker" # Restart backend container logger.info(f"[Webhook] Restarting backend container...") backend_result = subprocess.run( [docker_cmd, "restart", "igny8_backend"], capture_output=True, text=True, timeout=30 ) if backend_result.returncode != 0: raise Exception(f"Backend restart failed: {backend_result.stderr}") logger.info(f"[Webhook] Backend container restarted successfully") # Restart frontend container logger.info(f"[Webhook] Restarting frontend container...") frontend_result = subprocess.run( [docker_cmd, "restart", "igny8_frontend"], capture_output=True, text=True, timeout=30 ) if frontend_result.returncode != 0: raise Exception(f"Frontend restart failed: {frontend_result.stderr}") logger.info(f"[Webhook] Frontend container restarted successfully") deployment_success = True logger.info(f"[Webhook] Deployment completed: containers restarted") except subprocess.TimeoutExpired as e: deployment_error = f"Deployment timeout: {str(e)}" logger.error(f"[Webhook] {deployment_error}") except Exception as deploy_error: deployment_error = str(deploy_error) logger.error(f"[Webhook] Deployment error: {deploy_error}", exc_info=True) return Response({ 'status': 'success' if deployment_success else 'partial', 'message': 'Webhook received and processed', 'repository': repo_full_name, 'branch': ref, 'commits': commit_count, 'pusher': pusher, 'event': event_type, 'deployment': { 'success': deployment_success, 'error': deployment_error } }, status=http_status.HTTP_200_OK) except json.JSONDecodeError as e: logger.error(f"[Webhook] Invalid JSON payload: {e}") return Response({ 'status': 'error', 'message': 'Invalid JSON payload' }, status=http_status.HTTP_400_BAD_REQUEST) except Exception as e: logger.error(f"[Webhook] Error processing webhook: {e}", exc_info=True) return Response({ 'status': 'error', 'message': str(e) }, status=http_status.HTTP_500_INTERNAL_SERVER_ERROR)