diff --git a/fastapi_server/front_apis.py b/fastapi_server/front_apis.py
index d72cdaf..f343be2 100644
--- a/fastapi_server/front_apis.py
+++ b/fastapi_server/front_apis.py
@@ -1,11 +1,9 @@
-from typing import Dict, List, Optional
+from typing import Dict, List, Optional, Any
 import commentjson
 import os
 import os.path as osp
-import secrets
-import subprocess
 import sys
-import uuid
+import json
 
 from fastapi import FastAPI, HTTPException
 from fastapi.middleware.cors import CORSMiddleware
@@ -15,11 +13,12 @@ from pydantic import BaseModel, Field
 sys.path.append(osp.dirname(osp.dirname(osp.abspath(__file__))))
 
 from lang_agent.config.db_config_manager import DBConfigManager
-from lang_agent.front_api.build_server_utils import GRAPH_BUILD_FNCS
+from lang_agent.front_api.build_server_utils import GRAPH_BUILD_FNCS, update_pipeline_registry
 
 _PROJECT_ROOT = osp.dirname(osp.dirname(osp.abspath(__file__)))
 _MCP_CONFIG_PATH = osp.join(_PROJECT_ROOT, "configs", "mcp_config.json")
 _MCP_CONFIG_DEFAULT_CONTENT = "{\n}\n"
+_PIPELINE_REGISTRY_PATH = osp.join(_PROJECT_ROOT, "configs", "pipeline_registry.json")
 
 class GraphConfigUpsertRequest(BaseModel):
     graph_id: str
@@ -72,6 +71,9 @@ class PipelineCreateRequest(BaseModel):
     api_key: str
     entry_point: str = Field(default="fastapi_server/server_dashscope.py")
     llm_name: str = Field(default="qwen-plus")
+    route_id: Optional[str] = Field(default=None)
+    enabled: bool = Field(default=True)
+    prompt_pipeline_id: Optional[str] = Field(default=None)
 
 class PipelineCreateResponse(BaseModel):
     run_id: str
@@ -85,6 +87,11 @@ class PipelineCreateResponse(BaseModel):
     auth_header_name: str
     auth_key_once: str
     auth_key_masked: str
+    route_id: str
+    enabled: bool
+    config_file: str
+    reload_required: bool
+    registry_path: str
 
 class PipelineRunInfo(BaseModel):
     run_id: str
@@ -97,6 +104,9 @@ class PipelineRunInfo(BaseModel):
     auth_type: str
     auth_header_name: str
     auth_key_masked: str
+    route_id: str
+    enabled: bool
+    config_file: Optional[str] = Field(default=None)
 
 class PipelineListResponse(BaseModel):
     items: List[PipelineRunInfo]
@@ -105,6 +115,9 @@ class PipelineListResponse(BaseModel):
 class PipelineStopResponse(BaseModel):
     run_id: str
     status: str
+    route_id: str
+    enabled: bool
+    reload_required: bool
 
 class McpConfigReadResponse(BaseModel):
     path: str
@@ -134,28 +147,7 @@ app.add_middleware(
 )
 
 _db = DBConfigManager()
-_running_pipelines: Dict[str, Dict[str, object]] = {}
-
-
-def _generate_auth_key() -> str:
-    return f"agk_{secrets.token_urlsafe(24)}"
-
-
-def _mask_auth_key(value: str) -> str:
-    if not value:
-        return ""
-    if len(value) <= 10:
-        return value
-    return f"{value[:5]}...{value[-5:]}"
-
-def _prune_stopped_pipelines() -> None:
-    stale_ids: List[str] = []
-    for run_id, info in _running_pipelines.items():
-        proc = info["proc"]
-        if proc.poll() is not None:
-            stale_ids.append(run_id)
-    for run_id in stale_ids:
-        _running_pipelines.pop(run_id, None)
+_DASHSCOPE_URL = os.environ.get("FAST_DASHSCOPE_URL", "http://127.0.0.1:8588")
 
 
 @app.get("/health")
@@ -175,9 +167,9 @@ async def root():
             "/v1/graph-configs/{pipeline_id}/{prompt_set_id} (GET)",
             "/v1/graph-configs/{pipeline_id}/{prompt_set_id} (DELETE)",
             "/v1/pipelines/graphs (GET)",
