from typing import List import subprocess def _build_template(graph:str, pipeline_id:str, prompt_set:str, tool_keys:List[str], port:str, entry_pnt:str="fastapi_server/server_dashscope.py", llm_name:str="qwen-plus"): cmd = [ "python", entry_pnt, "--llm-name", llm_name, "--port", str(port), graph, "--pipeline-id", pipeline_id, "--prompt-set-id", prompt_set, ] if tool_keys: cmd.extend( ["--tool-manager-config.client-tool-manager.tool-keys", *tool_keys] ) sv_prc = subprocess.Popen(cmd) return sv_prc, f"http://0.0.0.0:{port}" def build_route(pipeline_id:str, prompt_set:str, tool_keys:List[str], port:str, entry_pnt:str="fastapi_server/server_dashscope.py", llm_name:str="qwen-plus"): return _build_template("route", pipeline_id, prompt_set, tool_keys, port, entry_pnt, llm_name) def build_react(pipeline_id:str, prompt_set:str, tool_keys:List[str], port:str, entry_pnt:str="fastapi_server/server_dashscope.py", llm_name:str="qwen-plus"): return _build_template("react", pipeline_id, prompt_set, tool_keys, port, entry_pnt, llm_name) # {pipeline_id: build_function} GRAPH_BUILD_FNCS = { "routing": build_route, "react": build_react, }