dashscope api

This commit is contained in:
2025-11-05 18:15:55 +08:00
parent d2854f604c
commit 08a86a48d5
2 changed files with 369 additions and 0 deletions

View File

@@ -0,0 +1,264 @@
from fastapi import FastAPI, HTTPException, Path, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, Field
from typing import List, Optional
import os
import sys
import time
import json
import uvicorn
from loguru import logger
# Ensure we can import from project root
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from lang_agent.pipeline import Pipeline, PipelineConfig
class DSMessage(BaseModel):
role: str
content: str
class DSApplicationCallRequest(BaseModel):
api_key: Optional[str] = Field(default=None)
app_id: Optional[str] = Field(default=None)
session_id: Optional[str] = Field(default=None)
messages: List[DSMessage]
stream: bool = Field(default=True)
# Optional overrides for pipeline behavior
thread_id: Optional[int] = Field(default=3)
app = FastAPI(title="DashScope-Compatible Application API",
description="DashScope Application.call compatible endpoint backed by pipeline.chat")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize Pipeline once
pipeline_config = PipelineConfig()
pipeline_config.llm_name = "qwen-flash"
pipeline_config.llm_provider = "openai"
pipeline_config.base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
pipeline = Pipeline(pipeline_config)
def sse_chunks_from_text(full_text: str, response_id: str, model: str = "qwen-flash", chunk_size: int = 10):
created_time = int(time.time())
for i in range(0, len(full_text), chunk_size):
chunk = full_text[i:i + chunk_size]
if chunk:
data = {
"request_id": response_id,
"code": 200,
"message": "OK",
"output": {
# Send empty during stream; many SDKs only expose output_text on final
"text": "",
"created": created_time,
"model": model,
},
"is_end": False,
}
yield f"data: {json.dumps(data)}\n\n"
final = {
"request_id": response_id,
"code": 200,
"message": "OK",
"output": {
"text": full_text,
"created": created_time,
"model": model,
},
"is_end": True,
}
yield f"data: {json.dumps(final)}\n\n"
@app.post("/v1/apps/{app_id}/sessions/{session_id}/responses")
async def application_responses(
request: Request,
app_id: str = Path(...),
session_id: str = Path(...),
):
try:
body = await request.json()
# Prefer path params
req_app_id = app_id or body.get("app_id")
req_session_id = session_id or body.get("session_id")
# Normalize messages
messages = body.get("messages")
if messages is None and isinstance(body.get("input"), dict):
messages = body.get("input", {}).get("messages")
if messages is None and isinstance(body.get("input"), dict):
prompt = body.get("input", {}).get("prompt")
if isinstance(prompt, str):
messages = [{"role": "user", "content": prompt}]
if not messages:
raise HTTPException(status_code=400, detail="messages is required")
# Determine stream flag
stream = body.get("stream")
if stream is None:
stream = body.get("parameters", {}).get("stream", True)
thread_id = body.get("thread_id", 3)
# Extract latest user message
user_msg = None
for m in reversed(messages):
role = m.get("role") if isinstance(m, dict) else None
content = m.get("content") if isinstance(m, dict) else None
if role == "user" and content:
user_msg = content
break
if user_msg is None:
last = messages[-1]
user_msg = last.get("content") if isinstance(last, dict) else str(last)
# Invoke pipeline (non-stream) then stream-chunk it to the client
result_text = pipeline.chat(inp=user_msg, as_stream=False, thread_id=thread_id)
if not isinstance(result_text, str):
result_text = str(result_text)
response_id = f"appcmpl-{os.urandom(12).hex()}"
if stream:
return StreamingResponse(
sse_chunks_from_text(result_text, response_id=response_id, model=pipeline_config.llm_name, chunk_size=10),
media_type="text/event-stream",
)
# Non-streaming response structure
data = {
"request_id": response_id,
"code": 200,
"message": "OK",
"app_id": req_app_id,
"session_id": req_session_id,
"output": {
"text": result_text,
"created": int(time.time()),
"model": pipeline_config.llm_name,
},
"is_end": True,
}
return JSONResponse(content=data)
except HTTPException:
raise
except Exception as e:
logger.error(f"DashScope-compatible endpoint error: {e}")
raise HTTPException(status_code=500, detail=str(e))
# Compatibility: some SDKs call /apps/{app_id}/completion without /v1 and without session in path
@app.post("/apps/{app_id}/completion")
@app.post("/v1/apps/{app_id}/completion")
async def application_completion(
request: Request,
app_id: str = Path(...),
):
try:
body = await request.json()
req_session_id = body.get("session_id")
# Normalize messages
messages = body.get("messages")
if messages is None and isinstance(body.get("input"), dict):
messages = body.get("input", {}).get("messages")
if messages is None and isinstance(body.get("input"), dict):
prompt = body.get("input", {}).get("prompt")
if isinstance(prompt, str):
messages = [{"role": "user", "content": prompt}]
if not messages:
raise HTTPException(status_code=400, detail="messages is required")
stream = body.get("stream")
if stream is None:
stream = body.get("parameters", {}).get("stream", True)
thread_id = body.get("thread_id", 3)
user_msg = None
for m in reversed(messages):
role = m.get("role") if isinstance(m, dict) else None
content = m.get("content") if isinstance(m, dict) else None
if role == "user" and content:
user_msg = content
break
if user_msg is None:
last = messages[-1]
user_msg = last.get("content") if isinstance(last, dict) else str(last)
result_text = pipeline.chat(inp=user_msg, as_stream=False, thread_id=thread_id)
if not isinstance(result_text, str):
result_text = str(result_text)
response_id = f"appcmpl-{os.urandom(12).hex()}"
if stream:
return StreamingResponse(
sse_chunks_from_text(result_text, response_id=response_id, model=pipeline_config.llm_name, chunk_size=10),
media_type="text/event-stream",
)
data = {
"request_id": response_id,
"code": 200,
"message": "OK",
"app_id": app_id,
"session_id": req_session_id,
"output": {
"text": result_text,
"created": int(time.time()),
"model": pipeline_config.llm_name,
},
"is_end": True,
}
return JSONResponse(content=data)
except HTTPException:
raise
except Exception as e:
logger.error(f"DashScope-compatible completion error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/")
async def root():
return {"message": "DashScope Application-compatible API", "endpoints": [
"/v1/apps/{app_id}/sessions/{session_id}/responses",
"/health",
]}
@app.get("/health")
async def health():
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run(
"server_dashscope:app",
host="0.0.0.0",
port=8588,
reload=True,
)

