dashscope pipeline: manage multiple pipelines

This commit is contained in:
2026-03-03 15:44:11 +08:00
parent bc208209c7
commit 4974ca936c

View File

@@ -20,6 +20,7 @@ sys.path.append(osp.dirname(osp.dirname(osp.abspath(__file__))))
from lang_agent.pipeline import Pipeline, PipelineConfig
from lang_agent.config.core_config import load_tyro_conf
from lang_agent.components.server_pipeline_manager import ServerPipelineManager
# Initialize the default pipeline config once from CLI arguments.
# It is used when no explicit pipeline id is provided in a request.
pipeline_config = tyro.cli(PipelineConfig)
@@ -35,136 +36,8 @@ REGISTRY_FILE = os.environ.get(
)
class PipelineManager:
    """Lazily load and cache multiple pipelines keyed by a client-facing id.

    The manager is seeded with one pre-built default pipeline. Additional
    pipelines are declared in a JSON registry file and constructed on first
    use (see :meth:`get_pipeline`). An optional per-API-key policy restricts
    which pipeline ids a key may select.
    """

    def __init__(self, default_pipeline_id: str, default_config: PipelineConfig, default_pipeline: Pipeline):
        """Seed the manager with the already-constructed default pipeline.

        Args:
            default_pipeline_id: Client-facing id mapped to the default pipeline.
            default_config: Config used to build the default pipeline; also the
                base object for registry entries that only declare ``overrides``.
            default_pipeline: Pre-built pipeline served for the default id.
        """
        self.default_pipeline_id = default_pipeline_id
        self.default_config = default_config
        # pipeline_id -> {"enabled": bool, "config_file": str|None, "overrides": dict}
        self._pipeline_specs: Dict[str, Dict[str, Any]] = {}
        # api_key -> policy dict (may hold "allowed_pipeline_ids", "default_pipeline_id")
        self._api_key_policy: Dict[str, Dict[str, Any]] = {}
        # Cache of built pipelines; the default pipeline is pre-seeded so it is
        # never rebuilt.
        self._pipelines: Dict[str, Pipeline] = {default_pipeline_id: default_pipeline}
        self._pipeline_llm: Dict[str, str] = {default_pipeline_id: default_config.llm_name}
        self._pipeline_specs[default_pipeline_id] = {"enabled": True, "config_file": None}

    def _resolve_path(self, path_str: str) -> str:
        """Return `path_str` as-is if absolute, else resolve it against the project root.

        The project root is taken as two directories above this file.
        """
        path = FsPath(path_str)
        if path.is_absolute():
            return str(path)
        root = FsPath(osp.dirname(osp.dirname(osp.abspath(__file__))))
        return str((root / path).resolve())

    # The two resolvers below were byte-identical copies; they now share
    # _resolve_path but keep their original names for callers.
    def _resolve_registry_path(self, registry_path: str) -> str:
        """Resolve the registry file path (absolute, or relative to project root)."""
        return self._resolve_path(registry_path)

    def _resolve_config_path(self, config_file: str) -> str:
        """Resolve a pipeline config file path (absolute, or relative to project root)."""
        return self._resolve_path(config_file)

    def load_registry(self, registry_path: str) -> None:
        """Load pipeline specs and API-key policy from a JSON registry file.

        A missing file is non-fatal (only the default pipeline stays available);
        a malformed registry raises ValueError.

        Args:
            registry_path: Absolute path, or path relative to the project root.

        Raises:
            ValueError: If ``pipelines`` / ``api_keys`` entries are not objects.
        """
        abs_path = self._resolve_registry_path(registry_path)
        if not osp.exists(abs_path):
            logger.warning(f"pipeline registry file not found: {abs_path}. Using default pipeline only.")
            return
        with open(abs_path, "r", encoding="utf-8") as f:
            registry: dict = json.load(f)
        pipelines = registry.get("pipelines", {})
        if not isinstance(pipelines, dict):
            raise ValueError("`pipelines` in pipeline registry must be an object.")
        for pipeline_id, spec in pipelines.items():
            if not isinstance(spec, dict):
                raise ValueError(f"pipeline spec for `{pipeline_id}` must be an object.")
            self._pipeline_specs[pipeline_id] = {
                "enabled": bool(spec.get("enabled", True)),
                "config_file": spec.get("config_file"),
                "overrides": spec.get("overrides", {}),
            }
        api_key_policy = registry.get("api_keys", {})
        if api_key_policy and not isinstance(api_key_policy, dict):
            raise ValueError("`api_keys` in pipeline registry must be an object.")
        self._api_key_policy = api_key_policy
        logger.info(f"loaded pipeline registry: {abs_path}, pipelines={list(self._pipeline_specs.keys())}")

    def _build_pipeline(self, pipeline_id: str) -> Tuple[Pipeline, str]:
        """Construct the pipeline for `pipeline_id` from its registered spec.

        Returns:
            Tuple of (pipeline instance, llm model name).

        Raises:
            HTTPException: 404 for an unknown id, 403 for a disabled one.
            ValueError: For malformed ``overrides`` in the spec.
        """
        spec = self._pipeline_specs.get(pipeline_id)
        if spec is None:
            raise HTTPException(status_code=404, detail=f"Unknown pipeline_id: {pipeline_id}")
        if not spec.get("enabled", True):
            raise HTTPException(status_code=403, detail=f"Pipeline disabled: {pipeline_id}")
        config_file = spec.get("config_file")
        overrides = spec.get("overrides", {})
        if not config_file and not overrides:
            # default pipeline
            p = self._pipelines[self.default_pipeline_id]
            llm_name = self._pipeline_llm[self.default_pipeline_id]
            return p, llm_name
        if config_file:
            cfg = load_tyro_conf(self._resolve_config_path(config_file))
        else:
            # Build from default config + shallow overrides so new pipelines can be
            # added via registry without additional yaml files.
            cfg = copy.deepcopy(self.default_config)
        if not isinstance(overrides, dict):
            raise ValueError(f"pipeline `overrides` for `{pipeline_id}` must be an object.")
        # Overrides apply to both branches: a loaded config file can still be tweaked.
        for key, value in overrides.items():
            if not hasattr(cfg, key):
                raise ValueError(f"unknown override field `{key}` for pipeline `{pipeline_id}`")
            setattr(cfg, key, value)
        p = cfg.setup()
        llm_name = getattr(cfg, "llm_name", "unknown-model")
        return p, llm_name

    def _authorize(self, api_key: str, pipeline_id: str) -> None:
        """Raise 403 if the API key's policy disallows `pipeline_id`.

        Keys without a policy entry — or an empty policy table — are allowed
        everything (authorization is opt-in per key).
        """
        if not self._api_key_policy:
            return
        policy = self._api_key_policy.get(api_key)
        if policy is None:
            return
        allowed = policy.get("allowed_pipeline_ids")
        if allowed and pipeline_id not in allowed:
            raise HTTPException(status_code=403, detail=f"pipeline_id `{pipeline_id}` is not allowed for this API key")

    def resolve_pipeline_id(self, body: Dict[str, Any], app_id: Optional[str], api_key: str) -> str:
        """Pick the pipeline id for a request and authorize it.

        Precedence: body["pipeline_id"] > body["input"]["pipeline_id"] > app_id
        > the API key's default > the manager default.

        Raises:
            HTTPException: 404 if the resolved id is unregistered, 403 if the
                key's policy forbids it.
        """
        body_input = body.get("input", {})
        pipeline_id = (
            body.get("pipeline_id")
            or (body_input.get("pipeline_id") if isinstance(body_input, dict) else None)
            or app_id
        )
        if not pipeline_id:
            key_policy = self._api_key_policy.get(api_key, {}) if self._api_key_policy else {}
            pipeline_id = key_policy.get("default_pipeline_id", self.default_pipeline_id)
        if pipeline_id not in self._pipeline_specs:
            raise HTTPException(status_code=404, detail=f"Unknown pipeline_id: {pipeline_id}")
        self._authorize(api_key, pipeline_id)
        return pipeline_id

    def get_pipeline(self, pipeline_id: str) -> Tuple[Pipeline, str]:
        """Return the cached pipeline for `pipeline_id`, building it on first use."""
        cached = self._pipelines.get(pipeline_id)
        if cached is not None:
            return cached, self._pipeline_llm[pipeline_id]
        pipeline_obj, llm_name = self._build_pipeline(pipeline_id)
        self._pipelines[pipeline_id] = pipeline_obj
        self._pipeline_llm[pipeline_id] = llm_name
        logger.info(f"lazy-loaded pipeline_id={pipeline_id} model={llm_name}")
        return pipeline_obj, llm_name
