From a920454f3916f3f3f7aa529c892b8eff3ee6fa9b Mon Sep 17 00:00:00 2001
From: goulustis
Date: Fri, 7 Nov 2025 14:55:08 +0800
Subject: [PATCH] dashscope use actual streaming instead

---
 fastapi_server/server_dashscope.py | 48 ++++++++++++++++++------------
 1 file changed, 29 insertions(+), 19 deletions(-)

diff --git a/fastapi_server/server_dashscope.py b/fastapi_server/server_dashscope.py
index f9b9e8b..3c9a244 100644
--- a/fastapi_server/server_dashscope.py
+++ b/fastapi_server/server_dashscope.py
@@ -48,12 +48,17 @@ pipeline_config = PipelineConfig()
 pipeline:Pipeline = pipeline_config.setup()
 
 
-def sse_chunks_from_text(full_text: str, response_id: str, model: str = "qwen-flash", chunk_size: int = 1000):
+def sse_chunks_from_stream(chunk_generator, response_id: str, model: str = "qwen-flash"):
+    """
+    Stream chunks from pipeline and format as SSE.
+    Accumulates text and sends incremental updates.
+    """
     created_time = int(time.time())
+    accumulated_text = ""
 
-    for i in range(0, len(full_text), chunk_size):
-        chunk = full_text[i:i + chunk_size]
+    for chunk in chunk_generator:
         if chunk:
+            accumulated_text += chunk
             data = {
                 "request_id": response_id,
                 "code": 200,
@@ -68,12 +73,13 @@ def sse_chunks_from_text(full_text: str, response_id: str, model: str = "qwen-fl
             }
             yield f"data: {json.dumps(data)}\n\n"
 
+    # Final message with complete text
     final = {
         "request_id": response_id,
         "code": 200,
         "message": "OK",
         "output": {
-            "text": full_text,
+            "text": accumulated_text,
             "created": created_time,
             "model": model,
         },
@@ -127,20 +133,21 @@ async def application_responses(
         last = messages[-1]
         user_msg = last.get("content") if isinstance(last, dict) else str(last)
 
-        # Invoke pipeline (non-stream) then stream-chunk it to the client
+        response_id = f"appcmpl-{os.urandom(12).hex()}"
+
+        if stream:
+            # Use actual streaming from pipeline
+            chunk_generator = pipeline.chat(inp=user_msg, as_stream=True, thread_id=thread_id)
+            return StreamingResponse(
+                sse_chunks_from_stream(chunk_generator, response_id=response_id, model=pipeline_config.llm_name),
+                media_type="text/event-stream",
+            )
+
+        # Non-streaming: get full result
         result_text = pipeline.chat(inp=user_msg, as_stream=False, thread_id=thread_id)
         if not isinstance(result_text, str):
             result_text = str(result_text)
 
-        response_id = f"appcmpl-{os.urandom(12).hex()}"
-
-        if stream:
-            return StreamingResponse(
-                sse_chunks_from_text(result_text, response_id=response_id, model=pipeline_config.llm_name, chunk_size=10),
-                media_type="text/event-stream",
-            )
-
-        # Non-streaming response structure
         data = {
             "request_id": response_id,
             "code": 200,
@@ -206,18 +213,21 @@ async def application_completion(
         last = messages[-1]
         user_msg = last.get("content") if isinstance(last, dict) else str(last)
 
-        result_text = pipeline.chat(inp=user_msg, as_stream=False, thread_id=thread_id)
-        if not isinstance(result_text, str):
-            result_text = str(result_text)
-
         response_id = f"appcmpl-{os.urandom(12).hex()}"
 
         if stream:
+            # Use actual streaming from pipeline
+            chunk_generator = pipeline.chat(inp=user_msg, as_stream=True, thread_id=thread_id)
             return StreamingResponse(
-                sse_chunks_from_text(result_text, response_id=response_id, model=pipeline_config.llm_name, chunk_size=1000),
+                sse_chunks_from_stream(chunk_generator, response_id=response_id, model=pipeline_config.llm_name),
                 media_type="text/event-stream",
             )
 
+        # Non-streaming: get full result
+        result_text = pipeline.chat(inp=user_msg, as_stream=False, thread_id=thread_id)
+        if not isinstance(result_text, str):
+            result_text = str(result_text)
+
         data = {
             "request_id": response_id,
             "code": 200,