diff --git a/scripts/run_ci.sh b/scripts/run_ci.sh index 6b7f40071..0d2c76108 100644 --- a/scripts/run_ci.sh +++ b/scripts/run_ci.sh @@ -9,21 +9,54 @@ python -m pip install jsonschema aistudio_sdk==0.2.6 bash build.sh || exit 1 failed_files=() -run_path="$DIR/../test/ci_use" -pushd "$run_path" || exit 1 # 目录不存在时退出 +run_path="$DIR/../test/ci_use/" -for file in test_*; do - if [ -f "$file" ]; then - abs_path=$(realpath "$file") - echo "Running pytest on $abs_path" - if ! python -m pytest -sv "$abs_path"; then - echo "Test failed: $file" - failed_files+=("$file") - fi +# load all test files +for subdir in "$run_path"*/; do + if [ -d "$subdir" ]; then + pushd "$subdir" > /dev/null || continue # into test dir or continue + + # search for test_*.py files + for file in test_*.py; do + if [ -f "$file" ]; then + echo "============================================================" + echo "Running pytest on $(realpath "$file")" + echo "------------------------------------------------------------" + + set +e + timeout 360 python -m pytest --disable-warnings -sv "$file" + exit_code=$? + set -e + + if [ $exit_code -ne 0 ]; then + if [ -f "${subdir%/}/log/workerlog.0" ]; then + echo "---------------- log/workerlog.0 -------------------" + cat "${subdir%/}/log/workerlog.0" + echo "----------------------------------------------------" + fi + + if [ -f "${subdir%/}/server.log" ]; then + echo "---------------- server.log ----------------" + cat "${subdir%/}/server.log" + echo "--------------------------------------------" + fi + + if [ "$exit_code" -eq 1 ] || [ "$exit_code" -eq 124 ]; then + echo "[ERROR] $file 起服务或执行异常,exit_code=$exit_code" + if [ "$exit_code" -eq 124 ]; then + echo "[TIMEOUT] $file 脚本执行超过 6 分钟, 任务超时退出!" + fi + fi + + failed_files+=("$subdir$file") + exit 1 + fi + echo "------------------------------------------------------------" + fi + done + popd > /dev/null # back to test dir fi done -popd - if [ ${#failed_files[@]} -gt 0 ]; then echo "The following tests failed:" diff --git a/test/ci_use/EB_Lite/test_EB_Lite_serving.py b/test/ci_use/EB_Lite/test_EB_Lite_serving.py new file mode 100644 index 000000000..82c9e634e --- /dev/null +++ b/test/ci_use/EB_Lite/test_EB_Lite_serving.py @@ -0,0 +1,323 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +import requests +import time +import subprocess +import socket +import os +import signal +import sys +import openai + +# Read ports from environment variables; use default values if not set +FD_API_PORT = int(os.getenv("FD_API_PORT", 8188)) +FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133)) +FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233)) + +# List of ports to clean before and after tests +PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT] + +def is_port_open(host: str, port: int, timeout=1.0): + """ + Check if a TCP port is open on the given host. + Returns True if connection succeeds, False otherwise. 
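+    Used by the session fixture to poll until the API server is listening.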
+ """ + try: + with socket.create_connection((host, port), timeout): + return True + except Exception: + return False + +def kill_process_on_port(port: int): + """ + Kill processes that are listening on the given port. + Uses `lsof` to find process ids and sends SIGKILL. + """ + try: + output = subprocess.check_output("lsof -i:{} -t".format(port), shell=True).decode().strip() + for pid in output.splitlines(): + os.kill(int(pid), signal.SIGKILL) + print("Killed process on port {}, pid={}".format(port, pid)) + except subprocess.CalledProcessError: + pass + +def clean_ports(): + """ + Kill all processes occupying the ports listed in PORTS_TO_CLEAN. + """ + for port in PORTS_TO_CLEAN: + kill_process_on_port(port) + +@pytest.fixture(scope="session", autouse=True) +def setup_and_run_server(): + """ + Pytest fixture that runs once per test session: + - Cleans ports before tests + - Starts the API server as a subprocess + - Waits for server port to open (up to 30 seconds) + - Tears down server after all tests finish + """ + print("Pre-test port cleanup...") + clean_ports() + + base_path = os.getenv("MODEL_PATH") + if base_path: + model_path = os.path.join(base_path, "ernie-4_5-21b-a3b-bf16-paddle") + else: + model_path = "./ernie-4_5-21b-a3b-bf16-paddle" + + log_path = "server.log" + cmd = [ + sys.executable, "-m", "fastdeploy.entrypoints.openai.api_server", + "--model", model_path, + "--port", str(FD_API_PORT), + "--tensor-parallel-size", "1", + "--engine-worker-queue-port", str(FD_ENGINE_QUEUE_PORT), + "--metrics-port", str(FD_METRICS_PORT), + "--max-model-len", "32768", + "--max-num-seqs", "128", + "--quantization", "wint4", + ] + + # Set environment variables + env = os.environ.copy() + env["ENABLE_FASTDEPLOY_LOAD_MODEL_CONCURRENCY"] = "0" + env["FLAGS_use_append_attn"] = "1" + env["ELLM_DYNAMIC_MODE"] = "1" + env["NCCL_ALGO"] = "Ring" + env["USE_WORKER_V1"] = "1" + + # Start subprocess in new process group + with open(log_path, "w") as logfile: + process = subprocess.Popen( + cmd, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True # Enables killing full group via os.killpg + ) + + # Wait up to 300 seconds for API server to be ready + for _ in range(300): + if is_port_open("127.0.0.1", FD_API_PORT): + print("API server is up on port {}".format(FD_API_PORT)) + break + time.sleep(1) + else: + print("[TIMEOUT] API server failed to start in 5 minutes. Cleaning up...") + try: + os.killpg(process.pid, signal.SIGTERM) + except Exception as e: + print("Failed to kill process group: {}".format(e)) + raise RuntimeError("API server did not start on port {}".format(FD_API_PORT)) + + yield # Run tests + + print("\n===== Post-test server cleanup... =====") + try: + os.killpg(process.pid, signal.SIGTERM) + print("API server (pid={}) terminated".format(process.pid)) + except Exception as e: + print("Failed to terminate API server: {}".format(e)) + + +@pytest.fixture(scope="session") +def api_url(request): + """ + Returns the API endpoint URL for chat completions. + """ + return "http://0.0.0.0:{}/v1/chat/completions".format(FD_API_PORT) + + +@pytest.fixture(scope="session") +def metrics_url(request): + """ + Returns the metrics endpoint URL. + """ + return "http://0.0.0.0:{}/metrics".format(FD_METRICS_PORT) + + +@pytest.fixture +def headers(): + """ + Returns common HTTP request headers. 
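+    Shared by the requests-based consistency test.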
+ """ + return {"Content-Type": "application/json"} + + +@pytest.fixture +def consistent_payload(): + """ + Returns a fixed payload for consistency testing, + including a fixed random seed and temperature. + """ + return { + "messages": [{"role": "user", "content": "用一句话介绍 PaddlePaddle"}], + "temperature": 0.9, + "top_p": 0, # fix top_p to reduce randomness + "seed": 13 # fixed random seed + } + +# ========================== +# Helper function to calculate difference rate between two texts +# ========================== +def calculate_diff_rate(text1, text2): + """ + Calculate the difference rate between two strings + based on the normalized Levenshtein edit distance. + Returns a float in [0,1], where 0 means identical. + """ + if text1 == text2: + return 0.0 + + len1, len2 = len(text1), len(text2) + dp = [[0] * (len2 + 1) for _ in range(len1 + 1)] + + for i in range(len1 + 1): + for j in range(len2 + 1): + if i == 0 or j == 0: + dp[i][j] = i + j + elif text1[i - 1] == text2[j - 1]: + dp[i][j] = dp[i - 1][j - 1] + else: + dp[i][j] = 1 + min(dp[i - 1][j], dp[i][j - 1], dp[i - 1][j - 1]) + + edit_distance = dp[len1][len2] + max_len = max(len1, len2) + return edit_distance / max_len if max_len > 0 else 0.0 + +# ========================== +# Consistency test for repeated runs with fixed payload +# ========================== +def test_consistency_between_runs(api_url, headers, consistent_payload): + """ + Test that two runs with the same fixed input produce similar outputs. + """ + # First request + resp1 = requests.post(api_url, headers=headers, json=consistent_payload) + assert resp1.status_code == 200 + result1 = resp1.json() + content1 = result1["choices"][0]["message"]["content"] + + # Second request + resp2 = requests.post(api_url, headers=headers, json=consistent_payload) + assert resp2.status_code == 200 + result2 = resp2.json() + content2 = result2["choices"][0]["message"]["content"] + + # Calculate difference rate + diff_rate = calculate_diff_rate(content1, content2) + + # Verify that the difference rate is below the threshold + assert diff_rate < 0.05, "Output difference too large ({:.4%})".format(diff_rate) + +# ========================== +# OpenAI Client chat.completions Test +# ========================== + +@pytest.fixture +def openai_client(): + ip = "0.0.0.0" + service_http_port = str(FD_API_PORT) + client = openai.Client( + base_url="http://{}:{}/v1".format(ip, service_http_port), + api_key="EMPTY_API_KEY" + ) + return client + +# Non-streaming test +def test_non_streaming_chat(openai_client): + """ + Test non-streaming chat functionality with the local service + """ + response = openai_client.chat.completions.create( + model="default", + messages=[ + {"role": "system", "content": "You are a helpful AI assistant."}, + {"role": "user", "content": "List 3 countries and their capitals."}, + ], + temperature=1, + max_tokens=1024, + stream=False, + ) + + assert hasattr(response, 'choices') + assert len(response.choices) > 0 + assert hasattr(response.choices[0], 'message') + assert hasattr(response.choices[0].message, 'content') + +# Streaming test +def test_streaming_chat(openai_client, capsys): + """ + Test streaming chat functionality with the local service + """ + response = openai_client.chat.completions.create( + model="default", + messages=[ + {"role": "system", "content": "You are a helpful AI assistant."}, + {"role": "user", "content": "List 3 countries and their capitals."}, + {"role": "assistant", "content": "China(Beijing), France(Paris), Australia(Canberra)."}, + {"role": 
"user", "content": "OK, tell more."}, + ], + temperature=1, + max_tokens=1024, + stream=True, + ) + + output = [] + for chunk in response: + if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'): + output.append(chunk.choices[0].delta.content) + assert len(output) > 2 + +# ========================== +# OpenAI Client completions Test +# ========================== + +def test_non_streaming(openai_client): + """ + Test non-streaming chat functionality with the local service + """ + response = openai_client.completions.create( + model="default", + prompt="Hello, how are you?", + temperature=1, + max_tokens=1024, + stream=False, + ) + + # Assertions to check the response structure + assert hasattr(response, 'choices') + assert len(response.choices) > 0 + + +def test_streaming(openai_client, capsys): + """ + Test streaming functionality with the local service + """ + response = openai_client.completions.create( + model="default", + prompt="Hello, how are you?", + temperature=1, + max_tokens=1024, + stream=True, + ) + + # Collect streaming output + output = [] + for chunk in response: + output.append(chunk.choices[0].text) + assert len(output) > 0 \ No newline at end of file diff --git a/test/ci_use/EB_VL_Lite/test_EB_VL_Lite_serving.py b/test/ci_use/EB_VL_Lite/test_EB_VL_Lite_serving.py new file mode 100644 index 000000000..a68313d4d --- /dev/null +++ b/test/ci_use/EB_VL_Lite/test_EB_VL_Lite_serving.py @@ -0,0 +1,331 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +import requests +import time +import json +import subprocess +import socket +import os +import signal +import sys +import openai + + +# Read ports from environment variables; use default values if not set +FD_API_PORT = int(os.getenv("FD_API_PORT", 8188)) +FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133)) +FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233)) + +# List of ports to clean before and after tests +PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT] + +def is_port_open(host: str, port: int, timeout=1.0): + """ + Check if a TCP port is open on the given host. + Returns True if connection succeeds, False otherwise. + """ + try: + with socket.create_connection((host, port), timeout): + return True + except Exception: + return False + +def kill_process_on_port(port: int): + """ + Kill processes that are listening on the given port. + Uses `lsof` to find process ids and sends SIGKILL. + """ + try: + output = subprocess.check_output("lsof -i:{} -t".format(port), shell=True).decode().strip() + for pid in output.splitlines(): + os.kill(int(pid), signal.SIGKILL) + print("Killed process on port {}, pid={}".format(port, pid)) + except subprocess.CalledProcessError: + pass + +def clean_ports(): + """ + Kill all processes occupying the ports listed in PORTS_TO_CLEAN. 
+ """ + for port in PORTS_TO_CLEAN: + kill_process_on_port(port) + +@pytest.fixture(scope="session", autouse=True) +def setup_and_run_server(): + """ + Pytest fixture that runs once per test session: + - Cleans ports before tests + - Starts the API server as a subprocess + - Waits for server port to open (up to 30 seconds) + - Tears down server after all tests finish + """ + print("Pre-test port cleanup...") + clean_ports() + + base_path = os.getenv("MODEL_PATH") + if base_path: + model_path=os.path.join(base_path, "ernie-4_5-vl-28b-a3b-bf16-paddle") + else: + model_path="./ernie-4_5-vl-28b-a3b-bf16-paddle" + + log_path = "server.log" + limit_mm_str = json.dumps({"image": 100, "video": 100}) + + cmd = [ + sys.executable, "-m", "fastdeploy.entrypoints.openai.api_server", + "--model", model_path, + "--port", str(FD_API_PORT), + "--tensor-parallel-size", "1", + "--engine-worker-queue-port", str(FD_ENGINE_QUEUE_PORT), + "--metrics-port", str(FD_METRICS_PORT), + "--enable-mm", + "--max-model-len", "32768", + "--max-num-batched-tokens", "384", + "--max-num-seqs", "128", + "--limit-mm-per-prompt", limit_mm_str, + "--enable-chunked-prefill", + "--kv-cache-ratio", "0.71", + "--quantization", "wint4" + ] + + # Set environment variables + env = os.environ.copy() + env["ENABLE_FASTDEPLOY_LOAD_MODEL_CONCURRENCY"] = "0" + env["NCCL_ALGO"] = "Ring" + + # Start subprocess in new process group + with open(log_path, "w") as logfile: + process = subprocess.Popen( + cmd, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True # Enables killing full group via os.killpg + ) + + # Wait up to 300 seconds for API server to be ready + for _ in range(300): + if is_port_open("127.0.0.1", FD_API_PORT): + print("API server is up on port {}".format(FD_API_PORT)) + break + time.sleep(1) + else: + print("[TIMEOUT] API server failed to start in 5 minutes. Cleaning up...") + try: + os.killpg(process.pid, signal.SIGTERM) + except Exception as e: + print("Failed to kill process group: {}".format(e)) + raise RuntimeError("API server did not start on port {}".format(FD_API_PORT)) + + yield # Run tests + + print("\n===== Post-test server cleanup... =====") + try: + os.killpg(process.pid, signal.SIGTERM) + print("API server (pid={}) terminated".format(process.pid)) + except Exception as e: + print("Failed to terminate API server: {}".format(e)) + + +@pytest.fixture(scope="session") +def api_url(request): + """ + Returns the API endpoint URL for chat completions. + """ + return "http://0.0.0.0:{}/v1/chat/completions".format(FD_API_PORT) + + +@pytest.fixture(scope="session") +def metrics_url(request): + """ + Returns the metrics endpoint URL. + """ + return "http://0.0.0.0:{}/metrics".format(FD_METRICS_PORT) + + +@pytest.fixture +def headers(): + """ + Returns common HTTP request headers. + """ + return {"Content-Type": "application/json"} + + +@pytest.fixture +def consistent_payload(): + """ + Returns a fixed payload for consistency testing, + including a fixed random seed and temperature. 
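+    The payload pairs an image_url part with a text prompt for the multimodal model.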
+ """ + return { + "messages": [ + {"role": "user", "content": [ + {"type": "image_url", "image_url": {"url": "https://ku.baidu-int.com/vk-assets-ltd/space/2024/09/13/933d1e0a0760498e94ec0f2ccee865e0", "detail": "high"}}, + {"type": "text", "text": "请描述图片内容"} + ]} + ], + "temperature": 0.8, + "top_p": 0, # fix top_p to reduce randomness + "seed": 13 # fixed random seed + } + +# ========================== +# Helper function to calculate difference rate between two texts +# ========================== +def calculate_diff_rate(text1, text2): + """ + Calculate the difference rate between two strings + based on the normalized Levenshtein edit distance. + Returns a float in [0,1], where 0 means identical. + """ + if text1 == text2: + return 0.0 + + len1, len2 = len(text1), len(text2) + dp = [[0] * (len2 + 1) for _ in range(len1 + 1)] + + for i in range(len1 + 1): + for j in range(len2 + 1): + if i == 0 or j == 0: + dp[i][j] = i + j + elif text1[i - 1] == text2[j - 1]: + dp[i][j] = dp[i - 1][j - 1] + else: + dp[i][j] = 1 + min(dp[i - 1][j], dp[i][j - 1], dp[i - 1][j - 1]) + + edit_distance = dp[len1][len2] + max_len = max(len1, len2) + return edit_distance / max_len if max_len > 0 else 0.0 + +# ========================== +# Consistency test for repeated runs with fixed payload +# ========================== +def test_consistency_between_runs(api_url, headers, consistent_payload): + """ + Test that two runs with the same fixed input produce similar outputs. + """ + # First request + resp1 = requests.post(api_url, headers=headers, json=consistent_payload) + assert resp1.status_code == 200 + result1 = resp1.json() + content1 = result1["choices"][0]["message"]["content"] + + # Second request + resp2 = requests.post(api_url, headers=headers, json=consistent_payload) + assert resp2.status_code == 200 + result2 = resp2.json() + content2 = result2["choices"][0]["message"]["content"] + + # Calculate difference rate + diff_rate = calculate_diff_rate(content1, content2) + + # Verify that the difference rate is below the threshold + assert diff_rate < 0.05, "Output difference too large ({:.4%})".format(diff_rate) + +# ========================== +# OpenAI Client Chat Completion Test +# ========================== + +@pytest.fixture +def openai_client(): + ip = "0.0.0.0" + service_http_port = str(FD_API_PORT) + client = openai.Client( + base_url = "http://{}:{}/v1".format(ip, service_http_port), + api_key="EMPTY_API_KEY" + ) + return client + +# Non-streaming test +def test_non_streaming_chat(openai_client): + """Test non-streaming chat functionality with the local service""" + response = openai_client.chat.completions.create( + model="default", + messages=[ + { + "role": "system", + "content": "You are a helpful AI assistant." 
+ }, # system不是必需,可选 + { + "role": + "user", + "content": [{ + "type": "image_url", + "image_url": { + "url": + "https://ku.baidu-int.com/vk-assets-ltd/space/2024/09/13/933d1e0a0760498e94ec0f2ccee865e0", + "detail": "high" + } + }, { + "type": "text", + "text": "请描述图片内容" + }] + } + ], + temperature=1, + max_tokens=53, + stream=False, + ) + + assert hasattr(response, 'choices') + assert len(response.choices) > 0 + assert hasattr(response.choices[0], 'message') + assert hasattr(response.choices[0].message, 'content') + +# Streaming test +def test_streaming_chat(openai_client, capsys): + """Test streaming chat functionality with the local service""" + response = openai_client.chat.completions.create( + model="default", + messages=[ + { + "role": "system", + "content": "You are a helpful AI assistant." + }, # system不是必需,可选 + { + "role": "user", + "content": "List 3 countries and their capitals." + }, + { + "role": "assistant", + "content": "China(Beijing), France(Paris), Australia(Canberra)." + }, + { + "role": + "user", + "content": [{ + "type": "image_url", + "image_url": { + "url": + "https://ku.baidu-int.com/vk-assets-ltd/space/2024/09/13/933d1e0a0760498e94ec0f2ccee865e0", + "detail": "high" + } + }, { + "type": "text", + "text": "请描述图片内容" + }] + }, + ], + temperature=1, + max_tokens=512, + stream=True, + ) + + output = [] + for chunk in response: + if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'): + output.append(chunk.choices[0].delta.content) + assert len(output) > 2 \ No newline at end of file diff --git a/test/ci_use/test_qwen2_offline.py b/test/ci_use/Qwen2-7B-Instruct_offline/test_Qwen2-7B-Instruct_offline.py similarity index 80% rename from test/ci_use/test_qwen2_offline.py rename to test/ci_use/Qwen2-7B-Instruct_offline/test_Qwen2-7B-Instruct_offline.py index f2443934e..dc7f97070 100644 --- a/test/ci_use/test_qwen2_offline.py +++ b/test/ci_use/Qwen2-7B-Instruct_offline/test_Qwen2-7B-Instruct_offline.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved. +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,8 +18,24 @@ from fastdeploy import LLM, SamplingParams import os import subprocess import signal +import time +import socket + FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8313)) +MAX_WAIT_SECONDS = 60 + +def is_port_open(host: str, port: int, timeout=1.0): + """ + Check if a TCP port is open on the given host. + Returns True if connection succeeds, False otherwise. 
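+    Used to wait for the engine worker queue port once the LLM has been created.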
+ """ + try: + with socket.create_connection((host, port), timeout): + return True + except Exception: + return False + def format_chat_prompt(messages): """ @@ -49,27 +65,39 @@ def model_path(): else: return "./Qwen2-7B-Instruct" + @pytest.fixture(scope="module") def llm(model_path): """ Fixture to initialize the LLM model with a given model path """ try: - output = subprocess.check_output(f"lsof -i:{FD_ENGINE_QUEUE_PORT} -t", shell=True).decode().strip() + output = subprocess.check_output("lsof -i:{} -t".format(FD_ENGINE_QUEUE_PORT), shell=True).decode().strip() for pid in output.splitlines(): os.kill(int(pid), signal.SIGKILL) - print(f"Killed process on port {FD_ENGINE_QUEUE_PORT}, pid={pid}") + print("Killed process on port {}, pid={}".format(FD_ENGINE_QUEUE_PORT, pid)) except subprocess.CalledProcessError: pass try: + start = time.time() llm = LLM( model=model_path, tensor_parallel_size=1, engine_worker_queue_port=FD_ENGINE_QUEUE_PORT, - max_model_len=4096 + max_model_len=32768, + quantization="wint8" ) - print("Model loaded successfully from {}.".format(model_path)) + + # Wait for the port to be open + wait_start = time.time() + while not is_port_open("127.0.0.1", FD_ENGINE_QUEUE_PORT): + if time.time() - wait_start > MAX_WAIT_SECONDS: + pytest.fail("Model engine did not start within {} seconds on port {}".format( + MAX_WAIT_SECONDS, FD_ENGINE_QUEUE_PORT)) + time.sleep(1) + + print("Model loaded successfully from {} in {:.2f}s.".format(model_path, time.time() - start)) yield llm except Exception: print("Failed to load model from {}.".format(model_path)) @@ -84,8 +112,8 @@ def test_generate_prompts(llm): # Only one prompt enabled for testing currently prompts = [ "请介绍一下中国的四大发明。", - # "太阳和地球之间的距离是多少?", - # "写一首关于春天的古风诗。", + "太阳和地球之间的距离是多少?", + "写一首关于春天的古风诗。", ] sampling_params = SamplingParams( diff --git a/test/ci_use/test_qwen2_serving.py b/test/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py similarity index 55% rename from test/ci_use/test_qwen2_serving.py rename to test/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py index 4f79ea834..76e9bbc38 100644 --- a/test/ci_use/test_qwen2_serving.py +++ b/test/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved. +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,18 +18,18 @@ import time import json from jsonschema import validate import concurrent.futures -import numpy as np import subprocess import socket import os import signal import sys +import openai # Read ports from environment variables; use default values if not set -FD_API_PORT = int(os.getenv("FD_API_PORT", 8189)) -FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8013)) -FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8333)) +FD_API_PORT = int(os.getenv("FD_API_PORT", 8188)) +FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133)) +FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233)) # List of ports to clean before and after tests PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT] @@ -51,10 +51,10 @@ def kill_process_on_port(port: int): Uses `lsof` to find process ids and sends SIGKILL. 
""" try: - output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() + output = subprocess.check_output("lsof -i:{} -t".format(port), shell=True).decode().strip() for pid in output.splitlines(): os.kill(int(pid), signal.SIGKILL) - print(f"Killed process on port {port}, pid={pid}") + print("Killed process on port {}, pid={}".format(port, pid)) except subprocess.CalledProcessError: pass @@ -83,46 +83,58 @@ def setup_and_run_server(): else: model_path="./Qwen2-7B-Instruct" - log_path = "api_server.log" + log_path = "server.log" cmd = [ sys.executable, "-m", "fastdeploy.entrypoints.openai.api_server", "--model", model_path, "--port", str(FD_API_PORT), "--tensor-parallel-size", "1", "--engine-worker-queue-port", str(FD_ENGINE_QUEUE_PORT), - "--metrics-port", str(FD_METRICS_PORT) + "--metrics-port", str(FD_METRICS_PORT), + "--max-model-len", "32768", + "--max-num-seqs", "128", + "--quantization", "wint8" ] + # Start subprocess in new process group with open(log_path, "w") as logfile: - process = subprocess.Popen(cmd, stdout=logfile, stderr=subprocess.STDOUT) + process = subprocess.Popen( + cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True # Enables killing full group via os.killpg + ) - # Wait up to 120 seconds for API server port to become available - for _ in range(120): + # Wait up to 300 seconds for API server to be ready + for _ in range(300): if is_port_open("127.0.0.1", FD_API_PORT): - print(f"API server is up on port {FD_API_PORT}") + print("API server is up on port {}".format(FD_API_PORT)) break time.sleep(1) else: - process.terminate() - raise RuntimeError(f"API server did not start on port {FD_API_PORT}") + print("[TIMEOUT] API server failed to start in 5 minutes. Cleaning up...") + try: + os.killpg(process.pid, signal.SIGTERM) + except Exception as e: + print("Failed to kill process group: {}".format(e)) + raise RuntimeError("API server did not start on port {}".format(FD_API_PORT)) - yield + yield # Run tests - print("Post-test server cleanup...") + print("\n===== Post-test server cleanup... =====") try: - os.kill(process.pid, signal.SIGTERM) - print("API server terminated") + os.killpg(process.pid, signal.SIGTERM) + print("API server (pid={}) terminated".format(process.pid)) except Exception as e: - print(f"Failed to kill server: {e}") + print("Failed to terminate API server: {}".format(e)) - clean_ports() @pytest.fixture(scope="session") def api_url(request): """ Returns the API endpoint URL for chat completions. """ - return f"http://0.0.0.0:{FD_API_PORT}" + "/v1/chat/completions" + return "http://0.0.0.0:{}/v1/chat/completions".format(FD_API_PORT) @pytest.fixture(scope="session") @@ -130,7 +142,7 @@ def metrics_url(request): """ Returns the metrics endpoint URL. """ - return f"http://0.0.0.0:{FD_METRICS_PORT}/metrics" + return "http://0.0.0.0:{}/metrics".format(FD_METRICS_PORT) @pytest.fixture @@ -222,7 +234,6 @@ def calculate_diff_rate(text1, text2): valid_prompts = [ [{"role": "user", "content": "你好"}], [{"role": "user", "content": "用一句话介绍 FastDeploy"}], - [{"role": "user", "content": "今天天气怎么样?"}], ] @pytest.mark.parametrize("messages", valid_prompts) @@ -230,13 +241,10 @@ def test_valid_chat(messages, api_url, headers): """ Test valid chat requests. 
""" - start = time.time() resp = requests.post(api_url, headers=headers, json={"messages": messages}) - duration = time.time() - start assert resp.status_code == 200 validate(instance=resp.json(), schema=chat_response_schema) - assert duration < 5, "Response too slow:{:.2f}s".format(duration) # ========================== # Consistency test for repeated runs with fixed payload @@ -261,7 +269,7 @@ def test_consistency_between_runs(api_url, headers, consistent_payload): diff_rate = calculate_diff_rate(content1, content2) # Verify that the difference rate is below the threshold - assert diff_rate < 0.05, f"Output difference too large ({diff_rate:.4%})" + assert diff_rate < 0.05, "Output difference too large ({:.4%})".format(diff_rate) # ========================== # Invalid prompt tests @@ -303,7 +311,6 @@ def test_exceed_context_length(api_url, headers): # Check if the response indicates a token limit error or server error (500) try: response_json = resp.json() - print("Response JSON content:", json.dumps(response_json, ensure_ascii=False)[:1000]) except Exception: response_json = {} @@ -311,63 +318,6 @@ def test_exceed_context_length(api_url, headers): assert resp.status_code != 200 or "token" in json.dumps(response_json).lower(), \ "Expected token limit error or similar, but got a normal response: {}".format(response_json) -# ========================== -# ChatTemplate Valid Structure Test -# ========================== - -chat_template_cases = [ - {"template": "chatml", "messages": [{"role": "user", "content": "你是谁?"}]}, - {"template": "llama", "messages": [{"role": "user", "content": "请自我介绍"}]}, - {"template": "alpaca", "messages": [{"role": "user", "content": "介绍一下 FastDeploy"}]}, -] - -@pytest.mark.parametrize("payload", chat_template_cases) -def test_chattemplate_valid(payload, api_url, headers): - """ - Test valid ChatTemplate structures. - """ - resp = requests.post(api_url, headers=headers, json=payload) - assert resp.status_code == 200, "Request failed for template={}".format(payload['template']) - validate(instance=resp.json(), schema=chat_response_schema) - -# ========================== -# ChatTemplate Invalid Structure Test -# ========================== - -invalid_template_cases = [ - {"template": "nonexist", "messages": [{"role": "user", "content": "你好"}]}, - {"template": 123, "messages": [{"role": "user", "content": "你好"}]}, - {"template": "", "messages": [{"role": "user", "content": "你好"}]}, -] - - -@pytest.mark.parametrize("payload", invalid_template_cases) -@pytest.mark.skip(reason="Validation not yet supported; assertion temporarily disabled") -def test_chattemplate_invalid(payload, api_url, headers): - """ - Test invalid ChatTemplate structures. - """ - resp = requests.post(api_url, headers=headers, json=payload) - assert resp.status_code >= 400, "Invalid template should return an error status code" - -# ========================== -# System Role Test -# ========================== - -def test_system_role(api_url, headers): - """ - Test whether the system role can correctly guide model behavior. 
- """ - messages = [ - {"role": "system", "content": "You are an English translation assistant."}, - {"role": "user", "content": "Please translate: 你好"}, - ] - resp = requests.post(api_url, headers=headers, json={"messages": messages}) - assert resp.status_code == 200 - validate(instance=resp.json(), schema=chat_response_schema) - result = resp.json()["choices"][0]["message"]["content"] - assert "hello" in result.lower() - # ========================== # Multi-turn Conversation Test # ========================== @@ -384,23 +334,9 @@ def test_multi_turn_conversation(api_url, headers): assert resp.status_code == 200 validate(instance=resp.json(), schema=chat_response_schema) -# ========================== -# Simple Performance Test -# ========================== - -def test_simple_perf(api_url, headers): - """ - Send 10 requests to check response stability. - """ - prompts = [{"role": "user", "content": "Introduce FastDeploy."}] - for _ in range(10): - resp = requests.post(api_url, headers=headers, json={"messages": prompts}) - assert resp.status_code == 200 - # ========================== # Concurrent Performance Test # ========================== -@pytest.mark.skip(reason="concurrent is unavailable") def test_concurrent_perf(api_url, headers): """ Send concurrent requests to test stability and response time. @@ -415,11 +351,11 @@ def test_concurrent_perf(api_url, headers): assert resp.status_code == 200 return resp.elapsed.total_seconds() - with concurrent.futures.ThreadPoolExecutor(max_workers=33) as executor: - futures = [executor.submit(send_request) for _ in range(33)] + with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: + futures = [executor.submit(send_request) for _ in range(8)] durations = [f.result() for f in futures] - print("Response time for each request:", durations) + print("\nResponse time for each request:", durations) # ========================== # Metrics Endpoint Test @@ -436,14 +372,11 @@ def test_metrics_endpoint(metrics_url): # Parse Prometheus metrics data metrics_data = resp.text - # print(metrics_data) lines = metrics_data.split("\n") metric_lines = [line for line in lines if not line.startswith("#") and line.strip() != ""] - assert len(metric_lines) > 0, "No valid Prometheus metrics found" - - # Assert specific metric values + # 断言 具体值 num_requests_running_found = False num_requests_waiting_found = False time_to_first_token_seconds_sum_found = False @@ -451,41 +384,185 @@ def test_metrics_endpoint(metrics_url): e2e_request_latency_seconds_sum_found = False request_inference_time_seconds_sum_found = False request_queue_time_seconds_sum_found = False + request_prefill_time_seconds_sum_found = False + request_decode_time_seconds_sum_found = False + prompt_tokens_total_found = False + generation_tokens_total_found = False + request_prompt_tokens_sum_found = False + request_generation_tokens_sum_found = False + gpu_cache_usage_perc_found = False + request_params_max_tokens_sum_found = False + request_success_total_found = False for line in metric_lines: if line.startswith("fastdeploy:num_requests_running"): _, value = line.rsplit(" ", 1) - assert float(value) >= 0, "Invalid value for num_requests_running" + assert float(value) >= 0, "num_requests_running 值错误" num_requests_running_found = True elif line.startswith("fastdeploy:num_requests_waiting"): _, value = line.rsplit(" ", 1) - assert float(value) >= 0, "Invalid value for num_requests_waiting" num_requests_waiting_found = True + assert float(value) >= 0, "num_requests_waiting 值错误" elif 
line.startswith("fastdeploy:time_to_first_token_seconds_sum"): _, value = line.rsplit(" ", 1) - assert float(value) >= 0, "Invalid value for time_to_first_token_seconds_sum" + assert float(value) >= 0, "time_to_first_token_seconds_sum 值错误" time_to_first_token_seconds_sum_found = True elif line.startswith("fastdeploy:time_per_output_token_seconds_sum"): _, value = line.rsplit(" ", 1) - assert float(value) >= 0, "Invalid value for time_per_output_token_seconds_sum" + assert float(value) >= 0, "time_per_output_token_seconds_sum 值错误" time_per_output_token_seconds_sum_found = True elif line.startswith("fastdeploy:e2e_request_latency_seconds_sum"): _, value = line.rsplit(" ", 1) - assert float(value) >= 0, "Invalid value for e2e_request_latency_seconds_sum" + assert float(value) >= 0, "e2e_request_latency_seconds_sum_found 值错误" e2e_request_latency_seconds_sum_found = True elif line.startswith("fastdeploy:request_inference_time_seconds_sum"): _, value = line.rsplit(" ", 1) - assert float(value) >= 0, "Invalid value for request_inference_time_seconds_sum" + assert float(value) >= 0, "request_inference_time_seconds_sum 值错误" request_inference_time_seconds_sum_found = True elif line.startswith("fastdeploy:request_queue_time_seconds_sum"): _, value = line.rsplit(" ", 1) - assert float(value) >= 0, "Invalid value for request_queue_time_seconds_sum" + assert float(value) >= 0, "request_queue_time_seconds_sum 值错误" request_queue_time_seconds_sum_found = True + elif line.startswith("fastdeploy:request_prefill_time_seconds_sum"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "request_prefill_time_seconds_sum 值错误" + request_prefill_time_seconds_sum_found = True + elif line.startswith("fastdeploy:request_decode_time_seconds_sum"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "request_decode_time_seconds_sum 值错误" + request_decode_time_seconds_sum_found = True + elif line.startswith("fastdeploy:prompt_tokens_total"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "prompt_tokens_total 值错误" + prompt_tokens_total_found = True + elif line.startswith("fastdeploy:generation_tokens_total"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "generation_tokens_total 值错误" + generation_tokens_total_found = True + elif line.startswith("fastdeploy:request_prompt_tokens_sum"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "request_prompt_tokens_sum 值错误" + request_prompt_tokens_sum_found = True + elif line.startswith("fastdeploy:request_generation_tokens_sum"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "request_generation_tokens_sum 值错误" + request_generation_tokens_sum_found = True + elif line.startswith("fastdeploy:gpu_cache_usage_perc"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "gpu_cache_usage_perc 值错误" + gpu_cache_usage_perc_found = True + elif line.startswith("fastdeploy:request_params_max_tokens_sum"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "request_params_max_tokens_sum 值错误" + request_params_max_tokens_sum_found = True + elif line.startswith("fastdeploy:request_success_total"): + _, value = line.rsplit(" ", 1) + assert float(value) >= 0, "request_success_total 值错误" + request_success_total_found = True - assert num_requests_running_found, "Missing metric: fastdeploy:num_requests_running" - assert num_requests_waiting_found, "Missing metric: fastdeploy:num_requests_waiting" - assert time_to_first_token_seconds_sum_found, "Missing metric: fastdeploy:time_to_first_token_seconds_sum" - assert 
time_per_output_token_seconds_sum_found, "Missing metric: fastdeploy:time_per_output_token_seconds_sum" - assert e2e_request_latency_seconds_sum_found, "Missing metric: fastdeploy:e2e_request_latency_seconds_sum" - assert request_inference_time_seconds_sum_found, "Missing metric: fastdeploy:request_inference_time_seconds_sum" - assert request_queue_time_seconds_sum_found, "Missing metric: fastdeploy:request_queue_time_seconds_sum" \ No newline at end of file + assert num_requests_running_found, "缺少 fastdeploy:num_requests_running 指标" + assert num_requests_waiting_found, "缺少 fastdeploy:num_requests_waiting 指标" + assert time_to_first_token_seconds_sum_found, "缺少 fastdeploy:time_to_first_token_seconds_sum 指标" + assert time_per_output_token_seconds_sum_found, "缺少 fastdeploy:time_per_output_token_seconds_sum 指标" + assert e2e_request_latency_seconds_sum_found, "缺少 fastdeploy:e2e_request_latency_seconds_sum_found 指标" + assert request_inference_time_seconds_sum_found, "缺少 fastdeploy:request_inference_time_seconds_sum 指标" + assert request_queue_time_seconds_sum_found, "缺少 fastdeploy:request_queue_time_seconds_sum 指标" + assert request_prefill_time_seconds_sum_found, "缺少 fastdeploy:request_prefill_time_seconds_sum 指标" + assert request_decode_time_seconds_sum_found, "缺少 fastdeploy:request_decode_time_seconds_sum 指标" + assert prompt_tokens_total_found, "缺少 fastdeploy:prompt_tokens_total 指标" + assert generation_tokens_total_found, "缺少 fastdeploy:generation_tokens_total 指标" + assert request_prompt_tokens_sum_found, "缺少 fastdeploy:request_prompt_tokens_sum 指标" + assert request_generation_tokens_sum_found, "缺少 fastdeploy:request_generation_tokens_sum 指标" + assert gpu_cache_usage_perc_found, "缺少 fastdeploy:gpu_cache_usage_perc 指标" + assert request_params_max_tokens_sum_found, "缺少 fastdeploy:request_params_max_tokens_sum 指标" + assert request_success_total_found, "缺少 fastdeploy:request_success_total 指标" + +# ========================== +# OpenAI Client chat.completions Test +# ========================== + +@pytest.fixture +def openai_client(): + ip = "0.0.0.0" + service_http_port = str(FD_API_PORT) + client = openai.Client( + base_url = "http://{}:{}/v1".format(ip, service_http_port), + api_key="EMPTY_API_KEY" + ) + return client + +# Non-streaming test +def test_non_streaming_chat(openai_client): + """Test non-streaming chat functionality with the local service""" + response = openai_client.chat.completions.create( + model="default", + messages=[ + {"role": "system", "content": "You are a helpful AI assistant."}, + {"role": "user", "content": "List 3 countries and their capitals."}, + ], + temperature=1, + max_tokens=1024, + stream=False, + ) + + assert hasattr(response, 'choices') + assert len(response.choices) > 0 + assert hasattr(response.choices[0], 'message') + assert hasattr(response.choices[0].message, 'content') + +# Streaming test +def test_streaming_chat(openai_client, capsys): + """Test streaming chat functionality with the local service""" + response = openai_client.chat.completions.create( + model="default", + messages=[ + {"role": "system", "content": "You are a helpful AI assistant."}, + {"role": "user", "content": "List 3 countries and their capitals."}, + {"role": "assistant", "content": "China(Beijing), France(Paris), Australia(Canberra)."}, + {"role": "user", "content": "OK, tell more."}, + ], + temperature=1, + max_tokens=1024, + stream=True, + ) + + output = [] + for chunk in response: + if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'): + 
output.append(chunk.choices[0].delta.content) + assert len(output) > 2 + +# ========================== +# OpenAI Client completions Test +# ========================== + +def test_non_streaming(openai_client): + """Test non-streaming chat functionality with the local service""" + response = openai_client.completions.create( + model="default", + prompt="Hello, how are you?", + temperature=1, + max_tokens=1024, + stream=False, + ) + + # Assertions to check the response structure + assert hasattr(response, 'choices') + assert len(response.choices) > 0 + + +def test_streaming(openai_client, capsys): + """Test streaming functionality with the local service""" + response = openai_client.completions.create( + model="default", + prompt="Hello, how are you?", + temperature=1, + max_tokens=1024, + stream=True, + ) + + # Collect streaming output + output = [] + for chunk in response: + output.append(chunk.choices[0].text) + assert len(output) > 0 \ No newline at end of file diff --git a/test/ci_use/Qwen3-MoE/test_Qwen3-MoE_serving.py b/test/ci_use/Qwen3-MoE/test_Qwen3-MoE_serving.py new file mode 100644 index 000000000..70f919203 --- /dev/null +++ b/test/ci_use/Qwen3-MoE/test_Qwen3-MoE_serving.py @@ -0,0 +1,283 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +import requests +import time +import subprocess +import socket +import os +import signal +import sys + + +# Read ports from environment variables; use default values if not set +FD_API_PORT = int(os.getenv("FD_API_PORT", 8188)) +FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133)) +FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233)) + +# List of ports to clean before and after tests +PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT] + +def is_port_open(host: str, port: int, timeout=1.0): + """ + Check if a TCP port is open on the given host. + Returns True if connection succeeds, False otherwise. + """ + try: + with socket.create_connection((host, port), timeout): + return True + except Exception: + return False + +def kill_process_on_port(port: int): + """ + Kill processes that are listening on the given port. + Uses `lsof` to find process ids and sends SIGKILL. + """ + try: + output = subprocess.check_output("lsof -i:{} -t".format(port), shell=True).decode().strip() + for pid in output.splitlines(): + os.kill(int(pid), signal.SIGKILL) + print("Killed process on port {}, pid={}".format(port, pid)) + except subprocess.CalledProcessError: + pass + +def clean_ports(): + """ + Kill all processes occupying the ports listed in PORTS_TO_CLEAN. 
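+    Invoked before server startup to free any ports left over from earlier runs.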
+ """ + for port in PORTS_TO_CLEAN: + kill_process_on_port(port) + +@pytest.fixture(scope="session", autouse=True) +def setup_and_run_server(): + """ + Pytest fixture that runs once per test session: + - Cleans ports before tests + - Starts the API server as a subprocess + - Waits for server port to open (up to 30 seconds) + - Tears down server after all tests finish + """ + print("Pre-test port cleanup...") + clean_ports() + + base_path = os.getenv("MODEL_PATH") + if base_path: + model_path=os.path.join(base_path, "Qwen3-30B-A3B") + else: + model_path="./Qwen3-30B-A3B" + + log_path = "server.log" + cmd = [ + sys.executable, "-m", "fastdeploy.entrypoints.openai.api_server", + "--model", model_path, + "--port", str(FD_API_PORT), + "--tensor-parallel-size", "1", + "--engine-worker-queue-port", str(FD_ENGINE_QUEUE_PORT), + "--metrics-port", str(FD_METRICS_PORT), + "--max-model-len", "32768", + "--max-num-seqs", "50", + "--quantization", "wint4" + ] + + # Set environment variables + env = os.environ.copy() + env["ENABLE_FASTDEPLOY_LOAD_MODEL_CONCURRENCY"] = "0" + env["NCCL_ALGO"] = "Ring" + env["FLAG_SAMPLING_CLASS"] = "rejection" + + # Start subprocess in new process group + with open(log_path, "w") as logfile: + process = subprocess.Popen( + cmd, + env=env, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True # Enables killing full group via os.killpg + ) + + # Wait up to 300 seconds for API server to be ready + for _ in range(300): + if is_port_open("127.0.0.1", FD_API_PORT): + print("API server is up on port {}".format(FD_API_PORT)) + break + time.sleep(1) + else: + print("API server failed to start in time. Cleaning up...") + try: + os.killpg(process.pid, signal.SIGTERM) + except Exception as e: + print("Failed to kill process group: {}".format(e)) + raise RuntimeError("API server did not start on port {}".format(FD_API_PORT)) + + yield # Run tests + + print("\n===== Post-test server cleanup... =====") + try: + os.killpg(process.pid, signal.SIGTERM) + print("API server (pid={}) terminated".format(process.pid)) + except Exception as e: + print("Failed to terminate API server: {}".format(e)) + + +@pytest.fixture(scope="session") +def api_url(request): + """ + Returns the API endpoint URL for chat completions. + """ + return "http://0.0.0.0:{}/v1/chat/completions".format(FD_API_PORT) + + +@pytest.fixture(scope="session") +def metrics_url(request): + """ + Returns the metrics endpoint URL. + """ + return "http://0.0.0.0:{}/metrics".format(FD_METRICS_PORT) + + +@pytest.fixture +def headers(): + """ + Returns common HTTP request headers. + """ + return {"Content-Type": "application/json"} + +@pytest.fixture +def consistent_payload(): + """ + Returns a fixed payload for consistency testing, + including a fixed random seed and temperature. + """ + return { + "messages": [{"role": "user", "content": "用一句话介绍 PaddlePaddle, 30字以内 /no_think"}], + "temperature": 0.8, + "top_p": 0, # fix top_p to reduce randomness + "seed": 13 # fixed random seed + } + +# ========================== +# Helper function to calculate difference rate between two texts +# ========================== +def calculate_diff_rate(text1, text2): + """ + Calculate the difference rate between two strings + based on the normalized Levenshtein edit distance. + Returns a float in [0,1], where 0 means identical. 
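+    Implemented with a standard dynamic-programming edit distance table.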
+ """ + if text1 == text2: + return 0.0 + + len1, len2 = len(text1), len(text2) + dp = [[0] * (len2 + 1) for _ in range(len1 + 1)] + + for i in range(len1 + 1): + for j in range(len2 + 1): + if i == 0 or j == 0: + dp[i][j] = i + j + elif text1[i - 1] == text2[j - 1]: + dp[i][j] = dp[i - 1][j - 1] + else: + dp[i][j] = 1 + min(dp[i - 1][j], dp[i][j - 1], dp[i - 1][j - 1]) + + edit_distance = dp[len1][len2] + max_len = max(len1, len2) + return edit_distance / max_len if max_len > 0 else 0.0 + +# ========================== +# Consistency test for repeated runs with fixed payload +# ========================== +def test_consistency_between_runs(api_url, headers, consistent_payload): + """ + Test that two runs with the same fixed input produce similar outputs. + """ + # First request + resp1 = requests.post(api_url, headers=headers, json=consistent_payload) + assert resp1.status_code == 200 + result1 = resp1.json() + content1 = result1["choices"][0]["message"]["content"] + + # Second request + resp2 = requests.post(api_url, headers=headers, json=consistent_payload) + assert resp2.status_code == 200 + result2 = resp2.json() + content2 = result2["choices"][0]["message"]["content"] + + # Calculate difference rate + diff_rate = calculate_diff_rate(content1, content2) + + # Verify that the difference rate is below the threshold + assert diff_rate < 0.05, "Output difference too large ({:.4%})".format(diff_rate) + +# ========================== +# think Prompt Test +# ========================== + +def test_thinking_prompt(api_url, headers): + """ + Test case to verify normal 'thinking' behavior (no '/no_think' appended). + """ + messages = [ + {"role": "user", "content": "北京天安门在哪里"} + ] + + payload = { + "messages": messages, + "max_tokens": 100, + "temperature": 0.8, + "top_p": 0.01 + } + + resp = requests.post(api_url, headers=headers, json=payload) + assert resp.status_code == 200, "Unexpected status code: {}".format(resp.status_code) + + try: + response_json = resp.json() + except Exception as e: + assert False, "Response is not valid JSON: {}".format(e) + + content = response_json.get("choices", [{}])[0].get("message", {}).get("content", "").lower() + assert "天安门" in content or "北京" in content, "Expected a location-related response with reasoning" + +# ========================== +# no_think Prompt Test +# ========================== + +def test_non_thinking_prompt(api_url, headers): + """ + Test case to verify non-thinking behavior (with '/no_think'). + """ + messages = [ + {"role": "user", "content": "北京天安门在哪里 /no_think"} + ] + + payload = { + "messages": messages, + "max_tokens": 100, + "temperature": 0.8, + "top_p": 0.01 + } + + resp = requests.post(api_url, headers=headers, json=payload) + assert resp.status_code == 200, "Unexpected status code: {}".format(resp.status_code) + + try: + response_json = resp.json() + except Exception as e: + assert False, "Response is not valid JSON: {}".format(e) + + content = response_json.get("choices", [{}])[0].get("message", {}).get("content", "").lower() + assert not any(x in content for x in ["根据", "我认为", "推测", "可能"]), \ + "Expected no reasoning in non-thinking response" \ No newline at end of file
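
To reproduce a single suite locally, outside of run_ci.sh, something along these lines should work (a minimal sketch: it assumes fastdeploy is already built, the test dependencies such as pytest, requests, jsonschema and openai are installed, the model weights live under MODEL_PATH, and the chosen ports are free; /path/to/models is a placeholder):

    # override the default ports used by the test fixtures if needed
    export FD_API_PORT=8188 FD_ENGINE_QUEUE_PORT=8133 FD_METRICS_PORT=8233
    export MODEL_PATH=/path/to/models   # parent directory of the model folders
    cd test/ci_use/Qwen2-7B-Instruct_serving
    timeout 360 python -m pytest --disable-warnings -sv test_Qwen2-7B-Instruct_serving.py

This mirrors what run_ci.sh does for each subdirectory under test/ci_use, including the 6-minute per-file timeout.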