This commit is contained in:
jijiahao
2025-11-05 20:41:47 +08:00
19 changed files with 1125 additions and 7 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

45
.dockerignore Normal file
View File

@@ -0,0 +1,45 @@
# Git
.git
.gitignore
# Python
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
env/
pip-log.txt
pip-delete-this-directory.txt
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.log
.git
.mypy_cache
.pytest_cache
.hypothesis
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
# Project specific
*.md
!README.md

3
.gitignore vendored
View File

@@ -5,4 +5,5 @@ logs/
*.pyc
*.zip
django.log
django.log
.env

31
Dockerfile Normal file
View File

@@ -0,0 +1,31 @@
# Base image: Python 3.12 (slim variant).
# NOTE(review): the original comment said 3.10 but the tag below is 3.12.
FROM python:3.12-slim

# Working directory inside the container.
WORKDIR /app

# Make /app importable and disable stdout/stderr buffering for live logs.
ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1

# System toolchain needed to build packages with native extensions (e.g. faiss deps).
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Copy project metadata and sources.
COPY pyproject.toml ./
COPY fastapi_server/requirements.txt ./fastapi_server/
COPY lang_agent/ ./lang_agent/
COPY fastapi_server/ ./fastapi_server/

# Install server requirements, then the project itself in editable mode.
RUN pip install --no-cache-dir -r fastapi_server/requirements.txt
RUN pip install --no-cache-dir -e .

# Port the FastAPI server binds to (see fastapi_server/server.py).
EXPOSE 8488

# Default command: start the API server.
CMD ["python", "fastapi_server/server.py"]

25
docker-compose.yml Normal file
View File

@@ -0,0 +1,25 @@
# NOTE(review): the top-level `version` key is obsolete in Compose v2 (ignored
# with a warning); kept for compatibility with older tooling.
version: '3.8'
services:
  lang-agent-api:
    build: .
    container_name: lang-agent-api
    ports:
      - "8488:8488"  # host:container — matches EXPOSE 8488 in the Dockerfile
    env_file:
      - ./.env       # API keys and other secrets come from .env
    environment:
      - PYTHONPATH=/app
      - PYTHONUNBUFFERED=1
      - RAG_FOLDER_PATH=/app/assets/xiaozhan_emb  # embeddings database inside the container
    volumes:
      - ./configs:/app/configs
      - ./scripts:/app/scripts
      - ./assets:/app/assets   # provides the RAG database mounted above
    restart: unless-stopped
    healthcheck:
      # Probe /health with the Python stdlib — curl is not installed in slim images.
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8488/health')"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

View File

@@ -0,0 +1,20 @@
# Base image: Python 3.9 (slim variant).
FROM python:3.9-slim

# Working directory inside the container.
WORKDIR /app

# Copy the requirements file first so dependency install is layer-cached.
COPY requirements.txt .

# Install Python dependencies.
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the project.
COPY . .

# Port the FastAPI server binds to.
EXPOSE 8488

# Default command: start the API server.
CMD ["python", "server.py"]

View File