-            "/v1/pipelines (POST)",
-            "/v1/pipelines (GET)",
-            "/v1/pipelines/{run_id} (DELETE)",
+            "/v1/pipelines (POST) - upsert route registry entry",
+            "/v1/pipelines (GET) - list route registry entries",
+            "/v1/pipelines/{route_id} (DELETE) - disable route",
             "/v1/tool-configs/mcp (GET)",
             "/v1/tool-configs/mcp (PUT)",
         ],
@@ -200,6 +192,26 @@ def _read_mcp_config_raw() -> str:
         return f.read()
 
 
+def _read_pipeline_registry() -> Dict[str, Any]:
+    if not osp.exists(_PIPELINE_REGISTRY_PATH):
+        os.makedirs(osp.dirname(_PIPELINE_REGISTRY_PATH), exist_ok=True)
+        with open(_PIPELINE_REGISTRY_PATH, "w", encoding="utf-8") as f:
+            json.dump({"routes": {}, "api_keys": {}}, f, indent=2)
+    with open(_PIPELINE_REGISTRY_PATH, "r", encoding="utf-8") as f:
+        registry = json.load(f)
+    routes = registry.get("routes")
+    if not isinstance(routes, dict):
+        raise ValueError("`routes` in pipeline registry must be an object")
+    return registry
+
+
+def _write_pipeline_registry(registry: Dict[str, Any]) -> None:
+    os.makedirs(osp.dirname(_PIPELINE_REGISTRY_PATH), exist_ok=True)
+    with open(_PIPELINE_REGISTRY_PATH, "w", encoding="utf-8") as f:
+        json.dump(registry, f, indent=2)
+        f.write("\n")
+
+
 @app.post("/v1/graph-configs", response_model=GraphConfigUpsertResponse)
 async def upsert_graph_config(body: GraphConfigUpsertRequest):
     try:
@@ -357,22 +369,36 @@ async def update_mcp_tool_config(body: McpConfigUpdateRequest):
     )
 
 
 @app.get("/v1/pipelines", response_model=PipelineListResponse)
 async def list_running_pipelines():
-    _prune_stopped_pipelines()
-    items = [
-        PipelineRunInfo(
-            run_id=run_id,
-            pid=info["proc"].pid,
-            graph_id=info["graph_id"],
-            pipeline_id=info["pipeline_id"],
-            prompt_set_id=info["prompt_set_id"],
-            url=info["url"],
-            port=info["port"],
-            auth_type="bearer",
-            auth_header_name="Authorization",
-            auth_key_masked=info.get("auth_key_masked", ""),
+    try:
+        registry = _read_pipeline_registry()
+    except ValueError as e:
+        raise HTTPException(status_code=400, detail=str(e))
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=str(e))
+
+    items: List[PipelineRunInfo] = []
+    routes = registry.get("routes", {})
+    for route_id, spec in sorted(routes.items()):
+        if not isinstance(spec, dict):
+            continue
+        enabled = bool(spec.get("enabled", True))
+        items.append(
+            PipelineRunInfo(
+                run_id=route_id,
+                pid=-1,
+                graph_id=str(spec.get("graph_id") or route_id),
+                pipeline_id=str(spec.get("prompt_pipeline_id") or route_id),
+                prompt_set_id="default",
+                url=_DASHSCOPE_URL,
+                port=-1,
+                auth_type="bearer",
+                auth_header_name="Authorization",
+                auth_key_masked="",
+                route_id=route_id,
+                enabled=enabled,
+                config_file=spec.get("config_file"),
+            )
         )
-        for run_id, info in _running_pipelines.items()
-    ]
     return PipelineListResponse(items=items, count=len(items))
@@ -385,60 +411,85 @@ async def create_pipeline(body: PipelineCreateRequest):
             detail=f"Unknown graph_id '{body.graph_id}'. Valid options: {sorted(GRAPH_BUILD_FNCS.keys())}",
         )
 
-    auth_key = _generate_auth_key()
-    auth_key_masked = _mask_auth_key(auth_key)
+    route_id = (body.route_id or body.pipeline_id).strip()
+    if not route_id:
+        raise HTTPException(status_code=400, detail="route_id or pipeline_id is required")
+    prompt_pipeline_id = (body.prompt_pipeline_id or body.pipeline_id).strip()
+    if not prompt_pipeline_id:
+        raise HTTPException(status_code=400, detail="prompt_pipeline_id or pipeline_id is required")
+    config_file = f"configs/pipelines/{route_id}.yml"
+    config_abs_dir = osp.join(_PROJECT_ROOT, "configs", "pipelines")
 
     try:
