[Optimization] compulte real max_logprobs in batch (#5430)

This commit is contained in:
chen
2025-12-09 14:15:05 +08:00
committed by GitHub
parent f7e832efaf
commit 76649b45c1
4 changed files with 48 additions and 6 deletions

View File

@@ -179,6 +179,12 @@ class Request:
pooling_params = PoolingParams.from_dict(d["pooling_params"])
else:
sampling_params = SamplingParams.from_dict(d)
logprobs = d.get("logprobs", None)
if logprobs is not None:
if logprobs is True:
sampling_params.logprobs = d.get("top_logprobs", None)
elif logprobs is False:
sampling_params.logprobs = None
if "metrics" in d and d["metrics"] is not None:
metrics = RequestMetrics.from_dict(d["metrics"])
else:

View File

@@ -53,6 +53,8 @@ class SamplingMetadata:
stop_flags: Optional[paddle.Tensor] = None
prompt_ids: Optional[paddle.Tensor] = None
prompt_lens: Optional[paddle.Tensor] = None
temp_scaled_logprobs_flag: Optional[bool] = None
top_p_normalized_logprobs_flag: Optional[bool] = None
temp_scaled_logprobs: Optional[paddle.Tensor] = None
top_p_normalized_logprobs: Optional[paddle.Tensor] = None
share_inputs: Optional[Dict[str, paddle.Tensor]] = None

View File

@@ -375,7 +375,7 @@ class Sampler(nn.Layer):
temp_scaled_logprobs = sampling_metadata.temp_scaled_logprobs
top_p_normalized_logprobs = sampling_metadata.top_p_normalized_logprobs
share_inputs = sampling_metadata.share_inputs
if temp_scaled_logprobs is not None:
if temp_scaled_logprobs is not None and sampling_metadata.temp_scaled_logprobs_flag:
real_bsz_temp_scaled = temp_scaled_logprobs[:real_bsz]
temperature = sampling_metadata.temperature[:real_bsz]
temp_temperature = paddle.where(real_bsz_temp_scaled, temperature, paddle.ones_like(temperature))
@@ -385,7 +385,11 @@ class Sampler(nn.Layer):
top_p_logprob = None
top_p_req_mask = None
if top_p_normalized_logprobs is not None and share_inputs is not None:
if (
top_p_normalized_logprobs is not None
and share_inputs is not None
and sampling_metadata.top_p_normalized_logprobs_flag
):
seq_lens_this_time = share_inputs["seq_lens_this_time"].reshape([-1, 1])[:real_bsz]
seq_lens_encoder = share_inputs["seq_lens_encoder"].reshape([-1, 1])[:real_bsz]
seq_lens_decoder = share_inputs["seq_lens_decoder"].reshape([-1, 1])[:real_bsz]

View File

@@ -126,11 +126,18 @@ class GPUModelRunner(ModelRunnerBase):
self.enable_early_stop = self.fd_config.early_stop_config.enable_early_stop
self.is_pooling_model = self.fd_config.model_config.runner_type == "pooling"
self.ori_vocab_size = self.fd_config.model_config.ori_vocab_size
self.max_logprobs = (
self.ori_vocab_size if fd_config.model_config.max_logprobs == -1 else fd_config.model_config.max_logprobs
)
self.max_logprobs = None
if self.enable_logprob:
self.max_logprobs = (
self.ori_vocab_size
if fd_config.model_config.max_logprobs == -1
else fd_config.model_config.max_logprobs
)
self.temp_scaled_logprobs = True
self.top_p_normalized_logprobs = True
self.prompt_logprobs_reqs: dict[str, Request] = {}
self.in_progress_prompt_logprobs: dict[str, LogprobsTensors] = {}
self.forward_batch_reqs_list: list[Request] = [None for _ in range(self.scheduler_config.max_num_seqs)]
# VL model config:
if self.enable_mm:
@@ -664,6 +671,7 @@ class GPUModelRunner(ModelRunnerBase):
# pooling model request.sampling_params is None
if request.sampling_params is not None and request.sampling_params.prompt_logprobs is not None:
self.prompt_logprobs_reqs[request.request_id] = request
self.forward_batch_reqs_list[idx] = request
has_prefill_task = True
# Routing Replay
@@ -696,6 +704,7 @@ class GPUModelRunner(ModelRunnerBase):
self.share_inputs["is_block_step"][idx : idx + 1] = False
self.prompt_logprobs_reqs.pop(request.request_id, None)
self.in_progress_prompt_logprobs.pop(request.request_id, None)
self.forward_batch_reqs_list[idx] = None
continue
assert len(request.eos_token_ids) == self.model_config.eos_tokens_lens
@@ -1355,6 +1364,24 @@ class GPUModelRunner(ModelRunnerBase):
self.cache_config.block_size,
self.speculative_config.num_speculative_tokens if self.speculative_decoding else 0,
)
logprobs_reqs = [
req
for req in self.forward_batch_reqs_list
if req is not None and req.sampling_params is not None and req.sampling_params.logprobs is not None
]
if len(logprobs_reqs):
self.max_logprobs = max(
[
self.ori_vocab_size if req.sampling_params.logprobs < 0 else req.sampling_params.logprobs
for req in logprobs_reqs
]
)
self.temp_scaled_logprobs = any(req.sampling_params.temp_scaled_logprobs for req in logprobs_reqs)
self.top_p_normalized_logprobs = any(
req.sampling_params.top_p_normalized_logprobs for req in logprobs_reqs
)
else:
self.max_logprobs = None
# Remove padding
(
@@ -1410,9 +1437,11 @@ class GPUModelRunner(ModelRunnerBase):
min_dec_lens=self.share_inputs["min_dec_len"],
bad_words_token_ids=self.share_inputs["bad_tokens"][:, :max_bad_tokens_len],
eos_token_ids=self.share_inputs["eos_token_id"],
max_num_logprobs=self.max_logprobs if self.enable_logprob else None,
max_num_logprobs=self.max_logprobs,
enable_early_stop=self.enable_early_stop,
stop_flags=self.share_inputs["stop_flags"],
temp_scaled_logprobs_flag=self.temp_scaled_logprobs,
top_p_normalized_logprobs_flag=self.top_p_normalized_logprobs,
temp_scaled_logprobs=self.share_inputs["temp_scaled_logprobs"],
top_p_normalized_logprobs=self.share_inputs["top_p_normalized_logprobs"],
logits_processors=self.share_inputs["logits_processors"],
@@ -2666,6 +2695,7 @@ class GPUModelRunner(ModelRunnerBase):
# prompt_logprobs
self.prompt_logprobs_reqs.clear()
self.in_progress_prompt_logprobs.clear()
self.forward_batch_reqs_list = [None for _ in range(self.scheduler_config.max_num_seqs)]
def update_parameters(self, pid):
"""Dynamic model loader use to update parameters use for RL"""