feat: 更新Docker配置和API端口,优化流式响应处理

refactor: 重构工具管理和路由逻辑,提升代码可维护性

fix: 修正配置文件中的传输协议和工具调用名称

docs: 更新README和归档文件,添加生产环境配置说明

perf: 优化流式响应生成,减少内存消耗
This commit is contained in:
2025-11-07 15:56:45 +08:00
17 changed files with 256 additions and 179 deletions

View File

@@ -17,6 +17,7 @@ from lang_agent.base import GraphBase
from langchain.chat_models import init_chat_model
from langchain_core.messages import SystemMessage, HumanMessage, BaseMessage
from langchain_core.messages.base import BaseMessageChunk
from langchain.agents import create_agent
from langgraph.graph import StateGraph, START, END
@@ -28,7 +29,7 @@ from langgraph.checkpoint.memory import MemorySaver
class RoutingConfig(KeyConfig):
_target: Type = field(default_factory=lambda: RoutingGraph)
llm_name: str = "qwen-flash"
llm_name: str = "qwen-plus"
"""name of llm"""
llm_provider:str = "openai"
@@ -80,29 +81,41 @@ class RoutingGraph(GraphBase):
self.workflow = self._build_graph()
def invoke(self, *nargs, as_stream:bool=False, as_raw:bool=False, **kwargs)->str:
def invoke(self, *nargs, as_stream:bool=False, as_raw:bool=False, **kwargs):
self._validate_input(*nargs, **kwargs)
if as_stream:
# TODO this doesn't stream the entire process, we are blind
for step in self.workflow.stream({"inp": nargs}, stream_mode="updates", **kwargs):
last_el = jax.tree.leaves(step)[-1]
if isinstance(last_el, str):
logger.info(last_el)
elif isinstance(last_el, BaseMessage):
last_el.pretty_print()
state = step
# Stream messages from the workflow
for chunk, metadata in self.workflow.stream({"inp": nargs}, stream_mode="messages", **kwargs):
node = metadata.get("langgraph_node")
if node != "model":
continue # skip router or other intermediate nodes
# Yield only the final message content chunks
if isinstance(chunk, (BaseMessageChunk, BaseMessage)) and getattr(chunk, "content", None):
yield chunk.content
else:
state = self.workflow.invoke({"inp": nargs})
msg_list = jax.tree.leaves(state)
if as_raw:
return msg_list
msg_list = jax.tree.leaves(state)
return msg_list[-1].content
for e in msg_list:
if isinstance(e, BaseMessage):
e.pretty_print()
if as_raw:
return msg_list
return msg_list[-1].content
def _validate_input(self, *nargs, **kwargs):
print("\033[93m====================INPUT MESSAGES=============================\033[0m")
for e in nargs[0]["messages"]:
if isinstance(e, BaseMessage):
e.pretty_print()
print("\033[93m====================END INPUT MESSAGES=============================\033[0m")
print(f"\033[93 model used: {self.config.llm_name}\033[0m")
assert len(nargs[0]["messages"]) >= 2, "need at least 1 system and 1 human message"
assert len(kwargs) == 0, "due to inp assumptions"
@@ -244,5 +257,25 @@ class RoutingGraph(GraphBase):
plt.show()
if __name__ == "__main__":
route = RoutingConfig().setup()
route.show_graph()
from dotenv import load_dotenv
from langchain.messages import SystemMessage, HumanMessage
from langchain_core.messages.base import BaseMessageChunk
load_dotenv()
route:RoutingGraph = RoutingConfig().setup()
graph = route.workflow
nargs = {
"messages": [SystemMessage("you are a helpful bot named jarvis"),
HumanMessage("use the calculator tool to calculate 92*55 and say the answer")]
},{"configurable": {"thread_id": "3"}}
for chunk, metadata in graph.stream({"inp": nargs}, stream_mode="messages"):
node = metadata.get("langgraph_node")
if node not in ("model"):
continue # skip router or other intermediate nodes
# Print only the final message content
if isinstance(chunk, (BaseMessageChunk, BaseMessage)) and getattr(chunk, "content", None):
print(chunk.content, end="", flush=True)

View File