PIPELINE_MANAGER = PipelineManager(
default_pipeline_id=os.environ.get("FAST_DEFAULT_PIPELINE_ID", "default"),
# Global route manager for the server. The default route id falls back to
# FAST_DEFAULT_PIPELINE_ID for backward compatibility when
# FAST_DEFAULT_ROUTE_ID is unset.
PIPELINE_MANAGER = ServerPipelineManager(
default_route_id=os.environ.get("FAST_DEFAULT_ROUTE_ID", os.environ.get("FAST_DEFAULT_PIPELINE_ID", "default")),
default_config=pipeline_config,
default_pipeline=pipeline,
)
@@ -334,11 +207,11 @@ async def _process_dashscope_request(
thread_id = body_input.get("session_id") or req_session_id or "3"
user_msg = _extract_user_message(messages)
pipeline_id = PIPELINE_MANAGER.resolve_pipeline_id(body=body, app_id=req_app_id, api_key=api_key)
selected_pipeline, selected_model = PIPELINE_MANAGER.get_pipeline(pipeline_id)
route_id = PIPELINE_MANAGER.resolve_route_id(body=body, app_id=req_app_id, api_key=api_key)
selected_pipeline, selected_model = PIPELINE_MANAGER.get_pipeline(route_id)
# Namespace thread ids to prevent memory collisions across pipelines.
thread_id = f"{pipeline_id}:{thread_id}"
thread_id = f"{route_id}:{thread_id}"
response_id = f"appcmpl-{os.urandom(12).hex()}"
@@ -364,7 +237,9 @@ async def _process_dashscope_request(
"created": int(time.time()),
"model": selected_model,
},
"pipeline_id": pipeline_id,
"route_id": route_id,
# Backward compatibility: keep pipeline_id in response as the route id selector.
"pipeline_id": route_id,
"is_end": True,
}
return JSONResponse(content=data)