"""
System module views - for global settings and prompts
"""
import hashlib
import hmac
import json
import logging
import os
import shutil
import subprocess
import threading
import time

import psutil
from django.conf import settings
from django.core.cache import cache
from django.db import transaction, connection
from django.utils import timezone
from django_filters.rest_framework import DjangoFilterBackend
from drf_spectacular.utils import extend_schema, extend_schema_view
from rest_framework import viewsets, status as http_status, filters
from rest_framework.decorators import action, api_view, permission_classes
from rest_framework.permissions import AllowAny
from rest_framework.response import Response

from igny8_core.api.base import AccountModelViewSet
from igny8_core.api.pagination import CustomPageNumberPagination
from igny8_core.api.permissions import IsEditorOrAbove, IsAuthenticatedAndActive, IsViewerOrAbove, HasTenantAccess
from igny8_core.api.response import success_response, error_response
from igny8_core.api.throttles import DebugScopedRateThrottle

from .models import AIPrompt, AuthorProfile, Strategy
from .serializers import AIPromptSerializer, AuthorProfileSerializer, StrategySerializer

logger = logging.getLogger(__name__)

# Unit conversion factors used by the status endpoint.
_BYTES_PER_GB = 1024 ** 3
_BYTES_PER_MB = 1024 ** 2

# Process groups reported by the status endpoint, in matching priority order.
_TRACKED_PROCESS_STACKS = ('gunicorn', 'celery', 'postgres', 'redis', 'nginx')

# Deployment targets used by the Gitea webhook.
_DEPLOY_REPO_PATH = '/data/app/igny8'
_DEPLOY_BRANCH_REF = 'refs/heads/main'
_FRONTEND_CONTAINER = 'igny8_frontend'
_BACKEND_CONTAINER = 'igny8_backend'


def _system_schema_view():
    """Return a schema decorator tagging all standard CRUD actions as 'System'."""
    crud_actions = ('list', 'create', 'retrieve', 'update', 'partial_update', 'destroy')
    return extend_schema_view(
        **{name: extend_schema(tags=['System']) for name in crud_actions}
    )


def _resolve_request_account(request):
    """
    Find the account a prompt write should be attached to.

    Tries, in order: ``request.account``, the authenticated user's
    ``account`` attribute, and finally the first ``Account`` row.
    Returns ``None`` when none of these yields an account.
    """
    account = getattr(request, 'account', None)

    # Fallback 1: Get from authenticated user's account
    if not account:
        user = getattr(request, 'user', None)
        if user and getattr(user, 'is_authenticated', False):
            account = getattr(user, 'account', None)

    # Fallback 2: If still no account, get default account (for development)
    # NOTE(review): outside development this attaches the write to an
    # arbitrary tenant; kept for backward compatibility but now logged so it
    # is visible. Consider restricting it to DEBUG.
    if not account:
        from igny8_core.auth.models import Account
        try:
            account = Account.objects.first()
        except Exception:
            logger.exception("Could not load a fallback account")
            account = None
        if account:
            logger.warning(
                "No account on request or user; falling back to first account for %s",
                getattr(request, 'path', '<unknown path>'),
            )

    return account


def _account_not_found_response(request):
    """Build the 400 response returned when no account can be resolved."""
    return error_response(
        error='Account not found. Please ensure you are logged in.',
        status_code=http_status.HTTP_400_BAD_REQUEST,
        request=request
    )


def _editor_required_response(request):
    """Build the 403 response returned when the caller is below editor role."""
    return error_response(
        error='Permission denied. Editor or above role required.',
        status_code=http_status.HTTP_403_FORBIDDEN,
        request=request
    )