@@ -0,0 +1,220 @@
# Lang Agent OpenAI 兼容API
这是一个符合OpenAI接口规范的聊天API,允许用户使用与OpenAI API相同的方式访问您的Lang Agent服务。
## 快速开始
### 1. 启动服务器
```bash
cd /path/to/lang-agent/fastapi_server
python server.py
```
服务器将在 `http://localhost:8488` 上启动。
### 2. 使用API
#### 使用curl命令
```bash
curl -X POST "http://localhost:8488/v1/chat/completions" \
-H "Authorization: Bearer 123tangledup-ai" \
-H "Content-Type: application/json" \
-d '{
"model": "qwen-plus",
"messages": [
{
"role": "system",
"content": "You are a helpful assistant."
},
{
"role": "user",
"content": "你是谁?"
}
]
}'
```
#### 使用Python requests
```python
import requests
API_BASE_URL = "http://localhost:8488"
API_KEY = "123tangledup-ai"
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
data = {
"model": "qwen-plus",
"messages": [
{
"role": "system",
"content": "You are a helpful assistant."
},
{
"role": "user",
"content": "你是谁?"
}
]
}
response = requests.post(f"{API_BASE_URL}/v1/chat/completions", headers=headers, json=data)
print(response.json())
```
#### 使用OpenAI Python库
```python
from openai import OpenAI
client = OpenAI(
api_key="123tangledup-ai",
base_url="http://localhost:8488/v1"
)
response = client.chat.completions.create(
model="qwen-plus",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "你是谁?"}
]
)
print(response.choices[0].message.content)
```
## API 端点
### 1. 聊天完成 `/v1/chat/completions`
与OpenAI的chat completions API完全兼容。
**请求参数:**
| 参数 | 类型 | 必需 | 默认值 | 描述 |
|------|------|------|--------|------|
| model | string | 是 | - | 模型名称 |
| messages | array | 是 | - | 消息列表 |
| temperature | number | 否 | 0.7 | 采样温度 |
| max_tokens | integer | 否 | 500 | 最大生成token数 |
| stream | boolean | 否 | false | 是否流式返回 |
| thread_id | integer | 否 | 3 | 线程ID用于多轮对话 |
**响应格式:**
```json
{
"id": "chatcmpl-abc123",
"object": "chat.completion",
"created": 1677652288,
"model": "qwen-plus",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "您好我是一个AI助手..."
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 56,
"completion_tokens": 31,
"total_tokens": 87
}
}
```
### 2. 健康检查 `/health`
检查API服务状态。
**请求:**
```bash
GET /health
```
**响应:**
```json
{
"status": "healthy"
}
```
### 3. API信息 `/`
获取API基本信息。
**请求:**
```bash
GET /
```
**响应:**
```json
{
"message": "Lang Agent Chat API",
"version": "1.0.0",
"description": "使用OpenAI格式调用pipeline.invoke的聊天API",
"authentication": "Bearer Token (API Key)",
"endpoints": {
"/v1/chat/completions": "POST - 聊天完成接口兼容OpenAI格式需要API密钥验证",
"/": "GET - API信息",
"/health": "GET - 健康检查接口"
}
}
```
## 认证
API使用Bearer Token认证。默认API密钥为 `123tangledup-ai`。
在请求头中包含:
```
Authorization: Bearer 123tangledup-ai
```
## 测试脚本
项目提供了两个测试脚本:
1. **Bash脚本** (`test_openai_api.sh`) - 使用curl命令测试API
2. **Python脚本** (`test_openai_api.py`) - 使用Python requests库测试API
运行测试脚本:
```bash
# 运行Bash测试脚本
chmod +x test_openai_api.sh
./test_openai_api.sh
# 运行Python测试脚本
python test_openai_api.py
```
## 与OpenAI API的兼容性
此API完全兼容OpenAI的chat completions API您可以
1. 使用任何支持OpenAI API的客户端库
2. 将base_url更改为`http://localhost:8488/v1`
3. 使用提供的API密钥进行认证
## 注意事项
1. 确保服务器正在运行且可访问
2. 流式响应(stream=true)目前可能不完全支持
3. 模型参数(model)主要用于标识,实际使用的模型由服务器配置决定
4. 多轮对话使用thread_id参数来维护上下文
## 故障排除
1. **连接错误**: 确保服务器正在运行检查URL和端口是否正确
2. **认证错误**: 检查API密钥是否正确设置
3. **请求格式错误**: 确保请求体是有效的JSON格式包含所有必需字段

179
fastapi_server/README.md Normal file
View File

