Files
igny8/agent58k/server.py
IGNY8 VPS (Salman) 0b3830c891 1
2025-11-29 01:48:53 +00:00

598 lines
21 KiB
Python

#!/usr/bin/env python3
"""
Qwen Agent MCP Server - Full IDE capabilities
Provides file operations, terminal, code analysis, search, and more
"""
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import List, Optional, Literal, Dict, Any, AsyncGenerator
import time, uuid, json, subprocess, traceback, os, pathlib, re, asyncio
from qwen_agent.agents import Assistant
from qwen_agent.tools.base import BaseTool, register_tool
# =============================================================================
# LLM Configuration
# =============================================================================
# Connection settings handed to the qwen_agent Assistant. Each value can be
# overridden through the environment; the defaults keep the previous behavior.
# NOTE(review): the fallback api_key is a credential committed to source
# control. It is kept only so existing deployments keep working - rotate it and
# supply the real value through QWEN_API_KEY instead.
LLM_CFG = {
    "model": os.environ.get("QWEN_MODEL", "zdolny/qwen3-coder58k-tools:latest"),
    "model_server": os.environ.get("QWEN_MODEL_SERVER", "http://127.0.0.1:11434/v1"),
    "api_key": os.environ.get(
        "QWEN_API_KEY",
        "9cf447669f9b080bcd4ec44ed93488b1ba32319b4a34288a52d549cd2bfddec7",
    ),
}
# =============================================================================
# Tool Implementations - Full IDE Capabilities
# =============================================================================
class FileReadTool(BaseTool):
    """Tool that returns the text of a file, optionally limited to a line range.

    ``call`` accepts the tool arguments as a JSON string or an already-decoded
    dict and always returns a string: the content under a short header on
    success, or an error message with a traceback on failure.
    """
    # qwen_agent's BaseTool refuses to construct a tool whose ``name`` is
    # empty, so set it explicitly to let ``FileReadTool()`` be instantiated.
    name = 'file_read'
    description = "Read file contents. Supports line ranges for large files."
    parameters = [{
        'name': 'file_path',
        'type': 'string',
        'description': 'Absolute path to the file',
        'required': True
    }, {
        'name': 'start_line',
        'type': 'integer',
        'description': 'Starting line number (1-indexed, optional)',
        'required': False
    }, {
        'name': 'end_line',
        'type': 'integer',
        'description': 'Ending line number (1-indexed, optional)',
        'required': False
    }]

    def call(self, params: str, **kwargs) -> str:
        """Read ``file_path`` (decoded as UTF-8) and return its content.

        Args:
            params: JSON string or dict with ``file_path`` and optionally
                ``start_line`` / ``end_line`` (1-indexed, inclusive). A
                ``start_line`` below 1 is treated as 1; a negative
                ``end_line`` selects nothing.
            **kwargs: Accepted for interface compatibility; unused.

        Returns:
            ``"File: <path>"``, a 60-character rule and the selected text, or
            an ``"Error reading file: ..."`` message including the traceback.
        """
        from itertools import islice

        try:
            args = json.loads(params) if isinstance(params, str) else params
            file_path = args['file_path']
            start_line = args.get('start_line')
            end_line = args.get('end_line')
            with open(file_path, 'r', encoding='utf-8') as f:
                if start_line is None and end_line is None:
                    content = f.read()
                else:
                    # Clamp so a negative bound cannot silently address lines
                    # from the end of the file.
                    start = max(start_line - 1, 0) if start_line else 0
                    stop = max(end_line, 0) if end_line else None
                    # islice stops reading once the range is satisfied instead
                    # of loading the whole file into memory first.
                    content = ''.join(islice(f, start, stop))
            return f"File: {file_path}\n{'='*60}\n{content}"
        except Exception as e:
            return f"Error reading file: {str(e)}\n{traceback.format_exc()}"
