From 6c72948f5bb6093054502947e1827d17f852ff93 Mon Sep 17 00:00:00 2001 From: goulustis Date: Fri, 30 Jan 2026 13:31:01 +0800 Subject: [PATCH] update base async to also return --- lang_agent/base.py | 37 +++++++++++++------------------------ 1 file changed, 13 insertions(+), 24 deletions(-) diff --git a/lang_agent/base.py b/lang_agent/base.py index 27a8026..927991f 100644 --- a/lang_agent/base.py +++ b/lang_agent/base.py @@ -37,21 +37,6 @@ class GraphBase(ABC): def _stream_result(self, *nargs, **kwargs): - # def text_iterator(): - # for chunk, metadata in self.workflow.stream({"inp": nargs}, - # stream_mode="messages", - # subgraphs=True, - # **kwargs): - # if isinstance(metadata, tuple): - # chunk, metadata = metadata - - # tags = metadata.get("tags") - # if not (tags in self.streamable_tags): - # continue - - # if isinstance(chunk, (BaseMessageChunk, BaseMessage)) and getattr(chunk, "content", None): - # yield chunk.content - def text_iterator(): for _, mode, out in self.workflow.stream({"inp": nargs}, stream_mode=["messages", "values"], @@ -127,15 +112,17 @@ class GraphBase(ABC): """Async streaming using LangGraph's astream method.""" async def text_iterator(): - async for chunk, metadata in self.workflow.astream( - {"inp": nargs}, - stream_mode="messages", - subgraphs=True, - **kwargs - ): - if isinstance(metadata, tuple): - chunk, metadata = metadata + async for _, mode, out in self.workflow.astream({"inp": nargs}, + stream_mode=["messages", "values"], + subgraphs=True, + **kwargs): + if mode == "values": + val = out.get("messages") + if val is not None: + yield val + continue + chunk, metadata = out tags = metadata.get("tags") if not (tags in self.streamable_tags): continue @@ -144,8 +131,10 @@ class GraphBase(ABC): yield chunk.content text_releaser = AsyncTextReleaser(*self.textreleaser_delay_keys) + logger.info("streaming output") async for chunk in text_releaser.release(text_iterator()): - print(f"\033[92m{chunk}\033[0m", end="", flush=True) + if isinstance(chunk, str): + print(f"\033[92m{chunk}\033[0m", end="", flush=True) yield chunk def _validate_input(self, *nargs, **kwargs):