[PD Disaggregation] Add unittest for splitwise deployment with using rdma (#5189)
Some checks failed
CE Compile Job / ce_job_pre_check (push) Has been cancelled
CE Compile Job / print_ce_job_pre_check_outputs (push) Has been cancelled
CE Compile Job / FD-Clone-Linux (push) Has been cancelled
CE Compile Job / Show Code Archive Output (push) Has been cancelled
CE Compile Job / BUILD_SM8090 (push) Has been cancelled
CE Compile Job / BUILD_SM8689 (push) Has been cancelled
CE Compile Job / CE_UPLOAD (push) Has been cancelled
Deploy GitHub Pages / deploy (push) Has been cancelled

* Add splitwise deployment with using rdma
* clean cuda
This commit is contained in:
Juncai
2025-11-27 14:27:17 +08:00
committed by GitHub
parent 373b5c3807
commit ce9a49f6bf
9 changed files with 723 additions and 139 deletions

View File

@@ -105,6 +105,7 @@ jobs:
FD_CACHE_QUEUE_PORT=$((8098 + DEVICE_PORT * 100))
FD_ROUTER_PORT=$((8048 + DEVICE_PORT * 100))
FD_CONNECTOR_PORT=$((8038 + DEVICE_PORT * 100))
FD_RDMA_PORT=$((8028 + DEVICE_PORT * 100))
echo "Test ENV Parameter:"
echo "========================================================="
echo "FLASK_PORT=${FLASK_PORT}"
@@ -114,6 +115,7 @@ jobs:
echo "FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT}"
echo "FD_ROUTER_PORT=${FD_ROUTER_PORT}"
echo "FD_CONNECTOR_PORT=${FD_CONNECTOR_PORT}"
echo "FD_RDMA_PORT=${FD_RDMA_PORT}"
echo "DEVICES=${DEVICES}"
echo "========================================================="
@@ -149,9 +151,15 @@ jobs:
docker rm -f ${runner_name} || true
fi
export RDMA_DEVICES=$(find /dev/infiniband/uverbs* -maxdepth 1 -not -type d | xargs -I{} echo '--device {}:{}')
docker run --rm --net=host \
--name ${runner_name} \
--cap-add=SYS_PTRACE --shm-size=64G \
--cap-add=SYS_PTRACE --cap-add=IPC_LOCK \
--shm-size=64G \
${RDMA_DEVICES} \
--device=/dev/infiniband/rdma_cm \
--ulimit memlock=-1:-1 \
-v $(pwd):/workspace -w /workspace \
-v "${CACHE_DIR}/gitconfig:/etc/gitconfig:ro" \
-v "${CACHE_DIR}/.cache:/root/.cache" \
@@ -165,6 +173,8 @@ jobs:
-e "FD_CACHE_QUEUE_PORT=${FD_CACHE_QUEUE_PORT}" \
-e "FD_ROUTER_PORT=${FD_ROUTER_PORT}" \
-e "FD_CONNECTOR_PORT=${FD_CONNECTOR_PORT}" \
-e "FD_RDMA_PORT=${FD_RDMA_PORT}" \
-e "CLEAN_CUDA=1" \
-e TZ="Asia/Shanghai" \
-e "fd_wheel_url=${fd_wheel_url}" \
-e "BASE_REF=${BASE_REF}" \

View File

@@ -30,7 +30,7 @@ from utils.serving_utils import (
FD_CACHE_QUEUE_PORT,
FD_ENGINE_QUEUE_PORT,
FD_METRICS_PORT,
clean_ports,
clean,
get_registered_number,
)
@@ -64,7 +64,7 @@ def setup_and_run_server():
- Tears down server after all tests finish
"""
print("Pre-test port cleanup...")
clean_ports(PORTS_TO_CLEAN)
clean(PORTS_TO_CLEAN)
print("log dir clean ")
if os.path.exists("log_router") and os.path.isdir("log_router"):
@@ -111,7 +111,7 @@ def setup_and_run_server():
env_prefill["CUDA_VISIBLE_DEVICES"] = "0"
env_prefill["ENABLE_V1_KVCACHE_SCHEDULER"] = "0"
env_prefill["FD_LOG_DIR"] = "log_prefill"
prefill_log_path = "server.log"
prefill_log_path = "server_prefill.log"
prefill_cmd = [
sys.executable,
"-m",
@@ -161,7 +161,7 @@ def setup_and_run_server():
env_decode["CUDA_VISIBLE_DEVICES"] = "1"
env_decode["ENABLE_V1_KVCACHE_SCHEDULER"] = "0"
env_decode["FD_LOG_DIR"] = "log_decode"
decode_log_path = "decode_server.log"
decode_log_path = "server_decode.log"
decode_cmd = [
sys.executable,
"-m",
@@ -216,7 +216,7 @@ def setup_and_run_server():
try:
os.killpg(process_prefill.pid, signal.SIGTERM)
os.killpg(process_decode.pid, signal.SIGTERM)
clean_ports()
clean()
except Exception as e:
print(f"Failed to kill process group: {e}")
raise RuntimeError(f"API server did not start on port {FD_API_PORT}")
@@ -228,7 +228,7 @@ def setup_and_run_server():
os.killpg(process_router.pid, signal.SIGTERM)
os.killpg(process_prefill.pid, signal.SIGTERM)
os.killpg(process_decode.pid, signal.SIGTERM)
clean_ports(PORTS_TO_CLEAN)
clean(PORTS_TO_CLEAN)
print(f"Prefill server (pid={process_prefill.pid}) terminated")
print(f"Decode server (pid={process_decode.pid}) terminated")
except Exception as e:

View File

@@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# Test splitwise deployment which uses local_scheduler + router,
# and ENABLE_V1_KVCACHE_SCHEDULER is 1
# Test splitwise deployment: use local_scheduler + router,
# set ENABLE_V1_KVCACHE_SCHEDULER is 1, use ipc to transfer cache.
import json
import os
@@ -30,7 +30,7 @@ from utils.serving_utils import (
FD_CACHE_QUEUE_PORT,
FD_ENGINE_QUEUE_PORT,
FD_METRICS_PORT,
clean_ports,
clean,
get_registered_number,
)
@@ -64,7 +64,7 @@ def setup_and_run_server():
- Tears down server after all tests finish
"""
print("Pre-test port cleanup...")
clean_ports(PORTS_TO_CLEAN)
clean(PORTS_TO_CLEAN)
print("log dir clean ")
if os.path.exists("log_router") and os.path.isdir("log_router"):
@@ -111,7 +111,7 @@ def setup_and_run_server():
env_prefill["CUDA_VISIBLE_DEVICES"] = "0"
env_prefill["ENABLE_V1_KVCACHE_SCHEDULER"] = "1"
env_prefill["FD_LOG_DIR"] = "log_prefill"
prefill_log_path = "server.log"
prefill_log_path = "prefill.log"
prefill_cmd = [
sys.executable,
"-m",
@@ -161,7 +161,7 @@ def setup_and_run_server():
env_decode["CUDA_VISIBLE_DEVICES"] = "1"
env_decode["ENABLE_V1_KVCACHE_SCHEDULER"] = "1"
env_decode["FD_LOG_DIR"] = "log_decode"
decode_log_path = "decode_server.log"
decode_log_path = "decode.log"
decode_cmd = [
sys.executable,
"-m",
@@ -214,9 +214,10 @@ def setup_and_run_server():
else:
print("[TIMEOUT] API server failed to start in 5 minutes. Cleaning up...")
try:
os.killpg(process_router.pid, signal.SIGTERM)
os.killpg(process_prefill.pid, signal.SIGTERM)
os.killpg(process_decode.pid, signal.SIGTERM)
clean_ports()
clean(PORTS_TO_CLEAN)
except Exception as e:
print(f"Failed to kill process group: {e}")
raise RuntimeError(f"API server did not start on port {FD_API_PORT}")
@@ -228,7 +229,7 @@ def setup_and_run_server():
os.killpg(process_router.pid, signal.SIGTERM)
os.killpg(process_prefill.pid, signal.SIGTERM)
os.killpg(process_decode.pid, signal.SIGTERM)
clean_ports(PORTS_TO_CLEAN)
clean(PORTS_TO_CLEAN)
print(f"Prefill server (pid={process_prefill.pid}) terminated")
print(f"Decode server (pid={process_decode.pid}) terminated")
except Exception as e:

View File

@@ -0,0 +1,423 @@
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Test splitwise deployment: use local_scheduler + router,
# set ENABLE_V1_KVCACHE_SCHEDULER is 1, use rdma to transfer cache.
import json
import os
import shutil
import signal
import subprocess
import sys
import time
import pytest
import requests
from utils.serving_utils import (
FD_API_PORT,
FD_CACHE_QUEUE_PORT,
FD_ENGINE_QUEUE_PORT,
FD_METRICS_PORT,
clean,
get_registered_number,
)
# Read ports from environment variables; use default values if not set.
FD_CONNECTOR_PORT = int(os.getenv("FD_CONNECTOR_PORT", 8433))
FD_ROUTER_PORT = int(os.getenv("FD_ROUTER_PORT", 8533))
FD_RDMA_PORT = int(os.getenv("FD_RDMA_PORT", 8623))
# List of ports to clean before and after tests.
# The prefill instance uses the base ports; the decode instance uses base + 1
# (matching the "+ 1" offsets passed on its command line below).
PORTS_TO_CLEAN = [
    FD_API_PORT,
    FD_ENGINE_QUEUE_PORT,
    FD_METRICS_PORT,
    FD_CACHE_QUEUE_PORT,
    FD_CONNECTOR_PORT,
    FD_RDMA_PORT,
    FD_API_PORT + 1,
    FD_ENGINE_QUEUE_PORT + 1,
    FD_METRICS_PORT + 1,
    FD_CACHE_QUEUE_PORT + 1,
    FD_CONNECTOR_PORT + 1,
    FD_RDMA_PORT + 1,
    FD_ROUTER_PORT,
]
@pytest.fixture(scope="session", autouse=True)
def setup_and_run_server():
    """
    Pytest fixture that runs once per test session:
    - Cleans ports (and optionally CUDA processes) before tests
    - Starts a router plus one prefill and one decode API server as subprocesses,
      configured to transfer KV cache over RDMA
    - Waits for both instances to register with the router (up to 5 minutes)
    - Tears down router/prefill/decode after all tests finish
    """
    print("Pre-test port cleanup...")
    clean(PORTS_TO_CLEAN)
    print("log dir clean ")
    # Remove stale per-instance log directories left over from previous runs.
    if os.path.exists("log_router") and os.path.isdir("log_router"):
        shutil.rmtree("log_router")
    if os.path.exists("log_prefill") and os.path.isdir("log_prefill"):
        shutil.rmtree("log_prefill")
    if os.path.exists("log_decode") and os.path.isdir("log_decode"):
        shutil.rmtree("log_decode")
    base_path = os.getenv("MODEL_PATH")
    if base_path:
        model_path = os.path.join(base_path, "ERNIE-4.5-0.3B-Paddle")
    else:
        model_path = "baidu/ERNIE-4.5-0.3B-Paddle"
    print(f"model_path: {model_path}")
    # Detect the RDMA NICs via the helper script, which prints a line of the
    # form "KVCACHE_RDMA_NICS=<nic0,nic1,...>".
    current_dir = os.path.dirname(os.path.abspath(__file__))
    shell_path = os.path.join(current_dir, "utils/get_rdma_nics.sh")
    output = subprocess.check_output(["bash", shell_path, "gpu"], text=True)
    # NOTE(review): assumes the script output contains exactly one "=" —
    # a multi-line or empty output would break here; TODO confirm.
    _, rdma_nics = output.split("=")
    print(f"shell_path: {shell_path}, rdma_nics: {rdma_nics}")
    # Router instance: prefill/decode register themselves against it.
    print("start router...")
    env_router = os.environ.copy()
    env_router["FD_LOG_DIR"] = "log_router"
    router_log_path = "router.log"
    router_cmd = [
        sys.executable,
        "-m",
        "fastdeploy.router.launch",
        "--port",
        str(FD_ROUTER_PORT),
        "--splitwise",
    ]
    with open(router_log_path, "w") as logfile:
        process_router = subprocess.Popen(
            router_cmd,
            stdout=logfile,
            stderr=subprocess.STDOUT,
            start_new_session=True,  # Enables killing full group via os.killpg
            env=env_router,
        )
    # Prefill instance (GPU 0), KV cache sent over RDMA.
    print("start prefill...")
    env_prefill = os.environ.copy()
    env_prefill["CUDA_VISIBLE_DEVICES"] = "0"
    env_prefill["FD_LOG_DIR"] = "log_prefill"
    env_prefill["KVCACHE_RDMA_NICS"] = rdma_nics
    prefill_log_path = "prefill.log"
    prefill_cmd = [
        sys.executable,
        "-m",
        "fastdeploy.entrypoints.openai.api_server",
        "--model",
        model_path,
        "--port",
        str(FD_API_PORT),
        "--engine-worker-queue-port",
        str(FD_ENGINE_QUEUE_PORT),
        "--metrics-port",
        str(FD_METRICS_PORT),
        "--cache-queue-port",
        str(FD_CACHE_QUEUE_PORT),
        "--max-model-len",
        "8192",
        "--splitwise-role",
        "prefill",
        "--cache-transfer-protocol",
        "rdma",
        "--rdma-comm-ports",
        str(FD_RDMA_PORT),
        "--pd-comm-port",
        str(FD_CONNECTOR_PORT),
        "--router",
        f"0.0.0.0:{FD_ROUTER_PORT}",
    ]
    # Start subprocess in new process group
    with open(prefill_log_path, "w") as logfile:
        process_prefill = subprocess.Popen(
            prefill_cmd,
            stdout=logfile,
            stderr=subprocess.STDOUT,
            start_new_session=True,  # Enables killing full group via os.killpg
            env=env_prefill,
        )
    time.sleep(1)
    # Decode instance (GPU 1): every port is offset by +1 from the prefill instance.
    print("start decode...")
    env_decode = os.environ.copy()
    env_decode["CUDA_VISIBLE_DEVICES"] = "1"
    env_decode["FD_LOG_DIR"] = "log_decode"
    env_decode["KVCACHE_RDMA_NICS"] = rdma_nics
    decode_log_path = "decode.log"
    decode_cmd = [
        sys.executable,
        "-m",
        "fastdeploy.entrypoints.openai.api_server",
        "--model",
        model_path,
        "--port",
        str(FD_API_PORT + 1),
        "--engine-worker-queue-port",
        str(FD_ENGINE_QUEUE_PORT + 1),
        "--metrics-port",
        str(FD_METRICS_PORT + 1),
        "--cache-queue-port",
        str(FD_CACHE_QUEUE_PORT + 1),
        "--max-model-len",
        "8192",
        "--splitwise-role",
        "decode",
        "--cache-transfer-protocol",
        "rdma",
        "--rdma-comm-ports",
        str(FD_RDMA_PORT + 1),
        "--pd-comm-port",
        str(FD_CONNECTOR_PORT + 1),
        "--router",
        f"0.0.0.0:{FD_ROUTER_PORT}",
    ]
    # Start subprocess in new process group
    with open(decode_log_path, "w") as logfile:
        process_decode = subprocess.Popen(
            decode_cmd,
            stdout=logfile,
            stderr=subprocess.STDOUT,
            start_new_session=True,  # Enables killing full group via os.killpg
            env=env_decode,
        )
    # Wait up to 300 seconds (60 polls x 5 s) for both roles to register.
    for _ in range(60):
        registered_numbers = get_registered_number(f"0.0.0.0:{FD_ROUTER_PORT}")
        if registered_numbers["prefill"] >= 1 and registered_numbers["decode"] >= 1:
            print("Prefill and decode servers are both online")
            break
        time.sleep(5)
    else:
        print("[TIMEOUT] API server failed to start in 5 minutes. Cleaning up...")
        try:
            os.killpg(process_router.pid, signal.SIGTERM)
            os.killpg(process_prefill.pid, signal.SIGTERM)
            os.killpg(process_decode.pid, signal.SIGTERM)
            clean(PORTS_TO_CLEAN)
        except Exception as e:
            print(f"Failed to kill process group: {e}")
        raise RuntimeError(f"API server did not start on port {FD_API_PORT}")
    yield  # Run tests
    print("\n===== Post-test server cleanup... =====")
    try:
        os.killpg(process_router.pid, signal.SIGTERM)
        os.killpg(process_prefill.pid, signal.SIGTERM)
        os.killpg(process_decode.pid, signal.SIGTERM)
        clean(PORTS_TO_CLEAN)
        print(f"Prefill server (pid={process_prefill.pid}) terminated")
        print(f"Decode server (pid={process_decode.pid}) terminated")
    except Exception as e:
        print(f"Failed to terminate API server: {e}")
@pytest.fixture(scope="session")
def api_url(request):
    """Session-scoped fixture: chat-completions endpoint served via the router."""
    endpoint = "/v1/chat/completions"
    return "http://0.0.0.0:{}{}".format(FD_ROUTER_PORT, endpoint)
@pytest.fixture(scope="session")
def metrics_url(request):
    """Session-scoped fixture: Prometheus metrics endpoint of the prefill instance."""
    return "http://0.0.0.0:{}/metrics".format(FD_METRICS_PORT)
@pytest.fixture
def headers():
    """Common HTTP request headers shared by the tests."""
    common_headers = {"Content-Type": "application/json"}
    return common_headers
def test_metrics_config(metrics_url):
    """The config-info endpoint (same port as /metrics) should answer with HTTP 200."""
    config_url = metrics_url.replace("metrics", "config-info")
    response = requests.get(config_url, timeout=600)
    assert response.status_code == 200
def send_request(url, payload, timeout=600):
    """
    POST *payload* as JSON to *url*.

    Returns the `requests.Response` on success, or None on timeout /
    connection failure (errors are printed, not raised).
    """
    json_headers = {
        "Content-Type": "application/json",
    }
    try:
        response = requests.post(url, headers=json_headers, json=payload, timeout=timeout)
    except requests.exceptions.Timeout:
        print(f"❌ 请求超时(超过 {timeout} 秒)")
        return None
    except requests.exceptions.RequestException as e:
        print(f"❌ 请求失败:{e}")
        return None
    print("🟢 接收响应中...\n")
    return response
def get_stream_chunks(response):
    """Parse an SSE-style streaming response into a list of chunk dicts.

    Stops at the "[DONE]" sentinel; on a non-200 response, logs and
    returns an empty list.
    """
    chunks = []
    if response.status_code != 200:
        print(f"请求失败,状态码: {response.status_code}")
        print("返回内容:", response.text)
        return chunks
    for raw_line in response.iter_lines(decode_unicode=True):
        if not raw_line:
            continue
        # Strip the SSE "data: " prefix when present.
        body = raw_line[len("data: "):] if raw_line.startswith("data: ") else raw_line
        if body.strip() == "[DONE]":
            break
        try:
            chunks.append(json.loads(body))
        except Exception as e:
            print(f"解析失败: {e}, 行内容: {body}")
    return chunks
def test_chat_usage_stream(api_url):
    """Streaming chat completion: verify content and usage accounting in the final chunk."""
    payload = {
        "model": "default",
        "temperature": 0,
        "top_p": 0,
        "seed": 33,
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "牛顿的三大运动定律是什么?"},
        ],
        "max_tokens": 50,
        "stream": True,
        "stream_options": {"include_usage": True, "continuous_usage_stats": True},
        "metadata": {"min_tokens": 10},
    }
    chunks = get_stream_chunks(send_request(url=api_url, payload=payload))
    # All chunks except the trailing usage chunk carry content deltas.
    pieces = [chunk["choices"][0]["delta"]["content"] for chunk in chunks[:-1]]
    result = "".join(pieces)
    print("Decode Response:", result)
    assert result != "", "结果为空"
    usage = chunks[-1]["usage"]
    expected_total = usage["completion_tokens"] + usage["prompt_tokens"]
    assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens"
    assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens"
    assert usage["total_tokens"] == expected_total, "total_tokens不等于prompt_tokens + completion_tokens"
def test_chat_usage_non_stream(api_url):
    """Non-streaming chat completion: verify content and usage accounting."""
    payload = {
        "model": "default",
        "temperature": 0,
        "top_p": 0,
        "seed": 33,
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "牛顿的三大运动定律是什么?"},
        ],
        "max_tokens": 50,
        "stream": False,
        "metadata": {"min_tokens": 10},
    }
    body = send_request(url=api_url, payload=payload).json()
    usage = body["usage"]
    result = body["choices"][0]["message"]["content"]
    assert result != "", "结果为空"
    expected_total = usage["completion_tokens"] + usage["prompt_tokens"]
    assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens"
    assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens"
    assert usage["total_tokens"] == expected_total, "total_tokens不等于prompt_tokens + completion_tokens"
