From 6048ea37bda4596fe234c534ce8acb324d089951 Mon Sep 17 00:00:00 2001
From: qw86972190 <127910106+qw86972190@users.noreply.github.com>
Date: Tue, 2 Dec 2025 15:32:28 +0800
Subject: [PATCH] [XPU]add enable_logprob (#5279)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* [XPU]Update document

* [XPU]Update documentation

* [XPU]add enable_logprob

* Fix code style issues

* “doc”

* “docs”

* “doc”

* Fix code style via pre-commit

---------

Co-authored-by: root
---
 .../src/ops/get_output_msg_with_topk.cc       | 106 +++++++++++++
 .../src/ops/save_output_msg_with_topk.cc      | 149 ++++++++++++++++++
 fastdeploy/engine/args_utils.py               |   4 +-
 .../xpu_pre_and_post_process.py               |  33 +++-
 fastdeploy/output/token_processor.py          |  10 +-
 fastdeploy/worker/worker_process.py           |   4 +-
 fastdeploy/worker/xpu_model_runner.py         |  23 ++-
 7 files changed, 315 insertions(+), 14 deletions(-)
 create mode 100644 custom_ops/xpu_ops/src/ops/get_output_msg_with_topk.cc
 create mode 100644 custom_ops/xpu_ops/src/ops/save_output_msg_with_topk.cc

diff --git a/custom_ops/xpu_ops/src/ops/get_output_msg_with_topk.cc b/custom_ops/xpu_ops/src/ops/get_output_msg_with_topk.cc
new file mode 100644
index 000000000..f00313e87
--- /dev/null
+++ b/custom_ops/xpu_ops/src/ops/get_output_msg_with_topk.cc
@@ -0,0 +1,106 @@
+// Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <stdio.h>
+#include <string.h>
+#include <sys/ipc.h>
+#include <sys/msg.h>
+#include <sys/types.h>
+#include "paddle/extension.h"
+
+#ifndef PD_BUILD_STATIC_OP
+#define PD_BUILD_STATIC_OP(name) PD_BUILD_OP(static_op_##name)
+#endif
+
+#define MAX_BSZ 128
+#define K 5
+
+struct msgdata {
+  long mtype;
+  int mtext[MAX_BSZ * (K + 1) + 2];  // stop_flag, bsz, tokens
+  float mtext_f[MAX_BSZ * (K + 1)];  // score
+  int mtext_ranks[MAX_BSZ];          // ranks
+};
+
+void GetOutputTopK(const paddle::Tensor& x,
+                   const paddle::Tensor& scores,
+                   const paddle::Tensor& ranks,
+                   int k,
+                   int64_t rank_id,
+                   bool wait_flag) {
+  static struct msgdata msg_rcv;
+  int msg_queue_id = 1;
+
+  if (const char* inference_msg_queue_id_env_p =
+          std::getenv("INFERENCE_MSG_QUEUE_ID")) {
+    std::string inference_msg_queue_id_env_str(inference_msg_queue_id_env_p);
+    int inference_msg_queue_id_from_env =
+        std::stoi(inference_msg_queue_id_env_str);
+#ifdef GET_OUTPUT_DEBUG
+    std::cout << "Your INFERENCE_MSG_QUEUE_ID is: "
+              << inference_msg_queue_id_from_env << std::endl;
+#endif
+    msg_queue_id = inference_msg_queue_id_from_env;
+  }
+  static key_t key = ftok("/dev/shm", msg_queue_id);
+
+  static int msgid = msgget(key, IPC_CREAT | 0666);
+#ifdef GET_OUTPUT_DEBUG
+  std::cout << "get_output_key: " << key << std::endl;
+  std::cout << "get_output msgid: " << msgid << std::endl;
+#endif
+
+  int64_t* out_data = const_cast<int64_t*>(x.data<int64_t>());
+  float* scores_data = const_cast<float*>(scores.data<float>());
+  int64_t* ranks_data = const_cast<int64_t*>(ranks.data<int64_t>());
+
+  size_t msg_len = (MAX_BSZ * (K + 1) + 2) * sizeof(int) +
+                   (MAX_BSZ * (K + 1)) * sizeof(float) + MAX_BSZ * sizeof(int);
+
+  int ret = -1;
+  if (!wait_flag) {
+    ret = msgrcv(msgid, &msg_rcv, msg_len, 0, IPC_NOWAIT);
+  } else {
+    ret = msgrcv(msgid, &msg_rcv, msg_len, 0, 0);
+  }
+
+  if (ret == -1) {
+    out_data[0] = -2;
+    out_data[1] = 0;
+    return;
+  }
+
+  int bsz = msg_rcv.mtext[1];
+  out_data[0] = (int64_t)msg_rcv.mtext[0];
+  out_data[1] = (int64_t)msg_rcv.mtext[1];
+
+  for (int i = 0; i < bsz; i++) {
+    for (int j = 0; j < k + 1; j++) {
+      const int64_t offset = i * (K + 1) + j;
+      out_data[offset + 2] = (int64_t)msg_rcv.mtext[offset + 2];
+      scores_data[offset] = msg_rcv.mtext_f[offset];
+    }
+    ranks_data[i] = (int64_t)msg_rcv.mtext_ranks[i];
+  }
+  return;
+}
+
+PD_BUILD_STATIC_OP(get_output_topk)
+    .Inputs({"x", "scores", "ranks"})
+    .Attrs({"k: int", "rank_id: int64_t", "wait_flag: bool"})
+    .Outputs({"x_out", "scores_out", "ranks_out"})
+    .SetInplaceMap({{"x", "x_out"},
+                    {"scores", "scores_out"},
+                    {"ranks", "ranks_out"}})
+    .SetKernelFn(PD_KERNEL(GetOutputTopK));
diff --git a/custom_ops/xpu_ops/src/ops/save_output_msg_with_topk.cc b/custom_ops/xpu_ops/src/ops/save_output_msg_with_topk.cc
new file mode 100644
index 000000000..596eb4763
--- /dev/null
+++ b/custom_ops/xpu_ops/src/ops/save_output_msg_with_topk.cc
@@ -0,0 +1,149 @@
+// Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <stdio.h>
+#include <string.h>
+#include <sys/ipc.h>
+#include <sys/msg.h>
+#include <sys/types.h>
+#include "paddle/extension.h"
+
+#ifndef PD_BUILD_STATIC_OP
+#define PD_BUILD_STATIC_OP(name) PD_BUILD_OP(static_op_##name)
+#endif
+
+#define MAX_BSZ 128
+#define K 5
+// #define SAVE_WITH_OUTPUT_DEBUG
+
+struct msgdata {
+  long mtype;
+  int mtext[MAX_BSZ * (K + 1) + 2];  // stop_flag, bsz, tokens
+  float mtext_f[MAX_BSZ * (K + 1)];  // score
+  int mtext_ranks[MAX_BSZ];          // ranks
+};
+
+void SaveOutMmsgTopK(const paddle::Tensor& x,
+                     const paddle::Tensor& logprob_token_ids,  // [bsz, k+1]
+                     const paddle::Tensor& logprob_scores,     // [bsz, k+1]
+                     const paddle::Tensor& ranks,
+                     const paddle::Tensor& not_need_stop,
+                     int64_t rank_id) {
+  if (rank_id > 0) {
+    return;
+  }
+  auto x_cpu = x.copy_to(paddle::CPUPlace(), false);
+  auto logprob_token_ids_cpu =
+      logprob_token_ids.copy_to(paddle::CPUPlace(), false);
+  auto logprob_scores_cpu = logprob_scores.copy_to(paddle::CPUPlace(), false);
+  auto ranks_cpu = ranks.copy_to(paddle::CPUPlace(), false);
+  int64_t* x_data = x_cpu.data<int64_t>();
+  int64_t* logprob_token_ids_data = logprob_token_ids_cpu.data<int64_t>();
+  float* logprob_scores_data = logprob_scores_cpu.data<float>();
+  int64_t* ranks_data = ranks_cpu.data<int64_t>();
+  static struct msgdata msg_sed;
+  int msg_queue_id = 1;
+  if (const char* inference_msg_queue_id_env_p =
+          std::getenv("INFERENCE_MSG_QUEUE_ID")) {
+    std::string inference_msg_queue_id_env_str(inference_msg_queue_id_env_p);
+    int inference_msg_queue_id_from_env =
+        std::stoi(inference_msg_queue_id_env_str);
+    msg_queue_id = inference_msg_queue_id_from_env;
+#ifdef SAVE_WITH_OUTPUT_DEBUG
+    std::cout << "Your INFERENCE_MSG_QUEUE_ID is: "
+              << inference_msg_queue_id_from_env << std::endl;
+#endif
+  } else {
+#ifdef SAVE_WITH_OUTPUT_DEBUG
+    std::cout << "Failed to got INFERENCE_MSG_QUEUE_ID at env, use default."
+              << std::endl;
+#endif
+  }
+  int inference_msg_id_from_env = 1;
+  if (const char* inference_msg_id_env_p = std::getenv("INFERENCE_MSG_ID")) {
+    std::string inference_msg_id_env_str(inference_msg_id_env_p);
+    inference_msg_id_from_env = std::stoi(inference_msg_id_env_str);
+    if (inference_msg_id_from_env == 2) {
+      // 2 and -2 is preserve for no-output indication.
+      throw std::runtime_error(
+          " INFERENCE_MSG_ID cannot be 2, please use other number.");
+    }
+    if (inference_msg_id_from_env < 0) {
+      throw std::runtime_error(
+          " INFERENCE_MSG_ID cannot be negative, please use other "
+          "number.");
+    }
+#ifdef SAVE_WITH_OUTPUT_DEBUG
+    std::cout << "Your INFERENCE_MSG_ID is: " << inference_msg_id_from_env
+              << std::endl;
+#endif
+  } else {
+#ifdef SAVE_WITH_OUTPUT_DEBUG
+    std::cout << "Failed to got INFERENCE_MSG_ID at env, use (int)1 as default."
+              << std::endl;
+#endif
+  }
+  static key_t key = ftok("/dev/shm", msg_queue_id);
+  static int msgid = msgget(key, IPC_CREAT | 0666);
+#ifdef SAVE_WITH_OUTPUT_DEBUG
+  std::cout << "save_output_key: " << key << std::endl;
+  std::cout << "save msgid: " << msgid << std::endl;
+#endif
+  msg_sed.mtype = 1;
+  bool not_need_stop_data = not_need_stop.data<bool>()[0];
+  msg_sed.mtext[0] = not_need_stop_data ? inference_msg_id_from_env
+                                        : -inference_msg_id_from_env;
+  int bsz = x.shape()[0];
+  int max_num_logprobs = logprob_token_ids.shape()[1];
+  msg_sed.mtext[1] = bsz;
+  for (int i = 0; i < bsz; i++) {
+    for (int j = 0; j < K + 1; j++) {
+      const int64_t offset = i * (K + 1) + j;
+      if (j == 0) {
+        msg_sed.mtext[offset + 2] = (int)x_data[i];
+        msg_sed.mtext_f[offset] = logprob_scores_data[i * max_num_logprobs + j];
+      } else if (j < max_num_logprobs) {
+        msg_sed.mtext[offset + 2] =
+            (int)logprob_token_ids_data[i * max_num_logprobs + j];
+        msg_sed.mtext_f[offset] = logprob_scores_data[i * max_num_logprobs + j];
+      } else {
+        msg_sed.mtext[offset + 2] = -1;
+        msg_sed.mtext_f[offset] = 0.0;
+      }
+    }
+    msg_sed.mtext_ranks[i] = (int)ranks_data[i];
+  }
+#ifdef SAVE_WITH_OUTPUT_DEBUG
+  std::cout << "msg data: ";
+  for (int i = 0; i < bsz; i++) {
+    std::cout << " " << (int)x_data[i];
+  }
+  std::cout << std::endl;
+#endif
+
+  size_t msg_len = (MAX_BSZ * (K + 1) + 2) * sizeof(int) +
+                   (MAX_BSZ * (K + 1)) * sizeof(float) + MAX_BSZ * sizeof(int);
+
+  if ((msgsnd(msgid, &msg_sed, msg_len, 0)) == -1) {
+    printf("full msg buffer\n");
+  }
+  return;
+}
+
+PD_BUILD_STATIC_OP(save_output_topk)
+    .Inputs({"x", "topk_ids", "logprob_scores", "ranks", "not_need_stop"})
+    .Attrs({"rank_id: int64_t"})
+    .Outputs({"x_out"})
+    .SetInplaceMap({{"x", "x_out"}})
+    .SetKernelFn(PD_KERNEL(SaveOutMmsgTopK));
diff --git a/fastdeploy/engine/args_utils.py b/fastdeploy/engine/args_utils.py
index 3fec68001..b0a50f94a 100644
--- a/fastdeploy/engine/args_utils.py
+++ b/fastdeploy/engine/args_utils.py
@@ -503,8 +503,8 @@ class EngineArgs:
         # if self.dynamic_load_weight:
         #     self.enable_prefix_caching = False
         if self.enable_logprob:
-            if not current_platform.is_cuda():
-                raise NotImplementedError("Only CUDA platform supports logprob.")
+            if not current_platform.is_cuda() and not current_platform.is_xpu():
+                raise NotImplementedError("Only CUDA and XPU platforms support logprob.")
             if self.speculative_config is not None and self.logprobs_mode.startswith("processed"):
                 raise NotImplementedError("processed_logprobs not support in speculative.")
             if self.speculative_config is not None and self.max_logprobs == -1:
diff --git a/fastdeploy/model_executor/xpu_pre_and_post_process.py b/fastdeploy/model_executor/xpu_pre_and_post_process.py
index 861b3b533..2673af276 100644
--- a/fastdeploy/model_executor/xpu_pre_and_post_process.py
+++ b/fastdeploy/model_executor/xpu_pre_and_post_process.py
@@ -20,6 +20,7 @@ import paddle
 
 from fastdeploy import envs
 from fastdeploy.model_executor.forward_meta import XPUForwardMeta
+from fastdeploy.model_executor.layers.sample.sampler import Sampler
 from fastdeploy.platforms import current_platform
 from fastdeploy.worker.output import ModelOutputData
 
@@ -32,6 +33,7 @@ if current_platform.is_xpu():
         limit_thinking_content_length_v1,
         limit_thinking_content_length_v2,
         save_output,
+        save_output_topk,
         set_stop_value_multi_ends,
         speculate_clear_accept_nums,
         speculate_get_output_padding_offset,
@@ -210,7 +212,7 @@ def xpu_process_output(
 
 
 def xpu_post_process_normal(
-    sampled_token_ids: paddle.Tensor,
+    sampler_output: Sampler,
     model_output: ModelOutputData,
     share_inputs: Dict[str, paddle.Tensor],
     block_size: int = 64,
@@ -220,6 +222,8 @@
 ) -> None:
     """ """
 
+    sampled_token_ids = sampler_output.sampled_token_ids
+
     if think_end_id > 0:
         limit_strategy = envs.FD_LIMIT_THINKING_CONTENT_TRUNCATE_STR
         max_think_lens = share_inputs["max_think_lens"]
@@ -310,12 +314,27 @@
     # 3. Transmit the model's output and stop generation signal via message queue.
     # In the future, we will abandon this approach.
     if not skip_save_output:
-        save_output(
-            sampled_token_ids,
-            model_output.not_need_stop,
-            model_output.mp_rank,
-            False,  # use_ep
-        )
+        if sampler_output.logprobs_tensors is None:
+            save_output(
+                sampled_token_ids,
+                model_output.not_need_stop,
+                model_output.mp_rank,
+                False,  # use_ep
+            )
+        else:
+            if save_output_topk is None:
+                raise ImportError(
+                    "save_output_topk operator is not available. "
+                    "Please rebuild the XPU operators with the new get_output_msg_with_topk.cc and save_output_msg_with_topk.cc files."
+                )
+            save_output_topk(
+                sampled_token_ids,
+                sampler_output.logprobs_tensors.logprob_token_ids,
+                sampler_output.logprobs_tensors.logprobs,
+                sampler_output.logprobs_tensors.selected_token_ranks,
+                model_output.not_need_stop,
+                model_output.mp_rank,
+            )
 
 
 def xpu_post_process_specualate(
diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py
index cd6ee320c..67ad1ce54 100644
--- a/fastdeploy/output/token_processor.py
+++ b/fastdeploy/output/token_processor.py
@@ -45,11 +45,16 @@ from fastdeploy.utils import llm_logger, spec_logger
 from fastdeploy.worker.output import LogprobsLists
 
 RECOVERY_STOP_SIGNAL = -3
-MAX_BSZ = 512
-K = 20
 MAX_DRAFT_TOKENS = 6
 SPECULATE_MAX_BSZ = 256
 
+if current_platform.is_xpu():
+    MAX_BSZ = 128
+    K = 5
+else:
+    MAX_BSZ = 512
+    K = 20
+
 
 class TokenProcessor:
     """
@@ -343,6 +348,7 @@ class TokenProcessor:
                 from fastdeploy.model_executor.ops.xpu import (
                     get_output,
                     get_output_ep,
+                    get_output_topk,
                     speculate_get_output,
                 )
             elif current_platform.is_iluvatar():
diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py
index 67d58e7f8..64572734b 100644
--- a/fastdeploy/worker/worker_process.py
+++ b/fastdeploy/worker/worker_process.py
@@ -69,8 +69,8 @@ def get_worker(fd_config: FDConfig, local_rank: int, rank: int) -> WorkerBase:
     """
     get worker of different device
     """
-    if fd_config.model_config.enable_logprob and not current_platform.is_cuda():
-        raise NotImplementedError("Only CUDA platform supports logprob.")
+    if fd_config.model_config.enable_logprob and not current_platform.is_cuda() and not current_platform.is_xpu():
+        raise NotImplementedError("Only CUDA and XPU platforms support logprob.")
     if current_platform.is_dcu():
         from fastdeploy.worker.dcu_worker import DcuWorker
diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py
index 64c8f8f37..c5ade2af7 100644
--- a/fastdeploy/worker/xpu_model_runner.py
+++ b/fastdeploy/worker/xpu_model_runner.py
@@ -81,6 +81,11 @@ class XPUModelRunner(ModelRunnerBase):
         self.local_rank = local_rank
         self.device_id = device_id
         self.enable_early_stop = self.fd_config.early_stop_config.enable_early_stop
+        self.enable_logprob = fd_config.model_config.enable_logprob
+        self.ori_vocab_size = self.fd_config.model_config.ori_vocab_size
+        self.max_logprobs = (
+            self.ori_vocab_size if fd_config.model_config.max_logprobs == -1 else fd_config.model_config.max_logprobs
+        )
 
         # VL model config:
         if self.enable_mm:
@@ -300,6 +305,10 @@
             self.share_inputs["penalty_score"][idx : idx + 1] = request.get("repetition_penalty", 1.0)
             self.share_inputs["frequency_score"][idx : idx + 1] = request.get("frequency_penalty", 0.0)
             self.share_inputs["presence_score"][idx : idx + 1] = request.get("presence_penalty", 0.0)
+            self.share_inputs["temp_scaled_logprobs"][idx : idx + 1] = request.get("temp_scaled_logprobs", False)
+            self.share_inputs["top_p_normalized_logprobs"][idx : idx + 1] = request.get(
+                "top_p_normalized_logprobs", False
+            )
 
             self.share_inputs["min_dec_len"][idx : idx + 1] = request.get("min_tokens", 1)
             self.share_inputs["max_dec_len"][idx : idx + 1] = request.get(
@@ -453,6 +462,12 @@
             self.share_inputs["presence_score"][idx : idx + 1] = get_attr_from_request(
                 request, "presence_penalty", 0.0
             )
+            self.share_inputs["temp_scaled_logprobs"][idx : idx + 1] = get_attr_from_request(
+                request, "temp_scaled_logprobs", False
+            )
+            self.share_inputs["top_p_normalized_logprobs"][idx : idx + 1] = get_attr_from_request(
+                request, "top_p_normalized_logprobs", False
+            )
             self.share_inputs["min_dec_len"][idx : idx + 1] = request.get("min_tokens", 1)
             self.share_inputs["max_dec_len"][idx : idx + 1] = request.get(
                 "max_tokens", self.model_config.max_model_len
@@ -547,6 +562,8 @@
         self.share_inputs["presence_score"] = paddle.full(
             [max_num_seqs, 1], self.model_config.presence_score, dtype="float32"
         )
+        self.share_inputs["temp_scaled_logprobs"] = paddle.full([max_num_seqs, 1], False, dtype="bool")
+        self.share_inputs["top_p_normalized_logprobs"] = paddle.full([max_num_seqs, 1], False, dtype="bool")
 
         self.share_inputs["min_dec_len"] = paddle.full([max_num_seqs, 1], self.model_config.min_length, dtype="int64")
         self.share_inputs["max_dec_len"] = paddle.full(
@@ -766,8 +783,12 @@
             min_dec_lens=self.share_inputs["min_dec_len"],
             bad_words_token_ids=self.share_inputs["bad_tokens"][:, :max_bad_tokens_len],
             eos_token_ids=self.share_inputs["eos_token_id"],
+            max_num_logprobs=self.max_logprobs if self.enable_logprob else None,
             enable_early_stop=self.enable_early_stop,
             stop_flags=self.share_inputs["stop_flags"],
+            temp_scaled_logprobs=self.share_inputs["temp_scaled_logprobs"],
+            top_p_normalized_logprobs=self.share_inputs["top_p_normalized_logprobs"],
+            share_inputs=self.share_inputs,
         )
 
     def load_model(self) -> None:
@@ -1137,7 +1158,7 @@
             xpu_post_process_specualate(model_output_data, False, is_dummy_run)
         else:
             xpu_post_process_normal(
-                sampled_token_ids=sampler_output.sampled_token_ids,
+                sampler_output=sampler_output,
                 model_output=model_output_data,
                 share_inputs=self.share_inputs,
                 block_size=self.cache_config.block_size,