@@ -0,0 +1,179 @@
# Lang Agent Chat API
这是一个基于FastAPI的聊天API服务使用OpenAI格式的请求来调用pipeline.invoke方法进行聊天。
## 功能特点
- 兼容OpenAI API格式的聊天接口
- 支持多轮对话通过thread_id
- 使用qwen-flash模型
- 支持流式和非流式响应
- 提供健康检查接口
## 安装依赖
```bash
pip install -r requirements.txt
```
## 环境变量
确保设置以下环境变量:
```bash
export ALI_API_KEY="your_ali_api_key"
```
## 运行服务
### 方法1使用启动脚本
```bash
./start_server.sh
```
### 方法2直接运行Python文件
```bash
python server.py
```
服务将在 `http://localhost:8488` 启动。
## API接口
### 聊天完成接口
**端点**: `POST /v1/chat/completions`
**请求格式**:
```json
{
"model": "qwen-flash",
"messages": [
{
"role": "system",
"content": "你是一个有用的助手。"
},
{
"role": "user",
"content": "你好,请介绍一下你自己。"
}
],
"temperature": 0.7,
"max_tokens": 1000,
"stream": false,
"thread_id": 3
}
```
**响应格式**:
```json
{
"id": "chatcmpl-abc123",
"object": "chat.completion",
"created": 1677652288,
"model": "qwen-flash",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "你好!我是小盏,是半盏青年茶馆的智能助手..."
},
"finish_reason": "stop"
}
]
}
```
### API信息接口
**端点**: `GET /`
返回API的基本信息。
### 健康检查接口
**端点**: `GET /health`
返回服务的健康状态。
## 使用示例
### 使用OpenAI Python客户端库
首先安装OpenAI库
```bash
pip install openai
```
然后使用以下代码:
```python
from openai import OpenAI
# 设置API基础URL和API密钥这里使用一个虚拟的密钥因为我们没有实现认证
client = OpenAI(
    api_key="your-api-key",  # 这里可以使用任意值因为我们的API没有实现认证
    base_url="http://localhost:8488/v1"
)
# 发送聊天请求
response = client.chat.completions.create(
    model="qwen-flash",
    messages=[
        {"role": "system", "content": "你是一个有用的助手。"},
        {"role": "user", "content": "你好,请介绍一下你自己。"}
    ],
    temperature=0.7,
    extra_body={"thread_id": 1}  # 用于多轮对话自定义参数必须通过extra_body传递
)
print(response.choices[0].message.content)
```
### 使用curl
```bash
curl -X POST "http://localhost:8488/v1/chat/completions" \
-H "Content-Type: application/json" \
-d '{
"model": "qwen-flash",
"messages": [
{
"role": "user",
"content": "你好,请介绍一下你自己。"
}
]
}'
```
### 使用Python requests
```python
import requests
url = "http://localhost:8488/v1/chat/completions"
headers = {"Content-Type": "application/json"}
data = {
"model": "qwen-flash",
"messages": [
{
"role": "user",
"content": "你好,请介绍一下你自己。"
}
]
}
response = requests.post(url, headers=headers, json=data)
print(response.json())
```
## 注意事项
1. 确保已设置正确的API密钥环境变量
2. 默认使用qwen-flash模型可以通过修改代码中的配置来更改模型
3. thread_id用于多轮对话相同的thread_id会保持对话上下文
4. 目前stream参数设置为true时仍会返回非流式响应可根据需要进一步实现

View File

@@ -0,0 +1,18 @@
version: '3.8'
services:
  lang-agent-api:
    build:
      context: .
      dockerfile: Dockerfile.api
    ports:
      - "8488:8488"  # host:container — matches EXPOSE 8488 in Dockerfile.api
    environment:
      - PYTHONUNBUFFERED=1
    restart: unless-stopped
    healthcheck:
      # BUGFIX: python:3.9-slim (Dockerfile.api) does not ship curl, so a
      # curl-based check always fails and marks the container unhealthy.
      # Probe /health with the Python stdlib instead (same approach as the
      # main docker-compose.yml).
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8488/health')"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

View File

@@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""
Example of calling the local FastAPI chat API through the official OpenAI
Python client library.
"""
from openai import OpenAI
import os

# Point the client at the local FastAPI server. The key is a placeholder
# because the server does not enforce authentication.
# BUGFIX: server.py binds to port 8488, not 8000.
client = OpenAI(
    api_key="your-api-key",  # any value works: the API performs no auth check
    base_url="http://localhost:8488/v1"
)
def simple_chat():
    """Single-turn chat: send one user message and print the assistant's reply."""
    print("=" * 50)
    print("简单聊天示例")
    print("=" * 50)
    response = client.chat.completions.create(
        model="qwen-flash",
        messages=[
            {"role": "user", "content": "你好,请介绍一下你自己。"}
        ],
        temperature=0.7,
        # BUGFIX: `thread_id` is not an OpenAI SDK parameter — passing it as a
        # keyword raises TypeError. Custom fields must go through `extra_body`,
        # which merges them into the request JSON the server reads.
        extra_body={"thread_id": 1},
    )
    print(f"助手回复: {response.choices[0].message.content}")
    print("\n")
