diff --git a/.github/workflows/ci_metax.yml b/.github/workflows/ci_metax.yml index 0c5c56158..167c2f0bf 100644 --- a/.github/workflows/ci_metax.yml +++ b/.github/workflows/ci_metax.yml @@ -133,9 +133,9 @@ jobs: cp ${REPLACE_FILES_PATH}/all_things.py ${PACKAGES_LINK}/use_triton_in_paddle/cuda/ sudo chmod 777 -R ${REPLACE_FILES_PATH}/mctlass - sudo cp ${REPLACE_FILES_PATH}/mctlass/mctlassEx.h /opt/maca/include/mctlassEx/ - sudo cp ${REPLACE_FILES_PATH}/mctlass/libmctlassEx.so /opt/maca/lib/ - sudo cp ${REPLACE_FILES_PATH}/mctlass/mctlassEx_xcore1000.mcfb /opt/maca/lib/ + sudo cp ${REPLACE_FILES_PATH}/mctlass/mctlassEx.h ${MACA_PATH}/include/mctlassEx/ + sudo cp ${REPLACE_FILES_PATH}/mctlass/libmctlassEx.so ${MACA_PATH}/lib/ + sudo cp ${REPLACE_FILES_PATH}/mctlass/mctlassEx_xcore1000.mcfb ${MACA_PATH}/lib/ bash build.sh @@ -154,9 +154,15 @@ jobs: } } - ignore_error "timeout -s 9 600s python tests/ci_use/Metax_UT/run_ernie_vl_28B.py" + ignore_error "bash scripts/run_ci_metax.sh" + if (( exit_code != 0 )); then + exit ${exit_code} + fi - echo -e "\n=========== Fastdeploy workerlog.0 ===========" - cat log/workerlog.0 + ignore_error "timeout -s 9 600s python tests/metax_ci/run_ernie_vl_28B.py" + if (( exit_code != 0 )); then + echo -e "\n=========== Fastdeploy workerlog.0 ===========" + cat log/workerlog.0 + fi exit ${exit_code} diff --git a/scripts/run_ci_metax.sh b/scripts/run_ci_metax.sh new file mode 100644 index 000000000..d686ad748 --- /dev/null +++ b/scripts/run_ci_metax.sh @@ -0,0 +1,259 @@ +#!/bin/bash + +DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +tests_path="$DIR/../tests/" +export PYTEST_INI="$DIR/../tests/cov_pytest.ini" +run_path=$( realpath "$DIR/../") + +export COVERAGE_FILE=${COVERAGE_FILE:-$DIR/../coveragedata/.coverage} +export COVERAGE_RCFILE=${COVERAGE_RCFILE:-$DIR/../scripts/.coveragerc} + + +LOG_ROOT_PATH=${run_path}/metax_log +LOG_SUBDIR=${LOG_ROOT_PATH}/logs +LOG_RESULT_TMP=$(mktemp) +PASS_FILE_LIST=${LOG_ROOT_PATH}/passed_files.txt 
+FAIL_FILE_LIST=${LOG_ROOT_PATH}/failed_files.txt +SUMMARY_FILE_LIST=${LOG_ROOT_PATH}/summary.txt +trap 'rm -f "$LOG_RESULT_TMP"' EXIT + +mkdir -p "$LOG_ROOT_PATH" "$LOG_SUBDIR" + +OVERWRITE_OLD_RESULT="yes" +METAX_GPU_TARGET=C500 +PARALLEL_NUM="4" +PYTEST_EXTRA_ARGS="${3:-}" + + +declare -a IGNORE_PATHS=() +while IFS= read -r line; do + if [ -z "$line" ]; then + continue + fi + path=$(echo "$line" | sed 's/^\s*--ignore=//') + if [ -n "$path" ]; then + IGNORE_PATHS+=("$path") + fi +done < <(grep -E '^\s*--ignore=' "$PYTEST_INI") + + +declare -A SEEN=() +declare -a EXCLUDE_PATHS=() +for path in "${IGNORE_PATHS[@]}"; do + if [[ -z "${SEEN[$path]}" ]]; then + SEEN[$path]=1 + EXCLUDE_PATHS+=("$path") + fi +done + +# declare -a CUSTOM_EXCLUDE_PATHS=( +# "tests/e2e" +# "tests/model_loader" +# "tests/pooling" +# "tests/entrypoints" +# ) + +# for path in "${CUSTOM_EXCLUDE_PATHS[@]}"; do +# if [[ -z "${SEEN[$path]}" ]]; then +# SEEN[$path]=1 +# EXCLUDE_PATHS+=("$path") +# fi +# done + + +is_excluded() { + local target_path="$1" + for exclude in "${EXCLUDE_PATHS[@]}"; do + if [ -z "$exclude" ]; then + continue + fi + if [[ "$target_path" == *"$exclude"* ]]; then + return 0 + fi + done + return 1 +} + +# FIND_PATTERN="test_*.py" +# declare -a ALL_PATHS=() + +# while IFS= read -r path; do +# [[ -n "$path" ]] && ALL_PATHS+=("$path") +# done < <(find "${tests_path}" -type f -name "$FIND_PATTERN" | sort | uniq) + + +declare -a FILTERED_PATHS=() + +METAX_CI_CASELIST=( + "tests/metax_ci/test_fused_moe.py" + "tests/operators/test_limit_thinking_content_length.py" + "tests/operators/test_update_inputs_v1.py" + "tests/operators/test_set_value_by_flags_and_idx.py" + "tests/operators/test_get_token_penalty_multi_scores.py" + "tests/operators/test_speculate_get_token_penalty_multi_scores.py" + "tests/operators/test_token_penalty.py" + "tests/operators/test_stop_generation_multi_ends.py" + "tests/operators/test_get_padding_offset.py" + 
"tests/operators/test_speculate_get_padding_offset.py" + "tests/operators/test_rebuild_padding.py" + "tests/operators/test_share_external_data.py" + "tests/operators/test_rejection_top_p_sampling.py" + "tests/layers/test_min_sampling.py" +) +for path in "${METAX_CI_CASELIST[@]}"; do + FILTERED_PATHS+=("$run_path/$path") + # if ! is_excluded "$path"; then + # FILTERED_PATHS+=("$run_path/$path") + # fi +done + + +echo -e "\n================== Metax CI test total num ( ${#FILTERED_PATHS[@]} ) ==================" + +if [ "$OVERWRITE_OLD_RESULT" = "yes" ]; then + > "${SUMMARY_FILE_LIST}" + > "${PASS_FILE_LIST}" + > "${FAIL_FILE_LIST}" + + rm -f "${LOG_SUBDIR}"/*.log +else + echo -e "\n================== $(date +%Y-%m-%d_%H:%M:%S) =================" >> "${SUMMARY_FILE_LIST}" +fi + + +get_max_free_gpu() { + local log=$(mx-smi) + local gpu_lines=$(echo "$log" | grep -E "MetaX ${METAX_GPU_TARGET}|MiB" | grep -v "Process" | grep -v "Board Name") + + local GPU_INFO=() + local current_gpu_idx="" + + while IFS= read -r line; do + if echo "$line" | grep -q "MetaX ${METAX_GPU_TARGET}"; then + current_gpu_idx=$(echo "$line" | awk '{print $2}' | grep -E '^[0-9]+$') + elif echo "$line" | grep -q "MiB"; then + if [ -n "$current_gpu_idx" ]; then + mem_used=$(echo "$line" | awk '{for(i=1;i<=NF;i++){if($i ~ /MiB/){split($(i-1),mem,"/");print mem[1]}}}' ) + mem_total=$(echo "$line" | awk '{for(i=1;i<=NF;i++){if($i ~ /MiB/){split($(i-1),mem,"/");print mem[2]}}}' ) + mem_free=$((mem_total - mem_used)) + GPU_INFO+=("$current_gpu_idx:$mem_used:$mem_total:$mem_free") + # echo "$current_gpu_idx - ${mem_used}" + current_gpu_idx="" + fi + fi + done <<< "$gpu_lines" + + gpu_mem_info=${GPU_INFO[@]} + + local sorted_gpus=$(echo "$gpu_mem_info" | tr ' ' '\n' | sort -t ':' -k4,4nr -k1,1n) + echo "${sorted_gpus}" + + local count=0 + local top_n=1 + local gpu_list="" + while IFS= read -r gpu && [ $count -lt $top_n ]; do + if [ -n "$gpu" ]; then + local gpu_idx=$(echo "$gpu" | cut -d ':' -f 1) + local 
gpu_free=$(echo "$gpu" | cut -d ':' -f 4) + gpu_list="$gpu_list,$gpu_idx" + count=$((count + 1)) + fi + done <<< "$sorted_gpus" + + gpu_list=${gpu_list:1:${#gpu_list}} + echo "$gpu_list" +} + +test_env_init() { + export MACA_PATH=/opt/maca + if [ ! -d ${HOME}/cu-bridge ]; then + `${MACA_PATH}/tools/cu-bridge/tools/pre_make` + fi + + export CUDA_PATH=${HOME}/cu-bridge/CUDA_DIR + export LD_LIBRARY_PATH=${CUDA_PATH}/lib64:${MACA_PATH}/lib:${MACA_PATH}/mxgpu_llvm/lib:$LD_LIBRARY_PATH + export PADDLE_XCCL_BACKEND=metax_gpu + export FLAGS_weight_only_linear_arch=80 + export FD_MOE_BACKEND=cutlass # or triton + export FD_METAX_KVCACHE_MEM=8 + export FD_ENC_DEC_BLOCK_NUM=2 + + export MACA_VISIBLE_DEVICES=$(get_max_free_gpu) +} + + +test_single_file() { + test_env_init + + local file="$1" + local extra_args="$2" + local log_file="${LOG_SUBDIR}/$(basename "$file").log" + local start_time=$(date +%s) + + pytest "$file" $extra_args > "$log_file" 2>&1 + local exit_code=$? + local end_time=$(date +%s) + local cost_time=$((end_time - start_time)) + + echo "$file,$exit_code,$cost_time" >> "$LOG_RESULT_TMP" + + if [ $exit_code -eq 0 ]; then + echo "✅ $(basename "$file") passed (cost: ${cost_time}s)" + else + echo "❌ $(basename "$file") failed (cost: ${cost_time}s, exit code: $exit_code)" + fi +} + +export -f get_max_free_gpu +export -f test_env_init +export -f test_single_file +export LOG_RESULT_TMP PYTEST_EXTRA_ARGS LOG_SUBDIR + + +# if [ "$PARALLEL_NUM" = "auto" ]; then +# PARALLEL_NUM=$(nproc --all 2>/dev/null || sysctl -n hw.ncpu 2>/dev/null || echo 4) +# fi + + +printf "%s\n" "${FILTERED_PATHS[@]}" | xargs -I {} -P "$PARALLEL_NUM" bash -c 'test_single_file "{}" "$PYTEST_EXTRA_ARGS"' + + +PASS_COUNT=0 +FAIL_COUNT=0 +TOTAL_COST_TIME=0 +declare -a FAIL_FILES=() + +while IFS=, read -r file exit_code cost_time; do + TOTAL_COST_TIME=$((TOTAL_COST_TIME + cost_time)) + if [ "$exit_code" -eq 0 ]; then + PASS_COUNT=$((PASS_COUNT + 1)) + echo "$file" >> ${PASS_FILE_LIST} + else + 
FAIL_COUNT=$((FAIL_COUNT + 1)) + FAIL_FILES+=("$(basename "$file")") + echo "$file" >> ${FAIL_FILE_LIST} + fi +done < "$LOG_RESULT_TMP" + +{ + echo "============================= TEST RESULT SUMMARY ================================" + echo "Pytest date: $(date +%Y-%m-%d\ %H:%M:%S)" + echo "Pytest parallel num: $PARALLEL_NUM" + echo "Pytest extra args: $PYTEST_EXTRA_ARGS" + echo "Pytest total num: $((PASS_COUNT + FAIL_COUNT))" + echo "Pytest successful: $PASS_COUNT" + echo "Pytest failed: $FAIL_COUNT" + echo "Pytest all cost: $TOTAL_COST_TIME s" + echo "==================================================================================" +} >> "${SUMMARY_FILE_LIST}" + + +cat ${SUMMARY_FILE_LIST} + + +if [ "$FAIL_COUNT" -ne 0 ]; then + # echo "Failed test cases are listed in $failed_tests_file" + # cat "$FAIL_FILE_LIST" + echo ${FAIL_FILES[@]} + exit 1 +fi diff --git a/tests/cov_pytest.ini b/tests/cov_pytest.ini index a747d79d4..288a66a91 100644 --- a/tests/cov_pytest.ini +++ b/tests/cov_pytest.ini @@ -8,5 +8,6 @@ addopts = --ignore=tests/model_loader/test_w4a8_model.py --ignore=tests/entrypoints/test_engine_client.py --ignore=tests/xpu_ci + --ignore=tests/metax_ci --ignore=tests/v1/test_schedule_output.py --ignore=tests/graph_optimization/test_cuda_graph_dynamic_subgraph.py diff --git a/tests/ci_use/Metax_UT/run_ernie_vl_28B.py b/tests/metax_ci/run_ernie_vl_28B.py similarity index 93% rename from tests/ci_use/Metax_UT/run_ernie_vl_28B.py rename to tests/metax_ci/run_ernie_vl_28B.py index 59105f38c..e59e7fb23 100644 --- a/tests/ci_use/Metax_UT/run_ernie_vl_28B.py +++ b/tests/metax_ci/run_ernie_vl_28B.py @@ -21,6 +21,7 @@ llm = fastdeploy.LLM( quantization="wint8", load_choices="default_v1", disable_custom_all_reduce=True, + graph_optimization_config={"use_cudagraph": False, "graph_opt_level": 0}, ) prompts = [ diff --git a/tests/metax_ci/test_fused_moe.py b/tests/metax_ci/test_fused_moe.py new file mode 100644 index 000000000..53f8ea774 --- /dev/null +++ 
b/tests/metax_ci/test_fused_moe.py @@ -0,0 +1,296 @@ +import unittest + +import numpy as np +import paddle +import paddle.nn.functional as F +from paddle import nn +from paddle.incubate.nn.functional import swiglu +from paddle.nn.quant import weight_only_linear, weight_quantize + +from fastdeploy.model_executor.ops.gpu import ( + fused_expert_moe, + moe_expert_dispatch, + moe_expert_ffn, + moe_expert_reduce, +) + +paddle.seed(2025) +np.random.seed(2025) + + +class Expert(nn.Layer): + def __init__(self, d_model, d_feedforward, dtype="bfloat16", quant_type="weight_only_int8"): + super().__init__() + + self.dtype = dtype + self.quant_type = quant_type + self.fc0 = nn.Linear(d_model, d_feedforward * 2) + self.fc1 = nn.Linear(d_feedforward, d_model) + + self.w0_quanted, self.s0 = weight_quantize(self.fc0.weight, quant_type, arch=80, group_size=-1) + self.w1_quanted, self.s1 = weight_quantize(self.fc1.weight, quant_type, arch=80, group_size=-1) + + def load_weight(self, ffn0_gate_proj_weight, ffn0_up_proj_weight, ffn1_down_proj_weight): + concated_gate_up_weight = np.concatenate([ffn0_gate_proj_weight, ffn0_up_proj_weight], axis=-1) + ffn0_weight = paddle.to_tensor(concated_gate_up_weight).cast(self.dtype) + ffn1_weight = paddle.to_tensor(ffn1_down_proj_weight).cast(self.dtype) + + self.fc0.weight.set_value(ffn0_weight) + self.fc1.weight.set_value(ffn1_weight) + + self.w0_quanted, self.s0 = weight_quantize(ffn0_weight, algo=self.quant_type, arch=80, group_size=-1) + self.w1_quanted, self.s1 = weight_quantize(ffn1_weight, algo=self.quant_type, arch=80, group_size=-1) + + def set_value(self, ffn0_weight, ffn1_weight): + self.fc0.weight.set_value(ffn0_weight) + self.fc1.weight.set_value(ffn1_weight) + + self.w0_quanted, self.s0 = weight_quantize(self.fc0.weight, self.quant_type, arch=80, group_size=-1) + self.w1_quanted, self.s1 = weight_quantize(self.fc1.weight, self.quant_type, arch=80, group_size=-1) + + def forward(self, x): + x = self.fc0(x) + x = swiglu(x) + return 
self.fc1(x) + + def forward_quant(self, x): + x = weight_only_linear(x, self.w0_quanted.T, weight_scale=self.s0) + x = swiglu(x) + return weight_only_linear(x, self.w1_quanted.T, weight_scale=self.s1) + + +class FusedMoe: + def __init__( + self, input_shape: list, d_feedforward, num_experts, top_k, dtype, quant_type="None", rtol=1e-2, atol=1e-2 + ) -> None: + self.batch_size, self.seq_len, self.d_model = input_shape + self.d_feedforward = d_feedforward + self.num_experts = num_experts + self.top_k = top_k + self.dtype = dtype + self.quant_type = quant_type + self.rtol = rtol + self.atol = atol + + self._init_parameters() + self._prepare_data() + + def _init_parameters(self): + # 创建专家层 + self.experts = nn.LayerList( + [Expert(self.d_model, self.d_feedforward, self.dtype, self.quant_type) for _ in range(self.num_experts)] + ) + + # 初始化门控权重 + self.gate = nn.Linear(self.d_model, self.num_experts) + self.gate_weight = self.gate.weight.cast("float32") + + def _prepare_data(self): + """准备输入数据""" + self.x = paddle.randn([self.batch_size, self.seq_len, self.d_model], dtype=self.dtype) + + self.s0 = None + self.s1 = None + if self.quant_type == "weight_only_int8": + self.w0 = paddle.stack([e.w0_quanted for e in self.experts], axis=0).transpose([0, 2, 1]).astype("int8") + self.w1 = paddle.stack([e.w1_quanted for e in self.experts], axis=0).transpose([0, 2, 1]).astype("int8") + self.s0 = paddle.stack([e.s0 for e in self.experts], axis=0).astype(self.dtype) + self.s1 = paddle.stack([e.s1 for e in self.experts], axis=0).astype(self.dtype) + else: + self.w0 = paddle.stack([e.fc0.weight for e in self.experts], axis=0).astype(self.dtype) + self.w1 = paddle.stack([e.fc1.weight for e in self.experts], axis=0).astype(self.dtype) + + self.b0 = ( + paddle.stack([e.fc0.bias for e in self.experts], axis=0) + .reshape([self.num_experts, 1, -1]) + .astype(self.dtype) + ) + self.b1 = ( + paddle.stack([e.fc1.bias for e in self.experts], axis=0) + .reshape([self.num_experts, 1, -1]) + 
.astype(self.dtype) + ) + + def baseline_forward(self, hidden_states): + """(逐个专家计算)""" + batch_size, seq_len, hidden_dim = hidden_states.shape + hidden_states = hidden_states.reshape([-1, hidden_dim]) + + # 路由计算 + logits = paddle.matmul(hidden_states.cast("float32"), self.gate_weight) + weights = F.softmax(logits, axis=-1) + routing_weights, selected_experts = paddle.topk(weights, self.top_k, axis=-1) + # 结果累加 + final_hidden_states = paddle.zeros_like(hidden_states) + + expert_mask = paddle.transpose(F.one_hot(selected_experts, num_classes=self.num_experts), [2, 1, 0]) + + for expert_id in range(self.num_experts): + expert_layer = self.experts[expert_id] + idx, top_x = paddle.where(expert_mask[expert_id]) + + current_state = paddle.index_select(hidden_states, top_x, axis=0).reshape([-1, hidden_dim]) + if self.quant_type == "None": + current_hidden_states = expert_layer(current_state) * routing_weights[top_x, idx].view([-1, 1]) + else: + current_hidden_states = expert_layer.forward_quant(current_state) * routing_weights[top_x, idx].view( + [-1, 1] + ) + + paddle.index_add_( + x=final_hidden_states, + index=top_x, + axis=0, + value=current_hidden_states.to(hidden_states.dtype), + ) + final_hidden_states = paddle.reshape(final_hidden_states, [batch_size, seq_len, hidden_dim]) + return final_hidden_states + + def fused_forward(self, x): + """测试融合实现""" + return fused_expert_moe( + x, + self.gate_weight, + self.w0, + self.w1, + self.b0, + None if self.quant_type == "None" else self.s0, + self.b1, + None if self.quant_type == "None" else self.s1, + self.quant_type, + self.top_k, + False, + False, + ) + + def split_forward(self, hidden_states): + """测试拆分实现""" + batch_size, seq_len, hidden_dim = hidden_states.shape + hidden_states = hidden_states.reshape([-1, hidden_dim]) + + # 路由计算 + logits = paddle.matmul(hidden_states.cast("float32"), self.gate_weight) + scores = F.softmax(logits, axis=-1) + ( + permute_input, + token_nums_per_expert, + permute_indices_per_token, + 
top_k_weights, + top_k_indices, + expert_idx_per_token, + ) = moe_expert_dispatch(hidden_states, scores, None, None, self.top_k, False, self.quant_type, True) + + expert_idx_per_token = None + + ffn_out = moe_expert_ffn( + permute_input, + token_nums_per_expert, + self.w0, + self.w1, + self.b0, + None if self.quant_type == "None" else self.s0, + None if self.quant_type == "None" else self.s1, + expert_idx_per_token, + self.quant_type, + ) + output = moe_expert_reduce( + ffn_out, + top_k_weights, + permute_indices_per_token, + top_k_indices, + None, + norm_topk_prob=False, + routed_scaling_factor=1.0, + ) + output = paddle.reshape(output, [batch_size, seq_len, hidden_dim]) + return output + + def test_consistency(self): + base_out = self.baseline_forward(self.x) + split_out = self.split_forward(self.x) + fused_out = self.fused_forward(self.x) + + np.testing.assert_allclose( + split_out.cast("float32").numpy().astype("float32"), + base_out.cast("float32").numpy().astype("float32"), + rtol=self.rtol, + atol=self.atol, + ) + np.testing.assert_allclose( + base_out.cast("float32").numpy().astype("float32"), + fused_out.cast("float32").numpy().astype("float32"), + rtol=self.rtol, + atol=self.atol, + ) + + +class TestMetaxFusedMoe(unittest.TestCase): + @classmethod + def setUpClass(cls): + """Class-level setup that runs once before all tests.""" + cls.set_config() + paddle.set_default_dtype(cls.dtype) + + @classmethod + def set_config(cls): + """Set the configuration parameters for the test.""" + cls.dtype = "bfloat16" + cls.supported_quant_type = ["weight_only_int8"] + + batch_size_list = [1] + seq_len_list = [1, 128, 256, 512, 1024, 2048] + d_model_list = [[7168, 128]] + num_experts_list = [256] + top_k_list = [8] + + cls.test_params = [] + for batch_size in batch_size_list: + for seq_len in seq_len_list: + for d_model in d_model_list: + for num_experts in num_experts_list: + for top_k in top_k_list: + if top_k >= num_experts: + continue + cls.test_params.append( + { + 
"input_shape": [batch_size, seq_len, d_model[0]], + "d_feedforward": d_model[1], + "num_experts": num_experts, + "top_k": top_k, + } + ) + + def setUp(self): + """Test-level setup that runs before each test.""" + pass + + def test_bfloat16_wint8_quant(self): + rtol = 1e-2 + atol = 1e-2 + quant_type = "weight_only_int8" + assert quant_type in self.supported_quant_type + + for param in self.test_params: + fused_moe_test = FusedMoe( + param["input_shape"], + param["d_feedforward"], + param["num_experts"], + param["top_k"], + self.dtype, + quant_type, + rtol, + atol, + ) + fused_moe_test.test_consistency() + + # def test_bfloat16_without_quant(self): + # quant_type = None + # assert quant_type in self.supported_quant_type + + # for param in self.test_params: + # fused_moe_test = FusedMoe(param['input_shape'], param['d_feedforward'], param['num_experts'], param['top_k'], self.dtype, quant_type, self.rtol, self.atol) + # fused_moe_test.test_consistency() + + +if __name__ == "__main__": + unittest.main()