[FIX 2.0.2] Topk topp sampling fix (#2805)

* fix topk-topp

* fix
Author: Sunny-bot1
Date: 2025-07-10 21:15:03 +08:00
Committed by: GitHub
Parent: e681e1e719
Commit: 4025ea7e5b
6 changed files with 27 additions and 70 deletions


@@ -52,7 +52,7 @@ environment_variables: dict[str, Callable[[], Any]] = {
     "FD_ATTENTION_BACKEND":
     lambda: os.getenv("FD_ATTENTION_BACKEND", "APPEND_ATTN"),
-    # Sampling class ("base", "air", or "rejection")
+    # Sampling class ("base", "base_non_truncated", "air", or "rejection")
     "FD_SAMPLING_CLASS":
     lambda: os.getenv("FD_SAMPLING_CLASS", "base"),
@@ -67,6 +67,6 @@ environment_variables: dict[str, Callable[[], Any]] = {
     # Switch from standalone PD to centralized inference (0 or 1)
     "FD_PD_CHANGEABLE":
     lambda: os.getenv("FD_PD_CHANGEABLE", "1"),
 }
 ```


@@ -1,5 +1,6 @@
 # FastDeploy 环境变量说明
 FastDeploy 的环境变量保存在了代码库根目录下 fastdeploy/envs.py 文件中,以下是其对应的中文版说明:
 ```python
 environment_variables: dict[str, Callable[[], Any]] = {
     # 构建 FastDeploy 时使用的 CUDA 架构版本,这是一个字符串列表,例如[80,90]
@@ -50,7 +51,7 @@ environment_variables: dict[str, Callable[[], Any]] = {
     "FD_ATTENTION_BACKEND":
     lambda: os.getenv("FD_ATTENTION_BACKEND", "APPEND_ATTN"),
-    # 设置采样类别,当前可设置为 "base"、"air" 或 "rejection"
+    # 设置采样类别,当前可设置为 "base"、"base_non_truncated"、"air" 或 "rejection"
     "FD_SAMPLING_CLASS":
     lambda: os.getenv("FD_SAMPLING_CLASS", "base"),
@@ -65,6 +66,6 @@ environment_variables: dict[str, Callable[[], Any]] = {
     # 是否从单机 PD 分离转换为集中式推理
     "FD_PD_CHANGEABLE":
     lambda: os.getenv("FD_PD_CHANGEABLE", "1"),
 }
 ```


@@ -74,7 +74,7 @@ environment_variables: dict[str, Callable[[], Any]] = {
     "FD_ATTENTION_BACKEND":
     lambda: os.getenv("FD_ATTENTION_BACKEND", "APPEND_ATTN"),
-    # Set sampling class. "base", "air" and "rejection" can be set currently.
+    # Set sampling class. "base", "base_non_truncated", "air" and "rejection" can be set currently.
     "FD_SAMPLING_CLASS":
     lambda: os.getenv("FD_SAMPLING_CLASS", "base"),


@@ -64,9 +64,20 @@ def top_k_top_p_sampling(
                                       seed=seed,
                                       k=k,
                                       mode=mode)
+    # rejection
     elif top_p_class == "rejection":
         ids = rejection_top_p_sampling(x, top_p, top_k, seed, order)
         _ = None
+    # base non-truncated
+    elif top_p_class == "base_non_truncated":
+        _, ids = paddle.tensor.top_p_sampling(x,
+                                              top_p,
+                                              threshold=threshold,
+                                              topp_seed=topp_seed,
+                                              seed=seed,
+                                              k=k,
+                                              mode="non-truncated")
+    # base truncated
     else:
         _, ids = paddle.tensor.top_p_sampling(x,
                                               top_p,
@@ -74,7 +85,7 @@ def top_k_top_p_sampling(
                                               topp_seed=topp_seed,
                                               seed=seed,
                                               k=k,
-                                              mode=mode)
+                                              mode="truncated")
     return _, ids
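
Both base branches above call the same `paddle.tensor.top_p_sampling` operator and now differ only in the hard-coded `mode` argument ("truncated" for "base", "non-truncated" for the new "base_non_truncated" class) instead of forwarding the caller-supplied `mode`. For readers unfamiliar with the truncated path, it is ordinary nucleus (top-p) sampling; the following is a CPU-only conceptual sketch, not the Paddle kernel itself (`nucleus_sample` is a hypothetical helper):

```python
import paddle

def nucleus_sample(probs: paddle.Tensor, top_p: float, seed: int = 0) -> int:
    """Reference top-p ("truncated") sampling for one [vocab] distribution."""
    paddle.seed(seed)
    order = paddle.argsort(probs, descending=True)
    sorted_probs = paddle.sort(probs, descending=True)
    cum_before = paddle.cumsum(sorted_probs) - sorted_probs
    keep = cum_before < top_p                      # smallest prefix reaching top_p
    nucleus = sorted_probs * keep.astype(probs.dtype)
    nucleus = nucleus / nucleus.sum()              # renormalize the kept mass
    choice = paddle.multinomial(nucleus, num_samples=1)
    return int(order[choice].item())

probs = paddle.to_tensor([0.5, 0.3, 0.15, 0.05])
print(nucleus_sample(probs, top_p=0.8))  # only indices 0 or 1 can be drawn
```
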
@@ -102,26 +113,25 @@ def air_top_p_sampling(
 def rejection_top_p_sampling(
     x: paddle.Tensor,
     top_p: paddle.Tensor,
-    top_k: Optional[paddle.Tensor] = None,
+    top_k: paddle.Tensor,
     seed: int = -1,
     order: Literal['top_k_first', 'joint'] = "top_k_first",
 ) -> paddle.Tensor:
     """
     rejection_top_p_sampling
     """
-    assert top_p is not None, "Top_p should not be none when FD_SAMPLING_CLASS is rejection"
     try:
         from fastdeploy.model_executor.ops.gpu import (
             rejection_top_p_sampling, top_k_renorm_probs)
-        if top_k is None:
+        if paddle.count_nonzero(top_k) == 0:
             ids = rejection_top_p_sampling(
                 x,
                 top_p,
                 None,
                 seed,
             )
-        elif top_k is not None and top_p is not None:
+        else:
             if order == "top_k_first":
                 renorm_probs = top_k_renorm_probs(x, top_k)
                 ids = rejection_top_p_sampling(
@@ -137,10 +147,6 @@ def rejection_top_p_sampling(
                     top_k,
                     seed,
                 )
-        else:
-            raise ValueError(
-                "Top_p cannot be none."
-            )
     except ImportError:
         raise RuntimeError("Cannot import rejection_top_p_sampling op.")
     return ids
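
The behavioural change in `rejection_top_p_sampling` is that `top_k` is no longer `Optional`: the model runner now always passes a dense int64 buffer in which 0 means "top-k disabled", so "no request uses top-k" is detected with `paddle.count_nonzero` instead of a `None` check, and the `None`-handling "Top_p cannot be none." error path is removed. A small CPU-only sketch of that convention (buffer shape and values are illustrative):

```python
import paddle

# One slot per sequence; 0 marks requests that did not ask for top-k.
top_k = paddle.zeros([4, 1], dtype="int64")
top_k[2, 0] = 50  # a single request with top_k=50

if paddle.count_nonzero(top_k) == 0:
    path = "top-p rejection sampling only"
else:
    path = "top-k renormalisation first, then top-p rejection sampling"

print(path)  # top-k renormalisation first, then top-p rejection sampling
```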


@@ -164,28 +164,12 @@ class GPUModelRunner(ModelRunnerBase):
                 -1].disaggregate_info["role"] == "prefill":
             os.environ['PREFILL_NODE_ONE_STEP_STOP'] = "1"
-        top_k_reqs = []
-        top_p_reqs = []
-        max_num_seqs = self.parallel_config.max_num_seqs
-        top_p_buffer = paddle.full([max_num_seqs, 1],
-                                   self.model_config.top_p,
-                                   dtype='float32')
-        top_k_buffer = paddle.full([max_num_seqs, 1],
-                                   0,
-                                   dtype='int64')
         req_len = len(req_dicts)
         for i in range(req_len):
             request = req_dicts[i]
             idx = request.idx
             length = len(request.prompt_token_ids)
-            if sampling_params := request.sampling_params:
-                if sampling_params.top_p < 1:
-                    top_p_reqs.append(idx)
-                top_k = sampling_params.top_k
-                if top_k > 0:
-                    top_k_reqs.append(idx)
             prefill_tokens = []
             if (request.guided_json is not None
                     or request.guided_regex is not None
@@ -260,8 +244,8 @@ class GPUModelRunner(ModelRunnerBase):
                 request.eos_token_ids.append(request.eos_token_ids[0])
             self.share_inputs["eos_token_id"][:] = np.array(
                 request.eos_token_ids, dtype="int64").reshape(-1, 1)
-            top_p_buffer[idx:idx + 1] = request.get("top_p", 1.0)
-            top_k_buffer[idx:idx + 1] = request.get("top_k", 0)
+            self.share_inputs["top_p"][idx:idx + 1] = request.get("top_p", 1.0)
+            self.share_inputs["top_k"][idx:idx + 1] = request.get("top_k", 0)
             self.share_inputs["temperature"][idx:idx + 1] = request.get(
                 "temperature", 0.95)
             self.share_inputs["penalty_score"][idx:idx + 1] = request.get(
@@ -312,16 +296,6 @@ class GPUModelRunner(ModelRunnerBase):
         if self.speculative_method in ["mtp"]:
             self.proposer.insert_prefill_inputs(req_dicts)
-        if len(top_k_reqs) == 0:
-            self.share_inputs["top_k"] = None
-        else:
-            self.share_inputs["top_k"] = top_k_buffer
-        if len(top_p_reqs) == 0:
-            self.share_inputs["top_p"] = None
-        else:
-            self.share_inputs["top_p"] = top_p_buffer
     def _dummy_prefill_inputs(self, num_tokens: int, batch_size: int,
                               expected_decode_len: int):
         """ Set dummy prefill inputs to share_inputs """


@@ -266,26 +266,11 @@ class XPUModelRunner(ModelRunnerBase):
     def process_prefill_inputs(self, req_dicts: List[Request]):
         """ Process inputs for prefill tasks and update share_inputs buffer """
-        top_k_reqs = []
-        top_p_reqs = []
-        max_num_seqs = self.parallel_config.max_num_seqs
-        top_p_buffer = paddle.full([max_num_seqs, 1],
-                                   self.model_config.top_p,
-                                   dtype='float32')
-        top_k_buffer = paddle.full([max_num_seqs, 1],
-                                   0,
-                                   dtype='int64')
         req_len = len(req_dicts)
         for i in range(req_len):
             request = req_dicts[i]
             idx = request.idx
             length = request.prompt_token_ids_len
-            if sampling_params := request.sampling_params:
-                if sampling_params.top_p < 1:
-                    top_p_reqs.append(idx)
-                top_k = sampling_params.top_k
-                if top_k > 0:
-                    top_k_reqs.append(idx)
             self.share_inputs["input_ids"][idx:idx + 1, :length] = np.array(
                 request.prompt_token_ids)
             if len(request.eos_token_ids
@@ -294,8 +279,8 @@ class XPUModelRunner(ModelRunnerBase):
             self.share_inputs["eos_token_id"][:] = np.array(
                 request.eos_token_ids, dtype="int64").reshape(-1, 1)
             self.share_inputs["pre_ids"][idx:idx + 1] = -1
-            top_p_buffer[idx:idx + 1] = request.get("top_p", 1.0)
-            top_k_buffer[idx:idx + 1] = request.get("top_k", 0)
+            self.share_inputs["top_p"][idx:idx + 1] = request.get("top_p", 1.0)
+            self.share_inputs["top_k"][idx:idx + 1] = request.get("top_k", 0)
             self.share_inputs["temperature"][idx:idx + 1] = request.get(
                 "temperature", 0.95)
             self.share_inputs["penalty_score"][idx:idx + 1] = request.get(
@@ -344,15 +329,6 @@ class XPUModelRunner(ModelRunnerBase):
                 request.get("stop_token_ids"), dtype="int64")
         self.share_inputs["not_need_stop"][0] = True
-        if len(top_k_reqs) == 0:
-            self.share_inputs["top_k"] = None
-        else:
-            self.share_inputs["top_k"] = top_k_buffer
-        if len(top_p_reqs) == 0:
-            self.share_inputs["top_p"] = None
-        else:
-            self.share_inputs["top_p"] = top_p_buffer
     def _init_share_inputs(self, max_num_seqs: int):
         """Initialize all share buffers for model inputs.