mirror of https://github.com/PaddlePaddle/FastDeploy.git (synced 2025-10-03 15:56:49 +08:00)

Compare commits (23 commits): `fix-gpu-me`... `release/2.`
Commits (SHA1):

- bd30b08521
- 1aa16146ba
- dac0a00d0f
- c5591c45df
- 121ac85d7d
- d233e3c97c
- 2136990144
- b7890cbe8d
- bc388b65c7
- 71af0ca04a
- d66660a0d1
- f0519aec67
- 1f5983290c
- c6a133d573
- 4646aff25c
- a84a98b107
- c208086f61
- ce1d4944e7
- 5439fb6336
- a592d17615
- eca8fc7ca6
- 0463797fc2
- 0ab8645fc4
.github/workflows/ci.yml (vendored): 26 lines changed
````diff
@@ -67,9 +67,29 @@ jobs:
             gpu_id=0
             DEVICES="0,1"
           fi
-          FD_API_PORT=$((9180 + gpu_id * 100))
-          FD_ENGINE_QUEUE_PORT=$((9150 + gpu_id * 100))
-          FD_METRICS_PORT=$((9170 + gpu_id * 100))
+
+          FLASK_PORT=$((41068 + gpu_id * 100))
+          FD_API_PORT=$((41088 + gpu_id * 100))
+          FD_ENGINE_QUEUE_PORT=$((41058 + gpu_id * 100))
+          FD_METRICS_PORT=$((41078 + gpu_id * 100))
+
+          PORTS=($FLASK_PORT $FD_API_PORT $FD_ENGINE_QUEUE_PORT $FD_METRICS_PORT)
+          LOG_FILE="./port_cleanup_$(date +%Y%m%d_%H%M%S).log"
+          echo "==== LOG_FILE is ${LOG_FILE} ===="
+
+          echo "==== PORT CLEAN BEFORE TASK RUN ====" | tee -a $LOG_FILE
+
+          for port in "${PORTS[@]}"; do
+            PIDS=$(lsof -t -i :$port || true)
+            if [ -n "$PIDS" ]; then
+              echo "Port $port is occupied by PID(s): $PIDS" | tee -a $LOG_FILE
+              echo "$PIDS" | xargs -r kill -9
+              echo "Port $port cleared" | tee -a $LOG_FILE
+            else
+              echo "Port $port is free" | tee -a $LOG_FILE
+            fi
+          done
+          echo "==== PORT CLEAN COMPLETE ====" | tee -a $LOG_FILE
 
           PARENT_DIR=$(dirname "$WORKSPACE")
           echo "PARENT_DIR:$PARENT_DIR"
````
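The new scheme gives each GPU id its own non-overlapping block of ports (bases 41058 to 41088, stepped by 100 per GPU) and kills any stale processes still holding them before a run. A quick way to sanity-check the layout outside the workflow is to reproduce the arithmetic; this is a minimal sketch, not part of the CI change itself:

```python
# Sketch: reproduce the per-GPU port layout used in the workflow above.
BASES = {
    "FLASK_PORT": 41068,
    "FD_API_PORT": 41088,
    "FD_ENGINE_QUEUE_PORT": 41058,
    "FD_METRICS_PORT": 41078,
}

def ports_for_gpu(gpu_id: int) -> dict:
    """Return the four service ports assigned to a given gpu_id."""
    return {name: base + gpu_id * 100 for name, base in BASES.items()}

if __name__ == "__main__":
    for gpu_id in (0, 1):
        print(gpu_id, ports_for_gpu(gpu_id))
    # No two GPUs should share a port.
    all_ports = [p for g in (0, 1) for p in ports_for_gpu(g).values()]
    assert len(all_ports) == len(set(all_ports))
```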
````diff
@@ -8,14 +8,14 @@ Reasoning models return an additional `reasoning_content` field in their output,
 | baidu/ERNIE-4.5-VL-424B-A47B-Paddle | ernie-45-vl | ✓ |
 | baidu/ERNIE-4.5-VL-28B-A3B-Paddle | ernie-45-vl | ✓ |
 
-The reasoning model requires a specified parser to extract reasoning content. The reasoning mode can be disabled by setting the `enable_thinking=False` parameter.
+The reasoning model requires a specified parser to extract reasoning content. The reasoning mode can be disabled by setting the `"enable_thinking": false` parameter.
 
 Interfaces that support toggling the reasoning mode:
 1. `/v1/chat/completions` requests in OpenAI services.
 2. `/v1/chat/completions` requests in the OpenAI Python client.
 3. `llm.chat` requests in Offline interfaces.
 
-For reasoning models, the length of the reasoning content can be controlled via `reasoning_max_tokens`. Add `metadata={"reasoning_max_tokens": 1024}` to the request.
+For reasoning models, the length of the reasoning content can be controlled via `reasoning_max_tokens`. Add `"reasoning_max_tokens": 1024` to the request.
 
 ### Quick Start
 When launching the model service, specify the parser name using the `--reasoning-parser` argument.
@@ -43,7 +43,8 @@ curl -X POST "http://0.0.0.0:8192/v1/chat/completions" \
         {"type": "text", "text": "Which era does the cultural relic in the picture belong to"}
     ]}
   ],
-  "metadata": {"enable_thinking": true}
+  "enable_thinking": true,
+  "reasoning_max_tokens": 1024
 }'
 ```
 
````
````diff
@@ -68,7 +69,10 @@ chat_response = client.chat.completions.create(
     ],
     model="vl",
     stream=True,
-    metadata={"enable_thinking": True}
+    extra_body={
+        "enable_thinking": True,
+        "reasoning_max_tokens": 1024
+    }
 )
 for chunk in chat_response:
     if chunk.choices[0].delta is not None:
````
````diff
@@ -113,7 +113,7 @@ curl -X POST "http://0.0.0.0:8180/v1/chat/completions" \
         {"type": "text", "text": "From which era does the artifact in the image originate?"}
     ]}
   ],
-  "metadata": {"enable_thinking": false}
+  "enable_thinking": false
 }'
 ```
 
````
````diff
@@ -74,7 +74,7 @@ curl -X POST "http://0.0.0.0:8180/v1/chat/completions" \
         {"type": "text", "text": "What era does this artifact belong to?"}
     ]}
   ],
-  "metadata": {"enable_thinking": false}
+  "enable_thinking": false
 }'
 ```
 
````
````diff
@@ -96,7 +96,7 @@ response = client.chat.completions.create(
         {"type": "text", "text": "What era does this artifact belong to?"},
     ]},
   ],
-  metadata={"enable_thinking": false},
+  extra_body={"enable_thinking": false},
   stream=True,
 )
 for chunk in response:
````
````diff
@@ -88,11 +88,12 @@ The differences in request parameters between FastDeploy and the OpenAI protocol
 - `stream_options`: Optional[StreamOptions] = None
 - `temperature`: Optional[float] = None
 - `top_p`: Optional[float] = None
-- `metadata`: Optional[dict] = None (supported only in `v1/chat/completions` for configuring additional parameters, e.g., `metadata={"enable_thinking": True}`)
+- `extra_body`: Optional[dict] = None (supported only in `v1/chat/completions` for configuring additional parameters, e.g., `extra_body={"enable_thinking": True}`)
 - `min_tokens`: Optional[int] = 1 (minimum number of tokens generated)
 - `reasoning_max_tokens`: Optional[int] = None (maximum number of tokens for reasoning content, defaults to the same as `max_tokens`)
 - `enable_thinking`: Optional[bool] = True (whether to enable reasoning for models that support deep thinking)
 - `repetition_penalty`: Optional[float] = None (coefficient for directly penalizing repeated token generation; >1 penalizes repetition, <1 encourages repetition)
 - `return_token_ids`: Optional[bool] = False (whether to return token ids as a list)
 
 > Note: For multimodal models, since the reasoning chain is enabled by default, resulting in overly long outputs, `max_tokens` can be set to the model's maximum output length or the default value can be used.
@@ -102,6 +103,8 @@ The additional return fields added by FastDeploy are as follows:
 
 - `arrival_time`: Returns the cumulative time taken for all tokens
 - `reasoning_content`: The returned result of the reasoning chain
+- `prompt_token_ids`: The token id list of the prompt
+- `completion_token_ids`: The token id list of the completion
 
 Overview of return parameters:
 
````
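Taken together, these documentation changes move `enable_thinking` and `reasoning_max_tokens` out of `metadata` and into the request body (passed via `extra_body` in the OpenAI Python client), and document the new `prompt_token_ids` / `completion_token_ids` return fields. A minimal non-streaming sketch of the documented usage; the host, port, and model name are placeholders for an actual deployment:

```python
from openai import OpenAI

# Placeholder endpoint and model name; adjust to your deployment.
client = OpenAI(api_key="null", base_url="http://0.0.0.0:8192/v1")

resp = client.chat.completions.create(
    model="vl",
    messages=[{"role": "user", "content": "Which era does the cultural relic in the picture belong to"}],
    extra_body={
        "enable_thinking": True,       # toggle the reasoning mode
        "reasoning_max_tokens": 1024,  # cap the length of the reasoning chain
        "return_token_ids": True,      # request prompt/completion token id lists
    },
)

message = resp.choices[0].message
print(message.content)
# The field below is a FastDeploy extension and may be absent on other servers.
print(getattr(message, "reasoning_content", None))
```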
````diff
@@ -112,7 +115,7 @@ ChatCompletionStreamResponse:
     created: int = Field(default_factory=lambda: int(time.time()))
     model: str
     choices: List[ChatCompletionResponseStreamChoice]
-ChatCompletionResponseStreamChoice:
+ChatCompletionResponseStreamChoice:
     index: int
     delta: DeltaMessage
     finish_reason: Optional[Literal["stop", "length"]] = None
@@ -120,6 +123,7 @@ ChatCompletionStreamResponse:
 DeltaMessage:
     role: Optional[str] = None
     content: Optional[str] = None
     token_ids: Optional[List[int]] = None
     prompt_token_ids: Optional[List[int]] = None
     completion_token_ids: Optional[List[int]] = None
     reasoning_content: Optional[str] = None
 ```
 
````
````diff
@@ -8,18 +8,18 @@
 | baidu/ERNIE-4.5-VL-424B-A47B-Paddle | ernie-45-vl | ✓ |
 | baidu/ERNIE-4.5-VL-28B-A3B-Paddle | ernie-45-vl | ✓ |
 
-The reasoning model requires a parser to be specified so that the reasoning content can be extracted. The reasoning mode can be disabled via the `enable_thinking=False` parameter.
+The reasoning model requires a parser to be specified so that the reasoning content can be extracted. The reasoning mode can be disabled via the `"enable_thinking": false` parameter.
 
 Interfaces that support toggling the reasoning mode:
 1. `/v1/chat/completions` requests in the OpenAI service.
 2. `/v1/chat/completions` requests in the OpenAI Python client.
 3. `llm.chat` requests in the Offline interface.
 
-For reasoning models, the length of the reasoning content can be controlled via `reasoning_max_tokens`; add `metadata={"reasoning_max_tokens": 1024}` to the request.
+For reasoning models, the length of the reasoning content can be controlled via `reasoning_max_tokens`; add `"reasoning_max_tokens": 1024` to the request.
 
 ## Quick Start
-When launching the model service, specify the parser name via the`--reasoning-parser`argument.
-The parser parses the reasoning model's output and extracts the`reasoning_content`field.
+When launching the model service, specify the parser name via the `--reasoning-parser` argument.
+The parser parses the reasoning model's output and extracts the `reasoning_content` field.
 
 ```bash
 python -m fastdeploy.entrypoints.openai.api_server \
````
````diff
@@ -43,15 +43,16 @@ curl -X POST "http://0.0.0.0:8192/v1/chat/completions" \
         {"type": "text", "text": "图中的文物属于哪个年代"}
     ]}
   ],
-  "metadata": {"enable_thinking": true}
+  "enable_thinking": true,
+  "reasoning_max_tokens": 1024
 }'
 
 ```
 
-The`reasoning_content`field contains the reasoning steps that lead to the final conclusion, while the`content`field contains the final conclusion.
+The `reasoning_content` field contains the reasoning steps that lead to the final conclusion, while the `content` field contains the final conclusion.
 
 ### Streaming Sessions
-In streaming sessions, the`reasoning_content`field can be retrieved from the `delta` in the`chat completion response chunks`
+In streaming sessions, the `reasoning_content` field can be retrieved from the `delta` in the `chat completion response chunks`
 
 ```python
 from openai import OpenAI
````
````diff
@@ -69,7 +70,10 @@ chat_response = client.chat.completions.create(
     ],
     model="vl",
     stream=True,
-    metadata={"enable_thinking": True}
+    extra_body={
+        "enable_thinking": True,
+        "reasoning_max_tokens": 1024
+    }
 )
 for chunk in chat_response:
     if chunk.choices[0].delta is not None:
````
````diff
@@ -110,7 +110,7 @@ curl -X POST "http://0.0.0.0:8180/v1/chat/completions" \
         {"type": "text", "text": "图中的文物属于哪个年代"}
     ]}
   ],
-  "metadata": {"enable_thinking": false}
+  "enable_thinking": false
 }'
 ```
 
````
````diff
@@ -73,7 +73,7 @@ curl -X POST "http://0.0.0.0:8180/v1/chat/completions" \
         {"type": "text", "text": "图中的文物属于哪个年代"}
     ]}
   ],
-  "metadata": {"enable_thinking": false}
+  "enable_thinking": false
 }'
 ```
 
````
````diff
@@ -93,7 +93,7 @@ response = client.chat.completions.create(
         {"type": "text", "text": "图中的文物属于哪个年代?"},
     ]},
   ],
-  metadata={"enable_thinking": false},
+  extra_body={"enable_thinking": false},
   stream=True,
 )
 for chunk in response:
````
````diff
@@ -87,11 +87,12 @@ The differences in request parameters between FastDeploy and the OpenAI protocol are as follows; the remaining request parameters will
 - `stream_options`: Optional[StreamOptions] = None
 - `temperature`: Optional[float] = None
 - `top_p`: Optional[float] = None
-- `metadata`: Optional[dict] = None (supported only in v1/chat/completions, used to configure additional parameters, e.g. metadata={"enable_thinking": True})
+- `extra_body`: Optional[dict] = None (supported only in v1/chat/completions, used to configure additional parameters, e.g. `extra_body={"enable_thinking": True}`)
 - `min_tokens`: Optional[int] = 1 (minimum number of generated tokens)
 - `reasoning_max_tokens`: Optional[int] = None (maximum number of tokens for the reasoning content; defaults to the same value as max_tokens)
 - `enable_thinking`: Optional[bool] = True (whether models that support deep thinking enable the reasoning mode)
 - `repetition_penalty`: Optional[float] = None (coefficient that directly penalizes repeated token generation; >1 penalizes repetition, <1 encourages repetition)
 - `return_token_ids`: Optional[bool] = False (whether to return the token id list)
 
 > Note: For multimodal models, the reasoning chain is enabled by default, which can make outputs very long; `max_tokens` can be set to the model's maximum output length, or the default value can be used.
@@ -101,6 +102,8 @@ The additional return fields added by FastDeploy are as follows:
 
 - `arrival_time`: cumulative time taken for all returned tokens
 - `reasoning_content`: the returned result of the reasoning chain
+- `prompt_token_ids`: token id list of the input sequence
+- `completion_token_ids`: token id list of the output sequence
 
 Overview of return parameters:
 
````
````diff
@@ -111,7 +114,7 @@ ChatCompletionStreamResponse:
     created: int = Field(default_factory=lambda: int(time.time()))
     model: str
     choices: List[ChatCompletionResponseStreamChoice]
-ChatCompletionResponseStreamChoice:
+ChatCompletionResponseStreamChoice:
     index: int
     delta: DeltaMessage
     finish_reason: Optional[Literal["stop", "length"]] = None
@@ -119,6 +122,7 @@ ChatCompletionStreamResponse:
 DeltaMessage:
     role: Optional[str] = None
     content: Optional[str] = None
     token_ids: Optional[List[int]] = None
     prompt_token_ids: Optional[List[int]] = None
     completion_token_ids: Optional[List[int]] = None
     reasoning_content: Optional[str] = None
 ```
 
````
````diff
@@ -24,6 +24,11 @@ os.environ["GLOG_minloglevel"] = "2"
 os.environ["AISTUDIO_LOG"] = "critical"
 from fastdeploy.engine.sampling_params import SamplingParams
 from fastdeploy.entrypoints.llm import LLM
+from fastdeploy.utils import envs
+from paddleformers.utils.log import logger as pf_logger
+if envs.FD_DEBUG != "1":
+    import logging
+    pf_logger.logger.setLevel(logging.INFO)
 
 __all__ = ["LLM", "SamplingParams"]
 
````
````diff
@@ -64,7 +64,10 @@ class PrefixCacheManager:
         self.speculative_config = config.speculative_config
         self.local_data_parallel_id = local_data_parallel_id
 
-        self.num_gpu_blocks = self.cache_config.prefill_kvcache_block_num
+        if envs.ENABLE_V1_KVCACHE_SCHEDULER:
+            self.num_gpu_blocks = self.cache_config.total_block_num
+        else:
+            self.num_gpu_blocks = self.cache_config.prefill_kvcache_block_num
         self.num_cpu_blocks = self.cache_config.num_cpu_blocks
         self.gpu_free_block_list = list(range(self.num_gpu_blocks - 1, -1, -1))
         if self.num_cpu_blocks > 0:
````
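The cache manager now sizes its GPU block pool differently depending on the scheduler: under the v1 KV-cache scheduler it manages the full `total_block_num`, otherwise only the prefill share. A condensed sketch of just that selection (the config object here is a stand-in, not the real `CacheConfig`):

```python
def select_num_gpu_blocks(cache_config, enable_v1_kvcache_scheduler: bool) -> int:
    """Mirror the branch above: full block pool for the v1 scheduler,
    prefill-only share for the legacy path."""
    if enable_v1_kvcache_scheduler:
        return cache_config.total_block_num
    return cache_config.prefill_kvcache_block_num

# The free list is then built highest-index-first, as in the diff:
# gpu_free_block_list = list(range(num_gpu_blocks - 1, -1, -1))
```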
````diff
@@ -108,6 +108,7 @@ class ModelConfig:
         self.enable_mm = False
         self.enable_redundant_experts = False
         self.redundant_experts_num = 0
+        self.lm_head_fp32: bool = False
 
         for key, value in args.items():
             if hasattr(self, key):
@@ -132,9 +133,7 @@ class ModelConfig:
         if hasattr(self, "vision_config"):
             self.vision_config = PretrainedConfig.from_dict(self.vision_config)
 
-        self.ori_vocab_size = self.vocab_size
         if ErnieArchitectures.contains_ernie_arch(self.architectures):
-            self.ori_vocab_size = args.get("ori_vocab_size", self.ori_vocab_size)
+            self.ori_vocab_size = args.get("ori_vocab_size", self.vocab_size)
 
 
 class ParallelConfig:
````
````diff
@@ -15,6 +15,7 @@
 """
 
 import json
+import os
 from dataclasses import asdict, dataclass
 from dataclasses import fields as dataclass_fields
 from typing import Any, Dict, List, Optional
@@ -314,6 +315,11 @@ class EngineArgs:
     Must be explicitly enabled via the `--enable-logprob` startup parameter to output logprob values.
     """
 
+    lm_head_fp32: bool = None
+    """
+    Flag to specify the data type of lm_head as FP32.
+    """
+
     def __post_init__(self):
         """
         Post-initialization processing to set default tokenizer if not provided.
@@ -465,6 +471,12 @@ class EngineArgs:
             default=EngineArgs.enable_logprob,
             help="Enable output of token-level log probabilities.",
         )
+        model_group.add_argument(
+            "--lm_head-fp32",
+            action="store_true",
+            default=EngineArgs.lm_head_fp32,
+            help="Specify the dtype of lm_head weight as float32.",
+        )
 
         # Parallel processing parameters group
         parallel_group = parser.add_argument_group("Parallel Configuration")
@@ -768,6 +780,7 @@ class EngineArgs:
             quantization=self.quantization,
             dynamic_load_weight=self.dynamic_load_weight,
             load_strategy=self.load_strategy,
+            lm_head_fp32=self.lm_head_fp32,
         )
 
     def create_cache_config(self, model_cfg) -> CacheConfig:
@@ -854,7 +867,10 @@ class EngineArgs:
         if self.enable_chunked_prefill:
             self.max_num_batched_tokens = 2048
         else:
-            self.max_num_batched_tokens = self.max_model_len
+            if not int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "0")):
+                self.max_num_batched_tokens = self.max_model_len
+            else:
+                self.max_num_batched_tokens = 8192
         scheduler_cfg = self.create_scheduler_config()
         speculative_cfg = self.create_speculative_config()
         graph_opt_cfg = self.create_graph_optimization_config()
````
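The new `--lm_head-fp32` switch follows the same pattern as `--enable-logprob`: a `store_true` argument whose default comes from the dataclass field, so the attribute stays `None` unless the flag is passed explicitly. A self-contained sketch of that argparse pattern (not FastDeploy's actual parser):

```python
import argparse
from dataclasses import dataclass

@dataclass
class Args:
    lm_head_fp32: bool = None  # None means "not specified"; the flag flips it to True

parser = argparse.ArgumentParser()
parser.add_argument(
    "--lm_head-fp32",
    action="store_true",
    default=Args.lm_head_fp32,
    help="Specify the dtype of lm_head weight as float32.",
)

print(parser.parse_args([]).lm_head_fp32)                  # None (flag absent)
print(parser.parse_args(["--lm_head-fp32"]).lm_head_fp32)  # True
```

Keeping the default as `None` rather than `False` appears to be deliberate: downstream code can distinguish "not specified" from "explicitly disabled".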
````diff
@@ -51,6 +51,7 @@ class ModelConfig:
         load_strategy: str = "ipc_snapshot",
         quantization: str = None,
         download_dir: Optional[str] = None,
+        lm_head_fp32: bool = False,
     ):
         """
         Initialize the ModelConfig class.
@@ -65,6 +66,7 @@ class ModelConfig:
         self.dynamic_load_weight = dynamic_load_weight
         self.load_strategy = load_strategy
         self.quantization = quantization
+        self.lm_head_fp32 = lm_head_fp32
 
         config_file = os.path.join(model_name_or_path, config_json_file)
         if os.path.isfile(model_name_or_path):
@@ -211,6 +213,8 @@ class CacheConfig:
         self.gpu_memory_utilization = gpu_memory_utilization
         self.num_gpu_blocks_override = num_gpu_blocks_override
         self.kv_cache_ratio = kv_cache_ratio
+        if envs.ENABLE_V1_KVCACHE_SCHEDULER:
+            self.kv_cache_ratio = 1.0
         self.enc_dec_block_num = enc_dec_block_num
         self.prealloc_dec_block_slot_num_threshold = prealloc_dec_block_slot_num_threshold
         self.cache_dtype = cache_dtype
@@ -291,7 +295,10 @@ class CacheConfig:
         self.dec_token_num = self.enc_dec_block_num * self.block_size
         if self.num_gpu_blocks_override is not None:
             self.total_block_num = self.num_gpu_blocks_override
-            self.prefill_kvcache_block_num = int(self.total_block_num * self.kv_cache_ratio)
+            if envs.ENABLE_V1_KVCACHE_SCHEDULER:
+                self.prefill_kvcache_block_num = self.total_block_num
+            else:
+                self.prefill_kvcache_block_num = int(self.total_block_num * self.kv_cache_ratio)
         else:
             length = num_total_tokens // number_of_tasks
             block_num = (length + self.block_size - 1 + self.dec_token_num) // self.block_size
@@ -304,7 +311,10 @@ class CacheConfig:
         reset gpu block number
         """
         self.total_block_num = num_gpu_blocks
-        self.prefill_kvcache_block_num = int(self.total_block_num * self.kv_cache_ratio)
+        if envs.ENABLE_V1_KVCACHE_SCHEDULER:
+            self.prefill_kvcache_block_num = self.total_block_num
+        else:
+            self.prefill_kvcache_block_num = int(self.total_block_num * self.kv_cache_ratio)
         llm_logger.info(
             f"Reset block num, the total_block_num:{self.total_block_num},"
             f" prefill_kvcache_block_num:{self.prefill_kvcache_block_num}"
@@ -796,7 +806,10 @@ class Config:
         if self.cache_config.enable_chunked_prefill:
             self.max_num_batched_tokens = 2048
         else:
-            self.max_num_batched_tokens = self.max_model_len
+            if not int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "0")):
+                self.max_num_batched_tokens = self.max_model_len
+            else:
+                self.max_num_batched_tokens = 8192
 
         if self.long_prefill_token_threshold == 0:
             self.long_prefill_token_threshold = int(self.max_model_len * 0.04)
@@ -844,10 +857,11 @@ class Config:
         )
 
         if not self.cache_config.enable_chunked_prefill:
-            assert self.max_num_batched_tokens >= self.max_model_len, (
-                f"max_num_batched_tokens: {self.max_num_batched_tokens} "
-                f"should be larger than or equal to max_model_len: {self.max_model_len}"
-            )
+            if not int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "0")):
+                assert self.max_num_batched_tokens >= self.max_model_len, (
+                    f"max_num_batched_tokens: {self.max_num_batched_tokens} "
+                    f"should be larger than or equal to max_model_len: {self.max_model_len}"
+                )
         else:
             assert self.max_num_batched_tokens >= self.cache_config.block_size, (
                 f"max_num_batched_tokens: {self.max_num_batched_tokens} "
````
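Several of the changes above hinge on the same environment switch: when `ENABLE_V1_KVCACHE_SCHEDULER` is on, `kv_cache_ratio` is forced to 1.0, the prefill block budget becomes the whole pool, and non-chunked prefill defaults `max_num_batched_tokens` to 8192 instead of `max_model_len`. A compact sketch of the combined behaviour, as a simplification of the config code above rather than a drop-in replacement:

```python
import os

def resolve_defaults(total_block_num: int, kv_cache_ratio: float,
                     max_model_len: int, enable_chunked_prefill: bool):
    """Summarize how the v1 KV-cache scheduler flag changes derived defaults."""
    v1 = bool(int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "0")))
    if v1:
        kv_cache_ratio = 1.0
        prefill_kvcache_block_num = total_block_num
    else:
        prefill_kvcache_block_num = int(total_block_num * kv_cache_ratio)

    if enable_chunked_prefill:
        max_num_batched_tokens = 2048
    else:
        # Legacy behaviour keeps max_model_len; the v1 scheduler uses 8192.
        max_num_batched_tokens = 8192 if v1 else max_model_len
    return kv_cache_ratio, prefill_kvcache_block_num, max_num_batched_tokens
```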
````diff
@@ -499,6 +499,7 @@ class LLMEngine:
         enable_thinking = kwargs.get("enable_thinking", None)
         request = self.data_processor.process_request(request, self.cfg.max_model_len, enable_thinking=enable_thinking)
         request.prompt_token_ids_len = len(request.prompt_token_ids)
+        request.need_prefill_tokens = request.prompt_token_ids_len
         input_ids_len = request.prompt_token_ids_len
         request.set(
             "max_tokens",
@@ -1098,6 +1099,7 @@ class LLMEngine:
             "enable_custom_all_reduce": self.cfg.parallel_config.enable_custom_all_reduce,
             "enable_logprob": self.cfg.enable_logprob,
             "enable_mm": self.cfg.enable_mm,
+            "lm_head_fp32": self.cfg.model_config.lm_head_fp32,
         }
         for worker_flag, value in worker_append_flag.items():
             if value:
````
````diff
@@ -60,6 +60,7 @@ class Request:
         preprocess_end_time: Optional[float] = None,
         multimodal_inputs: Optional[dict] = None,
         multimodal_data: Optional[dict] = None,
+        disable_chat_template: bool = False,
         disaggregate_info: Optional[dict] = None,
         draft_token_ids: Optional[list[int]] = None,
         guided_json: Optional[Any] = None,
@@ -87,6 +88,7 @@ class Request:
         self.arrival_time = arrival_time
         self.preprocess_start_time = preprocess_start_time
         self.preprocess_end_time = preprocess_end_time
+        self.disable_chat_template = disable_chat_template
         self.disaggregate_info = disaggregate_info
 
         # speculative method in disaggregate-mode
@@ -115,6 +117,7 @@ class Request:
         self.status = RequestStatus.WAITING
         self.task_type = RequestType.PREFILL
         self.idx = None
+        self.need_prefill_tokens = self.prompt_token_ids_len
 
     @classmethod
     def from_dict(cls, d: dict):
@@ -136,6 +139,7 @@ class Request:
             preprocess_end_time=d.get("preprocess_end_time"),
             multimodal_inputs=d.get("multimodal_inputs"),
             multimodal_data=d.get("multimodal_data"),
+            disable_chat_template=d.get("disable_chat_template"),
             disaggregate_info=d.get("disaggregate_info"),
             draft_token_ids=d.get("draft_token_ids"),
             guided_json=d.get("guided_json", None),
@@ -180,6 +184,7 @@ class Request:
             "preprocess_end_time": self.preprocess_end_time,
             "multimodal_inputs": self.multimodal_inputs,
             "multimodal_data": self.multimodal_data,
+            "disable_chat_template": self.disable_chat_template,
             "disaggregate_info": self.disaggregate_info,
             "draft_token_ids": self.draft_token_ids,
             "enable_thinking": self.enable_thinking,
````
````diff
@@ -95,6 +95,9 @@ class SamplingParams:
     reasoning_max_tokens: Optional[int] = None
     min_tokens: int = 1
     logprobs: Optional[int] = None
+    # For logits and logprobs post processing
+    temp_scaled_logprobs: bool = False
+    top_p_normalized_logprobs: bool = False
     bad_words: Optional[List[str]] = None
 
     @classmethod
````
````diff
@@ -56,6 +56,7 @@ class ResourceManagerV1(ResourceManager):
         self.running: list[Request] = []
         self.finish_execution_pool = ThreadPoolExecutor(max_workers=1)
         self.lock = threading.Lock()
+        self.to_be_rescheduled_request_id_set = set()
 
     def allocated_slots(self, request: Request):
         return len(request.block_tables) * self.config.cache_config.block_size
@@ -76,6 +77,13 @@ class ResourceManagerV1(ResourceManager):
 
     def _prepare_preempt_task(self, request):
         return ScheduledPreemptTask(idx=request.idx, request_id=request.request_id)
+
+    def reschedule_preempt_task(self, request_id):
+        with self.lock:
+            if request_id in self.to_be_rescheduled_request_id_set and request_id in self.requests:
+                request = self.requests[request_id]
+                self.waiting.appendleft(request)
+                self.to_be_rescheduled_request_id_set.remove(request_id)
 
     def _trigger_preempt(self, request, num_new_blocks, preempted_reqs, scheduled_reqs):
         can_schedule = True
@@ -85,7 +93,7 @@ class ResourceManagerV1(ResourceManager):
                 preempted_req.status = RequestStatus.PREEMPTED
                 preempted_req.num_computed_tokens = 0
                 self._free_blocks(preempted_req)
-                self.waiting.appendleft(preempted_req)
+                self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)
                 preempted_reqs.append(preempted_req)
                 scheduled_reqs.append(self._prepare_preempt_task(preempted_req))
                 if preempted_req == request:
````
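With the new `to_be_rescheduled_request_id_set`, a preempted request is no longer pushed straight back onto `waiting`; it is parked in the set and only re-queued when `reschedule_preempt_task` is called for it. A stripped-down sketch of that lifecycle, standard library only and ignoring block accounting:

```python
from collections import deque
from threading import Lock

class PreemptTracker:
    """Toy model of the preempt/reschedule handshake introduced above."""

    def __init__(self):
        self.waiting = deque()
        self.requests = {}              # request_id -> request object
        self.to_be_rescheduled = set()
        self.lock = Lock()

    def preempt(self, request_id):
        # Instead of waiting.appendleft(...), remember that it must be rescheduled later.
        self.to_be_rescheduled.add(request_id)

    def reschedule_preempt_task(self, request_id):
        with self.lock:
            if request_id in self.to_be_rescheduled and request_id in self.requests:
                self.waiting.appendleft(self.requests[request_id])
                self.to_be_rescheduled.remove(request_id)
```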
````diff
@@ -98,6 +106,87 @@ class ResourceManagerV1(ResourceManager):
                 break
         return can_schedule
 
+    def _get_num_new_tokens(self, request, token_budget):
+        num_new_tokens = request.need_prefill_tokens - request.num_computed_tokens
+        num_new_tokens = min(num_new_tokens, token_budget)
+
+        if not self.config.enable_mm:
+            return num_new_tokens
+
+        inputs = request.multimodal_inputs
+        request.with_image = False
+        # Compatible with scenarios without images and videos.
+        if inputs["images"] is None:
+            return num_new_tokens
+
+        input_ids_lst = request.prompt_token_ids + request.output_token_ids
+        input_ids = paddle.to_tensor(input_ids_lst, dtype="int64")
+        grid_thw = []
+        for one in inputs["grid_thw"]:
+            if one[0] == 1:
+                grid_thw.append(one)
+            else:
+                grid_thw.extend([[2, one[1], one[2]]] * (one[0] // 2))
+
+        image_patch_id = inputs["image_patch_id"]
+        grid_thw = paddle.to_tensor(grid_thw, dtype="int64")
+        if request.multimodal_img_boundaries is None:
+            from fastdeploy.model_executor.ops.gpu import get_img_boundaries
+
+            request.multimodal_img_boundaries = get_img_boundaries(
+                task_input_ids=input_ids, grid_thw=grid_thw, image_patch_id=image_patch_id
+            ).numpy()
+
+        img_boundaries_idx = request.multimodal_img_boundaries[0]
+        img_num_per_boundary = request.multimodal_img_boundaries[1]
+        ori_prompt_len = img_boundaries_idx[-1].item()
+        grid_thw = grid_thw.numpy().reshape([-1, 3])
+        pre_end_idx = request.num_computed_tokens
+        new_end_idx = pre_end_idx + num_new_tokens
+        if new_end_idx < ori_prompt_len and input_ids[new_end_idx - 1] == image_patch_id:
+            boundary_idx = np.searchsorted(img_boundaries_idx, new_end_idx, side="left").item()
+            if boundary_idx == len(img_boundaries_idx):
+                new_end_idx = ori_prompt_len
+            else:
+                new_end_idx = img_boundaries_idx[boundary_idx].item()
+        elif new_end_idx >= ori_prompt_len and paddle.sum(input_ids[pre_end_idx:new_end_idx] == image_patch_id):
+            new_end_idx = ori_prompt_len
+        num_new_tokens = new_end_idx - pre_end_idx
+
+        image_mask = input_ids[pre_end_idx:new_end_idx] == image_patch_id
+        request.with_image = image_mask.any()
+        if request.with_image:
+            pre_boundary_idx = np.searchsorted(img_boundaries_idx, pre_end_idx, side="left").item()
+            if pre_boundary_idx == len(img_boundaries_idx):
+                request.num_image_start = img_num_per_boundary[-1]
+            else:
+                pre_boundary_idx = (
+                    pre_boundary_idx if pre_end_idx == img_boundaries_idx[pre_boundary_idx] else pre_boundary_idx - 1
+                )
+                request.num_image_start = img_num_per_boundary[pre_boundary_idx]
+
+            new_boundary_idx = np.searchsorted(img_boundaries_idx, new_end_idx, side="left").item()
+            if new_boundary_idx == len(img_boundaries_idx):
+                request.num_image_end = img_num_per_boundary[-1]
+            else:
+                new_boundary_idx = (
+                    new_boundary_idx if new_end_idx == img_boundaries_idx[new_boundary_idx] else new_boundary_idx - 1
+                )
+                request.num_image_end = img_num_per_boundary[new_boundary_idx]
+
+            request.num_image_end = img_num_per_boundary[new_boundary_idx]
+            request.image_type_ids_start = np.sum(grid_thw[: request.num_image_start, 0])
+            request.image_type_ids_end = np.sum(grid_thw[: request.num_image_end, 0])
+            request.image_start = np.sum(np.prod(grid_thw[: request.num_image_start], axis=1))
+            request.image_end = np.sum(np.prod(grid_thw[: request.num_image_end], axis=1))
+        return num_new_tokens
+
+    def exist_prefill(self, scheduled_reqs):
+        for request in scheduled_reqs:
+            if request.task_type == RequestType.PREFILL:
+                return True
+        return False
+
     def schedule(self):
         with self.lock:
             scheduled_reqs: list[Request] = []
````
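The core idea of `_get_num_new_tokens` for multimodal prefill is that a chunk boundary must not fall inside an image: if the proposed end index lands on image-patch tokens, it is snapped forward to the next recorded image boundary, or to the end of the original prompt. A simplified NumPy sketch of just that snapping step, with made-up boundary data:

```python
import numpy as np

def snap_to_image_boundary(new_end_idx: int, img_boundaries_idx: np.ndarray,
                           ori_prompt_len: int) -> int:
    """Move a proposed chunk end forward so it does not split an image."""
    boundary_idx = np.searchsorted(img_boundaries_idx, new_end_idx, side="left").item()
    if boundary_idx == len(img_boundaries_idx):
        return ori_prompt_len
    return int(img_boundaries_idx[boundary_idx])

# Hypothetical prompt: images end at token indices 40 and 90; the prompt is 120 tokens long.
boundaries = np.array([40, 90, 120])
print(snap_to_image_boundary(40, boundaries, 120))  # 40  (already on a boundary)
print(snap_to_image_boundary(55, boundaries, 120))  # 90  (pushed past the second image)
```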
````diff
@@ -109,8 +198,8 @@ class ResourceManagerV1(ResourceManager):
             num_decoding_req_nums = 0
             while req_index < len(self.running) and token_budget > 0:
                 request = self.running[req_index]
-                if request.num_computed_tokens >= request.prompt_token_ids_len:  # to be decoding
-                    if request.num_total_tokens > request.prompt_token_ids_len:  # has generated tokens
+                if request.num_computed_tokens >= request.need_prefill_tokens:  # to be decoding
+                    if request.num_total_tokens > request.need_prefill_tokens:  # has generated tokens
                         request.num_computed_tokens = request.num_total_tokens - 1
                     if (
                         self.allocated_slots(request) - request.num_total_tokens
@@ -143,10 +232,9 @@ class ResourceManagerV1(ResourceManager):
                         token_budget -= 1
                 else:  # need to prefill
                     llm_logger.debug(
-                        f"scheduler prefill task: {request} request.prompt_token_ids_len {request.prompt_token_ids_len} request.num_computed_tokens {request.num_computed_tokens}"
+                        f"scheduler prefill task: {request} request.need_prefill_tokens {request.need_prefill_tokens} request.num_computed_tokens {request.num_computed_tokens}"
                     )
-                    num_new_tokens = request.prompt_token_ids_len - request.num_computed_tokens
-                    num_new_tokens = min(num_new_tokens, token_budget)
+                    num_new_tokens = self._get_num_new_tokens(request, token_budget)
                     num_new_block = self.get_new_block_nums(request, num_new_tokens)
                     # Allocate blocks to prefill
                     if self.cache_manager.can_allocate_gpu_blocks(num_new_block):
@@ -170,8 +258,7 @@ class ResourceManagerV1(ResourceManager):
                     break
                 request = self.waiting[0]
                 if request.status == RequestStatus.WAITING:
-                    num_new_tokens = request.num_total_tokens - request.num_computed_tokens
-                    num_new_tokens = min(num_new_tokens, token_budget)
+                    num_new_tokens = self._get_num_new_tokens(request, token_budget)
                     num_new_block = self.get_new_block_nums(request, num_new_tokens)
                     # Allocate blocks to prefill
                     if self.cache_manager.can_allocate_gpu_blocks(num_new_block):
@@ -192,8 +279,8 @@ class ResourceManagerV1(ResourceManager):
                     else:
                         break
                 elif request.status == RequestStatus.PREEMPTED:
-                    num_new_tokens = request.num_total_tokens - request.num_computed_tokens
-                    num_new_tokens = min(num_new_tokens, token_budget)
+                    request.need_prefill_tokens = request.num_total_tokens  # Before a preempted task is rescheduled it has already been sent to the engine and produces no more tokens, so num_total_tokens is static and correct here.
+                    num_new_tokens = self._get_num_new_tokens(request, token_budget)
                     num_new_block = self.get_new_block_nums(request, num_new_tokens)
                     # Allocate blocks to prefill
                     if self.cache_manager.can_allocate_gpu_blocks(num_new_block):
@@ -228,8 +315,9 @@ class ResourceManagerV1(ResourceManager):
         return self.real_bsz
 
     def add_request(self, request: Request) -> None:
-        self.waiting.append(request)
-        self.requests[request.request_id] = request
+        with self.lock:
+            self.waiting.append(request)
+            self.requests[request.request_id] = request
 
     def _free_blocks(self, request: Request):
         self.cache_manager.recycle_gpu_blocks(request.block_tables)
@@ -251,9 +339,15 @@ class ResourceManagerV1(ResourceManager):
             if request is None:
                 # Invalid request ID.
                 continue
-            request.status = RequestStatus.FINISHED
-            self.running.remove(request)
-            self._free_blocks(request)
+            if request in self.running:  # ran normally and finished
+                self.running.remove(request)
+                request.status = RequestStatus.FINISHED
+                self._free_blocks(request)
+            if request.request_id in self.to_be_rescheduled_request_id_set:  # finished after being preempted; its blocks have already been recycled
+                self.to_be_rescheduled_request_id_set.remove(request.request_id)  # just drop it from to_be_rescheduled_request_id_set
+                if request in self.waiting:  # a finished request scheduled back from preempted to waiting is unexpected and should never happen
+                    raise RuntimeError(f"request {request.request_id} scheduled into waiting list, after finished")
 
             self.tasks_list[request.idx] = None
             self.stop_flags[request.idx] = True
             del self.requests[req_id]
````
````diff
@@ -14,6 +14,7 @@
 # limitations under the License.
 """
 
+import time
 from copy import deepcopy
 from typing import List, Literal, Union
 from urllib.parse import urlparse
@@ -29,6 +30,7 @@ from typing_extensions import Required, TypeAlias, TypedDict
 
 from fastdeploy.input.multimodal.image import ImageMediaIO
 from fastdeploy.input.multimodal.video import VideoMediaIO
+from fastdeploy.utils import api_server_logger
 
 
 class VideoURL(TypedDict, total=False):
@@ -87,12 +89,32 @@ class MultiModalPartParser:
         """Parse Video"""
         return self.load_from_url(video_url, self.video_io)
 
+    def http_get_with_retry(self, url, max_retries=3, retry_delay=1, backoff_factor=2):
+        """HTTP retry"""
+
+        retry_cnt = 0
+        delay = retry_delay
+
+        while retry_cnt < max_retries:
+            try:
+                response = requests.get(url)
+                response.raise_for_status()
+                return response.content
+            except Exception as e:
+                retry_cnt += 1
+                if retry_cnt >= max_retries:
+                    api_server_logger.error(f"HTTP GET failed: {e}. Max retries reached")
+                    raise
+                api_server_logger.info(f"HTTP GET failed: {e}. Start retry {retry_cnt}")
+                time.sleep(delay)
+                delay *= backoff_factor
+
     def load_from_url(self, url, media_io):
         """Load media from URL"""
 
         parsed = urlparse(url)
         if parsed.scheme.startswith("http"):
-            media_bytes = requests.get(url).content
+            media_bytes = self.http_get_with_retry(url)
             return media_io.load_bytes(media_bytes)
 
         if parsed.scheme.startswith("data"):
````
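`load_from_url` now routes HTTP(S) media fetches through `http_get_with_retry`, which retries up to `max_retries` times with exponential backoff (delays of 1 s, 2 s for the defaults) before re-raising the last error. A small standalone sketch of the same retry shape, shown outside the parser class and with a request timeout added that the diff above does not set:

```python
import time
import requests

def http_get_with_retry(url: str, max_retries: int = 3, retry_delay: float = 1.0,
                        backoff_factor: float = 2.0) -> bytes:
    """Fetch url, retrying with exponential backoff; re-raise after the last attempt."""
    delay = retry_delay
    for attempt in range(1, max_retries + 1):
        try:
            # timeout added here for safety; the diff above calls requests.get(url) without one
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return response.content
        except Exception as exc:
            if attempt == max_retries:
                raise
            print(f"HTTP GET failed: {exc}. Retry {attempt} in {delay:.0f}s")
            time.sleep(delay)
            delay *= backoff_factor
```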
````diff
@@ -45,7 +45,7 @@ from fastdeploy.metrics.metrics import (
     get_filtered_metrics,
     main_process_metrics,
 )
-from fastdeploy.metrics.trace_util import inject_to_metadata, instrument
+from fastdeploy.metrics.trace_util import fd_start_span, inject_to_metadata, instrument
 from fastdeploy.utils import (
     FlexibleArgumentParser,
     api_server_logger,
@@ -197,6 +197,7 @@ async def create_chat_completion(request: ChatCompletionRequest):
     """
     Create a chat completion for the provided prompt and parameters.
     """
+    api_server_logger.info(f"Chat Received request: {request.model_dump_json()}")
     if app.state.dynamic_load_weight:
         status, msg = app.state.engine_client.is_workers_alive()
         if not status:
@@ -218,6 +219,7 @@ async def create_completion(request: CompletionRequest):
     """
     Create a completion for the provided prompt and parameters.
     """
+    api_server_logger.info(f"Completion Received request: {request.model_dump_json()}")
     if app.state.dynamic_load_weight:
         status, msg = app.state.engine_client.is_workers_alive()
         if not status:
@@ -269,6 +271,7 @@ def launch_api_server() -> None:
 
     api_server_logger.info(f"launch Fastdeploy api server... port: {args.port}")
     api_server_logger.info(f"args: {args.__dict__}")
+    fd_start_span("FD_START")
 
     try:
         uvicorn.run(
````
````diff
@@ -333,6 +333,9 @@ class CompletionRequest(BaseModel):
     echo: Optional[bool] = False
     frequency_penalty: Optional[float] = None
     logprobs: Optional[int] = None
+    # For logits and logprobs post processing
+    temp_scaled_logprobs: bool = False
+    top_p_normalized_logprobs: bool = False
     max_tokens: Optional[int] = None
     n: int = 1
     presence_penalty: Optional[float] = None
@@ -346,8 +349,10 @@ class CompletionRequest(BaseModel):
     top_k: Optional[int] = None
     min_p: Optional[float] = None
     user: Optional[str] = None
-    extra_body: Optional[dict] = None
-    return_token_ids: Optional[bool] = False
+    min_tokens: Optional[int] = None
+    return_token_ids: Optional[bool] = None
+    max_streaming_response_tokens: Optional[int] = None
     prompt_token_ids: Optional[List[int]] = None
 
     response_format: Optional[AnyResponseFormat] = None
@@ -373,16 +378,13 @@ class CompletionRequest(BaseModel):
         if request_id is not None:
             req_dict["request_id"] = request_id
 
-        # parse request model into dict, priority: request > extra_body > suffix
+        # parse request model into dict
+        if self.suffix is not None:
+            for key, value in self.suffix.items():
+                req_dict[key] = value
         for key, value in self.dict().items():
             if value is not None:
                 req_dict[key] = value
-        if self.extra_body is not None:
-            for key, value in self.extra_body.items():
-                req_dict.setdefault(key, value)
-        if self.suffix is not None:
-            for key, value in self.suffix.items():
-                req_dict.setdefault(key, value)
 
         if prompt is not None:
             req_dict["prompt"] = prompt
````
````diff
@@ -462,6 +464,11 @@ class ChatCompletionRequest(BaseModel):
     frequency_penalty: Optional[float] = None
     logprobs: Optional[bool] = False
     top_logprobs: Optional[int] = 0
+
+    # For logits and logprobs post processing
+    temp_scaled_logprobs: bool = False
+    top_p_normalized_logprobs: bool = False
+
     # remove max_tokens when field is removed from OpenAI API
     max_tokens: Optional[int] = Field(
         default=None,
@@ -480,9 +487,15 @@ class ChatCompletionRequest(BaseModel):
     min_p: Optional[float] = None
     user: Optional[str] = None
     metadata: Optional[dict] = None
-    extra_body: Optional[dict] = None
-    return_token_ids: Optional[bool] = False
+    return_token_ids: Optional[bool] = None
     prompt_token_ids: Optional[List[int]] = None
+    disable_chat_template: Optional[bool] = False
+    min_tokens: Optional[int] = None
+    enable_thinking: Optional[bool] = None
+    reasoning_max_tokens: Optional[int] = None
+    max_streaming_response_tokens: Optional[int] = None
+    include_stop_str_in_output: Optional[bool] = None
 
     response_format: Optional[AnyResponseFormat] = None
     guided_json: Optional[Union[str, dict, BaseModel]] = None
@@ -510,20 +523,19 @@ class ChatCompletionRequest(BaseModel):
 
         req_dict["max_tokens"] = self.max_completion_tokens or self.max_tokens
         req_dict["logprobs"] = self.top_logprobs if self.logprobs else None
+        req_dict["temp_scaled_logprobs"] = self.temp_scaled_logprobs
+        req_dict["top_p_normalized_logprobs"] = self.top_p_normalized_logprobs
 
-        # parse request model into dict, priority: request > extra_body > metadata
-        for key, value in self.dict().items():
-            if value is not None:
-                req_dict[key] = value
-        if self.extra_body is not None:
-            for key, value in self.extra_body.items():
-                req_dict.setdefault(key, value)
+        # parse request model into dict, priority: request params > metadata params
         if self.metadata is not None:
             assert (
                 "raw_request" not in self.metadata
             ), "The parameter `raw_request` is not supported now, please use completion api instead."
             for key, value in self.metadata.items():
-                req_dict.setdefault(key, value)
+                req_dict[key] = value
+        for key, value in self.dict().items():
+            if value is not None:
+                req_dict[key] = value
 
         if "prompt_token_ids" in req_dict:
             if "messages" in req_dict:
@@ -531,6 +543,11 @@ class ChatCompletionRequest(BaseModel):
             else:
                 assert len(self.messages) > 0
 
+            # If disable_chat_template is set, then the first message in messages will be used as the prompt.
+            if self.disable_chat_template:
+                req_dict["prompt"] = req_dict["messages"][0]["content"]
+                del req_dict["messages"]
+
         guided_json_object = None
         if self.response_format is not None:
             if self.response_format.type == "json_object":
````
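The reworked `to_dict_for_infer` changes the merge order: request-level fields are written last and therefore win, while `metadata` (chat) or `suffix` (completions) only provide values the request did not set, and the separate `extra_body` dict is no longer consulted. A tiny sketch of the resulting precedence, using plain dicts rather than the pydantic models:

```python
def merge_request(request_fields: dict, metadata=None) -> dict:
    """Request params > metadata params, mirroring the new to_dict_for_infer order."""
    req = {}
    for key, value in (metadata or {}).items():
        req[key] = value                  # metadata goes in first...
    for key, value in request_fields.items():
        if value is not None:
            req[key] = value              # ...and explicit request fields overwrite it
    return req

print(merge_request({"enable_thinking": True, "top_p": None},
                    {"enable_thinking": False, "reasoning_max_tokens": 512}))
# {'enable_thinking': True, 'reasoning_max_tokens': 512}
```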
````diff
@@ -124,11 +124,21 @@ class OpenAIServingChat:
         previous_num_tokens = 0
         num_prompt_tokens = 0
         num_choices = 1
-        max_streaming_response_tokens = 1
-        enable_thinking = None
-        include_stop_str_in_output = False
-        if request.metadata is not None and request.metadata.get("max_streaming_response_tokens", 1) > 1:
-            max_streaming_response_tokens = request.metadata["max_streaming_response_tokens"]
+        max_streaming_response_tokens = (
+            request.max_streaming_response_tokens
+            if request.max_streaming_response_tokens is not None
+            else (request.metadata or {}).get("max_streaming_response_tokens", 1)
+        )  # directly passed & passed in metadata
+        enable_thinking = (
+            request.enable_thinking
+            if request.enable_thinking is not None
+            else (request.metadata or {}).get("enable_thinking")
+        )
+        include_stop_str_in_output = (
+            request.include_stop_str_in_output
+            if request.include_stop_str_in_output is not None
+            else (request.metadata or {}).get("include_stop_str_in_output", False)
+        )
 
         stream_options = request.stream_options
         if stream_options is None:
````
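The same resolution pattern now appears three times in the streaming handler: take the field from the request if it was set, otherwise fall back to the legacy `metadata` entry with a default. A one-line helper expressing that pattern, shown only to make the intent explicit (it is not part of the diff):

```python
def resolve(request_value, metadata, key, default=None):
    """Prefer the explicit request field; fall back to metadata, then to a default."""
    return request_value if request_value is not None else (metadata or {}).get(key, default)

# e.g. resolve(request.enable_thinking, request.metadata, "enable_thinking")
#      resolve(request.max_streaming_response_tokens, request.metadata,
#              "max_streaming_response_tokens", 1)
```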
````diff
@@ -149,12 +159,6 @@ class OpenAIServingChat:
             dealer.write([b"", request_id.encode("utf-8")])
             choices = []
             current_waiting_time = 0
-            if request.metadata is not None:
-                enable_thinking = request.metadata.get("enable_thinking")
-                include_stop_str_in_output = request.metadata.get("include_stop_str_in_output", False)
-            enable_return_token_ids = request.return_token_ids or (
-                request.extra_body is not None and request.extra_body.get("return_token_ids", False)
-            )
             while num_choices > 0:
                 try:
                     raw_data = await asyncio.wait_for(dealer.read(), timeout=10)
@@ -204,7 +208,7 @@ class OpenAIServingChat:
                                 completion_token_ids=None,
                             ),
                         )
-                        if enable_return_token_ids:
+                        if request.return_token_ids:
                             choice.delta.prompt_token_ids = list(prompt_token_ids)
                         chunk = ChatCompletionStreamResponse(
                             id=request_id,
@@ -221,6 +225,7 @@ class OpenAIServingChat:
                                 prompt_tokens_details=PromptTokenUsageInfo(cached_tokens=num_cached_tokens),
                             )
                         yield f"data: {chunk.model_dump_json(exclude_unset=True)} \n\n"
+                        api_server_logger.info(f"Chat Streaming response send_idx 0: {chunk.model_dump_json()}")
                         first_iteration = False
 
                     output = res["outputs"]
@@ -254,6 +259,7 @@ class OpenAIServingChat:
                         logprobs=logprobs_res,
                         arrival_time=arrival_time,
                     )
+
                     if res["finished"]:
                         num_choices -= 1
                         work_process_metrics.e2e_request_latency.observe(
@@ -274,7 +280,7 @@ class OpenAIServingChat:
                     if res.get("error_msg") is not None and "Recover" in res["error_msg"]:
                         choice.finish_reason = "recover_stop"
 
-                    if enable_return_token_ids:
+                    if request.return_token_ids:
                         choice.delta.completion_token_ids = list(output["token_ids"])
                     if include_continuous_usage:
                         chunk.usage = UsageInfo(
@@ -287,13 +293,11 @@ class OpenAIServingChat:
                 if len(choices) == max_streaming_response_tokens or res["finished"]:
                     chunk.choices = choices
                     yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
+                    # log the final chunk
+                    if res["finished"]:
+                        api_server_logger.info(f"Chat Streaming response last send: {chunk.model_dump_json()}")
                     choices = []
 
-            if choices:
-                chunk.choices = choices
-                yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
-                choices = []
-
             if include_usage:
                 completion_tokens = previous_num_tokens
                 usage = UsageInfo(
````
||||
"""
|
||||
created_time = int(time.time())
|
||||
final_res = None
|
||||
enable_thinking = None
|
||||
include_stop_str_in_output = False
|
||||
enable_return_token_ids = request.return_token_ids or (
|
||||
request.extra_body is not None and request.extra_body.get("return_token_ids", False)
|
||||
enable_thinking = (
|
||||
request.enable_thinking
|
||||
if request.enable_thinking is not None
|
||||
else (request.metadata or {}).get("enable_thinking")
|
||||
)
|
||||
include_stop_str_in_output = (
|
||||
request.include_stop_str_in_output
|
||||
if request.include_stop_str_in_output is not None
|
||||
else (request.metadata or {}).get("include_stop_str_in_output", False)
|
||||
)
|
||||
|
||||
try:
|
||||
dealer = await aiozmq.create_zmq_stream(zmq.DEALER, connect=f"ipc:///dev/shm/router_{self.pid}.ipc")
|
||||
dealer.write([b"", request_id.encode("utf-8")])
|
||||
@@ -363,9 +373,6 @@ class OpenAIServingChat:
|
||||
for data in response:
|
||||
if data.get("error_code", 200) != 200:
|
||||
raise ValueError("{}".format(data["error_msg"]))
|
||||
if request.metadata is not None:
|
||||
enable_thinking = request.metadata.get("enable_thinking")
|
||||
include_stop_str_in_output = request.metadata.get("include_stop_str_in_output", False)
|
||||
data = self.engine_client.data_processor.process_response_dict(
|
||||
data,
|
||||
stream=False,
|
||||
@@ -407,8 +414,8 @@ class OpenAIServingChat:
|
||||
content=output["text"],
|
||||
reasoning_content=output.get("reasoning_content"),
|
||||
tool_calls=output.get("tool_call_content"),
|
||||
prompt_token_ids=prompt_token_ids if enable_return_token_ids else None,
|
||||
completion_token_ids=(completion_token_ids if enable_return_token_ids else None),
|
||||
prompt_token_ids=prompt_token_ids if request.return_token_ids else None,
|
||||
completion_token_ids=completion_token_ids if request.return_token_ids else None,
|
||||
)
|
||||
logprobs_full_res = None
|
||||
if logprob_contents:
|
||||
@@ -442,13 +449,15 @@ class OpenAIServingChat:
|
||||
prompt_tokens_details=PromptTokenUsageInfo(cached_tokens=final_res.get("num_cached_tokens", 0)),
|
||||
)
|
||||
work_process_metrics.e2e_request_latency.observe(time.time() - final_res["metrics"]["request_start_time"])
|
||||
return ChatCompletionResponse(
|
||||
res = ChatCompletionResponse(
|
||||
id=request_id,
|
||||
created=created_time,
|
||||
model=model_name,
|
||||
choices=choices,
|
||||
usage=usage,
|
||||
)
|
||||
api_server_logger.info(f"Chat response: {res.model_dump_json()}")
|
||||
return res
|
||||
|
||||
def build_logprobs_response(
|
||||
self,
|
||||
|
````diff
@@ -190,8 +190,7 @@ class OpenAIServingCompletion:
                         valid_results[rid] = data
                         num_choices -= 1
                         break
 
-            return self.request_output_to_completion_response(
+            res = self.request_output_to_completion_response(
                 final_res_batch=valid_results,
                 request=request,
                 request_id=request_id,
@@ -200,6 +199,8 @@ class OpenAIServingCompletion:
                 prompt_batched_token_ids=prompt_batched_token_ids,
                 completion_batched_token_ids=completion_batched_token_ids,
             )
+            api_server_logger.info(f"Completion response: {res.model_dump_json()}")
+            return res
         except Exception as e:
             api_server_logger.error(f"Error in completion_full_generator: {e}", exc_info=True)
             raise
@@ -228,9 +229,11 @@ class OpenAIServingCompletion:
         output_tokens = [0] * num_choices
         inference_start_time = [0] * num_choices
         first_iteration = [True] * num_choices
-        max_streaming_response_tokens = 1
-        if request.suffix is not None and request.suffix.get("max_streaming_response_tokens", 1) > 1:
-            max_streaming_response_tokens = request.suffix["max_streaming_response_tokens"]
+        max_streaming_response_tokens = (
+            request.max_streaming_response_tokens
+            if request.max_streaming_response_tokens is not None
+            else (request.suffix or {}).get("max_streaming_response_tokens", 1)
+        )  # directly passed & passed in suffix
         choices = []
         chunk = CompletionStreamResponse(
             id=request_id,
@@ -238,9 +241,6 @@ class OpenAIServingCompletion:
             model=model_name,
             choices=choices,
         )
-        enable_return_token_ids = request.return_token_ids or (
-            request.extra_body is not None and request.extra_body.get("return_token_ids", False)
-        )
         current_waiting_time = 0
         while num_choices > 0:
             try:
````
````diff
@@ -264,7 +264,7 @@ class OpenAIServingCompletion:
                     raise ValueError("{}".format(res["error_msg"]))
 
                 if first_iteration[idx]:
-                    if enable_return_token_ids:
+                    if request.return_token_ids:
                         chunk = CompletionStreamResponse(
                             id=request_id,
                             created=created_time,
@@ -273,14 +273,15 @@ class OpenAIServingCompletion:
                                 CompletionResponseStreamChoice(
                                     index=idx,
                                     text="",
-                                    prompt_token_ids=(
-                                        list(prompt_batched_token_ids[idx]) if enable_return_token_ids else None
-                                    ),
+                                    prompt_token_ids=list(prompt_batched_token_ids[idx]),
                                     completion_token_ids=None,
                                 )
                             ],
                         )
                         yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
+                        api_server_logger.info(
+                            f"Completion Streaming response send_idx 0: {chunk.model_dump_json()}"
+                        )
                     first_iteration[idx] = False
 
                 self.engine_client.data_processor.process_response_dict(res, stream=True)
@@ -297,7 +298,7 @@ class OpenAIServingCompletion:
                         index=idx,
                         text=output["text"],
                         prompt_token_ids=None,
-                        completion_token_ids=(output.get("token_ids") if enable_return_token_ids else None),
+                        completion_token_ids=output.get("token_ids") if request.return_token_ids else None,
                         tool_calls=output.get("tool_call_content"),
                         reasoning_content=output.get("reasoning_content"),
                         arrival_time=arrival_time,
````
````diff
@@ -316,6 +317,16 @@ class OpenAIServingCompletion:
 
                     output_tokens[idx] += 1
 
+                    send_idx = output.get("send_idx")
+                    # Only log when send_idx is explicitly 0
+                    if send_idx == 0 and not request.return_token_ids:
+                        chunk_temp = chunk
+                        chunk_temp.choices = choices
+                        api_server_logger.info(
+                            f"Completion Streaming response send_idx 0: {chunk_temp.model_dump_json()}"
+                        )
+                        del chunk_temp
+
                 if len(choices) == max_streaming_response_tokens or res["finished"]:
                     chunk = CompletionStreamResponse(
                         id=request_id,
@@ -340,10 +351,7 @@ class OpenAIServingCompletion:
                         ),
                     )
                     yield f"data: {usage_chunk.model_dump_json(exclude_unset=True)}\n\n"
-            if choices:
-                chunk.choices = choices
-                yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
-                choices = []
+            api_server_logger.info(f"Completion Streaming response last send: {chunk.model_dump_json()}")
 
         except Exception as e:
             yield f"data: {ErrorResponse(message=str(e), code=400).model_dump_json(exclude_unset=True)}\n\n"
@@ -366,9 +374,6 @@ class OpenAIServingCompletion:
         choices: List[CompletionResponseChoice] = []
         num_prompt_tokens = 0
         num_generated_tokens = 0
-        enable_return_token_ids = request.return_token_ids or (
-            request.extra_body is not None and request.extra_body.get("return_token_ids", False)
-        )
 
         for idx in range(len(final_res_batch)):
             final_res = final_res_batch[idx]
@@ -394,8 +399,8 @@ class OpenAIServingCompletion:
                     token_ids=token_ids,
                     index=len(choices),
                     text=output_text,
-                    prompt_token_ids=prompt_token_ids if enable_return_token_ids else None,
-                    completion_token_ids=(completion_token_ids if enable_return_token_ids else None),
+                    prompt_token_ids=prompt_token_ids if request.return_token_ids else None,
+                    completion_token_ids=completion_token_ids if request.return_token_ids else None,
                     reasoning_content=output.get("reasoning_content"),
                     tool_calls=output.get("tool_call_content"),
                     logprobs=None,
````
````diff
@@ -80,6 +80,8 @@ environment_variables: dict[str, Callable[[], Any]] = {
     "EXPORTER_OTLP_HEADERS": lambda: os.getenv("EXPORTER_OTLP_HEADERS"),
     # enable kv cache block scheduler v1 (no need for kv_cache_ratio)
    "ENABLE_V1_KVCACHE_SCHEDULER": lambda: int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "0")),
+    # set trace attribute job_id.
+    "FD_JOB_ID": lambda: os.getenv("FD_JOB_ID"),
 }
 
````
````diff
@@ -1,4 +1,5 @@
 import json
+import os
 
 from fastapi import FastAPI
 from opentelemetry import trace
@@ -176,7 +177,22 @@ def start_span(span_name, request, kind=trace.SpanKind.CLIENT):
             return
         # extract Trace context from request.metadata.trace_carrier
         ctx = extract_from_metadata(request)
-        with tracer.start_as_current_span(span_name, context=ctx, kind=kind):
+        with tracer.start_as_current_span(span_name, context=ctx, kind=kind) as span:
+            span.set_attribute("job_id", os.getenv("FD_JOB_ID", default="null"))
             pass
     except:
         pass
+
+
+def fd_start_span(span_name, kind=trace.SpanKind.CLIENT):
+    """
+    when fd start, start a new span show start success
+    """
+    try:
+        if not traces_enable:
+            return
+        with tracer.start_as_current_span(span_name, kind=kind) as span:
+            span.set_attribute("job_id", os.getenv("FD_JOB_ID", default="null"))
+            pass
+    except:
+        pass
@@ -191,7 +207,8 @@ def start_span_request(span_name, request, kind=trace.SpanKind.CLIENT):
             return
         # extract Trace context from request.metadata.trace_carrier
         ctx = extract_from_request(request)
-        with tracer.start_as_current_span(span_name, context=ctx, kind=kind):
+        with tracer.start_as_current_span(span_name, context=ctx, kind=kind) as span:
+            span.set_attribute("job_id", os.getenv("FD_JOB_ID", default="null"))
             pass
     except:
         pass
````
````diff
@@ -23,7 +23,7 @@ from paddle.distributed import fleet
 
 from fastdeploy.config import FDConfig
 
-from .utils import get_tensor
+from .utils import get_tensor, temporary_dtype
 
 
 class ParallelLMHead(nn.Layer):
@@ -38,6 +38,7 @@ class ParallelLMHead(nn.Layer):
         embedding_dim: int,
         prefix: str = "",
         with_bias: bool = False,
+        dtype: str = None,
     ) -> None:
         """
         Parallelized LMhead.
@@ -50,6 +51,7 @@ class ParallelLMHead(nn.Layer):
             embedding_dim (int): size of hidden state.
             prefix (str): The name of current layer. Defaults to "".
             with_bias (bool): whether to have bias. Default: False.
+            dtype (str): The dtype of weight. Defalut: None.
         """
         super(ParallelLMHead, self).__init__()
         self.weight_key: str = prefix + ".weight"
````
````diff
@@ -62,37 +64,37 @@ class ParallelLMHead(nn.Layer):
 
         ColumnParallelLinear = fleet.meta_parallel.ColumnParallelLinear
         RowParallelLinear = fleet.meta_parallel.RowParallelLinear
 
+        self.dtype = "float32" if fd_config.model_config.lm_head_fp32 else dtype
         self.tie_word_embeddings: bool = fd_config.model_config.tie_word_embeddings
 
-        if self.use_ep:
-            self.weight = self.create_parameter(
-                shape=[embedding_dim, num_embeddings],
-                dtype=paddle.get_default_dtype(),
-                is_bias=False,
-            )
-        else:
-            if self.column_cut:
-                need_gather = True
-                self.linear = ColumnParallelLinear(
-                    embedding_dim,
-                    num_embeddings,
-                    mp_group=fleet.get_hybrid_communicate_group().get_model_parallel_group(),
-                    weight_attr=None,
-                    has_bias=True if self.bias_key is not None else False,
-                    gather_output=need_gather,
-                    fuse_matmul_bias=False,  # False gives a smaller numerical diff
-                )
-            else:
-                self.linear = RowParallelLinear(
-                    embedding_dim,
-                    num_embeddings,
-                    mp_group=fleet.get_hybrid_communicate_group().get_model_parallel_group(),
-                    weight_attr=None,
-                    has_bias=True if self.bias_key is not None else False,
-                    input_is_parallel=False,
-                    fuse_matmul_bias=False,  # False gives a smaller numerical diff
-                )
+        with temporary_dtype(self.dtype):
+            if self.use_ep:
+                self.weight = self.create_parameter(
+                    shape=[embedding_dim, num_embeddings],
+                    dtype=paddle.get_default_dtype(),
+                    is_bias=False,
+                )
+            else:
+                if self.column_cut:
+                    need_gather = True
+                    self.linear = ColumnParallelLinear(
+                        embedding_dim,
+                        num_embeddings,
+                        mp_group=fleet.get_hybrid_communicate_group().get_model_parallel_group(),
+                        weight_attr=None,
+                        has_bias=True if self.bias_key is not None else False,
+                        gather_output=need_gather,
+                        fuse_matmul_bias=False,  # False gives a smaller numerical diff
+                    )
+                else:
+                    self.linear = RowParallelLinear(
+                        embedding_dim,
+                        num_embeddings,
+                        mp_group=fleet.get_hybrid_communicate_group().get_model_parallel_group(),
+                        weight_attr=None,
+                        has_bias=True if self.bias_key is not None else False,
+                        input_is_parallel=False,
+                        fuse_matmul_bias=False,  # False gives a smaller numerical diff
+                    )
 
     def load_state_dict(self, state_dict: Dict[str, paddle.Tensor | np.ndarray]):
         """
````
@@ -103,20 +105,20 @@ class ParallelLMHead(nn.Layer):
|
||||
"""
|
||||
|
||||
if self.use_ep:
|
||||
self.weight.set_value(get_tensor(state_dict.pop(self.weight_key)).astype(paddle.get_default_dtype()))
|
||||
self.weight.set_value(get_tensor(state_dict.pop(self.weight_key)).astype(self.weight.dtype))
|
||||
else:
|
||||
if self.tie_word_embeddings:
|
||||
self.linear.weight.set_value(
|
||||
get_tensor(state_dict.pop(self.weight_key)).astype(paddle.get_default_dtype()).transpose([1, 0])
|
||||
get_tensor(state_dict.pop(self.weight_key)).astype(self.linear.weight.dtype).transpose([1, 0])
|
||||
)
|
||||
else:
|
||||
weight_tensor = get_tensor(state_dict.pop(self.weight_key)).astype(paddle.get_default_dtype())
|
||||
weight_tensor = get_tensor(state_dict.pop(self.weight_key)).astype(self.linear.weight.dtype)
|
||||
if self.linear.weight.shape != weight_tensor.shape:
|
||||
weight_tensor = weight_tensor.transpose([1, 0])
|
||||
self.linear.weight.set_value(weight_tensor)
|
||||
|
||||
if self.bias_key is not None:
|
||||
bias = get_tensor(state_dict.pop(self.bias_key)).astype(paddle.get_default_dtype())
|
||||
bias = get_tensor(state_dict.pop(self.bias_key)).astype(self.linear.bias.dtype)
|
||||
self.linear.bias.set_value(bias)
|
||||
|
||||
def forward(self, input: paddle.Tensor) -> paddle.Tensor:
|
||||
@@ -131,7 +133,7 @@ class ParallelLMHead(nn.Layer):
|
||||
"""
|
||||
logits = input
|
||||
if self.use_ep:
|
||||
logits = paddle.matmul(logits, self.weight)
|
||||
logits = paddle.matmul(logits.astype(self.weight.dtype), self.weight)
|
||||
else:
|
||||
logits = self.linear(logits)
|
||||
logits = self.linear(logits.astype(self.linear.weight.dtype))
|
||||
return logits
|
||||
|
@@ -15,7 +15,7 @@
"""

from dataclasses import dataclass
from typing import Optional
from typing import Dict, Optional

import paddle

@@ -46,3 +46,6 @@ class SamplingMetadata:
max_num_logprobs: Optional[int] = None
prompt_ids: Optional[paddle.Tensor] = None
prompt_lens: Optional[paddle.Tensor] = None
temp_scaled_logprobs: Optional[paddle.Tensor] = None
top_p_normalized_logprobs: Optional[paddle.Tensor] = None
share_inputs: Optional[Dict[str, paddle.Tensor]] = None

@@ -37,6 +37,18 @@ from fastdeploy.platforms import current_platform
from fastdeploy.worker.output import LogprobsTensors, SamplerOutput


def top_p_normalize_probs_paddle(
probs: paddle.Tensor,
top_ps: paddle.Tensor,
):
probs_idx = probs.argsort(axis=-1, descending=True)
probs_sort = paddle.take_along_axis(probs, probs_idx, axis=-1)
probs_sum = paddle.cumsum(probs_sort, axis=-1)
probs_sort = paddle.where((probs_sum - probs_sort) > top_ps, paddle.zeros_like(probs_sort), probs_sort)
probs_sort.divide_(probs_sort.sum(axis=-1, keepdim=True))
return paddle.put_along_axis(paddle.zeros_like(probs_sort), probs_idx, probs_sort, -1)


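As a quick sanity check, here is a hedged usage sketch of the helper added above on a toy distribution; the values are chosen purely for illustration.

import paddle

probs = paddle.to_tensor([[0.5, 0.3, 0.15, 0.05]])   # one request, vocabulary of 4
top_ps = paddle.to_tensor([[0.8]])                    # per-request top_p threshold
renormed = top_p_normalize_probs_paddle(probs, top_ps)
# Tokens outside the top-p nucleus are zeroed and the rest renormalized,
# so the row still sums to 1 (here roughly [0.526, 0.316, 0.158, 0.0]).
print(renormed, renormed.sum(axis=-1))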
class SamplerProcessor:
"""
SamplingProcessor for guided decoding.
@@ -194,9 +206,45 @@ class Sampler(nn.Layer):
"""pre process before running"""
self.processor.pre_process(skip_idx_list)

def compute_logprobs(self, logits: paddle.Tensor) -> paddle.Tensor:
def compute_logprobs(
self,
logits: paddle.Tensor,
sampling_metadata: SamplingMetadata,
) -> paddle.Tensor:
"""Compute logprobs, optionally temperature-scaled and top-p renormalized per request."""
return F.log_softmax(logits, axis=-1)
last_logits = logits
real_bsz = last_logits.shape[0]
temp_scaled_logprobs = sampling_metadata.temp_scaled_logprobs
top_p_normalized_logprobs = sampling_metadata.top_p_normalized_logprobs
share_inputs = sampling_metadata.share_inputs
if temp_scaled_logprobs is not None:
real_bsz_temp_scaled = temp_scaled_logprobs[:real_bsz]
temperature = sampling_metadata.temperature[:real_bsz]
temp_temperature = paddle.where(real_bsz_temp_scaled, temperature, paddle.ones_like(temperature))
last_logits = last_logits / temp_temperature

last_logprobs = F.log_softmax(last_logits, axis=-1)
top_p_logprob = None
top_p_req_mask = None

if top_p_normalized_logprobs is not None and share_inputs is not None:
seq_lens_this_time = share_inputs["seq_lens_this_time"].reshape([-1, 1])[:real_bsz]
seq_lens_encoder = share_inputs["seq_lens_encoder"].reshape([-1, 1])[:real_bsz]
seq_lens_decoder = share_inputs["seq_lens_decoder"].reshape([-1, 1])[:real_bsz]
seq_lens_time_sum = seq_lens_this_time + seq_lens_encoder + seq_lens_decoder
real_req_mask = seq_lens_time_sum > 0
top_p_req_mask = paddle.logical_and(top_p_normalized_logprobs[:real_bsz], real_req_mask)
real_req_top_p = sampling_metadata.top_p[:real_bsz]
# Normalize logprobs if top_p normalization is enabled
# NOTE: only normalize logprobs when top_p is set and not equal to 1.0
top_p_req_mask = paddle.logical_and(top_p_req_mask, real_req_top_p != 1.0)
if top_p_req_mask.any():
probs = F.softmax(last_logits, axis=-1)
probs = top_p_normalize_probs_paddle(probs, real_req_top_p)
top_p_logprob = paddle.log(probs)
if top_p_logprob is not None:
last_logprobs = paddle.where(top_p_req_mask, top_p_logprob, last_logprobs)
return last_logprobs

def gather_logprobs(
self,
@@ -221,6 +269,7 @@ class Sampler(nn.Layer):
Sampled token rank tensor, (num tokens)
"""
assert token_ids.dtype == paddle.int64
logprobs.clip_(min=paddle.finfo(logprobs.dtype).min)
# Get the logprob of the prompt or sampled token.
token_logprobs = paddle.take_along_axis(logprobs, token_ids, axis=-1)

@@ -247,7 +296,7 @@ class Sampler(nn.Layer):
""" """
num_logprobs = sampling_metadata.max_num_logprobs
if num_logprobs is not None:
raw_logprobs = self.compute_logprobs(logits)
raw_logprobs = self.compute_logprobs(logits, sampling_metadata)

logits = self.processor.apply_token_mask(logits, skip_idx_list)


@@ -15,6 +15,7 @@
"""

import functools
from contextlib import contextmanager
from typing import Tuple, Union

import numpy as np
@@ -377,3 +378,15 @@ def create_empty_tensor(shape: Tuple[int, ...], dtype: Union[paddle.dtype, str])
paddle.Tensor: An empty tensor with the specified shape and data type.
"""
return paddle.empty(list(shape), dtype=dtype)


@contextmanager
def temporary_dtype(dtype: str):
"""Temporarily set Paddle default dtype"""
orig_dtype = paddle.get_default_dtype()
try:
if dtype is not None and dtype == "float32":
paddle.set_default_dtype(dtype)
yield
finally:
paddle.set_default_dtype(orig_dtype)

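A hedged usage sketch for the context manager above. The import path is an assumption inferred from the `from .utils import ... temporary_dtype` line in the lm_head diff, and note that as written only `"float32"` actually switches the default dtype; anything else leaves it untouched.

import paddle

# Assumed module path for illustration only.
from fastdeploy.model_executor.layers.utils import temporary_dtype

print(paddle.get_default_dtype())      # process-wide default, e.g. "bfloat16"
with temporary_dtype("float32"):
    # parameters created here (e.g. the FP32 lm_head weights) default to float32
    print(paddle.get_default_dtype())
print(paddle.get_default_dtype())      # previous default is restored on exit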
@@ -613,7 +613,7 @@ class DeepseekV3ForCausalLM(ModelForCasualLM):
def compute_logits(self, hidden_states: paddle.Tensor):
""" """
logits = self.lm_head(hidden_states)
logits = paddle.cast(logits, paddle.float32)
logits = logits.astype(paddle.float32)
logits[:, self.ori_vocab_size :] = -float("inf")
return logits


@@ -412,13 +412,13 @@ class Ernie4_5_MoeForCausalLM(ModelForCasualLM):
"""
self.ernie.load_state_dict(state_dict)
if self.tie_word_embeddings:
self.lm_head.linear.weight.set_value(self.ernie.embed_tokens.embeddings.weight.transpose([1, 0]))
self.lm_head.load_state_dict({self.lm_head.weight_key: self.ernie.embed_tokens.embeddings.weight})
else:
self.lm_head.load_state_dict(state_dict)

def compute_logits(self, hidden_states: paddle.Tensor):
logits = self.lm_head(hidden_states)
logits = paddle.cast(logits, paddle.float32)
logits = logits.astype(paddle.float32)
logits[:, self.ori_vocab_size :] = -float("inf")

return logits

@@ -363,7 +363,7 @@ class Ernie4_5_MTPForCausalLM(ModelForCasualLM):
compute logits
"""
logits = self.lm_head(hidden_states)
logits = paddle.cast(logits, paddle.float32)
logits = logits.astype(paddle.float32)
logits[:, self.ori_vocab_size :] = -float("inf")

return logits

@@ -570,13 +570,13 @@ class Ernie4_5_VLMoeForConditionalGeneration(ModelForCasualLM):
self.vision_model.load_state_dict(state_dict)
self.resampler_model.load_state_dict(state_dict)
if self.tie_word_embeddings:
self.lm_head.linear.weight.set_value(self.ernie.embed_tokens.embeddings.weight.transpose([1, 0]))
self.lm_head.load_state_dict({self.lm_head.weight_key: self.ernie.embed_tokens.embeddings.weight})
else:
self.lm_head.load_state_dict(state_dict)

def compute_logits(self, hidden_states: paddle.Tensor):
logits = self.lm_head(hidden_states)
logits = paddle.cast(logits, paddle.float32)
logits = logits.astype(paddle.float32)
logits[:, self.ori_vocab_size :] = -float("inf")

return logits

@@ -326,7 +326,7 @@ class Qwen2ForCausalLM(ModelForCasualLM):
def compute_logits(self, hidden_states: paddle.Tensor):
""" """
logits = self.lm_head(hidden_states)
logits = paddle.cast(logits, paddle.float32)
logits = logits.astype(paddle.float32)
logits[:, self.ori_vocab_size :] = -float("inf")

return logits

@@ -257,14 +257,14 @@ class Qwen3ForCausalLM(ModelForCasualLM):
"""
self.model.load_state_dict(state_dict)
if self.tie_word_embeddings:
self.lm_head.linear.weight.set_value(self.model.embed_tokens.embeddings.weight.transpose([1, 0]))
self.lm_head.load_state_dict({self.lm_head.weight_key: self.model.embed_tokens.embeddings.weight})
else:
self.lm_head.load_state_dict(state_dict)

def compute_logits(self, hidden_states: paddle.Tensor):
""" """
logits = self.lm_head(hidden_states)
logits = paddle.cast(logits, paddle.float32)
logits = logits.astype(paddle.float32)
logits[:, self.ori_vocab_size :] = -float("inf")

return logits

@@ -298,7 +298,7 @@ class Qwen3MoeForCausalLM(ModelForCasualLM):
def compute_logits(self, hidden_states: paddle.Tensor):
""" """
logits = self.lm_head(hidden_states)
logits = paddle.cast(logits, paddle.float32)
logits = logits.astype(paddle.float32)
logits[:, self.ori_vocab_size :] = -float("inf")

return logits

@@ -315,6 +315,11 @@ class TokenProcessor:
scores = self.output_scores[: batch * (K + 1)].numpy().reshape([batch, K + 1])[:, : (K + 1)]
ranks = self.output_ranks[:batch].numpy()
batch_result = list()
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
need_to_be_reschedule_req_ids = list(self.resource_manager.to_be_rescheduled_request_id_set)
for request_id in need_to_be_reschedule_req_ids:
if self.resource_manager.requests[request_id].idx >= (batch - 1):  # No more token generated for preempted request
self.resource_manager.reschedule_preempt_task(request_id)
for i in range(batch):
if self.resource_manager.stop_flags[i]:
continue
@@ -326,6 +331,9 @@ class TokenProcessor:
if recovery_stop:
llm_logger.info(f"recovery stop signal found at task {task_id}")
if not recovery_stop and token_id < 0:
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
if task_id in self.resource_manager.to_be_rescheduled_request_id_set:
self.resource_manager.reschedule_preempt_task(task_id)
continue

if task.get("prefill_chunk_info", None) is not None:
@@ -382,6 +390,7 @@ class TokenProcessor:
self.tokens_counter[task_id] += 1
if token_id != RECOVERY_STOP_SIGNAL:
result.outputs.token_ids.append(token_id)
task.output_token_ids.append(token_id)
result.outputs.logprob = float(scores[i, 0])
# Construct top_logprobs
topk_token_ids = tokens[i, :].tolist()
@@ -432,6 +441,11 @@ class TokenProcessor:
tokens = tokens[2 : batch + 2]

batch_result = list()
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
need_to_be_reschedule_req_ids = list(self.resource_manager.to_be_rescheduled_request_id_set)
for request_id in need_to_be_reschedule_req_ids:
if self.resource_manager.requests[request_id].idx >= (batch - 1):  # No more token generated for preempted request
self.resource_manager.reschedule_preempt_task(request_id)
for i in range(batch):
if self.resource_manager.stop_flags[i]:
continue
@@ -458,6 +472,9 @@ class TokenProcessor:
if recovery_stop:
llm_logger.info(f"recovery stop signal found at task {task_id}")
if not recovery_stop and token_id < 0:
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
if task_id in self.resource_manager.to_be_rescheduled_request_id_set:
self.resource_manager.reschedule_preempt_task(task_id)
continue

if task.get("prefill_chunk_info", None) is not None:

@@ -86,6 +86,7 @@ class BaseRLModel(nn.Layer):
super(BaseRLModel, self).__init__()
self.infer_to_train_mapping = {}
self.fd_config = None
self._mappings_built = False

@classmethod
def name(cls) -> str:
@@ -142,6 +143,12 @@ class Ernie4_5_MoeForCausalLMRL(Ernie4_5_MoeForCausalLM, BaseRLModel):

def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
"""Generate the mapping between inference and training parameters for RL (do not delete!)."""
if self._mappings_built:
return self.infer_to_train_mapping

self.infer_to_train_mapping = {}
self._mappings_built = True

# Prepare placeholders
place_holders = ["weight"]

@@ -215,6 +222,11 @@ class Ernie4_5_VLMoeForConditionalGenerationRL(Ernie4_5_VLMoeForConditionalGener

def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
"""Generate the mapping between inference and training parameters for RL (do not delete!)."""
if self._mappings_built:
return self.infer_to_train_mapping

self.infer_to_train_mapping = {}
self._mappings_built = True
# Prepare placeholders
place_holders = ["weight"]

@@ -316,6 +328,11 @@ class Qwen2ForCausalLMRL(Qwen2ForCausalLM, BaseRLModel):

def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
"""Generate the mapping between inference and training parameters for RL (do not delete!)."""
if self._mappings_built:
return self.infer_to_train_mapping

self.infer_to_train_mapping = {}
self._mappings_built = True
# Prepare placeholders
place_holders = ["weight"]

@@ -360,6 +377,11 @@ class Qwen3MoeForCausalLMRL(Qwen3MoeForCausalLM, BaseRLModel):

def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
"""Generate the mapping between inference and training parameters for RL (do not delete!)."""
if self._mappings_built:
return self.infer_to_train_mapping

self.infer_to_train_mapping = {}
self._mappings_built = True
# Prepare placeholders
place_holders = ["weight"]

@@ -429,4 +451,29 @@ class Qwen3ForCausalLMRL(Qwen3ForCausalLM, BaseRLModel):
return "Qwen3ForCausalLMRL"

def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
pass
if self._mappings_built:
return self.infer_to_train_mapping

self.infer_to_train_mapping = {}
self._mappings_built = True
# Prepare placeholders
place_holders = ["weight"]

# Initialize mapping dictionary
self._update_base_mappings("model")
base_name = "model.layers"

# Helper function to add layer mappings
def _add_layer_mappings(layer_idx):
# FFN mappings
for ph in place_holders:
self.infer_to_train_mapping[f"{base_name}.{layer_idx}.mlp.up_gate_proj.{ph}"] = (
f"{base_name}.{layer_idx}.mlp.gate_up_fused_proj.{ph}"
)

for layer_idx in range(self.fd_config.model_config.num_hidden_layers):
_add_layer_mappings(layer_idx)

self._complete_missing_mappings()

return self.infer_to_train_mapping

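For illustration, a hedged standalone sketch of the per-layer mapping the helper above builds; the key names follow the f-strings in the diff, while the function name and layer count here are stand-ins rather than repo code.

def build_ffn_mappings(num_hidden_layers: int, base_name: str = "model.layers") -> dict:
    # Maps the inference-side fused FFN projection name to its training-side name,
    # mirroring _add_layer_mappings above.
    mapping = {}
    for layer_idx in range(num_hidden_layers):
        for ph in ["weight"]:
            mapping[f"{base_name}.{layer_idx}.mlp.up_gate_proj.{ph}"] = (
                f"{base_name}.{layer_idx}.mlp.gate_up_fused_proj.{ph}"
            )
    return mapping

print(build_ffn_mappings(2))
# {'model.layers.0.mlp.up_gate_proj.weight': 'model.layers.0.mlp.gate_up_fused_proj.weight',
#  'model.layers.1.mlp.up_gate_proj.weight': 'model.layers.1.mlp.gate_up_fused_proj.weight'}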
@@ -203,15 +203,20 @@ class GPUModelRunner(ModelRunnerBase):

req_len = len(req_dicts)
has_prefill_task = False
has_decode_task = False
for i in range(req_len):
request = req_dicts[i]
idx = request.idx
if request.task_type.value == RequestType.PREFILL.value:  # prefill task
logger.debug(f"Handle prefill request {request} at idx {idx}")
prefill_start_index = request.prefill_start_index
prefill_end_index = request.prefill_end_index
length = prefill_end_index - prefill_start_index
input_ids = request.prompt_token_ids + request.output_token_ids
logger.debug(
f"Handle prefill request {request} at idx {idx}, "
f"{prefill_start_index=}, {prefill_end_index=}, "
f"need_prefilled_token_num={len(input_ids)}"
)
self.share_inputs["input_ids"][idx : idx + 1, :length] = np.array(
input_ids[prefill_start_index:prefill_end_index]
)
@@ -240,6 +245,8 @@ class GPUModelRunner(ModelRunnerBase):
self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num] = np.array(
request.block_tables, dtype="int32"
)
if self.share_inputs["is_block_step"][idx]:  # has tasks to continue to decode
has_decode_task = True
continue
else:  # preempted task
logger.debug(f"Handle preempted request {request} at idx {idx}")
@@ -260,6 +267,10 @@ class GPUModelRunner(ModelRunnerBase):
self.share_inputs["penalty_score"][idx : idx + 1] = request.get("repetition_penalty", 1.0)
self.share_inputs["frequency_score"][idx : idx + 1] = request.get("frequency_penalty", 0.0)
self.share_inputs["presence_score"][idx : idx + 1] = request.get("presence_penalty", 0.0)
self.share_inputs["temp_scaled_logprobs"][idx : idx + 1] = request.get("temp_scaled_logprobs", False)
self.share_inputs["top_p_normalized_logprobs"][idx : idx + 1] = request.get(
"top_p_normalized_logprobs", False
)

self.share_inputs["min_dec_len"][idx : idx + 1] = request.get("min_tokens", 1)
self.share_inputs["max_dec_len"][idx : idx + 1] = request.get(
@@ -280,7 +291,7 @@ class GPUModelRunner(ModelRunnerBase):
self.share_inputs["stop_seqs"][:stop_seqs_num, : len(request.get("stop_token_ids")[0])] = np.array(
request.get("stop_token_ids"), dtype="int64"
)
if has_prefill_task:
if has_prefill_task or has_decode_task:
self.share_inputs["not_need_stop"][0] = True

def insert_prefill_inputs(self, req_dicts: List[Request]):
@@ -424,6 +435,12 @@ class GPUModelRunner(ModelRunnerBase):
self.share_inputs["presence_score"][idx : idx + 1] = get_attr_from_request(
request, "presence_penalty", 0.0
)
self.share_inputs["temp_scaled_logprobs"][idx : idx + 1] = get_attr_from_request(
request, "temp_scaled_logprobs", False
)
self.share_inputs["top_p_normalized_logprobs"][idx : idx + 1] = get_attr_from_request(
request, "top_p_normalized_logprobs", False
)

self.share_inputs["min_dec_len"][idx : idx + 1] = request.get("min_tokens", 1)
self.share_inputs["max_dec_len"][idx : idx + 1] = request.get(
@@ -536,6 +553,8 @@ class GPUModelRunner(ModelRunnerBase):
self.share_inputs["presence_score"] = paddle.full(
[max_num_seqs, 1], self.model_config.presence_score, dtype="float32"
)
self.share_inputs["temp_scaled_logprobs"] = paddle.full([max_num_seqs, 1], False, dtype="bool")
self.share_inputs["top_p_normalized_logprobs"] = paddle.full([max_num_seqs, 1], False, dtype="bool")

self.share_inputs["min_dec_len"] = paddle.full([max_num_seqs, 1], self.model_config.min_length, dtype="int64")
self.share_inputs["max_dec_len"] = paddle.full(
@@ -741,6 +760,9 @@ class GPUModelRunner(ModelRunnerBase):
bad_words_token_ids=self.share_inputs["bad_tokens"],
eos_token_ids=self.share_inputs["eos_token_id"],
max_num_logprobs=20 if self.enable_logprob else None,
temp_scaled_logprobs=self.share_inputs["temp_scaled_logprobs"],
top_p_normalized_logprobs=self.share_inputs["top_p_normalized_logprobs"],
share_inputs=self.share_inputs,
)

def load_model(self) -> None:

@@ -587,6 +587,11 @@ def parse_args():
action="store_true",
help="Enable output of token-level log probabilities.",
)
parser.add_argument(
"--lm_head_fp32",
action="store_true",
help="Run the lm_head layer in float32.",
)

args = parser.parse_args()
return args

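A hedged sketch of the dtype selection this flag feeds into, mirroring the `self.dtype = "float32" if fd_config.model_config.lm_head_fp32 else dtype` line in the ParallelLMHead diff above; the helper name here is illustrative only, not part of the repo.

def resolve_lm_head_dtype(lm_head_fp32: bool, model_dtype: str) -> str:
    # With --lm_head_fp32 set, lm_head weights and logits use float32;
    # otherwise they follow the model's dtype (e.g. "bfloat16").
    return "float32" if lm_head_fp32 else model_dtype

assert resolve_lm_head_dtype(True, "bfloat16") == "float32"
assert resolve_lm_head_dtype(False, "bfloat16") == "bfloat16"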
@@ -8,9 +8,9 @@ aiozmq
openai>=1.93.0
tqdm
pynvml
uvicorn
uvicorn==0.29.0
fastapi
paddleformers
paddleformers==0.1.4
redis
etcd3
httpx

@@ -8,7 +8,7 @@ aiozmq
openai
tqdm
pynvml
uvicorn
uvicorn==0.29.0
fastapi
paddleformers
redis

@@ -8,7 +8,7 @@ aiozmq
openai
tqdm
pynvml
uvicorn
uvicorn==0.29.0
fastapi
paddleformers
redis

@@ -3,7 +3,7 @@ DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
echo "$DIR"

python -m pip config set global.index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
python -m pip install --pre paddlepaddle-gpu -i https://www.paddlepaddle.org.cn/packages/nightly/cu126/
python -m pip install paddlepaddle-gpu==3.1.0 -i https://www.paddlepaddle.org.cn/packages/stable/cu126/
python -m pip install -r requirements.txt
python -m pip install jsonschema aistudio_sdk==0.3.5
bash build.sh || exit 1
@@ -24,7 +24,7 @@ for subdir in "$run_path"*/; do
echo "------------------------------------------------------------"

set +e
timeout 360 python -m pytest --disable-warnings -sv "$file"
timeout 600 python -m pytest --disable-warnings -sv "$file"
exit_code=$?
set -e

@@ -44,7 +44,7 @@ for subdir in "$run_path"*/; do
if [ "$exit_code" -eq 1 ] || [ "$exit_code" -eq 124 ]; then
echo "[ERROR] $file failed to start the service or errored during execution, exit_code=$exit_code"
if [ "$exit_code" -eq 124 ]; then
echo "[TIMEOUT] $file ran for more than 6 minutes, aborting on timeout!"
echo "[TIMEOUT] $file ran for more than 10 minutes, aborting on timeout!"
fi
fi

setup.py
@@ -181,7 +181,7 @@ def get_name():

cmdclass_dict = {"bdist_wheel": CustomBdistWheel}
cmdclass_dict["build_ext"] = CMakeBuild
FASTDEPLOY_VERSION = os.environ.get("FASTDEPLOY_VERSION", "2.0.0-dev")
FASTDEPLOY_VERSION = os.environ.get("FASTDEPLOY_VERSION", "2.0.4")
cmdclass_dict["build_optl"] = PostInstallCommand

setup(

@@ -662,3 +662,55 @@ def test_streaming_completion_with_prompt_token_ids(openai_client, capsys):
else:
assert hasattr(chunk.usage, "prompt_tokens")
assert chunk.usage.prompt_tokens == 9


def test_non_streaming_chat_completion_disable_chat_template(openai_client, capsys):
"""
Test disable_chat_template option in chat functionality with the local service.
"""
enabled_response = openai_client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": "Hello, how are you?"}],
max_tokens=10,
temperature=0.0,
top_p=0,
extra_body={"disable_chat_template": False},
stream=False,
)
assert hasattr(enabled_response, "choices")
assert len(enabled_response.choices) > 0

# from fastdeploy.input.ernie_tokenizer import ErnieBotTokenizer
# tokenizer = ErnieBotTokenizer.from_pretrained("PaddlePaddle/ERNIE-4.5-0.3B-Paddle", trust_remote_code=True)
# prompt = tokenizer.apply_chat_template([{"role": "user", "content": "Hello, how are you?"}], tokenize=False)
prompt = "<|begin_of_sentence|>User: Hello, how are you?\nAssistant: "
disabled_response = openai_client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": prompt}],
max_tokens=10,
temperature=0,
top_p=0,
extra_body={"disable_chat_template": True},
stream=False,
)
assert hasattr(disabled_response, "choices")
assert len(disabled_response.choices) > 0
assert enabled_response.choices[0].message.content == disabled_response.choices[0].message.content


def test_non_streaming_chat_with_min_tokens(openai_client, capsys):
"""
Test min_tokens option in non-streaming chat functionality with the local service
"""
min_tokens = 1000
response = openai_client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": "Hello, how are you?"}],
temperature=1,
max_tokens=1010,
extra_body={"min_tokens": min_tokens},
stream=False,
)
assert hasattr(response, "usage")
assert hasattr(response.usage, "completion_tokens")
assert response.usage.completion_tokens >= min_tokens

@@ -116,6 +116,8 @@ def setup_and_run_server():
"0.71",
"--quantization",
"wint4",
"--reasoning-parser",
"ernie-45-vl",
]

# Start subprocess in new process group
@@ -127,14 +129,14 @@ def setup_and_run_server():
start_new_session=True,  # Enables killing full group via os.killpg
)

# Wait up to 300 seconds for API server to be ready
for _ in range(300):
# Wait up to 600 seconds for API server to be ready
for _ in range(600):
if is_port_open("127.0.0.1", FD_API_PORT):
print(f"API server is up on port {FD_API_PORT}")
break
time.sleep(1)
else:
print("[TIMEOUT] API server failed to start in 5 minutes. Cleaning up...")
print("[TIMEOUT] API server failed to start in 10 minutes. Cleaning up...")
try:
os.killpg(process.pid, signal.SIGTERM)
except Exception as e:
@@ -214,7 +216,11 @@ def test_consistency_between_runs(api_url, headers, consistent_payload):
resp1 = requests.post(api_url, headers=headers, json=consistent_payload)
assert resp1.status_code == 200
result1 = resp1.json()
content1 = result1["choices"][0]["message"]["content"]
content1 = (
result1["choices"][0]["message"]["reasoning_content"]
+ "</think>"
+ result1["choices"][0]["message"]["content"]
)
file_res_temp = "ernie-4_5-vl"
f_o = open(file_res_temp, "a")
f_o.writelines(content1)
@@ -223,9 +229,9 @@ def test_consistency_between_runs(api_url, headers, consistent_payload):
# base result
base_path = os.getenv("MODEL_PATH")
if base_path:
base_file = os.path.join(base_path, "ernie-4_5-vl-base-tp2")
base_file = os.path.join(base_path, "ernie-4_5-vl-base-tp2-204")
else:
base_file = "ernie-4_5-vl-base-tp2"
base_file = "ernie-4_5-vl-base-tp2-204"
with open(base_file, "r") as f:
content2 = f.read()

@@ -338,10 +344,7 @@ def test_non_streaming_chat_with_return_token_ids(openai_client, capsys):
response = openai_client.chat.completions.create(
model="default",
messages=[
{
"role": "system",
"content": "You are a helpful AI assistant.",
},  # the system message is optional
{"role": "system", "content": "You are a helpful AI assistant."},  # the system message is optional
{
"role": "user",
"content": [
@@ -373,10 +376,7 @@ def test_non_streaming_chat_with_return_token_ids(openai_client, capsys):
response = openai_client.chat.completions.create(
model="default",
messages=[
{
"role": "system",
"content": "You are a helpful AI assistant.",
},  # the system message is optional
{"role": "system", "content": "You are a helpful AI assistant."},  # the system message is optional
{
"role": "user",
"content": [
@@ -413,10 +413,7 @@ def test_streaming_chat_with_return_token_ids(openai_client, capsys):
response = openai_client.chat.completions.create(
model="default",
messages=[
{
"role": "system",
"content": "You are a helpful AI assistant.",
},  # the system message is optional
{"role": "system", "content": "You are a helpful AI assistant."},  # the system message is optional
{
"role": "user",
"content": [
@@ -455,10 +452,7 @@ def test_streaming_chat_with_return_token_ids(openai_client, capsys):
response = openai_client.chat.completions.create(
model="default",
messages=[
{
"role": "system",
"content": "You are a helpful AI assistant.",
},  # the system message is optional
{"role": "system", "content": "You are a helpful AI assistant."},  # the system message is optional
{
"role": "user",
"content": [
@@ -486,3 +480,54 @@ def test_streaming_chat_with_return_token_ids(openai_client, capsys):
assert chunk.choices[0].delta.prompt_token_ids is None
assert hasattr(chunk.choices[0].delta, "completion_token_ids")
assert chunk.choices[0].delta.completion_token_ids is None


def test_chat_with_thinking(openai_client, capsys):
"""
Test enable_thinking & reasoning_max_tokens option in non-streaming chat functionality with the local service
"""
# enable thinking, non-streaming
response = openai_client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": "Explain gravity in a way that a five-year-old child can understand."}],
temperature=1,
stream=False,
max_tokens=10,
extra_body={"enable_thinking": True},
)
assert response.choices[0].message.reasoning_content is not None

# disable thinking, non-streaming
response = openai_client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": "Explain gravity in a way that a five-year-old child can understand."}],
temperature=1,
stream=False,
max_tokens=10,
extra_body={"enable_thinking": False},
)
assert response.choices[0].message.reasoning_content is None

# enable thinking, streaming
reasoning_max_tokens = 3
response = openai_client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": "Explain gravity in a way that a five-year-old child can understand."}],
temperature=1,
extra_body={"enable_thinking": True, "reasoning_max_tokens": reasoning_max_tokens, "return_token_ids": True},
stream=True,
max_tokens=10,
)
completion_tokens = reasoning_tokens = 1
total_tokens = 0
for chunk_id, chunk in enumerate(response):
if chunk_id == 0:  # the first chunk is an extra chunk
continue
delta_message = chunk.choices[0].delta
if delta_message.content != "" and delta_message.reasoning_content == "":
completion_tokens += len(delta_message.completion_token_ids)
elif delta_message.reasoning_content != "" and delta_message.content == "":
reasoning_tokens += len(delta_message.completion_token_ids)
total_tokens += len(delta_message.completion_token_ids)
assert completion_tokens + reasoning_tokens == total_tokens
assert reasoning_tokens <= reasoning_max_tokens