@_system_schema_view()
class AIPromptViewSet(AccountModelViewSet):
    """
    ViewSet for managing AI prompts
    Unified API Standard v1.0 compliant
    """
    queryset = AIPrompt.objects.all()
    serializer_class = AIPromptSerializer
    permission_classes = [IsAuthenticatedAndActive, HasTenantAccess]
    throttle_scope = 'system'
    throttle_classes = [DebugScopedRateThrottle]
    pagination_class = CustomPageNumberPagination  # Explicitly use custom pagination

    def get_queryset(self):
        """Get prompts for the current account, ordered by prompt type"""
        return super().get_queryset().order_by('prompt_type')

    # The named group is required: it is what binds the URL segment to the
    # ``prompt_type`` keyword argument of the handler.
    @action(detail=False, methods=['get'], url_path=r'by_type/(?P<prompt_type>[^/.]+)', url_name='by_type')
    def get_by_type(self, request, prompt_type=None):
        """
        Get prompt by type.

        Returns the stored prompt when one exists in the queryset; otherwise
        returns a synthetic payload built from the default prompt text.
        """
        try:
            prompt = self.get_queryset().get(prompt_type=prompt_type)
        except AIPrompt.DoesNotExist:
            # Return default if not found
            from .utils import get_default_prompt
            default_value = get_default_prompt(prompt_type)
            return success_response(
                data={
                    'prompt_type': prompt_type,
                    'prompt_value': default_value,
                    'default_prompt': default_value,
                    'is_active': True,
                },
                request=request
            )
        serializer = self.get_serializer(prompt)
        return success_response(data=serializer.data, request=request)

    def _store_prompt_value(self, account, prompt_type, prompt_value):
        """
        Create the (account, prompt_type) prompt or overwrite its value.

        ``default_prompt`` and ``is_active`` are only set when the row is
        created; an existing row only has ``prompt_value`` replaced.
        """
        from .utils import get_default_prompt
        default_value = get_default_prompt(prompt_type)
        if prompt_value is None:
            # Reset semantics: store the default text as the current value.
            prompt_value = default_value

        prompt, created = AIPrompt.objects.get_or_create(
            prompt_type=prompt_type,
            account=account,
            defaults={
                'prompt_value': prompt_value,
                'default_prompt': default_value,
                'is_active': True,
            }
        )
        if not created:
            prompt.prompt_value = prompt_value
            prompt.save()
        return prompt

    @action(detail=False, methods=['post'], url_path='save', url_name='save')
    def save_prompt(self, request):
        """Save or update a prompt - requires editor or above"""
        # Check if user has editor or above permissions
        if not IsEditorOrAbove().has_permission(request, self):
            return _editor_required_response(request)

        prompt_type = request.data.get('prompt_type')
        prompt_value = request.data.get('prompt_value')

        if not prompt_type:
            return error_response(
                error='prompt_type is required',
                status_code=http_status.HTTP_400_BAD_REQUEST,
                request=request
            )
        # An empty string is a valid value; only a missing key is rejected.
        if prompt_value is None:
            return error_response(
                error='prompt_value is required',
                status_code=http_status.HTTP_400_BAD_REQUEST,
                request=request
            )

        account = _resolve_request_account(request)
        if not account:
            return _account_not_found_response(request)

        prompt = self._store_prompt_value(account, prompt_type, prompt_value)
        serializer = self.get_serializer(prompt)
        return success_response(
            data=serializer.data,
            message=f'{prompt.get_prompt_type_display()} saved successfully',
            request=request
        )

    @action(detail=False, methods=['post'], url_path='reset', url_name='reset')
    def reset_prompt(self, request):
        """Reset prompt to default - requires editor or above"""
        # Check if user has editor or above permissions
        if not IsEditorOrAbove().has_permission(request, self):
            return _editor_required_response(request)

        prompt_type = request.data.get('prompt_type')
        if not prompt_type:
            return error_response(
                error='prompt_type is required',
                status_code=http_status.HTTP_400_BAD_REQUEST,
                request=request
            )

        account = _resolve_request_account(request)
        if not account:
            return _account_not_found_response(request)

        # Passing None stores the default prompt text as the current value.
        prompt = self._store_prompt_value(account, prompt_type, None)
        serializer = self.get_serializer(prompt)
        return success_response(
            data=serializer.data,
            message=f'{prompt.get_prompt_type_display()} reset to default',
            request=request
        )


@_system_schema_view()
class AuthorProfileViewSet(AccountModelViewSet):
    """
    ViewSet for managing Author Profiles
    Unified API Standard v1.0 compliant
    """
    queryset = AuthorProfile.objects.all()
    serializer_class = AuthorProfileSerializer
    permission_classes = [IsAuthenticatedAndActive, IsViewerOrAbove]
    throttle_scope = 'system'
    throttle_classes = [DebugScopedRateThrottle]

    filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
    search_fields = ['name', 'description', 'tone']
    ordering_fields = ['name', 'created_at', 'updated_at']
    ordering = ['name']
    filterset_fields = ['is_active', 'language']