-        proc, url = build_fn(
-            pipeline_id=body.pipeline_id,
+        build_fn(
+            pipeline_id=prompt_pipeline_id,
             prompt_set=body.prompt_set_id,
             tool_keys=body.tool_keys,
-            port=str(body.port),
             api_key=body.api_key,
-            fast_auth_keys=auth_key,
-            entry_pnt=body.entry_point,
             llm_name=body.llm_name,
+            pipeline_config_dir=config_abs_dir,
+        )
+        generated_config_file = f"configs/pipelines/{prompt_pipeline_id}.yml"
+        if prompt_pipeline_id != route_id:
+            # Keep runtime route_id and config_file aligned for lazy loading by route.
+            src = osp.join(config_abs_dir, f"{prompt_pipeline_id}.yml")
+            dst = osp.join(config_abs_dir, f"{route_id}.yml")
+            if osp.exists(src):
+                with open(src, "r", encoding="utf-8") as rf, open(dst, "w", encoding="utf-8") as wf:
+                    wf.write(rf.read())
+                generated_config_file = config_file
+
+        update_pipeline_registry(
+            pipeline_id=route_id,
+            prompt_set=prompt_pipeline_id,
+            graph_id=body.graph_id,
+            config_file=generated_config_file,
+            llm_name=body.llm_name,
+            enabled=body.enabled,
+            registry_f=_PIPELINE_REGISTRY_PATH,
         )
     except Exception as e:
-        raise HTTPException(status_code=500, detail=f"Failed to start pipeline: {e}")
-
-    run_id = str(uuid.uuid4())
-    _running_pipelines[run_id] = {
-        "proc": proc,
-        "graph_id": body.graph_id,
-        "pipeline_id": body.pipeline_id,
-        "prompt_set_id": body.prompt_set_id,
-        "url": url,
-        "port": body.port,
-        "auth_key_masked": auth_key_masked,
-    }
+        raise HTTPException(status_code=500, detail=f"Failed to register route: {e}")
 
     return PipelineCreateResponse(
-        run_id=run_id,
-        pid=proc.pid,
+        run_id=route_id,
+        pid=-1,
         graph_id=body.graph_id,
-        pipeline_id=body.pipeline_id,
+        pipeline_id=prompt_pipeline_id,
         prompt_set_id=body.prompt_set_id,
-        url=url,
-        port=body.port,
+        url=_DASHSCOPE_URL,
+        port=-1,
        auth_type="bearer",
         auth_header_name="Authorization",
-        auth_key_once=auth_key,
-        auth_key_masked=auth_key_masked,
+        auth_key_once="",
+        auth_key_masked="",
+        route_id=route_id,
+        enabled=body.enabled,
+        config_file=generated_config_file,
+        reload_required=True,
+        registry_path=_PIPELINE_REGISTRY_PATH,
     )
 
-@app.delete("/v1/pipelines/{run_id}", response_model=PipelineStopResponse)
-async def stop_pipeline(run_id: str):
-    info = _running_pipelines.pop(run_id, None)
-    if info is None:
-        raise HTTPException(status_code=404, detail=f"run_id '{run_id}' not found")
+@app.delete("/v1/pipelines/{route_id}", response_model=PipelineStopResponse)
+async def stop_pipeline(route_id: str):
+    try:
+        registry = _read_pipeline_registry()
+        routes = registry.get("routes", {})
+        spec = routes.get(route_id)
+        if not isinstance(spec, dict):
+            raise HTTPException(status_code=404, detail=f"route_id '{route_id}' not found")
+        spec["enabled"] = False
+        _write_pipeline_registry(registry)
+    except HTTPException:
+        raise
+    except ValueError as e:
+        raise HTTPException(status_code=400, detail=str(e))
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=str(e))
-    proc = info["proc"]
-    if proc.poll() is None:
-        proc.terminate()
-        try:
-            proc.wait(timeout=5)
-        except subprocess.TimeoutExpired:
-            proc.kill()
-            proc.wait(timeout=5)
-
-    return PipelineStopResponse(run_id=run_id, status="stopped")
+    return PipelineStopResponse(
+        run_id=route_id,
+        status="disabled",
+        route_id=route_id,
+        enabled=False,
+        reload_required=True,
+    )