Mirror of https://github.com/PaddlePaddle/FastDeploy.git (synced 2025-10-03 07:46:50 +08:00)

Compare commits: 23 commits, release/2. ... release/2.
Commits:

- 5224f6c434
- bfef09dd73
- 1d46420c49
- fb0f284e67
- 5d1788c7b5
- abd238fc12
- e5804b1d98
- 8c43bc8176
- b0f1e0eef4
- 69be77c8c0
- 535a15ab8f
- 580460046f
- 4dbc483713
- 4ead15822c
- f941124402
- b89f083004
- 4d05ed596c
- bc1866af58
- fe237fe92b
- 3a480abcbb
- 335609efb6
- 3464f75f98
- 09d0073fdc
.github/workflows/ci.yml (vendored, 8 changed lines)
@@ -2,7 +2,9 @@ name: CI

on:
pull_request:
branches: [ develop ]
branches:
- develop
- 'release/*'
workflow_dispatch:

concurrency:

@@ -27,9 +29,11 @@ jobs:
REPO="https://github.com/${{ github.repository }}.git"
FULL_REPO="${{ github.repository }}"
REPO_NAME="${FULL_REPO##*/}"
BASE_BRANCH="${{ github.base_ref }}"
# Clean the repository directory before starting
docker run --rm --net=host -v $(pwd):/workspace -w /workspace \
-e "REPO_NAME=${REPO_NAME}" \
-e "BASE_BRANCH=${BASE_BRANCH}" \
${docker_image} /bin/bash -c '
if [ -d ${REPO_NAME} ]; then
echo "Directory ${REPO_NAME} exists, removing it..."

@@ -38,7 +42,7 @@ jobs:
'
git config --global user.name "FastDeployCI"
git config --global user.email "fastdeploy_ci@example.com"
git clone ${REPO} ${REPO_NAME}
git clone ${REPO} ${REPO_NAME} -b ${BASE_BRANCH}
cd FastDeploy
if [ "${{ github.event_name }}" = "pull_request" ]; then
git fetch origin pull/${{ github.event.pull_request.number }}/head:pr/${{ github.event.pull_request.number }}
.github/workflows/ci_xpu.yml (vendored, 8 changed lines)
@@ -2,7 +2,9 @@ name: CI_XPU

on:
pull_request:
branches: [ develop ]
branches:
- develop
- 'release/*'
workflow_dispatch:

concurrency:

@@ -27,9 +29,11 @@ jobs:
REPO="https://github.com/${{ github.repository }}.git"
FULL_REPO="${{ github.repository }}"
REPO_NAME="${FULL_REPO##*/}"
BASE_BRANCH="${{ github.base_ref }}"
# Clean the repository directory before starting
docker run --rm --net=host -v $(pwd):/workspace -w /workspace \
-e "REPO_NAME=${REPO_NAME}" \
-e "BASE_BRANCH=${BASE_BRANCH}" \
${docker_image} /bin/bash -c '
if [ -d ${REPO_NAME} ]; then
echo "Directory ${REPO_NAME} exists, removing it..."

@@ -38,7 +42,7 @@ jobs:
'
git config --global user.name "FastDeployCI"
git config --global user.email "fastdeploy_ci@example.com"
git clone ${REPO} ${REPO_NAME}
git clone ${REPO} ${REPO_NAME} -b ${BASE_BRANCH}
cd FastDeploy
if [ "${{ github.event_name }}" = "pull_request" ]; then
git fetch origin pull/${{ github.event.pull_request.number }}/head:pr/${{ github.event.pull_request.number }}
@@ -289,7 +289,7 @@ __global__ void TopKTopPSamplingFromProbKernel(DType* probs, IdType* output,
curand_init(philox_seed, bx, philox_offset, &state);
const uint32_t row_idx = bx;
const uint32_t k = top_k_arr[row_idx] == 0 ? d : top_k_arr[row_idx];
const float p = top_p_arr[row_idx] == 0 ? 1e-6 : top_p_arr[row_idx];
const float p = top_p_arr[row_idx];

extern __shared__ __align__(
alignof(SamplingTempStorage<BLOCK_THREADS, SCAN_ALGORITHM, REDUCE_ALGORITHM>))
@@ -20,16 +20,16 @@ __global__ inline void min_length_logits_process(T *logits,
const int64_t *min_len,
const int64_t *eos_token_id,
const int64_t bs,
const int64_t length,
const int64_t end_length) {
const int64_t vocab_size,
const int64_t eos_len) {
int bi = threadIdx.x;
if (bi >= bs) return;
if (cur_len[bi] < 0) {
return;
}
if (cur_len[bi] < min_len[bi]) {
for (int i = 0; i < end_length; i++) {
logits[bi * length + eos_token_id[i]] = -1e10;
for (int i = 0; i < eos_len; i++) {
logits[bi * vocab_size + eos_token_id[i]] = -1e10;
}
}
}

@@ -41,61 +41,83 @@ __global__ inline void min_length_logits_process<half>(
const int64_t *min_len,
const int64_t *eos_token_id,
const int64_t bs,
const int64_t length,
const int64_t end_length) {
const int64_t vocab_size,
const int64_t eos_len) {
int bi = threadIdx.x;
if (bi >= bs) return;
if (cur_len[bi] < 0) {
return;
}
if (cur_len[bi] < min_len[bi]) {
for (int i = 0; i < end_length; i++) {
logits[bi * length + eos_token_id[i]] = -1e4;
for (int i = 0; i < eos_len; i++) {
logits[bi * vocab_size + eos_token_id[i]] = -1e4;
}
}
}

__global__ void update_repeat_times(const int64_t *pre_ids,
const int64_t *prompt_ids,
const int64_t *prompt_len,
const int64_t *cur_len,
int *repeat_times,
int *is_repeated,
const int64_t bs,
const int64_t length,
const int64_t length_id) {
int bi = blockIdx.x;
const int64_t vocab_size,
const int64_t max_dec_len,
const int64_t max_model_len) {
int64_t bi = blockIdx.x;
if (cur_len[bi] < 0) {
return;
}
int tid = threadIdx.x;
const int64_t *pre_ids_now = pre_ids + bi * length_id;
int *repeat_times_now = repeat_times + bi * length;
for (int i = tid; i < length_id; i += blockDim.x) {
int64_t id = pre_ids_now[i];
if (id < 0) break;
atomicAdd(&repeat_times_now[id], 1);
const int64_t prompt_len_now = prompt_len[bi];
int64_t tid = threadIdx.x;
const int64_t *prompt_now = prompt_ids + bi * max_model_len;
const int64_t *pre_ids_now = pre_ids + bi * max_dec_len;
int *repeat_times_now = repeat_times + bi * vocab_size;
int *is_repeated_now = is_repeated + bi * vocab_size;
const int64_t loop_len = prompt_len_now > max_dec_len ? prompt_len_now : max_dec_len;
for (int64_t i = tid; i < loop_len; i += blockDim.x) {
if (i < max_dec_len) {
int64_t id = pre_ids_now[i];
if (id >= 0) {
atomicAdd(&repeat_times_now[id], 1);
atomicAdd(&is_repeated_now[id], 1);
}
}
if (i < prompt_len_now) {
int64_t id = prompt_now[i];
if (id >= 0) {
atomicAdd(&is_repeated_now[id], 1);
}
}
}
}

template <typename T>
__global__ void update_value_by_repeat_times(const int *repeat_times,
const int *is_repeated,
const T *penalty_scores,
const T *frequency_score,
const T *presence_score,
const float *temperatures,
T *logits,
const int64_t bs,
const int64_t length) {
const int64_t vocab_size) {
int bi = blockIdx.x;
int tid = threadIdx.x;
T *logits_now = logits + bi * length;
const int *repeat_times_now = repeat_times + bi * length;
T *logits_now = logits + bi * vocab_size;
const int *repeat_times_now = repeat_times + bi * vocab_size;
const int *is_repeated_now = is_repeated + bi * vocab_size;
float alpha = static_cast<float>(penalty_scores[bi]);
float beta = static_cast<float>(frequency_score[bi]);
float gamma = static_cast<float>(presence_score[bi]);
for (int i = tid; i < length; i += blockDim.x) {
for (int i = tid; i < vocab_size; i += blockDim.x) {
int times = repeat_times_now[i];
float logit_now = static_cast<float>(logits_now[i]);
if (times != 0) {
if (is_repeated_now[i] != 0) {
logit_now = logit_now < 0 ? logit_now * alpha : logit_now / alpha;
}
if (times != 0) {
logit_now = logit_now - times * beta - gamma;
}
logits_now[i] = static_cast<T>(logit_now / temperatures[bi]);
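For readers skimming this diff, here is a minimal NumPy sketch of the per-row penalty math that `update_value_by_repeat_times` above applies on the GPU (variable names are illustrative and not taken from the repository):

```python
import numpy as np

def apply_penalties(logits, repeat_times, is_repeated, alpha, beta, gamma, temperature):
    """Sketch of the per-row penalty math in update_value_by_repeat_times."""
    logits = logits.astype(np.float32).copy()
    # Repetition penalty: scale any token seen in the prompt or the generated prefix.
    seen = is_repeated != 0
    logits[seen] = np.where(logits[seen] < 0, logits[seen] * alpha, logits[seen] / alpha)
    # Frequency and presence penalties: only for tokens already generated.
    generated = repeat_times != 0
    logits[generated] -= repeat_times[generated] * beta + gamma
    # Temperature is folded into the same pass.
    return logits / temperature
```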
@@ -106,20 +128,22 @@ template <typename T>
__global__ void ban_bad_words(T *logits,
const int64_t *bad_words_list,
const int64_t bs,
const int64_t length,
const int64_t bad_words_length) {
const int64_t vocab_size,
const int64_t bad_words_len) {
const int bi = blockIdx.x;
int tid = threadIdx.x;
T *logits_now = logits + bi * length;
for (int i = tid; i < bad_words_length; i += blockDim.x) {
T *logits_now = logits + bi * vocab_size;
for (int i = tid; i < bad_words_len; i += blockDim.x) {
const int64_t bad_words_token_id = bad_words_list[i];
if (bad_words_token_id >= length || bad_words_token_id < 0) continue;
if (bad_words_token_id >= vocab_size || bad_words_token_id < 0) continue;
logits_now[bad_words_token_id] = -1e10;
}
}

template <paddle::DataType D>
void token_penalty_multi_scores_kernel(const paddle::Tensor &pre_ids,
const paddle::Tensor &prompt_ids,
const paddle::Tensor &prompt_len,
const paddle::Tensor &logits,
const paddle::Tensor &penalty_scores,
const paddle::Tensor &frequency_score,

@@ -141,12 +165,15 @@ void token_penalty_multi_scores_kernel(const paddle::Tensor &pre_ids,
std::vector<int64_t> shape = logits.shape();
auto repeat_times =
paddle::full(shape, 0, paddle::DataType::INT32, pre_ids.place());
auto is_repeated =
paddle::full(shape, 0, paddle::DataType::INT32, pre_ids.place());
int64_t bs = shape[0];
int64_t length = shape[1];
int64_t length_id = pre_ids.shape()[1];
int64_t length_bad_words = bad_tokens.shape()[0];

int64_t end_length = eos_token_id.shape()[0];
int64_t vocab_size = shape[1];
int64_t max_dec_len = pre_ids.shape()[1];
int64_t bad_words_len = bad_tokens.shape()[0];
int64_t eos_len = eos_token_id.shape()[0];
int64_t max_model_len = prompt_ids.shape()[1];

int block_size = (bs + WARP_SIZE - 1) / WARP_SIZE * WARP_SIZE;
min_length_logits_process<<<1, block_size, 0, cu_stream>>>(

@@ -156,10 +183,10 @@ void token_penalty_multi_scores_kernel(const paddle::Tensor &pre_ids,
min_len.data<int64_t>(),
eos_token_id.data<int64_t>(),
bs,
length,
end_length);
vocab_size,
eos_len);

block_size = (length_id + WARP_SIZE - 1) / WARP_SIZE * WARP_SIZE;
block_size = (max_dec_len + WARP_SIZE - 1) / WARP_SIZE * WARP_SIZE;
#ifdef PADDLE_WITH_COREX
block_size = std::min(block_size, 512);
#else

@@ -167,13 +194,17 @@ void token_penalty_multi_scores_kernel(const paddle::Tensor &pre_ids,
#endif
update_repeat_times<<<bs, block_size, 0, cu_stream>>>(
pre_ids.data<int64_t>(),
prompt_ids.data<int64_t>(),
prompt_len.data<int64_t>(),
cur_len.data<int64_t>(),
repeat_times.data<int>(),
is_repeated.data<int>(),
bs,
length,
length_id);
vocab_size,
max_dec_len,
max_model_len);

block_size = (length + WARP_SIZE - 1) / WARP_SIZE * WARP_SIZE;
block_size = (vocab_size + WARP_SIZE - 1) / WARP_SIZE * WARP_SIZE;
#ifdef PADDLE_WITH_COREX
block_size = std::min(block_size, 512);
#else

@@ -181,6 +212,7 @@ void token_penalty_multi_scores_kernel(const paddle::Tensor &pre_ids,
#endif
update_value_by_repeat_times<DataType_><<<bs, block_size, 0, cu_stream>>>(
repeat_times.data<int>(),
is_repeated.data<int>(),
reinterpret_cast<DataType_ *>(
const_cast<data_t *>(penalty_scores.data<data_t>())),
reinterpret_cast<DataType_ *>(

@@ -191,9 +223,9 @@ void token_penalty_multi_scores_kernel(const paddle::Tensor &pre_ids,
reinterpret_cast<DataType_ *>(
const_cast<data_t *>(logits.data<data_t>())),
bs,
length);
vocab_size);

block_size = (length_bad_words + WARP_SIZE - 1) / WARP_SIZE * WARP_SIZE;
block_size = (bad_words_len + WARP_SIZE - 1) / WARP_SIZE * WARP_SIZE;
#ifdef PADDLE_WITH_COREX
block_size = std::min(block_size, 512);
#else

@@ -204,11 +236,13 @@ void token_penalty_multi_scores_kernel(const paddle::Tensor &pre_ids,
const_cast<data_t *>(logits.data<data_t>())),
bad_tokens.data<int64_t>(),
bs,
length,
length_bad_words);
vocab_size,
bad_words_len);
}

void TokenPenaltyMultiScores(const paddle::Tensor &pre_ids,
const paddle::Tensor &prompt_ids,
const paddle::Tensor &prompt_len,
const paddle::Tensor &logits,
const paddle::Tensor &penalty_scores,
const paddle::Tensor &frequency_scores,

@@ -222,6 +256,8 @@ void TokenPenaltyMultiScores(const paddle::Tensor &pre_ids,
case paddle::DataType::BFLOAT16: {
return token_penalty_multi_scores_kernel<
paddle::DataType::BFLOAT16>(pre_ids,
prompt_ids,
prompt_len,
logits,
penalty_scores,
frequency_scores,

@@ -233,30 +269,34 @@ void TokenPenaltyMultiScores(const paddle::Tensor &pre_ids,
eos_token_id);
}
case paddle::DataType::FLOAT16: {
return token_penalty_multi_scores_kernel<paddle::DataType::FLOAT16>(
pre_ids,
logits,
penalty_scores,
frequency_scores,
presence_scores,
temperatures,
bad_tokens,
cur_len,
min_len,
eos_token_id);
return token_penalty_multi_scores_kernel<
paddle::DataType::FLOAT16>(pre_ids,
prompt_ids,
prompt_len,
logits,
penalty_scores,
frequency_scores,
presence_scores,
temperatures,
bad_tokens,
cur_len,
min_len,
eos_token_id);
}
case paddle::DataType::FLOAT32: {
return token_penalty_multi_scores_kernel<paddle::DataType::FLOAT32>(
pre_ids,
logits,
penalty_scores,
frequency_scores,
presence_scores,
temperatures,
bad_tokens,
cur_len,
min_len,
eos_token_id);
return token_penalty_multi_scores_kernel<
paddle::DataType::FLOAT32>(pre_ids,
prompt_ids,
prompt_len,
logits,
penalty_scores,
frequency_scores,
presence_scores,
temperatures,
bad_tokens,
cur_len,
min_len,
eos_token_id);
}
default: {
PD_THROW(

@@ -269,6 +309,8 @@ void TokenPenaltyMultiScores(const paddle::Tensor &pre_ids,

PD_BUILD_STATIC_OP(get_token_penalty_multi_scores)
.Inputs({"pre_ids",
"prompt_ids",
"prompt_len",
"logits",
"penalty_scores",
"frequency_scores",
@@ -25,9 +25,9 @@ Verified platform:
```bash
mkdir Work
cd Work
docker pull ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.0.0
docker pull ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.0.3
docker run --name fastdeploy-xpu --net=host -itd --privileged -v $PWD:/Work -w /Work \
ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.0.0 \
ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.0.3 \
/bin/bash
docker exec -it fastdeploy-xpu /bin/bash
```

@@ -49,7 +49,7 @@ python -m pip install --pre paddlepaddle-xpu -i https://www.paddlepaddle.org.cn/
### Install FastDeploy (**Do NOT install via PyPI source**)

```bash
python -m pip install fastdeploy-xpu==2.0.0 -i https://www.paddlepaddle.org.cn/packages/stable/fastdeploy-xpu-p800/ --extra-index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
python -m pip install fastdeploy-xpu==2.0.3 -i https://www.paddlepaddle.org.cn/packages/stable/fastdeploy-xpu-p800/ --extra-index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
```

Alternatively, you can install the latest version of FastDeploy (Not recommended)

@@ -118,5 +118,5 @@ python -c "from fastdeploy.model_executor.ops.xpu import block_attn"

If all the above steps execute successfully, FastDeploy is installed correctly.

## How to deploy services on kunlunxin XPU
Refer to [**Supported Models and Service Deployment**](../../usage/kunlunxin_xpu_deployment.md) for the details about the supported models and the way to deploy services on kunlunxin XPU.
## How to deploy services on Kunlunxin XPU
Refer to [**Supported Models and Service Deployment**](../../usage/kunlunxin_xpu_deployment.md) for the details about the supported models and the way to deploy services on Kunlunxin XPU.
@@ -33,9 +33,8 @@ When using FastDeploy to deploy models (including offline inference and service
| ```long_prefill_token_threshold``` | `int` | When Chunked Prefill is enabled, requests with token count exceeding this value are considered long requests, default: max_model_len*0.04 |
| ```static_decode_blocks``` | `int` | During inference, each request is forced to allocate corresponding number of blocks from Prefill's KVCache for Decode use, default: 2 |
| ```reasoning_parser``` | `str` | Specify the reasoning parser to extract reasoning content from model output |
| ```enable_static_graph_inference``` | `bool` | Whether to use static graph inference mode, default: False |
| ```use_cudagraph``` | `bool` | Whether to use cuda graph, default: False |
| ```max_capture_batch_size``` | `int` | When cuda graph is enabled, maximum batch size of captured cuda graph, default: 64 |
| ```graph_optimization_config``` | `str` | Parameters related to graph optimization can be configured, with default value of '{"use_cudagraph":false, "graph_opt_level":0, "cudagraph_capture_sizes": null }' |
| ```enable_custom_all_reduce``` | `bool` | Enable Custom all-reduce, default: False |
| ```splitwise_role``` | `str` | Whether to enable splitwise inference, default value: mixed, supported parameters: ["mixed", "decode", "prefill"] |
| ```innode_prefill_ports``` | `str` | Internal engine startup ports for prefill instances (only required for single-machine PD separation), default: None |

@@ -72,20 +71,53 @@ When `enable_chunked_prefill` is enabled, the service processes long input sequences
To optimize scheduling priority for short requests, a new `max_long_partial_prefills` and `long_prefill_token_threshold` parameter combination has been added. The former limits the number of long requests in a single prefill batch, the latter defines the token threshold for long requests. The system will prioritize batch space for short requests, thereby reducing short-request latency in mixed workload scenarios while maintaining stable throughput.

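As an illustration only (the exact CLI spelling is an assumption derived from the parameter names above, following the usual underscore-to-hyphen convention), a launch that biases scheduling toward short requests might look like this:

```bash
python -m fastdeploy.entrypoints.openai.api_server \
  --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \
  --enable-chunked-prefill \
  --max-long-partial-prefills 1 \
  --long-prefill-token-threshold 2048
```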
## 4. GraphOptimizationBackend related configuration parameters
Currently, only user configuration of the following parameters is supported:
- `use_cudagraph` : bool = False
- `graph_optimization_config` : Dict[str, Any]
  - `graph_opt_level`: int = 0
  - `use_cudagraph`: bool = False
  - `cudagraph_capture_sizes` : List[int] = None

### Static graph inference related parameters
CudaGraph can be enabled by setting `--use-cudagraph` or `--graph-optimization-config '{"use_cudagraph":true}'`. Using both methods to enable CudaGraph at the same time may cause conflicts.

The `graph_opt_level` parameter within `--graph-optimization-config` is used to configure the graph optimization level, with the following available options:
- `0`: Use the dynamic compute graph (default)
- `1`: Use the static compute graph; during the initialization phase, the Paddle API is used to convert the dynamic graph into a static graph
- `2`: On top of the static compute graph, use Paddle's compiler (CINN, Compiler Infrastructure for Neural Networks) to compile and optimize

In general, static graphs have lower kernel launch overhead than dynamic graphs, and it is recommended to use static graphs.
For adapted models, FastDeploy's CudaGraph **can support both dynamic and static graphs** simultaneously.

When CudaGraph is enabled in the default configuration, a list of batch sizes that CudaGraph needs to capture will be automatically set based on the `max_num_seqs` parameter. The logic for generating the list of batch sizes that need to be captured is as follows:

1. Generate a candidate list covering batch sizes in the range [1, 1024].
```
# Batch Size [1, 2, 4, 8, 16, ... 120, 128]
candidate_capture_sizes = [1, 2, 4] + [8 * i for i in range(1, 17)]
# Batch Size (128, 144, ... 240, 256]
candidate_capture_sizes += [16 * i for i in range(9, 17)]
# Batch Size (256, 288, ... 992, 1024]
candidate_capture_sizes += [32 * i for i in range(17, 33)]
```
2. Crop the candidate list based on the user-set `max_num_seqs` to obtain a CudaGraph capture list with a range of [1, `max_num_seqs`], as sketched below.
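A minimal Python sketch of that cropping step (the value of `max_num_seqs` is illustrative):

```python
max_num_seqs = 256  # user-configured maximum concurrency (example value)

# Candidate list from step 1.
candidate_capture_sizes = [1, 2, 4] + [8 * i for i in range(1, 17)]
candidate_capture_sizes += [16 * i for i in range(9, 17)]
candidate_capture_sizes += [32 * i for i in range(17, 33)]

# Step 2: keep only batch sizes that can actually occur at runtime.
cudagraph_capture_sizes = sorted(s for s in candidate_capture_sizes if s <= max_num_seqs)
```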
Users can also customize the batch size list that needs to be captured by CudaGraph through the `cudagraph_capture_sizes` parameter in `--graph-optimization-config`:
```
--graph-optimization-config '{"cudagraph_capture_sizes": [1, 3, 5, 7, 9]}'
```

- When ```enable_static_graph_inference``` is enabled, dynamic-to-static graph conversion will be performed, using the static graph for inference.

### CudaGraph related parameters

For adapted models, FastDeploy's CudaGraph can support both dynamic and static graphs. Using CudaGraph incurs some additional memory overhead, divided into two categories in FastDeploy:
Using CudaGraph incurs some additional memory overhead, divided into two categories in FastDeploy:
* Additional input buffer overhead
* CudaGraph uses a dedicated memory pool, so it holds some intermediate activation memory that is isolated from the main framework

FastDeploy's initialization sequence first uses the `gpu_memory_utilization` parameter to calculate the memory available for `KVCache`; only after `KVCache` is initialized is the remaining memory used to initialize CudaGraph. Since CudaGraph is not enabled by default yet, using the default startup parameters may lead to `Out of memory` errors. The following solutions can be tried:
* Lower the `gpu_memory_utilization` value to reserve more memory for CudaGraph.
* Lower the `max_capture_batch_size` value to reduce CudaGraph memory usage; this also reduces how often CudaGraph is used during inference.
* Lower `max_num_seqs` to decrease the maximum concurrency.
* Customize the batch size list that CudaGraph needs to capture through `graph_optimization_config`, and reduce the number of captured graphs by using `cudagraph_capture_sizes`
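For example, an illustrative combination of the mitigations above (all flags appear elsewhere on this page; the values are placeholders):

```bash
python -m fastdeploy.entrypoints.openai.api_server \
  --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \
  --gpu-memory-utilization 0.8 \
  --max-num-seqs 32 \
  --use-cudagraph \
  --graph-optimization-config '{"cudagraph_capture_sizes": [1, 2, 4, 8, 16, 32]}'
```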
- Before use, you must ensure that the loaded model is properly decorated with ```@support_graph_optimization```.

@@ -118,4 +150,3 @@ FastDeploy initialization sequence first uses `gpu_memory_utilization` parameter
```
- When ```use_cudagraph``` is enabled, only single-GPU inference is currently supported, i.e. ```tensor_parallel_size``` set to 1.
- When ```use_cudagraph``` is enabled, ```enable_prefix_caching``` or ```enable_chunked_prefill``` cannot be enabled.
- When ```use_cudagraph``` is enabled, batches with size ≤ ```max_capture_batch_size``` will be executed by CudaGraph, and batches > ```max_capture_batch_size``` will be executed by the original dynamic/static graph. To have all batch sizes executed by CudaGraph, the ```max_capture_batch_size``` value should match ```max_num_seqs```. Setting ```max_capture_batch_size``` > ```max_num_seqs``` is wasteful: it captures batches that will never be encountered during inference, costing more time and memory.
@@ -1,20 +1,14 @@
## Supported Models
|Model Name|Context Length|Quantization|XPUs Required|Deployment Commands|
|-|-|-|-|-|
|ERNIE-4.5-300B-A47B|32K|WINT8|8|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-300B-A47B|32K|WINT4|4 (recommend)|export XPU_VISIBLE_DEVICES="0,1,2,3" or "4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 4 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-300B-A47B|32K|WINT4|8|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-300B-A47B|128K|WINT4|8 (recommend)|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|32K|BF16|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|32K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|32K|WINT4|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|128K|BF16|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|128K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|128K|WINT4|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-0.3B|32K|BF16|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-0.3B|32K|WINT8|1|export XPU_VISIBLE_DEVICES="x" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-0.3B|128K|BF16|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-0.3B|128K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|
|Model Name|Context Length|Quantization|XPUs Required|Deployment Commands|Minimum Version Required|
|-|-|-|-|-|-|
|ERNIE-4.5-300B-A47B|32K|WINT8|8|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-300B-A47B|32K|WINT4|4 (recommend)|export XPU_VISIBLE_DEVICES="0,1,2,3" or "4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 4 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.0.0|
|ERNIE-4.5-300B-A47B|32K|WINT4|8|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.0.0|
|ERNIE-4.5-300B-A47B|128K|WINT4|8 (recommend)|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.0.0|
|ERNIE-4.5-0.3B|32K|BF16|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-0.3B|32K|WINT8|1|export XPU_VISIBLE_DEVICES="x" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-0.3B|128K|BF16|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-0.3B|128K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # Specify any card<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|>=2.0.3|

## Quick start

@@ -25,9 +25,9 @@
```bash
mkdir Work
cd Work
docker pull ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.0.0
docker pull ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.0.3
docker run --name fastdeploy-xpu --net=host -itd --privileged -v $PWD:/Work -w /Work \
ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.0.0 \
ccr-2vdh3abv-pub.cnc.bj.baidubce.com/paddlepaddle/fastdeploy-xpu:2.0.3 \
/bin/bash
docker exec -it fastdeploy-xpu /bin/bash
```

@@ -49,7 +49,7 @@ python -m pip install --pre paddlepaddle-xpu -i https://www.paddlepaddle.org.cn/
### 安装 FastDeploy(**注意不要通过 pypi 源安装**)

```bash
python -m pip install fastdeploy-xpu==2.0.0 -i https://www.paddlepaddle.org.cn/packages/stable/fastdeploy-xpu-p800/ --extra-index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
python -m pip install fastdeploy-xpu==2.0.3 -i https://www.paddlepaddle.org.cn/packages/stable/fastdeploy-xpu-p800/ --extra-index-url https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
```

或者你也可以安装最新版 FastDeploy(不推荐)

@@ -119,5 +119,5 @@ python -c "from fastdeploy.model_executor.ops.xpu import block_attn"

如果上述步骤均执行成功,代表 FastDeploy 已安装成功。

## 如何在昆仑新XPU上部署服务
## 如何在昆仑芯 XPU 上部署服务
请参考 [**支持的模型与服务部署**](../../usage/kunlunxin_xpu_deployment.md) 以了解昆仑芯 XPU 支持的模型与服务部署方法。
@@ -32,9 +32,8 @@
| ```long_prefill_token_threshold``` | `int` | 开启Chunked Prefill时,请求Token数超过此值的请求被视为长请求,默认为max_model_len*0.04 |
| ```static_decode_blocks``` | `int` | 推理过程中,每条请求强制从Prefill的KVCache分配对应块数给Decode使用,默认2|
| ```reasoning_parser``` | `str` | 指定要使用的推理解析器,以便从模型输出中提取推理内容 |
| ```enable_static_graph_inference```| `bool` | 是否使用静态图推理模式,默认False |
| ```use_cudagraph``` | `bool` | 是否使用cuda graph,默认False |
| ```max_capture_batch_size``` | `int` | 开启 cuda graph 时,捕获的 cuda graph的最大batch size,默认为64 |
|```graph_optimization_config``` | `str` | 可以配置计算图优化相关的参数,默认值为'{"use_cudagraph":false, "graph_opt_level":0, "cudagraph_capture_sizes": null }' |
| ```enable_custom_all_reduce``` | `bool` | 开启Custom all-reduce,默认False |
| ```splitwise_role``` | `str` | 是否开启splitwise推理,默认值mixed, 支持参数为["mixed", "decode", "prefill"] |
| ```innode_prefill_ports``` | `str` | prefill 实例内部引擎启动端口 (仅单机PD分离需要),默认值None |

@@ -70,22 +69,53 @@ FastDeploy在推理过程中,显存被```模型权重```、```预分配KVCache
为优化短请求的调度优先级,新增 `max_long_partial_prefills` 与 `long_prefill_token_threshold` 参数组合。前者限制单个预填充批次中的长请求数量,后者定义长请求的token阈值。系统会优先保障短请求的批处理空间,从而在混合负载场景下降低短请求延迟,同时保持整体吞吐稳定。

## 4. GraphOptimizationBackend 相关配置参数说明
当前仅支持用户配置以下参数:
- `use_cudagraph` : bool = False
- `graph_optimization_config` : Dict[str, Any]
  - `graph_opt_level`: int = 0
  - `use_cudagraph`: bool = False
  - `cudagraph_capture_sizes` : List[int] = None

### 动态图转静态图相关参数说明
可以通过设置 `--use-cudagraph` 或 `--graph-optimization-config '{"use_cudagraph":true}'` 开启 CudaGraph。

`--graph-optimization-config` 中的 `graph_opt_level` 参数用于配置图优化等级,可选项如下:
- `0`: 动态图,默认为 0
- `1`: 静态图,初始化阶段会使用 Paddle API 将动态图转换为静态图
- `2`: 在静态图的基础上,使用 Paddle 框架编译器(CINN, Compiler Infrastructure for Neural Networks)进行编译优化

一般情况下静态图比动态图的 Kernel Launch 开销更小,推荐使用静态图。
对于已适配的模型,FastDeploy 的 CudaGraph **可同时支持动态图与静态图**。

在默认配置下开启 CudaGraph 时,会根据 `max_num_seqs` 参数自动设置 CudaGraph 需要捕获的 Batch Size 列表,需要捕获的 Batch Size 的列表自动生成逻辑如下:
1. 生成一个范围为 [1,1024] Batch Size 的候选列表
```
# Batch Size [1, 2, 4, 8, 16, ... 120, 128]
candidate_capture_sizes = [1, 2, 4] + [8 * i for i in range(1, 17)]
# Batch Size (128, 144, ... 240, 256]
candidate_capture_sizes += [16 * i for i in range(9, 17)]
# Batch Size (256, 288, ... 992, 1024]
candidate_capture_sizes += [32 * i for i in range(17, 33)]
```
2. 根据用户设置的 `max_num_seqs` 裁剪候选列表,得到范围为 [1, `max_num_seqs`] 的 CudaGraph 捕获列表。

用户也可以通过 `--graph-optimization-config` 中的 `cudagraph_capture_sizes` 参数自定义需要被 CudaGraph 捕获的 Batch Size 列表:
```
--graph-optimization-config '{"cudagraph_capture_sizes": [1, 3, 5, 7, 9]}'
```

- 当开启 ```enable_static_graph_inference```时,会执行动态图转静态图,使用静态图进行推理。

### CudaGraph相关参数说明

对于已适配的模型,FastDeploy 的 CudaGraph 可同时支持动态图与静态图。使用 CudaGraph 会产生一些额外的显存开销,在FastDeploy中分为下面两类:
使用 CudaGraph 会产生一些额外的显存开销,在FastDeploy中分为下面两类:
* 额外的输入 Buffer 开销
* CudaGraph 使用了专用的显存池,因此会持有一部分与主框架隔离的中间激活显存

FastDeploy 的初始化顺序为先使用 `gpu_memory_utilization` 参数计算 `KVCache` 可用的显存,初始化完 `KVCache` 之后才会使用剩余显存初始化 CudaGraph。由于 CudaGraph 目前还不是默认开启的,因此使用默认启动参数可能会遇到 `Out of memory` 错误,可以尝试使用下面两种方式解决:
FastDeploy 的初始化顺序为先使用 `gpu_memory_utilization` 参数计算 `KVCache` 可用的显存,初始化完 `KVCache` 之后才会使用剩余显存初始化 CudaGraph。由于 CudaGraph 目前还不是默认开启的,因此使用默认启动参数可能会遇到 `Out Of Memory` 错误,可以尝试使用下面三种方式解决:
* 调低 `gpu_memory_utilization` 的值,多预留一些显存给CudaGraph使用。
* 调低 `max_capture_batch_size` 的值, 减少CudaGraph的显存占用,同时也会降低推理时CudaGraph的使用率。
* 调低 `max_num_seqs` 的值,降低最大并发数。
* 通过 `graph_optimization_config` 自定义需要 CudaGraph 捕获的 Batch Size 列表 `cudagraph_capture_sizes`,减少捕获的图的数量

- 使用之前,需要确保加载的模型被装饰器 ```@support_graph_optimization```正确修饰。

使用CudaGraph之前,需要确保加载的模型被装饰器 ```@support_graph_optimization```正确修饰。

```python
# 1. import 装饰器

@@ -116,4 +146,3 @@ FastDeploy 的初始化顺序为先使用 `gpu_memory_utilization` 参数计算
```
- 当开启 ```use_cudagraph``` 时,暂时只支持单卡推理,即 ```tensor_parallel_size``` 设为1。
- 当开启 ```use_cudagraph``` 时,暂不支持开启 ```enable_prefix_caching``` 或 ```enable_chunked_prefill``` 。
- 当开启 ```use_cudagraph``` 后,size小于等于 ```max_capture_batch_size``` 的batch会由CudaGraph来执行前向计算,大于 ```max_capture_batch_size``` 的batch会由原本的动态图/静态图执行前向计算。如果希望所有batch size均由CudaGraph来执行,```max_capture_batch_size``` 的值建议与 ```max_num_seqs``` 一致。```max_capture_batch_size``` 大于 ```max_num_seqs``` 会导致浪费,会多捕获一些推理时不会遇到的batch,占用更多时间与显存。
@@ -1,20 +1,14 @@
## 支持的模型
|模型名|上下文长度|量化|所需卡数|部署命令|
|-|-|-|-|-|
|ERNIE-4.5-300B-A47B|32K|WINT8|8|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-300B-A47B|32K|WINT4|4 (推荐)|export XPU_VISIBLE_DEVICES="0,1,2,3" or "4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 4 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-300B-A47B|32K|WINT4|8|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-300B-A47B|128K|WINT4|8 (推荐)|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|32K|BF16|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|32K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|32K|WINT4|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|128K|BF16|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|128K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-21B-A3B|128K|WINT4|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-21B-A3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-0.3B|32K|BF16|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-0.3B|32K|WINT8|1|export XPU_VISIBLE_DEVICES="x" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-0.3B|128K|BF16|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|
|ERNIE-4.5-0.3B|128K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|
|模型名|上下文长度|量化|所需卡数|部署命令|最低版本要求|
|-|-|-|-|-|-|
|ERNIE-4.5-300B-A47B|32K|WINT8|8|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-300B-A47B|32K|WINT4|4 (推荐)|export XPU_VISIBLE_DEVICES="0,1,2,3" or "4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 4 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.0.0|
|ERNIE-4.5-300B-A47B|32K|WINT4|8|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.0.0|
|ERNIE-4.5-300B-A47B|128K|WINT4|8 (推荐)|export XPU_VISIBLE_DEVICES="0,1,2,3,4,5,6,7"<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 8 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 64 \ <br> --quantization "wint4" \ <br> --gpu-memory-utilization 0.9|>=2.0.0|
|ERNIE-4.5-0.3B|32K|BF16|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-0.3B|32K|WINT8|1|export XPU_VISIBLE_DEVICES="x" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 32768 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-0.3B|128K|BF16|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --gpu-memory-utilization 0.9|>=2.0.3|
|ERNIE-4.5-0.3B|128K|WINT8|1|export XPU_VISIBLE_DEVICES="0" # 指定任意一张卡<br>python -m fastdeploy.entrypoints.openai.api_server \ <br> --model PaddlePaddle/ERNIE-4.5-0.3B-Paddle \ <br> --port 8188 \ <br> --tensor-parallel-size 1 \ <br> --max-model-len 131072 \ <br> --max-num-seqs 128 \ <br> --quantization "wint8" \ <br> --gpu-memory-utilization 0.9|>=2.0.3|

## 快速开始

@@ -37,6 +37,25 @@ class MoEPhase(Enum):
PREFILL = 1
DECODER = 2

class ErnieArchitectures:
"""Helper class for ERNIE architecture check."""

ARCHITECTURES = {
"Ernie4_5_ForCausalLM",
"Ernie4_5_MoeForCausalLM",
"Ernie4_5_VLMoeForConditionalGeneration"
}

@classmethod
def contains_ernie_arch(cls, architectures):
"""Check if any ERNIE architecture is present in the given architectures."""
return any(arch in architectures for arch in cls.ARCHITECTURES)

@classmethod
def is_ernie_arch(cls, architecture):
"""Check if the given architecture is an ERNIE architecture."""
return architecture in cls.ARCHITECTURES
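Illustrative usage of the new helper, mirroring the call that appears later in this diff (the architecture list is a made-up example):

```python
architectures = ["Ernie4_5_MoeForCausalLM"]
if ErnieArchitectures.contains_ernie_arch(architectures):
    # ERNIE models carry their own original vocab size in the startup args.
    print("ERNIE architecture detected")
```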
PRETRAINED_INIT_CONFIGURATION = {
"rope_theta" : 10000.0,
"num_key_value_heads" : -1,

@@ -46,7 +65,6 @@ PRETRAINED_INIT_CONFIGURATION = {
"num_max_dispatch_tokens_per_rank" : 256,
"moe_use_aux_free" : False,
"vocab_size" : -1,
"use_rope": True,
"hidden_dropout_prob" : 0.0,
"initializer_range" : 0.02,
"max_position_embeddings" : 512,

@@ -70,7 +88,7 @@ class ModelConfig:
self.stop_seqs_max_len = 8

# NOTE(gongshaotain): form _load_model_init_val()
self.top_p = 0.0
self.top_p = 1.0
self.temperature = 1.0
self.rope_theta = 10000.0
self.penalty_score = 1.0

@@ -89,6 +107,7 @@ class ModelConfig:
if hasattr(self, key):
setattr(self, key, value)

assert self.model_name_or_path != ""
pretrained_config, _ = PretrainedConfig.get_config_dict(self.model_name_or_path)
self.pretrained_config = PretrainedConfig.from_dict(pretrained_config)

@@ -108,9 +127,10 @@ class ModelConfig:
self.vision_config = PretrainedConfig.from_dict(self.vision_config)

self.ori_vocab_size = self.vocab_size
if "Ernie4_5_ForCausalLM" in self.architectures or "Ernie4_5_MoeForCausalLM" in self.architectures:
if ErnieArchitectures.contains_ernie_arch(self.architectures):
self.ori_vocab_size = args["ori_vocab_size"]

class ParallelConfig:
"""Configuration for the distributed execution."""
def __init__(

@@ -317,7 +337,7 @@ class GraphOptimizationConfig:
pre-compute the mapping from batch size to padded graph size
"""
# Regular capture sizes
self.cudagraph_capture_sizes = [size for size in self.cudagraph_capture_sizes if size < max_num_seqs]
self.cudagraph_capture_sizes = [size for size in self.cudagraph_capture_sizes if size <= max_num_seqs]
dedup_sizes = list(set(self.cudagraph_capture_sizes))
if len(dedup_sizes) < len(self.cudagraph_capture_sizes):
logger.info(("cudagraph sizes specified by model runner"
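The `<` to `<=` change above matters for the largest batch; a quick sketch with made-up numbers:

```python
max_num_seqs = 64
sizes = [1, 2, 4, 8, 16, 32, 64]
old = [s for s in sizes if s < max_num_seqs]   # 64 is dropped, so a full batch never hits CudaGraph
new = [s for s in sizes if s <= max_num_seqs]  # 64 is kept and can be captured
```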
@@ -124,9 +124,19 @@ class EngineArgs:
Ratio of tokens to process in a block.
"""

pod_ips: Optional[List[str]] = None
dist_init_ip: Optional[str] = None
"""
List of IP addresses for nodes in the cluster.
The master node ip of multinode deployment
"""

nnodes: int = 1
"""
The number of nodes in multinode deployment
"""

node_rank: int = 0
"""
The rank of the current node in multinode deployment
"""

swap_space: float = None

@@ -485,11 +495,25 @@ class EngineArgs:
# Cluster system parameters group
system_group = parser.add_argument_group("System Configuration")
system_group.add_argument(
"--pod-ips",
type=lambda s: s.split(",") if s else None,
default=EngineArgs.pod_ips,
"--dist-init-ip",
default=EngineArgs.dist_init_ip,
help=
"List of IP addresses for nodes in the cluster (comma-separated).")
"IP addresses of master node.")

system_group.add_argument(
"--nnodes",
type=int,
default=EngineArgs.nnodes,
help=
"The number of all nodes.")

system_group.add_argument(
"--node-rank",
type=int,
default=EngineArgs.node_rank,
help=
"node rank id (range [0, nnodes)).")

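For context, a hypothetical two-node launch using the new flags (the IP address and model are placeholders, not taken from the repository):

```bash
# Node 0 (master)
python -m fastdeploy.entrypoints.openai.api_server \
  --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \
  --dist-init-ip 192.168.0.10 --nnodes 2 --node-rank 0

# Node 1
python -m fastdeploy.entrypoints.openai.api_server \
  --model PaddlePaddle/ERNIE-4.5-300B-A47B-Paddle \
  --dist-init-ip 192.168.0.10 --nnodes 2 --node-rank 1
```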
# Performance tuning parameters group

@@ -789,7 +813,9 @@ class EngineArgs:
max_num_seqs=self.max_num_seqs,
speculative_config=speculative_cfg,
max_num_batched_tokens=self.max_num_batched_tokens,
pod_ips=self.pod_ips,
dist_init_ip=self.dist_init_ip,
nnodes=self.nnodes,
node_rank=self.node_rank,
use_warmup=self.use_warmup,
engine_worker_queue_port=self.engine_worker_queue_port,
limit_mm_per_prompt=self.limit_mm_per_prompt,
@@ -6,7 +6,7 @@
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#dist_init_ip
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

@@ -24,7 +24,7 @@ from fastdeploy import envs
from fastdeploy.platforms import current_platform
from fastdeploy.scheduler import SchedulerConfig
from fastdeploy.utils import (ceil_div, check_unified_ckpt, get_host_ip,
is_port_available, llm_logger)
is_port_available, get_random_port, llm_logger)

TaskOption = Literal["generate"]

@@ -642,7 +642,9 @@ class Config:
max_model_len: int = 8192,
max_num_seqs: int = 8,
max_num_batched_tokens: Optional[int] = None,
pod_ips: Optional[List[str]] = None,
dist_init_ip: str = None,
nnodes: int = 1,
node_rank: int = 0,
speculative_config: Optional[Dict[str, Any]] = None,
graph_optimization_config: Optional[Dict[str, Any]] = None,
use_warmup: bool = False,

@@ -675,7 +677,6 @@ class Config:
max_model_len (int): Maximum model length. Default is 8192.
max_num_seqs (int): Maximum number of sequences. Default is 8.
max_num_batched_tokens (Optional[int]): Maximum number of batched tokens. Default is None.
pod_ips (Optional[List[str]]): List of POD IPs. Default is None.
mm_processor_kwargs (Optional[Dict[str, Any]]): Additional arguments for multi-modal processor. Default is None.
speculative_config (Optional[Dict[str, Any]]): Speculative execution configuration. Default is None.
graph_optimization_config (Optional[Dict[str, Any]]): Graph optimizaion backend execution configuration. Default is None.

@@ -699,7 +700,16 @@ class Config:
self.tokenizer = tokenizer
self.max_num_batched_tokens = max_num_batched_tokens
self.tensor_parallel_size = tensor_parallel_size
self.pod_ips = pod_ips
self.dist_init_ip = dist_init_ip

self.nnode = nnodes
self.node_rank = node_rank
if self.dist_init_ip is None:
self.master_ip = "0.0.0.0"
else:
self.master_ip = self.dist_init_ip
self.dist_init_addr = f"{self.dist_init_ip}:{get_random_port()}"

self.max_model_len = max_model_len
self.max_num_seqs = max_num_seqs
self.limit_mm_per_prompt = limit_mm_per_prompt

@@ -716,14 +726,8 @@ class Config:
self.graph_optimization_config = graph_optimization_config
self.guided_decoding_backend = guided_decoding_backend
self.disable_any_whitespace = disable_any_whitespace
self.is_master = True
self._str_to_list("innode_prefill_ports", int)
self._str_to_list("pod_ips", str)

if self.pod_ips is None:
self.nnode = 1
else:
self.nnode = len(self.pod_ips)

assert self.splitwise_role in ["mixed", "prefill", "decode"]

@@ -778,9 +782,9 @@ class Config:

self.host_ip = get_host_ip()

if self.pod_ips is None:
self.pod_ips = ["0.0.0.0"]
elif self.host_ip != self.pod_ips[0]:
if self.dist_init_ip is None or self.host_ip == self.master_ip:
self.is_master = True
else:
self.is_master = False

import paddle
@@ -32,6 +32,7 @@ from typing import Dict, List, Optional, Tuple
|
||||
import numpy as np
|
||||
import paddle
|
||||
import zmq
|
||||
from opentelemetry import trace
|
||||
from tqdm import tqdm
|
||||
|
||||
from fastdeploy.engine.args_utils import EngineArgs
|
||||
@@ -42,13 +43,13 @@ from fastdeploy.input.preprocess import InputPreprocessor
|
||||
from fastdeploy.inter_communicator import (EngineCacheQueue, EngineWorkerQueue,
|
||||
IPCSignal, ZmqClient)
|
||||
from fastdeploy.metrics.metrics import main_process_metrics
|
||||
from fastdeploy.metrics.trace_util import start_span, start_span_request
|
||||
from fastdeploy.model_executor.guided_decoding import schema_checker
|
||||
from fastdeploy.output.token_processor import (TokenProcessor,
|
||||
WarmUpTokenProcessor)
|
||||
from fastdeploy.splitwise.splitwise_connector import SplitwiseConnector
|
||||
from fastdeploy.utils import EngineError, console_logger, llm_logger
|
||||
from fastdeploy.metrics.trace_util import extract_from_metadata, start_span, start_span_request
|
||||
from opentelemetry import trace
|
||||
|
||||
|
||||
class LLMEngine(object):
|
||||
"""
|
||||
@@ -57,7 +58,6 @@ class LLMEngine(object):
Attributes:
cfg (Config): Configuration object containing all the parameters.
cached_generated_tokens (queue.Queue): Queue to store generated tokens.
cached_generated_tokens (queue.Queue): Queue to store generated tokens.
scheduler (LocalScheduler or GlobalScheduler): Scheduling tasks.
input_processor (InputPreprocessor): Preprocessor for input data.
resource_manager (ResourceManager): Manager for resource allocation.
@@ -174,7 +174,7 @@ class LLMEngine(object):
cache_config=self.cfg.cache_config,
tensor_parallel_size=self.cfg.tensor_parallel_size,
device_ids=device_ids,
pod_ip=self.cfg.pod_ips[0],
pod_ip=self.cfg.master_ip,
engine_worker_queue_port=self.cfg.engine_worker_queue_port,
pid_suffix=self.ipc_signal_suffix)

@@ -239,11 +239,12 @@ class LLMEngine(object):

if self.cfg.parallel_config.enable_expert_parallel and self.cfg.parallel_config.data_parallel_size > 1:
self.dp_processed = []
for i in range(1, self.cfg.parallel_config.data_parallel_size):
for i in range(1, self.cfg.parallel_config.data_parallel_size // self.cfg.nnode):
time.sleep(1)
self.dp_processed.append(
multiprocessing.Process(target=start_expert_service,
args=(self.cfg, i,
args=(self.cfg,
i + self.cfg.node_rank * self.cfg.worker_num_per_node,
self.ipc_signal_suffix)))
llm_logger.info(f"Engine is initialized successfully with {self.cfg.tensor_parallel_size}" \
+ " data parallel id {}".format(i))
@@ -263,10 +264,11 @@ class LLMEngine(object):
try:
results = self.scheduler.get_results()
if len(results) == 0:
time.sleep(0.001)
time.sleep(0.005)
continue
for request_id, contents in results.items():
for result in contents:
self.zmq_server.send_multipart(request_id, result)
self.zmq_server.send_multipart(request_id, contents)

except Exception as e:
llm_logger.error("Unexcepted error happend: {}, {}".format(
e, str(traceback.format_exc())))
@@ -357,9 +359,9 @@ class LLMEngine(object):
request, insert_task = None, []
results: List[Tuple[str, Optional[str]]] = list()
if data:
request = Request.from_dict(data)
start_span("ENQUEUE_ZMQ", data, trace.SpanKind.PRODUCER)

request = Request.from_dict(data)
start_span("ENQUEUE_ZMQ", data, trace.SpanKind.PRODUCER)


llm_logger.debug(f"Receive request: {request}")

@@ -692,7 +694,7 @@ class LLMEngine(object):
Insert tasks to engine.
"""
for task in tasks:
start_span_request("DEQUEUE", task, trace.SpanKind.CONSUMER)
start_span_request("DEQUEUE", task, trace.SpanKind.CONSUMER)
# TODO: return to the scheduler
if allocated:
current_tasks = []
@@ -1006,8 +1008,6 @@ class LLMEngine(object):
)

arguments = (
f" --nnodes {str(self.cfg.nnode)}"
f" --ips {','.join(self.cfg.pod_ips)}"
f" --devices {self.cfg.device_ids} {py_script}"
f" --max_num_seqs {self.cfg.max_num_seqs} --max_model_len {self.cfg.max_model_len}"
f" --gpu_memory_utilization {self.cfg.cache_config.gpu_memory_utilization}"
@@ -1015,7 +1015,7 @@ class LLMEngine(object):
f" --device_ids {self.cfg.device_ids}"
f" --tensor_parallel_size {self.cfg.tensor_parallel_size}"
f" --engine_worker_queue_port {str(self.cfg.engine_worker_queue_port)}"
f" --pod_ip {self.cfg.pod_ips[0]}"
f" --pod_ip {self.cfg.master_ip}"
f" --total_block_num {self.cfg.cache_config.total_block_num}"
f" --block_size {self.cfg.cache_config.block_size}"
f" --enc_dec_block_num {self.cfg.cache_config.enc_dec_block_num}"
@@ -1033,10 +1033,9 @@ class LLMEngine(object):
f" --speculative_model_name_or_path {self.cfg.speculative_config.model_name_or_path}"
f" --speculative_model_quantization {self.cfg.speculative_config.quantization}"
f" --speculative_benchmark_mode {self.cfg.speculative_config.benchmark_mode}"
f" --graph_optimiaztion_config '{self.cfg.graph_optimization_config.to_json_string()}'"
f" --graph_optimization_config '{self.cfg.graph_optimization_config.to_json_string()}'"
f" --guided_decoding_backend {self.cfg.guided_decoding_backend}"
f" --load_strategy {self.cfg.model_config.load_strategy}"
f" --enable_mm {self.cfg.enable_mm}")
f" --load_strategy {self.cfg.model_config.load_strategy}")


worker_append_flag = {
@@ -1051,12 +1050,17 @@ class LLMEngine(object):
"disable_any_whitespace": self.cfg.disable_any_whitespace,
"enable-custom-all-reduce": self.cfg.parallel_config.enable_custom_all_reduce,
"enable_logprob": self.cfg.enable_logprob,
"enable_mm": self.cfg.enable_mm,
}
for worker_flag, value in worker_append_flag.items():
if value:
arguments = arguments + f" --{worker_flag}"
if self.cfg.nnode > 1:
pd_cmd = pd_cmd + f" --ips {self.cfg.ips}"
pd_cmd = pd_cmd + (
f" --master {self.cfg.dist_init_addr}"
f" --nnodes {str(self.cfg.nnode)}"
f" --rank {str(self.cfg.node_rank)}"
)
pd_cmd = pd_cmd + arguments + f" 2>{log_dir}/launch_worker.log"
llm_logger.info("Launch worker service command: {}".format(pd_cmd))
p = subprocess.Popen(
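A small sketch of the launch-command assembly shown in these hunks: boolean worker flags are appended only when truthy, and the multi-node case adds --master/--nnodes/--rank. The flag names and values below are illustrative, not the full argument list:

```python
def build_worker_cmd(base_cmd: str, flags: dict, nnode: int,
                     dist_init_addr: str, node_rank: int) -> str:
    # boolean flags are appended only when truthy, mirroring worker_append_flag
    args = "".join(f" --{name}" for name, value in flags.items() if value)
    if nnode > 1:
        # multi-node launch adds the distributed master address, node count and rank
        base_cmd += f" --master {dist_init_addr} --nnodes {nnode} --rank {node_rank}"
    return base_cmd + args

cmd = build_worker_cmd("python -m paddle.distributed.launch",
                       {"enable_logprob": True, "enable_mm": False},
                       nnode=2, dist_init_addr="192.168.1.10:51234", node_rank=1)
print(cmd)
```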
@@ -1157,7 +1161,7 @@ class LLMEngine(object):
cache_config=self.cfg.cache_config,
tensor_parallel_size=self.cfg.tensor_parallel_size,
device_ids=device_ids,
pod_ip=self.cfg.pod_ips[0],
pod_ip=self.cfg.master_ip,
engine_worker_queue_port=self.cfg.engine_worker_queue_port,
pid_suffix=self.ipc_signal_suffix)
def check_health(self, time_interval_threashold=30):
@@ -1244,8 +1248,9 @@ class LLMEngine(object):
"""
start queue service for engine worker communication
"""
address = (self.cfg.pod_ips[0], self.cfg.engine_worker_queue_port)
if self.cfg.host_ip == self.cfg.pod_ips[0] or self.cfg.pod_ips[0] == "0.0.0.0":
address = (self.cfg.master_ip, self.cfg.engine_worker_queue_port)
if self.cfg.host_ip == self.cfg.master_ip or self.cfg.master_ip == "0.0.0.0":
llm_logger.info(f"Starting engine worker queue server service at {address}")
self.engine_worker_queue_server = EngineWorkerQueue(
address=address,
is_server=True,
@@ -1255,7 +1260,7 @@ class LLMEngine(object):

if self.cfg.cache_config.enable_prefix_caching or self.cfg.splitwise_role != 'mixed':
self.cache_task_queue = EngineCacheQueue(
address=(self.cfg.pod_ips[0], self.cfg.cache_config.cache_queue_port),
address=(self.cfg.master_ip, self.cfg.cache_config.cache_queue_port),
authkey=b'cache_queue_service',
is_server=True,
num_client=self.cfg.tensor_parallel_size,
@@ -1269,4 +1274,6 @@ class LLMEngine(object):
is_server=False,
num_client=self.cfg.tensor_parallel_size,
client_id=0,
local_data_parallel_id=0)
local_data_parallel_size=self.cfg.parallel_config.data_parallel_size,
local_data_parallel_id= min(self.cfg.worker_num_per_node * self.cfg.node_rank,
self.cfg.parallel_config.data_parallel_size - 1))

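The final hunk derives the cache-queue client's local data-parallel id from the node rank, clamped to the last valid id; a one-function sketch with assumed example values:

```python
def local_dp_id(worker_num_per_node: int, node_rank: int, data_parallel_size: int) -> int:
    # first data-parallel rank hosted on this node, capped to the last valid DP id
    return min(worker_num_per_node * node_rank, data_parallel_size - 1)

assert local_dp_id(worker_num_per_node=8, node_rank=0, data_parallel_size=4) == 0
assert local_dp_id(worker_num_per_node=8, node_rank=1, data_parallel_size=4) == 3
```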
@@ -49,8 +49,8 @@ class ExpertService(object):
cfg (Config): Config object containing all the configuration parameters.
"""
self.cfg = cfg
start_pos = local_data_parallel_id * self.cfg.tensor_parallel_size
end_pos = (local_data_parallel_id + 1) * self.cfg.tensor_parallel_size
start_pos = (local_data_parallel_id * self.cfg.tensor_parallel_size) % self.cfg.worker_num_per_node
end_pos = ((local_data_parallel_id + 1) * self.cfg.tensor_parallel_size) % self.cfg.worker_num_per_node
self.cfg.cache_config.rdma_comm_ports = self.cfg.cache_config.rdma_comm_ports[
start_pos:end_pos]
self.cfg.local_device_ids = self.cfg.device_ids.split(
@@ -65,7 +65,7 @@ class ExpertService(object):

self.cfg.parallel_config.local_data_parallel_id = local_data_parallel_id

address = (cfg.pod_ips[0], cfg.engine_worker_queue_port)
address = (cfg.master_ip, cfg.engine_worker_queue_port)
self.engine_worker_queue = EngineWorkerQueue(
address=address,
is_server=False,
@@ -118,7 +118,7 @@ class ExpertService(object):
cache_config=self.cfg.cache_config,
tensor_parallel_size=self.cfg.tensor_parallel_size,
device_ids=self.cfg.local_device_ids,
pod_ip=self.cfg.pod_ips[0],
pod_ip=self.cfg.master_ip,
engine_worker_queue_port=self.cfg.engine_worker_queue_port,
pid_suffix=f"{local_data_parallel_id}_{ipc_signal_suffix}"
)

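ExpertService now wraps its start/end positions by worker_num_per_node so a global data-parallel id maps onto per-node device and RDMA-port slices; a small sketch of that arithmetic (example values assumed):

```python
def local_slice(local_dp_id: int, tensor_parallel_size: int, worker_num_per_node: int):
    # slice boundaries wrap around on each node, as in the hunk above
    start = (local_dp_id * tensor_parallel_size) % worker_num_per_node
    end = ((local_dp_id + 1) * tensor_parallel_size) % worker_num_per_node
    return start, end

# e.g. 8 workers per node, TP=4: DP id 2 (first id on node 1) maps back to slots 0..4
print(local_slice(2, 4, 8))  # (0, 4)
```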
@@ -20,7 +20,7 @@ import time
|
||||
from dataclasses import asdict, dataclass, fields
|
||||
from typing import Any, Dict, Optional, Union
|
||||
|
||||
import numpy
|
||||
import numpy as np
|
||||
|
||||
from fastdeploy.engine.sampling_params import SamplingParams
|
||||
from fastdeploy.utils import data_processor_logger
|
||||
@@ -181,7 +181,7 @@ class Request:
|
||||
f"sampling_params={self.sampling_params})")
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(slots=True)
|
||||
class CompletionOutput:
|
||||
"""The output data of one completion output of a request.
|
||||
|
||||
@@ -235,7 +235,7 @@ class CompletionOutput:
|
||||
f"reasoning_content={self.reasoning_content!r}")
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(slots=True)
|
||||
class RequestMetrics:
|
||||
"""Metrics associated with a request.
|
||||
|
||||
@@ -333,6 +333,12 @@ class RequestOutput:
|
||||
self.error_code = error_code
|
||||
self.error_msg = error_msg
|
||||
|
||||
|
||||
if prompt_token_ids is None:
|
||||
self.prompt_token_ids = []
|
||||
elif isinstance(self.prompt_token_ids, np.ndarray):
|
||||
self.prompt_token_ids = self.prompt_token_ids.tolist()
|
||||
|
||||
def add(self, next_output: "RequestOutput") -> None:
|
||||
"""Merge RequestOutput into this one"""
|
||||
|
||||
@@ -365,11 +371,6 @@ class RequestOutput:
|
||||
|
||||
def to_dict(self):
|
||||
"""convert RequestOutput into a serializable dict """
|
||||
if self.prompt_token_ids is None:
|
||||
self.prompt_token_ids = []
|
||||
|
||||
if type(self.prompt_token_ids) is numpy.ndarray:
|
||||
self.prompt_token_ids = self.prompt_token_ids.tolist()
|
||||
|
||||
return {
|
||||
"request_id": self.request_id,
|
||||
|
@@ -85,7 +85,7 @@ class LLM:
|
||||
|
||||
self.mutex = threading.Lock()
|
||||
self.req_output = dict()
|
||||
self.master_node_ip = self.llm_engine.cfg.pod_ips[0]
|
||||
self.master_node_ip = self.llm_engine.cfg.master_ip
|
||||
self._receive_output_thread = threading.Thread(
|
||||
target=self._receive_output, daemon=True)
|
||||
self._receive_output_thread.start()
|
||||
@@ -169,6 +169,8 @@ class LLM:
|
||||
|
||||
# get output
|
||||
outputs = self._run_engine(req_ids, use_tqdm=use_tqdm)
|
||||
for i in range(len(outputs)):
|
||||
outputs[i].prompt = prompts[i]
|
||||
return outputs
|
||||
|
||||
def chat(
|
||||
|
@@ -24,7 +24,7 @@ import zmq
|
||||
from fastapi import FastAPI, Request
|
||||
from fastapi.responses import JSONResponse, Response, StreamingResponse
|
||||
from prometheus_client import CONTENT_TYPE_LATEST
|
||||
from fastdeploy.metrics.trace_util import inject_to_metadata
|
||||
from fastdeploy.metrics.trace_util import inject_to_metadata,instrument
|
||||
|
||||
from fastdeploy.engine.args_utils import EngineArgs
|
||||
from fastdeploy.engine.engine import LLMEngine
|
||||
@@ -122,8 +122,8 @@ async def lifespan(app: FastAPI):
|
||||
args.mm_processor_kwargs, args.enable_mm,
|
||||
args.reasoning_parser)
|
||||
app.state.dynamic_load_weight = args.dynamic_load_weight
|
||||
chat_handler = OpenAIServingChat(engine_client, pid, args.pod_ips)
|
||||
completion_handler = OpenAIServingCompletion(engine_client, pid, args.pod_ips)
|
||||
chat_handler = OpenAIServingChat(engine_client, pid, args.dist_init_ip)
|
||||
completion_handler = OpenAIServingCompletion(engine_client, pid, args.dist_init_ip)
|
||||
engine_client.create_zmq_client(model=pid, mode=zmq.PUSH)
|
||||
engine_client.pid = pid
|
||||
app.state.engine_client = engine_client
|
||||
@@ -141,6 +141,7 @@ async def lifespan(app: FastAPI):
|
||||
|
||||
|
||||
app = FastAPI(lifespan=lifespan)
|
||||
instrument(app)
|
||||
|
||||
|
||||
# TODO: pass the real engine value and fetch status via pid
|
||||
@@ -397,11 +398,6 @@ def launch_controller_server():
|
||||
"""Controller server running the sub thread"""
|
||||
if args.controller_port < 0:
|
||||
return
|
||||
|
||||
if not is_port_available(args.host, args.controller_port):
|
||||
raise Exception(
|
||||
f"The parameter `controller_port`:{args.controller_port} is already in use."
|
||||
)
|
||||
|
||||
if not is_port_available(args.host, args.controller_port):
|
||||
raise Exception(
|
||||
|
@@ -20,7 +20,8 @@ import time
import traceback
import uuid
from typing import List, Optional

import numpy as np
import msgpack
import aiozmq
from aiozmq import zmq

@@ -39,16 +40,16 @@ class OpenAIServingChat:
OpenAI-style chat completions serving
"""

def __init__(self, engine_client, pid, pod_ips):
def __init__(self, engine_client, pid, dist_init_ip):
self.engine_client = engine_client
self.pid = pid
self.pod_ips = pod_ips
self.master_ip = dist_init_ip
self.host_ip = get_host_ip()

def _check_master(self):
if self.pod_ips is None:
if self.master_ip is None:
return True
if self.host_ip == self.pod_ips[0]:
if self.host_ip == self.master_ip:
return True
return False

@@ -74,6 +75,8 @@ class OpenAIServingChat:
|
||||
current_req_dict = request.to_dict_for_infer(request_id)
|
||||
current_req_dict["arrival_time"] = time.time()
|
||||
prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict)
|
||||
if isinstance(prompt_token_ids, np.ndarray):
|
||||
prompt_token_ids = prompt_token_ids.tolist()
|
||||
except Exception as e:
|
||||
return ErrorResponse(code=400, message=str(e))
|
||||
|
||||
@@ -118,6 +121,7 @@ class OpenAIServingChat:
|
||||
num_choices = 1
|
||||
max_streaming_response_tokens = 1
|
||||
enable_thinking = None
|
||||
include_stop_str_in_output = False
|
||||
if request.metadata is not None and request.metadata.get("max_streaming_response_tokens", 1) > 1:
|
||||
max_streaming_response_tokens = request.metadata["max_streaming_response_tokens"]
|
||||
|
||||
@@ -143,6 +147,9 @@ class OpenAIServingChat:
|
||||
dealer.write([b"", request_id.encode('utf-8')])
|
||||
choices = []
|
||||
current_waiting_time = 0
|
||||
if request.metadata is not None:
|
||||
enable_thinking = request.metadata.get("enable_thinking")
|
||||
include_stop_str_in_output = request.metadata.get("include_stop_str_in_output", False)
|
||||
while num_choices > 0:
|
||||
try:
|
||||
raw_data = await asyncio.wait_for(dealer.read(), timeout=10)
|
||||
@@ -158,102 +165,106 @@ class OpenAIServingChat:
|
||||
raise ValueError(f"Engine is not healthy: {msg}")
|
||||
else:
|
||||
current_waiting_time = 0
|
||||
await asyncio.sleep(0.1)
|
||||
await asyncio.sleep(0.01)
|
||||
continue
|
||||
response = msgpack.unpackb(raw_data[-1])
|
||||
for res in response:
|
||||
if res.get("error_code", 200) != 200:
|
||||
raise ValueError("{}".format(res["error_msg"]))
|
||||
|
||||
res = json.loads(raw_data[-1].decode('utf-8'))
|
||||
if res.get("error_code", 200) != 200:
|
||||
raise ValueError("{}".format(res["error_msg"]))
|
||||
if request.metadata is not None:
|
||||
enable_thinking = request.metadata.get("enable_thinking")
|
||||
self.engine_client.data_processor.process_response_dict(
|
||||
res, stream=True, enable_thinking=enable_thinking)
|
||||
self.engine_client.data_processor.process_response_dict(
|
||||
res, stream=True, enable_thinking=enable_thinking, include_stop_str_in_output=include_stop_str_in_output)
|
||||
|
||||
if res['metrics']['first_token_time'] is not None:
|
||||
arrival_time = res['metrics']['first_token_time']
|
||||
inference_start_time = res['metrics']['inference_start_time']
|
||||
else:
|
||||
arrival_time = res['metrics']['arrival_time'] - inference_start_time
|
||||
if first_iteration:
|
||||
num_prompt_tokens = len(prompt_token_ids)
|
||||
num_cached_tokens = res.get("num_cached_tokens", 0)
|
||||
for i in range(num_choices):
|
||||
choice = ChatCompletionResponseStreamChoice(
|
||||
index=i,
|
||||
delta=DeltaMessage(role="assistant", content="", reasoning_content="", tool_calls=None)
|
||||
)
|
||||
if request.metadata is not None and request.metadata.get("training", False):
|
||||
choice.delta.token_ids = prompt_token_ids
|
||||
chunk = ChatCompletionStreamResponse(
|
||||
id=request_id,
|
||||
object=chunk_object_type,
|
||||
created=created_time,
|
||||
choices=[choice],
|
||||
model=model_name
|
||||
)
|
||||
if include_continuous_usage:
|
||||
chunk.usage = UsageInfo(
|
||||
prompt_tokens=num_prompt_tokens,
|
||||
completion_tokens=0,
|
||||
total_tokens=num_prompt_tokens,
|
||||
prompt_tokens_details=PromptTokenUsageInfo(cached_tokens=num_cached_tokens)
|
||||
)
|
||||
yield f"data: {chunk.model_dump_json(exclude_unset=True)} \n\n"
|
||||
first_iteration = False
|
||||
|
||||
output = res["outputs"]
|
||||
delta_text = output["text"]
|
||||
raw_top_logprobs = output["top_logprobs"]
|
||||
logprobs_res = None
|
||||
if raw_top_logprobs is not None:
|
||||
top_logprobs = LogprobsLists(
|
||||
logprob_token_ids=raw_top_logprobs[0],
|
||||
logprobs=raw_top_logprobs[1],
|
||||
sampled_token_ranks=raw_top_logprobs[2],
|
||||
)
|
||||
logprobs_res = self.build_logprobs_response(
|
||||
request_logprobs=request.logprobs,
|
||||
response_logprobs=top_logprobs,
|
||||
request_top_logprobs=request.top_logprobs,
|
||||
)
|
||||
|
||||
previous_num_tokens += len(output["token_ids"])
|
||||
delta_message = DeltaMessage(content=delta_text, reasoning_content=output.get("reasoning_content"), \
|
||||
token_ids=output.get("token_ids"), tool_calls=output.get("tool_call_content", []))
|
||||
|
||||
choice = ChatCompletionResponseStreamChoice(
|
||||
index=0,
|
||||
delta=delta_message,
|
||||
logprobs=logprobs_res,
|
||||
arrival_time=arrival_time
|
||||
)
|
||||
if res["finished"]:
|
||||
num_choices -= 1
|
||||
work_process_metrics.e2e_request_latency.observe(time.time() - res["metrics"]["request_start_time"])
|
||||
has_no_token_limit = request.max_tokens is None and request.max_completion_tokens is None
|
||||
max_tokens = request.max_completion_tokens or request.max_tokens
|
||||
if has_no_token_limit or previous_num_tokens != max_tokens:
|
||||
choice.finish_reason = "stop"
|
||||
if self.engine_client.reasoning_parser == "ernie_x1" and \
|
||||
output.get("finish_reason", "") == "tool_calls":
|
||||
choice.finish_reason = "tool_calls"
|
||||
if res['metrics']['first_token_time'] is not None:
|
||||
arrival_time = res['metrics']['first_token_time']
|
||||
inference_start_time = res['metrics']['inference_start_time']
|
||||
else:
|
||||
choice.finish_reason = "length"
|
||||
arrival_time = res['metrics']['arrival_time'] - inference_start_time
|
||||
if first_iteration:
|
||||
num_prompt_tokens = len(prompt_token_ids)
|
||||
num_cached_tokens = res.get("num_cached_tokens", 0)
|
||||
for i in range(num_choices):
|
||||
choice = ChatCompletionResponseStreamChoice(
|
||||
index=i,
|
||||
delta=DeltaMessage(role="assistant", content="", reasoning_content="", tool_calls=None)
|
||||
)
|
||||
if request.metadata is not None and request.metadata.get("training", False):
|
||||
choice.delta.token_ids = prompt_token_ids
|
||||
chunk = ChatCompletionStreamResponse(
|
||||
id=request_id,
|
||||
object=chunk_object_type,
|
||||
created=created_time,
|
||||
choices=[choice],
|
||||
model=model_name
|
||||
)
|
||||
if include_continuous_usage:
|
||||
chunk.usage = UsageInfo(
|
||||
prompt_tokens=num_prompt_tokens,
|
||||
completion_tokens=0,
|
||||
total_tokens=num_prompt_tokens,
|
||||
prompt_tokens_details=PromptTokenUsageInfo(cached_tokens=num_cached_tokens)
|
||||
)
|
||||
yield f"data: {chunk.model_dump_json(exclude_unset=True)} \n\n"
|
||||
first_iteration = False
|
||||
|
||||
if res.get("error_msg") is not None and "Recover" in res["error_msg"]:
|
||||
choice.finish_reason = "recover_stop"
|
||||
output = res["outputs"]
|
||||
delta_text = output["text"]
|
||||
raw_top_logprobs = output["top_logprobs"]
|
||||
logprobs_res = None
|
||||
if raw_top_logprobs is not None:
|
||||
top_logprobs = LogprobsLists(
|
||||
logprob_token_ids=raw_top_logprobs[0],
|
||||
logprobs=raw_top_logprobs[1],
|
||||
sampled_token_ranks=raw_top_logprobs[2],
|
||||
)
|
||||
logprobs_res = self.build_logprobs_response(
|
||||
request_logprobs=request.logprobs,
|
||||
response_logprobs=top_logprobs,
|
||||
request_top_logprobs=request.top_logprobs,
|
||||
)
|
||||
|
||||
if request.metadata is not None and request.metadata.get("training", False) and delta_text != "":
|
||||
choice.delta.token_ids = output["token_ids"]
|
||||
if include_continuous_usage:
|
||||
chunk.usage = UsageInfo(
|
||||
prompt_tokens=num_prompt_tokens,
|
||||
completion_tokens=previous_num_tokens,
|
||||
total_tokens=num_prompt_tokens + previous_num_tokens
|
||||
previous_num_tokens += len(output["token_ids"])
|
||||
delta_message = DeltaMessage(content=delta_text, reasoning_content=output.get("reasoning_content"), \
|
||||
token_ids=output.get("token_ids"), tool_calls=output.get("tool_call_content", []))
|
||||
|
||||
choice = ChatCompletionResponseStreamChoice(
|
||||
index=0,
|
||||
delta=delta_message,
|
||||
logprobs=logprobs_res,
|
||||
arrival_time=arrival_time
|
||||
)
|
||||
choices.append(choice)
|
||||
if res["finished"]:
|
||||
num_choices -= 1
|
||||
work_process_metrics.e2e_request_latency.observe(time.time() - res["metrics"]["request_start_time"])
|
||||
has_no_token_limit = request.max_tokens is None and request.max_completion_tokens is None
|
||||
max_tokens = request.max_completion_tokens or request.max_tokens
|
||||
if has_no_token_limit or previous_num_tokens != max_tokens:
|
||||
choice.finish_reason = "stop"
|
||||
if self.engine_client.reasoning_parser == "ernie_x1" and \
|
||||
output.get("finish_reason", "") == "tool_calls":
|
||||
choice.finish_reason = "tool_calls"
|
||||
else:
|
||||
choice.finish_reason = "length"
|
||||
|
||||
if len(choices) == max_streaming_response_tokens or res["finished"]:
|
||||
if res.get("error_msg") is not None and "Recover" in res["error_msg"]:
|
||||
choice.finish_reason = "recover_stop"
|
||||
|
||||
if request.metadata is not None and request.metadata.get("training", False) and delta_text != "":
|
||||
choice.delta.token_ids = output["token_ids"]
|
||||
if include_continuous_usage:
|
||||
chunk.usage = UsageInfo(
|
||||
prompt_tokens=num_prompt_tokens,
|
||||
completion_tokens=previous_num_tokens,
|
||||
total_tokens=num_prompt_tokens + previous_num_tokens
|
||||
)
|
||||
choices.append(choice)
|
||||
|
||||
if len(choices) == max_streaming_response_tokens or res["finished"]:
|
||||
chunk.choices = choices
|
||||
yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
|
||||
choices = []
|
||||
|
||||
if choices:
|
||||
chunk.choices = choices
|
||||
yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
|
||||
choices = []
|
||||
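The streaming rewrite above accumulates per-token choices and only emits a chunk once max_streaming_response_tokens deltas are buffered or the request finishes, with a final flush for any remainder. A compact sketch of that flushing pattern, using simplified stand-in types:

```python
def stream_chunks(deltas, max_streaming_response_tokens=1):
    """Yield lists of accumulated deltas, flushing on size or on the final delta."""
    choices = []
    for delta, finished in deltas:
        choices.append(delta)
        if len(choices) == max_streaming_response_tokens or finished:
            yield choices
            choices = []
    if choices:  # flush any remainder, mirroring the trailing `if choices:` branch
        yield choices

tokens = [("Hel", False), ("lo", False), ("!", True)]
print(list(stream_chunks(tokens, max_streaming_response_tokens=2)))
# [['Hel', 'lo'], ['!']]
```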
@@ -296,6 +307,7 @@ class OpenAIServingChat:
|
||||
created_time = int(time.time())
|
||||
final_res = None
|
||||
enable_thinking = None
|
||||
include_stop_str_in_output = False
|
||||
try:
|
||||
dealer = await aiozmq.create_zmq_stream(
|
||||
zmq.DEALER,
|
||||
@@ -321,33 +333,39 @@ class OpenAIServingChat:
|
||||
await asyncio.sleep(0.1)
|
||||
continue
|
||||
|
||||
data = json.loads(raw_data[-1].decode('utf-8'))
|
||||
if data.get("error_code", 200) != 200:
|
||||
raise ValueError("{}".format(data["error_msg"]))
|
||||
if request.metadata is not None:
|
||||
enable_thinking = request.metadata.get("enable_thinking")
|
||||
data = self.engine_client.data_processor.process_response_dict(
|
||||
data, stream=False, enable_thinking=enable_thinking)
|
||||
# api_server_logger.debug(f"Client {request_id} received: {data}")
|
||||
previous_num_tokens += len(data["outputs"]["token_ids"])
|
||||
# The logprob for handling the response
|
||||
output = data["outputs"]
|
||||
raw_top_logprobs = output["top_logprobs"]
|
||||
if raw_top_logprobs is not None:
|
||||
top_logprobs = LogprobsLists(
|
||||
logprob_token_ids=raw_top_logprobs[0],
|
||||
logprobs=raw_top_logprobs[1],
|
||||
sampled_token_ranks=raw_top_logprobs[2],
|
||||
)
|
||||
logprobs_res = self.build_logprobs_response(
|
||||
request_logprobs=request.logprobs,
|
||||
response_logprobs=top_logprobs,
|
||||
request_top_logprobs=request.top_logprobs,
|
||||
)
|
||||
if logprobs_res and logprobs_res.content is not None:
|
||||
logprob_contents.extend(logprobs_res.content)
|
||||
if data["finished"]:
|
||||
final_res = data
|
||||
response = msgpack.unpackb(raw_data[-1])
|
||||
task_is_finished = False
|
||||
for data in response:
|
||||
if data.get("error_code", 200) != 200:
|
||||
raise ValueError("{}".format(data["error_msg"]))
|
||||
if request.metadata is not None:
|
||||
enable_thinking = request.metadata.get("enable_thinking")
|
||||
include_stop_str_in_output = request.metadata.get("include_stop_str_in_output", False)
|
||||
data = self.engine_client.data_processor.process_response_dict(
|
||||
data, stream=False, enable_thinking=enable_thinking, include_stop_str_in_output=include_stop_str_in_output)
|
||||
# api_server_logger.debug(f"Client {request_id} received: {data}")
|
||||
previous_num_tokens += len(data["outputs"]["token_ids"])
|
||||
# The logprob for handling the response
|
||||
output = data["outputs"]
|
||||
raw_top_logprobs = output["top_logprobs"]
|
||||
if raw_top_logprobs is not None:
|
||||
top_logprobs = LogprobsLists(
|
||||
logprob_token_ids=raw_top_logprobs[0],
|
||||
logprobs=raw_top_logprobs[1],
|
||||
sampled_token_ranks=raw_top_logprobs[2],
|
||||
)
|
||||
logprobs_res = self.build_logprobs_response(
|
||||
request_logprobs=request.logprobs,
|
||||
response_logprobs=top_logprobs,
|
||||
request_top_logprobs=request.top_logprobs,
|
||||
)
|
||||
if logprobs_res and logprobs_res.content is not None:
|
||||
logprob_contents.extend(logprobs_res.content)
|
||||
if data["finished"]:
|
||||
final_res = data
|
||||
task_is_finished = True
|
||||
break
|
||||
if task_is_finished:
|
||||
break
|
||||
finally:
|
||||
dealer.close()
|
||||
|
@@ -17,6 +17,8 @@
|
||||
import asyncio
|
||||
import aiozmq
|
||||
import json
|
||||
import msgpack
|
||||
import numpy as np
|
||||
from aiozmq import zmq
|
||||
from asyncio import FIRST_COMPLETED, AbstractEventLoop, Task
|
||||
import time
|
||||
@@ -44,16 +46,16 @@ from fastdeploy.engine.request import RequestOutput
|
||||
|
||||
|
||||
class OpenAIServingCompletion:
|
||||
def __init__(self, engine_client, pid, pod_ips):
|
||||
def __init__(self, engine_client, pid, dist_init_ip):
|
||||
self.engine_client = engine_client
|
||||
self.pid = pid
|
||||
self.pod_ips = pod_ips
|
||||
self.master_ip = dist_init_ip
|
||||
self.host_ip = get_host_ip()
|
||||
|
||||
def _check_master(self):
|
||||
if self.pod_ips is None:
|
||||
if self.master_ip is None:
|
||||
return True
|
||||
if self.host_ip == self.pod_ips[0]:
|
||||
if self.host_ip == self.master_ip:
|
||||
return True
|
||||
return False
|
||||
|
||||
@@ -104,9 +106,10 @@ class OpenAIServingCompletion:
|
||||
current_req_dict = request.to_dict_for_infer(request_id_idx, prompt)
|
||||
try:
|
||||
current_req_dict["arrival_time"] = time.time()
|
||||
prompt_batched_token_ids.append(
|
||||
self.engine_client.format_and_add_data(current_req_dict)
|
||||
)
|
||||
prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict)
|
||||
if isinstance(prompt_token_ids, np.ndarray):
|
||||
prompt_token_ids = prompt_token_ids.tolist()
|
||||
prompt_batched_token_ids.append(prompt_token_ids)
|
||||
except Exception as e:
|
||||
return ErrorResponse(message=str(e), code=400)
|
||||
|
||||
@@ -179,18 +182,20 @@ class OpenAIServingCompletion:
|
||||
current_waiting_time = 0
|
||||
await asyncio.sleep(0.1)
|
||||
continue
|
||||
data = json.loads(raw_data[-1].decode("utf-8"))
|
||||
rid = int(data["request_id"].split("-")[-1])
|
||||
if data.get("error_code", 200) != 200:
|
||||
raise ValueError("{}".format(data["error_msg"]))
|
||||
response = msgpack.unpackb(raw_data[-1])
|
||||
for data in response:
|
||||
rid = int(data["request_id"].split("-")[-1])
|
||||
if data.get("error_code", 200) != 200:
|
||||
raise ValueError("{}".format(data["error_msg"]))
|
||||
|
||||
self.engine_client.data_processor.process_response_dict(
|
||||
data, stream=False)
|
||||
output_tokens[rid] += len(data["outputs"]["token_ids"])
|
||||
if data.get("finished", False):
|
||||
data["output_token_ids"] = output_tokens[rid]
|
||||
valid_results[rid] = data
|
||||
num_choices -= 1
|
||||
self.engine_client.data_processor.process_response_dict(
|
||||
data, stream=False)
|
||||
output_tokens[rid] += len(data["outputs"]["token_ids"])
|
||||
if data.get("finished", False):
|
||||
data["output_token_ids"] = output_tokens[rid]
|
||||
valid_results[rid] = data
|
||||
num_choices -= 1
|
||||
break
|
||||
|
||||
return self.request_output_to_completion_response(
|
||||
final_res_batch=valid_results,
|
||||
@@ -238,6 +243,12 @@ class OpenAIServingCompletion:
|
||||
if request.suffix is not None and request.suffix.get("max_streaming_response_tokens", 1) > 1:
|
||||
max_streaming_response_tokens = request.suffix["max_streaming_response_tokens"]
|
||||
choices = []
|
||||
chunk = CompletionStreamResponse(
|
||||
id=request_id,
|
||||
created=created_time,
|
||||
model=model_name,
|
||||
choices=choices
|
||||
)
|
||||
|
||||
current_waiting_time = 0
|
||||
while num_choices > 0:
|
||||
@@ -256,82 +267,86 @@ class OpenAIServingCompletion:
|
||||
continue
|
||||
|
||||
|
||||
res = json.loads(raw_data[-1].decode('utf-8'))
|
||||
idx = int(res["request_id"].split("-")[-1])
|
||||
if res.get("error_code", 200) != 200:
|
||||
raise ValueError("{}".format(res["error_msg"]))
|
||||
response = msgpack.unpackb(raw_data[-1])
|
||||
for res in response:
|
||||
idx = int(res["request_id"].split("-")[-1])
|
||||
if res.get("error_code", 200) != 200:
|
||||
raise ValueError("{}".format(res["error_msg"]))
|
||||
|
||||
if first_iteration[idx]:
|
||||
if request.suffix is not None and request.suffix.get("training", False):
|
||||
if first_iteration[idx]:
|
||||
if request.suffix is not None and request.suffix.get("training", False):
|
||||
chunk = CompletionStreamResponse(
|
||||
id=request_id,
|
||||
created=created_time,
|
||||
model=model_name,
|
||||
choices=[CompletionResponseStreamChoice(
|
||||
index=idx,
|
||||
text="",
|
||||
token_ids=list(prompt_batched_token_ids[idx])
|
||||
)]
|
||||
)
|
||||
yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
|
||||
first_iteration[idx] = False
|
||||
|
||||
|
||||
self.engine_client.data_processor.process_response_dict(
|
||||
res, stream=True)
|
||||
if res['metrics'].get('first_token_time') is not None:
|
||||
arrival_time = res['metrics']['first_token_time']
|
||||
inference_start_time[idx] = res['metrics']['inference_start_time']
|
||||
else:
|
||||
arrival_time = res['metrics']['arrival_time'] - inference_start_time[idx]
|
||||
|
||||
output = res["outputs"]
|
||||
|
||||
choices.append(CompletionResponseStreamChoice(
|
||||
index=idx,
|
||||
text=output["text"],
|
||||
token_ids=output.get("token_ids"),
|
||||
tool_calls=output.get("tool_call_content"),
|
||||
reasoning_content=output.get("reasoning_content"),
|
||||
arrival_time=arrival_time
|
||||
))
|
||||
if res["finished"]:
|
||||
if request.max_tokens is None or output_tokens[idx] + 1 != request.max_tokens:
|
||||
chunk.choices[0].finish_reason = "stop"
|
||||
if self.engine_client.reasoning_parser == "ernie_x1" and \
|
||||
output.get("finish_reason", "") == "tool_calls":
|
||||
chunk.choices[0].finish_reason = "tool_calls"
|
||||
else:
|
||||
chunk.choices[0].finish_reason = "length"
|
||||
|
||||
output_tokens[idx] += 1
|
||||
|
||||
if len(choices) == max_streaming_response_tokens or res["finished"]:
|
||||
chunk = CompletionStreamResponse(
|
||||
id=request_id,
|
||||
created=created_time,
|
||||
model=model_name,
|
||||
choices=[CompletionResponseStreamChoice(
|
||||
index=idx,
|
||||
text="",
|
||||
token_ids=list(prompt_batched_token_ids[idx])
|
||||
)]
|
||||
choices=choices
|
||||
)
|
||||
yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
|
||||
first_iteration[idx] = False
|
||||
choices = []
|
||||
|
||||
|
||||
self.engine_client.data_processor.process_response_dict(
|
||||
res, stream=True)
|
||||
if res['metrics'].get('first_token_time') is not None:
|
||||
arrival_time = res['metrics']['first_token_time']
|
||||
inference_start_time[idx] = res['metrics']['inference_start_time']
|
||||
else:
|
||||
arrival_time = res['metrics']['arrival_time'] - inference_start_time[idx]
|
||||
# api_server_logger.info(f"{arrival_time}")
|
||||
|
||||
output = res["outputs"]
|
||||
|
||||
choices.append(CompletionResponseStreamChoice(
|
||||
index=idx,
|
||||
text=output["text"],
|
||||
token_ids=output.get("token_ids"),
|
||||
tool_calls=output.get("tool_call_content"),
|
||||
reasoning_content=output.get("reasoning_content"),
|
||||
arrival_time=arrival_time
|
||||
))
|
||||
if res["finished"]:
|
||||
if request.max_tokens is None or output_tokens[idx] + 1 != request.max_tokens:
|
||||
chunk.choices[0].finish_reason = "stop"
|
||||
if self.engine_client.reasoning_parser == "ernie_x1" and \
|
||||
output.get("finish_reason", "") == "tool_calls":
|
||||
chunk.choices[0].finish_reason = "tool_calls"
|
||||
else:
|
||||
chunk.choices[0].finish_reason = "length"
|
||||
|
||||
output_tokens[idx] += 1
|
||||
|
||||
if len(choices) == max_streaming_response_tokens or res["finished"]:
|
||||
chunk = CompletionStreamResponse(
|
||||
id=request_id,
|
||||
created=created_time,
|
||||
model=model_name,
|
||||
choices=choices
|
||||
)
|
||||
choices = []
|
||||
|
||||
yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
|
||||
|
||||
if res["finished"]:
|
||||
num_choices -= 1
|
||||
if getattr(request, "stream_options", None) and request.stream_options.include_usage:
|
||||
usage_chunk = CompletionStreamResponse(
|
||||
id=request_id,
|
||||
created=created_time,
|
||||
model=model_name,
|
||||
choices=[],
|
||||
usage=UsageInfo(
|
||||
prompt_tokens=len(prompt_batched_token_ids[idx]),
|
||||
completion_tokens=output_tokens[idx]
|
||||
if res["finished"]:
|
||||
num_choices -= 1
|
||||
if getattr(request, "stream_options", None) and request.stream_options.include_usage:
|
||||
usage_chunk = CompletionStreamResponse(
|
||||
id=request_id,
|
||||
created=created_time,
|
||||
model=model_name,
|
||||
choices=[],
|
||||
usage=UsageInfo(
|
||||
prompt_tokens=len(prompt_batched_token_ids[idx]),
|
||||
completion_tokens=output_tokens[idx]
|
||||
)
|
||||
)
|
||||
)
|
||||
yield f"data: {usage_chunk.model_dump_json(exclude_unset=True)}\n\n"
|
||||
yield f"data: {usage_chunk.model_dump_json(exclude_unset=True)}\n\n"
|
||||
if choices:
|
||||
chunk.choices = choices
|
||||
yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
|
||||
choices = []
|
||||
|
||||
|
||||
except Exception as e:
|
||||
|
@@ -101,6 +101,34 @@ environment_variables: dict[str, Callable[[], Any]] = {
|
||||
# Whether to use DeepGemm for FP8 blockwise MoE.
|
||||
"FD_USE_DEEP_GEMM":
|
||||
lambda: bool(int(os.getenv("FD_USE_DEEP_GEMM", "1"))),
|
||||
|
||||
# Whether to use aggregate send.
|
||||
"FD_USE_AGGREGATE_SEND":
|
||||
lambda: bool(int(os.getenv("FD_USE_AGGREGATE_SEND", "0"))),
|
||||
|
||||
# Whether to open Trace.
|
||||
"TRACES_ENABLE":
|
||||
lambda: os.getenv("TRACES_ENABLE", "false"),
|
||||
|
||||
# set traec Server name.
|
||||
"FD_SERVICE_NAME":
|
||||
lambda: os.getenv("FD_SERVICE_NAME", "FastDeploy"),
|
||||
|
||||
# set traec host name.
|
||||
"FD_HOST_NAME":
|
||||
lambda: os.getenv("FD_HOST_NAME", "localhost"),
|
||||
|
||||
# set traec exporter.
|
||||
"TRACES_EXPORTER":
|
||||
lambda: os.getenv("TRACES_EXPORTER", "console"),
|
||||
|
||||
# set traec exporter_otlp_endpoint.
|
||||
"EXPORTER_OTLP_ENDPOINT":
|
||||
lambda: os.getenv("EXPORTER_OTLP_ENDPOINT"),
|
||||
|
||||
# set traec exporter_otlp_headers.
|
||||
"EXPORTER_OTLP_HEADERS":
|
||||
lambda: os.getenv("EXPORTER_OTLP_HEADERS"),
|
||||
}
|
||||
|
||||
|
||||
|
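A hedged usage sketch for the tracing-related variables added above, set before FastDeploy is imported; the endpoint and header values are placeholders, not defaults from the repo:

```python
import os

# enable OpenTelemetry tracing and ship spans to an OTLP collector
os.environ["TRACES_ENABLE"] = "true"
os.environ["TRACES_EXPORTER"] = "otlp"
os.environ["EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4318/v1/traces"  # placeholder
os.environ["EXPORTER_OTLP_HEADERS"] = "Authentication=dummy-token"        # placeholder
os.environ["FD_SERVICE_NAME"] = "FastDeploy"

# console exporter alternative: leave TRACES_EXPORTER unset (it defaults to "console")
```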
@@ -43,8 +43,7 @@ def import_custom_ops(package, module_name, global_ns):
|
||||
logger.warning(f"Failed to import op {func_name}: {e}")
|
||||
|
||||
except Exception:
|
||||
logger.warning(
|
||||
f"Ops of {package} import failed, it may be not compiled.")
|
||||
logger.warning(f"Ops of {package} import failed, it may be not compiled.")
|
||||
|
||||
preprocess_static_op(global_ns)
|
||||
|
||||
@@ -71,20 +70,24 @@ def wrap_unified_op(original_cpp_ext_op, original_custom_op):
|
||||
original_cpp_ext_op: Original C++ extension operator function.
|
||||
original_custom_op: Original custom operator function.
|
||||
"""
|
||||
try:
|
||||
|
||||
@paddle.jit.marker.unified
|
||||
@functools.wraps(original_custom_op)
|
||||
def unified_op(*args, **kwargs):
|
||||
if paddle.in_dynamic_mode():
|
||||
res = original_cpp_ext_op(*args, **kwargs)
|
||||
if res is None:
|
||||
return None
|
||||
# TODO(DrRyanHuang): Remove this if when we align the implementation of custom op and C++ extension
|
||||
if isinstance(res, list) and len(res) == 1:
|
||||
return res[0]
|
||||
return res
|
||||
return original_custom_op(*args, **kwargs)
|
||||
@paddle.jit.marker.unified
|
||||
@functools.wraps(original_custom_op)
|
||||
def unified_op(*args, **kwargs):
|
||||
if paddle.in_dynamic_mode():
|
||||
res = original_cpp_ext_op(*args, **kwargs)
|
||||
if res is None:
|
||||
return None
|
||||
# TODO(DrRyanHuang): Remove this if when we align the implementation of custom op and C++ extension
|
||||
if isinstance(res, list) and len(res) == 1:
|
||||
return res[0]
|
||||
return res
|
||||
return original_custom_op(*args, **kwargs)
|
||||
|
||||
except:
|
||||
unified_op = None
|
||||
logger.warning("Paddle version not support JIT mode.")
|
||||
return unified_op
|
||||
|
||||
|
||||
|
@@ -125,6 +125,8 @@ class ErnieProcessor(BaseDataProcessor):
|
||||
if request.get("temperature") < _SAMPLING_EPS:
|
||||
# zero temperature is equivalent to greedy sampling
|
||||
request.set("temperature", 1)
|
||||
if request.get("top_p") < _SAMPLING_EPS:
|
||||
request.set("top_p", _SAMPLING_EPS)
|
||||
data_processor_logger.info(f"Processed request {request}")
|
||||
return request
|
||||
|
||||
@@ -182,6 +184,8 @@ class ErnieProcessor(BaseDataProcessor):
|
||||
if request.get("temperature") < _SAMPLING_EPS:
|
||||
# zero temperature is equivalent to greedy sampling
|
||||
request["temperature"] = 1
|
||||
if request.get("top_p") < _SAMPLING_EPS:
|
||||
request["top_p"] = _SAMPLING_EPS
|
||||
data_processor_logger.info(f"Processed request {request}")
|
||||
|
||||
return request
|
||||
@@ -248,7 +252,7 @@ class ErnieProcessor(BaseDataProcessor):
|
||||
token_ids = response_dict["outputs"]["token_ids"]
|
||||
is_end = response_dict["finished"]
|
||||
req_id = response_dict["request_id"]
|
||||
if is_end and len(token_ids) > 0:
|
||||
if is_end and len(token_ids) > 0 and not kwargs.get("include_stop_str_in_output"):
|
||||
if token_ids[-1] == self.tokenizer.eos_token_id:
|
||||
token_ids = token_ids[:-1]
|
||||
delta_text, _, previous_texts = self.ids2tokens(token_ids, req_id)
|
||||
@@ -283,7 +287,7 @@ class ErnieProcessor(BaseDataProcessor):
|
||||
req_id = response_dict["request_id"]
|
||||
token_ids = response_dict["outputs"]["token_ids"]
|
||||
|
||||
if is_end and len(token_ids) > 0:
|
||||
if is_end and len(token_ids) > 0 and not kwargs.get("include_stop_str_in_output"):
|
||||
if token_ids[-1] == self.tokenizer.eos_token_id:
|
||||
token_ids = token_ids[:-1]
|
||||
delta_text, previous_token_ids, previous_texts = self.ids2tokens(
|
||||
|
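The processor hunks above gate the trailing-EOS strip on the new include_stop_str_in_output flag; a tiny standalone sketch of that condition (the helper name is hypothetical):

```python
def maybe_strip_eos(token_ids, eos_token_id, include_stop_str_in_output=False):
    # hypothetical helper mirroring the condition added in the hunks above
    if token_ids and not include_stop_str_in_output and token_ids[-1] == eos_token_id:
        return token_ids[:-1]
    return token_ids

assert maybe_strip_eos([5, 9, 2], eos_token_id=2) == [5, 9]
assert maybe_strip_eos([5, 9, 2], eos_token_id=2, include_stop_str_in_output=True) == [5, 9, 2]
```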
@@ -77,6 +77,7 @@ class DataProcessor:
|
||||
|
||||
CLS_TOKEN = "<|begin_of_sentence|>"
|
||||
SEP_TOKEN = "<|end_of_sentence|>"
|
||||
EOS_TOKEN = "</s>"
|
||||
IMG_START = "<|IMAGE_START|>"
|
||||
IMG_END = "<|IMAGE_END|>"
|
||||
VID_START = "<|VIDEO_START|>"
|
||||
@@ -125,6 +126,7 @@ class DataProcessor:
|
||||
# Special tokens and IDs
|
||||
self.cls_token = self.CLS_TOKEN
|
||||
self.sep_token = self.SEP_TOKEN
|
||||
self.eos_token = self.EOS_TOKEN
|
||||
self.image_start = self.IMG_START
|
||||
self.image_end = self.IMG_END
|
||||
self.video_start = self.VID_START
|
||||
@@ -132,6 +134,9 @@ class DataProcessor:
|
||||
self.image_patch_id = self.tokenizer.convert_tokens_to_ids("<|IMAGE_PLACEHOLDER|>")
|
||||
self.image_start_id = self.tokenizer.convert_tokens_to_ids(self.image_start)
|
||||
self.video_start_id = self.tokenizer.convert_tokens_to_ids(self.video_start)
|
||||
self.sep_token_id = self.tokenizer.convert_tokens_to_ids(self.sep_token)
|
||||
self.eos_token_id = self.tokenizer.convert_tokens_to_ids(self.eos_token)
|
||||
|
||||
|
||||
self.token_type_mapping = self._build_token_type_mapping()
|
||||
self.is_training = True
|
||||
@@ -204,7 +209,7 @@ class DataProcessor:
|
||||
|
||||
return outputs
|
||||
|
||||
def request2ids(self, request: Dict[str, Any]) -> Dict[str, Union[np.ndarray, List[np.ndarray], None]]:
|
||||
def request2ids(self, request: Dict[str, Any],tgts: List[str]=None) -> Dict[str, Union[np.ndarray, List[np.ndarray], None]]:
|
||||
"""
|
||||
Convert chat messages into model inputs.
|
||||
Returns a dict with input_ids, token_type_ids, position_ids, images, grid_thw, image_type_ids, labels.
|
||||
@@ -258,6 +263,10 @@ class DataProcessor:
|
||||
self._add_video(frames, outputs)
|
||||
image_message_index += 1
|
||||
self._add_text(prompt_token_ids[image_start_index:], outputs)
|
||||
|
||||
if self.is_training:
|
||||
assert tgts, f"training must give tgt !"
|
||||
self._extract_labels(outputs,tgts)
|
||||
return outputs
|
||||
|
||||
def _add_special_token(self, token: Union[str, int], outputs: Dict) -> None:
|
||||
@@ -339,6 +348,26 @@ class DataProcessor:
|
||||
outputs["position_ids"].extend(pos_ids)
|
||||
outputs["cur_position"] = np.max(pos_ids) + 1
|
||||
|
||||
def _extract_labels(self, outputs: Dict, tgts: List[str]) -> None:
|
||||
input_ids = copy.deepcopy(outputs['input_ids'])
|
||||
labels = [self.tokenizer.ignored_index] * len(input_ids)
|
||||
|
||||
tgt_count=input_ids.count(self.sep_token_id)
|
||||
assert tgt_count==len(tgts),f'len(tgts) != len(src) {len(tgts)} vs {tgt_count}'
|
||||
|
||||
tgt_index=0
|
||||
for i,token_id in enumerate(input_ids):
|
||||
if token_id==self.sep_token_id:
|
||||
labels_token = self.tokenizer.tokenize(tgts[tgt_index])
|
||||
labels_token_id = self.tokenizer.convert_tokens_to_ids(labels_token)
|
||||
labels[i-len(labels_token_id):i]=labels_token_id
|
||||
labels[i] = self.eos_token_id #</s>
|
||||
tgt_index += 1
|
||||
|
||||
outputs['labels']=labels
|
||||
|
||||
|
||||
|
||||
def _load_and_process_video(self, url: str, item: Dict) -> List[Image.Image]:
|
||||
reader, meta, path = read_video_decord(url, save_to_disk=False)
|
||||
|
||||
|
@@ -17,6 +17,7 @@ from typing import Any, Dict, Optional
|
||||
|
||||
from fastdeploy.engine.config import ModelConfig
|
||||
from fastdeploy.reasoning import ReasoningParserManager
|
||||
from fastdeploy.config import ErnieArchitectures
|
||||
|
||||
|
||||
class InputPreprocessor:
|
||||
@@ -71,8 +72,7 @@ class InputPreprocessor:
|
||||
self.reasoning_parser)
|
||||
architectures = ModelConfig(self.model_name_or_path).architectures
|
||||
if not self.enable_mm:
|
||||
if "Ernie4_5_MoeForCausalLM" not in architectures \
|
||||
and "Ernie4_5_ForCausalLM" not in architectures:
|
||||
if not ErnieArchitectures.contains_ernie_arch(architectures):
|
||||
from fastdeploy.input.text_processor import DataProcessor
|
||||
self.processor = DataProcessor(
|
||||
model_name_or_path=self.model_name_or_path, reasoning_parser_obj=reasoning_parser_obj)
|
||||
|
@@ -258,6 +258,8 @@ class DataProcessor(BaseDataProcessor):
|
||||
if request.get("temperature") < _SAMPLING_EPS:
|
||||
# zero temperature is equivalent to greedy sampling
|
||||
request.set("temperature", 1)
|
||||
if request.get("top_p") < _SAMPLING_EPS:
|
||||
request.set("top_p", _SAMPLING_EPS)
|
||||
data_processor_logger.info(f"Processed request {request}")
|
||||
return request
|
||||
|
||||
@@ -306,6 +308,8 @@ class DataProcessor(BaseDataProcessor):
|
||||
if request.get("temperature") < _SAMPLING_EPS:
|
||||
# zero temperature is equivalent to greedy sampling
|
||||
request["temperature"] = 1
|
||||
if request.get("top_p") < _SAMPLING_EPS:
|
||||
request["top_p"] = _SAMPLING_EPS
|
||||
data_processor_logger.info(f"Processed request {request}")
|
||||
return request
|
||||
|
||||
@@ -355,7 +359,7 @@ class DataProcessor(BaseDataProcessor):
|
||||
token_ids = response_dict["outputs"]["token_ids"]
|
||||
is_end = response_dict["finished"]
|
||||
req_id = response_dict["request_id"]
|
||||
if is_end and len(token_ids) > 0:
|
||||
if is_end and len(token_ids) > 0 and not kwargs.get("include_stop_str_in_output"):
|
||||
if token_ids[-1] == self.tokenizer.eos_token_id:
|
||||
token_ids = token_ids[:-1]
|
||||
delta_text, _, previous_texts = self.ids2tokens(token_ids, req_id)
|
||||
@@ -390,7 +394,7 @@ class DataProcessor(BaseDataProcessor):
|
||||
req_id = response_dict["request_id"]
|
||||
token_ids = response_dict["outputs"]["token_ids"]
|
||||
|
||||
if is_end and len(token_ids) > 0:
|
||||
if is_end and len(token_ids) > 0 and not kwargs.get("include_stop_str_in_output"):
|
||||
if token_ids[-1] == self.tokenizer.eos_token_id:
|
||||
token_ids = token_ids[:-1]
|
||||
delta_text, previous_token_ids, previous_texts = self.ids2tokens(
|
||||
@@ -430,7 +434,7 @@ class DataProcessor(BaseDataProcessor):
|
||||
response_dict, enable_thinking=enable_thinking, **kwargs)
|
||||
else:
|
||||
return self.process_response_dict_normal(
|
||||
response_dict=response_dict, enable_thinking=enable_thinking)
|
||||
response_dict=response_dict, enable_thinking=enable_thinking, **kwargs)
|
||||
|
||||
def text2ids(self, text, max_model_len, raw_request=True):
|
||||
"""
|
||||
|
@@ -20,6 +20,7 @@ import threading
import time

import zmq
import msgpack

from fastdeploy import envs
from fastdeploy.utils import llm_logger
@@ -37,6 +38,7 @@ class ZmqClient:
self.router_path = f"/dev/shm/router_{name}.ipc"

self.ZMQ_SNDHWM = int(envs.FD_ZMQ_SNDHWM)
self.aggregate_send = envs.FD_USE_AGGREGATE_SEND

self.mutex = threading.Lock()
self.req_dict = dict()
@@ -93,6 +95,16 @@ class ZmqClient:
"""
return self.socket.recv_pyobj()

def pack_aggregated_data(self, data):
"""
Aggregate multiple responses into one and send them to the client.
"""
result = data[0]
if len(data) > 1:
for response in data[1:]:
result.add(response)
result = msgpack.packb([result.to_dict()])
return result
def send_multipart(self, req_id, data):
"""
Send a multipart message to the router socket.
@@ -116,14 +128,22 @@ class ZmqClient:
break

try:
result = json.dumps(data.to_dict()).encode('utf-8')
start_send = time.time()
if self.aggregate_send:
result = self.pack_aggregated_data(data)
else:
result = msgpack.packb([response.to_dict() for response in data])
self.router.send_multipart([self.req_dict[req_id], b'', result])
llm_logger.debug(f"send_multipart result: {req_id} len {len(data)} elapse: {time.time()-start_send}")

except Exception as e:
llm_logger.error(f"Send result to zmq client failed: {e}")

if data.finished:
if data[-1].finished:
with self.mutex:
self.req_dict.pop(data.request_id, None)
self.req_dict.pop(req_id, None)
llm_logger.info(f"send_multipart finished, req_id: {req_id}")


def receive_json_once(self, block=False):
"""

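send_multipart now frames responses as a msgpack-encoded list of dicts (merged into a single entry when FD_USE_AGGREGATE_SEND is set), and the serving layers decode them with msgpack.unpackb; a minimal round-trip sketch with made-up payloads:

```python
import msgpack

responses = [
    {"request_id": "req-0", "outputs": {"text": "Hel", "token_ids": [17]}, "finished": False},
    {"request_id": "req-0", "outputs": {"text": "lo", "token_ids": [42]}, "finished": True},
]

# server side: one ZMQ frame carries a msgpack-encoded list of response dicts
frame = msgpack.packb(responses)

# client side (serving_chat / serving_completion): iterate over the unpacked list
for res in msgpack.unpackb(frame):
    print(res["outputs"]["text"], res["finished"])
```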
@@ -1,15 +1,83 @@
|
||||
from opentelemetry.propagate import inject, extract
|
||||
from opentelemetry import trace
|
||||
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
|
||||
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
|
||||
from opentelemetry.sdk.resources import Resource
|
||||
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
|
||||
from fastapi import FastAPI
|
||||
from fastdeploy.utils import (llm_logger)
|
||||
from fastdeploy import envs
|
||||
import json
|
||||
import os
|
||||
|
||||
# create global OpenTelemetry tracer
|
||||
tracer = trace.get_tracer(__name__)
|
||||
|
||||
# OpenTelemetry Trace context store in metadata
|
||||
TRACE_CARRIER = "trace_carrier"
|
||||
|
||||
traces_enable = False
|
||||
tracer = trace.get_tracer(__name__)
|
||||
|
||||
def set_up():
|
||||
try:
|
||||
# when TRACES_ENABLED=true start trace
|
||||
global traces_enable
|
||||
traces_enable = envs.TRACES_ENABLE.lower() == "true"
|
||||
if not traces_enable:
|
||||
llm_logger.warning("Opentelemetry is DISABLED.")
|
||||
return
|
||||
|
||||
llm_logger.info("Opentelemetry is ENABLED, configuring...")
|
||||
# --- read env ---
|
||||
service_name = envs.FD_SERVICE_NAME
|
||||
host_name = envs.FD_HOST_NAME
|
||||
# --- set attributes (Service Name, Host Name, etc.) ---
|
||||
resource_attributes = {
|
||||
"service.name": service_name
|
||||
}
|
||||
if host_name:
|
||||
resource_attributes["host.name"] = host_name
|
||||
|
||||
resource = Resource(attributes=resource_attributes)
|
||||
|
||||
# --- set Exporter ---
|
||||
exporter_type = envs.TRACES_EXPORTER.lower()
|
||||
if exporter_type == "otlp":
|
||||
endpoint = envs.EXPORTER_OTLP_ENDPOINT # should be set
|
||||
headers = envs.EXPORTER_OTLP_HEADERS # e.g., "Authentication=***,k2=v2"
|
||||
|
||||
otlp_exporter = OTLPSpanExporter(
|
||||
endpoint=endpoint,
|
||||
headers=dict(item.split("=") for item in headers.split(",")) if headers else None
|
||||
)
|
||||
processor = BatchSpanProcessor(otlp_exporter)
|
||||
llm_logger.info(f"Using OTLP Exporter, sending to {endpoint} with headers {headers}")
|
||||
else: # default console
|
||||
processor = BatchSpanProcessor(ConsoleSpanExporter())
|
||||
llm_logger.info("Using Console Exporter.")
|
||||
|
||||
# --- set Tracer Provider ---
|
||||
provider = TracerProvider(resource=resource)
|
||||
provider.add_span_processor(processor)
|
||||
trace.set_tracer_provider(provider)
|
||||
global tracer
|
||||
tracer = trace.get_tracer(__name__)
|
||||
except:
|
||||
llm_logger.error("set_up failed")
|
||||
pass
|
||||
|
||||
def instrument(app: FastAPI):
|
||||
try:
|
||||
set_up()
|
||||
if traces_enable:
|
||||
llm_logger.info("Applying instrumentors...")
|
||||
FastAPIInstrumentor.instrument_app(app)
|
||||
except:
|
||||
llm_logger.info("instrument failed")
|
||||
pass
|
||||
|
||||
|
||||
|
||||
def inject_to_metadata(request, metadata_attr='metadata'):
|
||||
"""
|
||||
Inject OpenTelemetry trace context into the metadata field of the request.
|
||||
@@ -28,9 +96,7 @@ def inject_to_metadata(request, metadata_attr='metadata'):
|
||||
- If there is no metadata attribute in the request, an empty dict will be created for it as its attribute
|
||||
"""
|
||||
try:
|
||||
if request is None:
|
||||
return
|
||||
if is_opentelemetry_instrumented() == False:
|
||||
if request is None or traces_enable == False:
|
||||
return
|
||||
|
||||
metadata = request.get(metadata_attr) if isinstance(request, dict) else getattr(request, metadata_attr, None)
|
||||
@@ -48,6 +114,7 @@ def inject_to_metadata(request, metadata_attr='metadata'):
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
def extract_from_metadata(request, metadata_attr='metadata'):
|
||||
"""
|
||||
Extract trace context from metadata of request object (dict or class instance).
|
||||
@@ -74,7 +141,7 @@ def extract_from_metadata(request, metadata_attr='metadata'):
|
||||
return ctx
|
||||
except:
|
||||
return None
|
||||
|
||||
|
||||
|
||||
def extract_from_request(request):
|
||||
"""
|
||||
@@ -100,12 +167,13 @@ def extract_from_request(request):
|
||||
except:
|
||||
return None
|
||||
|
||||
|
||||
def start_span(span_name, request, kind=trace.SpanKind.CLIENT):
|
||||
"""
|
||||
just start a new span in request trace context
|
||||
"""
|
||||
try:
|
||||
if is_opentelemetry_instrumented() == False:
|
||||
if not traces_enable:
|
||||
return
|
||||
# extract Trace context from request.metadata.trace_carrier
|
||||
ctx = extract_from_metadata(request)
|
||||
@@ -114,31 +182,17 @@ def start_span(span_name, request, kind=trace.SpanKind.CLIENT):
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
def start_span_request(span_name, request, kind=trace.SpanKind.CLIENT):
|
||||
"""
|
||||
just start a new span in request trace context
|
||||
"""
|
||||
try:
|
||||
if is_opentelemetry_instrumented() == False:
|
||||
if not traces_enable:
|
||||
return
|
||||
# extract Trace context from request.metadata.trace_carrier
|
||||
ctx = extract_from_request(request)
|
||||
with tracer.start_as_current_span(span_name, context=ctx, kind=kind) as span:
|
||||
pass
|
||||
except:
|
||||
pass
|
||||
|
||||
def is_opentelemetry_instrumented() -> bool:
|
||||
"""
|
||||
check OpenTelemetry is start or not
|
||||
"""
|
||||
try:
|
||||
return (
|
||||
os.getenv("OTEL_PYTHONE_DISABLED_INSTRUMENTATIONS") is not None
|
||||
or os.getenv("OTEL_SERVICE_NAME") is not None
|
||||
or os.getenv("OTEL_TRACES_EXPORTER") is not None
|
||||
)
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
pass
|
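The rewritten trace_util leans on the standard OpenTelemetry propagation API to carry trace context inside request metadata; a minimal round-trip sketch of that mechanism, independent of FastDeploy's wrappers:

```python
from opentelemetry import trace
from opentelemetry.propagate import inject, extract

tracer = trace.get_tracer(__name__)

# producer side: serialize the current span context into a dict-like carrier
metadata = {}
with tracer.start_as_current_span("ENQUEUE"):
    inject(metadata)  # writes e.g. the W3C 'traceparent' entry into the carrier

# consumer side: rebuild the context and parent a new span on it
ctx = extract(metadata)
with tracer.start_as_current_span("DEQUEUE", context=ctx):
    pass
```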
@@ -17,7 +17,7 @@
|
||||
import os
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
from fastdeploy.config import FDConfig
|
||||
from fastdeploy.config import FDConfig, ErnieArchitectures
|
||||
from fastdeploy.engine.request import Request
|
||||
from fastdeploy.utils import llm_logger
|
||||
|
||||
@@ -268,8 +268,7 @@ class BackendBase:
|
||||
"""
|
||||
try:
|
||||
architectures = self.fd_config.model_config.architectures
|
||||
if "Ernie4_5_MoeForCausalLM" not in architectures \
|
||||
and "Ernie4_5_ForCausalLM" not in architectures:
|
||||
if not ErnieArchitectures.contains_ernie_arch(architectures):
|
||||
|
||||
from transformers import AutoTokenizer, PreTrainedTokenizerFast
|
||||
tokenizer = AutoTokenizer.from_pretrained(
|
||||
|
@@ -58,7 +58,6 @@ class VocabParallelEmbedding(nn.Layer):
|
||||
self.column_cut = False
|
||||
self.world_size: int = hcg.get_model_parallel_world_size()
|
||||
self.ring_id: int = hcg.get_model_parallel_group().id
|
||||
self.use_rope: bool = fd_config.model_config.use_rope
|
||||
self.use_ep: bool = fd_config.parallel_config.use_ep
|
||||
self.hidden_dropout_prob: float = fd_config.model_config.hidden_dropout_prob
|
||||
self.initializer_range: float = fd_config.model_config.initializer_range
|
||||
@@ -92,14 +91,6 @@ class VocabParallelEmbedding(nn.Layer):
|
||||
self.embeddings.weight.is_distributed = True
|
||||
self.embeddings.weight.split_axis = 1
|
||||
|
||||
if not self.use_rope:
|
||||
self.position_embeddings = nn.Embedding(
|
||||
self.max_position_embeddings,
|
||||
embedding_dim,
|
||||
weight_attr=paddle.ParamAttr(initializer=nn.initializer.Normal(
|
||||
mean=0.0, std=self.initializer_range), ),
|
||||
)
|
||||
|
||||
self.prefix = prefix
|
||||
self.dropout = nn.Dropout(self.hidden_dropout_prob)
|
||||
|
||||
|
@@ -1,217 +0,0 @@
|
||||
"""
|
||||
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
|
||||
import paddle
|
||||
import paddle.nn.functional as F
|
||||
from paddle import nn
|
||||
from paddle.distributed import fleet
|
||||
from paddle.distributed.fleet.meta_parallel import (ColumnParallelLinear,
|
||||
VocabParallelEmbedding)
|
||||
from paddleformers.utils.log import logger
|
||||
|
||||
from .utils import get_tensor
|
||||
|
||||
|
||||
class ResBlock(nn.Layer):
|
||||
"""
|
||||
A Residual Block module.
|
||||
|
||||
This module performs a linear transformation followed by a SiLU activation,
|
||||
and then adds the result to the original input, creating a residual connection.
|
||||
|
||||
Args:
|
||||
hidden_size (int): The size of the hidden layers in the block.
|
||||
"""
|
||||
|
||||
def __init__(self, hidden_size, num_condition=0):
|
||||
super().__init__()
|
||||
self.linear = nn.Linear(hidden_size * (num_condition + 1), hidden_size)
|
||||
if num_condition > 0:
|
||||
self.res_connection = nn.Linear(
|
||||
hidden_size * (num_condition + 1), hidden_size
|
||||
)
|
||||
else:
|
||||
self.res_connection = nn.Identity()
|
||||
# Initialize as an identity mapping
|
||||
# _no_grad_fill_(self.linear.weight, 0)
|
||||
# Use SiLU activation to keep consistent with the Llama model
|
||||
self.act = nn.Silu()
|
||||
|
||||
@paddle.no_grad()
|
||||
def forward(self, x):
|
||||
"""
|
||||
Forward pass of the ResBlock.
|
||||
|
||||
Args:
|
||||
x (paddle.Tensor): Input tensor.
|
||||
|
||||
Returns:
|
||||
paddle.Tensor: Output after the residual connection and activation.
|
||||
"""
|
||||
return self.res_connection(x) + self.act(self.linear(x))
|
||||
|
||||
|
||||
class HydraHead(nn.Layer):
|
||||
"""
|
||||
A Hydra Head module.
|
||||
|
||||
This module stacks multiple hydra head layers,
each consisting of a hydra MLP (a stack of ResBlocks) followed by an LM head.
|
||||
|
||||
Args:
|
||||
hydra_num_heads (int): The number of hydra heads.
|
||||
hydra_num_layers (int): The number of layers.
|
||||
hidden_size (int): The size of the hidden layers in the block.
|
||||
tensor_parallel_degree(int): TP degree.
|
||||
vocab_size (int): The size of vocabulary.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hydra_num_heads,
|
||||
hydra_num_layers,
|
||||
hidden_size,
|
||||
tensor_parallel_degree,
|
||||
vocab_size,
|
||||
):
|
||||
super().__init__()
|
||||
self.hydra_num_heads = hydra_num_heads
|
||||
self.hydra_num_layers = hydra_num_layers
|
||||
self.hidden_size = hidden_size
|
||||
self.tensor_parallel_degree = tensor_parallel_degree
|
||||
self.vocab_size = vocab_size
|
||||
|
||||
self.hydra_mlp = nn.LayerList(
|
||||
[
|
||||
nn.Sequential(
|
||||
ResBlock(self.hidden_size, hydra_head_idx + 1),
|
||||
*([ResBlock(self.hidden_size)] * (self.hydra_num_layers - 1)),
|
||||
)
|
||||
for hydra_head_idx in range(self.hydra_num_heads)
|
||||
]
|
||||
)
|
||||
|
||||
if self.tensor_parallel_degree > 1:
|
||||
self.hydra_lm_head = nn.LayerList(
|
||||
[
|
||||
ColumnParallelLinear(
|
||||
self.hidden_size,
|
||||
self.vocab_size,
|
||||
weight_attr=paddle.ParamAttr(
|
||||
initializer=nn.initializer.Normal(mean=0.0, std=0.0)
|
||||
),
|
||||
gather_output=True,
|
||||
has_bias=False,
|
||||
)
|
||||
for _ in range(self.hydra_num_heads)
|
||||
]
|
||||
)
|
||||
else:
|
||||
self.hydra_lm_head = nn.LayerList(
|
||||
[
|
||||
nn.Linear(self.hidden_size, self.vocab_size, bias_attr=False)
|
||||
for _ in range(self.hydra_num_heads)
|
||||
]
|
||||
)
|
||||
|
||||
self.embeddings = VocabParallelEmbedding(
|
||||
vocab_size,
|
||||
hidden_size,
|
||||
mp_group=fleet.get_hybrid_communicate_group().get_model_parallel_group(),
|
||||
weight_attr=paddle.ParamAttr(initializer=nn.initializer.Normal(mean=0.0)),
|
||||
)
|
||||
|
||||
def custom_set_state_dict(self, state_dict):
|
||||
"""
|
||||
Load Parameter of Hydra Head from state_dict with custom names.
|
||||
|
||||
Args:
|
||||
state_dict (dict): KV pair of name and parameters.
|
||||
"""
|
||||
for hydra_head_idx in range(self.hydra_num_heads):
|
||||
self.hydra_mlp[hydra_head_idx][0].res_connection.weight.set_value(
|
||||
get_tensor(
|
||||
state_dict.pop(f"0.{hydra_head_idx}.0.res_connection.weight")
|
||||
)
|
||||
)
|
||||
self.hydra_mlp[hydra_head_idx][0].res_connection.bias.set_value(
|
||||
get_tensor(state_dict.pop(f"0.{hydra_head_idx}.0.res_connection.bias"))
|
||||
)
|
||||
|
||||
for layer_idx in range(self.hydra_num_layers):
|
||||
self.hydra_mlp[hydra_head_idx][layer_idx].linear.weight.set_value(
|
||||
get_tensor(
|
||||
state_dict.pop(f"0.{hydra_head_idx}.{layer_idx}.linear.weight")
|
||||
)
|
||||
)
|
||||
self.hydra_mlp[hydra_head_idx][layer_idx].linear.bias.set_value(
|
||||
get_tensor(
|
||||
state_dict.pop(f"0.{hydra_head_idx}.{layer_idx}.linear.bias")
|
||||
)
|
||||
)
|
||||
|
||||
self.hydra_lm_head[hydra_head_idx].weight.set_value(
|
||||
get_tensor(state_dict.pop(f"1.{hydra_head_idx}.weight"))
|
||||
)
|
||||
|
||||
self.embeddings.weight.set_value(
|
||||
get_tensor(state_dict.pop("embeddings.weight"))
|
||||
)
|
||||
|
||||
def set_state_dict(self, state_dict):
|
||||
"""
|
||||
Load Parameter of Hydra Head from state_dict.
|
||||
|
||||
Args:
|
||||
state_dict (dict): KV pair of name and parameters.
|
||||
"""
|
||||
is_custom = True
|
||||
for key in state_dict.keys():
|
||||
if key != "embeddings.weight" and (
|
||||
"hydra_mlp" in key or "hydra_head" in key
|
||||
):
|
||||
is_custom = False
|
||||
break
|
||||
|
||||
if is_custom:
|
||||
logger.info("Hydra use custom set_state_dict")
|
||||
self.custom_set_state_dict(state_dict)
|
||||
else:
|
||||
logger.info("Hydra use default set_state_dict")
|
||||
super().set_state_dict(state_dict)
|
||||
|
||||
@paddle.no_grad()
|
||||
def forward(self, input_ids, hidden_states, next_tokens):
|
||||
"""
|
||||
Forward pass of Hydra Head
|
||||
|
||||
Args:
|
||||
input_ids: [batch_size, 1] Tokens sampled by the previous head, passed through the embedding,
starting with the last accepted token
hidden_states: [batch_size, hidden_size] The hidden states of the last accepted tokens
|
||||
"""
|
||||
hydra_inputs = [hidden_states]
|
||||
input_embeds = self.embeddings(input_ids)
|
||||
for hydra_head_idx in range(self.hydra_num_heads):
|
||||
hydra_inputs.append(input_embeds)
|
||||
head_input = paddle.concat(hydra_inputs, axis=-1)
|
||||
hidden_states = self.hydra_mlp[hydra_head_idx](head_input)
|
||||
logits = self.hydra_lm_head[hydra_head_idx](hidden_states)
|
||||
probs = F.softmax(logits)
|
||||
_, topk_tokens = paddle.topk(probs, k=1, axis=-1)
|
||||
next_tokens[:, 1 + hydra_head_idx : 2 + hydra_head_idx] = topk_tokens[:]
|
||||
|
||||
input_embeds = self.embeddings(next_tokens[:, 1 + hydra_head_idx])
|
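To make the drafting loop of the removed HydraHead easier to follow, here is a framework-light re-statement of the same pattern, with toy Linear layers standing in for the ResBlock MLPs and parallel LM heads; shapes and module choices are illustrative only.

import paddle
import paddle.nn as nn
import paddle.nn.functional as F

batch, hidden, vocab, num_heads = 2, 8, 32, 3
embed = nn.Embedding(vocab, hidden)
# Head i sees the base hidden state plus (i + 1) token embeddings concatenated.
mlps = nn.LayerList([nn.Linear(hidden * (i + 2), hidden) for i in range(num_heads)])
lm_heads = nn.LayerList([nn.Linear(hidden, vocab, bias_attr=False) for _ in range(num_heads)])

hidden_states = paddle.randn([batch, hidden])
next_tokens = paddle.zeros([batch, num_heads + 1], dtype="int64")  # slot 0 holds the last accepted token
inputs = [hidden_states, embed(next_tokens[:, 0])]
for i in range(num_heads):
    feat = mlps[i](paddle.concat(inputs, axis=-1))
    probs = F.softmax(lm_heads[i](feat), axis=-1)
    _, topk_tokens = paddle.topk(probs, k=1, axis=-1)
    next_tokens[:, i + 1:i + 2] = topk_tokens        # greedy draft token for head i
    inputs.append(embed(next_tokens[:, i + 1]))      # feed it to the next head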
@@ -43,3 +43,5 @@ class SamplingMetadata:
|
||||
top_p: paddle.Tensor
|
||||
top_k: Optional[paddle.Tensor] = None
|
||||
max_num_logprobs: Optional[int] = None
|
||||
prompt_ids: Optional[paddle.Tensor] = None
|
||||
prompt_lens: Optional[paddle.Tensor] = None
|
||||
|
@@ -21,6 +21,8 @@ from fastdeploy.platforms import current_platform
|
||||
|
||||
def apply_penalty_multi_scores(
|
||||
pre_token_ids: paddle.Tensor,
|
||||
prompt_ids: paddle.Tensor,
|
||||
prompt_lens: paddle.Tensor,
|
||||
logits: paddle.Tensor,
|
||||
repetition_penalties: paddle.Tensor,
|
||||
frequency_penalties: paddle.Tensor,
|
||||
@@ -39,6 +41,8 @@ def apply_penalty_multi_scores(
|
||||
get_token_penalty_multi_scores
|
||||
logits = get_token_penalty_multi_scores(
|
||||
pre_token_ids,
|
||||
prompt_ids,
|
||||
prompt_lens,
|
||||
logits,
|
||||
repetition_penalties,
|
||||
frequency_penalties,
|
||||
@@ -69,6 +73,8 @@ def apply_penalty_multi_scores(
|
||||
get_token_penalty_multi_scores
|
||||
logits = get_token_penalty_multi_scores(
|
||||
pre_token_ids,
|
||||
prompt_ids,
|
||||
prompt_lens,
|
||||
logits,
|
||||
repetition_penalties,
|
||||
frequency_penalties,
|
||||
|
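The two hunks above thread prompt_ids and prompt_lens into the fused penalty op so that prompt tokens can be penalized alongside previously generated tokens. A plain-Python reference sketch of the intended semantics (not the fused get_token_penalty_multi_scores kernel, whose exact behaviour may differ) looks roughly like this:

from collections import Counter


def penalty_reference(pre_token_ids, prompt_ids, prompt_lens, logits,
                      repetition_penalties, frequency_penalties, presence_penalties):
    """Toy per-row Python loop; the real op does this fused on device."""
    batch, vocab = logits.shape
    for b in range(batch):
        generated = [t for t in pre_token_ids[b].tolist() if 0 <= t < vocab]
        prompt = [t for t in prompt_ids[b, :int(prompt_lens[b])].tolist() if 0 <= t < vocab]
        rp = float(repetition_penalties[b])
        # Repetition penalty: every token seen in the prompt or the output so far.
        for tok in set(generated) | set(prompt):
            score = float(logits[b, tok])
            logits[b, tok] = score / rp if score > 0 else score * rp
        # Frequency / presence penalties: applied here to generated tokens only.
        for tok, cnt in Counter(generated).items():
            logits[b, tok] -= float(frequency_penalties[b]) * cnt + float(presence_penalties[b])
    return logits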
@@ -253,6 +253,8 @@ class Sampler(nn.Layer):
|
||||
|
||||
logits = apply_penalty_multi_scores(
|
||||
sampling_metadata.pre_token_ids,
|
||||
sampling_metadata.prompt_ids,
|
||||
sampling_metadata.prompt_lens,
|
||||
logits,
|
||||
sampling_metadata.repetition_penalties,
|
||||
sampling_metadata.frequency_penalties,
|
||||
|
@@ -606,8 +606,8 @@ class Ernie4_5_PretrainedModel(PretrainedModel):
|
||||
return final_actions
|
||||
mappings = get_tensor_parallel_split_mappings(
|
||||
config.num_hidden_layers,
|
||||
config.moe_num_experts,
|
||||
config.moe_layer_start_index,
|
||||
getattr(config, "moe_num_experts", 0),
|
||||
getattr(config, "moe_layer_start_index", -1),
|
||||
config.prefix_name,
|
||||
)
|
||||
return mappings
|
||||
|
@@ -161,7 +161,7 @@ class Ernie4_5_VLMoE(nn.Layer):
|
||||
|
||||
self.num_shared_experts = fd_config.model_config.moe_num_shared_experts
|
||||
if self.num_shared_experts > 0:
|
||||
self.share_experts = Ernie4_5_VLMLP(
|
||||
self.shared_experts = Ernie4_5_VLMLP(
|
||||
fd_config=fd_config,
|
||||
intermediate_size=self.num_shared_experts *
|
||||
fd_config.model_config.moe_intermediate_size[0],
|
||||
@@ -193,11 +193,11 @@ class Ernie4_5_VLMoE(nn.Layer):
|
||||
if self.text_fused_moe.moe_use_gate_correction_bias:
|
||||
state_dict.pop(self.text_fused_moe.gate_correction_bias_key)
|
||||
if self.num_shared_experts > 0:
|
||||
self.share_experts.load_state_dict(state_dict)
|
||||
self.shared_experts.load_state_dict(state_dict)
|
||||
|
||||
def forward(self, hidden_states: paddle.Tensor, vl_moe_meta: VLMoEMeta):
|
||||
if self.num_shared_experts > 0:
|
||||
share_experts_out = self.share_experts(hidden_states)
|
||||
shared_experts_out = self.shared_experts(hidden_states)
|
||||
if vl_moe_meta.image_input is not None:
|
||||
text_image_gather_scatter(
|
||||
hidden_states,
|
||||
@@ -222,7 +222,7 @@ class Ernie4_5_VLMoE(nn.Layer):
|
||||
else:
|
||||
hidden_states = self.text_fused_moe(hidden_states)
|
||||
if self.num_shared_experts > 0:
|
||||
hidden_states += share_experts_out
|
||||
hidden_states += shared_experts_out
|
||||
if self.tp_size > 1:
|
||||
tensor_model_parallel_all_reduce(hidden_states)
|
||||
return hidden_states
|
||||
@@ -759,4 +759,4 @@ class Ernie4_5_VLPretrainedModel(PretrainedModel):
|
||||
config.vision_config.get("depth")
|
||||
)
|
||||
|
||||
return {**mappings, **vision_mappings}
|
||||
return {**mappings, **vision_mappings}
|
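The rename from share_experts to shared_experts does not change the computation. For clarity, the shared-expert pattern it touches reduces to the following sketch, with plain Linear layers standing in for Ernie4_5_VLMLP and the fused routed MoE:

import paddle
import paddle.nn as nn


class TinyMoEWithSharedExpert(nn.Layer):
    def __init__(self, hidden=16):
        super().__init__()
        self.fused_moe = nn.Linear(hidden, hidden)       # stand-in for the routed experts
        self.shared_experts = nn.Linear(hidden, hidden)  # stand-in for Ernie4_5_VLMLP

    def forward(self, hidden_states):
        shared_experts_out = self.shared_experts(hidden_states)  # computed once for every token
        hidden_states = self.fused_moe(hidden_states)
        hidden_states += shared_experts_out                      # added regardless of routing
        return hidden_states


out = TinyMoEWithSharedExpert()(paddle.randn([2, 16]))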
@@ -505,8 +505,6 @@ class TokenProcessor(object):
|
||||
result.outputs.token_ids.append(token_id)
|
||||
if token_id in task.eos_token_ids or is_prefill or recovery_stop:
|
||||
result.finished = True
|
||||
result.prompt = task.prompt
|
||||
result.prompt_token_ids = task.prompt_token_ids
|
||||
if recovery_stop:
|
||||
result.error_msg = "Recover is not supported, the result is incomplete!"
|
||||
llm_logger.info(
|
||||
|
@@ -51,12 +51,14 @@ class RolloutModelConfig:
|
||||
enable_prefix_caching: bool = False,
|
||||
splitwise_role: str = "mixed",
|
||||
expert_parallel_size: int = 1,
|
||||
enable_expert_parallell: bool = False,
|
||||
enable_expert_parallel: bool = False,
|
||||
ori_vocab_size: int = None,
|
||||
quantization: str = "None",
|
||||
guided_decoding_backend: str = "off",
|
||||
disable_any_whitespace: bool = True,
|
||||
enable_logprob: bool = False,
|
||||
graph_optimization_config: str = None,
|
||||
local_rank: int = 0
|
||||
):
|
||||
# Required parameters
|
||||
self.model_name_or_path = model_name_or_path
|
||||
@@ -90,16 +92,18 @@ class RolloutModelConfig:
|
||||
self.enable_prefix_caching = enable_prefix_caching
|
||||
self.splitwise_role = splitwise_role
|
||||
self.expert_parallel_size = expert_parallel_size
|
||||
self.enable_expert_parallell = enable_expert_parallell
|
||||
self.enable_expert_parallel = enable_expert_parallel
|
||||
self.ori_vocab_size = ori_vocab_size
|
||||
self.quantization = quantization
|
||||
self.guided_decoding_backend = guided_decoding_backend
|
||||
self.disable_any_whitespace = disable_any_whitespace
|
||||
self.enable_logprob = enable_logprob
|
||||
self.graph_optimization_config = graph_optimization_config
|
||||
self.local_rank = local_rank
|
||||
|
||||
def __str__(self):
|
||||
return "\n".join(f"{k}: {v}" for k, v in self.__dict__.items())
|
||||
|
||||
def initialize(self):
|
||||
"""Initialize the final fd config"""
|
||||
return initialize_fd_config(self, ranks=self.tensor_parallel_size, local_rank=0)
|
||||
return initialize_fd_config(self, ranks=self.tensor_parallel_size, local_rank=self.local_rank)
|
||||
|
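A hedged usage sketch of the renamed enable_expert_parallel flag and the new local_rank argument; the model path and values are placeholders, and the remaining constructor arguments are assumed to keep their defaults.

from fastdeploy.rl.rollout_config import RolloutModelConfig

cfg = RolloutModelConfig(
    model_name_or_path="/path/to/ERNIE-4.5-Moe",  # placeholder path
    enable_expert_parallel=False,                 # renamed flag
    enable_logprob=True,
    local_rank=0,                                 # set per worker; new argument in this diff
)
fd_config = cfg.initialize()  # now forwards local_rank instead of hard-coding 0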
@@ -23,15 +23,14 @@ from paddleformers.utils.log import logger
|
||||
from fastdeploy.config import FDConfig
|
||||
from fastdeploy.model_executor.model_loader import ModelRegistry
|
||||
from fastdeploy.model_executor.models.ernie4_5_moe import \
|
||||
Ernie4_5_MoeForCausalLM
|
||||
Ernie4_5_MoeForCausalLM, Ernie4_5_PretrainedModel
|
||||
from fastdeploy.model_executor.models.ernie4_5_vl.ernie4_5_vl_moe import \
|
||||
Ernie4_5_VLMoeForConditionalGeneration
|
||||
from fastdeploy.model_executor.models.qwen2 import Qwen2ForCausalLM
|
||||
from fastdeploy.model_executor.models.qwen3 import Qwen3ForCausalLM
|
||||
from fastdeploy.model_executor.models.qwen3moe import Qwen3MoeForCausalLM
|
||||
Ernie4_5_VLMoeForConditionalGeneration, Ernie4_5_VLPretrainedModel
|
||||
from fastdeploy.model_executor.models.qwen2 import Qwen2ForCausalLM, Qwen2PretrainedModel
|
||||
from fastdeploy.model_executor.models.qwen3 import Qwen3ForCausalLM, Qwen3PretrainedModel
|
||||
from fastdeploy.model_executor.models.qwen3moe import Qwen3MoeForCausalLM, Qwen3MoePretrainedModel
|
||||
from fastdeploy.rl.rollout_config import RolloutModelConfig
|
||||
|
||||
|
||||
class RolloutModel(nn.Layer):
|
||||
"""Main model class for rollout operations, supports multimodal components for train."""
|
||||
|
||||
@@ -39,21 +38,25 @@ class RolloutModel(nn.Layer):
|
||||
"""Initialize with FastDeploy configuration."""
|
||||
super(RolloutModel, self).__init__()
|
||||
self.fd_config = rollout_model_config.initialize()
|
||||
self._init_model()
|
||||
self.rollout_model = self._init_model()
|
||||
|
||||
def _init_model(self):
|
||||
def _init_model(self) -> nn.Layer:
|
||||
"""Load model from loader based on config."""
|
||||
context = paddle.LazyGuard()
|
||||
architectures = f"{self.fd_config.model_config.architectures[0]}RL"
|
||||
with context:
|
||||
model_cls = ModelRegistry.get_class(architectures)
|
||||
model = model_cls(self.fd_config)
|
||||
model.eval()
|
||||
return model
|
||||
|
||||
self.rollout_model = model.eval()
|
||||
|
||||
def get_name_mappings_to_training(self) -> Dict[str, str]:
|
||||
def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
|
||||
"""Get parameter name mappings between rollout and training models."""
|
||||
return getattr(self.rollout_model, "get_name_mappings_to_training", lambda: {})()
|
||||
return getattr(self.rollout_model, "get_name_mappings_to_training", lambda: {})(trainer_degree)
|
||||
|
||||
def get_quantization_infer_keys(self) -> Dict[str, str]:
|
||||
"""Get parameter name mappings between rollout and training models."""
|
||||
return getattr(self.rollout_model, "get_quantization_infer_keys", lambda: {})()
|
||||
|
||||
@paddle.no_grad()
|
||||
def state_dict(self):
|
||||
@@ -61,10 +64,51 @@ class RolloutModel(nn.Layer):
|
||||
return self.rollout_model.state_dict()
|
||||
|
||||
|
||||
class Ernie4_5_MoeForCausalLMRL(Ernie4_5_MoeForCausalLM):
|
||||
class BaseRLModel(nn.Layer):
|
||||
"""Base class for RL models with common functionality"""
|
||||
def __init__(self,):
|
||||
super(BaseRLModel, self).__init__()
|
||||
self.infer_to_train_mapping = {}
|
||||
self.fd_config = None
|
||||
|
||||
@classmethod
|
||||
def name(cls) -> str:
|
||||
return cls.__name__
|
||||
|
||||
def _update_base_mappings(self, base_name: str) -> None:
|
||||
"""Common static mappings"""
|
||||
static_mappings = {
|
||||
f"{base_name}.embed_tokens.embeddings.weight": f"{base_name}.embed_tokens.weight",
|
||||
"lm_head.linear.weight": "lm_head.weight"
|
||||
}
|
||||
self.infer_to_train_mapping.update(static_mappings)
|
||||
|
||||
def _complete_missing_mappings(self) -> None:
|
||||
"""
|
||||
Complete the mapping dictionary with keys that have identical names in inference and training.
|
||||
"""
|
||||
for key in self.state_dict().keys():
|
||||
if key not in self.infer_to_train_mapping and "_scale" not in key:
|
||||
# Skip weight scale parameters in mapping. Train and infer have same key.
|
||||
self.infer_to_train_mapping[key] = key
|
||||
|
||||
def get_quantization_infer_keys(self) -> list[str]:
|
||||
"""Get quantization infer keys"""
|
||||
quant_weight_key = []
|
||||
if self.fd_config.quant_config.name() == "wint8":
|
||||
""" RL only support weight_only_int8 now"""
|
||||
for key in self.state_dict().keys():
|
||||
if "scale" in key:
|
||||
quant_weight_key.append(key.replace(".weight_scale", ".weight"))
|
||||
else:
|
||||
raise ValueError("Only 'wint8' quantization is supported in RL roullout.")
|
||||
return quant_weight_key
|
||||
|
||||
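For orientation, here is an illustrative consumer of the mapping returned by get_name_mappings_to_training (not part of this diff): it copies rollout weights into a training state dict and splits fused MoE tensors when one inference key maps to a list of per-expert training keys. split_fused_expert_weight and the axis-0 stacking assumption are hypothetical.

import paddle


def split_fused_expert_weight(weight, num_chunks):
    # Assumption for this sketch: experts are stacked along the first axis of the fused tensor.
    return paddle.split(weight, num_chunks, axis=0)


def export_to_training(rollout_model, train_state_dict, trainer_degree=None):
    mapping = rollout_model.get_name_mappings_to_training(trainer_degree)
    infer_state = rollout_model.state_dict()
    for infer_key, train_key in mapping.items():
        if isinstance(train_key, list):
            # One fused MoE tensor on the inference side maps to a list of per-expert keys.
            chunks = split_fused_expert_weight(infer_state[infer_key], len(train_key))
            for expert_key, chunk in zip(train_key, chunks):
                train_state_dict[expert_key] = chunk
        else:
            train_state_dict[train_key] = infer_state[infer_key]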
class Ernie4_5_MoeForCausalLMRL(Ernie4_5_MoeForCausalLM, BaseRLModel):
|
||||
"""
|
||||
Ernie4_5_MoeForCausalLMRL
|
||||
"""
|
||||
_get_tensor_parallel_mappings = Ernie4_5_PretrainedModel._get_tensor_parallel_mappings
|
||||
|
||||
def __init__(self, fd_config: FDConfig):
|
||||
"""
|
||||
@@ -74,42 +118,27 @@ class Ernie4_5_MoeForCausalLMRL(Ernie4_5_MoeForCausalLM):
|
||||
super(Ernie4_5_MoeForCausalLMRL, self).__init__(fd_config)
|
||||
|
||||
@classmethod
|
||||
def name(self):
|
||||
def name(self) -> str:
|
||||
"""name"""
|
||||
return "Ernie4_5_MoeForCausalLMRL"
|
||||
|
||||
def get_name_mappings_to_training(self):
|
||||
def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
|
||||
"""Generate mapping between inference and training parameter for RL(donot delete!)."""
|
||||
have_bias = self.fd_config.model_config.get("have_norm_bias", False)
|
||||
# Prepare placeholders
|
||||
place_holders = ["weight"] + (["bias"] if have_bias else [])
|
||||
place_holders = ["weight"]
|
||||
|
||||
# Initialize mapping dictionary
|
||||
infer_to_train = {}
|
||||
|
||||
base_name = "ernie"
|
||||
# Static mappings (non-layer specific)
|
||||
static_mappings = {
|
||||
f"{base_name}.embed_tokens.embeddings.weight":
|
||||
f"{base_name}.embed_tokens.weight",
|
||||
"lm_head.linear.weight": "lm_head.weight"
|
||||
}
|
||||
if self.fd_config.model_config.get("tie_word_embeddings", False):
|
||||
# Support tie_word_embeddings
|
||||
logger.debug("enable tie_word_embeddings")
|
||||
static_mappings.pop("lm_head.linear.weight")
|
||||
infer_to_train.update(static_mappings)
|
||||
|
||||
base_name = base_name + ".layers"
|
||||
self._update_base_mappings("ernie")
|
||||
|
||||
base_name = "ernie.layers"
|
||||
# Helper function to add layer mappings
|
||||
def _add_layer_mappings(layer_idx: int):
|
||||
# MoE specific mappings
|
||||
infer_to_train[f"{base_name}.{layer_idx}.mlp.fused_moe.gate_weight"] = \
|
||||
self.infer_to_train_mapping[f"{base_name}.{layer_idx}.mlp.fused_moe.gate_weight"] = \
|
||||
f"{base_name}.{layer_idx}.mlp.gate.weight"
|
||||
|
||||
if self.fd_config.model_config.moe_use_aux_free:
|
||||
infer_to_train[f"{base_name}.{layer_idx}.mlp.fused_moe.gate_correction_bias"] = \
|
||||
self.infer_to_train_mapping[f"{base_name}.{layer_idx}.mlp.fused_moe.gate_correction_bias"] = \
|
||||
f"{base_name}.{layer_idx}.mlp.moe_statics.e_score_correction_bias"
|
||||
|
||||
# MoE experts mappings
|
||||
@@ -117,17 +146,17 @@ class Ernie4_5_MoeForCausalLMRL(Ernie4_5_MoeForCausalLM):
|
||||
for ph in place_holders:
|
||||
# up_gate_proj (up_gate_proj)
|
||||
up_gate_proj_key = f"{base_name}.{layer_idx}.mlp.fused_moe.up_gate_proj_weight"
|
||||
if up_gate_proj_key not in infer_to_train:
|
||||
infer_to_train[up_gate_proj_key] = []
|
||||
infer_to_train[up_gate_proj_key].append(
|
||||
if up_gate_proj_key not in self.infer_to_train_mapping:
|
||||
self.infer_to_train_mapping[up_gate_proj_key] = []
|
||||
self.infer_to_train_mapping[up_gate_proj_key].append(
|
||||
f"{base_name}.{layer_idx}.mlp.experts.{expert_idx}.up_gate_proj.{ph}"
|
||||
)
|
||||
|
||||
# down_proj (down_proj)
|
||||
down_proj_key = f"{base_name}.{layer_idx}.mlp.fused_moe.down_proj_weight"
|
||||
if down_proj_key not in infer_to_train:
|
||||
infer_to_train[down_proj_key] = []
|
||||
infer_to_train[down_proj_key].append(
|
||||
if down_proj_key not in self.infer_to_train_mapping:
|
||||
self.infer_to_train_mapping[down_proj_key] = []
|
||||
self.infer_to_train_mapping[down_proj_key].append(
|
||||
f"{base_name}.{layer_idx}.mlp.experts.{expert_idx}.down_proj.{ph}"
|
||||
)
|
||||
|
||||
@@ -137,13 +166,16 @@ class Ernie4_5_MoeForCausalLMRL(Ernie4_5_MoeForCausalLM):
|
||||
self.fd_config.model_config.num_hidden_layers):
|
||||
_add_layer_mappings(layer_idx)
|
||||
|
||||
return infer_to_train
|
||||
self._complete_missing_mappings()
|
||||
|
||||
return self.infer_to_train_mapping
|
||||
|
||||
|
||||
class Ernie4_5_VLMoeForConditionalGenerationRL(Ernie4_5_VLMoeForConditionalGeneration):
|
||||
class Ernie4_5_VLMoeForConditionalGenerationRL(Ernie4_5_VLMoeForConditionalGeneration, BaseRLModel):
|
||||
"""
|
||||
Ernie4_5_VLMoeForConditionalGenerationRL
|
||||
"""
|
||||
_get_tensor_parallel_mappings = Ernie4_5_VLPretrainedModel._get_tensor_parallel_mappings
|
||||
|
||||
def __init__(self, fd_config: FDConfig):
|
||||
"""
|
||||
@@ -153,69 +185,51 @@ class Ernie4_5_VLMoeForConditionalGenerationRL(Ernie4_5_VLMoeForConditionalGener
|
||||
super(Ernie4_5_VLMoeForConditionalGenerationRL, self).__init__(fd_config)
|
||||
|
||||
@classmethod
|
||||
def name(self):
|
||||
def name(self) -> str:
|
||||
"""name"""
|
||||
return "Ernie4_5_VLMoeForConditionalGenerationRL"
|
||||
|
||||
def get_name_mappings_to_training(self):
|
||||
def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
|
||||
"""Generate mapping between inference and training parameter for RL(donot delete!)."""
|
||||
have_bias = self.fd_config.model_config.get("have_norm_bias", False)
|
||||
# Prepare placeholders
|
||||
place_holders = ["weight"] + (["bias"] if have_bias else [])
|
||||
place_holders = ["weight"]
|
||||
|
||||
# Initialize mapping dictionary
|
||||
infer_to_train = {}
|
||||
self._update_base_mappings("ernie")
|
||||
|
||||
base_name = "ernie"
|
||||
# Static mappings (non-layer specific)
|
||||
static_mappings = {
|
||||
f"{base_name}.embed_tokens.embeddings.weight":
|
||||
f"{base_name}.embed_tokens.weight",
|
||||
"lm_head.linear.weight": "lm_head.weight"
|
||||
}
|
||||
if self.fd_config.model_config.get("tie_word_embeddings", False):
|
||||
# Support tie_word_embeddings
|
||||
logger.debug("enable tie_word_embeddings")
|
||||
static_mappings.pop("lm_head.linear.weight")
|
||||
infer_to_train.update(static_mappings)
|
||||
|
||||
base_name = base_name + ".layers"
|
||||
base_name = "ernie.layers"
|
||||
|
||||
# Helper function to add layer mappings
|
||||
def _add_layer_mappings(layer_idx: int, moe_tag: str):
|
||||
def _add_expert_mappings(layer_idx: int, moe_tag: str, expert_start: int):
|
||||
# MoE specific mappings
|
||||
infer_to_train[f"{base_name}.{layer_idx}.mlp.{moe_tag}_fused_moe.gate_weight"] = f"{base_name}.{layer_idx}.mlp.gate.weight" if moe_tag == "text" else f"{base_name}.{layer_idx}.mlp.gate.weight_1"
|
||||
gate_suffix = "" if moe_tag == "text" else "_1"
|
||||
self.infer_to_train_mapping[f"{base_name}.{layer_idx}.mlp.{moe_tag}_fused_moe.gate_weight"] = \
|
||||
f"{base_name}.{layer_idx}.mlp.gate.weight{gate_suffix}"
|
||||
|
||||
if self.fd_config.model_config.moe_use_aux_free:
|
||||
infer_to_train[f"{base_name}.{layer_idx}.mlp.{moe_tag}_fused_moe.gate_correction_bias"] = \
|
||||
self.infer_to_train_mapping[f"{base_name}.{layer_idx}.mlp.{moe_tag}_fused_moe.gate_correction_bias"] = \
|
||||
f"{base_name}.{layer_idx}.mlp.moe_statics.e_score_correction_bias"
|
||||
|
||||
# MoE experts mappings
|
||||
assert isinstance(self.fd_config.model_config.moe_num_experts, list)
|
||||
if moe_tag == "text":
|
||||
expert_idx_start = 0
|
||||
expert_idx_end = self.fd_config.model_config.moe_num_experts[0]
|
||||
else:
|
||||
expert_idx_start = self.fd_config.model_config.moe_num_experts[0]
|
||||
expert_idx_end = self.fd_config.model_config.moe_num_experts[1]
|
||||
|
||||
for expert_idx in range(expert_idx_start, expert_idx_end):
|
||||
# Initialize defaultdict for expert weights
|
||||
from collections import defaultdict
|
||||
from itertools import chain
|
||||
|
||||
def _generate_ranges(start, end, step=16, take=8):
|
||||
"""生成 [start, start+take), [start+step, start+step+take), ... 直到 end"""
|
||||
return chain(
|
||||
*(range(i, min(i + take, end))  # clamp to stay within end
|
||||
for i in range(start, end, step)))
|
||||
|
||||
expert_mappings = defaultdict(list)
|
||||
for expert_idx in _generate_ranges(expert_start, total_moe_num, expert_num_per_rank * 2, expert_num_per_rank):
|
||||
for ph in place_holders:
|
||||
# up_gate_proj (up_gate_proj)
|
||||
up_gate_proj_key = f"{base_name}.{layer_idx}.mlp.{moe_tag}_fused_moe.up_gate_proj_weight"
|
||||
if up_gate_proj_key not in infer_to_train:
|
||||
infer_to_train[up_gate_proj_key] = []
|
||||
infer_to_train[up_gate_proj_key].append(
|
||||
expert_mappings[f"{base_name}.{layer_idx}.mlp.{moe_tag}_fused_moe.up_gate_proj_weight"].append(
|
||||
f"{base_name}.{layer_idx}.mlp.experts.{expert_idx}.up_gate_proj.{ph}"
|
||||
)
|
||||
|
||||
# down_proj (down_proj)
|
||||
down_proj_key = f"{base_name}.{layer_idx}.mlp.{moe_tag}_fused_moe.down_proj_weight"
|
||||
if down_proj_key not in infer_to_train:
|
||||
infer_to_train[down_proj_key] = []
|
||||
infer_to_train[down_proj_key].append(
|
||||
expert_mappings[f"{base_name}.{layer_idx}.mlp.{moe_tag}_fused_moe.down_proj_weight"].append(
|
||||
f"{base_name}.{layer_idx}.mlp.experts.{expert_idx}.down_proj.{ph}"
|
||||
)
|
||||
self.infer_to_train_mapping.update(expert_mappings)
|
||||
|
||||
moe_layer_start_index = self.fd_config.model_config.moe_layer_start_index
|
||||
if isinstance(moe_layer_start_index, int):
|
||||
@@ -235,19 +249,28 @@ class Ernie4_5_VLMoeForConditionalGenerationRL(Ernie4_5_VLMoeForConditionalGener
|
||||
else:
|
||||
text_moe_layer_end_index = moe_layer_end_index[0]
|
||||
image_moe_layer_end_index = moe_layer_end_index[1]
|
||||
|
||||
assert isinstance(self.fd_config.model_config.moe_num_experts, list)
|
||||
total_moe_num = sum(self.fd_config.model_config.moe_num_experts)
|
||||
if not trainer_degree:
|
||||
trainer_degree = self.fd_config.parallel_config.tensor_parallel_size
|
||||
expert_num_per_rank = self.fd_config.model_config.moe_num_experts[0] // trainer_degree
|
||||
# Process MoE layers
|
||||
for layer_idx in range(text_moe_layer_start_index, text_moe_layer_end_index):
|
||||
_add_layer_mappings(layer_idx, "text")
|
||||
_add_expert_mappings(layer_idx, "text", expert_start=0)
|
||||
for layer_idx in range(image_moe_layer_start_index, image_moe_layer_end_index):
|
||||
_add_layer_mappings(layer_idx, "image")
|
||||
_add_expert_mappings(layer_idx, "image", expert_start=expert_num_per_rank)
|
||||
|
||||
return infer_to_train
|
||||
self._complete_missing_mappings()
|
||||
|
||||
return self.infer_to_train_mapping
|
||||
|
||||
|
||||
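A worked example of the _generate_ranges helper above, reproduced here so it runs standalone. The numbers (64 text + 64 image experts, trainer_degree=4, hence 16 experts per rank) are made up for illustration; they show how text and image experts interleave in alternating per-rank blocks.

from itertools import chain


def _generate_ranges(start, end, step=16, take=8):
    """Yield [start, start+take), [start+step, start+step+take), ... up to end."""
    return chain(*(range(i, min(i + take, end)) for i in range(start, end, step)))


# 64 text + 64 image experts, trainer_degree=4 -> expert_num_per_rank = 16, step = 32.
text_experts = list(_generate_ranges(0, 128, step=32, take=16))    # 0-15, 32-47, 64-79, 96-111
image_experts = list(_generate_ranges(16, 128, step=32, take=16))  # 16-31, 48-63, 80-95, 112-127
assert set(text_experts).isdisjoint(image_experts)
assert sorted(text_experts + image_experts) == list(range(128))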
class Qwen2ForCausalLMRL(Qwen2ForCausalLM):
|
||||
class Qwen2ForCausalLMRL(Qwen2ForCausalLM, BaseRLModel):
|
||||
"""
|
||||
Qwen2ForCausalLMRL
|
||||
"""
|
||||
_get_tensor_parallel_mappings = Qwen2PretrainedModel._get_tensor_parallel_mappings
|
||||
|
||||
def __init__(self, fd_config: FDConfig):
|
||||
"""
|
||||
@@ -257,47 +280,39 @@ class Qwen2ForCausalLMRL(Qwen2ForCausalLM):
|
||||
super(Qwen2ForCausalLMRL, self).__init__(fd_config)
|
||||
|
||||
@classmethod
|
||||
def name(self):
|
||||
def name(self) -> str:
|
||||
"""name"""
|
||||
return "Qwen2ForCausalLMRL"
|
||||
|
||||
def get_name_mappings_to_training(self):
|
||||
def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
|
||||
"""Generate mapping between inference and training parameter for RL(donot delete!)."""
|
||||
# Prepare placeholders
|
||||
place_holders = ["weight"]
|
||||
|
||||
# Initialize mapping dictionary
|
||||
infer_to_train = {}
|
||||
|
||||
base_name = "qwen2"
|
||||
# Static mappings (non-layer specific)
|
||||
static_mappings = {
|
||||
f"{base_name}.embed_tokens.embeddings.weight":
|
||||
f"{base_name}.embed_tokens.weight",
|
||||
"lm_head.linear.weight": "lm_head.weight"
|
||||
}
|
||||
infer_to_train.update(static_mappings)
|
||||
|
||||
base_name = base_name + ".layers"
|
||||
|
||||
self._update_base_mappings("qwen2")
|
||||
base_name = "qwen2.layers"
|
||||
# Helper function to add layer mappings
|
||||
def _add_layer_mappings(layer_idx):
|
||||
# FFN mappings
|
||||
for ph in place_holders:
|
||||
infer_to_train[f"{base_name}.{layer_idx}.mlp.up_gate_proj.{ph}"] = \
|
||||
self.infer_to_train_mapping[f"{base_name}.{layer_idx}.mlp.up_gate_proj.{ph}"] = \
|
||||
f"{base_name}.{layer_idx}.mlp.gate_up_fused_proj.{ph}"
|
||||
|
||||
for layer_idx in range(
|
||||
self.fd_config.model_config.num_hidden_layers):
|
||||
_add_layer_mappings(layer_idx)
|
||||
|
||||
return infer_to_train
|
||||
self._complete_missing_mappings()
|
||||
|
||||
return self.infer_to_train_mapping
|
||||
|
||||
|
||||
class Qwen3MoeForCausalLMRL(Qwen3MoeForCausalLM):
|
||||
class Qwen3MoeForCausalLMRL(Qwen3MoeForCausalLM, BaseRLModel):
|
||||
"""
|
||||
Qwen3MoeForCausalLMRL
|
||||
"""
|
||||
_get_tensor_parallel_mappings = Qwen3MoePretrainedModel._get_tensor_parallel_mappings
|
||||
|
||||
def __init__(self, fd_config: FDConfig):
|
||||
"""
|
||||
@@ -307,37 +322,29 @@ class Qwen3MoeForCausalLMRL(Qwen3MoeForCausalLM):
|
||||
super(Qwen3MoeForCausalLMRL, self).__init__(fd_config)
|
||||
|
||||
@classmethod
|
||||
def name(self):
|
||||
def name(self) -> str:
|
||||
"""name"""
|
||||
return "Qwen3MoeForCausalLMRL"
|
||||
|
||||
def get_name_mappings_to_training(self):
|
||||
def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
|
||||
"""Generate mapping between inference and training parameter for RL(donot delete!)."""
|
||||
# Prepare placeholders
|
||||
place_holders = ["weight"]
|
||||
|
||||
# Initialize mapping dictionary
|
||||
infer_to_train = {}
|
||||
self._update_base_mappings("model")
|
||||
self.infer_to_train_mapping = {}
|
||||
|
||||
base_name = "model"
|
||||
# Static mappings (non-layer specific)
|
||||
static_mappings = {
|
||||
f"{base_name}.embed_tokens.embeddings.weight":
|
||||
f"{base_name}.embed_tokens.weight",
|
||||
"lm_head.linear.weight": "lm_head.weight"
|
||||
}
|
||||
infer_to_train.update(static_mappings)
|
||||
|
||||
base_name = base_name + ".layers"
|
||||
base_name = "model.layers"
|
||||
|
||||
# Helper function to add layer mappings
|
||||
def _add_layer_mappings(layer_idx: int):
|
||||
# MoE specific mappings
|
||||
infer_to_train[f"{base_name}.{layer_idx}.mlp.gate_weight"] = \
|
||||
self.infer_to_train_mapping[f"{base_name}.{layer_idx}.mlp.gate_weight"] = \
|
||||
f"{base_name}.{layer_idx}.mlp.gate.weight"
|
||||
|
||||
if self.fd_config.moe_config.moe_use_aux_free:
|
||||
infer_to_train[f"{base_name}.{layer_idx}.mlp.fused_moe.gate_correction_bias"] = \
|
||||
self.infer_to_train_mapping[f"{base_name}.{layer_idx}.mlp.fused_moe.gate_correction_bias"] = \
|
||||
f"{base_name}.{layer_idx}.mlp.moe_statics.e_score_correction_bias"
|
||||
|
||||
# MoE experts mappings
|
||||
@@ -345,17 +352,17 @@ class Qwen3MoeForCausalLMRL(Qwen3MoeForCausalLM):
|
||||
for ph in place_holders:
|
||||
# up_gate_proj (up_gate_proj)
|
||||
up_gate_proj_key = f"{base_name}.{layer_idx}.mlp.up_gate_proj_weight"
|
||||
if up_gate_proj_key not in infer_to_train:
|
||||
infer_to_train[up_gate_proj_key] = []
|
||||
infer_to_train[up_gate_proj_key].append(
|
||||
if up_gate_proj_key not in self.infer_to_train_mapping:
|
||||
self.infer_to_train_mapping[up_gate_proj_key] = []
|
||||
self.infer_to_train_mapping[up_gate_proj_key].append(
|
||||
f"{base_name}.{layer_idx}.mlp.experts.{expert_idx}.up_gate_proj.{ph}"
|
||||
)
|
||||
|
||||
# down_proj (down_proj)
|
||||
down_proj_key = f"{base_name}.{layer_idx}.mlp.down_proj_weight"
|
||||
if down_proj_key not in infer_to_train:
|
||||
infer_to_train[down_proj_key] = []
|
||||
infer_to_train[down_proj_key].append(
|
||||
if down_proj_key not in self.infer_to_train_mapping:
|
||||
self.infer_to_train_mapping[down_proj_key] = []
|
||||
self.infer_to_train_mapping[down_proj_key].append(
|
||||
f"{base_name}.{layer_idx}.mlp.experts.{expert_idx}.down_proj.{ph}"
|
||||
)
|
||||
|
||||
@@ -363,13 +370,16 @@ class Qwen3MoeForCausalLMRL(Qwen3MoeForCausalLM):
|
||||
for layer_idx in range(self.fd_config.model_config.num_hidden_layers):
|
||||
_add_layer_mappings(layer_idx)
|
||||
|
||||
return infer_to_train
|
||||
self._complete_missing_mappings()
|
||||
|
||||
return self.infer_to_train_mapping
|
||||
|
||||
|
||||
class Qwen3ForCausalLMRL(Qwen3ForCausalLM):
|
||||
class Qwen3ForCausalLMRL(Qwen3ForCausalLM, BaseRLModel):
|
||||
"""
|
||||
Qwen3ForCausalLMRL
|
||||
"""
|
||||
_get_tensor_parallel_mappings = Qwen3PretrainedModel._get_tensor_parallel_mappings
|
||||
|
||||
def __init__(self, fd_config: FDConfig):
|
||||
"""
|
||||
@@ -379,6 +389,28 @@ class Qwen3ForCausalLMRL(Qwen3ForCausalLM):
|
||||
super(Qwen3ForCausalLMRL, self).__init__(fd_config)
|
||||
|
||||
@classmethod
|
||||
def name(self):
|
||||
def name(self) -> str:
|
||||
"""name"""
|
||||
return "Qwen3ForCausalLMRL"
|
||||
|
||||
def get_name_mappings_to_training(self, trainer_degree=None) -> Dict[str, str]:
|
||||
# Prepare placeholders
|
||||
place_holders = ["weight"]
|
||||
|
||||
# Initialize mapping dictionary
|
||||
self._update_base_mappings("model")
|
||||
base_name = "model.layers"
|
||||
# Helper function to add layer mappings
|
||||
def _add_layer_mappings(layer_idx):
|
||||
# FFN mappings
|
||||
for ph in place_holders:
|
||||
self.infer_to_train_mapping[f"{base_name}.{layer_idx}.mlp.up_gate_proj.{ph}"] = \
|
||||
f"{base_name}.{layer_idx}.mlp.gate_up_fused_proj.{ph}"
|
||||
|
||||
for layer_idx in range(
|
||||
self.fd_config.model_config.num_hidden_layers):
|
||||
_add_layer_mappings(layer_idx)
|
||||
|
||||
self._complete_missing_mappings()
|
||||
|
||||
return self.infer_to_train_mapping
|
@@ -27,7 +27,8 @@ from datetime import datetime
|
||||
from logging.handlers import BaseRotatingHandler
|
||||
from pathlib import Path
|
||||
from typing import Literal, TypeVar, Union
|
||||
|
||||
import random
|
||||
import socket
|
||||
import requests
|
||||
import yaml
|
||||
from aistudio_sdk.snapshot_download import snapshot_download
|
||||
@@ -421,6 +422,19 @@ def get_host_ip():
|
||||
return ip
|
||||
|
||||
|
||||
|
||||
|
||||
def get_random_port():
|
||||
while True:
|
||||
port = random.randint(49152, 65535)
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
try:
|
||||
s.bind(("0.0.0.0", port))
|
||||
return port
|
||||
except OSError:
|
||||
continue
|
||||
|
||||
|
||||
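A small usage sketch combining the two helpers in this hunk. Because get_random_port releases its probe socket before returning, there is a short race window, so re-checking and retrying is a reasonable consumption pattern (the function below is illustrative, not part of this diff).

def pick_port_with_retry(host="127.0.0.1", retries=5):
    # Re-check availability right before use; get_random_port frees its probe socket.
    for _ in range(retries):
        port = get_random_port()
        if is_port_available(host, port):
            return port
    raise RuntimeError("could not find a free port")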
def is_port_available(host, port):
|
||||
"""
|
||||
Check whether the port is available
|
||||
|
@@ -47,14 +47,12 @@ from fastdeploy.platforms import current_platform
|
||||
if not current_platform.is_dcu():
|
||||
from fastdeploy.spec_decode import MTPProposer, NgramProposer
|
||||
|
||||
from fastdeploy.input.ernie_tokenizer import ErnieBotTokenizer
|
||||
from fastdeploy.input.mm_processor import DataProcessor
|
||||
from fastdeploy.model_executor.forward_meta import ForwardMeta
|
||||
from fastdeploy.model_executor.models.ernie4_5_vl.modeling_resampler import \
|
||||
ScatterOp
|
||||
from fastdeploy.worker.model_runner_base import ModelRunnerBase
|
||||
from fastdeploy.worker.output import ModelOutputData, ModelRunnerOutput
|
||||
from fastdeploy.worker.utils import check_safetensors_model
|
||||
|
||||
|
||||
class GPUModelRunner(ModelRunnerBase):
|
||||
@@ -81,16 +79,7 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
|
||||
# VL model config:
|
||||
if self.enable_mm:
|
||||
model_path = os.path.dirname(self.parallel_config.model_name_or_path)
|
||||
self.is_safetensors_model = check_safetensors_model(
|
||||
self.parallel_config.model_name_or_path)
|
||||
if not self.is_safetensors_model:
|
||||
self.tokenizer_path = self.image_preprocessor_path = model_path
|
||||
else:
|
||||
self.tokenizer_path = self.parallel_config.model_name_or_path
|
||||
self.image_preprocessor_path = self.parallel_config.model_name_or_path
|
||||
self.vision_model_name_or_path = os.path.join(
|
||||
model_path, "DFNRopeVisionTransformer")
|
||||
self._init_image_preprocess()
|
||||
|
||||
self.amp_black = [
|
||||
"reduce_sum",
|
||||
@@ -151,7 +140,7 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
"""
|
||||
Check whether prefill stage finished
|
||||
"""
|
||||
if int(paddle.max(self.share_inputs['seq_lens_encoder'])) != 0:
|
||||
if int(paddle.max(self.share_inputs["seq_lens_encoder"])) != 0:
|
||||
return 1
|
||||
else:
|
||||
return 0
|
||||
@@ -227,12 +216,15 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
1] = request.prompt_token_ids[-1]
|
||||
self.share_inputs["input_ids"][idx:idx + 1,
|
||||
0] = request.prompt_token_ids[0]
|
||||
self.share_inputs["prompt_ids"][idx:idx + 1,
|
||||
:length] = np.array(request.prompt_token_ids)
|
||||
self.share_inputs['seq_lens_encoder'][idx:idx + 1] = 0
|
||||
self.share_inputs['seq_lens_decoder'][idx:idx + 1] = length
|
||||
self.share_inputs['seq_lens_this_time'][idx:idx + 1] = 1
|
||||
self.share_inputs['step_seq_lens_encoder'][idx:idx + 1] = 0
|
||||
self.share_inputs['step_seq_lens_decoder'][idx:idx +
|
||||
1] = length
|
||||
self.share_inputs["prompt_lens"][idx:idx + 1] = length
|
||||
self.share_inputs['step_idx'][idx:idx + 1] = 1
|
||||
|
||||
if self.speculative_decoding:
|
||||
@@ -247,6 +239,9 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
self.share_inputs["input_ids"][idx:idx +
|
||||
1, :length] = np.array(
|
||||
request.prompt_token_ids)
|
||||
self.share_inputs["prompt_ids"][idx:idx +
|
||||
1, :length] = np.array(
|
||||
request.prompt_token_ids)
|
||||
|
||||
# Use chunked prefill
|
||||
if self.parallel_config.enable_chunked_prefill:
|
||||
@@ -286,6 +281,7 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
idx:idx + 1] = token_chunk_size
|
||||
self.share_inputs['seq_lens_encoder'][idx:idx +
|
||||
1] = token_chunk_size
|
||||
self.share_inputs["prompt_lens"][idx:idx + 1] = token_chunk_size
|
||||
else:
|
||||
if self.enable_mm:
|
||||
inputs = self._preprocess_mm_task(request.multimodal_inputs)
|
||||
@@ -310,6 +306,7 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
self.share_inputs['step_seq_lens_encoder'][idx:idx +
|
||||
1] = length
|
||||
self.share_inputs['seq_lens_encoder'][idx:idx + 1] = length
|
||||
self.share_inputs["prompt_lens"][idx:idx + 1] = length
|
||||
|
||||
if self.enable_mm:
|
||||
enable_thinking = request.get("enable_thinking", True)
|
||||
@@ -408,6 +405,8 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
self.share_inputs["input_ids"][idx:idx +
|
||||
1, :input_length] = np.array(
|
||||
[5] * input_length)
|
||||
self.share_inputs["prompt_ids"][idx:idx + 1, :input_length] = np.array(
|
||||
[5] * input_length)
|
||||
self.share_inputs["eos_token_id"][:] = np.array(
|
||||
[2], dtype="int64").reshape(-1, 1)
|
||||
self.share_inputs["seq_lens_this_time"][idx:idx + 1] = input_length
|
||||
@@ -415,11 +414,11 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
1] = input_length
|
||||
self.share_inputs["seq_lens_encoder"][idx:idx + 1] = input_length
|
||||
self.share_inputs["seq_lens_decoder"][idx:idx + 1] = 0
|
||||
self.share_inputs["prompt_lens"][idx:idx + 1] = 0
|
||||
self.share_inputs["step_idx"][idx:idx + 1] = 0
|
||||
self.share_inputs["max_dec_len"][idx:idx + 1] = max_dec_len
|
||||
self.share_inputs["min_dec_len"][idx:idx + 1] = max_dec_len
|
||||
self.share_inputs["stop_flags"][idx:idx + 1] = False
|
||||
self.share_inputs["top_p"][idx:idx + 1] = 0.0
|
||||
self.share_inputs["temperature"][idx:idx + 1] = 1
|
||||
|
||||
self.share_inputs["first_token_ids"][
|
||||
@@ -446,6 +445,10 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
[max_num_seqs, self.parallel_config.max_model_len],
|
||||
self.parallel_config.pad_token_id,
|
||||
dtype='int64')
|
||||
self.share_inputs["prompt_ids"] = paddle.full(
|
||||
[max_num_seqs, self.parallel_config.max_model_len],
|
||||
self.parallel_config.pad_token_id,
|
||||
dtype='int64')
|
||||
self.share_inputs["eos_token_id"] = paddle.full(
|
||||
[self.parallel_config.eos_tokens_lens, 1], 0, dtype='int64')
|
||||
self.share_inputs["top_p"] = paddle.full([max_num_seqs, 1],
|
||||
@@ -490,6 +493,9 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
[max_num_seqs, 1], 0, dtype='int32')
|
||||
self.share_inputs["step_seq_lens_decoder"] = paddle.full(
|
||||
[max_num_seqs, 1], 0, dtype='int32')
|
||||
self.share_inputs["prompt_lens"] = paddle.full([max_num_seqs, 1],
|
||||
0,
|
||||
dtype='int64')
|
||||
self.share_inputs["step_idx"] = paddle.full([max_num_seqs, 1],
|
||||
0,
|
||||
dtype='int64')
|
||||
@@ -699,6 +705,8 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
top_k=self.share_inputs["top_k"],
|
||||
step_idx=self.share_inputs["step_idx"],
|
||||
pre_token_ids=self.share_inputs["pre_ids"],
|
||||
prompt_ids=self.share_inputs["prompt_ids"],
|
||||
prompt_lens=self.share_inputs["prompt_lens"],
|
||||
frequency_penalties=self.share_inputs["frequency_score"],
|
||||
presence_penalties=self.share_inputs["presence_score"],
|
||||
repetition_penalties=self.share_inputs["penalty_score"],
|
||||
@@ -714,8 +722,6 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
f"Starting to load model {self.model_config.architectures[0]}")
|
||||
time_before_load = time.perf_counter()
|
||||
# 1. Load original model
|
||||
if self.enable_mm:
|
||||
self.load_mm_config_and_image_preprocess()
|
||||
self.model = get_model_from_loader(fd_config=self.fd_config)
|
||||
# 1.1 Load RL dynamic model
|
||||
if self.fd_config.load_config.dynamic_load_weight:
|
||||
@@ -1036,6 +1042,10 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
self.share_inputs["image_features"] = None
|
||||
token_chunk_size = inputs["input_ids"].shape[1]
|
||||
self.share_inputs["input_ids"][idx:idx + 1, :token_chunk_size] = inputs["input_ids"]
|
||||
self.share_inputs["prompt_ids"][
|
||||
idx:idx + 1,
|
||||
self.share_inputs["prompt_lens"][idx:idx + 1]: self.share_inputs["prompt_lens"][idx:idx + 1] + token_chunk_size
|
||||
] = inputs["input_ids"]
|
||||
self.share_inputs["seq_lens_decoder"][idx:idx +1] = task.start_idx
|
||||
task.start_idx += token_chunk_size
|
||||
else:
|
||||
@@ -1048,6 +1058,7 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
1] = token_chunk_size
|
||||
self.share_inputs['seq_lens_encoder'][idx:idx +
|
||||
1] = token_chunk_size
|
||||
self.share_inputs["prompt_lens"][idx:idx + 1] += token_chunk_size
|
||||
self.share_inputs["step_idx"][idx:idx + 1] = 0
|
||||
|
||||
if self.speculative_decoding and self.proposer.is_chunk_prefill_enabled(
|
||||
@@ -1415,8 +1426,8 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
|
||||
def _init_image_preprocess(self) -> None:
|
||||
processor = DataProcessor(
|
||||
tokenizer_name=self.tokenizer_path,
|
||||
image_preprocessor_name=str(self.image_preprocessor_path),
|
||||
tokenizer_name=self.parallel_config.model_name_or_path,
|
||||
image_preprocessor_name=str(self.parallel_config.model_name_or_path),
|
||||
)
|
||||
processor.eval()
|
||||
image_preprocess = processor.image_preprocessor
|
||||
@@ -1434,31 +1445,6 @@ class GPUModelRunner(ModelRunnerBase):
|
||||
-1)
|
||||
self.image_preprocess = image_preprocess
|
||||
|
||||
def load_mm_config_and_image_preprocess(self) -> None:
|
||||
tokenizer = ErnieBotTokenizer.from_pretrained(
|
||||
self.tokenizer_path,
|
||||
model_max_length=self.parallel_config.max_model_len,
|
||||
padding_side="right",
|
||||
use_fast=False,
|
||||
)
|
||||
tokenizer.ignored_index = -100
|
||||
if tokenizer.pad_token is None:
|
||||
tokenizer.pad_token = tokenizer.unk_token
|
||||
|
||||
self.fd_config.model_config.tensor_parallel_degree = self.parallel_config.tensor_parallel_size
|
||||
self.fd_config.model_config.tensor_parallel_rank = self.parallel_config.tensor_parallel_rank
|
||||
vision_config = self.fd_config.model_config.vision_config
|
||||
vision_config.dtype = self.fd_config.model_config.dtype
|
||||
vision_config.tensor_parallel_degree = self.parallel_config.tensor_parallel_size
|
||||
vision_config.tensor_parallel_rank = self.parallel_config.tensor_parallel_rank
|
||||
self.fd_config.model_config.im_patch_id = tokenizer.get_vocab()[
|
||||
"<|IMAGE_PLACEHOLDER|>"
|
||||
]
|
||||
self.fd_config.model_config.think_end_id = tokenizer.get_vocab()["</think>"]
|
||||
self.fd_config.model_config.sequence_parallel = self.parallel_config.sequence_parallel
|
||||
self.model_config = self.fd_config.model_config
|
||||
self._init_image_preprocess()
|
||||
|
||||
def _preprocess_mm_task(self, one: dict) -> None:
|
||||
"""process batch"""
|
||||
|
||||
|
@@ -23,6 +23,7 @@ import pynvml
|
||||
|
||||
from fastdeploy.config import FDConfig
|
||||
from fastdeploy.engine.request import Request
|
||||
from fastdeploy.platforms import current_platform
|
||||
from fastdeploy.utils import get_logger
|
||||
from fastdeploy.worker.gpu_model_runner import GPUModelRunner
|
||||
from fastdeploy.worker.output import ModelRunnerOutput
|
||||
@@ -50,11 +51,12 @@ class GpuWorker(WorkerBase):
|
||||
"""
|
||||
Initialize device and construct model runner
|
||||
"""
|
||||
self.max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
|
||||
if self.device_config.device_type == "cuda" and paddle.device.is_compiled_with_cuda(
|
||||
):
|
||||
# Set evironment variable
|
||||
self.device_ids = self.parallel_config.device_ids.split(",")
|
||||
self.device = f"gpu:{self.local_rank}"
|
||||
self.device = f"gpu:{self.local_rank % self.max_chips_per_node}"
|
||||
paddle.device.set_device(self.device)
|
||||
paddle.set_default_dtype(self.parallel_config.dtype)
|
||||
|
||||
@@ -72,7 +74,7 @@ class GpuWorker(WorkerBase):
|
||||
self.model_runner: GPUModelRunner = GPUModelRunner(
|
||||
fd_config=self.fd_config,
|
||||
device=self.device,
|
||||
device_id=self.device_ids[self.local_rank],
|
||||
device_id=self.device_ids[self.local_rank % self.max_chips_per_node],
|
||||
rank=self.rank,
|
||||
local_rank=self.local_rank)
|
||||
|
||||
|
@@ -174,6 +174,7 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
self.share_inputs['step_seq_lens_encoder'][idx:idx + 1] = 0
|
||||
self.share_inputs['step_seq_lens_decoder'][idx:idx +
|
||||
1] = length
|
||||
self.share_inputs["prompt_lens"][idx:idx + 1] = length
|
||||
self.share_inputs['step_idx'][idx:idx + 1] = 1
|
||||
|
||||
if self.speculative_decoding:
|
||||
@@ -208,6 +209,7 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
idx:idx + 1] = request.get("seq_lens_decoder", 0)
|
||||
self.share_inputs['step_seq_lens_decoder'][
|
||||
idx:idx + 1] = request.get("seq_lens_decoder", 0)
|
||||
self.share_inputs["prompt_lens"][idx:idx + 1] = token_chunk_size
|
||||
else:
|
||||
self.share_inputs['seq_lens_decoder'][
|
||||
idx:idx + 1] = request.get("seq_lens_decoder", 0)
|
||||
@@ -218,6 +220,7 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
self.share_inputs['step_seq_lens_encoder'][idx:idx +
|
||||
1] = length
|
||||
self.share_inputs['seq_lens_encoder'][idx:idx + 1] = length
|
||||
self.share_inputs["prompt_lens"][idx:idx + 1] = length
|
||||
|
||||
if len(request.eos_token_ids
|
||||
) < self.parallel_config.eos_tokens_lens:
|
||||
@@ -290,6 +293,8 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
self.share_inputs["input_ids"][idx:idx +
|
||||
1, :input_length] = np.array(
|
||||
[5] * input_length)
|
||||
self.share_inputs["prompt_ids"][idx:idx + 1, :input_length] = np.array(
|
||||
[5] * input_length)
|
||||
self.share_inputs["eos_token_id"][:] = np.array(
|
||||
[2], dtype="int64").reshape(-1, 1)
|
||||
self.share_inputs["seq_lens_this_time"][idx:idx + 1] = input_length
|
||||
@@ -297,6 +302,7 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
1] = input_length
|
||||
self.share_inputs["seq_lens_encoder"][idx:idx + 1] = input_length
|
||||
self.share_inputs["seq_lens_decoder"][idx:idx + 1] = 0
|
||||
self.share_inputs["prompt_lens"][idx:idx + 1] = 0
|
||||
self.share_inputs["step_idx"][idx:idx + 1] = 0
|
||||
self.share_inputs["max_dec_len"][idx:idx + 1] = max_dec_len
|
||||
self.share_inputs["stop_flags"][idx:idx + 1] = False
|
||||
@@ -325,6 +331,10 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
[max_num_seqs, self.parallel_config.max_model_len],
|
||||
self.parallel_config.pad_token_id,
|
||||
dtype='int64')
|
||||
self.share_inputs["prompt_ids"] = paddle.full(
|
||||
[max_num_seqs, self.parallel_config.max_model_len],
|
||||
self.parallel_config.pad_token_id,
|
||||
dtype='int64')
|
||||
self.share_inputs["eos_token_id"] = paddle.full(
|
||||
[self.parallel_config.eos_tokens_lens, 1], 0, dtype='int64')
|
||||
self.share_inputs["top_p"] = paddle.full([max_num_seqs, 1],
|
||||
@@ -369,6 +379,9 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
[max_num_seqs, 1], 0, dtype='int32')
|
||||
self.share_inputs["step_seq_lens_decoder"] = paddle.full(
|
||||
[max_num_seqs, 1], 0, dtype='int32')
|
||||
self.share_inputs["prompt_lens"] = paddle.full([max_num_seqs, 1],
|
||||
0,
|
||||
dtype='int64')
|
||||
self.share_inputs["step_idx"] = paddle.full([max_num_seqs, 1],
|
||||
0,
|
||||
dtype='int64')
|
||||
@@ -563,6 +576,8 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
top_k=self.share_inputs["top_k"],
|
||||
step_idx=self.share_inputs["step_idx"],
|
||||
pre_token_ids=self.share_inputs["pre_ids"],
|
||||
prompt_ids=self.share_inputs["prompt_ids"],
|
||||
prompt_lens=self.share_inputs["prompt_lens"],
|
||||
frequency_penalties=self.share_inputs["frequency_score"],
|
||||
presence_penalties=self.share_inputs["presence_score"],
|
||||
repetition_penalties=self.share_inputs["penalty_score"],
|
||||
@@ -845,6 +860,7 @@ class IluvatarModelRunner(ModelRunnerBase):
|
||||
token_chunk_size])
|
||||
self.share_inputs['seq_lens_encoder'][idx:idx +
|
||||
1] = token_chunk_size
|
||||
self.share_inputs["prompt_lens"][idx:idx + 1] += token_chunk_size
|
||||
self.share_inputs["step_idx"][idx:idx + 1] = 0
|
||||
self.share_inputs["seq_lens_decoder"][
|
||||
idx:idx + 1] = start_idx + task.get("seq_lens_decoder", 0)
|
||||
|
@@ -23,9 +23,11 @@ import paddle
|
||||
import paddle.distributed as dist
|
||||
import paddle.distributed.fleet as fleet
|
||||
|
||||
from fastdeploy.config import (DecodingConfig, DeviceConfig, FDConfig,
|
||||
from fastdeploy.config import (DecodingConfig, DeviceConfig,
|
||||
ErnieArchitectures, FDConfig,
|
||||
GraphOptimizationConfig, LoadConfig,
|
||||
ModelConfig, ParallelConfig, SpeculativeConfig)
|
||||
from fastdeploy.input.ernie_tokenizer import ErnieBotTokenizer
|
||||
from fastdeploy.inter_communicator import EngineWorkerQueue as TaskQueue
|
||||
from fastdeploy.inter_communicator import IPCSignal
|
||||
from fastdeploy.model_executor.layers.quantization import \
|
||||
@@ -83,6 +85,30 @@ def init_distributed_environment(seed: int = 20) -> List[int]:
|
||||
|
||||
return ranks, local_rank
|
||||
|
||||
def update_fd_config_for_mm(fd_config: FDConfig) -> None:
|
||||
if fd_config.model_config.enable_mm:
|
||||
tokenizer = ErnieBotTokenizer.from_pretrained(
|
||||
fd_config.parallel_config.model_name_or_path,
|
||||
model_max_length=fd_config.parallel_config.max_model_len,
|
||||
padding_side="right",
|
||||
use_fast=False,
|
||||
)
|
||||
tokenizer.ignored_index = -100
|
||||
if tokenizer.pad_token is None:
|
||||
tokenizer.pad_token = tokenizer.unk_token
|
||||
|
||||
fd_config.model_config.tensor_parallel_degree = fd_config.parallel_config.tensor_parallel_size
|
||||
fd_config.model_config.tensor_parallel_rank = fd_config.parallel_config.tensor_parallel_rank
|
||||
vision_config = fd_config.model_config.vision_config
|
||||
vision_config.dtype = fd_config.model_config.dtype
|
||||
# vision_config.tensor_parallel_degree = fd_config.parallel_config.tensor_parallel_size
|
||||
# vision_config.tensor_parallel_rank = fd_config.parallel_config.tensor_parallel_rank
|
||||
fd_config.model_config.im_patch_id = tokenizer.get_vocab()[
|
||||
"<|IMAGE_PLACEHOLDER|>"
|
||||
]
|
||||
fd_config.model_config.think_end_id = tokenizer.get_vocab()["</think>"]
|
||||
fd_config.model_config.sequence_parallel = fd_config.parallel_config.sequence_parallel
|
||||
|
||||
class PaddleDisWorkerProc():
|
||||
"""
|
||||
Paddle Distributed wrapper for fastdeploy.worker.Worker,
|
||||
@@ -136,9 +162,9 @@ class PaddleDisWorkerProc():
|
||||
model_weights_status:
|
||||
"""
|
||||
# init worker_ready_signal
|
||||
max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
|
||||
self.max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
|
||||
array_size = min(
|
||||
max_chips_per_node, self.parallel_config.tensor_parallel_size *
|
||||
self.max_chips_per_node, self.parallel_config.tensor_parallel_size *
|
||||
self.parallel_config.expert_parallel_size)
|
||||
workers_ready = np.zeros(shape=[array_size], dtype=np.int32)
|
||||
self.worker_ready_signal = IPCSignal(
|
||||
@@ -148,10 +174,10 @@ class PaddleDisWorkerProc():
|
||||
suffix=self.parallel_config.engine_pid,
|
||||
create=False)
|
||||
self.worker_ready_signal.value[self.local_rank %
|
||||
max_chips_per_node] = 1
|
||||
self.max_chips_per_node] = 1
|
||||
|
||||
# init worker_healthy_live_signal
|
||||
workers_alive = np.zeros(shape=[self.ranks], dtype=np.int32)
|
||||
workers_alive = np.zeros(shape=[array_size], dtype=np.int32)
|
||||
self.worker_healthy_live_signal = IPCSignal(
|
||||
name="worker_healthy_live_signal",
|
||||
array=workers_alive,
|
||||
@@ -205,7 +231,7 @@ class PaddleDisWorkerProc():
|
||||
Tmp loop function for ep utill DP is supported
|
||||
"""
|
||||
while True:
|
||||
self.worker_healthy_live_signal.value[self.local_rank] = int(
|
||||
self.worker_healthy_live_signal.value[self.local_rank % self.max_chips_per_node] = int(
|
||||
time.time())
|
||||
|
||||
if self.fd_config.parallel_config.tensor_parallel_rank == 0 and self.task_queue.num_tasks(
|
||||
@@ -251,12 +277,12 @@ class PaddleDisWorkerProc():
|
||||
# The first worker detects whether there are tasks in the task queue
|
||||
if self.local_rank % mp_num_per_node == 0:
|
||||
if self.task_queue.num_tasks() > 0:
|
||||
if self.nnode > 1:
|
||||
self.task_queue.read_finish_flag.set(1)
|
||||
else:
|
||||
self.exist_task_signal.value[
|
||||
self.fd_config.parallel_config.
|
||||
expert_parallel_rank] = 1
|
||||
# VL only support 1 batch to prefill
|
||||
if not self.fd_config.model_config.enable_mm or not self.worker.prefill_finished():
|
||||
if self.nnode > 1:
|
||||
self.task_queue.read_finish_flag.set(1)
|
||||
else:
|
||||
self.exist_task_signal.value[self.fd_config.parallel_config.expert_parallel_rank] = 1
|
||||
|
||||
if self.parallel_config.tensor_parallel_size > 1:
|
||||
# Synchronize the signal for other workers
|
||||
@@ -306,10 +332,7 @@ class PaddleDisWorkerProc():
|
||||
# Execute model to generate token. The generated token will be written to the buffer.
|
||||
# These generated tokens can be obtained through get_output op.
|
||||
self.worker.execute_model(req_dicts)
|
||||
|
||||
self.exist_prefill_task_signal.value[
|
||||
0] = self.worker.prefill_finished()
|
||||
|
||||
self.exist_prefill_task_signal.value[0] = self.worker.prefill_finished()
|
||||
|
||||
def determine_num_available_blocks(self) -> None:
|
||||
"""Profiles the peak memory usage of the model to determine how many
|
||||
@@ -504,9 +527,9 @@ def parse_args():
|
||||
type=int,
|
||||
default=1,
|
||||
help="expert parallel size")
|
||||
parser.add_argument("--enable_expert_parallell",
|
||||
parser.add_argument("--enable_expert_parallel",
|
||||
action='store_true',
|
||||
help="enable expert parallell")
|
||||
help="enable expert parallel")
|
||||
parser.add_argument("--ori_vocab_size", type=int, default=None)
|
||||
|
||||
parser.add_argument("--quantization",
|
||||
@@ -517,7 +540,7 @@ def parse_args():
|
||||
"default is None. The priority of this configuration "\
|
||||
"is lower than that of the config file. " \
|
||||
"More complex quantization methods need to be configured via the config file.")
|
||||
parser.add_argument("--graph_optimiaztion_config",
|
||||
parser.add_argument("--graph_optimization_config",
|
||||
type=json.loads,
|
||||
default=None,
|
||||
help=" Configation of Graph optimization backend. "
|
||||
@@ -541,9 +564,8 @@ def parse_args():
|
||||
"'ipc': real-time IPC streaming with automatic resharding, "
|
||||
"'ipc_snapshot': load from disk snapshot of IPC weights.")
|
||||
parser.add_argument("--enable_mm",
|
||||
type=str,
|
||||
default="false",
|
||||
help="Whether to use vl")
|
||||
action='store_true',
|
||||
help="Whether to enable vl model")
|
||||
parser.add_argument("--enable_logprob",
|
||||
action='store_true',
|
||||
help="Enable output of token-level log probabilities.")
|
||||
@@ -572,11 +594,13 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
|
||||
parallel_config.expert_parallel_rank = int(local_rank / ranks)
|
||||
load_config = LoadConfig(vars(args))
|
||||
|
||||
graph_opt_config = GraphOptimizationConfig(
|
||||
use_cudagraph=args.graph_optimiaztion_config["use_cudagraph"],
|
||||
graph_opt_level=args.graph_optimiaztion_config["graph_opt_level"],
|
||||
cudagraph_capture_sizes=args.graph_optimiaztion_config["cudagraph_capture_sizes"]
|
||||
)
|
||||
graph_opt_config = GraphOptimizationConfig()
|
||||
if args.graph_optimization_config is not None:
|
||||
graph_opt_config = GraphOptimizationConfig(
|
||||
use_cudagraph=args.graph_optimization_config["use_cudagraph"],
|
||||
graph_opt_level=args.graph_optimization_config["graph_opt_level"],
|
||||
cudagraph_capture_sizes=args.graph_optimization_config["cudagraph_capture_sizes"]
|
||||
)
|
||||
|
||||
# Note(tangbinhan): used for load_checkpoint
|
||||
model_config.pretrained_config.tensor_parallel_rank = parallel_config.tensor_parallel_rank
|
||||
@@ -615,9 +639,7 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
|
||||
quant_config_name = args.quantization
|
||||
quantization_config["quantization"] = quant_config_name
|
||||
# Special handling for Ernie models
|
||||
is_ernie = "Ernie4_5_ForCausalLM" in model_config.architectures or \
|
||||
"Ernie4_5_MoeForCausalLM" in model_config.architectures or \
|
||||
"Ernie4_5_VLMoeForConditionalGeneration" in model_config.architectures
|
||||
is_ernie = ErnieArchitectures.contains_ernie_arch(model_config.architectures)
|
||||
if quant_config_name == "wint4" and is_ernie:
|
||||
quantization_config["dense_quant_type"] = "wint8"
|
||||
quantization_config["moe_quant_type"] = "wint4"
|
||||
@@ -650,7 +672,7 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
|
||||
)
|
||||
|
||||
# Set VL tag
|
||||
model_config.enable_mm = getattr(args, 'enable_mm', 'false').lower() == 'true'
|
||||
model_config.enable_mm = args.enable_mm
|
||||
logger.info(f"- Dynamic load weight: {load_config.dynamic_load_weight}")
|
||||
logger.info(f"- Load strategy: {load_config.load_strategy}")
|
||||
|
||||
@@ -662,6 +684,7 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig:
|
||||
decoding_config=decoding_config,
|
||||
quant_config=quant_config,
|
||||
graph_opt_config=graph_opt_config)
|
||||
update_fd_config_for_mm(fd_config)
|
||||
|
||||
return fd_config
|
||||
|
||||
|
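The argparse changes above rename two misspelled flags and turn `--enable_mm` into a boolean switch. Below is a minimal sketch of how the renamed options parse, using only the flag names and types visible in the diff; the rest of the FastDeploy parser is omitted and the sample values are invented.

```python
import argparse
import json

# Minimal stand-in for the flags touched by this change set; not the full
# FastDeploy argument parser.
parser = argparse.ArgumentParser()
parser.add_argument("--enable_expert_parallel", action="store_true",
                    help="enable expert parallel")
parser.add_argument("--graph_optimization_config", type=json.loads, default=None,
                    help="Configuration of the graph optimization backend (JSON string).")
parser.add_argument("--enable_mm", action="store_true",
                    help="Whether to enable vl model")

# Hypothetical invocation: booleans are now plain switches, and the graph
# optimization settings arrive as a JSON object parsed by json.loads.
args = parser.parse_args([
    "--enable_expert_parallel",
    "--graph_optimization_config",
    '{"use_cudagraph": true, "graph_opt_level": 1, "cudagraph_capture_sizes": [1, 2, 4]}',
])
assert args.enable_expert_parallel is True
assert args.enable_mm is False              # switch stays off unless passed
assert args.graph_optimization_config["use_cudagraph"] is True
```

Omitting `--graph_optimization_config` leaves the value at `None`, which is exactly the case the new default `GraphOptimizationConfig()` branch in `initialize_fd_config` covers.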
@@ -29,9 +29,11 @@ triton==3.3
use-triton-in-paddle
crcmod
fastsafetensors==0.1.14
msgpack
opentelemetry-api>=1.24.0
opentelemetry-sdk>=1.24.0
opentelemetry-instrumentation-redis
opentelemetry-instrumentation-mysql
opentelemetry-distro
opentelemetry-exporter-otlp
opentelemetry-instrumentation-fastapi

@@ -27,3 +27,4 @@ moviepy
use-triton-in-paddle
crcmod
fastsafetensors==0.1.14
+msgpack

@@ -27,3 +27,4 @@ moviepy
use-triton-in-paddle
crcmod
fastsafetensors==0.1.14
+msgpack
2
setup.py
@@ -193,7 +193,7 @@ def get_name():

cmdclass_dict = {'bdist_wheel': CustomBdistWheel}
cmdclass_dict['build_ext'] = CMakeBuild
-FASTDEPLOY_VERSION = os.environ.get("FASTDEPLOY_VERSION", "2.0.0-dev")
+FASTDEPLOY_VERSION = os.environ.get("FASTDEPLOY_VERSION", "2.0.3")
cmdclass_dict['build_optl'] = PostInstallCommand

setup(
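Only the default release number changes here; the environment override keeps working as before. A small sketch of the lookup, with the new fallback shown purely for illustration:

```python
import os

# Same pattern as the line changed above: an explicit FASTDEPLOY_VERSION in the
# build environment wins, otherwise the packaged default "2.0.3" is used.
version = os.environ.get("FASTDEPLOY_VERSION", "2.0.3")
print(version)
```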
@@ -313,4 +313,66 @@ def test_streaming(openai_client, capsys):
    output = []
    for chunk in response:
        output.append(chunk.choices[0].text)
-    assert len(output) > 0
+    assert len(output) > 0

+def test_non_streaming_with_stop_str(openai_client):
+    """
+    Test non-streaming chat functionality with the local service
+    """
+    response = openai_client.chat.completions.create(
+        model="default",
+        messages=[{"role": "user", "content": "Hello, how are you?"}],
+        temperature=1,
+        max_tokens=5,
+        metadata={"include_stop_str_in_output": True},
+        stream=False,
+    )
+    # Assertions to check the response structure
+    assert hasattr(response, 'choices')
+    assert len(response.choices) > 0
+    assert response.choices[0].message.content.endswith("</s>")

+    response = openai_client.chat.completions.create(
+        model="default",
+        messages=[{"role": "user", "content": "Hello, how are you?"}],
+        temperature=1,
+        max_tokens=5,
+        metadata={"include_stop_str_in_output": False},
+        stream=False,
+    )
+    # Assertions to check the response structure
+    assert hasattr(response, 'choices')
+    assert len(response.choices) > 0
+    assert not response.choices[0].message.content.endswith("</s>")

+def test_streaming_with_stop_str(openai_client):
+    """
+    Test non-streaming chat functionality with the local service
+    """
+    response = openai_client.chat.completions.create(
+        model="default",
+        messages=[{"role": "user", "content": "Hello, how are you?"}],
+        temperature=1,
+        max_tokens=5,
+        metadata={"include_stop_str_in_output": True},
+        stream=True,
+    )
+    # Assertions to check the response structure
+    last_token = ""
+    for chunk in response:
+        last_token = chunk.choices[0].delta.content
+    assert last_token == "</s>"

+    response = openai_client.chat.completions.create(
+        model="default",
+        messages=[{"role": "user", "content": "Hello, how are you?"}],
+        temperature=1,
+        max_tokens=5,
+        metadata={"include_stop_str_in_output": False},
+        stream=True,
+    )
+    # Assertions to check the response structure
+    last_token = ""
+    for chunk in response:
+        last_token = chunk.choices[0].delta.content
+    assert last_token != "</s>"
@@ -57,6 +57,12 @@ def _create_default_sampling_metadata(
        top_p=paddle.full(shape=[batch_size, 1],
                          fill_value=0.7,
                          dtype="float32"),
+        prompt_ids=paddle.full(shape=[batch_size, max_seq_len],
+                               fill_value=0,
+                               dtype="int64"),
+        prompt_lens=paddle.full(shape=[batch_size, 1],
+                                fill_value=5,
+                                dtype="int64"),
        step_idx=paddle.full(shape=[batch_size, 1],
                             fill_value=0,
                             dtype="int64"),
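The two new metadata fields follow the layout the penalty op expects: `prompt_ids` is a padded `[batch, max_seq_len]` buffer and `prompt_lens` records how much of each row is real. A tiny sketch of that layout, with shapes and the padding value taken from the diff and the token values invented:

```python
import paddle

batch_size, max_seq_len = 2, 8

# Padded prompt buffer: row i holds prompt_lens[i] real tokens, the rest is padding.
prompt_ids = paddle.full(shape=[batch_size, max_seq_len], fill_value=0, dtype="int64")
prompt_lens = paddle.to_tensor([[3], [5]], dtype="int64")

prompt_ids[0, :3] = paddle.to_tensor([11, 12, 13], dtype="int64")
prompt_ids[1, :5] = paddle.to_tensor([21, 22, 23, 24, 25], dtype="int64")

print(prompt_ids.numpy())
print(prompt_lens.numpy())
```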
142
test/operators/test_get_token_penalty_multi_scores.py
Normal file
@@ -0,0 +1,142 @@
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" UT for get_token_penalty_multi_scores kernel """

import copy
import unittest

import numpy as np
import paddle


class Test(unittest.TestCase):

    def setUp(self):
        """
        Initialize.
        """
        self.num_seqs = 4
        self.max_model_len = 32768
        self.vocab_size = 103424

        # prompt token
        prompt_ids = paddle.full(shape=[self.num_seqs, self.max_model_len], fill_value=0, dtype='int64')
        prompt_lens = paddle.randint(low=0, high=100, shape=[self.num_seqs, 1], dtype='int64')
        fake_tokens = paddle.randint(low=3, high=self.vocab_size, shape=[self.num_seqs, self.max_model_len], dtype='int64')
        for i in range(self.num_seqs):
            prompt_ids[i, :prompt_lens[i]] = fake_tokens[i, :prompt_lens[i]]

        # generated token
        pre_ids = paddle.full(shape=[self.num_seqs, self.max_model_len], fill_value=-1, dtype='int64')
        step_idx = paddle.randint(low=0, high=100, shape=[self.num_seqs, 1], dtype='int64')
        fake_tokens = paddle.randint(low=3, high=self.vocab_size, shape=[self.num_seqs, self.max_model_len], dtype='int64')
        for i in range(self.num_seqs):
            pre_ids[i, :step_idx[i]] = fake_tokens[i, :step_idx[i]]

        logits = paddle.randn([self.num_seqs, self.vocab_size]).cast("float32")

        penalty_score = paddle.ones([self.num_seqs, 1]) * 1.05
        frequency_score = paddle.ones([self.num_seqs, 1]) * 0.5
        presence_score = paddle.ones([self.num_seqs, 1]) * 0.3
        temperature = paddle.ones([self.num_seqs, 1]) * 0.8

        bad_tokens = paddle.to_tensor([[-1]]).cast("int64")
        min_dec_len = paddle.ones([self.num_seqs, 1]).cast("int64")
        eos_token_id = paddle.to_tensor([[2]]).cast("int64")

        self.input_data = {
            "prompt_ids": prompt_ids,
            "prompt_lens": prompt_lens,
            "pre_ids": pre_ids,
            "step_idx": step_idx,
            "logits": logits,
            "bad_tokens": bad_tokens,
            "min_dec_len": min_dec_len,
            "eos_token_id": eos_token_id,
            "penalty_score": penalty_score,
            "frequency_score": frequency_score,
            "presence_score": presence_score,
            "temperature": temperature
        }

    def get_token_penalty_multi_scores_baseline(self):
        input_data = copy.deepcopy(self.input_data)
        logits = input_data["logits"]
        penalty_score = input_data["penalty_score"]
        frequency_score = input_data["frequency_score"]
        presence_score = input_data["presence_score"]
        temperature = input_data["temperature"]

        # min token penalties
        mask = input_data["step_idx"] < input_data["min_dec_len"]
        for bi, flag in enumerate(mask):
            if flag:
                logits[bi, input_data["eos_token_id"]] = -1e10

        # bad words exclusion
        for token in input_data["bad_tokens"]:
            if token < 0 or token > self.vocab_size:
                continue
            logits[:, token] = -1e10
        # all penalties
        prompt_ids = input_data["prompt_ids"]
        for i in range(self.num_seqs):
            prompt_ids[i, input_data["prompt_lens"][i]:] = -1
        prompt_repeat_times = paddle.zeros([self.num_seqs, self.vocab_size + 1]).cast("int64")
        prompt_repeat_times = paddle.put_along_axis(prompt_repeat_times, prompt_ids, paddle.ones_like(input_data["pre_ids"]), axis=1, reduce="add")
        prompt_repeat_times = prompt_repeat_times[:, :self.vocab_size]
        prompt_mask = prompt_repeat_times > 0

        pre_ids = input_data["pre_ids"]
        pre_ids[pre_ids == -1] = self.vocab_size
        out_repeat_times = paddle.zeros([self.num_seqs, self.vocab_size + 1]).cast("int64")
        out_repeat_times = paddle.put_along_axis(out_repeat_times, pre_ids, paddle.ones_like(input_data["pre_ids"]), axis=1, reduce="add")
        out_repeat_times = out_repeat_times[:, :self.vocab_size]
        output_mask = out_repeat_times > 0

        penalty_score = penalty_score.tile(self.vocab_size)
        logits[logits > 0] /= paddle.where(output_mask | prompt_mask, penalty_score, 1.0)[logits > 0]
        logits[logits <= 0] *= paddle.where(output_mask | prompt_mask, penalty_score, 1.0)[logits <= 0]
        logits -= frequency_score * out_repeat_times.cast("float32")
        logits -= presence_score * output_mask.cast("float32")

        # temperature
        logits /= temperature
        return logits

    def test_penalty_op(self):
        """
        Compare the custom op against the Python baseline.
        """
        baseline_out = self.get_token_penalty_multi_scores_baseline()
        from fastdeploy.model_executor.ops.gpu import \
            get_token_penalty_multi_scores
        logits = get_token_penalty_multi_scores(
            self.input_data["pre_ids"],
            self.input_data["prompt_ids"],
            self.input_data["prompt_lens"],
            self.input_data["logits"],
            self.input_data["penalty_score"],
            self.input_data["frequency_score"],
            self.input_data["presence_score"],
            self.input_data["temperature"],
            self.input_data["bad_tokens"],
            self.input_data["step_idx"],
            self.input_data["min_dec_len"],
            self.input_data["eos_token_id"])
        np.testing.assert_allclose(baseline_out.numpy(), logits.numpy(), rtol=1e-04, atol=1e-04)


if __name__ == "__main__":
    unittest.main()
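As a quick cross-check of the baseline above, here is the same penalty arithmetic applied by hand to a single made-up logit, for a token that has already been generated three times, using the scores set in `setUp`:

```python
# Toy walk-through of the rules in get_token_penalty_multi_scores_baseline,
# for one token generated 3 times already (all values illustrative).
logit = 2.0

logit /= 1.05        # repetition penalty: positive logits are divided
logit -= 0.5 * 3     # frequency penalty: score * number of repetitions
logit -= 0.3         # presence penalty: applied once because the token appeared
logit /= 0.8         # temperature scaling

print(round(logit, 4))  # 0.131
```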