View File

@@ -0,0 +1,105 @@
#!/usr/bin/env python3
"""
Minimal test for DashScope Application.call against server_dashscope.py
Instructions:
- Start the DashScope-compatible server first, e.g.:
uvicorn fastapi_server.server_dashscope:app --host 0.0.0.0 --port 8588 --reload
- Set BASE_URL below to the server base URL you started.
- Optionally set environment variables ALI_API_KEY and ALI_APP_ID.
"""
import os
import uuid
from dotenv import load_dotenv
from loguru import logger
from http import HTTPStatus
TAG = __name__
load_dotenv()
try:
from dashscope import Application
import dashscope
except Exception as e:
print("dashscope package not found. Please install it: pip install dashscope")
raise
# <<< Paste your running FastAPI base url here >>>
BASE_URL = os.getenv("DS_BASE_URL", "http://localhost:8588")
# Params
API_KEY = os.getenv("ALI_API_KEY", "test-key")
APP_ID = os.getenv("ALI_APP_ID", "test-app")
SESSION_ID = str(uuid.uuid4())
dialogue = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Say 'the world is awesome and beautiful'."},
]
call_params = {
"api_key": API_KEY,
"app_id": APP_ID,
"session_id": SESSION_ID,
"messages": dialogue,
"stream": True,
}
def main():
# Point the SDK to our FastAPI implementation
dashscope.base_http_api_url = BASE_URL
print(f"Using base_http_api_url = {dashscope.base_http_api_url}")
print("\nCalling Application.call(stream=True)...\n")
responses = Application.call(**call_params)
try:
last_text = ""
u = ""
for resp in responses:
if resp.status_code != HTTPStatus.OK:
logger.bind(tag=TAG).error(
f"code={resp.status_code}, message={resp.message}, 请参考文档https://help.aliyun.com/zh/model-studio/developer-reference/error-code"
)
continue
current_text = getattr(getattr(resp, "output", None), "text", None)
if current_text is None:
continue
# SDK流式为增量覆盖计算差量输出
if len(current_text) >= len(last_text):
delta = current_text[len(last_text):]
else:
# 避免偶发回退
delta = current_text
if delta:
u = delta
last_text = current_text
print("from stream: ", u)
except TypeError:
# 非流式回落(一次性返回)
if responses.status_code != HTTPStatus.OK:
logger.bind(tag=TAG).error(
f"code={responses.status_code}, message={responses.message}, 请参考文档https://help.aliyun.com/zh/model-studio/developer-reference/error-code"
)
u = "【阿里百练API服务响应异常】"
else:
full_text = getattr(getattr(responses, "output", None), "text", "")
logger.bind(tag=TAG).info(
f"【阿里百练API服务】完整响应长度: {len(full_text)}"
)
u = full_text
print("from non-stream: ", u)
except Exception as e:
logger.bind(tag=TAG).error(f"Error: {e}")
u = "【阿里百练API服务响应异常】"
if __name__ == "__main__":
main()