From ce9a49f6bf3aa34770db49796eae9fd15e36c222 Mon Sep 17 00:00:00 2001 From: Juncai <52520497+juncaipeng@users.noreply.github.com> Date: Thu, 27 Nov 2025 14:27:17 +0800 Subject: [PATCH] [PD Disaggregation] Add unittest for splitwise deployment with using rdma (#5189) * Add splitwise deployment with using rdma * clean cuda --- .github/workflows/_unit_test_coverage.yml | 12 +- tests/e2e/test_ernie_03b_pd_router_v0.py | 12 +- ....py => test_ernie_03b_pd_router_v1_ipc.py} | 17 +- tests/e2e/test_ernie_03b_pd_router_v1_rdma.py | 423 ++++++++++++++++++ .../test_ernie_03b_pd_splitwise_scheduler.py | 12 +- tests/e2e/test_ernie_03b_router.py | 125 +----- tests/e2e/test_ernie_21b_mtp.py | 8 +- tests/e2e/utils/get_rdma_nics.sh | 229 ++++++++++ tests/e2e/utils/serving_utils.py | 24 + 9 files changed, 723 insertions(+), 139 deletions(-) rename tests/e2e/{test_ernie_03b_pd_router_v1.py => test_ernie_03b_pd_router_v1_ipc.py} (97%) create mode 100644 tests/e2e/test_ernie_03b_pd_router_v1_rdma.py create mode 100644 tests/e2e/utils/get_rdma_nics.sh diff --git a/.github/workflows/_unit_test_coverage.yml b/.github/workflows/_unit_test_coverage.yml index 37ad42e30..3559cc665 100644 --- a/.github/workflows/_unit_test_coverage.yml +++ b/.github/workflows/_unit_test_coverage.yml @@ -105,6 +105,7 @@ jobs: FD_CACHE_QUEUE_PORT=$((8098 + DEVICE_PORT * 100)) FD_ROUTER_PORT=$((8048 + DEVICE_PORT * 100)) FD_CONNECTOR_PORT=$((8038 + DEVICE_PORT * 100)) + FD_RDMA_PORT=$((8028 + DEVICE_PORT * 100)) echo "Test ENV Parameter:" echo "=========================================================" echo "FLASK_PORT=${FLASK_PORT}" @@ -114,6 +115,7 @@ jobs: echo "FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT}" echo "FD_ROUTER_PORT=${FD_ROUTER_PORT}" echo "FD_CONNECTOR_PORT=${FD_CONNECTOR_PORT}" + echo "FD_RDMA_PORT=${FD_RDMA_PORT}" echo "DEVICES=${DEVICES}" echo "=========================================================" @@ -149,9 +151,15 @@ jobs: docker rm -f ${runner_name} || true fi + export RDMA_DEVICES=$(find 
/dev/infiniband/uverbs* -maxdepth 1 -not -type d | xargs -I{} echo '--device {}:{}') + docker run --rm --net=host \ --name ${runner_name} \ - --cap-add=SYS_PTRACE --shm-size=64G \ + --cap-add=SYS_PTRACE --cap-add=IPC_LOCK \ + --shm-size=64G \ + ${RDMA_DEVICES} \ + --device=/dev/infiniband/rdma_cm \ + --ulimit memlock=-1:-1 \ -v $(pwd):/workspace -w /workspace \ -v "${CACHE_DIR}/gitconfig:/etc/gitconfig:ro" \ -v "${CACHE_DIR}/.cache:/root/.cache" \ @@ -165,6 +173,8 @@ jobs: -e "FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT}" \ -e "FD_ROUTER_PORT=${FD_ROUTER_PORT}" \ -e "FD_CONNECTOR_PORT=${FD_CONNECTOR_PORT}" \ + -e "FD_RDMA_PORT=${FD_RDMA_PORT}" \ + -e "CLEAN_CUDA=1" \ -e TZ="Asia/Shanghai" \ -e "fd_wheel_url=${fd_wheel_url}" \ -e "BASE_REF=${BASE_REF}" \ diff --git a/tests/e2e/test_ernie_03b_pd_router_v0.py b/tests/e2e/test_ernie_03b_pd_router_v0.py index c8da6adbb..1576a7a36 100644 --- a/tests/e2e/test_ernie_03b_pd_router_v0.py +++ b/tests/e2e/test_ernie_03b_pd_router_v0.py @@ -30,7 +30,7 @@ from utils.serving_utils import ( FD_CACHE_QUEUE_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, - clean_ports, + clean, get_registered_number, ) @@ -64,7 +64,7 @@ def setup_and_run_server(): - Tears down server after all tests finish """ print("Pre-test port cleanup...") - clean_ports(PORTS_TO_CLEAN) + clean(PORTS_TO_CLEAN) print("log dir clean ") if os.path.exists("log_router") and os.path.isdir("log_router"): @@ -111,7 +111,7 @@ def setup_and_run_server(): env_prefill["CUDA_VISIBLE_DEVICES"] = "0" env_prefill["ENABLE_V1_KVCACHE_SCHEDULER"] = "0" env_prefill["FD_LOG_DIR"] = "log_prefill" - prefill_log_path = "server.log" + prefill_log_path = "server_prefill.log" prefill_cmd = [ sys.executable, "-m", @@ -161,7 +161,7 @@ def setup_and_run_server(): env_decode["CUDA_VISIBLE_DEVICES"] = "1" env_decode["ENABLE_V1_KVCACHE_SCHEDULER"] = "0" env_decode["FD_LOG_DIR"] = "log_decode" - decode_log_path = "decode_server.log" + decode_log_path = "server_decode.log" decode_cmd = [ 
sys.executable, "-m", @@ -216,7 +216,7 @@ def setup_and_run_server(): try: os.killpg(process_prefill.pid, signal.SIGTERM) os.killpg(process_decode.pid, signal.SIGTERM) - clean_ports() + clean() except Exception as e: print(f"Failed to kill process group: {e}") raise RuntimeError(f"API server did not start on port {FD_API_PORT}") @@ -228,7 +228,7 @@ def setup_and_run_server(): os.killpg(process_router.pid, signal.SIGTERM) os.killpg(process_prefill.pid, signal.SIGTERM) os.killpg(process_decode.pid, signal.SIGTERM) - clean_ports(PORTS_TO_CLEAN) + clean(PORTS_TO_CLEAN) print(f"Prefill server (pid={process_prefill.pid}) terminated") print(f"Decode server (pid={process_decode.pid}) terminated") except Exception as e: diff --git a/tests/e2e/test_ernie_03b_pd_router_v1.py b/tests/e2e/test_ernie_03b_pd_router_v1_ipc.py similarity index 97% rename from tests/e2e/test_ernie_03b_pd_router_v1.py rename to tests/e2e/test_ernie_03b_pd_router_v1_ipc.py index b8be6e3fa..baa3ec715 100644 --- a/tests/e2e/test_ernie_03b_pd_router_v1.py +++ b/tests/e2e/test_ernie_03b_pd_router_v1_ipc.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -# Test splitwise deployment which uses local_scheduler + router, -# and ENABLE_V1_KVCACHE_SCHEDULER is 1 +# Test splitwise deployment: use local_scheduler + router, +# set ENABLE_V1_KVCACHE_SCHEDULER is 1, use ipc to transfer cache. 
import json import os @@ -30,7 +30,7 @@ from utils.serving_utils import ( FD_CACHE_QUEUE_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, - clean_ports, + clean, get_registered_number, ) @@ -64,7 +64,7 @@ def setup_and_run_server(): - Tears down server after all tests finish """ print("Pre-test port cleanup...") - clean_ports(PORTS_TO_CLEAN) + clean(PORTS_TO_CLEAN) print("log dir clean ") if os.path.exists("log_router") and os.path.isdir("log_router"): @@ -111,7 +111,7 @@ def setup_and_run_server(): env_prefill["CUDA_VISIBLE_DEVICES"] = "0" env_prefill["ENABLE_V1_KVCACHE_SCHEDULER"] = "1" env_prefill["FD_LOG_DIR"] = "log_prefill" - prefill_log_path = "server.log" + prefill_log_path = "prefill.log" prefill_cmd = [ sys.executable, "-m", @@ -161,7 +161,7 @@ def setup_and_run_server(): env_decode["CUDA_VISIBLE_DEVICES"] = "1" env_decode["ENABLE_V1_KVCACHE_SCHEDULER"] = "1" env_decode["FD_LOG_DIR"] = "log_decode" - decode_log_path = "decode_server.log" + decode_log_path = "decode.log" decode_cmd = [ sys.executable, "-m", @@ -214,9 +214,10 @@ def setup_and_run_server(): else: print("[TIMEOUT] API server failed to start in 5 minutes. 
Cleaning up...") try: + os.killpg(process_router.pid, signal.SIGTERM) os.killpg(process_prefill.pid, signal.SIGTERM) os.killpg(process_decode.pid, signal.SIGTERM) - clean_ports() + clean(PORTS_TO_CLEAN) except Exception as e: print(f"Failed to kill process group: {e}") raise RuntimeError(f"API server did not start on port {FD_API_PORT}") @@ -228,7 +229,7 @@ def setup_and_run_server(): os.killpg(process_router.pid, signal.SIGTERM) os.killpg(process_prefill.pid, signal.SIGTERM) os.killpg(process_decode.pid, signal.SIGTERM) - clean_ports(PORTS_TO_CLEAN) + clean(PORTS_TO_CLEAN) print(f"Prefill server (pid={process_prefill.pid}) terminated") print(f"Decode server (pid={process_decode.pid}) terminated") except Exception as e: diff --git a/tests/e2e/test_ernie_03b_pd_router_v1_rdma.py b/tests/e2e/test_ernie_03b_pd_router_v1_rdma.py new file mode 100644 index 000000000..6aefba964 --- /dev/null +++ b/tests/e2e/test_ernie_03b_pd_router_v1_rdma.py @@ -0,0 +1,423 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Test splitwise deployment: use local_scheduler + router, +# set ENABLE_V1_KVCACHE_SCHEDULER is 1, use rdma to transfer cache. 
+ +import json +import os +import shutil +import signal +import subprocess +import sys +import time + +import pytest +import requests +from utils.serving_utils import ( + FD_API_PORT, + FD_CACHE_QUEUE_PORT, + FD_ENGINE_QUEUE_PORT, + FD_METRICS_PORT, + clean, + get_registered_number, +) + +# Read ports from environment variables; use default values if not set +FD_CONNECTOR_PORT = int(os.getenv("FD_CONNECTOR_PORT", 8433)) +FD_ROUTER_PORT = int(os.getenv("FD_ROUTER_PORT", 8533)) +FD_RDMA_PORT = int(os.getenv("FD_RDMA_PORT", 8623)) + +# List of ports to clean before and after tests +PORTS_TO_CLEAN = [ + FD_API_PORT, + FD_ENGINE_QUEUE_PORT, + FD_METRICS_PORT, + FD_CACHE_QUEUE_PORT, + FD_CONNECTOR_PORT, + FD_RDMA_PORT, + FD_API_PORT + 1, + FD_ENGINE_QUEUE_PORT + 1, + FD_METRICS_PORT + 1, + FD_CACHE_QUEUE_PORT + 1, + FD_CONNECTOR_PORT + 1, + FD_RDMA_PORT + 1, + FD_ROUTER_PORT, +] + + +@pytest.fixture(scope="session", autouse=True) +def setup_and_run_server(): + """ + Pytest fixture that runs once per test session: + - Cleans ports before tests + - Starts the API server as a subprocess + - Waits for server port to open (up to 30 seconds) + - Tears down server after all tests finish + """ + print("Pre-test port cleanup...") + clean(PORTS_TO_CLEAN) + + print("log dir clean ") + if os.path.exists("log_router") and os.path.isdir("log_router"): + shutil.rmtree("log_router") + if os.path.exists("log_prefill") and os.path.isdir("log_prefill"): + shutil.rmtree("log_prefill") + if os.path.exists("log_decode") and os.path.isdir("log_decode"): + shutil.rmtree("log_decode") + + base_path = os.getenv("MODEL_PATH") + if base_path: + model_path = os.path.join(base_path, "ERNIE-4.5-0.3B-Paddle") + else: + model_path = "baidu/ERNIE-4.5-0.3B-Paddle" + print(f"model_path: {model_path}") + + # get rdma nics + current_dir = os.path.dirname(os.path.abspath(__file__)) + shell_path = os.path.join(current_dir, "utils/get_rdma_nics.sh") + output = subprocess.check_output(["bash", shell_path, 
"gpu"], text=True) + _, rdma_nics = output.split("=") + print(f"shell_path: {shell_path}, rdma_nics: {rdma_nics}") + + # router + print("start router...") + env_router = os.environ.copy() + env_router["FD_LOG_DIR"] = "log_router" + router_log_path = "router.log" + + router_cmd = [ + sys.executable, + "-m", + "fastdeploy.router.launch", + "--port", + str(FD_ROUTER_PORT), + "--splitwise", + ] + + with open(router_log_path, "w") as logfile: + process_router = subprocess.Popen( + router_cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, # Enables killing full group via os.killpg + env=env_router, + ) + + # prefill实例 + print("start prefill...") + env_prefill = os.environ.copy() + env_prefill["CUDA_VISIBLE_DEVICES"] = "0" + env_prefill["FD_LOG_DIR"] = "log_prefill" + env_prefill["KVCACHE_RDMA_NICS"] = rdma_nics + + prefill_log_path = "prefill.log" + prefill_cmd = [ + sys.executable, + "-m", + "fastdeploy.entrypoints.openai.api_server", + "--model", + model_path, + "--port", + str(FD_API_PORT), + "--engine-worker-queue-port", + str(FD_ENGINE_QUEUE_PORT), + "--metrics-port", + str(FD_METRICS_PORT), + "--cache-queue-port", + str(FD_CACHE_QUEUE_PORT), + "--max-model-len", + "8192", + "--splitwise-role", + "prefill", + "--cache-transfer-protocol", + "rdma", + "--rdma-comm-ports", + str(FD_RDMA_PORT), + "--pd-comm-port", + str(FD_CONNECTOR_PORT), + "--router", + f"0.0.0.0:{FD_ROUTER_PORT}", + ] + + # Start subprocess in new process group + with open(prefill_log_path, "w") as logfile: + process_prefill = subprocess.Popen( + prefill_cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, # Enables killing full group via os.killpg + env=env_prefill, + ) + time.sleep(1) + + # decode实例 + print("start decode...") + env_decode = os.environ.copy() + env_decode["CUDA_VISIBLE_DEVICES"] = "1" + env_decode["FD_LOG_DIR"] = "log_decode" + env_decode["KVCACHE_RDMA_NICS"] = rdma_nics + + decode_log_path = "decode.log" + decode_cmd = [ + 
sys.executable, + "-m", + "fastdeploy.entrypoints.openai.api_server", + "--model", + model_path, + "--port", + str(FD_API_PORT + 1), + "--engine-worker-queue-port", + str(FD_ENGINE_QUEUE_PORT + 1), + "--metrics-port", + str(FD_METRICS_PORT + 1), + "--cache-queue-port", + str(FD_CACHE_QUEUE_PORT + 1), + "--max-model-len", + "8192", + "--splitwise-role", + "decode", + "--cache-transfer-protocol", + "rdma", + "--rdma-comm-ports", + str(FD_RDMA_PORT + 1), + "--pd-comm-port", + str(FD_CONNECTOR_PORT + 1), + "--router", + f"0.0.0.0:{FD_ROUTER_PORT}", + ] + + # Start subprocess in new process group + with open(decode_log_path, "w") as logfile: + process_decode = subprocess.Popen( + decode_cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, # Enables killing full group via os.killpg + env=env_decode, + ) + + # Wait up to 300 seconds for API server to be ready + for _ in range(60): + registered_numbers = get_registered_number(f"0.0.0.0:{FD_ROUTER_PORT}") + if registered_numbers["prefill"] >= 1 and registered_numbers["decode"] >= 1: + print("Prefill and decode servers are both online") + break + time.sleep(5) + else: + print("[TIMEOUT] API server failed to start in 5 minutes. Cleaning up...") + try: + os.killpg(process_router.pid, signal.SIGTERM) + os.killpg(process_prefill.pid, signal.SIGTERM) + os.killpg(process_decode.pid, signal.SIGTERM) + clean(PORTS_TO_CLEAN) + except Exception as e: + print(f"Failed to kill process group: {e}") + raise RuntimeError(f"API server did not start on port {FD_API_PORT}") + + yield # Run tests + + print("\n===== Post-test server cleanup... 
=====") + try: + os.killpg(process_router.pid, signal.SIGTERM) + os.killpg(process_prefill.pid, signal.SIGTERM) + os.killpg(process_decode.pid, signal.SIGTERM) + clean(PORTS_TO_CLEAN) + print(f"Prefill server (pid={process_prefill.pid}) terminated") + print(f"Decode server (pid={process_decode.pid}) terminated") + except Exception as e: + print(f"Failed to terminate API server: {e}") + + +@pytest.fixture(scope="session") +def api_url(request): + """ + Returns the API endpoint URL for chat completions. + """ + return f"http://0.0.0.0:{FD_ROUTER_PORT}/v1/chat/completions" + + +@pytest.fixture(scope="session") +def metrics_url(request): + """ + Returns the metrics endpoint URL. + """ + return f"http://0.0.0.0:{FD_METRICS_PORT}/metrics" + + +@pytest.fixture +def headers(): + """ + Returns common HTTP request headers. + """ + return {"Content-Type": "application/json"} + + +def test_metrics_config(metrics_url): + timeout = 600 + url = metrics_url.replace("metrics", "config-info") + res = requests.get(url, timeout=timeout) + assert res.status_code == 200 + + +def send_request(url, payload, timeout=600): + """ + 发送请求到指定的URL,并返回响应结果。 + """ + headers = { + "Content-Type": "application/json", + } + + try: + res = requests.post(url, headers=headers, json=payload, timeout=timeout) + print("🟢 接收响应中...\n") + return res + except requests.exceptions.Timeout: + print(f"❌ 请求超时(超过 {timeout} 秒)") + return None + except requests.exceptions.RequestException as e: + print(f"❌ 请求失败:{e}") + return None + + +def get_stream_chunks(response): + """解析流式返回,生成chunk List[dict]""" + chunks = [] + + if response.status_code == 200: + for line in response.iter_lines(decode_unicode=True): + if line: + if line.startswith("data: "): + line = line[len("data: ") :] + + if line.strip() == "[DONE]": + break + + try: + chunk = json.loads(line) + chunks.append(chunk) + except Exception as e: + print(f"解析失败: {e}, 行内容: {line}") + else: + print(f"请求失败,状态码: {response.status_code}") + print("返回内容:", response.text) 
+ + return chunks + + +def test_chat_usage_stream(api_url): + """测试流式chat usage""" + payload = { + "model": "default", + "temperature": 0, + "top_p": 0, + "seed": 33, + "messages": [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "牛顿的三大运动定律是什么?"}, + ], + "max_tokens": 50, + "stream": True, + "stream_options": {"include_usage": True, "continuous_usage_stats": True}, + "metadata": {"min_tokens": 10}, + } + + response = send_request(url=api_url, payload=payload) + chunks = get_stream_chunks(response) + result = "".join([x["choices"][0]["delta"]["content"] for x in chunks[:-1]]) + print("Decode Response:", result) + assert result != "", "结果为空" + usage = chunks[-1]["usage"] + total_tokens = usage["completion_tokens"] + usage["prompt_tokens"] + assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens" + assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens" + assert usage["total_tokens"] == total_tokens, "total_tokens不等于prompt_tokens + completion_tokens" + + +def test_chat_usage_non_stream(api_url): + """测试非流式chat usage""" + payload = { + "model": "default", + "temperature": 0, + "top_p": 0, + "seed": 33, + "messages": [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "牛顿的三大运动定律是什么?"}, + ], + "max_tokens": 50, + "stream": False, + "metadata": {"min_tokens": 10}, + } + + response = send_request(url=api_url, payload=payload).json() + usage = response["usage"] + result = response["choices"][0]["message"]["content"] + assert result != "", "结果为空" + total_tokens = usage["completion_tokens"] + usage["prompt_tokens"] + assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens" + assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens" + assert usage["total_tokens"] == total_tokens, "total_tokens不等于prompt_tokens + 
completion_tokens" + + +def test_non_chat_usage_stream(api_url): + """测试流式非chat usage""" + payload = { + "model": "default", + "temperature": 0, + "top_p": 0, + "seed": 33, + "prompt": "牛顿的三大运动定律是什么?", + "max_tokens": 50, + "stream": True, + "stream_options": {"include_usage": True, "continuous_usage_stats": True}, + "metadata": {"min_tokens": 10}, + } + api_url = api_url.replace("chat/completions", "completions") + + response = send_request(url=api_url, payload=payload) + chunks = get_stream_chunks(response) + result = "".join([x["choices"][0]["text"] for x in chunks[:-1]]) + print("Decode Response:", result) + assert result != "", "结果为空" + usage = chunks[-1]["usage"] + total_tokens = usage["completion_tokens"] + usage["prompt_tokens"] + assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens" + assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens" + assert usage["total_tokens"] == total_tokens, "total_tokens不等于prompt_tokens + completion_tokens" + + +def test_non_chat_usage_non_stream(api_url): + """测试非流式非chat usage""" + payload = { + "model": "default", + "temperature": 0, + "top_p": 0, + "seed": 33, + "prompt": "牛顿的三大运动定律是什么?", + "max_tokens": 50, + "stream": False, + "metadata": {"min_tokens": 10}, + } + api_url = api_url.replace("chat/completions", "completions") + + response = send_request(url=api_url, payload=payload).json() + usage = response["usage"] + result = response["choices"][0]["text"] + print("Decode Response:", result) + assert result != "", "结果为空" + total_tokens = usage["completion_tokens"] + usage["prompt_tokens"] + assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens" + assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens" + assert usage["total_tokens"] == total_tokens, "total_tokens不等于prompt_tokens + completion_tokens" diff --git 
a/tests/e2e/test_ernie_03b_pd_splitwise_scheduler.py b/tests/e2e/test_ernie_03b_pd_splitwise_scheduler.py index b8af9c011..77f32b0d0 100644 --- a/tests/e2e/test_ernie_03b_pd_splitwise_scheduler.py +++ b/tests/e2e/test_ernie_03b_pd_splitwise_scheduler.py @@ -30,7 +30,7 @@ from utils.serving_utils import ( FD_CACHE_QUEUE_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, - clean_ports, + clean, is_port_open, ) @@ -64,7 +64,7 @@ def setup_and_run_server(): - Tears down server after all tests finish """ print("Pre-test port cleanup...") - clean_ports(PORTS_TO_CLEAN) + clean(PORTS_TO_CLEAN) print("log dir clean ") if os.path.exists("log_redis") and os.path.isdir("log_redis"): @@ -109,7 +109,7 @@ def setup_and_run_server(): env_prefill["CUDA_VISIBLE_DEVICES"] = "0" env_prefill["ENABLE_V1_KVCACHE_SCHEDULER"] = "0" env_prefill["FD_LOG_DIR"] = "log_prefill" - prefill_log_path = "server.log" + prefill_log_path = "server_prefill.log" prefill_cmd = [ sys.executable, "-m", @@ -163,7 +163,7 @@ def setup_and_run_server(): env_decode["CUDA_VISIBLE_DEVICES"] = "1" env_decode["ENABLE_V1_KVCACHE_SCHEDULER"] = "0" env_decode["FD_LOG_DIR"] = "log_decode" - decode_log_path = "decode_server.log" + decode_log_path = "server_decode.log" decode_cmd = [ sys.executable, "-m", @@ -222,7 +222,7 @@ def setup_and_run_server(): try: os.killpg(process_prefill.pid, signal.SIGTERM) os.killpg(process_decode.pid, signal.SIGTERM) - clean_ports(PORTS_TO_CLEAN) + clean(PORTS_TO_CLEAN) except Exception as e: print(f"Failed to kill process group: {e}") raise RuntimeError(f"API server did not start on port {FD_API_PORT}") @@ -234,7 +234,7 @@ def setup_and_run_server(): os.killpg(process_redis.pid, signal.SIGTERM) os.killpg(process_prefill.pid, signal.SIGTERM) os.killpg(process_decode.pid, signal.SIGTERM) - clean_ports(PORTS_TO_CLEAN) + clean(PORTS_TO_CLEAN) print(f"Prefill server (pid={process_prefill.pid}) terminated") print(f"Decode server (pid={process_decode.pid}) terminated") except Exception as e: diff --git 
a/tests/e2e/test_ernie_03b_router.py b/tests/e2e/test_ernie_03b_router.py index 4b2cf71df..d8948cc12 100644 --- a/tests/e2e/test_ernie_03b_router.py +++ b/tests/e2e/test_ernie_03b_router.py @@ -18,19 +18,21 @@ import json import os import shutil import signal -import socket import subprocess import sys import time import pytest import requests +from utils.serving_utils import ( + FD_API_PORT, + FD_CACHE_QUEUE_PORT, + FD_ENGINE_QUEUE_PORT, + FD_METRICS_PORT, + clean, +) # Read ports from environment variables; use default values if not set -FD_API_PORT = int(os.getenv("FD_API_PORT", 8188)) -FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133)) -FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233)) -FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333)) FD_ROUTER_PORT = int(os.getenv("FD_ROUTER_PORT", 8533)) # List of ports to clean before and after tests @@ -47,42 +49,6 @@ PORTS_TO_CLEAN = [ ] -def is_port_open(host: str, port: int, timeout=1.0): - """ - Check if a TCP port is open on the given host. - Returns True if connection succeeds, False otherwise. - """ - try: - with socket.create_connection((host, port), timeout): - return True - except Exception: - return False - - -def check_service_health(base_url: str, timeout: int = 3) -> bool: - """ - Check the health status of a service. - - Args: - base_url (str): The base URL of the service, e.g. "http://127.0.0.1:8080" - timeout (int): Request timeout in seconds. - - Returns: - bool: True if the service is healthy, False otherwise. - """ - if not base_url.startswith("http"): - base_url = f"http://{base_url}" - url = f"{base_url.rstrip('/')}/health" - try: - resp = requests.get(url, timeout=timeout) - if resp.status_code == 200: - return True - else: - return False - except Exception: - return False - - def get_registered_number(router_url) -> list: """ Get the number of registered models in the router. 
@@ -104,76 +70,6 @@ def get_registered_number(router_url) -> list: return {"mixed": 0, "prefill": 0, "decode": 0} -def kill_process_on_port(port: int): - """ - Kill processes that are listening on the given port. - Uses multiple methods to ensure thorough cleanup. - """ - current_pid = os.getpid() - parent_pid = os.getppid() - - # Method 1: Use lsof to find processes - try: - output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() - for pid in output.splitlines(): - pid = int(pid) - if pid in (current_pid, parent_pid): - print(f"Skip killing current process (pid={pid}) on port {port}") - continue - try: - # First try SIGTERM for graceful shutdown - os.kill(pid, signal.SIGTERM) - time.sleep(1) - # Then SIGKILL if still running - os.kill(pid, signal.SIGKILL) - print(f"Killed process on port {port}, pid={pid}") - except ProcessLookupError: - pass # Process already terminated - except subprocess.CalledProcessError: - pass - - # Method 2: Use netstat and fuser as backup - try: - # Find processes using netstat and awk - cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1" - output = subprocess.check_output(cmd, shell=True).decode().strip() - for pid in output.splitlines(): - if pid and pid.isdigit(): - pid = int(pid) - if pid in (current_pid, parent_pid): - continue - try: - os.kill(pid, signal.SIGKILL) - print(f"Killed process (netstat) on port {port}, pid={pid}") - except ProcessLookupError: - pass - except (subprocess.CalledProcessError, FileNotFoundError): - pass - - # Method 3: Use fuser if available - try: - subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5) - except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError): - pass - - -def clean_ports(): - """ - Kill all processes occupying the ports listed in PORTS_TO_CLEAN. 
- """ - print(f"Cleaning ports: {PORTS_TO_CLEAN}") - for port in PORTS_TO_CLEAN: - kill_process_on_port(port) - - # Double check and retry if ports are still in use - time.sleep(2) - for port in PORTS_TO_CLEAN: - if is_port_open("127.0.0.1", port, timeout=0.1): - print(f"Port {port} still in use, retrying cleanup...") - kill_process_on_port(port) - time.sleep(1) - - @pytest.fixture(scope="session", autouse=True) def setup_and_run_server(): """ @@ -184,7 +80,7 @@ def setup_and_run_server(): - Tears down server after all tests finish """ print("Pre-test port cleanup...") - clean_ports() + clean(PORTS_TO_CLEAN) print("log dir clean ") if os.path.exists("log_router") and os.path.isdir("log_router"): @@ -323,9 +219,10 @@ def setup_and_run_server(): else: print("[TIMEOUT] API server failed to start in 5 minutes. Cleaning up...") try: + os.killpg(process_router.pid, signal.SIGTERM) os.killpg(process_server_0.pid, signal.SIGTERM) os.killpg(process_server_1.pid, signal.SIGTERM) - clean_ports() + clean(PORTS_TO_CLEAN) except Exception as e: print(f"Failed to kill process group: {e}") raise RuntimeError(f"API server did not start on port {FD_API_PORT}") @@ -337,7 +234,7 @@ def setup_and_run_server(): os.killpg(process_router.pid, signal.SIGTERM) os.killpg(process_server_0.pid, signal.SIGTERM) os.killpg(process_server_1.pid, signal.SIGTERM) - clean_ports() + clean(PORTS_TO_CLEAN) print(f"server (pid={process_server_0.pid}) terminated") print(f"server (pid={process_server_1.pid}) terminated") except Exception as e: diff --git a/tests/e2e/test_ernie_21b_mtp.py b/tests/e2e/test_ernie_21b_mtp.py index d285a0335..dd05cdd6a 100644 --- a/tests/e2e/test_ernie_21b_mtp.py +++ b/tests/e2e/test_ernie_21b_mtp.py @@ -27,7 +27,7 @@ from utils.serving_utils import ( FD_CACHE_QUEUE_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, - clean_ports, + clean, is_port_open, ) @@ -42,7 +42,7 @@ def setup_and_run_server(): - Tears down server after all tests finish """ print("Pre-test port cleanup...") - 
clean_ports() + clean() print("log dir clean ") if os.path.exists("log") and os.path.isdir("log"): @@ -107,7 +107,7 @@ def setup_and_run_server(): print("[TIMEOUT] API server failed to start in 5 minutes. Cleaning up...") try: os.killpg(process.pid, signal.SIGTERM) - clean_ports() + clean() except Exception as e: print(f"Failed to kill process group: {e}") raise RuntimeError(f"API server did not start on port {FD_API_PORT}") @@ -117,7 +117,7 @@ def setup_and_run_server(): print("\n===== Post-test server cleanup... =====") try: os.killpg(process.pid, signal.SIGTERM) - clean_ports() + clean() print(f"server (pid={process.pid}) terminated") except Exception as e: print(f"Failed to terminate API server: {e}") diff --git a/tests/e2e/utils/get_rdma_nics.sh b/tests/e2e/utils/get_rdma_nics.sh new file mode 100644 index 000000000..59d945718 --- /dev/null +++ b/tests/e2e/utils/get_rdma_nics.sh @@ -0,0 +1,229 @@ +#!/bin/bash + +# It is used to get the RDMA NICs +# This file is same as the one in fastdeploy/scripts/get_rdma_nics.sh + +Cur_Dir=$(cd `dirname $0`; pwd) +NICNAME_TYPE=xgbe # 默认检测类型 +type=$1 + +if [ "$ENABLE_EP_DP" == "1" ]; then + gpu_root_port_filename="${Cur_Dir}/gpu_rootport_${DP_RANK}.txt" +else + gpu_root_port_filename="${Cur_Dir}/gpu_rootport.txt" +fi + +function __NEW_GPU_ROOTPORT_FILE__() { + touch ${gpu_root_port_filename} 2>/dev/null + echo "" > ${gpu_root_port_filename} 2>/dev/null + for gpu_bus in $(lspci 2>/dev/null | grep -iE "Communication controller: | controller: NVIDIA" | awk '{print $1}') + do + readlink "/sys/bus/pci/devices/0000:${gpu_bus}" 2>/dev/null | awk -F [/] '{print $6}' >> ${gpu_root_port_filename} + done +} + +function __RM_GPU_ROOTPORT_FILE__() { + rm -rf ${gpu_root_port_filename} 2>/dev/null +} + +function __JUDGE_NIC_TYPE__() { + XGBE_NUM=$(ip a 2>/dev/null | grep -c ": ${NICNAME_TYPE}") + gpu_first=true + xpu_first=true + cpu_first=true + + for (( xgbe_no=0; xgbe_no < XGBE_NUM; xgbe_no++ )) + do + [ ! 
-d "/sys/class/net/${NICNAME_TYPE}${xgbe_no}" ] && continue + + PCI_ADDRESS=$(ethtool -i "${NICNAME_TYPE}${xgbe_no}" 2>/dev/null | awk -F '0000:' '/bus-info/{print $2}') + [ -z "$PCI_ADDRESS" ] && continue + NIC_ROOTPORT=$(readlink "/sys/bus/pci/devices/0000:${PCI_ADDRESS}" 2>/dev/null | awk -F '/' '{print $6}') + + NIC_TYPE="CPU_NIC" + grep -qxF "$NIC_ROOTPORT" ${gpu_root_port_filename} 2>/dev/null && NIC_TYPE="GPU_NIC" + + if [[ "$type" == "gpu" && "$NIC_TYPE" == "GPU_NIC" ]]; then + ibdev=$(ibdev2netdev 2>/dev/null | awk -v nic="${NICNAME_TYPE}${xgbe_no}" '$5 == nic {print $1}') + if [ -n "$ibdev" ] && ip link show "${NICNAME_TYPE}${xgbe_no}" | grep -q "state UP"; then + if $gpu_first; then + printf "KVCACHE_RDMA_NICS=%s" "$ibdev" + gpu_first=false + else + printf ",%s" "$ibdev" + fi + fi + fi + + if [[ "$type" == "xpu" && "$NIC_TYPE" == "GPU_NIC" ]]; then + ibdev=$(ibdev2netdev 2>/dev/null | awk -v nic="${NICNAME_TYPE}${xgbe_no}" '$5 == nic {print $1}') + if [ -n "$ibdev" ] && ip link show "${NICNAME_TYPE}${xgbe_no}" | grep -q "state UP"; then + if $xpu_first; then + printf "KVCACHE_RDMA_NICS=%s,%s" "$ibdev" "$ibdev" + xpu_first=false + else + printf ",%s,%s" "$ibdev" "$ibdev" + fi + fi + fi + + if [[ "$type" == "cpu" ]]; then + for (( xgbe_no=0; xgbe_no < XGBE_NUM; xgbe_no++ )) + do + [ ! -d "/sys/class/net/${NICNAME_TYPE}${xgbe_no}" ] && continue + + PCI_ADDRESS=$(ethtool -i "${NICNAME_TYPE}${xgbe_no}" 2>/dev/null | awk -F '0000:' '/bus-info/{print $2}') + [ -z "$PCI_ADDRESS" ] && continue + + NIC_ROOTPORT=$(readlink "/sys/bus/pci/devices/0000:${PCI_ADDRESS}" 2>/dev/null | awk -F '/' '{print $6}') + grep -qxF "$NIC_ROOTPORT" ${gpu_root_port_filename} 2>/dev/null && continue + + if ip link show "${NICNAME_TYPE}${xgbe_no}" | grep -q "state UP" && \ + ip a show "${NICNAME_TYPE}${xgbe_no}" | grep -q "inet"; then + printf "KV_CACHE_SOCKET_IFNAME=%s\n" "${NICNAME_TYPE}${xgbe_no}" + return 0 + fi + done + echo "ERROR: No active CPU NIC with IP found!" 
>&2
+            return 1
+        fi
+
+        if [[ "$type" == "cpu_ib" && "$NIC_TYPE" == "CPU_NIC" ]]; then
+            ibdev=$(ibdev2netdev 2>/dev/null | awk -v nic="${NICNAME_TYPE}${xgbe_no}" '$5 == nic {print $1}')
+            if [ -n "$ibdev" ] && ip link show "${NICNAME_TYPE}${xgbe_no}" | grep -q "state UP" && \
+               ip a show "${NICNAME_TYPE}${xgbe_no}" | grep -q "inet "; then
+                if $cpu_ib_first; then
+                    printf "KVCACHE_RDMA_NICS=%s" "$ibdev"
+                    cpu_ib_first=false
+                else
+                    printf ",%s" "$ibdev"
+                fi
+            fi
+        fi
+
+    done
+
+    case "$type" in
+        gpu) ! $gpu_first && printf "\n" ;;
+        xpu) ! $xpu_first && printf "\n" ;;
+        cpu) ! $cpu_first && printf "\n" ;;
+        cpu_ib) ! $cpu_ib_first && printf "\n" ;;
+    esac
+}
+
+function get_vxpu_nics() {  # For every XPU row of `xpu-smi topo -m`, report the NIC with the best link type.
+    local topo_output=$(xpu-smi topo -m)
+    local xpu_info=$(echo "$topo_output" | grep -E '^XPU[0-9]+')
+
+    local nic_mapping=()  # NIC index -> mlx device name, taken from the "NIC<n>: mlx..." legend lines
+    while IFS= read -r line; do
+        if [[ $line =~ NIC([0-9]+):\ +(mlx[0-9_]+) ]]; then
+            local nic_idx=${BASH_REMATCH[1]}
+            local nic_name=${BASH_REMATCH[2]}
+            nic_mapping[$nic_idx]=$nic_name
+        fi
+    done < <(echo "$topo_output" | grep -E '^\s*NIC[0-9]+:')
+
+    local nic_count=${#nic_mapping[@]}
+
+    declare -A priority_map=([PIX]=2 [NODE]=1 [SYS]=0)  # higher value wins
+    local optimal_nics=()
+
+    while IFS= read -r line; do
+        local fields=($line)
+        local nic_start_index=5  # NIC columns are assumed to start at field 5 of an XPU row - TODO confirm
+        local max_nics=$(( ${#fields[@]} - nic_start_index ))
+        local actual_nic_count=$(( max_nics < nic_count ? max_nics : nic_count ))
+
+        local best_priority=-1
+        local best_nic=""
+
+        for ((nic_idx=0; nic_idx<actual_nic_count; nic_idx++)); do  # NOTE(review): header and next two lines rebuilt from garbled text - confirm against upstream
+            local conn_type=${fields[nic_idx+nic_start_index]}
+            local current_priority=${priority_map[$conn_type]:--1}  # link types missing from priority_map never win
+
+            if (( current_priority > best_priority )); then
+                best_priority=$current_priority
+                best_nic="${nic_mapping[$nic_idx]}"
+            fi
+        done
+
+        if [[ -n "$best_nic" ]]; then
+            optimal_nics+=("$best_nic")
+        fi
+    done <<< "$xpu_info"
+
+    local IFS=,  # makes "${optimal_nics[*]}" expand comma-separated
+    export KVCACHE_RDMA_NICS="${optimal_nics[*]}"
+    echo "KVCACHE_RDMA_NICS=${optimal_nics[*]}"
+}
+
+function get_vcpu_nics() {  # Report the first interface that carries an IPv4 address starting with "10.".
+    ip -o addr show | awk '$3 == "inet" && $4 ~ /^10\./ {print "KV_CACHE_SOCKET_IFNAME="$2; exit}'
+}
+
+function __main__() {  # Dispatch on $type and print the matching KV-cache NIC setting.
+    if [[ "$type" == "vxpu" ]]; then
+        get_vxpu_nics
+        return 0
+    fi
+    if [[ "$type" == "vcpu" ]]; then
+        get_vcpu_nics
+        return 0
+    fi
+
+    # Handle bonded interfaces first
+    if [[ "$type" == "cpu" ]]; then
+        for bond in $(ls -d /sys/class/net/bond* 2>/dev/null); do
+            bond_if=$(basename "$bond")
+            if ip link show "$bond_if" | grep -q "state UP" && \
+               ip a show "$bond_if" | grep -q "inet "; then
+                printf "KV_CACHE_SOCKET_IFNAME=%s\n" "$bond_if"
+                return 0
+            fi
+        done
+    fi
+
+    if [[ "$type" == "cpu_ib" ]]; then
+        first=true
+        for bond in $(ls -d /sys/class/net/bond* 2>/dev/null); do
+            bond_if=$(basename "$bond")
+            __NEW_GPU_ROOTPORT_FILE__
+
+            ibdev=$(ibdev2netdev 2>/dev/null | grep -w "$bond_if" | awk '{print $1}')
+            if [ -n "$ibdev" ] && ip link show "$bond_if" | grep -q "state UP" && \
+               ip a show "$bond_if" | grep -q "inet "; then
+                if $first; then
+                    printf "KVCACHE_RDMA_NICS=%s" "$ibdev"
+                    first=false
+                else
+                    printf ",%s" "$ibdev"
+                fi
+            fi
+
+            bondib=$(show_gids 2>/dev/null | grep -w "$bond_if" | awk '{print $1}' | grep "mlx.*bond" | head -1)
+            if [ -n "$bondib" ] && ip link show "$bond_if" | grep -q "state UP" && \
+               ip a show "$bond_if" | grep -q "inet " && $first; then
+                printf "KVCACHE_RDMA_NICS=%s" "$bondib"
+                first=false
+            fi
+
+            __RM_GPU_ROOTPORT_FILE__
+        done
+
+        ! $first && printf "\n"
+        ! $first && return 0  # was `[ ! $first ]`, a non-empty-string test that is always false
+    fi
+
+    local nic_types=("eth" "ib" "xgbe")
+    for nt in "${nic_types[@]}"; do
+        if ip a | grep -iq "$nt"; then
+            __NEW_GPU_ROOTPORT_FILE__
+            NICNAME_TYPE=$nt
+            __JUDGE_NIC_TYPE__
+            __RM_GPU_ROOTPORT_FILE__
+        fi
+    done
+}
+
+__main__
diff --git a/tests/e2e/utils/serving_utils.py b/tests/e2e/utils/serving_utils.py
index ad2538e49..07acb42a3 100644
--- a/tests/e2e/utils/serving_utils.py
+++ b/tests/e2e/utils/serving_utils.py
@@ -28,6 +28,17 @@ def is_port_open(host: str, port: int, timeout=1.0):
         return False
 
 
+def _clean_cuda_process():
+    """
+    Kill processes that are using CUDA devices.
+    NOTE: Do not call this function directly, use the `clean` function instead.
+    """
+    try:
+        subprocess.run("fuser -k /dev/nvidia*", shell=True, timeout=5)
+    except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError):
+        pass
+
+
 def kill_process_on_port(port: int):
     """
     Kill processes that are listening on the given port.
@@ -101,6 +112,19 @@ def clean_ports(ports=None):
             time.sleep(1)
 
 
+def clean(ports=None):
+    """
+    Clean up resources used during testing.
+    """
+    clean_ports(ports)
+
+    # Clean CUDA devices before and after tests.
+    # NOTE: It is dangerous to use this flag on development machines, as it may kill other processes
+    clean_cuda = int(os.getenv("CLEAN_CUDA", "0")) == 1
+    if clean_cuda:
+        _clean_cuda_process()
+
+
 def check_service_health(base_url: str, timeout: int = 3) -> bool:
     """
     Check the health status of a service.