@@ -18,6 +18,45 @@ from lang_agent.graphs import AnnotatedGraph, ReactGraphConfig, RoutingConfig
from lang_agent.base import GraphBase
DEFAULT_PROMPT="""
[角色设定]
你是一个和人对话的 AI叫做小盏是半盏青年茶馆的智能助手
[形象背景]
小盏是一只中式茶盖碗,名字来源半盏新青年茶馆,一盏茶。它有个标志性的蓝色鼻子, 小盏很像一只可爱的小熊。茶盖碗里绵绵能随心情和季节变换好喝的茶饮来, 茶饮充满魔法,能治愈人心,小盏的茶盖打开的时候可能不小心会把思绪也飞出来。
[品牌背景]
半盏新青年茶馆成立时间与理念2023 年创立于云南,结合茶饮与创意生活方式,致力于解构传统茶文化,重构 “无边界的饮茶生活”,以新青年视角探索云南风物。探索云南风物的过程,我们将以新青年的视角,解构传统茶饮的魅力,重构充满创意与温度的新式茶文化。通过嗅觉、味觉、听觉乃至视觉的世界里,讲述云南的故事。
[茶馆背景]
半盏新青年茶馆,是一家现代的创意茶体验品牌,提供纯茶、调饮、茶食、茶酒。“新青年茶馆”也是我们的定位,年轻化的茶馆,通过创意的产品让大家像喝咖啡一样喝茶。目前半盏有 2 个店,昆明、玉溪。全国培训新茶饮市场,线上基础课程 1980线下带店服务线下产品定制服务。
[特殊故事]
-《云南茶事》特调茶饮,是从云南山野和云南茶到轻松小酌的创意新味。讲述的一个嗅觉、味觉、听觉乃至视觉的世界里,在云南的故事,留下对云南的记忆。--该故事对应云南茶事系列菜品要使用get_resorce工具查找相关商品
-城市味觉漫游计划介绍:
「城市味觉漫游计划」如同一颗风味的种子,于城市破土而出
旨在探寻城市的文化肌理与生活美学。我们相信,风味是一颗蕴藏无限可能的种子,能在街巷中生根,与社群共同成长。这是一场关于味觉的集体创作,邀你共酿城市的风味与故事。--该故事对应城市味觉漫游系列菜品要使用get_resorce工具查找相关商品
[公司背景]
创造你的公司叫叠加态 AITANGLED UP AI是一家专注于 AI 技术应用的公司,由一帮名校和海归创始人创立,致力于将 AI 技术落地到实际场景中。2023年3月成立专注于AI前沿应用拓展是云南地区在该领域的新兴力量且作为省共青团委旗下的青年创业者协会青创园第1批被引进的AI企业获得了相关政策支持与关注。口号是“进入人类AI时代”。价值观是自驱 务实 志存高远。叠加态的愿景:成为一帮抬头看星星的人。叠加态的使命: 带领人类进入AI时代
[核心特征]
- 回复简洁明了,不会给用户提供选择并且对话过程中不会提示用户下一步的操作
- 每次对话时先判断是否需要调用工具,需要的话要准确调用工具
[工作流程]
-用户提到“你们有哪些茶品或者饮品”时调用get_resouce工具查询所有菜品
-用户提到“下单”、“购买”等带有目的性的词汇之前都一定要使用get_resouce工具查询是否真实存在这个菜品
- 用户要添加菜/饮品→具体菜品名称必须先用MCP工具查询所有菜/饮品,确认后再添加到购物车。没有的话提醒用户错误
- 支付流程返回收到订单创建成功的字段时调用MPC tool self.screen.display_payment_qr_code工具生成二维码。调用了self.screen.display_payment_qr_code工具之后立马调用MCP tool http-server__query_wechat_order查询订单状态如果返回收到“支付成功”或“支付失败”的字段时调用MPC tool self.screen.payment_success工具退出二维码如果没有收到“支付成功”或“支付失败”的字段时继续调用MCP tool http-server__query_wechat_order工具直到获取字段。
[交互指南]
与用户交流:
- 形象背景和公司背景只有在问到你时才做回答
- 你是智能助手,在与用户交流时扮演服务员角色
- 不会出现、回答、显示任何有关智能的词语
- 牢记[核心特征中的内容],按照[工作流程]工作
绝不:
- 出现任何工具调用的说明、任何有关技术的说明
- 出现购物车、订单、微信支付相关说明
- 对话风格可爱,并且详细认真的介绍产品
"""
@tyro.conf.configure(tyro.conf.SuppressFixed)
@dataclass
@@ -74,9 +113,16 @@ class Pipeline:
else:
logger.info(f"show graph not supported for {type(self.graph)}")
def invoke(self, *nargs, **kwargs)->str:
def invoke(self, *nargs, **kwargs):
out = self.graph.invoke(*nargs, **kwargs)
# If streaming, yield chunks from the generator
if kwargs.get("as_stream"):
for chunk in out:
yield chunk
return
# Non-streaming path
if kwargs.get("as_raw"):
return out
@@ -120,43 +166,9 @@ class Pipeline:
return f"ws://{self.config.host}:{self.config.port}"
def chat(self, inp:str, as_stream:bool=False, as_raw:bool=False, thread_id:int = None)->str:
u = """
[角色设定]
你是一个和人对话的 AI叫做小盏是半盏青年茶馆的智能助手
[形象背景]
小盏是一只中式茶盖碗,名字来源半盏新青年茶馆,一盏茶。它有个标志性的蓝色鼻子, 小盏很像一只可爱的小熊。茶盖碗里绵绵能随心情和季节变换好喝的茶饮来, 茶饮充满魔法,能治愈人心,小盏的茶盖打开的时候可能不小心会把思绪也飞出来。
[品牌背景]
半盏新青年茶馆成立时间与理念2023 年创立于云南,结合茶饮与创意生活方式,致力于解构传统茶文化,重构 “无边界的饮茶生活”,以新青年视角探索云南风物。探索云南风物的过程,我们将以新青年的视角,解构传统茶饮的魅力,重构充满创意与温度的新式茶文化。通过嗅觉、味觉、听觉乃至视觉的世界里,讲述云南的故事。
[茶馆背景]
半盏新青年茶馆,是一家现代的创意茶体验品牌,提供纯茶、调饮、茶食、茶酒。“新青年茶馆”也是我们的定位,年轻化的茶馆,通过创意的产品让大家像喝咖啡一样喝茶。目前半盏有 2 个店,昆明、玉溪。全国培训新茶饮市场,线上基础课程 1980线下带店服务线下产品定制服务。
[特殊故事]
-《云南茶事》特调茶饮,是从云南山野和云南茶到轻松小酌的创意新味。讲述的一个嗅觉、味觉、听觉乃至视觉的世界里,在云南的故事,留下对云南的记忆。--该故事对应云南茶事系列菜品要使用get_resorce工具查找相关商品
-城市味觉漫游计划介绍:
「城市味觉漫游计划」如同一颗风味的种子,于城市破土而出
旨在探寻城市的文化肌理与生活美学。我们相信,风味是一颗蕴藏无限可能的种子,能在街巷中生根,与社群共同成长。这是一场关于味觉的集体创作,邀你共酿城市的风味与故事。--该故事对应城市味觉漫游系列菜品要使用get_resorce工具查找相关商品
[公司背景]
创造你的公司叫叠加态 AITANGLED UP AI是一家专注于 AI 技术应用的公司,由一帮名校和海归创始人创立,致力于将 AI 技术落地到实际场景中。2023年3月成立专注于AI前沿应用拓展是云南地区在该领域的新兴力量且作为省共青团委旗下的青年创业者协会青创园第1批被引进的AI企业获得了相关政策支持与关注。口号是“进入人类AI时代”。价值观是自驱 务实 志存高远。叠加态的愿景:成为一帮抬头看星星的人。叠加态的使命: 带领人类进入AI时代
[核心特征]
- 回复简洁明了,不会给用户提供选择并且对话过程中不会提示用户下一步的操作
- 每次对话时先判断是否需要调用工具,需要的话要准确调用工具
[工作流程]
-用户提到“你们有哪些茶品或者饮品”时调用get_resouce工具查询所有菜品
-用户提到“下单”、“购买”等带有目的性的词汇之前都一定要使用get_resouce工具查询是否真实存在这个菜品
- 用户要添加菜/饮品→具体菜品名称必须先用MCP工具查询所有菜/饮品,确认后再添加到购物车。没有的话提醒用户错误
- 支付流程返回收到订单创建成功的字段时调用MPC tool self.screen.display_payment_qr_code工具生成二维码。调用了self.screen.display_payment_qr_code工具之后立马调用MCP tool http-server__query_wechat_order查询订单状态如果返回收到“支付成功”或“支付失败”的字段时调用MPC tool self.screen.payment_success工具退出二维码如果没有收到“支付成功”或“支付失败”的字段时继续调用MCP tool http-server__query_wechat_order工具直到获取字段。
[交互指南]
与用户交流:
- 形象背景和公司背景只有在问到你时才做回答
- 你是智能助手,在与用户交流时扮演服务员角色
- 不会出现、回答、显示任何有关智能的词语
- 牢记[核心特征中的内容],按照[工作流程]工作
绝不:
- 出现任何工具调用的说明、任何有关技术的说明
- 出现购物车、订单、微信支付相关说明
- 对话风格可爱,并且详细认真的介绍产品
"""
def chat(self, inp:str, as_stream:bool=False, as_raw:bool=False, thread_id:int = None):
# NOTE: this prompt will be overwritten by 'configs/route_sys_prompts/chat_prompt.txt' for route graph
u = DEFAULT_PROMPT
thread_id = thread_id if thread_id is not None else 3
inp = {"messages":[SystemMessage(u),
@@ -164,5 +176,9 @@ class Pipeline:
out = self.invoke(*inp, as_stream=as_stream, as_raw=as_raw)
# return out['messages'][-1].content
return out
if as_stream:
# Yield chunks from the generator
for chunk in out:
yield chunk
else:
return out

View File

View File

@@ -9,8 +9,10 @@ from loguru import logger
from fastmcp.tools.tool import Tool
from lang_agent.config import InstantiateConfig, ToolConfig
from lang_agent.base import LangToolBase
from lang_agent.client_tool_manager import ClientToolManagerConfig
from lang_agent.rag.simple import SimpleRagConfig
from lang_agent.dummy.calculator import CalculatorConfig
# from lang_agent.dummy.calculator import CalculatorConfig
# from catering_end.lang_tool import CartToolConfig, CartTool
from langchain_core.tools.structured import StructuredTool
from lang_agent.client_tool_manager import ClientToolManager
@@ -19,12 +21,14 @@ from lang_agent.client_tool_manager import ClientToolManager
class ToolManagerConfig(InstantiateConfig):
_target: Type = field(default_factory=lambda: ToolManager)
client_tool_manager: ClientToolManagerConfig = field(default_factory=ClientToolManagerConfig)
# tool configs here; MUST HAVE 'config' in name and must be dataclass
rag_config: SimpleRagConfig = field(default_factory=SimpleRagConfig)
# cart_config: CartToolConfig = field(default_factory=CartToolConfig)
calc_config: CalculatorConfig = field(default_factory=CalculatorConfig)
# calc_config: CalculatorConfig = field(default_factory=CalculatorConfig)
def async_to_sync(async_func: Callable) -> Callable:
@@ -97,9 +101,10 @@ class ToolManager:
logger.info(f"skipping tool:{tool_name}")
try:
from lang_agent.client_tool_manager import ClientToolManagerConfig
client_config = ClientToolManagerConfig()
self.client_tool_manager = ClientToolManager(client_config)
# client_config = self.config.client_tool_manager
# self.client_tool_manager = ClientToolManager(client_config)
# self.client_tool_manager = ClientToolManager(self.config.client_tool_manager)
self.client_tool_manager:ClientToolManager = self.config.client_tool_manager.setup()
logger.info("Successfully initialized client_tool_manager for MCP tools")
except Exception as e:
logger.warning(f"Failed to initialize client_tool_manager: {e}")
@@ -134,25 +139,26 @@ class ToolManager:
self.langchain_tools = []
for func in self.get_tool_fncs():
if isinstance(func, StructuredTool):
self.langchain_tools.append(func)
if hasattr(func, 'coroutine') and func.coroutine is not None and (not hasattr(func, 'func') or func.func is None):
sync_func = async_to_sync(func.coroutine)
new_tool = StructuredTool(
name=func.name,
description=func.description,
args_schema=func.args_schema,
func=sync_func,
coroutine=func.coroutine,
metadata=func.metadata if hasattr(func, 'metadata') else None,
return_direct=func.return_direct if hasattr(func, 'return_direct') else False,
)
self.langchain_tools.append(new_tool)
else:
self.langchain_tools.append(func)
else:
self.langchain_tools.append(self.fnc_to_structool(func))
return self.langchain_tools
def get_list_langchain_tools(self)->List[StructuredTool]:
all_langchain_tools = []
all_langchain_tools.extend(self.langchain_tools)
# 如果有 client_tool_manager添加 MCP 工具(已经是 LangChain 格式)
if self.client_tool_manager:
try:
# 获取 MCP 工具(已经是 StructuredTool 格式)
mcp_tools = self.client_tool_manager.get_tools()
all_langchain_tools.extend(mcp_tools)
except Exception as e:
logger.warning(f"Failed to get MCP tools: {e}")
return all_langchain_tools
return self.langchain_tools
if __name__ == "__main__":