class FileWriteTool(BaseTool):
    """Tool that creates or overwrites a text file with the given content.

    ``call`` accepts the tool arguments as a JSON string or an already-decoded
    dict and always returns a string describing success or failure.
    """
    # qwen_agent's BaseTool refuses to construct a tool whose ``name`` is
    # empty, so set it explicitly to let ``FileWriteTool()`` be instantiated.
    name = 'file_write'
    description = "Create or overwrite a file with new content"
    parameters = [{
        'name': 'file_path',
        'type': 'string',
        'description': 'Absolute path to the file',
        'required': True
    }, {
        'name': 'content',
        'type': 'string',
        'description': 'Content to write to the file',
        'required': True
    }]

    def call(self, params: str, **kwargs) -> str:
        """Write ``content`` to ``file_path`` as UTF-8, creating parent directories.

        Args:
            params: JSON string or dict with ``file_path`` and ``content``.
            **kwargs: Accepted for interface compatibility; unused.

        Returns:
            A success message with the number of UTF-8 bytes written, or an
            ``"Error writing file: ..."`` message including the traceback.
        """
        try:
            args = json.loads(params) if isinstance(params, str) else params
            file_path = args['file_path']
            content = args['content']
            # A bare filename has an empty dirname, and os.makedirs('') raises
            # FileNotFoundError - only create a directory when there is one.
            parent_dir = os.path.dirname(file_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(content)
            # Report encoded size: len(content) counts characters, which
            # differs from bytes for any non-ASCII text.
            byte_count = len(content.encode('utf-8'))
            return f"Successfully wrote {byte_count} bytes to {file_path}"
        except Exception as e:
            return f"Error writing file: {str(e)}\n{traceback.format_exc()}"
class FileEditTool(BaseTool):
    """Tool that replaces one exact, unique substring of a file.

    ``call`` accepts the tool arguments as a JSON string or an already-decoded
    dict and always returns a string describing success or failure. The edit
    is refused unless ``old_string`` occurs exactly once in the file.
    """
    # qwen_agent's BaseTool refuses to construct a tool whose ``name`` is
    # empty, so set it explicitly to let ``FileEditTool()`` be instantiated.
    name = 'file_edit'
    description = "Replace exact string match in a file with new content"
    parameters = [{
        'name': 'file_path',
        'type': 'string',
        'description': 'Absolute path to the file',
        'required': True
    }, {
        'name': 'old_string',
        'type': 'string',
        'description': 'Exact string to find and replace (include context)',
        'required': True
    }, {
        'name': 'new_string',
        'type': 'string',
        'description': 'New string to replace with',
        'required': True
    }]

    def call(self, params: str, **kwargs) -> str:
        """Replace the single occurrence of ``old_string`` with ``new_string``.

        Args:
            params: JSON string or dict with ``file_path``, ``old_string`` and
                ``new_string``.
            **kwargs: Accepted for interface compatibility; unused.

        Returns:
            A success message; an ``"Error: ..."`` message when the string is
            missing or ambiguous (the file is left untouched in both cases);
            or an ``"Error editing file: ..."`` message with the traceback.
        """
        try:
            args = json.loads(params) if isinstance(params, str) else params
            file_path = args['file_path']
            old_string = args['old_string']
            new_string = args['new_string']
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            # One scan answers both "is it present?" and "is it unique?".
            count = content.count(old_string)
            if count == 0:
                return f"Error: old_string not found in {file_path}"
            if count > 1:
                return f"Error: old_string appears {count} times. Be more specific."
            new_content = content.replace(old_string, new_string, 1)
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(new_content)
            return f"Successfully replaced string in {file_path}"
        except Exception as e:
            return f"Error editing file: {str(e)}\n{traceback.format_exc()}"
class TerminalTool(BaseTool):
    """Tool that runs a shell command and reports exit code, stdout and stderr.

    ``call`` accepts the tool arguments as a JSON string or an already-decoded
    dict and always returns a string.
    """
    # qwen_agent's BaseTool refuses to construct a tool whose ``name`` is
    # empty, so set it explicitly to let ``TerminalTool()`` be instantiated.
    name = 'terminal'
    description = "Run shell commands in terminal. Use for git, npm, python scripts, etc."
    parameters = [{
        'name': 'command',
        'type': 'string',
        'description': 'Shell command to execute',
        'required': True
    }, {
        'name': 'working_dir',
        'type': 'string',
        'description': 'Working directory (optional)',
        'required': False
    }, {
        'name': 'timeout',
        'type': 'integer',
        'description': 'Timeout in seconds (default 30)',
        'required': False
    }]

    def call(self, params: str, **kwargs) -> str:
        """Run ``command`` through the shell and return a formatted report.

        Args:
            params: JSON string or dict with ``command`` and optionally
                ``working_dir`` (defaults to the process's current directory)
                and ``timeout`` in seconds (defaults to 30).
            **kwargs: Accepted for interface compatibility; unused.

        Returns:
            A report with the command, exit code and any captured
            stdout/stderr; a timeout message; or an
            ``"Error executing command: ..."`` message with the traceback.
        """
        try:
            args = json.loads(params) if isinstance(params, str) else params
            command = args['command']
            # ``or`` also covers an explicit JSON null / empty value; a null
            # timeout would otherwise let the command run with no limit.
            working_dir = args.get('working_dir') or os.getcwd()
            timeout = args.get('timeout') or 30
            # NOTE(review): shell=True executes the string exactly as given.
            # That is this tool's purpose, but the command comes from the
            # model, so the server must only be reachable by trusted clients.
            result = subprocess.run(
                command,
                shell=True,
                cwd=working_dir,
                capture_output=True,
                text=True,
                timeout=timeout
            )
            sections = [
                f"Command: {command}\n",
                f"Exit Code: {result.returncode}\n",
                f"{'='*60}\n",
            ]
            if result.stdout:
                sections.append(f"STDOUT:\n{result.stdout}\n")
            if result.stderr:
                sections.append(f"STDERR:\n{result.stderr}\n")
            return ''.join(sections)
        except subprocess.TimeoutExpired:
            return f"Error: Command timed out after {timeout}s"
        except Exception as e:
            return f"Error executing command: {str(e)}\n{traceback.format_exc()}"
class GrepSearchTool(BaseTool):
    """Tool that searches files for a literal string or regex.

    Uses ripgrep (``rg``) when it is on PATH and falls back to ``grep -rn``.
    ``call`` accepts the tool arguments as a JSON string or an already-decoded
    dict and always returns a string.
    """
    # qwen_agent's BaseTool refuses to construct a tool whose ``name`` is
    # empty, so set it explicitly to let ``GrepSearchTool()`` be instantiated.
    name = 'grep_search'
    description = "Search for text or regex patterns in files"
    parameters = [{
        'name': 'query',
        'type': 'string',
        'description': 'Text or regex pattern to search for',
        'required': True
    }, {
        'name': 'path',
        'type': 'string',
        'description': 'Directory to search in (default: current)',
        'required': False
    }, {
        'name': 'is_regex',
        'type': 'boolean',
        'description': 'Whether query is regex (default: false)',
        'required': False
    }, {
        'name': 'file_pattern',
        'type': 'string',
        'description': 'File pattern to match (e.g., "*.py")',
        'required': False
    }]

    def call(self, params: str, **kwargs) -> str:
        """Search ``path`` for ``query`` and return up to 100 matching lines.

        Args:
            params: JSON string or dict with ``query`` and optionally ``path``
                (defaults to the current directory), ``is_regex`` (defaults to
                false, i.e. a literal search) and ``file_pattern`` (a glob;
                defaults to all files).
            **kwargs: Accepted for interface compatibility; unused.

        Returns:
            ``"Found N matches:"`` followed by the lines (file, line number
            and text as printed by the search program), ``"No matches found"``,
            a ``"Search error: ..."`` message carrying the program's stderr,
            or an ``"Error searching: ..."`` message.
        """
        import shutil

        try:
            args = json.loads(params) if isinstance(params, str) else params
            query = args['query']
            search_path = args.get('path') or os.getcwd()
            is_regex = args.get('is_regex', False)
            file_pattern = args.get('file_pattern') or '*'
            # Use ripgrep if available, otherwise fall back to grep.
            if shutil.which('rg'):
                cmd = ['rg', '--line-number', '--color', 'never']
                if not is_regex:
                    cmd.append('--fixed-strings')
                if file_pattern != '*':
                    cmd.extend(['--glob', file_pattern])
            else:
                cmd = ['grep', '-rn']
                if not is_regex:
                    cmd.append('-F')
                if file_pattern != '*':
                    cmd.extend(['--include', file_pattern])
            # ``-e`` marks the query as the pattern and ``--`` ends option
            # parsing, so a query or path starting with '-' is not mistaken
            # for a command-line flag.
            cmd.extend(['-e', query, '--', search_path])
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            if result.returncode == 0:
                # splitlines() does not produce the empty trailing element that
                # split('\n') does, so the reported count is exact.
                matches = result.stdout.splitlines()[:100]  # Limit to 100 results
                listing = '\n'.join(matches)
                return f"Found {len(matches)} matches:\n{listing}"
            elif result.returncode == 1:
                # Both rg and grep exit with status 1 when nothing matched.
                return "No matches found"
            else:
                return f"Search error: {result.stderr}"
        except Exception as e:
            return f"Error searching: {str(e)}"
class ListDirectoryTool(BaseTool):
    """Tool that lists a directory, either flat or as an indented tree.

    ``call`` accepts the tool arguments as a JSON string or an already-decoded
    dict and always returns a string.
    """
    # qwen_agent's BaseTool refuses to construct a tool whose ``name`` is
    # empty, so set it explicitly to let ``ListDirectoryTool()`` be instantiated.
    name = 'list_directory'
    description = "List contents of a directory"
    parameters = [{
        'name': 'path',
        'type': 'string',
        'description': 'Directory path to list',
        'required': True
    }, {
        'name': 'recursive',
        'type': 'boolean',
        'description': 'List recursively (default: false)',
        'required': False
    }]

    def call(self, params: str, **kwargs) -> str:
        """List ``path``.

        Args:
            params: JSON string or dict with ``path`` and optionally
                ``recursive`` (defaults to false).
            **kwargs: Accepted for interface compatibility; unused.

        Returns:
            Recursive mode: an indented tree (two spaces per level,
            directories suffixed with ``/``) capped at 200 lines.
            Flat mode: sorted entries, directories as ``name/`` and files as
            ``name (<size> bytes)``.
            On failure: an ``"Error listing directory: ..."`` message.
        """
        max_lines = 200
        try:
            args = json.loads(params) if isinstance(params, str) else params
            path = args['path']
            recursive = args.get('recursive', False)
            if recursive:
                result = []
                for root, dirs, files in os.walk(path):
                    # Sorting in place also fixes the order os.walk descends in,
                    # making the output deterministic.
                    dirs.sort()
                    # relpath gives the depth reliably; stripping ``path`` with
                    # str.replace miscounts when the path has a trailing slash
                    # or its text reappears deeper in the tree.
                    rel = os.path.relpath(root, path)
                    level = 0 if rel == os.curdir else rel.count(os.sep) + 1
                    indent = ' ' * 2 * level
                    # normpath drops a trailing separator so basename is not ''.
                    dir_name = os.path.basename(os.path.normpath(root))
                    result.append(f'{indent}{dir_name}/')
                    subindent = ' ' * 2 * (level + 1)
                    result.extend(f'{subindent}{file}' for file in sorted(files))
                    if len(result) >= max_lines:
                        # Enough lines collected; skip walking the rest.
                        break
                return '\n'.join(result[:max_lines])  # Limit output
            else:
                result = []
                for item in sorted(os.listdir(path)):
                    full_path = os.path.join(path, item)
                    if os.path.isdir(full_path):
                        result.append(f"{item}/")
                        continue
                    try:
                        size = os.path.getsize(full_path)
                    except OSError:
                        # e.g. a dangling symlink: still list the entry rather
                        # than failing the whole listing.
                        result.append(f"{item} (size unavailable)")
                    else:
                        result.append(f"{item} ({size} bytes)")
                return '\n'.join(result)
        except Exception as e:
            return f"Error listing directory: {str(e)}"
class PythonExecuteTool(BaseTool):
    """Tool that executes a Python snippet in a fresh global namespace.

    ``call`` accepts the tool arguments as a JSON string or an already-decoded
    dict and always returns a string.
    """
    # qwen_agent's BaseTool refuses to construct a tool whose ``name`` is
    # empty, so set it explicitly to let ``PythonExecuteTool()`` be instantiated.
    name = 'python_execute'
    description = "Execute Python code and return result. Use for calculations, data processing, etc."
    parameters = [{
        'name': 'code',
        'type': 'string',
        'description': 'Python code to execute',
        'required': True
    }]

    def call(self, params: str, **kwargs) -> str:
        """Execute ``code`` and return a string describing its outcome.

        Args:
            params: JSON string or dict with ``code``.
            **kwargs: Accepted for interface compatibility; unused.

        Returns:
            ``str()`` of the snippet's ``result`` variable when it defines
            one; otherwise ``str()`` of the most recently added name that
            does not start with an underscore; otherwise a fixed "no return
            value" message. On an exception, ``"Python execution error:"``
            followed by the traceback.
        """
        try:
            args = json.loads(params) if isinstance(params, str) else params
            code = args['code']
            # NOTE(review): this is namespace isolation only, not a sandbox.
            # The snippet runs inside the server process with full builtins,
            # so exec() here is only safe if every client is trusted.
            scope = {'__builtins__': __builtins__}
            exec(code, scope)
            if 'result' in scope:
                return str(scope['result'])
            # Dicts keep insertion order, so the last entry is the name the
            # snippet bound most recently; '__builtins__' is filtered out by
            # the underscore test.
            values = [v for k, v in scope.items() if not k.startswith('_')]
            if values:
                return str(values[-1])
            return "Code executed successfully (no return value)"
        except Exception:
            return f"Python execution error:\n{traceback.format_exc()}"
class GetErrorsTool(BaseTool):
    """Tool that checks a single source file for syntax or lint errors.

    Python files are byte-compiled with ``py_compile``; JavaScript/TypeScript
    files are passed to ``npx eslint``. ``call`` accepts the tool arguments as
    a JSON string or an already-decoded dict and always returns a string.
    """
    # qwen_agent's BaseTool refuses to construct a tool whose ``name`` is
    # empty, so set it explicitly to let ``GetErrorsTool()`` be instantiated.
    name = 'get_errors'
    description = "Check for errors in code files (uses common linters)"
    parameters = [{
        'name': 'file_path',
        'type': 'string',
        'description': 'File to check for errors (optional, checks all if not provided)',
        'required': False
    }]

    def call(self, params: str, **kwargs) -> str:
        """Check ``file_path`` with the checker matching its extension.

        Args:
            params: JSON string or dict, optionally containing ``file_path``.
            **kwargs: Accepted for interface compatibility; unused.

        Returns:
            The checker's findings or a "no errors" message; a note that no
            linter exists for the extension; a request to supply
            ``file_path`` when it is missing; or an
            ``"Error checking for errors: ..."`` message on failure
            (including a checker that exceeds the time limit).
        """
        import sys

        check_timeout = 60  # seconds; keeps a stuck linter from hanging the agent
        try:
            args = json.loads(params) if isinstance(params, str) else params
            file_path = args.get('file_path')
            if not file_path:
                return "Specify a file_path to check for errors"
            ext = os.path.splitext(file_path)[1]
            if ext == '.py':
                # Use the interpreter running this server: a bare ``python``
                # may be missing or be a different version.
                interpreter = sys.executable or 'python'
                result = subprocess.run([interpreter, '-m', 'py_compile', file_path],
                                        capture_output=True, text=True,
                                        timeout=check_timeout)
                if result.returncode != 0:
                    return f"Python errors in {file_path}:\n{result.stderr}"
                return f"No syntax errors in {file_path}"
            if ext in ['.js', '.ts', '.jsx', '.tsx']:
                # Try eslint
                result = subprocess.run(['npx', 'eslint', file_path],
                                        capture_output=True, text=True,
                                        timeout=check_timeout)
                if result.stdout:
                    return result.stdout
                if result.returncode != 0 and result.stderr:
                    # eslint failed without printing findings (e.g. it is not
                    # installed or not configured); do not report "no errors".
                    return result.stderr
                return "No errors found"
            return f"No linter available for {ext} files"
        except Exception as e:
            return f"Error checking for errors: {str(e)}"
# =============================================================================
# Register All Tools
# =============================================================================
# One instance of every tool class defined above, in registration order.
TOOLS = [
    tool_cls()
    for tool_cls in (
        FileReadTool,
        FileWriteTool,
        FileEditTool,
        TerminalTool,
        GrepSearchTool,
        ListDirectoryTool,
        PythonExecuteTool,
        GetErrorsTool,
    )
]
# Single module-level agent: the configured LLM plus the full tool list.
agent = Assistant(function_list=TOOLS, llm=LLM_CFG)
# =============================================================================
# FastAPI Server
# =============================================================================
# ASGI application object; the route decorators in this module attach to it.
app = FastAPI(
    title="Qwen Agent MCP Server",
    version="1.0.0",
)
class ChatMessage(BaseModel):
    """A single conversation message in a chat request."""
    # Speaker of the message; validation rejects any other value.
    role: Literal["system", "user", "assistant", "function"]
    # Message text.
    content: str
    # Optional name attached to the message; None when omitted.
    name: Optional[str] = Field(default=None)
class ChatRequest(BaseModel):
    """Request body of the chat completions endpoint."""
    # Requested model id; echoed back in the non-streaming response.
    model: str
    # Conversation so far.
    messages: List[ChatMessage]
    # When true, the reply is delivered as server-sent events.
    stream: Optional[bool] = Field(default=False)
    # Sampling temperature supplied by the client.
    temperature: Optional[float] = Field(default=0.7)
    # Completion length limit supplied by the client; None when omitted.
    max_tokens: Optional[int] = Field(default=None)
class ChatResponse(BaseModel):
    """Schema of a non-streaming chat completion response."""
    # Identifier of this completion.
    id: str
    # Object type tag; defaults to "chat.completion".
    object: str = Field(default="chat.completion")
    # Creation time as an integer timestamp.
    created: int
    # Model id reported for the completion.
    model: str
    # Generated choices, each a free-form dict.
    choices: List[Dict[str, Any]]
    # Integer counters keyed by name.
    usage: Dict[str, int]
@app.get("/")
async def root():
    """Describe the server: name, version, capability flags and tool class names."""
    tool_names = [type(tool).__name__ for tool in TOOLS]
    capabilities = {
        "file_operations": ["read", "write", "edit"],
        "terminal": True,
        "search": True,
        "code_execution": True,
        "streaming": True,
    }
    return {
        "name": "Qwen Agent MCP Server",
        "version": "1.0.0",
        "capabilities": capabilities,
        "tools": tool_names,
    }
@app.get("/v1/models")
async def list_models():
    """Return a one-element model list describing the configured LLM."""
    model_entry = {
        "id": LLM_CFG["model"],
        "object": "model",
        # Stamped with the time of the request.
        "created": int(time.time()),
        "owned_by": "qwen-agent",
    }
    return {"object": "list", "data": [model_entry]}
async def stream_agent_response(messages: List[Dict], session_id: str) -> AsyncGenerator[str, None]:
    """Stream the agent's reply as ``chat.completion.chunk`` server-sent events.

    Args:
        messages: Conversation as a list of message dicts, passed to
            ``agent.run`` unchanged.
        session_id: Identifier used in every chunk id (``agent-<session_id>``)
            and forwarded to ``agent.run``.

    Yields:
        ``"data: <json>\\n\\n"`` strings: one chunk per piece of new text, a
        final chunk with ``finish_reason`` ``"stop"``, then ``"data: [DONE]"``.
        If an exception occurs it is printed and reported in-band as a chunk
        with ``finish_reason`` ``"error"``, again followed by ``[DONE]``.
    """

    def sse_chunk(delta: Dict[str, Any], finish_reason: Optional[str] = None) -> str:
        # Build one SSE line carrying a chat.completion.chunk payload.
        payload = {
            "id": f"agent-{session_id}",
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": LLM_CFG["model"],
            "choices": [{
                "index": 0,
                "delta": delta,
                "finish_reason": finish_reason
            }]
        }
        return f"data: {json.dumps(payload)}\n\n"

    def snapshot_text(response: Any) -> str:
        # Flatten one yielded response (a list of messages or a single
        # message) into the text produced so far.
        items = response if isinstance(response, list) else [response]
        parts = []
        for item in items:
            if isinstance(item, dict):
                content = item.get('content')
            elif isinstance(item, str):
                content = item
            else:
                content = None
            if content:
                parts.append(content if isinstance(content, str) else str(content))
        return '\n'.join(parts)

    try:
        sent_text = ""
        # agent.run() yields the whole response-so-far on each iteration (see
        # the streaming example in the Qwen-Agent README), not increments.
        # Forwarding each snapshot as a "delta" makes clients see the text
        # repeated, so only the part beyond what was already sent is emitted.
        # NOTE(review): this is a synchronous loop inside an async generator,
        # so other requests wait while the agent is producing output.
        for response in agent.run(messages=messages, session_id=session_id):
            current_text = snapshot_text(response)
            # Normally the previous snapshot is a prefix of the current one.
            # If the agent rewrote earlier text, resume from the point where
            # the two diverge instead of resending everything.
            shared = len(os.path.commonprefix([sent_text, current_text]))
            delta_text = current_text[shared:]
            sent_text = current_text
            if delta_text:
                yield sse_chunk({"content": delta_text})
        # Send final chunk
        yield sse_chunk({}, "stop")
        yield "data: [DONE]\n\n"
    except Exception as e:
        error_msg = f"Error in agent stream: {str(e)}\n{traceback.format_exc()}"
        print(error_msg)
        # Headers may already be sent, so the failure is reported in-band.
        yield sse_chunk({"content": f"\n\nError: {str(e)}"}, "error")
        yield "data: [DONE]\n\n"
@app.post("/v1/chat/completions")
async def chat_completions(req: ChatRequest):
    """Main chat endpoint with streaming support.

    Args:
        req: Validated request body. ``req.stream`` selects the response mode.

    Returns:
        A ``StreamingResponse`` of server-sent events when ``req.stream`` is
        true; otherwise a ``chat.completion`` dict whose single choice holds
        the agent's final text. The usage counters are always reported as 0.

    Raises:
        HTTPException: Status 500 carrying the error text when anything in
            the handler fails.
    """
    try:
        session_id = str(uuid.uuid4())
        messages = [m.model_dump() for m in req.messages]
        # Handle streaming
        if req.stream:
            return StreamingResponse(
                stream_agent_response(messages, session_id),
                media_type="text/event-stream"
            )
        # Non-streaming response.
        # agent.run() yields the whole response-so-far on each iteration (see
        # the streaming example in the Qwen-Agent README). Collecting every
        # snapshot repeats the text many times, so only the last one is kept.
        last_response = None
        for response in agent.run(messages=messages, session_id=session_id):
            last_response = response
        final_content = []
        if last_response is not None:
            items = last_response if isinstance(last_response, list) else [last_response]
            for item in items:
                if isinstance(item, dict) and 'content' in item:
                    final_content.append(item['content'])
                elif isinstance(item, str):
                    final_content.append(item)
        # Combine all non-empty content, one message per line.
        combined_content = '\n'.join(str(c) for c in final_content if c)
        return {
            "id": f"agent-{session_id}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": req.model,
            "choices": [{
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": combined_content or "No response generated"
                },
                "finish_reason": "stop"
            }],
            "usage": {
                "prompt_tokens": 0,
                "completion_tokens": 0,
                "total_tokens": 0
            }
        }
    except Exception as e:
        print(f"Error in chat completion: {str(e)}")
        print(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health():
    """Liveness probe: always answers with a fixed healthy/ready payload."""
    status_payload = {"status": "healthy", "agent": "ready"}
    return status_payload
if __name__ == "__main__":
    import uvicorn

    # Bind address and port can be overridden through the environment; the
    # defaults keep the previous behavior (all interfaces, port 8000).
    # NOTE(review): this server exposes shell and Python execution without
    # authentication. Listening on 0.0.0.0 makes that reachable from every
    # network interface - set AGENT_HOST=127.0.0.1 unless access is restricted
    # by other means (firewall, reverse proxy with auth).
    host = os.environ.get("AGENT_HOST", "0.0.0.0")
    port = int(os.environ.get("AGENT_PORT", "8000"))
    print("🚀 Starting Qwen Agent MCP Server...")
    print(f"📦 Model: {LLM_CFG['model']}")
    print(f"🔧 Tools: {len(TOOLS)}")
    print(f"🌐 Server: http://localhost:{port}")
    print(f"📚 Docs: http://localhost:{port}/docs")
    uvicorn.run(app, host=host, port=port)