def multi_turn_chat():
    """Two-turn conversation sharing one thread_id so the server keeps context."""
    print("=" * 50)
    print("多轮对话示例")
    print("=" * 50)
    # First turn.
    print("第一轮对话:")
    response1 = client.chat.completions.create(
        model="qwen-flash",
        messages=[
            {"role": "user", "content": "你推荐什么茶?"}
        ],
        temperature=0.7,
        # BUGFIX: custom parameters must travel via `extra_body`; passing
        # thread_id as a bare keyword raises TypeError in the OpenAI SDK.
        extra_body={"thread_id": 2},
    )
    print(f"用户: 你推荐什么茶?")
    print(f"助手: {response1.choices[0].message.content}")
    # Second turn — the same thread_id preserves the conversation context.
    print("\n第二轮对话:")
    response2 = client.chat.completions.create(
        model="qwen-flash",
        messages=[
            {"role": "user", "content": "为什么推荐这个茶?"}
        ],
        temperature=0.7,
        extra_body={"thread_id": 2},  # same thread as the first turn
    )
    print(f"用户: 为什么推荐这个茶?")
    print(f"助手: {response2.choices[0].message.content}")
    print("\n")
def system_prompt_example():
    """Chat with a system prompt that constrains the assistant's answer style."""
    print("=" * 50)
    print("系统提示示例")
    print("=" * 50)
    response = client.chat.completions.create(
        model="qwen-flash",
        messages=[
            {"role": "system", "content": "你是一个专业的茶艺师用简洁的语言回答问题不超过50字。"},
            {"role": "user", "content": "请介绍一下普洱茶。"}
        ],
        temperature=0.3,
        # BUGFIX: thread_id is a server-side extension; it must be sent via
        # `extra_body`, not as a keyword argument.
        extra_body={"thread_id": 3},
    )
    print(f"用户: 请介绍一下普洱茶。")
    print(f"助手: {response.choices[0].message.content}")
    print("\n")
def interactive_chat():
    """REPL-style chat loop; type 'quit' to exit. One thread_id for the session."""
    print("=" * 50)
    print("交互式聊天 (输入'quit'退出)")
    print("=" * 50)
    thread_id = 4  # fixed thread for this interactive session
    while True:
        user_input = input("你: ")
        if user_input.lower() == 'quit':
            break
        try:
            response = client.chat.completions.create(
                model="qwen-flash",
                messages=[
                    {"role": "user", "content": user_input}
                ],
                temperature=0.7,
                # BUGFIX: send the custom thread_id through extra_body — the
                # OpenAI SDK rejects unknown keyword arguments.
                extra_body={"thread_id": thread_id},
            )
            print(f"助手: {response.choices[0].message.content}")
        except Exception as e:
            print(f"错误: {str(e)}")
if __name__ == "__main__":
    print("使用OpenAI客户端库调用FastAPI聊天API示例")
    # BUGFIX: server.py binds to port 8488, not 8000.
    print("注意: 确保服务器在 http://localhost:8488 上运行\n")
    # Run each demo in sequence; the interactive one blocks until 'quit'.
    simple_chat()
    multi_turn_chat()
    system_prompt_example()
    interactive_chat()

View File

@@ -0,0 +1,24 @@
fastapi>=0.104.0
uvicorn>=0.24.0
pydantic>=2.0.0,<2.12
loguru>=0.7.0
python-dotenv>=1.0.0
langchain==1.0
langchain-core>=0.1.0
langchain-community
langchain-openai
langchain-mcp-adapters
langgraph>=0.0.40
tyro>=0.7.0
commentjson>=0.9.0
matplotlib>=3.7.0
Pillow>=10.0.0
jax>=0.4.0
httpx[socks]
dashscope
websockets>=11.0.3
mcp>=1.8.1
mcp-proxy>=0.8.2
faiss-cpu
fastmcp
pandas

315
fastapi_server/server.py Normal file
View File

