[Feature] support reward model (#5301)

* Your commit message here

* add test

* update develop

* support reward

* support enable_chunk_prefill

* support bingfa

* support convert is reward

* update test

* delete print

* fix enable_thinking

* add document

* fix place

* fix test

* fix

* support enable_prefix_caching

* add no-enable_prefix-caching test

* fix

* support enable_prefix_caching

* delete print

* fix document

* fix

* fix test

* fix document and delete chinese

* udpate

* enable_thinking

* fix test
This commit is contained in:
lizexu123
2025-12-02 14:55:31 +08:00
committed by GitHub
parent 2e1680838f
commit c563eca791
17 changed files with 636 additions and 58 deletions

View File

@@ -0,0 +1,175 @@
[简体中文](../zh/features//pooling_models.md)
# Pooling Models
FastDeploy also supports pooling models, such as embedding models.
In FastDeploy, pooling models implement the `FdModelForPooling` interface.
These models use a `Pooler` to extract the final hidden states of the input
before returning them.
## Configuration
### Model Runner
Run a model in pooling mode via the option `--runner pooling`.
!!! tip<br>
There is no need to set this option in the vast majority of cases as Fastdeploy can automatically
detect the appropriate model runner via `--runner auto`.
### Model Conversion
FastDeploy can adapt models for various pooling tasks via the option `--convert <type>`.
If `--runner pooling` has been set (manually or automatically) but the model does not implement the
`FdModelForPooling` interface,
vLLM will attempt to automatically convert the model according to the architecture names
shown in the table below.
| Architecture | `--convert` | Supported pooling tasks |
|-------------------------------------------------|-------------|---------------------------------------|
| `*ForTextEncoding`, `*EmbeddingModel`, `*Model` `*ForProcessRewardModel` | `embed` | `embed` |
!!! tip<br>
You can explicitly set `--convert <type>` to specify how to convert the model.
### Pooler Configuration
#### Predefined models
If the `Pooler` defined by the model accepts `pooler_config`,
you can override some of its attributes via the `--pooler-config` option.
#### Converted models
If the model has been converted via `--convert` (see above),
the pooler assigned to each task has the following attributes by default:
| Task | Pooling Type | Normalization | Softmax |
|------------|--------------|---------------|---------|
| `embed` | `LAST` | ✅︎ | ❌ |
When loading [Sentence Transformers](https://huggingface.co/sentence-transformers) models,
its Sentence Transformers configuration file (`modules.json`) takes priority over the model's defaults and It can also be specified during model network construction through @default_pooling_type("LAST").
##### Pooling Type
1.LastPool(PoolingType.LAST)
Purpose:Extracts the hidden state of the last token in each sequence
2.AllPool(PoolingType.ALL)
Purpose:Returns the hidden states of all tokens in each sequence
3.CLSPool(PoolingType.CLS)
Purpose:Returns the hidden state of the first token in each sequence (CLS token)
4.MeanPool(PoolingType.MEAN)
Purpose:Computes the average of all token hidden states in each sequence
## Online Serving
FastDeploy's OpenAI-compatible server provides API endpoints and custom reward interfaces.
[Embeddings API], supports text and multi-modal inputs
[Reward API], scores specific content
### Embedding Model:
```python
model_path=Qwen/Qwen3-Embedding-0.6B
python -m fastdeploy.entrypoints.openai.api_server --model ${model_path} \
--max-num-seqs 256 --max-model-len 32768 \
--port 9412 --engine-worker-queue-port 7142 \
--metrics-port 7211 --tensor-parallel-size 1 \
--gpu-memory-utilization 0.9 \
--runner pooling
```
Request Methods:
A. EmbeddingCompletionRequest Example (Standard Text Input)
```bash
curl -X POST 'YOUR_SERVICE_URL/v1/embeddings' \
-H 'Content-Type: application/json' \
-d '{
"model": "text-embedding-chat-model",
"input": [
"This is a sentence for pooling embedding.",
"Another input text."
],
"user": "test_client"
}'
```
B. EmbeddingChatRequest Example (Message Sequence Input)
```bash
curl -X POST 'YOUR_SERVICE_URL/v1/embeddings' \
-H 'Content-Type: application/json' \
-d '{
"model": "text-embedding-chat-model",
"messages": [
{"role": "user", "content": "Generate embedding for user query."}
]
}'
```
### Pooling Model and reward score
```python
model_path=RM_v1008
python -m fastdeploy.entrypoints.openai.api_server \
--model ${model_path} \
--max-num-seqs 256 \
--max-model-len 8192 \
--port 13351 \
--engine-worker-queue-port 7562 \
--metrics-port 7531 \
--tensor-parallel-size 8 \
--gpu-memory-utilization 0.9 \
--runner pooling \
--convert embed
```
Request Method: ChatRewardRequest
```bash
curl --location 'http://xxxx/v1/chat/reward' \
--header 'Content-Type: application/json' \
--data '{
"model": "",
"messages": [
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": "https://xxx/a.png"
}
}
]
},
{
"role": "assistant",
"content": [
{
"type": "text",
"text": "图里有几个人"
}
]
}
],
"user": "user-123",
"chat_template": null,
"chat_template_kwargs": {
"custom_var": "value"
},
"mm_processor_kwargs": {
"image_size": 224
}
}'
```

View File

@@ -0,0 +1,168 @@
[English](../../features/pooling_models.md)
# Pooling Models
FastDeploy也支持pooling模型例如嵌入(embedding)模型。
在FastDeploy中,池化模型通过`FdModelForPooling`接口。这些模型使用一个`Pooler`来提取输入的最终隐藏状态并返回。
## Configuration
### Model Runner
通过`--runner pooling`选项以池化模型运行模型。
!!! 提示<br>
在绝大多数情况下无需手动设置该选项因此Fastdeploy可以通过--runner auto(默认值)自动检测合适的runner。
### Model Conversion
如果模型未实现FdModelForPooling接口但你希望以池化模式运行FastDeploy可通过`--convert <type>`自动转换模型。
当设置了`--runner pooling`(手动或自动)但模型不符合接口时FastDeploy会根据模型架构名称自动转换:
| Architecture | `--convert` | 支持的池化类型 |
|-------------------------------------------------|-------------|---------------------------------------|
| `*ForTextEncoding`, `*EmbeddingModel`, `*Model` `**ForProcessRewardModel` | `embed` | `embed` |
!!! 提示<br>
你可以显示设置`--convert <type>`来制定模型转换方式。
### Pooler Configuration
#### Predefined models
如果模型定义的`Pooler`接受pooler_config你可以通过--pooler_config覆盖部分属性。
#### Converted models
如果模型通过--convert转换各任务默认的池化配置如下:
| Task | Pooling Type | Normalization | Softmax |
|------------|--------------|---------------|---------|
| `embed` | `LAST` | ✅︎ | ❌ |
加载[Sentence Transformers](https://huggingface.co/sentence-transformers)模型时,其`modules.json`配置优于默认值,也可以通过@default_pooling_type("LAST")在模型组网时指定。
#### Pooling Type
1.LastPool(PoolingType.LAST)
作用:提取每个序列的最后一个token的隐藏状态
2.AllPool(PoolingType.ALL)
作用:返回每个序列的所有token的隐藏状态
3.CLSPool(PoolingType.CLS)
作用:返回每个序列的第一个token(CLS token)的隐藏状态
4.MeanPool(PoolingType.MEAN)
作用:计算每个序列所有token隐藏状态的平均值
## Online Serving
FastDeploy的OpenAI兼容服务器提供了API的端点和自定义的reward接口
- `Embeddings API`,支持文本和多模态输入
- `Reward API`,给指定的内容打分
### Embedding模型:
```python
model_path=Qwen/Qwen3-Embedding-0.6B
python -m fastdeploy.entrypoints.openai.api_server --model ${model_path} \
--max-num-seqs 256 --max-model-len 32768 \
--port 9412 --engine-worker-queue-port 7142 \
--metrics-port 7211 --tensor-parallel-size 1 \
--gpu-memory-utilization 0.9 \
--runner pooling \
```
请求方式:<br>
A. EmbeddingCompletionRequest 示例(标准文本输入)
```bash
curl -X POST 'YOUR_SERVICE_URL/v1/embeddings' \
-H 'Content-Type: application/json' \
-d '{
"model": "text-embedding-chat-model",
"input": [
"This is a sentence for pooling embedding.",
"Another input text."
],
"user": "test_client"
}'
```
B. EmbeddingChatRequest 示例(消息序列输入)
```bash
curl -X POST 'YOUR_SERVICE_URL/v1/embeddings' \
-H 'Content-Type: application/json' \
-d '{
"model": "text-embedding-chat-model",
"messages": [
{"role": "user", "content": "Generate embedding for user query."}
]
}'
```
### Pooling模型和打分机制
```python
model_path=RM_v1008
python -m fastdeploy.entrypoints.openai.api_server \
--model ${model_path} \
--max-num-seqs 256 \
--max-model-len 8192 \
--port 13351 \
--engine-worker-queue-port 7562 \
--metrics-port 7531 \
--tensor-parallel-size 8 \
--gpu-memory-utilization 0.9 \
--runner pooling \
--convert embed \
```
请求方式: ChatRewardRequest
```bash
curl --location 'http://xxxx/v1/chat/reward' \
--header 'Content-Type: application/json' \
--data '{
"model": "",
"messages": [
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": "https://xxx/a.png"
}
}
]
},
{
"role": "assistant",
"content": [
{
"type": "text",
"text": "图里有几个人"
}
]
}
],
"user": "user-123",
"chat_template": null,
"chat_template_kwargs": {
"custom_var": "value"
},
"mm_processor_kwargs": {
"image_size": 224
}
}'
```

View File

@@ -299,7 +299,7 @@ class ModelConfig:
self.tensor_parallel_size = self.infer_model_mp_num
del self.infer_model_mp_num
if hasattr(self, "num_hidden_layers"):
if hasattr(self, "num_hidden_layers") and self.runner != "pooling":
if hasattr(self, "remove_tail_layer"):
if self.remove_tail_layer is True:
self.num_hidden_layers -= 1

View File

@@ -164,7 +164,7 @@ class PoolingParams(
self.softmax = True
elif self.task == "reward":
if self.normalize is None:
self.normalize = True
self.normalize = False
else:
raise ValueError(f"Unknown pooling task: {self.task}")

View File

@@ -187,6 +187,7 @@ class Request:
pooling_params = PoolingParams.from_dict(d["pooling_params"])
else:
sampling_params = SamplingParams.from_dict(d)
if (
isinstance(d.get("multimodal_inputs"), dict)
and isinstance(d["multimodal_inputs"].get("mm_positions"), list)
@@ -202,7 +203,6 @@ class Request:
data_processor_logger.error(
f"Convert mm_positions to ImagePosition error: {e}, {str(traceback.format_exc())}"
)
return cls(
request_id=d["request_id"],
prompt=d.get("prompt"),

View File

@@ -920,16 +920,6 @@ class EmbeddingChatRequest(BaseModel):
user: Optional[str] = None
truncate_prompt_tokens: Optional[Annotated[int, Field(ge=-1)]] = None
# --8<-- [start:chat-embedding-extra-params]
add_generation_prompt: bool = Field(
default=False,
description=(
"If true, the generation prompt will be added to the chat template. "
"This is a parameter used by chat template in tokenizer config of the "
"model."
),
)
add_special_tokens: bool = Field(
default=False,
description=(
@@ -1013,9 +1003,9 @@ PoolingChatRequest = EmbeddingChatRequest
class ChatRewardRequest(BaseModel):
model: Optional[str] = None # 指定模型,例如 "default" 或支持 embedding 的 chat 模型
messages: Union[List[Any], List[int]] # 聊天消息列表(必选)
user: Optional[str] = None # 调用方标识符
model: Optional[str] = None
messages: Union[List[Any], List[int]]
user: Optional[str] = None
dimensions: Optional[int] = None
truncate_prompt_tokens: Optional[Annotated[int, Field(ge=-1)]] = None
@@ -1084,15 +1074,15 @@ class ChatRewardRequest(BaseModel):
class ChatRewardData(BaseModel):
index: Optional[int] = None # 数据索引(可选)
object: str = "reward" # 固定为 "reward"
score: List[float] # reward 分数(浮点数列表)
index: Optional[int] = None
object: str = "reward"
score: List[float]
class ChatRewardResponse(BaseModel):
id: str # 响应 ID例如 chat-reward-<uuid>
object: str = "object" # 固定为 "object"
created: int # 创建时间Unix 时间戳)
model: str # 使用的模型名
data: List[ChatRewardData] # reward 结果列表
usage: Optional[UsageInfo] = None # Token 使用情况
id: str
object: str = "object"
created: int
model: str
data: List[ChatRewardData]
usage: Optional[UsageInfo] = None

View File

@@ -256,7 +256,6 @@ class ZmqOpenAIServing(OpenAIServing):
chat_template_kwargs.update(
{
"chat_template": request_dict.get("chat_template"),
"add_generation_prompt": request_dict.get("add_generation_prompt"),
"add_stop_sequences": request_dict.get("add_stop_sequences"),
}
)

View File

@@ -15,12 +15,14 @@
"""
from dataclasses import dataclass
from typing import Optional
from typing import Optional, Union
import paddle
from fastdeploy.engine.pooling_params import PoolingParams
Device = Union[paddle.CPUPlace, paddle.CUDAPlace, paddle.XPUPlace]
@dataclass
class PoolingCursor:
@@ -60,21 +62,21 @@ class PoolingMetadata:
pooling_cursor=None if self.pooling_cursor is None else self.pooling_cursor[indices],
)
def build_pooling_cursor(self, num_scheduled_tokens: list[int], device: str):
def build_pooling_cursor(self, num_scheduled_tokens: list[int], device: Device):
self.pooling_cursor = build_pooling_cursor(num_scheduled_tokens, self.prompt_lens, device)
def build_pooling_cursor(num_scheduled_tokens: list[int], prompt_lens: paddle.Tensor, device: str):
def build_pooling_cursor(num_scheduled_tokens: list[int], prompt_lens: paddle.Tensor, device: Device):
assert len(prompt_lens) == len(num_scheduled_tokens)
n_seq = len(num_scheduled_tokens)
index = list(range(n_seq))
num_scheduled_tokens = paddle.to_tensor(num_scheduled_tokens)
num_scheduled_tokens = paddle.to_tensor(num_scheduled_tokens, dtype="int64")
cumsum = paddle.zeros([n_seq + 1], dtype="int64")
paddle.cumsum(num_scheduled_tokens, axis=0, out=cumsum[1:])
if device == "gpu":
cumsum_device = cumsum.cuda()
if isinstance(device, paddle.CUDAPlace):
cumsum_device = paddle.assign(cumsum).cuda(device.get_device_id())
else:
cumsum_device = cumsum
return PoolingCursor(

View File

@@ -78,7 +78,6 @@ def get_pooling_params(pooling_metadata: PoolingMetadata) -> list[PoolingParams]
def get_tasks(pooling_metadata: PoolingMetadata) -> list[PoolingTask]:
pooling_params = get_pooling_params(pooling_metadata)
tasks: list[PoolingTask] = [task for pooling_param in pooling_params if (task := pooling_param.task) is not None]
assert len(pooling_params) == len(tasks)
@@ -108,7 +107,7 @@ class Pooler(nn.Layer, ABC):
@staticmethod
def for_encode(pooler_config: PoolerConfig, model_config: Optional["ModelConfig"] = None):
if pooler_config.pooling_type == "STEP":
return StepPooler()
return StepPooler(model_config)
resolved_config = ResolvedPoolingConfig(task="encode", pooling_type=PoolingType.ALL)
return SimplePooler.from_config(resolved_config, model_config)
@@ -121,6 +120,14 @@ class Pooler(nn.Layer, ABC):
)
return SimplePooler.from_config(resolved_config, model_config)
@staticmethod
def for_reward(pooler_config: PoolerConfig, model_config: Optional["ModelConfig"] = None):
resolved_config = ResolvedPoolingConfig.from_config(
task="reward",
pooler_config=pooler_config,
)
return SimplePooler.from_config(resolved_config, model_config)
@staticmethod
def for_classify(
pooler_config: PoolerConfig,
@@ -274,6 +281,7 @@ class EmbeddingPoolerHead(PoolerHead):
pooled_data = [vecs if d is None else vecs[..., :d] for vecs, d in zip(pooled_data, dimensions_list)]
# for normalize
flags = [p.normalize for p in pooling_params]
if len(set(flags)) == 1:
if flags[0]:
pooled_data = self.activation(pooled_data)
@@ -293,7 +301,6 @@ class RewardPoolerHead(PoolerHead):
def forward(self, pooled_data: Union[list[paddle.Tensor], paddle.Tensor], pooling_metadata: PoolingMetadata):
pooling_params = get_pooling_params(pooling_metadata)
# for softmax
flags = [p.softmax for p in pooling_params]
if len(set(flags)) == 1:
if flags[0]:
@@ -345,7 +352,7 @@ class PoolingMethod(nn.Layer, ABC):
class LastPool(PoolingMethod):
def get_supported_tasks(self) -> Set[PoolingTask]:
return {"encode", "embed", "classify", "score"}
return {"encode", "embed", "classify", "score", "reward"}
def forward_all(
self,
@@ -366,8 +373,8 @@ class AllPool(PoolingMethod):
) -> Union[list[paddle.Tensor], paddle.Tensor]:
assert not pooling_cursor.is_partial_prefill(), "partial prefill not supported with ALL pooling"
hidden_states_lst = list(hidden_states.split(pooling_cursor.num_scheduled_tokens_cpu.tolist()))
return [hidden_states_lst[i] for i in pooling_cursor.index]
@@ -416,11 +423,12 @@ class CLSPool(PoolingMethod):
class StepPooler(Pooler):
def __init__(
self,
model_config: ModelConfig,
) -> None:
super().__init__()
self.pooling = AllPool()
self.head = RewardPoolerHead()
self.head = RewardPoolerHead(model_config)
def extract_states(
self,
@@ -455,14 +463,11 @@ class StepPooler(Pooler):
def forward(
self,
hidden_states: Union[paddle.Tensor, list[paddle.Tensor]],
hidden_states: paddle.Tensor | list[paddle.Tensor],
pooling_metadata: PoolingMetadata,
) -> PoolerOutput:
pooled_data = self.extract_states(hidden_states, pooling_metadata)
pooling_params = get_pooling_params(pooling_metadata)
assert len(pooled_data) == len(pooling_params)
pooled_data = [self.head(d, p) for d, p in zip(pooled_data, pooling_params)]
pooled_data = self.head(pooled_data, pooling_metadata)
return pooled_data
@@ -484,7 +489,7 @@ class SimplePooler(Pooler):
pooling = PoolingMethod.from_pooling_type(pooler_config.pooling_type)
if pooler_config.task == "embed":
head = EmbeddingPoolerHead(model_config)
elif pooler_config.task == "encode":
elif pooler_config.task == "encode" or pooler_config.task == "reward":
head = RewardPoolerHead(model_config)
else:
raise NotImplementedError(f"Unknown task: {pooler_config.task}")

View File

@@ -166,6 +166,7 @@ def as_embedding_model(cls: _T) -> _T:
{
"encode": Pooler.for_encode(pooler_config, fd_config.model_config),
"embed": Pooler.for_embed(pooler_config, fd_config.model_config),
"reward": Pooler.for_reward(pooler_config, fd_config.model_config),
},
)

View File

@@ -348,12 +348,17 @@ class Ernie4_5_VLDecoderLayer(nn.Layer):
prefix=f"{prefix}.mlp",
)
norm_dtype = None
if fd_config.model_config.architectures[0] == "Ernie4_5_VLMoeForProcessRewardModel":
norm_dtype = "float32"
self.input_layernorm = RMSNorm(
fd_config,
hidden_size=fd_config.model_config.hidden_size,
eps=fd_config.model_config.rms_norm_eps,
prefix=f"{prefix}.input_layernorm",
layer_id=layer_id,
dtype=norm_dtype,
)
self.post_attention_layernorm = RMSNorm(
@@ -362,6 +367,7 @@ class Ernie4_5_VLDecoderLayer(nn.Layer):
eps=fd_config.model_config.rms_norm_eps,
prefix=f"{prefix}.post_attention_layernorm",
layer_id=layer_id,
dtype=norm_dtype,
)
def load_state_dict(self, state_dict):
@@ -542,7 +548,6 @@ class Ernie4_5_VLModel(nn.Layer):
)
out = self.norm(hidden_states, residual, forward_meta=forward_meta)[0]
return out

View File

@@ -57,6 +57,7 @@ class Ernie4_5_VLMoeRewardBaseModel(nn.Layer):
)
self.ernie = Ernie4_5_VLModel(fd_config=fd_config)
self.head_dtype = paddle.bfloat16
self.fd_config = fd_config
# Persistent buffers for CUDA graphs.
if fd_config.graph_opt_config.use_cudagraph:
@@ -111,7 +112,7 @@ class Ernie4_5_VLMoeRewardBaseModel(nn.Layer):
input_embeddings = self.get_input_embeddings(
ids_remove_padding=ids_remove_padding,
image_features=image_features,
image_token_num=vl_moe_meta.image_token_num.item(),
image_token_num=vl_moe_meta.num_image_patch_id.item(),
)
if forward_meta.step_use_cudagraph:
@@ -124,18 +125,22 @@ class Ernie4_5_VLMoeRewardBaseModel(nn.Layer):
forward_meta=forward_meta,
vl_moe_meta=vl_moe_meta,
)
if isinstance(hidden_states, tuple):
hidden_states = hidden_states[0]
hidden_states = hidden_states.to(self.head_dtype)
logits = self.rm_head(hidden_states)
return logits
return logits.cast("float32")
@ModelRegistry.register_model_class(
architecture="Ernie4_5_VLMoeForProcessRewardModel",
module_name="ernie_vl_rm",
category=[ModelCategory.REWARD],
primary_use=ModelCategory.REWARD,
category=ModelCategory.REWARD | ModelCategory.MULTIMODAL,
primary_use=ModelCategory.REWARD | ModelCategory.MULTIMODAL,
)
@default_pooling_type("ALL")
@default_pooling_type("LAST")
class Ernie4_5_VLMoeForProcessRewardModel(Ernie4_5_VLMoeRewardBaseModel):
def __init__(self, fd_config: FDConfig):
@@ -147,7 +152,13 @@ class Ernie4_5_VLMoeForProcessRewardModel(Ernie4_5_VLMoeRewardBaseModel):
pooler_config = fd_config.model_config.pooler_config
assert pooler_config is not None
self.pooler = DispatchPooler({"encode": Pooler.for_encode(pooler_config)})
self.pooler = DispatchPooler(
{
"encode": Pooler.for_encode(pooler_config, fd_config.model_config),
"embed": Pooler.for_embed(pooler_config, fd_config.model_config),
"reward": Pooler.for_reward(pooler_config, fd_config.model_config),
},
)
self.process_weights_before_loading_fn = process_weights_before_loading(skip_prefixes=["lm_head"])
@@ -159,4 +170,5 @@ class Ernie4_5_VLMoeForProcessRewardModel(Ernie4_5_VLMoeRewardBaseModel):
@paddle.no_grad()
def load_weights(self, weights_iterator):
# Filter out lm_head weights of Ernie4_5_VLMoeForConditionalGeneration
Ernie4_5_VLMoeForConditionalGeneration.load_weights(self, weights_iterator)

View File

@@ -58,7 +58,7 @@ class ModelInfo:
is_text_generation=ModelCategory.TEXT_GENERATION in category,
is_multimodal=ModelCategory.MULTIMODAL in category,
is_reasoning=ModelCategory.REASONING in category,
is_pooling=ModelCategory.EMBEDDING in category,
is_pooling=(ModelCategory.EMBEDDING in category) or (ModelCategory.REWARD in category),
default_pooling_type=get_default_pooling_type(model_cls),
module_path=module_path,
)

View File

@@ -298,6 +298,8 @@ def _build_stream_transfer_data(
stream_transfer_datas.append(stream_transfer_data)
elif pooler_outputs is not None:
for bid, pooler_output in enumerate(pooler_outputs):
if pooler_output is None:
continue
if pooler_output.dtype == paddle.bfloat16:
pooler_output = pooler_output.astype("float32")

View File

@@ -542,7 +542,11 @@ class GPUModelRunner(ModelRunnerBase):
rope_3d_position_ids["position_ids_offset"].append(
position_ids.shape[0] + rope_3d_position_ids["position_ids_offset"][-1]
)
rope_3d_position_ids["max_tokens_lst"].append(request.get("max_tokens", 2048))
if self.is_pooling_model:
rope_3d_position_ids["max_tokens_lst"].append(0)
else:
rope_3d_position_ids["max_tokens_lst"].append(request.get("max_tokens", 2048))
def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = None):
"""
@@ -2428,7 +2432,6 @@ class GPUModelRunner(ModelRunnerBase):
def _pool(self, hidden_states: paddle.Tensor, num_running_requests: int) -> Optional[ModelRunnerOutput]:
num_scheduled_tokens = int(self.share_inputs["seq_lens_this_time"][:num_running_requests].sum())
hidden_states = hidden_states[:num_scheduled_tokens]
prompt_lens = self.share_inputs["prompt_lens"][:num_running_requests]
@@ -2446,11 +2449,23 @@ class GPUModelRunner(ModelRunnerBase):
pooling_metadata.build_pooling_cursor(num_scheduled_tokens_list, device=device_str)
raw_pooler_output = self.model.pooler(hidden_states=hidden_states, pooling_metadata=pooling_metadata)
seq_lens_cpu = self.share_inputs["seq_lens_this_time"][:num_running_requests]
pooler_output: list[Optional[paddle.Tensor]] = []
for raw_output, seq_len, prompt_len in zip(raw_pooler_output, seq_lens_cpu, pooling_metadata.prompt_lens):
output = raw_output.data if int(seq_len) == int(prompt_len) else None
pooler_output.append(output)
seq_lens_decoder_batch = self.share_inputs["seq_lens_decoder"][:num_running_requests]
for i, (seq_len, prompt_len) in enumerate(zip(seq_lens_cpu, pooling_metadata.prompt_lens)):
if not self.cache_config.enable_prefix_caching:
output = raw_pooler_output[i].data if int(seq_len) == int(prompt_len) else None
pooler_output.append(output)
else:
current_seq_len_decoder = seq_lens_decoder_batch[i]
if int(current_seq_len_decoder) + int(seq_len) == int(prompt_len):
output = raw_pooler_output[i].data
else:
output = None
pooler_output.append(output)
pooler_output = PoolerOutput(
outputs=pooler_output,

View File

@@ -0,0 +1,204 @@
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import signal
import subprocess
import sys
import time
import pytest
import requests
from e2e.utils.serving_utils import (
FD_API_PORT,
FD_CACHE_QUEUE_PORT,
FD_ENGINE_QUEUE_PORT,
FD_METRICS_PORT,
clean_ports,
is_port_open,
)
# ==========================
# Shared Helper Functions
# ==========================
def _start_server_process(enable_caching: bool, log_filename: str):
print(f"\n[Server Setup] Cleaning ports before starting (Caching={'ON' if enable_caching else 'OFF'})...")
clean_ports()
base_path = os.getenv("MODEL_PATH")
if base_path:
model_path = os.path.join(base_path, "RM_v1008_5")
else:
model_path = "./RM_v1008_5"
if not os.path.exists(model_path):
raise FileNotFoundError(f"Model path not found: {model_path}")
cmd = [
sys.executable,
"-m",
"fastdeploy.entrypoints.openai.api_server",
"--model",
model_path,
"--port",
str(FD_API_PORT),
"--tensor-parallel-size",
"2",
"--engine-worker-queue-port",
str(FD_ENGINE_QUEUE_PORT),
"--metrics-port",
str(FD_METRICS_PORT),
"--cache-queue-port",
str(FD_CACHE_QUEUE_PORT),
"--max-model-len",
"8192",
"--max-num-seqs",
"256",
"--runner",
"pooling",
"--convert",
"embed",
]
if enable_caching:
cmd.append("--enable-prefix-caching")
else:
cmd.append("--no-enable-prefix-caching")
print(f"[Server Setup] Command: {' '.join(cmd)}")
with open(log_filename, "w") as logfile:
process = subprocess.Popen(
cmd,
stdout=logfile,
stderr=subprocess.STDOUT,
start_new_session=True,
)
# Wait for server to start
for _ in range(300):
if is_port_open("127.0.0.1", FD_API_PORT):
print(f"[Server Setup] Server is up on port {FD_API_PORT}")
break
time.sleep(1)
else:
print("[Server Setup] Server failed to start. Cleaning up...")
try:
os.killpg(process.pid, signal.SIGTERM)
except Exception:
pass
if os.path.exists(log_filename):
with open(log_filename, "r") as f:
print(f"Server Log Tail ({log_filename}):\n{f.read()[-500:]}")
raise RuntimeError(f"Server did not start on port {FD_API_PORT}")
return process
@pytest.fixture(scope="function")
def reward_api_url():
"""Returns the API endpoint URL for reward."""
return f"http://0.0.0.0:{FD_API_PORT}/v1/reward"
@pytest.fixture(scope="function")
def headers():
"""Returns common HTTP request headers."""
return {"Content-Type": "application/json"}
@pytest.fixture(scope="function")
def server_default_caching():
_start_server_process(enable_caching=True, log_filename="reward_server_caching_on.log")
@pytest.fixture(scope="function")
def server_no_caching():
_start_server_process(enable_caching=False, log_filename="reward_server_caching_off.log")
def save_score_baseline(score: float, baseline_file: str):
"""Save reward score to baseline file."""
baseline_data = {"score": score}
with open(baseline_file, "w", encoding="utf-8") as f:
json.dump(baseline_data, f, indent=2)
print(f"Baseline saved to: {baseline_file}")
def check_score_against_baseline(current_score: float, baseline_file: str, threshold: float = 0.01):
"""Check reward score against baseline file."""
try:
with open(baseline_file, "r", encoding="utf-8") as f:
baseline_data = json.load(f)
baseline_score = baseline_data["score"]
except FileNotFoundError:
print(f"Baseline file not found: {baseline_file}. Saving current as baseline.")
save_score_baseline(current_score, baseline_file)
return
diff = abs(current_score - baseline_score)
print(f"Score Difference: {diff:.6f} (Current: {current_score}, Baseline: {baseline_score})")
if diff >= threshold:
temp_file = f"{baseline_file}.current"
save_score_baseline(current_score, temp_file)
raise AssertionError(
f"Score differs from baseline by too much (diff={diff:.6f} >= {threshold}):\n"
f"Current score saved to: {temp_file}"
)
def _run_test_logic(reward_api_url, headers, baseline_filename):
payload = {
"model": "default",
"messages": [
{"role": "user", "content": [{"type": "text", "text": "北京天安门在哪里?"}]},
{"role": "assistant", "content": [{"type": "text", "text": "北京天安门在中国北京故宫的前面。"}]},
],
"user": "user-123",
"enable_thinking": False,
}
print(f"\n=== Sending request to {reward_api_url} ===")
response = requests.post(reward_api_url, headers=headers, json=payload, timeout=30)
assert response.status_code == 200, f"API request failed with status {response.status_code}: {response.text}"
result = response.json()
print(f"Response: {json.dumps(result, indent=2, ensure_ascii=False)}")
assert "data" in result and len(result["data"]) > 0
score = float(result["data"][0]["score"][0])
print(f"✓ Reward Score: {score}")
base_path = os.getenv("MODEL_PATH", "")
if base_path:
baseline_file = os.path.join(base_path, baseline_filename)
else:
baseline_file = baseline_filename
check_score_against_baseline(score, baseline_file, threshold=0.01)
def test_reward_model_with_caching(server_default_caching, reward_api_url, headers):
print("\n>>> Running Test: WITH Prefix Caching")
_run_test_logic(reward_api_url, headers, baseline_filename="reward_score_baseline.json")
def test_reward_model_without_caching(server_no_caching, reward_api_url, headers):
print("\n>>> Running Test: WITHOUT Prefix Caching")
_run_test_logic(reward_api_url, headers, baseline_filename="reward_score_baseline_no_caching.json")

View File

@@ -79,7 +79,7 @@ def setup_and_run_embedding_server():
model_path = "./Qwen3-Embedding-0.6B"
if not os.path.exists(model_path):
pytest.skip(f"Model path not found: {model_path}")
raise FileNotFoundError(f"Model path not found: {model_path}")
log_path = "embedding_server.log"
cmd = [