From 823a47e64a52821db780ebeee4f3aae68203225a Mon Sep 17 00:00:00 2001 From: chen <103103266+ckl117@users.noreply.github.com> Date: Thu, 10 Jul 2025 15:47:42 +0800 Subject: [PATCH] [Feature] Support return logprob of generated tokens (#2784) * online chat support logprobs * check xpu * check vl_gpu_model_runner * only cuda support logprob * get_worker() check platform --------- Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com> --- .../gpu_ops/get_output_msg_with_topk.cc | 16 +- .../gpu_ops/save_output_msg_with_topk.cc | 42 ++--- custom_ops/setup_ops_base.py | 1 + fastdeploy/engine/args_utils.py | 12 ++ fastdeploy/engine/config.py | 3 + fastdeploy/engine/engine.py | 1 + fastdeploy/engine/request.py | 5 + fastdeploy/engine/sampling_params.py | 10 ++ fastdeploy/entrypoints/openai/protocol.py | 37 ++++ fastdeploy/entrypoints/openai/serving_chat.py | 123 ++++++++++--- fastdeploy/input/ernie_processor.py | 7 +- fastdeploy/input/text_processor.py | 4 + .../model_executor/layers/sample/meta_data.py | 1 + .../model_executor/layers/sample/sampler.py | 67 ++++++- .../model_executor/pre_and_post_process.py | 52 +++--- fastdeploy/output/token_processor.py | 168 +++++++++++++++++- fastdeploy/worker/gpu_model_runner.py | 18 +- fastdeploy/worker/output.py | 71 +++++++- fastdeploy/worker/vl_gpu_model_runner.py | 48 +++-- fastdeploy/worker/worker_process.py | 7 + fastdeploy/worker/xpu_model_runner.py | 4 +- 21 files changed, 592 insertions(+), 105 deletions(-) diff --git a/custom_ops/gpu_ops/get_output_msg_with_topk.cc b/custom_ops/gpu_ops/get_output_msg_with_topk.cc index c4b6b14a4..5da88dc1d 100644 --- a/custom_ops/gpu_ops/get_output_msg_with_topk.cc +++ b/custom_ops/gpu_ops/get_output_msg_with_topk.cc @@ -24,16 +24,18 @@ #endif #define MAX_BSZ 512 -#define K 10 +#define K 20 struct msgdata { long mtype; int mtext[MAX_BSZ * (K + 1) + 2]; // stop_flag, bsz, tokens float mtext_f[MAX_BSZ * (K + 1)]; // score + int mtext_ranks[MAX_BSZ]; // ranks }; void GetOutputTopK(const paddle::Tensor& x, const paddle::Tensor& scores, + const paddle::Tensor& ranks, int k, int64_t rank_id, bool wait_flag) { @@ -66,17 +68,18 @@ void GetOutputTopK(const paddle::Tensor& x, int64_t* out_data = const_cast(x.data()); float* scores_data = const_cast(scores.data()); + int64_t* ranks_data = const_cast(ranks.data()); int ret = -1; if (!wait_flag) { ret = msgrcv(msgid, &msg_rcv, - (MAX_BSZ * (K + 1) + 2) * 4 + MAX_BSZ * (K + 1) * 4, + (MAX_BSZ * (K + 1) + 2) * 4 + MAX_BSZ * (K + 1) * 4 + MAX_BSZ * 4, 0, IPC_NOWAIT); } else { ret = msgrcv(msgid, &msg_rcv, - (MAX_BSZ * (K + 1) + 2) * 4 + MAX_BSZ * (K + 1) * 4, + (MAX_BSZ * (K + 1) + 2) * 4 + MAX_BSZ * (K + 1) * 4 + MAX_BSZ * 4, 0, 0); } @@ -97,13 +100,14 @@ void GetOutputTopK(const paddle::Tensor& x, out_data[offset + 2] = (int64_t)msg_rcv.mtext[offset + 2]; scores_data[offset] = msg_rcv.mtext_f[offset]; } + ranks_data[i] = (int64_t)msg_rcv.mtext_ranks[i]; } return; } PD_BUILD_STATIC_OP(get_output_topk) - .Inputs({"x", "scores"}) + .Inputs({"x", "scores", "ranks"}) .Attrs({"k: int", "rank_id: int64_t", "wait_flag: bool"}) - .Outputs({"x_out", "scores_out"}) - .SetInplaceMap({{"x", "x_out"}, {"scores", "scores_out"}}) + .Outputs({"x_out", "scores_out", "ranks_out"}) + .SetInplaceMap({{"x", "x_out"}, {"scores", "scores_out"}, {"ranks", "ranks_out"}}) .SetKernelFn(PD_KERNEL(GetOutputTopK)); diff --git a/custom_ops/gpu_ops/save_output_msg_with_topk.cc b/custom_ops/gpu_ops/save_output_msg_with_topk.cc index ee2cf865d..a9bf763b9 100644 --- 
a/custom_ops/gpu_ops/save_output_msg_with_topk.cc +++ b/custom_ops/gpu_ops/save_output_msg_with_topk.cc @@ -23,34 +23,34 @@ #define PD_BUILD_STATIC_OP(name) PD_BUILD_OP(static_op_##name) #endif -#define MAX_BSZ 128 -#define K 10 +#define MAX_BSZ 512 +#define K 20 // #define SAVE_WITH_OUTPUT_DEBUG struct msgdata { long mtype; int mtext[MAX_BSZ * (K + 1) + 2]; // stop_flag, bsz, tokens float mtext_f[MAX_BSZ * (K + 1)]; // score + int mtext_ranks[MAX_BSZ]; // ranks }; void SaveOutMmsgTopK(const paddle::Tensor& x, - const paddle::Tensor& scores, - const paddle::Tensor& topk_ids, - const paddle::Tensor& topk_scores, // [bsz, k] + const paddle::Tensor& logprob_token_ids, // [bsz, k+1] + const paddle::Tensor& logprob_scores, // [bsz, k+1] + const paddle::Tensor& ranks, const paddle::Tensor& not_need_stop, - int k, int64_t rank_id) { if (rank_id > 0) { return; } auto x_cpu = x.copy_to(paddle::CPUPlace(), false); - auto scores_cpu = scores.copy_to(paddle::CPUPlace(), false); - auto topk_ids_cpu = topk_ids.copy_to(paddle::CPUPlace(), false); - auto topk_scores_cpu = topk_scores.copy_to(paddle::CPUPlace(), false); + auto logprob_token_ids_cpu = logprob_token_ids.copy_to(paddle::CPUPlace(), false); + auto logprob_scores_cpu = logprob_scores.copy_to(paddle::CPUPlace(), false); + auto ranks_cpu = ranks.copy_to(paddle::CPUPlace(), false); int64_t* x_data = x_cpu.data(); - float* scores_data = scores_cpu.data(); - int64_t* topk_ids_data = topk_ids_cpu.data(); - float* topk_scores_data = topk_scores_cpu.data(); + int64_t* logprob_token_ids_data = logprob_token_ids_cpu.data(); + float* logprob_scores_data = logprob_scores_cpu.data(); + int64_t* ranks_data = ranks_cpu.data(); static struct msgdata msg_sed; int msg_queue_id = 1; if (const char* inference_msg_queue_id_env_p = @@ -106,21 +106,23 @@ void SaveOutMmsgTopK(const paddle::Tensor& x, msg_sed.mtext[0] = not_need_stop_data ? 
inference_msg_id_from_env : -inference_msg_id_from_env; int bsz = x.shape()[0]; + int max_num_logprobs = logprob_token_ids.shape()[1]; msg_sed.mtext[1] = bsz; for (int i = 0; i < bsz; i++) { - for (int j = 0; j < k + 1; j++) { + for (int j = 0; j < K + 1; j++) { const int64_t offset = i * (K + 1) + j; if (j == 0) { msg_sed.mtext[offset + 2] = (int)x_data[i]; - msg_sed.mtext_f[offset] = scores_data[i]; - } else if (j <= k + 1) { - msg_sed.mtext[offset + 2] = (int)topk_ids_data[i * k + j - 1]; - msg_sed.mtext_f[offset] = topk_scores_data[i * k + j - 1]; + msg_sed.mtext_f[offset] = logprob_scores_data[i * max_num_logprobs + j]; + } else if (j < max_num_logprobs) { + msg_sed.mtext[offset + 2] = (int)logprob_token_ids_data[i * max_num_logprobs + j]; + msg_sed.mtext_f[offset] = logprob_scores_data[i * max_num_logprobs + j]; } else { msg_sed.mtext[offset + 2] = -1; msg_sed.mtext_f[offset] = 0.0; } } + msg_sed.mtext_ranks[i] = (int)ranks_data[i]; } #ifdef SAVE_WITH_OUTPUT_DEBUG std::cout << "msg data: "; @@ -131,7 +133,7 @@ void SaveOutMmsgTopK(const paddle::Tensor& x, #endif if ((msgsnd(msgid, &msg_sed, - (MAX_BSZ * (K + 1) + 2) * 4 + (MAX_BSZ * (K + 1)) * 4, + (MAX_BSZ * (K + 1) + 2) * 4 + (MAX_BSZ * (K + 1)) * 4 + MAX_BSZ * 4, 0)) == -1) { printf("full msg buffer\n"); } @@ -139,8 +141,8 @@ void SaveOutMmsgTopK(const paddle::Tensor& x, } PD_BUILD_STATIC_OP(save_output_topk) - .Inputs({"x", "scores", "topk_ids", "topk_scores", "not_need_stop"}) - .Attrs({"k: int", "rank_id: int64_t"}) + .Inputs({"x", "topk_ids", "logprob_scores", "ranks", "not_need_stop"}) + .Attrs({"rank_id: int64_t"}) .Outputs({"x_out"}) .SetInplaceMap({{"x", "x_out"}}) .SetKernelFn(PD_KERNEL(SaveOutMmsgTopK)); diff --git a/custom_ops/setup_ops_base.py b/custom_ops/setup_ops_base.py index fb8b76b75..d05b1d39e 100644 --- a/custom_ops/setup_ops_base.py +++ b/custom_ops/setup_ops_base.py @@ -22,6 +22,7 @@ setup( "gpu_ops/save_with_output_msg.cc", "gpu_ops/get_output.cc", "gpu_ops/get_output_msg_with_topk.cc", + "gpu_ops/save_output_msg_with_topk.cc", "gpu_ops/transfer_output.cc", "cpu_ops/rebuild_padding.cc", ], diff --git a/fastdeploy/engine/args_utils.py b/fastdeploy/engine/args_utils.py index fefca65c1..19707435a 100644 --- a/fastdeploy/engine/args_utils.py +++ b/fastdeploy/engine/args_utils.py @@ -296,6 +296,12 @@ class EngineArgs: max_capture_batch_size=64, FastDeploy will capture graphs for batches [1,64]. """ + enable_logprob: bool = False + """ + Flag to enable logprob output. Default is False (disabled). + Must be explicitly enabled via the `--enable-logprob` startup parameter to output logprob values. + """ + def __post_init__(self): """ Post-initialization processing to set default tokenizer if not provided. @@ -416,6 +422,11 @@ class EngineArgs: help= "Disabled any whitespaces when using guided decoding backend XGrammar." ) + model_group.add_argument("--enable-logprob", + action="store_true", + default=EngineArgs.enable_logprob, + help="Enable output of token-level log probabilities." 
+ ) # Parallel processing parameters group parallel_group = parser.add_argument_group("Parallel Configuration") @@ -791,4 +802,5 @@ class EngineArgs: max_capture_batch_size=self.max_capture_batch_size, guided_decoding_backend=self.guided_decoding_backend, disable_any_whitespace=self.guided_decoding_disable_any_whitespace, + enable_logprob = self.enable_logprob, ) diff --git a/fastdeploy/engine/config.py b/fastdeploy/engine/config.py index da50988cc..c2c41f807 100644 --- a/fastdeploy/engine/config.py +++ b/fastdeploy/engine/config.py @@ -585,6 +585,7 @@ class Config: max_capture_batch_size: int = 64, guided_decoding_backend: Optional[str] = None, disable_any_whitespace: bool = False, + enable_logprob: bool = False, ): """ Initialize the Config class. @@ -678,6 +679,8 @@ class Config: self.parallel_config.expert_parallel_size), 8))]) self.device_ids = os.getenv("CUDA_VISIBLE_DEVICES", self.device_ids) + self.enable_logprob = enable_logprob + self.read_from_config() self.postprocess() self.check() diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index db6c60602..17692129c 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -1068,6 +1068,7 @@ class LLMEngine(object): self.cfg.enable_static_graph_inference, "use_cudagraph": self.cfg.use_cudagraph, "disable_any_whitespace": self.cfg.disable_any_whitespace, + "enable_logprob": self.cfg.enable_logprob, } for worker_flag, value in worker_append_flag.items(): if value: diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py index e71f39806..af3df438a 100644 --- a/fastdeploy/engine/request.py +++ b/fastdeploy/engine/request.py @@ -24,6 +24,7 @@ import numpy from fastdeploy.engine.sampling_params import SamplingParams from fastdeploy.utils import data_processor_logger +from fastdeploy.worker.output import LogprobsLists @dataclass @@ -189,6 +190,8 @@ class CompletionOutput: index: int send_idx: int token_ids: list[int] + logprob: Optional[float] = None + top_logprobs: Optional[LogprobsLists] = None draft_token_ids: list[int] = None text: Optional[str] = None reasoning_content: Optional[str] = None @@ -201,6 +204,8 @@ class CompletionOutput: "index": self.index, "send_idx": self.send_idx, "token_ids": self.token_ids, + "logprob": self.logprob, + "top_logprobs": self.top_logprobs, "draft_token_ids": self.draft_token_ids, "text": self.text, "reasoning_content": self.reasoning_content diff --git a/fastdeploy/engine/sampling_params.py b/fastdeploy/engine/sampling_params.py index 00a6aeb13..a7912407a 100644 --- a/fastdeploy/engine/sampling_params.py +++ b/fastdeploy/engine/sampling_params.py @@ -173,6 +173,13 @@ class SamplingParams: f"temperature must be non-negative, got {self.temperature}.") if self.top_p is not None and not 0.0 <= self.top_p <= 1.0: raise ValueError(f"top_p must be in [0, 1], got {self.top_p}.") + # quietly accept -1 as disabled, but prefer 0 + if self.top_k < -1: + raise ValueError(f"top_k must be 0 (disable), or at least 1, " + f"got {self.top_k}.") + if not isinstance(self.top_k, int): + raise TypeError( + f"top_k must be an integer, got {type(self.top_k).__name__}") if self.max_tokens is not None and self.max_tokens < 1: raise ValueError( @@ -192,6 +199,9 @@ class SamplingParams: if self.logprobs is not None and self.logprobs < 0: raise ValueError( f"logprobs must be non-negative, got {self.logprobs}.") + if self.logprobs is not None and self.logprobs > 20: + raise ValueError( + "Invalid value for 'top_logprobs': must be less than or equal to 20.") if not 0 <= self.seed 
<= 922337203685477580: raise ValueError("seed must be in [0, 922337203685477580], got " diff --git a/fastdeploy/entrypoints/openai/protocol.py b/fastdeploy/entrypoints/openai/protocol.py index 76a575152..d4391e567 100644 --- a/fastdeploy/entrypoints/openai/protocol.py +++ b/fastdeploy/entrypoints/openai/protocol.py @@ -122,6 +122,7 @@ class ChatCompletionResponseChoice(BaseModel): """ index: int message: ChatMessage + logprobs: Optional[LogProbs] = None finish_reason: Optional[Literal["stop", "length", "tool_calls", "recover_stop"]] @@ -136,6 +137,21 @@ class ChatCompletionResponse(BaseModel): choices: List[ChatCompletionResponseChoice] usage: UsageInfo +class LogProbEntry(BaseModel): + """ + Log probability entry. + """ + token: str + logprob: float + bytes: Optional[List[int]] = None + top_logprobs: Optional[List["LogProbEntry"]] = None + +class LogProbs(BaseModel): + """ + LogProbs. + """ + content: Optional[List[LogProbEntry]] = None + refusal: Optional[Union[str, None]] = None class DeltaMessage(BaseModel): """ @@ -154,6 +170,7 @@ class ChatCompletionResponseStreamChoice(BaseModel): """ index: int delta: DeltaMessage + logprobs: Optional[LogProbs] = None finish_reason: Optional[Literal["stop", "length", "tool_calls"]] = None arrival_time: Optional[float] = None @@ -392,6 +409,8 @@ class ChatCompletionRequest(BaseModel): tools: Optional[List[ChatCompletionToolsParam]] = None model: Optional[str] = "default" frequency_penalty: Optional[float] = None + logprobs: Optional[bool] = False + top_logprobs: Optional[int] = 0 # remove max_tokens when field is removed from OpenAI API max_tokens: Optional[int] = Field( default=None, @@ -434,6 +453,9 @@ class ChatCompletionRequest(BaseModel): if request_id is not None: req_dict['request_id'] = request_id + req_dict["max_tokens"] = self.max_completion_tokens or self.max_tokens + req_dict["logprobs"] = self.top_logprobs if self.logprobs else None + if self.metadata is not None: for key, value in self.metadata.items(): req_dict[key] = value @@ -505,3 +527,18 @@ class ChatCompletionRequest(BaseModel): ) return data + + @model_validator(mode="before") + @classmethod + def check_logprobs(cls, data): + + if (top_logprobs := data.get("top_logprobs")) is not None: + if top_logprobs < 0: + raise ValueError("`top_logprobs` must be a positive value.") + + if top_logprobs > 0 and not data.get("logprobs"): + raise ValueError( + "when using `top_logprobs`, `logprobs` must be set to true." 
+ ) + + return data diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py index 876adbf87..dc2bb4d45 100644 --- a/fastdeploy/entrypoints/openai/serving_chat.py +++ b/fastdeploy/entrypoints/openai/serving_chat.py @@ -15,34 +15,23 @@ """ import asyncio -import aiozmq -from aiozmq import zmq import json import time -from collections.abc import AsyncGenerator, AsyncIterator -from typing import Callable, Optional, Union, List +import traceback import uuid +from typing import List, Optional + +import aiozmq +from aiozmq import zmq -from fastapi import Request -from pydantic import BaseModel from fastdeploy.entrypoints.openai.protocol import ( - ChatCompletionRequest, - DeltaMessage, - ChatCompletionResponseChoice, - ChatCompletionStreamResponse, - ChatCompletionResponseStreamChoice, - ChatMessage, - UsageInfo, - PromptTokenUsageInfo, - ChatCompletionResponse, - ErrorResponse, -) + ChatCompletionRequest, ChatCompletionResponse, + ChatCompletionResponseChoice, ChatCompletionResponseStreamChoice, + ChatCompletionStreamResponse, ChatMessage, DeltaMessage, ErrorResponse, + LogProbEntry, LogProbs, PromptTokenUsageInfo, UsageInfo) from fastdeploy.metrics.work_metrics import work_process_metrics - from fastdeploy.utils import api_server_logger - -from fastdeploy.engine.request import RequestOutput - +from fastdeploy.worker.output import LogprobsLists class OpenAIServingChat: @@ -157,7 +146,7 @@ class OpenAIServingChat: current_waiting_time = 0 await asyncio.sleep(0.1) continue - + res = json.loads(raw_data[-1].decode('utf-8')) if res.get("error_code", 200) != 200: raise ValueError("{}".format(res["error_msg"])) @@ -200,6 +189,18 @@ class OpenAIServingChat: output = res["outputs"] delta_text = output["text"] + raw_top_logprobs = output["top_logprobs"] + logprobs_res = None + if raw_top_logprobs is not None: + top_logprobs = LogprobsLists( + logprob_token_ids=raw_top_logprobs[0], + logprobs=raw_top_logprobs[1], + sampled_token_ranks=raw_top_logprobs[2], + ) + logprobs_res = self.build_logprobs_response( + logprobs=top_logprobs, + request_top_logprobs=request.top_logprobs, + ) previous_num_tokens += len(output["token_ids"]) delta_message = DeltaMessage(content=delta_text, reasoning_content=output.get("reasoning_content"), \ @@ -208,6 +209,7 @@ class OpenAIServingChat: choice = ChatCompletionResponseStreamChoice( index=0, delta=delta_message, + logprobs=logprobs_res, arrival_time=arrival_time ) if res["finished"]: @@ -220,7 +222,7 @@ class OpenAIServingChat: choice.finish_reason = "tool_calls" else: choice.finish_reason = "length" - + if res.get("error_msg") is not None and "Recover" in res["error_msg"]: choice.finish_reason = "recover_stop" @@ -286,6 +288,7 @@ class OpenAIServingChat: final_res = None previous_num_tokens = 0 current_waiting_time = 0 + logprob_contents = [] while True: try: raw_data = await asyncio.wait_for(dealer.read(), timeout=10) @@ -310,6 +313,21 @@ class OpenAIServingChat: data, stream=False, enable_thinking=enable_thinking) # api_server_logger.debug(f"Client {request_id} received: {data}") previous_num_tokens += len(data["outputs"]["token_ids"]) + # The logprob for handling the response + output = data["outputs"] + raw_top_logprobs = output["top_logprobs"] + if raw_top_logprobs is not None: + top_logprobs = LogprobsLists( + logprob_token_ids=raw_top_logprobs[0], + logprobs=raw_top_logprobs[1], + sampled_token_ranks=raw_top_logprobs[2], + ) + logprobs_res = self.build_logprobs_response( + logprobs=top_logprobs, + 
request_top_logprobs=request.top_logprobs, + ) + if logprobs_res and logprobs_res.content is not None: + logprob_contents.extend(logprobs_res.content) if data["finished"]: final_res = data break @@ -325,10 +343,16 @@ class OpenAIServingChat: tool_calls=output.get("tool_call_content"), token_ids=output.get("token_ids") ) + logprobs_full_res = None + if logprob_contents: + logprobs_full_res = LogProbs( + content=logprob_contents + ) choice = ChatCompletionResponseChoice( index=0, message=message, + logprobs=logprobs_full_res, finish_reason=None ) if request.max_tokens is None or previous_num_tokens != request.max_tokens: @@ -338,7 +362,7 @@ class OpenAIServingChat: choice.finish_reason = "tool_calls" else: choice.finish_reason = "length" - + if final_res.get("error_msg") is not None and "Recover" in final_res["error_msg"]: choice.finish_reason = "recover_stop" choices.append(choice) @@ -359,3 +383,54 @@ class OpenAIServingChat: choices=choices, usage=usage ) + + def build_logprobs_response( + self, + logprobs: Optional[LogprobsLists], + request_top_logprobs: int, + ) -> Optional[LogProbs]: + """ + Construct a logprobs response object in line with the OpenAI style. + Retain the complete top-k candidates and avoid circular references. + """ + + # Parameter validation + if ( + logprobs is None + or request_top_logprobs is None + or request_top_logprobs <= 0 + or len(logprobs.logprob_token_ids) == 0 + ): + return None + + try: + # The top-k candidates for the current token + topk_token_ids = logprobs.logprob_token_ids[0][:request_top_logprobs + 1] + topk_logprobs = logprobs.logprobs[0][:request_top_logprobs + 1] + + # Construct the candidate token structure (LogProbEntry) of topk + top_logprob_entries: List[LogProbEntry] = [] + for tid, lp in zip(topk_token_ids, topk_logprobs): + token_str = self.engine_client.data_processor.process_logprob_response([tid], + clean_up_tokenization_spaces=False) + # token_bytes = token_str.encode("utf-8", errors="replace") + entry = LogProbEntry( + token=token_str, + logprob=lp, + # bytes=list(token_bytes) + ) + top_logprob_entries.append(entry) + # Construct the sampled token object (avoid sharing references with top_logprob_entries) + sampled_entry = LogProbEntry( + token=top_logprob_entries[0].token, + logprob=top_logprob_entries[0].logprob, + bytes=top_logprob_entries[0].bytes, + top_logprobs=top_logprob_entries[1:] # Here are the complete topk candidates + ) + + return LogProbs(content=[sampled_entry]) + + except Exception as e: + api_server_logger.error("Error in build_logprobs_response: %s", e) + api_server_logger.error(traceback.format_exc()) + return None diff --git a/fastdeploy/input/ernie_processor.py b/fastdeploy/input/ernie_processor.py index 51dbed766..fa241b95b 100644 --- a/fastdeploy/input/ernie_processor.py +++ b/fastdeploy/input/ernie_processor.py @@ -20,10 +20,9 @@ import numpy as np from paddleformers.generation import GenerationConfig from fastdeploy import envs -from fastdeploy.utils import data_processor_logger from fastdeploy.input.ernie_tokenizer import ErnieBotTokenizer - from fastdeploy.input.text_processor import BaseDataProcessor +from fastdeploy.utils import data_processor_logger _SAMPLING_EPS = 1e-5 @@ -444,3 +443,7 @@ class ErnieProcessor(BaseDataProcessor): data_processor_logger.debug( f"processed stop_seqs: {stop_seqs}, {stop_seqs_len}") return stop_seqs, stop_seqs_len + + def process_logprob_response(self, token_ids, **kwargs): + full_text = self.tokenizer.decode(token_ids, **kwargs) + return full_text diff --git 
a/fastdeploy/input/text_processor.py b/fastdeploy/input/text_processor.py index ae2dc1f29..9d30dee3e 100644 --- a/fastdeploy/input/text_processor.py +++ b/fastdeploy/input/text_processor.py @@ -309,6 +309,10 @@ class DataProcessor(BaseDataProcessor): data_processor_logger.info(f"Processed request {request}") return request + def process_logprob_response(self, token_ids, **kwargs): + full_text = self.tokenizer.decode(token_ids, **kwargs) + return full_text + def process_response(self, response_dict, **kwargs): """ Preprocess the response diff --git a/fastdeploy/model_executor/layers/sample/meta_data.py b/fastdeploy/model_executor/layers/sample/meta_data.py index 44ebff8d3..41a96ee1e 100644 --- a/fastdeploy/model_executor/layers/sample/meta_data.py +++ b/fastdeploy/model_executor/layers/sample/meta_data.py @@ -42,3 +42,4 @@ class SamplingMetadata: top_p: paddle.Tensor top_k: Optional[paddle.Tensor] = None + max_num_logprobs: Optional[int] = None diff --git a/fastdeploy/model_executor/layers/sample/sampler.py b/fastdeploy/model_executor/layers/sample/sampler.py index 652fde5f4..88450be26 100644 --- a/fastdeploy/model_executor/layers/sample/sampler.py +++ b/fastdeploy/model_executor/layers/sample/sampler.py @@ -29,6 +29,7 @@ from fastdeploy.model_executor.layers.sample.ops import ( apply_penalty_multi_scores, apply_speculative_penalty_multi_scores, top_k_top_p_sampling) from fastdeploy.platforms import current_platform +from fastdeploy.worker.output import LogprobsTensors, SamplerOutput class SamplerProcessor: @@ -188,14 +189,65 @@ class Sampler(nn.Layer): """ pre process before running """ self.processor.pre_process(skip_idx_list) + def compute_logprobs(self, logits: paddle.Tensor) -> paddle.Tensor: + """ + """ + return F.log_softmax(logits, axis=-1) + + def gather_logprobs( + self, + logprobs: paddle.Tensor, + num_logprobs: int, + token_ids: paddle.Tensor, + ) -> LogprobsTensors: + """ + Gather logprobs for topk and sampled/prompt token. + Args: + logprobs: (num tokens) x (vocab) tensor + num_logprobs: minimum number of logprobs to + retain per token + token_ids: prompt tokens (if prompt logprobs) + or sampled tokens (if sampled + logprobs); 1D token ID tensor + with (num tokens) elements + Must be int64. + Returns: + Top-k int indices tensor, (num tokens) x (num_logprobs + 1) + Top-k float logprobs tensor, (num tokens) x (num_logprobs + 1) + Sampled token rank tensor, (num tokens) + """ + assert token_ids.dtype == paddle.int64 + # Get with the logprob of the prompt or sampled token. + token_logprobs = paddle.take_along_axis(logprobs, token_ids, axis=-1) + + # Compute the ranks of the actual token. + token_ranks = (logprobs >= token_logprobs).sum(-1) + + if num_logprobs >= 1: + # Find the topK values. 
+ topk_logprobs, topk_indices = paddle.topk(logprobs, + num_logprobs, + axis=-1) + indices = paddle.concat([token_ids, topk_indices], axis=1) + top_logprobs = paddle.concat([token_logprobs, topk_logprobs], axis=1) + else: + indices = token_ids + top_logprobs = token_logprobs + + return LogprobsTensors(indices, top_logprobs, token_ranks) + def forward_cuda( self, logits: paddle.Tensor, sampling_metadata: SamplingMetadata, skip_idx_list: List[int] = [], - ) -> paddle.Tensor: + ) -> SamplerOutput: """ """ + num_logprobs = sampling_metadata.max_num_logprobs + if num_logprobs is not None: + raw_logprobs = self.compute_logprobs(logits) + logits = self.processor.apply_token_mask(logits, skip_idx_list) logits = apply_penalty_multi_scores( @@ -215,8 +267,19 @@ class Sampler(nn.Layer): _, next_tokens = top_k_top_p_sampling(probs, sampling_metadata.top_p, sampling_metadata.top_k) + logprobs_tensors = None if num_logprobs is None else \ + self.gather_logprobs(raw_logprobs, num_logprobs, token_ids=next_tokens) + self.processor.update_output_tokens(next_tokens, skip_idx_list) - return next_tokens + + sampler_output = SamplerOutput( + # The sampled tokens are expanded to 2D tensor with shape + # [num_requests, 1], where each row represents one generated + # token per request. + sampled_token_ids=next_tokens, + logprobs_tensors=logprobs_tensors, + ) + return sampler_output class SpeculativeSampler(nn.Layer): diff --git a/fastdeploy/model_executor/pre_and_post_process.py b/fastdeploy/model_executor/pre_and_post_process.py index 8f3601ff6..1b86df6b4 100644 --- a/fastdeploy/model_executor/pre_and_post_process.py +++ b/fastdeploy/model_executor/pre_and_post_process.py @@ -20,14 +20,16 @@ import paddle from fastdeploy import envs from fastdeploy.engine.config import SpeculativeConfig from fastdeploy.model_executor.ops.gpu import ( - get_padding_offset, save_output, set_stop_value_multi_ends, - speculate_clear_accept_nums, speculate_get_output_padding_offset, - speculate_get_padding_offset, speculate_get_seq_lens_output, - speculate_save_output, speculate_set_value_by_flags_and_idx, - speculate_step_paddle, speculate_step_system_cache, speculate_update_v3, - step_paddle, step_system_cache, update_inputs, step_reschedule) + get_padding_offset, save_output, save_output_topk, + set_stop_value_multi_ends, speculate_clear_accept_nums, + speculate_get_output_padding_offset, speculate_get_padding_offset, + speculate_get_seq_lens_output, speculate_save_output, + speculate_set_value_by_flags_and_idx, speculate_step_paddle, + speculate_step_system_cache, speculate_update_v3, step_paddle, + step_reschedule, step_system_cache, update_inputs) from fastdeploy.platforms import current_platform -from fastdeploy.worker.output import ModelOutputData +from fastdeploy.worker.output import (ModelOutputData, ModelRunnerOutput, + SamplerOutput) DISABLE_RECOVER = (envs.FD_DISABLED_RECOVER == "1") @@ -102,10 +104,10 @@ def pre_process( cu_seqlens_k, output_cum_offsets, output_padding_offset) -def post_process_normal(sampled_token_ids: paddle.Tensor, +def post_process_normal(sampler_output: SamplerOutput, model_output: ModelOutputData, save_each_rank: bool = False, - skip_save_output: bool = False) -> None: + skip_save_output: bool = False) -> ModelRunnerOutput: """ Post-processing steps after completing a single token generation. """ # 1. 
Set stop value paddle.assign( @@ -123,7 +125,7 @@ def post_process_normal(sampled_token_ids: paddle.Tensor, model_output.stop_flags, ) # TODO(gongshaotian): Add use_stop_seqs - set_stop_value_multi_ends(sampled_token_ids, model_output.stop_flags, + set_stop_value_multi_ends(sampler_output.sampled_token_ids, model_output.stop_flags, model_output.seq_lens_this_time, model_output.eos_token_id, model_output.next_tokens, False) # multi ends @@ -138,18 +140,28 @@ def post_process_normal(sampled_token_ids: paddle.Tensor, model_output.seq_lens_decoder, model_output.input_ids, model_output.stop_nums, - sampled_token_ids, + sampler_output.sampled_token_ids, model_output.is_block_step, ) # 3. Transmit the model's output and stop generation signal via message queue. # In the future, we will abandon this approach. if not skip_save_output: - save_output( - sampled_token_ids, - model_output.not_need_stop, - model_output.mp_rank, - save_each_rank, # save_each_rank - ) + if sampler_output.logprobs_tensors is None: + save_output( + sampler_output.sampled_token_ids, + model_output.not_need_stop, + model_output.mp_rank, + save_each_rank, # save_each_rank + ) + else: + save_output_topk( + sampler_output.sampled_token_ids, + sampler_output.logprobs_tensors.logprob_token_ids, + sampler_output.logprobs_tensors.logprobs, + sampler_output.logprobs_tensors.selected_token_ranks, + model_output.not_need_stop, + model_output.mp_rank, + ) def post_process_specualate(model_output, skip_save_output: bool = False): """""" @@ -193,7 +205,7 @@ def post_process_specualate(model_output, skip_save_output: bool = False): ) -def post_process(sampled_token_ids: paddle.Tensor, +def post_process(sampler_output: SamplerOutput, model_output: ModelOutputData, save_each_rank: bool = False, speculative_decoding: bool = False, @@ -202,7 +214,7 @@ def post_process(sampled_token_ids: paddle.Tensor, if speculative_decoding: post_process_specualate(model_output, skip_save_output) else: - post_process_normal(sampled_token_ids, model_output, save_each_rank, + post_process_normal(sampler_output, model_output, save_each_rank, skip_save_output) @@ -217,7 +229,7 @@ def step_cuda( TODO(gongshaotian): normalization name """ - + if speculative_config.method is not None: if enable_prefix_caching: speculate_step_system_cache( diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index 7e6e951fa..9185c09f5 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -30,9 +30,11 @@ from fastdeploy.inter_communicator import IPCSignal from fastdeploy.metrics.metrics import main_process_metrics from fastdeploy.platforms import current_platform from fastdeploy.utils import llm_logger, spec_logger +from fastdeploy.worker.output import LogprobsLists RECOVERY_STOP_SIGNAL = -3 MAX_BSZ = 512 +K = 20 MAX_DRAFT_TOKENS = 6 SPECULATE_MAX_BSZ = 256 @@ -62,6 +64,13 @@ class TokenProcessor(object): ], fill_value=2, dtype="int64") + elif self.cfg.enable_logprob: + self.output_tokens = paddle.full( + shape=[MAX_BSZ * (K + 1) + 2, 1], fill_value=2, dtype="int64") + self.output_scores = paddle.full( + shape=[MAX_BSZ * (K + 1), 1], fill_value=0.0, dtype="float32") + self.output_ranks = paddle.full( + shape=[MAX_BSZ], fill_value=0, dtype="int64") else: self.output_tokens = paddle.full(shape=[MAX_BSZ + 2, 1], fill_value=2, @@ -109,12 +118,51 @@ class TokenProcessor(object): assert self.resource_manager is not None, "The resource manager is None, cannot run." 
if self.worker is not None: raise Exception("Worker is already running!") + use_logprobs = ( + self.cfg.enable_logprob + and not self.speculative_decoding + and not self.cfg.parallel_config.enable_expert_parallel + ) + + target_func = ( + self.process_sampling_with_logprob_results + if use_logprobs else + self.process_sampling_results + ) + + self.worker = threading.Thread(target=target_func) - self.worker = threading.Thread(target=self.process_sampling_results, - args=()) self.worker.daemon = True self.worker.start() + def process_sampling_with_logprob_results(self): + """ + read tokens from paddle inference engine and process logprob results + """ + + if current_platform.is_cuda(): + from fastdeploy.model_executor.ops.gpu import get_output_topk + else: + raise NotImplementedError("Only CUDA platform supports logprob.") + rank_id = self.cfg.parallel_config.local_data_parallel_id + + while True: + try: + is_blocking = True + get_output_topk(self.output_tokens, self.output_scores, self.output_ranks, K, rank_id, is_blocking) + + if self.output_tokens[0, 0] == -2: + continue + llm_logger.debug( + f"rank_id {rank_id} self.output_tokens[0, 0] {self.output_tokens[0, 0]}" + f"rank_id {rank_id} self.output_scores[0, 0] {self.output_scores[0, 0]}" + ) + self._process_prefill_metrics() + self._process_sampling_with_logprob_batch_output() + except Exception as e: + llm_logger.info("while get input_data error: {0} {1}".format( + e, str(traceback.format_exc()))) + def process_sampling_results(self): """ read tokens from paddle inference engine and process @@ -245,6 +293,122 @@ class TokenProcessor(object): self.number_of_output_tokens = 0 self.total_step = 0 + def _process_sampling_with_logprob_batch_output(self): + """ + batch post-processing logprob output function + """ + + batch = self.output_tokens[1, 0] + tokens = self.output_tokens[2:batch * (K + 1) + 2].numpy().reshape( + [batch, K + 1])[:, :(K + 1)] + scores = self.output_scores[:batch * (K + 1)].numpy().reshape( + [batch, K + 1])[:, :(K + 1)] + ranks = self.output_ranks[:batch].numpy() + batch_result = list() + for i in range(batch): + if self.resource_manager.stop_flags[i]: + continue + task = self.resource_manager.tasks_list[i] + task_id = task.request_id + token_id = int(tokens[i, 0]) + token_ids = [token_id] + recovery_stop = token_id == RECOVERY_STOP_SIGNAL + if recovery_stop: + llm_logger.info( + f"recovery stop signal found at task {task_id}") + if not recovery_stop and token_id < 0: + continue + + if task.get("prefill_chunk_info", None) is not None: + prefill_chunk_num = task.get("prefill_chunk_num", 0) + task.prefill_chunk_num = prefill_chunk_num + 1 + + if task.prefill_chunk_num < len(task.prefill_chunk_info): + continue + + self.total_step += 1 + current_time = time.time() + if self.tokens_counter[task_id] == 0: + metrics = RequestMetrics( + arrival_time=task.arrival_time, + inference_start_time=task.inference_start_time, + first_token_time=time.time() - task.inference_start_time, + time_in_queue=task.schedule_start_time - + task.preprocess_end_time, + preprocess_cost_time=task.preprocess_end_time - + task.preprocess_start_time) + + self._record_first_token_metrics(task, current_time) + + else: + metrics = RequestMetrics( + arrival_time=time.time(), + request_start_time=task.arrival_time, + ) + self.number_of_output_tokens += len(token_ids) + self._record_metrics(task, current_time, token_ids) + result = RequestOutput(request_id=task_id, + outputs=CompletionOutput( + index=i, + send_idx=self.tokens_counter[task_id], + token_ids=[], 
+ logprob = None, + draft_token_ids=[], + top_logprobs=None, + ), + finished=False, + metrics=metrics) + if self.tokens_counter[task_id] == 0: + if task.messages is not None: + result.prompt = task.messages + result.num_cached_tokens = task.num_cached_tokens + + is_prefill = task.disaggregate_info is not None and task.disaggregate_info[ + "role"] == "prefill" + + if is_prefill and len(token_ids) > 1: + result.outputs.draft_token_ids = copy.deepcopy(token_ids) + + for idx, token_id in enumerate(token_ids): + self.tokens_counter[task_id] += 1 + if token_id != RECOVERY_STOP_SIGNAL: + result.outputs.token_ids.append(token_id) + result.outputs.logprob = float(scores[i, 0]) + # Construct top_logprobs + topk_token_ids = tokens[i, :].tolist() + topk_logprobs = scores[i, :].tolist() + sampled_rank = ranks[i].item() + + result.outputs.top_logprobs = LogprobsLists( + logprob_token_ids=[topk_token_ids], + logprobs=[topk_logprobs], + sampled_token_ranks=[sampled_rank] + ) + if token_id in task.eos_token_ids or is_prefill or recovery_stop: + result.finished = True + result.prompt = task.prompt + result.prompt_token_ids = task.prompt_token_ids + if recovery_stop: + result.error_msg = "Recover is not supported, the result is incomplete!" + llm_logger.info( + f"Request: {task_id} finished, number of " + f"generated tokens: {self.tokens_counter[task_id]}.") + llm_logger.info( + f"Request: {task_id} token ratio: {self.tokens_counter[task_id] / (time.time() - task.inference_start_time)}" + ) + llm_logger.info(f"{self.resource_manager.info()}") + if self.cfg.speculative_config.method: + self._compute_speculative_status() + if not is_prefill: + self._record_completion_metrics(task, current_time) + self._recycle_resources(task_id, i, task, result, + is_prefill) + break + if not is_prefill or self.cfg.scheduler_config.name == "splitwise": + batch_result.append(result) + + self.postprocess(batch_result) + def _process_batch_output(self): """ batch post-processing function diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 8322672cc..17ee152df 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -63,6 +63,7 @@ class GPUModelRunner(ModelRunnerBase): self.device_id = device_id self.speculative_method = self.fd_config.speculative_config.method self.speculative_decoding = self.speculative_method is not None + self.enable_logprob = fd_config.model_config.enable_logprob self.guided_backend = None if self.fd_config.parallel_config.guided_decoding_backend != "off": @@ -612,6 +613,7 @@ class GPUModelRunner(ModelRunnerBase): min_dec_lens=self.share_inputs["min_dec_len"], bad_words_token_ids=self.share_inputs["bad_tokens"], eos_token_ids=self.share_inputs["eos_token_id"], + max_num_logprobs=20 if self.enable_logprob else None, ) def load_model(self) -> None: @@ -816,15 +818,15 @@ class GPUModelRunner(ModelRunnerBase): self.share_inputs["step_idx"], self.share_inputs["stop_flags"], ) - sampled_token_ids = self.sampler(logits, + sampler_output = self.sampler(logits, self.sampling_metadata) if self.parallel_config.tensor_parallel_degree > 1: - paddle.distributed.broadcast(sampled_token_ids, 0) + paddle.distributed.broadcast(sampler_output.sampled_token_ids, 0) else: self.sampler(logits, self.sampling_metadata, self.parallel_config.max_model_len, self.share_inputs) - sampled_token_ids = None + sampler_output = None if self.parallel_config.tensor_parallel_degree > 1: paddle.distributed.broadcast( self.share_inputs["accept_tokens"], 0) @@ 
-864,7 +866,7 @@ class GPUModelRunner(ModelRunnerBase): accept_num=self.share_inputs["accept_num"] if self.speculative_decoding else None) - post_process(sampled_token_ids=sampled_token_ids, + post_process(sampler_output=sampler_output, model_output=model_output_data, speculative_decoding=self.speculative_decoding, skip_save_output=True) @@ -1051,18 +1053,18 @@ class GPUModelRunner(ModelRunnerBase): self.share_inputs["step_idx"], self.share_inputs["stop_flags"], ) - sampled_token_ids = self.sampler( + sampler_output = self.sampler( logits, self.sampling_metadata, skip_idx_list, ) if self.parallel_config.tensor_parallel_degree > 1: - paddle.distributed.broadcast(sampled_token_ids, 0) + paddle.distributed.broadcast(sampler_output.sampled_token_ids, 0) else: self.sampler(logits, self.sampling_metadata, self.parallel_config.max_model_len, self.share_inputs) - sampled_token_ids = None + sampler_output = None if self.parallel_config.tensor_parallel_degree > 1: paddle.distributed.broadcast( self.share_inputs["accept_tokens"], 0) @@ -1105,7 +1107,7 @@ class GPUModelRunner(ModelRunnerBase): skip_save_output = True else: skip_save_output = False - post_process(sampled_token_ids=sampled_token_ids, + post_process(sampler_output=sampler_output, model_output=model_output_data, save_each_rank=self.parallel_config.use_ep, speculative_decoding=self.speculative_decoding, diff --git a/fastdeploy/worker/output.py b/fastdeploy/worker/output.py index 7d3c1198f..ae5a6b83d 100644 --- a/fastdeploy/worker/output.py +++ b/fastdeploy/worker/output.py @@ -15,11 +15,80 @@ """ from dataclasses import dataclass -from typing import Optional +from typing import NamedTuple, Optional import paddle +class LogprobsLists(NamedTuple): + """ + """ + + # [num_reqs, max_num_logprobs + 1] + logprob_token_ids: list[list[int]] + # [num_reqs, max_num_logprobs + 1] + logprobs: list[list[float]] + # [num_reqs] + sampled_token_ranks: list[int] + + def slice(self, start: int, end: int): + """slice""" + return LogprobsLists( + self.logprob_token_ids[start:end], + self.logprobs[start:end], + self.sampled_token_ranks[start:end], + ) + + +class LogprobsTensors(NamedTuple): + """ + """ + + # [num_reqs, max_num_logprobs + 1] + logprob_token_ids: paddle.Tensor + # [num_reqs, max_num_logprobs + 1] + logprobs: paddle.Tensor + # [num_reqs] + selected_token_ranks: paddle.Tensor + + def tolists(self): + """Convert to lists.""" + return LogprobsLists( + self.logprob_token_ids.tolist(), + self.logprobs.tolist(), + self.selected_token_ranks.tolist(), + ) + + @staticmethod + def empty_cpu(num_positions: int, + num_tokens_per_position: int) -> "LogprobsTensors": + """Create empty LogprobsTensors on CPU.""" + + logprob_token_ids = paddle.empty( + [num_positions, num_tokens_per_position], + dtype=paddle.int64).cpu() + logprobs = paddle.empty_like(logprob_token_ids, dtype=paddle.float32) + selected_token_ranks = paddle.empty([num_positions], + dtype=paddle.int64).cpu() + return LogprobsTensors( + logprob_token_ids=logprob_token_ids, + logprobs=logprobs, + selected_token_ranks=selected_token_ranks, + ) + + +@dataclass +class SamplerOutput: + """ + """ + + # [num_reqs, max_num_generated_tokens] + # Different requests can have different number of generated tokens. + # All requests are padded to max_num_generated_tokens. + # PLACEHOLDER_TOKEN_ID (-1 by default) is used for padding. 
+ sampled_token_ids: paddle.Tensor + logprobs_tensors: Optional[LogprobsTensors] + @dataclass class ModelOutputData: """ diff --git a/fastdeploy/worker/vl_gpu_model_runner.py b/fastdeploy/worker/vl_gpu_model_runner.py index fb76859b9..fffe3b488 100644 --- a/fastdeploy/worker/vl_gpu_model_runner.py +++ b/fastdeploy/worker/vl_gpu_model_runner.py @@ -45,6 +45,7 @@ from fastdeploy.model_executor.models.ernie4_5_vl.modeling_resampler import ( ScatterOp, VariableResolutionResamplerModel) from fastdeploy.platforms import current_platform from fastdeploy.worker.forward_meta import ForwardMeta +from fastdeploy.worker.output import SamplerOutput from fastdeploy.worker.utils import check_safetensors_model from fastdeploy.worker.vl_model_runner_base import VLModelRunnerBase @@ -52,7 +53,7 @@ if current_platform.is_cuda() and current_platform.available(): from fastdeploy.model_executor.layers.utils import ( remove_padding, speculate_remove_padding) -from fastdeploy.model_executor.ops.gpu import (save_output, +from fastdeploy.model_executor.ops.gpu import (save_output, save_output_topk, set_stop_value_multi_ends, set_value_by_flags_and_idx, update_inputs) @@ -84,7 +85,7 @@ class GPUVLModelRunner(VLModelRunnerBase): self.mp_group = hcg.get_model_parallel_group() self.is_safetensors_model = check_safetensors_model( args.model_name_or_path) - + self.enable_logprob = args.enable_logprob model_path = os.path.dirname(args.model_name_or_path) args.llm_model_name_or_path = args.model_name_or_path if not self.is_safetensors_model: @@ -825,6 +826,7 @@ class GPUVLModelRunner(VLModelRunnerBase): min_dec_lens=self.share_inputs["min_dec_len"], bad_words_token_ids=self.share_inputs["bad_tokens"], eos_token_ids=self.share_inputs["eos_token_id"], + max_num_logprobs=20 if self.enable_logprob else None, ) def generate(self) -> None: @@ -846,17 +848,17 @@ class GPUVLModelRunner(VLModelRunnerBase): self.share_inputs["stop_flags"], ) # sampler & save_output - next_tokens = self.sampler(logits, self.sampling_metadata) + sampler_output = self.sampler(logits, self.sampling_metadata) if self.fd_config.parallel_config.tensor_parallel_degree > 1: - paddle.distributed.broadcast(next_tokens, 0) - self.post_process(next_tokens) + paddle.distributed.broadcast(sampler_output.sampled_token_ids, 0) + self.post_process(sampler_output) - def post_process(self, next_tokens: paddle.Tensor) -> None: + def post_process(self, sampler_output: SamplerOutput) -> None: """ post_process """ if self.share_inputs["enable_thinking"]: - exists_think_end = next_tokens == self.model_cfg.think_end_id + exists_think_end = sampler_output.sampled_token_ids == self.model_cfg.think_end_id paddle.assign( paddle.where( exists_think_end, @@ -872,12 +874,12 @@ class GPUVLModelRunner(VLModelRunnerBase): ), self.share_inputs["reasoning_index"]) stop_wo_think = ( - (next_tokens == self.share_inputs["eos_token_id"]) | + (sampler_output.sampled_token_ids == self.share_inputs["eos_token_id"]) | (self.share_inputs["reasoning_index"] == 0)) & ( self.share_inputs["need_think_end"] > 0) - next_tokens = paddle.where(stop_wo_think, + sampler_output.sampled_token_ids = paddle.where(stop_wo_think, self.model_cfg.think_end_id, - next_tokens) + sampler_output.sampled_token_ids) paddle.assign( paddle.where( stop_wo_think, @@ -900,7 +902,7 @@ class GPUVLModelRunner(VLModelRunnerBase): ) set_stop_value_multi_ends( - next_tokens, + sampler_output.sampled_token_ids, self.share_inputs["stop_flags"], self.share_inputs["seq_lens_this_time"], self.share_inputs["eos_token_id"], @@ -917,15 
+919,25 @@ class GPUVLModelRunner(VLModelRunnerBase): self.share_inputs["seq_lens_decoder"], self.share_inputs["input_ids"], self.share_inputs["stop_nums"], - next_tokens, + sampler_output.sampled_token_ids, self.share_inputs["is_block_step"], ) - save_output( - next_tokens, - self.share_inputs["not_need_stop"], - self.rank, - False, # use_ep - ) + if sampler_output.logprobs_tensors is None: + save_output( + sampler_output.sampled_token_ids, + self.share_inputs["not_need_stop"], + self.rank, + False, # use_ep + ) + else: + save_output_topk( + sampler_output.sampled_token_ids, + sampler_output.logprobs_tensors.logprob_token_ids, + sampler_output.logprobs_tensors.logprobs, + sampler_output.logprobs_tensors.selected_token_ranks, + self.share_inputs["not_need_stop"], + self.rank, + ) def _cal_theortical_kvcache(self): """ diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 10d01fbae..3f1ff9aa1 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -42,6 +42,8 @@ def get_worker(fd_config: FDConfig, local_rank: int, rank: int) -> WorkerBase: """ get worker of different device """ + if fd_config.model_config.enable_logprob and not current_platform.is_cuda(): + raise NotImplementedError("Only CUDA platform supports logprob.") if current_platform.is_cuda(): from fastdeploy.worker.gpu_worker import GpuWorker return GpuWorker(fd_config=fd_config, local_rank=local_rank, rank=rank) @@ -550,6 +552,9 @@ def parse_args(): "'ipc_snapshot': load from disk snapshot of IPC weights, " "'meta': provide RL traing worker, no_weights_load" "'normal':normal load weight") + parser.add_argument("--enable_logprob", + action='store_true', + help="Enable output of token-level log probabilities.") args = parser.parse_args() return args @@ -771,6 +776,8 @@ def initialize_fd_config(config) -> FDConfig: "No quantization config found and use original weight and act dtype." ) + model_config.enable_logprob = config.enable_logprob + model_config.architectures = model_config_dict.get("architectures") # Update load config diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py index 699b29fcb..c40bac70d 100644 --- a/fastdeploy/worker/xpu_model_runner.py +++ b/fastdeploy/worker/xpu_model_runner.py @@ -720,7 +720,7 @@ class XPUModelRunner(ModelRunnerBase): # 4. Compute logits, Sample logits = self.model.compute_logits(hiddden_states) - sampled_token_ids = self.sampler(logits, self.sampling_metadata) + sampler_output = self.sampler(logits, self.sampling_metadata) # 5. Speculative decode @@ -749,7 +749,7 @@ class XPUModelRunner(ModelRunnerBase): accept_tokens=None, accept_num=None, ) - xpu_post_process(sampled_token_ids=sampled_token_ids, + xpu_post_process(sampled_token_ids=sampler_output.sampled_token_ids, model_output=model_output_data) # 7. Updata 'infer_seed' and step_paddle()
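
Usage sketch (editorial note, not part of the patch): once a FastDeploy OpenAI-compatible server is launched with the new --enable-logprob flag, per-token log probabilities can be requested through the standard logprobs / top_logprobs fields that this patch adds to ChatCompletionRequest, and read back from the logprobs.content entries (token, logprob, top_logprobs) added to each response choice. The host, port, and model name below are placeholders; top_logprobs must be <= 20 and requires logprobs to be true, matching the validation added in protocol.py and sampling_params.py.

    # Hypothetical client-side example; endpoint address and model name are assumptions.
    import requests

    resp = requests.post(
        "http://localhost:8000/v1/chat/completions",  # assumed server address
        json={
            "model": "default",
            "messages": [{"role": "user", "content": "Hello"}],
            "logprobs": True,     # enable logprob output for sampled tokens
            "top_logprobs": 5,    # candidates per token, capped at 20 by the server
            "stream": False,
        },
        timeout=60,
    )
    choice = resp.json()["choices"][0]
    # Each content entry carries the sampled token, its logprob, and the top-k alternatives.
    for entry in choice["logprobs"]["content"]:
        alternatives = [(alt["token"], alt["logprob"]) for alt in entry["top_logprobs"]]
        print(entry["token"], entry["logprob"], alternatives)

For streaming requests the same fields appear on each ChatCompletionResponseStreamChoice delta, built by build_logprobs_response from the LogprobsLists tuple that the token processor reconstructs from the message queue.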