From 9c0744e1bc74aa4fbd1d8d9bc3762e34a006142d Mon Sep 17 00:00:00 2001 From: goulustis Date: Wed, 4 Mar 2026 16:04:31 +0800 Subject: [PATCH] update front_apis --- fastapi_server/front_apis.py | 320 +++++++++++++++++++++++++++-------- 1 file changed, 248 insertions(+), 72 deletions(-) diff --git a/fastapi_server/front_apis.py b/fastapi_server/front_apis.py index 82e8822..9d599cc 100644 --- a/fastapi_server/front_apis.py +++ b/fastapi_server/front_apis.py @@ -76,59 +76,67 @@ class PipelineCreateRequest(BaseModel): pipeline_id: str prompt_set_id: str tool_keys: List[str] = Field(default_factory=list) - port: int - api_key: str - entry_point: str = Field(default="fastapi_server/server_dashscope.py") + api_key: Optional[str] = Field(default=None) llm_name: str = Field(default="qwen-plus") enabled: bool = Field(default=True) -class PipelineCreateResponse(BaseModel): - run_id: str - pid: int - graph_id: str +class PipelineSpec(BaseModel): pipeline_id: str - prompt_set_id: str - url: str - port: int - auth_type: str - auth_header_name: str - auth_key_once: str - auth_key_masked: str + graph_id: str enabled: bool config_file: str + llm_name: str + overrides: Dict[str, Any] = Field(default_factory=dict) + + +class PipelineCreateResponse(BaseModel): + pipeline_id: str + prompt_set_id: str + graph_id: str + config_file: str + llm_name: str + enabled: bool reload_required: bool registry_path: str -class PipelineRunInfo(BaseModel): - run_id: str - pid: int - graph_id: str - pipeline_id: str - prompt_set_id: str - url: str - port: int - auth_type: str - auth_header_name: str - auth_key_masked: str - enabled: bool - config_file: Optional[str] = Field(default=None) - - class PipelineListResponse(BaseModel): - items: List[PipelineRunInfo] + items: List[PipelineSpec] count: int class PipelineStopResponse(BaseModel): - run_id: str - status: str pipeline_id: str + status: str enabled: bool reload_required: bool +class ApiKeyPolicyItem(BaseModel): + api_key: str + default_pipeline_id: Optional[str] = Field(default=None) + allowed_pipeline_ids: List[str] = Field(default_factory=list) + app_id: Optional[str] = Field(default=None) + + +class ApiKeyPolicyListResponse(BaseModel): + items: List[ApiKeyPolicyItem] + count: int + + +class ApiKeyPolicyUpsertRequest(BaseModel): + default_pipeline_id: Optional[str] = Field(default=None) + allowed_pipeline_ids: List[str] = Field(default_factory=list) + app_id: Optional[str] = Field(default=None) + + +class ApiKeyPolicyDeleteResponse(BaseModel): + api_key: str + status: str + reload_required: bool + + class McpConfigReadResponse(BaseModel): path: str raw_content: str @@ -159,7 +167,6 @@ app.add_middleware( ) _db = DBConfigManager() -_DASHSCOPE_URL = os.environ.get("FAST_DASHSCOPE_URL", "http://127.0.0.1:8588") @app.get("/health") @@ -179,9 +186,12 @@ async def root(): "/v1/graph-configs/{pipeline_id}/{prompt_set_id} (GET)", "/v1/graph-configs/{pipeline_id}/{prompt_set_id} (DELETE)", "/v1/pipelines/graphs (GET)", - "/v1/pipelines (POST) - upsert route registry entry", - "/v1/pipelines (GET) - list route registry entries", - "/v1/pipelines/{route_id} (DELETE) - disable route", + "/v1/pipelines (POST) - build config + upsert pipeline registry entry", + "/v1/pipelines (GET) - list registry pipeline specs", + "/v1/pipelines/{pipeline_id} (DELETE) - disable pipeline in registry", + "/v1/pipelines/api-keys (GET) - list API key routing policies", + "/v1/pipelines/api-keys/{api_key} (PUT) - upsert API key routing policy", + "/v1/pipelines/api-keys/{api_key} (DELETE) - delete API key routing policy", "/v1/tool-configs/mcp (GET)", "/v1/tool-configs/mcp (PUT)", ], @@ -214,6 +224,11 @@ def _read_pipeline_registry() -> Dict[str, Any]: pipelines = registry.get("pipelines") if not isinstance(pipelines, dict): raise ValueError("`pipelines` in pipeline registry must be an object") + api_keys = registry.get("api_keys") + if api_keys is None: + registry["api_keys"] = {} + elif not isinstance(api_keys, dict): + raise ValueError("`api_keys` in pipeline registry must be an object") return registry @@ -224,6 +239,55 @@ def _write_pipeline_registry(registry: Dict[str, Any]) -> None: f.write("\n") +def _normalize_pipeline_spec(pipeline_id: str, spec: Dict[str, Any]) -> PipelineSpec: + if not isinstance(spec, dict): + raise ValueError(f"pipeline spec for '{pipeline_id}' must be an object") + overrides = spec.get("overrides", {}) + if overrides is None: + overrides = {} + if not isinstance(overrides, dict): + raise ValueError(f"`overrides` for pipeline '{pipeline_id}' must be an object") + llm_name = str(overrides.get("llm_name") or "unknown") + return PipelineSpec( + pipeline_id=pipeline_id, + graph_id=str(spec.get("graph_id") or pipeline_id), + enabled=bool(spec.get("enabled", True)), + config_file=str(spec.get("config_file") or ""), + llm_name=llm_name, + overrides=overrides, + ) + + +def _normalize_api_key_policy(api_key: str, policy: Dict[str, Any]) -> ApiKeyPolicyItem: + if not isinstance(policy, dict): + raise ValueError(f"api key policy for '{api_key}' must be an object") + allowed = policy.get("allowed_pipeline_ids") or [] + if not isinstance(allowed, list): + raise ValueError( + f"`allowed_pipeline_ids` for api key '{api_key}' must be a list" + ) + cleaned_allowed = [] + seen = set() + for pid in allowed: + pipeline_id = str(pid).strip() + if not pipeline_id or pipeline_id in seen: + continue + seen.add(pipeline_id) + cleaned_allowed.append(pipeline_id) + default_pipeline_id = policy.get("default_pipeline_id") + if default_pipeline_id is not None: + default_pipeline_id = str(default_pipeline_id).strip() or None + app_id = policy.get("app_id") + if app_id is not None: + app_id = str(app_id).strip() or None + return ApiKeyPolicyItem( + api_key=api_key, + default_pipeline_id=default_pipeline_id, + allowed_pipeline_ids=cleaned_allowed, + app_id=app_id, + ) + + @app.post("/v1/graph-configs", response_model=GraphConfigUpsertResponse) async def upsert_graph_config(body: GraphConfigUpsertRequest): try: @@ -403,28 +467,10 @@ async def list_running_pipelines(): except Exception as e: raise HTTPException(status_code=500, detail=str(e)) - items: List[PipelineRunInfo] = [] + items: List[PipelineSpec] = [] pipelines = registry.get("pipelines", {}) for pipeline_id, spec in sorted(pipelines.items()): - if not isinstance(spec, dict): - continue - enabled = bool(spec.get("enabled", True)) - items.append( - PipelineRunInfo( - run_id=pipeline_id, - pid=-1, - graph_id=str(spec.get("graph_id") or pipeline_id), - pipeline_id=pipeline_id, - prompt_set_id="default", - url=_DASHSCOPE_URL, - port=-1, - auth_type="bearer", - auth_header_name="Authorization", - auth_key_masked="", - enabled=enabled, - config_file=spec.get("config_file"), - ) - ) + items.append(_normalize_pipeline_spec(pipeline_id, spec)) return PipelineListResponse(items=items, count=len(items)) @@ -440,14 +486,38 @@ async def create_pipeline(body: PipelineCreateRequest): pipeline_id = body.pipeline_id.strip() if not pipeline_id: raise HTTPException(status_code=400, detail="pipeline_id is required") + prompt_set_id = body.prompt_set_id.strip() + if not prompt_set_id: + raise HTTPException(status_code=400, detail="prompt_set_id is required") + + resolved_api_key = (body.api_key or "").strip() + if not resolved_api_key: + meta = _db.get_prompt_set(pipeline_id=pipeline_id, prompt_set_id=prompt_set_id) + if meta is None: + raise HTTPException( + status_code=400, + detail=( + f"prompt_set_id '{prompt_set_id}' not found for pipeline '{pipeline_id}', " + "and request api_key is empty" + ), + ) + resolved_api_key = str(meta.get("api_key") or "").strip() + if not resolved_api_key: + raise HTTPException( + status_code=400, + detail=( + "api_key is required either in request body or in prompt set metadata" + ), + ) + config_file = f"configs/pipelines/{pipeline_id}.yml" config_abs_dir = osp.join(_PROJECT_ROOT, "configs", "pipelines") try: build_fn( pipeline_id=pipeline_id, - prompt_set=body.prompt_set_id, + prompt_set=prompt_set_id, tool_keys=body.tool_keys, - api_key=body.api_key, + api_key=resolved_api_key, llm_name=body.llm_name, pipeline_config_dir=config_abs_dir, ) @@ -463,20 +533,26 @@ async def create_pipeline(body: PipelineCreateRequest): except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to register pipeline: {e}") + try: + registry = _read_pipeline_registry() + pipeline_spec = registry.get("pipelines", {}).get(pipeline_id) + if pipeline_spec is None: + raise ValueError( + f"pipeline '{pipeline_id}' missing from registry after update" + ) + normalized = _normalize_pipeline_spec(pipeline_id, pipeline_spec) + except Exception as e: + raise HTTPException( + status_code=500, detail=f"Failed to read pipeline registry after update: {e}" + ) + return PipelineCreateResponse( - run_id=pipeline_id, - pid=-1, - graph_id=body.graph_id, pipeline_id=pipeline_id, - prompt_set_id=body.prompt_set_id, - url=_DASHSCOPE_URL, - port=-1, - auth_type="bearer", - auth_header_name="Authorization", - auth_key_once="", - auth_key_masked="", - enabled=body.enabled, - config_file=config_file, + prompt_set_id=prompt_set_id, + graph_id=normalized.graph_id, + config_file=normalized.config_file, + llm_name=normalized.llm_name, + enabled=normalized.enabled, reload_required=True, registry_path=_PIPELINE_REGISTRY_PATH, ) @@ -502,9 +578,109 @@ async def stop_pipeline(pipeline_id: str): raise HTTPException(status_code=500, detail=str(e)) return PipelineStopResponse( - run_id=pipeline_id, - status="disabled", pipeline_id=pipeline_id, + status="disabled", enabled=False, reload_required=True, ) + + +@app.get("/v1/pipelines/api-keys", response_model=ApiKeyPolicyListResponse) +async def list_pipeline_api_keys(): + try: + registry = _read_pipeline_registry() + api_keys = registry.get("api_keys", {}) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + items: List[ApiKeyPolicyItem] = [] + for api_key, policy in sorted(api_keys.items()): + items.append(_normalize_api_key_policy(str(api_key), policy)) + return ApiKeyPolicyListResponse(items=items, count=len(items)) + + +@app.put( + "/v1/pipelines/api-keys/{api_key}", + response_model=ApiKeyPolicyItem, +) +async def upsert_pipeline_api_key_policy(api_key: str, body: ApiKeyPolicyUpsertRequest): + normalized_key = api_key.strip() + if not normalized_key: + raise HTTPException(status_code=400, detail="api_key path parameter is required") + try: + registry = _read_pipeline_registry() + pipelines = registry.get("pipelines", {}) + if not isinstance(pipelines, dict): + raise ValueError("`pipelines` in pipeline registry must be an object") + known_pipeline_ids = set(pipelines.keys()) + + allowed = [] + seen = set() + for pipeline_id in body.allowed_pipeline_ids: + cleaned = str(pipeline_id).strip() + if not cleaned or cleaned in seen: + continue + if cleaned not in known_pipeline_ids: + raise ValueError( + f"unknown pipeline_id '{cleaned}' in allowed_pipeline_ids" + ) + seen.add(cleaned) + allowed.append(cleaned) + + default_pipeline_id = body.default_pipeline_id + if default_pipeline_id is not None: + default_pipeline_id = default_pipeline_id.strip() or None + if default_pipeline_id and default_pipeline_id not in known_pipeline_ids: + raise ValueError(f"unknown default_pipeline_id '{default_pipeline_id}'") + + app_id = body.app_id.strip() if body.app_id else None + policy: Dict[str, Any] = {} + if default_pipeline_id: + policy["default_pipeline_id"] = default_pipeline_id + if allowed: + policy["allowed_pipeline_ids"] = allowed + if app_id: + policy["app_id"] = app_id + + registry.setdefault("api_keys", {})[normalized_key] = policy + _write_pipeline_registry(registry) + return _normalize_api_key_policy(normalized_key, policy) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@app.delete( + "/v1/pipelines/api-keys/{api_key}", + response_model=ApiKeyPolicyDeleteResponse, +) +async def delete_pipeline_api_key_policy(api_key: str): + normalized_key = api_key.strip() + if not normalized_key: + raise HTTPException(status_code=400, detail="api_key path parameter is required") + try: + registry = _read_pipeline_registry() + api_keys = registry.get("api_keys", {}) + if normalized_key not in api_keys: + raise HTTPException( + status_code=404, detail=f"api_key '{normalized_key}' not found" + ) + del api_keys[normalized_key] + _write_pipeline_registry(registry) + except HTTPException: + raise + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + return ApiKeyPolicyDeleteResponse( + api_key=normalized_key, + status="deleted", + reload_required=True, + )