From 938e27c75e09fef07592345cb76d7c0fb70414b9 Mon Sep 17 00:00:00 2001 From: goulustis Date: Mon, 10 Nov 2025 13:18:37 +0800 Subject: [PATCH] remove unused --- lang_agent/mcp_server.py | 76 -------- scripts/run_agent_server.py | 16 -- scripts/start_mcp_server.py | 20 -- scripts/ws_start_all_mcps.py | 84 --------- scripts/ws_start_register_tools.py | 288 ----------------------------- 5 files changed, 484 deletions(-) delete mode 100644 lang_agent/mcp_server.py delete mode 100644 scripts/run_agent_server.py delete mode 100644 scripts/start_mcp_server.py delete mode 100644 scripts/ws_start_all_mcps.py delete mode 100644 scripts/ws_start_register_tools.py diff --git a/lang_agent/mcp_server.py b/lang_agent/mcp_server.py deleted file mode 100644 index 619fd1f..0000000 --- a/lang_agent/mcp_server.py +++ /dev/null @@ -1,76 +0,0 @@ -# https://gofastmcp.com/patterns/decorating-methods -from dataclasses import dataclass, field, is_dataclass -from typing import Type, Literal -import tyro -from fastmcp import FastMCP -from fastapi.middleware.cors import CORSMiddleware -from fastmcp.tools.tool import FunctionTool -from loguru import logger - -from lang_agent.rag.simple import SimpleRagConfig -from lang_agent.base import LangToolBase -from lang_agent.config import InstantiateConfig, ToolConfig -from lang_agent.dummy.calculator import Calculator, CalculatorConfig -from lang_agent.tool_manager import ToolManager, ToolManagerConfig - -# from catering_end.lang_tool import CartToolConfig, CartTool - -@tyro.conf.configure(tyro.conf.SuppressFixed) -@dataclass -class MCPServerConfig(InstantiateConfig): - _target: Type = field(default_factory=lambda: MCPServer) - - server_name:str = "langserver" - - host: str = "127.0.0.1" - """host of server""" - - port: int = 50051 - """port""" - - transport:Literal["stdio", "sse", "streamable-http"] = "streamable-http" - """transport method""" - - toolmanager_config: ToolManagerConfig = field(default_factory=ToolManagerConfig) - - -class MCPServer: - def __init__(self, config: MCPServerConfig): - self.config = config - self.mcp = FastMCP(self.config.server_name) - - self.populate_modules() - self.register_mcp_functions() - - def populate_modules(self): - self.tool_manager:ToolManager = self.config.toolmanager_config.setup() - - def register_mcp_functions(self): - - fncs = self.tool_manager.get_tool_fncs() - for fnc in fncs: - self.mcp.tool(fnc) - - - def run(self): - # 获取FastAPI应用实例 - app = self.mcp.http_app() - - # 配置CORS - app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], - ) - - self.mcp.run(transport=self.config.transport, - host=self.config.host, - port=self.config.port) - -if __name__ == "__main__": - conf:MCPServer = MCPServerConfig().setup() - tool_conf = conf._get_tool_config() - for e in tool_conf: - print(e) \ No newline at end of file diff --git a/scripts/run_agent_server.py b/scripts/run_agent_server.py deleted file mode 100644 index b8c84a0..0000000 --- a/scripts/run_agent_server.py +++ /dev/null @@ -1,16 +0,0 @@ -import tyro -import asyncio - -from lang_agent.pipeline import Pipeline, PipelineConfig -from lang_agent.config import load_tyro_conf - -def main(conf:PipelineConfig): - if conf.config_f is not None: - conf = load_tyro_conf(conf.config_f) - - pipeline:Pipeline = conf.setup() - asyncio.run(pipeline.start_server()) - - -if __name__ == "__main__": - main(tyro.cli(PipelineConfig)) \ No newline at end of file diff --git a/scripts/start_mcp_server.py b/scripts/start_mcp_server.py deleted file mode 100644 index ee83003..0000000 --- a/scripts/start_mcp_server.py +++ /dev/null @@ -1,20 +0,0 @@ -import tyro - -from lang_agent.mcp_server import MCPServerConfig, MCPServer - -# ### NOTE: some sanity check -# async def main(conf:MCPServerConfig): -# server: MCPServer = conf.setup() -# u = await server.mcp._mcp_call_tool("retrieve", {"query":"test"}) -# print(u) - -# import asyncio -# asyncio.run(main(tyro.cli(MCPServerConfig))) - - -def main(conf:MCPServerConfig): - server: MCPServer = conf.setup() - server.run() - -if __name__ == "__main__": - main(tyro.cli(MCPServerConfig)) \ No newline at end of file diff --git a/scripts/ws_start_all_mcps.py b/scripts/ws_start_all_mcps.py deleted file mode 100644 index 4e16a4c..0000000 --- a/scripts/ws_start_all_mcps.py +++ /dev/null @@ -1,84 +0,0 @@ -import json -import subprocess -import asyncio -import aiohttp -from loguru import logger - -async def start_mcps_simple(config): - """Simple function to start MCPs from config dictionary""" - processes = [] - - for name, mcp_config in config.items(): - transport = mcp_config.get("transport", "").lower() - - if transport == "stdio": - # Start stdio-based MCP - try: - command = mcp_config.get("command") - args = mcp_config.get("args", []) - cmd = [command] + args - - logger.info(f"Starting {name} with command: {' '.join(cmd)}") - process = subprocess.Popen( - cmd, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True - ) - processes.append((name, process)) - logger.info(f"Started {name} (PID: {process.pid})") - - except Exception as e: - logger.error(f"Failed to start {name}: {e}") - - elif transport == "streamable_http": - # Check HTTP-based MCP - try: - url = mcp_config.get("url") - async with aiohttp.ClientSession() as session: - async with session.get(url, timeout=5) as response: - if response.status == 200: - logger.info(f"HTTP MCP {name} is accessible at {url}") - else: - logger.warning(f"HTTP MCP {name} returned status {response.status}") - except Exception as e: - logger.error(f"Failed to connect to {name}: {e}") - - return processes - -# Usage example -async def main_simple(config=None): - if config is None: - config = { - "calculator": { - "transport": "stdio", - "command": "python", - "args": ["lang_agent/calculator.py"], - } - } - - processes = await start_mcps_simple(config) - - print(f"Started {len(processes)} processes") - - # Keep running - try: - while True: - await asyncio.sleep(1) - except KeyboardInterrupt: - print("Stopping processes...") - for name, process in processes: - process.terminate() - logger.info(f"Stopped {name}") - - -def run_all_mcps(mcp_config_f="configs/mcp_config.json"): - with open(mcp_config_f, "r") as f: - config = json.load(f) - - asyncio.run(main_simple(config)) - -if __name__ == "__main__": - run_all_mcps() - # asyncio.run(main_simple()) diff --git a/scripts/ws_start_register_tools.py b/scripts/ws_start_register_tools.py deleted file mode 100644 index 41c44f7..0000000 --- a/scripts/ws_start_register_tools.py +++ /dev/null @@ -1,288 +0,0 @@ -""" -Simple MCP stdio <-> WebSocket pipe with optional unified config. -Version: 0.2.0 - -Usage (env): - export MCP_ENDPOINT= - # Windows (PowerShell): $env:MCP_ENDPOINT = "" - -Start server process(es) from config: -Run all configured servers (default) - python mcp_pipe.py - -Run a single local server script (back-compat) - python mcp_pipe.py path/to/server.py - -Config discovery order: - $MCP_CONFIG, then ./mcp_config.json - -Env overrides: - (none for proxy; uses current Python: python -m mcp_proxy) -""" - -import asyncio -import websockets -import subprocess -import logging -import os -import signal -import sys -import json -from dotenv import load_dotenv - -from lang_agent.config import mcp_langchain_to_ws_config - -# Auto-load environment variables from a .env file if present -load_dotenv() - -# Configure logging -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' -) -logger = logging.getLogger('MCP_PIPE') - -# from loguru import logger -# Reconnection settings -INITIAL_BACKOFF = 1 # Initial wait time in seconds -MAX_BACKOFF = 600 # Maximum wait time in seconds - -async def connect_with_retry(uri, target): - """Connect to WebSocket server with retry mechanism for a given server target.""" - reconnect_attempt = 0 - backoff = INITIAL_BACKOFF - while True: # Infinite reconnection - try: - if reconnect_attempt > 0: - logger.info(f"[{target}] Waiting {backoff}s before reconnection attempt {reconnect_attempt}...") - await asyncio.sleep(backoff) - - # Attempt to connect - await connect_to_server(uri, target) - - except Exception as e: - reconnect_attempt += 1 - logger.warning(f"[{target}] Connection closed (attempt {reconnect_attempt}): {e}") - # Calculate wait time for next reconnection (exponential backoff) - backoff = min(backoff * 2, MAX_BACKOFF) - -async def connect_to_server(uri, target): - """Connect to WebSocket server and pipe stdio for the given server target.""" - try: - logger.info(f"[{target}] Connecting to WebSocket server...") - async with websockets.connect(uri) as websocket: - logger.info(f"[{target}] Successfully connected to WebSocket server") - - # Start server process (built from CLI arg or config) - cmd, env = build_server_command(target) - process = subprocess.Popen( - cmd, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - encoding='utf-8', - text=True, - env=env - ) - logger.info(f"[{target}] Started server process: {' '.join(cmd)}") - - # Create two tasks: read from WebSocket and write to process, read from process and write to WebSocket - await asyncio.gather( - pipe_websocket_to_process(websocket, process, target), - pipe_process_to_websocket(process, websocket, target), - pipe_process_stderr_to_terminal(process, target) - ) - except websockets.exceptions.ConnectionClosed as e: - logger.error(f"[{target}] WebSocket connection closed: {e}") - raise # Re-throw exception to trigger reconnection - except Exception as e: - logger.error(f"[{target}] Connection error: {e}") - raise # Re-throw exception - finally: - # Ensure the child process is properly terminated - if 'process' in locals(): - logger.info(f"[{target}] Terminating server process") - try: - process.terminate() - process.wait(timeout=5) - except subprocess.TimeoutExpired: - process.kill() - logger.info(f"[{target}] Server process terminated") - -async def pipe_websocket_to_process(websocket, process, target): - """Read data from WebSocket and write to process stdin""" - try: - while True: - # Read message from WebSocket - message = await websocket.recv() - logger.debug(f"[{target}] << {message[:120]}...") - - # Write to process stdin (in text mode) - if isinstance(message, bytes): - message = message.decode('utf-8') - process.stdin.write(message + '\n') - process.stdin.flush() - except Exception as e: - logger.error(f"[{target}] Error in WebSocket to process pipe: {e}") - raise # Re-throw exception to trigger reconnection - finally: - # Close process stdin - if not process.stdin.closed: - process.stdin.close() - -async def pipe_process_to_websocket(process, websocket, target): - """Read data from process stdout and send to WebSocket""" - try: - while True: - # Read data from process stdout - data = await asyncio.to_thread(process.stdout.readline) - - if not data: # If no data, the process may have ended - logger.info(f"[{target}] Process has ended output") - break - - # Send data to WebSocket - logger.debug(f"[{target}] >> {data[:120]}...") - # In text mode, data is already a string, no need to decode - await websocket.send(data) - except Exception as e: - logger.error(f"[{target}] Error in process to WebSocket pipe: {e}") - raise # Re-throw exception to trigger reconnection - -async def pipe_process_stderr_to_terminal(process, target): - """Read data from process stderr and print to terminal""" - try: - while True: - # Read data from process stderr - data = await asyncio.to_thread(process.stderr.readline) - - if not data: # If no data, the process may have ended - logger.info(f"[{target}] Process has ended stderr output") - break - - # Print stderr data to terminal (in text mode, data is already a string) - sys.stderr.write(data) - sys.stderr.flush() - except Exception as e: - logger.error(f"[{target}] Error in process stderr pipe: {e}") - raise # Re-throw exception to trigger reconnection - -def signal_handler(sig, frame): - """Handle interrupt signals""" - logger.info("Received interrupt signal, shutting down...") - sys.exit(0) - -def load_config(): - """Load JSON config from $MCP_CONFIG or ./mcp_config.json. Return dict or {}.""" - # path = os.environ.get("MCP_CONFIG") or os.path.join(os.getcwd(), "mcp_config.json") - # path = "configs/mcp_config.json" - path = "configs/ws_mcp_config.json" - if not os.path.exists(path): - return {} - try: - with open(path, "r", encoding="utf-8") as f: - # return mcp_langchain_to_ws_config(json.load(f)) - return json.load(f) - except Exception as e: - logger.warning(f"Failed to load config {path}: {e}") - return {} - - -def build_server_command(target=None): - """Build [cmd,...] and env for the server process for a given target. - - Priority: - - If target matches a server in config.mcpServers: use its definition - - Else: treat target as a Python script path (back-compat) - If target is None, read from sys.argv[1]. - """ - if target is None: - assert len(sys.argv) >= 2, "missing server name or script path" - target = sys.argv[1] - cfg = load_config() - servers = cfg.get("mcpServers", {}) if isinstance(cfg, dict) else {} - - if target in servers: - entry = servers[target] or {} - if entry.get("disabled"): - raise RuntimeError(f"Server '{target}' is disabled in config") - typ = (entry.get("type") or entry.get("transportType") or "stdio").lower() - - # environment for child process - child_env = os.environ.copy() - for k, v in (entry.get("env") or {}).items(): - child_env[str(k)] = str(v) - - if typ == "stdio": - command = entry.get("command") - args = entry.get("args") or [] - if not command: - raise RuntimeError(f"Server '{target}' is missing 'command'") - return [command, *args], child_env - - if typ in ("sse", "http", "streamablehttp"): - url = entry.get("url") - if not url: - raise RuntimeError(f"Server '{target}' (type {typ}) is missing 'url'") - # Unified approach: always use current Python to run mcp-proxy module - cmd = [sys.executable, "-m", "mcp_proxy"] - if typ in ("http", "streamablehttp"): - cmd += ["--transport", "streamablehttp"] - # optional headers: {"Authorization": "Bearer xxx"} - headers = entry.get("headers") or {} - for hk, hv in headers.items(): - cmd += ["-H", hk, str(hv)] - cmd.append(url) - return cmd, child_env - - raise RuntimeError(f"Unsupported server type: {typ}") - - # Fallback to script path (back-compat) - script_path = target - if not os.path.exists(script_path): - raise RuntimeError( - f"'{target}' is neither a configured server nor an existing script" - ) - return [sys.executable, script_path], os.environ.copy() - -if __name__ == "__main__": - # Register signal handler - signal.signal(signal.SIGINT, signal_handler) - - # Get token from environment variable or command line arguments - endpoint_url = os.environ.get('MCP_ENDPOINT') - if not endpoint_url: - logger.error("Please set the `MCP_ENDPOINT` environment variable") - sys.exit(1) - - # Determine target: default to all if no arg; single target otherwise - target_arg = sys.argv[1] if len(sys.argv) >= 2 else None - - async def _main(): - if not target_arg: - cfg = load_config() - servers_cfg = (cfg.get("mcpServers") or {}) - all_servers = list(servers_cfg.keys()) - enabled = [name for name, entry in servers_cfg.items() if not (entry or {}).get("disabled")] - skipped = [name for name in all_servers if name not in enabled] - if skipped: - logger.info(f"Skipping disabled servers: {', '.join(skipped)}") - if not enabled: - raise RuntimeError("No enabled mcpServers found in config") - logger.info(f"Starting servers: {', '.join(enabled)}") - tasks = [asyncio.create_task(connect_with_retry(endpoint_url, t)) for t in enabled] - # Run all forever; if any crashes it will auto-retry inside - await asyncio.gather(*tasks) - else: - if os.path.exists(target_arg): - await connect_with_retry(endpoint_url, target_arg) - else: - logger.error("Argument must be a local Python script path. To run configured servers, run without arguments.") - sys.exit(1) - - try: - asyncio.run(_main()) - except KeyboardInterrupt: - logger.info("Program interrupted by user") - except Exception as e: - logger.error(f"Program execution error: {e}")