diff --git a/Dockerfile b/Dockerfile
index 906aeaa..51073cb 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,31 +1,55 @@
-# 使用Python 3.10作为基础镜像
-FROM python:3.12-slim
+FROM condaforge/mambaforge:latest
+
+ARG MAMBA_DOCKERFILE_ACTIVATE=1
 
-# 设置工作目录
 WORKDIR /app
 
-# 设置环境变量
 ENV PYTHONPATH=/app
 ENV PYTHONUNBUFFERED=1
 
-# 安装系统依赖
-RUN apt-get update && apt-get install -y \
-    gcc \
-    g++ \
-    && rm -rf /var/lib/apt/lists/*
+# Install dependencies in mamba base env
+RUN mamba install -y -c conda-forge \
+    python=3.12 \
+    pip \
+    curl \
+    unzip \
+    c-compiler \
+    cxx-compiler \
+    ca-certificates \
+    vim \
+    && mamba clean -a -y
 
-# 复制项目文件
 COPY pyproject.toml ./
 COPY fastapi_server/requirements.txt ./fastapi_server/
 COPY lang_agent/ ./lang_agent/
 COPY fastapi_server/ ./fastapi_server/
 
-# 安装Python依赖
-RUN pip install --no-cache-dir -r fastapi_server/requirements.txt
-RUN pip install --no-cache-dir -e .
-
-# 暴露端口
-EXPOSE 8488
-
-# 启动命令
-CMD ["python", "fastapi_server/server.py"]
\ No newline at end of file
+
+# Install Python dependencies inside the mamba env
+RUN python -m pip install --upgrade pip \
+    -i https://mirrors.aliyun.com/pypi/simple/ \
+    --trusted-host mirrors.aliyun.com \
+    --default-timeout=300 && \
+    python -m pip install --no-cache-dir -r fastapi_server/requirements.txt \
+    -i https://mirrors.aliyun.com/pypi/simple/ \
+    --trusted-host mirrors.aliyun.com \
+    --default-timeout=300 && \
+    python -m pip install --no-cache-dir -e . \
+    -i https://mirrors.aliyun.com/pypi/simple/ \
+    --trusted-host mirrors.aliyun.com \
+    --default-timeout=300
+
+EXPOSE 8588
+
+# Create entrypoint script that initializes conda/mamba and runs the command
+RUN echo '#!/bin/bash\n\
+set -e\n\
+# Initialize conda (mamba uses conda under the hood)\n\
+eval "$(conda shell.bash hook)"\n\
+conda activate base\n\
+# Execute the command\n\
+exec "$@"' > /entrypoint.sh && chmod +x /entrypoint.sh
+
+ENTRYPOINT ["/entrypoint.sh"]
+CMD ["python", "fastapi_server/server_dashscope.py"]
\ No newline at end of file
diff --git a/README.md b/README.md
index fe54b9f..4a712bc 100644
--- a/README.md
+++ b/README.md
@@ -28,15 +28,6 @@ python -m pip install .
 # Runables
 all runnables are under scripts
 
-# Start all mcps to websocket
-1. Source all env variable
-2. run the below
-```bash
-python scripts/start_mcp_server.py
-
-# update configs/ws_mcp_config.json with link from the command above
-python scripts/ws_start_register_tools.py
-```
 
 # Eval Dataset Format
 see `scripts/make_eval_dataset.py` for example. Specific meaning of each entry:
@@ -48,4 +39,10 @@ see `scripts/make_eval_dataset.py` for example. Specific meaning of each entry:
     "tool_use": ["retrieve"]} // tool uses; assume model need to use all tools if more than 1 provided
   }
 ]
-```
\ No newline at end of file
+```
+
+
+# Configure for Xiaozhi
+0. Start the `fastapi_server/server_dashscope.py` file
+1. Make a new model entry in `xiaozhi` with AliBL as provider.
+2. Fill in the `base_url` entry. The other entries (`API_KEY`, `APP_ID`) can be garbage
\ No newline at end of file
diff --git a/archived.md b/archived.md
new file mode 100644
index 0000000..b625636
--- /dev/null
+++ b/archived.md
@@ -0,0 +1,9 @@
+# Start all mcps to websocket
+1. Source all env variable
+2. run the below
+```bash
+python scripts/start_mcp_server.py
+
+# update configs/ws_mcp_config.json with link from the command above
+python scripts/ws_start_register_tools.py
+```
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
index e4af20d..2179c9b 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,24 +1,21 @@
-version: '3.8'
-
 services:
   lang-agent-api:
     build: .
     container_name: lang-agent-api
     ports:
-      - "8488:8488"
+      - "8588:8588"
     env_file:
       - ./.env
     environment:
       - PYTHONPATH=/app
       - PYTHONUNBUFFERED=1
-      - RAG_FOLDER_PATH=/app/assets/xiaozhan_emb
     volumes:
       - ./configs:/app/configs
       - ./scripts:/app/scripts
       - ./assets:/app/assets
     restart: unless-stopped
    healthcheck:
-      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8488/health')"]
+      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8588/health')"]
      interval: 30s
      timeout: 10s
      retries: 3
diff --git a/fastapi_server/Dockerfile.api b/fastapi_server/Dockerfile.api
deleted file mode 100644
index 691bd1e..0000000
--- a/fastapi_server/Dockerfile.api
+++ /dev/null
@@ -1,20 +0,0 @@
-# 使用Python 3.9作为基础镜像
-FROM python:3.9-slim
-
-# 设置工作目录
-WORKDIR /app
-
-# 复制requirements文件
-COPY requirements.txt .
-
-# 安装Python依赖
-RUN pip install --no-cache-dir -r requirements.txt
-
-# 复制项目文件
-COPY . .
-
-# 暴露端口
-EXPOSE 8488
-
-# 启动命令
-CMD ["python", "server.py"]
\ No newline at end of file
diff --git a/fastapi_server/docker-compose.api.yml b/fastapi_server/docker-compose.api.yml
deleted file mode 100644
index 8c0ae26..0000000
--- a/fastapi_server/docker-compose.api.yml
+++ /dev/null
@@ -1,18 +0,0 @@
-version: '3.8'
-
-services:
-  lang-agent-api:
-    build:
-      context: .
-      dockerfile: Dockerfile.api
-    ports:
-      - "8488:8488"
-    environment:
-      - PYTHONUNBUFFERED=1
-    restart: unless-stopped
-    healthcheck:
-      test: ["CMD", "curl", "-f", "http://localhost:8488/health"]
-      interval: 30s
-      timeout: 10s
-      retries: 3
-      start_period: 40s
\ No newline at end of file
diff --git a/fastapi_server/requirements.txt b/fastapi_server/requirements.txt
index c7f0a7e..ad49bad 100644
--- a/fastapi_server/requirements.txt
+++ b/fastapi_server/requirements.txt
@@ -1,5 +1,5 @@
-fastapi>=0.104.0
-uvicorn>=0.24.0
+fastapi
+uvicorn
 pydantic>=2.0.0,<2.12
 loguru>=0.7.0
 python-dotenv>=1.0.0
@@ -7,6 +7,7 @@ langchain==1.0
 langchain-core>=0.1.0
 langchain-community
 langchain-openai
+openai>=1.0.0
 langchain-mcp-adapters
 langgraph>=0.0.40
 tyro>=0.7.0
diff --git a/fastapi_server/server_dashscope.py b/fastapi_server/server_dashscope.py
new file mode 100644
index 0000000..8079124
--- /dev/null
+++ b/fastapi_server/server_dashscope.py
@@ -0,0 +1,267 @@
+from fastapi import FastAPI, HTTPException, Path, Request
+from fastapi.middleware.cors import CORSMiddleware
+from fastapi.responses import StreamingResponse, JSONResponse
+from pydantic import BaseModel, Field
+from typing import List, Optional
+import os
+import sys
+import time
+import json
+import uvicorn
+from loguru import logger
+
+# Ensure we can import from project root
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from lang_agent.pipeline import Pipeline, PipelineConfig
+
+
+class DSMessage(BaseModel):
+    role: str
+    content: str
+
+
+class DSApplicationCallRequest(BaseModel):
+    api_key: Optional[str] = Field(default=None)
+    app_id: Optional[str] = Field(default=None)
+    session_id: Optional[str] = Field(default=None)
+    messages: List[DSMessage]
+    stream: bool = Field(default=True)
+    # Optional overrides for pipeline behavior
+    thread_id: Optional[int] = Field(default=3)
+
+
+app = FastAPI(title="DashScope-Compatible Application API",
+              description="DashScope Application.call compatible endpoint backed by pipeline.chat")
+
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+
+# Initialize Pipeline once
+pipeline_config = PipelineConfig()
+pipeline_config.llm_name = "qwen-flash"
+pipeline_config.llm_provider = "openai"
+pipeline_config.base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
+pipeline = Pipeline(pipeline_config)
+
+
+def sse_chunks_from_text(full_text: str, response_id: str, model: str = "qwen-flash", chunk_size: int = 10):
+    created_time = int(time.time())
+
+    for i in range(0, len(full_text), chunk_size):
+        chunk = full_text[i:i + chunk_size]
+        if chunk:
+            data = {
+                "request_id": response_id,
+                "code": 200,
+                "message": "OK",
+                "output": {
+                    # Send empty during stream; many SDKs only expose output_text on final
+                    "text": "",
+                    "created": created_time,
+                    "model": model,
+                },
+                "is_end": False,
+            }
+            yield f"data: {json.dumps(data)}\n\n"
+
+    final = {
+        "request_id": response_id,
+        "code": 200,
+        "message": "OK",
+        "output": {
+            "text": full_text,
+            "created": created_time,
+            "model": model,
+        },
+        "is_end": True,
+    }
+    yield f"data: {json.dumps(final)}\n\n"
+
+
+@app.post("/v1/apps/{app_id}/sessions/{session_id}/responses")
+@app.post("/api/v1/apps/{app_id}/sessions/{session_id}/responses")
+async def application_responses(
+    request: Request,
+    app_id: str = Path(...),
+    session_id: str = Path(...),
+):
+    try:
+        body = await request.json()
+
+        # Prefer path params
+        req_app_id = app_id or body.get("app_id")
+        req_session_id = session_id or body.get("session_id")
+
+        # Normalize messages
+        messages = body.get("messages")
+        if messages is None and isinstance(body.get("input"), dict):
+            messages = body.get("input", {}).get("messages")
+        if messages is None and isinstance(body.get("input"), dict):
+            prompt = body.get("input", {}).get("prompt")
+            if isinstance(prompt, str):
+                messages = [{"role": "user", "content": prompt}]
+
+        if not messages:
+            raise HTTPException(status_code=400, detail="messages is required")
+
+        # Determine stream flag
+        stream = body.get("stream")
+        if stream is None:
+            stream = body.get("parameters", {}).get("stream", True)
+
+        thread_id = body.get("thread_id", 3)
+
+        # Extract latest user message
+        user_msg = None
+        for m in reversed(messages):
+            role = m.get("role") if isinstance(m, dict) else None
+            content = m.get("content") if isinstance(m, dict) else None
+            if role == "user" and content:
+                user_msg = content
+                break
+        if user_msg is None:
+            last = messages[-1]
+            user_msg = last.get("content") if isinstance(last, dict) else str(last)
+
+        # Invoke pipeline (non-stream) then stream-chunk it to the client
+        result_text = pipeline.chat(inp=user_msg, as_stream=False, thread_id=thread_id)
+        if not isinstance(result_text, str):
+            result_text = str(result_text)
+
+        response_id = f"appcmpl-{os.urandom(12).hex()}"
+
+        if stream:
+            return StreamingResponse(
+                sse_chunks_from_text(result_text, response_id=response_id, model=pipeline_config.llm_name, chunk_size=10),
+                media_type="text/event-stream",
+            )
+
+        # Non-streaming response structure
+        data = {
+            "request_id": response_id,
+            "code": 200,
+            "message": "OK",
+            "app_id": req_app_id,
+            "session_id": req_session_id,
+            "output": {
+                "text": result_text,
+                "created": int(time.time()),
+                "model": pipeline_config.llm_name,
+            },
+            "is_end": True,
+        }
+        return JSONResponse(content=data)
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"DashScope-compatible endpoint error: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+# Compatibility: some SDKs call /apps/{app_id}/completion without /v1 and without session in path
+@app.post("/apps/{app_id}/completion")
+@app.post("/v1/apps/{app_id}/completion")
+@app.post("/api/apps/{app_id}/completion")
+@app.post("/api/v1/apps/{app_id}/completion")
+async def application_completion(
+    request: Request,
+    app_id: str = Path(...),
+):
+    try:
+        body = await request.json()
+
+        req_session_id = body.get("session_id")
+
+        # Normalize messages
+        messages = body.get("messages")
+        if messages is None and isinstance(body.get("input"), dict):
+            messages = body.get("input", {}).get("messages")
+        if messages is None and isinstance(body.get("input"), dict):
+            prompt = body.get("input", {}).get("prompt")
+            if isinstance(prompt, str):
+                messages = [{"role": "user", "content": prompt}]
+
+        if not messages:
+            raise HTTPException(status_code=400, detail="messages is required")
+
+        stream = body.get("stream")
+        if stream is None:
+            stream = body.get("parameters", {}).get("stream", True)
+
+        thread_id = body.get("thread_id", 3)
+
+        user_msg = None
+        for m in reversed(messages):
+            role = m.get("role") if isinstance(m, dict) else None
+            content = m.get("content") if isinstance(m, dict) else None
+            if role == "user" and content:
+                user_msg = content
+                break
+        if user_msg is None:
+            last = messages[-1]
+            user_msg = last.get("content") if isinstance(last, dict) else str(last)
+
+        result_text = pipeline.chat(inp=user_msg, as_stream=False, thread_id=thread_id)
+        if not isinstance(result_text, str):
+            result_text = str(result_text)
+
+        response_id = f"appcmpl-{os.urandom(12).hex()}"
+
+        if stream:
+            return StreamingResponse(
+                sse_chunks_from_text(result_text, response_id=response_id, model=pipeline_config.llm_name, chunk_size=10),
+                media_type="text/event-stream",
+            )
+
+        data = {
+            "request_id": response_id,
+            "code": 200,
+            "message": "OK",
+            "app_id": app_id,
+            "session_id": req_session_id,
+            "output": {
+                "text": result_text,
+                "created": int(time.time()),
+                "model": pipeline_config.llm_name,
+            },
+            "is_end": True,
+        }
+        return JSONResponse(content=data)
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"DashScope-compatible completion error: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+@app.get("/")
+async def root():
+    return {"message": "DashScope Application-compatible API",
+            "endpoints": [
+                "/v1/apps/{app_id}/sessions/{session_id}/responses",
+                "/health",
+            ]}
+
+
+@app.get("/health")
+async def health():
+    return {"status": "healthy"}
+
+
+if __name__ == "__main__":
+    uvicorn.run(
+        "server_dashscope:app",
+        host="0.0.0.0",
+        port=8588,
+        reload=True,
+    )
+
+
diff --git a/fastapi_server/test_dashscope_client.py b/fastapi_server/test_dashscope_client.py
new file mode 100644
index 0000000..45e9e5d
--- /dev/null
+++ b/fastapi_server/test_dashscope_client.py
@@ -0,0 +1,107 @@
+#!/usr/bin/env python3
+"""
+Minimal test for DashScope Application.call against server_dashscope.py
+
+Instructions:
+- Start the DashScope-compatible server first, e.g.:
+  uvicorn fastapi_server.server_dashscope:app --host 0.0.0.0 --port 8588 --reload
+- Set BASE_URL below to the server base URL you started.
+- Optionally set environment variables ALI_API_KEY and ALI_APP_ID.
+"""
+import os
+import uuid
+from dotenv import load_dotenv
+from loguru import logger
+from http import HTTPStatus
+
+TAG = __name__
+
+load_dotenv()
+
+try:
+    from dashscope import Application
+    import dashscope
+except Exception as e:
+    print("dashscope package not found. Please install it: pip install dashscope")
+    raise
+
+
+# <<< Paste your running FastAPI base url here >>>
+BASE_URL = os.getenv("DS_BASE_URL", "http://127.0.0.1:8588/api/")
+
+# Params
+API_KEY = "salkjhglakshfs" #os.getenv("ALI_API_KEY", "test-key")
+APP_ID = os.getenv("ALI_APP_ID", "test-app")
+SESSION_ID = str(uuid.uuid4())
+
+dialogue = [
+    {"role": "system", "content": "You are a helpful assistant."},
+    {"role": "user", "content": "Say 'the world is awesome and beautiful'."},
+]
+
+call_params = {
+    "api_key": API_KEY,
+    "app_id": APP_ID,
+    "session_id": SESSION_ID,
+    "messages": dialogue,
+    "stream": True,
+}
+
+
+def main():
+    # Point the SDK to our FastAPI implementation
+    if BASE_URL and ("/api/" in BASE_URL):
+        dashscope.base_http_api_url = BASE_URL
+    # dashscope.base_http_api_url = BASE_URL
+    print(f"Using base_http_api_url = {dashscope.base_http_api_url}")
+
+    print("\nCalling Application.call(stream=True)...\n")
+    responses = Application.call(**call_params)
+
+    try:
+        last_text = ""
+        u = ""
+        for resp in responses:
+            if resp.status_code != HTTPStatus.OK:
+                logger.bind(tag=TAG).error(
+                    f"code={resp.status_code}, message={resp.message}, 请参考文档:https://help.aliyun.com/zh/model-studio/developer-reference/error-code"
+                )
+                continue
+            current_text = getattr(getattr(resp, "output", None), "text", None)
+            if current_text is None:
+                continue
+            # SDK流式为增量覆盖,计算差量输出
+            if len(current_text) >= len(last_text):
+                delta = current_text[len(last_text):]
+            else:
+                # 避免偶发回退
+                delta = current_text
+            if delta:
+                u = delta
+            last_text = current_text
+
+        print("from stream: ", u)
+    except TypeError:
+        # 非流式回落(一次性返回)
+        if responses.status_code != HTTPStatus.OK:
+            logger.bind(tag=TAG).error(
+                f"code={responses.status_code}, message={responses.message}, 请参考文档:https://help.aliyun.com/zh/model-studio/developer-reference/error-code"
+            )
+            u = "【阿里百练API服务响应异常】"
+        else:
+            full_text = getattr(getattr(responses, "output", None), "text", "")
+            logger.bind(tag=TAG).info(
+                f"【阿里百练API服务】完整响应长度: {len(full_text)}"
+            )
+            u = full_text
+        print("from non-stream: ", u)
+    except Exception as e:
+        logger.bind(tag=TAG).error(f"Error: {e}")
+        u = "【阿里百练API服务响应异常】"
+
+
+
+if __name__ == "__main__":
+    main()
+
+
diff --git a/lang_agent/pipeline.py b/lang_agent/pipeline.py
index 3fa7ded..913f81c 100644
--- a/lang_agent/pipeline.py
+++ b/lang_agent/pipeline.py
@@ -121,6 +121,7 @@ class Pipeline:
 
 
     def chat(self, inp:str, as_stream:bool=False, as_raw:bool=False, thread_id:int = None)->str:
+        # NOTE: this prompt will be overwritten by 'configs/route_sys_prompts/chat_prompt.txt' for route graph
         u = """
 [角色设定]
 你是一个和人对话的 AI,叫做小盏,是半盏青年茶馆的智能助手
diff --git a/lang_agent/test.py b/lang_agent/test.py
deleted file mode 100644
index e69de29..0000000
diff --git a/lang_agent/tool_manager.py b/lang_agent/tool_manager.py
index 335b196..997b553 100644
--- a/lang_agent/tool_manager.py
+++ b/lang_agent/tool_manager.py
@@ -13,7 +13,7 @@ from lang_agent.base import LangToolBase
 from lang_agent.client_tool_manager import ClientToolManagerConfig
 from lang_agent.rag.simple import SimpleRagConfig
 
-from lang_agent.dummy.calculator import CalculatorConfig
+# from lang_agent.dummy.calculator import CalculatorConfig
 # from catering_end.lang_tool import CartToolConfig, CartTool
 
 from langchain_core.tools.structured import StructuredTool
@@ -32,7 +32,7 @@ class ToolManagerConfig(InstantiateConfig):
 
 
     # cart_config: CartToolConfig = field(default_factory=CartToolConfig)
-    calc_config: CalculatorConfig = field(default_factory=CalculatorConfig)
+    # calc_config: CalculatorConfig = field(default_factory=CalculatorConfig)
 
 
 def async_to_sync(async_func: Callable) -> Callable: