[CI][XPU] add pd disaggregation (#5179)

* [CI][XPU] add pd disaggregation

* Clarify comments and install iproute2

Updated comments to clarify script purpose and added installation of iproute2.

---------

Co-authored-by: ddchenhao66 <dhaochen163.com>
Co-authored-by: Jiaxin Sui <95567040+plusNew001@users.noreply.github.com>
This commit is contained in:
ddchenhao66
2025-11-28 10:44:27 +08:00
committed by GitHub
parent aba4fc657f
commit fc88eebc32
2 changed files with 171 additions and 1 deletions

View File

@@ -2,8 +2,9 @@
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
echo "$DIR"
#安装lsof工具
#安装ci必要工具
apt install -y lsof
apt-get install -y iproute2
#先kill一遍
function stop_processes() {
@@ -297,6 +298,132 @@ if [ ${vl_test_exit_code} -ne 0 ]; then
exit 1
fi
echo "============================开始PD分离测试!============================"
export port_num=$((8188 + XPU_ID * 100))
# 起router
export FD_LOG_DIR="log_router"
mkdir -p ${FD_LOG_DIR}
nohup python -m fastdeploy.router.launch \
--port ${port_num} \
--splitwise \
2>&1 >${FD_LOG_DIR}/nohup &
sleep 1
# pd相关环境变量设置
export KVCACHE_GDRCOPY_FLUSH_ENABLE=1
export $(bash $DIR/get_rdma_nics.sh xpu)
echo "KVCACHE_RDMA_NICS:${KVCACHE_RDMA_NICS}"
export CUDA_ENABLE_P2P_NO_UVA=1 # 开启peer mem
if [ -z "${KVCACHE_RDMA_NICS}" ]; then
echo "KVCACHE_RDMA_NICS is empty, please check the output of get_rdma_nics.sh"
exit 1
fi
# 起P节点
export FD_LOG_DIR="log_prefill"
mkdir -p ${FD_LOG_DIR}
if [[ "$XPU_ID" == "0" ]]; then
export XPU_VISIBLE_DEVICES="0"
else
export XPU_VISIBLE_DEVICES="4"
fi
nohup python -m fastdeploy.entrypoints.openai.api_server \
--model ${MODEL_PATH}/ERNIE-4.5-0.3B-Paddle \
--port $((port_num+11)) \
--metrics-port $((port_num+12)) \
--engine-worker-queue-port $((port_num+13)) \
--cache-queue-port $((port_num+14)) \
--tensor-parallel-size 1 \
--max-model-len 32768 \
--splitwise-role "prefill" \
--cache-transfer-protocol "rdma" \
--rdma-comm-ports $((port_num+15)) \
--pd-comm-port $((port_num+16)) \
--router "0.0.0.0:$((port_num))" \
2>&1 >${FD_LOG_DIR}/nohup &
# 起D节点
export FD_LOG_DIR="log_decode"
mkdir -p ${FD_LOG_DIR}
if [[ "$XPU_ID" == "0" ]]; then
export XPU_VISIBLE_DEVICES="1"
else
export XPU_VISIBLE_DEVICES="5"
fi
nohup python -m fastdeploy.entrypoints.openai.api_server \
--model ${MODEL_PATH}/ERNIE-4.5-0.3B-Paddle \
--port $((port_num+21)) \
--metrics-port $((port_num+22)) \
--engine-worker-queue-port $((port_num+23)) \
--cache-queue-port $((port_num+24)) \
--tensor-parallel-size 1 \
--max-model-len 32768 \
--splitwise-role "decode" \
--cache-transfer-protocol "rdma" \
--rdma-comm-ports $((port_num+25)) \
--pd-comm-port $((port_num+26)) \
--router "0.0.0.0:${port_num}" \
2>&1 >${FD_LOG_DIR}/nohup &
sleep 60
# 探活
TIMEOUT=$((10 * 60))
INTERVAL=10 # 检查间隔(秒)
ENDPOINT_P="http://0.0.0.0:$((port_num+11))/health"
ENDPOINT_D="http://0.0.0.0:$((port_num+21))/health"
START_TIME=$(date +%s) # 记录开始时间戳
echo "开始服务健康检查,最长等待时间:${TIMEOUT}"
while true; do
# 计算已耗时
CURRENT_TIME=$(date +%s)
ELAPSED=$((CURRENT_TIME - START_TIME))
# 超时判断
if [ $ELAPSED -ge $TIMEOUT ]; then
echo -e "\n服务启动超时经过 $((TIMEOUT/60)) 分钟服务仍未启动!"
stop_processes
echo "log_prefill/nohup"
cat log_prefill/nohup
echo "log_decode/nohup"
cat log_decode/nohup
exit 1
fi
HTTP_CODE_P=$(curl -s -o /dev/null -w "%{http_code}" -m 2 "$ENDPOINT_P" || true)
HTTP_CODE_D=$(curl -s -o /dev/null -w "%{http_code}" -m 2 "$ENDPOINT_D" || true)
echo -e "\r服务健康检查中... 已等待 ${ELAPSED}当前状态码P节点:${HTTP_CODE_P}D节点:${HTTP_CODE_D}"
if [ "$HTTP_CODE_P" = "200" ] && [ "$HTTP_CODE_D" = "200" ]; then
echo -e "\n服务启动成功耗时 ${ELAPSED}"
break
else
sleep $INTERVAL
fi
done
# 执行服务化推理
python tests/ci_use/XPU_45T/run_pd.py
pd_test_exit_code=$?
echo pd_test_exit_code is ${pd_test_exit_code}
stop_processes >kill.log 2>&1
if [ ${pd_test_exit_code} -ne 0 ]; then
echo "log_prefill/nohup"
cat log_prefill/nohup
echo "log_decode/nohup"
cat log_decode/nohup
echo " pd分离模型 测试失败请检查pr代码"
exit 1
fi
# pd相关环境变量重置
unset KVCACHE_GDRCOPY_FLUSH_ENABLE
unset KVCACHE_RDMA_NICS
unset CUDA_ENABLE_P2P_NO_UVA
echo "============================开始 EP4TP4 在线服务测试!============================"
sleep 5

View File

@@ -0,0 +1,43 @@
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import openai
def test_45t():
ip = "0.0.0.0"
xpu_id = int(os.getenv("XPU_ID", "0"))
service_http_port = 8188 + xpu_id * 100 # 服务配置的
client = openai.Client(base_url=f"http://{ip}:{service_http_port}/v1", api_key="EMPTY_API_KEY")
# base_response_110 = "你好!我是一个基于人工智能技术开发的助手,可以帮你解答问题、提供建议、聊天交流或者完成一些任务。无论是学习、工作还是生活中的疑问,都可以随时告诉我哦~😊 你有什么想聊的吗?"
# base_response_104 = "你好!我是一个基于人工智能技术打造的助手,可以帮你解答问题、提供建议、分享知识,或者陪你聊聊天~😊 无论是学习、工作、生活还是娱乐相关的问题,都可以随时告诉我哦!你今天有什么想聊的吗?"
# 非流式对话
response = client.chat.completions.create(
model="default",
messages=[
{"role": "user", "content": "你好,你是谁?"},
],
temperature=1,
top_p=0,
max_tokens=64,
stream=False,
)
print(response.choices[0].message.content)
# print(base_response)
assert any(keyword in response.choices[0].message.content for keyword in ["AI", "伙伴"])
if __name__ == "__main__":
test_45t()