Files
lang-agent/scripts/py_scripts/chat_dashcope.py
2026-03-06 13:51:46 +08:00

142 lines
4.6 KiB
Python

#!/usr/bin/env python3
"""
Simple chat loop to interact with the blueberry pipeline via DashScope-compatible API.
Usage:
python chat_blueberry.py
The script connects to the server running on http://localhost:8500
and uses the API key from the pipeline registry.
"""
import requests
import json
import sys
from typing import Optional
# Configuration from pipeline_registry.json
API_KEY = "sk-6c7091e6a95f404efb2ec30e8f51b897626d670375cdf822d78262f24ab12367"
PIPELINE_ID = "blueberry"
BASE_URL = "http://localhost:8500"
SESSION_ID = "chat-session-1"
def send_message(
message: str,
session_id: str = SESSION_ID,
stream: bool = False,
app_id: str = PIPELINE_ID,
) -> Optional[str]:
"""Send a message to the blueberry pipeline and return the response."""
url = f"{BASE_URL}/v1/apps/{app_id}/sessions/{session_id}/responses"
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
}
payload = {
"messages": [
{"role": "user", "content": message}
],
"stream": stream,
}
try:
if stream:
# Handle streaming response
response = requests.post(url, headers=headers, json=payload, stream=True)
response.raise_for_status()
accumulated_text = ""
for line in response.iter_lines():
if line:
line_str = line.decode('utf-8')
if line_str.startswith('data: '):
data_str = line_str[6:] # Remove 'data: ' prefix
try:
data = json.loads(data_str)
output = data.get("output", {})
text = output.get("text", "")
if text:
accumulated_text = text
# Print incremental updates (you can modify this behavior)
print(f"\rAssistant: {accumulated_text}", end="", flush=True)
if data.get("is_end", False):
print() # New line after streaming completes
return accumulated_text
except json.JSONDecodeError:
continue
return accumulated_text
else:
# Handle non-streaming response
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status()
data = response.json()
output = data.get("output", {})
return output.get("text", "")
except requests.exceptions.RequestException as e:
print(f"Error sending message: {e}", file=sys.stderr)
if hasattr(e, 'response') and e.response is not None:
try:
error_detail = e.response.json()
print(f"Error details: {error_detail}", file=sys.stderr)
except:
print(f"Response status: {e.response.status_code}", file=sys.stderr)
return None
def main():
"""Main chat loop."""
print("=" * 60)
print(f"Chat with Blueberry Pipeline")
print(f"Pipeline ID: {PIPELINE_ID}")
print(f"Server: {BASE_URL}")
print(f"Session ID: {SESSION_ID}")
print("=" * 60)
print("Type your messages (or 'quit'/'exit' to end, 'stream' to toggle streaming)")
print("Streaming mode is ON by default")
print()
stream_mode = True
while True:
try:
user_input = input("You: ").strip()
if not user_input:
continue
if user_input.lower() in ['quit', 'exit', 'q']:
print("Goodbye!")
break
if user_input.lower() == 'stream':
stream_mode = not stream_mode
print(f"Streaming mode: {'ON' if stream_mode else 'OFF'}")
continue
print("Assistant: ", end="", flush=True)
response = send_message(user_input, stream=stream_mode)
if response is None:
print("(No response received)")
elif not stream_mode:
print(response)
# For streaming, the response is already printed incrementally
print() # Empty line for readability
except KeyboardInterrupt:
print("\n\nGoodbye!")
break
except Exception as e:
print(f"\nError: {e}", file=sys.stderr)
if __name__ == "__main__":
main()