@@ -0,0 +1,315 @@
from fastapi import FastAPI, HTTPException, Depends, Security
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any, Union
import os
import sys
import time
import uvicorn
import httpx
import openai
import json
from loguru import logger
# 添加父目录到系统路径以便导入lang_agent模块
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from lang_agent.pipeline import Pipeline, PipelineConfig
# Request/response models mirroring the OpenAI chat completions schema.
class ChatMessage(BaseModel):
    # One chat message; role is 'system', 'user', or 'assistant'.
    role: str = Field(..., description="消息角色,可以是 'system', 'user', 'assistant'")
    content: str = Field(..., description="消息内容")
class ChatCompletionRequest(BaseModel):
    # OpenAI-compatible request body. thread_id / llm_provider / base_url are
    # project-specific extensions (clients send them via extra_body).
    model: str = Field(default="qwen-flash", description="模型名称")
    messages: List[ChatMessage] = Field(..., description="对话消息列表")
    temperature: Optional[float] = Field(default=0.7, description="采样温度")
    max_tokens: Optional[int] = Field(default=500, description="最大生成token数")
    stream: Optional[bool] = Field(default=False, description="是否流式返回")
    # Conversation thread for multi-turn context; same id = same conversation.
    thread_id: Optional[int] = Field(default=3, description="线程ID用于多轮对话")
    llm_provider: Optional[str] = Field(default="openai", description="LLM提供商")
    base_url: Optional[str] = Field(default="https://dashscope.aliyuncs.com/compatible-mode/v1", description="LLM API基础URL")
class ChatCompletionResponseChoice(BaseModel):
    # One generated choice within a chat completion response.
    index: int            # position of this choice in the choices list
    message: ChatMessage  # the assistant's reply
    finish_reason: str    # why generation stopped, e.g. "stop"
class ChatCompletionResponseUsage(BaseModel):
    # Token accounting, mirroring OpenAI's `usage` object.
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
class ChatCompletionResponse(BaseModel):
    # Top-level OpenAI-compatible chat.completion payload.
    id: str
    object: str = "chat.completion"
    created: int  # unix timestamp, seconds
    model: str
    choices: List[ChatCompletionResponseChoice]
    # Not populated by the /v1/chat/completions handler in this file.
    usage: Optional[ChatCompletionResponseUsage] = None
# Thin wrapper around the OpenAI SDK client that stores default sampling
# parameters and exposes a streaming chat helper.
class OpenAIClientWrapper:
    def __init__(
        self,
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        timeout: float = 60.0,
        model_name: str = "qwen-flash",
        max_tokens: int = 500,
        temperature: float = 0.7,
        top_p: float = 1.0,
        frequency_penalty: float = 0.0,
    ):
        """
        Initialize the OpenAI client wrapper.

        Args:
            api_key: API key; falls back to the OPENAI_API_KEY env var when None.
            base_url: API base URL; falls back to the OPENAI_BASE_URL env var when None.
            timeout: request timeout in seconds.
            model_name: default model name.
            max_tokens: default maximum number of generated tokens.
            temperature: default sampling temperature.
            top_p: default nucleus-sampling parameter.
            frequency_penalty: default frequency penalty.
        """
        self.api_key = api_key or os.getenv("OPENAI_API_KEY", "")
        self.base_url = base_url or os.getenv("OPENAI_BASE_URL", None)
        self.timeout = timeout
        self.model_name = model_name
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.top_p = top_p
        self.frequency_penalty = frequency_penalty
        self.client = openai.OpenAI(
            api_key=self.api_key,
            base_url=self.base_url,
            timeout=httpx.Timeout(self.timeout)
        )

    def response(self, session_id: str, dialogue: List[Dict[str, str]], **kwargs):
        """
        Create a streaming chat completion.

        Args:
            session_id: session identifier. NOTE(review): currently unused in
                this method's body — confirm whether callers rely on it.
            dialogue: message list in the form [{"role": "user", "content": "..."}, ...]
            **kwargs: optional overrides for max_tokens, temperature, top_p,
                frequency_penalty; unspecified values use the instance defaults.

        Returns:
            The OpenAI streaming response object (iterable of chunks).
        """
        try:
            responses = self.client.chat.completions.create(
                model=self.model_name,
                messages=dialogue,
                stream=True,
                max_tokens=kwargs.get("max_tokens", self.max_tokens),
                temperature=kwargs.get("temperature", self.temperature),
                top_p=kwargs.get("top_p", self.top_p),
                frequency_penalty=kwargs.get("frequency_penalty", self.frequency_penalty),
            )
            return responses
        except Exception as e:
            # Log, then re-raise so callers can handle the failure themselves.
            logger.error(f"OpenAI客户端响应错误: {str(e)}")
            raise
