Files
igny8/backend/igny8_core/modules/system/views.py

644 lines
25 KiB
Python

"""
System module views - for global settings and prompts
"""
import psutil
import os
import logging
from rest_framework import viewsets, status as http_status, filters
from rest_framework.decorators import action, api_view, permission_classes
from rest_framework.response import Response
from rest_framework.permissions import AllowAny
from django.db import transaction, connection
from django.core.cache import cache
from django.utils import timezone
from django_filters.rest_framework import DjangoFilterBackend
from igny8_core.api.base import AccountModelViewSet
from .models import AIPrompt, AuthorProfile, Strategy
from .serializers import AIPromptSerializer, AuthorProfileSerializer, StrategySerializer
logger = logging.getLogger(__name__)
class AIPromptViewSet(AccountModelViewSet):
    """
    ViewSet for managing AI prompts.

    Provides standard CRUD (via AccountModelViewSet) plus custom actions:

    - ``GET  .../by_type/<prompt_type>/`` — fetch a prompt by type,
      falling back to the built-in default when none is stored.
    - ``POST .../save/``  — create or update a prompt's value.
    - ``POST .../reset/`` — restore a prompt to its default value.
    """
    queryset = AIPrompt.objects.all()
    serializer_class = AIPromptSerializer
    permission_classes = []  # Allow any for now

    def get_queryset(self):
        """Return the current account's prompts ordered by prompt_type."""
        return super().get_queryset().order_by('prompt_type')

    def _resolve_account(self, request):
        """Resolve the account for this request, or return None.

        Tries, in order: ``request.account``, the authenticated user's
        ``account`` attribute, and finally the first Account row (a
        development-only fallback).
        """
        account = getattr(request, 'account', None)
        # Fallback 1: Get from authenticated user's account
        if not account:
            user = getattr(request, 'user', None)
            if user and hasattr(user, 'is_authenticated') and user.is_authenticated:
                account = getattr(user, 'account', None)
        # Fallback 2: If still no account, get default account (for development)
        if not account:
            from igny8_core.auth.models import Account
            try:
                account = Account.objects.first()
            except Exception:
                pass
        return account

    @action(detail=False, methods=['get'], url_path='by_type/(?P<prompt_type>[^/.]+)', url_name='by_type')
    def get_by_type(self, request, prompt_type=None):
        """Get a prompt by type; return the default value if none stored."""
        try:
            prompt = self.get_queryset().get(prompt_type=prompt_type)
            serializer = self.get_serializer(prompt)
            return Response(serializer.data)
        except AIPrompt.DoesNotExist:
            # Return default if not found
            from .utils import get_default_prompt
            default_value = get_default_prompt(prompt_type)
            return Response({
                'prompt_type': prompt_type,
                'prompt_value': default_value,
                'default_prompt': default_value,
                'is_active': True,
            })

    @action(detail=False, methods=['post'], url_path='save', url_name='save')
    def save_prompt(self, request):
        """Save or update a prompt for the resolved account.

        Expects ``prompt_type`` and ``prompt_value`` in the request body.
        Returns 400 when either is missing or no account can be resolved.
        """
        prompt_type = request.data.get('prompt_type')
        prompt_value = request.data.get('prompt_value')
        # BUG FIX: these error paths previously used `status.HTTP_400_BAD_REQUEST`
        # but the module imports DRF status as `http_status`, so they raised
        # NameError instead of returning a 400 response.
        if not prompt_type:
            return Response({'error': 'prompt_type is required'}, status=http_status.HTTP_400_BAD_REQUEST)
        if prompt_value is None:
            return Response({'error': 'prompt_value is required'}, status=http_status.HTTP_400_BAD_REQUEST)
        account = self._resolve_account(request)
        if not account:
            return Response({'error': 'Account not found. Please ensure you are logged in.'}, status=http_status.HTTP_400_BAD_REQUEST)
        # Get default prompt value if creating new
        from .utils import get_default_prompt
        default_value = get_default_prompt(prompt_type)
        # Get or create prompt
        prompt, created = AIPrompt.objects.get_or_create(
            prompt_type=prompt_type,
            account=account,
            defaults={
                'prompt_value': prompt_value,
                'default_prompt': default_value,
                'is_active': True,
            }
        )
        if not created:
            prompt.prompt_value = prompt_value
            prompt.save()
        serializer = self.get_serializer(prompt)
        return Response({
            'success': True,
            'data': serializer.data,
            'message': f'{prompt.get_prompt_type_display()} saved successfully'
        })

    @action(detail=False, methods=['post'], url_path='reset', url_name='reset')
    def reset_prompt(self, request):
        """Reset a prompt to its default value for the resolved account.

        Expects ``prompt_type`` in the request body. Returns 400 when it
        is missing or no account can be resolved.
        """
        prompt_type = request.data.get('prompt_type')
        # BUG FIX: same `status` -> `http_status` NameError as in save_prompt.
        if not prompt_type:
            return Response({'error': 'prompt_type is required'}, status=http_status.HTTP_400_BAD_REQUEST)
        account = self._resolve_account(request)
        if not account:
            return Response({'error': 'Account not found. Please ensure you are logged in.'}, status=http_status.HTTP_400_BAD_REQUEST)
        # Get default prompt
        from .utils import get_default_prompt
        default_value = get_default_prompt(prompt_type)
        # Update or create prompt
        prompt, created = AIPrompt.objects.get_or_create(
            prompt_type=prompt_type,
            account=account,
            defaults={
                'prompt_value': default_value,
                'default_prompt': default_value,
                'is_active': True,
            }
        )
        if not created:
            prompt.prompt_value = default_value
            prompt.save()
        serializer = self.get_serializer(prompt)
        return Response({
            'success': True,
            'data': serializer.data,
            'message': f'{prompt.get_prompt_type_display()} reset to default'
        })
