From 4974ca936cca3cdcc79be520aa99b310e230fe02 Mon Sep 17 00:00:00 2001 From: goulustis Date: Tue, 3 Mar 2026 15:44:11 +0800 Subject: [PATCH] dashscope pipeline manages multiple pipelines --- fastapi_server/server_dashscope.py | 143 ++--------------------------- 1 file changed, 9 insertions(+), 134 deletions(-) diff --git a/fastapi_server/server_dashscope.py b/fastapi_server/server_dashscope.py index 76aae23..a5087eb 100644 --- a/fastapi_server/server_dashscope.py +++ b/fastapi_server/server_dashscope.py @@ -20,6 +20,7 @@ sys.path.append(osp.dirname(osp.dirname(osp.abspath(__file__)))) from lang_agent.pipeline import Pipeline, PipelineConfig from lang_agent.config.core_config import load_tyro_conf +from lang_agent.components.server_pipeline_manager import ServerPipelineManager # Initialize default pipeline once (used when no explicit pipeline id is provided) pipeline_config = tyro.cli(PipelineConfig) @@ -35,136 +36,8 @@ REGISTRY_FILE = os.environ.get( ) -class PipelineManager: - """Lazily load and cache multiple pipelines keyed by a client-facing id.""" - - def __init__(self, default_pipeline_id: str, default_config: PipelineConfig, default_pipeline: Pipeline): - self.default_pipeline_id = default_pipeline_id - self.default_config = default_config - self._pipeline_specs: Dict[str, Dict[str, Any]] = {} - self._api_key_policy: Dict[str, Dict[str, Any]] = {} - self._pipelines: Dict[str, Pipeline] = {default_pipeline_id: default_pipeline} - self._pipeline_llm: Dict[str, str] = {default_pipeline_id: default_config.llm_name} - self._pipeline_specs[default_pipeline_id] = {"enabled": True, "config_file": None} - - def _resolve_registry_path(self, registry_path: str) -> str: - path = FsPath(registry_path) - if path.is_absolute(): - return str(path) - root = FsPath(osp.dirname(osp.dirname(osp.abspath(__file__)))) - return str((root / path).resolve()) - - def load_registry(self, registry_path: str) -> None: - abs_path = self._resolve_registry_path(registry_path) - if not osp.exists(abs_path): - logger.warning(f"pipeline registry file not found: {abs_path}. Using default pipeline only.") - return - - with open(abs_path, "r", encoding="utf-8") as f: - registry:dict = json.load(f) - - pipelines = registry.get("pipelines", {}) - if not isinstance(pipelines, dict): - raise ValueError("`pipelines` in pipeline registry must be an object.") - - for pipeline_id, spec in pipelines.items(): - if not isinstance(spec, dict): - raise ValueError(f"pipeline spec for `{pipeline_id}` must be an object.") - self._pipeline_specs[pipeline_id] = { - "enabled": bool(spec.get("enabled", True)), - "config_file": spec.get("config_file"), - "overrides": spec.get("overrides", {}), - } - - api_key_policy = registry.get("api_keys", {}) - if api_key_policy and not isinstance(api_key_policy, dict): - raise ValueError("`api_keys` in pipeline registry must be an object.") - self._api_key_policy = api_key_policy - logger.info(f"loaded pipeline registry: {abs_path}, pipelines={list(self._pipeline_specs.keys())}") - - def _resolve_config_path(self, config_file: str) -> str: - path = FsPath(config_file) - if path.is_absolute(): - return str(path) - root = FsPath(osp.dirname(osp.dirname(osp.abspath(__file__)))) - return str((root / path).resolve()) - - def _build_pipeline(self, pipeline_id: str) -> Tuple[Pipeline, str]: - spec = self._pipeline_specs.get(pipeline_id) - if spec is None: - raise HTTPException(status_code=404, detail=f"Unknown pipeline_id: {pipeline_id}") - if not spec.get("enabled", True): - raise HTTPException(status_code=403, detail=f"Pipeline disabled: {pipeline_id}") - - config_file = spec.get("config_file") - overrides = spec.get("overrides", {}) - if not config_file and not overrides: - # default pipeline - p = self._pipelines[self.default_pipeline_id] - llm_name = self._pipeline_llm[self.default_pipeline_id] - return p, llm_name - - if config_file: - cfg = load_tyro_conf(self._resolve_config_path(config_file)) - else: - # Build from default config + shallow overrides so new pipelines can be - # added via registry without additional yaml files. - cfg = copy.deepcopy(self.default_config) - if not isinstance(overrides, dict): - raise ValueError(f"pipeline `overrides` for `{pipeline_id}` must be an object.") - for key, value in overrides.items(): - if not hasattr(cfg, key): - raise ValueError(f"unknown override field `{key}` for pipeline `{pipeline_id}`") - setattr(cfg, key, value) - - p = cfg.setup() - llm_name = getattr(cfg, "llm_name", "unknown-model") - return p, llm_name - - def _authorize(self, api_key: str, pipeline_id: str) -> None: - if not self._api_key_policy: - return - - policy = self._api_key_policy.get(api_key) - if policy is None: - return - - allowed = policy.get("allowed_pipeline_ids") - if allowed and pipeline_id not in allowed: - raise HTTPException(status_code=403, detail=f"pipeline_id `{pipeline_id}` is not allowed for this API key") - - def resolve_pipeline_id(self, body: Dict[str, Any], app_id: Optional[str], api_key: str) -> str: - body_input = body.get("input", {}) - pipeline_id = ( - body.get("pipeline_id") - or (body_input.get("pipeline_id") if isinstance(body_input, dict) else None) - or app_id - ) - - if not pipeline_id: - key_policy = self._api_key_policy.get(api_key, {}) if self._api_key_policy else {} - pipeline_id = key_policy.get("default_pipeline_id", self.default_pipeline_id) - - if pipeline_id not in self._pipeline_specs: - raise HTTPException(status_code=404, detail=f"Unknown pipeline_id: {pipeline_id}") - - self._authorize(api_key, pipeline_id) - return pipeline_id - - def get_pipeline(self, pipeline_id: str) -> Tuple[Pipeline, str]: - cached = self._pipelines.get(pipeline_id) - if cached is not None: - return cached, self._pipeline_llm[pipeline_id] - - pipeline_obj, llm_name = self._build_pipeline(pipeline_id) - self._pipelines[pipeline_id] = pipeline_obj - self._pipeline_llm[pipeline_id] = llm_name - logger.info(f"lazy-loaded pipeline_id={pipeline_id} model={llm_name}") - return pipeline_obj, llm_name - - -PIPELINE_MANAGER = PipelineManager( - default_pipeline_id=os.environ.get("FAST_DEFAULT_PIPELINE_ID", "default"), +PIPELINE_MANAGER = ServerPipelineManager( + default_route_id=os.environ.get("FAST_DEFAULT_ROUTE_ID", os.environ.get("FAST_DEFAULT_PIPELINE_ID", "default")), default_config=pipeline_config, default_pipeline=pipeline, ) @@ -334,11 +207,11 @@ async def _process_dashscope_request( thread_id = body_input.get("session_id") or req_session_id or "3" user_msg = _extract_user_message(messages) - pipeline_id = PIPELINE_MANAGER.resolve_pipeline_id(body=body, app_id=req_app_id, api_key=api_key) - selected_pipeline, selected_model = PIPELINE_MANAGER.get_pipeline(pipeline_id) + route_id = PIPELINE_MANAGER.resolve_route_id(body=body, app_id=req_app_id, api_key=api_key) + selected_pipeline, selected_model = PIPELINE_MANAGER.get_pipeline(route_id) # Namespace thread ids to prevent memory collisions across pipelines. - thread_id = f"{pipeline_id}:{thread_id}" + thread_id = f"{route_id}:{thread_id}" response_id = f"appcmpl-{os.urandom(12).hex()}" @@ -364,7 +237,9 @@ async def _process_dashscope_request( "created": int(time.time()), "model": selected_model, }, - "pipeline_id": pipeline_id, + "route_id": route_id, + # Backward compatibility: keep pipeline_id in response as the route id selector. + "pipeline_id": route_id, "is_end": True, } return JSONResponse(content=data)