Mirror of https://github.com/PaddlePaddle/FastDeploy.git (synced 2025-10-07)
[Cherry-Pick][BugFix] Add prefill restrictions for chunked_prefill+VL (#2984)
@@ -140,7 +140,14 @@ class GPUModelRunner(ModelRunnerBase):
         """
         Check whether prefill stage finished
         """
-        if int(paddle.max(self.share_inputs['seq_lens_encoder'])) != 0:
-            return 1
-        else:
-            return 0
+        if self.enable_mm:
+            # VL only support 1 batch to prefill
+            prefill_statue = (self.share_inputs["seq_lens_this_time"] != 0) & (
+                self.share_inputs["seq_lens_this_time"] != 1
+            )
+            return not paddle.any(prefill_statue).numpy()
+        else:
+            if int(paddle.max(self.share_inputs["seq_lens_encoder"])) != 0:
+                return 1
+            else:
+                return 0
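For reference, the new multimodal branch can be read in isolation as the sketch below. This is not code from the patch: only `seq_lens_this_time` and `prefill_statue` appear in the diff, the helper name and sample batches are made up, and a working Paddle install is assumed. A slot whose per-step length is greater than 1 is still consuming prompt chunks, so prefill is only reported as finished once every slot is idle (0) or decoding a single token (1).

# Standalone illustration of the new VL branch (hypothetical helper name).
import paddle

def vl_prefill_finished(seq_lens_this_time: paddle.Tensor) -> bool:
    # A slot is still prefilling when its length this step is neither 0 (idle)
    # nor 1 (decode); any such slot means prefill has not finished yet.
    prefill_statue = (seq_lens_this_time != 0) & (seq_lens_this_time != 1)
    return bool(not paddle.any(prefill_statue).numpy())

print(vl_prefill_finished(paddle.to_tensor([0, 1, 1])))    # True: only idle/decode slots
print(vl_prefill_finished(paddle.to_tensor([0, 128, 1])))  # False: slot 1 still prefilling a chunk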
@@ -23,10 +23,10 @@ import paddle
 import paddle.distributed as dist
 import paddle.distributed.fleet as fleet
 
-from fastdeploy.config import (DecodingConfig, DeviceConfig, FDConfig,
+from fastdeploy.config import (DecodingConfig, DeviceConfig,
+                               ErnieArchitectures, FDConfig,
                                GraphOptimizationConfig, LoadConfig,
-                               ModelConfig, ParallelConfig, SpeculativeConfig,
-                               ErnieArchitectures)
+                               ModelConfig, ParallelConfig, SpeculativeConfig)
 from fastdeploy.input.ernie_tokenizer import ErnieBotTokenizer
 from fastdeploy.inter_communicator import EngineWorkerQueue as TaskQueue
 from fastdeploy.inter_communicator import IPCSignal
@@ -277,12 +277,12 @@ class PaddleDisWorkerProc():
             # The first worker detects whether there are tasks in the task queue
             if self.local_rank % mp_num_per_node == 0:
                 if self.task_queue.num_tasks() > 0:
-                    if self.nnode > 1:
-                        self.task_queue.read_finish_flag.set(1)
-                    else:
-                        self.exist_task_signal.value[
-                            self.fd_config.parallel_config.
-                            expert_parallel_rank] = 1
+                    # VL only support 1 batch to prefill
+                    if not self.fd_config.model_config.enable_mm or self.worker.prefill_finished():
+                        if self.nnode > 1:
+                            self.task_queue.read_finish_flag.set(1)
+                        else:
+                            self.exist_task_signal.value[self.fd_config.parallel_config.expert_parallel_rank] = 1
 
             if self.parallel_config.tensor_parallel_size > 1:
                 # Synchronize the signal for other workers
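The guard added above can be summarized by a small self-contained helper (hypothetical name, not present in the repo): the first worker only signals new work when the model is not multimodal, or when the in-flight prefill has drained, which serializes VL prefills to one batch at a time.

# Minimal sketch of the dispatch gate, assuming nothing beyond the standard library.
def should_dispatch(num_tasks: int, enable_mm: bool, prefill_finished: bool) -> bool:
    if num_tasks <= 0:
        return False
    # VL only support 1 batch to prefill
    return (not enable_mm) or prefill_finished

assert should_dispatch(2, enable_mm=False, prefill_finished=False)      # text-only models are unaffected
assert not should_dispatch(2, enable_mm=True, prefill_finished=False)   # VL: wait for the current prefill
assert should_dispatch(2, enable_mm=True, prefill_finished=True)        # VL: admit the next batch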
@@ -332,10 +332,8 @@ class PaddleDisWorkerProc():
             # Execute model to generate token. The generated token will be written to the buffer.
             # These generated tokens can be obtained through get_output op.
             self.worker.execute_model(req_dicts)
-            self.exist_prefill_task_signal.value[
-                0] = self.worker.prefill_finished()
-
-
+            if not self.fd_config.model_config.enable_mm:
+                self.exist_prefill_task_signal.value[0] = self.worker.prefill_finished()
 
     def determine_num_available_blocks(self) -> None:
         """Profiles the peak memory usage of the model to determine how many
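Taken together, the two worker-side changes mean that for a multimodal model the prefill status is consumed directly by the dispatch gate above instead of being published through `exist_prefill_task_signal`. A toy simulation of the resulting behaviour, with entirely hypothetical names and no FastDeploy code involved, shows that requests are still all served but never overlap in the prefill phase:

# Toy scheduling simulation of the "one VL prefill at a time" effect.
class ToyWorker:
    def __init__(self):
        self.inflight_prefill_steps = 0

    def prefill_finished(self) -> bool:
        return self.inflight_prefill_steps == 0

    def execute_model(self):
        if self.inflight_prefill_steps:
            self.inflight_prefill_steps -= 1

pending, admitted, worker = 3, 0, ToyWorker()
for _ in range(10):
    # Mirror of the gate: only admit work once the current prefill has drained.
    if pending and worker.prefill_finished():
        pending -= 1
        admitted += 1
        worker.inflight_prefill_steps = 2  # pretend a chunked prefill takes 2 steps
    worker.execute_model()

print(admitted, pending)  # 3 0 -- every request admitted, one prefill at a time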