Compare commits

...

40 Commits

Author SHA1 Message Date
9b3db40b94 enable simple chat 2026-03-06 15:19:51 +08:00
da17f2b319 show inference info 2026-03-06 14:48:00 +08:00
4b6e97d8fb get_pipeline_conversation_messages 2026-03-06 14:47:41 +08:00
112af37151 chat dashscope 2026-03-06 13:51:46 +08:00
3cd46030ad support markdown visualization 2026-03-06 13:43:09 +08:00
dd842fca42 update tests 2026-03-06 13:19:26 +08:00
fc9f0f929d css update 2026-03-06 13:18:31 +08:00
28d99f4b8d save pipeline_id 2026-03-06 13:17:04 +08:00
dac067b6fe list_pipeline_conversations front_api 2026-03-06 13:16:53 +08:00
e90f0afabe ts 2026-03-06 13:16:24 +08:00
0676a68c9e crash the fking thing 2026-03-06 13:16:08 +08:00
f185b70d3f chat convo tab + save yaml 2026-03-06 13:15:51 +08:00
07149e426e moved constant 2026-03-06 11:36:07 +08:00
3fc3d7288c react front end show available tools from mcp 2026-03-05 19:32:46 +08:00
eb7e85e4e6 front_api list_mcp_available_tools 2026-03-05 19:31:41 +08:00
ddfda10700 aget_tool_with_error in client_tool_manager 2026-03-05 19:31:21 +08:00
f8364bea68 remove full path 2026-03-05 17:46:47 +08:00
01b0975abd update how mcp is configured 2026-03-05 17:44:25 +08:00
7e23d5c056 yaml to sql migration script 2026-03-05 17:17:10 +08:00
3b730798f8 yml to yaml 2026-03-05 17:15:20 +08:00
2781172724 use yaml instead of yml 2026-03-05 15:51:59 +08:00
26fba706f2 graph_id button bug fix 2026-03-05 15:31:58 +08:00
ae93ef37b6 update UI 2026-03-05 15:19:10 +08:00
c1b782c6b4 update registry 2026-03-05 15:05:10 +08:00
ab3285a4cf remove gitignore 2026-03-05 15:04:43 +08:00
0484343021 fixed combined.py disc 2026-03-05 14:55:08 +08:00
b87fded473 combined.py 2026-03-05 14:49:47 +08:00
8db22abf3b moved files 2026-03-05 14:48:36 +08:00
f6d86f24bb tests 2026-03-05 14:43:17 +08:00
c1afebd7ba make it a importable package 2026-03-05 14:43:05 +08:00
080631af31 update tests 2026-03-05 14:42:55 +08:00
38b0d5df15 change default port 2026-03-05 14:42:14 +08:00
f7937c3744 print which back end port it is connecting to 2026-03-05 11:51:31 +08:00
867acaf717 combine both front_apis and server_dashscope into one backend 2026-03-05 11:47:30 +08:00
a2890148f9 make this importable without tyro fking around 2026-03-05 11:43:16 +08:00
55b37cc611 change defualt port 2026-03-05 11:25:08 +08:00
c85598418d reload msg 2026-03-05 11:24:54 +08:00
ea605e19aa check for registry update 2026-03-05 11:24:29 +08:00
866edc319f start port at 8500 as default 2026-03-05 11:23:58 +08:00
8c6dd3344f dynamic updates of pipelines from updating pipeline_registry 2026-03-05 11:23:37 +08:00
30 changed files with 4674 additions and 265 deletions

3
.gitignore vendored
View File

@@ -11,4 +11,5 @@ django.log
.env
frontend/node_modules/
frontend/dist/
frontend/dist/
frontend/.vite

View File