@_system_schema_view()
class StrategyViewSet(AccountModelViewSet):
    """
    ViewSet for managing Strategies
    Unified API Standard v1.0 compliant
    """
    queryset = Strategy.objects.all()
    serializer_class = StrategySerializer
    permission_classes = [IsAuthenticatedAndActive, IsViewerOrAbove]
    throttle_scope = 'system'
    throttle_classes = [DebugScopedRateThrottle]

    filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
    search_fields = ['name', 'description']
    ordering_fields = ['name', 'created_at', 'updated_at']
    ordering = ['name']
    filterset_fields = ['is_active', 'sector']


def _usage_status(percent):
    """Map a usage percentage to 'healthy' (<80), 'warning' (<95) or 'critical'."""
    if percent < 80:
        return 'healthy'
    if percent < 95:
        return 'warning'
    return 'critical'


def _collect_system_resources():
    """Return CPU, memory and root-disk usage, or ``{'error': ...}`` on failure."""
    try:
        # interval=1 blocks for one second to sample CPU usage.
        cpu_percent = psutil.cpu_percent(interval=1)
        cpu_count = psutil.cpu_count()
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')

        return {
            'cpu': {
                'usage_percent': cpu_percent,
                'cores': cpu_count,
                'status': _usage_status(cpu_percent)
            },
            'memory': {
                'total_gb': round(memory.total / _BYTES_PER_GB, 2),
                'used_gb': round(memory.used / _BYTES_PER_GB, 2),
                'available_gb': round(memory.available / _BYTES_PER_GB, 2),
                'usage_percent': memory.percent,
                'status': _usage_status(memory.percent)
            },
            'disk': {
                'total_gb': round(disk.total / _BYTES_PER_GB, 2),
                'used_gb': round(disk.used / _BYTES_PER_GB, 2),
                'free_gb': round(disk.free / _BYTES_PER_GB, 2),
                'usage_percent': disk.percent,
                'status': _usage_status(disk.percent)
            }
        }
    except Exception as e:
        logger.error(f"Error getting system resources: {str(e)}")
        return {'error': str(e)}


def _collect_database_status():
    """Return database connectivity, version, size and active connection count."""
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT 1")
            db_conn = True
            cursor.execute("SELECT version()")
            db_version = cursor.fetchone()[0] if cursor.rowcount > 0 else 'Unknown'

            # Get database size (PostgreSQL); best-effort on other backends.
            try:
                cursor.execute("""
                    SELECT pg_size_pretty(pg_database_size(current_database()))
                """)
                db_size = cursor.fetchone()[0] if cursor.rowcount > 0 else 'Unknown'
            except Exception:
                db_size = 'Unknown'

            # Count active connections; best-effort as above.
            try:
                cursor.execute("SELECT count(*) FROM pg_stat_activity WHERE state = 'active'")
                active_connections = cursor.fetchone()[0] if cursor.rowcount > 0 else 0
            except Exception:
                active_connections = 0

        return {
            'connected': db_conn,
            'version': db_version,
            'size': db_size,
            'active_connections': active_connections,
            'status': 'healthy' if db_conn else 'critical'
        }
    except Exception as e:
        logger.error(f"Error getting database status: {str(e)}")
        return {'connected': False, 'error': str(e), 'status': 'critical'}


def _collect_redis_status():
    """Return cache round-trip health and, when exposed by the backend, Redis info."""
    try:
        redis_conn = False
        redis_info = {}
        try:
            # Round-trip a short-lived key through the configured cache.
            cache.set('status_check', 'ok', 10)
            redis_conn = cache.get('status_check') == 'ok'

            # Try to get Redis info if available
            if hasattr(cache, 'client'):
                try:
                    redis_info = cache.client.get_client().info()
                except Exception:
                    # Info is optional; connectivity was already established.
                    pass
        except Exception as e:
            redis_conn = False
            redis_info = {'error': str(e)}

        return {
            'connected': redis_conn,
            'status': 'healthy' if redis_conn else 'critical',
            'info': redis_info if redis_info else {}
        }
    except Exception as e:
        logger.error(f"Error getting Redis status: {str(e)}")
        return {'connected': False, 'error': str(e), 'status': 'critical'}


