fix ep prefill (#2762)

Author: RichardWooSJTU
Date: 2025-07-09 14:03:05 +08:00
Committed by: GitHub
Parent: c4718fd693
Commit: fee544e808
7 changed files with 66 additions and 32 deletions


@@ -33,6 +33,7 @@ from fastdeploy.config import FDConfig
 from fastdeploy.model_executor.layers.attention.attention import Attention
 from fastdeploy.model_executor.layers.attention.base_attention_backend import (
     AttentionBackend, AttentionMetadata)
+from fastdeploy.model_executor.layers.attention.utils import init_rank_and_device_id
 from fastdeploy.worker.forward_meta import ForwardMeta
@@ -91,7 +92,6 @@ class AppendAttentionBackend(AttentionBackend):
         self.use_speculate: bool = self.speculative_method is not None
         self.speculate_max_draft_token_num: int = fd_config.speculative_config.num_speculative_tokens
         self.keep_pd_step_flag: bool = fd_config.speculative_config.model_type == "mtp"
-        self.rank: int = fd_config.parallel_config.tensor_parallel_rank
         self.kv_num_heads: int = kv_num_heads
         self.num_heads: int = num_heads
@@ -104,16 +104,11 @@ class AppendAttentionBackend(AttentionBackend):
         self.use_pd_disaggregation: int = int(
             os.getenv("FLAGS_use_pd_disaggregation", 0))
         self.start_layer_index: int = fd_config.model_config.start_layer_index
-        self.device_id: int = os.getenv("CUDA_VISIBLE_DEVICES", None)
-        if fd_config.parallel_config.expert_parallel_rank is None:
-            fd_config.parallel_config.expert_parallel_rank = 0
-        device_id = self.rank + fd_config.parallel_config.tensor_parallel_degree * \
-                    fd_config.parallel_config.expert_parallel_rank
-        if self.device_id is None:
-            self.device_id = device_id
-        else:
-            self.device_id = self.device_id.split(",")[device_id]
+        self.rank, self.device_id = init_rank_and_device_id(fd_config)

     def init_attention_metadata(self, forward_meta: ForwardMeta):
         """Initialize attntion metadata hence all layers in the forward pass can reuse it."""


@@ -34,6 +34,7 @@ from fastdeploy.model_executor.layers.attention.base_attention_backend import (
 from fastdeploy.model_executor.layers.attention.ops import (
     get_block_shape_and_split_kv_block, gqa_rope_write_cache,
     init_signal_layerwise, open_shm_and_get_meta_signal, pre_cache_len_concat)
+from fastdeploy.model_executor.layers.attention.utils import init_rank_and_device_id
 from fastdeploy.worker.forward_meta import ForwardMeta
@@ -100,22 +101,16 @@ class FlashAttentionBackend(AttentionBackend):
         self.use_speculate = self.speculative_method is not None
         self.speculate_max_draft_token_num = fd_config.speculative_config.num_speculative_tokens
         self.keep_pd_step_flag: bool = fd_config.speculative_config.model_type == "mtp"
-        self.rank: int = fd_config.parallel_config.tensor_parallel_rank

         # pd_disaggregation
         self.use_pd_disaggregation: int = int(
             os.getenv("FLAGS_use_pd_disaggregation", 0))
         self.start_layer_index: int = fd_config.model_config.start_layer_index
-        self.device_id: int = os.getenv("CUDA_VISIBLE_DEVICES", None)
-        if fd_config.parallel_config.expert_parallel_rank is None:
-            fd_config.parallel_config.expert_parallel_rank = 0
-        device_id = self.rank + fd_config.parallel_config.tensor_parallel_degree * \
-                    fd_config.parallel_config.expert_parallel_rank
-        if self.device_id is None:
-            self.device_id = device_id
-        else:
-            self.device_id = self.device_id.split(",")[device_id]
+        self.rank, self.device_id = init_rank_and_device_id(fd_config)

     def get_attntion_meta(self):
         """get_attntion_meta"""


@@ -41,6 +41,7 @@ from fastdeploy.config import FDConfig
 from fastdeploy.model_executor.layers.attention.attention import Attention
 from fastdeploy.model_executor.layers.attention.base_attention_backend import (
     AttentionBackend, AttentionMetadata)
+from fastdeploy.model_executor.layers.attention.utils import init_rank_and_device_id
 from fastdeploy.worker.forward_meta import ForwardMeta
@@ -109,7 +110,6 @@ class MLAAttentionBackend(AttentionBackend):
         self.use_speculate: bool = self.speculative_method is not None
         self.speculate_max_draft_token_num: int = fd_config.speculative_config.num_speculative_tokens
         self.keep_pd_step_flag: bool = fd_config.speculative_config.model_type == "mtp"
-        self.rank: int = fd_config.parallel_config.tensor_parallel_rank
         self.kv_num_heads: int = kv_num_heads
         self.num_heads: int = num_heads
@@ -135,10 +135,8 @@ class MLAAttentionBackend(AttentionBackend):
             os.getenv("FLAGS_use_pd_disaggregation", 0))
         self.start_layer_index: int = fd_config.model_config.start_layer_index
-        self.device_id: int = os.getenv("CUDA_VISIBLE_DEVICES", None)
-        if self.device_id is None:
-            self.device_id = self.rank
-        else:
-            self.device_id = self.device_id.split(",")[self.rank]
+        self.rank, self.device_id = init_rank_and_device_id(fd_config)

     def init_attention_metadata(self, forward_meta: ForwardMeta):
         """Initialize attention metadata hence all layers in the forward pass can reuse it."""


@@ -0,0 +1,36 @@
+"""
+# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+
+import os
+
+from fastdeploy.config import FDConfig
+
+
+def init_rank_and_device_id(fd_config: FDConfig):
+    """
+    Compute the global rank from the expert-parallel and tensor-parallel ranks,
+    then map it to the matching entry of CUDA_VISIBLE_DEVICES (or to the rank
+    itself when the variable is unset).
+    """
+    rank = (fd_config.parallel_config.expert_parallel_rank *
+            fd_config.parallel_config.tensor_parallel_degree +
+            fd_config.parallel_config.tensor_parallel_rank)
+
+    cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES", None)
+
+    if cuda_visible_devices is None:
+        device_id = rank
+    else:
+        cuda_visible_devices = cuda_visible_devices.split(",")
+        rank_index = rank % len(cuda_visible_devices)
+        device_id = cuda_visible_devices[rank_index]
+
+    return rank, device_id
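
For illustration only (not part of the commit): a self-contained sketch of how this mapping behaves, with a SimpleNamespace standing in for the real FDConfig. With expert_parallel_rank=1, tensor_parallel_degree=2, tensor_parallel_rank=0 and CUDA_VISIBLE_DEVICES="4,5,6,7", the global rank is 2 and the resolved device id is "6".

    # Illustrative sketch; mirrors the logic of init_rank_and_device_id above
    # using a stand-in config object instead of FDConfig.
    import os
    from types import SimpleNamespace

    def resolve_rank_and_device_id(parallel_config):
        # Global rank: expert-parallel groups are laid out contiguously,
        # with tensor-parallel ranks filling each group.
        rank = (parallel_config.expert_parallel_rank *
                parallel_config.tensor_parallel_degree +
                parallel_config.tensor_parallel_rank)
        visible = os.getenv("CUDA_VISIBLE_DEVICES", None)
        if visible is None:
            return rank, rank
        devices = visible.split(",")
        return rank, devices[rank % len(devices)]

    os.environ["CUDA_VISIBLE_DEVICES"] = "4,5,6,7"
    cfg = SimpleNamespace(expert_parallel_rank=1,
                          tensor_parallel_degree=2,
                          tensor_parallel_rank=0)
    print(resolve_rank_and_device_id(cfg))  # -> (2, '6')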


@@ -144,7 +144,10 @@ class DeepGemmFusedMoeMethod(MoEMethodBase):
         if token_all_num > 0:
             logger.info(f"token_all_num {token_all_num}")
             (recv_x, recv_x_scale) = recv_x
-            tmp = count_tokens_per_expert_func(recv_topk_idx, layer.num_local_experts)
+            token_nums_this_rank = count_tokens_per_expert_func(recv_topk_idx, layer.num_local_experts)
+            token_nums_this_rank_padded = sum(token_nums_this_rank[1].numpy().tolist())
+
             (
                 permute_input,
                 permute_scale,
@@ -160,8 +163,10 @@ class DeepGemmFusedMoeMethod(MoEMethodBase):
                 recv_x_scale,
                 recv_topk_idx,
                 recv_topk_weights,
-                tmp[0],
-                tmp[1]
+                token_nums_this_rank[0],
+                token_nums_this_rank[1],
+                True,  # use_in_ep
+                token_nums_this_rank_padded,
             )
             permute_scale = permute_scale.transpose([1, 0]).contiguous()
@@ -328,6 +333,8 @@ class DeepGemmFusedMoeMethod(MoEMethodBase):
                 topk_weights,
                 tmp[0],
                 tmp[1],
+                False,  # use_in_ep
+                -1,
             )
             permute_scale = permute_scale.transpose([1, 0]).contiguous()
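
For context only (not part of the commit): a rough sketch of what the new padded count amounts to, assuming — and this is an assumption, not taken from the kernel's documented signature — that count_tokens_per_expert_func returns a tuple whose second element is a per-local-expert token-count tensor; numpy stands in for the framework tensor here.

    # Illustrative only: hypothetical stand-in for token_nums_this_rank[1],
    # i.e. the number of tokens routed to each local expert on this rank.
    import numpy as np

    tokens_per_local_expert = np.array([128, 96, 0, 64])

    # Mirrors token_nums_this_rank_padded in the diff above: the total number
    # of token slots this rank hands to the EP dispatch/permute step.
    token_nums_this_rank_padded = sum(tokens_per_local_expert.tolist())
    print(token_nums_this_rank_padded)  # -> 288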