diff --git a/fastapi_server/front_apis.py b/fastapi_server/front_apis.py index 4cdb352..40e71a3 100644 --- a/fastapi_server/front_apis.py +++ b/fastapi_server/front_apis.py @@ -1,6 +1,7 @@ from typing import Dict, List, Optional import os import os.path as osp +import subprocess import sys import uuid @@ -26,6 +27,26 @@ class GraphConfigUpsertResponse(BaseModel): tool_keys: List[str] prompt_keys: List[str] +class GraphConfigReadResponse(BaseModel): + pipeline_id: str + prompt_set_id: str + tool_keys: List[str] + prompt_dict: Dict[str, str] + +class GraphConfigListItem(BaseModel): + pipeline_id: str + prompt_set_id: str + name: str + description: str + is_active: bool + tool_keys: List[str] + created_at: Optional[str] = Field(default=None) + updated_at: Optional[str] = Field(default=None) + +class GraphConfigListResponse(BaseModel): + items: List[GraphConfigListItem] + count: int + class PipelineCreateRequest(BaseModel): graph_id: str = Field( description="Graph key from GRAPH_BUILD_FNCS, e.g. routing or react" @@ -46,6 +67,23 @@ class PipelineCreateResponse(BaseModel): url: str port: int +class PipelineRunInfo(BaseModel): + run_id: str + pid: int + graph_id: str + pipeline_id: str + prompt_set_id: str + url: str + port: int + +class PipelineListResponse(BaseModel): + items: List[PipelineRunInfo] + count: int + +class PipelineStopResponse(BaseModel): + run_id: str + status: str + app = FastAPI( title="Front APIs", @@ -63,6 +101,15 @@ app.add_middleware( _db = DBConfigManager() _running_pipelines: Dict[str, Dict[str, object]] = {} +def _prune_stopped_pipelines() -> None: + stale_ids: List[str] = [] + for run_id, info in _running_pipelines.items(): + proc = info["proc"] + if proc.poll() is not None: + stale_ids.append(run_id) + for run_id in stale_ids: + _running_pipelines.pop(run_id, None) + @app.get("/health") async def health(): @@ -75,9 +122,14 @@ async def root(): "message": "Front APIs", "endpoints": [ "/v1/graph-configs (POST)", + "/v1/graph-configs (GET)", + "/v1/graph-configs/default/{pipeline_id} (GET)", + "/v1/graph-configs/{pipeline_id}/{prompt_set_id} (GET)", "/v1/graph-configs/{pipeline_id}/{prompt_set_id} (DELETE)", "/v1/pipelines/graphs (GET)", "/v1/pipelines (POST)", + "/v1/pipelines (GET)", + "/v1/pipelines/{run_id} (DELETE)", ], } @@ -103,6 +155,74 @@ async def upsert_graph_config(body: GraphConfigUpsertRequest): prompt_keys=list(body.prompt_dict.keys()), ) +@app.get("/v1/graph-configs", response_model=GraphConfigListResponse) +async def list_graph_configs(pipeline_id: Optional[str] = None, graph_id: Optional[str] = None): + resolved_pipeline_id = pipeline_id or graph_id + try: + rows = _db.list_prompt_sets(pipeline_id=resolved_pipeline_id) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + items = [GraphConfigListItem(**row) for row in rows] + return GraphConfigListResponse(items=items, count=len(items)) + +@app.get("/v1/graph-configs/default/{pipeline_id}", response_model=GraphConfigReadResponse) +async def get_default_graph_config(pipeline_id: str): + try: + prompt_dict, tool_keys = _db.get_config(pipeline_id=pipeline_id, prompt_set_id=None) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + if not prompt_dict and not tool_keys: + raise HTTPException( + status_code=404, + detail=f"No active prompt set found for pipeline '{pipeline_id}'", + ) + + rows = _db.list_prompt_sets(pipeline_id=pipeline_id) + active = next((row for row in rows if row["is_active"]), None) + if active is None: + raise HTTPException( + status_code=404, + detail=f"No active prompt set found for pipeline '{pipeline_id}'", + ) + + return GraphConfigReadResponse( + pipeline_id=pipeline_id, + prompt_set_id=active["prompt_set_id"], + tool_keys=tool_keys, + prompt_dict=prompt_dict, + ) + +@app.get("/v1/graph-configs/{pipeline_id}/{prompt_set_id}", response_model=GraphConfigReadResponse) +async def get_graph_config(pipeline_id: str, prompt_set_id: str): + try: + meta = _db.get_prompt_set(pipeline_id=pipeline_id, prompt_set_id=prompt_set_id) + if meta is None: + raise HTTPException( + status_code=404, + detail=f"prompt_set_id '{prompt_set_id}' not found for pipeline '{pipeline_id}'", + ) + prompt_dict, tool_keys = _db.get_config( + pipeline_id=pipeline_id, + prompt_set_id=prompt_set_id, + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + return GraphConfigReadResponse( + pipeline_id=pipeline_id, + prompt_set_id=prompt_set_id, + tool_keys=tool_keys, + prompt_dict=prompt_dict, + ) + @app.delete("/v1/graph-configs/{pipeline_id}/{prompt_set_id}") async def delete_graph_config(pipeline_id: str, prompt_set_id: str): @@ -124,6 +244,23 @@ async def delete_graph_config(pipeline_id: str, prompt_set_id: str): async def available_graphs(): return {"available_graphs": sorted(GRAPH_BUILD_FNCS.keys())} +@app.get("/v1/pipelines", response_model=PipelineListResponse) +async def list_running_pipelines(): + _prune_stopped_pipelines() + items = [ + PipelineRunInfo( + run_id=run_id, + pid=info["proc"].pid, + graph_id=info["graph_id"], + pipeline_id=info["pipeline_id"], + prompt_set_id=info["prompt_set_id"], + url=info["url"], + port=info["port"], + ) + for run_id, info in _running_pipelines.items() + ] + return PipelineListResponse(items=items, count=len(items)) + @app.post("/v1/pipelines", response_model=PipelineCreateResponse) async def create_pipeline(body: PipelineCreateRequest): @@ -136,7 +273,7 @@ async def create_pipeline(body: PipelineCreateRequest): try: proc, url = build_fn( - pipelin_id=body.pipeline_id, + pipeline_id=body.pipeline_id, prompt_set=body.prompt_set_id, tool_keys=body.tool_keys, port=str(body.port), @@ -165,3 +302,20 @@ async def create_pipeline(body: PipelineCreateRequest): url=url, port=body.port, ) + +@app.delete("/v1/pipelines/{run_id}", response_model=PipelineStopResponse) +async def stop_pipeline(run_id: str): + info = _running_pipelines.pop(run_id, None) + if info is None: + raise HTTPException(status_code=404, detail=f"run_id '{run_id}' not found") + + proc = info["proc"] + if proc.poll() is None: + proc.terminate() + try: + proc.wait(timeout=5) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait(timeout=5) + + return PipelineStopResponse(run_id=run_id, status="stopped")