@@ -1,24 +1,31 @@
{
"pipelines": {
"xiaozhan": {
"enabled": true,
"config_file": "configs/pipelines/xiaozhan.yaml"
"pipelines": {
"xiaozhan": {
"enabled": true,
"config_file": "configs/pipelines/xiaozhan.yaml",
"graph_id": "routing",
"overrides": {
"llm_name": "qwen-plus"
}
},
"blueberry": {
"enabled": true,
"config_file": "configs/pipelines/blueberry.yaml",
"graph_id": "react",
"overrides": {
"llm_name": "qwen-plus"
}
}
},
"blueberry": {
"enabled": true,
"config_file": "configs/pipelines/blueberry.yaml"
"api_keys": {
"sk-6c7091e6a95f404efb2ec30e8f51b897626d670375cdf822d78262f24ab12367": {
"example-key-1": {
"default_route_id": "default",
"allowed_route_ids": [
"xiaozhan",
"blueberry"
]
}
}
}
},
"api_keys": {
"sk-6c7091e6a95f404efb2ec30e8f51b897626d670375cdf822d78262f24ab12367": {
"example-key-1": {
"default_route_id": "default",
"allowed_route_ids": [
"xiaozhan",
"blueberry"
]
}
}
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -11,7 +11,9 @@
},
"dependencies": {
"react": "^18.3.1",
"react-dom": "^18.3.1"
"react-dom": "^18.3.1",
"react-markdown": "^10.1.0",
"remark-gfm": "^4.0.1"
},
"devDependencies": {
"@types/react": "^18.3.20",

File diff suppressed because it is too large Load Diff

View File

@@ -1,20 +1,33 @@
import type {
AvailableGraphsResponse,
ConversationListItem,
ConversationMessageItem,
GraphConfigListResponse,
GraphConfigReadResponse,
GraphConfigUpsertRequest,
GraphConfigUpsertResponse,
McpAvailableToolsResponse,
McpToolConfigResponse,
McpToolConfigUpdateRequest,
McpToolConfigUpdateResponse,
PipelineCreateRequest,
PipelineCreateResponse,
PipelineConversationListResponse,
PipelineConversationMessagesResponse,
PipelineListResponse,
PipelineStopResponse,
RuntimeAuthInfoResponse,
} from "../types";
const API_BASE_URL =
import.meta.env.VITE_FRONT_API_BASE_URL?.trim() || "http://127.0.0.1:8001";
import.meta.env.VITE_FRONT_API_BASE_URL?.trim() || "http://127.0.0.1:8500";
// Log which backend the frontend is targeting on startup, with file + line hint.
// This runs once when the module is loaded.
// eslint-disable-next-line no-console
console.info(
`[frontend] Using FRONT_API_BASE_URL=${API_BASE_URL} (src/api/frontApis.ts:16)`
);
async function fetchJson<T>(path: string, init?: RequestInit): Promise<T> {
const response = await fetch(`${API_BASE_URL}${path}`, {
@@ -107,6 +120,10 @@ export function updateMcpToolConfig(
});
}
export function listMcpAvailableTools(): Promise<McpAvailableToolsResponse> {
return fetchJson("/v1/tool-configs/mcp/tools");
}
export function createPipeline(
payload: PipelineCreateRequest
): Promise<PipelineCreateResponse> {
@@ -126,3 +143,129 @@ export function stopPipeline(pipelineId: string): Promise<PipelineStopResponse>
});
}
export function getRuntimeAuthInfo(): Promise<RuntimeAuthInfoResponse> {
return fetchJson("/v1/runtime-auth");
}
export async function listPipelineConversations(
pipelineId: string,
limit = 100
): Promise<ConversationListItem[]> {
const response = await fetchJson<PipelineConversationListResponse>(
`/v1/pipelines/${encodeURIComponent(pipelineId)}/conversations?limit=${limit}`
);
return response.items || [];
}
export async function getPipelineConversationMessages(
pipelineId: string,
conversationId: string
): Promise<ConversationMessageItem[]> {
const response = await fetchJson<PipelineConversationMessagesResponse>(
`/v1/pipelines/${encodeURIComponent(pipelineId)}/conversations/${encodeURIComponent(conversationId)}/messages`
);
return response.items || [];
}
type StreamAgentChatOptions = {
appId: string;
sessionId: string;
apiKey: string;
message: string;
onText: (text: string) => void;
};
function parseErrorDetail(payload: unknown): string | null {
if (!payload || typeof payload !== "object") {
return null;
}
const detail = (payload as { detail?: unknown }).detail;
return typeof detail === "string" && detail.trim() ? detail : null;
}
export async function streamAgentChatResponse(
options: StreamAgentChatOptions
): Promise<string> {
const { appId, sessionId, apiKey, message, onText } = options;
const response = await fetch(
`${API_BASE_URL}/v1/apps/${encodeURIComponent(appId)}/sessions/${encodeURIComponent(sessionId)}/responses`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${apiKey}`,
},
body: JSON.stringify({
messages: [{ role: "user", content: message }],
stream: true,
}),
}
);
if (!response.ok) {
let messageText = `Request failed (${response.status})`;
try {
const payload = (await response.json()) as unknown;
const detail = parseErrorDetail(payload);
if (detail) {
messageText = detail;
}
} catch {
// Keep fallback status-based message.
}
throw new Error(messageText);
}
if (!response.body) {
throw new Error("Streaming response is not available.");
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffered = "";
let latestText = "";
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
buffered += decoder.decode(value, { stream: true });
let splitIndex = buffered.indexOf("\n\n");
while (splitIndex >= 0) {
const eventBlock = buffered.slice(0, splitIndex);
buffered = buffered.slice(splitIndex + 2);
splitIndex = buffered.indexOf("\n\n");
const lines = eventBlock.split("\n");
for (const rawLine of lines) {
const line = rawLine.trim();
if (!line.startsWith("data:")) {
continue;
}
const payloadRaw = line.slice(5).trim();
if (!payloadRaw) {
continue;
}
let payload: unknown;
try {
payload = JSON.parse(payloadRaw);
} catch {
continue;
}
const nextText =
typeof (payload as { output?: { text?: unknown } })?.output?.text === "string"
? ((payload as { output: { text: string } }).output.text as string)
: "";
if (nextText !== latestText) {
latestText = nextText;
onText(latestText);
}
}
}
}
return latestText;
}

View File

@@ -65,6 +65,12 @@ button:disabled {
gap: 8px;
}
.agent-item-row {
display: grid;
gap: 6px;
grid-template-columns: 1fr auto;
}
.agent-item {
align-items: flex-start;
display: flex;
@@ -73,6 +79,33 @@ button:disabled {
width: 100%;
}
.agent-item-title {
align-items: center;
display: flex;
gap: 8px;
justify-content: space-between;
width: 100%;
}
.agent-status-pill {
border-radius: 999px;
font-size: 11px;
font-weight: 600;
padding: 2px 8px;
}
.agent-status-pill.running {
background: #dff7e7;
border: 1px solid #8cd3a1;
color: #1a6b35;
}
.agent-status-pill.stopped {
background: #f2f4f7;
border: 1px solid #d1d8e0;
color: #4a5565;
}
.agent-item.selected {
border-color: #4d7ef3;
background: #edf3ff;
@@ -82,6 +115,11 @@ button:disabled {
color: #5f6f82;
}
.agent-chat-button {
align-self: stretch;
min-width: 64px;
}
.content {
padding: 20px;
}
@@ -198,6 +236,32 @@ button:disabled {
margin-top: 0;
}
.run-info-header {
align-items: center;
display: flex;
justify-content: space-between;
gap: 8px;
}
.runtime-badge {
border-radius: 999px;
font-size: 12px;
font-weight: 600;
padding: 4px 10px;
}
.runtime-badge.running {
background: #dff7e7;
border: 1px solid #8cd3a1;
color: #1a6b35;
}
.runtime-badge.stopped {
background: #f2f4f7;
border: 1px solid #d1d8e0;
color: #4a5565;
}
.graph-arch-section {
border: 1px solid #dbe2ea;
border-radius: 10px;
@@ -229,6 +293,166 @@ button:disabled {
padding: 10px;
}
.run-card-columns {
display: grid;
gap: 12px;
grid-template-columns: minmax(280px, 1fr) minmax(420px, 1.3fr);
}
.run-card-left,
.run-card-right {
display: flex;
flex-direction: column;
gap: 6px;
}
.run-card-right {
border-left: 1px solid #dbe2ea;
min-width: 0;
padding-left: 12px;
}
.run-card-right code {
display: inline-block;
max-width: 100%;
overflow-x: auto;
vertical-align: middle;
white-space: nowrap;
}
.discussion-section {
background: #f7fbff;
border: 1px solid #d7e6f6;
border-radius: 10px;
padding: 12px;
}
.discussion-header {
align-items: center;
display: flex;
justify-content: space-between;
gap: 12px;
}
.discussion-header h3 {
margin: 0;
}
.discussion-layout {
display: grid;
gap: 12px;
grid-template-columns: minmax(260px, 320px) 1fr;
margin-top: 10px;
}
.discussion-list {
display: flex;
flex-direction: column;
gap: 8px;
max-height: 65vh;
overflow-y: auto;
}
.discussion-item {
align-items: flex-start;
display: flex;
flex-direction: column;
gap: 4px;
text-align: left;
width: 100%;
}
.discussion-item.selected {
background: #edf3ff;
border-color: #4d7ef3;
}
.discussion-item small {
color: #687788;
}
.discussion-thread {
border: 1px solid #d7e6f6;
border-radius: 10px;
display: flex;
flex-direction: column;
gap: 8px;
max-height: 65vh;
overflow-y: auto;
padding: 10px;
}
.discussion-message {
background: #fff;
border: 1px solid #dbe2ea;
border-radius: 8px;
padding: 8px;
}
.discussion-message.human {
border-left: 3px solid #4d7ef3;
}
.discussion-message.ai {
border-left: 3px solid #26a269;
}
.discussion-message.tool {
border-left: 3px solid #8e6bd8;
}
.discussion-message-meta {
align-items: baseline;
display: flex;
gap: 8px;
}
.discussion-message pre {
font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, monospace;
margin: 8px 0 0;
overflow-x: auto;
white-space: pre;
}
.discussion-message-markdown > :first-child {
margin-top: 0;
}
.discussion-message-markdown > :last-child {
margin-bottom: 0;
}
.discussion-message-markdown code {
background: #f3f5f8;
border-radius: 4px;
padding: 1px 4px;
}
.discussion-message-markdown pre code {
background: transparent;
padding: 0;
}
.discussion-message-markdown a {
color: #1a4fc5;
text-decoration: underline;
}
.discussion-message-markdown p,
.discussion-message-markdown ul,
.discussion-message-markdown ol,
.discussion-message-markdown blockquote,
.discussion-message-markdown table {
margin: 8px 0;
}
.discussion-message-markdown blockquote {
border-left: 3px solid #d0d8e2;
color: #4f5f73;
margin-left: 0;
padding-left: 10px;
}
.mcp-config-section {
background: #f7fbff;
border: 1px solid #d7e6f6;
@@ -258,8 +482,149 @@ button:disabled {
width: 100%;
}
.mcp-entry-list {
display: grid;
gap: 12px;
margin-top: 10px;
}
.mcp-tools-error {
color: #a33434;
margin: 6px 0 0 0;
}
.mcp-tools-inline {
background: #f8fbff;
border: 1px solid #d7e6f6;
border-radius: 8px;
margin: 0 0 10px 0;
padding: 8px;
}
.mcp-entry-card {
background: #fff;
border: 1px solid #d7e6f6;
border-radius: 10px;
padding: 10px;
}
.mcp-entry-header {
align-items: center;
display: flex;
justify-content: space-between;
gap: 10px;
margin-bottom: 10px;
}
.mcp-entry-grid {
display: grid;
gap: 10px;
grid-template-columns: repeat(2, minmax(200px, 1fr));
}
.mcp-entry-grid label {
display: flex;
flex-direction: column;
font-size: 14px;
gap: 6px;
}
.mcp-entry-grid input,
.mcp-entry-grid select {
border: 1px solid #c9d4e2;
border-radius: 8px;
font-size: 14px;
padding: 8px;
}
.mcp-entry-wide {
grid-column: 1 / -1;
}
.empty {
color: #687788;
margin: 6px 0;
}
.chat-modal-overlay {
align-items: center;
background: rgba(16, 24, 40, 0.45);
display: flex;
inset: 0;
justify-content: center;
position: fixed;
z-index: 20;
}
.chat-modal {
background: #fff;
border: 1px solid #d7e6f6;
border-radius: 12px;
display: grid;
gap: 10px;
max-height: 86vh;
max-width: 820px;
padding: 12px;
width: min(92vw, 820px);
}
.chat-modal-header {
align-items: center;
border-bottom: 1px solid #dbe2ea;
display: flex;
justify-content: space-between;
padding-bottom: 8px;
}
.chat-modal-header small {
color: #687788;
display: block;
margin-top: 2px;
}
.chat-modal-messages {
background: #f8fbff;
border: 1px solid #d7e6f6;
border-radius: 10px;
display: flex;
flex-direction: column;
gap: 8px;
max-height: 56vh;
overflow-y: auto;
padding: 10px;
}
.chat-modal-message {
background: #fff;
border: 1px solid #dbe2ea;
border-radius: 8px;
padding: 8px;
}
.chat-modal-message.user {
border-left: 3px solid #4d7ef3;
}
.chat-modal-message.assistant {
border-left: 3px solid #26a269;
}
.chat-modal-message p {
margin: 6px 0 0 0;
white-space: pre-wrap;
}
.chat-modal-input {
display: grid;
gap: 8px;
grid-template-columns: 1fr auto;
}
.chat-modal-input textarea {
border: 1px solid #c9d4e2;
border-radius: 8px;
font-size: 14px;
padding: 8px;
resize: vertical;
}

View File

@@ -89,6 +89,38 @@ export type PipelineStopResponse = {
reload_required: boolean;
};
export type ConversationListItem = {
conversation_id: string;
pipeline_id: string;
message_count: number;
last_updated?: string | null;
};
export type PipelineConversationListResponse = {
pipeline_id: string;
items: ConversationListItem[];
count: number;
};
export type ConversationMessageItem = {
message_type: string;
content: string;
sequence_number: number;
created_at: string;
};
export type PipelineConversationMessagesResponse = {
pipeline_id: string;
conversation_id: string;
items: ConversationMessageItem[];
count: number;
};
export type RuntimeAuthInfoResponse = {
fast_api_key: string;
source: string;
};
export type McpToolConfigResponse = {
path: string;
raw_content: string;
@@ -105,3 +137,15 @@ export type McpToolConfigUpdateResponse = {
tool_keys: string[];
};
export type McpAvailableToolsResponse = {
available_tools: string[];
errors: string[];
servers: Record<
string,
{
tools: string[];
error?: string | null;
}
>;
};

View File

@@ -251,32 +251,30 @@ class ClientToolManager:
def populate_module(self):
with open(self.config.mcp_config_f, "r") as f:
self.mcp_configs:dict = commentjson.load(f)
def _get_to_load_configs(self) -> dict:
if self.config.tool_keys is None:
return self.mcp_configs
if len(self.config.tool_keys) == 0:
logger.info("no tools will be loaded")
return {}
to_load_config = {}
for key in self.config.tool_keys:
val = self.mcp_configs.get(key)
if val is None:
logger.warning(f"{key} is not in mcp tools")
else:
to_load_config[key] = val
return to_load_config
async def aget_tools(self):
"""
Get tools from all configured MCP servers.
Handles connection failures gracefully by logging warnings and continuing.
"""
def get_to_load_configs() -> dict:
if self.config.tool_keys is None:
to_load_config = self.mcp_configs
else:
if len(self.config.tool_keys) == 0:
logger.info("no tools will be loaded")
return {}
to_load_config = {}
for key in self.config.tool_keys:
val = self.mcp_configs.get(key)
if val is None:
logger.warning(f"{key} is not in mcp tools")
else:
to_load_config[key] = val
return to_load_config
to_load_config = get_to_load_configs()
to_load_config = self._get_to_load_configs()
all_tools = []
for server_name, server_config in to_load_config.items():
try:
@@ -298,6 +296,78 @@ class ClientToolManager:
return all_tools
async def aget_tools_with_errors(self):
"""
Get tools and collect human-readable per-server errors.
Returns:
(all_tools, errors)
"""
to_load_config = self._get_to_load_configs()
all_tools = []
errors = []
for server_name, server_config in to_load_config.items():
try:
single_server_config = {server_name: server_config}
client = MultiServerMCPClient(single_server_config)
tools = await client.get_tools()
all_tools.extend(tools)
logger.info(
f"Successfully connected to MCP server '{server_name}', retrieved {len(tools)} tools"
)
except Exception as e:
url = (
server_config.get("url", "unknown URL")
if isinstance(server_config, dict)
else "unknown URL"
)
err_msg = (
f"{server_name} ({url}): {type(e).__name__}: {e}"
)
errors.append(err_msg)
logger.exception(
f"Failed to connect to MCP server '{server_name}' at {url}"
)
if hasattr(e, "exceptions"):
for nested_exc in e.exceptions:
errors.append(
f"{server_name} nested: {type(nested_exc).__name__}: {nested_exc}"
)
continue
return all_tools, errors
async def aget_tools_by_server(self) -> dict:
"""
Get MCP tools grouped by server name, including per-server error (if any).
Returns:
{
"server_name": {
"tools": ["tool_a", "tool_b"],
"error": "ExceptionType: message" | None,
},
...
}
"""
to_load_config = self._get_to_load_configs()
grouped = {}
for server_name, server_config in to_load_config.items():
grouped[server_name] = {"tools": [], "error": None}
try:
single_server_config = {server_name: server_config}
client = MultiServerMCPClient(single_server_config)
tools = await client.get_tools()
tool_names = sorted(
{
str(getattr(tool, "name", "")).strip()
for tool in tools
if str(getattr(tool, "name", "")).strip()
}
)
grouped[server_name] = {"tools": tool_names, "error": None}
except Exception as e:
grouped[server_name]["error"] = f"{type(e).__name__}: {e}"
logger.exception(f"Failed to connect to MCP server '{server_name}'")
return grouped
def get_tools(self):
try:
loop = asyncio.get_running_loop()

View File

@@ -4,6 +4,7 @@ from pathlib import Path as FsPath
import os.path as osp
import json
import copy
from threading import RLock
from loguru import logger
from lang_agent.pipeline import Pipeline, PipelineConfig
@@ -20,6 +21,9 @@ class ServerPipelineManager:
self._api_key_policy: Dict[str, Dict[str, Any]] = {}
self._pipelines: Dict[str, Pipeline] = {}
self._pipeline_llm: Dict[str, str] = {}
self._registry_path: Optional[str] = None
self._registry_mtime_ns: Optional[int] = None
self._lock = RLock()
def _resolve_registry_path(self, registry_path: str) -> str:
path = FsPath(registry_path)
@@ -30,39 +34,102 @@ class ServerPipelineManager:
root = FsPath(__file__).resolve().parents[2]
return str((root / path).resolve())
def load_registry(self, registry_path: str) -> None:
abs_path = self._resolve_registry_path(registry_path)
if not osp.exists(abs_path):
raise ValueError(f"pipeline registry file not found: {abs_path}")
def _stat_registry_mtime_ns(self, abs_path: str) -> int:
return FsPath(abs_path).stat().st_mtime_ns
def _read_registry(self, abs_path: str) -> Dict[str, Any]:
with open(abs_path, "r", encoding="utf-8") as f:
registry: dict = json.load(f)
return json.load(f)
def _apply_registry(self, abs_path: str, registry: Dict[str, Any], mtime_ns: int) -> bool:
pipelines = registry.get("pipelines")
if pipelines is None:
if pipelines is None or not isinstance(pipelines, dict):
raise ValueError("`pipelines` in pipeline registry must be an object.")
self._pipeline_specs = {}
parsed_specs: Dict[str, Dict[str, Any]] = {}
for pipeline_id, spec in pipelines.items():
if not isinstance(spec, dict):
raise ValueError(
f"pipeline spec for `{pipeline_id}` must be an object."
)
self._pipeline_specs[pipeline_id] = {
parsed_specs[pipeline_id] = {
"enabled": bool(spec.get("enabled", True)),
"config_file": spec.get("config_file"),
"overrides": spec.get("overrides", {}),
}
if not self._pipeline_specs:
if not parsed_specs:
raise ValueError("pipeline registry must define at least one pipeline.")
api_key_policy = registry.get("api_keys", {})
if api_key_policy and not isinstance(api_key_policy, dict):
raise ValueError("`api_keys` in pipeline registry must be an object.")
self._api_key_policy = api_key_policy
logger.info(
f"loaded pipeline registry: {abs_path}, pipelines={list(self._pipeline_specs.keys())}"
with self._lock:
old_specs = self._pipeline_specs
old_policy = self._api_key_policy
old_mtime = self._registry_mtime_ns
removed = set(old_specs.keys()) - set(parsed_specs.keys())
added = set(parsed_specs.keys()) - set(old_specs.keys())
modified = {
pipeline_id
for pipeline_id in (set(old_specs.keys()) & set(parsed_specs.keys()))
if old_specs[pipeline_id] != parsed_specs[pipeline_id]
}
changed = bool(added or removed or modified or old_policy != api_key_policy)
# Drop stale cache entries for deleted/changed pipelines so future requests
# lazily rebuild from the refreshed registry spec.
for pipeline_id in (removed | modified):
self._pipelines.pop(pipeline_id, None)
self._pipeline_llm.pop(pipeline_id, None)
self._pipeline_specs = parsed_specs
self._api_key_policy = api_key_policy
self._registry_path = abs_path
self._registry_mtime_ns = mtime_ns
if changed:
logger.info(
"refreshed pipeline registry: {} | added={} modified={} removed={} mtime={}",
abs_path,
sorted(added),
sorted(modified),
sorted(removed),
mtime_ns,
)
elif old_mtime != mtime_ns:
logger.debug("pipeline registry mtime changed but specs were unchanged: {}", abs_path)
return changed
def load_registry(self, registry_path: str) -> None:
abs_path = self._resolve_registry_path(registry_path)
if not osp.exists(abs_path):
raise ValueError(f"pipeline registry file not found: {abs_path}")
registry = self._read_registry(abs_path)
mtime_ns = self._stat_registry_mtime_ns(abs_path)
self._apply_registry(abs_path=abs_path, registry=registry, mtime_ns=mtime_ns)
def refresh_registry_if_needed(
self, registry_path: Optional[str] = None, force: bool = False
) -> bool:
abs_path = (
self._resolve_registry_path(registry_path)
if registry_path
else self._registry_path
)
if not abs_path:
raise ValueError("registry path is not initialized")
if not osp.exists(abs_path):
raise ValueError(f"pipeline registry file not found: {abs_path}")
mtime_ns = self._stat_registry_mtime_ns(abs_path)
with self._lock:
if not force and self._registry_mtime_ns == mtime_ns:
return False
registry = self._read_registry(abs_path)
return self._apply_registry(abs_path=abs_path, registry=registry, mtime_ns=mtime_ns)
def _resolve_config_path(self, config_file: str) -> str:
path = FsPath(config_file)
@@ -91,11 +158,12 @@ class ServerPipelineManager:
if hasattr(loaded_cfg, "setup"):
cfg = loaded_cfg
else:
logger.warning(
f"config_file for pipeline `{pipeline_id}` did not deserialize to config object; "
"falling back to default config and applying pipeline-level overrides."
raise ValueError(
"config_file for pipeline "
f"`{pipeline_id}` did not deserialize to a config object. "
"Rebuild the pipeline via /v1/pipelines to regenerate a "
"valid serialized PipelineConfig file."
)
cfg = copy.deepcopy(self.default_config)
else:
cfg = copy.deepcopy(self.default_config)
if not isinstance(overrides, dict):
@@ -138,29 +206,33 @@ class ServerPipelineManager:
or app_id
)
if not pipeline_id:
key_policy = (
self._api_key_policy.get(api_key, {}) if self._api_key_policy else {}
)
pipeline_id = key_policy.get(
"default_pipeline_id", self.default_pipeline_id
)
with self._lock:
if not pipeline_id:
key_policy = (
self._api_key_policy.get(api_key, {}) if self._api_key_policy else {}
)
pipeline_id = key_policy.get(
"default_pipeline_id", self.default_pipeline_id
)
if pipeline_id not in self._pipeline_specs:
raise HTTPException(
status_code=404, detail=f"Unknown pipeline_id: {pipeline_id}"
)
if pipeline_id not in self._pipeline_specs:
raise HTTPException(
status_code=404, detail=f"Unknown pipeline_id: {pipeline_id}"
)
self._authorize(api_key, pipeline_id)
self._authorize(api_key, pipeline_id)
return pipeline_id
def get_pipeline(self, pipeline_id: str) -> Tuple[Pipeline, str]:
cached = self._pipelines.get(pipeline_id)
if cached is not None:
return cached, self._pipeline_llm[pipeline_id]
with self._lock:
cached = self._pipelines.get(pipeline_id)
if cached is not None:
return cached, self._pipeline_llm[pipeline_id]
pipeline_obj, llm_name = self._build_pipeline(pipeline_id)
self._pipelines[pipeline_id] = pipeline_obj
self._pipeline_llm[pipeline_id] = llm_name
# Build while holding the lock to avoid duplicate construction for
# the same pipeline on concurrent first requests.
pipeline_obj, llm_name = self._build_pipeline(pipeline_id)
self._pipelines[pipeline_id] = pipeline_obj
self._pipeline_llm[pipeline_id] = llm_name
logger.info(f"lazy-loaded pipeline_id={pipeline_id} model={llm_name}")
return pipeline_obj, llm_name

View File

@@ -12,5 +12,7 @@ from lang_agent.config.constants import (
PIPELINE_REGISTRY_PATH,
VALID_API_KEYS,
API_KEY_HEADER,
API_KEY_HEADER_NO_ERROR
API_KEY_HEADER_NO_ERROR,
_PROJECT_ROOT,
TY_BUILD_SCRIPT,
)

View File

@@ -15,3 +15,5 @@ API_KEY_HEADER = APIKeyHeader(name="Authorization", auto_error=True)
API_KEY_HEADER_NO_ERROR = APIKeyHeader(name="Authorization", auto_error=False)
VALID_API_KEYS = set(filter(None, os.environ.get("FAST_AUTH_KEYS", "").split(",")))
TY_BUILD_SCRIPT = osp.join(_PROJECT_ROOT, "lang_agent", "config", "ty_build_config.py")

View File

@@ -96,7 +96,7 @@ class Evaluator:
df_m.to_csv(metric_f)
self.config.save_config(f"{head_path}-{n_exp}.yml")
self.config.save_config(f"{head_path}-{n_exp}.yaml")
def format_result_df(self, df:pd.DataFrame):

View File

View File

@@ -0,0 +1,33 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from lang_agent.fastapi_server.front_apis import app as front_app
from lang_agent.fastapi_server.server_dashscope import create_dashscope_router
app = FastAPI(
title="Combined Front + DashScope APIs",
description=(
"Single-process app exposing front_apis control endpoints and "
"DashScope-compatible chat endpoints."
),
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Keep existing /v1/... admin APIs unchanged.
app.include_router(front_app.router)
# Add DashScope endpoints at their existing URLs. We intentionally skip
# DashScope's root/health routes to avoid clashing with front_apis.
app.include_router(create_dashscope_router(include_meta_routes=False))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8500)

View File

@@ -4,6 +4,7 @@ import os
import os.path as osp
import sys
import json
import psycopg
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
@@ -23,6 +24,10 @@ from lang_agent.front_api.build_server_utils import (
GRAPH_BUILD_FNCS,
update_pipeline_registry,
)
from lang_agent.components.client_tool_manager import (
ClientToolManager,
ClientToolManagerConfig,
)
class GraphConfigUpsertRequest(BaseModel):
@@ -114,6 +119,38 @@ class PipelineStopResponse(BaseModel):
reload_required: bool
class ConversationListItem(BaseModel):
conversation_id: str
pipeline_id: str
message_count: int
last_updated: Optional[str] = Field(default=None)
class PipelineConversationListResponse(BaseModel):
pipeline_id: str
items: List[ConversationListItem]
count: int
class ConversationMessageItem(BaseModel):
message_type: str
content: str
sequence_number: int
created_at: str
class PipelineConversationMessagesResponse(BaseModel):
pipeline_id: str
conversation_id: str
items: List[ConversationMessageItem]
count: int
class RuntimeAuthInfoResponse(BaseModel):
fast_api_key: str
source: str
class ApiKeyPolicyItem(BaseModel):
api_key: str
default_pipeline_id: Optional[str] = Field(default=None)
@@ -154,6 +191,12 @@ class McpConfigUpdateResponse(BaseModel):
tool_keys: List[str]
class McpAvailableToolsResponse(BaseModel):
available_tools: List[str] = Field(default_factory=list)
errors: List[str] = Field(default_factory=list)
servers: Dict[str, Dict[str, Any]] = Field(default_factory=dict)
app = FastAPI(
title="Front APIs",
description="Manage graph configs and launch graph pipelines.",
@@ -190,11 +233,15 @@ async def root():
"/v1/pipelines (POST) - build config + upsert pipeline registry entry",
"/v1/pipelines (GET) - list registry pipeline specs",
"/v1/pipelines/{pipeline_id} (DELETE) - disable pipeline in registry",
"/v1/runtime-auth (GET) - show runtime FAST API key info",
"/v1/pipelines/{pipeline_id}/conversations (GET) - list pipeline conversations",
"/v1/pipelines/{pipeline_id}/conversations/{conversation_id}/messages (GET) - list messages in a conversation",
"/v1/pipelines/api-keys (GET) - list API key routing policies",
"/v1/pipelines/api-keys/{api_key} (PUT) - upsert API key routing policy",
"/v1/pipelines/api-keys/{api_key} (DELETE) - delete API key routing policy",
"/v1/tool-configs/mcp (GET)",
"/v1/tool-configs/mcp (PUT)",
"/v1/tool-configs/mcp/tools (GET)",
],
}
@@ -240,6 +287,30 @@ def _write_pipeline_registry(registry: Dict[str, Any]) -> None:
f.write("\n")
def _resolve_runtime_fast_api_key() -> RuntimeAuthInfoResponse:
"""Pick a runtime auth key from pipeline registry first, then FAST_AUTH_KEYS env."""
try:
registry = _read_pipeline_registry()
api_keys = registry.get("api_keys", {})
if isinstance(api_keys, dict):
for key in api_keys.keys():
candidate = str(key).strip()
if candidate:
return RuntimeAuthInfoResponse(
fast_api_key=candidate, source="pipeline_registry"
)
except Exception:
# fall back to env parsing below
pass
raw_env = os.environ.get("FAST_AUTH_KEYS", "")
for token in raw_env.split(","):
candidate = token.strip()
if candidate:
return RuntimeAuthInfoResponse(fast_api_key=candidate, source="env")
return RuntimeAuthInfoResponse(fast_api_key="", source="none")
def _normalize_pipeline_spec(pipeline_id: str, spec: Dict[str, Any]) -> PipelineSpec:
if not isinstance(spec, dict):
raise ValueError(f"pipeline spec for '{pipeline_id}' must be an object")
@@ -459,6 +530,38 @@ async def update_mcp_tool_config(body: McpConfigUpdateRequest):
)
@app.get("/v1/tool-configs/mcp/tools", response_model=McpAvailableToolsResponse)
async def list_mcp_available_tools():
try:
_read_mcp_config_raw()
manager = ClientToolManager(
ClientToolManagerConfig(mcp_config_f=MCP_CONFIG_PATH)
)
servers = await manager.aget_tools_by_server()
available_tools = sorted(
{
tool_name
for server_info in servers.values()
for tool_name in server_info.get("tools", [])
}
)
errors = [
f"{server_name}: {server_info.get('error')}"
for server_name, server_info in servers.items()
if server_info.get("error")
]
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
return McpAvailableToolsResponse(
available_tools=available_tools,
errors=errors,
servers=servers,
)
@app.get("/v1/pipelines", response_model=PipelineListResponse)
async def list_running_pipelines():
try:
@@ -511,7 +614,7 @@ async def create_pipeline(body: PipelineCreateRequest):
),
)
config_file = f"configs/pipelines/{pipeline_id}.yml"
config_file = f"configs/pipelines/{pipeline_id}.yaml"
config_abs_dir = osp.join(_PROJECT_ROOT, "configs", "pipelines")
try:
build_fn(
@@ -555,7 +658,7 @@ async def create_pipeline(body: PipelineCreateRequest):
config_file=normalized.config_file,
llm_name=normalized.llm_name,
enabled=normalized.enabled,
reload_required=True,
reload_required=False,
registry_path=PIPELINE_REGISTRY_PATH,
)
@@ -583,7 +686,129 @@ async def stop_pipeline(pipeline_id: str):
pipeline_id=pipeline_id,
status="disabled",
enabled=False,
reload_required=True,
reload_required=False,
)
@app.get("/v1/runtime-auth", response_model=RuntimeAuthInfoResponse)
async def get_runtime_auth_info():
return _resolve_runtime_fast_api_key()
@app.get(
"/v1/pipelines/{pipeline_id}/conversations",
response_model=PipelineConversationListResponse,
)
async def list_pipeline_conversations(pipeline_id: str, limit: int = 100):
if limit < 1 or limit > 500:
raise HTTPException(status_code=400, detail="limit must be between 1 and 500")
conn_str = os.environ.get("CONN_STR")
if not conn_str:
raise HTTPException(status_code=500, detail="CONN_STR not set")
try:
with psycopg.connect(conn_str) as conn:
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
cur.execute(
"""
SELECT
conversation_id,
pipeline_id,
COUNT(*) AS message_count,
MAX(created_at) AS last_updated
FROM messages
WHERE pipeline_id = %s
GROUP BY conversation_id, pipeline_id
ORDER BY last_updated DESC
LIMIT %s
""",
(pipeline_id, limit),
)
rows = cur.fetchall()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
items = [
ConversationListItem(
conversation_id=str(row["conversation_id"]),
pipeline_id=str(row["pipeline_id"]),
message_count=int(row["message_count"]),
last_updated=(
row["last_updated"].isoformat() if row.get("last_updated") else None
),
)
for row in rows
]
return PipelineConversationListResponse(
pipeline_id=pipeline_id, items=items, count=len(items)
)
@app.get(
"/v1/pipelines/{pipeline_id}/conversations/{conversation_id}/messages",
response_model=PipelineConversationMessagesResponse,
)
async def get_pipeline_conversation_messages(pipeline_id: str, conversation_id: str):
conn_str = os.environ.get("CONN_STR")
if not conn_str:
raise HTTPException(status_code=500, detail="CONN_STR not set")
try:
with psycopg.connect(conn_str) as conn:
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
cur.execute(
"""
SELECT 1
FROM messages
WHERE pipeline_id = %s AND conversation_id = %s
LIMIT 1
""",
(pipeline_id, conversation_id),
)
exists = cur.fetchone()
if exists is None:
raise HTTPException(
status_code=404,
detail=(
f"conversation_id '{conversation_id}' not found for "
f"pipeline '{pipeline_id}'"
),
)
cur.execute(
"""
SELECT
message_type,
content,
sequence_number,
created_at
FROM messages
WHERE pipeline_id = %s AND conversation_id = %s
ORDER BY sequence_number ASC
""",
(pipeline_id, conversation_id),
)
rows = cur.fetchall()
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
items = [
ConversationMessageItem(
message_type=str(row["message_type"]),
content=str(row["content"]),
sequence_number=int(row["sequence_number"]),
created_at=row["created_at"].isoformat() if row.get("created_at") else "",
)
for row in rows
]
return PipelineConversationMessagesResponse(
pipeline_id=pipeline_id,
conversation_id=conversation_id,
items=items,
count=len(items),
)
@@ -688,5 +913,15 @@ async def delete_pipeline_api_key_policy(api_key: str):
return ApiKeyPolicyDeleteResponse(
api_key=normalized_key,
status="deleted",
reload_required=True,
reload_required=False,
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"front_apis:app",
host="0.0.0.0",
port=8500,
reload=True,
)

View File

@@ -1,9 +1,8 @@
from fastapi import FastAPI, HTTPException, Path, Request, Depends, Security
from fastapi import APIRouter, Depends, FastAPI, HTTPException, Path, Request, Security
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.security import APIKeyHeader
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional
import os
import os.path as osp
import sys
@@ -20,16 +19,28 @@ from lang_agent.pipeline import PipelineConfig
from lang_agent.components.server_pipeline_manager import ServerPipelineManager
from lang_agent.config.constants import PIPELINE_REGISTRY_PATH, API_KEY_HEADER, VALID_API_KEYS
# Load base config for route-level overrides (pipelines are lazy-loaded from registry)
pipeline_config = tyro.cli(PipelineConfig)
logger.info(f"starting agent with base pipeline config: \n{pipeline_config}")
def _build_default_pipeline_config() -> PipelineConfig:
"""
Build import-time defaults without parsing CLI args.
This keeps module import safe for reuse by combined apps and tests.
"""
pipeline_config = PipelineConfig()
logger.info(f"starting agent with base pipeline config: \n{pipeline_config}")
return pipeline_config
PIPELINE_MANAGER = ServerPipelineManager(
default_pipeline_id=os.environ.get("FAST_DEFAULT_PIPELINE_ID", "default"),
default_config=pipeline_config,
)
PIPELINE_MANAGER.load_registry(PIPELINE_REGISTRY_PATH)
def _build_pipeline_manager(base_config: PipelineConfig) -> ServerPipelineManager:
pipeline_manager = ServerPipelineManager(
default_pipeline_id=os.environ.get("FAST_DEFAULT_PIPELINE_ID", "default"),
default_config=base_config,
)
pipeline_manager.load_registry(PIPELINE_REGISTRY_PATH)
return pipeline_manager
pipeline_config = _build_default_pipeline_config()
PIPELINE_MANAGER = _build_pipeline_manager(pipeline_config)
async def verify_api_key(api_key: str = Security(API_KEY_HEADER)):
@@ -55,20 +66,6 @@ class DSApplicationCallRequest(BaseModel):
thread_id: Optional[str] = Field(default="3")
app = FastAPI(
title="DashScope-Compatible Application API",
description="DashScope Application.call compatible endpoint backed by pipeline.chat",
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def sse_chunks_from_stream(
chunk_generator, response_id: str, model: str = "qwen-flash"
):
@@ -188,7 +185,14 @@ async def _process_dashscope_request(
app_id: Optional[str],
session_id: Optional[str],
api_key: str,
pipeline_manager: ServerPipelineManager,
):
try:
pipeline_manager.refresh_registry_if_needed()
except Exception as e:
logger.error(f"failed to refresh pipeline registry: {e}")
raise HTTPException(status_code=500, detail=f"Failed to refresh pipeline registry: {e}")
req_app_id = app_id or body.get("app_id")
body_input = body.get("input", {}) if isinstance(body.get("input"), dict) else {}
req_session_id = session_id or body_input.get("session_id")
@@ -201,10 +205,10 @@ async def _process_dashscope_request(
thread_id = body_input.get("session_id") or req_session_id or "3"
user_msg = _extract_user_message(messages)
pipeline_id = PIPELINE_MANAGER.resolve_pipeline_id(
pipeline_id = pipeline_manager.resolve_pipeline_id(
body=body, app_id=req_app_id, api_key=api_key
)
selected_pipeline, selected_model = PIPELINE_MANAGER.get_pipeline(pipeline_id)
selected_pipeline, selected_model = pipeline_manager.get_pipeline(pipeline_id)
# Namespace thread ids to prevent memory collisions across pipelines.
thread_id = f"{pipeline_id}:{thread_id}"
@@ -245,76 +249,117 @@ async def _process_dashscope_request(
return JSONResponse(content=data)
@app.post("/v1/apps/{app_id}/sessions/{session_id}/responses")
@app.post("/api/v1/apps/{app_id}/sessions/{session_id}/responses")
async def application_responses(
request: Request,
app_id: str = Path(...),
session_id: str = Path(...),
api_key: str = Depends(verify_api_key),
):
try:
body = await request.json()
return await _process_dashscope_request(
body=body,
app_id=app_id,
session_id=session_id,
api_key=api_key,
def create_dashscope_router(
pipeline_manager: Optional[ServerPipelineManager] = None,
include_meta_routes: bool = True,
) -> APIRouter:
manager = pipeline_manager or PIPELINE_MANAGER
router = APIRouter()
@router.post("/v1/apps/{app_id}/sessions/{session_id}/responses")
@router.post("/api/v1/apps/{app_id}/sessions/{session_id}/responses")
async def application_responses(
request: Request,
app_id: str = Path(...),
session_id: str = Path(...),
api_key: str = Depends(verify_api_key),
):
try:
body = await request.json()
return await _process_dashscope_request(
body=body,
app_id=app_id,
session_id=session_id,
api_key=api_key,
pipeline_manager=manager,
)
except HTTPException:
raise
except Exception as e:
logger.error(f"DashScope-compatible endpoint error: {e}")
raise HTTPException(status_code=500, detail=str(e))
# Compatibility: some SDKs call /apps/{app_id}/completion without /v1 and
# without session in path.
@router.post("/apps/{app_id}/completion")
@router.post("/v1/apps/{app_id}/completion")
@router.post("/api/apps/{app_id}/completion")
@router.post("/api/v1/apps/{app_id}/completion")
async def application_completion(
request: Request,
app_id: str = Path(...),
api_key: str = Depends(verify_api_key),
):
try:
body = await request.json()
return await _process_dashscope_request(
body=body,
app_id=app_id,
session_id=None,
api_key=api_key,
pipeline_manager=manager,
)
except HTTPException:
raise
except Exception as e:
logger.error(f"DashScope-compatible completion error: {e}")
raise HTTPException(status_code=500, detail=str(e))
if include_meta_routes:
@router.get("/")
async def root():
return {
"message": "DashScope Application-compatible API",
"endpoints": [
"/v1/apps/{app_id}/sessions/{session_id}/responses",
"/health",
],
}
@router.get("/health")
async def health():
return {"status": "healthy"}
return router
def create_dashscope_app(
pipeline_manager: Optional[ServerPipelineManager] = None,
) -> FastAPI:
dashscope_app = FastAPI(
title="DashScope-Compatible Application API",
description="DashScope Application.call compatible endpoint backed by pipeline.chat",
)
dashscope_app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
dashscope_app.include_router(
create_dashscope_router(
pipeline_manager=pipeline_manager,
include_meta_routes=True,
)
except HTTPException:
raise
except Exception as e:
logger.error(f"DashScope-compatible endpoint error: {e}")
raise HTTPException(status_code=500, detail=str(e))
)
return dashscope_app
# Compatibility: some SDKs call /apps/{app_id}/completion without /v1 and without session in path
@app.post("/apps/{app_id}/completion")
@app.post("/v1/apps/{app_id}/completion")
@app.post("/api/apps/{app_id}/completion")
@app.post("/api/v1/apps/{app_id}/completion")
async def application_completion(
request: Request,
app_id: str = Path(...),
api_key: str = Depends(verify_api_key),
):
try:
body = await request.json()
return await _process_dashscope_request(
body=body,
app_id=app_id,
session_id=None,
api_key=api_key,
)
except HTTPException:
raise
except Exception as e:
logger.error(f"DashScope-compatible completion error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/")
async def root():
return {
"message": "DashScope Application-compatible API",
"endpoints": [
"/v1/apps/{app_id}/sessions/{session_id}/responses",
"/health",
],
}
@app.get("/health")
async def health():
return {"status": "healthy"}
dashscope_router = create_dashscope_router(include_meta_routes=False)
app = create_dashscope_app()
if __name__ == "__main__":
# CLI parsing is intentionally only in script mode to keep module import safe.
cli_pipeline_config = tyro.cli(PipelineConfig)
logger.info(f"starting agent with CLI pipeline config: \n{cli_pipeline_config}")
cli_pipeline_manager = _build_pipeline_manager(cli_pipeline_config)
uvicorn.run(
"server_dashscope:app",
host="0.0.0.0",
port=pipeline_config.port,
reload=True,
create_dashscope_app(pipeline_manager=cli_pipeline_manager),
host=cli_pipeline_config.host,
port=cli_pipeline_config.port,
reload=False,
)

View File

@@ -5,15 +5,13 @@ import subprocess
import json
from lang_agent.config.core_config import load_tyro_conf
_PROJECT_ROOT = osp.dirname(osp.dirname(osp.dirname(osp.abspath(__file__))))
_TY_BUILD_SCRIPT = osp.join(_PROJECT_ROOT, "lang_agent", "config", "ty_build_config.py")
from lang_agent.config.constants import TY_BUILD_SCRIPT, _PROJECT_ROOT
def opt_to_config(save_path: str, *nargs):
os.makedirs(osp.dirname(save_path), exist_ok=True)
subprocess.run(
["python", _TY_BUILD_SCRIPT, "--save-path", save_path, *nargs],
["python", TY_BUILD_SCRIPT, "--save-path", save_path, *nargs],
check=True,
cwd=_PROJECT_ROOT,
)
@@ -22,7 +20,7 @@ def opt_to_config(save_path: str, *nargs):
def _build_and_load_pipeline_config(
pipeline_id: str, pipeline_config_dir: str, cmd: List[str]
):
save_config_f = osp.join(pipeline_config_dir, f"{pipeline_id}.yml")
save_config_f = osp.join(pipeline_config_dir, f"{pipeline_id}.yaml")
opt_to_config(save_config_f, *cmd)
# TODO: think if returning the built pipeline is better or just the config obj for front_api
@@ -67,6 +65,7 @@ def build_route(
pipeline_config_dir="configs/pipelines",
):
cmd_opt = [
"--pipeline.pipeline-id", pipeline_id,
"route", # ------------
"--llm-name", llm_name,
"--api-key", api_key,
@@ -96,6 +95,7 @@ def build_react(
pipeline_config_dir="configs/pipelines",
):
cmd_opt = [
"--pipeline.pipeline-id", pipeline_id,
"react", # ------------
"--llm-name", llm_name,
"--api-key", api_key,

View File

@@ -62,7 +62,7 @@ class PipelineConfig(LLMNodeConfig):
host: str = "0.0.0.0"
"""where am I hosted"""
port: int = 8588
port: int = 8500
"""what is my port"""
# graph_config: AnnotatedGraph = field(default_factory=ReactGraphConfig)

View File

@@ -0,0 +1,141 @@
#!/usr/bin/env python3
"""
Simple chat loop to interact with the blueberry pipeline via DashScope-compatible API.
Usage:
python chat_blueberry.py
The script connects to the server running on http://localhost:8500
and uses the API key from the pipeline registry.
"""
import requests
import json
import sys
from typing import Optional
# Configuration from pipeline_registry.json
API_KEY = "sk-6c7091e6a95f404efb2ec30e8f51b897626d670375cdf822d78262f24ab12367"
PIPELINE_ID = "blueberry"
BASE_URL = "http://localhost:8500"
SESSION_ID = "chat-session-1"
def send_message(
message: str,
session_id: str = SESSION_ID,
stream: bool = False,
app_id: str = PIPELINE_ID,
) -> Optional[str]:
"""Send a message to the blueberry pipeline and return the response."""
url = f"{BASE_URL}/v1/apps/{app_id}/sessions/{session_id}/responses"
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
}
payload = {
"messages": [
{"role": "user", "content": message}
],
"stream": stream,
}
try:
if stream:
# Handle streaming response
response = requests.post(url, headers=headers, json=payload, stream=True)
response.raise_for_status()
accumulated_text = ""
for line in response.iter_lines():
if line:
line_str = line.decode('utf-8')
if line_str.startswith('data: '):
data_str = line_str[6:] # Remove 'data: ' prefix
try:
data = json.loads(data_str)
output = data.get("output", {})
text = output.get("text", "")
if text:
accumulated_text = text
# Print incremental updates (you can modify this behavior)
print(f"\rAssistant: {accumulated_text}", end="", flush=True)
if data.get("is_end", False):
print() # New line after streaming completes
return accumulated_text
except json.JSONDecodeError:
continue
return accumulated_text
else:
# Handle non-streaming response
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status()
data = response.json()
output = data.get("output", {})
return output.get("text", "")
except requests.exceptions.RequestException as e:
print(f"Error sending message: {e}", file=sys.stderr)
if hasattr(e, 'response') and e.response is not None:
try:
error_detail = e.response.json()
print(f"Error details: {error_detail}", file=sys.stderr)
except:
print(f"Response status: {e.response.status_code}", file=sys.stderr)
return None
def main():
"""Main chat loop."""
print("=" * 60)
print(f"Chat with Blueberry Pipeline")
print(f"Pipeline ID: {PIPELINE_ID}")
print(f"Server: {BASE_URL}")
print(f"Session ID: {SESSION_ID}")
print("=" * 60)
print("Type your messages (or 'quit'/'exit' to end, 'stream' to toggle streaming)")
print("Streaming mode is ON by default")
print()
stream_mode = True
while True:
try:
user_input = input("You: ").strip()
if not user_input:
continue
if user_input.lower() in ['quit', 'exit', 'q']:
print("Goodbye!")
break
if user_input.lower() == 'stream':
stream_mode = not stream_mode
print(f"Streaming mode: {'ON' if stream_mode else 'OFF'}")
continue
print("Assistant: ", end="", flush=True)
response = send_message(user_input, stream=stream_mode)
if response is None:
print("(No response received)")
elif not stream_mode:
print(response)
# For streaming, the response is already printed incrementally
print() # Empty line for readability
except KeyboardInterrupt:
print("\n\nGoodbye!")
break
except Exception as e:
print(f"\nError: {e}", file=sys.stderr)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,364 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import datetime as dt
import glob
import os
import os.path as osp
import sys
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional
import commentjson
import psycopg
PROJECT_ROOT = osp.dirname(osp.dirname(osp.dirname(osp.abspath(__file__))))
if PROJECT_ROOT not in sys.path:
sys.path.append(PROJECT_ROOT)
from lang_agent.config import load_tyro_conf # noqa: E402
from lang_agent.config.db_config_manager import DBConfigManager # noqa: E402
@dataclass
class MigrationPayload:
config_path: str
pipeline_id: str
graph_id: str
prompt_dict: Dict[str, str]
tool_keys: List[str]
api_key: Optional[str]
def _infer_pipeline_id(pipeline_conf, config_path: str) -> str:
candidates = [
getattr(pipeline_conf, "pipeline_id", None),
getattr(getattr(pipeline_conf, "graph_config", None), "pipeline_id", None),
]
for candidate in candidates:
if candidate is None:
continue
value = str(candidate).strip()
if value and value.lower() != "null":
return value
return osp.splitext(osp.basename(config_path))[0]
def _infer_graph_id(graph_conf) -> str:
if graph_conf is None:
return "unknown"
class_name = graph_conf.__class__.__name__.lower()
if "routing" in class_name or class_name == "routeconfig":
return "routing"
if "react" in class_name:
return "react"
target = getattr(graph_conf, "_target", None)
if target is not None:
target_name = getattr(target, "__name__", str(target)).lower()
if "routing" in target_name:
return "routing"
if "react" in target_name:
return "react"
return "unknown"
def _extract_tool_keys(graph_conf) -> List[str]:
if graph_conf is None:
return []
tool_cfg = getattr(graph_conf, "tool_manager_config", None)
client_cfg = getattr(tool_cfg, "client_tool_manager", None)
keys = getattr(client_cfg, "tool_keys", None)
if not keys:
return []
out: List[str] = []
seen = set()
for key in keys:
cleaned = str(key).strip()
if not cleaned or cleaned in seen:
continue
seen.add(cleaned)
out.append(cleaned)
return out
def _load_prompt_dict(prompt_path: str, default_key: str = "sys_prompt") -> Dict[str, str]:
if not prompt_path:
return {}
if not osp.exists(prompt_path):
return {}
if osp.isdir(prompt_path):
prompt_files = sorted(
p for p in glob.glob(osp.join(prompt_path, "*.txt")) if "optional" not in p
)
out = {}
for prompt_f in prompt_files:
key = osp.splitext(osp.basename(prompt_f))[0]
with open(prompt_f, "r", encoding="utf-8") as f:
out[key] = f.read()
return out
if prompt_path.endswith(".json"):
with open(prompt_path, "r", encoding="utf-8") as f:
obj = commentjson.load(f)
if not isinstance(obj, dict):
return {}
return {str(k): v if isinstance(v, str) else str(v) for k, v in obj.items()}
if prompt_path.endswith(".txt"):
with open(prompt_path, "r", encoding="utf-8") as f:
return {default_key: f.read()}
return {}
def _extract_prompt_dict(graph_conf) -> Dict[str, str]:
if graph_conf is None:
return {}
if hasattr(graph_conf, "sys_prompt_f"):
return _load_prompt_dict(str(getattr(graph_conf, "sys_prompt_f")), "sys_prompt")
if hasattr(graph_conf, "sys_promp_dir"):
return _load_prompt_dict(str(getattr(graph_conf, "sys_promp_dir")))
return {}
def _extract_tool_node_prompt_dict(graph_conf) -> Dict[str, str]:
tool_node_conf = getattr(graph_conf, "tool_node_config", None)
if tool_node_conf is None:
return {}
out: Dict[str, str] = {}
if hasattr(tool_node_conf, "tool_prompt_f"):
out.update(
_load_prompt_dict(str(getattr(tool_node_conf, "tool_prompt_f")), "tool_prompt")
)
if hasattr(tool_node_conf, "chatty_sys_prompt_f"):
out.update(
_load_prompt_dict(
str(getattr(tool_node_conf, "chatty_sys_prompt_f")), "chatty_prompt"
)
)
return out
def _prompt_key_whitelist(graph_conf, graph_id: str) -> Optional[set]:
if graph_id == "react":
return {"sys_prompt"}
if graph_id != "routing":
return None
allowed = {"route_prompt", "chat_prompt", "tool_prompt"}
tool_node_conf = getattr(graph_conf, "tool_node_config", None)
if tool_node_conf is None:
return allowed
cls_name = tool_node_conf.__class__.__name__.lower()
target = getattr(tool_node_conf, "_target", None)
target_name = getattr(target, "__name__", str(target)).lower() if target else ""
if "chatty" in cls_name or "chatty" in target_name:
allowed.add("chatty_prompt")
return allowed
def _collect_payload(config_path: str) -> MigrationPayload:
conf = load_tyro_conf(config_path)
graph_conf = getattr(conf, "graph_config", None)
graph_id = _infer_graph_id(graph_conf)
prompt_dict = _extract_prompt_dict(graph_conf)
prompt_dict.update(_extract_tool_node_prompt_dict(graph_conf))
whitelist = _prompt_key_whitelist(graph_conf, graph_id)
if whitelist is not None:
prompt_dict = {k: v for k, v in prompt_dict.items() if k in whitelist}
return MigrationPayload(
config_path=config_path,
pipeline_id=_infer_pipeline_id(conf, config_path),
graph_id=graph_id,
prompt_dict=prompt_dict,
tool_keys=_extract_tool_keys(graph_conf),
api_key=getattr(conf, "api_key", None),
)
def _resolve_config_paths(config_dir: str, config_paths: Optional[Iterable[str]]) -> List[str]:
if config_paths:
resolved = [osp.abspath(path) for path in config_paths]
else:
pattern = osp.join(osp.abspath(config_dir), "*.yaml")
resolved = sorted(glob.glob(pattern))
return [path for path in resolved if osp.exists(path)]
def _ensure_prompt_set(
conn: psycopg.Connection,
pipeline_id: str,
graph_id: str,
set_name: str,
description: str,
) -> str:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id FROM prompt_sets
WHERE pipeline_id = %s AND name = %s
ORDER BY updated_at DESC, created_at DESC
LIMIT 1
""",
(pipeline_id, set_name),
)
row = cur.fetchone()
if row is not None:
return str(row[0])
cur.execute(
"""
INSERT INTO prompt_sets (pipeline_id, graph_id, name, description, is_active, list)
VALUES (%s, %s, %s, %s, false, '')
RETURNING id
""",
(pipeline_id, graph_id, set_name, description),
)
created = cur.fetchone()
return str(created[0])
def _activate_prompt_set(conn: psycopg.Connection, pipeline_id: str, prompt_set_id: str) -> None:
with conn.cursor() as cur:
cur.execute(
"UPDATE prompt_sets SET is_active = false, updated_at = now() WHERE pipeline_id = %s",
(pipeline_id,),
)
cur.execute(
"UPDATE prompt_sets SET is_active = true, updated_at = now() WHERE id = %s",
(prompt_set_id,),
)
def _run_migration(
payloads: List[MigrationPayload],
set_name: str,
description: str,
dry_run: bool,
activate: bool,
) -> None:
for payload in payloads:
print(
f"[PLAN] pipeline={payload.pipeline_id} graph={payload.graph_id} "
f"prompts={len(payload.prompt_dict)} tools={len(payload.tool_keys)} "
f"config={payload.config_path}"
)
if dry_run:
continue
manager = DBConfigManager()
with psycopg.connect(manager.conn_str) as conn:
prompt_set_id = _ensure_prompt_set(
conn=conn,
pipeline_id=payload.pipeline_id,
graph_id=payload.graph_id,
set_name=set_name,
description=description,
)
conn.commit()
manager.set_config(
pipeline_id=payload.pipeline_id,
graph_id=payload.graph_id,
prompt_set_id=prompt_set_id,
tool_list=payload.tool_keys,
prompt_dict=payload.prompt_dict,
api_key=payload.api_key,
)
if activate:
_activate_prompt_set(
conn=conn,
pipeline_id=payload.pipeline_id,
prompt_set_id=prompt_set_id,
)
conn.commit()
print(
f"[DONE] pipeline={payload.pipeline_id} "
f"prompt_set={prompt_set_id} activate={activate}"
)
def main() -> None:
date_str = dt.date.today().isoformat()
parser = argparse.ArgumentParser(
description="Import prompt definitions from pipeline YAML files into DB prompt_sets."
)
parser.add_argument(
"--config-dir",
default=osp.join(PROJECT_ROOT, "configs", "pipelines"),
help="Directory containing pipeline YAML files.",
)
parser.add_argument(
"--config",
action="append",
default=[],
help="Specific pipeline config yaml path. Can be passed multiple times.",
)
parser.add_argument(
"--pipeline-id",
action="append",
default=[],
help="If provided, only migrate these pipeline IDs (repeatable).",
)
parser.add_argument(
"--set-name",
# default=f"migrated-{date_str}",
default="default",
help="Prompt set name to create/reuse under each pipeline.",
)
parser.add_argument(
"--description",
default="Migrated from pipeline YAML prompt files",
help="Prompt set description.",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Print what would be migrated without writing to DB.",
)
parser.add_argument(
"--activate",
action="store_true",
help="Mark imported set active for each migrated pipeline.",
)
args = parser.parse_args()
config_paths = _resolve_config_paths(args.config_dir, args.config)
if not config_paths:
raise SystemExit("No config files found. Provide --config or --config-dir.")
requested_pipelines = {p.strip() for p in args.pipeline_id if p.strip()}
payloads: List[MigrationPayload] = []
for config_path in config_paths:
payload = _collect_payload(config_path)
if requested_pipelines and payload.pipeline_id not in requested_pipelines:
continue
if not payload.prompt_dict:
print(f"[SKIP] no prompts found for config={config_path}")
continue
payloads.append(payload)
if not payloads:
raise SystemExit("No pipelines matched with prompt content to migrate.")
_run_migration(
payloads=payloads,
set_name=args.set_name,
description=args.description,
dry_run=args.dry_run,
activate=args.activate,
)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,46 @@
import importlib
import os
import sys
from fastapi.testclient import TestClient
os.environ.setdefault("CONN_STR", "postgresql://dummy:dummy@localhost/dummy")
def test_server_dashscope_import_is_cli_safe(monkeypatch):
"""
Importing server_dashscope should not invoke tyro.cli at module import time.
"""
import tyro
monkeypatch.setattr(
tyro,
"cli",
lambda *_args, **_kwargs: (_ for _ in ()).throw(
AssertionError("tyro.cli must not run during module import")
),
)
sys.modules.pop("fastapi_server.server_dashscope", None)
module = importlib.import_module("fastapi_server.server_dashscope")
assert module.app is not None
assert module.dashscope_router is not None
def test_combined_app_serves_front_and_dashscope_routes():
from fastapi_server.combined import app
client = TestClient(app)
# front_apis route should be available.
front_resp = client.get("/v1/pipelines/graphs")
assert front_resp.status_code == 200, front_resp.text
assert "available_graphs" in front_resp.json()
# DashScope route should exist at the same path (missing auth should not be 404).
dash_resp = client.post(
"/api/v1/apps/blueberry/sessions/test-session/responses",
json={"input": {"prompt": "hello"}, "stream": False},
)
assert dash_resp.status_code != 404, dash_resp.text

View File

@@ -30,7 +30,7 @@ except Exception as e:
# <<< Paste your running FastAPI base url here >>>
BASE_URL = os.getenv("DS_BASE_URL", "http://127.0.0.1:8588/api/")
BASE_URL = os.getenv("DS_BASE_URL", "http://127.0.0.1:8500/api/")
# Params

View File

@@ -1,13 +1,18 @@
import json
import os
from pathlib import Path
from datetime import datetime, timedelta, timezone
import importlib
from fastapi.testclient import TestClient
os.environ.setdefault("CONN_STR", "postgresql://dummy:dummy@localhost/dummy")
import fastapi_server.front_apis as front_apis
try:
front_apis = importlib.import_module("lang_agent.fastapi_server.front_apis")
except ModuleNotFoundError:
front_apis = importlib.import_module("fastapi_server.front_apis")
def _fake_build_fn(
@@ -20,7 +25,7 @@ def _fake_build_fn(
):
out_dir = Path(pipeline_config_dir)
out_dir.mkdir(parents=True, exist_ok=True)
out_file = out_dir / f"{pipeline_id}.yml"
out_file = out_dir / f"{pipeline_id}.yaml"
out_file.write_text(
json.dumps(
{
@@ -36,9 +41,101 @@ def _fake_build_fn(
return {"path": str(out_file)}
class _FakeCursor:
def __init__(self, rows):
self._rows = rows
self._result = []
self._last_sql = ""
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def execute(self, sql, params=None):
self._last_sql = sql
query = " ".join(sql.split()).lower()
params = params or ()
if "group by conversation_id, pipeline_id" in query:
pipeline_id = params[0]
limit = int(params[1])
grouped = {}
for row in self._rows:
if row["pipeline_id"] != pipeline_id:
continue
conv_id = row["conversation_id"]
if conv_id not in grouped:
grouped[conv_id] = {
"conversation_id": conv_id,
"pipeline_id": row["pipeline_id"],
"message_count": 0,
"last_updated": row["created_at"],
}
grouped[conv_id]["message_count"] += 1
if row["created_at"] > grouped[conv_id]["last_updated"]:
grouped[conv_id]["last_updated"] = row["created_at"]
values = sorted(grouped.values(), key=lambda x: x["last_updated"], reverse=True)
self._result = values[:limit]
return
if "select 1 from messages" in query:
pipeline_id, conversation_id = params
found = any(
row["pipeline_id"] == pipeline_id
and row["conversation_id"] == conversation_id
for row in self._rows
)
self._result = [{"exists": 1}] if found else []
return
if "order by sequence_number asc" in query:
pipeline_id, conversation_id = params
self._result = sorted(
[
{
"message_type": row["message_type"],
"content": row["content"],
"sequence_number": row["sequence_number"],
"created_at": row["created_at"],
}
for row in self._rows
if row["pipeline_id"] == pipeline_id
and row["conversation_id"] == conversation_id
],
key=lambda x: x["sequence_number"],
)
return
raise AssertionError(f"Unsupported SQL in test fake: {self._last_sql}")
def fetchall(self):
return self._result
def fetchone(self):
if not self._result:
return None
return self._result[0]
class _FakeConnection:
def __init__(self, rows):
self._rows = rows
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def cursor(self, row_factory=None):
return _FakeCursor(self._rows)
def test_registry_route_lifecycle(monkeypatch, tmp_path):
registry_path = tmp_path / "pipeline_registry.json"
monkeypatch.setattr(front_apis, "_PIPELINE_REGISTRY_PATH", str(registry_path))
monkeypatch.setattr(front_apis, "PIPELINE_REGISTRY_PATH", str(registry_path))
monkeypatch.setitem(front_apis.GRAPH_BUILD_FNCS, "routing", _fake_build_fn)
client = TestClient(front_apis.app)
@@ -60,7 +157,7 @@ def test_registry_route_lifecycle(monkeypatch, tmp_path):
assert create_data["pipeline_id"] == "xiaozhan"
assert create_data["graph_id"] == "routing"
assert create_data["llm_name"] == "qwen-plus"
assert create_data["reload_required"] is True
assert create_data["reload_required"] is False
list_resp = client.get("/v1/pipelines")
assert list_resp.status_code == 200, list_resp.text
@@ -91,7 +188,7 @@ def test_registry_route_lifecycle(monkeypatch, tmp_path):
def test_registry_api_key_policy_lifecycle(monkeypatch, tmp_path):
registry_path = tmp_path / "pipeline_registry.json"
monkeypatch.setattr(front_apis, "_PIPELINE_REGISTRY_PATH", str(registry_path))
monkeypatch.setattr(front_apis, "PIPELINE_REGISTRY_PATH", str(registry_path))
monkeypatch.setitem(front_apis.GRAPH_BUILD_FNCS, "routing", _fake_build_fn)
client = TestClient(front_apis.app)
@@ -136,4 +233,95 @@ def test_registry_api_key_policy_lifecycle(monkeypatch, tmp_path):
delete_data = delete_resp.json()
assert delete_data["api_key"] == "sk-test-key"
assert delete_data["status"] == "deleted"
assert delete_data["reload_required"] is True
assert delete_data["reload_required"] is False
def test_pipeline_conversation_routes(monkeypatch):
now = datetime.now(timezone.utc)
rows = [
{
"conversation_id": "agent-a:conv-1",
"pipeline_id": "agent-a",
"message_type": "human",
"content": "hello",
"sequence_number": 1,
"created_at": now - timedelta(seconds=30),
},
{
"conversation_id": "agent-a:conv-1",
"pipeline_id": "agent-a",
"message_type": "ai",
"content": "hi there",
"sequence_number": 2,
"created_at": now - timedelta(seconds=20),
},
{
"conversation_id": "agent-a:conv-2",
"pipeline_id": "agent-a",
"message_type": "human",
"content": "second thread",
"sequence_number": 1,
"created_at": now - timedelta(seconds=10),
},
{
"conversation_id": "agent-b:conv-9",
"pipeline_id": "agent-b",
"message_type": "human",
"content": "other pipeline",
"sequence_number": 1,
"created_at": now - timedelta(seconds=5),
},
]
monkeypatch.setenv("CONN_STR", "postgresql://dummy:dummy@localhost/dummy")
monkeypatch.setattr(
front_apis.psycopg,
"connect",
lambda _conn_str: _FakeConnection(rows),
)
client = TestClient(front_apis.app)
list_resp = client.get("/v1/pipelines/agent-a/conversations")
assert list_resp.status_code == 200, list_resp.text
list_data = list_resp.json()
assert list_data["pipeline_id"] == "agent-a"
assert list_data["count"] == 2
assert [item["conversation_id"] for item in list_data["items"]] == [
"agent-a:conv-2",
"agent-a:conv-1",
]
assert all(item["pipeline_id"] == "agent-a" for item in list_data["items"])
msg_resp = client.get("/v1/pipelines/agent-a/conversations/agent-a:conv-1/messages")
assert msg_resp.status_code == 200, msg_resp.text
msg_data = msg_resp.json()
assert msg_data["pipeline_id"] == "agent-a"
assert msg_data["conversation_id"] == "agent-a:conv-1"
assert msg_data["count"] == 2
assert [item["message_type"] for item in msg_data["items"]] == ["human", "ai"]
assert [item["sequence_number"] for item in msg_data["items"]] == [1, 2]
def test_pipeline_conversation_messages_404(monkeypatch):
rows = [
{
"conversation_id": "agent-b:conv-9",
"pipeline_id": "agent-b",
"message_type": "human",
"content": "other pipeline",
"sequence_number": 1,
"created_at": datetime.now(timezone.utc),
},
]
monkeypatch.setenv("CONN_STR", "postgresql://dummy:dummy@localhost/dummy")
monkeypatch.setattr(
front_apis.psycopg,
"connect",
lambda _conn_str: _FakeConnection(rows),
)
client = TestClient(front_apis.app)
resp = client.get("/v1/pipelines/agent-a/conversations/agent-b:conv-9/messages")
assert resp.status_code == 404, resp.text
assert "not found for pipeline 'agent-a'" in resp.json()["detail"]

View File

@@ -0,0 +1,113 @@
import importlib.util
import sys
from pathlib import Path
from types import SimpleNamespace
def _load_module():
project_root = Path(__file__).resolve().parents[1]
script_path = project_root / "scripts" / "py_scripts" / "migrate_yaml_prompts_to_db.py"
spec = importlib.util.spec_from_file_location("migrate_yaml_prompts_to_db", script_path)
module = importlib.util.module_from_spec(spec)
assert spec.loader is not None
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
def test_infer_pipeline_id_falls_back_to_filename():
module = _load_module()
conf = SimpleNamespace(
pipeline_id=None,
graph_config=SimpleNamespace(pipeline_id=None),
)
out = module._infer_pipeline_id(conf, "/tmp/blueberry.yaml")
assert out == "blueberry"
def test_extract_prompt_dict_for_react_txt(tmp_path):
module = _load_module()
prompt_f = tmp_path / "sys.txt"
prompt_f.write_text("hello react", encoding="utf-8")
graph_conf = SimpleNamespace(sys_prompt_f=str(prompt_f))
prompt_dict = module._extract_prompt_dict(graph_conf)
assert prompt_dict == {"sys_prompt": "hello react"}
def test_extract_prompt_dict_for_routing_dir(tmp_path):
module = _load_module()
(tmp_path / "route_prompt.txt").write_text("route", encoding="utf-8")
(tmp_path / "chat_prompt.txt").write_text("chat", encoding="utf-8")
graph_conf = SimpleNamespace(sys_promp_dir=str(tmp_path))
prompt_dict = module._extract_prompt_dict(graph_conf)
assert prompt_dict["route_prompt"] == "route"
assert prompt_dict["chat_prompt"] == "chat"
def test_collect_payload_routing_ignores_chatty_prompt_for_tool_node(tmp_path):
module = _load_module()
prompt_dir = tmp_path / "prompts"
prompt_dir.mkdir()
(prompt_dir / "route_prompt.txt").write_text("route", encoding="utf-8")
(prompt_dir / "chat_prompt.txt").write_text("chat", encoding="utf-8")
(prompt_dir / "tool_prompt.txt").write_text("tool", encoding="utf-8")
(prompt_dir / "chatty_prompt.txt").write_text("chatty", encoding="utf-8")
class RoutingConfig:
pass
class ToolNodeConfig:
pass
graph_conf = RoutingConfig()
graph_conf.sys_promp_dir = str(prompt_dir)
graph_conf.tool_node_config = ToolNodeConfig()
graph_conf.tool_node_config.tool_prompt_f = str(prompt_dir / "tool_prompt.txt")
conf = SimpleNamespace(
pipeline_id=None,
api_key="sk",
graph_config=graph_conf,
)
module.load_tyro_conf = lambda _: conf
payload = module._collect_payload(str(tmp_path / "xiaozhan.yaml"))
assert payload.pipeline_id == "xiaozhan"
assert set(payload.prompt_dict.keys()) == {"route_prompt", "chat_prompt", "tool_prompt"}
assert "chatty_prompt" not in payload.prompt_dict
def test_collect_payload_routing_includes_chatty_prompt_for_chatty_node(tmp_path):
module = _load_module()
prompt_dir = tmp_path / "prompts"
prompt_dir.mkdir()
(prompt_dir / "route_prompt.txt").write_text("route", encoding="utf-8")
(prompt_dir / "chat_prompt.txt").write_text("chat", encoding="utf-8")
(prompt_dir / "tool_prompt.txt").write_text("tool", encoding="utf-8")
(prompt_dir / "chatty_prompt.txt").write_text("chatty", encoding="utf-8")
class RoutingConfig:
pass
class ChattyToolNodeConfig:
pass
graph_conf = RoutingConfig()
graph_conf.sys_promp_dir = str(prompt_dir)
graph_conf.tool_node_config = ChattyToolNodeConfig()
graph_conf.tool_node_config.tool_prompt_f = str(prompt_dir / "tool_prompt.txt")
graph_conf.tool_node_config.chatty_sys_prompt_f = str(
prompt_dir / "chatty_prompt.txt"
)
conf = SimpleNamespace(
pipeline_id="xiaozhan",
api_key="sk",
graph_config=graph_conf,
)
module.load_tyro_conf = lambda _: conf
payload = module._collect_payload(str(tmp_path / "xiaozhan.yaml"))
assert payload.pipeline_id == "xiaozhan"
assert "chatty_prompt" in payload.prompt_dict

View File

@@ -0,0 +1,154 @@
import json
import time
import pytest
from fastapi import HTTPException
from lang_agent.components.server_pipeline_manager import ServerPipelineManager
class _DummyPipeline:
def __init__(self, model: str):
self.model = model
class _DummyConfig:
def __init__(self, llm_name: str = "qwen-plus"):
self.llm_name = llm_name
def setup(self):
return _DummyPipeline(model=self.llm_name)
def _write_registry(path, pipelines, api_keys=None):
content = {"pipelines": pipelines, "api_keys": api_keys or {}}
path.write_text(json.dumps(content, indent=2), encoding="utf-8")
# Ensure mtime changes reliably on fast CI filesystems.
time.sleep(0.01)
def test_refresh_registry_picks_up_new_pipeline(tmp_path):
registry_path = tmp_path / "pipeline_registry.json"
_write_registry(
registry_path,
pipelines={
"default": {
"enabled": True,
"config_file": None,
"overrides": {"llm_name": "qwen-plus"},
}
},
)
manager = ServerPipelineManager(
default_pipeline_id="default",
default_config=_DummyConfig(),
)
manager.load_registry(str(registry_path))
with pytest.raises(HTTPException) as exc_info:
manager.resolve_pipeline_id(
body={"pipeline_id": "blueberry"}, app_id=None, api_key="k1"
)
assert exc_info.value.status_code == 404
_write_registry(
registry_path,
pipelines={
"default": {
"enabled": True,
"config_file": None,
"overrides": {"llm_name": "qwen-plus"},
},
"blueberry": {
"enabled": True,
"config_file": None,
"overrides": {"llm_name": "qwen-max"},
},
},
)
changed = manager.refresh_registry_if_needed()
assert changed is True
resolved = manager.resolve_pipeline_id(
body={"pipeline_id": "blueberry"}, app_id=None, api_key="k1"
)
assert resolved == "blueberry"
def test_refresh_registry_invalidates_cache_for_changed_pipeline(tmp_path):
registry_path = tmp_path / "pipeline_registry.json"
_write_registry(
registry_path,
pipelines={
"blueberry": {
"enabled": True,
"config_file": None,
"overrides": {"llm_name": "qwen-plus"},
}
},
)
manager = ServerPipelineManager(
default_pipeline_id="blueberry",
default_config=_DummyConfig(),
)
manager.load_registry(str(registry_path))
first_pipeline, first_model = manager.get_pipeline("blueberry")
assert first_model == "qwen-plus"
_write_registry(
registry_path,
pipelines={
"blueberry": {
"enabled": True,
"config_file": None,
"overrides": {"llm_name": "qwen-max"},
}
},
)
changed = manager.refresh_registry_if_needed()
assert changed is True
second_pipeline, second_model = manager.get_pipeline("blueberry")
assert second_model == "qwen-max"
assert second_pipeline is not first_pipeline
def test_refresh_registry_applies_disabled_state_immediately(tmp_path):
registry_path = tmp_path / "pipeline_registry.json"
_write_registry(
registry_path,
pipelines={
"blueberry": {
"enabled": True,
"config_file": None,
"overrides": {"llm_name": "qwen-plus"},
}
},
)
manager = ServerPipelineManager(
default_pipeline_id="blueberry",
default_config=_DummyConfig(),
)
manager.load_registry(str(registry_path))
manager.get_pipeline("blueberry")
_write_registry(
registry_path,
pipelines={
"blueberry": {
"enabled": False,
"config_file": None,
"overrides": {"llm_name": "qwen-plus"},
}
},
)
changed = manager.refresh_registry_if_needed()
assert changed is True
with pytest.raises(HTTPException) as exc_info:
manager.get_pipeline("blueberry")
assert exc_info.value.status_code == 403