diff --git a/fastapi_server/server_dashscope.py b/fastapi_server/server_dashscope.py
new file mode 100644
index 0000000..09b7772
--- /dev/null
+++ b/fastapi_server/server_dashscope.py
@@ -0,0 +1,233 @@
+from fastapi import FastAPI, HTTPException, Path, Request
+from fastapi.middleware.cors import CORSMiddleware
+from fastapi.responses import StreamingResponse, JSONResponse
+from pydantic import BaseModel, Field
+from typing import List, Optional
+import os
+import sys
+import time
+import json
+import uvicorn
+from loguru import logger
+
+# Ensure we can import from project root
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from lang_agent.pipeline import Pipeline, PipelineConfig
+
+
+class DSMessage(BaseModel):
+    """One chat turn in DashScope message format."""
+    role: str
+    content: str
+
+
+class DSApplicationCallRequest(BaseModel):
+    """Request body accepted by the Application.call-compatible endpoints."""
+    api_key: Optional[str] = Field(default=None)
+    app_id: Optional[str] = Field(default=None)
+    session_id: Optional[str] = Field(default=None)
+    messages: List[DSMessage]
+    stream: bool = Field(default=True)
+    # Optional overrides for pipeline behavior
+    thread_id: Optional[int] = Field(default=3)
+
+
+app = FastAPI(title="DashScope-Compatible Application API",
+              description="DashScope Application.call compatible endpoint backed by pipeline.chat")
+
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+
+# Initialize Pipeline once
+pipeline_config = PipelineConfig()
+pipeline_config.llm_name = "qwen-flash"
+pipeline_config.llm_provider = "openai"
+pipeline_config.base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
+pipeline = Pipeline(pipeline_config)
+
+
+def sse_chunks_from_text(full_text: str, response_id: str, model: str = "qwen-flash", chunk_size: int = 10):
+    """Yield SSE frames for full_text.
+
+    Intermediate frames intentionally carry an empty output.text (many SDKs
+    only expose output_text on the final frame); the last frame carries the
+    complete text with is_end=True.
+    """
+    created_time = int(time.time())
+
+    for start in range(0, len(full_text), chunk_size):
+        if full_text[start:start + chunk_size]:
+            data = {
+                "request_id": response_id,
+                "code": 200,
+                "message": "OK",
+                "output": {
+                    # Send empty during stream; many SDKs only expose output_text on final
+                    "text": "",
+                    "created": created_time,
+                    "model": model,
+                },
+                "is_end": False,
+            }
+            yield f"data: {json.dumps(data)}\n\n"
+
+    final = {
+        "request_id": response_id,
+        "code": 200,
+        "message": "OK",
+        "output": {
+            "text": full_text,
+            "created": created_time,
+            "model": model,
+        },
+        "is_end": True,
+    }
+    yield f"data: {json.dumps(final)}\n\n"
+
+
+def _normalize_messages(body: dict):
+    """Extract the chat messages from a request body.
+
+    Accepts a top-level "messages" list, an "input.messages" list, or an
+    "input.prompt" string (wrapped as a single user message).
+    """
+    messages = body.get("messages")
+    inp = body.get("input")
+    if messages is None and isinstance(inp, dict):
+        messages = inp.get("messages")
+        if messages is None:
+            prompt = inp.get("prompt")
+            if isinstance(prompt, str):
+                messages = [{"role": "user", "content": prompt}]
+    return messages
+
+
+def _resolve_stream(body: dict) -> bool:
+    """Resolve the stream flag, tolerating a missing or null "parameters"."""
+    stream = body.get("stream")
+    if stream is None:
+        params = body.get("parameters")
+        # Guard: "parameters" may be absent or explicitly null in the JSON body
+        stream = params.get("stream", True) if isinstance(params, dict) else True
+    return stream
+
+
+def _latest_user_message(messages) -> str:
+    """Return the newest non-empty user message, else the last entry."""
+    for m in reversed(messages):
+        if isinstance(m, dict) and m.get("role") == "user" and m.get("content"):
+            return m["content"]
+    last = messages[-1]
+    return last.get("content") if isinstance(last, dict) else str(last)
+
+
+def _handle_application_call(body: dict, app_id, session_id):
+    """Shared core of both Application-compatible endpoints.
+
+    Raises HTTPException(400) when no messages can be found. Returns a
+    StreamingResponse (SSE) or a JSONResponse depending on the stream flag.
+    """
+    messages = _normalize_messages(body)
+    if not messages:
+        raise HTTPException(status_code=400, detail="messages is required")
+
+    stream = _resolve_stream(body)
+    thread_id = body.get("thread_id", 3)
+    user_msg = _latest_user_message(messages)
+
+    # Invoke pipeline (non-stream) then stream-chunk it to the client
+    result_text = pipeline.chat(inp=user_msg, as_stream=False, thread_id=thread_id)
+    if not isinstance(result_text, str):
+        result_text = str(result_text)
+
+    response_id = f"appcmpl-{os.urandom(12).hex()}"
+
+    if stream:
+        return StreamingResponse(
+            sse_chunks_from_text(result_text, response_id=response_id, model=pipeline_config.llm_name, chunk_size=10),
+            media_type="text/event-stream",
+        )
+
+    # Non-streaming response structure
+    data = {
+        "request_id": response_id,
+        "code": 200,
+        "message": "OK",
+        "app_id": app_id,
+        "session_id": session_id,
+        "output": {
+            "text": result_text,
+            "created": int(time.time()),
+            "model": pipeline_config.llm_name,
+        },
+        "is_end": True,
+    }
+    return JSONResponse(content=data)
+
+
+@app.post("/v1/apps/{app_id}/sessions/{session_id}/responses")
+async def application_responses(
+    request: Request,
+    app_id: str = Path(...),
+    session_id: str = Path(...),
+):
+    """DashScope Application.call compatible endpoint (session id in path)."""
+    try:
+        body = await request.json()
+        # Prefer path params over body fields
+        return _handle_application_call(
+            body,
+            app_id or body.get("app_id"),
+            session_id or body.get("session_id"),
+        )
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"DashScope-compatible endpoint error: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+# Compatibility: some SDKs call /apps/{app_id}/completion without /v1 and without session in path
+@app.post("/apps/{app_id}/completion")
+@app.post("/v1/apps/{app_id}/completion")
+async def application_completion(
+    request: Request,
+    app_id: str = Path(...),
+):
+    """Completion-style endpoint; session id, if any, comes from the body."""
+    try:
+        body = await request.json()
+        return _handle_application_call(body, app_id, body.get("session_id"))
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"DashScope-compatible completion error: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+@app.get("/")
+async def root():
+    return {"message": "DashScope Application-compatible API", "endpoints": [
+        "/v1/apps/{app_id}/sessions/{session_id}/responses",
+        "/health",
+    ]}
+
+
+@app.get("/health")
+async def health():
+    return {"status": "healthy"}
+
+
+if __name__ == "__main__":
+    uvicorn.run(
+        "server_dashscope:app",
+        host="0.0.0.0",
+        port=8588,
+        reload=True,
+    )
diff --git a/fastapi_server/test_dashscope_client.py b/fastapi_server/test_dashscope_client.py
new file mode 100644
index 0000000..162af2f
--- /dev/null
+++ b/fastapi_server/test_dashscope_client.py
@@ -0,0 +1,103 @@
+#!/usr/bin/env python3
+"""
+Minimal test for DashScope Application.call against server_dashscope.py
+
+Instructions:
+- Start the DashScope-compatible server first, e.g.:
+  uvicorn fastapi_server.server_dashscope:app --host 0.0.0.0 --port 8588 --reload
+- Set BASE_URL below to the server base URL you started.
+- Optionally set environment variables ALI_API_KEY and ALI_APP_ID.
+"""
+import os
+import uuid
+from dotenv import load_dotenv
+from loguru import logger
+from http import HTTPStatus
+
+TAG = __name__
+
+load_dotenv()
+
+try:
+    from dashscope import Application
+    import dashscope
+except Exception:
+    print("dashscope package not found. Please install it: pip install dashscope")
+    raise
+
+
+# <<< Paste your running FastAPI base url here >>>
+BASE_URL = os.getenv("DS_BASE_URL", "http://localhost:8588")
+
+# Params
+API_KEY = os.getenv("ALI_API_KEY", "test-key")
+APP_ID = os.getenv("ALI_APP_ID", "test-app")
+SESSION_ID = str(uuid.uuid4())
+
+dialogue = [
+    {"role": "system", "content": "You are a helpful assistant."},
+    {"role": "user", "content": "Say 'the world is awesome and beautiful'."},
+]
+
+call_params = {
+    "api_key": API_KEY,
+    "app_id": APP_ID,
+    "session_id": SESSION_ID,
+    "messages": dialogue,
+    "stream": True,
+}
+
+
+def main():
+    """Exercise the server via the DashScope SDK (stream, with fallbacks)."""
+    # Point the SDK to our FastAPI implementation
+    dashscope.base_http_api_url = BASE_URL
+    print(f"Using base_http_api_url = {dashscope.base_http_api_url}")
+
+    print("\nCalling Application.call(stream=True)...\n")
+    responses = Application.call(**call_params)
+
+    try:
+        last_text = ""
+        u = ""
+        for resp in responses:
+            if resp.status_code != HTTPStatus.OK:
+                logger.bind(tag=TAG).error(
+                    f"code={resp.status_code}, message={resp.message}, 请参考文档:https://help.aliyun.com/zh/model-studio/developer-reference/error-code"
+                )
+                continue
+            current_text = getattr(getattr(resp, "output", None), "text", None)
+            if current_text is None:
+                continue
+            # Streamed output.text is cumulative; derive the per-frame delta
+            if len(current_text) >= len(last_text):
+                delta = current_text[len(last_text):]
+            else:
+                # Guard against an occasional shorter (reset) frame
+                delta = current_text
+            if delta:
+                u += delta  # accumulate so no intermediate delta is lost
+            last_text = current_text
+
+        print("from stream: ", u)
+    except TypeError:
+        # Non-streaming fallback (a single response object, not iterable)
+        if responses.status_code != HTTPStatus.OK:
+            logger.bind(tag=TAG).error(
+                f"code={responses.status_code}, message={responses.message}, 请参考文档:https://help.aliyun.com/zh/model-studio/developer-reference/error-code"
+            )
+            u = "【阿里百练API服务响应异常】"
+        else:
+            full_text = getattr(getattr(responses, "output", None), "text", "")
+            logger.bind(tag=TAG).info(
+                f"【阿里百练API服务】完整响应长度: {len(full_text)}"
+            )
+            u = full_text
+        print("from non-stream: ", u)
+    except Exception as e:
+        logger.bind(tag=TAG).error(f"Error: {e}")
+        u = "【阿里百练API服务响应异常】"
+
+
+if __name__ == "__main__":
+    main()