dashscope: use actual streaming from the pipeline instead of chunking pre-computed text

This commit is contained in:
2025-11-07 14:55:08 +08:00
parent 1cb8a09c02
commit a920454f39

View File

@@ -48,12 +48,17 @@ pipeline_config = PipelineConfig()
pipeline:Pipeline = pipeline_config.setup()
def sse_chunks_from_text(full_text: str, response_id: str, model: str = "qwen-flash", chunk_size: int = 1000):
def sse_chunks_from_stream(chunk_generator, response_id: str, model: str = "qwen-flash"):
"""
Stream chunks from pipeline and format as SSE.
Accumulates text and sends incremental updates.
"""
created_time = int(time.time())
accumulated_text = ""
for i in range(0, len(full_text), chunk_size):
chunk = full_text[i:i + chunk_size]
for chunk in chunk_generator:
if chunk:
accumulated_text += chunk
data = {
"request_id": response_id,
"code": 200,
@@ -68,12 +73,13 @@ def sse_chunks_from_text(full_text: str, response_id: str, model: str = "qwen-fl
}
yield f"data: {json.dumps(data)}\n\n"
# Final message with complete text
final = {
"request_id": response_id,
"code": 200,
"message": "OK",
"output": {
"text": full_text,
"text": accumulated_text,
"created": created_time,
"model": model,
},
@@ -127,20 +133,21 @@ async def application_responses(
last = messages[-1]
user_msg = last.get("content") if isinstance(last, dict) else str(last)
# Invoke pipeline (non-stream) then stream-chunk it to the client
response_id = f"appcmpl-{os.urandom(12).hex()}"
if stream:
# Use actual streaming from pipeline
chunk_generator = pipeline.chat(inp=user_msg, as_stream=True, thread_id=thread_id)
return StreamingResponse(
sse_chunks_from_stream(chunk_generator, response_id=response_id, model=pipeline_config.llm_name),
media_type="text/event-stream",
)
# Non-streaming: get full result
result_text = pipeline.chat(inp=user_msg, as_stream=False, thread_id=thread_id)
if not isinstance(result_text, str):
result_text = str(result_text)
response_id = f"appcmpl-{os.urandom(12).hex()}"
if stream:
return StreamingResponse(
sse_chunks_from_text(result_text, response_id=response_id, model=pipeline_config.llm_name, chunk_size=10),
media_type="text/event-stream",
)
# Non-streaming response structure
data = {
"request_id": response_id,
"code": 200,
@@ -206,18 +213,21 @@ async def application_completion(
last = messages[-1]
user_msg = last.get("content") if isinstance(last, dict) else str(last)
result_text = pipeline.chat(inp=user_msg, as_stream=False, thread_id=thread_id)
if not isinstance(result_text, str):
result_text = str(result_text)
response_id = f"appcmpl-{os.urandom(12).hex()}"
if stream:
# Use actual streaming from pipeline
chunk_generator = pipeline.chat(inp=user_msg, as_stream=True, thread_id=thread_id)
return StreamingResponse(
sse_chunks_from_text(result_text, response_id=response_id, model=pipeline_config.llm_name, chunk_size=1000),
sse_chunks_from_stream(chunk_generator, response_id=response_id, model=pipeline_config.llm_name),
media_type="text/event-stream",
)
# Non-streaming: get full result
result_text = pipeline.chat(inp=user_msg, as_stream=False, thread_id=thread_id)
if not isinstance(result_text, str):
result_text = str(result_text)
data = {
"request_id": response_id,
"code": 200,