#!/usr/bin/env python3 """ Qwen Agent MCP Server - Full IDE capabilities Provides file operations, terminal, code analysis, search, and more """ from fastapi import FastAPI, HTTPException from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field from typing import List, Optional, Literal, Dict, Any, AsyncGenerator import time, uuid, json, subprocess, traceback, os, pathlib, re, asyncio from qwen_agent.agents import Assistant from qwen_agent.tools.base import BaseTool, register_tool # ============================================================================= # LLM Configuration # ============================================================================= LLM_CFG = { "model": "zdolny/qwen3-coder58k-tools:latest", "model_server": "http://127.0.0.1:11434/v1", "api_key": "9cf447669f9b080bcd4ec44ed93488b1ba32319b4a34288a52d549cd2bfddec7", } # ============================================================================= # Tool Implementations - Full IDE Capabilities # ============================================================================= class FileReadTool(BaseTool): """Read contents of a file with line range support""" description = "Read file contents. Supports line ranges for large files." parameters = [{ 'name': 'file_path', 'type': 'string', 'description': 'Absolute path to the file', 'required': True }, { 'name': 'start_line', 'type': 'integer', 'description': 'Starting line number (1-indexed, optional)', 'required': False }, { 'name': 'end_line', 'type': 'integer', 'description': 'Ending line number (1-indexed, optional)', 'required': False }] def call(self, params: str, **kwargs) -> str: try: args = json.loads(params) if isinstance(params, str) else params file_path = args['file_path'] start_line = args.get('start_line') end_line = args.get('end_line') with open(file_path, 'r', encoding='utf-8') as f: if start_line is not None or end_line is not None: lines = f.readlines() start = (start_line - 1) if start_line else 0 end = end_line if end_line else len(lines) content = ''.join(lines[start:end]) else: content = f.read() return f"File: {file_path}\n{'='*60}\n{content}" except Exception as e: return f"Error reading file: {str(e)}\n{traceback.format_exc()}" class FileWriteTool(BaseTool): """Write or create a file with content""" description = "Create or overwrite a file with new content" parameters = [{ 'name': 'file_path', 'type': 'string', 'description': 'Absolute path to the file', 'required': True }, { 'name': 'content', 'type': 'string', 'description': 'Content to write to the file', 'required': True }] def call(self, params: str, **kwargs) -> str: try: args = json.loads(params) if isinstance(params, str) else params file_path = args['file_path'] content = args['content'] # Create directory if it doesn't exist os.makedirs(os.path.dirname(file_path), exist_ok=True) with open(file_path, 'w', encoding='utf-8') as f: f.write(content) return f"Successfully wrote {len(content)} bytes to {file_path}" except Exception as e: return f"Error writing file: {str(e)}\n{traceback.format_exc()}" class FileEditTool(BaseTool): """Replace string in file - for precise edits""" description = "Replace exact string match in a file with new content" parameters = [{ 'name': 'file_path', 'type': 'string', 'description': 'Absolute path to the file', 'required': True }, { 'name': 'old_string', 'type': 'string', 'description': 'Exact string to find and replace (include context)', 'required': True }, { 'name': 'new_string', 'type': 'string', 'description': 'New string to replace with', 'required': True }] def call(self, params: str, **kwargs) -> str: try: args = json.loads(params) if isinstance(params, str) else params file_path = args['file_path'] old_string = args['old_string'] new_string = args['new_string'] with open(file_path, 'r', encoding='utf-8') as f: content = f.read() if old_string not in content: return f"Error: old_string not found in {file_path}" count = content.count(old_string) if count > 1: return f"Error: old_string appears {count} times. Be more specific." new_content = content.replace(old_string, new_string, 1) with open(file_path, 'w', encoding='utf-8') as f: f.write(new_content) return f"Successfully replaced string in {file_path}" except Exception as e: return f"Error editing file: {str(e)}\n{traceback.format_exc()}" class TerminalTool(BaseTool): """Execute shell commands""" description = "Run shell commands in terminal. Use for git, npm, python scripts, etc." parameters = [{ 'name': 'command', 'type': 'string', 'description': 'Shell command to execute', 'required': True }, { 'name': 'working_dir', 'type': 'string', 'description': 'Working directory (optional)', 'required': False }, { 'name': 'timeout', 'type': 'integer', 'description': 'Timeout in seconds (default 30)', 'required': False }] def call(self, params: str, **kwargs) -> str: try: args = json.loads(params) if isinstance(params, str) else params command = args['command'] working_dir = args.get('working_dir', os.getcwd()) timeout = args.get('timeout', 30) result = subprocess.run( command, shell=True, cwd=working_dir, capture_output=True, text=True, timeout=timeout ) output = f"Command: {command}\n" output += f"Exit Code: {result.returncode}\n" output += f"{'='*60}\n" if result.stdout: output += f"STDOUT:\n{result.stdout}\n" if result.stderr: output += f"STDERR:\n{result.stderr}\n" return output except subprocess.TimeoutExpired: return f"Error: Command timed out after {timeout}s" except Exception as e: return f"Error executing command: {str(e)}\n{traceback.format_exc()}" class GrepSearchTool(BaseTool): """Search for text/regex in workspace files""" description = "Search for text or regex patterns in files" parameters = [{ 'name': 'query', 'type': 'string', 'description': 'Text or regex pattern to search for', 'required': True }, { 'name': 'path', 'type': 'string', 'description': 'Directory to search in (default: current)', 'required': False }, { 'name': 'is_regex', 'type': 'boolean', 'description': 'Whether query is regex (default: false)', 'required': False }, { 'name': 'file_pattern', 'type': 'string', 'description': 'File pattern to match (e.g., "*.py")', 'required': False }] def call(self, params: str, **kwargs) -> str: try: args = json.loads(params) if isinstance(params, str) else params query = args['query'] search_path = args.get('path', os.getcwd()) is_regex = args.get('is_regex', False) file_pattern = args.get('file_pattern', '*') # Use ripgrep if available, otherwise fallback to grep if subprocess.run(['which', 'rg'], capture_output=True).returncode == 0: cmd = ['rg', '--line-number', '--color', 'never'] if not is_regex: cmd.append('--fixed-strings') if file_pattern != '*': cmd.extend(['--glob', file_pattern]) cmd.extend([query, search_path]) else: cmd = ['grep', '-rn'] if not is_regex: cmd.append('-F') if file_pattern != '*': cmd.extend(['--include', file_pattern]) cmd.extend([query, search_path]) result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) if result.returncode == 0: lines = result.stdout.split('\n')[:100] # Limit to 100 results return f"Found {len(lines)} matches:\n{chr(10).join(lines)}" elif result.returncode == 1: return "No matches found" else: return f"Search error: {result.stderr}" except Exception as e: return f"Error searching: {str(e)}" class ListDirectoryTool(BaseTool): """List files and directories""" description = "List contents of a directory" parameters = [{ 'name': 'path', 'type': 'string', 'description': 'Directory path to list', 'required': True }, { 'name': 'recursive', 'type': 'boolean', 'description': 'List recursively (default: false)', 'required': False }] def call(self, params: str, **kwargs) -> str: try: args = json.loads(params) if isinstance(params, str) else params path = args['path'] recursive = args.get('recursive', False) if recursive: result = [] for root, dirs, files in os.walk(path): level = root.replace(path, '').count(os.sep) indent = ' ' * 2 * level result.append(f'{indent}{os.path.basename(root)}/') subindent = ' ' * 2 * (level + 1) for file in files: result.append(f'{subindent}{file}') return '\n'.join(result[:200]) # Limit output else: items = os.listdir(path) result = [] for item in sorted(items): full_path = os.path.join(path, item) if os.path.isdir(full_path): result.append(f"{item}/") else: size = os.path.getsize(full_path) result.append(f"{item} ({size} bytes)") return '\n'.join(result) except Exception as e: return f"Error listing directory: {str(e)}" class PythonExecuteTool(BaseTool): """Execute Python code in isolated scope""" description = "Execute Python code and return result. Use for calculations, data processing, etc." parameters = [{ 'name': 'code', 'type': 'string', 'description': 'Python code to execute', 'required': True }] def call(self, params: str, **kwargs) -> str: try: args = json.loads(params) if isinstance(params, str) else params code = args['code'] # Create isolated scope scope = {'__builtins__': __builtins__} # Execute code exec(code, scope) # Try to get result if 'result' in scope: return str(scope['result']) elif scope: # Return last non-builtin value values = [v for k, v in scope.items() if not k.startswith('_')] if values: return str(values[-1]) return "Code executed successfully (no return value)" except Exception as e: return f"Python execution error:\n{traceback.format_exc()}" class GetErrorsTool(BaseTool): """Get syntax/lint errors in workspace""" description = "Check for errors in code files (uses common linters)" parameters = [{ 'name': 'file_path', 'type': 'string', 'description': 'File to check for errors (optional, checks all if not provided)', 'required': False }] def call(self, params: str, **kwargs) -> str: try: args = json.loads(params) if isinstance(params, str) else params file_path = args.get('file_path') if file_path: # Check specific file ext = os.path.splitext(file_path)[1] if ext == '.py': result = subprocess.run(['python', '-m', 'py_compile', file_path], capture_output=True, text=True) if result.returncode != 0: return f"Python errors in {file_path}:\n{result.stderr}" return f"No syntax errors in {file_path}" elif ext in ['.js', '.ts', '.jsx', '.tsx']: # Try eslint result = subprocess.run(['npx', 'eslint', file_path], capture_output=True, text=True) return result.stdout or "No errors found" else: return f"No linter available for {ext} files" else: return "Specify a file_path to check for errors" except Exception as e: return f"Error checking for errors: {str(e)}" # ============================================================================= # Register All Tools # ============================================================================= TOOLS = [ FileReadTool(), FileWriteTool(), FileEditTool(), TerminalTool(), GrepSearchTool(), ListDirectoryTool(), PythonExecuteTool(), GetErrorsTool(), ] # Create agent with all tools agent = Assistant(llm=LLM_CFG, function_list=TOOLS) # ============================================================================= # FastAPI Server # ============================================================================= app = FastAPI(title="Qwen Agent MCP Server", version="1.0.0") class ChatMessage(BaseModel): role: Literal["system", "user", "assistant", "function"] content: str name: Optional[str] = None class ChatRequest(BaseModel): model: str messages: List[ChatMessage] stream: Optional[bool] = False temperature: Optional[float] = 0.7 max_tokens: Optional[int] = None class ChatResponse(BaseModel): id: str object: str = "chat.completion" created: int model: str choices: List[Dict[str, Any]] usage: Dict[str, int] @app.get("/") async def root(): return { "name": "Qwen Agent MCP Server", "version": "1.0.0", "capabilities": { "file_operations": ["read", "write", "edit"], "terminal": True, "search": True, "code_execution": True, "streaming": True }, "tools": [tool.__class__.__name__ for tool in TOOLS] } @app.get("/v1/models") async def list_models(): return { "object": "list", "data": [{ "id": LLM_CFG["model"], "object": "model", "created": int(time.time()), "owned_by": "qwen-agent" }] } async def stream_agent_response(messages: List[Dict], session_id: str) -> AsyncGenerator[str, None]: """Stream agent responses as SSE""" try: accumulated_content = [] for response in agent.run(messages=messages, session_id=session_id): # Handle different response types from qwen_agent if isinstance(response, list): for item in response: if isinstance(item, dict) and 'content' in item: content = item['content'] accumulated_content.append(content) # Stream the delta chunk = { "id": f"agent-{session_id}", "object": "chat.completion.chunk", "created": int(time.time()), "model": LLM_CFG["model"], "choices": [{ "index": 0, "delta": {"content": content}, "finish_reason": None }] } yield f"data: {json.dumps(chunk)}\n\n" elif isinstance(response, dict) and 'content' in response: content = response['content'] accumulated_content.append(content) chunk = { "id": f"agent-{session_id}", "object": "chat.completion.chunk", "created": int(time.time()), "model": LLM_CFG["model"], "choices": [{ "index": 0, "delta": {"content": content}, "finish_reason": None }] } yield f"data: {json.dumps(chunk)}\n\n" # Send final chunk final_chunk = { "id": f"agent-{session_id}", "object": "chat.completion.chunk", "created": int(time.time()), "model": LLM_CFG["model"], "choices": [{ "index": 0, "delta": {}, "finish_reason": "stop" }] } yield f"data: {json.dumps(final_chunk)}\n\n" yield "data: [DONE]\n\n" except Exception as e: error_msg = f"Error in agent stream: {str(e)}\n{traceback.format_exc()}" print(error_msg) error_chunk = { "id": f"agent-{session_id}", "object": "chat.completion.chunk", "created": int(time.time()), "model": LLM_CFG["model"], "choices": [{ "index": 0, "delta": {"content": f"\n\nError: {str(e)}"}, "finish_reason": "error" }] } yield f"data: {json.dumps(error_chunk)}\n\n" yield "data: [DONE]\n\n" @app.post("/v1/chat/completions") async def chat_completions(req: ChatRequest): """Main chat endpoint with streaming support""" try: session_id = str(uuid.uuid4()) messages = [m.model_dump() for m in req.messages] # Handle streaming if req.stream: return StreamingResponse( stream_agent_response(messages, session_id), media_type="text/event-stream" ) # Non-streaming response final_content = [] for response in agent.run(messages=messages, session_id=session_id): if isinstance(response, list): for item in response: if isinstance(item, dict) and 'content' in item: final_content.append(item['content']) elif isinstance(item, str): final_content.append(item) elif isinstance(response, dict) and 'content' in response: final_content.append(response['content']) elif isinstance(response, str): final_content.append(response) # Combine all content combined_content = '\n'.join(str(c) for c in final_content if c) return { "id": f"agent-{session_id}", "object": "chat.completion", "created": int(time.time()), "model": req.model, "choices": [{ "index": 0, "message": { "role": "assistant", "content": combined_content or "No response generated" }, "finish_reason": "stop" }], "usage": { "prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0 } } except Exception as e: print(f"Error in chat completion: {str(e)}") print(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) @app.get("/health") async def health(): return {"status": "healthy", "agent": "ready"} if __name__ == "__main__": import uvicorn print("🚀 Starting Qwen Agent MCP Server...") print(f"📦 Model: {LLM_CFG['model']}") print(f"🔧 Tools: {len(TOOLS)}") print(f"🌐 Server: http://localhost:8000") print(f"📚 Docs: http://localhost:8000/docs") uvicorn.run(app, host="0.0.0.0", port=8000)