[Feature] support pooling model runner (#4590)

* support qwen3-embedding

* support qwen3-embedding-0.6b

* fix

* fix bug

* fix test_return_token_ids.py and update enable_thinking

* fix mtp dummy_run

* merge develop

* fix np.float32

* delete FD_DISABLE_CHUNKED_PREFILL and FD_USE_GET_SAVE_OUTPUT_V1

* delete and build_stream_transfer_data

* fix test_update_v1

* fix

* fix

* update dummy_run post_process

* delete test_update_v1

* fix

* fix dummy_run

* fix model_path

* fix model_path

* fix dummy_run
This commit is contained in:
lizexu123
2025-10-31 22:32:05 +08:00
committed by GitHub
parent acef624049
commit 4ac6de9a3c
7 changed files with 716 additions and 236 deletions


@@ -15,7 +15,7 @@
 """
 
 import queue
-from typing import Dict, Optional
+from typing import Dict, List, Optional, Union
 
 import numpy as np
 import paddle
@@ -85,6 +85,7 @@ else:
         speculate_limit_thinking_content_length_v2,
     )
 
+from fastdeploy.output.pooler import PoolerOutput, PoolingSequenceGroupOutput
 from fastdeploy.output.stream_transfer_data import DecoderState, StreamTransferData
 from fastdeploy.worker.output import ModelOutputData, ModelRunnerOutput, SamplerOutput
@@ -248,17 +249,31 @@ def pre_process(
     )
 
 
-def _build_stream_transfer_data(output_tokens: np.ndarray):
+def _build_stream_transfer_data(output_tokens: np.ndarray, pooler_outputs: List[PoolingSequenceGroupOutput] = None):
     """Split output_tokens and output"""
-    output_tokens = output_tokens.reshape([-1]).numpy()
-    output_tokens_lists = np.split(output_tokens, output_tokens.shape[0])
-
     stream_transfer_datas = []
-    for bid, output_token_per_sample in enumerate(output_tokens_lists):
-        stream_transfer_data = StreamTransferData(
-            decoder_state=DecoderState.TEXT, tokens=output_token_per_sample, batch_id=bid
-        )
-        stream_transfer_datas.append(stream_transfer_data)
+    if output_tokens is not None:
+        output_tokens = output_tokens.reshape([-1]).numpy()
+        output_tokens_lists = np.split(output_tokens, output_tokens.shape[0])
+        for bid, output_token_per_sample in enumerate(output_tokens_lists):
+            stream_transfer_data = StreamTransferData(
+                decoder_state=DecoderState.TEXT, tokens=output_token_per_sample, batch_id=bid
+            )
+            stream_transfer_datas.append(stream_transfer_data)
+    elif pooler_outputs is not None:
+        for bid, pooler_output in enumerate(pooler_outputs):
+            if pooler_output.dtype == paddle.bfloat16:
+                pooler_output = pooler_output.astype("float32")
+            pooler_output = pooler_output.numpy()
+            stream_transfer_data = StreamTransferData(
+                decoder_state=DecoderState.TEXT, pooler_output=pooler_output, batch_id=bid
+            )
+            stream_transfer_datas.append(stream_transfer_data)
 
     return stream_transfer_datas
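
A detail worth noting in the new pooling branch above: the cast to float32 before `.numpy()`. NumPy has no native bfloat16 dtype, so a bf16 pooled embedding must be upcast before it can be handed off as an ndarray. A minimal standalone sketch of the same guard, assuming only a working paddle install (the tensor is a stand-in for a pooled embedding):

```python
import paddle

# Stand-in for a pooled embedding produced by a bf16 model.
emb = paddle.randn([4]).astype("bfloat16")

# Same guard as in _build_stream_transfer_data: NumPy cannot
# represent bfloat16, so upcast to float32 before .numpy().
if emb.dtype == paddle.bfloat16:
    emb = emb.astype("float32")
arr = emb.numpy()
print(arr.dtype)  # float32
```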
@@ -470,7 +485,7 @@ def post_process_specualate(
 
 
 def post_process(
-    sampler_output: SamplerOutput,
+    sampler_or_pooler_output: Union[SamplerOutput, PoolerOutput],
     model_output: ModelOutputData,
     share_inputs: Dict[str, paddle.Tensor],
     block_size: int = 64,
@@ -482,28 +497,40 @@ def post_process(
     line_break_id: int = -1,
 ) -> None:
     """Post-processing steps after completing a single token generation."""
-    if speculative_decoding:
-        post_process_specualate(
-            sampler_output,
-            model_output,
-            share_inputs,
-            save_each_rank,
-            skip_save_output,
-            think_end_id,
-            line_break_id,
-        )
-    else:
-        post_process_normal(
-            sampler_output,
+    if isinstance(sampler_or_pooler_output, PoolerOutput):
+        post_process_pooling(
+            sampler_or_pooler_output,
             model_output,
             share_inputs,
             block_size,
             save_each_rank,
             skip_save_output,
             async_output_queue,
-            think_end_id,
-            line_break_id,
         )
+    else:
+        if speculative_decoding:
+            post_process_specualate(
+                sampler_or_pooler_output,
+                model_output,
+                share_inputs,
+                save_each_rank,
+                skip_save_output,
+                think_end_id,
+                line_break_id,
+            )
+        else:
+            post_process_normal(
+                sampler_or_pooler_output,
+                model_output,
+                share_inputs,
+                block_size,
+                save_each_rank,
+                skip_save_output,
+                async_output_queue,
+                think_end_id,
+                line_break_id,
+            )
 
 
 def step_cuda(
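
Stripped of arguments, the hunk above is a type-based dispatch: a `PoolerOutput` takes the new pooling path, everything else keeps the existing speculative/normal split. A runnable toy version of the control flow (stub classes here; the real ones live in `fastdeploy.output.pooler` and `fastdeploy.worker.output`):

```python
# Toy model of the dispatch added in post_process; return values name
# the branch the real code would take.
class SamplerOutput: ...
class PoolerOutput: ...

def post_process(sampler_or_pooler_output, speculative_decoding=False):
    if isinstance(sampler_or_pooler_output, PoolerOutput):
        return "pooling"       # -> post_process_pooling
    if speculative_decoding:
        return "speculative"   # -> post_process_specualate
    return "normal"            # -> post_process_normal

assert post_process(PoolerOutput()) == "pooling"
assert post_process(SamplerOutput(), speculative_decoding=True) == "speculative"
assert post_process(SamplerOutput()) == "normal"
```

Dispatching on the runtime type of the output, rather than threading a separate pooling flag through every call site, keeps the `post_process` signature change to a single parameter rename.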
@@ -787,3 +814,59 @@ def rebuild_padding(
     else:
         raise RuntimeError("Not supported platform")
     return hidden_states
+
+
+def post_process_pooling(
+    pooler_output: PoolerOutput,
+    model_output: ModelOutputData,
+    share_inputs: Dict[str, paddle.Tensor],
+    block_size: int = 64,
+    save_each_rank: bool = False,
+    skip_save_output: bool = False,
+    async_output_queue: queue.Queue = None,
+) -> None:
+    paddle.assign(
+        paddle.where(
+            model_output.stop_flags,
+            model_output.step_idx,
+            model_output.step_idx + 1,
+        ),
+        model_output.step_idx,
+    )
+    length_cond = paddle.greater_equal(model_output.step_idx, model_output.max_dec_len)
+    paddle.assign(
+        paddle.logical_or(model_output.stop_flags, length_cond),
+        model_output.stop_flags,
+    )
+
+    with paddle.framework._no_check_dy2st_diff():
+        if envs.ENABLE_V1_KVCACHE_SCHEDULER:
+            dummy_sampled_tokens = paddle.full_like(model_output.next_tokens, -1, dtype="int64")
+            paddle.assign(
+                paddle.ones_like(model_output.stop_flags, dtype="bool"),
+                model_output.stop_flags,
+            )
+            update_inputs_v1(
+                model_output.stop_flags,
+                model_output.not_need_stop,
+                model_output.seq_lens_this_time,
+                model_output.seq_lens_encoder,
+                model_output.seq_lens_decoder,
+                share_inputs["step_seq_lens_decoder"],
+                share_inputs["prompt_lens"],
+                dummy_sampled_tokens,
+                model_output.input_ids,
+                share_inputs["block_tables"],
+                model_output.stop_nums,
+                model_output.next_tokens,
+                model_output.is_block_step,
+                block_size,
+            )
+
+    if not skip_save_output:
+        if save_each_rank or model_output.mp_rank == 0:
+            output = _build_stream_transfer_data(output_tokens=None, pooler_outputs=pooler_output.outputs)
+            async_output_queue.put(output)
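
`post_process_pooling` reads as a trimmed-down `post_process_normal`: it advances `step_idx`, under the v1 KV-cache scheduler forces `stop_flags` all-True (pooling requests finish in one step), and feeds `update_inputs_v1` a `dummy_sampled_tokens` tensor of -1s because there is no sampled token to report. The embeddings then leave through the same async queue as text output. A hedged consumer-side sketch, where the `StreamTransferData` field names come from the diff but the queue wiring, and the assumption that `pooler_output` is unset on the text path, are illustrative only:

```python
import queue

def drain_pooling_outputs(async_output_queue: "queue.Queue") -> None:
    # Each queue item is the list built by _build_stream_transfer_data.
    batch = async_output_queue.get()
    for data in batch:  # data: StreamTransferData
        # Pooling path: pooler_output carries a float32 ndarray
        # (bf16 results were upcast before crossing into NumPy).
        if data.pooler_output is not None:
            print(f"batch_id {data.batch_id}: embedding shape {data.pooler_output.shape}")
```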