diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py
index c639c29ef..2896c1994 100644
--- a/fastdeploy/worker/gpu_model_runner.py
+++ b/fastdeploy/worker/gpu_model_runner.py
@@ -150,10 +150,17 @@ class GPUModelRunner(ModelRunnerBase):
         """
         Check whether prefill stage finished
         """
-        if int(paddle.max(self.share_inputs["seq_lens_encoder"])) != 0:
-            return 1
+        if self.enable_mm:
+            # VL only supports prefilling one batch at a time
+            prefill_status = (self.share_inputs["seq_lens_this_time"] != 0) & (
+                self.share_inputs["seq_lens_this_time"] != 1
+            )
+            return not paddle.any(prefill_status).numpy()
         else:
-            return 0
+            if int(paddle.max(self.share_inputs["seq_lens_encoder"])) != 0:
+                return 1
+            else:
+                return 0
 
     def _init_speculative_proposer(self):
         """
diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py
index 157300cd5..9c7e74e6c 100644
--- a/fastdeploy/worker/worker_process.py
+++ b/fastdeploy/worker/worker_process.py
@@ -285,10 +285,12 @@ class PaddleDisWorkerProc:
             # The first worker detects whether there are tasks in the task queue
             if self.local_rank % mp_num_per_node == 0:
                 if self.task_queue.num_tasks() > 0:
-                    if self.nnode > 1:
-                        self.task_queue.read_finish_flag.set(1)
-                    else:
-                        self.exist_task_signal.value[self.fd_config.parallel_config.expert_parallel_rank] = 1
+                    # VL only supports prefilling one batch at a time
+                    if not self.fd_config.model_config.enable_mm or self.worker.prefill_finished():
+                        if self.nnode > 1:
+                            self.task_queue.read_finish_flag.set(1)
+                        else:
+                            self.exist_task_signal.value[self.fd_config.parallel_config.expert_parallel_rank] = 1
 
             if self.parallel_config.tensor_parallel_size > 1:
                 # Synchronize the signal for other workers
@@ -344,8 +346,8 @@ class PaddleDisWorkerProc:
             # Execute model to generate token. The generated token will be written to the buffer.
             # These generated tokens can be obtained through get_output op.
             self.worker.execute_model(req_dicts)
-
-            self.exist_prefill_task_signal.value[0] = self.worker.prefill_finished()
+            if not self.fd_config.model_config.enable_mm:
+                self.exist_prefill_task_signal.value[0] = self.worker.prefill_finished()
 
     def initialize_kv_cache(self) -> None:
         """Profiles the peak memory usage of the model to determine how many
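For reference (not part of the patch): a minimal sketch of the multimodal prefill-finished check introduced in `GPUModelRunner.prefill_finished` above. It reproduces the boolean logic with NumPy instead of Paddle tensors, purely for illustration. In the VL path, a slot with `seq_lens_this_time > 1` is still prefilling, `1` means the request is decoding, and `0` means the slot is empty, so prefill counts as finished only when no slot is strictly greater than 1.

```python
# Illustrative sketch only; the real check runs on paddle tensors in
# GPUModelRunner.prefill_finished and gates task dispatch in worker_process.py.
import numpy as np


def vl_prefill_finished(seq_lens_this_time: np.ndarray) -> bool:
    # True for slots that are neither empty (0) nor decoding (1),
    # i.e. requests that are still running their prefill step.
    in_prefill = (seq_lens_this_time != 0) & (seq_lens_this_time != 1)
    return not bool(np.any(in_prefill))


# One request mid-prefill (length 128): prefill is not finished yet,
# so the worker must not pull the next task from the queue.
assert vl_prefill_finished(np.array([128, 1, 0])) is False
# Only decoding/empty slots remain: prefill is finished and the next
# multimodal request may be scheduled.
assert vl_prefill_finished(np.array([1, 1, 0])) is True
```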