Mirror of https://github.com/PaddlePaddle/FastDeploy.git (synced 2025-10-04 00:06:38 +08:00)
[BugFix] support real batch_size (#3109)
* support real bsz
* fix
* fix xpu_model_runner.py,gpu_model_runner.py,gcu_model_runner.py,iluvatar_model_runner.py
* add event_loop_ep
* fix
* Add comments
* fix
* support mtp real_batch_size
* fix
* self.tmp_seq_lens_this_time->self.seq_lens_this_time_buffer
* fix
* fix VL real_seq_lens_this_time
* fix
* fix mtp
* fix
* fix mtp
* fix xpu
* fix
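The same pattern repeats in every runner touched by this commit; the sketch below summarizes it with simplified, hypothetical names (the _RunnerSketch class and the dict-style request items are illustrative only, while seq_lens_this_time_buffer, share_inputs["seq_lens_this_time"] and num_running_requests are the names used in the diff): keep a persistent buffer sized to max_num_seqs, expose only the first num_running_requests entries to the model, and copy the updated lengths back after each step.

import paddle

class _RunnerSketch:
    # Minimal sketch of the real-batch-size pattern; not the actual FastDeploy classes.

    def _init_share_inputs(self, max_num_seqs: int):
        # Persistent buffer sized to the maximum batch; per-request slots are
        # written here regardless of how many requests are actually running.
        self.seq_lens_this_time_buffer = paddle.full([max_num_seqs], 0, dtype="int32")
        self.share_inputs = {}

    def insert_prefill_inputs(self, req_dicts, num_running_requests: int):
        for i, req in enumerate(req_dicts):
            self.seq_lens_this_time_buffer[i : i + 1] = len(req["prompt_token_ids"])
        # The model only ever sees the first num_running_requests entries, so
        # downstream kernels run with the real batch size instead of max_num_seqs.
        self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]

    def execute_model(self, num_running_requests: int):
        # ... forward pass updates share_inputs["seq_lens_this_time"] in place ...
        # Copy the updated lengths back into the persistent buffer so the next
        # batch, possibly of a different size, starts from a consistent state.
        self.seq_lens_this_time_buffer[:num_running_requests].copy_(
            self.share_inputs["seq_lens_this_time"][:num_running_requests], False
        )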
@@ -107,7 +107,7 @@ class MTPProposer(Proposer):
 idx = i
 self.model_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
 self.model_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1)
-self.model_inputs["seq_lens_this_time"][idx : idx + 1] = input_length
+self.seq_lens_this_time_buffer[idx : idx + 1] = input_length
 self.model_inputs["seq_lens_encoder"][idx : idx + 1] = input_length
 self.model_inputs["seq_lens_decoder"][idx : idx + 1] = 0
 self.model_inputs["step_idx"][idx : idx + 1] = 0
@@ -118,6 +118,7 @@ class MTPProposer(Proposer):
 self.model_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(
 idx * block_num, (idx + 1) * block_num, 1
 )
+self.model_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer

 def initialize_kv_cache(self):
 """
@@ -263,7 +264,8 @@ class MTPProposer(Proposer):
 # Same shape/dytpe with base model
 self.model_inputs["block_tables"] = paddle.clone(self.main_model_inputs["block_tables"])
 self.model_inputs["input_ids"] = paddle.clone(self.main_model_inputs["input_ids"])
-self.model_inputs["seq_lens_this_time"] = paddle.clone(self.main_model_inputs["seq_lens_this_time"])
+self.seq_lens_this_time_buffer = paddle.clone(self.main_model_inputs["seq_lens_this_time"])
+
 self.model_inputs["seq_lens_encoder"] = paddle.clone(self.main_model_inputs["seq_lens_encoder"])
 self.model_inputs["seq_lens_decoder"] = paddle.clone(self.main_model_inputs["seq_lens_decoder"])
 self.model_inputs["step_idx"] = paddle.clone(self.main_model_inputs["step_idx"])
@@ -338,7 +340,7 @@ class MTPProposer(Proposer):
 self.main_model_inputs["seq_lens_this_time"], fill_value=-1, dtype="int32"
 )

-def insert_prefill_inputs(self, req_dicts: List[Request]):
+def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int):
 """
 Process inputs for prefill tasks and insert it to model_inputs buffer
 """
@@ -372,7 +374,7 @@ class MTPProposer(Proposer):

 self.model_inputs["seq_lens_encoder"][idx : idx + 1] = 0
 self.model_inputs["seq_lens_decoder"][idx : idx + 1] = length
-self.model_inputs["seq_lens_this_time"][idx : idx + 1] = prefill_token_num
+self.seq_lens_this_time_buffer[idx : idx + 1] = prefill_token_num

 self.model_inputs["stop_flags"][idx : idx + 1] = False
 self.model_inputs["batch_drop"][idx : idx + 1] = False
@@ -397,10 +399,10 @@ class MTPProposer(Proposer):
 if self.cache_config.enable_chunked_prefill:
 token_chunk_size = request.prefill_chunk_info[0]
 self.model_inputs["seq_lens_encoder"][idx : idx + 1] = token_chunk_size
-self.model_inputs["seq_lens_this_time"][idx : idx + 1] = token_chunk_size
+self.seq_lens_this_time_buffer[idx : idx + 1] = token_chunk_size
 else:
 self.model_inputs["seq_lens_encoder"][idx : idx + 1] = length
-self.model_inputs["seq_lens_this_time"][idx : idx + 1] = length
+self.seq_lens_this_time_buffer[idx : idx + 1] = length

 self.model_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
 self.model_inputs["stop_flags"][idx : idx + 1] = False
@@ -413,6 +415,7 @@ class MTPProposer(Proposer):
 request.get("block_tables"), dtype="int32"
 )
 self.model_inputs["not_need_stop"][0] = True
+self.model_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]

 def _initialize_forward_meta(self):
 """
@@ -152,9 +152,11 @@ class GCUModelRunner(ModelRunnerBase):
 schemata_key,
 )

-def insert_prefill_inputs(self, req_dicts: List[Request]):
+def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int = None):
 """
 Process inputs for prefill tasks and insert it to share_inputs buffer
+req_dict: A list of Request dict
+num_running_requests: batch_size
 """

 if req_dicts[-1].disaggregate_info is not None and req_dicts[-1].disaggregate_info["role"] == "prefill":
@@ -193,7 +195,7 @@ class GCUModelRunner(ModelRunnerBase):
 self.share_inputs["prompt_ids"][idx : idx + 1, :length] = np.array(request.prompt_token_ids)
 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0
 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = length
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 1
+self.seq_lens_this_time_buffer[idx : idx + 1] = 1
 self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = 0
 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = length
 self.share_inputs["prompt_lens"][idx : idx + 1] = length
@@ -205,7 +207,7 @@ class GCUModelRunner(ModelRunnerBase):
 request.draft_token_ids[0:num_prefill_send_token],
 dtype="int64",
 )
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = num_prefill_send_token
+self.seq_lens_this_time_buffer[idx : idx + 1] = num_prefill_send_token
 else:
 self.share_inputs["pre_ids"][idx : idx + 1] = -1
 self.share_inputs["step_idx"][idx : idx + 1] = 0
@@ -222,14 +224,14 @@ class GCUModelRunner(ModelRunnerBase):
 )
 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = token_chunk_size
+self.seq_lens_this_time_buffer[idx : idx + 1] = token_chunk_size
 self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = token_chunk_size
 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = token_chunk_size
 self.share_inputs["prompt_lens"][idx : idx + 1] = token_chunk_size
 else:
 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
+self.seq_lens_this_time_buffer[idx : idx + 1] = length
 self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length
 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
 self.share_inputs["prompt_lens"][idx : idx + 1] = length
@@ -295,6 +297,7 @@ class GCUModelRunner(ModelRunnerBase):

 if self.speculative_method in ["mtp"]:
 self.proposer.insert_prefill_inputs(req_dicts)
+self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]

 def _dummy_prefill_inputs(self, num_tokens: int, batch_size: int, expected_decode_len: int):
 """Set dummy prefill inputs to share_inputs"""
@@ -313,7 +316,7 @@ class GCUModelRunner(ModelRunnerBase):
 self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
 self.share_inputs["prompt_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
 self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1)
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = input_length
+self.seq_lens_this_time_buffer[idx : idx + 1] = input_length
 self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length
 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length
 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
@@ -331,6 +334,7 @@ class GCUModelRunner(ModelRunnerBase):
 self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(
 idx * block_num, (idx + 1) * block_num, 1
 )
+self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer

 def _init_share_inputs(self, max_num_seqs: int):
 """
@@ -381,7 +385,7 @@ class GCUModelRunner(ModelRunnerBase):
 self.share_inputs["max_length"] = paddle.full(
 [max_num_seqs, 1], self.model_config.max_model_len, dtype="int64"
 )
-self.share_inputs["seq_lens_this_time"] = paddle.full(max_num_seqs, 0, dtype="int32")
+self.seq_lens_this_time_buffer = paddle.full(max_num_seqs, 0, dtype="int32")
 self.share_inputs["seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
 self.share_inputs["seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
 self.share_inputs["step_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
@@ -923,6 +927,7 @@ class GCUModelRunner(ModelRunnerBase):
 def execute_model(
 self,
 model_forward_batch: Optional[List[Request]] = None,
+num_running_requests: int = None,
 ) -> Optional[ModelRunnerOutput]:
 """
 The Entrance of model execute.
@@ -930,6 +935,7 @@ class GCUModelRunner(ModelRunnerBase):
 model_forward_batch: 'Request' contains information related to prompt and is an abstract
 class at the server level, which is too granular for ModelRunner.
 We plan to replace it with 'ModelForwardBatch'.
+num_running_requests: batch_size
 intermediate_tensors:
 """
 # If `not_need_stop`` is False, it means the current worker is in an idle state.
@@ -1055,6 +1061,9 @@ class GCUModelRunner(ModelRunnerBase):

 self._update_chunked_prefill(model_forward_batch)
 self._add_cache(model_forward_batch)
+self.seq_lens_this_time_buffer[:num_running_requests].copy_(
+self.share_inputs["seq_lens_this_time"][:num_running_requests], False
+)
 return None

 def _add_cache(self, model_forward_batch) -> None:
@@ -105,17 +105,18 @@ class GcuWorker(WorkerBase):
 def execute_model(
 self,
 model_forward_batch: Optional[List[Request]] = None,
+num_running_requests: int = None,
 ) -> Optional[ModelRunnerOutput]:
 """ """
-output = self.model_runner.execute_model(model_forward_batch)
+output = self.model_runner.execute_model(model_forward_batch, num_running_requests)
 return output

-def preprocess_new_task(self, req_dicts: List[Request]) -> None:
+def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: int) -> None:
 """Process new requests and then start the decode loop
 TODO(gongshaotian):The scheduler should schedule the handling of prefill,
 and workers and modelrunners should not perceive it.
 """
-self.model_runner.insert_prefill_inputs(req_dicts=req_dicts)
+self.model_runner.insert_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests)

 def graph_optimize_and_warm_up_model(self) -> None:
 """
@@ -164,6 +164,7 @@ class GPUModelRunner(ModelRunnerBase):
 if self.speculative_method == "ngram":
 self.proposer = NgramProposer(self.fd_config)
 elif self.speculative_method == "mtp":
+self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer
 self.proposer = MTPProposer(
 self.fd_config,
 self.get_model(),
@@ -193,9 +194,11 @@ class GPUModelRunner(ModelRunnerBase):

 return self.guided_backend.get_logits_processor(schemata_key=schemata_key), schemata_key

-def insert_tasks_v1(self, req_dicts: List[Request]):
+def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = None):
 """
 Process scheduler output tasks, used when ENABLE_V1_KVCACHE_SCHEDULER=1
+req_dict: A list of Request dict
+num_running_requests: batch_size
 """
 # NOTE(luotingdan): Lazy initialize kv cache
 if "caches" not in self.share_inputs:
@@ -264,7 +267,7 @@ class GPUModelRunner(ModelRunnerBase):
 )
 self.share_inputs["stop_flags"][idx : idx + 1] = False
 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = prefill_start_index
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
+self.seq_lens_this_time_buffer[idx : idx + 1] = length
 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = 0
 self.share_inputs["prompt_lens"][idx : idx + 1] = len(input_ids)
@@ -286,7 +289,7 @@ class GPUModelRunner(ModelRunnerBase):
 logger.debug(f"Handle preempted request {request} at idx {idx}")
 self.share_inputs["block_tables"][idx : idx + 1, :] = -1
 self.share_inputs["stop_flags"][idx : idx + 1] = True
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 0
+self.seq_lens_this_time_buffer[idx : idx + 1] = 0
 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0
 self.share_inputs["is_block_step"][idx : idx + 1] = False
@@ -328,10 +331,13 @@ class GPUModelRunner(ModelRunnerBase):

 if has_prefill_task:
 self.share_inputs["not_need_stop"][0] = True
+self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]

-def insert_prefill_inputs(self, req_dicts: List[Request]):
+def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int = None):
 """
 Process inputs for prefill tasks and insert it to share_inputs buffer
+req_dict: A list of Request dict
+num_running_requests: batch_size
 TODO(gongshaotian): Refactor this func
 """

@@ -365,7 +371,7 @@ class GPUModelRunner(ModelRunnerBase):
 self.share_inputs["prompt_ids"][idx : idx + 1, :length] = np.array(request.prompt_token_ids)
 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0
 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = length
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 1
+self.seq_lens_this_time_buffer[idx : idx + 1] = 1
 self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = 0
 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = length
 self.share_inputs["prompt_lens"][idx : idx + 1] = length
@@ -377,7 +383,7 @@ class GPUModelRunner(ModelRunnerBase):
 request.draft_token_ids[0:num_prefill_send_token],
 dtype="int64",
 )
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = num_prefill_send_token
+self.seq_lens_this_time_buffer[idx : idx + 1] = num_prefill_send_token
 else:
 self.share_inputs["pre_ids"][idx : idx + 1] = -1
 self.share_inputs["step_idx"][idx : idx + 1] = 0
@@ -412,7 +418,7 @@ class GPUModelRunner(ModelRunnerBase):
 )
 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = token_chunk_size
+self.seq_lens_this_time_buffer[idx : idx + 1] = token_chunk_size
 self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = token_chunk_size
 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = token_chunk_size
 self.share_inputs["prompt_lens"][idx : idx + 1] = token_chunk_size
@@ -430,7 +436,7 @@ class GPUModelRunner(ModelRunnerBase):
 else:
 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
+self.seq_lens_this_time_buffer[idx : idx + 1] = length
 self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length
 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
 self.share_inputs["prompt_lens"][idx : idx + 1] = length
@@ -516,8 +522,10 @@ class GPUModelRunner(ModelRunnerBase):

 self.share_inputs["not_need_stop"][0] = True

+self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]
+
 if self.speculative_method in ["mtp"]:
-self.proposer.insert_prefill_inputs(req_dicts)
+self.proposer.insert_prefill_inputs(req_dicts, num_running_requests)

 def _dummy_prefill_inputs(self, num_tokens: int, batch_size: int, expected_decode_len: int):
 """Set dummy prefill inputs to share_inputs"""
@@ -543,7 +551,7 @@ class GPUModelRunner(ModelRunnerBase):
 self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
 self.share_inputs["prompt_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
 self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1)
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = input_length
+self.seq_lens_this_time_buffer[idx : idx + 1] = input_length
 self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length
 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length
 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
@@ -561,6 +569,7 @@ class GPUModelRunner(ModelRunnerBase):
 self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(
 idx * block_num, (idx + 1) * block_num, 1
 )
+self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer

 def _init_share_inputs(self, max_num_seqs: int):
 """
@@ -611,7 +620,7 @@ class GPUModelRunner(ModelRunnerBase):
 self.share_inputs["max_length"] = paddle.full(
 [max_num_seqs, 1], self.model_config.max_model_len, dtype="int64"
 )
-self.share_inputs["seq_lens_this_time"] = paddle.full(max_num_seqs, 0, dtype="int32")
+self.seq_lens_this_time_buffer = paddle.full(max_num_seqs, 0, dtype="int32")
 self.share_inputs["seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
 self.share_inputs["seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
 self.share_inputs["step_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
@@ -1255,6 +1264,7 @@ class GPUModelRunner(ModelRunnerBase):
 def execute_model(
 self,
 model_forward_batch: Optional[List[Request]] = None,
+num_running_requests: int = None,
 ) -> Optional[ModelRunnerOutput]:
 """
 The Entrance of model execute.
@@ -1263,6 +1273,7 @@ class GPUModelRunner(ModelRunnerBase):
 class at the server level, which is too granular for ModelRunner.
 We plan to replace it with 'ModelForwardBatch'.
 intermediate_tensors:
+num_running_requests: batch_size
 """
 # 1. Prepare inputs of model and sampler.
 skip_idx_list = self._get_skip_idx(model_forward_batch)
@@ -1364,8 +1375,8 @@ class GPUModelRunner(ModelRunnerBase):
 accept_num=(self.share_inputs["accept_num"] if self.speculative_decoding else None),
 enable_thinking=(self.share_inputs["enable_thinking"] if self.enable_mm else None),
 think_end_id=(self.model_config.think_end_id if self.enable_mm else -1),
-need_think_end=(self.share_inputs["need_think_end"] if self.enable_mm else None),
-reasoning_index=(self.share_inputs["reasoning_index"] if self.enable_mm else None),
+need_think_end=(self.share_inputs["need_think_end"][:num_running_requests] if self.enable_mm else None),
+reasoning_index=(self.share_inputs["reasoning_index"][:num_running_requests] if self.enable_mm else None),
 stop_token_ids=self.share_inputs["stop_seqs"],
 stop_seqs_len=self.share_inputs["stop_seqs_len"],
 )
@@ -1405,6 +1416,10 @@ class GPUModelRunner(ModelRunnerBase):

 self._update_chunked_prefill(model_forward_batch)
 self._add_cache(model_forward_batch)
+
+self.seq_lens_this_time_buffer[:num_running_requests].copy_(
+self.share_inputs["seq_lens_this_time"][:num_running_requests], False
+)
 return None

 def _add_cache(self, model_forward_batch) -> None:
@@ -181,20 +181,21 @@ class GpuWorker(WorkerBase):
 def execute_model(
 self,
 model_forward_batch: Optional[List[Request]] = None,
+num_running_request: int = None,
 ) -> Optional[ModelRunnerOutput]:
 """ """
-output = self.model_runner.execute_model(model_forward_batch)
+output = self.model_runner.execute_model(model_forward_batch, num_running_request)
 return output

-def preprocess_new_task(self, req_dicts: List[Request]) -> None:
+def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: int) -> None:
 """Process new requests and then start the decode loop
 TODO(gongshaotian):The scheduler should schedule the handling of prefill,
 and workers and modelrunners should not perceive it.
 """
 if envs.ENABLE_V1_KVCACHE_SCHEDULER:
-self.model_runner.insert_tasks_v1(req_dicts=req_dicts)
+self.model_runner.insert_tasks_v1(req_dicts=req_dicts, num_running_requests=num_running_requests)
 else:
-self.model_runner.insert_prefill_inputs(req_dicts=req_dicts)
+self.model_runner.insert_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests)

 def graph_optimize_and_warm_up_model(self) -> None:
 """
@@ -142,9 +142,10 @@ class IluvatarModelRunner(ModelRunnerBase):
 schemata_key,
 )

-def insert_prefill_inputs(self, req_dicts: List[Request]):
+def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int = None):
 """
 Process inputs for prefill tasks and insert it to share_inputs buffer
+num_running_requests: batch_size
 TODO(gongshaotian): Refactor this func
 """

@@ -176,7 +177,7 @@ class IluvatarModelRunner(ModelRunnerBase):
 self.share_inputs["input_ids"][idx : idx + 1, 0] = request.prompt_token_ids[0]
 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0
 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = length
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 1
+self.seq_lens_this_time_buffer[idx : idx + 1] = 1
 self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = 0
 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = length
 self.share_inputs["prompt_lens"][idx : idx + 1] = length
@@ -188,7 +189,7 @@ class IluvatarModelRunner(ModelRunnerBase):
 request.draft_token_ids[0:num_prefill_send_token],
 dtype="int64",
 )
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = num_prefill_send_token
+self.seq_lens_this_time_buffer[idx : idx + 1] = num_prefill_send_token
 else:
 self.share_inputs["pre_ids"][idx : idx + 1] = -1
 self.share_inputs["step_idx"][idx : idx + 1] = 0
@@ -199,7 +200,7 @@ class IluvatarModelRunner(ModelRunnerBase):
 request.set("chunk_idx", 1)
 logger.info(f"prefill_chunk_info: {request.prefill_chunk_info}")
 token_chunk_size = request.prefill_chunk_info[0]
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = token_chunk_size
+self.seq_lens_this_time_buffer[idx : idx + 1] = token_chunk_size
 self.share_inputs["input_ids"][idx, :token_chunk_size] = np.array(
 request.prompt_token_ids[:token_chunk_size]
 )
@@ -211,7 +212,7 @@ class IluvatarModelRunner(ModelRunnerBase):
 else:
 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0)
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
+self.seq_lens_this_time_buffer[idx : idx + 1] = length
 self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length
 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
 self.share_inputs["prompt_lens"][idx : idx + 1] = length
@@ -264,6 +265,7 @@ class IluvatarModelRunner(ModelRunnerBase):
 self.sampler.apply_logits_processor(idx, request.get("logits_processor"), prefill_tokens)

 self.share_inputs["not_need_stop"][0] = True
+self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]

 def _dummy_prefill_inputs(self, num_tokens: int, batch_size: int, expected_decode_len: int):
 """Set dummy prefill inputs to share_inputs"""
@@ -283,7 +285,7 @@ class IluvatarModelRunner(ModelRunnerBase):
 self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
 self.share_inputs["prompt_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
 self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1)
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = input_length
+self.seq_lens_this_time_buffer[idx : idx + 1] = input_length
 self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length
 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length
 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
@@ -299,6 +301,7 @@ class IluvatarModelRunner(ModelRunnerBase):
 self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(
 idx * block_num, (idx + 1) * block_num, 1
 )
+self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer

 def _init_share_inputs(self, max_num_seqs: int):
 """Initialize all share buffers for model inputs.
@@ -344,7 +347,7 @@ class IluvatarModelRunner(ModelRunnerBase):
 self.share_inputs["max_dec_len"] = paddle.full([max_num_seqs, 1], self.model_config.max_length, dtype="int64")
 self.share_inputs["min_length"] = paddle.full([max_num_seqs, 1], self.model_config.min_length, dtype="int64")
 self.share_inputs["max_length"] = paddle.full([max_num_seqs, 1], self.model_config.max_length, dtype="int64")
-self.share_inputs["seq_lens_this_time"] = paddle.full(max_num_seqs, 0, dtype="int32")
+self.seq_lens_this_time_buffer = paddle.full(max_num_seqs, 0, dtype="int32")
 self.share_inputs["seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
 self.share_inputs["seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
 self.share_inputs["step_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
@@ -861,6 +864,7 @@ class IluvatarModelRunner(ModelRunnerBase):
 def execute_model(
 self,
 model_forward_batch: Optional[List[Request]] = None,
+num_running_requests: int = None,
 ) -> Optional[ModelRunnerOutput]:
 """
 The Entrance of model execute.
@@ -868,6 +872,7 @@ class IluvatarModelRunner(ModelRunnerBase):
 model_forward_batch: 'Request' contains information related to prompt and is an abstract
 class at the server level, which is too granular for ModelRunner.
 We plan to replace it with 'ModelForwardBatch'.
+num_running_requests: batch_size
 intermediate_tensors:
 """
 # Note(@wufeisheng): If `not_need_stop`` is False, it means the current worker is in an idle state.
@@ -988,6 +993,9 @@ class IluvatarModelRunner(ModelRunnerBase):

 self._update_chunked_prefill(model_forward_batch)
 self._add_cache(model_forward_batch)
+self.seq_lens_this_time_buffer[:num_running_requests].copy_(
+self.share_inputs["seq_lens_this_time"][:num_running_requests], False
+)
 return None

 def _add_cache(self, model_forward_batch) -> None:
@@ -106,17 +106,18 @@ class IluvatarWorker(WorkerBase):
 def execute_model(
 self,
 model_forward_batch: Optional[List[Request]] = None,
+num_running_requests: int = None,
 ) -> Optional[ModelRunnerOutput]:
 """ """
-output = self.model_runner.execute_model(model_forward_batch)
+output = self.model_runner.execute_model(model_forward_batch, num_running_requests)
 return output

-def preprocess_new_task(self, req_dicts: List[Request]) -> None:
+def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: int) -> None:
 """Process new requests and then start the decode loop
 TODO(gongshaotian):The scheduler should schedule the handling of prefill,
 and workers and modelrunners should not perceive it.
 """
-self.model_runner.insert_prefill_inputs(req_dicts=req_dicts)
+self.model_runner.insert_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests)

 def graph_optimize_and_warm_up_model(self) -> None:
 """
@@ -257,11 +257,11 @@ class PaddleDisWorkerProc:
 f"num_insert_requests: {len(req_dicts)}"
 )
 # Process prefill inputs
-self.worker.preprocess_new_task(req_dicts)
+self.worker.preprocess_new_task(req_dicts, num_running_requests)

 # Execute model to generate token. The generated token will be written to the buffer.
 # These generated tokens can be obtained through get_output op.
-self.worker.execute_model()
+self.worker.execute_model(num_running_requests)

 def event_loop_normal(self) -> None:
 """Main event loop for Paddle Distrubuted Workers.
@@ -338,7 +338,7 @@ class PaddleDisWorkerProc:
 )

 # Process prefill inputs
-self.worker.preprocess_new_task(req_dicts)
+self.worker.preprocess_new_task(req_dicts, num_running_requests)

 if not self.worker.model_runner.not_need_stop():
 if self.ranks > 1:
@@ -349,7 +349,7 @@ class PaddleDisWorkerProc:

 # Execute model to generate token. The generated token will be written to the buffer.
 # These generated tokens can be obtained through get_output op.
-self.worker.execute_model(req_dicts)
+self.worker.execute_model(req_dicts, num_running_requests)
 self.exist_prefill_task_signal.value[0] = self.worker.exist_prefill()

 def initialize_kv_cache(self) -> None:
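For context, a hedged sketch of how the worker event loop above threads the real batch size through the stack; only the two worker calls and the num_running_requests argument come from the diff, and deriving the value from len(req_dicts) is an assumption made here for illustration, not taken from this commit:

# Assumed event-loop excerpt (simplified, not the actual worker_process.py body):
num_running_requests = len(req_dicts)  # real batch size for this step (assumption)
self.worker.preprocess_new_task(req_dicts, num_running_requests)  # fill inputs for the scheduled requests only
self.worker.execute_model(req_dicts, num_running_requests)        # forward pass over the real batch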
@@ -373,7 +373,7 @@ class XPUModelRunner(ModelRunnerBase):
 # Forward meta store the global meta information of the forward
 self.forward_meta: ForwardMeta = None

-def insert_tasks_v1(self, req_dicts: List[Request]):
+def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = None):
 """
 Process scheduler output tasks, used when ENABLE_V1_KVCACHE_SCHEDULER=1
 """
@@ -403,7 +403,7 @@ class XPUModelRunner(ModelRunnerBase):
 )
 self.share_inputs["stop_flags"][idx : idx + 1] = False
 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = prefill_start_index
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
+self.seq_lens_this_time_buffer[idx : idx + 1] = length
 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = 0
 self.share_inputs["prompt_lens"][idx : idx + 1] = len(input_ids)
@@ -425,7 +425,7 @@ class XPUModelRunner(ModelRunnerBase):
 logger.debug(f"Handle preempted request {request} at idx {idx}")
 self.share_inputs["block_tables"][idx : idx + 1, :] = -1
 self.share_inputs["stop_flags"][idx : idx + 1] = True
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 0
+self.seq_lens_this_time_buffer[idx : idx + 1] = 0
 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0
 self.share_inputs["is_block_step"][idx : idx + 1] = False
@@ -462,8 +462,9 @@ class XPUModelRunner(ModelRunnerBase):
 )
 if has_prefill_task:
 self.share_inputs["not_need_stop"][0] = True
+self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]

-def process_prefill_inputs(self, req_dicts: List[Request]):
+def process_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int = None):
 """Process inputs for prefill tasks and update share_inputs buffer"""
 req_len = len(req_dicts)
 for i in range(req_len):
@@ -482,7 +483,7 @@ class XPUModelRunner(ModelRunnerBase):
 self.share_inputs["penalty_score"][idx : idx + 1] = request.get("repetition_penalty", 1.0)
 self.share_inputs["frequency_score"][idx : idx + 1] = request.get("frequency_penalty", 0.0)
 self.share_inputs["presence_score"][idx : idx + 1] = request.get("presence_penalty", 0.0)
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
+self.seq_lens_this_time_buffer[idx : idx + 1] = length
 self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length
 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
@@ -526,6 +527,7 @@ class XPUModelRunner(ModelRunnerBase):
 )

 self.share_inputs["not_need_stop"][0] = True
+self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]

 def _init_share_inputs(self, max_num_seqs: int):
 """Initialize all share buffers for model inputs.
@@ -571,7 +573,7 @@ class XPUModelRunner(ModelRunnerBase):
 self.share_inputs["max_length"] = paddle.full(
 [max_num_seqs, 1], self.model_config.max_model_len, dtype="int64"
 )
-self.share_inputs["seq_lens_this_time"] = paddle.full(max_num_seqs, 0, dtype="int32")
+self.seq_lens_this_time_buffer = paddle.full(max_num_seqs, 0, dtype="int32")
 self.share_inputs["seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
 self.share_inputs["seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
 self.share_inputs["step_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
@@ -813,7 +815,7 @@ class XPUModelRunner(ModelRunnerBase):
 idx = i
 self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
 self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1)
-self.share_inputs["seq_lens_this_time"][idx : idx + 1] = input_length
+self.seq_lens_this_time_buffer[idx : idx + 1] = input_length
 self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length
 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length
 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
@@ -829,6 +831,7 @@ class XPUModelRunner(ModelRunnerBase):
 self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(
 idx * block_num, (idx + 1) * block_num, 1
 )
+self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer

 def _dummy_run(
 self,
@@ -853,6 +856,7 @@ class XPUModelRunner(ModelRunnerBase):
 self,
 model_forward_batch: Optional[List[Request]] = None,
 is_dummy_run: bool = False,
+num_running_requests: int = None,
 ) -> Optional[ModelRunnerOutput]:
 """
 The Entrance of model execute.
@@ -860,6 +864,7 @@ class XPUModelRunner(ModelRunnerBase):
 model_forward_batch: 'Request' contains information related to prompt and is an abstract
 class at the server level, which is too granular for ModelRunner.
 We plan to replace it with 'ModelForwardBatch'.
+num_running_requests: batch_size
 intermediate_tensors:
 """
 # 1. Prepare inputs of model and decoder.
@@ -920,6 +925,10 @@ class XPUModelRunner(ModelRunnerBase):
 self.cache_config.block_size,
 self.cache_config.enc_dec_block_num,
 )
+if num_running_requests is not None:
+self.seq_lens_this_time_buffer[:num_running_requests].copy_(
+self.share_inputs["seq_lens_this_time"][:num_running_requests], False
+)

 return None
@@ -145,9 +145,14 @@ class XpuWorker(WorkerBase):
 def execute_model(
 self,
 model_forward_batch: Optional[List[Request]] = None,
+is_dummy_run: bool = False,
+num_running_requests: Optional[int] = None,
 ) -> Optional[ModelRunnerOutput]:
 """ """
+if is_dummy_run:
 output = self.model_runner.execute_model(model_forward_batch)
+else:
+output = self.model_runner.execute_model(model_forward_batch, num_running_requests)
 return output

 def exist_prefill(self):
@@ -156,15 +161,15 @@ class XpuWorker(WorkerBase):
 """
 return self.model_runner.exist_prefill()

-def preprocess_new_task(self, req_dicts: List[Request]) -> None:
+def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: int) -> None:
 """Process new requests and then start the decode loop
 TODO(gongshaotian):The scheduler should schedule the handling of prefill,
 and workers and modelrunners should not perceive it.
 """
 if envs.ENABLE_V1_KVCACHE_SCHEDULER:
-self.model_runner.insert_tasks_v1(req_dicts=req_dicts)
+self.model_runner.insert_tasks_v1(req_dicts=req_dicts, num_running_requests=num_running_requests)
 else:
-self.model_runner.process_prefill_inputs(req_dicts=req_dicts)
+self.model_runner.process_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests)

 def check_health(self) -> bool:
 """ """