class AuthorProfileViewSet(AccountModelViewSet):
    """CRUD endpoints for Author Profiles, scoped to the current account.

    Exposes field filtering (``is_active``, ``language``), free-text
    search across name/description/tone, and ordering by name or
    timestamps (default: name ascending).
    """
    serializer_class = AuthorProfileSerializer
    queryset = AuthorProfile.objects.all()

    # Query features: exact-field filters, search, and ordering.
    filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
    filterset_fields = ['is_active', 'language']
    search_fields = ['name', 'description', 'tone']
    ordering_fields = ['name', 'created_at', 'updated_at']
    ordering = ['name']  # default sort order
class StrategyViewSet(AccountModelViewSet):
    """CRUD endpoints for Strategies, scoped to the current account.

    Exposes field filtering (``is_active``, ``sector``), free-text
    search across name/description, and ordering by name or timestamps
    (default: name ascending).
    """
    serializer_class = StrategySerializer
    queryset = Strategy.objects.all()

    # Query features: exact-field filters, search, and ordering.
    filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
    filterset_fields = ['is_active', 'sector']
    search_fields = ['name', 'description']
    ordering_fields = ['name', 'created_at', 'updated_at']
    ordering = ['name']  # default sort order
@api_view(['GET'])
@permission_classes([AllowAny])  # Adjust permissions as needed
def system_status(request):
    """
    Comprehensive system status endpoint for monitoring.

    Returns a JSON document with CPU/memory/disk usage, database
    connectivity and size, Redis connectivity, Celery worker/task
    counts, per-stack process statistics, and module record counts.
    Each section degrades independently: a failure in one section is
    logged and reported as an ``error`` key without failing the others.
    """
    status_data = {
        'timestamp': timezone.now().isoformat(),
        'system': {},
        'database': {},
        'redis': {},
        'celery': {},
        'processes': {},
        'modules': {},
    }

    try:
        # System Resources (psutil). cpu_percent blocks for 1s to sample.
        cpu_percent = psutil.cpu_percent(interval=1)
        cpu_count = psutil.cpu_count()
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        status_data['system'] = {
            'cpu': {
                'usage_percent': cpu_percent,
                'cores': cpu_count,
                # Thresholds: <80 healthy, <95 warning, else critical.
                'status': 'healthy' if cpu_percent < 80 else 'warning' if cpu_percent < 95 else 'critical'
            },
            'memory': {
                'total_gb': round(memory.total / (1024**3), 2),
                'used_gb': round(memory.used / (1024**3), 2),
                'available_gb': round(memory.available / (1024**3), 2),
                'usage_percent': memory.percent,
                'status': 'healthy' if memory.percent < 80 else 'warning' if memory.percent < 95 else 'critical'
            },
            'disk': {
                'total_gb': round(disk.total / (1024**3), 2),
                'used_gb': round(disk.used / (1024**3), 2),
                'free_gb': round(disk.free / (1024**3), 2),
                'usage_percent': disk.percent,
                'status': 'healthy' if disk.percent < 80 else 'warning' if disk.percent < 95 else 'critical'
            }
        }
    except Exception as e:
        logger.error(f"Error getting system resources: {str(e)}")
        status_data['system'] = {'error': str(e)}

    try:
        # Database Status — the size/connection queries are PostgreSQL-only
        # and fall back to placeholders on other backends.
        with connection.cursor() as cursor:
            cursor.execute("SELECT 1")
            db_conn = True
            cursor.execute("SELECT version()")
            db_version = cursor.fetchone()[0] if cursor.rowcount > 0 else 'Unknown'
            # Get database size (PostgreSQL).
            # BUG FIX: was a bare `except:` which also swallowed
            # SystemExit/KeyboardInterrupt; narrowed to Exception.
            try:
                cursor.execute("""
                    SELECT pg_size_pretty(pg_database_size(current_database()))
                """)
                db_size = cursor.fetchone()[0] if cursor.rowcount > 0 else 'Unknown'
            except Exception:
                db_size = 'Unknown'
            # Count active connections (PostgreSQL only).
            try:
                cursor.execute("SELECT count(*) FROM pg_stat_activity WHERE state = 'active'")
                active_connections = cursor.fetchone()[0] if cursor.rowcount > 0 else 0
            except Exception:
                active_connections = 0
        status_data['database'] = {
            'connected': db_conn,
            'version': db_version,
            'size': db_size,
            'active_connections': active_connections,
            'status': 'healthy' if db_conn else 'critical'
        }
    except Exception as e:
        logger.error(f"Error getting database status: {str(e)}")
        status_data['database'] = {'connected': False, 'error': str(e), 'status': 'critical'}

    try:
        # Redis Status — verified via a cache round-trip rather than a
        # direct connection, so this works with any cache backend.
        redis_conn = False
        redis_info = {}
        try:
            cache.set('status_check', 'ok', 10)
            test_value = cache.get('status_check')
            redis_conn = test_value == 'ok'
            # Try to get Redis server info if the backend exposes a client.
            if hasattr(cache, 'client'):
                try:
                    redis_client = cache.client.get_client()
                    redis_info = redis_client.info()
                except Exception:
                    # Info is best-effort; connectivity was already verified.
                    pass
        except Exception as e:
            redis_conn = False
            redis_info = {'error': str(e)}
        status_data['redis'] = {
            'connected': redis_conn,
            'status': 'healthy' if redis_conn else 'critical',
            'info': redis_info if redis_info else {}
        }
    except Exception as e:
        logger.error(f"Error getting Redis status: {str(e)}")
        status_data['redis'] = {'connected': False, 'error': str(e), 'status': 'critical'}

    try:
        # Celery Status — inspect live workers via the control API.
        celery_workers = []
        celery_tasks = {
            'active': 0,
            'scheduled': 0,
            'reserved': 0,
        }
        try:
            from celery import current_app
            inspect = current_app.control.inspect()
            # inspect.*() return None when no workers respond.
            active_workers = inspect.active() or {}
            scheduled = inspect.scheduled() or {}
            reserved = inspect.reserved() or {}
            celery_workers = list(active_workers.keys())
            celery_tasks['active'] = sum(len(tasks) for tasks in active_workers.values())
            celery_tasks['scheduled'] = sum(len(tasks) for tasks in scheduled.values())
            celery_tasks['reserved'] = sum(len(tasks) for tasks in reserved.values())
        except Exception as e:
            logger.warning(f"Error getting Celery status: {str(e)}")
            celery_workers = []
            celery_tasks = {'error': str(e)}
        status_data['celery'] = {
            'workers': celery_workers,
            'worker_count': len(celery_workers),
            'tasks': celery_tasks,
            'status': 'healthy' if len(celery_workers) > 0 else 'warning'
        }
    except Exception as e:
        logger.error(f"Error getting Celery status: {str(e)}")
        status_data['celery'] = {'error': str(e), 'status': 'warning'}

    try:
        # Process Monitoring by Stack/Component.
        stack_names = ['gunicorn', 'celery', 'postgres', 'redis', 'nginx']
        processes = {name: [] for name in stack_names}
        processes['other'] = []  # kept for response-shape compatibility; never populated
        process_stats = {name: {'count': 0, 'cpu': 0, 'memory_mb': 0} for name in stack_names}

        def _classify(name, cmdline_lower):
            """Map a process to its stack bucket, or None if uncategorized."""
            if 'gunicorn' in cmdline_lower or 'gunicorn' in name:
                return 'gunicorn'
            if 'celery' in cmdline_lower or 'celery' in name:
                return 'celery'
            # 'postgres' also matches 'postgresql'; 'redis' also matches
            # 'redis-server' — the extra checks in the old code were dead.
            if 'postgres' in name:
                return 'postgres'
            if 'redis' in name:
                return 'redis'
            if 'nginx' in name or 'caddy' in name:
                return 'nginx'
            return None

        for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'cpu_percent', 'memory_info']):
            try:
                proc_info = proc.info
                name = proc_info['name'].lower()
                cmdline = ' '.join(proc_info['cmdline']) if proc_info['cmdline'] else ''
                stack = _classify(name, cmdline.lower())
                if stack is None:
                    continue
                cpu = proc_info.get('cpu_percent', 0) or 0
                memory = proc_info.get('memory_info', None)
                memory_mb = (memory.rss / (1024**2)) if memory else 0
                processes[stack].append({
                    'pid': proc_info['pid'],
                    'name': name,
                    'cpu_percent': round(cpu, 2),
                    'memory_mb': round(memory_mb, 2)
                })
                process_stats[stack]['count'] += 1
                process_stats[stack]['cpu'] += cpu
                process_stats[stack]['memory_mb'] += memory_mb
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                # Processes can vanish or deny access mid-iteration.
                continue
        # Round aggregated stats for presentation.
        for key in process_stats:
            process_stats[key]['cpu'] = round(process_stats[key]['cpu'], 2)
            process_stats[key]['memory_mb'] = round(process_stats[key]['memory_mb'], 2)
        status_data['processes'] = {
            'by_stack': process_stats,
            'details': {k: v[:10] for k, v in processes.items()}  # Limit details to 10 per type
        }
    except Exception as e:
        logger.error(f"Error getting process information: {str(e)}")
        status_data['processes'] = {'error': str(e)}

    try:
        # Module-specific record counts.
        from igny8_core.modules.planner.models import Keywords, Clusters, ContentIdeas
        from igny8_core.modules.writer.models import Tasks, Images
        status_data['modules'] = {
            'planner': {
                'keywords': Keywords.objects.count(),
                'clusters': Clusters.objects.count(),
                'content_ideas': ContentIdeas.objects.count(),
            },
            'writer': {
                'tasks': Tasks.objects.count(),
                'images': Images.objects.count(),
            }
        }
    except Exception as e:
        logger.error(f"Error getting module statistics: {str(e)}")
        status_data['modules'] = {'error': str(e)}

    return Response(status_data)