def _collect_celery_status():
    """Return Celery worker names and active/scheduled/reserved task counts."""
    try:
        celery_workers = []
        celery_tasks = {
            'active': 0,
            'scheduled': 0,
            'reserved': 0,
        }
        try:
            from celery import current_app
            inspect = current_app.control.inspect()

            # Each call returns a mapping of worker -> tasks, or None.
            active_workers = inspect.active() or {}
            scheduled = inspect.scheduled() or {}
            reserved = inspect.reserved() or {}

            celery_workers = list(active_workers.keys())
            celery_tasks['active'] = sum(len(tasks) for tasks in active_workers.values())
            celery_tasks['scheduled'] = sum(len(tasks) for tasks in scheduled.values())
            celery_tasks['reserved'] = sum(len(tasks) for tasks in reserved.values())
        except Exception as e:
            logger.warning(f"Error getting Celery status: {str(e)}")
            celery_workers = []
            celery_tasks = {'error': str(e)}

        return {
            'workers': celery_workers,
            'worker_count': len(celery_workers),
            'tasks': celery_tasks,
            'status': 'healthy' if len(celery_workers) > 0 else 'warning'
        }
    except Exception as e:
        logger.error(f"Error getting Celery status: {str(e)}")
        return {'error': str(e), 'status': 'warning'}


def _classify_process(name, cmdline_lower):
    """
    Return the stack key a process belongs to, or ``None`` if untracked.

    ``name`` and ``cmdline_lower`` must already be lowercase. gunicorn and
    celery are matched on the command line as well as the process name; the
    remaining stacks are matched on the process name only.
    """
    if 'gunicorn' in cmdline_lower or 'gunicorn' in name:
        return 'gunicorn'
    if 'celery' in cmdline_lower or 'celery' in name:
        return 'celery'
    if 'postgres' in name:
        return 'postgres'
    if 'redis' in name:
        return 'redis'
    if 'nginx' in name or 'caddy' in name:
        return 'nginx'
    return None


def _collect_process_status():
    """Return per-stack process totals plus up to 10 process details per stack."""
    try:
        processes = {stack: [] for stack in _TRACKED_PROCESS_STACKS}
        processes['other'] = []
        process_stats = {
            stack: {'count': 0, 'cpu': 0, 'memory_mb': 0}
            for stack in _TRACKED_PROCESS_STACKS
        }

        for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'cpu_percent', 'memory_info']):
            try:
                proc_info = proc.info
                # Any attribute may be None when access to it was denied.
                name = (proc_info.get('name') or '').lower()
                cmdline_parts = proc_info.get('cmdline')
                cmdline_lower = ' '.join(cmdline_parts).lower() if cmdline_parts else ''

                stack = _classify_process(name, cmdline_lower)
                if stack is None:
                    continue

                cpu = proc_info.get('cpu_percent', 0) or 0
                memory = proc_info.get('memory_info', None)
                memory_mb = (memory.rss / _BYTES_PER_MB) if memory else 0

                processes[stack].append({
                    'pid': proc_info['pid'],
                    'name': name,
                    'cpu_percent': round(cpu, 2),
                    'memory_mb': round(memory_mb, 2)
                })
                totals = process_stats[stack]
                totals['count'] += 1
                totals['cpu'] += cpu
                totals['memory_mb'] += memory_mb
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                continue

        # Round stats
        for totals in process_stats.values():
            totals['cpu'] = round(totals['cpu'], 2)
            totals['memory_mb'] = round(totals['memory_mb'], 2)

        return {
            'by_stack': process_stats,
            'details': {k: v[:10] for k, v in processes.items()}  # Limit details to 10 per type
        }
    except Exception as e:
        logger.error(f"Error getting process information: {str(e)}")
        return {'error': str(e)}


def _collect_module_counts():
    """Return row counts for the planner and writer module models."""
    try:
        # Imported lazily, as in the rest of this module's cross-app lookups.
        from igny8_core.modules.planner.models import Keywords, Clusters, ContentIdeas
        from igny8_core.modules.writer.models import Tasks, Images

        return {
            'planner': {
                'keywords': Keywords.objects.count(),
                'clusters': Clusters.objects.count(),
                'content_ideas': ContentIdeas.objects.count(),
            },
            'writer': {
                'tasks': Tasks.objects.count(),
                'images': Images.objects.count(),
            }
        }
    except Exception as e:
        logger.error(f"Error getting module statistics: {str(e)}")
        return {'error': str(e)}


@api_view(['GET'])
@permission_classes([AllowAny])  # Public endpoint for monitoring
def system_status(request):
    """
    Comprehensive system status endpoint for monitoring
    Returns CPU, memory, disk, database, Redis, Celery, and process information

    Each section is collected independently: a failure in one section is
    logged and reported inside that section without affecting the others.
    """
    status_data = {
        'timestamp': timezone.now().isoformat(),
        'system': _collect_system_resources(),
        'database': _collect_database_status(),
        'redis': _collect_redis_status(),
        'celery': _collect_celery_status(),
        'processes': _collect_process_status(),
        'modules': _collect_module_counts(),
    }
    return success_response(data=status_data, request=request)


