From 7f82faa7b934ba59f95108a1480acee1f6b25602 Mon Sep 17 00:00:00 2001
From: goulustis
Date: Mon, 3 Nov 2025 16:11:06 +0800
Subject: [PATCH 01/10] spelling

---
 scripts/make_eval_dataset.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scripts/make_eval_dataset.py b/scripts/make_eval_dataset.py
index 46aaae1..1c89e0a 100644
--- a/scripts/make_eval_dataset.py
+++ b/scripts/make_eval_dataset.py
@@ -24,7 +24,7 @@ examples = [
         "inputs": {"text": ["我要购买一杯野星星", "我要再加一杯"]},
         "outputs": {"answer": "你的野星星已经下单成功",
-                    "tool_use": ["retrieve|get_resources",
+                    "tool_use": ["retrieve|get_resource",
                                  "start_shopping_session",
                                  "add_to_cart",
                                  "create_wechat_pay",

From 64e384023e947b2a8c8e007aac0a1c154a9520f7 Mon Sep 17 00:00:00 2001
From: goulustis
Date: Mon, 3 Nov 2025 23:28:54 +0800
Subject: [PATCH 02/10] ignore sht

---
 .gitignore | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/.gitignore b/.gitignore
index e7275c1..4aa1146 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,4 +5,5 @@ logs/
 *.pyc
 *.zip
 
-django.log
\ No newline at end of file
+django.log
+.env
\ No newline at end of file

From a75989c3e648fe157fad56f83eaf44cf3bc23b22 Mon Sep 17 00:00:00 2001
From: goulustis
Date: Mon, 3 Nov 2025 23:36:23 +0800
Subject: [PATCH 03/10] load dotenv

---
 lang_agent/config.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/lang_agent/config.py b/lang_agent/config.py
index c95499b..789da90 100644
--- a/lang_agent/config.py
+++ b/lang_agent/config.py
@@ -6,6 +6,9 @@ from typing import Dict
 import os
 
 from loguru import logger
+from dotenv import load_dotenv
+
+load_dotenv()
 
 ## NOTE: base classes taken from nerfstudio
 class PrintableConfig:

From 8a9aef87aa1df6549e042583b596ea7d466b746d Mon Sep 17 00:00:00 2001
From: jeremygan2021
Date: Tue, 4 Nov 2025 23:57:37 +0800
Subject: [PATCH 04/10] Wrap FastAPI with an OpenAI-compatible API spec
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .DS_Store                               | Bin 0 -> 6148 bytes
 .dockerignore                           |  45 +++++
 Dockerfile                              |  31 ++++
 docker-compose.yml                      |  25 +++
 fastapi_server/Dockerfile.api           |  20 +++
 fastapi_server/OpenAI_API_README.md     | 220 ++++++++++++++++++++++++
 fastapi_server/README.md                | 179 +++++++++++++++++++
 fastapi_server/docker-compose.api.yml   |  18 ++
 fastapi_server/openai_client_example.py | 129 ++++++++++++++
 fastapi_server/requirements.txt         |  24 +++
 fastapi_server/server.py                | 163 ++++++++++++++++++
 fastapi_server/start_server.sh          |  19 ++
 lang_agent/rag/simple.py                |  30 +++-
 lang_agent/test.py                      |   0
 lang_agent/tool_manager.py              |   8 +-
 15 files changed, 904 insertions(+), 7 deletions(-)
 create mode 100644 .DS_Store
 create mode 100644 .dockerignore
 create mode 100644 Dockerfile
 create mode 100644 docker-compose.yml
 create mode 100644 fastapi_server/Dockerfile.api
 create mode 100644 fastapi_server/OpenAI_API_README.md
 create mode 100644 fastapi_server/README.md
 create mode 100644 fastapi_server/docker-compose.api.yml
 create mode 100644 fastapi_server/openai_client_example.py
 create mode 100644 fastapi_server/requirements.txt
 create mode 100644 fastapi_server/server.py
 create mode 100755 fastapi_server/start_server.sh
 create mode 100644 lang_agent/test.py

diff --git a/.DS_Store b/.DS_Store
new file mode 100644
index 0000000000000000000000000000000000000000..022c56e1e44bf3bfee7e99e7a153f3eaff9daf03
GIT binary patch
literal 6148
[6148-byte binary payload omitted; the diffs for .dockerignore, Dockerfile, docker-compose.yml, fastapi_server/Dockerfile.api, fastapi_server/OpenAI_API_README.md, fastapi_server/README.md, fastapi_server/docker-compose.api.yml, and fastapi_server/openai_client_example.py were lost with it and are not reconstructed here]
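Patches 02-03 above establish how secrets reach the code: .env is now git-ignored and lang_agent/config.py calls load_dotenv() at import time. A minimal sketch of the resulting flow, assuming ALI_API_KEY as the variable name (taken from start_server.sh later in this series; which variables config.py actually reads is not shown):

    # Sketch: how a key placed in .env becomes visible to the agent code.
    import os
    from dotenv import load_dotenv

    load_dotenv()  # importing lang_agent.config triggers the same call

    api_key = os.getenv("ALI_API_KEY")  # assumed variable name, from start_server.sh
    if api_key is None:
        raise RuntimeError("ALI_API_KEY not set; add it to .env or export it")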
diff --git a/fastapi_server/requirements.txt b/fastapi_server/requirements.txt
new file mode 100644
--- /dev/null
+++ b/fastapi_server/requirements.txt
@@ -0,0 +1,24 @@
+fastapi>=0.104.0
+uvicorn>=0.24.0
+pydantic>=2.0.0,<2.12
+loguru>=0.7.0
+python-dotenv>=1.0.0
+langchain==1.0
+langchain-core>=0.1.0
+langchain-community
+langchain-openai
+langchain-mcp-adapters
+langgraph>=0.0.40
+tyro>=0.7.0
+commentjson>=0.9.0
+matplotlib>=3.7.0
+Pillow>=10.0.0
+jax>=0.4.0
+httpx[socks]
+dashscope
+websockets>=11.0.3
+mcp>=1.8.1
+mcp-proxy>=0.8.2
+faiss-cpu
+fastmcp
+pandas
diff --git a/fastapi_server/server.py b/fastapi_server/server.py
new file mode 100644
index 0000000..394d404
--- /dev/null
+++ b/fastapi_server/server.py
@@ -0,0 +1,163 @@
+from fastapi import FastAPI, HTTPException, Depends, Security
+from fastapi.middleware.cors import CORSMiddleware
+from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
+from pydantic import BaseModel, Field
+from typing import List, Optional, Dict, Any, Union
+import os
+import sys
+import time
+import uvicorn
+from loguru import logger
+
+# Add the parent directory to sys.path so the lang_agent module can be imported
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from lang_agent.pipeline import Pipeline, PipelineConfig
+
+# Request/response models in the OpenAI format
+class ChatMessage(BaseModel):
+    role: str = Field(..., description="Message role: 'system', 'user', or 'assistant'")
+    content: str = Field(..., description="Message content")
+
+class ChatCompletionRequest(BaseModel):
+    model: str = Field(default="qwen-flash", description="Model name")
+    messages: List[ChatMessage] = Field(..., description="List of conversation messages")
+    temperature: Optional[float] = Field(default=0.7, description="Sampling temperature")
+    max_tokens: Optional[int] = Field(default=500, description="Maximum number of tokens to generate")
+    stream: Optional[bool] = Field(default=False, description="Whether to stream the response")
+    thread_id: Optional[int] = Field(default=3, description="Thread ID for multi-turn conversations")
+
+class ChatCompletionResponseChoice(BaseModel):
+    index: int
+    message: ChatMessage
+    finish_reason: str
+
+class ChatCompletionResponseUsage(BaseModel):
+    prompt_tokens: int
+    completion_tokens: int
+    total_tokens: int
+
+class ChatCompletionResponse(BaseModel):
+    id: str
+    object: str = "chat.completion"
+    created: int
+    model: str
+    choices: List[ChatCompletionResponseChoice]
+    usage: Optional[ChatCompletionResponseUsage] = None
+
+# Initialize the FastAPI app
+app = FastAPI(title="Lang Agent Chat API", description="Chat API that calls pipeline.invoke using the OpenAI format")
+
+# Set the API key
+API_KEY = "123tangledup-ai"
+
+# Create the security scheme
+security = HTTPBearer()
+
+# Dependency that verifies the API key
+async def verify_api_key(credentials: HTTPAuthorizationCredentials = Security(security)):
+    if credentials.credentials != API_KEY:
+        raise HTTPException(
+            status_code=401,
+            detail="Invalid API key",
+            headers={"WWW-Authenticate": "Bearer"},
+        )
+    return credentials
+
+# Add CORS middleware
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+# Initialize the Pipeline
+pipeline_config = PipelineConfig()
+pipeline_config.llm_name = "qwen-flash"
+pipeline_config.llm_provider = "openai"
+pipeline_config.base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
+
+pipeline = Pipeline(pipeline_config)
+
+@app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
+async def chat_completions(
+    request: ChatCompletionRequest,
+    credentials: HTTPAuthorizationCredentials = Depends(verify_api_key)
+):
+    """
+    Chat completion API in the OpenAI format
+    """
+    try:
+        # Extract the user message
+        user_message = None
+        system_message = None
+
+        for message in request.messages:
+            if message.role == "user":
+                user_message = message.content
+            elif message.role == "system":
+                system_message = message.content
+
+        if not user_message:
+            raise HTTPException(status_code=400, detail="Missing user message")
+
+        # Call the pipeline's chat method
+        response_content = pipeline.chat(
+            inp=user_message,
+            as_stream=request.stream,
+            thread_id=request.thread_id
+        )
+
+        # Build the response
+        response = ChatCompletionResponse(
+            id=f"chatcmpl-{os.urandom(12).hex()}",
+            created=int(time.time()),
+            model=request.model,
+            choices=[
+                ChatCompletionResponseChoice(
+                    index=0,
+                    message=ChatMessage(role="assistant", content=response_content),
+                    finish_reason="stop"
+                )
+            ]
+        )
+
+        return response
+
+    except Exception as e:
+        logger.error(f"Error while handling chat request: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
+
+@app.get("/")
+async def root():
+    """
+    Root path; returns API info
+    """
+    return {
+        "message": "Lang Agent Chat API",
+        "version": "1.0.0",
+        "description": "Chat API that calls pipeline.invoke using the OpenAI format",
+        "authentication": "Bearer Token (API Key)",
+        "endpoints": {
+            "/v1/chat/completions": "POST - chat completion endpoint, OpenAI-compatible, requires API key auth",
+            "/": "GET - API info",
+            "/health": "GET - health check endpoint"
+        }
+    }
+
+@app.get("/health")
+async def health_check():
+    """
+    Health check endpoint
+    """
+    return {"status": "healthy"}
+
+if __name__ == "__main__":
+    uvicorn.run(
+        "server:app",
+        host="0.0.0.0",
+        port=8488,
+        reload=True
+    )
\ No newline at end of file
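Note that this handler keeps only the last user and last system message and silently drops the rest of the conversation (patch 09 later leaves a TODO about exactly this). A sketch of folding the whole history into one pipeline input instead, assuming pipeline.chat accepts plain text (as every call in this series suggests; flatten_messages is a hypothetical helper, not part of the patches):

    def flatten_messages(messages) -> str:
        # messages: the request's List[ChatMessage].
        # Prefix each turn with its role so the model sees the full dialogue.
        return "\n".join(f"{m.role}: {m.content}" for m in messages)

    # In the handler, the call would then become:
    # response_content = pipeline.chat(inp=flatten_messages(request.messages), ...)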
diff --git a/fastapi_server/start_server.sh b/fastapi_server/start_server.sh
new file mode 100755
index 0000000..852ab95
--- /dev/null
+++ b/fastapi_server/start_server.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+
+echo "Starting the Lang Agent Chat API server..."
+
+# Check the Python environment
+if ! command -v python &> /dev/null; then
+    echo "Error: Python not found. Make sure Python is installed and on PATH."
+    exit 1
+fi
+
+# Check environment variables
+if [ -z "$ALI_API_KEY" ]; then
+    echo "Warning: the ALI_API_KEY environment variable is not set. Make sure it is set."
+    echo "For example: export ALI_API_KEY='your_api_key'"
+fi
+
+# Start the server
+cd "$(dirname "$0")"
+python server.py
\ No newline at end of file
diff --git a/lang_agent/rag/simple.py b/lang_agent/rag/simple.py
index bb64487..f7724cd 100644
--- a/lang_agent/rag/simple.py
+++ b/lang_agent/rag/simple.py
@@ -21,8 +21,21 @@ class SimpleRagConfig(ToolConfig, KeyConfig):
     model_name:str = "text-embedding-v4"
     """embedding model name"""
 
-    folder_path:str = "/home/smith/projects/work/langchain-agent/assets/xiaozhan_emb"
-    """path to local database"""
+    folder_path:str = "/Users/jeremygan/Desktop/TangledupAI/lang-agent/assets/xiaozhan_emb"
+    """path to docker database"""
+
+    # @property
+    # def folder_path(self) -> str:
+    #     """Dynamically determine the folder path for the vector store"""
+    #     # Check if environment variable is set
+    #     env_path = os.environ.get("RAG_FOLDER_PATH")
+    #     if env_path:
+    #         return env_path
+
+    #     # Default to relative path from current working directory
+    #     return os.path.join(os.getcwd(), "assets", "xiaozhan_emb")
+
+
 
@@ -31,8 +44,19 @@ class SimpleRag(LangToolBase):
         self.config = config
 
         self.emb = QwenEmbeddings(self.config.api_key, self.config.model_name)
+
+        # Determine the folder path dynamically
+        # folder_path = os.environ.get("RAG_FOLDER_PATH")
+        # if not folder_path:
+        #     # Default to relative path from current working directory
+        #     folder_path = os.path.join(os.getcwd(), "assets", "xiaozhan_emb")
+
+        # logger.info(f"Loading FAISS index from: {folder_path}")
+
+        folder_path = "/Users/jeremygan/Desktop/TangledupAI/lang-agent/assets/xiaozhan_emb"
+
         self.vec_store = FAISS.load_local(
-            folder_path=self.config.folder_path,
+            folder_path=folder_path,
             embeddings=self.emb,
             allow_dangerous_deserialization=True  # Required for LangChain >= 0.1.1
         )
diff --git a/lang_agent/test.py b/lang_agent/test.py
new file mode 100644
index 0000000..e69de29
diff --git a/lang_agent/tool_manager.py b/lang_agent/tool_manager.py
index c63d36b..73f0ef8 100644
--- a/lang_agent/tool_manager.py
+++ b/lang_agent/tool_manager.py
@@ -6,14 +6,14 @@ import inspect
 import asyncio
 import os.path as osp
 from loguru import logger
-from fastmcp.tools.tool import FunctionTool
+from fastmcp.tools.tool import Tool
 
 from lang_agent.config import InstantiateConfig, ToolConfig
 from lang_agent.base import LangToolBase
 from lang_agent.rag.simple import SimpleRagConfig
 from lang_agent.dummy.calculator import CalculatorConfig
-from catering_end.lang_tool import CartToolConfig, CartTool
+# from catering_end.lang_tool import CartToolConfig, CartTool
 
 from langchain_core.tools.structured import StructuredTool
 import jax
@@ -26,7 +26,7 @@ class ToolManagerConfig(InstantiateConfig):
 
     # tool configs here; MUST HAVE 'config' in name and must be dataclass
     rag_config: SimpleRagConfig = field(default_factory=SimpleRagConfig)
-    cart_config: CartToolConfig = field(default_factory=CartToolConfig)
+    # cart_config: CartToolConfig = field(default_factory=CartToolConfig)
     calc_config: CalculatorConfig = field(default_factory=CalculatorConfig)
 
@@ -78,7 +78,7 @@ class ToolManager:
     def _get_tool_fnc(self, tool_obj:LangToolBase)->List:
         fnc_list = []
         for fnc in tool_obj.get_tool_fnc():
-            if isinstance(fnc, FunctionTool):
+            if isinstance(fnc, Tool):
                 fnc = fnc.fn
             fnc_list.append(fnc)
 
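One thing patch 04 leaves hardcoded is the bearer token (API_KEY = "123tangledup-ai" in server.py), even though the series already wires up dotenv. A hedged sketch of reading it from the environment instead; LANG_AGENT_API_KEY is a hypothetical variable name the series never defines:

    import os
    from fastapi import HTTPException, Security
    from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

    security = HTTPBearer()

    # Hypothetical env var; the patches themselves hardcode the token.
    API_KEY = os.getenv("LANG_AGENT_API_KEY", "")

    async def verify_api_key(credentials: HTTPAuthorizationCredentials = Security(security)):
        # Reject if no key is configured or the presented token does not match.
        if not API_KEY or credentials.credentials != API_KEY:
            raise HTTPException(
                status_code=401,
                detail="Invalid API key",
                headers={"WWW-Authenticate": "Bearer"},
            )
        return credentials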
From 9bef61a6c3dde9fa4da10780ff08f973060a0da9 Mon Sep 17 00:00:00 2001
From: goulustis
Date: Wed, 5 Nov 2025 00:16:20 +0800
Subject: [PATCH 05/10] get datapath dynamically

---
 lang_agent/rag/simple.py | 29 +++++++----------------------
 1 file changed, 7 insertions(+), 22 deletions(-)

diff --git a/lang_agent/rag/simple.py b/lang_agent/rag/simple.py
index f7724cd..a77fff9 100644
--- a/lang_agent/rag/simple.py
+++ b/lang_agent/rag/simple.py
@@ -3,7 +3,7 @@ from typing import Type, List
 import tyro
 from mcp.server.fastmcp import FastMCP
 from loguru import logger
-import os
+import os.path as osp
 
 from langchain_community.vectorstores import FAISS
 from langchain_core.documents.base import Document
@@ -21,19 +21,13 @@ class SimpleRagConfig(ToolConfig, KeyConfig):
     model_name:str = "text-embedding-v4"
     """embedding model name"""
 
-    folder_path:str = "/Users/jeremygan/Desktop/TangledupAI/lang-agent/assets/xiaozhan_emb"
+    folder_path:str = None
     """path to docker database"""
 
-    # @property
-    # def folder_path(self) -> str:
-    #     """Dynamically determine the folder path for the vector store"""
-    #     # Check if environment variable is set
-    #     env_path = os.environ.get("RAG_FOLDER_PATH")
-    #     if env_path:
-    #         return env_path
-
-    #     # Default to relative path from current working directory
-    #     return os.path.join(os.getcwd(), "assets", "xiaozhan_emb")
+    def __post_init__(self):
+        if self.folder_path is None:
+            self.folder_path = osp.join(osp.dirname(osp.dirname(osp.dirname(__file__))), "assets", "xiaozhan_emb")
+            logger.info(f"no rag database provided, using default {self.folder_path}")
 
@@ -45,18 +39,9 @@ class SimpleRag(LangToolBase):
 
         self.emb = QwenEmbeddings(self.config.api_key, self.config.model_name)
 
-
-        # Determine the folder path dynamically
-        # folder_path = os.environ.get("RAG_FOLDER_PATH")
-        # if not folder_path:
-        #     # Default to relative path from current working directory
-        #     folder_path = os.path.join(os.getcwd(), "assets", "xiaozhan_emb")
-
-        # logger.info(f"Loading FAISS index from: {folder_path}")
-
-        folder_path = "/Users/jeremygan/Desktop/TangledupAI/lang-agent/assets/xiaozhan_emb"
+
         self.vec_store = FAISS.load_local(
-            folder_path=folder_path,
+            folder_path=self.config.folder_path,
             embeddings=self.emb,
             allow_dangerous_deserialization=True  # Required for LangChain >= 0.1.1
         )

From b2756aa9c95c77dcaca79ced88b74eb738381d2c Mon Sep 17 00:00:00 2001
From: goulustis
Date: Wed, 5 Nov 2025 00:20:38 +0800
Subject: [PATCH 06/10] add post init super call

---
 lang_agent/rag/simple.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lang_agent/rag/simple.py b/lang_agent/rag/simple.py
index a77fff9..d1b05cb 100644
--- a/lang_agent/rag/simple.py
+++ b/lang_agent/rag/simple.py
@@ -25,6 +25,7 @@ class SimpleRagConfig(ToolConfig, KeyConfig):
     """path to docker database"""
 
     def __post_init__(self):
+        super().__post_init__()
         if self.folder_path is None:
             self.folder_path = osp.join(osp.dirname(osp.dirname(osp.dirname(__file__))), "assets", "xiaozhan_emb")
             logger.info(f"no rag database provided, using default {self.folder_path}")
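Patches 05-06 derive the FAISS path from the package location when none is given, but the field now defaults to None while staying annotated as str; Optional[str] would describe it accurately. A minimal sketch of the same logic with the tightened annotation (simplified: the real class also inherits ToolConfig and KeyConfig, whose definitions are not shown):

    import os.path as osp
    from dataclasses import dataclass
    from typing import Optional

    @dataclass
    class SimpleRagConfig:  # simplified; real class inherits ToolConfig, KeyConfig
        model_name: str = "text-embedding-v4"
        folder_path: Optional[str] = None

        def __post_init__(self):
            if self.folder_path is None:
                # <repo root>/assets/xiaozhan_emb, resolved relative to this file
                self.folder_path = osp.join(
                    osp.dirname(osp.dirname(osp.dirname(__file__))),
                    "assets", "xiaozhan_emb",
                )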
From 47bdd9d5a589c232b520bd94ae85e014502cc009 Mon Sep 17 00:00:00 2001
From: jeremygan2021
Date: Wed, 5 Nov 2025 00:29:54 +0800
Subject: [PATCH 07/10] model swap
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 fastapi_server/server.py | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)

diff --git a/fastapi_server/server.py b/fastapi_server/server.py
index 394d404..aa9328c 100644
--- a/fastapi_server/server.py
+++ b/fastapi_server/server.py
@@ -26,6 +26,8 @@ class ChatCompletionRequest(BaseModel):
     max_tokens: Optional[int] = Field(default=500, description="Maximum number of tokens to generate")
     stream: Optional[bool] = Field(default=False, description="Whether to stream the response")
     thread_id: Optional[int] = Field(default=3, description="Thread ID for multi-turn conversations")
+    llm_provider: Optional[str] = Field(default="openai", description="LLM provider")
+    base_url: Optional[str] = Field(default="https://dashscope.aliyuncs.com/compatible-mode/v1", description="Base URL of the LLM API")
 
 class ChatCompletionResponseChoice(BaseModel):
     index: int
@@ -73,14 +75,6 @@ app.add_middleware(
     allow_headers=["*"],
 )
 
-# Initialize the Pipeline
-pipeline_config = PipelineConfig()
-pipeline_config.llm_name = "qwen-flash"
-pipeline_config.llm_provider = "openai"
-pipeline_config.base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
-
-pipeline = Pipeline(pipeline_config)
-
 @app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
 async def chat_completions(
     request: ChatCompletionRequest,
@@ -103,6 +97,15 @@ async def chat_completions(
         if not user_message:
             raise HTTPException(status_code=400, detail="Missing user message")
 
+        # Create a PipelineConfig dynamically
+        pipeline_config = PipelineConfig()
+        pipeline_config.llm_name = request.model
+        pipeline_config.llm_provider = request.llm_provider
+        pipeline_config.base_url = request.base_url
+
+        # Create a new Pipeline instance
+        pipeline = Pipeline(pipeline_config)
+
         # Call the pipeline's chat method
         response_content = pipeline.chat(
             inp=user_message,

From 4e83426d16460e122187eb88181abfc49c0a07ef Mon Sep 17 00:00:00 2001
From: jeremygan2021
Date: Wed, 5 Nov 2025 00:37:09 +0800
Subject: [PATCH 08/10] Wrap FastAPI with an OpenAI-compatible API spec
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 fastapi_server/server.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fastapi_server/server.py b/fastapi_server/server.py
index aa9328c..834d88a 100644
--- a/fastapi_server/server.py
+++ b/fastapi_server/server.py
@@ -91,7 +91,7 @@ async def chat_completions(
         for message in request.messages:
             if message.role == "user":
                 user_message = message.content
-            elif message.role == "system":
+            elif message.role == "system" or message.role == "assistant":
                 system_message = message.content
 
         if not user_message:

From 304fe0879c404709020962198e9d01979d18a34d Mon Sep 17 00:00:00 2001
From: goulustis
Date: Wed, 5 Nov 2025 03:00:37 +0800
Subject: [PATCH 09/10] use fake streaming

---
 fastapi_server/server.py | 180 +++++++++++++++++++++++++++++++++++----
 1 file changed, 165 insertions(+), 15 deletions(-)

diff --git a/fastapi_server/server.py b/fastapi_server/server.py
index 394d404..6f5fb88 100644
--- a/fastapi_server/server.py
+++ b/fastapi_server/server.py
@@ -1,12 +1,16 @@
 from fastapi import FastAPI, HTTPException, Depends, Security
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
+from fastapi.responses import StreamingResponse
 from pydantic import BaseModel, Field
 from typing import List, Optional, Dict, Any, Union
 import os
 import sys
 import time
 import uvicorn
+import httpx
+import openai
+import json
 from loguru import logger
 
 # Add the parent directory to sys.path so the lang_agent module can be imported
@@ -45,6 +49,74 @@ class ChatCompletionResponse(BaseModel):
     choices: List[ChatCompletionResponseChoice]
     usage: Optional[ChatCompletionResponseUsage] = None
 
+# Wrapper class around the OpenAI client
+class OpenAIClientWrapper:
+    def __init__(
+        self,
+        api_key: Optional[str] = None,
+        base_url: Optional[str] = None,
+        timeout: float = 60.0,
+        model_name: str = "qwen-flash",
+        max_tokens: int = 500,
+        temperature: float = 0.7,
+        top_p: float = 1.0,
+        frequency_penalty: float = 0.0,
+    ):
+        """
+        Initialize the OpenAI client wrapper
+
+        Args:
+            api_key: API key; falls back to the OPENAI_API_KEY env var if None
+            base_url: API base URL; falls back to the OPENAI_BASE_URL env var if None
+            timeout: request timeout in seconds
+            model_name: default model name
+            max_tokens: default maximum token count
+            temperature: default sampling temperature
+            top_p: default top_p parameter
+            frequency_penalty: default frequency penalty
+        """
+        self.api_key = api_key or os.getenv("OPENAI_API_KEY", "")
+        self.base_url = base_url or os.getenv("OPENAI_BASE_URL", None)
+        self.timeout = timeout
+        self.model_name = model_name
+        self.max_tokens = max_tokens
+        self.temperature = temperature
+        self.top_p = top_p
+        self.frequency_penalty = frequency_penalty
+
+        self.client = openai.OpenAI(
+            api_key=self.api_key,
+            base_url=self.base_url,
+            timeout=httpx.Timeout(self.timeout)
+        )
+
+    def response(self, session_id: str, dialogue: List[Dict[str, str]], **kwargs):
+        """
+        Generate a chat response (streaming)
+
+        Args:
+            session_id: session ID
+            dialogue: list of messages, e.g. [{"role": "user", "content": "..."}, ...]
+            **kwargs: optional overrides for max_tokens, temperature, top_p, frequency_penalty
+
+        Returns:
+            The OpenAI streaming response object
+        """
+        try:
+            responses = self.client.chat.completions.create(
+                model=self.model_name,
+                messages=dialogue,
+                stream=True,
+                max_tokens=kwargs.get("max_tokens", self.max_tokens),
+                temperature=kwargs.get("temperature", self.temperature),
+                top_p=kwargs.get("top_p", self.top_p),
+                frequency_penalty=kwargs.get("frequency_penalty", self.frequency_penalty),
+            )
+            return responses
+        except Exception as e:
+            logger.error(f"OpenAI client response error: {str(e)}")
+            raise
+
 # Initialize the FastAPI app
 app = FastAPI(title="Lang Agent Chat API", description="Chat API that calls pipeline.invoke using the OpenAI format")
 
@@ -55,14 +127,14 @@ API_KEY = "123tangledup-ai"
 # Create the security scheme
 security = HTTPBearer()
 
 # Dependency that verifies the API key
-async def verify_api_key(credentials: HTTPAuthorizationCredentials = Security(security)):
-    if credentials.credentials != API_KEY:
-        raise HTTPException(
-            status_code=401,
-            detail="Invalid API key",
-            headers={"WWW-Authenticate": "Bearer"},
-        )
-    return credentials
+# async def verify_api_key(credentials: HTTPAuthorizationCredentials = Security(security)):
+#     if credentials.credentials != API_KEY:
+#         raise HTTPException(
+#             status_code=401,
+#             detail="Invalid API key",
+#             headers={"WWW-Authenticate": "Bearer"},
+#         )
+#     return credentials
 
 # Add CORS middleware
 app.add_middleware(
@@ -81,10 +153,64 @@ pipeline_config.base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
 
 pipeline = Pipeline(pipeline_config)
 
-@app.post("/v1/chat/completions", response_model=ChatCompletionResponse)
+# Initialize the OpenAI client wrapper (optional, for calling the OpenAI API directly)
+openai_client = OpenAIClientWrapper(
+    api_key=os.getenv("OPENAI_API_KEY"),
+    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
+    timeout=60.0,
+    model_name="qwen-flash",
+    max_tokens=500,
+    temperature=0.7,
+    top_p=1.0,
+    frequency_penalty=0.0,
+)
+
+def generate_streaming_chunks(full_text: str, response_id: str, model: str, chunk_size: int = 10):
+    """
+    Generate streaming chunks from a non-streaming result
+    """
+    created_time = int(time.time())
+
+    # Stream content chunks
+    for i in range(0, len(full_text), chunk_size):
+        chunk = full_text[i:i + chunk_size]
+        if chunk:
+            chunk_data = {
+                "id": response_id,
+                "object": "chat.completion.chunk",
+                "created": created_time,
+                "model": model,
+                "choices": [
+                    {
+                        "index": 0,
+                        "delta": {"content": chunk},
+                        "finish_reason": None
+                    }
+                ]
+            }
+            yield f"data: {json.dumps(chunk_data)}\n\n"
+
+    # Send final chunk with finish_reason
+    final_chunk = {
+        "id": response_id,
+        "object": "chat.completion.chunk",
+        "created": created_time,
+        "model": model,
+        "choices": [
+            {
+                "index": 0,
+                "delta": {},
+                "finish_reason": "stop"
+            }
+        ]
+    }
+    yield f"data: {json.dumps(final_chunk)}\n\n"
+    yield "data: [DONE]\n\n"
+
+@app.post("/v1/chat/completions")
 async def chat_completions(
-    request: ChatCompletionRequest,
-    credentials: HTTPAuthorizationCredentials = Depends(verify_api_key)
+    request: ChatCompletionRequest#,
+    # credentials: HTTPAuthorizationCredentials = Depends(verify_api_key)
 ):
     """
     Chat completion API in the OpenAI format
@@ -94,6 +220,7 @@ async def chat_completions(
         user_message = None
         system_message = None
 
+        # TODO: wrap this sht as human and system message
         for message in request.messages:
             if message.role == "user":
                 user_message = message.content
@@ -103,16 +230,39 @@ async def chat_completions(
         if not user_message:
             raise HTTPException(status_code=400, detail="Missing user message")
 
-        # Call the pipeline's chat method
+        # Call the pipeline's chat method (always get non-streaming result)
         response_content = pipeline.chat(
             inp=user_message,
-            as_stream=request.stream,
+            as_stream=False,  # Always get full result, then chunk it if streaming
             thread_id=request.thread_id
         )
 
-        # Build the response
+        # Ensure response_content is a string
+        if not isinstance(response_content, str):
+            response_content = str(response_content)
+
+        logger.info(f"Pipeline response - Length: {len(response_content)}, Content: {repr(response_content[:200])}")
+
+        if len(response_content) == 0:
+            logger.warning("Pipeline returned empty response!")
+
+        response_id = f"chatcmpl-{os.urandom(12).hex()}"
+
+        # If streaming requested, return streaming response
+        if request.stream:
+            return StreamingResponse(
+                generate_streaming_chunks(
+                    full_text=response_content,
+                    response_id=response_id,
+                    model=request.model,
+                    chunk_size=10
+                ),
+                media_type="text/event-stream"
+            )
+
+        # Otherwise return normal response
         response = ChatCompletionResponse(
-            id=f"chatcmpl-{os.urandom(12).hex()}",
+            id=response_id,
             created=int(time.time()),
             model=request.model,
             choices=[

From 6d481fb9fa2afa198ee6ed14bcc8b6c00938ef27 Mon Sep 17 00:00:00 2001
From: goulustis
Date: Wed, 5 Nov 2025 03:00:50 +0800
Subject: [PATCH 10/10] stream test

---
 fastapi_server/test_openai_client.py | 79 ++++++++++++++++++++++++++++
 1 file changed, 79 insertions(+)
 create mode 100644 fastapi_server/test_openai_client.py

diff --git a/fastapi_server/test_openai_client.py b/fastapi_server/test_openai_client.py
new file mode 100644
index 0000000..a2d345e
--- /dev/null
+++ b/fastapi_server/test_openai_client.py
@@ -0,0 +1,79 @@
+#!/usr/bin/env python3
+"""
+Simple test for OpenAI client chat.completions.create
+"""
+import os
+import httpx
+import openai
+from dotenv import load_dotenv
+
+load_dotenv()
+
+print("Initializing OpenAI client...")
+print(f"Base URL: http://localhost:8488/v1")
+print(f"API Key set: {'Yes' if os.getenv('ALI_API_KEY') else 'No'}")
+
+# Initialize client (pointing to FastAPI server from server.py)
+client = openai.OpenAI(
+    api_key=os.getenv("ALI_API_KEY"),
+    base_url="http://localhost:8488/v1",
+    timeout=httpx.Timeout(60.0)
+)
+
+print("\nTesting chat completion (non-streaming)...")
+# try:
+#     # Test chat completion (non-streaming first)
+#     response = client.chat.completions.create(
+#         model="qwen-flash",
+#         messages=[
+#             {'role':'system', 'content': 'your name is steve'}
+#             ,{"role": "user", "content": "Say hello!"}],
+#         stream=False,
+#         max_tokens=100,
+#         temperature=0.7
+#     )
+
+#     print(f"Response ID: {response.id}")
+#     print(f"Model: {response.model}")
+#     print(f"Content: {response.choices[0].message.content}")
+#     print("\n✓ Non-streaming test successful!")
+
+# except Exception as e:
+#     print(f"\n✗ Error: {str(e)}")
+#     import traceback
+#     traceback.print_exc()
+
+print("\nTesting chat completion (streaming)...")
+try:
+    # Test streaming with same message as non-streaming test
+    response = client.chat.completions.create(
+        model="qwen-flash",
+        messages=[
+            {'role':'system', 'content': 'your name is steve'},
+            {"role": "user", "content": "Say hello!"}
+        ],
+        stream=True,
+        max_tokens=100,
+        temperature=0.7
+    )
+
+    print("Streaming response:")
+    full_content = ""
+    chunk_count = 0
+    for chunk in response:
+        chunk_count += 1
+        if hasattr(chunk, 'choices') and len(chunk.choices) > 0:
+            if hasattr(chunk.choices[0], 'delta') and chunk.choices[0].delta.content:
+                content = chunk.choices[0].delta.content
+                print(content, end="", flush=True)
+                full_content += content
+
+    print(f"\n\nTotal chunks received: {chunk_count}")
+    print(f"Full content: {repr(full_content)}")
+    print(f"Content length: {len(full_content)}")
+    print("\n✓ Streaming test successful!")
+
+except Exception as e:
+    print(f"\n✗ Error: {str(e)}")
+    import traceback
+    traceback.print_exc()
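Patch 09's subject is literal: the server always calls pipeline.chat(..., as_stream=False) and re-chunks the finished answer, so clients see streaming timing that is entirely synthetic. If pipeline.chat(as_stream=True) yields incremental text — an assumption, since the series never shows the streaming contract — the fixed-size chunker could be replaced with a pass-through along these lines:

    import json
    import time

    def stream_pipeline_chunks(pipeline, user_message: str, thread_id: int,
                               response_id: str, model: str):
        # Sketch: forward pipeline increments as OpenAI-style SSE chunks.
        # Assumes pipeline.chat(as_stream=True) yields text deltas; every call
        # in this series uses as_stream=False, so this is unverified.
        created = int(time.time())
        for delta in pipeline.chat(inp=user_message, as_stream=True, thread_id=thread_id):
            chunk = {
                "id": response_id,
                "object": "chat.completion.chunk",
                "created": created,
                "model": model,
                "choices": [{"index": 0, "delta": {"content": str(delta)}, "finish_reason": None}],
            }
            yield f"data: {json.dumps(chunk)}\n\n"
        final = {
            "id": response_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
        }
        yield f"data: {json.dumps(final)}\n\n"
        yield "data: [DONE]\n\n"

The handler in server.py would pass this generator to StreamingResponse exactly as it does generate_streaming_chunks today; tokens would then reach the client as the pipeline produces them instead of after the full answer completes.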