@api_view(['GET'])
@permission_classes([AllowAny])  # Will check admin in view
def get_request_metrics(request, request_id):
    """
    Get resource metrics for a specific request.

    Only accessible to authenticated admins/developers. Metrics are
    stored in the cache under ``resource_tracking_<request_id>`` and
    expire, so a 404 means "not found or expired".

    Returns 401 when unauthenticated, 403 when not an admin/developer,
    404 when no metrics exist for ``request_id``.
    """
    # Check if user is admin/developer
    if not request.user.is_authenticated:
        return Response({'error': 'Authentication required'}, status=http_status.HTTP_401_UNAUTHORIZED)
    if not (hasattr(request.user, 'is_admin_or_developer') and request.user.is_admin_or_developer()):
        return Response({'error': 'Admin access required'}, status=http_status.HTTP_403_FORBIDDEN)
    # Get metrics from cache (module-level `cache` import; the previous
    # local re-import of django.core.cache.cache was redundant).
    metrics = cache.get(f"resource_tracking_{request_id}")
    if not metrics:
        return Response({'error': 'Metrics not found or expired'}, status=http_status.HTTP_404_NOT_FOUND)
    return Response(metrics)
@api_view(['POST'])
@permission_classes([AllowAny])
def gitea_webhook(request):
    """
    Webhook endpoint to receive push events from Gitea.

    Handles automatic deployment when code is pushed to the repository:
    only ``push`` events to ``refs/heads/main`` trigger a container
    restart (frontend immediately, backend delayed so the response can
    be sent first). Git pull itself is handled by Gitea's post-receive
    hook; this endpoint only restarts containers to apply changes.

    SECURITY NOTE(review): this endpoint is AllowAny and does not verify
    the Gitea webhook secret (X-Gitea-Signature / X-Hub-Signature).
    Anyone who can reach it can restart the containers — confirm whether
    signature verification should be added.
    """
    import json
    import subprocess
    # (removed redundant local `import os` — os is imported at module level)
    try:
        # Parse webhook payload
        payload = json.loads(request.body)
        event_type = request.headers.get('X-Gitea-Event', 'push')
        logger.info(f"[Webhook] Received {event_type} event from Gitea")
        # Only process push events
        if event_type != 'push':
            return Response({
                'status': 'ignored',
                'message': f'Event type {event_type} is not processed'
            }, status=http_status.HTTP_200_OK)
        # Extract repository information
        repository = payload.get('repository', {})
        repo_name = repository.get('name', '')
        repo_full_name = repository.get('full_name', '')
        ref = payload.get('ref', '')
        # Only process pushes to main branch
        if ref != 'refs/heads/main':
            logger.info(f"[Webhook] Ignoring push to {ref}, only processing main branch")
            return Response({
                'status': 'ignored',
                'message': f'Push to {ref} ignored, only main branch is processed'
            }, status=http_status.HTTP_200_OK)
        # Get commit information (for logging/response only)
        commits = payload.get('commits', [])
        commit_count = len(commits)
        pusher = payload.get('pusher', {}).get('username', 'unknown')
        logger.info(f"[Webhook] Processing push: {commit_count} commit(s) by {pusher} to {repo_full_name}")
        # Trigger deployment - restart containers
        # Note: Git pull is handled by post-receive hook in Gitea
        # This webhook restarts containers to apply changes
        deployment_success = False
        deployment_error = None
        try:
            # Try to use docker Python library first, fallback to subprocess
            try:
                import docker as docker_lib
                client = docker_lib.DockerClient(base_url='unix://var/run/docker.sock')
                # Restart frontend container (don't restart backend from within itself)
                logger.info(f"[Webhook] Restarting frontend container...")
                frontend_container = client.containers.get("igny8_frontend")
                frontend_container.restart(timeout=30)
                logger.info(f"[Webhook] Frontend container restarted successfully")
                # Schedule backend restart in a background thread (non-blocking).
                # This avoids deadlock from restarting the container we're running in.
                logger.info(f"[Webhook] Scheduling backend container restart...")
                import threading
                def restart_backend():
                    import time
                    time.sleep(2)  # Give webhook time to respond
                    try:
                        backend_container = client.containers.get("igny8_backend")
                        backend_container.restart(timeout=30)
                        logger.info(f"[Webhook] Backend container restarted successfully (delayed)")
                    except Exception as e:
                        logger.error(f"[Webhook] Delayed backend restart failed: {e}")
                restart_thread = threading.Thread(target=restart_backend, daemon=True)
                restart_thread.start()
                deployment_success = True
            except ImportError:
                # Fallback to subprocess with docker command
                logger.info(f"[Webhook] Docker library not available, using subprocess...")
                # Try /usr/bin/docker or docker in PATH
                docker_cmd = "/usr/bin/docker"
                import shutil
                if not os.path.exists(docker_cmd):
                    docker_cmd = shutil.which("docker") or "docker"
                # Restart backend container.
                # NOTE(review): unlike the docker-lib path above, this restarts
                # the backend synchronously from within itself — if this process
                # runs inside igny8_backend the frontend restart below may never
                # execute. Confirm this fallback's intended environment.
                logger.info(f"[Webhook] Restarting backend container...")
                backend_result = subprocess.run(
                    [docker_cmd, "restart", "igny8_backend"],
                    capture_output=True,
                    text=True,
                    timeout=30
                )
                if backend_result.returncode != 0:
                    raise Exception(f"Backend restart failed: {backend_result.stderr}")
                logger.info(f"[Webhook] Backend container restarted successfully")
                # Restart frontend container
                logger.info(f"[Webhook] Restarting frontend container...")
                frontend_result = subprocess.run(
                    [docker_cmd, "restart", "igny8_frontend"],
                    capture_output=True,
                    text=True,
                    timeout=30
                )
                if frontend_result.returncode != 0:
                    raise Exception(f"Frontend restart failed: {frontend_result.stderr}")
                logger.info(f"[Webhook] Frontend container restarted successfully")
                deployment_success = True
            logger.info(f"[Webhook] Deployment completed: containers restarted")
        except subprocess.TimeoutExpired as e:
            deployment_error = f"Deployment timeout: {str(e)}"
            logger.error(f"[Webhook] {deployment_error}")
        except Exception as deploy_error:
            deployment_error = str(deploy_error)
            logger.error(f"[Webhook] Deployment error: {deploy_error}", exc_info=True)
        return Response({
            'status': 'success' if deployment_success else 'partial',
            'message': 'Webhook received and processed',
            'repository': repo_full_name,
            'branch': ref,
            'commits': commit_count,
            'pusher': pusher,
            'event': event_type,
            'deployment': {
                'success': deployment_success,
                'error': deployment_error
            }
        }, status=http_status.HTTP_200_OK)
    except json.JSONDecodeError as e:
        logger.error(f"[Webhook] Invalid JSON payload: {e}")
        return Response({
            'status': 'error',
            'message': 'Invalid JSON payload'
        }, status=http_status.HTTP_400_BAD_REQUEST)
    except Exception as e:
        logger.error(f"[Webhook] Error processing webhook: {e}", exc_info=True)
        return Response({
            'status': 'error',
            'message': str(e)
        }, status=http_status.HTTP_500_INTERNAL_SERVER_ERROR)