Compare commits

...

23 Commits

Author SHA1 Message Date
gaoziyuan
5224f6c434 support qwen3 name_mapping (#3170) 2025-08-04 16:37:40 +08:00
yinwei
bfef09dd73 update user email (#3087) 2025-07-31 14:25:31 +08:00
LokeZhou
1d46420c49 [cherry-pick][MM_PROCESS] add _extract_labels (#2879) (#2993) 2025-07-24 11:04:43 +08:00
ltd0924
fb0f284e67 [BugFix] fix prompt token ids type (#2994)
* Update serving_completion.py

* fix

* fix
2025-07-23 21:00:56 +08:00
Zero Rains
5d1788c7b5 polish code for prefill restrictions (#2992) 2025-07-23 05:12:01 -07:00
Zero Rains
abd238fc12 [Cherry-Pick][BugFix] Add prefill restrictions for chunked_prefill+VL (#2984) 2025-07-23 01:53:26 -07:00
Jiang-Jia-Jun
e5804b1d98 Revert "[LLM] fix multinode bugs (#2945)" (#2971)
This reverts commit b0f1e0eef4.
2025-07-22 21:23:48 +08:00
Sunny-bot1
8c43bc8176 [FIX 2.0.3]fix rejection sampling when topp=0 using _SAMPLING_EPS (#2966)
* fix rejection sampling when topp=0

* fix

* fix
2025-07-22 05:53:04 -07:00
ltd0924
b0f1e0eef4 [LLM] fix multinode bugs (#2945)
* [LLM] fix multinode bugs

* [LLM] fix multinode bugs

* [LLM] fix multinode bugs

* [LLM] fix ci bugs

* fix ci bugs

* fix ci bugs
2025-07-22 20:23:37 +08:00
ming1753
69be77c8c0 [Feature] support prompt repetition_penalty (#2954)
* [Feature] support prompt repetition_penalty (#2806)

* [Bug Fix] fix bug of prompt penalty (#2888)
2025-07-22 19:42:33 +08:00
gaoziyuan
535a15ab8f [Fix]Fix vl when import fastdeploy and fix rl config rank bug (#2953)
* support vl ori_vacab_size

* support trainer_degree in name_mapping

* fix

* fix import error

* fix local rank
2025-07-22 04:40:27 -07:00
sg263
580460046f merge 2.0.2 into 2.0.3 (#2917)
Co-authored-by: shige <shige@baidu.com>
2025-07-22 14:46:20 +08:00
Sunny-bot1
4dbc483713 [BugFix]Fix sample rejection (#2908) (#2949)
* fix config

* fix rejection

Co-authored-by: YuanRisheng <yuanrisheng@baidu.com>
2025-07-22 13:39:34 +08:00
gaoziyuan
4ead15822c 【Sync develop】support vl model name_mapping and ori_vocab_size (#2915)
* support vl ori_vacab_size

* support trainer_degree in name_mapping

* fix
2025-07-20 23:14:15 -07:00
Jiang-Jia-Jun
f941124402 [Feature] Support include_stop_str_in_output (#2930)
Co-authored-by: Jiang-Jia-Jun <jiangjiajun@baidu.com>
2025-07-21 10:58:32 +08:00
RAM
b89f083004 [Executor] Fix set capture sizes bug (#2903) 2025-07-18 10:58:05 +08:00
RAM
4d05ed596c Update GraphOptimizationBackend docs (#2899) 2025-07-17 21:41:38 +08:00
ltd0924
bc1866af58 [LLM] delete fixed slot (#2894) 2025-07-17 20:44:55 +08:00
yulangz
fe237fe92b [XPU][doc] pick xpu doc fix (#2897) 2025-07-17 20:01:40 +08:00
YUNSHEN XIE
3a480abcbb enable CI workflow for pull requests targeting release/* branches (#2886)
* enable CI workflow for pull requests targeting release/* branches

* fix
2025-07-17 16:48:13 +08:00
Yuanle Liu
335609efb6 fix rollout_model and add rl ut (#2882) 2025-07-17 13:37:54 +08:00
Jiang-Jia-Jun
3464f75f98 Update setup.py 2025-07-16 23:45:05 +08:00
Jiang-Jia-Jun
09d0073fdc [Sync Code] develop to release/2.0.3 (#2873)
* [LLM] support send batch data and aggregate data (#2860)

* [LLM] support send batch data and aggregate data

* [LLM] fix ci bugs

* [LLM] fix ci bugs

* [LLM] fix ci bugs

* [LLM] fix ci bugs

* [LLM] update

* [LLM] Update Multinode Deployment (#2830)

* [LLM] fix multinode bugs

* [LLM] update multinode deployment

* [LLM] update multinode deployment

* [LLM] update multinode deployment

* [LLM] update multinode deployment

* [LLM] update multinode deployment

* [LLM] fix ci bugs

* Update fastdeploy/engine/args_utils.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* [LLM] update random port

* [LLM] update random port

* [LLM] fix ci bugs

* fix ci bugs

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: ltd0924 <32387785+ltd0924@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-07-16 23:44:26 +08:00
51 changed files with 1306 additions and 905 deletions

View File

@@ -2,7 +2,9 @@ name: CI
on:
pull_request:
branches: [ develop ]
branches:
- develop
- 'release/*'
workflow_dispatch:
concurrency:
@@ -27,9 +29,11 @@ jobs:
REPO="https://github.com/${{ github.repository }}.git"
FULL_REPO="${{ github.repository }}"
REPO_NAME="${FULL_REPO##*/}"
BASE_BRANCH="${{ github.base_ref }}"
# Clean the repository directory before starting
docker run --rm --net=host -v $(pwd):/workspace -w /workspace \
-e "REPO_NAME=${REPO_NAME}" \
-e "BASE_BRANCH=${BASE_BRANCH}" \
${docker_image} /bin/bash -c '
if [ -d ${REPO_NAME} ]; then
echo "Directory ${REPO_NAME} exists, removing it..."
@@ -38,7 +42,7 @@ jobs:
'
git config --global user.name "FastDeployCI"
git config --global user.email "fastdeploy_ci@example.com"
git clone ${REPO} ${REPO_NAME}
git clone ${REPO} ${REPO_NAME} -b ${BASE_BRANCH}
cd FastDeploy
if [ "${{ github.event_name }}" = "pull_request" ]; then
git fetch origin pull/${{ github.event.pull_request.number }}/head:pr/${{ github.event.pull_request.number }}

View File

@@ -2,7 +2,9 @@ name: CI_XPU
on:
pull_request:
branches: [ develop ]
branches:
- develop
- 'release/*'
workflow_dispatch:
concurrency:
@@ -27,9 +29,11 @@ jobs:
REPO="https://github.com/${{ github.repository }}.git"
FULL_REPO="${{ github.repository }}"
REPO_NAME="${FULL_REPO##*/}"
BASE_BRANCH="${{ github.base_ref }}"
# Clean the repository directory before starting
docker run --rm --net=host -v $(pwd):/workspace -w /workspace \
-e "REPO_NAME=${REPO_NAME}" \
-e "BASE_BRANCH=${BASE_BRANCH}" \
${docker_image} /bin/bash -c '
if [ -d ${REPO_NAME} ]; then
echo "Directory ${REPO_NAME} exists, removing it..."
@@ -38,7 +42,7 @@ jobs:
'
git config --global user.name "FastDeployCI"
git config --global user.email "fastdeploy_ci@example.com"
git clone ${REPO} ${REPO_NAME}
git clone ${REPO} ${REPO_NAME} -b ${BASE_BRANCH}
cd FastDeploy
if [ "${{ github.event_name }}" = "pull_request" ]; then
git fetch origin pull/${{ github.event.pull_request.number }}/head:pr/${{ github.event.pull_request.number }}

View File

@@ -289,7 +289,7 @@ __global__ void TopKTopPSamplingFromProbKernel(DType* probs, IdType* output,
curand_init(philox_seed, bx, philox_offset, &state);
const uint32_t row_idx = bx;
const uint32_t k = top_k_arr[row_idx] == 0 ? d : top_k_arr[row_idx];
const float p = top_p_arr[row_idx] == 0 ? 1e-6 : top_p_arr[row_idx];
const float p = top_p_arr[row_idx];
extern __shared__ __align__(
alignof(SamplingTempStorage<BLOCK_THREADS, SCAN_ALGORITHM, REDUCE_ALGORITHM>))
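The change above drops the in-kernel substitution of `1e-6` when `top_p` is 0; per commit 8c43bc8176 in the list above, the `top_p = 0` case appears to be guarded with `_SAMPLING_EPS` before sampling instead. A rough NumPy sketch of top-p (nucleus) filtering with such an epsilon guard — the threshold value and helper name here are illustrative, not the project's actual code:

```python
import numpy as np

_SAMPLING_EPS = 1e-5  # illustrative threshold; the real constant lives in the sampler code

def top_p_filter(probs: np.ndarray, top_p: float) -> np.ndarray:
    """Zero out tokens outside the top-p nucleus, then renormalize."""
    if top_p < _SAMPLING_EPS:          # treat top_p ~ 0 as greedy instead of passing 0 to the kernel
        out = np.zeros_like(probs)
        out[np.argmax(probs)] = 1.0
        return out
    order = np.argsort(-probs)
    cumsum = np.cumsum(probs[order])
    keep = cumsum - probs[order] < top_p   # keep tokens until cumulative mass reaches top_p
    mask = np.zeros_like(probs, dtype=bool)
    mask[order[keep]] = True
    filtered = np.where(mask, probs, 0.0)
    return filtered / filtered.sum()

probs = np.array([0.5, 0.3, 0.15, 0.05])
print(top_p_filter(probs, 0.8))  # keeps the 0.5 and 0.3 tokens
print(top_p_filter(probs, 0.0))  # falls back to greedy
```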

View File

@@ -20,16 +20,16 @@ __global__ inline void min_length_logits_process(T *logits,
const int64_t *min_len,
const int64_t *eos_token_id,
const int64_t bs,
const int64_t length,
const int64_t end_length) {
const int64_t vocab_size,
const int64_t eos_len) {
int bi = threadIdx.x;
if (bi >= bs) return;
if (cur_len[bi] < 0) {
return;
}
if (cur_len[bi] < min_len[bi]) {
for (int i = 0; i < end_length; i++) {
logits[bi * length + eos_token_id[i]] = -1e10;
for (int i = 0; i < eos_len; i++) {
logits[bi * vocab_size + eos_token_id[i]] = -1e10;
}
}
}
@@ -41,61 +41,83 @@ __global__ inline void min_length_logits_process<half>(
const int64_t *min_len,
const int64_t *eos_token_id,
const int64_t bs,
const int64_t length,
const int64_t end_length) {
const int64_t vocab_size,
const int64_t eos_len) {
int bi = threadIdx.x;
if (bi >= bs) return;
if (cur_len[bi] < 0) {
return;
}
if (cur_len[bi] < min_len[bi]) {
for (int i = 0; i < end_length; i++) {
logits[bi * length + eos_token_id[i]] = -1e4;
for (int i = 0; i < eos_len; i++) {
logits[bi * vocab_size + eos_token_id[i]] = -1e4;
}
}
}
__global__ void update_repeat_times(const int64_t *pre_ids,
const int64_t *prompt_ids,
const int64_t *prompt_len,
const int64_t *cur_len,
int *repeat_times,
int *is_repeated,
const int64_t bs,
const int64_t length,
const int64_t length_id) {
int bi = blockIdx.x;
const int64_t vocab_size,
const int64_t max_dec_len,
const int64_t max_model_len) {
int64_t bi = blockIdx.x;
if (cur_len[bi] < 0) {
return;
}
int tid = threadIdx.x;
const int64_t *pre_ids_now = pre_ids + bi * length_id;
int *repeat_times_now = repeat_times + bi * length;
for (int i = tid; i < length_id; i += blockDim.x) {
int64_t id = pre_ids_now[i];
if (id < 0) break;
atomicAdd(&repeat_times_now[id], 1);
const int64_t prompt_len_now = prompt_len[bi];
int64_t tid = threadIdx.x;
const int64_t *prompt_now = prompt_ids + bi * max_model_len;
const int64_t *pre_ids_now = pre_ids + bi * max_dec_len;
int *repeat_times_now = repeat_times + bi * vocab_size;
int *is_repeated_now = is_repeated + bi * vocab_size;
const int64_t loop_len = prompt_len_now > max_dec_len ? prompt_len_now : max_dec_len;
for (int64_t i = tid; i < loop_len; i += blockDim.x) {
if (i < max_dec_len) {
int64_t id = pre_ids_now[i];
if (id >= 0) {
atomicAdd(&repeat_times_now[id], 1);
atomicAdd(&is_repeated_now[id], 1);
}
}
if (i < prompt_len_now) {
int64_t id = prompt_now[i];
if (id >= 0) {
atomicAdd(&is_repeated_now[id], 1);
}
}
}
}
template <typename T>
__global__ void update_value_by_repeat_times(const int *repeat_times,
const int *is_repeated,
const T *penalty_scores,
const T *frequency_score,
const T *presence_score,
const float *temperatures,
T *logits,
const int64_t bs,
const int64_t length) {
const int64_t vocab_size) {
int bi = blockIdx.x;
int tid = threadIdx.x;
T *logits_now = logits + bi * length;
const int *repeat_times_now = repeat_times + bi * length;
T *logits_now = logits + bi * vocab_size;
const int *repeat_times_now = repeat_times + bi * vocab_size;
const int *is_repeated_now = is_repeated + bi * vocab_size;
float alpha = static_cast<float>(penalty_scores[bi]);
float beta = static_cast<float>(frequency_score[bi]);
float gamma = static_cast<float>(presence_score[bi]);
for (int i = tid; i < length; i += blockDim.x) {
for (int i = tid; i < vocab_size; i += blockDim.x) {
int times = repeat_times_now[i];
float logit_now = static_cast<float>(logits_now[i]);
if (times != 0) {
if (is_repeated_now[i] != 0) {
logit_now = logit_now < 0 ? logit_now * alpha : logit_now / alpha;
}
if (times != 0) {
logit_now = logit_now - times * beta - gamma;
}
logits_now[i] = static_cast<T>(logit_now / temperatures[bi]);
@@ -106,20 +128,22 @@ template <typename T>
__global__ void ban_bad_words(T *logits,
const int64_t *bad_words_list,
const int64_t bs,
const int64_t length,
const int64_t bad_words_length) {
const int64_t vocab_size,
const int64_t bad_words_len) {
const int bi = blockIdx.x;
int tid = threadIdx.x;
T *logits_now = logits + bi * length;
for (int i = tid; i < bad_words_length; i += blockDim.x) {
T *logits_now = logits + bi * vocab_size;
for (int i = tid; i < bad_words_len; i += blockDim.x) {
const int64_t bad_words_token_id = bad_words_list[i];
if (bad_words_token_id >= length || bad_words_token_id < 0) continue;
if (bad_words_token_id >= vocab_size || bad_words_token_id < 0) continue;
logits_now[bad_words_token_id] = -1e10;
}
}
template <paddle::DataType D>
void token_penalty_multi_scores_kernel(const paddle::Tensor &pre_ids,
const paddle::Tensor &prompt_ids,
const paddle::Tensor &prompt_len,
const paddle::Tensor &logits,
const paddle::Tensor &penalty_scores,
const paddle::Tensor &frequency_score,
@@ -141,12 +165,15 @@ void token_penalty_multi_scores_kernel(const paddle::Tensor &pre_ids,
std::vector<int64_t> shape = logits.shape();
auto repeat_times =
paddle::full(shape, 0, paddle::DataType::INT32, pre_ids.place());
auto is_repeated =
paddle::full(shape, 0, paddle::DataType::INT32, pre_ids.place());
int64_t bs = shape[0];
int64_t length = shape[1];
int64_t length_id = pre_ids.shape()[1];
int64_t length_bad_words = bad_tokens.shape()[0];
int64_t end_length = eos_token_id.shape()[0];
int64_t vocab_size = shape[1];
int64_t max_dec_len = pre_ids.shape()[1];
int64_t bad_words_len = bad_tokens.shape()[0];
int64_t eos_len = eos_token_id.shape()[0];
int64_t max_model_len = prompt_ids.shape()[1];
int block_size = (bs + WARP_SIZE - 1) / WARP_SIZE * WARP_SIZE;
min_length_logits_process<<<1, block_size, 0, cu_stream>>>(
@@ -156,10 +183,10 @@ void token_penalty_multi_scores_kernel(const paddle::Tensor &pre_ids,
min_len.data<int64_t>(),
eos_token_id.data<int64_t>(),
bs,
length,
end_length);
vocab_size,
eos_len);
block_size = (length_id + WARP_SIZE - 1) / WARP_SIZE * WARP_SIZE;
block_size = (max_dec_len + WARP_SIZE - 1) / WARP_SIZE * WARP_SIZE;
#ifdef PADDLE_WITH_COREX
block_size = std::min(block_size, 512);
#else
@@ -167,13 +194,17 @@ void token_penalty_multi_scores_kernel(const paddle::Tensor &pre_ids,
#endif
update_repeat_times<<<bs, block_size, 0, cu_stream>>>(
pre_ids.data<int64_t>(),
prompt_ids.data<int64_t>(),
prompt_len.data<int64_t>(),
cur_len.data<int64_t>(),
repeat_times.data<int>(),
is_repeated.data<int>(),
bs,
length,
length_id);
vocab_size,
max_dec_len,
max_model_len);
block_size = (length + WARP_SIZE - 1) / WARP_SIZE * WARP_SIZE;
block_size = (vocab_size + WARP_SIZE - 1) / WARP_SIZE * WARP_SIZE;
#ifdef PADDLE_WITH_COREX
block_size = std::min(block_size, 512);
#else
@@ -181,6 +212,7 @@ void token_penalty_multi_scores_kernel(const paddle::Tensor &pre_ids,
#endif
update_value_by_repeat_times<DataType_><<<bs, block_size, 0, cu_stream>>>(
repeat_times.data<int>(),
is_repeated.data<int>(),
reinterpret_cast<DataType_ *>(
const_cast<data_t *>(penalty_scores.data<data_t>())),
reinterpret_cast<DataType_ *>(
@@ -191,9 +223,9 @@ void token_penalty_multi_scores_kernel(const paddle::Tensor &pre_ids,
reinterpret_cast<DataType_ *>(
const_cast<data_t *>(logits.data<data_t>())),
bs,
length);
vocab_size);
block_size = (length_bad_words + WARP_SIZE - 1) / WARP_SIZE * WARP_SIZE;
block_size = (bad_words_len + WARP_SIZE - 1) / WARP_SIZE * WARP_SIZE;
#ifdef PADDLE_WITH_COREX
block_size = std::min(block_size, 512);
#else
@@ -204,11 +236,13 @@ void token_penalty_multi_scores_kernel(const paddle::Tensor &pre_ids,
const_cast<data_t *>(logits.data<data_t>())),
bad_tokens.data<int64_t>(),
bs,
length,
length_bad_words);
vocab_size,
bad_words_len);
}
void TokenPenaltyMultiScores(const paddle::Tensor &pre_ids,
const paddle::Tensor &prompt_ids,
const paddle::Tensor &prompt_len,
const paddle::Tensor &logits,
const paddle::Tensor &penalty_scores,
const paddle::Tensor &frequency_scores,
@@ -222,6 +256,8 @@ void TokenPenaltyMultiScores(const paddle::Tensor &pre_ids,
case paddle::DataType::BFLOAT16: {
return token_penalty_multi_scores_kernel<
paddle::DataType::BFLOAT16>(pre_ids,
prompt_ids,
prompt_len,
logits,
penalty_scores,
frequency_scores,
@@ -233,30 +269,34 @@ void TokenPenaltyMultiScores(const paddle::Tensor &pre_ids,
eos_token_id);
}
case paddle::DataType::FLOAT16: {
return token_penalty_multi_scores_kernel<paddle::DataType::FLOAT16>(
pre_ids,
logits,
penalty_scores,
frequency_scores,
presence_scores,
temperatures,
bad_tokens,
cur_len,
min_len,
eos_token_id);
return token_penalty_multi_scores_kernel<
paddle::DataType::FLOAT16>(pre_ids,
prompt_ids,
prompt_len,
logits,
penalty_scores,
frequency_scores,
presence_scores,
temperatures,
bad_tokens,
cur_len,
min_len,
eos_token_id);
}
case paddle::DataType::FLOAT32: {
return token_penalty_multi_scores_kernel<paddle::DataType::FLOAT32>(
pre_ids,
logits,
penalty_scores,
frequency_scores,
presence_scores,
temperatures,
bad_tokens,
cur_len,
min_len,
eos_token_id);
return token_penalty_multi_scores_kernel<
paddle::DataType::FLOAT32>(pre_ids,
prompt_ids,
prompt_len,
logits,
penalty_scores,
frequency_scores,
presence_scores,
temperatures,
bad_tokens,
cur_len,
min_len,
eos_token_id);
}
default: {
PD_THROW(
@@ -269,6 +309,8 @@ void TokenPenaltyMultiScores(const paddle::Tensor &pre_ids,
PD_BUILD_STATIC_OP(get_token_penalty_multi_scores)
.Inputs({"pre_ids",
"prompt_ids",
"prompt_len",
"logits",
"penalty_scores",
"frequency_scores",

View File

@@ -25,9 +25,9 @@ Verified platform:
```bash
mkdir Work
cd Work
docker pull ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.0.0
docker pull ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.0.3
docker run --name fastdeploy-xpu --net=host -itd --privileged -v $PWD:/Work -w /Work \
ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.0.0 \
ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.0.3 \
/bin/bash
docker exec -it fastdeploy-xpu /bin/bash
```
@@ -49,7 +49,7 @@ python -m pip install --pre paddlepaddle-xpu -i https://www.paddlepaddle.org.cn/
### Install FastDeploy (**Do NOT install via PyPI source**)
```bash
python -m pip install fastdeploy-xpu==2.0.0 -i https://www.paddlepaddle.org.cn/packages/stable/fastdeploy-xpu-p800/ --extra-index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
python -m pip install fastdeploy-xpu==2.0.3 -i https://www.paddlepaddle.org.cn/packages/stable/fastdeploy-xpu-p800/ --extra-index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
```
Alternatively, you can install the latest version of FastDeploy (Not recommended)
@@ -118,5 +118,5 @@ python -c "from fastdeploy.model_executor.ops.xpu import block_attn"
If all the above steps execute successfully, FastDeploy is installed correctly.
## How to deploy services on kunlunxin XPU
Refer to [**Supported Models and Service Deployment**](../../usage/kunlunxin_xpu_deployment.md) for the details about the supported models and the way to deploy services on kunlunxin XPU.
## How to deploy services on Kunlunxin XPU
Refer to [**Supported Models and Service Deployment**](../../usage/kunlunxin_xpu_deployment.md) for the details about the supported models and the way to deploy services on Kunlunxin XPU.

View File

@@ -33,9 +33,8 @@ When using FastDeploy to deploy models (including offline inference and service
| ```long_prefill_token_threshold``` | `int` | When Chunked Prefill is enabled, requests with token count exceeding this value are considered long requests, default: max_model_len*0.04 |
| ```static_decode_blocks``` | `int` | During inference, each request is forced to allocate corresponding number of blocks from Prefill's KVCache for Decode use, default: 2 |
| ```reasoning_parser``` | `str` | Specify the reasoning parser to extract reasoning content from model output |
| ```enable_static_graph_inference``` | `bool` | Whether to use static graph inference mode, default: False |
| ```use_cudagraph``` | `bool` | Whether to use cuda graph, default: False |
| ```max_capture_batch_size``` | `int` | When cuda graph is enabled, maximum batch size of captured cuda graph, default: 64 |
| ```graph_optimization_config``` | `str` | Parameters related to graph optimization can be configured; default value: '{"use_cudagraph":false, "graph_opt_level":0, "cudagraph_capture_sizes": null }' |
| ```enable_custom_all_reduce``` | `bool` | Enable Custom all-reduce, default: False |
| ```splitwise_role``` | `str` | Whether to enable splitwise inference, default value: mixed, supported parameters: ["mixed", "decode", "prefill"] |
| ```innode_prefill_ports``` | `str` | Internal engine startup ports for prefill instances (only required for single-machine PD separation), default: None |
@@ -72,20 +71,53 @@ When `enable_chunked_prefill` is enabled, the service processes long input seque
To optimize scheduling priority for short requests, a new `max_long_partial_prefills` and `long_prefill_token_threshold` parameter combination has been added. The former limits the number of long requests in a single prefill batch, the latter defines the token threshold for a long request. The system prioritizes batch space for short requests, thereby reducing short-request latency in mixed workload scenarios while maintaining stable throughput.
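As a hedged illustration (not part of the diff above), these scheduling knobs would be passed like any other engine argument; the sketch below assumes the offline `LLM` entry point forwards them to the engine, and the model name and values are placeholders:

```python
from fastdeploy import LLM, SamplingParams  # offline entry point; assumed to accept engine args as kwargs

llm = LLM(
    model="PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle",  # placeholder model
    enable_chunked_prefill=True,
    max_num_batched_tokens=2048,        # chunk size per prefill step
    max_long_partial_prefills=2,        # at most 2 long requests per prefill batch
    long_prefill_token_threshold=1024,  # prompts above 1024 tokens count as "long"
)
outputs = llm.generate(["Explain chunked prefill in one sentence."],
                       SamplingParams(max_tokens=64))
print(outputs[0])
```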
## 4. GraphOptimizationBackend related configuration parameters
Currently, only the following parameters can be configured by the user:
- `use_cudagraph` : bool = False
- `graph_optimization_config` : Dict[str, Any]
- `graph_opt_level`: int = 0
- `use_cudagraph`: bool = False
- `cudagraph_capture_sizes` : List[int] = None
### Static graph inference related parameters
CudaGraph can be enabled by setting `--use-cudagraph` or `--graph-optimization-config '{"use_cudagraph":true}'`. Using both methods to enable CudaGraph at the same time may cause conflicts.
The `graph_opt_level` parameter within `--graph-optimization-config` is used to configure the graph optimization level, with the following available options:
- `0`: Use dynamic compute graph (default)
- `1`: Use static compute graph; during the initialization phase, the Paddle API is used to convert the dynamic graph into a static graph
- `2`: On top of the static compute graph, use Paddle's compiler (CINN, Compiler Infrastructure for Neural Networks) to compile and optimize
In general, static graphs have lower kernel launch overhead than dynamic graphs, so using static graphs is recommended.
For adapted models, FastDeploy's CudaGraph **can support both dynamic and static graphs** simultaneously.
When CudaGraph is enabled with the default configuration, the list of batch sizes that CudaGraph needs to capture is set automatically based on the `max_num_seqs` parameter. The list is generated as follows:
1. Generate a candidate list of batch sizes in the range [1, 1024].
```
# Batch Size [1, 2, 4, 8, 16, ... 120, 128]
candidate_capture_sizes = [1, 2, 4] + [8 * i for i in range(1, 17)]
# Batch Size (128, 144, ... 240, 256]
candidate_capture_sizes += [16 * i for i in range(9, 17)]
# Batch Size (256, 288, ... 992, 1024]
candidate_capture_sizes += [32 * i for i in range(17, 33)]
```
2. Crop the candidate list based on the user-configured `max_num_seqs` to obtain the CudaGraph capture list, whose range is [1, `max_num_seqs`]. A short sketch of this generate-and-crop logic follows the example below.
Users can also customize the list of batch sizes to be captured by CudaGraph through the `cudagraph_capture_sizes` parameter in `--graph-optimization-config`:
```
--graph-optimization-config '{"cudagraph_capture_sizes": [1, 3, 5, 7, 9]}'
```
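A small Python sketch of the generate-and-crop logic described in steps 1 and 2 above; it mirrors the documented candidate list rather than the exact implementation:

```python
def default_capture_sizes(max_num_seqs: int) -> list[int]:
    # Step 1: candidate batch sizes (same expressions as the snippet above)
    candidates = [1, 2, 4] + [8 * i for i in range(1, 17)]
    candidates += [16 * i for i in range(9, 17)]
    candidates += [32 * i for i in range(17, 33)]
    # Step 2: crop to the user-configured max_num_seqs
    return [bs for bs in candidates if bs <= max_num_seqs]

print(default_capture_sizes(64))  # [1, 2, 4, 8, 16, 24, 32, 40, 48, 56, 64]
```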
- When ```enable_static_graph_inference``` is enabled, dynamic-to-static graph conversion will be performed, using static graph for inference.
### CudaGraph related parameters
For adapted models, FastDeploy's CudaGraph can support both dynamic and static graphs. Using CudaGraph incurs some additional memory overhead, divided into two categories in FastDeploy:
Using CudaGraph incurs some additional memory overhead, divided into two categories in FastDeploy:
* Additional input Buffer overhead
* CudaGraph uses a dedicated memory pool, and therefore holds some intermediate activation memory that is isolated from the main framework
During initialization, FastDeploy first uses the `gpu_memory_utilization` parameter to calculate the memory available for `KVCache`; only after `KVCache` is initialized is the remaining memory used to initialize CudaGraph. Since CudaGraph is not enabled by default yet, using the default startup parameters may trigger `Out of memory` errors. The following solutions can be tried (a brief launch sketch appears at the end of this section):
* Lower `gpu_memory_utilization` value, reserve more memory for CudaGraph.
* Lower `max_capture_batch_size` value, reduce CudaGraph memory usage, but also reduce CudaGraph usage during inference.
* Lower `max_num_seqs` to decrease the maximum concurrency.
* Customize the list of batch sizes that CudaGraph needs to capture via `cudagraph_capture_sizes` in `graph_optimization_config`, reducing the number of captured graphs.
- Before use, make sure the loaded model is properly decorated with ```@support_graph_optimization```.
@@ -118,4 +150,3 @@ FastDeploy initialization sequence first uses `gpu_memory_utilization` parameter
```
- When ```use_cudagraph``` is enabled, only single-GPU inference is currently supported, i.e. ```tensor_parallel_size``` is set to 1.
- When ```use_cudagraph``` is enabled, cannot enable ```enable_prefix_caching``` or ```enable_chunked_prefill```.
- When ```use_cudagraph``` is enabled, batches with size ≤ ```max_capture_batch_size``` will be executed by CudaGraph, batches > ```max_capture_batch_size``` will be executed by original dynamic/static graph. To have all batch sizes executed by CudaGraph, ```max_capture_batch_size``` value should match ```max_num_seqs```. ```max_capture_batch_size``` > ```max_num_seqs``` will cause waste by capturing batches that won't be encountered during inference, occupying more time and memory.
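If CudaGraph runs out of memory under the default settings, the mitigations listed earlier in this section translate into startup flags roughly like the following; the JSON is built here only to show the expected shape of `--graph-optimization-config`, and the model and values are placeholders:

```python
import json
import shlex

graph_opt = {"use_cudagraph": True, "cudagraph_capture_sizes": [1, 2, 4, 8]}
cmd = (
    "python -m fastdeploy.entrypoints.openai.api_server"
    " --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle"   # placeholder model
    " --max-num-seqs 8"                                # lower concurrency, fewer graphs to capture
    " --gpu-memory-utilization 0.8"                    # leave more headroom for CudaGraph
    f" --graph-optimization-config {shlex.quote(json.dumps(graph_opt))}"
)
print(cmd)
```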

View File

@@ -1,20 +1,14 @@
## Supported Models
|Model Name|Context Length|Quantization|XPUs Required|Deployment Commands|
|-|-|-|-|-|
|ERNIE-4.5-300B-A47B|32K|WINT8|8|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-300B-A47B|32K|WINT4|4 (recommend)|export XPU_VISIBLE_DEVICES="0,1,2,3" or "4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 4 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-300B-A47B|32K|WINT4|8|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-300B-A47B|128K|WINT4|8 (recommend)|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|32K|BF16|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|32K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|32K|WINT4|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|128K|BF16|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|128K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|128K|WINT4|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-0.3B|32K|BF16|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-0.3B|32K|WINT8|1|export XPU_VISIBLE_DEVICES="x" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-0.3B|128K|BF16|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-0.3B|128K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|
|Model Name|Context Length|Quantization|XPUs Required|Deployment Commands|Minimum Version Required|
|-|-|-|-|-|-|
|ERNIE-4.5-300B-A47B|32K|WINT8|8|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-300B-A47B|32K|WINT4|4 (recommend)|export XPU_VISIBLE_DEVICES="0,1,2,3" or "4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 4 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.0.0|
|ERNIE-4.5-300B-A47B|32K|WINT4|8|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.0.0|
|ERNIE-4.5-300B-A47B|128K|WINT4|8 (recommend)|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.0.0|
|ERNIE-4.5-0.3B|32K|BF16|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-0.3B|32K|WINT8|1|export XPU_VISIBLE_DEVICES="x" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-0.3B|128K|BF16|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-0.3B|128K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
## Quick start

View File

@@ -25,9 +25,9 @@
```bash
mkdir Work
cd Work
docker pull ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.0.0
docker pull ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.0.3
docker run --name fastdeploy-xpu --net=host -itd --privileged -v $PWD:/Work -w /Work \
ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.0.0 \
ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.0.3 \
/bin/bash
docker exec -it fastdeploy-xpu /bin/bash
```
@@ -49,7 +49,7 @@ python -m pip install --pre paddlepaddle-xpu -i https://www.paddlepaddle.org.cn/
### 安装 FastDeploy**注意不要通过 pypi 源安装**
```bash
python -m pip install fastdeploy-xpu==2.0.0 -i https://www.paddlepaddle.org.cn/packages/stable/fastdeploy-xpu-p800/ --extra-index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
python -m pip install fastdeploy-xpu==2.0.3 -i https://www.paddlepaddle.org.cn/packages/stable/fastdeploy-xpu-p800/ --extra-index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
```
或者你也可以安装最新版 FastDeploy不推荐
@@ -119,5 +119,5 @@ python -c "from fastdeploy.model_executor.ops.xpu import block_attn"
如果上述步骤均执行成功,代表 FastDeploy 已安装成功。
## 如何在昆仑XPU上部署服务
## 如何在昆仑XPU 上部署服务
请参考 [**支持的模型与服务部署**](../../usage/kunlunxin_xpu_deployment.md) 以了解昆仑芯 XPU 支持的模型与服务部署方法。

View File

@@ -32,9 +32,8 @@
| ```long_prefill_token_threshold``` | `int` | 开启Chunked Prefill时请求Token数超过此值的请求被视为长请求默认为max_model_len*0.04 |
| ```static_decode_blocks``` | `int` | 推理过程中每条请求强制从Prefill的KVCache分配对应块数给Decode使用默认2|
| ```reasoning_parser``` | `str` | 指定要使用的推理解析器,以便从模型输出中提取推理内容 |
| ```enable_static_graph_inference```| `bool` | 是否使用静态图推理模式默认False |
| ```use_cudagraph``` | `bool` | 是否使用cuda graph默认False |
| ```max_capture_batch_size``` | `int` | 开启 cuda graph 时,捕获的 cuda graph的最大batch size默认为64 |
| ```graph_optimization_config``` | `str` | 可以配置计算图优化相关的参数,默认值为'{"use_cudagraph":false, "graph_opt_level":0, "cudagraph_capture_sizes": null }' |
| ```enable_custom_all_reduce``` | `bool` | 开启Custom all-reduce默认False |
| ```splitwise_role``` | `str` | 是否开启splitwise推理默认值mixed 支持参数为["mixed", "decode", "prefill"] |
| ```innode_prefill_ports``` | `str` | prefill 实例内部引擎启动端口 仅单机PD分离需要默认值None |
@@ -70,22 +69,53 @@ FastDeploy在推理过程中显存被```模型权重```、```预分配KVCache
为优化短请求的调度优先级,新增 `max_long_partial_prefills` 与 `long_prefill_token_threshold` 参数组合。前者限制单个预填充批次中的长请求数量后者定义长请求的token阈值。系统会优先保障短请求的批处理空间从而在混合负载场景下降低短请求延迟同时保持整体吞吐稳定。
## 4. GraphOptimizationBackend 相关配置参数说明
当前仅支持用户配置以下参数:
- `use_cudagraph` : bool = False
- `graph_optimization_config` : Dict[str, Any]
- `graph_opt_level`: int = 0
- `use_cudagraph`: bool = False
- `cudagraph_capture_sizes` : List[int] = None
### 动态图转静态图相关参数说明
可以通过设置 `--use-cudagraph` 或 `--graph-optimization-config '{"use_cudagraph":true}'` 开启 CudaGraph。
`--graph-optimization-config` 中的 `graph_opt_level` 参数用于配置图优化等级,可选项如下:
- `0`: 动态图,默认为 0
- `1`: 静态图,初始化阶段会使用 Paddle API 将动态图转换为静态图
- `2`: 在静态图的基础上,使用 Paddle 框架编译器CINN, Compiler Infrastructure for Neural Networks进行编译优化
一般情况下静态图比动态图的 Kernel Launch 开销更小,推荐使用静态图。
对于已适配的模型FastDeploy 的 CudaGraph **可同时支持动态图与静态图**。
在默认配置下开启 CudaGraph 时,会根据 `max_num_seqs` 参数自动设置 CudaGraph 需要捕获的 Batch Size 列表,需要捕获的 Batch Size 的列表自动生成逻辑如下:
1. 生成一个范围为 [1,1024] Batch Size 的候选列表
```
# Batch Size [1, 2, 4, 8, 16, ... 120, 128]
candidate_capture_sizes = [1, 2, 4] + [8 * i for i in range(1, 17)]
# Batch Size (128, 144, ... 240, 256]
candidate_capture_sizes += [16 * i for i in range(9, 17)]
# Batch Size (256, 288, ... 992, 1024]
candidate_capture_sizes += [32 * i for i in range(17, 33)]
```
2. 根据用户设置的 `max_num_seqs` 裁剪候选列表,得到范围为 [1, `max_num_seqs`] 的 CudaGraph 捕获列表。
用户也可以通过 `--graph-optimization-config` 中的 `cudagraph_capture_sizes` 参数自定义需要被 CudaGraph 捕获的 Batch Size 列表:
```
--graph-optimization-config '{"cudagraph_capture_sizes": [1, 3, 5, 7, 9]}'
```
- 当开启 ```enable_static_graph_inference```时,会执行动态图转静态图,使用静态图进行推理。
### CudaGraph相关参数说明
对于已适配的模型FastDeploy 的 CudaGraph 可同时支持动态图与静态图。使用 CudaGraph 会产生一些额外的显存开销在FastDeploy中分为下面两类
使用 CudaGraph 会产生一些额外的显存开销在FastDeploy中分为下面两类
* 额外的输入 Buffer 开销
* CudaGraph 使用了专用的显存池,因此会持有一部分与主框架隔离的中间激活显存
FastDeploy 的初始化顺序为先使用 `gpu_memory_utilization` 参数计算 `KVCache` 可用的显存,初始化完 `KVCache` 之后才会使用剩余显存初始化 CudaGraph。由于 CudaGraph 目前还不是默认开启的,因此使用默认启动参数可能会遇到 `Out of memory` 错误,可以尝试使用下面种方式解决:
FastDeploy 的初始化顺序为先使用 `gpu_memory_utilization` 参数计算 `KVCache` 可用的显存,初始化完 `KVCache` 之后才会使用剩余显存初始化 CudaGraph。由于 CudaGraph 目前还不是默认开启的,因此使用默认启动参数可能会遇到 `Out Of Memory` 错误,可以尝试使用下面种方式解决:
* 调低 `gpu_memory_utilization` 的值多预留一些显存给CudaGraph使用。
* 调低 `max_capture_batch_size` 的值, 减少CudaGraph的显存占用同时也会降低推理时CudaGraph的使用率
* 调低 `max_num_seqs` 的值,降低最大并发数
* 通过 `graph_optimization_config` 自定义需要 CudaGraph 捕获的 Batch Size 列表 `cudagraph_capture_sizes`,减少捕获的图的数量
- 使用之前,需要确保加载的模型被装饰器 ```@support_graph_optimization```正确修饰。
使用CudaGraph之前需要确保加载的模型被装饰器 ```@support_graph_optimization```正确修饰。
```python
# 1. import 装饰器
@@ -116,4 +146,3 @@ FastDeploy 的初始化顺序为先使用 `gpu_memory_utilization` 参数计算
```
- 当开启 ```use_cudagraph``` 时,暂时只支持单卡推理,即 ```tensor_parallel_size``` 设为1。
- 当开启 ```use_cudagraph``` 时,暂不支持开启 ```enable_prefix_caching``` 或 ```enable_chunked_prefill``` 。
- 当开启 ```use_cudagraph``` 后size小于等于 ```max_capture_batch_size``` 的batch会由CudaGraph来执行前向计算大于 ```max_capture_batch_size``` 的batch会由原本的动态图/静态图执行前向计算。如果希望所有batch size均由CudaGraph来执行```max_capture_batch_size``` 的值建议与 ```max_num_seqs``` 一致。```max_capture_batch_size``` 大于 ```max_num_seqs``` 会导致浪费会多捕获一些推理时不会遇到的batch占用更多时间与显存。

View File

@@ -1,20 +1,14 @@
## 支持的模型
|模型名|上下文长度|量化|所需卡数|部署命令|
|-|-|-|-|-|
|ERNIE-4.5-300B-A47B|32K|WINT8|8|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-300B-A47B|32K|WINT4|4 (推荐)|export XPU_VISIBLE_DEVICES="0,1,2,3" or "4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 4 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-300B-A47B|32K|WINT4|8|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-300B-A47B|128K|WINT4|8 (推荐)|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|32K|BF16|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|32K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|32K|WINT4|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|128K|BF16|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|128K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|128K|WINT4|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-0.3B|32K|BF16|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-0.3B|32K|WINT8|1|export XPU_VISIBLE_DEVICES="x" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-0.3B|128K|BF16|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-0.3B|128K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|
|模型名|上下文长度|量化|所需卡数|部署命令|最低版本要求|
|-|-|-|-|-|-|
|ERNIE-4.5-300B-A47B|32K|WINT8|8|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-300B-A47B|32K|WINT4|4 (推荐)|export XPU_VISIBLE_DEVICES="0,1,2,3" or "4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 4 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.0.0|
|ERNIE-4.5-300B-A47B|32K|WINT4|8|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.0.0|
|ERNIE-4.5-300B-A47B|128K|WINT4|8 (推荐)|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.0.0|
|ERNIE-4.5-0.3B|32K|BF16|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-0.3B|32K|WINT8|1|export XPU_VISIBLE_DEVICES="x" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-0.3B|128K|BF16|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-0.3B|128K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
## 快速开始

View File

@@ -37,6 +37,25 @@ class MoEPhase(Enum):
PREFILL = 1
DECODER = 2
class ErnieArchitectures:
"""Helper class for ERNIE architecture check."""
ARCHITECTURES = {
"Ernie4_5_ForCausalLM",
"Ernie4_5_MoeForCausalLM",
"Ernie4_5_VLMoeForConditionalGeneration"
}
@classmethod
def contains_ernie_arch(cls, architectures):
"""Check if any ERNIE architecture is present in the given architectures."""
return any(arch in architectures for arch in cls.ARCHITECTURES)
@classmethod
def is_ernie_arch(cls, architecture):
"""Check if the given architecture is an ERNIE architecture."""
return architecture in cls.ARCHITECTURES
PRETRAINED_INIT_CONFIGURATION = {
"rope_theta" : 10000.0,
"num_key_value_heads" : -1,
@@ -46,7 +65,6 @@ PRETRAINED_INIT_CONFIGURATION = {
"num_max_dispatch_tokens_per_rank" : 256,
"moe_use_aux_free" : False,
"vocab_size" : -1,
"use_rope": True,
"hidden_dropout_prob" : 0.0,
"initializer_range" : 0.02,
"max_position_embeddings" : 512,
@@ -70,7 +88,7 @@ class ModelConfig:
self.stop_seqs_max_len = 8
# NOTE(gongshaotain): form _load_model_init_val()
self.top_p = 0.0
self.top_p = 1.0
self.temperature = 1.0
self.rope_theta = 10000.0
self.penalty_score = 1.0
@@ -89,6 +107,7 @@ class ModelConfig:
if hasattr(self, key):
setattr(self, key, value)
assert self.model_name_or_path != ""
pretrained_config, _ = PretrainedConfig.get_config_dict(self.model_name_or_path)
self.pretrained_config = PretrainedConfig.from_dict(pretrained_config)
@@ -108,9 +127,10 @@ class ModelConfig:
self.vision_config = PretrainedConfig.from_dict(self.vision_config)
self.ori_vocab_size = self.vocab_size
if "Ernie4_5_ForCausalLM" in self.architectures or "Ernie4_5_MoeForCausalLM" in self.architectures:
if ErnieArchitectures.contains_ernie_arch(self.architectures):
self.ori_vocab_size = args["ori_vocab_size"]
class ParallelConfig:
"""Configuration for the distributed execution."""
def __init__(
@@ -317,7 +337,7 @@ class GraphOptimizationConfig:
pre-compute the mapping from batch size to padded graph size
"""
# Regular capture sizes
self.cudagraph_capture_sizes = [size for size in self.cudagraph_capture_sizes if size < max_num_seqs]
self.cudagraph_capture_sizes = [size for size in self.cudagraph_capture_sizes if size <= max_num_seqs]
dedup_sizes = list(set(self.cudagraph_capture_sizes))
if len(dedup_sizes) < len(self.cudagraph_capture_sizes):
logger.info(("cudagraph sizes specified by model runner"

View File

@@ -124,9 +124,19 @@ class EngineArgs:
Ratio of tokens to process in a block.
"""
pod_ips: Optional[List[str]] = None
dist_init_ip: Optional[str] = None
"""
List of IP addresses for nodes in the cluster.
The master node ip of multinode deployment
"""
nnodes: int = 1
"""
The number of nodes in multinode deployment
"""
node_rank: int = 0
"""
The rank of the current node in multinode deployment
"""
swap_space: float = None
@@ -485,11 +495,25 @@ class EngineArgs:
# Cluster system parameters group
system_group = parser.add_argument_group("System Configuration")
system_group.add_argument(
"--pod-ips",
type=lambda s: s.split(",") if s else None,
default=EngineArgs.pod_ips,
"--dist-init-ip",
default=EngineArgs.dist_init_ip,
help=
"List of IP addresses for nodes in the cluster (comma-separated).")
"IP addresses of master node.")
system_group.add_argument(
"--nnodes",
type=int,
default=EngineArgs.nnodes,
help=
"The number of all nodes.")
system_group.add_argument(
"--node-rank",
type=int,
default=EngineArgs.node_rank,
help=
"node rank id (range [0, nnodes)).")
# Performance tuning parameters group
@@ -789,7 +813,9 @@ class EngineArgs:
max_num_seqs=self.max_num_seqs,
speculative_config=speculative_cfg,
max_num_batched_tokens=self.max_num_batched_tokens,
pod_ips=self.pod_ips,
dist_init_ip=self.dist_init_ip,
nnodes=self.nnodes,
node_rank=self.node_rank,
use_warmup=self.use_warmup,
engine_worker_queue_port=self.engine_worker_queue_port,
limit_mm_per_prompt=self.limit_mm_per_prompt,
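The diff above replaces `--pod-ips` with `--dist-init-ip`, `--nnodes`, and `--node-rank`. A hedged sketch of what a two-node launch might look like with the new flags; the addresses, model, and parallel sizes are placeholders, and each node runs the same command with its own `--node-rank`:

```python
# Node 0 (the master); node 1 would pass --node-rank 1 and the same --dist-init-ip.
launch_cmd = [
    "python", "-m", "fastdeploy.entrypoints.openai.api_server",
    "--model", "PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle",  # placeholder model
    "--tensor-parallel-size", "16",                        # placeholder: ranks span both nodes
    "--dist-init-ip", "192.168.1.10",                      # master node IP (placeholder)
    "--nnodes", "2",
    "--node-rank", "0",
    "--port", "8188",
]
print(" ".join(launch_cmd))
```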

View File

@@ -6,7 +6,7 @@
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#dist_init_ip
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,7 +24,7 @@ from fastdeploy import envs
from fastdeploy.platforms import current_platform
from fastdeploy.scheduler import SchedulerConfig
from fastdeploy.utils import (ceil_div, check_unified_ckpt, get_host_ip,
is_port_available, llm_logger)
is_port_available, get_random_port, llm_logger)
TaskOption = Literal["generate"]
@@ -642,7 +642,9 @@ class Config:
max_model_len: int = 8192,
max_num_seqs: int = 8,
max_num_batched_tokens: Optional[int] = None,
pod_ips: Optional[List[str]] = None,
dist_init_ip: str = None,
nnodes: int = 1,
node_rank: int = 0,
speculative_config: Optional[Dict[str, Any]] = None,
graph_optimization_config: Optional[Dict[str, Any]] = None,
use_warmup: bool = False,
@@ -675,7 +677,6 @@ class Config:
max_model_len (int): Maximum model length. Default is 8192.
max_num_seqs (int): Maximum number of sequences. Default is 8.
max_num_batched_tokens (Optional[int]): Maximum number of batched tokens. Default is None.
pod_ips (Optional[List[str]]): List of POD IPs. Default is None.
mm_processor_kwargs (Optional[Dict[str, Any]]): Additional arguments for multi-modal processor. Default is None.
speculative_config (Optional[Dict[str, Any]]): Speculative execution configuration. Default is None.
graph_optimization_config (Optional[Dict[str, Any]]): Graph optimizaion backend execution configuration. Default is None.
@@ -699,7 +700,16 @@ class Config:
self.tokenizer = tokenizer
self.max_num_batched_tokens = max_num_batched_tokens
self.tensor_parallel_size = tensor_parallel_size
self.pod_ips = pod_ips
self.dist_init_ip = dist_init_ip
self.nnode = nnodes
self.node_rank = node_rank
if self.dist_init_ip is None:
self.master_ip = "0.0.0.0"
else:
self.master_ip = self.dist_init_ip
self.dist_init_addr = f"{self.dist_init_ip}:{get_random_port()}"
self.max_model_len = max_model_len
self.max_num_seqs = max_num_seqs
self.limit_mm_per_prompt = limit_mm_per_prompt
@@ -716,14 +726,8 @@ class Config:
self.graph_optimization_config = graph_optimization_config
self.guided_decoding_backend = guided_decoding_backend
self.disable_any_whitespace = disable_any_whitespace
self.is_master = True
self._str_to_list("innode_prefill_ports", int)
self._str_to_list("pod_ips", str)
if self.pod_ips is None:
self.nnode = 1
else:
self.nnode = len(self.pod_ips)
assert self.splitwise_role in ["mixed", "prefill", "decode"]
@@ -778,9 +782,9 @@ class Config:
self.host_ip = get_host_ip()
if self.pod_ips is None:
self.pod_ips = ["0.0.0.0"]
elif self.host_ip != self.pod_ips[0]:
if self.dist_init_ip is None or self.host_ip == self.master_ip:
self.is_master = True
else:
self.is_master = False
import paddle

View File

@@ -32,6 +32,7 @@ from typing import Dict, List, Optional, Tuple
import numpy as np
import paddle
import zmq
from opentelemetry import trace
from tqdm import tqdm
from fastdeploy.engine.args_utils import EngineArgs
@@ -42,13 +43,13 @@ from fastdeploy.input.preprocess import InputPreprocessor
from fastdeploy.inter_communicator import (EngineCacheQueue, EngineWorkerQueue,
IPCSignal, ZmqClient)
from fastdeploy.metrics.metrics import main_process_metrics
from fastdeploy.metrics.trace_util import start_span, start_span_request
from fastdeploy.model_executor.guided_decoding import schema_checker
from fastdeploy.output.token_processor import (TokenProcessor,
WarmUpTokenProcessor)
from fastdeploy.splitwise.splitwise_connector import SplitwiseConnector
from fastdeploy.utils import EngineError, console_logger, llm_logger
from fastdeploy.metrics.trace_util import extract_from_metadata, start_span, start_span_request
from opentelemetry import trace
class LLMEngine(object):
"""
@@ -57,7 +58,6 @@ class LLMEngine(object):
Attributes:
cfg (Config): Configuration object containing all the parameters.
cached_generated_tokens (queue.Queue): Queue to store generated tokens.
cached_generated_tokens (queue.Queue): Queue to store generated tokens.
scheduler (LocalScheduler or GlobalScheduler): Scheduling tasks.
input_processor (InputPreprocessor): Preprocessor for input data.
resource_manager (ResourceManager): Manager for resource allocation.
@@ -174,7 +174,7 @@ class LLMEngine(object):
cache_config=self.cfg.cache_config,
tensor_parallel_size=self.cfg.tensor_parallel_size,
device_ids=device_ids,
pod_ip=self.cfg.pod_ips[0],
pod_ip=self.cfg.master_ip,
engine_worker_queue_port=self.cfg.engine_worker_queue_port,
pid_suffix=self.ipc_signal_suffix)
@@ -239,11 +239,12 @@ class LLMEngine(object):
if self.cfg.parallel_config.enable_expert_parallel and self.cfg.parallel_config.data_parallel_size > 1:
self.dp_processed = []
for i in range(1, self.cfg.parallel_config.data_parallel_size):
for i in range(1, self.cfg.parallel_config.data_parallel_size // self.cfg.nnode):
time.sleep(1)
self.dp_processed.append(
multiprocessing.Process(target=start_expert_service,
args=(self.cfg, i,
args=(self.cfg,
i + self.cfg.node_rank * self.cfg.worker_num_per_node,
self.ipc_signal_suffix)))
llm_logger.info(f"Engine is initialized successfully with {self.cfg.tensor_parallel_size}" \
+ " data parallel id {}".format(i))
@@ -263,10 +264,11 @@ class LLMEngine(object):
try:
results = self.scheduler.get_results()
if len(results) == 0:
time.sleep(0.001)
time.sleep(0.005)
continue
for request_id, contents in results.items():
for result in contents:
self.zmq_server.send_multipart(request_id, result)
self.zmq_server.send_multipart(request_id, contents)
except Exception as e:
llm_logger.error("Unexcepted error happend: {}, {}".format(
e, str(traceback.format_exc())))
@@ -357,9 +359,9 @@ class LLMEngine(object):
request, insert_task = None, []
results: List[Tuple[str, Optional[str]]] = list()
if data:
request = Request.from_dict(data)
start_span("ENQUEUE_ZMQ", data, trace.SpanKind.PRODUCER)
request = Request.from_dict(data)
start_span("ENQUEUE_ZMQ", data, trace.SpanKind.PRODUCER)
llm_logger.debug(f"Receive request: {request}")
@@ -692,7 +694,7 @@ class LLMEngine(object):
Insert tasks to engine.
"""
for task in tasks:
start_span_request("DEQUEUE", task, trace.SpanKind.CONSUMER)
start_span_request("DEQUEUE", task, trace.SpanKind.CONSUMER)
# TODO 返回至 scheduler
if allocated:
current_tasks = []
@@ -1006,8 +1008,6 @@ class LLMEngine(object):
)
arguments = (
f" --nnodes {str(self.cfg.nnode)}"
f" --ips {','.join(self.cfg.pod_ips)}"
f" --devices {self.cfg.device_ids} {py_script}"
f" --max_num_seqs {self.cfg.max_num_seqs} --max_model_len {self.cfg.max_model_len}"
f" --gpu_memory_utilization {self.cfg.cache_config.gpu_memory_utilization}"
@@ -1015,7 +1015,7 @@ class LLMEngine(object):
f" --device_ids {self.cfg.device_ids}"
f" --tensor_parallel_size {self.cfg.tensor_parallel_size}"
f" --engine_worker_queue_port {str(self.cfg.engine_worker_queue_port)}"
f" --pod_ip {self.cfg.pod_ips[0]}"
f" --pod_ip {self.cfg.master_ip}"
f" --total_block_num {self.cfg.cache_config.total_block_num}"
f" --block_size {self.cfg.cache_config.block_size}"
f" --enc_dec_block_num {self.cfg.cache_config.enc_dec_block_num}"
@@ -1033,10 +1033,9 @@ class LLMEngine(object):
f" --speculative_model_name_or_path {self.cfg.speculative_config.model_name_or_path}"
f" --speculative_model_quantization {self.cfg.speculative_config.quantization}"
f" --speculative_benchmark_mode {self.cfg.speculative_config.benchmark_mode}"
f" --graph_optimiaztion_config '{self.cfg.graph_optimization_config.to_json_string()}'"
f" --graph_optimization_config '{self.cfg.graph_optimization_config.to_json_string()}'"
f" --guided_decoding_backend {self.cfg.guided_decoding_backend}"
f" --load_strategy {self.cfg.model_config.load_strategy}"
f" --enable_mm {self.cfg.enable_mm}")
f" --load_strategy {self.cfg.model_config.load_strategy}")
worker_append_flag = {
@@ -1051,12 +1050,17 @@ class LLMEngine(object):
"disable_any_whitespace": self.cfg.disable_any_whitespace,
"enable-custom-all-reduce": self.cfg.parallel_config.enable_custom_all_reduce,
"enable_logprob": self.cfg.enable_logprob,
"enable_mm": self.cfg.enable_mm,
}
for worker_flag, value in worker_append_flag.items():
if value:
arguments = arguments + f" --{worker_flag}"
if self.cfg.nnode > 1:
pd_cmd = pd_cmd + f" --ips {self.cfg.ips}"
pd_cmd = pd_cmd + (
f" --master {self.cfg.dist_init_addr}"
f" --nnodes {str(self.cfg.nnode)}"
f" --rank {str(self.cfg.node_rank)}"
)
pd_cmd = pd_cmd + arguments + f" 2>{log_dir}/launch_worker.log"
llm_logger.info("Launch worker service command: {}".format(pd_cmd))
p = subprocess.Popen(
@@ -1157,7 +1161,7 @@ class LLMEngine(object):
cache_config=self.cfg.cache_config,
tensor_parallel_size=self.cfg.tensor_parallel_size,
device_ids=device_ids,
pod_ip=self.cfg.pod_ips[0],
pod_ip=self.cfg.master_ip,
engine_worker_queue_port=self.cfg.engine_worker_queue_port,
pid_suffix=self.ipc_signal_suffix)
def check_health(self, time_interval_threashold=30):
@@ -1244,8 +1248,9 @@ class LLMEngine(object):
"""
start queue service for engine worker communication
"""
address = (self.cfg.pod_ips[0], self.cfg.engine_worker_queue_port)
if self.cfg.host_ip == self.cfg.pod_ips[0] or self.cfg.pod_ips[0] == "0.0.0.0":
address = (self.cfg.master_ip, self.cfg.engine_worker_queue_port)
if self.cfg.host_ip == self.cfg.master_ip or self.cfg.master_ip == "0.0.0.0":
llm_logger.info(f"Starting engine worker queue server service at {address}")
self.engine_worker_queue_server = EngineWorkerQueue(
address=address,
is_server=True,
@@ -1255,7 +1260,7 @@ class LLMEngine(object):
if self.cfg.cache_config.enable_prefix_caching or self.cfg.splitwise_role != 'mixed':
self.cache_task_queue = EngineCacheQueue(
address=(self.cfg.pod_ips[0], self.cfg.cache_config.cache_queue_port),
address=(self.cfg.master_ip, self.cfg.cache_config.cache_queue_port),
authkey=b'cache_queue_service',
is_server=True,
num_client=self.cfg.tensor_parallel_size,
@@ -1269,4 +1274,6 @@ class LLMEngine(object):
is_server=False,
num_client=self.cfg.tensor_parallel_size,
client_id=0,
local_data_parallel_id=0)
local_data_parallel_size=self.cfg.parallel_config.data_parallel_size,
local_data_parallel_id= min(self.cfg.worker_num_per_node * self.cfg.node_rank,
self.cfg.parallel_config.data_parallel_size - 1))
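Two related changes land in start_queue_service: the queue now binds to cfg.master_ip instead of pod_ips[0], and the client side derives its local data-parallel id from the node rank. A hedged sketch of both decisions (function names are ours, values illustrative):

def should_start_queue_server(host_ip, master_ip):
    # Only the master node (or a wildcard 0.0.0.0 deployment) hosts the
    # EngineWorkerQueue server; every other node connects as a client.
    return master_ip == "0.0.0.0" or host_ip == master_ip

def local_dp_id(worker_num_per_node, node_rank, data_parallel_size):
    # Mirrors the min(...) guard above so the id never exceeds the DP world size.
    return min(worker_num_per_node * node_rank, data_parallel_size - 1)

print(should_start_queue_server("10.0.0.2", "10.0.0.1"))   # False
print(local_dp_id(8, 1, 4))                                # 3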

View File

@@ -49,8 +49,8 @@ class ExpertService(object):
cfg (Config): Config object containing all the configuration parameters.
"""
self.cfg = cfg
start_pos = local_data_parallel_id * self.cfg.tensor_parallel_size
end_pos = (local_data_parallel_id + 1) * self.cfg.tensor_parallel_size
start_pos = (local_data_parallel_id * self.cfg.tensor_parallel_size) % self.cfg.worker_num_per_node
end_pos = ((local_data_parallel_id + 1) * self.cfg.tensor_parallel_size) % self.cfg.worker_num_per_node
self.cfg.cache_config.rdma_comm_ports = self.cfg.cache_config.rdma_comm_ports[
start_pos:end_pos]
self.cfg.local_device_ids = self.cfg.device_ids.split(
@@ -65,7 +65,7 @@ class ExpertService(object):
self.cfg.parallel_config.local_data_parallel_id = local_data_parallel_id
address = (cfg.pod_ips[0], cfg.engine_worker_queue_port)
address = (cfg.master_ip, cfg.engine_worker_queue_port)
self.engine_worker_queue = EngineWorkerQueue(
address=address,
is_server=False,
@@ -118,7 +118,7 @@ class ExpertService(object):
cache_config=self.cfg.cache_config,
tensor_parallel_size=self.cfg.tensor_parallel_size,
device_ids=self.cfg.local_device_ids,
pod_ip=self.cfg.pod_ips[0],
pod_ip=self.cfg.master_ip,
engine_worker_queue_port=self.cfg.engine_worker_queue_port,
pid_suffix=f"{local_data_parallel_id}_{ipc_signal_suffix}"
)
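ExpertService now maps its global data-parallel id back into node-local offsets before slicing rdma_comm_ports, so a service running on a non-zero node picks ports from its own node's range. A toy illustration with assumed values (2-way tensor parallel, 8 workers per node, service id 5 running on node 1):

ports = ["8201", "8202", "8203", "8204", "8205", "8206", "8207", "8208"]
tensor_parallel_size, worker_num_per_node = 2, 8
local_data_parallel_id = 5   # global DP id of this expert service

start = (local_data_parallel_id * tensor_parallel_size) % worker_num_per_node
end = ((local_data_parallel_id + 1) * tensor_parallel_size) % worker_num_per_node
print(ports[start:end])      # ['8203', '8204'] -> this node's local slots 2..3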

View File

@@ -20,7 +20,7 @@ import time
from dataclasses import asdict, dataclass, fields
from typing import Any, Dict, Optional, Union
import numpy
import numpy as np
from fastdeploy.engine.sampling_params import SamplingParams
from fastdeploy.utils import data_processor_logger
@@ -181,7 +181,7 @@ class Request:
f"sampling_params={self.sampling_params})")
@dataclass
@dataclass(slots=True)
class CompletionOutput:
"""The output data of one completion output of a request.
@@ -235,7 +235,7 @@ class CompletionOutput:
f"reasoning_content={self.reasoning_content!r}")
@dataclass
@dataclass(slots=True)
class RequestMetrics:
"""Metrics associated with a request.
@@ -333,6 +333,12 @@ class RequestOutput:
self.error_code = error_code
self.error_msg = error_msg
if prompt_token_ids is None:
self.prompt_token_ids = []
elif isinstance(self.prompt_token_ids, np.ndarray):
self.prompt_token_ids = self.prompt_token_ids.tolist()
def add(self, next_output: "RequestOutput") -> None:
"""Merge RequestOutput into this one"""
@@ -365,11 +371,6 @@ class RequestOutput:
def to_dict(self):
"""convert RequestOutput into a serializable dict """
if self.prompt_token_ids is None:
self.prompt_token_ids = []
if type(self.prompt_token_ids) is numpy.ndarray:
self.prompt_token_ids = self.prompt_token_ids.tolist()
return {
"request_id": self.request_id,

View File

@@ -85,7 +85,7 @@ class LLM:
self.mutex = threading.Lock()
self.req_output = dict()
self.master_node_ip = self.llm_engine.cfg.pod_ips[0]
self.master_node_ip = self.llm_engine.cfg.master_ip
self._receive_output_thread = threading.Thread(
target=self._receive_output, daemon=True)
self._receive_output_thread.start()
@@ -169,6 +169,8 @@ class LLM:
# get output
outputs = self._run_engine(req_ids, use_tqdm=use_tqdm)
for i in range(len(outputs)):
outputs[i].prompt = prompts[i]
return outputs
def chat(

View File

@@ -24,7 +24,7 @@ import zmq
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, Response, StreamingResponse
from prometheus_client import CONTENT_TYPE_LATEST
from fastdeploy.metrics.trace_util import inject_to_metadata
from fastdeploy.metrics.trace_util import inject_to_metadata,instrument
from fastdeploy.engine.args_utils import EngineArgs
from fastdeploy.engine.engine import LLMEngine
@@ -122,8 +122,8 @@ async def lifespan(app: FastAPI):
args.mm_processor_kwargs, args.enable_mm,
args.reasoning_parser)
app.state.dynamic_load_weight = args.dynamic_load_weight
chat_handler = OpenAIServingChat(engine_client, pid, args.pod_ips)
completion_handler = OpenAIServingCompletion(engine_client, pid, args.pod_ips)
chat_handler = OpenAIServingChat(engine_client, pid, args.dist_init_ip)
completion_handler = OpenAIServingCompletion(engine_client, pid, args.dist_init_ip)
engine_client.create_zmq_client(model=pid, mode=zmq.PUSH)
engine_client.pid = pid
app.state.engine_client = engine_client
@@ -141,6 +141,7 @@ async def lifespan(app: FastAPI):
app = FastAPI(lifespan=lifespan)
instrument(app)
# TODO: pass the real engine state; fetch status via pid
@@ -397,11 +398,6 @@ def launch_controller_server():
"""Controller server running the sub thread"""
if args.controller_port < 0:
return
if not is_port_available(args.host, args.controller_port):
raise Exception(
f"The parameter `controller_port`:{args.controller_port} is already in use."
)
if not is_port_available(args.host, args.controller_port):
raise Exception(

View File

@@ -20,7 +20,8 @@ import time
import traceback
import uuid
from typing import List, Optional
import numpy as np
import msgpack
import aiozmq
from aiozmq import zmq
@@ -39,16 +40,16 @@ class OpenAIServingChat:
OpenAI-style chat completions serving
"""
def __init__(self, engine_client, pid, pod_ips):
def __init__(self, engine_client, pid, dist_init_ip):
self.engine_client = engine_client
self.pid = pid
self.pod_ips = pod_ips
self.master_ip = dist_init_ip
self.host_ip = get_host_ip()
def _check_master(self):
if self.pod_ips is None:
if self.master_ip is None:
return True
if self.host_ip == self.pod_ips[0]:
if self.host_ip == self.master_ip:
return True
return False
@@ -74,6 +75,8 @@ class OpenAIServingChat:
current_req_dict = request.to_dict_for_infer(request_id)
current_req_dict["arrival_time"] = time.time()
prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict)
if isinstance(prompt_token_ids, np.ndarray):
prompt_token_ids = prompt_token_ids.tolist()
except Exception as e:
return ErrorResponse(code=400, message=str(e))
@@ -118,6 +121,7 @@ class OpenAIServingChat:
num_choices = 1
max_streaming_response_tokens = 1
enable_thinking = None
include_stop_str_in_output = False
if request.metadata is not None and request.metadata.get("max_streaming_response_tokens", 1) > 1:
max_streaming_response_tokens = request.metadata["max_streaming_response_tokens"]
@@ -143,6 +147,9 @@ class OpenAIServingChat:
dealer.write([b"", request_id.encode('utf-8')])
choices = []
current_waiting_time = 0
if request.metadata is not None:
enable_thinking = request.metadata.get("enable_thinking")
include_stop_str_in_output = request.metadata.get("include_stop_str_in_output", False)
while num_choices > 0:
try:
raw_data = await asyncio.wait_for(dealer.read(), timeout=10)
@@ -158,102 +165,106 @@ class OpenAIServingChat:
raise ValueError(f"Engine is not healthy: {msg}")
else:
current_waiting_time = 0
await asyncio.sleep(0.1)
await asyncio.sleep(0.01)
continue
response = msgpack.unpackb(raw_data[-1])
for res in response:
if res.get("error_code", 200) != 200:
raise ValueError("{}".format(res["error_msg"]))
res = json.loads(raw_data[-1].decode('utf-8'))
if res.get("error_code", 200) != 200:
raise ValueError("{}".format(res["error_msg"]))
if request.metadata is not None:
enable_thinking = request.metadata.get("enable_thinking")
self.engine_client.data_processor.process_response_dict(
res, stream=True, enable_thinking=enable_thinking)
self.engine_client.data_processor.process_response_dict(
res, stream=True, enable_thinking=enable_thinking, include_stop_str_in_output=include_stop_str_in_output)
if res['metrics']['first_token_time'] is not None:
arrival_time = res['metrics']['first_token_time']
inference_start_time = res['metrics']['inference_start_time']
else:
arrival_time = res['metrics']['arrival_time'] - inference_start_time
if first_iteration:
num_prompt_tokens = len(prompt_token_ids)
num_cached_tokens = res.get("num_cached_tokens", 0)
for i in range(num_choices):
choice = ChatCompletionResponseStreamChoice(
index=i,
delta=DeltaMessage(role="assistant", content="", reasoning_content="", tool_calls=None)
)
if request.metadata is not None and request.metadata.get("training", False):
choice.delta.token_ids = prompt_token_ids
chunk = ChatCompletionStreamResponse(
id=request_id,
object=chunk_object_type,
created=created_time,
choices=[choice],
model=model_name
)
if include_continuous_usage:
chunk.usage = UsageInfo(
prompt_tokens=num_prompt_tokens,
completion_tokens=0,
total_tokens=num_prompt_tokens,
prompt_tokens_details=PromptTokenUsageInfo(cached_tokens=num_cached_tokens)
)
yield f"data: {chunk.model_dump_json(exclude_unset=True)} \n\n"
first_iteration = False
output = res["outputs"]
delta_text = output["text"]
raw_top_logprobs = output["top_logprobs"]
logprobs_res = None
if raw_top_logprobs is not None:
top_logprobs = LogprobsLists(
logprob_token_ids=raw_top_logprobs[0],
logprobs=raw_top_logprobs[1],
sampled_token_ranks=raw_top_logprobs[2],
)
logprobs_res = self.build_logprobs_response(
request_logprobs=request.logprobs,
response_logprobs=top_logprobs,
request_top_logprobs=request.top_logprobs,
)
previous_num_tokens += len(output["token_ids"])
delta_message = DeltaMessage(content=delta_text, reasoning_content=output.get("reasoning_content"), \
token_ids=output.get("token_ids"), tool_calls=output.get("tool_call_content", []))
choice = ChatCompletionResponseStreamChoice(
index=0,
delta=delta_message,
logprobs=logprobs_res,
arrival_time=arrival_time
)
if res["finished"]:
num_choices -= 1
work_process_metrics.e2e_request_latency.observe(time.time() - res["metrics"]["request_start_time"])
has_no_token_limit = request.max_tokens is None and request.max_completion_tokens is None
max_tokens = request.max_completion_tokens or request.max_tokens
if has_no_token_limit or previous_num_tokens != max_tokens:
choice.finish_reason = "stop"
if self.engine_client.reasoning_parser == "ernie_x1" and \
output.get("finish_reason", "") == "tool_calls":
choice.finish_reason = "tool_calls"
if res['metrics']['first_token_time'] is not None:
arrival_time = res['metrics']['first_token_time']
inference_start_time = res['metrics']['inference_start_time']
else:
choice.finish_reason = "length"
arrival_time = res['metrics']['arrival_time'] - inference_start_time
if first_iteration:
num_prompt_tokens = len(prompt_token_ids)
num_cached_tokens = res.get("num_cached_tokens", 0)
for i in range(num_choices):
choice = ChatCompletionResponseStreamChoice(
index=i,
delta=DeltaMessage(role="assistant", content="", reasoning_content="", tool_calls=None)
)
if request.metadata is not None and request.metadata.get("training", False):
choice.delta.token_ids = prompt_token_ids
chunk = ChatCompletionStreamResponse(
id=request_id,
object=chunk_object_type,
created=created_time,
choices=[choice],
model=model_name
)
if include_continuous_usage:
chunk.usage = UsageInfo(
prompt_tokens=num_prompt_tokens,
completion_tokens=0,
total_tokens=num_prompt_tokens,
prompt_tokens_details=PromptTokenUsageInfo(cached_tokens=num_cached_tokens)
)
yield f"data: {chunk.model_dump_json(exclude_unset=True)} \n\n"
first_iteration = False
if res.get("error_msg") is not None and "Recover" in res["error_msg"]:
choice.finish_reason = "recover_stop"
output = res["outputs"]
delta_text = output["text"]
raw_top_logprobs = output["top_logprobs"]
logprobs_res = None
if raw_top_logprobs is not None:
top_logprobs = LogprobsLists(
logprob_token_ids=raw_top_logprobs[0],
logprobs=raw_top_logprobs[1],
sampled_token_ranks=raw_top_logprobs[2],
)
logprobs_res = self.build_logprobs_response(
request_logprobs=request.logprobs,
response_logprobs=top_logprobs,
request_top_logprobs=request.top_logprobs,
)
if request.metadata is not None and request.metadata.get("training", False) and delta_text != "":
choice.delta.token_ids = output["token_ids"]
if include_continuous_usage:
chunk.usage = UsageInfo(
prompt_tokens=num_prompt_tokens,
completion_tokens=previous_num_tokens,
total_tokens=num_prompt_tokens + previous_num_tokens
previous_num_tokens += len(output["token_ids"])
delta_message = DeltaMessage(content=delta_text, reasoning_content=output.get("reasoning_content"), \
token_ids=output.get("token_ids"), tool_calls=output.get("tool_call_content", []))
choice = ChatCompletionResponseStreamChoice(
index=0,
delta=delta_message,
logprobs=logprobs_res,
arrival_time=arrival_time
)
choices.append(choice)
if res["finished"]:
num_choices -= 1
work_process_metrics.e2e_request_latency.observe(time.time() - res["metrics"]["request_start_time"])
has_no_token_limit = request.max_tokens is None and request.max_completion_tokens is None
max_tokens = request.max_completion_tokens or request.max_tokens
if has_no_token_limit or previous_num_tokens != max_tokens:
choice.finish_reason = "stop"
if self.engine_client.reasoning_parser == "ernie_x1" and \
output.get("finish_reason", "") == "tool_calls":
choice.finish_reason = "tool_calls"
else:
choice.finish_reason = "length"
if len(choices) == max_streaming_response_tokens or res["finished"]:
if res.get("error_msg") is not None and "Recover" in res["error_msg"]:
choice.finish_reason = "recover_stop"
if request.metadata is not None and request.metadata.get("training", False) and delta_text != "":
choice.delta.token_ids = output["token_ids"]
if include_continuous_usage:
chunk.usage = UsageInfo(
prompt_tokens=num_prompt_tokens,
completion_tokens=previous_num_tokens,
total_tokens=num_prompt_tokens + previous_num_tokens
)
choices.append(choice)
if len(choices) == max_streaming_response_tokens or res["finished"]:
chunk.choices = choices
yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
choices = []
if choices:
chunk.choices = choices
yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
choices = []
@@ -296,6 +307,7 @@ class OpenAIServingChat:
created_time = int(time.time())
final_res = None
enable_thinking = None
include_stop_str_in_output = False
try:
dealer = await aiozmq.create_zmq_stream(
zmq.DEALER,
@@ -321,33 +333,39 @@ class OpenAIServingChat:
await asyncio.sleep(0.1)
continue
data = json.loads(raw_data[-1].decode('utf-8'))
if data.get("error_code", 200) != 200:
raise ValueError("{}".format(data["error_msg"]))
if request.metadata is not None:
enable_thinking = request.metadata.get("enable_thinking")
data = self.engine_client.data_processor.process_response_dict(
data, stream=False, enable_thinking=enable_thinking)
# api_server_logger.debug(f"Client {request_id} received: {data}")
previous_num_tokens += len(data["outputs"]["token_ids"])
# The logprob for handling the response
output = data["outputs"]
raw_top_logprobs = output["top_logprobs"]
if raw_top_logprobs is not None:
top_logprobs = LogprobsLists(
logprob_token_ids=raw_top_logprobs[0],
logprobs=raw_top_logprobs[1],
sampled_token_ranks=raw_top_logprobs[2],
)
logprobs_res = self.build_logprobs_response(
request_logprobs=request.logprobs,
response_logprobs=top_logprobs,
request_top_logprobs=request.top_logprobs,
)
if logprobs_res and logprobs_res.content is not None:
logprob_contents.extend(logprobs_res.content)
if data["finished"]:
final_res = data
response = msgpack.unpackb(raw_data[-1])
task_is_finished = False
for data in response:
if data.get("error_code", 200) != 200:
raise ValueError("{}".format(data["error_msg"]))
if request.metadata is not None:
enable_thinking = request.metadata.get("enable_thinking")
include_stop_str_in_output = request.metadata.get("include_stop_str_in_output", False)
data = self.engine_client.data_processor.process_response_dict(
data, stream=False, enable_thinking=enable_thinking, include_stop_str_in_output=include_stop_str_in_output)
# api_server_logger.debug(f"Client {request_id} received: {data}")
previous_num_tokens += len(data["outputs"]["token_ids"])
# The logprob for handling the response
output = data["outputs"]
raw_top_logprobs = output["top_logprobs"]
if raw_top_logprobs is not None:
top_logprobs = LogprobsLists(
logprob_token_ids=raw_top_logprobs[0],
logprobs=raw_top_logprobs[1],
sampled_token_ranks=raw_top_logprobs[2],
)
logprobs_res = self.build_logprobs_response(
request_logprobs=request.logprobs,
response_logprobs=top_logprobs,
request_top_logprobs=request.top_logprobs,
)
if logprobs_res and logprobs_res.content is not None:
logprob_contents.extend(logprobs_res.content)
if data["finished"]:
final_res = data
task_is_finished = True
break
if task_is_finished:
break
finally:
dealer.close()
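The response stream is now a msgpack-packed list of per-chunk dicts rather than one JSON object per ZMQ frame, so both the streaming and non-streaming handlers iterate over every entry of each frame. A hedged sketch of the receiving side (payload contents are illustrative):

import msgpack

# One ZMQ frame can now carry several aggregated responses for the same request.
frame = msgpack.packb([
    {"request_id": "req-0", "finished": False, "outputs": {"token_ids": [11], "text": "Hel"}},
    {"request_id": "req-0", "finished": True, "outputs": {"token_ids": [12], "text": "lo"}},
])

for res in msgpack.unpackb(frame):
    if res.get("error_code", 200) != 200:
        raise ValueError(res["error_msg"])
    print(res["outputs"]["text"], res["finished"])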

View File

@@ -17,6 +17,8 @@
import asyncio
import aiozmq
import json
import msgpack
import numpy as np
from aiozmq import zmq
from asyncio import FIRST_COMPLETED, AbstractEventLoop, Task
import time
@@ -44,16 +46,16 @@ from fastdeploy.engine.request import RequestOutput
class OpenAIServingCompletion:
def __init__(self, engine_client, pid, pod_ips):
def __init__(self, engine_client, pid, dist_init_ip):
self.engine_client = engine_client
self.pid = pid
self.pod_ips = pod_ips
self.master_ip = dist_init_ip
self.host_ip = get_host_ip()
def _check_master(self):
if self.pod_ips is None:
if self.master_ip is None:
return True
if self.host_ip == self.pod_ips[0]:
if self.host_ip == self.master_ip:
return True
return False
@@ -104,9 +106,10 @@ class OpenAIServingCompletion:
current_req_dict = request.to_dict_for_infer(request_id_idx, prompt)
try:
current_req_dict["arrival_time"] = time.time()
prompt_batched_token_ids.append(
self.engine_client.format_and_add_data(current_req_dict)
)
prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict)
if isinstance(prompt_token_ids, np.ndarray):
prompt_token_ids = prompt_token_ids.tolist()
prompt_batched_token_ids.append(prompt_token_ids)
except Exception as e:
return ErrorResponse(message=str(e), code=400)
@@ -179,18 +182,20 @@ class OpenAIServingCompletion:
current_waiting_time = 0
await asyncio.sleep(0.1)
continue
data = json.loads(raw_data[-1].decode("utf-8"))
rid = int(data["request_id"].split("-")[-1])
if data.get("error_code", 200) != 200:
raise ValueError("{}".format(data["error_msg"]))
response = msgpack.unpackb(raw_data[-1])
for data in response:
rid = int(data["request_id"].split("-")[-1])
if data.get("error_code", 200) != 200:
raise ValueError("{}".format(data["error_msg"]))
self.engine_client.data_processor.process_response_dict(
data, stream=False)
output_tokens[rid] += len(data["outputs"]["token_ids"])
if data.get("finished", False):
data["output_token_ids"] = output_tokens[rid]
valid_results[rid] = data
num_choices -= 1
self.engine_client.data_processor.process_response_dict(
data, stream=False)
output_tokens[rid] += len(data["outputs"]["token_ids"])
if data.get("finished", False):
data["output_token_ids"] = output_tokens[rid]
valid_results[rid] = data
num_choices -= 1
break
return self.request_output_to_completion_response(
final_res_batch=valid_results,
@@ -238,6 +243,12 @@ class OpenAIServingCompletion:
if request.suffix is not None and request.suffix.get("max_streaming_response_tokens", 1) > 1:
max_streaming_response_tokens = request.suffix["max_streaming_response_tokens"]
choices = []
chunk = CompletionStreamResponse(
id=request_id,
created=created_time,
model=model_name,
choices=choices
)
current_waiting_time = 0
while num_choices > 0:
@@ -256,82 +267,86 @@ class OpenAIServingCompletion:
continue
res = json.loads(raw_data[-1].decode('utf-8'))
idx = int(res["request_id"].split("-")[-1])
if res.get("error_code", 200) != 200:
raise ValueError("{}".format(res["error_msg"]))
response = msgpack.unpackb(raw_data[-1])
for res in response:
idx = int(res["request_id"].split("-")[-1])
if res.get("error_code", 200) != 200:
raise ValueError("{}".format(res["error_msg"]))
if first_iteration[idx]:
if request.suffix is not None and request.suffix.get("training", False):
if first_iteration[idx]:
if request.suffix is not None and request.suffix.get("training", False):
chunk = CompletionStreamResponse(
id=request_id,
created=created_time,
model=model_name,
choices=[CompletionResponseStreamChoice(
index=idx,
text="",
token_ids=list(prompt_batched_token_ids[idx])
)]
)
yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
first_iteration[idx] = False
self.engine_client.data_processor.process_response_dict(
res, stream=True)
if res['metrics'].get('first_token_time') is not None:
arrival_time = res['metrics']['first_token_time']
inference_start_time[idx] = res['metrics']['inference_start_time']
else:
arrival_time = res['metrics']['arrival_time'] - inference_start_time[idx]
output = res["outputs"]
choices.append(CompletionResponseStreamChoice(
index=idx,
text=output["text"],
token_ids=output.get("token_ids"),
tool_calls=output.get("tool_call_content"),
reasoning_content=output.get("reasoning_content"),
arrival_time=arrival_time
))
if res["finished"]:
if request.max_tokens is None or output_tokens[idx] + 1 != request.max_tokens:
chunk.choices[0].finish_reason = "stop"
if self.engine_client.reasoning_parser == "ernie_x1" and \
output.get("finish_reason", "") == "tool_calls":
chunk.choices[0].finish_reason = "tool_calls"
else:
chunk.choices[0].finish_reason = "length"
output_tokens[idx] += 1
if len(choices) == max_streaming_response_tokens or res["finished"]:
chunk = CompletionStreamResponse(
id=request_id,
created=created_time,
model=model_name,
choices=[CompletionResponseStreamChoice(
index=idx,
text="",
token_ids=list(prompt_batched_token_ids[idx])
)]
choices=choices
)
yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
first_iteration[idx] = False
choices = []
self.engine_client.data_processor.process_response_dict(
res, stream=True)
if res['metrics'].get('first_token_time') is not None:
arrival_time = res['metrics']['first_token_time']
inference_start_time[idx] = res['metrics']['inference_start_time']
else:
arrival_time = res['metrics']['arrival_time'] - inference_start_time[idx]
# api_server_logger.info(f"{arrival_time}")
output = res["outputs"]
choices.append(CompletionResponseStreamChoice(
index=idx,
text=output["text"],
token_ids=output.get("token_ids"),
tool_calls=output.get("tool_call_content"),
reasoning_content=output.get("reasoning_content"),
arrival_time=arrival_time
))
if res["finished"]:
if request.max_tokens is None or output_tokens[idx] + 1 != request.max_tokens:
chunk.choices[0].finish_reason = "stop"
if self.engine_client.reasoning_parser == "ernie_x1" and \
output.get("finish_reason", "") == "tool_calls":
chunk.choices[0].finish_reason = "tool_calls"
else:
chunk.choices[0].finish_reason = "length"
output_tokens[idx] += 1
if len(choices) == max_streaming_response_tokens or res["finished"]:
chunk = CompletionStreamResponse(
id=request_id,
created=created_time,
model=model_name,
choices=choices
)
choices = []
yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
if res["finished"]:
num_choices -= 1
if getattr(request, "stream_options", None) and request.stream_options.include_usage:
usage_chunk = CompletionStreamResponse(
id=request_id,
created=created_time,
model=model_name,
choices=[],
usage=UsageInfo(
prompt_tokens=len(prompt_batched_token_ids[idx]),
completion_tokens=output_tokens[idx]
if res["finished"]:
num_choices -= 1
if getattr(request, "stream_options", None) and request.stream_options.include_usage:
usage_chunk = CompletionStreamResponse(
id=request_id,
created=created_time,
model=model_name,
choices=[],
usage=UsageInfo(
prompt_tokens=len(prompt_batched_token_ids[idx]),
completion_tokens=output_tokens[idx]
)
)
)
yield f"data: {usage_chunk.model_dump_json(exclude_unset=True)}\n\n"
yield f"data: {usage_chunk.model_dump_json(exclude_unset=True)}\n\n"
if choices:
chunk.choices = choices
yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
choices = []
except Exception as e:

View File

@@ -101,6 +101,34 @@ environment_variables: dict[str, Callable[[], Any]] = {
# Whether to use DeepGemm for FP8 blockwise MoE.
"FD_USE_DEEP_GEMM":
lambda: bool(int(os.getenv("FD_USE_DEEP_GEMM", "1"))),
# Whether to use aggregate send.
"FD_USE_AGGREGATE_SEND":
lambda: bool(int(os.getenv("FD_USE_AGGREGATE_SEND", "0"))),
# Whether to enable tracing.
"TRACES_ENABLE":
lambda: os.getenv("TRACES_ENABLE", "false"),
# Set trace service name.
"FD_SERVICE_NAME":
lambda: os.getenv("FD_SERVICE_NAME", "FastDeploy"),
# Set trace host name.
"FD_HOST_NAME":
lambda: os.getenv("FD_HOST_NAME", "localhost"),
# Set trace exporter.
"TRACES_EXPORTER":
lambda: os.getenv("TRACES_EXPORTER", "console"),
# Set trace exporter OTLP endpoint.
"EXPORTER_OTLP_ENDPOINT":
lambda: os.getenv("EXPORTER_OTLP_ENDPOINT"),
# Set trace exporter OTLP headers.
"EXPORTER_OTLP_HEADERS":
lambda: os.getenv("EXPORTER_OTLP_HEADERS"),
}
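The new environment switches can be set before the server is imported; a hedged example with illustrative values (the OTLP endpoint is a placeholder):

import os

os.environ["TRACES_ENABLE"] = "true"            # turn tracing on
os.environ["TRACES_EXPORTER"] = "console"       # or "otlp"
os.environ["FD_SERVICE_NAME"] = "FastDeploy"
os.environ["FD_HOST_NAME"] = "node-0"
os.environ["EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4318/v1/traces"   # only read for otlp
os.environ["FD_USE_AGGREGATE_SEND"] = "1"       # opt in to aggregated ZMQ sends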

View File

@@ -43,8 +43,7 @@ def import_custom_ops(package, module_name, global_ns):
logger.warning(f"Failed to import op {func_name}: {e}")
except Exception:
logger.warning(
f"Ops of {package} import failed, it may be not compiled.")
logger.warning(f"Ops of {package} import failed, it may be not compiled.")
preprocess_static_op(global_ns)
@@ -71,20 +70,24 @@ def wrap_unified_op(original_cpp_ext_op, original_custom_op):
original_cpp_ext_op: Original C++ extension operator function.
original_custom_op: Original custom operator function.
"""
try:
@paddle.jit.marker.unified
@functools.wraps(original_custom_op)
def unified_op(*args, **kwargs):
if paddle.in_dynamic_mode():
res = original_cpp_ext_op(*args, **kwargs)
if res is None:
return None
# TODO(DrRyanHuang): Remove this if when we align the implementation of custom op and C++ extension
if isinstance(res, list) and len(res) == 1:
return res[0]
return res
return original_custom_op(*args, **kwargs)
@paddle.jit.marker.unified
@functools.wraps(original_custom_op)
def unified_op(*args, **kwargs):
if paddle.in_dynamic_mode():
res = original_cpp_ext_op(*args, **kwargs)
if res is None:
return None
# TODO(DrRyanHuang): Remove this if when we align the implementation of custom op and C++ extension
if isinstance(res, list) and len(res) == 1:
return res[0]
return res
return original_custom_op(*args, **kwargs)
except:
unified_op = None
logger.warning("Paddle version not support JIT mode.")
return unified_op

View File

@@ -125,6 +125,8 @@ class ErnieProcessor(BaseDataProcessor):
if request.get("temperature") < _SAMPLING_EPS:
# zero temperature is equivalent to greedy sampling
request.set("temperature", 1)
if request.get("top_p") < _SAMPLING_EPS:
request.set("top_p", _SAMPLING_EPS)
data_processor_logger.info(f"Processed request {request}")
return request
@@ -182,6 +184,8 @@ class ErnieProcessor(BaseDataProcessor):
if request.get("temperature") < _SAMPLING_EPS:
# zero temperature is equivalent to greedy sampling
request["temperature"] = 1
if request.get("top_p") < _SAMPLING_EPS:
request["top_p"] = _SAMPLING_EPS
data_processor_logger.info(f"Processed request {request}")
return request
@@ -248,7 +252,7 @@ class ErnieProcessor(BaseDataProcessor):
token_ids = response_dict["outputs"]["token_ids"]
is_end = response_dict["finished"]
req_id = response_dict["request_id"]
if is_end and len(token_ids) > 0:
if is_end and len(token_ids) > 0 and not kwargs.get("include_stop_str_in_output"):
if token_ids[-1] == self.tokenizer.eos_token_id:
token_ids = token_ids[:-1]
delta_text, _, previous_texts = self.ids2tokens(token_ids, req_id)
@@ -283,7 +287,7 @@ class ErnieProcessor(BaseDataProcessor):
req_id = response_dict["request_id"]
token_ids = response_dict["outputs"]["token_ids"]
if is_end and len(token_ids) > 0:
if is_end and len(token_ids) > 0 and not kwargs.get("include_stop_str_in_output"):
if token_ids[-1] == self.tokenizer.eos_token_id:
token_ids = token_ids[:-1]
delta_text, previous_token_ids, previous_texts = self.ids2tokens(
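Two behaviors change in ErnieProcessor (and, further below, in the text DataProcessor): a near-zero top_p is clamped to _SAMPLING_EPS so rejection sampling cannot end up with an empty candidate set, and the trailing eos token is only stripped when the client did not ask for include_stop_str_in_output. A compact sketch of both guards, with an assumed epsilon value:

_SAMPLING_EPS = 1e-5   # assumed value; the real constant lives in the processor module

def clamp_sampling(request):
    # Zero temperature degenerates to greedy sampling; top_p == 0 would reject
    # every token, so both are nudged to safe values as in the hunks above.
    if request.get("temperature", 1.0) < _SAMPLING_EPS:
        request["temperature"] = 1
    if request.get("top_p", 1.0) < _SAMPLING_EPS:
        request["top_p"] = _SAMPLING_EPS
    return request

def maybe_strip_eos(token_ids, eos_token_id, include_stop_str_in_output):
    # Keep the trailing eos only when the client explicitly asked for it.
    if not include_stop_str_in_output and token_ids and token_ids[-1] == eos_token_id:
        return token_ids[:-1]
    return token_ids

print(clamp_sampling({"temperature": 0.0, "top_p": 0.0}))   # {'temperature': 1, 'top_p': 1e-05}
print(maybe_strip_eos([5, 6, 2], eos_token_id=2, include_stop_str_in_output=False))   # [5, 6]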

View File

@@ -77,6 +77,7 @@ class DataProcessor:
CLS_TOKEN = "<|begin_of_sentence|>"
SEP_TOKEN = "<|end_of_sentence|>"
EOS_TOKEN = "</s>"
IMG_START = "<|IMAGE_START|>"
IMG_END = "<|IMAGE_END|>"
VID_START = "<|VIDEO_START|>"
@@ -125,6 +126,7 @@ class DataProcessor:
# Special tokens and IDs
self.cls_token = self.CLS_TOKEN
self.sep_token = self.SEP_TOKEN
self.eos_token = self.EOS_TOKEN
self.image_start = self.IMG_START
self.image_end = self.IMG_END
self.video_start = self.VID_START
@@ -132,6 +134,9 @@ class DataProcessor:
self.image_patch_id = self.tokenizer.convert_tokens_to_ids("<|IMAGE_PLACEHOLDER|>")
self.image_start_id = self.tokenizer.convert_tokens_to_ids(self.image_start)
self.video_start_id = self.tokenizer.convert_tokens_to_ids(self.video_start)
self.sep_token_id = self.tokenizer.convert_tokens_to_ids(self.sep_token)
self.eos_token_id = self.tokenizer.convert_tokens_to_ids(self.eos_token)
self.token_type_mapping = self._build_token_type_mapping()
self.is_training = True
@@ -204,7 +209,7 @@ class DataProcessor:
return outputs
def request2ids(self, request: Dict[str, Any]) -> Dict[str, Union[np.ndarray, List[np.ndarray], None]]:
def request2ids(self, request: Dict[str, Any],tgts: List[str]=None) -> Dict[str, Union[np.ndarray, List[np.ndarray], None]]:
"""
Convert chat messages into model inputs.
Returns a dict with input_ids, token_type_ids, position_ids, images, grid_thw, image_type_ids, labels.
@@ -258,6 +263,10 @@ class DataProcessor:
self._add_video(frames, outputs)
image_message_index += 1
self._add_text(prompt_token_ids[image_start_index:], outputs)
if self.is_training:
assert tgts, f"training must give tgt !"
self._extract_labels(outputs,tgts)
return outputs
def _add_special_token(self, token: Union[str, int], outputs: Dict) -> None:
@@ -339,6 +348,26 @@ class DataProcessor:
outputs["position_ids"].extend(pos_ids)
outputs["cur_position"] = np.max(pos_ids) + 1
def _extract_labels(self, outputs: Dict, tgts: List[str]) -> None:
input_ids = copy.deepcopy(outputs['input_ids'])
labels = [self.tokenizer.ignored_index] * len(input_ids)
tgt_count=input_ids.count(self.sep_token_id)
assert tgt_count==len(tgts),f'len(tgts) != len(src) {len(tgts)} vs {tgt_count}'
tgt_index=0
for i,token_id in enumerate(input_ids):
if token_id==self.sep_token_id:
labels_token = self.tokenizer.tokenize(tgts[tgt_index])
labels_token_id = self.tokenizer.convert_tokens_to_ids(labels_token)
labels[i-len(labels_token_id):i]=labels_token_id
labels[i] = self.eos_token_id #</s>
tgt_index += 1
outputs['labels']=labels
def _load_and_process_video(self, url: str, item: Dict) -> List[Image.Image]:
reader, meta, path = read_video_decord(url, save_to_disk=False)
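_extract_labels builds training labels by looking backwards from every sep token: the tokens of the corresponding target string are supervised and the sep position itself is replaced by eos, while every other position stays ignored. A toy illustration with hypothetical token ids:

IGNORED, SEP, EOS = -100, 2, 3
input_ids = [10, 11, 12, SEP, 20, 21, SEP]   # two turns, each closed by the sep token
tgt_token_ids = [[11, 12], [21]]             # tokenized targets, one list per turn

labels = [IGNORED] * len(input_ids)
tgt_index = 0
for i, token_id in enumerate(input_ids):
    if token_id == SEP:
        ids = tgt_token_ids[tgt_index]
        labels[i - len(ids):i] = ids   # supervise the target tokens...
        labels[i] = EOS                # ...and teach the model to emit </s>
        tgt_index += 1
print(labels)   # [-100, 11, 12, 3, -100, 21, 3]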

View File

@@ -17,6 +17,7 @@ from typing import Any, Dict, Optional
from fastdeploy.engine.config import ModelConfig
from fastdeploy.reasoning import ReasoningParserManager
from fastdeploy.config import ErnieArchitectures
class InputPreprocessor:
@@ -71,8 +72,7 @@ class InputPreprocessor:
self.reasoning_parser)
architectures = ModelConfig(self.model_name_or_path).architectures
if not self.enable_mm:
if "Ernie4_5_MoeForCausalLM" not in architectures \
and "Ernie4_5_ForCausalLM" not in architectures:
if not ErnieArchitectures.contains_ernie_arch(architectures):
from fastdeploy.input.text_processor import DataProcessor
self.processor = DataProcessor(
model_name_or_path=self.model_name_or_path, reasoning_parser_obj=reasoning_parser_obj)

View File

@@ -258,6 +258,8 @@ class DataProcessor(BaseDataProcessor):
if request.get("temperature") < _SAMPLING_EPS:
# zero temperature is equivalent to greedy sampling
request.set("temperature", 1)
if request.get("top_p") < _SAMPLING_EPS:
request.set("top_p", _SAMPLING_EPS)
data_processor_logger.info(f"Processed request {request}")
return request
@@ -306,6 +308,8 @@ class DataProcessor(BaseDataProcessor):
if request.get("temperature") < _SAMPLING_EPS:
# zero temperature is equivalent to greedy sampling
request["temperature"] = 1
if request.get("top_p") < _SAMPLING_EPS:
request["top_p"] = _SAMPLING_EPS
data_processor_logger.info(f"Processed request {request}")
return request
@@ -355,7 +359,7 @@ class DataProcessor(BaseDataProcessor):
token_ids = response_dict["outputs"]["token_ids"]
is_end = response_dict["finished"]
req_id = response_dict["request_id"]
if is_end and len(token_ids) > 0:
if is_end and len(token_ids) > 0 and not kwargs.get("include_stop_str_in_output"):
if token_ids[-1] == self.tokenizer.eos_token_id:
token_ids = token_ids[:-1]
delta_text, _, previous_texts = self.ids2tokens(token_ids, req_id)
@@ -390,7 +394,7 @@ class DataProcessor(BaseDataProcessor):
req_id = response_dict["request_id"]
token_ids = response_dict["outputs"]["token_ids"]
if is_end and len(token_ids) > 0:
if is_end and len(token_ids) > 0 and not kwargs.get("include_stop_str_in_output"):
if token_ids[-1] == self.tokenizer.eos_token_id:
token_ids = token_ids[:-1]
delta_text, previous_token_ids, previous_texts = self.ids2tokens(
@@ -430,7 +434,7 @@ class DataProcessor(BaseDataProcessor):
response_dict, enable_thinking=enable_thinking, **kwargs)
else:
return self.process_response_dict_normal(
response_dict=response_dict, enable_thinking=enable_thinking)
response_dict=response_dict, enable_thinking=enable_thinking, **kwargs)
def text2ids(self, text, max_model_len, raw_request=True):
"""

View File

@@ -20,6 +20,7 @@ import threading
import time
import zmq
import msgpack
from fastdeploy import envs
from fastdeploy.utils import llm_logger
@@ -37,6 +38,7 @@ class ZmqClient:
self.router_path = f"/dev/shm/router_{name}.ipc"
self.ZMQ_SNDHWM = int(envs.FD_ZMQ_SNDHWM)
self.aggregate_send = envs.FD_USE_AGGREGATE_SEND
self.mutex = threading.Lock()
self.req_dict = dict()
@@ -93,6 +95,16 @@ class ZmqClient:
"""
return self.socket.recv_pyobj()
def pack_aggregated_data(self, data):
"""
Aggregate multiple responses into one and send them to the client.
"""
result = data[0]
if len(data) > 1:
for response in data[1:]:
result.add(response)
result = msgpack.packb([result.to_dict()])
return result
def send_multipart(self, req_id, data):
"""
Send a multipart message to the router socket.
@@ -116,14 +128,22 @@ class ZmqClient:
break
try:
result = json.dumps(data.to_dict()).encode('utf-8')
start_send = time.time()
if self.aggregate_send:
result = self.pack_aggregated_data(data)
else:
result = msgpack.packb([response.to_dict() for response in data])
self.router.send_multipart([self.req_dict[req_id], b'', result])
llm_logger.debug(f"send_multipart result: {req_id} len {len(data)} elapse: {time.time()-start_send}")
except Exception as e:
llm_logger.error(f"Send result to zmq client failed: {e}")
if data.finished:
if data[-1].finished:
with self.mutex:
self.req_dict.pop(data.request_id, None)
self.req_dict.pop(req_id, None)
llm_logger.info(f"send_multipart finished, req_id: {req_id}")
def receive_json_once(self, block=False):
"""

View File

@@ -1,15 +1,83 @@
from opentelemetry.propagate import inject, extract
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from fastapi import FastAPI
from fastdeploy.utils import (llm_logger)
from fastdeploy import envs
import json
import os
# create global OpenTelemetry tracer
tracer = trace.get_tracer(__name__)
# OpenTelemetry trace context is stored in request metadata
TRACE_CARRIER = "trace_carrier"
traces_enable = False
tracer = trace.get_tracer(__name__)
def set_up():
try:
# tracing starts only when TRACES_ENABLE=true
global traces_enable
traces_enable = envs.TRACES_ENABLE.lower() == "true"
if not traces_enable:
llm_logger.warning("Opentelemetry is DISABLED.")
return
llm_logger.info("Opentelemetry is ENABLED, configuring...")
# --- read env ---
service_name = envs.FD_SERVICE_NAME
host_name = envs.FD_HOST_NAME
# --- set attributes (Service Name, Host Name, etc.) ---
resource_attributes = {
"service.name": service_name
}
if host_name:
resource_attributes["host.name"] = host_name
resource = Resource(attributes=resource_attributes)
# --- set Exporter ---
exporter_type = envs.TRACES_EXPORTER.lower()
if exporter_type == "otlp":
endpoint = envs.EXPORTER_OTLP_ENDPOINT # should be set
headers = envs.EXPORTER_OTLP_HEADERS # e.g., "Authentication=***,k2=v2"
otlp_exporter = OTLPSpanExporter(
endpoint=endpoint,
headers=dict(item.split("=") for item in headers.split(",")) if headers else None
)
processor = BatchSpanProcessor(otlp_exporter)
llm_logger.info(f"Using OTLP Exporter, sending to {endpoint} with headers {headers}")
else: # default console
processor = BatchSpanProcessor(ConsoleSpanExporter())
llm_logger.info("Using Console Exporter.")
# --- set Tracer Provider ---
provider = TracerProvider(resource=resource)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
global tracer
tracer = trace.get_tracer(__name__)
except:
llm_logger.error("set_up failed")
pass
def instrument(app: FastAPI):
try:
set_up()
if traces_enable:
llm_logger.info("Applying instrumentors...")
FastAPIInstrumentor.instrument_app(app)
except:
llm_logger.info("instrument failed")
pass
def inject_to_metadata(request, metadata_attr='metadata'):
"""
Inject OpenTelemetry trace context into the metadata field of the request.
@@ -28,9 +96,7 @@ def inject_to_metadata(request, metadata_attr='metadata'):
- If there is no metadata attribute in the request, an empty dict will be created for it as its attribute
"""
try:
if request is None:
return
if is_opentelemetry_instrumented() == False:
if request is None or traces_enable == False:
return
metadata = request.get(metadata_attr) if isinstance(request, dict) else getattr(request, metadata_attr, None)
@@ -48,6 +114,7 @@ def inject_to_metadata(request, metadata_attr='metadata'):
except:
pass
def extract_from_metadata(request, metadata_attr='metadata'):
"""
Extract trace context from metadata of request object (dict or class instance).
@@ -74,7 +141,7 @@ def extract_from_metadata(request, metadata_attr='metadata'):
return ctx
except:
return None
def extract_from_request(request):
"""
@@ -100,12 +167,13 @@ def extract_from_request(request):
except:
return None
def start_span(span_name, request, kind=trace.SpanKind.CLIENT):
"""
just start a new span in request trace context
"""
try:
if is_opentelemetry_instrumented() == False:
if not traces_enable:
return
# extract Trace context from request.metadata.trace_carrier
ctx = extract_from_metadata(request)
@@ -114,31 +182,17 @@ def start_span(span_name, request, kind=trace.SpanKind.CLIENT):
except:
pass
def start_span_request(span_name, request, kind=trace.SpanKind.CLIENT):
"""
just start a new span in request trace context
"""
try:
if is_opentelemetry_instrumented() == False:
if not traces_enable:
return
# extract Trace context from request.metadata.trace_carrier
ctx = extract_from_request(request)
with tracer.start_as_current_span(span_name, context=ctx, kind=kind) as span:
pass
except:
pass
def is_opentelemetry_instrumented() -> bool:
"""
check OpenTelemetry is start or not
"""
try:
return (
os.getenv("OTEL_PYTHONE_DISABLED_INSTRUMENTATIONS") is not None
or os.getenv("OTEL_SERVICE_NAME") is not None
or os.getenv("OTEL_TRACES_EXPORTER") is not None
)
except Exception:
return False
pass
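With the is_opentelemetry_instrumented probe removed, everything is gated on the traces_enable flag set in set_up, and spans are linked through the trace_carrier field injected into request metadata. A hedged sketch of that carrier round trip using only the public OpenTelemetry API (span names follow the ones used above):

import json
from opentelemetry import trace
from opentelemetry.propagate import extract, inject

tracer = trace.get_tracer(__name__)
request = {"metadata": {}}

with tracer.start_as_current_span("client"):
    carrier = {}
    inject(carrier)                        # writes W3C traceparent headers into the dict
    request["metadata"]["trace_carrier"] = json.dumps(carrier)

ctx = extract(json.loads(request["metadata"]["trace_carrier"]))
with tracer.start_as_current_span("ENQUEUE_ZMQ", context=ctx, kind=trace.SpanKind.PRODUCER):
    pass                                   # this span joins the client's trace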

View File

@@ -17,7 +17,7 @@
import os
from concurrent.futures import ThreadPoolExecutor
from fastdeploy.config import FDConfig
from fastdeploy.config import FDConfig, ErnieArchitectures
from fastdeploy.engine.request import Request
from fastdeploy.utils import llm_logger
@@ -268,8 +268,7 @@ class BackendBase:
"""
try:
architectures = self.fd_config.model_config.architectures
if "Ernie4_5_MoeForCausalLM" not in architectures \
and "Ernie4_5_ForCausalLM" not in architectures:
if not ErnieArchitectures.contains_ernie_arch(architectures):
from transformers import AutoTokenizer, PreTrainedTokenizerFast
tokenizer = AutoTokenizer.from_pretrained(

View File

@@ -58,7 +58,6 @@ class VocabParallelEmbedding(nn.Layer):
self.column_cut = False
self.world_size: int = hcg.get_model_parallel_world_size()
self.ring_id: int = hcg.get_model_parallel_group().id
self.use_rope: bool = fd_config.model_config.use_rope
self.use_ep: bool = fd_config.parallel_config.use_ep
self.hidden_dropout_prob: float = fd_config.model_config.hidden_dropout_prob
self.initializer_range: float = fd_config.model_config.initializer_range
@@ -92,14 +91,6 @@ class VocabParallelEmbedding(nn.Layer):
self.embeddings.weight.is_distributed = True
self.embeddings.weight.split_axis = 1
if not self.use_rope:
self.position_embeddings = nn.Embedding(
self.max_position_embeddings,
embedding_dim,
weight_attr=paddle.ParamAttr(initializer=nn.initializer.Normal(
mean=0.0, std=self.initializer_range), ),
)
self.prefix = prefix
self.dropout = nn.Dropout(self.hidden_dropout_prob)

View File

@@ -1,217 +0,0 @@
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
import paddle
import paddle.nn.functional as F
from paddle import nn
from paddle.distributed import fleet
from paddle.distributed.fleet.meta_parallel import (ColumnParallelLinear,
VocabParallelEmbedding)
from paddleformers.utils.log import logger
from .utils import get_tensor
class ResBlock(nn.Layer):
"""
A Residual Block module.
This module performs a linear transformation followed by a SiLU activation,
and then adds the result to the original input, creating a residual connection.
Args:
hidden_size (int): The size of the hidden layers in the block.
"""
def __init__(self, hidden_size, num_condition=0):
super().__init__()
self.linear = nn.Linear(hidden_size * (num_condition + 1), hidden_size)
if num_condition > 0:
self.res_connection = nn.Linear(
hidden_size * (num_condition + 1), hidden_size
)
else:
self.res_connection = nn.Identity()
# Initialize as an identity mapping
# _no_grad_fill_(self.linear.weight, 0)
# Use SiLU activation to keep consistent with the Llama model
self.act = nn.Silu()
@paddle.no_grad()
def forward(self, x):
"""
Forward pass of the ResBlock.
Args:
x (paddle.Tensor): Input tensor.
Returns:
paddle.Tensor: Output after the residual connection and activation.
"""
return self.res_connection(x) + self.act(self.linear(x))
class HydraHead(nn.Layer):
"""
A Hydra Head module.
This module performs multi hydra head layers,
each of which is a hydra_lm_head followed by a head
Args:
hydra_num_heads (int): The number of hyhra heads.
hydra_num_layers (int): The number of layers.
hidden_size (int): The size of the hidden layers in the block.
tensor_parallel_degree(int): TP degree.
vocab_size (int): The size of vocabulary.
"""
def __init__(
self,
hydra_num_heads,
hydra_num_layers,
hidden_size,
tensor_parallel_degree,
vocab_size,
):
super().__init__()
self.hydra_num_heads = hydra_num_heads
self.hydra_num_layers = hydra_num_layers
self.hidden_size = hidden_size
self.tensor_parallel_degree = tensor_parallel_degree
self.vocab_size = vocab_size
self.hydra_mlp = nn.LayerList(
[
nn.Sequential(
ResBlock(self.hidden_size, hydra_head_idx + 1),
*([ResBlock(self.hidden_size)] * (self.hydra_num_layers - 1)),
)
for hydra_head_idx in range(self.hydra_num_heads)
]
)
if self.tensor_parallel_degree > 1:
self.hydra_lm_head = nn.LayerList(
[
ColumnParallelLinear(
self.hidden_size,
self.vocab_size,
weight_attr=paddle.ParamAttr(
initializer=nn.initializer.Normal(mean=0.0, std=0.0)
),
gather_output=True,
has_bias=False,
)
for _ in range(self.hydra_num_heads)
]
)
else:
self.hydra_lm_head = nn.LayerList(
[
nn.Linear(self.hidden_size, self.vocab_size, bias_attr=False)
for _ in range(self.hydra_num_heads)
]
)
self.embeddings = VocabParallelEmbedding(
vocab_size,
hidden_size,
mp_group=fleet.get_hybrid_communicate_group().get_model_parallel_group(),
weight_attr=paddle.ParamAttr(initializer=nn.initializer.Normal(mean=0.0)),
)
def custom_set_state_dict(self, state_dict):
"""
Load Parameter of Hydra Head from state_dict with custom names.
Args:
state_dict (dict): KV pair of name and parameters.
"""
for hydra_head_idx in range(self.hydra_num_heads):
self.hydra_mlp[hydra_head_idx][0].res_connection.weight.set_value(
get_tensor(
state_dict.pop(f"0.{hydra_head_idx}.0.res_connection.weight")
)
)
self.hydra_mlp[hydra_head_idx][0].res_connection.bias.set_value(
get_tensor(state_dict.pop(f"0.{hydra_head_idx}.0.res_connection.bias"))
)
for layer_idx in range(self.hydra_num_layers):
self.hydra_mlp[hydra_head_idx][layer_idx].linear.weight.set_value(
get_tensor(
state_dict.pop(f"0.{hydra_head_idx}.{layer_idx}.linear.weight")
)
)
self.hydra_mlp[hydra_head_idx][layer_idx].linear.bias.set_value(
get_tensor(
state_dict.pop(f"0.{hydra_head_idx}.{layer_idx}.linear.bias")
)
)
self.hydra_lm_head[hydra_head_idx].weight.set_value(
get_tensor(state_dict.pop(f"1.{hydra_head_idx}.weight"))
)
self.embeddings.weight.set_value(
get_tensor(state_dict.pop("embeddings.weight"))
)
def set_state_dict(self, state_dict):
"""
Load Parameter of Hydra Head from state_dict.
Args:
state_dict (dict): KV pair of name and parameters.
"""
is_custom = True
for key in state_dict.keys():
if key != "embeddings.weight" and (
"hydra_mlp" in key or "hydra_head" in key
):
is_custom = False
break
if is_custom:
logger.info("Hydra use custom set_state_dict")
self.custom_set_state_dict(state_dict)
else:
logger.info("Hydra use default set_state_dict")
super().set_state_dict(state_dict)
@paddle.no_grad()
def forward(self, input_ids, hidden_states, next_tokens):
"""
Forward pass of Hydra Head
Args:
input_ids: [batch_size, 1] The tokens sampled by the previous head go through the embedding,
starting with the last accept token
hidden_states: [batch_size, hidden_size] The hidden_states of the last accept_tokens
"""
hydra_inputs = [hidden_states]
input_embeds = self.embeddings(input_ids)
for hydra_head_idx in range(self.hydra_num_heads):
hydra_inputs.append(input_embeds)
head_input = paddle.concat(hydra_inputs, axis=-1)
hidden_states = self.hydra_mlp[hydra_head_idx](head_input)
logits = self.hydra_lm_head[hydra_head_idx](hidden_states)
probs = F.softmax(logits)
_, topk_tokens = paddle.topk(probs, k=1, axis=-1)
next_tokens[:, 1 + hydra_head_idx : 2 + hydra_head_idx] = topk_tokens[:]
input_embeds = self.embeddings(next_tokens[:, 1 + hydra_head_idx])

View File

@@ -43,3 +43,5 @@ class SamplingMetadata:
top_p: paddle.Tensor
top_k: Optional[paddle.Tensor] = None
max_num_logprobs: Optional[int] = None
prompt_ids: Optional[paddle.Tensor] = None
prompt_lens: Optional[paddle.Tensor] = None

View File

@@ -21,6 +21,8 @@ from fastdeploy.platforms import current_platform
def apply_penalty_multi_scores(
pre_token_ids: paddle.Tensor,
prompt_ids: paddle.Tensor,
prompt_lens: paddle.Tensor,
logits: paddle.Tensor,
repetition_penalties: paddle.Tensor,
frequency_penalties: paddle.Tensor,
@@ -39,6 +41,8 @@ def apply_penalty_multi_scores(
get_token_penalty_multi_scores
logits = get_token_penalty_multi_scores(
pre_token_ids,
prompt_ids,
prompt_lens,
logits,
repetition_penalties,
frequency_penalties,
@@ -69,6 +73,8 @@ def apply_penalty_multi_scores(
get_token_penalty_multi_scores
logits = get_token_penalty_multi_scores(
pre_token_ids,
prompt_ids,
prompt_lens,
logits,
repetition_penalties,
frequency_penalties,

View File

@@ -253,6 +253,8 @@ class Sampler(nn.Layer):
logits = apply_penalty_multi_scores(
sampling_metadata.pre_token_ids,
sampling_metadata.prompt_ids,
sampling_metadata.prompt_lens,
logits,
sampling_metadata.repetition_penalties,
sampling_metadata.frequency_penalties,
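Threading prompt_ids and prompt_lens into apply_penalty_multi_scores lets the repetition penalty also cover tokens from the prompt, not just previously generated ones. A toy numpy sketch of that behavior (illustrative shapes and values, not the fused Paddle kernel):

import numpy as np

def apply_repetition_penalty(logits, prompt_ids, prompt_lens, pre_token_ids, penalty):
    for b in range(logits.shape[0]):
        seen = set(prompt_ids[b][: prompt_lens[b]].tolist())
        seen.update(t for t in pre_token_ids[b].tolist() if t >= 0)   # -1 marks padding
        for tok in seen:
            v = logits[b, tok]
            logits[b, tok] = v / penalty[b] if v > 0 else v * penalty[b]
    return logits

logits = np.full((1, 8), 2.0)
print(apply_repetition_penalty(
    logits,
    prompt_ids=np.array([[1, 2, 3, 0]]),
    prompt_lens=np.array([3]),
    pre_token_ids=np.array([[5, -1]]),
    penalty=np.array([2.0]),
))   # columns 1, 2, 3 and 5 drop to 1.0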

View File

@@ -606,8 +606,8 @@ class Ernie4_5_PretrainedModel(PretrainedModel):
return final_actions
mappings = get_tensor_parallel_split_mappings(
config.num_hidden_layers,
config.moe_num_experts,
config.moe_layer_start_index,
getattr(config, "moe_num_experts", 0),
getattr(config, "moe_layer_start_index", -1),
config.prefix_name,
)
return mappings

View File

@@ -161,7 +161,7 @@ class Ernie4_5_VLMoE(nn.Layer):
self.num_shared_experts = fd_config.model_config.moe_num_shared_experts
if self.num_shared_experts > 0:
self.share_experts = Ernie4_5_VLMLP(
self.shared_experts = Ernie4_5_VLMLP(
fd_config=fd_config,
intermediate_size=self.num_shared_experts *
fd_config.model_config.moe_intermediate_size[0],
@@ -193,11 +193,11 @@ class Ernie4_5_VLMoE(nn.Layer):
if self.text_fused_moe.moe_use_gate_correction_bias:
state_dict.pop(self.text_fused_moe.gate_correction_bias_key)
if self.num_shared_experts > 0:
self.share_experts.load_state_dict(state_dict)
self.shared_experts.load_state_dict(state_dict)
def forward(self, hidden_states: paddle.Tensor, vl_moe_meta: VLMoEMeta):
if self.num_shared_experts > 0:
share_experts_out = self.share_experts(hidden_states)
shared_experts_out = self.shared_experts(hidden_states)
if vl_moe_meta.image_input is not None:
text_image_gather_scatter(
hidden_states,
@@ -222,7 +222,7 @@ class Ernie4_5_VLMoE(nn.Layer):
else:
hidden_states = self.text_fused_moe(hidden_states)
if self.num_shared_experts > 0:
hidden_states += share_experts_out
hidden_states += shared_experts_out
if self.tp_size > 1:
tensor_model_parallel_all_reduce(hidden_states)
return hidden_states
@@ -759,4 +759,4 @@ class Ernie4_5_VLPretrainedModel(PretrainedModel):
config.vision_config.get("depth")
)
return {**mappings, **vision_mappings}
return {**mappings, **vision_mappings}

View File

@@ -505,8 +505,6 @@ class TokenProcessor(object):
result.outputs.token_ids.append(token_id)
if token_id in task.eos_token_ids or is_prefill or recovery_stop:
result.finished = True
result.prompt = task.prompt
result.prompt_token_ids = task.prompt_token_ids
if recovery_stop:
result.error_msg = "Recover is not supported, the result is incomplete!"
llm_logger.info(

View File

@@ -51,12 +51,14 @@ class RolloutModelConfig:
enable_prefix_caching: bool = False,
splitwise_role: str = "mixed",
expert_parallel_size: int = 1,
enable_expert_parallell: bool = False,
enable_expert_parallel: bool = False,
ori_vocab_size: int = None,
quantization: str = "None",
guided_decoding_backend: str = "off",
disable_any_whitespace: bool = True,
enable_logprob: bool = False,
graph_optimization_config: str = None,
local_rank: int = 0
):
# Required parameters
self.model_name_or_path = model_name_or_path
@@ -90,16 +92,18 @@ class RolloutModelConfig:
self.enable_prefix_caching = enable_prefix_caching
self.splitwise_role = splitwise_role
self.expert_parallel_size = expert_parallel_size
self.enable_expert_parallell = enable_expert_parallell
self.enable_expert_parallel = enable_expert_parallel
self.ori_vocab_size = ori_vocab_size
self.quantization = quantization
self.guided_decoding_backend = guided_decoding_backend
self.disable_any_whitespace = disable_any_whitespace
self.enable_logprob = enable_logprob
self.graph_optimization_config = graph_optimization_config
self.local_rank = local_rank
def __str__(self):
return "\n".join(f"{k}: {v}" for k, v in self.__dict__.items())
def initialize(self):
"""Initialize the final fd config"""
return initialize_fd_config(self, ranks=self.tensor_parallel_size, local_rank=0)
return initialize_fd_config(self, ranks=self.tensor_parallel_size, local_rank=self.local_rank)

View File

@@ -23,15 +23,14 @@ from paddleformers.utils.log import logger
from fastdeploy.config import FDConfig
from fastdeploy.model_executor.model_loader import ModelRegistry
from fastdeploy.model_executor.models.ernie4_5_moe import \
Ernie4_5_MoeForCausalLM
Ernie4_5_MoeForCausalLM, Ernie4_5_PretrainedModel
from fastdeploy.model_executor.models.ernie4_5_vl.ernie4_5_vl_moe import \
Ernie4_5_VLMoeForConditionalGeneration
from fastdeploy.model_executor.models.qwen2 import Qwen2ForCausalLM
from fastdeploy.model_executor.models.qwen3 import Qwen3ForCausalLM
from fastdeploy.model_executor.models.qwen3moe import Qwen3MoeForCausalLM
Ernie4_5_VLMoeForConditionalGeneration, Ernie4_5_VLPretrainedModel
from fastdeploy.model_executor.models.qwen2 import Qwen2ForCausalLM, Qwen2PretrainedModel
from fastdeploy.model_executor.models.qwen3 import Qwen3ForCausalLM, Qwen3PretrainedModel
from fastdeploy.model_executor.models.qwen3moe import Qwen3MoeForCausalLM, Qwen3MoePretrainedModel
from fastdeploy.rl.rollout_config import RolloutModelConfig
class RolloutModel(nn.Layer):
"""Main model class for rollout operations, supports multimodal components for train."""
@@ -39,21 +38,25 @@ class RolloutModel(nn.Layer):
"""Initialize with FastDeploy configuration."""
super(RolloutModel, self).__init__()
self.fd_config = rollout_model_config.initialize()
self._init_model()
self.rollout_model = self._init_model()
def _init_model(self):
def _init_model(self) -> nn.Layer:
"""Load model from loader based on config."""
context = paddle.LazyGuard()
architectures = f"{self.fd_config.model_config.architectures[0]}RL"
with context:
model_cls = ModelRegistry.get_class(architectures)
model = model_cls(self.fd_config)
model.eval()
return model
self.rollout_model = model.eval()
def get_name_mappings_to_training(self) -> Dict[str, str]:
def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
"""Get parameter name mappings between rollout and training models."""
return getattr(self.rollout_model, "get_name_mappings_to_training", lambda: {})()
return getattr(self.rollout_model, "get_name_mappings_to_training", lambda: {})(trainer_degree)
def get_quantization_infer_keys(self) -> Dict[str, str]:
"""Get parameter name mappings between rollout and training models."""
return getattr(self.rollout_model, "get_quantization_infer_keys", lambda: {})()
@paddle.no_grad()
def state_dict(self):
@@ -61,10 +64,51 @@ class RolloutModel(nn.Layer):
return self.rollout_model.state_dict()
class Ernie4_5_MoeForCausalLMRL(Ernie4_5_MoeForCausalLM):
class BaseRLModel(nn.Layer):
"""Base class for RL models with common functionality"""
def __init__(self,):
super(BaseRLModel, self).__init__()
self.infer_to_train_mapping = {}
self.fd_config = None
@classmethod
def name(cls) -> str:
return cls.__name__
def _update_base_mappings(self, base_name: str) -> None:
"""Common static mappings"""
static_mappings = {
f"{base_name}.embed_tokens.embeddings.weight": f"{base_name}.embed_tokens.weight",
"lm_head.linear.weight": "lm_head.weight"
}
self.infer_to_train_mapping.update(static_mappings)
def _complete_missing_mappings(self) -> None:
"""
Complete the mapping dictionary with keys that have identical names in inference and training.
"""
for key in self.state_dict().keys():
if key not in self.infer_to_train_mapping and "_scale" not in key:
# Weight-scale params are skipped; the remaining keys share the same name in train and infer.
self.infer_to_train_mapping[key] = key
def get_quantization_infer_keys(self) -> list[str]:
"""Get quantization infer keys"""
quant_weight_key = []
if self.fd_config.quant_config.name() == "wint8":
""" RL only support weight_only_int8 now"""
for key in self.state_dict().keys():
if "scale" in key:
quant_weight_key.append(key.replace(".weight_scale", ".weight"))
else:
raise ValueError("Only 'wint8' quantization is supported in RL roullout.")
return quant_weight_key
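As a reading aid, here is a minimal, self-contained sketch (toy keys, not a real checkpoint) of how the two BaseRLModel helpers above compose: _update_base_mappings seeds the embedding/lm_head renames and _complete_missing_mappings maps every remaining key to itself while skipping quantization scales.

# Minimal sketch of the BaseRLModel mapping logic above; keys are illustrative only.
infer_to_train_mapping = {}

def _update_base_mappings(base_name):
    # Embedding and lm_head carry different names in inference vs. training.
    infer_to_train_mapping.update({
        f"{base_name}.embed_tokens.embeddings.weight": f"{base_name}.embed_tokens.weight",
        "lm_head.linear.weight": "lm_head.weight",
    })

def _complete_missing_mappings(state_dict_keys):
    # Remaining keys map to themselves; weight-scale params are skipped.
    for key in state_dict_keys:
        if key not in infer_to_train_mapping and "_scale" not in key:
            infer_to_train_mapping[key] = key

_update_base_mappings("ernie")
_complete_missing_mappings([
    "ernie.embed_tokens.embeddings.weight",
    "ernie.layers.0.self_attn.qkv_proj.weight",
    "ernie.layers.0.self_attn.qkv_proj.weight_scale",  # skipped: quantization scale
    "lm_head.linear.weight",
])
# infer_to_train_mapping now holds the two static renames plus the identity
# mapping for ernie.layers.0.self_attn.qkv_proj.weight.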
class Ernie4_5_MoeForCausalLMRL(Ernie4_5_MoeForCausalLM, BaseRLModel):
"""
Ernie4_5_MoeForCausalLMRL
"""
_get_tensor_parallel_mappings = Ernie4_5_PretrainedModel._get_tensor_parallel_mappings
def __init__(self, fd_config: FDConfig):
"""
@@ -74,42 +118,27 @@ class Ernie4_5_MoeForCausalLMRL(Ernie4_5_MoeForCausalLM):
super(Ernie4_5_MoeForCausalLMRL, self).__init__(fd_config)
@classmethod
def name(self):
def name(self) -> str:
"""name"""
return "Ernie4_5_MoeForCausalLMRL"
def get_name_mappings_to_training(self):
def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
"""Generate mapping between inference and training parameter for RL(donot delete!)."""
have_bias = self.fd_config.model_config.get("have_norm_bias", False)
# Prepare placeholders
place_holders = ["weight"] + (["bias"] if have_bias else [])
place_holders = ["weight"]
# Initialize mapping dictionary
infer_to_train = {}
base_name = "ernie"
# Static mappings (non-layer specific)
static_mappings = {
f"{base_name}.embed_tokens.embeddings.weight":
f"{base_name}.embed_tokens.weight",
"lm_head.linear.weight": "lm_head.weight"
}
if self.fd_config.model_config.get("tie_word_embeddings", False):
# Support tie_word_embeddings
logger.debug("enable tie_word_embeddings")
static_mappings.pop("lm_head.linear.weight")
infer_to_train.update(static_mappings)
base_name = base_name + ".layers"
self._update_base_mappings("ernie")
base_name = "ernie.layers"
# Helper function to add layer mappings
def _add_layer_mappings(layer_idx: int):
# MoE specific mappings
infer_to_train[f"{base_name}.{layer_idx}.mlp.fused_moe.gate_weight"] = \
self.infer_to_train_mapping[f"{base_name}.{layer_idx}.mlp.fused_moe.gate_weight"] = \
f"{base_name}.{layer_idx}.mlp.gate.weight"
if self.fd_config.model_config.moe_use_aux_free:
infer_to_train[f"{base_name}.{layer_idx}.mlp.fused_moe.gate_correction_bias"] = \
self.infer_to_train_mapping[f"{base_name}.{layer_idx}.mlp.fused_moe.gate_correction_bias"] = \
f"{base_name}.{layer_idx}.mlp.moe_statics.e_score_correction_bias"
# MoE experts mappings
@@ -117,17 +146,17 @@ class Ernie4_5_MoeForCausalLMRL(Ernie4_5_MoeForCausalLM):
for ph in place_holders:
# up_gate_proj (up_gate_proj)
up_gate_proj_key = f"{base_name}.{layer_idx}.mlp.fused_moe.up_gate_proj_weight"
if up_gate_proj_key not in infer_to_train:
infer_to_train[up_gate_proj_key] = []
infer_to_train[up_gate_proj_key].append(
if up_gate_proj_key not in self.infer_to_train_mapping:
self.infer_to_train_mapping[up_gate_proj_key] = []
self.infer_to_train_mapping[up_gate_proj_key].append(
f"{base_name}.{layer_idx}.mlp.experts.{expert_idx}.up_gate_proj.{ph}"
)
# down_proj (down_proj)
down_proj_key = f"{base_name}.{layer_idx}.mlp.fused_moe.down_proj_weight"
if down_proj_key not in infer_to_train:
infer_to_train[down_proj_key] = []
infer_to_train[down_proj_key].append(
if down_proj_key not in self.infer_to_train_mapping:
self.infer_to_train_mapping[down_proj_key] = []
self.infer_to_train_mapping[down_proj_key].append(
f"{base_name}.{layer_idx}.mlp.experts.{expert_idx}.down_proj.{ph}"
)
@@ -137,13 +166,16 @@ class Ernie4_5_MoeForCausalLMRL(Ernie4_5_MoeForCausalLM):
self.fd_config.model_config.num_hidden_layers):
_add_layer_mappings(layer_idx)
return infer_to_train
self._complete_missing_mappings()
return self.infer_to_train_mapping
class Ernie4_5_VLMoeForConditionalGenerationRL(Ernie4_5_VLMoeForConditionalGeneration):
class Ernie4_5_VLMoeForConditionalGenerationRL(Ernie4_5_VLMoeForConditionalGeneration, BaseRLModel):
"""
Ernie4_5_VLMoeForConditionalGenerationRL
"""
_get_tensor_parallel_mappings = Ernie4_5_VLPretrainedModel._get_tensor_parallel_mappings
def __init__(self, fd_config: FDConfig):
"""
@@ -153,69 +185,51 @@ class Ernie4_5_VLMoeForConditionalGenerationRL(Ernie4_5_VLMoeForConditionalGener
super(Ernie4_5_VLMoeForConditionalGenerationRL, self).__init__(fd_config)
@classmethod
def name(self):
def name(self) -> str:
"""name"""
return "Ernie4_5_VLMoeForConditionalGenerationRL"
def get_name_mappings_to_training(self):
def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
"""Generate mapping between inference and training parameter for RL(donot delete!)."""
have_bias = self.fd_config.model_config.get("have_norm_bias", False)
# Prepare placeholders
place_holders = ["weight"] + (["bias"] if have_bias else [])
place_holders = ["weight"]
# Initialize mapping dictionary
infer_to_train = {}
self._update_base_mappings("ernie")
base_name = "ernie"
# Static mappings (non-layer specific)
static_mappings = {
f"{base_name}.embed_tokens.embeddings.weight":
f"{base_name}.embed_tokens.weight",
"lm_head.linear.weight": "lm_head.weight"
}
if self.fd_config.model_config.get("tie_word_embeddings", False):
# Support tie_word_embeddings
logger.debug("enable tie_word_embeddings")
static_mappings.pop("lm_head.linear.weight")
infer_to_train.update(static_mappings)
base_name = base_name + ".layers"
base_name = "ernie.layers"
# Helper function to add layer mappings
def _add_layer_mappings(layer_idx: int, moe_tag: str):
def _add_expert_mappings(layer_idx: int, moe_tag: str, expert_start: int):
# MoE specific mappings
infer_to_train[f"{base_name}.{layer_idx}.mlp.{moe_tag}_fused_moe.gate_weight"] = f"{base_name}.{layer_idx}.mlp.gate.weight" if moe_tag == "text" else f"{base_name}.{layer_idx}.mlp.gate.weight_1"
gate_suffix = "" if moe_tag == "text" else "_1"
self.infer_to_train_mapping[f"{base_name}.{layer_idx}.mlp.{moe_tag}_fused_moe.gate_weight"] = \
f"{base_name}.{layer_idx}.mlp.gate.weight{gate_suffix}"
if self.fd_config.model_config.moe_use_aux_free:
infer_to_train[f"{base_name}.{layer_idx}.mlp.{moe_tag}_fused_moe.gate_correction_bias"] = \
self.infer_to_train_mapping[f"{base_name}.{layer_idx}.mlp.{moe_tag}_fused_moe.gate_correction_bias"] = \
f"{base_name}.{layer_idx}.mlp.moe_statics.e_score_correction_bias"
# MoE experts mappings
assert isinstance(self.fd_config.model_config.moe_num_experts, list)
if moe_tag == "text":
expert_idx_start = 0
expert_idx_end = self.fd_config.model_config.moe_num_experts[0]
else:
expert_idx_start = self.fd_config.model_config.moe_num_experts[0]
expert_idx_end = self.fd_config.model_config.moe_num_experts[1]
for expert_idx in range(expert_idx_start, expert_idx_end):
# Initialize defaultdict for expert weights
from collections import defaultdict
from itertools import chain
def _generate_ranges(start, end, step=16, take=8):
"""生成 [start, start+take), [start+step, start+step+take), ... 直到 end"""
return chain(
*(range(i, min(i + take, end))  # guard against going past end
for i in range(start, end, step)))
expert_mappings = defaultdict(list)
for expert_idx in _generate_ranges(expert_start, total_moe_num, expert_num_per_rank * 2, expert_num_per_rank):
for ph in place_holders:
# up_gate_proj (up_gate_proj)
up_gate_proj_key = f"{base_name}.{layer_idx}.mlp.{moe_tag}_fused_moe.up_gate_proj_weight"
if up_gate_proj_key not in infer_to_train:
infer_to_train[up_gate_proj_key] = []
infer_to_train[up_gate_proj_key].append(
expert_mappings[f"{base_name}.{layer_idx}.mlp.{moe_tag}_fused_moe.up_gate_proj_weight"].append(
f"{base_name}.{layer_idx}.mlp.experts.{expert_idx}.up_gate_proj.{ph}"
)
# down_proj (down_proj)
down_proj_key = f"{base_name}.{layer_idx}.mlp.{moe_tag}_fused_moe.down_proj_weight"
if down_proj_key not in infer_to_train:
infer_to_train[down_proj_key] = []
infer_to_train[down_proj_key].append(
expert_mappings[f"{base_name}.{layer_idx}.mlp.{moe_tag}_fused_moe.down_proj_weight"].append(
f"{base_name}.{layer_idx}.mlp.experts.{expert_idx}.down_proj.{ph}"
)
self.infer_to_train_mapping.update(expert_mappings)
moe_layer_start_index = self.fd_config.model_config.moe_layer_start_index
if isinstance(moe_layer_start_index, int):
@@ -235,19 +249,28 @@ class Ernie4_5_VLMoeForConditionalGenerationRL(Ernie4_5_VLMoeForConditionalGener
else:
text_moe_layer_end_index = moe_layer_end_index[0]
image_moe_layer_end_index = moe_layer_end_index[1]
assert isinstance(self.fd_config.model_config.moe_num_experts, list)
total_moe_num = sum(self.fd_config.model_config.moe_num_experts)
if not trainer_degree:
trainer_degree = self.fd_config.parallel_config.tensor_parallel_size
expert_num_per_rank = self.fd_config.model_config.moe_num_experts[0] // trainer_degree
# Process MoE layers
for layer_idx in range(text_moe_layer_start_index, text_moe_layer_end_index):
_add_layer_mappings(layer_idx, "text")
_add_expert_mappings(layer_idx, "text", expert_start=0)
for layer_idx in range(image_moe_layer_start_index, image_moe_layer_end_index):
_add_layer_mappings(layer_idx, "image")
_add_expert_mappings(layer_idx, "image", expert_start=expert_num_per_rank)
return infer_to_train
self._complete_missing_mappings()
return self.infer_to_train_mapping
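To illustrate the interleaved expert layout that _generate_ranges above produces, a standalone snippet with made-up sizes (64 total experts, expert_num_per_rank = 8, so step = expert_num_per_rank * 2 = 16; the text branch starts at 0, the image branch at expert_num_per_rank):

from itertools import chain

def _generate_ranges(start, end, step=16, take=8):
    """Yield [start, start+take), [start+step, start+step+take), ... up to end."""
    return chain(*(range(i, min(i + take, end)) for i in range(start, end, step)))

print(list(_generate_ranges(0, 64)))  # 0-7, 16-23, 32-39, 48-55  (text experts)
print(list(_generate_ranges(8, 64)))  # 8-15, 24-31, 40-47, 56-63 (image experts)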
class Qwen2ForCausalLMRL(Qwen2ForCausalLM):
class Qwen2ForCausalLMRL(Qwen2ForCausalLM, BaseRLModel):
"""
Qwen2ForCausalLMRL
"""
_get_tensor_parallel_mappings = Qwen2PretrainedModel._get_tensor_parallel_mappings
def __init__(self, fd_config: FDConfig):
"""
@@ -257,47 +280,39 @@ class Qwen2ForCausalLMRL(Qwen2ForCausalLM):
super(Qwen2ForCausalLMRL, self).__init__(fd_config)
@classmethod
def name(self):
def name(self) -> str:
"""name"""
return "Qwen2ForCausalLMRL"
def get_name_mappings_to_training(self):
def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
"""Generate mapping between inference and training parameter for RL(donot delete!)."""
# Prepare placeholders
place_holders = ["weight"]
# Initialize mapping dictionary
infer_to_train = {}
base_name = "qwen2"
# Static mappings (non-layer specific)
static_mappings = {
f"{base_name}.embed_tokens.embeddings.weight":
f"{base_name}.embed_tokens.weight",
"lm_head.linear.weight": "lm_head.weight"
}
infer_to_train.update(static_mappings)
base_name = base_name + ".layers"
self._update_base_mappings("qwen2")
base_name = "qwen2.layers"
# Helper function to add layer mappings
def _add_layer_mappings(layer_idx):
# FFN mappings
for ph in place_holders:
infer_to_train[f"{base_name}.{layer_idx}.mlp.up_gate_proj.{ph}"] = \
self.infer_to_train_mapping[f"{base_name}.{layer_idx}.mlp.up_gate_proj.{ph}"] = \
f"{base_name}.{layer_idx}.mlp.gate_up_fused_proj.{ph}"
for layer_idx in range(
self.fd_config.model_config.num_hidden_layers):
_add_layer_mappings(layer_idx)
return infer_to_train
self._complete_missing_mappings()
return self.infer_to_train_mapping
class Qwen3MoeForCausalLMRL(Qwen3MoeForCausalLM):
class Qwen3MoeForCausalLMRL(Qwen3MoeForCausalLM, BaseRLModel):
"""
Qwen3MoeForCausalLMRL
"""
_get_tensor_parallel_mappings = Qwen3MoePretrainedModel._get_tensor_parallel_mappings
def __init__(self, fd_config: FDConfig):
"""
@@ -307,37 +322,29 @@ class Qwen3MoeForCausalLMRL(Qwen3MoeForCausalLM):
super(Qwen3MoeForCausalLMRL, self).__init__(fd_config)
@classmethod
def name(self):
def name(self) -> str:
"""name"""
return "Qwen3MoeForCausalLMRL"
def get_name_mappings_to_training(self):
def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
"""Generate mapping between inference and training parameter for RL(donot delete!)."""
# Prepare placeholders
place_holders = ["weight"]
# Initialize mapping dictionary
infer_to_train = {}
self._update_base_mappings("model")
self.infer_to_train_mapping = {}
base_name = "model"
# Static mappings (non-layer specific)
static_mappings = {
f"{base_name}.embed_tokens.embeddings.weight":
f"{base_name}.embed_tokens.weight",
"lm_head.linear.weight": "lm_head.weight"
}
infer_to_train.update(static_mappings)
base_name = base_name + ".layers"
base_name = "model.layers"
# Helper function to add layer mappings
def _add_layer_mappings(layer_idx: int):
# MoE specific mappings
infer_to_train[f"{base_name}.{layer_idx}.mlp.gate_weight"] = \
self.infer_to_train_mapping[f"{base_name}.{layer_idx}.mlp.gate_weight"] = \
f"{base_name}.{layer_idx}.mlp.gate.weight"
if self.fd_config.moe_config.moe_use_aux_free:
infer_to_train[f"{base_name}.{layer_idx}.mlp.fused_moe.gate_correction_bias"] = \
self.infer_to_train_mapping[f"{base_name}.{layer_idx}.mlp.fused_moe.gate_correction_bias"] = \
f"{base_name}.{layer_idx}.mlp.moe_statics.e_score_correction_bias"
# MoE experts mappings
@@ -345,17 +352,17 @@ class Qwen3MoeForCausalLMRL(Qwen3MoeForCausalLM):
for ph in place_holders:
# up_gate_proj (up_gate_proj)
up_gate_proj_key = f"{base_name}.{layer_idx}.mlp.up_gate_proj_weight"
if up_gate_proj_key not in infer_to_train:
infer_to_train[up_gate_proj_key] = []
infer_to_train[up_gate_proj_key].append(
if up_gate_proj_key not in self.infer_to_train_mapping:
self.infer_to_train_mapping[up_gate_proj_key] = []
self.infer_to_train_mapping[up_gate_proj_key].append(
f"{base_name}.{layer_idx}.mlp.experts.{expert_idx}.up_gate_proj.{ph}"
)
# down_proj (down_proj)
down_proj_key = f"{base_name}.{layer_idx}.mlp.down_proj_weight"
if down_proj_key not in infer_to_train:
infer_to_train[down_proj_key] = []
infer_to_train[down_proj_key].append(
if down_proj_key not in self.infer_to_train_mapping:
self.infer_to_train_mapping[down_proj_key] = []
self.infer_to_train_mapping[down_proj_key].append(
f"{base_name}.{layer_idx}.mlp.experts.{expert_idx}.down_proj.{ph}"
)
@@ -363,13 +370,16 @@ class Qwen3MoeForCausalLMRL(Qwen3MoeForCausalLM):
for layer_idx in range(self.fd_config.model_config.num_hidden_layers):
_add_layer_mappings(layer_idx)
return infer_to_train
self._complete_missing_mappings()
return self.infer_to_train_mapping
class Qwen3ForCausalLMRL(Qwen3ForCausalLM):
class Qwen3ForCausalLMRL(Qwen3ForCausalLM, BaseRLModel):
"""
Qwen3ForCausalLMRL
"""
_get_tensor_parallel_mappings = Qwen3PretrainedModel._get_tensor_parallel_mappings
def __init__(self, fd_config: FDConfig):
"""
@@ -379,6 +389,28 @@ class Qwen3ForCausalLMRL(Qwen3ForCausalLM):
super(Qwen3ForCausalLMRL, self).__init__(fd_config)
@classmethod
def name(self):
def name(self) -> str:
"""name"""
return "Qwen3ForCausalLMRL"
def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
# Prepare placeholders
place_holders = ["weight"]
# Initialize mapping dictionary
self._update_base_mappings("model")
base_name = "model.layers"
# Helper function to add layer mappings
def _add_layer_mappings(layer_idx):
# FFN mappings
for ph in place_holders:
self.infer_to_train_mapping[f"{base_name}.{layer_idx}.mlp.up_gate_proj.{ph}"] = \
f"{base_name}.{layer_idx}.mlp.gate_up_fused_proj.{ph}"
for layer_idx in range(
self.fd_config.model_config.num_hidden_layers):
_add_layer_mappings(layer_idx)
self._complete_missing_mappings()
return self.infer_to_train_mapping

View File

@@ -27,7 +27,8 @@ from datetime import datetime
from logging.handlers import BaseRotatingHandler
from pathlib import Path
from typing import Literal, TypeVar, Union
import random
import socket
import requests
import yaml
from aistudio_sdk.snapshot_download import snapshot_download
@@ -421,6 +422,19 @@ def get_host_ip():
return ip
def get_random_port():
while True:
port = random.randint(49152, 65535)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.bind(("0.0.0.0", port))
return port
except OSError:
continue
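A short usage sketch of the port-probing helper added above (the import path is an assumption based on the file shown): the bind probe only proves the port was free at that instant, so real callers may want to retry if the subsequent bind races with another process.

import socket

# Assumed import path; get_random_port is the helper added in the diff above.
from fastdeploy.utils import get_random_port

def bind_with_retry(max_attempts=3):
    """Bind a listening socket on a probed free port, retrying if the port races away."""
    for _ in range(max_attempts):
        port = get_random_port()
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.bind(("0.0.0.0", port))
            s.listen()
            return s, port
        except OSError:
            s.close()  # another process claimed the port between probe and bind
    raise RuntimeError("could not bind a free port")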
def is_port_available(host, port):
"""
Check the port is available

View File

@@ -47,14 +47,12 @@ from fastdeploy.platforms import current_platform
if not current_platform.is_dcu():
from fastdeploy.spec_decode import MTPProposer, NgramProposer
from fastdeploy.input.ernie_tokenizer import ErnieBotTokenizer
from fastdeploy.input.mm_processor import DataProcessor
from fastdeploy.model_executor.forward_meta import ForwardMeta
from fastdeploy.model_executor.models.ernie4_5_vl.modeling_resampler import \
ScatterOp
from fastdeploy.worker.model_runner_base import ModelRunnerBase
from fastdeploy.worker.output import ModelOutputData, ModelRunnerOutput
from fastdeploy.worker.utils import check_safetensors_model
class GPUModelRunner(ModelRunnerBase):
@@ -81,16 +79,7 @@ class GPUModelRunner(ModelRunnerBase):
# VL model config:
if self.enable_mm:
model_path = os.path.dirname(self.parallel_config.model_name_or_path)
self.is_safetensors_model = check_safetensors_model(
self.parallel_config.model_name_or_path)
if not self.is_safetensors_model:
self.tokenizer_path = self.image_preprocessor_path = model_path
else:
self.tokenizer_path = self.parallel_config.model_name_or_path
self.image_preprocessor_path = self.parallel_config.model_name_or_path
self.vision_model_name_or_path = os.path.join(
model_path, "DFNRopeVisionTransformer")
self._init_image_preprocess()
self.amp_black = [
"reduce_sum",
@@ -151,7 +140,7 @@ class GPUModelRunner(ModelRunnerBase):
"""
Check whether prefill stage finished
"""
if int(paddle.max(self.share_inputs['seq_lens_encoder'])) != 0:
if int(paddle.max(self.share_inputs["seq_lens_encoder"])) != 0:
return 1
else:
return 0
@@ -227,12 +216,15 @@ class GPUModelRunner(ModelRunnerBase):
1] = request.prompt_token_ids[-1]
self.share_inputs["input_ids"][idx:idx + 1,
0] = request.prompt_token_ids[0]
self.share_inputs["prompt_ids"][idx:idx + 1,
:length] = np.array(request.prompt_token_ids)
self.share_inputs['seq_lens_encoder'][idx:idx + 1] = 0
self.share_inputs['seq_lens_decoder'][idx:idx + 1] = length
self.share_inputs['seq_lens_this_time'][idx:idx + 1] = 1
self.share_inputs['step_seq_lens_encoder'][idx:idx + 1] = 0
self.share_inputs['step_seq_lens_decoder'][idx:idx +
1] = length
self.share_inputs["prompt_lens"][idx:idx + 1] = length
self.share_inputs['step_idx'][idx:idx + 1] = 1
if self.speculative_decoding:
@@ -247,6 +239,9 @@ class GPUModelRunner(ModelRunnerBase):
self.share_inputs["input_ids"][idx:idx +
1, :length] = np.array(
request.prompt_token_ids)
self.share_inputs["prompt_ids"][idx:idx +
1, :length] = np.array(
request.prompt_token_ids)
# Use chunked prefill
if self.parallel_config.enable_chunked_prefill:
@@ -286,6 +281,7 @@ class GPUModelRunner(ModelRunnerBase):
idx:idx + 1] = token_chunk_size
self.share_inputs['seq_lens_encoder'][idx:idx +
1] = token_chunk_size
self.share_inputs["prompt_lens"][idx:idx + 1] = token_chunk_size
else:
if self.enable_mm:
inputs = self._preprocess_mm_task(request.multimodal_inputs)
@@ -310,6 +306,7 @@ class GPUModelRunner(ModelRunnerBase):
self.share_inputs['step_seq_lens_encoder'][idx:idx +
1] = length
self.share_inputs['seq_lens_encoder'][idx:idx + 1] = length
self.share_inputs["prompt_lens"][idx:idx + 1] = length
if self.enable_mm:
enable_thinking = request.get("enable_thinking", True)
@@ -408,6 +405,8 @@ class GPUModelRunner(ModelRunnerBase):
self.share_inputs["input_ids"][idx:idx +
1, :input_length] = np.array(
[5] * input_length)
self.share_inputs["prompt_ids"][idx:idx + 1, :input_length] = np.array(
[5] * input_length)
self.share_inputs["eos_token_id"][:] = np.array(
[2], dtype="int64").reshape(-1, 1)
self.share_inputs["seq_lens_this_time"][idx:idx + 1] = input_length
@@ -415,11 +414,11 @@ class GPUModelRunner(ModelRunnerBase):
1] = input_length
self.share_inputs["seq_lens_encoder"][idx:idx + 1] = input_length
self.share_inputs["seq_lens_decoder"][idx:idx + 1] = 0
self.share_inputs["prompt_lens"][idx:idx + 1] = 0
self.share_inputs["step_idx"][idx:idx + 1] = 0
self.share_inputs["max_dec_len"][idx:idx + 1] = max_dec_len
self.share_inputs["min_dec_len"][idx:idx + 1] = max_dec_len
self.share_inputs["stop_flags"][idx:idx + 1] = False
self.share_inputs["top_p"][idx:idx + 1] = 0.0
self.share_inputs["temperature"][idx:idx + 1] = 1
self.share_inputs["first_token_ids"][
@@ -446,6 +445,10 @@ class GPUModelRunner(ModelRunnerBase):
[max_num_seqs, self.parallel_config.max_model_len],
self.parallel_config.pad_token_id,
dtype='int64')
self.share_inputs["prompt_ids"] = paddle.full(
[max_num_seqs, self.parallel_config.max_model_len],
self.parallel_config.pad_token_id,
dtype='int64')
self.share_inputs["eos_token_id"] = paddle.full(
[self.parallel_config.eos_tokens_lens, 1], 0, dtype='int64')
self.share_inputs["top_p"] = paddle.full([max_num_seqs, 1],
@@ -490,6 +493,9 @@ class GPUModelRunner(ModelRunnerBase):
[max_num_seqs, 1], 0, dtype='int32')
self.share_inputs["step_seq_lens_decoder"] = paddle.full(
[max_num_seqs, 1], 0, dtype='int32')
self.share_inputs["prompt_lens"] = paddle.full([max_num_seqs, 1],
0,
dtype='int64')
self.share_inputs["step_idx"] = paddle.full([max_num_seqs, 1],
0,
dtype='int64')
@@ -699,6 +705,8 @@ class GPUModelRunner(ModelRunnerBase):
top_k=self.share_inputs["top_k"],
step_idx=self.share_inputs["step_idx"],
pre_token_ids=self.share_inputs["pre_ids"],
prompt_ids=self.share_inputs["prompt_ids"],
prompt_lens=self.share_inputs["prompt_lens"],
frequency_penalties=self.share_inputs["frequency_score"],
presence_penalties=self.share_inputs["presence_score"],
repetition_penalties=self.share_inputs["penalty_score"],
@@ -714,8 +722,6 @@ class GPUModelRunner(ModelRunnerBase):
f"Starting to load model {self.model_config.architectures[0]}")
time_before_load = time.perf_counter()
# 1. Load original model
if self.enable_mm:
self.load_mm_config_and_image_preprocess()
self.model = get_model_from_loader(fd_config=self.fd_config)
# 1.1 Load RL dynamic model
if self.fd_config.load_config.dynamic_load_weight:
@@ -1036,6 +1042,10 @@ class GPUModelRunner(ModelRunnerBase):
self.share_inputs["image_features"] = None
token_chunk_size = inputs["input_ids"].shape[1]
self.share_inputs["input_ids"][idx:idx + 1, :token_chunk_size] = inputs["input_ids"]
self.share_inputs["prompt_ids"][
idx:idx + 1,
self.share_inputs["prompt_lens"][idx:idx + 1]: self.share_inputs["prompt_lens"][idx:idx + 1] + token_chunk_size
] = inputs["input_ids"]
self.share_inputs["seq_lens_decoder"][idx:idx +1] = task.start_idx
task.start_idx += token_chunk_size
else:
@@ -1048,6 +1058,7 @@ class GPUModelRunner(ModelRunnerBase):
1] = token_chunk_size
self.share_inputs['seq_lens_encoder'][idx:idx +
1] = token_chunk_size
self.share_inputs["prompt_lens"][idx:idx + 1] += token_chunk_size
self.share_inputs["step_idx"][idx:idx + 1] = 0
if self.speculative_decoding and self.proposer.is_chunk_prefill_enabled(
@@ -1415,8 +1426,8 @@ class GPUModelRunner(ModelRunnerBase):
def _init_image_preprocess(self) -> None:
processor = DataProcessor(
tokenizer_name=self.tokenizer_path,
image_preprocessor_name=str(self.image_preprocessor_path),
tokenizer_name=self.parallel_config.model_name_or_path,
image_preprocessor_name=str(self.parallel_config.model_name_or_path),
)
processor.eval()
image_preprocess = processor.image_preprocessor
@@ -1434,31 +1445,6 @@ class GPUModelRunner(ModelRunnerBase):
-1)
self.image_preprocess = image_preprocess
def load_mm_config_and_image_preprocess(self) -> None:
tokenizer = ErnieBotTokenizer.from_pretrained(
self.tokenizer_path,
model_max_length=self.parallel_config.max_model_len,
padding_side="right",
use_fast=False,
)
tokenizer.ignored_index = -100
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.unk_token
self.fd_config.model_config.tensor_parallel_degree = self.parallel_config.tensor_parallel_size
self.fd_config.model_config.tensor_parallel_rank = self.parallel_config.tensor_parallel_rank
vision_config = self.fd_config.model_config.vision_config
vision_config.dtype = self.fd_config.model_config.dtype
vision_config.tensor_parallel_degree = self.parallel_config.tensor_parallel_size
vision_config.tensor_parallel_rank = self.parallel_config.tensor_parallel_rank
self.fd_config.model_config.im_patch_id = tokenizer.get_vocab()[
"<|IMAGE_PLACEHOLDER|>"
]
self.fd_config.model_config.think_end_id = tokenizer.get_vocab()["</think>"]
self.fd_config.model_config.sequence_parallel = self.parallel_config.sequence_parallel
self.model_config = self.fd_config.model_config
self._init_image_preprocess()
def _preprocess_mm_task(self, one: dict) -> None:
"""process batch"""

View File

@@ -23,6 +23,7 @@ import pynvml
from fastdeploy.config import FDConfig
from fastdeploy.engine.request import Request
from fastdeploy.platforms import current_platform
from fastdeploy.utils import get_logger
from fastdeploy.worker.gpu_model_runner import GPUModelRunner
from fastdeploy.worker.output import ModelRunnerOutput
@@ -50,11 +51,12 @@ class GpuWorker(WorkerBase):
"""
Initialize device and construct model runner
"""
self.max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
if self.device_config.device_type == "cuda" and paddle.device.is_compiled_with_cuda(
):
# Set environment variable
self.device_ids = self.parallel_config.device_ids.split(",")
self.device = f"gpu:{self.local_rank}"
self.device = f"gpu:{self.local_rank % self.max_chips_per_node}"
paddle.device.set_device(self.device)
paddle.set_default_dtype(self.parallel_config.dtype)
@@ -72,7 +74,7 @@ class GpuWorker(WorkerBase):
self.model_runner: GPUModelRunner = GPUModelRunner(
fd_config=self.fd_config,
device=self.device,
device_id=self.device_ids[self.local_rank],
device_id=self.device_ids[self.local_rank % self.max_chips_per_node],
rank=self.rank,
local_rank=self.local_rank)

View File

@@ -174,6 +174,7 @@ class IluvatarModelRunner(ModelRunnerBase):
self.share_inputs['step_seq_lens_encoder'][idx:idx + 1] = 0
self.share_inputs['step_seq_lens_decoder'][idx:idx +
1] = length
self.share_inputs["prompt_lens"][idx:idx + 1] = length
self.share_inputs['step_idx'][idx:idx + 1] = 1
if self.speculative_decoding:
@@ -208,6 +209,7 @@ class IluvatarModelRunner(ModelRunnerBase):
idx:idx + 1] = request.get("seq_lens_decoder", 0)
self.share_inputs['step_seq_lens_decoder'][
idx:idx + 1] = request.get("seq_lens_decoder", 0)
self.share_inputs["prompt_lens"][idx:idx + 1] = token_chunk_size
else:
self.share_inputs['seq_lens_decoder'][
idx:idx + 1] = request.get("seq_lens_decoder", 0)
@@ -218,6 +220,7 @@ class IluvatarModelRunner(ModelRunnerBase):
self.share_inputs['step_seq_lens_encoder'][idx:idx +
1] = length
self.share_inputs['seq_lens_encoder'][idx:idx + 1] = length
self.share_inputs["prompt_lens"][idx:idx + 1] = length
if len(request.eos_token_ids
) < self.parallel_config.eos_tokens_lens:
@@ -290,6 +293,8 @@ class IluvatarModelRunner(ModelRunnerBase):
self.share_inputs["input_ids"][idx:idx +
1, :input_length] = np.array(
[5] * input_length)
self.share_inputs["prompt_ids"][idx:idx + 1, :input_length] = np.array(
[5] * input_length)
self.share_inputs["eos_token_id"][:] = np.array(
[2], dtype="int64").reshape(-1, 1)
self.share_inputs["seq_lens_this_time"][idx:idx + 1] = input_length
@@ -297,6 +302,7 @@ class IluvatarModelRunner(ModelRunnerBase):
1] = input_length
self.share_inputs["seq_lens_encoder"][idx:idx + 1] = input_length
self.share_inputs["seq_lens_decoder"][idx:idx + 1] = 0
self.share_inputs["prompt_lens"][idx:idx + 1] = 0
self.share_inputs["step_idx"][idx:idx + 1] = 0
self.share_inputs["max_dec_len"][idx:idx + 1] = max_dec_len
self.share_inputs["stop_flags"][idx:idx + 1] = False
@@ -325,6 +331,10 @@ class IluvatarModelRunner(ModelRunnerBase):
[max_num_seqs, self.parallel_config.max_model_len],
self.parallel_config.pad_token_id,
dtype='int64')
self.share_inputs["prompt_ids"] = paddle.full(
[max_num_seqs, self.parallel_config.max_model_len],
self.parallel_config.pad_token_id,
dtype='int64')
self.share_inputs["eos_token_id"] = paddle.full(
[self.parallel_config.eos_tokens_lens, 1], 0, dtype='int64')
self.share_inputs["top_p"] = paddle.full([max_num_seqs, 1],
@@ -369,6 +379,9 @@ class IluvatarModelRunner(ModelRunnerBase):
[max_num_seqs, 1], 0, dtype='int32')
self.share_inputs["step_seq_lens_decoder"] = paddle.full(
[max_num_seqs, 1], 0, dtype='int32')
self.share_inputs["prompt_lens"] = paddle.full([max_num_seqs, 1],
0,
dtype='int64')
self.share_inputs["step_idx"] = paddle.full([max_num_seqs, 1],
0,
dtype='int64')
@@ -563,6 +576,8 @@ class IluvatarModelRunner(ModelRunnerBase):
top_k=self.share_inputs["top_k"],
step_idx=self.share_inputs["step_idx"],
pre_token_ids=self.share_inputs["pre_ids"],
prompt_ids=self.share_inputs["prompt_ids"],
prompt_lens=self.share_inputs["prompt_lens"],
frequency_penalties=self.share_inputs["frequency_score"],
presence_penalties=self.share_inputs["presence_score"],
repetition_penalties=self.share_inputs["penalty_score"],
@@ -845,6 +860,7 @@ class IluvatarModelRunner(ModelRunnerBase):
token_chunk_size])
self.share_inputs['seq_lens_encoder'][idx:idx +
1] = token_chunk_size
self.share_inputs["prompt_lens"][idx:idx + 1] += token_chunk_size
self.share_inputs["step_idx"][idx:idx + 1] = 0
self.share_inputs["seq_lens_decoder"][
idx:idx + 1] = start_idx + task.get("seq_lens_decoder", 0)

View File

@@ -23,9 +23,11 @@ import paddle
import paddle.distributed as dist
import paddle.distributed.fleet as fleet
from fastdeploy.config import (DecodingConfig, DeviceConfig, FDConfig,
from fastdeploy.config import (DecodingConfig, DeviceConfig,
ErnieArchitectures, FDConfig,
GraphOptimizationConfig, LoadConfig,
ModelConfig, ParallelConfig, SpeculativeConfig)
from fastdeploy.input.ernie_tokenizer import ErnieBotTokenizer
from fastdeploy.inter_communicator import EngineWorkerQueue as TaskQueue
from fastdeploy.inter_communicator import IPCSignal
from fastdeploy.model_executor.layers.quantization import \
@@ -83,6 +85,30 @@ def init_distributed_environment(seed: int = 20) -> List[int]:
return ranks, local_rank
def update_fd_config_for_mm(fd_config: FDConfig) -> None:
if fd_config.model_config.enable_mm:
tokenizer = ErnieBotTokenizer.from_pretrained(
fd_config.parallel_config.model_name_or_path,
model_max_length=fd_config.parallel_config.max_model_len,
padding_side="right",
use_fast=False,
)
tokenizer.ignored_index = -100
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.unk_token
fd_config.model_config.tensor_parallel_degree = fd_config.parallel_config.tensor_parallel_size
fd_config.model_config.tensor_parallel_rank = fd_config.parallel_config.tensor_parallel_rank
vision_config = fd_config.model_config.vision_config
vision_config.dtype = fd_config.model_config.dtype
# vision_config.tensor_parallel_degree = fd_config.parallel_config.tensor_parallel_size
# vision_config.tensor_parallel_rank = fd_config.parallel_config.tensor_parallel_rank
fd_config.model_config.im_patch_id = tokenizer.get_vocab()[
"<|IMAGE_PLACEHOLDER|>"
]
fd_config.model_config.think_end_id = tokenizer.get_vocab()["</think>"]
fd_config.model_config.sequence_parallel = fd_config.parallel_config.sequence_parallel
class PaddleDisWorkerProc():
"""
Paddle distributed wrapper for fastdeploy.worker.Worker,
@@ -136,9 +162,9 @@ class PaddleDisWorkerProc():
model_weights_status:
"""
# init worker_ready_signal
max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
self.max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
array_size = min(
max_chips_per_node, self.parallel_config.tensor_parallel_size *
self.max_chips_per_node, self.parallel_config.tensor_parallel_size *
self.parallel_config.expert_parallel_size)
workers_ready = np.zeros(shape=[array_size], dtype=np.int32)
self.worker_ready_signal = IPCSignal(
@@ -148,10 +174,10 @@ class PaddleDisWorkerProc():
suffix=self.parallel_config.engine_pid,
create=False)
self.worker_ready_signal.value[self.local_rank %
max_chips_per_node] = 1
self.max_chips_per_node] = 1
# init worker_healthy_live_signal
workers_alive = np.zeros(shape=[self.ranks], dtype=np.int32)
workers_alive = np.zeros(shape=[array_size], dtype=np.int32)
self.worker_healthy_live_signal = IPCSignal(
name="worker_healthy_live_signal",
array=workers_alive,
@@ -205,7 +231,7 @@ class PaddleDisWorkerProc():
Temporary loop function for EP until DP is supported
"""
while True:
self.worker_healthy_live_signal.value[self.local_rank] = int(
self.worker_healthy_live_signal.value[self.local_rank % self.max_chips_per_node] = int(
time.time())
if self.fd_config.parallel_config.tensor_parallel_rank == 0 and self.task_queue.num_tasks(
@@ -251,12 +277,12 @@ class PaddleDisWorkerProc():
# The first worker detects whether there are tasks in the task queue
if self.local_rank % mp_num_per_node == 0:
if self.task_queue.num_tasks() > 0:
if self.nnode > 1:
self.task_queue.read_finish_flag.set(1)
else:
self.exist_task_signal.value[
self.fd_config.parallel_config.
expert_parallel_rank] = 1
# VL only supports prefilling 1 batch at a time
if not self.fd_config.model_config.enable_mm or not self.worker.prefill_finished():
if self.nnode > 1:
self.task_queue.read_finish_flag.set(1)
else:
self.exist_task_signal.value[self.fd_config.parallel_config.expert_parallel_rank] = 1
if self.parallel_config.tensor_parallel_size > 1:
# Synchronize the signal for other workers
@@ -306,10 +332,7 @@ class PaddleDisWorkerProc():
# Execute model to generate token. The generated token will be written to the buffer.
# These generated tokens can be obtained through get_output op.
self.worker.execute_model(req_dicts)
self.exist_prefill_task_signal.value[
0] = self.worker.prefill_finished()
self.exist_prefill_task_signal.value[0] = self.worker.prefill_finished()
def determine_num_available_blocks(self) -> None:
"""Profiles the peak memory usage of the model to determine how many
@@ -504,9 +527,9 @@ def parse_args():
type=int,
default=1,
help="expert parallel size")
parser.add_argument("--enable_expert_parallell",
parser.add_argument("--enable_expert_parallel",
action='store_true',
help="enable expert parallell")
help="enable expert parallel")
parser.add_argument("--ori_vocab_size", type=int, default=None)
parser.add_argument("--quantization",
@@ -517,7 +540,7 @@ def parse_args():
"default is None. The priority of this configuration "\
"is lower than that of the config file. " \
"More complex quantization methods need to be configured via the config file.")
parser.add_argument("--graph_optimiaztion_config",
parser.add_argument("--graph_optimization_config",
type=json.loads,
default=None,
help=" Configation of Graph optimization backend. "
@@ -541,9 +564,8 @@ def parse_args():
"'ipc': real-time IPC streaming with automatic resharding, "
"'ipc_snapshot': load from disk snapshot of IPC weights.")
parser.add_argument("--enable_mm",
type=str,
default="false",
help="Whether to use vl")
action='store_true',
help="Whether to enable vl model")
parser.add_argument("--enable_logprob",
action='store_true',
help="Enable output of token-level log probabilities.")
@@ -572,11 +594,13 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
parallel_config.expert_parallel_rank = int(local_rank / ranks)
load_config = LoadConfig(vars(args))
graph_opt_config = GraphOptimizationConfig(
use_cudagraph=args.graph_optimiaztion_config["use_cudagraph"],
graph_opt_level=args.graph_optimiaztion_config["graph_opt_level"],
cudagraph_capture_sizes=args.graph_optimiaztion_config["cudagraph_capture_sizes"]
)
graph_opt_config = GraphOptimizationConfig()
if args.graph_optimization_config is not None:
graph_opt_config = GraphOptimizationConfig(
use_cudagraph=args.graph_optimization_config["use_cudagraph"],
graph_opt_level=args.graph_optimization_config["graph_opt_level"],
cudagraph_capture_sizes=args.graph_optimization_config["cudagraph_capture_sizes"]
)
# Note(tangbinhan): used for load_checkpoint
model_config.pretrained_config.tensor_parallel_rank = parallel_config.tensor_parallel_rank
@@ -615,9 +639,7 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
quant_config_name = args.quantization
quantization_config["quantization"] = quant_config_name
# Special handling for Ernie models
is_ernie = "Ernie4_5_ForCausalLM" in model_config.architectures or \
"Ernie4_5_MoeForCausalLM" in model_config.architectures or \
"Ernie4_5_VLMoeForConditionalGeneration" in model_config.architectures
is_ernie = ErnieArchitectures.contains_ernie_arch(model_config.architectures)
if quant_config_name == "wint4" and is_ernie:
quantization_config["dense_quant_type"] = "wint8"
quantization_config["moe_quant_type"] = "wint4"
@@ -650,7 +672,7 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
)
# Set VL tag
model_config.enable_mm = getattr(args, 'enable_mm', 'false').lower() == 'true'
model_config.enable_mm = args.enable_mm
logger.info(f"- Dynamic load weight: {load_config.dynamic_load_weight}")
logger.info(f"- Load strategy: {load_config.load_strategy}")
@@ -662,6 +684,7 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
decoding_config=decoding_config,
quant_config=quant_config,
graph_opt_config=graph_opt_config)
update_fd_config_for_mm(fd_config)
return fd_config
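For clarity, the renamed --graph_optimization_config flag is parsed with json.loads, so its value is a JSON object carrying the three keys read in initialize_fd_config above; a minimal sketch of constructing such a value (the numbers are illustrative only):

import json

# Keys mirror those read in initialize_fd_config above; the values are only examples.
graph_opt = {
    "use_cudagraph": True,
    "graph_opt_level": 1,
    "cudagraph_capture_sizes": [1, 2, 4, 8],
}
cli_value = json.dumps(graph_opt)
# Passed to the worker as:  --graph_optimization_config '<cli_value>'
# When the flag is omitted, a default GraphOptimizationConfig() is constructed instead.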

View File

@@ -29,9 +29,11 @@ triton==3.3
use-triton-in-paddle
crcmod
fastsafetensors==0.1.14
msgpack
opentelemetry-api>=1.24.0
opentelemetry-sdk>=1.24.0
opentelemetry-instrumentation-redis
opentelemetry-instrumentation-mysql
opentelemetry-distro 
opentelemetry-exporter-otlp
opentelemetry-instrumentation-fastapi

View File

@@ -27,3 +27,4 @@ moviepy
use-triton-in-paddle
crcmod
fastsafetensors==0.1.14
msgpack

View File

@@ -27,3 +27,4 @@ moviepy
use-triton-in-paddle
crcmod
fastsafetensors==0.1.14
msgpack

View File

@@ -193,7 +193,7 @@ def get_name():
cmdclass_dict = {'bdist_wheel': CustomBdistWheel}
cmdclass_dict['build_ext'] = CMakeBuild
FASTDEPLOY_VERSION = os.environ.get("FASTDEPLOY_VERSION", "2.0.0-dev")
FASTDEPLOY_VERSION = os.environ.get("FASTDEPLOY_VERSION", "2.0.3")
cmdclass_dict['build_optl'] = PostInstallCommand
setup(

View File

@@ -313,4 +313,66 @@ def test_streaming(openai_client, capsys):
output = []
for chunk in response:
output.append(chunk.choices[0].text)
assert len(output) > 0
assert len(output) > 0
def test_non_streaming_with_stop_str(openai_client):
"""
Test non-streaming chat functionality with the local service
"""
response = openai_client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": "Hello, how are you?"}],
temperature=1,
max_tokens=5,
metadata={"include_stop_str_in_output": True},
stream=False,
)
# Assertions to check the response structure
assert hasattr(response, 'choices')
assert len(response.choices) > 0
assert response.choices[0].message.content.endswith("</s>")
response = openai_client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": "Hello, how are you?"}],
temperature=1,
max_tokens=5,
metadata={"include_stop_str_in_output": False},
stream=False,
)
# Assertions to check the response structure
assert hasattr(response, 'choices')
assert len(response.choices) > 0
assert not response.choices[0].message.content.endswith("</s>")
def test_streaming_with_stop_str(openai_client):
"""
Test streaming chat functionality with the local service
"""
response = openai_client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": "Hello, how are you?"}],
temperature=1,
max_tokens=5,
metadata={"include_stop_str_in_output": True},
stream=True,
)
# Assertions to check the response structure
last_token = ""
for chunk in response:
last_token = chunk.choices[0].delta.content
assert last_token == "</s>"
response = openai_client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": "Hello, how are you?"}],
temperature=1,
max_tokens=5,
metadata={"include_stop_str_in_output": False},
stream=True,
)
# Assertions to check the response structure
last_token = ""
for chunk in response:
last_token = chunk.choices[0].delta.content
assert last_token != "</s>"

View File

@@ -57,6 +57,12 @@ def _create_default_sampling_metadata(
top_p=paddle.full(shape=[batch_size, 1],
fill_value=0.7,
dtype="float32"),
prompt_ids=paddle.full(shape=[batch_size, max_seq_len],
fill_value=0,
dtype="int64"),
prompt_lens=paddle.full(shape=[batch_size, 1],
fill_value=5,
dtype="int64"),
step_idx=paddle.full(shape=[batch_size, 1],
fill_value=0,
dtype="int64"),

View File

@@ -0,0 +1,142 @@
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" UT for air_topp_sampling kernel """
import copy
import unittest
import numpy as np
import paddle
class Test(unittest.TestCase):
def setUp(self):
"""
Initialize.
"""
self.num_seqs = 4
self.max_model_len = 32768
self.vocab_size = 103424
# prompt token
prompt_ids = paddle.full(shape=[self.num_seqs, self.max_model_len], fill_value=0, dtype='int64')
prompt_lens = paddle.randint(low=0, high=100, shape=[self.num_seqs, 1], dtype='int64')
fake_tokens = paddle.randint(low=3, high=self.vocab_size, shape=[self.num_seqs, self.max_model_len], dtype='int64')
for i in range(self.num_seqs):
prompt_ids[i, :prompt_lens[i]] = fake_tokens[i, :prompt_lens[i]]
# generated token
pre_ids = paddle.full(shape=[self.num_seqs, self.max_model_len], fill_value=-1, dtype='int64')
step_idx = paddle.randint(low=0, high=100, shape=[self.num_seqs, 1], dtype='int64')
fake_tokens = paddle.randint(low=3, high=self.vocab_size, shape=[self.num_seqs, self.max_model_len], dtype='int64')
for i in range(self.num_seqs):
pre_ids[i, :step_idx[i]] = fake_tokens[i, :step_idx[i]]
logits = paddle.randn([self.num_seqs, self.vocab_size]).cast("float32")
penalty_score = paddle.ones([self.num_seqs, 1]) * 1.05
frequency_score = paddle.ones([self.num_seqs, 1]) * 0.5
presence_score = paddle.ones([self.num_seqs, 1]) * 0.3
temperature = paddle.ones([self.num_seqs, 1]) * 0.8
bad_tokens = paddle.to_tensor([[-1]]).cast("int64")
min_dec_len = paddle.ones([self.num_seqs, 1]).cast("int64")
eos_token_id = paddle.to_tensor([[2]]).cast("int64")
self.input_data = {
"prompt_ids": prompt_ids,
"prompt_lens": prompt_lens,
"pre_ids": pre_ids,
"step_idx": step_idx,
"logits": logits,
"bad_tokens": bad_tokens,
"min_dec_len": min_dec_len,
"eos_token_id": eos_token_id,
"penalty_score": penalty_score,
"frequency_score": frequency_score,
"presence_score": presence_score,
"temperature": temperature
}
def get_token_penalty_multi_scores_baseline(self):
input_data = copy.deepcopy(self.input_data)
logits = input_data["logits"]
penalty_score = input_data["penalty_score"]
frequency_score = input_data["frequency_score"]
presence_score = input_data["presence_score"]
temperature = input_data["temperature"]
# min token penalties
mask = input_data["step_idx"] < input_data["min_dec_len"]
for bi, flag in enumerate(mask):
if flag:
logits[bi, input_data["eos_token_id"]] = -1e10
# bad words exclusion
for token in input_data["bad_tokens"]:
if token < 0 or token > self.vocab_size:
continue
logits[:, token] = -1e10
# all penalties
prompt_ids = input_data["prompt_ids"]
for i in range(self.num_seqs):
prompt_ids[i, input_data["prompt_lens"][i]:] = -1
prompt_repeat_times = paddle.zeros([self.num_seqs, self.vocab_size + 1]).cast("int64")
prompt_repeat_times = paddle.put_along_axis(prompt_repeat_times, prompt_ids, paddle.ones_like(input_data["pre_ids"]), axis=1, reduce="add")
prompt_repeat_times = prompt_repeat_times[:, :self.vocab_size]
prompt_mask = prompt_repeat_times > 0
pre_ids = input_data["pre_ids"]
pre_ids[pre_ids == -1] = self.vocab_size
out_repeat_times = paddle.zeros([self.num_seqs, self.vocab_size + 1]).cast("int64")
out_repeat_times = paddle.put_along_axis(out_repeat_times, pre_ids, paddle.ones_like(input_data["pre_ids"]), axis=1, reduce="add")
out_repeat_times = out_repeat_times[:, :self.vocab_size]
output_mask = out_repeat_times > 0
penalty_score = penalty_score.tile(self.vocab_size)
logits[logits > 0] /= paddle.where(output_mask | prompt_mask, penalty_score, 1.0)[logits > 0]
logits[logits <= 0] *= paddle.where(output_mask | prompt_mask, penalty_score, 1.0)[logits <= 0]
logits -= frequency_score * out_repeat_times.cast("float32")
logits -= presence_score * output_mask.cast("float32")
# temperature
logits /= temperature
return logits
def test_penalty_op(self):
"""
"""
baseline_out = self.get_token_penalty_multi_scores_baseline()
from fastdeploy.model_executor.ops.gpu import \
get_token_penalty_multi_scores
logits = get_token_penalty_multi_scores(
self.input_data["pre_ids"],
self.input_data["prompt_ids"],
self.input_data["prompt_lens"],
self.input_data["logits"],
self.input_data["penalty_score"],
self.input_data["frequency_score"],
self.input_data["presence_score"],
self.input_data["temperature"],
self.input_data["bad_tokens"],
self.input_data["step_idx"],
self.input_data["min_dec_len"],
self.input_data["eos_token_id"])
np.testing.assert_allclose(baseline_out.numpy(), logits.numpy(), rtol=1e-04, atol=1e-04)
if __name__ == "__main__":
unittest.main()
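To make the baseline above easier to follow, a tiny worked example of the penalty arithmetic it implements (toy numbers, not taken from the test): a positive logit of a token already seen in the prompt or output is divided by penalty_score (a non-positive one is multiplied), then frequency_score times the output count and presence_score times the output indicator are subtracted, and the result is divided by temperature.

# Toy numbers only; mirrors get_token_penalty_multi_scores_baseline for a single token.
logit = 2.0            # raw logit of a token generated twice so far
penalty_score = 1.05
frequency_score = 0.5
presence_score = 0.3
temperature = 0.8
out_count = 2
seen = out_count > 0   # token appeared in the output (or the prompt)

if seen:  # repetition penalty: divide positive logits, multiply non-positive ones
    logit = logit / penalty_score if logit > 0 else logit * penalty_score  # 2.0/1.05 = 1.9048
logit -= frequency_score * out_count              # - 0.5*2 -> 0.9048
logit -= presence_score * (1.0 if seen else 0.0)  # - 0.3   -> 0.6048
logit /= temperature                              # / 0.8   -> 0.7560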