diff --git a/tests/test_stress_servers.py b/tests/test_stress_servers.py new file mode 100755 index 0000000..a1477c9 --- /dev/null +++ b/tests/test_stress_servers.py @@ -0,0 +1,635 @@ +#!/usr/bin/env python3 +""" +Stress test for server_dashscope.py and server_openai.py + +This test measures: +- Maximum concurrent request handling capacity +- Latency metrics (p50, p95, p99, min, max, avg) +- Throughput (requests per second) +- Success/failure rates + +Instructions: +1. Start the DashScope server: + uvicorn fastapi_server.server_dashscope:app --host 0.0.0.0 --port 8588 +2. Start the OpenAI server: + uvicorn fastapi_server.server_openai:app --host 0.0.0.0 --port 8589 +3. Set environment variables: + FAST_AUTH_KEYS=test-key-1,test-key-2 +4. Run this test: + pytest tests/test_stress_servers.py -v + or + python tests/test_stress_servers.py [--stream | --no-stream] + + Options: + --stream Test only streaming endpoints + --no-stream Test only non-streaming endpoints + (no option) Test both streaming and non-streaming (default) +""" +import os +import sys +import time +import asyncio +import statistics +import argparse +import uuid +from dataclasses import dataclass, field +from typing import List, Optional, Dict +from collections import defaultdict +import httpx +from loguru import logger + +# Load environment variables (matching test_dashscope_client.py and test_openai_client.py) +from dotenv import load_dotenv +load_dotenv() + +# Server URLs (matching test files) +DS_BASE_URL = os.getenv("DS_BASE_URL", "http://127.0.0.1:8588/api/") +OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "http://127.0.0.1:8589/v1") + +# Normalize base URLs (remove trailing slashes) +DASHSCOPE_BASE_URL = DS_BASE_URL.rstrip("/") +OPENAI_BASE_URL = OPENAI_BASE_URL.rstrip("/") + +# API Key (matching test files - use first key if comma-separated) +FAST_AUTH_KEYS = os.getenv("FAST_AUTH_KEYS", "test-key") +API_KEY = FAST_AUTH_KEYS.split(",")[0] if FAST_AUTH_KEYS else "test-key" + + +@dataclass +class RequestResult: + """Result of a single request.""" + success: bool + latency_ms: float + status_code: Optional[int] = None + error: Optional[str] = None + response_size: int = 0 + + +@dataclass +class StressTestResult: + """Results from a stress test run.""" + server_name: str + endpoint: str + concurrency: int + total_requests: int + successful_requests: int + failed_requests: int + latencies_ms: List[float] = field(default_factory=list) + throughput_rps: float = 0.0 + duration_seconds: float = 0.0 + + @property + def success_rate(self) -> float: + """Calculate success rate as percentage.""" + if self.total_requests == 0: + return 0.0 + return (self.successful_requests / self.total_requests) * 100 + + @property + def avg_latency_ms(self) -> float: + """Calculate average latency.""" + if not self.latencies_ms: + return 0.0 + return statistics.mean(self.latencies_ms) + + @property + def min_latency_ms(self) -> float: + """Calculate minimum latency.""" + if not self.latencies_ms: + return 0.0 + return min(self.latencies_ms) + + @property + def max_latency_ms(self) -> float: + """Calculate maximum latency.""" + if not self.latencies_ms: + return 0.0 + return max(self.latencies_ms) + + @property + def p50_latency_ms(self) -> float: + """Calculate 50th percentile latency.""" + if not self.latencies_ms: + return 0.0 + return statistics.median(self.latencies_ms) + + @property + def p95_latency_ms(self) -> float: + """Calculate 95th percentile latency.""" + if not self.latencies_ms: + return 0.0 + return self._percentile(self.latencies_ms, 95) + + @property + def p99_latency_ms(self) -> float: + """Calculate 99th percentile latency.""" + if not self.latencies_ms: + return 0.0 + return self._percentile(self.latencies_ms, 99) + + @staticmethod + def _percentile(data: List[float], percentile: int) -> float: + """Calculate percentile value.""" + sorted_data = sorted(data) + index = (percentile / 100) * (len(sorted_data) - 1) + if index.is_integer(): + return sorted_data[int(index)] + lower = sorted_data[int(index)] + upper = sorted_data[int(index) + 1] + return lower + (upper - lower) * (index - int(index)) + + +async def make_dashscope_request( + client: httpx.AsyncClient, + app_id: str = "test-app", + session_id: str = "test-session", + stream: bool = False, + message: str = "Hello, how are you?", +) -> RequestResult: + """Make a request to the DashScope server.""" + # Use /api/v1/... if base URL contains /api/, otherwise /v1/... + # The server supports both endpoints + if "/api" in DASHSCOPE_BASE_URL: + url = f"{DASHSCOPE_BASE_URL}/v1/apps/{app_id}/sessions/{session_id}/responses" + else: + url = f"{DASHSCOPE_BASE_URL}/api/v1/apps/{app_id}/sessions/{session_id}/responses" + headers = {"Authorization": f"Bearer {API_KEY}"} + payload = { + "input": { + "session_id": session_id, + "messages": [ + {"role": "user", "content": message} + ] + }, + "stream": stream, + } + + start_time = time.perf_counter() + try: + if stream: + response_size = 0 + async with client.stream("POST", url, headers=headers, json=payload, timeout=120.0) as response: + if response.status_code != 200: + error_text = await response.aread() + return RequestResult( + success=False, + latency_ms=(time.perf_counter() - start_time) * 1000, + status_code=response.status_code, + error=f"HTTP {response.status_code}", + ) + async for line in response.aiter_lines(): + if line.startswith("data: "): + response_size += len(line) + # For stress testing, we can stop after receiving first chunk to measure latency + # Uncomment the break below if you want to measure time-to-first-byte only + # break + else: + response = await client.post(url, headers=headers, json=payload, timeout=60.0) + response_size = len(response.content) + if response.status_code != 200: + return RequestResult( + success=False, + latency_ms=(time.perf_counter() - start_time) * 1000, + status_code=response.status_code, + error=response.text[:200], + ) + + latency_ms = (time.perf_counter() - start_time) * 1000 + return RequestResult( + success=True, + latency_ms=latency_ms, + status_code=200, + response_size=response_size, + ) + except Exception as e: + latency_ms = (time.perf_counter() - start_time) * 1000 + return RequestResult( + success=False, + latency_ms=latency_ms, + error=str(e)[:200], + ) + + +async def make_openai_request( + client: httpx.AsyncClient, + stream: bool = False, + message: str = "Hello, how are you?", + thread_id: str = "test-thread", +) -> RequestResult: + """Make a request to the OpenAI-compatible server.""" + url = f"{OPENAI_BASE_URL}/v1/chat/completions" + headers = {"Authorization": f"Bearer {API_KEY}"} + payload = { + "model": "gpt-3.5-turbo", + "messages": [ + {"role": "user", "content": message} + ], + "stream": stream, + "thread_id": thread_id, + } + + start_time = time.perf_counter() + try: + if stream: + response_size = 0 + async with client.stream("POST", url, headers=headers, json=payload, timeout=120.0) as response: + if response.status_code != 200: + error_text = await response.aread() + return RequestResult( + success=False, + latency_ms=(time.perf_counter() - start_time) * 1000, + status_code=response.status_code, + error=f"HTTP {response.status_code}", + ) + async for line in response.aiter_lines(): + if line.startswith("data: "): + response_size += len(line) + # For stress testing, we can stop after receiving first chunk to measure latency + # Uncomment the break below if you want to measure time-to-first-byte only + # break + else: + response = await client.post(url, headers=headers, json=payload, timeout=60.0) + response_size = len(response.content) + if response.status_code != 200: + return RequestResult( + success=False, + latency_ms=(time.perf_counter() - start_time) * 1000, + status_code=response.status_code, + error=response.text[:200], + ) + + latency_ms = (time.perf_counter() - start_time) * 1000 + return RequestResult( + success=True, + latency_ms=latency_ms, + status_code=200, + response_size=response_size, + ) + except Exception as e: + latency_ms = (time.perf_counter() - start_time) * 1000 + return RequestResult( + success=False, + latency_ms=latency_ms, + error=str(e)[:200], + ) + + +async def run_stress_test( + request_func, + server_name: str, + endpoint: str, + concurrency: int, + total_requests: int, + stream: bool = False, +) -> StressTestResult: + """Run a stress test with specified concurrency and total requests.""" + logger.info(f"Starting stress test: {server_name} - {endpoint} - Concurrency: {concurrency}, Total: {total_requests}, Stream: {stream}") + + # Create semaphore to limit concurrency + semaphore = asyncio.Semaphore(concurrency) + results: List[RequestResult] = [] + + async def make_request_with_semaphore(): + async with semaphore: + return await request_func() + + # Create tasks + tasks = [make_request_with_semaphore() for _ in range(total_requests)] + + # Run all requests concurrently + start_time = time.perf_counter() + request_results = await asyncio.gather(*tasks, return_exceptions=True) + end_time = time.perf_counter() + + # Process results + for result in request_results: + if isinstance(result, Exception): + results.append(RequestResult( + success=False, + latency_ms=0.0, + error=str(result)[:200], + )) + else: + results.append(result) + + # Calculate metrics + successful = [r for r in results if r.success] + failed = [r for r in results if not r.success] + latencies = [r.latency_ms for r in successful] + + duration = end_time - start_time + throughput = len(successful) / duration if duration > 0 else 0 + + return StressTestResult( + server_name=server_name, + endpoint=endpoint, + concurrency=concurrency, + total_requests=total_requests, + successful_requests=len(successful), + failed_requests=len(failed), + latencies_ms=latencies, + throughput_rps=throughput, + duration_seconds=duration, + ) + + +def print_results(result: StressTestResult): + """Print formatted stress test results.""" + print(f"\n{'='*80}") + print(f"STRESS TEST RESULTS: {result.server_name}") + print(f"{'='*80}") + print(f"Endpoint: {result.endpoint}") + print(f"Concurrency: {result.concurrency}") + print(f"Total Requests: {result.total_requests}") + print(f"Successful: {result.successful_requests} ({result.success_rate:.2f}%)") + print(f"Failed: {result.failed_requests}") + print(f"Duration: {result.duration_seconds:.3f}s") + print(f"Throughput: {result.throughput_rps:.2f} req/s") + print(f"\nLatency Metrics (ms):") + print(f" Min: {result.min_latency_ms:.2f}") + print(f" Max: {result.max_latency_ms:.2f}") + print(f" Average: {result.avg_latency_ms:.2f}") + print(f" Median (p50): {result.p50_latency_ms:.2f}") + print(f" p95: {result.p95_latency_ms:.2f}") + print(f" p99: {result.p99_latency_ms:.2f}") + + if result.failed_requests > 0: + print(f"\nErrors encountered: {result.failed_requests}") + print(f"{'='*80}\n") + + +async def test_dashscope_server(stream_mode: Optional[bool] = None): + """Test the DashScope server with various concurrency levels. + + Args: + stream_mode: If True, test only streaming. If False, test only non-streaming. + If None, test both. + """ + print("\n" + "="*80) + print("TESTING DASHSCOPE SERVER") + if stream_mode is True: + print("Mode: Streaming only") + elif stream_mode is False: + print("Mode: Non-streaming only") + else: + print("Mode: Both streaming and non-streaming") + print("="*80) + + # Test configurations: (concurrency, total_requests, stream) + all_test_configs = [ + (1, 10, False), # Sequential, non-streaming + (5, 25, False), # Low concurrency + (10, 50, False), # Medium concurrency + (20, 100, False), # High concurrency + (50, 200, False), # Very high concurrency + (1, 10, True), # Sequential, streaming + (10, 50, True), # Medium concurrency, streaming + (20, 100, True), # High concurrency, streaming + ] + + # Filter based on stream_mode + if stream_mode is not None: + test_configs = [cfg for cfg in all_test_configs if cfg[2] == stream_mode] + else: + test_configs = all_test_configs + + all_results = [] + + async with httpx.AsyncClient(timeout=120.0) as client: + for concurrency, total_requests, stream in test_configs: + endpoint = f"/v1/apps/{{app_id}}/sessions/{{session_id}}/responses" + if stream: + endpoint += " (streaming)" + + # Create request function with client bound + async def request_func(): + # Overall server capacity across many sessions: + # use a unique session_id (thread_id) per request to avoid per-thread contention + session_id = str(uuid.uuid4()) + return await make_dashscope_request( + client, + app_id=f"test-app-{concurrency}", + session_id=session_id, + stream=stream, + message=f"Test message for concurrency {concurrency}", + ) + + result = await run_stress_test( + request_func, + "DashScope Server", + endpoint, + concurrency, + total_requests, + stream, + ) + + all_results.append(result) + print_results(result) + + # Small delay between test runs + await asyncio.sleep(1) + + return all_results + + +async def test_openai_server(stream_mode: Optional[bool] = None): + """Test the OpenAI-compatible server with various concurrency levels. + + Args: + stream_mode: If True, test only streaming. If False, test only non-streaming. + If None, test both. + """ + print("\n" + "="*80) + print("TESTING OPENAI SERVER") + if stream_mode is True: + print("Mode: Streaming only") + elif stream_mode is False: + print("Mode: Non-streaming only") + else: + print("Mode: Both streaming and non-streaming") + print("="*80) + + # Test configurations: (concurrency, total_requests, stream) + all_test_configs = [ + (1, 10, False), # Sequential, non-streaming + (5, 25, False), # Low concurrency + (10, 50, False), # Medium concurrency + (20, 100, False), # High concurrency + (50, 200, False), # Very high concurrency + (1, 10, True), # Sequential, streaming + (10, 50, True), # Medium concurrency, streaming + (20, 100, True), # High concurrency, streaming + ] + + # Filter based on stream_mode + if stream_mode is not None: + test_configs = [cfg for cfg in all_test_configs if cfg[2] == stream_mode] + else: + test_configs = all_test_configs + + all_results = [] + + async with httpx.AsyncClient(timeout=120.0) as client: + for concurrency, total_requests, stream in test_configs: + endpoint = "/v1/chat/completions" + if stream: + endpoint += " (streaming)" + + # Create request function with client bound + async def request_func(): + # Overall server capacity across many sessions: + # use a unique thread_id per request to avoid per-thread contention + thread_id = str(uuid.uuid4()) + return await make_openai_request( + client, + stream=stream, + message=f"Test message for concurrency {concurrency}", + thread_id=thread_id, + ) + + result = await run_stress_test( + request_func, + "OpenAI Server", + endpoint, + concurrency, + total_requests, + stream, + ) + + all_results.append(result) + print_results(result) + + # Small delay between test runs + await asyncio.sleep(1) + + return all_results + + +def print_summary(results: List[StressTestResult], header: str): + """Print a summary of test results for a single experiment. + + Args: + results: List of stress test results + header: Header text to print for this experiment + """ + print("\n" + "="*80) + print(header) + print("="*80) + + # Separate streaming and non-streaming results + streaming_results = [r for r in results if "streaming" in r.endpoint] + non_streaming_results = [r for r in results if "streaming" not in r.endpoint] + + # Print non-streaming results if available + if non_streaming_results: + print("\nNon-Streaming:") + print(f"{'Concurrency':<15} {'Requests':<12} {'Success %':<12} {'Throughput (req/s)':<20} {'Avg Latency (ms)':<18} {'p95 (ms)':<12} {'p99 (ms)':<12}") + print("-" * 110) + for result in non_streaming_results: + print(f"{result.concurrency:<15} {result.total_requests:<12} {result.success_rate:<11.2f}% " + f"{result.throughput_rps:<20.2f} {result.avg_latency_ms:<18.2f} " + f"{result.p95_latency_ms:<12.2f} {result.p99_latency_ms:<12.2f}") + + # Print streaming results if available + if streaming_results: + print("\nStreaming:") + print(f"{'Concurrency':<15} {'Requests':<12} {'Success %':<12} {'Throughput (req/s)':<20} {'Avg Latency (ms)':<18} {'p95 (ms)':<12} {'p99 (ms)':<12}") + print("-" * 110) + for result in streaming_results: + print(f"{result.concurrency:<15} {result.total_requests:<12} {result.success_rate:<11.2f}% " + f"{result.throughput_rps:<20.2f} {result.avg_latency_ms:<18.2f} " + f"{result.p95_latency_ms:<12.2f} {result.p99_latency_ms:<12.2f}") + + print("\n" + "="*80) + + +async def main(stream_mode: Optional[bool] = None): + """Main function to run all stress tests. + + Args: + stream_mode: If True, test only streaming. If False, test only non-streaming. + If None, test both. + """ + print("\n" + "="*80) + print("STRESS TEST FOR FASTAPI SERVERS") + print("="*80) + print(f"DashScope Server URL: {DS_BASE_URL}") + print(f"OpenAI Server URL: {OPENAI_BASE_URL}") + print(f"API Key: {API_KEY[:8]}..." if len(API_KEY) > 8 else f"API Key: {API_KEY}") + if stream_mode is True: + print("Testing Mode: Streaming only") + elif stream_mode is False: + print("Testing Mode: Non-streaming only") + else: + print("Testing Mode: Both streaming and non-streaming") + print("="*80) + + # Check if servers are reachable + async with httpx.AsyncClient(timeout=5.0) as client: + try: + # Health endpoint is at root, not under /api/ + # Extract base URL without /api/ path + if "/api" in DASHSCOPE_BASE_URL: + base_without_api = DASHSCOPE_BASE_URL.split("/api")[0] + else: + base_without_api = DASHSCOPE_BASE_URL.rstrip("/") + response = await client.get(f"{base_without_api}/health") + if response.status_code != 200: + logger.warning(f"DashScope server health check failed: {response.status_code}") + except Exception as e: + logger.error(f"Cannot reach DashScope server at {DASHSCOPE_BASE_URL}: {e}") + logger.info("Please start the server: uvicorn fastapi_server.server_dashscope:app --host 0.0.0.0 --port 8588") + + # try: + # response = await client.get(f"{OPENAI_BASE_URL}/health") + # if response.status_code != 200: + # logger.warning(f"OpenAI server health check failed: {response.status_code}") + # except Exception as e: + # logger.error(f"Cannot reach OpenAI server at {OPENAI_BASE_URL}: {e}") + # logger.info("Please start the server: uvicorn fastapi_server.server_openai:app --host 0.0.0.0 --port 8589") + + # Run stress tests + dashscope_results = await test_dashscope_server(stream_mode) + # openai_results = await test_openai_server(stream_mode) + + # Print summaries + print_summary(dashscope_results, "DASHSCOPE SERVER SUMMARY") + # print_summary(openai_results, "OPENAI SERVER SUMMARY") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Stress test for FastAPI servers (DashScope and OpenAI compatible)", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + python tests/test_stress_servers.py # Test both streaming and non-streaming + python tests/test_stress_servers.py --stream # Test only streaming endpoints + python tests/test_stress_servers.py --no-stream # Test only non-streaming endpoints + """ + ) + group = parser.add_mutually_exclusive_group() + group.add_argument( + "--stream", + action="store_true", + help="Test only streaming endpoints" + ) + group.add_argument( + "--no-stream", + action="store_true", + dest="no_stream", + help="Test only non-streaming endpoints" + ) + + args = parser.parse_args() + + # Determine stream_mode from arguments + if args.stream: + stream_mode = True + elif args.no_stream: + stream_mode = False + else: + stream_mode = None # Test both + + asyncio.run(main(stream_mode)) +