598 lines
21 KiB
Python
598 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Qwen Agent MCP Server - Full IDE capabilities
|
|
Provides file operations, terminal, code analysis, search, and more
|
|
"""
|
|
|
|
from fastapi import FastAPI, HTTPException
|
|
from fastapi.responses import StreamingResponse
|
|
from pydantic import BaseModel, Field
|
|
from typing import List, Optional, Literal, Dict, Any, AsyncGenerator
|
|
import time, uuid, json, subprocess, traceback, os, pathlib, re, asyncio
|
|
from qwen_agent.agents import Assistant
|
|
from qwen_agent.tools.base import BaseTool, register_tool
|
|
|
|
# =============================================================================
|
|
# LLM Configuration
|
|
# =============================================================================
|
|
LLM_CFG = {
|
|
"model": "zdolny/qwen3-coder58k-tools:latest",
|
|
"model_server": "http://127.0.0.1:11434/v1",
|
|
"api_key": "9cf447669f9b080bcd4ec44ed93488b1ba32319b4a34288a52d549cd2bfddec7",
|
|
}
|
|
|
|
# =============================================================================
|
|
# Tool Implementations - Full IDE Capabilities
|
|
# =============================================================================
|
|
|
|
class FileReadTool(BaseTool):
|
|
"""Read contents of a file with line range support"""
|
|
description = "Read file contents. Supports line ranges for large files."
|
|
parameters = [{
|
|
'name': 'file_path',
|
|
'type': 'string',
|
|
'description': 'Absolute path to the file',
|
|
'required': True
|
|
}, {
|
|
'name': 'start_line',
|
|
'type': 'integer',
|
|
'description': 'Starting line number (1-indexed, optional)',
|
|
'required': False
|
|
}, {
|
|
'name': 'end_line',
|
|
'type': 'integer',
|
|
'description': 'Ending line number (1-indexed, optional)',
|
|
'required': False
|
|
}]
|
|
|
|
def call(self, params: str, **kwargs) -> str:
|
|
try:
|
|
args = json.loads(params) if isinstance(params, str) else params
|
|
file_path = args['file_path']
|
|
start_line = args.get('start_line')
|
|
end_line = args.get('end_line')
|
|
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
if start_line is not None or end_line is not None:
|
|
lines = f.readlines()
|
|
start = (start_line - 1) if start_line else 0
|
|
end = end_line if end_line else len(lines)
|
|
content = ''.join(lines[start:end])
|
|
else:
|
|
content = f.read()
|
|
|
|
return f"File: {file_path}\n{'='*60}\n{content}"
|
|
except Exception as e:
|
|
return f"Error reading file: {str(e)}\n{traceback.format_exc()}"
|
|
|
|
|
|
class FileWriteTool(BaseTool):
|
|
"""Write or create a file with content"""
|
|
description = "Create or overwrite a file with new content"
|
|
parameters = [{
|
|
'name': 'file_path',
|
|
'type': 'string',
|
|
'description': 'Absolute path to the file',
|
|
'required': True
|
|
}, {
|
|
'name': 'content',
|
|
'type': 'string',
|
|
'description': 'Content to write to the file',
|
|
'required': True
|
|
}]
|
|
|
|
def call(self, params: str, **kwargs) -> str:
|
|
try:
|
|
args = json.loads(params) if isinstance(params, str) else params
|
|
file_path = args['file_path']
|
|
content = args['content']
|
|
|
|
# Create directory if it doesn't exist
|
|
os.makedirs(os.path.dirname(file_path), exist_ok=True)
|
|
|
|
with open(file_path, 'w', encoding='utf-8') as f:
|
|
f.write(content)
|
|
|
|
return f"Successfully wrote {len(content)} bytes to {file_path}"
|
|
except Exception as e:
|
|
return f"Error writing file: {str(e)}\n{traceback.format_exc()}"
|
|
|
|
|
|
class FileEditTool(BaseTool):
|
|
"""Replace string in file - for precise edits"""
|
|
description = "Replace exact string match in a file with new content"
|
|
parameters = [{
|
|
'name': 'file_path',
|
|
'type': 'string',
|
|
'description': 'Absolute path to the file',
|
|
'required': True
|
|
}, {
|
|
'name': 'old_string',
|
|
'type': 'string',
|
|
'description': 'Exact string to find and replace (include context)',
|
|
'required': True
|
|
}, {
|
|
'name': 'new_string',
|
|
'type': 'string',
|
|
'description': 'New string to replace with',
|
|
'required': True
|
|
}]
|
|
|
|
def call(self, params: str, **kwargs) -> str:
|
|
try:
|
|
args = json.loads(params) if isinstance(params, str) else params
|
|
file_path = args['file_path']
|
|
old_string = args['old_string']
|
|
new_string = args['new_string']
|
|
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
if old_string not in content:
|
|
return f"Error: old_string not found in {file_path}"
|
|
|
|
count = content.count(old_string)
|
|
if count > 1:
|
|
return f"Error: old_string appears {count} times. Be more specific."
|
|
|
|
new_content = content.replace(old_string, new_string, 1)
|
|
|
|
with open(file_path, 'w', encoding='utf-8') as f:
|
|
f.write(new_content)
|
|
|
|
return f"Successfully replaced string in {file_path}"
|
|
except Exception as e:
|
|
return f"Error editing file: {str(e)}\n{traceback.format_exc()}"
|
|
|
|
|
|
class TerminalTool(BaseTool):
|
|
"""Execute shell commands"""
|
|
description = "Run shell commands in terminal. Use for git, npm, python scripts, etc."
|
|
parameters = [{
|
|
'name': 'command',
|
|
'type': 'string',
|
|
'description': 'Shell command to execute',
|
|
'required': True
|
|
}, {
|
|
'name': 'working_dir',
|
|
'type': 'string',
|
|
'description': 'Working directory (optional)',
|
|
'required': False
|
|
}, {
|
|
'name': 'timeout',
|
|
'type': 'integer',
|
|
'description': 'Timeout in seconds (default 30)',
|
|
'required': False
|
|
}]
|
|
|
|
def call(self, params: str, **kwargs) -> str:
|
|
try:
|
|
args = json.loads(params) if isinstance(params, str) else params
|
|
command = args['command']
|
|
working_dir = args.get('working_dir', os.getcwd())
|
|
timeout = args.get('timeout', 30)
|
|
|
|
result = subprocess.run(
|
|
command,
|
|
shell=True,
|
|
cwd=working_dir,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=timeout
|
|
)
|
|
|
|
output = f"Command: {command}\n"
|
|
output += f"Exit Code: {result.returncode}\n"
|
|
output += f"{'='*60}\n"
|
|
if result.stdout:
|
|
output += f"STDOUT:\n{result.stdout}\n"
|
|
if result.stderr:
|
|
output += f"STDERR:\n{result.stderr}\n"
|
|
|
|
return output
|
|
except subprocess.TimeoutExpired:
|
|
return f"Error: Command timed out after {timeout}s"
|
|
except Exception as e:
|
|
return f"Error executing command: {str(e)}\n{traceback.format_exc()}"
|
|
|
|
|
|
class GrepSearchTool(BaseTool):
|
|
"""Search for text/regex in workspace files"""
|
|
description = "Search for text or regex patterns in files"
|
|
parameters = [{
|
|
'name': 'query',
|
|
'type': 'string',
|
|
'description': 'Text or regex pattern to search for',
|
|
'required': True
|
|
}, {
|
|
'name': 'path',
|
|
'type': 'string',
|
|
'description': 'Directory to search in (default: current)',
|
|
'required': False
|
|
}, {
|
|
'name': 'is_regex',
|
|
'type': 'boolean',
|
|
'description': 'Whether query is regex (default: false)',
|
|
'required': False
|
|
}, {
|
|
'name': 'file_pattern',
|
|
'type': 'string',
|
|
'description': 'File pattern to match (e.g., "*.py")',
|
|
'required': False
|
|
}]
|
|
|
|
def call(self, params: str, **kwargs) -> str:
|
|
try:
|
|
args = json.loads(params) if isinstance(params, str) else params
|
|
query = args['query']
|
|
search_path = args.get('path', os.getcwd())
|
|
is_regex = args.get('is_regex', False)
|
|
file_pattern = args.get('file_pattern', '*')
|
|
|
|
# Use ripgrep if available, otherwise fallback to grep
|
|
if subprocess.run(['which', 'rg'], capture_output=True).returncode == 0:
|
|
cmd = ['rg', '--line-number', '--color', 'never']
|
|
if not is_regex:
|
|
cmd.append('--fixed-strings')
|
|
if file_pattern != '*':
|
|
cmd.extend(['--glob', file_pattern])
|
|
cmd.extend([query, search_path])
|
|
else:
|
|
cmd = ['grep', '-rn']
|
|
if not is_regex:
|
|
cmd.append('-F')
|
|
if file_pattern != '*':
|
|
cmd.extend(['--include', file_pattern])
|
|
cmd.extend([query, search_path])
|
|
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
|
|
|
if result.returncode == 0:
|
|
lines = result.stdout.split('\n')[:100] # Limit to 100 results
|
|
return f"Found {len(lines)} matches:\n{chr(10).join(lines)}"
|
|
elif result.returncode == 1:
|
|
return "No matches found"
|
|
else:
|
|
return f"Search error: {result.stderr}"
|
|
except Exception as e:
|
|
return f"Error searching: {str(e)}"
|
|
|
|
|
|
class ListDirectoryTool(BaseTool):
|
|
"""List files and directories"""
|
|
description = "List contents of a directory"
|
|
parameters = [{
|
|
'name': 'path',
|
|
'type': 'string',
|
|
'description': 'Directory path to list',
|
|
'required': True
|
|
}, {
|
|
'name': 'recursive',
|
|
'type': 'boolean',
|
|
'description': 'List recursively (default: false)',
|
|
'required': False
|
|
}]
|
|
|
|
def call(self, params: str, **kwargs) -> str:
|
|
try:
|
|
args = json.loads(params) if isinstance(params, str) else params
|
|
path = args['path']
|
|
recursive = args.get('recursive', False)
|
|
|
|
if recursive:
|
|
result = []
|
|
for root, dirs, files in os.walk(path):
|
|
level = root.replace(path, '').count(os.sep)
|
|
indent = ' ' * 2 * level
|
|
result.append(f'{indent}{os.path.basename(root)}/')
|
|
subindent = ' ' * 2 * (level + 1)
|
|
for file in files:
|
|
result.append(f'{subindent}{file}')
|
|
return '\n'.join(result[:200]) # Limit output
|
|
else:
|
|
items = os.listdir(path)
|
|
result = []
|
|
for item in sorted(items):
|
|
full_path = os.path.join(path, item)
|
|
if os.path.isdir(full_path):
|
|
result.append(f"{item}/")
|
|
else:
|
|
size = os.path.getsize(full_path)
|
|
result.append(f"{item} ({size} bytes)")
|
|
return '\n'.join(result)
|
|
except Exception as e:
|
|
return f"Error listing directory: {str(e)}"
|
|
|
|
|
|
class PythonExecuteTool(BaseTool):
|
|
"""Execute Python code in isolated scope"""
|
|
description = "Execute Python code and return result. Use for calculations, data processing, etc."
|
|
parameters = [{
|
|
'name': 'code',
|
|
'type': 'string',
|
|
'description': 'Python code to execute',
|
|
'required': True
|
|
}]
|
|
|
|
def call(self, params: str, **kwargs) -> str:
|
|
try:
|
|
args = json.loads(params) if isinstance(params, str) else params
|
|
code = args['code']
|
|
|
|
# Create isolated scope
|
|
scope = {'__builtins__': __builtins__}
|
|
|
|
# Execute code
|
|
exec(code, scope)
|
|
|
|
# Try to get result
|
|
if 'result' in scope:
|
|
return str(scope['result'])
|
|
elif scope:
|
|
# Return last non-builtin value
|
|
values = [v for k, v in scope.items() if not k.startswith('_')]
|
|
if values:
|
|
return str(values[-1])
|
|
|
|
return "Code executed successfully (no return value)"
|
|
except Exception as e:
|
|
return f"Python execution error:\n{traceback.format_exc()}"
|
|
|
|
|
|
class GetErrorsTool(BaseTool):
|
|
"""Get syntax/lint errors in workspace"""
|
|
description = "Check for errors in code files (uses common linters)"
|
|
parameters = [{
|
|
'name': 'file_path',
|
|
'type': 'string',
|
|
'description': 'File to check for errors (optional, checks all if not provided)',
|
|
'required': False
|
|
}]
|
|
|
|
def call(self, params: str, **kwargs) -> str:
|
|
try:
|
|
args = json.loads(params) if isinstance(params, str) else params
|
|
file_path = args.get('file_path')
|
|
|
|
if file_path:
|
|
# Check specific file
|
|
ext = os.path.splitext(file_path)[1]
|
|
if ext == '.py':
|
|
result = subprocess.run(['python', '-m', 'py_compile', file_path],
|
|
capture_output=True, text=True)
|
|
if result.returncode != 0:
|
|
return f"Python errors in {file_path}:\n{result.stderr}"
|
|
return f"No syntax errors in {file_path}"
|
|
elif ext in ['.js', '.ts', '.jsx', '.tsx']:
|
|
# Try eslint
|
|
result = subprocess.run(['npx', 'eslint', file_path],
|
|
capture_output=True, text=True)
|
|
return result.stdout or "No errors found"
|
|
else:
|
|
return f"No linter available for {ext} files"
|
|
else:
|
|
return "Specify a file_path to check for errors"
|
|
except Exception as e:
|
|
return f"Error checking for errors: {str(e)}"
|
|
|
|
|
|
# =============================================================================
|
|
# Register All Tools
|
|
# =============================================================================
|
|
TOOLS = [
|
|
FileReadTool(),
|
|
FileWriteTool(),
|
|
FileEditTool(),
|
|
TerminalTool(),
|
|
GrepSearchTool(),
|
|
ListDirectoryTool(),
|
|
PythonExecuteTool(),
|
|
GetErrorsTool(),
|
|
]
|
|
|
|
# Create agent with all tools
|
|
agent = Assistant(llm=LLM_CFG, function_list=TOOLS)
|
|
|
|
# =============================================================================
|
|
# FastAPI Server
|
|
# =============================================================================
|
|
app = FastAPI(title="Qwen Agent MCP Server", version="1.0.0")
|
|
|
|
class ChatMessage(BaseModel):
|
|
role: Literal["system", "user", "assistant", "function"]
|
|
content: str
|
|
name: Optional[str] = None
|
|
|
|
class ChatRequest(BaseModel):
|
|
model: str
|
|
messages: List[ChatMessage]
|
|
stream: Optional[bool] = False
|
|
temperature: Optional[float] = 0.7
|
|
max_tokens: Optional[int] = None
|
|
|
|
class ChatResponse(BaseModel):
|
|
id: str
|
|
object: str = "chat.completion"
|
|
created: int
|
|
model: str
|
|
choices: List[Dict[str, Any]]
|
|
usage: Dict[str, int]
|
|
|
|
|
|
@app.get("/")
|
|
async def root():
|
|
return {
|
|
"name": "Qwen Agent MCP Server",
|
|
"version": "1.0.0",
|
|
"capabilities": {
|
|
"file_operations": ["read", "write", "edit"],
|
|
"terminal": True,
|
|
"search": True,
|
|
"code_execution": True,
|
|
"streaming": True
|
|
},
|
|
"tools": [tool.__class__.__name__ for tool in TOOLS]
|
|
}
|
|
|
|
|
|
@app.get("/v1/models")
|
|
async def list_models():
|
|
return {
|
|
"object": "list",
|
|
"data": [{
|
|
"id": LLM_CFG["model"],
|
|
"object": "model",
|
|
"created": int(time.time()),
|
|
"owned_by": "qwen-agent"
|
|
}]
|
|
}
|
|
|
|
|
|
async def stream_agent_response(messages: List[Dict], session_id: str) -> AsyncGenerator[str, None]:
|
|
"""Stream agent responses as SSE"""
|
|
try:
|
|
accumulated_content = []
|
|
|
|
for response in agent.run(messages=messages, session_id=session_id):
|
|
# Handle different response types from qwen_agent
|
|
if isinstance(response, list):
|
|
for item in response:
|
|
if isinstance(item, dict) and 'content' in item:
|
|
content = item['content']
|
|
accumulated_content.append(content)
|
|
|
|
# Stream the delta
|
|
chunk = {
|
|
"id": f"agent-{session_id}",
|
|
"object": "chat.completion.chunk",
|
|
"created": int(time.time()),
|
|
"model": LLM_CFG["model"],
|
|
"choices": [{
|
|
"index": 0,
|
|
"delta": {"content": content},
|
|
"finish_reason": None
|
|
}]
|
|
}
|
|
yield f"data: {json.dumps(chunk)}\n\n"
|
|
elif isinstance(response, dict) and 'content' in response:
|
|
content = response['content']
|
|
accumulated_content.append(content)
|
|
|
|
chunk = {
|
|
"id": f"agent-{session_id}",
|
|
"object": "chat.completion.chunk",
|
|
"created": int(time.time()),
|
|
"model": LLM_CFG["model"],
|
|
"choices": [{
|
|
"index": 0,
|
|
"delta": {"content": content},
|
|
"finish_reason": None
|
|
}]
|
|
}
|
|
yield f"data: {json.dumps(chunk)}\n\n"
|
|
|
|
# Send final chunk
|
|
final_chunk = {
|
|
"id": f"agent-{session_id}",
|
|
"object": "chat.completion.chunk",
|
|
"created": int(time.time()),
|
|
"model": LLM_CFG["model"],
|
|
"choices": [{
|
|
"index": 0,
|
|
"delta": {},
|
|
"finish_reason": "stop"
|
|
}]
|
|
}
|
|
yield f"data: {json.dumps(final_chunk)}\n\n"
|
|
yield "data: [DONE]\n\n"
|
|
|
|
except Exception as e:
|
|
error_msg = f"Error in agent stream: {str(e)}\n{traceback.format_exc()}"
|
|
print(error_msg)
|
|
error_chunk = {
|
|
"id": f"agent-{session_id}",
|
|
"object": "chat.completion.chunk",
|
|
"created": int(time.time()),
|
|
"model": LLM_CFG["model"],
|
|
"choices": [{
|
|
"index": 0,
|
|
"delta": {"content": f"\n\nError: {str(e)}"},
|
|
"finish_reason": "error"
|
|
}]
|
|
}
|
|
yield f"data: {json.dumps(error_chunk)}\n\n"
|
|
yield "data: [DONE]\n\n"
|
|
|
|
|
|
@app.post("/v1/chat/completions")
|
|
async def chat_completions(req: ChatRequest):
|
|
"""Main chat endpoint with streaming support"""
|
|
try:
|
|
session_id = str(uuid.uuid4())
|
|
messages = [m.model_dump() for m in req.messages]
|
|
|
|
# Handle streaming
|
|
if req.stream:
|
|
return StreamingResponse(
|
|
stream_agent_response(messages, session_id),
|
|
media_type="text/event-stream"
|
|
)
|
|
|
|
# Non-streaming response
|
|
final_content = []
|
|
|
|
for response in agent.run(messages=messages, session_id=session_id):
|
|
if isinstance(response, list):
|
|
for item in response:
|
|
if isinstance(item, dict) and 'content' in item:
|
|
final_content.append(item['content'])
|
|
elif isinstance(item, str):
|
|
final_content.append(item)
|
|
elif isinstance(response, dict) and 'content' in response:
|
|
final_content.append(response['content'])
|
|
elif isinstance(response, str):
|
|
final_content.append(response)
|
|
|
|
# Combine all content
|
|
combined_content = '\n'.join(str(c) for c in final_content if c)
|
|
|
|
return {
|
|
"id": f"agent-{session_id}",
|
|
"object": "chat.completion",
|
|
"created": int(time.time()),
|
|
"model": req.model,
|
|
"choices": [{
|
|
"index": 0,
|
|
"message": {
|
|
"role": "assistant",
|
|
"content": combined_content or "No response generated"
|
|
},
|
|
"finish_reason": "stop"
|
|
}],
|
|
"usage": {
|
|
"prompt_tokens": 0,
|
|
"completion_tokens": 0,
|
|
"total_tokens": 0
|
|
}
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"Error in chat completion: {str(e)}")
|
|
print(traceback.format_exc())
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
return {"status": "healthy", "agent": "ready"}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
print("🚀 Starting Qwen Agent MCP Server...")
|
|
print(f"📦 Model: {LLM_CFG['model']}")
|
|
print(f"🔧 Tools: {len(TOOLS)}")
|
|
print(f"🌐 Server: http://localhost:8000")
|
|
print(f"📚 Docs: http://localhost:8000/docs")
|
|
uvicorn.run(app, host="0.0.0.0", port=8000)
|