@api_view(['GET'])
@permission_classes([AllowAny])  # Will check admin in view
def get_request_metrics(request, request_id):
    """
    Get resource metrics for a specific request.
    Only accessible to admins/developers.

    Responds 401 for anonymous callers, 403 for non-admin users, and 404
    when nothing is cached under the request id.
    """
    # Check if user is admin/developer
    if not request.user.is_authenticated:
        return error_response(
            error='Authentication required',
            status_code=http_status.HTTP_401_UNAUTHORIZED,
            request=request
        )

    is_admin_check = getattr(request.user, 'is_admin_or_developer', None)
    if not (is_admin_check is not None and is_admin_check()):
        return error_response(
            error='Admin access required',
            status_code=http_status.HTTP_403_FORBIDDEN,
            request=request
        )

    # Get metrics from cache
    metrics = cache.get(f"resource_tracking_{request_id}")
    if not metrics:
        return error_response(
            error='Metrics not found or expired',
            status_code=http_status.HTTP_404_NOT_FOUND,
            request=request
        )

    return success_response(data=metrics, request=request)


def _verify_gitea_signature(raw_body, signature, secret):
    """
    Check a Gitea webhook signature.

    Computes the hex HMAC-SHA256 of ``raw_body`` keyed with ``secret`` and
    compares it to ``signature`` in constant time. Returns ``False`` when
    the signature header is missing.
    """
    if not signature:
        return False
    expected = hmac.new(secret.encode('utf-8'), raw_body, hashlib.sha256).hexdigest()
    # Compare as bytes so a non-ASCII header value cannot raise TypeError.
    return hmac.compare_digest(expected.encode('ascii'), signature.strip().lower().encode('utf-8'))


def _pull_latest_code():
    """Run ``git pull origin main`` in the deployment checkout; failures are only logged."""
    try:
        logger.info("[Webhook] Pulling latest code...")
        # Set safe directory first
        subprocess.run(
            ['git', 'config', '--global', '--add', 'safe.directory', _DEPLOY_REPO_PATH],
            capture_output=True,
            timeout=5
        )
        # Pull latest code
        result = subprocess.run(
            ['git', '-C', _DEPLOY_REPO_PATH, 'pull', 'origin', 'main'],
            capture_output=True,
            text=True,
            timeout=30
        )
        if result.returncode == 0:
            logger.info("[Webhook] Git pull successful")
        else:
            logger.error("[Webhook] Git pull failed: %s", result.stderr)
    except Exception as e:
        logger.error("[Webhook] Git pull error: %s", e)


def _restart_with_docker_sdk(docker_lib):
    """Restart the frontend now and the backend from a delayed background thread."""
    client = docker_lib.DockerClient(base_url='unix://var/run/docker.sock')

    # Restart frontend container (don't restart backend from within itself)
    logger.info("[Webhook] Restarting frontend container...")
    client.containers.get(_FRONTEND_CONTAINER).restart(timeout=30)
    logger.info("[Webhook] Frontend container restarted successfully")

    # Schedule backend restart in background (non-blocking)
    # This avoids deadlock from restarting the container we're running in
    logger.info("[Webhook] Scheduling backend container restart...")

    def restart_backend():
        time.sleep(2)  # Give webhook time to respond
        try:
            client.containers.get(_BACKEND_CONTAINER).restart(timeout=30)
            logger.info("[Webhook] Backend container restarted successfully (delayed)")
        except Exception as e:
            logger.error("[Webhook] Delayed backend restart failed: %s", e)

    threading.Thread(target=restart_backend, daemon=True).start()


def _restart_with_docker_cli():
    """Restart backend then frontend via the ``docker`` executable; raise on failure."""
    # Try /usr/bin/docker or docker in PATH
    docker_cmd = "/usr/bin/docker"
    if not os.path.exists(docker_cmd):
        docker_cmd = shutil.which("docker") or "docker"

    for label, container in (('Backend', _BACKEND_CONTAINER), ('Frontend', _FRONTEND_CONTAINER)):
        logger.info("[Webhook] Restarting %s container...", label.lower())
        result = subprocess.run(
            [docker_cmd, "restart", container],
            capture_output=True,
            text=True,
            timeout=30
        )
        if result.returncode != 0:
            raise Exception(f"{label} restart failed: {result.stderr}")
        logger.info("[Webhook] %s container restarted successfully", label)