# Create the FastAPI application.
app = FastAPI(title="Lang Agent Chat API", description="使用OpenAI格式调用pipeline.invoke的聊天API")
# Expected bearer API key. NOTE(review): verification below is commented out,
# so requests are currently accepted without authentication.
API_KEY = "123tangledup-ai"
# HTTP bearer security scheme (unused while verification is disabled).
security = HTTPBearer()
# Dependency that validates the API key — disabled; re-enable by uncommenting
# and adding it back to the endpoint's Depends().
# async def verify_api_key(credentials: HTTPAuthorizationCredentials = Security(security)):
#     if credentials.credentials != API_KEY:
#         raise HTTPException(
#             status_code=401,
#             detail="无效的API密钥",
#             headers={"WWW-Authenticate": "Bearer"},
#         )
#     return credentials
# Allow cross-origin requests from any origin.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Build the agent pipeline backed by DashScope's OpenAI-compatible endpoint.
pipeline_config = PipelineConfig()
pipeline_config.llm_name = "qwen-flash"
pipeline_config.llm_provider = "openai"
pipeline_config.base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
pipeline = Pipeline(pipeline_config)
# Direct OpenAI client wrapper — not referenced by the endpoints visible in
# this file; presumably kept for future direct-LLM calls. TODO confirm.
openai_client = OpenAIClientWrapper(
    api_key=os.getenv("OPENAI_API_KEY"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    timeout=60.0,
    model_name="qwen-flash",
    max_tokens=500,
    temperature=0.7,
    top_p=1.0,
    frequency_penalty=0.0,
)
def generate_streaming_chunks(full_text: str, response_id: str, model: str, chunk_size: int = 10):
    """Yield an OpenAI `chat.completion.chunk` SSE stream built from a complete reply.

    Emits one data event per `chunk_size`-character slice of `full_text`, then a
    final event with `finish_reason: "stop"`, then the `[DONE]` sentinel.
    """
    stamp = int(time.time())

    def to_sse(delta: dict, reason) -> str:
        # Serialize one chunk payload as a server-sent event line.
        payload = {
            "id": response_id,
            "object": "chat.completion.chunk",
            "created": stamp,
            "model": model,
            "choices": [
                {
                    "index": 0,
                    "delta": delta,
                    "finish_reason": reason,
                }
            ],
        }
        return f"data: {json.dumps(payload)}\n\n"

    pos = 0
    while pos < len(full_text):
        piece = full_text[pos:pos + chunk_size]
        if piece:
            yield to_sse({"content": piece}, None)
        pos += chunk_size
    # Terminal chunk carries an empty delta and the stop reason.
    yield to_sse({}, "stop")
    yield "data: [DONE]\n\n"
@app.post("/v1/chat/completions")
async def chat_completions(
    request: ChatCompletionRequest#,
    # credentials: HTTPAuthorizationCredentials = Depends(verify_api_key)
):
    """
    OpenAI-compatible chat completions endpoint.

    Always obtains the full reply from the pipeline, then either returns a
    single `chat.completion` object or re-chunks it into an SSE stream when
    `request.stream` is true.

    Raises:
        HTTPException 400 when no user message is present.
        HTTPException 500 for any other failure.
    """
    try:
        # Pick the messages out of the request.
        # NOTE(review): only the LAST user message is forwarded; system and
        # assistant messages overwrite each other into system_message, which is
        # never passed to the pipeline — earlier turns and system prompts are lost.
        user_message = None
        system_message = None
        # TODO: wrap this sht as human and system message
        for message in request.messages:
            if message.role == "user":
                user_message = message.content
            elif message.role == "system" or message.role == "assistant":
                system_message = message.content
        if not user_message:
            raise HTTPException(status_code=400, detail="缺少用户消息")
        # Call the pipeline. NOTE(review): request.model/temperature/max_tokens
        # are not forwarded; the pipeline's own configuration decides these.
        response_content = pipeline.chat(
            inp=user_message,
            as_stream=False,  # always get the full result, then chunk it if streaming
            thread_id=request.thread_id
        )
        # Coerce non-string pipeline results into a string for the response body.
        if not isinstance(response_content, str):
            response_content = str(response_content)
        logger.info(f"Pipeline response - Length: {len(response_content)}, Content: {repr(response_content[:200])}")
        if len(response_content) == 0:
            logger.warning("Pipeline returned empty response!")
        response_id = f"chatcmpl-{os.urandom(12).hex()}"
        # Streaming requested: replay the full text as SSE chunks.
        if request.stream:
            return StreamingResponse(
                generate_streaming_chunks(
                    full_text=response_content,
                    response_id=response_id,
                    model=request.model,
                    chunk_size=10
                ),
                media_type="text/event-stream"
            )
        # Non-streaming: return a single chat.completion object.
        response = ChatCompletionResponse(
            id=response_id,
            created=int(time.time()),
            model=request.model,
            choices=[
                ChatCompletionResponseChoice(
                    index=0,
                    message=ChatMessage(role="assistant", content=response_content),
                    finish_reason="stop"
                )
            ]
        )
        return response
    except HTTPException:
        # BUGFIX: without this pass-through the generic handler below would
        # convert the deliberate 400 above into a 500.
        raise
    except Exception as e:
        logger.error(f"处理聊天请求时出错: {str(e)}")
        raise HTTPException(status_code=500, detail=f"内部服务器错误: {str(e)}")
@app.get("/")
async def root():
    """Return service metadata: name, version, auth scheme, and endpoint listing."""
    endpoint_listing = {
        "/v1/chat/completions": "POST - 聊天完成接口兼容OpenAI格式需要API密钥验证",
        "/": "GET - API信息",
        "/health": "GET - 健康检查接口",
    }
    info = {
        "message": "Lang Agent Chat API",
        "version": "1.0.0",
        "description": "使用OpenAI格式调用pipeline.invoke的聊天API",
        "authentication": "Bearer Token (API Key)",
        "endpoints": endpoint_listing,
    }
    return info
@app.get("/health")
async def health_check():
    """Liveness probe used by container healthchecks; always reports healthy."""
    payload = {"status": "healthy"}
    return payload
if __name__ == "__main__":
    # Development entry point: serve on all interfaces, port 8488, with
    # auto-reload on source changes.
    uvicorn.run(
        "server:app",
        host="0.0.0.0",
        port=8488,
        reload=True
    )

19
fastapi_server/start_server.sh Executable file
View File

@@ -0,0 +1,19 @@
#!/bin/bash
# Launch the Lang Agent Chat API server after basic environment checks.
echo "启动Lang Agent Chat API服务器..."
# Verify a python interpreter is available on PATH.
if ! command -v python &> /dev/null; then
    echo "错误: 未找到Python。请确保Python已安装并添加到PATH中。"
    exit 1
fi
# Warn (but continue) when the DashScope API key is missing.
if [ -z "$ALI_API_KEY" ]; then
    echo "警告: 未设置ALI_API_KEY环境变量。请确保已设置此变量。"
    echo "例如: export ALI_API_KEY='your_api_key'"
fi
# Run the server from the script's own directory so relative paths resolve.
cd "$(dirname "$0")"
python server.py

View File

@@ -0,0 +1,79 @@
#!/usr/bin/env python3
"""
Simple test for OpenAI client chat.completions.create
"""
import os
import httpx
import openai
from dotenv import load_dotenv

# Pull ALI_API_KEY (and any other settings) from a local .env file if present.
load_dotenv()

print("Initializing OpenAI client...")
print(f"Base URL: http://localhost:8488/v1")
print(f"API Key set: {'Yes' if os.getenv('ALI_API_KEY') else 'No'}")

# Initialize client (pointing to FastAPI server from server.py)
client = openai.OpenAI(
    api_key=os.getenv("ALI_API_KEY"),
    base_url="http://localhost:8488/v1",
    timeout=httpx.Timeout(60.0)
)

print("\nTesting chat completion (non-streaming)...")
# Non-streaming test kept for reference; currently disabled.
# try:
#     # Test chat completion (non-streaming first)
#     response = client.chat.completions.create(
#         model="qwen-flash",
#         messages=[
#             {'role':'system', 'content': 'your name is steve'}
#             ,{"role": "user", "content": "Say hello!"}],
#         stream=False,
#         max_tokens=100,
#         temperature=0.7
#     )
#     print(f"Response ID: {response.id}")
#     print(f"Model: {response.model}")
#     print(f"Content: {response.choices[0].message.content}")
#     print("\n✓ Non-streaming test successful!")
# except Exception as e:
#     print(f"\n✗ Error: {str(e)}")
#     import traceback
#     traceback.print_exc()

print("\nTesting chat completion (streaming)...")
try:
    # Test streaming with same message as non-streaming test
    response = client.chat.completions.create(
        model="qwen-flash",
        messages=[
            {'role':'system', 'content': 'your name is steve'},
            {"role": "user", "content": "Say hello!"}
        ],
        stream=True,
        max_tokens=100,
        temperature=0.7
    )
    print("Streaming response:")
    full_content = ""
    chunk_count = 0
    # Accumulate the delta content from every SSE chunk the server sends.
    for chunk in response:
        chunk_count += 1
        if hasattr(chunk, 'choices') and len(chunk.choices) > 0:
            if hasattr(chunk.choices[0], 'delta') and chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                print(content, end="", flush=True)
                full_content += content
    print(f"\n\nTotal chunks received: {chunk_count}")
    print(f"Full content: {repr(full_content)}")
    print(f"Content length: {len(full_content)}")
    print("\n✓ Streaming test successful!")
except Exception as e:
    print(f"\n✗ Error: {str(e)}")
    import traceback
    traceback.print_exc()

View File

@@ -6,6 +6,9 @@ from typing import Dict
import os
from loguru import logger
from dotenv import load_dotenv
load_dotenv()
## NOTE: base classes taken from nerfstudio
class PrintableConfig:

View File

@@ -3,7 +3,7 @@ from typing import Type, List
import tyro
from mcp.server.fastmcp import FastMCP
from loguru import logger
import os
import os.path as osp
from langchain_community.vectorstores import FAISS
from langchain_core.documents.base import Document
@@ -21,8 +21,16 @@ class SimpleRagConfig(ToolConfig, KeyConfig):
model_name:str = "text-embedding-v4"
"""embedding model name"""
folder_path:str = "/home/smith/projects/work/langchain-agent/assets/xiaozhan_emb"
"""path to local database"""
folder_path:str = None
"""path to docker database"""
def __post_init__(self):
super().__post_init__()
if self.folder_path is None:
self.folder_path = osp.join(osp.dirname(osp.dirname(osp.dirname(__file__))), "assets", "xiaozhan_emb")
logger.info(f"no rag database provided, using default {self.folder_path}")
@@ -31,6 +39,8 @@ class SimpleRag(LangToolBase):
self.config = config
self.emb = QwenEmbeddings(self.config.api_key,
self.config.model_name)
self.vec_store = FAISS.load_local(
folder_path=self.config.folder_path,
embeddings=self.emb,

0
lang_agent/test.py Normal file
View File

View File

@@ -6,7 +6,7 @@ import inspect
import asyncio
import os.path as osp
from loguru import logger
from fastmcp.tools.tool import FunctionTool
from fastmcp.tools.tool import Tool
from lang_agent.config import InstantiateConfig, ToolConfig
from lang_agent.base import LangToolBase
@@ -80,7 +80,7 @@ class ToolManager:
def _get_tool_fnc(self, tool_obj:LangToolBase)->List:
fnc_list = []
for fnc in tool_obj.get_tool_fnc():
if isinstance(fnc, FunctionTool):
if isinstance(fnc, Tool):
fnc = fnc.fn
fnc_list.append(fnc)

View File

@@ -24,7 +24,7 @@ examples = [
"inputs": {"text": ["我要购买一杯野星星",
"我要再加一杯"]},
"outputs": {"answer": "你的野星星已经下单成功",
"tool_use": ["retrieve|get_resources",
"tool_use": ["retrieve|get_resource",
"start_shopping_session",
"add_to_cart",
"create_wechat_pay",