Compare commits
40 Commits
f58d80ac1b
...
9b3db40b94
| Author | SHA1 | Date | |
|---|---|---|---|
| 9b3db40b94 | |||
| da17f2b319 | |||
| 4b6e97d8fb | |||
| 112af37151 | |||
| 3cd46030ad | |||
| dd842fca42 | |||
| fc9f0f929d | |||
| 28d99f4b8d | |||
| dac067b6fe | |||
| e90f0afabe | |||
| 0676a68c9e | |||
| f185b70d3f | |||
| 07149e426e | |||
| 3fc3d7288c | |||
| eb7e85e4e6 | |||
| ddfda10700 | |||
| f8364bea68 | |||
| 01b0975abd | |||
| 7e23d5c056 | |||
| 3b730798f8 | |||
| 2781172724 | |||
| 26fba706f2 | |||
| ae93ef37b6 | |||
| c1b782c6b4 | |||
| ab3285a4cf | |||
| 0484343021 | |||
| b87fded473 | |||
| 8db22abf3b | |||
| f6d86f24bb | |||
| c1afebd7ba | |||
| 080631af31 | |||
| 38b0d5df15 | |||
| f7937c3744 | |||
| 867acaf717 | |||
| a2890148f9 | |||
| 55b37cc611 | |||
| c85598418d | |||
| ea605e19aa | |||
| 866edc319f | |||
| 8c6dd3344f |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -12,3 +12,4 @@ django.log
|
||||
|
||||
frontend/node_modules/
|
||||
frontend/dist/
|
||||
frontend/.vite
|
||||
@@ -1,24 +1,31 @@
|
||||
{
|
||||
"pipelines": {
|
||||
"xiaozhan": {
|
||||
"enabled": true,
|
||||
"config_file": "configs/pipelines/xiaozhan.yaml"
|
||||
"pipelines": {
|
||||
"xiaozhan": {
|
||||
"enabled": true,
|
||||
"config_file": "configs/pipelines/xiaozhan.yaml",
|
||||
"graph_id": "routing",
|
||||
"overrides": {
|
||||
"llm_name": "qwen-plus"
|
||||
}
|
||||
},
|
||||
"blueberry": {
|
||||
"enabled": true,
|
||||
"config_file": "configs/pipelines/blueberry.yaml",
|
||||
"graph_id": "react",
|
||||
"overrides": {
|
||||
"llm_name": "qwen-plus"
|
||||
}
|
||||
}
|
||||
},
|
||||
"blueberry": {
|
||||
"enabled": true,
|
||||
"config_file": "configs/pipelines/blueberry.yaml"
|
||||
"api_keys": {
|
||||
"sk-6c7091e6a95f404efb2ec30e8f51b897626d670375cdf822d78262f24ab12367": {
|
||||
"example-key-1": {
|
||||
"default_route_id": "default",
|
||||
"allowed_route_ids": [
|
||||
"xiaozhan",
|
||||
"blueberry"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"api_keys": {
|
||||
"sk-6c7091e6a95f404efb2ec30e8f51b897626d670375cdf822d78262f24ab12367": {
|
||||
"example-key-1": {
|
||||
"default_route_id": "default",
|
||||
"allowed_route_ids": [
|
||||
"xiaozhan",
|
||||
"blueberry"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
1477
frontend/package-lock.json
generated
1477
frontend/package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@@ -11,7 +11,9 @@
|
||||
},
|
||||
"dependencies": {
|
||||
"react": "^18.3.1",
|
||||
"react-dom": "^18.3.1"
|
||||
"react-dom": "^18.3.1",
|
||||
"react-markdown": "^10.1.0",
|
||||
"remark-gfm": "^4.0.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/react": "^18.3.20",
|
||||
|
||||
1055
frontend/src/App.tsx
1055
frontend/src/App.tsx
File diff suppressed because it is too large
Load Diff
@@ -1,20 +1,33 @@
|
||||
import type {
|
||||
AvailableGraphsResponse,
|
||||
ConversationListItem,
|
||||
ConversationMessageItem,
|
||||
GraphConfigListResponse,
|
||||
GraphConfigReadResponse,
|
||||
GraphConfigUpsertRequest,
|
||||
GraphConfigUpsertResponse,
|
||||
McpAvailableToolsResponse,
|
||||
McpToolConfigResponse,
|
||||
McpToolConfigUpdateRequest,
|
||||
McpToolConfigUpdateResponse,
|
||||
PipelineCreateRequest,
|
||||
PipelineCreateResponse,
|
||||
PipelineConversationListResponse,
|
||||
PipelineConversationMessagesResponse,
|
||||
PipelineListResponse,
|
||||
PipelineStopResponse,
|
||||
RuntimeAuthInfoResponse,
|
||||
} from "../types";
|
||||
|
||||
const API_BASE_URL =
|
||||
import.meta.env.VITE_FRONT_API_BASE_URL?.trim() || "http://127.0.0.1:8001";
|
||||
import.meta.env.VITE_FRONT_API_BASE_URL?.trim() || "http://127.0.0.1:8500";
|
||||
|
||||
// Log which backend the frontend is targeting on startup, with file + line hint.
|
||||
// This runs once when the module is loaded.
|
||||
// eslint-disable-next-line no-console
|
||||
console.info(
|
||||
`[frontend] Using FRONT_API_BASE_URL=${API_BASE_URL} (src/api/frontApis.ts:16)`
|
||||
);
|
||||
|
||||
async function fetchJson<T>(path: string, init?: RequestInit): Promise<T> {
|
||||
const response = await fetch(`${API_BASE_URL}${path}`, {
|
||||
@@ -107,6 +120,10 @@ export function updateMcpToolConfig(
|
||||
});
|
||||
}
|
||||
|
||||
export function listMcpAvailableTools(): Promise<McpAvailableToolsResponse> {
|
||||
return fetchJson("/v1/tool-configs/mcp/tools");
|
||||
}
|
||||
|
||||
export function createPipeline(
|
||||
payload: PipelineCreateRequest
|
||||
): Promise<PipelineCreateResponse> {
|
||||
@@ -126,3 +143,129 @@ export function stopPipeline(pipelineId: string): Promise<PipelineStopResponse>
|
||||
});
|
||||
}
|
||||
|
||||
export function getRuntimeAuthInfo(): Promise<RuntimeAuthInfoResponse> {
|
||||
return fetchJson("/v1/runtime-auth");
|
||||
}
|
||||
|
||||
export async function listPipelineConversations(
|
||||
pipelineId: string,
|
||||
limit = 100
|
||||
): Promise<ConversationListItem[]> {
|
||||
const response = await fetchJson<PipelineConversationListResponse>(
|
||||
`/v1/pipelines/${encodeURIComponent(pipelineId)}/conversations?limit=${limit}`
|
||||
);
|
||||
return response.items || [];
|
||||
}
|
||||
|
||||
export async function getPipelineConversationMessages(
|
||||
pipelineId: string,
|
||||
conversationId: string
|
||||
): Promise<ConversationMessageItem[]> {
|
||||
const response = await fetchJson<PipelineConversationMessagesResponse>(
|
||||
`/v1/pipelines/${encodeURIComponent(pipelineId)}/conversations/${encodeURIComponent(conversationId)}/messages`
|
||||
);
|
||||
return response.items || [];
|
||||
}
|
||||
|
||||
type StreamAgentChatOptions = {
|
||||
appId: string;
|
||||
sessionId: string;
|
||||
apiKey: string;
|
||||
message: string;
|
||||
onText: (text: string) => void;
|
||||
};
|
||||
|
||||
function parseErrorDetail(payload: unknown): string | null {
|
||||
if (!payload || typeof payload !== "object") {
|
||||
return null;
|
||||
}
|
||||
const detail = (payload as { detail?: unknown }).detail;
|
||||
return typeof detail === "string" && detail.trim() ? detail : null;
|
||||
}
|
||||
|
||||
export async function streamAgentChatResponse(
|
||||
options: StreamAgentChatOptions
|
||||
): Promise<string> {
|
||||
const { appId, sessionId, apiKey, message, onText } = options;
|
||||
const response = await fetch(
|
||||
`${API_BASE_URL}/v1/apps/${encodeURIComponent(appId)}/sessions/${encodeURIComponent(sessionId)}/responses`,
|
||||
{
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
Authorization: `Bearer ${apiKey}`,
|
||||
},
|
||||
body: JSON.stringify({
|
||||
messages: [{ role: "user", content: message }],
|
||||
stream: true,
|
||||
}),
|
||||
}
|
||||
);
|
||||
|
||||
if (!response.ok) {
|
||||
let messageText = `Request failed (${response.status})`;
|
||||
try {
|
||||
const payload = (await response.json()) as unknown;
|
||||
const detail = parseErrorDetail(payload);
|
||||
if (detail) {
|
||||
messageText = detail;
|
||||
}
|
||||
} catch {
|
||||
// Keep fallback status-based message.
|
||||
}
|
||||
throw new Error(messageText);
|
||||
}
|
||||
|
||||
if (!response.body) {
|
||||
throw new Error("Streaming response is not available.");
|
||||
}
|
||||
|
||||
const reader = response.body.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
let buffered = "";
|
||||
let latestText = "";
|
||||
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) {
|
||||
break;
|
||||
}
|
||||
buffered += decoder.decode(value, { stream: true });
|
||||
|
||||
let splitIndex = buffered.indexOf("\n\n");
|
||||
while (splitIndex >= 0) {
|
||||
const eventBlock = buffered.slice(0, splitIndex);
|
||||
buffered = buffered.slice(splitIndex + 2);
|
||||
splitIndex = buffered.indexOf("\n\n");
|
||||
|
||||
const lines = eventBlock.split("\n");
|
||||
for (const rawLine of lines) {
|
||||
const line = rawLine.trim();
|
||||
if (!line.startsWith("data:")) {
|
||||
continue;
|
||||
}
|
||||
const payloadRaw = line.slice(5).trim();
|
||||
if (!payloadRaw) {
|
||||
continue;
|
||||
}
|
||||
let payload: unknown;
|
||||
try {
|
||||
payload = JSON.parse(payloadRaw);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
const nextText =
|
||||
typeof (payload as { output?: { text?: unknown } })?.output?.text === "string"
|
||||
? ((payload as { output: { text: string } }).output.text as string)
|
||||
: "";
|
||||
if (nextText !== latestText) {
|
||||
latestText = nextText;
|
||||
onText(latestText);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return latestText;
|
||||
}
|
||||
|
||||
|
||||
@@ -65,6 +65,12 @@ button:disabled {
|
||||
gap: 8px;
|
||||
}
|
||||
|
||||
.agent-item-row {
|
||||
display: grid;
|
||||
gap: 6px;
|
||||
grid-template-columns: 1fr auto;
|
||||
}
|
||||
|
||||
.agent-item {
|
||||
align-items: flex-start;
|
||||
display: flex;
|
||||
@@ -73,6 +79,33 @@ button:disabled {
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
.agent-item-title {
|
||||
align-items: center;
|
||||
display: flex;
|
||||
gap: 8px;
|
||||
justify-content: space-between;
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
.agent-status-pill {
|
||||
border-radius: 999px;
|
||||
font-size: 11px;
|
||||
font-weight: 600;
|
||||
padding: 2px 8px;
|
||||
}
|
||||
|
||||
.agent-status-pill.running {
|
||||
background: #dff7e7;
|
||||
border: 1px solid #8cd3a1;
|
||||
color: #1a6b35;
|
||||
}
|
||||
|
||||
.agent-status-pill.stopped {
|
||||
background: #f2f4f7;
|
||||
border: 1px solid #d1d8e0;
|
||||
color: #4a5565;
|
||||
}
|
||||
|
||||
.agent-item.selected {
|
||||
border-color: #4d7ef3;
|
||||
background: #edf3ff;
|
||||
@@ -82,6 +115,11 @@ button:disabled {
|
||||
color: #5f6f82;
|
||||
}
|
||||
|
||||
.agent-chat-button {
|
||||
align-self: stretch;
|
||||
min-width: 64px;
|
||||
}
|
||||
|
||||
.content {
|
||||
padding: 20px;
|
||||
}
|
||||
@@ -198,6 +236,32 @@ button:disabled {
|
||||
margin-top: 0;
|
||||
}
|
||||
|
||||
.run-info-header {
|
||||
align-items: center;
|
||||
display: flex;
|
||||
justify-content: space-between;
|
||||
gap: 8px;
|
||||
}
|
||||
|
||||
.runtime-badge {
|
||||
border-radius: 999px;
|
||||
font-size: 12px;
|
||||
font-weight: 600;
|
||||
padding: 4px 10px;
|
||||
}
|
||||
|
||||
.runtime-badge.running {
|
||||
background: #dff7e7;
|
||||
border: 1px solid #8cd3a1;
|
||||
color: #1a6b35;
|
||||
}
|
||||
|
||||
.runtime-badge.stopped {
|
||||
background: #f2f4f7;
|
||||
border: 1px solid #d1d8e0;
|
||||
color: #4a5565;
|
||||
}
|
||||
|
||||
.graph-arch-section {
|
||||
border: 1px solid #dbe2ea;
|
||||
border-radius: 10px;
|
||||
@@ -229,6 +293,166 @@ button:disabled {
|
||||
padding: 10px;
|
||||
}
|
||||
|
||||
.run-card-columns {
|
||||
display: grid;
|
||||
gap: 12px;
|
||||
grid-template-columns: minmax(280px, 1fr) minmax(420px, 1.3fr);
|
||||
}
|
||||
|
||||
.run-card-left,
|
||||
.run-card-right {
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
gap: 6px;
|
||||
}
|
||||
|
||||
.run-card-right {
|
||||
border-left: 1px solid #dbe2ea;
|
||||
min-width: 0;
|
||||
padding-left: 12px;
|
||||
}
|
||||
|
||||
.run-card-right code {
|
||||
display: inline-block;
|
||||
max-width: 100%;
|
||||
overflow-x: auto;
|
||||
vertical-align: middle;
|
||||
white-space: nowrap;
|
||||
}
|
||||
|
||||
.discussion-section {
|
||||
background: #f7fbff;
|
||||
border: 1px solid #d7e6f6;
|
||||
border-radius: 10px;
|
||||
padding: 12px;
|
||||
}
|
||||
|
||||
.discussion-header {
|
||||
align-items: center;
|
||||
display: flex;
|
||||
justify-content: space-between;
|
||||
gap: 12px;
|
||||
}
|
||||
|
||||
.discussion-header h3 {
|
||||
margin: 0;
|
||||
}
|
||||
|
||||
.discussion-layout {
|
||||
display: grid;
|
||||
gap: 12px;
|
||||
grid-template-columns: minmax(260px, 320px) 1fr;
|
||||
margin-top: 10px;
|
||||
}
|
||||
|
||||
.discussion-list {
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
gap: 8px;
|
||||
max-height: 65vh;
|
||||
overflow-y: auto;
|
||||
}
|
||||
|
||||
.discussion-item {
|
||||
align-items: flex-start;
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
gap: 4px;
|
||||
text-align: left;
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
.discussion-item.selected {
|
||||
background: #edf3ff;
|
||||
border-color: #4d7ef3;
|
||||
}
|
||||
|
||||
.discussion-item small {
|
||||
color: #687788;
|
||||
}
|
||||
|
||||
.discussion-thread {
|
||||
border: 1px solid #d7e6f6;
|
||||
border-radius: 10px;
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
gap: 8px;
|
||||
max-height: 65vh;
|
||||
overflow-y: auto;
|
||||
padding: 10px;
|
||||
}
|
||||
|
||||
.discussion-message {
|
||||
background: #fff;
|
||||
border: 1px solid #dbe2ea;
|
||||
border-radius: 8px;
|
||||
padding: 8px;
|
||||
}
|
||||
|
||||
.discussion-message.human {
|
||||
border-left: 3px solid #4d7ef3;
|
||||
}
|
||||
|
||||
.discussion-message.ai {
|
||||
border-left: 3px solid #26a269;
|
||||
}
|
||||
|
||||
.discussion-message.tool {
|
||||
border-left: 3px solid #8e6bd8;
|
||||
}
|
||||
|
||||
.discussion-message-meta {
|
||||
align-items: baseline;
|
||||
display: flex;
|
||||
gap: 8px;
|
||||
}
|
||||
|
||||
.discussion-message pre {
|
||||
font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, monospace;
|
||||
margin: 8px 0 0;
|
||||
overflow-x: auto;
|
||||
white-space: pre;
|
||||
}
|
||||
|
||||
.discussion-message-markdown > :first-child {
|
||||
margin-top: 0;
|
||||
}
|
||||
|
||||
.discussion-message-markdown > :last-child {
|
||||
margin-bottom: 0;
|
||||
}
|
||||
|
||||
.discussion-message-markdown code {
|
||||
background: #f3f5f8;
|
||||
border-radius: 4px;
|
||||
padding: 1px 4px;
|
||||
}
|
||||
|
||||
.discussion-message-markdown pre code {
|
||||
background: transparent;
|
||||
padding: 0;
|
||||
}
|
||||
|
||||
.discussion-message-markdown a {
|
||||
color: #1a4fc5;
|
||||
text-decoration: underline;
|
||||
}
|
||||
|
||||
.discussion-message-markdown p,
|
||||
.discussion-message-markdown ul,
|
||||
.discussion-message-markdown ol,
|
||||
.discussion-message-markdown blockquote,
|
||||
.discussion-message-markdown table {
|
||||
margin: 8px 0;
|
||||
}
|
||||
|
||||
.discussion-message-markdown blockquote {
|
||||
border-left: 3px solid #d0d8e2;
|
||||
color: #4f5f73;
|
||||
margin-left: 0;
|
||||
padding-left: 10px;
|
||||
}
|
||||
|
||||
.mcp-config-section {
|
||||
background: #f7fbff;
|
||||
border: 1px solid #d7e6f6;
|
||||
@@ -258,8 +482,149 @@ button:disabled {
|
||||
width: 100%;
|
||||
}
|
||||
|
||||
.mcp-entry-list {
|
||||
display: grid;
|
||||
gap: 12px;
|
||||
margin-top: 10px;
|
||||
}
|
||||
|
||||
.mcp-tools-error {
|
||||
color: #a33434;
|
||||
margin: 6px 0 0 0;
|
||||
}
|
||||
|
||||
.mcp-tools-inline {
|
||||
background: #f8fbff;
|
||||
border: 1px solid #d7e6f6;
|
||||
border-radius: 8px;
|
||||
margin: 0 0 10px 0;
|
||||
padding: 8px;
|
||||
}
|
||||
|
||||
.mcp-entry-card {
|
||||
background: #fff;
|
||||
border: 1px solid #d7e6f6;
|
||||
border-radius: 10px;
|
||||
padding: 10px;
|
||||
}
|
||||
|
||||
.mcp-entry-header {
|
||||
align-items: center;
|
||||
display: flex;
|
||||
justify-content: space-between;
|
||||
gap: 10px;
|
||||
margin-bottom: 10px;
|
||||
}
|
||||
|
||||
.mcp-entry-grid {
|
||||
display: grid;
|
||||
gap: 10px;
|
||||
grid-template-columns: repeat(2, minmax(200px, 1fr));
|
||||
}
|
||||
|
||||
.mcp-entry-grid label {
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
font-size: 14px;
|
||||
gap: 6px;
|
||||
}
|
||||
|
||||
.mcp-entry-grid input,
|
||||
.mcp-entry-grid select {
|
||||
border: 1px solid #c9d4e2;
|
||||
border-radius: 8px;
|
||||
font-size: 14px;
|
||||
padding: 8px;
|
||||
}
|
||||
|
||||
.mcp-entry-wide {
|
||||
grid-column: 1 / -1;
|
||||
}
|
||||
|
||||
.empty {
|
||||
color: #687788;
|
||||
margin: 6px 0;
|
||||
}
|
||||
|
||||
.chat-modal-overlay {
|
||||
align-items: center;
|
||||
background: rgba(16, 24, 40, 0.45);
|
||||
display: flex;
|
||||
inset: 0;
|
||||
justify-content: center;
|
||||
position: fixed;
|
||||
z-index: 20;
|
||||
}
|
||||
|
||||
.chat-modal {
|
||||
background: #fff;
|
||||
border: 1px solid #d7e6f6;
|
||||
border-radius: 12px;
|
||||
display: grid;
|
||||
gap: 10px;
|
||||
max-height: 86vh;
|
||||
max-width: 820px;
|
||||
padding: 12px;
|
||||
width: min(92vw, 820px);
|
||||
}
|
||||
|
||||
.chat-modal-header {
|
||||
align-items: center;
|
||||
border-bottom: 1px solid #dbe2ea;
|
||||
display: flex;
|
||||
justify-content: space-between;
|
||||
padding-bottom: 8px;
|
||||
}
|
||||
|
||||
.chat-modal-header small {
|
||||
color: #687788;
|
||||
display: block;
|
||||
margin-top: 2px;
|
||||
}
|
||||
|
||||
.chat-modal-messages {
|
||||
background: #f8fbff;
|
||||
border: 1px solid #d7e6f6;
|
||||
border-radius: 10px;
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
gap: 8px;
|
||||
max-height: 56vh;
|
||||
overflow-y: auto;
|
||||
padding: 10px;
|
||||
}
|
||||
|
||||
.chat-modal-message {
|
||||
background: #fff;
|
||||
border: 1px solid #dbe2ea;
|
||||
border-radius: 8px;
|
||||
padding: 8px;
|
||||
}
|
||||
|
||||
.chat-modal-message.user {
|
||||
border-left: 3px solid #4d7ef3;
|
||||
}
|
||||
|
||||
.chat-modal-message.assistant {
|
||||
border-left: 3px solid #26a269;
|
||||
}
|
||||
|
||||
.chat-modal-message p {
|
||||
margin: 6px 0 0 0;
|
||||
white-space: pre-wrap;
|
||||
}
|
||||
|
||||
.chat-modal-input {
|
||||
display: grid;
|
||||
gap: 8px;
|
||||
grid-template-columns: 1fr auto;
|
||||
}
|
||||
|
||||
.chat-modal-input textarea {
|
||||
border: 1px solid #c9d4e2;
|
||||
border-radius: 8px;
|
||||
font-size: 14px;
|
||||
padding: 8px;
|
||||
resize: vertical;
|
||||
}
|
||||
|
||||
|
||||
@@ -89,6 +89,38 @@ export type PipelineStopResponse = {
|
||||
reload_required: boolean;
|
||||
};
|
||||
|
||||
export type ConversationListItem = {
|
||||
conversation_id: string;
|
||||
pipeline_id: string;
|
||||
message_count: number;
|
||||
last_updated?: string | null;
|
||||
};
|
||||
|
||||
export type PipelineConversationListResponse = {
|
||||
pipeline_id: string;
|
||||
items: ConversationListItem[];
|
||||
count: number;
|
||||
};
|
||||
|
||||
export type ConversationMessageItem = {
|
||||
message_type: string;
|
||||
content: string;
|
||||
sequence_number: number;
|
||||
created_at: string;
|
||||
};
|
||||
|
||||
export type PipelineConversationMessagesResponse = {
|
||||
pipeline_id: string;
|
||||
conversation_id: string;
|
||||
items: ConversationMessageItem[];
|
||||
count: number;
|
||||
};
|
||||
|
||||
export type RuntimeAuthInfoResponse = {
|
||||
fast_api_key: string;
|
||||
source: string;
|
||||
};
|
||||
|
||||
export type McpToolConfigResponse = {
|
||||
path: string;
|
||||
raw_content: string;
|
||||
@@ -105,3 +137,15 @@ export type McpToolConfigUpdateResponse = {
|
||||
tool_keys: string[];
|
||||
};
|
||||
|
||||
export type McpAvailableToolsResponse = {
|
||||
available_tools: string[];
|
||||
errors: string[];
|
||||
servers: Record<
|
||||
string,
|
||||
{
|
||||
tools: string[];
|
||||
error?: string | null;
|
||||
}
|
||||
>;
|
||||
};
|
||||
|
||||
|
||||
@@ -252,31 +252,29 @@ class ClientToolManager:
|
||||
with open(self.config.mcp_config_f, "r") as f:
|
||||
self.mcp_configs:dict = commentjson.load(f)
|
||||
|
||||
def _get_to_load_configs(self) -> dict:
|
||||
if self.config.tool_keys is None:
|
||||
return self.mcp_configs
|
||||
|
||||
if len(self.config.tool_keys) == 0:
|
||||
logger.info("no tools will be loaded")
|
||||
return {}
|
||||
|
||||
to_load_config = {}
|
||||
for key in self.config.tool_keys:
|
||||
val = self.mcp_configs.get(key)
|
||||
if val is None:
|
||||
logger.warning(f"{key} is not in mcp tools")
|
||||
else:
|
||||
to_load_config[key] = val
|
||||
return to_load_config
|
||||
|
||||
async def aget_tools(self):
|
||||
"""
|
||||
Get tools from all configured MCP servers.
|
||||
Handles connection failures gracefully by logging warnings and continuing.
|
||||
"""
|
||||
|
||||
def get_to_load_configs() -> dict:
|
||||
if self.config.tool_keys is None:
|
||||
to_load_config = self.mcp_configs
|
||||
else:
|
||||
if len(self.config.tool_keys) == 0:
|
||||
logger.info("no tools will be loaded")
|
||||
return {}
|
||||
|
||||
to_load_config = {}
|
||||
for key in self.config.tool_keys:
|
||||
val = self.mcp_configs.get(key)
|
||||
if val is None:
|
||||
logger.warning(f"{key} is not in mcp tools")
|
||||
else:
|
||||
to_load_config[key] = val
|
||||
|
||||
return to_load_config
|
||||
|
||||
to_load_config = get_to_load_configs()
|
||||
to_load_config = self._get_to_load_configs()
|
||||
all_tools = []
|
||||
for server_name, server_config in to_load_config.items():
|
||||
try:
|
||||
@@ -298,6 +296,78 @@ class ClientToolManager:
|
||||
|
||||
return all_tools
|
||||
|
||||
async def aget_tools_with_errors(self):
|
||||
"""
|
||||
Get tools and collect human-readable per-server errors.
|
||||
Returns:
|
||||
(all_tools, errors)
|
||||
"""
|
||||
to_load_config = self._get_to_load_configs()
|
||||
all_tools = []
|
||||
errors = []
|
||||
for server_name, server_config in to_load_config.items():
|
||||
try:
|
||||
single_server_config = {server_name: server_config}
|
||||
client = MultiServerMCPClient(single_server_config)
|
||||
tools = await client.get_tools()
|
||||
all_tools.extend(tools)
|
||||
logger.info(
|
||||
f"Successfully connected to MCP server '{server_name}', retrieved {len(tools)} tools"
|
||||
)
|
||||
except Exception as e:
|
||||
url = (
|
||||
server_config.get("url", "unknown URL")
|
||||
if isinstance(server_config, dict)
|
||||
else "unknown URL"
|
||||
)
|
||||
err_msg = (
|
||||
f"{server_name} ({url}): {type(e).__name__}: {e}"
|
||||
)
|
||||
errors.append(err_msg)
|
||||
logger.exception(
|
||||
f"Failed to connect to MCP server '{server_name}' at {url}"
|
||||
)
|
||||
if hasattr(e, "exceptions"):
|
||||
for nested_exc in e.exceptions:
|
||||
errors.append(
|
||||
f"{server_name} nested: {type(nested_exc).__name__}: {nested_exc}"
|
||||
)
|
||||
continue
|
||||
return all_tools, errors
|
||||
|
||||
async def aget_tools_by_server(self) -> dict:
|
||||
"""
|
||||
Get MCP tools grouped by server name, including per-server error (if any).
|
||||
Returns:
|
||||
{
|
||||
"server_name": {
|
||||
"tools": ["tool_a", "tool_b"],
|
||||
"error": "ExceptionType: message" | None,
|
||||
},
|
||||
...
|
||||
}
|
||||
"""
|
||||
to_load_config = self._get_to_load_configs()
|
||||
grouped = {}
|
||||
for server_name, server_config in to_load_config.items():
|
||||
grouped[server_name] = {"tools": [], "error": None}
|
||||
try:
|
||||
single_server_config = {server_name: server_config}
|
||||
client = MultiServerMCPClient(single_server_config)
|
||||
tools = await client.get_tools()
|
||||
tool_names = sorted(
|
||||
{
|
||||
str(getattr(tool, "name", "")).strip()
|
||||
for tool in tools
|
||||
if str(getattr(tool, "name", "")).strip()
|
||||
}
|
||||
)
|
||||
grouped[server_name] = {"tools": tool_names, "error": None}
|
||||
except Exception as e:
|
||||
grouped[server_name]["error"] = f"{type(e).__name__}: {e}"
|
||||
logger.exception(f"Failed to connect to MCP server '{server_name}'")
|
||||
return grouped
|
||||
|
||||
def get_tools(self):
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
@@ -4,6 +4,7 @@ from pathlib import Path as FsPath
|
||||
import os.path as osp
|
||||
import json
|
||||
import copy
|
||||
from threading import RLock
|
||||
from loguru import logger
|
||||
|
||||
from lang_agent.pipeline import Pipeline, PipelineConfig
|
||||
@@ -20,6 +21,9 @@ class ServerPipelineManager:
|
||||
self._api_key_policy: Dict[str, Dict[str, Any]] = {}
|
||||
self._pipelines: Dict[str, Pipeline] = {}
|
||||
self._pipeline_llm: Dict[str, str] = {}
|
||||
self._registry_path: Optional[str] = None
|
||||
self._registry_mtime_ns: Optional[int] = None
|
||||
self._lock = RLock()
|
||||
|
||||
def _resolve_registry_path(self, registry_path: str) -> str:
|
||||
path = FsPath(registry_path)
|
||||
@@ -30,39 +34,102 @@ class ServerPipelineManager:
|
||||
root = FsPath(__file__).resolve().parents[2]
|
||||
return str((root / path).resolve())
|
||||
|
||||
def load_registry(self, registry_path: str) -> None:
|
||||
abs_path = self._resolve_registry_path(registry_path)
|
||||
if not osp.exists(abs_path):
|
||||
raise ValueError(f"pipeline registry file not found: {abs_path}")
|
||||
def _stat_registry_mtime_ns(self, abs_path: str) -> int:
|
||||
return FsPath(abs_path).stat().st_mtime_ns
|
||||
|
||||
def _read_registry(self, abs_path: str) -> Dict[str, Any]:
|
||||
with open(abs_path, "r", encoding="utf-8") as f:
|
||||
registry: dict = json.load(f)
|
||||
return json.load(f)
|
||||
|
||||
def _apply_registry(self, abs_path: str, registry: Dict[str, Any], mtime_ns: int) -> bool:
|
||||
pipelines = registry.get("pipelines")
|
||||
if pipelines is None:
|
||||
if pipelines is None or not isinstance(pipelines, dict):
|
||||
raise ValueError("`pipelines` in pipeline registry must be an object.")
|
||||
|
||||
self._pipeline_specs = {}
|
||||
parsed_specs: Dict[str, Dict[str, Any]] = {}
|
||||
for pipeline_id, spec in pipelines.items():
|
||||
if not isinstance(spec, dict):
|
||||
raise ValueError(
|
||||
f"pipeline spec for `{pipeline_id}` must be an object."
|
||||
)
|
||||
self._pipeline_specs[pipeline_id] = {
|
||||
parsed_specs[pipeline_id] = {
|
||||
"enabled": bool(spec.get("enabled", True)),
|
||||
"config_file": spec.get("config_file"),
|
||||
"overrides": spec.get("overrides", {}),
|
||||
}
|
||||
if not self._pipeline_specs:
|
||||
if not parsed_specs:
|
||||
raise ValueError("pipeline registry must define at least one pipeline.")
|
||||
|
||||
api_key_policy = registry.get("api_keys", {})
|
||||
if api_key_policy and not isinstance(api_key_policy, dict):
|
||||
raise ValueError("`api_keys` in pipeline registry must be an object.")
|
||||
self._api_key_policy = api_key_policy
|
||||
logger.info(
|
||||
f"loaded pipeline registry: {abs_path}, pipelines={list(self._pipeline_specs.keys())}"
|
||||
|
||||
with self._lock:
|
||||
old_specs = self._pipeline_specs
|
||||
old_policy = self._api_key_policy
|
||||
old_mtime = self._registry_mtime_ns
|
||||
|
||||
removed = set(old_specs.keys()) - set(parsed_specs.keys())
|
||||
added = set(parsed_specs.keys()) - set(old_specs.keys())
|
||||
modified = {
|
||||
pipeline_id
|
||||
for pipeline_id in (set(old_specs.keys()) & set(parsed_specs.keys()))
|
||||
if old_specs[pipeline_id] != parsed_specs[pipeline_id]
|
||||
}
|
||||
changed = bool(added or removed or modified or old_policy != api_key_policy)
|
||||
|
||||
# Drop stale cache entries for deleted/changed pipelines so future requests
|
||||
# lazily rebuild from the refreshed registry spec.
|
||||
for pipeline_id in (removed | modified):
|
||||
self._pipelines.pop(pipeline_id, None)
|
||||
self._pipeline_llm.pop(pipeline_id, None)
|
||||
|
||||
self._pipeline_specs = parsed_specs
|
||||
self._api_key_policy = api_key_policy
|
||||
self._registry_path = abs_path
|
||||
self._registry_mtime_ns = mtime_ns
|
||||
|
||||
if changed:
|
||||
logger.info(
|
||||
"refreshed pipeline registry: {} | added={} modified={} removed={} mtime={}",
|
||||
abs_path,
|
||||
sorted(added),
|
||||
sorted(modified),
|
||||
sorted(removed),
|
||||
mtime_ns,
|
||||
)
|
||||
elif old_mtime != mtime_ns:
|
||||
logger.debug("pipeline registry mtime changed but specs were unchanged: {}", abs_path)
|
||||
return changed
|
||||
|
||||
def load_registry(self, registry_path: str) -> None:
|
||||
abs_path = self._resolve_registry_path(registry_path)
|
||||
if not osp.exists(abs_path):
|
||||
raise ValueError(f"pipeline registry file not found: {abs_path}")
|
||||
registry = self._read_registry(abs_path)
|
||||
mtime_ns = self._stat_registry_mtime_ns(abs_path)
|
||||
self._apply_registry(abs_path=abs_path, registry=registry, mtime_ns=mtime_ns)
|
||||
|
||||
def refresh_registry_if_needed(
|
||||
self, registry_path: Optional[str] = None, force: bool = False
|
||||
) -> bool:
|
||||
abs_path = (
|
||||
self._resolve_registry_path(registry_path)
|
||||
if registry_path
|
||||
else self._registry_path
|
||||
)
|
||||
if not abs_path:
|
||||
raise ValueError("registry path is not initialized")
|
||||
if not osp.exists(abs_path):
|
||||
raise ValueError(f"pipeline registry file not found: {abs_path}")
|
||||
|
||||
mtime_ns = self._stat_registry_mtime_ns(abs_path)
|
||||
with self._lock:
|
||||
if not force and self._registry_mtime_ns == mtime_ns:
|
||||
return False
|
||||
|
||||
registry = self._read_registry(abs_path)
|
||||
return self._apply_registry(abs_path=abs_path, registry=registry, mtime_ns=mtime_ns)
|
||||
|
||||
def _resolve_config_path(self, config_file: str) -> str:
|
||||
path = FsPath(config_file)
|
||||
@@ -91,11 +158,12 @@ class ServerPipelineManager:
|
||||
if hasattr(loaded_cfg, "setup"):
|
||||
cfg = loaded_cfg
|
||||
else:
|
||||
logger.warning(
|
||||
f"config_file for pipeline `{pipeline_id}` did not deserialize to config object; "
|
||||
"falling back to default config and applying pipeline-level overrides."
|
||||
raise ValueError(
|
||||
"config_file for pipeline "
|
||||
f"`{pipeline_id}` did not deserialize to a config object. "
|
||||
"Rebuild the pipeline via /v1/pipelines to regenerate a "
|
||||
"valid serialized PipelineConfig file."
|
||||
)
|
||||
cfg = copy.deepcopy(self.default_config)
|
||||
else:
|
||||
cfg = copy.deepcopy(self.default_config)
|
||||
if not isinstance(overrides, dict):
|
||||
@@ -138,29 +206,33 @@ class ServerPipelineManager:
|
||||
or app_id
|
||||
)
|
||||
|
||||
if not pipeline_id:
|
||||
key_policy = (
|
||||
self._api_key_policy.get(api_key, {}) if self._api_key_policy else {}
|
||||
)
|
||||
pipeline_id = key_policy.get(
|
||||
"default_pipeline_id", self.default_pipeline_id
|
||||
)
|
||||
with self._lock:
|
||||
if not pipeline_id:
|
||||
key_policy = (
|
||||
self._api_key_policy.get(api_key, {}) if self._api_key_policy else {}
|
||||
)
|
||||
pipeline_id = key_policy.get(
|
||||
"default_pipeline_id", self.default_pipeline_id
|
||||
)
|
||||
|
||||
if pipeline_id not in self._pipeline_specs:
|
||||
raise HTTPException(
|
||||
status_code=404, detail=f"Unknown pipeline_id: {pipeline_id}"
|
||||
)
|
||||
if pipeline_id not in self._pipeline_specs:
|
||||
raise HTTPException(
|
||||
status_code=404, detail=f"Unknown pipeline_id: {pipeline_id}"
|
||||
)
|
||||
|
||||
self._authorize(api_key, pipeline_id)
|
||||
self._authorize(api_key, pipeline_id)
|
||||
return pipeline_id
|
||||
|
||||
def get_pipeline(self, pipeline_id: str) -> Tuple[Pipeline, str]:
|
||||
cached = self._pipelines.get(pipeline_id)
|
||||
if cached is not None:
|
||||
return cached, self._pipeline_llm[pipeline_id]
|
||||
with self._lock:
|
||||
cached = self._pipelines.get(pipeline_id)
|
||||
if cached is not None:
|
||||
return cached, self._pipeline_llm[pipeline_id]
|
||||
|
||||
pipeline_obj, llm_name = self._build_pipeline(pipeline_id)
|
||||
self._pipelines[pipeline_id] = pipeline_obj
|
||||
self._pipeline_llm[pipeline_id] = llm_name
|
||||
# Build while holding the lock to avoid duplicate construction for
|
||||
# the same pipeline on concurrent first requests.
|
||||
pipeline_obj, llm_name = self._build_pipeline(pipeline_id)
|
||||
self._pipelines[pipeline_id] = pipeline_obj
|
||||
self._pipeline_llm[pipeline_id] = llm_name
|
||||
logger.info(f"lazy-loaded pipeline_id={pipeline_id} model={llm_name}")
|
||||
return pipeline_obj, llm_name
|
||||
|
||||
@@ -12,5 +12,7 @@ from lang_agent.config.constants import (
|
||||
PIPELINE_REGISTRY_PATH,
|
||||
VALID_API_KEYS,
|
||||
API_KEY_HEADER,
|
||||
API_KEY_HEADER_NO_ERROR
|
||||
API_KEY_HEADER_NO_ERROR,
|
||||
_PROJECT_ROOT,
|
||||
TY_BUILD_SCRIPT,
|
||||
)
|
||||
|
||||
@@ -15,3 +15,5 @@ API_KEY_HEADER = APIKeyHeader(name="Authorization", auto_error=True)
|
||||
API_KEY_HEADER_NO_ERROR = APIKeyHeader(name="Authorization", auto_error=False)
|
||||
|
||||
VALID_API_KEYS = set(filter(None, os.environ.get("FAST_AUTH_KEYS", "").split(",")))
|
||||
|
||||
TY_BUILD_SCRIPT = osp.join(_PROJECT_ROOT, "lang_agent", "config", "ty_build_config.py")
|
||||
@@ -96,7 +96,7 @@ class Evaluator:
|
||||
|
||||
df_m.to_csv(metric_f)
|
||||
|
||||
self.config.save_config(f"{head_path}-{n_exp}.yml")
|
||||
self.config.save_config(f"{head_path}-{n_exp}.yaml")
|
||||
|
||||
def format_result_df(self, df:pd.DataFrame):
|
||||
|
||||
|
||||
0
lang_agent/fastapi_server/__init__.py
Normal file
0
lang_agent/fastapi_server/__init__.py
Normal file
33
lang_agent/fastapi_server/combined.py
Normal file
33
lang_agent/fastapi_server/combined.py
Normal file
@@ -0,0 +1,33 @@
|
||||
from fastapi import FastAPI
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
import uvicorn
|
||||
|
||||
from lang_agent.fastapi_server.front_apis import app as front_app
|
||||
from lang_agent.fastapi_server.server_dashscope import create_dashscope_router
|
||||
|
||||
|
||||
app = FastAPI(
|
||||
title="Combined Front + DashScope APIs",
|
||||
description=(
|
||||
"Single-process app exposing front_apis control endpoints and "
|
||||
"DashScope-compatible chat endpoints."
|
||||
),
|
||||
)
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
# Keep existing /v1/... admin APIs unchanged.
|
||||
app.include_router(front_app.router)
|
||||
|
||||
# Add DashScope endpoints at their existing URLs. We intentionally skip
|
||||
# DashScope's root/health routes to avoid clashing with front_apis.
|
||||
app.include_router(create_dashscope_router(include_meta_routes=False))
|
||||
|
||||
if __name__ == "__main__":
|
||||
uvicorn.run(app, host="0.0.0.0", port=8500)
|
||||
@@ -4,6 +4,7 @@ import os
|
||||
import os.path as osp
|
||||
import sys
|
||||
import json
|
||||
import psycopg
|
||||
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
@@ -23,6 +24,10 @@ from lang_agent.front_api.build_server_utils import (
|
||||
GRAPH_BUILD_FNCS,
|
||||
update_pipeline_registry,
|
||||
)
|
||||
from lang_agent.components.client_tool_manager import (
|
||||
ClientToolManager,
|
||||
ClientToolManagerConfig,
|
||||
)
|
||||
|
||||
|
||||
class GraphConfigUpsertRequest(BaseModel):
|
||||
@@ -114,6 +119,38 @@ class PipelineStopResponse(BaseModel):
|
||||
reload_required: bool
|
||||
|
||||
|
||||
class ConversationListItem(BaseModel):
|
||||
conversation_id: str
|
||||
pipeline_id: str
|
||||
message_count: int
|
||||
last_updated: Optional[str] = Field(default=None)
|
||||
|
||||
|
||||
class PipelineConversationListResponse(BaseModel):
|
||||
pipeline_id: str
|
||||
items: List[ConversationListItem]
|
||||
count: int
|
||||
|
||||
|
||||
class ConversationMessageItem(BaseModel):
|
||||
message_type: str
|
||||
content: str
|
||||
sequence_number: int
|
||||
created_at: str
|
||||
|
||||
|
||||
class PipelineConversationMessagesResponse(BaseModel):
|
||||
pipeline_id: str
|
||||
conversation_id: str
|
||||
items: List[ConversationMessageItem]
|
||||
count: int
|
||||
|
||||
|
||||
class RuntimeAuthInfoResponse(BaseModel):
|
||||
fast_api_key: str
|
||||
source: str
|
||||
|
||||
|
||||
class ApiKeyPolicyItem(BaseModel):
|
||||
api_key: str
|
||||
default_pipeline_id: Optional[str] = Field(default=None)
|
||||
@@ -154,6 +191,12 @@ class McpConfigUpdateResponse(BaseModel):
|
||||
tool_keys: List[str]
|
||||
|
||||
|
||||
class McpAvailableToolsResponse(BaseModel):
|
||||
available_tools: List[str] = Field(default_factory=list)
|
||||
errors: List[str] = Field(default_factory=list)
|
||||
servers: Dict[str, Dict[str, Any]] = Field(default_factory=dict)
|
||||
|
||||
|
||||
app = FastAPI(
|
||||
title="Front APIs",
|
||||
description="Manage graph configs and launch graph pipelines.",
|
||||
@@ -190,11 +233,15 @@ async def root():
|
||||
"/v1/pipelines (POST) - build config + upsert pipeline registry entry",
|
||||
"/v1/pipelines (GET) - list registry pipeline specs",
|
||||
"/v1/pipelines/{pipeline_id} (DELETE) - disable pipeline in registry",
|
||||
"/v1/runtime-auth (GET) - show runtime FAST API key info",
|
||||
"/v1/pipelines/{pipeline_id}/conversations (GET) - list pipeline conversations",
|
||||
"/v1/pipelines/{pipeline_id}/conversations/{conversation_id}/messages (GET) - list messages in a conversation",
|
||||
"/v1/pipelines/api-keys (GET) - list API key routing policies",
|
||||
"/v1/pipelines/api-keys/{api_key} (PUT) - upsert API key routing policy",
|
||||
"/v1/pipelines/api-keys/{api_key} (DELETE) - delete API key routing policy",
|
||||
"/v1/tool-configs/mcp (GET)",
|
||||
"/v1/tool-configs/mcp (PUT)",
|
||||
"/v1/tool-configs/mcp/tools (GET)",
|
||||
],
|
||||
}
|
||||
|
||||
@@ -240,6 +287,30 @@ def _write_pipeline_registry(registry: Dict[str, Any]) -> None:
|
||||
f.write("\n")
|
||||
|
||||
|
||||
def _resolve_runtime_fast_api_key() -> RuntimeAuthInfoResponse:
|
||||
"""Pick a runtime auth key from pipeline registry first, then FAST_AUTH_KEYS env."""
|
||||
try:
|
||||
registry = _read_pipeline_registry()
|
||||
api_keys = registry.get("api_keys", {})
|
||||
if isinstance(api_keys, dict):
|
||||
for key in api_keys.keys():
|
||||
candidate = str(key).strip()
|
||||
if candidate:
|
||||
return RuntimeAuthInfoResponse(
|
||||
fast_api_key=candidate, source="pipeline_registry"
|
||||
)
|
||||
except Exception:
|
||||
# fall back to env parsing below
|
||||
pass
|
||||
|
||||
raw_env = os.environ.get("FAST_AUTH_KEYS", "")
|
||||
for token in raw_env.split(","):
|
||||
candidate = token.strip()
|
||||
if candidate:
|
||||
return RuntimeAuthInfoResponse(fast_api_key=candidate, source="env")
|
||||
return RuntimeAuthInfoResponse(fast_api_key="", source="none")
|
||||
|
||||
|
||||
def _normalize_pipeline_spec(pipeline_id: str, spec: Dict[str, Any]) -> PipelineSpec:
|
||||
if not isinstance(spec, dict):
|
||||
raise ValueError(f"pipeline spec for '{pipeline_id}' must be an object")
|
||||
@@ -459,6 +530,38 @@ async def update_mcp_tool_config(body: McpConfigUpdateRequest):
|
||||
)
|
||||
|
||||
|
||||
@app.get("/v1/tool-configs/mcp/tools", response_model=McpAvailableToolsResponse)
|
||||
async def list_mcp_available_tools():
|
||||
try:
|
||||
_read_mcp_config_raw()
|
||||
manager = ClientToolManager(
|
||||
ClientToolManagerConfig(mcp_config_f=MCP_CONFIG_PATH)
|
||||
)
|
||||
servers = await manager.aget_tools_by_server()
|
||||
available_tools = sorted(
|
||||
{
|
||||
tool_name
|
||||
for server_info in servers.values()
|
||||
for tool_name in server_info.get("tools", [])
|
||||
}
|
||||
)
|
||||
errors = [
|
||||
f"{server_name}: {server_info.get('error')}"
|
||||
for server_name, server_info in servers.items()
|
||||
if server_info.get("error")
|
||||
]
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
return McpAvailableToolsResponse(
|
||||
available_tools=available_tools,
|
||||
errors=errors,
|
||||
servers=servers,
|
||||
)
|
||||
|
||||
|
||||
@app.get("/v1/pipelines", response_model=PipelineListResponse)
|
||||
async def list_running_pipelines():
|
||||
try:
|
||||
@@ -511,7 +614,7 @@ async def create_pipeline(body: PipelineCreateRequest):
|
||||
),
|
||||
)
|
||||
|
||||
config_file = f"configs/pipelines/{pipeline_id}.yml"
|
||||
config_file = f"configs/pipelines/{pipeline_id}.yaml"
|
||||
config_abs_dir = osp.join(_PROJECT_ROOT, "configs", "pipelines")
|
||||
try:
|
||||
build_fn(
|
||||
@@ -555,7 +658,7 @@ async def create_pipeline(body: PipelineCreateRequest):
|
||||
config_file=normalized.config_file,
|
||||
llm_name=normalized.llm_name,
|
||||
enabled=normalized.enabled,
|
||||
reload_required=True,
|
||||
reload_required=False,
|
||||
registry_path=PIPELINE_REGISTRY_PATH,
|
||||
)
|
||||
|
||||
@@ -583,7 +686,129 @@ async def stop_pipeline(pipeline_id: str):
|
||||
pipeline_id=pipeline_id,
|
||||
status="disabled",
|
||||
enabled=False,
|
||||
reload_required=True,
|
||||
reload_required=False,
|
||||
)
|
||||
|
||||
|
||||
@app.get("/v1/runtime-auth", response_model=RuntimeAuthInfoResponse)
|
||||
async def get_runtime_auth_info():
|
||||
return _resolve_runtime_fast_api_key()
|
||||
|
||||
|
||||
@app.get(
|
||||
"/v1/pipelines/{pipeline_id}/conversations",
|
||||
response_model=PipelineConversationListResponse,
|
||||
)
|
||||
async def list_pipeline_conversations(pipeline_id: str, limit: int = 100):
|
||||
if limit < 1 or limit > 500:
|
||||
raise HTTPException(status_code=400, detail="limit must be between 1 and 500")
|
||||
|
||||
conn_str = os.environ.get("CONN_STR")
|
||||
if not conn_str:
|
||||
raise HTTPException(status_code=500, detail="CONN_STR not set")
|
||||
|
||||
try:
|
||||
with psycopg.connect(conn_str) as conn:
|
||||
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT
|
||||
conversation_id,
|
||||
pipeline_id,
|
||||
COUNT(*) AS message_count,
|
||||
MAX(created_at) AS last_updated
|
||||
FROM messages
|
||||
WHERE pipeline_id = %s
|
||||
GROUP BY conversation_id, pipeline_id
|
||||
ORDER BY last_updated DESC
|
||||
LIMIT %s
|
||||
""",
|
||||
(pipeline_id, limit),
|
||||
)
|
||||
rows = cur.fetchall()
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
items = [
|
||||
ConversationListItem(
|
||||
conversation_id=str(row["conversation_id"]),
|
||||
pipeline_id=str(row["pipeline_id"]),
|
||||
message_count=int(row["message_count"]),
|
||||
last_updated=(
|
||||
row["last_updated"].isoformat() if row.get("last_updated") else None
|
||||
),
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
return PipelineConversationListResponse(
|
||||
pipeline_id=pipeline_id, items=items, count=len(items)
|
||||
)
|
||||
|
||||
|
||||
@app.get(
|
||||
"/v1/pipelines/{pipeline_id}/conversations/{conversation_id}/messages",
|
||||
response_model=PipelineConversationMessagesResponse,
|
||||
)
|
||||
async def get_pipeline_conversation_messages(pipeline_id: str, conversation_id: str):
|
||||
conn_str = os.environ.get("CONN_STR")
|
||||
if not conn_str:
|
||||
raise HTTPException(status_code=500, detail="CONN_STR not set")
|
||||
|
||||
try:
|
||||
with psycopg.connect(conn_str) as conn:
|
||||
with conn.cursor(row_factory=psycopg.rows.dict_row) as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT 1
|
||||
FROM messages
|
||||
WHERE pipeline_id = %s AND conversation_id = %s
|
||||
LIMIT 1
|
||||
""",
|
||||
(pipeline_id, conversation_id),
|
||||
)
|
||||
exists = cur.fetchone()
|
||||
if exists is None:
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail=(
|
||||
f"conversation_id '{conversation_id}' not found for "
|
||||
f"pipeline '{pipeline_id}'"
|
||||
),
|
||||
)
|
||||
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT
|
||||
message_type,
|
||||
content,
|
||||
sequence_number,
|
||||
created_at
|
||||
FROM messages
|
||||
WHERE pipeline_id = %s AND conversation_id = %s
|
||||
ORDER BY sequence_number ASC
|
||||
""",
|
||||
(pipeline_id, conversation_id),
|
||||
)
|
||||
rows = cur.fetchall()
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
items = [
|
||||
ConversationMessageItem(
|
||||
message_type=str(row["message_type"]),
|
||||
content=str(row["content"]),
|
||||
sequence_number=int(row["sequence_number"]),
|
||||
created_at=row["created_at"].isoformat() if row.get("created_at") else "",
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
return PipelineConversationMessagesResponse(
|
||||
pipeline_id=pipeline_id,
|
||||
conversation_id=conversation_id,
|
||||
items=items,
|
||||
count=len(items),
|
||||
)
|
||||
|
||||
|
||||
@@ -688,5 +913,15 @@ async def delete_pipeline_api_key_policy(api_key: str):
|
||||
return ApiKeyPolicyDeleteResponse(
|
||||
api_key=normalized_key,
|
||||
status="deleted",
|
||||
reload_required=True,
|
||||
reload_required=False,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run(
|
||||
"front_apis:app",
|
||||
host="0.0.0.0",
|
||||
port=8500,
|
||||
reload=True,
|
||||
)
|
||||
@@ -1,9 +1,8 @@
|
||||
from fastapi import FastAPI, HTTPException, Path, Request, Depends, Security
|
||||
from fastapi import APIRouter, Depends, FastAPI, HTTPException, Path, Request, Security
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import StreamingResponse, JSONResponse
|
||||
from fastapi.security import APIKeyHeader
|
||||
from fastapi.responses import JSONResponse, StreamingResponse
|
||||
from pydantic import BaseModel, Field
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
from typing import Any, Dict, List, Optional
|
||||
import os
|
||||
import os.path as osp
|
||||
import sys
|
||||
@@ -20,16 +19,28 @@ from lang_agent.pipeline import PipelineConfig
|
||||
from lang_agent.components.server_pipeline_manager import ServerPipelineManager
|
||||
from lang_agent.config.constants import PIPELINE_REGISTRY_PATH, API_KEY_HEADER, VALID_API_KEYS
|
||||
|
||||
# Load base config for route-level overrides (pipelines are lazy-loaded from registry)
|
||||
pipeline_config = tyro.cli(PipelineConfig)
|
||||
logger.info(f"starting agent with base pipeline config: \n{pipeline_config}")
|
||||
def _build_default_pipeline_config() -> PipelineConfig:
|
||||
"""
|
||||
Build import-time defaults without parsing CLI args.
|
||||
|
||||
This keeps module import safe for reuse by combined apps and tests.
|
||||
"""
|
||||
pipeline_config = PipelineConfig()
|
||||
logger.info(f"starting agent with base pipeline config: \n{pipeline_config}")
|
||||
return pipeline_config
|
||||
|
||||
|
||||
PIPELINE_MANAGER = ServerPipelineManager(
|
||||
default_pipeline_id=os.environ.get("FAST_DEFAULT_PIPELINE_ID", "default"),
|
||||
default_config=pipeline_config,
|
||||
)
|
||||
PIPELINE_MANAGER.load_registry(PIPELINE_REGISTRY_PATH)
|
||||
def _build_pipeline_manager(base_config: PipelineConfig) -> ServerPipelineManager:
|
||||
pipeline_manager = ServerPipelineManager(
|
||||
default_pipeline_id=os.environ.get("FAST_DEFAULT_PIPELINE_ID", "default"),
|
||||
default_config=base_config,
|
||||
)
|
||||
pipeline_manager.load_registry(PIPELINE_REGISTRY_PATH)
|
||||
return pipeline_manager
|
||||
|
||||
|
||||
pipeline_config = _build_default_pipeline_config()
|
||||
PIPELINE_MANAGER = _build_pipeline_manager(pipeline_config)
|
||||
|
||||
|
||||
async def verify_api_key(api_key: str = Security(API_KEY_HEADER)):
|
||||
@@ -55,20 +66,6 @@ class DSApplicationCallRequest(BaseModel):
|
||||
thread_id: Optional[str] = Field(default="3")
|
||||
|
||||
|
||||
app = FastAPI(
|
||||
title="DashScope-Compatible Application API",
|
||||
description="DashScope Application.call compatible endpoint backed by pipeline.chat",
|
||||
)
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
|
||||
def sse_chunks_from_stream(
|
||||
chunk_generator, response_id: str, model: str = "qwen-flash"
|
||||
):
|
||||
@@ -188,7 +185,14 @@ async def _process_dashscope_request(
|
||||
app_id: Optional[str],
|
||||
session_id: Optional[str],
|
||||
api_key: str,
|
||||
pipeline_manager: ServerPipelineManager,
|
||||
):
|
||||
try:
|
||||
pipeline_manager.refresh_registry_if_needed()
|
||||
except Exception as e:
|
||||
logger.error(f"failed to refresh pipeline registry: {e}")
|
||||
raise HTTPException(status_code=500, detail=f"Failed to refresh pipeline registry: {e}")
|
||||
|
||||
req_app_id = app_id or body.get("app_id")
|
||||
body_input = body.get("input", {}) if isinstance(body.get("input"), dict) else {}
|
||||
req_session_id = session_id or body_input.get("session_id")
|
||||
@@ -201,10 +205,10 @@ async def _process_dashscope_request(
|
||||
thread_id = body_input.get("session_id") or req_session_id or "3"
|
||||
user_msg = _extract_user_message(messages)
|
||||
|
||||
pipeline_id = PIPELINE_MANAGER.resolve_pipeline_id(
|
||||
pipeline_id = pipeline_manager.resolve_pipeline_id(
|
||||
body=body, app_id=req_app_id, api_key=api_key
|
||||
)
|
||||
selected_pipeline, selected_model = PIPELINE_MANAGER.get_pipeline(pipeline_id)
|
||||
selected_pipeline, selected_model = pipeline_manager.get_pipeline(pipeline_id)
|
||||
|
||||
# Namespace thread ids to prevent memory collisions across pipelines.
|
||||
thread_id = f"{pipeline_id}:{thread_id}"
|
||||
@@ -245,76 +249,117 @@ async def _process_dashscope_request(
|
||||
return JSONResponse(content=data)
|
||||
|
||||
|
||||
@app.post("/v1/apps/{app_id}/sessions/{session_id}/responses")
|
||||
@app.post("/api/v1/apps/{app_id}/sessions/{session_id}/responses")
|
||||
async def application_responses(
|
||||
request: Request,
|
||||
app_id: str = Path(...),
|
||||
session_id: str = Path(...),
|
||||
api_key: str = Depends(verify_api_key),
|
||||
):
|
||||
try:
|
||||
body = await request.json()
|
||||
return await _process_dashscope_request(
|
||||
body=body,
|
||||
app_id=app_id,
|
||||
session_id=session_id,
|
||||
api_key=api_key,
|
||||
def create_dashscope_router(
|
||||
pipeline_manager: Optional[ServerPipelineManager] = None,
|
||||
include_meta_routes: bool = True,
|
||||
) -> APIRouter:
|
||||
manager = pipeline_manager or PIPELINE_MANAGER
|
||||
router = APIRouter()
|
||||
|
||||
@router.post("/v1/apps/{app_id}/sessions/{session_id}/responses")
|
||||
@router.post("/api/v1/apps/{app_id}/sessions/{session_id}/responses")
|
||||
async def application_responses(
|
||||
request: Request,
|
||||
app_id: str = Path(...),
|
||||
session_id: str = Path(...),
|
||||
api_key: str = Depends(verify_api_key),
|
||||
):
|
||||
try:
|
||||
body = await request.json()
|
||||
return await _process_dashscope_request(
|
||||
body=body,
|
||||
app_id=app_id,
|
||||
session_id=session_id,
|
||||
api_key=api_key,
|
||||
pipeline_manager=manager,
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"DashScope-compatible endpoint error: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
# Compatibility: some SDKs call /apps/{app_id}/completion without /v1 and
|
||||
# without session in path.
|
||||
@router.post("/apps/{app_id}/completion")
|
||||
@router.post("/v1/apps/{app_id}/completion")
|
||||
@router.post("/api/apps/{app_id}/completion")
|
||||
@router.post("/api/v1/apps/{app_id}/completion")
|
||||
async def application_completion(
|
||||
request: Request,
|
||||
app_id: str = Path(...),
|
||||
api_key: str = Depends(verify_api_key),
|
||||
):
|
||||
try:
|
||||
body = await request.json()
|
||||
return await _process_dashscope_request(
|
||||
body=body,
|
||||
app_id=app_id,
|
||||
session_id=None,
|
||||
api_key=api_key,
|
||||
pipeline_manager=manager,
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"DashScope-compatible completion error: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
if include_meta_routes:
|
||||
@router.get("/")
|
||||
async def root():
|
||||
return {
|
||||
"message": "DashScope Application-compatible API",
|
||||
"endpoints": [
|
||||
"/v1/apps/{app_id}/sessions/{session_id}/responses",
|
||||
"/health",
|
||||
],
|
||||
}
|
||||
|
||||
@router.get("/health")
|
||||
async def health():
|
||||
return {"status": "healthy"}
|
||||
|
||||
return router
|
||||
|
||||
|
||||
def create_dashscope_app(
|
||||
pipeline_manager: Optional[ServerPipelineManager] = None,
|
||||
) -> FastAPI:
|
||||
dashscope_app = FastAPI(
|
||||
title="DashScope-Compatible Application API",
|
||||
description="DashScope Application.call compatible endpoint backed by pipeline.chat",
|
||||
)
|
||||
dashscope_app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
dashscope_app.include_router(
|
||||
create_dashscope_router(
|
||||
pipeline_manager=pipeline_manager,
|
||||
include_meta_routes=True,
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"DashScope-compatible endpoint error: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
)
|
||||
return dashscope_app
|
||||
|
||||
|
||||
# Compatibility: some SDKs call /apps/{app_id}/completion without /v1 and without session in path
|
||||
@app.post("/apps/{app_id}/completion")
|
||||
@app.post("/v1/apps/{app_id}/completion")
|
||||
@app.post("/api/apps/{app_id}/completion")
|
||||
@app.post("/api/v1/apps/{app_id}/completion")
|
||||
async def application_completion(
|
||||
request: Request,
|
||||
app_id: str = Path(...),
|
||||
api_key: str = Depends(verify_api_key),
|
||||
):
|
||||
try:
|
||||
body = await request.json()
|
||||
return await _process_dashscope_request(
|
||||
body=body,
|
||||
app_id=app_id,
|
||||
session_id=None,
|
||||
api_key=api_key,
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"DashScope-compatible completion error: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
@app.get("/")
|
||||
async def root():
|
||||
return {
|
||||
"message": "DashScope Application-compatible API",
|
||||
"endpoints": [
|
||||
"/v1/apps/{app_id}/sessions/{session_id}/responses",
|
||||
"/health",
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
async def health():
|
||||
return {"status": "healthy"}
|
||||
dashscope_router = create_dashscope_router(include_meta_routes=False)
|
||||
app = create_dashscope_app()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# CLI parsing is intentionally only in script mode to keep module import safe.
|
||||
cli_pipeline_config = tyro.cli(PipelineConfig)
|
||||
logger.info(f"starting agent with CLI pipeline config: \n{cli_pipeline_config}")
|
||||
cli_pipeline_manager = _build_pipeline_manager(cli_pipeline_config)
|
||||
uvicorn.run(
|
||||
"server_dashscope:app",
|
||||
host="0.0.0.0",
|
||||
port=pipeline_config.port,
|
||||
reload=True,
|
||||
create_dashscope_app(pipeline_manager=cli_pipeline_manager),
|
||||
host=cli_pipeline_config.host,
|
||||
port=cli_pipeline_config.port,
|
||||
reload=False,
|
||||
)
|
||||
@@ -5,15 +5,13 @@ import subprocess
|
||||
import json
|
||||
|
||||
from lang_agent.config.core_config import load_tyro_conf
|
||||
|
||||
_PROJECT_ROOT = osp.dirname(osp.dirname(osp.dirname(osp.abspath(__file__))))
|
||||
_TY_BUILD_SCRIPT = osp.join(_PROJECT_ROOT, "lang_agent", "config", "ty_build_config.py")
|
||||
from lang_agent.config.constants import TY_BUILD_SCRIPT, _PROJECT_ROOT
|
||||
|
||||
|
||||
def opt_to_config(save_path: str, *nargs):
|
||||
os.makedirs(osp.dirname(save_path), exist_ok=True)
|
||||
subprocess.run(
|
||||
["python", _TY_BUILD_SCRIPT, "--save-path", save_path, *nargs],
|
||||
["python", TY_BUILD_SCRIPT, "--save-path", save_path, *nargs],
|
||||
check=True,
|
||||
cwd=_PROJECT_ROOT,
|
||||
)
|
||||
@@ -22,7 +20,7 @@ def opt_to_config(save_path: str, *nargs):
|
||||
def _build_and_load_pipeline_config(
|
||||
pipeline_id: str, pipeline_config_dir: str, cmd: List[str]
|
||||
):
|
||||
save_config_f = osp.join(pipeline_config_dir, f"{pipeline_id}.yml")
|
||||
save_config_f = osp.join(pipeline_config_dir, f"{pipeline_id}.yaml")
|
||||
opt_to_config(save_config_f, *cmd)
|
||||
|
||||
# TODO: think if returning the built pipeline is better or just the config obj for front_api
|
||||
@@ -67,6 +65,7 @@ def build_route(
|
||||
pipeline_config_dir="configs/pipelines",
|
||||
):
|
||||
cmd_opt = [
|
||||
"--pipeline.pipeline-id", pipeline_id,
|
||||
"route", # ------------
|
||||
"--llm-name", llm_name,
|
||||
"--api-key", api_key,
|
||||
@@ -96,6 +95,7 @@ def build_react(
|
||||
pipeline_config_dir="configs/pipelines",
|
||||
):
|
||||
cmd_opt = [
|
||||
"--pipeline.pipeline-id", pipeline_id,
|
||||
"react", # ------------
|
||||
"--llm-name", llm_name,
|
||||
"--api-key", api_key,
|
||||
|
||||
@@ -62,7 +62,7 @@ class PipelineConfig(LLMNodeConfig):
|
||||
host: str = "0.0.0.0"
|
||||
"""where am I hosted"""
|
||||
|
||||
port: int = 8588
|
||||
port: int = 8500
|
||||
"""what is my port"""
|
||||
|
||||
# graph_config: AnnotatedGraph = field(default_factory=ReactGraphConfig)
|
||||
|
||||
141
scripts/py_scripts/chat_dashcope.py
Normal file
141
scripts/py_scripts/chat_dashcope.py
Normal file
@@ -0,0 +1,141 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Simple chat loop to interact with the blueberry pipeline via DashScope-compatible API.
|
||||
|
||||
Usage:
|
||||
python chat_blueberry.py
|
||||
|
||||
The script connects to the server running on http://localhost:8500
|
||||
and uses the API key from the pipeline registry.
|
||||
"""
|
||||
|
||||
import requests
|
||||
import json
|
||||
import sys
|
||||
from typing import Optional
|
||||
|
||||
|
||||
# Configuration from pipeline_registry.json
|
||||
API_KEY = "sk-6c7091e6a95f404efb2ec30e8f51b897626d670375cdf822d78262f24ab12367"
|
||||
PIPELINE_ID = "blueberry"
|
||||
BASE_URL = "http://localhost:8500"
|
||||
SESSION_ID = "chat-session-1"
|
||||
|
||||
|
||||
def send_message(
|
||||
message: str,
|
||||
session_id: str = SESSION_ID,
|
||||
stream: bool = False,
|
||||
app_id: str = PIPELINE_ID,
|
||||
) -> Optional[str]:
|
||||
"""Send a message to the blueberry pipeline and return the response."""
|
||||
url = f"{BASE_URL}/v1/apps/{app_id}/sessions/{session_id}/responses"
|
||||
headers = {
|
||||
"Authorization": f"Bearer {API_KEY}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
payload = {
|
||||
"messages": [
|
||||
{"role": "user", "content": message}
|
||||
],
|
||||
"stream": stream,
|
||||
}
|
||||
|
||||
try:
|
||||
if stream:
|
||||
# Handle streaming response
|
||||
response = requests.post(url, headers=headers, json=payload, stream=True)
|
||||
response.raise_for_status()
|
||||
|
||||
accumulated_text = ""
|
||||
for line in response.iter_lines():
|
||||
if line:
|
||||
line_str = line.decode('utf-8')
|
||||
if line_str.startswith('data: '):
|
||||
data_str = line_str[6:] # Remove 'data: ' prefix
|
||||
try:
|
||||
data = json.loads(data_str)
|
||||
output = data.get("output", {})
|
||||
text = output.get("text", "")
|
||||
if text:
|
||||
accumulated_text = text
|
||||
# Print incremental updates (you can modify this behavior)
|
||||
print(f"\rAssistant: {accumulated_text}", end="", flush=True)
|
||||
|
||||
if data.get("is_end", False):
|
||||
print() # New line after streaming completes
|
||||
return accumulated_text
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
return accumulated_text
|
||||
else:
|
||||
# Handle non-streaming response
|
||||
response = requests.post(url, headers=headers, json=payload)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
output = data.get("output", {})
|
||||
return output.get("text", "")
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
print(f"Error sending message: {e}", file=sys.stderr)
|
||||
if hasattr(e, 'response') and e.response is not None:
|
||||
try:
|
||||
error_detail = e.response.json()
|
||||
print(f"Error details: {error_detail}", file=sys.stderr)
|
||||
except:
|
||||
print(f"Response status: {e.response.status_code}", file=sys.stderr)
|
||||
return None
|
||||
|
||||
|
||||
def main():
|
||||
"""Main chat loop."""
|
||||
print("=" * 60)
|
||||
print(f"Chat with Blueberry Pipeline")
|
||||
print(f"Pipeline ID: {PIPELINE_ID}")
|
||||
print(f"Server: {BASE_URL}")
|
||||
print(f"Session ID: {SESSION_ID}")
|
||||
print("=" * 60)
|
||||
print("Type your messages (or 'quit'/'exit' to end, 'stream' to toggle streaming)")
|
||||
print("Streaming mode is ON by default")
|
||||
print()
|
||||
|
||||
stream_mode = True
|
||||
|
||||
while True:
|
||||
try:
|
||||
user_input = input("You: ").strip()
|
||||
|
||||
if not user_input:
|
||||
continue
|
||||
|
||||
if user_input.lower() in ['quit', 'exit', 'q']:
|
||||
print("Goodbye!")
|
||||
break
|
||||
|
||||
if user_input.lower() == 'stream':
|
||||
stream_mode = not stream_mode
|
||||
print(f"Streaming mode: {'ON' if stream_mode else 'OFF'}")
|
||||
continue
|
||||
|
||||
print("Assistant: ", end="", flush=True)
|
||||
response = send_message(user_input, stream=stream_mode)
|
||||
|
||||
if response is None:
|
||||
print("(No response received)")
|
||||
elif not stream_mode:
|
||||
print(response)
|
||||
# For streaming, the response is already printed incrementally
|
||||
|
||||
print() # Empty line for readability
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n\nGoodbye!")
|
||||
break
|
||||
except Exception as e:
|
||||
print(f"\nError: {e}", file=sys.stderr)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
364
scripts/py_scripts/migrate_yaml_prompts_to_db.py
Normal file
364
scripts/py_scripts/migrate_yaml_prompts_to_db.py
Normal file
@@ -0,0 +1,364 @@
|
||||
#!/usr/bin/env python3
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import datetime as dt
|
||||
import glob
|
||||
import os
|
||||
import os.path as osp
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, Iterable, List, Optional
|
||||
|
||||
import commentjson
|
||||
import psycopg
|
||||
|
||||
|
||||
PROJECT_ROOT = osp.dirname(osp.dirname(osp.dirname(osp.abspath(__file__))))
|
||||
if PROJECT_ROOT not in sys.path:
|
||||
sys.path.append(PROJECT_ROOT)
|
||||
|
||||
from lang_agent.config import load_tyro_conf # noqa: E402
|
||||
from lang_agent.config.db_config_manager import DBConfigManager # noqa: E402
|
||||
|
||||
|
||||
@dataclass
|
||||
class MigrationPayload:
|
||||
config_path: str
|
||||
pipeline_id: str
|
||||
graph_id: str
|
||||
prompt_dict: Dict[str, str]
|
||||
tool_keys: List[str]
|
||||
api_key: Optional[str]
|
||||
|
||||
|
||||
def _infer_pipeline_id(pipeline_conf, config_path: str) -> str:
|
||||
candidates = [
|
||||
getattr(pipeline_conf, "pipeline_id", None),
|
||||
getattr(getattr(pipeline_conf, "graph_config", None), "pipeline_id", None),
|
||||
]
|
||||
for candidate in candidates:
|
||||
if candidate is None:
|
||||
continue
|
||||
value = str(candidate).strip()
|
||||
if value and value.lower() != "null":
|
||||
return value
|
||||
return osp.splitext(osp.basename(config_path))[0]
|
||||
|
||||
|
||||
def _infer_graph_id(graph_conf) -> str:
|
||||
if graph_conf is None:
|
||||
return "unknown"
|
||||
class_name = graph_conf.__class__.__name__.lower()
|
||||
if "routing" in class_name or class_name == "routeconfig":
|
||||
return "routing"
|
||||
if "react" in class_name:
|
||||
return "react"
|
||||
|
||||
target = getattr(graph_conf, "_target", None)
|
||||
if target is not None:
|
||||
target_name = getattr(target, "__name__", str(target)).lower()
|
||||
if "routing" in target_name:
|
||||
return "routing"
|
||||
if "react" in target_name:
|
||||
return "react"
|
||||
return "unknown"
|
||||
|
||||
|
||||
def _extract_tool_keys(graph_conf) -> List[str]:
|
||||
if graph_conf is None:
|
||||
return []
|
||||
tool_cfg = getattr(graph_conf, "tool_manager_config", None)
|
||||
client_cfg = getattr(tool_cfg, "client_tool_manager", None)
|
||||
keys = getattr(client_cfg, "tool_keys", None)
|
||||
if not keys:
|
||||
return []
|
||||
out: List[str] = []
|
||||
seen = set()
|
||||
for key in keys:
|
||||
cleaned = str(key).strip()
|
||||
if not cleaned or cleaned in seen:
|
||||
continue
|
||||
seen.add(cleaned)
|
||||
out.append(cleaned)
|
||||
return out
|
||||
|
||||
|
||||
def _load_prompt_dict(prompt_path: str, default_key: str = "sys_prompt") -> Dict[str, str]:
|
||||
if not prompt_path:
|
||||
return {}
|
||||
if not osp.exists(prompt_path):
|
||||
return {}
|
||||
|
||||
if osp.isdir(prompt_path):
|
||||
prompt_files = sorted(
|
||||
p for p in glob.glob(osp.join(prompt_path, "*.txt")) if "optional" not in p
|
||||
)
|
||||
out = {}
|
||||
for prompt_f in prompt_files:
|
||||
key = osp.splitext(osp.basename(prompt_f))[0]
|
||||
with open(prompt_f, "r", encoding="utf-8") as f:
|
||||
out[key] = f.read()
|
||||
return out
|
||||
|
||||
if prompt_path.endswith(".json"):
|
||||
with open(prompt_path, "r", encoding="utf-8") as f:
|
||||
obj = commentjson.load(f)
|
||||
if not isinstance(obj, dict):
|
||||
return {}
|
||||
return {str(k): v if isinstance(v, str) else str(v) for k, v in obj.items()}
|
||||
|
||||
if prompt_path.endswith(".txt"):
|
||||
with open(prompt_path, "r", encoding="utf-8") as f:
|
||||
return {default_key: f.read()}
|
||||
|
||||
return {}
|
||||
|
||||
|
||||
def _extract_prompt_dict(graph_conf) -> Dict[str, str]:
    """Read top-level system prompts declared on the graph config.

    ``sys_prompt_f`` (a single file keyed "sys_prompt") takes precedence over
    ``sys_promp_dir`` (a directory of ``*.txt`` prompts — note the attribute
    name's historical typo is part of the config contract).
    """
    if graph_conf is None:
        return {}
    for attr_name, prompt_key in (("sys_prompt_f", "sys_prompt"), ("sys_promp_dir", None)):
        if hasattr(graph_conf, attr_name):
            path = str(getattr(graph_conf, attr_name))
            if prompt_key is None:
                return _load_prompt_dict(path)
            return _load_prompt_dict(path, prompt_key)
    return {}
|
||||
|
||||
|
||||
def _extract_tool_node_prompt_dict(graph_conf) -> Dict[str, str]:
    """Collect prompts attached to the graph's tool node, if one exists.

    Reads ``tool_prompt_f`` (keyed "tool_prompt") and ``chatty_sys_prompt_f``
    (keyed "chatty_prompt") from ``graph_conf.tool_node_config``; returns {}
    when there is no tool node at all.
    """
    tool_node_conf = getattr(graph_conf, "tool_node_config", None)
    if tool_node_conf is None:
        return {}

    prompts: Dict[str, str] = {}
    for attr_name, prompt_key in (
        ("tool_prompt_f", "tool_prompt"),
        ("chatty_sys_prompt_f", "chatty_prompt"),
    ):
        if hasattr(tool_node_conf, attr_name):
            prompts.update(
                _load_prompt_dict(str(getattr(tool_node_conf, attr_name)), prompt_key)
            )
    return prompts
|
||||
|
||||
|
||||
def _prompt_key_whitelist(graph_conf, graph_id: str) -> Optional[set]:
|
||||
if graph_id == "react":
|
||||
return {"sys_prompt"}
|
||||
if graph_id != "routing":
|
||||
return None
|
||||
|
||||
allowed = {"route_prompt", "chat_prompt", "tool_prompt"}
|
||||
tool_node_conf = getattr(graph_conf, "tool_node_config", None)
|
||||
if tool_node_conf is None:
|
||||
return allowed
|
||||
|
||||
cls_name = tool_node_conf.__class__.__name__.lower()
|
||||
target = getattr(tool_node_conf, "_target", None)
|
||||
target_name = getattr(target, "__name__", str(target)).lower() if target else ""
|
||||
if "chatty" in cls_name or "chatty" in target_name:
|
||||
allowed.add("chatty_prompt")
|
||||
return allowed
|
||||
|
||||
|
||||
def _collect_payload(config_path: str) -> MigrationPayload:
    """Parse one pipeline YAML config into a ready-to-migrate payload."""
    conf = load_tyro_conf(config_path)
    graph_conf = getattr(conf, "graph_config", None)
    graph_id = _infer_graph_id(graph_conf)

    # Merge graph-level and tool-node prompts, then drop any keys the graph
    # type does not use (e.g. chatty_prompt on a non-chatty tool node).
    prompts = dict(_extract_prompt_dict(graph_conf))
    prompts.update(_extract_tool_node_prompt_dict(graph_conf))
    allowed_keys = _prompt_key_whitelist(graph_conf, graph_id)
    if allowed_keys is not None:
        prompts = {key: text for key, text in prompts.items() if key in allowed_keys}

    return MigrationPayload(
        config_path=config_path,
        pipeline_id=_infer_pipeline_id(conf, config_path),
        graph_id=graph_id,
        prompt_dict=prompts,
        tool_keys=_extract_tool_keys(graph_conf),
        api_key=getattr(conf, "api_key", None),
    )
|
||||
|
||||
|
||||
def _resolve_config_paths(config_dir: str, config_paths: Optional[Iterable[str]]) -> List[str]:
|
||||
if config_paths:
|
||||
resolved = [osp.abspath(path) for path in config_paths]
|
||||
else:
|
||||
pattern = osp.join(osp.abspath(config_dir), "*.yaml")
|
||||
resolved = sorted(glob.glob(pattern))
|
||||
return [path for path in resolved if osp.exists(path)]
|
||||
|
||||
|
||||
def _ensure_prompt_set(
    conn: psycopg.Connection,
    pipeline_id: str,
    graph_id: str,
    set_name: str,
    description: str,
) -> str:
    """Return the id of an existing (pipeline_id, name) prompt set, creating
    an inactive, empty one when none exists.

    Runs inside the caller's transaction; the caller is responsible for
    committing.
    """
    with conn.cursor() as cur:
        # Reuse the most recently touched matching set instead of creating
        # duplicates on repeated migration runs.
        cur.execute(
            """
            SELECT id FROM prompt_sets
            WHERE pipeline_id = %s AND name = %s
            ORDER BY updated_at DESC, created_at DESC
            LIMIT 1
            """,
            (pipeline_id, set_name),
        )
        row = cur.fetchone()
        if row is not None:
            return str(row[0])

        # No match: insert a fresh row, deliberately inactive with an empty
        # prompt list; content is written later via the config manager.
        cur.execute(
            """
            INSERT INTO prompt_sets (pipeline_id, graph_id, name, description, is_active, list)
            VALUES (%s, %s, %s, %s, false, '')
            RETURNING id
            """,
            (pipeline_id, graph_id, set_name, description),
        )
        created = cur.fetchone()
        return str(created[0])
|
||||
|
||||
|
||||
def _activate_prompt_set(conn: psycopg.Connection, pipeline_id: str, prompt_set_id: str) -> None:
    """Make one prompt set the single active set for a pipeline.

    Deactivates every set under the pipeline first, so at most one row ends
    up active. Caller is responsible for committing the transaction.
    """
    with conn.cursor() as cur:
        # Clear the active flag across the whole pipeline...
        cur.execute(
            "UPDATE prompt_sets SET is_active = false, updated_at = now() WHERE pipeline_id = %s",
            (pipeline_id,),
        )
        # ...then set it on the chosen set only.
        cur.execute(
            "UPDATE prompt_sets SET is_active = true, updated_at = now() WHERE id = %s",
            (prompt_set_id,),
        )
|
||||
|
||||
|
||||
def _run_migration(
    payloads: List[MigrationPayload],
    set_name: str,
    description: str,
    dry_run: bool,
    activate: bool,
) -> None:
    """Apply each payload: ensure a prompt set, write its config, optionally
    activate it.

    With ``dry_run`` only the [PLAN] lines are printed and nothing is written.
    """
    for payload in payloads:
        print(
            f"[PLAN] pipeline={payload.pipeline_id} graph={payload.graph_id} "
            f"prompts={len(payload.prompt_dict)} tools={len(payload.tool_keys)} "
            f"config={payload.config_path}"
        )
        if dry_run:
            continue

        # NOTE(review): a fresh manager + connection per payload — presumably
        # to keep each pipeline's migration isolated; confirm this is intended.
        manager = DBConfigManager()
        with psycopg.connect(manager.conn_str) as conn:
            prompt_set_id = _ensure_prompt_set(
                conn=conn,
                pipeline_id=payload.pipeline_id,
                graph_id=payload.graph_id,
                set_name=set_name,
                description=description,
            )
            # Commit the set row before set_config — the manager may use its
            # own connection internally (TODO confirm).
            conn.commit()

            manager.set_config(
                pipeline_id=payload.pipeline_id,
                graph_id=payload.graph_id,
                prompt_set_id=prompt_set_id,
                tool_list=payload.tool_keys,
                prompt_dict=payload.prompt_dict,
                api_key=payload.api_key,
            )

            if activate:
                _activate_prompt_set(
                    conn=conn,
                    pipeline_id=payload.pipeline_id,
                    prompt_set_id=prompt_set_id,
                )
                conn.commit()

        print(
            f"[DONE] pipeline={payload.pipeline_id} "
            f"prompt_set={prompt_set_id} activate={activate}"
        )
|
||||
|
||||
|
||||
def main() -> None:
    """CLI entry point: import YAML pipeline prompts into DB prompt sets.

    Resolves config files (explicit --config paths or a --config-dir scan),
    builds a payload per pipeline, filters by --pipeline-id and by whether any
    prompts were found, then delegates to ``_run_migration``.

    Raises SystemExit when no config files or no migratable pipelines remain.
    """
    # Fix: the previous version computed an unused `date_str` local on every
    # run (its only use was a commented-out --set-name default).
    parser = argparse.ArgumentParser(
        description="Import prompt definitions from pipeline YAML files into DB prompt_sets."
    )
    parser.add_argument(
        "--config-dir",
        default=osp.join(PROJECT_ROOT, "configs", "pipelines"),
        help="Directory containing pipeline YAML files.",
    )
    parser.add_argument(
        "--config",
        action="append",
        default=[],
        help="Specific pipeline config yaml path. Can be passed multiple times.",
    )
    parser.add_argument(
        "--pipeline-id",
        action="append",
        default=[],
        help="If provided, only migrate these pipeline IDs (repeatable).",
    )
    parser.add_argument(
        "--set-name",
        # A dated default (f"migrated-{dt.date.today().isoformat()}") was
        # considered; "default" keeps repeated runs idempotent on one set.
        default="default",
        help="Prompt set name to create/reuse under each pipeline.",
    )
    parser.add_argument(
        "--description",
        default="Migrated from pipeline YAML prompt files",
        help="Prompt set description.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print what would be migrated without writing to DB.",
    )
    parser.add_argument(
        "--activate",
        action="store_true",
        help="Mark imported set active for each migrated pipeline.",
    )
    args = parser.parse_args()

    config_paths = _resolve_config_paths(args.config_dir, args.config)
    if not config_paths:
        raise SystemExit("No config files found. Provide --config or --config-dir.")

    # Normalize the requested-id filter; blanks are ignored.
    requested_pipelines = {p.strip() for p in args.pipeline_id if p.strip()}

    payloads: List[MigrationPayload] = []
    for config_path in config_paths:
        payload = _collect_payload(config_path)
        if requested_pipelines and payload.pipeline_id not in requested_pipelines:
            continue
        # Nothing to migrate without prompt content.
        if not payload.prompt_dict:
            print(f"[SKIP] no prompts found for config={config_path}")
            continue
        payloads.append(payload)

    if not payloads:
        raise SystemExit("No pipelines matched with prompt content to migrate.")

    _run_migration(
        payloads=payloads,
        set_name=args.set_name,
        description=args.description,
        dry_run=args.dry_run,
        activate=args.activate,
    )


if __name__ == "__main__":
    main()
|
||||
|
||||
46
tests/test_combined_app.py
Normal file
46
tests/test_combined_app.py
Normal file
@@ -0,0 +1,46 @@
|
||||
import importlib
|
||||
import os
|
||||
import sys
|
||||
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
os.environ.setdefault("CONN_STR", "postgresql://dummy:dummy@localhost/dummy")
|
||||
|
||||
|
||||
def test_server_dashscope_import_is_cli_safe(monkeypatch):
    """
    Importing server_dashscope should not invoke tyro.cli at module import time.
    """
    import tyro

    # Poison tyro.cli so any call during import raises immediately.
    monkeypatch.setattr(
        tyro,
        "cli",
        lambda *_args, **_kwargs: (_ for _ in ()).throw(
            AssertionError("tyro.cli must not run during module import")
        ),
    )
    # Drop any cached module so the import below re-executes module code.
    sys.modules.pop("fastapi_server.server_dashscope", None)

    module = importlib.import_module("fastapi_server.server_dashscope")
    # Module-level objects must exist without CLI parsing having run.
    assert module.app is not None
    assert module.dashscope_router is not None
|
||||
|
||||
|
||||
def test_combined_app_serves_front_and_dashscope_routes():
    """The combined app must expose both front-API and DashScope routes."""
    from fastapi_server.combined import app

    client = TestClient(app)

    # front_apis route should be available.
    front_resp = client.get("/v1/pipelines/graphs")
    assert front_resp.status_code == 200, front_resp.text
    assert "available_graphs" in front_resp.json()

    # DashScope route should exist at the same path (missing auth should not be 404).
    dash_resp = client.post(
        "/api/v1/apps/blueberry/sessions/test-session/responses",
        json={"input": {"prompt": "hello"}, "stream": False},
    )
    assert dash_resp.status_code != 404, dash_resp.text
|
||||
|
||||
@@ -30,7 +30,7 @@ except Exception as e:
|
||||
|
||||
|
||||
# <<< Paste your running FastAPI base url here >>>
|
||||
BASE_URL = os.getenv("DS_BASE_URL", "http://127.0.0.1:8588/api/")
|
||||
BASE_URL = os.getenv("DS_BASE_URL", "http://127.0.0.1:8500/api/")
|
||||
|
||||
|
||||
# Params
|
||||
|
||||
@@ -1,13 +1,18 @@
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from datetime import datetime, timedelta, timezone
|
||||
import importlib
|
||||
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
|
||||
os.environ.setdefault("CONN_STR", "postgresql://dummy:dummy@localhost/dummy")
|
||||
|
||||
import fastapi_server.front_apis as front_apis
|
||||
try:
|
||||
front_apis = importlib.import_module("lang_agent.fastapi_server.front_apis")
|
||||
except ModuleNotFoundError:
|
||||
front_apis = importlib.import_module("fastapi_server.front_apis")
|
||||
|
||||
|
||||
def _fake_build_fn(
|
||||
@@ -20,7 +25,7 @@ def _fake_build_fn(
|
||||
):
|
||||
out_dir = Path(pipeline_config_dir)
|
||||
out_dir.mkdir(parents=True, exist_ok=True)
|
||||
out_file = out_dir / f"{pipeline_id}.yml"
|
||||
out_file = out_dir / f"{pipeline_id}.yaml"
|
||||
out_file.write_text(
|
||||
json.dumps(
|
||||
{
|
||||
@@ -36,9 +41,101 @@ def _fake_build_fn(
|
||||
return {"path": str(out_file)}
|
||||
|
||||
|
||||
class _FakeCursor:
|
||||
def __init__(self, rows):
|
||||
self._rows = rows
|
||||
self._result = []
|
||||
self._last_sql = ""
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def execute(self, sql, params=None):
|
||||
self._last_sql = sql
|
||||
query = " ".join(sql.split()).lower()
|
||||
params = params or ()
|
||||
|
||||
if "group by conversation_id, pipeline_id" in query:
|
||||
pipeline_id = params[0]
|
||||
limit = int(params[1])
|
||||
grouped = {}
|
||||
for row in self._rows:
|
||||
if row["pipeline_id"] != pipeline_id:
|
||||
continue
|
||||
conv_id = row["conversation_id"]
|
||||
if conv_id not in grouped:
|
||||
grouped[conv_id] = {
|
||||
"conversation_id": conv_id,
|
||||
"pipeline_id": row["pipeline_id"],
|
||||
"message_count": 0,
|
||||
"last_updated": row["created_at"],
|
||||
}
|
||||
grouped[conv_id]["message_count"] += 1
|
||||
if row["created_at"] > grouped[conv_id]["last_updated"]:
|
||||
grouped[conv_id]["last_updated"] = row["created_at"]
|
||||
values = sorted(grouped.values(), key=lambda x: x["last_updated"], reverse=True)
|
||||
self._result = values[:limit]
|
||||
return
|
||||
|
||||
if "select 1 from messages" in query:
|
||||
pipeline_id, conversation_id = params
|
||||
found = any(
|
||||
row["pipeline_id"] == pipeline_id
|
||||
and row["conversation_id"] == conversation_id
|
||||
for row in self._rows
|
||||
)
|
||||
self._result = [{"exists": 1}] if found else []
|
||||
return
|
||||
|
||||
if "order by sequence_number asc" in query:
|
||||
pipeline_id, conversation_id = params
|
||||
self._result = sorted(
|
||||
[
|
||||
{
|
||||
"message_type": row["message_type"],
|
||||
"content": row["content"],
|
||||
"sequence_number": row["sequence_number"],
|
||||
"created_at": row["created_at"],
|
||||
}
|
||||
for row in self._rows
|
||||
if row["pipeline_id"] == pipeline_id
|
||||
and row["conversation_id"] == conversation_id
|
||||
],
|
||||
key=lambda x: x["sequence_number"],
|
||||
)
|
||||
return
|
||||
|
||||
raise AssertionError(f"Unsupported SQL in test fake: {self._last_sql}")
|
||||
|
||||
def fetchall(self):
|
||||
return self._result
|
||||
|
||||
def fetchone(self):
|
||||
if not self._result:
|
||||
return None
|
||||
return self._result[0]
|
||||
|
||||
|
||||
class _FakeConnection:
|
||||
def __init__(self, rows):
|
||||
self._rows = rows
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def cursor(self, row_factory=None):
|
||||
return _FakeCursor(self._rows)
|
||||
|
||||
|
||||
def test_registry_route_lifecycle(monkeypatch, tmp_path):
|
||||
registry_path = tmp_path / "pipeline_registry.json"
|
||||
monkeypatch.setattr(front_apis, "_PIPELINE_REGISTRY_PATH", str(registry_path))
|
||||
monkeypatch.setattr(front_apis, "PIPELINE_REGISTRY_PATH", str(registry_path))
|
||||
monkeypatch.setitem(front_apis.GRAPH_BUILD_FNCS, "routing", _fake_build_fn)
|
||||
|
||||
client = TestClient(front_apis.app)
|
||||
@@ -60,7 +157,7 @@ def test_registry_route_lifecycle(monkeypatch, tmp_path):
|
||||
assert create_data["pipeline_id"] == "xiaozhan"
|
||||
assert create_data["graph_id"] == "routing"
|
||||
assert create_data["llm_name"] == "qwen-plus"
|
||||
assert create_data["reload_required"] is True
|
||||
assert create_data["reload_required"] is False
|
||||
|
||||
list_resp = client.get("/v1/pipelines")
|
||||
assert list_resp.status_code == 200, list_resp.text
|
||||
@@ -91,7 +188,7 @@ def test_registry_route_lifecycle(monkeypatch, tmp_path):
|
||||
|
||||
def test_registry_api_key_policy_lifecycle(monkeypatch, tmp_path):
|
||||
registry_path = tmp_path / "pipeline_registry.json"
|
||||
monkeypatch.setattr(front_apis, "_PIPELINE_REGISTRY_PATH", str(registry_path))
|
||||
monkeypatch.setattr(front_apis, "PIPELINE_REGISTRY_PATH", str(registry_path))
|
||||
monkeypatch.setitem(front_apis.GRAPH_BUILD_FNCS, "routing", _fake_build_fn)
|
||||
|
||||
client = TestClient(front_apis.app)
|
||||
@@ -136,4 +233,95 @@ def test_registry_api_key_policy_lifecycle(monkeypatch, tmp_path):
|
||||
delete_data = delete_resp.json()
|
||||
assert delete_data["api_key"] == "sk-test-key"
|
||||
assert delete_data["status"] == "deleted"
|
||||
assert delete_data["reload_required"] is True
|
||||
assert delete_data["reload_required"] is False
|
||||
|
||||
|
||||
def test_pipeline_conversation_routes(monkeypatch):
|
||||
now = datetime.now(timezone.utc)
|
||||
rows = [
|
||||
{
|
||||
"conversation_id": "agent-a:conv-1",
|
||||
"pipeline_id": "agent-a",
|
||||
"message_type": "human",
|
||||
"content": "hello",
|
||||
"sequence_number": 1,
|
||||
"created_at": now - timedelta(seconds=30),
|
||||
},
|
||||
{
|
||||
"conversation_id": "agent-a:conv-1",
|
||||
"pipeline_id": "agent-a",
|
||||
"message_type": "ai",
|
||||
"content": "hi there",
|
||||
"sequence_number": 2,
|
||||
"created_at": now - timedelta(seconds=20),
|
||||
},
|
||||
{
|
||||
"conversation_id": "agent-a:conv-2",
|
||||
"pipeline_id": "agent-a",
|
||||
"message_type": "human",
|
||||
"content": "second thread",
|
||||
"sequence_number": 1,
|
||||
"created_at": now - timedelta(seconds=10),
|
||||
},
|
||||
{
|
||||
"conversation_id": "agent-b:conv-9",
|
||||
"pipeline_id": "agent-b",
|
||||
"message_type": "human",
|
||||
"content": "other pipeline",
|
||||
"sequence_number": 1,
|
||||
"created_at": now - timedelta(seconds=5),
|
||||
},
|
||||
]
|
||||
|
||||
monkeypatch.setenv("CONN_STR", "postgresql://dummy:dummy@localhost/dummy")
|
||||
monkeypatch.setattr(
|
||||
front_apis.psycopg,
|
||||
"connect",
|
||||
lambda _conn_str: _FakeConnection(rows),
|
||||
)
|
||||
|
||||
client = TestClient(front_apis.app)
|
||||
|
||||
list_resp = client.get("/v1/pipelines/agent-a/conversations")
|
||||
assert list_resp.status_code == 200, list_resp.text
|
||||
list_data = list_resp.json()
|
||||
assert list_data["pipeline_id"] == "agent-a"
|
||||
assert list_data["count"] == 2
|
||||
assert [item["conversation_id"] for item in list_data["items"]] == [
|
||||
"agent-a:conv-2",
|
||||
"agent-a:conv-1",
|
||||
]
|
||||
assert all(item["pipeline_id"] == "agent-a" for item in list_data["items"])
|
||||
|
||||
msg_resp = client.get("/v1/pipelines/agent-a/conversations/agent-a:conv-1/messages")
|
||||
assert msg_resp.status_code == 200, msg_resp.text
|
||||
msg_data = msg_resp.json()
|
||||
assert msg_data["pipeline_id"] == "agent-a"
|
||||
assert msg_data["conversation_id"] == "agent-a:conv-1"
|
||||
assert msg_data["count"] == 2
|
||||
assert [item["message_type"] for item in msg_data["items"]] == ["human", "ai"]
|
||||
assert [item["sequence_number"] for item in msg_data["items"]] == [1, 2]
|
||||
|
||||
|
||||
def test_pipeline_conversation_messages_404(monkeypatch):
|
||||
rows = [
|
||||
{
|
||||
"conversation_id": "agent-b:conv-9",
|
||||
"pipeline_id": "agent-b",
|
||||
"message_type": "human",
|
||||
"content": "other pipeline",
|
||||
"sequence_number": 1,
|
||||
"created_at": datetime.now(timezone.utc),
|
||||
},
|
||||
]
|
||||
monkeypatch.setenv("CONN_STR", "postgresql://dummy:dummy@localhost/dummy")
|
||||
monkeypatch.setattr(
|
||||
front_apis.psycopg,
|
||||
"connect",
|
||||
lambda _conn_str: _FakeConnection(rows),
|
||||
)
|
||||
|
||||
client = TestClient(front_apis.app)
|
||||
resp = client.get("/v1/pipelines/agent-a/conversations/agent-b:conv-9/messages")
|
||||
assert resp.status_code == 404, resp.text
|
||||
assert "not found for pipeline 'agent-a'" in resp.json()["detail"]
|
||||
|
||||
113
tests/test_migrate_yaml_prompts_to_db.py
Normal file
113
tests/test_migrate_yaml_prompts_to_db.py
Normal file
@@ -0,0 +1,113 @@
|
||||
import importlib.util
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
|
||||
|
||||
def _load_module():
    """Load the migration script as a module directly from its file path.

    The script lives under scripts/ (not an installed package), so a regular
    import would fail; importlib.util loads it by location instead.
    """
    project_root = Path(__file__).resolve().parents[1]
    script_path = project_root / "scripts" / "py_scripts" / "migrate_yaml_prompts_to_db.py"
    spec = importlib.util.spec_from_file_location("migrate_yaml_prompts_to_db", script_path)
    module = importlib.util.module_from_spec(spec)
    assert spec.loader is not None
    # Register before exec so any self-referencing imports resolve.
    sys.modules[spec.name] = module
    spec.loader.exec_module(module)
    return module
|
||||
|
||||
|
||||
def test_infer_pipeline_id_falls_back_to_filename():
    """With no usable pipeline_id on the config, the YAML file stem is used."""
    module = _load_module()
    conf = SimpleNamespace(
        pipeline_id=None,
        graph_config=SimpleNamespace(pipeline_id=None),
    )
    out = module._infer_pipeline_id(conf, "/tmp/blueberry.yaml")
    assert out == "blueberry"
|
||||
|
||||
|
||||
def test_extract_prompt_dict_for_react_txt(tmp_path):
    """A single sys_prompt_f text file maps to the "sys_prompt" key."""
    module = _load_module()
    prompt_f = tmp_path / "sys.txt"
    prompt_f.write_text("hello react", encoding="utf-8")
    graph_conf = SimpleNamespace(sys_prompt_f=str(prompt_f))
    prompt_dict = module._extract_prompt_dict(graph_conf)
    assert prompt_dict == {"sys_prompt": "hello react"}
|
||||
|
||||
|
||||
def test_extract_prompt_dict_for_routing_dir(tmp_path):
    """A sys_promp_dir directory yields one entry per *.txt file stem."""
    module = _load_module()
    (tmp_path / "route_prompt.txt").write_text("route", encoding="utf-8")
    (tmp_path / "chat_prompt.txt").write_text("chat", encoding="utf-8")
    graph_conf = SimpleNamespace(sys_promp_dir=str(tmp_path))
    prompt_dict = module._extract_prompt_dict(graph_conf)
    assert prompt_dict["route_prompt"] == "route"
    assert prompt_dict["chat_prompt"] == "chat"
|
||||
|
||||
|
||||
def test_collect_payload_routing_ignores_chatty_prompt_for_tool_node(tmp_path):
    """Routing graphs with a plain (non-chatty) tool node drop chatty_prompt."""
    module = _load_module()
    prompt_dir = tmp_path / "prompts"
    prompt_dir.mkdir()
    (prompt_dir / "route_prompt.txt").write_text("route", encoding="utf-8")
    (prompt_dir / "chat_prompt.txt").write_text("chat", encoding="utf-8")
    (prompt_dir / "tool_prompt.txt").write_text("tool", encoding="utf-8")
    (prompt_dir / "chatty_prompt.txt").write_text("chatty", encoding="utf-8")

    # Class names drive the script's graph / tool-node detection.
    class RoutingConfig:
        pass

    class ToolNodeConfig:
        pass

    graph_conf = RoutingConfig()
    graph_conf.sys_promp_dir = str(prompt_dir)
    graph_conf.tool_node_config = ToolNodeConfig()
    graph_conf.tool_node_config.tool_prompt_f = str(prompt_dir / "tool_prompt.txt")

    conf = SimpleNamespace(
        pipeline_id=None,
        api_key="sk",
        graph_config=graph_conf,
    )

    # Stub out YAML loading; the payload builder only needs the conf object.
    module.load_tyro_conf = lambda _: conf
    payload = module._collect_payload(str(tmp_path / "xiaozhan.yaml"))
    assert payload.pipeline_id == "xiaozhan"
    assert set(payload.prompt_dict.keys()) == {"route_prompt", "chat_prompt", "tool_prompt"}
    assert "chatty_prompt" not in payload.prompt_dict
|
||||
|
||||
|
||||
def test_collect_payload_routing_includes_chatty_prompt_for_chatty_node(tmp_path):
    """A chatty tool node (by class name) keeps chatty_prompt in the payload."""
    module = _load_module()
    prompt_dir = tmp_path / "prompts"
    prompt_dir.mkdir()
    (prompt_dir / "route_prompt.txt").write_text("route", encoding="utf-8")
    (prompt_dir / "chat_prompt.txt").write_text("chat", encoding="utf-8")
    (prompt_dir / "tool_prompt.txt").write_text("tool", encoding="utf-8")
    (prompt_dir / "chatty_prompt.txt").write_text("chatty", encoding="utf-8")

    class RoutingConfig:
        pass

    # "Chatty" in the class name is what the whitelist logic keys off.
    class ChattyToolNodeConfig:
        pass

    graph_conf = RoutingConfig()
    graph_conf.sys_promp_dir = str(prompt_dir)
    graph_conf.tool_node_config = ChattyToolNodeConfig()
    graph_conf.tool_node_config.tool_prompt_f = str(prompt_dir / "tool_prompt.txt")
    graph_conf.tool_node_config.chatty_sys_prompt_f = str(
        prompt_dir / "chatty_prompt.txt"
    )

    conf = SimpleNamespace(
        pipeline_id="xiaozhan",
        api_key="sk",
        graph_config=graph_conf,
    )

    # Stub out YAML loading; the payload builder only needs the conf object.
    module.load_tyro_conf = lambda _: conf
    payload = module._collect_payload(str(tmp_path / "xiaozhan.yaml"))
    assert payload.pipeline_id == "xiaozhan"
    assert "chatty_prompt" in payload.prompt_dict
|
||||
|
||||
154
tests/test_server_pipeline_manager_refresh.py
Normal file
154
tests/test_server_pipeline_manager_refresh.py
Normal file
@@ -0,0 +1,154 @@
|
||||
import json
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from fastapi import HTTPException
|
||||
|
||||
from lang_agent.components.server_pipeline_manager import ServerPipelineManager
|
||||
|
||||
|
||||
class _DummyPipeline:
    """Stand-in pipeline object; records which model it was built with."""

    def __init__(self, model: str):
        self.model = model
|
||||
|
||||
|
||||
class _DummyConfig:
    """Minimal pipeline-config double exposing the setup() factory hook."""

    def __init__(self, llm_name: str = "qwen-plus"):
        self.llm_name = llm_name

    def setup(self):
        # Mirrors the real config contract: setup() builds a pipeline object.
        return _DummyPipeline(model=self.llm_name)
|
||||
|
||||
|
||||
def _write_registry(path, pipelines, api_keys=None):
    """Serialize a registry dict to ``path``; pause briefly so mtime advances."""
    content = {"pipelines": pipelines, "api_keys": api_keys or {}}
    path.write_text(json.dumps(content, indent=2), encoding="utf-8")
    # Ensure mtime changes reliably on fast CI filesystems.
    time.sleep(0.01)
|
||||
|
||||
|
||||
def test_refresh_registry_picks_up_new_pipeline(tmp_path):
    """A pipeline added to the registry file resolves after a refresh."""
    registry_path = tmp_path / "pipeline_registry.json"
    _write_registry(
        registry_path,
        pipelines={
            "default": {
                "enabled": True,
                "config_file": None,
                "overrides": {"llm_name": "qwen-plus"},
            }
        },
    )
    manager = ServerPipelineManager(
        default_pipeline_id="default",
        default_config=_DummyConfig(),
    )
    manager.load_registry(str(registry_path))

    # Before the registry grows, an unknown pipeline must be a 404.
    with pytest.raises(HTTPException) as exc_info:
        manager.resolve_pipeline_id(
            body={"pipeline_id": "blueberry"}, app_id=None, api_key="k1"
        )
    assert exc_info.value.status_code == 404

    # Rewrite the registry with an extra pipeline entry.
    _write_registry(
        registry_path,
        pipelines={
            "default": {
                "enabled": True,
                "config_file": None,
                "overrides": {"llm_name": "qwen-plus"},
            },
            "blueberry": {
                "enabled": True,
                "config_file": None,
                "overrides": {"llm_name": "qwen-max"},
            },
        },
    )
    changed = manager.refresh_registry_if_needed()
    assert changed is True

    # The newly registered pipeline now resolves.
    resolved = manager.resolve_pipeline_id(
        body={"pipeline_id": "blueberry"}, app_id=None, api_key="k1"
    )
    assert resolved == "blueberry"
|
||||
|
||||
|
||||
def test_refresh_registry_invalidates_cache_for_changed_pipeline(tmp_path):
    """Changing a pipeline's overrides rebuilds its cached pipeline object."""
    registry_path = tmp_path / "pipeline_registry.json"
    _write_registry(
        registry_path,
        pipelines={
            "blueberry": {
                "enabled": True,
                "config_file": None,
                "overrides": {"llm_name": "qwen-plus"},
            }
        },
    )
    manager = ServerPipelineManager(
        default_pipeline_id="blueberry",
        default_config=_DummyConfig(),
    )
    manager.load_registry(str(registry_path))

    # Warm the cache with the original override value.
    first_pipeline, first_model = manager.get_pipeline("blueberry")
    assert first_model == "qwen-plus"

    # Same pipeline, different llm_name override.
    _write_registry(
        registry_path,
        pipelines={
            "blueberry": {
                "enabled": True,
                "config_file": None,
                "overrides": {"llm_name": "qwen-max"},
            }
        },
    )
    changed = manager.refresh_registry_if_needed()
    assert changed is True

    # The change must be visible AND yield a freshly built object.
    second_pipeline, second_model = manager.get_pipeline("blueberry")
    assert second_model == "qwen-max"
    assert second_pipeline is not first_pipeline
|
||||
|
||||
|
||||
def test_refresh_registry_applies_disabled_state_immediately(tmp_path):
    """Disabling a pipeline in the registry takes effect on the next refresh."""
    registry_path = tmp_path / "pipeline_registry.json"
    _write_registry(
        registry_path,
        pipelines={
            "blueberry": {
                "enabled": True,
                "config_file": None,
                "overrides": {"llm_name": "qwen-plus"},
            }
        },
    )
    manager = ServerPipelineManager(
        default_pipeline_id="blueberry",
        default_config=_DummyConfig(),
    )
    manager.load_registry(str(registry_path))
    # Warm the cache so disabling must invalidate it, not just block new loads.
    manager.get_pipeline("blueberry")

    _write_registry(
        registry_path,
        pipelines={
            "blueberry": {
                "enabled": False,
                "config_file": None,
                "overrides": {"llm_name": "qwen-plus"},
            }
        },
    )
    changed = manager.refresh_registry_if_needed()
    assert changed is True

    # Disabled pipelines are rejected with 403 (forbidden), not 404.
    with pytest.raises(HTTPException) as exc_info:
        manager.get_pipeline("blueberry")
    assert exc_info.value.status_code == 403
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user