def test_non_chat_usage_stream(api_url):
    """Streaming plain completion (non-chat): verify content and usage accounting."""
    payload = {
        "model": "default",
        "temperature": 0,
        "top_p": 0,
        "seed": 33,
        "prompt": "牛顿的三大运动定律是什么?",
        "max_tokens": 50,
        "stream": True,
        "stream_options": {"include_usage": True, "continuous_usage_stats": True},
        "metadata": {"min_tokens": 10},
    }
    # The completions endpoint lives next to chat/completions on the router.
    completions_url = api_url.replace("chat/completions", "completions")
    chunks = get_stream_chunks(send_request(url=completions_url, payload=payload))
    pieces = [chunk["choices"][0]["text"] for chunk in chunks[:-1]]
    result = "".join(pieces)
    print("Decode Response:", result)
    assert result != "", "结果为空"
    usage = chunks[-1]["usage"]
    expected_total = usage["completion_tokens"] + usage["prompt_tokens"]
    assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens"
    assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens"
    assert usage["total_tokens"] == expected_total, "total_tokens不等于prompt_tokens + completion_tokens"
def test_non_chat_usage_non_stream(api_url):
    """Non-streaming plain completion (non-chat): verify content and usage accounting."""
    payload = {
        "model": "default",
        "temperature": 0,
        "top_p": 0,
        "seed": 33,
        "prompt": "牛顿的三大运动定律是什么?",
        "max_tokens": 50,
        "stream": False,
        "metadata": {"min_tokens": 10},
    }
    completions_url = api_url.replace("chat/completions", "completions")
    body = send_request(url=completions_url, payload=payload).json()
    usage = body["usage"]
    result = body["choices"][0]["text"]
    print("Decode Response:", result)
    assert result != "", "结果为空"
    expected_total = usage["completion_tokens"] + usage["prompt_tokens"]
    assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens"
    assert payload["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens小于min_tokens"
    assert usage["total_tokens"] == expected_total, "total_tokens不等于prompt_tokens + completion_tokens"

View File

@@ -30,7 +30,7 @@ from utils.serving_utils import (
FD_CACHE_QUEUE_PORT,
FD_ENGINE_QUEUE_PORT,
FD_METRICS_PORT,
clean_ports,
clean,
is_port_open,
)
@@ -64,7 +64,7 @@ def setup_and_run_server():
- Tears down server after all tests finish
"""
print("Pre-test port cleanup...")
clean_ports(PORTS_TO_CLEAN)
clean(PORTS_TO_CLEAN)
print("log dir clean ")
if os.path.exists("log_redis") and os.path.isdir("log_redis"):
@@ -109,7 +109,7 @@ def setup_and_run_server():
env_prefill["CUDA_VISIBLE_DEVICES"] = "0"
env_prefill["ENABLE_V1_KVCACHE_SCHEDULER"] = "0"
env_prefill["FD_LOG_DIR"] = "log_prefill"
prefill_log_path = "server.log"
prefill_log_path = "server_prefill.log"
prefill_cmd = [
sys.executable,
"-m",
@@ -163,7 +163,7 @@ def setup_and_run_server():
env_decode["CUDA_VISIBLE_DEVICES"] = "1"
env_decode["ENABLE_V1_KVCACHE_SCHEDULER"] = "0"
env_decode["FD_LOG_DIR"] = "log_decode"
decode_log_path = "decode_server.log"
decode_log_path = "server_decode.log"
decode_cmd = [
sys.executable,
"-m",
@@ -222,7 +222,7 @@ def setup_and_run_server():
try:
os.killpg(process_prefill.pid, signal.SIGTERM)
os.killpg(process_decode.pid, signal.SIGTERM)
clean_ports(PORTS_TO_CLEAN)
clean(PORTS_TO_CLEAN)
except Exception as e:
print(f"Failed to kill process group: {e}")
raise RuntimeError(f"API server did not start on port {FD_API_PORT}")
@@ -234,7 +234,7 @@ def setup_and_run_server():
os.killpg(process_redis.pid, signal.SIGTERM)
os.killpg(process_prefill.pid, signal.SIGTERM)
os.killpg(process_decode.pid, signal.SIGTERM)
clean_ports(PORTS_TO_CLEAN)
clean(PORTS_TO_CLEAN)
print(f"Prefill server (pid={process_prefill.pid}) terminated")
print(f"Decode server (pid={process_decode.pid}) terminated")
except Exception as e:

View File

@@ -18,19 +18,21 @@ import json
import os
import shutil
import signal
import socket
import subprocess
import sys
import time
import pytest
import requests
from utils.serving_utils import (
FD_API_PORT,
FD_CACHE_QUEUE_PORT,
FD_ENGINE_QUEUE_PORT,
FD_METRICS_PORT,
clean,
)
# Read ports from environment variables; use default values if not set
FD_API_PORT = int(os.getenv("FD_API_PORT", 8188))
FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133))
FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233))
FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333))
FD_ROUTER_PORT = int(os.getenv("FD_ROUTER_PORT", 8533))
# List of ports to clean before and after tests
@@ -47,42 +49,6 @@ PORTS_TO_CLEAN = [
]
def is_port_open(host: str, port: int, timeout=1.0):
"""
Check if a TCP port is open on the given host.
Returns True if connection succeeds, False otherwise.
"""
try:
with socket.create_connection((host, port), timeout):
return True
except Exception:
return False
def check_service_health(base_url: str, timeout: int = 3) -> bool:
"""
Check the health status of a service.
Args:
base_url (str): The base URL of the service, e.g. "http://127.0.0.1:8080"
timeout (int): Request timeout in seconds.
Returns:
bool: True if the service is healthy, False otherwise.
"""
if not base_url.startswith("http"):
base_url = f"http://{base_url}"
url = f"{base_url.rstrip('/')}/health"
try:
resp = requests.get(url, timeout=timeout)
if resp.status_code == 200:
return True
else:
return False
except Exception:
return False
def get_registered_number(router_url) -> list:
"""
Get the number of registered models in the router.
@@ -104,76 +70,6 @@ def get_registered_number(router_url) -> list:
return {"mixed": 0, "prefill": 0, "decode": 0}
def kill_process_on_port(port: int):
"""
Kill processes that are listening on the given port.
Uses multiple methods to ensure thorough cleanup.
"""
current_pid = os.getpid()
parent_pid = os.getppid()
# Method 1: Use lsof to find processes
try:
output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
for pid in output.splitlines():
pid = int(pid)
if pid in (current_pid, parent_pid):
print(f"Skip killing current process (pid={pid}) on port {port}")
continue
try:
# First try SIGTERM for graceful shutdown
os.kill(pid, signal.SIGTERM)
time.sleep(1)
# Then SIGKILL if still running
os.kill(pid, signal.SIGKILL)
print(f"Killed process on port {port}, pid={pid}")
except ProcessLookupError:
pass # Process already terminated
except subprocess.CalledProcessError:
pass
# Method 2: Use netstat and fuser as backup
try:
# Find processes using netstat and awk
cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1"
output = subprocess.check_output(cmd, shell=True).decode().strip()
for pid in output.splitlines():
if pid and pid.isdigit():
pid = int(pid)
if pid in (current_pid, parent_pid):
continue
try:
os.kill(pid, signal.SIGKILL)
print(f"Killed process (netstat) on port {port}, pid={pid}")
except ProcessLookupError:
pass
except (subprocess.CalledProcessError, FileNotFoundError):
pass
# Method 3: Use fuser if available
try:
subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5)
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError):
pass
def clean_ports():
"""
Kill all processes occupying the ports listed in PORTS_TO_CLEAN.
"""
print(f"Cleaning ports: {PORTS_TO_CLEAN}")
for port in PORTS_TO_CLEAN:
kill_process_on_port(port)
# Double check and retry if ports are still in use
time.sleep(2)
for port in PORTS_TO_CLEAN:
if is_port_open("127.0.0.1", port, timeout=0.1):
print(f"Port {port} still in use, retrying cleanup...")
kill_process_on_port(port)
time.sleep(1)
@pytest.fixture(scope="session", autouse=True)
def setup_and_run_server():
"""
@@ -184,7 +80,7 @@ def setup_and_run_server():
- Tears down server after all tests finish
"""
print("Pre-test port cleanup...")
clean_ports()
clean(PORTS_TO_CLEAN)
print("log dir clean ")
if os.path.exists("log_router") and os.path.isdir("log_router"):
@@ -323,9 +219,10 @@ def setup_and_run_server():
else:
print("[TIMEOUT] API server failed to start in 5 minutes. Cleaning up...")
try:
os.killpg(process_router.pid, signal.SIGTERM)
os.killpg(process_server_0.pid, signal.SIGTERM)
os.killpg(process_server_1.pid, signal.SIGTERM)
clean_ports()
clean(PORTS_TO_CLEAN)
except Exception as e:
print(f"Failed to kill process group: {e}")
raise RuntimeError(f"API server did not start on port {FD_API_PORT}")
@@ -337,7 +234,7 @@ def setup_and_run_server():
os.killpg(process_router.pid, signal.SIGTERM)
os.killpg(process_server_0.pid, signal.SIGTERM)
os.killpg(process_server_1.pid, signal.SIGTERM)
clean_ports()
clean(PORTS_TO_CLEAN)
print(f"server (pid={process_server_0.pid}) terminated")
print(f"server (pid={process_server_1.pid}) terminated")
except Exception as e:

