This commit is contained in:
yinwei
2025-08-09 16:29:35 +08:00
committed by GitHub
parent 6706ccb37e
commit 702c313ed1
2 changed files with 13 additions and 21 deletions

View File

@@ -373,7 +373,7 @@ class XPUModelRunner(ModelRunnerBase):
# Forward meta store the global meta information of the forward # Forward meta store the global meta information of the forward
self.forward_meta: ForwardMeta = None self.forward_meta: ForwardMeta = None
def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = None): def insert_tasks_v1(self, req_dicts: List[Request]):
""" """
Process scheduler output tasks, used when ENABLE_V1_KVCACHE_SCHEDULER=1 Process scheduler output tasks, used when ENABLE_V1_KVCACHE_SCHEDULER=1
""" """
@@ -403,7 +403,7 @@ class XPUModelRunner(ModelRunnerBase):
) )
self.share_inputs["stop_flags"][idx : idx + 1] = False self.share_inputs["stop_flags"][idx : idx + 1] = False
self.share_inputs["seq_lens_decoder"][idx : idx + 1] = prefill_start_index self.share_inputs["seq_lens_decoder"][idx : idx + 1] = prefill_start_index
self.seq_lens_this_time_buffer[idx : idx + 1] = length self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = 0 self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = 0
self.share_inputs["prompt_lens"][idx : idx + 1] = len(input_ids) self.share_inputs["prompt_lens"][idx : idx + 1] = len(input_ids)
@@ -425,7 +425,7 @@ class XPUModelRunner(ModelRunnerBase):
logger.debug(f"Handle preempted request {request} at idx {idx}") logger.debug(f"Handle preempted request {request} at idx {idx}")
self.share_inputs["block_tables"][idx : idx + 1, :] = -1 self.share_inputs["block_tables"][idx : idx + 1, :] = -1
self.share_inputs["stop_flags"][idx : idx + 1] = True self.share_inputs["stop_flags"][idx : idx + 1] = True
self.seq_lens_this_time_buffer[idx : idx + 1] = 0 self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 0
self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0 self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0
self.share_inputs["is_block_step"][idx : idx + 1] = False self.share_inputs["is_block_step"][idx : idx + 1] = False
@@ -462,9 +462,8 @@ class XPUModelRunner(ModelRunnerBase):
) )
if has_prefill_task: if has_prefill_task:
self.share_inputs["not_need_stop"][0] = True self.share_inputs["not_need_stop"][0] = True
self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]
def process_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int = None): def process_prefill_inputs(self, req_dicts: List[Request]):
"""Process inputs for prefill tasks and update share_inputs buffer""" """Process inputs for prefill tasks and update share_inputs buffer"""
req_len = len(req_dicts) req_len = len(req_dicts)
for i in range(req_len): for i in range(req_len):
@@ -483,7 +482,7 @@ class XPUModelRunner(ModelRunnerBase):
self.share_inputs["penalty_score"][idx : idx + 1] = request.get("repetition_penalty", 1.0) self.share_inputs["penalty_score"][idx : idx + 1] = request.get("repetition_penalty", 1.0)
self.share_inputs["frequency_score"][idx : idx + 1] = request.get("frequency_penalty", 0.0) self.share_inputs["frequency_score"][idx : idx + 1] = request.get("frequency_penalty", 0.0)
self.share_inputs["presence_score"][idx : idx + 1] = request.get("presence_penalty", 0.0) self.share_inputs["presence_score"][idx : idx + 1] = request.get("presence_penalty", 0.0)
self.seq_lens_this_time_buffer[idx : idx + 1] = length self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length
self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length
self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length
self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
@@ -527,7 +526,6 @@ class XPUModelRunner(ModelRunnerBase):
) )
self.share_inputs["not_need_stop"][0] = True self.share_inputs["not_need_stop"][0] = True
self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests]
def _init_share_inputs(self, max_num_seqs: int): def _init_share_inputs(self, max_num_seqs: int):
"""Initialize all share buffers for model inputs. """Initialize all share buffers for model inputs.
@@ -573,7 +571,7 @@ class XPUModelRunner(ModelRunnerBase):
self.share_inputs["max_length"] = paddle.full( self.share_inputs["max_length"] = paddle.full(
[max_num_seqs, 1], self.model_config.max_model_len, dtype="int64" [max_num_seqs, 1], self.model_config.max_model_len, dtype="int64"
) )
self.seq_lens_this_time_buffer = paddle.full(max_num_seqs, 0, dtype="int32") self.share_inputs["seq_lens_this_time"] = paddle.full(max_num_seqs, 0, dtype="int32")
self.share_inputs["seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32") self.share_inputs["seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
self.share_inputs["seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32") self.share_inputs["seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
self.share_inputs["step_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32") self.share_inputs["step_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32")
@@ -815,7 +813,7 @@ class XPUModelRunner(ModelRunnerBase):
idx = i idx = i
self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length) self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length)
self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1) self.share_inputs["eos_token_id"][:] = np.array([2], dtype="int64").reshape(-1, 1)
self.seq_lens_this_time_buffer[idx : idx + 1] = input_length self.share_inputs["seq_lens_this_time"][idx : idx + 1] = input_length
self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length
self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length
self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0
@@ -831,7 +829,6 @@ class XPUModelRunner(ModelRunnerBase):
self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange( self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange(
idx * block_num, (idx + 1) * block_num, 1 idx * block_num, (idx + 1) * block_num, 1
) )
self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer
def _dummy_run( def _dummy_run(
self, self,
@@ -925,10 +922,6 @@ class XPUModelRunner(ModelRunnerBase):
self.cache_config.block_size, self.cache_config.block_size,
self.cache_config.enc_dec_block_num, self.cache_config.enc_dec_block_num,
) )
if num_running_requests is not None:
self.seq_lens_this_time_buffer[:num_running_requests].copy_(
self.share_inputs["seq_lens_this_time"][:num_running_requests], False
)
return None return None

View File

@@ -149,10 +149,9 @@ class XpuWorker(WorkerBase):
num_running_requests: Optional[int] = None, num_running_requests: Optional[int] = None,
) -> Optional[ModelRunnerOutput]: ) -> Optional[ModelRunnerOutput]:
""" """ """ """
if is_dummy_run:
output = self.model_runner.execute_model(model_forward_batch) output = self.model_runner.execute_model(model_forward_batch)
else:
output = self.model_runner.execute_model(model_forward_batch, num_running_requests)
return output return output
def exist_prefill(self): def exist_prefill(self):
@@ -161,15 +160,15 @@ class XpuWorker(WorkerBase):
""" """
return self.model_runner.exist_prefill() return self.model_runner.exist_prefill()
def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: int) -> None: def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: int = -1) -> None:
"""Process new requests and then start the decode loop """Process new requests and then start the decode loop
TODO(gongshaotian):The scheduler should schedule the handling of prefill, TODO(gongshaotian):The scheduler should schedule the handling of prefill,
and workers and modelrunners should not perceive it. and workers and modelrunners should not perceive it.
""" """
if envs.ENABLE_V1_KVCACHE_SCHEDULER: if envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.model_runner.insert_tasks_v1(req_dicts=req_dicts, num_running_requests=num_running_requests) self.model_runner.insert_tasks_v1(req_dicts=req_dicts)
else: else:
self.model_runner.process_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests) self.model_runner.process_prefill_inputs(req_dicts=req_dicts)
def check_health(self) -> bool: def check_health(self) -> bool:
""" """ """ """