diff --git a/fastdeploy/spec_decode/mtp.py b/fastdeploy/spec_decode/mtp.py
index 39f0fce42..3033e4146 100644
--- a/fastdeploy/spec_decode/mtp.py
+++ b/fastdeploy/spec_decode/mtp.py
@@ -107,7 +107,7 @@ class MTPProposer(Proposer):
             idx = i
             self.model_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
             self.model_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1)
-            self.model_inputs["seq_lens_this_time"][idx : idx + 1] = input_length
+            self.seq_lens_this_time_buffer[idx : idx + 1] = input_length
             self.model_inputs["seq_lens_encoder"][idx : idx + 1] = input_length
             self.model_inputs["seq_lens_decoder"][idx : idx + 1] = 0
             self.model_inputs["step_idx"][idx : idx + 1] = 0
@@ -118,6 +118,7 @@ class MTPProposer(Proposer):
             self.model_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(
                 idx * block_num, (idx + 1) * block_num, 1
             )
+        self.model_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer
 
     def initialize_kv_cache(self):
         """
@@ -263,7 +264,8 @@ class MTPProposer(Proposer):
         # Same shape/dytpe with base model
         self.model_inputs["block_tables"] = paddle.clone(self.main_model_inputs["block_tables"])
         self.model_inputs["input_ids"] = paddle.clone(self.main_model_inputs["input_ids"])
-        self.model_inputs["seq_lens_this_time"] = paddle.clone(self.main_model_inputs["seq_lens_this_time"])
+        self.seq_lens_this_time_buffer = paddle.clone(self.main_model_inputs["seq_lens_this_time"])
+
         self.model_inputs["seq_lens_encoder"] = paddle.clone(self.main_model_inputs["seq_lens_encoder"])
         self.model_inputs["seq_lens_decoder"] = paddle.clone(self.main_model_inputs["seq_lens_decoder"])
         self.model_inputs["step_idx"] = paddle.clone(self.main_model_inputs["step_idx"])
@@ -338,7 +340,7 @@ class MTPProposer(Proposer):
             self.main_model_inputs["seq_lens_this_time"], fill_value=-1, dtype="int32"
         )
 
-    def insert_prefill_inputs(self, req_dicts: List[Request]):
+    def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int):
         """
         Process inputs for prefill tasks and insert it to model_inputs buffer
         """
@@ -372,7 +374,7 @@ class MTPProposer(Proposer):
 
                 self.model_inputs["seq_lens_encoder"][idx : idx + 1] = 0
                 self.model_inputs["seq_lens_decoder"][idx : idx + 1] = length
-                self.model_inputs["seq_lens_this_time"][idx : idx + 1] = prefill_token_num
+                self.seq_lens_this_time_buffer[idx : idx + 1] = prefill_token_num
 
                 self.model_inputs["stop_flags"][idx : idx + 1] = False
                 self.model_inputs["batch_drop"][idx : idx + 1] = False
@@ -397,10 +399,10 @@ class MTPProposer(Proposer):
                 if self.cache_config.enable_chunked_prefill:
                     token_chunk_size = request.prefill_chunk_info[0]
                     self.model_inputs["seq_lens_encoder"][idx : idx + 1] = token_chunk_size
-                    self.model_inputs["seq_lens_this_time"][idx : idx + 1] = token_chunk_size
+                    self.seq_lens_this_time_buffer[idx : idx + 1] = token_chunk_size
                 else:
                     self.model_inputs["seq_lens_encoder"][idx : idx + 1] = length
-                    self.model_inputs["seq_lens_this_time"][idx : idx + 1] = length
+                    self.seq_lens_this_time_buffer[idx : idx + 1] = length
 
                 self.model_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
                 self.model_inputs["stop_flags"][idx : idx + 1] = False
@@ -413,6 +415,7 @@ class MTPProposer(Proposer):
                     request.get("block_tables"), dtype="int32"
                 )
 
         self.model_inputs["not_need_stop"][0] = True
+        self.model_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]
 
     def _initialize_forward_meta(self):
         """
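Every hunk above, and the matching hunks in the runner files that follow, applies the same refactor: `seq_lens_this_time` is no longer stored directly in `model_inputs` / `share_inputs`; a persistent, max-batch-sized `seq_lens_this_time_buffer` holds the per-slot values, and only the first `num_running_requests` entries are published for the current batch. A minimal sketch of that pattern, restricted to operations that already appear in this patch (`paddle.full`, slice assignment, slicing); the class and method names are illustrative, not code from the repository:

```python
import paddle


class SeqLensBufferSketch:
    """Illustrative only: mirrors how this patch manages seq_lens_this_time."""

    def __init__(self, max_num_seqs: int):
        # One persistent buffer sized for the largest possible batch.
        self.seq_lens_this_time_buffer = paddle.full([max_num_seqs], 0, dtype="int32")
        self.share_inputs = {}

    def insert_prefill_inputs(self, lengths, num_running_requests: int):
        # Per-request lengths go into the persistent buffer...
        for idx, length in enumerate(lengths):
            self.seq_lens_this_time_buffer[idx : idx + 1] = length
        # ...and only the live prefix is published for this batch.
        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]
```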
diff --git a/fastdeploy/worker/gcu_model_runner.py b/fastdeploy/worker/gcu_model_runner.py
index 541aad349..e0086b503 100644
--- a/fastdeploy/worker/gcu_model_runner.py
+++ b/fastdeploy/worker/gcu_model_runner.py
@@ -152,9 +152,11 @@ class GCUModelRunner(ModelRunnerBase):
             schemata_key,
         )
 
-    def insert_prefill_inputs(self, req_dicts: List[Request]):
+    def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int = None):
         """
         Process inputs for prefill tasks and insert it to share_inputs buffer
+        req_dict: A list of Request dict
+        num_running_requests: batch_size
         """
 
         if req_dicts[-1].disaggregate_info is not None and req_dicts[-1].disaggregate_info["role"] == "prefill":
@@ -193,7 +195,7 @@ class GCUModelRunner(ModelRunnerBase):
                 self.share_inputs["prompt_ids"][idx : idx + 1, :length] = np.array(request.prompt_token_ids)
                 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0
                 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = length
-                self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 1
+                self.seq_lens_this_time_buffer[idx : idx + 1] = 1
                 self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = 0
                 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = length
                 self.share_inputs["prompt_lens"][idx : idx + 1] = length
@@ -205,7 +207,7 @@ class GCUModelRunner(ModelRunnerBase):
                         request.draft_token_ids[0:num_prefill_send_token],
                         dtype="int64",
                     )
-                    self.share_inputs["seq_lens_this_time"][idx : idx + 1] = num_prefill_send_token
+                    self.seq_lens_this_time_buffer[idx : idx + 1] = num_prefill_send_token
                 else:
                     self.share_inputs["pre_ids"][idx : idx + 1] = -1
                     self.share_inputs["step_idx"][idx : idx + 1] = 0
@@ -222,14 +224,14 @@ class GCUModelRunner(ModelRunnerBase):
                     )
                     self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
                     self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
-                    self.share_inputs["seq_lens_this_time"][idx : idx + 1] = token_chunk_size
+                    self.seq_lens_this_time_buffer[idx : idx + 1] = token_chunk_size
                     self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = token_chunk_size
                     self.share_inputs["seq_lens_encoder"][idx : idx + 1] = token_chunk_size
                     self.share_inputs["prompt_lens"][idx : idx + 1] = token_chunk_size
                 else:
                     self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
                     self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
-                    self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
+                    self.seq_lens_this_time_buffer[idx : idx + 1] = length
                     self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length
                     self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
                     self.share_inputs["prompt_lens"][idx : idx + 1] = length
@@ -295,6 +297,7 @@ class GCUModelRunner(ModelRunnerBase):
 
         if self.speculative_method in ["mtp"]:
             self.proposer.insert_prefill_inputs(req_dicts)
+        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]
 
     def _dummy_prefill_inputs(self, num_tokens: int, batch_size: int, expected_decode_len: int):
         """Set dummy prefill inputs to share_inputs"""
@@ -313,7 +316,7 @@ class GCUModelRunner(ModelRunnerBase):
             self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
             self.share_inputs["prompt_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
             self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1)
-            self.share_inputs["seq_lens_this_time"][idx : idx + 1] = input_length
+            self.seq_lens_this_time_buffer[idx : idx + 1] = input_length
             self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length
             self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length
             self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
@@ -331,6 +334,7 @@ class GCUModelRunner(ModelRunnerBase):
             self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(
                 idx * block_num, (idx + 1) * block_num, 1
             )
+        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer
 
     def _init_share_inputs(self, max_num_seqs: int):
         """
@@ -381,7 +385,7 @@ class GCUModelRunner(ModelRunnerBase):
         self.share_inputs["max_length"] = paddle.full(
             [max_num_seqs, 1], self.model_config.max_model_len, dtype="int64"
         )
-        self.share_inputs["seq_lens_this_time"] = paddle.full(max_num_seqs, 0, dtype="int32")
+        self.seq_lens_this_time_buffer = paddle.full(max_num_seqs, 0, dtype="int32")
         self.share_inputs["seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
         self.share_inputs["seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
         self.share_inputs["step_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
@@ -923,6 +927,7 @@ class GCUModelRunner(ModelRunnerBase):
     def execute_model(
         self,
         model_forward_batch: Optional[List[Request]] = None,
+        num_running_requests: int = None,
     ) -> Optional[ModelRunnerOutput]:
         """
         The Entrance of model execute.
@@ -930,6 +935,7 @@ class GCUModelRunner(ModelRunnerBase):
             model_forward_batch: 'Request' contains information related to prompt and is an abstract
                 class at the server level, which is too granular for ModelRunner.
                 We plan to replace it with 'ModelForwardBatch'.
+            num_running_requests: batch_size
             intermediate_tensors:
         """
         # If `not_need_stop`` is False, it means the current worker is in an idle state.
@@ -1055,6 +1061,9 @@ class GCUModelRunner(ModelRunnerBase):
             self._update_chunked_prefill(model_forward_batch)
             self._add_cache(model_forward_batch)
 
+        self.seq_lens_this_time_buffer[:num_running_requests].copy_(
+            self.share_inputs["seq_lens_this_time"][:num_running_requests], False
+        )
         return None
 
     def _add_cache(self, model_forward_batch) -> None:
""" - self.model_runner.insert_prefill_inputs(req_dicts=req_dicts) + self.model_runner.insert_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests) def graph_optimize_and_warm_up_model(self) -> None: """ diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 548429c6f..70c37d245 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -164,6 +164,7 @@ class GPUModelRunner(ModelRunnerBase): if self.speculative_method == "ngram": self.proposer = NgramProposer(self.fd_config) elif self.speculative_method == "mtp": + self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer self.proposer = MTPProposer( self.fd_config, self.get_model(), @@ -193,9 +194,11 @@ class GPUModelRunner(ModelRunnerBase): return self.guided_backend.get_logits_processor(schemata_key=schemata_key), schemata_key - def insert_tasks_v1(self, req_dicts: List[Request]): + def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = None): """ Process scheduler output tasks, used when ENABLE_V1_KVCACHE_SCHEDULER=1 + req_dict: A list of Request dict + num_running_requests: batch_size """ # NOTE(luotingdan): Lazy initialize kv cache if "caches" not in self.share_inputs: @@ -264,7 +267,7 @@ class GPUModelRunner(ModelRunnerBase): ) self.share_inputs["stop_flags"][idx : idx + 1] = False self.share_inputs["seq_lens_decoder"][idx : idx + 1] = prefill_start_index - self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length + self.seq_lens_this_time_buffer[idx : idx + 1] = length self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = 0 self.share_inputs["prompt_lens"][idx : idx + 1] = len(input_ids) @@ -286,7 +289,7 @@ class GPUModelRunner(ModelRunnerBase): logger.debug(f"Handle preempted request {request} at idx {idx}") self.share_inputs["block_tables"][idx : idx + 1, :] = -1 self.share_inputs["stop_flags"][idx : idx + 1] = True - self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 0 + self.seq_lens_this_time_buffer[idx : idx + 1] = 0 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0 self.share_inputs["is_block_step"][idx : idx + 1] = False @@ -328,10 +331,13 @@ class GPUModelRunner(ModelRunnerBase): if has_prefill_task: self.share_inputs["not_need_stop"][0] = True + self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests] - def insert_prefill_inputs(self, req_dicts: List[Request]): + def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int = None): """ Process inputs for prefill tasks and insert it to share_inputs buffer + req_dict: A list of Request dict + num_running_requests: batch_size TODO(gongshaotian): Refactor this func """ @@ -365,7 +371,7 @@ class GPUModelRunner(ModelRunnerBase): self.share_inputs["prompt_ids"][idx : idx + 1, :length] = np.array(request.prompt_token_ids) self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = length - self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 1 + self.seq_lens_this_time_buffer[idx : idx + 1] = 1 self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = 0 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = length self.share_inputs["prompt_lens"][idx : idx + 1] = length @@ -377,7 +383,7 @@ class GPUModelRunner(ModelRunnerBase): 
                         request.draft_token_ids[0:num_prefill_send_token],
                         dtype="int64",
                     )
-                    self.share_inputs["seq_lens_this_time"][idx : idx + 1] = num_prefill_send_token
+                    self.seq_lens_this_time_buffer[idx : idx + 1] = num_prefill_send_token
                 else:
                     self.share_inputs["pre_ids"][idx : idx + 1] = -1
                     self.share_inputs["step_idx"][idx : idx + 1] = 0
@@ -412,7 +418,7 @@ class GPUModelRunner(ModelRunnerBase):
                     )
                     self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
                     self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
-                    self.share_inputs["seq_lens_this_time"][idx : idx + 1] = token_chunk_size
+                    self.seq_lens_this_time_buffer[idx : idx + 1] = token_chunk_size
                     self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = token_chunk_size
                     self.share_inputs["seq_lens_encoder"][idx : idx + 1] = token_chunk_size
                     self.share_inputs["prompt_lens"][idx : idx + 1] = token_chunk_size
@@ -430,7 +436,7 @@ class GPUModelRunner(ModelRunnerBase):
                 else:
                     self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
                     self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
-                    self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
+                    self.seq_lens_this_time_buffer[idx : idx + 1] = length
                     self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length
                     self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
                     self.share_inputs["prompt_lens"][idx : idx + 1] = length
@@ -516,8 +522,10 @@ class GPUModelRunner(ModelRunnerBase):
 
         self.share_inputs["not_need_stop"][0] = True
 
+        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]
+
         if self.speculative_method in ["mtp"]:
-            self.proposer.insert_prefill_inputs(req_dicts)
+            self.proposer.insert_prefill_inputs(req_dicts, num_running_requests)
 
     def _dummy_prefill_inputs(self, num_tokens: int, batch_size: int, expected_decode_len: int):
         """Set dummy prefill inputs to share_inputs"""
@@ -543,7 +551,7 @@ class GPUModelRunner(ModelRunnerBase):
             self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
             self.share_inputs["prompt_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
             self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1)
-            self.share_inputs["seq_lens_this_time"][idx : idx + 1] = input_length
+            self.seq_lens_this_time_buffer[idx : idx + 1] = input_length
             self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length
             self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length
             self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
@@ -561,6 +569,7 @@ class GPUModelRunner(ModelRunnerBase):
             self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(
                 idx * block_num, (idx + 1) * block_num, 1
             )
+        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer
 
     def _init_share_inputs(self, max_num_seqs: int):
         """
@@ -611,7 +620,7 @@ class GPUModelRunner(ModelRunnerBase):
         self.share_inputs["max_length"] = paddle.full(
             [max_num_seqs, 1], self.model_config.max_model_len, dtype="int64"
         )
-        self.share_inputs["seq_lens_this_time"] = paddle.full(max_num_seqs, 0, dtype="int32")
+        self.seq_lens_this_time_buffer = paddle.full(max_num_seqs, 0, dtype="int32")
         self.share_inputs["seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
         self.share_inputs["seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
0, dtype="int32") @@ -1255,6 +1264,7 @@ class GPUModelRunner(ModelRunnerBase): def execute_model( self, model_forward_batch: Optional[List[Request]] = None, + num_running_requests: int = None, ) -> Optional[ModelRunnerOutput]: """ The Entrance of model execute. @@ -1263,6 +1273,7 @@ class GPUModelRunner(ModelRunnerBase): class at the server level, which is too granular for ModelRunner. We plan to replace it with 'ModelForwardBatch'. intermediate_tensors: + num_running_requests: batch_size """ # 1. Prepare inputs of model and sampler. skip_idx_list = self._get_skip_idx(model_forward_batch) @@ -1364,8 +1375,8 @@ class GPUModelRunner(ModelRunnerBase): accept_num=(self.share_inputs["accept_num"] if self.speculative_decoding else None), enable_thinking=(self.share_inputs["enable_thinking"] if self.enable_mm else None), think_end_id=(self.model_config.think_end_id if self.enable_mm else -1), - need_think_end=(self.share_inputs["need_think_end"] if self.enable_mm else None), - reasoning_index=(self.share_inputs["reasoning_index"] if self.enable_mm else None), + need_think_end=(self.share_inputs["need_think_end"][:num_running_requests] if self.enable_mm else None), + reasoning_index=(self.share_inputs["reasoning_index"][:num_running_requests] if self.enable_mm else None), stop_token_ids=self.share_inputs["stop_seqs"], stop_seqs_len=self.share_inputs["stop_seqs_len"], ) @@ -1405,6 +1416,10 @@ class GPUModelRunner(ModelRunnerBase): self._update_chunked_prefill(model_forward_batch) self._add_cache(model_forward_batch) + + self.seq_lens_this_time_buffer[:num_running_requests].copy_( + self.share_inputs["seq_lens_this_time"][:num_running_requests], False + ) return None def _add_cache(self, model_forward_batch) -> None: diff --git a/fastdeploy/worker/gpu_worker.py b/fastdeploy/worker/gpu_worker.py index 242559413..94dc5fc19 100644 --- a/fastdeploy/worker/gpu_worker.py +++ b/fastdeploy/worker/gpu_worker.py @@ -181,20 +181,21 @@ class GpuWorker(WorkerBase): def execute_model( self, model_forward_batch: Optional[List[Request]] = None, + num_running_request: int = None, ) -> Optional[ModelRunnerOutput]: """ """ - output = self.model_runner.execute_model(model_forward_batch) + output = self.model_runner.execute_model(model_forward_batch, num_running_request) return output - def preprocess_new_task(self, req_dicts: List[Request]) -> None: + def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: int) -> None: """Process new requests and then start the decode loop TODO(gongshaotian):The scheduler should schedule the handling of prefill, and workers and modelrunners should not perceive it. 
""" if envs.ENABLE_V1_KVCACHE_SCHEDULER: - self.model_runner.insert_tasks_v1(req_dicts=req_dicts) + self.model_runner.insert_tasks_v1(req_dicts=req_dicts, num_running_requests=num_running_requests) else: - self.model_runner.insert_prefill_inputs(req_dicts=req_dicts) + self.model_runner.insert_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests) def graph_optimize_and_warm_up_model(self) -> None: """ diff --git a/fastdeploy/worker/iluvatar_model_runner.py b/fastdeploy/worker/iluvatar_model_runner.py index a5652edda..4a7aaaf8d 100644 --- a/fastdeploy/worker/iluvatar_model_runner.py +++ b/fastdeploy/worker/iluvatar_model_runner.py @@ -142,9 +142,10 @@ class IluvatarModelRunner(ModelRunnerBase): schemata_key, ) - def insert_prefill_inputs(self, req_dicts: List[Request]): + def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int = None): """ Process inputs for prefill tasks and insert it to share_inputs buffer + num_running_requests: batch_size TODO(gongshaotian): Refactor this func """ @@ -176,7 +177,7 @@ class IluvatarModelRunner(ModelRunnerBase): self.share_inputs["input_ids"][idx : idx + 1, 0] = request.prompt_token_ids[0] self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = length - self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 1 + self.seq_lens_this_time_buffer[idx : idx + 1] = 1 self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = 0 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = length self.share_inputs["prompt_lens"][idx : idx + 1] = length @@ -188,7 +189,7 @@ class IluvatarModelRunner(ModelRunnerBase): request.draft_token_ids[0:num_prefill_send_token], dtype="int64", ) - self.share_inputs["seq_lens_this_time"][idx : idx + 1] = num_prefill_send_token + self.seq_lens_this_time_buffer[idx : idx + 1] = num_prefill_send_token else: self.share_inputs["pre_ids"][idx : idx + 1] = -1 self.share_inputs["step_idx"][idx : idx + 1] = 0 @@ -199,7 +200,7 @@ class IluvatarModelRunner(ModelRunnerBase): request.set("chunk_idx", 1) logger.info(f"prefill_chunk_info: {request.prefill_chunk_info}") token_chunk_size = request.prefill_chunk_info[0] - self.share_inputs["seq_lens_this_time"][idx : idx + 1] = token_chunk_size + self.seq_lens_this_time_buffer[idx : idx + 1] = token_chunk_size self.share_inputs["input_ids"][idx, :token_chunk_size] = np.array( request.prompt_token_ids[:token_chunk_size] ) @@ -211,7 +212,7 @@ class IluvatarModelRunner(ModelRunnerBase): else: self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0) self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0) - self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length + self.seq_lens_this_time_buffer[idx : idx + 1] = length self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length self.share_inputs["prompt_lens"][idx : idx + 1] = length @@ -264,6 +265,7 @@ class IluvatarModelRunner(ModelRunnerBase): self.sampler.apply_logits_processor(idx, request.get("logits_processor"), prefill_tokens) self.share_inputs["not_need_stop"][0] = True + self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests] def _dummy_prefill_inputs(self, num_tokens: int, batch_size: int, expected_decode_len: int): """Set dummy prefill inputs to share_inputs""" @@ -283,7 +285,7 @@ class IluvatarModelRunner(ModelRunnerBase): 
self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length) self.share_inputs["prompt_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length) self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1) - self.share_inputs["seq_lens_this_time"][idx : idx + 1] = input_length + self.seq_lens_this_time_buffer[idx : idx + 1] = input_length self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0 @@ -299,6 +301,7 @@ class IluvatarModelRunner(ModelRunnerBase): self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange( idx * block_num, (idx + 1) * block_num, 1 ) + self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer def _init_share_inputs(self, max_num_seqs: int): """Initialize all share buffers for model inputs. @@ -344,7 +347,7 @@ class IluvatarModelRunner(ModelRunnerBase): self.share_inputs["max_dec_len"] = paddle.full([max_num_seqs, 1], self.model_config.max_length, dtype="int64") self.share_inputs["min_length"] = paddle.full([max_num_seqs, 1], self.model_config.min_length, dtype="int64") self.share_inputs["max_length"] = paddle.full([max_num_seqs, 1], self.model_config.max_length, dtype="int64") - self.share_inputs["seq_lens_this_time"] = paddle.full(max_num_seqs, 0, dtype="int32") + self.seq_lens_this_time_buffer = paddle.full(max_num_seqs, 0, dtype="int32") self.share_inputs["seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32") self.share_inputs["seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32") self.share_inputs["step_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32") @@ -861,6 +864,7 @@ class IluvatarModelRunner(ModelRunnerBase): def execute_model( self, model_forward_batch: Optional[List[Request]] = None, + num_running_requests: int = None, ) -> Optional[ModelRunnerOutput]: """ The Entrance of model execute. @@ -868,6 +872,7 @@ class IluvatarModelRunner(ModelRunnerBase): model_forward_batch: 'Request' contains information related to prompt and is an abstract class at the server level, which is too granular for ModelRunner. We plan to replace it with 'ModelForwardBatch'. + num_running_requests: batch_size intermediate_tensors: """ # Note(@wufeisheng): If `not_need_stop`` is False, it means the current worker is in an idle state. 
@@ -988,6 +993,9 @@ class IluvatarModelRunner(ModelRunnerBase):
 
             self._update_chunked_prefill(model_forward_batch)
             self._add_cache(model_forward_batch)
+        self.seq_lens_this_time_buffer[:num_running_requests].copy_(
+            self.share_inputs["seq_lens_this_time"][:num_running_requests], False
+        )
         return None
 
     def _add_cache(self, model_forward_batch) -> None:
diff --git a/fastdeploy/worker/iluvatar_worker.py b/fastdeploy/worker/iluvatar_worker.py
index 6c390584f..cd899619b 100644
--- a/fastdeploy/worker/iluvatar_worker.py
+++ b/fastdeploy/worker/iluvatar_worker.py
@@ -106,17 +106,18 @@ class IluvatarWorker(WorkerBase):
     def execute_model(
         self,
         model_forward_batch: Optional[List[Request]] = None,
+        num_running_requests: int = None,
     ) -> Optional[ModelRunnerOutput]:
         """ """
-        output = self.model_runner.execute_model(model_forward_batch)
+        output = self.model_runner.execute_model(model_forward_batch, num_running_requests)
         return output
 
-    def preprocess_new_task(self, req_dicts: List[Request]) -> None:
+    def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: int) -> None:
         """Process new requests and then start the decode loop
         TODO(gongshaotian):The scheduler should schedule the handling of prefill,
         and workers and modelrunners should not perceive it.
         """
-        self.model_runner.insert_prefill_inputs(req_dicts=req_dicts)
+        self.model_runner.insert_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests)
 
     def graph_optimize_and_warm_up_model(self) -> None:
         """
diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py
index eace66487..d123e7dff 100644
--- a/fastdeploy/worker/worker_process.py
+++ b/fastdeploy/worker/worker_process.py
@@ -257,11 +257,11 @@ class PaddleDisWorkerProc:
                     f"num_insert_requests: {len(req_dicts)}"
                 )
                 # Process prefill inputs
-                self.worker.preprocess_new_task(req_dicts)
+                self.worker.preprocess_new_task(req_dicts, num_running_requests)
 
             # Execute model to generate token. The generated token will be written to the buffer.
             # These generated tokens can be obtained through get_output op.
-            self.worker.execute_model()
+            self.worker.execute_model(num_running_requests)
 
     def event_loop_normal(self) -> None:
         """Main event loop for Paddle Distrubuted Workers.
@@ -338,7 +338,7 @@ class PaddleDisWorkerProc:
                     )
 
                 # Process prefill inputs
-                self.worker.preprocess_new_task(req_dicts)
+                self.worker.preprocess_new_task(req_dicts, num_running_requests)
 
             if not self.worker.model_runner.not_need_stop():
                 if self.ranks > 1:
@@ -349,7 +349,7 @@ class PaddleDisWorkerProc:
             # Execute model to generate token. The generated token will be written to the buffer.
             # These generated tokens can be obtained through get_output op.
-            self.worker.execute_model(req_dicts)
+            self.worker.execute_model(req_dicts, num_running_requests)
             self.exist_prefill_task_signal.value[0] = self.worker.exist_prefill()
 
     def initialize_kv_cache(self) -> None:
         """
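worker_process.py is where `num_running_requests` enters this call chain: both event loops pass it to `preprocess_new_task` and then to `execute_model`, and each worker forwards it unchanged to its model runner. A skeletal sketch of that flow; the classes below are stand-ins, and how `num_running_requests` is computed is outside this patch, so it appears only as a parameter:

```python
from typing import List, Optional


class ModelRunnerSketch:
    def insert_prefill_inputs(self, req_dicts: List[dict], num_running_requests: Optional[int] = None):
        ...  # write lengths into seq_lens_this_time_buffer, publish the slice

    def execute_model(self, model_forward_batch=None, num_running_requests: Optional[int] = None):
        ...  # run the step, then copy the slice back into the buffer


class WorkerSketch:
    def __init__(self, model_runner: ModelRunnerSketch):
        self.model_runner = model_runner

    def preprocess_new_task(self, req_dicts: List[dict], num_running_requests: int) -> None:
        self.model_runner.insert_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests)

    def execute_model(self, model_forward_batch=None, num_running_requests: Optional[int] = None):
        return self.model_runner.execute_model(model_forward_batch, num_running_requests)


def step(worker: WorkerSketch, req_dicts: List[dict], num_running_requests: int):
    # The same value flows through both calls, mirroring the event loops above.
    worker.preprocess_new_task(req_dicts, num_running_requests)
    return worker.execute_model(req_dicts, num_running_requests)
```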
diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py
index 7a787115c..a153e5556 100644
--- a/fastdeploy/worker/xpu_model_runner.py
+++ b/fastdeploy/worker/xpu_model_runner.py
@@ -373,7 +373,7 @@ class XPUModelRunner(ModelRunnerBase):
         # Forward meta store the global meta information of the forward
         self.forward_meta: ForwardMeta = None
 
-    def insert_tasks_v1(self, req_dicts: List[Request]):
+    def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = None):
         """
         Process scheduler output tasks, used when ENABLE_V1_KVCACHE_SCHEDULER=1
         """
@@ -403,7 +403,7 @@ class XPUModelRunner(ModelRunnerBase):
                 )
                 self.share_inputs["stop_flags"][idx : idx + 1] = False
                 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = prefill_start_index
-                self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
+                self.seq_lens_this_time_buffer[idx : idx + 1] = length
                 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
                 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = 0
                 self.share_inputs["prompt_lens"][idx : idx + 1] = len(input_ids)
@@ -425,7 +425,7 @@ class XPUModelRunner(ModelRunnerBase):
                 logger.debug(f"Handle preempted request {request} at idx {idx}")
                 self.share_inputs["block_tables"][idx : idx + 1, :] = -1
                 self.share_inputs["stop_flags"][idx : idx + 1] = True
-                self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 0
+                self.seq_lens_this_time_buffer[idx : idx + 1] = 0
                 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
                 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0
                 self.share_inputs["is_block_step"][idx : idx + 1] = False
@@ -462,8 +462,9 @@ class XPUModelRunner(ModelRunnerBase):
             )
         if has_prefill_task:
             self.share_inputs["not_need_stop"][0] = True
+        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]
 
-    def process_prefill_inputs(self, req_dicts: List[Request]):
+    def process_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int = None):
         """Process inputs for prefill tasks and update share_inputs buffer"""
         req_len = len(req_dicts)
         for i in range(req_len):
@@ -482,7 +483,7 @@ class XPUModelRunner(ModelRunnerBase):
             self.share_inputs["penalty_score"][idx : idx + 1] = request.get("repetition_penalty", 1.0)
             self.share_inputs["frequency_score"][idx : idx + 1] = request.get("frequency_penalty", 0.0)
             self.share_inputs["presence_score"][idx : idx + 1] = request.get("presence_penalty", 0.0)
-            self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
+            self.seq_lens_this_time_buffer[idx : idx + 1] = length
             self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length
             self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
             self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
@@ -526,6 +527,7 @@ class XPUModelRunner(ModelRunnerBase):
             )
 
         self.share_inputs["not_need_stop"][0] = True
+        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]
 
     def _init_share_inputs(self, max_num_seqs: int):
         """Initialize all share buffers for model inputs.
@@ -571,7 +573,7 @@ class XPUModelRunner(ModelRunnerBase):
         self.share_inputs["max_length"] = paddle.full(
             [max_num_seqs, 1], self.model_config.max_model_len, dtype="int64"
         )
-        self.share_inputs["seq_lens_this_time"] = paddle.full(max_num_seqs, 0, dtype="int32")
+        self.seq_lens_this_time_buffer = paddle.full(max_num_seqs, 0, dtype="int32")
         self.share_inputs["seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
         self.share_inputs["seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
         self.share_inputs["step_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
@@ -813,7 +815,7 @@ class XPUModelRunner(ModelRunnerBase):
             idx = i
             self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
             self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1)
-            self.share_inputs["seq_lens_this_time"][idx : idx + 1] = input_length
+            self.seq_lens_this_time_buffer[idx : idx + 1] = input_length
             self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length
             self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length
             self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
@@ -829,6 +831,7 @@ class XPUModelRunner(ModelRunnerBase):
             self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(
                 idx * block_num, (idx + 1) * block_num, 1
             )
+        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer
 
     def _dummy_run(
         self,
@@ -853,6 +856,7 @@ class XPUModelRunner(ModelRunnerBase):
         self,
         model_forward_batch: Optional[List[Request]] = None,
         is_dummy_run: bool = False,
+        num_running_requests: int = None,
     ) -> Optional[ModelRunnerOutput]:
         """
         The Entrance of model execute.
@@ -860,6 +864,7 @@ class XPUModelRunner(ModelRunnerBase):
             model_forward_batch: 'Request' contains information related to prompt and is an abstract
                 class at the server level, which is too granular for ModelRunner.
                 We plan to replace it with 'ModelForwardBatch'.
+            num_running_requests: batch_size
             intermediate_tensors:
         """
         # 1. Prepare inputs of model and decoder.
@@ -920,6 +925,10 @@ class XPUModelRunner(ModelRunnerBase):
             self.cache_config.block_size,
             self.cache_config.enc_dec_block_num,
         )
+        if num_running_requests is not None:
+            self.seq_lens_this_time_buffer[:num_running_requests].copy_(
+                self.share_inputs["seq_lens_this_time"][:num_running_requests], False
+            )
 
         return None
""" if envs.ENABLE_V1_KVCACHE_SCHEDULER: - self.model_runner.insert_tasks_v1(req_dicts=req_dicts) + self.model_runner.insert_tasks_v1(req_dicts=req_dicts, num_running_requests=num_running_requests) else: - self.model_runner.process_prefill_inputs(req_dicts=req_dicts) + self.model_runner.process_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests) def check_health(self) -> bool: """ """