Files
lang-agent/tests/test_stress_servers.py
2026-01-28 14:39:05 +08:00

636 lines
23 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Stress test for server_dashscope.py and server_openai.py
This test measures:
- Maximum concurrent request handling capacity
- Latency metrics (p50, p95, p99, min, max, avg)
- Throughput (requests per second)
- Success/failure rates
Instructions:
1. Start the DashScope server:
uvicorn fastapi_server.server_dashscope:app --host 0.0.0.0 --port 8588
2. Start the OpenAI server:
uvicorn fastapi_server.server_openai:app --host 0.0.0.0 --port 8589
3. Set environment variables:
FAST_AUTH_KEYS=test-key-1,test-key-2
4. Run this test:
pytest tests/test_stress_servers.py -v
or
python tests/test_stress_servers.py [--stream | --no-stream]
Options:
--stream Test only streaming endpoints
--no-stream Test only non-streaming endpoints
(no option) Test both streaming and non-streaming (default)
"""
import os
import sys
import time
import asyncio
import statistics
import argparse
import uuid
from dataclasses import dataclass, field
from typing import List, Optional, Dict
from collections import defaultdict
import httpx
from loguru import logger
# Load environment variables (matching test_dashscope_client.py and test_openai_client.py)
from dotenv import load_dotenv
load_dotenv()
# Server base URLs; overridable via environment (defaults match the
# uvicorn commands shown in the module docstring).
DS_BASE_URL = os.getenv("DS_BASE_URL", "http://127.0.0.1:8588/api/")
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "http://127.0.0.1:8589/v1")
# Normalize base URLs (strip trailing slashes so path joining is predictable).
DASHSCOPE_BASE_URL = DS_BASE_URL.rstrip("/")
OPENAI_BASE_URL = OPENAI_BASE_URL.rstrip("/")
# FAST_AUTH_KEYS may be a comma-separated list; all requests use the first key.
FAST_AUTH_KEYS = os.getenv("FAST_AUTH_KEYS", "test-key")
API_KEY = FAST_AUTH_KEYS.split(",")[0] if FAST_AUTH_KEYS else "test-key"
@dataclass
class RequestResult:
    """Outcome of a single HTTP request issued during a stress run."""
    # True iff the request completed with HTTP 200.
    success: bool
    # Wall-clock time for the full request/response cycle, in milliseconds.
    latency_ms: float
    # HTTP status code, or None if the request failed before a response arrived.
    status_code: Optional[int] = None
    # Truncated error description for failed requests.
    error: Optional[str] = None
    # Bytes received: full body for non-streaming, SSE "data:" lines for streaming.
    response_size: int = 0
@dataclass
class StressTestResult:
    """Aggregated metrics for one stress-test configuration."""
    # Human-readable server label, e.g. "DashScope Server".
    server_name: str
    # Endpoint description (may carry a " (streaming)" suffix).
    endpoint: str
    concurrency: int
    total_requests: int
    successful_requests: int
    failed_requests: int
    # Latencies of the SUCCESSFUL requests only, in milliseconds.
    latencies_ms: List[float] = field(default_factory=list)
    throughput_rps: float = 0.0
    duration_seconds: float = 0.0

    @property
    def success_rate(self) -> float:
        """Share of successful requests, in percent (0.0 for an empty run)."""
        if self.total_requests == 0:
            return 0.0
        return (self.successful_requests / self.total_requests) * 100

    @property
    def avg_latency_ms(self) -> float:
        """Arithmetic mean latency; 0.0 when there are no samples."""
        return statistics.mean(self.latencies_ms) if self.latencies_ms else 0.0

    @property
    def min_latency_ms(self) -> float:
        """Fastest observed latency; 0.0 when there are no samples."""
        return min(self.latencies_ms) if self.latencies_ms else 0.0

    @property
    def max_latency_ms(self) -> float:
        """Slowest observed latency; 0.0 when there are no samples."""
        return max(self.latencies_ms) if self.latencies_ms else 0.0

    @property
    def p50_latency_ms(self) -> float:
        """Median latency; 0.0 when there are no samples."""
        return statistics.median(self.latencies_ms) if self.latencies_ms else 0.0

    @property
    def p95_latency_ms(self) -> float:
        """95th-percentile latency; 0.0 when there are no samples."""
        return self._percentile(self.latencies_ms, 95) if self.latencies_ms else 0.0

    @property
    def p99_latency_ms(self) -> float:
        """99th-percentile latency; 0.0 when there are no samples."""
        return self._percentile(self.latencies_ms, 99) if self.latencies_ms else 0.0

    @staticmethod
    def _percentile(data: List[float], percentile: int) -> float:
        """Percentile with linear interpolation between the two closest ranks."""
        ordered = sorted(data)
        idx = (percentile / 100) * (len(ordered) - 1)
        base = int(idx)
        if idx == base:
            # Exact rank: no interpolation needed.
            return ordered[base]
        lo, hi = ordered[base], ordered[base + 1]
        return lo + (hi - lo) * (idx - base)
async def make_dashscope_request(
    client: httpx.AsyncClient,
    app_id: str = "test-app",
    session_id: str = "test-session",
    stream: bool = False,
    message: str = "Hello, how are you?",
) -> RequestResult:
    """Issue one request against the DashScope-compatible server.

    Args:
        client: Shared httpx async client (reuses the connection pool).
        app_id: Application id placed in the URL path.
        session_id: Session id used both in the path and in the payload.
        stream: If True, consume the SSE stream; otherwise do a plain POST.
        message: User message sent to the model.

    Returns:
        RequestResult with success flag, total latency in ms, status code,
        truncated error text on failure, and bytes received on success.
    """
    # The server exposes the endpoint both under /api/v1/... and /v1/...;
    # choose whichever correctly completes the configured base URL.
    if "/api" in DASHSCOPE_BASE_URL:
        url = f"{DASHSCOPE_BASE_URL}/v1/apps/{app_id}/sessions/{session_id}/responses"
    else:
        url = f"{DASHSCOPE_BASE_URL}/api/v1/apps/{app_id}/sessions/{session_id}/responses"
    headers = {"Authorization": f"Bearer {API_KEY}"}
    payload = {
        "input": {
            "session_id": session_id,
            "messages": [
                {"role": "user", "content": message}
            ]
        },
        "stream": stream,
    }
    start_time = time.perf_counter()
    try:
        if stream:
            response_size = 0
            async with client.stream("POST", url, headers=headers, json=payload, timeout=120.0) as response:
                if response.status_code != 200:
                    # FIX: the body was previously read into error_text and
                    # then discarded; include it (truncated) so failures in
                    # the report carry the server's actual error message.
                    error_text = await response.aread()
                    return RequestResult(
                        success=False,
                        latency_ms=(time.perf_counter() - start_time) * 1000,
                        status_code=response.status_code,
                        error=f"HTTP {response.status_code}: {error_text.decode('utf-8', errors='replace')[:200]}",
                    )
                async for line in response.aiter_lines():
                    # Count only SSE "data:" lines toward the response size.
                    if line.startswith("data: "):
                        response_size += len(line)
                        # For a time-to-first-byte measurement, break after
                        # the first chunk instead of draining the stream.
                        # break
        else:
            response = await client.post(url, headers=headers, json=payload, timeout=60.0)
            response_size = len(response.content)
            if response.status_code != 200:
                return RequestResult(
                    success=False,
                    latency_ms=(time.perf_counter() - start_time) * 1000,
                    status_code=response.status_code,
                    error=response.text[:200],
                )
        latency_ms = (time.perf_counter() - start_time) * 1000
        return RequestResult(
            success=True,
            latency_ms=latency_ms,
            status_code=200,
            response_size=response_size,
        )
    except Exception as e:
        # Timeouts and transport errors count as failures, but keep the
        # elapsed time so they still appear in the latency accounting.
        latency_ms = (time.perf_counter() - start_time) * 1000
        return RequestResult(
            success=False,
            latency_ms=latency_ms,
            error=str(e)[:200],
        )
async def make_openai_request(
    client: httpx.AsyncClient,
    stream: bool = False,
    message: str = "Hello, how are you?",
    thread_id: str = "test-thread",
) -> RequestResult:
    """Issue one request against the OpenAI-compatible server.

    Args:
        client: Shared httpx async client (reuses the connection pool).
        stream: If True, consume the SSE stream; otherwise do a plain POST.
        message: User message sent as the chat content.
        thread_id: Conversation identifier forwarded in the payload.

    Returns:
        RequestResult with success flag, total latency in ms, status code,
        truncated error text on failure, and bytes received on success.
    """
    # BUGFIX: OPENAI_BASE_URL already ends with "/v1" by default, so
    # unconditionally appending "/v1/chat/completions" produced the broken
    # path ".../v1/v1/chat/completions". Append "/v1" only when missing.
    if OPENAI_BASE_URL.endswith("/v1"):
        url = f"{OPENAI_BASE_URL}/chat/completions"
    else:
        url = f"{OPENAI_BASE_URL}/v1/chat/completions"
    headers = {"Authorization": f"Bearer {API_KEY}"}
    payload = {
        "model": "gpt-3.5-turbo",
        "messages": [
            {"role": "user", "content": message}
        ],
        "stream": stream,
        "thread_id": thread_id,
    }
    start_time = time.perf_counter()
    try:
        if stream:
            response_size = 0
            async with client.stream("POST", url, headers=headers, json=payload, timeout=120.0) as response:
                if response.status_code != 200:
                    # FIX: the body was previously read into error_text and
                    # then discarded; include it (truncated) in the error.
                    error_text = await response.aread()
                    return RequestResult(
                        success=False,
                        latency_ms=(time.perf_counter() - start_time) * 1000,
                        status_code=response.status_code,
                        error=f"HTTP {response.status_code}: {error_text.decode('utf-8', errors='replace')[:200]}",
                    )
                async for line in response.aiter_lines():
                    # Count only SSE "data:" lines toward the response size.
                    if line.startswith("data: "):
                        response_size += len(line)
                        # For a time-to-first-byte measurement, break after
                        # the first chunk instead of draining the stream.
                        # break
        else:
            response = await client.post(url, headers=headers, json=payload, timeout=60.0)
            response_size = len(response.content)
            if response.status_code != 200:
                return RequestResult(
                    success=False,
                    latency_ms=(time.perf_counter() - start_time) * 1000,
                    status_code=response.status_code,
                    error=response.text[:200],
                )
        latency_ms = (time.perf_counter() - start_time) * 1000
        return RequestResult(
            success=True,
            latency_ms=latency_ms,
            status_code=200,
            response_size=response_size,
        )
    except Exception as e:
        # Timeouts and transport errors count as failures, but keep the
        # elapsed time so they still appear in the latency accounting.
        latency_ms = (time.perf_counter() - start_time) * 1000
        return RequestResult(
            success=False,
            latency_ms=latency_ms,
            error=str(e)[:200],
        )
async def run_stress_test(
    request_func,
    server_name: str,
    endpoint: str,
    concurrency: int,
    total_requests: int,
    stream: bool = False,
) -> StressTestResult:
    """Fire ``total_requests`` calls of ``request_func`` with at most
    ``concurrency`` of them in flight, and aggregate the outcomes.

    Args:
        request_func: Zero-arg coroutine function returning a RequestResult.
        server_name: Label used in logs and in the result object.
        endpoint: Endpoint description stored in the result object.
        concurrency: Maximum number of simultaneous requests.
        total_requests: Total number of requests to issue.
        stream: Recorded for logging only; streaming is decided by request_func.

    Returns:
        A StressTestResult with latency, throughput and success/failure counts.
    """
    logger.info(f"Starting stress test: {server_name} - {endpoint} - Concurrency: {concurrency}, Total: {total_requests}, Stream: {stream}")
    # A semaphore caps in-flight requests without batching them.
    gate = asyncio.Semaphore(concurrency)

    async def _bounded():
        async with gate:
            return await request_func()

    started = time.perf_counter()
    raw = await asyncio.gather(
        *(_bounded() for _ in range(total_requests)),
        return_exceptions=True,
    )
    elapsed = time.perf_counter() - started

    # Normalize: any exception that escaped request_func becomes a failed
    # RequestResult so the accounting below sees a uniform list.
    outcomes = [
        item if not isinstance(item, Exception)
        else RequestResult(success=False, latency_ms=0.0, error=str(item)[:200])
        for item in raw
    ]
    ok = [r for r in outcomes if r.success]
    bad = [r for r in outcomes if not r.success]
    return StressTestResult(
        server_name=server_name,
        endpoint=endpoint,
        concurrency=concurrency,
        total_requests=total_requests,
        successful_requests=len(ok),
        failed_requests=len(bad),
        # Only successful requests contribute latency samples.
        latencies_ms=[r.latency_ms for r in ok],
        throughput_rps=len(ok) / elapsed if elapsed > 0 else 0,
        duration_seconds=elapsed,
    )
def print_results(result: StressTestResult):
    """Pretty-print one StressTestResult to stdout."""
    bar = "=" * 80
    # Assemble the report first, then emit it with a single print call.
    lines = [
        f"\n{bar}",
        f"STRESS TEST RESULTS: {result.server_name}",
        bar,
        f"Endpoint: {result.endpoint}",
        f"Concurrency: {result.concurrency}",
        f"Total Requests: {result.total_requests}",
        f"Successful: {result.successful_requests} ({result.success_rate:.2f}%)",
        f"Failed: {result.failed_requests}",
        f"Duration: {result.duration_seconds:.3f}s",
        f"Throughput: {result.throughput_rps:.2f} req/s",
        "\nLatency Metrics (ms):",
        f" Min: {result.min_latency_ms:.2f}",
        f" Max: {result.max_latency_ms:.2f}",
        f" Average: {result.avg_latency_ms:.2f}",
        f" Median (p50): {result.p50_latency_ms:.2f}",
        f" p95: {result.p95_latency_ms:.2f}",
        f" p99: {result.p99_latency_ms:.2f}",
    ]
    if result.failed_requests > 0:
        lines.append(f"\nErrors encountered: {result.failed_requests}")
    lines.append(f"{bar}\n")
    print("\n".join(lines))
async def test_dashscope_server(stream_mode: Optional[bool] = None):
    """Run the DashScope stress matrix.

    Args:
        stream_mode: True -> streaming configs only, False -> non-streaming
            only, None -> the full matrix.

    Returns:
        List of StressTestResult, one per configuration run.
    """
    print("\n" + "="*80)
    print("TESTING DASHSCOPE SERVER")
    if stream_mode is True:
        print("Mode: Streaming only")
    elif stream_mode is False:
        print("Mode: Non-streaming only")
    else:
        print("Mode: Both streaming and non-streaming")
    print("="*80)
    # (concurrency, total_requests, stream) — from smoke test up to heavy load.
    matrix = [
        (1, 10, False),
        (5, 25, False),
        (10, 50, False),
        (20, 100, False),
        (50, 200, False),
        (1, 10, True),
        (10, 50, True),
        (20, 100, True),
    ]
    if stream_mode is not None:
        matrix = [cfg for cfg in matrix if cfg[2] == stream_mode]
    collected = []
    async with httpx.AsyncClient(timeout=120.0) as client:
        for concurrency, total_requests, stream in matrix:
            endpoint = "/v1/apps/{app_id}/sessions/{session_id}/responses" + (" (streaming)" if stream else "")

            # Bind loop variables as defaults so the closure is safe even if
            # evaluation were ever deferred past this iteration.
            async def hit(concurrency=concurrency, stream=stream):
                # Fresh session per request: measures overall server capacity
                # rather than per-session serialization.
                return await make_dashscope_request(
                    client,
                    app_id=f"test-app-{concurrency}",
                    session_id=str(uuid.uuid4()),
                    stream=stream,
                    message=f"Test message for concurrency {concurrency}",
                )

            outcome = await run_stress_test(
                hit,
                "DashScope Server",
                endpoint,
                concurrency,
                total_requests,
                stream,
            )
            collected.append(outcome)
            print_results(outcome)
            # Let connections settle between configurations.
            await asyncio.sleep(1)
    return collected
async def test_openai_server(stream_mode: Optional[bool] = None):
    """Run the OpenAI-compatible server stress matrix.

    Args:
        stream_mode: True -> streaming configs only, False -> non-streaming
            only, None -> the full matrix.

    Returns:
        List of StressTestResult, one per configuration run.
    """
    print("\n" + "="*80)
    print("TESTING OPENAI SERVER")
    if stream_mode is True:
        print("Mode: Streaming only")
    elif stream_mode is False:
        print("Mode: Non-streaming only")
    else:
        print("Mode: Both streaming and non-streaming")
    print("="*80)
    # (concurrency, total_requests, stream) — from smoke test up to heavy load.
    matrix = [
        (1, 10, False),
        (5, 25, False),
        (10, 50, False),
        (20, 100, False),
        (50, 200, False),
        (1, 10, True),
        (10, 50, True),
        (20, 100, True),
    ]
    if stream_mode is not None:
        matrix = [cfg for cfg in matrix if cfg[2] == stream_mode]
    collected = []
    async with httpx.AsyncClient(timeout=120.0) as client:
        for concurrency, total_requests, stream in matrix:
            endpoint = "/v1/chat/completions" + (" (streaming)" if stream else "")

            # Bind loop variables as defaults so the closure is safe even if
            # evaluation were ever deferred past this iteration.
            async def hit(concurrency=concurrency, stream=stream):
                # Fresh thread per request: measures overall server capacity
                # rather than per-thread serialization.
                return await make_openai_request(
                    client,
                    stream=stream,
                    message=f"Test message for concurrency {concurrency}",
                    thread_id=str(uuid.uuid4()),
                )

            outcome = await run_stress_test(
                hit,
                "OpenAI Server",
                endpoint,
                concurrency,
                total_requests,
                stream,
            )
            collected.append(outcome)
            print_results(outcome)
            # Let connections settle between configurations.
            await asyncio.sleep(1)
    return collected
def print_summary(results: List[StressTestResult], header: str):
    """Print a compact summary table for one experiment's results.

    Args:
        results: List of stress test results.
        header: Header text to print for this experiment.
    """
    rule = "=" * 80
    print("\n" + rule)
    print(header)
    print(rule)
    # Column header shared by the streaming and non-streaming sections.
    columns = (f"{'Concurrency':<15} {'Requests':<12} {'Success %':<12} "
               f"{'Throughput (req/s)':<20} {'Avg Latency (ms)':<18} {'p95 (ms)':<12} {'p99 (ms)':<12}")

    def _table(title, rows):
        # Shared renderer: both sections print identically-shaped tables.
        print(f"\n{title}:")
        print(columns)
        print("-" * 110)
        for r in rows:
            print(f"{r.concurrency:<15} {r.total_requests:<12} {r.success_rate:<11.2f}% "
                  f"{r.throughput_rps:<20.2f} {r.avg_latency_ms:<18.2f} "
                  f"{r.p95_latency_ms:<12.2f} {r.p99_latency_ms:<12.2f}")

    plain = [r for r in results if "streaming" not in r.endpoint]
    streamed = [r for r in results if "streaming" in r.endpoint]
    if plain:
        _table("Non-Streaming", plain)
    if streamed:
        _table("Streaming", streamed)
    print("\n" + rule)
async def main(stream_mode: Optional[bool] = None):
    """Run the configured stress experiments and print their summaries.

    Args:
        stream_mode: True -> streaming only, False -> non-streaming only,
            None -> both.
    """
    rule = "=" * 80
    print("\n" + rule)
    print("STRESS TEST FOR FASTAPI SERVERS")
    print(rule)
    print(f"DashScope Server URL: {DS_BASE_URL}")
    print(f"OpenAI Server URL: {OPENAI_BASE_URL}")
    # Mask long API keys; show short test keys verbatim.
    if len(API_KEY) > 8:
        print(f"API Key: {API_KEY[:8]}...")
    else:
        print(f"API Key: {API_KEY}")
    if stream_mode is True:
        print("Testing Mode: Streaming only")
    elif stream_mode is False:
        print("Testing Mode: Non-streaming only")
    else:
        print("Testing Mode: Both streaming and non-streaming")
    print(rule)
    # Best-effort reachability probe before generating any load.
    async with httpx.AsyncClient(timeout=5.0) as client:
        try:
            # The health endpoint lives at the server root, not under /api/.
            if "/api" in DASHSCOPE_BASE_URL:
                root = DASHSCOPE_BASE_URL.split("/api")[0]
            else:
                root = DASHSCOPE_BASE_URL.rstrip("/")
            response = await client.get(f"{root}/health")
            if response.status_code != 200:
                logger.warning(f"DashScope server health check failed: {response.status_code}")
        except Exception as e:
            logger.error(f"Cannot reach DashScope server at {DASHSCOPE_BASE_URL}: {e}")
            logger.info("Please start the server: uvicorn fastapi_server.server_dashscope:app --host 0.0.0.0 --port 8588")
    # NOTE: the OpenAI server health check, stress run, and summary are
    # currently disabled; only the DashScope experiments execute.
    dashscope_results = await test_dashscope_server(stream_mode)
    print_summary(dashscope_results, "DASHSCOPE SERVER SUMMARY")
if __name__ == "__main__":
    # CLI: choose streaming-only, non-streaming-only, or the full matrix.
    parser = argparse.ArgumentParser(
        description="Stress test for FastAPI servers (DashScope and OpenAI compatible)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python tests/test_stress_servers.py # Test both streaming and non-streaming
python tests/test_stress_servers.py --stream # Test only streaming endpoints
python tests/test_stress_servers.py --no-stream # Test only non-streaming endpoints
""",
    )
    mode_group = parser.add_mutually_exclusive_group()
    mode_group.add_argument(
        "--stream",
        action="store_true",
        help="Test only streaming endpoints",
    )
    mode_group.add_argument(
        "--no-stream",
        action="store_true",
        dest="no_stream",
        help="Test only non-streaming endpoints",
    )
    args = parser.parse_args()
    # --stream -> True, --no-stream -> False, neither -> None (test both).
    selected_mode = True if args.stream else (False if args.no_stream else None)
    asyncio.run(main(selected_mode))