[Intel HPU] fix memory fragmentation issue due to warmup process and fix moe all_reduce issue (#5357)

This commit is contained in:
fmiao2372
2025-12-04 11:29:41 +08:00
committed by GitHub
parent 946025480e
commit 209006e6a6
3 changed files with 14 additions and 11 deletions

View File

@@ -18,7 +18,6 @@ import paddle
from paddle import nn
from fastdeploy import envs
from fastdeploy.distributed.communication import tensor_model_parallel_all_reduce_custom
from fastdeploy.model_executor.layers.moe.fused_moe_backend_base import (
UnquantizedFusedMoEMethod,
)
@@ -96,8 +95,6 @@ class HpuMoEMethod(UnquantizedFusedMoEMethod):
experts_max=layer.expert_id_offset + layer.num_local_experts - 1,
chunk_size=chunk_size,
)
if layer.reduce_results and layer.tp_size > 1:
tensor_model_parallel_all_reduce_custom(fused_moe_out)
return fused_moe_out
@@ -198,7 +195,4 @@ class HpuTensorWiseFP8MoEMethod(HpuMoEMethod):
chunk_size=chunk_size,
)
if layer.reduce_results and layer.tp_size > 1:
tensor_model_parallel_all_reduce_custom(fused_moe_out)
return fused_moe_out

View File

@@ -21,7 +21,10 @@ from paddle import nn
from paddleformers.utils.log import logger
from fastdeploy import envs
from fastdeploy.distributed.communication import tensor_model_parallel_all_reduce
from fastdeploy.distributed.communication import (
tensor_model_parallel_all_reduce,
tensor_model_parallel_all_reduce_custom,
)
from fastdeploy.model_executor.layers.utils import get_tensor
from fastdeploy.model_executor.utils import h2d_copy, slice_fn
from fastdeploy.platforms import current_platform
@@ -643,7 +646,10 @@ class FusedMoE(nn.Layer):
out = self.forward_normal(x, gate)
if self.reduce_results and self.tp_size > 1:
out = tensor_model_parallel_all_reduce(out, self.tp_group)
if current_platform.is_intel_hpu():
tensor_model_parallel_all_reduce_custom(out)
else:
out = tensor_model_parallel_all_reduce(out, self.tp_group)
return out
def forward_chunked_moe(self, x: paddle.Tensor, gate: nn.Layer):

View File

@@ -1120,10 +1120,11 @@ class HPUModelRunner(ModelRunnerBase):
max_prefill_length = self.cache_config.block_size + warmup_max_model_len
prefill_context_block_step = int(os.environ.get("CONTEXT_BLOCK_STEP_PREFILL", 1))
prefill_batchs.reverse()
prefill_length_with_contexts = list(range(self.cache_config.block_size, max_prefill_length, prefill_seq_step))
prefill_length_with_contexts.reverse()
for prefill_batch in prefill_batchs:
for prefill_length_with_context in range(
self.cache_config.block_size, max_prefill_length, prefill_seq_step
):
for prefill_length_with_context in prefill_length_with_contexts:
if prefill_length_with_context * prefill_batch > self.scheduler_config.max_num_batched_tokens:
continue
for context_len in range(
@@ -1171,6 +1172,8 @@ class HPUModelRunner(ModelRunnerBase):
current_decode_block_num += decode_block_num_step
logger.info(f"warmup decode_batchs: {decode_batchs}, decode_block_nums: {decode_block_nums} start")
decode_batchs.reverse()
decode_block_nums.reverse()
for decode_batch in decode_batchs:
for decode_block_num in decode_block_nums:
if decode_block_num < decode_batch: