[cp][BugFix]2.2_fix_custom_ar_unstable_result (#4436)
Some checks failed
CE Compile Job / ce_job_pre_check (push) Has been cancelled
CE Compile Job / print_ce_job_pre_check_outputs (push) Has been cancelled
CE Compile Job / FD-Clone-Linux (push) Has been cancelled
CE Compile Job / Show Code Archive Output (push) Has been cancelled
CE Compile Job / BUILD_SM8090 (push) Has been cancelled
CE Compile Job / BUILD_SM8689 (push) Has been cancelled
CE Compile Job / CE_UPLOAD (push) Has been cancelled

* [BugFix]Dev fix custom ar unstable result (#4437)

* code check
This commit is contained in:
chen
2025-10-17 16:04:54 +08:00
committed by GitHub
parent 4178c110d2
commit f660188a85
15 changed files with 22 additions and 21 deletions

View File

@@ -59,7 +59,7 @@ try:
global _TP_AR
if _TP_AR is not None and _TP_AR.should_custom_ar(input_):
# TODO: support custom allreduce for communication groups other than the default TP group
_TP_AR.custom_all_reduce(input_)
input_ = _TP_AR.custom_all_reduce(input_)
elif paddle.in_dynamic_mode():
if group_ is not None:
dist.all_reduce(input_, group=group_)
@@ -69,6 +69,7 @@ try:
dist.all_reduce(input_, group=mp_group)
else:
dist.all_reduce(input_)
return input_
except:
tensor_model_parallel_all_reduce = None

View File

@@ -213,13 +213,13 @@ class CustomAllreduce:
stream_capturing = lib.cudaStreamIsCapturing(stream)
if stream_capturing.value == 1:
# 1 is cudaStreamCaptureStatusActive: The stream is capturing.
return self.all_reduce(input, input, registered=True)
return self.all_reduce(input, registered=True)
else:
# If warm up, mimic the allocation pattern since custom
# allreduce is out-of-place.
return paddle.empty_like(input)
else:
return self.all_reduce(input, input, registered=False)
return self.all_reduce(input, registered=False)
def clear_ipc_handles(self):
clear_ipc_handles(self._ptr)

View File

@@ -243,5 +243,5 @@ class DCUTritonWeightOnlyMoEMethod(QuantMethodBase):
out = intermediate_cache3.sum(axis=1)
if layer.tp_size > 1:
tensor_model_parallel_all_reduce(out)
out = tensor_model_parallel_all_reduce(out)
return out

View File

@@ -180,7 +180,7 @@ class GCUFusedMoeMethod(UnquantizedFusedMoEMethod):
tensor_model_parallel_all_reduce,
)
tensor_model_parallel_all_reduce(fused_moe_out)
fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)
return fused_moe_out

View File

@@ -778,7 +778,7 @@ class RowParallelLinear(LinearBase):
out = paddle.matmul(x, self.weight)
if self.reduce_results and self.nranks > 1:
tensor_model_parallel_all_reduce(out, self.tp_group)
out = tensor_model_parallel_all_reduce(out, self.tp_group)
if not self.fd_config.quant_config and self.add_bias:
out = paddle.add(out, self.bias)
return out

View File

@@ -298,7 +298,7 @@ class CutlassMoEMethod(UnquantizedFusedMoEMethod):
)
if layer.reduce_results and layer.tp_size > 1:
tensor_model_parallel_all_reduce(fused_moe_out, layer.fd_config.parallel_config.tp_group)
fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out, layer.fd_config.parallel_config.tp_group)
return fused_moe_out

View File

@@ -572,6 +572,6 @@ class DeepGemmFusedMoeMethod(MoEMethodBase):
1.0,
)[0]
if layer.tp_size > 1:
tensor_model_parallel_all_reduce(tmp_ffn_out)
tmp_ffn_out = tensor_model_parallel_all_reduce(tmp_ffn_out)
return tmp_ffn_out

View File

@@ -353,6 +353,6 @@ class MarlinWeightOnlyMoEMethod(QuantMethodBase):
ffn_out = ffn_out.sum(axis=1)
if layer.reduce_results and layer.tp_size > 1:
tensor_model_parallel_all_reduce(ffn_out)
ffn_out = tensor_model_parallel_all_reduce(ffn_out)
return ffn_out

View File

@@ -388,7 +388,7 @@ class TritonWeightOnlyMoEMethod(QuantMethodBase):
down_proj_out.reshape_([token_num, top_k, hidden_size])
out = down_proj_out.sum(axis=1)
if layer.reduce_results and layer.tp_size > 1:
tensor_model_parallel_all_reduce(out)
out = tensor_model_parallel_all_reduce(out)
return out
@@ -759,7 +759,7 @@ class Wfp8Afp8MoEMethod(QuantMethodBase):
out = down_proj_out.sum(axis=1)
if layer.reduce_results and layer.tp_size > 1:
tensor_model_parallel_all_reduce(out)
out = tensor_model_parallel_all_reduce(out)
return out
@@ -1060,7 +1060,7 @@ class TensorWiseFP8MoEMethod(QuantMethodBase):
out = down_proj_out.sum(axis=1)
if layer.tp_size > 1:
tensor_model_parallel_all_reduce(out)
out = tensor_model_parallel_all_reduce(out)
return out
@@ -1453,6 +1453,6 @@ class BlockWiseFP8MoEMethod(QuantMethodBase):
out = intermediate_cache3.sum(axis=1)
if layer.tp_size > 1:
tensor_model_parallel_all_reduce(out)
out = tensor_model_parallel_all_reduce(out)
return out

View File

@@ -318,7 +318,7 @@ class CutlassWint2FusedMoeMethod(Wint2MoeMethod):
)
if layer.tp_size > 1:
tensor_model_parallel_all_reduce(fused_moe_out)
fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)
return fused_moe_out
@@ -488,6 +488,6 @@ class TritonWint2FusedMoeMethod(CutlassWint2FusedMoeMethod):
fused_moe_out = paddle.sum(intermediate_cache3, axis=1)
if layer.tp_size > 1:
tensor_model_parallel_all_reduce(fused_moe_out)
fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)
return fused_moe_out

View File

@@ -73,7 +73,7 @@ class XPUMoEMethod(UnquantizedFusedMoEMethod):
tensor_model_parallel_all_reduce,
)
tensor_model_parallel_all_reduce(fused_moe_out)
fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)
return fused_moe_out
@@ -253,6 +253,6 @@ class XPUWeightOnlyMoEMethod(QuantMethodBase):
tensor_model_parallel_all_reduce,
)
tensor_model_parallel_all_reduce(fused_moe_out)
fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)
return fused_moe_out

View File

@@ -182,7 +182,7 @@ class DeepSeekV3MoE(nn.Layer):
moe_out = moe_out + shared_experts_out
# We do the TP all-reduce after summing the expert outputs.
if self.tp_size > 1:
tensor_model_parallel_all_reduce(moe_out)
moe_out = tensor_model_parallel_all_reduce(moe_out)
return moe_out

View File

@@ -287,7 +287,7 @@ class Ernie4_5_VLMoE(nn.Layer):
if self.num_shared_experts > 0:
hidden_states += shared_experts_out
if self.tp_size > 1:
tensor_model_parallel_all_reduce(hidden_states)
hidden_states = tensor_model_parallel_all_reduce(hidden_states)
return hidden_states

View File

@@ -163,7 +163,7 @@ class Glm4Moe(nn.Layer):
out = out + shared_experts_out
# We do the TP all-reduce after summing the expert outputs.
if self.tensor_parallel_size > 1:
tensor_model_parallel_all_reduce(out, self.tp_group)
out = tensor_model_parallel_all_reduce(out, self.tp_group)
return out

View File

@@ -57,7 +57,7 @@ class Test(unittest.TestCase):
data_cusom_ar = paddle.rand([m, n], dtype="bfloat16")
data_paddle = data_cusom_ar.clone()
if fa.should_custom_ar(data_cusom_ar):
fa.custom_all_reduce(data_cusom_ar)
data_cusom_ar = fa.custom_all_reduce(data_cusom_ar)
dist.all_reduce(data_paddle)
if dist.get_rank() == 0:
np.testing.assert_allclose(