View File

@@ -27,7 +27,7 @@ from utils.serving_utils import (
FD_CACHE_QUEUE_PORT,
FD_ENGINE_QUEUE_PORT,
FD_METRICS_PORT,
clean_ports,
clean,
is_port_open,
)
@@ -42,7 +42,7 @@ def setup_and_run_server():
- Tears down server after all tests finish
"""
print("Pre-test port cleanup...")
clean_ports()
clean()
print("log dir clean ")
if os.path.exists("log") and os.path.isdir("log"):
@@ -107,7 +107,7 @@ def setup_and_run_server():
print("[TIMEOUT] API server failed to start in 5 minutes. Cleaning up...")
try:
os.killpg(process.pid, signal.SIGTERM)
clean_ports()
clean()
except Exception as e:
print(f"Failed to kill process group: {e}")
raise RuntimeError(f"API server did not start on port {FD_API_PORT}")
@@ -117,7 +117,7 @@ def setup_and_run_server():
print("\n===== Post-test server cleanup... =====")
try:
os.killpg(process.pid, signal.SIGTERM)
clean_ports()
clean()
print(f"server (pid={process.pid}) terminated")
except Exception as e:
print(f"Failed to terminate API server: {e}")

View File

@@ -0,0 +1,229 @@
#!/bin/bash
# Detect RDMA / socket NICs for the requested device type ($1) and print the
# matching environment assignment (KVCACHE_RDMA_NICS=... or KV_CACHE_SOCKET_IFNAME=...).
# NOTE: this file is a copy of fastdeploy/scripts/get_rdma_nics.sh — keep them in sync.
Cur_Dir=$(cd `dirname $0`; pwd)
NICNAME_TYPE=xgbe # default NIC name prefix to probe; overridden per family in __main__
type=$1
# Use a per-DP-rank scratch file when EP/DP is enabled so concurrent
# invocations do not race on the same file.
if [ "$ENABLE_EP_DP" == "1" ]; then
    gpu_root_port_filename="${Cur_Dir}/gpu_rootport_${DP_RANK}.txt"
else
    gpu_root_port_filename="${Cur_Dir}/gpu_rootport.txt"
fi
function __NEW_GPU_ROOTPORT_FILE__() {
    # (Re)create the scratch file listing the PCI root ports of all GPU-like
    # devices, one per line; used later to classify NICs as GPU- or CPU-attached.
    touch ${gpu_root_port_filename} 2>/dev/null
    echo "" > ${gpu_root_port_filename} 2>/dev/null
    lspci 2>/dev/null | grep -iE "Communication controller: | controller: NVIDIA" | awk '{print $1}' | \
    while read -r gpu_bus
    do
        readlink "/sys/bus/pci/devices/0000:${gpu_bus}" 2>/dev/null | awk -F [/] '{print $6}' >> ${gpu_root_port_filename}
    done
}
function __RM_GPU_ROOTPORT_FILE__() {
    # Drop the scratch root-port file created by __NEW_GPU_ROOTPORT_FILE__.
    rm -rf "${gpu_root_port_filename}" 2>/dev/null
}
function __JUDGE_NIC_TYPE__() {
    # Scan every ${NICNAME_TYPE}<n> interface, classify it as GPU_NIC or
    # CPU_NIC by comparing its PCI root port with the GPU root-port list,
    # then print the env assignment for the requested $type:
    #   gpu    -> KVCACHE_RDMA_NICS=<ibdev,...>        (GPU-attached, link UP)
    #   xpu    -> KVCACHE_RDMA_NICS=<ibdev,ibdev,...>  (each device listed twice)
    #   cpu    -> KV_CACHE_SOCKET_IFNAME=<iface>       (first non-GPU NIC with an IP)
    #   cpu_ib -> KVCACHE_RDMA_NICS=<ibdev,...>        (CPU-attached RDMA NICs)
    XGBE_NUM=$(ip a 2>/dev/null | grep -c ": ${NICNAME_TYPE}")
    gpu_first=true
    xpu_first=true
    cpu_first=true
    # BUGFIX: cpu_ib_first was never initialized (only cpu_first was), so the
    # cpu_ib branch below tested an empty/stale variable.
    cpu_ib_first=true
    for (( xgbe_no=0; xgbe_no < XGBE_NUM; xgbe_no++ ))
    do
        [ ! -d "/sys/class/net/${NICNAME_TYPE}${xgbe_no}" ] && continue
        PCI_ADDRESS=$(ethtool -i "${NICNAME_TYPE}${xgbe_no}" 2>/dev/null | awk -F '0000:' '/bus-info/{print $2}')
        [ -z "$PCI_ADDRESS" ] && continue
        NIC_ROOTPORT=$(readlink "/sys/bus/pci/devices/0000:${PCI_ADDRESS}" 2>/dev/null | awk -F '/' '{print $6}')
        NIC_TYPE="CPU_NIC"
        grep -qxF "$NIC_ROOTPORT" ${gpu_root_port_filename} 2>/dev/null && NIC_TYPE="GPU_NIC"
        if [[ "$type" == "gpu" && "$NIC_TYPE" == "GPU_NIC" ]]; then
            ibdev=$(ibdev2netdev 2>/dev/null | awk -v nic="${NICNAME_TYPE}${xgbe_no}" '$5 == nic {print $1}')
            if [ -n "$ibdev" ] && ip link show "${NICNAME_TYPE}${xgbe_no}" | grep -q "state UP"; then
                if $gpu_first; then
                    printf "KVCACHE_RDMA_NICS=%s" "$ibdev"
                    gpu_first=false
                else
                    printf ",%s" "$ibdev"
                fi
            fi
        fi
        if [[ "$type" == "xpu" && "$NIC_TYPE" == "GPU_NIC" ]]; then
            ibdev=$(ibdev2netdev 2>/dev/null | awk -v nic="${NICNAME_TYPE}${xgbe_no}" '$5 == nic {print $1}')
            if [ -n "$ibdev" ] && ip link show "${NICNAME_TYPE}${xgbe_no}" | grep -q "state UP"; then
                if $xpu_first; then
                    printf "KVCACHE_RDMA_NICS=%s,%s" "$ibdev" "$ibdev"
                    xpu_first=false
                else
                    printf ",%s,%s" "$ibdev" "$ibdev"
                fi
            fi
        fi
        if [[ "$type" == "cpu" ]]; then
            # BUGFIX: this nested scan previously reused xgbe_no as its loop
            # variable, clobbering the outer loop counter; use cpu_no instead.
            for (( cpu_no=0; cpu_no < XGBE_NUM; cpu_no++ ))
            do
                [ ! -d "/sys/class/net/${NICNAME_TYPE}${cpu_no}" ] && continue
                PCI_ADDRESS=$(ethtool -i "${NICNAME_TYPE}${cpu_no}" 2>/dev/null | awk -F '0000:' '/bus-info/{print $2}')
                [ -z "$PCI_ADDRESS" ] && continue
                NIC_ROOTPORT=$(readlink "/sys/bus/pci/devices/0000:${PCI_ADDRESS}" 2>/dev/null | awk -F '/' '{print $6}')
                # Skip GPU-attached NICs; we want a plain CPU NIC here.
                grep -qxF "$NIC_ROOTPORT" ${gpu_root_port_filename} 2>/dev/null && continue
                if ip link show "${NICNAME_TYPE}${cpu_no}" | grep -q "state UP" && \
                   ip a show "${NICNAME_TYPE}${cpu_no}" | grep -q "inet"; then
                    printf "KV_CACHE_SOCKET_IFNAME=%s\n" "${NICNAME_TYPE}${cpu_no}"
                    return 0
                fi
            done
            echo "ERROR: No active CPU NIC with IP found!" >&2
            return 1
        fi
        if [[ "$type" == "cpu_ib" && "$NIC_TYPE" == "CPU_NIC" ]]; then
            ibdev=$(ibdev2netdev 2>/dev/null | awk -v nic="${NICNAME_TYPE}${xgbe_no}" '$5 == nic {print $1}')
            if [ -n "$ibdev" ] && ip link show "${NICNAME_TYPE}${xgbe_no}" | grep -q "state UP" && \
               ip a show "${NICNAME_TYPE}${xgbe_no}" | grep -q "inet "; then
                if $cpu_ib_first; then
                    printf "KVCACHE_RDMA_NICS=%s" "$ibdev"
                    cpu_ib_first=false
                else
                    printf ",%s" "$ibdev"
                fi
            fi
        fi
    done
    # Terminate the comma-separated NIC list with a newline if anything was printed.
    case "$type" in
        gpu) ! $gpu_first && printf "\n" ;;
        xpu) ! $xpu_first && printf "\n" ;;
        cpu) ! $cpu_first && printf "\n" ;;
        cpu_ib) ! $cpu_ib_first && printf "\n" ;;
    esac
}
function get_vxpu_nics() {
    # Pick one "best" RDMA NIC per XPU from `xpu-smi topo -m` output,
    # preferring closer PCIe topology (PIX > NODE > SYS), then export and
    # print KVCACHE_RDMA_NICS as a comma-separated list.
    local topo_output=$(xpu-smi topo -m)
    local xpu_info=$(echo "$topo_output" | grep -E '^XPU[0-9]+')
    local nic_mapping=()
    # Build the index -> mlx device name mapping from the "NICn: mlx..." lines.
    while IFS= read -r line; do
        if [[ $line =~ NIC([0-9]+):\ +(mlx[0-9_]+) ]]; then
            local nic_idx=${BASH_REMATCH[1]}
            local nic_name=${BASH_REMATCH[2]}
            nic_mapping[$nic_idx]=$nic_name
        fi
    done < <(echo "$topo_output" | grep -E '^\s*NIC[0-9]+:')
    local nic_count=${#nic_mapping[@]}
    # Connection-type preference: PIX (same PCIe switch) beats NODE beats SYS.
    declare -A priority_map=([PIX]=2 [NODE]=1 [SYS]=0)
    local optimal_nics=()
    while IFS= read -r line; do
        local fields=($line)
        # assumes the per-NIC connectivity columns start at field index 5 in
        # each XPU row — TODO confirm against the actual `xpu-smi topo -m` layout
        local nic_start_index=5
        local max_nics=$(( ${#fields[@]} - nic_start_index ))
        local actual_nic_count=$(( max_nics < nic_count ? max_nics : nic_count ))
        local best_priority=-1
        local best_nic=""
        for ((nic_idx=0; nic_idx<actual_nic_count; nic_idx++)); do
            local conn_type=${fields[nic_idx+nic_start_index]}
            # Unknown connection types fall back to priority -1 (never chosen
            # over a known type).
            local current_priority=${priority_map[$conn_type]:--1}
            if (( current_priority > best_priority )); then
                best_priority=$current_priority
                best_nic="${nic_mapping[$nic_idx]}"
            fi
        done
        if [[ -n "$best_nic" ]]; then
            optimal_nics+=("$best_nic")
        fi
    done <<< "$xpu_info"
    # Joining with IFS=, turns the array into a comma-separated list.
    local IFS=,
    export KVCACHE_RDMA_NICS="${optimal_nics[*]}"
    echo "KVCACHE_RDMA_NICS=${optimal_nics[*]}"
}
function get_vcpu_nics() {
    # Print the first interface holding an IPv4 address in 10.0.0.0/8
    # (virtual CPU deployments expose the data-plane NIC there).
    ip -o addr show | awk '$3 == "inet" && $4 ~ /^10\./ {print "KV_CACHE_SOCKET_IFNAME="$2; exit}'
}
function __main__() {
    # Entry point: dispatch on the requested device type ($1 / $type) and
    # print the NIC environment assignment(s) to stdout.
    if [[ "$type" == "vxpu" ]]; then
        get_vxpu_nics
        return 0
    fi
    if [[ "$type" == "vcpu" ]]; then
        get_vcpu_nics
        return 0
    fi
    # Handle bonded interfaces for the CPU (socket) case first.
    if [[ "$type" == "cpu" ]]; then
        for bond in $(ls -d /sys/class/net/bond* 2>/dev/null); do
            bond_if=$(basename "$bond")
            if ip link show "$bond_if" | grep -q "state UP" && \
               ip a show "$bond_if" | grep -q "inet "; then
                printf "KV_CACHE_SOCKET_IFNAME=%s\n" "$bond_if"
                return 0
            fi
        done
    fi
    if [[ "$type" == "cpu_ib" ]]; then
        first=true
        for bond in $(ls -d /sys/class/net/bond* 2>/dev/null); do
            bond_if=$(basename "$bond")
            __NEW_GPU_ROOTPORT_FILE__
            ibdev=$(ibdev2netdev 2>/dev/null | grep -w "$bond_if" | awk '{print $1}')
            if [ -n "$ibdev" ] && ip link show "$bond_if" | grep -q "state UP" && \
               ip a show "$bond_if" | grep -q "inet "; then
                if $first; then
                    printf "KVCACHE_RDMA_NICS=%s" "$ibdev"
                    first=false
                else
                    printf ",%s" "$ibdev"
                fi
            fi
            # Fallback: bonded RDMA devices that only show up via show_gids.
            bondib=$(show_gids 2>/dev/null | grep -w "$bond_if" | awk '{print $1}' | grep "mlx.*bond" | head -1)
            if [ -n "$bondib" ] && ip link show "$bond_if" | grep -q "state UP" && \
               ip a show "$bond_if" | grep -q "inet " && $first; then
                printf "KVCACHE_RDMA_NICS=%s" "$bondib"
                first=false
            fi
            __RM_GPU_ROOTPORT_FILE__
        done
        ! $first && printf "\n"
        # BUGFIX: the original `[ ! $first ] && return 0` is always false for
        # the non-empty strings "true"/"false" ([ ! STRING ] tests emptiness),
        # so the early return never fired and the generic scan below ran again.
        # Test the boolean command itself instead.
        if ! $first; then
            return 0
        fi
    fi
    # Generic path: probe each NIC-name family present on the host.
    local nic_types=("eth" "ib" "xgbe")
    for nt in "${nic_types[@]}"; do
        if ip a | grep -iq "$nt"; then
            __NEW_GPU_ROOTPORT_FILE__
            NICNAME_TYPE=$nt
            __JUDGE_NIC_TYPE__
            __RM_GPU_ROOTPORT_FILE__
        fi
    done
}
__main__

View File

@@ -28,6 +28,17 @@ def is_port_open(host: str, port: int, timeout=1.0):
return False
def _clean_cuda_process():
"""
Kill processes that are using CUDA devices.
NOTE: Do not call this function directly, use the `clean` function instead.
"""
try:
subprocess.run("fuser -k /dev/nvidia*", shell=True, timeout=5)
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError):
pass
def kill_process_on_port(port: int):
"""
Kill processes that are listening on the given port.
@@ -101,6 +112,19 @@ def clean_ports(ports=None):
time.sleep(1)
def clean(ports=None):
"""
Clean up resources used during testing.
"""
clean_ports(ports)
# Clean CUDA devices before and after tests.
# NOTE: It is dangerous to use this flag on development machines, as it may kill other processes
clean_cuda = int(os.getenv("CLEAN_CUDA", "0")) == 1
if clean_cuda:
_clean_cuda_process()
def check_service_health(base_url: str, timeout: int = 3) -> bool:
"""
Check the health status of a service.