From 8e0f4dfd0cf70a0a67aca59629089b4a193404bb Mon Sep 17 00:00:00 2001 From: Jiaxin Sui <95567040+plusNew001@users.noreply.github.com> Date: Tue, 2 Dec 2025 17:15:51 +0800 Subject: [PATCH] [XPU] [CI] Xpu Ci Refactor (#5252) * add xpu ci * add case * add case * fix ci bug * Update Docker image tag to 'latest' in CI workflow * Fix set -e usage in run_xpu_ci_pytest.sh * add pd case * add case * Configure pip to use Tsinghua mirror for dependencies Set the global pip index URL to Tsinghua mirror. * fix ci bug * fix bug * fix bug --------- Co-authored-by: suijiaxin Co-authored-by: root Co-authored-by: root --- .github/workflows/ci_xpu.yml | 2 +- scripts/run_xpu_ci_pytest.sh | 123 ++++++++ tests/cov_pytest.ini | 1 + tests/xpu_ci/README.md | 356 +++++++++++++++++++++++ tests/xpu_ci/conftest.py | 430 ++++++++++++++++++++++++++++ tests/xpu_ci/test_ep4tp1_online.py | 126 ++++++++ tests/xpu_ci/test_ep4tp4_all2all.py | 128 +++++++++ tests/xpu_ci/test_ep4tp4_online.py | 128 +++++++++ tests/xpu_ci/test_pd_separation.py | 330 +++++++++++++++++++++ tests/xpu_ci/test_v1_mode.py | 103 +++++++ tests/xpu_ci/test_vl_model.py | 117 ++++++++ tests/xpu_ci/test_w4a8.py | 101 +++++++ 12 files changed, 1944 insertions(+), 1 deletion(-) create mode 100644 scripts/run_xpu_ci_pytest.sh create mode 100644 tests/xpu_ci/README.md create mode 100644 tests/xpu_ci/conftest.py create mode 100644 tests/xpu_ci/test_ep4tp1_online.py create mode 100644 tests/xpu_ci/test_ep4tp4_all2all.py create mode 100644 tests/xpu_ci/test_ep4tp4_online.py create mode 100644 tests/xpu_ci/test_pd_separation.py create mode 100644 tests/xpu_ci/test_v1_mode.py create mode 100644 tests/xpu_ci/test_vl_model.py create mode 100644 tests/xpu_ci/test_w4a8.py diff --git a/.github/workflows/ci_xpu.yml b/.github/workflows/ci_xpu.yml index 9b9438361..86d4b7679 100644 --- a/.github/workflows/ci_xpu.yml +++ b/.github/workflows/ci_xpu.yml @@ -82,5 +82,5 @@ jobs: ${docker_image} /bin/bash -c " git config --global --add safe.directory /workspace/FastDeploy cd FastDeploy - bash scripts/run_ci_xpu.sh + bash scripts/run_xpu_ci_pytest.sh " diff --git a/scripts/run_xpu_ci_pytest.sh b/scripts/run_xpu_ci_pytest.sh new file mode 100644 index 000000000..f9163bcee --- /dev/null +++ b/scripts/run_xpu_ci_pytest.sh @@ -0,0 +1,123 @@ +#!/bin/bash +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# XPU CI测试入口脚本 - 基于pytest框架 +# +# 使用方法: +# bash scripts/run_xpu_ci_pytest.sh +# +# 环境变量: +# XPU_ID: XPU设备ID(0或1) +# MODEL_PATH: 模型路径 + +set +e +DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +echo "脚本目录: $DIR" + +# ============ 环境准备阶段 ============ + +echo "============================环境准备============================" + +# 安装lsof工具 +echo "安装lsof工具..." +apt install -y lsof + +# 设置XPU_VISIBLE_DEVICES +if [[ "$XPU_ID" == "0" ]]; then + export XPU_VISIBLE_DEVICES="0,1,2,3" +else + export XPU_VISIBLE_DEVICES="4,5,6,7" +fi +echo "XPU_VISIBLE_DEVICES=$XPU_VISIBLE_DEVICES" + +# 下载和安装xre +echo "下载和安装xre..." 
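+# 注(说明性注释): 下面的 xre 版本号(5.0.21.21)和下载地址为当前 CI 环境所验证的版本,
+# 升级 XPU 驱动时需同步更新此处的 URL 与目录名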
+mkdir -p /workspace/deps
+cd /workspace/deps
+if [ ! -d "xre" ]; then
+    wget -q https://klx-sdk-release-public.su.bcebos.com/xre/kl3-release/5.0.21.21/xre-Linux-x86_64-5.0.21.21.tar.gz
+    tar -zxf xre-Linux-x86_64-5.0.21.21.tar.gz && mv xre-Linux-x86_64-5.0.21.21 xre
+fi
+cd -
+export PATH=/workspace/deps/xre/bin:$PATH
+
+# 重启XPU卡
+echo "重启XPU卡..."
+xpu-smi -r -i $XPU_VISIBLE_DEVICES
+xpu-smi
+set -e
+# ============ Python环境配置 ============
+
+echo "============================Python环境配置============================"
+
+# 安装Python依赖
+echo "安装Python依赖..."
+python -m pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
+python -m pip install -r requirements.txt
+
+# 卸载旧版本
+echo "卸载旧版本..."
+python -m pip uninstall paddlepaddle-xpu -y
+python -m pip uninstall fastdeploy-xpu -y
+
+# 安装PaddlePaddle
+echo "安装PaddlePaddle..."
+python -m pip install https://paddle-whl.bj.bcebos.com/nightly/xpu-p800/paddlepaddle-xpu/paddlepaddle_xpu-3.3.0.dev20251123-cp310-cp310-linux_x86_64.whl
+
+# ============ 编译项目 ============
+
+echo "============================编译项目============================"
+bash custom_ops/xpu_ops/download_dependencies.sh develop
+export CLANG_PATH=$(pwd)/custom_ops/xpu_ops/third_party/xtdk
+export XVLLM_PATH=$(pwd)/custom_ops/xpu_ops/third_party/xvllm
+bash build.sh || exit 1
+
+# ============ 安装测试依赖 ============
+
+echo "============================安装测试依赖============================"
+python -m pip install openai -U
+python -m pip uninstall -y triton
+python -m pip install triton==3.3.0
+python -m pip install pytest
+python -m pip install pytest-timeout
+
+# 清除代理设置
+unset http_proxy
+unset https_proxy
+unset no_proxy
+
+# ============ 运行pytest测试 ============
+
+echo "============================开始运行pytest测试============================"
+
+# 切换到项目根目录(如果不在的话)
+cd "$(dirname "$DIR")"
+
+# 运行pytest
+# -v: 详细输出
+# -s: 不捕获输出,直接显示print内容
+# --tb=short: 简短的traceback
+# 注意: 运行pytest前需要关闭 set -e,否则测试失败时脚本会在此处直接退出,
+# 导致下面的退出码判断和失败提示不会执行
+set +e
+python -m pytest -v -s --tb=short tests/xpu_ci/
+
+# 获取pytest退出码
+exit_code=$?
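+# 可选示例: 如需让CI平台解析结构化测试报告,可在上面的pytest命令中追加
+# --junit-xml 参数(报告路径 xpu_ci_report.xml 仅为示意,可按需修改):
+#     python -m pytest -v -s --tb=short --junit-xml=xpu_ci_report.xml tests/xpu_ci/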
+
+if [ $exit_code -eq 0 ]; then
+    echo "============================所有测试通过!============================"
+else
+    echo "============================测试失败,请检查日志!============================"
+    exit $exit_code
+fi
diff --git a/tests/cov_pytest.ini b/tests/cov_pytest.ini
index 9fe77b410..af774ec0d 100644
--- a/tests/cov_pytest.ini
+++ b/tests/cov_pytest.ini
@@ -7,3 +7,4 @@ addopts =
     --ignore=tests/operators/test_w4afp8_gemm.py
     --ignore=tests/model_loader/test_w4a8_model.py
     --ignore=tests/entrypoints/test_engine_client.py
+    --ignore=tests/xpu_ci
diff --git a/tests/xpu_ci/README.md b/tests/xpu_ci/README.md
new file mode 100644
index 000000000..52de5f901
--- /dev/null
+++ b/tests/xpu_ci/README.md
@@ -0,0 +1,356 @@
+# XPU CI 测试框架
+
+基于pytest的XPU硬件CI测试框架,用于自动化测试FastDeploy在XPU硬件上的各种配置和模型。
+
+## 目录结构
+
+```
+tests/xpu_ci/
+├── README.md                  # 本说明文档
+├── conftest.py                # pytest配置文件,包含通用函数和fixture
+├── test_v1_mode.py            # V1模式测试(wint4量化)
+├── test_w4a8.py               # W4A8量化测试
+├── test_vl_model.py           # VL视觉语言模型测试
+├── test_ep4tp4_online.py      # EP4TP4在线服务测试
+├── test_ep4tp1_online.py      # EP4TP1在线服务测试
+├── test_ep4tp4_all2all.py     # EP4TP4 all2all通信测试
+└── test_pd_separation.py      # PD分离(Prefill/Decode分离)部署测试
+```
+
+## 使用方法
+
+### 运行所有测试
+
+```bash
+# 设置环境变量
+export XPU_ID=0  # 或 1
+export MODEL_PATH=/path/to/models
+
+# 运行CI测试
+bash scripts/run_xpu_ci_pytest.sh
+```
+
+### 运行单个测试
+
+```bash
+# 进入项目根目录
+cd /path/to/FastDeploy
+
+# 设置环境变量
+export XPU_ID=0
+export MODEL_PATH=/path/to/models
+
+# 运行单个测试
+python -m pytest -v -s tests/xpu_ci/test_v1_mode.py
+
+# 或者直接运行测试文件
+cd tests/xpu_ci
+python test_v1_mode.py
+```
+
+### 运行指定的测试
+
+```bash
+# 运行多个测试
+python -m pytest -v -s \
+    tests/xpu_ci/test_v1_mode.py \
+    tests/xpu_ci/test_w4a8.py
+
+# 使用pytest的过滤功能
+python -m pytest -v -s -k "v1_mode or w4a8" tests/xpu_ci/
+```
+
+## 添加新的测试Case
+
+### 步骤1: 创建新的测试文件
+
+在 `tests/xpu_ci/` 目录下创建新的测试文件,文件名必须以 `test_` 开头,例如 `test_new_feature.py`
+
+### 步骤2: 编写测试代码
+
+参考现有的测试case,复制一个最相似的测试文件作为模板。基本结构如下:
+
+```python
+# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
+# ... (许可证声明)
+
+"""
+测试说明 - 简短描述这个测试的目的
+
+测试配置:
+- 模型: 模型名称
+- 量化: 量化方式
+- 其他重要配置
+"""
+
+import openai
+import pytest
+
+# 注: xpu_env fixture由conftest.py自动提供,在测试函数参数中直接声明即可,无需导入
+from conftest import (
+    get_port_num,
+    get_model_path,
+    start_server,
+    print_logs_on_failure,
+)
+
+
+def test_new_feature(xpu_env):
+    """新功能测试"""
+
+    print("\n============================开始新功能测试!============================")
+
+    # 获取配置
+    port_num = get_port_num()
+    model_path = get_model_path()
+
+    # 构建服务器启动参数
+    server_args = [
+        "--model", f"{model_path}/YOUR_MODEL_NAME",
+        "--port", str(port_num),
+        # ... 
其他参数 + ] + + # 启动服务器 + if not start_server(server_args): + pytest.fail("服务启动失败") + + # 执行测试 + try: + ip = "0.0.0.0" + client = openai.Client( + base_url=f"http://{ip}:{port_num}/v1", + api_key="EMPTY_API_KEY" + ) + + # 调用API进行测试 + response = client.chat.completions.create( + model="default", + messages=[ + {"role": "user", "content": "你好,你是谁?"}, + ], + temperature=1, + top_p=0, + max_tokens=64, + stream=False, + ) + + print(f"\n模型回复: {response.choices[0].message.content}") + + # 验证响应 + assert "预期的关键词" in response.choices[0].message.content + + print("\n新功能测试通过!") + + except Exception as e: + print(f"\n新功能测试失败: {str(e)}") + print_logs_on_failure() + pytest.fail(f"新功能测试失败: {str(e)}") + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "-s"]) +``` + +### 步骤3: 添加到CI流程 + + `scripts/run_xpu_ci_pytest.sh`会自动扫描 tests/xpu_ci/ 目录下 test_ 开头的测试文件进行测试 + +### 步骤4: 测试验证 + +```bash +# 先单独运行新的测试case,确保能够正常工作 +python -m pytest -v -s tests/xpu_ci/test_new_feature.py + +# 然后运行完整的CI流程 +bash scripts/run_xpu_ci_pytest.sh +``` + +## 通用函数说明 + +在 `conftest.py` 中提供了以下通用函数,可以在测试中直接使用: + +### 基础配置函数 + +- `get_xpu_id()` - 获取XPU_ID环境变量 +- `get_port_num()` - 根据XPU_ID计算端口号 +- `get_model_path()` - 获取MODEL_PATH环境变量 + +### 进程管理函数 + +- `stop_processes()` - 停止所有相关进程 +- `cleanup_resources()` - 清理资源(log目录、core文件、消息队列) + +### 服务器管理函数 + +- `start_server(server_args, wait_before_check=60)` - 启动API服务器 + - `server_args`: 服务器启动参数列表 + - `wait_before_check`: 启动后等待多少秒再进行健康检查 + - 返回: bool,服务是否启动成功 + +- `wait_for_health_check(timeout=900, interval=10)` - 等待服务健康检查通过 + - `timeout`: 超时时间(秒) + - `interval`: 检查间隔(秒) + - 返回: bool,服务是否启动成功 + +### 日志函数 + +- `print_logs_on_failure()` - 失败时打印日志(server.log和workerlog.0) + +### EP并行测试函数 + +- `setup_ep_env()` - 设置EP(Expert Parallel)相关环境变量 + - 返回: dict,原始环境变量值,用于后续恢复 + +- `restore_env(original_values)` - 恢复环境变量 + - `original_values`: setup_ep_env()返回的原始环境变量值 + +- `download_and_build_xdeepep()` - 下载并编译xDeepEP(用于EP并行测试) + - 返回: bool,是否成功 + +### Pytest Fixture + +- `xpu_env` - 设置XPU环境变量的fixture + - 自动设置XPU_VISIBLE_DEVICES + - 测试结束后自动停止服务 + - 使用方法: 在测试函数参数中声明即可 + +## 测试Case模板 + +### 普通测试模板 + +用于不需要EP并行的测试: + +```python +def test_example(xpu_env): + """示例测试""" + print("\n============================开始示例测试!============================") + + port_num = get_port_num() + model_path = get_model_path() + + server_args = [ + "--model", f"{model_path}/YOUR_MODEL", + "--port", str(port_num), + # 添加其他参数... + ] + + if not start_server(server_args): + pytest.fail("服务启动失败") + + try: + # 执行测试逻辑 + client = openai.Client(base_url=f"http://0.0.0.0:{port_num}/v1", api_key="EMPTY_API_KEY") + response = client.chat.completions.create(...) + assert "预期结果" in response.choices[0].message.content + print("\n示例测试通过!") + except Exception as e: + print_logs_on_failure() + pytest.fail(f"测试失败: {str(e)}") +``` + +### EP并行测试模板 + +用于需要EP并行的测试: + +```python +def test_ep_example(xpu_env): + """EP并行示例测试""" + print("\n============================开始EP并行示例测试!============================") + + if not download_and_build_xdeepep(): + pytest.fail("xDeepEP下载或编译失败") + + original_env = setup_ep_env() + + try: + port_num = get_port_num() + model_path = get_model_path() + + server_args = [ + "--model", f"{model_path}/YOUR_MODEL", + "--enable-expert-parallel", + # 添加其他参数... + ] + + if not start_server(server_args): + pytest.fail("服务启动失败") + + # 执行测试逻辑 + client = openai.Client(base_url=f"http://0.0.0.0:{port_num}/v1", api_key="EMPTY_API_KEY") + response = client.chat.completions.create(...) 
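+        # 注: create(...) 的参数此处从略,与上面"普通测试模板"相同,
+        # 可按需传入 model、messages、temperature 等参数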
+        assert "预期结果" in response.choices[0].message.content
+        print("\nEP并行示例测试通过!")
+    except Exception as e:
+        print_logs_on_failure()
+        pytest.fail(f"测试失败: {str(e)}")
+    finally:
+        restore_env(original_env)
+```
+
+## 常见问题
+
+### 1. 如何调试单个测试?
+
+```bash
+# 使用pytest的调试选项
+python -m pytest -v -s --pdb tests/xpu_ci/test_xxx.py
+
+# 或者直接在代码中添加断点
+import pdb; pdb.set_trace()
+```
+
+### 2. 如何查看服务器日志?
+
+测试失败时会自动打印 `server.log` 和 `log/workerlog.0` 的内容。
+你也可以在测试运行时手动查看:
+
+```bash
+tail -f server.log
+tail -f log/workerlog.0
+```
+
+### 3. 如何跳过某个测试?
+
+```python
+@pytest.mark.skip(reason="暂时跳过此测试")
+def test_example(xpu_env):
+    pass
+```
+
+### 4. 如何添加超时控制?
+
+```python
+@pytest.mark.timeout(300)  # 5分钟超时
+def test_example(xpu_env):
+    pass
+```
+
+## 与旧版本的对比
+
+### 旧版本 (run_ci_xpu.sh)
+
+- 所有测试逻辑都在一个大的shell脚本中
+- 代码重复率高(每个测试都重复启动服务、健康检查等逻辑)
+- 难以维护和扩展
+- 添加新测试需要修改主脚本
+
+### 新版本 (基于pytest)
+
+- 每个测试case独立成文件
+- 通用逻辑抽象到conftest.py中
+- 易于维护和扩展
+- 添加新测试只需在 tests/xpu_ci/ 下新建 test_ 开头的文件,pytest会自动发现,无需修改主脚本
+- 支持pytest的所有功能(参数化、fixture、插件等)
+
+## 注意事项
+
+1. **环境变量**: 确保设置了 `XPU_ID` 和 `MODEL_PATH` 环境变量
+2. **端口冲突**: 每个测试会自动根据XPU_ID分配不同的端口,避免冲突
+3. **资源清理**: 使用 `xpu_env` fixture会自动清理资源,无需手动清理
+4. **测试顺序**: pytest会按文件名顺序执行测试,可以通过pytest参数调整
+5. **日志输出**: 使用 `-s` 参数可以看到print输出,方便调试
+
+## 参考资料
+
+- [pytest官方文档](https://docs.pytest.org/)
+- [pytest fixture文档](https://docs.pytest.org/en/stable/fixture.html)
+- [FastDeploy文档](https://github.com/PaddlePaddle/FastDeploy)
diff --git a/tests/xpu_ci/conftest.py b/tests/xpu_ci/conftest.py
new file mode 100644
index 000000000..0e1f60884
--- /dev/null
+++ b/tests/xpu_ci/conftest.py
@@ -0,0 +1,430 @@
+# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+XPU CI测试框架 - 通用配置和辅助函数
+
+这个文件包含了所有测试case共用的函数和fixture。
+主要功能:
+1. 进程管理 - 启动和停止API服务器
+2. 健康检查 - 等待服务启动成功
+3. 资源清理 - 清理日志、core文件、消息队列等
+4. 
环境配置 - 设置XPU相关环境变量 +""" + +import json +import os +import shutil +import subprocess +import time + +import pytest + + +def get_xpu_id(): + """获取XPU_ID环境变量""" + return int(os.getenv("XPU_ID", "0")) + + +def get_port_num(): + """根据XPU_ID计算端口号""" + xpu_id = get_xpu_id() + return 8188 + xpu_id * 100 + + +def stop_processes(): + """ + 停止所有相关进程(最小改动版,避免误杀 pytest) + """ + xpu_id = get_xpu_id() # noqa: F841 + port_num = get_port_num() + + # 获取 pytest 主进程 PID + try: + pytest_pids = subprocess.check_output("pgrep -f pytest || true", shell=True).decode().strip().split() + except subprocess.CalledProcessError: + pytest_pids = [] + + def safe_kill_cmd(cmd): + """执行 kill 命令,但排除 pytest 进程""" + try: + # 先执行命令获取到候选 PID(kill -9 替换成 cat) + list_cmd = cmd.replace("kill -9", "cat") + output = subprocess.check_output(list_cmd, shell=True, stderr=subprocess.DEVNULL).decode().strip().split() + + # 过滤:排除 pytest + safe_pids = [pid for pid in output if pid and pid not in pytest_pids] + + # 真正 kill + for pid in safe_pids: + subprocess.run(f"kill -9 {pid}", shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + except Exception: + pass + + commands = [ + "ps -efww | grep -E 'cache_transfer_manager.py' | grep -v grep | awk '{print $2}' | xargs echo", + "ps -efww | grep -E 'api_server' | grep -v grep | awk '{print $2}' | xargs echo", + f"ps -efww | grep -E '{port_num}' | grep -v grep | awk '{{print $2}}' | xargs echo", + f"lsof -t -i :{port_num} | xargs echo", + ] + + # Kill additional ports + for port in range(port_num + 10, port_num + 41): + commands.append(f"lsof -t -i :{port} | xargs echo") + + # Kill processes using netstat + commands.extend( + [ + f"netstat -tunlp 2>/dev/null | grep {port_num + 2} | awk '{{print $NF}}' | awk -F'/' '{{print $1}}' | xargs echo", + f"netstat -tunlp 2>/dev/null | grep {port_num + 2} | awk '{{print $(NF-1)}}' | cut -d/ -f1 | grep -E '^[0-9]+$' | xargs echo", + ] + ) + + for cmd in commands: + safe_kill_cmd(cmd) + + +def cleanup_resources(): + """ + 清理资源 + + 包括: + 1. 删除log目录 + 2. 删除core文件 + 3. 清空消息队列 + """ + # 删除log目录 + if os.path.exists("log"): + shutil.rmtree("log") + + # 删除core文件 + subprocess.run("rm -f core*", shell=True) + + # 清空消息队列 + subprocess.run( + "ipcrm --all=msg 2>/dev/null || true", shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL + ) + + +def wait_for_health_check(timeout=900, interval=10): + """ + 等待服务健康检查通过 + + Args: + timeout: 超时时间(秒), 默认15分钟 + interval: 检查间隔(秒), 默认10秒 + + Returns: + bool: 服务是否启动成功 + """ + port_num = get_port_num() + health_endpoint = f"http://0.0.0.0:{port_num}/health" + models_endpoint = f"http://0.0.0.0:{port_num}/v1/models" + start_time = time.time() + + print(f"开始服务健康检查,最长等待时间:{timeout}秒") + + # 第一阶段: 等待 /health 返回 200 + while True: + elapsed = int(time.time() - start_time) + + # 超时判断 + if elapsed >= timeout: + print(f"\n服务启动超时:经过 {timeout//60} 分钟服务仍未启动!") + return False + + # 发送健康检查请求 + try: + result = subprocess.run( + f'curl -s -o /dev/null -w "%{{http_code}}" -m 2 {health_endpoint}', + shell=True, + capture_output=True, + text=True, + ) + http_code = result.stdout.strip() + except Exception: + http_code = "000" + + print(f"\r服务健康检查中... 
已等待 {elapsed} 秒,当前状态码:{http_code}", end="", flush=True) + + if http_code == "200": + print(f"\n健康检查通过!耗时 {elapsed} 秒") + break + + time.sleep(interval) + + # 第二阶段: 等待 /v1/models 返回有效模型列表,确保模型完全就绪 + print("开始验证模型是否就绪...") + while True: + elapsed = int(time.time() - start_time) + + # 超时判断 + if elapsed >= timeout: + print(f"\n模型就绪超时:经过 {timeout//60} 分钟模型仍未就绪!") + return False + + # 检查模型列表 + try: + result = subprocess.run(f"curl -s -m 5 {models_endpoint}", shell=True, capture_output=True, text=True) + response = result.stdout.strip() + if response: + data = json.loads(response) + # 检查是否有模型数据 + if data.get("data") and len(data["data"]) > 0: + model_id = data["data"][0].get("id", "unknown") + print(f"\n模型就绪!模型ID: {model_id}, 总耗时 {elapsed} 秒") + return True + except (json.JSONDecodeError, Exception) as e: # noqa: F841 + pass + + print(f"\r等待模型就绪中... 已等待 {elapsed} 秒", end="", flush=True) + time.sleep(interval) + + +def print_logs_on_failure(): + """失败时打印日志""" + print("\n========== server.log ==========") + if os.path.exists("server.log"): + with open("server.log", "r") as f: + print(f.read()) + + print("\n========== log/workerlog.0 ==========") + if os.path.exists("log/workerlog.0"): + with open("log/workerlog.0", "r") as f: + print(f.read()) + + +def start_server(server_args, wait_before_check=60): + """ + 启动API服务器 + + Args: + server_args: 服务器启动参数列表 + wait_before_check: 启动后等待多少秒再进行健康检查,默认60秒 + + Returns: + bool: 服务是否启动成功 + """ + # 停止旧进程 + stop_processes() + + # 清理资源 + cleanup_resources() + + # 构建启动命令 + cmd = ["python", "-m", "fastdeploy.entrypoints.openai.api_server"] + server_args + + # 启动服务(后台运行) + with open("server.log", "w") as log_file: + subprocess.Popen(cmd, stdout=log_file, stderr=subprocess.STDOUT, start_new_session=True) + + print(f"服务启动命令: {' '.join(cmd)}") + print(f"等待 {wait_before_check} 秒...") + time.sleep(wait_before_check) + + # 健康检查 + if not wait_for_health_check(): + print_logs_on_failure() + stop_processes() + return False + + return True + + +@pytest.fixture(scope="function") +def xpu_env(): + """ + 设置XPU环境变量 + + 这个fixture会在每个测试开始时设置XPU_VISIBLE_DEVICES环境变量 + 测试结束后自动清理 + """ + xpu_id = get_xpu_id() + + # 设置XPU_VISIBLE_DEVICES + if xpu_id == 0: + os.environ["XPU_VISIBLE_DEVICES"] = "0,1,2,3" + else: + os.environ["XPU_VISIBLE_DEVICES"] = "4,5,6,7" + + print(f"\n设置环境变量: XPU_VISIBLE_DEVICES={os.environ['XPU_VISIBLE_DEVICES']}") + + yield + + # 测试结束后停止进程 + print("\n测试结束,停止服务...") + stop_processes() + + +def get_model_path(): + """获取MODEL_PATH环境变量""" + model_path = os.getenv("MODEL_PATH") + if not model_path: + raise ValueError("MODEL_PATH environment variable is not set") + return model_path + + +def setup_ep_env(): + """ + 设置EP(Expert Parallel)相关环境变量 + + Returns: + dict: 原始环境变量值,用于后续恢复 + """ + env_vars = { + "BKCL_ENABLE_XDR": "1", + "BKCL_RDMA_NICS": "xgbe1,xgbe2,xgbe3,xgbe4", + "BKCL_TRACE_TOPO": "1", + "BKCL_PCIE_RING": "1", + "XSHMEM_MODE": "1", + "XSHMEM_QP_NUM_PER_RANK": "32", + "BKCL_RDMA_VERBS": "1", + } + + # 保存原始值 + original_values = {} + for key in env_vars: + original_values[key] = os.environ.get(key) + + # 设置新值 + for key, value in env_vars.items(): + os.environ[key] = value + print(f"设置环境变量: {key}={value}") + + return original_values + + +def restore_env(original_values): + """ + 恢复环境变量 + + Args: + original_values: setup_ep_env()返回的原始环境变量值 + """ + for key, value in original_values.items(): + if value is None: + if key in os.environ: + del os.environ[key] + print(f"删除环境变量: {key}") + else: + os.environ[key] = value + print(f"恢复环境变量: {key}={value}") + + +def 
download_and_build_xdeepep(): + """下载并编译xDeepEP(用于EP并行测试)""" + if os.path.exists("xDeepEP"): + print("xDeepEP已存在,跳过下载") + return True + + print("下载xDeepEP...") + result = subprocess.run("wget -q https://paddle-qa.bj.bcebos.com/xpu_third_party/xDeepEP.tar.gz", shell=True) + if result.returncode != 0: + print("下载xDeepEP失败") + return False + + print("解压xDeepEP...") + result = subprocess.run("tar -xzf xDeepEP.tar.gz", shell=True) + if result.returncode != 0: + print("解压xDeepEP失败") + return False + + print("编译xDeepEP...") + result = subprocess.run("cd xDeepEP && bash build.sh && cd -", shell=True) + if result.returncode != 0: + print("编译xDeepEP失败") + return False + + return True + + +# ============ PD分离相关函数 ============ + + +def get_script_dir(): + """获取scripts目录路径""" + # conftest.py在tests/xpu_ci_pytest/下,scripts在项目根目录下 + current_dir = os.path.dirname(os.path.abspath(__file__)) + project_root = os.path.dirname(os.path.dirname(current_dir)) + return os.path.join(project_root, "scripts") + + +def get_rdma_nics(): + """ + 获取RDMA网卡配置 + + Returns: + str: KVCACHE_RDMA_NICS的值,失败返回空字符串 + """ + script_path = os.path.join(get_script_dir(), "get_rdma_nics.sh") + + try: + result = subprocess.run(f"bash {script_path} xpu", shell=True, capture_output=True, text=True) + output = result.stdout.strip() + # 解析 KVCACHE_RDMA_NICS=xxx 格式 + if output.startswith("KVCACHE_RDMA_NICS="): + return output.split("=", 1)[1] + return output + except Exception as e: + print(f"获取RDMA网卡失败: {e}") + return "" + + +def setup_pd_env(): + """ + 设置PD分离相关环境变量 + + Returns: + dict: 原始环境变量值,用于后续恢复 + """ + original_values = {} + env_keys = ["KVCACHE_GDRCOPY_FLUSH_ENABLE", "KVCACHE_RDMA_NICS", "CUDA_ENABLE_P2P_NO_UVA"] + + # 保存原始值 + for key in env_keys: + original_values[key] = os.environ.get(key) + + # 设置新值 + os.environ["KVCACHE_GDRCOPY_FLUSH_ENABLE"] = "1" + os.environ["CUDA_ENABLE_P2P_NO_UVA"] = "1" # 开启peer mem + print("设置环境变量: KVCACHE_GDRCOPY_FLUSH_ENABLE=1") + print("设置环境变量: CUDA_ENABLE_P2P_NO_UVA=1") + + # 获取并设置RDMA网卡 + rdma_nics = get_rdma_nics() + if rdma_nics: + os.environ["KVCACHE_RDMA_NICS"] = rdma_nics + print(f"设置环境变量: KVCACHE_RDMA_NICS={rdma_nics}") + + return original_values + + +def restore_pd_env(original_values): + """ + 恢复PD分离相关环境变量 + + Args: + original_values: setup_pd_env()返回的原始环境变量值 + """ + env_keys = ["KVCACHE_GDRCOPY_FLUSH_ENABLE", "KVCACHE_RDMA_NICS", "CUDA_ENABLE_P2P_NO_UVA"] + + for key in env_keys: + if key in original_values: + if original_values[key] is None: + if key in os.environ: + del os.environ[key] + print(f"删除环境变量: {key}") + else: + os.environ[key] = original_values[key] + print(f"恢复环境变量: {key}={original_values[key]}") diff --git a/tests/xpu_ci/test_ep4tp1_online.py b/tests/xpu_ci/test_ep4tp1_online.py new file mode 100644 index 000000000..e83ba2275 --- /dev/null +++ b/tests/xpu_ci/test_ep4tp1_online.py @@ -0,0 +1,126 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
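+
+# 注: EP测试所需的BKCL/XSHMEM等环境变量由conftest.setup_ep_env()统一设置,
+# 并在测试结束时通过restore_env()恢复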
+ +""" +EP4TP1在线服务测试 - Expert Parallel + Tensor Parallel + +测试配置: +- 模型: ERNIE-4.5-300B-A47B-Paddle +- 量化: wint4 +- Tensor Parallel: 1 +- Expert Parallel: 启用 +- Data Parallel: 4 +""" + + +import openai +import pytest +from conftest import ( + download_and_build_xdeepep, + get_model_path, + get_port_num, + print_logs_on_failure, + restore_env, + setup_ep_env, + start_server, +) + + +def test_ep4tp1_online(xpu_env): + """EP4TP1在线服务测试""" + + print("\n============================开始 EP4TP1 在线服务测试!============================") + + # 下载并编译xDeepEP + if not download_and_build_xdeepep(): + pytest.fail("xDeepEP下载或编译失败") + + # 设置EP环境变量 + original_env = setup_ep_env() + + try: + # 获取配置 + port_num = get_port_num() + model_path = get_model_path() + + # 构建服务器启动参数 + server_args = [ + "--model", + f"{model_path}/ERNIE-4.5-300B-A47B-Paddle", + "--port", + str(port_num), + "--tensor-parallel-size", + "1", + "--enable-expert-parallel", + "--data-parallel-size", + "4", + "--max-model-len", + "32768", + "--max-num-seqs", + "64", + "--quantization", + "wint4", + "--engine-worker-queue-port", + f"{port_num + 10},{port_num + 20},{port_num + 30},{port_num + 40}", + "--metrics-port", + str(port_num + 2), + "--cache-queue-port", + str(port_num + 47873), + "--gpu-memory-utilization", + "0.9", + "--load-choices", + "default", + ] + + # 启动服务器 + if not start_server(server_args): + pytest.fail("EP4TP1在线服务启动失败") + + # 执行测试 + ip = "0.0.0.0" + client = openai.Client(base_url=f"http://{ip}:{port_num}/v1", api_key="EMPTY_API_KEY") + + # 非流式对话 + response = client.chat.completions.create( + model="default", + messages=[ + {"role": "user", "content": "你好,你是谁?"}, + ], + temperature=1, + top_p=0, + max_tokens=64, + stream=False, + ) + + print(f"\n模型回复: {response.choices[0].message.content}") + + # 验证响应 + assert any( + keyword in response.choices[0].message.content for keyword in ["人工智能", "文心一言", "百度", "智能助手"] + ), f"响应内容不符合预期: {response.choices[0].message.content}" + + print("\nEP4TP1在线服务测试通过!") + + except Exception as e: + print(f"\nEP4TP1在线服务测试失败: {str(e)}") + print_logs_on_failure() + pytest.fail(f"EP4TP1在线服务测试失败: {str(e)}") + + finally: + # 恢复环境变量 + restore_env(original_env) + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "-s"]) diff --git a/tests/xpu_ci/test_ep4tp4_all2all.py b/tests/xpu_ci/test_ep4tp4_all2all.py new file mode 100644 index 000000000..e8e42aa22 --- /dev/null +++ b/tests/xpu_ci/test_ep4tp4_all2all.py @@ -0,0 +1,128 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +EP4TP4 all2all测试 - Expert Parallel + Tensor Parallel (all2all通信) + +测试配置: +- 模型: ERNIE-4.5-300B-A47B-Paddle +- 量化: wint4 +- Tensor Parallel: 4 +- Expert Parallel: 启用 +- Data Parallel: 1 +- 注意: 不使用 --disable-sequence-parallel-moe,启用all2all通信 +""" + + +import openai +import pytest +from conftest import ( + download_and_build_xdeepep, + get_model_path, + get_port_num, + print_logs_on_failure, + restore_env, + setup_ep_env, + start_server, +) + + +def test_ep4tp4_all2all(xpu_env): + """EP4TP4 all2all通信测试""" + + print("\n============================开始 EP4TP4 all2all 测试!============================") + + # 下载并编译xDeepEP + if not download_and_build_xdeepep(): + pytest.fail("xDeepEP下载或编译失败") + + # 设置EP环境变量 + original_env = setup_ep_env() + + try: + # 获取配置 + port_num = get_port_num() + model_path = get_model_path() + + # 构建服务器启动参数 + # 注意: 与EP4TP4 online相比,这里不使用 --disable-sequence-parallel-moe + server_args = [ + "--model", + f"{model_path}/ERNIE-4.5-300B-A47B-Paddle", + "--port", + str(port_num), + "--tensor-parallel-size", + "4", + "--enable-expert-parallel", + "--data-parallel-size", + "1", + "--max-model-len", + "32768", + "--max-num-seqs", + "64", + "--quantization", + "wint4", + "--engine-worker-queue-port", + str(port_num + 10), + "--metrics-port", + str(port_num + 2), + "--cache-queue-port", + str(port_num + 47873), + "--gpu-memory-utilization", + "0.9", + "--load-choices", + "default", + ] + + # 启动服务器 + if not start_server(server_args): + pytest.fail("EP4TP4 all2all服务启动失败") + + # 执行测试 + ip = "0.0.0.0" + client = openai.Client(base_url=f"http://{ip}:{port_num}/v1", api_key="EMPTY_API_KEY") + + # 非流式对话 + response = client.chat.completions.create( + model="default", + messages=[ + {"role": "user", "content": "你好,你是谁?"}, + ], + temperature=1, + top_p=0, + max_tokens=64, + stream=False, + ) + + print(f"\n模型回复: {response.choices[0].message.content}") + + # 验证响应 + assert any( + keyword in response.choices[0].message.content for keyword in ["人工智能", "文心一言", "百度", "智能助手"] + ), f"响应内容不符合预期: {response.choices[0].message.content}" + + print("\nEP4TP4 all2all测试通过!") + + except Exception as e: + print(f"\nEP4TP4 all2all测试失败: {str(e)}") + print_logs_on_failure() + pytest.fail(f"EP4TP4 all2all测试失败: {str(e)}") + + finally: + # 恢复环境变量 + restore_env(original_env) + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "-s"]) diff --git a/tests/xpu_ci/test_ep4tp4_online.py b/tests/xpu_ci/test_ep4tp4_online.py new file mode 100644 index 000000000..4c6cc5fc1 --- /dev/null +++ b/tests/xpu_ci/test_ep4tp4_online.py @@ -0,0 +1,128 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +EP4TP4在线服务测试 - Expert Parallel + Tensor Parallel + +测试配置: +- 模型: ERNIE-4.5-300B-A47B-Paddle +- 量化: wint4 +- Tensor Parallel: 4 +- Expert Parallel: 启用 +- Data Parallel: 1 +- 特性: disable-sequence-parallel-moe +""" + + +import openai +import pytest +from conftest import ( + download_and_build_xdeepep, + get_model_path, + get_port_num, + print_logs_on_failure, + restore_env, + setup_ep_env, + start_server, +) + + +def test_ep4tp4_online(xpu_env): + """EP4TP4在线服务测试""" + + print("\n============================开始 EP4TP4 在线服务测试!============================") + + # 下载并编译xDeepEP + if not download_and_build_xdeepep(): + pytest.fail("xDeepEP下载或编译失败") + + # 设置EP环境变量 + original_env = setup_ep_env() + + try: + # 获取配置 + port_num = get_port_num() + model_path = get_model_path() + + # 构建服务器启动参数 + server_args = [ + "--model", + f"{model_path}/ERNIE-4.5-300B-A47B-Paddle", + "--port", + str(port_num), + "--tensor-parallel-size", + "4", + "--enable-expert-parallel", + "--data-parallel-size", + "1", + "--max-model-len", + "32768", + "--max-num-seqs", + "64", + "--quantization", + "wint4", + "--engine-worker-queue-port", + str(port_num + 10), + "--metrics-port", + str(port_num + 2), + "--cache-queue-port", + str(port_num + 47873), + "--disable-sequence-parallel-moe", + "--gpu-memory-utilization", + "0.9", + "--load-choices", + "default", + ] + + # 启动服务器 + if not start_server(server_args): + pytest.fail("EP4TP4在线服务启动失败") + + # 执行测试 + ip = "0.0.0.0" + client = openai.Client(base_url=f"http://{ip}:{port_num}/v1", api_key="EMPTY_API_KEY") + + # 非流式对话 + response = client.chat.completions.create( + model="default", + messages=[ + {"role": "user", "content": "你好,你是谁?"}, + ], + temperature=1, + top_p=0, + max_tokens=64, + stream=False, + ) + + print(f"\n模型回复: {response.choices[0].message.content}") + + # 验证响应 + assert any( + keyword in response.choices[0].message.content for keyword in ["人工智能", "文心一言", "百度", "智能助手"] + ), f"响应内容不符合预期: {response.choices[0].message.content}" + + print("\nEP4TP4在线服务测试通过!") + + except Exception as e: + print(f"\nEP4TP4在线服务测试失败: {str(e)}") + print_logs_on_failure() + pytest.fail(f"EP4TP4在线服务测试失败: {str(e)}") + + finally: + # 恢复环境变量 + restore_env(original_env) + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "-s"]) diff --git a/tests/xpu_ci/test_pd_separation.py b/tests/xpu_ci/test_pd_separation.py new file mode 100644 index 000000000..05b215186 --- /dev/null +++ b/tests/xpu_ci/test_pd_separation.py @@ -0,0 +1,330 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
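+
+# 注: 本测试不使用 xpu_env fixture,Prefill/Decode 两个节点的
+# XPU_VISIBLE_DEVICES 由 start_pd_server() 在各自子进程的环境变量中单独设置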
+ +""" +PD分离测试 - Prefill/Decode分离部署模式 + +测试配置: +- 模型: ERNIE-4.5-0.3B-Paddle +- Tensor Parallel: 1 +- 特性: splitwise PD分离, RDMA cache传输 +- 节点: Router + Prefill节点 + Decode节点 +""" + +import os +import shutil +import subprocess +import time + +import openai +import pytest +from conftest import ( + cleanup_resources, + get_model_path, + get_port_num, + get_xpu_id, + restore_pd_env, + setup_pd_env, + stop_processes, +) + + +def wait_for_pd_health_check(port_p, port_d, timeout=600, interval=10): + """ + 等待PD分离服务健康检查通过(检查P节点和D节点) + + Args: + port_p: Prefill节点端口 + port_d: Decode节点端口 + timeout: 超时时间(秒), 默认10分钟 + interval: 检查间隔(秒), 默认10秒 + + Returns: + bool: 服务是否启动成功 + """ + endpoint_p = f"http://0.0.0.0:{port_p}/health" + endpoint_d = f"http://0.0.0.0:{port_d}/health" + start_time = time.time() + + print(f"开始PD分离服务健康检查,最长等待时间:{timeout}秒") + + while True: + elapsed = int(time.time() - start_time) + + # 超时判断 + if elapsed >= timeout: + print(f"\nPD分离服务启动超时:经过 {timeout//60} 分钟服务仍未启动!") + return False + + # 检查P节点 + try: + result_p = subprocess.run( + f'curl -s -o /dev/null -w "%{{http_code}}" -m 2 {endpoint_p}', + shell=True, + capture_output=True, + text=True, + ) + http_code_p = result_p.stdout.strip() + except Exception: + http_code_p = "000" + + # 检查D节点 + try: + result_d = subprocess.run( + f'curl -s -o /dev/null -w "%{{http_code}}" -m 2 {endpoint_d}', + shell=True, + capture_output=True, + text=True, + ) + http_code_d = result_d.stdout.strip() + except Exception: + http_code_d = "000" + + print( + f"\r服务健康检查中... 已等待 {elapsed} 秒,P节点状态码:{http_code_p},D节点状态码:{http_code_d}", + end="", + flush=True, + ) + + if http_code_p == "200" and http_code_d == "200": + print(f"\nPD分离服务启动成功!耗时 {elapsed} 秒") + return True + + time.sleep(interval) + + +def print_pd_logs_on_failure(): + """失败时打印PD分离相关日志""" + log_dirs = ["log_router", "log_prefill", "log_decode"] + + for log_dir in log_dirs: + nohup_path = os.path.join(log_dir, "nohup") + if os.path.exists(nohup_path): + print(f"\n========== {nohup_path} ==========") + with open(nohup_path, "r") as f: + print(f.read()) + + +def start_pd_server(model_path, port_num, wait_before_check=60): + """ + 启动PD分离服务(Router + Prefill节点 + Decode节点) + + Args: + model_path: 模型路径 + port_num: 基础端口号 + wait_before_check: 启动后等待多少秒再进行健康检查,默认60秒 + + Returns: + bool: 服务是否启动成功 + """ + xpu_id = get_xpu_id() + + # 停止旧进程 + stop_processes() + + # 清理资源 + cleanup_resources() + + # 清理并创建日志目录 + for log_dir in ["log_router", "log_prefill", "log_decode"]: + if os.path.exists(log_dir): + shutil.rmtree(log_dir) + os.makedirs(log_dir, exist_ok=True) + + # 1. 启动Router + print("启动Router...") + router_env = os.environ.copy() + router_env["FD_LOG_DIR"] = "log_router" + router_cmd = [ + "python", + "-m", + "fastdeploy.router.launch", + "--port", + str(port_num), + "--splitwise", + ] + + with open("log_router/nohup", "w") as log_file: + subprocess.Popen(router_cmd, stdout=log_file, stderr=subprocess.STDOUT, start_new_session=True, env=router_env) + print(f"Router启动命令: {' '.join(router_cmd)}") + time.sleep(1) + + # 2. 
启动Prefill节点 + print("启动Prefill节点...") + prefill_env = os.environ.copy() + prefill_env["FD_LOG_DIR"] = "log_prefill" + if xpu_id == 0: + prefill_env["XPU_VISIBLE_DEVICES"] = "0" + else: + prefill_env["XPU_VISIBLE_DEVICES"] = "4" + + prefill_cmd = [ + "python", + "-m", + "fastdeploy.entrypoints.openai.api_server", + "--model", + f"{model_path}/ERNIE-4.5-0.3B-Paddle", + "--port", + str(port_num + 11), + "--metrics-port", + str(port_num + 12), + "--engine-worker-queue-port", + str(port_num + 13), + "--cache-queue-port", + str(port_num + 14), + "--tensor-parallel-size", + "1", + "--max-model-len", + "32768", + "--splitwise-role", + "prefill", + "--cache-transfer-protocol", + "rdma", + "--rdma-comm-ports", + str(port_num + 15), + "--pd-comm-port", + str(port_num + 16), + "--router", + f"0.0.0.0:{port_num}", + ] + + with open("log_prefill/nohup", "w") as log_file: + subprocess.Popen( + prefill_cmd, stdout=log_file, stderr=subprocess.STDOUT, start_new_session=True, env=prefill_env + ) + print(f"Prefill节点启动命令: {' '.join(prefill_cmd)}") + + # 3. 启动Decode节点 + print("启动Decode节点...") + decode_env = os.environ.copy() + decode_env["FD_LOG_DIR"] = "log_decode" + if xpu_id == 0: + decode_env["XPU_VISIBLE_DEVICES"] = "1" + else: + decode_env["XPU_VISIBLE_DEVICES"] = "5" + + decode_cmd = [ + "python", + "-m", + "fastdeploy.entrypoints.openai.api_server", + "--model", + f"{model_path}/ERNIE-4.5-0.3B-Paddle", + "--port", + str(port_num + 21), + "--metrics-port", + str(port_num + 22), + "--engine-worker-queue-port", + str(port_num + 23), + "--cache-queue-port", + str(port_num + 24), + "--tensor-parallel-size", + "1", + "--max-model-len", + "32768", + "--splitwise-role", + "decode", + "--cache-transfer-protocol", + "rdma", + "--rdma-comm-ports", + str(port_num + 25), + "--pd-comm-port", + str(port_num + 26), + "--router", + f"0.0.0.0:{port_num}", + ] + + with open("log_decode/nohup", "w") as log_file: + subprocess.Popen(decode_cmd, stdout=log_file, stderr=subprocess.STDOUT, start_new_session=True, env=decode_env) + print(f"Decode节点启动命令: {' '.join(decode_cmd)}") + + # 等待服务启动 + print(f"等待 {wait_before_check} 秒让服务初始化...") + time.sleep(wait_before_check) + + # 健康检查(检查P节点和D节点) + port_p = port_num + 11 + port_d = port_num + 21 + + if not wait_for_pd_health_check(port_p, port_d): + print_pd_logs_on_failure() + stop_processes() + return False + + return True + + +def test_pd_separation(): + """PD分离部署模式测试""" + + print("\n============================开始PD分离测试!============================") + + # 设置PD分离环境变量 + original_env = setup_pd_env() + + # 检查RDMA网卡是否配置成功 + rdma_nics = os.environ.get("KVCACHE_RDMA_NICS", "") + if not rdma_nics: + pytest.fail("KVCACHE_RDMA_NICS is empty, please check the output of get_rdma_nics.sh") + print(f"KVCACHE_RDMA_NICS: {rdma_nics}") + + try: + # 获取配置 + port_num = get_port_num() + model_path = get_model_path() + + # 启动PD分离服务 + if not start_pd_server(model_path, port_num): + pytest.fail("PD分离服务启动失败") + + # 执行测试 - 通过Router端口访问 + ip = "0.0.0.0" + client = openai.Client(base_url=f"http://{ip}:{port_num}/v1", api_key="EMPTY_API_KEY") + + # 非流式对话 + response = client.chat.completions.create( + model="default", + messages=[ + {"role": "user", "content": "你好,你是谁?"}, + ], + temperature=1, + top_p=0, + max_tokens=64, + stream=False, + ) + + print(f"\n模型回复: {response.choices[0].message.content}") + + # 验证响应 + assert any( + keyword in response.choices[0].message.content for keyword in ["AI", "伙伴"] + ), f"响应内容不符合预期: {response.choices[0].message.content}" + + print("\nPD分离测试通过!") + + except Exception as e: + 
print(f"\nPD分离测试失败: {str(e)}") + print_pd_logs_on_failure() + pytest.fail(f"PD分离测试失败: {str(e)}") + + finally: + # 停止服务 + print("\n停止PD分离服务...") + stop_processes() + + # 恢复环境变量 + restore_pd_env(original_env) + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "-s"]) diff --git a/tests/xpu_ci/test_v1_mode.py b/tests/xpu_ci/test_v1_mode.py new file mode 100644 index 000000000..5ca7fbe0c --- /dev/null +++ b/tests/xpu_ci/test_v1_mode.py @@ -0,0 +1,103 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +V1模式测试 - ERNIE-4.5-300B-A47B 模型 + +测试配置: +- 模型: ERNIE-4.5-300B-A47B-Paddle +- 量化: wint4 +- Tensor Parallel: 4 +- 特性: enable-prefix-caching, enable-chunked-prefill +""" + + +import openai +import pytest +from conftest import get_model_path, get_port_num, print_logs_on_failure, start_server + + +def test_v1_mode(xpu_env): + """V1模式测试""" + + print("\n============================开始V1模式测试!============================") + + # 获取配置 + port_num = get_port_num() + model_path = get_model_path() + + # 构建服务器启动参数 + server_args = [ + "--model", + f"{model_path}/ERNIE-4.5-300B-A47B-Paddle", + "--port", + str(port_num), + "--engine-worker-queue-port", + str(port_num + 1), + "--metrics-port", + str(port_num + 2), + "--cache-queue-port", + str(port_num + 47873), + "--tensor-parallel-size", + "4", + "--num-gpu-blocks-override", + "16384", + "--max-model-len", + "32768", + "--max-num-seqs", + "128", + "--quantization", + "wint4", + "--enable-prefix-caching", + "--enable-chunked-prefill", + ] + + # 启动服务器 + if not start_server(server_args): + pytest.fail("V1模式服务启动失败") + + # 执行测试 + try: + ip = "0.0.0.0" + client = openai.Client(base_url=f"http://{ip}:{port_num}/v1", api_key="EMPTY_API_KEY") + + # 非流式对话 + response = client.chat.completions.create( + model="default", + messages=[ + {"role": "user", "content": "你好,你是谁?"}, + ], + temperature=1, + top_p=0, + max_tokens=64, + stream=False, + ) + + print(f"\n模型回复: {response.choices[0].message.content}") + + # 验证响应 + assert any( + keyword in response.choices[0].message.content for keyword in ["人工智能", "文心一言", "百度", "智能助手"] + ), f"响应内容不符合预期: {response.choices[0].message.content}" + + print("\nV1模式测试通过!") + + except Exception as e: + print(f"\nV1模式测试失败: {str(e)}") + print_logs_on_failure() + pytest.fail(f"V1模式测试失败: {str(e)}") + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "-s"]) diff --git a/tests/xpu_ci/test_vl_model.py b/tests/xpu_ci/test_vl_model.py new file mode 100644 index 000000000..5785c7e22 --- /dev/null +++ b/tests/xpu_ci/test_vl_model.py @@ -0,0 +1,117 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +VL模型测试 - ERNIE-4.5-VL-28B 视觉语言模型 + +测试配置: +- 模型: ERNIE-4.5-VL-28B-A3B-Thinking +- 量化: wint8 +- Tensor Parallel: 4 +- 特性: reasoning-parser, tool-call-parser, enable-chunked-prefill +""" + + +import openai +import pytest +from conftest import get_model_path, get_port_num, print_logs_on_failure, start_server + + +def test_vl_model(xpu_env): + """VL视觉语言模型测试""" + + print("\n============================开始vl模型测试!============================") + + # 获取配置 + port_num = get_port_num() + model_path = get_model_path() + + # 构建服务器启动参数 + server_args = [ + "--model", + f"{model_path}/ERNIE-4.5-VL-28B-A3B-Thinking", + "--port", + str(port_num), + "--engine-worker-queue-port", + str(port_num + 1), + "--metrics-port", + str(port_num + 2), + "--cache-queue-port", + str(port_num + 47873), + "--tensor-parallel-size", + "4", + "--max-model-len", + "32768", + "--max-num-seqs", + "32", + "--quantization", + "wint8", + "--reasoning-parser", + "ernie-45-vl-thinking", + "--tool-call-parser", + "ernie-45-vl-thinking", + "--mm-processor-kwargs", + '{"image_max_pixels": 12845056 }', + "--enable-chunked-prefill", + ] + + # 启动服务器 + if not start_server(server_args): + pytest.fail("VL模型服务启动失败") + + # 执行测试 + try: + ip = "0.0.0.0" + client = openai.Client(base_url=f"http://{ip}:{port_num}/v1", api_key="EMPTY_API_KEY") + + # 非流式对话(带图像) + response = client.chat.completions.create( + model="default", + messages=[ + { + "role": "user", + "content": [ + { + "type": "image_url", + "image_url": { + "url": "https://paddlenlp.bj.bcebos.com/datasets/paddlemix/demo_images/example2.jpg" + }, + }, + {"type": "text", "text": "图片中的文物来自哪个时代?"}, + ], + }, + ], + temperature=1, + top_p=0, + max_tokens=70, + stream=False, + ) + + print(f"\n模型回复: {response.choices[0].message.content}") + + # 验证响应 + assert any( + keyword in response.choices[0].message.content for keyword in ["北魏", "北齐", "释迦牟尼", "北朝"] + ), f"响应内容不符合预期: {response.choices[0].message.content}" + + print("\nVL模型测试通过!") + + except Exception as e: + print(f"\nVL模型测试失败: {str(e)}") + print_logs_on_failure() + pytest.fail(f"VL模型测试失败: {str(e)}") + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "-s"]) diff --git a/tests/xpu_ci/test_w4a8.py b/tests/xpu_ci/test_w4a8.py new file mode 100644 index 000000000..6c3d346da --- /dev/null +++ b/tests/xpu_ci/test_w4a8.py @@ -0,0 +1,101 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +W4A8模式测试 - ERNIE-4.5-300B W4A8C8量化模型 + +测试配置: +- 模型: ERNIE-4.5-300B-A47B-W4A8C8-TP4-Paddle +- 量化: W4A8 +- Tensor Parallel: 4 +""" + + +import openai +import pytest +from conftest import get_model_path, get_port_num, print_logs_on_failure, start_server + + +def test_w4a8(xpu_env): + """W4A8量化模式测试""" + + print("\n============================开始W4A8测试!============================") + + # 获取配置 + port_num = get_port_num() + model_path = get_model_path() + + # 构建服务器启动参数 + server_args = [ + "--model", + f"{model_path}/ERNIE-4.5-300B-A47B-W4A8C8-TP4-Paddle", + "--port", + str(port_num), + "--engine-worker-queue-port", + str(port_num + 1), + "--metrics-port", + str(port_num + 2), + "--cache-queue-port", + str(port_num + 47873), + "--tensor-parallel-size", + "4", + "--num-gpu-blocks-override", + "16384", + "--max-model-len", + "32768", + "--max-num-seqs", + "64", + "--quantization", + "W4A8", + ] + + # 启动服务器 + if not start_server(server_args): + pytest.fail("W4A8模式服务启动失败") + + # 执行测试 + try: + ip = "0.0.0.0" + client = openai.Client(base_url=f"http://{ip}:{port_num}/v1", api_key="EMPTY_API_KEY") + + # 非流式对话 + response = client.chat.completions.create( + model="default", + messages=[ + {"role": "user", "content": "你好,你是谁?"}, + ], + temperature=1, + top_p=0, + max_tokens=64, + stream=False, + ) + + print(f"\n模型回复: {response.choices[0].message.content}") + + # 验证响应 + assert any( + keyword in response.choices[0].message.content + for keyword in ["人工智能", "文心一言", "小度", "百度", "智能助手"] + ), f"响应内容不符合预期: {response.choices[0].message.content}" + + print("\nW4A8测试通过!") + + except Exception as e: + print(f"\nW4A8测试失败: {str(e)}") + print_logs_on_failure() + pytest.fail(f"W4A8测试失败: {str(e)}") + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "-s"])