[Feature] Support returning logprobs of generated tokens (#2784)

* Online chat supports logprobs

* Check XPU

* Check vl_gpu_model_runner

* Only CUDA supports logprob

* get_worker() checks platform

---------

Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>
Author: chen
Date: 2025-07-10 15:47:42 +08:00
Committed by: GitHub
Parent: 39d2a1de46
Commit: 823a47e64a
21 changed files with 592 additions and 105 deletions

View File

@@ -24,16 +24,18 @@
#endif
#define MAX_BSZ 512
#define K 10
#define K 20
struct msgdata {
long mtype;
int mtext[MAX_BSZ * (K + 1) + 2]; // stop_flag, bsz, tokens
float mtext_f[MAX_BSZ * (K + 1)]; // score
int mtext_ranks[MAX_BSZ]; // ranks
};
void GetOutputTopK(const paddle::Tensor& x,
const paddle::Tensor& scores,
const paddle::Tensor& ranks,
int k,
int64_t rank_id,
bool wait_flag) {
@@ -66,17 +68,18 @@ void GetOutputTopK(const paddle::Tensor& x,
int64_t* out_data = const_cast<int64_t*>(x.data<int64_t>());
float* scores_data = const_cast<float*>(scores.data<float>());
int64_t* ranks_data = const_cast<int64_t*>(ranks.data<int64_t>());
int ret = -1;
if (!wait_flag) {
ret = msgrcv(msgid,
&msg_rcv,
(MAX_BSZ * (K + 1) + 2) * 4 + MAX_BSZ * (K + 1) * 4,
(MAX_BSZ * (K + 1) + 2) * 4 + MAX_BSZ * (K + 1) * 4 + MAX_BSZ * 4,
0,
IPC_NOWAIT);
} else {
ret = msgrcv(msgid,
&msg_rcv,
(MAX_BSZ * (K + 1) + 2) * 4 + MAX_BSZ * (K + 1) * 4,
(MAX_BSZ * (K + 1) + 2) * 4 + MAX_BSZ * (K + 1) * 4 + MAX_BSZ * 4,
0,
0);
}
@@ -97,13 +100,14 @@ void GetOutputTopK(const paddle::Tensor& x,
out_data[offset + 2] = (int64_t)msg_rcv.mtext[offset + 2];
scores_data[offset] = msg_rcv.mtext_f[offset];
}
ranks_data[i] = (int64_t)msg_rcv.mtext_ranks[i];
}
return;
}
PD_BUILD_STATIC_OP(get_output_topk)
.Inputs({"x", "scores"})
.Inputs({"x", "scores", "ranks"})
.Attrs({"k: int", "rank_id: int64_t", "wait_flag: bool"})
.Outputs({"x_out", "scores_out"})
.SetInplaceMap({{"x", "x_out"}, {"scores", "scores_out"}})
.Outputs({"x_out", "scores_out", "ranks_out"})
.SetInplaceMap({{"x", "x_out"}, {"scores", "scores_out"}, {"ranks", "ranks_out"}})
.SetKernelFn(PD_KERNEL(GetOutputTopK));
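Not part of the diff, but for orientation: a minimal reader-side sketch of how the updated get_output_topk op is invoked once a third, pre-allocated ranks buffer is added. The import path, buffer shapes, and call signature mirror the TokenProcessor changes later in this commit.

import paddle
from fastdeploy.model_executor.ops.gpu import get_output_topk

MAX_BSZ, K = 512, 20
output_tokens = paddle.full([MAX_BSZ * (K + 1) + 2, 1], 2, dtype="int64")  # stop_flag, bsz, then bsz*(K+1) token ids
output_scores = paddle.full([MAX_BSZ * (K + 1), 1], 0.0, dtype="float32")  # per-token logprob scores
output_ranks = paddle.full([MAX_BSZ], 0, dtype="int64")                    # sampled-token ranks (new output)

rank_id = 0
get_output_topk(output_tokens, output_scores, output_ranks, K, rank_id, True)  # blocking read from the message queue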

View File

@@ -23,34 +23,34 @@
#define PD_BUILD_STATIC_OP(name) PD_BUILD_OP(static_op_##name)
#endif
#define MAX_BSZ 128
#define K 10
#define MAX_BSZ 512
#define K 20
// #define SAVE_WITH_OUTPUT_DEBUG
struct msgdata {
long mtype;
int mtext[MAX_BSZ * (K + 1) + 2]; // stop_flag, bsz, tokens
float mtext_f[MAX_BSZ * (K + 1)]; // score
int mtext_ranks[MAX_BSZ]; // ranks
};
void SaveOutMmsgTopK(const paddle::Tensor& x,
const paddle::Tensor& scores,
const paddle::Tensor& topk_ids,
const paddle::Tensor& topk_scores, // [bsz, k]
const paddle::Tensor& logprob_token_ids, // [bsz, k+1]
const paddle::Tensor& logprob_scores, // [bsz, k+1]
const paddle::Tensor& ranks,
const paddle::Tensor& not_need_stop,
int k,
int64_t rank_id) {
if (rank_id > 0) {
return;
}
auto x_cpu = x.copy_to(paddle::CPUPlace(), false);
auto scores_cpu = scores.copy_to(paddle::CPUPlace(), false);
auto topk_ids_cpu = topk_ids.copy_to(paddle::CPUPlace(), false);
auto topk_scores_cpu = topk_scores.copy_to(paddle::CPUPlace(), false);
auto logprob_token_ids_cpu = logprob_token_ids.copy_to(paddle::CPUPlace(), false);
auto logprob_scores_cpu = logprob_scores.copy_to(paddle::CPUPlace(), false);
auto ranks_cpu = ranks.copy_to(paddle::CPUPlace(), false);
int64_t* x_data = x_cpu.data<int64_t>();
float* scores_data = scores_cpu.data<float>();
int64_t* topk_ids_data = topk_ids_cpu.data<int64_t>();
float* topk_scores_data = topk_scores_cpu.data<float>();
int64_t* logprob_token_ids_data = logprob_token_ids_cpu.data<int64_t>();
float* logprob_scores_data = logprob_scores_cpu.data<float>();
int64_t* ranks_data = ranks_cpu.data<int64_t>();
static struct msgdata msg_sed;
int msg_queue_id = 1;
if (const char* inference_msg_queue_id_env_p =
@@ -106,21 +106,23 @@ void SaveOutMmsgTopK(const paddle::Tensor& x,
msg_sed.mtext[0] = not_need_stop_data ? inference_msg_id_from_env
: -inference_msg_id_from_env;
int bsz = x.shape()[0];
int max_num_logprobs = logprob_token_ids.shape()[1];
msg_sed.mtext[1] = bsz;
for (int i = 0; i < bsz; i++) {
for (int j = 0; j < k + 1; j++) {
for (int j = 0; j < K + 1; j++) {
const int64_t offset = i * (K + 1) + j;
if (j == 0) {
msg_sed.mtext[offset + 2] = (int)x_data[i];
msg_sed.mtext_f[offset] = scores_data[i];
} else if (j <= k + 1) {
msg_sed.mtext[offset + 2] = (int)topk_ids_data[i * k + j - 1];
msg_sed.mtext_f[offset] = topk_scores_data[i * k + j - 1];
msg_sed.mtext_f[offset] = logprob_scores_data[i * max_num_logprobs + j];
} else if (j < max_num_logprobs) {
msg_sed.mtext[offset + 2] = (int)logprob_token_ids_data[i * max_num_logprobs + j];
msg_sed.mtext_f[offset] = logprob_scores_data[i * max_num_logprobs + j];
} else {
msg_sed.mtext[offset + 2] = -1;
msg_sed.mtext_f[offset] = 0.0;
}
}
msg_sed.mtext_ranks[i] = (int)ranks_data[i];
}
#ifdef SAVE_WITH_OUTPUT_DEBUG
std::cout << "msg data: ";
@@ -131,7 +133,7 @@ void SaveOutMmsgTopK(const paddle::Tensor& x,
#endif
if ((msgsnd(msgid,
&msg_sed,
(MAX_BSZ * (K + 1) + 2) * 4 + (MAX_BSZ * (K + 1)) * 4,
(MAX_BSZ * (K + 1) + 2) * 4 + (MAX_BSZ * (K + 1)) * 4 + MAX_BSZ * 4,
0)) == -1) {
printf("full msg buffer\n");
}
@@ -139,8 +141,8 @@ void SaveOutMmsgTopK(const paddle::Tensor& x,
}
PD_BUILD_STATIC_OP(save_output_topk)
.Inputs({"x", "scores", "topk_ids", "topk_scores", "not_need_stop"})
.Attrs({"k: int", "rank_id: int64_t"})
.Inputs({"x", "topk_ids", "logprob_scores", "ranks", "not_need_stop"})
.Attrs({"rank_id: int64_t"})
.Outputs({"x_out"})
.SetInplaceMap({{"x", "x_out"}})
.SetKernelFn(PD_KERNEL(SaveOutMmsgTopK));
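A quick sanity check (editor's sketch, not in the diff) of the System V message payload size used in msgsnd/msgrcv above; the extra MAX_BSZ * 4 bytes account for the new mtext_ranks array, with ints and floats both 4 bytes wide.

MAX_BSZ, K = 512, 20
tokens_bytes = (MAX_BSZ * (K + 1) + 2) * 4  # mtext: stop_flag, bsz, then bsz*(K+1) token ids
scores_bytes = MAX_BSZ * (K + 1) * 4        # mtext_f: logprob scores
ranks_bytes = MAX_BSZ * 4                   # mtext_ranks: sampled-token ranks, added by this PR
payload_size = tokens_bytes + scores_bytes + ranks_bytes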

View File

@@ -22,6 +22,7 @@ setup(
"gpu_ops/save_with_output_msg.cc",
"gpu_ops/get_output.cc",
"gpu_ops/get_output_msg_with_topk.cc",
"gpu_ops/save_output_msg_with_topk.cc",
"gpu_ops/transfer_output.cc",
"cpu_ops/rebuild_padding.cc",
],

View File

@@ -296,6 +296,12 @@ class EngineArgs:
max_capture_batch_size=64, FastDeploy will capture graphs for batches [1,64].
"""
enable_logprob: bool = False
"""
Flag to enable logprob output. Default is False (disabled).
Must be explicitly enabled via the `--enable-logprob` startup parameter to output logprob values.
"""
def __post_init__(self):
"""
Post-initialization processing to set default tokenizer if not provided.
@@ -416,6 +422,11 @@ class EngineArgs:
help=
"Disabled any whitespaces when using guided decoding backend XGrammar."
)
model_group.add_argument("--enable-logprob",
action="store_true",
default=EngineArgs.enable_logprob,
help="Enable output of token-level log probabilities."
)
# Parallel processing parameters group
parallel_group = parser.add_argument_group("Parallel Configuration")
@@ -791,4 +802,5 @@ class EngineArgs:
max_capture_batch_size=self.max_capture_batch_size,
guided_decoding_backend=self.guided_decoding_backend,
disable_any_whitespace=self.guided_decoding_disable_any_whitespace,
enable_logprob = self.enable_logprob,
)

View File

@@ -585,6 +585,7 @@ class Config:
max_capture_batch_size: int = 64,
guided_decoding_backend: Optional[str] = None,
disable_any_whitespace: bool = False,
enable_logprob: bool = False,
):
"""
Initialize the Config class.
@@ -678,6 +679,8 @@ class Config:
self.parallel_config.expert_parallel_size), 8))])
self.device_ids = os.getenv("CUDA_VISIBLE_DEVICES", self.device_ids)
self.enable_logprob = enable_logprob
self.read_from_config()
self.postprocess()
self.check()

View File

@@ -1068,6 +1068,7 @@ class LLMEngine(object):
self.cfg.enable_static_graph_inference,
"use_cudagraph": self.cfg.use_cudagraph,
"disable_any_whitespace": self.cfg.disable_any_whitespace,
"enable_logprob": self.cfg.enable_logprob,
}
for worker_flag, value in worker_append_flag.items():
if value:

View File

@@ -24,6 +24,7 @@ import numpy
from fastdeploy.engine.sampling_params import SamplingParams
from fastdeploy.utils import data_processor_logger
from fastdeploy.worker.output import LogprobsLists
@dataclass
@@ -189,6 +190,8 @@ class CompletionOutput:
index: int
send_idx: int
token_ids: list[int]
logprob: Optional[float] = None
top_logprobs: Optional[LogprobsLists] = None
draft_token_ids: list[int] = None
text: Optional[str] = None
reasoning_content: Optional[str] = None
@@ -201,6 +204,8 @@ class CompletionOutput:
"index": self.index,
"send_idx": self.send_idx,
"token_ids": self.token_ids,
"logprob": self.logprob,
"top_logprobs": self.top_logprobs,
"draft_token_ids": self.draft_token_ids,
"text": self.text,
"reasoning_content": self.reasoning_content

View File

@@ -173,6 +173,13 @@ class SamplingParams:
f"temperature must be non-negative, got {self.temperature}.")
if self.top_p is not None and not 0.0 <= self.top_p <= 1.0:
raise ValueError(f"top_p must be in [0, 1], got {self.top_p}.")
# quietly accept -1 as disabled, but prefer 0
if self.top_k < -1:
raise ValueError(f"top_k must be 0 (disable), or at least 1, "
f"got {self.top_k}.")
if not isinstance(self.top_k, int):
raise TypeError(
f"top_k must be an integer, got {type(self.top_k).__name__}")
if self.max_tokens is not None and self.max_tokens < 1:
raise ValueError(
@@ -192,6 +199,9 @@ class SamplingParams:
if self.logprobs is not None and self.logprobs < 0:
raise ValueError(
f"logprobs must be non-negative, got {self.logprobs}.")
if self.logprobs is not None and self.logprobs > 20:
raise ValueError(
"Invalid value for 'top_logprobs': must be less than or equal to 20.")
if not 0 <= self.seed <= 922337203685477580:
raise ValueError("seed must be in [0, 922337203685477580], got "
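A hedged usage sketch of the new checks (assuming the remaining SamplingParams fields keep their defaults): logprobs must be a non-negative integer no larger than 20, which matches the K = 20 buffer limit in the C++ ops above.

from fastdeploy.engine.sampling_params import SamplingParams

params = SamplingParams(logprobs=5)  # OK: return top-5 logprobs per generated token
# SamplingParams(logprobs=21)  raises ValueError: top_logprobs must be <= 20
# SamplingParams(logprobs=-1)  raises ValueError: logprobs must be non-negative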

View File

@@ -122,6 +122,7 @@ class ChatCompletionResponseChoice(BaseModel):
"""
index: int
message: ChatMessage
logprobs: Optional[LogProbs] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]]
@@ -136,6 +137,21 @@ class ChatCompletionResponse(BaseModel):
choices: List[ChatCompletionResponseChoice]
usage: UsageInfo
class LogProbEntry(BaseModel):
"""
Log probability entry.
"""
token: str
logprob: float
bytes: Optional[List[int]] = None
top_logprobs: Optional[List["LogProbEntry"]] = None
class LogProbs(BaseModel):
"""
LogProbs.
"""
content: Optional[List[LogProbEntry]] = None
refusal: Optional[Union[str, None]] = None
class DeltaMessage(BaseModel):
"""
@@ -154,6 +170,7 @@ class ChatCompletionResponseStreamChoice(BaseModel):
"""
index: int
delta: DeltaMessage
logprobs: Optional[LogProbs] = None
finish_reason: Optional[Literal["stop", "length", "tool_calls"]] = None
arrival_time: Optional[float] = None
@@ -392,6 +409,8 @@ class ChatCompletionRequest(BaseModel):
tools: Optional[List[ChatCompletionToolsParam]] = None
model: Optional[str] = "default"
frequency_penalty: Optional[float] = None
logprobs: Optional[bool] = False
top_logprobs: Optional[int] = 0
# remove max_tokens when field is removed from OpenAI API
max_tokens: Optional[int] = Field(
default=None,
@@ -434,6 +453,9 @@ class ChatCompletionRequest(BaseModel):
if request_id is not None:
req_dict['request_id'] = request_id
req_dict["max_tokens"] = self.max_completion_tokens or self.max_tokens
req_dict["logprobs"] = self.top_logprobs if self.logprobs else None
if self.metadata is not None:
for key, value in self.metadata.items():
req_dict[key] = value
@@ -505,3 +527,18 @@ class ChatCompletionRequest(BaseModel):
)
return data
@model_validator(mode="before")
@classmethod
def check_logprobs(cls, data):
if (top_logprobs := data.get("top_logprobs")) is not None:
if top_logprobs < 0:
raise ValueError("`top_logprobs` must be a positive value.")
if top_logprobs > 0 and not data.get("logprobs"):
raise ValueError(
"when using `top_logprobs`, `logprobs` must be set to true."
)
return data
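For illustration (field values are made up), a request body that passes the new check_logprobs validator: top_logprobs greater than zero is only accepted when logprobs is true, and the request-to-dict conversion above then forwards top_logprobs as the internal logprobs value.

request_body = {
    "model": "default",
    "messages": [{"role": "user", "content": "Hello"}],
    "logprobs": True,   # required whenever top_logprobs > 0
    "top_logprobs": 5,  # non-negative; SamplingParams further caps it at 20
}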

View File

@@ -15,34 +15,23 @@
"""
import asyncio
import aiozmq
from aiozmq import zmq
import json
import time
from collections.abc import AsyncGenerator, AsyncIterator
from typing import Callable, Optional, Union, List
import traceback
import uuid
from typing import List, Optional
import aiozmq
from aiozmq import zmq
from fastapi import Request
from pydantic import BaseModel
from fastdeploy.entrypoints.openai.protocol import (
ChatCompletionRequest,
DeltaMessage,
ChatCompletionResponseChoice,
ChatCompletionStreamResponse,
ChatCompletionResponseStreamChoice,
ChatMessage,
UsageInfo,
PromptTokenUsageInfo,
ChatCompletionResponse,
ErrorResponse,
)
ChatCompletionRequest, ChatCompletionResponse,
ChatCompletionResponseChoice, ChatCompletionResponseStreamChoice,
ChatCompletionStreamResponse, ChatMessage, DeltaMessage, ErrorResponse,
LogProbEntry, LogProbs, PromptTokenUsageInfo, UsageInfo)
from fastdeploy.metrics.work_metrics import work_process_metrics
from fastdeploy.utils import api_server_logger
from fastdeploy.engine.request import RequestOutput
from fastdeploy.worker.output import LogprobsLists
class OpenAIServingChat:
@@ -200,6 +189,18 @@ class OpenAIServingChat:
output = res["outputs"]
delta_text = output["text"]
raw_top_logprobs = output["top_logprobs"]
logprobs_res = None
if raw_top_logprobs is not None:
top_logprobs = LogprobsLists(
logprob_token_ids=raw_top_logprobs[0],
logprobs=raw_top_logprobs[1],
sampled_token_ranks=raw_top_logprobs[2],
)
logprobs_res = self.build_logprobs_response(
logprobs=top_logprobs,
request_top_logprobs=request.top_logprobs,
)
previous_num_tokens += len(output["token_ids"])
delta_message = DeltaMessage(content=delta_text, reasoning_content=output.get("reasoning_content"), \
@@ -208,6 +209,7 @@ class OpenAIServingChat:
choice = ChatCompletionResponseStreamChoice(
index=0,
delta=delta_message,
logprobs=logprobs_res,
arrival_time=arrival_time
)
if res["finished"]:
@@ -286,6 +288,7 @@ class OpenAIServingChat:
final_res = None
previous_num_tokens = 0
current_waiting_time = 0
logprob_contents = []
while True:
try:
raw_data = await asyncio.wait_for(dealer.read(), timeout=10)
@@ -310,6 +313,21 @@ class OpenAIServingChat:
data, stream=False, enable_thinking=enable_thinking)
# api_server_logger.debug(f"Client {request_id} received: {data}")
previous_num_tokens += len(data["outputs"]["token_ids"])
# The logprob for handling the response
output = data["outputs"]
raw_top_logprobs = output["top_logprobs"]
if raw_top_logprobs is not None:
top_logprobs = LogprobsLists(
logprob_token_ids=raw_top_logprobs[0],
logprobs=raw_top_logprobs[1],
sampled_token_ranks=raw_top_logprobs[2],
)
logprobs_res = self.build_logprobs_response(
logprobs=top_logprobs,
request_top_logprobs=request.top_logprobs,
)
if logprobs_res and logprobs_res.content is not None:
logprob_contents.extend(logprobs_res.content)
if data["finished"]:
final_res = data
break
@@ -325,10 +343,16 @@ class OpenAIServingChat:
tool_calls=output.get("tool_call_content"),
token_ids=output.get("token_ids")
)
logprobs_full_res = None
if logprob_contents:
logprobs_full_res = LogProbs(
content=logprob_contents
)
choice = ChatCompletionResponseChoice(
index=0,
message=message,
logprobs=logprobs_full_res,
finish_reason=None
)
if request.max_tokens is None or previous_num_tokens != request.max_tokens:
@@ -359,3 +383,54 @@ class OpenAIServingChat:
choices=choices,
usage=usage
)
def build_logprobs_response(
self,
logprobs: Optional[LogprobsLists],
request_top_logprobs: int,
) -> Optional[LogProbs]:
"""
Construct a logprobs response object in line with the OpenAI style.
Retain the complete top-k candidates and avoid circular references.
"""
# Parameter validation
if (
logprobs is None
or request_top_logprobs is None
or request_top_logprobs <= 0
or len(logprobs.logprob_token_ids) == 0
):
return None
try:
# The top-k candidates for the current token
topk_token_ids = logprobs.logprob_token_ids[0][:request_top_logprobs + 1]
topk_logprobs = logprobs.logprobs[0][:request_top_logprobs + 1]
# Construct the candidate token structure (LogProbEntry) of topk
top_logprob_entries: List[LogProbEntry] = []
for tid, lp in zip(topk_token_ids, topk_logprobs):
token_str = self.engine_client.data_processor.process_logprob_response([tid],
clean_up_tokenization_spaces=False)
# token_bytes = token_str.encode("utf-8", errors="replace")
entry = LogProbEntry(
token=token_str,
logprob=lp,
# bytes=list(token_bytes)
)
top_logprob_entries.append(entry)
# Construct the sampled token object (avoid sharing references with top_logprob_entries)
sampled_entry = LogProbEntry(
token=top_logprob_entries[0].token,
logprob=top_logprob_entries[0].logprob,
bytes=top_logprob_entries[0].bytes,
top_logprobs=top_logprob_entries[1:] # Here are the complete topk candidates
)
return LogProbs(content=[sampled_entry])
except Exception as e:
api_server_logger.error("Error in build_logprobs_response: %s", e)
api_server_logger.error(traceback.format_exc())
return None
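Sketch of the resulting payload shape, with illustrative token strings and numbers: build_logprobs_response returns a single LogProbs object whose content entry is the sampled token, with the complete top-k candidates nested under its top_logprobs (so the sampled token can appear again among them); bytes stays unset here, matching the commented-out encoding above.

logprobs_payload = {
    "content": [
        {
            "token": "Hello",
            "logprob": -0.12,
            "bytes": None,
            "top_logprobs": [
                {"token": "Hello", "logprob": -0.12, "bytes": None, "top_logprobs": None},
                {"token": "Hi", "logprob": -2.31, "bytes": None, "top_logprobs": None},
            ],
        }
    ],
    "refusal": None,
}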

View File

@@ -20,10 +20,9 @@ import numpy as np
from paddleformers.generation import GenerationConfig
from fastdeploy import envs
from fastdeploy.utils import data_processor_logger
from fastdeploy.input.ernie_tokenizer import ErnieBotTokenizer
from fastdeploy.input.text_processor import BaseDataProcessor
from fastdeploy.utils import data_processor_logger
_SAMPLING_EPS = 1e-5
@@ -444,3 +443,7 @@ class ErnieProcessor(BaseDataProcessor):
data_processor_logger.debug(
f"processed stop_seqs: {stop_seqs}, {stop_seqs_len}")
return stop_seqs, stop_seqs_len
def process_logprob_response(self, token_ids, **kwargs):
full_text = self.tokenizer.decode(token_ids, **kwargs)
return full_text

View File

@@ -309,6 +309,10 @@ class DataProcessor(BaseDataProcessor):
data_processor_logger.info(f"Processed request {request}")
return request
def process_logprob_response(self, token_ids, **kwargs):
full_text = self.tokenizer.decode(token_ids, **kwargs)
return full_text
def process_response(self, response_dict, **kwargs):
"""
Preprocess the response

View File

@@ -42,3 +42,4 @@ class SamplingMetadata:
top_p: paddle.Tensor
top_k: Optional[paddle.Tensor] = None
max_num_logprobs: Optional[int] = None

View File

@@ -29,6 +29,7 @@ from fastdeploy.model_executor.layers.sample.ops import (
apply_penalty_multi_scores, apply_speculative_penalty_multi_scores,
top_k_top_p_sampling)
from fastdeploy.platforms import current_platform
from fastdeploy.worker.output import LogprobsTensors, SamplerOutput
class SamplerProcessor:
@@ -188,14 +189,65 @@ class Sampler(nn.Layer):
""" pre process before running """
self.processor.pre_process(skip_idx_list)
def compute_logprobs(self, logits: paddle.Tensor) -> paddle.Tensor:
"""
"""
return F.log_softmax(logits, axis=-1)
def gather_logprobs(
self,
logprobs: paddle.Tensor,
num_logprobs: int,
token_ids: paddle.Tensor,
) -> LogprobsTensors:
"""
Gather logprobs for topk and sampled/prompt token.
Args:
logprobs: (num tokens) x (vocab) tensor
num_logprobs: minimum number of logprobs to
retain per token
token_ids: prompt tokens (if prompt logprobs)
or sampled tokens (if sampled
logprobs); 1D token ID tensor
with (num tokens) elements
Must be int64.
Returns:
Top-k int indices tensor, (num tokens) x (num_logprobs + 1)
Top-k float logprobs tensor, (num tokens) x (num_logprobs + 1)
Sampled token rank tensor, (num tokens)
"""
assert token_ids.dtype == paddle.int64
# Get with the logprob of the prompt or sampled token.
token_logprobs = paddle.take_along_axis(logprobs, token_ids, axis=-1)
# Compute the ranks of the actual token.
token_ranks = (logprobs >= token_logprobs).sum(-1)
if num_logprobs >= 1:
# Find the topK values.
topk_logprobs, topk_indices = paddle.topk(logprobs,
num_logprobs,
axis=-1)
indices = paddle.concat([token_ids, topk_indices], axis=1)
top_logprobs = paddle.concat([token_logprobs, topk_logprobs], axis=1)
else:
indices = token_ids
top_logprobs = token_logprobs
return LogprobsTensors(indices, top_logprobs, token_ranks)
def forward_cuda(
self,
logits: paddle.Tensor,
sampling_metadata: SamplingMetadata,
skip_idx_list: List[int] = [],
) -> paddle.Tensor:
) -> SamplerOutput:
"""
"""
num_logprobs = sampling_metadata.max_num_logprobs
if num_logprobs is not None:
raw_logprobs = self.compute_logprobs(logits)
logits = self.processor.apply_token_mask(logits, skip_idx_list)
logits = apply_penalty_multi_scores(
@@ -215,8 +267,19 @@ class Sampler(nn.Layer):
_, next_tokens = top_k_top_p_sampling(probs, sampling_metadata.top_p, sampling_metadata.top_k)
logprobs_tensors = None if num_logprobs is None else \
self.gather_logprobs(raw_logprobs, num_logprobs, token_ids=next_tokens)
self.processor.update_output_tokens(next_tokens, skip_idx_list)
return next_tokens
sampler_output = SamplerOutput(
# The sampled tokens are expanded to 2D tensor with shape
# [num_requests, 1], where each row represents one generated
# token per request.
sampled_token_ids=next_tokens,
logprobs_tensors=logprobs_tensors,
)
return sampler_output
class SpeculativeSampler(nn.Layer):
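An editor's sketch (not in the PR) of what gather_logprobs computes, using tiny tensors: the sampled token's logprob, its rank (the count of vocabulary entries whose logprob is at least as large, so rank 1 means the sampled token was the argmax), and the top-k candidates with the sampled token prepended.

import paddle
import paddle.nn.functional as F

logits = paddle.to_tensor([[2.0, 0.5, 1.0, -1.0]])       # (num_tokens=1, vocab=4)
logprobs = F.log_softmax(logits, axis=-1)
token_ids = paddle.to_tensor([[2]], dtype=paddle.int64)  # sampled token id per row

token_logprobs = paddle.take_along_axis(logprobs, token_ids, axis=-1)  # logprob of the sampled token
token_ranks = (logprobs >= token_logprobs).sum(-1)                     # rank 2 here: only id 0 scores higher

num_logprobs = 2
topk_logprobs, topk_indices = paddle.topk(logprobs, num_logprobs, axis=-1)
indices = paddle.concat([token_ids, topk_indices], axis=1)             # (1, num_logprobs + 1), sampled id first
top_logprobs = paddle.concat([token_logprobs, topk_logprobs], axis=1)  # matching logprobs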

View File

@@ -20,14 +20,16 @@ import paddle
from fastdeploy import envs
from fastdeploy.engine.config import SpeculativeConfig
from fastdeploy.model_executor.ops.gpu import (
get_padding_offset, save_output, set_stop_value_multi_ends,
speculate_clear_accept_nums, speculate_get_output_padding_offset,
speculate_get_padding_offset, speculate_get_seq_lens_output,
speculate_save_output, speculate_set_value_by_flags_and_idx,
speculate_step_paddle, speculate_step_system_cache, speculate_update_v3,
step_paddle, step_system_cache, update_inputs, step_reschedule)
get_padding_offset, save_output, save_output_topk,
set_stop_value_multi_ends, speculate_clear_accept_nums,
speculate_get_output_padding_offset, speculate_get_padding_offset,
speculate_get_seq_lens_output, speculate_save_output,
speculate_set_value_by_flags_and_idx, speculate_step_paddle,
speculate_step_system_cache, speculate_update_v3, step_paddle,
step_reschedule, step_system_cache, update_inputs)
from fastdeploy.platforms import current_platform
from fastdeploy.worker.output import ModelOutputData
from fastdeploy.worker.output import (ModelOutputData, ModelRunnerOutput,
SamplerOutput)
DISABLE_RECOVER = (envs.FD_DISABLED_RECOVER == "1")
@@ -102,10 +104,10 @@ def pre_process(
cu_seqlens_k, output_cum_offsets, output_padding_offset)
def post_process_normal(sampled_token_ids: paddle.Tensor,
def post_process_normal(sampler_output: SamplerOutput,
model_output: ModelOutputData,
save_each_rank: bool = False,
skip_save_output: bool = False) -> None:
skip_save_output: bool = False) -> ModelRunnerOutput:
""" Post-processing steps after completing a single token generation. """
# 1. Set stop value
paddle.assign(
@@ -123,7 +125,7 @@ def post_process_normal(sampled_token_ids: paddle.Tensor,
model_output.stop_flags,
)
# TODO(gongshaotian): Add use_stop_seqs
set_stop_value_multi_ends(sampled_token_ids, model_output.stop_flags,
set_stop_value_multi_ends(sampler_output.sampled_token_ids, model_output.stop_flags,
model_output.seq_lens_this_time,
model_output.eos_token_id,
model_output.next_tokens, False) # multi ends
@@ -138,18 +140,28 @@ def post_process_normal(sampled_token_ids: paddle.Tensor,
model_output.seq_lens_decoder,
model_output.input_ids,
model_output.stop_nums,
sampled_token_ids,
sampler_output.sampled_token_ids,
model_output.is_block_step,
)
# 3. Transmit the model's output and stop generation signal via message queue.
# In the future, we will abandon this approach.
if not skip_save_output:
if sampler_output.logprobs_tensors is None:
save_output(
sampled_token_ids,
sampler_output.sampled_token_ids,
model_output.not_need_stop,
model_output.mp_rank,
save_each_rank, # save_each_rank
)
else:
save_output_topk(
sampler_output.sampled_token_ids,
sampler_output.logprobs_tensors.logprob_token_ids,
sampler_output.logprobs_tensors.logprobs,
sampler_output.logprobs_tensors.selected_token_ranks,
model_output.not_need_stop,
model_output.mp_rank,
)
def post_process_specualate(model_output, skip_save_output: bool = False):
""""""
@@ -193,7 +205,7 @@ def post_process_specualate(model_output, skip_save_output: bool = False):
)
def post_process(sampled_token_ids: paddle.Tensor,
def post_process(sampler_output: SamplerOutput,
model_output: ModelOutputData,
save_each_rank: bool = False,
speculative_decoding: bool = False,
@@ -202,7 +214,7 @@ def post_process(sampled_token_ids: paddle.Tensor,
if speculative_decoding:
post_process_specualate(model_output, skip_save_output)
else:
post_process_normal(sampled_token_ids, model_output, save_each_rank,
post_process_normal(sampler_output, model_output, save_each_rank,
skip_save_output)

View File

@@ -30,9 +30,11 @@ from fastdeploy.inter_communicator import IPCSignal
from fastdeploy.metrics.metrics import main_process_metrics
from fastdeploy.platforms import current_platform
from fastdeploy.utils import llm_logger, spec_logger
from fastdeploy.worker.output import LogprobsLists
RECOVERY_STOP_SIGNAL = -3
MAX_BSZ = 512
K = 20
MAX_DRAFT_TOKENS = 6
SPECULATE_MAX_BSZ = 256
@@ -62,6 +64,13 @@ class TokenProcessor(object):
],
fill_value=2,
dtype="int64")
elif self.cfg.enable_logprob:
self.output_tokens = paddle.full(
shape=[MAX_BSZ * (K + 1) + 2, 1], fill_value=2, dtype="int64")
self.output_scores = paddle.full(
shape=[MAX_BSZ * (K + 1), 1], fill_value=0.0, dtype="float32")
self.output_ranks = paddle.full(
shape=[MAX_BSZ], fill_value=0, dtype="int64")
else:
self.output_tokens = paddle.full(shape=[MAX_BSZ + 2, 1],
fill_value=2,
@@ -109,12 +118,51 @@ class TokenProcessor(object):
assert self.resource_manager is not None, "The resource manager is None, cannot run."
if self.worker is not None:
raise Exception("Worker is already running!")
use_logprobs = (
self.cfg.enable_logprob
and not self.speculative_decoding
and not self.cfg.parallel_config.enable_expert_parallel
)
target_func = (
self.process_sampling_with_logprob_results
if use_logprobs else
self.process_sampling_results
)
self.worker = threading.Thread(target=target_func)
self.worker = threading.Thread(target=self.process_sampling_results,
args=())
self.worker.daemon = True
self.worker.start()
def process_sampling_with_logprob_results(self):
"""
read tokens from paddle inference engine and process logprob results
"""
if current_platform.is_cuda():
from fastdeploy.model_executor.ops.gpu import get_output_topk
else:
raise NotImplementedError("Only CUDA platform supports logprob.")
rank_id = self.cfg.parallel_config.local_data_parallel_id
while True:
try:
is_blocking = True
get_output_topk(self.output_tokens, self.output_scores, self.output_ranks, K, rank_id, is_blocking)
if self.output_tokens[0, 0] == -2:
continue
llm_logger.debug(
f"rank_id {rank_id} self.output_tokens[0, 0] {self.output_tokens[0, 0]}"
f"rank_id {rank_id} self.output_scores[0, 0] {self.output_scores[0, 0]}"
)
self._process_prefill_metrics()
self._process_sampling_with_logprob_batch_output()
except Exception as e:
llm_logger.info("while get input_data error: {0} {1}".format(
e, str(traceback.format_exc())))
def process_sampling_results(self):
"""
read tokens from paddle inference engine and process
@@ -245,6 +293,122 @@ class TokenProcessor(object):
self.number_of_output_tokens = 0
self.total_step = 0
def _process_sampling_with_logprob_batch_output(self):
"""
batch post-processing logprob output function
"""
batch = self.output_tokens[1, 0]
tokens = self.output_tokens[2:batch * (K + 1) + 2].numpy().reshape(
[batch, K + 1])[:, :(K + 1)]
scores = self.output_scores[:batch * (K + 1)].numpy().reshape(
[batch, K + 1])[:, :(K + 1)]
ranks = self.output_ranks[:batch].numpy()
batch_result = list()
for i in range(batch):
if self.resource_manager.stop_flags[i]:
continue
task = self.resource_manager.tasks_list[i]
task_id = task.request_id
token_id = int(tokens[i, 0])
token_ids = [token_id]
recovery_stop = token_id == RECOVERY_STOP_SIGNAL
if recovery_stop:
llm_logger.info(
f"recovery stop signal found at task {task_id}")
if not recovery_stop and token_id < 0:
continue
if task.get("prefill_chunk_info", None) is not None:
prefill_chunk_num = task.get("prefill_chunk_num", 0)
task.prefill_chunk_num = prefill_chunk_num + 1
if task.prefill_chunk_num < len(task.prefill_chunk_info):
continue
self.total_step += 1
current_time = time.time()
if self.tokens_counter[task_id] == 0:
metrics = RequestMetrics(
arrival_time=task.arrival_time,
inference_start_time=task.inference_start_time,
first_token_time=time.time() - task.inference_start_time,
time_in_queue=task.schedule_start_time -
task.preprocess_end_time,
preprocess_cost_time=task.preprocess_end_time -
task.preprocess_start_time)
self._record_first_token_metrics(task, current_time)
else:
metrics = RequestMetrics(
arrival_time=time.time(),
request_start_time=task.arrival_time,
)
self.number_of_output_tokens += len(token_ids)
self._record_metrics(task, current_time, token_ids)
result = RequestOutput(request_id=task_id,
outputs=CompletionOutput(
index=i,
send_idx=self.tokens_counter[task_id],
token_ids=[],
logprob = None,
draft_token_ids=[],
top_logprobs=None,
),
finished=False,
metrics=metrics)
if self.tokens_counter[task_id] == 0:
if task.messages is not None:
result.prompt = task.messages
result.num_cached_tokens = task.num_cached_tokens
is_prefill = task.disaggregate_info is not None and task.disaggregate_info[
"role"] == "prefill"
if is_prefill and len(token_ids) > 1:
result.outputs.draft_token_ids = copy.deepcopy(token_ids)
for idx, token_id in enumerate(token_ids):
self.tokens_counter[task_id] += 1
if token_id != RECOVERY_STOP_SIGNAL:
result.outputs.token_ids.append(token_id)
result.outputs.logprob = float(scores[i, 0])
# Construct top_logprobs
topk_token_ids = tokens[i, :].tolist()
topk_logprobs = scores[i, :].tolist()
sampled_rank = ranks[i].item()
result.outputs.top_logprobs = LogprobsLists(
logprob_token_ids=[topk_token_ids],
logprobs=[topk_logprobs],
sampled_token_ranks=[sampled_rank]
)
if token_id in task.eos_token_ids or is_prefill or recovery_stop:
result.finished = True
result.prompt = task.prompt
result.prompt_token_ids = task.prompt_token_ids
if recovery_stop:
result.error_msg = "Recover is not supported, the result is incomplete!"
llm_logger.info(
f"Request: {task_id} finished, number of "
f"generated tokens: {self.tokens_counter[task_id]}.")
llm_logger.info(
f"Request: {task_id} token ratio: {self.tokens_counter[task_id] / (time.time() - task.inference_start_time)}"
)
llm_logger.info(f"{self.resource_manager.info()}")
if self.cfg.speculative_config.method:
self._compute_speculative_status()
if not is_prefill:
self._record_completion_metrics(task, current_time)
self._recycle_resources(task_id, i, task, result,
is_prefill)
break
if not is_prefill or self.cfg.scheduler_config.name == "splitwise":
batch_result.append(result)
self.postprocess(batch_result)
def _process_batch_output(self):
"""
batch post-processing function

View File

@@ -63,6 +63,7 @@ class GPUModelRunner(ModelRunnerBase):
self.device_id = device_id
self.speculative_method = self.fd_config.speculative_config.method
self.speculative_decoding = self.speculative_method is not None
self.enable_logprob = fd_config.model_config.enable_logprob
self.guided_backend = None
if self.fd_config.parallel_config.guided_decoding_backend != "off":
@@ -612,6 +613,7 @@ class GPUModelRunner(ModelRunnerBase):
min_dec_lens=self.share_inputs["min_dec_len"],
bad_words_token_ids=self.share_inputs["bad_tokens"],
eos_token_ids=self.share_inputs["eos_token_id"],
max_num_logprobs=20 if self.enable_logprob else None,
)
def load_model(self) -> None:
@@ -816,15 +818,15 @@ class GPUModelRunner(ModelRunnerBase):
self.share_inputs["step_idx"],
self.share_inputs["stop_flags"],
)
sampled_token_ids = self.sampler(logits,
sampler_output = self.sampler(logits,
self.sampling_metadata)
if self.parallel_config.tensor_parallel_degree > 1:
paddle.distributed.broadcast(sampled_token_ids, 0)
paddle.distributed.broadcast(sampler_output.sampled_token_ids, 0)
else:
self.sampler(logits, self.sampling_metadata,
self.parallel_config.max_model_len,
self.share_inputs)
sampled_token_ids = None
sampler_output = None
if self.parallel_config.tensor_parallel_degree > 1:
paddle.distributed.broadcast(
self.share_inputs["accept_tokens"], 0)
@@ -864,7 +866,7 @@ class GPUModelRunner(ModelRunnerBase):
accept_num=self.share_inputs["accept_num"]
if self.speculative_decoding else None)
post_process(sampled_token_ids=sampled_token_ids,
post_process(sampler_output=sampler_output,
model_output=model_output_data,
speculative_decoding=self.speculative_decoding,
skip_save_output=True)
@@ -1051,18 +1053,18 @@ class GPUModelRunner(ModelRunnerBase):
self.share_inputs["step_idx"],
self.share_inputs["stop_flags"],
)
sampled_token_ids = self.sampler(
sampler_output = self.sampler(
logits,
self.sampling_metadata,
skip_idx_list,
)
if self.parallel_config.tensor_parallel_degree > 1:
paddle.distributed.broadcast(sampled_token_ids, 0)
paddle.distributed.broadcast(sampler_output.sampled_token_ids, 0)
else:
self.sampler(logits, self.sampling_metadata,
self.parallel_config.max_model_len, self.share_inputs)
sampled_token_ids = None
sampler_output = None
if self.parallel_config.tensor_parallel_degree > 1:
paddle.distributed.broadcast(
self.share_inputs["accept_tokens"], 0)
@@ -1105,7 +1107,7 @@ class GPUModelRunner(ModelRunnerBase):
skip_save_output = True
else:
skip_save_output = False
post_process(sampled_token_ids=sampled_token_ids,
post_process(sampler_output=sampler_output,
model_output=model_output_data,
save_each_rank=self.parallel_config.use_ep,
speculative_decoding=self.speculative_decoding,

View File

@@ -15,11 +15,80 @@
"""
from dataclasses import dataclass
from typing import Optional
from typing import NamedTuple, Optional
import paddle
class LogprobsLists(NamedTuple):
"""
"""
# [num_reqs, max_num_logprobs + 1]
logprob_token_ids: list[list[int]]
# [num_reqs, max_num_logprobs + 1]
logprobs: list[list[float]]
# [num_reqs]
sampled_token_ranks: list[int]
def slice(self, start: int, end: int):
"""slice"""
return LogprobsLists(
self.logprob_token_ids[start:end],
self.logprobs[start:end],
self.sampled_token_ranks[start:end],
)
class LogprobsTensors(NamedTuple):
"""
"""
# [num_reqs, max_num_logprobs + 1]
logprob_token_ids: paddle.Tensor
# [num_reqs, max_num_logprobs + 1]
logprobs: paddle.Tensor
# [num_reqs]
selected_token_ranks: paddle.Tensor
def tolists(self):
"""Convert to lists."""
return LogprobsLists(
self.logprob_token_ids.tolist(),
self.logprobs.tolist(),
self.selected_token_ranks.tolist(),
)
@staticmethod
def empty_cpu(num_positions: int,
num_tokens_per_position: int) -> "LogprobsTensors":
"""Create empty LogprobsTensors on CPU."""
logprob_token_ids = paddle.empty(
[num_positions, num_tokens_per_position],
dtype=paddle.int64).cpu()
logprobs = paddle.empty_like(logprob_token_ids, dtype=paddle.float32)
selected_token_ranks = paddle.empty([num_positions],
dtype=paddle.int64).cpu()
return LogprobsTensors(
logprob_token_ids=logprob_token_ids,
logprobs=logprobs,
selected_token_ranks=selected_token_ranks,
)
@dataclass
class SamplerOutput:
"""
"""
# [num_reqs, max_num_generated_tokens]
# Different requests can have different number of generated tokens.
# All requests are padded to max_num_generated_tokens.
# PLACEHOLDER_TOKEN_ID (-1 by default) is used for padding.
sampled_token_ids: paddle.Tensor
logprobs_tensors: Optional[LogprobsTensors]
@dataclass
class ModelOutputData:
"""

View File

@@ -45,6 +45,7 @@ from fastdeploy.model_executor.models.ernie4_5_vl.modeling_resampler import (
ScatterOp, VariableResolutionResamplerModel)
from fastdeploy.platforms import current_platform
from fastdeploy.worker.forward_meta import ForwardMeta
from fastdeploy.worker.output import SamplerOutput
from fastdeploy.worker.utils import check_safetensors_model
from fastdeploy.worker.vl_model_runner_base import VLModelRunnerBase
@@ -52,7 +53,7 @@ if current_platform.is_cuda() and current_platform.available():
from fastdeploy.model_executor.layers.utils import (
remove_padding, speculate_remove_padding)
from fastdeploy.model_executor.ops.gpu import (save_output,
from fastdeploy.model_executor.ops.gpu import (save_output, save_output_topk,
set_stop_value_multi_ends,
set_value_by_flags_and_idx,
update_inputs)
@@ -84,7 +85,7 @@ class GPUVLModelRunner(VLModelRunnerBase):
self.mp_group = hcg.get_model_parallel_group()
self.is_safetensors_model = check_safetensors_model(
args.model_name_or_path)
self.enable_logprob = args.enable_logprob
model_path = os.path.dirname(args.model_name_or_path)
args.llm_model_name_or_path = args.model_name_or_path
if not self.is_safetensors_model:
@@ -825,6 +826,7 @@ class GPUVLModelRunner(VLModelRunnerBase):
min_dec_lens=self.share_inputs["min_dec_len"],
bad_words_token_ids=self.share_inputs["bad_tokens"],
eos_token_ids=self.share_inputs["eos_token_id"],
max_num_logprobs=20 if self.enable_logprob else None,
)
def generate(self) -> None:
@@ -846,17 +848,17 @@ class GPUVLModelRunner(VLModelRunnerBase):
self.share_inputs["stop_flags"],
)
# sampler & save_output
next_tokens = self.sampler(logits, self.sampling_metadata)
sampler_output = self.sampler(logits, self.sampling_metadata)
if self.fd_config.parallel_config.tensor_parallel_degree > 1:
paddle.distributed.broadcast(next_tokens, 0)
self.post_process(next_tokens)
paddle.distributed.broadcast(sampler_output.sampled_token_ids, 0)
self.post_process(sampler_output)
def post_process(self, next_tokens: paddle.Tensor) -> None:
def post_process(self, sampler_output: SamplerOutput) -> None:
"""
post_process
"""
if self.share_inputs["enable_thinking"]:
exists_think_end = next_tokens == self.model_cfg.think_end_id
exists_think_end = sampler_output.sampled_token_ids == self.model_cfg.think_end_id
paddle.assign(
paddle.where(
exists_think_end,
@@ -872,12 +874,12 @@ class GPUVLModelRunner(VLModelRunnerBase):
), self.share_inputs["reasoning_index"])
stop_wo_think = (
(next_tokens == self.share_inputs["eos_token_id"]) |
(sampler_output.sampled_token_ids == self.share_inputs["eos_token_id"]) |
(self.share_inputs["reasoning_index"] == 0)) & (
self.share_inputs["need_think_end"] > 0)
next_tokens = paddle.where(stop_wo_think,
sampler_output.sampled_token_ids = paddle.where(stop_wo_think,
self.model_cfg.think_end_id,
next_tokens)
sampler_output.sampled_token_ids)
paddle.assign(
paddle.where(
stop_wo_think,
@@ -900,7 +902,7 @@ class GPUVLModelRunner(VLModelRunnerBase):
)
set_stop_value_multi_ends(
next_tokens,
sampler_output.sampled_token_ids,
self.share_inputs["stop_flags"],
self.share_inputs["seq_lens_this_time"],
self.share_inputs["eos_token_id"],
@@ -917,15 +919,25 @@ class GPUVLModelRunner(VLModelRunnerBase):
self.share_inputs["seq_lens_decoder"],
self.share_inputs["input_ids"],
self.share_inputs["stop_nums"],
next_tokens,
sampler_output.sampled_token_ids,
self.share_inputs["is_block_step"],
)
if sampler_output.logprobs_tensors is None:
save_output(
next_tokens,
sampler_output.sampled_token_ids,
self.share_inputs["not_need_stop"],
self.rank,
False, # use_ep
)
else:
save_output_topk(
sampler_output.sampled_token_ids,
sampler_output.logprobs_tensors.logprob_token_ids,
sampler_output.logprobs_tensors.logprobs,
sampler_output.logprobs_tensors.selected_token_ranks,
self.share_inputs["not_need_stop"],
self.rank,
)
def _cal_theortical_kvcache(self):
"""

View File

@@ -42,6 +42,8 @@ def get_worker(fd_config: FDConfig, local_rank: int, rank: int) -> WorkerBase:
"""
get worker of different device
"""
if fd_config.model_config.enable_logprob and not current_platform.is_cuda():
raise NotImplementedError("Only CUDA platform supports logprob.")
if current_platform.is_cuda():
from fastdeploy.worker.gpu_worker import GpuWorker
return GpuWorker(fd_config=fd_config, local_rank=local_rank, rank=rank)
@@ -550,6 +552,9 @@ def parse_args():
"'ipc_snapshot': load from disk snapshot of IPC weights, "
"'meta': provide RL traing worker, no_weights_load"
"'normal':normal load weight")
parser.add_argument("--enable_logprob",
action='store_true',
help="Enable output of token-level log probabilities.")
args = parser.parse_args()
return args
@@ -771,6 +776,8 @@ def initialize_fd_config(config) -> FDConfig:
"No quantization config found and use original weight and act dtype."
)
model_config.enable_logprob = config.enable_logprob
model_config.architectures = model_config_dict.get("architectures")
# Update load config

View File

@@ -720,7 +720,7 @@ class XPUModelRunner(ModelRunnerBase):
# 4. Compute logits, Sample
logits = self.model.compute_logits(hiddden_states)
sampled_token_ids = self.sampler(logits, self.sampling_metadata)
sampler_output = self.sampler(logits, self.sampling_metadata)
# 5. Speculative decode
@@ -749,7 +749,7 @@ class XPUModelRunner(ModelRunnerBase):
accept_tokens=None,
accept_num=None,
)
xpu_post_process(sampled_token_ids=sampled_token_ids,
xpu_post_process(sampled_token_ids=sampler_output.sampled_token_ids,
model_output=model_output_data)
# 7. Updata 'infer_seed' and step_paddle()