From cc588b70abd2a7a3fd7d8e8ffe329b52cb7fafaf Mon Sep 17 00:00:00 2001
From: chen <103103266+ckl117@users.noreply.github.com>
Date: Mon, 24 Nov 2025 15:28:01 +0800
Subject: [PATCH] [CP][BugFix]Dev fix custom ar unstable result (#5186)

* [CP][BugFix]Dev fix custom ar unstable result (#4437)

* code check

* revert delete

* check

* pre_commit

---
 fastdeploy/distributed/communication.py | 3 ++-
 .../distributed/custom_all_reduce/custom_all_reduce.py | 4 ++--
 .../layers/backends/dcu/fused_moe_triton_backends.py | 2 +-
 .../backends/gcu/moe/fused_moe_method_gcu_backend.py | 2 +-
 fastdeploy/model_executor/layers/linear.py | 2 +-
 .../layers/moe/fused_moe_cutlass_backend.py | 2 +-
 .../layers/moe/fused_moe_deepgemm_backend.py | 2 +-
 .../layers/moe/fused_moe_marlin_backend.py | 2 +-
 .../layers/moe/fused_moe_triton_backend.py | 4 ++--
 .../layers/moe/fused_moe_wint2_backend.py | 4 ++--
 .../model_executor/layers/moe/fused_moe_xpu_backend.py | 4 ++--
 fastdeploy/model_executor/models/deepseek_v3.py | 2 +-
 .../models/ernie4_5_vl/ernie4_5_vl_moe.py | 2 +-
 tests/distributed/custom_all_reduce.py | 10 +++++-----
 14 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/fastdeploy/distributed/communication.py b/fastdeploy/distributed/communication.py
index bcc45f140..f2629ca26 100644
--- a/fastdeploy/distributed/communication.py
+++ b/fastdeploy/distributed/communication.py
@@ -53,7 +53,7 @@ try:
         global _TP_AR
         if _TP_AR is not None and _TP_AR.should_custom_ar(input_):
             # TODO: supports different_group custom allreduce
-            _TP_AR.custom_all_reduce(input_)
+            input_ = _TP_AR.custom_all_reduce(input_)
         elif paddle.in_dynamic_mode():
             if group_ is not None:
                 dist.all_reduce(input_, group=group_)
@@ -63,6 +63,7 @@ try:
                 dist.all_reduce(input_, group=mp_group)
             else:
                 dist.all_reduce(input_)
+        return input_

 except:
     tensor_model_parallel_all_reduce = None

diff --git a/fastdeploy/distributed/custom_all_reduce/custom_all_reduce.py b/fastdeploy/distributed/custom_all_reduce/custom_all_reduce.py
index 9a38b728e..1c9aecf89 100644
--- a/fastdeploy/distributed/custom_all_reduce/custom_all_reduce.py
+++ b/fastdeploy/distributed/custom_all_reduce/custom_all_reduce.py
@@ -212,13 +212,13 @@ class CustomAllreduce:
             stream_capturing = lib.cudaStreamIsCapturing(stream)
             if stream_capturing.value == 1:
                 # 1 is cudaStreamCaptureStatusActive: The stream is capturing.
-                return self.all_reduce(input, input, registered=True)
+                return self.all_reduce(input, registered=True)
             else:
                 # If warm up, mimic the allocation pattern since custom
                 # allreduce is out-of-place.
                 return paddle.empty_like(input)
         else:
-            return self.all_reduce(input, input, registered=False)
+            return self.all_reduce(input, registered=False)

     def close(self):
         if self._ptr:

diff --git a/fastdeploy/model_executor/layers/backends/dcu/fused_moe_triton_backends.py b/fastdeploy/model_executor/layers/backends/dcu/fused_moe_triton_backends.py
index 0038ed149..f1ea6572f 100644
--- a/fastdeploy/model_executor/layers/backends/dcu/fused_moe_triton_backends.py
+++ b/fastdeploy/model_executor/layers/backends/dcu/fused_moe_triton_backends.py
@@ -243,5 +243,5 @@ class DCUTritonWeightOnlyMoEMethod(QuantMethodBase):
         out = intermediate_cache3.sum(axis=1)

         if layer.tp_size > 1:
-            tensor_model_parallel_all_reduce(out)
+            out = tensor_model_parallel_all_reduce(out)
         return out

diff --git a/fastdeploy/model_executor/layers/backends/gcu/moe/fused_moe_method_gcu_backend.py b/fastdeploy/model_executor/layers/backends/gcu/moe/fused_moe_method_gcu_backend.py
index c899cafc7..c13a68f31 100644
--- a/fastdeploy/model_executor/layers/backends/gcu/moe/fused_moe_method_gcu_backend.py
+++ b/fastdeploy/model_executor/layers/backends/gcu/moe/fused_moe_method_gcu_backend.py
@@ -180,7 +180,7 @@ class GCUFusedMoeMethod(UnquantizedFusedMoEMethod):
                 tensor_model_parallel_all_reduce,
             )

-            tensor_model_parallel_all_reduce(fused_moe_out)
+            fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)

         return fused_moe_out

diff --git a/fastdeploy/model_executor/layers/linear.py b/fastdeploy/model_executor/layers/linear.py
index 0d079c90c..7b1dc794a 100644
--- a/fastdeploy/model_executor/layers/linear.py
+++ b/fastdeploy/model_executor/layers/linear.py
@@ -778,7 +778,7 @@ class RowParallelLinear(LinearBase):
             out = paddle.matmul(x, self.weight)

         if self.reduce_results and self.nranks > 1:
-            tensor_model_parallel_all_reduce(out, self.tp_group)
+            out = tensor_model_parallel_all_reduce(out, self.tp_group)
         if not self.fd_config.quant_config and self.add_bias:
             out = paddle.add(out, self.bias)
         return out

diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py
index 9dd5c9984..b547e1129 100644
--- a/fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py
+++ b/fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py
@@ -298,7 +298,7 @@ class CutlassMoEMethod(UnquantizedFusedMoEMethod):
         )

         if layer.reduce_results and layer.tp_size > 1:
-            tensor_model_parallel_all_reduce(fused_moe_out)
+            fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)

         return fused_moe_out

diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py
index 178bc74b3..e76bdf55c 100644
--- a/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py
+++ b/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py
@@ -575,6 +575,6 @@ class DeepGemmFusedMoeMethod(MoEMethodBase):
             1.0,
         )[0]
         if layer.tp_size > 1:
-            tensor_model_parallel_all_reduce(tmp_ffn_out)
+            tmp_ffn_out = tensor_model_parallel_all_reduce(tmp_ffn_out)

         return tmp_ffn_out

diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_marlin_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_marlin_backend.py
index b3aa306e9..705dfee92 100644
--- a/fastdeploy/model_executor/layers/moe/fused_moe_marlin_backend.py
+++ b/fastdeploy/model_executor/layers/moe/fused_moe_marlin_backend.py
@@ -353,6 +353,6 @@ class MarlinWeightOnlyMoEMethod(QuantMethodBase):
         ffn_out = ffn_out.sum(axis=1)
         if layer.reduce_results and layer.tp_size > 1:
-            tensor_model_parallel_all_reduce(ffn_out)
+            ffn_out = tensor_model_parallel_all_reduce(ffn_out)

         return ffn_out

diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py
index 735662642..e9bf781a2 100644
--- a/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py
+++ b/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py
@@ -599,7 +599,7 @@ class TensorWiseFP8MoEMethod(QuantMethodBase):
         out = down_proj_out.sum(axis=1)

         if layer.tp_size > 1:
-            tensor_model_parallel_all_reduce(out)
+            out = tensor_model_parallel_all_reduce(out)

         return out

@@ -997,6 +997,6 @@ class BlockWiseFP8MoEMethod(QuantMethodBase):
         out = intermediate_cache3.sum(axis=1)

         if layer.tp_size > 1:
-            tensor_model_parallel_all_reduce(out)
+            out = tensor_model_parallel_all_reduce(out)

         return out

diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_wint2_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_wint2_backend.py
index f9f717d31..7cbb46dc1 100644
--- a/fastdeploy/model_executor/layers/moe/fused_moe_wint2_backend.py
+++ b/fastdeploy/model_executor/layers/moe/fused_moe_wint2_backend.py
@@ -318,7 +318,7 @@ class CutlassWint2FusedMoeMethod(Wint2MoeMethod):
         )

         if layer.tp_size > 1:
-            tensor_model_parallel_all_reduce(fused_moe_out)
+            fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)

         return fused_moe_out

@@ -488,6 +488,6 @@ class TritonWint2FusedMoeMethod(CutlassWint2FusedMoeMethod):
         fused_moe_out = paddle.sum(intermediate_cache3, axis=1)

         if layer.tp_size > 1:
-            tensor_model_parallel_all_reduce(fused_moe_out)
+            fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)

         return fused_moe_out

diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_xpu_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_xpu_backend.py
index b83cce96d..272899531 100644
--- a/fastdeploy/model_executor/layers/moe/fused_moe_xpu_backend.py
+++ b/fastdeploy/model_executor/layers/moe/fused_moe_xpu_backend.py
@@ -73,7 +73,7 @@ class XPUMoEMethod(UnquantizedFusedMoEMethod):
                 tensor_model_parallel_all_reduce,
             )

-            tensor_model_parallel_all_reduce(fused_moe_out)
+            fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)

         return fused_moe_out

@@ -253,6 +253,6 @@ class XPUWeightOnlyMoEMethod(QuantMethodBase):
                 tensor_model_parallel_all_reduce,
             )

-            tensor_model_parallel_all_reduce(fused_moe_out)
+            fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)

         return fused_moe_out

diff --git a/fastdeploy/model_executor/models/deepseek_v3.py b/fastdeploy/model_executor/models/deepseek_v3.py
index beb348779..7660945ef 100644
--- a/fastdeploy/model_executor/models/deepseek_v3.py
+++ b/fastdeploy/model_executor/models/deepseek_v3.py
@@ -180,7 +180,7 @@ class DeepSeekV3MoE(nn.Layer):
             moe_out = moe_out + shared_experts_out
         # We do to TP all reduce after the sum of experts.
         if self.tp_size > 1:
-            tensor_model_parallel_all_reduce(moe_out)
+            moe_out = tensor_model_parallel_all_reduce(moe_out)
         return moe_out

diff --git a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py
index fff670334..4a28bd45d 100644
--- a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py
+++ b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py
@@ -287,7 +287,7 @@ class Ernie4_5_VLMoE(nn.Layer):
         if self.num_shared_experts > 0:
             hidden_states += shared_experts_out
         if self.tp_size > 1:
-            tensor_model_parallel_all_reduce(hidden_states)
+            hidden_states = tensor_model_parallel_all_reduce(hidden_states)

         return hidden_states

diff --git a/tests/distributed/custom_all_reduce.py b/tests/distributed/custom_all_reduce.py
index ccc984d3d..be4fd9d5c 100644
--- a/tests/distributed/custom_all_reduce.py
+++ b/tests/distributed/custom_all_reduce.py
@@ -54,14 +54,14 @@ class Test(unittest.TestCase):
         fa = CustomAllreduce(model_parallel_group)
         for m, n in mns:
-            data_cusom_ar = paddle.rand([m, n], dtype="bfloat16")
-            data_paddle = data_cusom_ar.clone()
-            if fa.should_custom_ar(data_cusom_ar):
-                fa.custom_all_reduce(data_cusom_ar)
+            data_custom_ar = paddle.rand([m, n], dtype="bfloat16")
+            data_paddle = data_custom_ar.clone()
+            if fa.should_custom_ar(data_custom_ar):
+                data_custom_ar = fa.custom_all_reduce(data_custom_ar)
             dist.all_reduce(data_paddle)
             if dist.get_rank() == 0:
                 np.testing.assert_allclose(
-                    data_cusom_ar.numpy(),
+                    data_custom_ar.numpy(),
                     data_paddle.numpy(),
                     rtol=1e-04,
                     atol=1e-04,
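
Note: the root cause is that CustomAllreduce.custom_all_reduce is
out-of-place: it writes the reduced values into a fresh tensor and returns
it, leaving its argument untouched. Call sites that invoked it purely for
its side effect kept operating on the un-reduced local shard, which
surfaced as unstable results under tensor parallelism. The sketch below
illustrates the contract only; fake_custom_all_reduce is a hypothetical
single-process stand-in, not the real CUDA kernel in
fastdeploy.distributed.custom_all_reduce.

    import paddle

    def fake_custom_all_reduce(x: paddle.Tensor, world_size: int = 2) -> paddle.Tensor:
        # Out-of-place, like the custom kernel: the result is a NEW tensor
        # and `x` is left unmodified. Assumes every rank holds an identical
        # shard, so the all-reduce sum is just x * world_size.
        return x * world_size

    x = paddle.ones([4])
    fake_custom_all_reduce(x)      # buggy pattern: returned tensor is dropped
    assert float(x.sum()) == 4.0   # x still holds the local shard

    x = fake_custom_all_reduce(x)  # fixed pattern: rebind to the result
    assert float(x.sum()) == 8.0   # x now holds the reduced values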