def _restart_containers():
    """
    Restart the application containers.

    Uses the docker Python library when importable, otherwise the docker
    CLI. Returns ``(success, error_message)``; ``error_message`` is ``None``
    on success.
    """
    try:
        try:
            import docker as docker_lib
        except ImportError:
            # Fallback to subprocess with docker command
            logger.info("[Webhook] Docker library not available, using subprocess...")
            _restart_with_docker_cli()
        else:
            _restart_with_docker_sdk(docker_lib)
        logger.info("[Webhook] Deployment completed: containers restarted")
        return True, None
    except subprocess.TimeoutExpired as e:
        deployment_error = f"Deployment timeout: {str(e)}"
        logger.error("[Webhook] %s", deployment_error)
        return False, deployment_error
    except Exception as deploy_error:
        logger.error("[Webhook] Deployment error: %s", deploy_error, exc_info=True)
        return False, str(deploy_error)


@api_view(['POST'])
@permission_classes([AllowAny])
def gitea_webhook(request):
    """
    Webhook endpoint to receive push events from Gitea.
    Handles automatic deployment when code is pushed to the repository.

    When ``GITEA_WEBHOOK_SECRET`` is set (Django settings or environment),
    requests must carry a matching ``X-Gitea-Signature`` header or they are
    rejected with 403. Without a secret the endpoint accepts any caller, as
    before, and logs a warning.

    Only ``push`` events to ``refs/heads/main`` trigger a deployment; other
    events and refs are acknowledged with status ``ignored``.
    """
    try:
        raw_body = request.body

        secret = getattr(settings, 'GITEA_WEBHOOK_SECRET', None) or os.environ.get('GITEA_WEBHOOK_SECRET')
        if secret:
            if not _verify_gitea_signature(raw_body, request.headers.get('X-Gitea-Signature'), secret):
                logger.warning("[Webhook] Rejected request with missing or invalid signature")
                return error_response(
                    error='Invalid webhook signature',
                    status_code=http_status.HTTP_403_FORBIDDEN,
                    request=request
                )
        else:
            logger.warning("[Webhook] GITEA_WEBHOOK_SECRET is not configured; request is unauthenticated")

        # Parse webhook payload
        payload = json.loads(raw_body)
        if not isinstance(payload, dict):
            logger.error("[Webhook] Invalid JSON payload: expected an object")
            return error_response(
                error='Invalid JSON payload',
                status_code=http_status.HTTP_400_BAD_REQUEST,
                request=request
            )
        event_type = request.headers.get('X-Gitea-Event', 'push')

        logger.info("[Webhook] Received %s event from Gitea", event_type)

        # Only process push events
        if event_type != 'push':
            return success_response(
                data={'status': 'ignored'},
                message=f'Event type {event_type} is not processed',
                request=request
            )

        # Extract repository information (tolerate explicit nulls)
        repository = payload.get('repository') or {}
        repo_full_name = repository.get('full_name', '')
        ref = payload.get('ref', '')

        # Only process pushes to main branch
        if ref != _DEPLOY_BRANCH_REF:
            logger.info("[Webhook] Ignoring push to %s, only processing main branch", ref)
            return success_response(
                data={'status': 'ignored'},
                message=f'Push to {ref} ignored, only main branch is processed',
                request=request
            )

        # Get commit information
        commit_count = len(payload.get('commits') or [])
        pusher = (payload.get('pusher') or {}).get('username', 'unknown')

        logger.info(
            "[Webhook] Processing push: %s commit(s) by %s to %s",
            commit_count, pusher, repo_full_name,
        )

        # Pull latest code, then restart containers regardless of pull result.
        _pull_latest_code()
        deployment_success, deployment_error = _restart_containers()

        return success_response(
            data={
                'status': 'success' if deployment_success else 'partial',
                'repository': repo_full_name,
                'branch': ref,
                'commits': commit_count,
                'pusher': pusher,
                'event': event_type,
                'deployment': {
                    'success': deployment_success,
                    'error': deployment_error
                }
            },
            message='Webhook received and processed',
            request=request
        )

    except json.JSONDecodeError as e:
        logger.error("[Webhook] Invalid JSON payload: %s", e)
        return error_response(
            error='Invalid JSON payload',
            status_code=http_status.HTTP_400_BAD_REQUEST,
            request=request
        )
    except Exception as e:
        logger.error("[Webhook] Error processing webhook: %s", e, exc_info=True)
        return error_response(
            error=str(e),
            status_code=http_status.HTTP_500_INTERNAL_SERVER_ERROR,
            request=request
        )