[BugFix] support real batch_size (#3109) (#3217)

* support real bsz

* fix

* fix xpu_model_runner.py, gpu_model_runner.py, gcu_model_runner.py, iluvatar_model_runner.py

* add event_loop_ep

* fix

* Add comments

* fix

* support mtp real_batch_size

* fix

* rename self.tmp_seq_lens_this_time -> self.seq_lens_this_time_buffer

* fix

* fix VL real_seq_lens_this_time

* fix

* fix mtp

* fix

* fix mtp

* fix xpu

* fix
lizexu123
2025-08-06 14:30:33 +08:00
committed by GitHub
parent 3dd8492601
commit bc0b92bba4
10 changed files with 110 additions and 58 deletions
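
The pattern repeated across the runner diffs below: each runner allocates a persistent, full-capacity seq_lens_this_time_buffer and publishes only its first num_running_requests entries as share_inputs["seq_lens_this_time"], so downstream kernels see the real batch size rather than max_num_seqs. A minimal standalone sketch (not FastDeploy code; only the buffer/slice idiom and dtype are taken from the diffs below):

import paddle

max_num_seqs = 8          # buffer capacity, fixed at init time
num_running_requests = 3  # the real batch size for this step

# Persistent full-capacity buffer (mirrors self.seq_lens_this_time_buffer).
seq_lens_this_time_buffer = paddle.full([max_num_seqs], 0, dtype="int32")

# Prefill writes per-slot lengths into the buffer, as the hunks below do.
for idx, length in enumerate([128, 64, 32]):
    seq_lens_this_time_buffer[idx : idx + 1] = length

# Only the live prefix is exposed to the model.
share_inputs = {"seq_lens_this_time": seq_lens_this_time_buffer[:num_running_requests]}
print(share_inputs["seq_lens_this_time"].shape)  # [3], not [8]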


@@ -107,7 +107,7 @@ class MTPProposer(Proposer):
             idx = i
             self.model_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
             self.model_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1)
-            self.model_inputs["seq_lens_this_time"][idx : idx + 1] = input_length
+            self.seq_lens_this_time_buffer[idx : idx + 1] = input_length
             self.model_inputs["seq_lens_encoder"][idx : idx + 1] = input_length
             self.model_inputs["seq_lens_decoder"][idx : idx + 1] = 0
             self.model_inputs["step_idx"][idx : idx + 1] = 0
@@ -118,6 +118,7 @@ class MTPProposer(Proposer):
             self.model_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(
                 idx * block_num, (idx + 1) * block_num, 1
             )
+        self.model_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer

     def initialize_kv_cache(self):
         """
@@ -263,7 +264,8 @@ class MTPProposer(Proposer):
         # Same shape/dtype with base model
         self.model_inputs["block_tables"] = paddle.clone(self.main_model_inputs["block_tables"])
         self.model_inputs["input_ids"] = paddle.clone(self.main_model_inputs["input_ids"])
-        self.model_inputs["seq_lens_this_time"] = paddle.clone(self.main_model_inputs["seq_lens_this_time"])
+        self.seq_lens_this_time_buffer = paddle.clone(self.main_model_inputs["seq_lens_this_time"])
         self.model_inputs["seq_lens_encoder"] = paddle.clone(self.main_model_inputs["seq_lens_encoder"])
         self.model_inputs["seq_lens_decoder"] = paddle.clone(self.main_model_inputs["seq_lens_decoder"])
         self.model_inputs["step_idx"] = paddle.clone(self.main_model_inputs["step_idx"])
@@ -338,7 +340,7 @@ class MTPProposer(Proposer):
             self.main_model_inputs["seq_lens_this_time"], fill_value=-1, dtype="int32"
         )

-    def insert_prefill_inputs(self, req_dicts: List[Request]):
+    def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int):
         """
         Process inputs for prefill tasks and insert it to model_inputs buffer
         """
@@ -372,7 +374,7 @@ class MTPProposer(Proposer):
                 self.model_inputs["seq_lens_encoder"][idx : idx + 1] = 0
                 self.model_inputs["seq_lens_decoder"][idx : idx + 1] = length
-                self.model_inputs["seq_lens_this_time"][idx : idx + 1] = prefill_token_num
+                self.seq_lens_this_time_buffer[idx : idx + 1] = prefill_token_num

                 self.model_inputs["stop_flags"][idx : idx + 1] = False
                 self.model_inputs["batch_drop"][idx : idx + 1] = False
@@ -397,10 +399,10 @@ class MTPProposer(Proposer):
                 if self.cache_config.enable_chunked_prefill:
                     token_chunk_size = request.prefill_chunk_info[0]
                     self.model_inputs["seq_lens_encoder"][idx : idx + 1] = token_chunk_size
-                    self.model_inputs["seq_lens_this_time"][idx : idx + 1] = token_chunk_size
+                    self.seq_lens_this_time_buffer[idx : idx + 1] = token_chunk_size
                 else:
                     self.model_inputs["seq_lens_encoder"][idx : idx + 1] = length
-                    self.model_inputs["seq_lens_this_time"][idx : idx + 1] = length
+                    self.seq_lens_this_time_buffer[idx : idx + 1] = length

                 self.model_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
                 self.model_inputs["stop_flags"][idx : idx + 1] = False
@@ -413,6 +415,7 @@ class MTPProposer(Proposer):
                     request.get("block_tables"), dtype="int32"
                 )
         self.model_inputs["not_need_stop"][0] = True
+        self.model_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]

     def _initialize_forward_meta(self):
         """


@@ -152,9 +152,11 @@ class GCUModelRunner(ModelRunnerBase):
             schemata_key,
         )

-    def insert_prefill_inputs(self, req_dicts: List[Request]):
+    def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int = None):
         """
         Process inputs for prefill tasks and insert it to share_inputs buffer
+        req_dict: A list of Request dict
+        num_running_requests: batch_size
         """

         if req_dicts[-1].disaggregate_info is not None and req_dicts[-1].disaggregate_info["role"] == "prefill":
@@ -193,7 +195,7 @@ class GCUModelRunner(ModelRunnerBase):
                 self.share_inputs["prompt_ids"][idx : idx + 1, :length] = np.array(request.prompt_token_ids)
                 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0
                 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = length
-                self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 1
+                self.seq_lens_this_time_buffer[idx : idx + 1] = 1
                 self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = 0
                 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = length
                 self.share_inputs["prompt_lens"][idx : idx + 1] = length
@@ -205,7 +207,7 @@ class GCUModelRunner(ModelRunnerBase):
                         request.draft_token_ids[0:num_prefill_send_token],
                         dtype="int64",
                     )
-                    self.share_inputs["seq_lens_this_time"][idx : idx + 1] = num_prefill_send_token
+                    self.seq_lens_this_time_buffer[idx : idx + 1] = num_prefill_send_token
             else:
                 self.share_inputs["pre_ids"][idx : idx + 1] = -1
                 self.share_inputs["step_idx"][idx : idx + 1] = 0
@@ -222,14 +224,14 @@ class GCUModelRunner(ModelRunnerBase):
                     )
                     self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
                     self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
-                    self.share_inputs["seq_lens_this_time"][idx : idx + 1] = token_chunk_size
+                    self.seq_lens_this_time_buffer[idx : idx + 1] = token_chunk_size
                     self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = token_chunk_size
                     self.share_inputs["seq_lens_encoder"][idx : idx + 1] = token_chunk_size
                     self.share_inputs["prompt_lens"][idx : idx + 1] = token_chunk_size
                 else:
                     self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
                     self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
-                    self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
+                    self.seq_lens_this_time_buffer[idx : idx + 1] = length
                     self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length
                     self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
                     self.share_inputs["prompt_lens"][idx : idx + 1] = length
@@ -293,6 +295,7 @@ class GCUModelRunner(ModelRunnerBase):
         if self.speculative_method in ["mtp"]:
             self.proposer.insert_prefill_inputs(req_dicts)
+        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]

     def _dummy_prefill_inputs(self, num_tokens: int, batch_size: int, expected_decode_len: int):
         """Set dummy prefill inputs to share_inputs"""
@@ -311,7 +314,7 @@ class GCUModelRunner(ModelRunnerBase):
             self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
             self.share_inputs["prompt_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
             self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1)
-            self.share_inputs["seq_lens_this_time"][idx : idx + 1] = input_length
+            self.seq_lens_this_time_buffer[idx : idx + 1] = input_length
             self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length
             self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length
             self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
@@ -329,6 +332,7 @@ class GCUModelRunner(ModelRunnerBase):
             self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(
                 idx * block_num, (idx + 1) * block_num, 1
             )
+        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer

     def _init_share_inputs(self, max_num_seqs: int):
         """
@@ -379,7 +383,7 @@ class GCUModelRunner(ModelRunnerBase):
         self.share_inputs["max_length"] = paddle.full(
             [max_num_seqs, 1], self.model_config.max_model_len, dtype="int64"
         )
-        self.share_inputs["seq_lens_this_time"] = paddle.full(max_num_seqs, 0, dtype="int32")
+        self.seq_lens_this_time_buffer = paddle.full(max_num_seqs, 0, dtype="int32")
         self.share_inputs["seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
         self.share_inputs["seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
         self.share_inputs["step_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
@@ -921,6 +925,7 @@ class GCUModelRunner(ModelRunnerBase):
     def execute_model(
         self,
         model_forward_batch: Optional[List[Request]] = None,
+        num_running_requests: int = None,
     ) -> Optional[ModelRunnerOutput]:
         """
         The Entrance of model execute.
@@ -928,6 +933,7 @@ class GCUModelRunner(ModelRunnerBase):
             model_forward_batch: 'Request' contains information related to prompt and is an abstract
             class at the server level, which is too granular for ModelRunner.
             We plan to replace it with 'ModelForwardBatch'.
+            num_running_requests: batch_size
             intermediate_tensors:
         """
         # If `not_need_stop` is False, it means the current worker is in an idle state.
@@ -1053,6 +1059,9 @@ class GCUModelRunner(ModelRunnerBase):
         self._update_chunked_prefill(model_forward_batch)
         self._add_cache(model_forward_batch)
+        self.seq_lens_this_time_buffer[:num_running_requests].copy_(
+            self.share_inputs["seq_lens_this_time"][:num_running_requests], False
+        )
         return None

     def _add_cache(self, model_forward_batch) -> None:
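
The copy-back at the end of execute_model is what keeps the full buffer coherent across steps: the forward pass updates the published slice, and the runner writes those updates back into the persistent buffer rather than relying on the slice aliasing the buffer's storage. A sketch reusing the diff's own non-blocking .copy_(src, False) call:

import paddle

max_num_seqs, num_running_requests = 8, 3
seq_lens_this_time_buffer = paddle.full([max_num_seqs], 0, dtype="int32")
share_inputs = {"seq_lens_this_time": seq_lens_this_time_buffer[:num_running_requests]}

# Pretend the step moved every running request into decode (length 1).
share_inputs["seq_lens_this_time"][:] = 1

# Sync the live prefix back into the buffer; the trailing False matches the
# non-blocking copy used in the diff.
seq_lens_this_time_buffer[:num_running_requests].copy_(
    share_inputs["seq_lens_this_time"][:num_running_requests], False
)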


@@ -105,17 +105,18 @@ class GcuWorker(WorkerBase):
     def execute_model(
         self,
         model_forward_batch: Optional[List[Request]] = None,
+        num_running_requests: int = None,
     ) -> Optional[ModelRunnerOutput]:
         """ """
-        output = self.model_runner.execute_model(model_forward_batch)
+        output = self.model_runner.execute_model(model_forward_batch, num_running_requests)
         return output

-    def preprocess_new_task(self, req_dicts: List[Request]) -> None:
+    def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: int) -> None:
         """Process new requests and then start the decode loop
         TODO(gongshaotian):The scheduler should schedule the handling of prefill,
         and workers and modelrunners should not perceive it.
         """
-        self.model_runner.insert_prefill_inputs(req_dicts=req_dicts)
+        self.model_runner.insert_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests)

     def graph_optimize_and_warm_up_model(self) -> None:
         """


@@ -164,6 +164,7 @@ class GPUModelRunner(ModelRunnerBase):
         if self.speculative_method == "ngram":
             self.proposer = NgramProposer(self.fd_config)
         elif self.speculative_method == "mtp":
+            self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer
             self.proposer = MTPProposer(
                 self.fd_config,
                 self.get_model(),
@@ -193,9 +194,11 @@ class GPUModelRunner(ModelRunnerBase):
         return self.guided_backend.get_logits_processor(schemata_key=schemata_key), schemata_key

-    def insert_tasks_v1(self, req_dicts: List[Request]):
+    def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = None):
         """
         Process scheduler output tasks, used when ENABLE_V1_KVCACHE_SCHEDULER=1
+        req_dict: A list of Request dict
+        num_running_requests: batch_size
         """
         # NOTE(luotingdan): Lazy initialize kv cache
         if "caches" not in self.share_inputs:
@@ -264,7 +267,7 @@ class GPUModelRunner(ModelRunnerBase):
                 )
                 self.share_inputs["stop_flags"][idx : idx + 1] = False
                 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = prefill_start_index
-                self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
+                self.seq_lens_this_time_buffer[idx : idx + 1] = length
                 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
                 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = 0
                 self.share_inputs["prompt_lens"][idx : idx + 1] = len(input_ids)
@@ -286,7 +289,7 @@ class GPUModelRunner(ModelRunnerBase):
                 logger.debug(f"Handle preempted request {request} at idx {idx}")
                 self.share_inputs["block_tables"][idx : idx + 1, :] = -1
                 self.share_inputs["stop_flags"][idx : idx + 1] = True
-                self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 0
+                self.seq_lens_this_time_buffer[idx : idx + 1] = 0
                 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
                 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0
                 self.share_inputs["is_block_step"][idx : idx + 1] = False
@@ -328,10 +331,13 @@ class GPUModelRunner(ModelRunnerBase):
         if has_prefill_task:
             self.share_inputs["not_need_stop"][0] = True
+        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]

-    def insert_prefill_inputs(self, req_dicts: List[Request]):
+    def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int = None):
         """
         Process inputs for prefill tasks and insert it to share_inputs buffer
+        req_dict: A list of Request dict
+        num_running_requests: batch_size
         TODO(gongshaotian): Refactor this func
         """
@@ -365,7 +371,7 @@ class GPUModelRunner(ModelRunnerBase):
                 self.share_inputs["prompt_ids"][idx : idx + 1, :length] = np.array(request.prompt_token_ids)
                 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0
                 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = length
-                self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 1
+                self.seq_lens_this_time_buffer[idx : idx + 1] = 1
                 self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = 0
                 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = length
                 self.share_inputs["prompt_lens"][idx : idx + 1] = length
@@ -377,7 +383,7 @@ class GPUModelRunner(ModelRunnerBase):
                         request.draft_token_ids[0:num_prefill_send_token],
                         dtype="int64",
                     )
-                    self.share_inputs["seq_lens_this_time"][idx : idx + 1] = num_prefill_send_token
+                    self.seq_lens_this_time_buffer[idx : idx + 1] = num_prefill_send_token
             else:
                 self.share_inputs["pre_ids"][idx : idx + 1] = -1
                 self.share_inputs["step_idx"][idx : idx + 1] = 0
@@ -412,7 +418,7 @@ class GPUModelRunner(ModelRunnerBase):
                     )
                     self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
                     self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
-                    self.share_inputs["seq_lens_this_time"][idx : idx + 1] = token_chunk_size
+                    self.seq_lens_this_time_buffer[idx : idx + 1] = token_chunk_size
                     self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = token_chunk_size
                     self.share_inputs["seq_lens_encoder"][idx : idx + 1] = token_chunk_size
                     self.share_inputs["prompt_lens"][idx : idx + 1] = token_chunk_size
@@ -430,7 +436,7 @@ class GPUModelRunner(ModelRunnerBase):
                 else:
                     self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
                     self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
-                    self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
+                    self.seq_lens_this_time_buffer[idx : idx + 1] = length
                     self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length
                     self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
                     self.share_inputs["prompt_lens"][idx : idx + 1] = length
@@ -514,8 +520,10 @@ class GPUModelRunner(ModelRunnerBase):
         self.share_inputs["not_need_stop"][0] = True
+        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]

         if self.speculative_method in ["mtp"]:
-            self.proposer.insert_prefill_inputs(req_dicts)
+            self.proposer.insert_prefill_inputs(req_dicts, num_running_requests)

     def _dummy_prefill_inputs(self, num_tokens: int, batch_size: int, expected_decode_len: int):
         """Set dummy prefill inputs to share_inputs"""
@@ -535,7 +543,7 @@ class GPUModelRunner(ModelRunnerBase):
             self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
             self.share_inputs["prompt_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
             self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1)
-            self.share_inputs["seq_lens_this_time"][idx : idx + 1] = input_length
+            self.seq_lens_this_time_buffer[idx : idx + 1] = input_length
             self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length
             self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length
             self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
@@ -553,6 +561,7 @@ class GPUModelRunner(ModelRunnerBase):
             self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(
                 idx * block_num, (idx + 1) * block_num, 1
             )
+        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer

     def _init_share_inputs(self, max_num_seqs: int):
         """
@@ -603,7 +612,7 @@ class GPUModelRunner(ModelRunnerBase):
         self.share_inputs["max_length"] = paddle.full(
             [max_num_seqs, 1], self.model_config.max_model_len, dtype="int64"
         )
-        self.share_inputs["seq_lens_this_time"] = paddle.full(max_num_seqs, 0, dtype="int32")
+        self.seq_lens_this_time_buffer = paddle.full(max_num_seqs, 0, dtype="int32")
         self.share_inputs["seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
         self.share_inputs["seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
         self.share_inputs["step_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
@@ -1247,6 +1256,7 @@ class GPUModelRunner(ModelRunnerBase):
     def execute_model(
         self,
         model_forward_batch: Optional[List[Request]] = None,
+        num_running_requests: int = None,
     ) -> Optional[ModelRunnerOutput]:
         """
         The Entrance of model execute.
@@ -1255,6 +1265,7 @@ class GPUModelRunner(ModelRunnerBase):
             class at the server level, which is too granular for ModelRunner.
             We plan to replace it with 'ModelForwardBatch'.
             intermediate_tensors:
+            num_running_requests: batch_size
         """
         # 1. Prepare inputs of model and sampler.
         skip_idx_list = self._get_skip_idx(model_forward_batch)
@@ -1356,8 +1367,8 @@ class GPUModelRunner(ModelRunnerBase):
             accept_num=(self.share_inputs["accept_num"] if self.speculative_decoding else None),
             enable_thinking=(self.share_inputs["enable_thinking"] if self.enable_mm else None),
             think_end_id=(self.model_config.think_end_id if self.enable_mm else -1),
-            need_think_end=(self.share_inputs["need_think_end"] if self.enable_mm else None),
-            reasoning_index=(self.share_inputs["reasoning_index"] if self.enable_mm else None),
+            need_think_end=(self.share_inputs["need_think_end"][:num_running_requests] if self.enable_mm else None),
+            reasoning_index=(self.share_inputs["reasoning_index"][:num_running_requests] if self.enable_mm else None),
             stop_token_ids=self.share_inputs["stop_seqs"],
             stop_seqs_len=self.share_inputs["stop_seqs_len"],
         )
@@ -1397,6 +1408,10 @@ class GPUModelRunner(ModelRunnerBase):
         self._update_chunked_prefill(model_forward_batch)
         self._add_cache(model_forward_batch)
+
+        self.seq_lens_this_time_buffer[:num_running_requests].copy_(
+            self.share_inputs["seq_lens_this_time"][:num_running_requests], False
+        )
         return None

     def _add_cache(self, model_forward_batch) -> None:
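
Note the two bindings in this runner: _dummy_prefill_inputs publishes the whole buffer so warm-up (and any capture at maximum batch) still sees max_num_seqs, while the real insertion paths publish only the live prefix. Condensed, with hypothetical local names:

import paddle

max_num_seqs = 8
buf = paddle.full([max_num_seqs], 0, dtype="int32")
share_inputs = {}

# Warm-up path (_dummy_prefill_inputs): bind the full buffer.
share_inputs["seq_lens_this_time"] = buf

# Real prefill path (insert_prefill_inputs / insert_tasks_v1): bind the prefix.
num_running_requests = 2
share_inputs["seq_lens_this_time"] = buf[:num_running_requests]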


@@ -175,20 +175,21 @@ class GpuWorker(WorkerBase):
     def execute_model(
         self,
         model_forward_batch: Optional[List[Request]] = None,
+        num_running_request: int = None,
     ) -> Optional[ModelRunnerOutput]:
         """ """
-        output = self.model_runner.execute_model(model_forward_batch)
+        output = self.model_runner.execute_model(model_forward_batch, num_running_request)
         return output

-    def preprocess_new_task(self, req_dicts: List[Request]) -> None:
+    def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: int) -> None:
         """Process new requests and then start the decode loop
         TODO(gongshaotian):The scheduler should schedule the handling of prefill,
         and workers and modelrunners should not perceive it.
         """
         if envs.ENABLE_V1_KVCACHE_SCHEDULER:
-            self.model_runner.insert_tasks_v1(req_dicts=req_dicts)
+            self.model_runner.insert_tasks_v1(req_dicts=req_dicts, num_running_requests=num_running_requests)
         else:
-            self.model_runner.insert_prefill_inputs(req_dicts=req_dicts)
+            self.model_runner.insert_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests)

     def graph_optimize_and_warm_up_model(self) -> None:
         """


@@ -142,9 +142,10 @@ class IluvatarModelRunner(ModelRunnerBase):
             schemata_key,
         )

-    def insert_prefill_inputs(self, req_dicts: List[Request]):
+    def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int = None):
         """
         Process inputs for prefill tasks and insert it to share_inputs buffer
+        num_running_requests: batch_size
         TODO(gongshaotian): Refactor this func
         """
@@ -176,7 +177,7 @@ class IluvatarModelRunner(ModelRunnerBase):
                 self.share_inputs["input_ids"][idx : idx + 1, 0] = request.prompt_token_ids[0]
                 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0
                 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = length
-                self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 1
+                self.seq_lens_this_time_buffer[idx : idx + 1] = 1
                 self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = 0
                 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = length
                 self.share_inputs["prompt_lens"][idx : idx + 1] = length
@@ -188,7 +189,7 @@ class IluvatarModelRunner(ModelRunnerBase):
                         request.draft_token_ids[0:num_prefill_send_token],
                         dtype="int64",
                     )
-                    self.share_inputs["seq_lens_this_time"][idx : idx + 1] = num_prefill_send_token
+                    self.seq_lens_this_time_buffer[idx : idx + 1] = num_prefill_send_token
             else:
                 self.share_inputs["pre_ids"][idx : idx + 1] = -1
                 self.share_inputs["step_idx"][idx : idx + 1] = 0
@@ -199,7 +200,7 @@ class IluvatarModelRunner(ModelRunnerBase):
                     request.set("chunk_idx", 1)
                     logger.info(f"prefill_chunk_info: {request.prefill_chunk_info}")
                     token_chunk_size = request.prefill_chunk_info[0]
-                    self.share_inputs["seq_lens_this_time"][idx : idx + 1] = token_chunk_size
+                    self.seq_lens_this_time_buffer[idx : idx + 1] = token_chunk_size
                     self.share_inputs["input_ids"][idx, :token_chunk_size] = np.array(
                         request.prompt_token_ids[:token_chunk_size]
                     )
@@ -211,7 +212,7 @@ class IluvatarModelRunner(ModelRunnerBase):
                 else:
                     self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
                     self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
-                    self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
+                    self.seq_lens_this_time_buffer[idx : idx + 1] = length
                     self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length
                     self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
                     self.share_inputs["prompt_lens"][idx : idx + 1] = length
@@ -262,6 +263,7 @@ class IluvatarModelRunner(ModelRunnerBase):
             self.sampler.apply_logits_processor(idx, request.get("logits_processor"), prefill_tokens)

         self.share_inputs["not_need_stop"][0] = True
+        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]

     def _dummy_prefill_inputs(self, num_tokens: int, batch_size: int, expected_decode_len: int):
         """Set dummy prefill inputs to share_inputs"""
@@ -281,7 +283,7 @@ class IluvatarModelRunner(ModelRunnerBase):
             self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
             self.share_inputs["prompt_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
             self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1)
-            self.share_inputs["seq_lens_this_time"][idx : idx + 1] = input_length
+            self.seq_lens_this_time_buffer[idx : idx + 1] = input_length
             self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length
             self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length
             self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
@@ -297,6 +299,7 @@ class IluvatarModelRunner(ModelRunnerBase):
             self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(
                 idx * block_num, (idx + 1) * block_num, 1
             )
+        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer

     def _init_share_inputs(self, max_num_seqs: int):
         """Initialize all share buffers for model inputs.
@@ -342,7 +345,7 @@ class IluvatarModelRunner(ModelRunnerBase):
         self.share_inputs["max_dec_len"] = paddle.full([max_num_seqs, 1], self.model_config.max_length, dtype="int64")
         self.share_inputs["min_length"] = paddle.full([max_num_seqs, 1], self.model_config.min_length, dtype="int64")
         self.share_inputs["max_length"] = paddle.full([max_num_seqs, 1], self.model_config.max_length, dtype="int64")
-        self.share_inputs["seq_lens_this_time"] = paddle.full(max_num_seqs, 0, dtype="int32")
+        self.seq_lens_this_time_buffer = paddle.full(max_num_seqs, 0, dtype="int32")
         self.share_inputs["seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
         self.share_inputs["seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
         self.share_inputs["step_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
@@ -859,6 +862,7 @@ class IluvatarModelRunner(ModelRunnerBase):
     def execute_model(
         self,
         model_forward_batch: Optional[List[Request]] = None,
+        num_running_requests: int = None,
     ) -> Optional[ModelRunnerOutput]:
         """
         The Entrance of model execute.
@@ -866,6 +870,7 @@ class IluvatarModelRunner(ModelRunnerBase):
             model_forward_batch: 'Request' contains information related to prompt and is an abstract
             class at the server level, which is too granular for ModelRunner.
             We plan to replace it with 'ModelForwardBatch'.
+            num_running_requests: batch_size
             intermediate_tensors:
         """
         # Note(@wufeisheng): If `not_need_stop` is False, it means the current worker is in an idle state.
@@ -986,6 +991,9 @@ class IluvatarModelRunner(ModelRunnerBase):
         self._update_chunked_prefill(model_forward_batch)
         self._add_cache(model_forward_batch)
+        self.seq_lens_this_time_buffer[:num_running_requests].copy_(
+            self.share_inputs["seq_lens_this_time"][:num_running_requests], False
+        )
         return None

     def _add_cache(self, model_forward_batch) -> None:


@@ -106,17 +106,18 @@ class IluvatarWorker(WorkerBase):
     def execute_model(
         self,
         model_forward_batch: Optional[List[Request]] = None,
+        num_running_requests: int = None,
     ) -> Optional[ModelRunnerOutput]:
         """ """
-        output = self.model_runner.execute_model(model_forward_batch)
+        output = self.model_runner.execute_model(model_forward_batch, num_running_requests)
         return output

-    def preprocess_new_task(self, req_dicts: List[Request]) -> None:
+    def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: int) -> None:
         """Process new requests and then start the decode loop
         TODO(gongshaotian):The scheduler should schedule the handling of prefill,
         and workers and modelrunners should not perceive it.
         """
-        self.model_runner.insert_prefill_inputs(req_dicts=req_dicts)
+        self.model_runner.insert_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests)

     def graph_optimize_and_warm_up_model(self) -> None:
         """


@@ -257,11 +257,11 @@ class PaddleDisWorkerProc:
                     f"num_insert_requests: {len(req_dicts)}"
                 )
                 # Process prefill inputs
-                self.worker.preprocess_new_task(req_dicts)
+                self.worker.preprocess_new_task(req_dicts, num_running_requests)

             # Execute model to generate token. The generated token will be written to the buffer.
             # These generated tokens can be obtained through get_output op.
-            self.worker.execute_model()
+            self.worker.execute_model(num_running_requests)

     def event_loop_normal(self) -> None:
         """Main event loop for Paddle Distributed Workers.
@@ -338,7 +338,7 @@ class PaddleDisWorkerProc:
                 )
                 # Process prefill inputs
-                self.worker.preprocess_new_task(req_dicts)
+                self.worker.preprocess_new_task(req_dicts, num_running_requests)

             if not self.worker.model_runner.not_need_stop():
                 if self.ranks > 1:
@@ -349,7 +349,7 @@ class PaddleDisWorkerProc:
             # Execute model to generate token. The generated token will be written to the buffer.
             # These generated tokens can be obtained through get_output op.
-            self.worker.execute_model(req_dicts)
+            self.worker.execute_model(req_dicts, num_running_requests)

             self.exist_prefill_task_signal.value[0] = self.worker.exist_prefill()

     def initialize_kv_cache(self) -> None:
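
At the process level the count is simply threaded through: the same num_running_requests goes to preprocess_new_task and then to execute_model. A toy loop iteration, assuming (plausibly, but not shown in this diff) that the count equals the number of scheduled requests:

from typing import List, Optional

class WorkerStub:
    """Hypothetical stand-in for GpuWorker / GcuWorker / IluvatarWorker."""

    def preprocess_new_task(self, req_dicts: List[dict], num_running_requests: int) -> None:
        print(f"insert {len(req_dicts)} prefill tasks, real batch {num_running_requests}")

    def execute_model(self, req_dicts: Optional[List[dict]] = None,
                      num_running_requests: Optional[int] = None) -> None:
        print(f"forward over real batch of {num_running_requests}")

def event_loop_step(worker: WorkerStub, req_dicts: List[dict]) -> None:
    num_running_requests = len(req_dicts)  # assumption, see note above
    worker.preprocess_new_task(req_dicts, num_running_requests)
    worker.execute_model(req_dicts, num_running_requests)

event_loop_step(WorkerStub(), [{"request_id": 0}, {"request_id": 1}])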


@@ -373,7 +373,7 @@ class XPUModelRunner(ModelRunnerBase):
         # Forward meta store the global meta information of the forward
         self.forward_meta: ForwardMeta = None

-    def insert_tasks_v1(self, req_dicts: List[Request]):
+    def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = None):
         """
         Process scheduler output tasks, used when ENABLE_V1_KVCACHE_SCHEDULER=1
         """
@@ -403,7 +403,7 @@ class XPUModelRunner(ModelRunnerBase):
                 )
                 self.share_inputs["stop_flags"][idx : idx + 1] = False
                 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = prefill_start_index
-                self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
+                self.seq_lens_this_time_buffer[idx : idx + 1] = length
                 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
                 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = 0
                 self.share_inputs["prompt_lens"][idx : idx + 1] = len(input_ids)
@@ -425,7 +425,7 @@ class XPUModelRunner(ModelRunnerBase):
                 logger.debug(f"Handle preempted request {request} at idx {idx}")
                 self.share_inputs["block_tables"][idx : idx + 1, :] = -1
                 self.share_inputs["stop_flags"][idx : idx + 1] = True
-                self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 0
+                self.seq_lens_this_time_buffer[idx : idx + 1] = 0
                 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
                 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0
                 self.share_inputs["is_block_step"][idx : idx + 1] = False
@@ -462,8 +462,9 @@ class XPUModelRunner(ModelRunnerBase):
             )
         if has_prefill_task:
             self.share_inputs["not_need_stop"][0] = True
+        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]

-    def process_prefill_inputs(self, req_dicts: List[Request]):
+    def process_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int = None):
         """Process inputs for prefill tasks and update share_inputs buffer"""
         req_len = len(req_dicts)
         for i in range(req_len):
@@ -482,7 +483,7 @@ class XPUModelRunner(ModelRunnerBase):
             self.share_inputs["penalty_score"][idx : idx + 1] = request.get("repetition_penalty", 1.0)
             self.share_inputs["frequency_score"][idx : idx + 1] = request.get("frequency_penalty", 0.0)
             self.share_inputs["presence_score"][idx : idx + 1] = request.get("presence_penalty", 0.0)
-            self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
+            self.seq_lens_this_time_buffer[idx : idx + 1] = length
             self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length
             self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
             self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
@@ -524,6 +525,7 @@ class XPUModelRunner(ModelRunnerBase):
             )

         self.share_inputs["not_need_stop"][0] = True
+        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]

     def _init_share_inputs(self, max_num_seqs: int):
         """Initialize all share buffers for model inputs.
@@ -569,7 +571,7 @@ class XPUModelRunner(ModelRunnerBase):
         self.share_inputs["max_length"] = paddle.full(
             [max_num_seqs, 1], self.model_config.max_model_len, dtype="int64"
         )
-        self.share_inputs["seq_lens_this_time"] = paddle.full(max_num_seqs, 0, dtype="int32")
+        self.seq_lens_this_time_buffer = paddle.full(max_num_seqs, 0, dtype="int32")
         self.share_inputs["seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
         self.share_inputs["seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
         self.share_inputs["step_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
@@ -811,7 +813,7 @@ class XPUModelRunner(ModelRunnerBase):
             idx = i
             self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
             self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1)
-            self.share_inputs["seq_lens_this_time"][idx : idx + 1] = input_length
+            self.seq_lens_this_time_buffer[idx : idx + 1] = input_length
             self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length
             self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length
             self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
@@ -827,6 +829,7 @@ class XPUModelRunner(ModelRunnerBase):
             self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(
                 idx * block_num, (idx + 1) * block_num, 1
             )
+        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer

     def _dummy_run(
         self,
@@ -851,6 +854,7 @@ class XPUModelRunner(ModelRunnerBase):
         self,
         model_forward_batch: Optional[List[Request]] = None,
         is_dummy_run: bool = False,
+        num_running_requests: int = None,
     ) -> Optional[ModelRunnerOutput]:
         """
         The Entrance of model execute.
@@ -858,6 +862,7 @@ class XPUModelRunner(ModelRunnerBase):
             model_forward_batch: 'Request' contains information related to prompt and is an abstract
             class at the server level, which is too granular for ModelRunner.
             We plan to replace it with 'ModelForwardBatch'.
+            num_running_requests: batch_size
             intermediate_tensors:
         """
         # 1. Prepare inputs of model and decoder.
@@ -918,6 +923,10 @@ class XPUModelRunner(ModelRunnerBase):
             self.cache_config.block_size,
             self.cache_config.enc_dec_block_num,
         )
+        if num_running_requests is not None:
+            self.seq_lens_this_time_buffer[:num_running_requests].copy_(
+                self.share_inputs["seq_lens_this_time"][:num_running_requests], False
+            )
         return None


@@ -145,9 +145,14 @@ class XpuWorker(WorkerBase):
     def execute_model(
         self,
         model_forward_batch: Optional[List[Request]] = None,
+        is_dummy_run: bool = False,
+        num_running_requests: Optional[int] = None,
     ) -> Optional[ModelRunnerOutput]:
         """ """
-        output = self.model_runner.execute_model(model_forward_batch)
+        if is_dummy_run:
+            output = self.model_runner.execute_model(model_forward_batch)
+        else:
+            output = self.model_runner.execute_model(model_forward_batch, num_running_requests)
         return output

     def exist_prefill(self):
@@ -156,15 +161,15 @@ class XpuWorker(WorkerBase):
         """
         return self.model_runner.exist_prefill()

-    def preprocess_new_task(self, req_dicts: List[Request]) -> None:
+    def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: int) -> None:
         """Process new requests and then start the decode loop
         TODO(gongshaotian):The scheduler should schedule the handling of prefill,
         and workers and modelrunners should not perceive it.
         """
         if envs.ENABLE_V1_KVCACHE_SCHEDULER:
-            self.model_runner.insert_tasks_v1(req_dicts=req_dicts)
+            self.model_runner.insert_tasks_v1(req_dicts=req_dicts, num_running_requests=num_running_requests)
         else:
-            self.model_runner.process_prefill_inputs(req_dicts=req_dicts)
+            self.model_runner.process_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests)

     def check_health(self) -> bool:
         """ """