From 702c313ed1c70c8eedbd0bbfdffaf4b9e5056617 Mon Sep 17 00:00:00 2001
From: yinwei
Date: Sat, 9 Aug 2025 16:29:35 +0800
Subject: [PATCH] revert pr (#3286)

---
 fastdeploy/worker/xpu_model_runner.py | 21 +++++++--------------
 fastdeploy/worker/xpu_worker.py       | 13 ++++++-------
 2 files changed, 13 insertions(+), 21 deletions(-)

diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py
index a153e5556..a5558ac47 100644
--- a/fastdeploy/worker/xpu_model_runner.py
+++ b/fastdeploy/worker/xpu_model_runner.py
@@ -373,7 +373,7 @@ class XPUModelRunner(ModelRunnerBase):
         # Forward meta store the global meta information of the forward
         self.forward_meta: ForwardMeta = None
 
-    def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = None):
+    def insert_tasks_v1(self, req_dicts: List[Request]):
         """
         Process scheduler output tasks, used when ENABLE_V1_KVCACHE_SCHEDULER=1
         """
@@ -403,7 +403,7 @@ class XPUModelRunner(ModelRunnerBase):
                 )
                 self.share_inputs["stop_flags"][idx : idx + 1] = False
                 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = prefill_start_index
-                self.seq_lens_this_time_buffer[idx : idx + 1] = length
+                self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
                 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
                 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = 0
                 self.share_inputs["prompt_lens"][idx : idx + 1] = len(input_ids)
@@ -425,7 +425,7 @@ class XPUModelRunner(ModelRunnerBase):
                 logger.debug(f"Handle preempted request {request} at idx {idx}")
                 self.share_inputs["block_tables"][idx : idx + 1, :] = -1
                 self.share_inputs["stop_flags"][idx : idx + 1] = True
-                self.seq_lens_this_time_buffer[idx : idx + 1] = 0
+                self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 0
                 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
                 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0
                 self.share_inputs["is_block_step"][idx : idx + 1] = False
@@ -462,9 +462,8 @@ class XPUModelRunner(ModelRunnerBase):
             )
         if has_prefill_task:
             self.share_inputs["not_need_stop"][0] = True
-        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]
 
-    def process_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int = None):
+    def process_prefill_inputs(self, req_dicts: List[Request]):
         """Process inputs for prefill tasks and update share_inputs buffer"""
         req_len = len(req_dicts)
         for i in range(req_len):
@@ -483,7 +482,7 @@ class XPUModelRunner(ModelRunnerBase):
             self.share_inputs["penalty_score"][idx : idx + 1] = request.get("repetition_penalty", 1.0)
             self.share_inputs["frequency_score"][idx : idx + 1] = request.get("frequency_penalty", 0.0)
             self.share_inputs["presence_score"][idx : idx + 1] = request.get("presence_penalty", 0.0)
-            self.seq_lens_this_time_buffer[idx : idx + 1] = length
+            self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
             self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length
             self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
             self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
@@ -527,7 +526,6 @@ class XPUModelRunner(ModelRunnerBase):
             )
 
         self.share_inputs["not_need_stop"][0] = True
-        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]
 
     def _init_share_inputs(self, max_num_seqs: int):
         """Initialize all share buffers for model inputs.
@@ -573,7 +571,7 @@ class XPUModelRunner(ModelRunnerBase):
         self.share_inputs["max_length"] = paddle.full(
             [max_num_seqs, 1], self.model_config.max_model_len, dtype="int64"
         )
-        self.seq_lens_this_time_buffer = paddle.full(max_num_seqs, 0, dtype="int32")
+        self.share_inputs["seq_lens_this_time"] = paddle.full(max_num_seqs, 0, dtype="int32")
         self.share_inputs["seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
         self.share_inputs["seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
         self.share_inputs["step_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
@@ -815,7 +813,7 @@ class XPUModelRunner(ModelRunnerBase):
             idx = i
             self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
             self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1)
-            self.seq_lens_this_time_buffer[idx : idx + 1] = input_length
+            self.share_inputs["seq_lens_this_time"][idx : idx + 1] = input_length
             self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length
             self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length
             self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
@@ -831,7 +829,6 @@ class XPUModelRunner(ModelRunnerBase):
             self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(
                 idx * block_num, (idx + 1) * block_num, 1
             )
-        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer
 
     def _dummy_run(
         self,
@@ -925,10 +922,6 @@ class XPUModelRunner(ModelRunnerBase):
             self.cache_config.block_size,
             self.cache_config.enc_dec_block_num,
         )
-        if num_running_requests is not None:
-            self.seq_lens_this_time_buffer[:num_running_requests].copy_(
-                self.share_inputs["seq_lens_this_time"][:num_running_requests], False
-            )
 
         return None
 
diff --git a/fastdeploy/worker/xpu_worker.py b/fastdeploy/worker/xpu_worker.py
index a5993abb4..f3afb6f72 100644
--- a/fastdeploy/worker/xpu_worker.py
+++ b/fastdeploy/worker/xpu_worker.py
@@ -149,10 +149,9 @@ class XpuWorker(WorkerBase):
         num_running_requests: Optional[int] = None,
     ) -> Optional[ModelRunnerOutput]:
         """ """
-        if is_dummy_run:
-            output = self.model_runner.execute_model(model_forward_batch)
-        else:
-            output = self.model_runner.execute_model(model_forward_batch, num_running_requests)
+
+        output = self.model_runner.execute_model(model_forward_batch)
+
         return output
 
     def exist_prefill(self):
@@ -161,15 +160,15 @@ class XpuWorker(WorkerBase):
         """
         return self.model_runner.exist_prefill()
 
-    def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: int) -> None:
+    def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: int = -1) -> None:
         """Process new requests and then start the decode loop
         TODO(gongshaotian):The scheduler should schedule the handling of prefill,
         and workers and modelrunners should not perceive it.
         """
         if envs.ENABLE_V1_KVCACHE_SCHEDULER:
-            self.model_runner.insert_tasks_v1(req_dicts=req_dicts, num_running_requests=num_running_requests)
+            self.model_runner.insert_tasks_v1(req_dicts=req_dicts)
        else:
-            self.model_runner.process_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests)
+            self.model_runner.process_prefill_inputs(req_dicts=req_dicts)
 
     def check_health(self) -> bool:
         """ """
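
Note (trailer, not part of the patch; git am ignores text after the last hunk): a minimal
sketch of the layout this revert restores. seq_lens_this_time goes back to being an ordinary
share_inputs tensor written in place per request slot, with no separate side buffer and no
num_running_requests threaded through insert_tasks_v1 / process_prefill_inputs. MAX_NUM_SEQS
and the insert_task helper below are illustrative assumptions, not FastDeploy APIs.

import paddle

MAX_NUM_SEQS = 8  # assumed batch capacity, for illustration only

# After the revert, seq_lens_this_time is a plain share_inputs tensor of
# shape [max_num_seqs], not a standalone seq_lens_this_time_buffer.
share_inputs = {
    "seq_lens_this_time": paddle.full([MAX_NUM_SEQS], 0, dtype="int32"),
    "seq_lens_encoder": paddle.full([MAX_NUM_SEQS, 1], 0, dtype="int32"),
    "seq_lens_decoder": paddle.full([MAX_NUM_SEQS, 1], 0, dtype="int32"),
}

def insert_task(idx: int, prompt_len: int) -> None:
    # Hypothetical stand-in for insert_tasks_v1's per-slot writes: prefill
    # lengths are written straight into share_inputs, with no trailing copy
    # back into a side buffer after the forward pass.
    share_inputs["seq_lens_this_time"][idx : idx + 1] = prompt_len
    share_inputs["seq_lens_encoder"][idx : idx + 1] = prompt_len
    share_inputs["seq_lens_decoder"][idx : idx + 1] = 0

insert_task(0, prompt_len=37)
print(share_inputs["seq_lens_this_time"].numpy())  # -> [37  0  0  0  0  0  0  0]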