Compare commits

..

93 Commits

Author SHA1 Message Date
737a80aa39 llm name fix 2026-03-18 20:44:28 +08:00
bdd4ddec9e show tool message in chat 2026-03-14 11:20:39 +08:00
2ee55d25cc bug fix: skill_dir show up correctly in frontend 2026-03-13 16:09:26 +08:00
bf9ce709e2 bug fix: error when saving config for fs_backend 2026-03-13 16:01:45 +08:00
59331d2435 api bug fix 2026-03-13 14:52:37 +08:00
a9488a655a bug fix 2026-03-13 14:17:28 +08:00
eeadd4c825 update nginx.conf 2026-03-13 14:03:56 +08:00
42d8b8e8e1 bug fixes 2026-03-13 13:57:00 +08:00
da6cc1e18b deepagent backend configurable 2026-03-13 13:56:24 +08:00
691ef1c77d not edit my computer 2026-03-13 13:44:58 +08:00
9f9813f01d add deep agent image 2026-03-13 11:17:31 +08:00
f9f3651c7e fix docker 2026-03-12 20:37:17 +08:00
87407e1656 add chinese mirror 2026-03-12 20:33:11 +08:00
2c226e2df2 fix? 2026-03-12 17:20:32 +08:00
8b2a506177 fixes? 2026-03-12 17:18:23 +08:00
a26cda2f04 add dockerfile.frontend 2026-03-12 16:51:48 +08:00
5e2a86e3be update install 2026-03-12 16:50:26 +08:00
2c7d5ea589 frontend in docker 2026-03-12 16:40:51 +08:00
36e3b40936 build frontend 2026-03-12 16:36:35 +08:00
86f6e4f81c change port num 2026-03-12 16:11:23 +08:00
60f3029e54 update sht 2026-03-12 11:37:08 +08:00
fe7ff9a516 fix tool initialization failure 2026-03-12 11:36:37 +08:00
33faedc1b1 enable comma in tool list 2026-03-12 11:36:11 +08:00
c9b1c5cb32 support both docker and local dev 2026-03-11 22:31:37 +08:00
8f99d47af9 update readme 2026-03-10 11:42:20 +08:00
1d36f196ca readme 2026-03-10 11:24:25 +08:00
9f40ef93b7 track nginx.conf 2026-03-10 11:11:18 +08:00
fda3e86a71 update pyproject.toml 2026-03-10 11:08:33 +08:00
d5303ad201 add network configs 2026-03-10 11:08:02 +08:00
245be63f07 donno what changed 2026-03-10 11:07:51 +08:00
068df7715e grant permissions to app users 2026-03-10 11:07:30 +08:00
95498180a2 update deepagent requirement 2026-03-10 10:21:27 +08:00
7b6ba79417 download_images.sh 2026-03-08 12:27:21 +08:00
a228325d74 update path 2026-03-08 12:17:32 +08:00
e59ae66b85 update path 2026-03-08 12:17:24 +08:00
e524be15e5 add database initialization scripts 2026-03-08 12:17:10 +08:00
2d90a4caac update path 2026-03-08 12:15:03 +08:00
4e8c2b4f5d installation shell script 2026-03-08 12:12:20 +08:00
0ddeb91755 moved shell script 2026-03-08 12:12:13 +08:00
cb3b98db10 moved docker files 2026-03-08 12:03:58 +08:00
487562042f add comments 2026-03-08 12:00:04 +08:00
ac46518cf5 moved docker 2026-03-07 15:00:36 +08:00
c03fb17a5c update default prompt 2026-03-07 14:54:48 +08:00
e049e4e104 sanity check for shell configs 2026-03-07 14:52:19 +08:00
8d0cd7861b update tests 2026-03-07 14:52:00 +08:00
7920a1bab3 init deepagent config 2026-03-07 14:51:49 +08:00
30e93ed0a7 update prompt store retrieval 2026-03-07 14:51:20 +08:00
0087ac41e6 build_deepagent in build_server_utils 2026-03-07 14:51:04 +08:00
5358b46113 remove override support 2026-03-07 14:50:48 +08:00
3306da038d front_api update 2026-03-07 14:50:25 +08:00
ac99dfd56b front end update:
- chat support markdown
- stop button for chat
- configurable for deepagent
2026-03-07 14:50:01 +08:00
3932d695bf update comments 2026-03-06 18:38:07 +08:00
9ed141ba42 update tests 2026-03-06 18:37:59 +08:00
9b3db40b94 enable simple chat 2026-03-06 15:19:51 +08:00
da17f2b319 show inference info 2026-03-06 14:48:00 +08:00
4b6e97d8fb get_pipeline_conversation_messages 2026-03-06 14:47:41 +08:00
112af37151 chat dashscope 2026-03-06 13:51:46 +08:00
3cd46030ad support markdown visualization 2026-03-06 13:43:09 +08:00
dd842fca42 update tests 2026-03-06 13:19:26 +08:00
fc9f0f929d css update 2026-03-06 13:18:31 +08:00
28d99f4b8d save pipeline_id 2026-03-06 13:17:04 +08:00
dac067b6fe list_pipeline_conversations front_api 2026-03-06 13:16:53 +08:00
e90f0afabe ts 2026-03-06 13:16:24 +08:00
0676a68c9e crash the fking thing 2026-03-06 13:16:08 +08:00
f185b70d3f chat convo tab + save yaml 2026-03-06 13:15:51 +08:00
07149e426e moved constant 2026-03-06 11:36:07 +08:00
3fc3d7288c react front end show available tools from mcp 2026-03-05 19:32:46 +08:00
eb7e85e4e6 front_api list_mcp_available_tools 2026-03-05 19:31:41 +08:00
ddfda10700 aget_tool_with_error in client_tool_manager 2026-03-05 19:31:21 +08:00
f8364bea68 remove full path 2026-03-05 17:46:47 +08:00
01b0975abd update how mcp is configured 2026-03-05 17:44:25 +08:00
7e23d5c056 yaml to sql migration script 2026-03-05 17:17:10 +08:00
3b730798f8 yml to yaml 2026-03-05 17:15:20 +08:00
2781172724 use yaml instead of yml 2026-03-05 15:51:59 +08:00
26fba706f2 graph_id button bug fix 2026-03-05 15:31:58 +08:00
ae93ef37b6 update UI 2026-03-05 15:19:10 +08:00
c1b782c6b4 update registry 2026-03-05 15:05:10 +08:00
ab3285a4cf remove gitignore 2026-03-05 15:04:43 +08:00
0484343021 fixed combined.py disc 2026-03-05 14:55:08 +08:00
b87fded473 combined.py 2026-03-05 14:49:47 +08:00
8db22abf3b moved files 2026-03-05 14:48:36 +08:00
f6d86f24bb tests 2026-03-05 14:43:17 +08:00
c1afebd7ba make it an importable package 2026-03-05 14:43:05 +08:00
080631af31 update tests 2026-03-05 14:42:55 +08:00
38b0d5df15 change default port 2026-03-05 14:42:14 +08:00
f7937c3744 print which back end port it is connecting to 2026-03-05 11:51:31 +08:00
867acaf717 combine both front_apis and server_dashscope into one backend 2026-03-05 11:47:30 +08:00
a2890148f9 make this importable without tyro fking around 2026-03-05 11:43:16 +08:00
55b37cc611 change default port 2026-03-05 11:25:08 +08:00
c85598418d reload msg 2026-03-05 11:24:54 +08:00
ea605e19aa check for registry update 2026-03-05 11:24:29 +08:00
866edc319f start port at 8500 as default 2026-03-05 11:23:58 +08:00
8c6dd3344f dynamic updates of pipelines from updating pipeline_registry 2026-03-05 11:23:37 +08:00
60 changed files with 6414 additions and 357 deletions

.gitignore (vendored, 3 changed lines)

@@ -11,4 +11,5 @@ django.log
.env
frontend/node_modules/
frontend/dist/
frontend/.vite


@@ -2,6 +2,10 @@
This is a FastAPI-based chat API service; it accepts OpenAI-format requests and uses them to call the `pipeline.invoke` method for chat.
## Docker Installation
For production deployment using Docker, see the [Installation Guide](README_INSTALL.md).
## Install dependencies
```bash
@@ -140,15 +144,35 @@ npm install
### Start the `front_apis` server
The frontend talks to the `front_apis` FastAPI service, which by default listens on `http://127.0.0.1:8001`.
The frontend talks to the `front_apis` FastAPI service, which by default listens on `http://127.0.0.1:8500`.
From the project root:
```bash
uvicorn fastapi_server.front_apis:app --reload --host 0.0.0.0 --port 8001
uvicorn fastapi_server.front_apis:app --reload --host 0.0.0.0 --port 8500
```
You can change the URL by setting `VITE_FRONT_API_BASE_URL` in `frontend/.env` (defaults to `http://127.0.0.1:8001`).
Or run directly:
```bash
python fastapi_server/front_apis.py
```
### Backend run modes
Run whichever backend mode you need from the project root:
```bash
# admin/control plane only (/v1/... frontend APIs)
uvicorn fastapi_server.front_apis:app --reload --host 0.0.0.0 --port 8500
# DashScope chat runtime only (/apps/... and /v1/apps/... APIs)
uvicorn fastapi_server.server_dashscope:app --reload --host 0.0.0.0 --port 8588
# combined mode: one process serves both front_apis + DashScope endpoints
uvicorn fastapi_server.combined:app --reload --host 0.0.0.0 --port 8500
```
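Whichever mode you start, a quick smoke test confirms the server is up. A minimal sketch, assuming the `/health` route used by the Docker healthcheck is exposed in your chosen mode:
```bash
curl http://localhost:8500/health
# The front_apis root route returns a JSON index of the /v1 endpoints:
curl http://localhost:8500/
```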
You can change the URL by setting `VITE_FRONT_API_BASE_URL` in `frontend/.env` (defaults to `/`, i.e. same-origin).
### Start the development server

README_INSTALL.md (new file, 267 lines)

@@ -0,0 +1,267 @@
# Installation Guide
This guide explains how to install and run the LangChain Agent application using Docker.
## Prerequisites
- Docker (version 20.10 or later)
- Docker Compose (version 2.0 or later, or use `docker compose` command)
## Quick Start
1. **Run the installation script:**
```bash
./scripts/shell_scripts/install.sh
```
This script will:
- Check for required tools (Docker, docker-compose)
- Create a `.env` file with default configuration
- Build Docker images (or use pre-loaded images)
- Start all services (PostgreSQL, Backend API, Nginx)
2. **Access the application:**
- Frontend: http://localhost (or http://localhost:80)
- Backend API: http://localhost:8500
- Database: localhost:5432
## Installation for China / Offline Use
If Docker Hub is slow or inaccessible in your region:
### Option 1: Use Chinese Docker Mirrors
Configure Docker to use Chinese registry mirrors:
```bash
sudo tee /etc/docker/daemon.json <<EOF
{
"registry-mirrors": [
"https://registry.docker-cn.com",
"https://mirror.ccsogou.com",
"https://docker.1ms.run"
]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker
```
Then run `./scripts/shell_scripts/install.sh`.
### Configuring `.env` and `frontend/.env`
```bash
ALI_API_KEY="API_KEY_FOR_ALI_QWEN"
ALI_BASE_URL="https://dashscope.aliyuncs.com/compatible-mode/v1"
POSTGRES_ROOT_PASSWORD="ROOT_PASSWORD_FOR_DB_IN_CONN_STR - required for installation"
POSTGRES_PASSWORD="USER_PASSWORD_FOR_DB_CONN_STR - required for installation; must match the password used in CONN_STR"
CONN_STR="CONNECTION_STRING_TO_DATABASE" # DOCKER PASSWORD
FAST_AUTH_KEYS="API_KEY_FOR_OTHER_APPLICATIONS_TO_USE_BUILT_PIPELINE"
DAYTONA_API_KEY="DAYTONA_CONFIG - NOT REQUIRED"
```
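A filled-in sketch for reference (all values are placeholders; note that inside Docker the database host is the `postgres` service name, matching the compose file's `CONN_STR`):
```bash
ALI_API_KEY="sk-your-dashscope-key"        # placeholder
POSTGRES_ROOT_PASSWORD="postgres_root_password"
POSTGRES_PASSWORD="secure_password_123"
CONN_STR="postgresql://myapp_user:secure_password_123@postgres:5432/ai_conversations"
FAST_AUTH_KEYS="example-key-1"
```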
### Option 2: Pre-load Docker Images Offline
1. On a machine with good Docker Hub access, run:
```bash
./scripts/shell_scripts/download_images.sh
```
This creates `images.tar` with all required images.
2. Transfer `images.tar` to your target machine.
3. Load the images:
```bash
docker load < images.tar
```
4. Run the install script:
```bash
./scripts/shell_scripts/install.sh
```
## Manual Installation
If you prefer to set up manually:
1. **Create environment file:**
```bash
cp .env.example .env # Edit as needed
```
2. **Build and start services:**
```bash
cd docker
docker compose -f docker-compose.prod.yml up -d --build
```
3. **Check service status:**
```bash
cd docker
docker compose -f docker-compose.prod.yml ps
```
## Configuration
Edit the `.env` file to customize:
- `POSTGRES_DB`: Database name (default: `ai_conversations`)
- `POSTGRES_USER`: Database user (default: `myapp_user`)
- `POSTGRES_PASSWORD`: Database password (default: `secure_password_123`)
- `POSTGRES_PORT`: PostgreSQL port (default: `5432`)
- `BACKEND_PORT`: Backend API port (default: `8500`)
- `FRONTEND_PORT`: Frontend web server port (default: `80`)
## Database Initialization
The database is automatically initialized when the PostgreSQL container starts for the first time. The following SQL scripts are executed in order:
1. `scripts/init_database/00_init_user.sh` - Creates database user and database
2. `scripts/init_database/create_conv_store.sql` - Creates conversation storage tables
3. `scripts/init_database/create_prompt_config.sql` - Creates prompt configuration tables
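To confirm the initialization ran, you can list the tables in the app database. A sketch, assuming the default `POSTGRES_USER` and `POSTGRES_DB` values:
```bash
cd docker
docker compose -f docker-compose.prod.yml exec postgres \
  psql -U myapp_user -d ai_conversations -c '\dt'
```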
## Service Management
All commands run from the `docker/` directory:
### View logs:
```bash
cd docker
docker compose -f docker-compose.prod.yml logs -f
# Specific service
docker compose -f docker-compose.prod.yml logs -f backend
docker compose -f docker-compose.prod.yml logs -f postgres
docker compose -f docker-compose.prod.yml logs -f nginx
```
### Stop services:
```bash
cd docker
docker compose -f docker-compose.prod.yml down
```
### Restart services:
```bash
cd docker
docker compose -f docker-compose.prod.yml restart
```
### Rebuild after code changes:
```bash
cd docker
docker compose -f docker-compose.prod.yml up -d --build
```
### Reset database (delete all data):
```bash
cd docker
docker compose -f docker-compose.prod.yml down -v
docker compose -f docker-compose.prod.yml up -d
```
## Architecture
The application consists of three main services:
1. **PostgreSQL** (`postgres`): Database server
- Stores conversations and prompt configurations
- Automatically initializes schema on first run
2. **Backend** (`backend`): FastAPI application
- Serves API endpoints at port 8500
- Handles agent management and chat endpoints
- Connects to PostgreSQL database
3. **Nginx** (`nginx`): Web server
- Serves the React frontend (port 80)
- Proxies API requests to the backend
- Handles static file serving
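The API is therefore reachable both directly and through the proxy. A quick check, assuming the default `BACKEND_PORT=8500` and `FRONTEND_PORT=80`:
```bash
curl http://localhost:8500/v1/pipelines   # direct to the backend
curl http://localhost/v1/pipelines        # proxied through nginx
```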
## Project Structure
```
langchain-agent/
├── docker/
│ ├── docker-compose.prod.yml # Production compose file
│ └── Dockerfile.prod # Backend Docker image
├── scripts/
│ ├── shell_scripts/
│ │ ├── install.sh # Main installation script
│ │ └── download_images.sh # For offline image download
│ └── init_database/ # Database initialization scripts
├── frontend/ # React frontend
├── configs/ # Pipeline configurations
├── nginx.conf # Nginx configuration
└── .env # Environment variables
```
## Troubleshooting
### Database connection issues
If the backend can't connect to the database:
1. Check that PostgreSQL is running:
```bash
docker compose -f docker-compose.prod.yml ps postgres
```
2. Verify the connection string in `.env` matches the database configuration
3. Check backend logs:
```bash
docker compose -f docker-compose.prod.yml logs backend
```
### Frontend not loading / NetworkError
1. Check nginx logs:
```bash
docker compose -f docker-compose.prod.yml logs nginx
```
2. Ensure frontend is built with correct API base URL. The `frontend/.env` file should contain:
```
VITE_FRONT_API_BASE_URL=/
```
Then rebuild: `docker compose -f docker-compose.prod.yml build backend`
### Port conflicts
If ports are already in use, update the port mappings in `.env`:
```bash
# Example: use port 5433 for PostgreSQL
POSTGRES_PORT=5433
```
## Development
For development, you may want to run services separately:
1. Start only PostgreSQL:
```bash
cd docker
docker compose -f docker-compose.prod.yml up -d postgres
```
2. Run backend locally:
```bash
export CONN_STR="postgresql://myapp_user:secure_password_123@localhost:5432/ai_conversations"
python -m uvicorn lang_agent.fastapi_server.combined:app --reload --host 0.0.0.0 --port 8500
```
3. Run frontend locally:
```bash
cd frontend
npm install
npm run dev
```
Note: For local frontend development, create a `.env` file in `frontend/` with:
```
VITE_FRONT_API_BASE_URL=http://localhost:8500
```


@@ -1,24 +1,31 @@
 {
   "pipelines": {
     "xiaozhan": {
       "enabled": true,
-      "config_file": "configs/pipelines/xiaozhan.yaml"
+      "config_file": "configs/pipelines/xiaozhan.yaml",
+      "graph_id": "routing",
+      "overrides": {
+        "llm_name": "qwen-plus"
+      }
     },
     "blueberry": {
       "enabled": true,
-      "config_file": "configs/pipelines/blueberry.yaml"
+      "config_file": "configs/pipelines/blueberry.yaml",
+      "graph_id": "react",
+      "overrides": {
+        "llm_name": "qwen-plus"
+      }
     }
   },
   "api_keys": {
-    "sk-6c7091e6a95f404efb2ec30e8f51b897626d670375cdf822d78262f24ab12367": {
+    "example-key-1": {
       "default_route_id": "default",
       "allowed_route_ids": [
         "xiaozhan",
         "blueberry"
       ]
     }
   }
 }


@@ -1,3 +1,7 @@
you are a helpful bot enhanced with skills.
You are a helpful bot enhanced with skills.
To use a skill, read its SKILL.md file using the read_file tool. Skills are NOT tools — they are instructions for using existing tools.
Skills with available="false" need dependencies installed first - you can try installing them with apt/brew. You can check whether the environment has the packages you need.
When using a skill, assume required tools (e.g., npx, curl) are available and execute the commands directly. If a command fails because a tool is missing, install the missing dependency using apt/brew and retry.
For shell commands (e.g., npx, curl), use the execute tool to run them.


@@ -0,0 +1,19 @@
FROM node:20-alpine
WORKDIR /app
RUN npm config set registry https://registry.npmmirror.com
# Build-time API base for Vite (must be set before npm run build).
ARG VITE_FRONT_API_BASE_URL=/
ENV VITE_FRONT_API_BASE_URL=${VITE_FRONT_API_BASE_URL}
COPY package*.json ./
RUN npm install
COPY . .
RUN npm run build && \
mkdir -p /opt/frontend_dist && \
cp -r dist/. /opt/frontend_dist/
CMD ["sh", "-c", "rm -rf /app/dist/* && cp -r /opt/frontend_dist/. /app/dist && ls /app/dist"]
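For reference, the image can also be built outside compose; a sketch mirroring the compose `frontend` service's build argument, run from the repository root:
```bash
docker build -f docker/Dockerfile.frontend \
  --build-arg VITE_FRONT_API_BASE_URL=/ \
  -t langchain-agent-frontend frontend/
```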

docker/Dockerfile.prod (new file, 93 lines)

@@ -0,0 +1,93 @@
# Multi-stage Dockerfile for production deployment
# Stage 1: Build frontend
FROM node:20-alpine AS frontend-builder
WORKDIR /app/frontend
# Copy frontend files
COPY frontend/package*.json ./
RUN npm ci
COPY frontend/ ./
RUN npm run build
# Stage 2: Python backend
FROM python:3.12-slim
WORKDIR /app
# Install system dependencies
RUN set -eux; \
for source_file in /etc/apt/sources.list /etc/apt/sources.list.d/*.list /etc/apt/sources.list.d/*.sources; do \
if [ -f "$source_file" ]; then \
sed -i 's|deb.debian.org|mirrors.aliyun.com|g' "$source_file"; \
sed -i 's|security.debian.org|mirrors.aliyun.com|g' "$source_file"; \
fi; \
done; \
apt-get update; \
apt-get install -y --no-install-recommends \
postgresql-client \
curl; \
rm -rf /var/lib/apt/lists/*
# Copy Python dependencies
COPY pyproject.toml ./
RUN pip install --no-cache-dir --upgrade pip -i https://pypi.tuna.tsinghua.edu.cn/simple && \
pip install --no-cache-dir -e . -i https://pypi.tuna.tsinghua.edu.cn/simple
# Copy application code
COPY lang_agent/ ./lang_agent/
COPY configs/ ./configs/
COPY scripts/ ./scripts/
COPY assets/ ./assets/
COPY static/ ./static/
# Copy built frontend from stage 1
COPY --from=frontend-builder /app/frontend/dist ./frontend/dist
# Set environment variables
ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1
# Expose port
EXPOSE 8500
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
CMD curl -f http://localhost:8500/health || exit 1
# Create entrypoint script to wait for DB
# Uses Python to check database connection (more reliable than psql)
RUN echo '#!/bin/bash\n\
set -e\n\
echo "Waiting for database to be ready..."\n\
python3 << EOF\n\
import sys\n\
import time\n\
import psycopg\n\
\n\
max_attempts = 30\n\
conn_str = "${CONN_STR}"\n\
\n\
for i in range(max_attempts):\n\
try:\n\
with psycopg.connect(conn_str, connect_timeout=2) as conn:\n\
with conn.cursor() as cur:\n\
cur.execute("SELECT 1")\n\
print("Database is ready!")\n\
sys.exit(0)\n\
except Exception as e:\n\
if i == max_attempts - 1:\n\
print(f"Warning: Database not ready after {max_attempts * 2} seconds, continuing anyway...")\n\
print(f"Error: {e}")\n\
sys.exit(0)\n\
print(f"Database is unavailable - sleeping (attempt {i+1}/{max_attempts})")\n\
time.sleep(2)\n\
EOF\n\
exec "$@"' > /entrypoint.sh && chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
# Run the combined server
CMD ["python", "-m", "uvicorn", "lang_agent.fastapi_server.combined:app", "--host", "0.0.0.0", "--port", "8500"]


@@ -0,0 +1,100 @@
version: '3.8'
services:
# PostgreSQL database
postgres:
image: postgres:16-alpine
container_name: langchain-agent-db
networks:
- app-network
environment:
POSTGRES_DB: postgres
POSTGRES_USER: postgres
POSTGRES_PASSWORD: ${POSTGRES_ROOT_PASSWORD:-postgres_root_password}
# These are used by init scripts to create the app database
APP_DB_NAME: ${POSTGRES_DB:-ai_conversations}
APP_DB_USER: ${POSTGRES_USER:-myapp_user}
APP_DB_PASSWORD: ${POSTGRES_PASSWORD:-secure_password_123}
volumes:
- postgres_data:/var/lib/postgresql/data
- ../scripts/init_database:/docker-entrypoint-initdb.d
ports:
- "${POSTGRES_PORT:-5434}:5432"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 10s
timeout: 5s
retries: 5
restart: no #unless-stopped
# Backend API server
backend:
build:
context: ..
dockerfile: docker/Dockerfile.prod
container_name: langchain-agent-backend
environment:
- PYTHONPATH=/app
- PYTHONUNBUFFERED=1
- CONN_STR=postgresql://${POSTGRES_USER:-myapp_user}:${POSTGRES_PASSWORD:-secure_password_123}@postgres:5432/${POSTGRES_DB:-ai_conversations}
- POSTGRES_USER=${POSTGRES_USER:-myapp_user}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-secure_password_123}
- POSTGRES_DB=${POSTGRES_DB:-ai_conversations}
ports:
- "${BACKEND_PORT:-8500}:8500"
volumes:
- ../configs:/app/configs
- ../scripts:/app/scripts
- ../assets:/app/assets
- ../static:/app/static
networks:
- app-network
depends_on:
postgres:
condition: service_healthy
restart: no #unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8500/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
# Frontend build service
frontend:
build:
context: ../frontend
dockerfile: ../docker/Dockerfile.frontend
args:
VITE_FRONT_API_BASE_URL: ${VITE_FRONT_API_BASE_URL:-/}
volumes:
- frontend_dist:/app/dist
networks:
- app-network
# Nginx for serving frontend (optional - can also serve via FastAPI)
nginx:
image: nginx:alpine
container_name: langchain-agent-nginx
networks:
- app-network
ports:
- "${FRONTEND_PORT:-8080}:80"
volumes:
- ../nginx.conf:/etc/nginx/nginx.conf:ro
- frontend_dist:/usr/share/nginx/html:ro
depends_on:
frontend:
condition: service_completed_successfully
backend:
condition: service_started
restart: no #unless-stopped
volumes:
postgres_data:
frontend_dist:
networks:
app-network:
driver: bridge

Binary image file added (28 KiB); diff not shown.

File diff suppressed because it is too large.


@@ -11,7 +11,9 @@
},
"dependencies": {
"react": "^18.3.1",
"react-dom": "^18.3.1"
"react-dom": "^18.3.1",
"react-markdown": "^10.1.0",
"remark-gfm": "^4.0.1"
},
"devDependencies": {
"@types/react": "^18.3.20",

File diff suppressed because it is too large.


@@ -0,0 +1,22 @@
import { describe, expect, it } from "vitest";
import { joinApiUrl } from "./frontApis";
describe("joinApiUrl", () => {
it("keeps same-origin paths when base url is slash", () => {
expect(joinApiUrl("/", "/v1/pipelines")).toBe("/v1/pipelines");
});
it("joins absolute host and trims trailing slash", () => {
expect(joinApiUrl("http://127.0.0.1:8500/", "/v1/pipelines")).toBe(
"http://127.0.0.1:8500/v1/pipelines"
);
});
it("accepts path without leading slash", () => {
expect(joinApiUrl("http://127.0.0.1:8500", "v1/pipelines")).toBe(
"http://127.0.0.1:8500/v1/pipelines"
);
});
});


@@ -1,23 +1,48 @@
import type {
AvailableGraphsResponse,
ConversationListItem,
ConversationMessageItem,
GraphConfigListResponse,
GraphConfigReadResponse,
GraphConfigUpsertRequest,
GraphConfigUpsertResponse,
McpAvailableToolsResponse,
McpToolConfigResponse,
McpToolConfigUpdateRequest,
McpToolConfigUpdateResponse,
PipelineCreateRequest,
PipelineCreateResponse,
PipelineConversationListResponse,
PipelineConversationMessagesResponse,
PipelineListResponse,
PipelineStopResponse,
RuntimeAuthInfoResponse,
} from "../types";
const API_BASE_URL =
import.meta.env.VITE_FRONT_API_BASE_URL?.trim() || "http://127.0.0.1:8001";
const API_BASE_URL = import.meta.env.VITE_FRONT_API_BASE_URL?.trim() || "/";
export function joinApiUrl(baseUrl: string, path: string): string {
const normalizedPath = path.startsWith("/") ? path : `/${path}`;
const normalizedBase = baseUrl.trim();
// "/" is commonly used in Docker+nginx builds and should resolve as same-origin.
if (!normalizedBase || normalizedBase === "/") {
return normalizedPath;
}
return `${normalizedBase.replace(/\/+$/, "")}${normalizedPath}`;
}
// Log which backend the frontend is targeting on startup, with file + line hint.
// This runs once when the module is loaded.
// eslint-disable-next-line no-console
console.info(
`[frontend] Using FRONT_API_BASE_URL=${API_BASE_URL} (src/api/frontApis.ts:16)`
);
async function fetchJson<T>(path: string, init?: RequestInit): Promise<T> {
const response = await fetch(`${API_BASE_URL}${path}`, {
const url = joinApiUrl(API_BASE_URL, path);
const response = await fetch(url, {
headers: {
"Content-Type": "application/json",
...(init?.headers || {}),
@@ -36,7 +61,24 @@ async function fetchJson<T>(path: string, init?: RequestInit): Promise<T> {
}
throw new Error(message);
}
return (await response.json()) as T;
if (response.status === 204) {
return undefined as T;
}
const bodyText = await response.text();
if (!bodyText.trim()) {
return undefined as T;
}
try {
return JSON.parse(bodyText) as T;
} catch {
const preview = bodyText.slice(0, 160).replace(/\s+/g, " ").trim();
throw new Error(
`Expected JSON response from ${url}, but received non-JSON content: ${preview || "<empty>"}`
);
}
}
export function listAvailableGraphs(): Promise<AvailableGraphsResponse> {
@@ -107,6 +149,10 @@ export function updateMcpToolConfig(
});
}
export function listMcpAvailableTools(): Promise<McpAvailableToolsResponse> {
return fetchJson("/v1/tool-configs/mcp/tools");
}
export function createPipeline(
payload: PipelineCreateRequest
): Promise<PipelineCreateResponse> {
@@ -126,3 +172,138 @@ export function stopPipeline(pipelineId: string): Promise<PipelineStopResponse>
});
}
export function getRuntimeAuthInfo(): Promise<RuntimeAuthInfoResponse> {
return fetchJson("/v1/runtime-auth");
}
export async function listPipelineConversations(
pipelineId: string,
limit = 100
): Promise<ConversationListItem[]> {
const response = await fetchJson<PipelineConversationListResponse>(
`/v1/pipelines/${encodeURIComponent(pipelineId)}/conversations?limit=${limit}`
);
return response.items || [];
}
export async function getPipelineConversationMessages(
pipelineId: string,
conversationId: string
): Promise<ConversationMessageItem[]> {
const response = await fetchJson<PipelineConversationMessagesResponse>(
`/v1/pipelines/${encodeURIComponent(pipelineId)}/conversations/${encodeURIComponent(conversationId)}/messages`
);
return response.items || [];
}
type StreamAgentChatOptions = {
appId: string;
sessionId: string;
apiKey: string;
message: string;
onText: (text: string) => void;
signal?: AbortSignal;
};
function parseErrorDetail(payload: unknown): string | null {
if (!payload || typeof payload !== "object") {
return null;
}
const detail = (payload as { detail?: unknown }).detail;
return typeof detail === "string" && detail.trim() ? detail : null;
}
export async function streamAgentChatResponse(
options: StreamAgentChatOptions
): Promise<string> {
const { appId, sessionId, apiKey, message, onText, signal } = options;
const response = await fetch(
joinApiUrl(
API_BASE_URL,
`/v1/apps/${encodeURIComponent(appId)}/sessions/${encodeURIComponent(sessionId)}/responses`
),
{
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${apiKey}`,
},
body: JSON.stringify({
messages: [{ role: "user", content: message }],
stream: true,
}),
signal,
}
);
if (!response.ok) {
let messageText = `Request failed (${response.status})`;
try {
const payload = (await response.json()) as unknown;
const detail = parseErrorDetail(payload);
if (detail) {
messageText = detail;
}
} catch {
// Keep fallback status-based message.
}
throw new Error(messageText);
}
if (!response.body) {
throw new Error("Streaming response is not available.");
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffered = "";
let latestText = "";
while (true) {
if (signal?.aborted) {
reader.cancel();
throw new Error("Request cancelled");
}
const { value, done } = await reader.read();
if (done) {
break;
}
buffered += decoder.decode(value, { stream: true });
let splitIndex = buffered.indexOf("\n\n");
while (splitIndex >= 0) {
const eventBlock = buffered.slice(0, splitIndex);
buffered = buffered.slice(splitIndex + 2);
splitIndex = buffered.indexOf("\n\n");
const lines = eventBlock.split("\n");
for (const rawLine of lines) {
const line = rawLine.trim();
if (!line.startsWith("data:")) {
continue;
}
const payloadRaw = line.slice(5).trim();
if (!payloadRaw) {
continue;
}
let payload: unknown;
try {
payload = JSON.parse(payloadRaw);
} catch {
continue;
}
const nextText =
typeof (payload as { output?: { text?: unknown } })?.output?.text === "string"
? ((payload as { output: { text: string } }).output.text as string)
: "";
if (nextText !== latestText) {
latestText = nextText;
onText(latestText);
}
}
}
}
return latestText;
}
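For reference, the request this client issues can be reproduced from the shell. A hedged sketch, assuming a combined server on port 8500, the `xiaozhan` app from the registry, and a key present under its `api_keys`:
```bash
curl -N http://localhost:8500/v1/apps/xiaozhan/sessions/demo-session/responses \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer example-key-1" \
  -d '{"messages": [{"role": "user", "content": "hello"}], "stream": true}'
# The response is an SSE stream: event blocks separated by blank lines,
# each carrying a `data:` line whose JSON payload exposes output.text.
```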


@@ -65,6 +65,12 @@ button:disabled {
gap: 8px;
}
.agent-item-row {
display: grid;
gap: 6px;
grid-template-columns: 1fr auto;
}
.agent-item {
align-items: flex-start;
display: flex;
@@ -73,6 +79,33 @@ button:disabled {
width: 100%;
}
.agent-item-title {
align-items: center;
display: flex;
gap: 8px;
justify-content: space-between;
width: 100%;
}
.agent-status-pill {
border-radius: 999px;
font-size: 11px;
font-weight: 600;
padding: 2px 8px;
}
.agent-status-pill.running {
background: #dff7e7;
border: 1px solid #8cd3a1;
color: #1a6b35;
}
.agent-status-pill.stopped {
background: #f2f4f7;
border: 1px solid #d1d8e0;
color: #4a5565;
}
.agent-item.selected {
border-color: #4d7ef3;
background: #edf3ff;
@@ -82,6 +115,11 @@ button:disabled {
color: #5f6f82;
}
.agent-chat-button {
align-self: stretch;
min-width: 64px;
}
.content {
padding: 20px;
}
@@ -186,6 +224,7 @@ button:disabled {
}
.prompt-section,
.file-backend-config,
.run-info {
border: 1px solid #dbe2ea;
border-radius: 10px;
@@ -194,10 +233,37 @@ button:disabled {
}
.prompt-section h3,
.file-backend-config h3,
.run-info h3 {
margin-top: 0;
}
.run-info-header {
align-items: center;
display: flex;
justify-content: space-between;
gap: 8px;
}
.runtime-badge {
border-radius: 999px;
font-size: 12px;
font-weight: 600;
padding: 4px 10px;
}
.runtime-badge.running {
background: #dff7e7;
border: 1px solid #8cd3a1;
color: #1a6b35;
}
.runtime-badge.stopped {
background: #f2f4f7;
border: 1px solid #d1d8e0;
color: #4a5565;
}
.graph-arch-section {
border: 1px solid #dbe2ea;
border-radius: 10px;
@@ -229,6 +295,166 @@ button:disabled {
padding: 10px;
}
.run-card-columns {
display: grid;
gap: 12px;
grid-template-columns: minmax(280px, 1fr) minmax(420px, 1.3fr);
}
.run-card-left,
.run-card-right {
display: flex;
flex-direction: column;
gap: 6px;
}
.run-card-right {
border-left: 1px solid #dbe2ea;
min-width: 0;
padding-left: 12px;
}
.run-card-right code {
display: inline-block;
max-width: 100%;
overflow-x: auto;
vertical-align: middle;
white-space: nowrap;
}
.discussion-section {
background: #f7fbff;
border: 1px solid #d7e6f6;
border-radius: 10px;
padding: 12px;
}
.discussion-header {
align-items: center;
display: flex;
justify-content: space-between;
gap: 12px;
}
.discussion-header h3 {
margin: 0;
}
.discussion-layout {
display: grid;
gap: 12px;
grid-template-columns: minmax(260px, 320px) 1fr;
margin-top: 10px;
}
.discussion-list {
display: flex;
flex-direction: column;
gap: 8px;
max-height: 65vh;
overflow-y: auto;
}
.discussion-item {
align-items: flex-start;
display: flex;
flex-direction: column;
gap: 4px;
text-align: left;
width: 100%;
}
.discussion-item.selected {
background: #edf3ff;
border-color: #4d7ef3;
}
.discussion-item small {
color: #687788;
}
.discussion-thread {
border: 1px solid #d7e6f6;
border-radius: 10px;
display: flex;
flex-direction: column;
gap: 8px;
max-height: 65vh;
overflow-y: auto;
padding: 10px;
}
.discussion-message {
background: #fff;
border: 1px solid #dbe2ea;
border-radius: 8px;
padding: 8px;
}
.discussion-message.human {
border-left: 3px solid #4d7ef3;
}
.discussion-message.ai {
border-left: 3px solid #26a269;
}
.discussion-message.tool {
border-left: 3px solid #8e6bd8;
}
.discussion-message-meta {
align-items: baseline;
display: flex;
gap: 8px;
}
.discussion-message pre {
font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, monospace;
margin: 8px 0 0;
overflow-x: auto;
white-space: pre;
}
.discussion-message-markdown > :first-child {
margin-top: 0;
}
.discussion-message-markdown > :last-child {
margin-bottom: 0;
}
.discussion-message-markdown code {
background: #f3f5f8;
border-radius: 4px;
padding: 1px 4px;
}
.discussion-message-markdown pre code {
background: transparent;
padding: 0;
}
.discussion-message-markdown a {
color: #1a4fc5;
text-decoration: underline;
}
.discussion-message-markdown p,
.discussion-message-markdown ul,
.discussion-message-markdown ol,
.discussion-message-markdown blockquote,
.discussion-message-markdown table {
margin: 8px 0;
}
.discussion-message-markdown blockquote {
border-left: 3px solid #d0d8e2;
color: #4f5f73;
margin-left: 0;
padding-left: 10px;
}
.mcp-config-section {
background: #f7fbff;
border: 1px solid #d7e6f6;
@@ -258,8 +484,174 @@ button:disabled {
width: 100%;
}
.mcp-entry-list {
display: grid;
gap: 12px;
margin-top: 10px;
}
.mcp-tools-error {
color: #a33434;
margin: 6px 0 0 0;
}
.mcp-tools-inline {
background: #f8fbff;
border: 1px solid #d7e6f6;
border-radius: 8px;
margin: 0 0 10px 0;
padding: 8px;
}
.mcp-entry-card {
background: #fff;
border: 1px solid #d7e6f6;
border-radius: 10px;
padding: 10px;
}
.mcp-entry-header {
align-items: center;
display: flex;
justify-content: space-between;
gap: 10px;
margin-bottom: 10px;
}
.mcp-entry-grid {
display: grid;
gap: 10px;
grid-template-columns: repeat(2, minmax(200px, 1fr));
}
.mcp-entry-grid label {
display: flex;
flex-direction: column;
font-size: 14px;
gap: 6px;
}
.mcp-entry-grid input,
.mcp-entry-grid select {
border: 1px solid #c9d4e2;
border-radius: 8px;
font-size: 14px;
padding: 8px;
}
.mcp-entry-wide {
grid-column: 1 / -1;
}
.empty {
color: #687788;
margin: 6px 0;
}
.chat-modal-overlay {
align-items: center;
background: rgba(16, 24, 40, 0.45);
display: flex;
inset: 0;
justify-content: center;
position: fixed;
z-index: 20;
}
.chat-modal {
background: #fff;
border: 1px solid #d7e6f6;
border-radius: 12px;
display: grid;
gap: 10px;
max-height: 86vh;
max-width: 820px;
padding: 12px;
width: min(92vw, 820px);
}
.chat-modal-header {
align-items: center;
border-bottom: 1px solid #dbe2ea;
display: flex;
justify-content: space-between;
padding-bottom: 8px;
}
.chat-modal-header small {
color: #687788;
display: block;
margin-top: 2px;
}
.chat-modal-messages {
background: #f8fbff;
border: 1px solid #d7e6f6;
border-radius: 10px;
display: flex;
flex-direction: column;
gap: 8px;
max-height: 56vh;
overflow-y: auto;
padding: 10px;
}
.chat-modal-message {
background: #fff;
border: 1px solid #dbe2ea;
border-radius: 8px;
padding: 8px;
}
.chat-modal-message.user {
border-left: 3px solid #4d7ef3;
}
.chat-modal-message.assistant {
border-left: 3px solid #26a269;
}
.chat-modal-message.tool {
border-left: 3px solid #8e6bd8;
}
.chat-modal-message p {
margin: 6px 0 0 0;
white-space: pre-wrap;
}
.chat-modal-input {
display: grid;
gap: 8px;
grid-template-columns: 1fr auto;
align-items: start;
}
.chat-modal-input textarea {
border: 1px solid #c9d4e2;
border-radius: 8px;
font-size: 14px;
padding: 8px;
resize: vertical;
}
.chat-modal-actions {
display: flex;
gap: 8px;
height: 100%;
}
.chat-modal-actions button {
height: auto;
white-space: nowrap;
}
.chat-stop-button {
background-color: #dc3545;
color: white;
}
.chat-stop-button:hover {
background-color: #bb2d3b;
}


@@ -23,6 +23,7 @@ export type GraphConfigReadResponse = {
tool_keys: string[];
prompt_dict: Record<string, string>;
api_key: string;
graph_params?: Record<string, unknown>;
};
export type GraphConfigUpsertRequest = {
@@ -55,6 +56,7 @@ export type PipelineCreateRequest = {
api_key?: string;
llm_name: string;
enabled?: boolean;
graph_params?: Record<string, unknown>;
};
export type PipelineSpec = {
@@ -63,7 +65,6 @@ export type PipelineSpec = {
enabled: boolean;
config_file: string;
llm_name: string;
overrides: Record<string, unknown>;
};
export type PipelineCreateResponse = {
@@ -89,6 +90,38 @@ export type PipelineStopResponse = {
reload_required: boolean;
};
export type ConversationListItem = {
conversation_id: string;
pipeline_id: string;
message_count: number;
last_updated?: string | null;
};
export type PipelineConversationListResponse = {
pipeline_id: string;
items: ConversationListItem[];
count: number;
};
export type ConversationMessageItem = {
message_type: string;
content: string;
sequence_number: number;
created_at: string;
};
export type PipelineConversationMessagesResponse = {
pipeline_id: string;
conversation_id: string;
items: ConversationMessageItem[];
count: number;
};
export type RuntimeAuthInfoResponse = {
fast_api_key: string;
source: string;
};
export type McpToolConfigResponse = {
path: string;
raw_content: string;
@@ -105,3 +138,15 @@ export type McpToolConfigUpdateResponse = {
tool_keys: string[];
};
export type McpAvailableToolsResponse = {
available_tools: string[];
errors: string[];
servers: Record<
string,
{
tools: string[];
error?: string | null;
}
>;
};

File diff suppressed because one or more lines are too long


@@ -1 +1 @@
{"root":["./src/App.tsx","./src/activeConfigSelection.test.ts","./src/activeConfigSelection.ts","./src/main.tsx","./src/types.ts","./src/vite-env.d.ts","./src/api/frontApis.ts"],"version":"5.9.3"}
{"root":["./src/App.tsx","./src/activeConfigSelection.test.ts","./src/activeConfigSelection.ts","./src/main.tsx","./src/types.ts","./src/vite-env.d.ts","./src/api/frontApis.test.ts","./src/api/frontApis.ts"],"version":"5.9.3"}


@@ -4,5 +4,15 @@ export default defineConfig({
plugins: [react()],
server: {
port: 5173,
proxy: {
"/v1": {
target: "http://127.0.0.1:8500",
changeOrigin: true,
},
"/apps": {
target: "http://127.0.0.1:8500",
changeOrigin: true,
},
},
},
});


@@ -5,6 +5,16 @@ export default defineConfig({
plugins: [react()],
server: {
port: 5173,
proxy: {
"/v1": {
target: "http://127.0.0.1:8500",
changeOrigin: true,
},
"/apps": {
target: "http://127.0.0.1:8500",
changeOrigin: true,
},
},
},
});


@@ -251,32 +251,30 @@ class ClientToolManager:
def populate_module(self):
with open(self.config.mcp_config_f, "r") as f:
self.mcp_configs:dict = commentjson.load(f)
def _get_to_load_configs(self) -> dict:
if self.config.tool_keys is None:
return self.mcp_configs
if len(self.config.tool_keys) == 0:
logger.info("no tools will be loaded")
return {}
to_load_config = {}
for key in self.config.tool_keys:
val = self.mcp_configs.get(key)
if val is None:
logger.warning(f"{key} is not in mcp tools")
else:
to_load_config[key] = val
return to_load_config
async def aget_tools(self):
"""
Get tools from all configured MCP servers.
Handles connection failures gracefully by logging warnings and continuing.
"""
def get_to_load_configs() -> dict:
if self.config.tool_keys is None:
to_load_config = self.mcp_configs
else:
if len(self.config.tool_keys) == 0:
logger.info("no tools will be loaded")
return {}
to_load_config = {}
for key in self.config.tool_keys:
val = self.mcp_configs.get(key)
if val is None:
logger.warning(f"{key} is not in mcp tools")
else:
to_load_config[key] = val
return to_load_config
to_load_config = get_to_load_configs()
to_load_config = self._get_to_load_configs()
all_tools = []
for server_name, server_config in to_load_config.items():
try:
@@ -298,6 +296,78 @@ class ClientToolManager:
return all_tools
async def aget_tools_with_errors(self):
"""
Get tools and collect human-readable per-server errors.
Returns:
(all_tools, errors)
"""
to_load_config = self._get_to_load_configs()
all_tools = []
errors = []
for server_name, server_config in to_load_config.items():
try:
single_server_config = {server_name: server_config}
client = MultiServerMCPClient(single_server_config)
tools = await client.get_tools()
all_tools.extend(tools)
logger.info(
f"Successfully connected to MCP server '{server_name}', retrieved {len(tools)} tools"
)
except Exception as e:
url = (
server_config.get("url", "unknown URL")
if isinstance(server_config, dict)
else "unknown URL"
)
err_msg = (
f"{server_name} ({url}): {type(e).__name__}: {e}"
)
errors.append(err_msg)
logger.exception(
f"Failed to connect to MCP server '{server_name}' at {url}"
)
if hasattr(e, "exceptions"):
for nested_exc in e.exceptions:
errors.append(
f"{server_name} nested: {type(nested_exc).__name__}: {nested_exc}"
)
continue
return all_tools, errors
async def aget_tools_by_server(self) -> dict:
"""
Get MCP tools grouped by server name, including per-server error (if any).
Returns:
{
"server_name": {
"tools": ["tool_a", "tool_b"],
"error": "ExceptionType: message" | None,
},
...
}
"""
to_load_config = self._get_to_load_configs()
grouped = {}
for server_name, server_config in to_load_config.items():
grouped[server_name] = {"tools": [], "error": None}
try:
single_server_config = {server_name: server_config}
client = MultiServerMCPClient(single_server_config)
tools = await client.get_tools()
tool_names = sorted(
{
str(getattr(tool, "name", "")).strip()
for tool in tools
if str(getattr(tool, "name", "")).strip()
}
)
grouped[server_name] = {"tools": tool_names, "error": None}
except Exception as e:
grouped[server_name]["error"] = f"{type(e).__name__}: {e}"
logger.exception(f"Failed to connect to MCP server '{server_name}'")
return grouped
def get_tools(self):
try:
loop = asyncio.get_running_loop()


@@ -4,6 +4,7 @@ from pathlib import Path as FsPath
import os.path as osp
import json
import copy
from threading import RLock
from loguru import logger
from lang_agent.pipeline import Pipeline, PipelineConfig
@@ -20,6 +21,9 @@ class ServerPipelineManager:
self._api_key_policy: Dict[str, Dict[str, Any]] = {}
self._pipelines: Dict[str, Pipeline] = {}
self._pipeline_llm: Dict[str, str] = {}
self._registry_path: Optional[str] = None
self._registry_mtime_ns: Optional[int] = None
self._lock = RLock()
def _resolve_registry_path(self, registry_path: str) -> str:
path = FsPath(registry_path)
@@ -30,39 +34,102 @@ class ServerPipelineManager:
root = FsPath(__file__).resolve().parents[2]
return str((root / path).resolve())
def load_registry(self, registry_path: str) -> None:
abs_path = self._resolve_registry_path(registry_path)
if not osp.exists(abs_path):
raise ValueError(f"pipeline registry file not found: {abs_path}")
def _stat_registry_mtime_ns(self, abs_path: str) -> int:
return FsPath(abs_path).stat().st_mtime_ns
def _read_registry(self, abs_path: str) -> Dict[str, Any]:
with open(abs_path, "r", encoding="utf-8") as f:
registry: dict = json.load(f)
return json.load(f)
def _apply_registry(self, abs_path: str, registry: Dict[str, Any], mtime_ns: int) -> bool:
pipelines = registry.get("pipelines")
if pipelines is None:
if pipelines is None or not isinstance(pipelines, dict):
raise ValueError("`pipelines` in pipeline registry must be an object.")
self._pipeline_specs = {}
parsed_specs: Dict[str, Dict[str, Any]] = {}
for pipeline_id, spec in pipelines.items():
if not isinstance(spec, dict):
raise ValueError(
f"pipeline spec for `{pipeline_id}` must be an object."
)
self._pipeline_specs[pipeline_id] = {
parsed_specs[pipeline_id] = {
"enabled": bool(spec.get("enabled", True)),
"config_file": spec.get("config_file"),
"overrides": spec.get("overrides", {}),
"llm_name": spec.get("llm_name"),
}
if not self._pipeline_specs:
if not parsed_specs:
raise ValueError("pipeline registry must define at least one pipeline.")
api_key_policy = registry.get("api_keys", {})
if api_key_policy and not isinstance(api_key_policy, dict):
raise ValueError("`api_keys` in pipeline registry must be an object.")
self._api_key_policy = api_key_policy
logger.info(
f"loaded pipeline registry: {abs_path}, pipelines={list(self._pipeline_specs.keys())}"
with self._lock:
old_specs = self._pipeline_specs
old_policy = self._api_key_policy
old_mtime = self._registry_mtime_ns
removed = set(old_specs.keys()) - set(parsed_specs.keys())
added = set(parsed_specs.keys()) - set(old_specs.keys())
modified = {
pipeline_id
for pipeline_id in (set(old_specs.keys()) & set(parsed_specs.keys()))
if old_specs[pipeline_id] != parsed_specs[pipeline_id]
}
changed = bool(added or removed or modified or old_policy != api_key_policy)
# Drop stale cache entries for deleted/changed pipelines so future requests
# lazily rebuild from the refreshed registry spec.
for pipeline_id in (removed | modified):
self._pipelines.pop(pipeline_id, None)
self._pipeline_llm.pop(pipeline_id, None)
self._pipeline_specs = parsed_specs
self._api_key_policy = api_key_policy
self._registry_path = abs_path
self._registry_mtime_ns = mtime_ns
if changed:
logger.info(
"refreshed pipeline registry: {} | added={} modified={} removed={} mtime={}",
abs_path,
sorted(added),
sorted(modified),
sorted(removed),
mtime_ns,
)
elif old_mtime != mtime_ns:
logger.debug("pipeline registry mtime changed but specs were unchanged: {}", abs_path)
return changed
def load_registry(self, registry_path: str) -> None:
abs_path = self._resolve_registry_path(registry_path)
if not osp.exists(abs_path):
raise ValueError(f"pipeline registry file not found: {abs_path}")
registry = self._read_registry(abs_path)
mtime_ns = self._stat_registry_mtime_ns(abs_path)
self._apply_registry(abs_path=abs_path, registry=registry, mtime_ns=mtime_ns)
def refresh_registry_if_needed(
self, registry_path: Optional[str] = None, force: bool = False
) -> bool:
abs_path = (
self._resolve_registry_path(registry_path)
if registry_path
else self._registry_path
)
if not abs_path:
raise ValueError("registry path is not initialized")
if not osp.exists(abs_path):
raise ValueError(f"pipeline registry file not found: {abs_path}")
mtime_ns = self._stat_registry_mtime_ns(abs_path)
with self._lock:
if not force and self._registry_mtime_ns == mtime_ns:
return False
registry = self._read_registry(abs_path)
return self._apply_registry(abs_path=abs_path, registry=registry, mtime_ns=mtime_ns)
def _resolve_config_path(self, config_file: str) -> str:
path = FsPath(config_file)
@@ -85,32 +152,25 @@ class ServerPipelineManager:
)
config_file = spec.get("config_file")
overrides = spec.get("overrides", {})
registry_llm_name = spec.get("llm_name")
if config_file:
loaded_cfg = load_tyro_conf(self._resolve_config_path(config_file))
if hasattr(loaded_cfg, "setup"):
cfg = loaded_cfg
else:
logger.warning(
f"config_file for pipeline `{pipeline_id}` did not deserialize to config object; "
"falling back to default config and applying pipeline-level overrides."
raise ValueError(
"config_file for pipeline "
f"`{pipeline_id}` did not deserialize to a config object. "
"Rebuild the pipeline via /v1/pipelines to regenerate a "
"valid serialized PipelineConfig file."
)
cfg = copy.deepcopy(self.default_config)
else:
cfg = copy.deepcopy(self.default_config)
if not isinstance(overrides, dict):
raise ValueError(
f"pipeline `overrides` for `{pipeline_id}` must be an object."
)
for key, value in overrides.items():
if not hasattr(cfg, key):
raise ValueError(
f"unknown override field `{key}` for pipeline `{pipeline_id}`"
)
setattr(cfg, key, value)
if registry_llm_name is not None and hasattr(cfg, "llm_name"):
setattr(cfg, "llm_name", registry_llm_name)
p = cfg.setup()
llm_name = getattr(cfg, "llm_name", "unknown-model")
llm_name = str(getattr(cfg, "llm_name", registry_llm_name or "unknown-model"))
return p, llm_name
def _authorize(self, api_key: str, pipeline_id: str) -> None:
@@ -138,29 +198,33 @@ class ServerPipelineManager:
or app_id
)
if not pipeline_id:
key_policy = (
self._api_key_policy.get(api_key, {}) if self._api_key_policy else {}
)
pipeline_id = key_policy.get(
"default_pipeline_id", self.default_pipeline_id
)
with self._lock:
if not pipeline_id:
key_policy = (
self._api_key_policy.get(api_key, {}) if self._api_key_policy else {}
)
pipeline_id = key_policy.get(
"default_pipeline_id", self.default_pipeline_id
)
if pipeline_id not in self._pipeline_specs:
raise HTTPException(
status_code=404, detail=f"Unknown pipeline_id: {pipeline_id}"
)
if pipeline_id not in self._pipeline_specs:
raise HTTPException(
status_code=404, detail=f"Unknown pipeline_id: {pipeline_id}"
)
self._authorize(api_key, pipeline_id)
self._authorize(api_key, pipeline_id)
return pipeline_id
def get_pipeline(self, pipeline_id: str) -> Tuple[Pipeline, str]:
cached = self._pipelines.get(pipeline_id)
if cached is not None:
return cached, self._pipeline_llm[pipeline_id]
with self._lock:
cached = self._pipelines.get(pipeline_id)
if cached is not None:
return cached, self._pipeline_llm[pipeline_id]
pipeline_obj, llm_name = self._build_pipeline(pipeline_id)
self._pipelines[pipeline_id] = pipeline_obj
self._pipeline_llm[pipeline_id] = llm_name
# Build while holding the lock to avoid duplicate construction for
# the same pipeline on concurrent first requests.
pipeline_obj, llm_name = self._build_pipeline(pipeline_id)
self._pipelines[pipeline_id] = pipeline_obj
self._pipeline_llm[pipeline_id] = llm_name
logger.info(f"lazy-loaded pipeline_id={pipeline_id} model={llm_name}")
return pipeline_obj, llm_name
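Taken together, the mtime check and the cache invalidation in `_apply_registry` mean an on-disk registry edit takes effect without restarting the server. A shell sketch, assuming the registry lives at `configs/pipeline_registry.json` (the actual location comes from `PIPELINE_REGISTRY_PATH`) and that request handlers call `refresh_registry_if_needed()`:
```bash
# Disable one pipeline in place; the next request sees a new st_mtime_ns,
# re-reads the registry, and drops the cached Pipeline for the changed entry.
jq '.pipelines.blueberry.enabled = false' configs/pipeline_registry.json > /tmp/registry.json
mv /tmp/registry.json configs/pipeline_registry.json
curl http://localhost:8500/v1/pipelines
```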


@@ -4,6 +4,7 @@ from lang_agent.config.core_config import (
LLMKeyConfig,
LLMNodeConfig,
load_tyro_conf,
resolve_llm_api_key,
)
from lang_agent.config.constants import (
@@ -12,5 +13,7 @@ from lang_agent.config.constants import (
PIPELINE_REGISTRY_PATH,
VALID_API_KEYS,
API_KEY_HEADER,
API_KEY_HEADER_NO_ERROR
API_KEY_HEADER_NO_ERROR,
_PROJECT_ROOT,
TY_BUILD_SCRIPT,
)


@@ -15,3 +15,5 @@ API_KEY_HEADER = APIKeyHeader(name="Authorization", auto_error=True)
API_KEY_HEADER_NO_ERROR = APIKeyHeader(name="Authorization", auto_error=False)
VALID_API_KEYS = set(filter(None, os.environ.get("FAST_AUTH_KEYS", "").split(",")))
TY_BUILD_SCRIPT = osp.join(_PROJECT_ROOT, "lang_agent", "config", "ty_build_config.py")


@@ -10,6 +10,20 @@ from dotenv import load_dotenv
load_dotenv()
def resolve_llm_api_key(api_key: Optional[str]) -> Optional[str]:
"""Resolve the API key for OpenAI-compatible providers."""
if api_key not in (None, "", "wrong-key"):
resolved_key = api_key
else:
resolved_key = os.environ.get("ALI_API_KEY") or os.environ.get("OPENAI_API_KEY")
# Some OpenAI-compatible integrations still read OPENAI_API_KEY from env.
if resolved_key and not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = resolved_key
return resolved_key
## NOTE: base classes taken from nerfstudio
class PrintableConfig:
"""
@@ -99,12 +113,12 @@ class LLMKeyConfig(InstantiateConfig):
"""api key for llm"""
def __post_init__(self):
if self.api_key == "wrong-key" or self.api_key is None:
self.api_key = os.environ.get("ALI_API_KEY")
if self.api_key is None:
logger.error(f"no ALI_API_KEY provided for embedding")
else:
logger.info("ALI_API_KEY loaded from environ")
original_api_key = self.api_key
self.api_key = resolve_llm_api_key(self.api_key)
if self.api_key is None:
logger.error("no ALI_API_KEY or OPENAI_API_KEY provided for embedding")
elif original_api_key in (None, "", "wrong-key"):
logger.info("LLM API key loaded from environment")
@dataclass


@@ -96,7 +96,7 @@ class Evaluator:
df_m.to_csv(metric_f)
self.config.save_config(f"{head_path}-{n_exp}.yml")
self.config.save_config(f"{head_path}-{n_exp}.yaml")
def format_result_df(self, df:pd.DataFrame):


@@ -0,0 +1,33 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from lang_agent.fastapi_server.front_apis import app as front_app
from lang_agent.fastapi_server.server_dashscope import create_dashscope_router
app = FastAPI(
title="Combined Front + DashScope APIs",
description=(
"Single-process app exposing front_apis control endpoints and "
"DashScope-compatible chat endpoints."
),
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Keep existing /v1/... admin APIs unchanged.
app.include_router(front_app.router)
# Add DashScope endpoints at their existing URLs. We intentionally skip
# DashScope's root/health routes to avoid clashing with front_apis.
app.include_router(create_dashscope_router(include_meta_routes=False))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8500)


@@ -4,6 +4,7 @@ import os
import os.path as osp
import sys
import json
import psycopg
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
@@ -19,10 +20,15 @@ from lang_agent.config.constants import (
MCP_CONFIG_DEFAULT_CONTENT,
PIPELINE_REGISTRY_PATH,
)
from lang_agent.config.core_config import load_tyro_conf
from lang_agent.front_api.build_server_utils import (
GRAPH_BUILD_FNCS,
update_pipeline_registry,
)
from lang_agent.components.client_tool_manager import (
ClientToolManager,
ClientToolManagerConfig,
)
class GraphConfigUpsertRequest(BaseModel):
@@ -50,6 +56,7 @@ class GraphConfigReadResponse(BaseModel):
tool_keys: List[str]
prompt_dict: Dict[str, str]
api_key: str = Field(default="")
graph_params: Dict[str, Any] = Field(default_factory=dict)
class GraphConfigListItem(BaseModel):
@@ -80,6 +87,8 @@ class PipelineCreateRequest(BaseModel):
api_key: Optional[str] = Field(default=None)
llm_name: str = Field(default="qwen-plus")
enabled: bool = Field(default=True)
# Arbitrary per-graph options forwarded to the graph build function.
graph_params: Dict[str, Any] = Field(default_factory=dict)
class PipelineSpec(BaseModel):
@@ -88,7 +97,6 @@ class PipelineSpec(BaseModel):
enabled: bool
config_file: str
llm_name: str
overrides: Dict[str, Any] = Field(default_factory=dict)
class PipelineCreateResponse(BaseModel):
@@ -114,6 +122,38 @@ class PipelineStopResponse(BaseModel):
reload_required: bool
class ConversationListItem(BaseModel):
conversation_id: str
pipeline_id: str
message_count: int
last_updated: Optional[str] = Field(default=None)
class PipelineConversationListResponse(BaseModel):
pipeline_id: str
items: List[ConversationListItem]
count: int
class ConversationMessageItem(BaseModel):
message_type: str
content: str
sequence_number: int
created_at: str
class PipelineConversationMessagesResponse(BaseModel):
pipeline_id: str
conversation_id: str
items: List[ConversationMessageItem]
count: int
class RuntimeAuthInfoResponse(BaseModel):
fast_api_key: str
source: str
class ApiKeyPolicyItem(BaseModel):
api_key: str
default_pipeline_id: Optional[str] = Field(default=None)
@@ -154,6 +194,12 @@ class McpConfigUpdateResponse(BaseModel):
tool_keys: List[str]
class McpAvailableToolsResponse(BaseModel):
available_tools: List[str] = Field(default_factory=list)
errors: List[str] = Field(default_factory=list)
servers: Dict[str, Dict[str, Any]] = Field(default_factory=dict)
app = FastAPI(
title="Front APIs",
description="Manage graph configs and launch graph pipelines.",
@@ -190,11 +236,15 @@ async def root():
"/v1/pipelines (POST) - build config + upsert pipeline registry entry",
"/v1/pipelines (GET) - list registry pipeline specs",
"/v1/pipelines/{pipeline_id} (DELETE) - disable pipeline in registry",
"/v1/runtime-auth (GET) - show runtime FAST API key info",
"/v1/pipelines/{pipeline_id}/conversations (GET) - list pipeline conversations",
"/v1/pipelines/{pipeline_id}/conversations/{conversation_id}/messages (GET) - list messages in a conversation",
"/v1/pipelines/api-keys (GET) - list API key routing policies",
"/v1/pipelines/api-keys/{api_key} (PUT) - upsert API key routing policy",
"/v1/pipelines/api-keys/{api_key} (DELETE) - delete API key routing policy",
"/v1/tool-configs/mcp (GET)",
"/v1/tool-configs/mcp (PUT)",
"/v1/tool-configs/mcp/tools (GET)",
],
}
@@ -240,25 +290,118 @@ def _write_pipeline_registry(registry: Dict[str, Any]) -> None:
f.write("\n")
def _resolve_runtime_fast_api_key() -> RuntimeAuthInfoResponse:
"""Pick a runtime auth key from pipeline registry first, then FAST_AUTH_KEYS env."""
try:
registry = _read_pipeline_registry()
api_keys = registry.get("api_keys", {})
if isinstance(api_keys, dict):
for key in api_keys.keys():
candidate = str(key).strip()
if candidate:
return RuntimeAuthInfoResponse(
fast_api_key=candidate, source="pipeline_registry"
)
except Exception:
# fall back to env parsing below
pass
raw_env = os.environ.get("FAST_AUTH_KEYS", "")
for token in raw_env.split(","):
candidate = token.strip()
if candidate:
return RuntimeAuthInfoResponse(fast_api_key=candidate, source="env")
return RuntimeAuthInfoResponse(fast_api_key="", source="none")
def _normalize_pipeline_spec(pipeline_id: str, spec: Dict[str, Any]) -> PipelineSpec:
if not isinstance(spec, dict):
raise ValueError(f"pipeline spec for '{pipeline_id}' must be an object")
overrides = spec.get("overrides", {})
if overrides is None:
overrides = {}
if not isinstance(overrides, dict):
raise ValueError(f"`overrides` for pipeline '{pipeline_id}' must be an object")
llm_name = str(overrides.get("llm_name") or "unknown")
llm_name = str(spec.get("llm_name") or "unknown")
return PipelineSpec(
pipeline_id=pipeline_id,
graph_id=str(spec.get("graph_id") or pipeline_id),
enabled=bool(spec.get("enabled", True)),
config_file=str(spec.get("config_file") or ""),
llm_name=llm_name,
overrides=overrides,
)
def _resolve_config_path(config_file: str) -> str:
if osp.isabs(config_file):
return config_file
return osp.join(_PROJECT_ROOT, config_file)
def _normalize_deepagent_backend_name(file_backend_config: Any) -> Optional[str]:
if file_backend_config is None:
return None
type_names = {
type(file_backend_config).__name__.lower(),
getattr(getattr(file_backend_config, "_target", None), "__name__", "").lower(),
}
if any("statebk" in name for name in type_names):
return "state_bk"
if any("localshell" in name for name in type_names):
return "local_shell"
if any("daytona" in name for name in type_names):
return "daytona_sandbox"
return None
def _extract_graph_params_from_config(graph_id: Optional[str], loaded_cfg: Any) -> Dict[str, Any]:
if graph_id != "deepagent":
return {}
graph_config = getattr(loaded_cfg, "graph_config", None)
file_backend_config = getattr(graph_config, "file_backend_config", None)
if file_backend_config is None:
return {}
graph_params: Dict[str, Any] = {}
act_bkend = _normalize_deepagent_backend_name(file_backend_config)
if act_bkend:
graph_params["act_bkend"] = act_bkend
serialized_backend_config: Dict[str, Any] = {}
for key in ("skills_dir", "rt_skills_dir", "workspace_dir", "api_key"):
value = getattr(file_backend_config, key, None)
if value is not None:
serialized_backend_config[key] = value
if serialized_backend_config:
graph_params["file_backend_config"] = serialized_backend_config
return graph_params
def _load_graph_params_for_pipeline(
pipeline_id: str, graph_id: Optional[str]
) -> Dict[str, Any]:
try:
registry = _read_pipeline_registry()
pipeline_spec = registry.get("pipelines", {}).get(pipeline_id, {})
config_file = ""
if isinstance(pipeline_spec, dict):
config_file = str(pipeline_spec.get("config_file") or "").strip()
if not config_file:
fallback = osp.join(_PROJECT_ROOT, "configs", "pipelines", f"{pipeline_id}.yaml")
if osp.exists(fallback):
config_file = fallback
if not config_file:
return {}
config_path = _resolve_config_path(config_file)
if not osp.exists(config_path):
return {}
loaded_cfg = load_tyro_conf(config_path)
return _extract_graph_params_from_config(graph_id, loaded_cfg)
except Exception:
return {}
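# Illustrative return shape for a deepagent pipeline (example values, not
# taken from a real config):
# {"act_bkend": "local_shell",
#  "file_backend_config": {"skills_dir": "./assets/skills",
#                          "workspace_dir": "./workspace"}}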
def _normalize_api_key_policy(api_key: str, policy: Dict[str, Any]) -> ApiKeyPolicyItem:
if not isinstance(policy, dict):
raise ValueError(f"api key policy for '{api_key}' must be an object")
@@ -362,6 +505,9 @@ async def get_default_graph_config(pipeline_id: str):
tool_keys=tool_keys,
prompt_dict=prompt_dict,
api_key=(active.get("api_key") or ""),
graph_params=_load_graph_params_for_pipeline(
pipeline_id, active.get("graph_id")
),
)
@@ -400,6 +546,9 @@ async def get_graph_config(pipeline_id: str, prompt_set_id: str):
tool_keys=tool_keys,
prompt_dict=prompt_dict,
api_key=(meta.get("api_key") or ""),
graph_params=_load_graph_params_for_pipeline(
pipeline_id, meta.get("graph_id")
),
)
@@ -459,6 +608,38 @@ async def update_mcp_tool_config(body: McpConfigUpdateRequest):
)
@app.get("/v1/tool-configs/mcp/tools", response_model=McpAvailableToolsResponse)
async def list_mcp_available_tools():
try:
_read_mcp_config_raw()
manager = ClientToolManager(
ClientToolManagerConfig(mcp_config_f=MCP_CONFIG_PATH)
)
servers = await manager.aget_tools_by_server()
available_tools = sorted(
{
tool_name
for server_info in servers.values()
for tool_name in server_info.get("tools", [])
}
)
errors = [
f"{server_name}: {server_info.get('error')}"
for server_name, server_info in servers.items()
if server_info.get("error")
]
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
return McpAvailableToolsResponse(
available_tools=available_tools,
errors=errors,
servers=servers,
)
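# Example response body (illustrative server names and tools):
# {"available_tools": ["fetch", "search"],
#  "errors": ["weather: connection refused"],
#  "servers": {"search": {"tools": ["search", "fetch"]}}}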
@app.get("/v1/pipelines", response_model=PipelineListResponse)
async def list_running_pipelines():
try:
@@ -511,8 +692,9 @@ async def create_pipeline(body: PipelineCreateRequest):
),
)
config_file = f"configs/pipelines/{pipeline_id}.yml"
config_file = f"configs/pipelines/{pipeline_id}.yaml"
config_abs_dir = osp.join(_PROJECT_ROOT, "configs", "pipelines")
extra_params = dict(body.graph_params or {})
try:
build_fn(
pipeline_id=pipeline_id,
@@ -521,6 +703,7 @@ async def create_pipeline(body: PipelineCreateRequest):
api_key=resolved_api_key,
llm_name=body.llm_name,
pipeline_config_dir=config_abs_dir,
**extra_params,
)
update_pipeline_registry(
@@ -555,7 +738,7 @@ async def create_pipeline(body: PipelineCreateRequest):
config_file=normalized.config_file,
llm_name=normalized.llm_name,
enabled=normalized.enabled,
reload_required=False,
registry_path=PIPELINE_REGISTRY_PATH,
)
@@ -583,7 +766,129 @@ async def stop_pipeline(pipeline_id: str):
pipeline_id=pipeline_id,
status="disabled",
enabled=False,
reload_required=False,
)
@app.get("/v1/runtime-auth", response_model=RuntimeAuthInfoResponse)
async def get_runtime_auth_info():
return _resolve_runtime_fast_api_key()
@app.get(
"/v1/pipelines/{pipeline_id}/conversations",
response_model=PipelineConversationListResponse,
)
async def list_pipeline_conversations(pipeline_id: str, limit: int = 100):
if limit < 1 or limit > 500:
raise HTTPException(status_code=400, detail="limit must be between 1 and 500")
conn_str = os.environ.get("CONN_STR")
if not conn_str:
raise HTTPException(status_code=500, detail="CONN_STR not set")
try:
with psycopg.connect(conn_str) as conn:
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
cur.execute(
"""
SELECT
conversation_id,
pipeline_id,
COUNT(*) AS message_count,
MAX(created_at) AS last_updated
FROM messages
WHERE pipeline_id = %s
GROUP BY conversation_id, pipeline_id
ORDER BY last_updated DESC
LIMIT %s
""",
(pipeline_id, limit),
)
rows = cur.fetchall()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
items = [
ConversationListItem(
conversation_id=str(row["conversation_id"]),
pipeline_id=str(row["pipeline_id"]),
message_count=int(row["message_count"]),
last_updated=(
row["last_updated"].isoformat() if row.get("last_updated") else None
),
)
for row in rows
]
return PipelineConversationListResponse(
pipeline_id=pipeline_id, items=items, count=len(items)
)
@app.get(
"/v1/pipelines/{pipeline_id}/conversations/{conversation_id}/messages",
response_model=PipelineConversationMessagesResponse,
)
async def get_pipeline_conversation_messages(pipeline_id: str, conversation_id: str):
conn_str = os.environ.get("CONN_STR")
if not conn_str:
raise HTTPException(status_code=500, detail="CONN_STR not set")
try:
with psycopg.connect(conn_str) as conn:
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
cur.execute(
"""
SELECT 1
FROM messages
WHERE pipeline_id = %s AND conversation_id = %s
LIMIT 1
""",
(pipeline_id, conversation_id),
)
exists = cur.fetchone()
if exists is None:
raise HTTPException(
status_code=404,
detail=(
f"conversation_id '{conversation_id}' not found for "
f"pipeline '{pipeline_id}'"
),
)
cur.execute(
"""
SELECT
message_type,
content,
sequence_number,
created_at
FROM messages
WHERE pipeline_id = %s AND conversation_id = %s
ORDER BY sequence_number ASC
""",
(pipeline_id, conversation_id),
)
rows = cur.fetchall()
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
items = [
ConversationMessageItem(
message_type=str(row["message_type"]),
content=str(row["content"]),
sequence_number=int(row["sequence_number"]),
created_at=row["created_at"].isoformat() if row.get("created_at") else "",
)
for row in rows
]
return PipelineConversationMessagesResponse(
pipeline_id=pipeline_id,
conversation_id=conversation_id,
items=items,
count=len(items),
)
@@ -688,5 +993,15 @@ async def delete_pipeline_api_key_policy(api_key: str):
return ApiKeyPolicyDeleteResponse(
api_key=normalized_key,
status="deleted",
reload_required=False,
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"front_apis:app",
host="0.0.0.0",
port=8500,
reload=True,
)
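# Illustrative client usage (a sketch, not part of this module), assuming the
# server runs on localhost:8500 as configured above:
#
#   import requests
#   resp = requests.get(
#       "http://localhost:8500/v1/pipelines/blueberry/conversations",
#       params={"limit": 20},
#   )
#   for item in resp.json()["items"]:
#       print(item["conversation_id"], item["message_count"], item["last_updated"])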

View File

@@ -1,9 +1,8 @@
from fastapi import APIRouter, Depends, FastAPI, HTTPException, Path, Request, Security
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import APIKeyHeader
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field
from typing import Any, Dict, List, Optional
import os
import os.path as osp
import sys
@@ -20,16 +19,28 @@ from lang_agent.pipeline import PipelineConfig
from lang_agent.components.server_pipeline_manager import ServerPipelineManager
from lang_agent.config.constants import PIPELINE_REGISTRY_PATH, API_KEY_HEADER, VALID_API_KEYS
# Load base config for route-level overrides (pipelines are lazy-loaded from registry)
def _build_default_pipeline_config() -> PipelineConfig:
"""
Build import-time defaults without parsing CLI args.
This keeps module import safe for reuse by combined apps and tests.
"""
pipeline_config = PipelineConfig()
logger.info(f"starting agent with base pipeline config: \n{pipeline_config}")
return pipeline_config
def _build_pipeline_manager(base_config: PipelineConfig) -> ServerPipelineManager:
pipeline_manager = ServerPipelineManager(
default_pipeline_id=os.environ.get("FAST_DEFAULT_PIPELINE_ID", "default"),
default_config=base_config,
)
pipeline_manager.load_registry(PIPELINE_REGISTRY_PATH)
return pipeline_manager
pipeline_config = _build_default_pipeline_config()
PIPELINE_MANAGER = _build_pipeline_manager(pipeline_config)
async def verify_api_key(api_key: str = Security(API_KEY_HEADER)):
@@ -55,20 +66,6 @@ class DSApplicationCallRequest(BaseModel):
thread_id: Optional[str] = Field(default="3")
def sse_chunks_from_stream(
chunk_generator, response_id: str, model: str = "qwen-flash"
):
@@ -188,7 +185,14 @@ async def _process_dashscope_request(
app_id: Optional[str],
session_id: Optional[str],
api_key: str,
pipeline_manager: ServerPipelineManager,
):
try:
pipeline_manager.refresh_registry_if_needed()
except Exception as e:
logger.error(f"failed to refresh pipeline registry: {e}")
raise HTTPException(status_code=500, detail=f"Failed to refresh pipeline registry: {e}")
req_app_id = app_id or body.get("app_id")
body_input = body.get("input", {}) if isinstance(body.get("input"), dict) else {}
req_session_id = session_id or body_input.get("session_id")
@@ -201,10 +205,10 @@ async def _process_dashscope_request(
thread_id = body_input.get("session_id") or req_session_id or "3"
user_msg = _extract_user_message(messages)
pipeline_id = pipeline_manager.resolve_pipeline_id(
body=body, app_id=req_app_id, api_key=api_key
)
selected_pipeline, selected_model = pipeline_manager.get_pipeline(pipeline_id)
# Namespace thread ids to prevent memory collisions across pipelines.
thread_id = f"{pipeline_id}:{thread_id}"
@@ -245,76 +249,117 @@ async def _process_dashscope_request(
return JSONResponse(content=data)
@app.post("/v1/apps/{app_id}/sessions/{session_id}/responses")
@app.post("/api/v1/apps/{app_id}/sessions/{session_id}/responses")
async def application_responses(
request: Request,
app_id: str = Path(...),
session_id: str = Path(...),
api_key: str = Depends(verify_api_key),
):
try:
body = await request.json()
return await _process_dashscope_request(
body=body,
app_id=app_id,
session_id=session_id,
api_key=api_key,
def create_dashscope_router(
pipeline_manager: Optional[ServerPipelineManager] = None,
include_meta_routes: bool = True,
) -> APIRouter:
manager = pipeline_manager or PIPELINE_MANAGER
router = APIRouter()
@router.post("/v1/apps/{app_id}/sessions/{session_id}/responses")
@router.post("/api/v1/apps/{app_id}/sessions/{session_id}/responses")
async def application_responses(
request: Request,
app_id: str = Path(...),
session_id: str = Path(...),
api_key: str = Depends(verify_api_key),
):
try:
body = await request.json()
return await _process_dashscope_request(
body=body,
app_id=app_id,
session_id=session_id,
api_key=api_key,
pipeline_manager=manager,
)
except HTTPException:
raise
except Exception as e:
logger.error(f"DashScope-compatible endpoint error: {e}")
raise HTTPException(status_code=500, detail=str(e))
# Compatibility: some SDKs call /apps/{app_id}/completion without /v1 and
# without session in path.
@router.post("/apps/{app_id}/completion")
@router.post("/v1/apps/{app_id}/completion")
@router.post("/api/apps/{app_id}/completion")
@router.post("/api/v1/apps/{app_id}/completion")
async def application_completion(
request: Request,
app_id: str = Path(...),
api_key: str = Depends(verify_api_key),
):
try:
body = await request.json()
return await _process_dashscope_request(
body=body,
app_id=app_id,
session_id=None,
api_key=api_key,
pipeline_manager=manager,
)
except HTTPException:
raise
except Exception as e:
logger.error(f"DashScope-compatible completion error: {e}")
raise HTTPException(status_code=500, detail=str(e))
if include_meta_routes:
@router.get("/")
async def root():
return {
"message": "DashScope Application-compatible API",
"endpoints": [
"/v1/apps/{app_id}/sessions/{session_id}/responses",
"/health",
],
}
@router.get("/health")
async def health():
return {"status": "healthy"}
return router
def create_dashscope_app(
pipeline_manager: Optional[ServerPipelineManager] = None,
) -> FastAPI:
dashscope_app = FastAPI(
title="DashScope-Compatible Application API",
description="DashScope Application.call compatible endpoint backed by pipeline.chat",
)
dashscope_app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
dashscope_app.include_router(
create_dashscope_router(
pipeline_manager=pipeline_manager,
include_meta_routes=True,
)
)
return dashscope_app
dashscope_router = create_dashscope_router(include_meta_routes=False)
app = create_dashscope_app()
if __name__ == "__main__":
# CLI parsing is intentionally only in script mode to keep module import safe.
cli_pipeline_config = tyro.cli(PipelineConfig)
logger.info(f"starting agent with CLI pipeline config: \n{cli_pipeline_config}")
cli_pipeline_manager = _build_pipeline_manager(cli_pipeline_config)
uvicorn.run(
"server_dashscope:app",
host="0.0.0.0",
port=pipeline_config.port,
reload=True,
create_dashscope_app(pipeline_manager=cli_pipeline_manager),
host=cli_pipeline_config.host,
port=cli_pipeline_config.port,
reload=False,
)
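# Sketch (assumption): a combined service can reuse the factory above to mount
# the DashScope routes next to the front APIs on one FastAPI instance; the
# fastapi_server.combined module referenced by the tests presumably does
# something along these lines.
#
#   from fastapi import FastAPI
#   combined = FastAPI(title="Combined Front + DashScope API")
#   combined.include_router(create_dashscope_router(include_meta_routes=False))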

View File

@@ -1,19 +1,26 @@
from typing import Any, Dict, List, Literal, Optional
import os
import os.path as osp
import subprocess
import json
from lang_agent.config.core_config import load_tyro_conf
from lang_agent.config.constants import TY_BUILD_SCRIPT, _PROJECT_ROOT
_DEEP_AGENT_BACKEND_ALIASES = {
"state_bk": "statebk",
"statebk": "statebk",
"local_shell": "localshell",
"localshell": "localshell",
"daytona_sandbox": "daytonasandbox",
"daytonasandbox": "daytonasandbox",
}
def opt_to_config(save_path: str, *nargs):
os.makedirs(osp.dirname(save_path), exist_ok=True)
subprocess.run(
["python", _TY_BUILD_SCRIPT, "--save-path", save_path, *nargs],
["python", TY_BUILD_SCRIPT, "--save-path", save_path, *nargs],
check=True,
cwd=_PROJECT_ROOT,
)
@@ -22,7 +29,7 @@ def opt_to_config(save_path: str, *nargs):
def _build_and_load_pipeline_config(
pipeline_id: str, pipeline_config_dir: str, cmd: List[str]
):
save_config_f = osp.join(pipeline_config_dir, f"{pipeline_id}.yml")
save_config_f = osp.join(pipeline_config_dir, f"{pipeline_id}.yaml")
opt_to_config(save_config_f, *cmd)
# TODO: decide whether to return the built pipeline or just the config object for front_api
@@ -52,7 +59,7 @@ def update_pipeline_registry(
pipeline["enabled"] = bool(enabled)
pipeline["config_file"] = config_file
pipeline["graph_id"] = graph_id
pipeline["overrides"] = {"llm_name": llm_name}
pipeline["llm_name"] = llm_name
with open(registry_f, "w", encoding="utf-8") as f:
json.dump(registry, f, indent=4)
@@ -64,25 +71,46 @@ def build_route(
tool_keys: List[str],
api_key: str,
llm_name: str = "qwen-plus",
pipeline_config_dir="configs/pipelines",
pipeline_config_dir: str = "configs/pipelines",
**_: Any,
):
cmd_opt = [
"--pipeline.pipeline-id",
pipeline_id,
"--pipeline.llm-name",
llm_name,
"route", # ------------
"--llm-name", llm_name,
"--api-key", api_key,
"--pipeline-id", pipeline_id,
"--prompt-set-id", prompt_set,
"tool_node", # ------------
"--llm-name", llm_name,
"--api-key", api_key,
"--pipeline-id", pipeline_id,
"--prompt-set-id", prompt_set,
"--llm-name",
llm_name,
"--api-key",
api_key,
"--pipeline-id",
pipeline_id,
"--prompt-set-id",
prompt_set,
]
if tool_keys:
cmd_opt.extend(
["--tool-manager-config.client-tool-manager.tool-keys", *tool_keys]
)
# Tyro parses list options greedily across positional subcommands; repeat a
# parent-level option to terminate list parsing before `tool_node`.
cmd_opt.extend(["--pipeline-id", pipeline_id])
cmd_opt.extend(
[
"tool_node", # ------------
"--llm-name",
llm_name,
"--api-key",
api_key,
"--pipeline-id",
pipeline_id,
"--prompt-set-id",
prompt_set,
]
)
return _build_and_load_pipeline_config(pipeline_id, pipeline_config_dir, cmd_opt)
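# For illustration (hypothetical values), with tool_keys=["search", "fetch"]
# the assembled argv looks roughly like:
#   --pipeline.pipeline-id my_pipe --pipeline.llm-name qwen-plus
#   route --llm-name qwen-plus --api-key sk-... --pipeline-id my_pipe --prompt-set-id default
#   --tool-manager-config.client-tool-manager.tool-keys search fetch
#   --pipeline-id my_pipe
#   tool_node --llm-name qwen-plus --api-key sk-... --pipeline-id my_pipe --prompt-set-id default
# where the repeated --pipeline-id terminates the greedy tool-keys list before
# the tool_node subcommand, as noted above.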
@@ -93,15 +121,24 @@ def build_react(
tool_keys: List[str],
api_key: str,
llm_name: str = "qwen-plus",
pipeline_config_dir="configs/pipelines",
pipeline_config_dir: str = "configs/pipelines",
**_: Any,
):
cmd_opt = [
"--pipeline.pipeline-id",
pipeline_id,
"--pipeline.llm-name",
llm_name,
"react", # ------------
"--llm-name", llm_name,
"--api-key", api_key,
"--pipeline-id", pipeline_id,
"--prompt-set-id", prompt_set,
]
"--llm-name",
llm_name,
"--api-key",
api_key,
"--pipeline-id",
pipeline_id,
"--prompt-set-id",
prompt_set,
]
if tool_keys:
cmd_opt.extend(
["--tool-manager-config.client-tool-manager.tool-keys", *tool_keys]
@@ -110,8 +147,77 @@ def build_react(
return _build_and_load_pipeline_config(pipeline_id, pipeline_config_dir, cmd_opt)
def build_deep_agent(
pipeline_id: str,
prompt_set: str,
tool_keys: List[str],
api_key: str,
llm_name: str = "qwen-plus",
pipeline_config_dir: str = "configs/pipelines",
act_bkend: Literal[
"local_shell",
"localshell",
"state_bk",
"statebk",
"daytona_sandbox",
"daytonasandbox",
] = "state_bk",
file_backend_config: Optional[Dict[str, Any]] = None,
**_: Any,
):
backend_subcommand = _DEEP_AGENT_BACKEND_ALIASES.get(act_bkend)
if backend_subcommand is None:
raise ValueError(
"Unsupported deepagent backend "
f"'{act_bkend}'. Expected one of {sorted(_DEEP_AGENT_BACKEND_ALIASES.keys())}"
)
cmd_opt = [
"--pipeline.pipeline-id",
pipeline_id,
"--pipeline.llm-name",
llm_name,
"deepagent",
"--llm-name",
llm_name,
"--api-key",
api_key,
"--pipeline-id",
pipeline_id,
"--prompt-set-id",
prompt_set,
]
if tool_keys:
cmd_opt.extend(
["--tool-manager-config.client-tool-manager.tool-keys", *tool_keys]
)
cmd_opt.extend(["--pipeline-id", pipeline_id])
cmd_opt.append(backend_subcommand)
if file_backend_config:
if "skills_dir" in file_backend_config and file_backend_config["skills_dir"]:
cmd_opt.extend(["--skills-dir", file_backend_config["skills_dir"]])
if (
"rt_skills_dir" in file_backend_config
and file_backend_config["rt_skills_dir"]
):
cmd_opt.extend(["--rt-skills-dir", file_backend_config["rt_skills_dir"]])
if (
"workspace_dir" in file_backend_config
and file_backend_config["workspace_dir"]
):
cmd_opt.extend(["--workspace-dir", file_backend_config["workspace_dir"]])
if "api_key" in file_backend_config and file_backend_config["api_key"]:
cmd_opt.extend(["--api-key", file_backend_config["api_key"]])
return _build_and_load_pipeline_config(pipeline_id, pipeline_config_dir, cmd_opt)
# {pipeline_id: build_function}
GRAPH_BUILD_FNCS = {
"routing": build_route,
"react": build_react,
"deepagent": build_deep_agent,
}
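# Example dispatch (sketch; argument values are hypothetical): the front API
# looks up the build function by graph_id and forwards graph_params as extra
# keyword arguments.
#
#   build_fn = GRAPH_BUILD_FNCS["deepagent"]
#   build_fn(
#       pipeline_id="blueberry",
#       prompt_set="default",
#       tool_keys=[],
#       api_key="sk-...",
#       act_bkend="local_shell",
#       file_backend_config={"workspace_dir": "./workspace"},
#   )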

View File

@@ -1,8 +1,10 @@
import os
from dataclasses import dataclass
from typing import Any
import tyro
import os.path as osp
from abc import ABC, abstractmethod
from loguru import logger
from lang_agent.config import InstantiateConfig
class BaseFilesystemBackend(ABC):
@@ -25,4 +27,25 @@ class BaseFilesystemBackend(ABC):
if hasattr(self.config, "rt_skills_dir"):
return {"skills" : [self.config.rt_skills_dir]}
else:
return {}
@dataclass
class FilesystemBackendConfig(InstantiateConfig):
"""
Shared filesystem backend config behavior.
If subclasses define these fields, this hook ensures they exist:
- skills_dir
- workspace_dir
"""
def _ensure_dir_if_present(self, attr_name: str) -> None:
path = getattr(self, attr_name, None)
if not isinstance(path, str) or not path.strip():
return
os.makedirs(path, exist_ok=True)
logger.info(f"Ensured {attr_name} exists: {path}")
def __post_init__(self) -> None:
self._ensure_dir_if_present("skills_dir")
self._ensure_dir_if_present("workspace_dir")

View File

@@ -8,13 +8,12 @@ from loguru import logger
from daytona import Daytona, DaytonaConfig, FileUpload
from langchain_daytona import DaytonaSandbox
from lang_agent.config import InstantiateConfig
from lang_agent.fs_bkends.base import BaseFilesystemBackend, FilesystemBackendConfig
@tyro.conf.configure(tyro.conf.SuppressFixed)
@dataclass
class DaytonaSandboxConfig(FilesystemBackendConfig):
_target: Type = field(default_factory=lambda: DaytonaSandboxBk)
api_key: Optional[str] = None
@@ -27,6 +26,7 @@ class DaytonaSandboxConfig(InstantiateConfig):
"""runtime skills path inside the sandbox (auto-set from sandbox workdir)"""
def __post_init__(self):
super().__post_init__()
if self.api_key is None:
self.api_key = os.environ.get("DAYTONA_API_KEY")
if self.api_key is None:

View File

@@ -1,21 +1,16 @@
from dataclasses import dataclass, field
from typing import Type
import tyro
import os.path as osp
from abc import ABC, abstractmethod
import glob
from loguru import logger
from deepagents.backends.utils import create_file_data
from deepagents.backends import LocalShellBackend
from lang_agent.fs_bkends.base import BaseFilesystemBackend, FilesystemBackendConfig
@tyro.conf.configure(tyro.conf.SuppressFixed)
@dataclass
class LocalShellConfig(FilesystemBackendConfig):
_target:Type = field(default_factory=lambda:LocalShell)
workspace_dir:str = "./workspace"
@@ -38,4 +33,24 @@ class LocalShell(BaseFilesystemBackend):
self.backend = LocalShellBackend(root_dir=self.config.workspace_dir,
virtual_mode=True,
# env={"PATH": "/usr/bin:/bin"}
inherit_env=True)
if __name__ == "__main__":
import sys
# Instantiate a LocalShell instance with the default config
config = LocalShellConfig()
shell = LocalShell(config)
# Try checking access to 'npx'
try:
result = shell.backend.execute("npx --version")
if result.exit_code == 0:
print("npx is available, version:", result.output.strip())
else:
print("npx returned non-zero exit code:", result.exit_code, file=sys.stderr)
print("output:", result.output, file=sys.stderr)
except Exception as e:
print("Could not access 'npx':", str(e), file=sys.stderr)

View File

@@ -1,16 +1,14 @@
from dataclasses import dataclass, field
from typing import Type
import tyro
import os.path as osp
from abc import ABC, abstractmethod
import glob
from loguru import logger
from deepagents.backends.utils import create_file_data
from deepagents.backends import StateBackend
from lang_agent.fs_bkends.base import BaseFilesystemBackend, FilesystemBackendConfig
def read_as_utf8(file_path:str):
with open(file_path, "r", encoding="utf-8") as f:
@@ -31,7 +29,7 @@ def build_skill_fs_dict(skill_dir:str, virt_path:str="/skills"):
@tyro.conf.configure(tyro.conf.SuppressFixed)
@dataclass
class StateBkConfig(FilesystemBackendConfig):
_target:Type = field(default_factory=lambda:StateBk)
skills_dir:str = "./assets/skills"
@@ -40,10 +38,6 @@ class StateBkConfig(InstantiateConfig):
rt_skills_dir:str = "/skills"
"""path to directory with skills in runtime directory"""
class StateBk(BaseFilesystemBackend):
def __init__(self, config:StateBkConfig):

View File

@@ -61,7 +61,12 @@ class DeepAgent(GraphBase):
checkpointer=self.mem,
**bkend_agent_params)
self.prompt_store = build_prompt_store(
pipeline_id=self.config.pipeline_id,
prompt_set_id=self.config.prompt_set_id,
file_path=self.config.sys_prompt_f,
default_key="sys_prompt",
)
self.sys_prompt = self.prompt_store.get("sys_prompt")
def _agent_call(self, state:State):

View File

@@ -13,7 +13,7 @@ from langchain_core.messages import SystemMessage, HumanMessage, BaseMessage
from langchain.agents import create_agent
from langgraph.checkpoint.memory import MemorySaver
from lang_agent.config import LLMNodeConfig, load_tyro_conf, resolve_llm_api_key
from lang_agent.graphs import AnnotatedGraph, ReactGraphConfig, RoutingConfig
from lang_agent.base import GraphBase
from lang_agent.components import conv_store
@@ -62,7 +62,7 @@ class PipelineConfig(LLMNodeConfig):
host: str = "0.0.0.0"
"""where am I hosted"""
port: int = 8500
"""what is my port"""
# graph_config: AnnotatedGraph = field(default_factory=ReactGraphConfig)
@@ -104,7 +104,13 @@ class Pipeline:
if self.config.base_url is not None
else self.config.graph_config.base_url
)
pipeline_api_key = resolve_llm_api_key(self.config.api_key)
graph_api_key = resolve_llm_api_key(
getattr(self.config.graph_config, "api_key", None)
)
resolved_api_key = pipeline_api_key or graph_api_key
self.config.api_key = resolved_api_key
self.config.graph_config.api_key = resolved_api_key
self.graph: GraphBase = self.config.graph_config.setup()

85 nginx.conf Normal file
View File

@@ -0,0 +1,85 @@
events {
worker_connections 1024;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;
# Upstream backend
upstream backend {
server backend:8500;
}
server {
listen 80;
server_name localhost;
root /usr/share/nginx/html;
index index.html;
# Always revalidate the SPA entrypoint so clients pick up the latest
# hashed JS bundle after redeploys.
location = /index.html {
add_header Cache-Control "no-store, no-cache, must-revalidate, proxy-revalidate" always;
add_header Pragma "no-cache" always;
add_header Expires "0" always;
try_files $uri =404;
}
# Serve frontend static files
location / {
try_files $uri $uri/ /index.html;
}
# Proxy API requests to backend
location /v1/ {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_cache_bypass $http_upgrade;
}
# Proxy DashScope API requests
location /apps/ {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_cache_bypass $http_upgrade;
}
# Proxy v1/apps requests
location /v1/apps/ {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_cache_bypass $http_upgrade;
}
# Health check endpoint
location /health {
proxy_pass http://backend/health;
access_log off;
}
}
}
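# Illustrative check (assumes the stack is up on localhost): confirm the SPA
# entrypoint is served with the no-store policy after a redeploy.
#   curl -I http://localhost/index.html | grep -i cache-control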

View File

@@ -24,7 +24,10 @@ dependencies = [
"commentjson",
"pandas",
"asgiref",
"psycopg[binary]"
"psycopg[binary]",
"deepagents",
"daytona",
"langchain_daytona"
]
[tool.setuptools.packages.find]

View File

@@ -0,0 +1,40 @@
#!/bin/bash
# Initialize database user and database
# This script runs before SQL files in docker-entrypoint-initdb.d
# It must be named with 00_ prefix to run first
set -e
APP_DB_NAME="${APP_DB_NAME:-ai_conversations}"
APP_DB_USER="${APP_DB_USER:-myapp_user}"
APP_DB_PASSWORD="${APP_DB_PASSWORD:-secure_password_123}"
echo "Creating database user: $APP_DB_USER"
# Create user
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
DO \$\$
BEGIN
IF NOT EXISTS (SELECT FROM pg_catalog.pg_user WHERE usename = '$APP_DB_USER') THEN
CREATE USER $APP_DB_USER WITH PASSWORD '$APP_DB_PASSWORD';
END IF;
END
\$\$;
ALTER USER $APP_DB_USER CREATEDB;
EOSQL
echo "Creating database: $APP_DB_NAME"
# Create database
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
SELECT 'CREATE DATABASE $APP_DB_NAME'
WHERE NOT EXISTS (SELECT FROM pg_database WHERE datname = '$APP_DB_NAME')\gexec
GRANT ALL PRIVILEGES ON DATABASE $APP_DB_NAME TO $APP_DB_USER;
EOSQL
echo "Granting schema privileges"
# Grant schema privileges
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$APP_DB_NAME" <<-EOSQL
GRANT ALL ON SCHEMA public TO $APP_DB_USER;
EOSQL
echo "Database initialization complete!"

View File

@@ -0,0 +1,25 @@
#!/bin/bash
# Run SQL initialization files in the correct database context
# This script runs after 00_init_user.sh creates the database
set -e
APP_DB_NAME="${APP_DB_NAME:-ai_conversations}"
echo "Running SQL initialization files in database: $APP_DB_NAME"
# Run create_conv_store.sql
if [ -f /docker-entrypoint-initdb.d/create_conv_store.sql ]; then
echo "Executing create_conv_store.sql..."
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$APP_DB_NAME" -f /docker-entrypoint-initdb.d/create_conv_store.sql
fi
# Run create_prompt_config.sql
if [ -f /docker-entrypoint-initdb.d/create_prompt_config.sql ]; then
echo "Executing create_prompt_config.sql..."
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$APP_DB_NAME" -f /docker-entrypoint-initdb.d/create_prompt_config.sql
fi
echo "SQL initialization files completed!"

View File

@@ -1,4 +1,5 @@
-- Create the messages table
-- This script runs in the ai_conversations database context
CREATE TABLE IF NOT EXISTS messages (
id BIGSERIAL PRIMARY KEY,
conversation_id TEXT NOT NULL,
@@ -13,4 +14,8 @@ CREATE TABLE IF NOT EXISTS messages (
CREATE INDEX IF NOT EXISTS idx_messages_conversation ON messages (conversation_id, sequence_number);
-- Index for fast lookup by pipeline_id
CREATE INDEX IF NOT EXISTS idx_messages_pipeline ON messages (pipeline_id);
-- Grant permissions to app user
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO myapp_user;
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO myapp_user;

View File

@@ -1,4 +1,5 @@
-- A prompt_set groups a full collection of prompts together.
-- This script runs in the ai_conversations database context
-- Each pipeline can have many sets (versions, A/B variants, etc.);
-- exactly one should be marked is_active per pipeline.
CREATE TABLE IF NOT EXISTS prompt_sets (
@@ -41,6 +42,10 @@ CREATE TABLE IF NOT EXISTS prompt_templates (
CREATE INDEX IF NOT EXISTS idx_prompt_templates_set_id
ON prompt_templates(prompt_set_id);
-- Grant permissions to app user
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO myapp_user;
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO myapp_user;
-- Seed: initial prompt set for lang_agent/graphs/routing.py
-- The pipeline_id can be used by RoutingConfig.pipeline_id to load these prompts.
INSERT INTO prompt_sets (pipeline_id, graph_id, name, description, is_active, list)
@@ -160,3 +165,30 @@ ON CONFLICT (prompt_set_id, prompt_key)
DO UPDATE SET
content = EXCLUDED.content,
updated_at = now();
-- Seed: initial prompt set for lang_agent/graphs/deepagents_qt.py
-- DeepAgent uses prompt key "sys_prompt" with DB-first, file-fallback loading.
INSERT INTO prompt_sets (pipeline_id, graph_id, name, description, is_active, list)
SELECT
'deepagent',
'deepagent',
'default',
'Initial prompt set for DeepAgent',
true,
''
WHERE NOT EXISTS (
SELECT 1
FROM prompt_sets
WHERE pipeline_id = 'deepagent'
AND name = 'default'
);
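-- The seeded sys_prompt below is Chinese for, roughly: "You are a deep agent
-- skilled at calling tools and handling file tasks."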
INSERT INTO prompt_templates (prompt_set_id, prompt_key, content)
SELECT ps.id, 'sys_prompt', '你是一个擅长调用工具和处理文件任务的深度代理。'
FROM prompt_sets ps
WHERE ps.pipeline_id = 'deepagent'
AND ps.name = 'default'
ON CONFLICT (prompt_set_id, prompt_key)
DO UPDATE SET
content = EXCLUDED.content,
updated_at = now();

View File

@@ -0,0 +1,49 @@
#!/bin/bash
# Database initialization script
# This script runs all SQL initialization files in the correct order
set -e
DB_NAME="${POSTGRES_DB:-ai_conversations}"
DB_USER="${POSTGRES_USER:-myapp_user}"
DB_PASSWORD="${POSTGRES_PASSWORD:-secure_password_123}"
DB_HOST="${POSTGRES_HOST:-localhost}"
DB_PORT="${POSTGRES_PORT:-5432}"
export PGPASSWORD="$DB_PASSWORD"
echo "Initializing database: $DB_NAME on $DB_HOST:$DB_PORT"
# Wait for PostgreSQL to be ready
until psql -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d postgres -c '\q' 2>/dev/null; do
echo "Waiting for PostgreSQL to be ready..."
sleep 2
done
echo "PostgreSQL is ready!"
# Create database if it doesn't exist
psql -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d postgres <<EOF
SELECT 'CREATE DATABASE $DB_NAME'
WHERE NOT EXISTS (SELECT FROM pg_database WHERE datname = '$DB_NAME')\gexec
EOF
# Grant privileges
psql -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d postgres <<EOF
GRANT ALL PRIVILEGES ON DATABASE $DB_NAME TO $DB_USER;
EOF
# Run initialization scripts in order
echo "Running database initialization scripts..."
# 1. Create conversation store tables
echo "Creating conversation store tables..."
psql -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" -f /docker-entrypoint-initdb.d/create_conv_store.sql
# 2. Create prompt configuration tables
echo "Creating prompt configuration tables..."
psql -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" -f /docker-entrypoint-initdb.d/create_prompt_config.sql
echo "Database initialization complete!"

View File

@@ -0,0 +1,141 @@
#!/usr/bin/env python3
"""
Simple chat loop to interact with the blueberry pipeline via DashScope-compatible API.
Usage:
python scripts/py_scripts/chat_dashcope.py
The script connects to the server running on http://localhost:8500
and uses the API key from the pipeline registry.
"""
import requests
import json
import sys
from typing import Optional
# Configuration from pipeline_registry.json
API_KEY = "sk-6c7091e6a95f404efb2ec30e8f51b897626d670375cdf822d78262f24ab12367"
PIPELINE_ID = "blueberry"
BASE_URL = "http://localhost:8500"
SESSION_ID = "chat-session-1"
def send_message(
message: str,
session_id: str = SESSION_ID,
stream: bool = False,
app_id: str = PIPELINE_ID,
) -> Optional[str]:
"""Send a message to the blueberry pipeline and return the response."""
url = f"{BASE_URL}/v1/apps/{app_id}/sessions/{session_id}/responses"
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
}
payload = {
"messages": [
{"role": "user", "content": message}
],
"stream": stream,
}
try:
if stream:
# Handle streaming response
response = requests.post(url, headers=headers, json=payload, stream=True)
response.raise_for_status()
accumulated_text = ""
for line in response.iter_lines():
if line:
line_str = line.decode('utf-8')
if line_str.startswith('data: '):
data_str = line_str[6:] # Remove 'data: ' prefix
try:
data = json.loads(data_str)
output = data.get("output", {})
text = output.get("text", "")
if text:
accumulated_text = text
# Print incremental updates (you can modify this behavior)
print(f"\rAssistant: {accumulated_text}", end="", flush=True)
if data.get("is_end", False):
print() # New line after streaming completes
return accumulated_text
except json.JSONDecodeError:
continue
return accumulated_text
else:
# Handle non-streaming response
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status()
data = response.json()
output = data.get("output", {})
return output.get("text", "")
except requests.exceptions.RequestException as e:
print(f"Error sending message: {e}", file=sys.stderr)
if hasattr(e, 'response') and e.response is not None:
try:
error_detail = e.response.json()
print(f"Error details: {error_detail}", file=sys.stderr)
except Exception:
print(f"Response status: {e.response.status_code}", file=sys.stderr)
return None
def main():
"""Main chat loop."""
print("=" * 60)
print(f"Chat with Blueberry Pipeline")
print(f"Pipeline ID: {PIPELINE_ID}")
print(f"Server: {BASE_URL}")
print(f"Session ID: {SESSION_ID}")
print("=" * 60)
print("Type your messages (or 'quit'/'exit' to end, 'stream' to toggle streaming)")
print("Streaming mode is ON by default")
print()
stream_mode = True
while True:
try:
user_input = input("You: ").strip()
if not user_input:
continue
if user_input.lower() in ['quit', 'exit', 'q']:
print("Goodbye!")
break
if user_input.lower() == 'stream':
stream_mode = not stream_mode
print(f"Streaming mode: {'ON' if stream_mode else 'OFF'}")
continue
print("Assistant: ", end="", flush=True)
response = send_message(user_input, stream=stream_mode)
if response is None:
print("(No response received)")
elif not stream_mode:
print(response)
# For streaming, the response is already printed incrementally
print() # Empty line for readability
except KeyboardInterrupt:
print("\n\nGoodbye!")
break
except Exception as e:
print(f"\nError: {e}", file=sys.stderr)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,364 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import datetime as dt
import glob
import os
import os.path as osp
import sys
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional
import commentjson
import psycopg
PROJECT_ROOT = osp.dirname(osp.dirname(osp.dirname(osp.abspath(__file__))))
if PROJECT_ROOT not in sys.path:
sys.path.append(PROJECT_ROOT)
from lang_agent.config import load_tyro_conf # noqa: E402
from lang_agent.config.db_config_manager import DBConfigManager # noqa: E402
@dataclass
class MigrationPayload:
config_path: str
pipeline_id: str
graph_id: str
prompt_dict: Dict[str, str]
tool_keys: List[str]
api_key: Optional[str]
def _infer_pipeline_id(pipeline_conf, config_path: str) -> str:
candidates = [
getattr(pipeline_conf, "pipeline_id", None),
getattr(getattr(pipeline_conf, "graph_config", None), "pipeline_id", None),
]
for candidate in candidates:
if candidate is None:
continue
value = str(candidate).strip()
if value and value.lower() != "null":
return value
return osp.splitext(osp.basename(config_path))[0]
def _infer_graph_id(graph_conf) -> str:
if graph_conf is None:
return "unknown"
class_name = graph_conf.__class__.__name__.lower()
if "routing" in class_name or class_name == "routeconfig":
return "routing"
if "react" in class_name:
return "react"
target = getattr(graph_conf, "_target", None)
if target is not None:
target_name = getattr(target, "__name__", str(target)).lower()
if "routing" in target_name:
return "routing"
if "react" in target_name:
return "react"
return "unknown"
def _extract_tool_keys(graph_conf) -> List[str]:
if graph_conf is None:
return []
tool_cfg = getattr(graph_conf, "tool_manager_config", None)
client_cfg = getattr(tool_cfg, "client_tool_manager", None)
keys = getattr(client_cfg, "tool_keys", None)
if not keys:
return []
out: List[str] = []
seen = set()
for key in keys:
cleaned = str(key).strip()
if not cleaned or cleaned in seen:
continue
seen.add(cleaned)
out.append(cleaned)
return out
def _load_prompt_dict(prompt_path: str, default_key: str = "sys_prompt") -> Dict[str, str]:
if not prompt_path:
return {}
if not osp.exists(prompt_path):
return {}
if osp.isdir(prompt_path):
prompt_files = sorted(
p for p in glob.glob(osp.join(prompt_path, "*.txt")) if "optional" not in p
)
out = {}
for prompt_f in prompt_files:
key = osp.splitext(osp.basename(prompt_f))[0]
with open(prompt_f, "r", encoding="utf-8") as f:
out[key] = f.read()
return out
if prompt_path.endswith(".json"):
with open(prompt_path, "r", encoding="utf-8") as f:
obj = commentjson.load(f)
if not isinstance(obj, dict):
return {}
return {str(k): v if isinstance(v, str) else str(v) for k, v in obj.items()}
if prompt_path.endswith(".txt"):
with open(prompt_path, "r", encoding="utf-8") as f:
return {default_key: f.read()}
return {}
def _extract_prompt_dict(graph_conf) -> Dict[str, str]:
if graph_conf is None:
return {}
if hasattr(graph_conf, "sys_prompt_f"):
return _load_prompt_dict(str(getattr(graph_conf, "sys_prompt_f")), "sys_prompt")
if hasattr(graph_conf, "sys_promp_dir"):
return _load_prompt_dict(str(getattr(graph_conf, "sys_promp_dir")))
return {}
def _extract_tool_node_prompt_dict(graph_conf) -> Dict[str, str]:
tool_node_conf = getattr(graph_conf, "tool_node_config", None)
if tool_node_conf is None:
return {}
out: Dict[str, str] = {}
if hasattr(tool_node_conf, "tool_prompt_f"):
out.update(
_load_prompt_dict(str(getattr(tool_node_conf, "tool_prompt_f")), "tool_prompt")
)
if hasattr(tool_node_conf, "chatty_sys_prompt_f"):
out.update(
_load_prompt_dict(
str(getattr(tool_node_conf, "chatty_sys_prompt_f")), "chatty_prompt"
)
)
return out
def _prompt_key_whitelist(graph_conf, graph_id: str) -> Optional[set]:
if graph_id == "react":
return {"sys_prompt"}
if graph_id != "routing":
return None
allowed = {"route_prompt", "chat_prompt", "tool_prompt"}
tool_node_conf = getattr(graph_conf, "tool_node_config", None)
if tool_node_conf is None:
return allowed
cls_name = tool_node_conf.__class__.__name__.lower()
target = getattr(tool_node_conf, "_target", None)
target_name = getattr(target, "__name__", str(target)).lower() if target else ""
if "chatty" in cls_name or "chatty" in target_name:
allowed.add("chatty_prompt")
return allowed
def _collect_payload(config_path: str) -> MigrationPayload:
conf = load_tyro_conf(config_path)
graph_conf = getattr(conf, "graph_config", None)
graph_id = _infer_graph_id(graph_conf)
prompt_dict = _extract_prompt_dict(graph_conf)
prompt_dict.update(_extract_tool_node_prompt_dict(graph_conf))
whitelist = _prompt_key_whitelist(graph_conf, graph_id)
if whitelist is not None:
prompt_dict = {k: v for k, v in prompt_dict.items() if k in whitelist}
return MigrationPayload(
config_path=config_path,
pipeline_id=_infer_pipeline_id(conf, config_path),
graph_id=graph_id,
prompt_dict=prompt_dict,
tool_keys=_extract_tool_keys(graph_conf),
api_key=getattr(conf, "api_key", None),
)
def _resolve_config_paths(config_dir: str, config_paths: Optional[Iterable[str]]) -> List[str]:
if config_paths:
resolved = [osp.abspath(path) for path in config_paths]
else:
pattern = osp.join(osp.abspath(config_dir), "*.yaml")
resolved = sorted(glob.glob(pattern))
return [path for path in resolved if osp.exists(path)]
def _ensure_prompt_set(
conn: psycopg.Connection,
pipeline_id: str,
graph_id: str,
set_name: str,
description: str,
) -> str:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id FROM prompt_sets
WHERE pipeline_id = %s AND name = %s
ORDER BY updated_at DESC, created_at DESC
LIMIT 1
""",
(pipeline_id, set_name),
)
row = cur.fetchone()
if row is not None:
return str(row[0])
cur.execute(
"""
INSERT INTO prompt_sets (pipeline_id, graph_id, name, description, is_active, list)
VALUES (%s, %s, %s, %s, false, '')
RETURNING id
""",
(pipeline_id, graph_id, set_name, description),
)
created = cur.fetchone()
return str(created[0])
def _activate_prompt_set(conn: psycopg.Connection, pipeline_id: str, prompt_set_id: str) -> None:
with conn.cursor() as cur:
cur.execute(
"UPDATE prompt_sets SET is_active = false, updated_at = now() WHERE pipeline_id = %s",
(pipeline_id,),
)
cur.execute(
"UPDATE prompt_sets SET is_active = true, updated_at = now() WHERE id = %s",
(prompt_set_id,),
)
def _run_migration(
payloads: List[MigrationPayload],
set_name: str,
description: str,
dry_run: bool,
activate: bool,
) -> None:
for payload in payloads:
print(
f"[PLAN] pipeline={payload.pipeline_id} graph={payload.graph_id} "
f"prompts={len(payload.prompt_dict)} tools={len(payload.tool_keys)} "
f"config={payload.config_path}"
)
if dry_run:
continue
manager = DBConfigManager()
with psycopg.connect(manager.conn_str) as conn:
prompt_set_id = _ensure_prompt_set(
conn=conn,
pipeline_id=payload.pipeline_id,
graph_id=payload.graph_id,
set_name=set_name,
description=description,
)
conn.commit()
manager.set_config(
pipeline_id=payload.pipeline_id,
graph_id=payload.graph_id,
prompt_set_id=prompt_set_id,
tool_list=payload.tool_keys,
prompt_dict=payload.prompt_dict,
api_key=payload.api_key,
)
if activate:
_activate_prompt_set(
conn=conn,
pipeline_id=payload.pipeline_id,
prompt_set_id=prompt_set_id,
)
conn.commit()
print(
f"[DONE] pipeline={payload.pipeline_id} "
f"prompt_set={prompt_set_id} activate={activate}"
)
def main() -> None:
date_str = dt.date.today().isoformat()
parser = argparse.ArgumentParser(
description="Import prompt definitions from pipeline YAML files into DB prompt_sets."
)
parser.add_argument(
"--config-dir",
default=osp.join(PROJECT_ROOT, "configs", "pipelines"),
help="Directory containing pipeline YAML files.",
)
parser.add_argument(
"--config",
action="append",
default=[],
help="Specific pipeline config yaml path. Can be passed multiple times.",
)
parser.add_argument(
"--pipeline-id",
action="append",
default=[],
help="If provided, only migrate these pipeline IDs (repeatable).",
)
parser.add_argument(
"--set-name",
# default=f"migrated-{date_str}",
default="default",
help="Prompt set name to create/reuse under each pipeline.",
)
parser.add_argument(
"--description",
default="Migrated from pipeline YAML prompt files",
help="Prompt set description.",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Print what would be migrated without writing to DB.",
)
parser.add_argument(
"--activate",
action="store_true",
help="Mark imported set active for each migrated pipeline.",
)
args = parser.parse_args()
config_paths = _resolve_config_paths(args.config_dir, args.config)
if not config_paths:
raise SystemExit("No config files found. Provide --config or --config-dir.")
requested_pipelines = {p.strip() for p in args.pipeline_id if p.strip()}
payloads: List[MigrationPayload] = []
for config_path in config_paths:
payload = _collect_payload(config_path)
if requested_pipelines and payload.pipeline_id not in requested_pipelines:
continue
if not payload.prompt_dict:
print(f"[SKIP] no prompts found for config={config_path}")
continue
payloads.append(payload)
if not payloads:
raise SystemExit("No pipelines matched with prompt content to migrate.")
_run_migration(
payloads=payloads,
set_name=args.set_name,
description=args.description,
dry_run=args.dry_run,
activate=args.activate,
)
if __name__ == "__main__":
main()
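# Example invocations (script path is illustrative):
#   python scripts/py_scripts/migrate_prompts.py --dry-run
#   python scripts/py_scripts/migrate_prompts.py --pipeline-id blueberry --activate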

View File

@@ -1,3 +1,8 @@
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(dirname "$(dirname "$SCRIPT_DIR")")"
cd "$PROJECT_ROOT"
source ~/.bashrc
conda init
conda activate lang

View File

@@ -0,0 +1,37 @@
#!/bin/bash
# Script to download and package Docker images for offline use
# Run this on a machine with good Docker Hub access, then transfer images.tar to China
set -e
echo "=== Docker Image Downloader for Offline Use ==="
echo ""
# Images needed
IMAGES=(
"node:20-alpine"
"python:3.12-slim"
"postgres:16-alpine"
"nginx:alpine"
)
OUTPUT_FILE="images.tar"
echo "Pulling Docker images..."
for img in "${IMAGES[@]}"; do
echo " Pulling $img..."
docker pull "$img"
done
echo ""
echo "Saving to $OUTPUT_FILE..."
docker save "${IMAGES[@]}" -o "$OUTPUT_FILE"
echo ""
echo "Done! File size:"
ls -lh "$OUTPUT_FILE"
echo ""
echo "To transfer to China machine and load:"
echo " scp images.tar user@china-machine:/path/"
echo " docker load < images.tar"

167 scripts/shell_scripts/install.sh Executable file
View File

@@ -0,0 +1,167 @@
#!/bin/bash
# Installation script for LangChain Agent
# This script sets up and runs the entire application stack
set -e
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Configuration
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(dirname "$(dirname "$SCRIPT_DIR")")"
ENV_FILE="$PROJECT_ROOT/.env"
echo -e "${GREEN}=== LangChain Agent Installation Script ===${NC}\n"
# Check for required tools
check_requirements() {
echo -e "${YELLOW}Checking requirements...${NC}"
if ! command -v docker &> /dev/null; then
echo -e "${RED}Error: Docker is not installed. Please install Docker first.${NC}"
exit 1
fi
if ! command -v docker-compose &> /dev/null && ! docker compose version &> /dev/null; then
echo -e "${RED}Error: docker-compose is not installed. Please install docker-compose first.${NC}"
exit 1
fi
echo -e "${GREEN}✓ All requirements met${NC}\n"
}
# Create .env file if it doesn't exist
create_env_file() {
if [ ! -f "$ENV_FILE" ]; then
echo -e "${YELLOW}Creating .env file...${NC}"
cat > "$ENV_FILE" <<EOF
# Database Configuration
POSTGRES_DB=ai_conversations
POSTGRES_USER=myapp_user
POSTGRES_PASSWORD=secure_password_123
POSTGRES_PORT=5432
# Backend Configuration
BACKEND_PORT=8500
# Frontend Configuration
FRONTEND_PORT=8080
# Database Connection String (used by backend)
CONN_STR=postgresql://myapp_user:secure_password_123@postgres:5432/ai_conversations
EOF
echo -e "${GREEN}✓ Created .env file at $ENV_FILE${NC}"
echo -e "${YELLOW} Please review and update the .env file with your preferred settings.${NC}\n"
else
echo -e "${GREEN}✓ .env file already exists${NC}\n"
fi
}
# Build Docker images
build_images() {
echo -e "${YELLOW}Building Docker images (including frontend)...${NC}"
cd "$PROJECT_ROOT"
# Check if docker-compose or docker compose
if docker compose version &> /dev/null; then
COMPOSE_CMD="docker compose"
else
COMPOSE_CMD="docker-compose"
fi
$COMPOSE_CMD -f docker/docker-compose.prod.yml build
echo -e "${GREEN}✓ Docker images built successfully${NC}\n"
}
# Initialize database
init_database() {
echo -e "${YELLOW}Initializing database...${NC}"
# Wait for PostgreSQL to be ready
echo "Waiting for PostgreSQL to start..."
sleep 5
# The SQL files in scripts/init_database/ will be automatically executed
# by PostgreSQL's docker-entrypoint-initdb.d mechanism
# We just need to wait a bit for it to complete
echo -e "${GREEN}✓ Database initialization will be handled automatically by PostgreSQL container${NC}\n"
}
# Start services
start_services() {
echo -e "${YELLOW}Starting services...${NC}"
cd "$PROJECT_ROOT"
# Check if docker-compose or docker compose
if docker compose version &> /dev/null; then
COMPOSE_CMD="docker compose"
else
COMPOSE_CMD="docker-compose"
fi
$COMPOSE_CMD -f docker/docker-compose.prod.yml up -d
echo -e "${GREEN}✓ Services started${NC}\n"
}
# Show status
show_status() {
# Load environment variables from .env if it exists
if [ -f "$ENV_FILE" ]; then
set -a
source "$ENV_FILE"
set +a
fi
echo -e "${GREEN}=== Installation Complete ===${NC}\n"
echo -e "Services are starting up. Please wait a moment for them to be ready.\n"
echo -e "Access points:"
echo -e " - Frontend: http://localhost:${FRONTEND_PORT:-80}"
echo -e " - Backend API: http://localhost:${BACKEND_PORT:-8500}"
echo -e " - Database: localhost:${POSTGRES_PORT:-5432}\n"
echo -e "To view logs:"
echo -e " docker-compose -f docker/docker-compose.prod.yml logs -f\n"
echo -e "To stop services:"
echo -e " docker-compose -f docker/docker-compose.prod.yml down\n"
echo -e "To restart services:"
echo -e " docker-compose -f docker/docker-compose.prod.yml restart\n"
}
# Main execution
main() {
check_requirements
create_env_file
build_images
start_services
init_database
show_status
echo -e "${YELLOW}Waiting for services to be healthy...${NC}"
sleep 10
# Load environment variables for health check
if [ -f "$ENV_FILE" ]; then
set -a
source "$ENV_FILE"
set +a
fi
# Check service health
echo -e "\n${YELLOW}Checking service health...${NC}"
sleep 5 # Give services a bit more time
if curl -f http://localhost:${BACKEND_PORT:-8500}/health &> /dev/null; then
echo -e "${GREEN}✓ Backend is healthy${NC}"
else
echo -e "${YELLOW}⚠ Backend is still starting up. Check logs with: docker-compose -f docker/docker-compose.prod.yml logs backend${NC}"
fi
}
# Run main function
main

View File

@@ -0,0 +1,46 @@
import importlib
import os
import sys
from fastapi.testclient import TestClient
os.environ.setdefault("CONN_STR", "postgresql://dummy:dummy@localhost/dummy")
def test_server_dashscope_import_is_cli_safe(monkeypatch):
"""
Importing server_dashscope should not invoke tyro.cli at module import time.
"""
import tyro
monkeypatch.setattr(
tyro,
"cli",
lambda *_args, **_kwargs: (_ for _ in ()).throw(
AssertionError("tyro.cli must not run during module import")
),
)
sys.modules.pop("fastapi_server.server_dashscope", None)
module = importlib.import_module("fastapi_server.server_dashscope")
assert module.app is not None
assert module.dashscope_router is not None
def test_combined_app_serves_front_and_dashscope_routes():
from fastapi_server.combined import app
client = TestClient(app)
# front_apis route should be available.
front_resp = client.get("/v1/pipelines/graphs")
assert front_resp.status_code == 200, front_resp.text
assert "available_graphs" in front_resp.json()
# DashScope route should exist at the same path (missing auth should not be 404).
dash_resp = client.post(
"/api/v1/apps/blueberry/sessions/test-session/responses",
json={"input": {"prompt": "hello"}, "stream": False},
)
assert dash_resp.status_code != 404, dash_resp.text

View File

@@ -30,7 +30,7 @@ except Exception as e:
# <<< Paste your running FastAPI base url here >>>
BASE_URL = os.getenv("DS_BASE_URL", "http://127.0.0.1:8588/api/")
BASE_URL = os.getenv("DS_BASE_URL", "http://127.0.0.1:8500/api/")
# Params

View File

@@ -1,13 +1,18 @@
import json
import os
from pathlib import Path
from datetime import datetime, timedelta, timezone
import importlib
from fastapi.testclient import TestClient
os.environ.setdefault("CONN_STR", "postgresql://dummy:dummy@localhost/dummy")
try:
front_apis = importlib.import_module("lang_agent.fastapi_server.front_apis")
except ModuleNotFoundError:
front_apis = importlib.import_module("fastapi_server.front_apis")
def _fake_build_fn(
@@ -20,7 +25,7 @@ def _fake_build_fn(
):
out_dir = Path(pipeline_config_dir)
out_dir.mkdir(parents=True, exist_ok=True)
out_file = out_dir / f"{pipeline_id}.yml"
out_file = out_dir / f"{pipeline_id}.yaml"
out_file.write_text(
json.dumps(
{
@@ -36,9 +41,101 @@ def _fake_build_fn(
return {"path": str(out_file)}
class _FakeCursor:
def __init__(self, rows):
self._rows = rows
self._result = []
self._last_sql = ""
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def execute(self, sql, params=None):
self._last_sql = sql
query = " ".join(sql.split()).lower()
params = params or ()
if "group by conversation_id, pipeline_id" in query:
pipeline_id = params[0]
limit = int(params[1])
grouped = {}
for row in self._rows:
if row["pipeline_id"] != pipeline_id:
continue
conv_id = row["conversation_id"]
if conv_id not in grouped:
grouped[conv_id] = {
"conversation_id": conv_id,
"pipeline_id": row["pipeline_id"],
"message_count": 0,
"last_updated": row["created_at"],
}
grouped[conv_id]["message_count"] += 1
if row["created_at"] > grouped[conv_id]["last_updated"]:
grouped[conv_id]["last_updated"] = row["created_at"]
values = sorted(grouped.values(), key=lambda x: x["last_updated"], reverse=True)
self._result = values[:limit]
return
if "select 1 from messages" in query:
pipeline_id, conversation_id = params
found = any(
row["pipeline_id"] == pipeline_id
and row["conversation_id"] == conversation_id
for row in self._rows
)
self._result = [{"exists": 1}] if found else []
return
if "order by sequence_number asc" in query:
pipeline_id, conversation_id = params
self._result = sorted(
[
{
"message_type": row["message_type"],
"content": row["content"],
"sequence_number": row["sequence_number"],
"created_at": row["created_at"],
}
for row in self._rows
if row["pipeline_id"] == pipeline_id
and row["conversation_id"] == conversation_id
],
key=lambda x: x["sequence_number"],
)
return
raise AssertionError(f"Unsupported SQL in test fake: {self._last_sql}")
def fetchall(self):
return self._result
def fetchone(self):
if not self._result:
return None
return self._result[0]
class _FakeConnection:
def __init__(self, rows):
self._rows = rows
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def cursor(self, row_factory=None):
return _FakeCursor(self._rows)
def test_registry_route_lifecycle(monkeypatch, tmp_path):
    registry_path = tmp_path / "pipeline_registry.json"
-    monkeypatch.setattr(front_apis, "_PIPELINE_REGISTRY_PATH", str(registry_path))
+    monkeypatch.setattr(front_apis, "PIPELINE_REGISTRY_PATH", str(registry_path))
    monkeypatch.setitem(front_apis.GRAPH_BUILD_FNCS, "routing", _fake_build_fn)
    client = TestClient(front_apis.app)
@@ -60,7 +157,7 @@ def test_registry_route_lifecycle(monkeypatch, tmp_path):
assert create_data["pipeline_id"] == "xiaozhan"
assert create_data["graph_id"] == "routing"
assert create_data["llm_name"] == "qwen-plus"
assert create_data["reload_required"] is True
assert create_data["reload_required"] is False
list_resp = client.get("/v1/pipelines")
assert list_resp.status_code == 200, list_resp.text
@@ -91,7 +188,7 @@ def test_registry_route_lifecycle(monkeypatch, tmp_path):
def test_registry_api_key_policy_lifecycle(monkeypatch, tmp_path):
    registry_path = tmp_path / "pipeline_registry.json"
-    monkeypatch.setattr(front_apis, "_PIPELINE_REGISTRY_PATH", str(registry_path))
+    monkeypatch.setattr(front_apis, "PIPELINE_REGISTRY_PATH", str(registry_path))
    monkeypatch.setitem(front_apis.GRAPH_BUILD_FNCS, "routing", _fake_build_fn)
    client = TestClient(front_apis.app)
@@ -136,4 +233,119 @@ def test_registry_api_key_policy_lifecycle(monkeypatch, tmp_path):
    delete_data = delete_resp.json()
    assert delete_data["api_key"] == "sk-test-key"
    assert delete_data["status"] == "deleted"
-    assert delete_data["reload_required"] is True
+    assert delete_data["reload_required"] is False
def test_pipeline_conversation_routes(monkeypatch):
    now = datetime.now(timezone.utc)
    rows = [
        {
            "conversation_id": "agent-a:conv-1",
            "pipeline_id": "agent-a",
            "message_type": "human",
            "content": "hello",
            "sequence_number": 1,
            "created_at": now - timedelta(seconds=30),
        },
        {
            "conversation_id": "agent-a:conv-1",
            "pipeline_id": "agent-a",
            "message_type": "ai",
            "content": "hi there",
            "sequence_number": 2,
            "created_at": now - timedelta(seconds=20),
        },
        {
            "conversation_id": "agent-a:conv-2",
            "pipeline_id": "agent-a",
            "message_type": "human",
            "content": "second thread",
            "sequence_number": 1,
            "created_at": now - timedelta(seconds=10),
        },
        {
            "conversation_id": "agent-b:conv-9",
            "pipeline_id": "agent-b",
            "message_type": "human",
            "content": "other pipeline",
            "sequence_number": 1,
            "created_at": now - timedelta(seconds=5),
        },
    ]
    monkeypatch.setenv("CONN_STR", "postgresql://dummy:dummy@localhost/dummy")
    monkeypatch.setattr(
        front_apis.psycopg,
        "connect",
        lambda _conn_str: _FakeConnection(rows),
    )
    client = TestClient(front_apis.app)

    list_resp = client.get("/v1/pipelines/agent-a/conversations")
    assert list_resp.status_code == 200, list_resp.text
    list_data = list_resp.json()
    assert list_data["pipeline_id"] == "agent-a"
    assert list_data["count"] == 2
    assert [item["conversation_id"] for item in list_data["items"]] == [
        "agent-a:conv-2",
        "agent-a:conv-1",
    ]
    assert all(item["pipeline_id"] == "agent-a" for item in list_data["items"])

    msg_resp = client.get("/v1/pipelines/agent-a/conversations/agent-a:conv-1/messages")
    assert msg_resp.status_code == 200, msg_resp.text
    msg_data = msg_resp.json()
    assert msg_data["pipeline_id"] == "agent-a"
    assert msg_data["conversation_id"] == "agent-a:conv-1"
    assert msg_data["count"] == 2
    assert [item["message_type"] for item in msg_data["items"]] == ["human", "ai"]
    assert [item["sequence_number"] for item in msg_data["items"]] == [1, 2]


def test_pipeline_conversation_messages_404(monkeypatch):
    rows = [
        {
            "conversation_id": "agent-b:conv-9",
            "pipeline_id": "agent-b",
            "message_type": "human",
            "content": "other pipeline",
            "sequence_number": 1,
            "created_at": datetime.now(timezone.utc),
        },
    ]
    monkeypatch.setenv("CONN_STR", "postgresql://dummy:dummy@localhost/dummy")
    monkeypatch.setattr(
        front_apis.psycopg,
        "connect",
        lambda _conn_str: _FakeConnection(rows),
    )
    client = TestClient(front_apis.app)

    resp = client.get("/v1/pipelines/agent-a/conversations/agent-b:conv-9/messages")
    assert resp.status_code == 404, resp.text
    assert "not found for pipeline 'agent-a'" in resp.json()["detail"]


def test_runtime_auth_info_prefers_registry_then_env(monkeypatch, tmp_path):
    registry_path = tmp_path / "pipeline_registry.json"
    registry_path.write_text(
        json.dumps(
            {
                "pipelines": {},
                "api_keys": {
                    "sk-from-registry": {"default_pipeline_id": "blueberry"},
                },
            }
        ),
        encoding="utf-8",
    )
    monkeypatch.setattr(front_apis, "PIPELINE_REGISTRY_PATH", str(registry_path))
    monkeypatch.setenv("FAST_AUTH_KEYS", "sk-from-env,other")
    client = TestClient(front_apis.app)

    resp = client.get("/v1/runtime-auth")
    assert resp.status_code == 200, resp.text
    data = resp.json()
    assert data["fast_api_key"] == "sk-from-registry"
    assert data["source"] == "pipeline_registry"

View File

@@ -0,0 +1,113 @@
import importlib.util
import sys
from pathlib import Path
from types import SimpleNamespace


def _load_module():
    project_root = Path(__file__).resolve().parents[1]
    script_path = project_root / "scripts" / "py_scripts" / "migrate_yaml_prompts_to_db.py"
    spec = importlib.util.spec_from_file_location("migrate_yaml_prompts_to_db", script_path)
    module = importlib.util.module_from_spec(spec)
    assert spec.loader is not None
    sys.modules[spec.name] = module
    spec.loader.exec_module(module)
    return module


def test_infer_pipeline_id_falls_back_to_filename():
    module = _load_module()
    conf = SimpleNamespace(
        pipeline_id=None,
        graph_config=SimpleNamespace(pipeline_id=None),
    )
    out = module._infer_pipeline_id(conf, "/tmp/blueberry.yaml")
    assert out == "blueberry"


def test_extract_prompt_dict_for_react_txt(tmp_path):
    module = _load_module()
    prompt_f = tmp_path / "sys.txt"
    prompt_f.write_text("hello react", encoding="utf-8")
    graph_conf = SimpleNamespace(sys_prompt_f=str(prompt_f))
    prompt_dict = module._extract_prompt_dict(graph_conf)
    assert prompt_dict == {"sys_prompt": "hello react"}


def test_extract_prompt_dict_for_routing_dir(tmp_path):
    module = _load_module()
    (tmp_path / "route_prompt.txt").write_text("route", encoding="utf-8")
    (tmp_path / "chat_prompt.txt").write_text("chat", encoding="utf-8")
    graph_conf = SimpleNamespace(sys_promp_dir=str(tmp_path))
    prompt_dict = module._extract_prompt_dict(graph_conf)
    assert prompt_dict["route_prompt"] == "route"
    assert prompt_dict["chat_prompt"] == "chat"


def test_collect_payload_routing_ignores_chatty_prompt_for_tool_node(tmp_path):
    module = _load_module()
    prompt_dir = tmp_path / "prompts"
    prompt_dir.mkdir()
    (prompt_dir / "route_prompt.txt").write_text("route", encoding="utf-8")
    (prompt_dir / "chat_prompt.txt").write_text("chat", encoding="utf-8")
    (prompt_dir / "tool_prompt.txt").write_text("tool", encoding="utf-8")
    (prompt_dir / "chatty_prompt.txt").write_text("chatty", encoding="utf-8")

    class RoutingConfig:
        pass

    class ToolNodeConfig:
        pass

    graph_conf = RoutingConfig()
    graph_conf.sys_promp_dir = str(prompt_dir)
    graph_conf.tool_node_config = ToolNodeConfig()
    graph_conf.tool_node_config.tool_prompt_f = str(prompt_dir / "tool_prompt.txt")
    conf = SimpleNamespace(
        pipeline_id=None,
        api_key="sk",
        graph_config=graph_conf,
    )
    module.load_tyro_conf = lambda _: conf

    payload = module._collect_payload(str(tmp_path / "xiaozhan.yaml"))
    assert payload.pipeline_id == "xiaozhan"
    assert set(payload.prompt_dict.keys()) == {"route_prompt", "chat_prompt", "tool_prompt"}
    assert "chatty_prompt" not in payload.prompt_dict


def test_collect_payload_routing_includes_chatty_prompt_for_chatty_node(tmp_path):
    module = _load_module()
    prompt_dir = tmp_path / "prompts"
    prompt_dir.mkdir()
    (prompt_dir / "route_prompt.txt").write_text("route", encoding="utf-8")
    (prompt_dir / "chat_prompt.txt").write_text("chat", encoding="utf-8")
    (prompt_dir / "tool_prompt.txt").write_text("tool", encoding="utf-8")
    (prompt_dir / "chatty_prompt.txt").write_text("chatty", encoding="utf-8")

    class RoutingConfig:
        pass

    class ChattyToolNodeConfig:
        pass

    graph_conf = RoutingConfig()
    graph_conf.sys_promp_dir = str(prompt_dir)
    graph_conf.tool_node_config = ChattyToolNodeConfig()
    graph_conf.tool_node_config.tool_prompt_f = str(prompt_dir / "tool_prompt.txt")
    graph_conf.tool_node_config.chatty_sys_prompt_f = str(
        prompt_dir / "chatty_prompt.txt"
    )
    conf = SimpleNamespace(
        pipeline_id="xiaozhan",
        api_key="sk",
        graph_config=graph_conf,
    )
    module.load_tyro_conf = lambda _: conf

    payload = module._collect_payload(str(tmp_path / "xiaozhan.yaml"))
    assert payload.pipeline_id == "xiaozhan"
    assert "chatty_prompt" in payload.prompt_dict

View File

@@ -0,0 +1,156 @@
import json
import time

import pytest
from fastapi import HTTPException

from lang_agent.components.server_pipeline_manager import ServerPipelineManager


class _DummyPipeline:
    def __init__(self, model: str):
        self.model = model


class _DummyConfig:
    def __init__(self, llm_name: str = "qwen-plus"):
        self.llm_name = llm_name

    def setup(self):
        return _DummyPipeline(model=self.llm_name)


def _write_registry(path, pipelines, api_keys=None):
    content = {"pipelines": pipelines, "api_keys": api_keys or {}}
    path.write_text(json.dumps(content, indent=2), encoding="utf-8")
    # Ensure mtime changes reliably on fast CI filesystems.
    time.sleep(0.01)


def test_refresh_registry_picks_up_new_pipeline(tmp_path):
    registry_path = tmp_path / "pipeline_registry.json"
    _write_registry(
        registry_path,
        pipelines={
            "default": {
                "enabled": True,
                "config_file": None,
                "llm_name": "qwen-plus",
            }
        },
    )
    manager = ServerPipelineManager(
        default_pipeline_id="default",
        default_config=_DummyConfig(),
    )
    manager.load_registry(str(registry_path))

    with pytest.raises(HTTPException) as exc_info:
        manager.resolve_pipeline_id(
            body={"pipeline_id": "blueberry"}, app_id=None, api_key="k1"
        )
    assert exc_info.value.status_code == 404

    _write_registry(
        registry_path,
        pipelines={
            "default": {
                "enabled": True,
                "config_file": None,
                "llm_name": "qwen-plus",
            },
            "blueberry": {
                "enabled": True,
                "config_file": None,
                "llm_name": "qwen-max",
            },
        },
    )
    changed = manager.refresh_registry_if_needed()
    assert changed is True

    resolved = manager.resolve_pipeline_id(
        body={"pipeline_id": "blueberry"}, app_id=None, api_key="k1"
    )
    assert resolved == "blueberry"


def test_refresh_registry_invalidates_cache_for_changed_pipeline(tmp_path):
    registry_path = tmp_path / "pipeline_registry.json"
    _write_registry(
        registry_path,
        pipelines={
            "blueberry": {
                "enabled": True,
                "config_file": None,
                "llm_name": "qwen-plus",
            }
        },
    )
    manager = ServerPipelineManager(
        default_pipeline_id="blueberry",
        default_config=_DummyConfig(),
    )
    manager.load_registry(str(registry_path))

    first_pipeline, first_model = manager.get_pipeline("blueberry")
    assert first_model == "qwen-plus"

    _write_registry(
        registry_path,
        pipelines={
            "blueberry": {
                "enabled": True,
                "config_file": None,
                "llm_name": "qwen-max",
            }
        },
    )
    changed = manager.refresh_registry_if_needed()
    assert changed is True

    second_pipeline, second_model = manager.get_pipeline("blueberry")
    assert second_model == "qwen-max"
    assert second_pipeline is not first_pipeline


def test_refresh_registry_applies_disabled_state_immediately(tmp_path):
    registry_path = tmp_path / "pipeline_registry.json"
    _write_registry(
        registry_path,
        pipelines={
            "blueberry": {
                "enabled": True,
                "config_file": None,
                "llm_name": "qwen-plus",
            }
        },
    )
    manager = ServerPipelineManager(
        default_pipeline_id="blueberry",
        default_config=_DummyConfig(),
    )
    manager.load_registry(str(registry_path))
    manager.get_pipeline("blueberry")

    _write_registry(
        registry_path,
        pipelines={
            "blueberry": {
                "enabled": False,
                "config_file": None,
                "llm_name": "qwen-plus",
            }
        },
    )
    changed = manager.refresh_registry_if_needed()
    assert changed is True

    with pytest.raises(HTTPException) as exc_info:
        manager.get_pipeline("blueberry")
    assert exc_info.value.status_code == 403
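
These tests pin three behaviors of ServerPipelineManager: a registry edit is picked up without a restart, a changed pipeline entry invalidates the cached pipeline object, and enabled: false takes effect immediately. A minimal sketch of an mtime-based refresh that would satisfy them; illustrative only, as the real class's internals may differ:

import json
import os


class RegistryRefreshSketch:
    def load_registry(self, path: str) -> None:
        self._path = path
        self._mtime = os.path.getmtime(path)
        with open(path, encoding="utf-8") as f:
            self._registry = json.load(f)
        self._cache = {}  # pipeline_id -> built pipeline

    def refresh_registry_if_needed(self) -> bool:
        mtime = os.path.getmtime(self._path)
        if mtime == self._mtime:
            return False  # unchanged since last load
        self._mtime = mtime
        with open(self._path, encoding="utf-8") as f:
            self._registry = json.load(f)
        self._cache.clear()  # drop stale pipelines so config changes apply
        return True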