jijiahao committed 2025-11-06 16:06:25 +08:00
12 changed files with 439 additions and 74 deletions

View File

@@ -1,31 +1,55 @@
# Base image
FROM python:3.12-slim
FROM condaforge/mambaforge:latest
ARG MAMBA_DOCKERFILE_ACTIVATE=1
# Set the working directory
WORKDIR /app
# Set environment variables
ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# Install dependencies in the mamba base env
RUN mamba install -y -c conda-forge \
python=3.12 \
pip \
curl \
unzip \
c-compiler \
cxx-compiler \
ca-certificates \
vim \
&& mamba clean -a -y
# Copy project files
COPY pyproject.toml ./
COPY fastapi_server/requirements.txt ./fastapi_server/
COPY lang_agent/ ./lang_agent/
COPY fastapi_server/ ./fastapi_server/
# Install Python dependencies
RUN pip install --no-cache-dir -r fastapi_server/requirements.txt
RUN pip install --no-cache-dir -e .
# Expose the service port
EXPOSE 8488
# Start command
CMD ["python", "fastapi_server/server.py"]
# Install Python dependencies inside the mamba env
RUN python -m pip install --upgrade pip \
-i https://mirrors.aliyun.com/pypi/simple/ \
--trusted-host mirrors.aliyun.com \
--default-timeout=300 && \
python -m pip install --no-cache-dir -r fastapi_server/requirements.txt \
-i https://mirrors.aliyun.com/pypi/simple/ \
--trusted-host mirrors.aliyun.com \
--default-timeout=300 && \
python -m pip install --no-cache-dir -e . \
-i https://mirrors.aliyun.com/pypi/simple/ \
--trusted-host mirrors.aliyun.com \
--default-timeout=300
EXPOSE 8588
# Create entrypoint script that initializes conda/mamba and runs the command
RUN echo '#!/bin/bash\n\
set -e\n\
# Initialize conda (mamba uses conda under the hood)\n\
eval "$(conda shell.bash hook)"\n\
conda activate base\n\
# Execute the command\n\
exec "$@"' > /entrypoint.sh && chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
CMD ["python", "fastapi_server/server_dashscope.py"]

View File

@@ -28,15 +28,6 @@ python -m pip install .
# Runnables
All runnables live under `scripts/`.
# Start all MCPs over WebSocket
1. Source all env variables.
2. Run the commands below:
```bash
python scripts/start_mcp_server.py
# update configs/ws_mcp_config.json with link from the command above
python scripts/ws_start_register_tools.py
```
# Eval Dataset Format
See `scripts/make_eval_dataset.py` for an example. The meaning of each entry:
@@ -48,4 +39,10 @@ see `scripts/make_eval_dataset.py` for example. Specific meaning of each entry:
"tool_use": ["retrieve"]} // tool uses; assume model need to use all tools if more than 1 provided
}
]
```
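A minimal loading sketch (assuming the dataset is written as plain JSON, without the `//` comments shown above; the path is a placeholder):
```python
import json

# Placeholder path -- see scripts/make_eval_dataset.py for the actual output location.
with open("eval_dataset.json", encoding="utf-8") as f:
    dataset = json.load(f)

for entry in dataset:
    # "tool_use" lists the expected tool calls; when more than one tool is
    # listed, the model is assumed to need all of them.
    print(entry.get("tool_use", []))
```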
# Configure for Xiaozhi
0. Start `fastapi_server/server_dashscope.py`.
1. Create a new model entry in `xiaozhi` with AliBL as the provider.
2. Fill in the `base_url` entry. The other entries (`API_KEY`, `APP_ID`) can be placeholder values; the server does not validate them. A quick smoke test is sketched below.
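A minimal smoke test, as a sketch (the port and route come from `fastapi_server/server_dashscope.py`; the `my-app` id is an arbitrary placeholder, since the server does not validate it):
```python
import requests

# Non-streaming request against the DashScope-compatible completion route.
resp = requests.post(
    "http://127.0.0.1:8588/api/v1/apps/my-app/completion",
    json={"input": {"prompt": "hello"}, "stream": False},
    timeout=60,
)
print(resp.json()["output"]["text"])
```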

archived.md Normal file
View File

@@ -0,0 +1,9 @@
# Start all MCPs over WebSocket
1. Source all env variables.
2. Run the commands below:
```bash
python scripts/start_mcp_server.py
# update configs/ws_mcp_config.json with link from the command above
python scripts/ws_start_register_tools.py
```

View File

@@ -1,24 +1,21 @@
version: '3.8'
services:
lang-agent-api:
build: .
container_name: lang-agent-api
ports:
- "8488:8488"
- "8588:8588"
env_file:
- ./.env
environment:
- PYTHONPATH=/app
- PYTHONUNBUFFERED=1
- RAG_FOLDER_PATH=/app/assets/xiaozhan_emb
volumes:
- ./configs:/app/configs
- ./scripts:/app/scripts
- ./assets:/app/assets
restart: unless-stopped
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8488/health')"]
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8588/health')"]
interval: 30s
timeout: 10s
retries: 3

View File

@@ -1,20 +0,0 @@
# Use Python 3.9 as the base image
FROM python:3.9-slim
# Set the working directory
WORKDIR /app
# Copy the requirements file
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy project files
COPY . .
# Expose the service port
EXPOSE 8488
# Start command
CMD ["python", "server.py"]

View File

@@ -1,18 +0,0 @@
version: '3.8'
services:
lang-agent-api:
build:
context: .
dockerfile: Dockerfile.api
ports:
- "8488:8488"
environment:
- PYTHONUNBUFFERED=1
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8488/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s

View File

@@ -1,5 +1,5 @@
fastapi>=0.104.0
uvicorn>=0.24.0
fastapi
uvicorn
pydantic>=2.0.0,<2.12
loguru>=0.7.0
python-dotenv>=1.0.0
@@ -7,6 +7,7 @@ langchain==1.0
langchain-core>=0.1.0
langchain-community
langchain-openai
openai>=1.0.0
langchain-mcp-adapters
langgraph>=0.0.40
tyro>=0.7.0

View File

@@ -0,0 +1,267 @@
from fastapi import FastAPI, HTTPException, Path, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, Field
from typing import List, Optional
import os
import sys
import time
import json
import uvicorn
from loguru import logger
# Ensure we can import from project root
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from lang_agent.pipeline import Pipeline, PipelineConfig
class DSMessage(BaseModel):
role: str
content: str
class DSApplicationCallRequest(BaseModel):
api_key: Optional[str] = Field(default=None)
app_id: Optional[str] = Field(default=None)
session_id: Optional[str] = Field(default=None)
messages: List[DSMessage]
stream: bool = Field(default=True)
# Optional overrides for pipeline behavior
thread_id: Optional[int] = Field(default=3)
app = FastAPI(title="DashScope-Compatible Application API",
description="DashScope Application.call compatible endpoint backed by pipeline.chat")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize Pipeline once
pipeline_config = PipelineConfig()
pipeline_config.llm_name = "qwen-flash"
pipeline_config.llm_provider = "openai"
pipeline_config.base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
pipeline = Pipeline(pipeline_config)
def sse_chunks_from_text(full_text: str, response_id: str, model: str = "qwen-flash", chunk_size: int = 10):
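    # Slice a completed response into DashScope-style SSE frames; only the final
    # frame (is_end=True) carries the full text.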
created_time = int(time.time())
for i in range(0, len(full_text), chunk_size):
chunk = full_text[i:i + chunk_size]
if chunk:
data = {
"request_id": response_id,
"code": 200,
"message": "OK",
"output": {
# Send empty during stream; many SDKs only expose output_text on final
"text": "",
"created": created_time,
"model": model,
},
"is_end": False,
}
yield f"data: {json.dumps(data)}\n\n"
final = {
"request_id": response_id,
"code": 200,
"message": "OK",
"output": {
"text": full_text,
"created": created_time,
"model": model,
},
"is_end": True,
}
yield f"data: {json.dumps(final)}\n\n"
@app.post("/v1/apps/{app_id}/sessions/{session_id}/responses")
@app.post("/api/v1/apps/{app_id}/sessions/{session_id}/responses")
async def application_responses(
request: Request,
app_id: str = Path(...),
session_id: str = Path(...),
):
try:
body = await request.json()
# Prefer path params
req_app_id = app_id or body.get("app_id")
req_session_id = session_id or body.get("session_id")
# Normalize messages
messages = body.get("messages")
if messages is None and isinstance(body.get("input"), dict):
messages = body.get("input", {}).get("messages")
if messages is None and isinstance(body.get("input"), dict):
prompt = body.get("input", {}).get("prompt")
if isinstance(prompt, str):
messages = [{"role": "user", "content": prompt}]
if not messages:
raise HTTPException(status_code=400, detail="messages is required")
# Determine stream flag
stream = body.get("stream")
if stream is None:
stream = body.get("parameters", {}).get("stream", True)
thread_id = body.get("thread_id", 3)
# Extract latest user message
user_msg = None
for m in reversed(messages):
role = m.get("role") if isinstance(m, dict) else None
content = m.get("content") if isinstance(m, dict) else None
if role == "user" and content:
user_msg = content
break
if user_msg is None:
last = messages[-1]
user_msg = last.get("content") if isinstance(last, dict) else str(last)
# Invoke pipeline (non-stream) then stream-chunk it to the client
result_text = pipeline.chat(inp=user_msg, as_stream=False, thread_id=thread_id)
if not isinstance(result_text, str):
result_text = str(result_text)
response_id = f"appcmpl-{os.urandom(12).hex()}"
if stream:
return StreamingResponse(
sse_chunks_from_text(result_text, response_id=response_id, model=pipeline_config.llm_name, chunk_size=10),
media_type="text/event-stream",
)
# Non-streaming response structure
data = {
"request_id": response_id,
"code": 200,
"message": "OK",
"app_id": req_app_id,
"session_id": req_session_id,
"output": {
"text": result_text,
"created": int(time.time()),
"model": pipeline_config.llm_name,
},
"is_end": True,
}
return JSONResponse(content=data)
except HTTPException:
raise
except Exception as e:
logger.error(f"DashScope-compatible endpoint error: {e}")
raise HTTPException(status_code=500, detail=str(e))
# Compatibility: some SDKs call /apps/{app_id}/completion without /v1 and without session in path
@app.post("/apps/{app_id}/completion")
@app.post("/v1/apps/{app_id}/completion")
@app.post("/api/apps/{app_id}/completion")
@app.post("/api/v1/apps/{app_id}/completion")
async def application_completion(
request: Request,
app_id: str = Path(...),
):
try:
body = await request.json()
req_session_id = body.get("session_id")
# Normalize messages
messages = body.get("messages")
if messages is None and isinstance(body.get("input"), dict):
messages = body.get("input", {}).get("messages")
if messages is None and isinstance(body.get("input"), dict):
prompt = body.get("input", {}).get("prompt")
if isinstance(prompt, str):
messages = [{"role": "user", "content": prompt}]
if not messages:
raise HTTPException(status_code=400, detail="messages is required")
stream = body.get("stream")
if stream is None:
stream = body.get("parameters", {}).get("stream", True)
thread_id = body.get("thread_id", 3)
user_msg = None
for m in reversed(messages):
role = m.get("role") if isinstance(m, dict) else None
content = m.get("content") if isinstance(m, dict) else None
if role == "user" and content:
user_msg = content
break
if user_msg is None:
last = messages[-1]
user_msg = last.get("content") if isinstance(last, dict) else str(last)
result_text = pipeline.chat(inp=user_msg, as_stream=False, thread_id=thread_id)
if not isinstance(result_text, str):
result_text = str(result_text)
response_id = f"appcmpl-{os.urandom(12).hex()}"
if stream:
return StreamingResponse(
sse_chunks_from_text(result_text, response_id=response_id, model=pipeline_config.llm_name, chunk_size=10),
media_type="text/event-stream",
)
data = {
"request_id": response_id,
"code": 200,
"message": "OK",
"app_id": app_id,
"session_id": req_session_id,
"output": {
"text": result_text,
"created": int(time.time()),
"model": pipeline_config.llm_name,
},
"is_end": True,
}
return JSONResponse(content=data)
except HTTPException:
raise
except Exception as e:
logger.error(f"DashScope-compatible completion error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/")
async def root():
return {"message": "DashScope Application-compatible API", "endpoints": [
"/v1/apps/{app_id}/sessions/{session_id}/responses",
"/health",
]}
@app.get("/health")
async def health():
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run(
"server_dashscope:app",
host="0.0.0.0",
port=8588,
reload=True,
)

View File

@@ -0,0 +1,107 @@
#!/usr/bin/env python3
"""
Minimal test for DashScope Application.call against server_dashscope.py
Instructions:
- Start the DashScope-compatible server first, e.g.:
uvicorn fastapi_server.server_dashscope:app --host 0.0.0.0 --port 8588 --reload
- Set BASE_URL below to the base URL of the server you started.
- Optionally set environment variables ALI_API_KEY and ALI_APP_ID.
"""
import os
import uuid
from dotenv import load_dotenv
from loguru import logger
from http import HTTPStatus
TAG = __name__
load_dotenv()
try:
from dashscope import Application
import dashscope
except Exception as e:
print("dashscope package not found. Please install it: pip install dashscope")
raise
# <<< Paste your running FastAPI base url here >>>
BASE_URL = os.getenv("DS_BASE_URL", "http://127.0.0.1:8588/api/")
# Params
API_KEY = "salkjhglakshfs" #os.getenv("ALI_API_KEY", "test-key")
APP_ID = os.getenv("ALI_APP_ID", "test-app")
SESSION_ID = str(uuid.uuid4())
dialogue = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Say 'the world is awesome and beautiful'."},
]
call_params = {
"api_key": API_KEY,
"app_id": APP_ID,
"session_id": SESSION_ID,
"messages": dialogue,
"stream": True,
}
def main():
# Point the SDK to our FastAPI implementation
if BASE_URL and ("/api/" in BASE_URL):
dashscope.base_http_api_url = BASE_URL
print(f"Using base_http_api_url = {dashscope.base_http_api_url}")
print("\nCalling Application.call(stream=True)...\n")
responses = Application.call(**call_params)
try:
last_text = ""
u = ""
for resp in responses:
if resp.status_code != HTTPStatus.OK:
logger.bind(tag=TAG).error(
f"code={resp.status_code}, message={resp.message}, 请参考文档https://help.aliyun.com/zh/model-studio/developer-reference/error-code"
)
continue
current_text = getattr(getattr(resp, "output", None), "text", None)
if current_text is None:
continue
# The SDK stream returns cumulative text; compute the incremental delta
if len(current_text) >= len(last_text):
delta = current_text[len(last_text):]
else:
# Guard against the cumulative text occasionally shrinking
delta = current_text
if delta:
u = delta
last_text = current_text
print("from stream: ", u)
except TypeError:
# Non-streaming fallback (single complete response)
if responses.status_code != HTTPStatus.OK:
logger.bind(tag=TAG).error(
f"code={responses.status_code}, message={responses.message}, 请参考文档https://help.aliyun.com/zh/model-studio/developer-reference/error-code"
)
u = "【阿里百练API服务响应异常】"
else:
full_text = getattr(getattr(responses, "output", None), "text", "")
logger.bind(tag=TAG).info(
f"【阿里百练API服务】完整响应长度: {len(full_text)}"
)
u = full_text
print("from non-stream: ", u)
except Exception as e:
logger.bind(tag=TAG).error(f"Error: {e}")
u = "【阿里百练API服务响应异常】"
if __name__ == "__main__":
main()

View File

@@ -121,6 +121,7 @@ class Pipeline:
def chat(self, inp:str, as_stream:bool=False, as_raw:bool=False, thread_id:int = None)->str:
# NOTE: this prompt will be overwritten by 'configs/route_sys_prompts/chat_prompt.txt' for the route graph
u = """
[Role]
You are a conversational AI named Xiaozhan (小盏), the smart assistant of Banzhan Youth Teahouse (半盏青年茶馆).

View File

View File

@@ -13,7 +13,7 @@ from lang_agent.base import LangToolBase
from lang_agent.client_tool_manager import ClientToolManagerConfig
from lang_agent.rag.simple import SimpleRagConfig
from lang_agent.dummy.calculator import CalculatorConfig
# from lang_agent.dummy.calculator import CalculatorConfig
# from catering_end.lang_tool import CartToolConfig, CartTool
from langchain_core.tools.structured import StructuredTool
@@ -32,7 +32,7 @@ class ToolManagerConfig(InstantiateConfig):
# cart_config: CartToolConfig = field(default_factory=CartToolConfig)
calc_config: CalculatorConfig = field(default_factory=CalculatorConfig)
# calc_config: CalculatorConfig = field(default_factory=CalculatorConfig)
def async_to_sync(async_func: Callable) -> Callable: