Mirror of https://github.com/PaddlePaddle/FastDeploy.git (synced 2025-12-24 13:28:13 +08:00)
[CP][BugFix]Dev fix custom ar unstable result (#5186)
Some checks failed
CE Compile Job / ce_job_pre_check (push) Has been cancelled
CE Compile Job / print_ce_job_pre_check_outputs (push) Has been cancelled
CE Compile Job / FD-Clone-Linux (push) Has been cancelled
CE Compile Job / Show Code Archive Output (push) Has been cancelled
CE Compile Job / BUILD_SM8090 (push) Has been cancelled
CE Compile Job / BUILD_SM8689 (push) Has been cancelled
CE Compile Job / CE_UPLOAD (push) Has been cancelled
* [CP][BugFix]Dev fix custom ar unstable result (#4437)
* code check
* revert delete
* check
* pre_commit
@@ -53,7 +53,7 @@ try:
         global _TP_AR
         if _TP_AR is not None and _TP_AR.should_custom_ar(input_):
             # TODO: supports different_group custom allreduce
-            _TP_AR.custom_all_reduce(input_)
+            input_ = _TP_AR.custom_all_reduce(input_)
         elif paddle.in_dynamic_mode():
             if group_ is not None:
                 dist.all_reduce(input_, group=group_)
@@ -63,6 +63,7 @@ try:
                 dist.all_reduce(input_, group=mp_group)
             else:
                 dist.all_reduce(input_)
+        return input_

 except:
     tensor_model_parallel_all_reduce = None
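Why this is the fix: `custom_all_reduce` is out-of-place, so the reduced result comes back in a freshly allocated tensor and the input buffer is left untouched. Call sites that discarded the return value kept reading each rank's un-reduced partial sum, which is exactly the "unstable result" this patch addresses. A minimal single-process sketch of the failure mode (the stand-in function below is illustrative, not the real kernel):

import paddle

def out_of_place_all_reduce(x):
    # Stand-in for CustomAllreduce.custom_all_reduce: the result lands in
    # a new tensor; the argument is not mutated.
    return x * 2  # pretend this is the sum over two ranks

x = paddle.ones([4])
out_of_place_all_reduce(x)       # buggy pattern: result discarded, x unchanged
x = out_of_place_all_reduce(x)   # fixed pattern: rebind x to the returned tensor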
@@ -212,13 +212,13 @@ class CustomAllreduce:
             stream_capturing = lib.cudaStreamIsCapturing(stream)
             if stream_capturing.value == 1:
                 # 1 is cudaStreamCaptureStatusActive: The stream is capturing.
-                return self.all_reduce(input, input, registered=True)
+                return self.all_reduce(input, registered=True)
             else:
                 # If warm up, mimic the allocation pattern since custom
                 # allreduce is out-of-place.
                 return paddle.empty_like(input)
         else:
-            return self.all_reduce(input, input, registered=False)
+            return self.all_reduce(input, registered=False)

     def close(self):
         if self._ptr:
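For context, this hunk sits inside the dispatch that keeps custom allreduce safe under CUDA graphs: while the stream is actively capturing, the real kernel must run so it is recorded into the graph; during the warm-up pass the method only mimics the out-of-place allocation. A sketch of the surrounding control flow, reassembled from the context lines (the `capturing` flag and the `lib`/`stream` handles come from code not shown in the hunk):

def custom_all_reduce(self, input):
    if self.capturing:
        # `lib` and `stream` are set up by earlier lines outside this hunk.
        stream_capturing = lib.cudaStreamIsCapturing(stream)
        if stream_capturing.value == 1:
            # Stream is actively capturing: record the real out-of-place kernel.
            return self.all_reduce(input, registered=True)
        # Warm-up pass: reproduce the allocation pattern, no communication.
        return paddle.empty_like(input)
    # Eager path: run the kernel directly on unregistered buffers.
    return self.all_reduce(input, registered=False)

Dropping the second `input` argument from `self.all_reduce(...)` is the substance of the change: the old call aliased input and output, forcing an in-place write, while the new contract lets the callee allocate its own output buffer.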
@@ -243,5 +243,5 @@ class DCUTritonWeightOnlyMoEMethod(QuantMethodBase):
         out = intermediate_cache3.sum(axis=1)

         if layer.tp_size > 1:
-            tensor_model_parallel_all_reduce(out)
+            out = tensor_model_parallel_all_reduce(out)
         return out
@@ -180,7 +180,7 @@ class GCUFusedMoeMethod(UnquantizedFusedMoEMethod):
                 tensor_model_parallel_all_reduce,
             )

-            tensor_model_parallel_all_reduce(fused_moe_out)
+            fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)

         return fused_moe_out
@@ -778,7 +778,7 @@ class RowParallelLinear(LinearBase):
         out = paddle.matmul(x, self.weight)

         if self.reduce_results and self.nranks > 1:
-            tensor_model_parallel_all_reduce(out, self.tp_group)
+            out = tensor_model_parallel_all_reduce(out, self.tp_group)
         if not self.fd_config.quant_config and self.add_bias:
             out = paddle.add(out, self.bias)
         return out
@@ -298,7 +298,7 @@ class CutlassMoEMethod(UnquantizedFusedMoEMethod):
         )

         if layer.reduce_results and layer.tp_size > 1:
-            tensor_model_parallel_all_reduce(fused_moe_out)
+            fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)

         return fused_moe_out

@@ -575,6 +575,6 @@ class DeepGemmFusedMoeMethod(MoEMethodBase):
             1.0,
         )[0]
         if layer.tp_size > 1:
-            tensor_model_parallel_all_reduce(tmp_ffn_out)
+            tmp_ffn_out = tensor_model_parallel_all_reduce(tmp_ffn_out)

         return tmp_ffn_out
@@ -353,6 +353,6 @@ class MarlinWeightOnlyMoEMethod(QuantMethodBase):
         ffn_out = ffn_out.sum(axis=1)

         if layer.reduce_results and layer.tp_size > 1:
-            tensor_model_parallel_all_reduce(ffn_out)
+            ffn_out = tensor_model_parallel_all_reduce(ffn_out)

         return ffn_out
@@ -599,7 +599,7 @@ class TensorWiseFP8MoEMethod(QuantMethodBase):
         out = down_proj_out.sum(axis=1)

         if layer.tp_size > 1:
-            tensor_model_parallel_all_reduce(out)
+            out = tensor_model_parallel_all_reduce(out)

         return out
@@ -997,6 +997,6 @@ class BlockWiseFP8MoEMethod(QuantMethodBase):
         out = intermediate_cache3.sum(axis=1)

         if layer.tp_size > 1:
-            tensor_model_parallel_all_reduce(out)
+            out = tensor_model_parallel_all_reduce(out)

         return out
@@ -318,7 +318,7 @@ class CutlassWint2FusedMoeMethod(Wint2MoeMethod):
         )

         if layer.tp_size > 1:
-            tensor_model_parallel_all_reduce(fused_moe_out)
+            fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)

         return fused_moe_out
@@ -488,6 +488,6 @@ class TritonWint2FusedMoeMethod(CutlassWint2FusedMoeMethod):
         fused_moe_out = paddle.sum(intermediate_cache3, axis=1)

         if layer.tp_size > 1:
-            tensor_model_parallel_all_reduce(fused_moe_out)
+            fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)

         return fused_moe_out
@@ -73,7 +73,7 @@ class XPUMoEMethod(UnquantizedFusedMoEMethod):
                 tensor_model_parallel_all_reduce,
             )

-            tensor_model_parallel_all_reduce(fused_moe_out)
+            fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)

         return fused_moe_out
@@ -253,6 +253,6 @@ class XPUWeightOnlyMoEMethod(QuantMethodBase):
                 tensor_model_parallel_all_reduce,
             )

-            tensor_model_parallel_all_reduce(fused_moe_out)
+            fused_moe_out = tensor_model_parallel_all_reduce(fused_moe_out)

         return fused_moe_out
@@ -180,7 +180,7 @@ class DeepSeekV3MoE(nn.Layer):
             moe_out = moe_out + shared_experts_out
         # We do the TP all-reduce after the sum of experts.
         if self.tp_size > 1:
-            tensor_model_parallel_all_reduce(moe_out)
+            moe_out = tensor_model_parallel_all_reduce(moe_out)
         return moe_out
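Worth noting here: the comment in the hunk explains that the layer performs a single TP all-reduce only after the shared-experts output has been added, since both partial results live on every rank and one collective covers their sum. With the fix applied, the pattern reads (a sketch; `routed_out`/`shared_out` are illustrative names):

moe_out = routed_out + shared_out  # per-rank partial sums of both branches
if self.tp_size > 1:
    moe_out = tensor_model_parallel_all_reduce(moe_out)  # one collective for both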
@@ -287,7 +287,7 @@ class Ernie4_5_VLMoE(nn.Layer):
         if self.num_shared_experts > 0:
             hidden_states += shared_experts_out
         if self.tp_size > 1:
-            tensor_model_parallel_all_reduce(hidden_states)
+            hidden_states = tensor_model_parallel_all_reduce(hidden_states)
         return hidden_states
@@ -54,14 +54,14 @@ class Test(unittest.TestCase):
         fa = CustomAllreduce(model_parallel_group)

         for m, n in mns:
-            data_cusom_ar = paddle.rand([m, n], dtype="bfloat16")
-            data_paddle = data_cusom_ar.clone()
-            if fa.should_custom_ar(data_cusom_ar):
-                fa.custom_all_reduce(data_cusom_ar)
+            data_custom_ar = paddle.rand([m, n], dtype="bfloat16")
+            data_paddle = data_custom_ar.clone()
+            if fa.should_custom_ar(data_custom_ar):
+                data_custom_ar = fa.custom_all_reduce(data_custom_ar)
             dist.all_reduce(data_paddle)
             if dist.get_rank() == 0:
                 np.testing.assert_allclose(
-                    data_cusom_ar.numpy(),
+                    data_custom_ar.numpy(),
                     data_paddle.numpy(),
                     rtol=1e-04,
                     atol=1e-04,
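For anyone reproducing the check outside the test class, a condensed sketch of the fixed comparison follows (the import path for CustomAllreduce is an assumption, and the real test derives model_parallel_group from the launch environment):

import numpy as np
import paddle
import paddle.distributed as dist

# Assumed import path; see the FastDeploy sources for the actual module.
from fastdeploy.distributed.communication import CustomAllreduce

dist.init_parallel_env()
group = dist.new_group(list(range(dist.get_world_size())))
fa = CustomAllreduce(group)

x = paddle.rand([128, 8192], dtype="bfloat16")
ref = x.clone()
if fa.should_custom_ar(x):
    x = fa.custom_all_reduce(x)  # out-of-place: keep the returned tensor
dist.all_reduce(ref)
if dist.get_rank() == 0:
    np.testing.assert_allclose(x.numpy(), ref.numpy(), rtol=1e-04, atol=1e-04)

Launch it with a multi-GPU runner (for example `python -m paddle.distributed.launch`) so every tensor-parallel rank executes the script.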